Skip to content

Commit

Permalink
chore: update axum and hyper
Browse files Browse the repository at this point in the history
  • Loading branch information
banditopazzo committed Jan 3, 2025
1 parent e94d977 commit e29bd9d
Show file tree
Hide file tree
Showing 9 changed files with 474 additions and 475 deletions.
785 changes: 396 additions & 389 deletions Cargo.lock

Large diffs are not rendered by default.

23 changes: 15 additions & 8 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,12 @@ tokio = { workspace = true, features = ["full"] }
[features]
default = ["full", "tls-openssl"]
full = ["core", "extra"]
core = ["threat-logger", "process-monitor", "network-monitor", "file-system-monitor"]
core = [
"threat-logger",
"process-monitor",
"network-monitor",
"file-system-monitor",
]
extra = ["rules-engine", "desktop-notifier", "smtp-notifier"]
tls-openssl = ["smtp-notifier/tls-openssl"]
tls-rustls = ["smtp-notifier/tls-rustls"]
Expand Down Expand Up @@ -112,7 +117,7 @@ anyhow = "1.0.75"
aya = { version = "0.12.0", features = ["async_tokio"] }
aya-ebpf-bindings = "0.1.0"
aya-obj = "0.1.0"
axum = { version = "0.6.20", features = ["ws"] }
axum = { version = "0.8.1", features = ["ws"] }
bytes = "1.5.0"
cgroups-rs = { version = "0.3.4" }
chrono = { version = "0.4.31" }
Expand All @@ -129,8 +134,10 @@ gethostname = "0.4.3"
glob = "0.3.1"
hex = "0.4.3"
hickory-resolver = "0.24.1"
hyper = "0.14.28"
hyperlocal = "0.8"
http-body-util = "0.1.2"
hyper = "1.5.1"
hyper-util = "0.1.10"
hyperlocal = "0.9.1"
indicatif = "0.17"
lalrpop = "0.20.0"
lalrpop-util = { version = "0.20.0", features = ["lexer"] }
Expand Down Expand Up @@ -162,7 +169,7 @@ procfs = { version = "0.16.0", default-features = false }
quote = "1.0.33"
rand = { version = "0.8.5" }
regex = "1.10.2"
reqwest = { version = "0.11.23", default-features = false, features = [
reqwest = { version = "0.12.9", default-features = false, features = [
"blocking",
"json",
"rustls-tls",
Expand All @@ -177,11 +184,11 @@ strum = { version = "0.25", features = ["derive"] }
syn = "2.0.41"
sys-mount = { version = "2.1.0", default-features = false }
tar = "0.4"
thiserror = "1.0.51"
thiserror = "2.0.9"
toml_edit = { version = "0.15.0", features = ["easy"] }
tokio = { version = "1.35.1", features = ["full"] }
tokio = { version = "1.42", features = ["full"] }
tokio-fd = "0.3.0"
tokio-tungstenite = "0.21.0"
tokio-tungstenite = "0.26.1"
uuid = { version = "1.6.1", features = ["v4"] }
which = "5.0.0"
xshell = "0.2.5"
Expand Down
2 changes: 2 additions & 0 deletions crates/bpf-common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,10 @@ procfs = { workspace = true }
libc = { workspace = true }
glob = { workspace = true }
hex = { workspace = true }
http-body-util = { workspace = true }
hyper = { workspace = true }
hyperlocal = { workspace = true }
hyper-util = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
lazy_static = { workspace = true }
Expand Down
26 changes: 15 additions & 11 deletions crates/bpf-common/src/containers/layers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,11 @@ use std::{
str::FromStr,
};

use hyper::{body, Client};
use hyperlocal::{UnixClientExt, Uri as HyperlocalUri};
use bytes::Buf;
use http_body_util::{BodyExt, Full};
use hyper::body::Bytes;
use hyper_util::client::legacy::Client;
use hyperlocal::{UnixClientExt, UnixConnector, Uri as HyperlocalUri};
use nix::unistd::Uid;
use serde::Deserialize;

Expand Down Expand Up @@ -86,7 +89,7 @@ struct Rootfs {

/// Returns a list of layer paths for the given Docker image ID.
pub(crate) async fn docker_layers(image_id: &str) -> Result<Vec<PathBuf>, ContainerError> {
let client = Client::unix();
let client: Client<UnixConnector, Full<Bytes>> = Client::unix();
let uri = HyperlocalUri::new(DOCKER_SOCKET, &format!("/images/{}/json", image_id));
let uri: hyper::Uri = uri.into();

Expand All @@ -98,15 +101,16 @@ pub(crate) async fn docker_layers(image_id: &str) -> Result<Vec<PathBuf>, Contai
source,
uri: uri.clone(),
})?;
let body_bytes =
body::to_bytes(response)
.await
.map_err(|source| ContainerError::HyperResponse {
source,
uri: uri.clone(),
})?;
let body_bytes = response
.collect()
.await
.map_err(|source| ContainerError::HyperResponse {
source,
uri: uri.clone(),
})?
.aggregate();

let response: ImageInspect = serde_json::from_slice(&body_bytes)
let response: ImageInspect = serde_json::from_reader(body_bytes.reader())
.map_err(|source| ContainerError::ParseResponse { source, uri })?;

match response.graph_driver.name {
Expand Down
2 changes: 1 addition & 1 deletion crates/bpf-common/src/containers/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ pub enum ContainerError {
#[error("failed to make a request to the UNIX socket `{uri:?}`")]
HyperRequest {
#[source]
source: hyper::Error,
source: hyper_util::client::legacy::Error,
uri: hyper::Uri,
},
#[error("failed to parse a response from the UNIX socket `{uri:?}`")]
Expand Down
2 changes: 2 additions & 0 deletions crates/engine-api/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@ axum = { workspace = true, features = ["ws"] }
anyhow = { workspace = true }
serde = { workspace = true, features = ["derive"] }
tokio = { workspace = true }
http-body-util = { workspace = true }
hyper = { workspace = true }
hyper-util = { workspace = true }
futures = { workspace = true }
hyperlocal = { workspace = true }
serde_json = { workspace = true }
Expand Down
37 changes: 25 additions & 12 deletions crates/engine-api/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@ use std::{ffi::CString, os::unix::prelude::FileTypeExt};

use anyhow::{anyhow, bail, ensure, Context, Result};
use futures::{Stream, StreamExt};
use hyper::{Body, Method, Request, StatusCode, Uri};
use http_body_util::{BodyExt, Either, Empty, Full};
use hyper::body::{Buf, Bytes};
use hyper::{Method, Request, StatusCode, Uri};
use hyper_util::client::legacy::Client;
use hyperlocal::{UnixClientExt, UnixConnector};
use pulsar_core::pdk::{Event, ModuleOverview};
use serde::de::DeserializeOwned;
Expand All @@ -16,7 +19,7 @@ use crate::{
#[derive(Debug, Clone)]
pub struct EngineApiClient {
socket: String,
client: hyper::Client<UnixConnector, Body>,
client: Client<UnixConnector, Either<Full<Bytes>, Empty<Bytes>>>,
}

impl EngineApiClient {
Expand Down Expand Up @@ -59,7 +62,7 @@ impl EngineApiClient {

Ok(Self {
socket,
client: hyper::Client::unix(),
client: Client::unix(),
})
}

Expand All @@ -68,15 +71,21 @@ impl EngineApiClient {
}

async fn get<T: DeserializeOwned>(&self, uri: Uri) -> Result<T> {
let req = Request::builder()
.method(Method::GET)
.uri(uri)
.body(Either::Right(Empty::<Bytes>::new()))
.map_err(|err| anyhow!("Error building the request. Reason: {}", err))?;

let res = self
.client
.get(uri)
.request(req)
.await
.map_err(|err| anyhow!("Error during the http request: reason {}", err))?;

let buf = hyper::body::to_bytes(res).await?;
let buf = res.collect().await?.aggregate();

let output = serde_json::from_slice(&buf)?;
let output = serde_json::from_reader(buf.reader())?;

Ok(output)
}
Expand Down Expand Up @@ -129,7 +138,7 @@ impl EngineApiClient {
.method(Method::PATCH)
.uri(url)
.header("content-type", "application/json")
.body(Body::from(body_string))
.body(Either::Left(Full::from(body_string)))
.map_err(|err| anyhow!("Error building the request. Reason: {}", err))?;

let res = self
Expand All @@ -143,9 +152,11 @@ impl EngineApiClient {
match status {
StatusCode::OK => Ok(()),
_ => {
let error = hyper::body::to_bytes(res)
let error = res
.collect()
.await
.map_err(|err| anyhow!("Error to bytes. Reason: {}", err))?;
.map_err(|err| anyhow!("Error to bytes. Reason: {}", err))?
.to_bytes();
let error = std::str::from_utf8(&error)
.map_err(|err| anyhow!("Cannot parse error str. Reason: {}", err))?;
Err(anyhow!("Error during request. {error}"))
Expand All @@ -157,7 +168,7 @@ impl EngineApiClient {
let req = Request::builder()
.method(Method::POST)
.uri(uri)
.body(Body::empty())
.body(Either::Right(Empty::<Bytes>::new()))
.map_err(|err| anyhow!("Error building the request. Reason: {}", err))?;

let res = self
Expand All @@ -171,9 +182,11 @@ impl EngineApiClient {
match status {
StatusCode::OK => Ok(()),
_ => {
let error = hyper::body::to_bytes(res)
let error = res
.collect()
.await
.map_err(|err| anyhow!("Error to bytes. Reason: {}", err))?;
.map_err(|err| anyhow!("Error to bytes. Reason: {}", err))?
.to_bytes();
let error = std::str::from_utf8(&error)
.map_err(|err| anyhow!("Cannot parse error str. Reason: {}", err))?;
Err(anyhow!("Error during request. {error}"))
Expand Down
20 changes: 7 additions & 13 deletions crates/engine-api/src/error.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,6 @@
use std::fmt::Display;

use axum::{
body,
http::{Response, StatusCode},
response::IntoResponse,
};
use axum::{http::StatusCode, response::IntoResponse};
use pulsar_core::pdk::PulsarDaemonError;
use thiserror::Error;

Expand Down Expand Up @@ -43,15 +39,13 @@ impl EngineApiError {

impl IntoResponse for EngineApiError {
fn into_response(self) -> axum::response::Response {
let status_code = &self.status_code();
let status_code = self.status_code();

let body = match self {
Self::InternalServerError => body::boxed(body::Full::from("internal")),
Self::BadRequest(err) => body::boxed(body::Full::from(err)),
Self::ServiceUnavailable => body::boxed(body::Full::from("unavailable")),
};

Response::builder().status(status_code).body(body).unwrap()
match self {
Self::InternalServerError => (status_code, "internal").into_response(),
Self::BadRequest(err) => (status_code, err).into_response(),
Self::ServiceUnavailable => (status_code, "unavailable").into_response(),
}
}
}

Expand Down
52 changes: 11 additions & 41 deletions crates/engine-api/src/server.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,3 @@
use std::{
pin::Pin,
task::{Context, Poll},
};

use anyhow::{anyhow, Result};
use axum::{
extract::{
Expand All @@ -11,19 +6,13 @@ use axum::{
},
response::Response,
routing::{get, patch, post},
BoxError, Json, Router,
Json, Router,
};
use futures::ready;
use hyper::server::accept::Accept;
use pulsar_core::{
bus::Bus,
pdk::{ModuleOverview, PulsarDaemonHandle},
};
use tokio::{
net::{UnixListener, UnixStream},
sync::oneshot,
task::JoinHandle,
};
use tokio::{net::UnixListener, sync::oneshot, task::JoinHandle};

use crate::{
dto::{ConfigKV, ModuleConfigKVs},
Expand Down Expand Up @@ -54,11 +43,11 @@ pub fn run_api_server(
) -> Result<ServerHandle> {
let modules = Router::new()
.route("/", get(modules))
.route("/:module_name/start", post(module_start))
.route("/:module_name/restart", post(module_restart))
.route("/:module_name/stop", post(module_stop))
.route("/:module_name/config", get(get_module_cfg))
.route("/:module_name/config", patch(update_module_cfg));
.route("/{module_name}/start", post(module_start))
.route("/{module_name}/restart", post(module_restart))
.route("/{module_name}/stop", post(module_stop))
.route("/{module_name}/config", get(get_module_cfg))
.route("/{module_name}/config", patch(update_module_cfg));

let app = Router::new()
.nest("/modules", modules)
Expand All @@ -74,11 +63,9 @@ pub fn run_api_server(

let (tx_shutdown, rx_shutdown) = oneshot::channel();

let server = axum::Server::builder(ServerAccept { uds })
.serve(app.into_make_service())
.with_graceful_shutdown(async move {
let _ = rx_shutdown.await;
});
let server = axum::serve(uds, app).with_graceful_shutdown(async move {
let _ = rx_shutdown.await;
});

let server_join_handle = tokio::spawn(async move {
if let Err(e) = server.await {
Expand Down Expand Up @@ -180,7 +167,7 @@ async fn event_monitor_handler(
match pulsar_core::pdk::receive_from_broadcast(&mut bus_receiver, "engine_api").await {
Ok(event) => match serde_json::to_string(&*event) {
Ok(json) => {
if socket.send(Message::Text(json)).await.is_err() {
if socket.send(Message::Text(json.into())).await.is_err() {
// client disconnected
return;
}
Expand All @@ -200,20 +187,3 @@ async fn event_monitor_handler(

ws.on_upgrade(handle_socket)
}

struct ServerAccept {
uds: UnixListener,
}

impl Accept for ServerAccept {
type Conn = UnixStream;
type Error = BoxError;

fn poll_accept(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Result<Self::Conn, Self::Error>>> {
let (stream, _addr) = ready!(self.uds.poll_accept(cx))?;
Poll::Ready(Some(Ok(stream)))
}
}

0 comments on commit e29bd9d

Please sign in to comment.