diff --git a/.changeset/fts-no-duplicate-rows-on-backfill.md b/.changeset/fts-no-duplicate-rows-on-backfill.md new file mode 100644 index 0000000..551a21c --- /dev/null +++ b/.changeset/fts-no-duplicate-rows-on-backfill.md @@ -0,0 +1,17 @@ +--- +"@atmo-dev/contrail-appview": patch +--- + +Stop accumulating duplicate FTS rows when records are re-applied during backfill. + +The FTS-sync path only deleted an existing FTS row before inserting when the +record was already in `existingMap`. Backfill runs with `skipReplayDetection`, +which leaves `existingMap` empty, so every re-applied record looked brand-new +and appended another FTS row. The FTS virtual table has no uniqueness +constraint, so these accumulated, and the search JOIN fanned each event out into +one result row per duplicate. Make the delete-then-insert unconditional so FTS +sync is idempotent regardless of replay detection. + +The unconditional delete also evicts a stale FTS row when an update clears all +searchable fields: in that case there is no content to re-insert, but the prior +row must still be removed so old terms stop matching. diff --git a/packages/contrail-appview/src/core/db/records.ts b/packages/contrail-appview/src/core/db/records.ts index f042180..933df81 100644 --- a/packages/contrail-appview/src/core/db/records.ts +++ b/packages/contrail-appview/src/core/db/records.ts @@ -161,8 +161,7 @@ function buildBatchCountStatements( function buildFtsStatements( db: Database, event: IngestEvent, - config: ContrailConfig, - existingMap: Map + config: ContrailConfig ): Statement[] { // PostgreSQL: tsvector generated column is auto-maintained, no manual FTS sync if (getDialect(db).ftsStrategy === "generated-column") return []; @@ -184,16 +183,21 @@ function buildFtsStatements( const record = event.record ? JSON.parse(event.record) : null; if (!record) return []; - const content = buildFtsContent(record, fields); - if (!content) return []; + // Always delete first so FTS sync is idempotent. The FTS virtual table has no + // uniqueness constraint, so a bare insert appends a duplicate row when one + // already exists. existingMap is unreliable here: backfill runs with + // skipReplayDetection, leaving it empty, so a re-applied record would look new + // and accumulate duplicate rows that fan out the search JOIN. The delete is + // unconditional so it also evicts a stale row when an update clears all + // searchable fields (content is null); only the re-insert is gated on content. + stmts.push(db.prepare(`DELETE FROM ${table} WHERE uri = ?`).bind(event.uri)); - // Only delete existing FTS row if this is an update (record already existed) - if (existingMap.has(event.uri)) { - stmts.push(db.prepare(`DELETE FROM ${table} WHERE uri = ?`).bind(event.uri)); + const content = buildFtsContent(record, fields); + if (content) { + stmts.push( + db.prepare(`INSERT INTO ${table} (uri, content) VALUES (?, ?)`).bind(event.uri, content) + ); } - stmts.push( - db.prepare(`INSERT INTO ${table} (uri, content) VALUES (?, ?)`).bind(event.uri, content) - ); } return stmts; @@ -644,7 +648,7 @@ export async function applyEvents( if (!isReplay && !options?.skipFeedFanout) { batch.push(...buildFeedStatements(db, e, config, existingRecordStrings)); } - batch.push(...buildFtsStatements(db, e, config, existingMap)); + batch.push(...buildFtsStatements(db, e, config)); } } diff --git a/packages/contrail/tests/search.test.ts b/packages/contrail/tests/search.test.ts index c0f91ce..66afe9e 100644 --- a/packages/contrail/tests/search.test.ts +++ b/packages/contrail/tests/search.test.ts @@ -159,6 +159,40 @@ describe.skipIf(!hasFts)("FTS sync", () => { ], SEARCH_CONFIG); expect((await queryRecords(db, SEARCH_CONFIG, { collection, search: "Deletable" })).records).toHaveLength(0); }); + + it("does not duplicate FTS rows when the same record is re-applied during backfill", async () => { + // Backfill passes call applyEvents with skipReplayDetection: true, which leaves + // existingMap empty so every record looks brand-new. Re-backfilling the same + // record must not append a second FTS row; otherwise the search JOIN fans out + // and returns the event more than once (which crashes keyed lists downstream). + const event = makeEvent({ + uri: "at://did:plc:a/community.lexicon.calendar.event/1", + collection, + rkey: "1", + record: { name: "Backfilled Meetup", mode: "online", description: "test" }, + time_us: 1000, + }); + await applyEvents(db, [event], SEARCH_CONFIG, { skipReplayDetection: true }); + await applyEvents(db, [event], SEARCH_CONFIG, { skipReplayDetection: true }); + + const result = await queryRecords(db, SEARCH_CONFIG, { collection, search: "Meetup" }); + expect(result.records).toHaveLength(1); + }); + + it("evicts the stale FTS row when an update clears all searchable fields", async () => { + // The delete must run unconditionally. If an update leaves every searchable + // field empty, buildFtsContent returns null and there is nothing to re-insert, + // but the prior FTS row must still be removed so old terms stop matching. + await applyEvents(db, [ + makeEvent({ uri: "at://did:plc:a/community.lexicon.calendar.event/1", collection, rkey: "1", record: { name: "Searchable Title", mode: "online", description: "find me" }, time_us: 1000 }), + ], SEARCH_CONFIG); + expect((await queryRecords(db, SEARCH_CONFIG, { collection, search: "Searchable" })).records).toHaveLength(1); + + await applyEvents(db, [ + makeEvent({ uri: "at://did:plc:a/community.lexicon.calendar.event/1", collection, rkey: "1", record: { startsAt: "2026-01-01T00:00:00Z" }, operation: "update", time_us: 2000 }), + ], SEARCH_CONFIG); + expect((await queryRecords(db, SEARCH_CONFIG, { collection, search: "Searchable" })).records).toHaveLength(0); + }); }); describe.skipIf(!hasFts)("explicit searchable fields", () => {