Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ Create a catchup mechanism
- [ ] Verifies that there are no gaps in the event sequence (check the sequence for the earliest unprocessed event until this one)
- [ ] Verifies that there are no earlier unprocessed events for the same subject

TODO
Create a processor
- [ ] ?
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

issue (typo): Remove the stray question mark.

The question mark seems like leftover debugging text and should be removed for clarity.


Expand Down
57 changes: 57 additions & 0 deletions src/main/java/com/p14n/postevent/CatchupService.java
Original file line number Diff line number Diff line change
Expand Up @@ -157,4 +157,61 @@ private void updateHwm(String subscriberName, long currentHwm, long newHwm) thro
}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

issue (complexity): Consider extracting the ResultSet loop into a helper method to reduce nesting complexity and improve readability of the hasSequenceGap method, by creating a private method that processes the result set and returns both the gap flag and the last contiguous IDN.

Consider extracting the ResultSet loop into a helper method to reduce nesting. For example, you could create a private method that processes the result set and returns both the gap flag and the last contiguous IDN:

private static class GapCheckResult {
    final boolean gapFound;
    final long lastContiguousIdn;

    GapCheckResult(boolean gapFound, long lastContiguousIdn) {
        this.gapFound = gapFound;
        this.lastContiguousIdn = lastContiguousIdn;
    }
}

private GapCheckResult processMessages(ResultSet rs, long currentHwm) throws SQLException {
    long expectedNext = currentHwm + 1;
    long lastContiguousIdn = currentHwm;
    while (rs.next()) {
        long actualIdn = rs.getLong("idn");
        if (actualIdn > expectedNext) {
            LOGGER.log(Level.INFO, "Gap found: Expected {0}, found {1} (gap of {2})",
                    new Object[]{expectedNext, actualIdn, actualIdn - expectedNext});
            return new GapCheckResult(true, lastContiguousIdn);
        }
        lastContiguousIdn = actualIdn;
        expectedNext = actualIdn + 1;
    }
    return new GapCheckResult(false, lastContiguousIdn);
}

Then simplify your hasSequenceGap method by using this helper:

public boolean hasSequenceGap(String subscriberName, long currentHwm) throws SQLException {
    LOGGER.log(Level.FINE, "Checking for sequence gaps after HWM {0} for subscriber {1}",
            new Object[]{currentHwm, subscriberName});
    String sql = "SELECT idn FROM postevent.messages WHERE idn > ? ORDER BY idn";

    try (PreparedStatement stmt = connection.prepareStatement(sql)) {
        stmt.setLong(1, currentHwm);
        try (ResultSet rs = stmt.executeQuery()) {
            GapCheckResult result = processMessages(rs, currentHwm);
            if (result.lastContiguousIdn > currentHwm) {
                LOGGER.log(Level.INFO, "Updating HWM from {0} to {1} for subscriber {2}",
                        new Object[]{currentHwm, result.lastContiguousIdn, subscriberName});
                updateHwm(subscriberName, currentHwm, result.lastContiguousIdn);
            }
            if (!result.gapFound) {
                LOGGER.log(Level.INFO, "No sequence gaps found after HWM for subscriber {0}", subscriberName);
            }
            return result.gapFound;
        }
    }
}

This refactoring preserves your functionality while reducing the nesting complexity in hasSequenceGap.

}
}

private static class GapCheckResult {
final boolean gapFound;
final long lastContiguousIdn;

GapCheckResult(boolean gapFound, long lastContiguousIdn) {
this.gapFound = gapFound;
this.lastContiguousIdn = lastContiguousIdn;
}
}

private GapCheckResult processMessages(ResultSet rs, long currentHwm) throws SQLException {
long expectedNext = currentHwm + 1;
long lastContiguousIdn = currentHwm;
while (rs.next()) {
long actualIdn = rs.getLong("idn");
if (actualIdn > expectedNext) {
LOGGER.log(Level.INFO, "Gap found: Expected {0}, found {1} (gap of {2})",
new Object[] { expectedNext, actualIdn, actualIdn - expectedNext });
return new GapCheckResult(true, lastContiguousIdn);
}
lastContiguousIdn = actualIdn;
expectedNext = actualIdn + 1;
}
return new GapCheckResult(false, lastContiguousIdn);
}

/**
* Checks for gaps in the message sequence and updates the HWM to the last
* contiguous message.
*
* @param subscriberName The name of the subscriber
* @param currentHwm The current high water mark to start checking from
* @return true if a gap was found, false if no gaps were found
* @throws SQLException If a database error occurs
*/
public boolean hasSequenceGap(String subscriberName, long currentHwm) throws SQLException {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion: Method name might not clearly express its side effects.

Since this method both checks for sequence gaps and updates the HWM when contiguous messages are found, consider renaming it (or enhancing the documentation) so its dual behavior is immediately evident to future maintainers.

Suggested implementation:

    /**
     * Checks for gaps in the message sequence and updates the HWM to the last contiguous message.
     * 
     * This method both detects gaps and, if contiguous messages are found, advances the HWM.
     *
     * @param subscriberName The name of the subscriber
     * @param currentHwm     The current high water mark to start checking from
     * @return true if a gap was found, false if no gaps were found (and the HWM has been updated)
     * @throws SQLException If a database error occurs
     */
    public boolean checkAndAdvanceHwm(String subscriberName, long currentHwm) throws SQLException {

Ensure that all references to the method "hasSequenceGap" in other parts of your codebase (e.g., callers and tests) are updated to "checkAndAdvanceHwm" to maintain consistency.

LOGGER.log(Level.FINE, "Checking for sequence gaps after HWM {0} for subscriber {1}",
new Object[] { currentHwm, subscriberName });
String sql = "SELECT idn FROM postevent.messages WHERE idn > ? ORDER BY idn";

try (PreparedStatement stmt = connection.prepareStatement(sql)) {
stmt.setLong(1, currentHwm);
try (ResultSet rs = stmt.executeQuery()) {
GapCheckResult result = processMessages(rs, currentHwm);
if (result.lastContiguousIdn > currentHwm) {
LOGGER.log(Level.INFO, "Updating HWM from {0} to {1} for subscriber {2}",
new Object[] { currentHwm, result.lastContiguousIdn, subscriberName });
updateHwm(subscriberName, currentHwm, result.lastContiguousIdn);
}
if (!result.gapFound) {
LOGGER.log(Level.INFO, "No sequence gaps found after HWM for subscriber {0}", subscriberName);
}
return result.gapFound;
}
}
}
}
125 changes: 123 additions & 2 deletions src/test/java/com/p14n/postevent/CatchupServiceTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,13 @@
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;

public class CatchupServiceTest {
Expand Down Expand Up @@ -81,6 +83,14 @@ private void createProcessingGap() throws Exception {
""");
}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion (testing): Edge case: Multiple gaps

Consider adding a test case with multiple gaps in the sequence to ensure the hasSequenceGap method handles them correctly. For example, a sequence like 1, 2, 4, 5, 7, 8 would have gaps at 3 and 6.

Suggested change
}
}
@Test
public void testHasSequenceGapWithMultipleGaps() throws Exception {
// Create multiple gaps by publishing events with specific IDs.
// We'll manually insert events with IDs 1, 2, 4, 5, 7, 8 (gaps at 3 and 6)
log.debug("Publishing events with multiple gaps");
// Insert events with IDs: 1, 2
for (int i = 1; i <= 2; i++) {
Event event = new Event(
UUID.randomUUID().toString(),
"test-source",
"test-type",
"application/json",
null,
i // assuming this parameter represents the event's numeric ID
);
publishEvent(event);
}
// Insert events with IDs: 4, 5
for (int i = 4; i <= 5; i++) {
Event event = new Event(
UUID.randomUUID().toString(),
"test-source",
"test-type",
"application/json",
null,
i
);
publishEvent(event);
}
// Insert events with IDs: 7, 8
for (int i = 7; i <= 8; i++) {
Event event = new Event(
UUID.randomUUID().toString(),
"test-source",
"test-type",
"application/json",
null,
i
);
publishEvent(event);
}
// Verify that gaps are detected in the sequence
boolean hasGap = hasSequenceGap();
assertTrue(hasGap, "Should detect gaps in events with multiple missing sequence numbers");
// Verify HWM was updated to the last event (ID 8)
long newHwm = getCurrentHwm(SUBSCRIBER_NAME);
assertEquals(8, newHwm, "HWM should be updated to the last event");
}


private void copyEventsToMessages(long lowestIdn) throws Exception {
connection.createStatement().execute("""
INSERT INTO postevent.messages (id, source, datacontenttype, dataschema, subject, data, idn)
select id, source, datacontenttype, dataschema, subject, data, idn
from postevent.test_events
where idn >= """ + lowestIdn);
}

@Test
public void testCatchupProcessesNewEvents() throws Exception {
// Publish some test events
Expand Down Expand Up @@ -191,15 +201,126 @@ public void testCatchupWithExistingHwm() throws Exception {
assertTrue(newHwm > initialHwm, "HWM should have increased");
}

private long getCurrentHwm(String subscriberName) throws Exception {
@Test
public void testHasSequenceGapWithNoGap() throws Exception {
// Publish sequential events
log.debug("Publishing 5 sequential events");
for (int i = 0; i < 5; i++) {
Event event = new Event(
UUID.randomUUID().toString(),
"test-source",
"test-type",
"application/json",
null,
"test-subject",
("{\"value\":" + i + "}").getBytes(),
null);
publisher.publish(event, connection, TEST_TOPIC);
}

copyEventsToMessages(0);

// Initialize HWM to 0
initializeHwm(SUBSCRIBER_NAME, 0);

// Check for gaps
boolean hasGap = catchupService.hasSequenceGap(SUBSCRIBER_NAME, 0);

// Verify no gap was found
assertFalse(hasGap, "Should not find any gaps in sequential events");

// Verify HWM was updated to the last event
long newHwm = getCurrentHwm(SUBSCRIBER_NAME);
assertEquals(5, newHwm, "HWM should be updated to the last event");
}

@Test
public void testHasSequenceGapWithGap() throws Exception {
// Create a gap by publishing events with specific IDs
// We'll manually insert events with IDNs 1, 2, 3, 5, 6 (gap at 4)
log.debug("Publishing events with a gap");

// First, insert events 1-3
for (int i = 1; i <= 3; i++) {
Event event = new Event(
UUID.randomUUID().toString(),
"test-source",
"test-type",
"application/json",
null,
"test-subject",
("{\"value\":" + i + "}").getBytes(),
null);
publisher.publish(event, connection, TEST_TOPIC);
}

copyEventsToMessages(0);

// Then insert events 4-6
for (int i = 4; i <= 6; i++) {
log.debug("Publishing event {}", i);
Event event = new Event(
UUID.randomUUID().toString(),
"test-source",
"test-type",
"application/json",
null,
"test-subject",
("{\"value\":" + i + "}").getBytes(),
null);
publisher.publish(event, connection, TEST_TOPIC);
}
copyEventsToMessages(5);
logEventsInTopicTable();
logEventsInMessagesTable();

// Initialize HWM to 0
initializeHwm(SUBSCRIBER_NAME, 0);

// Check for gaps
boolean hasGap = catchupService.hasSequenceGap(SUBSCRIBER_NAME, 0);

// Verify a gap was found
assertTrue(hasGap, "Should find a gap in the sequence");

// Verify HWM was updated to the last event before the gap
long newHwm = getCurrentHwm(SUBSCRIBER_NAME);
assertEquals(3, newHwm, "HWM should be updated to the last event before the gap");

}

/**
* Helper method to initialize HWM for a subscriber
*/
private void initializeHwm(String subscriberName, long hwm) throws SQLException {
String sql = "INSERT INTO postevent.contiguous_hwm (subscriber_name, hwm) " +
"VALUES (?, ?) " +
"ON CONFLICT (subscriber_name) DO UPDATE SET hwm = ?";

try (PreparedStatement stmt = connection.prepareStatement(sql)) {
stmt.setString(1, subscriberName);
stmt.setLong(2, hwm);
stmt.setLong(3, hwm);

stmt.executeUpdate();
}
}

/**
* Helper method to get current HWM for a subscriber
*/
private long getCurrentHwm(String subscriberName) throws SQLException {
String sql = "SELECT hwm FROM postevent.contiguous_hwm WHERE subscriber_name = ?";

try (PreparedStatement stmt = connection.prepareStatement(sql)) {
stmt.setString(1, subscriberName);

try (ResultSet rs = stmt.executeQuery()) {
if (rs.next()) {
return rs.getLong("hwm");
} else {
return 0;
}
return 0;
}
}
}
Expand Down