Skip to content

Commit d025bfd

Browse files
authored
Merge pull request #256 from powersync-ja/rsocket-allow-json-requests
Allow JSON request payload for RSocket
2 parents 05b9593 + a602fb2 commit d025bfd

File tree

8 files changed

+164
-31
lines changed

8 files changed

+164
-31
lines changed

.changeset/happy-taxis-lick.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
'@powersync/service-image': minor
3+
---
4+
5+
Support WebSocket requests to be encoded as JSON, which will enable more SDKs to use WebSockets as a transport protocol when receiving sync lines.

.changeset/wild-maps-brake.md

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
---
2+
'@powersync/service-rsocket-router': patch
3+
'@powersync/service-errors': patch
4+
'@powersync/service-core': minor
5+
---
6+
7+
Allow RSocket request payload to be encoded as JSON

packages/rsocket-router/src/router/ReactiveSocketRouter.ts

Lines changed: 39 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,14 @@ import {
1818
SocketResponder
1919
} from './types.js';
2020

21+
export interface ReactiveStreamRequest {
22+
payload: Payload;
23+
metadataMimeType: string;
24+
dataMimeType: string;
25+
initialN: number;
26+
responder: SocketResponder;
27+
}
28+
2129
export class ReactiveSocketRouter<C> {
2230
constructor(protected options?: ReactiveSocketRouterOptions<C>) {}
2331

@@ -75,7 +83,13 @@ export class ReactiveSocketRouter<C> {
7583
throw new errors.AuthorizationError(ErrorCode.PSYNC_S2101, 'No context meta data provided');
7684
}
7785

78-
const context = await params.contextProvider(payload.metadata!);
86+
const metadataMimeType = payload.metadataMimeType;
87+
const dataMimeType = payload.dataMimeType;
88+
89+
const context = await params.contextProvider({
90+
mimeType: payload.metadataMimeType,
91+
contents: payload.metadata!
92+
});
7993

8094
return {
8195
// RequestStream is currently the only supported connection type
@@ -84,13 +98,17 @@ export class ReactiveSocketRouter<C> {
8498
const abortController = new AbortController();
8599

86100
// TODO: Consider limiting the number of active streams per connection to prevent abuse
87-
handleReactiveStream(context, { payload, initialN, responder }, observer, abortController, params).catch(
88-
(ex) => {
89-
logger.error(ex);
90-
responder.onError(ex);
91-
responder.onComplete();
92-
}
93-
);
101+
handleReactiveStream(
102+
context,
103+
{ payload, initialN, responder, dataMimeType, metadataMimeType },
104+
observer,
105+
abortController,
106+
params
107+
).catch((ex) => {
108+
logger.error(ex);
109+
responder.onError(ex);
110+
responder.onComplete();
111+
});
94112
return {
95113
cancel: () => {
96114
abortController.abort();
@@ -115,11 +133,7 @@ export class ReactiveSocketRouter<C> {
115133

116134
export async function handleReactiveStream<Context>(
117135
context: Context,
118-
request: {
119-
payload: Payload;
120-
initialN: number;
121-
responder: SocketResponder;
122-
},
136+
request: ReactiveStreamRequest,
123137
observer: SocketRouterObserver,
124138
abortController: AbortController,
125139
params: CommonParams<Context>
@@ -137,7 +151,10 @@ export async function handleReactiveStream<Context>(
137151
return exitWithError(new errors.ValidationError('Metadata is not provided'));
138152
}
139153

140-
const meta = await params.metaDecoder(metadata);
154+
const meta = await params.metaDecoder({
155+
mimeType: request.metadataMimeType,
156+
contents: metadata
157+
});
141158

142159
const { path } = meta;
143160

@@ -148,7 +165,14 @@ export async function handleReactiveStream<Context>(
148165
}
149166

150167
const { handler, authorize, validator, decoder = params.payloadDecoder } = route;
151-
const requestPayload = await decoder(payload.data || undefined);
168+
const requestPayload = await decoder(
169+
payload.data
170+
? {
171+
contents: payload.data,
172+
mimeType: request.dataMimeType
173+
}
174+
: undefined
175+
);
152176

153177
if (validator) {
154178
const isValid = validator.validate(requestPayload);

packages/rsocket-router/src/router/types.ts

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -47,16 +47,24 @@ export type IReactiveStream<I = any, O = any, C = any> = Omit<
4747
* Decodes raw payload buffer to [I].
4848
* Falls back to router level decoder if not specified.
4949
*/
50-
decoder?: (rawData?: Buffer) => Promise<I>;
50+
decoder?: (rawData?: TypedBuffer) => Promise<I>;
5151
};
5252

53+
/**
54+
* A {@link Buffer} with an associated mimeType inferred from the RSocket `SETUP` frame.
55+
*/
56+
export interface TypedBuffer {
57+
mimeType: string;
58+
contents: Buffer;
59+
}
60+
5361
export type IReactiveStreamInput<I, O, C> = Omit<IReactiveStream<I, O, C>, 'path' | 'type' | 'method'>;
5462

5563
export type ReactiveEndpoint = IReactiveStream;
5664

5765
export type CommonParams<C> = {
5866
endpoints: Array<ReactiveEndpoint>;
59-
contextProvider: (metaData: Buffer) => Promise<C>;
60-
metaDecoder: (meta: Buffer) => Promise<RequestMeta>;
61-
payloadDecoder: (rawData?: Buffer) => Promise<any>;
67+
contextProvider: (metaData: TypedBuffer) => Promise<C>;
68+
metaDecoder: (meta: TypedBuffer) => Promise<RequestMeta>;
69+
payloadDecoder: (rawData?: TypedBuffer) => Promise<any>;
6270
};

packages/rsocket-router/tests/src/requests.test.ts

Lines changed: 63 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
11
import { describe, expect, it, vi } from 'vitest';
22
import { createMockObserver, createMockResponder } from './utils/mock-responder.js';
3-
import { handleReactiveStream } from '../../src/router/ReactiveSocketRouter.js';
3+
import { handleReactiveStream, ReactiveStreamRequest } from '../../src/router/ReactiveSocketRouter.js';
44
import { deserialize, serialize } from 'bson';
55
import { RS_ENDPOINT_TYPE, ReactiveEndpoint, RequestMeta, SocketResponder } from '../../src/router/types.js';
6-
import { ErrorCode } from '@powersync/lib-services-framework';
6+
import { EndpointHandlerPayload, ErrorCode } from '@powersync/lib-services-framework';
77

88
/**
99
* Mocks the process of handling reactive routes
@@ -12,7 +12,12 @@ import { ErrorCode } from '@powersync/lib-services-framework';
1212
* @param responder a mock responder
1313
* @returns
1414
*/
15-
async function handleRoute(path: string, endpoints: ReactiveEndpoint[], responder: SocketResponder) {
15+
async function handleRoute(
16+
path: string,
17+
endpoints: ReactiveEndpoint[],
18+
responder: SocketResponder,
19+
request?: Partial<ReactiveStreamRequest>
20+
) {
1621
return handleReactiveStream<{}>(
1722
{},
1823
{
@@ -21,15 +26,18 @@ async function handleRoute(path: string, endpoints: ReactiveEndpoint[], responde
2126
metadata: Buffer.from(serialize({ path }))
2227
},
2328
initialN: 1,
24-
responder
29+
dataMimeType: 'application/bson',
30+
metadataMimeType: 'application/bson',
31+
responder,
32+
...request
2533
},
2634
createMockObserver(),
2735
new AbortController(),
2836
{
2937
contextProvider: async () => ({}),
3038
endpoints,
31-
metaDecoder: async (buffer) => deserialize(buffer) as RequestMeta,
32-
payloadDecoder: async (buffer) => buffer && deserialize(buffer)
39+
metaDecoder: async (buffer) => deserialize(buffer.contents) as RequestMeta,
40+
payloadDecoder: async (buffer) => buffer && deserialize(buffer.contents)
3341
}
3442
);
3543
}
@@ -133,4 +141,53 @@ describe('Requests', () => {
133141
// Should be a validation error
134142
expect(JSON.stringify(spy.mock.calls[0])).includes(ErrorCode.PSYNC_S2002);
135143
});
144+
145+
it('should forward mime types', async () => {
146+
const encoder = new TextEncoder();
147+
const decoder = new TextDecoder();
148+
const responder = createMockResponder();
149+
const encodeJson = (value: any) => encoder.encode(JSON.stringify(value));
150+
const path = '/test-route';
151+
152+
const fn = vi.fn(async (p: EndpointHandlerPayload<any, any>) => {
153+
expect(p.params).toStrictEqual({ hello: 'world' });
154+
return undefined;
155+
});
156+
157+
await handleReactiveStream<{}>(
158+
{},
159+
{
160+
payload: {
161+
data: Buffer.from(encodeJson({ hello: 'world' })),
162+
metadata: Buffer.from(encodeJson({ path }))
163+
},
164+
metadataMimeType: 'application/json',
165+
dataMimeType: 'application/json',
166+
initialN: 1,
167+
responder
168+
},
169+
createMockObserver(),
170+
new AbortController(),
171+
{
172+
contextProvider: async () => ({}),
173+
endpoints: [
174+
{
175+
path,
176+
type: RS_ENDPOINT_TYPE.STREAM,
177+
handler: fn
178+
}
179+
],
180+
metaDecoder: async (buffer) => {
181+
expect(buffer.mimeType, 'application/json');
182+
return JSON.parse(decoder.decode(buffer.contents));
183+
},
184+
payloadDecoder: async (buffer) => {
185+
expect(buffer!.mimeType, 'application/json');
186+
return JSON.parse(decoder.decode(buffer!.contents));
187+
}
188+
}
189+
);
190+
191+
expect(fn).toHaveBeenCalled();
192+
});
136193
});

packages/service-core/src/routes/configure-rsocket.ts

Lines changed: 18 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ import { deserialize } from 'bson';
22
import * as http from 'http';
33

44
import { ErrorCode, errors, logger } from '@powersync/lib-services-framework';
5-
import { ReactiveSocketRouter, RSocketRequestMeta } from '@powersync/service-rsocket-router';
5+
import { ReactiveSocketRouter, RSocketRequestMeta, TypedBuffer } from '@powersync/service-rsocket-router';
66

77
import { ServiceContext } from '../system/ServiceContext.js';
88
import { generateContext, getTokenFromHeader } from './auth.js';
@@ -22,8 +22,8 @@ export function configureRSocket(router: ReactiveSocketRouter<Context>, options:
2222
const { route_generators = DEFAULT_SOCKET_ROUTES, server, service_context } = options;
2323

2424
router.applyWebSocketEndpoints(server, {
25-
contextProvider: async (data: Buffer): Promise<Context & { token: string }> => {
26-
const { token, user_agent } = RSocketContextMeta.decode(deserialize(data) as any);
25+
contextProvider: async (data: TypedBuffer): Promise<Context & { token: string }> => {
26+
const { token, user_agent } = RSocketContextMeta.decode(decodeTyped(data) as any);
2727

2828
if (!token) {
2929
throw new errors.AuthorizationError(ErrorCode.PSYNC_S2106, 'No token provided');
@@ -58,9 +58,21 @@ export function configureRSocket(router: ReactiveSocketRouter<Context>, options:
5858
}
5959
},
6060
endpoints: route_generators.map((generator) => generator(router)),
61-
metaDecoder: async (meta: Buffer) => {
62-
return RSocketRequestMeta.decode(deserialize(meta) as any);
61+
metaDecoder: async (meta: TypedBuffer) => {
62+
return RSocketRequestMeta.decode(decodeTyped(meta) as any);
6363
},
64-
payloadDecoder: async (rawData?: Buffer) => rawData && deserialize(rawData)
64+
payloadDecoder: async (rawData?: TypedBuffer) => rawData && decodeTyped(rawData)
6565
});
6666
}
67+
68+
function decodeTyped(data: TypedBuffer) {
69+
switch (data.mimeType) {
70+
case 'application/json':
71+
const decoder = new TextDecoder();
72+
return JSON.parse(decoder.decode(data.contents));
73+
case 'application/bson':
74+
return deserialize(data.contents);
75+
}
76+
77+
throw new errors.UnsupportedMediaType(`Expected JSON or BSON request, got ${data.mimeType}`);
78+
}

packages/service-errors/src/codes.ts

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -297,6 +297,13 @@ export enum ErrorCode {
297297
*/
298298
PSYNC_S2003 = 'PSYNC_S2003',
299299

300+
/**
301+
* 415 unsupported media type.
302+
*
303+
* This code always indicates an issue with the client.
304+
*/
305+
PSYNC_S2004 = 'PSYNC_S2004',
306+
300307
// ## PSYNC_S21xx: Auth errors originating on the client.
301308
//
302309
// This does not include auth configuration errors on the service.

packages/service-errors/src/errors.ts

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -159,6 +159,19 @@ export class ReplicationAbortedError extends ServiceError {
159159
}
160160
}
161161

162+
export class UnsupportedMediaType extends ServiceError {
163+
static readonly CODE = ErrorCode.PSYNC_S2004;
164+
165+
constructor(errors: any) {
166+
super({
167+
code: UnsupportedMediaType.CODE,
168+
status: 415,
169+
description: 'Unsupported Media Type',
170+
details: errors
171+
});
172+
}
173+
}
174+
162175
export class AuthorizationError extends ServiceError {
163176
/**
164177
* String describing the token. Does not contain the full token, but may help with debugging.

0 commit comments

Comments
 (0)