From 3b02b2b07ef2f5b28590d95a20f5d6273aa89e75 Mon Sep 17 00:00:00 2001
From: Maciej Moscicki
Date: Thu, 8 Aug 2024 11:40:06 +0200
Subject: [PATCH] Periodic storage consistency checks (#1885)

* add ability to perform periodic storage checks

* add periodic consistency check spec

* register gauge based on state object
---
 .../common/metric/ConsistencyMetrics.java     | 30 +++++++
 .../hermes/common/metric/MetricsFacade.java   |  6 ++
 .../config/ConsistencyCheckerProperties.java  | 32 ++++++++
 .../consistency/DcConsistencyService.java     | 48 +++++++++++-
 .../DcConsistencyServiceSpec.groovy           | 79 +++++++++++++++++--
 5 files changed, 187 insertions(+), 8 deletions(-)
 create mode 100644 hermes-common/src/main/java/pl/allegro/tech/hermes/common/metric/ConsistencyMetrics.java

diff --git a/hermes-common/src/main/java/pl/allegro/tech/hermes/common/metric/ConsistencyMetrics.java b/hermes-common/src/main/java/pl/allegro/tech/hermes/common/metric/ConsistencyMetrics.java
new file mode 100644
index 0000000000..4324ec0e68
--- /dev/null
+++ b/hermes-common/src/main/java/pl/allegro/tech/hermes/common/metric/ConsistencyMetrics.java
@@ -0,0 +1,30 @@
+package pl.allegro.tech.hermes.common.metric;
+
+import io.micrometer.core.instrument.MeterRegistry;
+
+import java.util.function.ToDoubleFunction;
+
+/**
+ * Registers metrics describing metadata storage consistency in the given {@link MeterRegistry}.
+ */
+public class ConsistencyMetrics {
+    private final MeterRegistry meterRegistry;
+
+    ConsistencyMetrics(MeterRegistry meterRegistry) {
+        this.meterRegistry = meterRegistry;
+    }
+
+    /**
+     * Registers the {@code storage.consistency} gauge backed by the given state object.
+     *
+     * <p>Micrometer references the state object weakly, so the caller must keep a strong
+     * reference to it for as long as the gauge should report values.
+     *
+     * @param stateObject object the gauge value is derived from
+     * @param valueFunction function converting {@code stateObject} into the gauge value
+     * @param <T> type of the state object
+     */
+    public <T> void registerStorageConsistencyGauge(T stateObject, ToDoubleFunction<T> valueFunction) {
+        meterRegistry.gauge("storage.consistency", stateObject, valueFunction);
+    }
+}
diff --git a/hermes-common/src/main/java/pl/allegro/tech/hermes/common/metric/MetricsFacade.java b/hermes-common/src/main/java/pl/allegro/tech/hermes/common/metric/MetricsFacade.java
index 23f489aaf8..9608763425 100644
--- a/hermes-common/src/main/java/pl/allegro/tech/hermes/common/metric/MetricsFacade.java
+++ b/hermes-common/src/main/java/pl/allegro/tech/hermes/common/metric/MetricsFacade.java
@@ -27,6 +27,7 @@ public class MetricsFacade {
     private final OffsetCommitsMetrics offsetCommitsMetrics;
     private final MaxRateMetrics maxRateMetrics;
     private final BrokerMetrics brokerMetrics;
+    private final ConsistencyMetrics consistencyMetrics;
 
     public MetricsFacade(MeterRegistry meterRegistry) {
         this.meterRegistry = meterRegistry;
@@ -45,6 +46,7 @@ public MetricsFacade(MeterRegistry meterRegistry) {
         this.offsetCommitsMetrics = new OffsetCommitsMetrics(meterRegistry);
         this.maxRateMetrics = new MaxRateMetrics(meterRegistry);
         this.brokerMetrics = new BrokerMetrics(meterRegistry);
+        this.consistencyMetrics = new ConsistencyMetrics(meterRegistry);
     }
 
     public TopicMetrics topics() {
@@ -107,6 +109,10 @@ public BrokerMetrics broker() {
         return brokerMetrics;
     }
 
+    public ConsistencyMetrics consistency() {
+        return consistencyMetrics;
+    }
+
     public void unregisterAllMetricsRelatedTo(SubscriptionName subscription) {
         Collection<Meter> meters = Search.in(meterRegistry)
                 .tags(subscriptionTags(subscription))
diff --git a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/config/ConsistencyCheckerProperties.java b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/config/ConsistencyCheckerProperties.java
index 382e2c2ca5..fba1977eb3 100644
--- a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/config/ConsistencyCheckerProperties.java
+++ b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/config/ConsistencyCheckerProperties.java
@@ -2,10 +2,18 @@
 
 import org.springframework.boot.context.properties.ConfigurationProperties;
 
+import java.time.Duration;
+
 @ConfigurationProperties(prefix = "consistency-checker")
 public class ConsistencyCheckerProperties {
 
     private int threadPoolSize = 2;
+    // Enables the scheduled background consistency check and its gauge.
+    private boolean periodicCheckEnabled = false;
+    // Period between consecutive background consistency checks.
+    private Duration refreshInterval = Duration.ofMinutes(15);
+    // Delay before the first background consistency check.
+    private Duration initialRefreshDelay = Duration.ofMinutes(2);
 
     public int getThreadPoolSize() {
         return threadPoolSize;
@@ -14,4 +22,28 @@ public int getThreadPoolSize() {
     public void setThreadPoolSize(int threadPoolSize) {
         this.threadPoolSize = threadPoolSize;
     }
+
+    public boolean isPeriodicCheckEnabled() {
+        return periodicCheckEnabled;
+    }
+
+    public void setPeriodicCheckEnabled(boolean periodicCheckEnabled) {
+        this.periodicCheckEnabled = periodicCheckEnabled;
+    }
+
+    public Duration getRefreshInterval() {
+        return refreshInterval;
+    }
+
+    public void setRefreshInterval(Duration refreshInterval) {
+        this.refreshInterval = refreshInterval;
+    }
+
+    public Duration getInitialRefreshDelay() {
+        return initialRefreshDelay;
+    }
+
+    public void setInitialRefreshDelay(Duration initialRefreshDelay) {
+        this.initialRefreshDelay = initialRefreshDelay;
+    }
 }
diff --git a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/domain/consistency/DcConsistencyService.java b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/domain/consistency/DcConsistencyService.java
index 2b23998d36..9218362f0c 100644
--- a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/domain/consistency/DcConsistencyService.java
+++ b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/domain/consistency/DcConsistencyService.java
@@ -4,6 +4,8 @@
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import jakarta.annotation.PreDestroy;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.springframework.stereotype.Component;
 import pl.allegro.tech.hermes.api.Group;
 import pl.allegro.tech.hermes.api.InconsistentGroup;
@@ -13,6 +15,7 @@
 import pl.allegro.tech.hermes.api.Subscription;
 import pl.allegro.tech.hermes.api.Topic;
 import pl.allegro.tech.hermes.api.TopicName;
+import pl.allegro.tech.hermes.common.metric.MetricsFacade;
 import pl.allegro.tech.hermes.domain.group.GroupNotExistsException;
 import pl.allegro.tech.hermes.domain.group.GroupRepository;
 import pl.allegro.tech.hermes.domain.subscription.SubscriptionRepository;
@@ -31,6 +34,9 @@
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.Function;
 
 import static java.util.Collections.emptyList;
@@ -39,15 +45,20 @@
 
 @Component
 public class DcConsistencyService {
+    private static final Logger logger = LoggerFactory.getLogger(DcConsistencyService.class);
+
     private final ExecutorService executor;
+    private final ScheduledExecutorService scheduler;
     private final List<DatacenterBoundRepositoryHolder<GroupRepository>> groupRepositories;
     private final List<DatacenterBoundRepositoryHolder<TopicRepository>> topicRepositories;
     private final List<DatacenterBoundRepositoryHolder<SubscriptionRepository>> subscriptionRepositories;
     private final ObjectMapper objectMapper;
+    private final AtomicBoolean isStorageConsistent = new AtomicBoolean(true);
 
     public DcConsistencyService(RepositoryManager repositoryManager,
                                 ObjectMapper objectMapper,
-                                ConsistencyCheckerProperties properties) {
+                                ConsistencyCheckerProperties properties,
+                                MetricsFacade metricsFacade) {
         this.groupRepositories = repositoryManager.getRepositories(GroupRepository.class);
         this.topicRepositories = repositoryManager.getRepositories(TopicRepository.class);
         this.subscriptionRepositories = repositoryManager.getRepositories(SubscriptionRepository.class);
@@ -58,11 +69,46 @@ public DcConsistencyService(RepositoryManager repositoryManager,
                         .setNameFormat("consistency-checker-%d")
                         .build()
         );
+        this.scheduler = Executors.newSingleThreadScheduledExecutor(
+                new ThreadFactoryBuilder()
+                        .setNameFormat("consistency-checker-scheduler-%d")
+                        .build()
+        );
+        if (properties.isPeriodicCheckEnabled()) {
+            // Millisecond precision: Duration.getSeconds() truncates sub-second values,
+            // and a period of 0 makes scheduleAtFixedRate throw IllegalArgumentException.
+            scheduler.scheduleAtFixedRate(this::reportConsistency,
+                    properties.getInitialRefreshDelay().toMillis(),
+                    properties.getRefreshInterval().toMillis(),
+                    TimeUnit.MILLISECONDS);
+            metricsFacade.consistency().registerStorageConsistencyGauge(isStorageConsistent, isConsistent -> isConsistent.get() ? 1 : 0);
+        }
     }
 
     @PreDestroy
     public void stop() {
         executor.shutdown();
+        scheduler.shutdown();
+    }
+
+    /**
+     * Runs a full consistency check and stores the outcome for the {@code storage.consistency} gauge.
+     *
+     * <p>Failures are logged instead of propagated: an exception escaping a task scheduled with
+     * {@code scheduleAtFixedRate} suppresses all subsequent executions, which would silently
+     * freeze the gauge at its last value. On failure the previously reported value is kept.
+     */
+    private void reportConsistency() {
+        try {
+            long start = System.currentTimeMillis();
+            Set<String> groups = listAllGroupNames();
+            List<InconsistentGroup> inconsistentGroups = listInconsistentGroups(groups);
+            long durationSeconds = (System.currentTimeMillis() - start) / 1000;
+            logger.info("Consistency check finished in {}s, number of inconsistent groups: {}", durationSeconds, inconsistentGroups.size());
+            isStorageConsistent.set(inconsistentGroups.isEmpty());
+        } catch (Exception e) {
+            logger.error("Periodic consistency check failed", e);
+        }
     }
 
     public List<InconsistentGroup> listInconsistentGroups(Set<String> groupNames) {
diff --git a/hermes-management/src/test/groovy/pl/allegro/tech/hermes/management/domain/consistency/DcConsistencyServiceSpec.groovy b/hermes-management/src/test/groovy/pl/allegro/tech/hermes/management/domain/consistency/DcConsistencyServiceSpec.groovy
index 9fb2d7adad..f451d8c92f 100644
--- a/hermes-management/src/test/groovy/pl/allegro/tech/hermes/management/domain/consistency/DcConsistencyServiceSpec.groovy
+++ b/hermes-management/src/test/groovy/pl/allegro/tech/hermes/management/domain/consistency/DcConsistencyServiceSpec.groovy
@@ -2,11 +2,16 @@ package pl.allegro.tech.hermes.management.domain.consistency
 
 import com.fasterxml.jackson.databind.ObjectMapper
 import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule
+import io.micrometer.core.instrument.simple.SimpleMeterRegistry
 import pl.allegro.tech.hermes.api.Group
 import pl.allegro.tech.hermes.api.Subscription
 import pl.allegro.tech.hermes.api.Topic
+import pl.allegro.tech.hermes.common.metric.MetricsFacade
 import pl.allegro.tech.hermes.management.config.ConsistencyCheckerProperties
 import spock.lang.Specification
+import spock.util.concurrent.PollingConditions
+
+import java.time.Duration
 
 import static pl.allegro.tech.hermes.test.helper.builder.GroupBuilder.group
 import static pl.allegro.tech.hermes.test.helper.builder.SubscriptionBuilder.subscription
@@ -15,6 +20,8 @@ import static pl.allegro.tech.hermes.test.helper.builder.TopicBuilder.topic
 class DcConsistencyServiceSpec extends Specification {
 
     def objectMapper = new ObjectMapper().registerModule(new JavaTimeModule())
+    def meterRegistry = new SimpleMeterRegistry()
+    def metricsFacade = new MetricsFacade(meterRegistry)
 
     def "should return empty list when given groups are consistent"() {
         given:
@@ -30,8 +37,8 @@ class DcConsistencyServiceSpec extends Specification {
                 .addGroup(group)
                 .addTopic(topic)
                 .addSubscription(subscription)
-        DcConsistencyService dcConsistencyService = new DcConsistencyService(repositoryManager, objectMapper,
-                new ConsistencyCheckerProperties())
+        DcConsistencyService dcConsistencyService = new DcConsistencyService(repositoryManager,
+                objectMapper, new ConsistencyCheckerProperties(), metricsFacade)
 
         when:
         def inconsistentGroups = dcConsistencyService.listInconsistentGroups([group.groupName] as Set)
@@ -48,8 +55,8 @@ class DcConsistencyServiceSpec extends Specification {
         repositoryManager.datacenter("dc2")
                 .addGroup(group("testGroup").build())
                 .addGroup(group("testGroup-dc2").build())
-        DcConsistencyService consistencyService = new DcConsistencyService(repositoryManager, objectMapper,
-                new ConsistencyCheckerProperties())
+        DcConsistencyService consistencyService = new DcConsistencyService(repositoryManager,
+                objectMapper, new ConsistencyCheckerProperties(), metricsFacade)
 
         when:
         def groups = consistencyService.listInconsistentGroups(["testGroup", "testGroup-dc1", "testGroup-dc2"] as Set)
@@ -68,7 +75,7 @@ class DcConsistencyServiceSpec extends Specification {
                 .addGroup(group)
                 .addTopic(topic(group.groupName, "testTopic").withDescription("dc2").build())
         DcConsistencyService consistencyService = new DcConsistencyService(repositoryManager, objectMapper,
-                new ConsistencyCheckerProperties())
+                new ConsistencyCheckerProperties(), metricsFacade)
 
         when:
         def groups = consistencyService.listInconsistentGroups(["testGroup"] as Set)
@@ -90,7 +97,7 @@ class DcConsistencyServiceSpec extends Specification {
                 .addTopic(topic)
                 .addSubscription(subscription(topic, "testSubscription").withDescription("dc2").build())
         DcConsistencyService consistencyService = new DcConsistencyService(repositoryManager, objectMapper,
-                new ConsistencyCheckerProperties())
+                new ConsistencyCheckerProperties(), metricsFacade)
 
         when:
         def groups = consistencyService.listInconsistentGroups(["testGroup"] as Set)
@@ -108,7 +115,7 @@ class DcConsistencyServiceSpec extends Specification {
                 .addGroup(group("testGroup").build())
                 .addGroup(group("testGroup-dc2").build())
         DcConsistencyService consistencyService = new DcConsistencyService(repositoryManager, objectMapper,
-                new ConsistencyCheckerProperties())
+                new ConsistencyCheckerProperties(), metricsFacade)
 
         when:
         def groups = consistencyService.listAllGroupNames()
@@ -116,4 +123,62 @@ class DcConsistencyServiceSpec extends Specification {
         then:
         groups == ["testGroup", "testGroup-dc1", "testGroup-dc2"] as Set
     }
+
+    def "should report storage as not consistent with periodic check"() {
+        given: "inconsistent storage state"
+        MockRepositoryManager repositoryManager = new MockRepositoryManager()
+        repositoryManager.datacenter("dc1")
+                .addGroup(group("testGroup").build())
+                .addGroup(group("testGroup-dc1").build())
+        repositoryManager.datacenter("dc2")
+                .addGroup(group("testGroup").build())
+                .addGroup(group("testGroup-dc2").build())
+
+        and: "enabled periodic consistency checks"
+        def properties = new ConsistencyCheckerProperties()
+        properties.setPeriodicCheckEnabled(true)
+        properties.setInitialRefreshDelay(Duration.ofMillis(0))
+
+        when: "consistency service is created"
+        DcConsistencyService consistencyService = new DcConsistencyService(repositoryManager,
+                objectMapper,
+                properties,
+                metricsFacade)
+
+        then: "storage is reported as not consistent"
+        new PollingConditions(timeout: 10).eventually {
+            meterRegistry.get("storage.consistency").gauge().value() == 0.0d
+        }
+
+        cleanup: "stop the scheduler so its thread does not outlive the test"
+        consistencyService?.stop()
+    }
+
+    def "should report storage as consistent with periodic check"() {
+        given: "consistent storage state"
+        MockRepositoryManager repositoryManager = new MockRepositoryManager()
+        repositoryManager.datacenter("dc1")
+                .addGroup(group("testGroup").build())
+        repositoryManager.datacenter("dc2")
+                .addGroup(group("testGroup").build())
+
+        and: "enabled periodic consistency checks"
+        def properties = new ConsistencyCheckerProperties()
+        properties.setPeriodicCheckEnabled(true)
+        properties.setInitialRefreshDelay(Duration.ofMillis(0))
+
+        when: "consistency service is created"
+        DcConsistencyService consistencyService = new DcConsistencyService(repositoryManager,
+                objectMapper,
+                properties,
+                metricsFacade)
+
+        then: "storage is reported as consistent"
+        new PollingConditions(timeout: 10).eventually {
+            meterRegistry.get("storage.consistency").gauge().value() == 1.0d
+        }
+
+        cleanup: "stop the scheduler so its thread does not outlive the test"
+        consistencyService?.stop()
+    }
 }