diff --git a/.changeset/dirty-pugs-attack.md b/.changeset/dirty-pugs-attack.md new file mode 100644 index 000000000..f0fb7ba57 --- /dev/null +++ b/.changeset/dirty-pugs-attack.md @@ -0,0 +1,13 @@ +--- +"@emdash-cms/cloudflare": minor +--- + +New experimental `coalesce` option for the `d1()` adapter, for much faster uncached page loads: + +```ts +emdash({ + database: d1({ binding: "DB", session: "auto", coalesce: true }), +}); +``` + +When enabled, read queries that a page issues at the same time are sent to D1 as a single round trip instead of one at a time. A page that runs half a dozen queries — settings, menus, the entry, related posts — pays for one trip to the database instead of six, which can cut uncached render time by more than half. Each query still gets its own results and its own errors, and writes are unaffected. Requires `session` to be enabled; off by default while experimental. diff --git a/infra/blog-demo/astro.config.mjs b/infra/blog-demo/astro.config.mjs index 7476714bc..6ac0cb2d3 100644 --- a/infra/blog-demo/astro.config.mjs +++ b/infra/blog-demo/astro.config.mjs @@ -16,7 +16,7 @@ export default defineConfig({ integrations: [ react(), emdash({ - database: d1({ binding: "DB", session: "auto" }), + database: d1({ binding: "DB", session: "auto", coalesce: true }), storage: r2({ binding: "MEDIA" }), plugins: [formsPlugin()], sandboxed: [webhookNotifier], diff --git a/infra/cache-demo/astro.config.mjs b/infra/cache-demo/astro.config.mjs index 3e2ffa53a..623b7ab32 100644 --- a/infra/cache-demo/astro.config.mjs +++ b/infra/cache-demo/astro.config.mjs @@ -22,7 +22,7 @@ export default defineConfig({ integrations: [ react(), emdash({ - database: d1({ binding: "DB", session: "auto" }), + database: d1({ binding: "DB", session: "auto", coalesce: true }), storage: r2({ binding: "MEDIA" }), plugins: [formsPlugin()], sandboxed: [webhookNotifier], diff --git a/packages/cloudflare/src/db/coalescing-d1.ts b/packages/cloudflare/src/db/coalescing-d1.ts new file mode 100644 index 000000000..5e1f54633 --- /dev/null +++ b/packages/cloudflare/src/db/coalescing-d1.ts @@ -0,0 +1,276 @@ +/** + * Experimental coalescing D1 driver. + * + * Buffers SELECT queries issued in the same event-loop turn and executes + * them as a single D1 `batch()` call (one HTTP round trip) instead of N + * fully-serialized round trips. Production pages routinely issue 5-7 + * serialized queries at 15-40ms each; batching collapses them into one. + * + * Only used for the per-request session Kysely (see createRequestScopedDb + * in d1.ts). The shared singleton must never coalesce: concurrent requests + * would share a buffer and one request's queries could be batched into + * another request's round trip. + */ + +import { + type CompiledQuery, + type DatabaseConnection, + type Driver, + type QueryResult, + SqliteAdapter, +} from "kysely"; +import type { D1DialectConfig } from "kysely-d1"; + +import { EmDashD1Dialect } from "./d1-dialect.js"; + +/** + * Statements safe to coalesce: plain SELECTs. Deliberately conservative — + * `WITH` is excluded because SQLite allows CTEs on writes + * (`WITH ... INSERT/UPDATE/DELETE`), and everything else (insert, update, + * delete, pragma, explain, ...) takes the direct path. + */ +const SELECT_PATTERN = /^select\b/i; + +interface PendingQuery { + statement: D1PreparedStatement; + resolve: (result: QueryResult) => void; + reject: (error: unknown) => void; + /** SQL text, kept for error reporting. */ + sql: string; +} + +/** + * Map a D1 result to a Kysely QueryResult. Mirrors kysely-d1's mapping + * exactly: rows from `results`, numAffectedRows from `meta.changes` as + * BigInt when > 0, insertId from `meta.last_row_id`. + * + * No `success` check: the D1Result type declares `success: true`, i.e. a + * returned result is always a success — D1 rejects the `.all()`/`.batch()` + * promise on failure, which the callers handle (the batch catch-fallback, + * and Kysely's own error path on the direct path). + */ +function mapD1Result(result: D1Result): QueryResult { + if (result.error) { + throw new Error(result.error); + } + const numAffectedRows = result.meta.changes > 0 ? BigInt(result.meta.changes) : undefined; + return { + insertId: + result.meta.last_row_id === undefined || result.meta.last_row_id === null + ? undefined + : BigInt(result.meta.last_row_id), + rows: result.results ?? [], + numAffectedRows, + }; +} + +export class CoalescingD1Connection implements DatabaseConnection { + #database: D1Database; + #buffer: PendingQuery[] = []; + #flushScheduled = false; + /** + * Tail of a promise chain that serializes every physical call against the + * shared D1DatabaseSession (direct-path statements and batch flushes + * alike). See #enqueue. + */ + #opChain: Promise = Promise.resolve(); + + constructor(database: D1Database) { + this.#database = database; + } + + /** + * Run `op` after all previously-enqueued session calls have settled, so + * only one physical call is ever in flight against the shared + * D1DatabaseSession at a time. + * + * The plain SqliteAdapter reports `supportsMultipleConnections: false`, + * which makes Kysely serialize every query behind a connection mutex. We + * override that to `true` so same-turn SELECTs can reach the buffer + * together — but that also removes the mutex for writes and direct-path + * statements. D1 sessions are sequentially consistent and advance a + * bookmark per executed query; overlapping calls on one session could + * interleave that bookmark and persist a stale one at commit(), breaking + * read-your-writes. This chain restores the single-in-flight invariant + * for physical calls while still letting SELECTs coalesce into one batch. + * + * A failed op must not break the chain, so the stored tail swallows the + * outcome; the returned promise still rejects for the caller. + */ + #enqueue(op: () => Promise): Promise { + const run = this.#opChain.then(op, op); + this.#opChain = run.then( + () => undefined, + () => undefined, + ); + return run; + } + + async executeQuery(compiledQuery: CompiledQuery): Promise> { + if (!SELECT_PATTERN.test(compiledQuery.sql.trim())) { + // Non-SELECT: execute on the direct path (kysely-d1's prepare/bind/all + // flow), but through the op chain so it can't overlap an in-flight + // SELECT batch or another write on the shared session. + const statement = this.#database.prepare(compiledQuery.sql).bind(...compiledQuery.parameters); + const result = await this.#enqueue(() => statement.all()); + return mapD1Result(result); + } + + const statement = this.#database.prepare(compiledQuery.sql).bind(...compiledQuery.parameters); + + return new Promise>((resolve, reject) => { + this.#buffer.push({ statement, resolve, reject, sql: compiledQuery.sql }); + this.#scheduleFlush(); + }); + } + + /** + * Schedule a flush of buffered SELECTs unless one is already pending. + * + * setTimeout(0) (macrotask), not queueMicrotask: Kysely awaits internally + * between acquiring the connection and executing each query, so a + * microtask window would close before sibling queries issued in the same + * turn reach the connection. Queries that arrive after the buffer is + * drained (flag already cleared) simply schedule the next window; physical + * ordering against any in-flight call is handled by #enqueue. + */ + #scheduleFlush(): void { + if (this.#flushScheduled) return; + this.#flushScheduled = true; + setTimeout(() => { + void this.#flush(); + }, 0); + } + + async #flush(): Promise { + // Clear the scheduled flag before draining so queries arriving after + // this point schedule a fresh window rather than being stranded. + this.#flushScheduled = false; + const pending = this.#buffer.splice(0, this.#buffer.length); + if (pending.length === 0) return; + + // Serialize the physical batch/all against every other session call. + await this.#enqueue(async () => { + const first = pending[0]; + if (pending.length === 1 && first) { + // A lone query gains nothing from batch(); execute it directly. + try { + first.resolve(mapD1Result(await first.statement.all())); + } catch (error) { + first.reject(error); + } + return; + } + + let results: D1Result[]; + try { + results = await this.#database.batch(pending.map((p) => p.statement)); + } catch { + // D1 batches are atomic: one bad statement rejects the whole call. + // Fall back to executing every buffered statement individually + // (they are all SELECTs, safe to re-run) so innocent queries still + // resolve and only the genuinely failing one rejects with its own + // error. This preserves per-query error semantics. Sequential, in + // issue order: this is an error path where determinism matters + // more than latency, and it avoids piling concurrent retries onto + // a database that just failed. + for (const p of pending) { + try { + p.resolve(mapD1Result(await p.statement.all())); + } catch (error) { + p.reject(error); + } + } + return; + } + + for (let i = 0; i < pending.length; i++) { + const entry = pending[i]; + if (!entry) continue; + const result = results[i]; + if (result) { + try { + entry.resolve(mapD1Result(result)); + } catch (error) { + entry.reject(error); + } + } else { + entry.reject(new Error(`D1 batch() returned no result for statement ${i}: ${entry.sql}`)); + } + } + }); + } + + // eslint-disable-next-line require-yield -- D1 doesn't support streaming (same as kysely-d1) + async *streamQuery(): AsyncIterableIterator> { + throw new Error("D1 Driver does not support streaming"); + } +} + +export class CoalescingD1Driver implements Driver { + #connection: CoalescingD1Connection; + + constructor(database: D1Database) { + // A single shared connection: the whole point is for concurrent queries + // in the same request to land in the same buffer. + this.#connection = new CoalescingD1Connection(database); + } + + async init(): Promise {} + + async acquireConnection(): Promise { + return this.#connection; + } + + async beginTransaction(): Promise { + throw new Error("Transactions are not supported"); + } + + async commitTransaction(): Promise { + throw new Error("Transactions are not supported"); + } + + async rollbackTransaction(): Promise { + throw new Error("Transactions are not supported"); + } + + async releaseConnection(): Promise {} + + async destroy(): Promise {} +} + +/** + * SqliteAdapter reports `supportsMultipleConnections: false`, which makes + * Kysely's RuntimeDriver serialize every query behind a connection mutex + * (acquire → execute → release). Under that mutex a second query never + * reaches the connection until the first resolves, so nothing would ever + * coalesce. Our shared connection is explicitly safe for concurrent + * `executeQuery` calls — that is the whole point — so report `true`. + * Transactions are rejected by the driver regardless. + */ +class CoalescingD1Adapter extends SqliteAdapter { + override get supportsMultipleConnections(): boolean { + return true; + } +} + +/** + * D1 dialect that coalesces same-turn SELECTs into a single `batch()` round + * trip. Keeps EmDash's D1-compatible introspector. + */ +export class CoalescingD1Dialect extends EmDashD1Dialect { + #database: D1Database; + + constructor(config: D1DialectConfig) { + super(config); + this.#database = config.database; + } + + override createAdapter(): SqliteAdapter { + return new CoalescingD1Adapter(); + } + + override createDriver(): Driver { + return new CoalescingD1Driver(this.#database); + } +} diff --git a/packages/cloudflare/src/db/d1-dialect.ts b/packages/cloudflare/src/db/d1-dialect.ts new file mode 100644 index 000000000..0fa4443e1 --- /dev/null +++ b/packages/cloudflare/src/db/d1-dialect.ts @@ -0,0 +1,24 @@ +/** + * Shared D1 Kysely dialect with EmDash's D1-compatible introspector. + * + * Lives in its own module (rather than d1.ts) so the coalescing dialect in + * coalescing-d1.ts can extend it without creating a circular import with + * d1.ts, and without pulling cloudflare:workers into test environments. + */ + +import type { DatabaseIntrospector, Kysely } from "kysely"; +import { D1Dialect } from "kysely-d1"; + +import { D1Introspector } from "./d1-introspector.js"; + +/** + * Custom D1 Dialect that uses our D1-compatible introspector + * + * The default kysely-d1 dialect uses SqliteIntrospector which does a + * cross-join with pragma_table_info() that D1 doesn't allow. + */ +export class EmDashD1Dialect extends D1Dialect { + override createIntrospector(db: Kysely): DatabaseIntrospector { + return new D1Introspector(db); + } +} diff --git a/packages/cloudflare/src/db/d1.ts b/packages/cloudflare/src/db/d1.ts index a8ca49db1..e819f6168 100644 --- a/packages/cloudflare/src/db/d1.ts +++ b/packages/cloudflare/src/db/d1.ts @@ -10,10 +10,10 @@ import { env } from "cloudflare:workers"; import { kyselyLogOption } from "emdash/database/instrumentation"; -import { type DatabaseIntrospector, type Dialect, Kysely } from "kysely"; -import { D1Dialect } from "kysely-d1"; +import { type Dialect, Kysely } from "kysely"; -import { D1Introspector } from "./d1-introspector.js"; +import { CoalescingD1Dialect } from "./coalescing-d1.js"; +import { EmDashD1Dialect } from "./d1-dialect.js"; /** * D1 configuration (runtime type — matches the config-time type in index.ts) @@ -22,10 +22,17 @@ interface D1Config { binding: string; session?: "disabled" | "auto" | "primary-first"; bookmarkCookie?: string; + coalesce?: boolean; } const DEFAULT_BOOKMARK_COOKIE = "__em_d1_bookmark"; +/** + * One-shot guard so the "coalesce opted in but the binding can't do sessions + * at runtime" warning fires once per worker, not on every request. + */ +let warnedCoalesceNoRuntimeSession = false; + /** * D1 bookmarks are opaque, minted by Cloudflare. We don't validate the shape * (a tighter regex risks rejecting a format change and silently degrading @@ -45,18 +52,6 @@ function hasControlChars(value: string): boolean { return false; } -/** - * Custom D1 Dialect that uses our D1-compatible introspector - * - * The default kysely-d1 dialect uses SqliteIntrospector which does a - * cross-join with pragma_table_info() that D1 doesn't allow. - */ -class EmDashD1Dialect extends D1Dialect { - override createIntrospector(db: Kysely): DatabaseIntrospector { - return new D1Introspector(db); - } -} - /** * Create a D1 dialect from config. Used for the singleton Kysely instance * (no session — queries go through the raw binding). @@ -82,6 +77,13 @@ export function createDialect(config: D1Config): Dialect { `Check your wrangler.jsonc configuration:\n\n${example}`, ); } + // Coalescing only applies to the per-request session db; without + // sessions it silently does nothing, which would be a confusing no-op. + if (config.coalesce && !isSessionEnabled(config)) { + console.warn( + '[emdash] d1({ coalesce: true }) has no effect without sessions — set session: "auto" (or "primary-first") to enable query coalescing.', + ); + } return new EmDashD1Dialect({ database: db }); } @@ -130,7 +132,20 @@ export interface RequestScopedDb { export function createRequestScopedDb(opts: RequestScopedDbOpts): RequestScopedDb | null { if (!isSessionEnabled(opts.config)) return null; const binding = getBinding(opts.config); - if (!binding || typeof binding.withSession !== "function") return null; + if (!binding || typeof binding.withSession !== "function") { + // Sessions are enabled in config, so createDialect's config-time warning + // didn't fire — but the live binding can't actually do sessions (older + // D1 binding / missing withSession). Coalescing silently falls back to + // the singleton, so surface that once rather than leaving the opt-in a + // mystery no-op. + if (opts.config.coalesce && binding && !warnedCoalesceNoRuntimeSession) { + warnedCoalesceNoRuntimeSession = true; + console.warn( + "[emdash] d1({ coalesce: true }) has no effect: the D1 binding does not support sessions (withSession() is unavailable at runtime). Query coalescing requires D1 sessions.", + ); + } + return null; + } const cookieName = opts.config.bookmarkCookie ?? DEFAULT_BOOKMARK_COOKIE; const configConstraint = @@ -162,8 +177,17 @@ export function createRequestScopedDb(opts: RequestScopedDbOpts): RequestScopedD // both of which D1DatabaseSession implements. // eslint-disable-next-line typescript/no-unsafe-type-assertion -- session is structurally compatible with the subset D1Dialect uses const sessionAsDatabase = session as unknown as D1Database; + // Coalescing is per-request only by construction: this Kysely (and its + // driver buffer) lives for a single request, so there is no cross-request + // buffering. The shared singleton from createDialect must never coalesce. + const dialect = opts.config.coalesce + ? new CoalescingD1Dialect({ database: sessionAsDatabase }) + : new EmDashD1Dialect({ database: sessionAsDatabase }); const db = new Kysely({ - dialect: new EmDashD1Dialect({ database: sessionAsDatabase }), + dialect, + // Kysely measures around the driver call, so per-query metrics still + // count each query. With coalescing, durations reflect the shared batch + // window rather than per-statement time — acceptable. log: kyselyLogOption(), }); diff --git a/packages/cloudflare/src/index.ts b/packages/cloudflare/src/index.ts index 5009ae379..0f56fe823 100644 --- a/packages/cloudflare/src/index.ts +++ b/packages/cloudflare/src/index.ts @@ -70,6 +70,33 @@ export interface D1Config { * @default "__em_d1_bookmark" */ bookmarkCookie?: string; + + /** + * Experimental: batch concurrent read queries into one D1 round trip. + * + * SELECT queries issued in the same event-loop turn are buffered and + * executed as a single D1 `batch()` call (one HTTP round trip) instead + * of N serialized round trips. Writes, CTEs and other statements are not + * batched — they enqueue immediately on the direct path. If the batch + * fails, queries are retried individually so each query keeps its own + * error semantics. Every physical D1 call (writes and the SELECT batch + * alike) is serialized per request, so the session bookmark always + * advances in execution order. + * + * Only applies to the per-request session database, so `session` must + * also be enabled (`"auto"` or `"primary-first"`); the shared singleton + * never coalesces. + * + * Ordering caveat: buffered reads execute at the next flush window + * (~one macrotask later), while a write enqueues immediately. A read and + * a write issued concurrently in the same turn (e.g. under + * `Promise.all`) may therefore execute write-first (they never overlap). + * Reads that must observe pre-write state should be awaited before + * issuing the write — which sequential `await` code already does. + * + * @default false + */ + coalesce?: boolean; } /** diff --git a/packages/cloudflare/tests/db/coalescing-d1.test.ts b/packages/cloudflare/tests/db/coalescing-d1.test.ts new file mode 100644 index 000000000..04152b487 --- /dev/null +++ b/packages/cloudflare/tests/db/coalescing-d1.test.ts @@ -0,0 +1,282 @@ +import { CompiledQuery, Kysely } from "kysely"; +import { describe, expect, it } from "vitest"; + +import { CoalescingD1Dialect } from "../../src/db/coalescing-d1.js"; + +interface MockResultConfig { + rows?: Record[]; + changes?: number; + lastRowId?: number; + error?: Error; +} + +interface MockStatement { + sql: string; + params: unknown[]; + bind: (...params: unknown[]) => MockStatement; + all: () => Promise; +} + +/** + * Hand-rolled mock of the D1Database subset the dialect uses (prepare/batch). + * Records every `all()` and `batch()` call (and their order) so tests can + * assert exactly which statements were coalesced. + */ +function createMockD1(resultsBySql: Record = {}) { + const operations: string[] = []; + const batchCalls: MockStatement[][] = []; + const allCalls: MockStatement[] = []; + + function d1Result(stmt: MockStatement) { + const config = resultsBySql[stmt.sql] ?? {}; + if (config.error) throw config.error; + return { + success: true, + results: config.rows ?? [], + meta: { changes: config.changes ?? 0, last_row_id: config.lastRowId ?? 0 }, + }; + } + + const database = { + prepare(sql: string): MockStatement { + const stmt: MockStatement = { + sql, + params: [], + bind(...params: unknown[]) { + stmt.params = params; + return stmt; + }, + async all() { + operations.push(`all:${stmt.sql}`); + allCalls.push(stmt); + return d1Result(stmt); + }, + }; + return stmt; + }, + // D1 batches are atomic: a single bad statement rejects the whole call, + // which the throw inside d1Result reproduces. + async batch(statements: MockStatement[]) { + operations.push(`batch:${statements.map((s) => s.sql).join("|")}`); + batchCalls.push(statements); + return statements.map((s) => d1Result(s)); + }, + }; + + return { database, operations, batchCalls, allCalls }; +} + +function createDb(mock: ReturnType) { + // eslint-disable-next-line typescript/no-unsafe-type-assertion -- mock implements the prepare/batch subset the dialect uses + const database = mock.database as unknown as D1Database; + return new Kysely({ dialect: new CoalescingD1Dialect({ database }) }); +} + +describe("CoalescingD1Dialect", () => { + it("coalesces SELECTs issued in the same turn into one batch in issue order", async () => { + const rowsA = [{ id: 1 }]; + const rowsB = [{ id: 2 }, { id: 3 }]; + const mock = createMockD1({ + "SELECT * FROM a": { rows: rowsA }, + "select * from b where id = ?": { rows: rowsB }, + }); + const db = createDb(mock); + + const [r1, r2] = await Promise.all([ + db.executeQuery(CompiledQuery.raw("SELECT * FROM a")), + db.executeQuery(CompiledQuery.raw("select * from b where id = ?", ["abc"])), + ]); + + expect(mock.batchCalls).toHaveLength(1); + expect(mock.batchCalls[0]?.map((s) => s.sql)).toEqual([ + "SELECT * FROM a", + "select * from b where id = ?", + ]); + // Each caller gets its own rows, and bind params are preserved. + expect(r1.rows).toEqual(rowsA); + expect(r2.rows).toEqual(rowsB); + expect(mock.batchCalls[0]?.[1]?.params).toEqual(["abc"]); + // Nothing went through the single-statement path. + expect(mock.allCalls).toHaveLength(0); + }); + + it("does not batch sequentially awaited SELECTs", async () => { + const mock = createMockD1({ + "select 1": { rows: [{ one: 1 }] }, + "select 2": { rows: [{ two: 2 }] }, + }); + const db = createDb(mock); + + const r1 = await db.executeQuery(CompiledQuery.raw("select 1")); + const r2 = await db.executeQuery(CompiledQuery.raw("select 2")); + + expect(mock.batchCalls).toHaveLength(0); + expect(mock.allCalls.map((s) => s.sql)).toEqual(["select 1", "select 2"]); + expect(r1.rows).toEqual([{ one: 1 }]); + expect(r2.rows).toEqual([{ two: 2 }]); + // Lone SELECTs flow through the buffer but report no affected rows. + expect(r1.numAffectedRows).toBeUndefined(); + }); + + it("executes writes and CTEs immediately while same-turn SELECTs still coalesce", async () => { + const mock = createMockD1({ "insert into a (name) values (?)": { changes: 1, lastRowId: 7 } }); + const db = createDb(mock); + + await Promise.all([ + db.executeQuery(CompiledQuery.raw("select * from a")), + db.executeQuery(CompiledQuery.raw("insert into a (name) values (?)", ["x"])), + db.executeQuery(CompiledQuery.raw("update a set name = ?", ["y"])), + db.executeQuery(CompiledQuery.raw("delete from a where id = ?", [1])), + db.executeQuery(CompiledQuery.raw("select * from b")), + // WITH is never coalesced: SQLite allows CTEs on writes. + db.executeQuery(CompiledQuery.raw("with x as (select 1) delete from a")), + ]); + + // Non-SELECTs hit the direct path before the macrotask flush fires. + expect(mock.operations).toEqual([ + "all:insert into a (name) values (?)", + "all:update a set name = ?", + "all:delete from a where id = ?", + "all:with x as (select 1) delete from a", + "batch:select * from a|select * from b", + ]); + }); + + it("falls back to individual execution when the batch rejects", async () => { + const failure = new Error("no such column: nope"); + const mock = createMockD1({ + "select good1": { rows: [{ ok: 1 }] }, + "select nope": { error: failure }, + "select good2": { rows: [{ ok: 2 }] }, + }); + const db = createDb(mock); + + const [r1, r2, r3] = await Promise.allSettled([ + db.executeQuery(CompiledQuery.raw("select good1")), + db.executeQuery(CompiledQuery.raw("select nope")), + db.executeQuery(CompiledQuery.raw("select good2")), + ]); + + // One atomic batch was attempted, then every statement re-ran solo. + expect(mock.batchCalls).toHaveLength(1); + expect(mock.allCalls.map((s) => s.sql)).toEqual([ + "select good1", + "select nope", + "select good2", + ]); + + expect(r1.status).toBe("fulfilled"); + if (r1.status === "fulfilled") expect(r1.value.rows).toEqual([{ ok: 1 }]); + expect(r2.status).toBe("rejected"); + if (r2.status === "rejected") expect(r2.reason).toBe(failure); + expect(r3.status).toBe("fulfilled"); + if (r3.status === "fulfilled") expect(r3.value.rows).toEqual([{ ok: 2 }]); + }); + + it("maps rows, numAffectedRows and insertId on the direct path", async () => { + const mock = createMockD1({ + "insert into a (name) values (?)": { changes: 2, lastRowId: 7 }, + "delete from a where 1 = 0": { changes: 0 }, + }); + const db = createDb(mock); + + const inserted = await db.executeQuery( + CompiledQuery.raw("insert into a (name) values (?)", ["x"]), + ); + expect(inserted.rows).toEqual([]); + expect(inserted.numAffectedRows).toBe(2n); + expect(inserted.insertId).toBe(7n); + expect(mock.allCalls[0]?.params).toEqual(["x"]); + + // Zero changes maps to undefined, matching kysely-d1. + const deleted = await db.executeQuery(CompiledQuery.raw("delete from a where 1 = 0")); + expect(deleted.numAffectedRows).toBeUndefined(); + expect(mock.batchCalls).toHaveLength(0); + }); + + it("starts a new batch window after a flush", async () => { + const mock = createMockD1(); + const db = createDb(mock); + + await Promise.all([ + db.executeQuery(CompiledQuery.raw("select 1")), + db.executeQuery(CompiledQuery.raw("select 2")), + ]); + await Promise.all([ + db.executeQuery(CompiledQuery.raw("select 3")), + db.executeQuery(CompiledQuery.raw("select 4")), + ]); + + expect(mock.batchCalls).toHaveLength(2); + expect(mock.batchCalls[0]?.map((s) => s.sql)).toEqual(["select 1", "select 2"]); + expect(mock.batchCalls[1]?.map((s) => s.sql)).toEqual(["select 3", "select 4"]); + }); + + it("never overlaps physical session calls: a write and a SELECT batch run serially", async () => { + // The driver flips supportsMultipleConnections to true, which removes + // Kysely's connection mutex. A same-turn write (direct path) must still + // not run concurrently with the buffered-SELECT batch on the shared D1 + // session — overlapping calls could interleave the session bookmark. + // This mock counts how many physical calls are in flight at once; the + // pre-serialization code reaches 2 here. + let inFlight = 0; + let maxInFlight = 0; + const ops: string[] = []; + const okResult = () => ({ success: true, results: [], meta: { changes: 0, last_row_id: 0 } }); + async function hold(label: string, value: T): Promise { + inFlight++; + maxInFlight = Math.max(maxInFlight, inFlight); + ops.push(label); + // Hold the "connection" across a macrotask so a concurrent call, + // if one were issued, would be observed in flight simultaneously. + await new Promise((resolve) => setTimeout(resolve, 0)); + inFlight--; + return value; + } + function makeStatement(sql: string): MockStatement { + const stmt: MockStatement = { + sql, + params: [], + bind(...params: unknown[]) { + stmt.params = params; + return stmt; + }, + all: () => hold(`all:${stmt.sql}`, okResult()), + }; + return stmt; + } + const database = { + prepare: (sql: string) => makeStatement(sql), + batch: (statements: MockStatement[]) => + hold( + `batch:${statements.map((s) => s.sql).join("|")}`, + statements.map(() => okResult()), + ), + }; + // eslint-disable-next-line typescript/no-unsafe-type-assertion -- mock implements the prepare/batch subset the dialect uses + const db = new Kysely({ + dialect: new CoalescingD1Dialect({ database: database as unknown as D1Database }), + }); + + await Promise.all([ + db.executeQuery(CompiledQuery.raw("select * from a")), + db.executeQuery(CompiledQuery.raw("select * from b")), + db.executeQuery(CompiledQuery.raw("update a set n = ?", ["x"])), + ]); + + expect(maxInFlight).toBe(1); + // The write executes first (direct path, enqueued immediately), then the + // coalesced SELECT batch — never overlapping. + expect(ops).toEqual(["all:update a set n = ?", "batch:select * from a|select * from b"]); + }); + + it("rejects transactions", async () => { + const db = createDb(createMockD1()); + + await expect( + db.transaction().execute(async () => { + // never reached + }), + ).rejects.toThrow("Transactions are not supported"); + }); +});