Skip to content

Commit 61bb3da

Browse files
author
victor
committed
transport: unify request modifiers and reduce allocations
1 parent 688522a commit 61bb3da

File tree

6 files changed

+248
-242
lines changed

6 files changed

+248
-242
lines changed

tonic/src/transport/channel/endpoint.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -585,6 +585,10 @@ impl Endpoint {
585585
pub fn get_tcp_keepalive_retries(&self) -> Option<u32> {
586586
self.tcp_keepalive_retries
587587
}
588+
589+
pub(crate) fn get_origin(&self) -> &Uri {
590+
self.origin.as_ref().unwrap_or(self.uri())
591+
}
588592
}
589593

590594
impl From<Uri> for Endpoint {

tonic/src/transport/channel/service/add_origin.rs

Lines changed: 0 additions & 69 deletions
This file was deleted.

tonic/src/transport/channel/service/connection.rs

Lines changed: 14 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
use super::{AddOrigin, Reconnect, SharedExec, UserAgent};
2+
use crate::transport::channel::service::Modifier;
23
use crate::{
34
body::Body,
45
transport::{channel::BoxFuture, service::GrpcTimeout, Endpoint},
@@ -25,7 +26,7 @@ pub(crate) struct Connection {
2526
}
2627

2728
impl Connection {
28-
fn new<C>(connector: C, endpoint: Endpoint, is_lazy: bool) -> Self
29+
fn new<C>(connector: C, endpoint: Endpoint, is_lazy: bool) -> Result<Self, crate::BoxError>
2930
where
3031
C: Service<Uri> + Send + 'static,
3132
C::Error: Into<crate::BoxError> + Send,
@@ -55,13 +56,17 @@ impl Connection {
5556
settings.max_header_list_size(val);
5657
}
5758

59+
// We shift detecting abscence of both scheme and authority here
60+
let add_origin = AddOrigin::new(endpoint.get_origin())?;
5861
let stack = ServiceBuilder::new()
5962
.layer_fn(|s| {
60-
let origin = endpoint.origin.as_ref().unwrap_or(endpoint.uri()).clone();
61-
62-
AddOrigin::new(s, origin)
63+
// The clone here is just &Uri
64+
Modifier::new(s, add_origin.clone().into_fn())
65+
})
66+
.layer_fn(|s| {
67+
let ua = UserAgent::new(endpoint.user_agent.clone());
68+
Modifier::new(s, ua.into_fn())
6369
})
64-
.layer_fn(|s| UserAgent::new(s, endpoint.user_agent.clone()))
6570
.layer_fn(|s| GrpcTimeout::new(s, endpoint.timeout))
6671
.option_layer(endpoint.concurrency_limit.map(ConcurrencyLimitLayer::new))
6772
.option_layer(endpoint.rate_limit.map(|(l, d)| RateLimitLayer::new(l, d)))
@@ -72,9 +77,9 @@ impl Connection {
7277

7378
let conn = Reconnect::new(make_service, endpoint.uri().clone(), is_lazy);
7479

75-
Self {
80+
Ok(Self {
7681
inner: BoxService::new(stack.layer(conn)),
77-
}
82+
})
7883
}
7984

8085
pub(crate) async fn connect<C>(
@@ -87,7 +92,7 @@ impl Connection {
8792
C::Future: Unpin + Send,
8893
C::Response: rt::Read + rt::Write + Unpin + Send + 'static,
8994
{
90-
Self::new(connector, endpoint, false).ready_oneshot().await
95+
Self::new(connector, endpoint, false)?.ready_oneshot().await
9196
}
9297

9398
pub(crate) fn lazy<C>(connector: C, endpoint: Endpoint) -> Self
@@ -97,7 +102,7 @@ impl Connection {
97102
C::Future: Send,
98103
C::Response: rt::Read + rt::Write + Unpin + Send + 'static,
99104
{
100-
Self::new(connector, endpoint, true)
105+
Self::new(connector, endpoint, true).expect("Endpoint origin scheme and authority are set")
101106
}
102107
}
103108

tonic/src/transport/channel/service/mod.rs

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,5 @@
1-
mod add_origin;
2-
use self::add_origin::AddOrigin;
3-
4-
mod user_agent;
5-
use self::user_agent::UserAgent;
1+
mod request_modifiers;
2+
use self::request_modifiers::*;
63

74
mod reconnect;
85
use self::reconnect::Reconnect;
Lines changed: 228 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,228 @@
1+
use http::{header::USER_AGENT, HeaderValue, Request, Uri};
2+
use std::task::{Context, Poll};
3+
use tower_service::Service;
4+
use crate::body::Body;
5+
6+
#[derive(Debug)]
7+
pub(crate) struct Modifier<M, T> {
8+
modifier_fn: M,
9+
next: T,
10+
}
11+
12+
impl<M, T> Modifier<M, T> {
13+
pub(crate) fn new(next: T, modifier_fn: M) -> Self {
14+
Self { next, modifier_fn }
15+
}
16+
}
17+
18+
impl<M, Body, T> Service<Request<Body>> for Modifier<M, T>
19+
where
20+
T: Service<Request<Body>>,
21+
M: FnOnce(Request<Body>) -> Request<Body> + Clone,
22+
Body: Send + 'static,
23+
{
24+
type Response = T::Response;
25+
type Error = T::Error;
26+
type Future = T::Future;
27+
28+
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
29+
self.next.poll_ready(cx)
30+
}
31+
32+
fn call(&mut self, req: Request<Body>) -> Self::Future {
33+
let modifier_fn = self.modifier_fn.clone();
34+
self.next.call(modifier_fn(req))
35+
}
36+
}
37+
38+
// We're borrowing to avoid cloning the Uri more than once in layer which expects Fn
39+
// and not FnOnce
40+
#[derive(Debug, Clone)]
41+
pub(crate) struct AddOrigin<'a> {
42+
origin: &'a Uri
43+
}
44+
45+
impl<'a> AddOrigin<'a> {
46+
pub(crate) fn new(origin: &'a Uri) -> Result<Self, crate::BoxError> {
47+
// We catch error right at initiation... This single line
48+
// eliminates countless heap allocations at `runtime`
49+
if origin.scheme().is_none() || origin.authority().is_none() {
50+
return Err(crate::transport::Error::new_invalid_uri().into());
51+
}
52+
53+
Ok(Self { origin })
54+
}
55+
56+
pub(crate) fn into_fn(
57+
self,
58+
) -> impl FnOnce(Request<Body>) -> Request<Body> + Clone {
59+
let http::uri::Parts {
60+
scheme, authority, ..
61+
} = self.origin.clone().into_parts();
62+
63+
// Both have been checked
64+
let scheme = scheme.unwrap();
65+
let authority = authority.unwrap();
66+
67+
move |req| {
68+
// Split the request into the head and the body.
69+
let (mut head, body) = req.into_parts();
70+
71+
// Update the request URI
72+
head.uri = {
73+
// Split the request URI into parts.
74+
let mut uri: http::uri::Parts = head.uri.into();
75+
// Update the URI parts, setting the scheme and authority
76+
uri.scheme = Some(scheme);
77+
uri.authority = Some(authority);
78+
79+
http::Uri::from_parts(uri).expect("valid uri")
80+
};
81+
82+
Request::from_parts(head, body)
83+
}
84+
}
85+
}
86+
87+
const TONIC_USER_AGENT: &str = concat!("tonic/", env!("CARGO_PKG_VERSION"));
88+
89+
#[derive(Debug)]
90+
pub(crate) struct UserAgent {
91+
user_agent: HeaderValue,
92+
}
93+
94+
impl UserAgent {
95+
pub(crate) fn new(user_agent: Option<HeaderValue>) -> Self {
96+
let user_agent = user_agent
97+
.map(|value| {
98+
let mut buf = Vec::new();
99+
buf.extend(value.as_bytes());
100+
buf.push(b' ');
101+
buf.extend(TONIC_USER_AGENT.as_bytes());
102+
HeaderValue::from_bytes(&buf).expect("user-agent should be valid")
103+
})
104+
.unwrap_or_else(|| HeaderValue::from_static(TONIC_USER_AGENT));
105+
106+
Self { user_agent }
107+
}
108+
109+
pub(crate) fn into_fn(
110+
self,
111+
) -> impl FnOnce(Request<Body>) -> Request<Body> + Clone {
112+
move |mut req| {
113+
use http::header::Entry;
114+
115+
// The former code uses try_insert so we'll respect that
116+
if let Ok(entry) = req.headers_mut().try_entry(USER_AGENT) {
117+
// This is to avoid anticipative cloning which happened
118+
// in the former code
119+
match entry {
120+
Entry::Vacant(vacant_entry) => {
121+
vacant_entry.insert(self.user_agent);
122+
}
123+
Entry::Occupied(occupied_entry) => {
124+
// The User-Agent header has already been set on the request. Let's
125+
// append our user agent to the end.
126+
let occupied_entry = occupied_entry.into_mut();
127+
128+
let mut buf =
129+
Vec::with_capacity(occupied_entry.len() + 1 + self.user_agent.len());
130+
buf.extend(occupied_entry.as_bytes());
131+
buf.push(b' ');
132+
buf.extend(self.user_agent.as_bytes());
133+
134+
// with try_into http uses from_shared internally to probably minimize
135+
// allocations
136+
*occupied_entry = buf.try_into().expect("user-agent should be valid")
137+
}
138+
}
139+
}
140+
141+
req
142+
}
143+
}
144+
}
145+
146+
#[cfg(test)]
147+
mod tests {
148+
use super::*;
149+
150+
#[test]
151+
fn sets_default_if_no_custom_user_agent() {
152+
assert_eq!(
153+
UserAgent::new(None).user_agent,
154+
HeaderValue::from_static(TONIC_USER_AGENT)
155+
)
156+
}
157+
158+
#[test]
159+
fn prepends_custom_user_agent_to_default() {
160+
assert_eq!(
161+
UserAgent::new(Some(HeaderValue::from_static("Greeter 1.1"))).user_agent,
162+
HeaderValue::from_str(&format!("Greeter 1.1 {TONIC_USER_AGENT}")).unwrap()
163+
)
164+
}
165+
166+
async fn assert_user_agent_modified(
167+
genesis_user_agent: Option<impl TryInto<HeaderValue>>,
168+
expected_user_agent: impl TryInto<HeaderValue>,
169+
request: Option<Request<Body>>,
170+
) {
171+
let ua = UserAgent::new(genesis_user_agent.map(|v| {
172+
v.try_into()
173+
.unwrap_or_else(|_| panic!("invalid header value"))
174+
}))
175+
.into_fn();
176+
177+
let modified_request = ua(request.unwrap_or_default());
178+
let user_agent = modified_request.headers().get(USER_AGENT).unwrap();
179+
assert_eq!(
180+
user_agent,
181+
expected_user_agent
182+
.try_into()
183+
.unwrap_or_else(|_| panic!("invalid header value"))
184+
);
185+
}
186+
187+
#[tokio::test]
188+
async fn sets_default_user_agent_if_none_present() {
189+
let genesis_user_agent = Option::<&str>::None;
190+
let expected_user_agent = TONIC_USER_AGENT.to_string();
191+
let request = None;
192+
193+
assert_user_agent_modified(genesis_user_agent, expected_user_agent, request).await
194+
}
195+
196+
#[tokio::test]
197+
async fn sets_custom_user_agent_if_none_present() {
198+
let genesis_user_agent = Some("Greeter 1.1");
199+
let expected_user_agent = format!("Greeter 1.1 {TONIC_USER_AGENT}");
200+
let request = None;
201+
202+
assert_user_agent_modified(genesis_user_agent, expected_user_agent, request).await
203+
}
204+
205+
#[tokio::test]
206+
async fn appends_default_user_agent_to_request_fn_user_agent() {
207+
let genesis_user_agent = Option::<&str>::None;
208+
let expected_user_agent = format!("request-ua/x.y {TONIC_USER_AGENT}");
209+
let mut request = Request::default();
210+
request
211+
.headers_mut()
212+
.insert(USER_AGENT, HeaderValue::from_static("request-ua/x.y"));
213+
214+
assert_user_agent_modified(genesis_user_agent, expected_user_agent, Some(request)).await
215+
}
216+
217+
#[tokio::test]
218+
async fn appends_custom_user_agent_to_request_fn_user_agent() {
219+
let genesis_user_agent = Some("Greeter 1.1");
220+
let expected_user_agent = format!("request-ua/x.y Greeter 1.1 {TONIC_USER_AGENT}");
221+
let mut request = Request::default();
222+
request
223+
.headers_mut()
224+
.insert(USER_AGENT, HeaderValue::from_static("request-ua/x.y"));
225+
226+
assert_user_agent_modified(genesis_user_agent, expected_user_agent, Some(request)).await
227+
}
228+
}

0 commit comments

Comments
 (0)