From b966370e3396d3be72df13799e2b2205e7353f9d Mon Sep 17 00:00:00 2001 From: Yavor Ivanov Date: Thu, 9 Nov 2023 11:20:59 +0200 Subject: [PATCH 01/15] Parallelizer --- lib/lbt/utils/Parallelizer.js | 40 +++++++++++++++++++++++++++++++++++ 1 file changed, 40 insertions(+) create mode 100644 lib/lbt/utils/Parallelizer.js diff --git a/lib/lbt/utils/Parallelizer.js b/lib/lbt/utils/Parallelizer.js new file mode 100644 index 000000000..a4b9450b4 --- /dev/null +++ b/lib/lbt/utils/Parallelizer.js @@ -0,0 +1,40 @@ +import workerpool from "workerpool"; +import os from "node:os"; +import {getLogger} from "@ui5/logger"; +const log = getLogger("builder:processors:minifier"); + +const MIN_WORKERS = 2; +const MAX_WORKERS = 4; +const osCpus = os.cpus().length || 1; +const maxWorkers = Math.max(Math.min(osCpus - 1, MAX_WORKERS), MIN_WORKERS); + +class Parallelizer { + #log; + + constructor() { + + } + + getInstance() { + + } +} + +let pool; +function getPool(taskUtil) { + if (!pool) { + log.verbose(`Creating workerpool with up to ${maxWorkers} workers (available CPU cores: ${osCpus})`); + const workerPath = fileURLToPath(new URL("./minifierWorker.js", import.meta.url)); + pool = workerpool.pool(workerPath, { + workerType: "auto", + maxWorkers + }); + taskUtil.registerCleanupTask(() => { + log.verbose(`Terminating workerpool`); + const poolToBeTerminated = pool; + pool = null; + poolToBeTerminated.terminate(); + }); + } + return pool; +} \ No newline at end of file From 17714af603f531fd71f27d0c344e703e2cbf447c Mon Sep 17 00:00:00 2001 From: Yavor Ivanov Date: Fri, 10 Nov 2023 09:08:29 +0200 Subject: [PATCH 02/15] Abstract workerpool implementation --- lib/lbt/utils/Parallelizer.js | 324 +++++++++++++++++++++++++--- lib/lbt/utils/ParallelizerThread.js | 18 ++ 2 files changed, 318 insertions(+), 24 deletions(-) create mode 100644 lib/lbt/utils/ParallelizerThread.js diff --git a/lib/lbt/utils/Parallelizer.js b/lib/lbt/utils/Parallelizer.js index a4b9450b4..667f05674 100644 --- a/lib/lbt/utils/Parallelizer.js +++ b/lib/lbt/utils/Parallelizer.js @@ -1,40 +1,316 @@ import workerpool from "workerpool"; import os from "node:os"; +import {fileURLToPath} from "node:url"; import {getLogger} from "@ui5/logger"; -const log = getLogger("builder:processors:minifier"); const MIN_WORKERS = 2; const MAX_WORKERS = 4; const osCpus = os.cpus().length || 1; const maxWorkers = Math.max(Math.min(osCpus - 1, MAX_WORKERS), MIN_WORKERS); -class Parallelizer { - #log; +export class Parallelizer { + #log = getLogger("builder:utils:Parallelizer"); + #pool; + static #ensureSingleton = false; + static #instance; + + #getPool() { + if (!this.#pool) { + this.#log.verbose( + `Creating workerpool with up to ${maxWorkers} workers (available CPU cores: ${osCpus})` + ); + const workerPath = fileURLToPath( + new URL("./ParallelizerThread.js", import.meta.url) + ); + this.#pool = workerpool.pool(workerPath, { + workerType: "auto", + maxWorkers, + }); + } + return this.#pool; + } constructor() { + if (!Parallelizer.#ensureSingleton) { + throw new Error( + "Constructor must not be called! This is a singleton class. Use Parallelizer.getInstance()" + ); + } + } + + static getInstance() { + if (!Parallelizer.#instance) { + Parallelizer.#ensureSingleton = true; + Parallelizer.#instance = new Parallelizer(); + Parallelizer.#ensureSingleton = false; + } + + return Parallelizer.#instance; + } + + async execInThread(args) { + return this.#getPool().exec("execInThread", [args]); + } + + async cleanup() { + const attemptPoolTermination = () => { + this.#log.verbose(`Attempt to terminate the workerpool...`); + + if (!this.#pool) { + this.#log.verbose( + "Pool termination requested, but a pool has not been initialized or already has been terminated." + ); + return; + } + + // There are many stats that could be used, but these ones seem the most + // convenient. When all the (available) workers are idle, then it's safe to terminate. + const {idleWorkers, totalWorkers} = this.#pool.stats(); + if (idleWorkers !== totalWorkers) { + return new Promise((resolve) => + setTimeout(() => resolve(attemptPoolTermination()), 100) // Retry after a while + ); + } + + return this.terminateTasks(/* terminate gracefully */); + }; + return attemptPoolTermination(); } - - getInstance() { - + + async terminateTasks(force) { + if (!this.#pool) { + this.#log.verbose("Pool termination requested, but a pool has not been initialized"); + return; + } + + const pool = this.#pool; + this.#pool = null; + return pool.terminate(force); } } -let pool; -function getPool(taskUtil) { - if (!pool) { - log.verbose(`Creating workerpool with up to ${maxWorkers} workers (available CPU cores: ${osCpus})`); - const workerPath = fileURLToPath(new URL("./minifierWorker.js", import.meta.url)); - pool = workerpool.pool(workerPath, { - workerType: "auto", - maxWorkers - }); - taskUtil.registerCleanupTask(() => { - log.verbose(`Terminating workerpool`); - const poolToBeTerminated = pool; - pool = null; - poolToBeTerminated.terminate(); - }); - } - return pool; -} \ No newline at end of file +/** + * "@ui5/fs/fsInterface" like class that uses internally + * "@ui5/fs/fsInterface", implements its methods, and + * sends the results to a MessagePort. + * + * Used in the main thread in a combination with FsWorkerThreadInterface. + */ +export class FsMainThreadInterface { + #comPorts = new Set(); + #fsInterfaceReader = null; + #cache = Object.create(null); + + /** + * Constructor + * + * @param {@ui5/fs/fsInterface} fsInterfaceReader Reader for the Resources + */ + constructor(fsInterfaceReader) { + if (!fsInterfaceReader) { + throw new Error("fsInterfaceReader is mandatory argument"); + } + + this.#fsInterfaceReader = fsInterfaceReader; + } + + /** + * Adds MessagePort and starts listening for requests on it. + * + * @param {MessagePort} comPort port1 from a {code}MessageChannel{/code} + */ + startCommunication(comPort) { + if (!comPort) { + throw new Error("Communication channel is mandatory argument"); + } + + this.#comPorts.add(comPort); + comPort.on("message", (e) => this.#onMessage(e, comPort)); + comPort.on("close", () => comPort.close()); + } + + /** + * Ends MessagePort communication. + * + * @param {MessagePort} comPort port1 to remove from handling. + */ + endCommunication(comPort) { + comPort.close(); + this.#comPorts.delete(comPort); + } + + /** + * Destroys the FsMainThreadInterface + */ + cleanup() { + this.#comPorts.forEach((comPort) => comPort.close()); + this.#cache = null; + this.#fsInterfaceReader = null; + } + + /** + * Handles messages from the MessagePort + * + * @param {object} e data to construct the request + * @param {string} e.action Action to perform. Corresponds to the names of + * the public methods of "@ui5/fs/fsInterface" + * @param {string} e.fsPath Path of the Resource + * @param {object} e.options Options for "readFile" action + * @param {MessagePort} comPort The communication channel + */ + #onMessage(e, comPort) { + switch (e.action) { + case "readFile": + this.#doRequest(comPort, { + action: "readFile", + fsPath: e.fsPath, + options: e.options, + }); + break; + case "stat": + this.#doRequest(comPort, { action: "stat", fsPath: e.fsPath }); + break; + } + } + + /** + * Requests a Resource from the "@ui5/fs/fsInterface" and sends it to the worker threads + * via postMessage. + * + * @param {MessagePort} comPort The communication channel + * @param {object} parameters + * @param {string} parameters.action Action to perform. Corresponds to the names of + * the public methods of "@ui5/fs/fsInterface" and triggers this method of the + * "@ui5/fs/fsInterface" instance. + * @param {string} parameters.fsPath Path of the Resource + * @param {object} parameters.options Options for "readFile" action + */ + async #doRequest(comPort, { action, fsPath, options }) { + const cacheKey = `${fsPath}-${action}`; + if (!this.#cache[cacheKey]) { + this.#cache[cacheKey] = new Promise((res) => { + if (action === "readFile") { + this.#fsInterfaceReader.readFile( + fsPath, + options, + (error, result) => res({ error, result }) + ); + } else if (action === "stat") { + this.#fsInterfaceReader.stat(fsPath, (error, result) => + // The Stat object has some special methods that sometimes cannot be serialized + // properly in the postMessage. In this scenario, we do not need those methods, + // but just to check whether stats has resolved to something. + res(JSON.parse(JSON.stringify({ error, result }))) + ); + } else { + res({ + error: new Error( + `Action "${action}" is not available.` + ), + result: null, + }); + } + }); + } + + const fromCache = await this.#cache[cacheKey]; + comPort.postMessage({ action, fsPath, ...fromCache }); + } +} + +/** + * "@ui5/fs/fsInterface" like class that uses internally + * "@ui5/fs/fsInterface", implements its methods, and + * requests resources via MessagePort. + * + * Used in the worker thread in a combination with FsMainThreadInterface. + */ +export class FsWorkerThreadInterface { + #comPort = null; + #callbacks = []; + #cache = Object.create(null); + + /** + * Constructor + * + * @param {MessagePort} comPort Communication port + */ + constructor(comPort) { + if (!comPort) { + throw new Error("Communication port is mandatory argument"); + } + + this.#comPort = comPort; + comPort.on("message", this.#onMessage.bind(this)); + comPort.on("close", this.#onClose.bind(this)); + } + + /** + * Handles messages from MessagePort + * + * @param {object} e + * @param {string} e.action Action to perform. Corresponds to the names of + * the public methods of "@ui5/fs/fsInterface" + * @param {string} e.fsPath Path of the Resource + * @param {*} e.result Response from the "action". + * @param {object} e.error Error from the "action". + */ + #onMessage(e) { + const cbObject = this.#callbacks.find( + (cb) => cb.action === e.action && cb.fsPath === e.fsPath + ); + + if (cbObject) { + this.#cache[`${e.fsPath}-${e.action}`] = { + error: e.error, + result: e.result, + }; + this.#callbacks.splice(this.#callbacks.indexOf(cbObject), 1); + cbObject.callback(e.error, e.result); + } else { + throw new Error( + "No callback found for this message! Possible hang for the thread!", + e + ); + } + } + + /** + * End communication + */ + #onClose() { + this.#comPort.close(); + this.#cache = null; + } + + /** + * Makes a request via the MessagePort + * + * @param {object} parameters + * @param {string} parameters.action Action to perform. Corresponds to the names of + * the public methods. + * @param {string} parameters.fsPath Path of the Resource + * @param {object} parameters.options Options for "readFile" action + * @param {Function} callback Callback to call when the "action" is executed and ready. + */ + #doRequest({ action, fsPath, options }, callback) { + const cacheKey = `${fsPath}-${action}`; + + if (this.#cache[cacheKey]) { + const { result, error } = this.#cache[cacheKey]; + callback(error, result); + } else { + this.#callbacks.push({ action, fsPath, callback }); + this.#comPort.postMessage({ action, fsPath, options }); + } + } + + readFile(fsPath, options, callback) { + this.#doRequest({ action: "readFile", fsPath, options }, callback); + } + + stat(fsPath, callback) { + this.#doRequest({ action: "stat", fsPath }, callback); + } +} diff --git a/lib/lbt/utils/ParallelizerThread.js b/lib/lbt/utils/ParallelizerThread.js new file mode 100644 index 000000000..6bc890ab7 --- /dev/null +++ b/lib/lbt/utils/ParallelizerThread.js @@ -0,0 +1,18 @@ +import workerpool from "workerpool"; + +export default async function execInThread({url, methodName, args}) { + const moduleToRegister = await import(url); + const methodCall = moduleToRegister[methodName] || moduleToRegister["default"]; + + return await methodCall(args); +} + +// Test execution via ava is never done on the main thread +/* istanbul ignore else */ +if (!workerpool.isMainThread) { + // Script got loaded through workerpool + // => Create a worker and register public functions + workerpool.worker({ + execInThread, + }); +} From a7c46895cf893aadd286ce097f6521cd3d0bb16b Mon Sep 17 00:00:00 2001 From: Yavor Ivanov Date: Fri, 10 Nov 2023 09:11:37 +0200 Subject: [PATCH 03/15] Implement new workerpool abstraction to the minifier --- lib/processors/minifier.js | 37 +++++++------------------------- lib/processors/minifierWorker.js | 11 ---------- 2 files changed, 8 insertions(+), 40 deletions(-) diff --git a/lib/processors/minifier.js b/lib/processors/minifier.js index b70d4dd02..589c76be6 100644 --- a/lib/processors/minifier.js +++ b/lib/processors/minifier.js @@ -1,45 +1,24 @@ import {fileURLToPath} from "node:url"; import posixPath from "node:path/posix"; import {promisify} from "node:util"; -import os from "node:os"; -import workerpool from "workerpool"; +import {Parallelizer} from "../lbt/utils/Parallelizer.js"; import Resource from "@ui5/fs/Resource"; import {getLogger} from "@ui5/logger"; const log = getLogger("builder:processors:minifier"); const debugFileRegex = /((?:\.view|\.fragment|\.controller|\.designtime|\.support)?\.js)$/; -const MIN_WORKERS = 2; -const MAX_WORKERS = 4; -const osCpus = os.cpus().length || 1; -const maxWorkers = Math.max(Math.min(osCpus - 1, MAX_WORKERS), MIN_WORKERS); - const sourceMappingUrlPattern = /\/\/# sourceMappingURL=(\S+)\s*$/; const httpPattern = /^https?:\/\//i; -// Shared workerpool across all executions until the taskUtil cleanup is triggered -let pool; - -function getPool(taskUtil) { - if (!pool) { - log.verbose(`Creating workerpool with up to ${maxWorkers} workers (available CPU cores: ${osCpus})`); - const workerPath = fileURLToPath(new URL("./minifierWorker.js", import.meta.url)); - pool = workerpool.pool(workerPath, { - workerType: "auto", - maxWorkers - }); - taskUtil.registerCleanupTask(() => { - log.verbose(`Terminating workerpool`); - const poolToBeTerminated = pool; - pool = null; - poolToBeTerminated.terminate(); - }); - } - return pool; -} - async function minifyInWorker(options, taskUtil) { - return getPool(taskUtil).exec("execMinification", [options]); + const url = fileURLToPath(new URL("./minifierWorker.js", import.meta.url)); + + return Parallelizer.getInstance().execInThread({ + url, + methodName: "execMinification", + args: options + }); } async function extractAndRemoveSourceMappingUrl(resource) { diff --git a/lib/processors/minifierWorker.js b/lib/processors/minifierWorker.js index 943422903..a0057c234 100644 --- a/lib/processors/minifierWorker.js +++ b/lib/processors/minifierWorker.js @@ -1,4 +1,3 @@ -import workerpool from "workerpool"; import {minify} from "terser"; /** @@ -67,13 +66,3 @@ export default async function execMinification({ }); } } - -// Test execution via ava is never done on the main thread -/* istanbul ignore else */ -if (!workerpool.isMainThread) { - // Script got loaded through workerpool - // => Create a worker and register public functions - workerpool.worker({ - execMinification - }); -} From 094bec604019501ea374690496ae990934c76fce Mon Sep 17 00:00:00 2001 From: Yavor Ivanov Date: Mon, 13 Nov 2023 10:31:05 +0200 Subject: [PATCH 04/15] Refactor, rename and provide the new thread interface --- .../{Parallelizer.js => PoolDispatcher.js} | 53 ++++++++++++++----- ...elizerThread.js => TaskProcessorThread.js} | 0 lib/processors/minifier.js | 9 ++-- 3 files changed, 44 insertions(+), 18 deletions(-) rename lib/lbt/utils/{Parallelizer.js => PoolDispatcher.js} (87%) rename lib/lbt/utils/{ParallelizerThread.js => TaskProcessorThread.js} (100%) diff --git a/lib/lbt/utils/Parallelizer.js b/lib/lbt/utils/PoolDispatcher.js similarity index 87% rename from lib/lbt/utils/Parallelizer.js rename to lib/lbt/utils/PoolDispatcher.js index 667f05674..3b735168f 100644 --- a/lib/lbt/utils/Parallelizer.js +++ b/lib/lbt/utils/PoolDispatcher.js @@ -8,8 +8,9 @@ const MAX_WORKERS = 4; const osCpus = os.cpus().length || 1; const maxWorkers = Math.max(Math.min(osCpus - 1, MAX_WORKERS), MIN_WORKERS); -export class Parallelizer { - #log = getLogger("builder:utils:Parallelizer"); +export class PoolDispatcher { + #log = getLogger("builder:utils:PoolDispatcher"); + #projectBuilders = []; #pool; static #ensureSingleton = false; static #instance; @@ -20,7 +21,7 @@ export class Parallelizer { `Creating workerpool with up to ${maxWorkers} workers (available CPU cores: ${osCpus})` ); const workerPath = fileURLToPath( - new URL("./ParallelizerThread.js", import.meta.url) + new URL("./TaskProcessorThread.js", import.meta.url) ); this.#pool = workerpool.pool(workerPath, { workerType: "auto", @@ -31,7 +32,7 @@ export class Parallelizer { } constructor() { - if (!Parallelizer.#ensureSingleton) { + if (!PoolDispatcher.#ensureSingleton) { throw new Error( "Constructor must not be called! This is a singleton class. Use Parallelizer.getInstance()" ); @@ -39,21 +40,35 @@ export class Parallelizer { } static getInstance() { - if (!Parallelizer.#instance) { - Parallelizer.#ensureSingleton = true; - Parallelizer.#instance = new Parallelizer(); - Parallelizer.#ensureSingleton = false; + if (!PoolDispatcher.#instance) { + PoolDispatcher.#ensureSingleton = true; + PoolDispatcher.#instance = new PoolDispatcher(); + PoolDispatcher.#ensureSingleton = false; } - return Parallelizer.#instance; + return PoolDispatcher.#instance; } - async execInThread(args) { - return this.#getPool().exec("execInThread", [args]); + getProcessor(url) { + return { + execute: async (methodName, args) => { + const buildUpArgs = {url, methodName, args}; + return this.#getPool().exec("execInThread", [buildUpArgs]); + } + }; } - async cleanup() { + async cleanup(project) { const attemptPoolTermination = () => { + if (this.#projectBuilders.length) { + this.#log.verbose( + `Pool termination canceled. Still pending projects to build: ${this.#projectBuilders.map( + (project) => project.getName() + )}` + ); + return; + } + this.#log.verbose(`Attempt to terminate the workerpool...`); if (!this.#pool) { @@ -75,6 +90,11 @@ export class Parallelizer { return this.terminateTasks(/* terminate gracefully */); }; + if (project) { + const projectIndex = this.#projectBuilders.indexOf(project); + this.#projectBuilders.splice(projectIndex, 1); + } + return attemptPoolTermination(); } @@ -84,10 +104,19 @@ export class Parallelizer { return; } + this.#projectBuilders = []; const pool = this.#pool; this.#pool = null; return pool.terminate(force); } + + registerProjectBuilder(project) { + this.#projectBuilders.push(project); + } + + getQueuedProjectBuilders() { + return this.#projectBuilders; + } } /** diff --git a/lib/lbt/utils/ParallelizerThread.js b/lib/lbt/utils/TaskProcessorThread.js similarity index 100% rename from lib/lbt/utils/ParallelizerThread.js rename to lib/lbt/utils/TaskProcessorThread.js diff --git a/lib/processors/minifier.js b/lib/processors/minifier.js index 589c76be6..e386f0b0a 100644 --- a/lib/processors/minifier.js +++ b/lib/processors/minifier.js @@ -1,7 +1,7 @@ import {fileURLToPath} from "node:url"; import posixPath from "node:path/posix"; import {promisify} from "node:util"; -import {Parallelizer} from "../lbt/utils/Parallelizer.js"; +import {PoolDispatcher} from "../lbt/utils/PoolDispatcher.js"; import Resource from "@ui5/fs/Resource"; import {getLogger} from "@ui5/logger"; const log = getLogger("builder:processors:minifier"); @@ -14,11 +14,8 @@ const httpPattern = /^https?:\/\//i; async function minifyInWorker(options, taskUtil) { const url = fileURLToPath(new URL("./minifierWorker.js", import.meta.url)); - return Parallelizer.getInstance().execInThread({ - url, - methodName: "execMinification", - args: options - }); + const processor = PoolDispatcher.getInstance().getProcessor(url); + return processor.execute("execMinification", options); } async function extractAndRemoveSourceMappingUrl(resource) { From 347db1f2ee0adcdc90647eeed80cbe8e8cbb451b Mon Sep 17 00:00:00 2001 From: Yavor Ivanov Date: Mon, 13 Nov 2023 11:00:52 +0200 Subject: [PATCH 05/15] Fix formats --- lib/lbt/utils/PoolDispatcher.js | 40 ++++++++++++++++----------------- 1 file changed, 20 insertions(+), 20 deletions(-) diff --git a/lib/lbt/utils/PoolDispatcher.js b/lib/lbt/utils/PoolDispatcher.js index 3b735168f..00bc89e8b 100644 --- a/lib/lbt/utils/PoolDispatcher.js +++ b/lib/lbt/utils/PoolDispatcher.js @@ -190,16 +190,16 @@ export class FsMainThreadInterface { */ #onMessage(e, comPort) { switch (e.action) { - case "readFile": - this.#doRequest(comPort, { - action: "readFile", - fsPath: e.fsPath, - options: e.options, - }); - break; - case "stat": - this.#doRequest(comPort, { action: "stat", fsPath: e.fsPath }); - break; + case "readFile": + this.#doRequest(comPort, { + action: "readFile", + fsPath: e.fsPath, + options: e.options, + }); + break; + case "stat": + this.#doRequest(comPort, {action: "stat", fsPath: e.fsPath}); + break; } } @@ -215,7 +215,7 @@ export class FsMainThreadInterface { * @param {string} parameters.fsPath Path of the Resource * @param {object} parameters.options Options for "readFile" action */ - async #doRequest(comPort, { action, fsPath, options }) { + async #doRequest(comPort, {action, fsPath, options}) { const cacheKey = `${fsPath}-${action}`; if (!this.#cache[cacheKey]) { this.#cache[cacheKey] = new Promise((res) => { @@ -223,14 +223,14 @@ export class FsMainThreadInterface { this.#fsInterfaceReader.readFile( fsPath, options, - (error, result) => res({ error, result }) + (error, result) => res({error, result}) ); } else if (action === "stat") { this.#fsInterfaceReader.stat(fsPath, (error, result) => // The Stat object has some special methods that sometimes cannot be serialized // properly in the postMessage. In this scenario, we do not need those methods, // but just to check whether stats has resolved to something. - res(JSON.parse(JSON.stringify({ error, result }))) + res(JSON.parse(JSON.stringify({error, result}))) ); } else { res({ @@ -244,7 +244,7 @@ export class FsMainThreadInterface { } const fromCache = await this.#cache[cacheKey]; - comPort.postMessage({ action, fsPath, ...fromCache }); + comPort.postMessage({action, fsPath, ...fromCache}); } } @@ -323,23 +323,23 @@ export class FsWorkerThreadInterface { * @param {object} parameters.options Options for "readFile" action * @param {Function} callback Callback to call when the "action" is executed and ready. */ - #doRequest({ action, fsPath, options }, callback) { + #doRequest({action, fsPath, options}, callback) { const cacheKey = `${fsPath}-${action}`; if (this.#cache[cacheKey]) { - const { result, error } = this.#cache[cacheKey]; + const {result, error} = this.#cache[cacheKey]; callback(error, result); } else { - this.#callbacks.push({ action, fsPath, callback }); - this.#comPort.postMessage({ action, fsPath, options }); + this.#callbacks.push({action, fsPath, callback}); + this.#comPort.postMessage({action, fsPath, options}); } } readFile(fsPath, options, callback) { - this.#doRequest({ action: "readFile", fsPath, options }, callback); + this.#doRequest({action: "readFile", fsPath, options}, callback); } stat(fsPath, callback) { - this.#doRequest({ action: "stat", fsPath }, callback); + this.#doRequest({action: "stat", fsPath}, callback); } } From c8d5e2c8c14663123fc3622f633e1612f7d1ed97 Mon Sep 17 00:00:00 2001 From: Yavor Ivanov Date: Mon, 13 Nov 2023 14:54:05 +0200 Subject: [PATCH 06/15] Abstract thread communication --- lib/processors/themeBuilderWorker.js | 150 +++++++++++++-------------- 1 file changed, 72 insertions(+), 78 deletions(-) diff --git a/lib/processors/themeBuilderWorker.js b/lib/processors/themeBuilderWorker.js index 91956e566..59d605e63 100644 --- a/lib/processors/themeBuilderWorker.js +++ b/lib/processors/themeBuilderWorker.js @@ -2,6 +2,7 @@ import workerpool from "workerpool"; import themeBuilder from "./themeBuilder.js"; import {createResource} from "@ui5/fs/resourceFactory"; import {Buffer} from "node:buffer"; +import { callbackify } from "node:util"; /** * Task to build library themes. @@ -63,29 +64,20 @@ export function deserializeResources(resources) { }); } -/** - * "@ui5/fs/fsInterface" like class that uses internally - * "@ui5/fs/fsInterface", implements its methods, and - * sends the results to a MessagePort. - * - * Used in the main thread in a combination with FsWorkerThreadInterface. - */ -export class FsMainThreadInterface { - #comPorts = new Set(); - #fsInterfaceReader = null; - #cache = Object.create(null); +class AbstractMain { + _comPorts = new Set(); + _collection = null; + _cache = Object.create(null); /** * Constructor - * - * @param {@ui5/fs/fsInterface} fsInterfaceReader Reader for the Resources */ - constructor(fsInterfaceReader) { - if (!fsInterfaceReader) { - throw new Error("fsInterfaceReader is mandatory argument"); + constructor(collection) { + if (!collection) { + throw new Error("collection is mandatory argument"); } - this.#fsInterfaceReader = fsInterfaceReader; + this._collection = collection; } /** @@ -98,7 +90,7 @@ export class FsMainThreadInterface { throw new Error("Communication channel is mandatory argument"); } - this.#comPorts.add(comPort); + this._comPorts.add(comPort); comPort.on("message", (e) => this.#onMessage(e, comPort)); comPort.on("close", () => comPort.close()); } @@ -110,16 +102,16 @@ export class FsMainThreadInterface { */ endCommunication(comPort) { comPort.close(); - this.#comPorts.delete(comPort); + this._comPorts.delete(comPort); } /** * Destroys the FsMainThreadInterface */ cleanup() { - this.#comPorts.forEach((comPort) => comPort.close()); - this.#cache = null; - this.#fsInterfaceReader = null; + this._comPorts.forEach((comPort) => comPort.close()); + this._cache = null; + this._collection = null; } /** @@ -132,61 +124,47 @@ export class FsMainThreadInterface { * @param {object} e.options Options for "readFile" action * @param {MessagePort} comPort The communication channel */ - #onMessage(e, comPort) { - switch (e.action) { - case "readFile": - this.#doRequest(comPort, {action: "readFile", fsPath: e.fsPath, options: e.options}); - break; - case "stat": - this.#doRequest(comPort, {action: "stat", fsPath: e.fsPath}); - break; - } - } + async #onMessage(e, comPort) { + const {action, args, key: cacheKey} = e; - /** - * Requests a Resource from the "@ui5/fs/fsInterface" and sends it to the worker threads - * via postMessage. - * - * @param {MessagePort} comPort The communication channel - * @param {object} parameters - * @param {string} parameters.action Action to perform. Corresponds to the names of - * the public methods of "@ui5/fs/fsInterface" and triggers this method of the - * "@ui5/fs/fsInterface" instance. - * @param {string} parameters.fsPath Path of the Resource - * @param {object} parameters.options Options for "readFile" action - */ - async #doRequest(comPort, {action, fsPath, options}) { - const cacheKey = `${fsPath}-${action}`; - if (!this.#cache[cacheKey]) { - this.#cache[cacheKey] = new Promise((res) => { - if (action === "readFile") { - this.#fsInterfaceReader.readFile(fsPath, options, (error, result) => res({error, result})); - } else if (action === "stat") { - this.#fsInterfaceReader.stat(fsPath, (error, result) => - // The Stat object has some special methods that sometimes cannot be serialized - // properly in the postMessage. In this scenario, we do not need those methods, - // but just to check whether stats has resolved to something. - res(JSON.parse(JSON.stringify({error, result}))) - ); - } else { - res({error: new Error(`Action "${action}" is not available.`), result: null}); - } - }); + if (!this._cache[cacheKey]) { + this._cache[cacheKey] = this.get(action, args); } - const fromCache = await this.#cache[cacheKey]; - comPort.postMessage({action, fsPath, ...fromCache}); + const fromCache = await this._cache[cacheKey]; + comPort.postMessage({action, key: cacheKey, ...fromCache}); + } + + get(method) { + throw new Error(`${method} method's handler has to be implemented`); } } /** * "@ui5/fs/fsInterface" like class that uses internally * "@ui5/fs/fsInterface", implements its methods, and - * requests resources via MessagePort. + * sends the results to a MessagePort. * - * Used in the main thread in a combination with FsMainThreadInterface. + * Used in the main thread in a combination with FsWorkerThreadInterface. */ -export class FsWorkerThreadInterface { +export class FsMainThreadInterface extends AbstractMain { + constructor(fsInterfacePort) { + super(fsInterfacePort); + } + + get(method, args) { + const {fsPath, options} = args; + const composedArgs = [fsPath, options].filter(($) => $ !== undefined); + + return new Promise((resolve) => { + this._collection[method](...composedArgs, (error, result) => { + resolve({error, result}); + }); + }); + } +} + +class AbstractThread { #comPort = null; #callbacks = []; #cache = Object.create(null); @@ -217,14 +195,20 @@ export class FsWorkerThreadInterface { * @param {object} e.error Error from the "action". */ #onMessage(e) { - const cbObject = this.#callbacks.find((cb) => cb.action === e.action && cb.fsPath === e.fsPath); + const cbObject = this.#callbacks.find((cb) => cb.key === e.key); if (cbObject) { - this.#cache[`${e.fsPath}-${e.action}`] = {error: e.error, result: e.result}; + this.#cache[e.key] = { + error: e.error, + result: e.result, + }; this.#callbacks.splice(this.#callbacks.indexOf(cbObject), 1); cbObject.callback(e.error, e.result); } else { - throw new Error("No callback found for this message! Possible hang for the thread!", e); + throw new Error( + "No callback found for this message! Possible hang for the thread!", + e + ); } } @@ -246,24 +230,34 @@ export class FsWorkerThreadInterface { * @param {object} parameters.options Options for "readFile" action * @param {Function} callback Callback to call when the "action" is executed and ready. */ - #doRequest({action, fsPath, options}, callback) { - const cacheKey = `${fsPath}-${action}`; - - if (this.#cache[cacheKey]) { - const {result, error} = this.#cache[cacheKey]; + _doRequest({action, key, args}, callback) { + // fsPath, options + if (this.#cache[key]) { + const {result, error} = this.#cache[key]; callback(error, result); } else { - this.#callbacks.push({action, fsPath, callback}); - this.#comPort.postMessage({action, fsPath, options}); + this.#callbacks.push({key, callback}); + this.#comPort.postMessage({action, key, args}); } } +} +/** + * "@ui5/fs/fsInterface" like class that uses internally + * "@ui5/fs/fsInterface", implements its methods, and + * requests resources via MessagePort. + * + * Used in the worker thread in a combination with FsMainThreadInterface. + */ +export class FsWorkerThreadInterface extends AbstractThread { readFile(fsPath, options, callback) { - this.#doRequest({action: "readFile", fsPath, options}, callback); + const key = `${fsPath}-readFile`; + this._doRequest({action: "readFile", key, args: {fsPath, options}}, callback); } stat(fsPath, callback) { - this.#doRequest({action: "stat", fsPath}, callback); + const key = `${fsPath}-stat`; + this._doRequest({action: "stat", key, args: {fsPath}}, callback); } } From a7d5f0cad16324d0713d88bcad3e2c62d301ba42 Mon Sep 17 00:00:00 2001 From: Yavor Ivanov Date: Mon, 13 Nov 2023 14:58:38 +0200 Subject: [PATCH 07/15] Move code across modules --- lib/lbt/utils/PoolDispatcher.js | 194 +++++++++++----------- lib/processors/themeBuilderWorker.js | 230 +-------------------------- lib/tasks/buildThemes.js | 2 +- 3 files changed, 100 insertions(+), 326 deletions(-) diff --git a/lib/lbt/utils/PoolDispatcher.js b/lib/lbt/utils/PoolDispatcher.js index 00bc89e8b..b58106974 100644 --- a/lib/lbt/utils/PoolDispatcher.js +++ b/lib/lbt/utils/PoolDispatcher.js @@ -2,6 +2,7 @@ import workerpool from "workerpool"; import os from "node:os"; import {fileURLToPath} from "node:url"; import {getLogger} from "@ui5/logger"; +import {createResource} from "@ui5/fs/resourceFactory"; const MIN_WORKERS = 2; const MAX_WORKERS = 4; @@ -120,28 +121,48 @@ export class PoolDispatcher { } /** - * "@ui5/fs/fsInterface" like class that uses internally - * "@ui5/fs/fsInterface", implements its methods, and - * sends the results to a MessagePort. + * Casts @ui5/fs/Resource-s into an Uint8Array transferable object * - * Used in the main thread in a combination with FsWorkerThreadInterface. + * @param {@ui5/fs/Resource[]} resourceCollection + * @returns {Promise} */ -export class FsMainThreadInterface { - #comPorts = new Set(); - #fsInterfaceReader = null; - #cache = Object.create(null); +export async function serializeResources(resourceCollection) { + return Promise.all( + resourceCollection.map(async (res) => ({ + buffer: await res.getBuffer(), + path: res.getPath() + })) + ); +} + +/** + * Casts Uint8Array into @ui5/fs/Resource-s transferable object + * + * @param {Promise} resources + * @returns {@ui5/fs/Resource[]} + */ +export function deserializeResources(resources) { + return resources.map((res) => { + // res.buffer is an Uint8Array object and needs to be cast + // to a Buffer in order to be read correctly. + return createResource({path: res.path, buffer: Buffer.from(res.buffer)}); + }); +} + +class AbstractMain { + _comPorts = new Set(); + _collection = null; + _cache = Object.create(null); /** * Constructor - * - * @param {@ui5/fs/fsInterface} fsInterfaceReader Reader for the Resources */ - constructor(fsInterfaceReader) { - if (!fsInterfaceReader) { - throw new Error("fsInterfaceReader is mandatory argument"); + constructor(collection) { + if (!collection) { + throw new Error("collection is mandatory argument"); } - this.#fsInterfaceReader = fsInterfaceReader; + this._collection = collection; } /** @@ -154,7 +175,7 @@ export class FsMainThreadInterface { throw new Error("Communication channel is mandatory argument"); } - this.#comPorts.add(comPort); + this._comPorts.add(comPort); comPort.on("message", (e) => this.#onMessage(e, comPort)); comPort.on("close", () => comPort.close()); } @@ -166,16 +187,16 @@ export class FsMainThreadInterface { */ endCommunication(comPort) { comPort.close(); - this.#comPorts.delete(comPort); + this._comPorts.delete(comPort); } /** * Destroys the FsMainThreadInterface */ cleanup() { - this.#comPorts.forEach((comPort) => comPort.close()); - this.#cache = null; - this.#fsInterfaceReader = null; + this._comPorts.forEach((comPort) => comPort.close()); + this._cache = null; + this._collection = null; } /** @@ -188,74 +209,23 @@ export class FsMainThreadInterface { * @param {object} e.options Options for "readFile" action * @param {MessagePort} comPort The communication channel */ - #onMessage(e, comPort) { - switch (e.action) { - case "readFile": - this.#doRequest(comPort, { - action: "readFile", - fsPath: e.fsPath, - options: e.options, - }); - break; - case "stat": - this.#doRequest(comPort, {action: "stat", fsPath: e.fsPath}); - break; - } - } + async #onMessage(e, comPort) { + const {action, args, key: cacheKey} = e; - /** - * Requests a Resource from the "@ui5/fs/fsInterface" and sends it to the worker threads - * via postMessage. - * - * @param {MessagePort} comPort The communication channel - * @param {object} parameters - * @param {string} parameters.action Action to perform. Corresponds to the names of - * the public methods of "@ui5/fs/fsInterface" and triggers this method of the - * "@ui5/fs/fsInterface" instance. - * @param {string} parameters.fsPath Path of the Resource - * @param {object} parameters.options Options for "readFile" action - */ - async #doRequest(comPort, {action, fsPath, options}) { - const cacheKey = `${fsPath}-${action}`; - if (!this.#cache[cacheKey]) { - this.#cache[cacheKey] = new Promise((res) => { - if (action === "readFile") { - this.#fsInterfaceReader.readFile( - fsPath, - options, - (error, result) => res({error, result}) - ); - } else if (action === "stat") { - this.#fsInterfaceReader.stat(fsPath, (error, result) => - // The Stat object has some special methods that sometimes cannot be serialized - // properly in the postMessage. In this scenario, we do not need those methods, - // but just to check whether stats has resolved to something. - res(JSON.parse(JSON.stringify({error, result}))) - ); - } else { - res({ - error: new Error( - `Action "${action}" is not available.` - ), - result: null, - }); - } - }); + if (!this._cache[cacheKey]) { + this._cache[cacheKey] = this.get(action, args); } - const fromCache = await this.#cache[cacheKey]; - comPort.postMessage({action, fsPath, ...fromCache}); + const fromCache = await this._cache[cacheKey]; + comPort.postMessage({action, key: cacheKey, ...fromCache}); + } + + get(method) { + throw new Error(`${method} method's handler has to be implemented`); } } -/** - * "@ui5/fs/fsInterface" like class that uses internally - * "@ui5/fs/fsInterface", implements its methods, and - * requests resources via MessagePort. - * - * Used in the worker thread in a combination with FsMainThreadInterface. - */ -export class FsWorkerThreadInterface { +class AbstractThread { #comPort = null; #callbacks = []; #cache = Object.create(null); @@ -286,12 +256,10 @@ export class FsWorkerThreadInterface { * @param {object} e.error Error from the "action". */ #onMessage(e) { - const cbObject = this.#callbacks.find( - (cb) => cb.action === e.action && cb.fsPath === e.fsPath - ); + const cbObject = this.#callbacks.find((cb) => cb.key === e.key); if (cbObject) { - this.#cache[`${e.fsPath}-${e.action}`] = { + this.#cache[e.key] = { error: e.error, result: e.result, }; @@ -319,27 +287,61 @@ export class FsWorkerThreadInterface { * @param {object} parameters * @param {string} parameters.action Action to perform. Corresponds to the names of * the public methods. - * @param {string} parameters.fsPath Path of the Resource - * @param {object} parameters.options Options for "readFile" action + * @param {string} parameters.key + * @param {object} parameters.args * @param {Function} callback Callback to call when the "action" is executed and ready. */ - #doRequest({action, fsPath, options}, callback) { - const cacheKey = `${fsPath}-${action}`; - - if (this.#cache[cacheKey]) { - const {result, error} = this.#cache[cacheKey]; + _doRequest({action, key, args}, callback) { + // fsPath, options + if (this.#cache[key]) { + const {result, error} = this.#cache[key]; callback(error, result); } else { - this.#callbacks.push({action, fsPath, callback}); - this.#comPort.postMessage({action, fsPath, options}); + this.#callbacks.push({key, callback}); + this.#comPort.postMessage({action, key, args}); } } +} + +/** + * "@ui5/fs/fsInterface" like class that uses internally + * "@ui5/fs/fsInterface", implements its methods, and + * sends the results to a MessagePort. + * + * Used in the main thread in a combination with FsWorkerThreadInterface. + */ +export class FsMainThreadInterface extends AbstractMain { + constructor(fsInterfacePort) { + super(fsInterfacePort); + } + + get(method, args) { + const {fsPath, options} = args; + const composedArgs = [fsPath, options].filter(($) => $ !== undefined); + + return new Promise((resolve) => { + this._collection[method](...composedArgs, (error, result) => { + resolve({error, result}); + }); + }); + } +} +/** + * "@ui5/fs/fsInterface" like class that uses internally + * "@ui5/fs/fsInterface", implements its methods, and + * requests resources via MessagePort. + * + * Used in the worker thread in a combination with FsMainThreadInterface. + */ +export class FsWorkerThreadInterface extends AbstractThread { readFile(fsPath, options, callback) { - this.#doRequest({action: "readFile", fsPath, options}, callback); + const key = `${fsPath}-readFile`; + this._doRequest({action: "readFile", key, args: {fsPath, options}}, callback); } stat(fsPath, callback) { - this.#doRequest({action: "stat", fsPath}, callback); + const key = `${fsPath}-stat`; + this._doRequest({action: "stat", key, args: {fsPath}}, callback); } } diff --git a/lib/processors/themeBuilderWorker.js b/lib/processors/themeBuilderWorker.js index 59d605e63..57eeee75b 100644 --- a/lib/processors/themeBuilderWorker.js +++ b/lib/processors/themeBuilderWorker.js @@ -1,8 +1,6 @@ import workerpool from "workerpool"; import themeBuilder from "./themeBuilder.js"; -import {createResource} from "@ui5/fs/resourceFactory"; -import {Buffer} from "node:buffer"; -import { callbackify } from "node:util"; +import {deserializeResources, serializeResources, FsWorkerThreadInterface} from "../lbt/utils/PoolDispatcher.js"; /** * Task to build library themes. @@ -35,232 +33,6 @@ export default async function execThemeBuild({ return serializeResources(result); } -/** - * Casts @ui5/fs/Resource-s into an Uint8Array transferable object - * - * @param {@ui5/fs/Resource[]} resourceCollection - * @returns {Promise} - */ -export async function serializeResources(resourceCollection) { - return Promise.all( - resourceCollection.map(async (res) => ({ - buffer: await res.getBuffer(), - path: res.getPath() - })) - ); -} - -/** - * Casts Uint8Array into @ui5/fs/Resource-s transferable object - * - * @param {Promise} resources - * @returns {@ui5/fs/Resource[]} - */ -export function deserializeResources(resources) { - return resources.map((res) => { - // res.buffer is an Uint8Array object and needs to be cast - // to a Buffer in order to be read correctly. - return createResource({path: res.path, buffer: Buffer.from(res.buffer)}); - }); -} - -class AbstractMain { - _comPorts = new Set(); - _collection = null; - _cache = Object.create(null); - - /** - * Constructor - */ - constructor(collection) { - if (!collection) { - throw new Error("collection is mandatory argument"); - } - - this._collection = collection; - } - - /** - * Adds MessagePort and starts listening for requests on it. - * - * @param {MessagePort} comPort port1 from a {code}MessageChannel{/code} - */ - startCommunication(comPort) { - if (!comPort) { - throw new Error("Communication channel is mandatory argument"); - } - - this._comPorts.add(comPort); - comPort.on("message", (e) => this.#onMessage(e, comPort)); - comPort.on("close", () => comPort.close()); - } - - /** - * Ends MessagePort communication. - * - * @param {MessagePort} comPort port1 to remove from handling. - */ - endCommunication(comPort) { - comPort.close(); - this._comPorts.delete(comPort); - } - - /** - * Destroys the FsMainThreadInterface - */ - cleanup() { - this._comPorts.forEach((comPort) => comPort.close()); - this._cache = null; - this._collection = null; - } - - /** - * Handles messages from the MessagePort - * - * @param {object} e data to construct the request - * @param {string} e.action Action to perform. Corresponds to the names of - * the public methods of "@ui5/fs/fsInterface" - * @param {string} e.fsPath Path of the Resource - * @param {object} e.options Options for "readFile" action - * @param {MessagePort} comPort The communication channel - */ - async #onMessage(e, comPort) { - const {action, args, key: cacheKey} = e; - - if (!this._cache[cacheKey]) { - this._cache[cacheKey] = this.get(action, args); - } - - const fromCache = await this._cache[cacheKey]; - comPort.postMessage({action, key: cacheKey, ...fromCache}); - } - - get(method) { - throw new Error(`${method} method's handler has to be implemented`); - } -} - -/** - * "@ui5/fs/fsInterface" like class that uses internally - * "@ui5/fs/fsInterface", implements its methods, and - * sends the results to a MessagePort. - * - * Used in the main thread in a combination with FsWorkerThreadInterface. - */ -export class FsMainThreadInterface extends AbstractMain { - constructor(fsInterfacePort) { - super(fsInterfacePort); - } - - get(method, args) { - const {fsPath, options} = args; - const composedArgs = [fsPath, options].filter(($) => $ !== undefined); - - return new Promise((resolve) => { - this._collection[method](...composedArgs, (error, result) => { - resolve({error, result}); - }); - }); - } -} - -class AbstractThread { - #comPort = null; - #callbacks = []; - #cache = Object.create(null); - - /** - * Constructor - * - * @param {MessagePort} comPort Communication port - */ - constructor(comPort) { - if (!comPort) { - throw new Error("Communication port is mandatory argument"); - } - - this.#comPort = comPort; - comPort.on("message", this.#onMessage.bind(this)); - comPort.on("close", this.#onClose.bind(this)); - } - - /** - * Handles messages from MessagePort - * - * @param {object} e - * @param {string} e.action Action to perform. Corresponds to the names of - * the public methods of "@ui5/fs/fsInterface" - * @param {string} e.fsPath Path of the Resource - * @param {*} e.result Response from the "action". - * @param {object} e.error Error from the "action". - */ - #onMessage(e) { - const cbObject = this.#callbacks.find((cb) => cb.key === e.key); - - if (cbObject) { - this.#cache[e.key] = { - error: e.error, - result: e.result, - }; - this.#callbacks.splice(this.#callbacks.indexOf(cbObject), 1); - cbObject.callback(e.error, e.result); - } else { - throw new Error( - "No callback found for this message! Possible hang for the thread!", - e - ); - } - } - - /** - * End communication - */ - #onClose() { - this.#comPort.close(); - this.#cache = null; - } - - /** - * Makes a request via the MessagePort - * - * @param {object} parameters - * @param {string} parameters.action Action to perform. Corresponds to the names of - * the public methods. - * @param {string} parameters.fsPath Path of the Resource - * @param {object} parameters.options Options for "readFile" action - * @param {Function} callback Callback to call when the "action" is executed and ready. - */ - _doRequest({action, key, args}, callback) { - // fsPath, options - if (this.#cache[key]) { - const {result, error} = this.#cache[key]; - callback(error, result); - } else { - this.#callbacks.push({key, callback}); - this.#comPort.postMessage({action, key, args}); - } - } -} - -/** - * "@ui5/fs/fsInterface" like class that uses internally - * "@ui5/fs/fsInterface", implements its methods, and - * requests resources via MessagePort. - * - * Used in the worker thread in a combination with FsMainThreadInterface. - */ -export class FsWorkerThreadInterface extends AbstractThread { - readFile(fsPath, options, callback) { - const key = `${fsPath}-readFile`; - this._doRequest({action: "readFile", key, args: {fsPath, options}}, callback); - } - - stat(fsPath, callback) { - const key = `${fsPath}-stat`; - this._doRequest({action: "stat", key, args: {fsPath}}, callback); - } -} - // Test execution via ava is never done on the main thread /* istanbul ignore else */ if (!workerpool.isMainThread) { diff --git a/lib/tasks/buildThemes.js b/lib/tasks/buildThemes.js index 4660b1d15..3ed4fcb1b 100644 --- a/lib/tasks/buildThemes.js +++ b/lib/tasks/buildThemes.js @@ -6,7 +6,7 @@ const log = getLogger("builder:tasks:buildThemes"); import {fileURLToPath} from "node:url"; import os from "node:os"; import workerpool from "workerpool"; -import {deserializeResources, serializeResources, FsMainThreadInterface} from "../processors/themeBuilderWorker.js"; +import {deserializeResources, serializeResources, FsMainThreadInterface} from "../lbt/utils/PoolDispatcher.js"; let pool; From 4e6b3ccad3f45aad0c392159f933a8951b7d237f Mon Sep 17 00:00:00 2001 From: Yavor Ivanov Date: Tue, 14 Nov 2023 09:18:43 +0200 Subject: [PATCH 08/15] Start migration to the new API --- lib/lbt/utils/PoolDispatcher.js | 64 ++++++++++++++++++++++++++++ lib/lbt/utils/TaskProcessorThread.js | 1 + lib/tasks/buildThemes.js | 40 +++++++++++------ 3 files changed, 93 insertions(+), 12 deletions(-) diff --git a/lib/lbt/utils/PoolDispatcher.js b/lib/lbt/utils/PoolDispatcher.js index b58106974..54eea6e11 100644 --- a/lib/lbt/utils/PoolDispatcher.js +++ b/lib/lbt/utils/PoolDispatcher.js @@ -54,6 +54,9 @@ export class PoolDispatcher { return { execute: async (methodName, args) => { const buildUpArgs = {url, methodName, args}; + // {url, methodName, args} + const {resources, workspace, dependencies, options} = args; + return this.#getPool().exec("execInThread", [buildUpArgs]); } }; @@ -345,3 +348,64 @@ export class FsWorkerThreadInterface extends AbstractThread { this._doRequest({action: "stat", key, args: {fsPath}}, callback); } } + +export class DuplexCollectionMainInterface extends AbstractMain { + constructor(collection) { + super(collection); + } + + get(method, args) { + const {virPattern, virPath, resource, options} = args; + const composedArgs = [virPattern, virPath, resource, options].filter(($) => $ !== undefined); + + return new Promise((resolve) => { + this._collection[method](...composedArgs, (error, result) => { + resolve({error, result}); + }); + }); + } +} + +export class DuplexCollectionThreadInterface extends AbstractThread { + #promisifyRequest(args) { + return new Promise((resolve, reject) => { + this._doRequest(args, (error, result) => { + if (error) { + reject(error); + } else { + resolve(result); + } + }); + }); + } + + byGlob(virPattern, options) { + const key = virPattern; + + return this.#promisifyRequest({ + action: "byGlob", + key, + args: {virPattern, options}, + }); + } + + byPath(virPath, options) { + const key = virPath; + + return this.#promisifyRequest({ + action: "byPath", + key, + args: {virPath, options}, + }); + } + + write(resource, options) { + const key = resource.getName(); + + return this.#promisifyRequest({ + action: "write", + key, + args: {resource, options}, + }); + } +} diff --git a/lib/lbt/utils/TaskProcessorThread.js b/lib/lbt/utils/TaskProcessorThread.js index 6bc890ab7..73d25f91f 100644 --- a/lib/lbt/utils/TaskProcessorThread.js +++ b/lib/lbt/utils/TaskProcessorThread.js @@ -4,6 +4,7 @@ export default async function execInThread({url, methodName, args}) { const moduleToRegister = await import(url); const methodCall = moduleToRegister[methodName] || moduleToRegister["default"]; + // return await methodCall({resources, workspace, dependencies, options}); return await methodCall(args); } diff --git a/lib/tasks/buildThemes.js b/lib/tasks/buildThemes.js index 3ed4fcb1b..a044bda97 100644 --- a/lib/tasks/buildThemes.js +++ b/lib/tasks/buildThemes.js @@ -6,7 +6,12 @@ const log = getLogger("builder:tasks:buildThemes"); import {fileURLToPath} from "node:url"; import os from "node:os"; import workerpool from "workerpool"; -import {deserializeResources, serializeResources, FsMainThreadInterface} from "../lbt/utils/PoolDispatcher.js"; +import { + deserializeResources, + serializeResources, + DuplexCollectionMainInterface, + PoolDispatcher, +} from "../lbt/utils/PoolDispatcher.js"; let pool; @@ -33,10 +38,20 @@ function getPool(taskUtil) { return pool; } -async function buildThemeInWorker(taskUtil, options, transferList) { - const toTransfer = transferList ? {transfer: transferList} : undefined; +async function buildThemeInWorker(taskUtil, options, transferList, {resources, workspace, dependencies}) { + // const toTransfer = transferList ? {transfer: transferList} : undefined; + // return getPool(taskUtil).exec("execThemeBuild", [options], toTransfer); - return getPool(taskUtil).exec("execThemeBuild", [options], toTransfer); + const args = { + options, + resources: await serializeResources(resources), + workspace: new DuplexCollectionMainInterface(workspace), + dependencies: new DuplexCollectionMainInterface(dependencies), + }; + const url = fileURLToPath(new URL("../processors/themeBuilderWorker.js", import.meta.url)); + + const processor = PoolDispatcher.getInstance().getProcessor(url); + return processor.execute("execThemeBuild", args); } @@ -172,29 +187,30 @@ export default async function({ let processedResources; const useWorkers = !!taskUtil; if (useWorkers) { - const threadMessageHandler = new FsMainThreadInterface(fsInterface(combo)); + // const threadMessageHandler = new FsMainThreadInterface(fsInterface(combo)); processedResources = await Promise.all(themeResources.map(async (themeRes) => { - const {port1, port2} = new MessageChannel(); - threadMessageHandler.startCommunication(port1); + // const {port1, port2} = new MessageChannel(); + // threadMessageHandler.startCommunication(port1); const result = await buildThemeInWorker(taskUtil, { - fsInterfacePort: port2, - themeResources: await serializeResources([themeRes]), + // fsInterfacePort: port2, + // themeResources: await serializeResources([themeRes]), options: { compress, cssVariables: !!cssVariables, }, - }, [port2]); + }, [undefined], + {resources: [themeRes], workspace, dependencies}); - threadMessageHandler.endCommunication(port1); + // threadMessageHandler.endCommunication(port1); return result; })) .then((resources) => Array.prototype.concat.apply([], resources)) .then(deserializeResources); - threadMessageHandler.cleanup(); + // threadMessageHandler.cleanup(); } else { // Do not use workerpool const themeBuilder = (await import("../processors/themeBuilder.js")).default; From 21a8c465029591843c294370aea9ce96b6b2cfa0 Mon Sep 17 00:00:00 2001 From: Yavor Ivanov Date: Tue, 14 Nov 2023 18:36:53 +0200 Subject: [PATCH 09/15] Align API with the spec --- lib/lbt/utils/PoolDispatcher.js | 37 +++++++++++++++++++++++----- lib/lbt/utils/TaskProcessorThread.js | 27 ++++++++++++++++---- 2 files changed, 53 insertions(+), 11 deletions(-) diff --git a/lib/lbt/utils/PoolDispatcher.js b/lib/lbt/utils/PoolDispatcher.js index 54eea6e11..9a6cd53a7 100644 --- a/lib/lbt/utils/PoolDispatcher.js +++ b/lib/lbt/utils/PoolDispatcher.js @@ -50,14 +50,39 @@ export class PoolDispatcher { return PoolDispatcher.#instance; } - getProcessor(url) { + getProcessor(modulePath) { return { execute: async (methodName, args) => { - const buildUpArgs = {url, methodName, args}; - // {url, methodName, args} - const {resources, workspace, dependencies, options} = args; - - return this.#getPool().exec("execInThread", [buildUpArgs]); + const {resources, fs, options} = args; + const buildUpArgs = {modulePath, methodName, args: {options}}; + const useTransfers = !!fs; // TODO: Workaround- themeBuild uses fs, while minify- not + let toTransfer; + let threadMessageHandler; + let fsInterfaceMainPort; + + if (useTransfers) { + const {port1, port2} = new MessageChannel(); + fsInterfaceMainPort = port1; + buildUpArgs.args.fs = port2; + toTransfer = {transfer: [port2]}; + + threadMessageHandler = new FsMainThreadInterface(fs); + threadMessageHandler.startCommunication(fsInterfaceMainPort); + } + + if (resources) { + buildUpArgs.args.resources = await serializeResources(resources); + } + + const result = await this.#getPool().exec("execInThread", [buildUpArgs], toTransfer); + + if (useTransfers) { + threadMessageHandler.endCommunication(fsInterfaceMainPort); + // TODO: Workaround- themeBuild. Returns resources and uses fs, but minify returns a plain object. + return deserializeResources(result); + } + + return result; // TODO: Workaround- minify } }; } diff --git a/lib/lbt/utils/TaskProcessorThread.js b/lib/lbt/utils/TaskProcessorThread.js index 73d25f91f..c39209057 100644 --- a/lib/lbt/utils/TaskProcessorThread.js +++ b/lib/lbt/utils/TaskProcessorThread.js @@ -1,11 +1,28 @@ import workerpool from "workerpool"; +import {deserializeResources, FsWorkerThreadInterface, serializeResources} from "./PoolDispatcher.js"; -export default async function execInThread({url, methodName, args}) { - const moduleToRegister = await import(url); - const methodCall = moduleToRegister[methodName] || moduleToRegister["default"]; +export default async function execInThread({modulePath, methodName, args}) { + const moduleToExecute = await import(modulePath); + const methodCall = moduleToExecute[methodName] || moduleToExecute["default"]; + const {options, resources, fs} = args; - // return await methodCall({resources, workspace, dependencies, options}); - return await methodCall(args); + const buildUpArgs = {options}; + + if (resources) { + buildUpArgs.resources = await deserializeResources(resources); + } + if (fs) { + buildUpArgs.fs = new FsWorkerThreadInterface(fs); + } + + const result = await methodCall(buildUpArgs); + + if (fs) { + // TODO: Workaround- themeBuild. Returns resources and uses fs, but minify returns a plain object. + return serializeResources(result); + } else { + return result; // TODO: Workaround- minify + } } // Test execution via ava is never done on the main thread From cfee13da24890bd20fd0250a53877ca72de2dc48 Mon Sep 17 00:00:00 2001 From: Yavor Ivanov Date: Tue, 14 Nov 2023 18:37:08 +0200 Subject: [PATCH 10/15] Migrate minifier to the new API --- lib/processors/minifier.js | 17 +++++----- lib/processors/minifierWorker.js | 55 ++++++++++++++++---------------- 2 files changed, 36 insertions(+), 36 deletions(-) diff --git a/lib/processors/minifier.js b/lib/processors/minifier.js index e386f0b0a..2a4c892d9 100644 --- a/lib/processors/minifier.js +++ b/lib/processors/minifier.js @@ -11,11 +11,10 @@ const debugFileRegex = /((?:\.view|\.fragment|\.controller|\.designtime|\.suppor const sourceMappingUrlPattern = /\/\/# sourceMappingURL=(\S+)\s*$/; const httpPattern = /^https?:\/\//i; -async function minifyInWorker(options, taskUtil) { +async function minifyInWorker(args) { const url = fileURLToPath(new URL("./minifierWorker.js", import.meta.url)); - const processor = PoolDispatcher.getInstance().getProcessor(url); - return processor.execute("execMinification", options); + return processor.execute("execMinification", args); } async function extractAndRemoveSourceMappingUrl(resource) { @@ -225,11 +224,13 @@ export default async function({ } const result = await minify({ - filename, - dbgFilename, - code, - sourceMapOptions - }, taskUtil); + options: { + filename, + dbgFilename, + code, + sourceMapOptions, + }, + }); resource.setString(result.code); const sourceMapResource = new Resource({ path: resource.getPath() + ".map", diff --git a/lib/processors/minifierWorker.js b/lib/processors/minifierWorker.js index a0057c234..cc6fd7028 100644 --- a/lib/processors/minifierWorker.js +++ b/lib/processors/minifierWorker.js @@ -26,43 +26,42 @@ const copyrightCommentsAndBundleCommentPattern = /copyright|\(c\)(?:[0-9]+|\s+[0 * @static * * @param {object} parameters Parameters - * @param {string} parameters.filename - * @param {string} parameters.dbgFilename - * @param {string} parameters.code - * @param {object} parameters.sourceMapOptions + * @param {string} parameters.options + * @param {string} parameters.options.filename + * @param {string} parameters.options.dbgFilename + * @param {string} parameters.options.code + * @param {object} parameters.options.sourceMapOptions * @returns {Promise} Promise resolving once minification of the resource has finished */ export default async function execMinification({ - filename, - dbgFilename, - code, - sourceMapOptions + options: {filename, dbgFilename, code, sourceMapOptions}, }) { try { - return await minify({ - // Use debug-name since this will be referenced in the source map "sources" - [dbgFilename]: code - }, { - output: { - comments: copyrightCommentsAndBundleCommentPattern, - wrap_func_args: false + return await minify( + { + // Use debug-name since this will be referenced in the source map "sources" + [dbgFilename]: code, }, - compress: false, - mangle: { - reserved: [ - "jQuery", - "jquery", - "sap", - ] - }, - sourceMap: sourceMapOptions - }); + { + output: { + comments: copyrightCommentsAndBundleCommentPattern, + wrap_func_args: false, + }, + compress: false, + mangle: { + reserved: ["jQuery", "jquery", "sap"], + }, + sourceMap: sourceMapOptions, + } + ); } catch (err) { // Note: err.filename contains the debug-name throw new Error( `Minification failed with error: ${err.message} in file ${filename} ` + - `(line ${err.line}, col ${err.col}, pos ${err.pos})`, { - cause: err - }); + `(line ${err.line}, col ${err.col}, pos ${err.pos})`, + { + cause: err, + } + ); } } From 56399807bca17878574587ea4ab7ad5eab32cca6 Mon Sep 17 00:00:00 2001 From: Yavor Ivanov Date: Tue, 14 Nov 2023 18:37:19 +0200 Subject: [PATCH 11/15] Migrate buildThemes to the new API --- lib/processors/themeBuilderWorker.js | 44 ---------------- lib/tasks/buildThemes.js | 77 ++++------------------------ 2 files changed, 9 insertions(+), 112 deletions(-) delete mode 100644 lib/processors/themeBuilderWorker.js diff --git a/lib/processors/themeBuilderWorker.js b/lib/processors/themeBuilderWorker.js deleted file mode 100644 index 57eeee75b..000000000 --- a/lib/processors/themeBuilderWorker.js +++ /dev/null @@ -1,44 +0,0 @@ -import workerpool from "workerpool"; -import themeBuilder from "./themeBuilder.js"; -import {deserializeResources, serializeResources, FsWorkerThreadInterface} from "../lbt/utils/PoolDispatcher.js"; - -/** - * Task to build library themes. - * - * @private - * @function default - * @static - * - * @param {object} parameters Parameters - * @param {MessagePort} parameters.fsInterfacePort - * @param {object[]} parameters.themeResources Input array of Uint8Array transferable objects - * that are the less sources to build upon. By nature those are @ui5/fs/Resource. - * @param {object} parameters.options Less compiler options - * @returns {Promise} Resulting array of Uint8Array transferable objects - */ -export default async function execThemeBuild({ - fsInterfacePort, - themeResources = [], - options = {} -}) { - const fsThemeResources = deserializeResources(themeResources); - const fsReader = new FsWorkerThreadInterface(fsInterfacePort); - - const result = await themeBuilder({ - resources: fsThemeResources, - fs: fsReader, - options - }); - - return serializeResources(result); -} - -// Test execution via ava is never done on the main thread -/* istanbul ignore else */ -if (!workerpool.isMainThread) { - // Script got loaded through workerpool - // => Create a worker and register public functions - workerpool.worker({ - execThemeBuild - }); -} diff --git a/lib/tasks/buildThemes.js b/lib/tasks/buildThemes.js index a044bda97..5e37a1329 100644 --- a/lib/tasks/buildThemes.js +++ b/lib/tasks/buildThemes.js @@ -4,55 +4,7 @@ import ReaderCollectionPrioritized from "@ui5/fs/ReaderCollectionPrioritized"; import {getLogger} from "@ui5/logger"; const log = getLogger("builder:tasks:buildThemes"); import {fileURLToPath} from "node:url"; -import os from "node:os"; -import workerpool from "workerpool"; -import { - deserializeResources, - serializeResources, - DuplexCollectionMainInterface, - PoolDispatcher, -} from "../lbt/utils/PoolDispatcher.js"; - -let pool; - -function getPool(taskUtil) { - if (!pool) { - const MIN_WORKERS = 2; - const MAX_WORKERS = 4; - const osCpus = os.cpus().length || 1; - const maxWorkers = Math.max(Math.min(osCpus - 1, MAX_WORKERS), MIN_WORKERS); - - log.verbose(`Creating workerpool with up to ${maxWorkers} workers (available CPU cores: ${osCpus})`); - const workerPath = fileURLToPath(new URL("../processors/themeBuilderWorker.js", import.meta.url)); - pool = workerpool.pool(workerPath, { - workerType: "thread", - maxWorkers - }); - taskUtil.registerCleanupTask(() => { - log.verbose(`Terminating workerpool`); - const poolToBeTerminated = pool; - pool = null; - poolToBeTerminated.terminate(); - }); - } - return pool; -} - -async function buildThemeInWorker(taskUtil, options, transferList, {resources, workspace, dependencies}) { - // const toTransfer = transferList ? {transfer: transferList} : undefined; - // return getPool(taskUtil).exec("execThemeBuild", [options], toTransfer); - - const args = { - options, - resources: await serializeResources(resources), - workspace: new DuplexCollectionMainInterface(workspace), - dependencies: new DuplexCollectionMainInterface(dependencies), - }; - const url = fileURLToPath(new URL("../processors/themeBuilderWorker.js", import.meta.url)); - - const processor = PoolDispatcher.getInstance().getProcessor(url); - return processor.execute("execThemeBuild", args); -} +import {PoolDispatcher} from "../lbt/utils/PoolDispatcher.js"; /** @@ -187,30 +139,19 @@ export default async function({ let processedResources; const useWorkers = !!taskUtil; if (useWorkers) { - // const threadMessageHandler = new FsMainThreadInterface(fsInterface(combo)); - + const modulePath = fileURLToPath(new URL("../processors/themeBuilder.js", import.meta.url)); + const processor = PoolDispatcher.getInstance().getProcessor(modulePath); processedResources = await Promise.all(themeResources.map(async (themeRes) => { - // const {port1, port2} = new MessageChannel(); - // threadMessageHandler.startCommunication(port1); - - const result = await buildThemeInWorker(taskUtil, { - // fsInterfacePort: port2, - // themeResources: await serializeResources([themeRes]), + return processor.execute("default", { + resources: [themeRes], + fs: fsInterface(combo), options: { compress, cssVariables: !!cssVariables, - }, - }, [undefined], - {resources: [themeRes], workspace, dependencies}); - - // threadMessageHandler.endCommunication(port1); - - return result; + } + }); })) - .then((resources) => Array.prototype.concat.apply([], resources)) - .then(deserializeResources); - - // threadMessageHandler.cleanup(); + .then((resources) => Array.prototype.concat.apply([], resources)); } else { // Do not use workerpool const themeBuilder = (await import("../processors/themeBuilder.js")).default; From e4b74971dec1560440085d4e672d0b127ffa05d2 Mon Sep 17 00:00:00 2001 From: Yavor Ivanov Date: Tue, 14 Nov 2023 18:57:18 +0200 Subject: [PATCH 12/15] Stabilize tests --- lib/lbt/utils/PoolDispatcher.js | 18 +++++++++--------- test/lib/processors/minifier.js | 3 --- test/lib/tasks/buildThemes.js | 2 +- 3 files changed, 10 insertions(+), 13 deletions(-) diff --git a/lib/lbt/utils/PoolDispatcher.js b/lib/lbt/utils/PoolDispatcher.js index 9a6cd53a7..874898ee3 100644 --- a/lib/lbt/utils/PoolDispatcher.js +++ b/lib/lbt/utils/PoolDispatcher.js @@ -3,6 +3,7 @@ import os from "node:os"; import {fileURLToPath} from "node:url"; import {getLogger} from "@ui5/logger"; import {createResource} from "@ui5/fs/resourceFactory"; +import {setTimeout as setTimeoutPromise} from "node:timers/promises"; const MIN_WORKERS = 2; const MAX_WORKERS = 4; @@ -88,12 +89,10 @@ export class PoolDispatcher { } async cleanup(project) { - const attemptPoolTermination = () => { + const attemptPoolTermination = async () => { if (this.#projectBuilders.length) { this.#log.verbose( - `Pool termination canceled. Still pending projects to build: ${this.#projectBuilders.map( - (project) => project.getName() - )}` + `Pool termination canceled. Still pending projects to build: ${this.#projectBuilders.length}` ); return; } @@ -109,11 +108,12 @@ export class PoolDispatcher { // There are many stats that could be used, but these ones seem the most // convenient. When all the (available) workers are idle, then it's safe to terminate. - const {idleWorkers, totalWorkers} = this.#pool.stats(); - if (idleWorkers !== totalWorkers) { - return new Promise((resolve) => - setTimeout(() => resolve(attemptPoolTermination()), 100) // Retry after a while - ); + // There are many stats that could be used, but these ones seem the most + // convenient. When all the (available) workers are idle, then it's safe to terminate. + let {idleWorkers, totalWorkers} = this.#pool.stats(); + while (idleWorkers !== totalWorkers) { + await setTimeoutPromise(100); // Wait a bit workers to finish and try again + ({idleWorkers, totalWorkers} = this.#pool.stats()); } return this.terminateTasks(/* terminate gracefully */); diff --git a/test/lib/processors/minifier.js b/test/lib/processors/minifier.js index a2ebf6a3d..30fa4f087 100644 --- a/test/lib/processors/minifier.js +++ b/test/lib/processors/minifier.js @@ -104,9 +104,6 @@ ${SOURCE_MAPPING_URL}=test.controller.js.map`; "mappings": ";;;AAGC,SAASA,OAAOC,GACfC,OAAOC,IAAIC,QAAQ,aACnBC,QAAQC,IAAI,qBACb,CACDN" }); t.deepEqual(await sourceMapResource.getString(), expectedSourceMap, "Correct source map content"); - - // Call to registerCleanupTask indicates worker pool was used - t.is(taskUtilMock.registerCleanupTask.callCount, 1, "taskUtil#registerCleanupTask got called once"); }); test("minifier with useWorkers: true and missing taskUtil", async (t) => { diff --git a/test/lib/tasks/buildThemes.js b/test/lib/tasks/buildThemes.js index 48fd06a8f..84505d45d 100644 --- a/test/lib/tasks/buildThemes.js +++ b/test/lib/tasks/buildThemes.js @@ -1,7 +1,7 @@ import test from "ava"; import sinon from "sinon"; import esmock from "esmock"; -import {deserializeResources} from "../../../lib/processors/themeBuilderWorker.js"; +import {deserializeResources} from "../../../lib/lbt/utils/PoolDispatcher.js"; let buildThemes; test.before(async () => { From aebb8aba3e032e547f8d90bb536c4c7f962703c4 Mon Sep 17 00:00:00 2001 From: Yavor Ivanov Date: Tue, 14 Nov 2023 19:05:59 +0200 Subject: [PATCH 13/15] Fix eslint errors --- lib/lbt/utils/PoolDispatcher.js | 2 ++ 1 file changed, 2 insertions(+) diff --git a/lib/lbt/utils/PoolDispatcher.js b/lib/lbt/utils/PoolDispatcher.js index 874898ee3..7b1e15976 100644 --- a/lib/lbt/utils/PoolDispatcher.js +++ b/lib/lbt/utils/PoolDispatcher.js @@ -184,6 +184,8 @@ class AbstractMain { /** * Constructor + * + * @param {object} collection */ constructor(collection) { if (!collection) { From 9be0a997d05b5fd9b3de6dc70eb3ead6d8498a6f Mon Sep 17 00:00:00 2001 From: Yavor Ivanov Date: Thu, 16 Nov 2023 10:41:13 +0200 Subject: [PATCH 14/15] Bugfixes --- lib/lbt/utils/PoolDispatcher.js | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/lib/lbt/utils/PoolDispatcher.js b/lib/lbt/utils/PoolDispatcher.js index 7b1e15976..245b7a422 100644 --- a/lib/lbt/utils/PoolDispatcher.js +++ b/lib/lbt/utils/PoolDispatcher.js @@ -36,7 +36,7 @@ export class PoolDispatcher { constructor() { if (!PoolDispatcher.#ensureSingleton) { throw new Error( - "Constructor must not be called! This is a singleton class. Use Parallelizer.getInstance()" + "Constructor must not be called! This is a singleton class. Use PoolDispatcher.getInstance()" ); } } @@ -345,13 +345,23 @@ export class FsMainThreadInterface extends AbstractMain { super(fsInterfacePort); } + #parseResults(method, result) { + // Stats object cannot be sent over postMessage. + // Cast it to simple object that is alike the stats. + if (method === "stat" && !!result) { + return JSON.parse(JSON.stringify(result)); + } else { + return result; + } + } + get(method, args) { const {fsPath, options} = args; const composedArgs = [fsPath, options].filter(($) => $ !== undefined); return new Promise((resolve) => { this._collection[method](...composedArgs, (error, result) => { - resolve({error, result}); + resolve({error, result: this.#parseResults(method, result)}); }); }); } From 231ef6985afed128f3fd1dbd02459bf91e8d2854 Mon Sep 17 00:00:00 2001 From: Yavor Ivanov Date: Mon, 20 Nov 2023 11:19:26 +0200 Subject: [PATCH 15/15] Implement serializer for seamless thread result handling --- lib/lbt/utils/PoolDispatcher.js | 53 +++++++++++++++++++++++++--- lib/lbt/utils/TaskProcessorThread.js | 11 ++---- 2 files changed, 52 insertions(+), 12 deletions(-) diff --git a/lib/lbt/utils/PoolDispatcher.js b/lib/lbt/utils/PoolDispatcher.js index 245b7a422..ae5c25670 100644 --- a/lib/lbt/utils/PoolDispatcher.js +++ b/lib/lbt/utils/PoolDispatcher.js @@ -1,8 +1,10 @@ import workerpool from "workerpool"; import os from "node:os"; +import {Buffer} from "node:buffer"; import {fileURLToPath} from "node:url"; import {getLogger} from "@ui5/logger"; import {createResource} from "@ui5/fs/resourceFactory"; +import Resource from "@ui5/fs/Resource"; import {setTimeout as setTimeoutPromise} from "node:timers/promises"; const MIN_WORKERS = 2; @@ -55,7 +57,7 @@ export class PoolDispatcher { return { execute: async (methodName, args) => { const {resources, fs, options} = args; - const buildUpArgs = {modulePath, methodName, args: {options}}; + const buildUpArgs = {modulePath, methodName, args: {options: await serializeData(options)}}; const useTransfers = !!fs; // TODO: Workaround- themeBuild uses fs, while minify- not let toTransfer; let threadMessageHandler; @@ -79,11 +81,9 @@ export class PoolDispatcher { if (useTransfers) { threadMessageHandler.endCommunication(fsInterfaceMainPort); - // TODO: Workaround- themeBuild. Returns resources and uses fs, but minify returns a plain object. - return deserializeResources(result); } - return result; // TODO: Workaround- minify + return deserializeData(result); } }; } @@ -177,6 +177,51 @@ export function deserializeResources(resources) { }); } +function isPojo(obj) { + const proto = Object.prototype; + const gpo = Object.getPrototypeOf; + + if (obj === null || typeof obj !== "object") { + return false; + } + return gpo(obj) === proto; +} + +function isFsResourceLikeTransfer(input) { + return isPojo(input) && + input["buffer"] && (Buffer.isBuffer(input.buffer) || ArrayBuffer.isView(input.buffer)) && + input["path"] && typeof input["path"] === "string"; +} + +export async function serializeData(input) { + if (Array.isArray(input) || isPojo(input)) { + for (const prop in input) { + if (Object.hasOwn(input, prop)) { + input[prop] = await serializeData(input[prop]); + } + } + } else if (input instanceof Resource) { + return (await serializeResources([input]))[0]; + } + + return input; +} + +export async function deserializeData(input) { + // Resource like transferrable object that could be converted to a @ui5/fs/Resource + if (isFsResourceLikeTransfer(input)) { + return (await deserializeResources([input]))[0]; + } else if (Array.isArray(input) || isPojo(input)) { + for (const prop in input) { + if (Object.hasOwn(input, prop)) { + input[prop] = await deserializeData(input[prop]); + } + } + } + + return input; +} + class AbstractMain { _comPorts = new Set(); _collection = null; diff --git a/lib/lbt/utils/TaskProcessorThread.js b/lib/lbt/utils/TaskProcessorThread.js index c39209057..3e9c600e0 100644 --- a/lib/lbt/utils/TaskProcessorThread.js +++ b/lib/lbt/utils/TaskProcessorThread.js @@ -1,12 +1,12 @@ import workerpool from "workerpool"; -import {deserializeResources, FsWorkerThreadInterface, serializeResources} from "./PoolDispatcher.js"; +import {FsWorkerThreadInterface, deserializeResources, serializeData, deserializeData} from "./PoolDispatcher.js"; export default async function execInThread({modulePath, methodName, args}) { const moduleToExecute = await import(modulePath); const methodCall = moduleToExecute[methodName] || moduleToExecute["default"]; const {options, resources, fs} = args; - const buildUpArgs = {options}; + const buildUpArgs = {options: await deserializeData(options)}; if (resources) { buildUpArgs.resources = await deserializeResources(resources); @@ -17,12 +17,7 @@ export default async function execInThread({modulePath, methodName, args}) { const result = await methodCall(buildUpArgs); - if (fs) { - // TODO: Workaround- themeBuild. Returns resources and uses fs, but minify returns a plain object. - return serializeResources(result); - } else { - return result; // TODO: Workaround- minify - } + return serializeData(result); } // Test execution via ava is never done on the main thread