diff --git a/modules/loader-utils/src/lib/worker-loader-utils/create-loader-worker.ts b/modules/loader-utils/src/lib/worker-loader-utils/create-loader-worker.ts index 5d13ad951e..59b891b48a 100644 --- a/modules/loader-utils/src/lib/worker-loader-utils/create-loader-worker.ts +++ b/modules/loader-utils/src/lib/worker-loader-utils/create-loader-worker.ts @@ -1,5 +1,5 @@ /* eslint-disable no-restricted-globals */ - +import {createWorker} from '@loaders.gl/worker-utils' import {WorkerBody} from '@loaders.gl/worker-utils'; // import {validateLoaderVersion} from './validate-loader-version'; @@ -10,79 +10,33 @@ let requestId = 0; * @param loader */ export function createLoaderWorker(loader: any) { - // Check that we are actually in a worker thread - if (typeof self === 'undefined') { - return; - } - - WorkerBody.onmessage = async (type, payload) => { - switch (type) { - case 'process': - try { - // validateLoaderVersion(loader, data.source.split('@')[1]); - - const {input, options = {}} = payload; - - const result = await parseData({ - loader, - arrayBuffer: input, - options, - context: { - parse: parseOnMainThread - } - }); - WorkerBody.postMessage('done', {result}); - } catch (error) { - const message = error instanceof Error ? error.message : ''; - WorkerBody.postMessage('error', {error: message}); - } - break; - default: - } - }; -} - -function parseOnMainThread(arrayBuffer, options = {}) { - return new Promise((resolve, reject) => { - const id = requestId++; - - /** - */ - const onMessage = (type, payload) => { - if (payload.id !== id) { - // not ours - return; + createWorker(parseOnWorker) + async function parseOnWorker(input: any, options: {[key: string]: any}, processOnMainThread): Promise { + // validateLoaderVersion(loader, data.source.split('@')[1]); + + const result = await parseData({ + loader, + arrayBuffer: input, + options, + context: { + parse: processOnMainThread } + }); - switch (type) { - case 'done': - WorkerBody.removeEventListener(onMessage); - resolve(payload.result); - break; - - case 'error': - WorkerBody.removeEventListener(onMessage); - reject(payload.error); - break; - - default: - // ignore - } - }; - - WorkerBody.addEventListener(onMessage); - - // Ask the main thread to decode data - const payload = {id, input: arrayBuffer, options}; - WorkerBody.postMessage('process', payload); - }); + return result; + } } // TODO - Support byteOffset and byteLength (enabling parsing of embedded binaries without copies) // TODO - Why not support async loader.parse* funcs here? // TODO - Why not reuse a common function instead of reimplementing loader.parse* selection logic? Keeping loader small? // TODO - Lack of appropriate parser functions can be detected when we create worker, no need to wait until parse -async function parseData({loader, arrayBuffer, options, context}) { +async function parseData({ + loader, + arrayBuffer, + options, + context +}) { let data; let parser; if (loader.parseSync || loader.parse) { diff --git a/modules/loader-utils/src/lib/worker-loader-utils/parse-with-worker.ts b/modules/loader-utils/src/lib/worker-loader-utils/parse-with-worker.ts index 27ce29f297..20c33a3b18 100644 --- a/modules/loader-utils/src/lib/worker-loader-utils/parse-with-worker.ts +++ b/modules/loader-utils/src/lib/worker-loader-utils/parse-with-worker.ts @@ -1,6 +1,7 @@ import type {WorkerJob, WorkerMessageType, WorkerMessagePayload} from '@loaders.gl/worker-utils'; import type {Loader, LoaderOptions, LoaderContext} from '../../types'; -import {WorkerFarm, getWorkerURL} from '@loaders.gl/worker-utils'; +import {canProcessOnWorker, processOnWorker} from '@loaders.gl/worker-utils'; +import parseToNodeImage from '@loaders.gl/images/lib/parsers/parse-to-node-image'; /** * Determines if a loader can parse with worker @@ -8,7 +9,7 @@ import {WorkerFarm, getWorkerURL} from '@loaders.gl/worker-utils'; * @param options */ export function canParseWithWorker(loader: Loader, options?: LoaderOptions) { - if (!WorkerFarm.isSupported()) { + if (canProcessOnWorker(loader, options)) { return false; } @@ -23,69 +24,7 @@ export async function parseWithWorker( loader: Loader, data, options?: LoaderOptions, - context?: LoaderContext, - parseOnMainThread?: Function + context?: LoaderContext ) { - const name = loader.id; // TODO - const url = getWorkerURL(loader, options); - - const workerFarm = WorkerFarm.getWorkerFarm(options); - const workerPool = workerFarm.getWorkerPool({name, url}); - - // options.log object contains functions which cannot be transferred - // TODO - decide how to handle logging on workers - options = JSON.parse(JSON.stringify(options)); - - const job = await workerPool.startJob( - 'process-on-worker', - onMessage.bind(null, parseOnMainThread) - ); - - job.postMessage('process', { - // @ts-ignore - input: data, - options - }); - - const result = await job.result; - return await result.result; -} - -/** - * Handle worker's responses to the main thread - * @param job - * @param type - * @param payload - */ -async function onMessage( - parseOnMainThread, - job: WorkerJob, - type: WorkerMessageType, - payload: WorkerMessagePayload -) { - switch (type) { - case 'done': - job.done(payload); - break; - - case 'error': - job.error(payload.error); - break; - - case 'process': - // Worker is asking for main thread to parseO - const {id, input, options} = payload; - try { - const result = await parseOnMainThread(input, options); - job.postMessage('done', {id, result}); - } catch (error) { - const message = error instanceof Error ? error.message : 'unknown error'; - job.postMessage('error', {id, error: message}); - } - break; - - default: - // eslint-disable-next-line - console.warn(`parse-with-worker unknown message ${type}`); - } + processOnWorker(loader, data, options, parseOnMainThread); }