Skip to content

Commit

Permalink
[Miniflare 3] Add support for routing to multiple Workers (#520)
Browse files Browse the repository at this point in the history
* Add support for routing to multiple Workers

This change ports Miniflare 2's `mounts` router to Miniflare 3. This
attempts to replicate the logic of the `route(s)` field in the
Wrangler configuration file:
https://developers.cloudflare.com/workers/platform/triggers/routes/#matching-behavior

Internally, this is implemented by binding all routable workers as
service bindings in the entry service. The first worker is always
bound as a "fallback", in case no routes match. Validation has been
added to ensure we a) have a fallback, b) don't have workers with
duplicate names that would cause bindings with the same name, and c)
all routable/fallback workers have code so they actually get added
as `workerd` services.

* Require code for all Workers

It doesn't really make sense to have Workers without code. This
change updates our `zod` schemas to encode this requirement.

* fixup! Add support for routing to multiple Workers

Assert names unique when collecting routes

* fixup! Add support for routing to multiple Workers

Move `CORE_PLUGIN_NAME` back to `core`

* fixup! Add support for routing to multiple Workers

Add specific error message when defining multiple unnamed workers

* fixup! Add support for routing to multiple Workers

Use `Map` when de-duping services

* fixup! Add support for routing to multiple Workers

Extract out common plugin/namespace/persist Worker into function

* fixup! Add support for routing to multiple Workers

Use same specificity calculation for routes as internal service

* fixup! fixup! Add support for routing to multiple Workers
  • Loading branch information
mrbbot authored Mar 11, 2023
1 parent 4cb0dd9 commit 7d032ee
Show file tree
Hide file tree
Showing 18 changed files with 715 additions and 248 deletions.
85 changes: 63 additions & 22 deletions packages/tre/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@ import {
Plugins,
SERVICE_ENTRY,
SOCKET_ENTRY,
SharedOptions,
WorkerOptions,
getGlobalServices,
maybeGetSitesManifestModule,
normaliseDurableObject,
} from "./plugins";
Expand Down Expand Up @@ -68,20 +71,12 @@ import {
Mutex,
NoOpLog,
OptionalZodTypeOf,
UnionToIntersection,
ValueOf,
defaultClock,
} from "./shared";
import { anyAbortSignal } from "./shared/signal";
import { waitForRequest } from "./wait";

// ===== `Miniflare` User Options =====
export type WorkerOptions = UnionToIntersection<
z.infer<ValueOf<Plugins>["options"]>
>;
export type SharedOptions = UnionToIntersection<
z.infer<Exclude<ValueOf<Plugins>["sharedOptions"], undefined>>
>;
export type MiniflareOptions = SharedOptions &
(WorkerOptions | { workers: WorkerOptions[] });

Expand All @@ -100,6 +95,9 @@ function validateOptions(
const sharedOpts = opts;
const multipleWorkers = "workers" in opts;
const workerOpts = multipleWorkers ? opts.workers : [opts];
if (workerOpts.length === 0) {
throw new MiniflareCoreError("ERR_NO_WORKERS", "No workers defined");
}

// Initialise return values
const pluginSharedOpts = {} as PluginSharedOptions;
Expand All @@ -109,16 +107,31 @@ function validateOptions(

// Validate all options
for (const [key, plugin] of PLUGIN_ENTRIES) {
// @ts-expect-error pluginSharedOpts[key] could be any plugin's
pluginSharedOpts[key] = plugin.sharedOptions?.parse(sharedOpts);
for (let i = 0; i < workerOpts.length; i++) {
// Make sure paths are correct in validation errors
const path = multipleWorkers ? ["workers", i] : undefined;
// @ts-expect-error pluginWorkerOpts[i][key] could be any plugin's
// @ts-expect-error `CoreOptionsSchema` has required options which are
// missing in other plugins' options.
pluginWorkerOpts[i][key] = plugin.options.parse(workerOpts[i], { path });
}
}

// Validate names unique
const names = new Set<string>();
for (const opts of pluginWorkerOpts) {
const name = opts.core.name ?? "";
if (names.has(name)) {
throw new MiniflareCoreError(
"ERR_DUPLICATE_NAME",
name === ""
? "Multiple workers defined without a `name`"
: `Multiple workers defined with the same \`name\`: "${name}"`
);
}
names.add(name);
}

return [pluginSharedOpts, pluginWorkerOpts];
}

Expand Down Expand Up @@ -150,6 +163,21 @@ function getDurableObjectClassNames(
return serviceClassNames;
}

// Collects all routes from all worker services
function getWorkerRoutes(
allWorkerOpts: PluginWorkerOptions[]
): Map<string, string[]> {
const allRoutes = new Map<string, string[]>();
for (const workerOpts of allWorkerOpts) {
if (workerOpts.core.routes !== undefined) {
const name = workerOpts.core.name ?? "";
assert(!allRoutes.has(name));
allRoutes.set(name, workerOpts.core.routes);
}
}
return allRoutes;
}

// ===== `Miniflare` Internal Storage & Routing =====
type OptionalGatewayFactoryType<
Gateway extends GatewayConstructor<any> | undefined
Expand Down Expand Up @@ -622,7 +650,24 @@ export class Miniflare {

sharedOpts.core.cf = await setupCf(this.#log, sharedOpts.core.cf);

const services: Service[] = [];
const durableObjectClassNames = getDurableObjectClassNames(allWorkerOpts);
const allWorkerRoutes = getWorkerRoutes(allWorkerOpts);

// Use Map to dedupe services by name
const services = new Map<string, Service>();
const globalServices = getGlobalServices({
optionsVersion,
sharedOptions: sharedOpts.core,
allWorkerRoutes,
fallbackWorkerName: this.#workerOpts[0].core.name,
loopbackPort,
});
for (const service of globalServices) {
// Global services should all have unique names
assert(service.name !== undefined && !services.has(service.name));
services.set(service.name, service);
}

const sockets: Socket[] = [
{
name: SOCKET_ENTRY,
Expand All @@ -633,18 +678,15 @@ export class Miniflare {
},
];

const durableObjectClassNames = getDurableObjectClassNames(allWorkerOpts);

// Dedupe services by name
const serviceNames = new Set<string>();

for (let i = 0; i < allWorkerOpts.length; i++) {
const workerOpts = allWorkerOpts[i];

// Collect all bindings from this worker
const workerBindings: Worker_Binding[] = [];
const additionalModules: Worker_Module[] = [];
for (const [key, plugin] of PLUGIN_ENTRIES) {
// @ts-expect-error `CoreOptionsSchema` has required options which are
// missing in other plugins' options.
const pluginBindings = await plugin.getBindings(workerOpts[key], i);
if (pluginBindings !== undefined) {
workerBindings.push(...pluginBindings);
Expand All @@ -661,28 +703,27 @@ export class Miniflare {
for (const [key, plugin] of PLUGIN_ENTRIES) {
const pluginServices = await plugin.getServices({
log: this.#log,
// @ts-expect-error `CoreOptionsSchema` has required options which are
// missing in other plugins' options.
options: workerOpts[key],
optionsVersion,
sharedOptions: sharedOpts[key],
workerBindings,
workerIndex: i,
durableObjectClassNames,
additionalModules,
loopbackPort,
tmpPath: this.#tmpPath,
});
if (pluginServices !== undefined) {
for (const service of pluginServices) {
if (service.name !== undefined && !serviceNames.has(service.name)) {
serviceNames.add(service.name);
services.push(service);
if (service.name !== undefined && !services.has(service.name)) {
services.set(service.name, service);
}
}
}
}
}

return { services, sockets };
return { services: Array.from(services.values()), sockets };
}

get ready(): Promise<URL> {
Expand Down
12 changes: 4 additions & 8 deletions packages/tre/src/plugins/cache/index.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,4 @@
import { z } from "zod";
import { Worker_Binding } from "../../runtime";
import { SERVICE_LOOPBACK } from "../core";
import {
BINDING_SERVICE_LOOPBACK,
BINDING_TEXT_PERSIST,
Expand All @@ -9,6 +7,7 @@ import {
HEADER_PERSIST,
PersistenceSchema,
Plugin,
WORKER_BINDING_SERVICE_LOOPBACK,
encodePersist,
} from "../shared";
import { HEADER_CACHE_WARN_USAGE } from "./constants";
Expand All @@ -25,6 +24,7 @@ export const CacheSharedOptionsSchema = z.object({

const BINDING_JSON_CACHE_WARN_USAGE = "MINIFLARE_CACHE_WARN_USAGE";

const CACHE_SCRIPT_COMPAT_DATE = "2022-09-01";
export const CACHE_LOOPBACK_SCRIPT = `addEventListener("fetch", (event) => {
const request = new Request(event.request);
const url = new URL(request.url);
Expand Down Expand Up @@ -69,10 +69,6 @@ export const CACHE_PLUGIN: Plugin<
},
getServices({ sharedOptions, options, workerIndex }) {
const persistBinding = encodePersist(sharedOptions.cachePersist);
const loopbackBinding: Worker_Binding = {
name: BINDING_SERVICE_LOOPBACK,
service: { name: SERVICE_LOOPBACK },
};
return [
{
name: getCacheServiceName(workerIndex),
Expand All @@ -87,9 +83,9 @@ export const CACHE_PLUGIN: Plugin<
name: BINDING_JSON_CACHE_WARN_USAGE,
json: JSON.stringify(options.cacheWarnUsage ?? false),
},
loopbackBinding,
WORKER_BINDING_SERVICE_LOOPBACK,
],
compatibilityDate: "2022-09-01",
compatibilityDate: CACHE_SCRIPT_COMPAT_DATE,
},
},
];
Expand Down
17 changes: 7 additions & 10 deletions packages/tre/src/plugins/core/errors/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import { z } from "zod";
import { Request, Response } from "../../../http";
import { Log } from "../../../shared";
import {
ModuleDefinition,
SourceOptions,
contentsToString,
maybeGetStringScriptPathIndex,
} from "../modules";
Expand Down Expand Up @@ -42,12 +42,6 @@ import { getSourceMapper } from "./sourcemap";
// [ii] { script: "<contents:3>", modules: true }, -> "<script:3>"
// ]
//
export interface SourceOptions {
script?: string;
scriptPath?: string;
modules?: boolean | ModuleDefinition[];
modulesRoot?: string;
}

interface SourceFile {
path?: string; // Path may be undefined if file is in-memory
Expand Down Expand Up @@ -80,7 +74,8 @@ function maybeGetFile(
// ((g)[ii], (h)[ii]) custom `contents`, use those.
for (const srcOpts of workerSrcOpts) {
if (Array.isArray(srcOpts.modules)) {
const modulesRoot = srcOpts.modulesRoot;
const modulesRoot =
"modulesRoot" in srcOpts ? srcOpts.modulesRoot : undefined;
// Handle cases (h)[i] and (h)[ii], by re-resolving file relative to
// module root if any
const modulesRootedFilePath =
Expand All @@ -106,6 +101,8 @@ function maybeGetFile(
// 2. If path matches any `scriptPath`s with custom `script`s, use those
for (const srcOpts of workerSrcOpts) {
if (
"scriptPath" in srcOpts &&
"script" in srcOpts &&
srcOpts.scriptPath !== undefined &&
srcOpts.script !== undefined &&
path.resolve(srcOpts.scriptPath) === filePath
Expand All @@ -120,7 +117,7 @@ function maybeGetFile(
const workerIndex = maybeGetStringScriptPathIndex(file);
if (workerIndex !== undefined) {
const srcOpts = workerSrcOpts[workerIndex];
if (srcOpts.script !== undefined) {
if ("script" in srcOpts && srcOpts.script !== undefined) {
return { contents: srcOpts.script };
}
}
Expand All @@ -139,7 +136,7 @@ function maybeGetFile(
file === "worker.js" &&
(srcOpts.modules === undefined || srcOpts.modules === false)
) {
if (srcOpts.script !== undefined) {
if ("script" in srcOpts && srcOpts.script !== undefined) {
// Cases: (a), (c)
// ...if a custom `script` is defined, use that, with the defined
// `scriptPath` if any (Case (c))
Expand Down
Loading

0 comments on commit 7d032ee

Please sign in to comment.