-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathDistributedLock.mts
658 lines (587 loc) · 26.2 KB
/
DistributedLock.mts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
/**
* Distributed lock implementation using DynamoDB.
* @module distributed-lock
*/
import { DistributedLockDb } from "./DistributedLockDb.mjs";
import { Metrics } from "@aws-lambda-powertools/metrics";
import { logger, TMMLogger, LogLevelThreshold } from "@tmm/tools/logger";
import { uuidv7 } from "uuidv7";
import { setTimeout as setTimeoutPromise } from "timers/promises";
import * as Defaults from "./Constants.mjs";
/**
* Collaborators that can be injected into the DistributedLock constructor.
* Every member is typed as optional, but see the note on `lockDb`.
*/
type DistributedLockDependencies = {
// Persistence layer used for getLock / acquireLock / renewLock / releaseLock calls.
// Typed optional, but the DistributedLock constructor throws when it is missing.
lockDb?: DistributedLockDb;
// Logger from which a child logger is created; the constructor defaults it to the imported `logger`.
loggerInstance?: TMMLogger;
// Metrics sink; when omitted, all metric calls are skipped via optional chaining.
metricsInstance?: Metrics;
};
/**
* Identity of the lock record and of the caller that wants to hold it.
*/
type DistributedLockParams = {
// Identifier of the lock; must not be blank (validated by the constructor).
lockId: string;
// Group the lock belongs to; must not be blank (validated by the constructor).
lockGroup: string;
// Name of the owner requesting the lock; must not be blank (validated by the constructor).
ownerName: string;
// Must be positive; combined with appId into the `act:<accountId>/app:<appId>` record account string.
accountId: number;
// Must be positive; see accountId.
appId: number;
// Called on each acquisition attempt; its result is capped at DEFAULT_MAXIMUM_LOCK_DURATION_MS and
// passed to the lock store as the maximum lock duration. Defaults to returning that same constant.
// NOTE(review): presumably the remaining execution time of the host runtime — confirm with callers.
getRemainingTimeMs?: () => number;
};
/**
* A distributed lock implementation using DynamoDB.
*
* An instance tracks one lock record (lockId + lockGroup) on behalf of one owner. After a
* successful acquisition it arms a one-shot timeout and a periodic renewal, and exposes an
* AbortSignal that is aborted when the lock times out, fails to renew, or is released.
* @class DistributedLock
*/
class DistributedLock {
// Child logger created in the constructor with serviceName "DistributedLock".
private readonly logger: TMMLogger;
// Revision of the lock record held by this instance; null while no lock is held.
#lockRevision: string | null = null;
// Duration of the lock record. Also set to the current holder's duration when an acquisition
// attempt loses, which tryAcquireElseWaitForRelease uses to size its poll interval.
#lockDurationMs: number | null = null;
// True only between a confirmed acquisition and cleanupLock().
#hasLock = false;
// Epoch milliseconds at which the lock timeout fires (taken from the lock item's expiresAt).
#cancelAtTimeInMs: number | null = null;
// Created when the lock timeout is armed; aborted on timeout, renewal failure, and release.
#abortController: AbortController | null = null;
// Handle of the auto-renewal interval, or null when auto-renewal is not running.
#renewalInterval: NodeJS.Timeout | null = null;
// Handle of the lock timeout timer, or null when no timeout is armed.
#timeoutHandle: NodeJS.Timeout | null = null;
#lockId: string;
#lockGroup: string;
#ownerName: string;
readonly #lockDb: DistributedLockDb;
// createdAt of the acquired lock item; used to report the LockHoldDuration metric on release.
#acquiredAt: number | null = null;
// `act:<accountId>/app:<appId>` string sent to the lock store on acquisition.
#recordAccount: string;
// Written when another holder is detected during acquisition (their expiresAt minus now).
// NOTE(review): no code in this class reads this field — confirm whether it is still needed.
private maximumLockDurationMs: number;
// Supplies the time budget that caps the maximum lock duration on each acquisition.
private readonly getRemainingTimeMs: () => number;
private readonly metrics?: Metrics;
// Number of successful auto-renewals since the last release; reported as the LockRenewal metric.
private renewalCount = 0;
/**
 * Creates a lock handle for one lock record. No I/O is performed here; the lock is only
 * taken when acquireLock() or tryAcquireElseWaitForRelease() is called.
 *
 * @param params - Lock identity (lockId, lockGroup, ownerName), the positive accountId/appId
 *   used to build the record account string, and an optional getRemainingTimeMs callback that
 *   defaults to returning DEFAULT_MAXIMUM_LOCK_DURATION_MS.
 * @param dependencies - Injected lock store, logger (defaults to the shared `logger`) and metrics.
 * @throws {Error} If lockId, lockGroup or ownerName is blank, if accountId or appId is not a
 *   positive number (this includes NaN and undefined), or if lockDb is missing.
 */
constructor(
  { lockId, lockGroup, ownerName, accountId, appId, getRemainingTimeMs = () => Defaults.DEFAULT_MAXIMUM_LOCK_DURATION_MS }: DistributedLockParams,
  { lockDb, loggerInstance = logger, metricsInstance }: DistributedLockDependencies = {},
) {
  const emptyFields: string[] = [];
  if (!lockId?.trim()) emptyFields.push("lockId");
  if (!lockGroup?.trim()) emptyFields.push("lockGroup");
  if (!ownerName?.trim()) emptyFields.push("ownerName");
  if (emptyFields.length > 0) {
    throw new Error(`DistributedLock constructor: The following fields must not be empty: ${emptyFields.join(", ")}`);
  }
  // `!(x > 0)` rather than `x <= 0`: every comparison with NaN or undefined is false, so the
  // `<=` form let those values through and produced record accounts such as "act:NaN/app:1".
  if (!(accountId > 0)) {
    throw new Error("DistributedLock constructor: accountId must be a positive number");
  }
  if (!(appId > 0)) {
    throw new Error("DistributedLock constructor: appId must be a positive number");
  }
  this.getRemainingTimeMs = getRemainingTimeMs;
  //cannot use instanceof because of possible mocks.
  //while lockDb appears optional, it is defaulted in the lock factory and is never undefined.
  /* c8 ignore start */
  if (!lockDb) {
    throw new Error("DistributedLock constructor: lockDb is required");
  }
  /* c8 ignore end */
  this.logger = loggerInstance.createChild({ serviceName: "DistributedLock" });
  this.#lockId = lockId;
  this.#lockGroup = lockGroup;
  this.#ownerName = ownerName;
  // Give the field a definite value without invoking the caller's getRemainingTimeMs callback
  // at construction time; acquireLock() overwrites it when another holder is detected.
  this.maximumLockDurationMs = Defaults.DEFAULT_MAXIMUM_LOCK_DURATION_MS;
  this.#lockDb = lockDb;
  this.#recordAccount = `act:${accountId}/app:${appId}`;
  this.metrics = metricsInstance;
}
/** Identifier of the lock record this instance manages. */
get lockId(): string | null {
return this.#lockId;
}
/** Group of the lock record this instance manages. */
get lockGroup(): string | null {
return this.#lockGroup;
}
/** Owner name this instance uses when acquiring and releasing the lock. */
get ownerName(): string | null {
return this.#ownerName;
}
/** Whether this instance currently believes it holds the lock (local state; no store lookup). */
get hasLock(): boolean {
return this.#hasLock ?? false;
}
/** Revision of the held lock record, or null when no lock is held. */
get lockRevision(): string | null {
return this.#lockRevision;
}
/** Whether the auto-renewal interval is currently scheduled. */
get hasRenewal(): boolean {
return this.#renewalInterval !== null;
}
/**
* Gets the abort signal for the current lock.
* @returns The signal of the current abort controller, or undefined (not null) when no
* controller exists — e.g. before the lock timeout has been armed, or after triggerAbort()
* or a failed release discarded the controller.
*/
get signal(): AbortSignal | undefined {
return this.#abortController?.signal;
}
/**
 * Arms a one-shot timer that fires at `#cancelAtTimeInMs`. When it fires, the abort signal is
 * triggered and the lock is released. Does nothing when no lock is held or no cancel time is set.
 *
 * A fresh AbortController is created each time the timeout is armed.
 *
 * @throws {Error} If the cancel time is already in the past.
 */
#setupLockTimeout() {
  if (!this.#cancelAtTimeInMs || !this.#hasLock) return;
  const timeoutMs = this.#cancelAtTimeInMs - Date.now();
  if (timeoutMs <= 0) throw new Error("Lock timeout has already expired");
  this.#abortController = new AbortController();
  this.#timeoutHandle = setTimeout(async () => {
    this.metrics?.addMetric("LockTimeout", "Count", 1);
    // Abort first so consumers stop work before the release round-trip completes.
    this.#abortController?.abort();
    try {
      await this.releaseLock();
      /* c8 ignore start */
    } catch (error) {
      // If release fails, ensure we're still aborted.
      this.#abortController?.abort();
      // Nothing awaits a timer callback, so rethrowing here could only surface as an
      // unhandled promise rejection. Log the failure instead.
      this.logger.error("LockTimeoutReleaseError", { error });
    }
    /* c8 ignore end */
  }, timeoutMs);
}
/**
 * Starts a periodic renewal of the held lock. Does nothing unless a lock, its duration and its
 * revision are all present.
 *
 * The interval is one third of the lock duration, but never less than
 * DEFAULT_RENEWAL_INTERVAL_MS. On a renewal error other than "Lock renewal aborted", the lock is
 * released (when still held) and the abort signal is triggered.
 *
 * NOTE(review): if the lock duration is shorter than DEFAULT_RENEWAL_INTERVAL_MS, the first
 * renewal is scheduled after the lock has already lapsed — confirm the constants rule this out.
 */
#setupAutoRenewal() {
  if (!this.#hasLock || !this.#lockDurationMs || !this.#lockRevision) return;
  // 1/3 allows missing one renewal without losing the lock.
  const renewalInterval = Math.max(Defaults.DEFAULT_RENEWAL_INTERVAL_MS, Math.floor(this.#lockDurationMs / 3));
  this.#renewalInterval = setInterval(async () => {
    try {
      // The lock may have been released since the interval was scheduled; stop renewing.
      if (!this.#hasLock) {
        if (this.#renewalInterval) {
          clearInterval(this.#renewalInterval);
          this.#renewalInterval = null;
        }
        this.#abortController?.abort();
        return;
      }
      const startTime = Date.now();
      const baseLockDuration = this.#lockDurationMs ?? Defaults.DEFAULT_LOCK_DURATION_MS;
      await this.renewLock();
      this.renewalCount++;
      const renewalDuration = Date.now() - startTime;
      if (renewalDuration > renewalInterval) {
        this.metrics?.addMetric("LongRenewalCall", "Count", 1);
        this.logger.warn("Lock renewal took longer than interval", {
          renewalDuration,
          renewalInterval,
          baseLockDuration,
        });
      }
    } catch (error) {
      this.logger.error("LockRenewalError", { error });
      // Don't release the lock if the renewal was just aborted or if we don't have it anymore.
      const wasAborted = error instanceof Error && error.message === "Lock renewal aborted";
      if (!wasAborted && this.#hasLock) {
        try {
          // A single release is enough: releaseLock() is a no-op once the lock is gone, so the
          // former second call for "Lock renewal failed" errors could never do any work.
          await this.releaseLock();
        } catch (releaseError) {
          // Nothing awaits an interval callback; an escaping rejection would be unhandled.
          this.logger.error("LockRenewalReleaseError", { error: releaseError });
        }
      }
      this.#abortController?.abort();
    }
  }, renewalInterval);
}
/**
 * Disarms the pending lock timeout, if any. The lock itself stays held.
 */
cancelTimeout() {
  const pendingTimeout = this.#timeoutHandle;
  if (!pendingTimeout) return;
  clearTimeout(pendingTimeout);
  this.#timeoutHandle = null;
}
/**
 * Stops the periodic renewal, if it is running. The lock itself stays held.
 */
cancelAutoRenewal() {
  const activeInterval = this.#renewalInterval;
  if (!activeInterval) return;
  clearInterval(activeInterval);
  this.#renewalInterval = null;
}
/**
* Cancels both the lock timeout and the auto-renewal without releasing the lock.
* The lock record in the store and the local lock state are left untouched.
*/
cancelAll() {
this.cancelTimeout();
this.cancelAutoRenewal();
}
/**
 * Compares the locally remembered lock with the record in the lock store.
 * @returns False when the stored record exists and carries this instance's revision and owner
 *   name; true in every other case (record missing, different revision, or different owner).
 */
async checkForStaleLock(): Promise<boolean> {
  const stored = await this.#lockDb.getLock({
    lockId: this.#lockId,
    lockGroup: this.#lockGroup,
  });
  if (!stored) return true;
  return stored.revision !== this.#lockRevision || stored.ownerName !== this.#ownerName;
}
/**
* Attempts to acquire the lock for the specified duration.
*
* Steps, in order:
* 1. If this instance already believes it holds the lock, verify that against the store; return
*    true when still valid, otherwise clear local state and throw (stale lock).
* 2. Read the current record; return false when it is unreleased and still within its duration.
* 3. After a random delay of up to 200ms, write a new revision conditioned on the revision read in step 2.
* 4. Re-read the record and confirm the new revision before arming the timeout and auto-renewal.
*
* @param lockDurationMs - Duration in milliseconds for which to hold the lock. Defaults to
* DEFAULT_LOCK_DURATION_MS; values below MINIMUM_LOCK_DURATION_MS are raised to that minimum.
* @returns True if this instance holds the lock when the call returns, false if another holder has it
* @throws {LockAcquisitionError} On a stale local lock, when the store returns no item, or when
* any other error occurs (the original error is attached as `cause`).
*/
async acquireLock(lockDurationMs = Defaults.DEFAULT_LOCK_DURATION_MS): Promise<boolean> {
const startTime = Date.now();
try {
// Prevent acquiring a lock when we already have one
if (this.#hasLock) {
const isStale = await this.checkForStaleLock();
if (!isStale) {
return true;
}
// Local state says "held" but the store disagrees: drop timers and local state, then fail loudly.
if (isStale) {
this.metrics?.addMetric("StaleLock", "Count", 1);
const revision = this.#lockRevision;
this.cleanupTimers();
this.cleanupLock();
throw new LockAcquisitionError("Stale Lock", {
cause: {
staleRevision: revision,
reason: "stale_lock",
},
});
}
}
const effectiveLockDurationMs = Math.max(lockDurationMs, Defaults.MINIMUM_LOCK_DURATION_MS);
if (effectiveLockDurationMs > lockDurationMs) {
this.logger.warn("Lock duration was below minimum, automatically increased", {
requestedDuration: lockDurationMs,
effectiveDuration: effectiveLockDurationMs,
minimumDuration: Defaults.MINIMUM_LOCK_DURATION_MS,
});
}
// Clear any existing timeouts/intervals
this.cancelAll();
// Get current lock state to obtain revision
const currentLock = await this.#lockDb.getLock({
lockId: this.#lockId,
lockGroup: this.#lockGroup,
});
// If the lock is recently acquired or renewed, we can't acquire it.
// The holder's duration is remembered so tryAcquireElseWaitForRelease can size its poll interval.
if (currentLock && !currentLock.isReleased && currentLock.createdAt + currentLock.lockDurationMs > Date.now()) {
this.#hasLock = false;
this.#lockDurationMs = currentLock.lockDurationMs;
this.maximumLockDurationMs = currentLock.expiresAt - Date.now();
return false;
}
const currentRevision = currentLock?.revision ?? null;
const newRevision = uuidv7();
const effectiveMaximumLockDurationMs = Math.min(this.getRemainingTimeMs(), Defaults.DEFAULT_MAXIMUM_LOCK_DURATION_MS);
// Add jitter to lock acquisition to help prevent thundering herd
await setTimeoutPromise(Math.ceil(Math.random() * 200));
const lockItem = await this.#lockDb.acquireLock({
lockId: this.#lockId,
lockGroup: this.#lockGroup,
ownerName: this.#ownerName,
lockDurationMs: effectiveLockDurationMs,
currentRevision,
lockMaximumDurationMs: effectiveMaximumLockDurationMs,
newRevision,
recordAccount: this.#recordAccount,
});
if (!lockItem) {
// Probably never gets here, because the only way this should happen
// is if there was a DynamoDB error, which would throw past this.
// The check is still needed to guard the types.
throw new LockAcquisitionError("Lock acquisition failed: no lock item returned", {
cause: {
revision: newRevision,
reason: "no_lock_item",
},
});
}
// The store returned a record with a different revision: someone else won the write.
if (lockItem.revision !== newRevision) {
this.logger.debug("LockRenewal:LockNotAcquired", {
lockId: this.#lockId,
lockGroup: this.#lockGroup,
revision: lockItem.revision,
lockItem,
newRevision,
currentRevision,
});
this.#hasLock = false;
this.#lockDurationMs = lockItem.lockDurationMs;
this.#lockRevision = null;
this.maximumLockDurationMs = lockItem.expiresAt - Date.now();
return false;
}
// Read the record back to confirm our revision is the one that was stored.
const recheckLock = await this.#lockDb.getLock({
lockId: this.#lockId,
lockGroup: this.#lockGroup,
nowTimestamp: Date.now(),
});
if (!recheckLock) {
// Probably never gets here, as this is a DynamoDB error that would throw past this.
throw new LockAcquisitionError("Lock acquisition failed: no recheck lock item returned", {
cause: {
revision: newRevision,
reason: "no_recheck_lock_item",
},
});
}
if (recheckLock.revision !== newRevision) {
this.logger.info("LockRenewal:LockNotAcquired", {
lockId: this.#lockId,
lockGroup: this.#lockGroup,
revision: recheckLock.revision,
recheckLock,
});
// Different owner/instance has the lock
this.#hasLock = false;
this.#lockDurationMs = null;
this.#lockRevision = null;
return false;
}
// We have the lock
this.#lockRevision = lockItem.revision;
this.#hasLock = true;
this.#lockDurationMs = lockItem.lockDurationMs;
// NOTE(review): an earlier comment here described a 500ms buffer before expiration for other
// polling processes, but no offset is applied — the timeout fires at expiresAt. Confirm which is intended.
this.#cancelAtTimeInMs = lockItem.expiresAt;
this.#acquiredAt = lockItem.createdAt;
// NOTE(review): #setupLockTimeout throws when expiresAt is already in the past; that error is
// wrapped below while #hasLock stays true with no timers armed — confirm this is acceptable.
this.#setupLockTimeout();
this.#setupAutoRenewal();
const lockDuration = Date.now() - startTime;
this.metrics?.addMetric("LockAcquisitionDuration", "Milliseconds", lockDuration);
return true;
} catch (error) {
// Rethrow LockAcquisitionError instances
if (error instanceof LockAcquisitionError) {
throw error;
}
// Anything else is logged and wrapped so callers only need to handle one error type.
this.logger.error("Lock acquisition failed", { error });
throw new LockAcquisitionError("LockAcquisitionError", { cause: error });
}
}
/**
 * Stops the auto-renewal interval and the lock timeout, whichever are active, and forgets
 * their handles. Lock state and the abort controller are not touched.
 */
cleanupTimers() {
  const renewal = this.#renewalInterval;
  if (renewal) {
    clearInterval(renewal);
    this.#renewalInterval = null;
  }
  const timeout = this.#timeoutHandle;
  if (timeout) {
    clearTimeout(timeout);
    this.#timeoutHandle = null;
  }
}
/**
 * Resets the local lock state (held flag, revision, duration) and triggers the abort signal
 * when a controller exists. The controller itself is kept, and the lock store is not contacted.
 */
cleanupLock() {
  this.#hasLock = false;
  this.#lockRevision = null;
  this.#lockDurationMs = null;
  this.#abortController?.abort();
}
/**
 * Releases the currently held lock. Returns immediately when no lock is held.
 *
 * Timers are stopped before the store is contacted so that no renewal can race the release.
 * On success the hold duration is reported and local state is cleared, which also aborts the
 * signal. On failure the signal is aborted, the controller is discarded and the error is
 * rethrown. The renewal count and release duration metrics are emitted either way.
 *
 * @throws {Error} If the lock is flagged as held but no revision is recorded, or if the
 *   release ultimately fails.
 */
async releaseLock(): Promise<void> {
  if (!this.#hasLock) return;
  if (!this.#lockRevision) {
    // Mostly a type guard.
    throw new Error("DistributedLock does not have a revision, yet has the lock");
  }
  const releaseStartedAt = Date.now();
  try {
    // Stop timers first so no new renewal starts during the release.
    this.cleanupTimers();
    await this.#attemptRelease();
    const acquiredAt = this.#acquiredAt;
    if (acquiredAt !== null) {
      const heldForMs = Date.now() - acquiredAt;
      this.metrics?.addMetric("LockHoldDuration", "Milliseconds", heldForMs);
      this.#acquiredAt = null;
    }
    // Only clear local lock state once the store confirmed the release.
    this.cleanupLock();
  } catch (error) {
    this.#abortController?.abort();
    this.#abortController = null;
    throw error;
  } finally {
    const elapsedMs = Date.now() - releaseStartedAt;
    this.metrics?.addMetric("LockRenewal", "Count", this.renewalCount);
    this.renewalCount = 0;
    this.metrics?.addMetric("LockReleaseDuration", "Milliseconds", elapsedMs);
  }
}
/**
 * Tries to release the lock in the lock store, retrying with exponential backoff.
 *
 * Each attempt first re-checks that the lock is still held locally and returns quietly if it
 * is not. An attempt counts as failed when the store call throws, or when the returned item is
 * still unreleased under this instance's revision.
 *
 * The backoff before retry `k` is `(maxBackoffMs / maxRetries) * 2^k`, capped at `maxBackoffMs`,
 * plus up to 100ms of random jitter. There is no wait after the final failed attempt.
 *
 * @param attempt - Attempt number to start from (0-based)
 * @param maxRetries - Maximum number of attempts allowed
 * @param maxBackoffMs - Maximum backoff time in milliseconds
 * @throws {Error} When all attempts fail; the last underlying error is attached as `cause`.
 */
async #attemptRelease(attempt = 0, maxRetries = 5, maxBackoffMs = 2000): Promise<void> {
  let lastError: unknown;
  // Iterating keeps maxRetries and maxBackoffMs in effect for every attempt; the recursive
  // form passed only the attempt counter, so later attempts reverted to the defaults.
  for (let current = attempt; current < maxRetries; current++) {
    if (!this.#hasLock || !this.#lockRevision) {
      return;
    }
    try {
      const lockItem = await this.#lockDb.releaseLock({
        lockId: this.#lockId,
        lockGroup: this.#lockGroup,
        ownerName: this.#ownerName,
        revision: this.#lockRevision,
      });
      if (!lockItem) {
        return;
      }
      if (!lockItem.isReleased && lockItem.revision === this.#lockRevision) {
        throw new Error("Lock release failed");
      }
      return;
    } catch (error) {
      //real world this would only happen if dynamodb has errors we don't handle within the db layer.
      lastError = error;
      const isLastAttempt = current + 1 >= maxRetries;
      if (!isLastAttempt) {
        const backoffMs = Math.min((maxBackoffMs / maxRetries) * Math.pow(2, current), maxBackoffMs);
        await setTimeoutPromise(backoffMs + Math.random() * 100);
      }
    }
  }
  // The lock may have been cleared while we were waiting; nothing left to release in that case.
  if (!this.#hasLock || !this.#lockRevision) {
    return;
  }
  throw new Error(`Failed to release lock after ${maxRetries} attempts`, { cause: lastError });
}
/**
* First tries to acquire a lock. If that fails (else path), waits for the existing lock to be released.
*
* [DO NOT REMOVE] This method implements a thundering herd coordination pattern:
* - When multiple processes need the same data (e.g., from an API with rate limits)
* - First process to acquire the lock fetches from API and stores in database
* - Other processes (else path) wait for RELEASED state, then read from database
* - This ensures only ONE process hits rate-limited APIs while others use cached data
*
* [DO NOT REMOVE] IMPORTANT: This method does NOT attempt to acquire the lock after seeing
* it was released. This is intentional - once a lock is released, the data should already
* be in the database, so other processes should read that data instead of trying to
* acquire the lock and refetch it. The "else" in the method name emphasizes this design:
* either you get the lock initially, or else you wait for release.
*
* [DO NOT REMOVE] Return values:
* - ACQUIRED: You got the lock initially, you should fetch data and store it
* - RELEASED: You hit the else path, another process did the work, read from database
* - FAILED: Timeout occurred in the else path, handle as error case
*
* FAILED is also returned when the initial acquisition throws a LockAcquisitionError; any
* other error is logged and rethrown.
*
* @param lockDurationMs - How long the lock should be held if acquired
* @param pollIntervalMs - How often to check if lock is released (else path); the effective
* interval is never shorter than half the current holder's lock duration
* @param maxWaitMs - Maximum time to wait for lock release (else path)
*/
async tryAcquireElseWaitForRelease(
lockDurationMs = Defaults.DEFAULT_LOCK_DURATION_MS,
pollIntervalMs = Defaults.DEFAULT_LOCK_POLL_INTERVAL_MS,
maxWaitMs = Defaults.DEFAULT_LOCK_MAX_WAIT_MS,
): Promise<"ACQUIRED" | "RELEASED" | "FAILED"> {
const startTime = Date.now();
let pollCount = 0;
let waitStart = 0;
let waitForLock = false;
// NOTE(review): the `method` value in the log metadata below ("tryAcquireElseWaitForExpire")
// differs from this method's name.
this.logger.trace(
"Attempting to acquire or wait for lock",
{
lockId: this.#lockId,
lockGroup: this.#lockGroup,
ownerName: this.#ownerName,
lockDurationMs,
pollIntervalMs,
maxWaitMs,
},
{ method: "tryAcquireElseWaitForExpire" },
);
try {
this.logger.trace("TryAcquireElseWaitForRelease", { inputs: { lockDurationMs, pollIntervalMs, maxWaitMs } });
const acquiredLock = await this.acquireLock(lockDurationMs);
this.logger.debug("TryAcquireElseWaitForRelease:InitialAcquireResult", { acquiredLock: acquiredLock ? "true" : "false" });
if (acquiredLock) {
this.logger.debug("TryAcquireElseWaitForRelease:LockAcquisitionSuccess");
this.metrics?.addMetric("LockAcquisitionFailed", "Count", 0);
this.metrics?.addMetric("LockAcquisitionSuccess", "Count", 1);
this.metrics?.addMetric("LockAcquisitionError", "Count", 0);
return "ACQUIRED";
}
// Else path: another holder has the lock; poll until it is released or considered abandoned.
this.metrics?.addMetric("LockAcquisitionSuccess", "Count", 0);
waitStart = Date.now();
waitForLock = true;
this.logger.trace("TryAcquireElseWaitForRelease:FailedToAcquireLockImmediately", { waitStart });
// #lockDurationMs was set by acquireLock() to the current holder's duration (when it was known).
const calculatedPollInterval = Math.max(pollIntervalMs, (this.#lockDurationMs ?? Defaults.DEFAULT_LOCK_DURATION_MS) / 2);
// The deadline is measured from method entry (startTime), so time spent in the initial
// acquireLock() call counts against maxWaitMs. Because the sleep precedes each poll, the
// total wait can exceed maxWaitMs by up to one poll interval.
while (Date.now() - startTime < maxWaitMs) {
this.logger.trace("TryAcquireElseWaitForRelease:PollingForLockRelease", { pollCount: pollCount + 1 });
await setTimeoutPromise(calculatedPollInterval);
const lockItem = await this.#lockDb.getLock({
lockId: this.#lockId,
lockGroup: this.#lockGroup,
nowTimestamp: Date.now(),
});
this.logger.trace("TryAcquireElseWaitForRelease:LockPollResult", { lockItem, pollCount: pollCount + 1 });
pollCount++;
// no lock item, item released, hard expired, or not being renewed.
if (!lockItem || lockItem.isReleased || lockItem.expiresAt <= Date.now() || lockItem.createdAt + lockItem.lockDurationMs + calculatedPollInterval <= Date.now()) {
// The diagnostic breakdown is only computed when debug logging is enabled.
if (this.logger.level <= LogLevelThreshold.DEBUG) {
const noLockItem = !lockItem ? "TRUE" : "FALSE";
const expired = lockItem && lockItem.expiresAt <= Date.now() ? "TRUE" : "FALSE";
const notRenewed = lockItem && lockItem?.createdAt + lockItem.lockDurationMs + calculatedPollInterval <= Date.now() ? "TRUE" : "FALSE";
this.logger.debug("TryAcquireElseWaitForRelease:LockWasReleasedOrExpired", { pollCount, cause: { noLockItem, expired, notRenewed } });
}
// Records whether the lock lapsed (expired / not renewed) rather than being explicitly released.
if (lockItem) {
this.metrics?.addMetric("LockNotReleased", "Count", !lockItem.isReleased ? 1 : 0);
}
this.metrics?.addMetric("LockAcquisitionWaitTimeout", "Count", 0);
return "RELEASED";
}
this.logger.trace("TryAcquireElseWaitForRelease:LockStillHeld", {
calculatedPollInterval,
currentHolder: lockItem.ownerName,
expiresAt: new Date(lockItem.expiresAt).toISOString(),
});
}
this.logger.error("TryAcquireElseWaitForRelease:LockWaitTimeoutExceeded", {
waitDuration: Date.now() - waitStart,
maxWaitMs,
pollCount,
});
this.metrics?.addMetric("LockAcquisitionError", "Count", 0);
this.metrics?.addMetric("LockAcquisitionWaitTimeout", "Count", 1);
return "FAILED";
} catch (error) {
// Acquisition errors are reported as a FAILED result instead of being thrown.
if (error instanceof LockAcquisitionError) {
this.metrics?.addMetric("LockAcquisitionError", "Count", 1);
this.logger.warn("LockAcquisitionError", { error }, { method: "tryAcquireElseWaitForExpire" });
return "FAILED";
}
this.logger.error("Unexpected error in tryAcquireElseWaitForExpire", { error }, { method: "tryAcquireElseWaitForExpire" });
throw error;
} finally {
// Wait metrics are emitted on every exit path; duration and poll count only when the else path was entered.
this.metrics?.addMetric("WaitForLock", "Count", waitForLock ? 1 : 0);
if (waitForLock) {
this.metrics?.addMetric("LockAcquisitionWaitDuration", "Milliseconds", Date.now() - waitStart);
this.metrics?.addMetric("LockPollCount", "Count", pollCount);
}
}
}
/**
 * Renews the currently held lock in the lock store.
 *
 * The renewal is requested with the duration stored at acquisition time. The abort signal of
 * the current controller, if any, is passed along with the request.
 *
 * @param lockDurationMs - Fallback duration in milliseconds, used only when no lock duration is
 *   stored on this instance. Defaults to DEFAULT_LOCK_DURATION_MS
 * @throws {Error} If the lock is not held, the revision is missing, or the store returns an
 *   item whose revision differs from the one held
 */
async renewLock(lockDurationMs = Defaults.DEFAULT_LOCK_DURATION_MS): Promise<void> {
  if (!this.#hasLock) throw new Error("DistributedLock does not have the lock");
  // Type guard: a held lock always carries a revision.
  if (!this.#lockRevision) throw new Error("DistributedLock does not have a revision, yet has the lock");

  const renewed = await this.#lockDb.renewLock({
    lockId: this.#lockId,
    lockGroup: this.#lockGroup,
    revision: this.#lockRevision,
    lockDurationMs: this.#lockDurationMs ?? lockDurationMs,
    signal: this.#abortController?.signal,
  });

  // Compare against the revision as it is now, after the await, not a value captured earlier.
  const stillOurs = renewed.revision === this.#lockRevision;
  if (!stillOurs) {
    throw new Error("Lock renewal failed: revision mismatch");
  }
}
/**
 * Aborts the current abort controller and discards it. Does nothing when no controller exists.
 * Lock state and timers are left as they are.
 */
triggerAbort(): void {
  const controller = this.#abortController;
  if (!controller) return;
  controller.abort();
  this.#abortController = null;
}
}
/**
 * Error raised when a lock could not be acquired. The underlying reason (another error, or a
 * small descriptor object such as `{ reason: "stale_lock" }`) travels in `cause`.
 */
class LockAcquisitionError extends Error {
  /**
   * @param message - Human-readable description of the failure
   * @param options - Standard error options; `cause` carries the underlying reason. Optional,
   *   mirroring the built-in Error constructor.
   */
  constructor(message: string, options?: ErrorOptions) {
    super(message, options);
    this.name = "LockAcquisitionError";
  }
}
export { DistributedLock, LockAcquisitionError };
export type { DistributedLockDependencies, DistributedLockParams };