3131//! }
3232//! ```
3333
34+ use std:: io:: { Cursor , Read } ;
3435use std:: task:: Context ;
3536use std:: task:: Poll ;
3637use std:: { fmt:: Debug , pin:: Pin , str:: FromStr } ;
3738
3839use futures_core:: stream:: Stream ;
40+ use futures_lite:: { io, prelude:: * } ;
41+ use futures_util:: stream:: TryStreamExt ;
3942use multipart:: server:: Multipart as Parser ;
40- use std:: io:: { Cursor , Read } ;
4143
42- use crate :: { format_err, Mime , Status } ;
44+ use crate :: mime;
45+ use crate :: { format_err, Body , Mime , Status } ;
4346pub use entry:: Entry ;
4447
4548mod entry;
@@ -67,7 +70,6 @@ impl Multipart {
6770
6871 /// Parse a `Body` stream as a `Multipart` instance.
6972 pub async fn from_req ( req : & mut crate :: Request ) -> crate :: Result < Self > {
70- let body = req. take_body ( ) . into_string ( ) . await ?;
7173 let boundary = req
7274 . content_type ( )
7375 . map ( |ct| ct. param ( "boundary" ) . cloned ( ) )
@@ -83,6 +85,9 @@ impl Multipart {
8385 }
8486 } ;
8587
88+ // Not ideal, but done for now so we can avoid implementing all of Multipart ourselves for the time being.
89+ let body = req. take_body ( ) . into_string ( ) . await ?;
90+
8691 let multipart = Parser :: with_body ( Cursor :: new ( body) , boundary) ;
8792 Ok ( Self {
8893 entries : vec ! [ ] ,
@@ -96,6 +101,11 @@ impl Multipart {
96101 E : Into < Entry > ,
97102 {
98103 self . entries . push ( entry. into ( ) ) ;
104+ // if let Some(entries) = self.entries.as_mut() {
105+ // entries.push(entry.into());
106+ // } else {
107+ // self.entries = Some(vec![entry.into()]);
108+ // }
99109 }
100110}
101111
@@ -120,20 +130,91 @@ impl Stream for Multipart {
120130 . content_type
121131 . map ( |ct| Mime :: from_str ( & ct. to_string ( ) ) )
122132 . transpose ( ) ?;
123- entry. set_content_type ( mime) ;
133+ if let Some ( mime) = mime {
134+ entry. set_mime ( mime) ;
135+ } else {
136+ // https://tools.ietf.org/html/rfc7578#section-4.4
137+ entry. set_mime ( mime:: PLAIN ) ;
138+ }
124139
125140 Poll :: Ready ( Some ( Ok ( entry) ) )
126141 }
127142 Ok ( None ) => Poll :: Ready ( None ) ,
128- Err ( _e) => {
129- // TODO: forward error?
130- let mut err = format_err ! ( "Invalid multipart entry" ) ;
143+ Err ( e) => {
144+ let mut err = format_err ! ( "Invalid multipart entry: {}" , e) ;
131145 err. set_status ( 400 ) ;
132146 Poll :: Ready ( Some ( Err ( err) ) )
133147 }
134148 }
135149 }
136150}
137151
138- // TODO
139- // impl From<Multipart> for Body {}
152+ // struct MultipartReader {
153+ // entry_iter: Box<dyn Iterator<Item = Entry>>,
154+ // }
155+
156+ // impl From<Multipart> for MultipartReader {
157+ // fn from(multipart: Multipart) -> Self {
158+ // Self {
159+ // entry_iter: Box::new(multipart.entries.into_iter())
160+ // }
161+ // }
162+ // }
163+
164+ // impl AsyncRead for MultipartReader {
165+ // #[allow(missing_doc_code_examples)]
166+ // fn poll_read(
167+ // mut self: Pin<&mut Self>,
168+ // cx: &mut Context<'_>,
169+ // buf: &mut [u8],
170+ // ) -> Poll<io::Result<usize>> {
171+ // if let Some(entry) = self.entry_iter.next() {
172+ // Pin::new(&mut entry).poll_read(cx, buf)
173+ // } else {
174+ // Poll::Ready()
175+ // }
176+ // }
177+ // }
178+
179+ // impl AsyncBufRead for MultipartReader {
180+ // #[allow(missing_doc_code_examples)]
181+ // fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<&'_ [u8]>> {
182+ // let this = self.project();
183+ // this.reader.poll_fill_buf(cx)
184+ // }
185+
186+ // fn consume(mut self: Pin<&mut Self>, amt: usize) {
187+ // Pin::new(&mut self.reader).consume(amt)
188+ // }
189+ // }
190+
191+ struct BufReader < R : AsyncRead > {
192+ inner : io:: BufReader < R > ,
193+ }
194+
195+ impl < R : AsyncRead > BufReader < R > {
196+ pub fn new ( inner : R ) -> Self {
197+ Self {
198+ inner : io:: BufReader :: new ( inner) ,
199+ }
200+ }
201+ }
202+
203+ impl < R : AsyncRead > AsRef < [ u8 ] > for BufReader < R > {
204+ fn as_ref ( & self ) -> & [ u8 ] {
205+ self . inner . buffer ( )
206+ }
207+ }
208+
209+ impl From < Multipart > for Body {
210+ fn from ( multipart : Multipart ) -> Self {
211+ let stream = multipart. map ( |maybe_entry| {
212+ maybe_entry
213+ . map ( |entry| BufReader :: new ( entry) )
214+ . map_err ( |err| {
215+ std:: io:: Error :: new ( std:: io:: ErrorKind :: Other , err. to_string ( ) . to_owned ( ) )
216+ } )
217+ } ) ;
218+ Body :: from_reader ( io:: BufReader :: new ( stream. into_async_read ( ) ) , None )
219+ }
220+ }
0 commit comments