Skip to content

Commit

Permalink
fix: Handle port updates (#13)
Browse files Browse the repository at this point in the history
  • Loading branch information
gonzalezzfelipe authored May 8, 2024
1 parent 2170f92 commit 7bfd1ee
Show file tree
Hide file tree
Showing 2 changed files with 71 additions and 59 deletions.
99 changes: 55 additions & 44 deletions proxy/src/auth.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,14 @@ use futures_util::TryStreamExt;

use operator::{
kube::{
api::ListParams,
runtime::{
watcher::{self, Config as ConfigWatcher},
WatchStreamExt,
},
runtime::watcher::{self, Config as ConfigWatcher, Event},
Api, Client, ResourceExt,
},
ScrollsPort,
};
use pingora::{server::ShutdownWatch, services::background::BackgroundService};
use tokio::pin;
use tracing::error;
use tracing::{error, info};

use crate::{Consumer, State};

Expand All @@ -27,36 +23,6 @@ impl AuthBackgroundService {
pub fn new(state: Arc<State>) -> Self {
Self { state }
}

async fn update_auth(&self, api: Api<ScrollsPort>) {
let result = api.list(&ListParams::default()).await;
if let Err(err) = result {
error!(
error = err.to_string(),
"error to get crds while updating auth keys"
);
return;
}

let mut consumers = HashMap::new();
for crd in result.unwrap().items.iter() {
if crd.status.is_some() {
let network = crd.spec.network.to_string();
let version = crd.spec.version.to_string();
let tier = crd.spec.throughput_tier.to_string();
let key = crd.status.as_ref().unwrap().auth_token.clone();
let namespace = crd.metadata.namespace.as_ref().unwrap().clone();
let port_name = crd.name_any();

let consumer =
Consumer::new(namespace, port_name, tier, key.clone(), network, version);

consumers.insert(key, consumer);
}
}

*self.state.consumers.write().await = consumers;
}
}

#[async_trait]
Expand All @@ -67,19 +33,64 @@ impl BackgroundService for AuthBackgroundService {
.expect("failed to create kube client");

let api = Api::<ScrollsPort>::all(client.clone());
self.update_auth(api.clone()).await;

let stream = watcher::watcher(api.clone(), ConfigWatcher::default()).touched_objects();
let stream = watcher::watcher(api.clone(), ConfigWatcher::default());
pin!(stream);

loop {
let result = stream.try_next().await;
if let Err(err) = result {
error!(error = err.to_string(), "fail crd auth watcher");
continue;
match result {
// Stream restart, also run on startup.
Ok(Some(Event::Restarted(crds))) => {
info!("auth: Watcher restarted, reseting consumers");
let consumers: HashMap<String, Consumer> = crds
.iter()
.map(|crd| {
let consumer = Consumer::from(crd);
(consumer.key.clone(), consumer)
})
.collect();
*self.state.consumers.write().await = consumers;
self.state.limiter.write().await.clear();
}
// New port created or updated.
Ok(Some(Event::Applied(crd))) => match crd.status {
Some(_) => {
info!("auth: Updating consumer: {}", crd.name_any());
let consumer = Consumer::from(&crd);
self.state.limiter.write().await.remove(&consumer.key);
self.state
.consumers
.write()
.await
.insert(consumer.key.clone(), consumer);
}
None => {
// New ports are created without status. When the status is added, a new
// Applied event is triggered.
info!("auth: New port created: {}", crd.name_any());
}
},
// Port deleted.
Ok(Some(Event::Deleted(crd))) => {
info!(
"auth: Port deleted, removing from state: {}",
crd.name_any()
);
let consumer = Consumer::from(&crd);
self.state.consumers.write().await.remove(&consumer.key);
self.state.limiter.write().await.remove(&consumer.key);
}
// Empty response from stream. Should never happen.
Ok(None) => {
error!("auth: Empty response from watcher.");
continue;
}
// Unexpected error when streaming CRDs.
Err(err) => {
error!(error = err.to_string(), "auth: Failed to update crds.");
std::process::exit(1);
}
}

self.update_auth(api.clone()).await;
}
}
}
31 changes: 16 additions & 15 deletions proxy/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use auth::AuthBackgroundService;
use config::Config;
use dotenv::dotenv;
use operator::{kube::ResourceExt, ScrollsPort};
use pingora::{
server::{configuration::Opt, Server},
services::background::background_service,
Expand Down Expand Up @@ -87,30 +88,30 @@ pub struct Consumer {
network: String,
version: String,
}
impl Consumer {
pub fn new(
namespace: String,
port_name: String,
tier: String,
key: String,
network: String,
version: String,
) -> Self {
impl Display for Consumer {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}.{}", self.namespace, self.port_name)
}
}
impl From<&ScrollsPort> for Consumer {
fn from(value: &ScrollsPort) -> Self {
let network = value.spec.network.to_string();
let version = value.spec.version.to_string();
let tier = value.spec.throughput_tier.to_string();
let key = value.status.as_ref().unwrap().auth_token.clone();
let namespace = value.metadata.namespace.as_ref().unwrap().clone();
let port_name = value.name_any();

Self {
namespace,
port_name,
key,
tier,
key,
network,
version,
}
}
}
impl Display for Consumer {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}.{}", self.namespace, self.port_name)
}
}

#[derive(Debug, Clone, Deserialize)]
pub struct Tier {
Expand Down

0 comments on commit 7bfd1ee

Please sign in to comment.