Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] [TTN] Add experimental TTS/TTN HTTP Webhook forwarder #132

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGES.rst
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ in progress
mean iterator type: *query.stringInterruptIterator`` or ``InfluxDB Error:
not executed``.
- Add TTS (The Things Stack) / TTN (The Things Network) decoder
- Add experimental TTS/TTN HTTP Webhook forwarder
``/api/mqttkit-1/ttn/itest-foo-bar/uplinks``


.. _kotori-0.27.0:
Expand Down
82 changes: 82 additions & 0 deletions etc/examples/forwarders/http-api-tts-ttn.ini
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
; ------------------------------------------
; Family: Protocol forwarder
; About: Versatile protocol forwarder components for bridging the gap between
; different data sinks, bus systems and serialization formats.
; ------------------------------------------
; Name: TTS/TTN-to-MQTT protocol forwarder
; About: Suitable for data acquisition via TTS/TTN Webhook HTTP POST requests.
; Channel: Transport: HTTP, MQTT; Format: JSON
; See also: https://getkotori.org/docs/handbook/forwarders/tts-ttn.html
; https://getkotori.org/docs/handbook/acquisition/protocol/http.html
; https://getkotori.org/docs/handbook/decoders/
; ------------------------------------------
; Description:
;
; - Listen to HTTP POST or PUT requests
; - Receive payloads formatted as JSON or in urlencoded format
; - Decode payloads using specific decoders
; - Forward payloads to the MQTT bus
;
; Manual: Please specify forwarding source and target parameters in URI format.
; The MQTT topic path is derived from the HTTP URI path by interpolating
; the appropriate part of the context URI.
;
; Example: In the example below, given the "address" part of the resource URI
; is "testdrive/area-42/node-1", data sent to the full URIs
;
; /api/mqttkit-1/ttn/{devID}
; /api/mqttkit-1/ttn/{devID}/uplinks
; /api/mqttkit-1/ttn/{devID}/join-accept
; /api/mqttkit-1/ttn/{devID}/downlink-ack
;
; with, e.g.::
;
; devID=mqttkit-1/testdrive/area-42/node-1
;
; will be republished to the MQTT topics
;
; mqttkit-1/testdrive/area-42/node-1/data.json
; mqttkit-1/testdrive/area-42/node-1/events.json
;
; ------------------------------------------


[mqttkit-1.http-api-tts-ttn.composite]
enable = false
type = application
realm = mqttkit-1
mqtt_topics = mqttkit-1/#
app_factory = kotori.daq.application.composite:boot
services = kotori.daq.services.mig:MqttInfluxGrafanaService
graphing = kotori.daq.graphing.grafana:GrafanaManager
strategy = kotori.daq.strategy.tts_ttn:TheThingsWanBusStrategy



[mqttkit-1.http-api-tts-ttn]
enable = true

type = application
application = kotori.io.protocol.forwarder:boot
#app_factory = kotori.daq.application.composite:boot

realm = mqttkit-1
source = http:/api/{realm:mqttkit-1}/ttn/{device_id:.*}/{slot:(uplinks|join-accept|downlink-ack)} [POST]
target = mqtt:/{realm}/{address}/ttn/{slot}.json
Comment on lines +64 to +65
Copy link
Member Author

@amotl amotl May 8, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Regarding our discussion over at TTS-/TTN-Daten an Kotori weiterleiten ff., whether to use a prefix like /apittn, or a suffix like /data-ttn, for signalling that it's a request from TTN, it looks like I've proposed a third variant here, using an infix like /ttn after the <realm> component. Interesting!

/cc @thiasB

Copy link
Member Author

@amotl amotl May 8, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The reason I don't fancy the prefix variant too much is that the addressing slot at the very front of the URL is usually managed by infrastructure outside of Kotori. For example, we are using Nginx to dispatch exactly two prefix routes to Kotori, that is /api, and /api-notls. In order to keep that interface concise and lean, I think we should not use this particular addressing slot for other purposes than the two main entrypoints it is currently used for.

Thinking more about the topic in general, I am leaning towards the idea that signalling special decoding needs should actually be pushed to the end of the URL instead, so maybe we will finally introduce query parameters for that purpose, instead of trying to squeeze those metadata into the URL path itself?

In this case, we would not need any special forwarding component with an accompanying pattern-based forwarding rule configuration like outlined above, but would just signal additional information to the (passive) decoder component instead, which would be able to take two routes of dispatching, based on this information:

  1. Apply decoding based on the "device_id in URL" variant.
  2. Apply decoding based on the "device_id in Payload" variant.

On the other hand, it may be better to let the operator decide, and configure TTN capabilities on an existing channel group by adding a special flag to the configuration snippet. I am not yet 100% sure about that detail, this is why I am bouncing on an off about it. Personally, in order to give more power to the users, I am leaning towards the hassle-free "works out of the box" variant, which does not need any special configuration by operators at all.
I am not sure if we can achieve this.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've diverted the discussion to GH-133. Thanks for listening.

transform = kotori.daq.strategy.tts_ttn:TheThingsWanBusStrategy.topic_to_topology


; ------------------------------------------------
; Notes about "source" and "target" parameters
; ------------------------------------------------

; Note that the "netloc" part (i.e. for specifying hostname/port)
; in directives "source" and "target" are omitted from these uris.
;
; Kotori will only listen to the default HTTP port and forward
; payloads to the default MQTT broker. Both are specified in the
; main configuration file, usually "/etc/kotori/kotori.ini".
;
; However, this might change in the future to enable spinning
; up HTTP listeners on arbitrary ports at runtime and to allow
; publishing messages to different MQTT brokers.
3 changes: 2 additions & 1 deletion etc/test/main.ini
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@
include =
etc/examples/mqttkit.ini,
etc/examples/forwarders/http-api-generic.ini,
etc/examples/forwarders/http-api-export.ini
etc/examples/forwarders/http-api-export.ini,
etc/examples/forwarders/http-api-tts-ttn.ini


; ==========================================
Expand Down
1 change: 1 addition & 0 deletions kotori/daq/decoder/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ def probe(self, payload: str = None):
return True

# TTS/TTN: The Things Stack / The Things Network
print("self.topology:", self.topology)
if self.topology.slot.endswith('data.json') \
and payload is not None \
and "uplink_message" in payload \
Expand Down
1 change: 1 addition & 0 deletions kotori/daq/services/mig.py
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,7 @@ def process_message(self, topic, payload, **kwargs):
def decode_message(self, topic, payload):

# Compute topology information from channel topic.
print("========== decode_message >>> topic_to_topology:", topic)
topology = self.strategy.topic_to_topology(topic)
log.debug(u'Topology address: {topology}', topology=dict(topology))

Expand Down
70 changes: 70 additions & 0 deletions kotori/daq/strategy/tts_ttn.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
# -*- coding: utf-8 -*-
# (c) 2022 Andreas Motl, <[email protected]>
import re

from munch import Munch

from kotori.daq.strategy.wan import WanBusStrategy
from kotori.util.common import SmartBunch


class TheThingsWanBusStrategy(WanBusStrategy):

# Regular expression pattern for decoding MQTT topic address segments.
#pattern = r'^(?P<realm>.+?)/ttn/(?P<device_id>.+?)(?:/(?P<slot>.+?))?$'
#matcher = re.compile(pattern)

@classmethod
def topic_to_topology(cls, topic):
"""
Decode MQTT topic segments implementing the »quadruple hierarchy strategy«.

The topology hierarchy is directly specified by the MQTT topic and is
made up of a minimum of four identifiers describing the core structure::

realm / network / gateway / node

The topology identifiers are specified as:

- "realm" is the designated root realm. You should prefix the topic name
with this label when opting in for all features of the telemetry platform.
For other purposes, feel free to publish to any MQTT topic you like.

- "network" is your personal realm. Choose anything you like or use an
`Online GUID Generator <https://www.guidgenerator.com/>`_ to gain
maximum uniqueness.

- "gateway" is your gateway identifier. Choose anything you like.
This does not have to be very unique, so you might use labels
having the names of sites. While you are the owner of this
namespace hierarchy, remember these labels might be visible on
the collaborative ether, though. You might want to assign nicknames
to your sites to not identify their location.

- "node" is the node identifier. Choose anything you like. This usually
gets transmitted from an embedded device node.
"""

print("########## TOPIC:", topic)

# Munch({'realm': 'mqttkit-1', 'device_id': 'itest-foo-bar', 'slot': 'uplinks'})
assert isinstance(topic, Munch)
assert topic.realm
assert topic.device_id
assert topic.slot

# {'realm': 'mqttkit-1', 'network': 'itest', 'gateway': 'foo', 'node': 'bar', 'slot': 'data.json'}
address = SmartBunch(
realm=topic.realm,

)


# Decode the topic.
m = cls.matcher.match(topic)
if m:
address = SmartBunch(m.groupdict())
else:
address = {}

return address
2 changes: 2 additions & 0 deletions kotori/daq/strategy/wan.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ def topic_to_topology(self, topic):
gets transmitted from an embedded device node.
"""

print("########## TOPIC:", topic)

# Decode the topic.
m = self.matcher.match(topic)
if m:
Expand Down
3 changes: 3 additions & 0 deletions kotori/io/protocol/forwarder.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,19 +115,22 @@ def forward(self, bucket):
Receive data bucket from source, run through
transformation machinery and emit to target.
"""
print("bucket:", bucket)

# 1. Map/transform topology address information
if 'transform' in self.channel:
for entrypoint in read_list(self.channel.transform):
try:
transformer = KotoriBootloader.load_entrypoint(entrypoint)
print("transformer:", transformer)
bucket.tdata.update(transformer(bucket.tdata))
except ImportError as ex:
log.error('ImportError "{message}" when loading entrypoint "{entrypoint}"',
entrypoint=entrypoint, message=ex)

# MQTT doesn't prefer leading forward slashes with topic names, let's get rid of them
target_uri_tpl = self.target_uri.path.lstrip('/')
print("target_uri_tpl:", target_uri_tpl)

# Compute target bus topic from url matches
target_uri = target_uri_tpl.format(**bucket.tdata)
Expand Down
1 change: 1 addition & 0 deletions test/settings/mqttkit.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ class TestSettings:
channel2_path_data = '/mqttkit-1/itest/foo/bar2/data'
channel_path_event = '/mqttkit-1/itest/foo/bar/event'
channel_path_airrohr = '/mqttkit-1/itest/foo/bar/custom/airrohr'
channel_path_ttn = '/mqttkit-1/ttn'


settings = TestSettings
Expand Down
32 changes: 29 additions & 3 deletions test/test_tts_ttn.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,7 @@ def make_testcases():
@pytest.mark.tts
@pytest.mark.ttn
@pytest.mark.parametrize("testcase", make_testcases())
def test_tts_ttn_http_json_full(
testcase, machinery_basic, create_influxdb, reset_influxdb
):
def test_tts_ttn_http_json_decoder(testcase, machinery_basic, create_influxdb, reset_influxdb):
"""
Submit single reading in TTS/TTN webhook JSON format to HTTP API,
and verify it was correctly stored in the InfluxDB database.
Expand All @@ -95,3 +93,31 @@ def test_tts_ttn_http_json_full(
# Verify the records looks like expected.
assert record == data_out
yield record


@pytest_twisted.inlineCallbacks
@pytest.mark.http
@pytest.mark.tts
@pytest.mark.ttn
@pytest.mark.amo
def test_tts_ttn_http_json_forwarder(machinery, create_influxdb, reset_influxdb):
"""
Accept all requests to the `/api/ttn` URL suffix in TTS/TTN webhook JSON format
and proof it is stored in the InfluxDB database.
"""

from test.settings.mqttkit import settings as mqttkit_settings

# Submit a single measurement, without timestamp.
baseurl = mqttkit_settings.channel_path_ttn
device_id = "itest-foo-bar"
yield threads.deferToThread(http_json_sensor, f"{baseurl}/{device_id}/uplinks", data_in)
Copy link
Member Author

@amotl amotl May 8, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

With, for example, baseurl=/mqttkit-1/ttn, and device_id=itest-foo-bar, a full URL would boil down to:

https://daq.example.org/api/mqttkit-1/ttn/itest-foo-bar/uplinks

It does not look too bad from the outside, right? In this case, we omit the device's <realm> address component from its device_id identifier, to make it a bit shorter, and to better reflect the fact that it is only those address components the users may change on their own behalves.

In TTN parlance, using {/devID} to interpolate the device_id into the URL, that would mean to use a configuration setting like:

https://daq.example.org/api/mqttkit-1/ttn{/devID}/uplinks

realm  .....................^^
special identifier  ..................^^
device id (interpolated)  [1] ............^^
another special identifier [2] ...................^^

However, I still will need to explore if it can be implemented like that. Apparently, I got blocked by something the last time I was working on it, and then lost track of it.

[1] As we discussed already, there needs to be code which expands that device_id into a full topology path, like:

   mqttkit-1/ttn/itest-foo-bar
-> mqttkit-1/itest/foo/bar

I think the channel configuration snippet outlined above at #132 (review) tries to implement that transformation.

[2] I don't know yet, if this second special identifier will be needed for the final implementation. I don't favor it too much, but maybe it makes sense?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've diverted the discussion to GH-133. Thanks for listening.


# Wait for some time to process the message.
yield sleep(PROCESS_DELAY_MQTT)
yield sleep(PROCESS_DELAY_MQTT)

# Proof that data arrived in InfluxDB properly.
record = influx_sensors.get_first_record()
assert record == data_out
yield record