From 23332ee3e24c016d0144858bfd75f86fa6c0f003 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=9F=83=E6=8B=89?= Date: Thu, 20 Feb 2025 16:25:24 +0800 Subject: [PATCH] wip: get shell --- crates/cluster/Cargo.toml | 3 +- crates/cluster/src/lib.rs | 112 ++++++++++++++++-- crates/cluster/src/util/mod.rs | 8 ++ .../pod/pod_id/containers/container_id/mod.rs | 33 ++++++ .../router/api/pod/pod_id/containers/mod.rs | 18 +++ crates/web/src/router/api/pod/pod_id/mod.rs | 16 ++- 6 files changed, 175 insertions(+), 15 deletions(-) create mode 100644 crates/cluster/src/util/mod.rs create mode 100644 crates/web/src/router/api/pod/pod_id/containers/container_id/mod.rs create mode 100644 crates/web/src/router/api/pod/pod_id/containers/mod.rs diff --git a/crates/cluster/Cargo.toml b/crates/cluster/Cargo.toml index e3479ac..5cdf957 100644 --- a/crates/cluster/Cargo.toml +++ b/crates/cluster/Cargo.toml @@ -23,4 +23,5 @@ anyhow = { workspace = true } uuid = { workspace = true } regex = { workspace = true } futures-util = { workspace = true } -serde_json = { workspace = true } \ No newline at end of file +serde_json = { workspace = true } +nanoid = { workspace = true } \ No newline at end of file diff --git a/crates/cluster/src/lib.rs b/crates/cluster/src/lib.rs index 7f4cde8..2f22f86 100644 --- a/crates/cluster/src/lib.rs +++ b/crates/cluster/src/lib.rs @@ -1,10 +1,15 @@ pub mod traits; +mod util; pub mod worker; use std::{collections::BTreeMap, fmt::format, path::Path, process}; -use axum::extract::ws::WebSocket; +use axum::extract::ws::{Message, Utf8Bytes, WebSocket}; use cds_db::get_db; +use futures_util::{ + SinkExt, StreamExt, TryStreamExt, + stream::{SplitSink, SplitStream}, +}; use k8s_openapi::{ api::core::v1::{ Container as K8sContainer, ContainerPort, EnvVar, Namespace, Pod, PodSpec, @@ -15,13 +20,18 @@ use k8s_openapi::{ }; use kube::{ Client as K8sClient, Config as K8sConfig, ResourceExt, - api::{Api, DeleteParams, ListParams, Patch, PatchParams, PostParams}, + api::{Api, AttachParams, DeleteParams, ListParams, Patch, PatchParams, PostParams}, config::Kubeconfig, runtime::{wait::conditions, watcher}, }; +use nanoid::nanoid; use once_cell::sync::OnceCell; use regex::Regex; -use tokio_util::codec::Framed; +use tokio::io::{ + AsyncBufRead, AsyncBufReadExt, AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, BufReader, + stdin, stdout, +}; +use tokio_util::codec::{BytesCodec, Framed, FramedRead}; use tracing::{error, info, warn}; use uuid::Uuid; @@ -207,8 +217,8 @@ pub async fn create_challenge_env( user: cds_db::transfer::User, team: Option, game: Option, challenge: cds_db::transfer::Challenge, ) -> Result<(), ClusterError> { - let id = Uuid::new_v4(); - let name = format!("cds-{}", id.to_string()); + let id = util::gen_safe_nanoid(); + let name = format!("cds-{}", id); let env = challenge.clone().env.unwrap(); @@ -286,7 +296,7 @@ pub async fn create_challenge_env( metadata: metadata.clone(), spec: Some(PodSpec { containers: vec![K8sContainer { - name: name.clone(), + name: format!("cds-{}", util::gen_safe_nanoid()), image: Some(env.image), env: Some(env_vars), ports: Some(container_ports), @@ -439,8 +449,8 @@ pub async fn delete_challenge_env(id: &str) -> Result<(), ClusterError> { Ok(()) } -pub async fn wsrx(id: Uuid, port: u16, ws: WebSocket) -> Result<(), ClusterError> { - let name = format!("cds-{}", id.to_string()); +pub async fn wsrx(id: &str, port: u16, ws: WebSocket) -> Result<(), ClusterError> { + let name = format!("cds-{}", id); let pod_api: Api = Api::namespaced( get_k8s_client(), @@ -455,3 +465,89 @@ pub async fn wsrx(id: Uuid, port: u16, ws: WebSocket) -> Result<(), ClusterError } Ok(()) } + +pub async fn exec( + id: &str, container_id: &str, command: String, ws: WebSocket, +) -> Result<(), ClusterError> { + async fn process_client_to_pod(mut receiver: SplitStream, mut stdin_writer: W) + where + W: AsyncWrite + Unpin + Sized, { + while let Some(Ok(msg)) = receiver.next().await { + match msg { + Message::Text(text) => { + if stdin_writer.write_all(text.as_bytes()).await.is_err() { + break; + } + } + Message::Close(_) => break, + _ => {} + } + } + let _ = stdin_writer.shutdown().await; + } + + async fn process_pod_to_client(stdout_reader: R, mut sender: S) + where + R: AsyncRead + Unpin, + S: SinkExt + Unpin, { + let mut reader = FramedRead::new(stdout_reader, BytesCodec::new()); + while let Some(result) = reader.next().await { + match result { + Ok(bytes) => { + if let Ok(text) = String::from_utf8(bytes.to_vec()) { + if sender + .send(Message::Text(Utf8Bytes::from(text))) + .await + .is_err() + { + break; + } + } + } + Err(_) => break + } + } + let _ = sender.close().await; + } + + let (sender, receiver) = ws.split(); + let name = format!("cds-{}", id.to_string()); + + let pod_api: Api = Api::namespaced( + get_k8s_client(), + cds_config::get_config().cluster.namespace.as_str(), + ); + + let attach_params = AttachParams { + container: Some(format!("cds-{}", container_id)), + stdin: true, + stdout: true, + stderr: false, + tty: true, + ..Default::default() + }; + + let mut attached = pod_api.exec(&name, vec![command], &attach_params).await?; + + let stdin_writer = attached.stdin().unwrap(); + let stdout_reader = BufReader::new(attached.stdout().unwrap()); + + let mut recv_task = tokio::spawn(async move { + process_client_to_pod(receiver, stdin_writer).await; + }); + + let mut send_task = tokio::spawn(async move { + process_pod_to_client(stdout_reader, sender).await; + }); + + tokio::select! { + _ = &mut recv_task => { + send_task.abort(); + }, + _ = &mut send_task => { + recv_task.abort(); + }, + } + + Ok(()) +} diff --git a/crates/cluster/src/util/mod.rs b/crates/cluster/src/util/mod.rs new file mode 100644 index 0000000..0aaa686 --- /dev/null +++ b/crates/cluster/src/util/mod.rs @@ -0,0 +1,8 @@ +pub fn gen_safe_nanoid() -> String { + const ALPHABET: [char; 36] = [ + 'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm', 'n', 'o', 'p', 'q', 'r', + 's', 't', 'u', 'v', 'w', 'x', 'y', 'z', '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', + ]; + + nanoid::nanoid!(12, &ALPHABET) +} diff --git a/crates/web/src/router/api/pod/pod_id/containers/container_id/mod.rs b/crates/web/src/router/api/pod/pod_id/containers/container_id/mod.rs new file mode 100644 index 0000000..0cd3c65 --- /dev/null +++ b/crates/web/src/router/api/pod/pod_id/containers/container_id/mod.rs @@ -0,0 +1,33 @@ +use axum::{Router, extract::WebSocketUpgrade, response::IntoResponse}; +use cds_db::entity::user::Group; +use serde::{Deserialize, Serialize}; +use serde_json::json; + +use crate::{ + extract::{Extension, Path, Query}, + traits::{Ext, WebError}, +}; + +pub fn router() -> Router { + Router::new().route("/shell", axum::routing::get(get_shell)) +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct GetShellRequest { + pub command: String, +} + +pub async fn get_shell( + Extension(ext): Extension, Path((pod_id, container_id)): Path<(String, String)>, + Query(params): Query, ws: WebSocketUpgrade, +) -> Result { + let operator = ext.operator.ok_or(WebError::Unauthorized(json!("")))?; + + if operator.group != Group::Admin { + return Err(WebError::Forbidden(json!(""))); + } + + Ok(ws.on_upgrade(move |socket| async move { + let _ = cds_cluster::exec(&pod_id, &container_id, params.command, socket).await; + })) +} diff --git a/crates/web/src/router/api/pod/pod_id/containers/mod.rs b/crates/web/src/router/api/pod/pod_id/containers/mod.rs new file mode 100644 index 0000000..f130563 --- /dev/null +++ b/crates/web/src/router/api/pod/pod_id/containers/mod.rs @@ -0,0 +1,18 @@ +use axum::{Router, http::StatusCode}; + +use crate::traits::{WebError, WebResponse}; + +pub mod container_id; + +pub fn router() -> Router { + Router::new() + .route("/", axum::routing::get(get_container)) + .nest("/{container_id}", container_id::router()) +} + +pub async fn get_container() -> Result, WebError> { + Ok(WebResponse { + code: StatusCode::OK.as_u16(), + ..Default::default() + }) +} diff --git a/crates/web/src/router/api/pod/pod_id/mod.rs b/crates/web/src/router/api/pod/pod_id/mod.rs index 5f2b735..b59af5a 100644 --- a/crates/web/src/router/api/pod/pod_id/mod.rs +++ b/crates/web/src/router/api/pod/pod_id/mod.rs @@ -1,5 +1,8 @@ +mod containers; + use axum::{Router, extract::WebSocketUpgrade, http::StatusCode, response::IntoResponse}; use cds_db::entity::user::Group; +use containers::container_id; use serde::Deserialize; use serde_json::json; use tracing::debug; @@ -15,14 +18,15 @@ pub fn router() -> Router { .route("/renew", axum::routing::post(renew_pod)) .route("/stop", axum::routing::post(stop_pod)) .route("/wsrx", axum::routing::get(wsrx)) + .nest("/containers", containers::router()) } pub async fn renew_pod( - Extension(ext): Extension, Path(pod_id): Path, + Extension(ext): Extension, Path(pod_id): Path, ) -> Result, WebError> { let operator = ext.operator.ok_or(WebError::Unauthorized(json!("")))?; - let pod = cds_cluster::get_pod(&pod_id.to_string()).await?; + let pod = cds_cluster::get_pod(&pod_id).await?; let labels = pod.metadata.labels.unwrap_or_default(); let id = labels @@ -84,11 +88,11 @@ pub async fn renew_pod( } pub async fn stop_pod( - Extension(ext): Extension, Path(pod_id): Path, + Extension(ext): Extension, Path(pod_id): Path, ) -> Result, WebError> { let operator = ext.operator.ok_or(WebError::Unauthorized(json!("")))?; - let pod = cds_cluster::get_pod(&pod_id.to_string()).await?; + let pod = cds_cluster::get_pod(&pod_id).await?; let labels = pod.metadata.labels.unwrap_or_default(); let id = labels @@ -128,12 +132,12 @@ pub struct WsrxRequest { } pub async fn wsrx( - Path(pod_id): Path, Query(query): Query, ws: WebSocketUpgrade, + Path(pod_id): Path, Query(query): Query, ws: WebSocketUpgrade, ) -> Result { let port = query.port; Ok(ws.on_upgrade(move |socket| async move { - let result = cds_cluster::wsrx(pod_id, port as u16, socket).await; + let result = cds_cluster::wsrx(&pod_id, port as u16, socket).await; if let Err(e) = result { debug!("Failed to link pods: {:?}", e); }