Skip to content

Commit

Permalink
Refactor notifications
Browse files Browse the repository at this point in the history
Add type hints and reasonable variable names.

Related #2711
  • Loading branch information
MattHag authored and pfps committed Jan 1, 2025
1 parent 64ac437 commit 810cda9
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 31 deletions.
58 changes: 29 additions & 29 deletions lib/logitech_receiver/notifications.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,18 +56,18 @@
notification_lock = threading.Lock()


def process(device, notification):
def process(device, notification: HIDPPNotification):
assert device
assert notification

if not device.isDevice:
return _process_receiver_notification(device, notification)
return _process_device_notification(device, notification)
return process_receiver_notification(device, notification)
return process_device_notification(device, notification)


def _process_receiver_notification(receiver: Receiver, hidpp_notification: HIDPPNotification) -> bool | None:
def process_receiver_notification(receiver: Receiver, notification: HIDPPNotification) -> bool | None:
# supposedly only 0x4x notifications arrive for the receiver
assert hidpp_notification.sub_id in [
assert notification.sub_id in [
Notification.CONNECT_DISCONNECT,
Notification.DJ_PAIRING,
Notification.CONNECTED,
Expand All @@ -81,25 +81,25 @@ def _process_receiver_notification(receiver: Receiver, hidpp_notification: HIDPP
Registers.PASSKEY_REQUEST_NOTIFICATION,
]

if hidpp_notification.sub_id == Notification.PAIRING_LOCK:
receiver.pairing.lock_open = bool(hidpp_notification.address & 0x01)
if notification.sub_id == Notification.PAIRING_LOCK:
receiver.pairing.lock_open = bool(notification.address & 0x01)
reason = _("pairing lock is open") if receiver.pairing.lock_open else _("pairing lock is closed")
if logger.isEnabledFor(logging.INFO):
logger.info("%s: %s", receiver, reason)
receiver.pairing.error = None
if receiver.pairing.lock_open:
receiver.pairing.new_device = None
pair_error = ord(hidpp_notification.data[:1])
pair_error = ord(notification.data[:1])
if pair_error:
receiver.pairing.error = error_string = hidpp10_constants.PairingError(pair_error)
receiver.pairing.new_device = None
logger.warning("pairing error %d: %s", pair_error, error_string)
receiver.changed(reason=reason)
return True

elif hidpp_notification.sub_id == Registers.DISCOVERY_STATUS_NOTIFICATION: # Bolt pairing
elif notification.sub_id == Registers.DISCOVERY_STATUS_NOTIFICATION: # Bolt pairing
with notification_lock:
receiver.pairing.discovering = hidpp_notification.address == 0x00
receiver.pairing.discovering = notification.address == 0x00
reason = _("discovery lock is open") if receiver.pairing.discovering else _("discovery lock is closed")
if logger.isEnabledFor(logging.INFO):
logger.info("%s: %s", receiver, reason)
Expand All @@ -108,33 +108,33 @@ def _process_receiver_notification(receiver: Receiver, hidpp_notification: HIDPP
receiver.pairing.counter = receiver.pairing.device_address = None
receiver.pairing.device_authentication = receiver.pairing.device_name = None
receiver.pairing.device_passkey = None
discover_error = ord(hidpp_notification.data[:1])
discover_error = ord(notification.data[:1])
if discover_error:
receiver.pairing.error = discover_string = hidpp10_constants.BoltPairingError(discover_error)
logger.warning("bolt discovering error %d: %s", discover_error, discover_string)
receiver.changed(reason=reason)
return True

elif hidpp_notification.sub_id == Registers.DEVICE_DISCOVERY_NOTIFICATION: # Bolt pairing
elif notification.sub_id == Registers.DEVICE_DISCOVERY_NOTIFICATION: # Bolt pairing
with notification_lock:
counter = hidpp_notification.address + hidpp_notification.data[0] * 256 # notification counter
counter = notification.address + notification.data[0] * 256 # notification counter
if receiver.pairing.counter is None:
receiver.pairing.counter = counter
else:
if not receiver.pairing.counter == counter:
return None
if hidpp_notification.data[1] == 0:
receiver.pairing.device_kind = hidpp_notification.data[3]
receiver.pairing.device_address = hidpp_notification.data[6:12]
receiver.pairing.device_authentication = hidpp_notification.data[14]
elif hidpp_notification.data[1] == 1:
receiver.pairing.device_name = hidpp_notification.data[3 : 3 + hidpp_notification.data[2]].decode("utf-8")
if notification.data[1] == 0:
receiver.pairing.device_kind = notification.data[3]
receiver.pairing.device_address = notification.data[6:12]
receiver.pairing.device_authentication = notification.data[14]
elif notification.data[1] == 1:
receiver.pairing.device_name = notification.data[3 : 3 + notification.data[2]].decode("utf-8")
return True

elif hidpp_notification.sub_id == Registers.PAIRING_STATUS_NOTIFICATION: # Bolt pairing
elif notification.sub_id == Registers.PAIRING_STATUS_NOTIFICATION: # Bolt pairing
with notification_lock:
receiver.pairing.device_passkey = None
receiver.pairing.lock_open = hidpp_notification.address == 0x00
receiver.pairing.lock_open = notification.address == 0x00
reason = _("pairing lock is open") if receiver.pairing.lock_open else _("pairing lock is closed")
if logger.isEnabledFor(logging.INFO):
logger.info("%s: %s", receiver, reason)
Expand All @@ -144,30 +144,30 @@ def _process_receiver_notification(receiver: Receiver, hidpp_notification: HIDPP
receiver.pairing.device_address = None
receiver.pairing.device_authentication = None
receiver.pairing.device_name = None
pair_error = hidpp_notification.data[0]
pair_error = notification.data[0]
if receiver.pairing.lock_open:
receiver.pairing.new_device = None
elif hidpp_notification.address == 0x02 and not pair_error:
receiver.pairing.new_device = receiver.register_new_device(hidpp_notification.data[7])
elif notification.address == 0x02 and not pair_error:
receiver.pairing.new_device = receiver.register_new_device(notification.data[7])
if pair_error:
receiver.pairing.error = error_string = hidpp10_constants.BoltPairingError(pair_error)
receiver.pairing.new_device = None
logger.warning("pairing error %d: %s", pair_error, error_string)
receiver.changed(reason=reason)
return True

elif hidpp_notification.sub_id == Registers.PASSKEY_REQUEST_NOTIFICATION: # Bolt pairing
elif notification.sub_id == Registers.PASSKEY_REQUEST_NOTIFICATION: # Bolt pairing
with notification_lock:
receiver.pairing.device_passkey = hidpp_notification.data[0:6].decode("utf-8")
receiver.pairing.device_passkey = notification.data[0:6].decode("utf-8")
return True

elif hidpp_notification.sub_id == Registers.PASSKEY_PRESSED_NOTIFICATION: # Bolt pairing
elif notification.sub_id == Registers.PASSKEY_PRESSED_NOTIFICATION: # Bolt pairing
return True

logger.warning("%s: unhandled notification %s", receiver, hidpp_notification)
logger.warning("%s: unhandled notification %s", receiver, notification)


def _process_device_notification(device, notification):
def process_device_notification(device, notification):
# incoming packets with SubId >= 0x80 are supposedly replies from HID++ 1.0 requests, should never get here
assert notification.sub_id & 0x80 == 0

Expand Down
4 changes: 2 additions & 2 deletions tests/logitech_receiver/test_notifications.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ def test_process_receiver_notification(sub_id, notification_data, expected_error
receiver: Receiver = Receiver(MockLowLevelInterface(), None, {}, True, None, None)
notification = HIDPPNotification(0, 0, sub_id, 0x02, notification_data)

result = notifications._process_receiver_notification(receiver, notification)
result = notifications.process_receiver_notification(receiver, notification)

assert result
assert receiver.pairing.error == expected_error
Expand All @@ -73,7 +73,7 @@ def test_process_receiver_notification(sub_id, notification_data, expected_error
def test_process_device_notification(hidpp_notification, expected):
device = fake_hidpp.Device()

result = notifications._process_device_notification(device, hidpp_notification)
result = notifications.process_device_notification(device, hidpp_notification)

assert result == expected

Expand Down

0 comments on commit 810cda9

Please sign in to comment.