Skip to content

Commit

Permalink
refactor(zero-cache): allow reusing connection for syncer and mutator
Browse files Browse the repository at this point in the history
The mutator/pusher will have its own websocket connection. Make the `Connection` class reusable by
parameterizing it with a `messageHandler`
  • Loading branch information
tantaman committed Feb 13, 2025
1 parent c586da6 commit 6505299
Show file tree
Hide file tree
Showing 3 changed files with 222 additions and 125 deletions.
164 changes: 48 additions & 116 deletions packages/zero-cache/src/workers/connection.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,6 @@
import {trace} from '@opentelemetry/api';
import {Lock} from '@rocicorp/lock';
import type {LogContext} from '@rocicorp/logger';
import type {JWTPayload} from 'jose';
import type {CloseEvent, Data, ErrorEvent} from 'ws';
import WebSocket from 'ws';
import {startAsyncSpan, startSpan} from '../../../otel/src/span.ts';
import {version} from '../../../otel/src/version.ts';
import {unreachable} from '../../../shared/src/asserts.ts';
import * as valita from '../../../shared/src/valita.ts';
import type {ConnectedMessage} from '../../../zero-protocol/src/connect.ts';
import type {Downstream} from '../../../zero-protocol/src/down.ts';
Expand All @@ -17,19 +11,32 @@ import {
MIN_SERVER_SUPPORTED_SYNC_PROTOCOL,
PROTOCOL_VERSION,
} from '../../../zero-protocol/src/protocol-version.ts';
import {upstreamSchema} from '../../../zero-protocol/src/up.ts';
import {upstreamSchema, type Upstream} from '../../../zero-protocol/src/up.ts';
import type {ConnectParams} from '../services/dispatcher/connect-params.ts';
import type {Mutagen} from '../services/mutagen/mutagen.ts';
import type {
SyncContext,
TokenData,
ViewSyncer,
} from '../services/view-syncer/view-syncer.ts';
import {findErrorForClient, getLogLevel} from '../types/error-for-client.ts';
import type {Source} from '../types/streams.ts';
import type {Pusher} from '../services/mutagen/pusher.ts';
import {assert} from '../../../shared/src/asserts.ts';

const tracer = trace.getTracer('syncer-ws-server', version);
export type HandlerResult =
| {
type: 'ok';
}
| {
type: 'fatal';
error: ErrorBody;
}
| {
type: 'transient';
errors: ErrorBody[];
}
| {
type: 'stream';
stream: Source<Downstream>;
};

export interface MessageHandler {
handleMessage(msg: Upstream): Promise<HandlerResult>;
}

/**
* Represents a connection between the client and server.
Expand All @@ -43,65 +50,34 @@ export class Connection {
readonly #ws: WebSocket;
readonly #wsID: string;
readonly #protocolVersion: number;
readonly #clientGroupID: string;
readonly #syncContext: SyncContext;
readonly #lc: LogContext;
readonly #onClose: () => void;
readonly #pusher: Pusher | undefined;
readonly #token: string | undefined;

readonly #viewSyncer: ViewSyncer;
readonly #mutagen: Mutagen;
readonly #mutationLock = new Lock();
readonly #authData: JWTPayload | undefined;
readonly #messageHandler: MessageHandler;

#outboundStream: Source<Downstream> | undefined;
#closed = false;

constructor(
lc: LogContext,
tokenData: TokenData | undefined,
viewSyncer: ViewSyncer,
mutagen: Mutagen,
pusher: Pusher | undefined,
connectParams: ConnectParams,
ws: WebSocket,
messageHandler: MessageHandler,
onClose: () => void,
) {
const {
clientGroupID,
clientID,
wsID,
baseCookie,
protocolVersion,
schemaVersion,
} = connectParams;
const {clientGroupID, clientID, wsID, protocolVersion} = connectParams;
this.#messageHandler = messageHandler;

this.#ws = ws;
this.#authData = tokenData?.decoded;
this.#token = tokenData?.raw;
this.#wsID = wsID;
this.#protocolVersion = protocolVersion;
this.#clientGroupID = clientGroupID;
this.#syncContext = {
clientID,
wsID,
baseCookie,
protocolVersion,
schemaVersion,
tokenData,
};

this.#lc = lc
.withContext('connection')
.withContext('clientID', clientID)
.withContext('clientGroupID', clientGroupID)
.withContext('wsID', wsID);
this.#onClose = onClose;

this.#viewSyncer = viewSyncer;
this.#pusher = pusher;
this.#mutagen = mutagen;

this.#ws.addEventListener('message', this.#handleMessage);
this.#ws.addEventListener('close', this.#handleClose);
this.#ws.addEventListener('error', this.#handleError);
Expand Down Expand Up @@ -161,9 +137,7 @@ export class Connection {
}

#handleMessage = async (event: {data: Data}) => {
const lc = this.#lc;
const data = event.data.toString();
const viewSyncer = this.#viewSyncer;
if (this.#closed) {
this.#lc.debug?.('Ignoring message received after closed', data);
return;
Expand All @@ -181,76 +155,34 @@ export class Connection {
);
return;
}

try {
const msgType = msg[0];
switch (msgType) {
case 'ping':
this.send(['pong', {}] satisfies PongMessage);
break;
case 'push': {
await startAsyncSpan(tracer, 'connection.push', async () => {
const {clientGroupID, mutations, schemaVersion} = msg[1];
if (clientGroupID !== this.#clientGroupID) {
this.#closeWithError({
kind: ErrorKind.InvalidPush,
message:
`clientGroupID in mutation "${clientGroupID}" does not match ` +
`clientGroupID of connection "${this.#clientGroupID}`,
});
}

if (this.#pusher) {
this.#pusher.enqueuePush(msg[1], this.#token);
// We do not call mutagen since if a pusher is set
// the precludes crud mutators.
// We'll be removing crud mutators when we release custom mutators.
return;
}

// Hold a connection-level lock while processing mutations so that:
// 1. Mutations are processed in the order in which they are received and
// 2. A single view syncer connection cannot hog multiple upstream connections.
await this.#mutationLock.withLock(async () => {
for (const mutation of mutations) {
const maybeError = await this.#mutagen.processMutation(
mutation,
this.#authData,
schemaVersion,
);
if (maybeError !== undefined) {
this.sendError({kind: maybeError[0], message: maybeError[1]});
}
}
});
});
break;
}
case 'pull':
lc.error?.('TODO: implement pull');
break;
case 'changeDesiredQueries':
await startAsyncSpan(tracer, 'connection.changeDesiredQueries', () =>
viewSyncer.changeDesiredQueries(this.#syncContext, msg),
);
break;
case 'deleteClients':
await startAsyncSpan(tracer, 'connection.deleteClients', () =>
viewSyncer.deleteClients(this.#syncContext, msg),
);
if (msgType === 'ping') {
this.send(['pong', {}] satisfies PongMessage);
return;
}
const result = await this.#messageHandler.handleMessage(msg);
switch (result.type) {
case 'fatal':
this.#closeWithError(result.error);
break;
case 'ok':
break;
case 'initConnection': {
// TODO (mlaw): tell mutagens about the new token too
this.#outboundStream = startSpan(
tracer,
'connection.initConnection',
() => viewSyncer.initConnection(this.#syncContext, msg),
case 'stream': {
assert(
this.#outboundStream === undefined,
'Outbound stream already set for this connection!',
);
void this.#proxyOutbound(this.#outboundStream);
this.#outboundStream = result.stream;
void this.#proxyOutbound(result.stream);
break;
}
default:
unreachable(msgType);
case 'transient': {
for (const error of result.errors) {
this.sendError(error);
}
}
}
} catch (e) {
this.#closeWithThrown(e);
Expand Down
160 changes: 160 additions & 0 deletions packages/zero-cache/src/workers/syncer-ws-message-handler.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
import {trace} from '@opentelemetry/api';
import {Lock} from '@rocicorp/lock';
import type {Mutagen} from '../services/mutagen/mutagen.ts';
import type {
SyncContext,
TokenData,
ViewSyncer,
} from '../services/view-syncer/view-syncer.ts';
import type {HandlerResult, MessageHandler} from './connection.ts';
import {version} from '../../../otel/src/version.ts';
import * as ErrorKind from '../../../zero-protocol/src/error-kind-enum.ts';
import type {Upstream} from '../../../zero-protocol/src/up.ts';
import {startAsyncSpan, startSpan} from '../../../otel/src/span.ts';
import {unreachable} from '../../../shared/src/asserts.ts';
import type {LogContext} from '@rocicorp/logger';
import type {JWTPayload} from 'jose';
import type {ErrorBody} from '../../../zero-protocol/src/error.ts';
import type {ConnectParams} from '../services/dispatcher/connect-params.ts';
import type {Pusher} from '../services/mutagen/pusher.ts';

const tracer = trace.getTracer('syncer-ws-server', version);

export class SyncerWsMessageHandler implements MessageHandler {
readonly #viewSyncer: ViewSyncer;
readonly #mutagen: Mutagen;
readonly #mutationLock: Lock;
readonly #lc: LogContext;
readonly #authData: JWTPayload | undefined;
readonly #clientGroupID: string;
readonly #syncContext: SyncContext;
readonly #pusher: Pusher | undefined;
readonly #token: string | undefined;

constructor(
lc: LogContext,
connectParams: ConnectParams,
tokenData: TokenData | undefined,
viewSyncer: ViewSyncer,
mutagen: Mutagen,
pusher: Pusher | undefined,
) {
const {
clientGroupID,
clientID,
wsID,
baseCookie,
protocolVersion,
schemaVersion,
} = connectParams;
this.#viewSyncer = viewSyncer;
this.#mutagen = mutagen;
this.#mutationLock = new Lock();
this.#lc = lc
.withContext('connection')
.withContext('clientID', clientID)
.withContext('clientGroupID', clientGroupID)
.withContext('wsID', wsID);
this.#authData = tokenData?.decoded;
this.#clientGroupID = clientGroupID;
this.#pusher = pusher;
this.#syncContext = {
clientID,
wsID,
baseCookie,
protocolVersion,
schemaVersion,
tokenData,
};
}

async handleMessage(msg: Upstream): Promise<HandlerResult> {
const lc = this.#lc;
const msgType = msg[0];
const viewSyncer = this.#viewSyncer;
switch (msgType) {
case 'ping':
lc.error?.('Pull is not supported by Zero');
break;
case 'pull':
lc.error?.('Pull is not supported by Zero');
break;
case 'push': {
return startAsyncSpan<HandlerResult>(
tracer,
'connection.push',
async () => {
const {clientGroupID, mutations, schemaVersion} = msg[1];
if (clientGroupID !== this.#clientGroupID) {
return {
type: 'fatal',
error: {
kind: ErrorKind.InvalidPush,
message:
`clientGroupID in mutation "${clientGroupID}" does not match ` +
`clientGroupID of connection "${this.#clientGroupID}`,
},
} satisfies HandlerResult;
}

if (this.#pusher) {
this.#pusher.enqueuePush(msg[1], this.#token);
// We do not call mutagen since if a pusher is set
// the precludes crud mutators.
// We'll be removing crud mutators when we release custom mutators.
return {type: 'ok'} satisfies HandlerResult;
}

// Hold a connection-level lock while processing mutations so that:
// 1. Mutations are processed in the order in which they are received and
// 2. A single view syncer connection cannot hog multiple upstream connections.
const ret = await this.#mutationLock.withLock(async () => {
const errors: ErrorBody[] = [];
for (const mutation of mutations) {
const maybeError = await this.#mutagen.processMutation(
mutation,
this.#authData,
schemaVersion,
);
if (maybeError !== undefined) {
errors.push({kind: maybeError[0], message: maybeError[1]});
}
}
if (errors.length > 0) {
return {type: 'transient', errors} satisfies HandlerResult;
}
return {type: 'ok'} satisfies HandlerResult;
});
return ret;
},
);
}
case 'changeDesiredQueries':
await startAsyncSpan(tracer, 'connection.changeDesiredQueries', () =>
viewSyncer.changeDesiredQueries(this.#syncContext, msg),
);
break;
case 'deleteClients':
await startAsyncSpan(tracer, 'connection.deleteClients', () =>
viewSyncer.deleteClients(this.#syncContext, msg),
);
break;
case 'initConnection': {
// TODO (mlaw): tell mutagens about the new token too
const stream = await startSpan(
tracer,
'connection.initConnection',
() => viewSyncer.initConnection(this.#syncContext, msg),
);
return {
type: 'stream',
stream,
};
}
default:
unreachable(msgType);
}

return {type: 'ok'};
}
}
Loading

0 comments on commit 6505299

Please sign in to comment.