Skip to content
Permalink

Comparing changes

This is a direct comparison between two commits made in this repository or its related repositories. View the default comparison for this range or learn more about diff comparisons.

Open a pull request

Create a new pull request by comparing changes across two branches. If you need to, you can also . Learn more about diff comparisons here.
base repository: allegro/hermes
Failed to load repositories. Confirm that selected base ref is valid, then try again.
Loading
base: 3e78e3a8d8a392db4f2de0d6748af58e4833c8f9
Choose a base ref
..
head repository: allegro/hermes
Failed to load repositories. Confirm that selected head ref is valid, then try again.
Loading
compare: 3ef19c8e7fc6402fa422c556895a8ec287d78345
Choose a head ref
Showing with 539 additions and 112 deletions.
  1. +1 −1 build.gradle
  2. +6 −3 hermes-api/src/main/java/pl/allegro/tech/hermes/api/RetentionTime.java
  3. +0 −3 hermes-api/src/main/java/pl/allegro/tech/hermes/api/SubscriptionPolicy.java
  4. +8 −4 hermes-api/src/main/java/pl/allegro/tech/hermes/api/Topic.java
  5. +3 −1 hermes-api/src/main/java/pl/allegro/tech/hermes/api/TopicWithSchema.java
  6. +34 −3 hermes-api/src/test/java/pl/allegro/tech/hermes/api/TopicTest.java
  7. +6 −3 hermes-common/src/main/java/pl/allegro/tech/hermes/common/di/factories/ObjectMapperFactory.java
  8. +1 −1 hermes-common/src/test/groovy/pl/allegro/tech/hermes/test/IntegrationTest.groovy
  9. +1 −1 hermes-common/src/test/java/pl/allegro/tech/hermes/common/di/ObjectMapperFactoryTest.java
  10. +1 −1 hermes-console/src/views/search/topic-search-results/TopicSearchResults.vue
  11. +5 −1 hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/config/CommonConfiguration.java
  12. +4 −0 hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/Message.java
  13. +27 −5 .../main/java/pl/allegro/tech/hermes/consumers/consumer/sender/http/JettyBroadCastMessageSender.java
  14. +44 −1 ...oovy/pl/allegro/tech/hermes/consumers/consumer/sender/http/JettyBroadCastMessageSenderTest.groovy
  15. +4 −3 hermes-frontend/src/main/java/pl/allegro/tech/hermes/frontend/config/CommonConfiguration.java
  16. +16 −0 hermes-frontend/src/main/java/pl/allegro/tech/hermes/frontend/config/TopicDefaultsProperties.java
  17. +1 −1 hermes-frontend/src/main/java/pl/allegro/tech/hermes/frontend/producer/kafka/KafkaMessageSender.java
  18. +2 −1 ...es-management/src/main/java/pl/allegro/tech/hermes/management/config/ManagementConfiguration.java
  19. +10 −0 hermes-management/src/main/java/pl/allegro/tech/hermes/management/config/TopicProperties.java
  20. +38 −0 ...n/java/pl/allegro/tech/hermes/management/domain/subscription/validator/SubscriptionValidator.java
  21. +0 −1 hermes-management/src/main/java/pl/allegro/tech/hermes/management/domain/topic/TopicService.java
  22. +58 −5 ...gement/src/main/java/pl/allegro/tech/hermes/management/domain/topic/validator/TopicValidator.java
  23. +96 −3 .../pl/allegro/tech/hermes/management/domain/subscription/validator/InflightSizeValidatorTest.groovy
  24. +2 −1 ...rc/test/groovy/pl/allegro/tech/hermes/management/domain/topic/validator/TopicValidatorTest.groovy
  25. +158 −19 ...o/tech/hermes/management/domain/topic/validator/TopicValidatorWithRealApiPreconditionsTest.groovy
  26. +0 −40 ...ationTest/java/pl/allegro/tech/hermes/integrationtests/management/SubscriptionManagementTest.java
  27. +13 −10 .../integrationTest/java/pl/allegro/tech/hermes/integrationtests/management/TopicManagementTest.java
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
@@ -48,7 +48,7 @@ allprojects {
targetCompatibility = JavaVersion.VERSION_17

project.ext.versions = [
kafka : '3.6.2',
kafka : '2.8.2',
guava : '33.1.0-jre',
jackson : '2.17.0',
jersey : '3.1.6',
Original file line number Diff line number Diff line change
@@ -2,18 +2,21 @@

import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import jakarta.validation.constraints.Max;
import jakarta.validation.constraints.Min;
import pl.allegro.tech.hermes.api.constraints.AdminPermitted;

import java.util.EnumSet;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.TimeUnit;

public class RetentionTime {
private static final TimeUnit DEFAULT_UNIT = TimeUnit.DAYS;

public static RetentionTime MAX = new RetentionTime(7, TimeUnit.DAYS);
public static Set<TimeUnit> allowedUnits = EnumSet.of(TimeUnit.SECONDS, TimeUnit.MINUTES, TimeUnit.HOURS, TimeUnit.DAYS);


@Min(0)
@Max(value = 7, groups = AdminPermitted.class)
private final int duration;

private final TimeUnit retentionUnit;
Original file line number Diff line number Diff line change
@@ -5,8 +5,6 @@
import jakarta.annotation.Nullable;
import jakarta.validation.constraints.Max;
import jakarta.validation.constraints.Min;
import jakarta.validation.constraints.Null;
import pl.allegro.tech.hermes.api.constraints.AdminPermitted;
import pl.allegro.tech.hermes.api.helpers.Patch;

import java.util.Map;
@@ -42,7 +40,6 @@ public class SubscriptionPolicy {
private int socketTimeout = DEFAULT_SOCKET_TIMEOUT;

@Min(1)
@Null(groups = AdminPermitted.class)
private Integer inflightSize;

@Min(0)
12 changes: 8 additions & 4 deletions hermes-api/src/main/java/pl/allegro/tech/hermes/api/Topic.java
Original file line number Diff line number Diff line change
@@ -37,7 +37,8 @@ public class Topic {
private boolean jsonToAvroDryRunEnabled = false;
@NotNull
private Ack ack;
private boolean fallbackToRemoteDatacenterEnabled;
public static final String DEFAULT_FALLBACK_TO_REMOTE_DATACENTER_KEY = "defaultFallbackToRemoteDatacenterEnabled";
private final boolean fallbackToRemoteDatacenterEnabled;
private PublishingChaosPolicy chaos;
@NotNull
private ContentType contentType;
@@ -58,8 +59,10 @@ public class Topic {
private Instant modifiedAt;

public Topic(TopicName name, String description, OwnerId owner, RetentionTime retentionTime,
boolean migratedFromJsonType, Ack ack, boolean fallbackToRemoteDatacenterEnabled, PublishingChaosPolicy chaos,
boolean trackingEnabled, ContentType contentType, boolean jsonToAvroDryRunEnabled,
boolean migratedFromJsonType, Ack ack,
@JacksonInject(value = DEFAULT_FALLBACK_TO_REMOTE_DATACENTER_KEY, useInput = OptBoolean.TRUE)
Boolean fallbackToRemoteDatacenterEnabled,
PublishingChaosPolicy chaos, boolean trackingEnabled, ContentType contentType, boolean jsonToAvroDryRunEnabled,
@JacksonInject(value = DEFAULT_SCHEMA_ID_SERIALIZATION_ENABLED_KEY, useInput = OptBoolean.TRUE)
Boolean schemaIdAwareSerializationEnabled,
int maxMessageSize, PublishingAuth publishingAuth, boolean subscribingRestricted,
@@ -93,7 +96,8 @@ public Topic(
@JsonProperty("retentionTime") RetentionTime retentionTime,
@JsonProperty("jsonToAvroDryRun") boolean jsonToAvroDryRunEnabled,
@JsonProperty("ack") Ack ack,
@JsonProperty("fallbackToRemoteDatacenterEnabled") boolean fallbackToRemoteDatacenterEnabled,
@JacksonInject(value = DEFAULT_FALLBACK_TO_REMOTE_DATACENTER_KEY, useInput = OptBoolean.TRUE)
@JsonProperty("fallbackToRemoteDatacenterEnabled") Boolean fallbackToRemoteDatacenterEnabled,
@JsonProperty("chaos") PublishingChaosPolicy chaos,
@JsonProperty("trackingEnabled") boolean trackingEnabled,
@JsonProperty("migratedFromJsonType") boolean migratedFromJsonType,
Original file line number Diff line number Diff line change
@@ -32,7 +32,9 @@ public TopicWithSchema(@JsonProperty("schema") String schema,
@JsonProperty("retentionTime") RetentionTime retentionTime,
@JsonProperty("jsonToAvroDryRun") boolean jsonToAvroDryRunEnabled,
@JsonProperty("ack") Ack ack,
@JsonProperty("fallbackToRemoteDatacenterEnabled") boolean fallbackToRemoteDatacenterEnabled,
@JsonProperty("fallbackToRemoteDatacenterEnabled")
@JacksonInject(value = DEFAULT_FALLBACK_TO_REMOTE_DATACENTER_KEY, useInput = OptBoolean.TRUE)
boolean fallbackToRemoteDatacenterEnabled,
@JsonProperty("chaos") PublishingChaosPolicy chaos,
@JsonProperty("trackingEnabled") boolean trackingEnabled,
@JsonProperty("migratedFromJsonType") boolean migratedFromJsonType,
37 changes: 34 additions & 3 deletions hermes-api/src/test/java/pl/allegro/tech/hermes/api/TopicTest.java
Original file line number Diff line number Diff line change
@@ -9,7 +9,7 @@

public class TopicTest {

private final ObjectMapper objectMapper = createObjectMapper();
private final ObjectMapper objectMapper = createObjectMapper(false);

@Test
public void shouldDeserializeTopicWithDefaults() throws Exception {
@@ -65,11 +65,42 @@ public void shouldSkippedDeserializedOldSchemaVersionId() throws Exception {
assertThat(topic.getName().getName()).isEqualTo("bar");
}

private ObjectMapper createObjectMapper() {
@Test
public void shouldDeserializeFallbackToRemoteDatacenterWithDefaults() throws Exception {
// given
String json = "{\"name\":\"foo.bar\", \"description\": \"description\"}";

// when
Topic topic = objectMapper.readValue(json, Topic.class);

// then
assertThat(topic.isFallbackToRemoteDatacenterEnabled()).isEqualTo(false);

// and when
Topic topic2 = createObjectMapper(true).readValue(json, Topic.class);

// then
assertThat(topic2.isFallbackToRemoteDatacenterEnabled()).isEqualTo(true);
}

@Test
public void shouldDeserializeFallbackToRemoteDatacenter() throws Exception {
// given
String json = "{\"name\":\"foo.bar\", \"description\": \"description\", \"fallbackToRemoteDatacenterEnabled\": true}";

// when
Topic topic = objectMapper.readValue(json, Topic.class);

// then
assertThat(topic.isFallbackToRemoteDatacenterEnabled()).isEqualTo(true);
}

private ObjectMapper createObjectMapper(boolean fallbackToRemoteDatacenterEnabled) {
ObjectMapper mapper = new ObjectMapper();

final InjectableValues defaultSchemaIdAwareSerializationEnabled = new InjectableValues
.Std().addValue(Topic.DEFAULT_SCHEMA_ID_SERIALIZATION_ENABLED_KEY, true);
.Std().addValue(Topic.DEFAULT_SCHEMA_ID_SERIALIZATION_ENABLED_KEY, true)
.addValue(Topic.DEFAULT_FALLBACK_TO_REMOTE_DATACENTER_KEY, fallbackToRemoteDatacenterEnabled);

mapper.setInjectableValues(defaultSchemaIdAwareSerializationEnabled);
mapper.disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES);
Original file line number Diff line number Diff line change
@@ -11,9 +11,11 @@
public class ObjectMapperFactory {

private final boolean schemaIdSerializationEnabled;
private final boolean fallbackToRemoteDatacenterEnabled;

public ObjectMapperFactory(boolean schemaIdSerializationEnabled) {
public ObjectMapperFactory(boolean schemaIdSerializationEnabled, boolean fallbackToRemoteDatacenterEnabled) {
this.schemaIdSerializationEnabled = schemaIdSerializationEnabled;
this.fallbackToRemoteDatacenterEnabled = fallbackToRemoteDatacenterEnabled;
}

public ObjectMapper provide() {
@@ -23,8 +25,9 @@ public ObjectMapper provide() {
objectMapper.disable(SerializationFeature.WRITE_NULL_MAP_VALUES);
objectMapper.registerModule(new JavaTimeModule());

final InjectableValues defaultSchemaIdAwareSerializationEnabled = new InjectableValues
.Std().addValue(Topic.DEFAULT_SCHEMA_ID_SERIALIZATION_ENABLED_KEY, schemaIdSerializationEnabled);
final InjectableValues defaultSchemaIdAwareSerializationEnabled = new InjectableValues.Std()
.addValue(Topic.DEFAULT_SCHEMA_ID_SERIALIZATION_ENABLED_KEY, schemaIdSerializationEnabled)
.addValue(Topic.DEFAULT_FALLBACK_TO_REMOTE_DATACENTER_KEY, fallbackToRemoteDatacenterEnabled);
objectMapper.setInjectableValues(defaultSchemaIdAwareSerializationEnabled);

return objectMapper;
Original file line number Diff line number Diff line change
@@ -31,7 +31,7 @@ abstract class IntegrationTest extends Specification {

protected RepositoryWaiter wait = new RepositoryWaiter(zookeeperResource.curator(), paths)

protected ObjectMapper mapper = new ObjectMapperFactory(true).provide()
protected ObjectMapper mapper = new ObjectMapperFactory(true, false).provide()

protected ZookeeperGroupRepository groupRepository = new ZookeeperGroupRepository(zookeeper(), mapper, paths)

Original file line number Diff line number Diff line change
@@ -14,7 +14,7 @@ public class ObjectMapperFactoryTest {

@Before
public void init() {
ObjectMapperFactory factory = new ObjectMapperFactory(false);
ObjectMapperFactory factory = new ObjectMapperFactory(false, false);
mapper = factory.provide();
}

Original file line number Diff line number Diff line change
@@ -15,7 +15,7 @@
}
function onTopicBlankClick(topic: Topic) {
const group = groupName(topic.name);
window.open(`/ui/groups/${group}//topics/${topic.name}`, '_blank');
window.open(`/ui/groups/${group}/topics/${topic.name}`, '_blank');
}
</script>

Original file line number Diff line number Diff line change
@@ -176,7 +176,11 @@ public ZookeeperAdminCache zookeeperAdminCache(ZookeeperPaths zookeeperPaths,

@Bean
public ObjectMapper objectMapper(SchemaProperties schemaProperties) {
return new ObjectMapperFactory(schemaProperties.isIdSerializationEnabled()).provide();
return new ObjectMapperFactory(
schemaProperties.isIdSerializationEnabled(),
/* fallbackToRemoteDatacenter is frontend specific property, we so don't expose consumer side property for it */
false
).provide();
}


Original file line number Diff line number Diff line change
@@ -138,6 +138,10 @@ public String getId() {
return id;
}

public synchronized Set<String> getSucceededUris() {
return succeededUris;
}

@Override
public Map<String, String> getExternalMetadata() {
return externalMetadata;
Original file line number Diff line number Diff line change
@@ -17,8 +17,12 @@
import pl.allegro.tech.hermes.consumers.consumer.sender.resolver.ResolvableEndpointAddress;

import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import java.util.stream.Collectors;
@@ -58,17 +62,35 @@ public CompletableFuture<MessageSendingResult> send(Message message) {

private CompletableFuture<List<SingleMessageSendingResult>> sendMessage(Message message) {
try {
List<CompletableFuture<SingleMessageSendingResult>> results = collectResults(message);
Set<CompletableFuture<SingleMessageSendingResult>> results = collectResults(message);
return mergeResults(results);
} catch (EndpointAddressResolutionException exception) {
return CompletableFuture.completedFuture(Collections.singletonList(exceptionMapper.apply(exception)));
}
}

private List<CompletableFuture<SingleMessageSendingResult>> collectResults(
private Set<CompletableFuture<SingleMessageSendingResult>> collectResults(
Message message
) throws EndpointAddressResolutionException {
var currentResults = sendPendingMessages(message);
var results = new HashSet<>(currentResults);

// add previously succeeded uris to the result set so that successful uris from all attempts are retained.
// this way a MessageSendingResult can be considered successful even when the last send attempt
// did not send to any uri, e.g. because all uris returned by endpoint resolver were already sent to in the past.
for (String succeededUri : message.getSucceededUris()) {
try {
var uri = new URI(succeededUri);
var result = MessageSendingResult.succeededResult(uri);
results.add(CompletableFuture.completedFuture(result));
} catch (URISyntaxException exception) {
logger.error("Error while parsing already sent broadcast URI {}", succeededUri, exception);
}
}
return results;
}

private Set<CompletableFuture<SingleMessageSendingResult>> sendPendingMessages(Message message) throws EndpointAddressResolutionException {
final HttpRequestData requestData = new HttpRequestDataBuilder()
.withRawAddress(endpoint.getRawAddress())
.build();
@@ -80,16 +102,16 @@ private List<CompletableFuture<SingleMessageSendingResult>> collectResults(

if (resolvedUris.isEmpty()) {
logger.debug("Empty resolved URIs for message: {}", message.getId());
return Collections.emptyList();
return Collections.emptySet();
} else {
return resolvedUris.stream()
.map(uri -> requestFactory.buildRequest(message, uri, headers))
.map(this::processResponse)
.collect(Collectors.toList());
.collect(Collectors.toSet());
}
}

private CompletableFuture<List<SingleMessageSendingResult>> mergeResults(List<CompletableFuture<SingleMessageSendingResult>> results) {
private CompletableFuture<List<SingleMessageSendingResult>> mergeResults(Set<CompletableFuture<SingleMessageSendingResult>> results) {
return CompletableFuture.allOf(results.toArray(new CompletableFuture[results.size()]))
.thenApply(v -> results.stream()
.map(CompletableFuture::join)
Original file line number Diff line number Diff line change
@@ -150,7 +150,7 @@ class JettyBroadCastMessageSenderTest extends Specification {
future.get(1, TimeUnit.SECONDS).succeeded()
}

def "should return not succeeded and retry later when endpoint resolver return no hosts"() {
def "should return not succeeded and retry later when endpoint resolver return no hosts and no message was sent previously"() {
given:
def address = Stub(ResolvableEndpointAddress) {
resolveAllFor(_ as Message) >> []
@@ -173,6 +173,49 @@ class JettyBroadCastMessageSenderTest extends Specification {
messageSendingResult.isRetryLater()
}

def "should return succeeded when endpoint resolver return no hosts and but message was sent previously"() {
given:
Message message = testMessage()
message.incrementRetryCounter([serviceEndpoints[0].url])
def address = Stub(ResolvableEndpointAddress) {
resolveAllFor(_ as Message) >> []

getRawAddress() >> endpoint
}

def httpRequestFactory = new DefaultHttpRequestFactory(client, 1000, 1000, new DefaultHttpMetadataAppender())
MessageSender messageSender = new JettyBroadCastMessageSender(httpRequestFactory, address,
requestHeadersProvider, resultHandlersProvider, Mock(ResilientMessageSender))

when:
def future = messageSender.send(message)

then:
MessageSendingResult messageSendingResult = future.get(1, TimeUnit.SECONDS)

messageSendingResult.succeeded()
}


def "should return succeeded when endpoint resolver returns the same urls that the message was already sent to"() {
given: "a message that was sent"
ConsumerRateLimiter rateLimiter = Mock(ConsumerRateLimiter) {
0 * registerSuccessfulSending()
}

serviceEndpoints.forEach { endpoint -> endpoint.expectMessages(TEST_MESSAGE_CONTENT) }

Message message = testMessage()
message.incrementRetryCounter(serviceEndpoints.collect { it.url })

when:
def future = getSender(rateLimiter).send(message)

then:
MessageSendingResult messageSendingResult = future.get(1, TimeUnit.SECONDS)
messageSendingResult.succeeded()
}

def cleanupSpec() {
wireMockServers.forEach { it.stop() }
client.stop()
Original file line number Diff line number Diff line change
@@ -83,7 +83,8 @@
ZookeeperClustersProperties.class,
KafkaClustersProperties.class,
ContentRootProperties.class,
DatacenterNameProperties.class
DatacenterNameProperties.class,
TopicDefaultsProperties.class
})
public class CommonConfiguration {

@@ -183,8 +184,8 @@ public ZookeeperAdminCache zookeeperAdminCache(ZookeeperPaths zookeeperPaths,
}

@Bean
public ObjectMapper objectMapper(SchemaProperties schemaProperties) {
return new ObjectMapperFactory(schemaProperties.isIdSerializationEnabled()).provide();
public ObjectMapper objectMapper(SchemaProperties schemaProperties, TopicDefaultsProperties topicDefaults) {
return new ObjectMapperFactory(schemaProperties.isIdSerializationEnabled(), topicDefaults.isFallbackToRemoteDatacenterEnabled()).provide();
}

@Bean
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package pl.allegro.tech.hermes.frontend.config;

import org.springframework.boot.context.properties.ConfigurationProperties;

@ConfigurationProperties(prefix = "frontend.topic.defaults")
public class TopicDefaultsProperties {
private boolean fallbackToRemoteDatacenterEnabled = false;

public boolean isFallbackToRemoteDatacenterEnabled() {
return fallbackToRemoteDatacenterEnabled;
}

public void setFallbackToRemoteDatacenterEnabled(boolean fallbackToRemoteDatacenterEnabled) {
this.fallbackToRemoteDatacenterEnabled = fallbackToRemoteDatacenterEnabled;
}
}
Loading