Skip to content

Commit

Permalink
wip: get shell
Browse files Browse the repository at this point in the history
  • Loading branch information
ElaBosak233 committed Feb 20, 2025
1 parent 689d4e7 commit 23332ee
Show file tree
Hide file tree
Showing 6 changed files with 175 additions and 15 deletions.
3 changes: 2 additions & 1 deletion crates/cluster/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,4 +23,5 @@ anyhow = { workspace = true }
uuid = { workspace = true }
regex = { workspace = true }
futures-util = { workspace = true }
serde_json = { workspace = true }
serde_json = { workspace = true }
nanoid = { workspace = true }
112 changes: 104 additions & 8 deletions crates/cluster/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,15 @@
pub mod traits;
mod util;
pub mod worker;

use std::{collections::BTreeMap, fmt::format, path::Path, process};

use axum::extract::ws::WebSocket;
use axum::extract::ws::{Message, Utf8Bytes, WebSocket};
use cds_db::get_db;
use futures_util::{
SinkExt, StreamExt, TryStreamExt,
stream::{SplitSink, SplitStream},
};
use k8s_openapi::{
api::core::v1::{
Container as K8sContainer, ContainerPort, EnvVar, Namespace, Pod, PodSpec,
Expand All @@ -15,13 +20,18 @@ use k8s_openapi::{
};
use kube::{
Client as K8sClient, Config as K8sConfig, ResourceExt,
api::{Api, DeleteParams, ListParams, Patch, PatchParams, PostParams},
api::{Api, AttachParams, DeleteParams, ListParams, Patch, PatchParams, PostParams},
config::Kubeconfig,
runtime::{wait::conditions, watcher},
};
use nanoid::nanoid;
use once_cell::sync::OnceCell;
use regex::Regex;
use tokio_util::codec::Framed;
use tokio::io::{
AsyncBufRead, AsyncBufReadExt, AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, BufReader,
stdin, stdout,
};
use tokio_util::codec::{BytesCodec, Framed, FramedRead};
use tracing::{error, info, warn};
use uuid::Uuid;

Expand Down Expand Up @@ -207,8 +217,8 @@ pub async fn create_challenge_env(
user: cds_db::transfer::User, team: Option<cds_db::transfer::Team>,
game: Option<cds_db::transfer::Game>, challenge: cds_db::transfer::Challenge,
) -> Result<(), ClusterError> {
let id = Uuid::new_v4();
let name = format!("cds-{}", id.to_string());
let id = util::gen_safe_nanoid();
let name = format!("cds-{}", id);

let env = challenge.clone().env.unwrap();

Expand Down Expand Up @@ -286,7 +296,7 @@ pub async fn create_challenge_env(
metadata: metadata.clone(),
spec: Some(PodSpec {
containers: vec![K8sContainer {
name: name.clone(),
name: format!("cds-{}", util::gen_safe_nanoid()),
image: Some(env.image),
env: Some(env_vars),
ports: Some(container_ports),
Expand Down Expand Up @@ -439,8 +449,8 @@ pub async fn delete_challenge_env(id: &str) -> Result<(), ClusterError> {
Ok(())
}

pub async fn wsrx(id: Uuid, port: u16, ws: WebSocket) -> Result<(), ClusterError> {
let name = format!("cds-{}", id.to_string());
pub async fn wsrx(id: &str, port: u16, ws: WebSocket) -> Result<(), ClusterError> {
let name = format!("cds-{}", id);

let pod_api: Api<Pod> = Api::namespaced(
get_k8s_client(),
Expand All @@ -455,3 +465,89 @@ pub async fn wsrx(id: Uuid, port: u16, ws: WebSocket) -> Result<(), ClusterError
}
Ok(())
}

pub async fn exec(
id: &str, container_id: &str, command: String, ws: WebSocket,
) -> Result<(), ClusterError> {
async fn process_client_to_pod<W>(mut receiver: SplitStream<WebSocket>, mut stdin_writer: W)
where
W: AsyncWrite + Unpin + Sized, {
while let Some(Ok(msg)) = receiver.next().await {
match msg {
Message::Text(text) => {
if stdin_writer.write_all(text.as_bytes()).await.is_err() {
break;
}
}
Message::Close(_) => break,
_ => {}
}
}
let _ = stdin_writer.shutdown().await;
}

async fn process_pod_to_client<R, S>(stdout_reader: R, mut sender: S)
where
R: AsyncRead + Unpin,
S: SinkExt<Message> + Unpin, {
let mut reader = FramedRead::new(stdout_reader, BytesCodec::new());
while let Some(result) = reader.next().await {
match result {
Ok(bytes) => {
if let Ok(text) = String::from_utf8(bytes.to_vec()) {
if sender
.send(Message::Text(Utf8Bytes::from(text)))
.await
.is_err()
{
break;
}
}
}
Err(_) => break
}
}
let _ = sender.close().await;
}

let (sender, receiver) = ws.split();
let name = format!("cds-{}", id.to_string());

let pod_api: Api<Pod> = Api::namespaced(
get_k8s_client(),
cds_config::get_config().cluster.namespace.as_str(),
);

let attach_params = AttachParams {
container: Some(format!("cds-{}", container_id)),
stdin: true,
stdout: true,
stderr: false,
tty: true,
..Default::default()
};

let mut attached = pod_api.exec(&name, vec![command], &attach_params).await?;

let stdin_writer = attached.stdin().unwrap();
let stdout_reader = BufReader::new(attached.stdout().unwrap());

let mut recv_task = tokio::spawn(async move {
process_client_to_pod(receiver, stdin_writer).await;
});

let mut send_task = tokio::spawn(async move {
process_pod_to_client(stdout_reader, sender).await;
});

tokio::select! {
_ = &mut recv_task => {
send_task.abort();
},
_ = &mut send_task => {
recv_task.abort();
},
}

Ok(())
}
8 changes: 8 additions & 0 deletions crates/cluster/src/util/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
pub fn gen_safe_nanoid() -> String {
const ALPHABET: [char; 36] = [
'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm', 'n', 'o', 'p', 'q', 'r',
's', 't', 'u', 'v', 'w', 'x', 'y', 'z', '0', '1', '2', '3', '4', '5', '6', '7', '8', '9',
];

nanoid::nanoid!(12, &ALPHABET)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
use axum::{Router, extract::WebSocketUpgrade, response::IntoResponse};
use cds_db::entity::user::Group;
use serde::{Deserialize, Serialize};
use serde_json::json;

use crate::{
extract::{Extension, Path, Query},
traits::{Ext, WebError},
};

pub fn router() -> Router {
Router::new().route("/shell", axum::routing::get(get_shell))
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct GetShellRequest {
pub command: String,
}

pub async fn get_shell(
Extension(ext): Extension<Ext>, Path((pod_id, container_id)): Path<(String, String)>,
Query(params): Query<GetShellRequest>, ws: WebSocketUpgrade,
) -> Result<impl IntoResponse, WebError> {
let operator = ext.operator.ok_or(WebError::Unauthorized(json!("")))?;

if operator.group != Group::Admin {
return Err(WebError::Forbidden(json!("")));
}

Ok(ws.on_upgrade(move |socket| async move {
let _ = cds_cluster::exec(&pod_id, &container_id, params.command, socket).await;
}))
}
18 changes: 18 additions & 0 deletions crates/web/src/router/api/pod/pod_id/containers/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
use axum::{Router, http::StatusCode};

use crate::traits::{WebError, WebResponse};

pub mod container_id;

pub fn router() -> Router {
Router::new()
.route("/", axum::routing::get(get_container))
.nest("/{container_id}", container_id::router())
}

pub async fn get_container() -> Result<WebResponse<()>, WebError> {
Ok(WebResponse {
code: StatusCode::OK.as_u16(),
..Default::default()
})
}
16 changes: 10 additions & 6 deletions crates/web/src/router/api/pod/pod_id/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
mod containers;

use axum::{Router, extract::WebSocketUpgrade, http::StatusCode, response::IntoResponse};
use cds_db::entity::user::Group;
use containers::container_id;
use serde::Deserialize;
use serde_json::json;
use tracing::debug;
Expand All @@ -15,14 +18,15 @@ pub fn router() -> Router {
.route("/renew", axum::routing::post(renew_pod))
.route("/stop", axum::routing::post(stop_pod))
.route("/wsrx", axum::routing::get(wsrx))
.nest("/containers", containers::router())
}

pub async fn renew_pod(
Extension(ext): Extension<Ext>, Path(pod_id): Path<Uuid>,
Extension(ext): Extension<Ext>, Path(pod_id): Path<String>,
) -> Result<WebResponse<()>, WebError> {
let operator = ext.operator.ok_or(WebError::Unauthorized(json!("")))?;

let pod = cds_cluster::get_pod(&pod_id.to_string()).await?;
let pod = cds_cluster::get_pod(&pod_id).await?;

let labels = pod.metadata.labels.unwrap_or_default();
let id = labels
Expand Down Expand Up @@ -84,11 +88,11 @@ pub async fn renew_pod(
}

pub async fn stop_pod(
Extension(ext): Extension<Ext>, Path(pod_id): Path<Uuid>,
Extension(ext): Extension<Ext>, Path(pod_id): Path<String>,
) -> Result<WebResponse<()>, WebError> {
let operator = ext.operator.ok_or(WebError::Unauthorized(json!("")))?;

let pod = cds_cluster::get_pod(&pod_id.to_string()).await?;
let pod = cds_cluster::get_pod(&pod_id).await?;

let labels = pod.metadata.labels.unwrap_or_default();
let id = labels
Expand Down Expand Up @@ -128,12 +132,12 @@ pub struct WsrxRequest {
}

pub async fn wsrx(
Path(pod_id): Path<Uuid>, Query(query): Query<WsrxRequest>, ws: WebSocketUpgrade,
Path(pod_id): Path<String>, Query(query): Query<WsrxRequest>, ws: WebSocketUpgrade,
) -> Result<impl IntoResponse, WebError> {
let port = query.port;

Ok(ws.on_upgrade(move |socket| async move {
let result = cds_cluster::wsrx(pod_id, port as u16, socket).await;
let result = cds_cluster::wsrx(&pod_id, port as u16, socket).await;
if let Err(e) = result {
debug!("Failed to link pods: {:?}", e);
}
Expand Down

0 comments on commit 23332ee

Please sign in to comment.