diff --git a/reflectapi-demo/clients/rust/generated/Cargo.toml b/reflectapi-demo/clients/rust/generated/Cargo.toml index 6fc37de2..1c4f1e41 100644 --- a/reflectapi-demo/clients/rust/generated/Cargo.toml +++ b/reflectapi-demo/clients/rust/generated/Cargo.toml @@ -7,7 +7,7 @@ edition = "2021" workspace = true [dependencies] -reflectapi = { workspace = true, features = ["rt", "reqwest", "chrono"] } +reflectapi = { workspace = true, features = ["rt", "rt-sse", "reqwest", "chrono"] } chrono = { version = "0.4.37", features = ["serde"] } tracing = "0.1" serde = { version = "1.0.218", features = ["derive"] } diff --git a/reflectapi/src/rt.rs b/reflectapi/src/rt.rs index 4ffa7d68..54a61129 100644 --- a/reflectapi/src/rt.rs +++ b/reflectapi/src/rt.rs @@ -89,7 +89,52 @@ impl std::erro pub type BoxStream = Pin + Send + 'static>>; -pub type StreamResponse = Result>>, Error>; +/// Error type for individual stream items. +/// +/// Unlike [`Error`], this does not include an `Application` variant because +/// application-level errors can only occur during the initial request/response +/// cycle (stream creation), not per-item during streaming. +pub enum StreamItemError { + Network(NE), + Protocol { + info: String, + stage: ProtocolErrorStage, + }, +} + +impl core::fmt::Debug for StreamItemError { + fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { + match self { + StreamItemError::Network(err) => write!(f, "network error: {err:?}"), + StreamItemError::Protocol { info, stage } => { + write!(f, "protocol error: {info} at {stage:?}") + } + } + } +} + +impl core::fmt::Display for StreamItemError { + fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { + match self { + StreamItemError::Network(err) => write!(f, "network error: {err}"), + StreamItemError::Protocol { info, stage } => { + write!(f, "protocol error: {info} at {stage}") + } + } + } +} + +impl std::error::Error for StreamItemError { + fn source(&self) -> Option<&(dyn std::error::Error + 'static)> { + match self { + StreamItemError::Network(err) => Some(err), + StreamItemError::Protocol { .. } => None, + } + } +} + +pub type StreamResponse = + Result>>, Error>; pub enum ProtocolErrorStage { SerializeRequestBody, @@ -216,6 +261,7 @@ where } } +#[cfg(feature = "rt-sse")] fn __serialize_headers_for_stream( headers: H, ) -> Result { @@ -256,12 +302,13 @@ fn __serialize_headers_for_stream( } #[doc(hidden)] +#[cfg(feature = "rt-sse")] pub async fn __stream_request_impl( client: &C, url: Url, body: I, headers: H, -) -> Result>>, Error> +) -> Result>>, Error> where C: Client, C::Error: Send + 'static, @@ -270,6 +317,11 @@ where O: serde::de::DeserializeOwned + Send + 'static, E: serde::de::DeserializeOwned + Send + 'static, { + use futures_util::StreamExt; + use sseer::event_stream::EventStream; + use sseer::json_stream::JsonStream; + use sseer::{errors::EventStreamError, json_stream::JsonStreamError}; + let body = serde_json::to_vec(&body).map_err(|e| Error::Protocol { info: e.to_string(), stage: ProtocolErrorStage::SerializeRequestBody, @@ -284,40 +336,24 @@ where .map_err(Error::Network)?; if status.is_success() { - #[cfg(feature = "rt-sse")] - { - use futures_util::StreamExt; - use sseer::event_stream::EventStream; - use sseer::json_stream::JsonStream; - use sseer::{errors::EventStreamError, json_stream::JsonStreamError}; - - let event_stream = EventStream::new(byte_stream); - let json_stream = JsonStream::::new_default(event_stream); - let stream = json_stream.map(|item| { - item.map_err(|err| match err { - JsonStreamError::Stream(err) => match err { - EventStreamError::Transport(err) => Error::Network(err), - EventStreamError::Utf8Error(err) => Error::Protocol { - info: err.to_string(), - stage: ProtocolErrorStage::DeserializeResponseBody(bytes::Bytes::new()), - }, - }, - JsonStreamError::Deserialize(err) => Error::Protocol { + let event_stream = EventStream::new(byte_stream); + let json_stream = JsonStream::::new_default(event_stream); + let stream = json_stream.map(|item| { + item.map_err(|err| match err { + JsonStreamError::Stream(err) => match err { + EventStreamError::Transport(err) => StreamItemError::Network(err), + EventStreamError::Utf8Error(err) => StreamItemError::Protocol { info: err.to_string(), stage: ProtocolErrorStage::DeserializeResponseBody(bytes::Bytes::new()), }, - }) - }); - return Ok(Box::pin(stream)); - } - - #[cfg(not(feature = "rt-sse"))] - { - return Err(Error::Protocol { - info: "SSE streaming requires the 'rt-sse' feature flag".to_string(), - stage: ProtocolErrorStage::DeserializeResponseBody(bytes::Bytes::new()), - }); - } + }, + JsonStreamError::Deserialize(err) => StreamItemError::Protocol { + info: err.to_string(), + stage: ProtocolErrorStage::DeserializeResponseBody(bytes::Bytes::new()), + }, + }) + }); + return Ok(Box::pin(stream)); } let body = __collect_byte_stream(byte_stream) @@ -333,6 +369,7 @@ where } } +#[cfg(feature = "rt-sse")] async fn __collect_byte_stream( stream: Pin> + Send>>, ) -> Result {