Skip to content

Commit 9775de9

Browse files
committed
initial VERY rough support for observable devices
1 parent d7074a5 commit 9775de9

File tree

8 files changed

+114
-20
lines changed

8 files changed

+114
-20
lines changed

.gitignore

+2-1
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,9 @@
1-
__pycache__
1+
__pycache__/
22
*.pyc
33
.*.sw?
44
*.egg-info/
55
.venv/
66
.vscode/
7+
.pytest_cache
78
# output by example script
89
state.json

poetry.lock

+27-1
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pyproject.toml

+2
Original file line numberDiff line numberDiff line change
@@ -14,12 +14,14 @@ marshmallow_enum = "^1.5.1"
1414
PyOpenSSL = "^19.1.0"
1515
paho-mqtt = "^1.5.0"
1616
attrdict = "^2.0.1"
17+
deepmerge = "^0.1.0"
1718

1819
[tool.poetry.dev-dependencies]
1920
pytest = "^5.2"
2021
pylint = "^2.4.4"
2122
black = "^19.10b0"
2223
flake8 = "^3.8.1"
24+
rope = "^0.17.0"
2325

2426
[build-system]
2527
requires = ["poetry>=0.12"]

thinq2/controller/device.py

+18
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
from deepmerge import Merger
2+
13
from thinq2.schema import controller
24
from thinq2.util import memoize
35
from thinq2.client.thinq import ThinQClient
@@ -9,6 +11,22 @@
911
class ThinQDevice:
1012
def __init__(self, auth):
1113
self._auth = auth
14+
self._on_update = None
15+
16+
def update(self, state):
17+
schema = self.snapshot.Schema()
18+
snapshot = schema.dump(self.snapshot)
19+
update = self._merger.merge(snapshot, state)
20+
self.snapshot = schema.load(update)
21+
if self._on_update:
22+
self._on_update(self)
23+
24+
def on_update(self, func):
25+
self._on_update = func
26+
27+
@property
28+
def _merger(self):
29+
return Merger([(dict, ["merge"])], ["override"], ["override"])
1230

1331
@property
1432
def state(self):

thinq2/controller/mqtt.py

+19-4
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
from OpenSSL.SSL import FILETYPE_PEM
77
from paho.mqtt.client import Client
88

9-
from thinq2.model.config import MQTTConfiguration
9+
from thinq2.model.mqtt import MQTTConfiguration, MQTTMessage
1010
from thinq2.schema import controller, initializer
1111
from thinq2.client.thinq import ThinQClient
1212
from thinq2.client.common import CommonClient
@@ -22,22 +22,37 @@ def __init__(self, auth):
2222
self._auth = auth
2323

2424
def connect(self):
25-
endpoint = urlparse(self.route.mqtt_server)
26-
self.client.connect(endpoint.hostname, endpoint.port)
25+
if not self.client.is_connected():
26+
endpoint = urlparse(self.route.mqtt_server)
27+
self.client.connect(endpoint.hostname, endpoint.port)
2728

2829
def loop_start(self):
30+
self.connect()
2931
self.client.loop_start()
3032

3133
def loop_forever(self):
34+
self.connect()
3235
self.client.loop_forever()
3336

37+
def on_message(self, client, userdata, msg):
38+
self._on_message(client, userdata, msg)
39+
3440
def on_connect(self, client, userdata, flags, rc):
3541
for topic in self.registration.subscriptions:
3642
client.subscribe(topic, 1)
3743

38-
def on_message(self, client, userdata, msg):
44+
def on_device_message(self, message):
3945
pass
4046

47+
def _on_message(self, client, userdata, msg):
48+
# XXX - nastiness
49+
message = None
50+
try:
51+
message = MQTTMessage.Schema().loads(msg.payload)
52+
except Exception as e:
53+
print("Can't parse MQTT message:", e)
54+
self.on_device_message(message)
55+
4156
@property
4257
@memoize
4358
def client(self):

thinq2/controller/thinq.py

+22-1
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,37 @@
1+
import gc
2+
13
from thinq2.schema import controller
24
from thinq2.util import memoize
35
from thinq2.client.thinq import ThinQClient
46
from thinq2.controller.mqtt import ThinQMQTT
57
from thinq2.controller.auth import ThinQAuth
68
from thinq2.controller.device import ThinQDevice
79
from thinq2.model.config import ThinQConfiguration
10+
from thinq2.model.mqtt import MQTTMessage
811

912

1013
@controller(ThinQConfiguration)
1114
class ThinQ:
15+
16+
_devices = []
17+
1218
def get_device(self, device_id):
13-
return ThinQDevice(self.thinq_client.get_device(device_id), auth=self.auth)
19+
device = ThinQDevice(self.thinq_client.get_device(device_id), auth=self.auth)
20+
self._devices.append(device)
21+
return device
22+
23+
# XXX - temporary?
24+
def start(self):
25+
self.mqtt.on_device_message = self._notify_device
26+
self.mqtt.loop_forever()
27+
28+
def _notify_device(self, message: MQTTMessage):
29+
# XXX - ugly temporary PoC
30+
for device in self._devices:
31+
if len(gc.get_referrers(device)) <= 1:
32+
self._devices.remove(device)
33+
elif device.device_id == message.device_id:
34+
device.update(message.data.state.reported)
1435

1536
@property
1637
@memoize

thinq2/model/config.py

+1-11
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,7 @@
11
from marshmallow_dataclass import dataclass
22

33
from thinq2.model.auth import ThinQSession
4-
from thinq2.model.common import Route
5-
from thinq2.model.thinq import IOTRegistration
6-
7-
8-
@dataclass
9-
class MQTTConfiguration:
10-
route: Route
11-
registration: IOTRegistration
12-
ca_cert: str
13-
private_key: str
14-
csr: str
4+
from thinq2.model.mqtt import MQTTConfiguration
155

166

177
@dataclass

thinq2/model/mqtt.py

+23-2
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,36 @@
1+
from datetime import datetime
2+
from dataclasses import field
3+
14
from marshmallow_dataclass import dataclass
25

3-
from thinq2.model.auth import ThinQSession
46
from thinq2.model.common import Route
57
from thinq2.model.thinq import IOTRegistration
8+
from thinq2.schema import CamelCaseSchema
69

710

811
@dataclass
912
class MQTTConfiguration:
10-
auth: ThinQSession
1113
route: Route
1214
registration: IOTRegistration
1315
ca_cert: str
1416
private_key: str
1517
csr: str
18+
19+
20+
@dataclass
21+
class MQTTMessageDeviceState:
22+
desired: dict = field(default_factory=dict)
23+
reported: dict = field(default_factory=dict)
24+
25+
26+
@dataclass(base_schema=CamelCaseSchema)
27+
class MQTTMessageDeviceData:
28+
state: MQTTMessageDeviceState
29+
30+
31+
@dataclass(base_schema=CamelCaseSchema)
32+
class MQTTMessage:
33+
device_id: str
34+
message_type: str = field(metadata=dict(data_key="type"))
35+
data: MQTTMessageDeviceData
36+
timestamp: datetime = None

0 commit comments

Comments
 (0)