From bfb42e4a2b29726c2484d4a8729983a51e8fad7b Mon Sep 17 00:00:00 2001 From: Liangying Wei Date: Wed, 26 Nov 2025 15:19:31 +1100 Subject: [PATCH] Add invoke support --- sdk/web-pubsub/web-pubsub-client/README.md | 15 ++ .../review/web-pubsub-client-node.api.md | 92 +++++++++- .../web-pubsub-client/src/errors/index.ts | 28 ++- .../src/invocationManager.ts | 161 ++++++++++++++++++ .../web-pubsub-client/src/models/index.ts | 42 ++++- .../web-pubsub-client/src/models/messages.ts | 70 +++++++- .../src/protocols/jsonProtocolBase.ts | 88 ++++++++++ .../web-pubsub-client/src/webPubSubClient.ts | 139 ++++++++++++++- .../test/client.invoke.spec.ts | 133 +++++++++++++++ .../test/invocationManager.spec.ts | 65 +++++++ 10 files changed, 821 insertions(+), 12 deletions(-) create mode 100644 sdk/web-pubsub/web-pubsub-client/src/invocationManager.ts create mode 100644 sdk/web-pubsub/web-pubsub-client/test/client.invoke.spec.ts create mode 100644 sdk/web-pubsub/web-pubsub-client/test/invocationManager.spec.ts diff --git a/sdk/web-pubsub/web-pubsub-client/README.md b/sdk/web-pubsub/web-pubsub-client/README.md index 1ef8a23b8efb..673e1fd638fe 100644 --- a/sdk/web-pubsub/web-pubsub-client/README.md +++ b/sdk/web-pubsub/web-pubsub-client/README.md @@ -91,6 +91,21 @@ await client.joinGroup(groupName); await client.sendToGroup(groupName, "hello world", "text"); ``` +### 5. Invoke upstream events (preview) + +```ts +import { WebPubSubClient } from "@azure/web-pubsub-client"; + +const client = new WebPubSubClient(""); +await client.start(); + +const result = await client.invokeEvent("processOrder", { orderId: 1 }, "json"); +console.log(`Invocation result: ${JSON.stringify(result.data)}`); +``` + +`invokeEvent` sends an `invoke` request to the service, awaits the correlated `invokeResponse`, and returns the payload. You can abort the invocation by passing `{ abortSignal }`. +_Streaming and service-initiated invocations are not yet supported._ + --- ## Examples diff --git a/sdk/web-pubsub/web-pubsub-client/review/web-pubsub-client-node.api.md b/sdk/web-pubsub/web-pubsub-client/review/web-pubsub-client-node.api.md index e20248866b8d..52c2817ac0e0 100644 --- a/sdk/web-pubsub/web-pubsub-client/review/web-pubsub-client-node.api.md +++ b/sdk/web-pubsub/web-pubsub-client/review/web-pubsub-client-node.api.md @@ -20,6 +20,14 @@ export interface AckMessageError { name: string; } +// @public +export interface CancelInvocationMessage extends WebPubSubMessageBase { + // (undocumented) + invocationId: string; + // (undocumented) + readonly kind: "cancelInvocation"; +} + // @public export interface ConnectedMessage extends WebPubSubMessageBase { connectionId: string; @@ -55,7 +63,11 @@ export type DownstreamMessageType = /** * Type for ServerDataMessage */ -| "serverData"; +| "serverData" +/** +* Type for InvokeResponseMessage +*/ +| "invokeResponse"; // @public export interface GetClientAccessUrlOptions { @@ -72,6 +84,71 @@ export interface GroupDataMessage extends WebPubSubMessageBase { sequenceId?: number; } +// @public +export class InvocationError extends Error { + constructor(message: string, options: InvocationErrorOptions); + errorDetail?: InvokeResponseError; + invocationId: string; +} + +// @public (undocumented) +export interface InvocationErrorOptions { + // (undocumented) + errorDetail?: InvokeResponseError; + // (undocumented) + invocationId: string; +} + +// @public +export interface InvokeEventOptions { + abortSignal?: AbortSignalLike; + invocationId?: string; +} + +// @public +export interface InvokeEventResult { + data?: JSONTypes | ArrayBuffer; + dataType?: WebPubSubDataType; + invocationId: string; +} + +// @public +export interface InvokeMessage extends WebPubSubMessageBase { + data?: JSONTypes | ArrayBuffer; + dataType?: WebPubSubDataType; + event?: string; + // (undocumented) + invocationId: string; + // (undocumented) + readonly kind: "invoke"; + // (undocumented) + target?: "event"; +} + +// @public +export interface InvokeResponseError { + // (undocumented) + message: string; + // (undocumented) + name: string; +} + +// @public +export interface InvokeResponseMessage extends WebPubSubMessageBase { + // (undocumented) + data?: JSONTypes | ArrayBuffer; + // (undocumented) + dataType?: WebPubSubDataType; + // (undocumented) + error?: InvokeResponseError; + // (undocumented) + invocationId: string; + // (undocumented) + readonly kind: "invokeResponse"; + // (undocumented) + success?: boolean; +} + // @public export interface JoinGroupMessage extends WebPubSubMessageBase { ackId?: number; @@ -224,12 +301,21 @@ export type UpstreamMessageType = /** * Type for SequenceAckMessage */ -| "sequenceAck"; +| "sequenceAck" +/** +* Type for InvokeMessage +*/ +| "invoke" +/** +* Type for CancelInvocationMessage +*/ +| "cancelInvocation"; // @public export class WebPubSubClient { constructor(clientAccessUrl: string, options?: WebPubSubClientOptions); constructor(credential: WebPubSubClientCredential, options?: WebPubSubClientOptions); + invokeEvent(eventName: string, content: JSONTypes | ArrayBuffer, dataType: WebPubSubDataType, options?: InvokeEventOptions): Promise; joinGroup(groupName: string, options?: JoinGroupOptions): Promise; leaveGroup(groupName: string, options?: LeaveGroupOptions): Promise; off(event: "connected", listener: (e: OnConnectedArgs) => void): void; @@ -298,7 +384,7 @@ export const WebPubSubJsonProtocol: () => WebPubSubClientProtocol; export const WebPubSubJsonReliableProtocol: () => WebPubSubClientProtocol; // @public -export type WebPubSubMessage = GroupDataMessage | ServerDataMessage | JoinGroupMessage | LeaveGroupMessage | ConnectedMessage | DisconnectedMessage | SendToGroupMessage | SendEventMessage | SequenceAckMessage | AckMessage; +export type WebPubSubMessage = GroupDataMessage | ServerDataMessage | JoinGroupMessage | LeaveGroupMessage | ConnectedMessage | DisconnectedMessage | SendToGroupMessage | SendEventMessage | SequenceAckMessage | AckMessage | InvokeMessage | InvokeResponseMessage | CancelInvocationMessage; // @public export interface WebPubSubMessageBase { diff --git a/sdk/web-pubsub/web-pubsub-client/src/errors/index.ts b/sdk/web-pubsub/web-pubsub-client/src/errors/index.ts index 88f7d3c28d64..5d4081faf914 100644 --- a/sdk/web-pubsub/web-pubsub-client/src/errors/index.ts +++ b/sdk/web-pubsub/web-pubsub-client/src/errors/index.ts @@ -1,7 +1,7 @@ // Copyright (c) Microsoft Corporation. // Licensed under the MIT License. -import type { AckMessageError } from "../models/messages.js"; +import type { AckMessageError, InvokeResponseError } from "../models/messages.js"; /** * Error when sending message failed @@ -42,3 +42,29 @@ export interface SendMessageErrorOptions { */ errorDetail?: AckMessageError; } + +export interface InvocationErrorOptions { + invocationId: string; + errorDetail?: InvokeResponseError; +} + +/** + * Error thrown when an invocation fails or is cancelled. + */ +export class InvocationError extends Error { + /** + * The invocation id of the request. + */ + public invocationId: string; + /** + * Error details from the service if available. + */ + public errorDetail?: InvokeResponseError; + + constructor(message: string, options: InvocationErrorOptions) { + super(message); + this.name = "InvocationError"; + this.invocationId = options.invocationId; + this.errorDetail = options.errorDetail; + } +} diff --git a/sdk/web-pubsub/web-pubsub-client/src/invocationManager.ts b/sdk/web-pubsub/web-pubsub-client/src/invocationManager.ts new file mode 100644 index 000000000000..73c544543724 --- /dev/null +++ b/sdk/web-pubsub/web-pubsub-client/src/invocationManager.ts @@ -0,0 +1,161 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +import type { AbortSignalLike } from "@azure/abort-controller"; +import type { InvokeResponseMessage } from "./models/messages.js"; +import { InvocationError } from "./errors/index.js"; + +export interface InvocationWaitOptions { + abortSignal?: AbortSignalLike; +} + +export interface InvocationRegistration { + invocationId: string; + wait(options?: InvocationWaitOptions): Promise; +} + +/** + * Manages pending invocations awaiting invokeResponse frames. + */ +export class InvocationManager { + private readonly _entries = new Map(); + private _nextId = 0; + + public registerInvocation(invocationId?: string): InvocationRegistration { + const resolvedId = invocationId ?? this._generateInvocationId(); + if (this._entries.has(resolvedId)) { + throw new InvocationError("Invocation id is already registered.", { + invocationId: resolvedId, + }); + } + + const entity = new InvocationEntity(resolvedId); + this._entries.set(resolvedId, entity); + return { + invocationId: resolvedId, + wait: (options?: InvocationWaitOptions) => this._waitForEntry(entity, options), + }; + } + + public resolveInvocation(message: InvokeResponseMessage): boolean { + const entry = this._entries.get(message.invocationId); + if (!entry) { + return false; + } + this._entries.delete(message.invocationId); + entry.resolve(message); + return true; + } + + public rejectInvocation(invocationId: string, reason: unknown): boolean { + const entry = this._entries.get(invocationId); + if (!entry) { + return false; + } + this._entries.delete(invocationId); + entry.reject(reason); + return true; + } + + public discard(invocationId: string): void { + this._entries.delete(invocationId); + } + + public rejectAll(createReason: (invocationId: string) => unknown): void { + this._entries.forEach((entry, invocationId) => { + if (this._entries.delete(invocationId)) { + entry.reject(createReason(invocationId)); + } + }); + } + + private _waitForEntry( + entry: InvocationEntity, + options?: InvocationWaitOptions, + ): Promise { + const waitPromise = entry.promise(); + const abortSignal = options?.abortSignal; + + if (!abortSignal) { + return waitPromise; + } + + if (abortSignal.aborted) { + if (this._entries.delete(entry.invocationId)) { + entry.reject(this._createAbortError(entry.invocationId)); + } + return waitPromise; + } + + return new Promise((resolve, reject) => { + const onAbort = (): void => { + abortSignal.removeEventListener("abort", onAbort); + if (this._entries.delete(entry.invocationId)) { + entry.reject(this._createAbortError(entry.invocationId)); + } + }; + + abortSignal.addEventListener("abort", onAbort); + + waitPromise + .then((result) => { + abortSignal.removeEventListener("abort", onAbort); + resolve(result); + }) + .catch((err) => { + abortSignal.removeEventListener("abort", onAbort); + reject(err); + }); + }); + } + + private _generateInvocationId(): string { + this._nextId += 1; + return this._nextId.toString(); + } + + private _createAbortError(invocationId: string): InvocationError { + return new InvocationError("Invocation cancelled by abortSignal.", { + invocationId, + }); + } +} + +class InvocationEntity { + private readonly _promise: Promise; + private _resolve: + | ((value: InvokeResponseMessage | PromiseLike) => void) + | undefined; + private _reject: ((reason?: unknown) => void) | undefined; + + constructor(public readonly invocationId: string) { + this._promise = new Promise((resolve, reject) => { + this._resolve = resolve; + this._reject = reject; + }); + } + + public promise(): Promise { + return this._promise; + } + + public resolve(value: InvokeResponseMessage | PromiseLike): void { + const callback = this._resolve; + if (!callback) { + return; + } + this._resolve = undefined; + this._reject = undefined; + callback(value); + } + + public reject(reason?: unknown): void { + const callback = this._reject; + if (!callback) { + return; + } + this._resolve = undefined; + this._reject = undefined; + callback(reason); + } +} diff --git a/sdk/web-pubsub/web-pubsub-client/src/models/index.ts b/sdk/web-pubsub/web-pubsub-client/src/models/index.ts index 2f74263d2cbd..4c12ac325964 100644 --- a/sdk/web-pubsub/web-pubsub-client/src/models/index.ts +++ b/sdk/web-pubsub/web-pubsub-client/src/models/index.ts @@ -3,7 +3,13 @@ import type { AbortSignalLike } from "@azure/abort-controller"; import type { WebPubSubClientProtocol } from "../protocols/index.js"; -import type { DisconnectedMessage, GroupDataMessage, ServerDataMessage } from "./messages.js"; +import type { JSONTypes } from "../webPubSubClient.js"; +import type { + DisconnectedMessage, + GroupDataMessage, + ServerDataMessage, + WebPubSubDataType, +} from "./messages.js"; /** * The client options @@ -141,6 +147,20 @@ export interface SendEventOptions { abortSignal?: AbortSignalLike; } +/** + * Invoke event operation options + */ +export interface InvokeEventOptions { + /** + * Optional invocation identifier. If not specified, the client generates one. + */ + invocationId?: string; + /** + * Optional abort signal to cancel the invocation. + */ + abortSignal?: AbortSignalLike; +} + /** * Parameter of OnConnected callback */ @@ -172,7 +192,7 @@ export interface OnDisconnectedArgs { /** * Parameter of OnStopped callback */ -export interface OnStoppedArgs {} +export interface OnStoppedArgs { } /** * Parameter of OnDataMessage callback @@ -222,6 +242,24 @@ export interface WebPubSubResult { isDuplicated: boolean; } +/** + * Result of invokeEvent + */ +export interface InvokeEventResult { + /** + * Invocation identifier correlated with the response. + */ + invocationId: string; + /** + * The response payload data type. + */ + dataType?: WebPubSubDataType; + /** + * The response payload. + */ + data?: JSONTypes | ArrayBuffer; +} + /** * The start options */ diff --git a/sdk/web-pubsub/web-pubsub-client/src/models/messages.ts b/sdk/web-pubsub/web-pubsub-client/src/models/messages.ts index 489e90b17e78..97bcd86e5781 100644 --- a/sdk/web-pubsub/web-pubsub-client/src/models/messages.ts +++ b/sdk/web-pubsub/web-pubsub-client/src/models/messages.ts @@ -16,7 +16,10 @@ export type WebPubSubMessage = | SendToGroupMessage | SendEventMessage | SequenceAckMessage - | AckMessage; + | AckMessage + | InvokeMessage + | InvokeResponseMessage + | CancelInvocationMessage; /** * The common of web pubsub message @@ -48,7 +51,11 @@ export type DownstreamMessageType = /** * Type for ServerDataMessage */ - | "serverData"; + | "serverData" + /** + * Type for InvokeResponseMessage + */ + | "invokeResponse"; /** * Types for upstream messages @@ -73,7 +80,15 @@ export type UpstreamMessageType = /** * Type for SequenceAckMessage */ - | "sequenceAck"; + | "sequenceAck" + /** + * Type for InvokeMessage + */ + | "invoke" + /** + * Type for CancelInvocationMessage + */ + | "cancelInvocation"; /** * The ack message @@ -305,6 +320,55 @@ export interface SequenceAckMessage extends WebPubSubMessageBase { sequenceId: number; } +/** + * Invoke message + */ +export interface InvokeMessage extends WebPubSubMessageBase { + readonly kind: "invoke"; + invocationId: string; + target?: "event"; + /** + * The event name when targeting upstream events. + */ + event?: string; + /** + * Data type of the payload. + */ + dataType?: WebPubSubDataType; + /** + * Payload data. + */ + data?: JSONTypes | ArrayBuffer; +} + +/** + * Invoke response message + */ +export interface InvokeResponseMessage extends WebPubSubMessageBase { + readonly kind: "invokeResponse"; + invocationId: string; + success?: boolean; + dataType?: WebPubSubDataType; + data?: JSONTypes | ArrayBuffer; + error?: InvokeResponseError; +} + +/** + * Invoke response error details + */ +export interface InvokeResponseError { + name: string; + message: string; +} + +/** + * Cancel invocation message + */ +export interface CancelInvocationMessage extends WebPubSubMessageBase { + readonly kind: "cancelInvocation"; + invocationId: string; +} + /** * The data type */ diff --git a/sdk/web-pubsub/web-pubsub-client/src/protocols/jsonProtocolBase.ts b/sdk/web-pubsub/web-pubsub-client/src/protocols/jsonProtocolBase.ts index 7b12b62dd3f5..bdde24cac5a4 100644 --- a/sdk/web-pubsub/web-pubsub-client/src/protocols/jsonProtocolBase.ts +++ b/sdk/web-pubsub/web-pubsub-client/src/protocols/jsonProtocolBase.ts @@ -3,9 +3,11 @@ import type { AckMessage, + CancelInvocationMessage, ConnectedMessage, DisconnectedMessage, GroupDataMessage, + InvokeResponseMessage, ServerDataMessage, WebPubSubDataType, WebPubSubMessage, @@ -59,6 +61,30 @@ export function parseMessages(input: string): WebPubSubMessage | null { } } else if (typedMessage.type === "ack") { returnMessage = { ...parsedMessage, kind: "ack" } as AckMessage; + } else if (typedMessage.type === "invokeResponse") { + let data: JSONTypes | ArrayBuffer | undefined; + if (parsedMessage.dataType != null) { + const parsedData = parsePayload(parsedMessage.data, parsedMessage.dataType); + if (parsedData === null) { + return null; + } + data = parsedData; + } + + returnMessage = { + kind: "invokeResponse", + invocationId: parsedMessage.invocationId, + success: parsedMessage.success, + dataType: parsedMessage.dataType, + data, + error: parsedMessage.error, + fanout: parsedMessage.fanout, + } as InvokeResponseMessage; + } else if (typedMessage.type === "cancelInvocation") { + returnMessage = { + ...parsedMessage, + kind: "cancelInvocation", + } as CancelInvocationMessage; } else { // Forward compatible return null; @@ -102,6 +128,45 @@ export function writeMessage(message: WebPubSubMessage): string { data = { type: "sequenceAck", sequenceId: message.sequenceId } as SequenceAckData; break; } + case "invoke": { + const invokePayload: InvokeData = { + type: "invoke", + invocationId: message.invocationId, + target: message.target, + event: message.event, + }; + + if (message.dataType != null && message.data != null) { + invokePayload.dataType = message.dataType; + invokePayload.data = getPayload(message.data, message.dataType); + } + + data = invokePayload; + break; + } + case "invokeResponse": { + const invokeResponse: InvokeResponseData = { + type: "invokeResponse", + invocationId: message.invocationId, + success: message.success, + error: message.error, + }; + + if (message.dataType != null && message.data != null) { + invokeResponse.dataType = message.dataType; + invokeResponse.data = getPayload(message.data, message.dataType); + } + + data = invokeResponse; + break; + } + case "cancelInvocation": { + data = { + type: "cancelInvocation", + invocationId: message.invocationId, + } as CancelInvocationData; + break; + } default: { throw new Error(`Unsupported type: ${message.kind}`); } @@ -144,6 +209,29 @@ interface SequenceAckData { sequenceId: number; } +interface InvokeData { + readonly type: "invoke"; + invocationId: string; + target?: "event" | "group"; + event?: string; + dataType?: WebPubSubDataType; + data?: any; +} + +interface InvokeResponseData { + readonly type: "invokeResponse"; + invocationId: string; + success?: boolean; + error?: { name: string; message: string }; + dataType?: WebPubSubDataType; + data?: any; +} + +interface CancelInvocationData { + readonly type: "cancelInvocation"; + invocationId: string; +} + function getPayload(data: JSONTypes | ArrayBuffer, dataType: WebPubSubDataType): any { switch (dataType) { case "text": { diff --git a/sdk/web-pubsub/web-pubsub-client/src/webPubSubClient.ts b/sdk/web-pubsub/web-pubsub-client/src/webPubSubClient.ts index 47a284df6920..d733f3cda173 100644 --- a/sdk/web-pubsub/web-pubsub-client/src/webPubSubClient.ts +++ b/sdk/web-pubsub/web-pubsub-client/src/webPubSubClient.ts @@ -5,7 +5,7 @@ import type { AbortSignalLike } from "@azure/abort-controller"; import { delay } from "@azure/core-util"; import EventEmitter from "events"; import type { SendMessageErrorOptions } from "./errors/index.js"; -import { SendMessageError } from "./errors/index.js"; +import { InvocationError, SendMessageError } from "./errors/index.js"; import { logger } from "./logger.js"; import type { WebPubSubResult, @@ -23,11 +23,16 @@ import type { OnRejoinGroupFailedArgs, StartOptions, GetClientAccessUrlOptions, + InvokeEventOptions, + InvokeEventResult, } from "./models/index.js"; import type { ConnectedMessage, + CancelInvocationMessage, DisconnectedMessage, GroupDataMessage, + InvokeMessage, + InvokeResponseMessage, ServerDataMessage, WebPubSubDataType, WebPubSubMessage, @@ -47,6 +52,7 @@ import type { WebSocketClientLike, } from "./websocket/websocketClientLike.js"; import { AckManager } from "./ackManager.js"; +import { InvocationManager } from "./invocationManager.js"; enum WebPubSubClientState { Stopped = "Stopped", @@ -70,6 +76,7 @@ export class WebPubSubClient { private readonly _options: WebPubSubClientOptions; private readonly _groupMap: Map; private readonly _ackManager: AckManager; + private readonly _invocationManager: InvocationManager; private readonly _sequenceId: SequenceId; private readonly _messageRetryPolicy: RetryPolicy; private readonly _reconnectRetryPolicy: RetryPolicy; @@ -122,6 +129,7 @@ export class WebPubSubClient { this._protocol = this._options.protocol!; this._groupMap = new Map(); this._ackManager = new AckManager(); + this._invocationManager = new InvocationManager(); this._sequenceId = new SequenceId(); this._state = WebPubSubClientState.Stopped; @@ -398,6 +406,69 @@ export class WebPubSubClient { return { isDuplicated: false }; } + private async _invokeEventAttempt( + eventName: string, + content: JSONTypes | ArrayBuffer, + dataType: WebPubSubDataType, + options?: InvokeEventOptions, + ): Promise { + const invokeOptions = options ?? {}; + + const { invocationId, wait } = this._invocationManager.registerInvocation( + invokeOptions.invocationId, + ); + + const invokeMessage: InvokeMessage = { + kind: "invoke", + invocationId, + target: "event", + event: eventName, + dataType, + data: content, + }; + + const responsePromise = wait({ + abortSignal: invokeOptions.abortSignal, + }); + + try { + await this._sendMessage(invokeMessage, invokeOptions.abortSignal); + const response = await responsePromise; + return this._mapInvokeResponse(response); + } catch (err) { + const shouldCancel = + (err instanceof InvocationError && err.errorDetail == null) || + invokeOptions.abortSignal?.aborted === true; + if (shouldCancel) { + await this._sendCancelInvocation(invocationId).catch(() => { + /** empty */ + }); + } + throw err; + } finally { + this._invocationManager.discard(invocationId); + } + } + + /** + * Invoke an upstream event and wait for the correlated response. + * @param eventName - The event name + * @param content - The payload + * @param dataType - The payload type + * @param options - Invoke options + */ + public async invokeEvent( + eventName: string, + content: JSONTypes | ArrayBuffer, + dataType: WebPubSubDataType, + options?: InvokeEventOptions, + ): Promise { + return this._operationExecuteWithRetry( + () => this._invokeEventAttempt(eventName, content, dataType, options), + options?.abortSignal, + ); + } + /** * Join the client to group * @param groupName - The group name @@ -700,6 +771,15 @@ export class WebPubSubClient { this._safeEmitServerMessage(message); }; + const handleInvokeResponseMessage = (message: InvokeResponseMessage): void => { + const resolved = this._invocationManager.resolveInvocation(message); + if (!resolved) { + logger.verbose( + `Received invokeResponse for unknown invocationId: ${message.invocationId}`, + ); + } + }; + let messages: WebPubSubMessage[] | WebPubSubMessage | null; try { let convertedData: Buffer | ArrayBuffer | string; @@ -746,6 +826,10 @@ export class WebPubSubClient { handleServerDataMessage(message as ServerDataMessage); break; } + case "invokeResponse": { + handleInvokeResponseMessage(message as InvokeResponseMessage); + break; + } } } catch (err) { logger.warning( @@ -861,6 +945,15 @@ export class WebPubSubClient { ); }); + this._invocationManager.rejectAll((invocationId) => { + return new InvocationError( + "Connection is disconnected before receiving invoke response from the service", + { + invocationId, + }, + ); + }); + if (this._isStopping) { logger.warning("The client is stopping state. Stop recovery."); this._handleConnectionCloseAndNoRecovery(); @@ -949,6 +1042,43 @@ export class WebPubSubClient { } as OnRejoinGroupFailedArgs); } + private _mapInvokeResponse( + message: InvokeResponseMessage, + ): InvokeEventResult { + if (message.success !== true) { + if (message.success === false) { + throw new InvocationError(message.error?.message ?? "Invocation failed.", { + invocationId: message.invocationId, + errorDetail: message.error, + }); + } + + throw new InvocationError("Unsupported invoke response frame.", { + invocationId: message.invocationId, + }); + } + + return { + invocationId: message.invocationId, + dataType: message.dataType, + data: message.data, + }; + } + + private async _sendCancelInvocation(invocationId: string): Promise { + const message: CancelInvocationMessage = { + kind: "cancelInvocation", + invocationId, + }; + + try { + await this._sendMessage(message); + } catch (err) { + logger.verbose(`Failed to send cancelInvocation for ${invocationId}`, err); + } + } + + private _buildDefaultOptions(clientOptions: WebPubSubClientOptions): WebPubSubClientOptions { if (clientOptions.autoReconnect == null) { clientOptions.autoReconnect = true; @@ -1064,6 +1194,9 @@ export class WebPubSubClient { try { return await inner.call(this); } catch (err) { + if (err instanceof InvocationError) { + throw err; + } retryAttempt++; const delayInMs = this._messageRetryPolicy.nextRetryDelayInMs(retryAttempt); if (delayInMs == null) { @@ -1088,8 +1221,8 @@ class RetryPolicy { this._retryOptions = retryOptions; this._maxRetriesToGetMaxDelay = Math.ceil( Math.log2(this._retryOptions.maxRetryDelayInMs!) - - Math.log2(this._retryOptions.retryDelayInMs!) + - 1, + Math.log2(this._retryOptions.retryDelayInMs!) + + 1, ); } diff --git a/sdk/web-pubsub/web-pubsub-client/test/client.invoke.spec.ts b/sdk/web-pubsub/web-pubsub-client/test/client.invoke.spec.ts new file mode 100644 index 000000000000..eda1ac81e94c --- /dev/null +++ b/sdk/web-pubsub/web-pubsub-client/test/client.invoke.spec.ts @@ -0,0 +1,133 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +import { describe, it, expect, vi } from "vitest"; +import type { MockInstance } from "vitest"; +import { WebPubSubClient } from "../src/webPubSubClient.js"; +import { TestWebSocketClient } from "./testWebSocketClient.js"; +import { getConnectedPayload } from "./utils.js"; +import { InvocationError } from "../src/errors/index.js"; + +describe("WebPubSubClient invoke support", () => { + it("invokeEvent resolves with invokeResponse payload", async () => { + const client = new WebPubSubClient("wss://service.com"); + const testWs = new TestWebSocketClient(client); + makeStartable(testWs); + + await client.start(); + testWs.invokemessage(JSON.stringify(getConnectedPayload("conn"))); + + const invokePromise = client.invokeEvent( + "echo", + "ping", + "text", + { + invocationId: "invoke-id", + }, + ); + + testWs.invokemessage( + JSON.stringify({ + type: "invokeResponse", + invocationId: "invoke-id", + success: true, + dataType: "text", + data: "pong", + }), + ); + + const result = await invokePromise; + expect(result).toMatchObject({ + invocationId: "invoke-id", + dataType: "text", + data: "pong", + }); + + client.stop(); + }); + + it("invokeEvent rejects when service returns an error", async () => { + const client = new WebPubSubClient("wss://service.com"); + const testWs = new TestWebSocketClient(client); + makeStartable(testWs); + + await client.start(); + testWs.invokemessage(JSON.stringify(getConnectedPayload("conn"))); + + const invokePromise = client.invokeEvent( + "echo", + "ping", + "text", + { + invocationId: "invoke-error", + }, + ); + + testWs.invokemessage( + JSON.stringify({ + type: "invokeResponse", + invocationId: "invoke-error", + success: false, + error: { + name: "BadRequest", + message: "oops", + }, + }), + ); + + await expect(invokePromise).rejects.toThrow(InvocationError); + client.stop(); + }); + + it("invokeEvent sends cancelInvocation when aborted", async () => { + const client = new WebPubSubClient("wss://service.com"); + const testWs = new TestWebSocketClient(client); + makeStartable(testWs); + + const sendSpy = vi.spyOn(testWs, "send"); + + await client.start(); + testWs.invokemessage(JSON.stringify(getConnectedPayload("conn"))); + + const abortController = new AbortController(); + const invokePromise = client.invokeEvent( + "echo", + "ping", + "text", + { + invocationId: "invoke-abort", + abortSignal: abortController.signal, + }, + ); + + abortController.abort(); + + await expect(invokePromise).rejects.toThrow(InvocationError); + + // Verify cancelInvocation message is sent + // The last call should be cancelInvocation. + // Note: send might be called multiple times (e.g. for invoke message). + // We need to find the cancelInvocation message. + const calls = sendSpy.mock.calls; + const cancelMessage = calls.map(args => JSON.parse(args[0] as string)).find(msg => msg.type === "cancelInvocation"); + + expect(cancelMessage).toMatchObject({ + type: "cancelInvocation", + invocationId: "invoke-abort", + }); + + client.stop(); + }); +}); + +function makeStartable(ws: TestWebSocketClient): MockInstance<(fn: () => void) => void> { + const onOpen = ws.onopen.bind(ws); + const stub = vi.spyOn(ws, "onopen"); + stub.mockImplementationOnce((...args) => { + setTimeout(() => { + onOpen(...args); + ws.invokeopen.call(ws); + }); + }); + return stub; +} diff --git a/sdk/web-pubsub/web-pubsub-client/test/invocationManager.spec.ts b/sdk/web-pubsub/web-pubsub-client/test/invocationManager.spec.ts new file mode 100644 index 000000000000..433667e50c21 --- /dev/null +++ b/sdk/web-pubsub/web-pubsub-client/test/invocationManager.spec.ts @@ -0,0 +1,65 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +import { describe, expect, it } from "vitest"; +import { InvocationManager } from "../src/invocationManager.js"; +import { InvocationError } from "../src/errors/index.js"; +import type { InvokeResponseMessage } from "../src/models/messages.js"; + +const createResponse = (invocationId: string): InvokeResponseMessage => ({ + kind: "invokeResponse", + invocationId, + success: true, + dataType: "json", + data: { message: "ok" }, +}); + +describe("InvocationManager", () => { + it("resolves registered invocations when responses arrive", async () => { + const manager = new InvocationManager(); + const registration = manager.registerInvocation(); + const waitPromise = registration.wait(); + + const response = createResponse(registration.invocationId); + expect(manager.resolveInvocation(response)).toBe(true); + await expect(waitPromise).resolves.toBe(response); + }); + + it("throws when registering duplicate invocation ids", () => { + const manager = new InvocationManager(); + const existing = manager.registerInvocation("same-id"); + expect(existing.invocationId).toBe("same-id"); + expect(() => manager.registerInvocation("same-id")).toThrow(InvocationError); + }); + + it("rejects waiters when rejectInvocation is called", async () => { + const manager = new InvocationManager(); + const registration = manager.registerInvocation(); + const waitPromise = registration.wait(); + + expect(manager.rejectInvocation(registration.invocationId, new Error("boom"))).toBe(true); + await expect(waitPromise).rejects.toThrow("boom"); + }); + + it("propagates abort signals and removes pending registrations", async () => { + const manager = new InvocationManager(); + const registration = manager.registerInvocation(); + const abortController = new AbortController(); + const waitPromise = registration.wait({ abortSignal: abortController.signal }); + + abortController.abort(); + + await expect(waitPromise).rejects.toBeInstanceOf(InvocationError); + const response = createResponse(registration.invocationId); + expect(manager.resolveInvocation(response)).toBe(false); + }); + + it("rejects all pending registrations with rejectAll", async () => { + const manager = new InvocationManager(); + const registration = manager.registerInvocation(); + const waitPromise = registration.wait(); + + manager.rejectAll((id) => new Error(`failure:${id}`)); + await expect(waitPromise).rejects.toThrow(`failure:${registration.invocationId}`); + }); +});