Skip to content

Commit

Permalink
PIPE-37 Add authenticated HTTP source options to Worker Pipelines
Browse files Browse the repository at this point in the history
  • Loading branch information
oliy committed Oct 15, 2024
1 parent 9b5910f commit f05f11d
Show file tree
Hide file tree
Showing 3 changed files with 198 additions and 33 deletions.
113 changes: 109 additions & 4 deletions packages/wrangler/src/__tests__/pipelines.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import { mockConsoleMethods } from "./helpers/mock-console";
import { msw } from "./helpers/msw";
import { runInTempDir } from "./helpers/run-in-tmp";
import { runWrangler } from "./helpers/run-wrangler";
import type { Pipeline, PipelineEntry } from "../pipelines/client";
import type { HttpSource, Pipeline, PipelineEntry } from "../pipelines/client";

describe("pipelines", () => {
const std = mockConsoleMethods();
Expand Down Expand Up @@ -39,7 +39,7 @@ describe("pipelines", () => {
},
},
endpoint: "https://0001.pipelines.cloudflarestorage.com",
};
} satisfies Pipeline;

function mockCreateR2Token(bucket: string) {
const requests = { count: 0 };
Expand Down Expand Up @@ -145,14 +145,18 @@ describe("pipelines", () => {
status: number = 200,
error?: object
) {
const requests = { count: 0 };
const requests: { count: number; body: Pipeline | null } = {
count: 0,
body: null,
};
msw.use(
http.post(
"*/accounts/:accountId/pipelines",
async ({ request, params }) => {
expect(params.accountId).toEqual("some-account-id");
const config = (await request.json()) as Pipeline;
expect(config.name).toEqual(name);
requests.body = config;
requests.count++;
const pipeline: Pipeline = {
...config,
Expand All @@ -167,7 +171,7 @@ describe("pipelines", () => {
messages: [],
result: pipeline,
},
{ status: status }
{ status }
);
},
{ once: true }
Expand Down Expand Up @@ -329,6 +333,53 @@ describe("pipelines", () => {
`);
});

it("shows create usage details", async () => {
await runWrangler("pipelines create -h");
await endEventLoop();

expect(std.err).toMatchInlineSnapshot(`""`);
expect(std.out).toMatchInlineSnapshot(`
"wrangler pipelines create <pipeline>
Create a new pipeline
POSITIONALS
pipeline The name of the new pipeline [string] [required]
GLOBAL FLAGS
-j, --experimental-json-config Experimental: support wrangler.json [boolean]
-c, --config Path to .toml configuration file [string]
-e, --env Environment to use for operations and .env files [string]
-h, --help Show help [boolean]
-v, --version Show version number [boolean]
OPTIONS
--secret-access-key The R2 service token Access Key to write data [string]
--access-key-id The R2 service token Secret Key to write data [string]
--batch-max-mb The approximate maximum size of a batch before flush in megabytes
Default: 10 [number]
--batch-max-rows The approximate maximum size of a batch before flush in rows
Default: 10000 [number]
--batch-max-seconds The approximate maximum duration of a batch before flush in seconds
Default: 15 [number]
--transform The worker and entrypoint of the PipelineTransform implementation in the format \\"worker.entrypoint\\"
Default: No transformation worker [string]
--compression Sets the compression format of output files
Default: gzip [string] [choices: \\"none\\", \\"gzip\\", \\"deflate\\"]
--filepath The path to store files in the destination bucket
Default: event_date=\${date}/hr=\${hr} [string]
--filename The name of the file in the bucket. Must contain \\"\${slug}\\". File extension is optional
Default: \${slug}-\${hr}.json [string]
--binding Enable Worker binding to this pipeline
Default: true [boolean]
--http Enable HTTPS endpoint to send data to this pipeline
Default: true [boolean]
--authentication Require authentication (Cloudflare API Token) to send data to the HTTPS endpoint
Default: false [boolean]
--r2 Destination R2 bucket name [string] [required]"
`);
});

it("create - should create a pipeline", async () => {
const tokenReq = mockCreateR2Token("test-bucket");
const requests = mockCreateRequest("my-pipeline");
Expand Down Expand Up @@ -361,6 +412,32 @@ describe("pipelines", () => {
expect(requests.count).toEqual(1);
});

it("create - should create a pipeline with auth", async () => {
const requests = mockCreateRequest("my-pipeline");
await runWrangler(
"pipelines create my-pipeline --authentication --r2 test-bucket --access-key-id my-key --secret-access-key my-secret"
);
expect(requests.count).toEqual(1);

// contain http source and include auth
expect(requests.body?.source[1].type).toEqual("http");
expect(
(requests.body?.source[1] as HttpSource).config?.authentication
).toEqual(true);
});

it("create - should create a pipeline without http", async () => {
const requests = mockCreateRequest("my-pipeline");
await runWrangler(
"pipelines create my-pipeline --http=false --r2 test-bucket --access-key-id my-key --secret-access-key my-secret"
);
expect(requests.count).toEqual(1);

// only contains binding source
expect(requests.body?.source.length).toEqual(1);
expect(requests.body?.source[0].type).toEqual("binding");
});

it("list - should list pipelines", async () => {
const requests = mockListRequest([
{
Expand Down Expand Up @@ -493,6 +570,34 @@ describe("pipelines", () => {
expect(updateReq.count).toEqual(1);
});

it("update - should update a pipeline with source changes http auth", async () => {
const pipeline: Pipeline = samplePipeline;
mockShowRequest(pipeline.name, pipeline);

const update = JSON.parse(JSON.stringify(pipeline));
update.source = [
{
type: "http",
format: "json",
config: {
authenticated: true,
},
},
];
const updateReq = mockUpdateRequest(update.name, update);

await runWrangler(
"pipelines update my-pipeline --binding=false --http --authentication --r2 new-bucket --access-key-id new-key --secret-access-key new-secret"
);

expect(updateReq.count).toEqual(1);
expect(updateReq.body?.source.length).toEqual(1);
expect(updateReq.body?.source[0].type).toEqual("http");
expect(
(updateReq.body?.source[0] as HttpSource).config?.authentication
).toEqual(true);
});

it("update - should fail a missing pipeline", async () => {
const requests = mockShowRequest("bad-pipeline", null, 404, {
code: 1000,
Expand Down
20 changes: 15 additions & 5 deletions packages/wrangler/src/pipelines/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,24 @@ export type TransformConfig = {
script: string;
entrypoint: string;
};
export type HttpSource = {
type: "http";
format: string;
schema?: string;
config?: {
authentication: boolean;
};
};
export type BindingSource = {
type: "binding";
format: string;
schema?: string;
};
export type Source = HttpSource | BindingSource;
export type PipelineUserConfig = {
name: string;
metadata: { [x: string]: string };
source: {
type: string;
format: string;
schema?: string;
}[];
source: Source[];
transforms: TransformConfig[];
destination: {
type: string;
Expand Down
98 changes: 74 additions & 24 deletions packages/wrangler/src/pipelines/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,12 @@ import {
updatePipeline,
} from "./client";
import type { CommonYargsArgv, CommonYargsOptions } from "../yargs-types";
import type { PipelineUserConfig } from "./client";
import type {
BindingSource,
HttpSource,
PipelineUserConfig,
Source,
} from "./client";
import type { Argv } from "yargs";

// flag to skip delays for tests
Expand Down Expand Up @@ -136,9 +141,20 @@ function addCreateAndUpdateOptions(yargs: Argv<CommonYargsOptions>) {
type: "string",
demandOption: false,
})
.option("binding", {
describe: "Enable Worker binding to this pipeline\nDefault: true",
type: "boolean",
demandOption: false,
})
.option("http", {
describe:
"Enable HTTPS endpoint to send data to this pipeline\nDefault: true",
type: "boolean",
demandOption: false,
})
.option("authentication", {
describe:
"Enabling authentication means that data can only be sent to the pipeline via the binding \nDefault: false",
"Require authentication (Cloudflare API Token) to send data to the HTTPS endpoint\nDefault: false",
type: "boolean",
demandOption: false,
});
Expand Down Expand Up @@ -186,16 +202,7 @@ export function pipelines(pipelineYargs: CommonYargsArgv) {
const pipelineConfig: PipelineUserConfig = {
name: name,
metadata: {},
source: [
{
type: "http",
format: "json",
},
{
type: "binding",
format: "json",
},
],
source: [],
transforms: [],
destination: {
type: "r2",
Expand Down Expand Up @@ -237,13 +244,27 @@ export function pipelines(pipelineYargs: CommonYargsArgv) {
throw new FatalError("Requires a r2 secret access key");
}

if (args.authentication) {
pipelineConfig.source = [
{
type: "binding",
format: "json",
// add binding source (default to add)
if (args.binding === undefined || args.binding) {
pipelineConfig.source.push({
type: "binding",
format: "json",
} satisfies BindingSource);
}

// add http source (possibly authenticated), default to add
if (args.http === undefined || args.http) {
pipelineConfig.source.push({
type: "http",
format: "json",
config: {
authentication:
args.authentication !== undefined && args.authentication,
},
];
} satisfies HttpSource);
}
if (pipelineConfig.source.length < 1) {
throw new FatalError("At leat one source should be specified");
}

if (args.transform !== undefined) {
Expand Down Expand Up @@ -393,18 +414,47 @@ export function pipelines(pipelineYargs: CommonYargsArgv) {
}
}

if (args.authentication !== undefined) {
// strip off existing http source
if (args.binding !== undefined) {
// strip off old source & keep if necessary
const source = pipelineConfig.source.find(
(s: Source) => s.type == "binding"
);
pipelineConfig.source = pipelineConfig.source.filter(
(s) => s.type == "http"
(s: Source) => s.type != "binding"
);
if (args.binding) {
// add back only if specified
pipelineConfig.source.push({
type: "binding",
format: "json",
...source,
});
}
}

// add back only if unauthenticated
if (!args.authentication) {
if (args.http !== undefined) {
// strip off old source & keep if necessary
const source = pipelineConfig.source.find(
(s: Source) => s.type == "http"
);
pipelineConfig.source = pipelineConfig.source.filter(
(s: Source) => s.type != "http"
);
if (args.http) {
// add back if specified
pipelineConfig.source.push({
type: "http",
format: "json",
});
...source,
config: {
authentication:
args.authentication !== undefined
? // if auth specified, use it
args.authentication
: // if auth not specified, use previos value or default(false)
source?.config?.authentication || false,
},
} satisfies HttpSource);
}
}

Expand Down

0 comments on commit f05f11d

Please sign in to comment.