Skip to content

Commit

Permalink
Add support for routing to multiple Workers
Browse files Browse the repository at this point in the history
This change ports Miniflare 2's `mounts` router to Miniflare 3. This
attempts to replicate the logic of the `route(s)` field in the
Wrangler configuration file:
https://developers.cloudflare.com/workers/platform/triggers/routes/#matching-behavior

Internally, this is implemented by binding all routable workers as
service bindings in the entry service. The first worker is always
bound as a "fallback", in case no routes match. Validation has been
added to ensure we a) have a fallback, b) don't have workers with
duplicate names that would cause bindings with the same name, and c)
all routable/fallback workers have code so they actually get added
as `workerd` services.
  • Loading branch information
mrbbot committed Feb 27, 2023
1 parent 4cb0dd9 commit 9229b30
Show file tree
Hide file tree
Showing 14 changed files with 562 additions and 104 deletions.
50 changes: 45 additions & 5 deletions packages/tre/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import {
Plugins,
SERVICE_ENTRY,
SOCKET_ENTRY,
getGlobalServices,
maybeGetSitesManifestModule,
normaliseDurableObject,
} from "./plugins";
Expand Down Expand Up @@ -100,6 +101,9 @@ function validateOptions(
const sharedOpts = opts;
const multipleWorkers = "workers" in opts;
const workerOpts = multipleWorkers ? opts.workers : [opts];
if (workerOpts.length === 0) {
throw new MiniflareCoreError("ERR_NO_WORKERS", "No workers defined");
}

// Initialise return values
const pluginSharedOpts = {} as PluginSharedOptions;
Expand All @@ -119,6 +123,19 @@ function validateOptions(
}
}

// Validate names unique
const names = new Set<string>();
for (const opts of pluginWorkerOpts) {
const name = opts.core.name ?? "";
if (names.has(name)) {
throw new MiniflareCoreError(
"ERR_DUPLICATE_NAME",
`Multiple workers defined with the same name: "${name}"`
);
}
names.add(name);
}

return [pluginSharedOpts, pluginWorkerOpts];
}

Expand Down Expand Up @@ -150,6 +167,19 @@ function getDurableObjectClassNames(
return serviceClassNames;
}

// Collects all routes from all worker services
function getWorkerRoutes(
allWorkerOpts: PluginWorkerOptions[]
): Map<string, string[]> {
const allRoutes = new Map<string, string[]>();
for (const workerOpts of allWorkerOpts) {
if (workerOpts.core.routes !== undefined) {
allRoutes.set(workerOpts.core.name ?? "", workerOpts.core.routes);
}
}
return allRoutes;
}

// ===== `Miniflare` Internal Storage & Routing =====
type OptionalGatewayFactoryType<
Gateway extends GatewayConstructor<any> | undefined
Expand Down Expand Up @@ -622,7 +652,16 @@ export class Miniflare {

sharedOpts.core.cf = await setupCf(this.#log, sharedOpts.core.cf);

const services: Service[] = [];
const durableObjectClassNames = getDurableObjectClassNames(allWorkerOpts);
const allWorkerRoutes = getWorkerRoutes(allWorkerOpts);

const services: Service[] = getGlobalServices({
optionsVersion,
sharedOptions: sharedOpts.core,
allWorkerRoutes,
fallbackWorkerName: this.#workerOpts[0].core.name,
loopbackPort,
});
const sockets: Socket[] = [
{
name: SOCKET_ENTRY,
Expand All @@ -633,10 +672,13 @@ export class Miniflare {
},
];

const durableObjectClassNames = getDurableObjectClassNames(allWorkerOpts);

// Dedupe services by name
const serviceNames = new Set<string>();
for (const service of services) {
// Global services should all have unique names
assert(service.name !== undefined && !serviceNames.has(service.name));
serviceNames.add(service.name);
}

for (let i = 0; i < allWorkerOpts.length; i++) {
const workerOpts = allWorkerOpts[i];
Expand All @@ -662,13 +704,11 @@ export class Miniflare {
const pluginServices = await plugin.getServices({
log: this.#log,
options: workerOpts[key],
optionsVersion,
sharedOptions: sharedOpts[key],
workerBindings,
workerIndex: i,
durableObjectClassNames,
additionalModules,
loopbackPort,
tmpPath: this.#tmpPath,
});
if (pluginServices !== undefined) {
Expand Down
9 changes: 2 additions & 7 deletions packages/tre/src/plugins/cache/index.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,4 @@
import { z } from "zod";
import { Worker_Binding } from "../../runtime";
import { SERVICE_LOOPBACK } from "../core";
import {
BINDING_SERVICE_LOOPBACK,
BINDING_TEXT_PERSIST,
Expand All @@ -9,6 +7,7 @@ import {
HEADER_PERSIST,
PersistenceSchema,
Plugin,
WORKER_BINDING_SERVICE_LOOPBACK,
encodePersist,
} from "../shared";
import { HEADER_CACHE_WARN_USAGE } from "./constants";
Expand Down Expand Up @@ -69,10 +68,6 @@ export const CACHE_PLUGIN: Plugin<
},
getServices({ sharedOptions, options, workerIndex }) {
const persistBinding = encodePersist(sharedOptions.cachePersist);
const loopbackBinding: Worker_Binding = {
name: BINDING_SERVICE_LOOPBACK,
service: { name: SERVICE_LOOPBACK },
};
return [
{
name: getCacheServiceName(workerIndex),
Expand All @@ -87,7 +82,7 @@ export const CACHE_PLUGIN: Plugin<
name: BINDING_JSON_CACHE_WARN_USAGE,
json: JSON.stringify(options.cacheWarnUsage ?? false),
},
loopbackBinding,
WORKER_BINDING_SERVICE_LOOPBACK,
],
compatibilityDate: "2022-09-01",
},
Expand Down
160 changes: 98 additions & 62 deletions packages/tre/src/plugins/core/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,14 @@ import { getCacheServiceName } from "../cache";
import { DURABLE_OBJECTS_STORAGE_SERVICE_NAME } from "../do";
import {
BINDING_SERVICE_LOOPBACK,
CORE_PLUGIN_NAME,
CloudflareFetchSchema,
HEADER_CF_BLOB,
Plugin,
SERVICE_LOOPBACK,
WORKER_BINDING_SERVICE_LOOPBACK,
matchRoutes,
parseRoutes,
} from "../shared";
import { HEADER_ERROR_STACK } from "./errors";
import {
Expand Down Expand Up @@ -50,6 +55,8 @@ export const CoreOptionsSchema = z.object({
compatibilityDate: z.string().optional(),
compatibilityFlags: z.string().array().optional(),

routes: z.string().array().optional(),

bindings: z.record(JsonSchema).optional(),
wasmBindings: z.record(z.string()).optional(),
textBlobBindings: z.record(z.string()).optional(),
Expand All @@ -74,10 +81,6 @@ export const CoreSharedOptionsSchema = z.object({
liveReload: z.boolean().optional(),
});

export const CORE_PLUGIN_NAME = "core";

// Service looping back to Miniflare's Node.js process (for storage, etc)
export const SERVICE_LOOPBACK = `${CORE_PLUGIN_NAME}:loopback`;
// Service for HTTP socket entrypoint (for checking runtime ready, routing, etc)
export const SERVICE_ENTRY = `${CORE_PLUGIN_NAME}:entry`;
// Service prefix for all regular user workers
Expand All @@ -102,9 +105,11 @@ export const HEADER_CUSTOM_SERVICE = "MF-Custom-Service";
export const HEADER_ORIGINAL_URL = "MF-Original-URL";

const BINDING_JSON_VERSION = "MINIFLARE_VERSION";
const BINDING_SERVICE_USER = "MINIFLARE_USER";
const BINDING_SERVICE_USER_ROUTE_PREFIX = "MINIFLARE_USER_ROUTE_";
const BINDING_SERVICE_USER_FALLBACK = "MINIFLARE_USER_FALLBACK";
const BINDING_TEXT_CUSTOM_SERVICE = "MINIFLARE_CUSTOM_SERVICE";
const BINDING_JSON_CF_BLOB = "CF_BLOB";
const BINDING_JSON_ROUTES = "MINIFLARE_ROUTES";
const BINDING_DATA_LIVE_RELOAD_SCRIPT = "MINIFLARE_LIVE_RELOAD_SCRIPT";

const LIVE_RELOAD_SCRIPT_TEMPLATE = (
Expand All @@ -129,7 +134,10 @@ const LIVE_RELOAD_SCRIPT_TEMPLATE = (

// Using `>=` for version check to handle multiple `setOptions` calls before
// reload complete.
export const SCRIPT_ENTRY = `async function handleEvent(event) {
export const SCRIPT_ENTRY = `
const matchRoutes = ${matchRoutes.toString()};
async function handleEvent(event) {
const probe = event.request.headers.get("${HEADER_PROBE}");
if (probe !== null) {
const probeMin = parseInt(probe);
Expand All @@ -147,11 +155,16 @@ export const SCRIPT_ENTRY = `async function handleEvent(event) {
});
request.headers.delete("${HEADER_ORIGINAL_URL}");
if (globalThis.${BINDING_SERVICE_USER} === undefined) {
let service = globalThis.${BINDING_SERVICE_USER_FALLBACK};
const url = new URL(request.url);
const route = matchRoutes(${BINDING_JSON_ROUTES}, url);
if (route !== null) service = globalThis["${BINDING_SERVICE_USER_ROUTE_PREFIX}" + route];
if (service === undefined) {
return new Response("No entrypoint worker found", { status: 404 });
}
try {
let response = await ${BINDING_SERVICE_USER}.fetch(request);
let response = await service.fetch(request);
if (
response.status === 500 &&
Expand Down Expand Up @@ -319,60 +332,12 @@ export const CORE_PLUGIN: Plugin<
async getServices({
log,
options,
optionsVersion,
workerBindings,
workerIndex,
sharedOptions,
durableObjectClassNames,
additionalModules,
loopbackPort,
}) {
// Define core/shared services.
const loopbackBinding: Worker_Binding = {
name: BINDING_SERVICE_LOOPBACK,
service: { name: SERVICE_LOOPBACK },
};

// Services get de-duped by name, so only the first worker's
// SERVICE_LOOPBACK and SERVICE_ENTRY will be used
const serviceEntryBindings: Worker_Binding[] = [
loopbackBinding, // For converting stack-traces to pretty-error pages
{ name: BINDING_JSON_VERSION, json: optionsVersion.toString() },
{ name: BINDING_JSON_CF_BLOB, json: JSON.stringify(sharedOptions.cf) },
];
if (sharedOptions.liveReload) {
const liveReloadScript = LIVE_RELOAD_SCRIPT_TEMPLATE(loopbackPort);
serviceEntryBindings.push({
name: BINDING_DATA_LIVE_RELOAD_SCRIPT,
data: encoder.encode(liveReloadScript),
});
}
const services: Service[] = [
{
name: SERVICE_LOOPBACK,
external: { http: { cfBlobHeader: HEADER_CF_BLOB } },
},
{
name: SERVICE_ENTRY,
worker: {
serviceWorkerScript: SCRIPT_ENTRY,
compatibilityDate: "2022-09-01",
bindings: serviceEntryBindings,
},
},
// Allow access to private/public addresses:
// https://github.com/cloudflare/miniflare/issues/412
{
name: "internet",
network: {
// Can't use `["public", "private"]` here because of
// https://github.com/cloudflare/workerd/issues/62
allow: ["0.0.0.0/0"],
deny: [],
tlsOptions: { trustBrowserCas: true },
},
},
];
const services: Service[] = [];

// Define regular user worker if script is set
const workerScript = getWorkerScript(options, workerIndex);
Expand Down Expand Up @@ -412,10 +377,13 @@ export const CORE_PLUGIN: Plugin<
cacheApiOutbound: { name: getCacheServiceName(workerIndex) },
},
});
serviceEntryBindings.push({
name: BINDING_SERVICE_USER,
service: { name },
});
} else if (workerIndex === 0 || options.routes?.length) {
throw new MiniflareCoreError(
"ERR_ROUTABLE_NO_SCRIPT",
`Worker [${workerIndex}] ${
options.name === undefined ? "" : `("${options.name}") `
}must have code defined as it's routable or the fallback`
);
}

// Define custom `fetch` services if set
Expand All @@ -433,7 +401,7 @@ export const CORE_PLUGIN: Plugin<
name: BINDING_TEXT_CUSTOM_SERVICE,
text: `${workerIndex}/${name}`,
},
loopbackBinding,
WORKER_BINDING_SERVICE_LOOPBACK,
],
},
});
Expand All @@ -451,6 +419,74 @@ export const CORE_PLUGIN: Plugin<
},
};

export interface GlobalServicesOptions {
optionsVersion: number;
sharedOptions: z.infer<typeof CoreSharedOptionsSchema>;
allWorkerRoutes: Map<string, string[]>;
fallbackWorkerName: string | undefined;
loopbackPort: number;
}
export function getGlobalServices({
optionsVersion,
sharedOptions,
allWorkerRoutes,
fallbackWorkerName,
loopbackPort,
}: GlobalServicesOptions): Service[] {
// Collect list of workers we could route to, then parse and sort all routes
const routableWorkers = [...allWorkerRoutes.keys()];
const routes = parseRoutes(allWorkerRoutes);

// Define core/shared services.
const serviceEntryBindings: Worker_Binding[] = [
WORKER_BINDING_SERVICE_LOOPBACK, // For converting stack-traces to pretty-error pages
{ name: BINDING_JSON_VERSION, json: optionsVersion.toString() },
{ name: BINDING_JSON_ROUTES, json: JSON.stringify(routes) },
{ name: BINDING_JSON_CF_BLOB, json: JSON.stringify(sharedOptions.cf) },
{
name: BINDING_SERVICE_USER_FALLBACK,
service: { name: getUserServiceName(fallbackWorkerName) },
},
...routableWorkers.map((name) => ({
name: BINDING_SERVICE_USER_ROUTE_PREFIX + name,
service: { name: getUserServiceName(name) },
})),
];
if (sharedOptions.liveReload) {
const liveReloadScript = LIVE_RELOAD_SCRIPT_TEMPLATE(loopbackPort);
serviceEntryBindings.push({
name: BINDING_DATA_LIVE_RELOAD_SCRIPT,
data: encoder.encode(liveReloadScript),
});
}
return [
{
name: SERVICE_LOOPBACK,
external: { http: { cfBlobHeader: HEADER_CF_BLOB } },
},
{
name: SERVICE_ENTRY,
worker: {
serviceWorkerScript: SCRIPT_ENTRY,
compatibilityDate: "2022-09-01",
bindings: serviceEntryBindings,
},
},
// Allow access to private/public addresses:
// https://github.com/cloudflare/miniflare/issues/412
{
name: "internet",
network: {
// Can't use `["public", "private"]` here because of
// https://github.com/cloudflare/workerd/issues/62
allow: ["0.0.0.0/0"],
deny: [],
tlsOptions: { trustBrowserCas: true },
},
},
];
}

function getWorkerScript(
options: z.infer<typeof CoreOptionsSchema>,
workerIndex: number
Expand Down
Loading

0 comments on commit 9229b30

Please sign in to comment.