diff --git a/src/main/java/com/p14n/postevent/ConsumerClient.java b/src/main/java/com/p14n/postevent/ConsumerClient.java index 2224a47..70387b5 100644 --- a/src/main/java/com/p14n/postevent/ConsumerClient.java +++ b/src/main/java/com/p14n/postevent/ConsumerClient.java @@ -23,7 +23,7 @@ import java.sql.SQLException; import java.util.List; -import java.util.concurrent.CountDownLatch; +import java.util.Set; import java.util.concurrent.TimeUnit; public class ConsumerClient implements AutoCloseable, MessageBroker { @@ -32,26 +32,25 @@ public class ConsumerClient implements AutoCloseable, MessageBroker closeables; private TransactionalBroker tb; - private String topic; + SystemEventBroker seb; - public ConsumerClient(String topic, AsyncExecutor asyncExecutor) { + public ConsumerClient(AsyncExecutor asyncExecutor) { this.asyncExecutor = asyncExecutor; - this.topic = topic; } - public ConsumerClient(String topic) { - this(topic, new DefaultExecutor(2)); + public ConsumerClient() { + this(new DefaultExecutor(2)); } - public void start(DataSource ds, String host, int port) { - start(ds, ManagedChannelBuilder.forAddress(host, port) + public void start(Set topics, DataSource ds, String host, int port) { + start(topics, ds, ManagedChannelBuilder.forAddress(host, port) .keepAliveTime(1, TimeUnit.HOURS) .keepAliveTimeout(30, TimeUnit.SECONDS) .usePlaintext() .build()); } - public void start(DataSource ds, ManagedChannel channel) { + public void start(Set topics, DataSource ds, ManagedChannel channel) { logger.atInfo().log("Starting consumer client"); if (tb != null) { @@ -61,12 +60,14 @@ public void start(DataSource ds, ManagedChannel channel) { try { tb = new TransactionalBroker(ds, asyncExecutor); - var seb = new SystemEventBroker(asyncExecutor); + seb = new SystemEventBroker(asyncExecutor); var pb = new PersistentBroker<>(tb, ds, seb); - var client = new MessageBrokerGrpcClient(channel, topic); + var client = new MessageBrokerGrpcClient(channel); var catchupClient = new CatchupGrpcClient(channel); - client.subscribe(pb); + for (var topic : topics) { + client.subscribe(topic, pb); + } seb.subscribe(new CatchupService(ds, catchupClient, seb)); seb.subscribe(new UnprocessedSubmitter(ds, new UnprocessedEventFinder(), tb)); @@ -105,7 +106,7 @@ public void close() { } @Override - public void publish(TransactionalEvent message) { + public void publish(String topic, TransactionalEvent message) { try { Publisher.publish(message.event(), message.connection(), topic); } catch (SQLException e) { @@ -114,13 +115,15 @@ public void publish(TransactionalEvent message) { } @Override - public boolean subscribe(MessageSubscriber subscriber) { - return tb.subscribe(subscriber); + public boolean subscribe(String topic, MessageSubscriber subscriber) { + var subscribed = tb.subscribe(topic, subscriber); + seb.publish(SystemEvent.CatchupRequired.withTopic(topic)); + return subscribed; } @Override - public boolean unsubscribe(MessageSubscriber subscriber) { - return tb.unsubscribe(subscriber); + public boolean unsubscribe(String topic, MessageSubscriber subscriber) { + return tb.unsubscribe(topic, subscriber); } @Override diff --git a/src/main/java/com/p14n/postevent/LocalConsumer.java b/src/main/java/com/p14n/postevent/LocalConsumer.java index f74a57c..c824d40 100644 --- a/src/main/java/com/p14n/postevent/LocalConsumer.java +++ b/src/main/java/com/p14n/postevent/LocalConsumer.java @@ -10,7 +10,12 @@ import java.io.IOException; import java.util.function.Consumer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + public class LocalConsumer implements AutoCloseable { + + private static final Logger logger = LoggerFactory.getLogger(LocalConsumer.class); private final DebeziumServer debezium; private final MessageBroker broker; private final PostEventConfig config; @@ -24,18 +29,27 @@ public LocalConsumer(PostEventConfig config, MessageBroker broker) } public void start() throws IOException, InterruptedException { - db.setupAll(config.name()); - Consumer> consumer = record -> { - try { - Event event = changeEventToEvent(record); - if (event != null) { - broker.publish(event); + logger.atInfo() + .addArgument(config.topics()) // renamed from name() + .log("Starting local consumer for {}"); + + try { + db.setupAll(config.topics()); // renamed from name() + Consumer> consumer = record -> { + try { + Event event = changeEventToEvent(record); + if (event != null) { + broker.publish(event.topic(), event); + } + } catch (Exception e) { + throw new RuntimeException("Failed to process change event", e); } - } catch (Exception e) { - throw new RuntimeException("Failed to process change event", e); - } - }; - debezium.start(config, consumer); + }; + debezium.start(config, consumer); + } catch (IOException | InterruptedException e) { + logger.atError().setCause(e).log("Failed to start local consumer"); + throw e; + } } public void stop() throws IOException { @@ -46,4 +60,4 @@ public void stop() throws IOException { public void close() throws IOException { stop(); } -} \ No newline at end of file +} diff --git a/src/main/java/com/p14n/postevent/LocalPersistentConsumer.java b/src/main/java/com/p14n/postevent/LocalPersistentConsumer.java index cd403b2..2c297a4 100644 --- a/src/main/java/com/p14n/postevent/LocalPersistentConsumer.java +++ b/src/main/java/com/p14n/postevent/LocalPersistentConsumer.java @@ -90,22 +90,22 @@ public void close() { } @Override - public void publish(TransactionalEvent message) { + public void publish(String topic, TransactionalEvent message) { try { - Publisher.publish(message.event(), message.connection(), cfg.name()); + Publisher.publish(message.event(), message.connection(), message.event().topic()); // renamed from name() } catch (SQLException e) { throw new RuntimeException(e); } } @Override - public boolean subscribe(MessageSubscriber subscriber) { - return tb.subscribe(subscriber); + public boolean subscribe(String topic, MessageSubscriber subscriber) { + return tb.subscribe(topic, subscriber); } @Override - public boolean unsubscribe(MessageSubscriber subscriber) { - return tb.unsubscribe(subscriber); + public boolean unsubscribe(String topic, MessageSubscriber subscriber) { + return tb.unsubscribe(topic, subscriber); } @Override diff --git a/src/main/java/com/p14n/postevent/broker/DefaultMessageBroker.java b/src/main/java/com/p14n/postevent/broker/DefaultMessageBroker.java index fa0ed91..306977e 100644 --- a/src/main/java/com/p14n/postevent/broker/DefaultMessageBroker.java +++ b/src/main/java/com/p14n/postevent/broker/DefaultMessageBroker.java @@ -1,14 +1,14 @@ package com.p14n.postevent.broker; -import java.util.concurrent.Callable; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArraySet; import java.util.concurrent.atomic.AtomicBoolean; public abstract class DefaultMessageBroker implements MessageBroker, AutoCloseable { - protected final CopyOnWriteArraySet> subscribers = new CopyOnWriteArraySet<>(); + protected final ConcurrentHashMap>> topicSubscribers = new ConcurrentHashMap<>(); protected final AtomicBoolean closed = new AtomicBoolean(false); - private final AsyncExecutor asyncExecutor; public DefaultMessageBroker() { @@ -19,7 +19,7 @@ public DefaultMessageBroker(AsyncExecutor asyncExecutor) { this.asyncExecutor = asyncExecutor; } - protected boolean canProcess(InT message) { + protected boolean canProcess(String topic, InT message) { if (closed.get()) { throw new IllegalStateException("Broker is closed"); } @@ -28,42 +28,43 @@ protected boolean canProcess(InT message) { throw new IllegalArgumentException("Message cannot be null"); } - // If no subscribers, message is silently dropped - if (subscribers.isEmpty()) { - return false; + if (topic == null) { + throw new IllegalArgumentException("Topic cannot be null"); } - return true; + // If no subscribers for this topic, message is silently dropped + return topicSubscribers.containsKey(topic) && !topicSubscribers.get(topic).isEmpty(); } @Override - public void publish(InT message) { - - if (!canProcess(message)) { + public void publish(String topic, InT message) { + if (!canProcess(topic, message)) { return; } - // Deliver to all subscribers - for (MessageSubscriber subscriber : subscribers) { - asyncExecutor.submit(() -> { - try { - subscriber.onMessage(convert(message)); - return null; - } catch (Exception e) { + // Deliver to all subscribers for this topic + Set> subscribers = topicSubscribers.get(topic); + if (subscribers != null) { + for (MessageSubscriber subscriber : subscribers) { + asyncExecutor.submit(() -> { try { - subscriber.onError(e); - } catch (Exception ignored) { - // If error handling fails, we ignore it to protect other subscribers + subscriber.onMessage(convert(message)); + return null; + } catch (Exception e) { + try { + subscriber.onError(e); + } catch (Exception ignored) { + // If error handling fails, we ignore it to protect other subscribers + } } - } - return null; - }); - + return null; + }); + } } } @Override - public boolean subscribe(MessageSubscriber subscriber) { + public boolean subscribe(String topic, MessageSubscriber subscriber) { if (closed.get()) { throw new IllegalStateException("Broker is closed"); } @@ -72,31 +73,39 @@ public boolean subscribe(MessageSubscriber subscriber) { throw new IllegalArgumentException("Subscriber cannot be null"); } - return subscribers.add(subscriber); + if (topic == null) { + throw new IllegalArgumentException("Topic cannot be null"); + } + + return topicSubscribers + .computeIfAbsent(topic, k -> new CopyOnWriteArraySet<>()) + .add(subscriber); } @Override - public boolean unsubscribe(MessageSubscriber subscriber) { + public boolean unsubscribe(String topic, MessageSubscriber subscriber) { if (subscriber == null) { throw new IllegalArgumentException("Subscriber cannot be null"); } - return subscribers.remove(subscriber); + if (topic == null) { + throw new IllegalArgumentException("Topic cannot be null"); + } + + Set> subscribers = topicSubscribers.get(topic); + if (subscribers != null) { + boolean removed = subscribers.remove(subscriber); + if (subscribers.isEmpty()) { + topicSubscribers.remove(topic); + } + return removed; + } + return false; } @Override public void close() { - if (closed.compareAndSet(false, true)) { - // Notify all subscribers of shutdown - Throwable shutdownError = new IllegalStateException("Message broker is shutting down"); - for (MessageSubscriber subscriber : subscribers) { - try { - subscriber.onError(shutdownError); - } catch (Exception ignored) { - // Ignore errors during shutdown notification - } - } - subscribers.clear(); - } + closed.set(true); + topicSubscribers.clear(); } } diff --git a/src/main/java/com/p14n/postevent/broker/MessageBroker.java b/src/main/java/com/p14n/postevent/broker/MessageBroker.java index 85382b8..bfad677 100644 --- a/src/main/java/com/p14n/postevent/broker/MessageBroker.java +++ b/src/main/java/com/p14n/postevent/broker/MessageBroker.java @@ -1,37 +1,52 @@ package com.p14n.postevent.broker; /** - * Thread-safe message broker interface for publishing messages and managing subscribers. - * @param The type of messages this broker handles + * Thread-safe message broker interface for publishing messages and managing + * subscribers. + * + * @param The type of messages this broker accepts + * @param The type of messages this broker delivers to subscribers */ -public interface MessageBroker { - +public interface MessageBroker { + /** - * Publishes a message to all current subscribers. - * If no subscribers are present, the message is silently dropped. + * Publishes a message to all subscribers of the specified topic. + * If no subscribers are present for the topic, the message is silently dropped. + * + * @param topic The topic to publish to * @param message The message to publish */ - void publish(InT message); - + void publish(String topic, InT message); + /** - * Adds a subscriber to receive messages. + * Adds a subscriber to receive messages for the specified topic. + * + * @param topic The topic to subscribe to * @param subscriber The subscriber to add * @return true if the subscriber was added, false if it was already present */ - boolean subscribe(MessageSubscriber subscriber); - + boolean subscribe(String topic, MessageSubscriber subscriber); + /** - * Removes a subscriber from receiving messages. + * Removes a subscriber from receiving messages for the specified topic. + * + * @param topic The topic to unsubscribe from * @param subscriber The subscriber to remove * @return true if the subscriber was removed, false if it wasn't present */ - boolean unsubscribe(MessageSubscriber subscriber); - + boolean unsubscribe(String topic, MessageSubscriber subscriber); + /** * Closes the broker and releases any resources. * After closing, no more messages can be published or subscribers added. */ void close(); + /** + * Converts an incoming message to the outgoing message type + * + * @param m The message to convert + * @return The converted message + */ OutT convert(InT m); } diff --git a/src/main/java/com/p14n/postevent/broker/SystemEventBroker.java b/src/main/java/com/p14n/postevent/broker/SystemEventBroker.java index 6aa513e..5e9b4d1 100644 --- a/src/main/java/com/p14n/postevent/broker/SystemEventBroker.java +++ b/src/main/java/com/p14n/postevent/broker/SystemEventBroker.java @@ -16,4 +16,12 @@ public SystemEvent convert(SystemEvent m) { return m; } + public void publish(SystemEvent event) { + publish("system", event); + } + + public void subscribe(MessageSubscriber subscriber) { + subscribe("system", subscriber); + } + } diff --git a/src/main/java/com/p14n/postevent/broker/TransactionalBroker.java b/src/main/java/com/p14n/postevent/broker/TransactionalBroker.java index 913ae58..331bbc9 100644 --- a/src/main/java/com/p14n/postevent/broker/TransactionalBroker.java +++ b/src/main/java/com/p14n/postevent/broker/TransactionalBroker.java @@ -20,14 +20,13 @@ public TransactionalBroker(DataSource ds) { } @Override - public void publish(Event message) { - - if (!canProcess(message)) { + public void publish(String topic, Event message) { + if (!canProcess(topic, message)) { return; } // Deliver to all subscribers - for (MessageSubscriber subscriber : subscribers) { + for (MessageSubscriber subscriber : topicSubscribers.get(topic)) { try (Connection c = ds.getConnection()) { var op = new OrderedProcessor((connection, event) -> { try { diff --git a/src/main/java/com/p14n/postevent/broker/grpc/MessageBrokerGrpcClient.java b/src/main/java/com/p14n/postevent/broker/grpc/MessageBrokerGrpcClient.java index 01d039f..d2b493a 100644 --- a/src/main/java/com/p14n/postevent/broker/grpc/MessageBrokerGrpcClient.java +++ b/src/main/java/com/p14n/postevent/broker/grpc/MessageBrokerGrpcClient.java @@ -1,46 +1,42 @@ package com.p14n.postevent.broker.grpc; import com.p14n.postevent.broker.EventMessageBroker; -import com.p14n.postevent.broker.MessageBroker; import com.p14n.postevent.broker.MessageSubscriber; -import com.p14n.postevent.catchup.CatchupServer; import com.p14n.postevent.data.Event; import io.grpc.ManagedChannel; import io.grpc.ManagedChannelBuilder; import io.grpc.stub.StreamObserver; +import java.time.Instant; import java.time.OffsetDateTime; -import java.util.concurrent.CountDownLatch; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class MessageBrokerGrpcClient extends EventMessageBroker implements AutoCloseable { +public class MessageBrokerGrpcClient extends EventMessageBroker { private static final Logger logger = LoggerFactory.getLogger(MessageBrokerGrpcClient.class); private final MessageBrokerServiceGrpc.MessageBrokerServiceStub asyncStub; - private final AtomicBoolean subscribed = new AtomicBoolean(false); + private final Set subscribed = ConcurrentHashMap.newKeySet();; ManagedChannel channel; - String topic; - public MessageBrokerGrpcClient(String host, int port, String topic) { + public MessageBrokerGrpcClient(String host, int port) { this(ManagedChannelBuilder.forAddress(host, port) .keepAliveTime(1, TimeUnit.HOURS) .keepAliveTimeout(30, TimeUnit.SECONDS) .usePlaintext() - .build(), topic); + .build()); } - public MessageBrokerGrpcClient(ManagedChannel channel, String topic) { + public MessageBrokerGrpcClient(ManagedChannel channel) { this.channel = channel; this.asyncStub = MessageBrokerServiceGrpc.newStub(channel); - this.topic = topic; } - public void subscribeToEvents() { + public void subscribeToEvents(String topic) { SubscriptionRequest request = SubscriptionRequest.newBuilder() .setTopic(topic) .build(); @@ -51,7 +47,7 @@ public void onNext(EventResponse response) { try { logger.atDebug().log(() -> "Received event: " + response.getId()); Event event = convertFromGrpcEvent(response); - publish(event); + publish(topic, event); } catch (Exception e) { logger.atError().setCause(e).log("Error processing event"); } @@ -60,18 +56,18 @@ public void onNext(EventResponse response) { @Override public void onError(Throwable t) { logger.atError().setCause(t).log("Error in event stream"); - subscribed.set(false); + subscribed.remove(topic); } @Override public void onCompleted() { logger.atInfo().log("Stream completed"); - subscribed.set(false); + subscribed.remove(topic); } }; asyncStub.subscribeToEvents(request, responseObserver); - subscribed.set(true); + subscribed.add(topic); // Send the subscription request } @@ -89,21 +85,21 @@ private Event convertFromGrpcEvent(EventResponse grpcEvent) { grpcEvent.getDataSchema(), grpcEvent.getSubject(), grpcEvent.getData().toByteArray(), - time.toInstant(), + time == null ? Instant.now() : time.toInstant(), grpcEvent.getIdn(), grpcEvent.getTopic()); } @Override - public void publish(Event message) { - super.publish(message); + public void publish(String topic, Event message) { + super.publish(topic, message); } @Override - public boolean subscribe(MessageSubscriber subscriber) { - if (super.subscribe(subscriber)) { - if (!subscribed.get()) { - subscribeToEvents(); + public boolean subscribe(String topic, MessageSubscriber subscriber) { + if (super.subscribe(topic, subscriber)) { + if (!subscribed.contains(topic)) { + subscribeToEvents(topic); } return true; } @@ -111,10 +107,10 @@ public boolean subscribe(MessageSubscriber subscriber) { } @Override - public boolean unsubscribe(MessageSubscriber subscriber) { - boolean unsubscribed = super.unsubscribe(subscriber); - if (subscribers.isEmpty()) { - subscribed.set(false); + public boolean unsubscribe(String topic, MessageSubscriber subscriber) { + boolean unsubscribed = super.unsubscribe(topic, subscriber); + if (topicSubscribers.isEmpty()) { + subscribed.remove(topic); this.channel.shutdownNow(); try { this.channel.awaitTermination(5, TimeUnit.SECONDS); diff --git a/src/main/java/com/p14n/postevent/broker/grpc/MessageBrokerGrpcServer.java b/src/main/java/com/p14n/postevent/broker/grpc/MessageBrokerGrpcServer.java index 42a3c00..64d6274 100644 --- a/src/main/java/com/p14n/postevent/broker/grpc/MessageBrokerGrpcServer.java +++ b/src/main/java/com/p14n/postevent/broker/grpc/MessageBrokerGrpcServer.java @@ -78,10 +78,10 @@ public void onError(Throwable error) { ServerCallStreamObserver responseCallObserver = (ServerCallStreamObserver) responseObserver; responseCallObserver.setOnCancelHandler(() -> { cancelled.set(true); - messageBroker.unsubscribe(subscriber); + messageBroker.unsubscribe(topic, subscriber); logger.atInfo().log("Unsubscribed from topic: {}", topic); }); - messageBroker.subscribe(subscriber); + messageBroker.subscribe(topic, subscriber); logger.atInfo().log("Subscribed to topic: {}", topic); } catch (Exception e) { diff --git a/src/main/java/com/p14n/postevent/catchup/CatchupServer.java b/src/main/java/com/p14n/postevent/catchup/CatchupServer.java index 9b05927..ab4a316 100644 --- a/src/main/java/com/p14n/postevent/catchup/CatchupServer.java +++ b/src/main/java/com/p14n/postevent/catchup/CatchupServer.java @@ -1,6 +1,5 @@ package com.p14n.postevent.catchup; -import com.p14n.postevent.ConsumerServer; import com.p14n.postevent.data.Event; import com.p14n.postevent.db.SQL; diff --git a/src/main/java/com/p14n/postevent/catchup/CatchupService.java b/src/main/java/com/p14n/postevent/catchup/CatchupService.java index fa13a40..45b2281 100644 --- a/src/main/java/com/p14n/postevent/catchup/CatchupService.java +++ b/src/main/java/com/p14n/postevent/catchup/CatchupService.java @@ -54,13 +54,16 @@ public int catchup(String topicName) { long currentHwm = getCurrentHwm(conn, topicName); // Find the next gap end (lowest idn greater than hwm) - long gapEnd = findGapEnd(conn, currentHwm); + long gapEnd = findGapEnd(conn, currentHwm, topicName); LOGGER.info(String.format("Current HWM %d highest message in gap %d", currentHwm, gapEnd)); - if (gapEnd <= currentHwm) { - LOGGER.info("No new events to process for topic: " + topicName); + if (gapEnd <= (currentHwm + 1)) { + LOGGER.info("No new gap events to process for topic: " + topicName); + if (updateHwmToLastContiguous(topicName, currentHwm, conn)) { + systemEventBroker.publish(SystemEvent.UnprocessedCheckRequired); + } return 0; } @@ -119,11 +122,12 @@ private void initializeHwm(Connection connection, String topicName) throws SQLEx getCurrentHwm(connection, topicName); } - private long findGapEnd(Connection connection, long currentHwm) throws SQLException { - String sql = "SELECT MIN(idn) as next_idn FROM postevent.messages WHERE idn > ?"; + private long findGapEnd(Connection connection, long currentHwm, String topicName) throws SQLException { + String sql = "SELECT MIN(idn) as next_idn FROM postevent.messages WHERE topic=? AND idn > ?"; try (PreparedStatement stmt = connection.prepareStatement(sql)) { - stmt.setLong(1, currentHwm); + stmt.setString(1, topicName); + stmt.setLong(2, currentHwm); try (ResultSet rs = stmt.executeQuery()) { if (rs.next() && rs.getObject("next_idn") != null) { @@ -213,7 +217,7 @@ private GapCheckResult processMessages(ResultSet rs, long currentHwm) throws SQL while (rs.next()) { long actualIdn = rs.getLong("idn"); if (actualIdn > expectedNext) { - LOGGER.info("Gap found: Expected {0}, found {1} (gap of {2})", + LOGGER.info("Gap found: Expected {}, found {} (gap of {})", new Object[] { expectedNext, actualIdn, actualIdn - expectedNext }); return new GapCheckResult(true, lastContiguousIdn); } @@ -233,24 +237,33 @@ private GapCheckResult processMessages(ResultSet rs, long currentHwm) throws SQL * @throws SQLException If a database error occurs */ public boolean hasSequenceGap(String topicName, long currentHwm) throws SQLException { - LOGGER.debug("Checking for sequence gaps after HWM {0} for topic {1}", + + try (Connection connection = datasource.getConnection()) { + + connection.setAutoCommit(false); + var result = hasSequenceGap(topicName, currentHwm, connection); + connection.commit(); + return result; + } + } + + public boolean hasSequenceGap(String topicName, long currentHwm, Connection connection) throws SQLException { + LOGGER.info("Checking for sequence gaps after HWM {} for topic {}", new Object[] { currentHwm, topicName }); String sql = "SELECT idn FROM postevent.messages WHERE idn > ? ORDER BY idn"; - try (Connection connection = datasource.getConnection(); - PreparedStatement stmt = connection.prepareStatement(sql)) { - connection.setAutoCommit(false); + try (PreparedStatement stmt = connection.prepareStatement(sql)) { stmt.setLong(1, currentHwm); try (ResultSet rs = stmt.executeQuery()) { GapCheckResult result = processMessages(rs, currentHwm); if (result.lastContiguousIdn > currentHwm) { - LOGGER.debug("Updating HWM from {0} to {1} for topic {2}", + LOGGER.info("Updating HWM from {} to {} for topic {}", new Object[] { currentHwm, result.lastContiguousIdn, topicName }); updateHwm(connection, topicName, currentHwm, result.lastContiguousIdn); } if (!result.gapFound) { - LOGGER.debug("No sequence gaps found after HWM for topic {0}", + LOGGER.info("No sequence gaps found after HWM for topic {}", new Object[] { topicName }); } connection.commit(); @@ -258,4 +271,26 @@ public boolean hasSequenceGap(String topicName, long currentHwm) throws SQLExcep } } } + + public boolean updateHwmToLastContiguous(String topicName, long currentHwm, Connection connection) + throws SQLException { + String sql = "SELECT idn FROM postevent.messages WHERE topic = ? AND idn > ? ORDER BY idn"; + + try (PreparedStatement stmt = connection.prepareStatement(sql)) { + stmt.setString(1, topicName); + stmt.setLong(2, currentHwm); + try (ResultSet rs = stmt.executeQuery()) { + GapCheckResult result = processMessages(rs, currentHwm); + boolean shouldUpdate = result.lastContiguousIdn > currentHwm; + if (shouldUpdate) { + LOGGER.info("Updating HWM from {} to {} for topic {}", + new Object[] { currentHwm, result.lastContiguousIdn, topicName }); + updateHwm(connection, topicName, currentHwm, result.lastContiguousIdn); + } + connection.commit(); + return shouldUpdate; + } + } + } + } \ No newline at end of file diff --git a/src/main/java/com/p14n/postevent/catchup/PersistentBroker.java b/src/main/java/com/p14n/postevent/catchup/PersistentBroker.java index 29cc28d..15f713f 100644 --- a/src/main/java/com/p14n/postevent/catchup/PersistentBroker.java +++ b/src/main/java/com/p14n/postevent/catchup/PersistentBroker.java @@ -16,7 +16,7 @@ public class PersistentBroker implements MessageBroker, AutoCloseable, MessageSubscriber { private static final Logger logger = LoggerFactory.getLogger(PersistentBroker.class); private static final String INSERT_SQL = "INSERT INTO postevent.messages (" + SQL.EXT_COLS + - ") VALUES (" + SQL.EXT_PH + ")"; + ") VALUES (" + SQL.EXT_PH + ") ON CONFLICT DO NOTHING"; private static final String UPDATE_HWM_SQL = "UPDATE postevent.contiguous_hwm set hwm=? where topic_name=? and hwm=?"; private final MessageBroker targetBroker; @@ -33,7 +33,7 @@ public PersistentBroker(MessageBroker targetBroker, } @Override - public void publish(Event event) { + public void publish(String topic, Event event) { Connection conn = null; try { conn = dataSource.getConnection(); @@ -62,10 +62,11 @@ public void publish(Event event) { // Forward to actual subscriber after successful persistence if (updates > 0) { logger.atDebug().log("Forwarding event to target broker"); - targetBroker.publish(event); + targetBroker.publish(topic, event); } } catch (SQLException e) { + logger.atError().setCause(e).log("Error persisting and forwarding event"); SQL.handleSQLException(e, conn); throw new RuntimeException("Failed to persist and forward event", e); } finally { @@ -74,13 +75,13 @@ public void publish(Event event) { } @Override - public boolean subscribe(MessageSubscriber subscriber) { - return targetBroker.subscribe(subscriber); + public boolean subscribe(String topic, MessageSubscriber subscriber) { + return targetBroker.subscribe(topic, subscriber); } @Override - public boolean unsubscribe(MessageSubscriber subscriber) { - return targetBroker.unsubscribe(subscriber); + public boolean unsubscribe(String topic, MessageSubscriber subscriber) { + return targetBroker.unsubscribe(topic, subscriber); } @Override @@ -96,6 +97,6 @@ public OutT convert(Event m) { @Override public void onMessage(Event message) { logger.atDebug().log("Received event for persistence"); - publish(message); + publish(message.topic(), message); } } diff --git a/src/main/java/com/p14n/postevent/catchup/UnprocessedSubmitter.java b/src/main/java/com/p14n/postevent/catchup/UnprocessedSubmitter.java index ea8efe9..8a0f730 100644 --- a/src/main/java/com/p14n/postevent/catchup/UnprocessedSubmitter.java +++ b/src/main/java/com/p14n/postevent/catchup/UnprocessedSubmitter.java @@ -27,7 +27,7 @@ private void resubmit() { try (Connection c = ds.getConnection()) { var events = unprocessedEventFinder.findUnprocessedEvents(c); for (var e : events) { - targetBroker.publish(e); + targetBroker.publish(e.topic(), e); } } catch (SQLException e) { throw new RuntimeException(e); diff --git a/src/main/java/com/p14n/postevent/data/ConfigData.java b/src/main/java/com/p14n/postevent/data/ConfigData.java index b165014..54472d8 100644 --- a/src/main/java/com/p14n/postevent/data/ConfigData.java +++ b/src/main/java/com/p14n/postevent/data/ConfigData.java @@ -1,9 +1,10 @@ package com.p14n.postevent.data; import java.util.Properties; +import java.util.Set; public record ConfigData(String affinity, - String name, + Set topics, // renamed from name String dbHost, int dbPort, String dbUser, @@ -11,13 +12,12 @@ public record ConfigData(String affinity, String dbName, Properties overrideProps) implements PostEventConfig { public ConfigData(String affinity, - String name, - String dbHost, - int dbPort, - String dbUser, - String dbPassword, - String dbName){ - this(affinity,name,dbHost,dbPort,dbUser,dbPassword,dbName,null); - + Set topics, // renamed from name + String dbHost, + int dbPort, + String dbUser, + String dbPassword, + String dbName) { + this(affinity, topics, dbHost, dbPort, dbUser, dbPassword, dbName, null); } } diff --git a/src/main/java/com/p14n/postevent/data/PostEventConfig.java b/src/main/java/com/p14n/postevent/data/PostEventConfig.java index c82f19c..cef1a27 100644 --- a/src/main/java/com/p14n/postevent/data/PostEventConfig.java +++ b/src/main/java/com/p14n/postevent/data/PostEventConfig.java @@ -1,11 +1,12 @@ package com.p14n.postevent.data; import java.util.Properties; +import java.util.Set; public interface PostEventConfig { public String affinity(); - public String name(); + public Set topics(); // Changed from single topic to set public String dbHost(); @@ -22,8 +23,9 @@ public interface PostEventConfig { public default int startupTimeoutSeconds() { return 30; } + public default String jdbcUrl() { return String.format("jdbc:postgresql://%s:%d/%s", - dbHost(),dbPort(),dbName()); + dbHost(), dbPort(), dbName()); } } diff --git a/src/main/java/com/p14n/postevent/data/UnprocessedEventFinder.java b/src/main/java/com/p14n/postevent/data/UnprocessedEventFinder.java index 35dae8e..c97efce 100644 --- a/src/main/java/com/p14n/postevent/data/UnprocessedEventFinder.java +++ b/src/main/java/com/p14n/postevent/data/UnprocessedEventFinder.java @@ -5,15 +5,11 @@ import java.sql.ResultSet; import java.sql.SQLException; import java.time.Instant; -import java.time.OffsetDateTime; -import java.time.ZoneId; import java.util.ArrayList; import java.util.List; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.p14n.postevent.broker.grpc.MessageBrokerGrpcClient; - /** * Responsible for finding all unprocessed events in the messages table. * Unprocessed events have a status of 'u'. diff --git a/src/main/java/com/p14n/postevent/db/DatabaseSetup.java b/src/main/java/com/p14n/postevent/db/DatabaseSetup.java index 9253b10..8e91450 100644 --- a/src/main/java/com/p14n/postevent/db/DatabaseSetup.java +++ b/src/main/java/com/p14n/postevent/db/DatabaseSetup.java @@ -1,7 +1,6 @@ package com.p14n.postevent.db; import com.p14n.postevent.data.PostEventConfig; -import com.p14n.postevent.data.UnprocessedEventFinder; import com.zaxxer.hikari.HikariDataSource; import javax.sql.DataSource; @@ -13,6 +12,7 @@ import java.sql.DriverManager; import java.sql.SQLException; import java.sql.Statement; +import java.util.Set; public class DatabaseSetup { @@ -32,11 +32,16 @@ public DatabaseSetup(String jdbcUrl, String username, String password) { this.password = password; } - public void setupAll(String topic) { + public DatabaseSetup setupAll(Set topics) { createSchemaIfNotExists(); createMessagesTableIfNotExists(); createContiguousHwmTableIfNotExists(); - createTableIfNotExists(topic); + topics.stream().forEach(this::createTableIfNotExists); + return this; + } + + public DatabaseSetup setupAll(String topic) { + return setupAll(Set.of(topic)); } public DatabaseSetup createSchemaIfNotExists() { @@ -95,7 +100,7 @@ public DatabaseSetup createMessagesTableIfNotExists() { String sql = """ CREATE TABLE IF NOT EXISTS postevent.messages ( - idn bigint PRIMARY KEY NOT NULL, + idn bigint NOT NULL, topic VARCHAR(255) NOT NULL, id VARCHAR(255), source VARCHAR(1024), @@ -106,7 +111,7 @@ subject VARCHAR(255), data bytea, time TIMESTAMP WITH TIME ZONE, status VARCHAR(1) DEFAULT 'u', - UNIQUE (idn, topic) + PRIMARY KEY (topic,idn) )"""; stmt.execute(sql); diff --git a/src/main/java/com/p14n/postevent/debezium/DebeziumServer.java b/src/main/java/com/p14n/postevent/debezium/DebeziumServer.java index 5e7ea7f..9c16bd6 100644 --- a/src/main/java/com/p14n/postevent/debezium/DebeziumServer.java +++ b/src/main/java/com/p14n/postevent/debezium/DebeziumServer.java @@ -1,6 +1,8 @@ package com.p14n.postevent.debezium; import java.util.Properties; +import java.util.Set; +import java.util.stream.Collectors; import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.p14n.postevent.data.PostEventConfig; @@ -22,15 +24,20 @@ public class DebeziumServer { public static Properties props( String affinity, - String name, + Set topics, String dbHost, String dbPort, String dbUser, String dbPassword, String dbName) { final Properties props = new Properties(); - var affinityid = name + "_" + affinity; - props.setProperty("name", "postevent-" + name); + + // Create comma-separated list of tables + String tableList = topics.stream() + .map(topic -> "postevent." + topic) + .collect(Collectors.joining(",")); + + props.setProperty("name", "postevent-multi"); props.setProperty("connector.class", "io.debezium.connector.postgresql.PostgresConnector"); props.setProperty("offset.storage", "io.debezium.storage.jdbc.offset.JdbcOffsetBackingStore"); props.setProperty("offset.storage.jdbc.offset.table.name", "postevent.offsets"); @@ -45,11 +52,11 @@ public static Properties props( props.setProperty("database.user", dbUser); props.setProperty("database.password", dbPassword); props.setProperty("database.dbname", dbName); - props.setProperty("table.include.list", "postevent." + name); - props.setProperty("topic.prefix", "postevent-" + name); + props.setProperty("table.include.list", tableList); + props.setProperty("topic.prefix", "postevent"); props.setProperty("publication.autocreate.mode", "filtered"); props.setProperty("snapshot.mode", "no_data"); - props.setProperty("slot.name", "postevent_" + name + "_" + affinity); + props.setProperty("slot.name", "postevent_" + affinity); props.setProperty("offset.storage.jdbc.offset.table.ddl", "CREATE TABLE IF NOT EXISTS %s (affinityid VARCHAR(255) NOT NULL, id VARCHAR(36) NOT NULL, " + @@ -58,13 +65,13 @@ public static Properties props( "record_insert_seq INTEGER NOT NULL" + ")"); props.setProperty("offset.storage.jdbc.offset.table.select", - "SELECT id, offset_key, offset_val FROM %s WHERE affinityid = '" + affinityid + "SELECT id, offset_key, offset_val FROM %s WHERE affinityid = '" + affinity + "' ORDER BY record_insert_ts, record_insert_seq"); props.setProperty("offset.storage.jdbc.offset.table.delete", - "DELETE FROM %s WHERE affinityid = '" + affinityid + "'"); + "DELETE FROM %s WHERE affinityid = '" + affinity + "'"); props.setProperty("offset.storage.jdbc.offset.table.insert", "INSERT INTO %s(affinityid, id, offset_key, offset_val, record_insert_ts, record_insert_seq) VALUES ( '" - + affinityid + "', ?, ?, ?, ?, ? )"); + + affinity + "', ?, ?, ?, ?, ? )"); return props; } @@ -80,9 +87,9 @@ public void start(PostEventConfig cfg, throw new IllegalStateException("Config must be set before starting the engine"); } logger.atInfo() - .addArgument(cfg.name()) + .addArgument(cfg.topics()) .addArgument(cfg.affinity()) - .log("Starting Debezium engine for {} with affinity {}"); + .log("Starting Debezium engine for topics {} with affinity {}"); var started = new CountDownLatch(1); engine = DebeziumEngine.create(Json.class) .using(new DebeziumEngine.ConnectorCallback() { @@ -93,7 +100,7 @@ public void taskStarted() { } }) .using(cfg.overrideProps() != null ? cfg.overrideProps() - : props(cfg.affinity(), cfg.name(), cfg.dbHost(), + : props(cfg.affinity(), cfg.topics(), cfg.dbHost(), String.valueOf(cfg.dbPort()), cfg.dbUser(), cfg.dbPassword(), cfg.dbName())) diff --git a/src/main/java/com/p14n/postevent/debezium/Functions.java b/src/main/java/com/p14n/postevent/debezium/Functions.java index 7a607c8..a4847eb 100644 --- a/src/main/java/com/p14n/postevent/debezium/Functions.java +++ b/src/main/java/com/p14n/postevent/debezium/Functions.java @@ -14,7 +14,7 @@ public static Event changeEventToEvent(ChangeEvent record) throw var actualObj = mapper.readTree(record.value()); var payload = actualObj.get("payload"); var r = payload != null ? payload.get("after") : null; - if (r != null && + if (r != null && payload != null && payload.get("source") != null && payload.get("source").get("table") != null) { var topic = payload.get("source").get("table"); diff --git a/src/main/java/com/p14n/postevent/processor/OrderedProcessor.java b/src/main/java/com/p14n/postevent/processor/OrderedProcessor.java index ca8397e..ef87623 100644 --- a/src/main/java/com/p14n/postevent/processor/OrderedProcessor.java +++ b/src/main/java/com/p14n/postevent/processor/OrderedProcessor.java @@ -1,8 +1,6 @@ package com.p14n.postevent.processor; import com.p14n.postevent.data.Event; -import com.p14n.postevent.db.DatabaseSetup; - import java.sql.Connection; import java.sql.PreparedStatement; import java.sql.ResultSet; @@ -60,8 +58,8 @@ public boolean process(Connection connection, Event event) { return false; } if (!previousEventExists(connection, event)) { - logger.atDebug().log(() -> "Skipping event " + event.id() + " (idn: " + event.idn() + ")" - + " as the previous event has not reached the client"); + logger.atDebug().log(() -> "Skipping event " + event.id() + " (idn: " + event.idn() + ") topic " + + event.topic() + " as the previous event has not reached the client"); return false; } @@ -71,7 +69,8 @@ public boolean process(Connection connection, Event event) { if (success) { connection.commit(); logger.atDebug() - .log(() -> "Successfully processed event " + event.id() + " (idn: " + event.idn() + ")"); + .log(() -> "Successfully processed event " + event.id() + " (idn: " + event.idn() + ") " + + event.topic()); return true; } else { return false; // Already rolled back in processEventWithFunction @@ -94,11 +93,12 @@ public boolean process(Connection connection, Event event) { */ private boolean hasUnprocessedPriorEvents(Connection connection, Event event) throws SQLException { String sql = "SELECT COUNT(*) FROM postevent.messages " + - "WHERE subject = ? AND idn < ? AND status != 'p'"; + "WHERE subject = ? AND topic = ? AND idn < ? AND status != 'p'"; try (PreparedStatement stmt = connection.prepareStatement(sql)) { stmt.setString(1, event.subject()); - stmt.setLong(2, event.idn()); + stmt.setString(2, event.topic()); + stmt.setLong(3, event.idn()); try (ResultSet rs = stmt.executeQuery()) { if (rs.next()) { @@ -145,10 +145,11 @@ private boolean previousEventExists(Connection connection, Event event) throws S */ private boolean updateEventStatus(Connection connection, Event event) throws SQLException { String sql = "UPDATE postevent.messages SET status = 'p' " + - "WHERE idn = ? AND status = 'u'"; + "WHERE idn = ? AND topic = ? AND status = 'u'"; try (PreparedStatement stmt = connection.prepareStatement(sql)) { stmt.setLong(1, event.idn()); + stmt.setString(2, event.topic()); int updatedRows = stmt.executeUpdate(); return updatedRows > 0; diff --git a/src/main/resources/logback.xml b/src/main/resources/logback.xml index 2c91ff3..6e44c64 100644 --- a/src/main/resources/logback.xml +++ b/src/main/resources/logback.xml @@ -8,4 +8,6 @@ + + diff --git a/src/test/java/com/p14n/postevent/CatchupServerTest.java b/src/test/java/com/p14n/postevent/CatchupServerTest.java index 4c250b4..0379942 100644 --- a/src/test/java/com/p14n/postevent/CatchupServerTest.java +++ b/src/test/java/com/p14n/postevent/CatchupServerTest.java @@ -11,8 +11,6 @@ import java.sql.Connection; import java.sql.DriverManager; import java.util.List; -import java.util.UUID; - import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -57,7 +55,7 @@ public void testFetchEvents() throws Exception { } } // Fetch events - List events = catchupServer.fetchEvents(1, 5, 10,TEST_TOPIC); + List events = catchupServer.fetchEvents(1, 5, 10, TEST_TOPIC); // Verify results assertEquals(4, events.size()); @@ -75,17 +73,17 @@ public void testMaxResultsIsRespected() throws Exception { } // Test with different maxResults values - List events1 = catchupServer.fetchEvents(1, 20, 5,TEST_TOPIC); + List events1 = catchupServer.fetchEvents(1, 20, 5, TEST_TOPIC); assertEquals(5, events1.size()); - List events2 = catchupServer.fetchEvents(1, 20, 10,TEST_TOPIC); + List events2 = catchupServer.fetchEvents(1, 20, 10, TEST_TOPIC); assertEquals(10, events2.size()); - List events3 = catchupServer.fetchEvents(1, 20, 15,TEST_TOPIC); + List events3 = catchupServer.fetchEvents(1, 20, 15, TEST_TOPIC); assertEquals(15, events3.size()); // When maxResults is greater than available events - List events4 = catchupServer.fetchEvents(10, 20, 20,TEST_TOPIC); + List events4 = catchupServer.fetchEvents(10, 20, 20, TEST_TOPIC); assertEquals(10, events4.size()); } @@ -101,7 +99,7 @@ public void testMaxResultsLimitsLargeRange() throws Exception { } // Request a large range but limit with maxResults - List events = catchupServer.fetchEvents(0, 50, 25,TEST_TOPIC); + List events = catchupServer.fetchEvents(0, 50, 25, TEST_TOPIC); // Verify maxResults is respected assertEquals(25, events.size()); @@ -119,9 +117,9 @@ public void testMaxResultsLimitsLargeRange() throws Exception { @Test public void testInvalidParameters() { // Test invalid start/end - assertThrows(IllegalArgumentException.class, () -> catchupServer.fetchEvents(10, 5, 10,TEST_TOPIC)); + assertThrows(IllegalArgumentException.class, () -> catchupServer.fetchEvents(10, 5, 10, TEST_TOPIC)); // Test invalid maxResults - assertThrows(IllegalArgumentException.class, () -> catchupServer.fetchEvents(1, 10, 0,TEST_TOPIC)); + assertThrows(IllegalArgumentException.class, () -> catchupServer.fetchEvents(1, 10, 0, TEST_TOPIC)); } } \ No newline at end of file diff --git a/src/test/java/com/p14n/postevent/CatchupServiceTest.java b/src/test/java/com/p14n/postevent/CatchupServiceTest.java index 148a679..9fe214b 100644 --- a/src/test/java/com/p14n/postevent/CatchupServiceTest.java +++ b/src/test/java/com/p14n/postevent/CatchupServiceTest.java @@ -140,8 +140,8 @@ public void testCatchupWithExistingHwm() throws Exception { } createProcessingGap(connection); - logEventsInTopicTable(connection); - logEventsInMessagesTable(connection); + TestUtil.logEventsInTopicTable(connection, log, TEST_TOPIC); + TestUtil.logEventsInMessagesTable(connection, log); // Process initial events log.debug("Processing initial events"); int initialProcessed = catchupService.catchup(TEST_TOPIC); @@ -167,7 +167,7 @@ public void testCatchupWithExistingHwm() throws Exception { // Log the actual events in the table for debugging log.debug("Checking events in the table:"); - logEventsInMessagesTable(connection); + TestUtil.logEventsInMessagesTable(connection, log); // There should be 4 new events in the messages table as the processing gap // created a gap of 4 events @@ -231,8 +231,8 @@ public void testHasSequenceGapWithGap() throws Exception { Publisher.publish(event, connection, TEST_TOPIC); } copyEventsToMessages(connection, 5); - logEventsInTopicTable(connection); - logEventsInMessagesTable(connection); + TestUtil.logEventsInTopicTable(connection, log, TEST_TOPIC); + TestUtil.logEventsInMessagesTable(connection, log); // Initialize HWM to 0 initializeHwm(connection, TEST_TOPIC, 0); @@ -296,32 +296,4 @@ private int countMessagesInTable(Connection connection) throws Exception { } } - private void logEventsInMessagesTable(Connection connection) throws Exception { - log.debug("postevent.messages contents:"); - String sql = "SELECT idn, id, source FROM postevent.messages ORDER BY idn"; - try (PreparedStatement stmt = connection.prepareStatement(sql); - ResultSet rs = stmt.executeQuery()) { - while (rs.next()) { - log.debug("Event: idn={}, id={}, source={}", - rs.getLong("idn"), - rs.getString("id"), - rs.getString("source")); - } - } - } - - private void logEventsInTopicTable(Connection connection) throws Exception { - log.debug("postevent.test_events contents:"); - String sql = "SELECT idn, id, source FROM postevent.test_events ORDER BY idn"; - try (PreparedStatement stmt = connection.prepareStatement(sql); - ResultSet rs = stmt.executeQuery()) { - while (rs.next()) { - log.debug("Event: idn={}, id={}, source={}", - rs.getLong("idn"), - rs.getString("id"), - rs.getString("source")); - } - } - } - } \ No newline at end of file diff --git a/src/test/java/com/p14n/postevent/DefaultMessageBrokerTest.java b/src/test/java/com/p14n/postevent/DefaultMessageBrokerTest.java index e2bb4db..b719452 100644 --- a/src/test/java/com/p14n/postevent/DefaultMessageBrokerTest.java +++ b/src/test/java/com/p14n/postevent/DefaultMessageBrokerTest.java @@ -18,6 +18,7 @@ class DefaultMessageBrokerTest { private volatile DefaultMessageBroker broker; + private final String TOPIC = "topic"; @AfterEach void tearDown() { @@ -69,10 +70,10 @@ public void onError(Throwable error) { } }; - broker.subscribe(subscriber1); - broker.subscribe(subscriber2); + broker.subscribe(TOPIC, subscriber1); + broker.subscribe(TOPIC, subscriber2); - broker.publish("test"); + broker.publish(TOPIC, "test"); assertTrue(counter1.await(1, TimeUnit.SECONDS)); assertTrue(counter2.await(1, TimeUnit.SECONDS)); @@ -80,7 +81,7 @@ public void onError(Throwable error) { @Test void shouldSilentlyDropMessagesWithNoSubscribers() { - broker.publish("test"); // Should not throw + broker.publish(TOPIC, "test"); // Should not throw } @Test @@ -102,8 +103,8 @@ public void onError(Throwable error) { } }; - broker.subscribe(erroringSubscriber); - broker.publish("test"); + broker.subscribe(TOPIC, erroringSubscriber); + broker.publish(TOPIC, "test"); assertTrue(counter.await(1, TimeUnit.SECONDS)); assertSame(testException, caughtError.get()); @@ -126,14 +127,14 @@ public void onError(Throwable error) { } }; - broker.subscribe(subscriber); + broker.subscribe(TOPIC, subscriber); // Create publisher threads for (int i = 0; i < threadCount; i++) { new Thread(() -> { try { startLatch.await(); - broker.publish("test"); + broker.publish(TOPIC, "test"); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } @@ -147,8 +148,8 @@ public void onError(Throwable error) { @Test void shouldPreventPublishingAfterClose() { broker.close(); - assertThrows(IllegalStateException.class, () -> broker.publish("test")); - assertThrows(IllegalStateException.class, () -> broker.subscribe(new MessageSubscriber<>() { + assertThrows(IllegalStateException.class, () -> broker.publish(TOPIC, "test")); + assertThrows(IllegalStateException.class, () -> broker.subscribe(TOPIC, new MessageSubscriber<>() { @Override public void onMessage(String message) { } @@ -175,12 +176,12 @@ public void onError(Throwable error) { } }; - broker.subscribe(subscriber); - broker.publish("first message"); + broker.subscribe(TOPIC, subscriber); + broker.publish(TOPIC, "first message"); assertTrue(counter.await(1, TimeUnit.SECONDS), "Should receive message while subscribed"); - broker.unsubscribe(subscriber); - broker.publish("second message"); + broker.unsubscribe(TOPIC, subscriber); + broker.publish(TOPIC, "second message"); Thread.sleep(100); assertEquals(1, messageCount.get(), "Should not receive message after unsubscribe"); } diff --git a/src/test/java/com/p14n/postevent/LocalConsumerTest.java b/src/test/java/com/p14n/postevent/LocalConsumerTest.java index 8e0de51..9685ce4 100644 --- a/src/test/java/com/p14n/postevent/LocalConsumerTest.java +++ b/src/test/java/com/p14n/postevent/LocalConsumerTest.java @@ -12,6 +12,7 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import java.sql.Connection; +import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; @@ -39,7 +40,7 @@ void setUp() throws Exception { broker = new EventMessageBroker(); PostEventConfig config = new ConfigData( "test", - "test", + Set.of("test_topic"), // renamed from "test" "localhost", pg.getPort(), "postgres", @@ -69,7 +70,7 @@ void shouldReceivePublishedEvent() throws Exception { AtomicReference receivedEvent = new AtomicReference<>(); // Setup consumer - broker.subscribe(new MessageSubscriber() { + broker.subscribe("test_topic", new MessageSubscriber() { @Override public void onMessage(Event event) { receivedEvent.set(event); @@ -90,7 +91,7 @@ public void onError(Throwable error) { "text/plain", "test-schema", "test-subject", "test-data".getBytes()); - Publisher.publish(testEvent, conn, "test"); + Publisher.publish(testEvent, conn, "test_topic"); assertTrue(latch.await(10, TimeUnit.SECONDS), "Did not receive event within timeout"); @@ -105,4 +106,4 @@ public void onError(Throwable error) { assertArrayEquals(testEvent.data(), actual.data()); } -} \ No newline at end of file +} diff --git a/src/test/java/com/p14n/postevent/PersistentBrokerTest.java b/src/test/java/com/p14n/postevent/PersistentBrokerTest.java index 1151c59..54d1a2a 100644 --- a/src/test/java/com/p14n/postevent/PersistentBrokerTest.java +++ b/src/test/java/com/p14n/postevent/PersistentBrokerTest.java @@ -1,7 +1,6 @@ package com.p14n.postevent; import com.p14n.postevent.broker.MessageBroker; -import com.p14n.postevent.broker.MessageSubscriber; import com.p14n.postevent.broker.SystemEventBroker; import com.p14n.postevent.catchup.PersistentBroker; import com.p14n.postevent.data.Event; @@ -56,7 +55,7 @@ void shouldPersistAndForwardEvent() throws Exception { // Create test event Event testEvent = Event.create( "test-123", "test-source", "test-type", "text/plain", - "test-schema", "test-subject", "test-data".getBytes(), Instant.now(), 1L,"topic"); + "test-schema", "test-subject", "test-data".getBytes(), Instant.now(), 1L, "topic"); try (Statement stmt = conn.createStatement();) { stmt.executeUpdate("insert into postevent.contiguous_hwm (topic_name, hwm) values ('topic',0)"); @@ -64,7 +63,7 @@ void shouldPersistAndForwardEvent() throws Exception { } // Test the subscriber - persistentBroker.publish(testEvent); + persistentBroker.publish("topic", testEvent); // Verify database persistence try (Statement stmt = conn.createStatement(); @@ -81,6 +80,6 @@ void shouldPersistAndForwardEvent() throws Exception { } // Verify forwarding to subscriber - verify(mockSubscriber).publish(testEvent); + verify(mockSubscriber).publish("topic", testEvent); } } \ No newline at end of file diff --git a/src/test/java/com/p14n/postevent/PostgresDebeziumConnectorTest.java b/src/test/java/com/p14n/postevent/PostgresDebeziumConnectorTest.java index 5fbc397..74c60a4 100644 --- a/src/test/java/com/p14n/postevent/PostgresDebeziumConnectorTest.java +++ b/src/test/java/com/p14n/postevent/PostgresDebeziumConnectorTest.java @@ -6,6 +6,7 @@ import io.debezium.engine.ChangeEvent; import io.zonky.test.db.postgres.embedded.EmbeddedPostgres; +import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -72,7 +73,7 @@ void throwsExceptionWhenStartedWithoutConsumer() { @Test void engineStartsWithConsumer() throws IOException, InterruptedException { AtomicBoolean consumerCalled = new AtomicBoolean(false); - ConfigData cfg = new ConfigData("test", "test", "localhost", pg.getPort(), "postgres", "postgres", + ConfigData cfg = new ConfigData("test", Set.of("test"), "localhost", pg.getPort(), "postgres", "postgres", "postgres", null); engine.start(cfg, (ChangeEvent event) -> { consumerCalled.set(true); @@ -85,7 +86,7 @@ void engineStartsWithConsumer() throws IOException, InterruptedException { @Test void engineReceivesMessage() throws SQLException, IOException, InterruptedException { - ConfigData cfg = new ConfigData("test", "test", "localhost", pg.getPort(), "postgres", "postgres", + ConfigData cfg = new ConfigData("test", Set.of("test"), "localhost", pg.getPort(), "postgres", "postgres", "postgres", null); var latch = new CountDownLatch(1); var result = new AtomicReference(); diff --git a/src/test/java/com/p14n/postevent/TestUtil.java b/src/test/java/com/p14n/postevent/TestUtil.java index 7be0bc1..c2203b5 100644 --- a/src/test/java/com/p14n/postevent/TestUtil.java +++ b/src/test/java/com/p14n/postevent/TestUtil.java @@ -2,8 +2,13 @@ import com.p14n.postevent.data.Event; +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.ResultSet; import java.util.UUID; +import org.slf4j.Logger; + public class TestUtil { public static Event createTestEvent(int i) { return Event.create( @@ -15,7 +20,8 @@ public static Event createTestEvent(int i) { "test-subject", ("{\"value\":" + i + "}").getBytes()); } - public static Event createTestEvent(int i,String subject) { + + public static Event createTestEvent(int i, String subject) { return Event.create( UUID.randomUUID().toString(), "test-source", @@ -25,4 +31,49 @@ public static Event createTestEvent(int i,String subject) { subject, ("{\"value\":" + i + "}").getBytes()); } + + public static void logEventsInMessagesTable(Connection connection, Logger log) throws Exception { + log.debug("postevent.messages contents:"); + String sql = "SELECT idn, id, source, status, topic, subject FROM postevent.messages ORDER BY idn"; + try (PreparedStatement stmt = connection.prepareStatement(sql); + ResultSet rs = stmt.executeQuery()) { + while (rs.next()) { + log.debug("Event: idn={}, id={}, source={}, status={}, topic={}, subject={}", + rs.getLong("idn"), + rs.getString("id"), + rs.getString("source"), + rs.getString("status"), + rs.getString("topic"), + rs.getString("subject")); + } + } + } + + public static void logEventsInTopicTable(Connection connection, Logger log, String topic) throws Exception { + log.debug("postevent." + topic + " contents:"); + String sql = "SELECT idn, id, source FROM postevent." + topic + " ORDER BY idn"; + try (PreparedStatement stmt = connection.prepareStatement(sql); + ResultSet rs = stmt.executeQuery()) { + while (rs.next()) { + log.debug("Event: idn={}, id={}, source={}", + rs.getLong("idn"), + rs.getString("id"), + rs.getString("source")); + } + } + } + + public static void logEventsInHwmTable(Connection connection, Logger log) throws Exception { + log.debug("postevent.contiguous_hwm contents:"); + String sql = "SELECT * FROM postevent.contiguous_hwm ORDER BY topic_name"; + try (PreparedStatement stmt = connection.prepareStatement(sql); + ResultSet rs = stmt.executeQuery()) { + while (rs.next()) { + log.debug("HWM: topic={}, hwm={}", + rs.getString("topic_name"), + rs.getLong("hwm")); + } + } + } + } diff --git a/src/test/java/com/p14n/postevent/broker/DeterministicConsumerTest.java b/src/test/java/com/p14n/postevent/broker/DeterministicConsumerTest.java index 95d75a5..c412710 100644 --- a/src/test/java/com/p14n/postevent/broker/DeterministicConsumerTest.java +++ b/src/test/java/com/p14n/postevent/broker/DeterministicConsumerTest.java @@ -7,15 +7,8 @@ import com.p14n.postevent.data.ConfigData; import com.p14n.postevent.example.ExampleUtil; -import io.zonky.test.db.postgres.embedded.EmbeddedPostgres; import net.jqwik.api.*; -import net.jqwik.api.lifecycle.AfterProperty; -import net.jqwik.api.lifecycle.BeforeProperty; - -import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.BeforeEach; - -import javax.sql.DataSource; +import java.util.Collections; import java.util.List; import java.util.Random; import java.util.Set; @@ -45,7 +38,7 @@ void testDeterministicEventDelivery(@ForAll("randomSeeds") long seed) throws Exc var config = new ConfigData( TOPIC, - TOPIC, + Set.of(TOPIC), "127.0.0.1", pg.getPort(), "postgres", @@ -58,8 +51,8 @@ void testDeterministicEventDelivery(@ForAll("randomSeeds") long seed) throws Exc server.start(PORT); // Start client - var client = new ConsumerClient(TOPIC, executor); - client.start(dataSource, "localhost", PORT); + var client = new ConsumerClient(executor); + client.start(Set.of(TOPIC), dataSource, "localhost", PORT); var receivedEventIdns = new CopyOnWriteArrayList(); Set receivedEventIds = ConcurrentHashMap.newKeySet(); @@ -73,7 +66,7 @@ void testDeterministicEventDelivery(@ForAll("randomSeeds") long seed) throws Exc AtomicInteger failmod = new AtomicInteger(5); // Setup client subscriber - client.subscribe((TransactionalEvent event) -> { + client.subscribe(TOPIC, (TransactionalEvent event) -> { var eventIdn = event.event().idn(); if (eventIdn % failmod.getAndIncrement() == 0) { throw new RuntimeException("Fell over intentionally"); @@ -115,6 +108,13 @@ void testDeterministicEventDelivery(@ForAll("randomSeeds") long seed) throws Exc } Thread.sleep(2000); + try (var connection = dataSource.getConnection()) { + connection.setAutoCommit(false); + TestUtil.logEventsInTopicTable(connection, logger, TOPIC); + TestUtil.logEventsInMessagesTable(connection, logger); + TestUtil.logEventsInHwmTable(connection, logger); + } + logger.atInfo().log("Test completed in {} ticks", tickCount); logger.atInfo().log("Published events: {}", publishedEventIds.size()); logger.atInfo().log("Received events: {}", receivedEventIds.size()); @@ -147,7 +147,7 @@ void testDeterministicEventDeliveryBySubject(@ForAll("randomSeeds") long seed) t var config = new ConfigData( TOPIC, - TOPIC, + Set.of(TOPIC), "127.0.0.1", pg.getPort(), "postgres", @@ -162,8 +162,8 @@ void testDeterministicEventDeliveryBySubject(@ForAll("randomSeeds") long seed) t server.start(PORT); // Start client - var client = new ConsumerClient(TOPIC, executor); - client.start(dataSource, "localhost", PORT); + var client = new ConsumerClient(executor); + client.start(Set.of(TOPIC), dataSource, "localhost", PORT); logger.atInfo().log("Testing with seed: {}", seed); Random random = new Random(seed); @@ -183,7 +183,7 @@ void testDeterministicEventDeliveryBySubject(@ForAll("randomSeeds") long seed) t AtomicInteger failmod = new AtomicInteger(5); // Setup client subscriber - client.subscribe((TransactionalEvent event) -> { + client.subscribe(TOPIC, (TransactionalEvent event) -> { var eventIdn = event.event().idn(); if (eventIdn % failmod.getAndIncrement() == 0) { throw new RuntimeException("Fell over intentionally"); @@ -251,6 +251,159 @@ void testDeterministicEventDeliveryBySubject(@ForAll("randomSeeds") long seed) t } } + @Property(tries = 5) + void testMultipleTopicsWithDedicatedClients(@ForAll("randomSeeds") long seed) throws Exception { + logger.atInfo().log("Testing with seed: {}", seed); + Random random = new Random(seed); + var executor = new TestAsyncExecutor(); + + String topic1 = "test_topic_one"; + String topic2 = "test_topic_two"; + + try (var pg = ExampleUtil.embeddedPostgres()) { + var dataSource = pg.getPostgresDatabase(); + + // Configure server with both topics + var config = new ConfigData( + "server1", + Set.of(topic1, topic2), + "127.0.0.1", + pg.getPort(), + "postgres", + "postgres", + "postgres"); + + // Start server + var server = new ConsumerServer(dataSource, config, executor); + server.start(PORT); + + // Start client for topic1 + var client1 = new ConsumerClient(executor); + client1.start(Set.of(topic1), dataSource, "localhost", PORT); + + // Start client for topic2 + var client2 = new ConsumerClient(executor); + client2.start(Set.of(topic2), dataSource, "localhost", PORT); + + // Track received events per topic + Set receivedEventIdsTopic1 = ConcurrentHashMap.newKeySet(); + Set receivedEventIdsTopic2 = ConcurrentHashMap.newKeySet(); + Set publishedEventIdsTopic1 = ConcurrentHashMap.newKeySet(); + Set publishedEventIdsTopic2 = ConcurrentHashMap.newKeySet(); + + // Generate random number of events per topic (1-50 each) + int numberOfEventsTopic1 = random.nextInt(50) + 1; + int numberOfEventsTopic2 = random.nextInt(50) + 1; + int totalEvents = numberOfEventsTopic1 + numberOfEventsTopic2; + + var eventsLatch = new CountDownLatch(totalEvents); + logger.atInfo().log("Testing with {} events for topic1 and {} events for topic2", + numberOfEventsTopic1, numberOfEventsTopic2); + + // Setup client1 subscriber + client1.subscribe(topic1, (TransactionalEvent event) -> { + var eventId = event.event().id(); + receivedEventIdsTopic1.add(eventId); + logger.atInfo().log("Topic1 received event: {} {}", eventId, receivedEventIdsTopic1.size()); + eventsLatch.countDown(); + }); + + // Setup client2 subscriber + client2.subscribe(topic2, (TransactionalEvent event) -> { + var eventId = event.event().id(); + receivedEventIdsTopic2.add(eventId); + logger.atInfo().log("Topic2 received event: {} {}", eventId, receivedEventIdsTopic2.size()); + eventsLatch.countDown(); + }); + + // Publish events to both topics + for (int i = 0; i < numberOfEventsTopic1; i++) { + final int eventNumber = i; + executor.submit(() -> { + try { + var event = TestUtil.createTestEvent(eventNumber, "topic1_subject"); + publishedEventIdsTopic1.add(event.id()); + Publisher.publish(event, dataSource, topic1); + logger.atInfo().log("Published event to topic1: {}", event.id()); + return event.id(); + } catch (Exception e) { + throw new RuntimeException("Failed to publish event to topic1", e); + } + }); + } + + for (int i = 0; i < numberOfEventsTopic2; i++) { + final int eventNumber = i; + executor.submit(() -> { + try { + var event = TestUtil.createTestEvent(eventNumber, "topic2_subject"); + publishedEventIdsTopic2.add(event.id()); + Publisher.publish(event, dataSource, topic2); + logger.atInfo().log("Published event to topic2: {}", event.id()); + return event.id(); + } catch (Exception e) { + throw new RuntimeException("Failed to publish event to topic2", e); + } + }); + } + + // Calculate maximum ticks allowed + int maxTicks = (totalEvents * 10) + 100; + int tickCount = 0; + + // Tick until all events are received or max ticks reached + while (tickCount < maxTicks && + (receivedEventIdsTopic1.size() < numberOfEventsTopic1 || + receivedEventIdsTopic2.size() < numberOfEventsTopic2)) { + executor.tick(random, tickCount % 5 == 0); + Thread.sleep(10); + tickCount++; + logger.atInfo().log("Tick {}: Topic1 received {}/{}, Topic2 received {}/{} events", + tickCount, + receivedEventIdsTopic1.size(), numberOfEventsTopic1, + receivedEventIdsTopic2.size(), numberOfEventsTopic2); + } + + Thread.sleep(2000); + + try (var connection = dataSource.getConnection()) { + connection.setAutoCommit(false); + TestUtil.logEventsInTopicTable(connection, logger, topic1); + TestUtil.logEventsInTopicTable(connection, logger, topic2); + TestUtil.logEventsInMessagesTable(connection, logger); + TestUtil.logEventsInHwmTable(connection, logger); + } + Thread.sleep(100); + // Assertions + assertTrue(tickCount < maxTicks, "Test did not complete within maximum ticks(" + maxTicks + ")"); + + // Topic1 assertions + assertEquals(numberOfEventsTopic1, publishedEventIdsTopic1.size(), + "Not all events were published to topic1"); + assertEquals(numberOfEventsTopic1, receivedEventIdsTopic1.size(), + "Not all events were received on topic1"); + assertTrue(receivedEventIdsTopic1.containsAll(publishedEventIdsTopic1), + "Not all published events were received on topic1"); + assertTrue(Collections.disjoint(receivedEventIdsTopic1, publishedEventIdsTopic2), + "Topic1 client received events from topic2"); + + // Topic2 assertions + assertEquals(numberOfEventsTopic2, publishedEventIdsTopic2.size(), + "Not all events were published to topic2"); + assertEquals(numberOfEventsTopic2, receivedEventIdsTopic2.size(), + "Not all events were received on topic2"); + assertTrue(receivedEventIdsTopic2.containsAll(publishedEventIdsTopic2), + "Not all published events were received on topic2"); + assertTrue(Collections.disjoint(receivedEventIdsTopic2, publishedEventIdsTopic1), + "Topic2 client received events from topic1"); + + close(server); + close(client1); + close(client2); + close(executor); + } + } + private void close(AutoCloseable c) { try { if (c != null) diff --git a/src/test/java/com/p14n/postevent/broker/TestAsyncExecutor.java b/src/test/java/com/p14n/postevent/broker/TestAsyncExecutor.java index b0ee07c..9972a67 100644 --- a/src/test/java/com/p14n/postevent/broker/TestAsyncExecutor.java +++ b/src/test/java/com/p14n/postevent/broker/TestAsyncExecutor.java @@ -173,6 +173,8 @@ public void close() { @SuppressWarnings("unchecked") public void tick(Random random, boolean includeScheduled) { + System.out.println("Tick" + scheduledTasks.size() + " " + includeScheduled); + var handleScheduled = includeScheduled && !scheduledTasks.isEmpty(); if (pendingTasks.isEmpty() && !handleScheduled) { return; diff --git a/src/test/java/com/p14n/postevent/broker/grpc/MessageBrokerGrpcIntegrationTest.java b/src/test/java/com/p14n/postevent/broker/grpc/MessageBrokerGrpcIntegrationTest.java index 8c3b417..acba76a 100644 --- a/src/test/java/com/p14n/postevent/broker/grpc/MessageBrokerGrpcIntegrationTest.java +++ b/src/test/java/com/p14n/postevent/broker/grpc/MessageBrokerGrpcIntegrationTest.java @@ -1,13 +1,8 @@ package com.p14n.postevent.broker.grpc; import com.p14n.postevent.broker.DefaultMessageBroker; -import com.p14n.postevent.broker.MessageBroker; import com.p14n.postevent.broker.MessageSubscriber; import com.p14n.postevent.data.Event; -import com.p14n.postevent.db.DatabaseSetup; - -import io.grpc.ManagedChannel; -import io.grpc.ManagedChannelBuilder; import io.grpc.Server; import io.grpc.ServerBuilder; import org.junit.jupiter.api.AfterEach; @@ -32,6 +27,7 @@ public class MessageBrokerGrpcIntegrationTest { private static final int PORT = 50052; private static final String HOST = "localhost"; + private static final String TOPIC = "topic"; private Server server; private MessageBrokerGrpcClient client; @@ -55,7 +51,7 @@ public void setUp() throws IOException { .start(); // Create the client - client = new MessageBrokerGrpcClient(HOST, PORT, "*"); + client = new MessageBrokerGrpcClient(HOST, PORT); } @AfterEach @@ -77,7 +73,7 @@ public void testSubscribeToEvents() throws Exception { AtomicBoolean errorOccurred = new AtomicBoolean(false); // Add a subscriber to the client to collect events - client.subscribe(new MessageSubscriber() { + client.subscribe(TOPIC, new MessageSubscriber() { @Override public void onMessage(Event event) { receivedEvents.add(event); @@ -97,7 +93,7 @@ public void onError(Throwable error) { Event event = createSampleEvent(i + 1); logger.info("Publishing event: " + i + " " + event.id()); - messageBroker.publish(event); + messageBroker.publish(TOPIC, event); logger.info("Published event: " + event.id()); // Small delay to avoid overwhelming the stream Thread.sleep(50); @@ -150,12 +146,12 @@ public void onError(Throwable error) { } }; - client.subscribe(subscriber); + client.subscribe(TOPIC, subscriber); Thread.sleep(200); // Publish an event Event event1 = createSampleEvent(1); - messageBroker.publish(event1); + messageBroker.publish(TOPIC, event1); // Wait for the event to be received logger.info("Waiting for event to be received"); @@ -164,12 +160,12 @@ public void onError(Throwable error) { // Unsubscribe logger.info("Unsubscribing from events"); - client.unsubscribe(subscriber); + client.unsubscribe(TOPIC, subscriber); logger.info("Unsubscribed from events"); // Publish another event Event event2 = createSampleEvent(2); - messageBroker.publish(event2); + messageBroker.publish(TOPIC, event2); // Wait a bit to see if the event is received logger.info("Waiting for secondevent to be received"); @@ -200,7 +196,7 @@ private Event createSampleEvent(long idn) { data, time, idn, - "topic"); + TOPIC); } /** @@ -210,10 +206,10 @@ private static class TestMessageBroker extends DefaultMessageBroker publishedEvents = new ArrayList<>(); @Override - public void publish(Event event) { + public void publish(String topic, Event event) { publishedEvents.add(event); logger.info("Published event in test broker: " + event.id()); - super.publish(event); + super.publish(topic, event); } @Override diff --git a/src/test/java/com/p14n/postevent/catchup/grpc/CatchupGrpcIntegrationTest.java b/src/test/java/com/p14n/postevent/catchup/grpc/CatchupGrpcIntegrationTest.java index 72a6a49..d202c19 100644 --- a/src/test/java/com/p14n/postevent/catchup/grpc/CatchupGrpcIntegrationTest.java +++ b/src/test/java/com/p14n/postevent/catchup/grpc/CatchupGrpcIntegrationTest.java @@ -9,8 +9,6 @@ import java.io.IOException; import java.time.Instant; -import java.time.OffsetDateTime; -import java.time.ZoneOffset; import java.util.Arrays; import java.util.List; import java.util.UUID; diff --git a/src/test/java/com/p14n/postevent/example/LocalConsumerExample.java b/src/test/java/com/p14n/postevent/example/LocalConsumerExample.java index 53fea37..599c39e 100644 --- a/src/test/java/com/p14n/postevent/example/LocalConsumerExample.java +++ b/src/test/java/com/p14n/postevent/example/LocalConsumerExample.java @@ -3,19 +3,15 @@ import com.p14n.postevent.LocalConsumer; import com.p14n.postevent.Publisher; import com.p14n.postevent.TestUtil; -import com.p14n.postevent.broker.DefaultMessageBroker; import com.p14n.postevent.broker.EventMessageBroker; -import com.p14n.postevent.broker.MessageBroker; -import com.p14n.postevent.broker.MessageSubscriber; import com.p14n.postevent.data.ConfigData; -import com.p14n.postevent.data.Event; import io.zonky.test.db.postgres.embedded.EmbeddedPostgres; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; import java.sql.SQLException; -import java.util.Properties; +import java.util.Set; import java.util.concurrent.CountDownLatch; public class LocalConsumerExample { @@ -28,14 +24,14 @@ public static void main(String[] args) throws IOException, InterruptedException, EmbeddedPostgres pg = ExampleUtil.embeddedPostgres(); var lc = new LocalConsumer<>(new ConfigData( "local", - "topic", + Set.of("topic"), "127.0.0.1", pg.getPort(), "postgres", "postgres", "postgres"), mb)) { - mb.subscribe(message -> { + mb.subscribe("topic", message -> { logger.atInfo().log("********* Message received *************"); l.countDown(); }); diff --git a/src/test/java/com/p14n/postevent/example/LocalPersistentConsumerExample.java b/src/test/java/com/p14n/postevent/example/LocalPersistentConsumerExample.java index 3475bad..eaead7e 100644 --- a/src/test/java/com/p14n/postevent/example/LocalPersistentConsumerExample.java +++ b/src/test/java/com/p14n/postevent/example/LocalPersistentConsumerExample.java @@ -7,6 +7,7 @@ import java.io.IOException; import java.sql.SQLException; +import java.util.Set; import java.util.concurrent.CountDownLatch; public class LocalPersistentConsumerExample { @@ -16,7 +17,7 @@ public static void main(String[] args) throws IOException, InterruptedException, try (var pg = ExampleUtil.embeddedPostgres(); var lc = new LocalPersistentConsumer(pg.getPostgresDatabase(), new ConfigData( "local", - "topic", + Set.of("topic"), "127.0.0.1", pg.getPort(), "postgres", @@ -27,7 +28,7 @@ public static void main(String[] args) throws IOException, InterruptedException, lc.start(); - lc.subscribe(message -> { + lc.subscribe("topic", message -> { System.err.println("********* Message received *************"); l.countDown(); }); diff --git a/src/test/java/com/p14n/postevent/example/RemoteConsumerExample.java b/src/test/java/com/p14n/postevent/example/RemoteConsumerExample.java index 5e4d345..cdcfa8e 100644 --- a/src/test/java/com/p14n/postevent/example/RemoteConsumerExample.java +++ b/src/test/java/com/p14n/postevent/example/RemoteConsumerExample.java @@ -7,6 +7,8 @@ import com.p14n.postevent.data.ConfigData; import javax.sql.DataSource; + +import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executors; @@ -25,9 +27,9 @@ private static void constructServer(DataSource ds, ConfigData cfg, CountDownLatc private static void constructClient(DataSource ds, CountDownLatch l, int port, String topic) { - try (ConsumerClient client = new ConsumerClient(topic)) { - client.start(ds, "localhost", port); - client.subscribe(message -> { + try (ConsumerClient client = new ConsumerClient()) { + client.start(Set.of(topic), ds, "localhost", port); + client.subscribe(topic, message -> { System.err.println("********* Message received *************"); l.countDown(); }); @@ -48,7 +50,7 @@ public static void main(String[] args) throws Exception { var ds = pg.getPostgresDatabase(); var cfg = new ConfigData( "local", - "topic", + Set.of("topic"), "127.0.0.1", pg.getPort(), "postgres", @@ -60,11 +62,11 @@ public static void main(String[] args) throws Exception { Thread.sleep(2000); - es.execute(() -> constructClient(ds, serverLatch, port, cfg.name())); + es.execute(() -> constructClient(ds, serverLatch, port, cfg.topics().iterator().next())); Thread.sleep(2000); - Publisher.publish(TestUtil.createTestEvent(1), ds, cfg.name()); + Publisher.publish(TestUtil.createTestEvent(1), ds, cfg.topics().iterator().next()); serverLatch.await(); diff --git a/src/test/java/com/p14n/postevent/processor/OrderedProcessorTest.java b/src/test/java/com/p14n/postevent/processor/OrderedProcessorTest.java index 72cb859..c481f8f 100644 --- a/src/test/java/com/p14n/postevent/processor/OrderedProcessorTest.java +++ b/src/test/java/com/p14n/postevent/processor/OrderedProcessorTest.java @@ -3,9 +3,6 @@ import com.p14n.postevent.data.Event; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; -import org.mockito.ArgumentCaptor; -import org.mockito.Mockito; - import java.sql.Connection; import java.sql.PreparedStatement; import java.sql.ResultSet; @@ -46,7 +43,7 @@ public void setUp() throws SQLException { return mockPriorEventsStmt; } else if (sql.contains("UPDATE")) { return mockUpdateStmt; - } else if (sql.contains("SELECT hwm")){ + } else if (sql.contains("SELECT hwm")) { return mockHwmStmt; } return mock(PreparedStatement.class); @@ -85,7 +82,8 @@ public void testProcessSuccessful() throws SQLException { verify(mockConnection).commit(); // Verify statements verify(mockPriorEventsStmt).setString(1, testEvent.subject()); - verify(mockPriorEventsStmt).setLong(2, testEvent.idn()); + verify(mockPriorEventsStmt).setString(2, testEvent.topic()); + verify(mockPriorEventsStmt).setLong(3, testEvent.idn()); verify(mockUpdateStmt).setLong(1, testEvent.idn()); // Verify processor was called @@ -111,7 +109,8 @@ public void testProcessWithPriorUnprocessedEvents() throws SQLException { // Verify statements verify(mockPriorEventsStmt).setString(1, testEvent.subject()); - verify(mockPriorEventsStmt).setLong(2, testEvent.idn()); + verify(mockPriorEventsStmt).setString(2, testEvent.topic()); + verify(mockPriorEventsStmt).setLong(3, testEvent.idn()); // Verify processor was not called verify(mockProcessor, never()).apply(any(), any());