Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Miniflare 3] Add support for routing to multiple Workers #520

Merged
merged 9 commits into from
Mar 11, 2023
85 changes: 63 additions & 22 deletions packages/tre/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@ import {
Plugins,
SERVICE_ENTRY,
SOCKET_ENTRY,
SharedOptions,
WorkerOptions,
getGlobalServices,
maybeGetSitesManifestModule,
normaliseDurableObject,
} from "./plugins";
Expand Down Expand Up @@ -68,20 +71,12 @@ import {
Mutex,
NoOpLog,
OptionalZodTypeOf,
UnionToIntersection,
ValueOf,
defaultClock,
} from "./shared";
import { anyAbortSignal } from "./shared/signal";
import { waitForRequest } from "./wait";

// ===== `Miniflare` User Options =====
export type WorkerOptions = UnionToIntersection<
z.infer<ValueOf<Plugins>["options"]>
>;
export type SharedOptions = UnionToIntersection<
z.infer<Exclude<ValueOf<Plugins>["sharedOptions"], undefined>>
>;
export type MiniflareOptions = SharedOptions &
(WorkerOptions | { workers: WorkerOptions[] });

Expand All @@ -100,6 +95,9 @@ function validateOptions(
const sharedOpts = opts;
const multipleWorkers = "workers" in opts;
const workerOpts = multipleWorkers ? opts.workers : [opts];
mrbbot marked this conversation as resolved.
Show resolved Hide resolved
if (workerOpts.length === 0) {
throw new MiniflareCoreError("ERR_NO_WORKERS", "No workers defined");
}

// Initialise return values
const pluginSharedOpts = {} as PluginSharedOptions;
Expand All @@ -109,16 +107,31 @@ function validateOptions(

// Validate all options
for (const [key, plugin] of PLUGIN_ENTRIES) {
// @ts-expect-error pluginSharedOpts[key] could be any plugin's
pluginSharedOpts[key] = plugin.sharedOptions?.parse(sharedOpts);
for (let i = 0; i < workerOpts.length; i++) {
// Make sure paths are correct in validation errors
const path = multipleWorkers ? ["workers", i] : undefined;
// @ts-expect-error pluginWorkerOpts[i][key] could be any plugin's
// @ts-expect-error `CoreOptionsSchema` has required options which are
// missing in other plugins' options.
pluginWorkerOpts[i][key] = plugin.options.parse(workerOpts[i], { path });
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The // @ts-expect-errors appear to be because PluginSharedOptions has readonly keys, which can be fixed (and the errors removed) by removing the as const in packages/tre/src/plugins/index.ts. Would it be possible to make that change, or would it break something else? If it's not possible, could the // @ts-expect-error comments be updated?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmmm, I think removing the as const should be ok. It looks like that still keeps the specific Zod types so options inference/completions still work. Will make that change. 👍

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

839b02c (unfortunately the change to enforce code means CoreOptionsSchema has required options that aren't satisfied by the other options types, so we still need some // @ts-expect-errors)

}

// Validate names unique
const names = new Set<string>();
for (const opts of pluginWorkerOpts) {
const name = opts.core.name ?? "";
if (names.has(name)) {
throw new MiniflareCoreError(
"ERR_DUPLICATE_NAME",
name === ""
? "Multiple workers defined without a `name`"
: `Multiple workers defined with the same \`name\`: "${name}"`
);
}
names.add(name);
}

return [pluginSharedOpts, pluginWorkerOpts];
}

Expand Down Expand Up @@ -150,6 +163,21 @@ function getDurableObjectClassNames(
return serviceClassNames;
}

// Collects all routes from all worker services
function getWorkerRoutes(
allWorkerOpts: PluginWorkerOptions[]
): Map<string, string[]> {
const allRoutes = new Map<string, string[]>();
for (const workerOpts of allWorkerOpts) {
if (workerOpts.core.routes !== undefined) {
const name = workerOpts.core.name ?? "";
assert(!allRoutes.has(name));
allRoutes.set(name, workerOpts.core.routes);
}
}
return allRoutes;
}

// ===== `Miniflare` Internal Storage & Routing =====
type OptionalGatewayFactoryType<
Gateway extends GatewayConstructor<any> | undefined
Expand Down Expand Up @@ -622,7 +650,24 @@ export class Miniflare {

sharedOpts.core.cf = await setupCf(this.#log, sharedOpts.core.cf);

const services: Service[] = [];
const durableObjectClassNames = getDurableObjectClassNames(allWorkerOpts);
const allWorkerRoutes = getWorkerRoutes(allWorkerOpts);

// Use Map to dedupe services by name
const services = new Map<string, Service>();
const globalServices = getGlobalServices({
optionsVersion,
mrbbot marked this conversation as resolved.
Show resolved Hide resolved
sharedOptions: sharedOpts.core,
allWorkerRoutes,
fallbackWorkerName: this.#workerOpts[0].core.name,
mrbbot marked this conversation as resolved.
Show resolved Hide resolved
loopbackPort,
});
for (const service of globalServices) {
// Global services should all have unique names
assert(service.name !== undefined && !services.has(service.name));
services.set(service.name, service);
}

const sockets: Socket[] = [
{
name: SOCKET_ENTRY,
Expand All @@ -633,18 +678,15 @@ export class Miniflare {
},
];

const durableObjectClassNames = getDurableObjectClassNames(allWorkerOpts);

// Dedupe services by name
const serviceNames = new Set<string>();

for (let i = 0; i < allWorkerOpts.length; i++) {
const workerOpts = allWorkerOpts[i];

// Collect all bindings from this worker
const workerBindings: Worker_Binding[] = [];
const additionalModules: Worker_Module[] = [];
for (const [key, plugin] of PLUGIN_ENTRIES) {
// @ts-expect-error `CoreOptionsSchema` has required options which are
// missing in other plugins' options.
const pluginBindings = await plugin.getBindings(workerOpts[key], i);
if (pluginBindings !== undefined) {
workerBindings.push(...pluginBindings);
Expand All @@ -661,28 +703,27 @@ export class Miniflare {
for (const [key, plugin] of PLUGIN_ENTRIES) {
const pluginServices = await plugin.getServices({
log: this.#log,
// @ts-expect-error `CoreOptionsSchema` has required options which are
// missing in other plugins' options.
options: workerOpts[key],
optionsVersion,
sharedOptions: sharedOpts[key],
workerBindings,
workerIndex: i,
durableObjectClassNames,
additionalModules,
loopbackPort,
tmpPath: this.#tmpPath,
});
if (pluginServices !== undefined) {
for (const service of pluginServices) {
if (service.name !== undefined && !serviceNames.has(service.name)) {
serviceNames.add(service.name);
services.push(service);
if (service.name !== undefined && !services.has(service.name)) {
services.set(service.name, service);
}
}
}
}
}

return { services, sockets };
return { services: Array.from(services.values()), sockets };
}

get ready(): Promise<URL> {
Expand Down
12 changes: 4 additions & 8 deletions packages/tre/src/plugins/cache/index.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,4 @@
import { z } from "zod";
import { Worker_Binding } from "../../runtime";
import { SERVICE_LOOPBACK } from "../core";
import {
BINDING_SERVICE_LOOPBACK,
BINDING_TEXT_PERSIST,
Expand All @@ -9,6 +7,7 @@ import {
HEADER_PERSIST,
PersistenceSchema,
Plugin,
WORKER_BINDING_SERVICE_LOOPBACK,
encodePersist,
} from "../shared";
import { HEADER_CACHE_WARN_USAGE } from "./constants";
Expand All @@ -25,6 +24,7 @@ export const CacheSharedOptionsSchema = z.object({

const BINDING_JSON_CACHE_WARN_USAGE = "MINIFLARE_CACHE_WARN_USAGE";

const CACHE_SCRIPT_COMPAT_DATE = "2022-09-01";
export const CACHE_LOOPBACK_SCRIPT = `addEventListener("fetch", (event) => {
const request = new Request(event.request);
const url = new URL(request.url);
Expand Down Expand Up @@ -69,10 +69,6 @@ export const CACHE_PLUGIN: Plugin<
},
getServices({ sharedOptions, options, workerIndex }) {
const persistBinding = encodePersist(sharedOptions.cachePersist);
const loopbackBinding: Worker_Binding = {
name: BINDING_SERVICE_LOOPBACK,
service: { name: SERVICE_LOOPBACK },
};
return [
{
name: getCacheServiceName(workerIndex),
Expand All @@ -87,9 +83,9 @@ export const CACHE_PLUGIN: Plugin<
name: BINDING_JSON_CACHE_WARN_USAGE,
json: JSON.stringify(options.cacheWarnUsage ?? false),
},
loopbackBinding,
WORKER_BINDING_SERVICE_LOOPBACK,
],
compatibilityDate: "2022-09-01",
compatibilityDate: CACHE_SCRIPT_COMPAT_DATE,
},
},
];
Expand Down
17 changes: 7 additions & 10 deletions packages/tre/src/plugins/core/errors/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import { z } from "zod";
import { Request, Response } from "../../../http";
import { Log } from "../../../shared";
import {
ModuleDefinition,
SourceOptions,
contentsToString,
maybeGetStringScriptPathIndex,
} from "../modules";
Expand Down Expand Up @@ -42,12 +42,6 @@ import { getSourceMapper } from "./sourcemap";
// [ii] { script: "<contents:3>", modules: true }, -> "<script:3>"
// ]
//
export interface SourceOptions {
script?: string;
scriptPath?: string;
modules?: boolean | ModuleDefinition[];
modulesRoot?: string;
}

interface SourceFile {
path?: string; // Path may be undefined if file is in-memory
Expand Down Expand Up @@ -80,7 +74,8 @@ function maybeGetFile(
// ((g)[ii], (h)[ii]) custom `contents`, use those.
for (const srcOpts of workerSrcOpts) {
if (Array.isArray(srcOpts.modules)) {
const modulesRoot = srcOpts.modulesRoot;
const modulesRoot =
"modulesRoot" in srcOpts ? srcOpts.modulesRoot : undefined;
// Handle cases (h)[i] and (h)[ii], by re-resolving file relative to
// module root if any
const modulesRootedFilePath =
Expand All @@ -106,6 +101,8 @@ function maybeGetFile(
// 2. If path matches any `scriptPath`s with custom `script`s, use those
for (const srcOpts of workerSrcOpts) {
if (
"scriptPath" in srcOpts &&
"script" in srcOpts &&
srcOpts.scriptPath !== undefined &&
srcOpts.script !== undefined &&
path.resolve(srcOpts.scriptPath) === filePath
Expand All @@ -120,7 +117,7 @@ function maybeGetFile(
const workerIndex = maybeGetStringScriptPathIndex(file);
if (workerIndex !== undefined) {
const srcOpts = workerSrcOpts[workerIndex];
if (srcOpts.script !== undefined) {
if ("script" in srcOpts && srcOpts.script !== undefined) {
return { contents: srcOpts.script };
}
}
Expand All @@ -139,7 +136,7 @@ function maybeGetFile(
file === "worker.js" &&
(srcOpts.modules === undefined || srcOpts.modules === false)
) {
if (srcOpts.script !== undefined) {
if ("script" in srcOpts && srcOpts.script !== undefined) {
// Cases: (a), (c)
// ...if a custom `script` is defined, use that, with the defined
// `scriptPath` if any (Case (c))
Expand Down
Loading