diff --git a/.changeset/serious-gifts-thank.md b/.changeset/serious-gifts-thank.md new file mode 100644 index 000000000000..ae6df70418fa --- /dev/null +++ b/.changeset/serious-gifts-thank.md @@ -0,0 +1,5 @@ +--- +"wrangler": patch +--- + +Add HTTP authentication options for Workers Pipelines diff --git a/packages/wrangler/src/__tests__/pipelines.test.ts b/packages/wrangler/src/__tests__/pipelines.test.ts index 3296e26a28af..d3d8cf7780d1 100644 --- a/packages/wrangler/src/__tests__/pipelines.test.ts +++ b/packages/wrangler/src/__tests__/pipelines.test.ts @@ -7,7 +7,7 @@ import { mockConsoleMethods } from "./helpers/mock-console"; import { msw } from "./helpers/msw"; import { runInTempDir } from "./helpers/run-in-tmp"; import { runWrangler } from "./helpers/run-wrangler"; -import type { Pipeline, PipelineEntry } from "../pipelines/client"; +import type { HttpSource, Pipeline, PipelineEntry } from "../pipelines/client"; describe("pipelines", () => { const std = mockConsoleMethods(); @@ -39,7 +39,7 @@ describe("pipelines", () => { }, }, endpoint: "https://0001.pipelines.cloudflarestorage.com", - }; + } satisfies Pipeline; function mockCreateR2Token(bucket: string) { const requests = { count: 0 }; @@ -145,7 +145,10 @@ describe("pipelines", () => { status: number = 200, error?: object ) { - const requests = { count: 0 }; + const requests: { count: number; body: Pipeline | null } = { + count: 0, + body: null, + }; msw.use( http.post( "*/accounts/:accountId/pipelines", @@ -153,6 +156,7 @@ describe("pipelines", () => { expect(params.accountId).toEqual("some-account-id"); const config = (await request.json()) as Pipeline; expect(config.name).toEqual(name); + requests.body = config; requests.count++; const pipeline: Pipeline = { ...config, @@ -167,7 +171,7 @@ describe("pipelines", () => { messages: [], result: pipeline, }, - { status: status } + { status } ); }, { once: true } @@ -329,6 +333,50 @@ describe("pipelines", () => { `); }); + it("shows create usage details", async () => { + await runWrangler("pipelines create -h"); + await endEventLoop(); + + expect(std.err).toMatchInlineSnapshot(`""`); + expect(std.out).toMatchInlineSnapshot(` + "wrangler pipelines create + + Create a new pipeline + + POSITIONALS + pipeline The name of the new pipeline [string] [required] + + GLOBAL FLAGS + -j, --experimental-json-config Experimental: support wrangler.json [boolean] + -c, --config Path to .toml configuration file [string] + -e, --env Environment to use for operations and .env files [string] + -h, --help Show help [boolean] + -v, --version Show version number [boolean] + + OPTIONS + --secret-access-key The R2 service token Access Key to write data [string] + --access-key-id The R2 service token Secret Key to write data [string] + --batch-max-mb The approximate maximum size of a batch before flush in megabytes + Default: 10 [number] + --batch-max-rows The approximate maximum size of a batch before flush in rows + Default: 10000 [number] + --batch-max-seconds The approximate maximum duration of a batch before flush in seconds + Default: 15 [number] + --transform The worker and entrypoint of the PipelineTransform implementation in the format \\"worker.entrypoint\\" + Default: No transformation worker [string] + --compression Sets the compression format of output files + Default: gzip [string] [choices: \\"none\\", \\"gzip\\", \\"deflate\\"] + --filepath The path to store files in the destination bucket + Default: event_date=\${date}/hr=\${hr} [string] + --filename The name of the file in the bucket. Must contain \\"\${slug}\\". File extension is optional + Default: \${slug}-\${hr}.json [string] + --binding Enable Worker binding to this pipeline [boolean] [default: true] + --http Enable HTTPS endpoint to send data to this pipeline [boolean] [default: true] + --authentication Require authentication (Cloudflare API Token) to send data to the HTTPS endpoint [boolean] [default: false] + --r2 Destination R2 bucket name [string] [required]" + `); + }); + it("create - should create a pipeline", async () => { const tokenReq = mockCreateR2Token("test-bucket"); const requests = mockCreateRequest("my-pipeline"); @@ -361,6 +409,32 @@ describe("pipelines", () => { expect(requests.count).toEqual(1); }); + it("create - should create a pipeline with auth", async () => { + const requests = mockCreateRequest("my-pipeline"); + await runWrangler( + "pipelines create my-pipeline --authentication --r2 test-bucket --access-key-id my-key --secret-access-key my-secret" + ); + expect(requests.count).toEqual(1); + + // contain http source and include auth + expect(requests.body?.source[1].type).toEqual("http"); + expect( + (requests.body?.source[1] as HttpSource).config?.authentication + ).toEqual(true); + }); + + it("create - should create a pipeline without http", async () => { + const requests = mockCreateRequest("my-pipeline"); + await runWrangler( + "pipelines create my-pipeline --http=false --r2 test-bucket --access-key-id my-key --secret-access-key my-secret" + ); + expect(requests.count).toEqual(1); + + // only contains binding source + expect(requests.body?.source.length).toEqual(1); + expect(requests.body?.source[0].type).toEqual("binding"); + }); + it("list - should list pipelines", async () => { const requests = mockListRequest([ { @@ -493,6 +567,34 @@ describe("pipelines", () => { expect(updateReq.count).toEqual(1); }); + it("update - should update a pipeline with source changes http auth", async () => { + const pipeline: Pipeline = samplePipeline; + mockShowRequest(pipeline.name, pipeline); + + const update = JSON.parse(JSON.stringify(pipeline)); + update.source = [ + { + type: "http", + format: "json", + config: { + authenticated: true, + }, + }, + ]; + const updateReq = mockUpdateRequest(update.name, update); + + await runWrangler( + "pipelines update my-pipeline --binding=false --http --authentication --r2 new-bucket --access-key-id new-key --secret-access-key new-secret" + ); + + expect(updateReq.count).toEqual(1); + expect(updateReq.body?.source.length).toEqual(1); + expect(updateReq.body?.source[0].type).toEqual("http"); + expect( + (updateReq.body?.source[0] as HttpSource).config?.authentication + ).toEqual(true); + }); + it("update - should fail a missing pipeline", async () => { const requests = mockShowRequest("bad-pipeline", null, 404, { code: 1000, diff --git a/packages/wrangler/src/pipelines/client.ts b/packages/wrangler/src/pipelines/client.ts index 76cbf5b7eed9..f983045bcb08 100644 --- a/packages/wrangler/src/pipelines/client.ts +++ b/packages/wrangler/src/pipelines/client.ts @@ -14,14 +14,24 @@ export type TransformConfig = { script: string; entrypoint: string; }; +export type HttpSource = { + type: "http"; + format: string; + schema?: string; + config?: { + authentication: boolean; + }; +}; +export type BindingSource = { + type: "binding"; + format: string; + schema?: string; +}; +export type Source = HttpSource | BindingSource; export type PipelineUserConfig = { name: string; metadata: { [x: string]: string }; - source: { - type: string; - format: string; - schema?: string; - }[]; + source: Source[]; transforms: TransformConfig[]; destination: { type: string; diff --git a/packages/wrangler/src/pipelines/index.ts b/packages/wrangler/src/pipelines/index.ts index e970d4f198ed..6d112cc409fd 100644 --- a/packages/wrangler/src/pipelines/index.ts +++ b/packages/wrangler/src/pipelines/index.ts @@ -1,6 +1,6 @@ import { readConfig } from "../config"; import { sleep } from "../deploy/deploy"; -import { FatalError } from "../errors"; +import { FatalError, UserError } from "../errors"; import { printWranglerBanner } from "../index"; import { logger } from "../logger"; import * as metrics from "../metrics"; @@ -17,7 +17,12 @@ import { updatePipeline, } from "./client"; import type { CommonYargsArgv, CommonYargsOptions } from "../yargs-types"; -import type { PipelineUserConfig } from "./client"; +import type { + BindingSource, + HttpSource, + PipelineUserConfig, + Source, +} from "./client"; import type { Argv } from "yargs"; // flag to skip delays for tests @@ -136,10 +141,23 @@ function addCreateAndUpdateOptions(yargs: Argv) { type: "string", demandOption: false, }) + .option("binding", { + describe: "Enable Worker binding to this pipeline", + type: "boolean", + default: true, + demandOption: false, + }) + .option("http", { + describe: "Enable HTTPS endpoint to send data to this pipeline", + type: "boolean", + default: true, + demandOption: false, + }) .option("authentication", { describe: - "Enabling authentication means that data can only be sent to the pipeline via the binding \nDefault: false", + "Require authentication (Cloudflare API Token) to send data to the HTTPS endpoint", type: "boolean", + default: false, demandOption: false, }); } @@ -186,16 +204,7 @@ export function pipelines(pipelineYargs: CommonYargsArgv) { const pipelineConfig: PipelineUserConfig = { name: name, metadata: {}, - source: [ - { - type: "http", - format: "json", - }, - { - type: "binding", - format: "json", - }, - ], + source: [], transforms: [], destination: { type: "r2", @@ -237,13 +246,29 @@ export function pipelines(pipelineYargs: CommonYargsArgv) { throw new FatalError("Requires a r2 secret access key"); } - if (args.authentication) { - pipelineConfig.source = [ - { - type: "binding", - format: "json", + // add binding source (default to add) + if (args.binding === undefined || args.binding) { + pipelineConfig.source.push({ + type: "binding", + format: "json", + } satisfies BindingSource); + } + + // add http source (possibly authenticated), default to add + if (args.http === undefined || args.http) { + pipelineConfig.source.push({ + type: "http", + format: "json", + config: { + authentication: + args.authentication !== undefined && args.authentication, }, - ]; + } satisfies HttpSource); + } + if (pipelineConfig.source.length < 1) { + throw new UserError( + "Too many sources have been disabled. At least one source (http or binding) should be enabled" + ); } if (args.transform !== undefined) { @@ -393,18 +418,47 @@ export function pipelines(pipelineYargs: CommonYargsArgv) { } } - if (args.authentication !== undefined) { - // strip off existing http source + if (args.binding !== undefined) { + // strip off old source & keep if necessary + const source = pipelineConfig.source.find( + (s: Source) => s.type == "binding" + ); pipelineConfig.source = pipelineConfig.source.filter( - (s) => s.type == "http" + (s: Source) => s.type != "binding" ); + if (args.binding) { + // add back only if specified + pipelineConfig.source.push({ + type: "binding", + format: "json", + ...source, + }); + } + } - // add back only if unauthenticated - if (!args.authentication) { + if (args.http !== undefined) { + // strip off old source & keep if necessary + const source = pipelineConfig.source.find( + (s: Source) => s.type == "http" + ); + pipelineConfig.source = pipelineConfig.source.filter( + (s: Source) => s.type != "http" + ); + if (args.http) { + // add back if specified pipelineConfig.source.push({ type: "http", format: "json", - }); + ...source, + config: { + authentication: + args.authentication !== undefined + ? // if auth specified, use it + args.authentication + : // if auth not specified, use previos value or default(false) + source?.config?.authentication || false, + }, + } satisfies HttpSource); } }