Skip to content
Permalink

Comparing changes

This is a direct comparison between two commits made in this repository or its related repositories. View the default comparison for this range or learn more about diff comparisons.

Open a pull request

Create a new pull request by comparing changes across two branches. If you need to, you can also . Learn more about diff comparisons here.
base repository: allegro/hermes
Failed to load repositories. Confirm that selected base ref is valid, then try again.
Loading
base: d7e6acb06b3b17b5ed10e1c1c0cb3e91761b7fd3
Choose a base ref
..
head repository: allegro/hermes
Failed to load repositories. Confirm that selected head ref is valid, then try again.
Loading
compare: 061996a6b9dbb8a07d8f5a3016ab9e8e8bd8a088
Choose a head ref
Showing with 147 additions and 82 deletions.
  1. +15 −7 hermes-api/src/main/java/pl/allegro/tech/hermes/api/PublishedMessageTrace.java
  2. +1 −1 hermes-benchmark/src/jmh/java/pl/allegro/tech/hermes/benchmark/environment/HermesServerFactory.java
  3. +6 −2 hermes-frontend/src/main/java/pl/allegro/tech/hermes/frontend/buffer/BackupMessagesLoader.java
  4. +5 −2 hermes-frontend/src/main/java/pl/allegro/tech/hermes/frontend/config/FrontendConfiguration.java
  5. +4 −2 ...rontend/src/main/java/pl/allegro/tech/hermes/frontend/config/FrontendPublishingConfiguration.java
  6. +12 −4 ...nd/src/main/java/pl/allegro/tech/hermes/frontend/publishing/handlers/end/MessageEndProcessor.java
  7. +10 −4 hermes-frontend/src/test/java/pl/allegro/tech/hermes/frontend/buffer/BackupMessagesLoaderTest.java
  8. +1 −0 ...cker-elasticsearch/src/main/java/pl/allegro/tech/hermes/tracker/elasticsearch/LogSchemaAware.java
  9. +2 −0 ...acker-elasticsearch/src/main/java/pl/allegro/tech/hermes/tracker/elasticsearch/SchemaManager.java
  10. +27 −15 ...ava/pl/allegro/tech/hermes/tracker/elasticsearch/frontend/FrontendElasticsearchLogRepository.java
  11. +29 −20 ...pl/allegro/tech/hermes/tracker/elasticsearch/frontend/FrontendElasticsearchLogRepositoryTest.java
  12. +6 −4 .../java/pl/allegro/tech/hermes/tracker/elasticsearch/management/ElasticsearchLogRepositoryTest.java
  13. +7 −4 .../pl/allegro/tech/hermes/tracker/elasticsearch/management/MultiElasticsearchLogRepositoryTest.java
  14. +1 −1 hermes-tracker/src/main/java/pl/allegro/tech/hermes/tracker/frontend/LogRepository.java
  15. +1 −1 ...s-tracker/src/main/java/pl/allegro/tech/hermes/tracker/frontend/NoOperationPublishingTracker.java
  16. +2 −2 hermes-tracker/src/main/java/pl/allegro/tech/hermes/tracker/frontend/PublishingMessageTracker.java
  17. +1 −1 hermes-tracker/src/main/java/pl/allegro/tech/hermes/tracker/frontend/PublishingTracker.java
  18. +17 −12 hermes-tracker/src/test/java/pl/allegro/tech/hermes/tracker/frontend/AbstractLogRepositoryTest.java
Original file line number Diff line number Diff line change
@@ -18,16 +18,18 @@ public class PublishedMessageTrace implements MessageTrace {
private final String message;
private final String cluster;
private final String extraRequestHeaders;
private final String storageDatacenter;

@JsonCreator
public PublishedMessageTrace(@JsonProperty("messageId") String messageId,
@JsonProperty("timestamp") Long timestamp,
@JsonProperty("topicName") String topicName,
@JsonProperty("status") PublishedMessageTraceStatus status,
@JsonProperty("reason") String reason,
@JsonProperty("message") String message,
@JsonProperty("cluster") String cluster,
@JsonProperty("extraRequestHeaders") String extraRequestHeaders) {
@JsonProperty("timestamp") Long timestamp,
@JsonProperty("topicName") String topicName,
@JsonProperty("status") PublishedMessageTraceStatus status,
@JsonProperty("reason") String reason,
@JsonProperty("message") String message,
@JsonProperty("cluster") String cluster,
@JsonProperty("extraRequestHeaders") String extraRequestHeaders,
@JsonProperty("storageDc") String storageDatacenter) {
this.messageId = messageId;
this.timestamp = timestamp;
this.status = status;
@@ -36,6 +38,7 @@ public PublishedMessageTrace(@JsonProperty("messageId") String messageId,
this.message = message;
this.cluster = cluster;
this.extraRequestHeaders = extraRequestHeaders;
this.storageDatacenter = storageDatacenter;
}

public String getMessageId() {
@@ -76,6 +79,11 @@ public String getExtraRequestHeaders() {
return extraRequestHeaders;
}

@JsonProperty("storageDc")
public String getStorageDatacenter() {
return storageDatacenter;
}

@Override
public int hashCode() {
return Objects.hash(messageId);
Original file line number Diff line number Diff line change
@@ -95,7 +95,7 @@ private static HttpHandler provideHttpHandler(ThroughputLimiter throughputLimite
return new HandlersChainFactory(
topicsCache,
new MessageErrorProcessor(new ObjectMapper(), trackers, trackingHeadersExtractor),
new MessageEndProcessor(trackers, new BrokerListeners(), trackingHeadersExtractor),
new MessageEndProcessor(trackers, new BrokerListeners(), trackingHeadersExtractor, "dc"),
new MessageFactory(
new MessageValidators(Collections.emptyList()),
new MessageContentTypeEnforcer(),
Original file line number Diff line number Diff line change
@@ -54,6 +54,7 @@ public class BackupMessagesLoader {
private final int maxResendRetries;
private final Duration resendSleep;
private final Duration readTopicInfoSleep;
private final String datacenter;

private final Set<Topic> topicsAvailabilityCache = new HashSet<>();
private final AtomicReference<ConcurrentLinkedQueue<Pair<Message, CachedTopic>>> toResend = new AtomicReference<>();
@@ -64,7 +65,9 @@ public BackupMessagesLoader(BrokerMessageProducer brokerMessageProducer,
SchemaRepository schemaRepository,
SchemaExistenceEnsurer schemaExistenceEnsurer,
Trackers trackers,
BackupMessagesLoaderParameters backupMessagesLoaderParameters) {
BackupMessagesLoaderParameters backupMessagesLoaderParameters,
String datacenter
) {
this.brokerMessageProducer = brokerMessageProducer;
this.brokerListeners = brokerListeners;
this.topicsCache = topicsCache;
@@ -75,6 +78,7 @@ public BackupMessagesLoader(BrokerMessageProducer brokerMessageProducer,
this.resendSleep = backupMessagesLoaderParameters.getLoadingPauseBetweenResend();
this.readTopicInfoSleep = backupMessagesLoaderParameters.getLoadingWaitForBrokerTopicInfo();
this.maxResendRetries = backupMessagesLoaderParameters.getMaxResendRetries();
this.datacenter = datacenter;
}

public void loadMessages(List<BackupMessage> messages) {
@@ -261,7 +265,7 @@ public void onPublished(Message message, Topic topic) {
brokerTimer.close();
cachedTopic.incrementPublished();
brokerListeners.onAcknowledge(message, topic);
trackers.get(topic).logPublished(message.getId(), topic.getName(), "", Collections.emptyMap());
trackers.get(topic).logPublished(message.getId(), topic.getName(), "", datacenter, Collections.emptyMap());
}
});
}
Original file line number Diff line number Diff line change
@@ -18,6 +18,7 @@
import pl.allegro.tech.hermes.frontend.producer.BrokerMessageProducer;
import pl.allegro.tech.hermes.frontend.validator.MessageValidators;
import pl.allegro.tech.hermes.frontend.validator.TopicMessageValidator;
import pl.allegro.tech.hermes.infrastructure.dc.DatacenterNameProvider;
import pl.allegro.tech.hermes.infrastructure.zookeeper.ZookeeperPaths;
import pl.allegro.tech.hermes.schema.SchemaExistenceEnsurer;
import pl.allegro.tech.hermes.schema.SchemaRepository;
@@ -53,9 +54,11 @@ public BackupMessagesLoader backupMessagesLoader(BrokerMessageProducer brokerMes
TopicsCache topicsCache,
SchemaRepository schemaRepository,
Trackers trackers,
LocalMessageStorageProperties localMessageStorageProperties) {
LocalMessageStorageProperties localMessageStorageProperties,
DatacenterNameProvider datacenterNameProvider
) {
return new BackupMessagesLoader(brokerMessageProducer, brokerListeners, topicsCache, schemaRepository,
new SchemaExistenceEnsurer(schemaRepository), trackers, localMessageStorageProperties);
new SchemaExistenceEnsurer(schemaRepository), trackers, localMessageStorageProperties, datacenterNameProvider.getDatacenterName());
}

@Bean(initMethod = "extend")
Original file line number Diff line number Diff line change
@@ -29,6 +29,7 @@
import pl.allegro.tech.hermes.frontend.publishing.preview.MessagePreviewLog;
import pl.allegro.tech.hermes.frontend.server.auth.AuthenticationConfiguration;
import pl.allegro.tech.hermes.frontend.validator.MessageValidators;
import pl.allegro.tech.hermes.infrastructure.dc.DatacenterNameProvider;
import pl.allegro.tech.hermes.schema.SchemaRepository;
import pl.allegro.tech.hermes.tracker.frontend.Trackers;

@@ -63,8 +64,9 @@ public ThroughputLimiter throughputLimiter(ThroughputProperties throughputProper

@Bean
public MessageEndProcessor messageEndProcessor(Trackers trackers, BrokerListeners brokerListeners,
TrackingHeadersExtractor trackingHeadersExtractor) {
return new MessageEndProcessor(trackers, brokerListeners, trackingHeadersExtractor);
TrackingHeadersExtractor trackingHeadersExtractor,
DatacenterNameProvider datacenterNameProvider) {
return new MessageEndProcessor(trackers, brokerListeners, trackingHeadersExtractor, datacenterNameProvider.getDatacenterName());
}

@Bean
Original file line number Diff line number Diff line change
@@ -23,24 +23,32 @@ public class MessageEndProcessor {
private final Trackers trackers;
private final BrokerListeners brokerListeners;
private final TrackingHeadersExtractor trackingHeadersExtractor;
private final String datacenter;

public MessageEndProcessor(Trackers trackers, BrokerListeners brokerListeners, TrackingHeadersExtractor trackingHeadersExtractor) {
public MessageEndProcessor(Trackers trackers, BrokerListeners brokerListeners, TrackingHeadersExtractor trackingHeadersExtractor, String datacenter) {
this.trackers = trackers;
this.brokerListeners = brokerListeners;
this.trackingHeadersExtractor = trackingHeadersExtractor;
this.datacenter = datacenter;
}

public void sent(HttpServerExchange exchange, AttachmentContent attachment) {
trackers.get(attachment.getTopic()).logPublished(attachment.getMessageId(),
attachment.getTopic().getName(), readHostAndPort(exchange),
attachment.getTopic().getName(),
readHostAndPort(exchange),
datacenter,
trackingHeadersExtractor.extractHeadersToLog(exchange.getRequestHeaders()));
sendResponse(exchange, attachment, StatusCodes.CREATED);
attachment.getCachedTopic().incrementPublished();
}

public void delayedSent(HttpServerExchange exchange, CachedTopic cachedTopic, Message message) {
trackers.get(cachedTopic.getTopic()).logPublished(message.getId(), cachedTopic.getTopic().getName(),
readHostAndPort(exchange), trackingHeadersExtractor.extractHeadersToLog(exchange.getRequestHeaders()));
trackers.get(cachedTopic.getTopic()).logPublished(
message.getId(),
cachedTopic.getTopic().getName(),
readHostAndPort(exchange),
datacenter,
trackingHeadersExtractor.extractHeadersToLog(exchange.getRequestHeaders()));
brokerListeners.onAcknowledge(message, cachedTopic.getTopic());
cachedTopic.incrementPublished();
}
Original file line number Diff line number Diff line change
@@ -66,6 +66,8 @@ public class BackupMessagesLoaderTest {

private final Topic topic = TopicBuilder.topic("pl.allegro.tech.hermes.test").build();

private final String datacenter = "dc1";

@Before
public void setUp() {
tempDir = Files.createTempDir();
@@ -104,7 +106,8 @@ public void shouldNotSendOldMessages() {
schemaRepository,
schemaExistenceEnsurer,
trackers,
localMessageStorageProperties
localMessageStorageProperties,
datacenter
);

messageRepository.save(messageOfAge(1), topic);
@@ -147,7 +150,8 @@ public void shouldSendAndResendMessages() {
schemaRepository,
schemaExistenceEnsurer,
trackers,
localMessageStorageProperties
localMessageStorageProperties,
datacenter
);

messageRepository.save(messageOfAge(1), topic);
@@ -176,7 +180,8 @@ public void shouldSendOnlyWhenBrokerTopicIsAvailable() {
schemaRepository,
schemaExistenceEnsurer,
trackers,
localMessageStorageProperties
localMessageStorageProperties,
datacenter
);
MessageRepository messageRepository = new ChronicleMapMessageRepository(
new File(tempDir.getAbsoluteFile(), "messages.dat"),
@@ -214,7 +219,8 @@ public void shouldSendMessageWithAllArgumentsFromBackupMessage() {
schemaRepository,
schemaExistenceEnsurer,
trackers,
localMessageStorageProperties
localMessageStorageProperties,
datacenter
);

messageRepository.save(messageOfAge(1), topic);
Original file line number Diff line number Diff line change
@@ -17,5 +17,6 @@ public interface LogSchemaAware {
String SOURCE_HOSTNAME = "hostname";
String REMOTE_HOSTNAME = "remote_hostname";
String EXTRA_REQUEST_HEADERS = "extra_request_headers";
String STORAGE_DATACENTER = "storageDc";

}
Original file line number Diff line number Diff line change
@@ -27,6 +27,7 @@
import static pl.allegro.tech.hermes.tracker.elasticsearch.LogSchemaAware.REMOTE_HOSTNAME;
import static pl.allegro.tech.hermes.tracker.elasticsearch.LogSchemaAware.SOURCE_HOSTNAME;
import static pl.allegro.tech.hermes.tracker.elasticsearch.LogSchemaAware.STATUS;
import static pl.allegro.tech.hermes.tracker.elasticsearch.LogSchemaAware.STORAGE_DATACENTER;
import static pl.allegro.tech.hermes.tracker.elasticsearch.LogSchemaAware.SUBSCRIPTION;
import static pl.allegro.tech.hermes.tracker.elasticsearch.LogSchemaAware.TIMESTAMP;
import static pl.allegro.tech.hermes.tracker.elasticsearch.LogSchemaAware.TIMESTAMP_SECONDS;
@@ -142,6 +143,7 @@ private XContentBuilder prepareMapping(String indexType, Function<XContentBuilde
.startObject(SOURCE_HOSTNAME).field("type", "keyword").field("norms", false).endObject()
.startObject(REMOTE_HOSTNAME).field("type", "keyword").field("norms", false).endObject()
.startObject(REASON).field("type", "text").field("norms", false).endObject()
.startObject(STORAGE_DATACENTER).field("type", "text").field("norms", false).endObject()
.startObject(EXTRA_REQUEST_HEADERS).field("type", "text").field("norms", false).endObject();

return additionalMapping.apply(jsonBuilder).endObject().endObject().endObject();
Original file line number Diff line number Diff line change
@@ -3,7 +3,6 @@
import org.elasticsearch.client.Client;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.xcontent.XContentBuilder;
import pl.allegro.tech.hermes.api.PublishedMessageTraceStatus;
import pl.allegro.tech.hermes.common.metric.MetricsFacade;
import pl.allegro.tech.hermes.common.metric.TrackerElasticSearchMetrics;
import pl.allegro.tech.hermes.tracker.BatchingLogRepository;
@@ -51,8 +50,9 @@ public void logPublished(String messageId,
long timestamp,
String topicName,
String hostname,
String storageDatacenter,
Map<String, String> extraRequestHeaders) {
queue.offer(build(() -> document(messageId, timestamp, topicName, SUCCESS, hostname, extraRequestHeaders)));
queue.offer(build(() -> success(messageId, timestamp, topicName, hostname, storageDatacenter, extraRequestHeaders)));
}

@Override
@@ -62,7 +62,7 @@ public void logError(String messageId,
String reason,
String hostname,
Map<String, String> extraRequestHeaders) {
queue.offer(build(() -> document(messageId, timestamp, topicName, ERROR, reason, hostname, extraRequestHeaders)));
queue.offer(build(() -> error(messageId, timestamp, topicName, reason, hostname, extraRequestHeaders)));
}

@Override
@@ -71,37 +71,49 @@ public void logInflight(String messageId,
String topicName,
String hostname,
Map<String, String> extraRequestHeaders) {
queue.offer(build(() -> document(messageId, timestamp, topicName, INFLIGHT, hostname, extraRequestHeaders)));
queue.offer(build(() -> inflight(messageId, timestamp, topicName, hostname, extraRequestHeaders)));
}

@Override
public void close() {
this.elasticClient.close();
}

private XContentBuilder document(String messageId,
long timestamp,
String topicName,
PublishedMessageTraceStatus status,
String hostname,
Map<String, String> extraRequestHeaders)
private XContentBuilder success(String messageId,
long timestamp,
String topicName,
String hostname,
String storageDatacenter,
Map<String, String> extraRequestHeaders)
throws IOException {
return notEndedDocument(messageId, timestamp, topicName, status.toString(), hostname, extraRequestHeaders).endObject();
return notEndedDocument(messageId, timestamp, topicName, SUCCESS.toString(), hostname, extraRequestHeaders)
.field(STORAGE_DATACENTER, storageDatacenter)
.endObject();
}

private XContentBuilder document(String messageId,

private XContentBuilder inflight(String messageId,
long timestamp,
String topicName,
PublishedMessageTraceStatus status,
String reason,
String hostname,
Map<String, String> extraRequestHeaders)
throws IOException {
return notEndedDocument(messageId, timestamp, topicName, status.toString(), hostname, extraRequestHeaders)
return notEndedDocument(messageId, timestamp, topicName, INFLIGHT.name(), hostname, extraRequestHeaders).endObject();
}

private XContentBuilder error(String messageId,
long timestamp,
String topicName,
String reason,
String hostname,
Map<String, String> extraRequestHeaders)
throws IOException {
return notEndedDocument(messageId, timestamp, topicName, ERROR.toString(), hostname, extraRequestHeaders)
.field(REASON, reason)
.endObject();
}


protected XContentBuilder notEndedDocument(String messageId,
long timestamp,
String topicName,
Loading