From ba0aa34899d3977b7aa2404c6c9eb4a37c05061e Mon Sep 17 00:00:00 2001 From: Oscar Cowdery Lack Date: Wed, 6 Apr 2022 16:18:39 +1000 Subject: [PATCH] Make FtpStream::get read the final response Once the client has finished reading a file from the server, the server sends a status message to the client. The current API of FtpStream::get doesn't allow us to read that response, since we return a BufReader that reads directly from the data stream. Instead of doing this, we can return a new AsyncRead type (FileReader) that wraps the data stream and reads the status message once the data stream is finished. There is one caveat to this - if the FileReader is dropped then the data stream will be closed, and the server will send us an error message, which still needs to be read manually. There isn't really a nice way around this without an AsyncDrop trait. Since this is already a breaking change, we also no longer use a BufReader at all - this lets the caller decide whether to buffer or not. --- src/file_reader.rs | 95 ++++++++++++++++++++++++++++++++++++++++++++++ src/ftp.rs | 12 +++--- src/lib.rs | 2 + 3 files changed, 104 insertions(+), 5 deletions(-) create mode 100644 src/file_reader.rs diff --git a/src/file_reader.rs b/src/file_reader.rs new file mode 100644 index 0000000..a9cf529 --- /dev/null +++ b/src/file_reader.rs @@ -0,0 +1,95 @@ +use std::{ + future::Future, + io, mem, + pin::Pin, + task::{Context, Poll}, +}; + +use tokio::io::{AsyncRead, ReadBuf}; + +use crate::{status, DataStream, FtpStream}; + +pub struct FileReader<'a> { + state: State<'a>, +} + +enum State<'a> { + Stream { + data_stream: DataStream, + ftp_stream: &'a mut FtpStream, + }, + FinalRead(Pin>>>), + Finished, +} + +impl FileReader<'_> { + pub(crate) fn new(data_stream: DataStream, ftp_stream: &mut FtpStream) -> FileReader { + FileReader { + state: State::Stream { + data_stream, + ftp_stream, + }, + } + } +} + +impl AsyncRead for FileReader<'_> { + fn poll_read( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut ReadBuf<'_>, + ) -> Poll> { + let bytes_read_before = buf.filled().len(); + let (state, result) = match mem::replace(&mut self.state, State::Finished) { + State::Stream { + mut data_stream, + ftp_stream, + } => match Pin::new(&mut data_stream).poll_read(cx, buf) { + Poll::Ready(result) => { + let bytes_read_after = buf.filled().len(); + if bytes_read_after == bytes_read_before { + // finished reading the file, wait for a status message from the server + let mut status_fut = Box::pin(async move { + ftp_stream + .read_response_in(&[ + status::CLOSING_DATA_CONNECTION, + status::REQUESTED_FILE_ACTION_OK, + ]) + .await + .map(|_| ()) + .map_err(|e| io::Error::new(io::ErrorKind::Other, e)) + .and(result) + }); + match Pin::new(&mut status_fut).poll(cx) { + Poll::Ready(r) => (State::Finished, Poll::Ready(r)), + Poll::Pending => (State::FinalRead(status_fut), Poll::Pending), + } + } else { + ( + State::Stream { + data_stream, + ftp_stream, + }, + Poll::Ready(result), + ) + } + } + Poll::Pending => ( + State::Stream { + data_stream, + ftp_stream, + }, + Poll::Pending, + ), + }, + State::FinalRead(mut status_fut) => match Pin::new(&mut status_fut).poll(cx) { + Poll::Ready(r) => (State::Finished, Poll::Ready(r)), + Poll::Pending => (State::FinalRead(status_fut), Poll::Pending), + }, + State::Finished => panic!("poll called on finished FileReader"), + }; + + self.state = state; + result + } +} diff --git a/src/ftp.rs b/src/ftp.rs index 7ff174c..899aa84 100644 --- a/src/ftp.rs +++ b/src/ftp.rs @@ -17,6 +17,7 @@ use tokio::net::{TcpStream, ToSocketAddrs}; use tokio_rustls::{rustls::ClientConfig, rustls::ServerName, TlsConnector}; use crate::data_stream::DataStream; +use crate::file_reader::FileReader; use crate::status; use crate::types::{FileType, FtpError, Line, Result}; @@ -316,14 +317,15 @@ impl FtpStream { /// Retrieves the file name specified from the server. /// This method is a more complicated way to retrieve a file. - /// The reader returned should be dropped. - /// Also you will have to read the response to make sure it has the correct value. - pub async fn get(&mut self, file_name: &str) -> Result> { + /// + /// If the reader is dropped before the file is fully read, the server will send a error message that + /// should be read with [`Self::read_response`]/[`Self::read_response_in`]. + pub async fn get(&mut self, file_name: &str) -> Result> { let retr_command = format!("RETR {}\r\n", file_name); - let data_stream = BufReader::new(self.data_command(&retr_command).await?); + let data_stream = self.data_command(&retr_command).await?; self.read_response_in(&[status::ABOUT_TO_SEND, status::ALREADY_OPEN]) .await?; - Ok(data_stream) + Ok(FileReader::new(data_stream, self)) } /// Renames the file from_name to to_name diff --git a/src/lib.rs b/src/lib.rs index 6cadbe3..c9b498f 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -54,10 +54,12 @@ //! mod data_stream; +mod file_reader; mod ftp; pub mod status; pub mod types; pub use self::data_stream::DataStream; +pub use self::file_reader::FileReader; pub use self::ftp::FtpStream; pub use self::types::FtpError;