Skip to content

Commit

Permalink
prometheus support in hermes tracker (#1690)
Browse files Browse the repository at this point in the history
  • Loading branch information
moscicky authored Jul 18, 2023
1 parent e945272 commit b505719
Show file tree
Hide file tree
Showing 13 changed files with 160 additions and 96 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,14 @@ public class MetricsFacade {
private final TopicMetrics topicMetrics;
private final SubscriptionMetrics subscriptionMetrics;
private final ConsumerMetrics consumerMetrics;
private final TrackerElasticSearchMetrics trackerElasticSearchMetrics;

public MetricsFacade(MeterRegistry meterRegistry,
HermesMetrics hermesMetrics) {
this.topicMetrics = new TopicMetrics(hermesMetrics, meterRegistry);
this.subscriptionMetrics = new SubscriptionMetrics(hermesMetrics, meterRegistry);
this.consumerMetrics = new ConsumerMetrics(hermesMetrics, meterRegistry);
this.trackerElasticSearchMetrics = new TrackerElasticSearchMetrics(hermesMetrics, meterRegistry);
}

public TopicMetrics topics() {
Expand All @@ -26,5 +28,9 @@ public SubscriptionMetrics subscriptionMetrics() {
public ConsumerMetrics consumers() {
return consumerMetrics;
}

public TrackerElasticSearchMetrics trackerElasticSearch() {
return trackerElasticSearchMetrics;
}
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package pl.allegro.tech.hermes.common.metric;

import io.micrometer.core.instrument.MeterRegistry;
import pl.allegro.tech.hermes.metrics.HermesTimer;

import java.util.function.ToDoubleFunction;

import static pl.allegro.tech.hermes.metrics.PathsCompiler.HOSTNAME;

public class TrackerElasticSearchMetrics {
private final MeterRegistry meterRegistry;
private final HermesMetrics hermesMetrics;

public TrackerElasticSearchMetrics(HermesMetrics hermesMetrics, MeterRegistry meterRegistry) {
this.meterRegistry = meterRegistry;
this.hermesMetrics = hermesMetrics;
}

public <T> void registerProducerTrackerElasticSearchQueueSizeGauge(T stateObj, ToDoubleFunction<T> f) {
hermesMetrics.registerGauge(Gauges.Graphite.PRODUCER_TRACKER_ELASTICSEARCH_QUEUE_SIZE, () -> f.applyAsDouble(stateObj));
meterRegistry.gauge(Gauges.Prometheus.TRACKER_ELASTICSEARCH_QUEUE_SIZE, stateObj, f);
}

public <T> void registerProducerTrackerElasticSearchRemainingCapacity(T stateObj, ToDoubleFunction<T> f) {
hermesMetrics.registerGauge(Gauges.Graphite.PRODUCER_TRACKER_ELASTICSEARCH_REMAINING_CAPACITY, () -> f.applyAsDouble(stateObj));
meterRegistry.gauge(Gauges.Prometheus.TRACKER_ELASTICSEARCH_REMAINING_CAPACITY, stateObj, f);
}

public <T> void registerConsumerTrackerElasticSearchQueueSizeGauge(T stateObj, ToDoubleFunction<T> f) {
hermesMetrics.registerGauge(Gauges.Graphite.CONSUMER_TRACKER_ELASTICSEARCH_QUEUE_SIZE, () -> f.applyAsDouble(stateObj));
meterRegistry.gauge(Gauges.Prometheus.TRACKER_ELASTICSEARCH_QUEUE_SIZE, stateObj, f);
}

public <T> void registerConsumerTrackerElasticSearchRemainingCapacity(T stateObj, ToDoubleFunction<T> f) {
hermesMetrics.registerGauge(Gauges.Graphite.CONSUMER_TRACKER_ELASTICSEARCH_REMAINING_CAPACITY, () -> f.applyAsDouble(stateObj));
meterRegistry.gauge(Gauges.Prometheus.TRACKER_ELASTICSEARCH_REMAINING_CAPACITY, stateObj, f);
}

public HermesTimer trackerElasticSearchCommitLatencyTimer() {
return HermesTimer.from(
meterRegistry.timer(Timers.ELASTICSEARCH_COMMIT_LATENCY),
hermesMetrics.timer(Timers.ELASTICSEARCH_COMMIT_LATENCY)
);
}

private static class Gauges {
private static class Graphite {
public static final String PRODUCER_TRACKER_ELASTICSEARCH_QUEUE_SIZE =
"producer." + HOSTNAME + ".tracker.elasticsearch.queue-size";
public static final String PRODUCER_TRACKER_ELASTICSEARCH_REMAINING_CAPACITY =
"producer." + HOSTNAME + ".tracker.elasticsearch.remaining-capacity";

public static final String CONSUMER_TRACKER_ELASTICSEARCH_QUEUE_SIZE =
"consumer." + HOSTNAME + ".tracker.elasticsearch.queue-size";
public static final String CONSUMER_TRACKER_ELASTICSEARCH_REMAINING_CAPACITY =
"consumer." + HOSTNAME + ".tracker.elasticsearch.remaining-capacity";
}

private static class Prometheus {
public static final String TRACKER_ELASTICSEARCH_QUEUE_SIZE = "tracker.elasticsearch.queue-size";
public static final String TRACKER_ELASTICSEARCH_REMAINING_CAPACITY = "tracker.elasticsearch.remaining-capacity";
}
}

private static class Timers {
public static final String ELASTICSEARCH_COMMIT_LATENCY = "tracker.elasticsearch.commit-latency";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.xcontent.XContentType;
import pl.allegro.tech.hermes.metrics.HermesTimer;
import pl.allegro.tech.hermes.tracker.QueueCommitter;

import java.util.List;
Expand All @@ -22,7 +23,7 @@ public class ElasticsearchQueueCommitter extends QueueCommitter<ElasticsearchDoc
private final String typeName;

public ElasticsearchQueueCommitter(BlockingQueue<ElasticsearchDocument> queue,
Timer timer,
HermesTimer timer,
IndexFactory indexFactory,
String typeName,
Client client) {
Expand All @@ -44,7 +45,7 @@ public static void scheduleCommitAtFixedRate(BlockingQueue<ElasticsearchDocument
IndexFactory indexFactory,
String typeName,
Client client,
Timer timer,
HermesTimer timer,
int interval) {
ElasticsearchQueueCommitter committer = new ElasticsearchQueueCommitter(queue, timer, indexFactory, typeName, client);
ThreadFactory factory = new ThreadFactoryBuilder().setNameFormat("elasticsearch-queue-committer-%d").build();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
package pl.allegro.tech.hermes.tracker.elasticsearch.consumers;

import com.codahale.metrics.MetricRegistry;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.xcontent.XContentBuilder;
import pl.allegro.tech.hermes.api.SentMessageTraceStatus;
import pl.allegro.tech.hermes.metrics.PathsCompiler;
import pl.allegro.tech.hermes.common.metric.MetricsFacade;
import pl.allegro.tech.hermes.common.metric.TrackerElasticSearchMetrics;
import pl.allegro.tech.hermes.tracker.BatchingLogRepository;
import pl.allegro.tech.hermes.tracker.consumers.LogRepository;
import pl.allegro.tech.hermes.tracker.consumers.MessageMetadata;
Expand All @@ -14,10 +14,9 @@
import pl.allegro.tech.hermes.tracker.elasticsearch.IndexFactory;
import pl.allegro.tech.hermes.tracker.elasticsearch.LogSchemaAware;
import pl.allegro.tech.hermes.tracker.elasticsearch.SchemaManager;
import pl.allegro.tech.hermes.tracker.elasticsearch.metrics.Gauges;
import pl.allegro.tech.hermes.tracker.elasticsearch.metrics.Timers;

import java.io.IOException;
import java.util.concurrent.BlockingQueue;

import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static pl.allegro.tech.hermes.api.SentMessageTraceStatus.DISCARDED;
Expand All @@ -42,16 +41,10 @@ private ConsumersElasticsearchLogRepository(Client elasticClient,
int commitInterval,
IndexFactory indexFactory,
String typeName,
MetricRegistry metricRegistry,
PathsCompiler pathsCompiler) {
super(queueSize, clusterName, hostname, metricRegistry, pathsCompiler);
MetricsFacade metricsFacade) {
super(queueSize, clusterName, hostname);
this.elasticClient = elasticClient;

registerQueueSizeGauge(Gauges.CONSUMER_TRACKER_ELASTICSEARCH_QUEUE_SIZE);
registerRemainingCapacityGauge(Gauges.CONSUMER_TRACKER_ELASTICSEARCH_REMAINING_CAPACITY);

ElasticsearchQueueCommitter.scheduleCommitAtFixedRate(queue, indexFactory, typeName, elasticClient,
metricRegistry.timer(pathsCompiler.compile(Timers.CONSUMER_TRACKER_ELASTICSEARCH_COMMIT_LATENCY)), commitInterval);
registerMetrics(commitInterval, indexFactory, typeName, metricsFacade.trackerElasticSearch());
}

@Override
Expand Down Expand Up @@ -117,6 +110,17 @@ protected XContentBuilder notEndedDocument(MessageMetadata message, long timesta
.field(SOURCE_HOSTNAME, hostname);
}

private void registerMetrics(int commitInterval,
IndexFactory indexFactory,
String typeName,
TrackerElasticSearchMetrics trackerMetrics) {
trackerMetrics.registerConsumerTrackerElasticSearchQueueSizeGauge(this.queue, BlockingQueue::size);
trackerMetrics.registerConsumerTrackerElasticSearchRemainingCapacity(this.queue, BlockingQueue::size);

ElasticsearchQueueCommitter.scheduleCommitAtFixedRate(this.queue, indexFactory, typeName, elasticClient,
trackerMetrics.trackerElasticSearchCommitLatencyTimer(), commitInterval);
}

private long toSeconds(long millis) {
return millis / 1000;
}
Expand All @@ -131,13 +135,11 @@ public static class Builder {
private ConsumersIndexFactory indexFactory = new ConsumersDailyIndexFactory();
private String typeName = SchemaManager.SENT_TYPE;

private final MetricRegistry metricRegistry;
private final PathsCompiler pathsCompiler;
private final MetricsFacade metricsFacade;

public Builder(Client elasticClient, PathsCompiler pathsCompiler, MetricRegistry metricRegistry) {
public Builder(Client elasticClient, MetricsFacade metricsFacade) {
this.elasticClient = elasticClient;
this.pathsCompiler = pathsCompiler;
this.metricRegistry = metricRegistry;
this.metricsFacade = metricsFacade;
}

public Builder withElasticClient(Client elasticClient) {
Expand Down Expand Up @@ -183,8 +185,7 @@ public ConsumersElasticsearchLogRepository build() {
commitInterval,
indexFactory,
typeName,
metricRegistry,
pathsCompiler);
metricsFacade);
}
}
}
Original file line number Diff line number Diff line change
@@ -1,23 +1,22 @@
package pl.allegro.tech.hermes.tracker.elasticsearch.frontend;

import com.codahale.metrics.MetricRegistry;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.xcontent.XContentBuilder;
import pl.allegro.tech.hermes.api.PublishedMessageTraceStatus;
import pl.allegro.tech.hermes.metrics.PathsCompiler;
import pl.allegro.tech.hermes.common.metric.MetricsFacade;
import pl.allegro.tech.hermes.common.metric.TrackerElasticSearchMetrics;
import pl.allegro.tech.hermes.tracker.BatchingLogRepository;
import pl.allegro.tech.hermes.tracker.elasticsearch.ElasticsearchDocument;
import pl.allegro.tech.hermes.tracker.elasticsearch.ElasticsearchQueueCommitter;
import pl.allegro.tech.hermes.tracker.elasticsearch.IndexFactory;
import pl.allegro.tech.hermes.tracker.elasticsearch.LogSchemaAware;
import pl.allegro.tech.hermes.tracker.elasticsearch.SchemaManager;
import pl.allegro.tech.hermes.tracker.elasticsearch.metrics.Gauges;
import pl.allegro.tech.hermes.tracker.elasticsearch.metrics.Timers;
import pl.allegro.tech.hermes.tracker.frontend.LogRepository;

import java.io.IOException;
import java.util.Map;
import java.util.concurrent.BlockingQueue;

import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static pl.allegro.tech.hermes.api.PublishedMessageTraceStatus.ERROR;
Expand All @@ -41,16 +40,10 @@ private FrontendElasticsearchLogRepository(Client elasticClient,
int commitInterval,
IndexFactory indexFactory,
String typeName,
MetricRegistry metricRegistry,
PathsCompiler pathsCompiler) {
super(queueSize, clusterName, hostname, metricRegistry, pathsCompiler);

MetricsFacade metricsFacade) {
super(queueSize, clusterName, hostname);
this.elasticClient = elasticClient;
registerQueueSizeGauge(Gauges.PRODUCER_TRACKER_ELASTICSEARCH_QUEUE_SIZE);
registerRemainingCapacityGauge(Gauges.PRODUCER_TRACKER_ELASTICSEARCH_REMAINING_CAPACITY);

ElasticsearchQueueCommitter.scheduleCommitAtFixedRate(queue, indexFactory, typeName, elasticClient,
metricRegistry.timer(pathsCompiler.compile(Timers.PRODUCER_TRACKER_ELASTICSEARCH_COMMIT_LATENCY)), commitInterval);
registerMetrics(commitInterval, indexFactory, typeName, metricsFacade.trackerElasticSearch());
}

@Override
Expand Down Expand Up @@ -130,6 +123,17 @@ protected XContentBuilder notEndedDocument(String messageId,
.collect(extraRequestHeadersCollector()));
}

private void registerMetrics(int commitInterval,
IndexFactory indexFactory,
String typeName,
TrackerElasticSearchMetrics trackerMetrics) {
trackerMetrics.registerProducerTrackerElasticSearchQueueSizeGauge(this.queue, BlockingQueue::size);
trackerMetrics.registerProducerTrackerElasticSearchRemainingCapacity(this.queue, BlockingQueue::size);

ElasticsearchQueueCommitter.scheduleCommitAtFixedRate(this.queue, indexFactory, typeName, elasticClient,
trackerMetrics.trackerElasticSearchCommitLatencyTimer(), commitInterval);
}

private long toSeconds(long millis) {
return millis / 1000;
}
Expand All @@ -144,13 +148,11 @@ public static class Builder {
private FrontendIndexFactory indexFactory = new FrontendDailyIndexFactory();
private String typeName = SchemaManager.PUBLISHED_TYPE;

private final MetricRegistry metricRegistry;
private final PathsCompiler pathsCompiler;
private final MetricsFacade metricsFacade;

public Builder(Client elasticClient, PathsCompiler pathsCompiler, MetricRegistry metricRegistry) {
public Builder(Client elasticClient, MetricsFacade metricsFacade) {
this.elasticClient = elasticClient;
this.pathsCompiler = pathsCompiler;
this.metricRegistry = metricRegistry;
this.metricsFacade = metricsFacade;
}

public Builder withElasticClient(Client elasticClient) {
Expand Down Expand Up @@ -196,8 +198,7 @@ public FrontendElasticsearchLogRepository build() {
commitInterval,
indexFactory,
typeName,
metricRegistry,
pathsCompiler);
metricsFacade);
}
}
}

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
package pl.allegro.tech.hermes.tracker.elasticsearch.consumers;

import com.codahale.metrics.MetricRegistry;
import io.micrometer.core.instrument.simple.SimpleMeterRegistry;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilder;
import org.testng.annotations.AfterSuite;
import org.testng.annotations.BeforeSuite;
import pl.allegro.tech.hermes.api.SentMessageTraceStatus;
import pl.allegro.tech.hermes.common.metric.HermesMetrics;
import pl.allegro.tech.hermes.common.metric.MetricsFacade;
import pl.allegro.tech.hermes.metrics.PathsCompiler;
import pl.allegro.tech.hermes.tracker.consumers.AbstractLogRepositoryTest;
import pl.allegro.tech.hermes.tracker.consumers.LogRepository;
Expand All @@ -33,6 +36,11 @@ public class ConsumersElasticsearchLogRepositoryTest extends AbstractLogReposito
private static final Clock clock = Clock.fixed(LocalDate.now().atStartOfDay().toInstant(ZoneOffset.UTC), ZoneId.systemDefault());
private static final ConsumersIndexFactory indexFactory = new ConsumersDailyIndexFactory(clock);
private static final FrontendIndexFactory frontendIndexFactory = new FrontendDailyIndexFactory(clock);
private static final MetricsFacade metricsFacade = new MetricsFacade(
new SimpleMeterRegistry(),
new HermesMetrics(new MetricRegistry(), new PathsCompiler(""))
);


private static ElasticsearchResource elasticsearch = new ElasticsearchResource();
private SchemaManager schemaManager;
Expand All @@ -51,7 +59,7 @@ public void after() {
@Override
protected LogRepository createLogRepository() {
schemaManager.ensureSchema();
return new ConsumersElasticsearchLogRepository.Builder(elasticsearch.client(), new PathsCompiler("localhost"), new MetricRegistry())
return new ConsumersElasticsearchLogRepository.Builder(elasticsearch.client(), metricsFacade)
.withIndexFactory(indexFactory)
.build();
}
Expand Down
Loading

0 comments on commit b505719

Please sign in to comment.