Skip to content

Commit

Permalink
Merge branch 'main' of https://github.com/jokob-sk/NetAlertX
Browse files Browse the repository at this point in the history
  • Loading branch information
jokob-sk committed Sep 27, 2024
2 parents 7308797 + 6e36f7d commit e8f3530
Showing 1 changed file with 44 additions and 8 deletions.
52 changes: 44 additions & 8 deletions front/plugins/_publisher_mqtt/mqtt.py
Original file line number Diff line number Diff line change
Expand Up @@ -273,29 +273,54 @@ def create_sensor(mqtt_client, deviceId, deviceName, sensorType, sensorName, ico
#-------------------------------------------------------------------------------
def mqtt_create_client():

# attempt reconnections on failure, ref https://www.emqx.com/en/blog/how-to-use-mqtt-in-python
FIRST_RECONNECT_DELAY = 1
RECONNECT_RATE = 2
MAX_RECONNECT_COUNT = 12
MAX_RECONNECT_DELAY = 60

mytransport = 'tcp' # or 'websockets'

def on_disconnect(mqtt_client, userdata, reason_code):
def on_disconnect(mqtt_client, userdata, rc):

global mqtt_connected_to_broker

# REF: If we wanted a auto reconnect, a good source is here: https://www.emqx.com/en/blog/how-to-use-mqtt-in-python
mylog('debug', [f"[{pluginName}] Connection terminated, reason_code: {rc}"])
reconnect_count, reconnect_delay = 0, FIRST_RECONNECT_DELAY
while reconnect_count < MAX_RECONNECT_COUNT:
mylog('debug', [f"[{pluginName}] Reconnecting in {reconnect_delay} seconds..."])
time.sleep(reconnect_delay)

try:
mqtt_client.reconnect()
mqtt_connected_to_broker = True # Signal connection
mylog('debug', [f"[{pluginName}] Reconnected successfully"])
return
except Exception as err:
mylog('debug', [f"[{pluginName}] {err} Reconnect failed. Retrying..."])
pass

reconnect_delay *= RECONNECT_RATE
reconnect_delay = min(reconnect_delay, MAX_RECONNECT_DELAY)
reconnect_count += 1

mqtt_connected_to_broker = False
mylog('debug', [f"[{pluginName}] Connection terminated, reason_code: {reason_code}"])


def on_connect(mqtt_client, userdata, flags, reason_code, properties):
def on_connect(mqtt_client, userdata, flags, rc, properties):

global mqtt_connected_to_broker

# REF: Good docu on reason codes: https://www.emqx.com/en/blog/mqtt5-new-features-reason-code-and-ack
if reason_code == 0:
if rc == 0:
mylog('verbose', [f"[{pluginName}] Connected to broker"])
mqtt_connected_to_broker = True # Signal connection
else:
mylog('verbose', [f"[{pluginName}] Connection failed, reason_code: {reason_code}"])
mylog('verbose', [f"[{pluginName}] Connection failed, reason_code: {rc}"])
mqtt_connected_to_broker = False

global mqtt_client
global mqtt_connected_to_broker

# Paho will be soon not supporting V1 anymore, so this really should not be a user choice to start with
# This code now uses V2 by default
Expand All @@ -306,18 +331,29 @@ def on_connect(mqtt_client, userdata, flags, reason_code, properties):
else:
version = mqtt.MQTTv5

# we now hardcode the client id into here.
# TODO: Add config ffor client id
mqtt_client = mqtt.Client(
client_id='netalertx',
callback_api_version = mqtt.CallbackAPIVersion.VERSION2,
transport=mytransport,
protocol=mqtt.MQTTv5)
protocol=version)
mqtt_client.on_connect = on_connect
mqtt_client.on_disconnect = on_disconnect

if get_setting_value('MQTT_TLS'):
mqtt_client.tls_set()

mqtt_client.username_pw_set(username = get_setting_value('MQTT_USER'), password = get_setting_value('MQTT_PASSWORD'))
mqtt_client.connect(host = get_setting_value('MQTT_BROKER'), port = get_setting_value('MQTT_PORT'))
err_code = mqtt_client.connect(host = get_setting_value('MQTT_BROKER'), port = get_setting_value('MQTT_PORT'))
if (err_code == mqtt.MQTT_ERR_SUCCESS):
# We (prematurely) set the connection state to connected
# the callback may be delayed
mqtt_connected_to_broker = True
# the client connects but connect callbacks will be called async and there may be a waiting time
# Mosquitto works straight away
# EMQX has a delay and does not update in loop below, so we cannot rely on it, we wait 1 sec
time.sleep(1)
mqtt_client.loop_start()

return mqtt_client
Expand Down

0 comments on commit e8f3530

Please sign in to comment.