Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions .changeset/fts-no-duplicate-rows-on-backfill.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
---
"@atmo-dev/contrail-appview": patch
---

Stop accumulating duplicate FTS rows when records are re-applied during backfill.

The FTS-sync path only deleted an existing FTS row before inserting when the
record was already in `existingMap`. Backfill runs with `skipReplayDetection`,
which leaves `existingMap` empty, so every re-applied record looked brand-new
and appended another FTS row. The FTS virtual table has no uniqueness
constraint, so these accumulated, and the search JOIN fanned each event out into
one result row per duplicate. Make the delete-then-insert unconditional so FTS
sync is idempotent regardless of replay detection.

The unconditional delete also evicts a stale FTS row when an update clears all
searchable fields: in that case there is no content to re-insert, but the prior
row must still be removed so old terms stop matching.
26 changes: 15 additions & 11 deletions packages/contrail-appview/src/core/db/records.ts
Original file line number Diff line number Diff line change
Expand Up @@ -161,8 +161,7 @@ function buildBatchCountStatements(
function buildFtsStatements(
db: Database,
event: IngestEvent,
config: ContrailConfig,
existingMap: Map<string, ExistingRecordInfo>
config: ContrailConfig
): Statement[] {
// PostgreSQL: tsvector generated column is auto-maintained, no manual FTS sync
if (getDialect(db).ftsStrategy === "generated-column") return [];
Expand All @@ -184,16 +183,21 @@ function buildFtsStatements(
const record = event.record ? JSON.parse(event.record) : null;
if (!record) return [];

const content = buildFtsContent(record, fields);
if (!content) return [];
// Always delete first so FTS sync is idempotent. The FTS virtual table has no
// uniqueness constraint, so a bare insert appends a duplicate row when one
// already exists. existingMap is unreliable here: backfill runs with
// skipReplayDetection, leaving it empty, so a re-applied record would look new
// and accumulate duplicate rows that fan out the search JOIN. The delete is
// unconditional so it also evicts a stale row when an update clears all
// searchable fields (content is null); only the re-insert is gated on content.
stmts.push(db.prepare(`DELETE FROM ${table} WHERE uri = ?`).bind(event.uri));

// Only delete existing FTS row if this is an update (record already existed)
if (existingMap.has(event.uri)) {
stmts.push(db.prepare(`DELETE FROM ${table} WHERE uri = ?`).bind(event.uri));
const content = buildFtsContent(record, fields);
if (content) {
stmts.push(
db.prepare(`INSERT INTO ${table} (uri, content) VALUES (?, ?)`).bind(event.uri, content)
);
}
stmts.push(
db.prepare(`INSERT INTO ${table} (uri, content) VALUES (?, ?)`).bind(event.uri, content)
);
}

return stmts;
Expand Down Expand Up @@ -644,7 +648,7 @@ export async function applyEvents(
if (!isReplay && !options?.skipFeedFanout) {
batch.push(...buildFeedStatements(db, e, config, existingRecordStrings));
}
batch.push(...buildFtsStatements(db, e, config, existingMap));
batch.push(...buildFtsStatements(db, e, config));
}
}

Expand Down
34 changes: 34 additions & 0 deletions packages/contrail/tests/search.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,40 @@ describe.skipIf(!hasFts)("FTS sync", () => {
], SEARCH_CONFIG);
expect((await queryRecords(db, SEARCH_CONFIG, { collection, search: "Deletable" })).records).toHaveLength(0);
});

it("does not duplicate FTS rows when the same record is re-applied during backfill", async () => {
// Backfill passes call applyEvents with skipReplayDetection: true, which leaves
// existingMap empty so every record looks brand-new. Re-backfilling the same
// record must not append a second FTS row; otherwise the search JOIN fans out
// and returns the event more than once (which crashes keyed lists downstream).
const event = makeEvent({
uri: "at://did:plc:a/community.lexicon.calendar.event/1",
collection,
rkey: "1",
record: { name: "Backfilled Meetup", mode: "online", description: "test" },
time_us: 1000,
});
await applyEvents(db, [event], SEARCH_CONFIG, { skipReplayDetection: true });
await applyEvents(db, [event], SEARCH_CONFIG, { skipReplayDetection: true });

const result = await queryRecords(db, SEARCH_CONFIG, { collection, search: "Meetup" });
expect(result.records).toHaveLength(1);
});

it("evicts the stale FTS row when an update clears all searchable fields", async () => {
// The delete must run unconditionally. If an update leaves every searchable
// field empty, buildFtsContent returns null and there is nothing to re-insert,
// but the prior FTS row must still be removed so old terms stop matching.
await applyEvents(db, [
makeEvent({ uri: "at://did:plc:a/community.lexicon.calendar.event/1", collection, rkey: "1", record: { name: "Searchable Title", mode: "online", description: "find me" }, time_us: 1000 }),
], SEARCH_CONFIG);
expect((await queryRecords(db, SEARCH_CONFIG, { collection, search: "Searchable" })).records).toHaveLength(1);

await applyEvents(db, [
makeEvent({ uri: "at://did:plc:a/community.lexicon.calendar.event/1", collection, rkey: "1", record: { startsAt: "2026-01-01T00:00:00Z" }, operation: "update", time_us: 2000 }),
], SEARCH_CONFIG);
expect((await queryRecords(db, SEARCH_CONFIG, { collection, search: "Searchable" })).records).toHaveLength(0);
});
});

describe.skipIf(!hasFts)("explicit searchable fields", () => {
Expand Down
Loading