From 13c3758cae9d26ec8959c7f0e4019f7f4d88c8b7 Mon Sep 17 00:00:00 2001 From: Zoa Hickenlooper Date: Thu, 25 Jun 2026 07:46:11 -0400 Subject: [PATCH 1/8] Add discovery scaffold and policy reconcile --- crates/blackwall-discovery/Cargo.toml | 15 ++ crates/blackwall-discovery/src/error.rs | 12 ++ crates/blackwall-discovery/src/lib.rs | 8 + crates/blackwall-discovery/src/reconcile.rs | 180 ++++++++++++++++++++ 4 files changed, 215 insertions(+) create mode 100644 crates/blackwall-discovery/Cargo.toml create mode 100644 crates/blackwall-discovery/src/error.rs create mode 100644 crates/blackwall-discovery/src/lib.rs create mode 100644 crates/blackwall-discovery/src/reconcile.rs diff --git a/crates/blackwall-discovery/Cargo.toml b/crates/blackwall-discovery/Cargo.toml new file mode 100644 index 0000000..b8d59a4 --- /dev/null +++ b/crates/blackwall-discovery/Cargo.toml @@ -0,0 +1,15 @@ +[package] +name = "blackwall-discovery" +version = "0.1.0" +edition.workspace = true +license.workspace = true +repository.workspace = true + +[dependencies] +blackwall-core = { path = "../blackwall-core" } +ipnet = { workspace = true } +thiserror = { workspace = true } +tracing = { workspace = true } + +[lints] +workspace = true diff --git a/crates/blackwall-discovery/src/error.rs b/crates/blackwall-discovery/src/error.rs new file mode 100644 index 0000000..bf302b1 --- /dev/null +++ b/crates/blackwall-discovery/src/error.rs @@ -0,0 +1,12 @@ +//! Errors produced by service discovery. + +/// An error discovering services. +#[derive(Debug, thiserror::Error)] +pub enum DiscoveryError { + /// A response or file could not be parsed. + #[error("discovery parse error: {0}")] + Parse(String), + /// Underlying I/O failed. + #[error("discovery i/o: {0}")] + Io(#[from] std::io::Error), +} diff --git a/crates/blackwall-discovery/src/lib.rs b/crates/blackwall-discovery/src/lib.rs new file mode 100644 index 0000000..86dcf3d --- /dev/null +++ b/crates/blackwall-discovery/src/lib.rs @@ -0,0 +1,8 @@ +//! Service discovery for Blackwall: find what the host and Incus instances +//! expose, and reconcile it into the effective firewall policy. + +mod error; +mod reconcile; + +pub use error::DiscoveryError; +pub use reconcile::{reconcile, DiscoveredService, DiscoverySource}; diff --git a/crates/blackwall-discovery/src/reconcile.rs b/crates/blackwall-discovery/src/reconcile.rs new file mode 100644 index 0000000..1678eed --- /dev/null +++ b/crates/blackwall-discovery/src/reconcile.rs @@ -0,0 +1,180 @@ +//! Merge discovered services into a base policy to produce the effective one. + +use blackwall_core::{AllowRule, L4Proto, Policy, ServiceTarget, Tenant}; +use std::net::IpAddr; + +/// Which discovery source produced a service. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum DiscoverySource { + /// A socket the host itself is listening on. + Host, + /// A port an Incus instance opted into. + Incus, +} + +/// A service found by discovery, to be merged into the policy. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct DiscoveredService { + /// Address the service is exposed on. + pub addr: IpAddr, + /// Transport protocol. + pub proto: L4Proto, + /// Port number. + pub port: u16, + /// Forwarding target for the opened service. + pub target: ServiceTarget, + /// Which source discovered it. + pub source: DiscoverySource, +} + +/// The synthetic tenant name used for discovered addresses no configured tenant owns. +const DISCOVERED_TENANT: &str = "discovered"; + +/// Produce the effective policy by merging `discovered` into `base`. +/// +/// Each discovered service whose address falls inside a managed prefix is added +/// as an `AllowRule` to the tenant that owns the address, or to a synthetic +/// `"discovered"` tenant when no configured tenant owns it. Services outside all +/// managed prefixes, and duplicates already present, are skipped. +pub fn reconcile(base: &Policy, discovered: &[DiscoveredService]) -> Policy { + let mut effective = base.clone(); + + for svc in discovered { + if !effective.prefixes.iter().any(|p| p.contains(&svc.addr)) { + continue; // outside managed space + } + if service_exists(&effective, svc.addr, svc.proto, svc.port) { + continue; // already open + } + let rule = AllowRule { + proto: svc.proto, + port: svc.port, + target: svc.target.clone(), + }; + match owning_tenant_index(&effective, svc.addr) { + Some(idx) => effective.tenants[idx].allows.push(rule), + None => attach_to_synthetic(&mut effective, svc.addr, rule), + } + } + effective +} + +fn service_exists(policy: &Policy, addr: IpAddr, proto: L4Proto, port: u16) -> bool { + policy.tenants.iter().any(|t| { + t.owned.contains(&addr) && t.allows.iter().any(|a| a.proto == proto && a.port == port) + }) +} + +fn owning_tenant_index(policy: &Policy, addr: IpAddr) -> Option { + policy.tenants.iter().position(|t| t.owned.contains(&addr)) +} + +fn attach_to_synthetic(policy: &mut Policy, addr: IpAddr, rule: AllowRule) { + if let Some(t) = policy + .tenants + .iter_mut() + .find(|t| t.name == DISCOVERED_TENANT) + { + if !t.owned.contains(&addr) { + t.owned.push(addr); + } + t.allows.push(rule); + } else { + policy.tenants.push(Tenant { + name: DISCOVERED_TENANT.to_owned(), + owned: vec![addr], + allows: vec![rule], + }); + } +} + +#[cfg(test)] +mod tests { + use super::*; + use blackwall_core::PortState; + use ipnet::IpNet; + + fn ip(s: &str) -> IpAddr { + s.parse().expect("ip") + } + + fn base_policy(tenants: Vec) -> Policy { + Policy { + interface: "eth0".to_owned(), + prefixes: vec!["203.0.113.0/24".parse::().expect("prefix")], + default_state: PortState::Deception, + tenants, + } + } + + fn svc(addr: &str, port: u16, target: ServiceTarget) -> DiscoveredService { + DiscoveredService { + addr: ip(addr), + proto: L4Proto::Tcp, + port, + target, + source: DiscoverySource::Incus, + } + } + + #[test] + fn adds_to_owning_tenant() { + let base = base_policy(vec![Tenant { + name: "acme".to_owned(), + owned: vec![ip("203.0.113.5")], + allows: vec![], + }]); + let eff = reconcile( + &base, + &[svc( + "203.0.113.5", + 443, + ServiceTarget::Incus("web01".to_owned()), + )], + ); + let acme = eff.tenants.iter().find(|t| t.name == "acme").unwrap(); + assert_eq!(acme.allows.len(), 1); + assert_eq!(acme.allows[0].port, 443); + assert!(eff.resolve().is_ok()); + } + + #[test] + fn synthetic_tenant_for_unowned_in_prefix() { + let base = base_policy(vec![]); + let eff = reconcile(&base, &[svc("203.0.113.9", 80, ServiceTarget::Host)]); + let disc = eff.tenants.iter().find(|t| t.name == "discovered").unwrap(); + assert_eq!(disc.owned, vec![ip("203.0.113.9")]); + assert_eq!(disc.allows[0].port, 80); + assert!(eff.resolve().is_ok()); + } + + #[test] + fn skips_outside_prefix() { + let base = base_policy(vec![]); + let eff = reconcile(&base, &[svc("198.51.100.1", 80, ServiceTarget::Host)]); + assert!(eff.tenants.is_empty()); + } + + #[test] + fn skips_duplicate_service() { + let base = base_policy(vec![Tenant { + name: "acme".to_owned(), + owned: vec![ip("203.0.113.5")], + allows: vec![AllowRule { + proto: L4Proto::Tcp, + port: 443, + target: ServiceTarget::Host, + }], + }]); + let eff = reconcile( + &base, + &[svc( + "203.0.113.5", + 443, + ServiceTarget::Incus("web01".to_owned()), + )], + ); + let acme = eff.tenants.iter().find(|t| t.name == "acme").unwrap(); + assert_eq!(acme.allows.len(), 1); // unchanged + } +} From c39ec7e916d1264f778670d1c62b9690d2978443 Mon Sep 17 00:00:00 2001 From: Zoa Hickenlooper Date: Thu, 25 Jun 2026 07:49:01 -0400 Subject: [PATCH 2/8] Add host-socket scanner --- crates/blackwall-discovery/src/host.rs | 107 ++++++++++++++++++++++ crates/blackwall-discovery/src/lib.rs | 4 + crates/blackwall-discovery/src/proc_io.rs | 27 ++++++ 3 files changed, 138 insertions(+) create mode 100644 crates/blackwall-discovery/src/host.rs create mode 100644 crates/blackwall-discovery/src/proc_io.rs diff --git a/crates/blackwall-discovery/src/host.rs b/crates/blackwall-discovery/src/host.rs new file mode 100644 index 0000000..6d8293a --- /dev/null +++ b/crates/blackwall-discovery/src/host.rs @@ -0,0 +1,107 @@ +//! Pure parser for `/proc/net/{tcp,tcp6,udp,udp6}` listening sockets. + +use crate::error::DiscoveryError; +use blackwall_core::L4Proto; +use std::net::{IpAddr, Ipv4Addr, Ipv6Addr}; + +/// A socket the host is listening on. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct ListeningSocket { + /// Local bind address. + pub addr: IpAddr, + /// Transport protocol. + pub proto: L4Proto, + /// Local port. + pub port: u16, +} + +/// Parse one `/proc/net` file body. Returns only listening sockets: TCP rows in +/// state `0A` (LISTEN), and UDP rows whose remote address is `0` (UNCONN). +pub fn parse_proc_net( + contents: &str, + proto: L4Proto, + is_ipv6: bool, +) -> Result, DiscoveryError> { + let mut out = Vec::new(); + for line in contents.lines().skip(1) { + let parts: Vec<&str> = line.split_whitespace().collect(); + if parts.len() < 4 { + continue; + } + let local = parts[1]; + let remote = parts[2]; + let state = parts[3]; + let listening = match proto { + L4Proto::Tcp => state.eq_ignore_ascii_case("0A"), + L4Proto::Udp => { + remote.split(':').next() == Some("00000000") + || remote.split(':').next() == Some("00000000000000000000000000000000") + } + }; + if !listening { + continue; + } + let (addr_hex, port_hex) = local + .split_once(':') + .ok_or_else(|| DiscoveryError::Parse(format!("bad local address: {local}")))?; + let port = u16::from_str_radix(port_hex, 16) + .map_err(|_| DiscoveryError::Parse(format!("bad port: {port_hex}")))?; + let addr = if is_ipv6 { + IpAddr::V6(parse_ipv6(addr_hex)?) + } else { + IpAddr::V4(parse_ipv4(addr_hex)?) + }; + out.push(ListeningSocket { addr, proto, port }); + } + Ok(out) +} + +fn parse_ipv4(hex: &str) -> Result { + let raw = u32::from_str_radix(hex, 16) + .map_err(|_| DiscoveryError::Parse(format!("bad ipv4: {hex}")))?; + // /proc stores the address little-endian within the host's byte order. + Ok(Ipv4Addr::from(raw.to_be())) +} + +fn parse_ipv6(hex: &str) -> Result { + if hex.len() != 32 { + return Err(DiscoveryError::Parse(format!("bad ipv6 length: {hex}"))); + } + let mut octets = [0u8; 16]; + for (i, chunk) in hex.as_bytes().chunks(8).enumerate() { + let word = std::str::from_utf8(chunk) + .ok() + .and_then(|s| u32::from_str_radix(s, 16).ok()) + .ok_or_else(|| DiscoveryError::Parse(format!("bad ipv6 word: {hex}")))?; + // Each 32-bit group is stored little-endian. + let be = word.to_le_bytes(); + octets[i * 4..i * 4 + 4].copy_from_slice(&be); + } + Ok(Ipv6Addr::from(octets)) +} + +#[cfg(test)] +mod tests { + use super::*; + + // 203.0.113.5 = CB007105; little-endian in /proc => 057100CB. Port 443 = 01BB. + const TCP_FIXTURE: &str = "\ + sl local_address rem_address st tx_queue rx_queue tr tm->when retrnsmt uid timeout inode + 0: 057100CB:01BB 00000000:0000 0A 00000000:00000000 00:00000000 00000000 0 0 12345 1 0000 100 + 1: 057100CB:C350 0A1B2C3D:0050 01 00000000:00000000 00:00000000 00000000 0 0 23456 1 0000 100 +"; + + #[test] + fn parses_only_listening_tcp() { + let socks = parse_proc_net(TCP_FIXTURE, L4Proto::Tcp, false).expect("parse"); + assert_eq!(socks.len(), 1); + assert_eq!(socks[0].addr, "203.0.113.5".parse::().unwrap()); + assert_eq!(socks[0].port, 443); + } + + #[test] + fn rejects_bad_port() { + let bad = "header\n 0: 057100CB:ZZZZ 00000000:0000 0A x x x x x x 1\n"; + assert!(parse_proc_net(bad, L4Proto::Tcp, false).is_err()); + } +} diff --git a/crates/blackwall-discovery/src/lib.rs b/crates/blackwall-discovery/src/lib.rs index 86dcf3d..c1058b0 100644 --- a/crates/blackwall-discovery/src/lib.rs +++ b/crates/blackwall-discovery/src/lib.rs @@ -2,7 +2,11 @@ //! expose, and reconcile it into the effective firewall policy. mod error; +mod host; +mod proc_io; mod reconcile; pub use error::DiscoveryError; +pub use host::{parse_proc_net, ListeningSocket}; +pub use proc_io::scan_host_sockets; pub use reconcile::{reconcile, DiscoveredService, DiscoverySource}; diff --git a/crates/blackwall-discovery/src/proc_io.rs b/crates/blackwall-discovery/src/proc_io.rs new file mode 100644 index 0000000..b99349b --- /dev/null +++ b/crates/blackwall-discovery/src/proc_io.rs @@ -0,0 +1,27 @@ +//! Reads the real `/proc/net` files. Thin I/O wrapper around the pure parser +//! in `host.rs`; root/daemon-bound and excluded from coverage. + +use crate::error::DiscoveryError; +use crate::host::{parse_proc_net, ListeningSocket}; +use blackwall_core::L4Proto; +use std::path::Path; + +/// Scan all four `/proc/net` socket files under `proc_root` (normally `/proc`) +/// and return every listening socket. +pub fn scan_host_sockets(proc_root: &Path) -> Result, DiscoveryError> { + let mut out = Vec::new(); + for (file, proto, v6) in [ + ("tcp", L4Proto::Tcp, false), + ("tcp6", L4Proto::Tcp, true), + ("udp", L4Proto::Udp, false), + ("udp6", L4Proto::Udp, true), + ] { + let path = proc_root.join("net").join(file); + match std::fs::read_to_string(&path) { + Ok(contents) => out.extend(parse_proc_net(&contents, proto, v6)?), + Err(err) if err.kind() == std::io::ErrorKind::NotFound => continue, + Err(err) => return Err(DiscoveryError::Io(err)), + } + } + Ok(out) +} From 6624e06cccaf373e8c17a4f788bbe6ab76e22219 Mon Sep 17 00:00:00 2001 From: Zoa Hickenlooper Date: Thu, 25 Jun 2026 07:52:19 -0400 Subject: [PATCH 3/8] Parse Incus instances into discovered services --- crates/blackwall-discovery/Cargo.toml | 2 + crates/blackwall-discovery/src/incus_model.rs | 150 ++++++++++++++++++ crates/blackwall-discovery/src/lib.rs | 2 + 3 files changed, 154 insertions(+) create mode 100644 crates/blackwall-discovery/src/incus_model.rs diff --git a/crates/blackwall-discovery/Cargo.toml b/crates/blackwall-discovery/Cargo.toml index b8d59a4..b5fed13 100644 --- a/crates/blackwall-discovery/Cargo.toml +++ b/crates/blackwall-discovery/Cargo.toml @@ -8,6 +8,8 @@ repository.workspace = true [dependencies] blackwall-core = { path = "../blackwall-core" } ipnet = { workspace = true } +serde = { workspace = true } +serde_json = { workspace = true } thiserror = { workspace = true } tracing = { workspace = true } diff --git a/crates/blackwall-discovery/src/incus_model.rs b/crates/blackwall-discovery/src/incus_model.rs new file mode 100644 index 0000000..8ff522d --- /dev/null +++ b/crates/blackwall-discovery/src/incus_model.rs @@ -0,0 +1,150 @@ +//! Parse Incus instance JSON into discoverable services. + +use crate::error::DiscoveryError; +use crate::reconcile::{DiscoveredService, DiscoverySource}; +use blackwall_core::{L4Proto, ServiceTarget}; +use std::net::IpAddr; + +/// An Incus instance relevant to discovery. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct Instance { + /// Instance name. + pub name: String, + /// Addresses assigned to the instance. + pub addresses: Vec, + /// Ports the instance opts into via `user.blackwall.ports`. + pub ports: Vec<(L4Proto, u16)>, +} + +/// Parse a `user.blackwall.ports` value, e.g. `"443/tcp, 80/tcp, 53/udp"`. +/// Malformed entries are skipped. +pub fn parse_ports(value: &str) -> Vec<(L4Proto, u16)> { + let mut out = Vec::new(); + for entry in value.split(',') { + let entry = entry.trim(); + let Some((port_str, proto_str)) = entry.split_once('/') else { + continue; + }; + let Ok(port) = port_str.trim().parse::() else { + continue; + }; + let proto = match proto_str.trim().to_ascii_lowercase().as_str() { + "tcp" => L4Proto::Tcp, + "udp" => L4Proto::Udp, + _ => continue, + }; + out.push((proto, port)); + } + out +} + +/// Parse one Incus instance object (recursion=2 shape). +pub fn parse_instance(json: &str) -> Result { + let v: serde_json::Value = + serde_json::from_str(json).map_err(|e| DiscoveryError::Parse(e.to_string()))?; + let name = v["name"] + .as_str() + .ok_or_else(|| DiscoveryError::Parse("instance missing name".to_owned()))? + .to_owned(); + + let ports = v["config"]["user.blackwall.ports"] + .as_str() + .map(parse_ports) + .unwrap_or_default(); + + let mut addresses = Vec::new(); + if let Some(networks) = v["state"]["network"].as_object() { + for (iface, net) in networks { + if iface == "lo" { + continue; + } + if let Some(addrs) = net["addresses"].as_array() { + for a in addrs { + if let Some(addr_str) = a["address"].as_str() { + if let Ok(addr) = addr_str.parse::() { + addresses.push(addr); + } + } + } + } + } + } + + Ok(Instance { + name, + addresses, + ports, + }) +} + +/// Expand an instance into discovered services (addresses × ports). +pub fn instance_services(inst: &Instance) -> Vec { + let mut out = Vec::new(); + for &addr in &inst.addresses { + for &(proto, port) in &inst.ports { + out.push(DiscoveredService { + addr, + proto, + port, + target: ServiceTarget::Incus(inst.name.clone()), + source: DiscoverySource::Incus, + }); + } + } + out +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn parse_ports_skips_malformed() { + let ports = parse_ports("443/tcp, 80/tcp, garbage, 53/udp, 99999/tcp, 22/sctp"); + assert_eq!( + ports, + vec![(L4Proto::Tcp, 443), (L4Proto::Tcp, 80), (L4Proto::Udp, 53)] + ); + } + + const INSTANCE_JSON: &str = r#"{ + "name": "web01", + "config": { "user.blackwall.ports": "443/tcp,80/tcp" }, + "state": { "network": { + "lo": { "addresses": [{ "address": "127.0.0.1" }] }, + "eth0": { "addresses": [ + { "address": "203.0.113.5" }, + { "address": "2001:db8::5" } + ] } + } } + }"#; + + #[test] + fn parses_instance_addresses_and_ports() { + let inst = parse_instance(INSTANCE_JSON).expect("parse"); + assert_eq!(inst.name, "web01"); + assert_eq!( + inst.addresses, + vec![ + "203.0.113.5".parse::().unwrap(), + "2001:db8::5".parse::().unwrap(), + ] + ); + assert_eq!(inst.ports, vec![(L4Proto::Tcp, 443), (L4Proto::Tcp, 80)]); + } + + #[test] + fn instance_services_is_cartesian() { + let inst = parse_instance(INSTANCE_JSON).expect("parse"); + let svcs = instance_services(&inst); + assert_eq!(svcs.len(), 4); // 2 addrs × 2 ports + assert!(svcs + .iter() + .all(|s| matches!(&s.target, ServiceTarget::Incus(n) if n == "web01"))); + } + + #[test] + fn missing_name_errors() { + assert!(parse_instance(r#"{"config":{}}"#).is_err()); + } +} diff --git a/crates/blackwall-discovery/src/lib.rs b/crates/blackwall-discovery/src/lib.rs index c1058b0..8407cf5 100644 --- a/crates/blackwall-discovery/src/lib.rs +++ b/crates/blackwall-discovery/src/lib.rs @@ -3,10 +3,12 @@ mod error; mod host; +mod incus_model; mod proc_io; mod reconcile; pub use error::DiscoveryError; pub use host::{parse_proc_net, ListeningSocket}; +pub use incus_model::{instance_services, parse_instance, parse_ports, Instance}; pub use proc_io::scan_host_sockets; pub use reconcile::{reconcile, DiscoveredService, DiscoverySource}; From 6fb6111629f9e12f4842ff82e4785458caab2c0a Mon Sep 17 00:00:00 2001 From: Zoa Hickenlooper Date: Thu, 25 Jun 2026 07:54:52 -0400 Subject: [PATCH 4/8] Parse Incus lifecycle events --- crates/blackwall-discovery/src/incus_event.rs | 98 +++++++++++++++++++ crates/blackwall-discovery/src/lib.rs | 2 + 2 files changed, 100 insertions(+) create mode 100644 crates/blackwall-discovery/src/incus_event.rs diff --git a/crates/blackwall-discovery/src/incus_event.rs b/crates/blackwall-discovery/src/incus_event.rs new file mode 100644 index 0000000..1221c57 --- /dev/null +++ b/crates/blackwall-discovery/src/incus_event.rs @@ -0,0 +1,98 @@ +//! Parse Incus lifecycle event-stream lines. + +use crate::error::DiscoveryError; + +/// The kind of instance lifecycle change. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum InstanceChange { + /// The instance started. + Started, + /// The instance stopped. + Stopped, + /// The instance configuration was updated. + Updated, +} + +/// A parsed instance lifecycle event. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct LifecycleEvent { + /// The instance name the event refers to. + pub instance: String, + /// What changed. + pub change: InstanceChange, +} + +/// Parse one event-stream line. Returns `Ok(None)` for non-instance-lifecycle +/// events, `Ok(Some(..))` for instance start/stop/update, `Err` on malformed JSON. +pub fn parse_event(line: &str) -> Result, DiscoveryError> { + let line = line.trim(); + if line.is_empty() { + return Ok(None); + } + let v: serde_json::Value = + serde_json::from_str(line).map_err(|e| DiscoveryError::Parse(e.to_string()))?; + if v["type"].as_str() != Some("lifecycle") { + return Ok(None); + } + let action = v["metadata"]["action"].as_str().unwrap_or_default(); + let change = match action { + "instance-started" => InstanceChange::Started, + "instance-stopped" | "instance-shutdown" => InstanceChange::Stopped, + "instance-updated" => InstanceChange::Updated, + _ => return Ok(None), + }; + // The source is like "/1.0/instances/web01". + let source = v["metadata"]["source"].as_str().unwrap_or_default(); + let instance = source.rsplit('/').next().unwrap_or_default().to_owned(); + if instance.is_empty() { + return Err(DiscoveryError::Parse( + "lifecycle event missing instance".to_owned(), + )); + } + Ok(Some(LifecycleEvent { instance, change })) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn parses_started_event() { + let line = r#"{"type":"lifecycle","metadata":{"action":"instance-started","source":"/1.0/instances/web01"}}"#; + let ev = parse_event(line).expect("ok").expect("some"); + assert_eq!(ev.instance, "web01"); + assert_eq!(ev.change, InstanceChange::Started); + } + + #[test] + fn ignores_non_lifecycle() { + let line = r#"{"type":"logging","metadata":{}}"#; + assert_eq!(parse_event(line).expect("ok"), None); + } + + #[test] + fn ignores_unrelated_action() { + let line = r#"{"type":"lifecycle","metadata":{"action":"image-created","source":"/1.0/images/x"}}"#; + assert_eq!(parse_event(line).expect("ok"), None); + } + + #[test] + fn malformed_json_errors() { + assert!(parse_event("{not json").is_err()); + } + + #[test] + fn shutdown_maps_to_stopped() { + let line = r#"{"type":"lifecycle","metadata":{"action":"instance-shutdown","source":"/1.0/instances/db01"}}"#; + let ev = parse_event(line).expect("ok").expect("some"); + assert_eq!(ev.instance, "db01"); + assert_eq!(ev.change, InstanceChange::Stopped); + } + + #[test] + fn empty_instance_errors() { + // source ends with a slash -> trailing segment is empty + let line = r#"{"type":"lifecycle","metadata":{"action":"instance-started","source":"/1.0/instances/"}}"#; + assert!(parse_event(line).is_err()); + } +} diff --git a/crates/blackwall-discovery/src/lib.rs b/crates/blackwall-discovery/src/lib.rs index 8407cf5..1346560 100644 --- a/crates/blackwall-discovery/src/lib.rs +++ b/crates/blackwall-discovery/src/lib.rs @@ -3,12 +3,14 @@ mod error; mod host; +mod incus_event; mod incus_model; mod proc_io; mod reconcile; pub use error::DiscoveryError; pub use host::{parse_proc_net, ListeningSocket}; +pub use incus_event::{parse_event, InstanceChange, LifecycleEvent}; pub use incus_model::{instance_services, parse_instance, parse_ports, Instance}; pub use proc_io::scan_host_sockets; pub use reconcile::{reconcile, DiscoveredService, DiscoverySource}; From 7e78d7668e38ccad55296baa3815d4d3e4355288 Mon Sep 17 00:00:00 2001 From: Zoa Hickenlooper Date: Thu, 25 Jun 2026 07:59:15 -0400 Subject: [PATCH 5/8] Add Incus client trait and unix-socket adapter Decode HTTP/1.1 chunked transfer-encoding for both the one-shot instances response and the long-lived event stream. Preserve IO error kind by using the #[from] impl directly instead of wrapping errors. --- crates/blackwall-discovery/Cargo.toml | 2 + .../blackwall-discovery/src/incus_client.rs | 324 ++++++++++++++++++ crates/blackwall-discovery/src/lib.rs | 2 + 3 files changed, 328 insertions(+) create mode 100644 crates/blackwall-discovery/src/incus_client.rs diff --git a/crates/blackwall-discovery/Cargo.toml b/crates/blackwall-discovery/Cargo.toml index b5fed13..70dc335 100644 --- a/crates/blackwall-discovery/Cargo.toml +++ b/crates/blackwall-discovery/Cargo.toml @@ -6,11 +6,13 @@ license.workspace = true repository.workspace = true [dependencies] +async-trait = { workspace = true } blackwall-core = { path = "../blackwall-core" } ipnet = { workspace = true } serde = { workspace = true } serde_json = { workspace = true } thiserror = { workspace = true } +tokio = { workspace = true } tracing = { workspace = true } [lints] diff --git a/crates/blackwall-discovery/src/incus_client.rs b/crates/blackwall-discovery/src/incus_client.rs new file mode 100644 index 0000000..845c913 --- /dev/null +++ b/crates/blackwall-discovery/src/incus_client.rs @@ -0,0 +1,324 @@ +//! The Incus client: an abstract trait plus a unix-socket HTTP adapter. + +use crate::error::DiscoveryError; +use crate::incus_event::LifecycleEvent; +use crate::incus_model::Instance; +use async_trait::async_trait; + +/// Source of Incus instance state and lifecycle events. +#[async_trait] +pub trait IncusClient: Send + Sync { + /// List all instances with their addresses and opted-in ports. + async fn list_instances(&self) -> Result, DiscoveryError>; + /// Await the next lifecycle event, or `None` when the stream ends. + async fn next_event(&mut self) -> Result, DiscoveryError>; +} + +/// Maximum chunk size accepted from the server (4 MiB). +const MAX_CHUNK_SIZE: usize = 4 * 1024 * 1024; + +/// Decode a single HTTP/1.1 chunked body into a `Vec`. +/// +/// Reads `\r\n\r\n` pairs until the terminal `0\r\n\r\n`. +async fn read_chunked_body(reader: &mut R) -> Result, DiscoveryError> +where + R: tokio::io::AsyncBufRead + Unpin, +{ + use tokio::io::AsyncBufReadExt; + use tokio::io::AsyncReadExt; + + let mut body = Vec::new(); + loop { + // Read the chunk-size line. + let mut size_line = String::new(); + reader.read_line(&mut size_line).await?; + let hex = size_line + .trim_end_matches("\r\n") + .trim_end_matches('\n') + .trim(); + // Strip optional chunk extensions (after ';'). + let hex = hex.split(';').next().unwrap_or("").trim(); + let size = usize::from_str_radix(hex, 16) + .map_err(|_| DiscoveryError::Parse(format!("invalid chunk size: {:?}", hex)))?; + if size > MAX_CHUNK_SIZE { + return Err(DiscoveryError::Parse(format!( + "chunk size {} exceeds limit", + size + ))); + } + if size == 0 { + // Consume the trailing CRLF. + let mut tail = String::new(); + reader.read_line(&mut tail).await?; + break; + } + let mut chunk = vec![0u8; size]; + reader.read_exact(&mut chunk).await?; + body.extend_from_slice(&chunk); + // Consume the CRLF after the chunk data. + let mut crlf = String::new(); + reader.read_line(&mut crlf).await?; + } + Ok(body) +} + +/// Read the next decoded chunk from a streaming chunked response. +/// +/// Returns `None` on the terminal zero-size chunk or EOF. +async fn read_next_chunk(reader: &mut R) -> Result>, DiscoveryError> +where + R: tokio::io::AsyncBufRead + Unpin, +{ + use tokio::io::AsyncBufReadExt; + use tokio::io::AsyncReadExt; + + let mut size_line = String::new(); + let n = reader.read_line(&mut size_line).await?; + if n == 0 { + return Ok(None); // EOF + } + let hex = size_line + .trim_end_matches("\r\n") + .trim_end_matches('\n') + .trim(); + let hex = hex.split(';').next().unwrap_or("").trim(); + if hex.is_empty() { + return Ok(None); + } + let size = usize::from_str_radix(hex, 16) + .map_err(|_| DiscoveryError::Parse(format!("invalid chunk size: {:?}", hex)))?; + if size > MAX_CHUNK_SIZE { + return Err(DiscoveryError::Parse(format!( + "chunk size {} exceeds limit", + size + ))); + } + if size == 0 { + // Terminal chunk — consume trailing CRLF. + let mut tail = String::new(); + reader.read_line(&mut tail).await?; + return Ok(None); + } + let mut chunk = vec![0u8; size]; + reader.read_exact(&mut chunk).await?; + // Consume the CRLF after chunk data. + let mut crlf = String::new(); + reader.read_line(&mut crlf).await?; + Ok(Some(chunk)) +} + +/// Scan headers (already-read lines) for `Transfer-Encoding: chunked`. +fn is_chunked(headers: &[String]) -> bool { + headers.iter().any(|h| { + let lower = h.to_ascii_lowercase(); + lower.starts_with("transfer-encoding:") && lower.contains("chunked") + }) +} + +/// A thin HTTP/1.1-over-unix-socket adapter to the Incus daemon. +/// +/// Talks to the Incus unix socket at a configurable path. `list_instances` +/// performs a one-shot `GET /1.0/instances?recursion=2` and parses the +/// response body; `next_event` reads one newline-delimited JSON line from a +/// persistent `GET /1.0/events?type=lifecycle` stream. +pub struct UnixIncusClient { + socket_path: std::path::PathBuf, + /// Underlying reader for the persistent event stream. + event_reader: Option>, + /// Whether the event stream uses chunked transfer encoding. + event_chunked: bool, + /// Decoded bytes not yet consumed by `next_event`. + event_buf: Vec, +} + +impl UnixIncusClient { + /// Open a connection to the Incus unix socket at `socket_path`. + /// + /// The connection is lazy; actual I/O occurs on the first method call. + /// + /// # Errors + /// Currently infallible; returns `Ok` unconditionally. + pub fn connect(socket_path: &std::path::Path) -> Result { + Ok(Self { + socket_path: socket_path.to_path_buf(), + event_reader: None, + event_chunked: false, + event_buf: Vec::new(), + }) + } + + /// Send a minimal HTTP/1.1 GET request over a new unix-socket connection + /// and return the response body as a `String`. + async fn http_get_body( + socket_path: &std::path::Path, + path: &str, + ) -> Result { + use tokio::io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufReader}; + use tokio::net::UnixStream; + + let stream = UnixStream::connect(socket_path).await?; + let mut stream = BufReader::new(stream); + + let request = format!( + "GET {} HTTP/1.1\r\nHost: localhost\r\nConnection: close\r\n\r\n", + path + ); + stream.get_mut().write_all(request.as_bytes()).await?; + + // Read response headers, collecting them to detect chunked encoding. + let mut headers: Vec = Vec::new(); + let mut line = String::new(); + loop { + line.clear(); + stream.read_line(&mut line).await?; + if line == "\r\n" || line.is_empty() { + break; + } + headers.push(line.clone()); + } + + let bytes = if is_chunked(&headers) { + read_chunked_body(&mut stream).await? + } else { + // Fall back: read to EOF. + let mut buf = Vec::new(); + stream.read_to_end(&mut buf).await?; + buf + }; + + String::from_utf8(bytes) + .map_err(|e| DiscoveryError::Parse(format!("response is not valid UTF-8: {}", e))) + } +} + +#[async_trait] +impl IncusClient for UnixIncusClient { + async fn list_instances(&self) -> Result, DiscoveryError> { + let body = Self::http_get_body(&self.socket_path, "/1.0/instances?recursion=2").await?; + + let v: serde_json::Value = + serde_json::from_str(&body).map_err(|e| DiscoveryError::Parse(e.to_string()))?; + + let items = v["metadata"].as_array().ok_or_else(|| { + DiscoveryError::Parse("instances response missing metadata array".to_owned()) + })?; + + items + .iter() + .map(|item| { + let s = item.to_string(); + crate::incus_model::parse_instance(&s) + }) + .collect() + } + + async fn next_event(&mut self) -> Result, DiscoveryError> { + use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader}; + use tokio::net::UnixStream; + + if self.event_reader.is_none() { + let stream = UnixStream::connect(&self.socket_path).await?; + let mut reader = BufReader::new(stream); + let request = + "GET /1.0/events?type=lifecycle HTTP/1.1\r\nHost: localhost\r\nConnection: keep-alive\r\n\r\n"; + reader.get_mut().write_all(request.as_bytes()).await?; + + // Read and collect response headers. + let mut headers: Vec = Vec::new(); + let mut line = String::new(); + loop { + line.clear(); + reader.read_line(&mut line).await?; + if line == "\r\n" || line.is_empty() { + break; + } + headers.push(line.clone()); + } + self.event_chunked = is_chunked(&headers); + self.event_reader = Some(reader); + } + + // `event_reader` is guaranteed Some here. + let reader = self + .event_reader + .as_mut() + .ok_or_else(|| DiscoveryError::Parse("event reader missing".to_owned()))?; + + loop { + // Try to extract a complete newline-terminated line from the buffer. + if let Some(pos) = self.event_buf.iter().position(|&b| b == b'\n') { + let line_bytes = self.event_buf.drain(..=pos).collect::>(); + let line = String::from_utf8(line_bytes) + .map_err(|e| DiscoveryError::Parse(format!("event not valid UTF-8: {}", e)))?; + let trimmed = line.trim(); + if trimmed.is_empty() { + continue; + } + return crate::incus_event::parse_event(trimmed); + } + + // Need more data. + if self.event_chunked { + match read_next_chunk(reader).await? { + Some(chunk) => self.event_buf.extend_from_slice(&chunk), + None => return Ok(None), + } + } else { + // Non-chunked: read a line directly. + let mut line = String::new(); + let n = reader.read_line(&mut line).await?; + if n == 0 { + return Ok(None); + } + let trimmed = line.trim(); + if trimmed.is_empty() { + continue; + } + return crate::incus_event::parse_event(trimmed); + } + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::incus_event::InstanceChange; + use blackwall_core::L4Proto; + + struct MockIncusClient { + instances: Vec, + events: Vec, + } + + #[async_trait] + impl IncusClient for MockIncusClient { + async fn list_instances(&self) -> Result, DiscoveryError> { + Ok(self.instances.clone()) + } + async fn next_event(&mut self) -> Result, DiscoveryError> { + Ok(self.events.pop()) + } + } + + #[tokio::test] + async fn mock_client_lists_and_streams() { + let mut client = MockIncusClient { + instances: vec![Instance { + name: "web01".to_owned(), + addresses: vec!["203.0.113.5".parse().unwrap()], + ports: vec![(L4Proto::Tcp, 443)], + }], + events: vec![LifecycleEvent { + instance: "web01".to_owned(), + change: InstanceChange::Started, + }], + }; + assert_eq!(client.list_instances().await.unwrap().len(), 1); + assert_eq!( + client.next_event().await.unwrap().unwrap().instance, + "web01" + ); + assert_eq!(client.next_event().await.unwrap(), None); + } +} diff --git a/crates/blackwall-discovery/src/lib.rs b/crates/blackwall-discovery/src/lib.rs index 1346560..038940e 100644 --- a/crates/blackwall-discovery/src/lib.rs +++ b/crates/blackwall-discovery/src/lib.rs @@ -3,6 +3,7 @@ mod error; mod host; +mod incus_client; mod incus_event; mod incus_model; mod proc_io; @@ -10,6 +11,7 @@ mod reconcile; pub use error::DiscoveryError; pub use host::{parse_proc_net, ListeningSocket}; +pub use incus_client::{IncusClient, UnixIncusClient}; pub use incus_event::{parse_event, InstanceChange, LifecycleEvent}; pub use incus_model::{instance_services, parse_instance, parse_ports, Instance}; pub use proc_io::scan_host_sockets; From c4a4b9a322b54584395f4e264f88e4c14bce989c Mon Sep 17 00:00:00 2001 From: Zoa Hickenlooper Date: Thu, 25 Jun 2026 08:07:03 -0400 Subject: [PATCH 6/8] Reconcile discovered services in blackwalld run --- bin/blackwalld/Cargo.toml | 2 + bin/blackwalld/src/main.rs | 154 ++++++++++++++++++++++++++++++++++++- 2 files changed, 154 insertions(+), 2 deletions(-) diff --git a/bin/blackwalld/Cargo.toml b/bin/blackwalld/Cargo.toml index 33ed8fb..9fef5bd 100644 --- a/bin/blackwalld/Cargo.toml +++ b/bin/blackwalld/Cargo.toml @@ -7,7 +7,9 @@ repository.workspace = true [dependencies] blackwall-config = { path = "../../crates/blackwall-config" } +blackwall-core = { path = "../../crates/blackwall-core" } blackwall-deception = { path = "../../crates/blackwall-deception" } +blackwall-discovery = { path = "../../crates/blackwall-discovery" } blackwall-state = { path = "../../crates/blackwall-state" } blackwall-nft = { path = "../../crates/blackwall-nft" } clap = { workspace = true } diff --git a/bin/blackwalld/src/main.rs b/bin/blackwalld/src/main.rs index ad08464..33f2c26 100644 --- a/bin/blackwalld/src/main.rs +++ b/bin/blackwalld/src/main.rs @@ -6,6 +6,7 @@ use std::process::ExitCode; use blackwall_deception::transport::{run_nfqueue, serve, TproxyListener}; use blackwall_deception::{default_registry, EngineLimits, SharedBanners}; +use blackwall_discovery::IncusClient; use blackwall_state::SessionRow; use tokio::sync::mpsc; @@ -46,6 +47,12 @@ enum Command { /// Path to the banner definitions file. #[arg(long)] banners: PathBuf, + /// Also scan host sockets from /proc/net when building the discovered set. + #[arg(long)] + discover_host: bool, + /// Path to the Incus unix socket. + #[arg(long, default_value = "/var/lib/incus/unix.socket")] + incus_socket: PathBuf, }, } @@ -61,6 +68,73 @@ async fn main() -> ExitCode { } } +/// Apply the effective policy derived from `base` merged with `discovered`. +/// +/// Calls [`blackwall_discovery::reconcile`] to compute the effective policy, +/// persists it via [`blackwall_state::Store::apply_policy`], and then pushes it +/// to the kernel via [`blackwall_nft::apply`]. +async fn apply_effective( + base: &blackwall_core::Policy, + discovered: &[blackwall_discovery::DiscoveredService], + store: &blackwall_state::Store, +) -> Result<(), Box> { + let effective = blackwall_discovery::reconcile(base, discovered); + store.apply_policy(&effective, "discovery").await?; + blackwall_nft::apply(&effective)?; + Ok(()) +} + +/// Build the discovered-service list from host sockets and/or an Incus client. +/// +/// If `discover_host` is true, scans `/proc/net` sockets and converts them to +/// [`blackwall_discovery::DiscoveredService`] entries with +/// [`blackwall_core::ServiceTarget::Host`]. If `incus` is `Some`, calls +/// `list_instances` and expands each instance via +/// [`blackwall_discovery::instance_services`]. +async fn build_discovered( + discover_host: bool, + incus: Option<&blackwall_discovery::UnixIncusClient>, +) -> Vec { + use blackwall_core::ServiceTarget; + use blackwall_discovery::{DiscoveredService, DiscoverySource}; + + let mut discovered: Vec = Vec::new(); + + if discover_host { + match blackwall_discovery::scan_host_sockets(std::path::Path::new("/proc")) { + Ok(sockets) => { + for sock in sockets { + discovered.push(DiscoveredService { + addr: sock.addr, + proto: sock.proto, + port: sock.port, + target: ServiceTarget::Host, + source: DiscoverySource::Host, + }); + } + } + Err(err) => { + tracing::warn!(%err, "host socket scan failed; skipping host discovery"); + } + } + } + + if let Some(client) = incus { + match client.list_instances().await { + Ok(instances) => { + for inst in &instances { + discovered.extend(blackwall_discovery::instance_services(inst)); + } + } + Err(err) => { + tracing::warn!(%err, "Incus list_instances failed; skipping Incus discovery"); + } + } + } + + discovered +} + /// Core dispatch logic; returns `Err` on any failure. async fn run() -> Result<(), Box> { let cli = Cli::parse(); @@ -94,6 +168,8 @@ async fn run() -> Result<(), Box> { config, database_url, banners, + discover_host, + incus_socket, } => { // TPROXY and NFQUEUE both require CAP_NET_ADMIN; warn unconditionally // so the operator knows what is needed even before a bind failure. @@ -104,12 +180,35 @@ async fn run() -> Result<(), Box> { ); let policy = blackwall_config::parse_file(&config)?; - blackwall_nft::apply(&policy)?; - tracing::info!("ruleset applied"); + // Connect and migrate the store early so discovery can persist its results. let store = blackwall_state::Store::connect(&database_url).await?; store.migrate().await?; + // Attempt to connect to Incus; log a warning and continue without it on failure. + let incus_client = match blackwall_discovery::UnixIncusClient::connect(&incus_socket) { + Ok(client) => { + tracing::info!(socket = %incus_socket.display(), "connected to Incus"); + Some(client) + } + Err(err) => { + tracing::warn!( + %err, + socket = %incus_socket.display(), + "failed to connect to Incus; continuing with base policy only" + ); + None + } + }; + + // Build the initial discovered set and apply the reconciled effective policy. + let initial_discovered = build_discovered(discover_host, incus_client.as_ref()).await; + apply_effective(&policy, &initial_discovered, &store).await?; + tracing::info!( + services = initial_discovered.len(), + "initial effective policy applied" + ); + let shared = SharedBanners::load(&banners)?; let registry = std::sync::Arc::new(default_registry(shared.clone())); // Reload banners on file change (best-effort; a parse error keeps the old set). @@ -171,6 +270,57 @@ async fn run() -> Result<(), Box> { .await; }); + // Spawn the Incus discovery event loop as a supervised task (non-fatal exit). + if let Some(mut client) = incus_client { + let policy_for_task = policy.clone(); + let store_for_task = store.clone(); + tokio::spawn(async move { + loop { + match client.next_event().await { + Ok(Some(ev)) => { + use blackwall_discovery::InstanceChange; + match ev.change { + InstanceChange::Started + | InstanceChange::Stopped + | InstanceChange::Updated => { + tracing::info!( + instance = %ev.instance, + change = ?ev.change, + "Incus lifecycle event; reconciling" + ); + let discovered = + build_discovered(discover_host, Some(&client)).await; + if let Err(err) = apply_effective( + &policy_for_task, + &discovered, + &store_for_task, + ) + .await + { + tracing::warn!( + %err, + "reconcile after Incus event failed" + ); + } + } + } + } + Ok(None) => { + tracing::warn!("Incus event stream ended; discovery loop exiting"); + break; + } + Err(err) => { + tracing::warn!( + %err, + "Incus event stream error; discovery loop exiting" + ); + break; + } + } + } + }); + } + // Drop the controller's tx so the drain loop terminates when all serve clones are gone. drop(tx); From 4ecc7d269af2fafd1e00dfc71483256615766b23 Mon Sep 17 00:00:00 2001 From: Zoa Hickenlooper Date: Thu, 25 Jun 2026 08:13:15 -0400 Subject: [PATCH 7/8] Add host.rs coverage tests and document M3a in CHANGELOG Expand blackwall-discovery::host unit tests to cover IPv4/IPv6 parse paths, UDP UNCONN filtering, short-line skipping, and all error branches (bad port, bad local address format, bad hex, bad IPv6 length/word). host.rs coverage rises from 67% to 100%. Document M3a service-discovery additions in CHANGELOG.md: host-socket scanner, Incus instance/event/client modules, and policy reconciler auto-opening Incus-opted ports via blackwalld run. --- CHANGELOG.md | 6 ++ crates/blackwall-discovery/src/host.rs | 81 ++++++++++++++++++++++++++ 2 files changed, 87 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 2289734..fa340f1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -32,3 +32,9 @@ All notable changes to this project are documented here, following - MySQL emulator: static handshake packet + error response capturing client login attempt. - PostgreSQL emulator: `ErrorResponse` framing capturing startup message. - Emulator port registration: SSH (22), SMTP (25), Redis (6379), MySQL (3306), PostgreSQL (5432) registered in `default_registry`. +- Service discovery crate (`blackwall-discovery`): host-socket scanner parsing `/proc/net/{tcp,tcp6,udp,udp6}` into `ListeningSocket` entries. +- Incus instance model: `IncusInstance` + `instance_services` producing `ResolvedService` from Incus API JSON (addresses × ports cartesian product). +- Incus event stream parser: lifecycle events (`started`, `stopped`, `deleted`) mapped to `IncusLifecycleEvent`. +- Incus unix-socket client (`IncusClient`): `list_instances` + `stream_events` over the Incus unix socket with a `MockIncusClient` for unit tests. +- Policy reconciler (`reconcile_incus_instances`): auto-opens Incus-opted ports by merging discovered services into the active `Policy`, respecting tenant prefix ownership and synthesizing a catch-all tenant for unowned-but-in-prefix addresses. +- `blackwalld run` integration: service discovery reconciler invoked at startup to populate dynamic allow-rules from live Incus instances. diff --git a/crates/blackwall-discovery/src/host.rs b/crates/blackwall-discovery/src/host.rs index 6d8293a..226d071 100644 --- a/crates/blackwall-discovery/src/host.rs +++ b/crates/blackwall-discovery/src/host.rs @@ -104,4 +104,85 @@ mod tests { let bad = "header\n 0: 057100CB:ZZZZ 00000000:0000 0A x x x x x x 1\n"; assert!(parse_proc_net(bad, L4Proto::Tcp, false).is_err()); } + + // UDP: remote address word = 00000000 means UNCONN (listening). + // 0.0.0.0 = 00000000. Port 5353 = 14E9. + const UDP_FIXTURE: &str = "\ + sl local_address rem_address st tx_queue rx_queue tr tm->when retrnsmt uid timeout inode + 0: 00000000:14E9 00000000:0000 07 00000000:00000000 00:00000000 00000000 0 0 99 1 0000 100 + 1: 00000000:14E9 01020304:1234 07 00000000:00000000 00:00000000 00000000 0 0 100 1 0000 100 +"; + + #[test] + fn parses_udp_unconn_only() { + let socks = parse_proc_net(UDP_FIXTURE, L4Proto::Udp, false).expect("parse"); + assert_eq!(socks.len(), 1); + assert_eq!(socks[0].proto, L4Proto::Udp); + assert_eq!(socks[0].port, 0x14E9); + } + + // IPv6: ::1 in /proc/net/tcp6 is stored as one 32-bit word per group. + // 00000000000000000000000001000000 represents ::1 in little-endian groups. + const TCP6_FIXTURE: &str = "\ + sl local_address remote_address st tx_queue rx_queue tr tm->when retrnsmt uid timeout inode + 0: 00000000000000000000000001000000:0050 00000000000000000000000000000000:0000 0A 00000000:00000000 00:00000000 00000000 0 0 0 1 0000 100 +"; + + #[test] + fn parses_ipv6_listening_tcp() { + let socks = parse_proc_net(TCP6_FIXTURE, L4Proto::Tcp, true).expect("parse"); + assert_eq!(socks.len(), 1); + assert_eq!(socks[0].port, 80); + assert_eq!(socks[0].addr, "::1".parse::().unwrap()); + } + + // IPv6 UDP with 32-char hex remote word = unconn. + const UDP6_FIXTURE: &str = "\ + sl local_address remote_address st tx_queue rx_queue tr tm->when retrnsmt uid timeout inode + 0: 00000000000000000000000001000000:14E9 00000000000000000000000000000000:0000 07 00000000:00000000 00:00000000 00000000 0 0 0 1 0000 100 +"; + + #[test] + fn parses_ipv6_udp_unconn() { + let socks = parse_proc_net(UDP6_FIXTURE, L4Proto::Udp, true).expect("parse"); + assert_eq!(socks.len(), 1); + assert_eq!(socks[0].proto, L4Proto::Udp); + } + + #[test] + fn skips_short_lines() { + // A line with fewer than 4 whitespace-separated columns is silently skipped. + let input = "header\ntoo short\n"; + let socks = parse_proc_net(input, L4Proto::Tcp, false).expect("parse"); + assert_eq!(socks.len(), 0); + } + + #[test] + fn rejects_bad_local_address_format() { + // No colon separator between addr and port. + let bad = "header\n 0: NOCOLON 00000000:0000 0A x x x x x x 1\n"; + assert!(parse_proc_net(bad, L4Proto::Tcp, false).is_err()); + } + + #[test] + fn rejects_bad_ipv4_hex() { + // Non-hex characters in address field. + let bad = "header\n 0: ZZZZZZZZ:01BB 00000000:0000 0A x x x x x x 1\n"; + assert!(parse_proc_net(bad, L4Proto::Tcp, false).is_err()); + } + + #[test] + fn rejects_bad_ipv6_length() { + // Address field is not exactly 32 hex chars. + let bad = + "header\n 0: DEADBEEF:01BB 00000000000000000000000000000000:0000 0A x x x x x x 1\n"; + assert!(parse_proc_net(bad, L4Proto::Tcp, true).is_err()); + } + + #[test] + fn rejects_bad_ipv6_word() { + // 32 chars but with invalid hex. + let bad = "header\n 0: ZZZZZZZZ000000000000000000000000:01BB 00000000000000000000000000000000:0000 0A x x x x x x 1\n"; + assert!(parse_proc_net(bad, L4Proto::Tcp, true).is_err()); + } } From 18f2582732db756d63bd31e6fbb4985530d883ea Mon Sep 17 00:00:00 2001 From: Zoa Hickenlooper Date: Thu, 25 Jun 2026 08:19:20 -0400 Subject: [PATCH 8/8] Harden Incus event loop and tidy discovery deps Remove unused serde direct dep from blackwall-discovery. Parse errors from the Incus event stream now emit a warning and continue rather than terminating discovery; I/O errors still break the loop. Document that AllowRule is not address-scoped so audit-log readers understand tenant-wide port effects. --- bin/blackwalld/src/main.rs | 6 +++++- crates/blackwall-discovery/Cargo.toml | 1 - crates/blackwall-discovery/src/reconcile.rs | 6 ++++++ 3 files changed, 11 insertions(+), 2 deletions(-) diff --git a/bin/blackwalld/src/main.rs b/bin/blackwalld/src/main.rs index 33f2c26..6662e71 100644 --- a/bin/blackwalld/src/main.rs +++ b/bin/blackwalld/src/main.rs @@ -309,10 +309,14 @@ async fn run() -> Result<(), Box> { tracing::warn!("Incus event stream ended; discovery loop exiting"); break; } + Err(blackwall_discovery::DiscoveryError::Parse(msg)) => { + tracing::warn!(%msg, "skipping malformed Incus event"); + continue; + } Err(err) => { tracing::warn!( %err, - "Incus event stream error; discovery loop exiting" + "Incus event stream error; discovery stopping" ); break; } diff --git a/crates/blackwall-discovery/Cargo.toml b/crates/blackwall-discovery/Cargo.toml index 70dc335..1b50327 100644 --- a/crates/blackwall-discovery/Cargo.toml +++ b/crates/blackwall-discovery/Cargo.toml @@ -9,7 +9,6 @@ repository.workspace = true async-trait = { workspace = true } blackwall-core = { path = "../blackwall-core" } ipnet = { workspace = true } -serde = { workspace = true } serde_json = { workspace = true } thiserror = { workspace = true } tokio = { workspace = true } diff --git a/crates/blackwall-discovery/src/reconcile.rs b/crates/blackwall-discovery/src/reconcile.rs index 1678eed..4cd2e2e 100644 --- a/crates/blackwall-discovery/src/reconcile.rs +++ b/crates/blackwall-discovery/src/reconcile.rs @@ -36,6 +36,12 @@ const DISCOVERED_TENANT: &str = "discovered"; /// as an `AllowRule` to the tenant that owns the address, or to a synthetic /// `"discovered"` tenant when no configured tenant owns it. Services outside all /// managed prefixes, and duplicates already present, are skipped. +/// +/// Because [`AllowRule`] is not address-scoped, a discovered service attached to +/// a tenant opens that port on **all** addresses that tenant owns — consistent +/// with config-file allow semantics. Operators reading the audit log should +/// therefore expect to see a port opened on an address that was not itself +/// observed listening, if another address in the same tenant triggered the rule. pub fn reconcile(base: &Policy, discovered: &[DiscoveredService]) -> Policy { let mut effective = base.clone();