multer/
multipart.rs

1use std::sync::Arc;
2use std::task::{Context, Poll};
3
4use bytes::Bytes;
5use futures_util::future;
6use futures_util::stream::{Stream, TryStreamExt};
7use spin::mutex::spin::SpinMutex as Mutex;
8#[cfg(feature = "tokio-io")]
9use {tokio::io::AsyncRead, tokio_util::io::ReaderStream};
10
11use crate::buffer::StreamBuffer;
12use crate::constraints::Constraints;
13use crate::content_disposition::ContentDisposition;
14use crate::error::Error;
15use crate::field::Field;
16use crate::{constants, helpers, Result};
17
18/// Represents the implementation of `multipart/form-data` formatted data.
19///
20/// This will parse the source stream into [`Field`] instances via
21/// [`next_field()`](Self::next_field).
22///
23/// # Field Exclusivity
24///
25/// A `Field` represents a raw, self-decoding stream into multipart data. As
26/// such, only _one_ `Field` from a given `Multipart` instance may be live at
27/// once. That is, a `Field` emitted by `next_field()` must be dropped before
28/// calling `next_field()` again. Failure to do so will result in an error.
29///
30/// ```rust
31/// use std::convert::Infallible;
32///
33/// use bytes::Bytes;
34/// use futures_util::stream::once;
35/// use multer::Multipart;
36///
37/// # async fn run() {
38/// let data = "--X-BOUNDARY\r\nContent-Disposition: form-data; \
39///     name=\"my_text_field\"\r\n\r\nabcd\r\n--X-BOUNDARY--\r\n";
40///
41/// let stream = once(async move { Result::<Bytes, Infallible>::Ok(Bytes::from(data)) });
42/// let mut multipart = Multipart::new(stream, "X-BOUNDARY");
43///
44/// let field1 = multipart.next_field().await;
45/// let field2 = multipart.next_field().await;
46///
47/// assert!(field2.is_err());
48/// # }
49/// # tokio::runtime::Runtime::new().unwrap().block_on(run());
50/// ```
51///
52/// # Examples
53///
54/// ```
55/// use std::convert::Infallible;
56///
57/// use bytes::Bytes;
58/// use futures_util::stream::once;
59/// use multer::Multipart;
60///
61/// # async fn run() {
62/// let data = "--X-BOUNDARY\r\nContent-Disposition: form-data; \
63///     name=\"my_text_field\"\r\n\r\nabcd\r\n--X-BOUNDARY--\r\n";
64///
65/// let stream = once(async move { Result::<Bytes, Infallible>::Ok(Bytes::from(data)) });
66/// let mut multipart = Multipart::new(stream, "X-BOUNDARY");
67///
68/// while let Some(field) = multipart.next_field().await.unwrap() {
69///     println!("Field: {:?}", field.text().await)
70/// }
71/// # }
72/// # tokio::runtime::Runtime::new().unwrap().block_on(run());
73/// ```
74#[derive(Debug)]
75pub struct Multipart<'r> {
76    state: Arc<Mutex<MultipartState<'r>>>,
77}
78
79#[derive(Debug)]
80pub(crate) struct MultipartState<'r> {
81    pub(crate) buffer: StreamBuffer<'r>,
82    pub(crate) boundary: String,
83    pub(crate) stage: StreamingStage,
84    pub(crate) next_field_idx: usize,
85    pub(crate) curr_field_name: Option<String>,
86    pub(crate) curr_field_size_limit: u64,
87    pub(crate) curr_field_size_counter: u64,
88    pub(crate) constraints: Constraints,
89}
90
/// Stages of the `multipart/form-data` parsing state machine, in the order in
/// which a well-formed body moves through them.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum StreamingStage {
    /// Searching the buffered input for the first `--boundary` delimiter.
    FindingFirstBoundary,
    /// Expecting a `--boundary` delimiter at the front of the buffer.
    ReadingBoundary,
    /// Checking whether the delimiter just read is the closing one (followed by
    /// `--`) or opens another field.
    DeterminingBoundaryType,
    /// Skipping padding after a delimiter and expecting the line break that
    /// precedes the field headers.
    ReadingTransportPadding,
    /// Reading a field's header block up to the empty line that ends it.
    ReadingFieldHeaders,
    /// Streaming the current field's body data.
    ReadingFieldData,
    /// The closing delimiter has been seen; no more fields follow.
    Eof,
}
101
102impl<'r> Multipart<'r> {
103    /// Construct a new `Multipart` instance with the given [`Bytes`] stream and
104    /// the boundary.
105    pub fn new<S, O, E, B>(stream: S, boundary: B) -> Self
106    where
107        S: Stream<Item = Result<O, E>> + Send + 'r,
108        O: Into<Bytes> + 'static,
109        E: Into<Box<dyn std::error::Error + Send + Sync>> + 'r,
110        B: Into<String>,
111    {
112        Multipart::with_constraints(stream, boundary, Constraints::default())
113    }
114
115    /// Construct a new `Multipart` instance with the given [`Bytes`] stream and
116    /// the boundary.
117    pub fn with_constraints<S, O, E, B>(stream: S, boundary: B, constraints: Constraints) -> Self
118    where
119        S: Stream<Item = Result<O, E>> + Send + 'r,
120        O: Into<Bytes> + 'static,
121        E: Into<Box<dyn std::error::Error + Send + Sync>> + 'r,
122        B: Into<String>,
123    {
124        let stream = stream
125            .map_ok(|b| b.into())
126            .map_err(|err| Error::StreamReadFailed(err.into()));
127
128        Multipart {
129            state: Arc::new(Mutex::new(MultipartState {
130                buffer: StreamBuffer::new(stream, constraints.size_limit.whole_stream),
131                boundary: boundary.into(),
132                stage: StreamingStage::FindingFirstBoundary,
133                next_field_idx: 0,
134                curr_field_name: None,
135                curr_field_size_limit: constraints.size_limit.per_field,
136                curr_field_size_counter: 0,
137                constraints,
138            })),
139        }
140    }
141
142    /// Construct a new `Multipart` instance with the given [`AsyncRead`] reader
143    /// and the boundary.
144    ///
145    /// # Optional
146    ///
147    /// This requires the optional `tokio-io` feature to be enabled.
148    ///
149    /// # Examples
150    ///
151    /// ```
152    /// use multer::Multipart;
153    ///
154    /// # async fn run() {
155    /// let data =
156    ///     "--X-BOUNDARY\r\nContent-Disposition: form-data; name=\"my_text_field\"\r\n\r\nabcd\r\n--X-BOUNDARY--\r\n";
157    /// let reader = data.as_bytes();
158    /// let mut multipart = Multipart::with_reader(reader, "X-BOUNDARY");
159    ///
160    /// while let Some(mut field) = multipart.next_field().await.unwrap() {
161    ///     while let Some(chunk) = field.chunk().await.unwrap() {
162    ///         println!("Chunk: {:?}", chunk);
163    ///     }
164    /// }
165    /// # }
166    /// # tokio::runtime::Runtime::new().unwrap().block_on(run());
167    /// ```
168    #[cfg(feature = "tokio-io")]
169    #[cfg_attr(nightly, doc(cfg(feature = "tokio-io")))]
170    pub fn with_reader<R, B>(reader: R, boundary: B) -> Self
171    where
172        R: AsyncRead + Unpin + Send + 'r,
173        B: Into<String>,
174    {
175        let stream = ReaderStream::new(reader);
176        Multipart::new(stream, boundary)
177    }
178
179    /// Construct a new `Multipart` instance with the given [`AsyncRead`] reader
180    /// and the boundary.
181    ///
182    /// # Optional
183    ///
184    /// This requires the optional `tokio-io` feature to be enabled.
185    ///
186    /// # Examples
187    ///
188    /// ```
189    /// use multer::Multipart;
190    ///
191    /// # async fn run() {
192    /// let data =
193    ///     "--X-BOUNDARY\r\nContent-Disposition: form-data; name=\"my_text_field\"\r\n\r\nabcd\r\n--X-BOUNDARY--\r\n";
194    /// let reader = data.as_bytes();
195    /// let mut multipart = Multipart::with_reader(reader, "X-BOUNDARY");
196    ///
197    /// while let Some(mut field) = multipart.next_field().await.unwrap() {
198    ///     while let Some(chunk) = field.chunk().await.unwrap() {
199    ///         println!("Chunk: {:?}", chunk);
200    ///     }
201    /// }
202    /// # }
203    /// # tokio::runtime::Runtime::new().unwrap().block_on(run());
204    /// ```
205    #[cfg(feature = "tokio-io")]
206    #[cfg_attr(nightly, doc(cfg(feature = "tokio-io")))]
207    pub fn with_reader_with_constraints<R, B>(reader: R, boundary: B, constraints: Constraints) -> Self
208    where
209        R: AsyncRead + Unpin + Send + 'r,
210        B: Into<String>,
211    {
212        let stream = ReaderStream::new(reader);
213        Multipart::with_constraints(stream, boundary, constraints)
214    }
215
216    /// Yields the next [`Field`] if available.
217    ///
218    /// Any previous `Field` returned by this method must be dropped before
219    /// calling this method or [`Multipart::next_field_with_idx()`] again. See
220    /// [field-exclusivity](#field-exclusivity) for details.
221    pub async fn next_field(&mut self) -> Result<Option<Field<'r>>> {
222        future::poll_fn(|cx| self.poll_next_field(cx)).await
223    }
224
225    /// Yields the next [`Field`] if available.
226    ///
227    /// Any previous `Field` returned by this method must be dropped before
228    /// calling this method or [`Multipart::next_field_with_idx()`] again. See
229    /// [field-exclusivity](#field-exclusivity) for details.
230    ///
231    /// This method is available since version 2.1.0.
232    pub fn poll_next_field(&mut self, cx: &mut Context<'_>) -> Poll<Result<Option<Field<'r>>>> {
233        // This is consistent as we have an `&mut` and `Field` is not `Clone`.
234        // Here, we are guaranteeing that the returned `Field` will be the
235        // _only_ field with access to the multipart parsing state. This ensure
236        // that lock failure can never occur. This is effectively a dynamic
237        // version of passing an `&mut` of `self` to the `Field`.
238        if Arc::strong_count(&self.state) != 1 {
239            return Poll::Ready(Err(Error::LockFailure));
240        }
241
242        debug_assert_eq!(Arc::strong_count(&self.state), 1);
243        debug_assert!(self.state.try_lock().is_some(), "expected exlusive lock");
244        let mut lock = match self.state.try_lock() {
245            Some(lock) => lock,
246            None => return Poll::Ready(Err(Error::LockFailure)),
247        };
248
249        let state = &mut *lock;
250        if state.stage == StreamingStage::Eof {
251            return Poll::Ready(Ok(None));
252        }
253
254        state.buffer.poll_stream(cx)?;
255
256        if state.stage == StreamingStage::FindingFirstBoundary {
257            let boundary = &state.boundary;
258            let boundary_deriv = format!("{}{}", constants::BOUNDARY_EXT, boundary);
259            match state.buffer.read_to(boundary_deriv.as_bytes()) {
260                Some(_) => state.stage = StreamingStage::ReadingBoundary,
261                None => {
262                    state.buffer.poll_stream(cx)?;
263                    if state.buffer.eof {
264                        return Poll::Ready(Err(Error::IncompleteStream));
265                    }
266                }
267            }
268        }
269
270        // The previous field did not finish reading its data.
271        if state.stage == StreamingStage::ReadingFieldData {
272            match state
273                .buffer
274                .read_field_data(state.boundary.as_str(), state.curr_field_name.as_deref())?
275            {
276                Some((done, bytes)) => {
277                    state.curr_field_size_counter += bytes.len() as u64;
278
279                    if state.curr_field_size_counter > state.curr_field_size_limit {
280                        return Poll::Ready(Err(Error::FieldSizeExceeded {
281                            limit: state.curr_field_size_limit,
282                            field_name: state.curr_field_name.clone(),
283                        }));
284                    }
285
286                    if done {
287                        state.stage = StreamingStage::ReadingBoundary;
288                    } else {
289                        return Poll::Pending;
290                    }
291                }
292                None => {
293                    return Poll::Pending;
294                }
295            }
296        }
297
298        if state.stage == StreamingStage::ReadingBoundary {
299            let boundary = &state.boundary;
300            let boundary_deriv_len = constants::BOUNDARY_EXT.len() + boundary.len();
301
302            let boundary_bytes = match state.buffer.read_exact(boundary_deriv_len) {
303                Some(bytes) => bytes,
304                None => {
305                    return if state.buffer.eof {
306                        Poll::Ready(Err(Error::IncompleteStream))
307                    } else {
308                        Poll::Pending
309                    };
310                }
311            };
312
313            if &boundary_bytes[..] == format!("{}{}", constants::BOUNDARY_EXT, boundary).as_bytes() {
314                state.stage = StreamingStage::DeterminingBoundaryType;
315            } else {
316                return Poll::Ready(Err(Error::IncompleteStream));
317            }
318        }
319
320        if state.stage == StreamingStage::DeterminingBoundaryType {
321            let ext_len = constants::BOUNDARY_EXT.len();
322            let next_bytes = match state.buffer.peek_exact(ext_len) {
323                Some(bytes) => bytes,
324                None => {
325                    return if state.buffer.eof {
326                        Poll::Ready(Err(Error::IncompleteStream))
327                    } else {
328                        Poll::Pending
329                    };
330                }
331            };
332
333            if next_bytes == constants::BOUNDARY_EXT.as_bytes() {
334                state.stage = StreamingStage::Eof;
335                return Poll::Ready(Ok(None));
336            } else {
337                state.stage = StreamingStage::ReadingTransportPadding;
338            }
339        }
340
341        if state.stage == StreamingStage::ReadingTransportPadding {
342            if !state.buffer.advance_past_transport_padding() {
343                return if state.buffer.eof {
344                    Poll::Ready(Err(Error::IncompleteStream))
345                } else {
346                    Poll::Pending
347                };
348            }
349
350            let crlf_len = constants::CRLF.len();
351            let crlf_bytes = match state.buffer.read_exact(crlf_len) {
352                Some(bytes) => bytes,
353                None => {
354                    return if state.buffer.eof {
355                        Poll::Ready(Err(Error::IncompleteStream))
356                    } else {
357                        Poll::Pending
358                    };
359                }
360            };
361
362            if &crlf_bytes[..] == constants::CRLF.as_bytes() {
363                state.stage = StreamingStage::ReadingFieldHeaders;
364            } else {
365                return Poll::Ready(Err(Error::IncompleteStream));
366            }
367        }
368
369        if state.stage == StreamingStage::ReadingFieldHeaders {
370            let header_bytes = match state.buffer.read_until(constants::CRLF_CRLF.as_bytes()) {
371                Some(bytes) => bytes,
372                None => {
373                    return if state.buffer.eof {
374                        return Poll::Ready(Err(Error::IncompleteStream));
375                    } else {
376                        Poll::Pending
377                    };
378                }
379            };
380
381            let mut headers = [httparse::EMPTY_HEADER; constants::MAX_HEADERS];
382
383            let headers = match httparse::parse_headers(&header_bytes, &mut headers).map_err(Error::ReadHeaderFailed)? {
384                httparse::Status::Complete((_, raw_headers)) => {
385                    match helpers::convert_raw_headers_to_header_map(raw_headers) {
386                        Ok(headers) => headers,
387                        Err(err) => {
388                            return Poll::Ready(Err(err));
389                        }
390                    }
391                }
392                httparse::Status::Partial => {
393                    return Poll::Ready(Err(Error::IncompleteHeaders));
394                }
395            };
396
397            state.stage = StreamingStage::ReadingFieldData;
398
399            let field_idx = state.next_field_idx;
400            state.next_field_idx += 1;
401
402            let content_disposition = ContentDisposition::parse(&headers);
403            let field_size_limit = state
404                .constraints
405                .size_limit
406                .extract_size_limit_for(content_disposition.field_name.as_deref());
407
408            state.curr_field_name = content_disposition.field_name.clone();
409            state.curr_field_size_limit = field_size_limit;
410            state.curr_field_size_counter = 0;
411
412            let field_name = content_disposition.field_name.as_deref();
413            if !state.constraints.is_it_allowed(field_name) {
414                return Poll::Ready(Err(Error::UnknownField {
415                    field_name: field_name.map(str::to_owned),
416                }));
417            }
418
419            drop(lock); // The lock will be dropped anyway, but let's be explicit.
420            let field = Field::new(self.state.clone(), headers, field_idx, content_disposition);
421            return Poll::Ready(Ok(Some(field)));
422        }
423
424        Poll::Pending
425    }
426
427    /// Yields the next [`Field`] with their positioning index as a tuple
428    /// `(`[`usize`]`, `[`Field`]`)`.
429    ///
430    /// Any previous `Field` returned by this method must be dropped before
431    /// calling this method or [`Multipart::next_field()`] again. See
432    /// [field-exclusivity](#field-exclusivity) for details.
433    ///
434    /// # Examples
435    ///
436    /// ```
437    /// use std::convert::Infallible;
438    ///
439    /// use bytes::Bytes;
440    /// use futures_util::stream::once;
441    /// use multer::Multipart;
442    ///
443    /// # async fn run() {
444    /// let data = "--X-BOUNDARY\r\nContent-Disposition: form-data; \
445    ///     name=\"my_text_field\"\r\n\r\nabcd\r\n--X-BOUNDARY--\r\n";
446    ///
447    /// let stream = once(async move { Result::<Bytes, Infallible>::Ok(Bytes::from(data)) });
448    /// let mut multipart = Multipart::new(stream, "X-BOUNDARY");
449    ///
450    /// while let Some((idx, field)) = multipart.next_field_with_idx().await.unwrap() {
451    ///     println!("Index: {:?}, Content: {:?}", idx, field.text().await)
452    /// }
453    /// # }
454    /// # tokio::runtime::Runtime::new().unwrap().block_on(run());
455    /// ```
456    pub async fn next_field_with_idx(&mut self) -> Result<Option<(usize, Field<'r>)>> {
457        self.next_field().await.map(|f| f.map(|field| (field.index(), field)))
458    }
459}