Skip to content
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 70 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,21 +9,86 @@ A library to publish and receive events using postgres and grpc
* Receive all previous events on first connect
* Receive new events in real time


Create a local Event consumer
- [x] Publish a message to the event topic
- [x] Consume the message from the event topic
- [x] Ensure the message received is the same as the message published

Create a persistent Event consumer
- [ ] Publish a message to the event topic
- [ ] Message received is first stored in the database, then sent to the consumer
- [X] Publish a message to the event topic
- [X] Message received is first stored in the database, then sent to the consumer

Create a catchup mechanism
* The PC/PR detects that it has gaps in the event sequence
* As a new event is received, the CHWM is updated if the CHWM is idn-1
* If the CHWM is not idn-1, the PC/PR will trigger the catchup mechanism instead of the processor
* The catchup mechanism will request a batch of messages from the server from (CHWM+1) to the min idn greater than the CHWM
* The catchup mechanism fills in the gap, looks for contiguous values up to the next gap and updates the CHWM.
* The catchup mechanism looks for the next gap (CHWM+1 upwards) and repeats until there are no gaps
* The catchup mechanism restarts the processor
* Request a batch of messages from the server
* Write each message to the consumer
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

issue (bug_risk): Inaccurate description of catchup mechanism

The catchup mechanism writes messages to the database, not directly to the consumer. The consumer then reads from the database.

* Stop when the catchup mechanism detects that it is overwriting the live messages
- [ ] Request a batch of messages from the server (starting from?)
- [ ] Write each message to the consumer
- [ ] Stop when the catchup mechanism detects that it is overwriting the live messages

Create a processor
- [ ] Verifies that there are no gaps in the event sequence (check the sequence for the earliest unprocessed event until this one)
- [ ] Verifies that there are no earlier unprocessed events for the same subject

Create a processor
- [ ] ?

DB Debezium DONE
LC Local Consumer DONE
RC Remote Consumer
PC Persistent Consumer DONE
PR Processor
BF Business Function Ongoing
CC Catchup Client
CS Catchup Server
CHWM contiguous high water mark

Local constant consumption
```mermaid
graph LR;
Debezium-->PC;
PC-->PR;
PR-->BF;
```

Local constant consumption with catchup
```mermaid
graph LR;
Debezium-->PC;
PC-->PR;
PR-->BF;
PR-->CC;
CC-->CS;
CC-->PR;
CS-->CC;
```

Remote constant consumption
```mermaid
graph LR;
Debezium-->LC;
LC-->RC;
RC-->PC;
PC-->PR;
PR-->BF;
```


Remote constant consumption with catchup
```mermaid
graph LR;
Debezium-->LC;
LC-->RC;
RC-->PC;
PC-->PR;
PR-->BF;
PR-->CC;
CC-->CS;
CC-->PR;
CS-->CC;
```
70 changes: 70 additions & 0 deletions src/main/java/com/p14n/postevent/CatchupServer.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
package com.p14n.postevent;

import javax.sql.DataSource;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import java.util.logging.Level;
import java.util.logging.Logger;

public class CatchupServer {
private static final Logger LOGGER = Logger.getLogger(CatchupServer.class.getName());
private final String topic;
private final DataSource dataSource;

public CatchupServer(String topic, DataSource dataSource) {
if (topic == null || topic.trim().isEmpty()) {
throw new IllegalArgumentException("Topic name cannot be null or empty");
}
this.topic = topic;
this.dataSource = dataSource;
}

public List<Event> fetchEvents(long start, long end, int maxResults) {
if (start > end) {
throw new IllegalArgumentException("Start value must be less than or equal to end value");
}
if (maxResults <= 0) {
throw new IllegalArgumentException("Max results must be greater than zero");
}

List<Event> events = new ArrayList<>();
String sql = String.format(
"SELECT * FROM postevent.%s WHERE idn BETWEEN ? AND ? ORDER BY idn LIMIT ?",
topic);

try (Connection conn = dataSource.getConnection();
PreparedStatement stmt = conn.prepareStatement(sql)) {

stmt.setLong(1, start);
stmt.setLong(2, end);
stmt.setInt(3, maxResults);

try (ResultSet rs = stmt.executeQuery()) {
while (rs.next()) {
Event event = new Event(
rs.getString("id"),
rs.getString("source"),
rs.getString("type"),
rs.getString("datacontenttype"),
rs.getString("dataschema"),
rs.getString("subject"),
rs.getBytes("data"));
events.add(event);
}
}

LOGGER.info(String.format("Fetched %d events from topic %s between %d and %d",
events.size(), topic, start, end));

return events;

} catch (SQLException e) {
LOGGER.log(Level.SEVERE, "Error fetching events from database", e);
throw new RuntimeException("Failed to fetch events", e);
}
}
}
70 changes: 49 additions & 21 deletions src/main/java/com/p14n/postevent/DatabaseSetup.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,49 +19,77 @@ public DatabaseSetup(String jdbcUrl, String username, String password) {
this.password = password;
}

public void createSchemaIfNotExists() {
public DatabaseSetup createSchemaIfNotExists() {
try (Connection conn = getConnection();
Statement stmt = conn.createStatement()) {
Statement stmt = conn.createStatement()) {

String sql = "CREATE SCHEMA IF NOT EXISTS postevent";
stmt.execute(sql);
LOGGER.info("Schema creation completed successfully");

} catch (SQLException e) {
LOGGER.log(Level.SEVERE, "Error creating schema", e);
throw new RuntimeException("Failed to create schema", e);
}
return this;
}

public void createTableIfNotExists(String topic) {
public DatabaseSetup createTableIfNotExists(String topic) {
Comment thread
p14n marked this conversation as resolved.
if (topic == null || topic.trim().isEmpty()) {
throw new IllegalArgumentException("Topic name cannot be null or empty");
}

try (Connection conn = getConnection();
Statement stmt = conn.createStatement()) {
Statement stmt = conn.createStatement()) {

String sql = String.format("""
CREATE TABLE IF NOT EXISTS postevent.%s (
idn bigint GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
id VARCHAR(255) NOT NULL,
source VARCHAR(1024) NOT NULL,
type VARCHAR(255) NOT NULL,
datacontenttype VARCHAR(255),
dataschema VARCHAR(255),
subject VARCHAR(255),
data bytea,
time TIMESTAMP WITH TIME ZONE default current_timestamp,
UNIQUE (id, source)
)""", topic);
CREATE TABLE IF NOT EXISTS postevent.%s (
idn bigint GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
id VARCHAR(255) NOT NULL,
source VARCHAR(1024) NOT NULL,
type VARCHAR(255) NOT NULL,
datacontenttype VARCHAR(255),
dataschema VARCHAR(255),
subject VARCHAR(255),
data bytea,
time TIMESTAMP WITH TIME ZONE default current_timestamp,
UNIQUE (id, source)
)""", topic);

stmt.execute(sql);
LOGGER.info("Table creation completed successfully for topic: " + topic);

} catch (SQLException e) {
LOGGER.log(Level.SEVERE, "Error creating table for topic: " + topic, e);
throw new RuntimeException("Failed to create table", e);
}
return this;
}

public DatabaseSetup createMessagesTableIfNotExists() {
try (Connection conn = getConnection();
Statement stmt = conn.createStatement()) {

String sql = """
CREATE TABLE IF NOT EXISTS postevent.messages (
idn bigint GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
id VARCHAR(255),
source VARCHAR(1024),
datacontenttype VARCHAR(255),
dataschema VARCHAR(255),
subject VARCHAR(255),
data bytea,
time TIMESTAMP WITH TIME ZONE default current_timestamp
)""";

stmt.execute(sql);
LOGGER.info("Messages table creation completed successfully");

} catch (SQLException e) {
LOGGER.log(Level.SEVERE, "Error creating messages table", e);
throw new RuntimeException("Failed to create messages table", e);
}
return this;
}

private Connection getConnection() throws SQLException {
Expand Down
Loading