diff --git a/lib/logitech_receiver/notifications.py b/lib/logitech_receiver/notifications.py index 6bde5b0bd..1ae276859 100644 --- a/lib/logitech_receiver/notifications.py +++ b/lib/logitech_receiver/notifications.py @@ -68,7 +68,7 @@ def process(device: Device | Receiver, notification: HIDPPNotification): return process_device_notification(device, notification) -def process_receiver_notification(receiver: Receiver, hidpp_notification: HIDPPNotification) -> bool | None: +def process_receiver_notification(receiver: Receiver, notification: HIDPPNotification) -> bool | None: """Process event messages from receivers.""" event_handler_mapping: dict[int, NotificationHandler] = { Notification.PAIRING_LOCK: handle_pairing_lock, @@ -80,12 +80,12 @@ def process_receiver_notification(receiver: Receiver, hidpp_notification: HIDPPN } try: - handler_func = event_handler_mapping[hidpp_notification.sub_id] - return handler_func(receiver, hidpp_notification) + handler_func = event_handler_mapping[notification.sub_id] + return handler_func(receiver, notification) except KeyError: pass - assert hidpp_notification.sub_id in [ + assert notification.sub_id in [ Notification.CONNECT_DISCONNECT, Notification.DJ_PAIRING, Notification.CONNECTED, @@ -93,10 +93,12 @@ def process_receiver_notification(receiver: Receiver, hidpp_notification: HIDPPN Notification.POWER, ] - logger.warning(f"{receiver}: unhandled notification {hidpp_notification}") + logger.warning(f"{receiver}: unhandled notification {notification}") -def process_device_notification(device, notification): +def process_device_notification(device: Device, notification: HIDPPNotification): + """Process event messages from devices.""" + # incoming packets with SubId >= 0x80 are supposedly replies from HID++ 1.0 requests, should never get here assert notification.sub_id & 0x80 == 0 @@ -129,16 +131,11 @@ def process_device_notification(device, notification): if not device.features: logger.warning("%s: feature notification but features not set up: %02X %s", device, notification.sub_id, notification) return False - try: - feature = device.features.get_feature(notification.sub_id) - except IndexError: - logger.warning("%s: notification from invalid feature index %02X: %s", device, notification.sub_id, notification) - return False - return _process_feature_notification(device, notification, feature) + return _process_feature_notification(device, notification) -def _process_dj_notification(device, notification): +def _process_dj_notification(device: Device, notification: HIDPPNotification): if logger.isEnabledFor(logging.DEBUG): logger.debug("%s (%s) DJ %s", device, device.protocol, notification) @@ -164,7 +161,7 @@ def _process_dj_notification(device, notification): logger.warning("%s: unrecognized DJ %s", device, notification) -def _process_hidpp10_custom_notification(device, notification): +def _process_hidpp10_custom_notification(device: Device, notification: HIDPPNotification): if logger.isEnabledFor(logging.DEBUG): logger.debug("%s (%s) custom notification %s", device, device.protocol, notification) @@ -177,7 +174,7 @@ def _process_hidpp10_custom_notification(device, notification): logger.warning("%s: unrecognized %s", device, notification) -def _process_hidpp10_notification(device, notification): +def _process_hidpp10_notification(device: Device, notification: HIDPPNotification): if notification.sub_id == Notification.CONNECT_DISCONNECT: # device unpairing if notification.address == 0x02: # device un-paired @@ -242,7 +239,13 @@ def _process_hidpp10_notification(device, notification): logger.warning("%s: unrecognized %s", device, notification) -def _process_feature_notification(device, notification, feature): +def _process_feature_notification(device: Device, notification: HIDPPNotification): + try: + feature = device.features.get_feature(notification.sub_id) + except IndexError: + logger.warning("%s: notification from invalid feature index %02X: %s", device, notification.sub_id, notification) + return False + if logger.isEnabledFor(logging.DEBUG): logger.debug( "%s: notification for feature %s, report %s, data %s", @@ -419,15 +422,15 @@ def _process_feature_notification(device, notification, feature): return True -def handle_pairing_lock(receiver: Receiver, hidpp_notification: HIDPPNotification) -> bool: - receiver.pairing.lock_open = bool(hidpp_notification.address & 0x01) +def handle_pairing_lock(receiver: Receiver, notification: HIDPPNotification) -> bool: + receiver.pairing.lock_open = bool(notification.address & 0x01) reason = _("pairing lock is open") if receiver.pairing.lock_open else _("pairing lock is closed") if logger.isEnabledFor(logging.INFO): logger.info("%s: %s", receiver, reason) receiver.pairing.error = None if receiver.pairing.lock_open: receiver.pairing.new_device = None - pair_error = ord(hidpp_notification.data[:1]) + pair_error = ord(notification.data[:1]) if pair_error: receiver.pairing.error = error_string = hidpp10_constants.PairingError(pair_error) receiver.pairing.new_device = None @@ -436,9 +439,9 @@ def handle_pairing_lock(receiver: Receiver, hidpp_notification: HIDPPNotificatio return True -def handle_discovery_status(receiver: Receiver, hidpp_notification: HIDPPNotification) -> bool: +def handle_discovery_status(receiver: Receiver, notification: HIDPPNotification) -> bool: with notification_lock: - receiver.pairing.discovering = hidpp_notification.address == 0x00 + receiver.pairing.discovering = notification.address == 0x00 reason = _("discovery lock is open") if receiver.pairing.discovering else _("discovery lock is closed") if logger.isEnabledFor(logging.INFO): logger.info("%s: %s", receiver, reason) @@ -447,7 +450,7 @@ def handle_discovery_status(receiver: Receiver, hidpp_notification: HIDPPNotific receiver.pairing.counter = receiver.pairing.device_address = None receiver.pairing.device_authentication = receiver.pairing.device_name = None receiver.pairing.device_passkey = None - discover_error = ord(hidpp_notification.data[:1]) + discover_error = ord(notification.data[:1]) if discover_error: receiver.pairing.error = discover_string = hidpp10_constants.BoltPairingError(discover_error) logger.warning("bolt discovering error %d: %s", discover_error, discover_string) @@ -455,27 +458,27 @@ def handle_discovery_status(receiver: Receiver, hidpp_notification: HIDPPNotific return True -def handle_device_discovery(receiver: Receiver, hidpp_notification: HIDPPNotification) -> bool: +def handle_device_discovery(receiver: Receiver, notification: HIDPPNotification) -> bool: with notification_lock: - counter = hidpp_notification.address + hidpp_notification.data[0] * 256 # notification counter + counter = notification.address + notification.data[0] * 256 # notification counter if receiver.pairing.counter is None: receiver.pairing.counter = counter else: if not receiver.pairing.counter == counter: return None - if hidpp_notification.data[1] == 0: - receiver.pairing.device_kind = hidpp_notification.data[3] - receiver.pairing.device_address = hidpp_notification.data[6:12] - receiver.pairing.device_authentication = hidpp_notification.data[14] - elif hidpp_notification.data[1] == 1: - receiver.pairing.device_name = hidpp_notification.data[3 : 3 + hidpp_notification.data[2]].decode("utf-8") + if notification.data[1] == 0: + receiver.pairing.device_kind = notification.data[3] + receiver.pairing.device_address = notification.data[6:12] + receiver.pairing.device_authentication = notification.data[14] + elif notification.data[1] == 1: + receiver.pairing.device_name = notification.data[3 : 3 + notification.data[2]].decode("utf-8") return True -def handle_pairing_status(receiver: Receiver, hidpp_notification: HIDPPNotification) -> bool: +def handle_pairing_status(receiver: Receiver, notification: HIDPPNotification) -> bool: with notification_lock: receiver.pairing.device_passkey = None - receiver.pairing.lock_open = hidpp_notification.address == 0x00 + receiver.pairing.lock_open = notification.address == 0x00 reason = _("pairing lock is open") if receiver.pairing.lock_open else _("pairing lock is closed") if logger.isEnabledFor(logging.INFO): logger.info("%s: %s", receiver, reason) @@ -485,11 +488,11 @@ def handle_pairing_status(receiver: Receiver, hidpp_notification: HIDPPNotificat receiver.pairing.device_address = None receiver.pairing.device_authentication = None receiver.pairing.device_name = None - pair_error = hidpp_notification.data[0] + pair_error = notification.data[0] if receiver.pairing.lock_open: receiver.pairing.new_device = None - elif hidpp_notification.address == 0x02 and not pair_error: - receiver.pairing.new_device = receiver.register_new_device(hidpp_notification.data[7]) + elif notification.address == 0x02 and not pair_error: + receiver.pairing.new_device = receiver.register_new_device(notification.data[7]) if pair_error: receiver.pairing.error = error_string = hidpp10_constants.BoltPairingError(pair_error) receiver.pairing.new_device = None @@ -498,9 +501,9 @@ def handle_pairing_status(receiver: Receiver, hidpp_notification: HIDPPNotificat return True -def handle_passkey_request(receiver: Receiver, hidpp_notification: HIDPPNotification) -> bool: +def handle_passkey_request(receiver: Receiver, notification: HIDPPNotification) -> bool: with notification_lock: - receiver.pairing.device_passkey = hidpp_notification.data[0:6].decode("utf-8") + receiver.pairing.device_passkey = notification.data[0:6].decode("utf-8") return True diff --git a/tests/logitech_receiver/test_notifications.py b/tests/logitech_receiver/test_notifications.py index 6a6802552..2cf662be7 100644 --- a/tests/logitech_receiver/test_notifications.py +++ b/tests/logitech_receiver/test_notifications.py @@ -274,7 +274,7 @@ def test_process_feature_notification(mocker, hidpp_notification, feature): fake_device = fake_hidpp.Device() fake_device.receiver = ["rec1", "rec2"] - result = notifications._process_feature_notification(fake_device, hidpp_notification, feature) + result = notifications._process_feature_notification(fake_device, hidpp_notification) assert result is True @@ -289,6 +289,23 @@ def test_process_receiver_notification_invalid(mocker): notifications.process_receiver_notification(mock_receiver, notification) +@pytest.mark.parametrize( + "sub_id, notification_data, expected", + [ + (Notification.NO_OPERATION, b"\x00", False), + ], +) +def test_process_device_notification_extended(mocker, sub_id, notification_data, expected): + device = mocker.Mock() + device.handle_notification.return_value = None + device.protocol = 2.0 + notification = HIDPPNotification(0, 0, sub_id, 0, notification_data) + + result = notifications.process_device_notification(device, notification) + + assert result == expected + + def test_handle_device_discovery(): receiver: Receiver = Receiver(MockLowLevelInterface(), None, {}, True, None, None) sub_id = Registers.DISCOVERY_STATUS_NOTIFICATION