-
Notifications
You must be signed in to change notification settings - Fork 17
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
[InfluxDB] Improve export adapter interfaces, and naming things
This patch sets the stage for more easily bringing in different database adapters when _exporting_ data.
- Loading branch information
Showing
11 changed files
with
255 additions
and
191 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,81 @@ | ||
# -*- coding: utf-8 -*- | ||
# (c) 2016-2023 Andreas Motl <[email protected]> | ||
import collections.abc | ||
from twisted.logger import Logger | ||
|
||
log = Logger() | ||
|
||
|
||
class DataFrameQuery: | ||
""" | ||
Query database, reshape result, and return as pandas DataFrame. | ||
""" | ||
|
||
def __init__(self, settings=None, bucket=None): | ||
self.settings = settings | ||
self.bucket = bucket | ||
self.request = bucket.request | ||
|
||
def query(self): | ||
|
||
bucket = self.bucket | ||
|
||
log.info("Creating database adapter") | ||
|
||
# Get a database adapter object. | ||
# TODO: Support multiple databases at the same time. | ||
# TODO: Pool adapter instances, keyed by database. | ||
if "influxdb" in self.settings: | ||
from kotori.daq.storage.influx import InfluxDBAdapter | ||
database = InfluxDBAdapter( | ||
settings=self.settings.influxdb, | ||
database=bucket.tdata.database, | ||
) | ||
else: | ||
log.warn("No time-series database configured") | ||
return | ||
|
||
# Get query expression from transformation dictionary. | ||
expression = bucket.tdata.expression | ||
log.info('Query expression: {expression}', expression=expression) | ||
|
||
# Run database query. | ||
result = database.query(expression, tdata=bucket.tdata) | ||
|
||
# Bring results into records format. | ||
# [{'time': '2020-03-10T03:29:42Z', 'humidity': 51.8, 'temperature': 25.26}] | ||
records = list(flatten(result)) | ||
|
||
# Stop when having no results. | ||
if not records: | ||
return | ||
|
||
# Bring column names in order, `time` should be the first column. | ||
columns = list(records[0].keys()) | ||
if 'time' in columns and columns.index('time') != 0: | ||
columns.remove('time') | ||
columns.insert(0, 'time') | ||
|
||
# Produce pandas DataFrame from database results. | ||
import pandas | ||
df = pandas.DataFrame(records, columns=columns) | ||
|
||
# Convert `time` column to a pandas datetime object. | ||
df['time'] = pandas.to_datetime(df['time']) | ||
|
||
return df | ||
|
||
|
||
def flatten(l): | ||
""" | ||
Flatten irregular/nested results. | ||
See also: https://stackoverflow.com/questions/21461140/flatten-an-irregular-list-of-lists-in-python-respecting-pandas-dataframes | ||
""" | ||
import pandas | ||
for el in l: | ||
if isinstance(el, collections.abc.Iterable) and not isinstance(el, (str, pandas.DataFrame, dict)): | ||
for sub in flatten(el): | ||
yield sub | ||
else: | ||
yield el |
This file was deleted.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,5 +1,6 @@ | ||
# -*- coding: utf-8 -*- | ||
# (c) 2016-2021 Andreas Motl <[email protected]> | ||
# (c) 2016-2023 Andreas Motl <[email protected]> | ||
import dataclasses | ||
import re | ||
import json | ||
import mimetypes | ||
|
@@ -48,61 +49,89 @@ def log(self, request): | |
log.debug(line) | ||
|
||
|
||
@dataclasses.dataclass | ||
class HttpServerAddress: | ||
""" | ||
Represent a typical host/port pair for configuring IP server listening addresses. | ||
Other than this, provide sensible factory and helper methods. | ||
""" | ||
host: str | ||
port: int | ||
|
||
@classmethod | ||
def from_settings(cls, settings): | ||
return cls(host=settings.kotori.http_listen, port=int(settings.kotori.http_port)) | ||
|
||
@property | ||
def combined(self): | ||
return f"{self.host}:{self.port}" | ||
|
||
@property | ||
def slug(self): | ||
return f"{self.host}-{self.port}" | ||
|
||
|
||
class HttpServerService(Service): | ||
""" | ||
Singleton instance of a Twisted service wrapping | ||
the Twisted TCP/HTTP server object "Site", in turn | ||
obtaining a ``HttpChannelContainer`` as root resource. | ||
A Twisted service for managing multiple Twisted TCP/HTTP `Site` server objects, | ||
and associating them with corresponding `HttpChannelContainer` root resources. | ||
""" | ||
|
||
_instance = None | ||
_instances = {} | ||
|
||
def __init__(self, settings): | ||
log.info(f"Initializing HttpServerService. settings={settings}") | ||
|
||
# Propagate global settings | ||
# Propagate global settings. | ||
self.settings = settings | ||
|
||
# Unique name of this service | ||
self.name = 'http-server-default' | ||
# Extract listen address settings. | ||
self.address = HttpServerAddress.from_settings(self.settings) | ||
|
||
# Root resource object representing a channel | ||
# Contains routing machinery | ||
# Assign a unique name to the Twisted service object. | ||
self.name = f'http-server-{self.address.slug}' | ||
|
||
# Assign a root resource object, representing | ||
# a channel containing the routing machinery. | ||
self.root = HttpChannelContainer(self.settings) | ||
|
||
# Forward route registration method to channel object | ||
# Forward route registration method to channel object. | ||
self.registerEndpoint = self.root.registerEndpoint | ||
|
||
def startService(self): | ||
""" | ||
Start TCP listener on designated HTTP port, | ||
serving ``HttpChannelContainer`` as root resource. | ||
Start TCP listener on designated HTTP port, serving a | ||
`HttpChannelContainer` as root resource. | ||
""" | ||
|
||
# Don't start service twice | ||
# Don't start service twice. | ||
if self.running == 1: | ||
return | ||
|
||
self.running = 1 | ||
|
||
# Prepare startup | ||
http_listen = self.settings.kotori.http_listen | ||
http_port = int(self.settings.kotori.http_port) | ||
log.info('Starting HTTP service on {http_listen}:{http_port}', http_listen=http_listen, http_port=http_port) | ||
# Prepare startup. | ||
log.info(f"Starting HTTP service on {self.address.combined}") | ||
|
||
# Configure root Site object and start listening to requests. | ||
# This must take place only once - can't bind to the same port multiple times! | ||
factory = LocalSite(self.root) | ||
reactor.listenTCP(http_port, factory, interface=http_listen) | ||
reactor.listenTCP(self.address.port, factory, interface=self.address.host) | ||
|
||
@classmethod | ||
def create(cls, settings): | ||
""" | ||
Singleton factory | ||
Factory method for creating `HttpServerService` instances. | ||
It makes sure to create only one instance per listening address, | ||
in order not to bind to the same port multiple times. | ||
""" | ||
if not cls._instance: | ||
cls._instance = HttpServerService(settings) | ||
cls._instance.startService() | ||
return cls._instance | ||
key = HttpServerAddress.from_settings(settings).combined | ||
if key not in cls._instances: | ||
instance = HttpServerService(settings) | ||
instance.startService() | ||
cls._instances[key] = instance | ||
return cls._instances[key] | ||
|
||
|
||
class HttpChannelContainer(Resource): | ||
|
Oops, something went wrong.