Conversation
Reviewer's Guide by SourceryThis pull request introduces OpenTelemetry instrumentation to the post-event system. It adds metrics for message brokers and tracing spans to track message publishing, receiving, and subscriber activity. The changes include adding new dependencies, creating a telemetry configuration, updating broker implementations, and modifying client and server components to use OpenTelemetry. Sequence diagram for publishing a message with tracingsequenceDiagram
participant Broker as DefaultMessageBroker
participant Tracer
participant Subscriber
Broker->>Tracer: startSpan("publish_message")
activate Tracer
Tracer-->>Broker: Span
deactivate Tracer
Broker->>Subscriber: async submit
activate Subscriber
Subscriber->>Tracer: startSpan("process_message")
activate Tracer
Tracer-->>Subscriber: ChildSpan
deactivate Tracer
Subscriber->>Subscriber: onMessage(convert(message))
Subscriber->>Tracer: end()
activate Tracer
deactivate Tracer
Subscriber-->>Broker: return
deactivate Subscriber
Broker->>Tracer: end()
activate Tracer
deactivate Tracer
Updated class diagram for DefaultMessageBrokerclassDiagram
class DefaultMessageBroker {
-ConcurrentHashMap~String, Set~MessageSubscriber~OutT~~~ topicSubscribers
-AtomicBoolean closed
-AsyncExecutor asyncExecutor
-BrokerMetrics metrics
-Tracer tracer
+DefaultMessageBroker(TelemetryConfig telemetryConfig)
+DefaultMessageBroker(AsyncExecutor asyncExecutor, TelemetryConfig telemetryConfig)
#canProcess(String topic, InT message) bool
+publish(String topic, InT message)
+subscribe(String topic, MessageSubscriber~OutT~ subscriber) bool
+unsubscribe(String topic, MessageSubscriber~OutT~ subscriber) bool
+close()
#getEventId(InT message) String
}
class BrokerMetrics {
-LongCounter publishedMessages
-LongCounter receivedMessages
-LongUpDownCounter activeSubscribers
+BrokerMetrics(Meter meter)
+recordPublished(String topic)
+recordReceived(String topic)
+recordSubscriberAdded(String topic)
+recordSubscriberRemoved(String topic)
}
class TelemetryConfig{
<<interface>>
+getMeter() Meter
+getTracer() Tracer
+getOpenTelemetry() OpenTelemetry
}
DefaultMessageBroker -- BrokerMetrics : has
DefaultMessageBroker -- TelemetryConfig : uses
Updated class diagram for TransactionalBrokerclassDiagram
class TransactionalBroker {
-DataSource ds
+TransactionalBroker(DataSource ds, AsyncExecutor asyncExecutor, TelemetryConfig telemetryConfig)
+TransactionalBroker(DataSource ds, TelemetryConfig telemetryConfig)
+publish(String topic, Event message)
+convert(Event m) TransactionalEvent
+getEventId(Event message) String
}
TransactionalBroker --|> DefaultMessageBroker
Updated class diagram for EventMessageBrokerclassDiagram
class EventMessageBroker {
+EventMessageBroker(AsyncExecutor asyncExecutor, TelemetryConfig telemetryConfig)
+EventMessageBroker(TelemetryConfig telemetryConfig)
+convert(Event m) Event
+getEventId(Event message) String
}
EventMessageBroker --|> DefaultMessageBroker
Updated class diagram for SystemEventBrokerclassDiagram
class SystemEventBroker {
+SystemEventBroker(AsyncExecutor asyncExecutor, TelemetryConfig telemetryConfig)
+SystemEventBroker(TelemetryConfig telemetryConfig)
+convert(SystemEvent m) SystemEvent
+subscribe(MessageSubscriber~SystemEvent~ subscriber)
+getEventId(SystemEvent message) String
}
SystemEventBroker --|> DefaultMessageBroker
Updated class diagram for ConsumerClientclassDiagram
class ConsumerClient {
-List~AutoCloseable~ closeables
-TransactionalBroker tb
-SystemEventBroker seb
-TelemetryConfig telemetryConfig
+ConsumerClient(TelemetryConfig telemetryConfig, AsyncExecutor asyncExecutor)
+ConsumerClient(TelemetryConfig telemetryConfig)
+start(Set~String~ topics, DataSource ds, String host, int port)
+start(Set~String~ topics, DataSource ds, ManagedChannel channel)
+close()
}
Updated class diagram for LocalPersistentConsumerclassDiagram
class LocalPersistentConsumer {
-DataSource ds
-PostEventConfig cfg
-AsyncExecutor asyncExecutor
-TransactionalBroker tb
-List~AutoCloseable~ closeables
-TelemetryConfig telemetryConfig
+LocalPersistentConsumer(DataSource ds, PostEventConfig cfg, AsyncExecutor asyncExecutor, TelemetryConfig telemetryConfig)
+LocalPersistentConsumer(DataSource ds, PostEventConfig cfg, TelemetryConfig telemetryConfig)
+start() void
+close() void
}
Updated class diagram for ConsumerServerclassDiagram
class ConsumerServer {
-List~AutoCloseable~ closeables
-Server server
-AsyncExecutor asyncExecutor
-TelemetryConfig telemetryConfig
+ConsumerServer(DataSource ds, ConfigData cfg, TelemetryConfig telemetryConfig)
+ConsumerServer(DataSource ds, ConfigData cfg, AsyncExecutor asyncExecutor, TelemetryConfig telemetryConfig)
+start(int port) void
+start(ServerBuilder~?~ sb) void
+close() void
}
Updated class diagram for MessageBrokerGrpcClientclassDiagram
class MessageBrokerGrpcClient {
-ManagedChannel channel
+MessageBrokerGrpcClient(TelemetryConfig telemetryConfig, String host, int port)
+MessageBrokerGrpcClient(TelemetryConfig telemetryConfig, ManagedChannel channel)
}
Class diagram for TelemetryConfig and DefaultTelemetryConfigclassDiagram
class DefaultTelemetryConfig {
-String INSTRUMENTATION_NAME
-OpenTelemetry openTelemetry
-Meter meter
-Tracer tracer
+DefaultTelemetryConfig(String serviceName)
+getMeter() Meter
+getTracer() Tracer
+getOpenTelemetry() OpenTelemetry
}
class TelemetryConfig{
<<interface>>
+getMeter() Meter
+getTracer() Tracer
+getOpenTelemetry() OpenTelemetry
}
TelemetryConfig <|-- DefaultTelemetryConfig
File-Level Changes
Possibly linked issues
Tips and commandsInteracting with Sourcery
Customizing Your ExperienceAccess your dashboard to:
Getting Help
|
There was a problem hiding this comment.
Hey @p14n - I've reviewed your changes - here's some feedback:
Overall Comments:
- Consider using a dedicated thread pool for telemetry to avoid impacting the performance of the main application logic.
- The added metrics look good, but consider adding more context-specific attributes to the spans for better observability.
Here's what I looked at during the review
- 🟡 General issues: 2 issues found
- 🟢 Security: all looks good
- 🟢 Testing: all looks good
- 🟡 Complexity: 2 issues found
- 🟢 Documentation: all looks good
Help me be more useful! Please click 👍 or 👎 on each comment and I'll use the feedback to improve your reviews.
|
|
||
| metrics.recordPublished(topic); | ||
|
|
||
| Span span = tracer.spanBuilder("publish_message") |
There was a problem hiding this comment.
issue (bug_risk): Parent span is not ended.
The span created for publishing messages is started but never ended. This may lead to hanging spans in your telemetry. Consider ensuring span.end() is called after the subscriber tasks are submitted, for example by wrapping the publish logic in a try/finally block.
|
|
||
| metrics.recordPublished(topic); | ||
|
|
||
| Span span = tracer.spanBuilder("publish_message_tx") |
There was a problem hiding this comment.
issue (bug_risk): Transactional parent span is never ended.
Similar to the non-transactional broker, the parent span in the transactional publish method is started but not ended. Adding an appropriate span.end() call will ensure proper trace completion.
| Set<MessageSubscriber<OutT>> subscribers = topicSubscribers.get(topic); | ||
| if (subscribers != null) { | ||
| for (MessageSubscriber<OutT> subscriber : subscribers) { | ||
| asyncExecutor.submit(() -> { |
There was a problem hiding this comment.
issue (complexity): Consider extracting the telemetry boilerplate from the lambda into a helper method to reduce nesting and complexity in the publish method, improving readability and maintainability without altering functionality..
Consider extracting the telemetry boilerplate (span creation, scoping, and error handling) out of the lambda to a helper. For example, extract the processing of a subscriber’s message into a private method:
```java
private boolean processMessageWithTelemetry(MessageSubscriber<OutT> subscriber, String topic, InT message) {
Span childSpan = tracer.spanBuilder("process_message")
.setAttribute("topic", topic)
.setAttribute("event.id", getEventId(message))
.startSpan();
try (Scope childScope = childSpan.makeCurrent()) {
subscriber.onMessage(convert(message));
metrics.recordReceived(topic);
return true;
} catch (Exception e) {
childSpan.recordException(e);
try {
subscriber.onError(e);
} catch (Exception ignored) {
// ignore error handling failure
}
return false;
} finally {
childSpan.end();
}
}Then in publish(), replace the inner lambda code with a call to this helper:
@Override
public void publish(String topic, InT message) {
if (!canProcess(topic, message)) {
return;
}
metrics.recordPublished(topic);
Span span = tracer.spanBuilder("publish_message")
.setAttribute("topic", topic)
.setAttribute("event.id", getEventId(message))
.startSpan();
try {
Set<MessageSubscriber<OutT>> subscribers = topicSubscribers.get(topic);
if (subscribers != null) {
for (MessageSubscriber<OutT> subscriber : subscribers) {
asyncExecutor.submit(() -> processMessageWithTelemetry(subscriber, topic, message));
}
}
} finally {
span.end();
}
}This refactoring extracts the telemetry logic out of the lambda, reducing the nesting and overall complexity, while preserving all functionality.
| // Deliver to all subscribers | ||
| for (MessageSubscriber<TransactionalEvent> subscriber : topicSubscribers.get(topic)) { | ||
| try (Connection c = ds.getConnection()) { | ||
| var op = new OrderedProcessor((connection, event) -> { |
There was a problem hiding this comment.
issue (complexity): Consider extracting the telemetry logic into helper methods to reduce nested try-catch-finally blocks and improve code clarity by centralizing span management and exception handling, for example, by using a dedicated method for child span handling and refactoring the lambda expression in the publish method to use this method, which reduces duplication and nesting depth.
Extracting the telemetry logic into helper methods can reduce the nested try-catch-finally blocks and clarify the core message processing. For example, you could extract the child span handling into a dedicated method:
private boolean processWithTelemetry(String spanName, String topic, String eventId, Supplier<Boolean> action) {
Span span = tracer.spanBuilder(spanName)
.setAttribute("topic", topic)
.setAttribute("event.id", eventId)
.startSpan();
try (Scope scope = span.makeCurrent()) {
return action.get();
} catch (Exception e) {
span.recordException(e);
throw e;
} finally {
span.end();
}
}Then, in your publish method, you could refactor the lambda as follows:
var op = new OrderedProcessor((connection, event) ->
processWithTelemetry("process_message", topic, getEventId(message), () -> {
subscriber.onMessage(new TransactionalEvent(connection, event));
metrics.recordReceived(topic);
return true;
})
);This approach centralizes the telemetry span creation and ending logic, reducing duplication and the depth of nested exception handling.
Summary by Sourcery
Add OpenTelemetry instrumentation to the message broker system to enable observability and tracing
New Features:
Enhancements:
Build: