Conversation
Reviewer's Guide by SourceryThis pull request enhances the logging mechanism across multiple classes by replacing the Java Util Logging (JUL) with SLF4J, improving log message formatting, and adding more detailed logging for better traceability and error handling. No diagrams generated as the changes look simple and do not need a visual representation. File-Level Changes
Tips and commandsInteracting with Sourcery
Customizing Your ExperienceAccess your dashboard to:
Getting Help
|
There was a problem hiding this comment.
Hey @p14n - I've reviewed your changes - here's some feedback:
Overall Comments:
- The logging improvements look good, but consider adding a correlation ID to the logs to help trace events across different components.
- The use of slf4j is great, but ensure that the log levels are appropriate for the information being logged (e.g., use debug for detailed information, info for general events, warn/error for exceptions).
Here's what I looked at during the review
- 🟡 General issues: 1 issue found
- 🟢 Security: all looks good
- 🟢 Testing: all looks good
- 🟡 Complexity: 3 issues found
- 🟢 Documentation: all looks good
Help me be more useful! Please click 👍 or 👎 on each comment and I'll use the feedback to improve your reviews.
| throw new RuntimeException("Already started"); | ||
| } | ||
|
|
||
| try { |
There was a problem hiding this comment.
suggestion (bug_risk): Consider cleaning up partially initialized resources on startup failure.
If an exception occurs within the try block after some components have been successfully initialized, it may leave those resources active. It might be beneficial to include cleanup logic in the catch block to avoid potential resource leaks.
Suggested implementation:
logger.atInfo().log("Starting local persistent consumer");
if (tb != null) {
logger.atError().log("Local persistent consumer already started");
throw new RuntimeException("Already started");
}
logger.atInfo().log("Starting local persistent consumer");
TransactionalBroker tempTB = null;
SystemEventBroker tempSEB = null;
try {
tempTB = new TransactionalBroker(ds, asyncExecutor);
tempSEB = new SystemEventBroker(asyncExecutor);
tempSEB.subscribe(new CatchupService(ds, catchupClient, tempSEB));
tempSEB.subscribe(new UnprocessedSubmitter(ds, new UnprocessedEventFinder(), tempTB));
asyncExecutor.scheduleAtFixedRate(() -> tempSEB.publish(SystemEvent.UnprocessedCheckRequired), 30, 30, TimeUnit.SECONDS);
tb = tempTB;
} catch (Exception e) {
if (tempTB != null) {
tempTB.close();
}
if (tempSEB != null) {
tempSEB.shutdown();
}
throw e;
}Ensure that both TransactionalBroker and SystemEventBroker have cleanup methods (e.g., close() and shutdown()) or adjust the cleanup calls accordingly.
Also verify that the remaining usage of tb in the class correctly refers to the field and that no other resource leaks occur.
| @@ -33,35 +36,57 @@ public LocalPersistentConsumer(DataSource ds, PostEventConfig cfg) { | |||
| } | |||
|
|
|||
| public void start() throws IOException, InterruptedException { | |||
There was a problem hiding this comment.
issue (complexity): Consider extracting the startup sequence into a helper method to improve clarity.
Consider extracting the startup sequence into its own helper method to reduce nesting and improve method clarity. For example:
public void start() throws IOException, InterruptedException {
logger.atInfo().log("Starting local persistent consumer");
if (tb != null) {
logger.atError().log("Local persistent consumer already started");
throw new RuntimeException("Already started");
}
try {
startupSequence();
logger.atInfo().log("Local persistent consumer started successfully");
} catch (Exception e) {
logger.atError().setCause(e).log("Failed to start local persistent consumer");
throw e;
}
}
private void startupSequence() {
tb = new TransactionalBroker(ds, asyncExecutor);
var seb = new SystemEventBroker(asyncExecutor);
var pb = new PersistentBroker<>(tb, ds, seb);
var lc = new LocalConsumer<>(cfg, pb);
seb.subscribe(new CatchupService(ds, new CatchupServer(ds), seb));
var unprocessedSubmitter = new UnprocessedSubmitter(ds, new UnprocessedEventFinder(), tb);
seb.subscribe(unprocessedSubmitter);
asyncExecutor.scheduleAtFixedRate(() -> {
logger.atDebug().log("Triggering unprocessed check");
seb.publish(SystemEvent.UnprocessedCheckRequired);
}, 30, 30, TimeUnit.SECONDS);
lc.start();
closeables = List.of(lc, pb, seb, tb, asyncExecutor);
}This refactoring keeps functionality intact while simplifying the start() method.
| @@ -49,33 +52,56 @@ public void start(DataSource ds, String host, int port) { | |||
| } | |||
|
|
|||
| public void start(DataSource ds, ManagedChannel channel) { | |||
There was a problem hiding this comment.
issue (complexity): Consider extracting initialization code into a helper method to reduce nesting and repetitive logging.
Consider extracting the initialization code into its own helper method to reduce nesting and repetitive logging. This approach helps in isolating error handling and keeps the start() method cleaner. For example:
public void start(DataSource ds, ManagedChannel channel) {
logger.atInfo().log("Starting consumer client");
if (tb != null) {
logger.atError().log("Consumer client already started");
throw new IllegalStateException("Already started");
}
try {
initializeComponents(ds, channel);
logger.atInfo().log("Consumer client started successfully");
} catch (Exception e) {
logger.atError()
.setCause(e)
.log("Failed to start consumer client");
throw new RuntimeException("Failed to start consumer client", e);
}
}
private void initializeComponents(DataSource ds, ManagedChannel channel) {
tb = new TransactionalBroker(ds, asyncExecutor);
var seb = new SystemEventBroker(asyncExecutor);
var pb = new PersistentBroker<>(tb, ds, seb);
var client = new MessageBrokerGrpcClient(channel, topic);
var catchupClient = new CatchupGrpcClient(channel);
client.subscribe(pb);
seb.subscribe(new CatchupService(ds, catchupClient, seb));
seb.subscribe(new UnprocessedSubmitter(ds, new UnprocessedEventFinder(), tb));
asyncExecutor.scheduleAtFixedRate(
() -> seb.publish(SystemEvent.UnprocessedCheckRequired),
30, 30, TimeUnit.SECONDS
);
closeables = List.of(client, catchupClient, pb, seb, tb);
}This refactoring reduces nesting within start() and isolates logging and error handling for initialization.
| @@ -40,30 +43,51 @@ public void start(int port) throws IOException, InterruptedException { | |||
| } | |||
|
|
|||
| public void start(ServerBuilder<?> sb) throws IOException, InterruptedException { | |||
There was a problem hiding this comment.
issue (complexity): Consider extracting error-handling and logging into helper methods to keep business logic focused.
Consider extracting the error-handling and logging into small helper methods so that your business logic remains focused. For example, you could create an execution wrapper for startup steps:
private <T> T executeWithLogging(Supplier<T> supplier, String successMsg, String errorMsg) {
try {
T result = supplier.get();
logger.atInfo().log(successMsg);
return result;
} catch (Exception e) {
logger.atError().setCause(e).log(errorMsg);
throw e;
}
}Then refactor your start method like so:
public void start(ServerBuilder<?> sb) throws IOException, InterruptedException {
logger.atInfo().log("Starting consumer server");
var mb = new EventMessageBroker(asyncExecutor);
var lc = new LocalConsumer<>(cfg, mb);
var grpcServer = new MessageBrokerGrpcServer(mb);
var catchupServer = new CatchupServer(ds);
var catchupService = new CatchupGrpcServer.CatchupServiceImpl(catchupServer);
executeWithLogging(() -> {
lc.start();
server = sb.addService(grpcServer)
.addService(catchupService)
.permitKeepAliveTime(1, TimeUnit.HOURS)
.permitKeepAliveWithoutCalls(true)
.build()
.start();
return null;
}, "Consumer server started successfully", "Failed to start consumer server");
closeables = List.of(lc, mb, asyncExecutor);
}Similarly, for the stop method, abstract the close-and-log behavior:
private void closeResource(AutoCloseable resource) {
try {
resource.close();
} catch (Exception e) {
logger.atWarn()
.setCause(e)
.addArgument(resource.getClass().getSimpleName())
.log("Error closing {}");
}
}
public void stop() {
logger.atInfo().log("Stopping consumer server");
server.shutdown();
for (var c : closeables) {
closeResource(c);
}
logger.atInfo().log("Consumer server stopped");
}These steps separate the business logic from the error-handling and logging concerns, thus reducing nesting and improving clarity while preserving functionality.
Summary by Sourcery
Replace Java's built-in logging with SLF4J logging across multiple classes, improving log management and providing more structured logging capabilities
Enhancements:
Chores: