Skip to content

Commit

Permalink
feat(rust): implemented control api http server
Browse files Browse the repository at this point in the history
  • Loading branch information
davide-baldo committed Feb 5, 2025
1 parent 8cf0036 commit 65ca76c
Show file tree
Hide file tree
Showing 26 changed files with 2,015 additions and 69 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion implementations/rust/ockam/ockam_abac/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ impl std::error::Error for EvalError {
impl From<ParseError> for ockam_core::Error {
#[track_caller]
fn from(e: ParseError) -> Self {
ockam_core::Error::new(Origin::Application, Kind::Invalid, e.to_string())
ockam_core::Error::new(Origin::Application, Kind::Parse, e.to_string())
}
}

Expand Down
3 changes: 3 additions & 0 deletions implementations/rust/ockam/ockam_api/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ futures = { version = "0.3.30", features = [] }
gethostname = "0.5.0"
hex = { version = "0.4.3", default-features = false, features = ["alloc", "serde"] }
home = "0.5"
http = "1"
http-body-util = "0"
httparse = "1.9.5"
hyper = { version = "1", default-features = false, features = ["server", "http1"] }
Expand Down Expand Up @@ -99,6 +100,7 @@ sqlx = { version = "0.8.3", default-features = false }
sqlx-core = { version = "0.8.3", default-features = false }
strip-ansi-escapes = "0.2"
strum = { version = "0.26.3", default-features = false, features = ["derive"] }
subtle = "2"
syntect = { version = "5.2.0", default-features = false, features = ["default-syntaxes", "regex-onig"] }
sysinfo = "0.32"
termbg = "0.5.2"
Expand Down Expand Up @@ -160,6 +162,7 @@ default-features = false
[dev-dependencies]
cddl-cat = "0.6.1"
hex = "0.4.3"
hyper = { version = "1", default-features = false, features = ["server", "http1", "client"] }
multimap = "0.10.0"
ockam_macros = { path = "../ockam_macros", features = ["std"], version = "^0.37.0" }
ockam_transport_core = { path = "../ockam_transport_core", version = "^0.101.0" }
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
use crate::control_api::{ControlApiHttpRequest, ControlApiHttpResponse, ErrorResponse};
use crate::nodes::NodeManager;
use crate::DefaultAddress;
use http::{StatusCode, Uri};
use ockam_abac::{IncomingAbac, OutgoingAbac, PolicyExpression};
use ockam_core::errcode::Kind;
use ockam_core::{
async_trait, Address, AllowAll, IncomingAccessControl, NeutralMessage, OutgoingAccessControl,
Routed, Worker,
};
use ockam_node::{Context, WorkerBuilder};
use std::str::FromStr;
use std::sync::Arc;

pub struct HttpControlNodeApiBackend {
pub(crate) node_manager: Arc<NodeManager>,
}

impl HttpControlNodeApiBackend {
pub fn new(node_manager: Arc<NodeManager>) -> Self {
Self { node_manager }
}
}

#[async_trait]
impl Worker for HttpControlNodeApiBackend {
type Message = NeutralMessage;
type Context = Context;

async fn handle_message(
&mut self,
context: &mut Self::Context,
message: Routed<Self::Message>,
) -> ockam_core::Result<()> {
let request: ControlApiHttpRequest = minicbor::decode(message.payload())?;

let uri = Uri::from_str(&request.uri).unwrap();
// The syntax for restful API is:
// - /NODE_ID/RESOURCE/
// - /NODE_ID/RESOURCE/RESOURCE_ID/

let path = uri.path().split('/').collect::<Vec<&str>>();
if path.len() < 3 {
return context
.send(
message.return_route().clone(),
NeutralMessage::from(minicbor::to_vec(&ControlApiHttpResponse::with_body(
StatusCode::BAD_REQUEST,
ErrorResponse {
message: "Invalid URI".to_string(),
},
)?)?),
)
.await;
}

// we can ignore the node identifier since it has been already addressed by
// the reverse proxy
let _node_identifier = path[1];

let resource_kind = path[2].to_lowercase();
let resource_id = path.get(3).copied();

let result: ockam_core::Result<ControlApiHttpResponse> = match resource_kind.as_str() {
"tcp-inlet" => {
self.handle_tcp_inlet(context, request.method.as_str(), resource_id, request.body)
.await
}
"tcp-outlet" => {
self.handle_tcp_outlet(context, request.method.as_str(), resource_id, request.body)
.await
}
_ => ControlApiHttpResponse::with_body(
StatusCode::BAD_REQUEST,
ErrorResponse {
message: "Unknown resource kind".to_string(),
},
),
};

let response = match result {
Ok(response) => response,
Err(error) => match error.code().kind {
// We make an assumption that every parsing error originates from the
// client; This is not necessarily always true, but it's a good approximation
Kind::Parse => ControlApiHttpResponse::with_body(
StatusCode::BAD_REQUEST,
ErrorResponse {
message: error.to_string(),
},
)?,
_ => ControlApiHttpResponse::with_body(
StatusCode::INTERNAL_SERVER_ERROR,
ErrorResponse {
message: error.to_string(),
},
)?,
},
};

let response = minicbor::to_vec(&response)?;

context
.send(
message.return_route().clone(),
NeutralMessage::from(response),
)
.await
}
}

impl NodeManager {
pub fn create_control_api_backend(
self: &Arc<NodeManager>,
context: &Context,
policy_expression: Option<PolicyExpression>,
) -> ockam_core::Result<()> {
let service = HttpControlNodeApiBackend::new(self.clone());
let address: Address = DefaultAddress::CONTROL_API.into();

let flow_controls = context.flow_controls();

let incoming_access_control: Arc<dyn IncomingAccessControl>;
let outgoing_access_control: Arc<dyn OutgoingAccessControl>;

match policy_expression {
Some(policy_expression) => {
incoming_access_control = Arc::new(IncomingAbac::create(
self.secure_channels.identities().identities_attributes(),
self.project_authority(),
policy_expression.to_expression(),
));
outgoing_access_control = Arc::new(OutgoingAbac::create(
context,
self.secure_channels.identities().identities_attributes(),
self.project_authority(),
policy_expression.to_expression(),
)?);
}
None => {
incoming_access_control = Arc::new(AllowAll);
outgoing_access_control = Arc::new(AllowAll);
}
}

WorkerBuilder::new(service)
.with_address(address.clone())
.with_incoming_access_control_arc(incoming_access_control)
.with_outgoing_access_control_arc(outgoing_access_control)
.start(context)?;

if let Some(secure_channel_listener) = &self.api_sc_listener {
flow_controls.add_consumer(&address, secure_channel_listener.flow_control_id());
}

Ok(())
}
}
Loading

0 comments on commit 65ca76c

Please sign in to comment.