Skip to content

Commit

Permalink
wip: custom kube config path
Browse files Browse the repository at this point in the history
  • Loading branch information
ElaBosak233 committed Jan 25, 2025
1 parent b00e490 commit 3d6b083
Show file tree
Hide file tree
Showing 20 changed files with 56 additions and 49 deletions.
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,5 @@ target
/logs
/dist
/queue
/cache
/cache
/data
3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,8 @@ sysinfo = { version = "0.33.1" }
tempfile = { version = "3.15.0" }
image = { version = "0.25.5" }
webp = { version = "0.3.0", features = ["image"] }
hex = "0.4.3"
hex = { version = "0.4.3" }
lettre = { version = "0.11" }

[profile.release]
opt-level = 3
Expand Down
3 changes: 2 additions & 1 deletion crates/cluster/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,5 @@ once_cell = { workspace = true }
tokio-util = { workspace = true }
tracing = { workspace = true }
thiserror = { workspace = true }
wsrx = { workspace = true }
wsrx = { workspace = true }
anyhow = { workspace = true }
26 changes: 16 additions & 10 deletions crates/cluster/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
pub mod traits;

use std::{collections::BTreeMap, process};

use std::path::Path;
use anyhow::anyhow;
use axum::extract::ws::WebSocket;
use k8s_openapi::{
api::core::v1::{
Expand All @@ -15,6 +16,7 @@ use kube::{
api::{Api, DeleteParams, ListParams, PostParams},
runtime::wait::conditions,
};
use kube::config::Kubeconfig;
use once_cell::sync::OnceCell;
use tokio_util::codec::Framed;
use tracing::{error, info};
Expand All @@ -27,17 +29,19 @@ pub fn get_k8s_client() -> K8sClient {
K8S_CLIENT.get().unwrap().clone()
}

pub async fn init() {
let result = Config::from_kubeconfig(&Default::default()).await;
pub async fn init() -> Result<(), ClusterError> {
let result = Config::from_custom_kubeconfig(Kubeconfig::read_from(
Path::new(cds_env::get_env().cluster.kube_config_path.as_str())
)?, &Default::default()).await;
if let Err(e) = result {
error!(
"Failed to create Kubernetes client from custom config: {:?}",
e
);
process::exit(1);
}
let config = result.unwrap();
let client = K8sClient::try_from(config).unwrap();
let config = result?;
let client = K8sClient::try_from(config)?;
if let Err(_) = client.apiserver_version().await {
error!("Failed to connect to Kubernetes API server.");
process::exit(1);
Expand All @@ -46,7 +50,7 @@ pub async fn init() {
info!("Kubernetes client initialized successfully.");

let namespace_api: Api<Namespace> = Api::all(get_k8s_client().clone());
let namespaces = namespace_api.list(&ListParams::default()).await.unwrap();
let namespaces = namespace_api.list(&ListParams::default()).await?;
if !namespaces.items.iter().any(|namespace| {
namespace.metadata.name == Some(cds_env::get_env().clone().cluster.namespace)
}) {
Expand All @@ -60,8 +64,10 @@ pub async fn init() {
let _ = namespace_api
.create(&PostParams::default(), &namespace)
.await;
info!("Namespace is created successfully.")
info!("Namespace is created successfully.");
}

Ok(())
}

pub async fn create(
Expand Down Expand Up @@ -151,13 +157,13 @@ pub async fn create(

let mut nats: Vec<cds_db::entity::pod::Nat> = Vec::new();

match cds_env::get_env().cluster.proxy.enabled {
match cds_env::get_env().cluster.proxy.is_enabled {
true => {
for port in env.ports {
nats.push(cds_db::entity::pod::Nat {
src: format!("{}", port),
dst: None,
proxy: cds_env::get_env().cluster.proxy.enabled,
proxy: cds_env::get_env().cluster.proxy.is_enabled,
entry: None,
});
}
Expand Down Expand Up @@ -202,7 +208,7 @@ pub async fn create(
nats.push(cds_db::entity::pod::Nat {
src: format!("{}", port.port),
dst: Some(format!("{}", node_port)),
proxy: cds_env::get_env().cluster.proxy.enabled,
proxy: cds_env::get_env().cluster.proxy.is_enabled,
entry: Some(format!(
"{}:{}",
cds_config::get_config().await.cluster.entry,
Expand Down
2 changes: 2 additions & 0 deletions crates/cluster/src/traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,6 @@ pub enum ClusterError {
KubeRuntimeWaitError(#[from] kube::runtime::wait::Error),
#[error("proxy error: {0}")]
ProxyError(#[from] wsrx::Error),
#[error(transparent)]
OtherError(#[from] anyhow::Error),
}
2 changes: 1 addition & 1 deletion crates/config/src/cluster/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
pub mod proxy;

pub mod strategy;

use sea_orm::FromJsonQueryResult;
Expand Down
7 changes: 0 additions & 7 deletions crates/config/src/cluster/proxy.rs

This file was deleted.

File renamed without changes.
1 change: 1 addition & 0 deletions crates/env/src/cluster/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,6 @@ use serde::{Deserialize, Serialize};
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct Env {
pub namespace: String,
pub kube_config_path: String,
pub proxy: proxy::Env,
}
2 changes: 1 addition & 1 deletion crates/env/src/cluster/proxy/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@ use serde::{Deserialize, Serialize};

#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct Env {
pub enabled: bool,
pub is_enabled: bool,
pub traffic_capture: bool,
}
4 changes: 0 additions & 4 deletions crates/env/src/consts.rs

This file was deleted.

13 changes: 8 additions & 5 deletions crates/env/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
pub mod axum;
pub mod server;
pub mod cache;
pub mod cluster;
pub mod consts;
pub mod db;
pub mod metric;
pub mod queue;
pub mod media;
pub mod auth;

use std::{path::Path, process};

Expand All @@ -17,12 +18,14 @@ static APP_ENV: OnceCell<Env> = OnceCell::new();

#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct Env {
pub axum: axum::Env,
pub server: server::Env,
pub auth: auth::Env,
pub db: db::Env,
pub queue: queue::Env,
pub cache: cache::Env,
pub metric: metric::Env,
pub cluster: cluster::Env,
pub media: media::Env,
}

pub async fn init() {
Expand All @@ -36,6 +39,6 @@ pub async fn init() {
}
}

pub fn get_env() -> &'static Env {
APP_ENV.get().unwrap()
pub fn get_env() -> Env {
APP_ENV.get().unwrap().clone()
}
6 changes: 6 additions & 0 deletions crates/env/src/media/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
use serde::{Deserialize, Serialize};

#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct Env {
pub path: String,
}
2 changes: 1 addition & 1 deletion crates/env/src/metric/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@ use serde::{Deserialize, Serialize};

#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct Env {
pub enabled: bool,
pub is_enabled: bool,
pub namespace: String,
}
2 changes: 1 addition & 1 deletion crates/env/src/queue/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use serde::{Deserialize, Serialize};
pub struct Env {
pub host: String,
pub port: u16,
pub user: String,
pub username: String,
pub password: String,
pub token: String,
pub tls: bool,
Expand Down
3 changes: 0 additions & 3 deletions crates/env/src/axum/mod.rs → crates/env/src/server/mod.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,7 @@
pub mod jwt;

use serde::{Deserialize, Serialize};

#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct Env {
pub host: String,
pub port: u16,
pub jwt: jwt::Env,
}
10 changes: 5 additions & 5 deletions crates/media/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use crate::traits::MediaError;

pub async fn get(path: String, filename: String) -> Result<Vec<u8>, MediaError> {
let filepath =
PathBuf::from(cds_env::consts::path::MEDIA).join(format!("{}/{}", path, filename));
PathBuf::from(cds_env::get_env().media.path).join(format!("{}/{}", path, filename));

match File::open(&filepath).await {
Ok(mut file) => {
Expand All @@ -27,7 +27,7 @@ pub async fn get(path: String, filename: String) -> Result<Vec<u8>, MediaError>
}

pub async fn scan_dir(path: String) -> Result<Vec<(String, u64)>, MediaError> {
let filepath = PathBuf::from(cds_env::consts::path::MEDIA).join(path);
let filepath = PathBuf::from(cds_env::get_env().media.path).join(path);
let mut files = Vec::new();

if metadata(&filepath).await.is_err() {
Expand All @@ -50,7 +50,7 @@ pub async fn scan_dir(path: String) -> Result<Vec<(String, u64)>, MediaError> {

pub async fn save(path: String, filename: String, data: Vec<u8>) -> Result<(), MediaError> {
let filepath =
PathBuf::from(cds_env::consts::path::MEDIA).join(format!("{}/{}", path, filename));
PathBuf::from(cds_env::get_env().media.path).join(format!("{}/{}", path, filename));
if let Some(parent) = filepath.parent() {
if metadata(parent).await.is_err() {
create_dir_all(parent).await?;
Expand All @@ -63,15 +63,15 @@ pub async fn save(path: String, filename: String, data: Vec<u8>) -> Result<(), M

pub async fn delete(path: String, filename: String) -> Result<(), MediaError> {
let filepath =
PathBuf::from(cds_env::consts::path::MEDIA).join(format!("{}/{}", path, filename));
PathBuf::from(cds_env::get_env().media.path).join(format!("{}/{}", path, filename));
if metadata(&filepath).await.is_ok() {
remove_file(&filepath).await?;
}
Ok(())
}

pub async fn delete_dir(path: String) -> Result<(), MediaError> {
let filepath = PathBuf::from(cds_env::consts::path::MEDIA).join(path);
let filepath = PathBuf::from(cds_env::get_env().media.path).join(path);
if metadata(&filepath).await.is_ok() {
remove_dir_all(&filepath).await?;
}
Expand Down
2 changes: 1 addition & 1 deletion crates/queue/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ pub async fn init() {
let client = async_nats::ConnectOptions::new()
.require_tls(cds_env::get_env().queue.tls)
.user_and_password(
cds_env::get_env().queue.user.clone(),
cds_env::get_env().queue.username.clone(),
cds_env::get_env().queue.password.clone(),
)
.token(cds_env::get_env().queue.token.clone())
Expand Down
8 changes: 4 additions & 4 deletions crates/server/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,13 @@ async fn bootstrap() {
migrator::run().await;

cds_config::init().await;
cds_cluster::init().await;
let _ = cds_cluster::init().await;
cds_web::init().await;

let addr = format!(
"{}:{}",
cds_env::get_env().axum.host,
cds_env::get_env().axum.port
cds_env::get_env().server.host,
cds_env::get_env().server.port
);
let listener = tokio::net::TcpListener::bind(&addr).await;

Expand All @@ -55,7 +55,7 @@ async fn bootstrap() {
)
.with_graceful_shutdown(shutdown_signal())
.await
.expect("Failed to start axum server");
.expect("Failed to start server server");
}

async fn shutdown_signal() {
Expand Down
6 changes: 3 additions & 3 deletions crates/web/src/util/jwt.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,15 @@ pub struct Claims {
pub exp: usize,
}

pub async fn get_jwt_config() -> cds_env::axum::jwt::Env {
if let Some(jwt) = cds_cache::get::<cds_env::axum::jwt::Env>("jwt")
pub async fn get_jwt_config() -> cds_env::auth::Env {
if let Some(jwt) = cds_cache::get::<cds_env::auth::Env>("jwt")
.await
.unwrap()
{
return jwt;
}

let mut jwt = cds_env::get_env().axum.jwt.clone();
let mut jwt = cds_env::get_env().auth.clone();
let re = Regex::new(r"\[([Uu][Uu][Ii][Dd])]").unwrap();
jwt.secret = re
.replace_all(&jwt.secret, Uuid::new_v4().simple().to_string())
Expand Down

0 comments on commit 3d6b083

Please sign in to comment.