From fed6b058ad897eb1613bc1b823c4393e9d82ef07 Mon Sep 17 00:00:00 2001 From: Jason LeBrun Date: Tue, 14 Jan 2025 14:13:56 +0000 Subject: [PATCH] Add some clarifying comments to the forward_stream helper Change-Id: I2af834376eed1e3920a32692457d01379a97adb7 --- .../hello_world/host_app/src/service.rs | 44 +++++++++++++------ 1 file changed, 30 insertions(+), 14 deletions(-) diff --git a/oak_containers/examples/hello_world/host_app/src/service.rs b/oak_containers/examples/hello_world/host_app/src/service.rs index 07122644cb..cd73e8ca28 100644 --- a/oak_containers/examples/hello_world/host_app/src/service.rs +++ b/oak_containers/examples/hello_world/host_app/src/service.rs @@ -43,17 +43,35 @@ enum Action { Send(Option>), } -async fn forward_stream( +/// A generic helper method to pass messages between client and enclave app. +/// +/// A standard Oak Containers host application is just a simple message +/// forwarder. This function is an implementation of a generic bi-directional +/// message forwarding strategy. +/// +/// It's possible that we can move this into the Rust SDK as a host application +/// helper. +/// +/// `request_stream` is the stream of requests coming into a tonic gRPC handler +/// from the client, typically the argument to the tonic handler method. +/// +/// `upstream_starter` A function that initiates a streaming connection to an +/// enclave application that exposes an oak streaming session protocol endpoint. +/// The method receives an `rx` channel that's created internally by the +/// forward_stream method, and will feed client requests to the enclave app. +async fn forward_stream( request_stream: tonic::Streaming, upstream_starter: impl FnOnce(mpsc::Receiver) -> Fut, ) -> Result>, tonic::Status> where - Fut: Future>, tonic::Status>>, + Fut: Future>, E>>, { let mut request_stream = request_stream; let (mut tx, rx) = mpsc::channel(10); - - let mut upstream = upstream_starter(rx).await?.into_inner(); + let mut upstream = upstream_starter(rx) + .await + .map_err(|e| tonic::Status::internal(e.to_string()))? + .into_inner(); Ok(async_stream::try_stream! { loop { @@ -91,19 +109,17 @@ impl HostApplication for HostApplicationImpl { async fn session( &self, - request: tonic::Request>, + client_request_stream: tonic::Request>, ) -> Result, tonic::Status> { - let request_stream = request.into_inner(); - + // Clone the app implementation `Arc` so that we have a reference to use the in + // callback below. let enclave_app = self.enclave_app.clone(); + let enclave_response_stream_starter = + |rx| async move { enclave_app.lock().await.legacy_session(rx).await }; - let response_stream = forward_stream(request_stream, |rx| async move { - let mut app = enclave_app.lock().await; - app.legacy_session(rx).await.map_err(|err| { - tonic::Status::internal(format!("Failed to start enclave app stream: {err:?}")) - }) - }) - .await?; + let response_stream = + forward_stream(client_request_stream.into_inner(), enclave_response_stream_starter) + .await?; Ok(tonic::Response::new(Box::pin(response_stream) as Self::SessionStream)) }