Skip to content

Commit

Permalink
Add some clarifying comments to the forward_stream helper
Browse files Browse the repository at this point in the history
Change-Id: I2af834376eed1e3920a32692457d01379a97adb7
  • Loading branch information
jblebrun committed Jan 21, 2025
1 parent 7b10d6a commit fed6b05
Showing 1 changed file with 30 additions and 14 deletions.
44 changes: 30 additions & 14 deletions oak_containers/examples/hello_world/host_app/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,17 +43,35 @@ enum Action {
Send(Option<Result<ResponseWrapper, tonic::Status>>),
}

async fn forward_stream<Fut>(
/// A generic helper method to pass messages between client and enclave app.
///
/// A standard Oak Containers host application is just a simple message
/// forwarder. This function is an implementation of a generic bi-directional
/// message forwarding strategy.
///
/// It's possible that we can move this into the Rust SDK as a host application
/// helper.
///
/// `request_stream` is the stream of requests coming into a tonic gRPC handler
/// from the client, typically the argument to the tonic handler method.
///
/// `upstream_starter` A function that initiates a streaming connection to an
/// enclave application that exposes an oak streaming session protocol endpoint.
/// The method receives an `rx` channel that's created internally by the
/// forward_stream method, and will feed client requests to the enclave app.
async fn forward_stream<Fut, E: std::fmt::Display>(
request_stream: tonic::Streaming<RequestWrapper>,
upstream_starter: impl FnOnce(mpsc::Receiver<RequestWrapper>) -> Fut,
) -> Result<impl Stream<Item = Result<ResponseWrapper, tonic::Status>>, tonic::Status>
where
Fut: Future<Output = Result<tonic::Response<tonic::Streaming<ResponseWrapper>>, tonic::Status>>,
Fut: Future<Output = Result<tonic::Response<tonic::Streaming<ResponseWrapper>>, E>>,
{
let mut request_stream = request_stream;
let (mut tx, rx) = mpsc::channel(10);

let mut upstream = upstream_starter(rx).await?.into_inner();
let mut upstream = upstream_starter(rx)
.await
.map_err(|e| tonic::Status::internal(e.to_string()))?
.into_inner();

Ok(async_stream::try_stream! {
loop {
Expand Down Expand Up @@ -91,19 +109,17 @@ impl HostApplication for HostApplicationImpl {

async fn session(
&self,
request: tonic::Request<tonic::Streaming<RequestWrapper>>,
client_request_stream: tonic::Request<tonic::Streaming<RequestWrapper>>,
) -> Result<tonic::Response<Self::SessionStream>, tonic::Status> {
let request_stream = request.into_inner();

// Clone the app implementation `Arc` so that we have a reference to use the in
// callback below.
let enclave_app = self.enclave_app.clone();
let enclave_response_stream_starter =
|rx| async move { enclave_app.lock().await.legacy_session(rx).await };

let response_stream = forward_stream(request_stream, |rx| async move {
let mut app = enclave_app.lock().await;
app.legacy_session(rx).await.map_err(|err| {
tonic::Status::internal(format!("Failed to start enclave app stream: {err:?}"))
})
})
.await?;
let response_stream =
forward_stream(client_request_stream.into_inner(), enclave_response_stream_starter)
.await?;

Ok(tonic::Response::new(Box::pin(response_stream) as Self::SessionStream))
}
Expand Down

0 comments on commit fed6b05

Please sign in to comment.