From b5057191a4f06f77bbd4c939741f0835c9f8b79f Mon Sep 17 00:00:00 2001 From: Maciej Moscicki Date: Tue, 18 Jul 2023 16:02:24 +0200 Subject: [PATCH] prometheus support in hermes tracker (#1690) --- .../hermes/common/metric/MetricsFacade.java | 6 ++ .../metric/TrackerElasticSearchMetrics.java | 68 +++++++++++++++++++ .../ElasticsearchQueueCommitter.java | 5 +- .../ConsumersElasticsearchLogRepository.java | 41 +++++------ .../FrontendElasticsearchLogRepository.java | 41 +++++------ .../tracker/elasticsearch/metrics/Gauges.java | 14 ---- .../tracker/elasticsearch/metrics/Timers.java | 7 -- ...nsumersElasticsearchLogRepositoryTest.java | 10 ++- ...rontendElasticsearchLogRepositoryTest.java | 9 ++- .../ElasticsearchLogRepositoryTest.java | 13 ++-- .../MultiElasticsearchLogRepositoryTest.java | 13 ++-- .../hermes/tracker/BatchingLogRepository.java | 21 +----- .../tech/hermes/tracker/QueueCommitter.java | 8 ++- 13 files changed, 160 insertions(+), 96 deletions(-) create mode 100644 hermes-common/src/main/java/pl/allegro/tech/hermes/common/metric/TrackerElasticSearchMetrics.java delete mode 100644 hermes-tracker-elasticsearch/src/main/java/pl/allegro/tech/hermes/tracker/elasticsearch/metrics/Gauges.java delete mode 100644 hermes-tracker-elasticsearch/src/main/java/pl/allegro/tech/hermes/tracker/elasticsearch/metrics/Timers.java diff --git a/hermes-common/src/main/java/pl/allegro/tech/hermes/common/metric/MetricsFacade.java b/hermes-common/src/main/java/pl/allegro/tech/hermes/common/metric/MetricsFacade.java index a76601550d..649e29e1c3 100644 --- a/hermes-common/src/main/java/pl/allegro/tech/hermes/common/metric/MetricsFacade.java +++ b/hermes-common/src/main/java/pl/allegro/tech/hermes/common/metric/MetricsFacade.java @@ -7,12 +7,14 @@ public class MetricsFacade { private final TopicMetrics topicMetrics; private final SubscriptionMetrics subscriptionMetrics; private final ConsumerMetrics consumerMetrics; + private final TrackerElasticSearchMetrics trackerElasticSearchMetrics; public MetricsFacade(MeterRegistry meterRegistry, HermesMetrics hermesMetrics) { this.topicMetrics = new TopicMetrics(hermesMetrics, meterRegistry); this.subscriptionMetrics = new SubscriptionMetrics(hermesMetrics, meterRegistry); this.consumerMetrics = new ConsumerMetrics(hermesMetrics, meterRegistry); + this.trackerElasticSearchMetrics = new TrackerElasticSearchMetrics(hermesMetrics, meterRegistry); } public TopicMetrics topics() { @@ -26,5 +28,9 @@ public SubscriptionMetrics subscriptionMetrics() { public ConsumerMetrics consumers() { return consumerMetrics; } + + public TrackerElasticSearchMetrics trackerElasticSearch() { + return trackerElasticSearchMetrics; + } } diff --git a/hermes-common/src/main/java/pl/allegro/tech/hermes/common/metric/TrackerElasticSearchMetrics.java b/hermes-common/src/main/java/pl/allegro/tech/hermes/common/metric/TrackerElasticSearchMetrics.java new file mode 100644 index 0000000000..722185ddbb --- /dev/null +++ b/hermes-common/src/main/java/pl/allegro/tech/hermes/common/metric/TrackerElasticSearchMetrics.java @@ -0,0 +1,68 @@ +package pl.allegro.tech.hermes.common.metric; + +import io.micrometer.core.instrument.MeterRegistry; +import pl.allegro.tech.hermes.metrics.HermesTimer; + +import java.util.function.ToDoubleFunction; + +import static pl.allegro.tech.hermes.metrics.PathsCompiler.HOSTNAME; + +public class TrackerElasticSearchMetrics { + private final MeterRegistry meterRegistry; + private final HermesMetrics hermesMetrics; + + public TrackerElasticSearchMetrics(HermesMetrics hermesMetrics, MeterRegistry meterRegistry) { + this.meterRegistry = meterRegistry; + this.hermesMetrics = hermesMetrics; + } + + public void registerProducerTrackerElasticSearchQueueSizeGauge(T stateObj, ToDoubleFunction f) { + hermesMetrics.registerGauge(Gauges.Graphite.PRODUCER_TRACKER_ELASTICSEARCH_QUEUE_SIZE, () -> f.applyAsDouble(stateObj)); + meterRegistry.gauge(Gauges.Prometheus.TRACKER_ELASTICSEARCH_QUEUE_SIZE, stateObj, f); + } + + public void registerProducerTrackerElasticSearchRemainingCapacity(T stateObj, ToDoubleFunction f) { + hermesMetrics.registerGauge(Gauges.Graphite.PRODUCER_TRACKER_ELASTICSEARCH_REMAINING_CAPACITY, () -> f.applyAsDouble(stateObj)); + meterRegistry.gauge(Gauges.Prometheus.TRACKER_ELASTICSEARCH_REMAINING_CAPACITY, stateObj, f); + } + + public void registerConsumerTrackerElasticSearchQueueSizeGauge(T stateObj, ToDoubleFunction f) { + hermesMetrics.registerGauge(Gauges.Graphite.CONSUMER_TRACKER_ELASTICSEARCH_QUEUE_SIZE, () -> f.applyAsDouble(stateObj)); + meterRegistry.gauge(Gauges.Prometheus.TRACKER_ELASTICSEARCH_QUEUE_SIZE, stateObj, f); + } + + public void registerConsumerTrackerElasticSearchRemainingCapacity(T stateObj, ToDoubleFunction f) { + hermesMetrics.registerGauge(Gauges.Graphite.CONSUMER_TRACKER_ELASTICSEARCH_REMAINING_CAPACITY, () -> f.applyAsDouble(stateObj)); + meterRegistry.gauge(Gauges.Prometheus.TRACKER_ELASTICSEARCH_REMAINING_CAPACITY, stateObj, f); + } + + public HermesTimer trackerElasticSearchCommitLatencyTimer() { + return HermesTimer.from( + meterRegistry.timer(Timers.ELASTICSEARCH_COMMIT_LATENCY), + hermesMetrics.timer(Timers.ELASTICSEARCH_COMMIT_LATENCY) + ); + } + + private static class Gauges { + private static class Graphite { + public static final String PRODUCER_TRACKER_ELASTICSEARCH_QUEUE_SIZE = + "producer." + HOSTNAME + ".tracker.elasticsearch.queue-size"; + public static final String PRODUCER_TRACKER_ELASTICSEARCH_REMAINING_CAPACITY = + "producer." + HOSTNAME + ".tracker.elasticsearch.remaining-capacity"; + + public static final String CONSUMER_TRACKER_ELASTICSEARCH_QUEUE_SIZE = + "consumer." + HOSTNAME + ".tracker.elasticsearch.queue-size"; + public static final String CONSUMER_TRACKER_ELASTICSEARCH_REMAINING_CAPACITY = + "consumer." + HOSTNAME + ".tracker.elasticsearch.remaining-capacity"; + } + + private static class Prometheus { + public static final String TRACKER_ELASTICSEARCH_QUEUE_SIZE = "tracker.elasticsearch.queue-size"; + public static final String TRACKER_ELASTICSEARCH_REMAINING_CAPACITY = "tracker.elasticsearch.remaining-capacity"; + } + } + + private static class Timers { + public static final String ELASTICSEARCH_COMMIT_LATENCY = "tracker.elasticsearch.commit-latency"; + } +} diff --git a/hermes-tracker-elasticsearch/src/main/java/pl/allegro/tech/hermes/tracker/elasticsearch/ElasticsearchQueueCommitter.java b/hermes-tracker-elasticsearch/src/main/java/pl/allegro/tech/hermes/tracker/elasticsearch/ElasticsearchQueueCommitter.java index e509e0f209..daed417560 100644 --- a/hermes-tracker-elasticsearch/src/main/java/pl/allegro/tech/hermes/tracker/elasticsearch/ElasticsearchQueueCommitter.java +++ b/hermes-tracker-elasticsearch/src/main/java/pl/allegro/tech/hermes/tracker/elasticsearch/ElasticsearchQueueCommitter.java @@ -5,6 +5,7 @@ import org.elasticsearch.action.bulk.BulkRequestBuilder; import org.elasticsearch.client.Client; import org.elasticsearch.common.xcontent.XContentType; +import pl.allegro.tech.hermes.metrics.HermesTimer; import pl.allegro.tech.hermes.tracker.QueueCommitter; import java.util.List; @@ -22,7 +23,7 @@ public class ElasticsearchQueueCommitter extends QueueCommitter queue, - Timer timer, + HermesTimer timer, IndexFactory indexFactory, String typeName, Client client) { @@ -44,7 +45,7 @@ public static void scheduleCommitAtFixedRate(BlockingQueue { - protected final MetricRegistry metricRegistry; - protected final PathsCompiler pathsCompiler; protected final String clusterName; protected final String hostname; protected BlockingQueue queue; public BatchingLogRepository(int queueSize, String clusterName, - String hostname, - MetricRegistry metricRegistry, - PathsCompiler pathsCompiler) { + String hostname) { this.queue = new LinkedBlockingQueue<>(queueSize); this.clusterName = clusterName; this.hostname = hostname; - this.metricRegistry = metricRegistry; - this.pathsCompiler = pathsCompiler; - } - - protected void registerQueueSizeGauge(String gauge) { - metricRegistry.register(pathsCompiler.compile(gauge), (Gauge) queue::size); } - - protected void registerRemainingCapacityGauge(String gauge) { - metricRegistry.register(pathsCompiler.compile(gauge), (Gauge) queue::remainingCapacity); - } - } \ No newline at end of file diff --git a/hermes-tracker/src/main/java/pl/allegro/tech/hermes/tracker/QueueCommitter.java b/hermes-tracker/src/main/java/pl/allegro/tech/hermes/tracker/QueueCommitter.java index fee2387de4..3bae8d6131 100644 --- a/hermes-tracker/src/main/java/pl/allegro/tech/hermes/tracker/QueueCommitter.java +++ b/hermes-tracker/src/main/java/pl/allegro/tech/hermes/tracker/QueueCommitter.java @@ -3,6 +3,8 @@ import com.codahale.metrics.Timer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import pl.allegro.tech.hermes.metrics.HermesTimer; +import pl.allegro.tech.hermes.metrics.HermesTimerContext; import java.util.ArrayList; import java.util.List; @@ -13,9 +15,9 @@ public abstract class QueueCommitter implements Runnable { private static final Logger LOGGER = LoggerFactory.getLogger(QueueCommitter.class); private final BlockingQueue queue; - private final Timer timer; + private final HermesTimer timer; - public QueueCommitter(BlockingQueue queue, Timer timer) { + public QueueCommitter(BlockingQueue queue, HermesTimer timer) { this.queue = queue; this.timer = timer; } @@ -24,7 +26,7 @@ public QueueCommitter(BlockingQueue queue, Timer timer) { public void run() { try { if (!queue.isEmpty()) { - Timer.Context ctx = timer.time(); + HermesTimerContext ctx = timer.time(); commit(); ctx.close(); }