Skip to content

feat: worker thread pool #41

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 11 commits into
base: master
Choose a base branch
from
Open

feat: worker thread pool #41

wants to merge 11 commits into from

Conversation

zone117x
Copy link
Member

@zone117x zone117x commented Apr 17, 2025

This PR implements a utility for using node:worker_modules with a type-safe interface. Additionally, it implements:

  • Task queueing with FIFO when there are more pending tasks than logical CPUs (or the max worker count which defaults to logical CPU count but can be configured).
  • Error serialization: js errors have many quirks with being serialized/deserialized -- this PR implements a helper util that assists in ensuring error handling works as expected.

Example usage

Example of a worker module

First, define a worker module in a new file. This module is initialized in each worker thread, in an isolated environment. See the nodejs docs for more information.

The module must export two properties:

  • A processTask function that can be sync or async and can take any amount of arguments
  • A workerModule constant that is assigned to the global module variable for this file

These two properties can either be exported by themselves, or there can be a single default export object, e.g. a file named example-worker.ts:

export const processTask = (x: number, y: number) => x * y;
export const workerModule = module;

or

export default {
  processTask: (x: number, y: number) => x * y,
  workerModule: module,
}

Example manager usage

Once a worker module is implemented, for example the above example-worker.ts module, then it can be used by the WorkerThreadManager utility implemented in this PR.

For example, here's a main.ts file using WorkerThreadManager to parallelize the code from example-worker.ts using multiple logical CPUs:

import { WorkerThreadManager } from '@hirosystems/api-toolkit';
import exampleWorker from './example-worker';

async function performTasksParallel() {
  const workerManager = new WorkerThreadManager(exampleModule);
  await Promise.all(strings.map(str => workerManager.exec(Math.random(), Math.random())));
  await workerManager.close();
}

async function performTasksSync() {
  await Promise.all(strings.map(str => exampleModule.processTask(Math.random(), Math.random())));
}

// If the task takes 500ms, and there are 4 tasks, this resolves in 500ms (assuming 4 or more CPUs)
await performTasksParallel();

// If the task takes 500ms, and there are 4 tasks, this resolves in 2000ms
await performTasksSync();

Notes:

  • WorkerThreadManager is type-checked to ensure that the the module provided (in this case example-worker.ts) implements the correct interface, and WorkerThreadManager.exec function mirrors the processTask function type from the given worker module.
  • The above code processTask is a simple "multiply" function, however, in real-world usage this can be a CPU-intensive function (for example complex binary deserialization or cryptographic code), and it can also be a async/Promise.

@zone117x zone117x requested a review from rafaelcr April 17, 2025 11:59
Copy link
Collaborator

@rafaelcr rafaelcr left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is great! Will be super useful for C2.0

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants