-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathmain.py
102 lines (83 loc) · 3.42 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
import machine
import utime as time
import MQ2_data
import ubinascii
import ujson
import network
from umqtt.simple import MQTTClient
global client_id
def connect_and_subscribe(mqtt_server, user, password):
client_id = ubinascii.hexlify(machine.unique_id())
client = MQTTClient(client_id, server=mqtt_server, user=user, password=password,port=1883,keepalive=30)
client.connect()
print(
"Connected to %s MQTT broker, subscribed to %s topic"
% (settings["mqtt_server"], settings["mqtt_topic"])
)
return client
def mqttrestart():
print("Broker connection failure.")
machine.reset()
def get_data(MQ2, Ro):
CH4 = MQ2_data.MQGetGasPercentage(MQ2_data.MQRead(MQ2) / Ro, MQ2_data.GAS_CH4)
Propane = MQ2_data.MQGetGasPercentage(
MQ2_data.MQRead(MQ2) / Ro, MQ2_data.GAS_PROPANE
)
Smoke = MQ2_data.MQGetGasPercentage(MQ2_data.MQRead(MQ2) / Ro, MQ2_data.GAS_SMOKE)
CO = MQ2_data.MQGetGasPercentage(MQ2_data.MQRead(MQ2) / Ro, MQ2_data.GAS_CO)
LPG = MQ2_data.MQGetGasPercentage(MQ2_data.MQRead(MQ2) / Ro, MQ2_data.GAS_LPG)
message = {"CH4": CH4, "Propane": Propane, "Smoke": Smoke, "CO": CO, "LPG": LPG}
return message
if __name__ == "__main__":
try:
with open("settings.json", "r") as file:
settings = file.read()
settings = ujson.loads(settings)
print("Initiating...")
print("Calibrating...")
mq2pinSignal = 34
MQ2 = machine.ADC(machine.Pin(mq2pinSignal)) # define the analog input by sensor
MQ2.atten(machine.ADC.ATTN_11DB)
Ro = MQ2_data.MQCalibration(MQ2)
print("Calibration is completed...")
print("Ro={} kohm".format(Ro))
station = network.WLAN(network.STA_IF)
station.active(True)
if not station.isconnected():
station.connect(settings["ssid"], settings["password_wlan"]) # Connect to an AP
while not station.isconnected():
time.sleep(0.1)
print("Connected to the wlan network")
print(station.ifconfig())
try:
print("Connecting to mqttserver")
client = connect_and_subscribe(
settings["mqtt_server"],
settings["mqtt_user"],
settings["mqtt_password"],
)
except Exception as e:
print("An error has occurred: {}".format(e))
print("Connection successful")
while True:
message = get_data(MQ2, Ro)
message['ID'] = settings["id_device"]
message = ujson.dumps(message)
print(message)
try:
client.publish(settings["mqtt_topic"], message.encode("utf-8"))
except:
try:
print("Connecting to mqttserver")
client = connect_and_subscribe(settings["mqtt_server"],
settings["mqtt_user"],
settings["mqtt_password"]
)
except Exception as e:
print("An error has occurred: {}".format(e))
mqttrestart()
pass
time.sleep(5)
except KeyboardInterrupt as e:
print(e)
print("Flashmode...")