From 0621a2ded6a0fb3fe64e1e15ce1c783b55806f5d Mon Sep 17 00:00:00 2001 From: Raphael Manke Date: Sun, 21 Jan 2024 18:17:05 +0100 Subject: [PATCH] docs(pipes): Add examples to README --- packages/@aws-cdk/aws-pipes-alpha/README.md | 367 ++++++++++++++---- .../rosetta/default.ts-fixture | 86 +++- .../rosetta/pipes-imports.ts-fixture | 7 + 3 files changed, 382 insertions(+), 78 deletions(-) create mode 100644 packages/@aws-cdk/aws-pipes-alpha/rosetta/pipes-imports.ts-fixture diff --git a/packages/@aws-cdk/aws-pipes-alpha/README.md b/packages/@aws-cdk/aws-pipes-alpha/README.md index 91ad685f1fd0b..b031825eee90e 100644 --- a/packages/@aws-cdk/aws-pipes-alpha/README.md +++ b/packages/@aws-cdk/aws-pipes-alpha/README.md @@ -23,12 +23,10 @@ can be filtered, transformed and enriched. ![diagram of pipes](https://d1.awsstatic.com/product-marketing/EventBridge/Product-Page-Diagram_Amazon-EventBridge-Pipes.cd7961854be4432d63f6158ffd18271d6c9fa3ec.png) -For more details see the service +For more details see the service documentation: [Documentation](https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-pipes.html) -[Cloudformation docs](https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-pipes-pipe.html) - ## Pipe [EventBridge Pipes](https://aws.amazon.com/blogs/aws/new-create-point-to-point-integrations-between-event-producers-and-consumers-with-amazon-eventbridge-pipes/) @@ -36,30 +34,31 @@ is a fully managed service that enables point-to-point integrations between event producers and consumers. Pipes can be used to connect several AWS services to each other, or to connect AWS services to external services. -A Pipe has a Source and a Target. The source events can be filtered and enriched +A Pipe has a source and a target. The source events can be filtered and enriched before reaching the target. -## Example +## Example - pipe usage -```ts -import * as sqs from 'aws-cdk-lib/aws-sqs'; -import * as pipes from '@aws-cdk/aws-pipes-alpha'; -import * as sources from '@aws-cdk/aws-pipes-sources-alpha'; -import * as targets from '@aws-cdk/aws-pipes-targets-alpha'; +> The following code examples use an example implementation of a [source](#source) and [target](#target). In the future there will be separate packages for the sources and targets. -const sourceQueue = new sqs.Queue(stack, 'SourceQueue'); -const targetQueue = new sqs.Queue(stack, 'TargetQueue'); +To define a pipe you need to create a new `Pipe` construct. The `Pipe` construct needs a source and a target. +```ts +declare const sourceQueue: sqs.Queue; +declare const targetQueue: sqs.Queue; -const pipe = new Pipe(stack, 'Pipe', { - source: new sources.Queue(sourceQueue), - target: new targets.Queue(targetQueue), +const pipe = new pipes.Pipe(this, 'Pipe', { + source: new SqsSource(sourceQueue), + target: new SqsTarget(targetQueue), }); ``` +This minimal example creates a pipe with a SQS queue as source and a SQS queue as target. +Messages from the source are put into the body of the target message. + ## Source -A source is a AWS Service that needs to be polled. The following Sources are +A source is a AWS Service that is polled. The following Sources are possible: - [Amazon DynamoDB stream](https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-pipes-dynamodb.html) @@ -69,25 +68,47 @@ possible: - [Self managed Apache Kafka stream](https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-pipes-kafka.html) - [Amazon SQS queue](https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-pipes-sqs.html) +> Currently no implementation exist for any of the supported sources. The following example shows how an implementation can look like. The actual implementation is not part of this package and will be in a separate one. + +### Example source implementation + +```ts fixture=pipes-imports +class SqsSource implements pipes.ISource { + sourceArn: string; + sourceParameters = undefined; + + constructor(private readonly queue: sqs.Queue) { + this.queue = queue; + this.sourceArn = queue.queueArn; + } + + bind(_pipe: pipes.IPipe): pipes.SourceConfig { + return { + sourceParameters: this.sourceParameters, + }; + } + + grantRead(pipeRole: cdk.aws_iam.IRole): void { + this.queue.grantConsumeMessages(pipeRole); + } +} +``` +A source implementation needs to provide the `sourceArn`, `sourceParameters` and grant the pipe role read access to the source. ## Filter A Filter can be used to filter the events from the source before they are -forwarded to the enrichment step. Multiple filter expressions are possible. If -one of the filter expressions matches the event is forwarded to the enrichment -or target step. +forwarded to the enrichment or, if no enrichment is present, target step. Multiple filter expressions are possible. +If one of the filter expressions matches the event is forwarded to the enrichment or target step. -```ts -import * as sqs from 'aws-cdk-lib/aws-sqs'; -import * as pipes from '@aws-cdk/aws-pipes-alpha'; -import * as sources from '@aws-cdk/aws-pipes-sources-alpha'; -import * as targets from '@aws-cdk/aws-pipes-targets-alpha'; +### Example - filter usage -const sourceQueue = new sqs.Queue(stack, 'SourceQueue'); -const targetQueue = new sqs.Queue(stack, 'TargetQueue'); +```ts +declare const sourceQueue: sqs.Queue; +declare const targetQueue: sqs.Queue; -const filter = new pipes.Filter( +const sourceFilter = new pipes.Filter( [ pipes.FilterPattern.fromObject({ body: { @@ -98,45 +119,51 @@ const filter = new pipes.Filter( ] ) -const pipe = new Pipe(stack, 'Pipe', { - source: new sources.Queue(sourceQueue), - target: new targets.Queue(targetQueue), - filters: [filter], +const pipe = new pipes.Pipe(this, 'Pipe', { + source: new SqsSource(sourceQueue), + target: new SqsTarget(targetQueue), + filter: sourceFilter, }); ``` -## Input Transformation +This example shows a filter that only forwards events with the `customerType` B2B or B2C from the source messages. Messages that are not matching the filter are not forwarded to the enrichment or target step. + +You can define multiple filter pattern which are combined with a logical `OR`. + +Additional filter pattern and details can be found in the EventBridge pipes [docs](https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-pipes-event-filtering.html) + +## Input transformation For enrichments and targets the input event can be transformed. The transformation is applied for each item of the batch. A transformation has access to the input event as well to some context information of the pipe itself like the name of the pipe. +See [docs](https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-pipes-input-transformation.html) for details. -### From object +### Example - input transformation from object The input transformation can be created from an object. The object can contain static values, dynamic values or pipe variables. ```ts -import * as sqs from 'aws-cdk-lib/aws-sqs'; -import * as pipes from '@aws-cdk/aws-pipes-alpha'; -import * as sources from '@aws-cdk/aws-pipes-sources-alpha'; -import * as targets from '@aws-cdk/aws-pipes-targets-alpha'; +declare const sourceQueue: sqs.Queue; +declare const targetQueue: sqs.Queue; -const sourceQueue = new sqs.Queue(stack, 'SourceQueue'); -const targetQueue = new sqs.Queue(stack, 'TargetQueue'); +const targetInputTransformation = pipes.InputTransformation.fromObject({ + staticField: 'static value', + dynamicField: pipes.DynamicInput.fromEventPath('$.body.payload'), + pipeVariable: pipes.DynamicInput.pipeName, +}); -const pipe = new Pipe(stack, 'Pipe', { +const pipe = new pipes.Pipe(this, 'Pipe', { pipeName: 'MyPipe', - source: new sources.Queue(sourceQueue), - target: new targets.Queue(targetQueue, { - inputTransformation: pipes.InputTransformation.fromObject({ - staticField: 'static value', - dynamicField: pipes.DynamicInput.fromEventPath('$.body.payload'), - pipeVariable: pipes.PipeVariable.pipeName(), - }), + source: new SqsSource(sourceQueue), + target: new SqsTarget(targetQueue, { + inputTransformation: targetInputTransformation, }), }); ``` -The following example shows the input event and the result of the transformation. +This example shows a transformation that adds a static field, a dynamic field and a pipe variable to the input event. The dynamic field is extracted from the input event. The pipe variable is extracted from the pipe context. + +So when the following batch of input events is processed by the pipe ```json [ @@ -148,6 +175,8 @@ The following example shows the input event and the result of the transformation ] ``` +it is converted into the following payload. + ```json [ { @@ -173,7 +202,7 @@ If the transformation is applied to a target it might be converted to a string r ``` -### From event path +### Example - input transformation from event path In cases where you want to forward only a part of the event to the target you can use the transformation event path. @@ -181,18 +210,15 @@ In cases where you want to forward only a part of the event to the target you ca ```ts -import * as sqs from 'aws-cdk-lib/aws-sqs'; -import * as pipes from '@aws-cdk/aws-pipes-alpha'; -import * as sources from '@aws-cdk/aws-pipes-sources-alpha'; -import * as targets from '@aws-cdk/aws-pipes-targets-alpha'; - -const sourceQueue = new sqs.Queue(stack, 'SourceQueue'); -const targetQueue = new sqs.Queue(stack, 'TargetQueue'); - -const pipe = new Pipe(stack, 'Pipe', { - source: new sources.Queue(sourceQueue), - target: new targets.Queue(targetQueue, { - inputTransformation: InputTransformation.fromEventPath('$.body.payload'), +declare const sourceQueue: sqs.Queue; +declare const targetQueue: sqs.Queue; + +const targetInputTransformation = pipes.InputTransformation.fromEventPath('$.body.payload') + +const pipe = new pipes.Pipe(this, 'Pipe', { + source: new SqsSource(sourceQueue), + target: new SqsTarget(targetQueue, { + inputTransformation: targetInputTransformation, }), }); ``` @@ -223,25 +249,22 @@ it is converted into the following target payload. ] ``` -> The implicit payload parsing (e.g. SQS message body to JSON) only works if the input is the source payload. Implicit body parsing is not applied on enrichment results. +> The [implicit payload parsing](https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-pipes-input-transformation.html#input-transform-implicit) (e.g. SQS message body to JSON) only works if the input is the source payload. Implicit body parsing is not applied on enrichment results. -### From text +### Example - input transformation from text In cases where you want to forward a static text to the target or use your own formatted `inputTemplate` you can use the transformation from text. ```ts -import * as sqs from 'aws-cdk-lib/aws-sqs'; -import * as pipes from '@aws-cdk/aws-pipes-alpha'; -import * as sources from '@aws-cdk/aws-pipes-sources-alpha'; -import * as targets from '@aws-cdk/aws-pipes-targets-alpha'; - -const sourceQueue = new sqs.Queue(stack, 'SourceQueue'); -const targetQueue = new sqs.Queue(stack, 'TargetQueue'); - -const pipe = new Pipe(stack, 'Pipe', { - source: new sources.Queue(sourceQueue), - target: new targets.Queue(targetQueue, { - inputTransformation: InputTransformation.fromText('My static text'), +declare const sourceQueue: sqs.Queue; +declare const targetQueue: sqs.Queue; + +const targetInputTransformation = pipes.InputTransformation.fromText('My static text'); + +const pipe = new pipes.Pipe(this, 'Pipe', { + source: new SqsSource(sourceQueue), + target: new SqsTarget(targetQueue, { + inputTransformation: targetInputTransformation, }), }); ``` @@ -258,8 +281,6 @@ This transformation forwards the static text to the target. ] ``` - - ## Enrichment In the enrichment step the (un)filtered payloads from the source can be used to @@ -271,6 +292,106 @@ invoke one of the following services - Step Functions state machine - only express workflow +### Example enrichment implementation + +> Currently no implementation exist for any of the supported enrichments. The following example shows how an implementation can look like. The actual implementation is not part of this package and will be in a separate one. + +```ts fixture=pipes-imports +class LambdaEnrichment implements pipes.IEnrichment { + enrichmentArn: string; + private inputTransformation: pipes.InputTransformation | undefined; + + constructor(private readonly lambda: lambda.Function, props: {inputTransformation?: pipes.InputTransformation} = {}) { + this.enrichmentArn = lambda.functionArn; + this.inputTransformation = props?.inputTransformation + } + + bind(pipe: pipes.IPipe): pipes.EnrichmentParametersConfig { + return { + enrichmentParameters: { + inputTemplate: this.inputTransformation?.bind(pipe).inputTemplate, + }, + }; + } + + grantInvoke(pipeRole: cdk.aws_iam.IRole): void { + this.lambda.grantInvoke(pipeRole); + } +} +``` + +An enrichment implementation needs to provide the `enrichmentArn`, `enrichmentParameters` and grant the pipe role invoke access to the enrichment. + +### Example - enrichment usage + +```ts +declare const sourceQueue: sqs.Queue; +declare const targetQueue: sqs.Queue; +declare const enrichmentLambda: lambda.Function; + +const enrichmentInputTransformation = pipes.InputTransformation.fromObject({ + staticField: 'static value', + dynamicField: pipes.DynamicInput.fromEventPath('$.body.payload'), + pipeVariable: pipes.DynamicInput.pipeName, +}); + +const pipe = new pipes.Pipe(this, 'Pipe', { + source: new SqsSource(sourceQueue), + target: new SqsTarget(targetQueue), + enrichment: new LambdaEnrichment(enrichmentLambda, { + inputTransformation: enrichmentInputTransformation, + }), +}); +``` + +This example adds a lambda function as enrichment to the pipe. The lambda function is invoked with the batch of messages from the source after applying the transformation. The lambda function can return a result which is forwarded to the target. + +So the following batch of input events is processed by the pipe + +```json +[ + { + ... + "body": "{\"payload\": \"Test message.\"}", + ... + } +] +``` + +it is converted into the following payload which is sent to the lambda function. + +```json +[ + { + ... + "staticField": "static value", + "dynamicField": "Test message.", + "pipeVariable": "MyPipe", + ... + } +] +``` + +The lambda function can return a result which is forwarded to the target. +For example a lambda function that returns a concatenation of the static field, dynamic field and pipe variable + +```ts nofixture +export async function handler (event: any) { + return event.staticField + "-" + event.dynamicField + "-" + event.pipeVariable; +}; +``` + +will produce the following target message in the target SQS queue. + +```json +[ + { + ... + "body": "static value-Test message.-MyPipe", + ... + } +] +``` ## Target @@ -299,3 +420,95 @@ targets are supported: The target event can be transformed before it is forwarded to the target using the same input transformation as in the enrichment step. +### Example target implementation + +> Currently no implementation exist for any of the supported targets. The following example shows how an implementation can look like. The actual implementation is not part of this package and will be in a separate one. + +```ts fixture=pipes-imports +class SqsTarget implements pipes.ITarget { + targetArn: string; + private inputTransformation: pipes.InputTransformation | undefined; + + constructor(private readonly queue: sqs.Queue, props: {inputTransformation?: pipes.InputTransformation} = {}) { + this.queue = queue; + this.targetArn = queue.queueArn; + this.inputTransformation = props?.inputTransformation + } + + bind(_pipe: pipes.Pipe): pipes.TargetConfig { + return { + targetParameters: { + inputTemplate: this.inputTransformation?.bind(_pipe).inputTemplate, + }, + }; + } + + grantPush(pipeRole: cdk.aws_iam.IRole): void { + this.queue.grantSendMessages(pipeRole); + } +} +``` + +A target implementation needs to provide the `targetArn`, `enrichmentParameters` and grant the pipe role invoke access to the enrichment. + +## Log destination + +A pipe can produce log events that are forwarded to different log destinations. +You can configure multiple destinations, but all the destination share the same log level and log data. +For details check the official [documentation](https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-pipes-logs.html). + +The log level and data that is included in the log events is configured on the pipe class itself. +Whereas the actual destination is defined independent. + +### Example log destination implementation + +> Currently no implementation exist for any of the supported enrichments. The following example shows how an implementation can look like. The actual implementation is not part of this package and will be in a separate one. + + +```ts fixture=pipes-imports +class CloudwatchDestination implements pipes.ILogDestination { + parameters: pipes.LogDestinationParameters; + + constructor(private readonly logGroup: cdk.aws_logs.LogGroup) { + this.logGroup = logGroup; + this.parameters = { + cloudwatchLogsLogDestination: { + logGroupArn: logGroup.logGroupArn, + }, + }; + } + + bind(_pipe: pipes.IPipe): pipes.LogDestinationConfig { + return { + parameters: this.parameters, + }; + } + + grantPush(pipeRole: cdk.aws_iam.IRole): void { + this.logGroup.grantWrite(pipeRole); + } +} +``` + +### Example log destination usage + +```ts +declare const sourceQueue: sqs.Queue; +declare const targetQueue: sqs.Queue; +declare const loggroup: logs.LogGroup; + +const pipe = new pipes.Pipe(this, 'Pipe', { + source: new SqsSource(sourceQueue), + target: new SqsTarget(targetQueue), + + logLevel: pipes.LogLevel.TRACE, + logIncludeExecutionData: [pipes.IncludeExecutionData.ALL], + + logDestinations: [ + new CloudwatchDestination(loggroup), + ], +}); +``` + +This example uses a cloudwatch loggroup to store the log emitted during a pipe execution. The log level is set to `TRACE` so all steps of the pipe are logged. +Additionally all execution data is logged as well. diff --git a/packages/@aws-cdk/aws-pipes-alpha/rosetta/default.ts-fixture b/packages/@aws-cdk/aws-pipes-alpha/rosetta/default.ts-fixture index 2729c6effb19d..782590cf38ec1 100644 --- a/packages/@aws-cdk/aws-pipes-alpha/rosetta/default.ts-fixture +++ b/packages/@aws-cdk/aws-pipes-alpha/rosetta/default.ts-fixture @@ -1,7 +1,91 @@ // Fixture with packages imported, but nothing else import * as cdk from 'aws-cdk-lib'; +import * as sqs from 'aws-cdk-lib/aws-sqs'; +import * as lambda from 'aws-cdk-lib/aws-lambda'; +import * as logs from 'aws-cdk-lib/aws-logs'; import { Construct } from 'constructs'; -import { App, Stack, TimeZone, Duration } from 'aws-cdk-lib'; +import * as pipes from '@aws-cdk/aws-pipes-alpha'; + +class SqsSource implements pipes.ISource { + sourceArn: string; + sourceParameters = undefined; + constructor(private readonly queue: sqs.Queue) { + this.queue = queue; + this.sourceArn = queue.queueArn; + } + bind(_pipe: pipes.IPipe): pipes.SourceConfig { + return { + sourceParameters: this.sourceParameters, + }; + } + grantRead(pipeRole: cdk.aws_iam.IRole): void { + this.queue.grantConsumeMessages(pipeRole); + } +} + +class SqsTarget implements pipes.ITarget { + targetArn: string; + inputTransformation: pipes.InputTransformation | undefined; + + constructor(private readonly queue: sqs.Queue, props: {inputTransformation?: pipes.InputTransformation} = {}) { + this.inputTransformation = props?.inputTransformation + this.queue = queue; + this.targetArn = queue.queueArn; + } + + bind(_pipe: pipes.IPipe): pipes.TargetConfig { + return { + targetParameters: { + inputTemplate: this.inputTransformation?.bind(_pipe).inputTemplate, + }, + }; + } + + grantPush(pipeRole: cdk.aws_iam.IRole): void { + this.queue.grantSendMessages(pipeRole); + } +} + +class LambdaEnrichment implements pipes.IEnrichment { + enrichmentArn: string; + + private inputTransformation: pipes.InputTransformation | undefined; + constructor(private readonly lambda: lambda.Function, props: {inputTransformation?: pipes.InputTransformation} = {}) { + this.enrichmentArn = lambda.functionArn; + this.inputTransformation = props?.inputTransformation + } + bind(pipe: pipes.IPipe): pipes.EnrichmentParametersConfig { + return { + enrichmentParameters: { + inputTemplate: this.inputTransformation?.bind(pipe).inputTemplate, + }, + }; + } + grantInvoke(pipeRole: cdk.aws_iam.IRole): void { + this.lambda.grantInvoke(pipeRole); + } +} + +class CloudwatchDestination implements pipes.ILogDestination { + parameters: pipes.LogDestinationParameters; + constructor(private readonly logGroup: cdk.aws_logs.LogGroup) { + this.logGroup = logGroup; + this.parameters = { + cloudwatchLogsLogDestination: { + logGroupArn: logGroup.logGroupArn, + }, + }; + } + bind(_pipe: pipes.IPipe): pipes.LogDestinationConfig { + return { + parameters: this.parameters, + }; + } + + grantPush(pipeRole: cdk.aws_iam.IRole): void { + this.logGroup.grantWrite(pipeRole); + } +} class Fixture extends cdk.Stack { constructor(scope: Construct, id: string) { diff --git a/packages/@aws-cdk/aws-pipes-alpha/rosetta/pipes-imports.ts-fixture b/packages/@aws-cdk/aws-pipes-alpha/rosetta/pipes-imports.ts-fixture new file mode 100644 index 0000000000000..861d0e072a8ce --- /dev/null +++ b/packages/@aws-cdk/aws-pipes-alpha/rosetta/pipes-imports.ts-fixture @@ -0,0 +1,7 @@ +// Fixture with packages imported, but nothing else +import * as cdk from 'aws-cdk-lib'; +import * as sqs from 'aws-cdk-lib/aws-sqs'; +import * as lambda from 'aws-cdk-lib/aws-lambda'; +import * as pipes from '@aws-cdk/aws-pipes-alpha'; + +///here \ No newline at end of file