Skip to content

Commit

Permalink
Workflows local dev (#7045)
Browse files Browse the repository at this point in the history
* create workflows-shared package
with initial implementation for local dev

* update workflows-shared implementation

* plumb into miniflare

* plumb into wrangler

* fixup fixture

* fix plugin

* fixture tests

* add ownership

* Fix types

* Reduce sleep

* fixup

* Update packages/workflows-shared/package.json

Co-authored-by: Carmen Popoviciu <cpopoviciu@cloudflare.com>

* Clean up config files

* Update README for MiniflareOptions, use constants in plugin

* Apply suggestions from code review

* Update packages/workflows-shared/package.json

* Review comments

* Fix CI

* Add changeset

* add noop deploy script

* update snapshots

* fix type checks

* fix lints

* fix lint

* run prettier

---------

Co-authored-by: Samuel Macleod <smacleod@cloudflare.com>
Co-authored-by: Sid Chatterjee <sid@cloudflare.com>
Co-authored-by: Carmen Popoviciu <cpopoviciu@cloudflare.com>
  • Loading branch information
4 people authored Oct 24, 2024
1 parent 80e5bc6 commit 5ef6231
Show file tree
Hide file tree
Showing 39 changed files with 2,016 additions and 44 deletions.
7 changes: 7 additions & 0 deletions .changeset/sour-frogs-jam.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
---
"@cloudflare/workflows-shared": patch
"miniflare": patch
"wrangler": patch
---

Add preliminary support for Workflows in wrangler dev
3 changes: 3 additions & 0 deletions CODEOWNERS
Validating CODEOWNERS rules …
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@
# kv-asset-handler ownership
/packages/kv-asset-handler/ @cloudflare/wrangler

# Workflows ownership
/packages/workflows-shared/ @cloudflare/workflows @cloudflare/wrangler

# Owners intentionally left blank on these shared directories / files
# to avoid noisy review requests
/.changeset/
Expand Down
4 changes: 3 additions & 1 deletion fixtures/workflow/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,12 @@
"private": true,
"scripts": {
"deploy": "wrangler deploy",
"start": "wrangler dev --x-dev-env"
"start": "wrangler dev --x-dev-env",
"test:ci": "vitest"
},
"devDependencies": {
"@cloudflare/workers-types": "^4.20241022.0",
"undici": "^6.20.1",
"wrangler": "workspace:*"
},
"volta": {
Expand Down
45 changes: 25 additions & 20 deletions fixtures/workflow/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,50 +1,55 @@
import {
WorkerEntrypoint,
Workflow,
WorkflowEntrypoint,
WorkflowEvent,
WorkflowStep,
} from "cloudflare:workers";

type Params = {
name: string;
};
export class Demo extends Workflow<{}, Params> {
async run(events: Array<WorkflowEvent<Params>>, step: WorkflowStep) {
const { timestamp, payload } = events[0];

export class Demo extends WorkflowEntrypoint<{}, Params> {
async run(event: WorkflowEvent<Params>, step: WorkflowStep) {
const { timestamp, payload } = event;

const result = await step.do("First step", async function () {
return {
output: "First step result",
};
});

await step.sleep("Wait", "1 minute");
await step.sleep("Wait", "1 second");

const result2 = await step.do("Second step", async function () {
return {
output: "Second step result",
};
});

return {
result,
result2,
timestamp,
payload,
};
return [result, result2, timestamp, payload];
}
}

type Env = {
WORKFLOW: {
create: (id: string) => {
pause: () => {};
};
};
WORKFLOW: Workflow;
};
export default class extends WorkerEntrypoint<Env> {
async fetch() {
const handle = await this.env.WORKFLOW.create(crypto.randomUUID());
await handle.pause();
return new Response();
async fetch(req: Request) {
const url = new URL(req.url);
const id = url.searchParams.get("workflowName");

if (url.pathname === "/favicon.ico") {
return new Response(null, { status: 404 });
}

let handle: WorkflowInstance;
if (url.pathname === "/create") {
handle = await this.env.WORKFLOW.create({ id });
} else {
handle = await this.env.WORKFLOW.get(id);
}

return Response.json(await handle.status());
}
}
78 changes: 78 additions & 0 deletions fixtures/workflow/tests/index.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
import { resolve } from "path";
import { fetch } from "undici";
import { afterAll, beforeAll, describe, it, vi } from "vitest";
import { runWranglerDev } from "../../shared/src/run-wrangler-long-lived";

describe("Workflows", () => {
let ip: string,
port: number,
stop: (() => Promise<unknown>) | undefined,
getOutput: () => string;

beforeAll(async () => {
({ ip, port, stop, getOutput } = await runWranglerDev(
resolve(__dirname, ".."),
[
"--port=0",
"--inspector-port=0",
"--upstream-protocol=https",
"--host=prod.example.org",
]
));
});

afterAll(async () => {
await stop?.();
});

async function fetchJson(url: string) {
const response = await fetch(url, {
headers: {
"MF-Disable-Pretty-Error": "1",
},
});
const text = await response.text();

try {
return JSON.parse(text);
} catch (err) {
throw new Error(`Couldn't parse JSON:\n\n${text}`);
}
}

it("creates a workflow", async ({ expect }) => {
await expect(
fetchJson(`http://${ip}:${port}/create?workflowName=test`)
).resolves.toEqual({
status: "running",
output: [],
});

await vi.waitFor(
async () => {
await expect(
fetchJson(`http://${ip}:${port}/status?workflowName=test`)
).resolves.toEqual({
status: "running",
output: [{ output: "First step result" }],
});
},
{ timeout: 5000 }
);

await vi.waitFor(
async () => {
await expect(
fetchJson(`http://${ip}:${port}/status?workflowName=test`)
).resolves.toEqual({
status: "complete",
output: [
{ output: "First step result" },
{ output: "Second step result" },
],
});
},
{ timeout: 5000 }
);
});
});
7 changes: 7 additions & 0 deletions fixtures/workflow/tests/tsconfig.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
{
"extends": "@cloudflare/workers-tsconfig/tsconfig.json",
"compilerOptions": {
"types": ["node"]
},
"include": ["**/*.ts", "../../../node-types.d.ts"]
}
16 changes: 8 additions & 8 deletions fixtures/workflow/tsconfig.json
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
{
"compilerOptions": {
"target": "es2021",
"lib": ["es2021"],
"module": "es2022",
"types": ["@cloudflare/workers-types/experimental"],
"target": "ES2020",
"module": "CommonJS",
"lib": ["ES2020"],
"types": ["@cloudflare/workers-types"],
"moduleResolution": "node",
"noEmit": true,
"isolatedModules": true,
"forceConsistentCasingInFileNames": true,
"strict": true,
"skipLibCheck": true
}
},
"include": ["**/*.ts"],
"exclude": ["tests"]
}
9 changes: 9 additions & 0 deletions fixtures/workflow/vitest.config.mts
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
import { defineProject, mergeConfig } from "vitest/config";
import configShared from "../../vitest.shared";

export default mergeConfig(
configShared,
defineProject({
test: {},
})
);
2 changes: 1 addition & 1 deletion fixtures/workflow/wrangler.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#:schema node_modules/wrangler/config-schema.json
name = "my-workflow-demo"
main = "src/index.ts"
compatibility_date = "2024-10-11"
compatibility_date = "2024-10-22"

[[workflows]]
binding = "WORKFLOW"
Expand Down
23 changes: 23 additions & 0 deletions packages/miniflare/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,20 @@ Options for an individual Worker/"nanoservice". All bindings are accessible on
the global scope in service-worker format Workers, or via the 2nd `env`
parameter in module format Workers.

### `interface WorkflowOptions`

- `name: string`

The name of the Workflow.

- `className: string`

The name of the class exported from the Worker that implements the `WorkflowEntrypoint`.

- `scriptName?`: string

The name of the script that includes the `WorkflowEntrypoint`. This is optional because it defaults to the current script if not set.

#### Core

- `name?: string`
Expand Down Expand Up @@ -585,6 +599,11 @@ parameter in module format Workers.
- `assetOptions?: { html_handling?: HTMLHandlingOptions, not_found_handling?: NotFoundHandlingOptions}`
Configuration for file-based asset routing - see [docs](https://developers.cloudflare.com/workers/static-assets/routing/#routing-configuration) for options
#### Workflows
- `workflows?: WorkflowOptions[]`
Configuration for one or more Workflows in your project.
#### Analytics Engine, Sending Email, Vectorize and Workers for Platforms
_Not yet supported_
Expand Down Expand Up @@ -725,6 +744,10 @@ Options shared between all Workers/"nanoservices".

Where to persist data stored in D1 databases. See docs for `Persistence`.

- `workflowsPersist?: Persistence`

Where to persist data stored in Workflows. See docs for `Persistence`.

#### Analytics Engine, Browser Rendering, Sending Email, Vectorize, Workers AI and Workers for Platforms

_Not yet supported_
Expand Down
2 changes: 2 additions & 0 deletions packages/miniflare/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
"@cloudflare/kv-asset-handler": "workspace:*",
"@cloudflare/workers-shared": "workspace:*",
"@cloudflare/workers-types": "^4.20241022.0",
"@cloudflare/workflows-shared": "workspace:*",
"@microsoft/api-extractor": "^7.47.0",
"@types/debug": "^4.1.7",
"@types/estree": "^1.0.0",
Expand All @@ -85,6 +86,7 @@
"eslint-plugin-import": "2.26.x",
"eslint-plugin-prettier": "^5.0.1",
"expect-type": "^0.15.0",
"heap-js": "^2.5.0",
"http-cache-semantics": "^4.1.0",
"kleur": "^4.1.5",
"mime": "^3.0.0",
Expand Down
13 changes: 11 additions & 2 deletions packages/miniflare/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1131,9 +1131,10 @@ export class Miniflare {
innerBindings: Worker_Binding[];
}[] = [];

// This will be the user worker or the vitest pool worker wrapping the user worker
// The asset plugin needs this so that it can set the binding between the router worker and the user worker
if (this.#workerOpts[0].assets.assets) {
// This will be the UserWorker, or the vitest pool worker wrapping the UserWorker
// The asset plugin needs this so that it can set the binding between the RouterWorker and the UserWorker
// TODO: apply this to ever this.#workerOpts, not just the first (i.e this.#workerOpts[0])
this.#workerOpts[0].assets.assets.workerName =
this.#workerOpts[0].core.name;
}
Expand All @@ -1144,6 +1145,14 @@ export class Miniflare {
const workerName = workerOpts.core.name ?? "";
const isModulesWorker = Boolean(workerOpts.core.modules);

if (workerOpts.workflows.workflows) {
for (const workflow of Object.values(workerOpts.workflows.workflows)) {
// This will be the UserWorker, or the vitest pool worker wrapping the UserWorker
// The workflows plugin needs this so that it can set the binding between the Engine and the UserWorker
workflow.scriptName ??= workerOpts.core.name;
}
}

// Collect all bindings from this worker
const workerBindings: Worker_Binding[] = [];
allWorkerBindings.set(workerName, workerBindings);
Expand Down
9 changes: 7 additions & 2 deletions packages/miniflare/src/plugins/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import { KV_PLUGIN, KV_PLUGIN_NAME } from "./kv";
import { QUEUES_PLUGIN, QUEUES_PLUGIN_NAME } from "./queues";
import { R2_PLUGIN, R2_PLUGIN_NAME } from "./r2";
import { RATELIMIT_PLUGIN, RATELIMIT_PLUGIN_NAME } from "./ratelimit";
import { WORKFLOWS_PLUGIN, WORKFLOWS_PLUGIN_NAME } from "./workflows";

export const PLUGINS = {
[CORE_PLUGIN_NAME]: CORE_PLUGIN,
Expand All @@ -23,6 +24,7 @@ export const PLUGINS = {
[HYPERDRIVE_PLUGIN_NAME]: HYPERDRIVE_PLUGIN,
[RATELIMIT_PLUGIN_NAME]: RATELIMIT_PLUGIN,
[ASSETS_PLUGIN_NAME]: ASSETS_PLUGIN,
[WORKFLOWS_PLUGIN_NAME]: WORKFLOWS_PLUGIN,
};
export type Plugins = typeof PLUGINS;

Expand Down Expand Up @@ -70,13 +72,15 @@ export type WorkerOptions = z.input<typeof CORE_PLUGIN.options> &
z.input<typeof R2_PLUGIN.options> &
z.input<typeof HYPERDRIVE_PLUGIN.options> &
z.input<typeof RATELIMIT_PLUGIN.options> &
z.input<typeof ASSETS_PLUGIN.options>;
z.input<typeof ASSETS_PLUGIN.options> &
z.input<typeof WORKFLOWS_PLUGIN.options>;
export type SharedOptions = z.input<typeof CORE_PLUGIN.sharedOptions> &
z.input<typeof CACHE_PLUGIN.sharedOptions> &
z.input<typeof D1_PLUGIN.sharedOptions> &
z.input<typeof DURABLE_OBJECTS_PLUGIN.sharedOptions> &
z.input<typeof KV_PLUGIN.sharedOptions> &
z.input<typeof R2_PLUGIN.sharedOptions>;
z.input<typeof R2_PLUGIN.sharedOptions> &
z.input<typeof WORKFLOWS_PLUGIN.sharedOptions>;

export const PLUGIN_ENTRIES = Object.entries(PLUGINS) as [
keyof Plugins,
Expand Down Expand Up @@ -124,3 +128,4 @@ export * from "./hyperdrive";
export * from "./ratelimit";
export * from "./assets";
export * from "./assets/schema";
export * from "./workflows";
Loading

0 comments on commit 5ef6231

Please sign in to comment.