diff --git a/Hologram/Network/Modem/Modem.py b/Hologram/Network/Modem/Modem.py index ca5222d..63facea 100644 --- a/Hologram/Network/Modem/Modem.py +++ b/Hologram/Network/Modem/Modem.py @@ -17,9 +17,7 @@ from collections import deque -import binascii import datetime -import logging import os import serial from serial.tools import list_ports @@ -34,7 +32,7 @@ class Modem(IModem): DEFAULT_SERIAL_READ_SIZE = 256 DEFAULT_SERIAL_TIMEOUT = 1 DEFAULT_SERIAL_RETRIES = 0 - DEFAULT_SEND_TIMEOUT = 10 + DEFAULT_SEND_TIMEOUT = 20 _RETRY_DELAY = 0.05 # 50 millisecond delay to avoid spinning loops @@ -245,9 +243,6 @@ def set_timezone_configs(self): def set_network_registration_status(self): pass - def reset(self): - self.set('+CFUN', '16') # restart the modem - def radio_power(self, power_mode): cfun_val = '1' if power_mode else '0' ok, r = self.command('+CFUN', cfun_val, timeout=5) @@ -289,43 +284,18 @@ def _read_and_append_message_receive_buffer(self, socket_identifier, payload_len self.close_socket(socket_identifier=socket_identifier) def create_socket(self): - op = self._basic_set('+USOCR', '6', strip_val=False) - if op is not None: - self.socket_identifier = int(op) + raise NotImplementedError("Modem does not have an AT Socket Mode") # REQUIRES: The host and port. # EFFECTS: Issues an AT command to connect to the specified socket identifier. def connect_socket(self, host, port): - at_command_val = "%d,\"%s\",%s" % (self.socket_identifier, host, port) - ok, _ = self.set('+USOCO', at_command_val, timeout=20) - if ok != ModemResult.OK: - self.logger.error('Failed to connect socket') - raise NetworkError('Failed to connect socket') - else: - self.logger.info('Connect socket is successful') + raise NotImplementedError("Modem does not have an AT Socket Mode") def listen_socket(self, port): - at_command_val = "%d,%s" % (self.socket_identifier, port) - self.listen_socket_identifier = self.socket_identifier - ok, _ = self.set('+USOLI', at_command_val, timeout=5) - if ok != ModemResult.OK: - self.logger.error('Failed to listen socket') - raise NetworkError('Failed to listen socket') + raise NotImplementedError("Modem does not have an AT Socket Mode") def write_socket(self, data): - self.enable_hex_mode() - hexdata = binascii.hexlify(data) - # We have to do it in chunks of 510 since 512 is actually too long (CMEE error) - # and we need 2n chars for hexified data - for chunk in self._chunks(hexdata, 510): - value = b'%d,%d,\"%s\"' % (self.socket_identifier, - len(binascii.unhexlify(chunk)), - chunk) - ok, _ = self.set('+USOWR', value, timeout=10) - if ok != ModemResult.OK: - self.logger.error('Failed to write to socket') - raise NetworkError('Failed to write socket') - self.disable_hex_mode() + raise NotImplementedError("Modem does not have an AT Socket Mode") def _chunks(self, data, n): """Yield successive n-sized chunks from lst.""" @@ -333,38 +303,10 @@ def _chunks(self, data, n): yield data[i:i + n] def read_socket(self, socket_identifier=None, payload_length=None): - - if socket_identifier is None: - socket_identifier = self.socket_identifier - - if payload_length is None: - payload_length = self.last_read_payload_length - - self.enable_hex_mode() - - resp = self._basic_set('+USORD', '%d,%d' % (socket_identifier, payload_length)) - if resp is not None: - resp = resp.strip('"') - bytedata = binascii.unhexlify(resp) - try: - resp = bytedata.decode() - except: - # This is some sort of binary data that can't be decoded so just - # return the bytes. We might want to make this happen via parameter - # in the future so it is more deterministic - resp = bytedata - - self.disable_hex_mode() - return resp + raise NotImplementedError("Modem does not have an AT Socket Mode") def close_socket(self, socket_identifier=None): - - if socket_identifier is None: - socket_identifier = self.socket_identifier - - ok, r = self.set('+USOCL', "%s" % socket_identifier) - if ok != ModemResult.OK: - self.logger.info('Failed to close socket') + raise NotImplementedError("Modem does not have an AT Socket Mode") def debugwrite(self, x, hide=False): if not hide: @@ -400,46 +342,16 @@ def checkURC(self, hide=False): # EFFECTS: Handles URC related AT command responses. def handleURC(self, urc): - self.logger.debug("URC! %s", urc) + self.logger.debug("URC! %s", urc) self.logger.debug("handleURC state: %d", self.urc_state) - next_urc_state = self.urc_state - if urc.startswith("+CMTI: "): self._handle_sms_receive_urc(urc) - elif urc.startswith('+UULOC: '): - self._handle_location_urc(urc) - elif urc.startswith('+UUSORD: '): - - # Strip UUSORD socket identifier + payload length from the URC event. - # Example: {+UUSORD: 0,2} -> 0 and 2 - response_list = urc.lstrip('+UUSORD: ').split(',') - socket_identifier = int(response_list[0]) - payload_length = int(response_list[-1]) - - if self.urc_state == Modem.SOCKET_RECEIVE_READ: - self._read_and_append_message_receive_buffer(socket_identifier, payload_length) - else: - self.socket_identifier = socket_identifier - self.last_read_payload_length = payload_length - next_urc_state = Modem.SOCKET_SEND_READ - elif urc.startswith('+UUSOLI: '): - self._handle_listen_urc(urc) - self.last_read_payload_length = 0 - next_urc_state = Modem.SOCKET_RECEIVE_READ - elif urc.startswith('+UUPSDD: '): - self.event.broadcast('cellular.forced_disconnect') - elif urc.startswith('+UUSOCL: '): - next_urc_state = Modem.SOCKET_CLOSED else: self.logger.debug("URC was not handled. \'%s\'", urc) - self.urc_state = next_urc_state - # URC handlers - - def _handle_sms_receive_urc(self, urc): self.event.broadcast('sms.received') @@ -702,40 +614,13 @@ def is_registered(self): pass def _is_pdp_context_active(self): - if not self.is_registered(): - return False - - ok, r = self.set('+UPSND', '0,8') - if ok == ModemResult.OK: - try: - pdpstatus = int(r.lstrip('UPSND: ').split(',')[2]) - # 1: PDP active - return pdpstatus == 1 - except (IndexError, ValueError) as e: - self.logger.error(repr(e)) return False def _set_up_pdp_context(self): - if self._is_pdp_context_active(): return True - self.logger.info('Setting up PDP context') - self.set('+UPSD', f'0,1,\"{self._apn}\"') - self.set('+UPSD', '0,7,\"0.0.0.0\"') - ok, _ = self.set('+UPSDA', '0,3', timeout=30) - if ok != ModemResult.OK: - self.logger.error('PDP Context setup failed') - raise NetworkError('Failed PDP context setup') - else: - self.logger.info('PDP context active') + raise NotImplementedError('Modem PDP context setup not implemented') def _tear_down_pdp_context(self): - if not self._is_pdp_context_active(): return True - self.logger.info('Tearing down PDP context') - ok, _ = self.set('+UPSDA', '0,4', timeout=30) - if ok != ModemResult.OK: - self.logger.error('PDP Context tear down failed') - else: - self.logger.info('PDP context deactivated') - + raise NotImplementedError('Modem PDP context setup not implemented') def __enforce_serial_port_open(self): if not (self.serial_port and self.serial_port.isOpen()): @@ -837,13 +722,10 @@ def enable_at_sockets_mode(self): pass def enable_hex_mode(self): - self.__set_hex_mode(1) + pass def disable_hex_mode(self): - self.__set_hex_mode(0) - - def __set_hex_mode(self, enable_hex_mode): - self.command('+UDCONF', '1,%d' % enable_hex_mode) + pass @property def serial_port(self): @@ -895,27 +777,12 @@ def at_sockets_available(self): return self._at_sockets_available @property - def modem_mode(self): - mode_number = None - # trim: - # +UUSBCONF: 0,"",,"0x1102" -> 0 - # +UUSBCONF: 2,"ECM",,"0x1104" -> 2 - try: - ok, res = self.read('+UUSBCONF') - if ok == ModemResult.OK: - mode_number = int(res.lstrip('+UUSBCONF: ').split(',')[0]) - except (IndexError, ValueError) as e: - self.logger.error(repr(e)) - return mode_number + def modem_usb_mode(self): + raise NotImplementedError('This modem does not support this property') - @modem_mode.setter - def modem_mode(self, mode): - self.set('+UUSBCONF', str(mode)) - self.logger.info('Restarting modem') - self.reset() - self.logger.info('Modem restarted') - self.closeSerialPort() - time.sleep(Modem.DEFAULT_MODEM_RESTART_TIME) + @modem_usb_mode.setter + def modem_usb_mode(self, mode): + raise NotImplementedError('This modem does not support this property') @property def localIPAddress(self): @@ -933,7 +800,7 @@ def remoteIPAddress(self): @property def version(self): - raise NotImplementedError('This modem does not support this property') + return self._basic_command('I9') @property def imei(self): @@ -947,3 +814,5 @@ def apn(self): def apn(self, apn): self._apn = apn return self.set('+CGDCONT', f'1,"IP","{self._apn}"') + + diff --git a/Hologram/Network/Modem/Nova.py b/Hologram/Network/Modem/Nova.py deleted file mode 100644 index c3ddd03..0000000 --- a/Hologram/Network/Modem/Nova.py +++ /dev/null @@ -1,33 +0,0 @@ -# Nova.py - Hologram Python SDK Nova modem interface -# -# Author: Hologram -# -# Copyright 2016 - Hologram (Konekt, Inc.) -# -# -# LICENSE: Distributed under the terms of the MIT License -# - -from Hologram.Network.Modem import Modem -from Hologram.Event import Event - -DEFAULT_NOVA_TIMEOUT = 200 - -class Nova(Modem): - - def __init__(self, device_name=None, baud_rate='9600', - chatscript_file=None, event=Event()): - - super().__init__(device_name=device_name, baud_rate=baud_rate, - chatscript_file=chatscript_file, event=event) - - def disable_at_sockets_mode(self): - self._at_sockets_available = False - - def enable_at_sockets_mode(self): - self._at_sockets_available = True - - @property - def version(self): - return self._basic_command('I9') - diff --git a/Hologram/Network/Modem/NovaM.py b/Hologram/Network/Modem/NovaM.py index 65e6750..be0ede7 100644 --- a/Hologram/Network/Modem/NovaM.py +++ b/Hologram/Network/Modem/NovaM.py @@ -8,14 +8,14 @@ # LICENSE: Distributed under the terms of the MIT License # -from Hologram.Network.Modem.Nova import Nova +from Hologram.Network.Modem.UBlox import Ublox from Hologram.Event import Event from Exceptions.HologramError import NetworkError from UtilClasses import ModemResult DEFAULT_NOVAM_TIMEOUT = 200 -class NovaM(Nova): +class NovaM(Ublox): usb_ids = [('05c6', '90b2')] module = 'option' diff --git a/Hologram/Network/Modem/Nova_U201.py b/Hologram/Network/Modem/Nova_U201.py index d6a23d3..464c767 100644 --- a/Hologram/Network/Modem/Nova_U201.py +++ b/Hologram/Network/Modem/Nova_U201.py @@ -8,7 +8,7 @@ # LICENSE: Distributed under the terms of the MIT License # -from Hologram.Network.Modem.Nova import Nova +from Hologram.Network.Modem.UBlox import Ublox from Exceptions.HologramError import SerialError from Hologram.Event import Event from UtilClasses import Location @@ -16,7 +16,7 @@ DEFAULT_NOVA_U201_TIMEOUT = 200 -class Nova_U201(Nova): +class Nova_U201(Ublox): usb_ids = [('1546','1102'),('1546','1104')] def __init__(self, device_name=None, baud_rate='9600', @@ -26,7 +26,7 @@ def __init__(self, device_name=None, baud_rate='9600', chatscript_file=chatscript_file, event=event) # We need to enforce multi serial port support. We then reinstantiate # the serial interface with the correct device name. - self.enforce_nova_modem_mode() + self.enforce_nova_modem_usb_mode() self._at_sockets_available = True self.last_sim_otp_command_response = None self.last_location = None @@ -59,14 +59,14 @@ def is_registered(self): # EFFECTS: Enforces that the Nova modem be in the correct mode to support multiple # serial ports - def enforce_nova_modem_mode(self): + def enforce_nova_modem_usb_mode(self): - modem_mode = self.modem_mode - self.logger.debug('USB modem mode: ' + str(modem_mode)) + modem_usb_mode = self.modem_usb_mode + self.logger.debug('USB modem mode: ' + str(modem_usb_mode)) # Set the modem mode to 0 if necessary. - if modem_mode == 2: - self.modem_mode = 0 + if modem_usb_mode == 2: + self.modem_usb_mode = 0 devices = self.detect_usable_serial_port() self.device_name = devices[0] super().initialize_serial_interface() @@ -89,14 +89,6 @@ def get_sim_otp_response(self, command): return self.last_sim_otp_command_response - # EFFECTS: Handles URC related AT command responses. - def handleURC(self, urc): - if urc.startswith('+CSIM: '): - self.parse_and_populate_last_sim_otp_response(urc.lstrip('+CSIM: ')) - return - - super().handleURC(urc) - def populate_location_obj(self, response): response_list = response.split(',') self.last_location = Location(*response_list) diff --git a/Hologram/Network/Modem/Quectel.py b/Hologram/Network/Modem/Quectel.py index d996dd0..4a6b060 100644 --- a/Hologram/Network/Modem/Quectel.py +++ b/Hologram/Network/Modem/Quectel.py @@ -39,6 +39,7 @@ def send_message(self, data, timeout=Modem.DEFAULT_SEND_TIMEOUT): self.checkURC() if self.urc_state != Modem.SOCKET_SEND_READ: if loop_timeout.expired(): + self.close_socket() # Try closing the socket if we get a timeout raise SerialError('Timeout occurred waiting for message status') time.sleep(self._RETRY_DELAY) elif self.urc_state == Modem.SOCKET_CLOSED: @@ -140,9 +141,9 @@ def _is_pdp_context_active(self): # 1: PDP active return pdpstatus == 1 except (IndexError, ValueError) as e: - self.logger.error(repr(e)) + self.logger.error(repr(e), exc_info=True) except AttributeError as e: - self.logger.error(repr(e)) + self.logger.error(repr(e), exc_info=True) return False def set_network_registration_status(self): @@ -158,4 +159,7 @@ def _set_up_pdp_context(self): self.logger.error('PDP Context setup failed') raise NetworkError('Failed PDP context setup') else: - self.logger.info('PDP context active') \ No newline at end of file + self.logger.info('PDP context active') + + def reset(self): + self.set('+CFUN', '1,1') # restart the modem \ No newline at end of file diff --git a/Hologram/Network/Modem/UBlox.py b/Hologram/Network/Modem/UBlox.py new file mode 100644 index 0000000..a7483ba --- /dev/null +++ b/Hologram/Network/Modem/UBlox.py @@ -0,0 +1,209 @@ +# UBlox.py - Hologram Python SDK Ublox modem interface +# +# Author: Hologram +# +# Copyright 2016 - Hologram (Konekt, Inc.) +# +# +# LICENSE: Distributed under the terms of the MIT License +# +import time +import binascii + +from Hologram.Network.Modem import Modem +from Hologram.Event import Event +from Exceptions.HologramError import NetworkError +from UtilClasses import ModemResult + +class Ublox(Modem): + + def __init__(self, device_name=None, baud_rate='9600', + chatscript_file=None, event=Event()): + + super().__init__(device_name=device_name, baud_rate=baud_rate, + chatscript_file=chatscript_file, event=event) + + def disable_at_sockets_mode(self): + self._at_sockets_available = False + + def enable_at_sockets_mode(self): + self._at_sockets_available = True + + def create_socket(self): + op = self._basic_set('+USOCR', '6', strip_val=False) + if op is not None: + self.socket_identifier = int(op) + + # REQUIRES: The host and port. + # EFFECTS: Issues an AT command to connect to the specified socket identifier. + def connect_socket(self, host, port): + at_command_val = "%d,\"%s\",%s" % (self.socket_identifier, host, port) + ok, _ = self.set('+USOCO', at_command_val, timeout=20) + if ok != ModemResult.OK: + self.logger.error('Failed to connect socket') + raise NetworkError('Failed to connect socket') + else: + self.logger.info('Connect socket is successful') + + def listen_socket(self, port): + at_command_val = "%d,%s" % (self.socket_identifier, port) + self.listen_socket_identifier = self.socket_identifier + ok, _ = self.set('+USOLI', at_command_val, timeout=5) + if ok != ModemResult.OK: + self.logger.error('Failed to listen socket') + raise NetworkError('Failed to listen socket') + + def write_socket(self, data): + self.enable_hex_mode() + hexdata = binascii.hexlify(data) + # We have to do it in chunks of 510 since 512 is actually too long (CMEE error) + # and we need 2n chars for hexified data + for chunk in self._chunks(hexdata, 510): + value = b'%d,%d,\"%s\"' % (self.socket_identifier, + len(binascii.unhexlify(chunk)), + chunk) + ok, _ = self.set('+USOWR', value, timeout=10) + if ok != ModemResult.OK: + self.logger.error('Failed to write to socket') + raise NetworkError('Failed to write socket') + self.disable_hex_mode() + + def read_socket(self, socket_identifier=None, payload_length=None): + if socket_identifier is None: + socket_identifier = self.socket_identifier + + if payload_length is None: + payload_length = self.last_read_payload_length + + self.enable_hex_mode() + + resp = self._basic_set('+USORD', '%d,%d' % (socket_identifier, payload_length)) + if resp is not None: + resp = resp.strip('"') + bytedata = binascii.unhexlify(resp) + try: + resp = bytedata.decode() + except: + # This is some sort of binary data that can't be decoded so just + # return the bytes. We might want to make this happen via parameter + # in the future so it is more deterministic + resp = bytedata + + self.disable_hex_mode() + return resp + + def close_socket(self, socket_identifier=None): + + if socket_identifier is None: + socket_identifier = self.socket_identifier + + ok, r = self.set('+USOCL', "%s" % socket_identifier) + if ok != ModemResult.OK: + self.logger.info('Failed to close socket') + + def handleURC(self, urc): + """ + Handles UBlox URC related AT command responses. + + :param urc: the URC string + :type urc: string + """ + self.logger.debug("URC! %s", urc) + + if urc.startswith('+CSIM: '): + self.parse_and_populate_last_sim_otp_response(urc.lstrip('+CSIM: ')) + return + elif urc.startswith('+UULOC: '): + self._handle_location_urc(urc) + elif urc.startswith('+UUSORD: '): + + # Strip UUSORD socket identifier + payload length from the URC event. + # Example: {+UUSORD: 0,2} -> 0 and 2 + response_list = urc.lstrip('+UUSORD: ').split(',') + socket_identifier = int(response_list[0]) + payload_length = int(response_list[-1]) + + if self.urc_state == Modem.SOCKET_RECEIVE_READ: + self._read_and_append_message_receive_buffer(socket_identifier, payload_length) + else: + self.socket_identifier = socket_identifier + self.last_read_payload_length = payload_length + self.urc_state = Modem.SOCKET_SEND_READ + elif urc.startswith('+UUSOLI: '): + self._handle_listen_urc(urc) + self.last_read_payload_length = 0 + self.urc_state = Modem.SOCKET_RECEIVE_READ + elif urc.startswith('+UUPSDD: '): + self.event.broadcast('cellular.forced_disconnect') + elif urc.startswith('+UUSOCL: '): + self.urc_state = Modem.SOCKET_CLOSED + + super().handleURC(urc) + + def _is_pdp_context_active(self): + if not self.is_registered(): + return False + + ok, r = self.set('+UPSND', '0,8') + if ok == ModemResult.OK: + try: + pdpstatus = int(r.lstrip('UPSND: ').split(',')[2]) + # 1: PDP active + return pdpstatus == 1 + except (IndexError, ValueError) as e: + self.logger.error(repr(e)) + return False + + def _set_up_pdp_context(self): + if self._is_pdp_context_active(): return True + self.logger.info('Setting up PDP context') + self.set('+UPSD', f'0,1,\"{self._apn}\"') + self.set('+UPSD', '0,7,\"0.0.0.0\"') + ok, _ = self.set('+UPSDA', '0,3', timeout=30) + if ok != ModemResult.OK: + self.logger.error('PDP Context setup failed') + raise NetworkError('Failed PDP context setup') + else: + self.logger.info('PDP context active') + + def _tear_down_pdp_context(self): + if not self._is_pdp_context_active(): return True + self.logger.info('Tearing down PDP context') + ok, _ = self.set('+UPSDA', '0,4', timeout=30) + if ok != ModemResult.OK: + self.logger.error('PDP Context tear down failed') + else: + self.logger.info('PDP context deactivated') + + def enable_hex_mode(self): + self.command('+UDCONF', '1,1') + + def disable_hex_mode(self): + self.command('+UDCONF', '1,0') + + def reset(self): + self.set('+CFUN', '16') # restart the modem + + @property + def modem_usb_mode(self): + mode_number = None + # trim: + # +UUSBCONF: 0,"",,"0x1102" -> 0 + # +UUSBCONF: 2,"ECM",,"0x1104" -> 2 + try: + ok, res = self.read('+UUSBCONF') + if ok == ModemResult.OK: + mode_number = int(res.lstrip('+UUSBCONF: ').split(',')[0]) + except (IndexError, ValueError) as e: + self.logger.error(repr(e)) + return mode_number + + @modem_usb_mode.setter + def modem_usb_mode(self, mode): + self.set('+UUSBCONF', str(mode)) + self.logger.info('Restarting modem') + self.reset() + self.logger.info('Modem restarted') + self.closeSerialPort() + time.sleep(Modem.DEFAULT_MODEM_RESTART_TIME) + diff --git a/tests/Modem/test_Modem.py b/tests/Modem/test_Modem.py index e290d14..a5d48d2 100644 --- a/tests/Modem/test_Modem.py +++ b/tests/Modem/test_Modem.py @@ -107,18 +107,6 @@ def test_get_sms(no_serial_port, get_sms): assert(res.timestamp == datetime.utcfromtimestamp(1498264009)) assert(res.message == 'Test 123') -# WRITE SOCKET - -def test_socket_write_under_512(no_serial_port, override_command_result): - modem = Modem() - data = '{message:{fill}{align}{width}}'.format(message='Test-', fill='@', align='<', width=64) - modem.write_socket(data.encode()) - -def test_socket_write_over_512(no_serial_port, override_command_result): - modem = Modem() - data = '{message:{fill}{align}{width}}'.format(message='Test-', fill='@', align='<', width=600) - modem.write_socket(data.encode()) - # DEBUGWRITE def test_debugwrite(no_serial_port): diff --git a/tests/Modem/test_Nova.py b/tests/Modem/test_Nova.py deleted file mode 100644 index 92cacc1..0000000 --- a/tests/Modem/test_Nova.py +++ /dev/null @@ -1,56 +0,0 @@ -# Author: Hologram -# -# Copyright 2017 - Hologram (Konekt, Inc.) -# -# LICENSE: Distributed under the terms of the MIT License -# -# test_Nova.py - This file implements unit tests for the Nova modem interface. - -import pytest -import sys - -from Hologram.Network.Modem.Nova import Nova - -sys.path.append(".") -sys.path.append("..") -sys.path.append("../..") - -def mock_write(nova, message): - return True - -def mock_read(nova): - return True - -def mock_readline(nova, timeout=None, hide=False): - return '' - -def mock_open_serial_port(nova, device_name=None): - return True - -def mock_close_serial_port(nova): - return True - -def mock_detect_usable_serial_port(nova, stop_on_first=True): - return '/dev/ttyUSB0' - -@pytest.fixture -def no_serial_port(monkeypatch): - monkeypatch.setattr(Nova, '_read_from_serial_port', mock_read) - monkeypatch.setattr(Nova, '_readline_from_serial_port', mock_readline) - monkeypatch.setattr(Nova, '_write_to_serial_port_and_flush', mock_write) - monkeypatch.setattr(Nova, 'openSerialPort', mock_open_serial_port) - monkeypatch.setattr(Nova, 'closeSerialPort', mock_close_serial_port) - monkeypatch.setattr(Nova, 'detect_usable_serial_port', mock_detect_usable_serial_port) - -def test_init_nova_no_args(no_serial_port): - modem = Nova() - assert(modem.timeout == 1) - assert(modem.socket_identifier == 0) - assert(modem.chatscript_file.endswith('/chatscripts/default-script')) - assert(modem._at_sockets_available == False) - -def test_init_nova_chatscriptfileoverride(no_serial_port): - modem = Nova(chatscript_file='test-chatscript') - assert(modem.timeout == 1) - assert(modem.socket_identifier == 0) - assert(modem.chatscript_file == 'test-chatscript') diff --git a/tests/Modem/test_UBlox.py b/tests/Modem/test_UBlox.py new file mode 100644 index 0000000..c1a64d2 --- /dev/null +++ b/tests/Modem/test_UBlox.py @@ -0,0 +1,76 @@ +# Author: Hologram +# +# Copyright 2017 - Hologram (Konekt, Inc.) +# +# LICENSE: Distributed under the terms of the MIT License +# +# test_Ublox.py - This file implements unit tests for the Ublox modem interface. + +import pytest +import sys + +from Hologram.Network.Modem.UBlox import Ublox +from UtilClasses import ModemResult + +sys.path.append(".") +sys.path.append("..") +sys.path.append("../..") + +def mock_write(Ublox, message): + return True + +def mock_read(Ublox): + return True + +def mock_readline(Ublox, timeout=None, hide=False): + return '' + +def mock_open_serial_port(Ublox, device_name=None): + return True + +def mock_close_serial_port(Ublox): + return True + +def mock_detect_usable_serial_port(Ublox, stop_on_first=True): + return '/dev/ttyUSB0' + +def mock_result(modem): + return (ModemResult.OK, None) + +@pytest.fixture +def no_serial_port(monkeypatch): + monkeypatch.setattr(Ublox, '_read_from_serial_port', mock_read) + monkeypatch.setattr(Ublox, '_readline_from_serial_port', mock_readline) + monkeypatch.setattr(Ublox, '_write_to_serial_port_and_flush', mock_write) + monkeypatch.setattr(Ublox, 'openSerialPort', mock_open_serial_port) + monkeypatch.setattr(Ublox, 'closeSerialPort', mock_close_serial_port) + monkeypatch.setattr(Ublox, 'detect_usable_serial_port', mock_detect_usable_serial_port) + +@pytest.fixture +def override_command_result(monkeypatch): + monkeypatch.setattr(Ublox, '_command_result', mock_result) + +def test_init_Ublox_no_args(no_serial_port): + modem = Ublox() + assert(modem.timeout == 1) + assert(modem.socket_identifier == 0) + assert(modem.chatscript_file.endswith('/chatscripts/default-script')) + assert(modem._at_sockets_available == False) + +def test_init_Ublox_chatscriptfileoverride(no_serial_port): + modem = Ublox(chatscript_file='test-chatscript') + assert(modem.timeout == 1) + assert(modem.socket_identifier == 0) + assert(modem.chatscript_file == 'test-chatscript') + +# WRITE SOCKET + +def test_socket_write_under_512(no_serial_port, override_command_result): + modem = Ublox() + data = '{message:{fill}{align}{width}}'.format(message='Test-', fill='@', align='<', width=64) + modem.write_socket(data.encode()) + +def test_socket_write_over_512(no_serial_port, override_command_result): + modem = Ublox() + data = '{message:{fill}{align}{width}}'.format(message='Test-', fill='@', align='<', width=600) + modem.write_socket(data.encode()) \ No newline at end of file