Skip to content

Human refactor 2#36

Merged
p14n merged 22 commits intomainfrom
human-refactor-2
Mar 28, 2025
Merged

Human refactor 2#36
p14n merged 22 commits intomainfrom
human-refactor-2

Conversation

@p14n
Copy link
Copy Markdown
Owner

@p14n p14n commented Mar 28, 2025

Summary by Sourcery

Refactor and simplify the event publishing and consumption system, introducing new high-level abstractions and improving code structure

New Features:

  • Introduced ConsumerClient for simplified remote event consumption
  • Added ConsumerServer for simplified server-side event handling
  • Created LocalPersistentConsumer for streamlined local persistent event processing

Enhancements:

  • Improved thread naming and executor configuration
  • Simplified event broker and consumer interfaces
  • Enhanced error handling and resource management
  • Updated README with more comprehensive project description

Chores:

  • Updated logging configuration to reduce verbosity
  • Refactored example code to use new abstractions
  • Improved code structure and reduced complexity

@sourcery-ai
Copy link
Copy Markdown
Contributor

sourcery-ai bot commented Mar 28, 2025

Reviewer's Guide by Sourcery

This pull request introduces several refactorings and enhancements to the postevent library. It introduces thread factories for executor services, implements the AutoCloseable interface in DefaultExecutor, refactors the example to use ConsumerClient and ConsumerServer, refactors the local persistent consumer example to use LocalPersistentConsumer, adds ConsumerClient, LocalPersistentConsumer, and ConsumerServer, updates README.md, modifies TransactionalBroker and CatchupGrpcClient to accept an AsyncExecutor and ManagedChannel in their constructors, updates DebeziumServer to use a thread factory for the executor service and reduces the shutdown timeout, and adds topic to the EventResponse in MessageBrokerGrpcServer.

Updated class diagram for DefaultExecutor

classDiagram
  class DefaultExecutor {
    -ScheduledExecutorService se
    -ExecutorService es
    +DefaultExecutor(int scheduledSize)
    +ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit)
    +List~Runnable~ shutdownNow()
    +Future~T~ submit(Callable~T~ task)
    +close() throws Exception
  }
  DefaultExecutor : + AutoCloseable
Loading

Updated class diagram for AsyncExecutor

classDiagram
  class AsyncExecutor {
    <<interface>>
    +ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit)
    +List~Runnable~ shutdownNow()
    +Future~T~ submit(Callable~T~ task)
    +close() throws Exception
  }
Loading

Class diagram for LocalPersistentConsumer

classDiagram
  class LocalPersistentConsumer {
    -PostEventConfig cfg
    -DataSource ds
    -AsyncExecutor asyncExecutor
    -TransactionalBroker tb
    -List~AutoCloseable~ closeables
    +LocalPersistentConsumer(DataSource ds, PostEventConfig cfg, AsyncExecutor asyncExecutor)
    +LocalPersistentConsumer(DataSource ds, PostEventConfig cfg)
    +start() throws IOException, InterruptedException
    +close()
    +publish(TransactionalEvent message)
    +subscribe(MessageSubscriber~TransactionalEvent~ subscriber)
    +unsubscribe(MessageSubscriber~TransactionalEvent~ subscriber)
    +convert(TransactionalEvent m)
  }
  LocalPersistentConsumer : + AutoCloseable
  LocalPersistentConsumer ..> TransactionalBroker : uses
  LocalPersistentConsumer ..> AsyncExecutor : uses
  LocalPersistentConsumer ..> LocalConsumer : uses
  LocalPersistentConsumer ..> PersistentBroker : uses
  LocalPersistentConsumer ..> SystemEventBroker : uses
Loading

Updated class diagram for TransactionalBroker

classDiagram
  class TransactionalBroker {
    -DataSource ds
    +TransactionalBroker(DataSource ds, AsyncExecutor asyncExecutor)
    +TransactionalBroker(DataSource ds)
    +publish(Event message)
  }
  TransactionalBroker --|> DefaultMessageBroker
Loading

Updated class diagram for CatchupGrpcClient

classDiagram
  class CatchupGrpcClient {
    -ManagedChannel channel
    -CatchupServiceGrpc.CatchupServiceBlockingStub blockingStub
    +CatchupGrpcClient(String host, int port)
    +CatchupGrpcClient(ManagedChannel channel)
    +fetchEvents(long startAfter, long end, int maxResults, String topic) List~Event~
    +close()
    -convertFromGrpcEvent(com.p14n.postevent.catchup.grpc.Event grpcEvent, String topic) Event
  }
  CatchupGrpcClient : + AutoCloseable
  CatchupGrpcClient ..> ManagedChannel : uses
Loading

Updated class diagram for DebeziumServer

classDiagram
  class DebeziumServer {
    -static Logger logger
    +static Properties props(String dbServerName, String dbName, String dbUser, String dbPassword, String tableName)
  }
  class DebeziumEngineRunner {
    -DebeziumEngine~ChangeEvent~String, String~~ engine
    -ExecutorService executor
    -CountDownLatch started
    +start(PostEventConfig cfg, Consumer~ChangeEvent~String, String~~~ consumer) void
    +taskStarted() void
    +stop() void
  }
  DebeziumServer *-- DebeziumEngineRunner : has a
Loading

Updated class diagram for EventResponse

classDiagram
  class EventResponse {
    +setId(String id)
    +setSource(String source)
    +setType(String type)
    +setDataContentType(String datacontenttype)
    +setDataSchema(String dataschema)
    +setSubject(String subject)
    +setTime(String time)
    +setData(ByteString data)
    +setIdn(String idn)
    +setTopic(String topic)
  }
Loading

Updated class diagram for EventMessageBroker

classDiagram
  class EventMessageBroker {
    +EventMessageBroker(AsyncExecutor asyncExecutor)
    +EventMessageBroker()
    +convert(Event m)
  }
  EventMessageBroker --|> DefaultMessageBroker
Loading

Updated class diagram for SystemEventBroker

classDiagram
  class SystemEventBroker {
    +SystemEventBroker(AsyncExecutor asyncExecutor)
    +SystemEventBroker()
    +convert(SystemEvent m)
  }
  SystemEventBroker --|> DefaultMessageBroker
Loading

File-Level Changes

Change Details Files
Introduced thread factories for executor services to provide more descriptive thread names, aiding in debugging and monitoring.
  • Replaced Executors.newScheduledThreadPool(scheduledSize) with a thread factory that names threads as post-event-scheduled-%d.
  • Replaced Executors.newVirtualThreadPerTaskExecutor() with a thread factory that names threads as post-event-virtual-%d.
src/main/java/com/p14n/postevent/broker/DefaultExecutor.java
Implemented the AutoCloseable interface in DefaultExecutor to ensure resources are properly released when the executor is no longer needed.
  • Added the AutoCloseable interface to the DefaultExecutor class.
  • Implemented the close() method to call shutdownNow() to interrupt all running tasks and shut down the executor.
src/main/java/com/p14n/postevent/broker/DefaultExecutor.java
Refactored the example to use ConsumerClient and ConsumerServer, simplifying the setup of remote consumers and servers.
  • Replaced the manual construction of gRPC servers and clients with ConsumerServer and ConsumerClient.
  • Simplified the server setup by using ConsumerServer to encapsulate the creation and management of gRPC services.
  • Simplified the client setup by using ConsumerClient to encapsulate the connection and subscription logic.
  • Removed direct usage of MessageBrokerGrpcClient, CatchupGrpcClient, and related classes in the example.
src/test/java/com/p14n/postevent/example/RemoteConsumerExample.java
Refactored the local persistent consumer example to use LocalPersistentConsumer, simplifying the setup of local persistent consumers.
  • Replaced the manual construction of brokers and consumers with LocalPersistentConsumer.
  • Simplified the example by using LocalPersistentConsumer to encapsulate the creation and management of brokers and consumers.
  • Removed direct usage of TransactionalBroker, SystemEventBroker, PersistentBroker, and related classes in the example.
src/test/java/com/p14n/postevent/example/LocalPersistentConsumerExample.java
Added ConsumerClient to encapsulate the logic of connecting to a remote consumer.
  • Created a new ConsumerClient class that implements the MessageBroker interface.
  • Implemented the start method to initialize the necessary brokers and clients.
  • Implemented the close method to release resources.
  • Implemented the publish, subscribe, unsubscribe, and convert methods to interact with the underlying brokers.
src/main/java/com/p14n/postevent/ConsumerClient.java
Added LocalPersistentConsumer to encapsulate the logic of creating a local persistent consumer.
  • Created a new LocalPersistentConsumer class that implements the MessageBroker interface.
  • Implemented the start method to initialize the necessary brokers and consumers.
  • Implemented the close method to release resources.
  • Implemented the publish, subscribe, unsubscribe, and convert methods to interact with the underlying brokers.
src/main/java/com/p14n/postevent/LocalPersistentConsumer.java
Added ConsumerServer to encapsulate the logic of creating a consumer server.
  • Created a new ConsumerServer class.
  • Implemented the start method to initialize the necessary brokers and consumers.
  • Implemented the close method to release resources.
src/main/java/com/p14n/postevent/ConsumerServer.java
Updated README.md to include a description of the library and its features.
  • Added a description of the library's purpose and key features.
  • Added a list of architecture components.
README.md
Modified TransactionalBroker to accept an AsyncExecutor in its constructor.
  • Added a constructor that accepts an AsyncExecutor.
  • Modified the existing constructor to call the new constructor with a default AsyncExecutor.
src/main/java/com/p14n/postevent/broker/TransactionalBroker.java
Modified CatchupGrpcClient to accept a ManagedChannel in its constructor.
  • Added a constructor that accepts a ManagedChannel.
  • Modified the existing constructor to call the new constructor with a ManagedChannelBuilder.
src/main/java/com/p14n/postevent/catchup/grpc/CatchupGrpcClient.java
Modified EventMessageBroker and SystemEventBroker to accept an AsyncExecutor in its constructor.
  • Added a constructor that accepts an AsyncExecutor.
  • Modified the existing constructor to call the new constructor with a default AsyncExecutor.
src/main/java/com/p14n/postevent/broker/EventMessageBroker.java
src/main/java/com/p14n/postevent/broker/SystemEventBroker.java
Updated DebeziumServer to use a thread factory for the executor service and reduced the shutdown timeout.
  • Added a thread factory to the executor service to provide more descriptive thread names.
  • Reduced the shutdown timeout from 30 seconds to 5 seconds.
src/main/java/com/p14n/postevent/debezium/DebeziumServer.java
Added topic to the EventResponse in MessageBrokerGrpcServer.
  • Added topic to the EventResponse builder.
src/main/java/com/p14n/postevent/broker/grpc/MessageBrokerGrpcServer.java

Tips and commands

Interacting with Sourcery

  • Trigger a new review: Comment @sourcery-ai review on the pull request.
  • Continue discussions: Reply directly to Sourcery's review comments.
  • Generate a GitHub issue from a review comment: Ask Sourcery to create an
    issue from a review comment by replying to it. You can also reply to a
    review comment with @sourcery-ai issue to create an issue from it.
  • Generate a pull request title: Write @sourcery-ai anywhere in the pull
    request title to generate a title at any time. You can also comment
    @sourcery-ai title on the pull request to (re-)generate the title at any time.
  • Generate a pull request summary: Write @sourcery-ai summary anywhere in
    the pull request body to generate a PR summary at any time exactly where you
    want it. You can also comment @sourcery-ai summary on the pull request to
    (re-)generate the summary at any time.
  • Generate reviewer's guide: Comment @sourcery-ai guide on the pull
    request to (re-)generate the reviewer's guide at any time.
  • Resolve all Sourcery comments: Comment @sourcery-ai resolve on the
    pull request to resolve all Sourcery comments. Useful if you've already
    addressed all the comments and don't want to see them anymore.
  • Dismiss all Sourcery reviews: Comment @sourcery-ai dismiss on the pull
    request to dismiss all existing Sourcery reviews. Especially useful if you
    want to start fresh with a new review - don't forget to comment
    @sourcery-ai review to trigger a new review!
  • Generate a plan of action for an issue: Comment @sourcery-ai plan on
    an issue to generate a plan of action for it.

Customizing Your Experience

Access your dashboard to:

  • Enable or disable review features such as the Sourcery-generated pull request
    summary, the reviewer's guide, and others.
  • Change the review language.
  • Add, remove or edit custom review instructions.
  • Adjust other review settings.

Getting Help

Copy link
Copy Markdown
Contributor

@sourcery-ai sourcery-ai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey @p14n - I've reviewed your changes - here's some feedback:

Overall Comments:

  • Consider adding a health check endpoint to the gRPC server for better monitoring and operational readiness.
  • The new ConsumerClient and ConsumerServer classes should have corresponding integration tests.
Here's what I looked at during the review
  • 🟡 General issues: 4 issues found
  • 🟢 Security: all looks good
  • 🟡 Testing: 2 issues found
  • 🟢 Complexity: all looks good
  • 🟢 Documentation: all looks good

Sourcery is free for open source - if you like our reviews please consider sharing them ✨
Help me be more useful! Please click 👍 or 👎 on each comment and I'll use the feedback to improve your reviews.

Comment on lines +73 to +74
} catch (Exception e) {
e.printStackTrace();
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion: Replace printStackTrace with a logging framework.

Instead of outputting exceptions directly to the console, consider using a logger to maintain consistency and better control over error reporting.

Suggested implementation:

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
// Existing import statements may go here...
public class ConsumerClient {
    private static final Logger logger = LoggerFactory.getLogger(ConsumerClient.class);
                logger.error("Error occurred while closing resource", e);

public void close() {
for (var c : closeables) {
try {
System.out.println("Closing " + c);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion: Avoid using System.out.println for resource closing notifications.

It would be beneficial to use a logging framework here to help manage log output and prevent unwanted console prints in production environments.

Suggested implementation:

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
// (other imports)
public class LocalPersistentConsumer {
    private static final Logger logger = LoggerFactory.getLogger(LocalPersistentConsumer.class);
                logger.info("Closing {}", c);
                logger.error("Error closing {}", c, e);

try {
c.close();
} catch (Exception e) {
e.printStackTrace();
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion: Replace printStackTrace with proper logging.

Using a logging mechanism will allow for more controlled error management and consistent log formatting across the application.


es.execute(() -> constructServer(ds, cfg, serverLatch, port));

Thread.sleep(2000);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

issue (testing): Unreliable sleep in test

The test uses Thread.sleep which makes the test unreliable, consider using Awaitility or another approach to wait for the asynchronous operations to complete.

});

lc.start();
Thread.sleep(1000);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

issue (testing): Unreliable sleep in test

The test uses Thread.sleep which makes the test unreliable, consider using Awaitility or another approach to wait for the asynchronous operations to complete.

* Supports both in-process and remote consumers (via gRPC)
* Guarantees ordered event delivery by subject
* Provides catch-up mechanism for missed events
* Ensures new consumers receive all historical events on first connect
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

issue (typo): Typo: "connect" should be "connection"

The phrase "on first connect" is grammatically incorrect. It should be "on first connection".

Suggested change
* Ensures new consumers receive all historical events on first connect
* Ensures new consumers receive all historical events on first connection

@p14n p14n merged commit 98e3247 into main Mar 28, 2025
1 check passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant