Skip to content

[WIP] Trigger Based Table Diff Tracking #663

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 3 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,9 @@
"lint-staged": "^15.2.2",
"playwright": "^1.51.0",
"prettier": "^3.2.5",
"prettier-plugin-embed": "^0.4.15",
"prettier-plugin-sql": "^0.18.1",
"source-map-support": "^0.5.21",
"typescript": "^5.7.2",
"vitest": "^3.0.8"
}
Expand Down
14 changes: 14 additions & 0 deletions packages/common/src/client/AbstractPowerSyncDatabase.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ import {
type PowerSyncConnectionOptions,
type RequiredAdditionalConnectionOptions
} from './sync/stream/AbstractStreamingSyncImplementation.js';
import { TriggerManager } from './triggers/TriggerManager.js';
import { TriggerManagerImpl } from './triggers/TriggerManagerImpl.js';

export interface DisconnectAndClearOptions {
/** When set to false, data in local-only tables is preserved. */
Expand Down Expand Up @@ -182,6 +184,12 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB

protected runExclusiveMutex: Mutex;

protected triggerManager: TriggerManagerImpl;

get triggers(): TriggerManager {
return this.triggerManager;
}

constructor(options: PowerSyncDatabaseOptionsWithDBAdapter);
constructor(options: PowerSyncDatabaseOptionsWithOpenFactory);
constructor(options: PowerSyncDatabaseOptionsWithSettings);
Expand Down Expand Up @@ -213,6 +221,7 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
this.ready = false;
this.sdkVersion = '';
this.runExclusiveMutex = new Mutex();

// Start async init
this.connectionManager = new ConnectionManager({
createSyncImplementation: async (connector, options) => {
Expand All @@ -239,7 +248,12 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
},
logger: this.logger
});

this._isReadyPromise = this.initialize();

this.triggerManager = new TriggerManagerImpl({
db: this
});
}

/**
Expand Down
201 changes: 201 additions & 0 deletions packages/common/src/client/triggers/TriggerManager.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,201 @@
import { LockContext } from 'src/db/DBAdapter.js';

export enum DiffTriggerOperation {
INSERT = 'INSERT',
UPDATE = 'UPDATE',
DELETE = 'DELETE'
}

export interface BaseTriggerDiffRecord {
id: string;
operation: DiffTriggerOperation;
/**
* Time the change operation was recorded.
* This is in ISO 8601 format, e.g. `2023-10-01T12:00:00.000Z`.
*/
timestamp: string;
}

export interface TriggerDiffUpdateRecord extends BaseTriggerDiffRecord {
operation: DiffTriggerOperation.UPDATE;
change: string;
}

export interface TriggerDiffInsertRecord extends BaseTriggerDiffRecord {
operation: DiffTriggerOperation.INSERT;
change: string;
}

export interface TriggerDiffDeleteRecord extends BaseTriggerDiffRecord {
operation: DiffTriggerOperation.DELETE;
}

export type TriggerDiffRecord = TriggerDiffUpdateRecord | TriggerDiffInsertRecord | TriggerDiffDeleteRecord;

export interface TriggerCreationHooks {
/**
* Executed inside the write lock before the trigger is created.
*/
beforeCreate?: (context: LockContext) => Promise<void>;
}

export interface CreateDiffTriggerOptions {
/**
* Source table to trigger and track changes from.
*/
source: string;

/**
* Operations to track changes for.
*/
operations: DiffTriggerOperation[];

/**
* Destination table to track changes to.
* This table is created internally.
*/
destination: string;

/**
* Columns to track and report changes for.
* Defaults to all columns in the source table.
*/
columns?: string[];

/**
* Optional condition to filter when the trigger should fire.
* This is useful for only triggering on specific conditions.
* For example, you can use it to only trigger on certain values in the NEW row.
* Note that for PowerSync the data is stored in a JSON column named `data`.
* @example
* [`NEW.data.status = 'active' AND length(NEW.data.name) > 3`]
*/
when?: string;

/**
* Optional context to create the triggers in.
* This can be useful to synchronize the current state and fetch all changes after the current state.
*/
hooks?: TriggerCreationHooks;
}

export type TriggerRemoveCallback = () => Promise<void>;

export interface TriggerDiffHandlerContext {
/**
* Allows querying the database with access to the table containing diff records.
* The diff table is accessible via the `diff` accessor.
*
* The `DIFF` table is of the form described in {@link TriggerManager#createDiffTrigger}
*
* @example
* ```sql
* SELECT
* todos.*
* FROM
* DIFF
* JOIN todos ON DIFF.id = todos.id
* ```
*/
getAll: <T = any>(query: string, params?: any[]) => Promise<T[]>;
}

export interface TrackDiffOptions {
/**
* A source SQLite table to track changes for.
*/
source: string;
/**
* SQLite Trigger WHEN condition to filter when the trigger should fire.
*/
filter?: string;
/**
* Table columns to track changes for.
*/
columns?: string[];
/**
* SQLite operations to track changes for.
*/
operations: DiffTriggerOperation[];
/**
* Handler for processing diff operations.
* Automatically invoked once diff items are present.
*/
onChange: (context: TriggerDiffHandlerContext) => Promise<void>;
/**
* Hooks into the trigger creation process.
*/
hooks?: TriggerCreationHooks;
}

export interface TriggerManager {
/**
* Creates a temporary trigger which tracks changes to a source table
* and writes changes to a destination table.
* The temporary destination table is created internally and will be dropped when the trigger is removed.
* The temporary destination table is created with the structure:
* ```sql
* CREATE TEMP TABLE ${destination} (
* id TEXT,
* operation TEXT,
* change TEXT,
* timestamp TEXT
* );
* ```
* The `change` column contains the JSON representation of the change. This column is NULL for
* {@link DiffTriggerOperation#DELETE} operations.
* For {@link DiffTriggerOperation#UPDATE} operations the `change` column is a JSON object containing both the `new` and `old` values.
*
* @returns A callback to remove the trigger and drop the destination table.
*/
createDiffTrigger(options: CreateDiffTriggerOptions): Promise<TriggerRemoveCallback>;

/**
* Tracks changes for a table. Triggering a provided handler on changes.
* Uses {@link createDiffTrigger} internally to create a temporary destination table.
* @returns A callback to cleanup the trigger and stop tracking changes.
*
* @example
* ```javascript
* database.triggers.trackTableDiff({
* source: 'ps_data__todos',
* columns: ['list_id'],
* filter: "json_extract(NEW.data, '$.list_id') = '123'",
* operations: [DiffTriggerOperation.INSERT],
* onChange: async (context) => {
* // Fetches the todo records that were inserted during this diff
* const newTodos = await context.getAll<Database['todos']>("
* SELECT
* todos.*
* FROM
* DIFF
* JOIN todos ON DIFF.id = todos.id
* ");
* todos.push(...newTodos);
* },
* hooks: {
* beforeCreate: async (lockContext) => {
* // This hook is executed inside the write lock before the trigger is created.
* // It can be used to synchronize the current state and fetch all changes after the current state.
* // Read the current state of the todos table
* const currentTodos = await lockContext.getAll<Database['todos']>(
* "
* SELECT
* *
* FROM
* todos
* WHERE
* list_id = ?
* ",
* ['123']
* );
*
* // Example code could process the current todos if necessary
* todos.push(...currentTodos);
* }
* }
* });
* ```
*/
trackTableDiff(options: TrackDiffOptions): Promise<TriggerRemoveCallback>;
}
Loading