Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(rust): removed NodeManager dependency when instantiating a MultiAddr #8599

Draft
wants to merge 1 commit into
base: develop
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ pub(crate) use plain_tcp::PlainTcpInstantiator;
pub(crate) use project::ProjectInstantiator;
pub(crate) use secure::SecureChannelInstantiator;
use std::fmt::{Debug, Formatter};
use std::sync::Arc;

#[derive(Clone)]
pub struct Connection {
Expand Down Expand Up @@ -181,13 +182,49 @@ pub trait Instantiator: Send + Sync + 'static {
/// The returned [`Changes`] will be used to update the builder state.
async fn instantiate(
&self,
ctx: &Context,
node_manager: &NodeManager,
context: &Context,
transport_route: Route,
extracted: (MultiAddr, MultiAddr, MultiAddr),
) -> Result<Changes, ockam_core::Error>;
}

/// Aggregates multiple [`Instantiator`]s, having a single object containing
/// all runtime dependencies.
pub struct ConnectionInstantiator {
instantiator: Vec<Arc<dyn Instantiator>>,
}

impl ConnectionInstantiator {
pub fn new() -> Self {
ConnectionInstantiator {
instantiator: vec![],
}
}

pub fn add(mut self, instantiator: impl Instantiator) -> Self {
self.instantiator.push(Arc::new(instantiator));
self
}

/// Resolve project ID (if any), create secure channel (if needed) and create a tcp connection
/// Returns [`Connection`]
pub async fn connect(&self, ctx: &Context, addr: &MultiAddr) -> Result<Connection> {
debug!("connecting to {}", &addr);

let mut connection_builder = ConnectionBuilder::new(addr.clone());
for instantiator in self.instantiator.clone() {
connection_builder = connection_builder
.instantiate(ctx, instantiator.as_ref())
.await?;
}
let connection = connection_builder.build();
connection.add_default_consumers(ctx);

debug!("connected to {connection:?}");
Ok(connection)
}
}

impl ConnectionBuilder {
pub fn new(multi_addr: MultiAddr) -> Self {
ConnectionBuilder {
Expand All @@ -214,11 +251,10 @@ impl ConnectionBuilder {
/// Used to instantiate a connection from a [`MultiAddr`]
/// when called multiple times the instantiator order matters and it's up to the
/// user make sure higher protocol abstraction are called before lower level ones
pub async fn instantiate(
pub async fn instantiate<T: Instantiator + ?Sized>(
mut self,
ctx: &Context,
node_manager: &NodeManager,
instantiator: impl Instantiator,
instantiator: &T,
) -> Result<Self, ockam_core::Error> {
//executing a regex-like search, shifting the starting point one by one
//not efficient by any mean, but it shouldn't be an issue
Expand All @@ -240,7 +276,6 @@ impl ConnectionBuilder {
let mut changes = instantiator
.instantiate(
ctx,
node_manager,
self.transport_route.clone(),
self.extract(start, instantiator.matches().len()),
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,20 @@ use crate::error::ApiError;
use crate::nodes::connection::{Changes, ConnectionBuilder, Instantiator};
use crate::{multiaddr_to_route, route_to_multiaddr};

use crate::nodes::NodeManager;
use ockam_core::{async_trait, Error, Route};
use ockam_core::{async_trait, Route};
use ockam_multiaddr::proto::{DnsAddr, Ip4, Ip6, Tcp};
use ockam_multiaddr::{Match, MultiAddr, Protocol};
use ockam_node::Context;
use ockam_transport_tcp::TcpTransport;

/// Creates the tcp connection.
pub(crate) struct PlainTcpInstantiator {}
pub(crate) struct PlainTcpInstantiator {
tcp_transport: TcpTransport,
}

impl PlainTcpInstantiator {
pub(crate) fn new() -> Self {
Self {}
pub(crate) fn new(tcp_transport: TcpTransport) -> Self {
Self { tcp_transport }
}
}

Expand All @@ -29,14 +31,13 @@ impl Instantiator for PlainTcpInstantiator {

async fn instantiate(
&self,
_ctx: &Context,
node_manager: &NodeManager,
_context: &Context,
_transport_route: Route,
extracted: (MultiAddr, MultiAddr, MultiAddr),
) -> Result<Changes, Error> {
) -> Result<Changes, ockam_core::Error> {
let (before, tcp_piece, after) = extracted;

let mut tcp = multiaddr_to_route(&tcp_piece, &node_manager.tcp_transport)
let mut tcp = multiaddr_to_route(&tcp_piece, &self.tcp_transport)
.await
.ok_or_else(|| {
ApiError::core(format!(
Expand Down
Original file line number Diff line number Diff line change
@@ -1,28 +1,40 @@
use crate::error::ApiError;
use crate::nodes::connection::{Changes, Instantiator};
use crate::nodes::NodeManager;
use crate::{multiaddr_to_route, try_address_to_multiaddr};
use crate::{multiaddr_to_route, try_address_to_multiaddr, CliState};
use std::sync::Arc;

use ockam_core::{async_trait, Error, Route};
use ockam_core::{async_trait, Route};
use ockam_multiaddr::proto::Project;
use ockam_multiaddr::{Match, MultiAddr, Protocol};
use ockam_node::Context;

use crate::nodes::service::SecureChannelType;
use ockam::identity::Identifier;
use ockam::identity::{Identifier, SecureChannelOptions, SecureChannels};
use ockam_transport_tcp::TcpTransport;
use std::time::Duration;

/// Creates a secure connection to the project using provided credential
pub(crate) struct ProjectInstantiator {
identifier: Identifier,
timeout: Option<Duration>,
cli_state: CliState,
secure_channels: Arc<SecureChannels>,
tcp_transport: TcpTransport,
}

impl ProjectInstantiator {
pub fn new(identifier: Identifier, timeout: Option<Duration>) -> Self {
pub fn new(
identifier: Identifier,
timeout: Option<Duration>,
cli_state: CliState,
secure_channels: Arc<SecureChannels>,
tcp_transport: TcpTransport,
) -> Self {
Self {
identifier,
timeout,
cli_state,
secure_channels,
tcp_transport,
}
}
}
Expand All @@ -35,11 +47,10 @@ impl Instantiator for ProjectInstantiator {

async fn instantiate(
&self,
ctx: &Context,
node_manager: &NodeManager,
context: &Context,
_transport_route: Route,
extracted: (MultiAddr, MultiAddr, MultiAddr),
) -> Result<Changes, Error> {
) -> Result<Changes, ockam_core::Error> {
let (_before, project_piece, after) = extracted;

let project_protocol_value = project_piece
Expand All @@ -50,11 +61,25 @@ impl Instantiator for ProjectInstantiator {
.cast::<Project>()
.ok_or_else(|| ApiError::core("invalid project protocol in multiaddr"))?;

let (project_multiaddr, project_identifier) =
node_manager.resolve_project(&project).await?;
let (project_multiaddr, project_identifier) = self
.cli_state
.projects()
.get_project_by_name(&project)
.await
.map(|project| {
(
project.project_multiaddr().cloned(),
project
.project_identifier()
.ok_or_else(|| ApiError::core("project identifier is missing")),
)
})?;

let project_identifier = project_identifier?;
let project_multiaddr = project_multiaddr?;

debug!(addr = %project_multiaddr, "creating secure channel");
let tcp = multiaddr_to_route(&project_multiaddr, &node_manager.tcp_transport)
let tcp = multiaddr_to_route(&project_multiaddr, &self.tcp_transport)
.await
.ok_or_else(|| {
ApiError::core(format!(
Expand All @@ -63,27 +88,29 @@ impl Instantiator for ProjectInstantiator {
})?;

debug!("create a secure channel to the project {project_identifier}");
let sc = node_manager
.create_secure_channel_internal(
ctx,
tcp.route,
&self.identifier.clone(),
Some(vec![project_identifier]),
None,
self.timeout,
SecureChannelType::KeyExchangeAndMessages,
)

let options = SecureChannelOptions::new().with_authority(project_identifier);
let options = if let Some(timeout) = self.timeout {
options.with_timeout(timeout)
} else {
options
};

let secure_channel = self
.secure_channels
.create_secure_channel(context, &self.identifier.clone(), tcp.route, options)
.await?;

// when creating a secure channel we want the route to pass through that
// when creating a secure channel, we want the route to pass through that
// ignoring previous steps, since they will be implicit
let mut current_multiaddr = try_address_to_multiaddr(sc.encryptor_address()).unwrap();
let mut current_multiaddr =
try_address_to_multiaddr(secure_channel.encryptor_address()).unwrap();
current_multiaddr.try_extend(after.iter())?;

Ok(Changes {
flow_control_id: Some(sc.flow_control_id().clone()),
flow_control_id: Some(secure_channel.flow_control_id().clone()),
current_multiaddr,
secure_channel_encryptors: vec![sc.encryptor_address().clone()],
secure_channel_encryptors: vec![secure_channel.encryptor_address().clone()],
tcp_connection: tcp.tcp_connection,
})
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
use std::sync::Arc;
use std::time::Duration;

use crate::nodes::connection::{Changes, Instantiator};
use crate::nodes::NodeManager;
use crate::{local_multiaddr_to_route, try_address_to_multiaddr};

use crate::nodes::service::SecureChannelType;
use ockam::identity::Identifier;
use ockam_core::{async_trait, route, AsyncTryClone, Error, Route};
use ockam::identity::{
Identifier, SecureChannelOptions, SecureChannels, TrustEveryonePolicy,
TrustMultiIdentifiersPolicy,
};
use ockam_core::{async_trait, route, Route};
use ockam_multiaddr::proto::Secure;
use ockam_multiaddr::{Match, MultiAddr, Protocol};
use ockam_node::Context;
Expand All @@ -16,18 +18,24 @@ pub(crate) struct SecureChannelInstantiator {
identifier: Identifier,
authorized_identities: Option<Vec<Identifier>>,
timeout: Option<Duration>,
secure_channels: Arc<SecureChannels>,
authority: Option<Identifier>,
}

impl SecureChannelInstantiator {
pub(crate) fn new(
identifier: &Identifier,
timeout: Option<Duration>,
authorized_identities: Option<Vec<Identifier>>,
authority: Option<Identifier>,
secure_channels: Arc<SecureChannels>,
) -> Self {
Self {
identifier: identifier.clone(),
authorized_identities,
authority,
timeout,
secure_channels,
}
}
}
Expand All @@ -40,39 +48,53 @@ impl Instantiator for SecureChannelInstantiator {

async fn instantiate(
&self,
ctx: &Context,
node_manager: &NodeManager,
context: &Context,
transport_route: Route,
extracted: (MultiAddr, MultiAddr, MultiAddr),
) -> Result<Changes, Error> {
) -> Result<Changes, ockam_core::Error> {
let (_before, secure_piece, after) = extracted;
debug!(%secure_piece, %transport_route, "creating secure channel");
let route = local_multiaddr_to_route(&secure_piece)?;

let sc_ctx = ctx.async_try_clone().await?;
let sc = node_manager
.create_secure_channel_internal(
&sc_ctx,
//the transport route is needed to reach the secure channel listener
//since it can be in another node
route![transport_route, route],
let options = SecureChannelOptions::new();

let options = match self.authorized_identities.clone() {
Some(ids) => options.with_trust_policy(TrustMultiIdentifiersPolicy::new(ids)),
None => options.with_trust_policy(TrustEveryonePolicy),
};

let options = if let Some(authority) = self.authority.clone() {
options.with_authority(authority)
} else {
options
};

let options = if let Some(timeout) = self.timeout {
options.with_timeout(timeout)
} else {
options
};

let secure_channel = self
.secure_channels
.create_secure_channel(
context,
&self.identifier,
self.authorized_identities.clone(),
None,
self.timeout,
SecureChannelType::KeyExchangeAndMessages,
route![transport_route, route],
options,
)
.await?;

// when creating a secure channel we want the route to pass through that
// ignoring previous steps, since they will be implicit
let mut current_multiaddr = try_address_to_multiaddr(sc.encryptor_address()).unwrap();
let mut current_multiaddr =
try_address_to_multiaddr(secure_channel.encryptor_address()).unwrap();
current_multiaddr.try_extend(after.iter())?;

Ok(Changes {
current_multiaddr,
flow_control_id: Some(sc.flow_control_id().clone()),
secure_channel_encryptors: vec![sc.encryptor_address().clone()],
flow_control_id: Some(secure_channel.flow_control_id().clone()),
secure_channel_encryptors: vec![secure_channel.encryptor_address().clone()],
tcp_connection: None,
})
}
Expand Down
Loading
Loading