Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(rust): use localhost constructor for HostnamePort #8737

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ async fn start_node(ctx: Context, project_information_path: &str, token: OneTime
// 4. create a tcp outlet with the above policy
tcp.create_outlet(
"outlet",
HostnamePort::new("127.0.0.1", 5000)?,
HostnamePort::localhost(5000),
TcpOutletOptions::new()
.with_incoming_access_control_impl(incoming_access_control)
.with_outgoing_access_control_impl(outgoing_access_control),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ async fn producer__flow_with_mock_kafka__content_encryption_and_decryption(
let mut consumer_mock_kafka = TcpServerSimulator::start("127.0.0.1:0").await;
handle.tcp.create_outlet(
"kafka_consumer_outlet",
HostnamePort::new("127.0.0.1", consumer_mock_kafka.port)?,
HostnamePort::localhost(consumer_mock_kafka.port),
TcpOutletOptions::new(),
)?;

Expand All @@ -173,7 +173,7 @@ async fn producer__flow_with_mock_kafka__content_encryption_and_decryption(
let mut producer_mock_kafka = TcpServerSimulator::start("127.0.0.1:0").await;
handle.tcp.create_outlet(
"kafka_producer_outlet",
HostnamePort::new("127.0.0.1", producer_mock_kafka.port)?,
HostnamePort::localhost(producer_mock_kafka.port),
TcpOutletOptions::new(),
)?;
let request =
Expand Down Expand Up @@ -207,7 +207,7 @@ async fn producer__flow_with_mock_kafka__content_encryption_and_decryption(
let mut consumer_mock_kafka = TcpServerSimulator::start("127.0.0.1:0").await;
handle.tcp.create_outlet(
"kafka_consumer_outlet",
HostnamePort::new("127.0.0.1", consumer_mock_kafka.port)?,
HostnamePort::localhost(consumer_mock_kafka.port),
TcpOutletOptions::new(),
)?;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -339,10 +339,6 @@ mod tests {
}

fn outlet_info(worker_addr: Address) -> OutletInfo {
OutletInfo::new(
HostnamePort::new("127.0.0.1", 0).unwrap(),
Some(&worker_addr),
true,
)
OutletInfo::new(HostnamePort::localhost(0), Some(&worker_addr), true)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ impl RendezvousHealthcheck {
udp_socket_address: SocketAddr,
) -> Result<Self> {
let peer = if udp_socket_address.ip().is_unspecified() {
HostnamePort::new("localhost", udp_socket_address.port())?.to_string()
HostnamePort::localhost(udp_socket_address.port()).to_string()
} else {
udp_socket_address.to_string()
};
Expand Down
2 changes: 1 addition & 1 deletion implementations/rust/ockam/ockam_api/tests/latency.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ pub fn measure_buffer_latency_two_nodes_portal() {
.node_manager
.create_inlet(
&first_node.context,
HostnamePort::new("127.0.0.1", 0)?,
HostnamePort::localhost(0),
route![],
route![],
second_node_listen_address
Expand Down
10 changes: 5 additions & 5 deletions implementations/rust/ockam/ockam_api/tests/portals.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ async fn inlet_outlet_local_successful(context: &mut Context) -> ockam::Result<(
.node_manager
.create_inlet(
context,
HostnamePort::new("127.0.0.1", 0)?,
HostnamePort::localhost(0),
route![],
route![],
MultiAddr::from_str("/secure/api/service/outlet")?,
Expand Down Expand Up @@ -122,7 +122,7 @@ fn portal_node_goes_down_reconnect() {
.node_manager
.create_inlet(
&first_node.context,
HostnamePort::new("127.0.0.1", 0)?,
HostnamePort::localhost(0),
route![],
route![],
second_node_listen_address
Expand Down Expand Up @@ -280,7 +280,7 @@ fn portal_low_bandwidth_connection_keep_working_for_60s() {
.node_manager
.create_inlet(
&first_node.context,
HostnamePort::new("127.0.0.1", 0)?,
HostnamePort::localhost(0),
route![],
route![],
InternetAddress::from(passthrough_server_handle.chosen_addr)
Expand Down Expand Up @@ -394,7 +394,7 @@ fn portal_heavy_load_exchanged() {
.node_manager
.create_inlet(
&first_node.context,
HostnamePort::new("127.0.0.1", 0)?,
HostnamePort::localhost(0),
route![],
route![],
second_node_listen_address
Expand Down Expand Up @@ -547,7 +547,7 @@ fn test_portal_payload_transfer(outgoing_disruption: Disruption, incoming_disrup
.node_manager
.create_inlet(
&first_node.context,
HostnamePort::new("127.0.0.1", 0)?,
HostnamePort::localhost(0),
route![],
route![],
InternetAddress::from(passthrough_server_handle.chosen_addr)
Expand Down
39 changes: 17 additions & 22 deletions implementations/rust/ockam/ockam_command/src/kafka/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,24 +11,31 @@ pub(crate) mod outlet;
pub(crate) mod producer;
pub(crate) mod util;

const KAFKA_DEFAULT_CONSUMER_SERVER: StaticHostnamePort =
StaticHostnamePort::new("127.0.0.1", 4000);
const KAFKA_DEFAULT_PRODUCER_SERVER: StaticHostnamePort =
StaticHostnamePort::new("127.0.0.1", 5000);
const KAFKA_DEFAULT_BOOTSTRAP_ADDRESS: StaticHostnamePort = StaticHostnamePort::localhost(9092);
fn kafka_default_outlet_server() -> SchemeHostnamePort {
KAFKA_DEFAULT_BOOTSTRAP_ADDRESS.try_into().unwrap()
}

const KAFKA_DEFAULT_PROJECT_ROUTE: &str = "/project/default";
fn kafka_default_project_route() -> MultiAddr {
MultiAddr::from_str(KAFKA_DEFAULT_PROJECT_ROUTE).expect("Failed to parse default project route")
}

const KAFKA_DEFAULT_CONSUMER_SERVER: StaticHostnamePort = StaticHostnamePort::localhost(4000);
fn kafka_default_consumer_server() -> SchemeHostnamePort {
KAFKA_DEFAULT_CONSUMER_SERVER.try_into().unwrap()
}

const KAFKA_DEFAULT_INLET_BIND_ADDRESS: StaticHostnamePort = StaticHostnamePort::localhost(4000);
fn kafka_default_inlet_bind_address() -> SchemeHostnamePort {
KAFKA_DEFAULT_INLET_BIND_ADDRESS.try_into().unwrap()
}

const KAFKA_DEFAULT_PRODUCER_SERVER: StaticHostnamePort = StaticHostnamePort::localhost(5000);
fn kafka_default_producer_server() -> SchemeHostnamePort {
KAFKA_DEFAULT_PRODUCER_SERVER.try_into().unwrap()
}

const KAFKA_DEFAULT_BOOTSTRAP_ADDRESS: StaticHostnamePort =
StaticHostnamePort::new("127.0.0.1", 9092);
const KAFKA_DEFAULT_PROJECT_ROUTE: &str = "/project/default";
const KAFKA_DEFAULT_INLET_BIND_ADDRESS: StaticHostnamePort =
StaticHostnamePort::new("127.0.0.1", 4000);

fn kafka_default_outlet_addr() -> String {
DefaultAddress::KAFKA_OUTLET.to_string()
}
Expand All @@ -37,18 +44,6 @@ fn kafka_inlet_default_addr() -> String {
DefaultAddress::KAFKA_INLET.to_string()
}

fn kafka_default_project_route() -> MultiAddr {
MultiAddr::from_str(KAFKA_DEFAULT_PROJECT_ROUTE).expect("Failed to parse default project route")
}

fn kafka_default_outlet_server() -> SchemeHostnamePort {
KAFKA_DEFAULT_BOOTSTRAP_ADDRESS.try_into().unwrap()
}

fn kafka_default_inlet_bind_address() -> SchemeHostnamePort {
KAFKA_DEFAULT_INLET_BIND_ADDRESS.try_into().unwrap()
}

pub(crate) fn make_brokers_port_range<T: Into<HostnamePort>>(bootstrap_server: T) -> PortRange {
let boostrap_server_port = bootstrap_server.into().port() as u32;
let start = min(boostrap_server_port + 1, u16::MAX as u32) as u16;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,13 @@ impl StaticHostnamePort {
pub const fn new(hostname: &'static str, port: u16) -> Self {
Self { hostname, port }
}

pub const fn localhost(port: u16) -> Self {
Self {
hostname: "127.0.0.1",
port,
}
}
}

impl TryFrom<StaticHostnamePort> for HostnamePort {
Expand Down Expand Up @@ -169,6 +176,13 @@ impl HostnamePort {
port,
})
}

pub fn localhost(port: u16) -> Self {
Self {
hostname: "127.0.0.1".into(),
port,
}
}
}

impl From<SocketAddr> for HostnamePort {
Expand Down Expand Up @@ -226,12 +240,12 @@ impl FromStr for HostnamePort {
fn from_str(hostname_port: &str) -> ockam_core::Result<HostnamePort> {
// edge case: only the port is given
if let Ok(port) = hostname_port.parse::<u16>() {
return HostnamePort::new("127.0.0.1", port);
return Ok(HostnamePort::localhost(port));
}

if let Some(port_str) = hostname_port.strip_prefix(':') {
if let Ok(port) = port_str.parse::<u16>() {
return HostnamePort::new("127.0.0.1", port);
return Ok(HostnamePort::localhost(port));
}
}

Expand Down Expand Up @@ -260,14 +274,14 @@ mod tests {
let valid_cases = vec![
("localhost:80", HostnamePort::new("localhost", 80)?),
("33domain:80", HostnamePort::new("33domain", 80)?),
("127.0.0.1:80", HostnamePort::new("127.0.0.1", 80)?),
("127.0.0.1:80", HostnamePort::localhost(80)),
("xn--74h.com:80", HostnamePort::new("xn--74h.com", 80)?),
(
"sub.xn_74h.com:80",
HostnamePort::new("sub.xn_74h.com", 80)?,
),
(":80", HostnamePort::new("127.0.0.1", 80)?),
("80", HostnamePort::new("127.0.0.1", 80)?),
(":80", HostnamePort::localhost(80)),
("80", HostnamePort::localhost(80)),
(
"[2001:db8:85a3::8a2e:370:7334]:8080",
HostnamePort::new("[2001:db8:85a3::8a2e:370:7334]", 8080)?,
Expand All @@ -286,7 +300,7 @@ mod tests {
let socket_address_cases = vec![
(
SocketAddr::from_str("127.0.0.1:8080").unwrap(),
HostnamePort::new("127.0.0.1", 8080)?,
HostnamePort::localhost(8080),
),
(
SocketAddr::from_str("[2001:db8:85a3::8a2e:370:7334]:8080").unwrap(),
Expand Down
Loading