Skip to content

Augment catchup latest#48

Merged
p14n merged 4 commits intomainfrom
augment-catchup-latest
Apr 26, 2025
Merged

Augment catchup latest#48
p14n merged 4 commits intomainfrom
augment-catchup-latest

Conversation

@p14n
Copy link
Copy Markdown
Owner

@p14n p14n commented Apr 26, 2025

Fixes #34

Summary by Sourcery

Introduce a mechanism to periodically fetch the single latest event for each topic, ensuring recent data is processed quickly before a full catchup occurs.

New Features:

  • Add a "fetch latest" capability to retrieve and process only the newest event per topic.

Bug Fixes:

  • Fix the SQL function call for dropping PostgreSQL replication slots (call -> select).
  • Enable the continuous writing functionality in the main application.

Enhancements:

  • Integrate the "fetch latest" trigger into the periodic background tasks for both local and remote consumers.
  • Expose the latest message ID retrieval via a new gRPC endpoint.

Deployment:

  • Update the AWS RDS instance configuration to use db.t4g.micro instance class and gp3 storage type.

@sourcery-ai
Copy link
Copy Markdown
Contributor

sourcery-ai bot commented Apr 26, 2025

🧙 Sourcery has finished reviewing your pull request!


Tips and commands

Interacting with Sourcery

  • Trigger a new review: Comment @sourcery-ai review on the pull request.
  • Continue discussions: Reply directly to Sourcery's review comments.
  • Generate a GitHub issue from a review comment: Ask Sourcery to create an
    issue from a review comment by replying to it. You can also reply to a
    review comment with @sourcery-ai issue to create an issue from it.
  • Generate a pull request title: Write @sourcery-ai anywhere in the pull
    request title to generate a title at any time. You can also comment
    @sourcery-ai title on the pull request to (re-)generate the title at any time.
  • Generate a pull request summary: Write @sourcery-ai summary anywhere in
    the pull request body to generate a PR summary at any time exactly where you
    want it. You can also comment @sourcery-ai summary on the pull request to
    (re-)generate the summary at any time.
  • Generate reviewer's guide: Comment @sourcery-ai guide on the pull
    request to (re-)generate the reviewer's guide at any time.
  • Resolve all Sourcery comments: Comment @sourcery-ai resolve on the
    pull request to resolve all Sourcery comments. Useful if you've already
    addressed all the comments and don't want to see them anymore.
  • Dismiss all Sourcery reviews: Comment @sourcery-ai dismiss on the pull
    request to dismiss all existing Sourcery reviews. Especially useful if you
    want to start fresh with a new review - don't forget to comment
    @sourcery-ai review to trigger a new review!

Customizing Your Experience

Access your dashboard to:

  • Enable or disable review features such as the Sourcery-generated pull request
    summary, the reviewer's guide, and others.
  • Change the review language.
  • Add, remove or edit custom review instructions.
  • Adjust other review settings.

Getting Help

Copy link
Copy Markdown
Contributor

@sourcery-ai sourcery-ai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey @p14n - I've reviewed your changes and they look great!

Here's what I looked at during the review
  • 🟢 General issues: all looks good
  • 🟡 Security: 1 issue found
  • 🟢 Testing: all looks good
  • 🟡 Complexity: 1 issue found
  • 🟢 Documentation: all looks good

Sourcery is free for open source - if you like our reviews please consider sharing them ✨
Help me be more useful! Please click 👍 or 👎 on each comment and I'll use the feedback to improve your reviews.

}

private long findLatestMessageId(Connection connection, String topicName) throws SQLException {
String sql = "SELECT MAX(idn) FROM postevent." + topicName;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🚨 suggestion (security): Consider sanitizing or validating topicName in SQL construction.

Directly concatenating topicName into the SQL string may pose SQL injection risks if the value is not strictly controlled. Although it may be validated elsewhere, adding safeguards here could improve security.

Suggested implementation:

    private long findLatestMessageId(Connection connection, String topicName) throws SQLException {
        if (!isValidTopicName(topicName)) {
            throw new IllegalArgumentException("Invalid topic name: " + topicName);
        }
        String sql = "SELECT MAX(idn) FROM postevent." + topicName;
        try (PreparedStatement stmt = connection.prepareStatement(sql)) {
    private boolean isValidTopicName(String topicName) {
        return topicName != null && topicName.matches("^[a-zA-Z0-9_]+$");
    }

Ensure that the isValidTopicName method is placed within the CatchupService class along with other helper methods. Adjust the regular expression as needed to match the naming conventions for topic names in your environment.

public void onMessage(SystemEvent message) {
if (Objects.requireNonNull(message) == SystemEvent.CatchupRequired) {
oneAtATime(() -> catchup(message.topic), () -> onMessage(message));
} else if (message == SystemEvent.FetchLatest) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

issue (complexity): Consider extracting responsibilities in fetchLatest into separate methods for topic validation, transaction handling, business logic, and event publishing.

It would be beneficial to extract some of the responsibilities in fetchLatest into separate methods. For example, you can:

  • Extract the null check. Move topic-name validation into its own method.
  • Separate DB transaction handling. Isolate the logic to start a transaction, perform operations, and commit/rollback into a helper.
  • Extract business logic. Isolate request-specific logic like getting the current HWM, fetching events, and processing them.
  • Delegate event publishing. Move triggering follow-up events to a dedicated method.

This will reduce nesting and make unit testing easier. For instance, you could refactor as follows:

private boolean isValidTopic(String topicName) {
    if (topicName == null) {
        LOGGER.warn("Topic name is null for fetch latest");
        return false;
    }
    return true;
}

private int performFetchLatest(Connection conn, String topicName) throws SQLException {
    long currentHwm = getCurrentHwm(conn, topicName);
    long latestMessageId = catchupServer.getLatestMessageId(topicName);

    if (latestMessageId <= currentHwm) {
        LOGGER.info("No new messages found after HWM {} for topic {}", currentHwm, topicName);
        return 0;
    }
    List<Event> events = catchupServer.fetchEvents(latestMessageId - 1, latestMessageId, 1, topicName);
    if (events.isEmpty()) {
        LOGGER.info("No events found in range for topic: {}", topicName);
        return 0;
    }
    return writeEventsToMessagesTable(conn, events);
}

private void triggerCatchupIfNeeded(String topicName, long currentHwm, long latestMessageId) {
    if (latestMessageId > currentHwm) {
        systemEventBroker.publish(SystemEvent.CatchupRequired.withTopic(topicName));
    }
}

private int fetchLatest(String topicName) {
    if (!isValidTopic(topicName)) return 0;

    try (Connection conn = datasource.getConnection()) {
        conn.setAutoCommit(false);

        int processedCount = performFetchLatest(conn, topicName);
        conn.commit();

        triggerCatchupIfNeeded(topicName, getCurrentHwm(conn, topicName), catchupServer.getLatestMessageId(topicName));

        LOGGER.info("Processed latest event for topic {}", topicName);
        return processedCount;
    } catch (SQLException e) {
        LOGGER.error("Failed to fetch latest", e);
        return 0;
    }
}

These changes keep the current functionality intact while reducing the method's complexity and making future maintenance easier.

@p14n p14n merged commit 6231ccc into main Apr 26, 2025
1 check passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Initial connect should run catchup for all events

1 participant