Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 37 additions & 11 deletions src/main/java/com/p14n/postevent/ConsumerClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
import com.p14n.postevent.data.UnprocessedEventFinder;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.sql.DataSource;

Expand All @@ -25,6 +27,7 @@
import java.util.concurrent.TimeUnit;

public class ConsumerClient implements AutoCloseable, MessageBroker<TransactionalEvent, TransactionalEvent> {
private static final Logger logger = LoggerFactory.getLogger(ConsumerClient.class);

private AsyncExecutor asyncExecutor;
private List<AutoCloseable> closeables;
Expand All @@ -49,33 +52,56 @@ public void start(DataSource ds, String host, int port) {
}

public void start(DataSource ds, ManagedChannel channel) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

issue (complexity): Consider extracting initialization code into a helper method to reduce nesting and repetitive logging.

Consider extracting the initialization code into its own helper method to reduce nesting and repetitive logging. This approach helps in isolating error handling and keeps the start() method cleaner. For example:

public void start(DataSource ds, ManagedChannel channel) {
    logger.atInfo().log("Starting consumer client");
    if (tb != null) {
        logger.atError().log("Consumer client already started");
        throw new IllegalStateException("Already started");
    }
    try {
        initializeComponents(ds, channel);
        logger.atInfo().log("Consumer client started successfully");
    } catch (Exception e) {
        logger.atError()
              .setCause(e)
              .log("Failed to start consumer client");
        throw new RuntimeException("Failed to start consumer client", e);
    }
}

private void initializeComponents(DataSource ds, ManagedChannel channel) {
    tb = new TransactionalBroker(ds, asyncExecutor);
    var seb = new SystemEventBroker(asyncExecutor);
    var pb = new PersistentBroker<>(tb, ds, seb);
    var client = new MessageBrokerGrpcClient(channel, topic);
    var catchupClient = new CatchupGrpcClient(channel);

    client.subscribe(pb);
    seb.subscribe(new CatchupService(ds, catchupClient, seb));
    seb.subscribe(new UnprocessedSubmitter(ds, new UnprocessedEventFinder(), tb));

    asyncExecutor.scheduleAtFixedRate(
        () -> seb.publish(SystemEvent.UnprocessedCheckRequired),
        30, 30, TimeUnit.SECONDS
    );

    closeables = List.of(client, catchupClient, pb, seb, tb);
}

This refactoring reduces nesting within start() and isolates logging and error handling for initialization.

logger.atInfo().log("Starting consumer client");

if (tb != null) {
logger.atError().log("Consumer client already started");
throw new IllegalStateException("Already started");
}
tb = new TransactionalBroker(ds, asyncExecutor);
var seb = new SystemEventBroker(asyncExecutor);
var pb = new PersistentBroker<>(tb, ds, seb);
var client = new MessageBrokerGrpcClient(channel, topic);
var catchupClient = new CatchupGrpcClient(channel);
client.subscribe(pb);

seb.subscribe(new CatchupService(ds, catchupClient, seb));
seb.subscribe(new UnprocessedSubmitter(ds, new UnprocessedEventFinder(), tb));
asyncExecutor.scheduleAtFixedRate(() -> seb.publish(SystemEvent.UnprocessedCheckRequired), 30, 30, TimeUnit.SECONDS);
try {
tb = new TransactionalBroker(ds, asyncExecutor);
var seb = new SystemEventBroker(asyncExecutor);
var pb = new PersistentBroker<>(tb, ds, seb);
var client = new MessageBrokerGrpcClient(channel, topic);
var catchupClient = new CatchupGrpcClient(channel);

client.subscribe(pb);
seb.subscribe(new CatchupService(ds, catchupClient, seb));
seb.subscribe(new UnprocessedSubmitter(ds, new UnprocessedEventFinder(), tb));

asyncExecutor.scheduleAtFixedRate(
() -> seb.publish(SystemEvent.UnprocessedCheckRequired),
30, 30, TimeUnit.SECONDS);

closeables = List.of(client, catchupClient, pb, seb, tb);
closeables = List.of(client, catchupClient, pb, seb, tb);

logger.atInfo().log("Consumer client started successfully");

} catch (Exception e) {
logger.atError()
.setCause(e)
.log("Failed to start consumer client");
throw new RuntimeException("Failed to start consumer client", e);
}
}

@Override
public void close() {
logger.atInfo().log("Closing consumer client");

for (AutoCloseable c : closeables) {
try {
c.close();
} catch (Exception e) {
e.printStackTrace();
logger.atWarn()
.setCause(e)
.addArgument(c.getClass().getSimpleName())
.log("Error closing {}");
}
}

logger.atInfo().log("Consumer client closed");
}

@Override
Expand Down
40 changes: 32 additions & 8 deletions src/main/java/com/p14n/postevent/ConsumerServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,11 @@

import io.grpc.Server;
import io.grpc.ServerBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ConsumerServer implements AutoCloseable {
private static final Logger logger = LoggerFactory.getLogger(ConsumerServer.class);

private DataSource ds;
private ConfigData cfg;
Expand All @@ -40,30 +43,51 @@ public void start(int port) throws IOException, InterruptedException {
}

public void start(ServerBuilder<?> sb) throws IOException, InterruptedException {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

issue (complexity): Consider extracting error-handling and logging into helper methods to keep business logic focused.

Consider extracting the error-handling and logging into small helper methods so that your business logic remains focused. For example, you could create an execution wrapper for startup steps:

private <T> T executeWithLogging(Supplier<T> supplier, String successMsg, String errorMsg) {
    try {
        T result = supplier.get();
        logger.atInfo().log(successMsg);
        return result;
    } catch (Exception e) {
        logger.atError().setCause(e).log(errorMsg);
        throw e;
    }
}

Then refactor your start method like so:

public void start(ServerBuilder<?> sb) throws IOException, InterruptedException {
    logger.atInfo().log("Starting consumer server");
    var mb = new EventMessageBroker(asyncExecutor);
    var lc = new LocalConsumer<>(cfg, mb);
    var grpcServer = new MessageBrokerGrpcServer(mb);
    var catchupServer = new CatchupServer(ds);
    var catchupService = new CatchupGrpcServer.CatchupServiceImpl(catchupServer);

    executeWithLogging(() -> {
        lc.start();
        server = sb.addService(grpcServer)
                   .addService(catchupService)
                   .permitKeepAliveTime(1, TimeUnit.HOURS)
                   .permitKeepAliveWithoutCalls(true)
                   .build()
                   .start();
        return null;
    }, "Consumer server started successfully", "Failed to start consumer server");

    closeables = List.of(lc, mb, asyncExecutor);
}

Similarly, for the stop method, abstract the close-and-log behavior:

private void closeResource(AutoCloseable resource) {
    try {
        resource.close();
    } catch (Exception e) {
        logger.atWarn()
              .setCause(e)
              .addArgument(resource.getClass().getSimpleName())
              .log("Error closing {}");
    }
}

public void stop() {
    logger.atInfo().log("Stopping consumer server");
    server.shutdown();
    for (var c : closeables) {
        closeResource(c);
    }
    logger.atInfo().log("Consumer server stopped");
}

These steps separate the business logic from the error-handling and logging concerns, thus reducing nesting and improving clarity while preserving functionality.

logger.atInfo().log("Starting consumer server");

var mb = new EventMessageBroker(asyncExecutor);
var lc = new LocalConsumer<>(cfg, mb);
var grpcServer = new MessageBrokerGrpcServer(mb);
var catchupServer = new CatchupServer(ds);
var catchupService = new CatchupGrpcServer.CatchupServiceImpl(catchupServer);
lc.start();
server = sb.addService(grpcServer)
.addService(catchupService)
.permitKeepAliveTime(1, TimeUnit.HOURS)
.permitKeepAliveWithoutCalls(true)
.build()
.start();

try {
lc.start();
server = sb.addService(grpcServer)
.addService(catchupService)
.permitKeepAliveTime(1, TimeUnit.HOURS)
.permitKeepAliveWithoutCalls(true)
.build()
.start();

logger.atInfo().log("Consumer server started successfully");

} catch (Exception e) {
logger.atError()
.setCause(e)
.log("Failed to start consumer server");
throw e;
}

closeables = List.of(lc, mb, asyncExecutor);
}

public void stop() {
logger.atInfo().log("Stopping consumer server");

server.shutdown();
for (var c : closeables) {
try {
c.close();
} catch (Exception e) {
e.printStackTrace();
logger.atWarn()
.setCause(e)
.addArgument(c.getClass().getSimpleName())
.log("Error closing {}");
}
}

logger.atInfo().log("Consumer server stopped");
}

@Override
Expand Down
1 change: 0 additions & 1 deletion src/main/java/com/p14n/postevent/LocalConsumer.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ public void start() throws IOException, InterruptedException {
Consumer<ChangeEvent<String, String>> consumer = record -> {
try {
Event event = changeEventToEvent(record);
System.err.println("LC got event " + event.id());
if (event != null) {
broker.publish(event);
}
Expand Down
61 changes: 43 additions & 18 deletions src/main/java/com/p14n/postevent/LocalPersistentConsumer.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,11 @@
import java.util.concurrent.TimeUnit;

import javax.sql.DataSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class LocalPersistentConsumer implements AutoCloseable, MessageBroker<TransactionalEvent, TransactionalEvent> {
private static final Logger logger = LoggerFactory.getLogger(LocalPersistentConsumer.class);
private PostEventConfig cfg;
private DataSource ds;
private AsyncExecutor asyncExecutor;
Expand All @@ -33,35 +36,57 @@ public LocalPersistentConsumer(DataSource ds, PostEventConfig cfg) {
}

public void start() throws IOException, InterruptedException {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

issue (complexity): Consider extracting the startup sequence into a helper method to improve clarity.

Consider extracting the startup sequence into its own helper method to reduce nesting and improve method clarity. For example:

public void start() throws IOException, InterruptedException {
    logger.atInfo().log("Starting local persistent consumer");
    if (tb != null) {
        logger.atError().log("Local persistent consumer already started");
        throw new RuntimeException("Already started");
    }
    try {
        startupSequence();
        logger.atInfo().log("Local persistent consumer started successfully");
    } catch (Exception e) {
        logger.atError().setCause(e).log("Failed to start local persistent consumer");
        throw e;
    }
}

private void startupSequence() {
    tb = new TransactionalBroker(ds, asyncExecutor);
    var seb = new SystemEventBroker(asyncExecutor);
    var pb = new PersistentBroker<>(tb, ds, seb);
    var lc = new LocalConsumer<>(cfg, pb);

    seb.subscribe(new CatchupService(ds, new CatchupServer(ds), seb));

    var unprocessedSubmitter = new UnprocessedSubmitter(ds, new UnprocessedEventFinder(), tb);
    seb.subscribe(unprocessedSubmitter);

    asyncExecutor.scheduleAtFixedRate(() -> {
        logger.atDebug().log("Triggering unprocessed check");
        seb.publish(SystemEvent.UnprocessedCheckRequired);
    }, 30, 30, TimeUnit.SECONDS);

    lc.start();
    closeables = List.of(lc, pb, seb, tb, asyncExecutor);
}

This refactoring keeps functionality intact while simplifying the start() method.

if (tb != null)
throw new RuntimeException("Already started");
tb = new TransactionalBroker(ds, asyncExecutor);
var seb = new SystemEventBroker(asyncExecutor);
var pb = new PersistentBroker<>(tb, ds, seb);
var lc = new LocalConsumer<>(cfg, pb);
seb.subscribe(new CatchupService(ds, new CatchupServer(ds), seb));
var unprocessedSubmitter = new UnprocessedSubmitter(ds, new UnprocessedEventFinder(), tb);
seb.subscribe(unprocessedSubmitter);

asyncExecutor.scheduleAtFixedRate(() -> {
seb.publish(SystemEvent.UnprocessedCheckRequired);
}, 30, 30, TimeUnit.SECONDS);
logger.atInfo().log("Starting local persistent consumer");

lc.start();
closeables = List.of(lc, pb, seb, tb, asyncExecutor);
if (tb != null) {
logger.atError().log("Local persistent consumer already started");
throw new RuntimeException("Already started");
}

try {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion (bug_risk): Consider cleaning up partially initialized resources on startup failure.

If an exception occurs within the try block after some components have been successfully initialized, it may leave those resources active. It might be beneficial to include cleanup logic in the catch block to avoid potential resource leaks.

Suggested implementation:

        logger.atInfo().log("Starting local persistent consumer");

        if (tb != null) {
            logger.atError().log("Local persistent consumer already started");
            throw new RuntimeException("Already started");
        }

        logger.atInfo().log("Starting local persistent consumer");
        TransactionalBroker tempTB = null;
        SystemEventBroker tempSEB = null;
        try {
            tempTB = new TransactionalBroker(ds, asyncExecutor);
            tempSEB = new SystemEventBroker(asyncExecutor);
            tempSEB.subscribe(new CatchupService(ds, catchupClient, tempSEB));
            tempSEB.subscribe(new UnprocessedSubmitter(ds, new UnprocessedEventFinder(), tempTB));
            asyncExecutor.scheduleAtFixedRate(() -> tempSEB.publish(SystemEvent.UnprocessedCheckRequired), 30, 30, TimeUnit.SECONDS);
            tb = tempTB;
        } catch (Exception e) {
            if (tempTB != null) {
                tempTB.close();
            }
            if (tempSEB != null) {
                tempSEB.shutdown();
            }
            throw e;
        }

Ensure that both TransactionalBroker and SystemEventBroker have cleanup methods (e.g., close() and shutdown()) or adjust the cleanup calls accordingly.
Also verify that the remaining usage of tb in the class correctly refers to the field and that no other resource leaks occur.

tb = new TransactionalBroker(ds, asyncExecutor);
var seb = new SystemEventBroker(asyncExecutor);
var pb = new PersistentBroker<>(tb, ds, seb);
var lc = new LocalConsumer<>(cfg, pb);

seb.subscribe(new CatchupService(ds, new CatchupServer(ds), seb));
var unprocessedSubmitter = new UnprocessedSubmitter(ds, new UnprocessedEventFinder(), tb);
seb.subscribe(unprocessedSubmitter);

asyncExecutor.scheduleAtFixedRate(() -> {
logger.atDebug().log("Triggering unprocessed check");
seb.publish(SystemEvent.UnprocessedCheckRequired);
}, 30, 30, TimeUnit.SECONDS);

lc.start();
closeables = List.of(lc, pb, seb, tb, asyncExecutor);

logger.atInfo().log("Local persistent consumer started successfully");

} catch (Exception e) {
logger.atError()
.setCause(e)
.log("Failed to start local persistent consumer");
throw e;
}
}

@Override
public void close() {
for (var c : closeables) {
logger.atInfo().log("Closing local persistent consumer");

for (AutoCloseable c : closeables) {
try {
System.out.println("Closing " + c);
c.close();
} catch (Exception e) {
e.printStackTrace();
logger.atWarn()
.setCause(e)
.addArgument(c.getClass().getSimpleName())
.log("Error closing {}");
}
}

logger.atInfo().log("Local persistent consumer closed");
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,13 +39,11 @@ protected boolean canProcess(InT message) {
@Override
public void publish(InT message) {

System.err.println("DMB got event " + message);
if (!canProcess(message)) {
return;
}
// Deliver to all subscribers
for (MessageSubscriber<OutT> subscriber : subscribers) {
System.err.println("TO ASYNC " + message);

asyncExecutor.submit(() -> {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,11 @@
import io.grpc.stub.StreamObserver;

import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MessageBrokerGrpcServer extends MessageBrokerServiceGrpc.MessageBrokerServiceImplBase {
private static final Logger LOGGER = Logger.getLogger(MessageBrokerGrpcServer.class.getName());
private static final Logger logger = LoggerFactory.getLogger(MessageBrokerGrpcServer.class);
private final MessageBroker<Event, Event> messageBroker;

public MessageBrokerGrpcServer(MessageBroker<Event, Event> messageBroker) {
Expand All @@ -31,27 +31,31 @@ private void errorResponse(StreamObserver<EventResponse> responseObserver, Strin
@Override
public void subscribeToEvents(SubscriptionRequest request, StreamObserver<EventResponse> responseObserver) {
String topic = request.getTopic();
LOGGER.info("Received subscription request for topic: " + topic);
logger.atInfo().log("Subscription request received for topic: {}", topic);

if (topic == null || topic.isEmpty()) {
logger.atError().log("Invalid topic name received");
errorResponse(responseObserver, "Topic name cannot be empty",
new IllegalArgumentException("Topic name cannot be empty"));
return;
}

AtomicBoolean cancelled = new AtomicBoolean(false);

try {
// Create a subscription handler
MessageSubscriber<Event> subscriber = new MessageSubscriber<Event>() {
@Override
public void onMessage(Event event) {
// Skip if the stream has been cancelled
LOGGER.info("Received message for topic: " + topic);
logger.atInfo().log("Received message for topic: {}", topic);
if (cancelled.get()) {
return;
}
synchronized (responseObserver) {
try {
// Convert Event to EventResponse
EventResponse response = convertToGrpcEvent(event);
responseObserver.onNext(response);
} catch (Exception e) {
LOGGER.log(Level.SEVERE, "Error sending event to client", e);
logger.atError().setCause(e).log("Error sending event to client");
if (!cancelled.getAndSet(true)) {
errorResponse(responseObserver, "Error processing event", e);
}
Expand All @@ -61,27 +65,27 @@ public void onMessage(Event event) {

@Override
public void onError(Throwable error) {
LOGGER.log(Level.SEVERE, "Error subscribing to topic: " + topic, error);
logger.atError()
.addArgument(topic)
.setCause(error)
.log("Error subscribing to topic: {}");
if (!cancelled.getAndSet(true)) {
errorResponse(responseObserver, "Failed to subscribe to topic: " + topic, error);
}
}

};

ServerCallStreamObserver<EventResponse> responseCallObserver = (ServerCallStreamObserver<EventResponse>) responseObserver;
responseCallObserver.setOnCancelHandler(() -> {
cancelled.set(true);
messageBroker.unsubscribe(subscriber);
LOGGER.log(Level.INFO, "Unsubscribed from topic: " + topic);
logger.atInfo().log("Unsubscribed from topic: {}", topic);
});
messageBroker.subscribe(subscriber);

// Handle cancellation
// responseObserver.onCompleted();
LOGGER.log(Level.INFO, "Subscribed to topic: " + topic);

logger.atInfo().log("Subscribed to topic: {}", topic);
} catch (Exception e) {
LOGGER.log(Level.SEVERE, "Error subscribing to topic: " + topic, e);
logger.atError().setCause(e).log("Error setting up subscription to topic: {}", topic);
if (!cancelled.getAndSet(true)) {
errorResponse(responseObserver, "Failed to subscribe to topic: " + topic, e);
}
Expand Down Expand Up @@ -121,4 +125,4 @@ private EventResponse convertToGrpcEvent(Event event) {

return builder.build();
}
}
}
Loading