-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathclient_UDP.py
54 lines (44 loc) · 1.54 KB
/
client_UDP.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
########## Importazione moduli ##########
import configparser
import socket
import sys
########## Parametri ##########
config = configparser.ConfigParser()
config.read('config.ini')
port = config['RASPBERRY']['port']
addr_server = config['RASPBERRY']['address'] # Cambiare con l'indirizzo IPv4 del proprio gateway (RB/PC)
server = (addr_server, port)
########## Oggetto per il test ##########
sensor_string = '''{
"measurement":
{
"temperature": "12.5 C",
"umidity": "63 %"
}
}'''
########## Gestione del Client TCP ##########
def client_send(s, server):
while True:
cmd = input("-> s = invia pacchetto al Server.\n-> q = chiudi il socket.\n\t-> ")
if cmd == "q":
print("Sto chiudendo il socket...")
s.close()
print(f"Socket chiuso.\n")
sys.exit()
elif cmd == "s":
s.sendto(sensor_string.encode(), server)
print("Inviato pacchetto al Server.\n")
else:
print("Il comando digitato è invalido.\n")
########## Creazione & Connessione Client UDP ##########
def new_client_connect(server):
try:
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
print(f"Connessessione al Server {addr_server} effettuata.\n")
except socket.error as err:
print(f"Qualcosa è andato storto, sto uscendo...\n{err}.\n")
sys.exit()
client_send(s, server)
########## Programma ##########
if __name__ == '__main__':
new_client_connect(server)