Skip to content

Augment multiple topics#42

Merged
p14n merged 4 commits intomainfrom
augment-multiple-topics
Apr 3, 2025
Merged

Augment multiple topics#42
p14n merged 4 commits intomainfrom
augment-multiple-topics

Conversation

@p14n
Copy link
Copy Markdown
Owner

@p14n p14n commented Apr 3, 2025

Summary by Sourcery

Augment the message broker to support multiple topics with dedicated clients and improve event processing across different topics

New Features:

  • Support for multiple topics in message broker infrastructure
  • Enhanced topic-specific event processing and routing

Enhancements:

  • Refactored message broker to support topic-based subscriptions
  • Improved event persistence and processing with topic-specific handling
  • Updated database schema to support multi-topic event tracking

Tests:

  • Added test for multiple topics with dedicated clients
  • Enhanced test coverage for topic-specific event processing

Chores:

  • Renamed configuration parameters from single topic to multiple topics
  • Updated logging and error handling for multi-topic scenarios

@sourcery-ai
Copy link
Copy Markdown
Contributor

sourcery-ai bot commented Apr 3, 2025

Reviewer's Guide by Sourcery

This pull request introduces support for multiple topics in the post-event system. The changes include modifications to the configuration, broker, consumer, and database setup to handle multiple topics. The pull request also includes a new test case to verify the functionality of multiple topics with dedicated clients.

Updated ER diagram for postevent.messages table

erDiagram
    messages {
        bigint idn PK
        VARCHAR topic PK
        VARCHAR id
        VARCHAR source
        VARCHAR data_schema
        VARCHAR subject
        bytea data
        TIMESTAMP time
        VARCHAR status
    }
Loading

Updated class diagram for DefaultMessageBroker

classDiagram
    class DefaultMessageBroker~InT, OutT~ {
        -ConcurrentHashMap~String, Set~MessageSubscriber~OutT~~~ topicSubscribers
        -AtomicBoolean closed
        -AsyncExecutor asyncExecutor
        +DefaultMessageBroker()
        +DefaultMessageBroker(AsyncExecutor asyncExecutor)
        -canProcess(String topic, InT message) bool
        +publish(String topic, InT message) void
        +subscribe(String topic, MessageSubscriber~OutT~ subscriber) bool
        +unsubscribe(String topic, MessageSubscriber~OutT~ subscriber) bool
        +close() void
    }
    DefaultMessageBroker : implements MessageBroker
    DefaultMessageBroker *-- AsyncExecutor : has a
    DefaultMessageBroker -- MessageSubscriber : Uses
    note for DefaultMessageBroker "Supports multiple topics using topicSubscribers"
Loading

Updated class diagram for MessageBrokerGrpcClient

classDiagram
    class MessageBrokerGrpcClient {
        -Logger logger
        -MessageBrokerServiceGrpc.MessageBrokerServiceStub asyncStub
        -Set~String~ subscribed
        ManagedChannel channel
        +MessageBrokerGrpcClient(String host, int port)
        +MessageBrokerGrpcClient(ManagedChannel channel)
        +subscribeToEvents(String topic) void
        -convertFromGrpcEvent(EventResponse grpcEvent) Event
        +publish(String topic, Event message) void
        +subscribe(String topic, MessageSubscriber~Event~ subscriber) bool
        +unsubscribe(String topic, MessageSubscriber~Event~ subscriber) bool
    }
    MessageBrokerGrpcClient -- Event : Publishes
    MessageBrokerGrpcClient : implements MessageBroker
    MessageBrokerGrpcClient -- MessageSubscriber : Uses
    note for MessageBrokerGrpcClient "Supports multiple topics and subscriptions"
Loading

Updated class diagram for ConsumerClient

classDiagram
    class ConsumerClient {
        -Logger logger
        -AsyncExecutor asyncExecutor
        -List~AutoCloseable~ closeables
        -TransactionalBroker tb
        SystemEventBroker seb
        +ConsumerClient(AsyncExecutor asyncExecutor)
        +ConsumerClient()
        +start(Set~String~ topics, DataSource ds, String host, int port) void
        +start(Set~String~ topics, DataSource ds, ManagedChannel channel) void
        +close() void
        +publish(String topic, TransactionalEvent message) void
        +subscribe(String topic, MessageSubscriber~TransactionalEvent~ subscriber) bool
        +unsubscribe(String topic, MessageSubscriber~TransactionalEvent~ subscriber) bool
        +convert(TransactionalEvent m) TransactionalEvent
    }
    ConsumerClient -- TransactionalEvent : Publishes
    ConsumerClient : implements MessageBroker
    ConsumerClient -- MessageSubscriber : Uses
    note for ConsumerClient "Supports multiple topics and subscriptions"
Loading

Updated class diagram for PersistentBroker

classDiagram
    class PersistentBroker~OutT~ {
        -Logger logger
        -String INSERT_SQL
        -String UPDATE_HWM_SQL
        -MessageBroker~Event, OutT~ targetBroker
        -DataSource dataSource
        -AsyncExecutor asyncExecutor
        +PersistentBroker(MessageBroker~Event, OutT~ targetBroker, DataSource dataSource, AsyncExecutor asyncExecutor)
        +publish(String topic, Event event) void
        +subscribe(String topic, MessageSubscriber~OutT~ subscriber) bool
        +unsubscribe(String topic, MessageSubscriber~OutT~ subscriber) bool
        +close() void
        +convert(Event m) OutT
        +onMessage(Event message) void
    }
    PersistentBroker : implements MessageBroker
    PersistentBroker : implements MessageSubscriber
    PersistentBroker -- Event : Publishes
    PersistentBroker -- MessageSubscriber : Uses
    note for PersistentBroker "Supports multiple topics and subscriptions"
Loading

Updated class diagram for LocalConsumer

classDiagram
    class LocalConsumer~OutT~ {
        -Logger logger
        -DebeziumServer debezium
        -MessageBroker~Event, OutT~ broker
        -PostEventConfig config
        -DatabaseSetup db
        +LocalConsumer(PostEventConfig config, MessageBroker~Event, OutT~ broker)
        +start() void
        +stop() void
        +close() void
    }
    LocalConsumer -- Event : Publishes
    LocalConsumer -- MessageBroker : Uses
    note for LocalConsumer "Supports multiple topics"
Loading

Updated class diagram for OrderedProcessor

classDiagram
    class OrderedProcessor {
        -Logger logger
        -BiFunction~Connection, Event, Boolean~ processorFunction
        +OrderedProcessor(BiFunction~Connection, Event, Boolean~ processorFunction)
        +process(Connection connection, Event event) bool
        -processEventWithFunction(Connection connection, Event event) bool
        -hasUnprocessedPriorEvents(Connection connection, Event event) bool
        -previousEventExists(Connection connection, Event event) bool
        -updateEventStatus(Connection connection, Event event) bool
    }
    OrderedProcessor -- Event : Processes
    note for OrderedProcessor "Filters events by topic"
Loading

Updated class diagram for TransactionalBroker

classDiagram
    class TransactionalBroker {
        -Logger logger
        -DataSource ds
        +TransactionalBroker(DataSource ds)
        +publish(String topic, Event message) void
        +subscribe(String topic, MessageSubscriber~TransactionalEvent~ subscriber) bool
        +unsubscribe(String topic, MessageSubscriber~TransactionalEvent~ subscriber) bool
        +convert(Event m) TransactionalEvent
    }
    TransactionalBroker -- Event : Publishes
    TransactionalBroker -- MessageSubscriber : Uses
    note for TransactionalBroker "Supports multiple topics and subscriptions"
Loading

Updated class diagram for SystemEventBroker

classDiagram
    class SystemEventBroker {
        +convert(SystemEvent m) SystemEvent
        +publish(SystemEvent event) void
        +subscribe(MessageSubscriber~SystemEvent~ subscriber) void
    }
    SystemEventBroker -- SystemEvent : Publishes
    SystemEventBroker -- MessageSubscriber : Uses
    note for SystemEventBroker "Supports system events"
Loading

Updated class diagram for PostEventConfig

classDiagram
    class PostEventConfig {
        +affinity() String
        +topics() Set~String~
        +dbHost() String
        +dbPort() int
        +dbUser() String
        +dbPassword() String
        +dbName() String
        +overrideProps() Properties
        +startupTimeoutSeconds() int
        +jdbcUrl() String
    }
    note for PostEventConfig "Supports multiple topics"
Loading

File-Level Changes

Change Details Files
Updated the consumer and broker to support multiple topics.
  • Modified ConfigData to use a Set<String> for topics instead of a single topic string.
  • Updated DebeziumServer to handle multiple topics.
  • Modified ConsumerClient and MessageBrokerGrpcClient to subscribe to multiple topics.
  • Updated TransactionalBroker and PersistentBroker to handle publishing and subscribing to specific topics.
  • Modified CatchupService to process events for specific topics.
  • Updated OrderedProcessor to include topic in the event processing logic.
  • Modified DatabaseSetup to create tables for multiple topics.
  • Updated LocalConsumer to support multiple topics.
  • Modified UnprocessedSubmitter to handle events for specific topics.
src/test/java/com/p14n/postevent/broker/DeterministicConsumerTest.java
src/main/java/com/p14n/postevent/broker/DefaultMessageBroker.java
src/main/java/com/p14n/postevent/catchup/CatchupService.java
src/test/java/com/p14n/postevent/TestUtil.java
src/main/java/com/p14n/postevent/broker/grpc/MessageBrokerGrpcClient.java
src/main/java/com/p14n/postevent/broker/MessageBroker.java
src/test/java/com/p14n/postevent/CatchupServiceTest.java
src/main/java/com/p14n/postevent/ConsumerClient.java
src/main/java/com/p14n/postevent/LocalConsumer.java
src/main/java/com/p14n/postevent/debezium/DebeziumServer.java
src/test/java/com/p14n/postevent/DefaultMessageBrokerTest.java
src/test/java/com/p14n/postevent/broker/grpc/MessageBrokerGrpcIntegrationTest.java
src/main/java/com/p14n/postevent/data/ConfigData.java
src/main/java/com/p14n/postevent/catchup/PersistentBroker.java
src/test/java/com/p14n/postevent/CatchupServerTest.java
src/main/java/com/p14n/postevent/processor/OrderedProcessor.java
src/main/java/com/p14n/postevent/db/DatabaseSetup.java
src/main/java/com/p14n/postevent/LocalPersistentConsumer.java
src/test/java/com/p14n/postevent/example/RemoteConsumerExample.java
src/test/java/com/p14n/postevent/example/LocalConsumerExample.java
src/main/java/com/p14n/postevent/broker/SystemEventBroker.java
src/test/java/com/p14n/postevent/processor/OrderedProcessorTest.java
src/main/java/com/p14n/postevent/broker/TransactionalBroker.java
src/test/java/com/p14n/postevent/LocalConsumerTest.java
src/test/java/com/p14n/postevent/example/LocalPersistentConsumerExample.java
src/main/java/com/p14n/postevent/broker/grpc/MessageBrokerGrpcServer.java
src/test/java/com/p14n/postevent/PostgresDebeziumConnectorTest.java
src/main/java/com/p14n/postevent/catchup/UnprocessedSubmitter.java
src/main/java/com/p14n/postevent/debezium/Functions.java
src/main/resources/logback.xml
src/test/java/com/p14n/postevent/broker/TestAsyncExecutor.java
src/main/java/com/p14n/postevent/catchup/CatchupServer.java
src/main/java/com/p14n/postevent/data/UnprocessedEventFinder.java
src/test/java/com/p14n/postevent/catchup/grpc/CatchupGrpcIntegrationTest.java

Tips and commands

Interacting with Sourcery

  • Trigger a new review: Comment @sourcery-ai review on the pull request.
  • Continue discussions: Reply directly to Sourcery's review comments.
  • Generate a GitHub issue from a review comment: Ask Sourcery to create an
    issue from a review comment by replying to it. You can also reply to a
    review comment with @sourcery-ai issue to create an issue from it.
  • Generate a pull request title: Write @sourcery-ai anywhere in the pull
    request title to generate a title at any time. You can also comment
    @sourcery-ai title on the pull request to (re-)generate the title at any time.
  • Generate a pull request summary: Write @sourcery-ai summary anywhere in
    the pull request body to generate a PR summary at any time exactly where you
    want it. You can also comment @sourcery-ai summary on the pull request to
    (re-)generate the summary at any time.
  • Generate reviewer's guide: Comment @sourcery-ai guide on the pull
    request to (re-)generate the reviewer's guide at any time.
  • Resolve all Sourcery comments: Comment @sourcery-ai resolve on the
    pull request to resolve all Sourcery comments. Useful if you've already
    addressed all the comments and don't want to see them anymore.
  • Dismiss all Sourcery reviews: Comment @sourcery-ai dismiss on the pull
    request to dismiss all existing Sourcery reviews. Especially useful if you
    want to start fresh with a new review - don't forget to comment
    @sourcery-ai review to trigger a new review!
  • Generate a plan of action for an issue: Comment @sourcery-ai plan on
    an issue to generate a plan of action for it.

Customizing Your Experience

Access your dashboard to:

  • Enable or disable review features such as the Sourcery-generated pull request
    summary, the reviewer's guide, and others.
  • Change the review language.
  • Add, remove or edit custom review instructions.
  • Adjust other review settings.

Getting Help

Copy link
Copy Markdown
Contributor

@sourcery-ai sourcery-ai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey @p14n - I've reviewed your changes - here's some feedback:

Overall Comments:

  • The changes to DefaultMessageBroker look good, but it would be good to add a test case that verifies that messages are only delivered to subscribers of the correct topic.
  • The new test case testMultipleTopicsWithDedicatedClients is a great addition to verify the multi-topic functionality.
Here's what I looked at during the review
  • 🟡 General issues: 1 issue found
  • 🟢 Security: all looks good
  • 🟡 Testing: 1 issue found
  • 🟡 Complexity: 1 issue found
  • 🟢 Documentation: all looks good

Sourcery is free for open source - if you like our reviews please consider sharing them ✨
Help me be more useful! Please click 👍 or 👎 on each comment and I'll use the feedback to improve your reviews.

subscribed.set(false);
public boolean unsubscribe(String topic, MessageSubscriber<Event> subscriber) {
boolean unsubscribed = super.unsubscribe(topic, subscriber);
if (topicSubscribers.isEmpty()) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

issue (bug_risk): Check reference to undefined 'topicSubscribers'.

The unsubscribe method uses 'if (topicSubscribers.isEmpty())' but this field is not declared in MessageBrokerGrpcClient. It appears to have been carried over from another class. Likely, it should instead check whether there are any active subscriptions for the given topic (for example, via the 'subscribed' set) before shutting down the channel.

@@ -115,6 +108,13 @@ void testDeterministicEventDelivery(@ForAll("randomSeeds") long seed) throws Exc
}
Thread.sleep(2000);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion (testing): Add a test for unsubscribing.

It would be beneficial to add a test case that specifically covers the unsubscribe functionality to ensure that clients can correctly unsubscribe from topics and no longer receive messages after doing so.

Suggested implementation:

// (Existing test methods and code)

// New test method to verify the unsubscribe functionality
@Test
public void testUnsubscribe() throws Exception {
    // Setup client subscriber and capture received events
    Set<Integer> receivedIds = ConcurrentHashMap.newKeySet();

    // Subscribe to the topic and capture events
    var subscription = client.subscribe(TOPIC, (TransactionalEvent event) -> {
        receivedIds.add(event.event().idn());
    });

    // Allow some time for subscription to be active, and verify subscription works by waiting briefly
    Thread.sleep(1000);

    // Now unsubscribe to stop receiving further events
    subscription.unsubscribe();

    // Clear any events that might have been received prior to unsubscribing
    receivedIds.clear();

    // Publish a new event after unsubscribe.
    // This assumes a publish method exists that sends events to TOPIC.
    client.publish(TOPIC, createTestEvent());

    // Allow time for event processing after publish
    Thread.sleep(2000);

    // Assert that no new event was received after unsubscribing
    assertTrue("No events should be received after unsubscribe", receivedIds.isEmpty());
}

// Helper method to create a dummy test event.
// Implementation should be replaced with the actual parameters and logic applicable to your TransactionalEvent.
private TransactionalEvent createTestEvent() {
    // TODO: Provide actual implementation details for creating a dummy event for testing
    return new TransactionalEvent(/* pass necessary parameters */);
}

Make sure that:

  1. The client.subscribe method returns an object (here named subscription) with an unsubscribe() method. If the method signature differs, adjust accordingly.
  2. A client.publish(TOPIC, event) method is available or replace it with the actual publish mechanism.
  3. All required imports are included:
    • import org.junit.Test;
    • import static org.junit.Assert.assertTrue;
    • import java.util.Set;
    • import java.util.concurrent.ConcurrentHashMap;
      Also, adjust the createTestEvent() method to properly construct a valid TransactionalEvent instance based on your project’s actual implementation.

import io.grpc.stub.StreamObserver;

import java.time.Instant;
import java.time.OffsetDateTime;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

issue (complexity): Consider relying on the base class's tracking of per-topic subscribers instead of maintaining a separate subscribed set to simplify the logic and reduce state complexity .

Consider removing the extra subscription tracking (i.e. the separate subscribed set) and instead rely on the base class’s tracking of per‑topic subscribers. For example, if your base class maintains subscriber collections per topic you can check their size to decide when to initiate or cancel a subscription. This reduces branching and the extra state.

For instance:

@Override
public boolean subscribe(String topic, MessageSubscriber<Event> subscriber) {
    if (super.subscribe(topic, subscriber)) {
        // If this is the first subscriber for the topic, subscribe to events.
        if (getSubscribers(topic).size() == 1) { // assuming getSubscribers(topic) is available
            subscribeToEvents(topic);
        }
        return true;
    }
    return false;
}

@Override
public boolean unsubscribe(String topic, MessageSubscriber<Event> subscriber) {
    boolean result = super.unsubscribe(topic, subscriber);
    if (getSubscribers(topic).isEmpty()) {
        shutdownTopicSubscription(topic); // implement shutdown per topic if needed
    }
    return result;
}

This way, you remove the duplicated state, delegate the logic to the underlying topic-subscriber management, and make the code easier to follow.

@p14n p14n merged commit d797770 into main Apr 3, 2025
1 check passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant