multer/
field.rs

1use std::pin::Pin;
2use std::sync::Arc;
3use std::task::{Context, Poll};
4
5use bytes::{Bytes, BytesMut};
6use encoding_rs::{Encoding, UTF_8};
7use futures_util::stream::{Stream, TryStreamExt};
8use http::header::HeaderMap;
9#[cfg(feature = "json")]
10use serde::de::DeserializeOwned;
11use spin::mutex::spin::SpinMutex as Mutex;
12
13use crate::content_disposition::ContentDisposition;
14use crate::multipart::{MultipartState, StreamingStage};
15use crate::{helpers, Error};
16
/// A single field in a [`Multipart`] stream.
///
/// Its content can be accessed via the [`Stream`] API or the methods defined in
/// this type.
///
/// # Lifetime
///
/// The lifetime of the stream `'r` corresponds to the lifetime of the
/// underlying `Stream`. If the underlying stream holds no references directly
/// or transitively, then the lifetime can be `'static`.
///
/// # Examples
///
/// ```
/// use std::convert::Infallible;
///
/// use bytes::Bytes;
/// use futures_util::stream::once;
/// use multer::Multipart;
///
/// # async fn run() {
/// let data = "--X-BOUNDARY\r\nContent-Disposition: form-data; \
///     name=\"my_text_field\"\r\n\r\nabcd\r\n--X-BOUNDARY--\r\n";
///
/// let stream = once(async move { Result::<Bytes, Infallible>::Ok(Bytes::from(data)) });
/// let mut multipart = Multipart::new(stream, "X-BOUNDARY");
///
/// while let Some(field) = multipart.next_field().await.unwrap() {
///     let content = field.text().await.unwrap();
///     assert_eq!(content, "abcd");
/// }
/// # }
/// # tokio::runtime::Runtime::new().unwrap().block_on(run());
/// ```
///
/// [`Multipart`]: crate::Multipart
#[derive(Debug)]
pub struct Field<'r> {
    /// Multipart state held behind an `Arc` and a spin mutex; it is locked
    /// with `try_lock` every time this field is polled for data.
    state: Arc<Mutex<MultipartState<'r>>>,
    /// Becomes `true` once reading reports the field data as finished; any
    /// later poll then yields `None` without touching the shared state.
    done: bool,
    /// The headers of this field, exposed through [`Field::headers`].
    headers: HeaderMap,
    /// Source of the values returned by [`Field::name`] and
    /// [`Field::file_name`].
    content_disposition: ContentDisposition,
    /// Content type obtained from `headers` when the field is constructed.
    content_type: Option<mime::Mime>,
    /// Index of this field, exposed through [`Field::index`].
    idx: usize,
}
62
63impl<'r> Field<'r> {
64    pub(crate) fn new(
65        state: Arc<Mutex<MultipartState<'r>>>,
66        headers: HeaderMap,
67        idx: usize,
68        content_disposition: ContentDisposition,
69    ) -> Self {
70        let content_type = helpers::parse_content_type(&headers);
71        Field {
72            state,
73            headers,
74            content_disposition,
75            content_type,
76            idx,
77            done: false,
78        }
79    }
80
81    /// The field name found in the [`Content-Disposition`](https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Content-Disposition) header.
82    pub fn name(&self) -> Option<&str> {
83        self.content_disposition.field_name.as_deref()
84    }
85
86    /// The file name found in the [`Content-Disposition`](https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Content-Disposition) header.
87    pub fn file_name(&self) -> Option<&str> {
88        self.content_disposition.file_name.as_deref()
89    }
90
91    /// Get the content type of the field.
92    pub fn content_type(&self) -> Option<&mime::Mime> {
93        self.content_type.as_ref()
94    }
95
96    /// Get a map of headers as [`HeaderMap`].
97    pub fn headers(&self) -> &HeaderMap {
98        &self.headers
99    }
100
101    /// Get the full data of the field as [`Bytes`].
102    ///
103    /// # Examples
104    ///
105    /// ```
106    /// use std::convert::Infallible;
107    ///
108    /// use bytes::Bytes;
109    /// use futures_util::stream::once;
110    /// use multer::Multipart;
111    ///
112    /// # async fn run() {
113    /// let data =
114    ///     "--X-BOUNDARY\r\nContent-Disposition: form-data; name=\"my_text_field\"\r\n\r\nabcd\r\n--X-BOUNDARY--\r\n";
115    /// let stream = once(async move { Result::<Bytes, Infallible>::Ok(Bytes::from(data)) });
116    /// let mut multipart = Multipart::new(stream, "X-BOUNDARY");
117    ///
118    /// while let Some(field) = multipart.next_field().await.unwrap() {
119    ///     let bytes = field.bytes().await.unwrap();
120    ///     assert_eq!(bytes.len(), 4);
121    /// }
122    /// # }
123    /// # tokio::runtime::Runtime::new().unwrap().block_on(run());
124    /// ```
125    pub async fn bytes(self) -> crate::Result<Bytes> {
126        let mut buf = BytesMut::new();
127
128        let mut this = self;
129        while let Some(bytes) = this.chunk().await? {
130            buf.extend_from_slice(&bytes);
131        }
132
133        Ok(buf.freeze())
134    }
135
136    /// Stream a chunk of the field data.
137    ///
138    /// When the field data has been exhausted, this will return [`None`].
139    ///
140    /// # Examples
141    ///
142    /// ```
143    /// use std::convert::Infallible;
144    ///
145    /// use bytes::Bytes;
146    /// use futures_util::stream::once;
147    /// use multer::Multipart;
148    ///
149    /// # async fn run() {
150    /// let data =
151    ///     "--X-BOUNDARY\r\nContent-Disposition: form-data; name=\"my_text_field\"\r\n\r\nabcd\r\n--X-BOUNDARY--\r\n";
152    /// let stream = once(async move { Result::<Bytes, Infallible>::Ok(Bytes::from(data)) });
153    /// let mut multipart = Multipart::new(stream, "X-BOUNDARY");
154    ///
155    /// while let Some(mut field) = multipart.next_field().await.unwrap() {
156    ///     while let Some(chunk) = field.chunk().await.unwrap() {
157    ///         println!("Chunk: {:?}", chunk);
158    ///     }
159    /// }
160    /// # }
161    /// # tokio::runtime::Runtime::new().unwrap().block_on(run());
162    /// ```
163    pub async fn chunk(&mut self) -> crate::Result<Option<Bytes>> {
164        self.try_next().await
165    }
166
167    /// Try to deserialize the field data as JSON.
168    ///
169    /// # Optional
170    ///
171    /// This requires the optional `json` feature to be enabled.
172    ///
173    /// # Examples
174    ///
175    /// ```
176    /// use multer::Multipart;
177    /// use bytes::Bytes;
178    /// use std::convert::Infallible;
179    /// use futures_util::stream::once;
180    /// use serde::Deserialize;
181    ///
182    /// // This `derive` requires the `serde` dependency.
183    /// #[derive(Deserialize)]
184    /// struct User {
185    ///     name: String
186    /// }
187    ///
188    /// # async fn run() {
189    /// let data = "--X-BOUNDARY\r\nContent-Disposition: form-data; name=\"my_text_field\"\r\n\r\n{ \"name\": \"Alice\" }\r\n--X-BOUNDARY--\r\n";
190    /// let stream = once(async move { Result::<Bytes, Infallible>::Ok(Bytes::from(data)) });
191    /// let mut multipart = Multipart::new(stream, "X-BOUNDARY");
192    ///
193    /// while let Some(field) = multipart.next_field().await.unwrap() {
194    ///     let user = field.json::<User>().await.unwrap();
195    ///     println!("User Name: {}", user.name);
196    /// }
197    /// # }
198    /// # tokio::runtime::Runtime::new().unwrap().block_on(run());
199    /// ```
200    ///
201    /// # Errors
202    ///
203    /// This method fails if the field data is not in JSON format
204    /// or it cannot be properly deserialized to target type `T`. For more
205    /// details please see [`serde_json::from_slice`].
206    #[cfg(feature = "json")]
207    #[cfg_attr(nightly, doc(cfg(feature = "json")))]
208    pub async fn json<T: DeserializeOwned>(self) -> crate::Result<T> {
209        serde_json::from_slice(&self.bytes().await?).map_err(Error::DecodeJson)
210    }
211
212    /// Get the full field data as text.
213    ///
214    /// This method decodes the field data with `BOM sniffing` and with
215    /// malformed sequences replaced with the `REPLACEMENT CHARACTER`.
216    /// Encoding is determined from the `charset` parameter of `Content-Type`
217    /// header, and defaults to `utf-8` if not presented.
218    ///
219    /// # Examples
220    ///
221    /// ```
222    /// use std::convert::Infallible;
223    ///
224    /// use bytes::Bytes;
225    /// use futures_util::stream::once;
226    /// use multer::Multipart;
227    ///
228    /// # async fn run() {
229    /// let data =
230    ///     "--X-BOUNDARY\r\nContent-Disposition: form-data; name=\"my_text_field\"\r\n\r\nabcd\r\n--X-BOUNDARY--\r\n";
231    /// let stream = once(async move { Result::<Bytes, Infallible>::Ok(Bytes::from(data)) });
232    /// let mut multipart = Multipart::new(stream, "X-BOUNDARY");
233    ///
234    /// while let Some(field) = multipart.next_field().await.unwrap() {
235    ///     let content = field.text().await.unwrap();
236    ///     assert_eq!(content, "abcd");
237    /// }
238    /// # }
239    /// # tokio::runtime::Runtime::new().unwrap().block_on(run());
240    /// ```
241    pub async fn text(self) -> crate::Result<String> {
242        self.text_with_charset("utf-8").await
243    }
244
245    /// Get the full field data as text given a specific encoding.
246    ///
247    /// This method decodes the field data with `BOM sniffing` and with
248    /// malformed sequences replaced with the `REPLACEMENT CHARACTER`.
249    /// You can provide a default encoding for decoding the raw message, while
250    /// the `charset` parameter of `Content-Type` header is still prioritized.
251    /// For more information about the possible encoding name, please go to
252    /// [encoding_rs] docs.
253    ///
254    /// # Examples
255    ///
256    /// ```
257    /// use std::convert::Infallible;
258    ///
259    /// use bytes::Bytes;
260    /// use futures_util::stream::once;
261    /// use multer::Multipart;
262    ///
263    /// # async fn run() {
264    /// let data =
265    ///     "--X-BOUNDARY\r\nContent-Disposition: form-data; name=\"my_text_field\"\r\n\r\nabcd\r\n--X-BOUNDARY--\r\n";
266    /// let stream = once(async move { Result::<Bytes, Infallible>::Ok(Bytes::from(data)) });
267    /// let mut multipart = Multipart::new(stream, "X-BOUNDARY");
268    ///
269    /// while let Some(field) = multipart.next_field().await.unwrap() {
270    ///     let content = field.text_with_charset("utf-8").await.unwrap();
271    ///     assert_eq!(content, "abcd");
272    /// }
273    /// # }
274    /// # tokio::runtime::Runtime::new().unwrap().block_on(run());
275    /// ```
276    pub async fn text_with_charset(self, default_encoding: &str) -> crate::Result<String> {
277        let encoding_name = self
278            .content_type()
279            .and_then(|mime| mime.get_param(mime::CHARSET))
280            .map(|charset| charset.as_str())
281            .unwrap_or(default_encoding);
282
283        let encoding = Encoding::for_label(encoding_name.as_bytes()).unwrap_or(UTF_8);
284        let bytes = self.bytes().await?;
285        Ok(encoding.decode(&bytes).0.into_owned())
286    }
287
288    /// Get the index of this field in order they appeared in the stream.
289    ///
290    /// # Examples
291    ///
292    /// ```
293    /// use std::convert::Infallible;
294    ///
295    /// use bytes::Bytes;
296    /// use futures_util::stream::once;
297    /// use multer::Multipart;
298    ///
299    /// # async fn run() {
300    /// let data =
301    ///     "--X-BOUNDARY\r\nContent-Disposition: form-data; name=\"my_text_field\"\r\n\r\nabcd\r\n--X-BOUNDARY--\r\n";
302    /// let stream = once(async move { Result::<Bytes, Infallible>::Ok(Bytes::from(data)) });
303    /// let mut multipart = Multipart::new(stream, "X-BOUNDARY");
304    ///
305    /// while let Some(field) = multipart.next_field().await.unwrap() {
306    ///     let idx = field.index();
307    ///     println!("Field index: {}", idx);
308    /// }
309    /// # }
310    /// # tokio::runtime::Runtime::new().unwrap().block_on(run());
311    /// ```
312    pub fn index(&self) -> usize {
313        self.idx
314    }
315}
316
317impl Stream for Field<'_> {
318    type Item = Result<Bytes, Error>;
319
320    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
321        if self.done {
322            return Poll::Ready(None);
323        }
324
325        debug_assert!(self.state.try_lock().is_some(), "expected exlusive lock");
326        let state = self.state.clone();
327        let mut lock = match state.try_lock() {
328            Some(lock) => lock,
329            None => return Poll::Ready(Some(Err(Error::LockFailure))),
330        };
331
332        let state = &mut *lock;
333        if let Err(err) = state.buffer.poll_stream(cx) {
334            return Poll::Ready(Some(Err(err)));
335        }
336
337        match state
338            .buffer
339            .read_field_data(&state.boundary, state.curr_field_name.as_deref())
340        {
341            Ok(Some((done, bytes))) => {
342                state.curr_field_size_counter += bytes.len() as u64;
343
344                if state.curr_field_size_counter > state.curr_field_size_limit {
345                    return Poll::Ready(Some(Err(Error::FieldSizeExceeded {
346                        limit: state.curr_field_size_limit,
347                        field_name: state.curr_field_name.clone(),
348                    })));
349                }
350
351                if done {
352                    state.stage = StreamingStage::ReadingBoundary;
353                    self.done = true;
354                }
355
356                Poll::Ready(Some(Ok(bytes)))
357            }
358            Ok(None) => Poll::Pending,
359            Err(err) => Poll::Ready(Some(Err(err))),
360        }
361    }
362}