Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix validation, command formatting, error handling, and feedback interpretation for MSR605X #3

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 26 additions & 17 deletions msr605x.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,20 @@ def __init__(self, **kwargs):
kwargs["idProduct"] = 0x0003
self.dev = usb.core.find(**kwargs)
self.hid_endpoint = None

def connect(self):
""" Establish a connection to the MSR605X """
dev = self.dev
if dev.is_kernel_driver_active(0):
dev.detach_kernel_driver(0)
dev.set_configuration()
config = dev.get_active_configuration()
interface = config.interfaces()[0]
self.hid_endpoint = interface.endpoints()[0]
try:
dev = self.dev
if dev.is_kernel_driver_active(0):
dev.detach_kernel_driver(0)
dev.set_configuration()
config = dev.get_active_configuration()
interface = config.interfaces()[0]
self.hid_endpoint = interface.endpoints()[0]
except usb.core.USBError as e:
raise ConnectionError(f"Failed to connect to the device: {e}")

def _make_header(self, start_of_sequence: bool, end_of_sequence: bool, length: int):
if length < 0 or length > 63:
raise ValueError("Length must be a non-negative number no more than 63")
Expand All @@ -41,6 +46,7 @@ def _make_header(self, start_of_sequence: bool, end_of_sequence: bool, length: i
if end_of_sequence:
header |= SEQUENCE_END_BIT
return bytes([header])

def _encapsulate_message(self, message):
idx = 0
while idx < len(message):
Expand All @@ -49,37 +55,43 @@ def _encapsulate_message(self, message):
padding = b"\0" * (63 - len(payload))
yield header + payload + padding
idx += 63

def _send_packet(self, packet):
self.dev.ctrl_transfer(0x21, 9, wValue=0x0300, wIndex=0, data_or_wLength=packet)

def _recv_packet(self, **kwargs):
try:
return bytes(self.hid_endpoint.read(64, **kwargs))
except usb.core.USBError as error:
if error.errno == 110:
return None
raise error

def send_message(self, message):
""" Send a message to the MSR605X """
for packet in self._encapsulate_message(message):
self._send_packet(packet)

def recv_message(self, timeout=0):
""" Receive message from the MSR605X """
message = b""
while True:
packet = self._recv_packet(timeout=timeout)
if packet is None and not message:
return None
if packet is None:
raise TimeoutError("Timeout while waiting for device response.")
payload_length = packet[0] & SEQUENCE_LENGTH_BITS
payload = packet[1:1+payload_length]
message = message + payload
# note: we don't actually check the sequence start bit currently, we probably should to
# check this in case we somehow start reading in the middle of a message
if packet[0] & SEQUENCE_END_BIT:
break
return payload
return message

def reset(self):
""" Sends reset message to the MSR605X """
self.send_message(ESC + b"a")

def get_firmware_version(self):
""" Get the firmware version of the connected MSR605X """
self.send_message(ESC + b"v")
Expand All @@ -102,18 +114,15 @@ def write_track(self, track_number, data):
if isinstance(data, str):
data = data.encode('utf-8')

# Building the command with the appropriate structure as per the device's specs
command = (ESC + b'w' + ESC + b's' + ESC +
bytes(f"[0{track_number}]", 'utf-8') + # Specifying track number in the expected format
data + b'?' + ESC + b'\x1c')
bytes(f"[0{track_number}]", 'utf-8') + # Specifying track number in the expected format
data + b'?' + ESC + b'\x1c')

self.send_message(command)

# Receiving feedback from the device
feedback = self.recv_message()
if feedback and b'\x1b0' in feedback:
if feedback and (b'\x1b0' in feedback or b'\x1b1' in feedback): # Example of broader success response check
print(f"Successfully wrote to track {track_number}.")
else:
error_message = f"Failed to write data to track {track_number}. Device response: {feedback}"
raise Exception(error_message)

raise Exception(error_message)
20 changes: 10 additions & 10 deletions write.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,12 @@ def is_valid_track3(data):

def write_tracks(msr, track1, track2, track3):
""" Write data to each track with validation """
# if not is_valid_track1(track1):
# raise ValueError("Invalid data for Track 1. Must be alphanumeric.")
# if not is_valid_track2(track2):
# raise ValueError("Invalid data for Track 2. Must be numeric only.")
# if not is_valid_track3(track3):
# raise ValueError("Invalid data for Track 3. Must be numeric only.")
if not is_valid_track1(track1):
raise ValueError("Invalid data for Track 1. Must be alphanumeric.")
if not is_valid_track2(track2):
raise ValueError("Invalid data for Track 2. Must be numeric only.")
if not is_valid_track3(track3):
raise ValueError("Invalid data for Track 3. Must be numeric only.")

msr.write_track(1, track1.encode('utf-8'))
msr.write_track(2, track2.encode('utf-8'))
Expand All @@ -33,9 +33,9 @@ def main():

while True:
# Input data for each track
track1_data = "%TESTE02?"
track2_data = "12347"
track3_data = ";1234?"
track1_data = input("Enter data for Track 1: ")
track2_data = input("Enter data for Track 2: ")
track3_data = input("Enter data for Track 3: ")

# Validate and write data to card
try:
Expand All @@ -53,4 +53,4 @@ def main():
print(f"An error occurred: {e}")

if __name__ == "__main__":
main()
main()