Skip to content

Commit

Permalink
fix wasm runtime err not caught (#1714)
Browse files Browse the repository at this point in the history
  • Loading branch information
srliao authored Sep 21, 2023
1 parent a641b5f commit dc0c1f7
Show file tree
Hide file tree
Showing 2 changed files with 111 additions and 99 deletions.
174 changes: 88 additions & 86 deletions ui/packages/executors/src/WasmExecutor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -71,27 +71,27 @@ export class WasmExecutor implements Executor {
for (let i = 0; i < diff; i++) {
promises.push(new Promise<boolean>((resolve, reject) => {
const worker = new Worker(new URL("./Workers/worker.ts", import.meta.url));
worker.postMessage(SimWorker.ReadyRequest(this.wasmPath));
worker.postMessage(SimWorker.ReadyRequest(this.wasmPath));

const idx = this.workers.push(worker) - 1;
worker.onmessage = (ev) => {
switch (ev.data.type as SimWorker.Response) {
case SimWorker.Response.Ready:
resolve(true);
return;
case SimWorker.Response.Failed:
const idx = this.workers.push(worker) - 1;
worker.onmessage = (ev) => {
switch (ev.data.type as SimWorker.Response) {
case SimWorker.Response.Ready:
resolve(true);
return;
case SimWorker.Response.Failed:
reject("Worker " + idx + " " + (ev.data as SimWorker.FailedResponse).reason);
return;
}
};
return;
}
};
}));
}
return Promise.all(promises).then(() => true);
}

public run(
cfg: string, updateResult: (result: SimResults, hash: string) => void
): Promise<boolean | void> {
): Promise<boolean | void> {
this.isRunning = true;

// 1. Create Aggregator & Workers
Expand All @@ -106,106 +106,108 @@ export class WasmExecutor implements Executor {

// initialize aggregator
promises.push(new Promise<boolean>((resolve, reject) => {
if (this.aggregator == null) {
reject("Aggregator is null!");
return;
}

this.aggregator.onmessage = (ev) => {
switch (ev.data.type as Aggregator.Response) {
case Aggregator.Response.Initialized:
result = (ev.data as Aggregator.InitializeResponse).result;
maxIterations = result?.simulator_settings?.iterations ?? 1000;
resolve(true);
return;
case Aggregator.Response.Failed:
reject((ev.data as Aggregator.FailedResponse).reason);
return;
if (this.aggregator == null) {
reject("Aggregator is null!");
return;
}
};
this.aggregator.postMessage(Aggregator.InitializeRequest(cfg));
}));

// initialize workers
this.workers.forEach((worker) => {
promises.push(new Promise<boolean>((resolve, reject) => {
worker.onmessage = (ev) => {
switch (ev.data.type as SimWorker.Response) {
case SimWorker.Response.Initialized:
this.aggregator.onmessage = (ev) => {
switch (ev.data.type as Aggregator.Response) {
case Aggregator.Response.Initialized:
result = (ev.data as Aggregator.InitializeResponse).result;
maxIterations = result?.simulator_settings?.iterations ?? 1000;
resolve(true);
return;
case SimWorker.Response.Failed:
reject((ev.data as SimWorker.FailedResponse).reason);
case Aggregator.Response.Failed:
reject((ev.data as Aggregator.FailedResponse).reason);
return;
}
};
worker.postMessage(SimWorker.InitializeRequest(cfg));
this.aggregator.postMessage(Aggregator.InitializeRequest(cfg));
}));

// initialize workers
this.workers.forEach((worker) => {
promises.push(new Promise<boolean>((resolve, reject) => {
worker.onmessage = (ev) => {
switch (ev.data.type as SimWorker.Response) {
case SimWorker.Response.Initialized:
resolve(true);
return;
case SimWorker.Response.Failed:
reject((ev.data as SimWorker.FailedResponse).reason);
return;
}
};
worker.postMessage(SimWorker.InitializeRequest(cfg));
}));
});

return Promise.all(promises);
});

const throttledFlush = throttle(() => {
if (this.isRunning) {
this.aggregator?.postMessage(Aggregator.FlushRequest());
}
if (this.isRunning) {
this.aggregator?.postMessage(Aggregator.FlushRequest());
}
}, VIEWER_THROTTLE, { leading: true, trailing: true });

// 3. start execution
return initialized.then(() => {
if (this.aggregator == null) {
return Promise.reject("Aggregator is null!");
}

let completed = 0;
this.aggregator.onmessage = (ev) => {
switch (ev.data.type as Aggregator.Response) {
case Aggregator.Response.Result:
return new Promise((resolve, reject) => {
if (this.aggregator == null) {
reject("Aggregator is null!");
return;
}
let completed = 0;
this.aggregator.onmessage = (ev) => {
switch (ev.data.type as Aggregator.Response) {
case Aggregator.Response.Result:
const { hash, stats } = (ev.data as Aggregator.ResultResponse).result;

const out = Object.assign({}, result);
out.statistics = stats;
updateResult(out, hash);

if (completed >= maxIterations) {
this.isRunning = false;
return Promise.resolve(true);
}
return;
case Aggregator.Response.Done:
completed += 1;
throttledFlush();
return;
case Aggregator.Response.Failed:
// TODO: bug with throttled flush where a flush may happen after a cancel request.
// When this happens, the existing aggregator has no data and fails to flush.
// this doesnt cause any problems (yet) and just produces an error in console.
if (this.isRunning) {
return Promise.reject((ev.data as Aggregator.FailedResponse).reason);
}
}
};
const out = Object.assign({}, result);
out.statistics = stats;
updateResult(out, hash);

let requested = 0;
this.workers.forEach((worker) => {
worker.onmessage = (ev) => {
switch (ev.data.type as SimWorker.Response) {
case SimWorker.Response.Done:
const resp: SimWorker.RunResponse = ev.data;
this.aggregator?.postMessage(Aggregator.AddRequest(resp.result));
if (requested < maxIterations) {
worker.postMessage(SimWorker.RunRequest(requested++));
if (completed >= maxIterations) {
this.isRunning = false;
resolve(true);
}
return;
case SimWorker.Response.Failed:
return Promise.reject((ev.data as Aggregator.FailedResponse).reason);
case Aggregator.Response.Done:
completed += 1;
throttledFlush();
return;
case Aggregator.Response.Failed:
// TODO: bug with throttled flush where a flush may happen after a cancel request.
// When this happens, the existing aggregator has no data and fails to flush.
// this doesnt cause any problems (yet) and just produces an error in console.
if (this.isRunning) {
reject((ev.data as Aggregator.FailedResponse).reason);
}
}
};

if (requested < maxIterations) {
worker.postMessage(SimWorker.RunRequest(requested++));
}
let requested = 0;
this.workers.forEach((worker) => {
worker.onmessage = (ev) => {
switch (ev.data.type as SimWorker.Response) {
case SimWorker.Response.Done:
const resp: SimWorker.RunResponse = ev.data;
this.aggregator?.postMessage(Aggregator.AddRequest(resp.result));
if (requested < maxIterations) {
worker.postMessage(SimWorker.RunRequest(requested++));
}
return;
case SimWorker.Response.Failed:
reject((ev.data as Aggregator.FailedResponse).reason);
}
};

if (requested < maxIterations) {
worker.postMessage(SimWorker.RunRequest(requested++));
}
});
});
});
}
Expand Down
36 changes: 23 additions & 13 deletions ui/packages/executors/src/Workers/worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,17 @@ if (!WebAssembly.instantiateStreaming) {
function ready(req: { wasm: string }) {
const go = new Go();
WebAssembly.instantiateStreaming(fetch(req.wasm), go.importObject)
.then((result) => {
go.run(result.instance);
postMessage({ type: WorkerResponse.Ready });
}).catch((e) => {
console.error(e);
postMessage({
type: WorkerResponse.Failed,
reason: e instanceof Error ? e.message : "Unknown Error" });
.then((result) => {
go.run(result.instance);
postMessage({ type: WorkerResponse.Ready });
})
.catch((e) => {
console.error(e);
postMessage({
type: WorkerResponse.Failed,
reason: e instanceof Error ? e.message : "Unknown Error",
});
});
}

// @ts-ignore
Expand All @@ -35,11 +37,19 @@ function initialize(req: { cfg: string }) {
}

function run(req: { itr: number }) {
const resp = simulate();
if (typeof(resp) == "string" || resp instanceof String) {
return { type: WorkerResponse.Failed, reason: JSON.parse(resp as string).error };
try {
const resp = simulate();
if (typeof resp == "string" || resp instanceof String) {
return {
type: WorkerResponse.Failed,
reason: JSON.parse(resp as string).error,
};
}
return { type: WorkerResponse.Done, result: resp, itr: req.itr };
} catch (e) {
console.log("simulate() call failed");
return { type: WorkerResponse.Failed, reason: `Failed with error: ${e}` };
}
return { type: WorkerResponse.Done, result: resp, itr: req.itr };
}

// @ts-ignore
Expand Down Expand Up @@ -74,4 +84,4 @@ enum WorkerResponse {
Ready = "ready",
Initialized = "initialized",
Done = "done",
}
}

0 comments on commit dc0c1f7

Please sign in to comment.