Skip to content

Commit

Permalink
fix(merge): catch promise errors to avoid unhandled exceptions (#354)
Browse files Browse the repository at this point in the history
* fix(merge): catch promise errors to avoid unhandled exceptions

fixes #353
  • Loading branch information
trxcllnt authored Dec 30, 2023
1 parent 1e4dd6f commit 520e096
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 18 deletions.
8 changes: 4 additions & 4 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,8 @@
"devDependencies": {
"@types/glob": "7.1.1",
"@types/jest": "27.4.0",
"@typescript-eslint/eslint-plugin": "^5.31.0",
"@typescript-eslint/parser": "^5.31.0",
"@typescript-eslint/eslint-plugin": "^6.16.0",
"@typescript-eslint/parser": "^6.16.0",
"abortcontroller-polyfill": "1.4.0",
"async-done": "1.3.2",
"benchmark": "2.1.4",
Expand All @@ -67,8 +67,8 @@
"coveralls": "3.0.9",
"cz-conventional-changelog": "3.1.0",
"del": "5.1.0",
"eslint": "^8.20.0",
"eslint-plugin-jest": "^26.6.0",
"eslint": "^8.56.0",
"eslint-plugin-jest": "^27.6.0",
"esm": "https://github.com/jsg2021/esm/releases/download/v3.x.x-pr883/esm-3.x.x-pr883.tgz",
"glob": "7.1.6",
"google-closure-compiler": "20220601.0.0",
Expand Down
28 changes: 14 additions & 14 deletions src/asynciterable/merge.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,12 @@ import { safeRace } from '../util/safeRace';
// eslint-disable-next-line @typescript-eslint/no-empty-function
const NEVER_PROMISE = new Promise(() => {});

type MergeResult<T> = { value: T; index: number };
type MergeResult<T> = { value: T; index: number; done?: boolean; error?: any };

function wrapPromiseWithIndex<T>(promise: Promise<T>, index: number) {
return promise.then((value) => ({ value, index })) as Promise<MergeResult<T>>;
function wrapPromiseWithIndex<T>(promise: Promise<IteratorResult<T>>, index: number) {
return promise
.then(({value, done}) => ({ value, done, index }))
.catch((error) => ({ error, index })) as Promise<MergeResult<T>>;
}

/** @ignore */
Expand All @@ -25,7 +27,7 @@ export class MergeAsyncIterable<T> extends AsyncIterableX<T> {
throwIfAborted(signal);
const length = this._source.length;
const iterators = new Array<AsyncIterator<T>>(length);
const nexts = new Array<Promise<MergeResult<IteratorResult<T>>>>(length);
const nexts = new Array<Promise<MergeResult<T>>>(length);
let active = length;
for (let i = 0; i < length; i++) {
const iterator = wrapWithAbort(this._source[i], signal)[Symbol.asyncIterator]();
Expand All @@ -34,18 +36,16 @@ export class MergeAsyncIterable<T> extends AsyncIterableX<T> {
}

while (active > 0) {
const next = safeRace(nexts);
const {
value: { done: done$, value: value$ },
index,
} = await next;
if (done$) {
nexts[index] = <Promise<MergeResult<IteratorResult<T>>>>NEVER_PROMISE;
const next = await safeRace(nexts);
if (next.hasOwnProperty('error')) {
throw next.error;
} else if (next.done) {
nexts[next.index] = <Promise<MergeResult<T>>>NEVER_PROMISE;
active--;
} else {
const iterator$ = iterators[index];
nexts[index] = wrapPromiseWithIndex(iterator$.next(), index);
yield value$;
const iterator$ = iterators[next.index];
nexts[next.index] = wrapPromiseWithIndex(iterator$.next(), next.index);
yield next.value;
}
}
}
Expand Down

0 comments on commit 520e096

Please sign in to comment.