Skip to content

Commit 86ec796

Browse files
authored
Cursor catchup mechanism (#24)
* Update README * Update README * [cursor] Move messages table setup * [cursor] create cathup server and test * [cursor] correcting compile errors * [cursor] Catchup server tests pass * [cursor] Catchup service created * [cursor] Compile errors fixed with 'quick fix' * [cursor] Test added for catchup service, but it's testing the wrong thing - human taking over * [cursor] Added gap finding and test * [cursor] Added gap finding and test * [sourcery] Refactor suggestion
1 parent dd99b19 commit 86ec796

File tree

3 files changed

+180
-3
lines changed

3 files changed

+180
-3
lines changed

README.md

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,6 @@ Create a catchup mechanism
3535
- [ ] Verifies that there are no gaps in the event sequence (check the sequence for the earliest unprocessed event until this one)
3636
- [ ] Verifies that there are no earlier unprocessed events for the same subject
3737

38-
TODO
3938
Create a processor
4039
- [ ] ?
4140

src/main/java/com/p14n/postevent/CatchupService.java

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -157,4 +157,61 @@ private void updateHwm(String subscriberName, long currentHwm, long newHwm) thro
157157
}
158158
}
159159
}
160+
161+
private static class GapCheckResult {
162+
final boolean gapFound;
163+
final long lastContiguousIdn;
164+
165+
GapCheckResult(boolean gapFound, long lastContiguousIdn) {
166+
this.gapFound = gapFound;
167+
this.lastContiguousIdn = lastContiguousIdn;
168+
}
169+
}
170+
171+
private GapCheckResult processMessages(ResultSet rs, long currentHwm) throws SQLException {
172+
long expectedNext = currentHwm + 1;
173+
long lastContiguousIdn = currentHwm;
174+
while (rs.next()) {
175+
long actualIdn = rs.getLong("idn");
176+
if (actualIdn > expectedNext) {
177+
LOGGER.log(Level.INFO, "Gap found: Expected {0}, found {1} (gap of {2})",
178+
new Object[] { expectedNext, actualIdn, actualIdn - expectedNext });
179+
return new GapCheckResult(true, lastContiguousIdn);
180+
}
181+
lastContiguousIdn = actualIdn;
182+
expectedNext = actualIdn + 1;
183+
}
184+
return new GapCheckResult(false, lastContiguousIdn);
185+
}
186+
187+
/**
188+
* Checks for gaps in the message sequence and updates the HWM to the last
189+
* contiguous message.
190+
*
191+
* @param subscriberName The name of the subscriber
192+
* @param currentHwm The current high water mark to start checking from
193+
* @return true if a gap was found, false if no gaps were found
194+
* @throws SQLException If a database error occurs
195+
*/
196+
public boolean hasSequenceGap(String subscriberName, long currentHwm) throws SQLException {
197+
LOGGER.log(Level.FINE, "Checking for sequence gaps after HWM {0} for subscriber {1}",
198+
new Object[] { currentHwm, subscriberName });
199+
String sql = "SELECT idn FROM postevent.messages WHERE idn > ? ORDER BY idn";
200+
201+
try (PreparedStatement stmt = connection.prepareStatement(sql)) {
202+
stmt.setLong(1, currentHwm);
203+
try (ResultSet rs = stmt.executeQuery()) {
204+
GapCheckResult result = processMessages(rs, currentHwm);
205+
if (result.lastContiguousIdn > currentHwm) {
206+
LOGGER.log(Level.INFO, "Updating HWM from {0} to {1} for subscriber {2}",
207+
new Object[] { currentHwm, result.lastContiguousIdn, subscriberName });
208+
updateHwm(subscriberName, currentHwm, result.lastContiguousIdn);
209+
}
210+
if (!result.gapFound) {
211+
LOGGER.log(Level.INFO, "No sequence gaps found after HWM for subscriber {0}", subscriberName);
212+
}
213+
return result.gapFound;
214+
}
215+
}
216+
}
160217
}

src/test/java/com/p14n/postevent/CatchupServiceTest.java

Lines changed: 123 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,11 +11,13 @@
1111
import java.sql.DriverManager;
1212
import java.sql.PreparedStatement;
1313
import java.sql.ResultSet;
14+
import java.sql.SQLException;
1415
import java.util.ArrayList;
1516
import java.util.List;
1617
import java.util.UUID;
1718

1819
import static org.junit.jupiter.api.Assertions.assertEquals;
20+
import static org.junit.jupiter.api.Assertions.assertFalse;
1921
import static org.junit.jupiter.api.Assertions.assertTrue;
2022

2123
public class CatchupServiceTest {
@@ -81,6 +83,14 @@ private void createProcessingGap() throws Exception {
8183
""");
8284
}
8385

86+
private void copyEventsToMessages(long lowestIdn) throws Exception {
87+
connection.createStatement().execute("""
88+
INSERT INTO postevent.messages (id, source, datacontenttype, dataschema, subject, data, idn)
89+
select id, source, datacontenttype, dataschema, subject, data, idn
90+
from postevent.test_events
91+
where idn >= """ + lowestIdn);
92+
}
93+
8494
@Test
8595
public void testCatchupProcessesNewEvents() throws Exception {
8696
// Publish some test events
@@ -191,15 +201,126 @@ public void testCatchupWithExistingHwm() throws Exception {
191201
assertTrue(newHwm > initialHwm, "HWM should have increased");
192202
}
193203

194-
private long getCurrentHwm(String subscriberName) throws Exception {
204+
@Test
205+
public void testHasSequenceGapWithNoGap() throws Exception {
206+
// Publish sequential events
207+
log.debug("Publishing 5 sequential events");
208+
for (int i = 0; i < 5; i++) {
209+
Event event = new Event(
210+
UUID.randomUUID().toString(),
211+
"test-source",
212+
"test-type",
213+
"application/json",
214+
null,
215+
"test-subject",
216+
("{\"value\":" + i + "}").getBytes(),
217+
null);
218+
publisher.publish(event, connection, TEST_TOPIC);
219+
}
220+
221+
copyEventsToMessages(0);
222+
223+
// Initialize HWM to 0
224+
initializeHwm(SUBSCRIBER_NAME, 0);
225+
226+
// Check for gaps
227+
boolean hasGap = catchupService.hasSequenceGap(SUBSCRIBER_NAME, 0);
228+
229+
// Verify no gap was found
230+
assertFalse(hasGap, "Should not find any gaps in sequential events");
231+
232+
// Verify HWM was updated to the last event
233+
long newHwm = getCurrentHwm(SUBSCRIBER_NAME);
234+
assertEquals(5, newHwm, "HWM should be updated to the last event");
235+
}
236+
237+
@Test
238+
public void testHasSequenceGapWithGap() throws Exception {
239+
// Create a gap by publishing events with specific IDs
240+
// We'll manually insert events with IDNs 1, 2, 3, 5, 6 (gap at 4)
241+
log.debug("Publishing events with a gap");
242+
243+
// First, insert events 1-3
244+
for (int i = 1; i <= 3; i++) {
245+
Event event = new Event(
246+
UUID.randomUUID().toString(),
247+
"test-source",
248+
"test-type",
249+
"application/json",
250+
null,
251+
"test-subject",
252+
("{\"value\":" + i + "}").getBytes(),
253+
null);
254+
publisher.publish(event, connection, TEST_TOPIC);
255+
}
256+
257+
copyEventsToMessages(0);
258+
259+
// Then insert events 4-6
260+
for (int i = 4; i <= 6; i++) {
261+
log.debug("Publishing event {}", i);
262+
Event event = new Event(
263+
UUID.randomUUID().toString(),
264+
"test-source",
265+
"test-type",
266+
"application/json",
267+
null,
268+
"test-subject",
269+
("{\"value\":" + i + "}").getBytes(),
270+
null);
271+
publisher.publish(event, connection, TEST_TOPIC);
272+
}
273+
copyEventsToMessages(5);
274+
logEventsInTopicTable();
275+
logEventsInMessagesTable();
276+
277+
// Initialize HWM to 0
278+
initializeHwm(SUBSCRIBER_NAME, 0);
279+
280+
// Check for gaps
281+
boolean hasGap = catchupService.hasSequenceGap(SUBSCRIBER_NAME, 0);
282+
283+
// Verify a gap was found
284+
assertTrue(hasGap, "Should find a gap in the sequence");
285+
286+
// Verify HWM was updated to the last event before the gap
287+
long newHwm = getCurrentHwm(SUBSCRIBER_NAME);
288+
assertEquals(3, newHwm, "HWM should be updated to the last event before the gap");
289+
290+
}
291+
292+
/**
293+
* Helper method to initialize HWM for a subscriber
294+
*/
295+
private void initializeHwm(String subscriberName, long hwm) throws SQLException {
296+
String sql = "INSERT INTO postevent.contiguous_hwm (subscriber_name, hwm) " +
297+
"VALUES (?, ?) " +
298+
"ON CONFLICT (subscriber_name) DO UPDATE SET hwm = ?";
299+
300+
try (PreparedStatement stmt = connection.prepareStatement(sql)) {
301+
stmt.setString(1, subscriberName);
302+
stmt.setLong(2, hwm);
303+
stmt.setLong(3, hwm);
304+
305+
stmt.executeUpdate();
306+
}
307+
}
308+
309+
/**
310+
* Helper method to get current HWM for a subscriber
311+
*/
312+
private long getCurrentHwm(String subscriberName) throws SQLException {
195313
String sql = "SELECT hwm FROM postevent.contiguous_hwm WHERE subscriber_name = ?";
314+
196315
try (PreparedStatement stmt = connection.prepareStatement(sql)) {
197316
stmt.setString(1, subscriberName);
317+
198318
try (ResultSet rs = stmt.executeQuery()) {
199319
if (rs.next()) {
200320
return rs.getLong("hwm");
321+
} else {
322+
return 0;
201323
}
202-
return 0;
203324
}
204325
}
205326
}

0 commit comments

Comments
 (0)