Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,10 @@ jobs:
steps:
- uses: actions/checkout@v3

- name: Set up JDK 17
- name: Set up JDK 21
uses: actions/setup-java@v3
with:
java-version: '17'
java-version: '21'
distribution: 'temurin'

- name: Validate Gradle wrapper
Expand Down
6 changes: 5 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -250,4 +250,8 @@ replay_pid*
.idea

### vscode ###
.vscode
.vscode

### clojure ###
.clj-kondo/.cache
.lsp
16 changes: 10 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,18 @@ Create a catchup mechanism
* Request a batch of messages from the server
* Write each message to the consumer
* Stop when the catchup mechanism detects that it is overwriting the live messages
- [ ] Request a batch of messages from the server (starting from?)
- [ ] Write each message to the consumer
- [ ] Stop when the catchup mechanism detects that it is overwriting the live messages
- [ ] Verifies that there are no gaps in the event sequence (check the sequence for the earliest unprocessed event until this one)
- [ ] Verifies that there are no earlier unprocessed events for the same subject
- [x] Request a batch of messages from the server (starting from?)
- [x] Write each message to the consumer
- [x] Stop when the catchup mechanism detects that it is overwriting the live messages
- [x] Verifies that there are no gaps in the event sequence (check the sequence for the earliest unprocessed event until this one)

Create a processor
- [ ] ?
- [x] Verifies that there are no earlier unprocessed events for the same subject

- [X] Hook up catchup mechanism
- [ ] Make server handle multiple topics
- [X] Make threading mechanism pluggable
- [ ] DST tests

DB Debezium DONE
LC Local Consumer DONE
Expand Down
9 changes: 7 additions & 2 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,16 @@ repositories {

dependencies {
implementation 'io.debezium:debezium-api:3.0.1.Final'
implementation 'io.debezium:debezium-embedded:3.0.1.Final'
implementation ('io.debezium:debezium-embedded:3.0.1.Final') {
exclude group: 'org.glassfish.jersey.containers', module: 'jersey-container-servlet'
exclude group: 'org.glassfish.jersey.inject', module: 'jersey-hk2'
exclude group: 'org.eclipse.jetty'
}
implementation 'io.debezium:debezium-connector-postgres:3.0.1.Final'
implementation 'io.debezium:debezium-storage-jdbc:3.0.1.Final'
implementation 'org.slf4j:slf4j-api:2.0.9'

implementation 'com.zaxxer:HikariCP:6.2.1'

// gRPC dependencies
implementation 'io.grpc:grpc-netty-shaded:1.53.0'
implementation 'io.grpc:grpc-protobuf:1.53.0'
Expand Down
19 changes: 16 additions & 3 deletions src/main/java/com/p14n/postevent/LocalConsumer.java
Original file line number Diff line number Diff line change
@@ -1,26 +1,34 @@
package com.p14n.postevent;

import com.p14n.postevent.broker.DefaultMessageBroker;
import com.p14n.postevent.broker.MessageBroker;
import com.p14n.postevent.broker.MessageSubscriber;
import com.p14n.postevent.data.ConfigData;
import com.p14n.postevent.data.Event;
import com.p14n.postevent.data.PostEventConfig;
import com.p14n.postevent.db.DatabaseSetup;
import com.p14n.postevent.debezium.DebeziumServer;
import static com.p14n.postevent.debezium.Functions.changeEventToEvent;
import io.debezium.engine.ChangeEvent;
import java.io.IOException;
import java.util.Properties;
import java.util.function.Consumer;

public class LocalConsumer {
public class LocalConsumer<OutT> implements AutoCloseable {
private final DebeziumServer debezium;
private final MessageBroker<Event> broker;
private final MessageBroker<Event,OutT> broker;
private final PostEventConfig config;
private final DatabaseSetup db;

public LocalConsumer(PostEventConfig config, MessageBroker<Event> broker) {
public LocalConsumer(PostEventConfig config, MessageBroker<Event,OutT> broker) {
this.config = config;
this.broker = broker;
this.db = new DatabaseSetup(config);
this.debezium = new DebeziumServer();
}

public void start() throws IOException, InterruptedException {
db.setupAll(config.name());
Consumer<ChangeEvent<String, String>> consumer = record -> {
try {
Event event = changeEventToEvent(record);
Expand All @@ -37,4 +45,9 @@ public void start() throws IOException, InterruptedException {
public void stop() throws IOException {
debezium.stop();
}

@Override
public void close() throws IOException {
stop();
}
}
11 changes: 10 additions & 1 deletion src/main/java/com/p14n/postevent/Publisher.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import com.p14n.postevent.data.Event;
import com.p14n.postevent.db.SQL;

import javax.sql.DataSource;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
Expand All @@ -14,6 +15,9 @@
*/
public class Publisher {

private Publisher(){

}
/**
* Publishes an event to the specified topic table.
*
Expand All @@ -24,7 +28,7 @@ public class Publisher {
* @throws IllegalArgumentException if the topic is null, empty, or contains
* invalid characters
*/
public void publish(Event event, Connection connection, String topic) throws SQLException {
public static void publish(Event event, Connection connection, String topic) throws SQLException {
if (topic == null || topic.trim().isEmpty()) {
throw new IllegalArgumentException("Topic name cannot be null or empty");
}
Expand All @@ -41,4 +45,9 @@ public void publish(Event event, Connection connection, String topic) throws SQL
}
}

public static void publish(Event event, DataSource ds, String topic) throws SQLException {
try (Connection c = ds.getConnection()) {
publish(event, c, topic);
}
}
}
17 changes: 17 additions & 0 deletions src/main/java/com/p14n/postevent/broker/AsyncExecutor.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package com.p14n.postevent.broker;

import java.util.List;
import java.util.concurrent.*;

public interface AsyncExecutor {

public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
long initialDelay,
long period,
TimeUnit unit);

List<Runnable> shutdownNow();

<T> Future<T> submit(Callable<T> task);

}
33 changes: 33 additions & 0 deletions src/main/java/com/p14n/postevent/broker/DefaultExecutor.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package com.p14n.postevent.broker;

import java.util.Collections;
import java.util.List;
import java.util.concurrent.*;

public class DefaultExecutor implements AsyncExecutor {

private final ScheduledExecutorService se;
private final ExecutorService es;

public DefaultExecutor(int scheduledSize) {
this.se = Executors.newScheduledThreadPool(scheduledSize);
this.es = Executors.newVirtualThreadPerTaskExecutor();
}

@Override
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {
return se.scheduleAtFixedRate(command,initialDelay,period,unit);
}

@Override
public List<Runnable> shutdownNow() {
var x = es.shutdownNow();
x.addAll(se.shutdownNow());
return x;
}

@Override
public <T> Future<T> submit(Callable<T> task) {
return es.submit(task);
}
}
62 changes: 40 additions & 22 deletions src/main/java/com/p14n/postevent/broker/DefaultMessageBroker.java
Original file line number Diff line number Diff line change
@@ -1,20 +1,24 @@
package com.p14n.postevent.broker;

import java.util.concurrent.Callable;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.atomic.AtomicBoolean;

/**
* Default implementation of MessageBroker using thread-safe collections.
*
* @param <T> The type of messages this broker handles
*/
public class DefaultMessageBroker<T> implements MessageBroker<T> {
public abstract class DefaultMessageBroker<InT,OutT> implements MessageBroker<InT,OutT>,AutoCloseable {

protected final CopyOnWriteArraySet<MessageSubscriber<T>> subscribers = new CopyOnWriteArraySet<>();
private final AtomicBoolean closed = new AtomicBoolean(false);
protected final CopyOnWriteArraySet<MessageSubscriber<OutT>> subscribers = new CopyOnWriteArraySet<>();
protected final AtomicBoolean closed = new AtomicBoolean(false);

@Override
public void publish(T message) {
private final AsyncExecutor asyncExecutor;

public DefaultMessageBroker(){
this(new DefaultExecutor(2));
}
public DefaultMessageBroker(AsyncExecutor asyncExecutor) {
this.asyncExecutor = asyncExecutor;
}

protected boolean canProcess(InT message){
if (closed.get()) {
throw new IllegalStateException("Broker is closed");
}
Expand All @@ -25,25 +29,39 @@ public void publish(T message) {

// If no subscribers, message is silently dropped
if (subscribers.isEmpty()) {
return;
return false;
}
return true;

}

@Override
public void publish(InT message) {

if(!canProcess(message)){
return;
}
// Deliver to all subscribers
for (MessageSubscriber<T> subscriber : subscribers) {
try {
subscriber.onMessage(message);
} catch (Exception e) {
for (MessageSubscriber<OutT> subscriber : subscribers) {
asyncExecutor.submit(() -> {
try {
subscriber.onError(e);
} catch (Exception ignored) {
// If error handling fails, we ignore it to protect other subscribers
subscriber.onMessage(convert(message));
return null;
} catch (Exception e) {
try {
subscriber.onError(e);
} catch (Exception ignored) {
// If error handling fails, we ignore it to protect other subscribers
}
}
}
return null;
});

}
}

@Override
public boolean subscribe(MessageSubscriber<T> subscriber) {
public boolean subscribe(MessageSubscriber<OutT> subscriber) {
if (closed.get()) {
throw new IllegalStateException("Broker is closed");
}
Expand All @@ -56,7 +74,7 @@ public boolean subscribe(MessageSubscriber<T> subscriber) {
}

@Override
public boolean unsubscribe(MessageSubscriber<T> subscriber) {
public boolean unsubscribe(MessageSubscriber<OutT> subscriber) {
if (subscriber == null) {
throw new IllegalArgumentException("Subscriber cannot be null");
}
Expand All @@ -69,7 +87,7 @@ public void close() {
if (closed.compareAndSet(false, true)) {
// Notify all subscribers of shutdown
Throwable shutdownError = new IllegalStateException("Message broker is shutting down");
for (MessageSubscriber<T> subscriber : subscribers) {
for (MessageSubscriber<OutT> subscriber : subscribers) {
try {
subscriber.onError(shutdownError);
} catch (Exception ignored) {
Expand Down
10 changes: 10 additions & 0 deletions src/main/java/com/p14n/postevent/broker/EventMessageBroker.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package com.p14n.postevent.broker;

import com.p14n.postevent.data.Event;

public class EventMessageBroker extends DefaultMessageBroker<Event,Event> {
@Override
public Event convert(Event m) {
return m;
}
}
10 changes: 6 additions & 4 deletions src/main/java/com/p14n/postevent/broker/MessageBroker.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,32 +4,34 @@
* Thread-safe message broker interface for publishing messages and managing subscribers.
* @param <T> The type of messages this broker handles
*/
public interface MessageBroker<T> {
public interface MessageBroker<InT,OutT> {

/**
* Publishes a message to all current subscribers.
* If no subscribers are present, the message is silently dropped.
* @param message The message to publish
*/
void publish(T message);
void publish(InT message);

/**
* Adds a subscriber to receive messages.
* @param subscriber The subscriber to add
* @return true if the subscriber was added, false if it was already present
*/
boolean subscribe(MessageSubscriber<T> subscriber);
boolean subscribe(MessageSubscriber<OutT> subscriber);

/**
* Removes a subscriber from receiving messages.
* @param subscriber The subscriber to remove
* @return true if the subscriber was removed, false if it wasn't present
*/
boolean unsubscribe(MessageSubscriber<T> subscriber);
boolean unsubscribe(MessageSubscriber<OutT> subscriber);

/**
* Closes the broker and releases any resources.
* After closing, no more messages can be published or subscribers added.
*/
void close();

OutT convert(InT m);
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,7 @@ public interface MessageSubscriber<T> {
* Called when an error occurs that prevents further message processing.
* @param error The error that occurred
*/
void onError(Throwable error);
default void onError(Throwable error){

}
}
14 changes: 14 additions & 0 deletions src/main/java/com/p14n/postevent/broker/SystemEvent.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package com.p14n.postevent.broker;

public enum SystemEvent {

CatchupRequired,
UnprocessedCheckRequired;

public String topic;

public SystemEvent withTopic(String topic) {
this.topic = topic;
return this;
}
}
11 changes: 11 additions & 0 deletions src/main/java/com/p14n/postevent/broker/SystemEventBroker.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package com.p14n.postevent.broker;

public class SystemEventBroker extends
DefaultMessageBroker<SystemEvent, SystemEvent> implements AutoCloseable {

@Override
public SystemEvent convert(SystemEvent m) {
return m;
}

}
Loading