diff --git a/.nvmrc b/.nvmrc index 3c03207..2bd5a0a 100644 --- a/.nvmrc +++ b/.nvmrc @@ -1 +1 @@ -18 +22 diff --git a/jest.config.js b/jest.config.js index 85190fb..06218e7 100644 --- a/jest.config.js +++ b/jest.config.js @@ -1,8 +1,6 @@ -/* - * For a detailed explanation regarding each configuration property, visit: - * https://jestjs.io/docs/configuration - */ +// @ts-check +/** @type {import('jest').Config} */ module.exports = { // All imported modules in your tests should be mocked automatically // automock: false, diff --git a/package-lock.json b/package-lock.json index 23c0e57..c9febd6 100644 --- a/package-lock.json +++ b/package-lock.json @@ -13,6 +13,7 @@ "@fastify/swagger": "^8.3.1", "@fastify/type-provider-typebox": "^3.2.0", "@sinclair/typebox": "^0.28.20", + "@types/node": "^22.14.1", "fastify": "^4.3.0", "fastify-metrics": "^10.2.0", "node-pg-migrate": "^6.2.2", @@ -26,7 +27,7 @@ "@commitlint/cli": "^17.5.0", "@commitlint/config-conventional": "^17.4.4", "@stacks/eslint-config": "^1.2.0", - "@types/jest": "^29.5.0", + "@types/jest": "^29.5.14", "@typescript-eslint/eslint-plugin": "^5.56.0", "@typescript-eslint/parser": "^5.56.0", "babel-jest": "^29.5.0", @@ -35,15 +36,15 @@ "eslint-plugin-prettier": "^4.2.1", "eslint-plugin-tsdoc": "^0.2.17", "husky": "^8.0.3", - "jest": "^29.5.0", + "jest": "^29.7.0", "prettier": "^2.8.6", "rimraf": "^4.4.1", - "ts-jest": "^29.0.5", - "ts-node": "^10.9.1", - "typescript": "^5.0.2" + "ts-jest": "^29.3.1", + "ts-node": "^10.9.2", + "typescript": "^5.8.2" }, "engines": { - "node": ">=18" + "node": ">=22" } }, "node_modules/@aashutoshrathi/word-wrap": { @@ -2803,10 +2804,11 @@ } }, "node_modules/@types/jest": { - "version": "29.5.5", - "resolved": "https://registry.npmjs.org/@types/jest/-/jest-29.5.5.tgz", - "integrity": "sha512-ebylz2hnsWR9mYvmBFbXJXr+33UPc4+ZdxyDXh5w0FlPBTfCVN3wPL+kuOiQt3xvrK419v7XWeAs+AeOksafXg==", + "version": "29.5.14", + "resolved": "https://registry.npmjs.org/@types/jest/-/jest-29.5.14.tgz", + "integrity": "sha512-ZN+4sdnLUbo8EVvVc2ao0GFW6oVrQRPn4K2lglySj7APvSrgzxHiNNK99us4WDMi57xxA2yggblIAMNhXOotLQ==", "dev": true, + "license": "MIT", "dependencies": { "expect": "^29.0.0", "pretty-format": "^29.0.0" @@ -2831,9 +2833,13 @@ "dev": true }, "node_modules/@types/node": { - "version": "20.8.2", - "resolved": "https://registry.npmjs.org/@types/node/-/node-20.8.2.tgz", - "integrity": "sha512-Vvycsc9FQdwhxE3y3DzeIxuEJbWGDsnrxvMADzTDF/lcdR9/K+AQIeAghTQsHtotg/q0j3WEOYS/jQgSdWue3w==" + "version": "22.14.1", + "resolved": "https://registry.npmjs.org/@types/node/-/node-22.14.1.tgz", + "integrity": "sha512-u0HuPQwe/dHrItgHHpmw3N2fYCR6x4ivMNbPHRkBVP4CvN+kiRrKHWk3i8tXiO/joPwXLMYvF9TTF0eqgHIuOw==", + "license": "MIT", + "dependencies": { + "undici-types": "~6.21.0" + } }, "node_modules/@types/normalize-package-data": { "version": "2.4.2", @@ -3545,6 +3551,13 @@ "node": ">=0.10.0" } }, + "node_modules/async": { + "version": "3.2.6", + "resolved": "https://registry.npmjs.org/async/-/async-3.2.6.tgz", + "integrity": "sha512-htCUDlxyyCLMgaM3xXg0C0LW2xqfuQ6p05pCEIsXuyQ+a1koYKTuBMzRNwmybfLgvJDMd0r1LTn4+E0Ti6C2AA==", + "dev": true, + "license": "MIT" + }, "node_modules/atomic-sleep": { "version": "1.0.0", "resolved": "https://registry.npmjs.org/atomic-sleep/-/atomic-sleep-1.0.0.tgz", @@ -4514,6 +4527,22 @@ "node": ">=8" } }, + "node_modules/ejs": { + "version": "3.1.10", + "resolved": "https://registry.npmjs.org/ejs/-/ejs-3.1.10.tgz", + "integrity": "sha512-UeJmFfOrAQS8OJWPZ4qtgHyWExa088/MtK5UEyoJGFH67cDEXkZSviOiKRCZ4Xij0zxI3JECgYs3oKx+AizQBA==", + "dev": true, + "license": "Apache-2.0", + "dependencies": { + "jake": "^10.8.5" + }, + "bin": { + "ejs": "bin/cli.js" + }, + "engines": { + "node": ">=0.10.0" + } + }, "node_modules/electron-to-chromium": { "version": "1.4.539", "resolved": "https://registry.npmjs.org/electron-to-chromium/-/electron-to-chromium-1.4.539.tgz", @@ -5460,6 +5489,39 @@ "node": "^10.12.0 || >=12.0.0" } }, + "node_modules/filelist": { + "version": "1.0.4", + "resolved": "https://registry.npmjs.org/filelist/-/filelist-1.0.4.tgz", + "integrity": "sha512-w1cEuf3S+DrLCQL7ET6kz+gmlJdbq9J7yXCSjK/OZCPA+qEN1WyF4ZAf0YYJa4/shHJra2t/d/r8SV4Ji+x+8Q==", + "dev": true, + "license": "Apache-2.0", + "dependencies": { + "minimatch": "^5.0.1" + } + }, + "node_modules/filelist/node_modules/brace-expansion": { + "version": "2.0.1", + "resolved": "https://registry.npmjs.org/brace-expansion/-/brace-expansion-2.0.1.tgz", + "integrity": "sha512-XnAIvQ8eM+kC6aULx6wuQiwVsnzsi9d3WxzV3FpWTGA19F621kwdbsAcFKXgKUHZWsy+mY6iL1sHTxWEFCytDA==", + "dev": true, + "license": "MIT", + "dependencies": { + "balanced-match": "^1.0.0" + } + }, + "node_modules/filelist/node_modules/minimatch": { + "version": "5.1.6", + "resolved": "https://registry.npmjs.org/minimatch/-/minimatch-5.1.6.tgz", + "integrity": "sha512-lKwV/1brpG6mBUFHtb7NUmtABCb2WZZmm2wNiOA5hAb8VdCS4B3dtMWyvcoViccwAW/COERjXLt0zP1zXUN26g==", + "dev": true, + "license": "ISC", + "dependencies": { + "brace-expansion": "^2.0.1" + }, + "engines": { + "node": ">=10" + } + }, "node_modules/fill-range": { "version": "7.0.1", "resolved": "https://registry.npmjs.org/fill-range/-/fill-range-7.0.1.tgz", @@ -6538,11 +6600,107 @@ "node": ">=8" } }, + "node_modules/jake": { + "version": "10.9.2", + "resolved": "https://registry.npmjs.org/jake/-/jake-10.9.2.tgz", + "integrity": "sha512-2P4SQ0HrLQ+fw6llpLnOaGAvN2Zu6778SJMrCUwns4fOoG9ayrTiZk3VV8sCPkVZF8ab0zksVpS8FDY5pRCNBA==", + "dev": true, + "license": "Apache-2.0", + "dependencies": { + "async": "^3.2.3", + "chalk": "^4.0.2", + "filelist": "^1.0.4", + "minimatch": "^3.1.2" + }, + "bin": { + "jake": "bin/cli.js" + }, + "engines": { + "node": ">=10" + } + }, + "node_modules/jake/node_modules/ansi-styles": { + "version": "4.3.0", + "resolved": "https://registry.npmjs.org/ansi-styles/-/ansi-styles-4.3.0.tgz", + "integrity": "sha512-zbB9rCJAT1rbjiVDb2hqKFHNYLxgtk8NURxZ3IZwD3F6NtxbXZQCnnSi1Lkx+IDohdPlFp222wVALIheZJQSEg==", + "dev": true, + "license": "MIT", + "dependencies": { + "color-convert": "^2.0.1" + }, + "engines": { + "node": ">=8" + }, + "funding": { + "url": "https://github.com/chalk/ansi-styles?sponsor=1" + } + }, + "node_modules/jake/node_modules/chalk": { + "version": "4.1.2", + "resolved": "https://registry.npmjs.org/chalk/-/chalk-4.1.2.tgz", + "integrity": "sha512-oKnbhFyRIXpUuez8iBMmyEa4nbj4IOQyuhc/wy9kY7/WVPcwIO9VA668Pu8RkO7+0G76SLROeyw9CpQ061i4mA==", + "dev": true, + "license": "MIT", + "dependencies": { + "ansi-styles": "^4.1.0", + "supports-color": "^7.1.0" + }, + "engines": { + "node": ">=10" + }, + "funding": { + "url": "https://github.com/chalk/chalk?sponsor=1" + } + }, + "node_modules/jake/node_modules/color-convert": { + "version": "2.0.1", + "resolved": "https://registry.npmjs.org/color-convert/-/color-convert-2.0.1.tgz", + "integrity": "sha512-RRECPsj7iu/xb5oKYcsFHSppFNnsj/52OVTRKb4zP5onXwVF3zVmmToNcOfGC+CRDpfK/U584fMg38ZHCaElKQ==", + "dev": true, + "license": "MIT", + "dependencies": { + "color-name": "~1.1.4" + }, + "engines": { + "node": ">=7.0.0" + } + }, + "node_modules/jake/node_modules/color-name": { + "version": "1.1.4", + "resolved": "https://registry.npmjs.org/color-name/-/color-name-1.1.4.tgz", + "integrity": "sha512-dOy+3AuW3a2wNbZHIuMZpTcgjGuLU/uBL/ubcZF9OXbDo8ff4O8yVp5Bf0efS8uEoYo5q4Fx7dY9OgQGXgAsQA==", + "dev": true, + "license": "MIT" + }, + "node_modules/jake/node_modules/has-flag": { + "version": "4.0.0", + "resolved": "https://registry.npmjs.org/has-flag/-/has-flag-4.0.0.tgz", + "integrity": "sha512-EykJT/Q1KjTWctppgIAgfSO0tKVuZUjhgMr17kqTumMl6Afv3EISleU7qZUzoXDFTAHTDC4NOoG/ZxU3EvlMPQ==", + "dev": true, + "license": "MIT", + "engines": { + "node": ">=8" + } + }, + "node_modules/jake/node_modules/supports-color": { + "version": "7.2.0", + "resolved": "https://registry.npmjs.org/supports-color/-/supports-color-7.2.0.tgz", + "integrity": "sha512-qpCAvRl9stuOHveKsn7HncJRvv501qIacKzQlO/+Lwxc9+0q2wLyv4Dfvt80/DPn2pqOBsJdDiogXGR9+OvwRw==", + "dev": true, + "license": "MIT", + "dependencies": { + "has-flag": "^4.0.0" + }, + "engines": { + "node": ">=8" + } + }, "node_modules/jest": { "version": "29.7.0", "resolved": "https://registry.npmjs.org/jest/-/jest-29.7.0.tgz", "integrity": "sha512-NIy3oAFp9shda19hy4HK0HRTWKtPJmGdnvywu01nOqNC2vZg+Z+fvJDxpMQA88eb2I9EcafcdjYgsDthnYTvGw==", "dev": true, + "license": "MIT", "dependencies": { "@jest/core": "^29.7.0", "@jest/types": "^29.6.3", @@ -10556,28 +10714,32 @@ } }, "node_modules/ts-jest": { - "version": "29.1.1", - "resolved": "https://registry.npmjs.org/ts-jest/-/ts-jest-29.1.1.tgz", - "integrity": "sha512-D6xjnnbP17cC85nliwGiL+tpoKN0StpgE0TeOjXQTU6MVCfsB4v7aW05CgQ/1OywGb0x/oy9hHFnN+sczTiRaA==", + "version": "29.3.1", + "resolved": "https://registry.npmjs.org/ts-jest/-/ts-jest-29.3.1.tgz", + "integrity": "sha512-FT2PIRtZABwl6+ZCry8IY7JZ3xMuppsEV9qFVHOVe8jDzggwUZ9TsM4chyJxL9yi6LvkqcZYU3LmapEE454zBQ==", "dev": true, + "license": "MIT", "dependencies": { - "bs-logger": "0.x", - "fast-json-stable-stringify": "2.x", + "bs-logger": "^0.2.6", + "ejs": "^3.1.10", + "fast-json-stable-stringify": "^2.1.0", "jest-util": "^29.0.0", "json5": "^2.2.3", - "lodash.memoize": "4.x", - "make-error": "1.x", - "semver": "^7.5.3", - "yargs-parser": "^21.0.1" + "lodash.memoize": "^4.1.2", + "make-error": "^1.3.6", + "semver": "^7.7.1", + "type-fest": "^4.38.0", + "yargs-parser": "^21.1.1" }, "bin": { "ts-jest": "cli.js" }, "engines": { - "node": "^14.15.0 || ^16.10.0 || >=18.0.0" + "node": "^14.15.0 || ^16.10.0 || ^18.0.0 || >=20.0.0" }, "peerDependencies": { "@babel/core": ">=7.0.0-beta.0 <8", + "@jest/transform": "^29.0.0", "@jest/types": "^29.0.0", "babel-jest": "^29.0.0", "jest": "^29.0.0", @@ -10587,6 +10749,9 @@ "@babel/core": { "optional": true }, + "@jest/transform": { + "optional": true + }, "@jest/types": { "optional": true }, @@ -10598,26 +10763,12 @@ } } }, - "node_modules/ts-jest/node_modules/lru-cache": { - "version": "6.0.0", - "resolved": "https://registry.npmjs.org/lru-cache/-/lru-cache-6.0.0.tgz", - "integrity": "sha512-Jo6dJ04CmSjuznwJSS3pUeWmd/H0ffTlkXXgwZi+eq1UCmqQwCh+eLsYOYCwY991i2Fah4h1BEMCx4qThGbsiA==", - "dev": true, - "dependencies": { - "yallist": "^4.0.0" - }, - "engines": { - "node": ">=10" - } - }, "node_modules/ts-jest/node_modules/semver": { - "version": "7.5.4", - "resolved": "https://registry.npmjs.org/semver/-/semver-7.5.4.tgz", - "integrity": "sha512-1bCSESV6Pv+i21Hvpxp3Dx+pSD8lIPt8uVjRrxAUt/nbswYc+tK6Y2btiULjd4+fnq15PX+nqQDC7Oft7WkwcA==", + "version": "7.7.1", + "resolved": "https://registry.npmjs.org/semver/-/semver-7.7.1.tgz", + "integrity": "sha512-hlq8tAfn0m/61p4BVRcPzIGr6LKiMwo4VM6dGi6pt4qcRkmNzTcWq6eCEjEh+qXjkMDvPlOFFSGwQjoEa6gyMA==", "dev": true, - "dependencies": { - "lru-cache": "^6.0.0" - }, + "license": "ISC", "bin": { "semver": "bin/semver.js" }, @@ -10625,11 +10776,18 @@ "node": ">=10" } }, - "node_modules/ts-jest/node_modules/yallist": { - "version": "4.0.0", - "resolved": "https://registry.npmjs.org/yallist/-/yallist-4.0.0.tgz", - "integrity": "sha512-3wdGidZyq5PB084XLES5TpOSRA3wjXAlIWMhum2kRcv/41Sn2emQ0dycQW4uZXLejwKvg6EsvbdlVL+FYEct7A==", - "dev": true + "node_modules/ts-jest/node_modules/type-fest": { + "version": "4.39.0", + "resolved": "https://registry.npmjs.org/type-fest/-/type-fest-4.39.0.tgz", + "integrity": "sha512-w2IGJU1tIgcrepg9ZJ82d8UmItNQtOFJG0HCUE3SzMokKkTsruVDALl2fAdiEzJlfduoU+VyXJWIIUZ+6jV+nw==", + "dev": true, + "license": "(MIT OR CC0-1.0)", + "engines": { + "node": ">=16" + }, + "funding": { + "url": "https://github.com/sponsors/sindresorhus" + } }, "node_modules/ts-jest/node_modules/yargs-parser": { "version": "21.1.1", @@ -10641,10 +10799,11 @@ } }, "node_modules/ts-node": { - "version": "10.9.1", - "resolved": "https://registry.npmjs.org/ts-node/-/ts-node-10.9.1.tgz", - "integrity": "sha512-NtVysVPkxxrwFGUUxGYhfux8k78pQB3JqYBXlLRZgdGUqTO5wU/UyHop5p70iEbGhB7q5KmiZiU0Y3KlJrScEw==", + "version": "10.9.2", + "resolved": "https://registry.npmjs.org/ts-node/-/ts-node-10.9.2.tgz", + "integrity": "sha512-f0FFpIdcHgn8zcPSbf1dRevwt047YMnaiJM3u2w2RewrB+fob/zePZcrOyQoLMMO7aBIddLcQIEK5dYjkLnGrQ==", "dev": true, + "license": "MIT", "dependencies": { "@cspotcode/source-map-support": "^0.8.0", "@tsconfig/node10": "^1.0.7", @@ -10836,10 +10995,11 @@ } }, "node_modules/typescript": { - "version": "5.2.2", - "resolved": "https://registry.npmjs.org/typescript/-/typescript-5.2.2.tgz", - "integrity": "sha512-mI4WrpHsbCIcwT9cF4FZvr80QUeKvsUsUvKDoR+X/7XHQH98xYD8YHZg7ANtz2GtZt/CBq2QJ0thkGJMHfqc1w==", + "version": "5.8.2", + "resolved": "https://registry.npmjs.org/typescript/-/typescript-5.8.2.tgz", + "integrity": "sha512-aJn6wq13/afZp/jT9QZmwEjDqqvSGp1VT5GVg+f/t6/oVyrgXM6BY1h9BRh/O5p3PlUPAe+WuiEZOmb/49RqoQ==", "dev": true, + "license": "Apache-2.0", "bin": { "tsc": "bin/tsc", "tsserver": "bin/tsserver" @@ -10863,6 +11023,12 @@ "url": "https://github.com/sponsors/ljharb" } }, + "node_modules/undici-types": { + "version": "6.21.0", + "resolved": "https://registry.npmjs.org/undici-types/-/undici-types-6.21.0.tgz", + "integrity": "sha512-iwDZqg0QAGrg9Rav5H4n0M64c3mkR59cJ6wQp+7C4nI0gsmExaedaYLNO44eT4AtBBwjbTiGPMlt2Md0T9H9JQ==", + "license": "MIT" + }, "node_modules/universalify": { "version": "2.0.0", "resolved": "https://registry.npmjs.org/universalify/-/universalify-2.0.0.tgz", diff --git a/package.json b/package.json index c3b8763..12714ae 100644 --- a/package.json +++ b/package.json @@ -36,13 +36,13 @@ "homepage": "https://github.com/hirosystems/api-toolkit#readme", "prettier": "@stacks/prettier-config", "engines": { - "node": ">=18" + "node": ">=22" }, "devDependencies": { "@commitlint/cli": "^17.5.0", "@commitlint/config-conventional": "^17.4.4", "@stacks/eslint-config": "^1.2.0", - "@types/jest": "^29.5.0", + "@types/jest": "^29.5.14", "@typescript-eslint/eslint-plugin": "^5.56.0", "@typescript-eslint/parser": "^5.56.0", "babel-jest": "^29.5.0", @@ -51,18 +51,19 @@ "eslint-plugin-prettier": "^4.2.1", "eslint-plugin-tsdoc": "^0.2.17", "husky": "^8.0.3", - "jest": "^29.5.0", + "jest": "^29.7.0", "prettier": "^2.8.6", "rimraf": "^4.4.1", - "ts-jest": "^29.0.5", - "ts-node": "^10.9.1", - "typescript": "^5.0.2" + "ts-jest": "^29.3.1", + "ts-node": "^10.9.2", + "typescript": "^5.8.2" }, "dependencies": { "@fastify/cors": "^8.0.0", "@fastify/swagger": "^8.3.1", "@fastify/type-provider-typebox": "^3.2.0", "@sinclair/typebox": "^0.28.20", + "@types/node": "^22.14.1", "fastify": "^4.3.0", "fastify-metrics": "^10.2.0", "node-pg-migrate": "^6.2.2", diff --git a/src/helpers/__tests__/my-worker-export-default.ts b/src/helpers/__tests__/my-worker-export-default.ts new file mode 100644 index 0000000..ced8800 --- /dev/null +++ b/src/helpers/__tests__/my-worker-export-default.ts @@ -0,0 +1,5 @@ +import * as myWorker from './my-worker'; +export default { + workerModule: myWorker.workerModule, + processTask: myWorker.processTask, +}; diff --git a/src/helpers/__tests__/my-worker.ts b/src/helpers/__tests__/my-worker.ts new file mode 100644 index 0000000..f6493b5 --- /dev/null +++ b/src/helpers/__tests__/my-worker.ts @@ -0,0 +1,61 @@ +/** Block the thread for `ms` milliseconds */ +function sleepSync(ms: number) { + Atomics.wait(new Int32Array(new SharedArrayBuffer(4)), 0, 0, ms); +} + +export function processTask(req: number, cpuWaitTimeMs: number) { + if (req === 2222) { + throw createError(); + } + if (req === 3333) { + throw 'boom'; + } + if (req == 4444) { + throw createAggregateError(); + } + if (req === 5555) { + throwDOMError(); + } + sleepSync(cpuWaitTimeMs); + return req.toString(); +} + +export class MyCustomError extends Error { + constructor(message?: string) { + super(message); + this.name = this.constructor.name; + } +} + +function createError() { + const error = new MyCustomError(`Error at req`); + Object.assign(error, { code: 123 }); + // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access, @typescript-eslint/no-explicit-any + (error as any).randoProp = { + foo: 'bar', + baz: 123, + aggregate: [ + Object.assign(new Error('Error in aggregate 1'), { inner1code: 123 }), + new MyCustomError('Error in aggregate 2'), + ], + sourceError: Object.assign(new MyCustomError('Source error'), { + sourceErrorInfo: { code: 44 }, + }), + }; + return error; +} + +function createAggregateError() { + const error1 = new Error('Error1 in aggregate 1'); + Object.assign(error1, { inner1code: 123 }); + const error2 = new TypeError('Error2 in aggregate 2'); + return new AggregateError([error1, error2], 'My aggregate error message', { cause: 'foo' }); +} + +function throwDOMError() { + const ac = new AbortController(); + ac.abort(); + ac.signal.throwIfAborted(); +} + +export const workerModule = module; diff --git a/src/helpers/__tests__/worker.test.ts b/src/helpers/__tests__/worker.test.ts new file mode 100644 index 0000000..1f626f0 --- /dev/null +++ b/src/helpers/__tests__/worker.test.ts @@ -0,0 +1,225 @@ +import * as assert from 'node:assert/strict'; +import * as os from 'node:os'; +import { WorkerThreadManager } from '../worker-thread-manager'; +import * as workerModule from './my-worker'; +import workerModuleDefaultExport from './my-worker-export-default'; +import { MyCustomError } from './my-worker'; +import { addKnownErrorConstructor } from '../serialize-error'; +import { stopwatch } from '../time'; + +test('worker module with default exports', async () => { + const workerManager = await WorkerThreadManager.init(workerModuleDefaultExport, { + workerCount: 2, + }); + const res = await workerManager.exec(1, 1); + expect(res).toBe('1'); + await workerManager.close(); +}); + +describe('Worker tests', () => { + let workerManager: Awaited>; + const workerCount = Math.min(4, os.cpus().length); + const cpuPeggedTimeMs = 500; + + function initWorkerManager() { + return WorkerThreadManager.init(workerModule, { workerCount }); + } + + beforeAll(async () => { + addKnownErrorConstructor(MyCustomError); + console.time('worker manager init'); + const manager = await initWorkerManager(); + console.timeEnd('worker manager init'); + workerManager = manager; + }); + + afterAll(async () => { + await workerManager.close(); + }); + + test('run tasks with workers', async () => { + const watch = stopwatch(); + const taskPromises = Array.from({ length: workerCount }, async (_, i) => { + console.time(`task ${i}`); + const res = await workerManager.exec(i, cpuPeggedTimeMs); + console.timeEnd(`task ${i}`); + return res; + }); + + // Ensure all workers were assigned a task + expect(workerManager.busyWorkerCount).toBe(workerCount); + expect(workerManager.idleWorkerCount).toBe(0); + + const results = await Promise.allSettled(taskPromises); + + // All tasks should complete roughly within the time in takes for one task to complete + // because the tasks are run in parallel on different threads. + expect(watch.getElapsed()).toBeLessThan(cpuPeggedTimeMs * 1.75); + + // Ensure tasks returned in expected order: + for (let i = 0; i < workerCount; i++) { + const result = results[i]; + assert(result.status === 'fulfilled'); + expect(result.value).toBe(i.toString()); + } + }); + + test('worker task throws with non-Error value', async () => { + const [res] = await Promise.allSettled([ + // The worker will throw a non-error value when it receives this specific req value + workerManager.exec(3333, 1), + ]); + assert(res.status === 'rejected'); + expect(res.reason).toBe('boom'); + }); + + test('worker task throws error', async () => { + // Test that error de/ser across worker thread boundary works as expected + const [res] = await Promise.allSettled([ + // The worker will throw an error when it receives this specific req value + workerManager.exec(2222, 1), + ]); + assert(res.status === 'rejected'); + expect(res.reason).toBeInstanceOf(MyCustomError); + expect(res.reason).toMatchObject({ + name: 'MyCustomError', + message: 'Error at req', + code: 123, + stack: expect.any(String), + randoProp: { + foo: 'bar', + baz: 123, + aggregate: [ + { + name: 'Error', + message: 'Error in aggregate 1', + inner1code: 123, + stack: expect.any(String), + }, + { + name: 'MyCustomError', + message: 'Error in aggregate 2', + stack: expect.any(String), + }, + ], + sourceError: { + name: 'MyCustomError', + message: 'Source error', + sourceErrorInfo: { + code: 44, + }, + stack: expect.any(String), + }, + }, + }); + }); + + test('worker task throws with non-Error value', async () => { + const [res] = await Promise.allSettled([ + // The worker will throw a non-error value when it receives this specific req value + workerManager.exec(3333, 1), + ]); + assert(res.status === 'rejected'); + expect(res.reason).toBe('boom'); + }); + + test('worker task serializes AggregateError', async () => { + // Test that error de/ser across worker thread boundary works as expected + const [res] = await Promise.allSettled([ + // The worker will throw an error when it receives this specific req value + workerManager.exec(4444, 1), + ]); + assert(res.status === 'rejected'); + expect(res.reason).toBeInstanceOf(AggregateError); + expect(res.reason).toMatchObject({ + name: 'AggregateError', + message: 'My aggregate error message', + stack: expect.any(String), + cause: 'foo', + errors: [ + { + name: 'Error', + message: 'Error1 in aggregate 1', + inner1code: 123, + stack: expect.any(String), + }, + { + name: 'TypeError', + message: 'Error2 in aggregate 2', + stack: expect.any(String), + }, + ], + }); + }); + + test('worker task serializes DOMException (AbortError)', async () => { + // Test that error de/ser across worker thread boundary works as expected + const [res] = await Promise.allSettled([ + // The worker will throw an error when it receives this specific req value + workerManager.exec(5555, 1), + ]); + assert(res.status === 'rejected'); + expect(res.reason).toBeInstanceOf(DOMException); + expect(res.reason).toMatchObject({ + constructor: expect.objectContaining({ + name: 'DOMException', + }), + name: 'AbortError', + message: 'This operation was aborted', + stack: expect.any(String), + }); + }); + + test('run tasks on main thread', async () => { + const watch = stopwatch(); + const results = await Promise.allSettled( + Array.from({ length: workerCount }, (_, i) => { + return Promise.resolve().then(() => workerModule.processTask(i, cpuPeggedTimeMs)); + }) + ); + + // All tasks should take at least as long as taskCount * cpuPeggedTimeMs because + // they are run synchronously on the main thread. + expect(watch.getElapsed()).toBeGreaterThanOrEqual(workerCount * cpuPeggedTimeMs); + + // Ensure tasks returned in expected order: + for (let i = 0; i < workerCount; i++) { + const result = results[i]; + assert(result.status === 'fulfilled'); + expect(result.value).toBe(i.toString()); + } + }); + + test('Run more tasks than CPUs', async () => { + const watch = stopwatch(); + const taskCount = workerManager.workerCount * 3; + const taskTime = 50; + const taskPromises = Array.from({ length: taskCount }, async (_, i) => { + console.time(`task ${i}`); + const res = await workerManager.exec(i, taskTime); + console.timeEnd(`task ${i}`); + return res; + }); + + // Ensure all workers were assigned a task and queue is correct length + expect(workerManager.busyWorkerCount).toBe(workerCount); + expect(workerManager.idleWorkerCount).toBe(0); + expect(workerManager.queuedJobCount).toBe(taskCount - workerCount); + + const results = await Promise.allSettled(taskPromises); + + // All tasks should complete roughly within the time in takes for one task to complete + // because the tasks are run in parallel on different threads. + // (Pad timing with an extra 50% to account for test code execution overhead) + expect(watch.getElapsed()).toBeLessThan( + Math.ceil(taskCount / workerManager.workerCount) * taskTime * 1.5 + ); + + // Ensure tasks returned in expected order: + for (let i = 0; i < taskPromises.length; i++) { + const result = results[i]; + assert(result.status === 'fulfilled'); + expect(result.value).toBe(i.toString()); + } + }); +}); diff --git a/src/helpers/index.ts b/src/helpers/index.ts index 4fa4ee7..e3d91a2 100644 --- a/src/helpers/index.ts +++ b/src/helpers/index.ts @@ -1,3 +1,5 @@ export * from './iterators'; export * from './time'; export * from './values'; +export { WorkerThreadManager } from './worker-thread-manager'; +export type { WorkerPoolModuleInterface } from './worker-thread-manager'; diff --git a/src/helpers/serialize-error.ts b/src/helpers/serialize-error.ts new file mode 100644 index 0000000..c05b000 --- /dev/null +++ b/src/helpers/serialize-error.ts @@ -0,0 +1,161 @@ +/* eslint-disable @typescript-eslint/no-unsafe-member-access */ +/* eslint-disable @typescript-eslint/no-explicit-any */ +/* eslint-disable @typescript-eslint/no-unsafe-assignment */ + +const errorConstructors = new Map( + [ + // Native ES errors https://262.ecma-international.org/12.0/#sec-well-known-intrinsic-objects + Error, + EvalError, + RangeError, + ReferenceError, + SyntaxError, + TypeError, + URIError, + AggregateError, + + // Built-in errors + globalThis.DOMException, + + // Node-specific errors https://nodejs.org/api/errors.html + (globalThis as any).AssertionError as Error, + (globalThis as any).SystemError as Error, + ] + // Non-native Errors are used with `globalThis` because they might be missing. This filter drops them when undefined. + .filter(Boolean) + .map(constructor => [constructor.name, constructor as ErrorConstructor] as const) +); + +/** + * Custom errors can only be deserialized correctly if they are registered here. + */ +export function addKnownErrorConstructor( + constructor: new (message?: string, ..._arguments: unknown[]) => Error +) { + try { + new constructor(); + } catch (error) { + throw new Error(`The error constructor "${constructor.name}" is not compatible`, { + cause: error, + }); + } + errorConstructors.set(constructor.name, constructor as ErrorConstructor); +} + +const commonProperties: Record = { + name: false, + message: false, + stack: false, + code: true, + cause: false, + errors: false, +}; + +type SerializedError = { + constructorName: string; + name: string; + message: string; + stack: string; + [key: string]: any; +}; + +export type ErrorLike = { + name: string; + message: string; + stack: string; +}; + +export function isErrorLike(value: unknown): value is ErrorLike { + return ( + typeof value === 'object' && + value !== null && + 'name' in value && + 'message' in value && + 'stack' in value && + typeof (value as Error).name === 'string' && + typeof (value as Error).message === 'string' && + typeof (value as Error).stack === 'string' + ); +} + +export function serializeError(subject: Error): SerializedError { + if (!isErrorLike(subject)) { + // If the subject is not an error, for example `throw "boom", then we throw. + // This function should only be passed error objects, callers can use `isErrorLike`. + throw new TypeError('Failed to serialize error, expected an error object'); + } + + const data: SerializedError = { + constructorName: subject.constructor.name ?? 'Error', // new field + name: subject.name, + message: '', + stack: '', + }; + + for (const key of Object.keys(commonProperties)) { + if (key in subject) data[key] = deepSerialize((subject as any)[key]); + } + + // Include any other enumerable own properties + for (const key of Object.keys(subject)) { + if (!(key in data)) data[key] = deepSerialize((subject as any)[key]); + } + + return data; +} + +export function deserializeError(subject: ErrorLike): Error { + if (!isErrorLike(subject)) { + // If the subject is not an error, for example `throw "boom", then we throw. + // This function should only be passed error objects, callers can use `isErrorLike`. + throw new TypeError('Failed to desserialize error, expected an error object'); + } + + let con = errorConstructors.get((subject as SerializedError).constructorName ?? subject.name); + if (!con) { + // If the constructor is not found, use the generic Error constructor + con = Error; + console.error( + `Error constructor "${subject.name}" not found during worker error deserialization, using generic Error constructor` + ); + } + const output = Object.create(con.prototype) as Error; + + for (const [key, enumerable] of Object.entries(commonProperties)) { + if (key in subject) { + Object.defineProperty(output, key, { + enumerable, + configurable: true, + writable: true, + value: deepDeserialize((subject as any)[key]), + }); + } + } + + // Add any other properties (custom props not in commonProperties) + for (const key of Object.keys(subject)) { + if (!(key in commonProperties)) { + (output as any)[key] = deepDeserialize((subject as any)[key]); + } + } + + return output; +} + +function deepSerialize(value: unknown): unknown { + if (Array.isArray(value)) return value.map(deepSerialize); + if (isErrorLike(value)) return serializeError(value); + if (value && typeof value === 'object') { + return Object.fromEntries(Object.entries(value).map(([k, v]) => [k, deepSerialize(v)])); + } + return value; +} + +function deepDeserialize(value: unknown): unknown { + if (Array.isArray(value)) return value.map(deepDeserialize); + if (isErrorLike(value)) return deserializeError(value); + if (value && typeof value === 'object') { + return Object.fromEntries(Object.entries(value).map(([k, v]) => [k, deepDeserialize(v)])); + } + return value; +} diff --git a/src/helpers/worker-thread-init.ts b/src/helpers/worker-thread-init.ts new file mode 100644 index 0000000..b02b318 --- /dev/null +++ b/src/helpers/worker-thread-init.ts @@ -0,0 +1,79 @@ +import * as WorkerThreads from 'node:worker_threads'; +import type { + WorkerDataInterface, + WorkerPoolModuleInterface, + WorkerReqMsg, + WorkerRespMsg, +} from './worker-thread-manager'; +import { isErrorLike, serializeError } from './serialize-error'; + +// Minimal worker thread initialization code. This file is the entry point for worker threads +// and is responsible for setting up the worker environment and handling messages from the main thread. +// Imports should be kept to a minimum to avoid in worker thread init overhead and memory usage. + +export const filename = __filename; + +/** + * Invokes a function that may return a value or a promise, and passes the result + * to a callback in a consistent format. Handles both synchronous and asynchronous cases, + * ensuring type safety and avoiding unnecessary async transitions for sync functions. + */ +function getMaybePromiseResult( + fn: () => T | Promise, + cb: (result: { ok: T; err?: null } | { ok?: null; err: unknown }) => void +): void { + try { + const maybePromise = fn(); + if (maybePromise instanceof Promise) { + maybePromise.then( + ok => cb({ ok }), + (err: unknown) => cb({ err }) + ); + } else { + cb({ ok: maybePromise }); + } + } catch (err: unknown) { + cb({ err }); + } +} + +// Check if this file is being run in a worker thread. If so, it will set up the worker environment. +if (!WorkerThreads.isMainThread && (WorkerThreads.workerData as WorkerDataInterface)?.workerFile) { + const { workerFile } = WorkerThreads.workerData as WorkerDataInterface; + // eslint-disable-next-line @typescript-eslint/no-require-imports, @typescript-eslint/no-var-requires + const workerModule = require(workerFile) as WorkerPoolModuleInterface; + const parentPort = WorkerThreads.parentPort as WorkerThreads.MessagePort; + // Determine if the worker module `processTask` function is a default export or a named export. + const processTask = + 'default' in workerModule ? workerModule.default.processTask : workerModule.processTask; + parentPort.on('messageerror', err => { + console.error(`Worker thread message error`, err); + }); + parentPort.on('message', (message: unknown) => { + const msg = message as WorkerReqMsg; + getMaybePromiseResult( + () => processTask(...msg.req), + result => { + try { + let reply: WorkerRespMsg; + if (result.ok) { + reply = { + msgId: msg.msgId, + resp: result.ok, + }; + } else { + const error = isErrorLike(result.err) ? serializeError(result.err) : result.err; + reply = { + msgId: msg.msgId, + error, + }; + } + parentPort.postMessage(reply); + } catch (err: unknown) { + console.error(`Critical bug in work task processing`, err); + } + } + ); + }); + parentPort.postMessage('ready'); +} diff --git a/src/helpers/worker-thread-manager.ts b/src/helpers/worker-thread-manager.ts new file mode 100644 index 0000000..2087dbe --- /dev/null +++ b/src/helpers/worker-thread-manager.ts @@ -0,0 +1,200 @@ +import * as WorkerThreads from 'node:worker_threads'; +import * as os from 'node:os'; +import * as path from 'node:path'; +import { EventEmitter, addAbortListener } from 'node:events'; +import { waiter, Waiter } from './time'; +import { deserializeError, isErrorLike } from './serialize-error'; +import { filename as workerThreadInitFilename } from './worker-thread-init'; + +export type WorkerDataInterface = { + workerFile: string; +}; + +export type WorkerReqMsg = { + msgId: number; + req: TArgs; +}; + +export type WorkerRespMsg = { + msgId: number; +} & ( + | { + resp: TResp; + error?: null; + } + | { + resp?: null; + error: TErr; + } +); + +export type WorkerPoolModuleInterface = + | { + workerModule: NodeJS.Module; + processTask: (...args: TArgs) => Promise | TResp; + } + | { + default: { + workerModule: NodeJS.Module; + processTask: (...args: TArgs) => Promise | TResp; + }; + }; + +export class WorkerThreadManager { + private readonly workers = new Set(); + private readonly idleWorkers: WorkerThreads.Worker[] = []; + + private readonly jobQueue: WorkerReqMsg[] = []; + private readonly msgRequests: Map> = new Map(); + private lastMsgId = 0; + + readonly workerCount: number; + readonly workerFile: string; + + private readonly abortControlller = new AbortController(); + + readonly events = new EventEmitter<{ + workersReady: []; + }>(); + + get idleWorkerCount() { + return this.idleWorkers.length; + } + + get busyWorkerCount() { + return this.workerCount - this.idleWorkers.length; + } + + get queuedJobCount() { + return this.jobQueue.length; + } + + public static init( + workerModule: WorkerPoolModuleInterface, + opts: { workerCount?: number } = {} + ) { + const workerManager = new WorkerThreadManager(workerModule, opts); + return new Promise>(resolve => { + workerManager.events.once('workersReady', () => { + resolve(workerManager); + }); + }); + } + + constructor( + workerModule: WorkerPoolModuleInterface, + opts: { workerCount?: number } = {} + ) { + if (!WorkerThreads.isMainThread) { + throw new Error(`${this.constructor.name} must be instantiated in the main thread`); + } + + if ('default' in workerModule) { + this.workerFile = workerModule.default.workerModule.filename; + } else { + this.workerFile = workerModule.workerModule.filename; + } + this.workerCount = opts.workerCount ?? os.cpus().length; + this.createWorkerPool(); + } + + exec(...args: TArgs): Promise { + this.abortControlller.signal.throwIfAborted(); + if (this.lastMsgId >= Number.MAX_SAFE_INTEGER) { + this.lastMsgId = 0; + } + const msgId = this.lastMsgId++; + const replyWaiter = waiter(); + this.msgRequests.set(msgId, replyWaiter); + const reqMsg: WorkerReqMsg = { + msgId, + req: args, + }; + this.jobQueue.push(reqMsg); + this.assignJobs(); + return replyWaiter; + } + + createWorkerPool() { + let workersReady = 0; + for (let i = 0; i < this.workerCount; i++) { + const workerData: WorkerDataInterface = { + workerFile: this.workerFile, + }; + const workerOpt: WorkerThreads.WorkerOptions = { + workerData, + }; + if (path.extname(workerThreadInitFilename) === '.ts') { + if (process.env.NODE_ENV !== 'test') { + throw new Error( + 'Worker threads are being created with ts-node outside of a test environment' + ); + } + workerOpt.execArgv = ['-r', 'ts-node/register/transpile-only']; + } + const worker = new WorkerThreads.Worker(workerThreadInitFilename, workerOpt); + worker.unref(); + this.workers.add(worker); + worker.on('error', err => { + console.error(`Worker error`, err); + }); + worker.on('messageerror', err => { + console.error(`Worker message error`, err); + }); + worker.once('message', (message: unknown) => { + if (message !== 'ready') { + throw new Error(`Unexpected first msg from worker thread: ${JSON.stringify(message)}`); + } + this.setupWorkerHandler(worker); + this.idleWorkers.push(worker); + this.assignJobs(); + workersReady++; + if (workersReady === this.workerCount) { + this.events.emit('workersReady'); + } + }); + } + addAbortListener(this.abortControlller.signal, () => { + for (const replyWaiter of this.msgRequests.values()) { + // eslint-disable-next-line @typescript-eslint/no-unsafe-argument + replyWaiter.reject(this.abortControlller.signal.reason); + } + this.msgRequests.clear(); + }); + } + + private setupWorkerHandler(worker: WorkerThreads.Worker) { + worker.on('message', (message: unknown) => { + this.idleWorkers.push(worker); + this.assignJobs(); + const msg = message as WorkerRespMsg; + const replyWaiter = this.msgRequests.get(msg.msgId); + if (replyWaiter) { + if (msg.error) { + const error = isErrorLike(msg.error) ? deserializeError(msg.error) : msg.error; + replyWaiter.reject(error as Error); + } else if (msg.resp) { + replyWaiter.resolve(msg.resp); + } + this.msgRequests.delete(msg.msgId); + } else { + console.error('Received unexpected message from worker', msg); + } + }); + } + + private assignJobs() { + while (this.idleWorkers.length > 0 && this.jobQueue.length > 0) { + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + const worker = this.idleWorkers.shift()!; + const job = this.jobQueue.shift(); + worker.postMessage(job); + } + } + + async close() { + this.abortControlller.abort(); + await Promise.all([...this.workers].map(worker => worker.terminate())); + this.workers.clear(); + } +} diff --git a/tsconfig.json b/tsconfig.json index e82cef2..0f066c3 100644 --- a/tsconfig.json +++ b/tsconfig.json @@ -1,12 +1,12 @@ { "compilerOptions": { - "target": "es2021", - "lib": [ "es2021" ], - "module": "commonjs", - "moduleResolution": "node", + "target": "es2024", + "module": "node16", + "moduleResolution": "node16", "typeRoots": [ "./node_modules/@types" ], + "isolatedModules": true, "declaration": true, "sourceMap": true, "outDir": "./dist",