Skip to content

Commit de864dd

Browse files
authored
[core-amqp] Update async methods to support cancellation (Azure#13835)
* [core-amqp] make CbsClient.negotiateClaim() cancellable * [core-amqp] make RequestResponseLink.create cancellable * [core-amqp] make CbsClient.init cancellable * [core-amqp] fix lint error * [core-amqp] update API review * [core-amqp] update 2.2.0 changelog with notes for issue 9988 * [core-amqp] update package.json version to 2.2.0 * [core-amqp] pass abortSignal to connection.open() * [core-amqp] rename RequestResponseLink.create(..., options) to RequestResponseLink.create(..., createOptions) * [core-amqp] rename RequestResponseLink.create(..., options) to RequestResponseLink.create(..., createOptions) - API review file
1 parent 829ac83 commit de864dd

File tree

8 files changed

+239
-17
lines changed

8 files changed

+239
-17
lines changed

sdk/core/core-amqp/CHANGELOG.md

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,12 @@
11
# Release History
22

3-
## 2.1.1 (Unreleased)
3+
## 2.2.0 (Unreleased)
44

5+
- Addresses issue [9988](https://github.com/Azure/azure-sdk-for-js/issues/9988)
6+
by updating the following operations to accept an `abortSignal` to allow cancellation:
7+
- CbsClient.init()
8+
- CbsClient.negotiateClaim()
9+
- RequestResponseLink.create()
510

611
## 2.1.0 (2021-02-08)
712

sdk/core/core-amqp/package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
{
22
"name": "@azure/core-amqp",
33
"sdk-type": "client",
4-
"version": "2.1.1",
4+
"version": "2.2.0",
55
"description": "Common library for amqp based azure sdks like @azure/event-hubs.",
66
"author": "Microsoft Corporation",
77
"license": "MIT",

sdk/core/core-amqp/review/core-amqp.api.md

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -90,8 +90,12 @@ export class CbsClient {
9090
connection: Connection;
9191
readonly connectionLock: string;
9292
readonly endpoint: string;
93-
init(): Promise<void>;
94-
negotiateClaim(audience: string, token: string, tokenType: TokenType): Promise<CbsResponse>;
93+
init(options?: {
94+
abortSignal?: AbortSignalLike;
95+
}): Promise<void>;
96+
negotiateClaim(audience: string, token: string, tokenType: TokenType, options?: {
97+
abortSignal?: AbortSignalLike;
98+
}): Promise<CbsResponse>;
9599
remove(): void;
96100
readonly replyTo: string;
97101
}
@@ -446,7 +450,9 @@ export class RequestResponseLink implements ReqResLink {
446450
constructor(session: Session, sender: Sender, receiver: Receiver);
447451
close(): Promise<void>;
448452
get connection(): Connection;
449-
static create(connection: Connection, senderOptions: SenderOptions, receiverOptions: ReceiverOptions): Promise<RequestResponseLink>;
453+
static create(connection: Connection, senderOptions: SenderOptions, receiverOptions: ReceiverOptions, createOptions?: {
454+
abortSignal?: AbortSignalLike;
455+
}): Promise<RequestResponseLink>;
450456
isOpen(): boolean;
451457
// (undocumented)
452458
receiver: Receiver;

sdk/core/core-amqp/src/cbs.ts

Lines changed: 28 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import {
1212
SenderOptions,
1313
generate_uuid
1414
} from "rhea-promise";
15+
import { AbortError, AbortSignalLike } from "@azure/abort-controller";
1516
import { Constants } from "./util/constants";
1617
import { logErrorStackTrace, logger } from "./log";
1718
import { translate } from "./errors";
@@ -71,15 +72,24 @@ export class CbsClient {
7172
/**
7273
* Creates a singleton instance of the CBS session if it hasn't been initialized previously on
7374
* the given connection.
75+
* @param options - Optional parameters that can be used to affect this method's behavior.
76+
* For example, `abortSignal` can be passed to allow cancelling an in-progress `init` invocation.
7477
* @returns Promise<void>.
7578
*/
76-
async init(): Promise<void> {
79+
async init(options: { abortSignal?: AbortSignalLike } = {}): Promise<void> {
80+
const { abortSignal } = options;
81+
const initAbortMessage = "The init operation has been cancelled by the user.";
82+
7783
try {
84+
if (abortSignal?.aborted) {
85+
throw new AbortError(initAbortMessage);
86+
}
87+
7888
// Acquire the lock and establish an amqp connection if it does not exist.
7989
if (!this.connection.isOpen()) {
8090
logger.verbose("The CBS client is trying to establish an AMQP connection.");
8191
await defaultLock.acquire(this.connectionLock, () => {
82-
return this.connection.open();
92+
return this.connection.open({ abortSignal });
8393
});
8494
}
8595

@@ -107,7 +117,8 @@ export class CbsClient {
107117
this._cbsSenderReceiverLink = await RequestResponseLink.create(
108118
this.connection,
109119
srOpt,
110-
rxOpt
120+
rxOpt,
121+
{ abortSignal }
111122
);
112123
this._cbsSenderReceiverLink.sender.on(SenderEvents.senderError, (context: EventContext) => {
113124
const id = context.connection.options.id;
@@ -179,15 +190,24 @@ export class CbsClient {
179190
* - **ManagementClient**
180191
* - `"sb://<your-namespace>.servicebus.windows.net/<event-hub-name>/$management"`.
181192
* @param token - The token that needs to be sent in the put-token request.
193+
* @param tokenType - The type of token being used. For example, 'jwt' or 'servicebus.windows.net:sastoken'.
194+
* @param options - Optional parameters that can be used to affect this method's behavior.
195+
* For example, `abortSignal` can be passed to allow cancelling an in-progress `negotiateClaim` invocation.
182196
* @returns A Promise that resolves when $cbs authentication is successful
183197
* and rejects when an error occurs during $cbs authentication.
184198
*/
185199
async negotiateClaim(
186200
audience: string,
187201
token: string,
188-
tokenType: TokenType
202+
tokenType: TokenType,
203+
options: { abortSignal?: AbortSignalLike } = {}
189204
): Promise<CbsResponse> {
205+
const { abortSignal } = options;
190206
try {
207+
if (abortSignal?.aborted) {
208+
throw new AbortError("The negotiateClaim operation has been cancelled by the user.");
209+
}
210+
191211
if (!this._cbsSenderReceiverLink) {
192212
throw new Error("Attempted to negotiate a claim but the CBS link does not exist.");
193213
}
@@ -203,7 +223,10 @@ export class CbsClient {
203223
type: tokenType
204224
}
205225
};
206-
const responseMessage = await this._cbsSenderReceiverLink.sendRequest(request);
226+
const responseMessage = await this._cbsSenderReceiverLink.sendRequest(request, {
227+
abortSignal,
228+
requestName: "negotiateClaim"
229+
});
207230
logger.verbose("[%s] The CBS response is: %O", this.connection.id, responseMessage);
208231
return this._fromRheaMessageResponse(responseMessage);
209232
} catch (err) {

sdk/core/core-amqp/src/requestResponseLink.ts

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -215,16 +215,20 @@ export class RequestResponseLink implements ReqResLink {
215215
* @param connection - The amqp connection.
216216
* @param senderOptions - Options that must be provided to create the sender link.
217217
* @param receiverOptions - Options that must be provided to create the receiver link.
218+
* @param createOptions - Optional parameters that can be used to affect this method's behavior.
219+
* For example, `abortSignal` can be passed to allow cancelling an in-progress `create` invocation.
218220
* @returns Promise<RequestResponseLink>
219221
*/
220222
static async create(
221223
connection: Connection,
222224
senderOptions: SenderOptions,
223-
receiverOptions: ReceiverOptions
225+
receiverOptions: ReceiverOptions,
226+
createOptions: { abortSignal?: AbortSignalLike } = {}
224227
): Promise<RequestResponseLink> {
225-
const session = await connection.createSession();
226-
const sender = await session.createSender(senderOptions);
227-
const receiver = await session.createReceiver(receiverOptions);
228+
const { abortSignal } = createOptions;
229+
const session = await connection.createSession({ abortSignal });
230+
const sender = await session.createSender({ ...senderOptions, abortSignal });
231+
const receiver = await session.createReceiver({ ...receiverOptions, abortSignal });
228232
logger.verbose(
229233
"[%s] Successfully created the sender and receiver links on the same session.",
230234
connection.id

sdk/core/core-amqp/test/cbs.spec.ts

Lines changed: 113 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,17 +2,83 @@
22
// Licensed under the MIT license.
33

44
import { assert } from "chai";
5+
import { AbortController } from "@azure/abort-controller";
6+
import { CbsClient, defaultLock, TokenType } from "../src";
7+
import { createConnectionStub } from "./utils/createConnectionStub";
58
import { Connection } from "rhea-promise";
69
import { stub } from "sinon";
7-
import { CbsClient, TokenType } from "../src";
810

911
describe("CbsClient", function() {
1012
const TEST_FAILURE = "Test failure";
13+
14+
describe("init", function() {
15+
it("honors already aborted abortSignal", async function() {
16+
const cbsClient = new CbsClient(new Connection(), "lock");
17+
18+
// Create an abort signal that is already aborted.
19+
const controller = new AbortController();
20+
controller.abort();
21+
const signal = controller.signal;
22+
23+
try {
24+
await cbsClient.init({ abortSignal: signal });
25+
throw new Error(TEST_FAILURE);
26+
} catch (err) {
27+
assert.equal(err.name, "AbortError");
28+
}
29+
});
30+
31+
it("honors abortSignal inside locking code", async function() {
32+
const lock = "lock";
33+
const cbsClient = new CbsClient(new Connection(), "lock");
34+
35+
// Create an abort signal that will be aborted on a future tick of the event loop.
36+
const controller = new AbortController();
37+
const signal = controller.signal;
38+
39+
// Make the existing `init` invocation wait until the abortSignal
40+
// is aborted before acquiring it's lock.
41+
await defaultLock.acquire(lock, (done) => {
42+
setTimeout(() => {
43+
controller.abort();
44+
done();
45+
}, 0);
46+
});
47+
48+
try {
49+
await cbsClient.init({ abortSignal: signal });
50+
throw new Error(TEST_FAILURE);
51+
} catch (err) {
52+
assert.equal(err.name, "AbortError");
53+
}
54+
});
55+
56+
it("honors abortSignal", async function() {
57+
const connectionStub = new Connection();
58+
// Stub 'open' because creating a real connection will fail.
59+
stub(connectionStub, "open").resolves({} as any);
60+
61+
const cbsClient = new CbsClient(connectionStub, "lock");
62+
63+
// Create an abort signal that will be aborted on a future tick of the event loop.
64+
const controller = new AbortController();
65+
const signal = controller.signal;
66+
setTimeout(() => controller.abort(), 0);
67+
68+
try {
69+
await cbsClient.init({ abortSignal: signal });
70+
throw new Error(TEST_FAILURE);
71+
} catch (err) {
72+
assert.equal(err.name, "AbortError");
73+
}
74+
});
75+
});
76+
1177
describe("negotiateClaim", function() {
1278
it("throws an error if the cbs link doesn't exist.", async function() {
13-
const connectionStub = stub(new Connection()) as any;
14-
79+
const connectionStub = createConnectionStub();
1580
const cbsClient = new CbsClient(connectionStub, "lock");
81+
1682
try {
1783
await cbsClient.negotiateClaim("audience", "token", TokenType.CbsTokenTypeSas);
1884
throw new Error(TEST_FAILURE);
@@ -23,5 +89,49 @@ describe("CbsClient", function() {
2389
);
2490
}
2591
});
92+
93+
describe("cancellation", function() {
94+
it("honors already aborted abortSignal", async function() {
95+
const connectionStub = createConnectionStub();
96+
const cbsClient = new CbsClient(connectionStub, "lock");
97+
98+
// Create an abort signal that is already aborted.
99+
const controller = new AbortController();
100+
controller.abort();
101+
const signal = controller.signal;
102+
103+
try {
104+
// Pass the already aborted abortSignal to make sure negotiateClaim will exit quickly.
105+
await cbsClient.negotiateClaim("audience", "token", TokenType.CbsTokenTypeSas, {
106+
abortSignal: signal
107+
});
108+
throw new Error(TEST_FAILURE);
109+
} catch (err) {
110+
assert.equal(err.name, "AbortError");
111+
}
112+
});
113+
114+
it("honors abortSignal", async function() {
115+
const connectionStub = createConnectionStub();
116+
const cbsClient = new CbsClient(connectionStub, "lock");
117+
118+
// Call `init()` to ensure the CbsClient has a RequestResponseLink.
119+
await cbsClient.init();
120+
121+
// Create an abort signal that will be aborted on a future tick of the event loop.
122+
const controller = new AbortController();
123+
const signal = controller.signal;
124+
setTimeout(() => controller.abort(), 0);
125+
126+
try {
127+
await cbsClient.negotiateClaim("audience", "token", TokenType.CbsTokenTypeSas, {
128+
abortSignal: signal
129+
});
130+
throw new Error(TEST_FAILURE);
131+
} catch (err) {
132+
assert.equal(err.name, "AbortError");
133+
}
134+
});
135+
});
26136
});
27137
});

sdk/core/core-amqp/test/requestResponse.spec.ts

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import {
1919
getCodeDescriptionAndError,
2020
onMessageReceived
2121
} from "../src/requestResponseLink";
22+
import { createConnectionStub } from "./utils/createConnectionStub";
2223
interface Window {}
2324
declare let self: Window & typeof globalThis;
2425

@@ -42,6 +43,48 @@ const assertItemsLengthInResponsesMap = (
4243
};
4344

4445
describe("RequestResponseLink", function() {
46+
const TEST_FAILURE = "Test failure";
47+
48+
describe("#create", function() {
49+
it("should create a RequestResponseLink", async function() {
50+
const connectionStub = createConnectionStub();
51+
const link = await RequestResponseLink.create(connectionStub, {}, {});
52+
assert.isTrue(link instanceof RequestResponseLink);
53+
});
54+
55+
it("honors already aborted abortSignal", async function() {
56+
const connection = new Connection();
57+
58+
// Create an abort signal that will be aborted on a future tick of the event loop.
59+
const controller = new AbortController();
60+
const signal = controller.signal;
61+
setTimeout(() => controller.abort(), 0);
62+
63+
try {
64+
await RequestResponseLink.create(connection, {}, {}, { abortSignal: signal });
65+
throw new Error(TEST_FAILURE);
66+
} catch (err) {
67+
assert.equal(err.name, "AbortError");
68+
}
69+
});
70+
71+
it("honors abortSignal", async function() {
72+
const connection = new Connection();
73+
74+
// Create an abort signal that is already aborted.
75+
const controller = new AbortController();
76+
controller.abort();
77+
const signal = controller.signal;
78+
79+
try {
80+
await RequestResponseLink.create(connection, {}, {}, { abortSignal: signal });
81+
throw new Error(TEST_FAILURE);
82+
} catch (err) {
83+
assert.equal(err.name, "AbortError");
84+
}
85+
});
86+
});
87+
4588
it("should send a request and receive a response correctly", async function() {
4689
const connectionStub = stub(new Connection());
4790
const rcvr = new EventEmitter();
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
// Copyright (c) Microsoft Corporation.
2+
// Licensed under the MIT license.
3+
4+
import EventEmitter from "events";
5+
import { Connection } from "rhea-promise";
6+
import { stub } from "sinon";
7+
8+
/**
9+
* Creates a stubbed rhea-promise Connection object.
10+
*/
11+
export function createConnectionStub(): Connection {
12+
const connectionStub = new Connection();
13+
stub(connectionStub, "open").resolves({} as any);
14+
stub(connectionStub, "createSession").resolves({
15+
connection: {
16+
id: "connection-1"
17+
},
18+
createSender: () => {
19+
const sender = new EventEmitter() as any;
20+
sender.send = () => {
21+
/* no-op */
22+
};
23+
return Promise.resolve(sender);
24+
},
25+
createReceiver: () => {
26+
return Promise.resolve(new EventEmitter());
27+
}
28+
} as any);
29+
stub(connectionStub, "id").get(() => "connection-1");
30+
return connectionStub;
31+
}

0 commit comments

Comments
 (0)