Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

update hyper axum #322

Merged
merged 1 commit into from
Jan 3, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
785 changes: 396 additions & 389 deletions Cargo.lock

Large diffs are not rendered by default.

23 changes: 15 additions & 8 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,12 @@ tokio = { workspace = true, features = ["full"] }
[features]
default = ["full", "tls-openssl"]
full = ["core", "extra"]
core = ["threat-logger", "process-monitor", "network-monitor", "file-system-monitor"]
core = [
"threat-logger",
"process-monitor",
"network-monitor",
"file-system-monitor",
]
extra = ["rules-engine", "desktop-notifier", "smtp-notifier"]
tls-openssl = ["smtp-notifier/tls-openssl"]
tls-rustls = ["smtp-notifier/tls-rustls"]
Expand Down Expand Up @@ -112,7 +117,7 @@ anyhow = "1.0.75"
aya = { version = "0.12.0", features = ["async_tokio"] }
aya-ebpf-bindings = "0.1.0"
aya-obj = "0.1.0"
axum = { version = "0.6.20", features = ["ws"] }
axum = { version = "0.8.1", features = ["ws"] }
bytes = "1.5.0"
cgroups-rs = { version = "0.3.4" }
chrono = { version = "0.4.31" }
Expand All @@ -129,8 +134,10 @@ gethostname = "0.4.3"
glob = "0.3.1"
hex = "0.4.3"
hickory-resolver = "0.24.1"
hyper = "0.14.28"
hyperlocal = "0.8"
http-body-util = "0.1.2"
hyper = "1.5.1"
hyper-util = "0.1.10"
hyperlocal = "0.9.1"
indicatif = "0.17"
lalrpop = "0.20.0"
lalrpop-util = { version = "0.20.0", features = ["lexer"] }
Expand Down Expand Up @@ -162,7 +169,7 @@ procfs = { version = "0.16.0", default-features = false }
quote = "1.0.33"
rand = { version = "0.8.5" }
regex = "1.10.2"
reqwest = { version = "0.11.23", default-features = false, features = [
reqwest = { version = "0.12.9", default-features = false, features = [
"blocking",
"json",
"rustls-tls",
Expand All @@ -177,11 +184,11 @@ strum = { version = "0.25", features = ["derive"] }
syn = "2.0.41"
sys-mount = { version = "2.1.0", default-features = false }
tar = "0.4"
thiserror = "1.0.51"
thiserror = "2.0.9"
toml_edit = { version = "0.15.0", features = ["easy"] }
tokio = { version = "1.35.1", features = ["full"] }
tokio = { version = "1.42", features = ["full"] }
tokio-fd = "0.3.0"
tokio-tungstenite = "0.21.0"
tokio-tungstenite = "0.26.1"
uuid = { version = "1.6.1", features = ["v4"] }
which = "5.0.0"
xshell = "0.2.5"
Expand Down
2 changes: 2 additions & 0 deletions crates/bpf-common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,10 @@ procfs = { workspace = true }
libc = { workspace = true }
glob = { workspace = true }
hex = { workspace = true }
http-body-util = { workspace = true }
hyper = { workspace = true }
hyperlocal = { workspace = true }
hyper-util = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
lazy_static = { workspace = true }
Expand Down
26 changes: 15 additions & 11 deletions crates/bpf-common/src/containers/layers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,11 @@ use std::{
str::FromStr,
};

use hyper::{body, Client};
use hyperlocal::{UnixClientExt, Uri as HyperlocalUri};
use bytes::Buf;
use http_body_util::{BodyExt, Full};
use hyper::body::Bytes;
use hyper_util::client::legacy::Client;
use hyperlocal::{UnixClientExt, UnixConnector, Uri as HyperlocalUri};
use nix::unistd::Uid;
use serde::Deserialize;

Expand Down Expand Up @@ -86,7 +89,7 @@ struct Rootfs {

/// Returns a list of layer paths for the given Docker image ID.
pub(crate) async fn docker_layers(image_id: &str) -> Result<Vec<PathBuf>, ContainerError> {
let client = Client::unix();
let client: Client<UnixConnector, Full<Bytes>> = Client::unix();
let uri = HyperlocalUri::new(DOCKER_SOCKET, &format!("/images/{}/json", image_id));
let uri: hyper::Uri = uri.into();

Expand All @@ -98,15 +101,16 @@ pub(crate) async fn docker_layers(image_id: &str) -> Result<Vec<PathBuf>, Contai
source,
uri: uri.clone(),
})?;
let body_bytes =
body::to_bytes(response)
.await
.map_err(|source| ContainerError::HyperResponse {
source,
uri: uri.clone(),
})?;
let body_bytes = response
.collect()
.await
.map_err(|source| ContainerError::HyperResponse {
source,
uri: uri.clone(),
})?
.aggregate();

let response: ImageInspect = serde_json::from_slice(&body_bytes)
let response: ImageInspect = serde_json::from_reader(body_bytes.reader())
.map_err(|source| ContainerError::ParseResponse { source, uri })?;

match response.graph_driver.name {
Expand Down
2 changes: 1 addition & 1 deletion crates/bpf-common/src/containers/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ pub enum ContainerError {
#[error("failed to make a request to the UNIX socket `{uri:?}`")]
HyperRequest {
#[source]
source: hyper::Error,
source: hyper_util::client::legacy::Error,
uri: hyper::Uri,
},
#[error("failed to parse a response from the UNIX socket `{uri:?}`")]
Expand Down
2 changes: 2 additions & 0 deletions crates/engine-api/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@ axum = { workspace = true, features = ["ws"] }
anyhow = { workspace = true }
serde = { workspace = true, features = ["derive"] }
tokio = { workspace = true }
http-body-util = { workspace = true }
hyper = { workspace = true }
hyper-util = { workspace = true }
futures = { workspace = true }
hyperlocal = { workspace = true }
serde_json = { workspace = true }
Expand Down
37 changes: 25 additions & 12 deletions crates/engine-api/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@ use std::{ffi::CString, os::unix::prelude::FileTypeExt};

use anyhow::{anyhow, bail, ensure, Context, Result};
use futures::{Stream, StreamExt};
use hyper::{Body, Method, Request, StatusCode, Uri};
use http_body_util::{BodyExt, Either, Empty, Full};
use hyper::body::{Buf, Bytes};
use hyper::{Method, Request, StatusCode, Uri};
use hyper_util::client::legacy::Client;
use hyperlocal::{UnixClientExt, UnixConnector};
use pulsar_core::pdk::{Event, ModuleOverview};
use serde::de::DeserializeOwned;
Expand All @@ -16,7 +19,7 @@ use crate::{
#[derive(Debug, Clone)]
pub struct EngineApiClient {
socket: String,
client: hyper::Client<UnixConnector, Body>,
client: Client<UnixConnector, Either<Full<Bytes>, Empty<Bytes>>>,
}

impl EngineApiClient {
Expand Down Expand Up @@ -59,7 +62,7 @@ impl EngineApiClient {

Ok(Self {
socket,
client: hyper::Client::unix(),
client: Client::unix(),
})
}

Expand All @@ -68,15 +71,21 @@ impl EngineApiClient {
}

async fn get<T: DeserializeOwned>(&self, uri: Uri) -> Result<T> {
let req = Request::builder()
.method(Method::GET)
.uri(uri)
.body(Either::Right(Empty::<Bytes>::new()))
.map_err(|err| anyhow!("Error building the request. Reason: {}", err))?;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Super nit, something to handle in a separate PR: I think it would be better to use thiserror and return specific errors in such cases, especially since it's a client crate.


let res = self
.client
.get(uri)
.request(req)
.await
.map_err(|err| anyhow!("Error during the http request: reason {}", err))?;

let buf = hyper::body::to_bytes(res).await?;
let buf = res.collect().await?.aggregate();

let output = serde_json::from_slice(&buf)?;
let output = serde_json::from_reader(buf.reader())?;

Ok(output)
}
Expand Down Expand Up @@ -129,7 +138,7 @@ impl EngineApiClient {
.method(Method::PATCH)
.uri(url)
.header("content-type", "application/json")
.body(Body::from(body_string))
.body(Either::Left(Full::from(body_string)))
.map_err(|err| anyhow!("Error building the request. Reason: {}", err))?;

let res = self
Expand All @@ -143,9 +152,11 @@ impl EngineApiClient {
match status {
StatusCode::OK => Ok(()),
_ => {
let error = hyper::body::to_bytes(res)
let error = res
.collect()
.await
.map_err(|err| anyhow!("Error to bytes. Reason: {}", err))?;
.map_err(|err| anyhow!("Error to bytes. Reason: {}", err))?
.to_bytes();
let error = std::str::from_utf8(&error)
.map_err(|err| anyhow!("Cannot parse error str. Reason: {}", err))?;
Err(anyhow!("Error during request. {error}"))
Expand All @@ -157,7 +168,7 @@ impl EngineApiClient {
let req = Request::builder()
.method(Method::POST)
.uri(uri)
.body(Body::empty())
.body(Either::Right(Empty::<Bytes>::new()))
.map_err(|err| anyhow!("Error building the request. Reason: {}", err))?;

let res = self
Expand All @@ -171,9 +182,11 @@ impl EngineApiClient {
match status {
StatusCode::OK => Ok(()),
_ => {
let error = hyper::body::to_bytes(res)
let error = res
.collect()
.await
.map_err(|err| anyhow!("Error to bytes. Reason: {}", err))?;
.map_err(|err| anyhow!("Error to bytes. Reason: {}", err))?
.to_bytes();
let error = std::str::from_utf8(&error)
.map_err(|err| anyhow!("Cannot parse error str. Reason: {}", err))?;
Err(anyhow!("Error during request. {error}"))
Expand Down
20 changes: 7 additions & 13 deletions crates/engine-api/src/error.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,6 @@
use std::fmt::Display;

use axum::{
body,
http::{Response, StatusCode},
response::IntoResponse,
};
use axum::{http::StatusCode, response::IntoResponse};
use pulsar_core::pdk::PulsarDaemonError;
use thiserror::Error;

Expand Down Expand Up @@ -43,15 +39,13 @@ impl EngineApiError {

impl IntoResponse for EngineApiError {
fn into_response(self) -> axum::response::Response {
let status_code = &self.status_code();
let status_code = self.status_code();

let body = match self {
Self::InternalServerError => body::boxed(body::Full::from("internal")),
Self::BadRequest(err) => body::boxed(body::Full::from(err)),
Self::ServiceUnavailable => body::boxed(body::Full::from("unavailable")),
};

Response::builder().status(status_code).body(body).unwrap()
match self {
Self::InternalServerError => (status_code, "internal").into_response(),
Self::BadRequest(err) => (status_code, err).into_response(),
Self::ServiceUnavailable => (status_code, "unavailable").into_response(),
}
}
}

Expand Down
52 changes: 11 additions & 41 deletions crates/engine-api/src/server.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,3 @@
use std::{
pin::Pin,
task::{Context, Poll},
};

use anyhow::{anyhow, Result};
use axum::{
extract::{
Expand All @@ -11,19 +6,13 @@ use axum::{
},
response::Response,
routing::{get, patch, post},
BoxError, Json, Router,
Json, Router,
};
use futures::ready;
use hyper::server::accept::Accept;
use pulsar_core::{
bus::Bus,
pdk::{ModuleOverview, PulsarDaemonHandle},
};
use tokio::{
net::{UnixListener, UnixStream},
sync::oneshot,
task::JoinHandle,
};
use tokio::{net::UnixListener, sync::oneshot, task::JoinHandle};

use crate::{
dto::{ConfigKV, ModuleConfigKVs},
Expand Down Expand Up @@ -54,11 +43,11 @@ pub fn run_api_server(
) -> Result<ServerHandle> {
let modules = Router::new()
.route("/", get(modules))
.route("/:module_name/start", post(module_start))
.route("/:module_name/restart", post(module_restart))
.route("/:module_name/stop", post(module_stop))
.route("/:module_name/config", get(get_module_cfg))
.route("/:module_name/config", patch(update_module_cfg));
.route("/{module_name}/start", post(module_start))
.route("/{module_name}/restart", post(module_restart))
.route("/{module_name}/stop", post(module_stop))
.route("/{module_name}/config", get(get_module_cfg))
.route("/{module_name}/config", patch(update_module_cfg));

let app = Router::new()
.nest("/modules", modules)
Expand All @@ -74,11 +63,9 @@ pub fn run_api_server(

let (tx_shutdown, rx_shutdown) = oneshot::channel();

let server = axum::Server::builder(ServerAccept { uds })
.serve(app.into_make_service())
.with_graceful_shutdown(async move {
let _ = rx_shutdown.await;
});
let server = axum::serve(uds, app).with_graceful_shutdown(async move {
let _ = rx_shutdown.await;
});

let server_join_handle = tokio::spawn(async move {
if let Err(e) = server.await {
Expand Down Expand Up @@ -180,7 +167,7 @@ async fn event_monitor_handler(
match pulsar_core::pdk::receive_from_broadcast(&mut bus_receiver, "engine_api").await {
Ok(event) => match serde_json::to_string(&*event) {
Ok(json) => {
if socket.send(Message::Text(json)).await.is_err() {
if socket.send(Message::Text(json.into())).await.is_err() {
// client disconnected
return;
}
Expand All @@ -200,20 +187,3 @@ async fn event_monitor_handler(

ws.on_upgrade(handle_socket)
}

struct ServerAccept {
uds: UnixListener,
}

impl Accept for ServerAccept {
type Conn = UnixStream;
type Error = BoxError;

fn poll_accept(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Result<Self::Conn, Self::Error>>> {
let (stream, _addr) = ready!(self.uds.poll_accept(cx))?;
Poll::Ready(Some(Ok(stream)))
}
}
Loading