From 5ef6231a5cefbaaef123e6e8ee899fb81fc69e3e Mon Sep 17 00:00:00 2001 From: Rahul Sethi <5822355+RamIdeas@users.noreply.github.com> Date: Thu, 24 Oct 2024 08:42:44 +0100 Subject: [PATCH] Workflows local dev (#7045) * create workflows-shared package with initial implementation for local dev * update workflows-shared implementation * plumb into miniflare * plumb into wrangler * fixup fixture * fix plugin * fixture tests * add ownership * Fix types * Reduce sleep * fixup * Update packages/workflows-shared/package.json Co-authored-by: Carmen Popoviciu * Clean up config files * Update README for MiniflareOptions, use constants in plugin * Apply suggestions from code review * Update packages/workflows-shared/package.json * Review comments * Fix CI * Add changeset * add noop deploy script * update snapshots * fix type checks * fix lints * fix lint * run prettier --------- Co-authored-by: Samuel Macleod Co-authored-by: Sid Chatterjee Co-authored-by: Carmen Popoviciu --- .changeset/sour-frogs-jam.md | 7 + CODEOWNERS | 3 + fixtures/workflow/package.json | 4 +- fixtures/workflow/src/index.ts | 45 +- fixtures/workflow/tests/index.test.ts | 78 +++ fixtures/workflow/tests/tsconfig.json | 7 + fixtures/workflow/tsconfig.json | 16 +- fixtures/workflow/vitest.config.mts | 9 + fixtures/workflow/wrangler.toml | 2 +- packages/miniflare/README.md | 23 + packages/miniflare/package.json | 2 + packages/miniflare/src/index.ts | 13 +- packages/miniflare/src/plugins/index.ts | 9 +- .../miniflare/src/plugins/workflows/index.ts | 125 ++++ .../src/workers/workflows/binding.worker.ts | 6 + packages/workflows-shared/.eslintrc.js | 5 + packages/workflows-shared/README.md | 7 + packages/workflows-shared/package.json | 61 ++ packages/workflows-shared/src/binding.ts | 86 +++ packages/workflows-shared/src/context.ts | 537 ++++++++++++++++++ packages/workflows-shared/src/engine.ts | 256 +++++++++ packages/workflows-shared/src/index.ts | 4 + packages/workflows-shared/src/instance.ts | 196 +++++++ packages/workflows-shared/src/lib/cache.ts | 10 + packages/workflows-shared/src/lib/errors.ts | 18 + .../src/lib/gracePeriodSemaphore.ts | 87 +++ packages/workflows-shared/src/lib/retries.ts | 26 + .../src/lib/timePriorityQueue.ts | 267 +++++++++ .../workflows-shared/src/lib/validators.ts | 12 + .../src/local-binding-worker.ts | 2 + packages/workflows-shared/tsconfig.json | 18 + packages/workflows-shared/turbo.json | 10 + .../src/api/integrations/platform/index.ts | 8 +- .../wrangler/src/api/startDevWorker/types.ts | 9 +- .../wrangler/src/api/startDevWorker/utils.ts | 3 + packages/wrangler/src/dev/miniflare.ts | 16 + pnpm-lock.yaml | 71 ++- .../__tests__/deploy-non-npm-packages.test.ts | 1 + .../__tests__/validate-changesets.test.ts | 1 + 39 files changed, 2016 insertions(+), 44 deletions(-) create mode 100644 .changeset/sour-frogs-jam.md create mode 100644 fixtures/workflow/tests/index.test.ts create mode 100644 fixtures/workflow/tests/tsconfig.json create mode 100644 fixtures/workflow/vitest.config.mts create mode 100644 packages/miniflare/src/plugins/workflows/index.ts create mode 100644 packages/miniflare/src/workers/workflows/binding.worker.ts create mode 100644 packages/workflows-shared/.eslintrc.js create mode 100644 packages/workflows-shared/README.md create mode 100644 packages/workflows-shared/package.json create mode 100644 packages/workflows-shared/src/binding.ts create mode 100644 packages/workflows-shared/src/context.ts create mode 100644 packages/workflows-shared/src/engine.ts create mode 100644 packages/workflows-shared/src/index.ts create mode 100644 packages/workflows-shared/src/instance.ts create mode 100644 packages/workflows-shared/src/lib/cache.ts create mode 100644 packages/workflows-shared/src/lib/errors.ts create mode 100644 packages/workflows-shared/src/lib/gracePeriodSemaphore.ts create mode 100644 packages/workflows-shared/src/lib/retries.ts create mode 100644 packages/workflows-shared/src/lib/timePriorityQueue.ts create mode 100644 packages/workflows-shared/src/lib/validators.ts create mode 100644 packages/workflows-shared/src/local-binding-worker.ts create mode 100644 packages/workflows-shared/tsconfig.json create mode 100644 packages/workflows-shared/turbo.json diff --git a/.changeset/sour-frogs-jam.md b/.changeset/sour-frogs-jam.md new file mode 100644 index 000000000000..a5ba506ed4f8 --- /dev/null +++ b/.changeset/sour-frogs-jam.md @@ -0,0 +1,7 @@ +--- +"@cloudflare/workflows-shared": patch +"miniflare": patch +"wrangler": patch +--- + +Add preliminary support for Workflows in wrangler dev diff --git a/CODEOWNERS b/CODEOWNERS index aacdfd5a014c..947acc0410ca 100644 --- a/CODEOWNERS +++ b/CODEOWNERS @@ -29,6 +29,9 @@ # kv-asset-handler ownership /packages/kv-asset-handler/ @cloudflare/wrangler +# Workflows ownership +/packages/workflows-shared/ @cloudflare/workflows @cloudflare/wrangler + # Owners intentionally left blank on these shared directories / files # to avoid noisy review requests /.changeset/ diff --git a/fixtures/workflow/package.json b/fixtures/workflow/package.json index 9a66e5c30b01..c2d6a1a5d37b 100644 --- a/fixtures/workflow/package.json +++ b/fixtures/workflow/package.json @@ -3,10 +3,12 @@ "private": true, "scripts": { "deploy": "wrangler deploy", - "start": "wrangler dev --x-dev-env" + "start": "wrangler dev --x-dev-env", + "test:ci": "vitest" }, "devDependencies": { "@cloudflare/workers-types": "^4.20241022.0", + "undici": "^6.20.1", "wrangler": "workspace:*" }, "volta": { diff --git a/fixtures/workflow/src/index.ts b/fixtures/workflow/src/index.ts index 2326cf0b78d1..c83f28c43453 100644 --- a/fixtures/workflow/src/index.ts +++ b/fixtures/workflow/src/index.ts @@ -1,6 +1,6 @@ import { WorkerEntrypoint, - Workflow, + WorkflowEntrypoint, WorkflowEvent, WorkflowStep, } from "cloudflare:workers"; @@ -8,16 +8,18 @@ import { type Params = { name: string; }; -export class Demo extends Workflow<{}, Params> { - async run(events: Array>, step: WorkflowStep) { - const { timestamp, payload } = events[0]; + +export class Demo extends WorkflowEntrypoint<{}, Params> { + async run(event: WorkflowEvent, step: WorkflowStep) { + const { timestamp, payload } = event; + const result = await step.do("First step", async function () { return { output: "First step result", }; }); - await step.sleep("Wait", "1 minute"); + await step.sleep("Wait", "1 second"); const result2 = await step.do("Second step", async function () { return { @@ -25,26 +27,29 @@ export class Demo extends Workflow<{}, Params> { }; }); - return { - result, - result2, - timestamp, - payload, - }; + return [result, result2, timestamp, payload]; } } type Env = { - WORKFLOW: { - create: (id: string) => { - pause: () => {}; - }; - }; + WORKFLOW: Workflow; }; export default class extends WorkerEntrypoint { - async fetch() { - const handle = await this.env.WORKFLOW.create(crypto.randomUUID()); - await handle.pause(); - return new Response(); + async fetch(req: Request) { + const url = new URL(req.url); + const id = url.searchParams.get("workflowName"); + + if (url.pathname === "/favicon.ico") { + return new Response(null, { status: 404 }); + } + + let handle: WorkflowInstance; + if (url.pathname === "/create") { + handle = await this.env.WORKFLOW.create({ id }); + } else { + handle = await this.env.WORKFLOW.get(id); + } + + return Response.json(await handle.status()); } } diff --git a/fixtures/workflow/tests/index.test.ts b/fixtures/workflow/tests/index.test.ts new file mode 100644 index 000000000000..025580ca474e --- /dev/null +++ b/fixtures/workflow/tests/index.test.ts @@ -0,0 +1,78 @@ +import { resolve } from "path"; +import { fetch } from "undici"; +import { afterAll, beforeAll, describe, it, vi } from "vitest"; +import { runWranglerDev } from "../../shared/src/run-wrangler-long-lived"; + +describe("Workflows", () => { + let ip: string, + port: number, + stop: (() => Promise) | undefined, + getOutput: () => string; + + beforeAll(async () => { + ({ ip, port, stop, getOutput } = await runWranglerDev( + resolve(__dirname, ".."), + [ + "--port=0", + "--inspector-port=0", + "--upstream-protocol=https", + "--host=prod.example.org", + ] + )); + }); + + afterAll(async () => { + await stop?.(); + }); + + async function fetchJson(url: string) { + const response = await fetch(url, { + headers: { + "MF-Disable-Pretty-Error": "1", + }, + }); + const text = await response.text(); + + try { + return JSON.parse(text); + } catch (err) { + throw new Error(`Couldn't parse JSON:\n\n${text}`); + } + } + + it("creates a workflow", async ({ expect }) => { + await expect( + fetchJson(`http://${ip}:${port}/create?workflowName=test`) + ).resolves.toEqual({ + status: "running", + output: [], + }); + + await vi.waitFor( + async () => { + await expect( + fetchJson(`http://${ip}:${port}/status?workflowName=test`) + ).resolves.toEqual({ + status: "running", + output: [{ output: "First step result" }], + }); + }, + { timeout: 5000 } + ); + + await vi.waitFor( + async () => { + await expect( + fetchJson(`http://${ip}:${port}/status?workflowName=test`) + ).resolves.toEqual({ + status: "complete", + output: [ + { output: "First step result" }, + { output: "Second step result" }, + ], + }); + }, + { timeout: 5000 } + ); + }); +}); diff --git a/fixtures/workflow/tests/tsconfig.json b/fixtures/workflow/tests/tsconfig.json new file mode 100644 index 000000000000..d2ce7f144694 --- /dev/null +++ b/fixtures/workflow/tests/tsconfig.json @@ -0,0 +1,7 @@ +{ + "extends": "@cloudflare/workers-tsconfig/tsconfig.json", + "compilerOptions": { + "types": ["node"] + }, + "include": ["**/*.ts", "../../../node-types.d.ts"] +} diff --git a/fixtures/workflow/tsconfig.json b/fixtures/workflow/tsconfig.json index 2431bac6e945..856398634a5e 100644 --- a/fixtures/workflow/tsconfig.json +++ b/fixtures/workflow/tsconfig.json @@ -1,13 +1,13 @@ { "compilerOptions": { - "target": "es2021", - "lib": ["es2021"], - "module": "es2022", - "types": ["@cloudflare/workers-types/experimental"], + "target": "ES2020", + "module": "CommonJS", + "lib": ["ES2020"], + "types": ["@cloudflare/workers-types"], + "moduleResolution": "node", "noEmit": true, - "isolatedModules": true, - "forceConsistentCasingInFileNames": true, - "strict": true, "skipLibCheck": true - } + }, + "include": ["**/*.ts"], + "exclude": ["tests"] } diff --git a/fixtures/workflow/vitest.config.mts b/fixtures/workflow/vitest.config.mts new file mode 100644 index 000000000000..846cddc41995 --- /dev/null +++ b/fixtures/workflow/vitest.config.mts @@ -0,0 +1,9 @@ +import { defineProject, mergeConfig } from "vitest/config"; +import configShared from "../../vitest.shared"; + +export default mergeConfig( + configShared, + defineProject({ + test: {}, + }) +); diff --git a/fixtures/workflow/wrangler.toml b/fixtures/workflow/wrangler.toml index 6cf6d4593655..592fc9805420 100644 --- a/fixtures/workflow/wrangler.toml +++ b/fixtures/workflow/wrangler.toml @@ -1,7 +1,7 @@ #:schema node_modules/wrangler/config-schema.json name = "my-workflow-demo" main = "src/index.ts" -compatibility_date = "2024-10-11" +compatibility_date = "2024-10-22" [[workflows]] binding = "WORKFLOW" diff --git a/packages/miniflare/README.md b/packages/miniflare/README.md index 67726c34c58d..3a1ee8106c18 100644 --- a/packages/miniflare/README.md +++ b/packages/miniflare/README.md @@ -214,6 +214,20 @@ Options for an individual Worker/"nanoservice". All bindings are accessible on the global scope in service-worker format Workers, or via the 2nd `env` parameter in module format Workers. +### `interface WorkflowOptions` + +- `name: string` + + The name of the Workflow. + +- `className: string` + + The name of the class exported from the Worker that implements the `WorkflowEntrypoint`. + +- `scriptName?`: string + + The name of the script that includes the `WorkflowEntrypoint`. This is optional because it defaults to the current script if not set. + #### Core - `name?: string` @@ -585,6 +599,11 @@ parameter in module format Workers. - `assetOptions?: { html_handling?: HTMLHandlingOptions, not_found_handling?: NotFoundHandlingOptions}` Configuration for file-based asset routing - see [docs](https://developers.cloudflare.com/workers/static-assets/routing/#routing-configuration) for options +#### Workflows + +- `workflows?: WorkflowOptions[]` + Configuration for one or more Workflows in your project. + #### Analytics Engine, Sending Email, Vectorize and Workers for Platforms _Not yet supported_ @@ -725,6 +744,10 @@ Options shared between all Workers/"nanoservices". Where to persist data stored in D1 databases. See docs for `Persistence`. +- `workflowsPersist?: Persistence` + +Where to persist data stored in Workflows. See docs for `Persistence`. + #### Analytics Engine, Browser Rendering, Sending Email, Vectorize, Workers AI and Workers for Platforms _Not yet supported_ diff --git a/packages/miniflare/package.json b/packages/miniflare/package.json index 057cf328946e..61fc4442575e 100644 --- a/packages/miniflare/package.json +++ b/packages/miniflare/package.json @@ -60,6 +60,7 @@ "@cloudflare/kv-asset-handler": "workspace:*", "@cloudflare/workers-shared": "workspace:*", "@cloudflare/workers-types": "^4.20241022.0", + "@cloudflare/workflows-shared": "workspace:*", "@microsoft/api-extractor": "^7.47.0", "@types/debug": "^4.1.7", "@types/estree": "^1.0.0", @@ -85,6 +86,7 @@ "eslint-plugin-import": "2.26.x", "eslint-plugin-prettier": "^5.0.1", "expect-type": "^0.15.0", + "heap-js": "^2.5.0", "http-cache-semantics": "^4.1.0", "kleur": "^4.1.5", "mime": "^3.0.0", diff --git a/packages/miniflare/src/index.ts b/packages/miniflare/src/index.ts index ffd2d52a8121..2c5987a4c216 100644 --- a/packages/miniflare/src/index.ts +++ b/packages/miniflare/src/index.ts @@ -1131,9 +1131,10 @@ export class Miniflare { innerBindings: Worker_Binding[]; }[] = []; - // This will be the user worker or the vitest pool worker wrapping the user worker - // The asset plugin needs this so that it can set the binding between the router worker and the user worker if (this.#workerOpts[0].assets.assets) { + // This will be the UserWorker, or the vitest pool worker wrapping the UserWorker + // The asset plugin needs this so that it can set the binding between the RouterWorker and the UserWorker + // TODO: apply this to ever this.#workerOpts, not just the first (i.e this.#workerOpts[0]) this.#workerOpts[0].assets.assets.workerName = this.#workerOpts[0].core.name; } @@ -1144,6 +1145,14 @@ export class Miniflare { const workerName = workerOpts.core.name ?? ""; const isModulesWorker = Boolean(workerOpts.core.modules); + if (workerOpts.workflows.workflows) { + for (const workflow of Object.values(workerOpts.workflows.workflows)) { + // This will be the UserWorker, or the vitest pool worker wrapping the UserWorker + // The workflows plugin needs this so that it can set the binding between the Engine and the UserWorker + workflow.scriptName ??= workerOpts.core.name; + } + } + // Collect all bindings from this worker const workerBindings: Worker_Binding[] = []; allWorkerBindings.set(workerName, workerBindings); diff --git a/packages/miniflare/src/plugins/index.ts b/packages/miniflare/src/plugins/index.ts index 7175d6ebe9b0..05a4d6a3c63f 100644 --- a/packages/miniflare/src/plugins/index.ts +++ b/packages/miniflare/src/plugins/index.ts @@ -11,6 +11,7 @@ import { KV_PLUGIN, KV_PLUGIN_NAME } from "./kv"; import { QUEUES_PLUGIN, QUEUES_PLUGIN_NAME } from "./queues"; import { R2_PLUGIN, R2_PLUGIN_NAME } from "./r2"; import { RATELIMIT_PLUGIN, RATELIMIT_PLUGIN_NAME } from "./ratelimit"; +import { WORKFLOWS_PLUGIN, WORKFLOWS_PLUGIN_NAME } from "./workflows"; export const PLUGINS = { [CORE_PLUGIN_NAME]: CORE_PLUGIN, @@ -23,6 +24,7 @@ export const PLUGINS = { [HYPERDRIVE_PLUGIN_NAME]: HYPERDRIVE_PLUGIN, [RATELIMIT_PLUGIN_NAME]: RATELIMIT_PLUGIN, [ASSETS_PLUGIN_NAME]: ASSETS_PLUGIN, + [WORKFLOWS_PLUGIN_NAME]: WORKFLOWS_PLUGIN, }; export type Plugins = typeof PLUGINS; @@ -70,13 +72,15 @@ export type WorkerOptions = z.input & z.input & z.input & z.input & - z.input; + z.input & + z.input; export type SharedOptions = z.input & z.input & z.input & z.input & z.input & - z.input; + z.input & + z.input; export const PLUGIN_ENTRIES = Object.entries(PLUGINS) as [ keyof Plugins, @@ -124,3 +128,4 @@ export * from "./hyperdrive"; export * from "./ratelimit"; export * from "./assets"; export * from "./assets/schema"; +export * from "./workflows"; diff --git a/packages/miniflare/src/plugins/workflows/index.ts b/packages/miniflare/src/plugins/workflows/index.ts new file mode 100644 index 000000000000..edd08d2e0be1 --- /dev/null +++ b/packages/miniflare/src/plugins/workflows/index.ts @@ -0,0 +1,125 @@ +import fs from "fs/promises"; +import SCRIPT_WORKFLOWS_BINDING from "worker:workflows/binding"; +import { z } from "zod"; +import { Service } from "../../runtime"; +import { getUserServiceName } from "../core"; +import { + getPersistPath, + PersistenceSchema, + Plugin, + ProxyNodeBinding, +} from "../shared"; + +export const WorkflowsOptionsSchema = z.object({ + workflows: z + .record( + z.object({ + name: z.string(), + className: z.string(), + scriptName: z.string().optional(), + }) + ) + .optional(), +}); +export const WorkflowsSharedOptionsSchema = z.object({ + workflowsPersist: PersistenceSchema, +}); + +export const WORKFLOWS_PLUGIN_NAME = "workflows"; +export const WORKFLOWS_STORAGE_SERVICE_NAME = `${WORKFLOWS_PLUGIN_NAME}:storage`; + +export const WORKFLOWS_PLUGIN: Plugin< + typeof WorkflowsOptionsSchema, + typeof WorkflowsSharedOptionsSchema +> = { + options: WorkflowsOptionsSchema, + sharedOptions: WorkflowsSharedOptionsSchema, + async getBindings(options: z.infer) { + return Object.entries(options.workflows ?? {}).map( + ([bindingName, workflow]) => ({ + name: bindingName, + service: { + name: `${WORKFLOWS_PLUGIN_NAME}:${workflow.name}`, + entrypoint: "WorkflowBinding", + }, + }) + ); + }, + + async getNodeBindings(options) { + return Object.fromEntries( + Object.keys(options.workflows ?? {}).map((bindingName) => [ + bindingName, + new ProxyNodeBinding(), + ]) + ); + }, + + async getServices({ options, sharedOptions, tmpPath }) { + const persistPath = getPersistPath( + WORKFLOWS_PLUGIN_NAME, + tmpPath, + sharedOptions.workflowsPersist + ); + await fs.mkdir(persistPath, { recursive: true }); + const storageService: Service = { + name: WORKFLOWS_STORAGE_SERVICE_NAME, + disk: { path: persistPath, writable: true }, + }; + + // this creates one miniflare service per workflow that the user's script has. we should dedupe engine definition later + const services = Object.entries(options.workflows ?? {}).map( + ([_bindingName, workflow]) => { + const uniqueKey = `miniflare-workflows`; + + const workflowsBinding: Service = { + name: `${WORKFLOWS_PLUGIN_NAME}:${workflow.name}`, + worker: { + compatibilityDate: "2024-10-22", + modules: [ + { + name: "workflows.mjs", + esModule: SCRIPT_WORKFLOWS_BINDING(), + }, + ], + durableObjectNamespaces: [ + { + className: "Engine", + enableSql: true, + uniqueKey, + preventEviction: true, + }, + ], + // this might conflict between workflows + durableObjectStorage: { localDisk: WORKFLOWS_STORAGE_SERVICE_NAME }, + bindings: [ + { + name: "ENGINE", + durableObjectNamespace: { className: "Engine" }, + }, + { + name: "USER_WORKFLOW", + service: { + name: getUserServiceName(workflow.scriptName), + entrypoint: workflow.className, + }, + }, + ], + }, + }; + + return workflowsBinding; + } + ); + + if (services.length === 0) { + return []; + } + + return [storageService, ...services]; + }, + + getPersistPath({ workflowsPersist }, tmpPath) { + return getPersistPath(WORKFLOWS_PLUGIN_NAME, tmpPath, workflowsPersist); + }, +}; diff --git a/packages/miniflare/src/workers/workflows/binding.worker.ts b/packages/miniflare/src/workers/workflows/binding.worker.ts new file mode 100644 index 000000000000..d7d061cedfbe --- /dev/null +++ b/packages/miniflare/src/workers/workflows/binding.worker.ts @@ -0,0 +1,6 @@ +// Simply re-export both entrypoints so that it gets compiled into the Miniflare code base. +// This allows us to have it as a devDependency only. +export { + WorkflowBinding, + Engine, +} from "@cloudflare/workflows-shared/src/local-binding-worker"; diff --git a/packages/workflows-shared/.eslintrc.js b/packages/workflows-shared/.eslintrc.js new file mode 100644 index 000000000000..a56877269a90 --- /dev/null +++ b/packages/workflows-shared/.eslintrc.js @@ -0,0 +1,5 @@ +module.exports = { + root: true, + extends: ["@cloudflare/eslint-config-worker"], + ignorePatterns: ["dist"], +}; diff --git a/packages/workflows-shared/README.md b/packages/workflows-shared/README.md new file mode 100644 index 000000000000..a70d1b59b613 --- /dev/null +++ b/packages/workflows-shared/README.md @@ -0,0 +1,7 @@ +# `@cloudflare/workflows-shared` + +This is a package that is used at Cloudflare to power some internal features of [Cloudflare Workflows](https://developers.cloudflare.com/workflows/), as well as their open-source equivalents here in workers-sdk and Wrangler. + +> [!NOTE] +> Since code in this package is used by the Workflows infrastructure, it is important that PRs are given careful review with regards to how they could cause a failure in production. +> Ideally, there should be comprehensive tests for changes being made to give extra confidence about the behavior. diff --git a/packages/workflows-shared/package.json b/packages/workflows-shared/package.json new file mode 100644 index 000000000000..2edf8721affe --- /dev/null +++ b/packages/workflows-shared/package.json @@ -0,0 +1,61 @@ +{ + "name": "@cloudflare/workflows-shared", + "version": "0.1.0", + "private": true, + "description": "Package that is used at Cloudflare to power some internal features of Cloudflare Workflows.", + "keywords": [ + "cloudflare", + "workflows", + "cloudflare workflows" + ], + "homepage": "https://github.com/cloudflare/workers-sdk/tree/main/packages/workflows-shared#readme", + "bugs": { + "url": "https://github.com/cloudflare/workers-sdk/issues" + }, + "repository": { + "type": "git", + "url": "https://github.com/cloudflare/workers-sdk.git", + "directory": "packages/workflows-shared" + }, + "license": "MIT OR Apache-2.0", + "author": "wrangler@cloudflare.com", + "types": "./dist", + "files": [ + "dist" + ], + "scripts": { + "build": "esbuild ./src/local-binding-worker.ts --format=esm --bundle --outfile=dist/local-binding-worker.mjs --sourcemap=external --external:cloudflare:*", + "check:lint": "eslint . --max-warnings=0", + "check:type": "tsc", + "clean": "rimraf dist", + "deploy": "echo 'no deploy'", + "dev": "pnpm run bundle:local-binding --watch", + "test:ci": "echo 'no tests'" + }, + "dependencies": { + "heap-js": "^2.5.0", + "itty-time": "^1.0.6", + "mime": "^3.0.0", + "zod": "^3.22.3" + }, + "devDependencies": { + "@cloudflare/eslint-config-worker": "workspace:*", + "@cloudflare/workers-tsconfig": "workspace:*", + "@cloudflare/workers-types": "^4.20241022.0", + "@types/mime": "^3.0.4", + "esbuild": "0.17.19", + "rimraf": "^6.0.1", + "typescript": "^5.5.4", + "vitest": "catalog:default" + }, + "engines": { + "node": ">=16.7.0" + }, + "volta": { + "extends": "../../package.json" + }, + "workers-sdk": { + "prerelease": true, + "deploy": true + } +} diff --git a/packages/workflows-shared/src/binding.ts b/packages/workflows-shared/src/binding.ts new file mode 100644 index 000000000000..d8d1f25a1f0c --- /dev/null +++ b/packages/workflows-shared/src/binding.ts @@ -0,0 +1,86 @@ +import { RpcTarget, WorkerEntrypoint } from "cloudflare:workers"; +import { InstanceEvent, instanceStatusName } from "./instance"; +import type { + DatabaseInstance, + DatabaseVersion, + DatabaseWorkflow, + Engine, +} from "./engine"; + +type Env = { + ENGINE: DurableObjectNamespace; +}; + +// this.env.WORKFLOW is WorkflowBinding +export class WorkflowBinding extends WorkerEntrypoint implements Workflow { + public async create({ + id, + params, + }: WorkflowInstanceCreateOptions): Promise { + if (!id) { + id = crypto.randomUUID(); + } + const stubId = this.env.ENGINE.idFromName(id); + const stub = this.env.ENGINE.get(stubId); + + void stub.init( + 0, // accountId: number, + {} as DatabaseWorkflow, // workflow: DatabaseWorkflow, + {} as DatabaseVersion, // version: DatabaseVersion, + { id } as DatabaseInstance, // instance: DatabaseInstance, + { + timestamp: new Date(), + payload: params as Readonly, + } + ); + + return new WorkflowHandle(id, stub); + } + + public async get(id: string): Promise { + const stubId = this.env.ENGINE.idFromName(id); + const stub = this.env.ENGINE.get(stubId); + return new WorkflowHandle(id, stub); + } +} + +export class WorkflowHandle extends RpcTarget implements WorkflowInstance { + constructor( + public id: string, + private stub: DurableObjectStub + ) { + super(); + } + + public async pause(): Promise { + // Look for instance in namespace + // Get engine stub + // Call a few functions on stub + throw new Error("Not implemented yet"); + } + + public async resume(): Promise { + throw new Error("Not implemented yet"); + } + + public async terminate(): Promise { + throw new Error("Not implemented yet"); + } + + public async restart(): Promise { + throw new Error("Not implemented yet"); + } + + public async status(): Promise { + const status = await this.stub.getStatus(0, this.id); + const { logs } = await this.stub.readLogs(); + // @ts-expect-error TODO: Fix this + const filteredLogs = logs.filter( + // @ts-expect-error TODO: Fix this + (log) => log.event === InstanceEvent.STEP_SUCCESS + ); + // @ts-expect-error TODO: Fix this + const output = filteredLogs.map((log) => log.metadata.result); + return { status: instanceStatusName(status), output }; // output, error + } +} diff --git a/packages/workflows-shared/src/context.ts b/packages/workflows-shared/src/context.ts new file mode 100644 index 000000000000..ae922f21837f --- /dev/null +++ b/packages/workflows-shared/src/context.ts @@ -0,0 +1,537 @@ +import { RpcTarget } from "cloudflare:workers"; +import { ms } from "itty-time"; +import { INSTANCE_METADATA, InstanceEvent, InstanceStatus } from "./instance"; +import { computeHash } from "./lib/cache"; +import { + WorkflowFatalError, + WorkflowInternalError, + WorkflowTimeoutError, +} from "./lib/errors"; +import { calcRetryDuration } from "./lib/retries"; +import { MAX_STEP_NAME_LENGTH, validateStepName } from "./lib/validators"; +import type { Engine } from "./engine"; +import type { InstanceMetadata } from "./instance"; +import type { + WorkflowSleepDuration, + WorkflowStepConfig, +} from "cloudflare:workers"; + +export type ResolvedStepConfig = Required; + +const defaultConfig: Required = { + retries: { + limit: 5, + delay: 1000, + backoff: "constant", + }, + timeout: "15 minutes", +}; + +export interface UserErrorField { + isUserError?: boolean; +} + +export type StepState = { + attemptedCount: number; +}; + +export class Context extends RpcTarget { + #engine: Engine; + #state: DurableObjectState; + + #counters: Map = new Map(); + + constructor(engine: Engine, state: DurableObjectState) { + super(); + this.#engine = engine; + this.#state = state; + } + + #getCount(name: string): number { + let val = this.#counters.get(name) ?? 0; + // 1-indexed, as we're increasing the value before write + val++; + this.#counters.set(name, val); + + return val; + } + + do(name: string, callback: () => Promise): Promise; + do( + name: string, + config: WorkflowStepConfig, + callback: () => Promise + ): Promise; + + async do( + name: string, + configOrCallback: WorkflowStepConfig | (() => Promise), + callback?: () => Promise + ): Promise { + let closure, stepConfig; + // If a user passes in a config, we'd like it to be the second arg so the callback is always last + if (callback) { + closure = callback; + stepConfig = configOrCallback as WorkflowStepConfig; + } else { + closure = configOrCallback as () => Promise; + stepConfig = {}; + } + + if (!validateStepName(name)) { + // NOTE(lduarte): marking errors as user error allows the observability layer to avoid leaking + // user errors to sentry while making everything more observable. `isUserError` is not serialized + // into userland code due to how workerd serialzises errors over RPC - we also set it as undefined + // in the obs layer in case changes to workerd happen + const error = new WorkflowFatalError( + `Step name "${name}" exceeds max length (${MAX_STEP_NAME_LENGTH} chars) or invalid characters found` + ) as Error & UserErrorField; + error.isUserError = true; + throw error; + } + + let config: ResolvedStepConfig = { + ...defaultConfig, + ...stepConfig, + retries: { + ...defaultConfig.retries, + ...stepConfig.retries, + }, + }; + + const hash = await computeHash(name); + const count = this.#getCount("run-" + name); + const cacheKey = `${hash}-${count}`; + + const valueKey = `${cacheKey}-value`; + const configKey = `${cacheKey}-config`; + const stepNameWithCounter = `${name}-${count}`; + const stepStateKey = `${cacheKey}-metadata`; + + const maybeMap = await this.#state.storage.get([valueKey, configKey]); + + // Check cache + const maybeResult = maybeMap.get(valueKey); + + if (maybeResult) { + // console.log(`Cache hit for ${cacheKey}`); + return (maybeResult as { value: T }).value; + } + + // Persist initial config because user can pass in dynamic config + if (!maybeMap.has(configKey)) { + await this.#state.storage.put(configKey, config); + } else { + config = maybeMap.get(configKey) as ResolvedStepConfig; + } + + const attemptLogs = this.#engine + .readLogsFromStep(cacheKey) + .filter((val) => + [ + InstanceEvent.ATTEMPT_SUCCESS, + InstanceEvent.ATTEMPT_FAILURE, + InstanceEvent.ATTEMPT_START, + ].includes(val.event) + ); + + // this means that the the engine died while executing this step - we can mark the latest attempt as failed + if ( + attemptLogs.length > 0 && + attemptLogs.at(-1)?.event === InstanceEvent.ATTEMPT_START + ) { + // TODO: We should get this from SQL + const stepState = ((await this.#state.storage.get( + stepStateKey + )) as StepState) ?? { + attemptedCount: 1, + }; + + const priorityQueueHash = `${cacheKey}-${stepState.attemptedCount}`; + + // @ts-expect-error priorityQueue is initiated in init + const timeoutEntryPQ = this.#engine.priorityQueue.getFirst( + (a) => a.hash === priorityQueueHash && a.type === "timeout" + ); + // if there's a timeout on the PQ we pop it, because we wont need it + if (timeoutEntryPQ !== undefined) { + // @ts-expect-error priorityQueue is initiated in init + this.#engine.priorityQueue.remove(timeoutEntryPQ); + } + this.#engine.writeLog( + InstanceEvent.ATTEMPT_FAILURE, + cacheKey, + stepNameWithCounter, + { + attempt: stepState.attemptedCount, + error: { + name: "WorkflowInternalError", + message: `Attempt failed due to internal workflows error`, + }, + } + ); + + await this.#state.storage.put(stepStateKey, stepState); + } + + const doWrapper = async ( + doWrapperClosure: () => Promise + ): Promise => { + const stepState = ((await this.#state.storage.get( + stepStateKey + )) as StepState) ?? { + attemptedCount: 0, + }; + await this.#engine.timeoutHandler.acquire(this.#engine); + + if (stepState.attemptedCount == 0) { + this.#engine.writeLog( + InstanceEvent.STEP_START, + cacheKey, + stepNameWithCounter, + { + config, + } + ); + } else { + // in case the engine dies while retrying and wakes up before the retry period + const priorityQueueHash = `${cacheKey}-${stepState.attemptedCount}`; + // @ts-expect-error priorityQueue is initiated in init + const retryEntryPQ = this.#engine.priorityQueue.getFirst( + (a) => a.hash === priorityQueueHash && a.type === "retry" + ); + // complete sleep if it didn't finish for some reason + if (retryEntryPQ !== undefined) { + await this.#engine.timeoutHandler.release(this.#engine); + await scheduler.wait(retryEntryPQ.targetTimestamp - Date.now()); + await this.#engine.timeoutHandler.acquire(this.#engine); + // @ts-expect-error priorityQueue is initiated in init + this.#engine.priorityQueue.remove({ + hash: priorityQueueHash, + type: "retry", + }); + } + } + + let result; + + const instanceMetadata = + await this.#state.storage.get(INSTANCE_METADATA); + if (!instanceMetadata) { + throw new Error("instanceMetadata is undefined"); + } + const { accountId, instance } = instanceMetadata; + + try { + const timeoutPromise = async () => { + const priorityQueueHash = `${cacheKey}-${stepState.attemptedCount}`; + const timeout = ms(config.timeout); + // @ts-expect-error priorityQueue is initiated in init + await this.#engine.priorityQueue.add({ + hash: priorityQueueHash, + targetTimestamp: Date.now() + timeout, + type: "timeout", + }); + await scheduler.wait(timeout); + // if we reach here, means that we can try to delete the timeout from the PQ + // because we managed to wait in the same lifetime + // @ts-expect-error priorityQueue is initiated in init + await this.#engine.priorityQueue.remove({ + hash: priorityQueueHash, + type: "timeout", + }); + throw new WorkflowTimeoutError( + `Execution timed out after ${timeout}ms` + ); + }; + + this.#engine.writeLog( + InstanceEvent.ATTEMPT_START, + cacheKey, + stepNameWithCounter, + { + attempt: stepState.attemptedCount + 1, + } + ); + stepState.attemptedCount++; + await this.#state.storage.put(stepStateKey, stepState); + const priorityQueueHash = `${cacheKey}-${stepState.attemptedCount}`; + + result = await Promise.race([doWrapperClosure(), timeoutPromise()]); + + // if we reach here, means that the clouse ran successfully and we can remove the timeout from the PQ + // @ts-expect-error priorityQueue is initiated in init + await this.#engine.priorityQueue.remove({ + hash: priorityQueueHash, + type: "timeout", + }); + + // We store the value of `output` in an object with a `value` property. This allows us to store `undefined`, + // in the case that it's returned from the user's code. This is because DO storage will error if you try to + // store undefined directly. + try { + await this.#state.storage.put(valueKey, { value: result }); + } catch (e) { + // something that cannot be written to storage + if (e instanceof Error && e.name === "DataCloneError") { + this.#engine.writeLog( + InstanceEvent.ATTEMPT_FAILURE, + cacheKey, + stepNameWithCounter, + { + attempt: stepState.attemptedCount, + error: new WorkflowFatalError( + `Value returned from step "${name}" is not serialisable` + ), + } + ); + this.#engine.writeLog( + InstanceEvent.STEP_FAILURE, + cacheKey, + stepNameWithCounter, + {} + ); + this.#engine.writeLog(InstanceEvent.WORKFLOW_FAILURE, null, null, { + error: new WorkflowFatalError( + `The execution of the Workflow instance was terminated, as the step "${name}" returned a value which is not serialisable` + ), + }); + + await this.#engine.setStatus( + accountId, + instance.id, + InstanceStatus.Errored + ); + await this.#engine.timeoutHandler.release(this.#engine); + await this.#engine.abort("Value is not serialisable"); + } else { + // TODO (WOR-77): Send this to Sentry + throw new WorkflowInternalError( + `Storage failure for ${valueKey}: ${e} ` + ); + } + return; + } + + this.#engine.writeLog( + InstanceEvent.ATTEMPT_SUCCESS, + cacheKey, + stepNameWithCounter, + { + attempt: stepState.attemptedCount, + } + ); + } catch (e) { + const error = e as Error; + // if we reach here, means that the clouse ran but errored out and we can remove the timeout from the PQ + // @ts-expect-error priorityQueue is initiated in init + this.#engine.priorityQueue.remove({ + hash: `${cacheKey}-${stepState.attemptedCount}`, + type: "timeout", + }); + + if (e instanceof Error && error.name === "NonRetryableError") { + this.#engine.writeLog( + InstanceEvent.ATTEMPT_FAILURE, + cacheKey, + stepNameWithCounter, + { + attempt: stepState.attemptedCount, + error: new WorkflowFatalError( + `Step threw a NonRetryableError with message "${e.message}"` + ), + } + ); + this.#engine.writeLog( + InstanceEvent.STEP_FAILURE, + cacheKey, + stepNameWithCounter, + {} + ); + this.#engine.writeLog(InstanceEvent.WORKFLOW_FAILURE, null, null, { + error: new WorkflowFatalError( + `The execution of the Workflow instance was terminated, as the step "${name}" threw a NonRetryableError` + ), + }); + + await this.#engine.setStatus( + accountId, + instance.id, + InstanceStatus.Errored + ); + await this.#engine.timeoutHandler.release(this.#engine); + return this.#engine.abort(`Step "${name}" threw a NonRetryableError`); + } + + this.#engine.writeLog( + InstanceEvent.ATTEMPT_FAILURE, + cacheKey, + stepNameWithCounter, + { + attempt: stepState.attemptedCount, + error: { + name: error.name, + message: error.message, + // TODO (WOR-79): Stacks are all incorrect over RPC and need work + // stack: error.stack, + }, + } + ); + + await this.#state.storage.put(stepStateKey, stepState); + + if (stepState.attemptedCount <= config.retries.limit) { + // TODO (WOR-71): Think through if every Error should transition + const durationMs = calcRetryDuration(config, stepState); + + const priorityQueueHash = `${cacheKey}-${stepState.attemptedCount}`; + // @ts-expect-error priorityQueue is initiated in init + await this.#engine.priorityQueue.add({ + hash: priorityQueueHash, + targetTimestamp: Date.now() + durationMs, + type: "retry", + }); + await this.#engine.timeoutHandler.release(this.#engine); + // this may never finish because of the grace period - but waker will take of it + await scheduler.wait(durationMs); + + // if it ever reaches here, we can try to remove it from the priority queue since it's no longer useful + // @ts-expect-error priorityQueue is initiated in init + this.#engine.priorityQueue.remove({ + hash: priorityQueueHash, + type: "retry", + }); + + return doWrapper(doWrapperClosure); + } else { + await this.#engine.timeoutHandler.release(this.#engine); + this.#engine.writeLog( + InstanceEvent.STEP_FAILURE, + cacheKey, + stepNameWithCounter, + {} + ); + this.#engine.writeLog( + InstanceEvent.WORKFLOW_FAILURE, + cacheKey, + null, + {} + ); + await this.#engine.setStatus( + accountId, + instance.id, + InstanceStatus.Errored + ); + throw error; + } + } + + this.#engine.writeLog( + InstanceEvent.STEP_SUCCESS, + cacheKey, + stepNameWithCounter, + { + // TODO (WOR-86): Add limits, figure out serialization + result, + } + ); + await this.#engine.timeoutHandler.release(this.#engine); + return result; + }; + + return doWrapper(closure); + } + + async sleep(name: string, duration: WorkflowSleepDuration): Promise { + if (typeof duration == "string") { + duration = ms(duration); + } + + const hash = await computeHash(name + duration.toString()); + const count = this.#getCount("sleep-" + name + duration.toString()); + const cacheKey = `${hash}-${count}`; + const sleepNameWithCounter = `${name}-${count}`; + + const sleepKey = `${cacheKey}-value`; + const sleepLogWrittenKey = `${cacheKey}-log-written`; + const maybeResult = await this.#state.storage.get(sleepKey); + + if (maybeResult != undefined) { + // @ts-expect-error priorityQueue is initiated in init + const entryPQ = this.#engine.priorityQueue.getFirst( + (a) => a.hash === cacheKey && a.type === "sleep" + ); + // in case the engine dies while sleeping and wakes up before the retry period + if (entryPQ !== undefined) { + await scheduler.wait(entryPQ.targetTimestamp - Date.now()); + // @ts-expect-error priorityQueue is initiated in init + this.#engine.priorityQueue.remove({ hash: cacheKey, type: "sleep" }); + } + const shouldWriteLog = + (await this.#state.storage.get(sleepLogWrittenKey)) == undefined; + if (shouldWriteLog) { + this.#engine.writeLog( + InstanceEvent.SLEEP_COMPLETE, + cacheKey, + sleepNameWithCounter, + {} + ); + await this.#state.storage.put(sleepLogWrittenKey, true); + } + return; + } + + this.#engine.writeLog( + InstanceEvent.SLEEP_START, + cacheKey, + sleepNameWithCounter, + { + durationMs: duration, + } + ); + const instanceMetadata = + await this.#state.storage.get(INSTANCE_METADATA); + if (!instanceMetadata) { + throw new Error("instanceMetadata is undefined"); + } + + // TODO(lduarte): not sure of this order of operations + await this.#state.storage.put(sleepKey, true); // Any value will do for cache hit + + // @ts-expect-error priorityQueue is initiated in init + await this.#engine.priorityQueue.add({ + hash: cacheKey, + targetTimestamp: Date.now() + duration, + type: "sleep", + }); + // this probably will never finish except if sleep is less than the grace period + await scheduler.wait(duration); + + this.#engine.writeLog( + InstanceEvent.SLEEP_COMPLETE, + cacheKey, + sleepNameWithCounter, + {} + ); + await this.#state.storage.put(sleepLogWrittenKey, true); + + // @ts-expect-error priorityQueue is initiated in init + this.#engine.priorityQueue.remove({ hash: cacheKey, type: "sleep" }); + } + + async sleepUntil(name: string, timestamp: Date | number): Promise { + if (timestamp instanceof Date) { + timestamp = timestamp.valueOf(); + } + + const now = Date.now(); + // timestamp needs to be in the future, throw if not + if (timestamp < now) { + throw new Error( + "You can't sleep until a time in the past, time-traveler" + ); + } + + return this.sleep(name, timestamp - now); + } +} diff --git a/packages/workflows-shared/src/engine.ts b/packages/workflows-shared/src/engine.ts new file mode 100644 index 000000000000..a72f3f5fe68e --- /dev/null +++ b/packages/workflows-shared/src/engine.ts @@ -0,0 +1,256 @@ +import { DurableObject } from "cloudflare:workers"; +import { Context } from "./context"; +import { + INSTANCE_METADATA, + InstanceEvent, + InstanceStatus, + InstanceTrigger, +} from "./instance"; +import { + ENGINE_TIMEOUT, + GracePeriodSemaphore, + startGracePeriod, +} from "./lib/gracePeriodSemaphore"; +import { TimePriorityQueue } from "./lib/timePriorityQueue"; +import type { + InstanceLogsResponse, + InstanceMetadata, + RawInstanceLog, +} from "./instance"; +import type { WorkflowEntrypoint, WorkflowEvent } from "cloudflare:workers"; + +export interface Env { + USER_WORKFLOW: WorkflowEntrypoint; +} + +export type DatabaseWorkflow = { + name: string; + id: string; + created_on: string; + modified_on: string; + script_name: string; + class_name: string | null; + triggered_on: string | null; +}; + +export type DatabaseVersion = { + id: string; + class_name: string; + created_on: string; + modified_on: string; + workflow_id: string; + mutable_pipeline_id: string; +}; + +export type DatabaseInstance = { + id: string; + created_on: string; + modified_on: string; + workflow_id: string; + version_id: string; + status: InstanceStatus; + started_on: string | null; + ended_on: string | null; +}; + +export class Engine extends DurableObject { + logs: Array = []; + status: InstanceStatus = InstanceStatus.Queued; + + isRunning: boolean = false; + accountId: number | undefined; + instanceId: string | undefined; + workflowName: string | undefined; + timeoutHandler: GracePeriodSemaphore; + priorityQueue: TimePriorityQueue | undefined; + + constructor(state: DurableObjectState, env: Env) { + super(state, env); + + void this.ctx.blockConcurrencyWhile(async () => { + this.ctx.storage.transactionSync(() => { + this.ctx.storage.sql.exec(` + CREATE TABLE IF NOT EXISTS priority_queue ( + id INTEGER PRIMARY KEY NOT NULL, + created_on TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + target_timestamp INTEGER NOT NULL, + action INTEGER NOT NULL, -- should only be 0 or 1 (1 for added, 0 for deleted), + entryType INTEGER NOT NULL, + hash TEXT NOT NULL, + CHECK (action IN (0, 1)), -- guararentee that action can only be 0 or 1 + UNIQUE (action, entryType, hash) + ) + `); + }); + }); + + this.timeoutHandler = new GracePeriodSemaphore( + startGracePeriod, + ENGINE_TIMEOUT + ); + } + + writeLog( + event: InstanceEvent, + group: string | null, + target: string | null = null, + metadata: Record + ) { + this.logs.push({ + event, + group, + target, + metadata, + }); + } + + readLogsFromStep(_cacheKey: string): RawInstanceLog[] { + return []; + } + + readLogs(): InstanceLogsResponse { + return { + // @ts-expect-error TODO: Fix this + logs: this.logs, + }; + } + + async getStatus( + _accountId: number, + _instanceId: string + ): Promise { + return this.status; + } + + async setStatus( + accountId: number, + instanceId: string, + status: InstanceStatus + ): Promise { + this.status = status; + } + + async abort(_reason: string) { + // TODO: Maybe don't actually kill but instead check a flag and return early if true + } + + async userTriggeredTerminate() {} + + async init( + accountId: number, + workflow: DatabaseWorkflow, + version: DatabaseVersion, + instance: DatabaseInstance, + event: WorkflowEvent + ) { + if (this.priorityQueue === undefined) { + this.priorityQueue = new TimePriorityQueue( + this.ctx, + // this.env, + { + accountId, + workflow, + version, + instance, + event, + } + ); + } + this.priorityQueue.popPastEntries(); + await this.priorityQueue.handleNextAlarm(); + + if (this.isRunning) { + return; + } + + // We are not running and are possibly starting a new lifetime + this.accountId = accountId; + this.instanceId = instance.id; + this.workflowName = workflow.name; + + const status = await this.getStatus(accountId, instance.id); + if ( + [ + InstanceStatus.Errored, // TODO (WOR-85): Remove this once upgrade story is done + InstanceStatus.Terminated, + InstanceStatus.Complete, + ].includes(status) + ) { + return; + } + + if ((await this.ctx.storage.get(INSTANCE_METADATA)) == undefined) { + const instanceMetadata: InstanceMetadata = { + accountId, + workflow, + version, + instance, + event, + }; + await this.ctx.storage.put(INSTANCE_METADATA, instanceMetadata); + + // TODO (WOR-78): We currently don't have a queue mechanism + // WORKFLOW_QUEUED should happen before engine is spun up + this.writeLog(InstanceEvent.WORKFLOW_QUEUED, null, null, { + params: event.payload, + versionId: version.id, + trigger: { + source: InstanceTrigger.API, + }, + }); + this.writeLog(InstanceEvent.WORKFLOW_START, null, null, {}); + } + + const stubStep = new Context(this, this.ctx); + + const workflowRunningHandler = async () => { + await this.ctx.storage.transaction(async () => { + // manually start the grace period + // startGracePeriod(this, this.timeoutHandler.timeoutMs); + await this.setStatus(accountId, instance.id, InstanceStatus.Running); + }); + }; + this.isRunning = true; + void workflowRunningHandler(); + try { + const target = this.env.USER_WORKFLOW; + const result = await target.run(event, stubStep); + this.writeLog(InstanceEvent.WORKFLOW_SUCCESS, null, null, { + result, + }); + // NOTE(lduarte): we want to run this in a transaction to guarentee ordering with running setstatus call + // in case that it returns immediately + await this.ctx.storage.transaction(async () => { + await this.setStatus(accountId, instance.id, InstanceStatus.Complete); + }); + this.isRunning = false; + } catch (err) { + let error; + if (err instanceof Error) { + error = { + message: err.message, + name: err.name, + }; + } else { + error = { + name: "Error", + message: err, + }; + } + + this.writeLog(InstanceEvent.WORKFLOW_FAILURE, null, null, { + error, + }); + // NOTE(lduarte): we want to run this in a transaction to guarentee ordering with running setstatus call + // in case that it throws immediately + await this.ctx.storage.transaction(async () => { + await this.setStatus(accountId, instance.id, InstanceStatus.Errored); + }); + this.isRunning = false; + } + + return { + id: instance.id, + }; + } +} diff --git a/packages/workflows-shared/src/index.ts b/packages/workflows-shared/src/index.ts new file mode 100644 index 000000000000..2bd6abf17a36 --- /dev/null +++ b/packages/workflows-shared/src/index.ts @@ -0,0 +1,4 @@ +export * from "./binding"; +export * from "./context"; +export * from "./engine"; +export * from "./instance"; diff --git a/packages/workflows-shared/src/instance.ts b/packages/workflows-shared/src/instance.ts new file mode 100644 index 000000000000..2f6a37bcfdd6 --- /dev/null +++ b/packages/workflows-shared/src/instance.ts @@ -0,0 +1,196 @@ +import type { ResolvedStepConfig } from "./context"; +import type { + DatabaseInstance, + DatabaseVersion, + DatabaseWorkflow, +} from "./engine"; + +export type Instance = { + id: string; + created_on: string; + modified_on: string; + workflow_id: string; + version_id: string; + status: InstanceStatus; + started_on: string | null; + ended_on: string | null; +}; + +export const INSTANCE_METADATA = `INSTANCE_METADATA`; + +export type InstanceMetadata = { + accountId: number; + workflow: DatabaseWorkflow; + version: DatabaseVersion; + instance: DatabaseInstance; + event: { + payload: Record; + timestamp: Date; + }; +}; + +export enum InstanceStatus { + Queued = 0, // Queued and waiting to start + Running = 1, + Paused = 2, // TODO (WOR-73): Implement pause + Errored = 3, // Stopped due to a user or system Error + Terminated = 4, // Stopped explicitly by user + Complete = 5, // Successful completion + // TODO (WOR-71): Sleep +} + +export function instanceStatusName(status: InstanceStatus) { + switch (status) { + case InstanceStatus.Queued: + return "queued"; + case InstanceStatus.Running: + return "running"; + case InstanceStatus.Paused: + return "paused"; + case InstanceStatus.Errored: + return "errored"; + case InstanceStatus.Terminated: + return "terminated"; + case InstanceStatus.Complete: + return "complete"; + default: + return "unknown"; + } +} + +export const instanceStatusNames = [ + "queued", + "running", + "paused", + "errored", + "terminated", + "complete", + "unknown", +] as const; + +export function toInstanceStatus(status: string): InstanceStatus { + switch (status) { + case "queued": + return InstanceStatus.Queued; + case "running": + return InstanceStatus.Running; + case "paused": + return InstanceStatus.Paused; + case "errored": + return InstanceStatus.Errored; + case "terminated": + return InstanceStatus.Terminated; + case "complete": + return InstanceStatus.Complete; + case "unknown": + throw new Error("unknown cannot be parsed into a InstanceStatus"); + default: + throw new Error(`${status} was not handled`); + } +} + +export const enum InstanceEvent { + WORKFLOW_QUEUED = 0, + WORKFLOW_START = 1, + WORKFLOW_SUCCESS = 2, + WORKFLOW_FAILURE = 3, + WORKFLOW_TERMINATED = 4, + + STEP_START = 5, + STEP_SUCCESS = 6, + STEP_FAILURE = 7, + + SLEEP_START = 8, + SLEEP_COMPLETE = 9, + + ATTEMPT_START = 10, + ATTEMPT_SUCCESS = 11, + ATTEMPT_FAILURE = 12, +} + +export const enum InstanceTrigger { + API = 0, + BINDING = 1, + EVENT = 2, + CRON = 3, +} + +export function instanceTriggerName(trigger: InstanceTrigger) { + switch (trigger) { + case InstanceTrigger.API: + return "api"; + case InstanceTrigger.BINDING: + return "binding"; + case InstanceTrigger.EVENT: + return "event"; + case InstanceTrigger.CRON: + return "cron"; + default: + return "unknown"; + } +} + +export type RawInstanceLog = { + id: number; + timestamp: string; + event: InstanceEvent; + groupKey: string | null; + target: string | null; + metadata: string; +}; + +export type InstanceAttempt = { + start: string; + end: string | null; + success: boolean | null; + error: { name: string; message: string } | null; +}; + +export type InstanceStepLog = { + name: string; + start: string; + end: string | null; + attempts: InstanceAttempt[]; + config: ResolvedStepConfig; + output: unknown; + success: boolean | null; + type: "step"; +}; + +export type InstanceSleepLog = { + name: string; + start: string; + end: string; + finished: boolean; + type: "sleep"; +}; + +export type InstanceTerminateLog = { + type: "termination"; + trigger: { + source: string; + }; +}; + +export type InstanceLogsResponse = { + params: Record; + trigger: { + source: ReturnType; + }; + versionId: string; + queued: string; + start: string | null; + end: string | null; + steps: (InstanceStepLog | InstanceSleepLog | InstanceTerminateLog)[]; + success: boolean | null; + error: { name: string; message: string } | null; + output: Rpc.Serializable; +}; + +export type WakerPriorityEntry = { + hash: string; + type: WakerPriorityType; + targetTimestamp: number; +}; + +export type WakerPriorityType = "sleep" | "retry" | "timeout"; diff --git a/packages/workflows-shared/src/lib/cache.ts b/packages/workflows-shared/src/lib/cache.ts new file mode 100644 index 000000000000..556c1053a2ed --- /dev/null +++ b/packages/workflows-shared/src/lib/cache.ts @@ -0,0 +1,10 @@ +export async function computeHash(value: string) { + const msgUint8 = new TextEncoder().encode(value); // encode as (utf-8) Uint8Array + const hashBuffer = await crypto.subtle.digest("SHA-1", msgUint8); // hash the message + const hashArray = Array.from(new Uint8Array(hashBuffer)); // convert buffer to byte array + const hashHex = hashArray + .map((b) => b.toString(16).padStart(2, "0")) + .join(""); // convert bytes to hex string + + return hashHex; +} diff --git a/packages/workflows-shared/src/lib/errors.ts b/packages/workflows-shared/src/lib/errors.ts new file mode 100644 index 000000000000..d43668fee166 --- /dev/null +++ b/packages/workflows-shared/src/lib/errors.ts @@ -0,0 +1,18 @@ +export class WorkflowTimeoutError extends Error { + name = "WorkflowTimeoutError"; +} + +export class WorkflowInternalError extends Error { + name = "WorkflowInternalError"; +} + +export class WorkflowFatalError extends Error { + name = "WorkflowFatalError"; + + toJSON() { + return { + name: this.name, + message: this.message, + }; + } +} diff --git a/packages/workflows-shared/src/lib/gracePeriodSemaphore.ts b/packages/workflows-shared/src/lib/gracePeriodSemaphore.ts new file mode 100644 index 000000000000..6c48c9d73ea5 --- /dev/null +++ b/packages/workflows-shared/src/lib/gracePeriodSemaphore.ts @@ -0,0 +1,87 @@ +import { ms } from "itty-time"; +import type { Engine } from "../engine"; +import type { WorkflowSleepDuration } from "cloudflare:workers"; + +export const ENGINE_TIMEOUT = ms("5 minutes" satisfies WorkflowSleepDuration); + +let latestGracePeriodTimestamp: number | undefined = undefined; + +export type GracePeriodCallback = (engine: Engine, timeoutMs: number) => void; + +export class GracePeriodSemaphore { + #counter: number = 0; + readonly callback: GracePeriodCallback; + readonly timeoutMs: number; + + constructor(callback: GracePeriodCallback, timeoutMs: number) { + this.callback = callback; + this.timeoutMs = timeoutMs; + } + + // acquire takes engine to be the same as release + async acquire(_engine: Engine) { + // when the counter goes from 0 to 1 - we can safely reject the previous grace period + if (this.#counter == 0) { + latestGracePeriodTimestamp = undefined; + } + this.#counter += 1; + } + + async release(engine: Engine) { + this.#counter = Math.max(this.#counter - 1, 0); + if (this.#counter == 0) { + // Trigger timeout promise, no need to await here, + // this can be triggered slightly after it's not time sensitive + this.callback(engine, this.timeoutMs); + } + } + + isRunningStep() { + return this.#counter > 0; + } +} + +export const startGracePeriod: GracePeriodCallback = async ( + engine: Engine, + timeoutMs: number +) => { + const gracePeriodHandler = async () => { + const thisTimestamp = new Date().valueOf(); + + // TODO: Should the grace period be 5 mins every time or 5 mins across lifetimes? + // At the moment, it looks like this will reset to 5 mins if a metal crashes + // There might possibly waste memory waiting for `timeoutMs` every time + // We should eventually strongly persist and respect this value across lifetimes + + // We are starting a new grace period + // 1. There should not be one already set + // 2. Or if there is, it should be in the past + if ( + !( + latestGracePeriodTimestamp === undefined || + latestGracePeriodTimestamp < thisTimestamp + ) + ) { + throw new Error( + "Can't start grace period since there is already an active one started on " + + latestGracePeriodTimestamp + ); + } + + latestGracePeriodTimestamp = thisTimestamp; + await scheduler.wait(timeoutMs); + if ( + thisTimestamp !== latestGracePeriodTimestamp || + engine.timeoutHandler.isRunningStep() + ) { + return; + } + // priorityQueue is set before the user code runs which implies that a grace period cannot start + // before init finishes where it is set + + // Ensure next alarm is set before we abort + await engine.priorityQueue?.handleNextAlarm(); + await engine.abort("Grace period complete"); + }; + void gracePeriodHandler(); +}; diff --git a/packages/workflows-shared/src/lib/retries.ts b/packages/workflows-shared/src/lib/retries.ts new file mode 100644 index 000000000000..725799ebcd18 --- /dev/null +++ b/packages/workflows-shared/src/lib/retries.ts @@ -0,0 +1,26 @@ +import { ms } from "itty-time"; +// @ts-expect-error workflows "shared" package will be pulled in later +import type { ResolvedStepConfig, StepState } from "shared"; + +export function calcRetryDuration( + config: ResolvedStepConfig, + stepState: StepState +): number { + const { attemptedCount: attemptCount } = stepState; + const { retries } = config; + + const delay = ms(retries.delay); + + switch (retries.backoff) { + case "exponential": { + return delay * Math.pow(2, attemptCount - 1); + } + case "linear": { + return delay * attemptCount; + } + case "constant": + default: { + return delay; + } + } +} diff --git a/packages/workflows-shared/src/lib/timePriorityQueue.ts b/packages/workflows-shared/src/lib/timePriorityQueue.ts new file mode 100644 index 000000000000..b14cdc737ad8 --- /dev/null +++ b/packages/workflows-shared/src/lib/timePriorityQueue.ts @@ -0,0 +1,267 @@ +import Heap from "heap-js"; +import type { + InstanceMetadata, + WakerPriorityEntry, + WakerPriorityType, +} from "../instance"; + +const wakerPriorityEntryComparator = ( + a: WakerPriorityEntry, + b: WakerPriorityEntry +) => { + return a.targetTimestamp - b.targetTimestamp; +}; + +const enum SQLiteBoolean { + FALSE = 0, + TRUE = 1, +} + +const enum EntryType { + RETRY = 0, + SLEEP = 1, + TIMEOUT = 2, +} + +type PriorityQueueDBEntry = { + id: number; + created_on: string; + target_timestamp: number; + action: SQLiteBoolean; + entryType: number; + hash: string; +}; + +export class TimePriorityQueue { + #heap: Heap = new Heap(wakerPriorityEntryComparator); + // #env: Env; + #ctx: DurableObjectState; + #instanceMetadata: InstanceMetadata; + + constructor( + ctx: DurableObjectState, + // env: Env, + instanceMetadata: InstanceMetadata + ) { + this.#ctx = ctx; + // this.#env = env; + this.#instanceMetadata = instanceMetadata; + this.#heap.init(this.getEntries()); + } + + popPastEntries(): WakerPriorityEntry[] | undefined { + // early return if there is nothing in the queue + if (this.#heap.length === 0) { + return; + } + const res: WakerPriorityEntry[] = []; + const currentTimestamp = new Date().valueOf(); + // heap-js does not have a ordered iterator that doesn't consume the input so we + // peek the first one, and pop if it's old until it's empty or in the future + // eslint-disable-next-line no-constant-condition + while (true) { + const element = this.#heap.peek(); + if (element === undefined) { + break; + } + if (element.targetTimestamp > currentTimestamp) { + break; + } + // at this point, targetTimestamp is older so we can pop this node because it's no + // longer relevant + res.push(element); + this.#heap.pop(); + } + this.#ctx.storage.transactionSync(() => { + for (const entry of res) { + this.removeEntryDB(entry); + } + }); + return res; + } + + /** + * `add` is ran using a transaction so it's race condition free, if it's ran atomically + * @param entry + */ + async add(entry: WakerPriorityEntry) { + await this.#ctx.storage.transaction(async () => { + // TODO: Handle this + // const waker = this.#env.WAKERS.idFromName( + // this.#instanceMetadata.instance.id + // ); + // const wakerStub = this.#env.WAKERS.get(waker); + // We can optimise this by only calling it if time is sooner than any other + // await wakerStub.wake( + // new Date(entry.targetTimestamp), + // this.#instanceMetadata + // ); + + this.#heap.add(entry); + this.addEntryDB(entry); + }); + } + + /** + * `remove` is ran using a transaction so it's race condition free, if it's ran atomically + * @param entry + */ + remove(entry: Omit) { + this.#ctx.storage.transactionSync(() => { + this.removeFirst((e) => { + if (e.hash === entry.hash && e.type === entry.type) { + return true; + } + return false; + }); + }); + } + + popTypeAll(entryType: WakerPriorityType) { + this.#ctx.storage.transactionSync(() => { + this.filter((e) => e.type !== entryType); + }); + } + + // Idempotent, perhaps name should suggest so + async handleNextAlarm() { + const nextWakeCall = this.#heap.peek(); + if (nextWakeCall === undefined) { + return; + } + // TODO: Handle this + // const waker = this.#env.WAKERS.idFromName( + // this.#instanceMetadata.instance.id + // ); + // const wakerStub = this.#env.WAKERS.get(waker); + // await wakerStub.wake( + // new Date(nextWakeCall.targetTimestamp), + // this.#instanceMetadata + // ); + } + + getFirst( + callbackFn: (a: WakerPriorityEntry) => boolean + ): WakerPriorityEntry | undefined { + // clone it so that people cant just modify the entry on the PQ + return structuredClone(this.#heap.toArray().find(callbackFn)); + } + + private removeFirst(callbackFn: (a: WakerPriorityEntry) => boolean) { + const elements = this.#heap.toArray(); + const index = elements.findIndex(callbackFn); + if (index === -1) { + return; + } + + const removedEntry = elements.splice(index, 1)[0]; + this.removeEntryDB(removedEntry); + + this.#heap = new Heap(wakerPriorityEntryComparator); + this.#heap.init(elements); + } + + private filter(callbackFn: (a: WakerPriorityEntry) => boolean) { + const filteredElements = this.#heap.toArray().filter(callbackFn); + const removedElements = this.#heap.toArray().filter((a) => !callbackFn(a)); + this.#ctx.storage.transactionSync(() => { + for (const entry of removedElements) { + this.removeEntryDB(entry); + } + }); + this.#heap = new Heap(wakerPriorityEntryComparator); + this.#heap.init(filteredElements); + } + + length() { + return this.#heap.length; + } + + private getEntries() { + const entries = [ + ...this.#ctx.storage.sql.exec("SELECT * FROM priority_queue ORDER BY id"), + ] as PriorityQueueDBEntry[]; + + const activeEntries: WakerPriorityEntry[] = []; + + entries.forEach((val) => { + const entryType = toWakerPriorityType(val.entryType); + // 0 - removed + if (val.action == 0) { + const index = activeEntries.findIndex( + (activeVal) => + val.hash == activeVal.hash && entryType == activeVal.type + ); + // if it's found remove it from the active list + if (index !== -1) { + activeEntries.splice(index, 1); + } + } else { + // 1 - added + const index = activeEntries.findIndex( + (activeVal) => + val.hash == activeVal.hash && entryType == activeVal.type + ); + // if it's found remove it from the active list + if (index === -1) { + activeEntries.push({ + hash: val.hash, + targetTimestamp: val.target_timestamp, + type: entryType, + }); + } + } + }); + return activeEntries; + } + + private removeEntryDB(entry: WakerPriorityEntry) { + this.#ctx.storage.sql.exec( + ` + INSERT INTO priority_queue (target_timestamp, action, entryType, hash) + VALUES (?, ?, ? ,?) + `, + entry.targetTimestamp, + SQLiteBoolean.FALSE, + fromWakerPriorityType(entry.type), + entry.hash + ); + } + + private addEntryDB(entry: WakerPriorityEntry) { + this.#ctx.storage.sql.exec( + ` + INSERT INTO priority_queue (target_timestamp, action, entryType, hash) + VALUES (?, ?, ? ,?) + `, + entry.targetTimestamp, + SQLiteBoolean.TRUE, + fromWakerPriorityType(entry.type), + entry.hash + ); + } +} + +const toWakerPriorityType = (entryType: EntryType): WakerPriorityType => { + switch (entryType) { + case EntryType.RETRY: + return "retry"; + case EntryType.SLEEP: + return "sleep"; + case EntryType.TIMEOUT: + return "timeout"; + } +}; + +const fromWakerPriorityType = (entryType: WakerPriorityType): EntryType => { + switch (entryType) { + case "retry": + return EntryType.RETRY; + case "sleep": + return EntryType.SLEEP; + case "timeout": + return EntryType.TIMEOUT; + default: + throw new Error(`WakerPriorityType "${entryType}" has not been handled`); + } +}; diff --git a/packages/workflows-shared/src/lib/validators.ts b/packages/workflows-shared/src/lib/validators.ts new file mode 100644 index 000000000000..646c19570adb --- /dev/null +++ b/packages/workflows-shared/src/lib/validators.ts @@ -0,0 +1,12 @@ +export const MAX_STEP_NAME_LENGTH = 256; +// eslint-disable-next-line no-control-regex +const CONTROL_CHAR_REGEX = new RegExp("[\x00-\x1F]"); + +export function validateStepName(string: string): boolean { + if (string.length > MAX_STEP_NAME_LENGTH) { + return false; + } + + //check for control chars + return !CONTROL_CHAR_REGEX.test(string); +} diff --git a/packages/workflows-shared/src/local-binding-worker.ts b/packages/workflows-shared/src/local-binding-worker.ts new file mode 100644 index 000000000000..d242767776a1 --- /dev/null +++ b/packages/workflows-shared/src/local-binding-worker.ts @@ -0,0 +1,2 @@ +export { WorkflowBinding } from "./binding"; +export { Engine } from "./engine"; diff --git a/packages/workflows-shared/tsconfig.json b/packages/workflows-shared/tsconfig.json new file mode 100644 index 000000000000..f6d1335ef171 --- /dev/null +++ b/packages/workflows-shared/tsconfig.json @@ -0,0 +1,18 @@ +{ + "extends": "@cloudflare/workers-tsconfig/tsconfig.json", + "compilerOptions": { + "target": "esnext", + "lib": ["es2022"], + "module": "esnext", + "moduleResolution": "node", + "types": ["@cloudflare/workers-types/experimental"], + "noEmit": true, + "isolatedModules": true, + "allowSyntheticDefaultImports": true, + "forceConsistentCasingInFileNames": true, + "strict": true, + "skipLibCheck": true + }, + "include": ["**/*.ts", "vitest.config.mts"], + "exclude": ["node_modules", "dist", "**/tests", "**/*.test.ts"] +} diff --git a/packages/workflows-shared/turbo.json b/packages/workflows-shared/turbo.json new file mode 100644 index 000000000000..0525d6e0d9fe --- /dev/null +++ b/packages/workflows-shared/turbo.json @@ -0,0 +1,10 @@ +{ + "$schema": "http://turbo.build/schema.json", + "extends": ["//"], + "pipeline": { + "build": { + "inputs": ["src/**", "*.js", "*.ts", "*.json"], + "outputs": ["dist/**"] + } + } +} diff --git a/packages/wrangler/src/api/integrations/platform/index.ts b/packages/wrangler/src/api/integrations/platform/index.ts index df384094e939..5d9693f06f36 100644 --- a/packages/wrangler/src/api/integrations/platform/index.ts +++ b/packages/wrangler/src/api/integrations/platform/index.ts @@ -198,7 +198,11 @@ function getMiniflarePersistOptions( persist: GetPlatformProxyOptions["persist"] ): Pick< MiniflareOptions, - "kvPersist" | "durableObjectsPersist" | "r2Persist" | "d1Persist" + | "kvPersist" + | "durableObjectsPersist" + | "r2Persist" + | "d1Persist" + | "workflowsPersist" > { if (persist === false) { // the user explicitly asked for no persistance @@ -207,6 +211,7 @@ function getMiniflarePersistOptions( durableObjectsPersist: false, r2Persist: false, d1Persist: false, + workflowsPersist: false, }; } @@ -220,6 +225,7 @@ function getMiniflarePersistOptions( durableObjectsPersist: `${persistPath}/do`, r2Persist: `${persistPath}/r2`, d1Persist: `${persistPath}/d1`, + workflowsPersist: `${persistPath}/workflows`, }; } diff --git a/packages/wrangler/src/api/startDevWorker/types.ts b/packages/wrangler/src/api/startDevWorker/types.ts index 99590f178b59..63e717acdddb 100644 --- a/packages/wrangler/src/api/startDevWorker/types.ts +++ b/packages/wrangler/src/api/startDevWorker/types.ts @@ -242,19 +242,20 @@ export type Trigger = | { type: "cron"; cron: string } | ({ type: "queue-consumer" } & QueueConsumer); -type BindingOmit = Omit; +type BindingOmit = Omit; +type NameOmit = Omit; export type Binding = | { type: "plain_text"; value: string } | { type: "json"; value: Json } | ({ type: "kv_namespace" } & BindingOmit) - | ({ type: "send_email" } & BindingOmit) + | ({ type: "send_email" } & NameOmit) | { type: "wasm_module"; source: BinaryFile } | { type: "text_blob"; source: File } | { type: "browser" } | { type: "ai" } | { type: "version_metadata" } | { type: "data_blob"; source: BinaryFile } - | ({ type: "durable_object_namespace" } & BindingOmit) + | ({ type: "durable_object_namespace" } & NameOmit) | ({ type: "workflow" } & BindingOmit) | ({ type: "queue" } & BindingOmit) | ({ type: "r2_bucket" } & BindingOmit) @@ -267,7 +268,7 @@ export type Binding = | ({ type: "dispatch_namespace" } & Omit) | ({ type: "mtls_certificate" } & Omit) | ({ type: "pipeline" } & Omit) - | ({ type: "logfwdr" } & Omit) + | ({ type: "logfwdr" } & NameOmit) | { type: `unsafe_${string}` } | { type: "assets" }; diff --git a/packages/wrangler/src/api/startDevWorker/utils.ts b/packages/wrangler/src/api/startDevWorker/utils.ts index a6c2c16ed1d3..fde0c2860570 100644 --- a/packages/wrangler/src/api/startDevWorker/utils.ts +++ b/packages/wrangler/src/api/startDevWorker/utils.ts @@ -374,6 +374,9 @@ export async function convertBindingsToCfWorkerInitBindings( } else if (binding.type === "logfwdr") { bindings.logfwdr ??= { bindings: [] }; bindings.logfwdr.bindings.push({ ...binding, name: name }); + } else if (binding.type === "workflow") { + bindings.workflows ??= []; + bindings.workflows.push({ ...binding, binding: name }); } else if (isUnsafeBindingType(binding.type)) { bindings.unsafe ??= { bindings: [], diff --git a/packages/wrangler/src/dev/miniflare.ts b/packages/wrangler/src/dev/miniflare.ts index 9da59e651fc9..2adb4d7cbfa2 100644 --- a/packages/wrangler/src/dev/miniflare.ts +++ b/packages/wrangler/src/dev/miniflare.ts @@ -36,6 +36,7 @@ import type { CfScriptFormat, CfUnsafeBinding, CfWorkerInit, + CfWorkflow, } from "../deployment-bundle/worker"; import type { WorkerEntrypointsDefinition, @@ -337,6 +338,18 @@ function queueProducerEntry( function hyperdriveEntry(hyperdrive: CfHyperdrive): [string, string] { return [hyperdrive.binding, hyperdrive.localConnectionString ?? ""]; } +function workflowEntry( + workflow: CfWorkflow +): [string, { name: string; className: string; scriptName?: string }] { + return [ + workflow.binding, + { + name: workflow.name, + className: workflow.class_name, + scriptName: workflow.script_name, + }, + ]; +} function ratelimitEntry(ratelimit: CfUnsafeBinding): [string, object] { return [ratelimit.name, ratelimit]; } @@ -366,6 +379,7 @@ type WorkerOptionsBindings = Pick< | "hyperdrives" | "durableObjects" | "serviceBindings" + | "workflows" | "wrappedBindings" >; @@ -628,6 +642,7 @@ export function buildMiniflareBindingOptions(config: MiniflareBindingsConfig): { hyperdrives: Object.fromEntries( bindings.hyperdrive?.map(hyperdriveEntry) ?? [] ), + workflows: Object.fromEntries(bindings.workflows?.map(workflowEntry) ?? []), durableObjects: Object.fromEntries([ ...internalObjects.map(({ name, class_name }) => { @@ -692,6 +707,7 @@ export function buildPersistOptions( kvPersist: path.join(v3Path, "kv"), r2Persist: path.join(v3Path, "r2"), d1Persist: path.join(v3Path, "d1"), + workflowsPersist: path.join(v3Path, "workflows"), }; } } diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index b6ca2b0b29e3..335770937824 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -743,6 +743,9 @@ importers: '@cloudflare/workers-types': specifier: ^4.20241022.0 version: 4.20241022.0 + undici: + specifier: ^6.20.1 + version: 6.20.1 wrangler: specifier: workspace:* version: link:../../packages/wrangler @@ -1085,6 +1088,9 @@ importers: '@cloudflare/workers-types': specifier: ^4.20241022.0 version: 4.20241022.0 + '@cloudflare/workflows-shared': + specifier: workspace:* + version: link:../workflows-shared '@microsoft/api-extractor': specifier: ^7.47.0 version: 7.47.4(@types/node@20.8.3) @@ -1160,6 +1166,9 @@ importers: expect-type: specifier: ^0.15.0 version: 0.15.0 + heap-js: + specifier: ^2.5.0 + version: 2.5.0 http-cache-semantics: specifier: ^4.1.0 version: 4.1.1 @@ -1627,6 +1636,46 @@ importers: specifier: workspace:* version: link:../wrangler + packages/workflows-shared: + dependencies: + heap-js: + specifier: ^2.5.0 + version: 2.5.0 + itty-time: + specifier: ^1.0.6 + version: 1.0.6 + mime: + specifier: ^3.0.0 + version: 3.0.0 + zod: + specifier: ^3.22.3 + version: 3.22.3 + devDependencies: + '@cloudflare/eslint-config-worker': + specifier: workspace:* + version: link:../eslint-config-worker + '@cloudflare/workers-tsconfig': + specifier: workspace:* + version: link:../workers-tsconfig + '@cloudflare/workers-types': + specifier: ^4.20241022.0 + version: 4.20241022.0 + '@types/mime': + specifier: ^3.0.4 + version: 3.0.4 + esbuild: + specifier: 0.17.19 + version: 0.17.19 + rimraf: + specifier: ^6.0.1 + version: 6.0.1 + typescript: + specifier: ^5.5.4 + version: 5.5.4 + vitest: + specifier: catalog:default + version: 2.1.1(@types/node@20.8.3)(@vitest/ui@1.6.0)(msw@2.3.0(typescript@5.5.4))(supports-color@9.2.2) + packages/wrangler: dependencies: '@cloudflare/kv-asset-handler': @@ -2388,31 +2437,31 @@ packages: resolution: {integrity: sha512-H8q/Msk+9Fga6iqqmff7i4mi+kraBCQWFbMEaKIRq3+HBNN5gkpizk05DSG6iIHVxCG1M3WR1FkN9CQ0ZtK4Cw==} '@cloudflare/workerd-darwin-64@1.20241022.0': - resolution: {integrity: sha512-1NNYun37myMTgCUiPQEJ0cMal4mKZVTpkD0b2tx9hV70xji+frVJcSK8YVLeUm1P+Rw1d/ct8DMgQuCpsz3Fsw==} + resolution: {integrity: sha512-1NNYun37myMTgCUiPQEJ0cMal4mKZVTpkD0b2tx9hV70xji+frVJcSK8YVLeUm1P+Rw1d/ct8DMgQuCpsz3Fsw==, tarball: https://registry.npmjs.org/@cloudflare/workerd-darwin-64/-/workerd-darwin-64-1.20241022.0.tgz} engines: {node: '>=16'} cpu: [x64] os: [darwin] '@cloudflare/workerd-darwin-arm64@1.20241022.0': - resolution: {integrity: sha512-FOO/0P0U82EsTLTdweNVgw+4VOk5nghExLPLSppdOziq6IR5HVgP44Kmq5LdsUeHUhwUmfOh9hzaTpkNzUqKvw==} + resolution: {integrity: sha512-FOO/0P0U82EsTLTdweNVgw+4VOk5nghExLPLSppdOziq6IR5HVgP44Kmq5LdsUeHUhwUmfOh9hzaTpkNzUqKvw==, tarball: https://registry.npmjs.org/@cloudflare/workerd-darwin-arm64/-/workerd-darwin-arm64-1.20241022.0.tgz} engines: {node: '>=16'} cpu: [arm64] os: [darwin] '@cloudflare/workerd-linux-64@1.20241022.0': - resolution: {integrity: sha512-RsNc19BQJG9yd+ngnjuDeG9ywZG+7t1L4JeglgceyY5ViMNMKVO7Zpbsu69kXslU9h6xyQG+lrmclg3cBpnhYA==} + resolution: {integrity: sha512-RsNc19BQJG9yd+ngnjuDeG9ywZG+7t1L4JeglgceyY5ViMNMKVO7Zpbsu69kXslU9h6xyQG+lrmclg3cBpnhYA==, tarball: https://registry.npmjs.org/@cloudflare/workerd-linux-64/-/workerd-linux-64-1.20241022.0.tgz} engines: {node: '>=16'} cpu: [x64] os: [linux] '@cloudflare/workerd-linux-arm64@1.20241022.0': - resolution: {integrity: sha512-x5mUXpKxfsosxcFmcq5DaqLs37PejHYVRsNz1cWI59ma7aC4y4Qn6Tf3i0r9MwQTF/MccP4SjVslMU6m4W7IaA==} + resolution: {integrity: sha512-x5mUXpKxfsosxcFmcq5DaqLs37PejHYVRsNz1cWI59ma7aC4y4Qn6Tf3i0r9MwQTF/MccP4SjVslMU6m4W7IaA==, tarball: https://registry.npmjs.org/@cloudflare/workerd-linux-arm64/-/workerd-linux-arm64-1.20241022.0.tgz} engines: {node: '>=16'} cpu: [arm64] os: [linux] '@cloudflare/workerd-windows-64@1.20241022.0': - resolution: {integrity: sha512-eBCClx4szCOgKqOlxxbdNszMqQf3MRG1B9BRIqEM/diDfdR9IrZ8l3FaEm+l9gXgPmS6m1NBn40aWuGBl8UTSw==} + resolution: {integrity: sha512-eBCClx4szCOgKqOlxxbdNszMqQf3MRG1B9BRIqEM/diDfdR9IrZ8l3FaEm+l9gXgPmS6m1NBn40aWuGBl8UTSw==, tarball: https://registry.npmjs.org/@cloudflare/workerd-windows-64/-/workerd-windows-64-1.20241022.0.tgz} engines: {node: '>=16'} cpu: [x64] os: [win32] @@ -5524,6 +5573,10 @@ packages: headers-polyfill@4.0.3: resolution: {integrity: sha512-IScLbePpkvO846sIwOtOTDjutRMWdXdJmXdMvk6gCBHxFO8d+QKOQedyZSxFTTFYRSmlgSTDtXqqq4pcenBXLQ==} + heap-js@2.5.0: + resolution: {integrity: sha512-kUGoI3p7u6B41z/dp33G6OaL7J4DRqRYwVmeIlwLClx7yaaAy7hoDExnuejTKtuDwfcatGmddHDEOjf6EyIxtQ==} + engines: {node: '>=10.0.0'} + hex2dec@1.1.2: resolution: {integrity: sha512-Yu+q/XWr2fFQ11tHxPq4p4EiNkb2y+lAacJNhAdRXVfRIcDH6gi7htWFnnlIzvqHMHoWeIsfXlNAjZInpAOJDA==} @@ -8059,6 +8112,10 @@ packages: resolution: {integrity: sha512-72RFADWFqKmUb2hmmvNODKL3p9hcB6Gt2DOQMis1SEBaV6a4MH8soBvzg+95CYhCKPFedut2JY9bMfrDl9D23g==} engines: {node: '>=14.0'} + undici@6.20.1: + resolution: {integrity: sha512-AjQF1QsmqfJys+LXfGTNum+qw4S88CojRInG/6t31W/1fk6G59s92bnAvGz5Cmur+kQv2SURXEvvudLmbrE8QA==} + engines: {node: '>=18.17'} + unenv-nightly@2.0.0-20241018-011344-e666fcf: resolution: {integrity: sha512-D00bYn8rzkCBOlLx+k1iHQlc69jvtJRT7Eek4yIGQ6461a2tUBjngGZdRpqsoXAJCz/qBW0NgPting7Zvg+ysg==} @@ -12723,6 +12780,8 @@ snapshots: headers-polyfill@4.0.3: {} + heap-js@2.5.0: {} + hex2dec@1.1.2: {} hono@3.12.11: {} @@ -15303,6 +15362,8 @@ snapshots: dependencies: '@fastify/busboy': 2.1.1 + undici@6.20.1: {} + unenv-nightly@2.0.0-20241018-011344-e666fcf: dependencies: defu: 6.1.4 diff --git a/tools/deployments/__tests__/deploy-non-npm-packages.test.ts b/tools/deployments/__tests__/deploy-non-npm-packages.test.ts index 49061d3ab74d..ee6394b8e94c 100644 --- a/tools/deployments/__tests__/deploy-non-npm-packages.test.ts +++ b/tools/deployments/__tests__/deploy-non-npm-packages.test.ts @@ -107,6 +107,7 @@ describe("findDeployablePackageNames()", () => { "workers-playground", "@cloudflare/workers-shared", "workers.new", + "@cloudflare/workflows-shared", "@cloudflare/wrangler-devtools", } `); diff --git a/tools/deployments/__tests__/validate-changesets.test.ts b/tools/deployments/__tests__/validate-changesets.test.ts index 157f66eddb9a..c5d830642b0d 100644 --- a/tools/deployments/__tests__/validate-changesets.test.ts +++ b/tools/deployments/__tests__/validate-changesets.test.ts @@ -38,6 +38,7 @@ describe("findPackageNames()", () => { "workers-playground", "@cloudflare/workers-shared", "workers.new", + "@cloudflare/workflows-shared", "wrangler", "@cloudflare/wrangler-devtools", }