From a22da7cc299b5129af56118b72f5f8294a99360e Mon Sep 17 00:00:00 2001 From: Andreas Motl Date: Sun, 18 Jun 2023 17:27:55 +0200 Subject: [PATCH] [InfluxDB] Improve export adapter interfaces, and naming things This patch sets the stage for more easily bringing in different database adapters when _exporting_ data. --- kotori/daq/storage/influx.py | 10 +++- kotori/io/export/database.py | 81 +++++++++++++++++++++++++++++++++ kotori/io/export/influx.py | 73 ----------------------------- kotori/io/protocol/forwarder.py | 2 +- kotori/io/protocol/http.py | 77 +++++++++++++++++++++---------- kotori/io/protocol/influx.py | 75 +----------------------------- kotori/io/protocol/target.py | 2 +- kotori/io/protocol/util.py | 74 +++++++++++++++++++++++++++++- pyproject.toml | 5 ++ test/test_export.py | 34 ++++++++++---- test/util.py | 13 +++--- 11 files changed, 255 insertions(+), 191 deletions(-) create mode 100644 kotori/io/export/database.py delete mode 100644 kotori/io/export/influx.py diff --git a/kotori/daq/storage/influx.py b/kotori/daq/storage/influx.py index 074c4703..2d32bf9f 100644 --- a/kotori/daq/storage/influx.py +++ b/kotori/daq/storage/influx.py @@ -4,6 +4,8 @@ from copy import deepcopy from funcy import project from collections import OrderedDict + +from munch import Munch from twisted.logger import Logger from influxdb.client import InfluxDBClient, InfluxDBClientError @@ -12,7 +14,7 @@ log = Logger() -class InfluxDBAdapter(object): +class InfluxDBAdapter: def __init__(self, settings=None, database=None): @@ -64,6 +66,10 @@ def is_udp_database(self, name): return True return False + def query(self, expression: str, tdata: Munch = None): + log.info(f"Database query: {expression}") + return self.influx_client.query(expression) + def write(self, meta, data): meta_copy = deepcopy(dict(meta)) @@ -122,7 +128,7 @@ def get_tags(data): return project(data, ['gateway', 'node']) -class BusInfluxForwarder(object): +class BusInfluxForwarder: # pragma: nocover """ Generic software bus -> influxdb forwarder based on prototypic implementation at HiveEyes TODO: Generalize and refactor diff --git a/kotori/io/export/database.py b/kotori/io/export/database.py new file mode 100644 index 00000000..dea6ff83 --- /dev/null +++ b/kotori/io/export/database.py @@ -0,0 +1,81 @@ +# -*- coding: utf-8 -*- +# (c) 2016-2023 Andreas Motl +import collections.abc +from twisted.logger import Logger + +log = Logger() + + +class DataFrameQuery: + """ + Query database, reshape result, and return as pandas DataFrame. + """ + + def __init__(self, settings=None, bucket=None): + self.settings = settings + self.bucket = bucket + self.request = bucket.request + + def query(self): + + bucket = self.bucket + + log.info("Creating database adapter") + + # Get a database adapter object. + # TODO: Support multiple databases at the same time. + # TODO: Pool adapter instances, keyed by database. + if "influxdb" in self.settings: + from kotori.daq.storage.influx import InfluxDBAdapter + database = InfluxDBAdapter( + settings=self.settings.influxdb, + database=bucket.tdata.database, + ) + else: + log.warn("No time-series database configured") + return + + # Get query expression from transformation dictionary. + expression = bucket.tdata.expression + log.info('Query expression: {expression}', expression=expression) + + # Run database query. + result = database.query(expression, tdata=bucket.tdata) + + # Bring results into records format. + # [{'time': '2020-03-10T03:29:42Z', 'humidity': 51.8, 'temperature': 25.26}] + records = list(flatten(result)) + + # Stop when having no results. + if not records: + return + + # Bring column names in order, `time` should be the first column. + columns = list(records[0].keys()) + if 'time' in columns and columns.index('time') != 0: + columns.remove('time') + columns.insert(0, 'time') + + # Produce pandas DataFrame from database results. + import pandas + df = pandas.DataFrame(records, columns=columns) + + # Convert `time` column to a pandas datetime object. + df['time'] = pandas.to_datetime(df['time']) + + return df + + +def flatten(l): + """ + Flatten irregular/nested results. + + See also: https://stackoverflow.com/questions/21461140/flatten-an-irregular-list-of-lists-in-python-respecting-pandas-dataframes + """ + import pandas + for el in l: + if isinstance(el, collections.abc.Iterable) and not isinstance(el, (str, pandas.DataFrame, dict)): + for sub in flatten(el): + yield sub + else: + yield el diff --git a/kotori/io/export/influx.py b/kotori/io/export/influx.py deleted file mode 100644 index 12696aa8..00000000 --- a/kotori/io/export/influx.py +++ /dev/null @@ -1,73 +0,0 @@ -# -*- coding: utf-8 -*- -# (c) 2016-2021 Andreas Motl -import collections.abc -from twisted.logger import Logger -from kotori.daq.storage.influx import InfluxDBAdapter - -log = Logger() - - -class DataFrameQuery(object): - """ - Query InfluxDB, massage result and return as DataFrame. - """ - - def __init__(self, settings=None, bucket=None): - self.settings = settings - self.bucket = bucket - self.request = bucket.request - - def query(self): - - bucket = self.bucket - - # Get an InfluxDB adapter object - # TODO: Pool these, keyed by database. - influx = InfluxDBAdapter( - settings=self.settings.influxdb, - database=bucket.tdata.database) - - # Get query expression from transformation dictionary - expression = bucket.tdata.expression - - log.info('InfluxDB expression: {expression}', expression=expression) - - # Run database query - result = influx.influx_client.query(expression) - - # Massage result format - entries = list(flatten(result)) - - # Stop when having no results - if not entries: - return - - # Bring column names in order, "time" should be the first column - columns = list(entries[0].keys()) - if 'time' in columns: - columns.remove('time') - columns.insert(0, 'time') - - # Make pandas DataFrame from database results - import pandas - df = pandas.DataFrame(entries, columns=columns) - - # Convert "time" column to datetime format - df['time'] = pandas.to_datetime(df['time']) - - return df - - -def flatten(l): - """ - Munge InfluxDB results. - - See also: https://stackoverflow.com/questions/21461140/flatten-an-irregular-list-of-lists-in-python-respecting-pandas-dataframes - """ - import pandas - for el in l: - if isinstance(el, collections.abc.Iterable) and not isinstance(el, (str, pandas.DataFrame, dict)): - for sub in flatten(el): - yield sub - else: - yield el diff --git a/kotori/io/protocol/forwarder.py b/kotori/io/protocol/forwarder.py index 1b693ba3..d2e87ae6 100644 --- a/kotori/io/protocol/forwarder.py +++ b/kotori/io/protocol/forwarder.py @@ -58,7 +58,7 @@ def __init__(self, channel=None): def setupService(self): #self.log(log.info, u'Setting up') - log.info(u'Starting {name}'.format(name=self.logname)) + log.info(u'Starting ProtocolForwarderService: {name}'.format(name=self.logname)) self.settings = self.parent.settings diff --git a/kotori/io/protocol/http.py b/kotori/io/protocol/http.py index 62ff15c1..729980f0 100644 --- a/kotori/io/protocol/http.py +++ b/kotori/io/protocol/http.py @@ -1,5 +1,6 @@ # -*- coding: utf-8 -*- -# (c) 2016-2021 Andreas Motl +# (c) 2016-2023 Andreas Motl +import dataclasses import re import json import mimetypes @@ -48,61 +49,89 @@ def log(self, request): log.debug(line) +@dataclasses.dataclass +class HttpServerAddress: + """ + Represent a typical host/port pair for configuring IP server listening addresses. + Other than this, provide sensible factory and helper methods. + """ + host: str + port: int + + @classmethod + def from_settings(cls, settings): + return cls(host=settings.kotori.http_listen, port=int(settings.kotori.http_port)) + + @property + def combined(self): + return f"{self.host}:{self.port}" + + @property + def slug(self): + return f"{self.host}-{self.port}" + + class HttpServerService(Service): """ - Singleton instance of a Twisted service wrapping - the Twisted TCP/HTTP server object "Site", in turn - obtaining a ``HttpChannelContainer`` as root resource. + A Twisted service for managing multiple Twisted TCP/HTTP `Site` server objects, + and associating them with corresponding `HttpChannelContainer` root resources. """ - _instance = None + _instances = {} def __init__(self, settings): + log.info(f"Initializing HttpServerService. settings={settings}") - # Propagate global settings + # Propagate global settings. self.settings = settings - # Unique name of this service - self.name = 'http-server-default' + # Extract listen address settings. + self.address = HttpServerAddress.from_settings(self.settings) - # Root resource object representing a channel - # Contains routing machinery + # Assign a unique name to the Twisted service object. + self.name = f'http-server-{self.address.slug}' + + # Assign a root resource object, representing + # a channel containing the routing machinery. self.root = HttpChannelContainer(self.settings) - # Forward route registration method to channel object + # Forward route registration method to channel object. self.registerEndpoint = self.root.registerEndpoint def startService(self): """ - Start TCP listener on designated HTTP port, - serving ``HttpChannelContainer`` as root resource. + Start TCP listener on designated HTTP port, serving a + `HttpChannelContainer` as root resource. """ - # Don't start service twice + # Don't start service twice. if self.running == 1: return self.running = 1 - # Prepare startup - http_listen = self.settings.kotori.http_listen - http_port = int(self.settings.kotori.http_port) - log.info('Starting HTTP service on {http_listen}:{http_port}', http_listen=http_listen, http_port=http_port) + # Prepare startup. + log.info(f"Starting HTTP service on {self.address.combined}") # Configure root Site object and start listening to requests. # This must take place only once - can't bind to the same port multiple times! factory = LocalSite(self.root) - reactor.listenTCP(http_port, factory, interface=http_listen) + reactor.listenTCP(self.address.port, factory, interface=self.address.host) @classmethod def create(cls, settings): """ - Singleton factory + Factory method for creating `HttpServerService` instances. + + It makes sure to create only one instance per listening address, + in order not to bind to the same port multiple times. """ - if not cls._instance: - cls._instance = HttpServerService(settings) - cls._instance.startService() - return cls._instance + key = HttpServerAddress.from_settings(settings).combined + if key not in cls._instances: + instance = HttpServerService(settings) + instance.startService() + cls._instances[key] = instance + return cls._instances[key] class HttpChannelContainer(Resource): diff --git a/kotori/io/protocol/influx.py b/kotori/io/protocol/influx.py index 6be2a830..3aaad40c 100644 --- a/kotori/io/protocol/influx.py +++ b/kotori/io/protocol/influx.py @@ -1,16 +1,12 @@ # -*- coding: utf-8 -*- # (c) 2016-2021 Andreas Motl -import arrow -from arrow.parser import DateTimeParser -from datetime import datetime, timedelta from twisted.logger import Logger -from kotori.io.protocol.util import is_number -from kotori.util.common import tdelta +from kotori.io.protocol.util import compute_daterange, is_number log = Logger() -class QueryTransformer(object): +class QueryTransformer: @classmethod def transform(cls, data): @@ -55,70 +51,3 @@ def transform(cls, data): } return result - - -def get_timedelta(expression): - - # TODO: Use pandas' Timedelta. Timedelta('1m2s') - # http://pandas.pydata.org/pandas-docs/stable/timedeltas.html - - # FIXME: Sanitize expression - code = expression - delta_raw = code.replace('now-', '') - if code != delta_raw: - code = code.replace(delta_raw, 'delta') - - # "code" should now be "now-delta" - #print 'code:', code - - now = datetime.utcnow() - delta = tdelta(delta_raw) - - # FIXME: This is nasty - try: - td = eval(code) - except: - raise ValueError('Unknown expression: {expression}'.format(expression=expression)) - - return td - - -def compute_daterange(raw_begin, raw_end): - - # Defaults - raw_begin = raw_begin or 'now-10d' - raw_end = raw_end or 'now' - - # Parse dates, absolute or relative - time_begin = grok_datetime(raw_begin) - time_end = grok_datetime(raw_end) - - # If end of date range is supplied as date without time ('YYYY-MM-DD' or 'YYYYMMDD'), - # add appropriate offset to mean "end of day" (DWIM). - if 8 <= len(raw_end) <= 10: - offset_endofday = tdelta('23h59m59s') + timedelta(microseconds = 999999) - time_end += offset_endofday - - return time_begin, time_end - - -def grok_datetime(dstring): - more_formats = ['YYYYMMDDTHHmmss', 'YYYYMMDDTHHmmssZ', 'YYYYMMDD'] - parser = DateTimeParser() - parser.SEPARATORS += [''] - - # Try to parse datetime string in regular ISO 8601 format - try: - return parser.parse_iso(dstring) - - # Fall back to parse datetime string in additional convenience formats - except arrow.parser.ParserError as ex: - for format in more_formats: - try: - return parser.parse(dstring, format) - except arrow.parser.ParserError as ex: - pass - - # Fall back to attempt to parse as relative datetime expression, e.g. "now-10m" - return get_timedelta(dstring) - diff --git a/kotori/io/protocol/target.py b/kotori/io/protocol/target.py index f38f982d..56f3abd8 100644 --- a/kotori/io/protocol/target.py +++ b/kotori/io/protocol/target.py @@ -15,7 +15,7 @@ log = Logger() try: - from kotori.io.export.influx import DataFrameQuery + from kotori.io.export.database import DataFrameQuery except ImportError: log.failure('InfluxDB export not available, please install "pandas".', level=LogLevel.warn) diff --git a/kotori/io/protocol/util.py b/kotori/io/protocol/util.py index c16ca336..a851fdb2 100644 --- a/kotori/io/protocol/util.py +++ b/kotori/io/protocol/util.py @@ -1,8 +1,12 @@ # -*- coding: utf-8 -*- -# (c) 2016-2021 Andreas Motl +# (c) 2016-2023 Andreas Motl import math +from datetime import timedelta, datetime + import arrow import datetime + +from arrow.parser import DateTimeParser from six import text_type from dateutil.tz import gettz from dateutil.parser import parse @@ -13,6 +17,8 @@ from twisted.python.failure import Failure from twisted.web.error import Error +from kotori.util.common import tdelta + log = Logger() @@ -215,3 +221,69 @@ def parse_timestamp(timestamp): timestamp = parse(timestamp, tzinfos=tzinfos) return timestamp + + +def compute_daterange(raw_begin, raw_end): + + # Defaults + raw_begin = raw_begin or 'now-10d' + raw_end = raw_end or 'now' + + # Parse dates, absolute or relative + time_begin = grok_datetime(raw_begin) + time_end = grok_datetime(raw_end) + + # If end of date range is supplied as date without time ('YYYY-MM-DD' or 'YYYYMMDD'), + # add appropriate offset to mean "end of day" (DWIM). + if 8 <= len(raw_end) <= 10: + offset_endofday = tdelta('23h59m59s') + timedelta(microseconds = 999999) + time_end += offset_endofday + + return time_begin, time_end + + +def grok_datetime(dstring): + more_formats = ['YYYYMMDDTHHmmss', 'YYYYMMDDTHHmmssZ', 'YYYYMMDD'] + parser = DateTimeParser() + parser.SEPARATORS += [''] + + # Try to parse datetime string in regular ISO 8601 format + try: + return parser.parse_iso(dstring) + + # Fall back to parse datetime string in additional convenience formats + except arrow.parser.ParserError as ex: + for format in more_formats: + try: + return parser.parse(dstring, format) + except arrow.parser.ParserError as ex: + pass + + # Fall back to attempt to parse as relative datetime expression, e.g. "now-10m" + return get_timedelta(dstring) + + +def get_timedelta(expression): + + # TODO: Use pandas' Timedelta. Timedelta('1m2s') + # http://pandas.pydata.org/pandas-docs/stable/timedeltas.html + + # FIXME: Sanitize expression + code = expression + delta_raw = code.replace('now-', '') + if code != delta_raw: + code = code.replace(delta_raw, 'delta') + + # "code" should now be "now-delta" + #print 'code:', code + + now = datetime.utcnow() + delta = tdelta(delta_raw) + + # FIXME: This is nasty + try: + td = eval(code) + except: + raise ValueError('Unknown expression: {expression}'.format(expression=expression)) + + return td diff --git a/pyproject.toml b/pyproject.toml index 9c250b0c..f61155d5 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -11,7 +11,12 @@ source = ["kotori"] fail_under = 0 show_missing = true omit = [ + "kotori/daq/intercom/c.py", + "kotori/daq/intercom/pyclibrary_ext/*", + "kotori/daq/intercom/udp.py", + "kotori/daq/intercom/wamp.py", "kotori/firmware/*", + "kotori/frontend/*", "kotori/vendor/hydro2motion/*", "kotori/vendor/ilaundry/*", "kotori/vendor/luftdaten/*", diff --git a/test/test_export.py b/test/test_export.py index 4de27c1d..519b632e 100644 --- a/test/test_export.py +++ b/test/test_export.py @@ -1,5 +1,6 @@ # -*- coding: utf-8 -*- -# (c) 2020-2022 Andreas Motl +# (c) 2020-2023 Andreas Motl +import functools import json import logging @@ -25,19 +26,32 @@ @pytest_twisted.inlineCallbacks @pytest.mark.http @pytest.mark.export -def test_export_general(machinery, create_influxdb, reset_influxdb): +@pytest.mark.influxdb +def test_export_influxdb_general(machinery, create_influxdb, reset_influxdb): """ Submit single reading in JSON format to HTTP API and proof it can be retrieved back from the HTTP API in different formats. + + This uses InfluxDB as timeseries database. """ + channel_path = settings.channel_path_data + http_submit = functools.partial(http_json_sensor, port=24642) + http_fetch = functools.partial(http_get_data, port=24642) + + yield verify_export_general(channel_path, http_submit, http_fetch) + + +@pytest_twisted.inlineCallbacks +def verify_export_general(channel_path, http_submit, http_fetch): + # Submit a single measurement, with timestamp. data = { 'time': 1583810982, 'temperature': 25.26, 'humidity': 51.8, } - yield threads.deferToThread(http_json_sensor, settings.channel_path_data, data) + yield threads.deferToThread(http_submit, channel_path, data) # Wait for some time to process the message. yield sleep(PROCESS_DELAY_MQTT) @@ -46,31 +60,31 @@ def test_export_general(machinery, create_influxdb, reset_influxdb): df_should = pd.DataFrame([[pd.to_datetime("2020-03-10T03:29:42.000000Z"), 51.8, 25.26]], columns=["time", "humidity", "temperature"]) # CSV format. - deferred = threads.deferToThread(http_get_data, settings.channel_path_data, format='csv', ts_from=ts_from, ts_to=ts_to) + deferred = threads.deferToThread(http_fetch, channel_path, format='csv', ts_from=ts_from, ts_to=ts_to) yield deferred result = pd.read_csv(io.StringIO(deferred.result), parse_dates=["time"]) assert_frame_equal(result, df_should, check_names=False, check_like=True, check_datetimelike_compat=True) # TXT format (same as CSV). - deferred = threads.deferToThread(http_get_data, settings.channel_path_data, format='txt', ts_from=ts_from, ts_to=ts_to) + deferred = threads.deferToThread(http_fetch, channel_path, format='txt', ts_from=ts_from, ts_to=ts_to) yield deferred result = pd.read_csv(io.StringIO(deferred.result), parse_dates=["time"]) assert_frame_equal(result, df_should, check_names=False, check_like=True, check_datetimelike_compat=True) # JSON format. - deferred = threads.deferToThread(http_get_data, settings.channel_path_data, format='json', ts_from=ts_from, ts_to=ts_to) + deferred = threads.deferToThread(http_fetch, channel_path, format='json', ts_from=ts_from, ts_to=ts_to) yield deferred result = json.loads(deferred.result) should = [{"time": "2020-03-10T03:29:42.000Z", "humidity": 51.8, "temperature": 25.26}] assert_equal(result, should) # XLSX format. - deferred = threads.deferToThread(http_get_data, settings.channel_path_data, format='xlsx', ts_from=ts_from, ts_to=ts_to) + deferred = threads.deferToThread(http_fetch, channel_path, format='xlsx', ts_from=ts_from, ts_to=ts_to) yield deferred assert deferred.result.startswith(b'PK\x03\x04\x14\x00\x00\x00\x08\x00') # HTML format. - deferred = threads.deferToThread(http_get_data, settings.channel_path_data, format='html', ts_from=ts_from, ts_to=ts_to) + deferred = threads.deferToThread(http_fetch, channel_path, format='html', ts_from=ts_from, ts_to=ts_to) yield deferred assert \ '' in deferred.result and \ @@ -80,12 +94,12 @@ def test_export_general(machinery, create_influxdb, reset_influxdb): '51.8' in deferred.result # NetCDF format. - deferred = threads.deferToThread(http_get_data, settings.channel_path_data, format='nc', ts_from=ts_from, ts_to=ts_to) + deferred = threads.deferToThread(http_fetch, channel_path, format='nc', ts_from=ts_from, ts_to=ts_to) yield deferred assert deferred.result.startswith(b'\x89HDF\r\n\x1a\n\x02\x08\x08\x00\x00\x00') # Datatables HTML. - deferred = threads.deferToThread(http_get_data, settings.channel_path_data, format='dt', ts_from=ts_from, ts_to=ts_to) + deferred = threads.deferToThread(http_fetch, channel_path, format='dt', ts_from=ts_from, ts_to=ts_to) yield deferred assert b"cdn.datatables.net" in deferred.result diff --git a/test/util.py b/test/util.py index ebeaef47..83f75e9c 100644 --- a/test/util.py +++ b/test/util.py @@ -1,5 +1,5 @@ # -*- coding: utf-8 -*- -# (c) 2020-2021 Andreas Motl +# (c) 2020-2023 Andreas Motl import os import json import logging @@ -234,8 +234,9 @@ def http_raw(topic, headers=None, json=None, data=None): return requests.post(uri, headers=headers, json=json, data=data) -def http_json_sensor(topic, data): - uri = 'http://localhost:24642/api{}'.format(topic) +def http_json_sensor(path: str, data, port=24642): + path = path.lstrip("/") + uri = f'http://localhost:{port}/api/{path}' logger.info('HTTP: Submitting reading to {} using JSON'.format(uri)) return requests.post(uri, json=data) @@ -255,9 +256,9 @@ def http_csv_sensor(topic, data): return requests.post(uri, data=body, headers={'Content-Type': 'text/csv'}) -def http_get_data(topic=None, format='csv', ts_from=None, ts_to=None): - uri = 'http://localhost:24642/api{topic}.{format}?from={ts_from}&to={ts_to}'.format( - topic=topic, format=format, ts_from=ts_from, ts_to=ts_to) +def http_get_data(path: str = None, format='csv', ts_from=None, ts_to=None, port=24642): + path = path.lstrip("/") + uri = f'http://localhost:{port}/api/{path}.{format}?from={ts_from}&to={ts_to}' logger.info('HTTP: Exporting data from {} using format "{}"'.format(uri, format)) payload = requests.get(uri).content if format in ["csv", "txt", "json", "html"]: