Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: worker pool #22

Merged
merged 2 commits into from
Feb 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
115 changes: 115 additions & 0 deletions .github/workflows/ci-worker-pool.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
name: Continuous Integration - Worker Pool

on:
push:
paths:
- ".github/workflows/ci-worker-pool.yml"
- "packages/worker-pool/**"
pull_request:
paths:
- ".github/workflows/ci-worker-pool.yml"
- "packages/worker-pool/**"

jobs:
linter:
name: Lint Code
runs-on: ubuntu-latest
permissions:
contents: read
steps:
- name: Check out repo
uses: actions/checkout@v4
with:
persist-credentials: false

- name: Setup Node
uses: actions/setup-node@v4
with:
node-version: 18

- name: Install pnpm
uses: pnpm/action-setup@v2
with:
version: 8
run_install: false

- name: Get pnpm store directory
shell: bash
run: |
echo "STORE_PATH=$(pnpm store path --silent)" >> $GITHUB_ENV
- uses: actions/cache@v4
name: Setup pnpm cache
with:
path: ${{ env.STORE_PATH }}
key: ${{ runner.os }}-pnpm-store-${{ hashFiles('**/pnpm-lock.yaml') }}
restore-keys: |
${{ runner.os }}-pnpm-store-
- name: Install dependencies
run: pnpm install

- name: Lint code
run: pnpm --filter "./packages/worker-pool" run lint

test:
name: Test
runs-on: ${{ matrix.os }}
permissions:
contents: read
strategy:
matrix:
node-version: [18, 20]
os: [macos-latest, ubuntu-latest, windows-latest]
steps:
- name: Check out repo
uses: actions/checkout@v4
with:
persist-credentials: false

- name: Setup Node ${{ matrix.node-version }}
uses: actions/setup-node@v4
with:
node-version: ${{ matrix.node-version }}

- name: Install pnpm
uses: pnpm/action-setup@v2
with:
version: 8
run_install: false

- name: Get pnpm store directory
shell: bash
run: |
echo "STORE_PATH=$(pnpm store path --silent)" >> $GITHUB_ENV
- uses: actions/cache@v4
name: Setup pnpm cache
with:
path: ${{ env.STORE_PATH }}
key: ${{ runner.os }}-${{ matrix.node-version }}-pnpm-store-${{ hashFiles('**/pnpm-lock.yaml') }}
restore-keys: |
${{ runner.os }}-${{ matrix.node-version }}-pnpm-store-
- name: Install dependencies
run: pnpm install

- name: Run tests
run: pnpm --filter "./packages/worker-pool" run test

automerge:
name: Automerge Dependabot PRs
if: >
github.event_name == 'pull_request' &&
github.event.pull_request.user.login == 'dependabot[bot]'
needs: test
permissions:
pull-requests: write
contents: write
runs-on: ubuntu-latest
steps:
- uses: fastify/github-action-merge-dependabot@v3
with:
exclude: ${{ inputs.auto-merge-exclude }}
github-token: ${{ secrets.GITHUB_TOKEN }}
target: major
4 changes: 3 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
"scripts": {
"unit": "pnpm -r --workspace-concurrency=1 unit",
"test": "pnpm -r --workspace-concurrency=1 test",
"lint": "pnpm -r --workspace-concurrency=1 lint"
"lint": "pnpm -r --workspace-concurrency=1 lint",
"build": "pnpm -r --workspace-concurrency=1 build",
"clean": "pnpm -r --workspace-concurrency=1 clean"
}
}
24 changes: 24 additions & 0 deletions packages/worker-pool/.eslintrc
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
{
"extends": "standard-with-typescript",
"parserOptions": {
"project": "./tsconfig.json"
},
"rules": {
// conflict between standard and standard-typescript
"no-void": ["error", { "allowAsStatement": true }]
},
"overrides": [
{
"files": ["**/*.test.ts"],
"rules": {
"@typescript-eslint/no-floating-promises": "off"
}
},
{
"files": ["scripts/*.mjs"],
"rules": {
"@typescript-eslint/explicit-function-return-type": "off"
}
}
]
}
2 changes: 2 additions & 0 deletions packages/worker-pool/lib/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
export * from './pool'
export * from './worker'
3 changes: 3 additions & 0 deletions packages/worker-pool/lib/mjs/package.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
{
"type": "module"
}
190 changes: 190 additions & 0 deletions packages/worker-pool/lib/pool.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,190 @@
import { randomUUID } from 'node:crypto'
import EventEmitter from 'node:events'
import { cpus } from 'node:os'
import { Worker, type TransferListItem, type WorkerOptions } from 'node:worker_threads'
import { TYPESCRIPT_WORKER, filenameFixture, handleMaybePromise, isTSNode, optionsFixture } from './utils'

export interface WorkerMessage {
messageId: string
action: string
value: unknown
transferList?: TransferListItem[]
}

export interface WorkerPoolOptions extends WorkerOptions {
minWorker?: number
maxWorker?: number

maxQueueSize?: number
}

export class WorkerPool extends EventEmitter {
// public variables
minWorker: number
maxWorker: number
maxQueueSize: number
// private variables
readonly #filename: string
readonly #options: WorkerOptions
readonly #queue: WorkerMessage[]
readonly #workers: Map<number, Worker>
readonly #idleWorkers: Set<number>
#terminated: boolean

get workerCount (): number {
return this.#workers.size
}

get queueSize (): number {
return this.#queue.length
}

// WorkerPool provide similar interface of
// Worker to ease the usage
constructor (filename: string | URL, options?: WorkerPoolOptions) {
super()
this.#filename = filenameFixture(filename)
// use spread operator to extract worker options
const { minWorker, maxWorker, maxQueueSize, ...workerOptions } = options ?? {}
this.minWorker = minWorker ?? 1
this.maxWorker = maxWorker ?? Math.max(1, cpus().length - 1)
this.maxQueueSize = maxQueueSize ?? Infinity

this.#options = optionsFixture(workerOptions)
if (isTSNode()) {
this.#options.workerData.__filename = this.#filename
this.#filename = TYPESCRIPT_WORKER
}

// unbounded array
this.#queue = []
this.#workers = new Map()
this.#idleWorkers = new Set()
this.#terminated = false

// spwan worker until minWorker
this.#spwan()
}

postMessage (value: unknown, transferList?: TransferListItem[]): void {
if (this.#terminated) return
this.#pushBuffer('message', value, transferList)
this.#distribute()
}

terminate (): void {
if (this.#terminated) return
this.#terminated = true
for (const [, worker] of this.#workers) {
worker.postMessage({ messageId: randomUUID(), action: 'close' })
}
}

#pushBuffer (action: string, value: unknown, transferList?: TransferListItem[]): void {
if (this.queueSize >= this.maxQueueSize) throw Error('exceed queue size')
this.#queue.push({
messageId: randomUUID(),
action,
value,
transferList: transferList ?? []
})
}

#spwan (): void {
if (this.#terminated) return
for (let i = this.workerCount; i < this.minWorker; i++) {
this.#spwanOnce()
}
}

#spwanOnce (): number | undefined {
if (this.#terminated) return
if (this.#workers.size >= this.maxWorker) return
const worker = new Worker(this.#filename, this.#options)
// thread id will be negative number after exit
// so, we cache it first
const threadId = worker.threadId
this.#workers.set(threadId, worker)
let terminationTimeout: NodeJS.Timeout | null = null

worker.on('online', () => {
this.emit('worker:online', worker)
})
worker.on('message', (value) => {
if (typeof value === 'object') {
switch (value.action) {
case 'idle': {
this.#idleWorkers.add(threadId)
this.emit('worker:idle', worker)
this.#distribute()
break
}
case 'busy': {
this.#idleWorkers.delete(threadId)
this.emit('worker:busy', worker)
break
}
case 'terminate': {
this.#workers.delete(threadId)
this.#idleWorkers.delete(threadId)
// we provide 30 seconds to cleanup
terminationTimeout = setTimeout(() => {
handleMaybePromise(async () => {
await worker.terminate()
}, () => {
terminationTimeout = null
this.emit('worker:terminate', worker)
this.#spwan()
})
}, 30e3)
break
}
default: {
// do not handle something we don't understand
this.emit('worker:message', worker, value?.value ?? value)
break
}
}
} else {
// do not handle something we don't understand
this.emit('worker:message', worker, value)
}
})
worker.on('exit', () => {
// when the worker is properly exit
this.#idleWorkers.delete(threadId)
this.#workers.delete(threadId)
terminationTimeout !== null && clearTimeout(terminationTimeout)
this.emit('worker:exit', worker)
this.#spwan()

if (this.#terminated && this.workerCount === 0) {
this.emit('terminated')
}
})

return worker.threadId
}

#distribute (): void {
if (this.#idleWorkers.size === 0) {
// we increase worker until max
this.#spwanOnce()
}

if (this.#queue.length === 0 && this.#idleWorkers.size === 0) {
return
}

for (const threadId of this.#idleWorkers) {
const worker = this.#workers.get(threadId)
if (worker === undefined) continue
// FIFO
const message = this.#queue.shift()
if (message === undefined) continue
const { transferList, ...value } = message
worker.postMessage(value, transferList)
this.#idleWorkers.delete(threadId)
}
}
}
Loading
Loading