-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathPromise.ts
144 lines (124 loc) · 3.49 KB
/
Promise.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
class NOTHING {}
const nothing = new NOTHING();
export interface Suspendable<T> extends PromiseLike<T> {
getOrSuspend(): T;
}
class Deferred<T> implements Suspendable<T> {
promise: Promise<T>;
error: Error | null;
_value: T | NOTHING;
_resolve: (value: T) => void;
_reject: (err: Error) => void;
constructor() {
this._resolve = null as any;
this._reject = null as any;
this._value = nothing;
this.error = null;
this.promise = new Promise((resolve, reject) => {
this._resolve = resolve;
this._reject = reject;
}) as Promise<T>;
}
resolve = (value: T) => {
if (this.isCompleted) throw new Error("promise already completed");
this._value = value;
this._resolve(value);
};
reject = (error: Error) => {
if (this.isCompleted) throw new Error("promise already completed");
this.error = error;
this._reject(error);
};
get isResolved() {
return this._value !== nothing;
}
get isRejected() {
return this.error !== null;
}
get isCompleted() {
return this.isResolved || this.isRejected;
}
get value() {
if (this.isResolved) return this._value as T;
if (this.isRejected) throw this.error;
throw new Error("value is not yet available");
}
then<TResult1 = T, TResult2 = never>(
onfulfilled?:
| ((value: T) => TResult1 | PromiseLike<TResult1>)
| undefined
| null,
onrejected?:
| ((reason: any) => TResult2 | PromiseLike<TResult2>)
| undefined
| null
): Deferred<TResult1 | TResult2> {
return Deferred.ofPromise(this.promise.then(onfulfilled, onrejected));
}
getOrSuspend(): T {
if (this.isResolved) return this._value as T;
if (this.isRejected) throw this.error;
throw this.promise;
}
static ofPromise<T>(promise: T | PromiseLike<T>) {
let deferred = new Deferred<T>();
Promise.resolve(promise).then(deferred.resolve, deferred.reject);
return deferred;
}
}
export type { Deferred };
export function isDeferred<T>(
value: Deferred<T> | unknown
): value is Deferred<T> {
return value instanceof Deferred;
}
export function deferred<T>(): Deferred<T> {
return new Deferred<T>();
}
export function suspendable<T>(
f: () => T | PromiseLike<T>
): () => Suspendable<T> {
let deferred: Deferred<T> | null = null;
return () => {
if (deferred == null) {
deferred = Deferred.ofPromise(f());
}
return deferred;
};
}
export function sleep(ms: number): Promise<void> {
return new Promise((resolve) => setTimeout(resolve, ms));
}
/** Execute async tasks with limited concurrency. */
export type Pool = {
/** Pool size. */
size: number;
/** Execute `f` once there's a free slot in pool. */
run<T>(f: () => Promise<T>): Promise<T>;
};
/** Create a pool with `size` of simultaneously executing tasks. */
export function pool(size: number): Pool {
if (size <= 0) throw new Error("Pool: size must be > 0");
let slot: Deferred<void> = deferred();
return {
size,
async run<T>(f: () => Promise<T>): Promise<T> {
while (size === 0)
// No free slot, must wait for a free one. As there could me multiple
// tasks racing for the slot we wait in a loop.
await slot;
size = size - 1;
if (size === 0)
// Afterwards no free slot left, refresh deferred.
slot = deferred();
try {
return await f();
} finally {
if (size === 0)
// There's a free slot, resolve deferred.
slot.resolve();
size = size + 1;
}
},
};
}