Skip to content

Commit

Permalink
feat: new API to set multicast loop for ServiceDaemon (#281)
Browse files Browse the repository at this point in the history
Enable or disable multicast loop for IPv4 or IPv6.
  • Loading branch information
keepsimple1 authored Dec 11, 2024
1 parent fcd31f3 commit 1ddae63
Show file tree
Hide file tree
Showing 2 changed files with 252 additions and 5 deletions.
90 changes: 85 additions & 5 deletions src/service_daemon.rs
Original file line number Diff line number Diff line change
Expand Up @@ -401,6 +401,44 @@ impl ServiceDaemon {
)))
}

/// Enable or disable the loopback for locally sent multicast packets in IPv4.
///
/// By default, multicast loop is enabled for IPv4. When disabled, a querier will not
/// receive announcements from a responder on the same host.
///
/// Reference: https://learn.microsoft.com/en-us/windows/win32/winsock/ip-multicast-2
///
/// "The Winsock version of the IP_MULTICAST_LOOP option is semantically different than
/// the UNIX version of the IP_MULTICAST_LOOP option:
///
/// In Winsock, the IP_MULTICAST_LOOP option applies only to the receive path.
/// In the UNIX version, the IP_MULTICAST_LOOP option applies to the send path."
///
/// Which means, in order NOT to receive localhost announcements, you want to call
/// this API on the querier side on Windows, but on the responder side on Unix.
pub fn set_multicast_loop_v4(&self, on: bool) -> Result<()> {
self.send_cmd(Command::SetOption(DaemonOption::MulticastLoopV4(on)))
}

/// Enable or disable the loopback for locally sent multicast packets in IPv6.
///
/// By default, multicast loop is enabled for IPv6. When disabled, a querier will not
/// receive announcements from a responder on the same host.
///
/// Reference: https://learn.microsoft.com/en-us/windows/win32/winsock/ip-multicast-2
///
/// "The Winsock version of the IP_MULTICAST_LOOP option is semantically different than
/// the UNIX version of the IP_MULTICAST_LOOP option:
///
/// In Winsock, the IP_MULTICAST_LOOP option applies only to the receive path.
/// In the UNIX version, the IP_MULTICAST_LOOP option applies to the send path."
///
/// Which means, in order NOT to receive localhost announcements, you want to call
/// this API on the querier side on Windows, but on the responder side on Unix.
pub fn set_multicast_loop_v6(&self, on: bool) -> Result<()> {
self.send_cmd(Command::SetOption(DaemonOption::MulticastLoopV6(on)))
}

/// Proactively confirms whether a service instance still valid.
///
/// This call will issue queries for a service instance's SRV record and Address records.
Expand Down Expand Up @@ -637,7 +675,7 @@ impl ServiceDaemon {
}

/// Creates a new UDP socket that uses `intf` to send and recv multicast.
fn new_socket_bind(intf: &Interface) -> Result<MioUdpSocket> {
fn new_socket_bind(intf: &Interface, should_loop: bool) -> Result<MioUdpSocket> {
// Use the same socket for receiving and sending multicast packets.
// Such socket has to bind to INADDR_ANY or IN6ADDR_ANY.
let intf_ip = &intf.ip();
Expand All @@ -654,6 +692,11 @@ fn new_socket_bind(intf: &Interface) -> Result<MioUdpSocket> {
sock.set_multicast_if_v4(ip)
.map_err(|e| e_fmt!("set multicast_if on addr {}: {}", ip, e))?;

if !should_loop {
sock.set_multicast_loop_v4(false)
.map_err(|e| e_fmt!("failed to set multicast loop v4 for {ip}: {e}"))?;
}

// Test if we can send packets successfully.
let multicast_addr = SocketAddrV4::new(GROUP_ADDR_V4, MDNS_PORT).into();
let test_packets = DnsOutgoing::new(0).to_data_on_wire();
Expand Down Expand Up @@ -904,6 +947,10 @@ struct Zeroconf {

/// Service instances that are already resolved.
resolved: HashSet<String>,

multicast_loop_v4: bool,

multicast_loop_v6: bool,
}

impl Zeroconf {
Expand All @@ -918,7 +965,7 @@ impl Zeroconf {
let mut dns_registry_map = HashMap::new();

for intf in my_ifaddrs {
let sock = match new_socket_bind(&intf) {
let sock = match new_socket_bind(&intf, true) {
Ok(s) => s,
Err(e) => {
trace!("bind a socket to {}: {}. Skipped.", &intf.ip(), e);
Expand Down Expand Up @@ -959,6 +1006,8 @@ impl Zeroconf {
status,
pending_resolves: HashSet::new(),
resolved: HashSet::new(),
multicast_loop_v4: true,
multicast_loop_v6: true,
}
}

Expand All @@ -967,6 +1016,8 @@ impl Zeroconf {
DaemonOption::ServiceNameLenMax(length) => self.service_name_len_max = length,
DaemonOption::EnableInterface(if_kind) => self.enable_interface(if_kind),
DaemonOption::DisableInterface(if_kind) => self.disable_interface(if_kind),
DaemonOption::MulticastLoopV4(on) => self.set_multicast_loop_v4(on),
DaemonOption::MulticastLoopV6(on) => self.set_multicast_loop_v6(on),
}
}

Expand All @@ -992,6 +1043,22 @@ impl Zeroconf {
self.apply_intf_selections(my_ip_interfaces());
}

fn set_multicast_loop_v4(&mut self, on: bool) {
for (_, sock) in self.intf_socks.iter_mut() {
if let Err(e) = sock.set_multicast_loop_v4(on) {
debug!("failed to set multicast loop v4: {e}");
}
}
}

fn set_multicast_loop_v6(&mut self, on: bool) {
for (_, sock) in self.intf_socks.iter_mut() {
if let Err(e) = sock.set_multicast_loop_v6(on) {
debug!("failed to set multicast loop v6: {e}");
}
}
}

fn notify_monitors(&mut self, event: DaemonEvent) {
// Only retain the monitors that are still connected.
self.monitors.retain(|sender| {
Expand Down Expand Up @@ -1150,7 +1217,12 @@ impl Zeroconf {
fn add_new_interface(&mut self, intf: Interface) {
// Bind the new interface.
let new_ip = intf.ip();
let mut sock = match new_socket_bind(&intf) {
let should_loop = if new_ip.is_ipv4() {
self.multicast_loop_v4
} else {
self.multicast_loop_v6
};
let mut sock = match new_socket_bind(&intf, should_loop) {
Ok(s) => s,
Err(e) => {
debug!("bind a socket to {}: {}. Skipped.", &intf.ip(), e);
Expand Down Expand Up @@ -1599,13 +1671,19 @@ impl Zeroconf {
}

// Replace the closed socket with a new one.
match new_socket_bind(intf) {
let should_loop = if intf.ip().is_ipv4() {
self.multicast_loop_v4
} else {
self.multicast_loop_v6
};
match new_socket_bind(intf, should_loop) {
Ok(new_sock) => {
trace!("reset socket for IP {}", intf.ip());
self.intf_socks.insert(intf.clone(), new_sock);
}
Err(e) => debug!("re-bind a socket to {:?}: {}", intf, e),
}

return false;
}

Expand Down Expand Up @@ -2915,6 +2993,8 @@ enum DaemonOption {
ServiceNameLenMax(u8),
EnableInterface(Vec<IfKind>),
DisableInterface(Vec<IfKind>),
MulticastLoopV4(bool),
MulticastLoopV6(bool),
}

/// The length of Service Domain name supported in this lib.
Expand Down Expand Up @@ -3470,7 +3550,7 @@ mod tests {
packet_buffer.add_additional_answer(invalidate_ptr_packet);

for intf in intfs {
let sock = new_socket_bind(&intf).unwrap();
let sock = new_socket_bind(&intf, true).unwrap();
send_dns_outgoing(&packet_buffer, &intf, &sock);
}

Expand Down
167 changes: 167 additions & 0 deletions tests/mdns_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1854,6 +1854,173 @@ fn test_verify_srv() {
assert!(service_removal);
}

#[test]
fn test_multicast_loop_v4() {
let ty_domain = "_loop_v4._udp.local.";
let now = SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.unwrap();
let instance_name = now.as_micros().to_string(); // Create a unique name.
let host_name = "loop_v4_host.local.";
let port = 5200;

// Register the first service.
let server = ServiceDaemon::new().expect("failed to start server");
server.set_multicast_loop_v4(false).unwrap();

// Get a single IPv4 address
let ip_addr1 = my_ip_interfaces()
.iter()
.find(|iface| iface.ip().is_ipv4())
.map(|iface| iface.ip())
.unwrap();

// Publish the service on server
let service1 = ServiceInfo::new(ty_domain, &instance_name, host_name, &ip_addr1, port, None)
.expect("valid service info");
server
.register(service1)
.expect("Failed to register service1");

// wait for the service announced.
sleep(Duration::from_secs(1));

// start a client i.e. querier.
let mut resolved = false;
let client = ServiceDaemon::new().expect("failed to create mdns client");

// For Windows, IP_MULTICAST_LOOP option works only on the receive path.
client.set_multicast_loop_v4(false).unwrap();

let receiver = client.browse(ty_domain).unwrap();

let timeout = Duration::from_secs(2);
while let Ok(event) = receiver.recv_timeout(timeout) {
match event {
ServiceEvent::ServiceResolved(info) => {
println!(
"Resolved a service: {} host {} IP {:?}",
info.get_fullname(),
info.get_hostname(),
info.get_addresses_v4()
);
resolved = true;
break;
}
_ => {}
}
}

assert_eq!(resolved, false);

// enable loopback and try again.
server.set_multicast_loop_v4(true).unwrap();
client.set_multicast_loop_v4(true).unwrap();
let receiver = client.browse(ty_domain).unwrap();

while let Ok(event) = receiver.recv_timeout(timeout) {
match event {
ServiceEvent::ServiceResolved(info) => {
println!(
"Resolved a service: {} host {} IP {:?}",
info.get_fullname(),
info.get_hostname(),
info.get_addresses_v4()
);
resolved = true;
break;
}
_ => {}
}
}

assert!(resolved);
}

#[test]
fn test_multicast_loop_v6() {
let ty_domain = "_loop_v6._udp.local.";
let now = SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.unwrap();
let instance_name = now.as_micros().to_string(); // Create a unique name.
let host_name = "loop_v6_host.local.";
let port = 5200;

// Register the first service.
let server = ServiceDaemon::new().expect("failed to start server");
server.set_multicast_loop_v6(false).unwrap();

// Get a single IPv4 address
let ip_addr1 = my_ip_interfaces()
.iter()
.find(|iface| iface.ip().is_ipv6())
.map(|iface| iface.ip())
.unwrap();

// Publish the service on server
let service1 = ServiceInfo::new(ty_domain, &instance_name, host_name, &ip_addr1, port, None)
.expect("valid service info");
server
.register(service1)
.expect("Failed to register service1");

// wait for the service announced.
sleep(Duration::from_secs(1));

// start a client i.e. querier.
let mut resolved = false;
let client = ServiceDaemon::new().expect("failed to create mdns client");

// For Windows, IP_MULTICAST_LOOP option works only on the receive path.
client.set_multicast_loop_v6(false).unwrap();

let receiver = client.browse(ty_domain).unwrap();

let timeout = Duration::from_secs(2);
while let Ok(event) = receiver.recv_timeout(timeout) {
match event {
ServiceEvent::ServiceResolved(info) => {
println!(
"Resolved a service: {} host {} IP {:?}",
info.get_fullname(),
info.get_hostname(),
info.get_addresses()
);
resolved = true;
break;
}
_ => {}
}
}

assert_eq!(resolved, false);

// enable loopback and try again.
server.set_multicast_loop_v6(true).unwrap();
client.set_multicast_loop_v6(true).unwrap();

let receiver = client.browse(ty_domain).unwrap();

while let Ok(event) = receiver.recv_timeout(timeout) {
match event {
ServiceEvent::ServiceResolved(info) => {
println!(
"Resolved a service: {} host {} IP {:?}",
info.get_fullname(),
info.get_hostname(),
info.get_addresses()
);
resolved = true;
break;
}
_ => {}
}
}

assert!(resolved);
}

/// A helper function to include a timestamp for println.
fn timed_println(msg: String) {
let now = SystemTime::now();
Expand Down

0 comments on commit 1ddae63

Please sign in to comment.