diff --git a/crates/rproxy/src/config.rs b/crates/rproxy/src/config.rs
index 32ee416..0a116aa 100644
--- a/crates/rproxy/src/config.rs
+++ b/crates/rproxy/src/config.rs
@@ -4,7 +4,7 @@ use clap::Parser;
use thiserror::Error;
use crate::server::{
- config::{ConfigLoggingError, ConfigLogging, ConfigMetrics, ConfigMetricsError},
+ config::{ConfigLogging, ConfigLoggingError, ConfigMetrics, ConfigMetricsError},
proxy::config::{
ConfigAuthrpc,
ConfigAuthrpcError,
diff --git a/crates/rproxy/src/jrpc/jrpc_request.rs b/crates/rproxy/src/jrpc/jrpc_request.rs
index 2523e96..fba69a7 100644
--- a/crates/rproxy/src/jrpc/jrpc_request.rs
+++ b/crates/rproxy/src/jrpc/jrpc_request.rs
@@ -1,6 +1,7 @@
-use serde::Deserialize;
use std::borrow::Cow;
+use serde::Deserialize;
+
// JrpcRequestMeta -----------------------------------------------------
const JRPC_METHOD_FCUV1_WITH_PAYLOAD: Cow<'static, str> =
diff --git a/crates/rproxy/src/server.rs b/crates/rproxy/src/server.rs
index 5417f83..c6766a4 100644
--- a/crates/rproxy/src/server.rs
+++ b/crates/rproxy/src/server.rs
@@ -20,7 +20,6 @@ use crate::{
server::{
metrics::Metrics,
proxy::{
- ProxyInner,
circuit_breaker::CircuitBreaker,
config::{ConfigAuthrpc, ConfigFlashblocks, ConfigRpc},
http::{ProxyHttp, ProxyHttpInnerAuthrpc, ProxyHttpInnerRpc},
@@ -102,13 +101,14 @@ impl Server {
config,
tls,
metrics,
+ "rproxy-authrpc",
canceller.clone(),
resetter,
)
.await
.inspect_err(|err| {
error!(
- proxy = ProxyHttpInnerRpc::name(),
+ proxy = "rproxy-authrpc",
error = ?err,
"Failed to start http-proxy, terminating...",
);
@@ -130,13 +130,14 @@ impl Server {
config,
tls,
metrics,
+ "rproxy-rpc",
canceller.clone(),
resetter,
)
.await
.inspect_err(|err| {
error!(
- proxy = ProxyHttpInnerRpc::name(),
+ proxy = "rproxy-rpc",
error = ?err,
"Failed to start http-proxy, terminating...",
);
@@ -158,13 +159,14 @@ impl Server {
config,
tls,
metrics,
+ "rproxy-flashblocks",
canceller.clone(),
resetter,
)
.await
.inspect_err(|err| {
error!(
- proxy = ProxyHttpInnerRpc::name(),
+ proxy = "rproxy-flashblocks",
error = ?err,
"Failed to start websocket-proxy, terminating...",
);
diff --git a/crates/rproxy/src/server/proxy.rs b/crates/rproxy/src/server/proxy.rs
index b1fb68b..436c57a 100644
--- a/crates/rproxy/src/server/proxy.rs
+++ b/crates/rproxy/src/server/proxy.rs
@@ -21,18 +21,16 @@ use crate::server::metrics::{LabelsProxy, Metrics};
// Proxy ---------------------------------------------------------------
-pub(crate) trait Proxy
-where
- P: ProxyInner,
-{
+pub(crate) trait Proxy {
fn on_connect(
metrics: Arc,
client_connections_count: Arc,
+ proxy_name: &'static str,
) -> impl Fn(&dyn Any, &mut Extensions) {
move |connection, extensions| {
{
let val = client_connections_count.fetch_add(1, Ordering::Relaxed) + 1;
- let metric_labels = LabelsProxy { proxy: P::name() };
+ let metric_labels = LabelsProxy { proxy: proxy_name };
metrics.client_connections_active_count.get_or_create(&metric_labels).set(val);
metrics.client_connections_established_count.get_or_create(&metric_labels).inc();
@@ -54,20 +52,20 @@ where
let remote_addr = match stream.peer_addr() {
Ok(local_addr) => Some(local_addr.to_string()),
Err(err) => {
- warn!(proxy = P::name(), error = ?err, "Failed to get remote address");
+ warn!(proxy = proxy_name, error = ?err, "Failed to get remote address");
None
}
};
let local_addr = match stream.local_addr() {
Ok(local_addr) => Some(local_addr.to_string()),
Err(err) => {
- warn!(proxy = P::name(), error = ?err, "Failed to get remote address");
+ warn!(proxy = proxy_name, error = ?err, "Failed to get remote address");
None
}
};
debug!(
- proxy = P::name(),
+ proxy = proxy_name,
connection_id = %id,
remote_addr = remote_addr.as_ref().map_or("unknown", |v| v.as_str()),
local_addr = local_addr.as_ref().map_or("unknown", |v| v.as_str()),
@@ -76,7 +74,7 @@ where
extensions.insert(ProxyConnectionGuard::new(
id,
- P::name(),
+ proxy_name,
remote_addr,
local_addr,
&metrics,
@@ -87,12 +85,6 @@ where
}
}
-// ProxyInner ----------------------------------------------------------
-
-pub(crate) trait ProxyInner: 'static {
- fn name() -> &'static str;
-}
-
// ProxyConnectionGuard ------------------------------------------------
pub struct ProxyConnectionGuard {
diff --git a/crates/rproxy/src/server/proxy/http/authrpc.rs b/crates/rproxy/src/server/proxy/http/authrpc.rs
index 6d45ff8..87230b7 100644
--- a/crates/rproxy/src/server/proxy/http/authrpc.rs
+++ b/crates/rproxy/src/server/proxy/http/authrpc.rs
@@ -1,14 +1,11 @@
use crate::{
jrpc::{JrpcRequestMeta, JrpcRequestMetaMaybeBatch},
server::proxy::{
- ProxyInner,
config::ConfigAuthrpc,
http::{ProxiedHttpRequest, ProxiedHttpResponse, ProxyHttpInner},
},
};
-const PROXY_HTTP_INNER_AUTHRPC_NAME: &str = "rproxy-authrpc";
-
// ProxyHttpInnerAuthrpc -----------------------------------------------
#[derive(Clone)]
@@ -16,13 +13,6 @@ pub(crate) struct ProxyHttpInnerAuthrpc {
config: ConfigAuthrpc,
}
-impl ProxyInner for ProxyHttpInnerAuthrpc {
- #[inline]
- fn name() -> &'static str {
- PROXY_HTTP_INNER_AUTHRPC_NAME
- }
-}
-
impl ProxyHttpInner for ProxyHttpInnerAuthrpc {
fn new(config: ConfigAuthrpc) -> Self {
Self { config }
diff --git a/crates/rproxy/src/server/proxy/http/inner.rs b/crates/rproxy/src/server/proxy/http/inner.rs
index dda0bc8..1f66737 100644
--- a/crates/rproxy/src/server/proxy/http/inner.rs
+++ b/crates/rproxy/src/server/proxy/http/inner.rs
@@ -1,15 +1,11 @@
use crate::{
jrpc::JrpcRequestMetaMaybeBatch,
- server::proxy::{
- ProxyInner,
- http::{ProxiedHttpRequest, ProxiedHttpResponse, config::ConfigProxyHttp},
- },
+ server::proxy::http::{ProxiedHttpRequest, ProxiedHttpResponse, config::ConfigProxyHttp},
};
// ProxyHttpInner ------------------------------------------------------
-pub(crate) trait ProxyHttpInner:
- ProxyInner + Clone + Send + Sized + Sync + 'static
+pub(crate) trait ProxyHttpInner: Clone + Send + Sized + Sync + 'static
where
C: ConfigProxyHttp,
{
diff --git a/crates/rproxy/src/server/proxy/http/proxy.rs b/crates/rproxy/src/server/proxy/http/proxy.rs
index f491d40..807d22f 100644
--- a/crates/rproxy/src/server/proxy/http/proxy.rs
+++ b/crates/rproxy/src/server/proxy/http/proxy.rs
@@ -90,7 +90,7 @@ where
fn new(shared: ProxyHttpSharedState, connections_limit: usize) -> Self {
let id = Uuid::now_v7();
- debug!(proxy = P::name(), worker_id = %id, "Creating http-proxy worker...");
+ debug!(proxy = shared.proxy_name, worker_id = %id, "Creating http-proxy worker...");
let config = shared.config();
let inner = shared.inner();
@@ -99,6 +99,7 @@ where
inner.clone(),
id,
shared.metrics.clone(),
+ shared.proxy_name,
config.backend_url(),
connections_limit,
config.backend_timeout(),
@@ -113,6 +114,7 @@ where
shared.inner(),
id,
shared.metrics.clone(),
+ shared.proxy_name,
peer_url.to_owned(),
config.backend_max_concurrent_requests(),
config.backend_timeout(),
@@ -126,6 +128,7 @@ where
worker_id: id,
inner: inner.clone(),
metrics: shared.metrics.clone(),
+ proxy_name: shared.proxy_name,
mirroring_peers: peers.clone(),
mirroring_peer_round_robin_index: AtomicUsize::new(0),
}
@@ -138,6 +141,7 @@ where
config: C,
tls: ConfigTls,
metrics: Arc,
+ proxy_name: &'static str,
canceller: tokio_util::sync::CancellationToken,
resetter: broadcast::Sender<()>,
) -> Result<(), Box> {
@@ -147,7 +151,7 @@ where
Ok(listener) => listener,
Err(err) => {
error!(
- proxy = P::name(),
+ proxy = proxy_name,
addr = %config.listen_address(),
error = ?err,
"Failed to initialise a socket"
@@ -171,11 +175,11 @@ where
);
}
- let shared = ProxyHttpSharedState::::new(config, &metrics);
+ let shared = ProxyHttpSharedState::::new(config, &metrics, proxy_name);
let client_connections_count = shared.client_connections_count.clone();
info!(
- proxy = P::name(),
+ proxy = proxy_name,
listen_address = %listen_address,
workers_count = workers_count,
max_concurrent_requests_per_worker = max_concurrent_requests_per_worker,
@@ -191,7 +195,7 @@ where
.wrap(NormalizePath::new(TrailingSlash::Trim))
.default_service(web::route().to(Self::receive))
})
- .on_connect(Self::on_connect(metrics, client_connections_count))
+ .on_connect(Self::on_connect(metrics.clone(), client_connections_count, proxy_name))
.shutdown_signal(canceller.cancelled_owned())
.workers(workers_count);
@@ -212,7 +216,7 @@ where
Ok(server) => server,
Err(err) => {
error!(
- proxy = P::name(),
+ proxy = proxy_name,
error = ?err,
"Failed to initialise http-proxy",
);
@@ -225,16 +229,16 @@ where
let mut resetter = resetter.subscribe();
tokio::spawn(async move {
if resetter.recv().await.is_ok() {
- info!(proxy = P::name(), "Reset signal received, stopping http-proxy...");
+ info!(proxy = proxy_name, "Reset signal received, stopping http-proxy...");
handler.stop(true).await;
}
});
if let Err(err) = server.await {
- error!(proxy = P::name(), error = ?err, "Failure while running http-proxy")
+ error!(proxy = proxy_name, error = ?err, "Failure while running http-proxy")
}
- info!(proxy = P::name(), "Stopped http-proxy");
+ info!(proxy = proxy_name, "Stopped http-proxy");
Ok(())
}
@@ -307,7 +311,7 @@ where
.metrics
.client_info
.get_or_create(&LabelsProxyClientInfo {
- proxy: P::name(),
+ proxy: this.shared.proxy_name,
user_agent: user_agent.to_string(),
})
.inc();
@@ -325,7 +329,7 @@ where
Ok(res) => res,
Err(err) => {
warn!(
- proxy = P::name(),
+ proxy = this.shared.proxy_name,
request_id = %id,
connection_id = %connection_id,
worker_id = %this.id,
@@ -336,7 +340,7 @@ where
this.shared
.metrics
.http_proxy_failure_count
- .get_or_create(&LabelsProxy { proxy: P::name() })
+ .get_or_create(&LabelsProxy { proxy: this.shared.proxy_name })
.inc();
return Ok(HttpResponse::BadGateway().body(format!("Backend error: {:?}", err)));
}
@@ -364,7 +368,7 @@ where
if self.requests.insert_sync(id, req).is_err() {
error!(
- proxy = P::name(),
+ proxy = self.shared.proxy_name,
request_id = %id,
connection_id = %connection_id,
worker_id = %self.id,
@@ -378,7 +382,7 @@ where
Some((_, req)) => req,
None => {
error!(
- proxy = P::name(),
+ proxy = self.shared.proxy_name,
request_id = %bck_res.info.id,
worker_id = %self.id,
"Proxied http response for unmatching request",
@@ -392,12 +396,14 @@ where
self.postprocessor.do_send(ProxiedHttpCombo { req: cli_req, res: bck_res });
}
+ #[expect(clippy::too_many_arguments)]
fn finalise_proxying(
mut cli_req: ProxiedHttpRequest,
mut bck_res: ProxiedHttpResponse,
inner: Arc,
worker_id: Uuid,
metrics: Arc,
+ proxy_name: &'static str,
mirroring_peers: Arc>>>,
mut mirroring_peer_round_robin_index: usize,
) {
@@ -438,15 +444,22 @@ where
&cli_req,
&bck_res,
inner.clone(),
+ proxy_name,
worker_id,
);
- Self::emit_metrics_on_proxy_success(&jrpc, &cli_req, &bck_res, metrics.clone());
+ Self::emit_metrics_on_proxy_success(
+ &jrpc,
+ &cli_req,
+ &bck_res,
+ metrics.clone(),
+ proxy_name,
+ );
}
Err(err) => {
warn!(
- proxy = P::name(),
+ proxy = proxy_name,
request_id = %cli_req.info.id,
connection_id = %cli_req.info.connection_id,
worker_id = %worker_id,
@@ -462,6 +475,7 @@ where
mut mrr_res: ProxiedHttpResponse,
inner: Arc,
metrics: Arc,
+ proxy_name: &'static str,
worker_id: Uuid,
) {
if cli_req.decompressed_size < cli_req.size {
@@ -474,12 +488,12 @@ where
decompress(mrr_res.body.clone(), mrr_res.size, mrr_res.info.content_encoding());
}
- Self::maybe_log_mirrored_request(&cli_req, &mrr_res, worker_id, inner.config());
+ Self::maybe_log_mirrored_request(&cli_req, &mrr_res, proxy_name, worker_id, inner.config());
metrics
.http_mirror_success_count
.get_or_create(&LabelsProxyHttpJrpc {
- proxy: P::name(),
+ proxy: proxy_name,
jrpc_method: cli_req.info.jrpc_method_enriched,
})
.inc();
@@ -490,6 +504,7 @@ where
req: &ProxiedHttpRequest,
res: &ProxiedHttpResponse,
inner: Arc,
+ proxy_name: &'static str,
worker_id: Uuid,
) {
let config = inner.config();
@@ -513,7 +528,7 @@ where
};
info!(
- proxy = P::name(),
+ proxy = proxy_name,
request_id = %req.info.id,
connection_id = %req.info.connection_id,
worker_id = %worker_id,
@@ -532,6 +547,7 @@ where
fn maybe_log_mirrored_request(
req: &ProxiedHttpRequest,
res: &ProxiedHttpResponse,
+ proxy_name: &'static str,
worker_id: Uuid,
config: &C,
) {
@@ -554,7 +570,7 @@ where
};
info!(
- proxy = P::name(),
+ proxy = proxy_name,
request_id = %req.info.id,
connection_id = %req.info.connection_id,
worker_id = %worker_id,
@@ -723,14 +739,15 @@ where
req: &ProxiedHttpRequest,
res: &ProxiedHttpResponse,
metrics: Arc,
+ proxy_name: &'static str,
) {
let metric_labels_jrpc = match jrpc {
JrpcRequestMetaMaybeBatch::Single(jrpc) => {
- LabelsProxyHttpJrpc { jrpc_method: jrpc.method_enriched(), proxy: P::name() }
+ LabelsProxyHttpJrpc { jrpc_method: jrpc.method_enriched(), proxy: proxy_name }
}
JrpcRequestMetaMaybeBatch::Batch(_) => {
- LabelsProxyHttpJrpc { jrpc_method: Cow::Borrowed("batch"), proxy: P::name() }
+ LabelsProxyHttpJrpc { jrpc_method: Cow::Borrowed("batch"), proxy: proxy_name }
}
};
@@ -765,7 +782,7 @@ where
for jrpc in batch.iter() {
let metric_labels_jrpc = LabelsProxyHttpJrpc {
jrpc_method: jrpc.method_enriched(),
- proxy: P::name(),
+ proxy: proxy_name,
};
metrics.http_proxy_success_count.get_or_create(&metric_labels_jrpc).inc();
}
@@ -792,7 +809,7 @@ where
}
}
-impl Proxy for ProxyHttp
+impl Proxy for ProxyHttp
where
C: ConfigProxyHttp,
P: ProxyHttpInner,
@@ -806,7 +823,7 @@ where
{
fn drop(&mut self) {
debug!(
- proxy = P::name(),
+ proxy = self.shared.proxy_name,
worker_id = %self.id,
"Destroying http-proxy worker...",
);
@@ -823,6 +840,7 @@ where
{
inner: Arc,
metrics: Arc,
+ proxy_name: &'static str,
client_connections_count: Arc,
@@ -834,10 +852,11 @@ where
C: ConfigProxyHttp,
P: ProxyHttpInner,
{
- fn new(config: C, metrics: &Arc) -> Self {
+ fn new(config: C, metrics: &Arc, proxy_name: &'static str) -> Self {
Self {
inner: Arc::new(P::new(config)),
metrics: metrics.clone(),
+ proxy_name,
client_connections_count: Arc::new(AtomicI64::new(0)),
_config: PhantomData,
}
@@ -864,6 +883,7 @@ where
inner: Arc,
worker_id: Uuid,
metrics: Arc,
+ proxy_name: &'static str,
/// mirroring_peers is the vector of endpoints for mirroring peers.
mirroring_peers: Arc>>>,
@@ -896,6 +916,7 @@ where
fn handle(&mut self, msg: ProxiedHttpCombo, ctx: &mut Self::Context) -> Self::Result {
let inner = self.inner.clone();
let metrics = self.metrics.clone();
+ let proxy_name = self.proxy_name;
let worker_id = self.worker_id;
let mirroring_peers = self.mirroring_peers.clone();
let mut mirroring_peer_round_robin_index =
@@ -909,6 +930,7 @@ where
inner,
worker_id,
metrics,
+ proxy_name,
mirroring_peers,
mirroring_peer_round_robin_index,
);
@@ -935,6 +957,7 @@ where
inner: Arc,
worker_id: Uuid,
metrics: Arc,
+ proxy_name: &'static str,
client: Client,
url: Url,
@@ -951,6 +974,7 @@ where
inner: Arc,
worker_id: Uuid,
metrics: Arc,
+ proxy_name: &'static str,
url: Url,
connections_limit: usize,
timeout: std::time::Duration,
@@ -966,7 +990,7 @@ where
.timeout(timeout)
.finish();
- Self { inner, worker_id, metrics, client, url, _config: PhantomData }
+ Self { inner, worker_id, metrics, proxy_name, client, url, _config: PhantomData }
}
fn new_backend_request(&self, info: &ProxyHttpRequestInfo) -> ClientRequest {
@@ -1008,6 +1032,7 @@ where
let inner = self.inner.clone();
let worker_id = self.worker_id;
let metrics = self.metrics.clone();
+ let proxy_name = self.proxy_name;
let mrr_req = self.new_backend_request(&cli_req.info);
let mrr_req_body = cli_req.body.clone();
@@ -1040,12 +1065,12 @@ where
end,
};
ProxyHttp::::postprocess_mirrored_response(
- cli_req, mrr_res, inner, metrics, worker_id,
+ cli_req, mrr_res, inner, metrics, proxy_name, worker_id,
);
}
Err(err) => {
warn!(
- proxy = P::name(),
+ proxy = proxy_name,
request_id = %cli_req.info.id,
connection_id = %cli_req.info.connection_id,
error = ?err,
@@ -1053,7 +1078,7 @@ where
);
metrics
.http_mirror_failure_count
- .get_or_create(&LabelsProxy { proxy: P::name() })
+ .get_or_create(&LabelsProxy { proxy: proxy_name })
.inc();
}
};
@@ -1061,7 +1086,7 @@ where
Err(err) => {
warn!(
- proxy = P::name(),
+ proxy = proxy_name,
request_id = %cli_req.info.id,
connection_id = %cli_req.info.connection_id,
error = ?err,
@@ -1069,7 +1094,7 @@ where
);
metrics
.http_mirror_failure_count
- .get_or_create(&LabelsProxy { proxy: P::name() })
+ .get_or_create(&LabelsProxy { proxy: proxy_name })
.inc();
}
}
@@ -1287,7 +1312,7 @@ where
Poll::Ready(Some(Err(err))) => {
if let Some(info) = mem::take(this.info) {
warn!(
- proxy = P::name(),
+ proxy = this.proxy.shared.proxy_name,
request_id = %info.id(),
connection_id = %info.connection_id(),
error = ?err,
@@ -1295,7 +1320,7 @@ where
);
} else {
warn!(
- proxy = P::name(),
+ proxy = this.proxy.shared.proxy_name,
error = ?err,
request_id = "unknown",
"Proxy http request stream error",
@@ -1385,14 +1410,14 @@ where
Poll::Ready(Some(Err(err))) => {
if let Some(info) = mem::take(this.info) {
warn!(
- proxy = P::name(),
+ proxy = this.proxy.shared.proxy_name,
request_id = %info.id(),
error = ?err,
"Proxy http response stream error",
);
} else {
warn!(
- proxy = P::name(),
+ proxy = this.proxy.shared.proxy_name,
error = ?err,
request_id = "unknown",
"Proxy http response stream error",
diff --git a/crates/rproxy/src/server/proxy/http/rpc.rs b/crates/rproxy/src/server/proxy/http/rpc.rs
index fb80257..59eb2e5 100644
--- a/crates/rproxy/src/server/proxy/http/rpc.rs
+++ b/crates/rproxy/src/server/proxy/http/rpc.rs
@@ -3,14 +3,11 @@ use tracing::warn;
use crate::{
jrpc::{JrpcError, JrpcRequestMeta, JrpcRequestMetaMaybeBatch, JrpcResponseMeta},
server::proxy::{
- ProxyInner,
config::ConfigRpc,
http::{ProxiedHttpRequest, ProxiedHttpResponse, ProxyHttpInner},
},
};
-const PROXY_HTTP_INNER_RPC_NAME: &str = "rproxy-rpc";
-
// ProxyHttpInnerRpc ---------------------------------------------------
#[derive(Clone)]
@@ -18,13 +15,6 @@ pub(crate) struct ProxyHttpInnerRpc {
config: ConfigRpc,
}
-impl ProxyInner for ProxyHttpInnerRpc {
- #[inline]
- fn name() -> &'static str {
- PROXY_HTTP_INNER_RPC_NAME
- }
-}
-
impl ProxyHttpInner for ProxyHttpInnerRpc {
fn new(config: ConfigRpc) -> Self {
Self { config }
@@ -54,20 +44,20 @@ impl ProxyHttpInner for ProxyHttpInnerRpc {
match jrpc_req {
JrpcRequestMetaMaybeBatch::Single(jrpc_req_single) => {
- let jrpc_res_single = match serde_json::from_slice::(
- &http_res.decompressed_body(),
- ) {
- Ok(jrpc_response) => jrpc_response,
- Err(err) => {
- warn!(proxy = Self::name(), error = ?err, "Failed to parse json-rpc response");
+ let jrpc_res_single =
+ match serde_json::from_slice::(&http_res.decompressed_body())
+ {
+ Ok(jrpc_response) => jrpc_response,
+ Err(err) => {
+ warn!(error = ?err, "Failed to parse json-rpc response");
- return should_mirror(
- jrpc_req_single,
- &JrpcResponseMeta { error: Some(JrpcError {}) },
- self.config.mirror_errored_requests,
- );
- }
- };
+ return should_mirror(
+ jrpc_req_single,
+ &JrpcResponseMeta { error: Some(JrpcError {}) },
+ self.config.mirror_errored_requests,
+ );
+ }
+ };
should_mirror(
jrpc_req_single,
@@ -82,14 +72,13 @@ impl ProxyHttpInner for ProxyHttpInnerRpc {
) {
Ok(jrpc_response) => jrpc_response,
Err(err) => {
- warn!(proxy = Self::name(), error = ?err, "Failed to parse json-rpc response");
+ warn!(error = ?err, "Failed to parse json-rpc response");
vec![JrpcResponseMeta { error: Some(JrpcError {}) }; jrpc_req_batch.len()]
}
};
if jrpc_res_batch.len() != jrpc_req_batch.len() {
warn!(
- proxy = Self::name(),
"A response to jrpc-batch has mismatching count of objects (want: {}, got: {})",
jrpc_req_batch.len(),
jrpc_res_batch.len(),
diff --git a/crates/rproxy/src/server/proxy/ws/flashblocks.rs b/crates/rproxy/src/server/proxy/ws/flashblocks.rs
index 6af6476..bf9c51a 100644
--- a/crates/rproxy/src/server/proxy/ws/flashblocks.rs
+++ b/crates/rproxy/src/server/proxy/ws/flashblocks.rs
@@ -1,5 +1,4 @@
-use crate::server::proxy::{ProxyInner, config::ConfigFlashblocks, ws::ProxyWsInner};
-const PROXY_WS_FLASHBLOCKS_RPC_NAME: &str = "rproxy-flashblocks";
+use crate::server::proxy::{config::ConfigFlashblocks, ws::ProxyWsInner};
// ProxyWsInnerFlashblocks ---------------------------------------------
@@ -8,12 +7,6 @@ pub(crate) struct ProxyWsInnerFlashblocks {
config: ConfigFlashblocks,
}
-impl ProxyInner for ProxyWsInnerFlashblocks {
- fn name() -> &'static str {
- PROXY_WS_FLASHBLOCKS_RPC_NAME
- }
-}
-
impl ProxyWsInner for ProxyWsInnerFlashblocks {
fn new(config: ConfigFlashblocks) -> Self {
Self { config }
diff --git a/crates/rproxy/src/server/proxy/ws/inner.rs b/crates/rproxy/src/server/proxy/ws/inner.rs
index c655eea..69726a5 100644
--- a/crates/rproxy/src/server/proxy/ws/inner.rs
+++ b/crates/rproxy/src/server/proxy/ws/inner.rs
@@ -1,8 +1,8 @@
-use crate::server::proxy::{ProxyInner, ws::config::ConfigProxyWs};
+use crate::server::proxy::ws::config::ConfigProxyWs;
// ProxyWsInner --------------------------------------------------------
-pub(crate) trait ProxyWsInner: ProxyInner + Clone + Send + Sync
+pub(crate) trait ProxyWsInner: Clone + Send + Sync + 'static
where
C: ConfigProxyWs,
{
diff --git a/crates/rproxy/src/server/proxy/ws/proxy.rs b/crates/rproxy/src/server/proxy/ws/proxy.rs
index cea8955..2abc1f5 100644
--- a/crates/rproxy/src/server/proxy/ws/proxy.rs
+++ b/crates/rproxy/src/server/proxy/ws/proxy.rs
@@ -98,11 +98,12 @@ where
let config = shared.config();
- let backend = ProxyWsBackendEndpoint::new(id, config.backend_url());
+ let backend = ProxyWsBackendEndpoint::new(id, shared.proxy_name, config.backend_url());
let postprocessor = ProxyWsPostprocessor:: {
inner: shared.inner.clone(),
metrics: shared.metrics.clone(),
+ proxy_name: shared.proxy_name,
worker_id: id,
_config: PhantomData,
}
@@ -131,6 +132,7 @@ where
config: C,
tls: ConfigTls,
metrics: Arc,
+ proxy_name: &'static str,
canceller: tokio_util::sync::CancellationToken,
resetter: broadcast::Sender<()>,
) -> Result<(), Box> {
@@ -140,7 +142,7 @@ where
Ok(listener) => listener,
Err(err) => {
error!(
- proxy = P::name(),
+ proxy = proxy_name,
addr = %config.listen_address(),
error = ?err,
"Failed to initialise a socket"
@@ -151,13 +153,13 @@ where
let workers_count = PARALLELISM.to_static();
- let shared = ProxyWsSharedState::::new(config, &metrics);
+ let shared = ProxyWsSharedState::::new(config, &metrics, proxy_name);
let client_connections_count = shared.client_connections_count.clone();
let worker_canceller = canceller.clone();
let worker_resetter = resetter.clone();
info!(
- proxy = P::name(),
+ proxy = proxy_name,
listen_address = %listen_address,
workers_count = workers_count,
"Starting websocket-proxy...",
@@ -175,7 +177,7 @@ where
.wrap(NormalizePath::new(TrailingSlash::Trim))
.default_service(web::route().to(Self::receive))
})
- .on_connect(Self::on_connect(metrics, client_connections_count))
+ .on_connect(Self::on_connect(metrics.clone(), client_connections_count, proxy_name))
.shutdown_signal(canceller.cancelled_owned())
.workers(workers_count);
@@ -195,7 +197,7 @@ where
} {
Ok(server) => server,
Err(err) => {
- error!(proxy = P::name(), error = ?err, "Failed to initialise websocket-proxy");
+ error!(proxy = proxy_name, error = ?err, "Failed to initialise websocket-proxy");
return Err(Box::new(err));
}
}
@@ -205,16 +207,16 @@ where
let mut resetter = resetter.subscribe();
tokio::spawn(async move {
if resetter.recv().await.is_ok() {
- info!(proxy = P::name(), "Reset signal received, stopping websocket-proxy...");
+ info!(proxy = proxy_name, "Reset signal received, stopping websocket-proxy...");
handler.stop(true).await;
}
});
if let Err(err) = proxy.await {
- error!(proxy = P::name(), error = ?err, "Failure while running websocket-proxy")
+ error!(proxy = proxy_name, error = ?err, "Failure while running websocket-proxy")
}
- info!(proxy = P::name(), "Stopped websocket-proxy");
+ info!(proxy = proxy_name, "Stopped websocket-proxy");
Ok(())
}
@@ -253,7 +255,7 @@ where
Ok(res) => res,
Err(err) => {
error!(
- proxy = P::name(),
+ proxy = this.shared.proxy_name,
request_id = %info.id(),
connection_id = %info.connection_id(),
worker_id = %this.id,
@@ -277,7 +279,7 @@ where
) {
let bck_uri = this.backend.new_backend_uri(&info);
trace!(
- proxy = P::name(),
+ proxy = this.shared.proxy_name,
request_id = %info.id(),
connection_id = %info.connection_id(),
worker_id = %this.id,
@@ -295,7 +297,7 @@ where
Ok(Err(err)) => {
error!(
- proxy = P::name(),
+ proxy = this.shared.proxy_name,
request_id = %info.id(),
connection_id = %info.connection_id(),
worker_id = %this.id,
@@ -311,7 +313,7 @@ where
.await
{
error!(
- proxy = P::name(),
+ proxy = this.shared.proxy_name,
request_id = %info.id(),
connection_id = %info.connection_id(),
worker_id = %this.id,
@@ -324,7 +326,7 @@ where
Err(_) => {
error!(
- proxy = P::name(),
+ proxy = this.shared.proxy_name,
request_id = %info.id(),
connection_id = %info.connection_id(),
worker_id = %this.id,
@@ -339,7 +341,7 @@ where
.await
{
error!(
- proxy = P::name(),
+ proxy = this.shared.proxy_name,
request_id = %info.id(),
connection_id = %info.connection_id(),
worker_id = %this.id,
@@ -365,7 +367,7 @@ where
mut bck_rx: SplitStream>>,
) {
info!(
- proxy = P::name(),
+ proxy = this.shared.proxy_name,
connection_id = %info.connection_id(),
worker_id = %this.id,
"Starting websocket pump..."
@@ -424,7 +426,7 @@ where
msg != WS_CLOSE_OK
{
debug!(
- proxy = P::name(),
+ proxy = this.shared.proxy_name,
connection_id = %info.connection_id(),
worker_id = %this.id,
msg = %msg,
@@ -438,7 +440,7 @@ where
.await
{
error!(
- proxy = P::name(),
+ proxy = this.shared.proxy_name,
connection_id = %info.connection_id(),
worker_id = %this.id,
msg = %msg,
@@ -448,7 +450,7 @@ where
}
debug!(
- proxy = P::name(),
+ proxy = this.shared.proxy_name,
connection_id = %info.connection_id(),
worker_id = %this.id,
msg = %msg,
@@ -462,7 +464,7 @@ where
.await
{
error!(
- proxy = P::name(),
+ proxy = this.shared.proxy_name,
connection_id = %info.connection_id(),
worker_id = %this.id,
msg = %msg,
@@ -472,7 +474,7 @@ where
}
} else {
debug!(
- proxy = P::name(),
+ proxy = this.shared.proxy_name,
connection_id = %info.connection_id(),
worker_id = %this.id,
"Closing client websocket session..."
@@ -485,7 +487,7 @@ where
.await
{
error!(
- proxy = P::name(),
+ proxy = this.shared.proxy_name,
connection_id = %info.connection_id(),
worker_id = %this.id,
error = ?err,
@@ -494,7 +496,7 @@ where
}
debug!(
- proxy = P::name(),
+ proxy = this.shared.proxy_name,
connection_id = %info.connection_id(),
worker_id = %this.id,
"Closing backend websocket session..."
@@ -507,7 +509,7 @@ where
.await
{
error!(
- proxy = P::name(),
+ proxy = this.shared.proxy_name,
connection_id = %info.connection_id(),
worker_id = %this.id,
error = ?err,
@@ -517,7 +519,7 @@ where
}
info!(
- proxy = P::name(),
+ proxy = this.shared.proxy_name,
connection_id = %info.connection_id(),
worker_id = %this.id,
"Stopped websocket pump"
@@ -538,7 +540,7 @@ where
if this.ping_balance_cli.load(Ordering::Relaxed) > ping_threshold {
error!(
- proxy = P::name(),
+ proxy = this.shared.proxy_name,
connection_id = %info.connection_id(),
worker_id = %this.id,
"More than {} websocket pings sent to client didn't return, terminating the pump...", ping_threshold,
@@ -549,7 +551,7 @@ where
let cli_ping = ProxyWsPing::new(info.connection_id());
if let Err(err) = cli_tx.ping(&cli_ping.to_slice()).await {
error!(
- proxy = P::name(),
+ proxy = this.shared.proxy_name,
connection_id = %info.connection_id(),
worker_id = %this.id,
error = ?err,
@@ -566,7 +568,7 @@ where
if this.ping_balance_bck.load(Ordering::Relaxed) > ping_threshold {
error!(
- proxy = P::name(),
+ proxy = this.shared.proxy_name,
connection_id = %info.connection_id(),
worker_id = %this.id,
"More than {} websocket pings sent to backend didn't return, terminating the pump...", ping_threshold,
@@ -577,7 +579,7 @@ where
let bck_ping = ProxyWsPing::new(info.connection_id());
if let Err(err) = bck_tx.send(tungstenite::Message::Ping(bck_ping.to_bytes())).await {
error!(
- proxy = P::name(),
+ proxy = this.shared.proxy_name,
connection_id = %info.connection_id(),
worker_id = %this.id,
error = ?err,
@@ -608,7 +610,7 @@ where
bck_tx.send(tungstenite::Message::Binary(bytes.clone())).await
{
error!(
- proxy = P::name(),
+ proxy = this.shared.proxy_name,
connection_id = %info.connection_id(),
worker_id = %this.id,
error = ?err,
@@ -618,7 +620,7 @@ where
.metrics
.ws_proxy_failure_count
.get_or_create(&LabelsProxyWs {
- proxy: P::name(),
+ proxy: this.shared.proxy_name,
destination: WS_LABEL_BACKEND,
})
.inc();
@@ -645,7 +647,7 @@ where
.await
{
error!(
- proxy = P::name(),
+ proxy = this.shared.proxy_name,
connection_id = %info.connection_id(),
worker_id = %this.id,
error = ?err,
@@ -655,7 +657,7 @@ where
.metrics
.ws_proxy_failure_count
.get_or_create(&LabelsProxyWs {
- proxy: P::name(),
+ proxy: this.shared.proxy_name,
destination: WS_LABEL_BACKEND,
})
.inc();
@@ -674,7 +676,7 @@ where
actix_ws::Message::Ping(bytes) => {
if let Err(err) = cli_tx.pong(&bytes).await {
error!(
- proxy = P::name(),
+ proxy = this.shared.proxy_name,
connection_id = %info.connection_id(),
worker_id = %this.id,
error = ?err,
@@ -696,7 +698,7 @@ where
.metrics
.ws_latency_client
.get_or_create(&LabelsProxyWs {
- proxy: P::name(),
+ proxy: this.shared.proxy_name,
destination: WS_LABEL_BACKEND,
})
.record(
@@ -706,7 +708,7 @@ where
return Ok(());
}
warn!(
- proxy = P::name(),
+ proxy = this.shared.proxy_name,
connection_id = %info.connection_id(),
worker_id = %this.id,
"Unexpected websocket pong received from client",
@@ -728,7 +730,7 @@ where
.await
{
error!(
- proxy = P::name(),
+ proxy = this.shared.proxy_name,
connection_id = %info.connection_id(),
worker_id = %this.id,
error = ?err,
@@ -745,7 +747,7 @@ where
Some(Err(err)) => {
error!(
- proxy = P::name(),
+ proxy = this.shared.proxy_name,
connection_id = %info.connection_id(),
worker_id = %this.id,
error = ?err,
@@ -756,7 +758,7 @@ where
None => {
info!(
- proxy = P::name(),
+ proxy = this.shared.proxy_name,
connection_id = %info.connection_id(),
worker_id = %this.id,
"Client had closed websocket stream"
@@ -781,7 +783,7 @@ where
tungstenite::Message::Binary(bytes) => {
if let Err(err) = cli_tx.binary(bytes.clone()).await {
error!(
- proxy = P::name(),
+ proxy = this.shared.proxy_name,
connection_id = %info.connection_id(),
worker_id = %this.id,
error = ?err,
@@ -791,7 +793,7 @@ where
.metrics
.ws_proxy_failure_count
.get_or_create(&LabelsProxyWs {
- proxy: P::name(),
+ proxy: this.shared.proxy_name,
destination: WS_LABEL_CLIENT,
})
.inc();
@@ -810,7 +812,7 @@ where
tungstenite::Message::Text(text) => {
if let Err(err) = cli_tx.text(text.clone().as_str()).await {
error!(
- proxy = P::name(),
+ proxy = this.shared.proxy_name,
connection_id = %info.connection_id(),
worker_id = %this.id,
error = ?err,
@@ -820,7 +822,7 @@ where
.metrics
.ws_proxy_failure_count
.get_or_create(&LabelsProxyWs {
- proxy: P::name(),
+ proxy: this.shared.proxy_name,
destination: WS_LABEL_CLIENT,
})
.inc();
@@ -839,7 +841,7 @@ where
tungstenite::Message::Ping(bytes) => {
if let Err(err) = bck_tx.send(tungstenite::Message::Pong(bytes)).await {
error!(
- proxy = P::name(),
+ proxy = this.shared.proxy_name,
connection_id = %info.connection_id(),
worker_id = %this.id,
error = ?err,
@@ -861,7 +863,7 @@ where
.metrics
.ws_latency_backend
.get_or_create(&LabelsProxyWs {
- proxy: P::name(),
+ proxy: this.shared.proxy_name,
destination: WS_LABEL_BACKEND,
})
.record(
@@ -871,7 +873,7 @@ where
return Ok(());
}
warn!(
- proxy = P::name(),
+ proxy = this.shared.proxy_name,
connection_id = %info.connection_id(),
worker_id = %this.id,
"Unexpected websocket pong received from backend",
@@ -890,7 +892,7 @@ where
.await
{
error!(
- proxy = P::name(),
+ proxy = this.shared.proxy_name,
connection_id = %info.connection_id(),
worker_id = %this.id,
error = ?err,
@@ -907,7 +909,7 @@ where
Some(Err(err)) => {
error!(
- proxy = P::name(),
+ proxy = this.shared.proxy_name,
connection_id = %info.connection_id(),
worker_id = %this.id,
error = ?err,
@@ -918,7 +920,7 @@ where
None => {
info!(
- proxy = P::name(),
+ proxy = this.shared.proxy_name,
connection_id = %info.connection_id(),
worker_id = %this.id,
"Backend had closed websocket stream"
@@ -932,14 +934,20 @@ where
msg: ProxyWsMessage,
inner: Arc,
metrics: Arc,
+ proxy_name: &'static str,
worker_id: Uuid,
) {
- Self::maybe_log_proxied_message(&msg, inner.clone(), worker_id);
+ Self::maybe_log_proxied_message(&msg, inner.clone(), proxy_name, worker_id);
- Self::emit_metrics_on_proxy_success(&msg, metrics.clone());
+ Self::emit_metrics_on_proxy_success(&msg, metrics.clone(), proxy_name);
}
- fn maybe_log_proxied_message(msg: &ProxyWsMessage, inner: Arc, worker_id: Uuid) {
+ fn maybe_log_proxied_message(
+ msg: &ProxyWsMessage,
+ inner: Arc
,
+ proxy_name: &'static str,
+ worker_id: Uuid,
+ ) {
let config = inner.config();
match msg {
@@ -954,7 +962,7 @@ where
};
info!(
- proxy = P::name(),
+ proxy = proxy_name,
connection_id = %info.connection_id(),
worker_id = %worker_id,
remote_addr = info.remote_addr(),
@@ -976,7 +984,7 @@ where
};
info!(
- proxy = P::name(),
+ proxy = proxy_name,
connection_id = %info.connection_id(),
worker_id = %worker_id,
remote_addr = info.remote_addr(),
@@ -998,7 +1006,7 @@ where
};
info!(
- proxy = P::name(),
+ proxy = proxy_name,
connection_id = %info.connection_id(),
worker_id = %worker_id,
remote_addr = info.remote_addr(),
@@ -1020,7 +1028,7 @@ where
};
info!(
- proxy = P::name(),
+ proxy = proxy_name,
connection_id = %info.connection_id(),
worker_id = %worker_id,
remote_addr = info.remote_addr(),
@@ -1051,10 +1059,14 @@ where
message
}
- fn emit_metrics_on_proxy_success(msg: &ProxyWsMessage, metrics: Arc) {
+ fn emit_metrics_on_proxy_success(
+ msg: &ProxyWsMessage,
+ metrics: Arc,
+ proxy_name: &'static str,
+ ) {
match msg {
ProxyWsMessage::BackendToClientBinary { msg, info: _, start, end } => {
- let labels = LabelsProxyWs { proxy: P::name(), destination: WS_LABEL_CLIENT };
+ let labels = LabelsProxyWs { proxy: proxy_name, destination: WS_LABEL_CLIENT };
metrics
.ws_latency_proxy
.get_or_create(&labels)
@@ -1064,7 +1076,7 @@ where
}
ProxyWsMessage::BackendToClientText { msg, info: _, start, end } => {
- let labels = LabelsProxyWs { proxy: P::name(), destination: WS_LABEL_CLIENT };
+ let labels = LabelsProxyWs { proxy: proxy_name, destination: WS_LABEL_CLIENT };
metrics
.ws_latency_proxy
.get_or_create(&labels)
@@ -1074,7 +1086,7 @@ where
}
ProxyWsMessage::ClientToBackendBinary { msg, info: _, start, end } => {
- let labels = LabelsProxyWs { proxy: P::name(), destination: WS_LABEL_BACKEND };
+ let labels = LabelsProxyWs { proxy: proxy_name, destination: WS_LABEL_BACKEND };
metrics
.ws_latency_proxy
.get_or_create(&labels)
@@ -1084,7 +1096,7 @@ where
}
ProxyWsMessage::ClientToBackendText { msg, info: _, start, end } => {
- let labels = LabelsProxyWs { proxy: P::name(), destination: WS_LABEL_BACKEND };
+ let labels = LabelsProxyWs { proxy: proxy_name, destination: WS_LABEL_BACKEND };
metrics
.ws_latency_proxy
.get_or_create(&labels)
@@ -1096,7 +1108,7 @@ where
}
}
-impl Proxy for ProxyWs
+impl Proxy for ProxyWs
where
C: ConfigProxyWs,
P: ProxyWsInner,
@@ -1113,6 +1125,7 @@ where
{
inner: Arc,
metrics: Arc,
+ proxy_name: &'static str,
client_connections_count: Arc,
@@ -1124,10 +1137,11 @@ where
C: ConfigProxyWs,
P: ProxyWsInner,
{
- fn new(config: C, metrics: &Arc) -> Self {
+ fn new(config: C, metrics: &Arc, proxy_name: &'static str) -> Self {
Self {
inner: Arc::new(P::new(config)),
metrics: metrics.clone(),
+ proxy_name,
client_connections_count: Arc::new(AtomicI64::new(0)),
_config: PhantomData,
}
@@ -1147,7 +1161,7 @@ where
P: ProxyWsInner,
{
worker_id: Uuid,
-
+ proxy_name: &'static str,
url: tungstenite::http::Uri,
_config: PhantomData,
@@ -1159,8 +1173,8 @@ where
C: ConfigProxyWs,
P: ProxyWsInner,
{
- fn new(worker_id: Uuid, url: tungstenite::http::Uri) -> Self {
- Self { worker_id, url, _config: PhantomData, _inner: PhantomData }
+ fn new(worker_id: Uuid, proxy_name: &'static str, url: tungstenite::http::Uri) -> Self {
+ Self { worker_id, proxy_name, url, _config: PhantomData, _inner: PhantomData }
}
fn new_backend_uri(&self, info: &ProxyHttpRequestInfo) -> tungstenite::http::Uri {
@@ -1168,7 +1182,7 @@ where
let pq = tungstenite::http::uri::PathAndQuery::from_str(info.path_and_query())
.inspect_err(|err| {
error!(
- proxy = P::name(),
+ proxy = self.proxy_name,
request_id = %info.id(),
connection_id = %info.connection_id(),
worker_id = %self.worker_id,
@@ -1182,7 +1196,7 @@ where
tungstenite::http::Uri::from_parts(parts)
.inspect_err(|err| {
error!(
- proxy = P::name(),
+ proxy = self.proxy_name,
request_id = %info.id(),
connection_id = %info.connection_id(),
worker_id = %self.worker_id,
@@ -1203,6 +1217,7 @@ where
inner: Arc,
worker_id: Uuid,
metrics: Arc,
+ proxy_name: &'static str,
_config: PhantomData,
}
@@ -1229,11 +1244,12 @@ where
fn handle(&mut self, msg: ProxyWsMessage, ctx: &mut Self::Context) -> Self::Result {
let inner = self.inner.clone();
let metrics = self.metrics.clone();
+ let proxy_name = self.proxy_name;
let worker_id = self.worker_id;
ctx.spawn(
async move {
- ProxyWs::::finalise_proxying(msg, inner, metrics, worker_id);
+ ProxyWs::::finalise_proxying(msg, inner, metrics, proxy_name, worker_id);
}
.into_actor(self),
);