1111import os
1212import uuid
1313
14- # Prometheus imports (for backward compatibility)
15- from prometheus_client import Counter , Histogram , Gauge
14+ # OpenTelemetry imports only
1615
1716# OpenTelemetry imports
1817from opentelemetry import metrics as otel_metrics
1918from opentelemetry .sdk .metrics import MeterProvider
2019from opentelemetry .sdk .resources import Resource
2120from opentelemetry .exporter .otlp .proto .grpc .metric_exporter import OTLPMetricExporter
22- from opentelemetry . exporter . prometheus import PrometheusMetricReader
21+
2322from opentelemetry .sdk .metrics .export import PeriodicExportingMetricReader
2423# Note: RedisInstrumentor import removed to prevent automatic instrumentation
2524
@@ -40,15 +39,11 @@ class OperationMetrics:
4039class MetricsCollector :
4140 """Centralized metrics collection for Redis operations with OpenTelemetry support."""
4241
43- def __init__ (self , enable_prometheus : bool = True , prometheus_port : int = 8000 ,
44- enable_otel : bool = True , otel_endpoint : str = None ,
42+ def __init__ (self , otel_endpoint : str ,
4543 service_name : str = "redis-load-test" , service_version : str = "1.0.0" ,
46- otel_export_interval_ms : int = 5000 , app_name : str = "python" ,
44+ otel_export_interval_ms : int = 1000 , app_name : str = "python" ,
4745 instance_id : str = None , run_id : str = None , version : str = None ):
4846 self .logger = get_logger ()
49- self .enable_prometheus = enable_prometheus
50- self .prometheus_port = prometheus_port
51- self .enable_otel = enable_otel
5247 self .otel_endpoint = otel_endpoint
5348 self .service_name = service_name
5449 self .service_version = service_version
@@ -70,12 +65,8 @@ def __init__(self, enable_prometheus: bool = True, prometheus_port: int = 8000,
7065 self ._reconnection_count = 0
7166 self ._reconnection_duration = 0.0
7267
73- # Setup metrics collection
74- if self .enable_otel :
75- self ._setup_opentelemetry ()
76-
77- if self .enable_prometheus :
78- self ._setup_prometheus_metrics ()
68+ # Setup OpenTelemetry metrics (single collection method)
69+ self ._setup_opentelemetry ()
7970
8071 def _setup_opentelemetry (self ):
8172 """Setup OpenTelemetry metrics and tracing."""
@@ -86,20 +77,15 @@ def _setup_opentelemetry(self):
8677 "service.version" : self .version
8778 })
8879
89- # Setup metrics
90- if self .otel_endpoint :
91- # OTLP exporter for metrics
92- metric_exporter = OTLPMetricExporter (
93- endpoint = self .otel_endpoint ,
94- insecure = True
95- )
96- metric_reader = PeriodicExportingMetricReader (
97- exporter = metric_exporter ,
98- export_interval_millis = self .otel_export_interval_ms
99- )
100- else :
101- # Prometheus exporter as fallback
102- metric_reader = PrometheusMetricReader ()
80+ # Setup OTLP metrics exporter
81+ metric_exporter = OTLPMetricExporter (
82+ endpoint = self .otel_endpoint ,
83+ insecure = True
84+ )
85+ metric_reader = PeriodicExportingMetricReader (
86+ exporter = metric_exporter ,
87+ export_interval_millis = self .otel_export_interval_ms
88+ )
10389
10490 # Initialize metrics provider
10591 metrics_provider = MeterProvider (
@@ -157,78 +143,7 @@ def _setup_opentelemetry(self):
157143
158144 except Exception as e :
159145 self .logger .error (f"Failed to setup OpenTelemetry: { e } " )
160- self .enable_otel = False
161-
162- def _setup_prometheus_metrics (self ):
163- """Setup Prometheus metrics with multi-app support."""
164- # Base labels for all metrics
165- base_labels = ['app_name' , 'service_name' , 'instance_id' ]
166-
167- # 1. Total number of successful/failed operations
168- self .prom_operations_total = Counter (
169- 'redis_operations_total' ,
170- 'Total number of Redis operations' ,
171- ['operation' , 'status' ] + base_labels
172- )
173-
174- # 2. Operation latency (for percentiles)
175- self .prom_operation_duration = Histogram (
176- 'redis_operation_duration_seconds' ,
177- 'Duration of Redis operations' ,
178- ['operation' ] + base_labels ,
179- buckets = [0.0001 , 0.0005 , 0.001 , 0.002 , 0.005 , 0.01 , 0.025 , 0.05 , 0.1 , 0.25 , 0.5 , 1.0 , 2.5 , 5.0 , 10.0 ]
180- )
181-
182- # 3. Connection metrics
183- self .prom_connections_total = Counter (
184- 'redis_connections_total' ,
185- 'Total number of connection attempts' ,
186- ['status' ] + base_labels
187- )
188-
189- self .prom_active_connections = Gauge (
190- 'redis_active_connections' ,
191- 'Number of active Redis connections' ,
192- base_labels
193- )
194-
195- # 4. Reconnection duration
196- self .prom_reconnection_duration = Histogram (
197- 'redis_reconnection_duration_seconds' ,
198- 'Duration of reconnection attempts' ,
199- base_labels ,
200- buckets = [0.1 , 0.5 , 1.0 , 2.0 , 5.0 , 10.0 , 30.0 , 60.0 ]
201- )
202-
203- # 5. Error rate percentage
204- self .prom_error_rate = Gauge (
205- 'redis_error_rate_percent' ,
206- 'Current error rate percentage' ,
207- base_labels
208- )
209-
210- # 6. Average latency gauge (for quick overview)
211- self .prom_avg_latency = Gauge (
212- 'redis_average_latency_seconds' ,
213- 'Average operation latency' ,
214- ['operation' ] + base_labels
215- )
216-
217- # 7. Pub/Sub specific metrics (unified)
218- self .prom_pubsub_operations_total = Counter (
219- 'redis_pubsub_operations_total' ,
220- 'Total number of Redis pub/sub operations (publish and receive)' ,
221- ['channel' , 'operation_type' , 'subscriber_id' , 'status' ] + base_labels
222- )
223-
224- # Start Prometheus HTTP server
225- try :
226- from prometheus_client import start_http_server
227- start_http_server (self .prometheus_port )
228- self .logger .info (f"Prometheus metrics server started on port { self .prometheus_port } " )
229- except Exception as e :
230- self .logger .error (f"Failed to start Prometheus server: { e } " )
231- self .enable_prometheus = False
146+ raise
232147
233148
234149
@@ -247,8 +162,7 @@ def record_operation(self, operation: str, duration: float, success: bool, error
247162 if error_type :
248163 metrics .errors_by_type [error_type ] += 1
249164
250- # Update OpenTelemetry metrics with cleaned-up labels
251- if self .enable_otel and hasattr (self , 'otel_operations_counter' ):
165+ # Update OpenTelemetry metrics
252166 status = 'success' if success else 'error'
253167 labels = {
254168 "operation" : operation ,
@@ -258,20 +172,20 @@ def record_operation(self, operation: str, duration: float, success: bool, error
258172 "run_id" : self .run_id ,
259173 "version" : self .version ,
260174 "error_type" : error_type or "none"
261- }
262- self .otel_operations_counter .add (1 , labels )
175+ }
176+ self .otel_operations_counter .add (1 , labels )
263177
264- duration_labels = {
178+ duration_labels = {
265179 "operation" : operation ,
266180 "status" : status ,
267181 "app_name" : self .app_name ,
268182 "instance_id" : self .instance_id ,
269183 "run_id" : self .run_id ,
270184 "version" : self .version
271- }
272- # Convert duration from seconds to milliseconds
273- duration_ms = duration * 1000
274- self .otel_operation_duration .record (duration_ms , duration_labels )
185+ }
186+ # Convert duration from seconds to milliseconds
187+ duration_ms = duration * 1000
188+ self .otel_operation_duration .record (duration_ms , duration_labels )
275189
276190 # Metrics are now only collected via OpenTelemetry (OTLP push)
277191
@@ -282,17 +196,16 @@ def record_connection_attempt(self, success: bool):
282196 if not success :
283197 self ._connection_failures += 1
284198
285- # Update OpenTelemetry metrics with cleaned-up labels
286- if self .enable_otel and hasattr (self , 'otel_connections_counter' ):
199+ # Update OpenTelemetry metrics
287200 status = 'success' if success else 'error'
288201 labels = {
289202 "status" : status ,
290203 "app_name" : self .app_name ,
291204 "instance_id" : self .instance_id ,
292205 "run_id" : self .run_id ,
293206 "version" : self .version
294- }
295- self .otel_connections_counter .add (1 , labels )
207+ }
208+ self .otel_connections_counter .add (1 , labels )
296209
297210 # Connection metrics collected via OpenTelemetry only
298211
@@ -302,81 +215,43 @@ def record_reconnection(self, duration: float):
302215 self ._reconnection_count += 1
303216 self ._reconnection_duration += duration
304217
305- # Update OpenTelemetry metrics with cleaned-up labels
306- if self .enable_otel and hasattr (self , 'otel_reconnection_duration' ):
218+ # Update OpenTelemetry metrics
307219 labels = {
308220 "app_name" : self .app_name ,
309221 "instance_id" : self .instance_id ,
310222 "run_id" : self .run_id ,
311223 "version" : self .version
312- }
313- # Convert duration from seconds to milliseconds
314- duration_ms = duration * 1000
315- self .otel_reconnection_duration .record (duration_ms , labels )
316-
317- # Update Prometheus metrics with app identification
318- if self .enable_prometheus and hasattr (self , 'prom_reconnection_duration' ):
319- labels = {
320- 'app_name' : self .app_name ,
321- 'service_name' : self .service_name ,
322- 'instance_id' : self .instance_id
323- }
324- self .prom_reconnection_duration .labels (** labels ).observe (duration )
224+ }
225+ # Convert duration from seconds to milliseconds
226+ duration_ms = duration * 1000
227+ self .otel_reconnection_duration .record (duration_ms , labels )
325228
326229 def update_active_connections (self , count : int ):
327230 """Update active connections count."""
328- # Update OpenTelemetry metrics with proper labels
329- if self .enable_otel and hasattr (self , 'otel_active_connections' ):
330- labels = {
331- "app_name" : self .app_name ,
332- "instance_id" : self .instance_id ,
333- "version" : self .version
334- }
335- # Use set() for gauge metrics, not add()
336- self .otel_active_connections .set (count , labels )
337-
338- # Update Prometheus metrics with app identification
339- if self .enable_prometheus and hasattr (self , 'prom_active_connections' ):
340- labels = {
341- 'app_name' : self .app_name ,
342- 'service_name' : self .service_name ,
343- 'instance_id' : self .instance_id
344- }
345- self .prom_active_connections .labels (** labels ).set (count )
231+ # Update OpenTelemetry metrics
232+ labels = {
233+ "app_name" : self .app_name ,
234+ "instance_id" : self .instance_id ,
235+ "version" : self .version
236+ }
237+ # Use set() for gauge metrics, not add()
238+ self .otel_active_connections .set (count , labels )
346239
347240 def record_pubsub_operation (self , channel : str , operation_type : str , subscriber_id : str = None , success : bool = True , error_type : str = None ):
348241 """Record metrics for a pub/sub operation (publish or receive)."""
349242
350243 # Update OpenTelemetry metrics
351- if self .enable_otel and hasattr (self , 'otel_pubsub_operations_counter' ):
352- labels = {
353- "app_name" : self .app_name ,
354- "instance_id" : self .instance_id ,
355- "version" : self .version ,
356- "run_id" : self .run_id ,
357- "channel" : channel ,
358- "operation_type" : operation_type ,
359- "subscriber_id" : subscriber_id or "" ,
360- "status" : "success" if success else "error"
361- }
362- self .otel_pubsub_operations_counter .add (1 , labels )
363-
364- # Update Prometheus metrics
365- if self .enable_prometheus and hasattr (self , 'prom_pubsub_operations_total' ):
366- base_labels = {
367- 'app_name' : self .app_name ,
368- 'service_name' : self .service_name ,
369- 'instance_id' : self .instance_id
370- }
371-
372- status = "success" if success else "error"
373- self .prom_pubsub_operations_total .labels (
374- channel = channel ,
375- operation_type = operation_type ,
376- subscriber_id = subscriber_id or "" ,
377- status = status ,
378- ** base_labels
379- ).inc ()
244+ labels = {
245+ "app_name" : self .app_name ,
246+ "instance_id" : self .instance_id ,
247+ "version" : self .version ,
248+ "run_id" : self .run_id ,
249+ "channel" : channel ,
250+ "operation_type" : operation_type ,
251+ "subscriber_id" : subscriber_id or "" ,
252+ "status" : "success" if success else "error"
253+ }
254+ self .otel_pubsub_operations_counter .add (1 , labels )
380255
381256 def get_operation_stats (self , operation : str ) -> Dict :
382257 """Get statistics for a specific operation."""
@@ -550,17 +425,13 @@ def get_metrics_collector() -> MetricsCollector:
550425 return _metrics_collector
551426
552427
553- def setup_metrics (enable_prometheus : bool = True , prometheus_port : int = 8000 ,
554- enable_otel : bool = True , otel_endpoint : str = None ,
428+ def setup_metrics (otel_endpoint : str ,
555429 service_name : str = "redis-load-test" , service_version : str = "1.0.0" ,
556- otel_export_interval_ms : int = 5000 , app_name : str = "python" ,
430+ otel_export_interval_ms : int = 1000 , app_name : str = "python" ,
557431 instance_id : str = None , run_id : str = None , version : str = None ) -> MetricsCollector :
558- """Setup global metrics collector with multi-app support ."""
432+ """Setup global metrics collector with OpenTelemetry only ."""
559433 global _metrics_collector
560434 _metrics_collector = MetricsCollector (
561- enable_prometheus = enable_prometheus ,
562- prometheus_port = prometheus_port ,
563- enable_otel = enable_otel ,
564435 otel_endpoint = otel_endpoint ,
565436 service_name = service_name ,
566437 service_version = service_version ,
0 commit comments