-
Notifications
You must be signed in to change notification settings - Fork 19
/
actor.ts
139 lines (132 loc) · 3.79 KB
/
actor.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
import { Timer } from "./performance";
import { CancelablePromise, IsTransferrable, Timing } from "./types";
import { withTimeout } from "./utils";
let id = 0;
interface Cancel {
type: "cancel";
id: number;
}
interface Response {
type: "response";
id: number;
error?: string;
response?: any;
timings: Timing;
}
interface Request {
type: "request";
id?: number;
name: string;
args: any[];
}
type Message = Cancel | Response | Request;
type MethodsReturning<T, R> = {
[K in keyof T]: T[K] extends (...args: any) => R ? T[K] : never;
};
/**
* Utility for sending messages to a remote instance of `<T>` running in a web worker
* from the main thread, or in the main thread running from a web worker.
*/
export default class Actor<T> {
callbacks: {
[id: number]: (
error: Error | undefined,
message: any,
timings: Timing,
) => void;
};
cancels: { [id: number]: () => void };
dest: Worker;
timeoutMs: number;
constructor(dest: Worker, dispatcher: any, timeoutMs: number = 20_000) {
this.callbacks = {};
this.cancels = {};
this.dest = dest;
this.timeoutMs = timeoutMs;
this.dest.onmessage = async ({ data }) => {
const message: Message = data;
if (message.type === "cancel") {
const cancel = this.cancels[message.id];
delete this.cancels[message.id];
if (cancel) {
cancel();
}
} else if (message.type === "response") {
const callback = this.callbacks[message.id];
delete this.callbacks[message.id];
if (callback) {
callback(
message.error ? new Error(message.error) : undefined,
message.response,
message.timings,
);
}
} else if (message.type === "request") {
const timer = new Timer("worker");
const handler: Function = (dispatcher as any)[message.name];
const request = handler.apply(handler, [...message.args, timer]);
const url = `${message.name}_${message.id}`;
if (message.id && request) {
this.cancels[message.id] = request.cancel;
try {
const response = await request.value;
const transferrables = (response as IsTransferrable)
?.transferrables;
this.postMessage(
{
id: message.id,
type: "response",
response,
timings: timer.finish(url),
},
transferrables,
);
} catch (e: any) {
this.postMessage({
id: message.id,
type: "response",
error: e?.toString() || "error",
timings: timer.finish(url),
});
}
delete this.cancels[message.id];
}
}
};
}
postMessage(message: Message, transferrables?: Transferable[]) {
this.dest.postMessage(message, transferrables || []);
}
/** Invokes a method by name with a set of arguments in the remote context. */
send<
R,
M extends MethodsReturning<T, CancelablePromise<R>>,
K extends keyof M & string,
P extends Parameters<M[K]>,
>(
name: K,
transferrables: Transferable[],
timer?: Timer,
...args: P
): CancelablePromise<R> {
const thisId = ++id;
const value: Promise<R> = new Promise((resolve, reject) => {
this.postMessage(
{ id: thisId, type: "request", name, args },
transferrables,
);
this.callbacks[thisId] = (error, result, timings) => {
timer?.addAll(timings);
if (error) reject(error);
else resolve(result);
};
});
return withTimeout(this.timeoutMs, {
value,
cancel: () => {
delete this.callbacks[thisId];
this.postMessage({ id: thisId, type: "cancel" });
},
});
}
}