Skip to content

Commit 973e2dc

Browse files
committed
tonic-web: proxy any kind of service
This allows applying the GrpcWebLayer to any kind of Service, not just ones that tonic generates. This makes it possible to use tonic-web as a grpc-web proxy to a gRPC server implemented in another language for example.
1 parent ff7b540 commit 973e2dc

File tree

6 files changed

+201
-52
lines changed

6 files changed

+201
-52
lines changed

tests/web/Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@ tonic-prost = { path = "../../tonic-prost" }
1818

1919
[dev-dependencies]
2020
tonic-web = { path = "../../tonic-web" }
21+
tower-layer = "0.3"
22+
tower-service = "0.3"
2123

2224
[build-dependencies]
2325
tonic-prost-build = { path = "../../tonic-prost-build" }

tests/web/tests/grpc_web.rs

Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,29 @@ async fn binary_request() {
4242
assert_eq!(&trailers[..], b"grpc-status:0\r\n");
4343
}
4444

45+
#[tokio::test]
46+
async fn binary_request_reverse_proxy() {
47+
let server_url = spawn_reverse_proxy().await;
48+
let client = Client::builder(TokioExecutor::new()).build_http();
49+
50+
let req = build_request(server_url, "grpc-web", "grpc-web");
51+
let res = client.request(req).await.unwrap();
52+
let content_type = res.headers().get(header::CONTENT_TYPE).unwrap().clone();
53+
let content_type = content_type.to_str().unwrap();
54+
55+
assert_eq!(res.status(), StatusCode::OK);
56+
assert_eq!(content_type, "application/grpc-web+proto");
57+
58+
let (message, trailers) = decode_body(res.into_body(), content_type).await;
59+
let expected = Output {
60+
id: 1,
61+
desc: "one".to_owned(),
62+
};
63+
64+
assert_eq!(message, expected);
65+
assert_eq!(&trailers[..], b"grpc-status:0\r\n");
66+
}
67+
4568
#[tokio::test]
4669
async fn text_request() {
4770
let server_url = spawn().await;
@@ -84,6 +107,73 @@ async fn spawn() -> String {
84107
url
85108
}
86109

110+
/// Spawn two servers, one serving the gRPC API and another acting as a grpc-web proxy
111+
async fn spawn_reverse_proxy() -> String {
112+
use hyper_util::rt::TokioIo;
113+
use hyper_util::client::legacy::Client;
114+
use tower_layer::Layer;
115+
use tower_service::Service;
116+
117+
// Set up gRPC service
118+
let addr = SocketAddr::from(([127, 0, 0, 1], 0));
119+
let listener = TcpListener::bind(addr).await.expect("listener");
120+
let url = format!("http://{}", listener.local_addr().unwrap());
121+
let listener_stream = TcpListenerStream::new(listener);
122+
123+
drop(tokio::spawn(async move {
124+
Server::builder()
125+
.add_service(TestServer::new(Svc))
126+
.serve_with_incoming(listener_stream)
127+
.await
128+
.unwrap()
129+
}));
130+
131+
// Set up proxy to the above service that applies tonic-web
132+
let addr2 = SocketAddr::from(([127, 0, 0, 1], 0));
133+
let http_client = Client::builder(TokioExecutor::new())
134+
.http2_only(true)
135+
.build_http();
136+
let listener2 = TcpListener::bind(addr2).await.expect("listener");
137+
let url2 = format!("http://{}", listener2.local_addr().unwrap());
138+
139+
let backend_url = url.clone();
140+
141+
drop(tokio::spawn(async move {
142+
loop {
143+
let (stream, _) = listener2.accept().await.unwrap();
144+
let io = TokioIo::new(stream);
145+
let client = http_client.clone();
146+
let backend = backend_url.clone();
147+
148+
tokio::spawn(async move {
149+
let svc = GrpcWebLayer::new().layer(client.clone());
150+
let hyper_svc = hyper::service::service_fn(move |mut req: Request<Incoming>| {
151+
let mut svc = svc.clone();
152+
let backend = backend.clone();
153+
async move {
154+
// Rewrite URI to point to backend
155+
let path = req.uri().path_and_query().map(|pq| pq.as_str()).unwrap_or("/");
156+
let new_uri = format!("{}{}", backend, path).parse().unwrap();
157+
*req.uri_mut() = new_uri;
158+
159+
let req = req.map(Body::new);
160+
svc.call(req).await
161+
}
162+
});
163+
164+
if let Err(err) = hyper_util::server::conn::auto::Builder::new(TokioExecutor::new())
165+
.serve_connection(io, hyper_svc)
166+
.await
167+
{
168+
eprintln!("Error serving connection: {:?}", err);
169+
}
170+
});
171+
}
172+
}));
173+
174+
url2
175+
}
176+
87177
fn encode_body() -> Bytes {
88178
let input = Input {
89179
id: 1,

tonic-web/src/call.rs

Lines changed: 16 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
use std::fmt;
21
use std::pin::Pin;
32
use std::task::{ready, Context, Poll};
43

@@ -10,6 +9,8 @@ use pin_project::pin_project;
109
use tokio_stream::Stream;
1110
use tonic::Status;
1211

12+
use crate::BoxError;
13+
1314
use self::content_types::*;
1415

1516
// A grpc header is u8 (flag) + u32 (msg len)
@@ -158,11 +159,11 @@ impl<B> GrpcWebCall<B> {
158159
}
159160
}
160161

161-
impl<B> GrpcWebCall<B>
162+
impl<B, D> GrpcWebCall<B>
162163
where
163-
B: Body,
164-
B::Data: Buf,
165-
B::Error: fmt::Display,
164+
B: Body<Data = D>,
165+
B::Error: Into<BoxError> + Send,
166+
D: Buf,
166167
{
167168
// Poll body for data, decoding (e.g. via Base64 if necessary) and returning frames
168169
// to the caller. If the caller is a client, it should look for trailers before
@@ -247,10 +248,11 @@ where
247248
}
248249
}
249250

250-
impl<B> Body for GrpcWebCall<B>
251+
impl<B, D> Body for GrpcWebCall<B>
251252
where
252-
B: Body,
253-
B::Error: fmt::Display,
253+
B: Body<Data = D>,
254+
B::Error: Into<BoxError> + Send,
255+
D: Buf,
254256
{
255257
type Data = Bytes;
256258
type Error = Status;
@@ -336,10 +338,11 @@ where
336338
}
337339
}
338340

339-
impl<B> Stream for GrpcWebCall<B>
341+
impl<B, D> Stream for GrpcWebCall<B>
340342
where
341-
B: Body,
342-
B::Error: fmt::Display,
343+
B: Body<Data = D>,
344+
B::Error: Into<BoxError> + Send,
345+
D: Buf,
343346
{
344347
type Item = Result<Frame<Bytes>, Status>;
345348

@@ -372,7 +375,8 @@ impl Encoding {
372375
}
373376
}
374377

375-
fn internal_error(e: impl std::fmt::Display) -> Status {
378+
fn internal_error(e: impl Into<BoxError> + Send) -> Status {
379+
let e = e.into();
376380
Status::internal(format!("tonic-web: {e}"))
377381
}
378382

tonic-web/src/layer.rs

Lines changed: 36 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,22 +1,49 @@
1-
use super::GrpcWebService;
1+
use std::error::Error;
2+
3+
use tonic::body::Body;
4+
use super::{BoxError, GrpcWebService};
25

36
use tower_layer::Layer;
7+
use tower_service::Service;
48

59
/// Layer implementing the grpc-web protocol.
6-
#[derive(Debug, Default, Clone)]
7-
pub struct GrpcWebLayer {
8-
_priv: (),
10+
#[derive(Debug)]
11+
pub struct GrpcWebLayer<ResBody = Body> {
12+
_markers: std::marker::PhantomData<fn() -> ResBody>,
13+
}
14+
15+
impl<ResBody> Clone for GrpcWebLayer<ResBody> {
16+
fn clone(&self) -> Self {
17+
Self {
18+
_markers: std::marker::PhantomData,
19+
}
20+
}
921
}
1022

11-
impl GrpcWebLayer {
23+
impl<ResBody> GrpcWebLayer<ResBody> {
1224
/// Create a new grpc-web layer.
13-
pub fn new() -> GrpcWebLayer {
14-
Self::default()
25+
pub fn new() -> Self {
26+
Self {
27+
_markers: std::marker::PhantomData,
28+
}
29+
}
30+
}
31+
32+
impl<ResBody> Default for GrpcWebLayer<ResBody> {
33+
fn default() -> Self {
34+
Self::new()
1535
}
1636
}
1737

18-
impl<S> Layer<S> for GrpcWebLayer {
19-
type Service = GrpcWebService<S>;
38+
impl<S, ResBody> Layer<S> for GrpcWebLayer<ResBody>
39+
where
40+
S: Service<http::Request<Body>, Response = http::Response<ResBody>> + Send + 'static,
41+
S::Future: Send + 'static,
42+
S::Error: Into<BoxError> + Send,
43+
ResBody: http_body::Body<Data = bytes::Bytes> + Send + 'static,
44+
ResBody::Error: Error + Send + Sync + 'static,
45+
{
46+
type Service = GrpcWebService<S, ResBody>;
2047

2148
fn layer(&self, inner: S) -> Self::Service {
2249
GrpcWebService::new(inner)

tonic-web/src/lib.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,8 @@ mod client;
8080
mod layer;
8181
mod service;
8282

83-
type BoxError = Box<dyn std::error::Error + Send + Sync>;
83+
/// Alias for a type-erased error type.
84+
pub type BoxError = Box<dyn std::error::Error + Send + Sync>;
8485

8586
pub(crate) mod util {
8687
pub(crate) mod base64 {

0 commit comments

Comments
 (0)