diff --git a/orion-configuration/src/config.rs b/orion-configuration/src/config.rs index 572af746..05ed45b8 100644 --- a/orion-configuration/src/config.rs +++ b/orion-configuration/src/config.rs @@ -57,7 +57,11 @@ impl Config { let runtime = self.runtime.update_from_env_and_options(opt); let max_cpus = num_cpus::get(); if runtime.num_cpus() > max_cpus { - tracing::warn!(max_cpus, ORION_GATEWAY_CORES = runtime.num_cpus(), "Requested more cores than available CPUs"); + tracing::warn!( + max_cpus, + ORION_GATEWAY_CORES = runtime.num_cpus(), + "Requested more cores than available CPUs" + ); } if runtime.num_runtimes() > runtime.num_cpus() { tracing::warn!( diff --git a/orion-proxy/tests/configs.rs b/orion-proxy/tests/configs.rs index 57600996..49869309 100644 --- a/orion-proxy/tests/configs.rs +++ b/orion-proxy/tests/configs.rs @@ -32,7 +32,6 @@ fn check_config_file(file_path: &str) -> Result<(), orion_error::Error> { with_current_dir(&d, || get_listeners_and_clusters(bootstrap).map(|_| ())) } - #[traced_test] #[test] fn bootstrap_demo_static() -> Result<(), orion_error::Error> { diff --git a/orion-xds/examples/server.rs b/orion-xds/examples/server.rs index a898a3a4..a4361770 100644 --- a/orion-xds/examples/server.rs +++ b/orion-xds/examples/server.rs @@ -11,13 +11,13 @@ async fn main() -> Result<(), Box> { .with(tracing_subscriber::fmt::layer()) .init(); - let (delta_resource_tx, delta_resources_rx) = tokio::sync::mpsc::channel(100); - let (_stream_resource_tx, stream_resources_rx) = tokio::sync::mpsc::channel(100); + let (delta_resource_tx, _delta_resources_rx) = tokio::sync::broadcast::channel::(100); + let (stream_resource_tx, _stream_resources_rx) = tokio::sync::broadcast::channel::(100); let addr = "127.0.0.1:50051".parse()?; let grpc_server = tokio::spawn(async move { info!("Server started"); - let res = start_aggregate_server(addr, delta_resources_rx, stream_resources_rx).await; + let res = start_aggregate_server(addr, _delta_resources_rx, _stream_resources_rx).await; info!("Server stopped {res:?}"); }); tokio::time::sleep(std::time::Duration::from_secs(10)).await; @@ -37,7 +37,10 @@ async fn main() -> Result<(), Box> { info!("Adding cluster {cluster_id}"); let cluster_resource = resources::create_cluster_resource(&cluster); - if delta_resource_tx.send(ServerAction::Add(cluster_resource.clone())).await.is_err() { + if delta_resource_tx.send(ServerAction::Add(cluster_resource.clone())).is_err() { + break; + }; + if stream_resource_tx.send(ServerAction::Add(cluster_resource.clone())).is_err() { break; }; tokio::time::sleep(Duration::from_secs(5)).await; @@ -50,13 +53,19 @@ async fn main() -> Result<(), Box> { ); let listener_resource = resources::create_listener_resource(&listener); info!("Adding listener {listener_resource:?}"); - if delta_resource_tx.send(ServerAction::Add(listener_resource)).await.is_err() { + if delta_resource_tx.send(ServerAction::Add(listener_resource.clone())).is_err() { + break; + }; + if stream_resource_tx.send(ServerAction::Add(listener_resource)).is_err() { break; }; tokio::time::sleep(Duration::from_secs(15)).await; info!("Removing cluster {cluster_id}"); - if delta_resource_tx.send(ServerAction::Remove(cluster_resource)).await.is_err() { + if delta_resource_tx.send(ServerAction::Remove(cluster_resource.clone())).is_err() { + break; + }; + if stream_resource_tx.send(ServerAction::Remove(cluster_resource)).is_err() { break; }; tokio::time::sleep(Duration::from_secs(5)).await; @@ -69,7 +78,10 @@ async fn main() -> Result<(), Box> { ); let listener_resource = resources::create_listener_resource(&listener); info!("Removing listener {listener_resource:?}"); - if delta_resource_tx.send(ServerAction::Remove(listener_resource)).await.is_err() { + if delta_resource_tx.send(ServerAction::Remove(listener_resource.clone())).is_err() { + break; + }; + if stream_resource_tx.send(ServerAction::Remove(listener_resource)).is_err() { break; }; } diff --git a/orion-xds/examples/server_routes_and_loads.rs b/orion-xds/examples/server_routes_and_loads.rs index eab08b0e..79b08db4 100644 --- a/orion-xds/examples/server_routes_and_loads.rs +++ b/orion-xds/examples/server_routes_and_loads.rs @@ -14,13 +14,13 @@ async fn main() -> Result<(), Box> { .with(tracing_subscriber::fmt::layer()) .init(); - let (delta_resource_tx, delta_resources_rx) = tokio::sync::mpsc::channel(100); - let (_stream_resource_tx, stream_resources_rx) = tokio::sync::mpsc::channel(100); + let (delta_resource_tx, _delta_resources_rx) = tokio::sync::broadcast::channel::(100); + let (stream_resource_tx, _stream_resources_rx) = tokio::sync::broadcast::channel::(100); let addr = "127.0.0.1:50051".parse()?; let grpc_server = tokio::spawn(async move { info!("Server started"); - let res = start_aggregate_server(addr, delta_resources_rx, stream_resources_rx).await; + let res = start_aggregate_server(addr, _delta_resources_rx, _stream_resources_rx).await; info!("Server stopped {res:?}"); }); tokio::time::sleep(std::time::Duration::from_secs(10)).await; @@ -38,7 +38,10 @@ async fn main() -> Result<(), Box> { info!("Adding Cluster Load Assignment for cluster {cluster_id}"); let load_assigment_resource = resources::create_load_assignment_resource(&cluster_id, &cla); - if delta_resource_tx.send(ServerAction::Add(load_assigment_resource.clone())).await.is_err() { + if delta_resource_tx.send(ServerAction::Add(load_assigment_resource.clone())).is_err() { + return; + }; + if stream_resource_tx.send(ServerAction::Add(load_assigment_resource.clone())).is_err() { return; }; tokio::time::sleep(Duration::from_secs(5)).await; @@ -49,20 +52,29 @@ async fn main() -> Result<(), Box> { let route_configuration_resource = resources::create_route_configuration_resource(&route_id, &route_configuration); - if delta_resource_tx.send(ServerAction::Add(route_configuration_resource.clone())).await.is_err() { + if delta_resource_tx.send(ServerAction::Add(route_configuration_resource.clone())).is_err() { + return; + }; + if stream_resource_tx.send(ServerAction::Add(route_configuration_resource.clone())).is_err() { return; }; tokio::time::sleep(Duration::from_secs(15)).await; info!("Removing cluster load assignment {cluster_id}"); - if delta_resource_tx.send(ServerAction::Remove(load_assigment_resource)).await.is_err() { + if delta_resource_tx.send(ServerAction::Remove(load_assigment_resource.clone())).is_err() { + return; + }; + if stream_resource_tx.send(ServerAction::Remove(load_assigment_resource)).is_err() { return; }; tokio::time::sleep(Duration::from_secs(5)).await; info!("Removing route configuration {route_id}"); - if delta_resource_tx.send(ServerAction::Remove(route_configuration_resource)).await.is_err() { + if delta_resource_tx.send(ServerAction::Remove(route_configuration_resource.clone())).is_err() { + return; + }; + if stream_resource_tx.send(ServerAction::Remove(route_configuration_resource)).is_err() { return; }; tokio::time::sleep(Duration::from_secs(5)).await; diff --git a/orion-xds/examples/server_secret_rotation.rs b/orion-xds/examples/server_secret_rotation.rs index 5f100f52..2d028e1e 100644 --- a/orion-xds/examples/server_secret_rotation.rs +++ b/orion-xds/examples/server_secret_rotation.rs @@ -18,13 +18,13 @@ async fn main() -> Result<(), Box> { .with(tracing_subscriber::fmt::layer()) .init(); - let (delta_resource_tx, delta_resources_rx) = tokio::sync::mpsc::channel(100); - let (_stream_resource_tx, stream_resources_rx) = tokio::sync::mpsc::channel(100); + let (delta_resource_tx, _delta_resources_rx) = tokio::sync::broadcast::channel::(100); + let (_, _stream_resources_rx) = tokio::sync::broadcast::channel::(100); let addr = "127.0.0.1:50051".parse()?; let grpc_server = tokio::spawn(async move { info!("Server started"); - let res = start_aggregate_server(addr, delta_resources_rx, stream_resources_rx).await; + let res = start_aggregate_server(addr, _delta_resources_rx, _stream_resources_rx).await; info!("Server stopped {res:?}"); }); tokio::time::sleep(std::time::Duration::from_secs(10)).await; @@ -55,7 +55,7 @@ async fn main() -> Result<(), Box> { info!("Adding downstream secret {secret_id}"); let secret_resource = resources::create_secret_resource(secret_id, &secret); - if delta_resource_tx.send(ServerAction::Add(secret_resource.clone())).await.is_err() { + if delta_resource_tx.send(ServerAction::Add(secret_resource.clone())).is_err() { return; }; @@ -77,7 +77,7 @@ async fn main() -> Result<(), Box> { info!("Adding upstream secret {secret_id}"); let secret_resource = resources::create_secret_resource(secret_id, &secret); - if delta_resource_tx.send(ServerAction::Add(secret_resource.clone())).await.is_err() { + if delta_resource_tx.send(ServerAction::Add(secret_resource.clone())).is_err() { return; }; diff --git a/orion-xds/examples/server_secret_rotation_simple.rs b/orion-xds/examples/server_secret_rotation_simple.rs index d4da5b5a..67ff4366 100644 --- a/orion-xds/examples/server_secret_rotation_simple.rs +++ b/orion-xds/examples/server_secret_rotation_simple.rs @@ -4,7 +4,10 @@ use orion_data_plane_api::envoy_data_plane_api::envoy::{ config::core::v3::{data_source::Specifier, DataSource}, extensions::transport_sockets::tls::v3::{secret, CertificateValidationContext}, }; -use orion_xds::xds::{resources, server::start_aggregate_server}; +use orion_xds::xds::{ + resources, + server::{start_aggregate_server, ServerAction}, +}; use tracing::info; use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; @@ -15,18 +18,20 @@ async fn main() -> Result<(), Box> { .with(tracing_subscriber::fmt::layer()) .init(); - let (_, delta_resources_rx) = tokio::sync::mpsc::channel(100); - let (_stream_resource_tx, stream_resources_rx) = tokio::sync::mpsc::channel(100); + let (delta_resource_tx, _delta_resources_rx) = tokio::sync::broadcast::channel::(100); + let (_, _stream_resources_rx) = tokio::sync::broadcast::channel::(100); let addr = "127.0.0.1:50051".parse()?; let grpc_server = tokio::spawn(async move { info!("Server started"); - let res = start_aggregate_server(addr, delta_resources_rx, stream_resources_rx).await; + let res = start_aggregate_server(addr, _delta_resources_rx, _stream_resources_rx).await; info!("Server stopped {res:?}"); }); tokio::time::sleep(std::time::Duration::from_secs(10)).await; - let var_name = async move { + let delta_resource_tx_clone = delta_resource_tx.clone(); + + let _xds_resource_producer = tokio::spawn(async move { // the secret name needs to match ../orion-proxy/conf/orion-bootstap-sds-simple.yaml // we are trying to change secret beefcake_ca to point to a different cert store // initially the proxy should return 502 error as it can't set up tls to upstream @@ -49,9 +54,12 @@ async fn main() -> Result<(), Box> { let secret_type = secret::Type::ValidationContext(validation_context); let secret = resources::create_secret(secret_id, secret_type); info!("Adding upstream secret {secret_id}"); - let _secret_resource = resources::create_secret_resource(secret_id, &secret); - }; - let _xds_resource_producer = tokio::spawn(var_name); + let secret_resource = resources::create_secret_resource(secret_id, &secret); + + if delta_resource_tx_clone.send(ServerAction::Add(secret_resource)).is_err() { + info!("Failed to send secret resource"); + } + }); let _ = grpc_server.into_future().await; Ok(()) diff --git a/orion-xds/src/xds/server.rs b/orion-xds/src/xds/server.rs index 17c71690..fda861a2 100644 --- a/orion-xds/src/xds/server.rs +++ b/orion-xds/src/xds/server.rs @@ -20,7 +20,6 @@ use std::{net::SocketAddr, pin::Pin}; -use atomic_take::AtomicTake; use orion_data_plane_api::envoy_data_plane_api::envoy::service::discovery::v3::{ aggregated_discovery_service_server::{AggregatedDiscoveryService, AggregatedDiscoveryServiceServer}, DeltaDiscoveryRequest, DeltaDiscoveryResponse, DiscoveryRequest, DiscoveryResponse, Resource, ResourceName, @@ -28,12 +27,16 @@ use orion_data_plane_api::envoy_data_plane_api::envoy::service::discovery::v3::{ use orion_data_plane_api::envoy_data_plane_api::tonic::{ self, transport::Server, IntoStreamingRequest, Response, Status, }; -use tokio::sync::mpsc::{self, Receiver}; +use tokio::sync::{ + broadcast::Receiver, + mpsc::{self}, +}; use tokio_stream::{wrappers::ReceiverStream, Stream, StreamExt}; use tracing::info; use crate::xds::{self, model::XdsError}; +#[derive(Debug, Clone)] pub enum ServerAction { Add(Resource), Remove(Resource), @@ -42,16 +45,13 @@ pub enum ServerAction { pub type ResourceAction = ServerAction; #[derive(Debug)] pub struct AggregateServer { - delta_resources_rx: AtomicTake>, - stream_resources_rx: AtomicTake>, + delta_resources_rx: Receiver, + stream_resources_rx: Receiver, } impl AggregateServer { pub fn new(delta_resources_rx: Receiver, stream_resources_rx: Receiver) -> Self { - Self { - delta_resources_rx: AtomicTake::new(delta_resources_rx), - stream_resources_rx: AtomicTake::new(stream_resources_rx), - } + Self { delta_resources_rx, stream_resources_rx } } } @@ -70,10 +70,9 @@ impl AggregatedDiscoveryService for AggregateServer { info!("\tclient connected from: {:?}", req.remote_addr()); let (tx, rx) = mpsc::channel(128); - let mut resources_rx = - self.stream_resources_rx.take().ok_or(Status::internal("Resource stream is unavailable"))?; + let mut resources_rx = self.stream_resources_rx.resubscribe(); tokio::spawn(async move { - while let Some(action) = resources_rx.recv().await { + while let Ok(action) = resources_rx.recv().await { let item = match action { xds::server::ServerAction::Add(resource) => { let Some(resource) = resource.resource else { @@ -136,9 +135,9 @@ impl AggregatedDiscoveryService for AggregateServer { // spawn and channel are required if you want handle "disconnect" functionality // the `out_stream` will not be polled after client disconnect let (tx, rx) = mpsc::channel(128); - let mut resources_rx = self.delta_resources_rx.take().ok_or(Status::internal("Delta stream is unavailable"))?; + let mut resources_rx = self.delta_resources_rx.resubscribe(); tokio::spawn(async move { - while let Some(action) = resources_rx.recv().await { + while let Ok(action) = resources_rx.recv().await { let item = match action { xds::server::ServerAction::Add(r) => { let Some(ref resource) = r.resource else {