Skip to content

Commit

Permalink
chore(loader-utils): Use processOnWorker
Browse files Browse the repository at this point in the history
  • Loading branch information
ibgreen committed Jun 28, 2021
1 parent d5c81f2 commit 983fc7a
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 132 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/* eslint-disable no-restricted-globals */

import {createWorker} from '@loaders.gl/worker-utils'
import {WorkerBody} from '@loaders.gl/worker-utils';
// import {validateLoaderVersion} from './validate-loader-version';

Expand All @@ -10,79 +10,33 @@ let requestId = 0;
* @param loader
*/
export function createLoaderWorker(loader: any) {
// Check that we are actually in a worker thread
if (typeof self === 'undefined') {
return;
}

WorkerBody.onmessage = async (type, payload) => {
switch (type) {
case 'process':
try {
// validateLoaderVersion(loader, data.source.split('@')[1]);

const {input, options = {}} = payload;

const result = await parseData({
loader,
arrayBuffer: input,
options,
context: {
parse: parseOnMainThread
}
});
WorkerBody.postMessage('done', {result});
} catch (error) {
const message = error instanceof Error ? error.message : '';
WorkerBody.postMessage('error', {error: message});
}
break;
default:
}
};
}

function parseOnMainThread(arrayBuffer, options = {}) {
return new Promise((resolve, reject) => {
const id = requestId++;

/**
*/
const onMessage = (type, payload) => {
if (payload.id !== id) {
// not ours
return;
createWorker(parseOnWorker)
async function parseOnWorker(input: any, options: {[key: string]: any}, processOnMainThread): Promise<any> {
// validateLoaderVersion(loader, data.source.split('@')[1]);

const result = await parseData({
loader,
arrayBuffer: input,
options,
context: {
parse: processOnMainThread
}
});

switch (type) {
case 'done':
WorkerBody.removeEventListener(onMessage);
resolve(payload.result);
break;

case 'error':
WorkerBody.removeEventListener(onMessage);
reject(payload.error);
break;

default:
// ignore
}
};

WorkerBody.addEventListener(onMessage);

// Ask the main thread to decode data
const payload = {id, input: arrayBuffer, options};
WorkerBody.postMessage('process', payload);
});
return result;
}
}

// TODO - Support byteOffset and byteLength (enabling parsing of embedded binaries without copies)
// TODO - Why not support async loader.parse* funcs here?
// TODO - Why not reuse a common function instead of reimplementing loader.parse* selection logic? Keeping loader small?
// TODO - Lack of appropriate parser functions can be detected when we create worker, no need to wait until parse
async function parseData({loader, arrayBuffer, options, context}) {
async function parseData({
loader,
arrayBuffer,
options,
context
}) {
let data;
let parser;
if (loader.parseSync || loader.parse) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
import type {WorkerJob, WorkerMessageType, WorkerMessagePayload} from '@loaders.gl/worker-utils';
import type {Loader, LoaderOptions, LoaderContext} from '../../types';
import {WorkerFarm, getWorkerURL} from '@loaders.gl/worker-utils';
import {canProcessOnWorker, processOnWorker} from '@loaders.gl/worker-utils';
import parseToNodeImage from '@loaders.gl/images/lib/parsers/parse-to-node-image';

/**
* Determines if a loader can parse with worker
* @param loader
* @param options
*/
export function canParseWithWorker(loader: Loader, options?: LoaderOptions) {
if (!WorkerFarm.isSupported()) {
if (canProcessOnWorker(loader, options)) {
return false;
}

Expand All @@ -23,69 +24,7 @@ export async function parseWithWorker(
loader: Loader,
data,
options?: LoaderOptions,
context?: LoaderContext,
parseOnMainThread?: Function
context?: LoaderContext
) {
const name = loader.id; // TODO
const url = getWorkerURL(loader, options);

const workerFarm = WorkerFarm.getWorkerFarm(options);
const workerPool = workerFarm.getWorkerPool({name, url});

// options.log object contains functions which cannot be transferred
// TODO - decide how to handle logging on workers
options = JSON.parse(JSON.stringify(options));

const job = await workerPool.startJob(
'process-on-worker',
onMessage.bind(null, parseOnMainThread)
);

job.postMessage('process', {
// @ts-ignore
input: data,
options
});

const result = await job.result;
return await result.result;
}

/**
* Handle worker's responses to the main thread
* @param job
* @param type
* @param payload
*/
async function onMessage(
parseOnMainThread,
job: WorkerJob,
type: WorkerMessageType,
payload: WorkerMessagePayload
) {
switch (type) {
case 'done':
job.done(payload);
break;

case 'error':
job.error(payload.error);
break;

case 'process':
// Worker is asking for main thread to parseO
const {id, input, options} = payload;
try {
const result = await parseOnMainThread(input, options);
job.postMessage('done', {id, result});
} catch (error) {
const message = error instanceof Error ? error.message : 'unknown error';
job.postMessage('error', {id, error: message});
}
break;

default:
// eslint-disable-next-line
console.warn(`parse-with-worker unknown message ${type}`);
}
processOnWorker(loader, data, options, parseOnMainThread);
}

0 comments on commit 983fc7a

Please sign in to comment.