diff --git a/.changeset/bright-snakes-clean.md b/.changeset/bright-snakes-clean.md new file mode 100644 index 000000000..040970701 --- /dev/null +++ b/.changeset/bright-snakes-clean.md @@ -0,0 +1,37 @@ +--- +'@powersync/common': minor +'@powersync/node': minor +'@powersync/web': minor +'@powersync/react-native': minor +--- + +Add experimental support for raw tables, giving you full control over the table structure to sync into. +While PowerSync manages tables as JSON views by default, raw tables have to be created by the application +developer. Also, the upsert and delete statements for raw tables need to be specified in the app schema: + +```JavaScript +const customSchema = new Schema({}); +customSchema.withRawTables({ + lists: { + put: { + sql: 'INSERT OR REPLACE INTO lists (id, name) VALUES (?, ?)', + // put statements can use `Id` and extracted columns to bind parameters. + params: ['Id', { Column: 'name' }] + }, + delete: { + sql: 'DELETE FROM lists WHERE id = ?', + // delete statements can only use the id (but a CTE querying existing rows by id could + // be used as a workaround). + params: ['Id'] + } + } +}); + +const powersync = // open powersync database; +await powersync.execute('CREATE TABLE lists (id TEXT NOT NULL PRIMARY KEY, name TEXT);'); + +// Ready to sync into your custom table at this point + +``` + +The main benefit of raw tables is better query performance (since SQLite doesn't have to +extract rows from JSON) and more control (allowing the use of e.g. column and table constraints). 
diff --git a/packages/common/src/client/AbstractPowerSyncDatabase.ts b/packages/common/src/client/AbstractPowerSyncDatabase.ts index 590747969..4d29a4785 100644 --- a/packages/common/src/client/AbstractPowerSyncDatabase.ts +++ b/packages/common/src/client/AbstractPowerSyncDatabase.ts @@ -469,7 +469,10 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver { await this.syncDisposer?.(); } - async connect(connector: PowerSyncBackendConnector, options?: PowerSyncConnectionOptions) { + async connect(connector: PowerSyncBackendConnector, options: PowerSyncConnectionOptions) { // Keep track if there were pending operations before this call const hadPendingOptions = !!this.pendingConnectionOptions; diff --git a/packages/common/src/client/sync/stream/AbstractStreamingSyncImplementation.ts b/packages/common/src/client/sync/stream/AbstractStreamingSyncImplementation.ts index 58d4c6492..d88308d14 100644 --- a/packages/common/src/client/sync/stream/AbstractStreamingSyncImplementation.ts +++ b/packages/common/src/client/sync/stream/AbstractStreamingSyncImplementation.ts @@ -152,6 +152,11 @@ export interface BaseConnectionOptions { * These parameters are passed to the sync rules, and will be available under the`user_parameters` object. */ params?: Record; + + /** + * The serialized schema - mainly used to forward information about raw tables to the sync client. + */ + serializedSchema?: any; } /** @internal */ @@ -208,7 +213,8 @@ export const DEFAULT_STREAM_CONNECTION_OPTIONS: RequiredPowerSyncConnectionOptio connectionMethod: SyncStreamConnectionMethod.WEB_SOCKET, clientImplementation: DEFAULT_SYNC_CLIENT_IMPLEMENTATION, fetchStrategy: FetchStrategy.Buffered, - params: {} + params: {}, + serializedSchema: undefined }; // The priority we assume when we receive checkpoint lines where no priority is set. 
@@ -1019,12 +1025,12 @@ The next upload iteration will be delayed.`); } try { - await control( - PowerSyncControlCommand.START, - JSON.stringify({ - parameters: resolvedOptions.params - }) - ); + const options: any = { parameters: resolvedOptions.params }; + if (resolvedOptions.serializedSchema) { + options.schema = resolvedOptions.serializedSchema; + } + + await control(PowerSyncControlCommand.START, JSON.stringify(options)); this.notifyCompletedUploads = () => { controlInvocations?.enqueueData({ command: PowerSyncControlCommand.NOTIFY_CRUD_UPLOAD_COMPLETED }); diff --git a/packages/common/src/db/schema/RawTable.ts b/packages/common/src/db/schema/RawTable.ts new file mode 100644 index 000000000..69d4dd9d6 --- /dev/null +++ b/packages/common/src/db/schema/RawTable.ts @@ -0,0 +1,63 @@ +/** + * A pending variant of a {@link RawTable} that doesn't have a name (because it would be inferred when creating the + * schema). + */ +export type RawTableType = { + /** + * The statement to run when PowerSync detects that a row needs to be inserted or updated. + */ + put: PendingStatement; + /** + * The statement to run when PowerSync detects that a row needs to be deleted. + */ + delete: PendingStatement; +}; + +/** + * A parameter to use as part of {@link PendingStatement}. + * + * For delete statements, only the `"Id"` value is supported - the sync client will replace it with the id of the row to + * be synced. + * + * For insert and replace operations, the values of columns in the table are available as parameters through + * `{Column: 'name'}`. + */ +export type PendingStatementParameter = 'Id' | { Column: string }; + +/** + * A statement that the PowerSync client should use to insert or delete data into a table managed by the user. + */ +export type PendingStatement = { + sql: string; + params: PendingStatementParameter[]; +}; + +/** + * Instructs PowerSync to sync data into a "raw" table. 
+ * + * Since raw tables are not backed by JSON, running complex queries on them may be more efficient. Further, they allow + * using client-side table and column constraints. + * + * Note that raw tables are only supported when using the new `SyncClientImplementation.rust` sync client. + * + * @experimental Please note that this feature is experimental at the moment, and not covered by PowerSync semver or + * stability guarantees. + */ +export class RawTable implements RawTableType { + /** + * The name of the table. + * + * This does not have to match the actual table name in the schema - {@link put} and {@link delete} are free to use + * another table. Instead, this name is used by the sync client to recognize that operations on this table (as it + * appears in the source / backend database) are to be handled specially. + */ + name: string; + put: PendingStatement; + delete: PendingStatement; + + constructor(name: string, type: RawTableType) { + this.name = name; + this.put = type.put; + this.delete = type.delete; + } +} diff --git a/packages/common/src/db/schema/Schema.ts b/packages/common/src/db/schema/Schema.ts index 0aa175c3a..9f82ecf80 100644 --- a/packages/common/src/db/schema/Schema.ts +++ b/packages/common/src/db/schema/Schema.ts @@ -1,3 +1,4 @@ +import { RawTable, RawTableType } from './RawTable.js'; import { RowType, Table } from './Table.js'; type SchemaType = Record>; @@ -16,6 +17,7 @@ export class Schema { readonly types: SchemaTableType; readonly props: S; readonly tables: Table[]; + readonly rawTables: RawTable[]; constructor(tables: Table[] | S) { if (Array.isArray(tables)) { @@ -36,6 +38,24 @@ export class Schema { this.props = tables as S; this.tables = this.convertToClassicTables(this.props); } + + this.rawTables = []; + } + + /** + * Adds raw tables to this schema. Raw tables are identified by their name, but entirely managed by the application + * developer instead of automatically by PowerSync. 
+ * Since raw tables are not backed by JSON, running complex queries on them may be more efficient. Further, they allow + * using client-side table and column constraints. + * Note that raw tables are only supported when using the new `SyncClientImplementation.rust` sync client. + * + * @param tables An object of (table name, raw table definition) entries. + * @experimental Note that the raw tables API is still experimental and may change in the future. + */ + withRawTables(tables: Record) { + for (const [name, rawTableDefinition] of Object.entries(tables)) { + this.rawTables.push(new RawTable(name, rawTableDefinition)); + } } validate() { @@ -47,7 +67,8 @@ export class Schema { toJSON() { return { // This is required because "name" field is not present in TableV2 - tables: this.tables.map((t) => t.toJSON()) + tables: this.tables.map((t) => t.toJSON()), + raw_tables: this.rawTables }; } diff --git a/packages/common/tests/db/schema/Schema.test.ts b/packages/common/tests/db/schema/Schema.test.ts index c9f303535..fa9a31689 100644 --- a/packages/common/tests/db/schema/Schema.test.ts +++ b/packages/common/tests/db/schema/Schema.test.ts @@ -80,6 +80,18 @@ describe('Schema', () => { content: column.text }) }); + schema.withRawTables({ + lists: { + put: { + sql: 'SELECT 1', + params: [{ Column: 'foo' }] + }, + delete: { + sql: 'SELECT 2', + params: ['Id'] + } + } + }); const json = schema.toJSON(); @@ -115,6 +127,19 @@ describe('Schema', () => { ], indexes: [] } + ], + raw_tables: [ + { + name: 'lists', + delete: { + sql: 'SELECT 2', + params: ['Id'] + }, + put: { + sql: 'SELECT 1', + params: [{ Column: 'foo' }] + } + } ] }); }); diff --git a/packages/node/tests/sync.test.ts b/packages/node/tests/sync.test.ts index f8fe6ac81..2b513a0c1 100644 --- a/packages/node/tests/sync.test.ts +++ b/packages/node/tests/sync.test.ts @@ -7,6 +7,7 @@ import { OplogEntryJSON, PowerSyncConnectionOptions, ProgressWithOperations, + Schema, SyncClientImplementation, SyncStreamConnectionMethod 
} from '@powersync/common'; @@ -638,6 +639,85 @@ function defineSyncTests(impl: SyncClientImplementation) { expect(another.currentStatus.statusForPriority(0).hasSynced).toBeTruthy(); await another.waitForFirstSync({ priority: 0 }); }); + + if (impl == SyncClientImplementation.RUST) { + mockSyncServiceTest('raw tables', async ({ syncService }) => { + const customSchema = new Schema({}); + customSchema.withRawTables({ + lists: { + put: { + sql: 'INSERT OR REPLACE INTO lists (id, name) VALUES (?, ?)', + params: ['Id', { Column: 'name' }] + }, + delete: { + sql: 'DELETE FROM lists WHERE id = ?', + params: ['Id'] + } + } + }); + + const powersync = await syncService.createDatabase({ schema: customSchema }); + await powersync.execute('CREATE TABLE lists (id TEXT NOT NULL PRIMARY KEY, name TEXT);'); + + const query = powersync.watchWithAsyncGenerator('SELECT * FROM lists')[Symbol.asyncIterator](); + expect((await query.next()).value.rows._array).toStrictEqual([]); + + powersync.connect(new TestConnector(), options); + await vi.waitFor(() => expect(syncService.connectedListeners).toHaveLength(1)); + + syncService.pushLine({ + checkpoint: { + last_op_id: '1', + buckets: [bucket('a', 1)] + } + }); + syncService.pushLine({ + data: { + bucket: 'a', + data: [ + { + checksum: 0, + op_id: '1', + op: 'PUT', + object_id: 'my_list', + object_type: 'lists', + data: '{"name": "custom list"}' + } + ] + } + }); + syncService.pushLine({ checkpoint_complete: { last_op_id: '1' } }); + await powersync.waitForFirstSync(); + + expect((await query.next()).value.rows._array).toStrictEqual([{ id: 'my_list', name: 'custom list' }]); + + syncService.pushLine({ + checkpoint: { + last_op_id: '2', + buckets: [bucket('a', 2)] + } + }); + await vi.waitFor(() => powersync.currentStatus.dataFlowStatus.downloading == true); + syncService.pushLine({ + data: { + bucket: 'a', + data: [ + { + checksum: 0, + op_id: '2', + op: 'REMOVE', + object_id: 'my_list', + object_type: 'lists' + } + ] + } + }); + 
syncService.pushLine({ checkpoint_complete: { last_op_id: '2' } }); + await vi.waitFor(() => powersync.currentStatus.dataFlowStatus.downloading == false); + + expect((await query.next()).value.rows._array).toStrictEqual([]); + }); + } } function bucket(name: string, count: number, options: { priority: number } = { priority: 3 }): BucketChecksum { diff --git a/packages/node/tests/utils.ts b/packages/node/tests/utils.ts index 4246f9f76..ab10c9fa2 100644 --- a/packages/node/tests/utils.ts +++ b/packages/node/tests/utils.ts @@ -9,7 +9,6 @@ import { column, NodePowerSyncDatabaseOptions, PowerSyncBackendConnector, - PowerSyncConnectionOptions, PowerSyncCredentials, PowerSyncDatabase, Schema, @@ -56,12 +55,12 @@ async function createDatabase( options: Partial = {} ): Promise { const database = new PowerSyncDatabase({ - ...options, schema: AppSchema, database: { dbFilename: 'test.db', dbLocation: tmpdir - } + }, + ...options }); await database.init(); return database; @@ -128,8 +127,9 @@ export const mockSyncServiceTest = tempDirectoryTest.extend<{ } }; - const newConnection = async () => { + const newConnection = async (options?: Partial) => { const db = await createDatabase(tmpdir, { + ...options, remoteOptions: { fetchImplementation: inMemoryFetch } @@ -156,7 +156,7 @@ export const mockSyncServiceTest = tempDirectoryTest.extend<{ export interface MockSyncService { pushLine: (line: StreamingSyncLine) => void; connectedListeners: any[]; - createDatabase: () => Promise; + createDatabase: (options?: Partial) => Promise; } export class TestConnector implements PowerSyncBackendConnector { diff --git a/packages/web/src/worker/sync/SharedSyncImplementation.ts b/packages/web/src/worker/sync/SharedSyncImplementation.ts index 0af122ead..3e1c85f09 100644 --- a/packages/web/src/worker/sync/SharedSyncImplementation.ts +++ b/packages/web/src/worker/sync/SharedSyncImplementation.ts @@ -239,7 +239,7 @@ export class SharedSyncImplementation */ async connect(options?: 
PowerSyncConnectionOptions) { this.lastConnectOptions = options; - return this.connectionManager.connect(CONNECTOR_PLACEHOLDER, options); + return this.connectionManager.connect(CONNECTOR_PLACEHOLDER, options ?? {}); } async disconnect() { @@ -318,7 +318,7 @@ export class SharedSyncImplementation this.dbAdapter = null; if (shouldReconnect) { - await this.connectionManager.connect(CONNECTOR_PLACEHOLDER, this.lastConnectOptions); + await this.connectionManager.connect(CONNECTOR_PLACEHOLDER, this.lastConnectOptions ?? {}); } } diff --git a/packages/web/tests/stream.test.ts b/packages/web/tests/stream.test.ts index d45ebe0aa..8c5082353 100644 --- a/packages/web/tests/stream.test.ts +++ b/packages/web/tests/stream.test.ts @@ -3,13 +3,17 @@ import { createBaseLogger, DataStream, PowerSyncConnectionOptions, + Schema, + SyncClientImplementation, SyncStreamConnectionMethod, WASQLiteOpenFactory, - WASQLiteVFS + WASQLiteVFS, + WebPowerSyncOpenFactoryOptions } from '@powersync/web'; import { describe, expect, it, onTestFinished, vi } from 'vitest'; import { TestConnector } from './utils/MockStreamOpenFactory'; import { ConnectedDatabaseUtils, generateConnectedDatabase } from './utils/generateConnectedDatabase'; +import { v4 } from 'uuid'; const UPLOAD_TIMEOUT_MS = 3000; @@ -22,10 +26,11 @@ describe('Streaming', { sequential: true }, () => { { sequential: true }, - describeStreamingTests(() => + describeStreamingTests((options) => generateConnectedDatabase({ powerSyncOptions: { - logger + logger, + ...options } }) ) @@ -160,9 +165,109 @@ describe('Streaming', { sequential: true }, () => { await expectUserRows(2); }); }); + + // There are more tests for raw tables in the node package and in the core extension itself. We just want to make + // sure the schema options are properly forwarded. 
+ it('raw tables smoke test', async () => { + const customSchema = new Schema({}); + customSchema.withRawTables({ + lists: { + put: { + sql: 'INSERT OR REPLACE INTO lists (id, name) VALUES (?, ?)', + params: ['Id', { Column: 'name' }] + }, + delete: { + sql: 'DELETE FROM lists WHERE id = ?', + params: ['Id'] + } + } + }); + + function bucket(name: string, count: number): BucketChecksum { + return { + bucket: name, + count, + checksum: 0, + priority: 3 + }; + } + + const { powersync, waitForStream, remote } = await generateConnectedDatabase({ + powerSyncOptions: { schema: customSchema, flags: { enableMultiTabs: true } } + }); + await powersync.execute('CREATE TABLE lists (id TEXT NOT NULL PRIMARY KEY, name TEXT);'); + onTestFinished(async () => { + await powersync.execute('DROP TABLE lists'); + }); + + const query = powersync.watchWithAsyncGenerator('SELECT * FROM lists')[Symbol.asyncIterator](); + expect((await query.next()).value.rows._array).toStrictEqual([]); + + powersync.connect(new TestConnector(), { + connectionMethod: SyncStreamConnectionMethod.HTTP, + clientImplementation: SyncClientImplementation.RUST + }); + await waitForStream(); + + remote.enqueueLine({ + checkpoint: { + last_op_id: '1', + buckets: [bucket('a', 1)] + } + }); + remote.enqueueLine({ + data: { + bucket: 'a', + data: [ + { + checksum: 0, + op_id: '1', + op: 'PUT', + object_id: 'my_list', + object_type: 'lists', + data: '{"name": "custom list"}' + } + ] + } + }); + remote.enqueueLine({ checkpoint_complete: { last_op_id: '1' } }); + await powersync.waitForFirstSync(); + + console.log('has first sync, should update list'); + expect((await query.next()).value.rows._array).toStrictEqual([{ id: 'my_list', name: 'custom list' }]); + + remote.enqueueLine({ + checkpoint: { + last_op_id: '2', + buckets: [bucket('a', 2)] + } + }); + await vi.waitFor(() => powersync.currentStatus.dataFlowStatus.downloading == true); + remote.enqueueLine({ + data: { + bucket: 'a', + data: [ + { + checksum: 0, + op_id: 
'2', + op: 'REMOVE', + object_id: 'my_list', + object_type: 'lists' + } + ] + } + }); + remote.enqueueLine({ checkpoint_complete: { last_op_id: '2' } }); + await vi.waitFor(() => powersync.currentStatus.dataFlowStatus.downloading == false); + + console.log('has second sync, should update list'); + expect((await query.next()).value.rows._array).toStrictEqual([]); + }); }); -function describeStreamingTests(createConnectedDatabase: () => Promise) { +function describeStreamingTests( + createConnectedDatabase: (options?: Partial) => Promise +) { return () => { it('PowerSync reconnect on closed stream', async () => { const { powersync, waitForStream, remote } = await createConnectedDatabase();