Skip to content

Commit 61859e7

Browse files
adds metrics for subscribers
Signed-off-by: Elena Kolevska <[email protected]>
1 parent 8bbe1e8 commit 61859e7

File tree

3 files changed

+87
-6
lines changed

3 files changed

+87
-6
lines changed

metrics.py

Lines changed: 57 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -70,10 +70,13 @@ def __init__(self, enable_prometheus: bool = True, prometheus_port: int = 8000,
7070
self._reconnection_count = 0
7171
self._reconnection_duration = 0.0
7272

73-
# OpenTelemetry setup (single collection method)
73+
# Setup metrics collection
7474
if self.enable_otel:
7575
self._setup_opentelemetry()
7676

77+
if self.enable_prometheus:
78+
self._setup_prometheus_metrics()
79+
7780
def _setup_opentelemetry(self):
7881
"""Setup OpenTelemetry metrics and tracing."""
7982
try:
@@ -139,7 +142,12 @@ def _setup_opentelemetry(self):
139142
unit="1"
140143
)
141144

142-
145+
# Pub/Sub specific metrics (unified)
146+
self.otel_pubsub_operations_counter = self.meter.create_counter(
147+
name="redis_pubsub_operations_total",
148+
description="Total number of Redis pub/sub operations (publish and receive)",
149+
unit="1"
150+
)
143151

144152
# Note: Using manual instrumentation instead of automatic Redis instrumentation
145153
# to have full control over operation labeling and avoid "BATCH" aggregation
@@ -205,6 +213,22 @@ def _setup_prometheus_metrics(self):
205213
'Average operation latency',
206214
['operation'] + base_labels
207215
)
216+
217+
# 7. Pub/Sub specific metrics (unified)
218+
self.prom_pubsub_operations_total = Counter(
219+
'redis_pubsub_operations_total',
220+
'Total number of Redis pub/sub operations (publish and receive)',
221+
['channel', 'operation_type', 'subscriber_id', 'status'] + base_labels
222+
)
223+
224+
# Start Prometheus HTTP server
225+
try:
226+
from prometheus_client import start_http_server
227+
start_http_server(self.prometheus_port)
228+
self.logger.info(f"Prometheus metrics server started on port {self.prometheus_port}")
229+
except Exception as e:
230+
self.logger.error(f"Failed to start Prometheus server: {e}")
231+
self.enable_prometheus = False
208232

209233

210234

@@ -320,10 +344,40 @@ def update_active_connections(self, count: int):
320344
}
321345
self.prom_active_connections.labels(**labels).set(count)
322346

347+
def record_pubsub_operation(self, channel: str, operation_type: str, subscriber_id: str = None, success: bool = True, error_type: str = None):
348+
"""Record metrics for a pub/sub operation (publish or receive)."""
323349

350+
# Update OpenTelemetry metrics
351+
if self.enable_otel and hasattr(self, 'otel_pubsub_operations_counter'):
352+
labels = {
353+
"app_name": self.app_name,
354+
"instance_id": self.instance_id,
355+
"version": self.version,
356+
"run_id": self.run_id,
357+
"channel": channel,
358+
"operation_type": operation_type,
359+
"subscriber_id": subscriber_id or "",
360+
"status": "success" if success else "error"
361+
}
362+
self.otel_pubsub_operations_counter.add(1, labels)
363+
364+
# Update Prometheus metrics
365+
if self.enable_prometheus and hasattr(self, 'prom_pubsub_operations_total'):
366+
base_labels = {
367+
'app_name': self.app_name,
368+
'service_name': self.service_name,
369+
'instance_id': self.instance_id
370+
}
324371

372+
status = "success" if success else "error"
373+
self.prom_pubsub_operations_total.labels(
374+
channel=channel,
375+
operation_type=operation_type,
376+
subscriber_id=subscriber_id or "",
377+
status=status,
378+
**base_labels
379+
).inc()
325380

326-
327381
def get_operation_stats(self, operation: str) -> Dict:
328382
"""Get statistics for a specific operation."""
329383
with self._lock:

redis_client.py

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -296,7 +296,19 @@ def llen(self, key: str) -> int:
296296
# Pub/Sub operations
297297
def publish(self, channel: str, message: str) -> int:
298298
"""Publish a message to a channel."""
299-
return self._execute_with_metrics('PUBLISH', self._client.publish, channel, message)
299+
start_time = time.time()
300+
try:
301+
result = self._client.publish(channel, message)
302+
duration = max(0.0, time.time() - start_time)
303+
# Record both general operation metrics and pub/sub specific metrics
304+
self.metrics.record_operation('PUBLISH', duration, True)
305+
self.metrics.record_pubsub_operation(channel, 'PUBLISH', success=True)
306+
return result
307+
except Exception as e:
308+
duration = max(0.0, time.time() - start_time)
309+
self.metrics.record_operation('PUBLISH', duration, False, type(e).__name__)
310+
self.metrics.record_pubsub_operation(channel, 'PUBLISH', success=False, error_type=type(e).__name__)
311+
raise
300312

301313
def pubsub(self):
302314
"""Get a pubsub instance."""

workloads.py

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -303,6 +303,9 @@ def __init__(self, config: WorkloadConfig, client: RedisClientManager):
303303
self._pubsub = None
304304
self._subscriber_thread = None
305305
self._stop_subscriber = threading.Event()
306+
# Generate unique subscriber ID for this workload instance
307+
import uuid
308+
self._subscriber_id = f"subscriber_{uuid.uuid4().hex[:8]}"
306309

307310
def _start_subscriber(self):
308311
"""Start subscriber in a separate thread."""
@@ -318,15 +321,27 @@ def _start_subscriber(self):
318321
while not self._stop_subscriber.is_set():
319322
try:
320323
# Use get_message with short timeout to be responsive to shutdown
324+
start_time = time.time()
321325
message = self._pubsub.get_message(timeout=0.5)
326+
322327
if message and message['type'] == 'message':
323-
self.logger.debug(f"Received message on {message['channel']}: {message['data']}")
328+
channel_name = message['channel'].decode() if isinstance(message['channel'], bytes) else str(message['channel'])
329+
330+
# Record receive metrics using unified pub/sub metric
331+
self.metrics.record_pubsub_operation(channel_name, 'receive', self._subscriber_id, success=True)
332+
333+
self.logger.debug(f"Received message on {channel_name}: {message['data']}")
334+
324335
except (ConnectionError, ValueError) as e:
325336
# These are expected during shutdown, break quietly
326337
break
327338
except Exception as e:
328-
# Other unexpected errors - log only if not shutting down
339+
# Record error metrics if we have channel info
329340
if not self._stop_subscriber.is_set():
341+
# Use default channel for error tracking
342+
channels = self.config.get_option("channels", ["test_channel"])
343+
default_channel = channels[0] if channels else "unknown"
344+
self.metrics.record_pubsub_operation(default_channel, 'receive', self._subscriber_id, success=False, error_type=type(e).__name__)
330345
self.logger.debug(f"Subscriber error (continuing): {e}")
331346
break
332347

0 commit comments

Comments
 (0)