Skip to content

Cursor catchup mechanism#24

Merged
p14n merged 16 commits intomainfrom
cursor-catchup-mechanism
Mar 10, 2025
Merged

Cursor catchup mechanism#24
p14n merged 16 commits intomainfrom
cursor-catchup-mechanism

Conversation

@p14n
Copy link
Copy Markdown
Owner

@p14n p14n commented Mar 10, 2025

Summary by Sourcery

Implements a cursor catchup mechanism that checks for gaps in the message sequence and updates the HWM to the last contiguous message.

New Features:

  • Introduces a mechanism to detect gaps in the event sequence for a subscriber and update the High Water Mark (HWM) to the last contiguous message ID.
  • Adds the hasSequenceGap method to the CatchupService to identify gaps in the message sequence.

Tests:

  • Adds tests to verify the gap detection mechanism in CatchupService, including scenarios with and without gaps.

@sourcery-ai
Copy link
Copy Markdown
Contributor

sourcery-ai bot commented Mar 10, 2025

Reviewer's Guide by Sourcery

This pull request introduces a mechanism to detect gaps in the sequence of messages for a subscriber. It adds a hasSequenceGap method to the CatchupService that checks for gaps in the message sequence and updates the HWM to the last contiguous message. The pull request also includes new tests to verify the gap detection mechanism.

Sequence diagram for gap detection in CatchupService

sequenceDiagram
    participant CatchupService
    participant Database

    CatchupService->>Database: SELECT idn FROM postevent.messages WHERE idn > currentHwm ORDER BY idn
    activate Database
    Database-->>CatchupService: ResultSet (message IDs)
deactivate Database

    loop For each message ID in ResultSet
        CatchupService->>CatchupService: Check if (actualIdn > expectedNext)
        alt Gap found
            CatchupService->>CatchupService: Log gap
            CatchupService-->>CatchupService: break
        else No gap
            CatchupService->>CatchupService: Update lastContiguousIdn and expectedNext
        end
    end

    alt lastContiguousIdn > currentHwm
        CatchupService->>CatchupService: Update HWM
        CatchupService->>Database: Update HWM in database
        activate Database
        Database-->>CatchupService: Confirmation
        deactivate Database
    end

    CatchupService-->>CatchupService: Return gapFound
Loading

File-Level Changes

Change Details Files
Implemented a mechanism to detect gaps in the sequence of messages for a subscriber and update the HWM to the last contiguous message before the gap.
  • Added a hasSequenceGap method to check for gaps in the message sequence.
  • The hasSequenceGap method queries the database for messages with IDNs greater than the current HWM, ordered by IDN.
  • The method iterates through the result set, checking for gaps between expected and actual IDNs.
  • If a gap is found, the method updates the HWM to the last contiguous IDN and returns true.
  • If no gap is found, the method updates the HWM to the last processed IDN and returns false.
src/main/java/com/p14n/postevent/CatchupService.java
Added tests for the sequence gap detection mechanism.
  • Added a test case testHasSequenceGapWithNoGap to verify that no gap is detected when events are sequential.
  • Added a test case testHasSequenceGapWithGap to verify that a gap is detected when there is a gap in the sequence.
  • Added helper methods initializeHwm and getCurrentHwm to manage the HWM during testing.
  • Added copyEventsToMessages to copy events from the test_events table to the messages table.
src/test/java/com/p14n/postevent/CatchupServiceTest.java

Tips and commands

Interacting with Sourcery

  • Trigger a new review: Comment @sourcery-ai review on the pull request.
  • Continue discussions: Reply directly to Sourcery's review comments.
  • Generate a GitHub issue from a review comment: Ask Sourcery to create an
    issue from a review comment by replying to it. You can also reply to a
    review comment with @sourcery-ai issue to create an issue from it.
  • Generate a pull request title: Write @sourcery-ai anywhere in the pull
    request title to generate a title at any time. You can also comment
    @sourcery-ai title on the pull request to (re-)generate the title at any time.
  • Generate a pull request summary: Write @sourcery-ai summary anywhere in
    the pull request body to generate a PR summary at any time exactly where you
    want it. You can also comment @sourcery-ai summary on the pull request to
    (re-)generate the summary at any time.
  • Generate reviewer's guide: Comment @sourcery-ai guide on the pull
    request to (re-)generate the reviewer's guide at any time.
  • Resolve all Sourcery comments: Comment @sourcery-ai resolve on the
    pull request to resolve all Sourcery comments. Useful if you've already
    addressed all the comments and don't want to see them anymore.
  • Dismiss all Sourcery reviews: Comment @sourcery-ai dismiss on the pull
    request to dismiss all existing Sourcery reviews. Especially useful if you
    want to start fresh with a new review - don't forget to comment
    @sourcery-ai review to trigger a new review!
  • Generate a plan of action for an issue: Comment @sourcery-ai plan on
    an issue to generate a plan of action for it.

Customizing Your Experience

Access your dashboard to:

  • Enable or disable review features such as the Sourcery-generated pull request
    summary, the reviewer's guide, and others.
  • Change the review language.
  • Add, remove or edit custom review instructions.
  • Adjust other review settings.

Getting Help

Copy link
Copy Markdown
Contributor

@sourcery-ai sourcery-ai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey @p14n - I've reviewed your changes - here's some feedback:

Overall Comments:

  • Consider adding a method to the test class to encapsulate the event creation logic to avoid duplication.
  • The copyEventsToMessages method duplicates data - consider if this is the best approach.
Here's what I looked at during the review
  • 🟡 General issues: 2 issues found
  • 🟢 Security: all looks good
  • 🟡 Testing: 1 issue found
  • 🟡 Complexity: 1 issue found
  • 🟢 Documentation: all looks good

Sourcery is free for open source - if you like our reviews please consider sharing them ✨
Help me be more useful! Please click 👍 or 👎 on each comment and I'll use the feedback to improve your reviews.

* @return true if a gap was found, false if no gaps were found
* @throws SQLException If a database error occurs
*/
public boolean hasSequenceGap(String subscriberName, long currentHwm) throws SQLException {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion: Method name might not clearly express its side effects.

Since this method both checks for sequence gaps and updates the HWM when contiguous messages are found, consider renaming it (or enhancing the documentation) so its dual behavior is immediately evident to future maintainers.

Suggested implementation:

    /**
     * Checks for gaps in the message sequence and updates the HWM to the last contiguous message.
     * 
     * This method both detects gaps and, if contiguous messages are found, advances the HWM.
     *
     * @param subscriberName The name of the subscriber
     * @param currentHwm     The current high water mark to start checking from
     * @return true if a gap was found, false if no gaps were found (and the HWM has been updated)
     * @throws SQLException If a database error occurs
     */
    public boolean checkAndAdvanceHwm(String subscriberName, long currentHwm) throws SQLException {

Ensure that all references to the method "hasSequenceGap" in other parts of your codebase (e.g., callers and tests) are updated to "checkAndAdvanceHwm" to maintain consistency.

@@ -81,6 +83,14 @@ private void createProcessingGap() throws Exception {
""");
}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion (testing): Edge case: Multiple gaps

Consider adding a test case with multiple gaps in the sequence to ensure the hasSequenceGap method handles them correctly. For example, a sequence like 1, 2, 4, 5, 7, 8 would have gaps at 3 and 6.

Suggested change
}
}
@Test
public void testHasSequenceGapWithMultipleGaps() throws Exception {
// Create multiple gaps by publishing events with specific IDs.
// We'll manually insert events with IDs 1, 2, 4, 5, 7, 8 (gaps at 3 and 6)
log.debug("Publishing events with multiple gaps");
// Insert events with IDs: 1, 2
for (int i = 1; i <= 2; i++) {
Event event = new Event(
UUID.randomUUID().toString(),
"test-source",
"test-type",
"application/json",
null,
i // assuming this parameter represents the event's numeric ID
);
publishEvent(event);
}
// Insert events with IDs: 4, 5
for (int i = 4; i <= 5; i++) {
Event event = new Event(
UUID.randomUUID().toString(),
"test-source",
"test-type",
"application/json",
null,
i
);
publishEvent(event);
}
// Insert events with IDs: 7, 8
for (int i = 7; i <= 8; i++) {
Event event = new Event(
UUID.randomUUID().toString(),
"test-source",
"test-type",
"application/json",
null,
i
);
publishEvent(event);
}
// Verify that gaps are detected in the sequence
boolean hasGap = hasSequenceGap();
assertTrue(hasGap, "Should detect gaps in events with multiple missing sequence numbers");
// Verify HWM was updated to the last event (ID 8)
long newHwm = getCurrentHwm(SUBSCRIBER_NAME);
assertEquals(8, newHwm, "HWM should be updated to the last event");
}


TODO
Create a processor
- [ ] ?
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

issue (typo): Remove the stray question mark.

The question mark seems like leftover debugging text and should be removed for clarity.

@@ -157,4 +157,59 @@ private void updateHwm(String subscriberName, long currentHwm, long newHwm) thro
}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

issue (complexity): Consider extracting the ResultSet loop into a helper method to reduce nesting complexity and improve readability of the hasSequenceGap method, by creating a private method that processes the result set and returns both the gap flag and the last contiguous IDN.

Consider extracting the ResultSet loop into a helper method to reduce nesting. For example, you could create a private method that processes the result set and returns both the gap flag and the last contiguous IDN:

private static class GapCheckResult {
    final boolean gapFound;
    final long lastContiguousIdn;

    GapCheckResult(boolean gapFound, long lastContiguousIdn) {
        this.gapFound = gapFound;
        this.lastContiguousIdn = lastContiguousIdn;
    }
}

private GapCheckResult processMessages(ResultSet rs, long currentHwm) throws SQLException {
    long expectedNext = currentHwm + 1;
    long lastContiguousIdn = currentHwm;
    while (rs.next()) {
        long actualIdn = rs.getLong("idn");
        if (actualIdn > expectedNext) {
            LOGGER.log(Level.INFO, "Gap found: Expected {0}, found {1} (gap of {2})",
                    new Object[]{expectedNext, actualIdn, actualIdn - expectedNext});
            return new GapCheckResult(true, lastContiguousIdn);
        }
        lastContiguousIdn = actualIdn;
        expectedNext = actualIdn + 1;
    }
    return new GapCheckResult(false, lastContiguousIdn);
}

Then simplify your hasSequenceGap method by using this helper:

public boolean hasSequenceGap(String subscriberName, long currentHwm) throws SQLException {
    LOGGER.log(Level.FINE, "Checking for sequence gaps after HWM {0} for subscriber {1}",
            new Object[]{currentHwm, subscriberName});
    String sql = "SELECT idn FROM postevent.messages WHERE idn > ? ORDER BY idn";

    try (PreparedStatement stmt = connection.prepareStatement(sql)) {
        stmt.setLong(1, currentHwm);
        try (ResultSet rs = stmt.executeQuery()) {
            GapCheckResult result = processMessages(rs, currentHwm);
            if (result.lastContiguousIdn > currentHwm) {
                LOGGER.log(Level.INFO, "Updating HWM from {0} to {1} for subscriber {2}",
                        new Object[]{currentHwm, result.lastContiguousIdn, subscriberName});
                updateHwm(subscriberName, currentHwm, result.lastContiguousIdn);
            }
            if (!result.gapFound) {
                LOGGER.log(Level.INFO, "No sequence gaps found after HWM for subscriber {0}", subscriberName);
            }
            return result.gapFound;
        }
    }
}

This refactoring preserves your functionality while reducing the nesting complexity in hasSequenceGap.

@p14n p14n merged commit 86ec796 into main Mar 10, 2025
1 check passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant