Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 20 additions & 19 deletions dbus-ble-advertisements.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@
import logging
import signal
import os
from typing import Dict, Set, Tuple, Optional

sys.path.insert(1, os.path.join(os.path.dirname(__file__), 'ext', 'bleak'))
from bleak import BleakScanner
Expand Down Expand Up @@ -343,7 +342,7 @@ def __init__(self, bus):
# - previous: bytes (last routed payload, only if route=True)
# - timestamp: float (last route time, only if route=True)
# - last_log_time: float (last time we logged routing for this device)
self.discovered_devices: Dict[str, dict] = {}
self.discovered_devices: dict[str, dict] = {}

# Repeat interval in seconds (cached from slider for fast access)
self._repeat_interval: int = DEFAULT_REPEAT_INTERVAL
Expand Down Expand Up @@ -458,26 +457,25 @@ def __init__(self, bus):

# Filters: manufacturer IDs, product IDs, and MAC addresses we care about
# Key: mfg_id or MAC, Value: set of full registration paths
self.mfg_registrations: Dict[int, Set[str]] = {} # mfg_id -> {'/ble_advertisements/orion_tr/mfgr/737', ...}
self.mac_registrations: Dict[str, Set[str]] = {} # MAC -> {'/ble_advertisements/orion_tr/addr/EFC...', ...}
self.mfg_registrations: dict[int, set[str]] = {} # mfg_id -> {'/ble_advertisements/orion_tr/mfgr/737', ...}
self.mac_registrations: dict[str, set[str]] = {} # MAC -> {'/ble_advertisements/orion_tr/addr/EFC...', ...}

# Product ID filters (more specific than manufacturer-only)
# Key: (mfg_id, product_id), Value: set of full registration paths
self.pid_registrations: Dict[Tuple[int, int], Set[str]] = {} # (mfg_id, pid) -> {paths}
self.pid_registrations: dict[tuple[int, int], set[str]] = {} # (mfg_id, pid) -> {paths}
# Key: (mfg_id, min_pid, max_pid), Value: set of full registration paths
self.pid_range_registrations: Dict[Tuple[int, int, int], Set[str]] = {} # (mfg_id, min, max) -> {paths}
self.pid_range_registrations: dict[tuple[int, int, int], set[str]] = {} # (mfg_id, min, max) -> {paths}

# Signal emitters for each registered path
# Key: full path (e.g., '/ble_advertisements/orion_tr/mfgr/737'), Value: AdvertisementEmitter
self.emitters: Dict[str, AdvertisementEmitter] = {}
self.emitters: dict[str, AdvertisementEmitter] = {}

# Device name tracking
# Key: MAC address, Value: device name (or empty string if unknown)
self.device_names: Dict[str, str] = {}
self.device_names: dict[str, str] = {}

# Passive scanner state
self._scanner_thread = None
self._scanner_loop = None
self._scanner_stop = threading.Event()

# Pending services for asynchronous registration scan
Expand Down Expand Up @@ -1326,7 +1324,7 @@ def _parse_registrations(self, service_name: str, path: str, xml: str):
except Exception as e:
logging.debug(f"Error parsing XML for {service_name}{path}: {e}")

def _extract_product_id(self, data: bytes) -> Optional[int]:
def _extract_product_id(self, data: bytes) -> int | None:
"""Extract product ID from Victron BLE advertisement data.

For Victron devices (mfg_id 0x02E1), the product ID is at bytes 2-3 (little-endian).
Expand All @@ -1340,7 +1338,7 @@ def _extract_product_id(self, data: bytes) -> Optional[int]:
pass
return None

def _has_registration_for_advertisement(self, mac: str, mfg_id: int, product_id: Optional[int] = None) -> bool:
def _has_registration_for_advertisement(self, mac: str, mfg_id: int, product_id: int | None = None) -> bool:
"""Check if any service has registered interest in this advertisement.

Returns True if:
Expand Down Expand Up @@ -1463,15 +1461,18 @@ def _bootstrap_existing_devices(self):
return False

def _scanner_thread_main(self, adapters):
"""Background thread running asyncio event loop for BLE scanning"""
self._scanner_loop = asyncio.new_event_loop()
asyncio.set_event_loop(self._scanner_loop)
"""Background thread running asyncio event loop for BLE scanning.

Uses asyncio.Runner to ensure the event loop is properly registered
for this thread. dbus-fast (used internally by Bleak) relies on
asyncio.get_event_loop() to schedule D-Bus signal callbacks; without
a properly set loop, passive scan callbacks silently fail to fire.
"""
try:
self._scanner_loop.run_until_complete(self._run_scanners(adapters))
except Exception as e:
logging.error(f"Scanner thread error: {e}")
finally:
self._scanner_loop.close()
with asyncio.Runner() as runner:
runner.run(self._run_scanners(adapters))
except BaseException:
logging.exception("Scanner thread died")

async def _run_scanners(self, adapters):
"""Run passive scanners on all adapters concurrently"""
Expand Down