Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 20 additions & 17 deletions src/main/java/com/p14n/postevent/ConsumerClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@

import java.sql.SQLException;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.Set;
import java.util.concurrent.TimeUnit;

public class ConsumerClient implements AutoCloseable, MessageBroker<TransactionalEvent, TransactionalEvent> {
Expand All @@ -32,26 +32,25 @@ public class ConsumerClient implements AutoCloseable, MessageBroker<Transactiona
private AsyncExecutor asyncExecutor;
private List<AutoCloseable> closeables;
private TransactionalBroker tb;
private String topic;
SystemEventBroker seb;

public ConsumerClient(String topic, AsyncExecutor asyncExecutor) {
public ConsumerClient(AsyncExecutor asyncExecutor) {
this.asyncExecutor = asyncExecutor;
this.topic = topic;
}

public ConsumerClient(String topic) {
this(topic, new DefaultExecutor(2));
public ConsumerClient() {
this(new DefaultExecutor(2));
}

public void start(DataSource ds, String host, int port) {
start(ds, ManagedChannelBuilder.forAddress(host, port)
public void start(Set<String> topics, DataSource ds, String host, int port) {
start(topics, ds, ManagedChannelBuilder.forAddress(host, port)
.keepAliveTime(1, TimeUnit.HOURS)
.keepAliveTimeout(30, TimeUnit.SECONDS)
.usePlaintext()
.build());
}

public void start(DataSource ds, ManagedChannel channel) {
public void start(Set<String> topics, DataSource ds, ManagedChannel channel) {
logger.atInfo().log("Starting consumer client");

if (tb != null) {
Expand All @@ -61,12 +60,14 @@ public void start(DataSource ds, ManagedChannel channel) {

try {
tb = new TransactionalBroker(ds, asyncExecutor);
var seb = new SystemEventBroker(asyncExecutor);
seb = new SystemEventBroker(asyncExecutor);
var pb = new PersistentBroker<>(tb, ds, seb);
var client = new MessageBrokerGrpcClient(channel, topic);
var client = new MessageBrokerGrpcClient(channel);
var catchupClient = new CatchupGrpcClient(channel);

client.subscribe(pb);
for (var topic : topics) {
client.subscribe(topic, pb);
}
seb.subscribe(new CatchupService(ds, catchupClient, seb));
seb.subscribe(new UnprocessedSubmitter(ds, new UnprocessedEventFinder(), tb));

Expand Down Expand Up @@ -105,7 +106,7 @@ public void close() {
}

@Override
public void publish(TransactionalEvent message) {
public void publish(String topic, TransactionalEvent message) {
try {
Publisher.publish(message.event(), message.connection(), topic);
} catch (SQLException e) {
Expand All @@ -114,13 +115,15 @@ public void publish(TransactionalEvent message) {
}

@Override
public boolean subscribe(MessageSubscriber<TransactionalEvent> subscriber) {
return tb.subscribe(subscriber);
public boolean subscribe(String topic, MessageSubscriber<TransactionalEvent> subscriber) {
var subscribed = tb.subscribe(topic, subscriber);
seb.publish(SystemEvent.CatchupRequired.withTopic(topic));
return subscribed;
}

@Override
public boolean unsubscribe(MessageSubscriber<TransactionalEvent> subscriber) {
return tb.unsubscribe(subscriber);
public boolean unsubscribe(String topic, MessageSubscriber<TransactionalEvent> subscriber) {
return tb.unsubscribe(topic, subscriber);
}

@Override
Expand Down
38 changes: 26 additions & 12 deletions src/main/java/com/p14n/postevent/LocalConsumer.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,12 @@
import java.io.IOException;
import java.util.function.Consumer;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class LocalConsumer<OutT> implements AutoCloseable {

private static final Logger logger = LoggerFactory.getLogger(LocalConsumer.class);
private final DebeziumServer debezium;
private final MessageBroker<Event, OutT> broker;
private final PostEventConfig config;
Expand All @@ -24,18 +29,27 @@ public LocalConsumer(PostEventConfig config, MessageBroker<Event, OutT> broker)
}

public void start() throws IOException, InterruptedException {
db.setupAll(config.name());
Consumer<ChangeEvent<String, String>> consumer = record -> {
try {
Event event = changeEventToEvent(record);
if (event != null) {
broker.publish(event);
logger.atInfo()
.addArgument(config.topics()) // renamed from name()
.log("Starting local consumer for {}");

try {
db.setupAll(config.topics()); // renamed from name()
Consumer<ChangeEvent<String, String>> consumer = record -> {
try {
Event event = changeEventToEvent(record);
if (event != null) {
broker.publish(event.topic(), event);
}
} catch (Exception e) {
throw new RuntimeException("Failed to process change event", e);
}
} catch (Exception e) {
throw new RuntimeException("Failed to process change event", e);
}
};
debezium.start(config, consumer);
};
debezium.start(config, consumer);
} catch (IOException | InterruptedException e) {
logger.atError().setCause(e).log("Failed to start local consumer");
throw e;
}
}

public void stop() throws IOException {
Expand All @@ -46,4 +60,4 @@ public void stop() throws IOException {
public void close() throws IOException {
stop();
}
}
}
12 changes: 6 additions & 6 deletions src/main/java/com/p14n/postevent/LocalPersistentConsumer.java
Original file line number Diff line number Diff line change
Expand Up @@ -90,22 +90,22 @@ public void close() {
}

@Override
public void publish(TransactionalEvent message) {
public void publish(String topic, TransactionalEvent message) {
try {
Publisher.publish(message.event(), message.connection(), cfg.name());
Publisher.publish(message.event(), message.connection(), message.event().topic()); // renamed from name()
} catch (SQLException e) {
throw new RuntimeException(e);
}
}

@Override
public boolean subscribe(MessageSubscriber<TransactionalEvent> subscriber) {
return tb.subscribe(subscriber);
public boolean subscribe(String topic, MessageSubscriber<TransactionalEvent> subscriber) {
return tb.subscribe(topic, subscriber);
}

@Override
public boolean unsubscribe(MessageSubscriber<TransactionalEvent> subscriber) {
return tb.unsubscribe(subscriber);
public boolean unsubscribe(String topic, MessageSubscriber<TransactionalEvent> subscriber) {
return tb.unsubscribe(topic, subscriber);
}

@Override
Expand Down
91 changes: 50 additions & 41 deletions src/main/java/com/p14n/postevent/broker/DefaultMessageBroker.java
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
package com.p14n.postevent.broker;

import java.util.concurrent.Callable;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.atomic.AtomicBoolean;

public abstract class DefaultMessageBroker<InT, OutT> implements MessageBroker<InT, OutT>, AutoCloseable {

protected final CopyOnWriteArraySet<MessageSubscriber<OutT>> subscribers = new CopyOnWriteArraySet<>();
protected final ConcurrentHashMap<String, Set<MessageSubscriber<OutT>>> topicSubscribers = new ConcurrentHashMap<>();
protected final AtomicBoolean closed = new AtomicBoolean(false);

private final AsyncExecutor asyncExecutor;

public DefaultMessageBroker() {
Expand All @@ -19,7 +19,7 @@ public DefaultMessageBroker(AsyncExecutor asyncExecutor) {
this.asyncExecutor = asyncExecutor;
}

protected boolean canProcess(InT message) {
protected boolean canProcess(String topic, InT message) {
if (closed.get()) {
throw new IllegalStateException("Broker is closed");
}
Expand All @@ -28,42 +28,43 @@ protected boolean canProcess(InT message) {
throw new IllegalArgumentException("Message cannot be null");
}

// If no subscribers, message is silently dropped
if (subscribers.isEmpty()) {
return false;
if (topic == null) {
throw new IllegalArgumentException("Topic cannot be null");
}
return true;

// If no subscribers for this topic, message is silently dropped
return topicSubscribers.containsKey(topic) && !topicSubscribers.get(topic).isEmpty();
}

@Override
public void publish(InT message) {

if (!canProcess(message)) {
public void publish(String topic, InT message) {
if (!canProcess(topic, message)) {
return;
}
// Deliver to all subscribers
for (MessageSubscriber<OutT> subscriber : subscribers) {

asyncExecutor.submit(() -> {
try {
subscriber.onMessage(convert(message));
return null;
} catch (Exception e) {
// Deliver to all subscribers for this topic
Set<MessageSubscriber<OutT>> subscribers = topicSubscribers.get(topic);
if (subscribers != null) {
for (MessageSubscriber<OutT> subscriber : subscribers) {
asyncExecutor.submit(() -> {
try {
subscriber.onError(e);
} catch (Exception ignored) {
// If error handling fails, we ignore it to protect other subscribers
subscriber.onMessage(convert(message));
return null;
} catch (Exception e) {
try {
subscriber.onError(e);
} catch (Exception ignored) {
// If error handling fails, we ignore it to protect other subscribers
}
}
}
return null;
});

return null;
});
}
}
}

@Override
public boolean subscribe(MessageSubscriber<OutT> subscriber) {
public boolean subscribe(String topic, MessageSubscriber<OutT> subscriber) {
if (closed.get()) {
throw new IllegalStateException("Broker is closed");
}
Expand All @@ -72,31 +73,39 @@ public boolean subscribe(MessageSubscriber<OutT> subscriber) {
throw new IllegalArgumentException("Subscriber cannot be null");
}

return subscribers.add(subscriber);
if (topic == null) {
throw new IllegalArgumentException("Topic cannot be null");
}

return topicSubscribers
.computeIfAbsent(topic, k -> new CopyOnWriteArraySet<>())
.add(subscriber);
}

@Override
public boolean unsubscribe(MessageSubscriber<OutT> subscriber) {
public boolean unsubscribe(String topic, MessageSubscriber<OutT> subscriber) {
if (subscriber == null) {
throw new IllegalArgumentException("Subscriber cannot be null");
}

return subscribers.remove(subscriber);
if (topic == null) {
throw new IllegalArgumentException("Topic cannot be null");
}

Set<MessageSubscriber<OutT>> subscribers = topicSubscribers.get(topic);
if (subscribers != null) {
boolean removed = subscribers.remove(subscriber);
if (subscribers.isEmpty()) {
topicSubscribers.remove(topic);
}
return removed;
}
return false;
}

@Override
public void close() {
if (closed.compareAndSet(false, true)) {
// Notify all subscribers of shutdown
Throwable shutdownError = new IllegalStateException("Message broker is shutting down");
for (MessageSubscriber<OutT> subscriber : subscribers) {
try {
subscriber.onError(shutdownError);
} catch (Exception ignored) {
// Ignore errors during shutdown notification
}
}
subscribers.clear();
}
closed.set(true);
topicSubscribers.clear();
}
}
43 changes: 29 additions & 14 deletions src/main/java/com/p14n/postevent/broker/MessageBroker.java
Original file line number Diff line number Diff line change
@@ -1,37 +1,52 @@
package com.p14n.postevent.broker;

/**
* Thread-safe message broker interface for publishing messages and managing subscribers.
* @param <T> The type of messages this broker handles
* Thread-safe message broker interface for publishing messages and managing
* subscribers.
*
* @param <InT> The type of messages this broker accepts
* @param <OutT> The type of messages this broker delivers to subscribers
*/
public interface MessageBroker<InT,OutT> {
public interface MessageBroker<InT, OutT> {

/**
* Publishes a message to all current subscribers.
* If no subscribers are present, the message is silently dropped.
* Publishes a message to all subscribers of the specified topic.
* If no subscribers are present for the topic, the message is silently dropped.
*
* @param topic The topic to publish to
* @param message The message to publish
*/
void publish(InT message);
void publish(String topic, InT message);

/**
* Adds a subscriber to receive messages.
* Adds a subscriber to receive messages for the specified topic.
*
* @param topic The topic to subscribe to
* @param subscriber The subscriber to add
* @return true if the subscriber was added, false if it was already present
*/
boolean subscribe(MessageSubscriber<OutT> subscriber);
boolean subscribe(String topic, MessageSubscriber<OutT> subscriber);

/**
* Removes a subscriber from receiving messages.
* Removes a subscriber from receiving messages for the specified topic.
*
* @param topic The topic to unsubscribe from
* @param subscriber The subscriber to remove
* @return true if the subscriber was removed, false if it wasn't present
*/
boolean unsubscribe(MessageSubscriber<OutT> subscriber);
boolean unsubscribe(String topic, MessageSubscriber<OutT> subscriber);

/**
* Closes the broker and releases any resources.
* After closing, no more messages can be published or subscribers added.
*/
void close();

/**
* Converts an incoming message to the outgoing message type
*
* @param m The message to convert
* @return The converted message
*/
OutT convert(InT m);
}
Loading