-
Notifications
You must be signed in to change notification settings - Fork 10
/
Copy pathSerial_monitor.py
105 lines (81 loc) · 2.97 KB
/
Serial_monitor.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
#!/usr/bin/python3.7
#-------------------------------------------------------------------------------
# Name: serial_monitor.py
# Purpose: Serial port monitoring on a RaspberryPi
#
# Author: paulv
#
# Created: 20-07-2018
# Copyright: (c) paulv 2018 2019
# Licence: <your licence>
#-------------------------------------------------------------------------------
# sudo apt-get install python3-serial
import serial
import logging
import logging.handlers
import sys
import os
import traceback
# To enable the serial port on the GPIO connector, use raspi-config or:
# sudo nano /boot/config.txt
# enable_uart=1
# reboot
# When True, extra trace messages are printed at various points in the script.
DEBUG = False
# Serial device that is monitored (the UART enabled via config.txt above).
port = "/dev/ttyAMA0"
# The port is opened as soon as this module is loaded: 9600 baud with
# timeout=10.0 (presumably seconds, per pyserial's read timeout - confirm).
serialPort = serial.Serial(port, baudrate=9600, timeout=10.0)
# data path is on a USB stick to protect the SD card
data_path = "/mnt/usb/"
# -- Logger definitions
# The log file name is built by plain string concatenation, so data_path
# must keep its trailing slash.
LOG_FILENAME = data_path+"ocxo.log"
LOG_LEVEL = logging.INFO # Could be e.g. "INFO", "DEBUG", "ERROR" or "WARNING"
class MyLogger(object):
    """
    Minimal file-like adapter that forwards written text to a logger.

    Replace stdout and stderr with logging to a file so we can run this script
    even as a daemon and still capture all the stdout and stderr messages in the log.
    """

    def __init__(self, logger, level):
        """Needs a logger and a logger level."""
        self.logger = logger
        self.level = level

    def write(self, message):
        """Log *message* at the configured level, skipping blank writes.

        Trailing whitespace (including a cr/lf line ending) is stripped
        before the text is handed to the logger.
        """
        # Only log if there is a message (not just a new line)
        # typical for serial data with a cr/lf ending
        text = message.rstrip()
        if text != "":
            self.logger.log(self.level, text)

    def flush(self):
        """Do nothing; present so this object can stand in for a stream.

        Code that treats sys.stdout / sys.stderr as a file (for example
        print(..., flush=True)) calls flush(). Nothing is buffered here,
        because write() passes each message straight to the logger.
        """
def init():
    """
    Configure file logging and redirect stdout/stderr into it.

    Creates the module logger at LOG_LEVEL, attaches a file handler on
    LOG_FILENAME that rotates at midnight and keeps 31 backups, and then
    replaces sys.stdout and sys.stderr with MyLogger wrappers (INFO and
    ERROR respectively). The logger and handler are published as globals.
    """
    global logger, handler

    if DEBUG:
        print("Setting up the logger functionality")

    logger = logging.getLogger(__name__)
    logger.setLevel(LOG_LEVEL)

    # One log file per day, rotated at midnight.
    handler = logging.handlers.TimedRotatingFileHandler(
        LOG_FILENAME,
        when="midnight",
        backupCount=31,
    )
    handler.setFormatter(
        logging.Formatter('%(asctime)s %(levelname)-8s %(message)s')
    )
    logger.addHandler(handler)

    # From here on, everything printed or written to stderr lands in the log.
    sys.stdout = MyLogger(logger, logging.INFO)
    sys.stderr = MyLogger(logger, logging.ERROR)
def main():
    """
    Log every line received on the serial port until the script is stopped.

    Calls init() first, so each print() below ends up in the log file rather
    than on the console. The loop then polls the port: while bytes are
    waiting, it reads one line at a time, decodes it as UTF-8 and prints it.
    Read errors and undecodable lines are skipped (best effort). When nothing
    is waiting the loop sleeps briefly instead of spinning, so an idle port
    does not keep a CPU core busy.

    Ctrl-C and any unexpected exception both terminate the process through
    os._exit(1); an unexpected exception is written to the log with its
    traceback first.
    """
    import time  # local import: only needed for the idle back-off below

    # Seconds to pause between polls while the receive buffer is empty.
    idle_delay = 0.05

    if DEBUG:
        print("Serial logger")
    init()
    if DEBUG:
        print("Opened port", port, "for serial tracing")

    try:
        while True:
            while serialPort.inWaiting() > 0:
                try:
                    ser_input = serialPort.readline().decode('utf-8')
                    print(ser_input)
                except (OSError, serial.serialutil.SerialException):
                    # Deliberately ignored: keep monitoring after a failed read.
                    if DEBUG:
                        print("No data available")
                except UnicodeDecodeError:
                    # Deliberately ignored: drop lines that are not valid UTF-8.
                    if DEBUG:
                        print("decode error")
            # Nothing pending: yield the CPU before polling again.
            time.sleep(idle_delay)
    except KeyboardInterrupt:  # Ctrl-C
        print("\nCtrl-C - Terminated")
        os._exit(1)
    except Exception as e:
        sys.stderr.write("Got exception: %s" % (e))
        print(traceback.format_exc())
        os._exit(1)
# Run the monitor only when executed as a script, not when imported.
if __name__ == "__main__":
    main()