From 80ee439b2a958ad01a58422de93e52c4784e476a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Daniel=20F=C4=85derski?= Date: Fri, 18 Aug 2023 12:51:07 +0200 Subject: [PATCH] Zookeeper counter based on prometheus metrics (#1706) * Zookeeper counter based on prometheus metrics * Fix method name --- .../allegro/tech/hermes/api/TopicMetrics.java | 2 +- .../di/factories/MetricRegistryFactory.java | 13 -- .../MicrometerRegistryParameters.java | 5 + .../PrometheusMeterRegistryFactory.java | 31 ++++- .../common/metric/SubscriptionMetrics.java | 89 ++++++------ .../hermes/common/metric/TopicMetrics.java | 70 ++++++---- .../counter/zookeeper/CounterMatcher.java | 68 +++++++--- .../zookeeper/ZookeeperCounterReporter.java | 115 +++++++--------- .../zookeeper/CounterMatcherTest.groovy | 26 ++-- .../ZookeeperCounterReporterTest.java | 127 ++++++------------ .../consumers/config/CommonConfiguration.java | 8 +- .../config/MicrometerRegistryProperties.java | 29 ++++ .../frontend/config/CommonConfiguration.java | 8 +- .../config/MicrometerRegistryProperties.java | 29 ++++ hermes-management/build.gradle | 5 +- .../config/ManagementConfiguration.java | 18 --- .../config/MicrometerRegistryProperties.java | 4 +- .../config/PrometheusConfiguration.java | 65 +++++++++ 18 files changed, 405 insertions(+), 307 deletions(-) create mode 100644 hermes-management/src/main/java/pl/allegro/tech/hermes/management/config/PrometheusConfiguration.java diff --git a/hermes-api/src/main/java/pl/allegro/tech/hermes/api/TopicMetrics.java b/hermes-api/src/main/java/pl/allegro/tech/hermes/api/TopicMetrics.java index 76b684c5ec..ddc89bd6cc 100644 --- a/hermes-api/src/main/java/pl/allegro/tech/hermes/api/TopicMetrics.java +++ b/hermes-api/src/main/java/pl/allegro/tech/hermes/api/TopicMetrics.java @@ -69,7 +69,7 @@ public static TopicMetrics unavailable() { } public static class Builder { - private TopicMetrics topicMetrics; + private final TopicMetrics topicMetrics; public Builder() { topicMetrics = new TopicMetrics(); diff --git a/hermes-common/src/main/java/pl/allegro/tech/hermes/common/di/factories/MetricRegistryFactory.java b/hermes-common/src/main/java/pl/allegro/tech/hermes/common/di/factories/MetricRegistryFactory.java index d541791598..1d1de61579 100644 --- a/hermes-common/src/main/java/pl/allegro/tech/hermes/common/di/factories/MetricRegistryFactory.java +++ b/hermes-common/src/main/java/pl/allegro/tech/hermes/common/di/factories/MetricRegistryFactory.java @@ -17,8 +17,6 @@ import org.slf4j.LoggerFactory; import pl.allegro.tech.hermes.common.metric.HermesMetrics; import pl.allegro.tech.hermes.common.metric.MetricRegistryWithHdrHistogramReservoir; -import pl.allegro.tech.hermes.common.metric.counter.CounterStorage; -import pl.allegro.tech.hermes.common.metric.counter.zookeeper.ZookeeperCounterReporter; import pl.allegro.tech.hermes.common.util.InstanceIdResolver; import java.net.InetSocketAddress; @@ -33,18 +31,15 @@ public class MetricRegistryFactory { private static final Logger logger = LoggerFactory.getLogger(MetricRegistryFactory.class); private final MetricRegistryParameters metricRegistryParameters; private final GraphiteParameters graphiteParameters; - private final CounterStorage counterStorage; private final InstanceIdResolver instanceIdResolver; private final String moduleName; public MetricRegistryFactory(MetricRegistryParameters metricRegistryParameters, GraphiteParameters graphiteParameters, - CounterStorage counterStorage, InstanceIdResolver instanceIdResolver, @Named("moduleName") String moduleName) { this.metricRegistryParameters = metricRegistryParameters; this.graphiteParameters = graphiteParameters; - this.counterStorage = counterStorage; this.instanceIdResolver = instanceIdResolver; this.moduleName = moduleName; } @@ -73,14 +68,6 @@ public MetricRegistry provide() { metricRegistryParameters.getReportPeriod().toSeconds(), TimeUnit.SECONDS ); } - - if (metricRegistryParameters.isZookeeperReporterEnabled()) { - new ZookeeperCounterReporter(registry, counterStorage, graphiteParameters.getPrefix()).start( - metricRegistryParameters.getReportPeriod().toSeconds(), - TimeUnit.SECONDS - ); - } - registerJvmMetrics(registry); return registry; diff --git a/hermes-common/src/main/java/pl/allegro/tech/hermes/common/di/factories/MicrometerRegistryParameters.java b/hermes-common/src/main/java/pl/allegro/tech/hermes/common/di/factories/MicrometerRegistryParameters.java index 5a967bb9e8..48a74a84ea 100644 --- a/hermes-common/src/main/java/pl/allegro/tech/hermes/common/di/factories/MicrometerRegistryParameters.java +++ b/hermes-common/src/main/java/pl/allegro/tech/hermes/common/di/factories/MicrometerRegistryParameters.java @@ -1,7 +1,12 @@ package pl.allegro.tech.hermes.common.di.factories; +import java.time.Duration; import java.util.List; public interface MicrometerRegistryParameters { List getPercentiles(); + + boolean zookeeperReporterEnabled(); + + Duration zookeeperReportPeriod(); } diff --git a/hermes-common/src/main/java/pl/allegro/tech/hermes/common/di/factories/PrometheusMeterRegistryFactory.java b/hermes-common/src/main/java/pl/allegro/tech/hermes/common/di/factories/PrometheusMeterRegistryFactory.java index 9f7d24bb62..6a06ec5eba 100644 --- a/hermes-common/src/main/java/pl/allegro/tech/hermes/common/di/factories/PrometheusMeterRegistryFactory.java +++ b/hermes-common/src/main/java/pl/allegro/tech/hermes/common/di/factories/PrometheusMeterRegistryFactory.java @@ -5,22 +5,36 @@ import io.micrometer.core.instrument.distribution.DistributionStatisticConfig; import io.micrometer.prometheus.PrometheusConfig; import io.micrometer.prometheus.PrometheusMeterRegistry; +import pl.allegro.tech.hermes.common.metric.counter.CounterStorage; +import pl.allegro.tech.hermes.common.metric.counter.zookeeper.ZookeeperCounterReporter; + +import java.util.concurrent.TimeUnit; public class PrometheusMeterRegistryFactory { private final MicrometerRegistryParameters parameters; private final PrometheusConfig prometheusConfig; + private final CounterStorage counterStorage; private final String prefix; public PrometheusMeterRegistryFactory(MicrometerRegistryParameters parameters, PrometheusConfig prometheusConfig, - String prefix) { + CounterStorage counterStorage, String prefix) { this.parameters = parameters; this.prometheusConfig = prometheusConfig; + this.counterStorage = counterStorage; this.prefix = prefix + "_"; } public PrometheusMeterRegistry provide() { PrometheusMeterRegistry meterRegistry = new PrometheusMeterRegistry(prometheusConfig); + applyFilters(meterRegistry); + if (parameters.zookeeperReporterEnabled()) { + registerZookeeperReporter(meterRegistry); + } + return meterRegistry; + } + + private void applyFilters(PrometheusMeterRegistry meterRegistry) { meterRegistry.config().meterFilter(new MeterFilter() { @Override public Meter.Id map(Meter.Id id) { @@ -28,14 +42,17 @@ public Meter.Id map(Meter.Id id) { } @Override - public DistributionStatisticConfig configure(Meter.Id id, - DistributionStatisticConfig config) { + public DistributionStatisticConfig configure(Meter.Id id, DistributionStatisticConfig config) { return DistributionStatisticConfig.builder() - .percentiles(parameters.getPercentiles().stream().mapToDouble(Double::doubleValue).toArray()) - .build() - .merge(config); + .percentiles(parameters.getPercentiles() + .stream().mapToDouble(Double::doubleValue).toArray() + ).build().merge(config); } }); - return meterRegistry; + } + + private void registerZookeeperReporter(PrometheusMeterRegistry meterRegistry) { + new ZookeeperCounterReporter(meterRegistry, counterStorage, prefix) + .start(parameters.zookeeperReportPeriod().toSeconds(), TimeUnit.SECONDS); } } diff --git a/hermes-common/src/main/java/pl/allegro/tech/hermes/common/metric/SubscriptionMetrics.java b/hermes-common/src/main/java/pl/allegro/tech/hermes/common/metric/SubscriptionMetrics.java index a4db4f5efb..ea41df5b58 100644 --- a/hermes-common/src/main/java/pl/allegro/tech/hermes/common/metric/SubscriptionMetrics.java +++ b/hermes-common/src/main/java/pl/allegro/tech/hermes/common/metric/SubscriptionMetrics.java @@ -5,30 +5,14 @@ import io.micrometer.core.instrument.MeterRegistry; import io.micrometer.core.instrument.Tags; import pl.allegro.tech.hermes.api.SubscriptionName; -import pl.allegro.tech.hermes.metrics.DefaultHermesHistogram; import pl.allegro.tech.hermes.metrics.HermesCounter; import pl.allegro.tech.hermes.metrics.HermesHistogram; import pl.allegro.tech.hermes.metrics.HermesTimer; import pl.allegro.tech.hermes.metrics.counters.HermesCounters; -import java.util.concurrent.TimeUnit; import java.util.function.ToDoubleFunction; -import static pl.allegro.tech.hermes.common.metric.Counters.DELIVERED; -import static pl.allegro.tech.hermes.common.metric.Counters.DISCARDED; -import static pl.allegro.tech.hermes.common.metric.Meters.DISCARDED_METER; -import static pl.allegro.tech.hermes.common.metric.Meters.DISCARDED_SUBSCRIPTION_METER; -import static pl.allegro.tech.hermes.common.metric.Meters.DISCARDED_TOPIC_METER; -import static pl.allegro.tech.hermes.common.metric.Meters.FAILED_METER_SUBSCRIPTION; -import static pl.allegro.tech.hermes.common.metric.Meters.FILTERED_METER; -import static pl.allegro.tech.hermes.common.metric.Meters.METER; -import static pl.allegro.tech.hermes.common.metric.Meters.SUBSCRIPTION_BATCH_METER; -import static pl.allegro.tech.hermes.common.metric.Meters.SUBSCRIPTION_METER; -import static pl.allegro.tech.hermes.common.metric.Meters.SUBSCRIPTION_THROUGHPUT_BYTES; -import static pl.allegro.tech.hermes.common.metric.Meters.TOPIC_METER; import static pl.allegro.tech.hermes.common.metric.SubscriptionTagsFactory.subscriptionTags; -import static pl.allegro.tech.hermes.common.metric.Timers.CONSUMER_IDLE_TIME; -import static pl.allegro.tech.hermes.common.metric.Timers.SUBSCRIPTION_LATENCY; public class SubscriptionMetrics { private final HermesMetrics hermesMetrics; @@ -41,68 +25,68 @@ public SubscriptionMetrics(HermesMetrics hermesMetrics, MeterRegistry meterRegis public SubscriptionHermesCounter throughputInBytes(SubscriptionName subscription) { return SubscriptionHermesCounter.from( - micrometerCounter("subscription-throughput-bytes", subscription), - hermesMetrics.meter(SUBSCRIPTION_THROUGHPUT_BYTES, subscription.getTopicName(), subscription.getName()), - SUBSCRIPTION_THROUGHPUT_BYTES, subscription); + micrometerCounter(SubscriptionMetricsNames.SUBSCRIPTION_THROUGHPUT, subscription), + hermesMetrics.meter(Meters.SUBSCRIPTION_THROUGHPUT_BYTES, subscription.getTopicName(), subscription.getName()), + Meters.SUBSCRIPTION_THROUGHPUT_BYTES, subscription); } public HermesCounter successes(SubscriptionName subscription) { return size -> { - hermesMetrics.meter(METER).mark(size); - hermesMetrics.meter(TOPIC_METER, subscription.getTopicName()).mark(size); - hermesMetrics.meter(SUBSCRIPTION_METER, subscription.getTopicName(), subscription.getName()).mark(size); - hermesMetrics.counter(DELIVERED, subscription.getTopicName(), subscription.getName()).inc(size); - micrometerCounter("subscription.delivered", subscription).increment(size); + hermesMetrics.meter(Meters.METER).mark(size); + hermesMetrics.meter(Meters.TOPIC_METER, subscription.getTopicName()).mark(size); + hermesMetrics.meter(Meters.SUBSCRIPTION_METER, subscription.getTopicName(), subscription.getName()).mark(size); + hermesMetrics.counter(Counters.DELIVERED, subscription.getTopicName(), subscription.getName()).inc(size); + micrometerCounter(SubscriptionMetricsNames.SUBSCRIPTION_DELIVERED, subscription).increment(size); }; } public HermesCounter batchSuccesses(SubscriptionName subscription) { return HermesCounters.from( - micrometerCounter("subscription.batches", subscription), - hermesMetrics.meter(SUBSCRIPTION_BATCH_METER, subscription.getTopicName(), subscription.getName()) + micrometerCounter(SubscriptionMetricsNames.SUBSCRIPTION_BATCHES, subscription), + hermesMetrics.meter(Meters.SUBSCRIPTION_BATCH_METER, subscription.getTopicName(), subscription.getName()) ); } public HermesCounter discarded(SubscriptionName subscription) { return size -> { - hermesMetrics.meter(DISCARDED_METER).mark(size); - hermesMetrics.meter(DISCARDED_TOPIC_METER, subscription.getTopicName()).mark(size); - hermesMetrics.meter(DISCARDED_SUBSCRIPTION_METER, subscription.getTopicName(), subscription.getName()).mark(size); - hermesMetrics.counter(DISCARDED, subscription.getTopicName(), subscription.getName()).inc(size); - micrometerCounter("subscription.discarded", subscription).increment(size); + hermesMetrics.meter(Meters.DISCARDED_METER).mark(size); + hermesMetrics.meter(Meters.DISCARDED_TOPIC_METER, subscription.getTopicName()).mark(size); + hermesMetrics.meter(Meters.DISCARDED_SUBSCRIPTION_METER, subscription.getTopicName(), subscription.getName()).mark(size); + hermesMetrics.counter(Counters.DISCARDED, subscription.getTopicName(), subscription.getName()).inc(size); + micrometerCounter(SubscriptionMetricsNames.SUBSCRIPTION_DISCARDED, subscription).increment(size); }; } public HermesTimer latency(SubscriptionName subscription) { return HermesTimer.from( - meterRegistry.timer("subscription.latency", subscriptionTags(subscription)), - hermesMetrics.timer(SUBSCRIPTION_LATENCY, subscription.getTopicName(), subscription.getName()) + meterRegistry.timer(SubscriptionMetricsNames.SUBSCRIPTION_LATENCY, subscriptionTags(subscription)), + hermesMetrics.timer(Timers.SUBSCRIPTION_LATENCY, subscription.getTopicName(), subscription.getName()) ); } public void registerInflightGauge(SubscriptionName subscription, T obj, ToDoubleFunction f) { hermesMetrics.registerInflightGauge(subscription, () -> (int) f.applyAsDouble(obj)); - meterRegistry.gauge("subscription.inflight", subscriptionTags(subscription), obj, f); + meterRegistry.gauge(SubscriptionMetricsNames.SUBSCRIPTION_INFLIGHT, subscriptionTags(subscription), obj, f); } public HermesTimer consumerIdleTimer(SubscriptionName subscription) { return HermesTimer.from( - meterRegistry.timer("subscription.idle-duration", subscriptionTags(subscription)), - hermesMetrics.timer(CONSUMER_IDLE_TIME, subscription.getTopicName(), subscription.getName()) + meterRegistry.timer(SubscriptionMetricsNames.SUBSCRIPTION_IDLE_DURATION, subscriptionTags(subscription)), + hermesMetrics.timer(Timers.CONSUMER_IDLE_TIME, subscription.getTopicName(), subscription.getName()) ); } public HermesCounter filteredOutCounter(SubscriptionName subscription) { return HermesCounters.from( - micrometerCounter("subscription.filtered-out", subscription), - hermesMetrics.meter(FILTERED_METER, subscription.getTopicName(), subscription.getName()) + micrometerCounter(SubscriptionMetricsNames.SUBSCRIPTION_FILTERED_OUT, subscription), + hermesMetrics.meter(Meters.FILTERED_METER, subscription.getTopicName(), subscription.getName()) ); } public HermesCounter httpAnswerCounter(SubscriptionName subscription, int statusCode) { return size -> { meterRegistry.counter( - "subscription.http-status-codes", + SubscriptionMetricsNames.SUBSCRIPTION_HTTP_STATUS_CODES, Tags.concat(subscriptionTags(subscription), "status_code", String.valueOf(statusCode)) ).increment(size); hermesMetrics.registerConsumerHttpAnswer(subscription, statusCode, size); @@ -111,28 +95,28 @@ public HermesCounter httpAnswerCounter(SubscriptionName subscription, int status public HermesCounter timeoutsCounter(SubscriptionName subscription) { return HermesCounters.from( - micrometerCounter("subscription.timeouts", subscription), + micrometerCounter(SubscriptionMetricsNames.SUBSCRIPTION_TIMEOUTS, subscription), hermesMetrics.consumerErrorsTimeoutMeter(subscription) ); } public HermesCounter otherErrorsCounter(SubscriptionName subscription) { return HermesCounters.from( - micrometerCounter("subscription.other-errors", subscription), + micrometerCounter(SubscriptionMetricsNames.SUBSCRIPTION_OTHER_ERRORS, subscription), hermesMetrics.consumerErrorsOtherMeter(subscription) ); } public HermesCounter failuresCounter(SubscriptionName subscription) { return HermesCounters.from( - micrometerCounter("subscription.failures", subscription), - hermesMetrics.meter(FAILED_METER_SUBSCRIPTION, subscription.getTopicName(), subscription.getName()) + micrometerCounter(SubscriptionMetricsNames.SUBSCRIPTION_FAILURES, subscription), + hermesMetrics.meter(Meters.FAILED_METER_SUBSCRIPTION, subscription.getTopicName(), subscription.getName()) ); } public HermesHistogram inflightTimeInMillisHistogram(SubscriptionName subscriptionName) { return value -> { - DistributionSummary.builder("subscription.inflight-time-seconds") + DistributionSummary.builder(SubscriptionMetricsNames.SUBSCRIPTION_INFLIGHT_TIME) .tags(subscriptionTags(subscriptionName)) .register(meterRegistry) .record(value / 1000d); @@ -143,4 +127,21 @@ public HermesHistogram inflightTimeInMillisHistogram(SubscriptionName subscripti private Counter micrometerCounter(String metricName, SubscriptionName subscription) { return meterRegistry.counter(metricName, subscriptionTags(subscription)); } + + public static class SubscriptionMetricsNames { + public static final String SUBSCRIPTION_DELIVERED = "subscription.delivered"; + public static final String SUBSCRIPTION_THROUGHPUT = "subscription.throughput-bytes"; + public static final String SUBSCRIPTION_BATCHES = "subscription.batches"; + public static final String SUBSCRIPTION_DISCARDED = "subscription.discarded"; + public static final String SUBSCRIPTION_LATENCY = "subscription.latency"; + public static final String SUBSCRIPTION_INFLIGHT = "subscription.inflight"; + public static final String SUBSCRIPTION_IDLE_DURATION = "subscription.idle-duration"; + public static final String SUBSCRIPTION_FILTERED_OUT = "subscription.filtered-out"; + public static final String SUBSCRIPTION_HTTP_STATUS_CODES = "subscription.http-status-codes"; + public static final String SUBSCRIPTION_TIMEOUTS = "subscription.timeouts"; + public static final String SUBSCRIPTION_OTHER_ERRORS = "subscription.other-errors"; + public static final String SUBSCRIPTION_FAILURES = "subscription.failures"; + public static final String SUBSCRIPTION_INFLIGHT_TIME = "subscription.inflight-time-seconds"; + } + } diff --git a/hermes-common/src/main/java/pl/allegro/tech/hermes/common/metric/TopicMetrics.java b/hermes-common/src/main/java/pl/allegro/tech/hermes/common/metric/TopicMetrics.java index 331ea9b35d..1d840e3b5d 100644 --- a/hermes-common/src/main/java/pl/allegro/tech/hermes/common/metric/TopicMetrics.java +++ b/hermes-common/src/main/java/pl/allegro/tech/hermes/common/metric/TopicMetrics.java @@ -15,11 +15,6 @@ import pl.allegro.tech.hermes.metrics.counters.MeterBackedHermesCounter; import static pl.allegro.tech.hermes.common.metric.Meters.DELAYED_PROCESSING; -import static pl.allegro.tech.hermes.common.metric.Meters.METER; -import static pl.allegro.tech.hermes.common.metric.Meters.THROUGHPUT_BYTES; -import static pl.allegro.tech.hermes.common.metric.Meters.TOPIC_DELAYED_PROCESSING; -import static pl.allegro.tech.hermes.common.metric.Meters.TOPIC_METER; -import static pl.allegro.tech.hermes.common.metric.Meters.TOPIC_THROUGHPUT_BYTES; public class TopicMetrics { private final HermesMetrics hermesMetrics; @@ -32,107 +27,108 @@ public TopicMetrics(HermesMetrics hermesMetrics, MeterRegistry meterRegistry) { public HermesTimer ackAllGlobalLatency() { return HermesTimer.from( - meterRegistry.timer("ack-all.global-latency"), + meterRegistry.timer(TopicMetricsNames.TOPIC_ACK_ALL_GLOBAL_LATENCY), hermesMetrics.timer(Timers.ACK_ALL_LATENCY) ); } public HermesTimer ackAllTopicLatency(TopicName topic) { return HermesTimer.from( - micrometerTimer("ack-all.topic-latency", topic), + micrometerTimer(TopicMetricsNames.TOPIC_ACK_ALL_LATENCY, topic), hermesMetrics.timer(Timers.ACK_ALL_TOPIC_LATENCY, topic)); } public HermesTimer ackAllBrokerLatency() { return HermesTimer.from( - meterRegistry.timer("ack-all.broker-latency"), + meterRegistry.timer(TopicMetricsNames.TOPIC_ACK_ALL_BROKER_LATENCY), hermesMetrics.timer(Timers.ACK_ALL_BROKER_LATENCY)); } public HermesTimer ackLeaderGlobalLatency() { return HermesTimer.from( - meterRegistry.timer("ack-leader.global-latency"), + meterRegistry.timer(TopicMetricsNames.TOPIC_ACK_LEADER_GLOBAL_LATENCY), hermesMetrics.timer(Timers.ACK_LEADER_LATENCY)); } public HermesTimer ackLeaderTopicLatency(TopicName topic) { return HermesTimer.from( - micrometerTimer("ack-leader.topic-latency", topic), + micrometerTimer(TopicMetricsNames.TOPIC_ACK_LEADER_LATENCY, topic), hermesMetrics.timer(Timers.ACK_LEADER_TOPIC_LATENCY, topic)); } public HermesTimer ackLeaderBrokerLatency() { return HermesTimer.from( - meterRegistry.timer("ack-leader.broker-latency"), + meterRegistry.timer(TopicMetricsNames.TOPIC_ACK_LEADER_BROKER_LATENCY), hermesMetrics.timer(Timers.ACK_LEADER_BROKER_LATENCY)); } public MeterBackedHermesCounter topicThroughputBytes(TopicName topicName) { return HermesCounters.from( - micrometerCounter("topic-throughput-bytes", topicName), - hermesMetrics.meter(TOPIC_THROUGHPUT_BYTES, topicName) + micrometerCounter(TopicMetricsNames.TOPIC_THROUGHPUT, topicName), + hermesMetrics.meter(Meters.TOPIC_THROUGHPUT_BYTES, topicName) ); } public MeterBackedHermesCounter topicGlobalThroughputBytes() { return HermesCounters.from( - meterRegistry.counter("topic-global-throughput-bytes"), - hermesMetrics.meter(THROUGHPUT_BYTES) + meterRegistry.counter(TopicMetricsNames.TOPIC_GLOBAL_THROUGHPUT), + hermesMetrics.meter(Meters.THROUGHPUT_BYTES) ); } public HermesCounter topicPublished(TopicName topicName) { return HermesCounters.from( - micrometerCounter("published", topicName), + micrometerCounter(TopicMetricsNames.TOPIC_PUBLISHED, topicName), hermesMetrics.counter(Counters.PUBLISHED, topicName) ); } public HermesCounter topicGlobalRequestCounter() { return HermesCounters.from( - meterRegistry.counter("topic-global-requests"), - hermesMetrics.meter(METER) + meterRegistry.counter(TopicMetricsNames.TOPIC_GLOBAL_REQUESTS), + hermesMetrics.meter(Meters.METER) ); } public HermesCounter topicRequestCounter(TopicName topicName) { return HermesCounters.from( - micrometerCounter("topic-requests", topicName), - hermesMetrics.meter(TOPIC_METER, topicName) + micrometerCounter(TopicMetricsNames.TOPIC_REQUESTS, topicName), + hermesMetrics.meter(Meters.TOPIC_METER, topicName) ); } public HermesCounter topicGlobalDelayedProcessingCounter() { return HermesCounters.from( - meterRegistry.counter("topic-global-delayed-processing"), + meterRegistry.counter(TopicMetricsNames.TOPIC_GLOBAL_DELAYED_PROCESSING), hermesMetrics.meter(DELAYED_PROCESSING) ); } public HermesCounter topicDelayedProcessingCounter(TopicName topicName) { return HermesCounters.from( - micrometerCounter("topic-delayed-processing", topicName), - hermesMetrics.meter(TOPIC_DELAYED_PROCESSING, topicName) + micrometerCounter(TopicMetricsNames.TOPIC_DELAYED_PROCESSING, topicName), + hermesMetrics.meter(Meters.TOPIC_DELAYED_PROCESSING, topicName) ); } public HermesCounter topicGlobalHttpStatusCodeCounter(int statusCode) { return HermesCounters.from( - meterRegistry.counter("topic-global-http-status-codes", Tags.of("status_code", String.valueOf(statusCode))), + meterRegistry.counter(TopicMetricsNames.TOPIC_GLOBAL_HTTP_STATUS_CODES, Tags.of("status_code", String.valueOf(statusCode))), hermesMetrics.httpStatusCodeMeter(statusCode) ); } public HermesCounter topicHttpStatusCodeCounter(TopicName topicName, int statusCode) { return HermesCounters.from( - meterRegistry.counter("topic-http-status-codes", topicTags(topicName).and("status_code", String.valueOf(statusCode))), + meterRegistry.counter(TopicMetricsNames.TOPIC_HTTP_STATUS_CODES, topicTags(topicName) + .and("status_code", String.valueOf(statusCode))), hermesMetrics.httpStatusCodeMeter(statusCode, topicName) ); } public HermesHistogram topicGlobalMessageContentSizeHistogram() { return DefaultHermesHistogram.of( - DistributionSummary.builder("topic-global-message-size-bytes") + DistributionSummary.builder(TopicMetricsNames.TOPIC_GLOBAL_MESSAGE_SIZE_BYTES) .register(meterRegistry), hermesMetrics.messageContentSizeHistogram() ); @@ -140,7 +136,7 @@ public HermesHistogram topicGlobalMessageContentSizeHistogram() { public HermesHistogram topicMessageContentSizeHistogram(TopicName topicName) { return DefaultHermesHistogram.of( - DistributionSummary.builder("topic-message-size-bytes") + DistributionSummary.builder(TopicMetricsNames.TOPIC_MESSAGE_SIZE_BYTES) .tags(topicTags(topicName)) .register(meterRegistry), hermesMetrics.messageContentSizeHistogram(topicName) @@ -161,4 +157,24 @@ private Tags topicTags(TopicName topicName) { Tag.of("topic", topicName.getName()) ); } + + public static class TopicMetricsNames { + public static final String TOPIC_ACK_ALL_GLOBAL_LATENCY = "topic.ack-all.global-latency"; + public static final String TOPIC_ACK_ALL_LATENCY = "topic.ack-all.latency"; + public static final String TOPIC_ACK_ALL_BROKER_LATENCY = "topic.ack-all.broker-latency"; + public static final String TOPIC_ACK_LEADER_GLOBAL_LATENCY = "topic.ack-leader.global-latency"; + public static final String TOPIC_ACK_LEADER_LATENCY = "topic.ack-leader.latency"; + public static final String TOPIC_ACK_LEADER_BROKER_LATENCY = "topic.ack-leader.broker-latency"; + public static final String TOPIC_THROUGHPUT = "topic.throughput-bytes"; + public static final String TOPIC_GLOBAL_THROUGHPUT = "topic.global-throughput-bytes"; + public static final String TOPIC_PUBLISHED = "topic.published"; + public static final String TOPIC_GLOBAL_REQUESTS = "topic.global-requests"; + public static final String TOPIC_REQUESTS = "topic.requests"; + public static final String TOPIC_GLOBAL_DELAYED_PROCESSING = "topic-global-delayed-processing"; + public static final String TOPIC_DELAYED_PROCESSING = "topic-delayed-processing"; + public static final String TOPIC_GLOBAL_HTTP_STATUS_CODES = "topic-global-http-status-codes"; + public static final String TOPIC_HTTP_STATUS_CODES = "topic-http-status-codes"; + public static final String TOPIC_GLOBAL_MESSAGE_SIZE_BYTES = "topic-global-message-size-bytes"; + public static final String TOPIC_MESSAGE_SIZE_BYTES = "topic-message-size-bytes"; + } } diff --git a/hermes-common/src/main/java/pl/allegro/tech/hermes/common/metric/counter/zookeeper/CounterMatcher.java b/hermes-common/src/main/java/pl/allegro/tech/hermes/common/metric/counter/zookeeper/CounterMatcher.java index b1fdced29c..7e52b5186b 100644 --- a/hermes-common/src/main/java/pl/allegro/tech/hermes/common/metric/counter/zookeeper/CounterMatcher.java +++ b/hermes-common/src/main/java/pl/allegro/tech/hermes/common/metric/counter/zookeeper/CounterMatcher.java @@ -1,68 +1,92 @@ package pl.allegro.tech.hermes.common.metric.counter.zookeeper; +import io.micrometer.core.instrument.Counter; +import pl.allegro.tech.hermes.api.TopicName; + import java.util.Optional; +import static pl.allegro.tech.hermes.common.metric.SubscriptionMetrics.SubscriptionMetricsNames; +import static pl.allegro.tech.hermes.common.metric.TopicMetrics.TopicMetricsNames; + class CounterMatcher { - private static final int TOPIC_METRICS_PARTS = 3; - private static final int SUBSCRIPTION_METRIC_PARTS = 4; + private static final String GROUP_TAG_NAME = "group"; + private static final String TOPIC_TAG_NAME = "topic"; + private static final String SUBSCRIPTION_TAG_NAME = "subscription"; - private final String counterName; - private String topicName; + private final Counter counter; + private final String metricSearchPrefix; + private TopicName topicName; + private long value; private Optional subscription; - private int metricParts; - public CounterMatcher(String counterName) { - this.counterName = counterName; - parseCounter(counterName); + public CounterMatcher(Counter counter, String metricSearchPrefix) { + this.counter = counter; + this.metricSearchPrefix = metricSearchPrefix; + parseCounter(this.counter); } - private void parseCounter(String counterName) { - String[] splitted = counterName.split("\\."); - metricParts = splitted.length; + private void parseCounter(Counter counter) { if (isTopicPublished() || isTopicThroughput()) { - topicName = splitted[splitted.length - 2] + "." + splitted[splitted.length - 1]; + topicName = new TopicName(counter.getId().getTag(GROUP_TAG_NAME), counter.getId().getTag(TOPIC_TAG_NAME)); subscription = Optional.empty(); } else if ( isSubscriptionDelivered() || isSubscriptionThroughput() || isSubscriptionDiscarded() || isSubscriptionFiltered() - ) { - subscription = Optional.of(splitted[splitted.length - 1]); - topicName = splitted[splitted.length - 3] + "." + splitted[splitted.length - 2]; + ) { + topicName = new TopicName(counter.getId().getTag(GROUP_TAG_NAME), counter.getId().getTag(TOPIC_TAG_NAME)); + subscription = Optional.of(counter.getId().getTag(SUBSCRIPTION_TAG_NAME)); } + value = (long) counter.count(); } public boolean isTopicPublished() { - return counterName.startsWith("published."); + return isTopicCounter() && nameEquals(TopicMetricsNames.TOPIC_PUBLISHED); } public boolean isTopicThroughput() { - return metricParts == TOPIC_METRICS_PARTS && counterName.startsWith("throughput."); + return isTopicCounter() && nameEquals(TopicMetricsNames.TOPIC_THROUGHPUT); } public boolean isSubscriptionThroughput() { - return metricParts == SUBSCRIPTION_METRIC_PARTS && counterName.startsWith("throughput."); + return isSubscriptionCounter() && nameEquals(SubscriptionMetricsNames.SUBSCRIPTION_THROUGHPUT); } public boolean isSubscriptionDelivered() { - return counterName.startsWith("delivered."); + return isSubscriptionCounter() && nameEquals(SubscriptionMetricsNames.SUBSCRIPTION_DELIVERED); } public boolean isSubscriptionDiscarded() { - return counterName.startsWith("discarded."); + return isSubscriptionCounter() && nameEquals(SubscriptionMetricsNames.SUBSCRIPTION_DISCARDED); } public boolean isSubscriptionFiltered() { - return counterName.startsWith("filtered."); + return isSubscriptionCounter() && nameEquals(SubscriptionMetricsNames.SUBSCRIPTION_FILTERED_OUT); } - public String getTopicName() { + public TopicName getTopicName() { return topicName; } public String getSubscriptionName() { return subscription.orElse(""); } + + public long getValue() { + return value; + } + + private boolean isTopicCounter() { + return counter.getId().getTag(TOPIC_TAG_NAME) != null; + } + + private boolean isSubscriptionCounter() { + return counter.getId().getTag(SUBSCRIPTION_TAG_NAME) != null; + } + + private boolean nameEquals(String name) { + return counter.getId().getName().equals(metricSearchPrefix + name); + } } diff --git a/hermes-common/src/main/java/pl/allegro/tech/hermes/common/metric/counter/zookeeper/ZookeeperCounterReporter.java b/hermes-common/src/main/java/pl/allegro/tech/hermes/common/metric/counter/zookeeper/ZookeeperCounterReporter.java index 9a17cf7f77..c66efdc640 100644 --- a/hermes-common/src/main/java/pl/allegro/tech/hermes/common/metric/counter/zookeeper/ZookeeperCounterReporter.java +++ b/hermes-common/src/main/java/pl/allegro/tech/hermes/common/metric/counter/zookeeper/ZookeeperCounterReporter.java @@ -1,108 +1,99 @@ package pl.allegro.tech.hermes.common.metric.counter.zookeeper; -import com.codahale.metrics.Counter; -import com.codahale.metrics.Gauge; -import com.codahale.metrics.Histogram; -import com.codahale.metrics.Meter; -import com.codahale.metrics.Metric; -import com.codahale.metrics.MetricFilter; -import com.codahale.metrics.MetricRegistry; -import com.codahale.metrics.ScheduledReporter; -import com.codahale.metrics.Timer; +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import io.micrometer.core.instrument.Counter; +import io.micrometer.core.instrument.MeterRegistry; +import io.micrometer.core.instrument.search.Search; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import pl.allegro.tech.hermes.api.TopicName; import pl.allegro.tech.hermes.common.metric.HermesMetrics; -import pl.allegro.tech.hermes.common.metric.Meters; import pl.allegro.tech.hermes.common.metric.counter.CounterStorage; -import java.util.SortedMap; +import java.util.Collection; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; -import static pl.allegro.tech.hermes.api.TopicName.fromQualifiedName; - -public class ZookeeperCounterReporter extends ScheduledReporter { - - private static final String ZOOKEEPER_REPORTER_NAME = "zookeeper-reporter"; - private static final TimeUnit RATE_UNIT = TimeUnit.SECONDS; - private static final TimeUnit DURATION_UNIT = TimeUnit.MILLISECONDS; +public class ZookeeperCounterReporter { + private static final Logger logger = LoggerFactory.getLogger(ZookeeperCounterReporter.class); private final CounterStorage counterStorage; + private final String metricsSearchPrefix; + private final MeterRegistry meterRegistry; - public ZookeeperCounterReporter(MetricRegistry registry, + private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor( + new ThreadFactoryBuilder() + .setNameFormat("zookeeper-reporter-scheduled-executor-%d") + .setDaemon(true) + .build()); + + public ZookeeperCounterReporter(MeterRegistry registry, CounterStorage counterStorage, - String graphitePrefix - ) { - super( - registry, - ZOOKEEPER_REPORTER_NAME, - new ZookeeperMetricsFilter(graphitePrefix), - RATE_UNIT, - DURATION_UNIT - ); + String metricsSearchPrefix) { + this.meterRegistry = registry; this.counterStorage = counterStorage; + this.metricsSearchPrefix = metricsSearchPrefix; } - @Override - public void report(SortedMap gauges, - SortedMap counters, - SortedMap histograms, - SortedMap meters, - SortedMap timers) { - - counters.forEach(this::reportCounter); + public void start(long period, TimeUnit unit) { + scheduledExecutorService.scheduleWithFixedDelay(this::report, 0, period, unit); + } - meters - .entrySet() - .stream() - .filter(meterEntry -> meterEntry.getKey().startsWith(Meters.THROUGHPUT_BYTES)) - .forEach(meterEntry -> reportVolumeCounter(meterEntry.getKey(), meterEntry.getValue().getCount())); + public void report() { + try { + Collection counters = Search.in(meterRegistry).counters(); + counters.forEach(counter -> { + CounterMatcher matcher = new CounterMatcher(counter, metricsSearchPrefix); + reportCounter(matcher); + reportVolumeCounter(matcher); + }); + } catch (RuntimeException ex) { + logger.error("Error during reporting metrics to Zookeeper...", ex); + } } - private void reportVolumeCounter(String metricName, long value) { - CounterMatcher matcher = new CounterMatcher(metricName); + private void reportVolumeCounter(CounterMatcher matcher) { if (matcher.isTopicThroughput()) { counterStorage.incrementVolumeCounter( escapedTopicName(matcher.getTopicName()), - value + matcher.getValue() ); } else if (matcher.isSubscriptionThroughput()) { counterStorage.incrementVolumeCounter( escapedTopicName(matcher.getTopicName()), escapeMetricsReplacementChar(matcher.getSubscriptionName()), - value + matcher.getValue() ); } } - private void reportCounter(String counterName, Counter counter) { - if (counter.getCount() == 0) { + private void reportCounter(CounterMatcher matcher) { + if (matcher.getValue() == 0) { return; } - CounterMatcher matcher = new CounterMatcher(counterName); - long value = counter.getCount(); - if (matcher.isTopicPublished()) { counterStorage.setTopicPublishedCounter( escapedTopicName(matcher.getTopicName()), - value + matcher.getValue() ); } else if (matcher.isSubscriptionDelivered()) { counterStorage.setSubscriptionDeliveredCounter( escapedTopicName(matcher.getTopicName()), escapeMetricsReplacementChar(matcher.getSubscriptionName()), - value + matcher.getValue() ); } else if (matcher.isSubscriptionDiscarded()) { counterStorage.setSubscriptionDiscardedCounter( escapedTopicName(matcher.getTopicName()), escapeMetricsReplacementChar(matcher.getSubscriptionName()), - value + matcher.getValue() ); } } - private static TopicName escapedTopicName(String qualifiedTopicName) { - TopicName topicName = fromQualifiedName(qualifiedTopicName); + private static TopicName escapedTopicName(TopicName topicName) { return new TopicName( escapeMetricsReplacementChar(topicName.getGroupName()), topicName.getName() @@ -112,18 +103,4 @@ private static TopicName escapedTopicName(String qualifiedTopicName) { private static String escapeMetricsReplacementChar(String value) { return value.replaceAll(HermesMetrics.REPLACEMENT_CHAR, "\\."); } - - private static final class ZookeeperMetricsFilter implements MetricFilter { - private final String offsetPrefix; - - private ZookeeperMetricsFilter(String graphitePrefix) { - offsetPrefix = graphitePrefix + "." + "consumer.offset"; - } - - @Override - public boolean matches(String name, Metric metric) { - return (metric instanceof Counter && !name.startsWith(offsetPrefix)) - || (metric instanceof Meter && name.startsWith(Meters.THROUGHPUT_BYTES + ".")); - } - } } diff --git a/hermes-common/src/test/groovy/pl/allegro/tech/hermes/common/metric/counter/zookeeper/CounterMatcherTest.groovy b/hermes-common/src/test/groovy/pl/allegro/tech/hermes/common/metric/counter/zookeeper/CounterMatcherTest.groovy index b12e4f3fa5..1ed88c7538 100644 --- a/hermes-common/src/test/groovy/pl/allegro/tech/hermes/common/metric/counter/zookeeper/CounterMatcherTest.groovy +++ b/hermes-common/src/test/groovy/pl/allegro/tech/hermes/common/metric/counter/zookeeper/CounterMatcherTest.groovy @@ -1,13 +1,19 @@ package pl.allegro.tech.hermes.common.metric.counter.zookeeper +import io.micrometer.core.instrument.simple.SimpleMeterRegistry +import pl.allegro.tech.hermes.api.TopicName +import spock.lang.Shared import spock.lang.Specification class CounterMatcherTest extends Specification { + @Shared + def meterRegistry = new SimpleMeterRegistry() + def "should match topic published"() { given: - def counterName = "published.lagMetricGroup.topic" - def counterMatcher = new CounterMatcher(counterName) + def counter = meterRegistry.counter("topic.published", "group", "lagMetricGroup", "topic", "topic") + def counterMatcher = new CounterMatcher(counter, "") when: def isTopic = counterMatcher.isTopicPublished() @@ -15,13 +21,14 @@ class CounterMatcherTest extends Specification { then: isTopic - topicName == "lagMetricGroup.topic" + topicName == new TopicName("lagMetricGroup", "topic") } def "should match subscription delivered"() { given: - def counterName = "delivered.lagMetricGroup.topic.subscription" - def counterMatcher = new CounterMatcher(counterName) + def counter = meterRegistry.counter("subscription.delivered", "group", "lagMetricGroup", + "topic", "topic", "subscription", "subscription") + def counterMatcher = new CounterMatcher(counter, "") when: def isSubscription = counterMatcher.isSubscriptionDelivered() @@ -30,14 +37,15 @@ class CounterMatcherTest extends Specification { then: isSubscription - topicName == "lagMetricGroup.topic" + topicName == new TopicName("lagMetricGroup", "topic") subscriptionName == "subscription" } def "should match subscription discarded"() { given: - def counterName = "discarded.lagMetricGroup.topic.subscription" - def counterMatcher = new CounterMatcher(counterName) + def counter = meterRegistry.counter("subscription.discarded", "group", "lagMetricGroup", + "topic", "topic", "subscription", "subscription") + def counterMatcher = new CounterMatcher(counter, "") when: def isSubscription = counterMatcher.isSubscriptionDiscarded() @@ -46,7 +54,7 @@ class CounterMatcherTest extends Specification { then: isSubscription - topicName == "lagMetricGroup.topic" + topicName == new TopicName("lagMetricGroup", "topic") subscriptionName == "subscription" } } \ No newline at end of file diff --git a/hermes-common/src/test/java/pl/allegro/tech/hermes/common/metric/counter/zookeeper/ZookeeperCounterReporterTest.java b/hermes-common/src/test/java/pl/allegro/tech/hermes/common/metric/counter/zookeeper/ZookeeperCounterReporterTest.java index 28d06c5632..3788cfb46e 100644 --- a/hermes-common/src/test/java/pl/allegro/tech/hermes/common/metric/counter/zookeeper/ZookeeperCounterReporterTest.java +++ b/hermes-common/src/test/java/pl/allegro/tech/hermes/common/metric/counter/zookeeper/ZookeeperCounterReporterTest.java @@ -1,78 +1,41 @@ package pl.allegro.tech.hermes.common.metric.counter.zookeeper; -import com.codahale.metrics.Counter; -import com.codahale.metrics.Gauge; -import com.codahale.metrics.Histogram; -import com.codahale.metrics.Meter; import com.codahale.metrics.MetricRegistry; -import com.codahale.metrics.Timer; +import io.micrometer.core.instrument.MeterRegistry; +import io.micrometer.core.instrument.simple.SimpleMeterRegistry; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; import org.mockito.Mock; import org.mockito.runners.MockitoJUnitRunner; +import pl.allegro.tech.hermes.api.SubscriptionName; import pl.allegro.tech.hermes.api.TopicName; +import pl.allegro.tech.hermes.common.metric.HermesMetrics; +import pl.allegro.tech.hermes.common.metric.MetricsFacade; import pl.allegro.tech.hermes.common.metric.counter.CounterStorage; import pl.allegro.tech.hermes.common.util.InstanceIdResolver; import pl.allegro.tech.hermes.metrics.PathsCompiler; -import java.util.SortedMap; -import java.util.TreeMap; - import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; -import static pl.allegro.tech.hermes.common.metric.Counters.DELIVERED; -import static pl.allegro.tech.hermes.common.metric.Counters.DISCARDED; -import static pl.allegro.tech.hermes.common.metric.Counters.PUBLISHED; -import static pl.allegro.tech.hermes.common.metric.Meters.SUBSCRIPTION_THROUGHPUT_BYTES; -import static pl.allegro.tech.hermes.common.metric.Meters.TOPIC_THROUGHPUT_BYTES; -import static pl.allegro.tech.hermes.metrics.PathContext.pathContext; @RunWith(MockitoJUnitRunner.class) public class ZookeeperCounterReporterTest { - - public static final SortedMap EMPTY_TIMERS = new TreeMap<>(); - public static final SortedMap EMPTY_METERS = new TreeMap<>(); - public static final SortedMap EMPTY_COUNTERS = new TreeMap<>(); - public static final SortedMap EMPTY_HISTOGRAMS = new TreeMap<>(); - public static final SortedMap EMPTY_GAUGES = null; - public static final String GROUP_NAME_UNDERSCORE = "pl_allegro_tech_skylab"; public static final String GROUP_NAME = "pl.allegro.tech.skylab"; public static final String TOPIC_NAME_UNDERSCORE = "topic_1"; - public static final String SUBSCRIPTION_NAME_UNDERSCORE = "subscription_name"; public static final String SUBSCRIPTION_NAME = "subscription.name"; - public static final TopicName QUALIFIED_TOPIC_NAME = new TopicName(GROUP_NAME, TOPIC_NAME_UNDERSCORE); - public static final long COUNT = 100L; - public static final String GRAPHITE_PREFIX = "tech.hermes"; - - private static final PathsCompiler pathsCompiler = new PathsCompiler("localhost.domain"); - - public static final String METRIC_NAME_FOR_PUBLISHED = pathsCompiler.compile(PUBLISHED, pathContext() - .withGroup(GROUP_NAME_UNDERSCORE).withTopic(TOPIC_NAME_UNDERSCORE).build()); + public static final TopicName topic = new TopicName(GROUP_NAME, TOPIC_NAME_UNDERSCORE); + public static final SubscriptionName subscription = new SubscriptionName(SUBSCRIPTION_NAME, topic); - public static final String METRIC_NAME_FOR_DELIVERED = pathsCompiler.compile(DELIVERED, pathContext() - .withGroup(GROUP_NAME_UNDERSCORE).withTopic(TOPIC_NAME_UNDERSCORE).withSubscription(SUBSCRIPTION_NAME_UNDERSCORE).build()); - - public static final String METRIC_NAME_FOR_DISCARDED = pathsCompiler.compile(DISCARDED, pathContext() - .withGroup(GROUP_NAME_UNDERSCORE).withTopic(TOPIC_NAME_UNDERSCORE).withSubscription(SUBSCRIPTION_NAME_UNDERSCORE).build()); - - public static final String METRIC_NAME_FOR_SUBSCRIPTION_THROUGHPUT = pathsCompiler.compile(SUBSCRIPTION_THROUGHPUT_BYTES, pathContext() - .withGroup(GROUP_NAME_UNDERSCORE).withTopic(TOPIC_NAME_UNDERSCORE).withSubscription(SUBSCRIPTION_NAME_UNDERSCORE).build()); - - public static final String METRIC_NAME_FOR_TOPIC_THRESHOLD = pathsCompiler.compile(TOPIC_THROUGHPUT_BYTES, pathContext() - .withGroup(GROUP_NAME_UNDERSCORE).withTopic(TOPIC_NAME_UNDERSCORE).build()); + public static final long COUNT = 100L; @Mock private CounterStorage counterStorage; - @Mock - private MetricRegistry metricRegistry; - - @Mock - private Counter counter; + private final MeterRegistry meterRegistry = new SimpleMeterRegistry(); - @Mock - private Meter meter; + private final MetricsFacade metricsFacade = new MetricsFacade( + meterRegistry, new HermesMetrics(new MetricRegistry(), new PathsCompiler("localhost"))); @Mock private InstanceIdResolver instanceIdResolver; @@ -82,72 +45,66 @@ public class ZookeeperCounterReporterTest { @Before public void before() { when(instanceIdResolver.resolve()).thenReturn("localhost.domain"); - zookeeperCounterReporter = new ZookeeperCounterReporter(metricRegistry, counterStorage, GRAPHITE_PREFIX); + zookeeperCounterReporter = new ZookeeperCounterReporter(meterRegistry, counterStorage, ""); } @Test public void shouldReportPublishedMessages() { - SortedMap counters = prepareCounters(METRIC_NAME_FOR_PUBLISHED); - when(counter.getCount()).thenReturn(COUNT); + // given + metricsFacade.topics().topicPublished(topic).increment(COUNT); - zookeeperCounterReporter.report(EMPTY_GAUGES, counters, EMPTY_HISTOGRAMS, EMPTY_METERS, EMPTY_TIMERS); + // when + zookeeperCounterReporter.report(); - verify(counterStorage).setTopicPublishedCounter(QUALIFIED_TOPIC_NAME, COUNT); + // then + verify(counterStorage).setTopicPublishedCounter(topic, COUNT); } @Test public void shouldReportDeliveredMessages() { - SortedMap counters = prepareCounters(METRIC_NAME_FOR_DELIVERED); - when(counter.getCount()).thenReturn(COUNT); + // given + metricsFacade.subscriptions().successes(subscription).increment(COUNT); - zookeeperCounterReporter.report(EMPTY_GAUGES, counters, EMPTY_HISTOGRAMS, EMPTY_METERS, EMPTY_TIMERS); + // when + zookeeperCounterReporter.report(); - verify(counterStorage).setSubscriptionDeliveredCounter(QUALIFIED_TOPIC_NAME, SUBSCRIPTION_NAME, COUNT); + // then + verify(counterStorage).setSubscriptionDeliveredCounter(topic, SUBSCRIPTION_NAME, COUNT); } @Test public void shouldReportDiscardedMessages() { - SortedMap counters = prepareCounters(METRIC_NAME_FOR_DISCARDED); - when(counter.getCount()).thenReturn(COUNT); + // given + metricsFacade.subscriptions().discarded(subscription).increment(COUNT); - zookeeperCounterReporter.report(EMPTY_GAUGES, counters, EMPTY_HISTOGRAMS, EMPTY_METERS, EMPTY_TIMERS); + // when + zookeeperCounterReporter.report(); - verify(counterStorage).setSubscriptionDiscardedCounter( - QUALIFIED_TOPIC_NAME, SUBSCRIPTION_NAME, COUNT - ); + // then + verify(counterStorage).setSubscriptionDiscardedCounter(topic, SUBSCRIPTION_NAME, COUNT); } @Test public void shouldReportSubscriptionVolumeCounter() { - SortedMap meters = new TreeMap<>(); - meters.put(METRIC_NAME_FOR_SUBSCRIPTION_THROUGHPUT, meter); - when(meter.getCount()).thenReturn(COUNT); + // given + metricsFacade.subscriptions().throughputInBytes(subscription).increment(COUNT); - zookeeperCounterReporter.report(EMPTY_GAUGES, EMPTY_COUNTERS, EMPTY_HISTOGRAMS, meters, EMPTY_TIMERS); + // when + zookeeperCounterReporter.report(); - verify(counterStorage).incrementVolumeCounter( - QUALIFIED_TOPIC_NAME, SUBSCRIPTION_NAME, COUNT - ); + // then + verify(counterStorage).incrementVolumeCounter(topic, SUBSCRIPTION_NAME, COUNT); } @Test public void shouldReportTopicVolumeCounter() { - SortedMap meters = new TreeMap<>(); - meters.put(METRIC_NAME_FOR_TOPIC_THRESHOLD, meter); - when(meter.getCount()).thenReturn(COUNT); + // given + metricsFacade.topics().topicThroughputBytes(topic).increment(COUNT); - zookeeperCounterReporter.report(EMPTY_GAUGES, EMPTY_COUNTERS, EMPTY_HISTOGRAMS, meters, EMPTY_TIMERS); + // when + zookeeperCounterReporter.report(); - verify(counterStorage).incrementVolumeCounter( - QUALIFIED_TOPIC_NAME, COUNT - ); + // then + verify(counterStorage).incrementVolumeCounter(topic, COUNT); } - - private SortedMap prepareCounters(String metricName) { - SortedMap counters = new TreeMap<>(); - counters.put(metricName, counter); - - return counters; - } - } \ No newline at end of file diff --git a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/config/CommonConfiguration.java b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/config/CommonConfiguration.java index 6fdd451dad..260e224196 100644 --- a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/config/CommonConfiguration.java +++ b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/config/CommonConfiguration.java @@ -251,10 +251,9 @@ public MetricsFacade metricsFacade(MeterRegistry meterRegistry, HermesMetrics he @Bean public MetricRegistry metricRegistry(MetricsProperties metricsProperties, GraphiteProperties graphiteProperties, - CounterStorage counterStorage, InstanceIdResolver instanceIdResolver, @Named("moduleName") String moduleName) { - return new MetricRegistryFactory(metricsProperties, graphiteProperties, counterStorage, instanceIdResolver, moduleName) + return new MetricRegistryFactory(metricsProperties, graphiteProperties, instanceIdResolver, moduleName) .provide(); } @@ -265,9 +264,10 @@ PrometheusConfig prometheusConfig(PrometheusProperties properties) { @Bean public PrometheusMeterRegistry micrometerRegistry(MicrometerRegistryParameters micrometerRegistryParameters, - PrometheusConfig prometheusConfig) { + PrometheusConfig prometheusConfig, + CounterStorage counterStorage) { return new PrometheusMeterRegistryFactory(micrometerRegistryParameters, - prometheusConfig, "hermes-consumers").provide(); + prometheusConfig, counterStorage, "hermes-consumers").provide(); } @Bean diff --git a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/config/MicrometerRegistryProperties.java b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/config/MicrometerRegistryProperties.java index cfb07ffc96..d5431b7de1 100644 --- a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/config/MicrometerRegistryProperties.java +++ b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/config/MicrometerRegistryProperties.java @@ -3,19 +3,48 @@ import org.springframework.boot.context.properties.ConfigurationProperties; import pl.allegro.tech.hermes.common.di.factories.MicrometerRegistryParameters; +import java.time.Duration; import java.util.List; @ConfigurationProperties(prefix = "consumer.metrics.micrometer") public class MicrometerRegistryProperties implements MicrometerRegistryParameters { private List percentiles = List.of(0.5, 0.99, 0.999); + private boolean zookeeperReporterEnabled = true; + private Duration reportPeriod = Duration.ofSeconds(20); @Override public List getPercentiles() { return percentiles; } + @Override + public boolean zookeeperReporterEnabled() { + return zookeeperReporterEnabled; + } + + @Override + public Duration zookeeperReportPeriod() { + return reportPeriod; + } + public void setPercentiles(List percentiles) { this.percentiles = percentiles; } + + public boolean isZookeeperReporterEnabled() { + return zookeeperReporterEnabled; + } + + public void setZookeeperReporterEnabled(boolean zookeeperReporterEnabled) { + this.zookeeperReporterEnabled = zookeeperReporterEnabled; + } + + public Duration getReportPeriod() { + return reportPeriod; + } + + public void setReportPeriod(Duration reportPeriod) { + this.reportPeriod = reportPeriod; + } } diff --git a/hermes-frontend/src/main/java/pl/allegro/tech/hermes/frontend/config/CommonConfiguration.java b/hermes-frontend/src/main/java/pl/allegro/tech/hermes/frontend/config/CommonConfiguration.java index c3da9b601b..6ee77fb667 100644 --- a/hermes-frontend/src/main/java/pl/allegro/tech/hermes/frontend/config/CommonConfiguration.java +++ b/hermes-frontend/src/main/java/pl/allegro/tech/hermes/frontend/config/CommonConfiguration.java @@ -301,10 +301,9 @@ public MetricsFacade micrometerHermesMetrics(MeterRegistry meterRegistry, Hermes @Bean public MetricRegistry metricRegistry(MetricRegistryProperties metricRegistryProperties, GraphiteProperties graphiteProperties, - CounterStorage counterStorage, InstanceIdResolver instanceIdResolver, @Named("moduleName") String moduleName) { - return new MetricRegistryFactory(metricRegistryProperties, graphiteProperties, counterStorage, + return new MetricRegistryFactory(metricRegistryProperties, graphiteProperties, instanceIdResolver, moduleName).provide(); } @@ -315,9 +314,10 @@ PrometheusConfig prometheusConfig(PrometheusProperties properties) { @Bean public PrometheusMeterRegistry micrometerRegistry(MicrometerRegistryParameters micrometerRegistryParameters, - PrometheusConfig prometheusConfig) { + PrometheusConfig prometheusConfig, + CounterStorage counterStorage) { return new PrometheusMeterRegistryFactory(micrometerRegistryParameters, - prometheusConfig, "hermes-frontend").provide(); + prometheusConfig, counterStorage, "hermes-frontend").provide(); } @Bean diff --git a/hermes-frontend/src/main/java/pl/allegro/tech/hermes/frontend/config/MicrometerRegistryProperties.java b/hermes-frontend/src/main/java/pl/allegro/tech/hermes/frontend/config/MicrometerRegistryProperties.java index 52358007d7..8a99c812bf 100644 --- a/hermes-frontend/src/main/java/pl/allegro/tech/hermes/frontend/config/MicrometerRegistryProperties.java +++ b/hermes-frontend/src/main/java/pl/allegro/tech/hermes/frontend/config/MicrometerRegistryProperties.java @@ -3,6 +3,7 @@ import org.springframework.boot.context.properties.ConfigurationProperties; import pl.allegro.tech.hermes.common.di.factories.MicrometerRegistryParameters; +import java.time.Duration; import java.util.List; @@ -10,13 +11,41 @@ public class MicrometerRegistryProperties implements MicrometerRegistryParameters { private List percentiles = List.of(0.5, 0.99, 0.999); + private boolean zookeeperReporterEnabled = true; + private Duration reportPeriod = Duration.ofSeconds(20); @Override public List getPercentiles() { return percentiles; } + @Override + public boolean zookeeperReporterEnabled() { + return zookeeperReporterEnabled; + } + + @Override + public Duration zookeeperReportPeriod() { + return reportPeriod; + } + public void setPercentiles(List percentiles) { this.percentiles = percentiles; } + + public boolean isZookeeperReporterEnabled() { + return zookeeperReporterEnabled; + } + + public void setZookeeperReporterEnabled(boolean zookeeperReporterEnabled) { + this.zookeeperReporterEnabled = zookeeperReporterEnabled; + } + + public Duration getReportPeriod() { + return reportPeriod; + } + + public void setReportPeriod(Duration reportPeriod) { + this.reportPeriod = reportPeriod; + } } diff --git a/hermes-management/build.gradle b/hermes-management/build.gradle index 55579b7110..eeed48aa5b 100644 --- a/hermes-management/build.gradle +++ b/hermes-management/build.gradle @@ -55,7 +55,10 @@ task buildHermesConsole(type: Exec) { task attachHermesConsole(type: Copy, dependsOn: 'buildHermesConsole') { from '../hermes-console/dist/static' - into(sourceSets.main.output.resourcesDir.path + '/static') + def staticDirectory = sourceSets.main.output.resourcesDir.path + '/static' + // remove previous static dir if exists and start with clear setup + delete staticDirectory + into(staticDirectory) } tasks.register('prepareIndexTemplate') { diff --git a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/config/ManagementConfiguration.java b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/config/ManagementConfiguration.java index 72b0b0334b..ea9c71775f 100644 --- a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/config/ManagementConfiguration.java +++ b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/config/ManagementConfiguration.java @@ -8,8 +8,6 @@ import com.fasterxml.jackson.databind.SerializationFeature; import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; import io.micrometer.core.instrument.MeterRegistry; -import io.micrometer.prometheus.PrometheusConfig; -import io.micrometer.prometheus.PrometheusMeterRegistry; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; import org.springframework.boot.context.properties.EnableConfigurationProperties; @@ -17,8 +15,6 @@ import org.springframework.context.annotation.Configuration; import pl.allegro.tech.hermes.api.Topic; import pl.allegro.tech.hermes.common.clock.ClockFactory; -import pl.allegro.tech.hermes.common.di.factories.MicrometerRegistryParameters; -import pl.allegro.tech.hermes.common.di.factories.PrometheusMeterRegistryFactory; import pl.allegro.tech.hermes.common.metric.HermesMetrics; import pl.allegro.tech.hermes.common.metric.MetricsFacade; import pl.allegro.tech.hermes.common.util.InetAddressInstanceIdResolver; @@ -66,20 +62,6 @@ public MetricRegistry metricRegistry() { return new MetricRegistry(); } - @Bean - @ConditionalOnMissingBean - public PrometheusMeterRegistry micrometerRegistry(MicrometerRegistryParameters micrometerRegistryParameters, - PrometheusConfig prometheusConfig) { - return new PrometheusMeterRegistryFactory(micrometerRegistryParameters, - prometheusConfig, "hermes-management").provide(); - } - - @Bean - @ConditionalOnMissingBean - PrometheusConfig prometheusConfig(PrometheusProperties properties) { - return new PrometheusConfigAdapter(properties); - } - @Bean public InstanceIdResolver instanceIdResolver() { return new InetAddressInstanceIdResolver(); diff --git a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/config/MicrometerRegistryProperties.java b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/config/MicrometerRegistryProperties.java index 668da99c8e..9f131b66c8 100644 --- a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/config/MicrometerRegistryProperties.java +++ b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/config/MicrometerRegistryProperties.java @@ -1,16 +1,14 @@ package pl.allegro.tech.hermes.management.config; import org.springframework.boot.context.properties.ConfigurationProperties; -import pl.allegro.tech.hermes.common.di.factories.MicrometerRegistryParameters; import java.util.List; @ConfigurationProperties(prefix = "metrics.micrometer") -public class MicrometerRegistryProperties implements MicrometerRegistryParameters { +public class MicrometerRegistryProperties { private List percentiles = List.of(0.5, 0.99, 0.999); - @Override public List getPercentiles() { return percentiles; } diff --git a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/config/PrometheusConfiguration.java b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/config/PrometheusConfiguration.java new file mode 100644 index 0000000000..b7bbb4b1a0 --- /dev/null +++ b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/config/PrometheusConfiguration.java @@ -0,0 +1,65 @@ +package pl.allegro.tech.hermes.management.config; + +import io.micrometer.core.instrument.Meter; +import io.micrometer.core.instrument.config.MeterFilter; +import io.micrometer.core.instrument.distribution.DistributionStatisticConfig; +import io.micrometer.prometheus.PrometheusConfig; +import io.micrometer.prometheus.PrometheusMeterRegistry; +import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; +import org.springframework.boot.context.properties.EnableConfigurationProperties; +import org.springframework.context.annotation.Bean; + +@EnableConfigurationProperties(MicrometerRegistryProperties.class) +public class PrometheusConfiguration { + + @Bean + @ConditionalOnMissingBean + public PrometheusMeterRegistry micrometerRegistry(MicrometerRegistryProperties properties, + PrometheusConfig prometheusConfig) { + return new PrometheusMeterRegistryFactory(properties, + prometheusConfig, "hermes-management").provide(); + } + + @Bean + @ConditionalOnMissingBean + PrometheusConfig prometheusConfig(PrometheusProperties properties) { + return new PrometheusConfigAdapter(properties); + } + + + static class PrometheusMeterRegistryFactory { + private final MicrometerRegistryProperties parameters; + private final PrometheusConfig prometheusConfig; + private final String prefix; + + PrometheusMeterRegistryFactory(MicrometerRegistryProperties properties, + PrometheusConfig prometheusConfig, + String prefix) { + this.parameters = properties; + this.prometheusConfig = prometheusConfig; + this.prefix = prefix + "_"; + } + + PrometheusMeterRegistry provide() { + PrometheusMeterRegistry meterRegistry = new PrometheusMeterRegistry(prometheusConfig); + applyFilters(meterRegistry); + return meterRegistry; + } + + void applyFilters(PrometheusMeterRegistry meterRegistry) { + meterRegistry.config().meterFilter(new MeterFilter() { + @Override + public Meter.Id map(Meter.Id id) { + return id.withName(prefix + id.getName()); + } + + @Override + public DistributionStatisticConfig configure(Meter.Id id, DistributionStatisticConfig config) { + return DistributionStatisticConfig.builder() + .percentiles(parameters.getPercentiles().stream().mapToDouble(Double::doubleValue).toArray() + ).build().merge(config); + } + }); + } + } +}