Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions .changeset/dirty-pugs-attack.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
---
"@emdash-cms/cloudflare": minor
---

New experimental `coalesce` option for the `d1()` adapter, for much faster uncached page loads:

```ts
emdash({
database: d1({ binding: "DB", session: "auto", coalesce: true }),
});
```

When enabled, read queries that a page issues at the same time are sent to D1 as a single round trip instead of one at a time. A page that runs half a dozen queries — settings, menus, the entry, related posts — pays for one trip to the database instead of six, which can cut uncached render time by more than half. Each query still gets its own results and its own errors, and writes are unaffected. Requires `session` to be enabled; off by default while experimental.
2 changes: 1 addition & 1 deletion infra/blog-demo/astro.config.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ export default defineConfig({
integrations: [
react(),
emdash({
database: d1({ binding: "DB", session: "auto" }),
database: d1({ binding: "DB", session: "auto", coalesce: true }),
storage: r2({ binding: "MEDIA" }),
plugins: [formsPlugin()],
sandboxed: [webhookNotifier],
Expand Down
2 changes: 1 addition & 1 deletion infra/cache-demo/astro.config.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ export default defineConfig({
integrations: [
react(),
emdash({
database: d1({ binding: "DB", session: "auto" }),
database: d1({ binding: "DB", session: "auto", coalesce: true }),
storage: r2({ binding: "MEDIA" }),
plugins: [formsPlugin()],
sandboxed: [webhookNotifier],
Expand Down
276 changes: 276 additions & 0 deletions packages/cloudflare/src/db/coalescing-d1.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,276 @@
/**
* Experimental coalescing D1 driver.
*
* Buffers SELECT queries issued in the same event-loop turn and executes
* them as a single D1 `batch()` call (one HTTP round trip) instead of N
* fully-serialized round trips. Production pages routinely issue 5-7
* serialized queries at 15-40ms each; batching collapses them into one.
*
* Only used for the per-request session Kysely (see createRequestScopedDb
* in d1.ts). The shared singleton must never coalesce: concurrent requests
* would share a buffer and one request's queries could be batched into
* another request's round trip.
*/

import {
type CompiledQuery,
type DatabaseConnection,
type Driver,
type QueryResult,
SqliteAdapter,
} from "kysely";
import type { D1DialectConfig } from "kysely-d1";

import { EmDashD1Dialect } from "./d1-dialect.js";

/**
* Statements safe to coalesce: plain SELECTs. Deliberately conservative —
* `WITH` is excluded because SQLite allows CTEs on writes
* (`WITH ... INSERT/UPDATE/DELETE`), and everything else (insert, update,
* delete, pragma, explain, ...) takes the direct path.
*/
const SELECT_PATTERN = /^select\b/i;

interface PendingQuery {
statement: D1PreparedStatement;
resolve: (result: QueryResult<any>) => void;
reject: (error: unknown) => void;
/** SQL text, kept for error reporting. */
sql: string;
}

/**
* Map a D1 result to a Kysely QueryResult. Mirrors kysely-d1's mapping
* exactly: rows from `results`, numAffectedRows from `meta.changes` as
* BigInt when > 0, insertId from `meta.last_row_id`.
*
* No `success` check: the D1Result type declares `success: true`, i.e. a
* returned result is always a success — D1 rejects the `.all()`/`.batch()`
* promise on failure, which the callers handle (the batch catch-fallback,
* and Kysely's own error path on the direct path).
*/
function mapD1Result<R>(result: D1Result<R>): QueryResult<R> {
if (result.error) {
throw new Error(result.error);
}
const numAffectedRows = result.meta.changes > 0 ? BigInt(result.meta.changes) : undefined;
return {
insertId:
result.meta.last_row_id === undefined || result.meta.last_row_id === null
? undefined
: BigInt(result.meta.last_row_id),
rows: result.results ?? [],
numAffectedRows,
};
}

export class CoalescingD1Connection implements DatabaseConnection {
#database: D1Database;
#buffer: PendingQuery[] = [];
#flushScheduled = false;
/**
* Tail of a promise chain that serializes every physical call against the
* shared D1DatabaseSession (direct-path statements and batch flushes
* alike). See #enqueue.
*/
#opChain: Promise<unknown> = Promise.resolve();

constructor(database: D1Database) {
this.#database = database;
}

/**
* Run `op` after all previously-enqueued session calls have settled, so
* only one physical call is ever in flight against the shared
* D1DatabaseSession at a time.
*
* The plain SqliteAdapter reports `supportsMultipleConnections: false`,
* which makes Kysely serialize every query behind a connection mutex. We
* override that to `true` so same-turn SELECTs can reach the buffer
* together — but that also removes the mutex for writes and direct-path
* statements. D1 sessions are sequentially consistent and advance a
* bookmark per executed query; overlapping calls on one session could
* interleave that bookmark and persist a stale one at commit(), breaking
* read-your-writes. This chain restores the single-in-flight invariant
* for physical calls while still letting SELECTs coalesce into one batch.
*
* A failed op must not break the chain, so the stored tail swallows the
* outcome; the returned promise still rejects for the caller.
*/
#enqueue<T>(op: () => Promise<T>): Promise<T> {
const run = this.#opChain.then(op, op);
this.#opChain = run.then(
() => undefined,
() => undefined,
);
return run;
}

async executeQuery<R>(compiledQuery: CompiledQuery): Promise<QueryResult<R>> {
if (!SELECT_PATTERN.test(compiledQuery.sql.trim())) {
// Non-SELECT: execute on the direct path (kysely-d1's prepare/bind/all
// flow), but through the op chain so it can't overlap an in-flight
// SELECT batch or another write on the shared session.
const statement = this.#database.prepare(compiledQuery.sql).bind(...compiledQuery.parameters);
const result = await this.#enqueue(() => statement.all<R>());
return mapD1Result(result);
}

const statement = this.#database.prepare(compiledQuery.sql).bind(...compiledQuery.parameters);

return new Promise<QueryResult<R>>((resolve, reject) => {
this.#buffer.push({ statement, resolve, reject, sql: compiledQuery.sql });
this.#scheduleFlush();
});
}

/**
* Schedule a flush of buffered SELECTs unless one is already pending.
*
* setTimeout(0) (macrotask), not queueMicrotask: Kysely awaits internally
* between acquiring the connection and executing each query, so a
* microtask window would close before sibling queries issued in the same
* turn reach the connection. Queries that arrive after the buffer is
* drained (flag already cleared) simply schedule the next window; physical
* ordering against any in-flight call is handled by #enqueue.
*/
#scheduleFlush(): void {
if (this.#flushScheduled) return;
this.#flushScheduled = true;
setTimeout(() => {
void this.#flush();
}, 0);
}

async #flush(): Promise<void> {
// Clear the scheduled flag before draining so queries arriving after
// this point schedule a fresh window rather than being stranded.
this.#flushScheduled = false;
const pending = this.#buffer.splice(0, this.#buffer.length);
if (pending.length === 0) return;

// Serialize the physical batch/all against every other session call.
await this.#enqueue(async () => {
const first = pending[0];
if (pending.length === 1 && first) {
// A lone query gains nothing from batch(); execute it directly.
try {
first.resolve(mapD1Result(await first.statement.all()));
} catch (error) {
first.reject(error);
}
return;
}

let results: D1Result[];
try {
results = await this.#database.batch(pending.map((p) => p.statement));
} catch {
// D1 batches are atomic: one bad statement rejects the whole call.
// Fall back to executing every buffered statement individually
// (they are all SELECTs, safe to re-run) so innocent queries still
// resolve and only the genuinely failing one rejects with its own
// error. This preserves per-query error semantics. Sequential, in
// issue order: this is an error path where determinism matters
// more than latency, and it avoids piling concurrent retries onto
// a database that just failed.
for (const p of pending) {
try {
p.resolve(mapD1Result(await p.statement.all()));
} catch (error) {
p.reject(error);
}
}
return;
}

for (let i = 0; i < pending.length; i++) {
const entry = pending[i];
if (!entry) continue;
const result = results[i];
if (result) {
try {
entry.resolve(mapD1Result(result));
} catch (error) {
entry.reject(error);
}
} else {
entry.reject(new Error(`D1 batch() returned no result for statement ${i}: ${entry.sql}`));
}
}
});
}

// eslint-disable-next-line require-yield -- D1 doesn't support streaming (same as kysely-d1)
async *streamQuery<R>(): AsyncIterableIterator<QueryResult<R>> {
throw new Error("D1 Driver does not support streaming");
}
}

export class CoalescingD1Driver implements Driver {
#connection: CoalescingD1Connection;

constructor(database: D1Database) {
// A single shared connection: the whole point is for concurrent queries
// in the same request to land in the same buffer.
this.#connection = new CoalescingD1Connection(database);
}

async init(): Promise<void> {}

async acquireConnection(): Promise<DatabaseConnection> {
return this.#connection;
}

async beginTransaction(): Promise<void> {
throw new Error("Transactions are not supported");
}

async commitTransaction(): Promise<void> {
throw new Error("Transactions are not supported");
}

async rollbackTransaction(): Promise<void> {
throw new Error("Transactions are not supported");
}

async releaseConnection(): Promise<void> {}

async destroy(): Promise<void> {}
}

/**
* SqliteAdapter reports `supportsMultipleConnections: false`, which makes
* Kysely's RuntimeDriver serialize every query behind a connection mutex
* (acquire → execute → release). Under that mutex a second query never
* reaches the connection until the first resolves, so nothing would ever
* coalesce. Our shared connection is explicitly safe for concurrent
* `executeQuery` calls — that is the whole point — so report `true`.
* Transactions are rejected by the driver regardless.
*/
class CoalescingD1Adapter extends SqliteAdapter {
override get supportsMultipleConnections(): boolean {
return true;
}
}

/**
* D1 dialect that coalesces same-turn SELECTs into a single `batch()` round
* trip. Keeps EmDash's D1-compatible introspector.
*/
export class CoalescingD1Dialect extends EmDashD1Dialect {
#database: D1Database;

constructor(config: D1DialectConfig) {
super(config);
this.#database = config.database;
}

override createAdapter(): SqliteAdapter {
return new CoalescingD1Adapter();
}

override createDriver(): Driver {
return new CoalescingD1Driver(this.#database);
}
}
24 changes: 24 additions & 0 deletions packages/cloudflare/src/db/d1-dialect.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/**
* Shared D1 Kysely dialect with EmDash's D1-compatible introspector.
*
* Lives in its own module (rather than d1.ts) so the coalescing dialect in
* coalescing-d1.ts can extend it without creating a circular import with
* d1.ts, and without pulling cloudflare:workers into test environments.
*/

import type { DatabaseIntrospector, Kysely } from "kysely";
import { D1Dialect } from "kysely-d1";

import { D1Introspector } from "./d1-introspector.js";

/**
* Custom D1 Dialect that uses our D1-compatible introspector
*
* The default kysely-d1 dialect uses SqliteIntrospector which does a
* cross-join with pragma_table_info() that D1 doesn't allow.
*/
export class EmDashD1Dialect extends D1Dialect {
override createIntrospector(db: Kysely<any>): DatabaseIntrospector {
return new D1Introspector(db);
}
}
Loading
Loading