Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion orion-configuration/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,11 @@ impl Config {
let runtime = self.runtime.update_from_env_and_options(opt);
let max_cpus = num_cpus::get();
if runtime.num_cpus() > max_cpus {
tracing::warn!(max_cpus, ORION_GATEWAY_CORES = runtime.num_cpus(), "Requested more cores than available CPUs");
tracing::warn!(
max_cpus,
ORION_GATEWAY_CORES = runtime.num_cpus(),
"Requested more cores than available CPUs"
);
}
if runtime.num_runtimes() > runtime.num_cpus() {
tracing::warn!(
Expand Down
1 change: 0 additions & 1 deletion orion-proxy/tests/configs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ fn check_config_file(file_path: &str) -> Result<(), orion_error::Error> {
with_current_dir(&d, || get_listeners_and_clusters(bootstrap).map(|_| ()))
}


#[traced_test]
#[test]
fn bootstrap_demo_static() -> Result<(), orion_error::Error> {
Expand Down
26 changes: 19 additions & 7 deletions orion-xds/examples/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,13 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
.with(tracing_subscriber::fmt::layer())
.init();

let (delta_resource_tx, delta_resources_rx) = tokio::sync::mpsc::channel(100);
let (_stream_resource_tx, stream_resources_rx) = tokio::sync::mpsc::channel(100);
let (delta_resource_tx, _delta_resources_rx) = tokio::sync::broadcast::channel::<ServerAction>(100);
let (stream_resource_tx, _stream_resources_rx) = tokio::sync::broadcast::channel::<ServerAction>(100);
let addr = "127.0.0.1:50051".parse()?;

let grpc_server = tokio::spawn(async move {
info!("Server started");
let res = start_aggregate_server(addr, delta_resources_rx, stream_resources_rx).await;
let res = start_aggregate_server(addr, _delta_resources_rx, _stream_resources_rx).await;
info!("Server stopped {res:?}");
});
tokio::time::sleep(std::time::Duration::from_secs(10)).await;
Expand All @@ -37,7 +37,10 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
info!("Adding cluster {cluster_id}");
let cluster_resource = resources::create_cluster_resource(&cluster);

if delta_resource_tx.send(ServerAction::Add(cluster_resource.clone())).await.is_err() {
if delta_resource_tx.send(ServerAction::Add(cluster_resource.clone())).is_err() {
break;
};
if stream_resource_tx.send(ServerAction::Add(cluster_resource.clone())).is_err() {
break;
};
tokio::time::sleep(Duration::from_secs(5)).await;
Expand All @@ -50,13 +53,19 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
);
let listener_resource = resources::create_listener_resource(&listener);
info!("Adding listener {listener_resource:?}");
if delta_resource_tx.send(ServerAction::Add(listener_resource)).await.is_err() {
if delta_resource_tx.send(ServerAction::Add(listener_resource.clone())).is_err() {
break;
};
if stream_resource_tx.send(ServerAction::Add(listener_resource)).is_err() {
break;
};
tokio::time::sleep(Duration::from_secs(15)).await;

info!("Removing cluster {cluster_id}");
if delta_resource_tx.send(ServerAction::Remove(cluster_resource)).await.is_err() {
if delta_resource_tx.send(ServerAction::Remove(cluster_resource.clone())).is_err() {
break;
};
if stream_resource_tx.send(ServerAction::Remove(cluster_resource)).is_err() {
break;
};
tokio::time::sleep(Duration::from_secs(5)).await;
Expand All @@ -69,7 +78,10 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
);
let listener_resource = resources::create_listener_resource(&listener);
info!("Removing listener {listener_resource:?}");
if delta_resource_tx.send(ServerAction::Remove(listener_resource)).await.is_err() {
if delta_resource_tx.send(ServerAction::Remove(listener_resource.clone())).is_err() {
break;
};
if stream_resource_tx.send(ServerAction::Remove(listener_resource)).is_err() {
break;
};
}
Expand Down
26 changes: 19 additions & 7 deletions orion-xds/examples/server_routes_and_loads.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,13 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
.with(tracing_subscriber::fmt::layer())
.init();

let (delta_resource_tx, delta_resources_rx) = tokio::sync::mpsc::channel(100);
let (_stream_resource_tx, stream_resources_rx) = tokio::sync::mpsc::channel(100);
let (delta_resource_tx, _delta_resources_rx) = tokio::sync::broadcast::channel::<ServerAction>(100);
let (stream_resource_tx, _stream_resources_rx) = tokio::sync::broadcast::channel::<ServerAction>(100);
let addr = "127.0.0.1:50051".parse()?;

let grpc_server = tokio::spawn(async move {
info!("Server started");
let res = start_aggregate_server(addr, delta_resources_rx, stream_resources_rx).await;
let res = start_aggregate_server(addr, _delta_resources_rx, _stream_resources_rx).await;
info!("Server stopped {res:?}");
});
tokio::time::sleep(std::time::Duration::from_secs(10)).await;
Expand All @@ -38,7 +38,10 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
info!("Adding Cluster Load Assignment for cluster {cluster_id}");
let load_assigment_resource = resources::create_load_assignment_resource(&cluster_id, &cla);

if delta_resource_tx.send(ServerAction::Add(load_assigment_resource.clone())).await.is_err() {
if delta_resource_tx.send(ServerAction::Add(load_assigment_resource.clone())).is_err() {
return;
};
if stream_resource_tx.send(ServerAction::Add(load_assigment_resource.clone())).is_err() {
return;
};
tokio::time::sleep(Duration::from_secs(5)).await;
Expand All @@ -49,20 +52,29 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
let route_configuration_resource =
resources::create_route_configuration_resource(&route_id, &route_configuration);

if delta_resource_tx.send(ServerAction::Add(route_configuration_resource.clone())).await.is_err() {
if delta_resource_tx.send(ServerAction::Add(route_configuration_resource.clone())).is_err() {
return;
};
if stream_resource_tx.send(ServerAction::Add(route_configuration_resource.clone())).is_err() {
return;
};

tokio::time::sleep(Duration::from_secs(15)).await;

info!("Removing cluster load assignment {cluster_id}");
if delta_resource_tx.send(ServerAction::Remove(load_assigment_resource)).await.is_err() {
if delta_resource_tx.send(ServerAction::Remove(load_assigment_resource.clone())).is_err() {
return;
};
if stream_resource_tx.send(ServerAction::Remove(load_assigment_resource)).is_err() {
return;
};
tokio::time::sleep(Duration::from_secs(5)).await;

info!("Removing route configuration {route_id}");
if delta_resource_tx.send(ServerAction::Remove(route_configuration_resource)).await.is_err() {
if delta_resource_tx.send(ServerAction::Remove(route_configuration_resource.clone())).is_err() {
return;
};
if stream_resource_tx.send(ServerAction::Remove(route_configuration_resource)).is_err() {
return;
};
tokio::time::sleep(Duration::from_secs(5)).await;
Expand Down
10 changes: 5 additions & 5 deletions orion-xds/examples/server_secret_rotation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,13 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
.with(tracing_subscriber::fmt::layer())
.init();

let (delta_resource_tx, delta_resources_rx) = tokio::sync::mpsc::channel(100);
let (_stream_resource_tx, stream_resources_rx) = tokio::sync::mpsc::channel(100);
let (delta_resource_tx, _delta_resources_rx) = tokio::sync::broadcast::channel::<ServerAction>(100);
let (_, _stream_resources_rx) = tokio::sync::broadcast::channel::<ServerAction>(100);
let addr = "127.0.0.1:50051".parse()?;

let grpc_server = tokio::spawn(async move {
info!("Server started");
let res = start_aggregate_server(addr, delta_resources_rx, stream_resources_rx).await;
let res = start_aggregate_server(addr, _delta_resources_rx, _stream_resources_rx).await;
info!("Server stopped {res:?}");
});
tokio::time::sleep(std::time::Duration::from_secs(10)).await;
Expand Down Expand Up @@ -55,7 +55,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
info!("Adding downstream secret {secret_id}");
let secret_resource = resources::create_secret_resource(secret_id, &secret);

if delta_resource_tx.send(ServerAction::Add(secret_resource.clone())).await.is_err() {
if delta_resource_tx.send(ServerAction::Add(secret_resource.clone())).is_err() {
return;
};

Expand All @@ -77,7 +77,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
info!("Adding upstream secret {secret_id}");
let secret_resource = resources::create_secret_resource(secret_id, &secret);

if delta_resource_tx.send(ServerAction::Add(secret_resource.clone())).await.is_err() {
if delta_resource_tx.send(ServerAction::Add(secret_resource.clone())).is_err() {
return;
};

Expand Down
24 changes: 16 additions & 8 deletions orion-xds/examples/server_secret_rotation_simple.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,10 @@ use orion_data_plane_api::envoy_data_plane_api::envoy::{
config::core::v3::{data_source::Specifier, DataSource},
extensions::transport_sockets::tls::v3::{secret, CertificateValidationContext},
};
use orion_xds::xds::{resources, server::start_aggregate_server};
use orion_xds::xds::{
resources,
server::{start_aggregate_server, ServerAction},
};
use tracing::info;
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};

Expand All @@ -15,18 +18,20 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
.with(tracing_subscriber::fmt::layer())
.init();

let (_, delta_resources_rx) = tokio::sync::mpsc::channel(100);
let (_stream_resource_tx, stream_resources_rx) = tokio::sync::mpsc::channel(100);
let (delta_resource_tx, _delta_resources_rx) = tokio::sync::broadcast::channel::<ServerAction>(100);
let (_, _stream_resources_rx) = tokio::sync::broadcast::channel::<ServerAction>(100);
let addr = "127.0.0.1:50051".parse()?;

let grpc_server = tokio::spawn(async move {
info!("Server started");
let res = start_aggregate_server(addr, delta_resources_rx, stream_resources_rx).await;
let res = start_aggregate_server(addr, _delta_resources_rx, _stream_resources_rx).await;
info!("Server stopped {res:?}");
});
tokio::time::sleep(std::time::Duration::from_secs(10)).await;

let var_name = async move {
let delta_resource_tx_clone = delta_resource_tx.clone();

let _xds_resource_producer = tokio::spawn(async move {
// the secret name needs to match ../orion-proxy/conf/orion-bootstap-sds-simple.yaml
// we are trying to change secret beefcake_ca to point to a different cert store
// initially the proxy should return 502 error as it can't set up tls to upstream
Expand All @@ -49,9 +54,12 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
let secret_type = secret::Type::ValidationContext(validation_context);
let secret = resources::create_secret(secret_id, secret_type);
info!("Adding upstream secret {secret_id}");
let _secret_resource = resources::create_secret_resource(secret_id, &secret);
};
let _xds_resource_producer = tokio::spawn(var_name);
let secret_resource = resources::create_secret_resource(secret_id, &secret);

if delta_resource_tx_clone.send(ServerAction::Add(secret_resource)).is_err() {
info!("Failed to send secret resource");
}
});

let _ = grpc_server.into_future().await;
Ok(())
Expand Down
25 changes: 12 additions & 13 deletions orion-xds/src/xds/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,20 +20,23 @@

use std::{net::SocketAddr, pin::Pin};

use atomic_take::AtomicTake;
use orion_data_plane_api::envoy_data_plane_api::envoy::service::discovery::v3::{
aggregated_discovery_service_server::{AggregatedDiscoveryService, AggregatedDiscoveryServiceServer},
DeltaDiscoveryRequest, DeltaDiscoveryResponse, DiscoveryRequest, DiscoveryResponse, Resource, ResourceName,
};
use orion_data_plane_api::envoy_data_plane_api::tonic::{
self, transport::Server, IntoStreamingRequest, Response, Status,
};
use tokio::sync::mpsc::{self, Receiver};
use tokio::sync::{
broadcast::Receiver,
mpsc::{self},
};
use tokio_stream::{wrappers::ReceiverStream, Stream, StreamExt};
use tracing::info;

use crate::xds::{self, model::XdsError};

#[derive(Debug, Clone)]
pub enum ServerAction {
Add(Resource),
Remove(Resource),
Expand All @@ -42,16 +45,13 @@ pub enum ServerAction {
pub type ResourceAction = ServerAction;
#[derive(Debug)]
pub struct AggregateServer {
delta_resources_rx: AtomicTake<Receiver<ServerAction>>,
stream_resources_rx: AtomicTake<Receiver<ServerAction>>,
delta_resources_rx: Receiver<ServerAction>,
stream_resources_rx: Receiver<ServerAction>,
}

impl AggregateServer {
pub fn new(delta_resources_rx: Receiver<ServerAction>, stream_resources_rx: Receiver<ServerAction>) -> Self {
Self {
delta_resources_rx: AtomicTake::new(delta_resources_rx),
stream_resources_rx: AtomicTake::new(stream_resources_rx),
}
Self { delta_resources_rx, stream_resources_rx }
}
}

Expand All @@ -70,10 +70,9 @@ impl AggregatedDiscoveryService for AggregateServer {
info!("\tclient connected from: {:?}", req.remote_addr());

let (tx, rx) = mpsc::channel(128);
let mut resources_rx =
self.stream_resources_rx.take().ok_or(Status::internal("Resource stream is unavailable"))?;
let mut resources_rx = self.stream_resources_rx.resubscribe();
tokio::spawn(async move {
while let Some(action) = resources_rx.recv().await {
while let Ok(action) = resources_rx.recv().await {
let item = match action {
xds::server::ServerAction::Add(resource) => {
let Some(resource) = resource.resource else {
Expand Down Expand Up @@ -136,9 +135,9 @@ impl AggregatedDiscoveryService for AggregateServer {
// spawn and channel are required if you want handle "disconnect" functionality
// the `out_stream` will not be polled after client disconnect
let (tx, rx) = mpsc::channel(128);
let mut resources_rx = self.delta_resources_rx.take().ok_or(Status::internal("Delta stream is unavailable"))?;
let mut resources_rx = self.delta_resources_rx.resubscribe();
tokio::spawn(async move {
while let Some(action) = resources_rx.recv().await {
while let Ok(action) = resources_rx.recv().await {
let item = match action {
xds::server::ServerAction::Add(r) => {
let Some(ref resource) = r.resource else {
Expand Down
Loading