From f11cc0f9fbdb7068c26f350c0955ae596e96b48a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20R=C5=BCysko?= Date: Mon, 3 Apr 2023 14:35:59 +0200 Subject: [PATCH] Readiness admin panel resilient to data center unavailability (#1658) --- .../tech/hermes/api/DatacenterReadiness.java | 24 +++-- .../hermes/api/endpoints/ModeEndpoint.java | 3 +- .../zookeeper/ZookeeperBasedRepository.java | 79 ++++++++++------ .../ZookeeperCredentialsRepository.java | 7 +- ...ookeeperDatacenterReadinessRepository.java | 84 ----------------- .../zookeeper/ZookeeperGroupRepository.java | 25 +++--- .../ZookeeperOAuthProviderRepository.java | 16 ++-- .../ZookeeperSubscriptionRepository.java | 15 +++- .../zookeeper/ZookeeperTopicRepository.java | 24 +++-- ...ookeeperWorkloadConstraintsRepository.java | 34 ++----- .../counter/ZookeeperCounterException.java | 4 + .../console/readiness/ReadinessController.js | 4 +- .../console/readiness/ReadinessRepository.js | 5 +- hermes-console/static/partials/readiness.html | 7 +- .../config/FrontendConfiguration.java | 10 --- .../config/FrontendServerConfiguration.java | 16 +++- .../config/ReadinessCheckProperties.java | 10 +++ .../server/DefaultReadinessChecker.java | 74 ++++++++++++--- .../management/api/ReadinessEndpoint.java | 4 +- .../config/storage/StorageConfiguration.java | 17 +--- .../domain/health/HealthCheckScheduler.java | 48 +++++++--- .../management/domain/mode/ModeService.java | 2 +- .../domain/readiness/GetReadinessQuery.java | 18 +++- .../domain/readiness/ReadinessRepository.java | 5 +- .../domain/readiness/SetReadinessCommand.java | 15 +++- .../metrics/SummedSharedCounter.java | 89 ++++++++++++++----- ...ookeeperDatacenterReadinessRepository.java | 45 ++++++++++ ...keeperOfflineRetransmissionRepository.java | 8 +- .../zookeeper/ZookeeperClientManager.java | 7 +- .../zookeeper/ZookeeperRepositoryManager.java | 4 +- .../domain/health/HealthCheckTaskTest.groovy | 1 + .../metrics/SummedSharedCounterTest.groovy | 3 +- .../ZookeeperClientManagerTest.groovy | 1 + .../MultiZookeeperIntegrationTest.groovy | 21 +++-- .../helper/endpoint/HermesAPIOperations.java | 4 + .../FrontendConfigurationProperties.java | 1 + .../integration/ReadinessCheckTest.java | 26 ++++++ .../env/HermesIntegrationEnvironment.java | 14 ++- .../management/GroupManagementTest.java | 4 +- .../setup/HermesFrontendInstance.java | 11 +++ .../setup/HermesManagementInstance.java | 31 ++----- 41 files changed, 499 insertions(+), 321 deletions(-) delete mode 100644 hermes-common/src/main/java/pl/allegro/tech/hermes/infrastructure/zookeeper/ZookeeperDatacenterReadinessRepository.java rename {hermes-common/src/main/java/pl/allegro/tech/hermes => hermes-management/src/main/java/pl/allegro/tech/hermes/management}/domain/readiness/ReadinessRepository.java (61%) create mode 100644 hermes-management/src/main/java/pl/allegro/tech/hermes/management/infrastructure/readiness/ZookeeperDatacenterReadinessRepository.java diff --git a/hermes-api/src/main/java/pl/allegro/tech/hermes/api/DatacenterReadiness.java b/hermes-api/src/main/java/pl/allegro/tech/hermes/api/DatacenterReadiness.java index f51d7c102c..024c9d32b1 100644 --- a/hermes-api/src/main/java/pl/allegro/tech/hermes/api/DatacenterReadiness.java +++ b/hermes-api/src/main/java/pl/allegro/tech/hermes/api/DatacenterReadiness.java @@ -7,28 +7,28 @@ public class DatacenterReadiness { private final String datacenter; - private final boolean isReady; + private final ReadinessStatus status; @JsonCreator - public DatacenterReadiness(@JsonProperty("datacenter") String datacenter, @JsonProperty("isReady") boolean isReady) { + public DatacenterReadiness(@JsonProperty("datacenter") String datacenter, + @JsonProperty("status") ReadinessStatus status) { this.datacenter = datacenter; - this.isReady = isReady; + this.status = status; } public String getDatacenter() { return datacenter; } - @JsonProperty("isReady") - public boolean isReady() { - return isReady; + public ReadinessStatus getStatus() { + return status; } @Override public String toString() { return "DatacenterReadiness{" + "datacenter='" + datacenter + '\'' - + ", isReady=" + isReady + + ", status=" + status + '}'; } @@ -41,12 +41,18 @@ public boolean equals(Object o) { return false; } DatacenterReadiness that = (DatacenterReadiness) o; - return isReady == that.isReady + return status == that.status && Objects.equals(datacenter, that.datacenter); } @Override public int hashCode() { - return Objects.hash(datacenter, isReady); + return Objects.hash(datacenter, status); + } + + public enum ReadinessStatus { + READY, + NOT_READY, + UNDEFINED } } diff --git a/hermes-api/src/main/java/pl/allegro/tech/hermes/api/endpoints/ModeEndpoint.java b/hermes-api/src/main/java/pl/allegro/tech/hermes/api/endpoints/ModeEndpoint.java index 42d46ec7fb..4f1a64a45b 100644 --- a/hermes-api/src/main/java/pl/allegro/tech/hermes/api/endpoints/ModeEndpoint.java +++ b/hermes-api/src/main/java/pl/allegro/tech/hermes/api/endpoints/ModeEndpoint.java @@ -8,12 +8,13 @@ import javax.ws.rs.core.Response; import static javax.ws.rs.core.MediaType.APPLICATION_JSON; +import static javax.ws.rs.core.MediaType.TEXT_PLAIN; @Path("mode") public interface ModeEndpoint { @GET - @Produces(APPLICATION_JSON) + @Produces(TEXT_PLAIN) String getMode(); @POST diff --git a/hermes-common/src/main/java/pl/allegro/tech/hermes/infrastructure/zookeeper/ZookeeperBasedRepository.java b/hermes-common/src/main/java/pl/allegro/tech/hermes/infrastructure/zookeeper/ZookeeperBasedRepository.java index 5f1558c485..b688aa314d 100644 --- a/hermes-common/src/main/java/pl/allegro/tech/hermes/infrastructure/zookeeper/ZookeeperBasedRepository.java +++ b/hermes-common/src/main/java/pl/allegro/tech/hermes/infrastructure/zookeeper/ZookeeperBasedRepository.java @@ -23,9 +23,9 @@ public abstract class ZookeeperBasedRepository { private static final Logger logger = LoggerFactory.getLogger(ZookeeperBasedRepository.class); - protected final CuratorFramework zookeeper; + private final CuratorFramework zookeeper; - protected final ObjectMapper mapper; + private final ObjectMapper mapper; protected final ZookeeperPaths paths; @@ -37,7 +37,7 @@ protected ZookeeperBasedRepository(CuratorFramework zookeeper, this.paths = paths; } - protected void ensureConnected() { + private void ensureConnected() { if (!zookeeper.getZookeeperClient().isConnected()) { throw new RepositoryNotAvailableException("Could not establish connection to a Zookeeper instance"); } @@ -65,6 +65,7 @@ protected boolean pathExists(String path) { } protected List childrenOf(String path) { + ensureConnected(); try { List retrievedNodes = new ArrayList<>(zookeeper.getChildren().forPath(path)); Collections.sort(retrievedNodes); @@ -74,6 +75,11 @@ protected List childrenOf(String path) { } } + @SuppressWarnings("unchecked") + protected byte[] readFrom(String path) { + return readWithStatFrom(path, bytes -> bytes, (t, stat) -> {}, false).get(); + } + @SuppressWarnings("unchecked") protected T readFrom(String path, Class clazz) { return readFrom(path, clazz, false).get(); @@ -94,6 +100,7 @@ protected Optional readWithStatFrom(String path, Class clazz, BiConsum } private Optional readWithStatFrom(String path, ThrowingReader supplier, BiConsumer statDecorator, boolean quiet) { + ensureConnected(); try { Stat stat = new Stat(); byte[] data = zookeeper.getData().storingStatIn(stat).forPath(path); @@ -122,38 +129,52 @@ private void logWarnOrThrowException(String message, RuntimeException e, Boolean } } - protected void overwrite(String path, Object value) { - try { - zookeeper.setData().forPath(path, mapper.writeValueAsBytes(value)); - } catch (Exception ex) { - throw new InternalProcessingException(ex); - } + protected void overwrite(String path, Object value) throws Exception { + ensureConnected(); + zookeeper.setData().forPath(path, mapper.writeValueAsBytes(value)); } - protected void touch(String path) { - try { - byte[] oldData = zookeeper.getData().forPath(path); - zookeeper.setData().forPath(path, oldData); - } catch (Exception ex) { - throw new InternalProcessingException(ex); - } + protected void overwrite(String path, byte[] value) throws Exception { + ensureConnected(); + zookeeper.setData().forPath(path, value); } - protected void remove(String path) { - try { - zookeeper.delete().guaranteed().deletingChildrenIfNeeded().forPath(path); - } catch (Exception ex) { - throw new InternalProcessingException(ex); - } + protected void createRecursively(String path, Object value) throws Exception { + ensureConnected(); + zookeeper.create() + .creatingParentsIfNeeded() + .forPath(path, mapper.writeValueAsBytes(value)); } - protected boolean isEmpty(String path) { - try { - byte[] data = zookeeper.getData().forPath(path); - return data.length == 0; - } catch (Exception e) { - throw new InternalProcessingException(e); - } + protected void createInTransaction(String path, Object value, String childPath) throws Exception { + ensureConnected(); + zookeeper.inTransaction() + .create().forPath(path, mapper.writeValueAsBytes(value)) + .and() + .create().forPath(childPath) + .and() + .commit(); + } + + protected void create(String path, Object value) throws Exception { + ensureConnected(); + zookeeper.create().forPath(path, mapper.writeValueAsBytes(value)); + } + + protected void create(String path, byte[] value) throws Exception { + ensureConnected(); + zookeeper.create().forPath(path, value); + } + + protected void touch(String path) throws Exception { + ensureConnected(); + byte[] oldData = zookeeper.getData().forPath(path); + zookeeper.setData().forPath(path, oldData); + } + + protected void remove(String path) throws Exception { + ensureConnected(); + zookeeper.delete().guaranteed().deletingChildrenIfNeeded().forPath(path); } private interface ThrowingReader { diff --git a/hermes-common/src/main/java/pl/allegro/tech/hermes/infrastructure/zookeeper/ZookeeperCredentialsRepository.java b/hermes-common/src/main/java/pl/allegro/tech/hermes/infrastructure/zookeeper/ZookeeperCredentialsRepository.java index a174470bf9..bf29c41a08 100644 --- a/hermes-common/src/main/java/pl/allegro/tech/hermes/infrastructure/zookeeper/ZookeeperCredentialsRepository.java +++ b/hermes-common/src/main/java/pl/allegro/tech/hermes/infrastructure/zookeeper/ZookeeperCredentialsRepository.java @@ -2,6 +2,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.curator.framework.CuratorFramework; +import pl.allegro.tech.hermes.common.exception.InternalProcessingException; import pl.allegro.tech.hermes.domain.CredentialsRepository; import pl.allegro.tech.hermes.domain.NodePassword; @@ -18,6 +19,10 @@ public NodePassword readAdminPassword() { @Override public void overwriteAdminPassword(String password) { - overwrite(paths.groupsPath(), new NodePassword(password)); + try { + overwrite(paths.groupsPath(), new NodePassword(password)); + } catch (Exception e) { + throw new InternalProcessingException(e); + } } } diff --git a/hermes-common/src/main/java/pl/allegro/tech/hermes/infrastructure/zookeeper/ZookeeperDatacenterReadinessRepository.java b/hermes-common/src/main/java/pl/allegro/tech/hermes/infrastructure/zookeeper/ZookeeperDatacenterReadinessRepository.java deleted file mode 100644 index 04f0e67378..0000000000 --- a/hermes-common/src/main/java/pl/allegro/tech/hermes/infrastructure/zookeeper/ZookeeperDatacenterReadinessRepository.java +++ /dev/null @@ -1,84 +0,0 @@ -package pl.allegro.tech.hermes.infrastructure.zookeeper; - -import com.fasterxml.jackson.databind.ObjectMapper; -import org.apache.curator.framework.CuratorFramework; -import org.apache.curator.framework.recipes.cache.ChildData; -import org.apache.curator.framework.recipes.cache.NodeCache; -import org.apache.curator.framework.recipes.cache.NodeCacheListener; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import pl.allegro.tech.hermes.api.Readiness; -import pl.allegro.tech.hermes.common.exception.InternalProcessingException; -import pl.allegro.tech.hermes.domain.readiness.ReadinessRepository; - -public class ZookeeperDatacenterReadinessRepository extends ZookeeperBasedRepository implements ReadinessRepository, NodeCacheListener { - private static final Logger logger = LoggerFactory.getLogger(ZookeeperDatacenterReadinessRepository.class); - - private final NodeCache cache; - private final ObjectMapper mapper; - - private volatile boolean ready = true; - - public ZookeeperDatacenterReadinessRepository(CuratorFramework curator, ObjectMapper mapper, ZookeeperPaths paths) { - super(curator, mapper, paths); - this.mapper = mapper; - this.cache = new NodeCache(curator, paths.frontendReadinessPath()); - cache.getListenable().addListener(this); - try { - cache.start(true); - } catch (Exception e) { - throw new InternalProcessingException("Readiness cache cannot start.", e); - } - refreshReadiness(); - } - - @Override - public boolean isReady() { - return ready; - } - - @Override - public void setReadiness(boolean isReady) { - try { - String path = paths.frontendReadinessPath(); - if (!pathExists(path)) { - zookeeper.create() - .creatingParentsIfNeeded() - .forPath(path, mapper.writeValueAsBytes(new Readiness(isReady))); - } else { - zookeeper.setData().forPath(path, mapper.writeValueAsBytes(new Readiness(isReady))); - } - } catch (Exception ex) { - throw new InternalProcessingException(ex); - } - } - - @Override - public void close() { - try { - cache.close(); - } catch (Exception e) { - logger.warn("Failed to stop readiness cache", e); - } - } - - @Override - public void nodeChanged() { - refreshReadiness(); - } - - private void refreshReadiness() { - try { - ChildData nodeData = cache.getCurrentData(); - if (nodeData != null) { - byte[] data = nodeData.getData(); - Readiness readiness = mapper.readValue(data, Readiness.class); - ready = readiness.isReady(); - } else { - ready = true; - } - } catch (Exception e) { - logger.error("Failed reloading readiness cache. Current value: ready=" + ready, e); - } - } -} diff --git a/hermes-common/src/main/java/pl/allegro/tech/hermes/infrastructure/zookeeper/ZookeeperGroupRepository.java b/hermes-common/src/main/java/pl/allegro/tech/hermes/infrastructure/zookeeper/ZookeeperGroupRepository.java index 5919c71d5d..6a3e7a3e9f 100644 --- a/hermes-common/src/main/java/pl/allegro/tech/hermes/infrastructure/zookeeper/ZookeeperGroupRepository.java +++ b/hermes-common/src/main/java/pl/allegro/tech/hermes/infrastructure/zookeeper/ZookeeperGroupRepository.java @@ -28,7 +28,6 @@ public ZookeeperGroupRepository(CuratorFramework zookeeper, @Override public boolean groupExists(String groupName) { - ensureConnected(); return pathExists(paths.groupPath(groupName)); } @@ -41,17 +40,11 @@ public void ensureGroupExists(String groupName) { @Override public void createGroup(Group group) { - ensureConnected(); - String groupPath = paths.groupPath(group.getGroupName()); logger.info("Creating group {} for path {}", group.getGroupName(), groupPath); try { - zookeeper.inTransaction() - .create().forPath(groupPath, mapper.writeValueAsBytes(group)) - .and() - .create().forPath(paths.topicsPath(group.getGroupName())) - .and().commit(); + createInTransaction(groupPath, group, paths.topicsPath(group.getGroupName())); } catch (KeeperException.NodeExistsException ex) { throw new GroupAlreadyExistsException(group.getGroupName(), ex); } catch (Exception ex) { @@ -61,21 +54,27 @@ public void createGroup(Group group) { @Override public void updateGroup(Group group) { - ensureConnected(); ensureGroupExists(group.getGroupName()); logger.info("Updating group {}", group.getGroupName()); - overwrite(paths.groupPath(group.getGroupName()), group); + try { + overwrite(paths.groupPath(group.getGroupName()), group); + } catch (Exception e) { + throw new InternalProcessingException(e); + } } @Override public void removeGroup(String groupName) { - ensureConnected(); ensureGroupExists(groupName); ensureGroupIsEmpty(groupName); logger.info("Removing group: {}", groupName); - remove(paths.groupPath(groupName)); + try { + remove(paths.groupPath(groupName)); + } catch (Exception e) { + throw new InternalProcessingException(e); + } } private void ensureGroupIsEmpty(String groupName) { @@ -86,7 +85,6 @@ private void ensureGroupIsEmpty(String groupName) { @Override public List listGroupNames() { - ensureConnected(); return childrenOf(paths.groupsPath()); } @@ -105,7 +103,6 @@ public Group getGroupDetails(String groupName) { } private Optional getGroupDetails(String groupName, boolean quiet) { - ensureConnected(); ensureGroupExists(groupName); String path = paths.groupPath(groupName); diff --git a/hermes-common/src/main/java/pl/allegro/tech/hermes/infrastructure/zookeeper/ZookeeperOAuthProviderRepository.java b/hermes-common/src/main/java/pl/allegro/tech/hermes/infrastructure/zookeeper/ZookeeperOAuthProviderRepository.java index 217e5917d8..4673888b4e 100644 --- a/hermes-common/src/main/java/pl/allegro/tech/hermes/infrastructure/zookeeper/ZookeeperOAuthProviderRepository.java +++ b/hermes-common/src/main/java/pl/allegro/tech/hermes/infrastructure/zookeeper/ZookeeperOAuthProviderRepository.java @@ -45,13 +45,11 @@ public OAuthProvider getOAuthProviderDetails(String oAuthProviderName) { @Override public void createOAuthProvider(OAuthProvider oAuthProvider) { - ensureConnected(); - String oAuthProviderPath = paths.oAuthProviderPath(oAuthProvider.getName()); logger.info("Creating OAuthProvider for path {}", oAuthProviderPath); try { - zookeeper.create().creatingParentsIfNeeded().forPath(oAuthProviderPath, mapper.writeValueAsBytes(oAuthProvider)); + createRecursively(oAuthProviderPath, oAuthProvider); } catch (KeeperException.NodeExistsException ex) { throw new OAuthProviderAlreadyExistsException(oAuthProvider, ex); } catch (Exception ex) { @@ -64,7 +62,11 @@ public void updateOAuthProvider(OAuthProvider oAuthprovider) { ensureOAuthProviderExists(oAuthprovider.getName()); logger.info("Updating OAuthProvider {}", oAuthprovider.getName()); - overwrite(paths.oAuthProviderPath(oAuthprovider.getName()), oAuthprovider); + try { + overwrite(paths.oAuthProviderPath(oAuthprovider.getName()), oAuthprovider); + } catch (Exception e) { + throw new InternalProcessingException(e); + } } @Override @@ -72,7 +74,11 @@ public void removeOAuthProvider(String oAuthProviderName) { ensureOAuthProviderExists(oAuthProviderName); logger.info("Removing OAuthProvider {}", oAuthProviderName); - remove(paths.oAuthProviderPath(oAuthProviderName)); + try { + remove(paths.oAuthProviderPath(oAuthProviderName)); + } catch (Exception e) { + throw new InternalProcessingException(e); + } } @Override diff --git a/hermes-common/src/main/java/pl/allegro/tech/hermes/infrastructure/zookeeper/ZookeeperSubscriptionRepository.java b/hermes-common/src/main/java/pl/allegro/tech/hermes/infrastructure/zookeeper/ZookeeperSubscriptionRepository.java index 810fa0d7b9..de72b9fba2 100644 --- a/hermes-common/src/main/java/pl/allegro/tech/hermes/infrastructure/zookeeper/ZookeeperSubscriptionRepository.java +++ b/hermes-common/src/main/java/pl/allegro/tech/hermes/infrastructure/zookeeper/ZookeeperSubscriptionRepository.java @@ -47,14 +47,13 @@ public void ensureSubscriptionExists(TopicName topicName, String subscriptionNam @Override public void createSubscription(Subscription subscription) { - ensureConnected(); topicRepository.ensureTopicExists(subscription.getTopicName()); String subscriptionPath = paths.subscriptionPath(subscription); logger.info("Creating subscription {}", subscription.getQualifiedName()); try { - zookeeper.create().forPath(subscriptionPath, mapper.writeValueAsBytes(subscription)); + create(subscriptionPath, subscription); } catch (KeeperException.NodeExistsException ex) { throw new SubscriptionAlreadyExistsException(subscription, ex); } catch (Exception ex) { @@ -67,14 +66,22 @@ public void removeSubscription(TopicName topicName, String subscriptionName) { ensureSubscriptionExists(topicName, subscriptionName); logger.info("Removing subscription {}", new SubscriptionName(subscriptionName, topicName).getQualifiedName()); - remove(paths.subscriptionPath(topicName, subscriptionName)); + try { + remove(paths.subscriptionPath(topicName, subscriptionName)); + } catch (Exception e) { + throw new InternalProcessingException(e); + } } @Override public void updateSubscription(Subscription modifiedSubscription) { ensureSubscriptionExists(modifiedSubscription.getTopicName(), modifiedSubscription.getName()); logger.info("Updating subscription {}", modifiedSubscription.getQualifiedName()); - overwrite(paths.subscriptionPath(modifiedSubscription), modifiedSubscription); + try { + overwrite(paths.subscriptionPath(modifiedSubscription), modifiedSubscription); + } catch (Exception e) { + throw new InternalProcessingException(e); + } } @Override diff --git a/hermes-common/src/main/java/pl/allegro/tech/hermes/infrastructure/zookeeper/ZookeeperTopicRepository.java b/hermes-common/src/main/java/pl/allegro/tech/hermes/infrastructure/zookeeper/ZookeeperTopicRepository.java index 863580c013..60dcb49bc5 100644 --- a/hermes-common/src/main/java/pl/allegro/tech/hermes/infrastructure/zookeeper/ZookeeperTopicRepository.java +++ b/hermes-common/src/main/java/pl/allegro/tech/hermes/infrastructure/zookeeper/ZookeeperTopicRepository.java @@ -69,11 +69,7 @@ public void createTopic(Topic topic) { logger.info("Creating topic for path {}", topicPath); try { - zookeeper.inTransaction() - .create().forPath(topicPath, mapper.writeValueAsBytes(topic)) - .and() - .create().forPath(paths.subscriptionsPath(topic.getName())) - .and().commit(); + createInTransaction(topicPath, topic, paths.subscriptionsPath(topic.getName())); } catch (KeeperException.NodeExistsException ex) { throw new TopicAlreadyExistsException(topic.getName(), ex); } catch (Exception ex) { @@ -85,7 +81,11 @@ public void createTopic(Topic topic) { public void removeTopic(TopicName topicName) { ensureTopicExists(topicName); logger.info("Removing topic: " + topicName); - remove(paths.topicPath(topicName)); + try { + remove(paths.topicPath(topicName)); + } catch (Exception e) { + throw new InternalProcessingException(e); + } } @Override @@ -93,7 +93,11 @@ public void updateTopic(Topic topic) { ensureTopicExists(topic.getName()); logger.info("Updating topic: " + topic.getName()); - overwrite(paths.topicPath(topic.getName()), topic); + try { + overwrite(paths.topicPath(topic.getName()), topic); + } catch (Exception e) { + throw new InternalProcessingException(e); + } } @Override @@ -101,7 +105,11 @@ public void touchTopic(TopicName topicName) { ensureTopicExists(topicName); logger.info("Touching topic: " + topicName.qualifiedName()); - touch(paths.topicPath(topicName)); + try { + touch(paths.topicPath(topicName)); + } catch (Exception ex) { + throw new InternalProcessingException(ex); + } } @Override diff --git a/hermes-common/src/main/java/pl/allegro/tech/hermes/infrastructure/zookeeper/ZookeeperWorkloadConstraintsRepository.java b/hermes-common/src/main/java/pl/allegro/tech/hermes/infrastructure/zookeeper/ZookeeperWorkloadConstraintsRepository.java index 4ab7f3be5a..c1a0ad6f5f 100644 --- a/hermes-common/src/main/java/pl/allegro/tech/hermes/infrastructure/zookeeper/ZookeeperWorkloadConstraintsRepository.java +++ b/hermes-common/src/main/java/pl/allegro/tech/hermes/infrastructure/zookeeper/ZookeeperWorkloadConstraintsRepository.java @@ -66,10 +66,7 @@ public void createConstraints(SubscriptionName subscriptionName, Constraints con private void createConstraints(String path, Constraints constraints) throws KeeperException.NodeExistsException { try { - byte[] bytes = mapper.writeValueAsBytes(constraints); - zookeeper.create() - .creatingParentsIfNeeded() - .forPath(path, bytes); + createRecursively(path, constraints); } catch (KeeperException.NodeExistsException e) { throw e; } catch (Exception e) { @@ -82,9 +79,11 @@ public void updateConstraints(TopicName topicName, Constraints constraints) { logger.info("Updating constraints for topic {}", topicName.qualifiedName()); String path = paths.consumersWorkloadConstraintsPath(topicName.qualifiedName()); try { - updateConstraints(path, constraints); + overwrite(path, constraints); } catch (KeeperException.NoNodeException e) { throw new TopicConstraintsDoNotExistException(topicName, e); + } catch (Exception e) { + throw new InternalProcessingException(e); } } @@ -93,18 +92,9 @@ public void updateConstraints(SubscriptionName subscriptionName, Constraints con logger.info("Updating constraints for subscription {}", subscriptionName.getQualifiedName()); String path = paths.consumersWorkloadConstraintsPath(subscriptionName.getQualifiedName()); try { - updateConstraints(path, constraints); + overwrite(path, constraints); } catch (KeeperException.NoNodeException e) { throw new SubscriptionConstraintsDoNotExistException(subscriptionName, e); - } - } - - private void updateConstraints(String path, Constraints constraints) throws KeeperException.NoNodeException { - try { - byte[] bytes = mapper.writeValueAsBytes(constraints); - zookeeper.setData().forPath(path, bytes); - } catch (KeeperException.NoNodeException e) { - throw e; } catch (Exception e) { throw new InternalProcessingException(e); } @@ -126,7 +116,7 @@ public void deleteConstraints(SubscriptionName subscriptionName) { private void deleteConstraints(String path) { try { - zookeeper.delete().forPath(path); + remove(path); } catch (KeeperException.NoNodeException e) { // ignore - it's ok } catch (Exception e) { @@ -137,20 +127,12 @@ private void deleteConstraints(String path) { @Override public boolean constraintsExist(TopicName topicName) { String path = paths.consumersWorkloadConstraintsPath(topicName.qualifiedName()); - return constraintsExist(path); + return pathExists(path); } @Override public boolean constraintsExist(SubscriptionName subscriptionName) { String path = paths.consumersWorkloadConstraintsPath(subscriptionName.getQualifiedName()); - return constraintsExist(path); - } - - private boolean constraintsExist(String path) { - try { - return zookeeper.checkExists().forPath(path) != null; - } catch (Exception e) { - throw new InternalProcessingException(e); - } + return pathExists(path); } } diff --git a/hermes-common/src/main/java/pl/allegro/tech/hermes/infrastructure/zookeeper/counter/ZookeeperCounterException.java b/hermes-common/src/main/java/pl/allegro/tech/hermes/infrastructure/zookeeper/counter/ZookeeperCounterException.java index f668d0a956..8d0fdfa0b8 100644 --- a/hermes-common/src/main/java/pl/allegro/tech/hermes/infrastructure/zookeeper/counter/ZookeeperCounterException.java +++ b/hermes-common/src/main/java/pl/allegro/tech/hermes/infrastructure/zookeeper/counter/ZookeeperCounterException.java @@ -6,6 +6,10 @@ @SuppressWarnings("serial") public class ZookeeperCounterException extends HermesException { + public ZookeeperCounterException(String key, String message) { + super("Exception while trying to access counter " + key + " via Zookeeper. " + message); + } + public ZookeeperCounterException(String key, Throwable cause) { super("Exception while trying to access counter " + key + " via Zookeeper.", cause); } diff --git a/hermes-console/static/js/console/readiness/ReadinessController.js b/hermes-console/static/js/console/readiness/ReadinessController.js index 99a50c6775..3d5bd8b0ec 100644 --- a/hermes-console/static/js/console/readiness/ReadinessController.js +++ b/hermes-console/static/js/console/readiness/ReadinessController.js @@ -14,15 +14,15 @@ readiness.controller('ReadinessController', ['$scope', 'ReadinessRepository', 'C } $scope.openModal = function openModal(datacenterInfo) { - var action = datacenterInfo.isReady ? "Turn off" : "Turn on"; + var action = datacenterInfo.status === 'READY' ? "Turn off" : "Turn on"; confirmationModal.open({ actionSubject: 'Are you sure you want to ' + action.toLowerCase() + ' the ' + datacenterInfo.datacenter + ' datacenter ?', action: action }).result.then(function () { readinessRepository.setReadiness(datacenterInfo) .then(function () { - datacenterInfo.isReady = !datacenterInfo.isReady; clearError(); + return loadDatacenters(); }) .catch(function (e) { displayError(e); diff --git a/hermes-console/static/js/console/readiness/ReadinessRepository.js b/hermes-console/static/js/console/readiness/ReadinessRepository.js index 8cd63c495c..ac25b6b3eb 100644 --- a/hermes-console/static/js/console/readiness/ReadinessRepository.js +++ b/hermes-console/static/js/console/readiness/ReadinessRepository.js @@ -17,7 +17,10 @@ repository.factory('ReadinessRepository', ['DiscoveryService', '$resource', }); }, setReadiness: function (datacenterInfo) { - return setReadinessEndpoint.save({datacenter: datacenterInfo.datacenter}, {isReady: !datacenterInfo.isReady}).$promise; + return setReadinessEndpoint.save( + {datacenter: datacenterInfo.datacenter}, + {isReady: datacenterInfo.status === 'NOT_READY'} + ).$promise; } }; }]); diff --git a/hermes-console/static/partials/readiness.html b/hermes-console/static/partials/readiness.html index 56a7981d6f..edadb59d8d 100644 --- a/hermes-console/static/partials/readiness.html +++ b/hermes-console/static/partials/readiness.html @@ -13,7 +13,7 @@

Datacenter readiness

-
+
@@ -24,8 +24,9 @@

Datacenter readiness

diff --git a/hermes-frontend/src/main/java/pl/allegro/tech/hermes/frontend/config/FrontendConfiguration.java b/hermes-frontend/src/main/java/pl/allegro/tech/hermes/frontend/config/FrontendConfiguration.java index 18c806fb77..f365b2a12d 100644 --- a/hermes-frontend/src/main/java/pl/allegro/tech/hermes/frontend/config/FrontendConfiguration.java +++ b/hermes-frontend/src/main/java/pl/allegro/tech/hermes/frontend/config/FrontendConfiguration.java @@ -1,6 +1,5 @@ package pl.allegro.tech.hermes.frontend.config; -import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.curator.framework.CuratorFramework; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.context.annotation.Bean; @@ -9,7 +8,6 @@ import pl.allegro.tech.hermes.common.metric.HermesMetrics; import pl.allegro.tech.hermes.domain.group.GroupRepository; import pl.allegro.tech.hermes.domain.notifications.InternalNotificationsBus; -import pl.allegro.tech.hermes.domain.readiness.ReadinessRepository; import pl.allegro.tech.hermes.domain.topic.TopicRepository; import pl.allegro.tech.hermes.frontend.blacklist.BlacklistZookeeperNotifyingCache; import pl.allegro.tech.hermes.frontend.buffer.BackupMessagesLoader; @@ -21,7 +19,6 @@ import pl.allegro.tech.hermes.frontend.services.HealthCheckService; import pl.allegro.tech.hermes.frontend.validator.MessageValidators; import pl.allegro.tech.hermes.frontend.validator.TopicMessageValidator; -import pl.allegro.tech.hermes.infrastructure.zookeeper.ZookeeperDatacenterReadinessRepository; import pl.allegro.tech.hermes.infrastructure.zookeeper.ZookeeperPaths; import pl.allegro.tech.hermes.schema.SchemaExistenceEnsurer; import pl.allegro.tech.hermes.schema.SchemaRepository; @@ -78,13 +75,6 @@ public BlacklistZookeeperNotifyingCache blacklistZookeeperNotifyingCache(Curator return new BlacklistZookeeperNotifyingCache(curator, zookeeperPaths); } - @Bean(destroyMethod = "close") - public ReadinessRepository zookeeperDatacenterReadinessRepository(CuratorFramework zookeeper, - ZookeeperPaths paths, - ObjectMapper mapper) { - return new ZookeeperDatacenterReadinessRepository(zookeeper, mapper, paths); - } - @Bean(initMethod = "startup") public HealthCheckService healthCheckService() { return new HealthCheckService(); diff --git a/hermes-frontend/src/main/java/pl/allegro/tech/hermes/frontend/config/FrontendServerConfiguration.java b/hermes-frontend/src/main/java/pl/allegro/tech/hermes/frontend/config/FrontendServerConfiguration.java index f2f54a6060..31a3a36f7a 100644 --- a/hermes-frontend/src/main/java/pl/allegro/tech/hermes/frontend/config/FrontendServerConfiguration.java +++ b/hermes-frontend/src/main/java/pl/allegro/tech/hermes/frontend/config/FrontendServerConfiguration.java @@ -1,12 +1,13 @@ package pl.allegro.tech.hermes.frontend.config; +import com.fasterxml.jackson.databind.ObjectMapper; import io.undertow.server.HttpHandler; +import org.apache.curator.framework.CuratorFramework; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import pl.allegro.tech.hermes.common.metric.HermesMetrics; import pl.allegro.tech.hermes.common.ssl.SslContextFactory; -import pl.allegro.tech.hermes.domain.readiness.ReadinessRepository; import pl.allegro.tech.hermes.frontend.cache.topic.TopicsCache; import pl.allegro.tech.hermes.frontend.producer.BrokerMessageProducer; import pl.allegro.tech.hermes.frontend.publishing.handlers.ThroughputLimiter; @@ -18,6 +19,7 @@ import pl.allegro.tech.hermes.frontend.server.TopicMetadataLoadingRunner; import pl.allegro.tech.hermes.frontend.server.TopicMetadataLoadingStartupHook; import pl.allegro.tech.hermes.frontend.server.TopicSchemaLoadingStartupHook; +import pl.allegro.tech.hermes.infrastructure.zookeeper.ZookeeperPaths; import pl.allegro.tech.hermes.schema.SchemaRepository; import java.util.Optional; @@ -58,12 +60,18 @@ public HermesServer hermesServer(HermesServerProperties hermesServerProperties, @Bean public DefaultReadinessChecker readinessChecker(ReadinessCheckProperties readinessCheckProperties, TopicMetadataLoadingRunner topicMetadataLoadingRunner, - ReadinessRepository readinessRepository) { + CuratorFramework zookeeper, + ZookeeperPaths paths, + ObjectMapper mapper) { return new DefaultReadinessChecker( topicMetadataLoadingRunner, - readinessRepository, + zookeeper, + paths, + mapper, readinessCheckProperties.isEnabled(), - readinessCheckProperties.getInterval()); + readinessCheckProperties.isKafkaCheckEnabled(), + readinessCheckProperties.getInterval() + ); } @Bean diff --git a/hermes-frontend/src/main/java/pl/allegro/tech/hermes/frontend/config/ReadinessCheckProperties.java b/hermes-frontend/src/main/java/pl/allegro/tech/hermes/frontend/config/ReadinessCheckProperties.java index 5cbc7c7a03..dce2d5be68 100644 --- a/hermes-frontend/src/main/java/pl/allegro/tech/hermes/frontend/config/ReadinessCheckProperties.java +++ b/hermes-frontend/src/main/java/pl/allegro/tech/hermes/frontend/config/ReadinessCheckProperties.java @@ -9,6 +9,8 @@ public class ReadinessCheckProperties { private boolean enabled = false; + private boolean kafkaCheckEnabled = false; + private Duration interval = Duration.ofSeconds(1); public boolean isEnabled() { @@ -26,4 +28,12 @@ public Duration getInterval() { public void setIntervalSeconds(Duration intervalSeconds) { this.interval = intervalSeconds; } + + public boolean isKafkaCheckEnabled() { + return kafkaCheckEnabled; + } + + public void setKafkaCheckEnabled(boolean kafkaCheckEnabled) { + this.kafkaCheckEnabled = kafkaCheckEnabled; + } } diff --git a/hermes-frontend/src/main/java/pl/allegro/tech/hermes/frontend/server/DefaultReadinessChecker.java b/hermes-frontend/src/main/java/pl/allegro/tech/hermes/frontend/server/DefaultReadinessChecker.java index fbeba8273f..5cfb7806f9 100644 --- a/hermes-frontend/src/main/java/pl/allegro/tech/hermes/frontend/server/DefaultReadinessChecker.java +++ b/hermes-frontend/src/main/java/pl/allegro/tech/hermes/frontend/server/DefaultReadinessChecker.java @@ -1,9 +1,16 @@ package pl.allegro.tech.hermes.frontend.server; +import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.util.concurrent.ThreadFactoryBuilder; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.recipes.cache.ChildData; +import org.apache.curator.framework.recipes.cache.NodeCache; +import org.apache.curator.framework.recipes.cache.NodeCacheListener; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import pl.allegro.tech.hermes.domain.readiness.ReadinessRepository; +import pl.allegro.tech.hermes.api.Readiness; +import pl.allegro.tech.hermes.common.exception.InternalProcessingException; +import pl.allegro.tech.hermes.infrastructure.zookeeper.ZookeeperPaths; import java.time.Duration; import java.util.List; @@ -12,25 +19,39 @@ import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; -public class DefaultReadinessChecker implements ReadinessChecker { +public class DefaultReadinessChecker implements ReadinessChecker, NodeCacheListener { private static final Logger logger = LoggerFactory.getLogger(DefaultReadinessChecker.class); private final boolean enabled; + private final boolean kafkaCheckEnabled; private final Duration interval; private final TopicMetadataLoadingRunner topicMetadataLoadingRunner; - private final ReadinessRepository readinessRepository; private final ScheduledExecutorService scheduler; + private final ObjectMapper mapper; + private final NodeCache cache; + private volatile boolean adminReady = false; private volatile boolean ready = false; public DefaultReadinessChecker(TopicMetadataLoadingRunner topicMetadataLoadingRunner, - ReadinessRepository readinessRepository, + CuratorFramework curator, + ZookeeperPaths paths, + ObjectMapper mapper, boolean enabled, + boolean kafkaCheckEnabled, Duration interval) { this.enabled = enabled; + this.kafkaCheckEnabled = kafkaCheckEnabled; this.interval = interval; this.topicMetadataLoadingRunner = topicMetadataLoadingRunner; - this.readinessRepository = readinessRepository; + this.mapper = mapper; + this.cache = new NodeCache(curator, paths.frontendReadinessPath()); + cache.getListenable().addListener(this); + try { + cache.start(true); + } catch (Exception e) { + throw new InternalProcessingException("Readiness cache cannot start.", e); + } ThreadFactory threadFactory = new ThreadFactoryBuilder() .setNameFormat("ReadinessChecker-%d").build(); this.scheduler = Executors.newSingleThreadScheduledExecutor(threadFactory); @@ -47,6 +68,7 @@ public boolean isReady() { @Override public void start() { if (enabled) { + refreshAdminReady(); ReadinessCheckerJob job = new ReadinessCheckerJob(); job.run(); scheduler.scheduleAtFixedRate(job, interval.toSeconds(), interval.toSeconds(), TimeUnit.SECONDS); @@ -57,6 +79,31 @@ public void start() { public void stop() throws InterruptedException { scheduler.shutdown(); scheduler.awaitTermination(1, TimeUnit.MINUTES); + try { + cache.close(); + } catch (Exception e) { + logger.warn("Failed to stop readiness cache", e); + } + } + + @Override + public void nodeChanged() { + refreshAdminReady(); + } + + private void refreshAdminReady() { + try { + ChildData nodeData = cache.getCurrentData(); + if (nodeData != null) { + byte[] data = nodeData.getData(); + Readiness value = mapper.readValue(data, Readiness.class); + adminReady = value.isReady(); + } else { + adminReady = true; + } + } catch (Exception e) { + logger.error("Failed reloading readiness cache. Current value: ready=" + ready, e); + } } private class ReadinessCheckerJob implements Runnable { @@ -64,7 +111,7 @@ private class ReadinessCheckerJob implements Runnable { @Override public void run() { - if (!readinessRepository.isReady()) { + if (!adminReady) { ready = false; } else if (kafkaReady) { ready = true; @@ -75,13 +122,16 @@ public void run() { } private boolean checkKafkaReadiness() { - try { - List results = topicMetadataLoadingRunner.refreshMetadata(); - return results.stream().noneMatch(MetadataLoadingResult::isFailure); - } catch (Exception ex) { - logger.warn("Unexpected error occurred during checking Kafka readiness", ex); - return false; + if (kafkaCheckEnabled) { + try { + List results = topicMetadataLoadingRunner.refreshMetadata(); + return results.stream().noneMatch(MetadataLoadingResult::isFailure); + } catch (Exception ex) { + logger.warn("Unexpected error occurred during checking Kafka readiness", ex); + return false; + } } + return true; } } } diff --git a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/api/ReadinessEndpoint.java b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/api/ReadinessEndpoint.java index 84afd76cd7..470f6e4887 100644 --- a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/api/ReadinessEndpoint.java +++ b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/api/ReadinessEndpoint.java @@ -17,6 +17,8 @@ import javax.ws.rs.core.Response; import static javax.ws.rs.core.MediaType.APPLICATION_JSON; +import static pl.allegro.tech.hermes.api.DatacenterReadiness.ReadinessStatus.NOT_READY; +import static pl.allegro.tech.hermes.api.DatacenterReadiness.ReadinessStatus.READY; @Path("readiness/datacenters") @Component @@ -34,7 +36,7 @@ public ReadinessEndpoint(ReadinessService readinessService) { @RolesAllowed(Roles.ADMIN) @Path("/{datacenter}") public Response setReadiness(@PathParam("datacenter") String datacenter, Readiness readiness) { - readinessService.setReady(new DatacenterReadiness(datacenter, readiness.isReady())); + readinessService.setReady(new DatacenterReadiness(datacenter, readiness.isReady() ? READY : NOT_READY)); return Response.status(Response.Status.ACCEPTED).build(); } diff --git a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/config/storage/StorageConfiguration.java b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/config/storage/StorageConfiguration.java index f00dc25f43..257b76e4ff 100644 --- a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/config/storage/StorageConfiguration.java +++ b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/config/storage/StorageConfiguration.java @@ -40,7 +40,6 @@ import pl.allegro.tech.hermes.management.infrastructure.zookeeper.ZookeeperClientManager; import pl.allegro.tech.hermes.management.infrastructure.zookeeper.ZookeeperRepositoryManager; -import java.util.List; import javax.annotation.PostConstruct; import static java.util.stream.Collectors.toList; @@ -109,10 +108,11 @@ MultiDatacenterRepositoryQueryExecutor multiDcRepositoryQueryExecutor( @Bean SummedSharedCounter summedSharedCounter(ZookeeperClientManager manager) { return new SummedSharedCounter( - getCuratorClients(manager), + manager.getClients(), storageClustersProperties.getSharedCountersExpiration(), storageClustersProperties.getRetrySleep(), - storageClustersProperties.getRetryTimes()); + storageClustersProperties.getRetryTimes() + ); } @Bean @@ -182,15 +182,6 @@ public void init() { private void ensureInitPathExists() { ZookeeperClientManager clientManager = clientManager(); - for (ZookeeperClient client : clientManager.getClients()) { - logger.info("Ensuring that path exists for Zookeeper client: {}", client.getDatacenterName()); - client.ensurePathExists(zookeeperPaths().groupsPath()); - } - } - - private List getCuratorClients(ZookeeperClientManager manager) { - return manager.getClients().stream() - .map(ZookeeperClient::getCuratorFramework) - .collect(toList()); + clientManager.getLocalClient().ensurePathExists(zookeeperPaths().groupsPath()); } } diff --git a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/domain/health/HealthCheckScheduler.java b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/domain/health/HealthCheckScheduler.java index 2198817da2..d4e0554c09 100644 --- a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/domain/health/HealthCheckScheduler.java +++ b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/domain/health/HealthCheckScheduler.java @@ -6,7 +6,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Value; -import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.stereotype.Component; import pl.allegro.tech.hermes.infrastructure.zookeeper.ZookeeperPaths; import pl.allegro.tech.hermes.management.domain.mode.ModeService; @@ -18,16 +17,21 @@ import javax.annotation.PostConstruct; @Component -@ConditionalOnProperty(name = "management.health.enabled", havingValue = "true") public class HealthCheckScheduler { private static final Logger logger = LoggerFactory.getLogger(HealthCheckScheduler.class); - private final HealthCheckTask healthCheckTask; - private final Long period; private final ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor( new ThreadFactoryBuilder().setNameFormat("storage-health-check-scheduler-%d").build() ); + private final ZookeeperClientManager zookeeperClientManager; + private final ZookeeperPaths zookeeperPaths; + private final NodeDataProvider nodeDataProvider; + private final ObjectMapper objectMapper; + private final ModeService modeService; + private final MeterRegistry meterRegistry; + private final Long periodSeconds; + private final boolean enabled; public HealthCheckScheduler(ZookeeperClientManager zookeeperClientManager, ZookeeperPaths zookeeperPaths, @@ -35,17 +39,37 @@ public HealthCheckScheduler(ZookeeperClientManager zookeeperClientManager, ObjectMapper objectMapper, ModeService modeService, MeterRegistry meterRegistry, - @Value("${management.health.periodSeconds}") Long periodSeconds) { - String healthCheckPath = - zookeeperPaths.nodeHealthPathForManagementHost(nodeDataProvider.getHostname(), nodeDataProvider.getServerPort()); - this.period = periodSeconds; - this.healthCheckTask = - new HealthCheckTask(zookeeperClientManager.getClients(), healthCheckPath, objectMapper, modeService, meterRegistry); + @Value("${management.health.periodSeconds:30}") Long periodSeconds, + @Value("${management.health.enabled:false}") boolean enabled) { + this.zookeeperClientManager = zookeeperClientManager; + this.zookeeperPaths = zookeeperPaths; + this.nodeDataProvider = nodeDataProvider; + this.objectMapper = objectMapper; + this.modeService = modeService; + this.meterRegistry = meterRegistry; + this.periodSeconds = periodSeconds; + this.enabled = enabled; } @PostConstruct public void scheduleHealthCheck() { - logger.info("Starting the storage health check scheduler"); - executorService.scheduleAtFixedRate(healthCheckTask, 0, period, TimeUnit.SECONDS); + if (enabled) { + logger.info("Starting the storage health check scheduler"); + String healthCheckPath = zookeeperPaths.nodeHealthPathForManagementHost( + nodeDataProvider.getHostname(), + nodeDataProvider.getServerPort() + ); + HealthCheckTask healthCheckTask = new HealthCheckTask( + zookeeperClientManager.getClients(), + healthCheckPath, + objectMapper, + modeService, + meterRegistry + ); + executorService.scheduleAtFixedRate(healthCheckTask, 0, periodSeconds, TimeUnit.SECONDS); + } else { + logger.info("Storage health check is disabled"); + modeService.setMode(ModeService.ManagementMode.READ_WRITE); + } } } diff --git a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/domain/mode/ModeService.java b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/domain/mode/ModeService.java index 6b8583d0a5..0c75cd6c3c 100644 --- a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/domain/mode/ModeService.java +++ b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/domain/mode/ModeService.java @@ -26,7 +26,7 @@ public String toString() { } } - private volatile ManagementMode mode = ManagementMode.READ_WRITE; + private volatile ManagementMode mode = ManagementMode.READ_ONLY; public ManagementMode getMode() { return mode; diff --git a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/domain/readiness/GetReadinessQuery.java b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/domain/readiness/GetReadinessQuery.java index 4bcdcca24f..918592dd05 100644 --- a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/domain/readiness/GetReadinessQuery.java +++ b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/domain/readiness/GetReadinessQuery.java @@ -1,14 +1,24 @@ package pl.allegro.tech.hermes.management.domain.readiness; -import pl.allegro.tech.hermes.domain.readiness.ReadinessRepository; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import pl.allegro.tech.hermes.api.DatacenterReadiness.ReadinessStatus; import pl.allegro.tech.hermes.management.domain.dc.DatacenterBoundRepositoryHolder; import pl.allegro.tech.hermes.management.domain.dc.QueryCommand; -public class GetReadinessQuery extends QueryCommand { +public class GetReadinessQuery extends QueryCommand { + + private static final Logger logger = LoggerFactory.getLogger(GetReadinessQuery.class); @Override - public Boolean query(DatacenterBoundRepositoryHolder holder) { - return holder.getRepository().isReady(); + public ReadinessStatus query(DatacenterBoundRepositoryHolder holder) { + try { + boolean ready = holder.getRepository().isReady(); + return ready ? ReadinessStatus.READY : ReadinessStatus.NOT_READY; + } catch (Exception e) { + logger.error("Cannot obtain readiness status from {}", holder.getDatacenterName(), e); + return ReadinessStatus.UNDEFINED; + } } @Override diff --git a/hermes-common/src/main/java/pl/allegro/tech/hermes/domain/readiness/ReadinessRepository.java b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/domain/readiness/ReadinessRepository.java similarity index 61% rename from hermes-common/src/main/java/pl/allegro/tech/hermes/domain/readiness/ReadinessRepository.java rename to hermes-management/src/main/java/pl/allegro/tech/hermes/management/domain/readiness/ReadinessRepository.java index 8283e9fc1c..c33487f53e 100644 --- a/hermes-common/src/main/java/pl/allegro/tech/hermes/domain/readiness/ReadinessRepository.java +++ b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/domain/readiness/ReadinessRepository.java @@ -1,9 +1,8 @@ -package pl.allegro.tech.hermes.domain.readiness; +package pl.allegro.tech.hermes.management.domain.readiness; public interface ReadinessRepository { + boolean isReady(); void setReadiness(boolean isReady); - - void close(); } diff --git a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/domain/readiness/SetReadinessCommand.java b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/domain/readiness/SetReadinessCommand.java index a5750bf51b..a96f0caec9 100644 --- a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/domain/readiness/SetReadinessCommand.java +++ b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/domain/readiness/SetReadinessCommand.java @@ -1,7 +1,7 @@ package pl.allegro.tech.hermes.management.domain.readiness; import pl.allegro.tech.hermes.api.DatacenterReadiness; -import pl.allegro.tech.hermes.domain.readiness.ReadinessRepository; +import pl.allegro.tech.hermes.common.exception.InternalProcessingException; import pl.allegro.tech.hermes.management.domain.dc.DatacenterBoundRepositoryHolder; import pl.allegro.tech.hermes.management.domain.dc.RepositoryCommand; @@ -18,7 +18,18 @@ public void backup(DatacenterBoundRepositoryHolder holder) @Override public void execute(DatacenterBoundRepositoryHolder holder) { if (holder.getDatacenterName().equals(readiness.getDatacenter())) { - holder.getRepository().setReadiness(readiness.isReady()); + holder.getRepository().setReadiness(isReady()); + } + } + + private boolean isReady() { + switch (readiness.getStatus()) { + case READY: + return true; + case NOT_READY: + return false; + default: + throw new InternalProcessingException("Invalid readiness status: " + readiness.getStatus()); } } diff --git a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/infrastructure/metrics/SummedSharedCounter.java b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/infrastructure/metrics/SummedSharedCounter.java index d065139756..a5395f9786 100644 --- a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/infrastructure/metrics/SummedSharedCounter.java +++ b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/infrastructure/metrics/SummedSharedCounter.java @@ -7,48 +7,95 @@ import org.apache.curator.framework.recipes.atomic.DistributedAtomicLong; import org.apache.curator.retry.ExponentialBackoffRetry; import pl.allegro.tech.hermes.infrastructure.zookeeper.counter.ZookeeperCounterException; +import pl.allegro.tech.hermes.management.infrastructure.zookeeper.ZookeeperClient; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.concurrent.TimeUnit; -import static java.util.stream.Collectors.toList; - public class SummedSharedCounter { - private final List> distributedAtomicLongCaches; + private final LoadingCache counterAggregators; - public SummedSharedCounter(List curatorClients, int expireAfter, - int distributedLoaderBackoff, int distributedLoaderRetries) { - this.distributedAtomicLongCaches = curatorClients.stream() - .map(client -> buildLoadingCache(client, expireAfter, distributedLoaderBackoff, distributedLoaderRetries)) - .collect(toList()); + public SummedSharedCounter(List zookeeperClients, + int expireAfter, + int distributedLoaderBackoff, + int distributedLoaderRetries) { + this.counterAggregators = buildLoadingCache(zookeeperClients, expireAfter, distributedLoaderBackoff, distributedLoaderRetries); } public long getValue(String path) { - return distributedAtomicLongCaches.stream() - .map(distAtomicLong -> getValue(distAtomicLong, path)) - .reduce(0L, (a, b) -> a + b); - } - - private long getValue(LoadingCache distAtomicLong, String path) { try { - return distAtomicLong.get(path).get().preValue(); + return counterAggregators.get(path).aggregate(); + } catch (ZookeeperCounterException e) { + throw e; } catch (Exception e) { throw new ZookeeperCounterException(path, e); } } - private LoadingCache buildLoadingCache(CuratorFramework curatorClient, int expireAfter, - int distributedLoaderBackoff, int distributedLoaderRetries) { + private LoadingCache buildLoadingCache(List zookeeperClients, + int expireAfter, + int distributedLoaderBackoff, + int distributedLoaderRetries) { return CacheBuilder.newBuilder() .expireAfterAccess(expireAfter, TimeUnit.HOURS) - .build(new CacheLoader() { + .build(new CacheLoader<>() { @Override - public DistributedAtomicLong load(String key) { - return new DistributedAtomicLong(curatorClient, key, - new ExponentialBackoffRetry(distributedLoaderBackoff, distributedLoaderRetries)); + public CounterAggregator load(String key) { + return new CounterAggregator( + key, + zookeeperClients, + distributedLoaderBackoff, + distributedLoaderRetries + ); } } ); } + + private static class CounterAggregator { + + private final String counterName; + private final Map curatorPerDatacenter = new HashMap<>(); + private final Map counterPerDatacenter = new HashMap<>(); + + CounterAggregator(String counterName, + List zookeeperClients, + int distributedLoaderBackoff, + int distributedLoaderRetries) { + this.counterName = counterName; + for (ZookeeperClient zookeeperClient : zookeeperClients) { + CuratorFramework curatorFramework = zookeeperClient.getCuratorFramework(); + curatorPerDatacenter.put(zookeeperClient.getDatacenterName(), curatorFramework); + DistributedAtomicLong distributedAtomicLong = new DistributedAtomicLong( + curatorFramework, + counterName, + new ExponentialBackoffRetry(distributedLoaderBackoff, distributedLoaderRetries) + ); + counterPerDatacenter.put(zookeeperClient.getDatacenterName(), distributedAtomicLong); + } + } + + long aggregate() throws Exception { + long sum = 0; + for (Map.Entry counterEntry : counterPerDatacenter.entrySet()) { + ensureConnected(counterEntry.getKey()); + DistributedAtomicLong counter = counterEntry.getValue(); + sum += counter.get().preValue(); + } + return sum; + } + + private void ensureConnected(String datacenter) { + CuratorFramework curator = curatorPerDatacenter.get(datacenter); + if (!curator.getZookeeperClient().isConnected()) { + throw new ZookeeperCounterException( + counterName, + "Could not establish connection to a Zookeeper instance in " + datacenter + "." + ); + } + } + } } diff --git a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/infrastructure/readiness/ZookeeperDatacenterReadinessRepository.java b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/infrastructure/readiness/ZookeeperDatacenterReadinessRepository.java new file mode 100644 index 0000000000..ec1d0fe0f3 --- /dev/null +++ b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/infrastructure/readiness/ZookeeperDatacenterReadinessRepository.java @@ -0,0 +1,45 @@ +package pl.allegro.tech.hermes.management.infrastructure.readiness; + +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.curator.framework.CuratorFramework; +import org.apache.zookeeper.KeeperException; +import pl.allegro.tech.hermes.api.Readiness; +import pl.allegro.tech.hermes.common.exception.InternalProcessingException; +import pl.allegro.tech.hermes.infrastructure.zookeeper.ZookeeperBasedRepository; +import pl.allegro.tech.hermes.infrastructure.zookeeper.ZookeeperPaths; +import pl.allegro.tech.hermes.management.domain.readiness.ReadinessRepository; + +public class ZookeeperDatacenterReadinessRepository extends ZookeeperBasedRepository implements ReadinessRepository { + + public ZookeeperDatacenterReadinessRepository(CuratorFramework curator, ObjectMapper mapper, ZookeeperPaths paths) { + super(curator, mapper, paths); + } + + @Override + public boolean isReady() { + try { + String path = paths.frontendReadinessPath(); + Readiness readiness = readFrom(path, Readiness.class); + return readiness.isReady(); + } catch (InternalProcessingException e) { + if (e.getCause() instanceof KeeperException.NoNodeException) { + return true; + } + throw e; + } + } + + @Override + public void setReadiness(boolean isReady) { + try { + String path = paths.frontendReadinessPath(); + if (!pathExists(path)) { + createRecursively(path, new Readiness(isReady)); + } else { + overwrite(path, new Readiness(isReady)); + } + } catch (Exception ex) { + throw new InternalProcessingException(ex); + } + } +} diff --git a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/infrastructure/retransmit/ZookeeperOfflineRetransmissionRepository.java b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/infrastructure/retransmit/ZookeeperOfflineRetransmissionRepository.java index 56364e3ed7..715f9db2af 100644 --- a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/infrastructure/retransmit/ZookeeperOfflineRetransmissionRepository.java +++ b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/infrastructure/retransmit/ZookeeperOfflineRetransmissionRepository.java @@ -29,8 +29,7 @@ public ZookeeperOfflineRetransmissionRepository(CuratorFramework zookeeper, Obje public void saveTask(OfflineRetransmissionTask task) { logger.info("Saving retransmission task {}", task); try { - zookeeper.create().creatingParentsIfNeeded() - .forPath(paths.offlineRetransmissionPath(task.getTaskId()), mapper.writeValueAsBytes(task)); + createRecursively(paths.offlineRetransmissionPath(task.getTaskId()), task); logger.info("Successfully saved retransmission task {}", task); } catch (Exception ex) { String msg = format("Error while saving retransmission task %s", task.toString()); @@ -42,8 +41,7 @@ public void saveTask(OfflineRetransmissionTask task) { public List getAllTasks() { try { if (pathExists(paths.offlineRetransmissionPath())) { - return zookeeper.getChildren() - .forPath(paths.offlineRetransmissionPath()) + return childrenOf(paths.offlineRetransmissionPath()) .stream() .map(id -> readFrom(paths.offlineRetransmissionPath(id), OfflineRetransmissionTask.class)) .collect(Collectors.toList()); @@ -60,7 +58,7 @@ public void deleteTask(String taskId) { logger.info("Trying to delete retransmission task with id={}", taskId); try { ensureTaskExists(taskId); - zookeeper.delete().forPath(paths.offlineRetransmissionPath(taskId)); + remove(paths.offlineRetransmissionPath(taskId)); logger.info("Successfully deleted retransmission task with id={}", taskId); } catch (OfflineRetransmissionValidationException ex) { throw ex; diff --git a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/infrastructure/zookeeper/ZookeeperClientManager.java b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/infrastructure/zookeeper/ZookeeperClientManager.java index d65f1b86bb..022b629dda 100644 --- a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/infrastructure/zookeeper/ZookeeperClientManager.java +++ b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/infrastructure/zookeeper/ZookeeperClientManager.java @@ -36,6 +36,7 @@ public ZookeeperClientManager(StorageClustersProperties properties, DatacenterNa public void start() { createClients(); selectLocalClient(); + waitForConnection(localClient.getCuratorFramework()); } private void createClients() { @@ -113,14 +114,12 @@ public List getAclForPath(String path) { ); CuratorFramework curator = builder.build(); - - startAndWaitForConnection(curator); + curator.start(); return curator; } - private void startAndWaitForConnection(CuratorFramework curator) { - curator.start(); + private void waitForConnection(CuratorFramework curator) { try { curator.blockUntilConnected(); } catch (InterruptedException interruptedException) { diff --git a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/infrastructure/zookeeper/ZookeeperRepositoryManager.java b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/infrastructure/zookeeper/ZookeeperRepositoryManager.java index 6a42f936bb..56724b7a24 100644 --- a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/infrastructure/zookeeper/ZookeeperRepositoryManager.java +++ b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/infrastructure/zookeeper/ZookeeperRepositoryManager.java @@ -11,14 +11,12 @@ import pl.allegro.tech.hermes.domain.CredentialsRepository; import pl.allegro.tech.hermes.domain.group.GroupRepository; import pl.allegro.tech.hermes.domain.oauth.OAuthProviderRepository; -import pl.allegro.tech.hermes.domain.readiness.ReadinessRepository; import pl.allegro.tech.hermes.domain.subscription.SubscriptionRepository; import pl.allegro.tech.hermes.domain.topic.TopicRepository; import pl.allegro.tech.hermes.domain.topic.preview.MessagePreviewRepository; import pl.allegro.tech.hermes.domain.workload.constraints.WorkloadConstraintsRepository; import pl.allegro.tech.hermes.infrastructure.dc.DatacenterNameProvider; import pl.allegro.tech.hermes.infrastructure.zookeeper.ZookeeperCredentialsRepository; -import pl.allegro.tech.hermes.infrastructure.zookeeper.ZookeeperDatacenterReadinessRepository; import pl.allegro.tech.hermes.infrastructure.zookeeper.ZookeeperMessagePreviewRepository; import pl.allegro.tech.hermes.infrastructure.zookeeper.ZookeeperOAuthProviderRepository; import pl.allegro.tech.hermes.infrastructure.zookeeper.ZookeeperPaths; @@ -30,8 +28,10 @@ import pl.allegro.tech.hermes.management.domain.blacklist.TopicBlacklistRepository; import pl.allegro.tech.hermes.management.domain.dc.DatacenterBoundRepositoryHolder; import pl.allegro.tech.hermes.management.domain.dc.RepositoryManager; +import pl.allegro.tech.hermes.management.domain.readiness.ReadinessRepository; import pl.allegro.tech.hermes.management.domain.retransmit.OfflineRetransmissionRepository; import pl.allegro.tech.hermes.management.infrastructure.blacklist.ZookeeperTopicBlacklistRepository; +import pl.allegro.tech.hermes.management.infrastructure.readiness.ZookeeperDatacenterReadinessRepository; import pl.allegro.tech.hermes.management.infrastructure.retransmit.ZookeeperOfflineRetransmissionRepository; import java.util.Comparator; diff --git a/hermes-management/src/test/groovy/pl/allegro/tech/hermes/management/domain/health/HealthCheckTaskTest.groovy b/hermes-management/src/test/groovy/pl/allegro/tech/hermes/management/domain/health/HealthCheckTaskTest.groovy index 305f75b47e..c3f981feb4 100644 --- a/hermes-management/src/test/groovy/pl/allegro/tech/hermes/management/domain/health/HealthCheckTaskTest.groovy +++ b/hermes-management/src/test/groovy/pl/allegro/tech/hermes/management/domain/health/HealthCheckTaskTest.groovy @@ -27,6 +27,7 @@ class HealthCheckTaskTest extends MultiZookeeperIntegrationTest { healthCheckTask = new HealthCheckTask(manager.clients, healthCheckPath, new ObjectMapper().registerModule(new JavaTimeModule()), modeService, meterRegistry) successfulCounter = meterRegistry.counter('storage-health-check.successful') failedCounter = meterRegistry.counter('storage-health-check.failed') + modeService.mode = ModeService.ManagementMode.READ_WRITE } def cleanup() { diff --git a/hermes-management/src/test/groovy/pl/allegro/tech/hermes/management/infrastructure/metrics/SummedSharedCounterTest.groovy b/hermes-management/src/test/groovy/pl/allegro/tech/hermes/management/infrastructure/metrics/SummedSharedCounterTest.groovy index 82deaaac72..3515e821aa 100644 --- a/hermes-management/src/test/groovy/pl/allegro/tech/hermes/management/infrastructure/metrics/SummedSharedCounterTest.groovy +++ b/hermes-management/src/test/groovy/pl/allegro/tech/hermes/management/infrastructure/metrics/SummedSharedCounterTest.groovy @@ -30,8 +30,7 @@ class SummedSharedCounterTest extends MultiZookeeperIntegrationTest { def zkClientDc2 = findClientByDc(manager.clients, DC_2_NAME).curatorFramework sharedCounterDc2 = new SharedCounter(zkClientDc2, EXPIRE_AFTER, DISTRIBUTED_LEADER_BACKOFF, DISTRIBUTED_LEADER_RETRIES) - def zkClients = manager.clients.collect { client -> client.curatorFramework } - summedSharedCounter = new SummedSharedCounter(zkClients, (int) EXPIRE_AFTER.toHours(), (int) DISTRIBUTED_LEADER_BACKOFF.toMillis(), DISTRIBUTED_LEADER_RETRIES) + summedSharedCounter = new SummedSharedCounter(manager.clients, (int) EXPIRE_AFTER.toHours(), (int) DISTRIBUTED_LEADER_BACKOFF.toMillis(), DISTRIBUTED_LEADER_RETRIES) } def cleanup() { diff --git a/hermes-management/src/test/groovy/pl/allegro/tech/hermes/management/infrastructure/zookeeper/ZookeeperClientManagerTest.groovy b/hermes-management/src/test/groovy/pl/allegro/tech/hermes/management/infrastructure/zookeeper/ZookeeperClientManagerTest.groovy index db3c9397d3..31f9786eb3 100644 --- a/hermes-management/src/test/groovy/pl/allegro/tech/hermes/management/infrastructure/zookeeper/ZookeeperClientManagerTest.groovy +++ b/hermes-management/src/test/groovy/pl/allegro/tech/hermes/management/infrastructure/zookeeper/ZookeeperClientManagerTest.groovy @@ -53,6 +53,7 @@ class ZookeeperClientManagerTest extends MultiZookeeperIntegrationTest { def manager = buildZookeeperClientManager() manager.start() def clients = manager.clients + assertZookeeperClientsConnected(clients) when: manager.stop() diff --git a/hermes-management/src/test/groovy/pl/allegro/tech/hermes/management/utils/MultiZookeeperIntegrationTest.groovy b/hermes-management/src/test/groovy/pl/allegro/tech/hermes/management/utils/MultiZookeeperIntegrationTest.groovy index f593c46451..fcff24afe0 100644 --- a/hermes-management/src/test/groovy/pl/allegro/tech/hermes/management/utils/MultiZookeeperIntegrationTest.groovy +++ b/hermes-management/src/test/groovy/pl/allegro/tech/hermes/management/utils/MultiZookeeperIntegrationTest.groovy @@ -8,6 +8,7 @@ import pl.allegro.tech.hermes.management.infrastructure.zookeeper.ZookeeperClien import pl.allegro.tech.hermes.management.infrastructure.zookeeper.ZookeeperClientManager import pl.allegro.tech.hermes.test.helper.util.Ports import spock.lang.Specification +import spock.util.concurrent.PollingConditions abstract class MultiZookeeperIntegrationTest extends Specification { @@ -16,6 +17,8 @@ abstract class MultiZookeeperIntegrationTest extends Specification { static final int DC_2_ZOOKEEPER_PORT = Ports.nextAvailable() static final String DC_2_NAME = "dc2" + def conditions = new PollingConditions(timeout: 120) + TestingServer zookeeper1 TestingServer zookeeper2 @@ -31,20 +34,24 @@ abstract class MultiZookeeperIntegrationTest extends Specification { zookeeper2.stop() } - static assertClientConnected(ZookeeperClient client) { - return client.getCuratorFramework().getZookeeperClient().isConnected() + void assertClientConnected(ZookeeperClient client) { + conditions.eventually { + assert client.getCuratorFramework().getZookeeperClient().isConnected() + } } - static assertClientDisconnected(ZookeeperClient client) { - return !client.getCuratorFramework().getZookeeperClient().isConnected() + void assertClientDisconnected(ZookeeperClient client) { + conditions.eventually { + assert !client.getCuratorFramework().getZookeeperClient().isConnected() + } } - static assertZookeeperClientsConnected(List clients) { + void assertZookeeperClientsConnected(List clients) { def dc1Client = findClientByDc(clients, DC_1_NAME) - assert assertClientConnected(dc1Client) + assertClientConnected(dc1Client) def dc2Client = findClientByDc(clients, DC_2_NAME) - assert assertClientConnected(dc2Client) + assertClientConnected(dc2Client) } static buildZookeeperClientManager(String dc = "dc1") { diff --git a/hermes-test-helper/src/main/java/pl/allegro/tech/hermes/test/helper/endpoint/HermesAPIOperations.java b/hermes-test-helper/src/main/java/pl/allegro/tech/hermes/test/helper/endpoint/HermesAPIOperations.java index 2b01a70c97..6624aad7b4 100644 --- a/hermes-test-helper/src/main/java/pl/allegro/tech/hermes/test/helper/endpoint/HermesAPIOperations.java +++ b/hermes-test-helper/src/main/java/pl/allegro/tech/hermes/test/helper/endpoint/HermesAPIOperations.java @@ -245,4 +245,8 @@ public void createOAuthProvider(OAuthProvider oAuthProvider) { public void setReadiness(String dcName, boolean isReady) { assertThat(endpoints.readiness().setReadiness(dcName, new Readiness(isReady)).getStatus()).isEqualTo(ACCEPTED.getStatusCode()); } + + public boolean isInReadWriteMode() { + return "readWrite".equals(endpoints.modeEndpoint().getMode()); + } } diff --git a/integration/src/integration/java/pl/allegro/tech/hermes/frontend/FrontendConfigurationProperties.java b/integration/src/integration/java/pl/allegro/tech/hermes/frontend/FrontendConfigurationProperties.java index 7ba7fc6919..09004a7358 100644 --- a/integration/src/integration/java/pl/allegro/tech/hermes/frontend/FrontendConfigurationProperties.java +++ b/integration/src/integration/java/pl/allegro/tech/hermes/frontend/FrontendConfigurationProperties.java @@ -9,6 +9,7 @@ public class FrontendConfigurationProperties { public static String FRONTEND_MESSAGE_PREVIEW_ENABLED = "frontend.message.preview.enabled"; public static String FRONTEND_MESSAGE_PREVIEW_LOG_PERSIST_PERIOD = "frontend.message.preview.logPersistPeriod"; public static String FRONTEND_READINESS_CHECK_ENABLED = "frontend.readiness.check.enabled"; + public static String FRONTEND_READINESS_CHECK_KAFKA_CHECK_ENABLED = "frontend.readiness.check.kafkaCheckEnabled"; public static String FRONTEND_READINESS_CHECK_INTERVAL_SECONDS = "frontend.readiness.check.interval"; public static String FRONTEND_AUTHENTICATION_MODE = "frontend.handlers.authentication.mode"; public static String FRONTEND_AUTHENTICATION_ENABLED = "frontend.handlers.authentication.enabled"; diff --git a/integration/src/integration/java/pl/allegro/tech/hermes/integration/ReadinessCheckTest.java b/integration/src/integration/java/pl/allegro/tech/hermes/integration/ReadinessCheckTest.java index 55af1e5864..438f5943d2 100644 --- a/integration/src/integration/java/pl/allegro/tech/hermes/integration/ReadinessCheckTest.java +++ b/integration/src/integration/java/pl/allegro/tech/hermes/integration/ReadinessCheckTest.java @@ -38,6 +38,7 @@ public void shouldNotBeReadyUntilKafkaClusterIsUp() { HermesFrontendInstance hermesFrontend = HermesFrontendInstance.starter() .metadataMaxAgeInSeconds(1) .readinessCheckIntervalInSeconds(1) + .kafkaCheckEnabled() .zookeeperConnectionString(hermesZookeeperOne.getConnectionString()) .kafkaConnectionString(kafkaClusterOne.getBootstrapServersForExternalClients()) .start(); @@ -59,6 +60,29 @@ public void shouldNotBeReadyUntilKafkaClusterIsUp() { hermesFrontend.stop(); } + @Test + public void shouldRespectKafkaCheckEnabledFlag() { + // given + kafkaClusterOne.cutOffConnectionsBetweenBrokersAndClients(); + + // when + HermesFrontendInstance hermesFrontend = HermesFrontendInstance.starter() + .metadataMaxAgeInSeconds(1) + .readinessCheckIntervalInSeconds(1) + .kafkaCheckDisabled() + .zookeeperConnectionString(hermesZookeeperOne.getConnectionString()) + .kafkaConnectionString(kafkaClusterOne.getBootstrapServersForExternalClients()) + .start(); + + // then + assertThat(hermesFrontend.isReady()).isTrue(); + assertThat(hermesFrontend.isHealthy()).isTrue(); + + // cleanup + kafkaClusterOne.restoreConnectionsBetweenBrokersAndClients(); + hermesFrontend.stop(); + } + @Test public void shouldNotBeReadyUntilThereAreNoUnderReplicatedPartitions() throws Exception { // given @@ -71,6 +95,7 @@ public void shouldNotBeReadyUntilThereAreNoUnderReplicatedPartitions() throws Ex HermesFrontendInstance hermesFrontend = HermesFrontendInstance.starter() .metadataMaxAgeInSeconds(1) .readinessCheckIntervalInSeconds(1) + .kafkaCheckEnabled() .zookeeperConnectionString(hermesZookeeperOne.getConnectionString()) .kafkaConnectionString(kafkaClusterOne.getBootstrapServersForExternalClients()) .start(); @@ -117,6 +142,7 @@ public void shouldNotBeReadyUntilThereAreNoOfflinePartitions() throws Exception HermesFrontendInstance hermesFrontend = HermesFrontendInstance.starter() .metadataMaxAgeInSeconds(1) .readinessCheckIntervalInSeconds(1) + .kafkaCheckEnabled() .zookeeperConnectionString(hermesZookeeperOne.getConnectionString()) .kafkaConnectionString(kafkaClusterOne.getBootstrapServersForExternalClients()) .start(); diff --git a/integration/src/integration/java/pl/allegro/tech/hermes/integration/env/HermesIntegrationEnvironment.java b/integration/src/integration/java/pl/allegro/tech/hermes/integration/env/HermesIntegrationEnvironment.java index d72dfe0c5f..2c9eea9efd 100644 --- a/integration/src/integration/java/pl/allegro/tech/hermes/integration/env/HermesIntegrationEnvironment.java +++ b/integration/src/integration/java/pl/allegro/tech/hermes/integration/env/HermesIntegrationEnvironment.java @@ -93,8 +93,10 @@ public void prepareEnvironment(ITestContext context) { .replicationFactor(kafkaClusterOne.getAllBrokers().size()) .uncleanLeaderElectionEnabled(false) .start(); + // Since we don't start a management instance for DC2 we need to create the root path manually. + initializeRootPathInZookeeperTwo(); - zookeeper = startZookeeperClient(); + zookeeper = startZookeeperClient(hermesZookeeperOne.getConnectionString()); ConsumersStarter consumersStarter = new ConsumersStarter(); consumersStarter.overrideProperty(ConsumerConfigurationProperties.KAFKA_AUTHORIZATION_ENABLED, false); @@ -132,15 +134,21 @@ public void prepareEnvironment(ITestContext context) { } } - private CuratorFramework startZookeeperClient() { + private CuratorFramework startZookeeperClient(String connectString) { final CuratorFramework zookeeperClient = CuratorFrameworkFactory.builder() - .connectString(hermesZookeeperOne.getConnectionString()) + .connectString(connectString) .retryPolicy(new ExponentialBackoffRetry(1000, 3)) .build(); zookeeperClient.start(); return zookeeperClient; } + private void initializeRootPathInZookeeperTwo() throws Exception { + try (CuratorFramework curatorFramework = startZookeeperClient(hermesZookeeperTwo.getConnectionString())) { + curatorFramework.create().creatingParentsIfNeeded().forPath("/hermes/groups"); + } + } + @AfterSuite(alwaysRun = true) public void cleanEnvironment() throws Exception { try { diff --git a/integration/src/integration/java/pl/allegro/tech/hermes/integration/management/GroupManagementTest.java b/integration/src/integration/java/pl/allegro/tech/hermes/integration/management/GroupManagementTest.java index abbef0a1bf..d295d4cefd 100644 --- a/integration/src/integration/java/pl/allegro/tech/hermes/integration/management/GroupManagementTest.java +++ b/integration/src/integration/java/pl/allegro/tech/hermes/integration/management/GroupManagementTest.java @@ -59,11 +59,11 @@ public void shouldEmmitAuditEventWhenGroupUpdated() { @Test public void shouldCreateGroup() { // given when - Response response = management.group().create(group("testGroup").build()); + Response response = management.group().create(group("groupToCreate").build()); // then assertThat(response).hasStatus(Response.Status.CREATED); - Assertions.assertThat(management.group().list()).contains("testGroup"); + Assertions.assertThat(management.group().list()).contains("groupToCreate"); } @Test diff --git a/integration/src/integration/java/pl/allegro/tech/hermes/integration/setup/HermesFrontendInstance.java b/integration/src/integration/java/pl/allegro/tech/hermes/integration/setup/HermesFrontendInstance.java index 4f1e31a649..4e93b67adc 100644 --- a/integration/src/integration/java/pl/allegro/tech/hermes/integration/setup/HermesFrontendInstance.java +++ b/integration/src/integration/java/pl/allegro/tech/hermes/integration/setup/HermesFrontendInstance.java @@ -12,6 +12,7 @@ import static pl.allegro.tech.hermes.frontend.FrontendConfigurationProperties.FRONTEND_HTTP2_ENABLED; import static pl.allegro.tech.hermes.frontend.FrontendConfigurationProperties.FRONTEND_READINESS_CHECK_ENABLED; import static pl.allegro.tech.hermes.frontend.FrontendConfigurationProperties.FRONTEND_READINESS_CHECK_INTERVAL_SECONDS; +import static pl.allegro.tech.hermes.frontend.FrontendConfigurationProperties.FRONTEND_READINESS_CHECK_KAFKA_CHECK_ENABLED; import static pl.allegro.tech.hermes.frontend.FrontendConfigurationProperties.METRICS_GRAPHITE_REPORTER_ENABLED; import static pl.allegro.tech.hermes.frontend.FrontendConfigurationProperties.METRICS_ZOOKEEPER_REPORTER_ENABLED; @@ -86,6 +87,16 @@ public Starter kafkaConnectionString(String connectionString) { return this; } + public Starter kafkaCheckEnabled() { + frontend.overrideProperty(FRONTEND_READINESS_CHECK_KAFKA_CHECK_ENABLED, true); + return this; + } + + public Starter kafkaCheckDisabled() { + frontend.overrideProperty(FRONTEND_READINESS_CHECK_KAFKA_CHECK_ENABLED, false); + return this; + } + public HermesFrontendInstance start() { try { frontend.start(); diff --git a/integration/src/integration/java/pl/allegro/tech/hermes/integration/setup/HermesManagementInstance.java b/integration/src/integration/java/pl/allegro/tech/hermes/integration/setup/HermesManagementInstance.java index 1eec4dee30..a63f29c585 100644 --- a/integration/src/integration/java/pl/allegro/tech/hermes/integration/setup/HermesManagementInstance.java +++ b/integration/src/integration/java/pl/allegro/tech/hermes/integration/setup/HermesManagementInstance.java @@ -23,7 +23,9 @@ import static pl.allegro.tech.hermes.test.helper.endpoint.TimeoutAdjuster.adjust; public class HermesManagementInstance { + private static final Logger logger = getLogger(HermesManagementInstance.class); + private final HermesAPIOperations operations; private HermesManagementInstance(HermesAPIOperations operations) { @@ -83,39 +85,16 @@ public Starter addKafkaCluster(String dc, String connectionString) { public HermesManagementInstance start() { try { startManagement(); - List clusters = startSeparateZookeeperClientPerCluster(); - waitUntilStructureInZookeeperIsCreated(clusters); - closeZookeeperClustersConnections(clusters); HermesAPIOperations operations = setupOperations(startZookeeperClient()); + waitUntilManagementIsInReadWriteMode(operations); return new HermesManagementInstance(operations); } catch (Exception e) { throw new RuntimeException(e); } } - private void waitUntilStructureInZookeeperIsCreated(List zookeeperClusters) { - logger.info("Waiting for zookeeper structure to be created"); - try { - waitAtMost(adjust(240), TimeUnit.SECONDS).until(() -> allZookeeperClustersHaveStructureCreated(zookeeperClusters)); - } catch (ConditionTimeoutException ex) { - logger.error("Structure for Zookeeper does not exist in 240 seconds"); - throw ex; - } - } - - private boolean allZookeeperClustersHaveStructureCreated(List zookeeperClusters) throws Exception { - for (CuratorFramework zookeeper : zookeeperClusters) { - if (zookeeper.checkExists().forPath("/hermes/groups") == null) { - logger.info("Structure for Zookeeper does not exist yet"); - return false; - } - } - logger.info("Structure for Zookeepers exists in 240 seconds"); - return true; - } - - private void closeZookeeperClustersConnections(List zookeeperClusters) { - zookeeperClusters.forEach(CuratorFramework::close); + private void waitUntilManagementIsInReadWriteMode(HermesAPIOperations operations) { + waitAtMost(adjust(240), TimeUnit.SECONDS).until(operations::isInReadWriteMode); } private void startManagement() {
# {{$index + 1}} {{item.datacenter}} - - + + +
Cannot obtain readiness status