Skip to content
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -250,4 +250,8 @@ replay_pid*
.idea

### vscode ###
.vscode
.vscode

### clojure ###
.clj-kondo/.cache
.lsp
9 changes: 7 additions & 2 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,16 @@ repositories {

dependencies {
implementation 'io.debezium:debezium-api:3.0.1.Final'
implementation 'io.debezium:debezium-embedded:3.0.1.Final'
implementation ('io.debezium:debezium-embedded:3.0.1.Final') {
exclude group: 'org.glassfish.jersey.containers', module: 'jersey-container-servlet'
exclude group: 'org.glassfish.jersey.inject', module: 'jersey-hk2'
exclude group: 'org.eclipse.jetty'
}
implementation 'io.debezium:debezium-connector-postgres:3.0.1.Final'
implementation 'io.debezium:debezium-storage-jdbc:3.0.1.Final'
implementation 'org.slf4j:slf4j-api:2.0.9'

implementation 'com.zaxxer:HikariCP:6.2.1'

// gRPC dependencies
implementation 'io.grpc:grpc-netty-shaded:1.53.0'
implementation 'io.grpc:grpc-protobuf:1.53.0'
Expand Down
19 changes: 16 additions & 3 deletions src/main/java/com/p14n/postevent/LocalConsumer.java
Original file line number Diff line number Diff line change
@@ -1,26 +1,34 @@
package com.p14n.postevent;

import com.p14n.postevent.broker.DefaultMessageBroker;
import com.p14n.postevent.broker.MessageBroker;
import com.p14n.postevent.broker.MessageSubscriber;
import com.p14n.postevent.data.ConfigData;
import com.p14n.postevent.data.Event;
import com.p14n.postevent.data.PostEventConfig;
import com.p14n.postevent.db.DatabaseSetup;
import com.p14n.postevent.debezium.DebeziumServer;
import static com.p14n.postevent.debezium.Functions.changeEventToEvent;
import io.debezium.engine.ChangeEvent;
import java.io.IOException;
import java.util.Properties;
import java.util.function.Consumer;

public class LocalConsumer {
public class LocalConsumer<OutT> implements AutoCloseable {
private final DebeziumServer debezium;
private final MessageBroker<Event> broker;
private final MessageBroker<Event,OutT> broker;
private final PostEventConfig config;
private final DatabaseSetup db;

public LocalConsumer(PostEventConfig config, MessageBroker<Event> broker) {
public LocalConsumer(PostEventConfig config, MessageBroker<Event,OutT> broker) {
this.config = config;
this.broker = broker;
this.db = new DatabaseSetup(config);
this.debezium = new DebeziumServer();
}

public void start() throws IOException, InterruptedException {
db.setupAll(config.name());
Consumer<ChangeEvent<String, String>> consumer = record -> {
try {
Event event = changeEventToEvent(record);
Expand All @@ -37,4 +45,9 @@ public void start() throws IOException, InterruptedException {
public void stop() throws IOException {
debezium.stop();
}

@Override
public void close() throws IOException {
stop();
}
}
11 changes: 10 additions & 1 deletion src/main/java/com/p14n/postevent/Publisher.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import com.p14n.postevent.data.Event;
import com.p14n.postevent.db.SQL;

import javax.sql.DataSource;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
Expand All @@ -14,6 +15,9 @@
*/
public class Publisher {

private Publisher(){

}
/**
* Publishes an event to the specified topic table.
*
Expand All @@ -24,7 +28,7 @@ public class Publisher {
* @throws IllegalArgumentException if the topic is null, empty, or contains
* invalid characters
*/
public void publish(Event event, Connection connection, String topic) throws SQLException {
public static void publish(Event event, Connection connection, String topic) throws SQLException {
if (topic == null || topic.trim().isEmpty()) {
throw new IllegalArgumentException("Topic name cannot be null or empty");
}
Expand All @@ -41,4 +45,9 @@ public void publish(Event event, Connection connection, String topic) throws SQL
}
}

public static void publish(Event event, DataSource ds, String topic) throws SQLException {
try (Connection c = ds.getConnection()) {
publish(event, c, topic);
}
}
}
35 changes: 19 additions & 16 deletions src/main/java/com/p14n/postevent/broker/DefaultMessageBroker.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,12 @@
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.atomic.AtomicBoolean;

/**
* Default implementation of MessageBroker using thread-safe collections.
*
* @param <T> The type of messages this broker handles
*/
public class DefaultMessageBroker<T> implements MessageBroker<T> {
public abstract class DefaultMessageBroker<InT,OutT> implements MessageBroker<InT,OutT>,AutoCloseable {

protected final CopyOnWriteArraySet<MessageSubscriber<T>> subscribers = new CopyOnWriteArraySet<>();
private final AtomicBoolean closed = new AtomicBoolean(false);
protected final CopyOnWriteArraySet<MessageSubscriber<OutT>> subscribers = new CopyOnWriteArraySet<>();
protected final AtomicBoolean closed = new AtomicBoolean(false);

@Override
public void publish(T message) {
protected boolean canProcess(InT message){
if (closed.get()) {
throw new IllegalStateException("Broker is closed");
}
Expand All @@ -25,13 +19,22 @@ public void publish(T message) {

// If no subscribers, message is silently dropped
if (subscribers.isEmpty()) {
return;
return false;
}
return true;

}

@Override
public void publish(InT message) {

if(!canProcess(message)){
return;
}
// Deliver to all subscribers
for (MessageSubscriber<T> subscriber : subscribers) {
for (MessageSubscriber<OutT> subscriber : subscribers) {
try {
subscriber.onMessage(message);
subscriber.onMessage(convert(message));
} catch (Exception e) {
try {
subscriber.onError(e);
Expand All @@ -43,7 +46,7 @@ public void publish(T message) {
}

@Override
public boolean subscribe(MessageSubscriber<T> subscriber) {
public boolean subscribe(MessageSubscriber<OutT> subscriber) {
if (closed.get()) {
throw new IllegalStateException("Broker is closed");
}
Expand All @@ -56,7 +59,7 @@ public boolean subscribe(MessageSubscriber<T> subscriber) {
}

@Override
public boolean unsubscribe(MessageSubscriber<T> subscriber) {
public boolean unsubscribe(MessageSubscriber<OutT> subscriber) {
if (subscriber == null) {
throw new IllegalArgumentException("Subscriber cannot be null");
}
Expand All @@ -69,7 +72,7 @@ public void close() {
if (closed.compareAndSet(false, true)) {
// Notify all subscribers of shutdown
Throwable shutdownError = new IllegalStateException("Message broker is shutting down");
for (MessageSubscriber<T> subscriber : subscribers) {
for (MessageSubscriber<OutT> subscriber : subscribers) {
try {
subscriber.onError(shutdownError);
} catch (Exception ignored) {
Expand Down
10 changes: 10 additions & 0 deletions src/main/java/com/p14n/postevent/broker/EventMessageBroker.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package com.p14n.postevent.broker;

import com.p14n.postevent.data.Event;

public class EventMessageBroker extends DefaultMessageBroker<Event,Event> {
@Override
public Event convert(Event m) {
return m;
}
}
10 changes: 6 additions & 4 deletions src/main/java/com/p14n/postevent/broker/MessageBroker.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,32 +4,34 @@
* Thread-safe message broker interface for publishing messages and managing subscribers.
* @param <T> The type of messages this broker handles
*/
public interface MessageBroker<T> {
public interface MessageBroker<InT,OutT> {

/**
* Publishes a message to all current subscribers.
* If no subscribers are present, the message is silently dropped.
* @param message The message to publish
*/
void publish(T message);
void publish(InT message);

/**
* Adds a subscriber to receive messages.
* @param subscriber The subscriber to add
* @return true if the subscriber was added, false if it was already present
*/
boolean subscribe(MessageSubscriber<T> subscriber);
boolean subscribe(MessageSubscriber<OutT> subscriber);

/**
* Removes a subscriber from receiving messages.
* @param subscriber The subscriber to remove
* @return true if the subscriber was removed, false if it wasn't present
*/
boolean unsubscribe(MessageSubscriber<T> subscriber);
boolean unsubscribe(MessageSubscriber<OutT> subscriber);

/**
* Closes the broker and releases any resources.
* After closing, no more messages can be published or subscribers added.
*/
void close();

OutT convert(InT m);
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,7 @@ public interface MessageSubscriber<T> {
* Called when an error occurs that prevents further message processing.
* @param error The error that occurred
*/
void onError(Throwable error);
default void onError(Throwable error){

}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package com.p14n.postevent.broker;

import com.p14n.postevent.data.Event;
import com.p14n.postevent.processor.OrderedProcessor;

import javax.sql.DataSource;
import java.sql.Connection;
import java.util.function.BiFunction;

public class TransactionalBroker extends DefaultMessageBroker<Event,TransactionalEvent> {
private final DataSource ds;

public TransactionalBroker(DataSource ds){
this.ds = ds;
}
@Override
public void publish(Event message) {

if (!canProcess(message)) {
return;
}

// Deliver to all subscribers
for (MessageSubscriber<TransactionalEvent> subscriber : subscribers) {
try (Connection c = ds.getConnection()){
var op = new OrderedProcessor((connection, event) -> {
try {
subscriber.onMessage(new TransactionalEvent(connection,event));
return true;
} catch (Exception e){
return false;
}
});
op.process(c,message);
} catch (Exception e) {
try {
subscriber.onError(e);
} catch (Exception ignored) {
// If error handling fails, we ignore it to protect other subscribers
}
}
}
}

@Override
public TransactionalEvent convert(Event m) {
return null;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package com.p14n.postevent.broker;

import com.p14n.postevent.data.Event;

import java.sql.Connection;

public record TransactionalEvent(Connection connection, Event event){}
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package com.p14n.postevent.broker.grpc;

import com.p14n.postevent.broker.DefaultMessageBroker;
import com.p14n.postevent.broker.EventMessageBroker;
import com.p14n.postevent.broker.MessageBroker;
import com.p14n.postevent.broker.MessageSubscriber;
import com.p14n.postevent.data.Event;
Expand All @@ -15,7 +15,7 @@
import java.util.logging.Level;
import java.util.logging.Logger;

public class MessageBrokerGrpcClient extends DefaultMessageBroker<Event> implements AutoCloseable {
public class MessageBrokerGrpcClient extends EventMessageBroker implements AutoCloseable {
private static final Logger LOGGER = Logger.getLogger(MessageBrokerGrpcClient.class.getName());
private final MessageBrokerServiceGrpc.MessageBrokerServiceStub asyncStub;
private final AtomicBoolean subscribed = new AtomicBoolean(false);
Expand Down Expand Up @@ -119,5 +119,5 @@ public boolean unsubscribe(MessageSubscriber<Event> subscriber) {
}
return unsubscribed;
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@

public class MessageBrokerGrpcServer extends MessageBrokerServiceGrpc.MessageBrokerServiceImplBase {
private static final Logger LOGGER = Logger.getLogger(MessageBrokerGrpcServer.class.getName());
private final MessageBroker<Event> messageBroker;
private final MessageBroker<Event,Event> messageBroker;

public MessageBrokerGrpcServer(MessageBroker<Event> messageBroker) {
public MessageBrokerGrpcServer(MessageBroker<Event,Event> messageBroker) {
this.messageBroker = messageBroker;
}

Expand Down
Loading