diff --git a/.gitignore b/.gitignore deleted file mode 100644 index 7bbc71c..0000000 --- a/.gitignore +++ /dev/null @@ -1,101 +0,0 @@ -# Byte-compiled / optimized / DLL files -__pycache__/ -*.py[cod] -*$py.class - -# C extensions -*.so - -# Distribution / packaging -.Python -env/ -build/ -develop-eggs/ -dist/ -downloads/ -eggs/ -.eggs/ -lib/ -lib64/ -parts/ -sdist/ -var/ -wheels/ -*.egg-info/ -.installed.cfg -*.egg - -# PyInstaller -# Usually these files are written by a python script from a template -# before PyInstaller builds the exe, so as to inject date/other infos into it. -*.manifest -*.spec - -# Installer logs -pip-log.txt -pip-delete-this-directory.txt - -# Unit test / coverage reports -htmlcov/ -.tox/ -.coverage -.coverage.* -.cache -nosetests.xml -coverage.xml -*.cover -.hypothesis/ - -# Translations -*.mo -*.pot - -# Django stuff: -*.log -local_settings.py - -# Flask stuff: -instance/ -.webassets-cache - -# Scrapy stuff: -.scrapy - -# Sphinx documentation -docs/_build/ - -# PyBuilder -target/ - -# Jupyter Notebook -.ipynb_checkpoints - -# pyenv -.python-version - -# celery beat schedule file -celerybeat-schedule - -# SageMath parsed files -*.sage.py - -# dotenv -.env - -# virtualenv -.venv -venv/ -ENV/ - -# Spyder project settings -.spyderproject -.spyproject - -# Rope project settings -.ropeproject - -# mkdocs documentation -/site - -# mypy -.mypy_cache/ diff --git a/README.md b/README.md index 3e69e3c..af57c11 100644 --- a/README.md +++ b/README.md @@ -1,3 +1,40 @@ +# Update: This library can now only be used with the new uasyncio version that got merged into the main micropython repository! + +# NOTE: +With the latest commits in the original repository by Peter Hinch, this module is now USEABLE AS A DROP-IN REPLACEMENT! Only change needed is how the MQTTClient is created. Look [here](./README_mqtt_as.md#23-example-usage) for an example, it is simple, the pythonic way. + +# Changes to base repo of Peter Hinch + +1. Sorted files and made a structure so you know which file belongs where without reading the documentation every time +2. Made repo a module to be used like +*from micropython_mqtt_as.mqtt_as import MQTTClient +from micropython_mqtt_as.config import config* +making it possible to just clone the repo and copy it to `espXXXX/modules` also reducing file clutter in this directory. +3. ~~Removed unnecessary workarounds of official ESP32 port for ESP32 loboris fork (Feel free to report issues).~~ implemented upstream +4. Changed MQTTClient constructor initialization from using a dictionary to using keywords with default parameters. It's still possible to use the dictionary for initialization with almost no changes to existing codebase +5. ~~Made a minimal version of mqtt_as for the ESP8266 to save some RAM~~ Removed again as the removing of workarounds in the main version only got ~150B less RAM usage which is not worth the effort. +6. Added support for "unsubscribe" +7. ~~Added support for recognizing retained publications (makes change in "subs_cb" necessary as it now has to take 3 args [topic,msg,retained])~~ implemented upstream +8. All other files are updated to the new changes and are usable (e.g. tests). +9. Updated documentation to reflect all changes +10. ~~Fixes a reliability problem when having many subscribe/unsubscribe in a short time, resulting endless reconnects (see commit for changes and explanation)~~ fixed upstream, real concurrent operations possible now +11. Added support for the unix port of Micropython + +Motivation for the changes: +For my project I had to adapt the library to use it on the ESP32 with loboris fork but also use it on my ESP8266 that is short on RAM all the time. +Therefore I had the following motivation for each of the above mentioned changes: +1. I don't like to walk through a mess of files not knowing which one is important or where it belongs to and I don't want to read all the documentation just to know which files belong where. +2. Like all modules this should be a directory as well, making usage easier. +3. ~~Made it work with loboris fork but did not want to use workarounds that are not needed on this fork. (Peter Hinch made it work with loboris port as well but has the workarounds still in it to be safe)~~ +4. I felt that this kind of initialization is the more pythonic way of doing things but apart from that it has an important advantage on the ESP8266, removing the config dict completely uses 100-200 Bytes less, which is important on ESP8266. +5. ~~This version for the ESP8266 has all non related code (workarounds for ESP32) and also some not commonly functions removed, saving another 150-250 Bytes so that after all changes I get 250-450 Bytes more RAM which is about 2% of the available RAM.~~ +6. At first I did not need that but later it became important to me so I added it +7. I made a huge workaround in a subclass to recognize retained messages instead of just supporting it directly +8. Although I do not need any other file I felt that it is important to finish the work I started and not leave half the repo unusable. +9. Wouldn't want issues because of wrong documentation or frustrated users. Have fun with it :D +10. ~~was simply needed. Sadly makes the module a little bigger~~ +11. Needed that to run my projects on a Pi and it's great for testing code + # Introduction MQTT is an easily used networking protocol designed for IOT (internet of @@ -38,7 +75,7 @@ It has the drawback of increased code size which is an issue on the ESP8266. Run as frozen bytecode it uses about 50% of the RAM on the ESP8266. On ESP32 and Pyboard D it may be run as a standard Python module. -It is documented [here](./mqtt_as/README.md). +It is documented [here](./README_mqtt_as.md). ## 2. MQTT for generic MicroPython targets @@ -50,4 +87,4 @@ five GPIO pins accessible via the `machine` library should suffice. The driver is non-blocking and is designed for applications using uasyncio. -It is documented [here](./NO_NET.md). +It is documented [here](./remote_mqtt/NO_NET.md). diff --git a/mqtt_as/README.md b/README_mqtt_as.md similarity index 89% rename from mqtt_as/README.md rename to README_mqtt_as.md index e6f2e0b..38fc29d 100644 --- a/mqtt_as/README.md +++ b/README_mqtt_as.md @@ -10,43 +10,43 @@ but duplication can occur. Level 2 avoids duplication; it is not supported by the official driver or by this module. Duplicates can readily be handled at the application level. -###### [Main README](../README.md) +###### [Main README](./README.md) # 1. Contents - 1. [Contents](./README.md#1-contents) - 1.1 [Rationale](./README.md#11-rationale) - 1.2 [Overview](./README.md#12-overview) - 1.3 [Project Status](./README.md#13-project-status) - 1.4 [ESP8266 Limitations](./README.md#14-esp8266-limitations) - 1.5 [ESP32 Issues](./README.md#15-esp32-issues) - 1.6 [Pyboard D](./README.md#16-pyboard-d) - 1.7 [Dependency](./README.md#17-dependency) - 2. [Getting started](./README.md#2-getting_started) - 2.1 [Program files](./README.md#21-program-files) - 2.2 [Installation](./README.md#22-installation) - 2.3 [Example Usage](./README.md#23-example-usage) - 3. [MQTTClient class](./README.md#3-mqttclient-class) - 3.1 [Constructor](./README.md#31-constructor) - 3.2 [Methods](./README.md#32-methods) -      3.2.1 [connect](./README.md#321-connect) -      3.2.2 [publish](./README.md#322-publish) -      3.2.3 [subscribe](./README.md#323-subscribe) -      3.2.4 [isconnected](./README.md#324-isconnected) -      3.2.5 [disconnect](./README.md#325-disconnect) -      3.2.6 [close](./README.md#326-close) -      3.2.7 [broker_up](./README.md#327-broker_up) -      3.2.8 [wan_ok](./README.md#328-wan_ok) - 3.3 [Class Variables](./README.md#33-class-variables) - 3.4 [Module Attribute](./README.md#34-module-attribute) - 4. [Notes](./README.md#4-notes) - 4.1 [Connectivity](./README.md#41-connectivity) - 4.2 [Client publications with qos == 1](./README.md#42-client-publications-with-qos-1) - 4.3 [Client subscriptions with qos == 1](./README.md#43-client-subscriptions-with-qos-1) - 4.4 [Application Design](./README.md#44-application-design) -      4.4.1 [Publication Timeouts](./README.md#441-publication-timeouts) - 5. [Low Power Demo](./README.md#5-low-power-demo) Note: Pyboard D specific and highly experimental. - 6. [References](./README.md#6-references) + 1. [Contents](./README.md#1-contents) + 1.1 [Rationale](./README.md#11-rationale) + 1.2 [Overview](./README.md#12-overview) + 1.3 [Project Status](./README.md#13-project-status) + 1.4 [ESP8266 Limitations](./README.md#14-esp8266-limitations) + 1.5 [ESP32 Issues](./README.md#15-esp32-issues) + 1.6 [Pyboard D](./README.md#16-pyboard-d) + 1.7 [Dependency](./README.md#17-dependency) + 2. [Getting started](./README.md#2-getting_started) + 2.1 [Program files](./README.md#21-program-files) + 2.2 [Installation](./README.md#22-installation) + 2.3 [Example Usage](./README.md#23-example-usage) + 3. [MQTTClient class](./README.md#3-mqttclient-class) + 3.1 [Constructor](./README.md#31-constructor) + 3.2 [Methods](./README.md#32-methods) +      3.2.1 [connect](./README.md#321-connect) +      3.2.2 [publish](./README.md#322-publish) +      3.2.3 [subscribe](./README.md#323-subscribe) +      3.2.4 [isconnected](./README.md#324-isconnected) +      3.2.5 [disconnect](./README.md#325-disconnect) +      3.2.6 [close](./README.md#326-close) +      3.2.7 [broker_up](./README.md#327-broker_up) +      3.2.8 [wan_ok](./README.md#328-wan_ok) + 3.3 [Class Variables](./README.md#33-class-variables) + 3.4 [Module Attribute](./README.md#34-module-attribute) + 4. [Notes](./README.md#4-notes) + 4.1 [Connectivity](./README.md#41-connectivity) + 4.2 [Client publications with qos == 1](./README.md#42-client-publications-with-qos-1) + 4.3 [Client subscriptions with qos == 1](./README.md#43-client-subscriptions-with-qos-1) + 4.4 [Application Design](./README.md#44-application-design) +      4.4.1 [Publication Timeouts](./README.md#441-publication-timeouts) + 5. [Low Power Demo](./README.md#5-low-power-demo) Note: Pyboard D specific and highly experimental. + 6. [References](./README.md#6-references) ## 1.1 Rationale @@ -97,7 +97,7 @@ modified for resilience and for asynchronous operation. Hardware support: Pyboard D, ESP8266 and ESP32. Firmware support: Official firmware. Limited support for ESP32 Loboris port. -Broker support: Mosquitto is preferred for its excellent MQTT compliance. +Broker support: Mosquitto is preferred for its excellent MQTT compliance. Protocol: Currently the module supports a subset of MQTT revision 3.1.1. Initial development was by Peter Hinch. Thanks are due to Kevin Köck for @@ -109,16 +109,22 @@ providing and testing a number of bugfixes and enhancements. SSL/TLS now tested successfully on Pyboard D. Fix bug where ESP8266 could hang attempting to connect. Can now reconnect after disconnect is issued. -Now supports concurrent qos==1 publications and subscriptions. +Now supports concurrent qos==1 publications and subscriptions. **API change** The disconnect method is now asynchronous. -24th Sept 2019 +24th Sept 2019 **API change:** the subscription callback requires an additional parameter for the retained message flag. On ESP8266 the code disables automatic sleep: this reduces reconnects at cost -of increased power consumption. +of increased power consumption. -1st April 2019 +2nd July 2019 +Added support for the unix port of Micropython. The unique_id must be set manually +as the unix port doesn't have the function *unique_id()* to read a chip's id. +The library assumes that the device is correctly connected to the network as the OS +will take care of the network connection. + +1st April 2019 In the light of improved ESP32 firmware and the availability of the Pyboard D the code has minor changes to support these platforms. @@ -178,7 +184,7 @@ fork and its library, but this has not been tested. ### Experimental scripts 1. `lowpower.py` Pybaord D micro-power test. See [Section 5](./README.md#5-low-power-demo). - 2. `tls8266.py` SSL/TLS connectionfor ESP8266. Fails with + 2. `tls8266.py` SSL/TLS connectionfor ESP8266. Fails with `ssl_handshake_status: -4`. 3. `tls32.py` SSL/TLS connection for ESP32. Fails with `mbedtls_ssl_handshake error: -77`. @@ -197,7 +203,7 @@ platforms, or to have the capability of running on an ESP8266 which has not previously connected, `config.py` should be edited to provide them. This is a sample cross-platform file: ```python -from mqtt_as import config +from micropython_mqtt_as.mqtt_as import config config['server'] = '192.168.0.10' # Change to suit e.g. 'iot.eclipse.org' @@ -230,9 +236,10 @@ with the topic `foo_topic` the topic and message are printed. The code periodically publishes an incrementing count under the topic `result`. ```python -from mqtt_as import MQTTClient -from config import config +from micropython_mqtt_as.mqtt_as import MQTTClient +from micropython_mqtt_as.config import config import uasyncio as asyncio +from sys import platform SERVER = '192.168.0.10' # Change to suit e.g. 'iot.eclipse.org' @@ -255,9 +262,13 @@ async def main(client): config['subs_cb'] = callback config['connect_coro'] = conn_han config['server'] = SERVER +if platform == "linux": + config["client_id"]="linux" MQTTClient.DEBUG = True # Optional: print diagnostic messages -client = MQTTClient(config) +client = MQTTClient(**config) # Using dict to stay compatible to upstream. +# Alternatively initialize MQTTClient the pythonic way using arguments like: +# client = MQTTClient(server=SERVER, port=1883, ...) loop = asyncio.get_event_loop() try: loop.run_until_complete(main(client)) @@ -286,9 +297,12 @@ The module provides a single class: `MQTTClient`. ## 3.1 Constructor -This takes a dictionary as argument. The default is `mqtt_as.config`. Normally -an application imports this and modifies selected entries as required. Entries -are as follows (default values shown in []): +This takes all keywords found in the dictionary in `config.py` as argument. +As a convenience you can also use this dictionary by importing it and changing +the values. You then call the constructor by `MQTTClient(**config)`, this +automatically matches the contents of the dict to the keywords of the constructor. + +Entries of config dictionary are: **WiFi Credentials** @@ -446,6 +460,16 @@ to '8.8.8.8' and checks for a valid response. There is a single arg `packet` which is a bytes object being the DNS query. The default object queries the Google DNS server. +### 3.2.9 unsubscribe (async) + +Unsubscribes a topic, so no messages will be received anymore. + +The coro will pause until a `UNSUBACK` has been received from the broker, if +necessary reconnecting to a failed network. + +Args: + 1. `topic` + ## 3.3 Class Variables 1. `DEBUG` If `True` causes diagnostic messages to be printed. @@ -597,7 +621,7 @@ not detected. [mosquitto server](http://mosquitto.org/man/mosquitto-8.html) [mosquitto client publish](http://mosquitto.org/man/mosquitto_pub-1.html) [mosquitto client subscribe](http://mosquitto.org/man/mosquitto_sub-1.html) -[MQTT 3.1.1 spec](http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718048) +[MQTT 3.1.1 spec](http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718048) [python client for PC's](https://www.eclipse.org/paho/clients/python/) [Unofficial MQTT FAQ](https://forum.micropython.org/viewtopic.php?f=16&t=2239) diff --git a/__init__.py b/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/mqtt_as/config.py b/config.py similarity index 51% rename from mqtt_as/config.py rename to config.py index cec3fa0..a626ec7 100644 --- a/mqtt_as/config.py +++ b/config.py @@ -1,6 +1,34 @@ -# config.py Local configuration for mqtt_as demo programs. from sys import platform -from mqtt_as import config + +# Include any cross-project settings. + +config = { + 'client_id': None, # will default to hexlify(unique_id()) + 'server': None, + 'port': 0, + 'user': '', + 'password': '', + 'keepalive': 60, + 'ping_interval': 0, + 'ssl': False, + 'ssl_params': {}, + 'response_time': 10, + 'clean_init': True, + 'clean': True, + 'max_repubs': 4, + 'will': None, + 'subs_cb': lambda *_: None, + 'wifi_coro': None, + 'connect_coro': None, + 'ssid': None, + 'wifi_pw': None, +} + +if platform == 'esp32': + config['ssid'] = 'my SSID' # EDIT if you're using ESP32 / Pyboard D + config['wifi_pw'] = 'my WiFi password' + +# config.py Local configuration for mqtt_as demo programs. config['server'] = '192.168.0.10' # Change to suit # config['server'] = 'iot.eclipse.org' @@ -9,6 +37,9 @@ config['ssid'] = 'my_SSID' config['wifi_pw'] = 'my_WiFi_password' +if platform == "linux": + config["client_id"] = "linux" # change this to whatever your client_id should be + # For demos ensure the same calling convention for LED's on all platforms. # ESP8266 Feather Huzzah reference board has active low LED's on pins 0 and 2. # ESP32 is assumed to have user supplied active low LED's on same pins. @@ -16,20 +47,32 @@ if platform == 'esp8266' or platform == 'esp32' or platform == 'esp32_LoBo': from machine import Pin + + def ledfunc(pin): pin = pin + def func(v): pin(not v) # Active low on ESP8266 + return func - wifi_led = ledfunc(Pin(0, Pin.OUT, value = 0)) # Red LED for WiFi fail/not ready yet - blue_led = ledfunc(Pin(2, Pin.OUT, value = 1)) # Message received + + + wifi_led = ledfunc(Pin(0, Pin.OUT, value=0)) # Red LED for WiFi fail/not ready yet + blue_led = ledfunc(Pin(2, Pin.OUT, value=1)) # Message received elif platform == 'pyboard': from pyb import LED + + def ledfunc(led, init): led = led led.on() if init else led.off() + def func(v): led.on() if v else led.off() + return func + + wifi_led = ledfunc(LED(1), 1) blue_led = ledfunc(LED(3), 0) diff --git a/mqtt_as/mqtt_as.py b/mqtt_as.py similarity index 82% rename from mqtt_as/mqtt_as.py rename to mqtt_as.py index 893a340..6d8192e 100644 --- a/mqtt_as/mqtt_as.py +++ b/mqtt_as.py @@ -1,5 +1,6 @@ # mqtt_as.py Asynchronous version of umqtt.robust # (C) Copyright Peter Hinch 2017-2019. +# (C) Copyright Kevin Köck 2018-2019. # Released under the MIT licence. # Pyboard D support added @@ -14,18 +15,16 @@ import uasyncio as asyncio gc.collect() -from utime import ticks_ms, ticks_diff, sleep_ms +from utime import ticks_ms, ticks_diff from uerrno import EINPROGRESS, ETIMEDOUT gc.collect() from micropython import const -from machine import unique_id -import network gc.collect() from sys import platform -VERSION = (0, 5, 0) +VERSION = (0, 6, 0) # Default short delay for good SynCom throughput (avoid sleep(0) with SynCom). _DEFAULT_MS = const(20) @@ -42,6 +41,14 @@ ESP32 = platform == 'esp32' PYBOARD = platform == 'pyboard' LOBO = platform == 'esp32_LoBo' +LINUX = platform == "linux" + +if LINUX is False: + import network + from machine import unique_id +else: + def unique_id(): + raise NotImplementedError("Linux doesn't have a unique id. Provide the argument client_id") # Default "do little" coro for optional user replacement @@ -49,107 +56,74 @@ async def eliza(*_): # e.g. via set_wifi_handler(coro): see test program await asyncio.sleep_ms(_DEFAULT_MS) -config = { - 'client_id': hexlify(unique_id()), - 'server': None, - 'port': 0, - 'user': '', - 'password': '', - 'keepalive': 60, - 'ping_interval': 0, - 'ssl': False, - 'ssl_params': {}, - 'response_time': 10, - 'clean_init': True, - 'clean': True, - 'max_repubs': 4, - 'will': None, - 'subs_cb': lambda *_: None, - 'wifi_coro': eliza, - 'connect_coro': eliza, - 'ssid': None, - 'wifi_pw': None, -} - - class MQTTException(Exception): pass + def pid_gen(): pid = 0 while True: pid = pid + 1 if pid < 65535 else 1 yield pid + def qos_check(qos): if not (qos == 0 or qos == 1): raise ValueError('Only qos 0 and 1 are supported.') -class Lock(): - def __init__(self): - self._locked = False - - async def __aenter__(self): - while True: - if self._locked: - await asyncio.sleep_ms(_DEFAULT_MS) - else: - self._locked = True - break - - async def __aexit__(self, *args): - self._locked = False - await asyncio.sleep_ms(_DEFAULT_MS) - - # MQTT_base class. Handles MQTT protocol on the basis of a good connection. # Exceptions from connectivity failures are handled by MQTTClient subclass. class MQTT_base: REPUB_COUNT = 0 # TEST DEBUG = False - def __init__(self, config): + def __init__(self, client_id, server, port, user, password, keepalive, ping_interval, + ssl, ssl_params, response_time, clean_init, clean, max_repubs, will, + subs_cb, wifi_coro, connect_coro, ssid, wifi_pw): # MQTT config - self._client_id = config['client_id'] - self._user = config['user'] - self._pswd = config['password'] - self._keepalive = config['keepalive'] + self.ping_interval = ping_interval + self._client_id = client_id + self._user = user + self._pswd = password + self._keepalive = keepalive if self._keepalive >= 65536: raise ValueError('invalid keepalive time') - self._response_time = config['response_time'] * 1000 # Repub if no PUBACK received (ms). - self._max_repubs = config['max_repubs'] - self._clean_init = config['clean_init'] # clean_session state on first connection - self._clean = config['clean'] # clean_session state on reconnect - will = config['will'] + self._response_time = response_time * 1000 # Repub if no PUBACK received (ms). + self._max_repubs = max_repubs + self._clean_init = clean_init # clean_session state on first connection + self._clean = clean # clean_session state on reconnect if will is None: self._lw_topic = False else: self._set_last_will(*will) # WiFi config - self._ssid = config['ssid'] # Required for ESP32 / Pyboard D. Optional ESP8266 - self._wifi_pw = config['wifi_pw'] - self._ssl = config['ssl'] - self._ssl_params = config['ssl_params'] + self._ssid = ssid # Required ESP32 / Pyboard D + self._wifi_pw = wifi_pw + self._ssl = ssl + self._ssl_params = ssl_params # Callbacks and coros - self._cb = config['subs_cb'] - self._wifi_handler = config['wifi_coro'] - self._connect_handler = config['connect_coro'] + self._cb = subs_cb + self._wifi_handler = wifi_coro + self._connect_handler = connect_coro # Network - self.port = config['port'] + self.port = port if self.port == 0: self.port = 8883 if self._ssl else 1883 - self.server = config['server'] + self.server = server if self.server is None: raise ValueError('no server specified.') self._sock = None - self._sta_if = network.WLAN(network.STA_IF) - self._sta_if.active(True) + if LINUX is True: + self._sta_isconnected = True + else: + self._sta_if = network.WLAN(network.STA_IF) + self._sta_if.active(True) self.newpid = pid_gen() self.rcv_pids = set() # PUBACK and SUBACK pids awaiting ACK response self.last_rx = ticks_ms() # Time of last communication from broker - self.lock = Lock() + self.lock = asyncio.Lock() def _set_last_will(self, topic, msg, retain=False, qos=0): qos_check(qos) @@ -400,6 +374,19 @@ async def subscribe(self, topic, qos): if not await self._await_pid(pid): raise OSError(-1) + # Can raise OSError if WiFi fails. Subclass traps + async def unsubscribe(self, topic): + pkt = bytearray(b"\xa2\0\0\0") + pid = next(self.newpid) + self.rcv_pids.add(pid) + struct.pack_into("!BH", pkt, 1, 2 + 2 + len(topic), pid) + async with self.lock: + await self._as_write(pkt) + await self._send_str(topic) + + if not await self._await_pid(pid): + raise OSError(-1) + # Wait for a single incoming MQTT message and process it. # Subscribed messages are delivered to a callback previously # set by .setup() method. Other (internal) MQTT @@ -438,6 +425,14 @@ async def wait_msg(self): else: raise OSError(-1) + if op == 0xB0: # UNSUBACK + resp = await self._as_read(3) + pid = resp[2] | (resp[1] << 8) + if pid in self.rcv_pids: + self.rcv_pids.discard(pid) + else: + raise OSError(-1) + if op & 0xf0 != 0x30: return sz = await self._recv_len() @@ -452,23 +447,46 @@ async def wait_msg(self): msg = await self._as_read(sz) retained = op & 0x01 self._cb(topic, msg, bool(retained)) - if op & 6 == 2: + if op & 6 == 2: # qos 1 pkt = bytearray(b"\x40\x02\0\0") # Send PUBACK struct.pack_into("!H", pkt, 2, pid) await self._as_write(pkt) - elif op & 6 == 4: + elif op & 6 == 4: # qos 2 not supported raise OSError(-1) # MQTTClient class. Handles issues relating to connectivity. class MQTTClient(MQTT_base): - def __init__(self, config): - super().__init__(config) + def __init__(self, client_id=None, + server=None, + port=0, + user='', + password='', + keepalive=60, + ping_interval=0, + ssl=False, + ssl_params={}, + response_time=10, + clean_init=True, + clean=True, + max_repubs=4, + will=None, + subs_cb=lambda *_: None, + wifi_coro=None, + connect_coro=None, + ssid=None, + wifi_pw=None): + client_id = client_id or hexlify(unique_id()) + wifi_coro = wifi_coro or eliza + connect_coro = connect_coro or eliza + super().__init__(client_id, server, port, user, password, keepalive, ping_interval, + ssl, ssl_params, response_time, clean_init, clean, max_repubs, will, + subs_cb, wifi_coro, connect_coro, ssid, wifi_pw) self._isconnected = False # Current connection state keepalive = 1000 * self._keepalive # ms self._ping_interval = keepalive // 4 if keepalive else 20000 - p_i = config['ping_interval'] * 1000 # Can specify shorter e.g. for subscribe-only + p_i = self.ping_interval * 1000 # Can specify shorter e.g. for subscribe-only if p_i and p_i < self._ping_interval: self._ping_interval = p_i self._in_connect = False @@ -478,6 +496,9 @@ def __init__(self, config): esp.sleep_type(0) # Improve connection integrity at cost of power consumption. async def wifi_connect(self): + if LINUX is True: # no network control, assume connected as OS takes care of that + self._sta_isconnected = True + return s = self._sta_if if ESP8266: if s.isconnected(): # 1st attempt, already connected. @@ -543,7 +564,8 @@ async def connect(self): loop.create_task(self._wifi_handler(True)) # User handler. if not self._has_connected: self._has_connected = True # Use normal clean flag on reconnect. - loop.create_task(self._keep_connected()) # Runs forever unless user issues .disconnect() + loop.create_task( + self._keep_connected()) # Runs forever unless user issues .disconnect() loop.create_task(self._handle_msg()) # Tasks quit on connection fail. loop.create_task(self._keep_alive()) @@ -594,8 +616,12 @@ async def _memory(self): def isconnected(self): if self._in_connect: # Disable low-level check during .connect() return True - if self._isconnected and not self._sta_if.isconnected(): # It's going down. - self._reconnect() + if LINUX is True: + if self._isconnected and self._sta_isconnected is False: + self._reconnect() + else: + if self._isconnected and not self._sta_if.isconnected(): # It's going down. + self._reconnect() return self._isconnected def _reconnect(self): # Schedule a reconnection if not underway. @@ -618,7 +644,10 @@ async def _keep_connected(self): await asyncio.sleep(1) gc.collect() else: - self._sta_if.disconnect() + if LINUX is True: + self._sta_isconnected = False + else: + self._sta_if.disconnect() await asyncio.sleep(1) try: await self.wifi_connect() @@ -649,6 +678,15 @@ async def subscribe(self, topic, qos=0): pass self._reconnect() # Broker or WiFi fail. + async def unsubscribe(self, topic): + while 1: + await self._connection() + try: + return await super().unsubscribe(topic) + except OSError: + pass + self._reconnect() # Broker or WiFi fail. + async def publish(self, topic, msg, retain=False, qos=0): qos_check(qos) while 1: diff --git a/mqtt_as_OOM_protection.py b/mqtt_as_OOM_protection.py new file mode 100644 index 0000000..72a70de --- /dev/null +++ b/mqtt_as_OOM_protection.py @@ -0,0 +1,55 @@ +# Author: Kevin Köck +# Copyright Kevin Köck 2019 Released under the MIT license +# Created on 2019-10-28 + +# This is basically just a paranoid change protecting the device from crashing +# with a memory allocation error if it receives a message not fitting in RAM. +# It will still receive and process the message but the arguments "topic" or "msg" +# of the callback might be None depending on which values couldn't be received +# because of the memory allocation error. + +__updated__ = "2019-10-28" +__version__ = "0.1" + +from .mqtt_as import MQTTClient as _MQTTClient, ticks_ms, BUSY_ERRORS, asyncio, _SOCKET_POLL_DELAY +import gc + + +class MQTTClient(_MQTTClient): + async def _as_read(self, n, sock=None): # OSError caught by superclass + if sock is None: + sock = self._sock + data = b'' + t = ticks_ms() + r = 0 + while r < n: + if self._timeout(t) or not self.isconnected(): + raise OSError(-1) + nr = (n - r) if (n - r) < 200 else 200 + try: + msg = sock.read(nr) + if msg is not None: + r += len(msg) + except OSError as e: # ESP32 issues weird 119 errors here + msg = None + if e.args[0] not in BUSY_ERRORS: + raise + except MemoryError as e: + # no way of knowing how many bytes were received so the rest would be + # received later and lead to buffer overflows and keepalive timeout anyway + # so best to terminate the connection + raise OSError + if msg == b'': # Connection closed by host + raise OSError(-1) + if msg is not None and data is not None: # data received + try: + data = b''.join((data, msg)) + except MemoryError as e: + data = None + gc.collect() + t = ticks_ms() + self.last_rx = ticks_ms() + await asyncio.sleep_ms(0 if nr == 200 and msg is not None else _SOCKET_POLL_DELAY) + # don't wait if receiving a big message in chunks but yield + # to not block all other coroutines + return data diff --git a/mqtt_as/mqtt_as_timeout.py b/mqtt_as_timeout.py similarity index 62% rename from mqtt_as/mqtt_as_timeout.py rename to mqtt_as_timeout.py index a3b9bd5..8927ace 100644 --- a/mqtt_as/mqtt_as_timeout.py +++ b/mqtt_as_timeout.py @@ -7,12 +7,8 @@ # connectivity and cancels it if the delay exceeds a timeout. # Note that it blocks other attempts at publication while waiting for a PUBACK, -# counter to the normal operation of the module. A solution capable of handling -# concurrent qos == 1 publications would require a set instance containing coros. - -# It incorporates a workround for the bug in uasyncio V2 whereby cancellation -# is deferred if a task is waiting on a sleep command. -# For these reasons it was not included in the mqtt_as module. +# counter to the normal operation of the module but uses less RAM than the +# implementation with concurrent operations. # The occurrence of a timeout does not guarantee non-reception of the message: # connectivity loss may occur between reception by the broker and reception of @@ -23,8 +19,9 @@ import time import uasyncio as asyncio + class MQTTClient(_MQTTClient): - _pub_coro = None + _pub_task = None # Await broker connection. Subclassed to reduce canceling time from 1s to 50ms async def _connection(self): @@ -34,24 +31,24 @@ async def _connection(self): async def _publishTimeout(self, topic, msg, retain, qos): try: await super().publish(topic, msg, retain, qos) - except asyncio.CancelledError: - pass finally: - self._pub_coro = None + self._pub_task = None async def publish(self, topic, msg, retain=False, qos=0, timeout=None): - coro = None + task = None start = time.ticks_ms() while timeout is None or time.ticks_diff(time.ticks_ms(), start) < timeout: - if self._pub_coro is None and coro is None: - coro = self._publishTimeout(topic, msg, retain, qos) - asyncio.get_event_loop().create_task(coro) - self._pub_coro = coro - elif coro is not None: - if self._pub_coro != coro: + # Can't use wait_for because cancelling a wait_for would cancel _publishTimeout + # Also a timeout in wait_for would cancel _publishTimeout without waiting for + # the socket lock to be available, breaking mqtt protocol. + if self._pub_task is None and task is None: + task = asyncio.create_task(self._publishTimeout(topic, msg, retain, qos)) + self._pub_task = task + elif task is not None: + if self._pub_task != task: return # published await asyncio.sleep_ms(20) - if coro is not None: + if task is not None: async with self.lock: - asyncio.cancel(coro) + task.cancel() return diff --git a/mqtt_as_timeout_concurrent.py b/mqtt_as_timeout_concurrent.py new file mode 100644 index 0000000..9eb9b7f --- /dev/null +++ b/mqtt_as_timeout_concurrent.py @@ -0,0 +1,73 @@ +# Author: Kevin Köck +# Copyright Kevin Köck 2019 Released under the MIT license +# Created on 2019-11-04 + +__updated__ = "2020-04-01" +__version__ = "0.4" + +try: + from micropython_mqtt_as.mqtt_as import MQTTClient as _MQTTClient +except ImportError: + from .mqtt_as import MQTTClient as _MQTTClient +import uasyncio as asyncio +import time + + +class MQTTClient(_MQTTClient): + # operations return False is connection was lost and await_connection==False. + # operations return True if the operation was finished (mqtt_as doesn't return anything) + # operations raise asyncio.TimeoutError on timeout + # operations raise asyncio.CancelledError if the caller task got cancelled + + async def _waiter(self, coro, timeout, await_connection): + # using _waiter even without a timeout as it ensures proper cancellation with self.lock + done = False + + async def op(): + nonlocal done + try: + await coro + done = True + except Exception as e: + done = e + + task = asyncio.create_task(op()) + start = time.ticks_ms() + try: + while not done: + if not await_connection and not self._isconnected: + self.dprint("Connection lost") + return False + elif timeout and time.ticks_diff(time.ticks_ms(), start) > timeout * 1000: + self.dprint("timeout in operation") + raise asyncio.TimeoutError + await asyncio.sleep_ms(40) + task = None # Task finished. finally doesn't need to cancel it. + if isinstance(done, Exception): + raise done + else: + return done + except asyncio.CancelledError: + # operation got cancelled externally, finally: will cancel the task + raise + finally: + if task: + async with self.lock: + self.dprint("canceled with lock") + task.cancel() + + async def publish(self, topic, msg, retain=False, qos=0, timeout=None, await_connection=True): + if not await_connection and not self._isconnected: + return False + return await self._waiter(super().publish(topic, msg, retain, qos), timeout, + await_connection) + + async def subscribe(self, topic, qos=0, timeout=None, await_connection=True): + if not await_connection and not self._isconnected: + return False + return await self._waiter(super().subscribe(topic, qos), timeout, await_connection) + + async def unsubscribe(self, topic, timeout=None, await_connection=True): + if not await_connection and not self._isconnected: + return False + return await self._waiter(super().unsubscribe(topic), timeout, await_connection) diff --git a/NO_NET.md b/remote_mqtt/NO_NET.md similarity index 100% rename from NO_NET.md rename to remote_mqtt/NO_NET.md diff --git a/_boot.py b/remote_mqtt/_boot.py similarity index 100% rename from _boot.py rename to remote_mqtt/_boot.py diff --git a/firmware-combined.bin b/remote_mqtt/firmware-combined.bin similarity index 100% rename from firmware-combined.bin rename to remote_mqtt/firmware-combined.bin diff --git a/main.py b/remote_mqtt/main.py similarity index 100% rename from main.py rename to remote_mqtt/main.py diff --git a/mqtt.py b/remote_mqtt/mqtt.py similarity index 91% rename from mqtt.py rename to remote_mqtt/mqtt.py index 36ddf90..c39a6d1 100644 --- a/mqtt.py +++ b/remote_mqtt/mqtt.py @@ -8,7 +8,8 @@ import gc import ubinascii -from mqtt_as import MQTTClient, config +from micropython_mqtt_as.mqtt_as import MQTTClient +from micropython_mqtt_as.config import config from machine import Pin, unique_id, freq import uasyncio as asyncio gc.collect() @@ -21,7 +22,8 @@ from status_values import * # Numeric status values shared with user code. _WIFI_DELAY = 15 # Time (s) to wait for default network -blue = Pin(2, Pin.OUT, value = 1) +blue = Pin(2, Pin.OUT, value=1) + def loads(s): d = {} @@ -29,9 +31,12 @@ def loads(s): return d["v"] # Format an arbitrary list of positional args as a status_values.SEP separated string + + def argformat(*a): return SEP.join(['{}' for x in range(len(a))]).format(*a) + async def heartbeat(): led = Pin(0, Pin.OUT) while True: @@ -50,7 +55,7 @@ def __init__(self, channel, config): config['wifi_coro'] = self.wifi_han config['connect_coro'] = self.conn_han config['client_id'] = ubinascii.hexlify(unique_id()) - super().__init__(config) + super().__init__(**config) # Get NTP time or 0 on any error. async def get_time(self): @@ -98,21 +103,22 @@ async def conn_han(self, _): for topic, qos in self.subscriptions.items(): await self.subscribe(topic, qos) - def subs_cb(self, topic, msg): + def subs_cb(self, topic, msg, retained): self.channel.send(argformat(SUBSCRIPTION, topic.decode('UTF8'), msg.decode('UTF8'))) + class Channel(SynCom): def __init__(self): - mtx = Pin(14, Pin.OUT) # Define pins + mtx = Pin(14, Pin.OUT) # Define pins mckout = Pin(15, Pin.OUT, value=0) # clocks must be initialised to 0 mrx = Pin(13, Pin.IN) mckin = Pin(12, Pin.IN) - super().__init__(True, mckin, mckout, mrx, mtx, string_mode = True) + super().__init__(True, mckin, mckout, mrx, mtx, string_mode=True) self.cstatus = False # Connection status self.client = None -# Task runs continuously. Process incoming Pyboard messages. -# Started by main_task() after client instantiated. + # Task runs continuously. Process incoming Pyboard messages. + # Started by main_task() after client instantiated. async def from_pyboard(self): client = self.client while True: @@ -136,9 +142,9 @@ async def from_pyboard(self): else: self.send(argformat(STATUS, UNKNOWN, 'Unknown command:', istr)) -# Runs when channel has synchronised. No return: Pyboard resets ESP on fail. -# Get parameters from Pyboard. Process them. Connect. Instantiate client. Start -# from_pyboard() task. Wait forever, updating connected status. + # Runs when channel has synchronised. No return: Pyboard resets ESP on fail. + # Get parameters from Pyboard. Process them. Connect. Instantiate client. Start + # from_pyboard() task. Wait forever, updating connected status. async def main_task(self, _): got_params = False # Await connection parameters (init record) @@ -189,8 +195,8 @@ async def main_task(self, _): self.send(argformat(STATUS, SPECNET)) # Pause for confirmation. User may opt to reboot instead. istr = await self.await_obj(100) - ap = WLAN(AP_IF) # create access-point interface - ap.active(False) # deactivate the interface + ap = WLAN(AP_IF) # create access-point interface + ap.active(False) # deactivate the interface sta_if.active(True) sta_if.connect(ssid, pw) while not sta_if.isconnected(): @@ -219,6 +225,7 @@ async def main_task(self, _): gc.threshold(gc.mem_free() // 4 + gc.mem_alloc()) await asyncio.sleep(1) + loop = asyncio.get_event_loop() loop.create_task(heartbeat()) # Comms channel to Pyboard diff --git a/net_local.py b/remote_mqtt/net_local.py similarity index 100% rename from net_local.py rename to remote_mqtt/net_local.py diff --git a/pbmqtt.py b/remote_mqtt/pbmqtt.py similarity index 100% rename from pbmqtt.py rename to remote_mqtt/pbmqtt.py diff --git a/status_values.py b/remote_mqtt/status_values.py similarity index 100% rename from status_values.py rename to remote_mqtt/status_values.py diff --git a/syncom.py b/remote_mqtt/syncom.py similarity index 100% rename from syncom.py rename to remote_mqtt/syncom.py diff --git a/pb_simple.py b/remote_mqtt/tests/pb_simple.py similarity index 100% rename from pb_simple.py rename to remote_mqtt/tests/pb_simple.py diff --git a/pb_status.py b/remote_mqtt/tests/pb_status.py similarity index 100% rename from pb_status.py rename to remote_mqtt/tests/pb_status.py diff --git a/pbmqtt_test.py b/remote_mqtt/tests/pbmqtt_test.py similarity index 100% rename from pbmqtt_test.py rename to remote_mqtt/tests/pbmqtt_test.py diff --git a/pbrange.py b/remote_mqtt/tests/pbrange.py similarity index 100% rename from pbrange.py rename to remote_mqtt/tests/pbrange.py diff --git a/pubtest b/remote_mqtt/tests/pubtest similarity index 100% rename from pubtest rename to remote_mqtt/tests/pubtest diff --git a/pubtest_range b/remote_mqtt/tests/pubtest_range similarity index 100% rename from pubtest_range rename to remote_mqtt/tests/pubtest_range diff --git a/asyn.py b/tests/asyn.py similarity index 99% rename from asyn.py rename to tests/asyn.py index b49256a..8ae4d06 100644 --- a/asyn.py +++ b/tests/asyn.py @@ -336,7 +336,7 @@ def new_gen(*args, **kwargs): args = (args[0],) + args[2:] g = f(*args, **kwargs) try: - res = await g + res = yield g return res finally: NamedTask._stopped(task_id) diff --git a/mqtt_as/clean.py b/tests/clean.py similarity index 95% rename from mqtt_as/clean.py rename to tests/clean.py index 277367d..5fe09ce 100644 --- a/mqtt_as/clean.py +++ b/tests/clean.py @@ -12,14 +12,16 @@ # red LED: ON == WiFi fail # blue LED heartbeat: demonstrates scheduler is running. -from mqtt_as import MQTTClient, config +from micropython_mqtt_as.mqtt_as import MQTTClient, config from config import wifi_led, blue_led # Local definitions import uasyncio as asyncio + # Subscription callback def sub_cb(topic, msg, retained): print((topic, msg, retained)) + # Demonstrate scheduler is operational. async def heartbeat(): s = True @@ -28,15 +30,18 @@ async def heartbeat(): blue_led(s) s = not s + async def wifi_han(state): wifi_led(not state) print('Wifi is ', 'up' if state else 'down') await asyncio.sleep(1) + # If you connect with clean_session True, must re-subscribe (MQTT spec 3.1.2.4) async def conn_han(client): await client.subscribe('foo_topic', 1) + async def main(client): try: await client.connect() @@ -48,9 +53,10 @@ async def main(client): await asyncio.sleep(5) print('publish', n) # If WiFi is down the following will pause for the duration. - await client.publish('result', '{} {}'.format(n, client.REPUB_COUNT), qos = 1) + await client.publish('result', '{} {}'.format(n, client.REPUB_COUNT), qos=1) n += 1 + # Define configuration config['subs_cb'] = sub_cb config['wifi_coro'] = wifi_han diff --git a/mqtt_as/lowpower.py b/tests/lowpower.py similarity index 100% rename from mqtt_as/lowpower.py rename to tests/lowpower.py diff --git a/mqtt_as/main.py b/tests/main.py similarity index 100% rename from mqtt_as/main.py rename to tests/main.py diff --git a/mqtt_as/pubtest b/tests/pubtest similarity index 100% rename from mqtt_as/pubtest rename to tests/pubtest diff --git a/mqtt_as/range.py b/tests/range.py similarity index 100% rename from mqtt_as/range.py rename to tests/range.py diff --git a/mqtt_as/range_ex.py b/tests/range_ex.py similarity index 100% rename from mqtt_as/range_ex.py rename to tests/range_ex.py diff --git a/tests/timeout_concurrent.py b/tests/timeout_concurrent.py new file mode 100644 index 0000000..498e70d --- /dev/null +++ b/tests/timeout_concurrent.py @@ -0,0 +1,111 @@ +# Author: Kevin Köck +# Copyright Kevin Köck 2019 Released under the MIT license +# Created on 2019-11-05 + +__updated__ = "2019-11-05" +__version__ = "0.1" + +from ..mqtt_as_timeout_concurrent import MQTTClient + +import uasyncio as asyncio + +loop = asyncio.get_event_loop(waitq_len=60, runq_len=60) + + +async def publish(val, t): + print("Publishing", val, "timeout", t) + await client.publish("foo_topic", val, qos=1, timeout=t) + + +def callback(topic, msg, retained): + print((topic, msg, retained)) + + +first = True + + +async def conn_han(client): + global first + if first: + # await client.subscribe('foo_topic', 1) + loop = asyncio.get_event_loop() + loop.create_task(publish("payload {!s}".format(1), 1)) + loop.create_task(publish("payload {!s}".format(2), 2)) + loop.create_task(client.subscribe("testtopic{!s}".format(3), qos=1, timeout=5)) + loop.create_task(publish("payload {!s}".format(4), 4)) + loop.create_task(publish("payload {!s}".format(5), 5)) + loop.create_task(client.subscribe("testtopic{!s}".format(6), qos=1, timeout=5)) + first = False + await asyncio.sleep(1) + print("Closing connection") + await client.disconnect() + await asyncio.sleep(5) + print("Publishing disconnected") + loop.create_task(publish("payload {!s}".format(1), 1)) + loop.create_task(publish("payload {!s}".format(2), 2)) + loop.create_task(client.subscribe("testtopic{!s}".format(3), qos=1, timeout=5)) + loop.create_task(publish("payload {!s}".format(4), 4)) + loop.create_task(publish("payload {!s}".format(5), 5)) + loop.create_task(client.subscribe("testtopic{!s}".format(6), qos=1, timeout=5)) + await asyncio.sleep(10) + print("Reconnecting after all timeouts") + await client.connect() + loop.create_task(publish("payload {!s}".format(8), 8)) + await asyncio.sleep(5) + print("Test done") + await client.disconnect() + + +import config +from ubinascii import hexlify +from machine import unique_id + + +async def wifi(state): + print("WIFI state", state) + + +async def eliza(*_): # e.g. via set_wifi_handler(coro): see test program + await asyncio.sleep_ms(20) + + +config_dict = { + 'client_id': hexlify(unique_id()), + 'server': config.MQTT_HOST, + 'port': config.MQTT_PORT, + 'user': config.MQTT_USER, + 'password': config.MQTT_PASSWORD, + 'keepalive': 60, + 'ping_interval': 0, + 'ssl': False, + 'ssl_params': {}, + 'response_time': 10, + 'clean_init': True, + 'clean': True, + 'max_repubs': 4, + 'will': None, + 'subs_cb': lambda *_: None, + 'wifi_coro': wifi, + 'connect_coro': eliza, + 'ssid': None, + 'wifi_pw': None, +} +config_dict['connect_coro'] = conn_han +config_dict['subs_cb'] = callback + +client = MQTTClient(**config_dict) +client.DEBUG = True + + +async def main(client): + await client.connect() + n = 0 + while True: + await asyncio.sleep(5) + + +def test(): + try: + loop.run_until_complete(main(client)) + finally: + client.close() # Prevent LmacRxBlk:1 errors diff --git a/mqtt_as/tls.py b/tests/tls.py similarity index 100% rename from mqtt_as/tls.py rename to tests/tls.py diff --git a/mqtt_as/tls32.py b/tests/tls32.py similarity index 100% rename from mqtt_as/tls32.py rename to tests/tls32.py diff --git a/mqtt_as/tls8266.py b/tests/tls8266.py similarity index 100% rename from mqtt_as/tls8266.py rename to tests/tls8266.py diff --git a/mqtt_as/unclean.py b/tests/unclean.py similarity index 100% rename from mqtt_as/unclean.py rename to tests/unclean.py