Skip to content

Commit 6231ccc

Browse files
authored
Augment catchup latest (#48)
* [augment] Augment adding fetch latest * [augment] Augment forgot server implementation * [human] Off by 1 error * [human] Switch writing back on, change db type
1 parent 9a0ec9d commit 6231ccc

File tree

12 files changed

+163
-15
lines changed

12 files changed

+163
-15
lines changed

app/infra/tf/rds.tf

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,8 @@ resource "aws_db_instance" "postevent" {
3030
identifier = "postevent-${count.index}"
3131
engine = "postgres"
3232
engine_version = "17"
33-
instance_class = "db.t3.micro"
33+
instance_class = "db.t4g.micro"
34+
storage_type = "gp3"
3435
allocated_storage = 20
3536
storage_encrypted = true
3637
performance_insights_enabled = true

app/src/main/java/com/p14n/postevent/App.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -139,8 +139,7 @@ private static void run(String affinity, String[] write, String[] read, String d
139139
}
140140

141141
} else {
142-
Thread.currentThread().join();
143-
// writeContinuously(ds, affinity, write, ot);
142+
writeContinuously(ds, affinity, write, ot);
144143
}
145144

146145
} finally {
@@ -168,8 +167,9 @@ private static NettyChannelBuilder buildClientChannel(String host, int port) {
168167
.sslContext(buildSslContext());
169168
}
170169

171-
private static RemotePersistentConsumer runConsumerClient(String[] write, String[] read, String[] topichosts, DataSource ds,
172-
OpenTelemetry ot) {
170+
private static RemotePersistentConsumer runConsumerClient(String[] write, String[] read, String[] topichosts,
171+
DataSource ds,
172+
OpenTelemetry ot) {
173173

174174
RemotePersistentConsumer cc;
175175
cc = new RemotePersistentConsumer(ot, 10);

library/src/main/java/com/p14n/postevent/LocalPersistentConsumer.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -58,12 +58,15 @@ public void start() throws IOException, InterruptedException {
5858
var lc = new LocalConsumer<>(cfg, pb);
5959

6060
seb.subscribe(new CatchupService(ds, new CatchupServer(ds), seb));
61-
var unprocessedSubmitter = new UnprocessedSubmitter(seb,ds, new UnprocessedEventFinder(), tb, batchSize);
61+
var unprocessedSubmitter = new UnprocessedSubmitter(seb, ds, new UnprocessedEventFinder(), tb, batchSize);
6262
seb.subscribe(unprocessedSubmitter);
6363

6464
asyncExecutor.scheduleAtFixedRate(() -> {
65-
logger.atDebug().log("Triggering unprocessed check");
65+
logger.atDebug().log("Triggering unprocessed check and fetch latest");
6666
seb.publish(SystemEvent.UnprocessedCheckRequired);
67+
for (String topic : cfg.topics()) {
68+
seb.publish(SystemEvent.FetchLatest.withTopic(topic));
69+
}
6770
}, 30, 30, TimeUnit.SECONDS);
6871

6972
lc.start();

library/src/main/java/com/p14n/postevent/RemotePersistentConsumer.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -76,10 +76,15 @@ public void start(Set<String> topics, DataSource ds, ManagedChannel channel) {
7676
client.subscribe(topic, pb);
7777
}
7878
seb.subscribe(new CatchupService(ds, catchupClient, seb));
79-
seb.subscribe(new UnprocessedSubmitter(seb,ds, new UnprocessedEventFinder(), tb, batchSize));
79+
seb.subscribe(new UnprocessedSubmitter(seb, ds, new UnprocessedEventFinder(), tb, batchSize));
8080

8181
asyncExecutor.scheduleAtFixedRate(
82-
() -> seb.publish(SystemEvent.UnprocessedCheckRequired),
82+
() -> {
83+
seb.publish(SystemEvent.UnprocessedCheckRequired);
84+
for (String topic : topics) {
85+
seb.publish(SystemEvent.FetchLatest.withTopic(topic));
86+
}
87+
},
8388
30, 30, TimeUnit.SECONDS);
8489

8590
closeables = List.of(client, catchupClient, pb, seb, tb);

library/src/main/java/com/p14n/postevent/broker/SystemEvent.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,8 @@
55
public enum SystemEvent implements Traceable {
66

77
CatchupRequired,
8-
UnprocessedCheckRequired;
8+
UnprocessedCheckRequired,
9+
FetchLatest; // New event type
910

1011
public String topic;
1112

library/src/main/java/com/p14n/postevent/catchup/CatchupServer.java

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,4 +65,27 @@ public List<Event> fetchEvents(long startAfter, long end, int maxResults, String
6565
throw new RuntimeException("Failed to fetch events", e);
6666
}
6767
}
68-
}
68+
69+
@Override
70+
public long getLatestMessageId(String topic) {
71+
if (topic == null || topic.trim().isEmpty()) {
72+
throw new IllegalArgumentException("Topic name cannot be null or empty");
73+
}
74+
75+
String sql = String.format("SELECT MAX(idn) FROM postevent.%s", topic);
76+
77+
try (Connection conn = dataSource.getConnection();
78+
PreparedStatement stmt = conn.prepareStatement(sql);
79+
ResultSet rs = stmt.executeQuery()) {
80+
81+
if (rs.next()) {
82+
return rs.getLong(1);
83+
}
84+
return 0;
85+
86+
} catch (SQLException e) {
87+
logger.atError().setCause(e).log("Error fetching latest message ID");
88+
throw new RuntimeException("Failed to fetch latest message ID", e);
89+
}
90+
}
91+
}

library/src/main/java/com/p14n/postevent/catchup/CatchupServerInterface.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,4 +17,12 @@ public interface CatchupServerInterface {
1717
* @return A list of events within the specified range
1818
*/
1919
List<Event> fetchEvents(long startAfter, long end, int maxResults, String topic);
20-
}
20+
21+
/**
22+
* Retrieves the latest message ID for a given topic.
23+
*
24+
* @param topic The topic to get the latest message ID for
25+
* @return The latest message ID for the specified topic
26+
*/
27+
long getLatestMessageId(String topic);
28+
}

library/src/main/java/com/p14n/postevent/catchup/CatchupService.java

Lines changed: 61 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -186,6 +186,66 @@ private void updateHwm(Connection connection, String topicName, long currentHwm,
186186
public void onMessage(SystemEvent message) {
187187
if (Objects.requireNonNull(message) == SystemEvent.CatchupRequired) {
188188
oneAtATime(() -> catchup(message.topic), () -> onMessage(message));
189+
} else if (message == SystemEvent.FetchLatest) {
190+
oneAtATime(() -> fetchLatest(message.topic), () -> onMessage(message));
191+
}
192+
}
193+
194+
private int fetchLatest(String topicName) {
195+
if (topicName == null) {
196+
LOGGER.warn("Topic name is null for fetch latest");
197+
return 0;
198+
}
199+
200+
try (Connection conn = datasource.getConnection()) {
201+
conn.setAutoCommit(false);
202+
203+
long currentHwm = getCurrentHwm(conn, topicName);
204+
205+
// Get the latest message ID from the server
206+
long latestMessageId = catchupServer.getLatestMessageId(topicName);
207+
208+
if (latestMessageId <= currentHwm) {
209+
LOGGER.info("No new messages found after HWM {} for topic {}", currentHwm, topicName);
210+
return 0;
211+
}
212+
213+
// Fetch just the latest message
214+
List<Event> events = catchupServer.fetchEvents(latestMessageId - 1, latestMessageId, 1, topicName);
215+
216+
if (events.isEmpty()) {
217+
LOGGER.info("No events found in range for topic: " + topicName);
218+
return 0;
219+
}
220+
221+
// Write event to messages table
222+
int processedCount = writeEventsToMessagesTable(conn, events);
223+
224+
LOGGER.info("Processed latest event for topic {}",
225+
topicName);
226+
227+
conn.commit();
228+
229+
// If there are more messages between currentHwm and latestMessageId,
230+
// trigger a catchup to get the rest
231+
systemEventBroker.publish(SystemEvent.CatchupRequired.withTopic(topicName));
232+
233+
return processedCount;
234+
} catch (SQLException e) {
235+
LOGGER.error("Failed to fetch latest", e);
236+
return 0;
237+
}
238+
}
239+
240+
private long findLatestMessageId(Connection connection, String topicName) throws SQLException {
241+
String sql = "SELECT MAX(idn) FROM postevent." + topicName;
242+
try (PreparedStatement stmt = connection.prepareStatement(sql)) {
243+
try (ResultSet rs = stmt.executeQuery()) {
244+
if (rs.next()) {
245+
return rs.getLong(1);
246+
}
247+
return 0;
248+
}
189249
}
190250
}
191251

@@ -291,4 +351,4 @@ public AtomicBoolean getRunning() {
291351
return running;
292352
}
293353

294-
}
354+
}

library/src/main/java/com/p14n/postevent/catchup/grpc/CatchupGrpcClient.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,26 @@ private Event convertFromGrpcEvent(com.p14n.postevent.catchup.grpc.Event grpcEve
8585
grpcEvent.getTraceparent());
8686
}
8787

88+
@Override
89+
public long getLatestMessageId(String topic) {
90+
logger.atInfo()
91+
.addArgument(topic)
92+
.log("Fetching latest message ID for topic {}");
93+
94+
TopicRequest request = TopicRequest.newBuilder()
95+
.setTopic(topic)
96+
.build();
97+
98+
LatestMessageIdResponse response;
99+
try {
100+
response = blockingStub.getLatestMessageId(request);
101+
return response.getMessageId();
102+
} catch (StatusRuntimeException e) {
103+
logger.atWarn().setCause(e).log("RPC failed: {}", e.getStatus());
104+
throw new RuntimeException("Failed to fetch latest message ID via gRPC", e);
105+
}
106+
}
107+
88108
@Override
89109
public void close() {
90110
try {

library/src/main/java/com/p14n/postevent/catchup/grpc/CatchupGrpcServer.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,23 @@ public CatchupServiceImpl(CatchupServerInterface catchupServer) {
5959
this.catchupServer = catchupServer;
6060
}
6161

62+
@Override
63+
public void getLatestMessageId(TopicRequest request, StreamObserver<LatestMessageIdResponse> responseObserver) {
64+
try {
65+
long latestId = catchupServer.getLatestMessageId(request.getTopic());
66+
67+
LatestMessageIdResponse response = LatestMessageIdResponse.newBuilder()
68+
.setMessageId(latestId)
69+
.build();
70+
71+
responseObserver.onNext(response);
72+
responseObserver.onCompleted();
73+
} catch (Exception e) {
74+
logger.error("Error getting latest message ID", e);
75+
responseObserver.onError(e);
76+
}
77+
}
78+
6279
@Override
6380
public void fetchEvents(FetchEventsRequest request, StreamObserver<FetchEventsResponse> responseObserver) {
6481
try {

0 commit comments

Comments
 (0)