Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 38 additions & 8 deletions src/main/java/io/ringbroker/broker/ingress/Ingress.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package io.ringbroker.broker.ingress;

import com.google.protobuf.CodedInputStream;
import com.google.protobuf.Descriptors;
import com.google.protobuf.DynamicMessage;
import com.google.protobuf.InvalidProtocolBufferException;
import io.ringbroker.core.ring.RingBuffer;
import io.ringbroker.ledger.orchestrator.LedgerOrchestrator;
import io.ringbroker.registry.TopicRegistry;
Expand Down Expand Up @@ -90,18 +93,22 @@ public void publish(final String topic, final byte[] payload) {
*/
public void publish(final String topic, final int retries, final byte[] rawPayload) {
// 1) validate base topic
if (!registry.contains(topic)) throw new IllegalArgumentException("topic not registered: " + topic);
if (!registry.contains(topic)) {
throw new IllegalArgumentException("topic not registered: " + topic);
}

// 2) DLQ routing
// 2) DLQ routing based on retry count
String outTopic = retries > MAX_RETRIES ? topic + ".DLQ" : topic;
if (!registry.contains(outTopic)) throw new IllegalArgumentException("topic not registered: " + outTopic);
if (!registry.contains(outTopic)) {
throw new IllegalArgumentException("topic not registered: " + outTopic);
}

// 3) schema-validate
try {
DynamicMessage.parseFrom(registry.descriptor(outTopic), rawPayload);
} catch (final Exception ex) {
// 3) schema‐validate without throwing out
if (!isWireValid(rawPayload, registry.descriptor(outTopic))) {
outTopic = topic + ".DLQ";
if (!registry.contains(outTopic)) throw new IllegalArgumentException("DLQ not registered: " + outTopic);
if (!registry.contains(outTopic)) {
throw new IllegalArgumentException("DLQ not registered: " + outTopic);
}
}

// 4) enqueue without allocation; spin if queue is momentarily full
Expand Down Expand Up @@ -158,6 +165,29 @@ public void close() throws IOException {
this.segments.writable().close();
}

/**
* Returns true if payload is a well‐formed instance of the given descriptor’s message
* (i.e. no truncated stream, bad varint, negative length, etc.). Never throws.
*/
private static boolean isWireValid(byte[] raw, final Descriptors.Descriptor descriptor) {
CodedInputStream in = CodedInputStream.newInstance(raw);
try {
// Try to merge into a DynamicMessage.Builder; this will throw on any wire‐format error.
DynamicMessage.newBuilder(descriptor)
.mergeFrom(in)
.buildPartial();
// Also ensure we consumed exactly all bytes
return in.isAtEnd();
} catch (InvalidProtocolBufferException e) {
return false;
}

catch (IOException exception) {
return false;
}
}


/*
* Allocation‑free bounded lock‑free multi‑producer / multi‑consumer queue
* (heavily simplified Vyukov algorithm).
Expand Down
52 changes: 21 additions & 31 deletions src/main/java/test/SanityCheckMain.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import io.ringbroker.offset.InMemoryOffsetStore;
import io.ringbroker.proto.test.EventsProto;
import io.ringbroker.registry.TopicRegistry;
import lombok.extern.slf4j.Slf4j;

import java.io.DataInputStream;
import java.io.EOFException;
Expand All @@ -15,7 +16,10 @@
import java.nio.channels.Channels;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.*;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.util.*;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
Expand All @@ -28,6 +32,7 @@
* – verify the in-memory subscription sees every record
* – replay every segment on disk and confirm the same IDs partition-by-partition
*/
@Slf4j
public class SanityCheckMain {

/* ---------- test parameters ---------- */
Expand All @@ -42,11 +47,7 @@ public class SanityCheckMain {
private static final String GROUP = "sanity-latch";
private static final Path DATA = Paths.get("data");

/* ---------- main ---------- */

public static void main(String[] args) throws Exception {

/* 0) clean data dir ------------------------------------------------- */
if (Files.exists(DATA)) {
try (Stream<Path> w = Files.walk(DATA)) {
w.sorted(Comparator.reverseOrder())
Expand All @@ -55,7 +56,6 @@ public static void main(String[] args) throws Exception {
}
Files.createDirectories(DATA);

/* 1) registry + broker --------------------------------------------- */
TopicRegistry registry = new TopicRegistry.Builder()
.topic(TOPIC, EventsProto.OrderCreated.getDescriptor())
.build();
Expand All @@ -76,17 +76,14 @@ public static void main(String[] args) throws Exception {
new InMemoryOffsetStore()
);

/* expected IDs per partition --------------------------------------- */
RoundRobinPartitioner psel = new RoundRobinPartitioner();
Map<Integer, Set<String>> expected = new HashMap<>();
for (int p = 0; p < PARTITIONS; p++) expected.put(p, new HashSet<>());

/* latch for subscriber --------------------------------------------- */
CountDownLatch latch = new CountDownLatch(TOTAL_MSGS);
ingress.subscribeTopic(TOPIC, GROUP, (seq, payload) -> latch.countDown());

/* 2) publish ------------------------------------------------------- */
System.out.println("=== publishing " + TOTAL_MSGS + " messages ===");
log.info("=== publishing {} messages ===", TOTAL_MSGS);
for (int i = 0; i < TOTAL_MSGS; i++) {
String id = "msg-" + i;
byte[] key = id.getBytes(StandardCharsets.UTF_8);
Expand All @@ -102,21 +99,15 @@ public static void main(String[] args) throws Exception {
ingress.publish(TOPIC, key, 0, evt.toByteArray());
}

/* writers push tail batches immediately (patch already in place) */

/* 3) wait for subscriber ------------------------------------------- */
System.out.println("waiting for in-memory delivery…");
log.info("waiting for in-memory delivery…");
if (!latch.await(30, TimeUnit.SECONDS)) {
System.err.printf("❌ saw only %d/%d messages%n",
TOTAL_MSGS - latch.getCount(), TOTAL_MSGS);
log.error("saw only {}/{} messages", TOTAL_MSGS - latch.getCount(), TOTAL_MSGS);
System.exit(1);
}
System.out.println("✅ all messages delivered in-memory");
log.info("all messages delivered in-memory");

/* 4) shutdown (forces fsync) --------------------------------------- */
ingress.shutdown();

/* 5) replay segments per partition --------------------------------- */
Map<Integer, Set<String>> seen = new HashMap<>();
for (int p = 0; p < PARTITIONS; p++) {
seen.put(p, new HashSet<>());
Expand All @@ -130,27 +121,26 @@ public static void main(String[] args) throws Exception {
}
}

/* 6) compare ------------------------------------------------------- */
boolean pass = true;
System.out.println("\n=== partition results ===");
log.info("\n=== partition results ===");
for (int p = 0; p < PARTITIONS; p++) {
Set<String> exp = expected.get(p), got = seen.get(p);
System.out.printf("partition-%2d: exp=%3d, got=%3d%n", p, exp.size(), got.size());
log.info("partition-{}: exp={}, got={}", p, exp.size(), got.size());
if (!exp.equals(got)) {
log.error(" missing: {}", diff(exp, got));
log.error(" extra : {}", diff(got, exp));
pass = false;
System.err.println(" missing: " + diff(exp, got));
System.err.println(" extra : " + diff(got, exp));
}
}

System.out.println(pass
? "\n✅ SANITY-PASS: all routing + writes succeeded."
: "\n❌ SANITY-FAIL: see mismatches above.");
if (!pass) System.exit(1);
if (pass) {
log.info("\nSANITY-PASS: all routing + writes succeeded.");
} else {
log.error("\nSANITY-FAIL: see mismatches above.");
System.exit(1);
}
}

/* ---------- helpers --------------------------------------------------- */

/** read a 32-bit little-endian int from the stream */
private static int readIntLE(DataInputStream in) throws IOException {
int b0 = in.readUnsignedByte();
Expand Down Expand Up @@ -181,7 +171,7 @@ private static void parseSegmentLittleEndian(Path seg, Set<String> outSet) throw
CRC32 crc = new CRC32();
crc.update(buf, 0, len);
if ((int) crc.getValue() != storedCrc) {
System.err.printf("CRC mismatch in %s%n", seg.getFileName());
log.error("CRC mismatch in {}", seg.getFileName());
break;
}

Expand Down