Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[INTERNAL] Optimise themeBuild with workerpool via fsinterface #925

Merged
merged 59 commits into from
Jul 25, 2023
Merged
Show file tree
Hide file tree
Changes from 57 commits
Commits
Show all changes
59 commits
Select commit Hold shift + click to select a range
dd06b64
Integrate workerpool
d3xter666 Jul 4, 2023
fa52d6b
Use Adaptors to transfer content
d3xter666 Jul 5, 2023
b3ebec0
Make it work
d3xter666 Jul 6, 2023
13d1f00
Make it work with the pool
d3xter666 Jul 6, 2023
7ecba2b
Remove console.log-s
d3xter666 Jul 6, 2023
39bf701
Parallelize per library.source.less
d3xter666 Jul 7, 2023
afeda2e
Cache transferable resources
d3xter666 Jul 7, 2023
e4a32b4
Stash to FS
d3xter666 Jul 10, 2023
80ec516
Make it work with Transferable objects
d3xter666 Jul 10, 2023
5202393
Bugfixes
d3xter666 Jul 10, 2023
18be852
Modify tests for the new workerpool interface
d3xter666 Jul 10, 2023
8de95b6
Align build results with main branch
d3xter666 Jul 11, 2023
339e3c7
Provide correct resources to the pool threads
d3xter666 Jul 11, 2023
b5e6ddc
useWorkers=false by default
d3xter666 Jul 11, 2023
9a5b7a0
Add some comments
d3xter666 Jul 12, 2023
c67a0f1
Update docs
d3xter666 Jul 12, 2023
e92e803
Update lib/processors/themeBuilderWorker.js
d3xter666 Jul 12, 2023
5173ac6
Address code comments
d3xter666 Jul 12, 2023
eca6fde
Use different flows for workers and sequential build
d3xter666 Jul 12, 2023
4ef2d11
Align types
d3xter666 Jul 12, 2023
7197ae2
Revert tests to initial state and make them pass
d3xter666 Jul 12, 2023
67616b4
Add tests for the worker
d3xter666 Jul 13, 2023
c8e50d3
Refactor names
d3xter666 Jul 14, 2023
7c220ce
Set back useWorkers to the default value
d3xter666 Jul 17, 2023
2472244
Remove invalid code comments
d3xter666 Jul 17, 2023
310325c
Initial implementation of virtual FS
d3xter666 Jul 13, 2023
d8b9a6f
Cleanup
d3xter666 Jul 14, 2023
8933747
Close communication channels
d3xter666 Jul 14, 2023
12d87f5
Add perf measurements
d3xter666 Jul 14, 2023
06edec2
Refactor redundancies
d3xter666 Jul 14, 2023
09214bf
Cleanup
d3xter666 Jul 14, 2023
a5a4f3c
Add caching for the main thread reads
d3xter666 Jul 14, 2023
88aae13
Refactor names
d3xter666 Jul 14, 2023
b9e1c14
FsMainThreadInterface now supports multiple MessagePorts in order to …
d3xter666 Jul 17, 2023
7d3da11
More robust caching
d3xter666 Jul 17, 2023
bc2b3eb
Resolve conflicts
d3xter666 Jul 17, 2023
7ef227b
Fix tests
d3xter666 Jul 17, 2023
8331742
Enable real transferable object, but not copies
d3xter666 Jul 21, 2023
b9501aa
Use workerpool when taskUtil is provided
d3xter666 Jul 21, 2023
dc099a7
Cache also error messages
d3xter666 Jul 21, 2023
36e93be
Reverts 83317429eb4d2dcec69cc3760b5241c19447235c
d3xter666 Jul 21, 2023
71d6034
Remove redundant dependencies
d3xter666 Jul 21, 2023
5659993
Refactor code: rename vars
d3xter666 Jul 21, 2023
481d394
Add docblocks
d3xter666 Jul 21, 2023
c1d9942
Remove redundant parameters from doc blocks
d3xter666 Jul 21, 2023
d83d5c0
Fix tests
d3xter666 Jul 24, 2023
8c02fe7
Fix docs
d3xter666 Jul 24, 2023
2564da9
Update lib/processors/themeBuilderWorker.js
d3xter666 Jul 25, 2023
1c9f46d
Update lib/processors/themeBuilderWorker.js
d3xter666 Jul 25, 2023
d17fc24
Update lib/processors/themeBuilderWorker.js
d3xter666 Jul 25, 2023
590875d
Update lib/processors/themeBuilderWorker.js
d3xter666 Jul 25, 2023
bbdaea8
Update lib/processors/themeBuilderWorker.js
d3xter666 Jul 25, 2023
a65ae70
Update lib/tasks/buildThemes.js
d3xter666 Jul 25, 2023
8ca2062
Fix renames
d3xter666 Jul 25, 2023
7d4f974
Fix tests
d3xter666 Jul 25, 2023
831e3ea
Reorder code
d3xter666 Jul 25, 2023
ebf2ca0
Remove redundant actions from fsInterfaces
d3xter666 Jul 25, 2023
1048c44
Update lib/processors/themeBuilderWorker.js
d3xter666 Jul 25, 2023
c02c791
Safe serialization for stats reworked
d3xter666 Jul 25, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
281 changes: 281 additions & 0 deletions lib/processors/themeBuilderWorker.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,281 @@
import workerpool from "workerpool";
import themeBuilder from "./themeBuilder.js";
import {createResource} from "@ui5/fs/resourceFactory";
import {Buffer} from "node:buffer";

/**
* Task to build library themes.
*
* @private
* @function default
* @static
*
* @param {object} parameters Parameters
* @param {MessagePort} parameters.fsInterfacePort
* @param {object[]} parameters.themeResources Input array of Uint8Array transferable objects
* that are the less sources to build upon. By nature those are @ui5/fs/Resource.
* @param {object} parameters.options Less compiler options
* @returns {Promise<object[]>} Resulting array of Uint8Array transferable objects
*/
export default async function execThemeBuild({
fsInterfacePort,
themeResources = [],
options = {}
}) {
const fsThemeResources = deserializeResources(themeResources);
const fsReader = new FsWorkerThreadInterface(fsInterfacePort);

const result = await themeBuilder({
resources: fsThemeResources,
fs: fsReader,
options
});

return serializeResources(result);
}

/**
* Casts @ui5/fs/Resource-s into an Uint8Array transferable object
*
* @param {@ui5/fs/Resource[]} resourceCollection
* @returns {Promise<object[]>}
*/
export async function serializeResources(resourceCollection) {
return Promise.all(
resourceCollection.map(async (res) => ({
buffer: await res.getBuffer(),
path: res.getPath()
}))
);
}

/**
* Casts Uint8Array into @ui5/fs/Resource-s transferable object
*
* @param {Promise<object[]>} resources
* @returns {@ui5/fs/Resource[]}
*/
export function deserializeResources(resources) {
return resources.map((res) => {
// res.buffer is an Uint8Array object and needs to be cast
// to a Buffer in order to be read correctly.
return createResource({path: res.path, buffer: Buffer.from(res.buffer)});
});
}

/**
* "@ui5/fs/fsInterface" like class that uses internally
* "@ui5/fs/fsInterface", implements its methods, and
* sends the results to a MessagePort.
*
* Used in the main thread in a combination with FsWorkerThreadInterface.
*/
export class FsMainThreadInterface {
#comPorts = new Set();
#fsInterfaceReader = null;
#cache = Object.create(null);
d3xter666 marked this conversation as resolved.
Show resolved Hide resolved

/**
* Constructor
*
* @param {@ui5/fs/fsInterface} fsInterfaceReader Reader for the Resources
*/
constructor(fsInterfaceReader) {
if (!fsInterfaceReader) {
throw new Error("fsInterfaceReader is mandatory argument");
}

this.#fsInterfaceReader = fsInterfaceReader;
}

/**
* Adds MessagePort and starts listening for requests on it.
*
* @param {MessagePort} comPort port1 from a {code}MessageChannel{/code}
*/
startCommunication(comPort) {
if (!comPort) {
throw new Error("Communication channel is mandatory argument");
}

this.#comPorts.add(comPort);
comPort.on("message", (e) => this.#onMessage(e, comPort));
comPort.on("close", () => comPort.close());
}

/**
* Ends MessagePort communication.
*
* @param {MessagePort} comPort port1 to remove from handling.
*/
endCommunication(comPort) {
comPort.close();
this.#comPorts.delete(comPort);
}

/**
* Destroys the FsMainThreadInterface
*/
cleanup() {
this.#comPorts.forEach((comPort) => comPort.close());
this.#cache = null;
this.#fsInterfaceReader = null;
}

/**
* Handles messages from the MessagePort
*
* @param {object} e data to construct the request
* @param {string} e.action Action to perform. Corresponds to the names of
* the public methods of "@ui5/fs/fsInterface"
* @param {string} e.fsPath Path of the Resource
* @param {object} e.options Options for "readFile" action
* @param {MessagePort} comPort The communication channel
*/
#onMessage(e, comPort) {
switch (e.action) {
case "readFile":
this.#doRequest(comPort, {action: "readFile", fsPath: e.fsPath, options: e.options});
break;
case "stat":
this.#doRequest(comPort, {action: "stat", fsPath: e.fsPath});
break;
}
}

/**
* Requests a Resource from the "@ui5/fs/fsInterface" and sends it to the worker threads
* via postMessage.
*
* @param {MessagePort} comPort The communication channel
* @param {object} parameters
* @param {string} parameters.action Action to perform. Corresponds to the names of
* the public methods of "@ui5/fs/fsInterface" and triggers this method of the
* "@ui5/fs/fsInterface" instance.
* @param {string} parameters.fsPath Path of the Resource
* @param {object} parameters.options Options for "readFile" action
*/
async #doRequest(comPort, {action, fsPath, options}) {
const cacheKey = [fsPath, action].join("-");
d3xter666 marked this conversation as resolved.
Show resolved Hide resolved
if (!this.#cache[cacheKey]) {
this.#cache[cacheKey] = new Promise((res) => {
if (action === "readFile") {
this.#fsInterfaceReader.readFile(fsPath, options, (error, result) => res({error, result}));
} else if (action === "stat") {
this.#fsInterfaceReader.stat(fsPath, (error, result) => res({error, result}));
d3xter666 marked this conversation as resolved.
Show resolved Hide resolved
} else {
res({error: new Error(`Action "${action}" is not available.`), result: null});
}
});
}

let fromCache = await this.#cache[cacheKey];
if (action === "stat") {
// The result from an "action" is usually a simple data type,
// except for the stat. In that case we have a Stat object
// that has some special methods. In this case, we do not need those,
// but just to check whether there's something. The issue is that postMessage
// might not be able to serialize the Stat.
fromCache = JSON.parse(JSON.stringify(fromCache));
}
comPort.postMessage({action, fsPath, ...fromCache});
}
}

/**
* "@ui5/fs/fsInterface" like class that uses internally
* "@ui5/fs/fsInterface", implements its methods, and
* requests resources via MessagePort.
*
* Used in the main thread in a combination with FsMainThreadInterface.
*/
export class FsWorkerThreadInterface {
#comPort = null;
#callbacks = [];
#cache = Object.create(null);

/**
* Constructor
*
* @param {MessagePort} comPort Communication port
*/
constructor(comPort) {
if (!comPort) {
throw new Error("Communication port is mandatory argument");
}

this.#comPort = comPort;
comPort.on("message", this.#onMessage.bind(this));
comPort.on("close", this.#onClose.bind(this));
}

/**
* Handles messages from MessagePort
*
* @param {object} e
* @param {string} e.action Action to perform. Corresponds to the names of
* the public methods of "@ui5/fs/fsInterface"
* @param {string} e.fsPath Path of the Resource
* @param {*} e.result Response from the "action".
* @param {object} e.error Error from the "action".
*/
#onMessage(e) {
const cbObject = this.#callbacks.find((cb) => cb.action === e.action && cb.fsPath === e.fsPath);

if (cbObject) {
this.#cache[`${e.fsPath}-${e.action}`] = {error: e.error, result: e.result};
this.#callbacks.splice(this.#callbacks.indexOf(cbObject), 1);
cbObject.callback(e.error, e.result);
} else {
throw new Error("No callback found for this message! Possible hang for the thread!", e);
}
}

/**
* End communication
*/
#onClose() {
this.#comPort.close();
this.#cache = null;
}

/**
* Makes a request via the MessagePort
*
* @param {object} parameters
* @param {string} parameters.action Action to perform. Corresponds to the names of
* the public methods.
* @param {string} parameters.fsPath Path of the Resource
* @param {object} parameters.options Options for "readFile" action
* @param {Function} callback Callback to call when the "action" is executed and ready.
*/
#doRequest({action, fsPath, options}, callback) {
const cacheKey = `${fsPath}-${action}`;

if (this.#cache[cacheKey]) {
const {result, error} = this.#cache[cacheKey];
callback(error, result);
} else {
this.#callbacks.push({action, fsPath, callback});
this.#comPort.postMessage({action, fsPath, options});
}
}

readFile(fsPath, options, callback) {
this.#doRequest({action: "readFile", fsPath, options}, callback);
}

stat(fsPath, callback) {
this.#doRequest({action: "stat", fsPath}, callback);
}
}

// Test execution via ava is never done on the main thread
/* istanbul ignore else */
if (!workerpool.isMainThread) {
// Script got loaded through workerpool
// => Create a worker and register public functions
workerpool.worker({
execThemeBuild
});
}
Loading