Skip to content

Commit

Permalink
feat: Add Batch Processor module (#1317)
Browse files Browse the repository at this point in the history
* Starting to sketch out shape of API for batch processor

* Variant 1

* Some more examples

* Add extra bit for handling message-specific mutation

* Make clear what's not public

* test with interfaces

* move tests

* refactoring a bit

* refactoring and adding FIFO

* refactoring and adding FIFO

* adding FIFO management

* cleanup

* add javadoc

* Flesh out builder option a bit

* Flesh out a bit more

* more changes

* Leaning into the builder style. needs some more thought

* The shape of it is rightish

* Working working

* Work

* Work on kinesis batch handler

* More tests

* More tests and starting to add an example

* Working on batch

* feat(batch): initial DdbBatchMessageHandler implementation

* more

* fix pom.xml for powertools-examples-batch

* Add dynamodb example

* Move template into subdir

* Better structure

* tidy up

* Trying to get kinesis going

* Kinesis demo working

* Updated readme

* Deprecated everywhere

* Address initial review comments

* Add success tests for Kinesis/S3

* Increase DDB coverage

* Tell sonar to ignore dupes in examples

* Add docs

* Add warning

* More doco

* Format

* Docs good

* Disabling formatting check for now as its breaking the build and I can't work out how to autoapply it from intellij properly

* Make checkstyle happy

* Add docs from heitor

* More docs changes

* move ddb template in the right folder

* Changes

* add items updates and deletions to ddb example

* Will it blend?

* More changes

* e2e test handler

* Try work for SQS only

* More greatness

* Almost good

* SQS works

* Also kinesis e2e

* Lets try doing it with streams

* Try make it work with streams

* Streams?

* Make SQS test work

* SQS and Kinesis work

* DynamoDB E2E works

* Formatting

* Try exclude e2e-tests from dupe checking

* Rename sonar file

* Formatting

* Update docs/utilities/batch.md

Co-authored-by: Jérôme Van Der Linden <117538+jeromevdl@users.noreply.github.com>

* Update docs/utilities/batch.md

Co-authored-by: Jérôme Van Der Linden <117538+jeromevdl@users.noreply.github.com>

* Address review comments

* Missed one

* Formatting

* Cleanup doc linking

* More doco

* Update docs/utilities/batch.md

Co-authored-by: Jérôme Van Der Linden <117538+jeromevdl@users.noreply.github.com>

* Update batch.md

Address review comments

* Skip aspectj run

---------

Co-authored-by: Scott Gerring <scott.gerring@spookfish.com>
Co-authored-by: Jerome Van Der Linden <jeromevdl@gmail.com>
Co-authored-by: Michele Ricciardi <mriccia@amazon.com>
Co-authored-by: Jérôme Van Der Linden <117538+jeromevdl@users.noreply.github.com>
  • Loading branch information
5 people authored Aug 4, 2023
1 parent fc3e971 commit d0c7f91
Show file tree
Hide file tree
Showing 53 changed files with 4,460 additions and 358 deletions.
2 changes: 2 additions & 0 deletions .sonarcloud.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
# Ignore code duplicates in the examples
sonar.cpd.exclusions=examples/**/*,powertools-e2e-tests/**/*
805 changes: 458 additions & 347 deletions docs/utilities/batch.md

Large diffs are not rendered by default.

489 changes: 489 additions & 0 deletions docs/utilities/sqs_batch.md

Large diffs are not rendered by default.

8 changes: 5 additions & 3 deletions docs/utilities/sqs_large_message_handling.md
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
---
title: SQS Large Message Handling
title: SQS Large Message Handling (Deprecated)
description: Utility
---

!!! warning
This module is now deprecated and will be removed in version 2.
See [Large Message Handling](large_messages.md) for the new module (`powertools-large-messages`) documentation.
This module is now deprecated and will be removed in version 2.
See [Large Message Handling](large_messages.md) and
[the migration guide](http://localhost:8000/lambda-java/utilities/large_messages/#migration-from-the-sqs-large-message-utility)
for the new module (`powertools-large-messages`) documentation

The large message handling utility handles SQS messages which have had their payloads
offloaded to S3 due to them being larger than the SQS maximum.
Expand Down
3 changes: 2 additions & 1 deletion examples/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,10 @@ Each example can be copied from its subdirectory and used independently of the r
* [powertools-examples-idempotency](powertools-examples-idempotency) - An idempotent HTTP API
* [powertools-examples-parameters](powertools-examples-parameters) - Uses the parameters module to provide runtime parameters to a function
* [powertools-examples-serialization](powertools-examples-serialization) - Uses the serialization module to serialize and deserialize API Gateway & SQS payloads
* [powertools-examples-sqs](powertools-examples-sqs) - Processes SQS batch requests
* [powertools-examples-sqs](powertools-examples-sqs) - Processes SQS batch requests (**Deprecated** - will be replaced by `powertools-examples-batch` in version 2 of this library)
* [powertools-examples-validation](powertools-examples-validation) - Uses the validation module to validate user requests received via API Gateway
* [powertools-examples-cloudformation](powertools-examples-cloudformation) - Deploys a Cloudformation custom resource
* [powertools-examples-batch](powertools-examples-batch) - Examples for each of the different batch processing deployments

## Working with AWS Serverless Application Model (SAM) Examples
Many of the examples use [AWS Serverless Application Model](https://aws.amazon.com/serverless/sam/) (SAM). To get started
Expand Down
1 change: 1 addition & 0 deletions examples/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
<module>powertools-examples-parameters</module>
<module>powertools-examples-serialization</module>
<module>powertools-examples-sqs</module>
<module>powertools-examples-batch</module>
<module>powertools-examples-validation</module>
<module>powertools-examples-cloudformation</module>
</modules>
Expand Down
35 changes: 35 additions & 0 deletions examples/powertools-examples-batch/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
# Powertools for AWS Lambda (Java) - Batch Example

This project contains examples of Lambda function using the batch processing module of Powertools for AWS Lambda (Java).
For more information on this module, please refer to the
[documentation](https://docs.powertools.aws.dev/lambda-java/utilities/batch/).

Three different examples and SAM deployments are included, covering each of the batch sources:

* [SQS](src/main/java/org/demo/batch/sqs) - SQS batch processing
* [Kinesis Streams](src/main/java/org/demo/batch/kinesis) - Kinesis Streams batch processing
* [DynamoDB Streams](src/main/java/org/demo/batch/dynamo) - DynamoDB Streams batch processing

## Deploy the sample application

This sample is based on Serverless Application Model (SAM). To deploy it, check out the instructions for getting
started with SAM in [the examples directory](../README.md)

This sample contains three different deployments, depending on which batch processor you'd like to use, you can
change to the subdirectory containing the example SAM template, and deploy. For instance, for the SQS batch
deployment:

```bash
cd deploy/sqs
sam build
sam deploy --guided
```

## Test the application

Each of the examples uses a Lambda scheduled every 5 minutes to push a batch, and a separate lambda to read it. To
see this in action, we can simply tail the logs of our stack:

```bash
sam logs --tail $STACK_NAME
```
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
AWSTemplateFormatVersion: '2010-09-09'
Transform: AWS::Serverless-2016-10-31
Description: >
DynamoDB Streams batch processing demo
Globals:
Function:
Timeout: 20
Runtime: java11
MemorySize: 512
Tracing: Active
Architectures:
- x86_64
Environment:
Variables:
POWERTOOLS_LOG_LEVEL: INFO
POWERTOOLS_LOGGER_SAMPLE_RATE: 1.0
POWERTOOLS_LOGGER_LOG_EVENT: true

Resources:
DynamoDBTable:
Type: AWS::DynamoDB::Table
Properties:
AttributeDefinitions:
- AttributeName: id
AttributeType: S
KeySchema:
- AttributeName: id
KeyType: HASH
ProvisionedThroughput:
ReadCapacityUnits: 5
WriteCapacityUnits: 5
StreamSpecification:
StreamViewType: NEW_IMAGE


DemoDynamoDBWriter:
Type: AWS::Serverless::Function
Properties:
CodeUri: ../..
Handler: org.demo.batch.dynamo.DynamoDBWriter::handleRequest
Environment:
Variables:
POWERTOOLS_SERVICE_NAME: ddbstreams-demo
TABLE_NAME: !Ref DynamoDBTable
Policies:
- DynamoDBCrudPolicy:
TableName: !Ref DynamoDBTable
Events:
CWSchedule:
Type: Schedule
Properties:
Schedule: 'rate(1 minute)'
Name: !Join [ "-", [ "ddb-writer-schedule", !Select [ 0, !Split [ -, !Select [ 2, !Split [ /, !Ref AWS::StackId ] ] ] ] ] ]
Description: Write records to DynamoDB via a Lambda function
Enabled: true

DemoDynamoDBStreamsConsumerFunction:
Type: AWS::Serverless::Function
Properties:
CodeUri: ../..
Handler: org.demo.batch.dynamo.DynamoDBStreamBatchHandler::handleRequest
Environment:
Variables:
POWERTOOLS_SERVICE_NAME: ddbstreams-batch-demo
Policies: AWSLambdaDynamoDBExecutionRole
Events:
Stream:
Type: DynamoDB
Properties:
Stream: !GetAtt DynamoDBTable.StreamArn
BatchSize: 100
StartingPosition: TRIM_HORIZON

83 changes: 83 additions & 0 deletions examples/powertools-examples-batch/deploy/kinesis/template.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
AWSTemplateFormatVersion: '2010-09-09'
Transform: AWS::Serverless-2016-10-31
Description: >
Kinesis batch processing demo
Globals:
Function:
Timeout: 20
Runtime: java11
MemorySize: 512
Tracing: Active
Environment:
Variables:
POWERTOOLS_LOG_LEVEL: INFO
POWERTOOLS_LOGGER_SAMPLE_RATE: 1.0
POWERTOOLS_LOGGER_LOG_EVENT: true

Resources:

DemoKinesisStream:
Type: AWS::Kinesis::Stream
Properties:
ShardCount: 1

StreamConsumer:
Type: "AWS::Kinesis::StreamConsumer"
Properties:
StreamARN: !GetAtt DemoKinesisStream.Arn
ConsumerName: KinesisBatchHandlerConsumer

DemoKinesisSenderFunction:
Type: AWS::Serverless::Function
Properties:
CodeUri: ../..
Handler: org.demo.batch.kinesis.KinesisBatchSender::handleRequest
Environment:
Variables:
POWERTOOLS_SERVICE_NAME: kinesis-batch-demo
STREAM_NAME: !Ref DemoKinesisStream
Policies:
- Statement:
- Sid: WriteToKinesis
Effect: Allow
Action:
- kinesis:PutRecords
- kinesis:DescribeStream
Resource: !GetAtt DemoKinesisStream.Arn
Events:
CWSchedule:
Type: Schedule
Properties:
Schedule: 'rate(5 minutes)'
Name: !Join [ "-", [ "message-producer-schedule", !Select [ 0, !Split [ -, !Select [ 2, !Split [ /, !Ref AWS::StackId ] ] ] ] ] ]
Description: Produce message to Kinesis via a Lambda function
Enabled: true

DemoKinesisConsumerFunction:
Type: AWS::Serverless::Function
Properties:
CodeUri: ../..
Handler: org.demo.batch.kinesis.KinesisBatchHandler::handleRequest
Environment:
Variables:
POWERTOOLS_SERVICE_NAME: kinesis-demo
Events:
Kinesis:
Type: Kinesis
Properties:
Stream: !GetAtt StreamConsumer.ConsumerARN
StartingPosition: LATEST
BatchSize: 2

Outputs:
DemoKinesisQueue:
Description: "ARN for Kinesis Stream"
Value: !GetAtt DemoKinesisStream.Arn
DemoKinesisSenderFunction:
Description: "Kinesis Batch Sender - Lambda Function ARN"
Value: !GetAtt DemoKinesisSenderFunction.Arn
DemoSQSConsumerFunction:
Description: "SQS Batch Handler - Lambda Function ARN"
Value: !GetAtt DemoKinesisConsumerFunction.Arn

147 changes: 147 additions & 0 deletions examples/powertools-examples-batch/deploy/sqs/template.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
AWSTemplateFormatVersion: '2010-09-09'
Transform: AWS::Serverless-2016-10-31
Description: >
sqs batch processing demo
Globals:
Function:
Timeout: 20
Runtime: java11
MemorySize: 512
Tracing: Active
Environment:
Variables:
POWERTOOLS_LOG_LEVEL: INFO
POWERTOOLS_LOGGER_SAMPLE_RATE: 1.0
POWERTOOLS_LOGGER_LOG_EVENT: true

Resources:
CustomerKey:
Type: AWS::KMS::Key
Properties:
Description: KMS key for encrypted queues
Enabled: true
KeyPolicy:
Version: '2012-10-17'
Statement:
- Sid: Enable IAM User Permissions
Effect: Allow
Principal:
AWS: !Sub 'arn:aws:iam::${AWS::AccountId}:root'
Action: 'kms:*'
Resource: '*'
- Sid: Allow use of the key
Effect: Allow
Principal:
Service: lambda.amazonaws.com
Action:
- kms:Decrypt
- kms:GenerateDataKey
Resource: '*'

CustomerKeyAlias:
Type: AWS::KMS::Alias
Properties:
AliasName: alias/powertools-batch-sqs-demo
TargetKeyId: !Ref CustomerKey

DemoDlqSqsQueue:
Type: AWS::SQS::Queue
Properties:
KmsMasterKeyId: !Ref CustomerKey

DemoSqsQueue:
Type: AWS::SQS::Queue
Properties:
RedrivePolicy:
deadLetterTargetArn:
Fn::GetAtt:
- "DemoDlqSqsQueue"
- "Arn"
maxReceiveCount: 2
KmsMasterKeyId: !Ref CustomerKey

DemoSQSSenderFunction:
Type: AWS::Serverless::Function
Properties:
CodeUri: ../..
Handler: org.demo.batch.sqs.SqsBatchSender::handleRequest
Environment:
Variables:
POWERTOOLS_SERVICE_NAME: sqs-batch-demo
QUEUE_URL: !Ref DemoSqsQueue
Policies:
- Statement:
- Sid: SQSSendMessageBatch
Effect: Allow
Action:
- sqs:SendMessageBatch
- sqs:SendMessage
Resource: !GetAtt DemoSqsQueue.Arn
- Sid: SQSKMSKey
Effect: Allow
Action:
- kms:GenerateDataKey
- kms:Decrypt
Resource: !GetAtt CustomerKey.Arn
Events:
CWSchedule:
Type: Schedule
Properties:
Schedule: 'rate(5 minutes)'
Name: !Join [ "-", [ "message-producer-schedule", !Select [ 0, !Split [ -, !Select [ 2, !Split [ /, !Ref AWS::StackId ] ] ] ] ] ]
Description: Produce message to SQS via a Lambda function
Enabled: true

DemoSQSConsumerFunction:
Type: AWS::Serverless::Function
Properties:
CodeUri: ../..
Handler: org.demo.batch.sqs.SqsBatchHandler::handleRequest
Environment:
Variables:
POWERTOOLS_SERVICE_NAME: sqs-demo
Policies:
- Statement:
- Sid: SQSDeleteGetAttribute
Effect: Allow
Action:
- sqs:DeleteMessageBatch
- sqs:GetQueueAttributes
Resource: !GetAtt DemoSqsQueue.Arn
- Sid: SQSSendMessageBatch
Effect: Allow
Action:
- sqs:SendMessageBatch
- sqs:SendMessage
Resource: !GetAtt DemoDlqSqsQueue.Arn
- Sid: SQSKMSKey
Effect: Allow
Action:
- kms:GenerateDataKey
- kms:Decrypt
Resource: !GetAtt CustomerKey.Arn
Events:
MySQSEvent:
Type: SQS
Properties:
Queue: !GetAtt DemoSqsQueue.Arn
BatchSize: 2
MaximumBatchingWindowInSeconds: 300

Outputs:
DemoSqsQueue:
Description: "ARN for main SQS queue"
Value: !GetAtt DemoSqsQueue.Arn
DemoDlqSqsQueue:
Description: "ARN for DLQ"
Value: !GetAtt DemoDlqSqsQueue.Arn
DemoSQSSenderFunction:
Description: "SQS Batch Sender - Lambda Function ARN"
Value: !GetAtt DemoSQSSenderFunction.Arn
DemoSQSConsumerFunction:
Description: "SQS Batch Handler - Lambda Function ARN"
Value: !GetAtt DemoSQSConsumerFunction.Arn
DemoSQSConsumerFunctionRole:
Description: "Implicit IAM Role created for SQS Lambda Function ARN"
Value: !GetAtt DemoSQSConsumerFunctionRole.Arn
Loading

0 comments on commit d0c7f91

Please sign in to comment.