Skip to content

Commit

Permalink
feat: Add more metrics and error log for observability publisher (#598)
Browse files Browse the repository at this point in the history
<!--  Thanks for sending a pull request!  Here are some tips for you:

1. Run unit tests and ensure that they are passing
2. If your change introduces any API changes, make sure to update the
e2e tests
3. Make sure documentation is updated for your PR!

-->
# Description
<!-- Briefly describe the motivation for the change. Please include
illustrations where appropriate. -->
Add kafka lag metrics and error log for easier troubleshoot when issue
arise regarding observability publisher
# Modifications
<!-- Summarize the key code changes. -->
* Add scheduled daemon to calculate lag for kafka consumer 
* Add more error log during failure
* Add container port for observability publisher deployment so it can be
scraped by the prometheus agent
# Tests
<!-- Besides the existing / updated automated tests, what specific
scenarios should be tested? Consider the backward compatibility of the
changes, whether corner cases are covered, etc. Please describe the
tests and check the ones that have been completed. Eg:
- [x] Deploying new and existing standard models
- [ ] Deploying PyFunc models
-->

# Checklist
- [ ] Added PR label
- [ ] Added unit test, integration, and/or e2e tests
- [ ] Tested locally
- [ ] Updated documentation
- [ ] Update Swagger spec if the PR introduce API changes
- [ ] Regenerated Golang and Python client if the PR introduces API
changes

# Release Notes
<!--
Does this PR introduce a user-facing change?
If no, just write "NONE" in the release-note block below.
If yes, a release note is required. Enter your extended release note in
the block below.
If the PR requires additional action from users switching to the new
release, include the string "action required".

For more information about release notes, see kubernetes' guide here:
http://git.k8s.io/community/contributors/guide/release-notes.md
-->

```release-note

```
  • Loading branch information
tiopramayudi authored Jul 31, 2024
1 parent 8c4930d commit 24d79eb
Show file tree
Hide file tree
Showing 6 changed files with 103 additions and 17 deletions.
13 changes: 10 additions & 3 deletions api/pkg/observability/deployment/deployment.go
Original file line number Diff line number Diff line change
Expand Up @@ -316,7 +316,7 @@ func (c *deployer) applyDeployment(ctx context.Context, data *models.WorkerData,
deploymentV1 := appV1.Deployments(c.targetNamespace())

applyDeploymentFunc := func(data *models.WorkerData, secretName string, isExistingDeployment bool) (*appsv1.Deployment, error) {
deployment, err := c.createDeploymentSpec(ctx, data, secretName)
deployment, err := c.createDeploymentSpec(data, secretName)
if err != nil {
return nil, err
}
Expand All @@ -342,7 +342,7 @@ func (c *deployer) getLabels(data *models.WorkerData) map[string]string {
return labels
}

func (c *deployer) createDeploymentSpec(ctx context.Context, data *models.WorkerData, secretName string) (*appsv1.Deployment, error) {
func (c *deployer) createDeploymentSpec(data *models.WorkerData, secretName string) (*appsv1.Deployment, error) {
labels := c.getLabels(data)

cfgVolName := "config-volume"
Expand Down Expand Up @@ -394,14 +394,21 @@ func (c *deployer) createDeploymentSpec(ctx context.Context, data *models.Worker
ReadOnly: true,
},
},
Ports: []corev1.ContainerPort{
{
Name: "prom-metric",
ContainerPort: 8000,
Protocol: corev1.ProtocolTCP,
},
},
},
},
Volumes: []corev1.Volume{
{
Name: cfgVolName,
VolumeSource: corev1.VolumeSource{
Secret: &corev1.SecretVolumeSource{
SecretName: c.getSecretName(data),
SecretName: secretName,
},
},
},
Expand Down
7 changes: 7 additions & 0 deletions api/pkg/observability/deployment/deployment_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,13 @@ func createDeploymentSpec(data *models.WorkerData, resourceRequest corev1.Resour
ReadOnly: true,
},
},
Ports: []corev1.ContainerPort{
{
Name: "prom-metric",
ContainerPort: 8000,
Protocol: corev1.ProtocolTCP,
},
},
},
},
Volumes: []corev1.Volume{
Expand Down
5 changes: 3 additions & 2 deletions python/observation-publisher/publisher/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
from merlin.observability.inference import InferenceSchema
from omegaconf import OmegaConf
from prometheus_client import start_http_server

from publisher.config import PublisherConfig
from publisher.metric import MetricWriter
from publisher.observation_sink import new_observation_sink
Expand All @@ -17,7 +16,9 @@ def start_consumer(cfg: PublisherConfig) -> None:

start_http_server(cfg.environment.prometheus_port)
MetricWriter().setup(
model_id=cfg.environment.model_id, model_version=cfg.environment.model_version
model_id=cfg.environment.model_id,
model_version=cfg.environment.model_version,
merlin_project=cfg.environment.project,
)
prediction_log_consumer = new_consumer(cfg.environment.observation_source)
inference_schema = InferenceSchema.from_dict(
Expand Down
35 changes: 30 additions & 5 deletions python/observation-publisher/publisher/metric.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,18 @@ def __init__(self):
self.last_processed_timestamp_gauge = Gauge(
"last_processed_timestamp",
"The timestamp of the last prediction log processed by the publisher",
["model_id", "model_version"],
["model_id", "model_version", "merlin_project"],
)
self.total_prediction_logs_processed_counter = Counter(
"total_prediction_logs_processed",
"The total number of prediction logs processed by the publisher",
["model_id", "model_version"],
["model_id", "model_version", "merlin_project"],
)

self.kafka_consumer_lag_gauge = Gauge(
"kafka_consumer_lag",
"The number of unprocess message in kafka",
["model_id", "model_version", "merlin_project", "partition"],
)
self._initialized = True

Expand All @@ -31,7 +37,7 @@ def __new__(cls):
cls._instance._initialized = False
return cls._instance

def setup(self, model_id: str, model_version: str):
def setup(self, model_id: str, model_version: str, merlin_project: str):
"""
Needs to be run before sending metrics, so that the singleton instance has the correct properties value.
:param model_id:
Expand All @@ -40,6 +46,7 @@ def setup(self, model_id: str, model_version: str):
"""
self.model_id = model_id
self.model_version = model_version
self.merlin_project = merlin_project

def update_last_processed_timestamp(self, last_processed_timestamp: Timestamp):
"""
Expand All @@ -48,7 +55,9 @@ def update_last_processed_timestamp(self, last_processed_timestamp: Timestamp):
:return:
"""
self.last_processed_timestamp_gauge.labels(
model_id=self.model_id, model_version=self.model_version
model_id=self.model_id,
model_version=self.model_version,
merlin_project=self.merlin_project,
).set(last_processed_timestamp.timestamp())

def increment_total_prediction_logs_processed(self, value: int):
Expand All @@ -57,5 +66,21 @@ def increment_total_prediction_logs_processed(self, value: int):
:return:
"""
self.total_prediction_logs_processed_counter.labels(
model_id=self.model_id, model_version=self.model_version
model_id=self.model_id,
model_version=self.model_version,
merlin_project=self.merlin_project,
).inc(value)

def update_kafka_lag(self, total_lag: int, partition: int):
"""
Update the kafka_consumer_lag gauge with the given value
:param total_lag:
:param partition:
:return:
"""
self.kafka_consumer_lag_gauge.labels(
model_id=self.model_id,
model_version=self.model_version,
partition=partition,
merlin_project=self.merlin_project,
).set(total_lag)
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import abc
import time
from dataclasses import dataclass
from datetime import datetime
from threading import Thread
Expand All @@ -7,7 +8,7 @@
import numpy as np
import pandas as pd
from caraml.upi.v1.prediction_log_pb2 import PredictionLog
from confluent_kafka import Consumer, KafkaException
from confluent_kafka import Consumer, KafkaException, TopicPartition
from dataclasses_json import DataClassJsonMixin, dataclass_json
from merlin.observability.inference import InferenceSchema, ObservationType
from publisher.config import ObservationSource, ObservationSourceConfig
Expand Down Expand Up @@ -141,9 +142,54 @@ def __init__(

self._consumer = Consumer(consumer_config)
self._batch_size = config.batch_size
self._topic = config.topic
self._consumer.subscribe([config.topic])
self._poll_timeout = config.poll_timeout_seconds

background_job_thread = Thread(target=self._emit_metrics)
background_job_thread.setDaemon(True)
background_job_thread.start()

def _emit_metrics(self):
while True:
lags, partitions = self._calculate_lag()
for lag, partition in zip(lags, partitions):
MetricWriter().update_kafka_lag(total_lag=lag, partition=partition)

time.sleep(60)

def _calculate_lag(self) -> Tuple[List[int], List[int]]:
cluster_metadata = self._consumer.list_topics(topic=self._topic)
topic_metadata = cluster_metadata.topics.get(self._topic)
partition_ids = list(topic_metadata.partitions.keys())

topic_partitions = [
TopicPartition(topic=self._topic, partition=partition_id)
for partition_id in partition_ids
]

committed_offsets = self._consumer.committed(topic_partitions)
committed_offsets_per_partitions = {}

for topic_partition in committed_offsets:
key = f"{topic_partition.topic}_{topic_partition.partition}"
committed_offsets_per_partitions[key] = topic_partition.offset

diff = []
partitions = []
for topic_partition in topic_partitions:
_, high = self._consumer.get_watermark_offsets(topic_partition)
committed_offset_key = (
f"{topic_partition.topic}_{topic_partition.partition}"
)
commited_offset = committed_offsets_per_partitions.get(
committed_offset_key, 0
)
diff.append(high - commited_offset)
partitions.append(topic_partition.partition)

return diff, partitions

def poll_new_logs(self) -> List[PredictionLog]:
messages = self._consumer.consume(self._batch_size, timeout=self._poll_timeout)
errors = [msg.error() for msg in messages if msg.error() is not None]
Expand Down
12 changes: 6 additions & 6 deletions python/observation-publisher/publisher/prediction_log_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ def from_struct(


def convert_to_numpy_value(
col_value: Optional[int | str | float | bool], value_type: Optional[ValueType]
col_value: Optional[int | str | float | bool], value_type: Optional[ValueType], col_name: str
) -> np.int64 | np.float64 | np.bool_ | np.str_:
if value_type is None:
if isinstance(col_value, (int, float)):
Expand All @@ -104,16 +104,16 @@ def convert_to_numpy_value(

match value_type:
case ValueType.INT64:
assert isinstance(col_value, (int, float))
assert isinstance(col_value, (int, float)), f"type of value for column {col_name} should be int or float, current value: {col_value} and type: {type(col_value)}"
return np.int64(col_value)
case ValueType.FLOAT64:
assert isinstance(col_value, (int, float, NoneType))
assert isinstance(col_value, (int, float, NoneType)), f"type of value for column {col_name} should be int or float or None, current value: {col_value} and type: {type(col_value)}"
return np.float64(col_value)
case ValueType.BOOLEAN:
assert isinstance(col_value, bool)
assert isinstance(col_value, bool), f"type of value for column {col_name} should be boolean, current value: {col_value} and type: {type(col_value)}"
return np.bool_(col_value)
case ValueType.STRING:
assert isinstance(col_value, str)
assert isinstance(col_value, str),f"type of value for column {col_name} should be string, current value: {col_value} and type: {type(col_value)}"
return np.str_(col_value)
case _:
raise ValueError(f"Unknown value type: {value_type}")
Expand Down Expand Up @@ -165,6 +165,6 @@ def list_value_as_numpy_list(
column_values.append(v)

return [
convert_to_numpy_value(col_value, column_types.get(col_name))
convert_to_numpy_value(col_value=col_value, value_type=column_types.get(col_name), col_name=col_name)
for col_value, col_name in zip(column_values, column_names) if column_types.get(col_name) is not None
]

0 comments on commit 24d79eb

Please sign in to comment.