multer/field.rs
1use std::pin::Pin;
2use std::sync::Arc;
3use std::task::{Context, Poll};
4
5use bytes::{Bytes, BytesMut};
6use encoding_rs::{Encoding, UTF_8};
7use futures_util::stream::{Stream, TryStreamExt};
8use http::header::HeaderMap;
9#[cfg(feature = "json")]
10use serde::de::DeserializeOwned;
11use spin::mutex::spin::SpinMutex as Mutex;
12
13use crate::content_disposition::ContentDisposition;
14use crate::multipart::{MultipartState, StreamingStage};
15use crate::{helpers, Error};
16
/// A single field in a [`Multipart`] stream.
///
/// Its content can be accessed via the [`Stream`] API or the methods defined in
/// this type.
///
/// # Lifetime
///
/// The lifetime of the stream `'r` corresponds to the lifetime of the
/// underlying `Stream`. If the underlying stream holds no references directly
/// or transitively, then the lifetime can be `'static`.
///
/// # Examples
///
/// ```
/// use std::convert::Infallible;
///
/// use bytes::Bytes;
/// use futures_util::stream::once;
/// use multer::Multipart;
///
/// # async fn run() {
/// let data = "--X-BOUNDARY\r\nContent-Disposition: form-data; \
///     name=\"my_text_field\"\r\n\r\nabcd\r\n--X-BOUNDARY--\r\n";
///
/// let stream = once(async move { Result::<Bytes, Infallible>::Ok(Bytes::from(data)) });
/// let mut multipart = Multipart::new(stream, "X-BOUNDARY");
///
/// while let Some(field) = multipart.next_field().await.unwrap() {
///     let content = field.text().await.unwrap();
///     assert_eq!(content, "abcd");
/// }
/// # }
/// # tokio::runtime::Runtime::new().unwrap().block_on(run());
/// ```
///
/// [`Multipart`]: crate::Multipart
#[derive(Debug)]
pub struct Field<'r> {
    /// Parser state behind a spin mutex, held through an `Arc`; locked on
    /// every poll to read the buffer, the boundary and the size counters.
    /// NOTE(review): presumably shared with the parent `Multipart` — confirm.
    state: Arc<Mutex<MultipartState<'r>>>,
    /// Set once the final chunk of this field has been yielded; afterwards the
    /// stream only returns `None`.
    done: bool,
    /// Headers of this field, exposed through `headers()`.
    headers: HeaderMap,
    /// Parsed `Content-Disposition`; source of `name()` and `file_name()`.
    content_disposition: ContentDisposition,
    /// Content type parsed from `headers` at construction time, if any.
    content_type: Option<mime::Mime>,
    /// Index value handed to `new()`, exposed through `index()`.
    idx: usize,
}
62
63impl<'r> Field<'r> {
64 pub(crate) fn new(
65 state: Arc<Mutex<MultipartState<'r>>>,
66 headers: HeaderMap,
67 idx: usize,
68 content_disposition: ContentDisposition,
69 ) -> Self {
70 let content_type = helpers::parse_content_type(&headers);
71 Field {
72 state,
73 headers,
74 content_disposition,
75 content_type,
76 idx,
77 done: false,
78 }
79 }
80
81 /// The field name found in the [`Content-Disposition`](https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Content-Disposition) header.
82 pub fn name(&self) -> Option<&str> {
83 self.content_disposition.field_name.as_deref()
84 }
85
86 /// The file name found in the [`Content-Disposition`](https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Content-Disposition) header.
87 pub fn file_name(&self) -> Option<&str> {
88 self.content_disposition.file_name.as_deref()
89 }
90
91 /// Get the content type of the field.
92 pub fn content_type(&self) -> Option<&mime::Mime> {
93 self.content_type.as_ref()
94 }
95
96 /// Get a map of headers as [`HeaderMap`].
97 pub fn headers(&self) -> &HeaderMap {
98 &self.headers
99 }
100
101 /// Get the full data of the field as [`Bytes`].
102 ///
103 /// # Examples
104 ///
105 /// ```
106 /// use std::convert::Infallible;
107 ///
108 /// use bytes::Bytes;
109 /// use futures_util::stream::once;
110 /// use multer::Multipart;
111 ///
112 /// # async fn run() {
113 /// let data =
114 /// "--X-BOUNDARY\r\nContent-Disposition: form-data; name=\"my_text_field\"\r\n\r\nabcd\r\n--X-BOUNDARY--\r\n";
115 /// let stream = once(async move { Result::<Bytes, Infallible>::Ok(Bytes::from(data)) });
116 /// let mut multipart = Multipart::new(stream, "X-BOUNDARY");
117 ///
118 /// while let Some(field) = multipart.next_field().await.unwrap() {
119 /// let bytes = field.bytes().await.unwrap();
120 /// assert_eq!(bytes.len(), 4);
121 /// }
122 /// # }
123 /// # tokio::runtime::Runtime::new().unwrap().block_on(run());
124 /// ```
125 pub async fn bytes(self) -> crate::Result<Bytes> {
126 let mut buf = BytesMut::new();
127
128 let mut this = self;
129 while let Some(bytes) = this.chunk().await? {
130 buf.extend_from_slice(&bytes);
131 }
132
133 Ok(buf.freeze())
134 }
135
136 /// Stream a chunk of the field data.
137 ///
138 /// When the field data has been exhausted, this will return [`None`].
139 ///
140 /// # Examples
141 ///
142 /// ```
143 /// use std::convert::Infallible;
144 ///
145 /// use bytes::Bytes;
146 /// use futures_util::stream::once;
147 /// use multer::Multipart;
148 ///
149 /// # async fn run() {
150 /// let data =
151 /// "--X-BOUNDARY\r\nContent-Disposition: form-data; name=\"my_text_field\"\r\n\r\nabcd\r\n--X-BOUNDARY--\r\n";
152 /// let stream = once(async move { Result::<Bytes, Infallible>::Ok(Bytes::from(data)) });
153 /// let mut multipart = Multipart::new(stream, "X-BOUNDARY");
154 ///
155 /// while let Some(mut field) = multipart.next_field().await.unwrap() {
156 /// while let Some(chunk) = field.chunk().await.unwrap() {
157 /// println!("Chunk: {:?}", chunk);
158 /// }
159 /// }
160 /// # }
161 /// # tokio::runtime::Runtime::new().unwrap().block_on(run());
162 /// ```
163 pub async fn chunk(&mut self) -> crate::Result<Option<Bytes>> {
164 self.try_next().await
165 }
166
167 /// Try to deserialize the field data as JSON.
168 ///
169 /// # Optional
170 ///
171 /// This requires the optional `json` feature to be enabled.
172 ///
173 /// # Examples
174 ///
175 /// ```
176 /// use multer::Multipart;
177 /// use bytes::Bytes;
178 /// use std::convert::Infallible;
179 /// use futures_util::stream::once;
180 /// use serde::Deserialize;
181 ///
182 /// // This `derive` requires the `serde` dependency.
183 /// #[derive(Deserialize)]
184 /// struct User {
185 /// name: String
186 /// }
187 ///
188 /// # async fn run() {
189 /// let data = "--X-BOUNDARY\r\nContent-Disposition: form-data; name=\"my_text_field\"\r\n\r\n{ \"name\": \"Alice\" }\r\n--X-BOUNDARY--\r\n";
190 /// let stream = once(async move { Result::<Bytes, Infallible>::Ok(Bytes::from(data)) });
191 /// let mut multipart = Multipart::new(stream, "X-BOUNDARY");
192 ///
193 /// while let Some(field) = multipart.next_field().await.unwrap() {
194 /// let user = field.json::<User>().await.unwrap();
195 /// println!("User Name: {}", user.name);
196 /// }
197 /// # }
198 /// # tokio::runtime::Runtime::new().unwrap().block_on(run());
199 /// ```
200 ///
201 /// # Errors
202 ///
203 /// This method fails if the field data is not in JSON format
204 /// or it cannot be properly deserialized to target type `T`. For more
205 /// details please see [`serde_json::from_slice`].
206 #[cfg(feature = "json")]
207 #[cfg_attr(nightly, doc(cfg(feature = "json")))]
208 pub async fn json<T: DeserializeOwned>(self) -> crate::Result<T> {
209 serde_json::from_slice(&self.bytes().await?).map_err(Error::DecodeJson)
210 }
211
212 /// Get the full field data as text.
213 ///
214 /// This method decodes the field data with `BOM sniffing` and with
215 /// malformed sequences replaced with the `REPLACEMENT CHARACTER`.
216 /// Encoding is determined from the `charset` parameter of `Content-Type`
217 /// header, and defaults to `utf-8` if not presented.
218 ///
219 /// # Examples
220 ///
221 /// ```
222 /// use std::convert::Infallible;
223 ///
224 /// use bytes::Bytes;
225 /// use futures_util::stream::once;
226 /// use multer::Multipart;
227 ///
228 /// # async fn run() {
229 /// let data =
230 /// "--X-BOUNDARY\r\nContent-Disposition: form-data; name=\"my_text_field\"\r\n\r\nabcd\r\n--X-BOUNDARY--\r\n";
231 /// let stream = once(async move { Result::<Bytes, Infallible>::Ok(Bytes::from(data)) });
232 /// let mut multipart = Multipart::new(stream, "X-BOUNDARY");
233 ///
234 /// while let Some(field) = multipart.next_field().await.unwrap() {
235 /// let content = field.text().await.unwrap();
236 /// assert_eq!(content, "abcd");
237 /// }
238 /// # }
239 /// # tokio::runtime::Runtime::new().unwrap().block_on(run());
240 /// ```
241 pub async fn text(self) -> crate::Result<String> {
242 self.text_with_charset("utf-8").await
243 }
244
245 /// Get the full field data as text given a specific encoding.
246 ///
247 /// This method decodes the field data with `BOM sniffing` and with
248 /// malformed sequences replaced with the `REPLACEMENT CHARACTER`.
249 /// You can provide a default encoding for decoding the raw message, while
250 /// the `charset` parameter of `Content-Type` header is still prioritized.
251 /// For more information about the possible encoding name, please go to
252 /// [encoding_rs] docs.
253 ///
254 /// # Examples
255 ///
256 /// ```
257 /// use std::convert::Infallible;
258 ///
259 /// use bytes::Bytes;
260 /// use futures_util::stream::once;
261 /// use multer::Multipart;
262 ///
263 /// # async fn run() {
264 /// let data =
265 /// "--X-BOUNDARY\r\nContent-Disposition: form-data; name=\"my_text_field\"\r\n\r\nabcd\r\n--X-BOUNDARY--\r\n";
266 /// let stream = once(async move { Result::<Bytes, Infallible>::Ok(Bytes::from(data)) });
267 /// let mut multipart = Multipart::new(stream, "X-BOUNDARY");
268 ///
269 /// while let Some(field) = multipart.next_field().await.unwrap() {
270 /// let content = field.text_with_charset("utf-8").await.unwrap();
271 /// assert_eq!(content, "abcd");
272 /// }
273 /// # }
274 /// # tokio::runtime::Runtime::new().unwrap().block_on(run());
275 /// ```
276 pub async fn text_with_charset(self, default_encoding: &str) -> crate::Result<String> {
277 let encoding_name = self
278 .content_type()
279 .and_then(|mime| mime.get_param(mime::CHARSET))
280 .map(|charset| charset.as_str())
281 .unwrap_or(default_encoding);
282
283 let encoding = Encoding::for_label(encoding_name.as_bytes()).unwrap_or(UTF_8);
284 let bytes = self.bytes().await?;
285 Ok(encoding.decode(&bytes).0.into_owned())
286 }
287
288 /// Get the index of this field in order they appeared in the stream.
289 ///
290 /// # Examples
291 ///
292 /// ```
293 /// use std::convert::Infallible;
294 ///
295 /// use bytes::Bytes;
296 /// use futures_util::stream::once;
297 /// use multer::Multipart;
298 ///
299 /// # async fn run() {
300 /// let data =
301 /// "--X-BOUNDARY\r\nContent-Disposition: form-data; name=\"my_text_field\"\r\n\r\nabcd\r\n--X-BOUNDARY--\r\n";
302 /// let stream = once(async move { Result::<Bytes, Infallible>::Ok(Bytes::from(data)) });
303 /// let mut multipart = Multipart::new(stream, "X-BOUNDARY");
304 ///
305 /// while let Some(field) = multipart.next_field().await.unwrap() {
306 /// let idx = field.index();
307 /// println!("Field index: {}", idx);
308 /// }
309 /// # }
310 /// # tokio::runtime::Runtime::new().unwrap().block_on(run());
311 /// ```
312 pub fn index(&self) -> usize {
313 self.idx
314 }
315}
316
317impl Stream for Field<'_> {
318 type Item = Result<Bytes, Error>;
319
320 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
321 if self.done {
322 return Poll::Ready(None);
323 }
324
325 debug_assert!(self.state.try_lock().is_some(), "expected exlusive lock");
326 let state = self.state.clone();
327 let mut lock = match state.try_lock() {
328 Some(lock) => lock,
329 None => return Poll::Ready(Some(Err(Error::LockFailure))),
330 };
331
332 let state = &mut *lock;
333 if let Err(err) = state.buffer.poll_stream(cx) {
334 return Poll::Ready(Some(Err(err)));
335 }
336
337 match state
338 .buffer
339 .read_field_data(&state.boundary, state.curr_field_name.as_deref())
340 {
341 Ok(Some((done, bytes))) => {
342 state.curr_field_size_counter += bytes.len() as u64;
343
344 if state.curr_field_size_counter > state.curr_field_size_limit {
345 return Poll::Ready(Some(Err(Error::FieldSizeExceeded {
346 limit: state.curr_field_size_limit,
347 field_name: state.curr_field_name.clone(),
348 })));
349 }
350
351 if done {
352 state.stage = StreamingStage::ReadingBoundary;
353 self.done = true;
354 }
355
356 Poll::Ready(Some(Ok(bytes)))
357 }
358 Ok(None) => Poll::Pending,
359 Err(err) => Poll::Ready(Some(Err(err))),
360 }
361 }
362}