Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,18 @@ dependencies {
testRuntimeOnly 'ch.qos.logback:logback-classic:1.4.11'
testImplementation 'org.mockito:mockito-core:3.12.4'

// OpenTelemetry core dependencies
implementation 'io.opentelemetry:opentelemetry-api:1.32.0'
implementation 'io.opentelemetry:opentelemetry-sdk:1.32.0'

// Exporters - choose based on your needs
implementation 'io.opentelemetry:opentelemetry-exporter-otlp:1.32.0'

// Instrumentation for GRPC
implementation 'io.opentelemetry.instrumentation:opentelemetry-grpc-1.6:1.32.0-alpha'

// Logging integration
//implementation 'io.opentelemetry:opentelemetry-extension-logging:1.32.0'
}

testlogger {
Expand Down Expand Up @@ -75,6 +87,9 @@ test {
failFast = true
testLogging.showStandardStreams = true
}
task fastTest( type: Test ) {
exclude '**/dst/**'
}

jar {
manifest {
Expand Down
17 changes: 11 additions & 6 deletions src/main/java/com/p14n/postevent/ConsumerClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,11 @@
import com.p14n.postevent.catchup.UnprocessedSubmitter;
import com.p14n.postevent.catchup.grpc.CatchupGrpcClient;
import com.p14n.postevent.data.UnprocessedEventFinder;

import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.opentelemetry.api.OpenTelemetry;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -33,13 +36,15 @@ public class ConsumerClient implements AutoCloseable, MessageBroker<Transactiona
private List<AutoCloseable> closeables;
private TransactionalBroker tb;
SystemEventBroker seb;
OpenTelemetry ot;

public ConsumerClient(AsyncExecutor asyncExecutor) {
public ConsumerClient(OpenTelemetry ot, AsyncExecutor asyncExecutor) {
this.asyncExecutor = asyncExecutor;
this.ot = ot;
}

public ConsumerClient() {
this(new DefaultExecutor(2));
public ConsumerClient(OpenTelemetry ot) {
this(ot, new DefaultExecutor(2));
}

public void start(Set<String> topics, DataSource ds, String host, int port) {
Expand All @@ -59,10 +64,10 @@ public void start(Set<String> topics, DataSource ds, ManagedChannel channel) {
}

try {
tb = new TransactionalBroker(ds, asyncExecutor);
seb = new SystemEventBroker(asyncExecutor);
tb = new TransactionalBroker(ds, asyncExecutor, ot);
seb = new SystemEventBroker(asyncExecutor, ot);
var pb = new PersistentBroker<>(tb, ds, seb);
var client = new MessageBrokerGrpcClient(channel);
var client = new MessageBrokerGrpcClient(ot, channel);
var catchupClient = new CatchupGrpcClient(channel);

for (var topic : topics) {
Expand Down
12 changes: 8 additions & 4 deletions src/main/java/com/p14n/postevent/ConsumerServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

import io.grpc.Server;
import io.grpc.ServerBuilder;
import io.opentelemetry.api.OpenTelemetry;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -27,15 +29,17 @@ public class ConsumerServer implements AutoCloseable {
private List<AutoCloseable> closeables;
private Server server;
private AsyncExecutor asyncExecutor;
OpenTelemetry ot;

public ConsumerServer(DataSource ds, ConfigData cfg) {
this(ds, cfg, new DefaultExecutor(2));
public ConsumerServer(DataSource ds, ConfigData cfg, OpenTelemetry ot) {
this(ds, cfg, new DefaultExecutor(2), ot);
}

public ConsumerServer(DataSource ds, ConfigData cfg, AsyncExecutor asyncExecutor) {
public ConsumerServer(DataSource ds, ConfigData cfg, AsyncExecutor asyncExecutor, OpenTelemetry ot) {
this.ds = ds;
this.cfg = cfg;
this.asyncExecutor = asyncExecutor;
this.ot = ot;
}

public void start(int port) throws IOException, InterruptedException {
Expand All @@ -45,7 +49,7 @@ public void start(int port) throws IOException, InterruptedException {
public void start(ServerBuilder<?> sb) throws IOException, InterruptedException {
logger.atInfo().log("Starting consumer server");

var mb = new EventMessageBroker(asyncExecutor);
var mb = new EventMessageBroker(asyncExecutor, ot);
var lc = new LocalConsumer<>(cfg, mb);
var grpcServer = new MessageBrokerGrpcServer(mb);
var catchupServer = new CatchupServer(ds);
Expand Down
16 changes: 11 additions & 5 deletions src/main/java/com/p14n/postevent/LocalPersistentConsumer.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,15 @@
import com.p14n.postevent.data.PostEventConfig;
import com.p14n.postevent.data.UnprocessedEventFinder;

import io.opentelemetry.api.OpenTelemetry;

import java.io.IOException;
import java.sql.SQLException;
import java.util.List;
import java.util.concurrent.TimeUnit;

import javax.sql.DataSource;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -24,15 +27,18 @@ public class LocalPersistentConsumer implements AutoCloseable, MessageBroker<Tra
private AsyncExecutor asyncExecutor;
private TransactionalBroker tb;
private List<AutoCloseable> closeables;
private OpenTelemetry ot;

public LocalPersistentConsumer(DataSource ds, PostEventConfig cfg, AsyncExecutor asyncExecutor) {
public LocalPersistentConsumer(DataSource ds, PostEventConfig cfg, AsyncExecutor asyncExecutor,
OpenTelemetry ot) {
this.ds = ds;
this.cfg = cfg;
this.asyncExecutor = asyncExecutor;
this.ot = ot;
}

public LocalPersistentConsumer(DataSource ds, PostEventConfig cfg) {
this(ds, cfg, new DefaultExecutor(2));
public LocalPersistentConsumer(DataSource ds, PostEventConfig cfg, OpenTelemetry ot) {
this(ds, cfg, new DefaultExecutor(2), ot);
}

public void start() throws IOException, InterruptedException {
Expand All @@ -44,8 +50,8 @@ public void start() throws IOException, InterruptedException {
}

try {
tb = new TransactionalBroker(ds, asyncExecutor);
var seb = new SystemEventBroker(asyncExecutor);
tb = new TransactionalBroker(ds, asyncExecutor, ot);
var seb = new SystemEventBroker(asyncExecutor, ot);
var pb = new PersistentBroker<>(tb, ds, seb);
var lc = new LocalConsumer<>(cfg, pb);

Expand Down
70 changes: 50 additions & 20 deletions src/main/java/com/p14n/postevent/broker/DefaultMessageBroker.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,30 @@
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.atomic.AtomicBoolean;

public abstract class DefaultMessageBroker<InT, OutT> implements MessageBroker<InT, OutT>, AutoCloseable {
import com.p14n.postevent.data.Traceable;
import com.p14n.postevent.telemetry.BrokerMetrics;
import static com.p14n.postevent.telemetry.OpenTelemetryFunctions.processWithTelemetry;

import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.api.trace.Tracer;

public abstract class DefaultMessageBroker<InT extends Traceable, OutT>
implements MessageBroker<InT, OutT>, AutoCloseable {

protected final ConcurrentHashMap<String, Set<MessageSubscriber<OutT>>> topicSubscribers = new ConcurrentHashMap<>();
protected final AtomicBoolean closed = new AtomicBoolean(false);
private final AsyncExecutor asyncExecutor;
protected final BrokerMetrics metrics;
protected final Tracer tracer;

public DefaultMessageBroker() {
this(new DefaultExecutor(2));
public DefaultMessageBroker(OpenTelemetry ot) {
this(new DefaultExecutor(2), ot);
}

public DefaultMessageBroker(AsyncExecutor asyncExecutor) {
public DefaultMessageBroker(AsyncExecutor asyncExecutor, OpenTelemetry ot) {
this.asyncExecutor = asyncExecutor;
this.metrics = new BrokerMetrics(ot.getMeter("default-message-broker"));
this.tracer = ot.getTracer("default-message-broker");
}

protected boolean canProcess(String topic, InT message) {
Expand All @@ -42,24 +54,32 @@ public void publish(String topic, InT message) {
return;
}

metrics.recordPublished(topic);

// Deliver to all subscribers for this topic
Set<MessageSubscriber<OutT>> subscribers = topicSubscribers.get(topic);
if (subscribers != null) {
for (MessageSubscriber<OutT> subscriber : subscribers) {
asyncExecutor.submit(() -> {
try {
subscriber.onMessage(convert(message));
return null;
} catch (Exception e) {
try {
subscriber.onError(e);
} catch (Exception ignored) {
// If error handling fails, we ignore it to protect other subscribers
}
}
return null;
});
}

processWithTelemetry(tracer, message, "publish_message", () -> {
for (MessageSubscriber<OutT> subscriber : subscribers) {
asyncExecutor.submit(() -> processWithTelemetry(tracer, message, "process_message",
() -> {
try {
subscriber.onMessage(convert(message));
metrics.recordReceived(topic);
return true;
} catch (Exception e) {
try {
subscriber.onError(e);
} catch (Exception ignored) {
}
throw e;
}
}));
}
return null;
});

}
}

Expand All @@ -77,9 +97,15 @@ public boolean subscribe(String topic, MessageSubscriber<OutT> subscriber) {
throw new IllegalArgumentException("Topic cannot be null");
}

return topicSubscribers
boolean added = topicSubscribers
.computeIfAbsent(topic, k -> new CopyOnWriteArraySet<>())
.add(subscriber);

if (added) {
metrics.recordSubscriberAdded(topic);
}

return added;
}

@Override
Expand All @@ -98,6 +124,9 @@ public boolean unsubscribe(String topic, MessageSubscriber<OutT> subscriber) {
if (subscribers.isEmpty()) {
topicSubscribers.remove(topic);
}
if (removed) {
metrics.recordSubscriberRemoved(topic);
}
return removed;
}
return false;
Expand All @@ -108,4 +137,5 @@ public void close() {
closed.set(true);
topicSubscribers.clear();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,21 @@

import com.p14n.postevent.data.Event;

import io.opentelemetry.api.OpenTelemetry;

public class EventMessageBroker extends DefaultMessageBroker<Event, Event> {

public EventMessageBroker(AsyncExecutor asyncExecutor) {
super(asyncExecutor);
public EventMessageBroker(AsyncExecutor asyncExecutor, OpenTelemetry ot) {
super(asyncExecutor, ot);
}

public EventMessageBroker() {
super();
public EventMessageBroker(OpenTelemetry ot) {
super(ot);
}

@Override
public Event convert(Event m) {
return m;
}

}
19 changes: 18 additions & 1 deletion src/main/java/com/p14n/postevent/broker/SystemEvent.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package com.p14n.postevent.broker;

public enum SystemEvent {
import com.p14n.postevent.data.Traceable;

public enum SystemEvent implements Traceable {

CatchupRequired,
UnprocessedCheckRequired;
Expand All @@ -11,4 +13,19 @@ public SystemEvent withTopic(String topic) {
this.topic = topic;
return this;
}

@Override
public String id() {
return this.toString();
}

@Override
public String topic() {
return topic;
}

@Override
public String subject() {
return "";
}
}
10 changes: 6 additions & 4 deletions src/main/java/com/p14n/postevent/broker/SystemEventBroker.java
Original file line number Diff line number Diff line change
@@ -1,14 +1,16 @@
package com.p14n.postevent.broker;

import io.opentelemetry.api.OpenTelemetry;

public class SystemEventBroker extends
DefaultMessageBroker<SystemEvent, SystemEvent> {

public SystemEventBroker(AsyncExecutor asyncExecutor) {
super(asyncExecutor);
public SystemEventBroker(AsyncExecutor asyncExecutor, OpenTelemetry ot) {
super(asyncExecutor, ot);
}

public SystemEventBroker() {
super();
public SystemEventBroker(OpenTelemetry ot) {
super(ot);
}

@Override
Expand Down
Loading