-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathgateway_TCP.py
106 lines (86 loc) · 2.93 KB
/
gateway_TCP.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
########## Module imports ##########
import socket
import threading
import time
from collections import deque

import paho.mqtt.client as mqtt
########## Parameters ##########
# TCP port the gateway server binds to.
port = 20000
addr = "192.168.1.57" # Change to the IPv4 address of your own gateway (RB/PC)
# (host, port) pair handed to initialize(), which passes it to socket.bind().
gateway = (addr, port)
# Address of the MQTT broker the publisher connects to.
addr_broker = "10.108.230.1"
# MQTT topic on which every received message is published.
topic = "test/topic"
########## FIFO queue definition ##########
class Queue:
    """FIFO queue shared between the TCP receiver threads and the publisher.

    Items are stored in a ``collections.deque``: new items enter on the left
    and the oldest item leaves from the right. ``elem`` and ``head`` are
    derived from the deque itself, so they can never drift out of sync with
    the stored items (a separate counter could, e.g. after a failed
    ``outQueue`` on an empty queue or under concurrent ``inQueue`` calls).

    Attributes:
        items: The underlying deque (newest item first, oldest item last).
        elem: Read-only number of queued items.
        head: Read-only oldest queued item, or ``None`` when empty.
    """

    def __init__(self):
        # Newest item sits at index 0, oldest at index -1.
        self.items = deque()

    @property
    def elem(self):
        """Return the number of items currently queued."""
        return len(self.items)

    @property
    def head(self):
        """Return the item the next ``outQueue`` would yield, or ``None``."""
        try:
            return self.items[-1]
        except IndexError:
            return None

    def notEmpty(self):
        """Return True when at least one item is queued."""
        return bool(self.items)

    def inQueue(self, item):
        """Append ``item`` at the tail of the queue."""
        self.items.appendleft(item)

    def outQueue(self):
        """Remove and return the oldest item.

        Raises:
            IndexError: If the queue is empty; the queue state is unchanged.
        """
        return self.items.pop()
########## on_connect() ##########
def on_connect(pub, userdata, flags, rc):
    """Connect callback: print the result code of the broker connection.

    Args:
        pub: Client instance the callback is attached to (unused).
        userdata: User data supplied to the callback (unused).
        flags: Flags supplied to the callback (unused).
        rc: Connection result code; the printed message notes 0 as success.
    """
    report = f"Risultato connessione al broker con indirizzo IPv4 {addr_broker} (0 -> successo): [{rc}]\n"
    print(report)
def on_disconnect(pub, userdata, rc):
    """Disconnect callback: print whether the disconnection was expected.

    Args:
        pub: Client instance the callback is attached to (unused).
        userdata: User data supplied to the callback (unused).
        rc: Disconnection result code; any non-zero value is reported as
            an unexpected disconnection.
    """
    # Guard clause: handle the abnormal case first, then the clean one.
    if rc != 0:
        print(f"Disconnessione inaspettata: codice [{rc}].\n")
        return
    print(f"Disconnessione dal broker con indirizzo IPv4 {addr_broker}.\n")
########## MQTT publishing ##########
def queue2broker(pub, q, poll_interval=0.01):
    """Forward queued messages to the MQTT broker until interrupted.

    Starts the client's network loop, then keeps taking the oldest message
    from ``q`` and publishing it on the module-level ``topic``.

    Args:
        pub: MQTT client providing ``loop_start``, ``publish`` and
            ``loop_stop``.
        q: Queue providing ``notEmpty()`` and ``outQueue()``.
        poll_interval: Seconds to sleep when the queue is empty, so the
            thread does not spin at full CPU while idle.
    """
    pub.loop_start()
    try:
        while True:
            if q.notEmpty():
                pub.publish(topic, q.outQueue())
            else:
                # Nothing to send: yield the CPU instead of busy-waiting.
                time.sleep(poll_interval)
    finally:
        # Reached only when the loop is broken by an exception; makes sure
        # the network loop started above is always stopped.
        pub.loop_stop()
########## Reception from TCP client ##########
def client2queue(s_client, addr_client, q):
    """Read data from one TCP client and enqueue it until the client leaves.

    Each received chunk is decoded, prefixed with the client address and
    pushed onto ``q``. The loop ends when the peer closes the connection
    (``recv`` returns ``b''``) or when receiving fails. The socket is always
    closed on exit.

    Args:
        s_client: Connected client socket.
        addr_client: Client address, used only in messages and the prefix.
        q: Queue providing ``inQueue(item)``.
    """
    try:
        while True:
            try:
                msg = s_client.recv(4096)
            except OSError as err:
                # E.g. connection reset by peer: stop serving this client
                # instead of letting the thread die with a traceback.
                print(f"Errore di ricezione dal Client {addr_client}: {err}.\n")
                break
            if not msg:
                print(f"Il Client {addr_client} si è disconnesso.\nChiudo il socket: {s_client}.\n")
                break
            # Undecodable bytes are replaced rather than raising, so one bad
            # payload cannot kill the receiver thread.
            data = f"Client: {addr_client}" + "\n" + msg.decode(errors="replace")
            q.inQueue(data)
    finally:
        # Release the socket on every exit path, including unexpected errors.
        s_client.close()
########## TCP client acceptance ##########
def gateway_threads(s, pub, q):
    """Start the publisher thread, then accept TCP clients forever.

    One thread runs ``queue2broker`` for the whole lifetime of the gateway;
    every accepted client gets its own thread running ``client2queue``.

    Args:
        s: Listening server socket.
        pub: MQTT client handed to the publisher thread.
        q: Queue shared by the publisher and all receiver threads.
    """
    publisher = threading.Thread(target=queue2broker, args=(pub, q))
    publisher.start()
    while True:
        # Blocks until a new client connects.
        s_client, addr_client = s.accept()
        print(f"Connessione con Client TCP {addr_client} stabilita.\n")
        receiver = threading.Thread(
            target=client2queue,
            args=(s_client, addr_client, q),
        )
        receiver.start()
########## Gateway creation ##########
def initialize(gateway, retry_delay=1.0):
    """Set up the TCP server and the MQTT publisher, then run the gateway.

    The TCP server setup is retried in a loop until it succeeds. Each failed
    attempt closes its socket and waits ``retry_delay`` seconds, so a
    persistent error neither leaks sockets nor recurses without bound.

    Args:
        gateway: ``(host, port)`` pair the TCP server binds to.
        retry_delay: Seconds to wait between TCP server setup attempts.
    """
    # TCP server
    while True:
        s = None
        try:
            s = socket.socket()
            s.bind(gateway)
            s.listen(0)
        except socket.error as err:
            # Drop the half-initialised socket before trying again.
            if s is not None:
                s.close()
            print(f"Qualcosa è andato storto...\n{err}.\n")
            print("Sto tentando di reinizializzare il Server TCP...\n")
            time.sleep(retry_delay)
        else:
            print("Server TCP inizializzato. In ascolto...\n")
            break
    # MQTT publisher
    pub = mqtt.Client(client_id="", clean_session=True, userdata=None)
    pub.on_connect = on_connect
    pub.on_disconnect = on_disconnect
    pub.connect(addr_broker, 1883, 10)
    # Start the gateway (does not return: accepts clients forever)
    q = Queue()
    gateway_threads(s, pub, q)
########## Program entry point ##########
if __name__ == "__main__":
    # Launch the gateway only when run as a script, not when imported.
    initialize(gateway)