Skip to content

Commit 8dde38d

Browse files
authored
Implement revert() in SQL (#1582)
* start sql revert * clean * fix * chore: changeset * simplify count * fix: reverted count * split revert tests * Update slow-donkeys-love.md
1 parent 19f8aab commit 8dde38d

File tree

3 files changed

+122
-65
lines changed

3 files changed

+122
-65
lines changed

.changeset/slow-donkeys-love.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
"ponder": patch
3+
---
4+
5+
Fixed a bug that would cause crash recovery and reorg handling to error for some apps.

packages/core/src/database/index.test.ts

Lines changed: 74 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -719,14 +719,21 @@ test("revert()", async (context) => {
719719
.insert(account)
720720
.values({ address: "0x0000000000000000000000000000000000000001" });
721721

722+
await database.complete({
723+
checkpoint: createCheckpoint({ chainId: 1n, blockNumber: 10n }),
724+
db: database.qb.drizzle,
725+
});
726+
727+
await indexingStore.delete(account, { address: zeroAddress });
728+
722729
await database.complete({
723730
checkpoint: createCheckpoint({ chainId: 1n, blockNumber: 11n }),
724731
db: database.qb.drizzle,
725732
});
726733

727734
await database.qb.drizzle.transaction(async (tx) => {
728735
await database.revert({
729-
checkpoint: createCheckpoint({ chainId: 1n, blockNumber: 10n }),
736+
checkpoint: createCheckpoint({ chainId: 1n, blockNumber: 9n }),
730737
tx,
731738
});
732739
});
@@ -739,6 +746,72 @@ test("revert()", async (context) => {
739746
await context.common.shutdown.kill();
740747
});
741748

749+
test("revert() with composite primary key", async (context) => {
750+
const test = onchainTable(
751+
"Test",
752+
(p) => ({
753+
a: p.integer("A").notNull(),
754+
b: p.integer("B").notNull(),
755+
c: p.integer("C"),
756+
}),
757+
(table) => ({
758+
pk: primaryKey({ columns: [table.a, table.b] }),
759+
}),
760+
);
761+
762+
const database = await createDatabase({
763+
common: context.common,
764+
namespace: "public",
765+
preBuild: {
766+
databaseConfig: context.databaseConfig,
767+
},
768+
schemaBuild: {
769+
schema: { test },
770+
statements: buildSchema({ schema: { test } }).statements,
771+
},
772+
});
773+
774+
await database.migrate({ buildId: "abc" });
775+
776+
// setup tables, reorg tables, and metadata checkpoint
777+
778+
await database.createTriggers();
779+
780+
const indexingStore = createRealtimeIndexingStore({
781+
common: context.common,
782+
schemaBuild: { schema: { test } },
783+
database,
784+
});
785+
786+
await indexingStore.insert(test).values({ a: 1, b: 1 });
787+
788+
await database.complete({
789+
checkpoint: createCheckpoint({ chainId: 1n, blockNumber: 11n }),
790+
db: database.qb.drizzle,
791+
});
792+
793+
await indexingStore.update(test, { a: 1, b: 1 }).set({ c: 1 });
794+
795+
await database.complete({
796+
checkpoint: createCheckpoint({ chainId: 1n, blockNumber: 12n }),
797+
db: database.qb.drizzle,
798+
});
799+
800+
await database.qb.drizzle.transaction(async (tx) => {
801+
await database.revert({
802+
checkpoint: createCheckpoint({ chainId: 1n, blockNumber: 11n }),
803+
tx,
804+
});
805+
});
806+
807+
const rows = await database.qb.drizzle.select().from(test);
808+
809+
expect(rows).toHaveLength(1);
810+
expect(rows[0]).toStrictEqual({ a: 1, b: 1, c: null });
811+
812+
await context.common.shutdown.kill();
813+
});
814+
742815
test("getStatus() empty", async (context) => {
743816
const database = await createDatabase({
744817
common: context.common,

packages/core/src/database/index.ts

Lines changed: 43 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@ import { wait } from "@/utils/wait.js";
3030
import type { PGlite } from "@electric-sql/pglite";
3131
import {
3232
type TableConfig,
33-
and,
3433
eq,
3534
getTableColumns,
3635
getTableName,
@@ -1289,77 +1288,57 @@ FOR EACH ROW EXECUTE FUNCTION "${namespace}".${getTableNames(table).triggerFn};
12891288
});
12901289
},
12911290
async revert({ checkpoint, tx }) {
1292-
await this.wrap({ method: "revert", includeTraceLogs: true }, () =>
1291+
await this.record({ method: "revert", includeTraceLogs: true }, () =>
12931292
Promise.all(
12941293
tables.map(async (table) => {
12951294
const primaryKeyColumns = getPrimaryKeyColumns(table);
12961295

1297-
// @ts-ignore
1298-
const { rows } = await tx.execute<Schema>(
1299-
sql.raw(
1300-
`DELETE FROM "${namespace}"."${getTableName(getReorgTable(table))}" WHERE checkpoint > '${checkpoint}' RETURNING *`,
1301-
),
1302-
);
1303-
1304-
const reversed = rows.sort(
1305-
// @ts-ignore
1306-
(a, b) => b.operation_id - a.operation_id,
1296+
const result = await tx.execute(
1297+
sql.raw(`
1298+
WITH reverted1 AS (
1299+
DELETE FROM "${namespace}"."${getTableName(getReorgTable(table))}"
1300+
WHERE checkpoint > '${checkpoint}' RETURNING *
1301+
), reverted2 AS (
1302+
SELECT ${primaryKeyColumns.map(({ sql }) => `"${sql}"`).join(", ")}, MIN(operation_id) AS operation_id FROM reverted1
1303+
GROUP BY ${primaryKeyColumns.map(({ sql }) => `"${sql}"`).join(", ")}
1304+
), reverted3 AS (
1305+
SELECT ${Object.values(getTableColumns(table))
1306+
.map((column) => `reverted1."${getColumnCasing(column, "snake_case")}"`)
1307+
.join(", ")}, reverted1.operation FROM reverted2
1308+
INNER JOIN reverted1
1309+
ON ${primaryKeyColumns.map(({ sql }) => `reverted2."${sql}" = reverted1."${sql}"`).join("AND ")}
1310+
AND reverted2.operation_id = reverted1.operation_id
1311+
), inserted AS (
1312+
DELETE FROM "${namespace}"."${getTableName(table)}" as t
1313+
WHERE EXISTS (
1314+
SELECT * FROM reverted3
1315+
WHERE ${primaryKeyColumns.map(({ sql }) => `t."${sql}" = reverted3."${sql}"`).join("AND ")}
1316+
AND OPERATION = 0
1317+
)
1318+
RETURNING *
1319+
), updated_or_deleted AS (
1320+
INSERT INTO "${namespace}"."${getTableName(table)}"
1321+
SELECT ${Object.values(getTableColumns(table))
1322+
.map((column) => `"${getColumnCasing(column, "snake_case")}"`)
1323+
.join(", ")} FROM reverted3
1324+
WHERE operation = 1 OR operation = 2
1325+
ON CONFLICT (${primaryKeyColumns.map(({ sql }) => `"${sql}"`).join(", ")})
1326+
DO UPDATE SET
1327+
${Object.values(getTableColumns(table))
1328+
.map(
1329+
(column) =>
1330+
`"${getColumnCasing(column, "snake_case")}" = EXCLUDED."${getColumnCasing(column, "snake_case")}"`,
1331+
)
1332+
.join(", ")}
1333+
RETURNING *
1334+
) SELECT COUNT(*) FROM reverted1 as count;
1335+
`),
13071336
);
13081337

1309-
// undo operation
1310-
for (const log of reversed) {
1311-
if (log.operation === 0) {
1312-
// create
1313-
1314-
await tx.delete(table).where(
1315-
and(
1316-
...primaryKeyColumns.map(({ js, sql }) =>
1317-
// @ts-ignore
1318-
eq(table[js]!, log[sql]),
1319-
),
1320-
),
1321-
);
1322-
} else if (log.operation === 1) {
1323-
// update
1324-
1325-
// @ts-ignore
1326-
log.operation_id = undefined;
1327-
// @ts-ignore
1328-
log.checkpoint = undefined;
1329-
// @ts-ignore
1330-
log.operation = undefined;
1331-
await tx
1332-
.update(table)
1333-
.set(log)
1334-
.where(
1335-
and(
1336-
...primaryKeyColumns.map(({ js, sql }) =>
1337-
// @ts-ignore
1338-
eq(table[js]!, log[sql]),
1339-
),
1340-
),
1341-
);
1342-
} else {
1343-
// delete
1344-
1345-
// @ts-ignore
1346-
log.operation_id = undefined;
1347-
// @ts-ignore
1348-
log.checkpoint = undefined;
1349-
// @ts-ignore
1350-
log.operation = undefined;
1351-
await tx
1352-
.insert(table)
1353-
.values(log)
1354-
.onConflictDoNothing({
1355-
target: primaryKeyColumns.map(({ js }) => table[js]!),
1356-
});
1357-
}
1358-
}
1359-
13601338
common.logger.info({
13611339
service: "database",
1362-
msg: `Reverted ${rows.length} unfinalized operations from '${getTableName(table)}'`,
1340+
// @ts-ignore
1341+
msg: `Reverted ${result.rows[0]!.count} unfinalized operations from '${getTableName(table)}'`,
13631342
});
13641343
}),
13651344
),

0 commit comments

Comments
 (0)