Skip to content

feat: Add Proxy support for @powersync/node #573

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 10 commits into from
Apr 24, 2025
5 changes: 5 additions & 0 deletions .changeset/hip-lamps-draw.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@powersync/node': minor
---

Introduced support for specifying proxy environment variables for the connection methods. For HTTP it supports `HTTP_PROXY` or `HTTPS_PROXY`, and for WebSockets it supports `WS_PROXY` and `WSS_PROXY`.
6 changes: 6 additions & 0 deletions .changeset/thick-lies-invent.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
'@powersync/common': minor
---

Added `fetchOptions` to AbstractRemoteOptions. Allows consumers to include fields such as `dispatcher` (e.g. for proxy support) to the fetch invocations.
Also ensuring all options provided to `connect()` are passed onwards, allows packages to have their own option definitions for `connect()` and the abstract `generateSyncStreamImplementation()`.
4 changes: 2 additions & 2 deletions demos/example-node/src/main.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
import repl_factory from 'node:repl';
import { once } from 'node:events';
import repl_factory from 'node:repl';

import { PowerSyncDatabase, SyncStreamConnectionMethod } from '@powersync/node';
import { default as Logger } from 'js-logger';
import { AppSchema, DemoConnector } from './powersync.js';
import { exit } from 'node:process';
import { AppSchema, DemoConnector } from './powersync.js';

const main = async () => {
const logger = Logger.get('PowerSyncDemo');
Expand Down
38 changes: 18 additions & 20 deletions packages/common/src/client/AbstractPowerSyncDatabase.ts
Original file line number Diff line number Diff line change
Expand Up @@ -417,6 +417,7 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
// Use the options passed in during connect, or fallback to the options set during database creation or fallback to the default options
resolvedConnectionOptions(options?: PowerSyncConnectionOptions): RequiredAdditionalConnectionOptions {
return {
...options,
retryDelayMs:
options?.retryDelayMs ?? this.options.retryDelayMs ?? this.options.retryDelay ?? DEFAULT_RETRY_DELAY_MS,
crudUploadThrottleMs:
Expand All @@ -436,12 +437,9 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
throw new Error('Cannot connect using a closed client');
}

const { retryDelayMs, crudUploadThrottleMs } = this.resolvedConnectionOptions(options);
const resolvedConnectOptions = this.resolvedConnectionOptions(options);

this.syncStreamImplementation = this.generateSyncStreamImplementation(connector, {
retryDelayMs,
crudUploadThrottleMs
});
this.syncStreamImplementation = this.generateSyncStreamImplementation(connector, resolvedConnectOptions);
this.syncStatusListenerDisposer = this.syncStreamImplementation.registerListener({
statusChanged: (status) => {
this.currentStatus = new SyncStatus({
Expand Down Expand Up @@ -555,7 +553,7 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
* This method does include transaction ids in the result, but does not group
* data by transaction. One batch may contain data from multiple transactions,
* and a single transaction may be split over multiple batches.
*
*
* @param limit Maximum number of CRUD entries to include in the batch
* @returns A batch of CRUD operations to upload, or null if there are none
*/
Expand Down Expand Up @@ -594,7 +592,7 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
*
* Unlike {@link getCrudBatch}, this only returns data from a single transaction at a time.
* All data for the transaction is loaded into memory.
*
*
* @returns A transaction of CRUD operations to upload, or null if there are none
*/
async getNextCrudTransaction(): Promise<CrudTransaction | null> {
Expand Down Expand Up @@ -633,7 +631,7 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
* Get an unique client id for this database.
*
* The id is not reset when the database is cleared, only when the database is deleted.
*
*
* @returns A unique identifier for the database instance
*/
async getClientId(): Promise<string> {
Expand Down Expand Up @@ -661,7 +659,7 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
/**
* Execute a SQL write (INSERT/UPDATE/DELETE) query
* and optionally return results.
*
*
* @param sql The SQL query to execute
* @param parameters Optional array of parameters to bind to the query
* @returns The query result as an object with structured key-value pairs
Expand All @@ -674,7 +672,7 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
/**
* Execute a SQL write (INSERT/UPDATE/DELETE) query directly on the database without any PowerSync processing.
* This bypasses certain PowerSync abstractions and is useful for accessing the raw database results.
*
*
* @param sql The SQL query to execute
* @param parameters Optional array of parameters to bind to the query
* @returns The raw query result from the underlying database as a nested array of raw values, where each row is
Expand All @@ -689,7 +687,7 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
* Execute a write query (INSERT/UPDATE/DELETE) multiple times with each parameter set
* and optionally return results.
* This is faster than executing separately with each parameter set.
*
*
* @param sql The SQL query to execute
* @param parameters Optional 2D array of parameter sets, where each inner array is a set of parameters for one execution
* @returns The query result
Expand All @@ -701,7 +699,7 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB

/**
* Execute a read-only query and return results.
*
*
* @param sql The SQL query to execute
* @param parameters Optional array of parameters to bind to the query
* @returns An array of results
Expand All @@ -713,7 +711,7 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB

/**
* Execute a read-only query and return the first result, or null if the ResultSet is empty.
*
*
* @param sql The SQL query to execute
* @param parameters Optional array of parameters to bind to the query
* @returns The first result if found, or null if no results are returned
Expand All @@ -725,7 +723,7 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB

/**
* Execute a read-only query and return the first result, error if the ResultSet is empty.
*
*
* @param sql The SQL query to execute
* @param parameters Optional array of parameters to bind to the query
* @returns The first result matching the query
Expand Down Expand Up @@ -761,7 +759,7 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
* Open a read-only transaction.
* Read transactions can run concurrently to a write transaction.
* Changes from any write transaction are not visible to read transactions started before it.
*
*
* @param callback Function to execute within the transaction
* @param lockTimeout Time in milliseconds to wait for a lock before throwing an error
* @returns The result of the callback
Expand All @@ -786,7 +784,7 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
* Open a read-write transaction.
* This takes a global lock - only one write transaction can execute against the database at a time.
* Statements within the transaction must be done on the provided {@link Transaction} interface.
*
*
* @param callback Function to execute within the transaction
* @param lockTimeout Time in milliseconds to wait for a lock before throwing an error
* @returns The result of the callback
Expand Down Expand Up @@ -865,7 +863,7 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
* Source tables are automatically detected using `EXPLAIN QUERY PLAN`.
*
* Note that the `onChange` callback member of the handler is required.
*
*
* @param sql The SQL query to execute
* @param parameters Optional array of parameters to bind to the query
* @param handler Callbacks for handling results and errors
Expand Down Expand Up @@ -915,7 +913,7 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
* Execute a read query every time the source tables are modified.
* Use {@link SQLWatchOptions.throttleMs} to specify the minimum interval between queries.
* Source tables are automatically detected using `EXPLAIN QUERY PLAN`.
*
*
* @param sql The SQL query to execute
* @param parameters Optional array of parameters to bind to the query
* @param options Options for configuring watch behavior
Expand Down Expand Up @@ -944,7 +942,7 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
* Resolves the list of tables that are used in a SQL query.
* If tables are specified in the options, those are used directly.
* Otherwise, analyzes the query using EXPLAIN to determine which tables are accessed.
*
*
* @param sql The SQL query to analyze
* @param parameters Optional parameters for the SQL query
* @param options Optional watch options that may contain explicit table list
Expand Down Expand Up @@ -1077,7 +1075,7 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
*
* This is preferred over {@link watchWithAsyncGenerator} when multiple queries need to be performed
* together when data is changed.
*
*
* Note: do not declare this as `async *onChange` as it will not work in React Native.
*
* @param options Options for configuring watch behavior
Expand Down
19 changes: 17 additions & 2 deletions packages/common/src/client/sync/stream/AbstractRemote.ts
Original file line number Diff line number Diff line change
Expand Up @@ -85,14 +85,23 @@ export type AbstractRemoteOptions = {
* Binding should be done before passing here.
*/
fetchImplementation: FetchImplementation | FetchImplementationProvider;

/**
* Optional options to pass directly to all `fetch` calls.
*
* This can include fields such as `dispatcher` (e.g. for proxy support),
* `cache`, or any other fetch-compatible options.
*/
fetchOptions?: {};
};

export const DEFAULT_REMOTE_OPTIONS: AbstractRemoteOptions = {
socketUrlTransformer: (url) =>
url.replace(/^https?:\/\//, function (match) {
return match === 'https://' ? 'wss://' : 'ws://';
}),
fetchImplementation: new FetchImplementationProvider()
fetchImplementation: new FetchImplementationProvider(),
fetchOptions: {}
};

export abstract class AbstractRemote {
Expand Down Expand Up @@ -231,6 +240,10 @@ export abstract class AbstractRemote {
*/
abstract getBSON(): Promise<BSONImplementation>;

protected createSocket(url: string): WebSocket {
return new WebSocket(url);
}

/**
* Connects to the sync/stream websocket endpoint
*/
Expand All @@ -249,7 +262,8 @@ export abstract class AbstractRemote {

const connector = new RSocketConnector({
transport: new WebsocketClientTransport({
url: this.options.socketUrlTransformer(request.url)
url: this.options.socketUrlTransformer(request.url),
wsCreator: (url) => this.createSocket(url)
}),
setup: {
keepAlive: KEEP_ALIVE_MS,
Expand Down Expand Up @@ -421,6 +435,7 @@ export abstract class AbstractRemote {
body: JSON.stringify(data),
signal: controller.signal,
cache: 'no-store',
...(this.options.fetchOptions ?? {}),
...options.fetchOptions
}).catch((ex) => {
if (ex.name == 'AbortError') {
Expand Down
16 changes: 16 additions & 0 deletions packages/node/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,22 @@ contains everything you need to know to get started implementing PowerSync in yo

A simple example using `@powersync/node` is available in the [`demos/example-node/`](../demos/example-node) directory.

# Proxy Support

This SDK supports HTTP, HTTPS, and WebSocket proxies via environment variables.

## HTTP Connection Method

Internally we probe the http environment variables and apply it to fetch requests ([undici](https://www.npmjs.com/package/undici/v/5.6.0))

- Set the `HTTPS_PROXY` or `HTTP_PROXY` environment variable to automatically route HTTP requests through a proxy.

## WEB Socket Connection Method

Internally the [proxy-agent](https://www.npmjs.com/package/proxy-agent) dependency for WebSocket proxies, which has its own internal code for automatically picking up the appropriate environment variables:

- Set the `WS_PROXY` or `WSS_PROXY` environment variable to route the webocket connections through a proxy.

# Found a bug or need help?

- Join our [Discord server](https://discord.gg/powersync) where you can browse topics from our community, ask questions, share feedback, or just say hello :)
Expand Down
7 changes: 5 additions & 2 deletions packages/node/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -51,12 +51,15 @@
"@powersync/common": "workspace:*",
"async-lock": "^1.4.0",
"bson": "^6.6.0",
"comlink": "^4.4.2"
"comlink": "^4.4.2",
"proxy-agent": "^6.5.0",
"undici": "^7.8.0",
"ws": "^8.18.1"
},
"devDependencies": {
"@powersync/drizzle-driver": "workspace:*",
"@types/async-lock": "^1.4.0",
"drizzle-orm": "^0.35.2",
"@powersync/drizzle-driver": "workspace:*",
"rollup": "4.14.3",
"typescript": "^5.5.3",
"vitest": "^3.0.5"
Expand Down
26 changes: 24 additions & 2 deletions packages/node/src/db/PowerSyncDatabase.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
import {
AbstractPowerSyncDatabase,
AbstractStreamingSyncImplementation,
AdditionalConnectionOptions,
BucketStorageAdapter,
DBAdapter,
PowerSyncBackendConnector,
PowerSyncConnectionOptions,
PowerSyncDatabaseOptions,
PowerSyncDatabaseOptionsWithSettings,
RequiredAdditionalConnectionOptions,
SqliteBucketStorage,
SQLOpenFactory
} from '@powersync/common';
Expand All @@ -15,11 +18,22 @@ import { NodeStreamingSyncImplementation } from '../sync/stream/NodeStreamingSyn

import { BetterSQLite3DBAdapter } from './BetterSQLite3DBAdapter.js';
import { NodeSQLOpenOptions } from './options.js';
import { Dispatcher } from 'undici';

export type NodePowerSyncDatabaseOptions = PowerSyncDatabaseOptions & {
database: DBAdapter | SQLOpenFactory | NodeSQLOpenOptions;
};

export type NodeAdditionalConnectionOptions = AdditionalConnectionOptions & {
/**
* Optional custom dispatcher for HTTP connections (e.g. using undici).
* Only used when the connection method is SyncStreamConnectionMethod.HTTP
*/
dispatcher?: Dispatcher;
};

export type NodePowerSyncConnectionOptions = PowerSyncConnectionOptions & NodeAdditionalConnectionOptions;

/**
* A PowerSync database which provides SQLite functionality
* which is automatically synced.
Expand Down Expand Up @@ -54,10 +68,18 @@ export class PowerSyncDatabase extends AbstractPowerSyncDatabase {
return new SqliteBucketStorage(this.database, AbstractPowerSyncDatabase.transactionMutex);
}

connect(
connector: PowerSyncBackendConnector,
options?: PowerSyncConnectionOptions & { dispatcher?: Dispatcher }
): Promise<void> {
return super.connect(connector, options);
}

protected generateSyncStreamImplementation(
connector: PowerSyncBackendConnector
connector: PowerSyncBackendConnector,
options: NodeAdditionalConnectionOptions
): AbstractStreamingSyncImplementation {
const remote = new NodeRemote(connector);
const remote = new NodeRemote(connector, this.options.logger, { dispatcher: options.dispatcher });

return new NodeStreamingSyncImplementation({
adapter: this.bucketStorageAdapter,
Expand Down
27 changes: 25 additions & 2 deletions packages/node/src/sync/stream/NodeRemote.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@ import {
RemoteConnector
} from '@powersync/common';
import { BSON } from 'bson';
import Agent from 'proxy-agent';
import { EnvHttpProxyAgent, Dispatcher } from 'undici';
import { WebSocket } from 'ws';

export const STREAMING_POST_TIMEOUT_MS = 30_000;

Expand All @@ -21,18 +24,38 @@ class NodeFetchProvider extends FetchImplementationProvider {
}
}

export type NodeRemoteOptions = AbstractRemoteOptions & {
dispatcher?: Dispatcher;
};

export class NodeRemote extends AbstractRemote {
constructor(
protected connector: RemoteConnector,
protected logger: ILogger = DEFAULT_REMOTE_LOGGER,
options?: Partial<AbstractRemoteOptions>
options?: Partial<NodeRemoteOptions>
) {
// EnvHttpProxyAgent automatically uses relevant env vars for HTTP
const dispatcher = options?.dispatcher ?? new EnvHttpProxyAgent();

super(connector, logger, {
...(options ?? {}),
fetchImplementation: options?.fetchImplementation ?? new NodeFetchProvider()
fetchImplementation: options?.fetchImplementation ?? new NodeFetchProvider(),
fetchOptions: {
dispatcher
}
});
}

protected createSocket(url: string): globalThis.WebSocket {
return new WebSocket(url, {
// Automatically uses relevant env vars for web sockets
agent: new Agent.ProxyAgent(),
headers: {
'User-Agent': this.getUserAgent()
}
}) as any as globalThis.WebSocket; // This is compatible in Node environments
}

getUserAgent(): string {
return [
super.getUserAgent(),
Expand Down
Loading