Skip to content

Commit

Permalink
notification: Refactor receiver event handling
Browse files Browse the repository at this point in the history
Split processing of receiver notification into smaller functions.
Extract handler functions for every receiver notification for simple
maintenence and testability.

Related #2273
  • Loading branch information
MattHag authored and pfps committed Jan 2, 2025
1 parent 33c057f commit fa3a9bc
Showing 1 changed file with 109 additions and 95 deletions.
204 changes: 109 additions & 95 deletions lib/logitech_receiver/notifications.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,9 @@

if typing.TYPE_CHECKING:
from .base import HIDPPNotification
from .device import Device
from .receiver import Receiver


logger = logging.getLogger(__name__)

_hidpp10 = hidpp10.Hidpp10()
Expand All @@ -56,7 +56,8 @@
notification_lock = threading.Lock()


def process(device, notification: HIDPPNotification):
def process(device: Device | Receiver, notification: HIDPPNotification):
"""Handle incoming events (notification) from device or receiver."""
assert device
assert notification

Expand All @@ -65,106 +66,30 @@ def process(device, notification: HIDPPNotification):
return process_device_notification(device, notification)


def process_receiver_notification(receiver: Receiver, notification: HIDPPNotification) -> bool | None:
# supposedly only 0x4x notifications arrive for the receiver
assert notification.sub_id in [
def process_receiver_notification(receiver: Receiver, hidpp_notification: HIDPPNotification) -> bool | None:
"""Process event messages from receivers."""
if hidpp_notification.sub_id == Notification.PAIRING_LOCK:
return handle_pairing_lock(receiver, hidpp_notification)
elif hidpp_notification.sub_id == Registers.DISCOVERY_STATUS_NOTIFICATION: # Bolt pairing
return handle_discovery_status(receiver, hidpp_notification)
elif hidpp_notification.sub_id == Registers.DEVICE_DISCOVERY_NOTIFICATION: # Bolt pairing
return handle_device_discovery(receiver, hidpp_notification)
elif hidpp_notification.sub_id == Registers.PAIRING_STATUS_NOTIFICATION: # Bolt pairing
return handle_pairing_status(receiver, hidpp_notification)
elif hidpp_notification.sub_id == Registers.PASSKEY_REQUEST_NOTIFICATION: # Bolt pairing
return handle_passkey_request(receiver, hidpp_notification)
elif hidpp_notification.sub_id == Registers.PASSKEY_PRESSED_NOTIFICATION: # Bolt pairing
return handle_passkey_pressed(receiver, hidpp_notification)

assert hidpp_notification.sub_id in [
Notification.CONNECT_DISCONNECT,
Notification.DJ_PAIRING,
Notification.CONNECTED,
Notification.RAW_INPUT,
Notification.PAIRING_LOCK,
Notification.POWER,
Registers.DEVICE_DISCOVERY_NOTIFICATION,
Registers.DISCOVERY_STATUS_NOTIFICATION,
Registers.PAIRING_STATUS_NOTIFICATION,
Registers.PASSKEY_PRESSED_NOTIFICATION,
Registers.PASSKEY_REQUEST_NOTIFICATION,
]

if notification.sub_id == Notification.PAIRING_LOCK:
receiver.pairing.lock_open = bool(notification.address & 0x01)
reason = _("pairing lock is open") if receiver.pairing.lock_open else _("pairing lock is closed")
if logger.isEnabledFor(logging.INFO):
logger.info("%s: %s", receiver, reason)
receiver.pairing.error = None
if receiver.pairing.lock_open:
receiver.pairing.new_device = None
pair_error = ord(notification.data[:1])
if pair_error:
receiver.pairing.error = error_string = hidpp10_constants.PairingError(pair_error)
receiver.pairing.new_device = None
logger.warning("pairing error %d: %s", pair_error, error_string)
receiver.changed(reason=reason)
return True

elif notification.sub_id == Registers.DISCOVERY_STATUS_NOTIFICATION: # Bolt pairing
with notification_lock:
receiver.pairing.discovering = notification.address == 0x00
reason = _("discovery lock is open") if receiver.pairing.discovering else _("discovery lock is closed")
if logger.isEnabledFor(logging.INFO):
logger.info("%s: %s", receiver, reason)
receiver.pairing.error = None
if receiver.pairing.discovering:
receiver.pairing.counter = receiver.pairing.device_address = None
receiver.pairing.device_authentication = receiver.pairing.device_name = None
receiver.pairing.device_passkey = None
discover_error = ord(notification.data[:1])
if discover_error:
receiver.pairing.error = discover_string = hidpp10_constants.BoltPairingError(discover_error)
logger.warning("bolt discovering error %d: %s", discover_error, discover_string)
receiver.changed(reason=reason)
return True

elif notification.sub_id == Registers.DEVICE_DISCOVERY_NOTIFICATION: # Bolt pairing
with notification_lock:
counter = notification.address + notification.data[0] * 256 # notification counter
if receiver.pairing.counter is None:
receiver.pairing.counter = counter
else:
if not receiver.pairing.counter == counter:
return None
if notification.data[1] == 0:
receiver.pairing.device_kind = notification.data[3]
receiver.pairing.device_address = notification.data[6:12]
receiver.pairing.device_authentication = notification.data[14]
elif notification.data[1] == 1:
receiver.pairing.device_name = notification.data[3 : 3 + notification.data[2]].decode("utf-8")
return True

elif notification.sub_id == Registers.PAIRING_STATUS_NOTIFICATION: # Bolt pairing
with notification_lock:
receiver.pairing.device_passkey = None
receiver.pairing.lock_open = notification.address == 0x00
reason = _("pairing lock is open") if receiver.pairing.lock_open else _("pairing lock is closed")
if logger.isEnabledFor(logging.INFO):
logger.info("%s: %s", receiver, reason)
receiver.pairing.error = None
if not receiver.pairing.lock_open:
receiver.pairing.counter = None
receiver.pairing.device_address = None
receiver.pairing.device_authentication = None
receiver.pairing.device_name = None
pair_error = notification.data[0]
if receiver.pairing.lock_open:
receiver.pairing.new_device = None
elif notification.address == 0x02 and not pair_error:
receiver.pairing.new_device = receiver.register_new_device(notification.data[7])
if pair_error:
receiver.pairing.error = error_string = hidpp10_constants.BoltPairingError(pair_error)
receiver.pairing.new_device = None
logger.warning("pairing error %d: %s", pair_error, error_string)
receiver.changed(reason=reason)
return True

elif notification.sub_id == Registers.PASSKEY_REQUEST_NOTIFICATION: # Bolt pairing
with notification_lock:
receiver.pairing.device_passkey = notification.data[0:6].decode("utf-8")
return True

elif notification.sub_id == Registers.PASSKEY_PRESSED_NOTIFICATION: # Bolt pairing
return True

logger.warning("%s: unhandled notification %s", receiver, notification)
logger.warning(f"{receiver}: unhandled notification {hidpp_notification}")


def process_device_notification(device, notification):
Expand Down Expand Up @@ -488,3 +413,92 @@ def _process_feature_notification(device, notification, feature):

diversion.process_notification(device, notification, feature)
return True


def handle_pairing_lock(receiver: Receiver, hidpp_notification: HIDPPNotification) -> bool:
receiver.pairing.lock_open = bool(hidpp_notification.address & 0x01)
reason = _("pairing lock is open") if receiver.pairing.lock_open else _("pairing lock is closed")
if logger.isEnabledFor(logging.INFO):
logger.info("%s: %s", receiver, reason)
receiver.pairing.error = None
if receiver.pairing.lock_open:
receiver.pairing.new_device = None
pair_error = ord(hidpp_notification.data[:1])
if pair_error:
receiver.pairing.error = error_string = hidpp10_constants.PairingError(pair_error)
receiver.pairing.new_device = None
logger.warning("pairing error %d: %s", pair_error, error_string)
receiver.changed(reason=reason)
return True


def handle_discovery_status(receiver: Receiver, hidpp_notification: HIDPPNotification) -> bool:
with notification_lock:
receiver.pairing.discovering = hidpp_notification.address == 0x00
reason = _("discovery lock is open") if receiver.pairing.discovering else _("discovery lock is closed")
if logger.isEnabledFor(logging.INFO):
logger.info("%s: %s", receiver, reason)
receiver.pairing.error = None
if receiver.pairing.discovering:
receiver.pairing.counter = receiver.pairing.device_address = None
receiver.pairing.device_authentication = receiver.pairing.device_name = None
receiver.pairing.device_passkey = None
discover_error = ord(hidpp_notification.data[:1])
if discover_error:
receiver.pairing.error = discover_string = hidpp10_constants.BoltPairingError(discover_error)
logger.warning("bolt discovering error %d: %s", discover_error, discover_string)
receiver.changed(reason=reason)
return True


def handle_device_discovery(receiver: Receiver, hidpp_notification: HIDPPNotification) -> bool:
with notification_lock:
counter = hidpp_notification.address + hidpp_notification.data[0] * 256 # notification counter
if receiver.pairing.counter is None:
receiver.pairing.counter = counter
else:
if not receiver.pairing.counter == counter:
return None
if hidpp_notification.data[1] == 0:
receiver.pairing.device_kind = hidpp_notification.data[3]
receiver.pairing.device_address = hidpp_notification.data[6:12]
receiver.pairing.device_authentication = hidpp_notification.data[14]
elif hidpp_notification.data[1] == 1:
receiver.pairing.device_name = hidpp_notification.data[3 : 3 + hidpp_notification.data[2]].decode("utf-8")
return True


def handle_pairing_status(receiver: Receiver, hidpp_notification: HIDPPNotification) -> bool:
with notification_lock:
receiver.pairing.device_passkey = None
receiver.pairing.lock_open = hidpp_notification.address == 0x00
reason = _("pairing lock is open") if receiver.pairing.lock_open else _("pairing lock is closed")
if logger.isEnabledFor(logging.INFO):
logger.info("%s: %s", receiver, reason)
receiver.pairing.error = None
if not receiver.pairing.lock_open:
receiver.pairing.counter = None
receiver.pairing.device_address = None
receiver.pairing.device_authentication = None
receiver.pairing.device_name = None
pair_error = hidpp_notification.data[0]
if receiver.pairing.lock_open:
receiver.pairing.new_device = None
elif hidpp_notification.address == 0x02 and not pair_error:
receiver.pairing.new_device = receiver.register_new_device(hidpp_notification.data[7])
if pair_error:
receiver.pairing.error = error_string = hidpp10_constants.BoltPairingError(pair_error)
receiver.pairing.new_device = None
logger.warning("pairing error %d: %s", pair_error, error_string)
receiver.changed(reason=reason)
return True


def handle_passkey_request(receiver: Receiver, hidpp_notification: HIDPPNotification) -> bool:
with notification_lock:
receiver.pairing.device_passkey = hidpp_notification.data[0:6].decode("utf-8")
return True


def handle_passkey_pressed(_receiver: Receiver, _hidpp_notification: HIDPPNotification) -> bool:
return True

0 comments on commit fa3a9bc

Please sign in to comment.