multer/multipart.rs
1use std::sync::Arc;
2use std::task::{Context, Poll};
3
4use bytes::Bytes;
5use futures_util::future;
6use futures_util::stream::{Stream, TryStreamExt};
7use spin::mutex::spin::SpinMutex as Mutex;
8#[cfg(feature = "tokio-io")]
9use {tokio::io::AsyncRead, tokio_util::io::ReaderStream};
10
11use crate::buffer::StreamBuffer;
12use crate::constraints::Constraints;
13use crate::content_disposition::ContentDisposition;
14use crate::error::Error;
15use crate::field::Field;
16use crate::{constants, helpers, Result};
17
18/// Represents the implementation of `multipart/form-data` formatted data.
19///
20/// This will parse the source stream into [`Field`] instances via
21/// [`next_field()`](Self::next_field).
22///
23/// # Field Exclusivity
24///
25/// A `Field` represents a raw, self-decoding stream into multipart data. As
26/// such, only _one_ `Field` from a given `Multipart` instance may be live at
27/// once. That is, a `Field` emitted by `next_field()` must be dropped before
28/// calling `next_field()` again. Failure to do so will result in an error.
29///
30/// ```rust
31/// use std::convert::Infallible;
32///
33/// use bytes::Bytes;
34/// use futures_util::stream::once;
35/// use multer::Multipart;
36///
37/// # async fn run() {
38/// let data = "--X-BOUNDARY\r\nContent-Disposition: form-data; \
39/// name=\"my_text_field\"\r\n\r\nabcd\r\n--X-BOUNDARY--\r\n";
40///
41/// let stream = once(async move { Result::<Bytes, Infallible>::Ok(Bytes::from(data)) });
42/// let mut multipart = Multipart::new(stream, "X-BOUNDARY");
43///
44/// let field1 = multipart.next_field().await;
45/// let field2 = multipart.next_field().await;
46///
47/// assert!(field2.is_err());
48/// # }
49/// # tokio::runtime::Runtime::new().unwrap().block_on(run());
50/// ```
51///
52/// # Examples
53///
54/// ```
55/// use std::convert::Infallible;
56///
57/// use bytes::Bytes;
58/// use futures_util::stream::once;
59/// use multer::Multipart;
60///
61/// # async fn run() {
62/// let data = "--X-BOUNDARY\r\nContent-Disposition: form-data; \
63/// name=\"my_text_field\"\r\n\r\nabcd\r\n--X-BOUNDARY--\r\n";
64///
65/// let stream = once(async move { Result::<Bytes, Infallible>::Ok(Bytes::from(data)) });
66/// let mut multipart = Multipart::new(stream, "X-BOUNDARY");
67///
68/// while let Some(field) = multipart.next_field().await.unwrap() {
69/// println!("Field: {:?}", field.text().await)
70/// }
71/// # }
72/// # tokio::runtime::Runtime::new().unwrap().block_on(run());
73/// ```
74#[derive(Debug)]
75pub struct Multipart<'r> {
76 state: Arc<Mutex<MultipartState<'r>>>,
77}
78
79#[derive(Debug)]
80pub(crate) struct MultipartState<'r> {
81 pub(crate) buffer: StreamBuffer<'r>,
82 pub(crate) boundary: String,
83 pub(crate) stage: StreamingStage,
84 pub(crate) next_field_idx: usize,
85 pub(crate) curr_field_name: Option<String>,
86 pub(crate) curr_field_size_limit: u64,
87 pub(crate) curr_field_size_counter: u64,
88 pub(crate) constraints: Constraints,
89}
90
/// Position of the parser within the multipart body.
///
/// `Multipart::poll_next_field` walks these stages in declaration order
/// (after `ReadingFieldData` it loops back to `ReadingBoundary`) until the
/// closing delimiter moves it to `Eof`.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub(crate) enum StreamingStage {
    /// Initial stage: scanning for the first delimiter; bytes preceding it
    /// are read and ignored.
    FindingFirstBoundary,
    /// The next bytes must be exactly `BOUNDARY_EXT` followed by the boundary.
    ReadingBoundary,
    /// Peeking past the delimiter: another `BOUNDARY_EXT` means the body is
    /// closed, anything else means a new field follows.
    DeterminingBoundaryType,
    /// Skipping transport padding, then expecting a CRLF.
    ReadingTransportPadding,
    /// Reading the field's header block up to the blank line (`CRLF_CRLF`).
    ReadingFieldHeaders,
    /// A field has been yielded and its body is being consumed.
    ReadingFieldData,
    /// The closing delimiter was seen; no more fields will be produced.
    Eof,
}
101
102impl<'r> Multipart<'r> {
103 /// Construct a new `Multipart` instance with the given [`Bytes`] stream and
104 /// the boundary.
105 pub fn new<S, O, E, B>(stream: S, boundary: B) -> Self
106 where
107 S: Stream<Item = Result<O, E>> + Send + 'r,
108 O: Into<Bytes> + 'static,
109 E: Into<Box<dyn std::error::Error + Send + Sync>> + 'r,
110 B: Into<String>,
111 {
112 Multipart::with_constraints(stream, boundary, Constraints::default())
113 }
114
115 /// Construct a new `Multipart` instance with the given [`Bytes`] stream and
116 /// the boundary.
117 pub fn with_constraints<S, O, E, B>(stream: S, boundary: B, constraints: Constraints) -> Self
118 where
119 S: Stream<Item = Result<O, E>> + Send + 'r,
120 O: Into<Bytes> + 'static,
121 E: Into<Box<dyn std::error::Error + Send + Sync>> + 'r,
122 B: Into<String>,
123 {
124 let stream = stream
125 .map_ok(|b| b.into())
126 .map_err(|err| Error::StreamReadFailed(err.into()));
127
128 Multipart {
129 state: Arc::new(Mutex::new(MultipartState {
130 buffer: StreamBuffer::new(stream, constraints.size_limit.whole_stream),
131 boundary: boundary.into(),
132 stage: StreamingStage::FindingFirstBoundary,
133 next_field_idx: 0,
134 curr_field_name: None,
135 curr_field_size_limit: constraints.size_limit.per_field,
136 curr_field_size_counter: 0,
137 constraints,
138 })),
139 }
140 }
141
142 /// Construct a new `Multipart` instance with the given [`AsyncRead`] reader
143 /// and the boundary.
144 ///
145 /// # Optional
146 ///
147 /// This requires the optional `tokio-io` feature to be enabled.
148 ///
149 /// # Examples
150 ///
151 /// ```
152 /// use multer::Multipart;
153 ///
154 /// # async fn run() {
155 /// let data =
156 /// "--X-BOUNDARY\r\nContent-Disposition: form-data; name=\"my_text_field\"\r\n\r\nabcd\r\n--X-BOUNDARY--\r\n";
157 /// let reader = data.as_bytes();
158 /// let mut multipart = Multipart::with_reader(reader, "X-BOUNDARY");
159 ///
160 /// while let Some(mut field) = multipart.next_field().await.unwrap() {
161 /// while let Some(chunk) = field.chunk().await.unwrap() {
162 /// println!("Chunk: {:?}", chunk);
163 /// }
164 /// }
165 /// # }
166 /// # tokio::runtime::Runtime::new().unwrap().block_on(run());
167 /// ```
168 #[cfg(feature = "tokio-io")]
169 #[cfg_attr(nightly, doc(cfg(feature = "tokio-io")))]
170 pub fn with_reader<R, B>(reader: R, boundary: B) -> Self
171 where
172 R: AsyncRead + Unpin + Send + 'r,
173 B: Into<String>,
174 {
175 let stream = ReaderStream::new(reader);
176 Multipart::new(stream, boundary)
177 }
178
179 /// Construct a new `Multipart` instance with the given [`AsyncRead`] reader
180 /// and the boundary.
181 ///
182 /// # Optional
183 ///
184 /// This requires the optional `tokio-io` feature to be enabled.
185 ///
186 /// # Examples
187 ///
188 /// ```
189 /// use multer::Multipart;
190 ///
191 /// # async fn run() {
192 /// let data =
193 /// "--X-BOUNDARY\r\nContent-Disposition: form-data; name=\"my_text_field\"\r\n\r\nabcd\r\n--X-BOUNDARY--\r\n";
194 /// let reader = data.as_bytes();
195 /// let mut multipart = Multipart::with_reader(reader, "X-BOUNDARY");
196 ///
197 /// while let Some(mut field) = multipart.next_field().await.unwrap() {
198 /// while let Some(chunk) = field.chunk().await.unwrap() {
199 /// println!("Chunk: {:?}", chunk);
200 /// }
201 /// }
202 /// # }
203 /// # tokio::runtime::Runtime::new().unwrap().block_on(run());
204 /// ```
205 #[cfg(feature = "tokio-io")]
206 #[cfg_attr(nightly, doc(cfg(feature = "tokio-io")))]
207 pub fn with_reader_with_constraints<R, B>(reader: R, boundary: B, constraints: Constraints) -> Self
208 where
209 R: AsyncRead + Unpin + Send + 'r,
210 B: Into<String>,
211 {
212 let stream = ReaderStream::new(reader);
213 Multipart::with_constraints(stream, boundary, constraints)
214 }
215
216 /// Yields the next [`Field`] if available.
217 ///
218 /// Any previous `Field` returned by this method must be dropped before
219 /// calling this method or [`Multipart::next_field_with_idx()`] again. See
220 /// [field-exclusivity](#field-exclusivity) for details.
221 pub async fn next_field(&mut self) -> Result<Option<Field<'r>>> {
222 future::poll_fn(|cx| self.poll_next_field(cx)).await
223 }
224
225 /// Yields the next [`Field`] if available.
226 ///
227 /// Any previous `Field` returned by this method must be dropped before
228 /// calling this method or [`Multipart::next_field_with_idx()`] again. See
229 /// [field-exclusivity](#field-exclusivity) for details.
230 ///
231 /// This method is available since version 2.1.0.
232 pub fn poll_next_field(&mut self, cx: &mut Context<'_>) -> Poll<Result<Option<Field<'r>>>> {
233 // This is consistent as we have an `&mut` and `Field` is not `Clone`.
234 // Here, we are guaranteeing that the returned `Field` will be the
235 // _only_ field with access to the multipart parsing state. This ensure
236 // that lock failure can never occur. This is effectively a dynamic
237 // version of passing an `&mut` of `self` to the `Field`.
238 if Arc::strong_count(&self.state) != 1 {
239 return Poll::Ready(Err(Error::LockFailure));
240 }
241
242 debug_assert_eq!(Arc::strong_count(&self.state), 1);
243 debug_assert!(self.state.try_lock().is_some(), "expected exlusive lock");
244 let mut lock = match self.state.try_lock() {
245 Some(lock) => lock,
246 None => return Poll::Ready(Err(Error::LockFailure)),
247 };
248
249 let state = &mut *lock;
250 if state.stage == StreamingStage::Eof {
251 return Poll::Ready(Ok(None));
252 }
253
254 state.buffer.poll_stream(cx)?;
255
256 if state.stage == StreamingStage::FindingFirstBoundary {
257 let boundary = &state.boundary;
258 let boundary_deriv = format!("{}{}", constants::BOUNDARY_EXT, boundary);
259 match state.buffer.read_to(boundary_deriv.as_bytes()) {
260 Some(_) => state.stage = StreamingStage::ReadingBoundary,
261 None => {
262 state.buffer.poll_stream(cx)?;
263 if state.buffer.eof {
264 return Poll::Ready(Err(Error::IncompleteStream));
265 }
266 }
267 }
268 }
269
270 // The previous field did not finish reading its data.
271 if state.stage == StreamingStage::ReadingFieldData {
272 match state
273 .buffer
274 .read_field_data(state.boundary.as_str(), state.curr_field_name.as_deref())?
275 {
276 Some((done, bytes)) => {
277 state.curr_field_size_counter += bytes.len() as u64;
278
279 if state.curr_field_size_counter > state.curr_field_size_limit {
280 return Poll::Ready(Err(Error::FieldSizeExceeded {
281 limit: state.curr_field_size_limit,
282 field_name: state.curr_field_name.clone(),
283 }));
284 }
285
286 if done {
287 state.stage = StreamingStage::ReadingBoundary;
288 } else {
289 return Poll::Pending;
290 }
291 }
292 None => {
293 return Poll::Pending;
294 }
295 }
296 }
297
298 if state.stage == StreamingStage::ReadingBoundary {
299 let boundary = &state.boundary;
300 let boundary_deriv_len = constants::BOUNDARY_EXT.len() + boundary.len();
301
302 let boundary_bytes = match state.buffer.read_exact(boundary_deriv_len) {
303 Some(bytes) => bytes,
304 None => {
305 return if state.buffer.eof {
306 Poll::Ready(Err(Error::IncompleteStream))
307 } else {
308 Poll::Pending
309 };
310 }
311 };
312
313 if &boundary_bytes[..] == format!("{}{}", constants::BOUNDARY_EXT, boundary).as_bytes() {
314 state.stage = StreamingStage::DeterminingBoundaryType;
315 } else {
316 return Poll::Ready(Err(Error::IncompleteStream));
317 }
318 }
319
320 if state.stage == StreamingStage::DeterminingBoundaryType {
321 let ext_len = constants::BOUNDARY_EXT.len();
322 let next_bytes = match state.buffer.peek_exact(ext_len) {
323 Some(bytes) => bytes,
324 None => {
325 return if state.buffer.eof {
326 Poll::Ready(Err(Error::IncompleteStream))
327 } else {
328 Poll::Pending
329 };
330 }
331 };
332
333 if next_bytes == constants::BOUNDARY_EXT.as_bytes() {
334 state.stage = StreamingStage::Eof;
335 return Poll::Ready(Ok(None));
336 } else {
337 state.stage = StreamingStage::ReadingTransportPadding;
338 }
339 }
340
341 if state.stage == StreamingStage::ReadingTransportPadding {
342 if !state.buffer.advance_past_transport_padding() {
343 return if state.buffer.eof {
344 Poll::Ready(Err(Error::IncompleteStream))
345 } else {
346 Poll::Pending
347 };
348 }
349
350 let crlf_len = constants::CRLF.len();
351 let crlf_bytes = match state.buffer.read_exact(crlf_len) {
352 Some(bytes) => bytes,
353 None => {
354 return if state.buffer.eof {
355 Poll::Ready(Err(Error::IncompleteStream))
356 } else {
357 Poll::Pending
358 };
359 }
360 };
361
362 if &crlf_bytes[..] == constants::CRLF.as_bytes() {
363 state.stage = StreamingStage::ReadingFieldHeaders;
364 } else {
365 return Poll::Ready(Err(Error::IncompleteStream));
366 }
367 }
368
369 if state.stage == StreamingStage::ReadingFieldHeaders {
370 let header_bytes = match state.buffer.read_until(constants::CRLF_CRLF.as_bytes()) {
371 Some(bytes) => bytes,
372 None => {
373 return if state.buffer.eof {
374 return Poll::Ready(Err(Error::IncompleteStream));
375 } else {
376 Poll::Pending
377 };
378 }
379 };
380
381 let mut headers = [httparse::EMPTY_HEADER; constants::MAX_HEADERS];
382
383 let headers = match httparse::parse_headers(&header_bytes, &mut headers).map_err(Error::ReadHeaderFailed)? {
384 httparse::Status::Complete((_, raw_headers)) => {
385 match helpers::convert_raw_headers_to_header_map(raw_headers) {
386 Ok(headers) => headers,
387 Err(err) => {
388 return Poll::Ready(Err(err));
389 }
390 }
391 }
392 httparse::Status::Partial => {
393 return Poll::Ready(Err(Error::IncompleteHeaders));
394 }
395 };
396
397 state.stage = StreamingStage::ReadingFieldData;
398
399 let field_idx = state.next_field_idx;
400 state.next_field_idx += 1;
401
402 let content_disposition = ContentDisposition::parse(&headers);
403 let field_size_limit = state
404 .constraints
405 .size_limit
406 .extract_size_limit_for(content_disposition.field_name.as_deref());
407
408 state.curr_field_name = content_disposition.field_name.clone();
409 state.curr_field_size_limit = field_size_limit;
410 state.curr_field_size_counter = 0;
411
412 let field_name = content_disposition.field_name.as_deref();
413 if !state.constraints.is_it_allowed(field_name) {
414 return Poll::Ready(Err(Error::UnknownField {
415 field_name: field_name.map(str::to_owned),
416 }));
417 }
418
419 drop(lock); // The lock will be dropped anyway, but let's be explicit.
420 let field = Field::new(self.state.clone(), headers, field_idx, content_disposition);
421 return Poll::Ready(Ok(Some(field)));
422 }
423
424 Poll::Pending
425 }
426
427 /// Yields the next [`Field`] with their positioning index as a tuple
428 /// `(`[`usize`]`, `[`Field`]`)`.
429 ///
430 /// Any previous `Field` returned by this method must be dropped before
431 /// calling this method or [`Multipart::next_field()`] again. See
432 /// [field-exclusivity](#field-exclusivity) for details.
433 ///
434 /// # Examples
435 ///
436 /// ```
437 /// use std::convert::Infallible;
438 ///
439 /// use bytes::Bytes;
440 /// use futures_util::stream::once;
441 /// use multer::Multipart;
442 ///
443 /// # async fn run() {
444 /// let data = "--X-BOUNDARY\r\nContent-Disposition: form-data; \
445 /// name=\"my_text_field\"\r\n\r\nabcd\r\n--X-BOUNDARY--\r\n";
446 ///
447 /// let stream = once(async move { Result::<Bytes, Infallible>::Ok(Bytes::from(data)) });
448 /// let mut multipart = Multipart::new(stream, "X-BOUNDARY");
449 ///
450 /// while let Some((idx, field)) = multipart.next_field_with_idx().await.unwrap() {
451 /// println!("Index: {:?}, Content: {:?}", idx, field.text().await)
452 /// }
453 /// # }
454 /// # tokio::runtime::Runtime::new().unwrap().block_on(run());
455 /// ```
456 pub async fn next_field_with_idx(&mut self) -> Result<Option<(usize, Field<'r>)>> {
457 self.next_field().await.map(|f| f.map(|field| (field.index(), field)))
458 }
459}