dicom_ul/association/
pdata.rs

1use std::{
2    collections::VecDeque,
3    io::{Read, Write},
4};
5
6use tracing::warn;
7
8use crate::{pdu::reader::PDU_HEADER_SIZE, read_pdu, Pdu};
9
/// A P-Data value writer.
///
/// This exposes an API to iteratively construct and send Data messages
/// to another node.
/// Using this as a [standard writer](std::io::Write)
/// will automatically split the incoming bytes
/// into separate PDUs if they do not fit in a single one.
///
/// # Example
///
/// Use an association's `send_pdata` method
/// to create a new P-Data value writer.
///
/// ```no_run
/// # use std::io::Write;
/// # use dicom_ul::association::{ClientAssociationOptions, PDataWriter};
/// # use dicom_ul::pdu::{Pdu, PDataValue, PDataValueType};
/// # fn command_data() -> Vec<u8> { unimplemented!() }
/// # fn dicom_data() -> &'static [u8] { unimplemented!() }
/// # fn main() -> Result<(), Box<dyn std::error::Error>> {
/// let mut association = ClientAssociationOptions::new()
///    .establish("129.168.0.5:104")?;
///
/// let presentation_context_id = association.presentation_contexts()[0].id;
///
/// // send a command first
/// association.send(&Pdu::PData {
///     data: vec![PDataValue {
///         presentation_context_id,
///         value_type: PDataValueType::Command,
///         is_last: true,
///         data: command_data(),
///     }],
/// });
///
/// // then send a DICOM object which may be split into multiple PDUs
/// let mut pdata = association.send_pdata(presentation_context_id);
/// pdata.write_all(dicom_data())?;
/// pdata.finish()?;
///
/// let pdu_ac = association.receive()?;
/// # Ok(())
/// # }
/// ```
#[must_use]
pub struct PDataWriter<W: Write> {
    /// Bytes of the PDU currently being assembled:
    /// a 12-byte header followed by the data accumulated so far.
    /// Left empty once the last fragment has been sent.
    buffer: Vec<u8>,
    /// Destination to which complete PDUs are written.
    stream: W,
    /// Maximum number of data bytes carried by a single PDU.
    max_data_len: u32,
}
59
60impl<W> PDataWriter<W>
61where
62    W: Write,
63{
64    /// Construct a new P-Data value writer.
65    ///
66    /// `max_pdu_length` is the maximum value of the PDU-length property.
67    pub(crate) fn new(stream: W, presentation_context_id: u8, max_pdu_length: u32) -> Self {
68        let max_data_length = calculate_max_data_len_single(max_pdu_length);
69        let mut buffer = Vec::with_capacity((max_data_length + PDU_HEADER_SIZE) as usize);
70        // initial buffer set up
71        buffer.extend([
72            // PDU-type + reserved byte
73            0x04,
74            0x00,
75            // full PDU length, unknown at this point
76            0xFF,
77            0xFF,
78            0xFF,
79            0xFF,
80            // presentation data length, unknown at this point
81            0xFF,
82            0xFF,
83            0xFF,
84            0xFF,
85            // presentation context id
86            presentation_context_id,
87            // message control header, unknown at this point
88            0xFF,
89        ]);
90
91        PDataWriter {
92            stream,
93            max_data_len: max_data_length,
94            buffer,
95        }
96    }
97
98    /// Declare to have finished sending P-Data fragments,
99    /// thus emitting the last P-Data fragment PDU.
100    ///
101    /// This is also done automatically once the P-Data writer is dropped.
102    pub fn finish(mut self) -> std::io::Result<()> {
103        self.finish_impl()?;
104        Ok(())
105    }
106
107    /// Set up the P-Data PDU header for sending.
108    fn setup_pdata_header(&mut self, is_last: bool) {
109        let data_len = (self.buffer.len() - 12) as u32;
110
111        // full PDU length (minus PDU type and reserved byte)
112        let pdu_len = data_len + 4 + 2;
113        let pdu_len_bytes = pdu_len.to_be_bytes();
114
115        self.buffer[2] = pdu_len_bytes[0];
116        self.buffer[3] = pdu_len_bytes[1];
117        self.buffer[4] = pdu_len_bytes[2];
118        self.buffer[5] = pdu_len_bytes[3];
119
120        // presentation data length (data + 2 properties below)
121        let pdv_data_len = data_len + 2;
122        let data_len_bytes = pdv_data_len.to_be_bytes();
123
124        self.buffer[6] = data_len_bytes[0];
125        self.buffer[7] = data_len_bytes[1];
126        self.buffer[8] = data_len_bytes[2];
127        self.buffer[9] = data_len_bytes[3];
128
129        // message control header
130        self.buffer[11] = if is_last { 0x02 } else { 0x00 };
131    }
132
133    fn finish_impl(&mut self) -> std::io::Result<()> {
134        if !self.buffer.is_empty() {
135            // send last PDU
136            self.setup_pdata_header(true);
137            self.stream.write_all(&self.buffer[..])?;
138            // clear buffer so that subsequent calls to `finish_impl`
139            // do not send any more PDUs
140            self.buffer.clear();
141        }
142        Ok(())
143    }
144
145    /// Use the current state of the buffer to send new PDUs
146    ///
147    /// Pre-condition:
148    /// buffer must have enough data for one P-Data-tf PDU
149    fn dispatch_pdu(&mut self) -> std::io::Result<()> {
150        debug_assert!(self.buffer.len() >= 12);
151        // send PDU now
152        self.setup_pdata_header(false);
153        self.stream.write_all(&self.buffer)?;
154
155        // back to just the header
156        self.buffer.truncate(12);
157
158        Ok(())
159    }
160}
161
162impl<W> Write for PDataWriter<W>
163where
164    W: Write,
165{
166    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
167        let total_len = self.max_data_len as usize + 12;
168        if self.buffer.len() + buf.len() <= total_len {
169            // accumulate into buffer, do nothing
170            self.buffer.extend(buf);
171            Ok(buf.len())
172        } else {
173            // fill in the rest of the buffer, send PDU,
174            // and leave out the rest for subsequent writes
175            let buf = &buf[..total_len - self.buffer.len()];
176            self.buffer.extend(buf);
177            debug_assert_eq!(self.buffer.len(), total_len);
178            self.dispatch_pdu()?;
179            Ok(buf.len())
180        }
181    }
182
183    fn flush(&mut self) -> std::io::Result<()> {
184        // do nothing
185        Ok(())
186    }
187}
188
189/// With the P-Data writer dropped,
190/// this `Drop` implementation
191/// will construct and emit the last P-Data fragment PDU
192/// if there is any data left to send.
193impl<W> Drop for PDataWriter<W>
194where
195    W: Write,
196{
197    fn drop(&mut self) {
198        let _ = self.finish_impl();
199    }
200}
201
/// A P-Data value reader.
///
/// This exposes an API which provides a byte stream of data
/// by iteratively collecting Data messages from another node.
/// Using this as a [standard reader](std::io::Read)
/// will provide all incoming bytes,
/// even if they reside in separate PDUs,
/// until the last message is received.
///
/// # Example
///
/// Use an association's `receive_pdata` method
/// to create a new P-Data value reader.
///
/// ```no_run
/// # use std::io::Read;
/// # use dicom_ul::association::{ClientAssociationOptions, PDataReader};
/// # use dicom_ul::pdu::{Pdu, PDataValue, PDataValueType};
/// # fn command_data() -> Vec<u8> { unimplemented!() }
/// # fn dicom_data() -> &'static [u8] { unimplemented!() }
/// # fn main() -> Result<(), Box<dyn std::error::Error>> {
/// # let mut association = ClientAssociationOptions::new()
/// #    .establish("129.168.0.5:104")?;
///
/// // expecting a DICOM object which may be split into multiple PDUs,
/// let mut pdata = association.receive_pdata();
/// let all_pdata_bytes = {
///     let mut v = Vec::new();
///     pdata.read_to_end(&mut v)?;
///     v
/// };
/// # Ok(())
/// # }
/// ```
#[must_use]
pub struct PDataReader<R> {
    /// Data bytes already received but not yet handed out to the caller.
    buffer: VecDeque<u8>,
    /// Source from which PDUs are read.
    stream: R,
    /// Presentation context ID of the first P-Data value received,
    /// `None` until then.
    presentation_context_id: Option<u8>,
    /// Length limit passed on to `read_pdu`;
    /// also used as the initial capacity of `buffer`.
    max_data_length: u32,
    /// Whether no more PDUs are to be read:
    /// set from the `is_last` flag of received values
    /// or by `stop_receiving`.
    last_pdu: bool,
}
243
244impl<R> PDataReader<R>
245where
246    R: Read,
247{
248    pub fn new(stream: R, max_data_length: u32) -> Self {
249        PDataReader {
250            buffer: VecDeque::with_capacity(max_data_length as usize),
251            stream,
252            presentation_context_id: None,
253            max_data_length,
254            last_pdu: false,
255        }
256    }
257
258    /// Declare no intention to read more PDUs from the remote node.
259    ///
260    /// Attempting to read more bytes
261    /// will only consume the inner buffer and not result in
262    /// more PDUs being received.
263    pub fn stop_receiving(&mut self) -> std::io::Result<()> {
264        self.last_pdu = true;
265        Ok(())
266    }
267}
268
269impl<R> Read for PDataReader<R>
270where
271    R: Read,
272{
273    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
274        if self.buffer.is_empty() {
275            if self.last_pdu {
276                // reached the end of PData stream
277                return Ok(0);
278            }
279
280            let pdu = read_pdu(&mut self.stream, self.max_data_length, false)
281                .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?;
282
283            match pdu {
284                Pdu::PData { data } => {
285                    for pdata_value in data {
286                        self.presentation_context_id = match self.presentation_context_id {
287                            None => Some(pdata_value.presentation_context_id),
288                            Some(cid) if cid == pdata_value.presentation_context_id => Some(cid),
289                            Some(cid) => {
290                                warn!("Received PData value of presentation context {}, but should be {}", pdata_value.presentation_context_id, cid);
291                                Some(cid)
292                            }
293                        };
294                        self.buffer.extend(pdata_value.data);
295                        self.last_pdu = pdata_value.is_last;
296                    }
297                }
298                _ => {
299                    return Err(std::io::Error::new(
300                        std::io::ErrorKind::UnexpectedEof,
301                        "Unexpected PDU type",
302                    ))
303                }
304            }
305        }
306        Read::read(&mut self.buffer, buf)
307    }
308}
309
/// Determine the maximum length of actual PDV data
/// when encapsulated in a PDU with the given length property.
/// Does not account for the first 2 bytes (type + reserved).
///
/// Yields 0 when `pdu_len` is too small to hold any data,
/// instead of underflowing.
#[inline]
fn calculate_max_data_len_single(pdu_len: u32) -> u32 {
    // data length: 4 bytes
    // control header: 2 bytes
    pdu_len.saturating_sub(4 + 2)
}
319
#[cfg(test)]
mod tests {
    use std::collections::VecDeque;
    use std::io::{Read, Write};

    use crate::pdu::reader::{read_pdu, MINIMUM_PDU_SIZE, PDU_HEADER_SIZE};
    use crate::pdu::Pdu;
    use crate::pdu::{PDataValue, PDataValueType};
    use crate::write_pdu;

    use super::{PDataReader, PDataWriter};

    /// Data which fits in a single PDU
    /// is sent as one P-Data value flagged as the last fragment.
    #[test]
    fn test_write_pdata_and_finish() {
        let presentation_context_id = 12;

        let mut buf = Vec::new();
        {
            let mut writer = PDataWriter::new(&mut buf, presentation_context_id, MINIMUM_PDU_SIZE);
            writer.write_all(&(0..64).collect::<Vec<u8>>()).unwrap();
            writer.finish().unwrap();
        }

        let mut cursor = &buf[..];
        let same_pdu = read_pdu(&mut cursor, MINIMUM_PDU_SIZE, true).unwrap();

        // a single PDU with a single value is expected

        match same_pdu {
            Pdu::PData { data: data_1 } => {
                assert_eq!(data_1.len(), 1);
                let data_1 = &data_1[0];

                // check that this PDU is consistent
                assert_eq!(data_1.value_type, PDataValueType::Data);
                assert_eq!(data_1.presentation_context_id, presentation_context_id);
                assert!(data_1.is_last);
                assert_eq!(data_1.data.len(), 64);
                assert_eq!(data_1.data, (0..64).collect::<Vec<u8>>());
            }
            pdu => panic!("Expected PData, got {:?}", pdu),
        }

        // nothing else was written
        assert_eq!(cursor.len(), 0);
    }

    /// Data which does not fit in a single PDU
    /// is split into full PDUs followed by a last, shorter one.
    #[test]
    fn test_write_large_pdata_and_finish() {
        let presentation_context_id = 32;

        let my_data: Vec<_> = (0..9000).map(|x: u32| x as u8).collect();

        let mut buf = Vec::new();
        {
            let mut writer = PDataWriter::new(&mut buf, presentation_context_id, MINIMUM_PDU_SIZE);
            writer.write_all(&my_data).unwrap();
            writer.finish().unwrap();
        }

        let mut cursor = &buf[..];
        let pdu_1 = read_pdu(&mut cursor, MINIMUM_PDU_SIZE, true).unwrap();
        let pdu_2 = read_pdu(&mut cursor, MINIMUM_PDU_SIZE, true).unwrap();
        let pdu_3 = read_pdu(&mut cursor, MINIMUM_PDU_SIZE, true).unwrap();

        // concatenate data chunks, compare with all data

        match (pdu_1, pdu_2, pdu_3) {
            (
                Pdu::PData { data: data_1 },
                Pdu::PData { data: data_2 },
                Pdu::PData { data: data_3 },
            ) => {
                assert_eq!(data_1.len(), 1);
                let data_1 = &data_1[0];
                assert_eq!(data_2.len(), 1);
                let data_2 = &data_2[0];
                assert_eq!(data_3.len(), 1);
                let data_3 = &data_3[0];

                // check that these three PDUs are consistent
                assert_eq!(data_1.value_type, PDataValueType::Data);
                assert_eq!(data_2.value_type, PDataValueType::Data);
                assert_eq!(data_3.value_type, PDataValueType::Data);
                assert_eq!(data_1.presentation_context_id, presentation_context_id);
                assert_eq!(data_2.presentation_context_id, presentation_context_id);
                assert_eq!(data_3.presentation_context_id, presentation_context_id);

                // only the final fragment may be flagged as the last one
                assert!(!data_1.is_last);
                assert!(!data_2.is_last);
                assert!(data_3.is_last);

                // check expected lengths
                assert_eq!(
                    data_1.data.len(),
                    (MINIMUM_PDU_SIZE - PDU_HEADER_SIZE) as usize
                );
                assert_eq!(
                    data_2.data.len(),
                    (MINIMUM_PDU_SIZE - PDU_HEADER_SIZE) as usize
                );
                assert_eq!(data_3.data.len(), 820);

                // check data consistency
                assert_eq!(
                    &data_1.data[..],
                    (0..MINIMUM_PDU_SIZE - PDU_HEADER_SIZE)
                        .map(|x| x as u8)
                        .collect::<Vec<_>>()
                );
                assert_eq!(
                    data_1.data.len() + data_2.data.len() + data_3.data.len(),
                    9000
                );

                let data_1 = &data_1.data;
                let data_2 = &data_2.data;
                let data_3 = &data_3.data;

                let mut all_data: Vec<u8> = Vec::new();
                all_data.extend(data_1);
                all_data.extend(data_2);
                all_data.extend(data_3);
                assert_eq!(all_data, my_data);
            }
            x => panic!("Expected 3 PDatas, got {:?}", x),
        }

        // nothing else was written
        assert_eq!(cursor.len(), 0);
    }

    /// Data spread over several PDUs
    /// is read back as one continuous byte stream.
    #[test]
    fn test_read_large_pdata_and_finish() {
        let presentation_context_id = 32;

        let my_data: Vec<_> = (0..9000).map(|x: u32| x as u8).collect();
        let pdata_1 = vec![PDataValue {
            value_type: PDataValueType::Data,
            data: my_data[0..3000].to_owned(),
            presentation_context_id,
            is_last: false,
        }];
        let pdata_2 = vec![PDataValue {
            value_type: PDataValueType::Data,
            data: my_data[3000..6000].to_owned(),
            presentation_context_id,
            is_last: false,
        }];
        let pdata_3 = vec![PDataValue {
            value_type: PDataValueType::Data,
            data: my_data[6000..].to_owned(),
            presentation_context_id,
            is_last: true,
        }];

        let mut pdu_stream = VecDeque::new();

        // write some PDUs
        write_pdu(&mut pdu_stream, &Pdu::PData { data: pdata_1 }).unwrap();
        write_pdu(&mut pdu_stream, &Pdu::PData { data: pdata_2 }).unwrap();
        write_pdu(&mut pdu_stream, &Pdu::PData { data: pdata_3 }).unwrap();

        let mut buf = Vec::new();
        {
            let mut reader = PDataReader::new(&mut pdu_stream, MINIMUM_PDU_SIZE);
            reader.read_to_end(&mut buf).unwrap();
        }
        assert_eq!(buf, my_data);
    }
}