Skip to content

Commit

Permalink
feat: worker pool (#22)
Browse files Browse the repository at this point in the history
* feat: worker pool

* fix: unit test or duplicate ts-node registration
  • Loading branch information
climba03003 authored Feb 8, 2024
1 parent 2870bbf commit cb652ec
Show file tree
Hide file tree
Showing 15 changed files with 944 additions and 1 deletion.
115 changes: 115 additions & 0 deletions .github/workflows/ci-worker-pool.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
name: Continuous Integration - Worker Pool

on:
push:
paths:
- ".github/workflows/ci-worker-pool.yml"
- "packages/worker-pool/**"
pull_request:
paths:
- ".github/workflows/ci-worker-pool.yml"
- "packages/worker-pool/**"

jobs:
# Lint job: installs the workspace with pnpm and runs the `lint` script of
# the worker-pool package only. Runs once, on ubuntu-latest with Node 18.
linter:
name: Lint Code
runs-on: ubuntu-latest
# Least-privilege token: the job only needs to read the repository.
permissions:
contents: read
steps:
# Do not keep the token in the local git config after checkout.
- name: Check out repo
uses: actions/checkout@v4
with:
persist-credentials: false

- name: Setup Node
uses: actions/setup-node@v4
with:
node-version: 18

# Install pnpm itself only; dependencies are installed in a later step
# so the store cache can be restored first.
- name: Install pnpm
uses: pnpm/action-setup@v2
with:
version: 8
run_install: false

# Export the pnpm store location as STORE_PATH for the cache step below.
- name: Get pnpm store directory
shell: bash
run: |
echo "STORE_PATH=$(pnpm store path --silent)" >> $GITHUB_ENV
# Cache the pnpm store, keyed by OS and the hash of every pnpm-lock.yaml.
- uses: actions/cache@v4
name: Setup pnpm cache
with:
path: ${{ env.STORE_PATH }}
key: ${{ runner.os }}-pnpm-store-${{ hashFiles('**/pnpm-lock.yaml') }}
restore-keys: |
${{ runner.os }}-pnpm-store-
- name: Install dependencies
run: pnpm install

# Restrict the lint run to the worker-pool package.
- name: Lint code
run: pnpm --filter "./packages/worker-pool" run lint

# Test job: runs the worker-pool `test` script for every combination of
# Node 18/20 and macOS/Ubuntu/Windows declared in the matrix below.
test:
name: Test
runs-on: ${{ matrix.os }}
# Least-privilege token: the job only needs to read the repository.
permissions:
contents: read
strategy:
matrix:
node-version: [18, 20]
os: [macos-latest, ubuntu-latest, windows-latest]
steps:
# Do not keep the token in the local git config after checkout.
- name: Check out repo
uses: actions/checkout@v4
with:
persist-credentials: false

- name: Setup Node ${{ matrix.node-version }}
uses: actions/setup-node@v4
with:
node-version: ${{ matrix.node-version }}

# Install pnpm itself only; dependencies are installed in a later step
# so the store cache can be restored first.
- name: Install pnpm
uses: pnpm/action-setup@v2
with:
version: 8
run_install: false

# Export the pnpm store location as STORE_PATH for the cache step below.
- name: Get pnpm store directory
shell: bash
run: |
echo "STORE_PATH=$(pnpm store path --silent)" >> $GITHUB_ENV
# Cache the pnpm store; unlike the lint job, the key also includes the
# Node version so each matrix entry keeps its own cache.
- uses: actions/cache@v4
name: Setup pnpm cache
with:
path: ${{ env.STORE_PATH }}
key: ${{ runner.os }}-${{ matrix.node-version }}-pnpm-store-${{ hashFiles('**/pnpm-lock.yaml') }}
restore-keys: |
${{ runner.os }}-${{ matrix.node-version }}-pnpm-store-
- name: Install dependencies
run: pnpm install

# Restrict the test run to the worker-pool package.
- name: Run tests
run: pnpm --filter "./packages/worker-pool" run test

automerge:
  name: Automerge Dependabot PRs
  # Only run for pull requests that were opened by Dependabot.
  if: >
    github.event_name == 'pull_request' &&
    github.event.pull_request.user.login == 'dependabot[bot]'
  # Merge only after the whole test matrix has passed.
  needs: test
  # The merge action needs write access to approve and merge the PR.
  permissions:
    pull-requests: write
    contents: write
  runs-on: ubuntu-latest
  steps:
    - uses: fastify/github-action-merge-dependabot@v3
      with:
        # No `exclude` is passed: this workflow is triggered by
        # push / pull_request and declares no inputs, so the previous
        # `${{ inputs.auto-merge-exclude }}` expression always evaluated
        # to an empty value. List package names here if some
        # dependencies must be excluded from auto-merge.
        github-token: ${{ secrets.GITHUB_TOKEN }}
        target: major
4 changes: 3 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
"scripts": {
"unit": "pnpm -r --workspace-concurrency=1 unit",
"test": "pnpm -r --workspace-concurrency=1 test",
"lint": "pnpm -r --workspace-concurrency=1 lint"
"lint": "pnpm -r --workspace-concurrency=1 lint",
"build": "pnpm -r --workspace-concurrency=1 build",
"clean": "pnpm -r --workspace-concurrency=1 clean"
}
}
24 changes: 24 additions & 0 deletions packages/worker-pool/.eslintrc
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
{
"extends": "standard-with-typescript",
"parserOptions": {
"project": "./tsconfig.json"
},
"rules": {
// conflict between standard and standard-typescript
"no-void": ["error", { "allowAsStatement": true }]
},
"overrides": [
{
"files": ["**/*.test.ts"],
"rules": {
"@typescript-eslint/no-floating-promises": "off"
}
},
{
"files": ["scripts/*.mjs"],
"rules": {
"@typescript-eslint/explicit-function-return-type": "off"
}
}
]
}
2 changes: 2 additions & 0 deletions packages/worker-pool/lib/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
export * from './pool'
export * from './worker'
3 changes: 3 additions & 0 deletions packages/worker-pool/lib/mjs/package.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
{
"type": "module"
}
190 changes: 190 additions & 0 deletions packages/worker-pool/lib/pool.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,190 @@
import { randomUUID } from 'node:crypto'
import EventEmitter from 'node:events'
import { cpus } from 'node:os'
import { Worker, type TransferListItem, type WorkerOptions } from 'node:worker_threads'
import { TYPESCRIPT_WORKER, filenameFixture, handleMaybePromise, isTSNode, optionsFixture } from './utils'

/**
 * Envelope used for messages queued by the pool and posted to a worker.
 */
export interface WorkerMessage {
// unique id of the message; the pool generates it with randomUUID()
messageId: string
// kind of message; the pool itself sends 'message' and 'close'
action: string
// payload supplied by the caller of postMessage()
value: unknown
// objects to transfer to the worker; the pool strips this field from the
// envelope and passes it as the second argument of worker.postMessage()
transferList?: TransferListItem[]
}

/**
 * Options accepted by the WorkerPool constructor: every option of
 * `node:worker_threads` WorkerOptions plus the pool sizing options below.
 */
export interface WorkerPoolOptions extends WorkerOptions {
// number of workers the pool keeps spawned; defaults to 1
minWorker?: number
// upper bound of concurrently spawned workers;
// defaults to max(1, number of CPUs - 1)
maxWorker?: number

// maximum number of pending messages before postMessage() throws;
// defaults to Infinity
maxQueueSize?: number
}

/**
 * A pool of `node:worker_threads` workers that distributes posted messages
 * to idle workers in FIFO order.
 *
 * Workers report their state to the pool by posting objects with an
 * `action` of `'idle'`, `'busy'` or `'terminate'`; anything else is
 * forwarded to the consumer.
 *
 * Events emitted by the pool:
 * - `worker:online` (worker) - the worker thread started executing
 * - `worker:idle` (worker) - the worker reported it can accept a message
 * - `worker:busy` (worker) - the worker reported it is processing a message
 * - `worker:message` (worker, value) - the worker posted an application message
 * - `worker:terminate` (worker) - the worker was forcefully terminated after
 *   the cleanup grace period
 * - `worker:exit` (worker) - the worker thread exited
 * - `terminated` - the pool was terminated and its last worker exited
 */
export class WorkerPool extends EventEmitter {
  // public variables
  minWorker: number
  maxWorker: number
  maxQueueSize: number
  // private variables
  readonly #filename: string
  readonly #options: WorkerOptions
  readonly #queue: WorkerMessage[]
  readonly #workers: Map<number, Worker>
  readonly #idleWorkers: Set<number>
  #terminated: boolean

  /** Number of workers currently tracked by the pool. */
  get workerCount (): number {
    return this.#workers.size
  }

  /** Number of messages waiting for an idle worker. */
  get queueSize (): number {
    return this.#queue.length
  }

  /**
   * WorkerPool provides an interface similar to Worker to ease the usage.
   *
   * @param filename - worker entry file, passed through `filenameFixture`
   * @param options - Worker options plus `minWorker`, `maxWorker` and
   *   `maxQueueSize`
   */
  constructor (filename: string | URL, options?: WorkerPoolOptions) {
    super()
    this.#filename = filenameFixture(filename)
    // use the rest operator to separate pool options from worker options
    const { minWorker, maxWorker, maxQueueSize, ...workerOptions } = options ?? {}
    this.minWorker = minWorker ?? 1
    this.maxWorker = maxWorker ?? Math.max(1, cpus().length - 1)
    this.maxQueueSize = maxQueueSize ?? Infinity

    this.#options = optionsFixture(workerOptions)
    if (isTSNode()) {
      // under ts-node the TypeScript bootstrap worker is started instead and
      // the real entry file is handed over through workerData
      this.#options.workerData.__filename = this.#filename
      this.#filename = TYPESCRIPT_WORKER
    }

    // unbounded array, the limit is enforced by maxQueueSize
    this.#queue = []
    this.#workers = new Map()
    this.#idleWorkers = new Set()
    this.#terminated = false

    // spawn workers until minWorker is reached
    this.#spawn()
  }

  /**
   * Queue a message and hand it to an idle worker as soon as one exists.
   * Does nothing once the pool is terminated.
   *
   * @param value - payload delivered to the worker
   * @param transferList - objects to transfer instead of clone
   * @throws Error when the queue already holds `maxQueueSize` messages
   */
  postMessage (value: unknown, transferList?: TransferListItem[]): void {
    if (this.#terminated) return
    this.#pushBuffer('message', value, transferList)
    this.#distribute()
  }

  /**
   * Mark the pool as terminated and ask every tracked worker to close.
   * `terminated` is emitted from a worker's `exit` handler once no worker
   * is tracked anymore; this method itself never emits it.
   */
  terminate (): void {
    if (this.#terminated) return
    this.#terminated = true
    for (const worker of this.#workers.values()) {
      worker.postMessage({ messageId: randomUUID(), action: 'close' })
    }
  }

  // append a message envelope to the queue, enforcing maxQueueSize
  #pushBuffer (action: string, value: unknown, transferList?: TransferListItem[]): void {
    if (this.queueSize >= this.maxQueueSize) throw Error('exceed queue size')
    this.#queue.push({
      messageId: randomUUID(),
      action,
      value,
      transferList: transferList ?? []
    })
  }

  // spawn workers until minWorker is reached
  #spawn (): void {
    if (this.#terminated) return
    for (let i = this.workerCount; i < this.minWorker; i++) {
      this.#spawnOnce()
    }
  }

  // spawn a single worker unless the pool is terminated or already holds
  // maxWorker workers; returns the thread id of the new worker
  #spawnOnce (): number | undefined {
    if (this.#terminated) return
    if (this.#workers.size >= this.maxWorker) return
    const worker = new Worker(this.#filename, this.#options)
    // thread id will be a negative number after exit
    // so, we cache it first
    const threadId = worker.threadId
    this.#workers.set(threadId, worker)
    let terminationTimeout: NodeJS.Timeout | null = null

    // NOTE(review): no 'error' listener is attached to the worker here;
    // confirm that uncaught worker errors are handled as intended.
    worker.on('online', () => {
      this.emit('worker:online', worker)
    })
    worker.on('message', (value) => {
      // `typeof null` is 'object' as well, so null must be excluded
      // explicitly before `action` is read
      if (value === null || typeof value !== 'object') {
        // do not handle something we don't understand
        this.emit('worker:message', worker, value)
        return
      }

      switch (value.action) {
        case 'idle': {
          this.#idleWorkers.add(threadId)
          this.emit('worker:idle', worker)
          this.#distribute()
          break
        }
        case 'busy': {
          this.#idleWorkers.delete(threadId)
          this.emit('worker:busy', worker)
          break
        }
        case 'terminate': {
          this.#workers.delete(threadId)
          this.#idleWorkers.delete(threadId)
          // we provide 30 seconds to cleanup
          terminationTimeout = setTimeout(() => {
            handleMaybePromise(async () => {
              await worker.terminate()
            }, () => {
              terminationTimeout = null
              this.emit('worker:terminate', worker)
              this.#spawn()
            })
          }, 30e3)
          break
        }
        default: {
          // do not handle something we don't understand
          this.emit('worker:message', worker, value.value ?? value)
          break
        }
      }
    })
    worker.on('exit', () => {
      // when the worker exits properly
      this.#idleWorkers.delete(threadId)
      this.#workers.delete(threadId)
      if (terminationTimeout !== null) {
        clearTimeout(terminationTimeout)
      }
      this.emit('worker:exit', worker)
      this.#spawn()

      if (this.#terminated && this.workerCount === 0) {
        this.emit('terminated')
      }
    })

    return threadId
  }

  // hand queued messages to idle workers
  #distribute (): void {
    if (this.#idleWorkers.size === 0) {
      // we increase worker until max
      this.#spawnOnce()
    }

    // removing the current entry while iterating a Set is safe
    for (const threadId of this.#idleWorkers) {
      // nothing left to hand out, keep the remaining workers idle
      if (this.#queue.length === 0) break
      const worker = this.#workers.get(threadId)
      if (worker === undefined) {
        // stale id without a live worker, drop it
        this.#idleWorkers.delete(threadId)
        continue
      }
      // FIFO
      const message = this.#queue.shift()
      if (message === undefined) break
      const { transferList, ...value } = message
      worker.postMessage(value, transferList)
      this.#idleWorkers.delete(threadId)
    }
  }
}
Loading

0 comments on commit cb652ec

Please sign in to comment.