diff --git a/src/common.ts b/src/common.ts index fcf9c09..b4109db 100644 --- a/src/common.ts +++ b/src/common.ts @@ -1,6 +1,7 @@ import type { MessagePort } from 'worker_threads' export interface StartupMessage { + tinypoolStartupMessage: true filename: string | null name: string port: MessagePort diff --git a/src/index.ts b/src/index.ts index 2dc5b62..3f03cc8 100644 --- a/src/index.ts +++ b/src/index.ts @@ -3,6 +3,7 @@ import { MessageChannel, MessagePort, receiveMessageOnPort, + parentPort, } from 'worker_threads' import { once } from 'events' import EventEmitterAsyncResource from './EventEmitterAsyncResource' @@ -688,6 +689,7 @@ class ThreadPool { } const message: StartupMessage = { + tinypoolStartupMessage: true, filename: this.options.filename, name: this.options.name, port: port2, @@ -1024,6 +1026,12 @@ class Tinypool extends EventEmitterAsyncResource { return this.#pool.runTask(task, { transferList, filename, name, signal }) } + broadcastMessage(message: any) { + for (const workerInfo of this.#pool.workers) { + workerInfo.worker.postMessage(message) + } + } + destroy() { return this.#pool.destroy() } @@ -1102,6 +1110,14 @@ class Tinypool extends EventEmitterAsyncResource { } } +export function onBroadcastedMessage(handler: (message: any) => void) { + if (parentPort) { + parentPort.on('message', handler) + } else { + throw new Error('onBroadcastedMessage can only be used in worker threads') + } +} + const _workerId = process.__tinypool_state__?.workerId export * from './common' diff --git a/src/worker.ts b/src/worker.ts index ad8d747..c14073b 100644 --- a/src/worker.ts +++ b/src/worker.ts @@ -101,6 +101,7 @@ async function getHandler( parentPort!.on('message', (message: StartupMessage) => { useAtomics = process.env.PISCINA_DISABLE_ATOMICS === '1' ? false : message.useAtomics + if (!message?.tinypoolStartupMessage) return const { port, sharedBuffer, filename, name } = message ;(async function () { if (filename !== null) {