Skip to content

Commit 4311e2d

Browse files
CognitePusher metrics upload fix (#495)
* Fix CognitePusher to handle metrics using an upload queue
* Add/modify test cases
* Add cleanup for registry
* Add helper assertion methods and remove check for upload_queue attribute
* Fix review comments
* Use UUIDs for test_id
1 parent 0da864e commit 4311e2d

3 files changed

Lines changed: 410 additions & 63 deletions

File tree

cognite/extractorutils/metrics.py

Lines changed: 65 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -53,14 +53,12 @@ def __init__(self):
5353
from prometheus_client.exposition import basic_auth_handler, delete_from_gateway, pushadd_to_gateway
5454

5555
from cognite.client import CogniteClient
56-
from cognite.client.data_classes import Asset, Datapoints, DatapointsArray, TimeSeries
57-
from cognite.client.data_classes.data_modeling import NodeId
56+
from cognite.client.data_classes import Asset, TimeSeries
5857
from cognite.client.exceptions import CogniteDuplicatedError
5958
from cognite.extractorutils.threading import CancellationToken
59+
from cognite.extractorutils.uploader.time_series import DataPointList, TimeSeriesUploadQueue
6060
from cognite.extractorutils.util import EitherId
6161

62-
from .util import ensure_time_series
63-
6462
_metrics_singularities = {}
6563

6664

@@ -359,70 +357,104 @@ def __init__(
359357
self.asset = asset
360358
self.external_id_prefix = external_id_prefix
361359
self.data_set = data_set
360+
self._asset_id: int | None = None
361+
self._data_set_id: int | None = None
362362

363363
self._init_cdf()
364364

365+
self.upload_queue = TimeSeriesUploadQueue(
366+
cdf_client=cdf_client,
367+
create_missing=self._create_missing_timeseries_factory,
368+
data_set_id=self._data_set_id,
369+
cancellation_token=cancellation_token,
370+
)
371+
365372
self._cdf_project = cdf_client.config.project
366373

367374
def _init_cdf(self) -> None:
368375
"""
369-
Initialize the CDF tenant with the necessary time series and asset.
370-
"""
371-
time_series: list[TimeSeries] = []
376+
Initialize the CDF tenant with the necessary asset and dataset.
372377
378+
Timeseries are created automatically by TimeSeriesUploadQueue when datapoints are pushed.
379+
"""
373380
if self.asset is not None:
374-
# Ensure that asset exist, and retrieve internal ID
381+
# Ensure that asset exists, and retrieve internal ID
375382
asset: Asset | None
376383
try:
377384
asset = self.cdf_client.assets.create(self.asset)
378385
except CogniteDuplicatedError:
379386
asset = self.cdf_client.assets.retrieve(external_id=self.asset.external_id)
380387

381-
asset_id = asset.id if asset is not None else None
382-
383-
else:
384-
asset_id = None
388+
self._asset_id = asset.id if asset is not None else None
385389

386-
data_set_id = None
387390
if self.data_set:
388391
dataset = self.cdf_client.data_sets.retrieve(
389392
id=self.data_set.internal_id, external_id=self.data_set.external_id
390393
)
391394
if dataset:
392-
data_set_id = dataset.id
395+
self._data_set_id = dataset.id
393396

394-
for metric in REGISTRY.collect():
395-
if type(metric) is Metric and metric.type in ["gauge", "counter"]:
396-
external_id = self.external_id_prefix + metric.name
397+
def _create_missing_timeseries_factory(self, external_id: str, datapoints: DataPointList) -> TimeSeries:
398+
"""
399+
Factory function to create missing timeseries.
397400
398-
time_series.append(
399-
TimeSeries(
400-
external_id=external_id,
401-
name=metric.name,
402-
legacy_name=external_id,
403-
description=metric.documentation,
404-
asset_id=asset_id,
405-
data_set_id=data_set_id,
406-
)
407-
)
401+
Args:
402+
external_id: External ID of the timeseries to create
403+
datapoints: List of datapoints that triggered the creation
404+
405+
Returns:
406+
A TimeSeries object
407+
"""
408+
metric_name = external_id[len(self.external_id_prefix) :]
408409

409-
ensure_time_series(self.cdf_client, time_series)
410+
metric_description = ""
411+
for metric in REGISTRY.collect():
412+
if isinstance(metric, Metric) and metric.name == metric_name:
413+
metric_description = metric.documentation
414+
break
415+
416+
is_string = (
417+
isinstance(datapoints[0].get("value"), str)
418+
if isinstance(datapoints[0], dict)
419+
else isinstance(datapoints[0][1], str)
420+
)
421+
422+
return TimeSeries(
423+
external_id=external_id,
424+
name=metric_name,
425+
legacy_name=external_id,
426+
description=metric_description,
427+
asset_id=self._asset_id,
428+
data_set_id=self._data_set_id,
429+
is_string=is_string,
430+
)
410431

411432
def _push_to_server(self) -> None:
412433
"""
413-
Create datapoints an push them to their respective time series.
434+
Create datapoints and push them to their respective time series using TimeSeriesUploadQueue.
435+
436+
The queue will automatically create missing timeseries for late-registered metrics.
414437
"""
415438
timestamp = int(arrow.get().float_timestamp * 1000)
416439

417-
datapoints: list[dict[str, str | int | list[Any] | Datapoints | DatapointsArray | NodeId]] = []
418-
419440
for metric in REGISTRY.collect():
420441
if isinstance(metric, Metric) and metric.type in ["gauge", "counter"]:
421442
if len(metric.samples) == 0:
422443
continue
423444

424445
external_id = self.external_id_prefix + metric.name
425-
datapoints.append({"externalId": external_id, "datapoints": [(timestamp, metric.samples[0].value)]})
426446

427-
self.cdf_client.time_series.data.insert_multiple(datapoints)
447+
self.upload_queue.add_to_upload_queue(
448+
external_id=external_id, datapoints=[(timestamp, metric.samples[0].value)]
449+
)
450+
451+
self.upload_queue.upload()
428452
self.logger.debug("Pushed metrics to CDF tenant '%s'", self._cdf_project)
453+
454+
    def stop(self) -> None:
        """
        Stop the push loop and ensure all metrics are uploaded.

        Performs one final push so the latest metric values reach CDF before
        shutdown, then stops the upload queue and cancels the pusher's
        cancellation token to end the periodic push loop.
        """
        # Final push before shutdown so the most recent values are not lost.
        self._push_to_server()
        # NOTE(review): presumably the queue's stop() also flushes any still-
        # buffered datapoints — confirm against TimeSeriesUploadQueue.
        self.upload_queue.stop()
        self.cancellation_token.cancel()

0 commit comments

Comments
 (0)