Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
565 changes: 199 additions & 366 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ pretty_env_logger = "0.5"
runtime = { path = "crates/runtime", default-features = false }
http-service = { path = "crates/http-service" }
http-backend = { path = "crates/http-backend" }
dictionary = { path = "crates/dictionary" }
utils = { path = "crates/utils" }
secret = { path = "crates/secret" }
key-value-store = { path = "crates/key-value-store" }
hyper-tls = "0.6"
Expand Down
27 changes: 23 additions & 4 deletions crates/http-backend/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
pub mod stats;

use std::fmt::Debug;
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use std::time::Duration;

Expand All @@ -17,6 +20,7 @@ use tokio::net::TcpStream;
use tower_service::Service;
use tracing::{debug, trace, warn};

use crate::stats::{ExtRequestStats, ExtStatsTimer};
use reactor::gcore::fastedge::http::Headers;
use reactor::gcore::fastedge::{
http::{Error as HttpError, Method, Request, Response},
Expand All @@ -40,20 +44,21 @@ pub struct Connection {
/// A custom Hyper client connector, which is needed to override Hyper's default behavior of
/// connecting to host specified by the request's URI; we instead want to connect to the host
/// specified by our backend configuration, regardless of what the URI says.
#[derive(Clone, Debug)]
#[derive(Clone)]
pub struct FastEdgeConnector {
inner: HttpConnector,
backend: Uri,
}

#[derive(Clone, Debug)]
#[derive(Clone)]
pub struct Backend<C> {
client: Client<C, Full<Bytes>>,
uri: Uri,
propagate_headers: HeaderMap,
propagate_header_names: HeaderNameList,
max_sub_requests: usize,
pub strategy: BackendStrategy,
ext_http_stats: Option<Arc<dyn ExtRequestStats>>,
}

pub struct Builder {
Expand Down Expand Up @@ -81,7 +86,7 @@ impl Builder {
where
C: Connect + Clone,
{
let client = hyper_util::client::legacy::Client::builder(TokioExecutor::new())
let client = Client::builder(TokioExecutor::new())
.set_host(false)
.pool_idle_timeout(Duration::from_secs(30))
.build(connector);
Expand All @@ -90,9 +95,10 @@ impl Builder {
client,
uri: self.uri.to_owned(),
propagate_headers: HeaderMap::new(),
propagate_header_names: self.propagate_header_names.to_owned(),
propagate_header_names: self.propagate_header_names.clone(),
max_sub_requests: self.max_sub_requests,
strategy: self.strategy,
ext_http_stats: None,
}
}
}
Expand All @@ -111,6 +117,11 @@ impl<C> Backend<C> {
self.uri.to_owned()
}

/// Set external request stats
pub fn set_ext_http_stats(&mut self, stats: Arc<dyn ExtRequestStats>) {
self.ext_http_stats.replace(stats);
}

pub fn propagate_header_names(&self) -> HeaderNameList {
self.propagate_header_names.clone()
}
Expand Down Expand Up @@ -150,6 +161,7 @@ impl<C> Backend<C> {

fn make_request(&self, req: Request) -> Result<http::Request<Full<Bytes>>> {
trace!("strategy: {:?}", self.strategy);

let builder = match self.strategy {
BackendStrategy::Direct => {
let mut headers = req.headers.into_iter().collect::<Vec<(String, String)>>();
Expand Down Expand Up @@ -282,6 +294,13 @@ where
warn!(cause=?error, "making request to backend");
HttpError::RequestError
})?;

// start external request stats timer
let _stats_timer = self
.ext_http_stats
.as_ref()
.map(|s| ExtStatsTimer::new(s.clone()));

let res = self.client.request(request).await.map_err(|error| {
warn!(cause=?error, "sending request to backend");
HttpError::RequestError
Expand Down
50 changes: 50 additions & 0 deletions crates/http-backend/src/stats.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
use std::sync::Arc;
use std::time::{Duration, Instant};

pub trait ExtRequestStats: Sync + Send {
/// Observe elapsed time
fn observe_ext(&self, elapsed: Duration);
}

pub struct ExtStatsTimer {
/// A stats ref for automatic recording of observations.
stats: Arc<dyn ExtRequestStats>,
/// Whether the timer has already been observed once.
observed: bool,
/// Starting instant for the timer.
start: Instant,
}

impl ExtStatsTimer {
pub fn new(stats: Arc<dyn ExtRequestStats>) -> Self {
Self {
stats,
observed: false,
start: Instant::now(),
}
}

/// Discard timer without recording duration
pub fn discard(mut self) {
self.observed = true;
}

/// Observe and record timer duration
pub fn observe_duration(mut self) {
self.observe();
}

fn observe(&mut self) {
let v = self.start.elapsed();
self.observed = true;
self.stats.observe_ext(v);
}
}

impl Drop for ExtStatsTimer {
fn drop(&mut self) {
if !self.observed {
self.observe()
}
}
}
4 changes: 1 addition & 3 deletions crates/http-service/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ authors.workspace = true
[features]
default = []
metrics = ["runtime/metrics"]
stats = ["runtime/stats"]

[dependencies]
anyhow = { workspace = true }
Expand All @@ -25,10 +24,9 @@ smol_str = { workspace = true }
reactor = { path = "../reactor" }
runtime = { path = "../runtime" }
http-backend = { path = "../http-backend" }
dictionary = { path = "../dictionary" }
utils = { path = "../utils" }
secret = { path = "../secret" }
nanoid = "0.4"
bytesize = { workspace = true }
async-trait = "0.1"
hyper-util = { version = "0.1", features = ["server", "server-graceful"] }
http-body-util = "0.1"
Expand Down
Loading
Loading