Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions sdk/web-pubsub/web-pubsub-client/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,21 @@ await client.joinGroup(groupName);
await client.sendToGroup(groupName, "hello world", "text");
```

### 5. Invoke upstream events (preview)

```ts
import { WebPubSubClient } from "@azure/web-pubsub-client";

const client = new WebPubSubClient("<client-access-url>");
await client.start();

const result = await client.invokeEvent("processOrder", { orderId: 1 }, "json");
console.log(`Invocation result: ${JSON.stringify(result.data)}`);
```

`invokeEvent` sends an `invoke` request to the service, awaits the correlated `invokeResponse`, and returns the payload. You can abort the invocation by passing `{ abortSignal }`.
_Streaming and service-initiated invocations are not yet supported._

---

## Examples
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,14 @@ export interface AckMessageError {
name: string;
}

// @public
export interface CancelInvocationMessage extends WebPubSubMessageBase {
// (undocumented)
invocationId: string;
// (undocumented)
readonly kind: "cancelInvocation";
}

// @public
export interface ConnectedMessage extends WebPubSubMessageBase {
connectionId: string;
Expand Down Expand Up @@ -55,7 +63,11 @@ export type DownstreamMessageType =
/**
* Type for ServerDataMessage
*/
| "serverData";
| "serverData"
/**
* Type for InvokeResponseMessage
*/
| "invokeResponse";

// @public
export interface GetClientAccessUrlOptions {
Expand All @@ -72,6 +84,71 @@ export interface GroupDataMessage extends WebPubSubMessageBase {
sequenceId?: number;
}

// @public
export class InvocationError extends Error {
constructor(message: string, options: InvocationErrorOptions);
errorDetail?: InvokeResponseError;
invocationId: string;
}

// @public (undocumented)
export interface InvocationErrorOptions {
// (undocumented)
errorDetail?: InvokeResponseError;
// (undocumented)
invocationId: string;
}

// @public
export interface InvokeEventOptions {
abortSignal?: AbortSignalLike;
invocationId?: string;
}

// @public
export interface InvokeEventResult {
data?: JSONTypes | ArrayBuffer;
dataType?: WebPubSubDataType;
invocationId: string;
}

// @public
export interface InvokeMessage extends WebPubSubMessageBase {
data?: JSONTypes | ArrayBuffer;
dataType?: WebPubSubDataType;
event?: string;
// (undocumented)
invocationId: string;
// (undocumented)
readonly kind: "invoke";
// (undocumented)
target?: "event";
}

// @public
export interface InvokeResponseError {
// (undocumented)
message: string;
// (undocumented)
name: string;
}

// @public
export interface InvokeResponseMessage extends WebPubSubMessageBase {
// (undocumented)
data?: JSONTypes | ArrayBuffer;
// (undocumented)
dataType?: WebPubSubDataType;
// (undocumented)
error?: InvokeResponseError;
// (undocumented)
invocationId: string;
// (undocumented)
readonly kind: "invokeResponse";
// (undocumented)
success?: boolean;
}

// @public
export interface JoinGroupMessage extends WebPubSubMessageBase {
ackId?: number;
Expand Down Expand Up @@ -224,12 +301,21 @@ export type UpstreamMessageType =
/**
* Type for SequenceAckMessage
*/
| "sequenceAck";
| "sequenceAck"
/**
* Type for InvokeMessage
*/
| "invoke"
/**
* Type for CancelInvocationMessage
*/
| "cancelInvocation";

// @public
export class WebPubSubClient {
constructor(clientAccessUrl: string, options?: WebPubSubClientOptions);
constructor(credential: WebPubSubClientCredential, options?: WebPubSubClientOptions);
invokeEvent(eventName: string, content: JSONTypes | ArrayBuffer, dataType: WebPubSubDataType, options?: InvokeEventOptions): Promise<InvokeEventResult>;
joinGroup(groupName: string, options?: JoinGroupOptions): Promise<WebPubSubResult>;
leaveGroup(groupName: string, options?: LeaveGroupOptions): Promise<WebPubSubResult>;
off(event: "connected", listener: (e: OnConnectedArgs) => void): void;
Expand Down Expand Up @@ -298,7 +384,7 @@ export const WebPubSubJsonProtocol: () => WebPubSubClientProtocol;
export const WebPubSubJsonReliableProtocol: () => WebPubSubClientProtocol;

// @public
export type WebPubSubMessage = GroupDataMessage | ServerDataMessage | JoinGroupMessage | LeaveGroupMessage | ConnectedMessage | DisconnectedMessage | SendToGroupMessage | SendEventMessage | SequenceAckMessage | AckMessage;
export type WebPubSubMessage = GroupDataMessage | ServerDataMessage | JoinGroupMessage | LeaveGroupMessage | ConnectedMessage | DisconnectedMessage | SendToGroupMessage | SendEventMessage | SequenceAckMessage | AckMessage | InvokeMessage | InvokeResponseMessage | CancelInvocationMessage;

// @public
export interface WebPubSubMessageBase {
Expand Down
28 changes: 27 additions & 1 deletion sdk/web-pubsub/web-pubsub-client/src/errors/index.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.

import type { AckMessageError } from "../models/messages.js";
import type { AckMessageError, InvokeResponseError } from "../models/messages.js";

/**
* Error when sending message failed
Expand Down Expand Up @@ -42,3 +42,29 @@ export interface SendMessageErrorOptions {
*/
errorDetail?: AckMessageError;
}

export interface InvocationErrorOptions {
invocationId: string;
Comment on lines +46 to +47
Copy link

Copilot AI Nov 26, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add JSDoc comments to the InvocationErrorOptions interface properties to be consistent with the pattern used for SendMessageErrorOptions above it.

Suggested change
export interface InvocationErrorOptions {
invocationId: string;
export interface InvocationErrorOptions {
/**
* The invocation id of the request.
*/
invocationId: string;
/**
* Error details from the service if available.
*/

Copilot uses AI. Check for mistakes.
errorDetail?: InvokeResponseError;
}

/**
* Error thrown when an invocation fails or is cancelled.
*/
export class InvocationError extends Error {
/**
* The invocation id of the request.
*/
public invocationId: string;
/**
* Error details from the service if available.
*/
public errorDetail?: InvokeResponseError;

constructor(message: string, options: InvocationErrorOptions) {
super(message);
this.name = "InvocationError";
this.invocationId = options.invocationId;
this.errorDetail = options.errorDetail;
}
}
161 changes: 161 additions & 0 deletions sdk/web-pubsub/web-pubsub-client/src/invocationManager.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.

import type { AbortSignalLike } from "@azure/abort-controller";
import type { InvokeResponseMessage } from "./models/messages.js";
import { InvocationError } from "./errors/index.js";

export interface InvocationWaitOptions {
abortSignal?: AbortSignalLike;
}

export interface InvocationRegistration {
invocationId: string;
wait(options?: InvocationWaitOptions): Promise<InvokeResponseMessage>;
}

/**
* Manages pending invocations awaiting invokeResponse frames.
*/
export class InvocationManager {
private readonly _entries = new Map<string, InvocationEntity>();
private _nextId = 0;

public registerInvocation(invocationId?: string): InvocationRegistration {
const resolvedId = invocationId ?? this._generateInvocationId();
if (this._entries.has(resolvedId)) {
throw new InvocationError("Invocation id is already registered.", {
invocationId: resolvedId,
});
}

const entity = new InvocationEntity(resolvedId);
this._entries.set(resolvedId, entity);
return {
invocationId: resolvedId,
wait: (options?: InvocationWaitOptions) => this._waitForEntry(entity, options),
};
}

public resolveInvocation(message: InvokeResponseMessage): boolean {
const entry = this._entries.get(message.invocationId);
if (!entry) {
return false;
}
this._entries.delete(message.invocationId);
entry.resolve(message);
return true;
}

public rejectInvocation(invocationId: string, reason: unknown): boolean {
const entry = this._entries.get(invocationId);
if (!entry) {
return false;
}
this._entries.delete(invocationId);
entry.reject(reason);
return true;
}

public discard(invocationId: string): void {
this._entries.delete(invocationId);
}

public rejectAll(createReason: (invocationId: string) => unknown): void {
this._entries.forEach((entry, invocationId) => {
if (this._entries.delete(invocationId)) {
entry.reject(createReason(invocationId));
}
});
}

private _waitForEntry(
entry: InvocationEntity,
options?: InvocationWaitOptions,
): Promise<InvokeResponseMessage> {
const waitPromise = entry.promise();
const abortSignal = options?.abortSignal;

if (!abortSignal) {
return waitPromise;
}

if (abortSignal.aborted) {
if (this._entries.delete(entry.invocationId)) {
entry.reject(this._createAbortError(entry.invocationId));
}
return waitPromise;
}

return new Promise<InvokeResponseMessage>((resolve, reject) => {
const onAbort = (): void => {
abortSignal.removeEventListener("abort", onAbort);
if (this._entries.delete(entry.invocationId)) {
entry.reject(this._createAbortError(entry.invocationId));
}
};

abortSignal.addEventListener("abort", onAbort);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

these listeners may leak if _sendMessage throws in _invokeEventAttempt?


waitPromise
.then((result) => {
abortSignal.removeEventListener("abort", onAbort);
resolve(result);
})
.catch((err) => {
abortSignal.removeEventListener("abort", onAbort);
reject(err);
});
});
}

private _generateInvocationId(): string {
this._nextId += 1;
return this._nextId.toString();
}

private _createAbortError(invocationId: string): InvocationError {
return new InvocationError("Invocation cancelled by abortSignal.", {
invocationId,
});
}
}

class InvocationEntity {
private readonly _promise: Promise<InvokeResponseMessage>;
private _resolve:
| ((value: InvokeResponseMessage | PromiseLike<InvokeResponseMessage>) => void)
| undefined;
private _reject: ((reason?: unknown) => void) | undefined;

constructor(public readonly invocationId: string) {
this._promise = new Promise<InvokeResponseMessage>((resolve, reject) => {
this._resolve = resolve;
this._reject = reject;
});
}

public promise(): Promise<InvokeResponseMessage> {
return this._promise;
}

public resolve(value: InvokeResponseMessage | PromiseLike<InvokeResponseMessage>): void {
const callback = this._resolve;
if (!callback) {
return;
}
this._resolve = undefined;
this._reject = undefined;
callback(value);
}

public reject(reason?: unknown): void {
const callback = this._reject;
if (!callback) {
return;
}
this._resolve = undefined;
this._reject = undefined;
callback(reason);
}
}
Loading
Loading