diff --git a/dbus-ble-advertisements.py b/dbus-ble-advertisements.py index c733baa..3a529f5 100755 --- a/dbus-ble-advertisements.py +++ b/dbus-ble-advertisements.py @@ -43,7 +43,6 @@ import logging import signal import os -from typing import Dict, Set, Tuple, Optional sys.path.insert(1, os.path.join(os.path.dirname(__file__), 'ext', 'bleak')) from bleak import BleakScanner @@ -343,7 +342,7 @@ def __init__(self, bus): # - previous: bytes (last routed payload, only if route=True) # - timestamp: float (last route time, only if route=True) # - last_log_time: float (last time we logged routing for this device) - self.discovered_devices: Dict[str, dict] = {} + self.discovered_devices: dict[str, dict] = {} # Repeat interval in seconds (cached from slider for fast access) self._repeat_interval: int = DEFAULT_REPEAT_INTERVAL @@ -458,26 +457,25 @@ def __init__(self, bus): # Filters: manufacturer IDs, product IDs, and MAC addresses we care about # Key: mfg_id or MAC, Value: set of full registration paths - self.mfg_registrations: Dict[int, Set[str]] = {} # mfg_id -> {'/ble_advertisements/orion_tr/mfgr/737', ...} - self.mac_registrations: Dict[str, Set[str]] = {} # MAC -> {'/ble_advertisements/orion_tr/addr/EFC...', ...} + self.mfg_registrations: dict[int, set[str]] = {} # mfg_id -> {'/ble_advertisements/orion_tr/mfgr/737', ...} + self.mac_registrations: dict[str, set[str]] = {} # MAC -> {'/ble_advertisements/orion_tr/addr/EFC...', ...} # Product ID filters (more specific than manufacturer-only) # Key: (mfg_id, product_id), Value: set of full registration paths - self.pid_registrations: Dict[Tuple[int, int], Set[str]] = {} # (mfg_id, pid) -> {paths} + self.pid_registrations: dict[tuple[int, int], set[str]] = {} # (mfg_id, pid) -> {paths} # Key: (mfg_id, min_pid, max_pid), Value: set of full registration paths - self.pid_range_registrations: Dict[Tuple[int, int, int], Set[str]] = {} # (mfg_id, min, max) -> {paths} + self.pid_range_registrations: dict[tuple[int, int, int], set[str]] = {} # (mfg_id, min, max) -> {paths} # Signal emitters for each registered path # Key: full path (e.g., '/ble_advertisements/orion_tr/mfgr/737'), Value: AdvertisementEmitter - self.emitters: Dict[str, AdvertisementEmitter] = {} + self.emitters: dict[str, AdvertisementEmitter] = {} # Device name tracking # Key: MAC address, Value: device name (or empty string if unknown) - self.device_names: Dict[str, str] = {} + self.device_names: dict[str, str] = {} # Passive scanner state self._scanner_thread = None - self._scanner_loop = None self._scanner_stop = threading.Event() # Pending services for asynchronous registration scan @@ -1326,7 +1324,7 @@ def _parse_registrations(self, service_name: str, path: str, xml: str): except Exception as e: logging.debug(f"Error parsing XML for {service_name}{path}: {e}") - def _extract_product_id(self, data: bytes) -> Optional[int]: + def _extract_product_id(self, data: bytes) -> int | None: """Extract product ID from Victron BLE advertisement data. For Victron devices (mfg_id 0x02E1), the product ID is at bytes 2-3 (little-endian). @@ -1340,7 +1338,7 @@ def _extract_product_id(self, data: bytes) -> Optional[int]: pass return None - def _has_registration_for_advertisement(self, mac: str, mfg_id: int, product_id: Optional[int] = None) -> bool: + def _has_registration_for_advertisement(self, mac: str, mfg_id: int, product_id: int | None = None) -> bool: """Check if any service has registered interest in this advertisement. Returns True if: @@ -1463,15 +1461,18 @@ def _bootstrap_existing_devices(self): return False def _scanner_thread_main(self, adapters): - """Background thread running asyncio event loop for BLE scanning""" - self._scanner_loop = asyncio.new_event_loop() - asyncio.set_event_loop(self._scanner_loop) + """Background thread running asyncio event loop for BLE scanning. + + Uses asyncio.Runner to ensure the event loop is properly registered + for this thread. dbus-fast (used internally by Bleak) relies on + asyncio.get_event_loop() to schedule D-Bus signal callbacks; without + a properly set loop, passive scan callbacks silently fail to fire. + """ try: - self._scanner_loop.run_until_complete(self._run_scanners(adapters)) - except Exception as e: - logging.error(f"Scanner thread error: {e}") - finally: - self._scanner_loop.close() + with asyncio.Runner() as runner: + runner.run(self._run_scanners(adapters)) + except BaseException: + logging.exception("Scanner thread died") async def _run_scanners(self, adapters): """Run passive scanners on all adapters concurrently"""