From f3cebf7aed3818a7b1ab4b0583dcd6cafbc7fe32 Mon Sep 17 00:00:00 2001 From: elschnorro77 <57951003+elschnorro77@users.noreply.github.com> Date: Tue, 1 Feb 2022 14:16:58 +0100 Subject: [PATCH 1/4] Update __init__.py fix _write_sentence for pyhton3 fix send_command for pyhton3 add missung elements from nmea sentences add initialisation of PA1010D add waitforfix --- library/pa1010d/__init__.py | 324 ++++++++++++++++++++++++++++++------ 1 file changed, 269 insertions(+), 55 deletions(-) diff --git a/library/pa1010d/__init__.py b/library/pa1010d/__init__.py index ed75844..f6e58f6 100644 --- a/library/pa1010d/__init__.py +++ b/library/pa1010d/__init__.py @@ -4,8 +4,13 @@ import pynmea2 +import logging +import inspect -__version__ = '0.0.3' +loggername='PA1010D' #+ inspect.getfile(inspect.currentframe()) +logger = logging.getLogger(loggername) + +__version__ = '0.0.3.1' PA1010D_ADDR = 0x10 @@ -13,24 +18,46 @@ class PA1010D(): __slots__ = ( "timestamp", + "datestamp", + "datetimestamp", "latitude", "longitude", "altitude", + "altitude_units", + "lat", "lat_dir", + "lon", "lon_dir", "geo_sep", + "geo_sep_units", + "horizontal_dil", "num_sats", "gps_qual", + "status", + "faa_mode", + "true_track", + "true_track_sym", + "mag_track", + "mag_track_sym", + "spd_over_grnd_kts", + "spd_over_grnd_kts_sym", + "spd_over_grnd_kmph", + "spd_over_grnd_kmph_sym", "speed_over_ground", + "mag_variation", + "mag_var_dir", "mode_fix_type", "pdop", "hdop", "vdop", + "age_gps_data", + "ref_station_id", "_i2c_addr", "_i2c", "_debug" ) + def __init__(self, i2c_addr=PA1010D_ADDR, debug=False): self._i2c_addr = i2c_addr self._i2c = smbus.SMBus(1) @@ -38,21 +65,41 @@ def __init__(self, i2c_addr=PA1010D_ADDR, debug=False): self._debug = debug self.timestamp = None + self.datetimestamp = None self.latitude = None self.longitude = None self.altitude = None + self.altitude_units = None self.num_sats = None self.gps_qual = None + self.status = None + self.faa_mode = None + self.true_track = None + self.true_track_sym = None + self.mag_track = None + self.mag_track_sym = None + self.spd_over_grnd_kts = None + self.spd_over_grnd_kts_sym = None + self.spd_over_grnd_kmph = None + self.spd_over_grnd_kmph_sym = None + self.horizontal_dil = None + self.lat = None self.lat_dir = None + self.lon = None self.lon_dir = None self.geo_sep = None + self.geo_sep_units = None + self.age_gps_data = None + self.ref_station_id = None self.pdop = None self.hdop = None self.vdop = None self.speed_over_ground = None + self.mag_variation = None + self.mag_var_dir = None self.mode_fix_type = None @property @@ -67,7 +114,7 @@ def _write_sentence(self, bytestring): """ for char_index in bytestring: - self._i2c.write_byte(self._i2c_addr, ord(char_index)) + self._i2c.write_byte(self._i2c_addr, char_index) def send_command(self, command, add_checksum=True): """Send a command string to the PA1010D. @@ -76,9 +123,9 @@ def send_command(self, command, add_checksum=True): """ # TODO replace with pynmea2 functionality - if command.startswith("$"): + if command.startswith(b"$"): command = command[1:] - if command.endswith("*"): + if command.endswith(b"*"): command = command[:-1] buf = bytearray() @@ -116,7 +163,7 @@ def read_sentence(self, timeout=5): raise TimeoutError("Timeout waiting for readline") - def update(self, wait_for="GGA", timeout=5): + def update(self, wait_for="GGA", timeout=5, waitforfix=False): """Attempt to update from PA1010D. Returns true if a sentence has been successfully parsed. @@ -127,6 +174,7 @@ def update(self, wait_for="GGA", timeout=5): :param wait_for: Message type to wait for. :param timeout: Wait timeout in seconds + :param waitforfix: Will return true only once GPS signal has fix """ timeout += time.time() @@ -135,62 +183,157 @@ def update(self, wait_for="GGA", timeout=5): try: sentence = self.read_sentence() except TimeoutError: + logger.debug("Timeout error") continue try: result = pynmea2.parse(sentence) except pynmea2.nmea.ParseError: if self._debug: - print("Parse error: {sentence}".format(sentence=sentence)) + logger.debug("Parse error: {sentence}".format(sentence=sentence)) continue # Time, position and fix - if type(result) == pynmea2.GGA: - if result.gps_qual is None: - self.num_sats = 0 - self.gps_qual = 0 - else: - self.timestamp = result.timestamp - self.latitude = result.latitude - self.longitude = result.longitude - self.lat_dir = result.lat_dir - self.lon_dir = result.lon_dir - self.altitude = result.altitude - self.geo_sep = result.geo_sep - self.num_sats = result.num_sats - self.gps_qual = result.gps_qual - if wait_for == "GGA": - return True + if type(result) == pynmea2.GGA: #$--GGA,hhmmss.ss,llll.ll,a,yyyyy.yy,a,x,xx,x.x,x.x,M,x.x,M,x.x,xxxx*hh + logger.debug("GGA: " + str(result)) + if result.gps_qual is not None: + if waitforfix and result.gps_qual > 0: + self.timestamp = result.timestamp + self.latitude = result.latitude + self.longitude = result.longitude + self.lat = result.lat + self.lat_dir = result.lat_dir + self.lon = result.lon + self.lon_dir = result.lon_dir + self.gps_qual = result.gps_qual + self.num_sats = result.num_sats + self.horizontal_dil = result.horizontal_dil + self.altitude = float(result.altitude or 0.0) + self.altitude_units = result.altitude_units + self.geo_sep = float(result.geo_sep or 0.0) + self.geo_sep_units = result.geo_sep_units + self.age_gps_data = result.age_gps_data + self.ref_station_id = result.ref_station_id + if wait_for == "GGA": + return True + elif not waitforfix: + self.timestamp = result.timestamp + self.latitude = result.latitude + self.longitude = result.longitude + self.lat = result.lat + self.lat_dir = result.lat_dir + self.lon = result.lon + self.lon_dir = result.lon_dir + self.gps_qual = result.gps_qual + self.num_sats = result.num_sats + self.horizontal_dil = result.horizontal_dil + self.altitude = float(result.altitude or 0.0) + self.altitude_units = result.altitude_units + self.geo_sep = float(result.geo_sep or 0.0) + self.geo_sep_units = result.geo_sep_units + self.age_gps_data = result.age_gps_data + self.ref_station_id = result.ref_station_id + if wait_for == "GGA": + return True # Geographic Lat/Lon (Loran holdover) - elif type(result) == pynmea2.GLL: - pass + elif type(result) == pynmea2.GLL: #$--GLL,llll.ll,a,yyyyy.yy,a,hhmmss.ss,A*hh + logger.debug("GLL: " + str(result)) + if result.status is not None: + if waitforfix and result.status == 'A': + self.latitude = result.latitude + self.longitude = result.longitude + self.lat = result.lat + self.lat_dir = result.lat_dir + self.lon = result.lon + self.lon_dir = result.lon_dir + self.timestamp = result.timestamp + self.status = result.status + self.faa_mode = result.faa_mode + if wait_for == "GLL": + return True + if not waitforfix: + self.latitude = result.latitude + self.longitude = result.longitude + self.lat = result.lat + self.lat_dir = result.lat_dir + self.lon = result.lon + self.lon_dir = result.lon_dir + self.timestamp = result.timestamp + self.status = result.status + self.faa_mode = result.faa_mode + if wait_for == "GLL": + return True # GPS DOP and active satellites - elif type(result) == pynmea2.GSA: - self.mode_fix_type = result.mode_fix_type - self.pdop = result.pdop - self.hdop = result.hdop - self.vdop = result.vdop - if wait_for == "GSA": - return True + elif type(result) == pynmea2.GSA: #$--GSA,a,a,x,x,x,x,x,x,x,x,x,x,x,x,x,x,x.x,x.x,x.x*hh + logger.debug("GSA: " + str(result)) + #self.mode_fix_type = result.mode_fix_type + self.pdop = result.pdop + self.hdop = result.hdop + self.vdop = result.vdop + if wait_for == "GSA": + return True # Position, velocity and time - elif type(result) == pynmea2.RMC: - self.speed_over_ground = result.spd_over_grnd - if wait_for == "RMC": - return True + elif type(result) == pynmea2.RMC: #$--RMC,hhmmss.ss,A,llll.ll,a,yyyyy.yy,a,x.x,x.x,xxxx,x.x,a*hh + logger.debug("RMC: " + str(result)) + if result.status is not None: + if waitforfix and result.status == 'A': + self.timestamp = result.timestamp + self.status = result.status + self.latitude = result.latitude + self.longitude = result.longitude + self.lat = result.lat + self.lat_dir = result.lat_dir + self.lon = result.lon + self.lon_dir = result.lon_dir + self.speed_over_ground = result.spd_over_grnd + self.mag_variation = result.mag_variation + self.mag_var_dir = result.mag_var_dir + self.datetimestamp = result.datetime + if wait_for == "RMC": + return True + elif not waitforfix: + self.timestamp = result.timestamp + self.status = result.status + self.latitude = result.latitude + self.longitude = result.longitude + self.lat = result.lat + self.lat_dir = result.lat_dir + self.lon = result.lon + self.lon_dir = result.lon_dir + self.speed_over_ground = result.spd_over_grnd + self.mag_variation = result.mag_variation + self.mag_var_dir = result.mag_var_dir + self.datetimestamp = result.datetime + if wait_for == "RMC": + return True + + # Track made good and speed over ground - elif type(result) == pynmea2.VTG: - if wait_for == "VTG": - return True + elif type(result) == pynmea2.VTG: #$--VTG,x.x,T,x.x,M,x.x,N,x.x,K*hh + logger.debug("VTG: " + str(result)) + self.true_track = result.true_track + self.true_track_sym = result.true_track_sym + self.mag_track = result.mag_track + self.mag_track_sym = result.mag_track_sym + self.spd_over_grnd_kts = result.spd_over_grnd_kts + self.spd_over_grnd_kts_sym = result.spd_over_grnd_kts_sym + self.spd_over_grnd_kmph = result.spd_over_grnd_kmph + self.spd_over_grnd_kmph_sym = result.spd_over_grnd_kmph_sym + self.faa_mode = result.faa_mode + if wait_for == "VTG": + return True # SVs in view, PRN, elevation, azimuth and SNR - elif type(result) == pynmea2.GSV: + elif type(result) == pynmea2.GSV: #$--GSV,x,x,x,x,x,x,x,...*hh + logger.debug("GSV: " + str(result)) if wait_for == "GSV": return True + # ProprietarySentence handles boot up output such as "$PMTK011,MTKGPS*08" elif type(result) == pynmea2.ProprietarySentence: # TODO If we implement sending commands *to* the GPS, @@ -198,8 +341,9 @@ def update(self, wait_for="GGA", timeout=5): # $PMTK011,MTKGPS*08 Successful bootup # $PMTK010,001*2E Startup # $PMTK010,002*2D Wake from standby, normal operation - print(sentence) - return True + logger.debug("ProprietarySentence:" + str(result)) + if wait_for == "": + return True else: # If native MTK support exists, check for those message types @@ -210,7 +354,9 @@ def update(self, wait_for="GGA", timeout=5): pynmea2.types.proprietary.mtk.MTK011, pynmea2.types.proprietary.mtk.MTK010 ): - return True + print(sentence) + if wait_for == "": + return True except AttributeError: pass raise RuntimeError("Unsupported message type {type} ({sentence})".format(type=type(result), sentence=sentence)) @@ -219,18 +365,86 @@ def update(self, wait_for="GGA", timeout=5): if __name__ == "__main__": + logging.basicConfig(level=logging.DEBUG) gps = PA1010D() + #request firmware version + #gps.send_command(b'PMTK605') + + timeout=45 + waitforfix=False + + nema_type="GGA" + + These are NMEA extensions for PMTK_314_SET_NMEA_OUTPUT + # Turn off everything: + gps.send_command(b'PMTK314,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0') + + if nema_type=="GGA" or nema_type=="RMC" or nema_type=="VTG": + # Turn on the basic GGA, RMC and VTG info (what you typically want) + gps.send_command(b'PMTK314,0,1,1,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0') + # Turn on just minimum info (GLL only): + #gps.send_command(b'PMTK314,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0') + # Turn on just minimum info (RMC only): + # gps.send_command(b'PMTK314,0,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0') + # Turn on just minimum info (VTG only): + #gps.send_command(b'PMTK314,0,0,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0') + # Turn on just minimum info (GGA only): + # gps.send_command(b'PMTK314,0,0,0,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0') + # Turn on just minimum info (GSA only): + #gps.send_command(b'PMTK314,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0') + #Turn on just minimum info (GSV only): + #gps.send_command(b'PMTK314,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,0,0,0') + #Turn on everything + #gps.send_command(b'PMTK314,1,1,1,1,1,1,0,0,0,0,0,0,0,0,0,0,0,0,0') + + result=False + try: + while True: + try: + result = gps.update(nema_type, timeout, waitforfix) + except TimeoutError: + continue - while True: - result = gps.update() - if result: - print(f""" -Time: {gps.timestamp} -Longitude: {gps.longitude: .5f} {gps.lon_dir} -Latitude: {gps.latitude: .5f} {gps.lat_dir} -Altitude: {gps.altitude} -Geoid_Sep: {gps.geo_sep} -Geoid_Alt: {float(gps.altitude) + -float(gps.geo_sep)} -Used Sats: {gps.num_sats} -Quality: {gps.gps_qual}""") - time.sleep(1.0) + if result: + if nema_type == "RMC": + #logger.debug("Time: " + gps.timestamp.strftime("%H:%M:%S") + " DateTime: " + gps.datetimestamp.strftime("%d/%m/%Y %H:%M:%S") + " Status: " + gps.status + " Longitude: " + str(gps.longitude) + "long dir: " + gps.lon_dir + " Latitude: " + str(gps.latitude) + " latitude dir "+ gps.lat_dir + "Altitude: " + str(gps.altitude) + "Geoid_Sep: " + gps.geo_sep ) + #+ "Geoid_Alt: " + str(float(gps.altitude)) + str(float(gps.geo_sep))) + print(f""" + Time: {gps.timestamp} + DateTime: {gps.datetimestamp} + Status: {gps.status} + Longitude: {gps.lon} {gps.lon_dir} + Latitude: {gps.lat} {gps.lat_dir} + Longitude: {gps.longitude: .5f} + Latitude: {gps.latitude: .5f} + """) + if nema_type == "GGA": + #logger.debug("Time: " + gps.timestamp.strftime("%H:%M:%S") + " Longitude: " + str(gps.longitude) + "long dir: " + gps.lon_dir + " Latitude: " + str(gps.latitude) + " latitude dir "+ gps.lat_dir + " Altitude: " + str(gps.altitude) + " Geoid_Sep: " + str(gps.geo_sep) + "Geoid_Alt: " + str(float(gps.altitude) + -float(gps.geo_sep)) + " Horiz_dil: " + str(gps.horizontal_dil) + " Used Sats: " + str(gps.num_sats) + " Quality: " + str(gps.gps_qual)) + print(f""" + Time: {gps.timestamp} + Longitude: {gps.lon} {gps.lon_dir} + Latitude: {gps.lat} {gps.lat_dir} + Longitude: {gps.longitude: .5f} + Latitude: {gps.latitude: .5f} + Altitude: {gps.altitude} {gps.altitude_units} + Geoid_Sep: {gps.geo_sep} {gps.geo_sep_units} + Geoid_Alt: {(float(gps.altitude) + -float(gps.geo_sep)): .1f} + Horiz_dil: {gps.horizontal_dil} + Used Sats: {gps.num_sats} + Quality: {gps.gps_qual} + Aged: {gps.age_gps_data} + RefStation:{gps.ref_station_id} + """) + if nema_type == "VTG": + print(f""" + true track: {gps.true_track} {gps.true_track_sym} + magnetic track: {gps.mag_track} {gps.mag_track_sym} + Speed over ground + knots: {gps.spd_over_grnd_kts} {gps.spd_over_grnd_kts_sym} + kilometer per hour: {gps.spd_over_grnd_kmph} {gps.spd_over_grnd_kmph_sym} + faa_mode: {gps.faa_mode} + """) + + time.sleep(1.0) + except Exception as ex: + logger.exception("Unhandled Exception: " + str(ex)) From 76c47e1c784eac73af9a1294a26e9433a9cda634 Mon Sep 17 00:00:00 2001 From: elschnorro77 <57951003+elschnorro77@users.noreply.github.com> Date: Thu, 3 Feb 2022 16:36:16 +0100 Subject: [PATCH 2/4] add error handling --- library/pa1010d/__init__.py | 498 ++++-------------------------------- 1 file changed, 55 insertions(+), 443 deletions(-) diff --git a/library/pa1010d/__init__.py b/library/pa1010d/__init__.py index f6e58f6..94cc5ed 100644 --- a/library/pa1010d/__init__.py +++ b/library/pa1010d/__init__.py @@ -1,450 +1,62 @@ -import time -import smbus - - -import pynmea2 - -import logging -import inspect - -loggername='PA1010D' #+ inspect.getfile(inspect.currentframe()) -logger = logging.getLogger(loggername) - -__version__ = '0.0.3.1' - -PA1010D_ADDR = 0x10 - - -class PA1010D(): - __slots__ = ( - "timestamp", - "datestamp", - "datetimestamp", - "latitude", - "longitude", - "altitude", - "altitude_units", - "lat", - "lat_dir", - "lon", - "lon_dir", - "geo_sep", - "geo_sep_units", - "horizontal_dil", - "num_sats", - "gps_qual", - "status", - "faa_mode", - "true_track", - "true_track_sym", - "mag_track", - "mag_track_sym", - "spd_over_grnd_kts", - "spd_over_grnd_kts_sym", - "spd_over_grnd_kmph", - "spd_over_grnd_kmph_sym", - "speed_over_ground", - "mag_variation", - "mag_var_dir", - "mode_fix_type", - "pdop", - "hdop", - "vdop", - "age_gps_data", - "ref_station_id", - "_i2c_addr", - "_i2c", - "_debug" - ) - - - def __init__(self, i2c_addr=PA1010D_ADDR, debug=False): - self._i2c_addr = i2c_addr - self._i2c = smbus.SMBus(1) - - self._debug = debug - - self.timestamp = None - self.datetimestamp = None - self.latitude = None - self.longitude = None - self.altitude = None - self.altitude_units = None - self.num_sats = None - self.gps_qual = None - - self.status = None - self.faa_mode = None - self.true_track = None - self.true_track_sym = None - self.mag_track = None - self.mag_track_sym = None - self.spd_over_grnd_kts = None - self.spd_over_grnd_kts_sym = None - self.spd_over_grnd_kmph = None - self.spd_over_grnd_kmph_sym = None - self.horizontal_dil = None - self.lat = None - self.lat_dir = None - self.lon = None - self.lon_dir = None - self.geo_sep = None - self.geo_sep_units = None - - self.age_gps_data = None - self.ref_station_id = None - self.pdop = None - self.hdop = None - self.vdop = None - - self.speed_over_ground = None - self.mag_variation = None - self.mag_var_dir = None - self.mode_fix_type = None - - @property - def data(self): - return dict((slot, getattr(self, slot)) for slot in self.__slots__) - - def _write_sentence(self, bytestring): - """Write a sentence to the PA1010D device over i2c. - - We could- in theory- do this in one burst, but since smbus is limited to 32bytes, - we would have to chunk the message and this is already one byte chunks anyway! - - """ - for char_index in bytestring: - self._i2c.write_byte(self._i2c_addr, char_index) - - def send_command(self, command, add_checksum=True): - """Send a command string to the PA1010D. - - If add_checksum is True (the default) a NMEA checksum will automatically be computed and added. - - """ - # TODO replace with pynmea2 functionality - if command.startswith(b"$"): - command = command[1:] - if command.endswith(b"*"): - command = command[:-1] - - buf = bytearray() - buf += b'$' - buf += command - if add_checksum: - checksum = 0 - # bytes() is a real thing in Python 3 - # so `for char in commaud` iterates through char ordinals - for char in command: - checksum ^= char - buf += b'*' # Delimits checksum value - buf += "{checksum:02X}".format(checksum=checksum).encode("ascii") - buf += b'\r\n' - self._write_sentence(buf) - - def read_sentence(self, timeout=5): - """Attempt to read an NMEA sentence from the PA1010D.""" - buf = [] - timeout += time.time() - - while time.time() < timeout: - char = self._i2c.read_byte_data(self._i2c_addr, 0x00) - - if len(buf) == 0 and char != ord("$"): - continue +#!/usr/bin/env python3 +# -*- Coding: utf-8 -*- +# from https://github.com/ShuDiamonds/PCF8591/blob/master/PCF8591.py +###### pin assign +#PCF8591 --------- Raspberry pi3 +# SDA ------------ GPIO2/SDA1 +# SCL ------------ GPIO3/SCL1 +# VCC ------------- 3.3V +# GND ------------- GND +# +###### - buf += [char] - # Check for end of line - # Should be a full \r\n since the GPS emits spurious newlines - if buf[-2:] == [ord("\r"), ord("\n")]: - # Remove line ending and spurious newlines from the sentence - return bytearray(buf).decode("ascii").strip().replace("\n","") - raise TimeoutError("Timeout waiting for readline") - - def update(self, wait_for="GGA", timeout=5, waitforfix=False): - """Attempt to update from PA1010D. - - Returns true if a sentence has been successfully parsed. - - Returns false if an error has occured. - - Will wait 5 seconds for a GGA message by default. - - :param wait_for: Message type to wait for. - :param timeout: Wait timeout in seconds - :param waitforfix: Will return true only once GPS signal has fix - - """ - timeout += time.time() - - while time.time() < timeout: - try: - sentence = self.read_sentence() - except TimeoutError: - logger.debug("Timeout error") - continue - - try: - result = pynmea2.parse(sentence) - except pynmea2.nmea.ParseError: - if self._debug: - logger.debug("Parse error: {sentence}".format(sentence=sentence)) - continue - - # Time, position and fix - if type(result) == pynmea2.GGA: #$--GGA,hhmmss.ss,llll.ll,a,yyyyy.yy,a,x,xx,x.x,x.x,M,x.x,M,x.x,xxxx*hh - logger.debug("GGA: " + str(result)) - if result.gps_qual is not None: - if waitforfix and result.gps_qual > 0: - self.timestamp = result.timestamp - self.latitude = result.latitude - self.longitude = result.longitude - self.lat = result.lat - self.lat_dir = result.lat_dir - self.lon = result.lon - self.lon_dir = result.lon_dir - self.gps_qual = result.gps_qual - self.num_sats = result.num_sats - self.horizontal_dil = result.horizontal_dil - self.altitude = float(result.altitude or 0.0) - self.altitude_units = result.altitude_units - self.geo_sep = float(result.geo_sep or 0.0) - self.geo_sep_units = result.geo_sep_units - self.age_gps_data = result.age_gps_data - self.ref_station_id = result.ref_station_id - if wait_for == "GGA": - return True - elif not waitforfix: - self.timestamp = result.timestamp - self.latitude = result.latitude - self.longitude = result.longitude - self.lat = result.lat - self.lat_dir = result.lat_dir - self.lon = result.lon - self.lon_dir = result.lon_dir - self.gps_qual = result.gps_qual - self.num_sats = result.num_sats - self.horizontal_dil = result.horizontal_dil - self.altitude = float(result.altitude or 0.0) - self.altitude_units = result.altitude_units - self.geo_sep = float(result.geo_sep or 0.0) - self.geo_sep_units = result.geo_sep_units - self.age_gps_data = result.age_gps_data - self.ref_station_id = result.ref_station_id - if wait_for == "GGA": - return True - - # Geographic Lat/Lon (Loran holdover) - elif type(result) == pynmea2.GLL: #$--GLL,llll.ll,a,yyyyy.yy,a,hhmmss.ss,A*hh - logger.debug("GLL: " + str(result)) - if result.status is not None: - if waitforfix and result.status == 'A': - self.latitude = result.latitude - self.longitude = result.longitude - self.lat = result.lat - self.lat_dir = result.lat_dir - self.lon = result.lon - self.lon_dir = result.lon_dir - self.timestamp = result.timestamp - self.status = result.status - self.faa_mode = result.faa_mode - if wait_for == "GLL": - return True - if not waitforfix: - self.latitude = result.latitude - self.longitude = result.longitude - self.lat = result.lat - self.lat_dir = result.lat_dir - self.lon = result.lon - self.lon_dir = result.lon_dir - self.timestamp = result.timestamp - self.status = result.status - self.faa_mode = result.faa_mode - if wait_for == "GLL": - return True - - # GPS DOP and active satellites - elif type(result) == pynmea2.GSA: #$--GSA,a,a,x,x,x,x,x,x,x,x,x,x,x,x,x,x,x.x,x.x,x.x*hh - logger.debug("GSA: " + str(result)) - #self.mode_fix_type = result.mode_fix_type - self.pdop = result.pdop - self.hdop = result.hdop - self.vdop = result.vdop - if wait_for == "GSA": - return True - - # Position, velocity and time - elif type(result) == pynmea2.RMC: #$--RMC,hhmmss.ss,A,llll.ll,a,yyyyy.yy,a,x.x,x.x,xxxx,x.x,a*hh - logger.debug("RMC: " + str(result)) - if result.status is not None: - if waitforfix and result.status == 'A': - self.timestamp = result.timestamp - self.status = result.status - self.latitude = result.latitude - self.longitude = result.longitude - self.lat = result.lat - self.lat_dir = result.lat_dir - self.lon = result.lon - self.lon_dir = result.lon_dir - self.speed_over_ground = result.spd_over_grnd - self.mag_variation = result.mag_variation - self.mag_var_dir = result.mag_var_dir - self.datetimestamp = result.datetime - if wait_for == "RMC": - return True - elif not waitforfix: - self.timestamp = result.timestamp - self.status = result.status - self.latitude = result.latitude - self.longitude = result.longitude - self.lat = result.lat - self.lat_dir = result.lat_dir - self.lon = result.lon - self.lon_dir = result.lon_dir - self.speed_over_ground = result.spd_over_grnd - self.mag_variation = result.mag_variation - self.mag_var_dir = result.mag_var_dir - self.datetimestamp = result.datetime - if wait_for == "RMC": - return True - - - - # Track made good and speed over ground - elif type(result) == pynmea2.VTG: #$--VTG,x.x,T,x.x,M,x.x,N,x.x,K*hh - logger.debug("VTG: " + str(result)) - self.true_track = result.true_track - self.true_track_sym = result.true_track_sym - self.mag_track = result.mag_track - self.mag_track_sym = result.mag_track_sym - self.spd_over_grnd_kts = result.spd_over_grnd_kts - self.spd_over_grnd_kts_sym = result.spd_over_grnd_kts_sym - self.spd_over_grnd_kmph = result.spd_over_grnd_kmph - self.spd_over_grnd_kmph_sym = result.spd_over_grnd_kmph_sym - self.faa_mode = result.faa_mode - if wait_for == "VTG": - return True - - # SVs in view, PRN, elevation, azimuth and SNR - elif type(result) == pynmea2.GSV: #$--GSV,x,x,x,x,x,x,x,...*hh - logger.debug("GSV: " + str(result)) - if wait_for == "GSV": - return True - - - # ProprietarySentence handles boot up output such as "$PMTK011,MTKGPS*08" - elif type(result) == pynmea2.ProprietarySentence: - # TODO If we implement sending commands *to* the GPS, - # they should not be permitted until after receiving this sequence - # $PMTK011,MTKGPS*08 Successful bootup - # $PMTK010,001*2E Startup - # $PMTK010,002*2D Wake from standby, normal operation - logger.debug("ProprietarySentence:" + str(result)) - if wait_for == "": - return True - - else: - # If native MTK support exists, check for those message types - # requires merge and release of: https://github.com/Knio/pynmea2/pull/111 - # TODO Drop this special case when #111 is merged & released - try: - if type(result) in ( - pynmea2.types.proprietary.mtk.MTK011, - pynmea2.types.proprietary.mtk.MTK010 - ): - print(sentence) - if wait_for == "": - return True - except AttributeError: - pass - raise RuntimeError("Unsupported message type {type} ({sentence})".format(type=type(result), sentence=sentence)) - - raise TimeoutError("Timeout waiting for {wait_for} message.".format(wait_for=wait_for)) +import wiringpi +import time +class PCF8591: + def __init__(self, addr): + wiringpi.wiringPiSetup() #setup wiringpi + + self.i2c = wiringpi.I2C() #get I2C + self.dev = self.i2c.setup(addr) #setup I2C device + + def LED_ON(self): + self.i2c.writeReg8(self.dev, 0x40, 0xFF) + + def LED_OFF(self): + self.i2c.writeReg8(self.dev, 0x40, 0x00) + + def DAoutput(self,value): + self.i2c.writeReg8(self.dev, 0x40, value) + + def analogRead0(self): + self.i2c.writeReg8(self.dev, 0x48,0x40) + self.i2c.readReg8(self.dev,0x48) #read dummy + return self.i2c.readReg8(self.dev,0x48) + + def analogRead1(self): + self.i2c.writeReg8(self.dev, 0x48,0x41) + self.i2c.readReg8(self.dev,0x48) #read dummy + return self.i2c.readReg8(self.dev,0x48) + + def analogRead2(self): + self.i2c.writeReg8(self.dev, 0x48,0x42) + self.i2c.readReg8(self.dev,0x48) #read dummy + return self.i2c.readReg8(self.dev,0x48) + def analogRead3(self): + self.i2c.writeReg8(self.dev, 0x48,0x43) + self.i2c.readReg8(self.dev,0x48) #read dummy + return self.i2c.readReg8(self.dev,0x48) + def analogRead(self,pin): + self.i2c.writeReg8(self.dev, 0x48,0x40+pin) + self.i2c.readReg8(self.dev,0x48) #read dummy + return self.i2c.readReg8(self.dev,0x48) if __name__ == "__main__": - logging.basicConfig(level=logging.DEBUG) - gps = PA1010D() - #request firmware version - #gps.send_command(b'PMTK605') - - timeout=45 - waitforfix=False - - nema_type="GGA" - - These are NMEA extensions for PMTK_314_SET_NMEA_OUTPUT - # Turn off everything: - gps.send_command(b'PMTK314,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0') - - if nema_type=="GGA" or nema_type=="RMC" or nema_type=="VTG": - # Turn on the basic GGA, RMC and VTG info (what you typically want) - gps.send_command(b'PMTK314,0,1,1,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0') - # Turn on just minimum info (GLL only): - #gps.send_command(b'PMTK314,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0') - # Turn on just minimum info (RMC only): - # gps.send_command(b'PMTK314,0,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0') - # Turn on just minimum info (VTG only): - #gps.send_command(b'PMTK314,0,0,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0') - # Turn on just minimum info (GGA only): - # gps.send_command(b'PMTK314,0,0,0,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0') - # Turn on just minimum info (GSA only): - #gps.send_command(b'PMTK314,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0') - #Turn on just minimum info (GSV only): - #gps.send_command(b'PMTK314,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,0,0,0') - #Turn on everything - #gps.send_command(b'PMTK314,1,1,1,1,1,1,0,0,0,0,0,0,0,0,0,0,0,0,0') - - result=False - try: - while True: - try: - result = gps.update(nema_type, timeout, waitforfix) - except TimeoutError: - continue - - if result: - if nema_type == "RMC": - #logger.debug("Time: " + gps.timestamp.strftime("%H:%M:%S") + " DateTime: " + gps.datetimestamp.strftime("%d/%m/%Y %H:%M:%S") + " Status: " + gps.status + " Longitude: " + str(gps.longitude) + "long dir: " + gps.lon_dir + " Latitude: " + str(gps.latitude) + " latitude dir "+ gps.lat_dir + "Altitude: " + str(gps.altitude) + "Geoid_Sep: " + gps.geo_sep ) - #+ "Geoid_Alt: " + str(float(gps.altitude)) + str(float(gps.geo_sep))) - print(f""" - Time: {gps.timestamp} - DateTime: {gps.datetimestamp} - Status: {gps.status} - Longitude: {gps.lon} {gps.lon_dir} - Latitude: {gps.lat} {gps.lat_dir} - Longitude: {gps.longitude: .5f} - Latitude: {gps.latitude: .5f} - """) - if nema_type == "GGA": - #logger.debug("Time: " + gps.timestamp.strftime("%H:%M:%S") + " Longitude: " + str(gps.longitude) + "long dir: " + gps.lon_dir + " Latitude: " + str(gps.latitude) + " latitude dir "+ gps.lat_dir + " Altitude: " + str(gps.altitude) + " Geoid_Sep: " + str(gps.geo_sep) + "Geoid_Alt: " + str(float(gps.altitude) + -float(gps.geo_sep)) + " Horiz_dil: " + str(gps.horizontal_dil) + " Used Sats: " + str(gps.num_sats) + " Quality: " + str(gps.gps_qual)) - print(f""" - Time: {gps.timestamp} - Longitude: {gps.lon} {gps.lon_dir} - Latitude: {gps.lat} {gps.lat_dir} - Longitude: {gps.longitude: .5f} - Latitude: {gps.latitude: .5f} - Altitude: {gps.altitude} {gps.altitude_units} - Geoid_Sep: {gps.geo_sep} {gps.geo_sep_units} - Geoid_Alt: {(float(gps.altitude) + -float(gps.geo_sep)): .1f} - Horiz_dil: {gps.horizontal_dil} - Used Sats: {gps.num_sats} - Quality: {gps.gps_qual} - Aged: {gps.age_gps_data} - RefStation:{gps.ref_station_id} - """) - if nema_type == "VTG": - print(f""" - true track: {gps.true_track} {gps.true_track_sym} - magnetic track: {gps.mag_track} {gps.mag_track_sym} - Speed over ground - knots: {gps.spd_over_grnd_kts} {gps.spd_over_grnd_kts_sym} - kilometer per hour: {gps.spd_over_grnd_kmph} {gps.spd_over_grnd_kmph_sym} - faa_mode: {gps.faa_mode} - """) + pcf8591 = PCF8591(0x48) + while 1: + + print(pcf8591.analogRead0()) #read the Variable register - time.sleep(1.0) - except Exception as ex: - logger.exception("Unhandled Exception: " + str(ex)) From ee26798baf15e2ba7295f12acc0687758d55990d Mon Sep 17 00:00:00 2001 From: elschnorro77 <57951003+elschnorro77@users.noreply.github.com> Date: Thu, 3 Feb 2022 16:39:23 +0100 Subject: [PATCH 3/4] Revert "add error handling" This reverts commit 76c47e1c784eac73af9a1294a26e9433a9cda634. --- library/pa1010d/__init__.py | 498 ++++++++++++++++++++++++++++++++---- 1 file changed, 443 insertions(+), 55 deletions(-) diff --git a/library/pa1010d/__init__.py b/library/pa1010d/__init__.py index 94cc5ed..f6e58f6 100644 --- a/library/pa1010d/__init__.py +++ b/library/pa1010d/__init__.py @@ -1,62 +1,450 @@ -#!/usr/bin/env python3 -# -*- Coding: utf-8 -*- -# from https://github.com/ShuDiamonds/PCF8591/blob/master/PCF8591.py -###### pin assign -#PCF8591 --------- Raspberry pi3 -# SDA ------------ GPIO2/SDA1 -# SCL ------------ GPIO3/SCL1 -# VCC ------------- 3.3V -# GND ------------- GND -# -###### +import time +import smbus +import pynmea2 -import wiringpi -import time +import logging +import inspect + +loggername='PA1010D' #+ inspect.getfile(inspect.currentframe()) +logger = logging.getLogger(loggername) + +__version__ = '0.0.3.1' + +PA1010D_ADDR = 0x10 + + +class PA1010D(): + __slots__ = ( + "timestamp", + "datestamp", + "datetimestamp", + "latitude", + "longitude", + "altitude", + "altitude_units", + "lat", + "lat_dir", + "lon", + "lon_dir", + "geo_sep", + "geo_sep_units", + "horizontal_dil", + "num_sats", + "gps_qual", + "status", + "faa_mode", + "true_track", + "true_track_sym", + "mag_track", + "mag_track_sym", + "spd_over_grnd_kts", + "spd_over_grnd_kts_sym", + "spd_over_grnd_kmph", + "spd_over_grnd_kmph_sym", + "speed_over_ground", + "mag_variation", + "mag_var_dir", + "mode_fix_type", + "pdop", + "hdop", + "vdop", + "age_gps_data", + "ref_station_id", + "_i2c_addr", + "_i2c", + "_debug" + ) + + + def __init__(self, i2c_addr=PA1010D_ADDR, debug=False): + self._i2c_addr = i2c_addr + self._i2c = smbus.SMBus(1) + + self._debug = debug + + self.timestamp = None + self.datetimestamp = None + self.latitude = None + self.longitude = None + self.altitude = None + self.altitude_units = None + self.num_sats = None + self.gps_qual = None + + self.status = None + self.faa_mode = None + self.true_track = None + self.true_track_sym = None + self.mag_track = None + self.mag_track_sym = None + self.spd_over_grnd_kts = None + self.spd_over_grnd_kts_sym = None + self.spd_over_grnd_kmph = None + self.spd_over_grnd_kmph_sym = None + self.horizontal_dil = None + self.lat = None + self.lat_dir = None + self.lon = None + self.lon_dir = None + self.geo_sep = None + self.geo_sep_units = None + + self.age_gps_data = None + self.ref_station_id = None + self.pdop = None + self.hdop = None + self.vdop = None + + self.speed_over_ground = None + self.mag_variation = None + self.mag_var_dir = None + self.mode_fix_type = None + + @property + def data(self): + return dict((slot, getattr(self, slot)) for slot in self.__slots__) + + def _write_sentence(self, bytestring): + """Write a sentence to the PA1010D device over i2c. + + We could- in theory- do this in one burst, but since smbus is limited to 32bytes, + we would have to chunk the message and this is already one byte chunks anyway! + + """ + for char_index in bytestring: + self._i2c.write_byte(self._i2c_addr, char_index) + + def send_command(self, command, add_checksum=True): + """Send a command string to the PA1010D. + + If add_checksum is True (the default) a NMEA checksum will automatically be computed and added. + + """ + # TODO replace with pynmea2 functionality + if command.startswith(b"$"): + command = command[1:] + if command.endswith(b"*"): + command = command[:-1] + + buf = bytearray() + buf += b'$' + buf += command + if add_checksum: + checksum = 0 + # bytes() is a real thing in Python 3 + # so `for char in commaud` iterates through char ordinals + for char in command: + checksum ^= char + buf += b'*' # Delimits checksum value + buf += "{checksum:02X}".format(checksum=checksum).encode("ascii") + buf += b'\r\n' + self._write_sentence(buf) + + def read_sentence(self, timeout=5): + """Attempt to read an NMEA sentence from the PA1010D.""" + buf = [] + timeout += time.time() + + while time.time() < timeout: + char = self._i2c.read_byte_data(self._i2c_addr, 0x00) + + if len(buf) == 0 and char != ord("$"): + continue + + buf += [char] + + # Check for end of line + # Should be a full \r\n since the GPS emits spurious newlines + if buf[-2:] == [ord("\r"), ord("\n")]: + # Remove line ending and spurious newlines from the sentence + return bytearray(buf).decode("ascii").strip().replace("\n","") + + raise TimeoutError("Timeout waiting for readline") + + def update(self, wait_for="GGA", timeout=5, waitforfix=False): + """Attempt to update from PA1010D. + + Returns true if a sentence has been successfully parsed. + + Returns false if an error has occured. + + Will wait 5 seconds for a GGA message by default. + + :param wait_for: Message type to wait for. + :param timeout: Wait timeout in seconds + :param waitforfix: Will return true only once GPS signal has fix + + """ + timeout += time.time() + + while time.time() < timeout: + try: + sentence = self.read_sentence() + except TimeoutError: + logger.debug("Timeout error") + continue + + try: + result = pynmea2.parse(sentence) + except pynmea2.nmea.ParseError: + if self._debug: + logger.debug("Parse error: {sentence}".format(sentence=sentence)) + continue + + # Time, position and fix + if type(result) == pynmea2.GGA: #$--GGA,hhmmss.ss,llll.ll,a,yyyyy.yy,a,x,xx,x.x,x.x,M,x.x,M,x.x,xxxx*hh + logger.debug("GGA: " + str(result)) + if result.gps_qual is not None: + if waitforfix and result.gps_qual > 0: + self.timestamp = result.timestamp + self.latitude = result.latitude + self.longitude = result.longitude + self.lat = result.lat + self.lat_dir = result.lat_dir + self.lon = result.lon + self.lon_dir = result.lon_dir + self.gps_qual = result.gps_qual + self.num_sats = result.num_sats + self.horizontal_dil = result.horizontal_dil + self.altitude = float(result.altitude or 0.0) + self.altitude_units = result.altitude_units + self.geo_sep = float(result.geo_sep or 0.0) + self.geo_sep_units = result.geo_sep_units + self.age_gps_data = result.age_gps_data + self.ref_station_id = result.ref_station_id + if wait_for == "GGA": + return True + elif not waitforfix: + self.timestamp = result.timestamp + self.latitude = result.latitude + self.longitude = result.longitude + self.lat = result.lat + self.lat_dir = result.lat_dir + self.lon = result.lon + self.lon_dir = result.lon_dir + self.gps_qual = result.gps_qual + self.num_sats = result.num_sats + self.horizontal_dil = result.horizontal_dil + self.altitude = float(result.altitude or 0.0) + self.altitude_units = result.altitude_units + self.geo_sep = float(result.geo_sep or 0.0) + self.geo_sep_units = result.geo_sep_units + self.age_gps_data = result.age_gps_data + self.ref_station_id = result.ref_station_id + if wait_for == "GGA": + return True + + # Geographic Lat/Lon (Loran holdover) + elif type(result) == pynmea2.GLL: #$--GLL,llll.ll,a,yyyyy.yy,a,hhmmss.ss,A*hh + logger.debug("GLL: " + str(result)) + if result.status is not None: + if waitforfix and result.status == 'A': + self.latitude = result.latitude + self.longitude = result.longitude + self.lat = result.lat + self.lat_dir = result.lat_dir + self.lon = result.lon + self.lon_dir = result.lon_dir + self.timestamp = result.timestamp + self.status = result.status + self.faa_mode = result.faa_mode + if wait_for == "GLL": + return True + if not waitforfix: + self.latitude = result.latitude + self.longitude = result.longitude + self.lat = result.lat + self.lat_dir = result.lat_dir + self.lon = result.lon + self.lon_dir = result.lon_dir + self.timestamp = result.timestamp + self.status = result.status + self.faa_mode = result.faa_mode + if wait_for == "GLL": + return True + + # GPS DOP and active satellites + elif type(result) == pynmea2.GSA: #$--GSA,a,a,x,x,x,x,x,x,x,x,x,x,x,x,x,x,x.x,x.x,x.x*hh + logger.debug("GSA: " + str(result)) + #self.mode_fix_type = result.mode_fix_type + self.pdop = result.pdop + self.hdop = result.hdop + self.vdop = result.vdop + if wait_for == "GSA": + return True + + # Position, velocity and time + elif type(result) == pynmea2.RMC: #$--RMC,hhmmss.ss,A,llll.ll,a,yyyyy.yy,a,x.x,x.x,xxxx,x.x,a*hh + logger.debug("RMC: " + str(result)) + if result.status is not None: + if waitforfix and result.status == 'A': + self.timestamp = result.timestamp + self.status = result.status + self.latitude = result.latitude + self.longitude = result.longitude + self.lat = result.lat + self.lat_dir = result.lat_dir + self.lon = result.lon + self.lon_dir = result.lon_dir + self.speed_over_ground = result.spd_over_grnd + self.mag_variation = result.mag_variation + self.mag_var_dir = result.mag_var_dir + self.datetimestamp = result.datetime + if wait_for == "RMC": + return True + elif not waitforfix: + self.timestamp = result.timestamp + self.status = result.status + self.latitude = result.latitude + self.longitude = result.longitude + self.lat = result.lat + self.lat_dir = result.lat_dir + self.lon = result.lon + self.lon_dir = result.lon_dir + self.speed_over_ground = result.spd_over_grnd + self.mag_variation = result.mag_variation + self.mag_var_dir = result.mag_var_dir + self.datetimestamp = result.datetime + if wait_for == "RMC": + return True + + + + # Track made good and speed over ground + elif type(result) == pynmea2.VTG: #$--VTG,x.x,T,x.x,M,x.x,N,x.x,K*hh + logger.debug("VTG: " + str(result)) + self.true_track = result.true_track + self.true_track_sym = result.true_track_sym + self.mag_track = result.mag_track + self.mag_track_sym = result.mag_track_sym + self.spd_over_grnd_kts = result.spd_over_grnd_kts + self.spd_over_grnd_kts_sym = result.spd_over_grnd_kts_sym + self.spd_over_grnd_kmph = result.spd_over_grnd_kmph + self.spd_over_grnd_kmph_sym = result.spd_over_grnd_kmph_sym + self.faa_mode = result.faa_mode + if wait_for == "VTG": + return True + + # SVs in view, PRN, elevation, azimuth and SNR + elif type(result) == pynmea2.GSV: #$--GSV,x,x,x,x,x,x,x,...*hh + logger.debug("GSV: " + str(result)) + if wait_for == "GSV": + return True + + + # ProprietarySentence handles boot up output such as "$PMTK011,MTKGPS*08" + elif type(result) == pynmea2.ProprietarySentence: + # TODO If we implement sending commands *to* the GPS, + # they should not be permitted until after receiving this sequence + # $PMTK011,MTKGPS*08 Successful bootup + # $PMTK010,001*2E Startup + # $PMTK010,002*2D Wake from standby, normal operation + logger.debug("ProprietarySentence:" + str(result)) + if wait_for == "": + return True + + else: + # If native MTK support exists, check for those message types + # requires merge and release of: https://github.com/Knio/pynmea2/pull/111 + # TODO Drop this special case when #111 is merged & released + try: + if type(result) in ( + pynmea2.types.proprietary.mtk.MTK011, + pynmea2.types.proprietary.mtk.MTK010 + ): + print(sentence) + if wait_for == "": + return True + except AttributeError: + pass + raise RuntimeError("Unsupported message type {type} ({sentence})".format(type=type(result), sentence=sentence)) + + raise TimeoutError("Timeout waiting for {wait_for} message.".format(wait_for=wait_for)) -class PCF8591: - def __init__(self, addr): - wiringpi.wiringPiSetup() #setup wiringpi - - self.i2c = wiringpi.I2C() #get I2C - self.dev = self.i2c.setup(addr) #setup I2C device - - def LED_ON(self): - self.i2c.writeReg8(self.dev, 0x40, 0xFF) - - def LED_OFF(self): - self.i2c.writeReg8(self.dev, 0x40, 0x00) - - def DAoutput(self,value): - self.i2c.writeReg8(self.dev, 0x40, value) - - def analogRead0(self): - self.i2c.writeReg8(self.dev, 0x48,0x40) - self.i2c.readReg8(self.dev,0x48) #read dummy - return self.i2c.readReg8(self.dev,0x48) - - def analogRead1(self): - self.i2c.writeReg8(self.dev, 0x48,0x41) - self.i2c.readReg8(self.dev,0x48) #read dummy - return self.i2c.readReg8(self.dev,0x48) - - def analogRead2(self): - self.i2c.writeReg8(self.dev, 0x48,0x42) - self.i2c.readReg8(self.dev,0x48) #read dummy - return self.i2c.readReg8(self.dev,0x48) - def analogRead3(self): - self.i2c.writeReg8(self.dev, 0x48,0x43) - self.i2c.readReg8(self.dev,0x48) #read dummy - return self.i2c.readReg8(self.dev,0x48) - def analogRead(self,pin): - self.i2c.writeReg8(self.dev, 0x48,0x40+pin) - self.i2c.readReg8(self.dev,0x48) #read dummy - return self.i2c.readReg8(self.dev,0x48) if __name__ == "__main__": - pcf8591 = PCF8591(0x48) - while 1: - - print(pcf8591.analogRead0()) #read the Variable register + logging.basicConfig(level=logging.DEBUG) + gps = PA1010D() + #request firmware version + #gps.send_command(b'PMTK605') + + timeout=45 + waitforfix=False + + nema_type="GGA" + + These are NMEA extensions for PMTK_314_SET_NMEA_OUTPUT + # Turn off everything: + gps.send_command(b'PMTK314,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0') + + if nema_type=="GGA" or nema_type=="RMC" or nema_type=="VTG": + # Turn on the basic GGA, RMC and VTG info (what you typically want) + gps.send_command(b'PMTK314,0,1,1,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0') + # Turn on just minimum info (GLL only): + #gps.send_command(b'PMTK314,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0') + # Turn on just minimum info (RMC only): + # gps.send_command(b'PMTK314,0,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0') + # Turn on just minimum info (VTG only): + #gps.send_command(b'PMTK314,0,0,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0') + # Turn on just minimum info (GGA only): + # gps.send_command(b'PMTK314,0,0,0,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0') + # Turn on just minimum info (GSA only): + #gps.send_command(b'PMTK314,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0') + #Turn on just minimum info (GSV only): + #gps.send_command(b'PMTK314,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,0,0,0') + #Turn on everything + #gps.send_command(b'PMTK314,1,1,1,1,1,1,0,0,0,0,0,0,0,0,0,0,0,0,0') + + result=False + try: + while True: + try: + result = gps.update(nema_type, timeout, waitforfix) + except TimeoutError: + continue + + if result: + if nema_type == "RMC": + #logger.debug("Time: " + gps.timestamp.strftime("%H:%M:%S") + " DateTime: " + gps.datetimestamp.strftime("%d/%m/%Y %H:%M:%S") + " Status: " + gps.status + " Longitude: " + str(gps.longitude) + "long dir: " + gps.lon_dir + " Latitude: " + str(gps.latitude) + " latitude dir "+ gps.lat_dir + "Altitude: " + str(gps.altitude) + "Geoid_Sep: " + gps.geo_sep ) + #+ "Geoid_Alt: " + str(float(gps.altitude)) + str(float(gps.geo_sep))) + print(f""" + Time: {gps.timestamp} + DateTime: {gps.datetimestamp} + Status: {gps.status} + Longitude: {gps.lon} {gps.lon_dir} + Latitude: {gps.lat} {gps.lat_dir} + Longitude: {gps.longitude: .5f} + Latitude: {gps.latitude: .5f} + """) + if nema_type == "GGA": + #logger.debug("Time: " + gps.timestamp.strftime("%H:%M:%S") + " Longitude: " + str(gps.longitude) + "long dir: " + gps.lon_dir + " Latitude: " + str(gps.latitude) + " latitude dir "+ gps.lat_dir + " Altitude: " + str(gps.altitude) + " Geoid_Sep: " + str(gps.geo_sep) + "Geoid_Alt: " + str(float(gps.altitude) + -float(gps.geo_sep)) + " Horiz_dil: " + str(gps.horizontal_dil) + " Used Sats: " + str(gps.num_sats) + " Quality: " + str(gps.gps_qual)) + print(f""" + Time: {gps.timestamp} + Longitude: {gps.lon} {gps.lon_dir} + Latitude: {gps.lat} {gps.lat_dir} + Longitude: {gps.longitude: .5f} + Latitude: {gps.latitude: .5f} + Altitude: {gps.altitude} {gps.altitude_units} + Geoid_Sep: {gps.geo_sep} {gps.geo_sep_units} + Geoid_Alt: {(float(gps.altitude) + -float(gps.geo_sep)): .1f} + Horiz_dil: {gps.horizontal_dil} + Used Sats: {gps.num_sats} + Quality: {gps.gps_qual} + Aged: {gps.age_gps_data} + RefStation:{gps.ref_station_id} + """) + if nema_type == "VTG": + print(f""" + true track: {gps.true_track} {gps.true_track_sym} + magnetic track: {gps.mag_track} {gps.mag_track_sym} + Speed over ground + knots: {gps.spd_over_grnd_kts} {gps.spd_over_grnd_kts_sym} + kilometer per hour: {gps.spd_over_grnd_kmph} {gps.spd_over_grnd_kmph_sym} + faa_mode: {gps.faa_mode} + """) + time.sleep(1.0) + except Exception as ex: + logger.exception("Unhandled Exception: " + str(ex)) From 1000a90a1dcd0857515cea1e281ae969db776179 Mon Sep 17 00:00:00 2001 From: elschnorro77 <57951003+elschnorro77@users.noreply.github.com> Date: Thu, 3 Feb 2022 16:41:56 +0100 Subject: [PATCH 4/4] add error handling --- library/pa1010d/__init__.py | 536 +++++++++++++++++++----------------- 1 file changed, 284 insertions(+), 252 deletions(-) diff --git a/library/pa1010d/__init__.py b/library/pa1010d/__init__.py index f6e58f6..9b53bb1 100644 --- a/library/pa1010d/__init__.py +++ b/library/pa1010d/__init__.py @@ -72,7 +72,6 @@ def __init__(self, i2c_addr=PA1010D_ADDR, debug=False): self.altitude_units = None self.num_sats = None self.gps_qual = None - self.status = None self.faa_mode = None self.true_track = None @@ -90,7 +89,6 @@ def __init__(self, i2c_addr=PA1010D_ADDR, debug=False): self.lon_dir = None self.geo_sep = None self.geo_sep_units = None - self.age_gps_data = None self.ref_station_id = None self.pdop = None @@ -103,6 +101,7 @@ def __init__(self, i2c_addr=PA1010D_ADDR, debug=False): self.mode_fix_type = None @property + def data(self): return dict((slot, getattr(self, slot)) for slot in self.__slots__) @@ -113,8 +112,13 @@ def _write_sentence(self, bytestring): we would have to chunk the message and this is already one byte chunks anyway! """ - for char_index in bytestring: - self._i2c.write_byte(self._i2c_addr, char_index) + try: + for char_index in bytestring: + self._i2c.write_byte(self._i2c_addr, char_index) + except IOError as ex: + raise IOError(str(ex)) + except Exception as ex: + logger.exception("Unhandled Exception :" + str(ex)) def send_command(self, command, add_checksum=True): """Send a command string to the PA1010D. @@ -122,46 +126,58 @@ def send_command(self, command, add_checksum=True): If add_checksum is True (the default) a NMEA checksum will automatically be computed and added. """ + try: # TODO replace with pynmea2 functionality - if command.startswith(b"$"): - command = command[1:] - if command.endswith(b"*"): - command = command[:-1] - - buf = bytearray() - buf += b'$' - buf += command - if add_checksum: - checksum = 0 - # bytes() is a real thing in Python 3 - # so `for char in commaud` iterates through char ordinals - for char in command: - checksum ^= char - buf += b'*' # Delimits checksum value - buf += "{checksum:02X}".format(checksum=checksum).encode("ascii") - buf += b'\r\n' - self._write_sentence(buf) + if command.startswith(b"$"): + command = command[1:] + if command.endswith(b"*"): + command = command[:-1] + + buf = bytearray() + buf += b'$' + buf += command + if add_checksum: + checksum = 0 + # bytes() is a real thing in Python 3 + # so `for char in commaud` iterates through char ordinals + for char in command: + checksum ^= char + buf += b'*' # Delimits checksum value + buf += "{checksum:02X}".format(checksum=checksum).encode("ascii") + buf += b'\r\n' + self._write_sentence(buf) + except IOError as ex: + raise IOError(str(ex)) + except Exception as ex: + logger.exception("Unhandled Exception :" + str(ex)) def read_sentence(self, timeout=5): """Attempt to read an NMEA sentence from the PA1010D.""" - buf = [] - timeout += time.time() + try: + buf = [] + timeout += time.time() - while time.time() < timeout: - char = self._i2c.read_byte_data(self._i2c_addr, 0x00) + while time.time() < timeout: + char = self._i2c.read_byte_data(self._i2c_addr, 0x00) - if len(buf) == 0 and char != ord("$"): - continue + if len(buf) == 0 and char != ord("$"): + continue - buf += [char] + buf += [char] - # Check for end of line - # Should be a full \r\n since the GPS emits spurious newlines - if buf[-2:] == [ord("\r"), ord("\n")]: - # Remove line ending and spurious newlines from the sentence - return bytearray(buf).decode("ascii").strip().replace("\n","") + # Check for end of line + # Should be a full \r\n since the GPS emits spurious newlines + if buf[-2:] == [ord("\r"), ord("\n")]: + # Remove line ending and spurious newlines from the sentence + return bytearray(buf).decode("ascii").strip().replace("\n","") + + raise TimeoutError("Timeout waiting for readline") + + except IOError as ex: + raise IOError(str(ex)) + except Exception as ex: + logger.exception("Unhandled Exception :" + str(ex)) - raise TimeoutError("Timeout waiting for readline") def update(self, wait_for="GGA", timeout=5, waitforfix=False): """Attempt to update from PA1010D. @@ -177,234 +193,246 @@ def update(self, wait_for="GGA", timeout=5, waitforfix=False): :param waitforfix: Will return true only once GPS signal has fix """ - timeout += time.time() - - while time.time() < timeout: - try: - sentence = self.read_sentence() - except TimeoutError: - logger.debug("Timeout error") - continue + try: + timeout += time.time() - try: - result = pynmea2.parse(sentence) - except pynmea2.nmea.ParseError: - if self._debug: - logger.debug("Parse error: {sentence}".format(sentence=sentence)) - continue - - # Time, position and fix - if type(result) == pynmea2.GGA: #$--GGA,hhmmss.ss,llll.ll,a,yyyyy.yy,a,x,xx,x.x,x.x,M,x.x,M,x.x,xxxx*hh - logger.debug("GGA: " + str(result)) - if result.gps_qual is not None: - if waitforfix and result.gps_qual > 0: - self.timestamp = result.timestamp - self.latitude = result.latitude - self.longitude = result.longitude - self.lat = result.lat - self.lat_dir = result.lat_dir - self.lon = result.lon - self.lon_dir = result.lon_dir - self.gps_qual = result.gps_qual - self.num_sats = result.num_sats - self.horizontal_dil = result.horizontal_dil - self.altitude = float(result.altitude or 0.0) - self.altitude_units = result.altitude_units - self.geo_sep = float(result.geo_sep or 0.0) - self.geo_sep_units = result.geo_sep_units - self.age_gps_data = result.age_gps_data - self.ref_station_id = result.ref_station_id - if wait_for == "GGA": - return True - elif not waitforfix: - self.timestamp = result.timestamp - self.latitude = result.latitude - self.longitude = result.longitude - self.lat = result.lat - self.lat_dir = result.lat_dir - self.lon = result.lon - self.lon_dir = result.lon_dir - self.gps_qual = result.gps_qual - self.num_sats = result.num_sats - self.horizontal_dil = result.horizontal_dil - self.altitude = float(result.altitude or 0.0) - self.altitude_units = result.altitude_units - self.geo_sep = float(result.geo_sep or 0.0) - self.geo_sep_units = result.geo_sep_units - self.age_gps_data = result.age_gps_data - self.ref_station_id = result.ref_station_id - if wait_for == "GGA": - return True - - # Geographic Lat/Lon (Loran holdover) - elif type(result) == pynmea2.GLL: #$--GLL,llll.ll,a,yyyyy.yy,a,hhmmss.ss,A*hh - logger.debug("GLL: " + str(result)) - if result.status is not None: - if waitforfix and result.status == 'A': - self.latitude = result.latitude - self.longitude = result.longitude - self.lat = result.lat - self.lat_dir = result.lat_dir - self.lon = result.lon - self.lon_dir = result.lon_dir - self.timestamp = result.timestamp - self.status = result.status - self.faa_mode = result.faa_mode - if wait_for == "GLL": - return True - if not waitforfix: - self.latitude = result.latitude - self.longitude = result.longitude - self.lat = result.lat - self.lat_dir = result.lat_dir - self.lon = result.lon - self.lon_dir = result.lon_dir - self.timestamp = result.timestamp - self.status = result.status - self.faa_mode = result.faa_mode - if wait_for == "GLL": - return True - - # GPS DOP and active satellites - elif type(result) == pynmea2.GSA: #$--GSA,a,a,x,x,x,x,x,x,x,x,x,x,x,x,x,x,x.x,x.x,x.x*hh - logger.debug("GSA: " + str(result)) - #self.mode_fix_type = result.mode_fix_type - self.pdop = result.pdop - self.hdop = result.hdop - self.vdop = result.vdop - if wait_for == "GSA": - return True - - # Position, velocity and time - elif type(result) == pynmea2.RMC: #$--RMC,hhmmss.ss,A,llll.ll,a,yyyyy.yy,a,x.x,x.x,xxxx,x.x,a*hh - logger.debug("RMC: " + str(result)) - if result.status is not None: - if waitforfix and result.status == 'A': - self.timestamp = result.timestamp - self.status = result.status - self.latitude = result.latitude - self.longitude = result.longitude - self.lat = result.lat - self.lat_dir = result.lat_dir - self.lon = result.lon - self.lon_dir = result.lon_dir - self.speed_over_ground = result.spd_over_grnd - self.mag_variation = result.mag_variation - self.mag_var_dir = result.mag_var_dir - self.datetimestamp = result.datetime - if wait_for == "RMC": - return True - elif not waitforfix: - self.timestamp = result.timestamp - self.status = result.status - self.latitude = result.latitude - self.longitude = result.longitude - self.lat = result.lat - self.lat_dir = result.lat_dir - self.lon = result.lon - self.lon_dir = result.lon_dir - self.speed_over_ground = result.spd_over_grnd - self.mag_variation = result.mag_variation - self.mag_var_dir = result.mag_var_dir - self.datetimestamp = result.datetime - if wait_for == "RMC": - return True - - - - # Track made good and speed over ground - elif type(result) == pynmea2.VTG: #$--VTG,x.x,T,x.x,M,x.x,N,x.x,K*hh - logger.debug("VTG: " + str(result)) - self.true_track = result.true_track - self.true_track_sym = result.true_track_sym - self.mag_track = result.mag_track - self.mag_track_sym = result.mag_track_sym - self.spd_over_grnd_kts = result.spd_over_grnd_kts - self.spd_over_grnd_kts_sym = result.spd_over_grnd_kts_sym - self.spd_over_grnd_kmph = result.spd_over_grnd_kmph - self.spd_over_grnd_kmph_sym = result.spd_over_grnd_kmph_sym - self.faa_mode = result.faa_mode - if wait_for == "VTG": - return True - - # SVs in view, PRN, elevation, azimuth and SNR - elif type(result) == pynmea2.GSV: #$--GSV,x,x,x,x,x,x,x,...*hh - logger.debug("GSV: " + str(result)) - if wait_for == "GSV": - return True - - - # ProprietarySentence handles boot up output such as "$PMTK011,MTKGPS*08" - elif type(result) == pynmea2.ProprietarySentence: - # TODO If we implement sending commands *to* the GPS, - # they should not be permitted until after receiving this sequence - # $PMTK011,MTKGPS*08 Successful bootup - # $PMTK010,001*2E Startup - # $PMTK010,002*2D Wake from standby, normal operation - logger.debug("ProprietarySentence:" + str(result)) - if wait_for == "": - return True - - else: - # If native MTK support exists, check for those message types - # requires merge and release of: https://github.com/Knio/pynmea2/pull/111 - # TODO Drop this special case when #111 is merged & released + while time.time() < timeout: try: - if type(result) in ( - pynmea2.types.proprietary.mtk.MTK011, - pynmea2.types.proprietary.mtk.MTK010 - ): - print(sentence) - if wait_for == "": - return True - except AttributeError: - pass - raise RuntimeError("Unsupported message type {type} ({sentence})".format(type=type(result), sentence=sentence)) + sentence = self.read_sentence() + except TimeoutError: + logger.debug("Timeout error") + continue - raise TimeoutError("Timeout waiting for {wait_for} message.".format(wait_for=wait_for)) + try: + result = pynmea2.parse(sentence) + except pynmea2.nmea.ParseError: + if self._debug: + logger.debug("Parse error: {sentence}".format(sentence=sentence)) + continue + + # Time, position and fix + if type(result) == pynmea2.GGA: #$--GGA,hhmmss.ss,llll.ll,a,yyyyy.yy,a,x,xx,x.x,x.x,M,x.x,M,x.x,xxxx*hh + logger.debug("GGA: " + str(result)) + if result.gps_qual is not None: + if waitforfix and result.gps_qual > 0: + self.timestamp = result.timestamp + self.latitude = result.latitude + self.longitude = result.longitude + self.lat = result.lat + self.lat_dir = result.lat_dir + self.lon = result.lon + self.lon_dir = result.lon_dir + self.gps_qual = result.gps_qual + self.num_sats = result.num_sats + self.horizontal_dil = result.horizontal_dil + self.altitude = float(result.altitude or 0.0) + self.altitude_units = result.altitude_units + self.geo_sep = float(result.geo_sep or 0.0) + self.geo_sep_units = result.geo_sep_units + self.age_gps_data = result.age_gps_data + self.ref_station_id = result.ref_station_id + if wait_for == "GGA": + return True + elif not waitforfix: + self.timestamp = result.timestamp + self.latitude = result.latitude + self.longitude = result.longitude + self.lat = result.lat + self.lat_dir = result.lat_dir + self.lon = result.lon + self.lon_dir = result.lon_dir + self.gps_qual = result.gps_qual + self.num_sats = result.num_sats + self.horizontal_dil = result.horizontal_dil + self.altitude = float(result.altitude or 0.0) + self.altitude_units = result.altitude_units + self.geo_sep = float(result.geo_sep or 0.0) + self.geo_sep_units = result.geo_sep_units + self.age_gps_data = result.age_gps_data + self.ref_station_id = result.ref_station_id + if wait_for == "GGA": + return True + + # Geographic Lat/Lon (Loran holdover) + elif type(result) == pynmea2.GLL: #$--GLL,llll.ll,a,yyyyy.yy,a,hhmmss.ss,A*hh + logger.debug("GLL: " + str(result)) + if result.status is not None: + if waitforfix and result.status == 'A': + self.latitude = result.latitude + self.longitude = result.longitude + self.lat = result.lat + self.lat_dir = result.lat_dir + self.lon = result.lon + self.lon_dir = result.lon_dir + self.timestamp = result.timestamp + self.status = result.status + self.faa_mode = result.faa_mode + if wait_for == "GLL": + return True + if not waitforfix: + self.latitude = result.latitude + self.longitude = result.longitude + self.lat = result.lat + self.lat_dir = result.lat_dir + self.lon = result.lon + self.lon_dir = result.lon_dir + self.timestamp = result.timestamp + self.status = result.status + self.faa_mode = result.faa_mode + if wait_for == "GLL": + return True + + # GPS DOP and active satellites + elif type(result) == pynmea2.GSA: #$--GSA,a,a,x,x,x,x,x,x,x,x,x,x,x,x,x,x,x.x,x.x,x.x*hh + logger.debug("GSA: " + str(result)) + #self.mode_fix_type = result.mode_fix_type + self.pdop = result.pdop + self.hdop = result.hdop + self.vdop = result.vdop + if wait_for == "GSA": + return True + + # Position, velocity and time + elif type(result) == pynmea2.RMC: #$--RMC,hhmmss.ss,A,llll.ll,a,yyyyy.yy,a,x.x,x.x,xxxx,x.x,a*hh + logger.debug("RMC: " + str(result)) + if result.status is not None: + if waitforfix and result.status == 'A': + self.timestamp = result.timestamp + self.status = result.status + self.latitude = result.latitude + self.longitude = result.longitude + self.lat = result.lat + self.lat_dir = result.lat_dir + self.lon = result.lon + self.lon_dir = result.lon_dir + self.speed_over_ground = result.spd_over_grnd + self.mag_variation = result.mag_variation + self.mag_var_dir = result.mag_var_dir + self.datetimestamp = result.datetime + if wait_for == "RMC": + return True + elif not waitforfix: + self.timestamp = result.timestamp + self.status = result.status + self.latitude = result.latitude + self.longitude = result.longitude + self.lat = result.lat + self.lat_dir = result.lat_dir + self.lon = result.lon + self.lon_dir = result.lon_dir + self.speed_over_ground = result.spd_over_grnd + self.mag_variation = result.mag_variation + self.mag_var_dir = result.mag_var_dir + self.datetimestamp = result.datetime + if wait_for == "RMC": + return True + + + + # Track made good and speed over ground + elif type(result) == pynmea2.VTG: #$--VTG,x.x,T,x.x,M,x.x,N,x.x,K*hh + logger.debug("VTG: " + str(result)) + self.true_track = result.true_track + self.true_track_sym = result.true_track_sym + self.mag_track = result.mag_track + self.mag_track_sym = result.mag_track_sym + self.spd_over_grnd_kts = result.spd_over_grnd_kts + self.spd_over_grnd_kts_sym = result.spd_over_grnd_kts_sym + self.spd_over_grnd_kmph = result.spd_over_grnd_kmph + self.spd_over_grnd_kmph_sym = result.spd_over_grnd_kmph_sym + self.faa_mode = result.faa_mode + if wait_for == "VTG": + return True + + # SVs in view, PRN, elevation, azimuth and SNR + elif type(result) == pynmea2.GSV: #$--GSV,x,x,x,x,x,x,x,...*hh + logger.debug("GSV: " + str(result)) + if wait_for == "GSV": + return True + + + # ProprietarySentence handles boot up output such as "$PMTK011,MTKGPS*08" + elif type(result) == pynmea2.ProprietarySentence: + # TODO If we implement sending commands *to* the GPS, + # they should not be permitted until after receiving this sequence + # $PMTK011,MTKGPS*08 Successful bootup + # $PMTK010,001*2E Startup + # $PMTK010,002*2D Wake from standby, normal operation + logger.debug("ProprietarySentence:" + str(result)) + if wait_for == "": + return True + + else: + # If native MTK support exists, check for those message types + # requires merge and release of: https://github.com/Knio/pynmea2/pull/111 + # TODO Drop this special case when #111 is merged & released + try: + if type(result) in ( + pynmea2.types.proprietary.mtk.MTK011, + pynmea2.types.proprietary.mtk.MTK010 + ): + print(sentence) + if wait_for == "": + return True + except AttributeError: + pass + raise RuntimeError("Unsupported message type {type} ({sentence})".format(type=type(result), sentence=sentence)) + + raise TimeoutError("Timeout waiting for {wait_for} message.".format(wait_for=wait_for)) + except TimeoutError as ex: + raise TimeoutError(str(ex)) + except IOError as ex: + raise IOError(str(ex)) + except Exception as ex: + logger.exception("Unhandled Exception :" + str(ex)) if __name__ == "__main__": - logging.basicConfig(level=logging.DEBUG) - gps = PA1010D() - #request firmware version - #gps.send_command(b'PMTK605') - - timeout=45 - waitforfix=False - - nema_type="GGA" - - These are NMEA extensions for PMTK_314_SET_NMEA_OUTPUT - # Turn off everything: - gps.send_command(b'PMTK314,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0') - - if nema_type=="GGA" or nema_type=="RMC" or nema_type=="VTG": - # Turn on the basic GGA, RMC and VTG info (what you typically want) - gps.send_command(b'PMTK314,0,1,1,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0') - # Turn on just minimum info (GLL only): - #gps.send_command(b'PMTK314,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0') - # Turn on just minimum info (RMC only): - # gps.send_command(b'PMTK314,0,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0') - # Turn on just minimum info (VTG only): - #gps.send_command(b'PMTK314,0,0,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0') - # Turn on just minimum info (GGA only): - # gps.send_command(b'PMTK314,0,0,0,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0') - # Turn on just minimum info (GSA only): - #gps.send_command(b'PMTK314,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0') - #Turn on just minimum info (GSV only): - #gps.send_command(b'PMTK314,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,0,0,0') - #Turn on everything - #gps.send_command(b'PMTK314,1,1,1,1,1,1,0,0,0,0,0,0,0,0,0,0,0,0,0') - - result=False try: + logging.basicConfig(level=logging.DEBUG) + gps = PA1010D() + # request firmware version + gps.send_command(b'PMTK605') + + timeout=1 + waitforfix=True + + nema_type="GGA" + + + # These are NMEA extensions for PMTK_314_SET_NMEA_OUTPUT + # Turn off everything: + gps.send_command(b'PMTK314,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0') + + if nema_type=="GGA" or nema_type=="RMC" or nema_type=="VTG": + # Turn on the basic GGA, RMC and VTG info (what you typically want) + gps.send_command(b'PMTK314,0,1,1,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0') + # Turn on just minimum info (GLL only): + #gps.send_command(b'PMTK314,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0') + # Turn on just minimum info (RMC only): + # gps.send_command(b'PMTK314,0,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0') + # Turn on just minimum info (VTG only): + #gps.send_command(b'PMTK314,0,0,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0') + # Turn on just minimum info (GGA only): + # gps.send_command(b'PMTK314,0,0,0,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0') + # Turn on just minimum info (GSA only): + #gps.send_command(b'PMTK314,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0') + #Turn on just minimum info (GSV only): + #gps.send_command(b'PMTK314,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,0,0,0') + #Turn on everything + #gps.send_command(b'PMTK314,1,1,1,1,1,1,0,0,0,0,0,0,0,0,0,0,0,0,0') + + + result=False while True: try: result = gps.update(nema_type, timeout, waitforfix) - except TimeoutError: + except TimeoutError as ex: + logger.debug(f"{ex}") continue - + except IOError as ex: + logger.error("Could not access PA1010D on I2C bus") + break if result: if nema_type == "RMC": #logger.debug("Time: " + gps.timestamp.strftime("%H:%M:%S") + " DateTime: " + gps.datetimestamp.strftime("%d/%m/%Y %H:%M:%S") + " Status: " + gps.status + " Longitude: " + str(gps.longitude) + "long dir: " + gps.lon_dir + " Latitude: " + str(gps.latitude) + " latitude dir "+ gps.lat_dir + "Altitude: " + str(gps.altitude) + "Geoid_Sep: " + gps.geo_sep ) @@ -446,5 +474,9 @@ def update(self, wait_for="GGA", timeout=5, waitforfix=False): """) time.sleep(1.0) + except IOError as ex: + logger.error("Could not access PA1010D on I2C bus") + except (KeyboardInterrupt, SystemExit): + exit except Exception as ex: logger.exception("Unhandled Exception: " + str(ex))