diff --git a/.github/workflows/build-docs.yml b/.github/workflows/build-docs.yml index a4ab6e7de..a75c13b52 100644 --- a/.github/workflows/build-docs.yml +++ b/.github/workflows/build-docs.yml @@ -3,7 +3,6 @@ name: Build Docs on: pull_request: branches: - - main - v2 paths: - 'docs/**' diff --git a/.github/workflows/pr_artifacts_size.yml b/.github/workflows/pr_artifacts_size.yml index e7e3158e3..c4d29205b 100644 --- a/.github/workflows/pr_artifacts_size.yml +++ b/.github/workflows/pr_artifacts_size.yml @@ -3,13 +3,11 @@ name: Artifacts Size on: pull_request: branches: - - main - v2 paths: - 'powertools-batch/**' - 'powertools-cloudformation/**' - - 'powertools-core/**' # not in v2 - - 'powertools-common/**' # v2 only + - 'powertools-common/**' - 'powertools-e2e-tests/**' - 'powertools-idempotency-core/**' - 'powertools-idempotency-dynamodb/**' @@ -18,8 +16,6 @@ on: - 'powertools-metrics/**' - 'powertools-parameters/**' - 'powertools-serialization/**' - - 'powertools-sqs/**' # not in v2 - - 'powertools-test-suite/**' # not in v2 - 'powertools-tracing/**' - 'powertools-validation/**' - 'pom.xml' diff --git a/.github/workflows/pr_build.yml b/.github/workflows/pr_build.yml index 1ff2dc48b..2079bcb6b 100644 --- a/.github/workflows/pr_build.yml +++ b/.github/workflows/pr_build.yml @@ -3,11 +3,11 @@ name: Build on: pull_request: branches: - - main + - v2 paths: - 'powertools-batch/**' - 'powertools-cloudformation/**' - - 'powertools-core/**' + - 'powertools-common/**' - 'powertools-e2e-tests/**' - 'powertools-idempotency/**' - 'powertools-large-messages/**' @@ -15,8 +15,6 @@ on: - 'powertools-metrics/**' - 'powertools-parameters/**' - 'powertools-serialization/**' - - 'powertools-sqs/**' - - 'powertools-test-suite/**' - 'powertools-tracing/**' - 'powertools-validation/**' - 'examples/**' @@ -25,11 +23,11 @@ on: - '.github/workflows/**' push: branches: - - main + - v2 paths: - 'powertools-batch/**' - 'powertools-cloudformation/**' - - 'powertools-core/**' + - 'powertools-common/**' - 'powertools-e2e-tests/**' - 'powertools-idempotency/**' - 'powertools-large-messages/**' @@ -37,8 +35,6 @@ on: - 'powertools-metrics/**' - 'powertools-parameters/**' - 'powertools-serialization/**' - - 'powertools-sqs/**' - - 'powertools-test-suite/**' - 'powertools-tracing/**' - 'powertools-validation/**' - 'examples/**' @@ -70,12 +66,10 @@ jobs: - name: Build with Maven run: mvn -B install --file pom.xml - name: Build Gradle Example - Java - if: ${{ matrix.java == '8' }} # Gradle example can only be built on Java 8 - working-directory: examples/powertools-examples-core/gradle + working-directory: examples/powertools-examples-core-utilities/gradle run: ./gradlew build - name: Build Gradle Example - Kotlin - if: ${{ matrix.java == '8' }} # Gradle example can only be built on Java 8 - working-directory: examples/powertools-examples-core/kotlin + working-directory: examples/powertools-examples-core-utilities/kotlin run: ./gradlew build - name: Upload coverage to Codecov uses: codecov/codecov-action@d9f34f8cd5cb3b3eb79b3e4b5dae3a16df499a70 # v3.1.1 diff --git a/.github/workflows/pr_build_v2.yml b/.github/workflows/pr_build_v2.yml deleted file mode 100644 index 2079bcb6b..000000000 --- a/.github/workflows/pr_build_v2.yml +++ /dev/null @@ -1,93 +0,0 @@ -name: Build - -on: - pull_request: - branches: - - v2 - paths: - - 'powertools-batch/**' - - 'powertools-cloudformation/**' - - 'powertools-common/**' - - 'powertools-e2e-tests/**' - - 'powertools-idempotency/**' - - 'powertools-large-messages/**' - - 
'powertools-logging/**' - - 'powertools-metrics/**' - - 'powertools-parameters/**' - - 'powertools-serialization/**' - - 'powertools-tracing/**' - - 'powertools-validation/**' - - 'examples/**' - - 'pom.xml' - - 'examples/pom.xml' - - '.github/workflows/**' - push: - branches: - - v2 - paths: - - 'powertools-batch/**' - - 'powertools-cloudformation/**' - - 'powertools-common/**' - - 'powertools-e2e-tests/**' - - 'powertools-idempotency/**' - - 'powertools-large-messages/**' - - 'powertools-logging/**' - - 'powertools-metrics/**' - - 'powertools-parameters/**' - - 'powertools-serialization/**' - - 'powertools-tracing/**' - - 'powertools-validation/**' - - 'examples/**' - - 'pom.xml' - - 'examples/pom.xml' - - '.github/workflows/**' -jobs: - build-corretto: - runs-on: ubuntu-latest - strategy: - max-parallel: 5 - matrix: - java: [11, 17, 21] - name: Java ${{ matrix.java }} - env: - JAVA: ${{ matrix.java }} - AWS_REGION: eu-west-1 - permissions: - id-token: write # needed to interact with GitHub's OIDC Token endpoint. - contents: read - steps: - - uses: actions/checkout@c85c95e3d7251135ab7dc9ce3241c5835cc595a9 # v3.5.3 - - name: Setup java - uses: actions/setup-java@5ffc13f4174014e2d4d4572b3d74c3fa61aeb2c2 # v3.11.0 - with: - distribution: 'corretto' - java-version: ${{ matrix.java }} - cache: 'maven' - - name: Build with Maven - run: mvn -B install --file pom.xml - - name: Build Gradle Example - Java - working-directory: examples/powertools-examples-core-utilities/gradle - run: ./gradlew build - - name: Build Gradle Example - Kotlin - working-directory: examples/powertools-examples-core-utilities/kotlin - run: ./gradlew build - - name: Upload coverage to Codecov - uses: codecov/codecov-action@d9f34f8cd5cb3b3eb79b3e4b5dae3a16df499a70 # v3.1.1 - if: ${{ matrix.java == '11' }} # publish results once - with: - files: ./powertools-cloudformation/target/site/jacoco/jacoco.xml,./powertools-core/target/site/jacoco/jacoco.xml,./powertools-idempotency/target/site/jacoco/jacoco.xml,./powertools-logging/target/site/jacoco/jacoco.xml,./powertools-metrics/target/site/jacoco/jacoco.xml,./powertools-parameters/target/site/jacoco/jacoco.xml,./powertools-serialization/target/site/jacoco/jacoco.xml,./powertools-sqs/target/site/jacoco/jacoco.xml,./powertools-tracing/target/site/jacoco/jacoco.xml,./powertools-validation/target/site/jacoco/jacoco.xml,./powertools-large-messages/target/site/jacoco/jacoco.xml,./powertools-batch/target/site/jacoco/jacoco.xml - savepr: - runs-on: ubuntu-latest - name: Save PR number if running on PR by dependabot - if: github.actor == 'dependabot[bot]' - steps: - - name: Create Directory and save issue - run: | - mkdir -p ./pr - echo ${{ github.event.number }} - echo ${{ github.event.number }} > ./pr/NR - - uses: actions/upload-artifact@0b7f8abb1508181956e8e162db84b466c27e18ce # v3.1.2 - name: Upload artifact - with: - name: pr - path: pr/ diff --git a/.github/workflows/pr_iac_lint.yml b/.github/workflows/pr_iac_lint.yml index 09ba5f02b..531ccbbcb 100644 --- a/.github/workflows/pr_iac_lint.yml +++ b/.github/workflows/pr_iac_lint.yml @@ -3,11 +3,9 @@ name: Validate IaC on: push: branches: - - main - v2 pull_request: branches: - - main - v2 paths: - 'examples/**' diff --git a/.github/workflows/run-e2e-tests-v2.yml b/.github/workflows/run-e2e-tests-v2.yml deleted file mode 100644 index 255c89cfe..000000000 --- a/.github/workflows/run-e2e-tests-v2.yml +++ /dev/null @@ -1,58 +0,0 @@ -name: Run end-to-end tests - -on: - workflow_dispatch: - - push: - branches: - - v2 - paths: # add other 
modules when there are under e2e tests - - 'powertools-e2e-tests/**' - - 'powertools-batch/**' - - 'powertools-core/**' - - 'powertools-common/**' - - 'powertools-idempotency/**' - - 'powertools-large-message/**' - - 'powertools-logging/**' - - 'powertools-metrics/**' - - 'powertools-parameters/**' - - 'powertools-serialization/**' - - 'powertools-tracing/**' - - 'pom.xml' - - '.github/workflows/**' - - pull_request: - branches: - - v2 - paths: - - 'powertools-e2e-tests/**' - -jobs: - e2e: - runs-on: ubuntu-latest - strategy: - max-parallel: 4 - matrix: - java: [ 11, 17, 21 ] - name: End-to-end tests java${{ matrix.java }} - env: - AWS_DEFAULT_REGION: eu-west-1 - JAVA_VERSION: ${{ matrix.java }} - permissions: - id-token: write # needed to interact with GitHub's OIDC Token endpoint. - contents: read - steps: - - uses: actions/checkout@c85c95e3d7251135ab7dc9ce3241c5835cc595a9 # v3.5.3 - - name: Setup java - uses: actions/setup-java@5ffc13f4174014e2d4d4572b3d74c3fa61aeb2c2 # v3.11.0 - with: - distribution: 'corretto' - java-version: ${{ matrix.java }} - cache: maven - - name: Setup AWS credentials - uses: aws-actions/configure-aws-credentials@5fd3084fc36e372ff1fff382a39b10d03659f355 # v2.2.0 - with: - role-to-assume: ${{ secrets.AWS_ROLE_ARN_TO_ASSUME }} - aws-region: ${{ env.AWS_DEFAULT_REGION }} - - name: Run e2e test with Maven - run: mvn -DskipTests install --file pom.xml && mvn -Pe2e -B verify --file powertools-e2e-tests/pom.xml \ No newline at end of file diff --git a/.github/workflows/run-e2e-tests.yml b/.github/workflows/run-e2e-tests.yml index 86d66156b..255c89cfe 100644 --- a/.github/workflows/run-e2e-tests.yml +++ b/.github/workflows/run-e2e-tests.yml @@ -5,7 +5,7 @@ on: push: branches: - - main + - v2 paths: # add other modules when there are under e2e tests - 'powertools-e2e-tests/**' - 'powertools-batch/**' @@ -23,7 +23,7 @@ on: pull_request: branches: - - main + - v2 paths: - 'powertools-e2e-tests/**' @@ -54,7 +54,5 @@ jobs: with: role-to-assume: ${{ secrets.AWS_ROLE_ARN_TO_ASSUME }} aws-region: ${{ env.AWS_DEFAULT_REGION }} - - name: Build with Maven - run: mvn -DskipTests install --file pom.xml - name: Run e2e test with Maven - run: mvn -Pe2e -B verify --file powertools-e2e-tests/pom.xml + run: mvn -DskipTests install --file pom.xml && mvn -Pe2e -B verify --file powertools-e2e-tests/pom.xml \ No newline at end of file diff --git a/.github/workflows/spotbugs.yml b/.github/workflows/spotbugs.yml index a75cba8f1..106905a70 100644 --- a/.github/workflows/spotbugs.yml +++ b/.github/workflows/spotbugs.yml @@ -3,7 +3,6 @@ name: SpotBugs on: pull_request: branches: - - main - v2 paths: - 'powertools-cloudformation/**' diff --git a/docs/core/tracing.md b/docs/core/tracing.md index 4cb06fc29..7ad896462 100644 --- a/docs/core/tracing.md +++ b/docs/core/tracing.md @@ -308,8 +308,24 @@ under a subsegment, or you are doing multithreaded programming. Refer examples b ## Instrumenting SDK clients and HTTP calls -User should make sure to instrument the SDK clients explicitly based on the function dependency. Refer details on -[how to instrument SDK client with Xray](https://docs.aws.amazon.com/xray/latest/devguide/xray-sdk-java-awssdkclients.html) and [outgoing http calls](https://docs.aws.amazon.com/xray/latest/devguide/xray-sdk-java-httpclients.html). +Powertools for Lambda (Java) cannot intercept SDK clients instantiation to add X-Ray instrumentation. You should make sure to instrument the SDK clients explicitly. 
Refer details on +[how to instrument SDK client with Xray](https://docs.aws.amazon.com/xray/latest/devguide/xray-sdk-java.html#xray-sdk-java-awssdkclients) +and [outgoing http calls](https://docs.aws.amazon.com/xray/latest/devguide/xray-sdk-java.html#xray-sdk-java-httpclients). For example: + +=== "LambdaHandler.java" + + ```java hl_lines="1 2 7" + import com.amazonaws.xray.AWSXRay; + import com.amazonaws.xray.handlers.TracingHandler; + + public class LambdaHandler { + private AmazonDynamoDB client = AmazonDynamoDBClientBuilder.standard() + .withRegion(Regions.fromName(System.getenv("AWS_REGION"))) + .withRequestHandlers(new TracingHandler(AWSXRay.getGlobalRecorder())) + .build(); + // ... + } + ``` ## Testing your code diff --git a/docs/utilities/batch.md b/docs/utilities/batch.md index 7b571bc6e..7693ac98f 100644 --- a/docs/utilities/batch.md +++ b/docs/utilities/batch.md @@ -30,6 +30,7 @@ stateDiagram-v2 * Reports batch item failures to reduce number of retries for a record upon errors * Simple interface to process each batch record +* Parallel processing of batches * Integrates with Java Events library and the deserialization module * Build your own batch processor by extending primitives @@ -110,16 +111,9 @@ You can use your preferred deployment framework to set the correct configuration while the `powertools-batch` module handles generating the response, which simply needs to be returned as the result of your Lambda handler. -A complete [Serverless Application Model](https://aws.amazon.com/serverless/sam/) example can be found -[here](https://github.com/aws-powertools/powertools-lambda-java/tree/main/examples/powertools-examples-batch) covering -all of the batch sources. - -For more information on configuring `ReportBatchItemFailures`, -see the details for [SQS](https://docs.aws.amazon.com/lambda/latest/dg/with-sqs.html#services-sqs-batchfailurereporting), -[Kinesis](https://docs.aws.amazon.com/lambda/latest/dg/with-kinesis.html#services-kinesis-batchfailurereporting),and -[DynamoDB Streams](https://docs.aws.amazon.com/lambda/latest/dg/with-ddb.html#services-ddb-batchfailurereporting). - +A complete [Serverless Application Model](https://aws.amazon.com/serverless/sam/) example can be found [here](https://github.com/aws-powertools/powertools-lambda-java/tree/main/examples/powertools-examples-batch) covering all the batch sources. +For more information on configuring `ReportBatchItemFailures`, see the details for [SQS](https://docs.aws.amazon.com/lambda/latest/dg/with-sqs.html#services-sqs-batchfailurereporting), [Kinesis](https://docs.aws.amazon.com/lambda/latest/dg/with-kinesis.html#services-kinesis-batchfailurereporting), and [DynamoDB Streams](https://docs.aws.amazon.com/lambda/latest/dg/with-ddb.html#services-ddb-batchfailurereporting). !!! note "You do not need any additional IAM permissions to use this utility, except for what each event source requires." @@ -150,12 +144,10 @@ see the details for [SQS](https://docs.aws.amazon.com/lambda/latest/dg/with-sqs. public SQSBatchResponse handleRequest(SQSEvent sqsEvent, Context context) { return handler.processBatch(sqsEvent, context); } - - + private void processMessage(Product p, Context c) { // Process the product } - } ``` @@ -276,7 +268,6 @@ see the details for [SQS](https://docs.aws.amazon.com/lambda/latest/dg/with-sqs. private void processMessage(Product p, Context c) { // process the product } - } ``` @@ -475,6 +466,51 @@ see the details for [SQS](https://docs.aws.amazon.com/lambda/latest/dg/with-sqs. 
} ``` +## Parallel processing +You can choose to process batch items in parallel using the `BatchMessageHandler#processBatchInParallel()` +instead of `BatchMessageHandler#processBatch()`. Partial batch failure works the same way but items are processed +in parallel rather than sequentially. + +This feature is available for SQS, Kinesis and DynamoDB Streams but cannot be +used with SQS FIFO. In that case, an `UnsupportedOperationException` is thrown. + +!!! warning + Note that parallel processing is not always better than sequential processing, + and you should benchmark your code to determine the best approach for your use case. + +!!! info + To get more threads available (more vCPUs), you need to increase the amount of memory allocated to your Lambda function. + While it is possible to increase the number of threads using Java options or custom thread pools, + in most cases the defaults work well, and changing them is more likely to decrease performance + (see [here](https://www.baeldung.com/java-when-to-use-parallel-stream#fork-join-framework) + and [here](https://dzone.com/articles/be-aware-of-forkjoinpoolcommonpool)). + In situations where this may be useful - such as performing IO-bound work in parallel - make sure to measure before and after! + + +=== "Example with SQS" + + ```java hl_lines="13" + public class SqsBatchHandler implements RequestHandler { + + private final BatchMessageHandler handler; + + public SqsBatchHandler() { + handler = new BatchMessageHandlerBuilder() + .withSqsBatchHandler() + .buildWithMessageHandler(this::processMessage, Product.class); + } + + @Override + public SQSBatchResponse handleRequest(SQSEvent sqsEvent, Context context) { + return handler.processBatchInParallel(sqsEvent, context); + } + + private void processMessage(Product p, Context c) { + // Process the product + } + } + ``` + ## Handling Messages @@ -490,7 +526,7 @@ In general, the deserialized message handler should be used unless you need acce === "Raw Message Handler" - ```java + ```java hl_lines="4 7" public void setup() { BatchMessageHandler handler = new BatchMessageHandlerBuilder() .withSqsBatchHandler() @@ -505,7 +541,7 @@ In general, the deserialized message handler should be used unless you need acce === "Deserialized Message Handler" - ```java + ```java hl_lines="4 7" public void setup() { BatchMessageHandler handler = new BatchMessageHandlerBuilder() .withSqsBatchHandler() @@ -529,20 +565,20 @@ provide a custom failure handler. Handlers can be provided when building the batch processor and are available for all event sources. For instance for DynamoDB: -```java - BatchMessageHandler handler = new BatchMessageHandlerBuilder() - .withDynamoDbBatchHandler() - .withSuccessHandler((m) -> { - // Success handler receives the raw message - LOGGER.info("Message with sequenceNumber {} was successfully processed", - m.getDynamodb().getSequenceNumber()); - }) - .withFailureHandler((m, e) -> { - // Failure handler receives the raw message and the exception thrown. 
- LOGGER.info("Message with sequenceNumber {} failed to be processed: {}" - , e.getDynamodb().getSequenceNumber(), e); - }) - .buildWithMessageHander(this::processMessage); +```java hl_lines="3 8" +BatchMessageHandler handler = new BatchMessageHandlerBuilder() + .withDynamoDbBatchHandler() + .withSuccessHandler((m) -> { + // Success handler receives the raw message + LOGGER.info("Message with sequenceNumber {} was successfully processed", + m.getDynamodb().getSequenceNumber()); + }) + .withFailureHandler((m, e) -> { + // Failure handler receives the raw message and the exception thrown. + LOGGER.info("Message with sequenceNumber {} failed to be processed: {}" + , e.getDynamodb().getSequenceNumber(), e); + }) + .buildWithMessageHander(this::processMessage); ``` !!! info diff --git a/examples/README.md b/examples/README.md index 2e34513db..41640b5ad 100644 --- a/examples/README.md +++ b/examples/README.md @@ -14,7 +14,6 @@ Each example can be copied from its subdirectory and used independently of the r * [powertools-examples-idempotency](powertools-examples-idempotency) - An idempotent HTTP API * [powertools-examples-parameters](powertools-examples-parameters) - Uses the parameters module to provide runtime parameters to a function * [powertools-examples-serialization](powertools-examples-serialization) - Uses the serialization module to serialize and deserialize API Gateway & SQS payloads -* [powertools-examples-sqs](powertools-examples-sqs) - Processes SQS batch requests (**Deprecated** - will be replaced by `powertools-examples-batch` in version 2 of this library) * [powertools-examples-validation](powertools-examples-validation) - Uses the validation module to validate user requests received via API Gateway * [powertools-examples-cloudformation](powertools-examples-cloudformation) - Deploys a Cloudformation custom resource * [powertools-examples-batch](powertools-examples-batch) - Examples for each of the different batch processing deployments diff --git a/examples/powertools-examples-batch/deploy/sqs/template.yml b/examples/powertools-examples-batch/deploy/sqs/template.yml index 764ba4863..2f1d6c363 100644 --- a/examples/powertools-examples-batch/deploy/sqs/template.yml +++ b/examples/powertools-examples-batch/deploy/sqs/template.yml @@ -7,12 +7,10 @@ Globals: Function: Timeout: 20 Runtime: java11 - MemorySize: 512 - Tracing: Active + MemorySize: 5400 Environment: Variables: POWERTOOLS_LOG_LEVEL: INFO - POWERTOOLS_LOGGER_SAMPLE_RATE: 1.0 POWERTOOLS_LOGGER_LOG_EVENT: true Resources: @@ -45,6 +43,9 @@ Resources: AliasName: alias/powertools-batch-sqs-demo TargetKeyId: !Ref CustomerKey + Bucket: + Type: AWS::S3::Bucket + DemoDlqSqsQueue: Type: AWS::SQS::Queue Properties: @@ -96,11 +97,57 @@ Resources: DemoSQSConsumerFunction: Type: AWS::Serverless::Function Properties: + Tracing: Active CodeUri: ../.. 
Handler: org.demo.batch.sqs.SqsBatchHandler::handleRequest Environment: Variables: POWERTOOLS_SERVICE_NAME: sqs-demo + BUCKET: !Ref Bucket + Policies: + - Statement: + - Sid: SQSDeleteGetAttribute + Effect: Allow + Action: + - sqs:DeleteMessageBatch + - sqs:GetQueueAttributes + Resource: !GetAtt DemoSqsQueue.Arn + - Sid: SQSSendMessageBatch + Effect: Allow + Action: + - sqs:SendMessageBatch + - sqs:SendMessage + Resource: !GetAtt DemoDlqSqsQueue.Arn + - Sid: SQSKMSKey + Effect: Allow + Action: + - kms:GenerateDataKey + - kms:Decrypt + Resource: !GetAtt CustomerKey.Arn + - Sid: WriteToS3 + Effect: Allow + Action: + - s3:PutObject + Resource: !Sub ${Bucket.Arn}/* + +# Events: +# MySQSEvent: +# Type: SQS +# Properties: +# Queue: !GetAtt DemoSqsQueue.Arn +# BatchSize: 100 +# MaximumBatchingWindowInSeconds: 60 + + DemoSQSParallelConsumerFunction: + Type: AWS::Serverless::Function + Properties: + Tracing: Active + CodeUri: ../.. + Handler: org.demo.batch.sqs.SqsParallelBatchHandler::handleRequest + Environment: + Variables: + POWERTOOLS_SERVICE_NAME: sqs-demo + BUCKET: !Ref Bucket Policies: - Statement: - Sid: SQSDeleteGetAttribute @@ -121,13 +168,19 @@ Resources: - kms:GenerateDataKey - kms:Decrypt Resource: !GetAtt CustomerKey.Arn + - Sid: WriteToS3 + Effect: Allow + Action: + - s3:PutObject + Resource: !Sub ${Bucket.Arn}/* + Events: MySQSEvent: Type: SQS Properties: Queue: !GetAtt DemoSqsQueue.Arn - BatchSize: 2 - MaximumBatchingWindowInSeconds: 300 + BatchSize: 100 + MaximumBatchingWindowInSeconds: 60 Outputs: DemoSqsQueue: diff --git a/examples/powertools-examples-batch/pom.xml b/examples/powertools-examples-batch/pom.xml index 207040476..6eb548937 100644 --- a/examples/powertools-examples-batch/pom.xml +++ b/examples/powertools-examples-batch/pom.xml @@ -42,6 +42,17 @@ software.amazon.awssdk sdk-core ${sdk.version} + + + org.slf4j + slf4j-api + + + + + software.amazon.awssdk + s3 + ${sdk.version} software.amazon.awssdk diff --git a/examples/powertools-examples-batch/src/main/java/org/demo/batch/sqs/AbstractSqsBatchHandler.java b/examples/powertools-examples-batch/src/main/java/org/demo/batch/sqs/AbstractSqsBatchHandler.java new file mode 100644 index 000000000..25dba47bb --- /dev/null +++ b/examples/powertools-examples-batch/src/main/java/org/demo/batch/sqs/AbstractSqsBatchHandler.java @@ -0,0 +1,70 @@ +/* + * Copyright 2024 Amazon.com, Inc. or its affiliates. + * Licensed under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * + */ + +package org.demo.batch.sqs; + +import com.amazonaws.services.lambda.runtime.Context; +import com.fasterxml.jackson.databind.ObjectMapper; +import java.io.File; +import java.io.IOException; +import java.util.Arrays; +import java.util.Random; +import org.demo.batch.model.Product; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.slf4j.MDC; +import software.amazon.awssdk.core.sync.RequestBody; +import software.amazon.awssdk.http.urlconnection.UrlConnectionHttpClient; +import software.amazon.awssdk.services.s3.S3Client; +import software.amazon.awssdk.services.s3.model.PutObjectRequest; +import software.amazon.lambda.powertools.logging.Logging; +import software.amazon.lambda.powertools.tracing.Tracing; +import software.amazon.lambda.powertools.tracing.TracingUtils; + +public class AbstractSqsBatchHandler { + private static final Logger LOGGER = LoggerFactory.getLogger(AbstractSqsBatchHandler.class); + private final ObjectMapper mapper = new ObjectMapper(); + private final String bucket = System.getenv("BUCKET"); + private final S3Client s3 = S3Client.builder().httpClient(UrlConnectionHttpClient.create()).build(); + private final Random r = new Random(); + + /** + * Simulate some processing (I/O + S3 put request) + * @param p deserialized product + * @param context Lambda context + */ + @Logging + @Tracing + protected void processMessage(Product p, Context context) { + TracingUtils.putAnnotation("productId", p.getId()); + TracingUtils.putAnnotation("Thread", Thread.currentThread().getName()); + MDC.put("product", String.valueOf(p.getId())); + LOGGER.info("Processing product {}", p); + + char c = (char)(r.nextInt(26) + 'a'); + char[] chars = new char[1024 * 1000]; + Arrays.fill(chars, c); + p.setName(new String(chars)); + try { + File file = new File("/tmp/"+p.getId()+".json"); + mapper.writeValue(file, p); + s3.putObject( + PutObjectRequest.builder().bucket(bucket).key(p.getId()+".json").build(), RequestBody.fromFile(file)); + } catch (IOException e) { + throw new RuntimeException(e); + } finally { + MDC.remove("product"); + } + } +} diff --git a/examples/powertools-examples-batch/src/main/java/org/demo/batch/sqs/SqsBatchHandler.java b/examples/powertools-examples-batch/src/main/java/org/demo/batch/sqs/SqsBatchHandler.java index 27689485c..bc0f57cb8 100644 --- a/examples/powertools-examples-batch/src/main/java/org/demo/batch/sqs/SqsBatchHandler.java +++ b/examples/powertools-examples-batch/src/main/java/org/demo/batch/sqs/SqsBatchHandler.java @@ -4,13 +4,15 @@ import com.amazonaws.services.lambda.runtime.RequestHandler; import com.amazonaws.services.lambda.runtime.events.SQSBatchResponse; import com.amazonaws.services.lambda.runtime.events.SQSEvent; +import org.demo.batch.model.Product; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.demo.batch.model.Product; import software.amazon.lambda.powertools.batch.BatchMessageHandlerBuilder; import software.amazon.lambda.powertools.batch.handler.BatchMessageHandler; +import software.amazon.lambda.powertools.logging.Logging; +import software.amazon.lambda.powertools.tracing.Tracing; -public class SqsBatchHandler implements RequestHandler { +public class SqsBatchHandler extends AbstractSqsBatchHandler implements RequestHandler { private static final Logger LOGGER = LoggerFactory.getLogger(SqsBatchHandler.class); private final BatchMessageHandler handler; @@ -20,14 +22,11 @@ public SqsBatchHandler() { .buildWithMessageHandler(this::processMessage, Product.class); } + @Logging + @Tracing @Override public 
SQSBatchResponse handleRequest(SQSEvent sqsEvent, Context context) { + LOGGER.info("Processing batch of {} messages", sqsEvent.getRecords().size()); return handler.processBatch(sqsEvent, context); } - - - private void processMessage(Product p, Context c) { - LOGGER.info("Processing product " + p); - } - } diff --git a/examples/powertools-examples-batch/src/main/java/org/demo/batch/sqs/SqsBatchSender.java b/examples/powertools-examples-batch/src/main/java/org/demo/batch/sqs/SqsBatchSender.java index 4050ab98b..58b24d735 100644 --- a/examples/powertools-examples-batch/src/main/java/org/demo/batch/sqs/SqsBatchSender.java +++ b/examples/powertools-examples-batch/src/main/java/org/demo/batch/sqs/SqsBatchSender.java @@ -10,14 +10,13 @@ import java.security.SecureRandom; import java.util.List; import java.util.stream.IntStream; +import org.demo.batch.model.Product; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.demo.batch.model.Product; import software.amazon.awssdk.http.urlconnection.UrlConnectionHttpClient; import software.amazon.awssdk.services.sqs.SqsClient; import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequest; import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequestEntry; -import software.amazon.awssdk.services.sqs.model.SendMessageBatchResponse; /** @@ -45,16 +44,12 @@ public SqsBatchSender() { public String handleRequest(ScheduledEvent scheduledEvent, Context context) { String queueUrl = System.getenv("QUEUE_URL"); - LOGGER.info("handleRequest"); - - // Push 5 messages on each invoke. - List batchRequestEntries = IntStream.range(0, 5) + List batchRequestEntries = IntStream.range(0, 50) .mapToObj(value -> { - long id = random.nextLong(); - float price = random.nextFloat(); + long id = Math.abs(random.nextLong()); + float price = Math.abs(random.nextFloat() * 3465); Product product = new Product(id, "product-" + id, price); try { - return SendMessageBatchRequestEntry.builder() .id(scheduledEvent.getId() + value) .messageBody(objectMapper.writeValueAsString(product)) @@ -65,12 +60,12 @@ public String handleRequest(ScheduledEvent scheduledEvent, Context context) { } }).collect(toList()); - SendMessageBatchResponse sendMessageBatchResponse = sqsClient.sendMessageBatch(SendMessageBatchRequest.builder() - .queueUrl(queueUrl) - .entries(batchRequestEntries) - .build()); - - LOGGER.info("Sent Message {}", sendMessageBatchResponse); + for (int i = 0; i < 50; i += 10) { + sqsClient.sendMessageBatch(SendMessageBatchRequest.builder() + .queueUrl(queueUrl) + .entries(batchRequestEntries.subList(i, i + 10)) + .build()); + } return "Success"; } diff --git a/examples/powertools-examples-batch/src/main/java/org/demo/batch/sqs/SqsParallelBatchHandler.java b/examples/powertools-examples-batch/src/main/java/org/demo/batch/sqs/SqsParallelBatchHandler.java new file mode 100644 index 000000000..0151c0a32 --- /dev/null +++ b/examples/powertools-examples-batch/src/main/java/org/demo/batch/sqs/SqsParallelBatchHandler.java @@ -0,0 +1,48 @@ +/* + * Copyright 2024 Amazon.com, Inc. or its affiliates. + * Licensed under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.demo.batch.sqs; + +import com.amazonaws.services.lambda.runtime.Context; +import com.amazonaws.services.lambda.runtime.RequestHandler; +import com.amazonaws.services.lambda.runtime.events.SQSBatchResponse; +import com.amazonaws.services.lambda.runtime.events.SQSEvent; +import org.demo.batch.model.Product; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.slf4j.MDC; +import software.amazon.lambda.powertools.batch.BatchMessageHandlerBuilder; +import software.amazon.lambda.powertools.batch.handler.BatchMessageHandler; +import software.amazon.lambda.powertools.logging.Logging; +import software.amazon.lambda.powertools.tracing.Tracing; + +public class SqsParallelBatchHandler extends AbstractSqsBatchHandler implements RequestHandler { + private static final Logger LOGGER = LoggerFactory.getLogger(SqsParallelBatchHandler.class); + private final BatchMessageHandler handler; + + public SqsParallelBatchHandler() { + handler = new BatchMessageHandlerBuilder() + .withSqsBatchHandler() + .buildWithMessageHandler(this::processMessage, Product.class); + } + + @Logging + @Tracing + @Override + public SQSBatchResponse handleRequest(SQSEvent sqsEvent, Context context) { + LOGGER.info("Processing batch of {} messages", sqsEvent.getRecords().size()); + MDC.put("requestId", context.getAwsRequestId()); // should be propagated to other threads + return handler.processBatchInParallel(sqsEvent, context); + } +} diff --git a/examples/powertools-examples-batch/src/main/resources/LogLayout.json b/examples/powertools-examples-batch/src/main/resources/LogLayout.json new file mode 100644 index 000000000..60f102e09 --- /dev/null +++ b/examples/powertools-examples-batch/src/main/resources/LogLayout.json @@ -0,0 +1,75 @@ +{ + "level": { + "$resolver": "level", + "field": "name" + }, + "message": { + "$resolver": "message" + }, + "error": { + "message": { + "$resolver": "exception", + "field": "message" + }, + "name": { + "$resolver": "exception", + "field": "className" + }, + "stack": { + "$resolver": "exception", + "field": "stackTrace", + "stackTrace": { + "stringified": true + } + } + }, + "cold_start": { + "$resolver": "powertools", + "field": "cold_start" + }, + "thread": { + "$resolver": "thread", + "field": "name" + }, + "function_arn": { + "$resolver": "powertools", + "field": "function_arn" + }, + "function_memory_size": { + "$resolver": "powertools", + "field": "function_memory_size" + }, + "function_name": { + "$resolver": "powertools", + "field": "function_name" + }, + "function_request_id": { + "$resolver": "powertools", + "field": "function_request_id" + }, + "function_version": { + "$resolver": "powertools", + "field": "function_version" + }, + "sampling_rate": { + "$resolver": "powertools", + "field": "sampling_rate" + }, + "service": { + "$resolver": "powertools", + "field": "service" + }, + "timestamp": { + "$resolver": "timestamp", + "pattern": { + "format": "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'" + } + }, + "xray_trace_id": { + "$resolver": "powertools", + "field": "xray_trace_id" + }, + "": { + "$resolver": "powertools" + } +} \ No newline at end of file diff --git a/examples/powertools-examples-batch/src/main/resources/log4j2.xml b/examples/powertools-examples-batch/src/main/resources/log4j2.xml index ea3ecf474..c48ca3ef4 100644 --- a/examples/powertools-examples-batch/src/main/resources/log4j2.xml +++ 
b/examples/powertools-examples-batch/src/main/resources/log4j2.xml @@ -2,7 +2,8 @@ - + + diff --git a/examples/powertools-examples-cloudformation/pom.xml b/examples/powertools-examples-cloudformation/pom.xml index 0aa8fab83..fa6a9136c 100644 --- a/examples/powertools-examples-cloudformation/pom.xml +++ b/examples/powertools-examples-cloudformation/pom.xml @@ -14,7 +14,7 @@ 11 1.2.3 3.11.3 - 2.25.35 + 2.26.28 1.9.20.1 diff --git a/pom.xml b/pom.xml index 140418095..bec482d8e 100644 --- a/pom.xml +++ b/pom.xml @@ -70,8 +70,8 @@ 2.20.0 2.23.1 2.0.7 - 2.17.1 - 2.25.35 + 2.17.2 + 2.26.28 2.16.0 2.2.0 UTF-8 @@ -287,7 +287,7 @@ org.apache.commons commons-lang3 - 3.13.0 + 3.15.0 test @@ -299,7 +299,7 @@ org.assertj assertj-core - 3.25.3 + 3.26.3 test diff --git a/powertools-batch/pom.xml b/powertools-batch/pom.xml index 665fa256a..819c19927 100644 --- a/powertools-batch/pom.xml +++ b/powertools-batch/pom.xml @@ -21,6 +21,16 @@ true + + org.apache.maven.plugins + maven-surefire-plugin + + + + -Djava.util.concurrent.ForkJoinPool.common.parallelism=4 + + + @@ -47,6 +57,12 @@ junit-jupiter-api test + + org.slf4j + slf4j-simple + 2.0.7 + test + org.assertj assertj-core diff --git a/powertools-batch/src/main/java/software/amazon/lambda/powertools/batch/handler/BatchMessageHandler.java b/powertools-batch/src/main/java/software/amazon/lambda/powertools/batch/handler/BatchMessageHandler.java index 730211feb..18d74bb25 100644 --- a/powertools-batch/src/main/java/software/amazon/lambda/powertools/batch/handler/BatchMessageHandler.java +++ b/powertools-batch/src/main/java/software/amazon/lambda/powertools/batch/handler/BatchMessageHandler.java @@ -33,6 +33,21 @@ public interface BatchMessageHandler { * @param context The lambda context * @return A partial batch response */ - public abstract R processBatch(E event, Context context); + R processBatch(E event, Context context); + /** + * Processes the given batch in parallel returning a partial batch + * response indicating the success and failure of individual + * messages within the batch.
+ * Note that parallel processing is not always better than sequential processing, + * and you should benchmark your code to determine the best approach for your use case.
+ * Also note that to get more threads available (more vCPUs), + * you need to increase the amount of memory allocated to your Lambda function.
+ + * + * @param event The Lambda event containing the batch to process + * @param context The lambda context + * @return A partial batch response + */ + R processBatchInParallel(E event, Context context); } diff --git a/powertools-batch/src/main/java/software/amazon/lambda/powertools/batch/handler/DynamoDbBatchMessageHandler.java b/powertools-batch/src/main/java/software/amazon/lambda/powertools/batch/handler/DynamoDbBatchMessageHandler.java index 83a8bf7dd..4b03d0947 100644 --- a/powertools-batch/src/main/java/software/amazon/lambda/powertools/batch/handler/DynamoDbBatchMessageHandler.java +++ b/powertools-batch/src/main/java/software/amazon/lambda/powertools/batch/handler/DynamoDbBatchMessageHandler.java @@ -17,12 +17,14 @@ import com.amazonaws.services.lambda.runtime.Context; import com.amazonaws.services.lambda.runtime.events.DynamodbEvent; import com.amazonaws.services.lambda.runtime.events.StreamsEventResponse; -import java.util.ArrayList; import java.util.List; +import java.util.Optional; import java.util.function.BiConsumer; import java.util.function.Consumer; +import java.util.stream.Collectors; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import software.amazon.lambda.powertools.batch.internal.MultiThreadMDC; /** * A batch message processor for DynamoDB Streams batches. @@ -46,35 +48,60 @@ public DynamoDbBatchMessageHandler(Consumer @Override public StreamsEventResponse processBatch(DynamodbEvent event, Context context) { - List batchFailures = new ArrayList<>(); + List batchItemFailures = event.getRecords() + .stream() + .map(eventRecord -> processBatchItem(eventRecord, context)) + .filter(Optional::isPresent) + .map(Optional::get) + .collect(Collectors.toList()); - for (DynamodbEvent.DynamodbStreamRecord record : event.getRecords()) { - try { + return StreamsEventResponse.builder().withBatchItemFailures(batchItemFailures).build(); + } - rawMessageHandler.accept(record, context); - // Report success if we have a handler - if (this.successHandler != null) { - this.successHandler.accept(record); - } - } catch (Throwable t) { - String sequenceNumber = record.getDynamodb().getSequenceNumber(); - LOGGER.error("Error while processing record with id {}: {}, adding it to batch item failures", - sequenceNumber, t.getMessage()); - LOGGER.error("Error was", t); - batchFailures.add(new StreamsEventResponse.BatchItemFailure(sequenceNumber)); + @Override + public StreamsEventResponse processBatchInParallel(DynamodbEvent event, Context context) { + MultiThreadMDC multiThreadMDC = new MultiThreadMDC(); + + List batchItemFailures = event.getRecords() + .parallelStream() // Parallel processing + .map(eventRecord -> { + multiThreadMDC.copyMDCToThread(Thread.currentThread().getName()); + return processBatchItem(eventRecord, context); + }) + .filter(Optional::isPresent) + .map(Optional::get) + .collect(Collectors.toList()); + + return StreamsEventResponse.builder().withBatchItemFailures(batchItemFailures).build(); + } - // Report failure if we have a handler - if (this.failureHandler != null) { - // A failing failure handler is no reason to fail the batch - try { - this.failureHandler.accept(record, t); - } catch (Throwable t2) { - LOGGER.warn("failureHandler threw handling failure", t2); - } + private Optional processBatchItem(DynamodbEvent.DynamodbStreamRecord streamRecord, Context context) { + try { + LOGGER.debug("Processing item {}", streamRecord.getEventID()); + + rawMessageHandler.accept(streamRecord, context); + + // Report success if we have a handler + if (this.successHandler 
!= null) { + this.successHandler.accept(streamRecord); + } + return Optional.empty(); + } catch (Throwable t) { + String sequenceNumber = streamRecord.getDynamodb().getSequenceNumber(); + LOGGER.error("Error while processing record with id {}: {}, adding it to batch item failures", + sequenceNumber, t.getMessage()); + LOGGER.error("Error was", t); + + // Report failure if we have a handler + if (this.failureHandler != null) { + // A failing failure handler is no reason to fail the batch + try { + this.failureHandler.accept(streamRecord, t); + } catch (Throwable t2) { + LOGGER.warn("failureHandler threw handling failure", t2); } } + return Optional.of(StreamsEventResponse.BatchItemFailure.builder().withItemIdentifier(sequenceNumber).build()); } - - return new StreamsEventResponse(batchFailures); } } diff --git a/powertools-batch/src/main/java/software/amazon/lambda/powertools/batch/handler/KinesisStreamsBatchMessageHandler.java b/powertools-batch/src/main/java/software/amazon/lambda/powertools/batch/handler/KinesisStreamsBatchMessageHandler.java index ad1dd302d..7b4179de7 100644 --- a/powertools-batch/src/main/java/software/amazon/lambda/powertools/batch/handler/KinesisStreamsBatchMessageHandler.java +++ b/powertools-batch/src/main/java/software/amazon/lambda/powertools/batch/handler/KinesisStreamsBatchMessageHandler.java @@ -18,12 +18,14 @@ import com.amazonaws.services.lambda.runtime.Context; import com.amazonaws.services.lambda.runtime.events.KinesisEvent; import com.amazonaws.services.lambda.runtime.events.StreamsEventResponse; -import java.util.ArrayList; import java.util.List; +import java.util.Optional; import java.util.function.BiConsumer; import java.util.function.Consumer; +import java.util.stream.Collectors; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import software.amazon.lambda.powertools.batch.internal.MultiThreadMDC; import software.amazon.lambda.powertools.utilities.EventDeserializer; /** @@ -57,42 +59,67 @@ public KinesisStreamsBatchMessageHandler(BiConsumer batchFailures = new ArrayList<>(); - - for (KinesisEvent.KinesisEventRecord record : event.getRecords()) { - try { - if (this.rawMessageHandler != null) { - rawMessageHandler.accept(record, context); - } else { - M messageDeserialized = EventDeserializer.extractDataFrom(record).as(messageClass); - messageHandler.accept(messageDeserialized, context); - } + List batchItemFailures = event.getRecords() + .stream() + .map(eventRecord -> processBatchItem(eventRecord, context)) + .filter(Optional::isPresent) + .map(Optional::get) + .collect(Collectors.toList()); - // Report success if we have a handler - if (this.successHandler != null) { - this.successHandler.accept(record); - } - } catch (Throwable t) { - String sequenceNumber = record.getEventID(); - LOGGER.error("Error while processing record with eventID {}: {}, adding it to batch item failures", - sequenceNumber, t.getMessage()); - LOGGER.error("Error was", t); - - batchFailures.add(new StreamsEventResponse.BatchItemFailure(record.getKinesis().getSequenceNumber())); - - // Report failure if we have a handler - if (this.failureHandler != null) { - // A failing failure handler is no reason to fail the batch - try { - this.failureHandler.accept(record, t); - } catch (Throwable t2) { - LOGGER.warn("failureHandler threw handling failure", t2); - } + return StreamsEventResponse.builder().withBatchItemFailures(batchItemFailures).build(); + } + + @Override + public StreamsEventResponse processBatchInParallel(KinesisEvent event, Context context) { + MultiThreadMDC 
multiThreadMDC = new MultiThreadMDC(); + + List batchItemFailures = event.getRecords() + .parallelStream() // Parallel processing + .map(eventRecord -> { + multiThreadMDC.copyMDCToThread(Thread.currentThread().getName()); + return processBatchItem(eventRecord, context); + }) + .filter(Optional::isPresent) + .map(Optional::get) + .collect(Collectors.toList()); + + return StreamsEventResponse.builder().withBatchItemFailures(batchItemFailures).build(); + } + + private Optional processBatchItem(KinesisEvent.KinesisEventRecord eventRecord, Context context) { + try { + LOGGER.debug("Processing item {}", eventRecord.getEventID()); + + if (this.rawMessageHandler != null) { + rawMessageHandler.accept(eventRecord, context); + } else { + M messageDeserialized = EventDeserializer.extractDataFrom(eventRecord).as(messageClass); + messageHandler.accept(messageDeserialized, context); + } + + // Report success if we have a handler + if (this.successHandler != null) { + this.successHandler.accept(eventRecord); + } + return Optional.empty(); + } catch (Throwable t) { + String sequenceNumber = eventRecord.getEventID(); + LOGGER.error("Error while processing record with eventID {}: {}, adding it to batch item failures", + sequenceNumber, t.getMessage()); + LOGGER.error("Error was", t); + + // Report failure if we have a handler + if (this.failureHandler != null) { + // A failing failure handler is no reason to fail the batch + try { + this.failureHandler.accept(eventRecord, t); + } catch (Throwable t2) { + LOGGER.warn("failureHandler threw handling failure", t2); } } - } - return new StreamsEventResponse(batchFailures); + return Optional.of(StreamsEventResponse.BatchItemFailure.builder().withItemIdentifier(eventRecord.getKinesis().getSequenceNumber()).build()); + } } } diff --git a/powertools-batch/src/main/java/software/amazon/lambda/powertools/batch/handler/SqsBatchMessageHandler.java b/powertools-batch/src/main/java/software/amazon/lambda/powertools/batch/handler/SqsBatchMessageHandler.java index b634f9b62..2dfb0a28e 100644 --- a/powertools-batch/src/main/java/software/amazon/lambda/powertools/batch/handler/SqsBatchMessageHandler.java +++ b/powertools-batch/src/main/java/software/amazon/lambda/powertools/batch/handler/SqsBatchMessageHandler.java @@ -18,10 +18,15 @@ import com.amazonaws.services.lambda.runtime.events.SQSBatchResponse; import com.amazonaws.services.lambda.runtime.events.SQSEvent; import java.util.ArrayList; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.BiConsumer; import java.util.function.Consumer; +import java.util.stream.Collectors; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import software.amazon.lambda.powertools.batch.internal.MultiThreadMDC; import software.amazon.lambda.powertools.utilities.EventDeserializer; /** @@ -61,57 +66,27 @@ public SQSBatchResponse processBatch(SQSEvent event, Context context) { // If we are working on a FIFO queue, when any message fails we should stop processing and return the // rest of the batch as failed too. We use this variable to track when that has happened. 
// https://docs.aws.amazon.com/lambda/latest/dg/with-sqs.html#services-sqs-batchfailurereporting - boolean failWholeBatch = false; + final AtomicBoolean failWholeBatch = new AtomicBoolean(false); int messageCursor = 0; - for (; messageCursor < event.getRecords().size() && !failWholeBatch; messageCursor++) { + for (; messageCursor < event.getRecords().size() && !failWholeBatch.get(); messageCursor++) { SQSEvent.SQSMessage message = event.getRecords().get(messageCursor); String messageGroupId = message.getAttributes() != null ? message.getAttributes().get(MESSAGE_GROUP_ID_KEY) : null; - try { - if (this.rawMessageHandler != null) { - rawMessageHandler.accept(message, context); - } else { - M messageDeserialized = EventDeserializer.extractDataFrom(message).as(messageClass); - messageHandler.accept(messageDeserialized, context); - } - - // Report success if we have a handler - if (this.successHandler != null) { - this.successHandler.accept(message); - } - - } catch (Throwable t) { - LOGGER.error("Error while processing message with messageId {}: {}, adding it to batch item failures", - message.getMessageId(), t.getMessage()); - LOGGER.error("Error was", t); - - response.getBatchItemFailures() - .add(SQSBatchResponse.BatchItemFailure.builder().withItemIdentifier(message.getMessageId()) - .build()); + processBatchItem(message, context).ifPresent(batchItemFailure -> { + response.getBatchItemFailures().add(batchItemFailure); if (messageGroupId != null) { - failWholeBatch = true; + failWholeBatch.set(true); LOGGER.info( "A message in a batch with messageGroupId {} and messageId {} failed; failing the rest of the batch too" , messageGroupId, message.getMessageId()); } - - // Report failure if we have a handler - if (this.failureHandler != null) { - // A failing failure handler is no reason to fail the batch - try { - this.failureHandler.accept(message, t); - } catch (Throwable t2) { - LOGGER.warn("failureHandler threw handling failure", t2); - } - } - - } + }); } - if (failWholeBatch) { + if (failWholeBatch.get()) { // Add the remaining messages to the batch item failures event.getRecords() .subList(messageCursor, event.getRecords().size()) @@ -121,4 +96,60 @@ public SQSBatchResponse processBatch(SQSEvent event, Context context) { } return response; } + + @Override + public SQSBatchResponse processBatchInParallel(SQSEvent event, Context context) { + if (!event.getRecords().isEmpty() && event.getRecords().get(0).getAttributes().get(MESSAGE_GROUP_ID_KEY) != null) { + throw new UnsupportedOperationException("FIFO queues are not supported in parallel mode, use the processBatch method instead"); + } + + MultiThreadMDC multiThreadMDC = new MultiThreadMDC(); + List batchItemFailures = event.getRecords() + .parallelStream() // Parallel processing + .map(sqsMessage -> { + + multiThreadMDC.copyMDCToThread(Thread.currentThread().getName()); + return processBatchItem(sqsMessage, context); + }) + .filter(Optional::isPresent) + .map(Optional::get) + .collect(Collectors.toList()); + + return SQSBatchResponse.builder().withBatchItemFailures(batchItemFailures).build(); + } + + private Optional processBatchItem(SQSEvent.SQSMessage message, Context context) { + try { + LOGGER.debug("Processing message {}", message.getMessageId()); + + if (this.rawMessageHandler != null) { + rawMessageHandler.accept(message, context); + } else { + M messageDeserialized = EventDeserializer.extractDataFrom(message).as(messageClass); + messageHandler.accept(messageDeserialized, context); + } + + // Report success if we have a handler + if 
(this.successHandler != null) { + this.successHandler.accept(message); + } + return Optional.empty(); + } catch (Throwable t) { + LOGGER.error("Error while processing message with messageId {}: {}, adding it to batch item failures", + message.getMessageId(), t.getMessage()); + LOGGER.error("Error was", t); + + // Report failure if we have a handler + if (this.failureHandler != null) { + // A failing failure handler is no reason to fail the batch + try { + this.failureHandler.accept(message, t); + } catch (Throwable t2) { + LOGGER.warn("failureHandler threw handling failure", t2); + } + } + return Optional.of(SQSBatchResponse.BatchItemFailure.builder().withItemIdentifier(message.getMessageId()) + .build()); + } + } } diff --git a/powertools-batch/src/main/java/software/amazon/lambda/powertools/batch/internal/MultiThreadMDC.java b/powertools-batch/src/main/java/software/amazon/lambda/powertools/batch/internal/MultiThreadMDC.java new file mode 100644 index 000000000..df1c2e7a0 --- /dev/null +++ b/powertools-batch/src/main/java/software/amazon/lambda/powertools/batch/internal/MultiThreadMDC.java @@ -0,0 +1,47 @@ +/* + * Copyright 2023 Amazon.com, Inc. or its affiliates. + * Licensed under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package software.amazon.lambda.powertools.batch.internal; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.slf4j.MDC; + +/** + * MDC (SLF4J) is not passed to other threads (ThreadLocal). + * This class permits to manually copy the MDC to a given thread. 
+ */ +public class MultiThreadMDC { + + private static final Logger LOGGER = LoggerFactory.getLogger(MultiThreadMDC.class); + + private final List mdcAwareThreads = new ArrayList<>(); + private final Map contextMap; + + public MultiThreadMDC() { + mdcAwareThreads.add("main"); + contextMap = MDC.getCopyOfContextMap(); + } + + public void copyMDCToThread(String thread) { + if (!mdcAwareThreads.contains(thread)) { + LOGGER.debug("Copy MDC to thread {}", thread); + MDC.setContextMap(contextMap); + mdcAwareThreads.add(thread); + } + } +} diff --git a/powertools-batch/src/test/java/software/amazon/lambda/powertools/batch/DdbBatchProcessorTest.java b/powertools-batch/src/test/java/software/amazon/lambda/powertools/batch/DdbBatchProcessorTest.java index 9e2c211e2..6bb247323 100644 --- a/powertools-batch/src/test/java/software/amazon/lambda/powertools/batch/DdbBatchProcessorTest.java +++ b/powertools-batch/src/test/java/software/amazon/lambda/powertools/batch/DdbBatchProcessorTest.java @@ -20,16 +20,27 @@ import com.amazonaws.services.lambda.runtime.events.DynamodbEvent; import com.amazonaws.services.lambda.runtime.events.StreamsEventResponse; import com.amazonaws.services.lambda.runtime.tests.annotations.Event; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; import java.util.concurrent.atomic.AtomicBoolean; +import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.params.ParameterizedTest; import org.mockito.Mock; import software.amazon.lambda.powertools.batch.handler.BatchMessageHandler; -public class DdbBatchProcessorTest { +class DdbBatchProcessorTest { @Mock private Context context; + private final List threadList = Collections.synchronizedList(new ArrayList<>()); + + @AfterEach + public void clear() { + threadList.clear(); + } + private void processMessageSucceeds(DynamodbEvent.DynamodbStreamRecord record, Context context) { // Great success } @@ -40,9 +51,36 @@ private void processMessageFailsForFixedMessage(DynamodbEvent.DynamodbStreamReco } } + private void processMessageInParallelSucceeds(DynamodbEvent.DynamodbStreamRecord record, Context context) { + String thread = Thread.currentThread().getName(); + if (!threadList.contains(thread)) { + threadList.add(thread); + } + try { + Thread.sleep(500); // simulate some processing + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + + private void processMessageInParallelFailsForFixedMessage(DynamodbEvent.DynamodbStreamRecord record, Context context) { + String thread = Thread.currentThread().getName(); + if (!threadList.contains(thread)) { + threadList.add(thread); + } + try { + Thread.sleep(500); // simulate some processing + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + if (record.getDynamodb().getSequenceNumber().equals("4421584500000000017450439091")) { + throw new RuntimeException("fake exception"); + } + } + @ParameterizedTest @Event(value = "dynamo_event.json", type = DynamodbEvent.class) - public void batchProcessingSucceedsAndReturns(DynamodbEvent event) { + void batchProcessingSucceedsAndReturns(DynamodbEvent event) { // Arrange BatchMessageHandler handler = new BatchMessageHandlerBuilder() .withDynamoDbBatchHandler() @@ -52,12 +90,28 @@ public void batchProcessingSucceedsAndReturns(DynamodbEvent event) { StreamsEventResponse dynamodbBatchResponse = handler.processBatch(event, context); // Assert - assertThat(dynamodbBatchResponse.getBatchItemFailures()).hasSize(0); + assertThat(dynamodbBatchResponse.getBatchItemFailures()).isEmpty(); + } 
+ + @ParameterizedTest + @Event(value = "dynamo_event_big.json", type = DynamodbEvent.class) + void parallelBatchProcessingSucceedsAndReturns(DynamodbEvent event) { + // Arrange + BatchMessageHandler handler = new BatchMessageHandlerBuilder() + .withDynamoDbBatchHandler() + .buildWithRawMessageHandler(this::processMessageInParallelSucceeds); + + // Act + StreamsEventResponse dynamodbBatchResponse = handler.processBatchInParallel(event, context); + + // Assert + assertThat(dynamodbBatchResponse.getBatchItemFailures()).isEmpty(); + assertThat(threadList).hasSizeGreaterThan(1); } @ParameterizedTest @Event(value = "dynamo_event.json", type = DynamodbEvent.class) - public void shouldAddMessageToBatchFailure_whenException_withMessage(DynamodbEvent event) { + void shouldAddMessageToBatchFailure_whenException_withMessage(DynamodbEvent event) { // Arrange BatchMessageHandler handler = new BatchMessageHandlerBuilder() .withDynamoDbBatchHandler() @@ -72,9 +126,27 @@ public void shouldAddMessageToBatchFailure_whenException_withMessage(DynamodbEve assertThat(batchItemFailure.getItemIdentifier()).isEqualTo("4421584500000000017450439091"); } + @ParameterizedTest + @Event(value = "dynamo_event_big.json", type = DynamodbEvent.class) + void parallelBatchProcessing_shouldAddMessageToBatchFailure_whenException_withMessage(DynamodbEvent event) { + // Arrange + BatchMessageHandler handler = new BatchMessageHandlerBuilder() + .withDynamoDbBatchHandler() + .buildWithRawMessageHandler(this::processMessageInParallelFailsForFixedMessage); + + // Act + StreamsEventResponse dynamodbBatchResponse = handler.processBatchInParallel(event, context); + + // Assert + assertThat(dynamodbBatchResponse.getBatchItemFailures()).hasSize(1); + StreamsEventResponse.BatchItemFailure batchItemFailure = dynamodbBatchResponse.getBatchItemFailures().get(0); + assertThat(batchItemFailure.getItemIdentifier()).isEqualTo("4421584500000000017450439091"); + assertThat(threadList).hasSizeGreaterThan(1); + } + @ParameterizedTest @Event(value = "dynamo_event.json", type = DynamodbEvent.class) - public void failingFailureHandlerShouldntFailBatch(DynamodbEvent event) { + void failingFailureHandlerShouldntFailBatch(DynamodbEvent event) { // Arrange AtomicBoolean wasCalledAndFailed = new AtomicBoolean(false); BatchMessageHandler handler = new BatchMessageHandlerBuilder() @@ -92,7 +164,7 @@ public void failingFailureHandlerShouldntFailBatch(DynamodbEvent event) { // Assert assertThat(dynamodbBatchResponse).isNotNull(); - assertThat(dynamodbBatchResponse.getBatchItemFailures().size()).isEqualTo(1); + assertThat(dynamodbBatchResponse.getBatchItemFailures()).hasSize(1); assertThat(wasCalledAndFailed.get()).isTrue(); StreamsEventResponse.BatchItemFailure batchItemFailure = dynamodbBatchResponse.getBatchItemFailures().get(0); assertThat(batchItemFailure.getItemIdentifier()).isEqualTo("4421584500000000017450439091"); @@ -100,7 +172,7 @@ public void failingFailureHandlerShouldntFailBatch(DynamodbEvent event) { @ParameterizedTest @Event(value = "dynamo_event.json", type = DynamodbEvent.class) - public void failingSuccessHandlerShouldntFailBatchButShouldFailMessage(DynamodbEvent event) { + void failingSuccessHandlerShouldntFailBatchButShouldFailMessage(DynamodbEvent event) { // Arrange AtomicBoolean wasCalledAndFailed = new AtomicBoolean(false); BatchMessageHandler handler = new BatchMessageHandlerBuilder() @@ -118,7 +190,7 @@ public void failingSuccessHandlerShouldntFailBatchButShouldFailMessage(DynamodbE // Assert 
assertThat(dynamodbBatchResponse).isNotNull(); - assertThat(dynamodbBatchResponse.getBatchItemFailures().size()).isEqualTo(1); + assertThat(dynamodbBatchResponse.getBatchItemFailures()).hasSize(1); assertThat(wasCalledAndFailed.get()).isTrue(); StreamsEventResponse.BatchItemFailure batchItemFailure = dynamodbBatchResponse.getBatchItemFailures().get(0); assertThat(batchItemFailure.getItemIdentifier()).isEqualTo("4421584500000000017450439091"); diff --git a/powertools-batch/src/test/java/software/amazon/lambda/powertools/batch/KinesisBatchProcessorTest.java b/powertools-batch/src/test/java/software/amazon/lambda/powertools/batch/KinesisBatchProcessorTest.java index d78638e1d..059a4d2d0 100644 --- a/powertools-batch/src/test/java/software/amazon/lambda/powertools/batch/KinesisBatchProcessorTest.java +++ b/powertools-batch/src/test/java/software/amazon/lambda/powertools/batch/KinesisBatchProcessorTest.java @@ -20,17 +20,28 @@ import com.amazonaws.services.lambda.runtime.events.KinesisEvent; import com.amazonaws.services.lambda.runtime.events.StreamsEventResponse; import com.amazonaws.services.lambda.runtime.tests.annotations.Event; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; import java.util.concurrent.atomic.AtomicBoolean; +import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.params.ParameterizedTest; import org.mockito.Mock; import software.amazon.lambda.powertools.batch.handler.BatchMessageHandler; import software.amazon.lambda.powertools.batch.model.Product; -public class KinesisBatchProcessorTest { +class KinesisBatchProcessorTest { @Mock private Context context; + private final List threadList = Collections.synchronizedList(new ArrayList<>()); + + @AfterEach + public void clear() { + threadList.clear(); + } + private void processMessageSucceeds(KinesisEvent.KinesisEventRecord record, Context context) { // Great success } @@ -42,6 +53,34 @@ private void processMessageFailsForFixedMessage(KinesisEvent.KinesisEventRecord } } + private void processMessageInParallelSucceeds(KinesisEvent.KinesisEventRecord record, Context context) { + String thread = Thread.currentThread().getName(); + if (!threadList.contains(thread)) { + threadList.add(thread); + } + try { + Thread.sleep(500); // simulate some processing + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + + private void processMessageInParallelFailsForFixedMessage(KinesisEvent.KinesisEventRecord record, Context context) { + String thread = Thread.currentThread().getName(); + if (!threadList.contains(thread)) { + threadList.add(thread); + } + try { + Thread.sleep(500); // simulate some processing + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + if (record.getKinesis().getSequenceNumber() + .equals("49545115243490985018280067714973144582180062593244200961")) { + throw new RuntimeException("fake exception"); + } + } + // A handler that throws an exception for _one_ of the deserialized products in the same messages public void processMessageFailsForFixedProduct(Product product, Context context) { if (product.getId() == 1234) { @@ -51,7 +90,7 @@ public void processMessageFailsForFixedProduct(Product product, Context context) @ParameterizedTest @Event(value = "kinesis_event.json", type = KinesisEvent.class) - public void batchProcessingSucceedsAndReturns(KinesisEvent event) { + void batchProcessingSucceedsAndReturns(KinesisEvent event) { // Arrange BatchMessageHandler handler = new BatchMessageHandlerBuilder() .withKinesisBatchHandler() @@ 
-61,12 +100,28 @@ public void batchProcessingSucceedsAndReturns(KinesisEvent event) { StreamsEventResponse kinesisBatchResponse = handler.processBatch(event, context); // Assert - assertThat(kinesisBatchResponse.getBatchItemFailures()).hasSize(0); + assertThat(kinesisBatchResponse.getBatchItemFailures()).isEmpty(); + } + + @ParameterizedTest + @Event(value = "kinesis_event_big.json", type = KinesisEvent.class) + void batchProcessingInParallelSucceedsAndReturns(KinesisEvent event) { + // Arrange + BatchMessageHandler handler = new BatchMessageHandlerBuilder() + .withKinesisBatchHandler() + .buildWithRawMessageHandler(this::processMessageInParallelSucceeds); + + // Act + StreamsEventResponse kinesisBatchResponse = handler.processBatchInParallel(event, context); + + // Assert + assertThat(kinesisBatchResponse.getBatchItemFailures()).isEmpty(); + assertThat(threadList).hasSizeGreaterThan(1); } @ParameterizedTest @Event(value = "kinesis_event.json", type = KinesisEvent.class) - public void shouldAddMessageToBatchFailure_whenException_withMessage(KinesisEvent event) { + void shouldAddMessageToBatchFailure_whenException_withMessage(KinesisEvent event) { // Arrange BatchMessageHandler handler = new BatchMessageHandlerBuilder() .withKinesisBatchHandler() @@ -82,9 +137,28 @@ public void shouldAddMessageToBatchFailure_whenException_withMessage(KinesisEven "49545115243490985018280067714973144582180062593244200961"); } + @ParameterizedTest + @Event(value = "kinesis_event_big.json", type = KinesisEvent.class) + void batchProcessingInParallel_shouldAddMessageToBatchFailure_whenException_withMessage(KinesisEvent event) { + // Arrange + BatchMessageHandler handler = new BatchMessageHandlerBuilder() + .withKinesisBatchHandler() + .buildWithRawMessageHandler(this::processMessageInParallelFailsForFixedMessage); + + // Act + StreamsEventResponse kinesisBatchResponse = handler.processBatchInParallel(event, context); + + // Assert + assertThat(kinesisBatchResponse.getBatchItemFailures()).hasSize(1); + StreamsEventResponse.BatchItemFailure batchItemFailure = kinesisBatchResponse.getBatchItemFailures().get(0); + assertThat(batchItemFailure.getItemIdentifier()).isEqualTo( + "49545115243490985018280067714973144582180062593244200961"); + assertThat(threadList).hasSizeGreaterThan(1); + } + @ParameterizedTest @Event(value = "kinesis_event.json", type = KinesisEvent.class) - public void shouldAddMessageToBatchFailure_whenException_withProduct(KinesisEvent event) { + void shouldAddMessageToBatchFailure_whenException_withProduct(KinesisEvent event) { // Arrange BatchMessageHandler handler = new BatchMessageHandlerBuilder() .withKinesisBatchHandler() @@ -102,7 +176,7 @@ public void shouldAddMessageToBatchFailure_whenException_withProduct(KinesisEven @ParameterizedTest @Event(value = "kinesis_event.json", type = KinesisEvent.class) - public void failingFailureHandlerShouldntFailBatch(KinesisEvent event) { + void failingFailureHandlerShouldntFailBatch(KinesisEvent event) { // Arrange AtomicBoolean wasCalled = new AtomicBoolean(false); BatchMessageHandler handler = new BatchMessageHandlerBuilder() @@ -118,7 +192,7 @@ public void failingFailureHandlerShouldntFailBatch(KinesisEvent event) { // Assert assertThat(kinesisBatchResponse).isNotNull(); - assertThat(kinesisBatchResponse.getBatchItemFailures().size()).isEqualTo(1); + assertThat(kinesisBatchResponse.getBatchItemFailures()).hasSize(1); assertThat(wasCalled.get()).isTrue(); StreamsEventResponse.BatchItemFailure batchItemFailure = 
kinesisBatchResponse.getBatchItemFailures().get(0); assertThat(batchItemFailure.getItemIdentifier()).isEqualTo( @@ -127,7 +201,7 @@ public void failingFailureHandlerShouldntFailBatch(KinesisEvent event) { @ParameterizedTest @Event(value = "kinesis_event.json", type = KinesisEvent.class) - public void failingSuccessHandlerShouldntFailBatchButShouldFailMessage(KinesisEvent event) { + void failingSuccessHandlerShouldntFailBatchButShouldFailMessage(KinesisEvent event) { // Arrange AtomicBoolean wasCalledAndFailed = new AtomicBoolean(false); BatchMessageHandler handler = new BatchMessageHandlerBuilder() @@ -146,7 +220,7 @@ public void failingSuccessHandlerShouldntFailBatchButShouldFailMessage(KinesisEv // Assert assertThat(kinesisBatchResponse).isNotNull(); - assertThat(kinesisBatchResponse.getBatchItemFailures().size()).isEqualTo(1); + assertThat(kinesisBatchResponse.getBatchItemFailures()).hasSize(1); assertThat(wasCalledAndFailed.get()).isTrue(); StreamsEventResponse.BatchItemFailure batchItemFailure = kinesisBatchResponse.getBatchItemFailures().get(0); assertThat(batchItemFailure.getItemIdentifier()).isEqualTo( diff --git a/powertools-batch/src/test/java/software/amazon/lambda/powertools/batch/SQSBatchProcessorTest.java b/powertools-batch/src/test/java/software/amazon/lambda/powertools/batch/SQSBatchProcessorTest.java index 2f9429fa3..7dd51374e 100644 --- a/powertools-batch/src/test/java/software/amazon/lambda/powertools/batch/SQSBatchProcessorTest.java +++ b/powertools-batch/src/test/java/software/amazon/lambda/powertools/batch/SQSBatchProcessorTest.java @@ -20,20 +20,43 @@ import com.amazonaws.services.lambda.runtime.events.SQSBatchResponse; import com.amazonaws.services.lambda.runtime.events.SQSEvent; import com.amazonaws.services.lambda.runtime.tests.annotations.Event; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; import java.util.concurrent.atomic.AtomicBoolean; +import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.params.ParameterizedTest; import org.mockito.Mock; import software.amazon.lambda.powertools.batch.handler.BatchMessageHandler; import software.amazon.lambda.powertools.batch.model.Product; -public class SQSBatchProcessorTest { +class SQSBatchProcessorTest { @Mock private Context context; + private final List threadList = Collections.synchronizedList(new ArrayList<>()); + + @AfterEach + public void clear() { + threadList.clear(); + } + // A handler that works private void processMessageSucceeds(SQSEvent.SQSMessage sqsMessage) { } + private void processMessageInParallelSucceeds(SQSEvent.SQSMessage sqsMessage) { + String thread = Thread.currentThread().getName(); + if (!threadList.contains(thread)) { + threadList.add(thread); + } + try { + Thread.sleep(500); // simulate some processing + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + // A handler that throws an exception for _one_ of the sample messages private void processMessageFailsForFixedMessage(SQSEvent.SQSMessage message, Context context) { if (message.getMessageId().equals("e9144555-9a4f-4ec3-99a0-34ce359b4b54")) { @@ -41,8 +64,23 @@ private void processMessageFailsForFixedMessage(SQSEvent.SQSMessage message, Con } } + private void processMessageInParallelFailsForFixedMessage(SQSEvent.SQSMessage message, Context context) { + String thread = Thread.currentThread().getName(); + if (!threadList.contains(thread)) { + threadList.add(thread); + } + try { + Thread.sleep(500); // simulate some processing + } catch (InterruptedException e) { + throw 
new RuntimeException(e); + } + if (message.getMessageId().equals("e9144555-9a4f-4ec3-99a0-34ce359b4b54")) { + throw new RuntimeException("fake exception"); + } + } + // A handler that throws an exception for _one_ of the deserialized products in the same messages - public void processMessageFailsForFixedProduct(Product product, Context context) { + private void processMessageFailsForFixedProduct(Product product, Context context) { if (product.getId() == 12345) { throw new RuntimeException("fake exception"); } @@ -50,7 +88,7 @@ public void processMessageFailsForFixedProduct(Product product, Context context) @ParameterizedTest @Event(value = "sqs_event.json", type = SQSEvent.class) - public void batchProcessingSucceedsAndReturns(SQSEvent event) { + void batchProcessingSucceedsAndReturns(SQSEvent event) { // Arrange BatchMessageHandler handler = new BatchMessageHandlerBuilder() .withSqsBatchHandler() @@ -60,13 +98,28 @@ public void batchProcessingSucceedsAndReturns(SQSEvent event) { SQSBatchResponse sqsBatchResponse = handler.processBatch(event, context); // Assert - assertThat(sqsBatchResponse.getBatchItemFailures()).hasSize(0); + assertThat(sqsBatchResponse.getBatchItemFailures()).isEmpty(); } + @ParameterizedTest + @Event(value = "sqs_event_big.json", type = SQSEvent.class) + void parallelBatchProcessingSucceedsAndReturns(SQSEvent event) { + // Arrange + BatchMessageHandler handler = new BatchMessageHandlerBuilder() + .withSqsBatchHandler() + .buildWithRawMessageHandler(this::processMessageInParallelSucceeds); + + // Act + SQSBatchResponse sqsBatchResponse = handler.processBatchInParallel(event, context); + + // Assert + assertThat(sqsBatchResponse.getBatchItemFailures()).isEmpty(); + assertThat(threadList).hasSizeGreaterThan(1); + } @ParameterizedTest @Event(value = "sqs_event.json", type = SQSEvent.class) - public void shouldAddMessageToBatchFailure_whenException_withMessage(SQSEvent event) { + void shouldAddMessageToBatchFailure_whenException_withMessage(SQSEvent event) { // Arrange BatchMessageHandler handler = new BatchMessageHandlerBuilder() .withSqsBatchHandler() @@ -81,9 +134,27 @@ public void shouldAddMessageToBatchFailure_whenException_withMessage(SQSEvent ev assertThat(batchItemFailure.getItemIdentifier()).isEqualTo("e9144555-9a4f-4ec3-99a0-34ce359b4b54"); } + @ParameterizedTest + @Event(value = "sqs_event_big.json", type = SQSEvent.class) + void parallelBatchProcessing_shouldAddMessageToBatchFailure_whenException_withMessage(SQSEvent event) { + // Arrange + BatchMessageHandler handler = new BatchMessageHandlerBuilder() + .withSqsBatchHandler() + .buildWithRawMessageHandler(this::processMessageInParallelFailsForFixedMessage); + + // Act + SQSBatchResponse sqsBatchResponse = handler.processBatchInParallel(event, context); + + // Assert + assertThat(sqsBatchResponse.getBatchItemFailures()).hasSize(1); + SQSBatchResponse.BatchItemFailure batchItemFailure = sqsBatchResponse.getBatchItemFailures().get(0); + assertThat(batchItemFailure.getItemIdentifier()).isEqualTo("e9144555-9a4f-4ec3-99a0-34ce359b4b54"); + assertThat(threadList).hasSizeGreaterThan(1); + } + @ParameterizedTest @Event(value = "sqs_fifo_event.json", type = SQSEvent.class) - public void shouldAddMessageToBatchFailure_whenException_withSQSFIFO(SQSEvent event) { + void shouldAddMessageToBatchFailure_whenException_withSQSFIFO(SQSEvent event) { // Arrange BatchMessageHandler handler = new BatchMessageHandlerBuilder() .withSqsBatchHandler() @@ -103,7 +174,7 @@ public void 
shouldAddMessageToBatchFailure_whenException_withSQSFIFO(SQSEvent ev @ParameterizedTest @Event(value = "sqs_event.json", type = SQSEvent.class) - public void shouldAddMessageToBatchFailure_whenException_withProduct(SQSEvent event) { + void shouldAddMessageToBatchFailure_whenException_withProduct(SQSEvent event) { // Arrange BatchMessageHandler handler = new BatchMessageHandlerBuilder() @@ -121,7 +192,7 @@ public void shouldAddMessageToBatchFailure_whenException_withProduct(SQSEvent ev @ParameterizedTest @Event(value = "sqs_event.json", type = SQSEvent.class) - public void failingFailureHandlerShouldntFailBatch(SQSEvent event) { + void failingFailureHandlerShouldntFailBatch(SQSEvent event) { // Arrange AtomicBoolean wasCalled = new AtomicBoolean(false); BatchMessageHandler handler = new BatchMessageHandlerBuilder() @@ -144,7 +215,7 @@ public void failingFailureHandlerShouldntFailBatch(SQSEvent event) { @ParameterizedTest @Event(value = "sqs_event.json", type = SQSEvent.class) - public void failingSuccessHandlerShouldntFailBatchButShouldFailMessage(SQSEvent event) { + void failingSuccessHandlerShouldntFailBatchButShouldFailMessage(SQSEvent event) { // Arrange AtomicBoolean wasCalledAndFailed = new AtomicBoolean(false); BatchMessageHandler handler = new BatchMessageHandlerBuilder() diff --git a/powertools-batch/src/test/resources/dynamo_event_big.json b/powertools-batch/src/test/resources/dynamo_event_big.json new file mode 100644 index 000000000..fa0a75c24 --- /dev/null +++ b/powertools-batch/src/test/resources/dynamo_event_big.json @@ -0,0 +1,376 @@ +{ + "Records": [ + { + "eventID": "c4ca4238a0b923820dcc509a6f75849b", + "eventName": "INSERT", + "eventVersion": "1.1", + "eventSource": "aws:dynamodb", + "awsRegion": "eu-central-1", + "dynamodb": { + "Keys": { + "Id": { + "N": "101" + } + }, + "NewImage": { + "Message": { + "S": "New item!" + }, + "Id": { + "N": "101" + } + }, + "ApproximateCreationDateTime": 1428537600, + "SequenceNumber": "4421584500000000017450439001", + "SizeBytes": 26, + "StreamViewType": "NEW_AND_OLD_IMAGES" + }, + "eventSourceARN": "arn:aws:dynamodb:eu-central-1:123456789012:table/ExampleTableWithStream/stream/2015-06-27T00:48:05.899", + "userIdentity": { + "principalId": "dynamodb.amazonaws.com", + "type": "Service" + } + }, + { + "eventID": "c81e728d9d4c2f636f067f89cc14862c", + "eventName": "MODIFY", + "eventVersion": "1.1", + "eventSource": "aws:dynamodb", + "awsRegion": "eu-central-1", + "dynamodb": { + "Keys": { + "Id": { + "N": "101" + } + }, + "NewImage": { + "Message": { + "S": "This item has changed" + }, + "Id": { + "N": "101" + } + }, + "OldImage": { + "Message": { + "S": "New item!" 
+ }, + "Id": { + "N": "101" + } + }, + "ApproximateCreationDateTime": 1428537600, + "SequenceNumber": "4421584500000000017450439092", + "SizeBytes": 59, + "StreamViewType": "NEW_AND_OLD_IMAGES" + }, + "eventSourceARN": "arn:aws:dynamodb:eu-central-1:123456789012:table/ExampleTableWithStream/stream/2015-06-27T00:48:05.899" + }, + { + "eventID": "eccbc87e4b5ce2fe28308fd9f2a7baf3", + "eventName": "REMOVE", + "eventVersion": "1.1", + "eventSource": "aws:dynamodb", + "awsRegion": "eu-central-1", + "dynamodb": { + "Keys": { + "Id": { + "N": "101" + } + }, + "OldImage": { + "Message": { + "S": "This item has changed" + }, + "Id": { + "N": "101" + } + }, + "ApproximateCreationDateTime": 1428537600, + "SequenceNumber": "4421584500000000017450439093", + "SizeBytes": 38, + "StreamViewType": "NEW_AND_OLD_IMAGES" + }, + "eventSourceARN": "arn:aws:dynamodb:eu-central-1:123456789012:table/ExampleTableWithStream/stream/2015-06-27T00:48:05.899" + }, + { + "eventID": "c4ca4238a0b923820dcc509a6f75849b", + "eventName": "INSERT", + "eventVersion": "1.1", + "eventSource": "aws:dynamodb", + "awsRegion": "eu-central-1", + "dynamodb": { + "Keys": { + "Id": { + "N": "101" + } + }, + "NewImage": { + "Message": { + "S": "New item!" + }, + "Id": { + "N": "101" + } + }, + "ApproximateCreationDateTime": 1428537600, + "SequenceNumber": "4421584500000000017450439031", + "SizeBytes": 26, + "StreamViewType": "NEW_AND_OLD_IMAGES" + }, + "eventSourceARN": "arn:aws:dynamodb:eu-central-1:123456789012:table/ExampleTableWithStream/stream/2015-06-27T00:48:05.899", + "userIdentity": { + "principalId": "dynamodb.amazonaws.com", + "type": "Service" + } + }, + { + "eventID": "c81e728d9d4c2f636f067f89cc14862c", + "eventName": "MODIFY", + "eventVersion": "1.1", + "eventSource": "aws:dynamodb", + "awsRegion": "eu-central-1", + "dynamodb": { + "Keys": { + "Id": { + "N": "101" + } + }, + "NewImage": { + "Message": { + "S": "This item has changed" + }, + "Id": { + "N": "101" + } + }, + "OldImage": { + "Message": { + "S": "New item!" + }, + "Id": { + "N": "101" + } + }, + "ApproximateCreationDateTime": 1428537600, + "SequenceNumber": "4421584500000000017450439092", + "SizeBytes": 59, + "StreamViewType": "NEW_AND_OLD_IMAGES" + }, + "eventSourceARN": "arn:aws:dynamodb:eu-central-1:123456789012:table/ExampleTableWithStream/stream/2015-06-27T00:48:05.899" + }, + { + "eventID": "eccbc87e4b5ce2fe28308fd9f2a7baf3", + "eventName": "REMOVE", + "eventVersion": "1.1", + "eventSource": "aws:dynamodb", + "awsRegion": "eu-central-1", + "dynamodb": { + "Keys": { + "Id": { + "N": "101" + } + }, + "OldImage": { + "Message": { + "S": "This item has changed" + }, + "Id": { + "N": "101" + } + }, + "ApproximateCreationDateTime": 1428537600, + "SequenceNumber": "4421584500000000017450439093", + "SizeBytes": 38, + "StreamViewType": "NEW_AND_OLD_IMAGES" + }, + "eventSourceARN": "arn:aws:dynamodb:eu-central-1:123456789012:table/ExampleTableWithStream/stream/2015-06-27T00:48:05.899" + }, + { + "eventID": "c4ca4238a0b923820dcc509a6f75849b", + "eventName": "INSERT", + "eventVersion": "1.1", + "eventSource": "aws:dynamodb", + "awsRegion": "eu-central-1", + "dynamodb": { + "Keys": { + "Id": { + "N": "101" + } + }, + "NewImage": { + "Message": { + "S": "New item!" 
+ }, + "Id": { + "N": "101" + } + }, + "ApproximateCreationDateTime": 1428537600, + "SequenceNumber": "4421584500000000017450439001", + "SizeBytes": 26, + "StreamViewType": "NEW_AND_OLD_IMAGES" + }, + "eventSourceARN": "arn:aws:dynamodb:eu-central-1:123456789012:table/ExampleTableWithStream/stream/2015-06-27T00:48:05.899", + "userIdentity": { + "principalId": "dynamodb.amazonaws.com", + "type": "Service" + } + }, + { + "eventID": "c81e728d9d4c2f636f067f89cc14862c", + "eventName": "MODIFY", + "eventVersion": "1.1", + "eventSource": "aws:dynamodb", + "awsRegion": "eu-central-1", + "dynamodb": { + "Keys": { + "Id": { + "N": "101" + } + }, + "NewImage": { + "Message": { + "S": "This item has changed" + }, + "Id": { + "N": "101" + } + }, + "OldImage": { + "Message": { + "S": "New item!" + }, + "Id": { + "N": "101" + } + }, + "ApproximateCreationDateTime": 1428537600, + "SequenceNumber": "4421584500000000017450439092", + "SizeBytes": 59, + "StreamViewType": "NEW_AND_OLD_IMAGES" + }, + "eventSourceARN": "arn:aws:dynamodb:eu-central-1:123456789012:table/ExampleTableWithStream/stream/2015-06-27T00:48:05.899" + }, + { + "eventID": "eccbc87e4b5ce2fe28308fd9f2a7baf3", + "eventName": "REMOVE", + "eventVersion": "1.1", + "eventSource": "aws:dynamodb", + "awsRegion": "eu-central-1", + "dynamodb": { + "Keys": { + "Id": { + "N": "101" + } + }, + "OldImage": { + "Message": { + "S": "This item has changed" + }, + "Id": { + "N": "101" + } + }, + "ApproximateCreationDateTime": 1428537600, + "SequenceNumber": "4421584500000000017450439093", + "SizeBytes": 38, + "StreamViewType": "NEW_AND_OLD_IMAGES" + }, + "eventSourceARN": "arn:aws:dynamodb:eu-central-1:123456789012:table/ExampleTableWithStream/stream/2015-06-27T00:48:05.899" + }, + { + "eventID": "c4ca4238a0b923820dcc509a6f75849b", + "eventName": "INSERT", + "eventVersion": "1.1", + "eventSource": "aws:dynamodb", + "awsRegion": "eu-central-1", + "dynamodb": { + "Keys": { + "Id": { + "N": "101" + } + }, + "NewImage": { + "Message": { + "S": "New item!" + }, + "Id": { + "N": "101" + } + }, + "ApproximateCreationDateTime": 1428537600, + "SequenceNumber": "4421584500000000017450439031", + "SizeBytes": 26, + "StreamViewType": "NEW_AND_OLD_IMAGES" + }, + "eventSourceARN": "arn:aws:dynamodb:eu-central-1:123456789012:table/ExampleTableWithStream/stream/2015-06-27T00:48:05.899", + "userIdentity": { + "principalId": "dynamodb.amazonaws.com", + "type": "Service" + } + }, + { + "eventID": "c81e728d9d4c2f636f067f89cc14862c", + "eventName": "MODIFY", + "eventVersion": "1.1", + "eventSource": "aws:dynamodb", + "awsRegion": "eu-central-1", + "dynamodb": { + "Keys": { + "Id": { + "N": "101" + } + }, + "NewImage": { + "Message": { + "S": "This item has changed" + }, + "Id": { + "N": "101" + } + }, + "OldImage": { + "Message": { + "S": "New item!" 
+ }, + "Id": { + "N": "101" + } + }, + "ApproximateCreationDateTime": 1428537600, + "SequenceNumber": "4421584500000000017450439091", + "SizeBytes": 59, + "StreamViewType": "NEW_AND_OLD_IMAGES" + }, + "eventSourceARN": "arn:aws:dynamodb:eu-central-1:123456789012:table/ExampleTableWithStream/stream/2015-06-27T00:48:05.899" + }, + { + "eventID": "eccbc87e4b5ce2fe28308fd9f2a7baf3", + "eventName": "REMOVE", + "eventVersion": "1.1", + "eventSource": "aws:dynamodb", + "awsRegion": "eu-central-1", + "dynamodb": { + "Keys": { + "Id": { + "N": "101" + } + }, + "OldImage": { + "Message": { + "S": "This item has changed" + }, + "Id": { + "N": "101" + } + }, + "ApproximateCreationDateTime": 1428537600, + "SequenceNumber": "4421584500000000017450439093", + "SizeBytes": 38, + "StreamViewType": "NEW_AND_OLD_IMAGES" + }, + "eventSourceARN": "arn:aws:dynamodb:eu-central-1:123456789012:table/ExampleTableWithStream/stream/2015-06-27T00:48:05.899" + } + ] +} \ No newline at end of file diff --git a/powertools-batch/src/test/resources/kinesis_event_big.json b/powertools-batch/src/test/resources/kinesis_event_big.json new file mode 100644 index 000000000..57f702d27 --- /dev/null +++ b/powertools-batch/src/test/resources/kinesis_event_big.json @@ -0,0 +1,224 @@ +{ + "Records": [ + { + "kinesis": { + "partitionKey": "partitionKey-03", + "kinesisSchemaVersion": "1.0", + "data": "eyJpZCI6MTIzNCwgIm5hbWUiOiJwcm9kdWN0IiwgInByaWNlIjo0Mn0=", + "sequenceNumber": "49545115243490985018280067714973144582180062593244200962", + "approximateArrivalTimestamp": 1428537600, + "encryptionType": "NONE" + }, + "eventSource": "aws:kinesis", + "eventID": "shardId-000000000000:49545115243490985018280067714973144582180062593244200962", + "invokeIdentityArn": "arn:aws:iam::EXAMPLE", + "eventVersion": "1.0", + "eventName": "aws:kinesis:record", + "eventSourceARN": "arn:aws:kinesis:EXAMPLE", + "awsRegion": "eu-central-1" + }, + { + "kinesis": { + "partitionKey": "partitionKey-03", + "kinesisSchemaVersion": "1.0", + "data": "eyJpZCI6MTIzNDUsICJuYW1lIjoicHJvZHVjdDUiLCAicHJpY2UiOjQ1fQ==", + "sequenceNumber": "49545115243490985018280067714973144582180062593244200963", + "approximateArrivalTimestamp": 1428537600, + "encryptionType": "NONE" + }, + "eventSource": "aws:kinesis", + "eventID": "shardId-000000000000:49545115243490985018280067714973144582180062593244200963", + "invokeIdentityArn": "arn:aws:iam::EXAMPLE", + "eventVersion": "1.0", + "eventName": "aws:kinesis:record", + "eventSourceARN": "arn:aws:kinesis:EXAMPLE", + "awsRegion": "eu-central-1" + }, + { + "kinesis": { + "partitionKey": "partitionKey-03", + "kinesisSchemaVersion": "1.0", + "data": "eyJpZCI6MTIzNCwgIm5hbWUiOiJwcm9kdWN0IiwgInByaWNlIjo0Mn0=", + "sequenceNumber": "49545115243490985018280067714973144582180062593244200964", + "approximateArrivalTimestamp": 1428537600, + "encryptionType": "NONE" + }, + "eventSource": "aws:kinesis", + "eventID": "shardId-000000000000:49545115243490985018280067714973144582180062593244200964", + "invokeIdentityArn": "arn:aws:iam::EXAMPLE", + "eventVersion": "1.0", + "eventName": "aws:kinesis:record", + "eventSourceARN": "arn:aws:kinesis:EXAMPLE", + "awsRegion": "eu-central-1" + }, + { + "kinesis": { + "partitionKey": "partitionKey-03", + "kinesisSchemaVersion": "1.0", + "data": "eyJpZCI6MTIzNDUsICJuYW1lIjoicHJvZHVjdDUiLCAicHJpY2UiOjQ1fQ==", + "sequenceNumber": "49545115243490985018280067714973144582180062593244200965", + "approximateArrivalTimestamp": 1428537600, + "encryptionType": "NONE" + }, + "eventSource": "aws:kinesis", + "eventID": 
"shardId-000000000000:49545115243490985018280067714973144582180062593244200965", + "invokeIdentityArn": "arn:aws:iam::EXAMPLE", + "eventVersion": "1.0", + "eventName": "aws:kinesis:record", + "eventSourceARN": "arn:aws:kinesis:EXAMPLE", + "awsRegion": "eu-central-1" + },{ + "kinesis": { + "partitionKey": "partitionKey-03", + "kinesisSchemaVersion": "1.0", + "data": "eyJpZCI6MTIzNCwgIm5hbWUiOiJwcm9kdWN0IiwgInByaWNlIjo0Mn0=", + "sequenceNumber": "49545115243490985018280067714973144582180062593244200966", + "approximateArrivalTimestamp": 1428537600, + "encryptionType": "NONE" + }, + "eventSource": "aws:kinesis", + "eventID": "shardId-000000000000:49545115243490985018280067714973144582180062593244200966", + "invokeIdentityArn": "arn:aws:iam::EXAMPLE", + "eventVersion": "1.0", + "eventName": "aws:kinesis:record", + "eventSourceARN": "arn:aws:kinesis:EXAMPLE", + "awsRegion": "eu-central-1" + }, + { + "kinesis": { + "partitionKey": "partitionKey-03", + "kinesisSchemaVersion": "1.0", + "data": "eyJpZCI6MTIzNDUsICJuYW1lIjoicHJvZHVjdDUiLCAicHJpY2UiOjQ1fQ==", + "sequenceNumber": "49545115243490985018280067714973144582180062593244200961", + "approximateArrivalTimestamp": 1428537600, + "encryptionType": "NONE" + }, + "eventSource": "aws:kinesis", + "eventID": "shardId-000000000000:49545115243490985018280067714973144582180062593244200961", + "invokeIdentityArn": "arn:aws:iam::EXAMPLE", + "eventVersion": "1.0", + "eventName": "aws:kinesis:record", + "eventSourceARN": "arn:aws:kinesis:EXAMPLE", + "awsRegion": "eu-central-1" + }, + { + "kinesis": { + "partitionKey": "partitionKey-03", + "kinesisSchemaVersion": "1.0", + "data": "eyJpZCI6MTIzNCwgIm5hbWUiOiJwcm9kdWN0IiwgInByaWNlIjo0Mn0=", + "sequenceNumber": "49545115243490985018280067714973144582180062593244200967", + "approximateArrivalTimestamp": 1428537600, + "encryptionType": "NONE" + }, + "eventSource": "aws:kinesis", + "eventID": "shardId-000000000000:49545115243490985018280067714973144582180062593244200967", + "invokeIdentityArn": "arn:aws:iam::EXAMPLE", + "eventVersion": "1.0", + "eventName": "aws:kinesis:record", + "eventSourceARN": "arn:aws:kinesis:EXAMPLE", + "awsRegion": "eu-central-1" + }, + { + "kinesis": { + "partitionKey": "partitionKey-03", + "kinesisSchemaVersion": "1.0", + "data": "eyJpZCI6MTIzNCwgIm5hbWUiOiJwcm9kdWN0IiwgInByaWNlIjo0Mn0=", + "sequenceNumber": "49545115243490985018280067714973144582180062593244200968", + "approximateArrivalTimestamp": 1428537600, + "encryptionType": "NONE" + }, + "eventSource": "aws:kinesis", + "eventID": "shardId-000000000000:49545115243490985018280067714973144582180062593244200968", + "invokeIdentityArn": "arn:aws:iam::EXAMPLE", + "eventVersion": "1.0", + "eventName": "aws:kinesis:record", + "eventSourceARN": "arn:aws:kinesis:EXAMPLE", + "awsRegion": "eu-central-1" + }, + { + "kinesis": { + "partitionKey": "partitionKey-03", + "kinesisSchemaVersion": "1.0", + "data": "eyJpZCI6MTIzNDUsICJuYW1lIjoicHJvZHVjdDUiLCAicHJpY2UiOjQ1fQ==", + "sequenceNumber": "49545115243490985018280067714973144582180062593244200969", + "approximateArrivalTimestamp": 1428537600, + "encryptionType": "NONE" + }, + "eventSource": "aws:kinesis", + "eventID": "shardId-000000000000:49545115243490985018280067714973144582180062593244200969", + "invokeIdentityArn": "arn:aws:iam::EXAMPLE", + "eventVersion": "1.0", + "eventName": "aws:kinesis:record", + "eventSourceARN": "arn:aws:kinesis:EXAMPLE", + "awsRegion": "eu-central-1" + }, + { + "kinesis": { + "partitionKey": "partitionKey-03", + "kinesisSchemaVersion": "1.0", + "data": 
"eyJpZCI6MTIzNCwgIm5hbWUiOiJwcm9kdWN0IiwgInByaWNlIjo0Mn0=", + "sequenceNumber": "49545115243490985018280067714973144582180062593244200971", + "approximateArrivalTimestamp": 1428537600, + "encryptionType": "NONE" + }, + "eventSource": "aws:kinesis", + "eventID": "shardId-000000000000:49545115243490985018280067714973144582180062593244200971", + "invokeIdentityArn": "arn:aws:iam::EXAMPLE", + "eventVersion": "1.0", + "eventName": "aws:kinesis:record", + "eventSourceARN": "arn:aws:kinesis:EXAMPLE", + "awsRegion": "eu-central-1" + }, + { + "kinesis": { + "partitionKey": "partitionKey-03", + "kinesisSchemaVersion": "1.0", + "data": "eyJpZCI6MTIzNCwgIm5hbWUiOiJwcm9kdWN0IiwgInByaWNlIjo0Mn0=", + "sequenceNumber": "49545115243490985018280067714973144582180062593244200981", + "approximateArrivalTimestamp": 1428537600, + "encryptionType": "NONE" + }, + "eventSource": "aws:kinesis", + "eventID": "shardId-000000000000:49545115243490985018280067714973144582180062593244200981", + "invokeIdentityArn": "arn:aws:iam::EXAMPLE", + "eventVersion": "1.0", + "eventName": "aws:kinesis:record", + "eventSourceARN": "arn:aws:kinesis:EXAMPLE", + "awsRegion": "eu-central-1" + }, + { + "kinesis": { + "partitionKey": "partitionKey-03", + "kinesisSchemaVersion": "1.0", + "data": "eyJpZCI6MTIzNDUsICJuYW1lIjoicHJvZHVjdDUiLCAicHJpY2UiOjQ1fQ==", + "sequenceNumber": "49545115243490985018280067714973144582180062593244200992", + "approximateArrivalTimestamp": 1428537600, + "encryptionType": "NONE" + }, + "eventSource": "aws:kinesis", + "eventID": "shardId-000000000000:49545115243490985018280067714973144582180062593244200992", + "invokeIdentityArn": "arn:aws:iam::EXAMPLE", + "eventVersion": "1.0", + "eventName": "aws:kinesis:record", + "eventSourceARN": "arn:aws:kinesis:EXAMPLE", + "awsRegion": "eu-central-1" + }, + { + "kinesis": { + "partitionKey": "partitionKey-03", + "kinesisSchemaVersion": "1.0", + "data": "eyJpZCI6MTIzNCwgIm5hbWUiOiJwcm9kdWN0IiwgInByaWNlIjo0Mn0=", + "sequenceNumber": "49545115243490985018280067714973144582180062593244210961", + "approximateArrivalTimestamp": 1428537600, + "encryptionType": "NONE" + }, + "eventSource": "aws:kinesis", + "eventID": "shardId-000000000000:49545115243490985018280067714973144582180062593244210961", + "invokeIdentityArn": "arn:aws:iam::EXAMPLE", + "eventVersion": "1.0", + "eventName": "aws:kinesis:record", + "eventSourceARN": "arn:aws:kinesis:EXAMPLE", + "awsRegion": "eu-central-1" + } + ] +} \ No newline at end of file diff --git a/powertools-batch/src/test/resources/sqs_event_big.json b/powertools-batch/src/test/resources/sqs_event_big.json new file mode 100644 index 000000000..f5c83f442 --- /dev/null +++ b/powertools-batch/src/test/resources/sqs_event_big.json @@ -0,0 +1,429 @@ +{ + "Records": [ + { + "messageId": "d9144555-9a4f-4ec3-99a0-34ce359b4b54", + "receiptHandle": "13e7f7851d2eaa5c01f208ebadbf1e72==", + "body": "{\n \"id\": 1234,\n \"name\": \"product\",\n \"price\": 42\n}", + "attributes": { + "ApproximateReceiveCount": "1", + "SentTimestamp": "1601975706495", + "SenderId": "AROAIFU437PVZ5L2J53F5", + "ApproximateFirstReceiveTimestamp": "1601975706499" + }, + "messageAttributes": { + }, + "md5OfBody": "13e7f7851d2eaa5c01f208ebadbf1e72", + "eventSource": "aws:sqs", + "eventSourceARN": "arn:aws:sqs:eu-central-1:123456789012:TestLambda", + "awsRegion": "eu-central-1" + }, + { + "messageId": "f9144555-9a4f-4ec3-99a0-34ce359b4b54", + "receiptHandle": "14e7f7851d2eaa5c01f208ebadbf1e72==", + "body": "{\n \"id\": 1235,\n \"name\": \"product\",\n \"price\": 43\n}", + "attributes": 
{ + "ApproximateReceiveCount": "1", + "SentTimestamp": "1601975706495", + "SenderId": "AROAIFU437PVZ5L2J53F5", + "ApproximateFirstReceiveTimestamp": "1601975706499" + }, + "messageAttributes": { + }, + "md5OfBody": "14e7f7851d2eaa5c01f208ebadbf1e72", + "eventSource": "aws:sqs", + "eventSourceARN": "arn:aws:sqs:eu-central-1:123456789012:TestLambda", + "awsRegion": "eu-central-1" + }, + { + "messageId": "g9144555-9a4f-4ec3-99a0-34ce359b4b54", + "receiptHandle": "15e7f7851d2eaa5c01f208ebadbf1e72==", + "body": "{\n \"id\": 1236,\n \"name\": \"product\",\n \"price\": 44\n}", + "attributes": { + "ApproximateReceiveCount": "1", + "SentTimestamp": "1601975706495", + "SenderId": "AROAIFU437PVZ5L2J53F5", + "ApproximateFirstReceiveTimestamp": "1601975706499" + }, + "messageAttributes": { + }, + "md5OfBody": "15e7f7851d2eaa5c01f208ebadbf1e72", + "eventSource": "aws:sqs", + "eventSourceARN": "arn:aws:sqs:eu-central-1:123456789012:TestLambda", + "awsRegion": "eu-central-1" + }, + { + "messageId": "c9144555-9a4f-4ec3-99a0-34ce359b4b54", + "receiptHandle": "16e7f7851d2eaa5c01f208ebadbf1e72==", + "body": "{\n \"id\": 1237,\n \"name\": \"product\",\n \"price\": 45\n}", + "attributes": { + "ApproximateReceiveCount": "1", + "SentTimestamp": "1601975706495", + "SenderId": "AROAIFU437PVZ5L2J53F5", + "ApproximateFirstReceiveTimestamp": "1601975706499" + }, + "messageAttributes": { + }, + "md5OfBody": "16e7f7851d2eaa5c01f208ebadbf1e72", + "eventSource": "aws:sqs", + "eventSourceARN": "arn:aws:sqs:eu-central-1:123456789012:TestLambda", + "awsRegion": "eu-central-1" + }, + { + "messageId": "b4144555-9a4f-4ec3-99a0-34ce359b4b54", + "receiptHandle": "13e7f7851d2eaa5c01f208ebadbf1e72==", + "body": "{\n \"id\": 1238,\n \"name\": \"product\",\n \"price\": 486\n}", + "attributes": { + "ApproximateReceiveCount": "1", + "SentTimestamp": "1601975706495", + "SenderId": "AROAIFU437PVZ5L2J53F5", + "ApproximateFirstReceiveTimestamp": "1601975706499" + }, + "messageAttributes": { + }, + "md5OfBody": "13e7f7851d2eaa5c01f208ebadbf1e72", + "eventSource": "aws:sqs", + "eventSourceARN": "arn:aws:sqs:eu-central-1:123456789012:TestLambda", + "awsRegion": "eu-central-1" + }, + { + "messageId": "a2144552-9a4f-4ec3-99a0-34ce359b4b54", + "receiptHandle": "14e7f7851d2eaa5c01f208ebadbf1e72==", + "body": "{\n \"id\": 1239,\n \"name\": \"product\",\n \"price\": 430\n}", + "attributes": { + "ApproximateReceiveCount": "1", + "SentTimestamp": "1601975706495", + "SenderId": "AROAIFU437PVZ5L2J53F5", + "ApproximateFirstReceiveTimestamp": "1601975706499" + }, + "messageAttributes": { + }, + "md5OfBody": "14e7f7851d2eaa5c01f208ebadbf1e72", + "eventSource": "aws:sqs", + "eventSourceARN": "arn:aws:sqs:eu-central-1:123456789012:TestLambda", + "awsRegion": "eu-central-1" + }, + { + "messageId": "32144555-9a4f-4ec3-99a0-34ce359b4b54", + "receiptHandle": "15e7f7851d2eaa5c01f208ebadbf1e72==", + "body": "{\n \"id\": 1240,\n \"name\": \"product\",\n \"price\": 445\n}", + "attributes": { + "ApproximateReceiveCount": "1", + "SentTimestamp": "1601975706495", + "SenderId": "AROAIFU437PVZ5L2J53F5", + "ApproximateFirstReceiveTimestamp": "1601975706499" + }, + "messageAttributes": { + }, + "md5OfBody": "15e7f7851d2eaa5c01f208ebadbf1e72", + "eventSource": "aws:sqs", + "eventSourceARN": "arn:aws:sqs:eu-central-1:123456789012:TestLambda", + "awsRegion": "eu-central-1" + }, + { + "messageId": "a9144555-9a4f-4ec3-99a0-34ce359b4b54", + "receiptHandle": "16e7f7851d2eaa5c01f208ebadbf1e72==", + "body": "{\n \"id\": 1241,\n \"name\": \"product\",\n \"price\": 45\n}", + 
"attributes": { + "ApproximateReceiveCount": "1", + "SentTimestamp": "1601975706495", + "SenderId": "AROAIFU437PVZ5L2J53F5", + "ApproximateFirstReceiveTimestamp": "1601975706499" + }, + "messageAttributes": { + }, + "md5OfBody": "16e7f7851d2eaa5c01f208ebadbf1e72", + "eventSource": "aws:sqs", + "eventSourceARN": "arn:aws:sqs:eu-central-1:123456789012:TestLambda", + "awsRegion": "eu-central-1" + }, + { + "messageId": "d9144555-9a4f-4ec3-99a0-34ce359b354", + "receiptHandle": "13e7f7851d2eaa5c01f208ebadbf1e72==", + "body": "{\n \"id\": 1242,\n \"name\": \"product\",\n \"price\": 42\n}", + "attributes": { + "ApproximateReceiveCount": "1", + "SentTimestamp": "1601975706495", + "SenderId": "AROAIFU437PVZ5L2J53F5", + "ApproximateFirstReceiveTimestamp": "1601975706499" + }, + "messageAttributes": { + }, + "md5OfBody": "13e7f7851d2eaa5c01f208ebadbf1e72", + "eventSource": "aws:sqs", + "eventSourceARN": "arn:aws:sqs:eu-central-1:123456789012:TestLambda", + "awsRegion": "eu-central-1" + }, + { + "messageId": "e9144555-9a4f-4ec3-99a0-34ce359b4b54", + "receiptHandle": "14e7f7851d2eaa5c01f208ebadbf1e72==", + "body": "{\n \"id\": 1243,\n \"name\": \"product\",\n \"price\": 43\n}", + "attributes": { + "ApproximateReceiveCount": "1", + "SentTimestamp": "1601975706495", + "SenderId": "AROAIFU437PVZ5L2J53F5", + "ApproximateFirstReceiveTimestamp": "1601975706499" + }, + "messageAttributes": { + }, + "md5OfBody": "14e7f7851d2eaa5c01f208ebadbf1e72", + "eventSource": "aws:sqs", + "eventSourceARN": "arn:aws:sqs:eu-central-1:123456789012:TestLambda", + "awsRegion": "eu-central-1" + }, + { + "messageId": "19144555-9a4f-4ec3-99a0-34ce359b4b54", + "receiptHandle": "15e7f7851d2eaa5c01f208ebadbf1e72==", + "body": "{\n \"id\": 1244,\n \"name\": \"product\",\n \"price\": 44\n}", + "attributes": { + "ApproximateReceiveCount": "1", + "SentTimestamp": "1601975706495", + "SenderId": "AROAIFU437PVZ5L2J53F5", + "ApproximateFirstReceiveTimestamp": "1601975706499" + }, + "messageAttributes": { + }, + "md5OfBody": "15e7f7851d2eaa5c01f208ebadbf1e72", + "eventSource": "aws:sqs", + "eventSourceARN": "arn:aws:sqs:eu-central-1:123456789012:TestLambda", + "awsRegion": "eu-central-1" + }, + { + "messageId": "c9144555-9a4f-4ec3-99a0-34ce3a9b4b54", + "receiptHandle": "16e7f7851d2eaa5c01f208ebadbf1e72==", + "body": "{\n \"id\": 1245,\n \"name\": \"product\",\n \"price\": 45\n}", + "attributes": { + "ApproximateReceiveCount": "1", + "SentTimestamp": "1601975706495", + "SenderId": "AROAIFU437PVZ5L2J53F5", + "ApproximateFirstReceiveTimestamp": "1601975706499" + }, + "messageAttributes": { + }, + "md5OfBody": "16e7f7851d2eaa5c01f208ebadbf1e72", + "eventSource": "aws:sqs", + "eventSourceARN": "arn:aws:sqs:eu-central-1:123456789012:TestLambda", + "awsRegion": "eu-central-1" + }, + { + "messageId": "b4144555-9a4f-4ec3-99a5-34ce359b4b54", + "receiptHandle": "13e7f7851d2eaa5c01f208ebadbf1e72==", + "body": "{\n \"id\": 1247,\n \"name\": \"product\",\n \"price\": 486\n}", + "attributes": { + "ApproximateReceiveCount": "1", + "SentTimestamp": "1601975706495", + "SenderId": "AROAIFU437PVZ5L2J53F5", + "ApproximateFirstReceiveTimestamp": "1601975706499" + }, + "messageAttributes": { + }, + "md5OfBody": "13e7f7851d2eaa5c01f208ebadbf1e72", + "eventSource": "aws:sqs", + "eventSourceARN": "arn:aws:sqs:eu-central-1:123456789012:TestLambda", + "awsRegion": "eu-central-1" + }, + { + "messageId": "a2144555-9a4f-4ec3-97a0-34ce359b4b54", + "receiptHandle": "14e7f7851d2eaa5c01f208ebadbf1e72==", + "body": "{\n \"id\": 1248,\n \"name\": \"product\",\n \"price\": 
430\n}", + "attributes": { + "ApproximateReceiveCount": "1", + "SentTimestamp": "1601975706495", + "SenderId": "AROAIFU437PVZ5L2J53F5", + "ApproximateFirstReceiveTimestamp": "1601975706499" + }, + "messageAttributes": { + }, + "md5OfBody": "14e7f7851d2eaa5c01f208ebadbf1e72", + "eventSource": "aws:sqs", + "eventSourceARN": "arn:aws:sqs:eu-central-1:123456789012:TestLambda", + "awsRegion": "eu-central-1" + }, + { + "messageId": "3k144555-9a4f-4ec2-99a0-34ce359b4b54", + "receiptHandle": "15e7f7851d2eaa5c01f208ebadbf1e72==", + "body": "{\n \"id\": 1249,\n \"name\": \"product\",\n \"price\": 445\n}", + "attributes": { + "ApproximateReceiveCount": "1", + "SentTimestamp": "1601975706495", + "SenderId": "AROAIFU437PVZ5L2J53F5", + "ApproximateFirstReceiveTimestamp": "1601975706499" + }, + "messageAttributes": { + }, + "md5OfBody": "15e7f7851d2eaa5c01f208ebadbf1e72", + "eventSource": "aws:sqs", + "eventSourceARN": "arn:aws:sqs:eu-central-1:123456789012:TestLambda", + "awsRegion": "eu-central-1" + }, + { + "messageId": "h9144555-9aaf-4ec3-99a0-34ce359b4b54", + "receiptHandle": "16e7f7851d2eaa5c01f208ebadbf1e72==", + "body": "{\n \"id\": 1250,\n \"name\": \"product\",\n \"price\": 45\n}", + "attributes": { + "ApproximateReceiveCount": "1", + "SentTimestamp": "1601975706495", + "SenderId": "AROAIFU437PVZ5L2J53F5", + "ApproximateFirstReceiveTimestamp": "1601975706499" + }, + "messageAttributes": { + }, + "md5OfBody": "16e7f7851d2eaa5c01f208ebadbf1e72", + "eventSource": "aws:sqs", + "eventSourceARN": "arn:aws:sqs:eu-central-1:123456789012:TestLambda", + "awsRegion": "eu-central-1" + }, + { + "messageId": "d9144555-9a4f-4ec3-99a0-34ce359b4b54", + "receiptHandle": "13e7f7851d2eaa5c01f208ebadbf1e72==", + "body": "{\n \"id\": 1234,\n \"name\": \"product\",\n \"price\": 42\n}", + "attributes": { + "ApproximateReceiveCount": "1", + "SentTimestamp": "1601975706495", + "SenderId": "AROAIFU437PVZ5L2J53F5", + "ApproximateFirstReceiveTimestamp": "1601975706499" + }, + "messageAttributes": { + }, + "md5OfBody": "13e7f7851d2eaa5c01f208ebadbf1e72", + "eventSource": "aws:sqs", + "eventSourceARN": "arn:aws:sqs:eu-central-1:123456789012:TestLambda", + "awsRegion": "eu-central-1" + }, + { + "messageId": "f9144555-9a4f-4ec3-99a0-34ce359b4b54", + "receiptHandle": "14e7f7851d2eaa5c01f208ebadbf1e72==", + "body": "{\n \"id\": 1235,\n \"name\": \"product\",\n \"price\": 43\n}", + "attributes": { + "ApproximateReceiveCount": "1", + "SentTimestamp": "1601975706495", + "SenderId": "AROAIFU437PVZ5L2J53F5", + "ApproximateFirstReceiveTimestamp": "1601975706499" + }, + "messageAttributes": { + }, + "md5OfBody": "14e7f7851d2eaa5c01f208ebadbf1e72", + "eventSource": "aws:sqs", + "eventSourceARN": "arn:aws:sqs:eu-central-1:123456789012:TestLambda", + "awsRegion": "eu-central-1" + }, + { + "messageId": "g9144555-9a4f-4ec3-99a0-34ce359b4b54", + "receiptHandle": "15e7f7851d2eaa5c01f208ebadbf1e72==", + "body": "{\n \"id\": 1236,\n \"name\": \"product\",\n \"price\": 44\n}", + "attributes": { + "ApproximateReceiveCount": "1", + "SentTimestamp": "1601975706495", + "SenderId": "AROAIFU437PVZ5L2J53F5", + "ApproximateFirstReceiveTimestamp": "1601975706499" + }, + "messageAttributes": { + }, + "md5OfBody": "15e7f7851d2eaa5c01f208ebadbf1e72", + "eventSource": "aws:sqs", + "eventSourceARN": "arn:aws:sqs:eu-central-1:123456789012:TestLambda", + "awsRegion": "eu-central-1" + }, + { + "messageId": "c9144555-9a4f-4ec3-99a0-34ce359b4b54", + "receiptHandle": "16e7f7851d2eaa5c01f208ebadbf1e72==", + "body": "{\n \"id\": 1237,\n \"name\": \"product\",\n 
\"price\": 45\n}", + "attributes": { + "ApproximateReceiveCount": "1", + "SentTimestamp": "1601975706495", + "SenderId": "AROAIFU437PVZ5L2J53F5", + "ApproximateFirstReceiveTimestamp": "1601975706499" + }, + "messageAttributes": { + }, + "md5OfBody": "16e7f7851d2eaa5c01f208ebadbf1e72", + "eventSource": "aws:sqs", + "eventSourceARN": "arn:aws:sqs:eu-central-1:123456789012:TestLambda", + "awsRegion": "eu-central-1" + }, + { + "messageId": "b4144555-9a4f-4ec3-99a0-34ce359b4b54", + "receiptHandle": "13e7f7851d2eaa5c01f208ebadbf1e72==", + "body": "{\n \"id\": 1238,\n \"name\": \"product\",\n \"price\": 486\n}", + "attributes": { + "ApproximateReceiveCount": "1", + "SentTimestamp": "1601975706495", + "SenderId": "AROAIFU437PVZ5L2J53F5", + "ApproximateFirstReceiveTimestamp": "1601975706499" + }, + "messageAttributes": { + }, + "md5OfBody": "13e7f7851d2eaa5c01f208ebadbf1e72", + "eventSource": "aws:sqs", + "eventSourceARN": "arn:aws:sqs:eu-central-1:123456789012:TestLambda", + "awsRegion": "eu-central-1" + }, + { + "messageId": "a2144552-9a4f-4ec3-99a0-34ce359b4b54", + "receiptHandle": "14e7f7851d2eaa5c01f208ebadbf1e72==", + "body": "{\n \"id\": 1239,\n \"name\": \"product\",\n \"price\": 430\n}", + "attributes": { + "ApproximateReceiveCount": "1", + "SentTimestamp": "1601975706495", + "SenderId": "AROAIFU437PVZ5L2J53F5", + "ApproximateFirstReceiveTimestamp": "1601975706499" + }, + "messageAttributes": { + }, + "md5OfBody": "14e7f7851d2eaa5c01f208ebadbf1e72", + "eventSource": "aws:sqs", + "eventSourceARN": "arn:aws:sqs:eu-central-1:123456789012:TestLambda", + "awsRegion": "eu-central-1" + }, + { + "messageId": "32144555-9a4f-4ec3-99a0-34ce359b4b54", + "receiptHandle": "15e7f7851d2eaa5c01f208ebadbf1e72==", + "body": "{\n \"id\": 1240,\n \"name\": \"product\",\n \"price\": 445\n}", + "attributes": { + "ApproximateReceiveCount": "1", + "SentTimestamp": "1601975706495", + "SenderId": "AROAIFU437PVZ5L2J53F5", + "ApproximateFirstReceiveTimestamp": "1601975706499" + }, + "messageAttributes": { + }, + "md5OfBody": "15e7f7851d2eaa5c01f208ebadbf1e72", + "eventSource": "aws:sqs", + "eventSourceARN": "arn:aws:sqs:eu-central-1:123456789012:TestLambda", + "awsRegion": "eu-central-1" + }, + { + "messageId": "a9144555-9a4f-4ec3-99a0-34ce359b4b54", + "receiptHandle": "16e7f7851d2eaa5c01f208ebadbf1e72==", + "body": "{\n \"id\": 1241,\n \"name\": \"product\",\n \"price\": 45\n}", + "attributes": { + "ApproximateReceiveCount": "1", + "SentTimestamp": "1601975706495", + "SenderId": "AROAIFU437PVZ5L2J53F5", + "ApproximateFirstReceiveTimestamp": "1601975706499" + }, + "messageAttributes": { + }, + "md5OfBody": "16e7f7851d2eaa5c01f208ebadbf1e72", + "eventSource": "aws:sqs", + "eventSourceARN": "arn:aws:sqs:eu-central-1:123456789012:TestLambda", + "awsRegion": "eu-central-1" + }, + { + "messageId": "d9144555-9a4f-4ec3-99a0-34ce359b354", + "receiptHandle": "13e7f7851d2eaa5c01f208ebadbf1e72==", + "body": "{\n \"id\": 1242,\n \"name\": \"product\",\n \"price\": 42\n}", + "attributes": { + "ApproximateReceiveCount": "1", + "SentTimestamp": "1601975706495", + "SenderId": "AROAIFU437PVZ5L2J53F5", + "ApproximateFirstReceiveTimestamp": "1601975706499" + }, + "messageAttributes": { + }, + "md5OfBody": "13e7f7851d2eaa5c01f208ebadbf1e72", + "eventSource": "aws:sqs", + "eventSourceARN": "arn:aws:sqs:eu-central-1:123456789012:TestLambda", + "awsRegion": "eu-central-1" + } + ] +} \ No newline at end of file diff --git a/powertools-cloudformation/src/main/java/software/amazon/lambda/powertools/cloudformation/Response.java 
b/powertools-cloudformation/src/main/java/software/amazon/lambda/powertools/cloudformation/Response.java index fe18000d4..215151d44 100644 --- a/powertools-cloudformation/src/main/java/software/amazon/lambda/powertools/cloudformation/Response.java +++ b/powertools-cloudformation/src/main/java/software/amazon/lambda/powertools/cloudformation/Response.java @@ -47,23 +47,6 @@ public static Builder builder() { return new Builder(); } - /** - * Creates a failed Response with no physicalResourceId set. Powertools for AWS Lambda (Java) will set the physicalResourceId to the - * Lambda LogStreamName - *
<p>
- * The value returned for a PhysicalResourceId can change custom resource update operations. If the value returned - * is the same, it is considered a normal update. If the value returned is different, AWS CloudFormation recognizes - * the update as a replacement and sends a delete request to the old resource. For more information, - * see AWS::CloudFormation::CustomResource. - * - * @return a failed Response with no value. - * @deprecated this method is not safe. Provide a physicalResourceId. - */ - @Deprecated - public static Response failed() { - return new Response(null, Status.FAILED, null, false); - } - /** * Creates a failed Response with a given physicalResourceId. * @@ -80,23 +63,6 @@ public static Response failed(String physicalResourceId) { return new Response(null, Status.FAILED, physicalResourceId, false); } - /** - * Creates a successful Response with no physicalResourceId set. Powertools for AWS Lambda (Java) will set the physicalResourceId to the - * Lambda LogStreamName - *
<p>
- * The value returned for a PhysicalResourceId can change custom resource update operations. If the value returned - * is the same, it is considered a normal update. If the value returned is different, AWS CloudFormation recognizes - * the update as a replacement and sends a delete request to the old resource. For more information, - * see AWS::CloudFormation::CustomResource. - * - * @return a success Response with no physicalResourceId value. - * @deprecated this method is not safe. Provide a physicalResourceId. - */ - @Deprecated - public static Response success() { - return new Response(null, Status.SUCCESS, null, false); - } - /** * Creates a successful Response with a given physicalResourceId. * diff --git a/powertools-cloudformation/src/test/java/software/amazon/lambda/powertools/cloudformation/CloudFormationResponseTest.java b/powertools-cloudformation/src/test/java/software/amazon/lambda/powertools/cloudformation/CloudFormationResponseTest.java index 51f0e95f9..0701c98fe 100644 --- a/powertools-cloudformation/src/test/java/software/amazon/lambda/powertools/cloudformation/CloudFormationResponseTest.java +++ b/powertools-cloudformation/src/test/java/software/amazon/lambda/powertools/cloudformation/CloudFormationResponseTest.java @@ -38,7 +38,7 @@ import software.amazon.awssdk.utils.StringInputStream; import software.amazon.lambda.powertools.cloudformation.CloudFormationResponse.ResponseBody; -public class CloudFormationResponseTest { +class CloudFormationResponseTest { /** * Creates a mock CloudFormationCustomResourceEvent with a non-null response URL. @@ -215,7 +215,7 @@ void reasonIncludesLogStreamName() { } @Test - public void sendWithNoResponseData() throws Exception { + void sendWithNoResponseData() throws Exception { CloudFormationCustomResourceEvent event = mockCloudFormationCustomResourceEvent(); Context context = mock(Context.class); CloudFormationResponse cfnResponse = testableCloudFormationResponse(); @@ -237,7 +237,7 @@ public void sendWithNoResponseData() throws Exception { } @Test - public void sendWithNonNullResponseData() throws Exception { + void sendWithNonNullResponseData() throws Exception { CloudFormationCustomResourceEvent event = mockCloudFormationCustomResourceEvent(); Context context = mock(Context.class); CloudFormationResponse cfnResponse = testableCloudFormationResponse(); @@ -289,7 +289,7 @@ void responseBodyStreamSuccessResponse() throws Exception { Context context = mock(Context.class); CloudFormationResponse cfnResponse = testableCloudFormationResponse(); - StringInputStream stream = cfnResponse.responseBodyStream(event, context, Response.success()); + StringInputStream stream = cfnResponse.responseBodyStream(event, context, Response.success(null)); String expectedJson = "{" + "\"Status\":\"SUCCESS\"," + @@ -310,7 +310,7 @@ void responseBodyStreamFailedResponse() throws Exception { Context context = mock(Context.class); CloudFormationResponse cfnResponse = testableCloudFormationResponse(); - StringInputStream stream = cfnResponse.responseBodyStream(event, context, Response.failed()); + StringInputStream stream = cfnResponse.responseBodyStream(event, context, Response.failed(null)); String expectedJson = "{" + "\"Status\":\"FAILED\"," + diff --git a/powertools-cloudformation/src/test/java/software/amazon/lambda/powertools/cloudformation/ResponseTest.java b/powertools-cloudformation/src/test/java/software/amazon/lambda/powertools/cloudformation/ResponseTest.java index 37fe73d0f..e577aecca 100644 --- 
a/powertools-cloudformation/src/test/java/software/amazon/lambda/powertools/cloudformation/ResponseTest.java +++ b/powertools-cloudformation/src/test/java/software/amazon/lambda/powertools/cloudformation/ResponseTest.java @@ -22,7 +22,7 @@ import java.util.Map; import org.junit.jupiter.api.Test; -public class ResponseTest { +class ResponseTest { @Test void defaultValues() { @@ -92,7 +92,7 @@ void jsonMapValueWithDefaultObjectMapper() { String expected = "{\"foo\":\"bar\"}"; assertThat(response.getJsonNode()).isNotNull(); - assertThat(response.getJsonNode().toString()).isEqualTo(expected); + assertThat(response.getJsonNode()).hasToString(expected); assertThat(response.toString()).contains("JSON = " + expected); } @@ -105,7 +105,7 @@ void jsonObjectValueWithDefaultObjectMapper() { .build(); String expected = "{\"PropertyWithLongName\":\"test\"}"; - assertThat(response.getJsonNode().toString()).isEqualTo(expected); + assertThat(response.getJsonNode()).hasToString(expected); assertThat(response.toString()).contains("JSON = " + expected); } @@ -119,7 +119,7 @@ void jsonObjectValueWithNullObjectMapper() { .build(); String expected = "{\"PropertyWithLongName\":\"test\"}"; - assertThat(response.getJsonNode().toString()).isEqualTo(expected); + assertThat(response.getJsonNode()).hasToString(expected); assertThat(response.toString()).contains("JSON = " + expected); } @@ -135,7 +135,7 @@ void jsonObjectValueWithCustomObjectMapper() { .build(); String expected = "{\"property-with-long-name\":10}"; - assertThat(response.getJsonNode().toString()).isEqualTo(expected); + assertThat(response.getJsonNode()).hasToString(expected); assertThat(response.toString()).contains("JSON = " + expected); } @@ -154,13 +154,13 @@ void jsonObjectValueWithPostConfiguredObjectMapper() { customMapper.setPropertyNamingStrategy(PropertyNamingStrategies.UPPER_CAMEL_CASE); String expected = "{\"property-with-long-name\":10}"; - assertThat(response.getJsonNode().toString()).isEqualTo(expected); + assertThat(response.getJsonNode()).hasToString(expected); assertThat(response.toString()).contains("JSON = " + expected); } @Test void successFactoryMethod() { - Response response = Response.success(); + Response response = Response.success(null); assertThat(response).isNotNull(); assertThat(response.getStatus()).isEqualTo(Response.Status.SUCCESS); @@ -168,7 +168,7 @@ void successFactoryMethod() { @Test void failedFactoryMethod() { - Response response = Response.failed(); + Response response = Response.failed(null); assertThat(response).isNotNull(); assertThat(response.getStatus()).isEqualTo(Response.Status.FAILED); diff --git a/powertools-cloudformation/src/test/java/software/amazon/lambda/powertools/cloudformation/handlers/NoPhysicalResourceIdSetHandler.java b/powertools-cloudformation/src/test/java/software/amazon/lambda/powertools/cloudformation/handlers/NoPhysicalResourceIdSetHandler.java index 2bbda309f..e55abca03 100644 --- a/powertools-cloudformation/src/test/java/software/amazon/lambda/powertools/cloudformation/handlers/NoPhysicalResourceIdSetHandler.java +++ b/powertools-cloudformation/src/test/java/software/amazon/lambda/powertools/cloudformation/handlers/NoPhysicalResourceIdSetHandler.java @@ -23,16 +23,16 @@ public class NoPhysicalResourceIdSetHandler extends AbstractCustomResourceHandle @Override protected Response create(CloudFormationCustomResourceEvent event, Context context) { - return Response.success(); + return Response.success(null); } @Override protected Response update(CloudFormationCustomResourceEvent event, 
Context context) { - return Response.success(); + return Response.success(null); } @Override protected Response delete(CloudFormationCustomResourceEvent event, Context context) { - return Response.success(); + return Response.success(null); } } diff --git a/powertools-metrics/src/main/java/software/amazon/lambda/powertools/metrics/MetricsUtils.java b/powertools-metrics/src/main/java/software/amazon/lambda/powertools/metrics/MetricsUtils.java index ba53bad1f..6c3a89a65 100644 --- a/powertools-metrics/src/main/java/software/amazon/lambda/powertools/metrics/MetricsUtils.java +++ b/powertools-metrics/src/main/java/software/amazon/lambda/powertools/metrics/MetricsUtils.java @@ -14,7 +14,6 @@ package software.amazon.lambda.powertools.metrics; -import static java.util.Objects.requireNonNull; import static java.util.Optional.ofNullable; import static software.amazon.lambda.powertools.common.internal.LambdaHandlerProcessor.getXrayTraceId; import static software.amazon.lambda.powertools.metrics.internal.LambdaMetricsAspect.REQUEST_ID_PROPERTY; @@ -63,23 +62,6 @@ public static void defaultDimensions(final DimensionSet... dimensionSets) { MetricsUtils.defaultDimensions = dimensionSets; } - /** - * Configure default dimension to be used by logger. - * By default, @{@link Metrics} annotation captures configured service as a dimension Service - * - * @param dimensionSet Default value of dimension set for logger - * @deprecated use {@link #defaultDimensions(DimensionSet...)} instead - */ - @Deprecated - public static void defaultDimensionSet(final DimensionSet dimensionSet) { - requireNonNull(dimensionSet, "Null dimension set not allowed"); - - if (dimensionSet.getDimensionKeys().size() > 0) { - defaultDimensions(dimensionSet); - } - } - - /** * Add and immediately flush a single metric. It will use the default namespace * specified either on {@link Metrics} annotation or via POWERTOOLS_METRICS_NAMESPACE env var. @@ -146,20 +128,6 @@ public static void withMetricsLogger(final Consumer logger) { } } - /** - * Provide and immediately flush a {@link MetricsLogger}. It uses the default namespace - * specified either on {@link Metrics} annotation or via POWERTOOLS_METRICS_NAMESPACE env var. - * It by default captures function_request_id as property if used together with {@link Metrics} annotation. It will also - * capture xray_trace_id as property if tracing is enabled. 
diff --git a/powertools-metrics/src/test/java/software/amazon/lambda/powertools/metrics/MetricsLoggerTest.java b/powertools-metrics/src/test/java/software/amazon/lambda/powertools/metrics/MetricsLoggerTest.java
index 26ae41a00..518e11739 100644
--- a/powertools-metrics/src/test/java/software/amazon/lambda/powertools/metrics/MetricsLoggerTest.java
+++ b/powertools-metrics/src/test/java/software/amazon/lambda/powertools/metrics/MetricsLoggerTest.java
@@ -188,18 +188,6 @@ void metricsLoggerCaptureUtilityWithDefaultNameSpace() {
         testLogger(MetricsUtils::withMetricsLogger);
     }
-    @Test
-    void deprecatedMetricLoggerCaptureUtilityWithDefaultNameSpace() {
-        testLogger(MetricsUtils::withMetricLogger);
-    }
-
-    @Test
-    void shouldThrowExceptionWhenDefaultDimensionIsNull() {
-        assertThatNullPointerException()
-                .isThrownBy(() -> MetricsUtils.defaultDimensionSet(null))
-                .withMessage("Null dimension set not allowed");
-    }
-
     @Test
     void shouldUseTraceIdFromSystemPropertyIfEnvVarNotPresent() {
         try (MockedStatic mocked = mockStatic(SystemWrapper.class);
diff --git a/powertools-tracing/pom.xml b/powertools-tracing/pom.xml
index 8b16529ef..0e089a3ab 100644
--- a/powertools-tracing/pom.xml
+++ b/powertools-tracing/pom.xml
@@ -87,6 +87,10 @@
             org.slf4j
             slf4j-simple
             test
+
+            org.junit-pioneer
+            junit-pioneer
+            test
             org.apache.commons
diff --git a/powertools-tracing/src/main/java/software/amazon/lambda/powertools/tracing/Tracing.java b/powertools-tracing/src/main/java/software/amazon/lambda/powertools/tracing/Tracing.java
index 6f17a2e33..432e475a3 100644
--- a/powertools-tracing/src/main/java/software/amazon/lambda/powertools/tracing/Tracing.java
+++ b/powertools-tracing/src/main/java/software/amazon/lambda/powertools/tracing/Tracing.java
@@ -50,22 +50,6 @@ public @interface Tracing {
     String namespace() default "";
-    /**
-     * @deprecated As of release 1.2.0, replaced by captureMode()
-     * in order to support different modes and support via
-     * environment variables
-     */
-    @Deprecated
-    boolean captureResponse() default true;
-
-    /**
-     * @deprecated As of release 1.2.0, replaced by captureMode()
-     * in order to support different modes and support via
-     * environment variables
-     */
-    @Deprecated
-    boolean captureError() default true;
-
     String segmentName() default "";
     CaptureMode captureMode() default CaptureMode.ENVIRONMENT_VAR;
diff --git a/powertools-tracing/src/main/java/software/amazon/lambda/powertools/tracing/internal/LambdaTracingAspect.java b/powertools-tracing/src/main/java/software/amazon/lambda/powertools/tracing/internal/LambdaTracingAspect.java
index 198cb7f34..a4a48532c 100644
--- a/powertools-tracing/src/main/java/software/amazon/lambda/powertools/tracing/internal/LambdaTracingAspect.java
+++ b/powertools-tracing/src/main/java/software/amazon/lambda/powertools/tracing/internal/LambdaTracingAspect.java
@@ -83,9 +83,8 @@ public Object around(ProceedingJoinPoint pjp,
     private boolean captureResponse(Tracing powerToolsTracing) {
         switch (powerToolsTracing.captureMode()) {
             case ENVIRONMENT_VAR:
-                boolean captureResponse = environmentVariable("POWERTOOLS_TRACER_CAPTURE_RESPONSE");
-                return isEnvironmentVariableSet("POWERTOOLS_TRACER_CAPTURE_RESPONSE") ? captureResponse :
-                        powerToolsTracing.captureResponse();
+                return isEnvironmentVariableSet("POWERTOOLS_TRACER_CAPTURE_RESPONSE")
+                        && environmentVariable("POWERTOOLS_TRACER_CAPTURE_RESPONSE");
             case RESPONSE:
             case RESPONSE_AND_ERROR:
                 return true;
@@ -98,9 +97,8 @@ private boolean captureError(Tracing powerToolsTracing) {
         switch (powerToolsTracing.captureMode()) {
             case ENVIRONMENT_VAR:
-                boolean captureError = environmentVariable("POWERTOOLS_TRACER_CAPTURE_ERROR");
-                return isEnvironmentVariableSet("POWERTOOLS_TRACER_CAPTURE_ERROR") ? captureError :
-                        powerToolsTracing.captureError();
+                return isEnvironmentVariableSet("POWERTOOLS_TRACER_CAPTURE_ERROR")
+                        && environmentVariable("POWERTOOLS_TRACER_CAPTURE_ERROR");
             case ERROR:
             case RESPONSE_AND_ERROR:
                 return true;
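With captureResponse() and captureError() removed from @Tracing, the aspect change above also shifts behaviour: in ENVIRONMENT_VAR mode there is no annotation default to fall back to, so payloads are only captured when POWERTOOLS_TRACER_CAPTURE_RESPONSE or POWERTOOLS_TRACER_CAPTURE_ERROR is present and set to true, or when the handler opts in through captureMode. A sketch of the opt-in style (handler name and types are illustrative):

```java
import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestHandler;
import software.amazon.lambda.powertools.tracing.CaptureMode;
import software.amazon.lambda.powertools.tracing.Tracing;

public class OrderHandler implements RequestHandler<String, String> {

    // Explicit capture mode instead of the removed captureResponse/captureError flags;
    // ENVIRONMENT_VAR (the default) now captures nothing unless the env vars say "true".
    @Override
    @Tracing(captureMode = CaptureMode.RESPONSE_AND_ERROR)
    public String handleRequest(String order, Context context) {
        return "processed " + order;
    }
}
```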
diff --git a/powertools-tracing/src/test/java/software/amazon/lambda/powertools/tracing/handlers/PowerTracerToolEnabledWithNoMetaDataDeprecated.java b/powertools-tracing/src/test/java/software/amazon/lambda/powertools/tracing/handlers/PowerTracerToolEnabledWithNoMetaDataDeprecated.java
deleted file mode 100644
index c13f28df0..000000000
--- a/powertools-tracing/src/test/java/software/amazon/lambda/powertools/tracing/handlers/PowerTracerToolEnabledWithNoMetaDataDeprecated.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Copyright 2023 Amazon.com, Inc. or its affiliates.
- * Licensed under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package software.amazon.lambda.powertools.tracing.handlers;
-
-import com.amazonaws.services.lambda.runtime.Context;
-import com.amazonaws.services.lambda.runtime.RequestHandler;
-import software.amazon.lambda.powertools.tracing.Tracing;
-
-public class PowerTracerToolEnabledWithNoMetaDataDeprecated implements RequestHandler {
-
-    @Override
-    @Tracing(captureResponse = false, captureError = false)
-    public Object handleRequest(Object input, Context context) {
-        return null;
-    }
-}
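The deleted fixture above was the last user of the removed annotation attributes. If an equivalent capture-nothing fixture were still wanted, it would presumably go through captureMode instead; the sketch below assumes the CaptureMode enum also exposes a DISABLED constant, which is not visible in this diff, and uses a made-up class name:

```java
import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestHandler;
import software.amazon.lambda.powertools.tracing.CaptureMode;
import software.amazon.lambda.powertools.tracing.Tracing;

// Hypothetical replacement fixture: traces the handler but captures neither response nor error.
public class NoCaptureHandler implements RequestHandler<Object, Object> {

    @Override
    @Tracing(captureMode = CaptureMode.DISABLED) // DISABLED is assumed, not shown in this diff
    public Object handleRequest(Object input, Context context) {
        return null;
    }
}
```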
diff --git a/powertools-tracing/src/test/java/software/amazon/lambda/powertools/tracing/internal/LambdaTracingAspectTest.java b/powertools-tracing/src/test/java/software/amazon/lambda/powertools/tracing/internal/LambdaTracingAspectTest.java
index a676bf683..4aefdec9c 100644
--- a/powertools-tracing/src/test/java/software/amazon/lambda/powertools/tracing/internal/LambdaTracingAspectTest.java
+++ b/powertools-tracing/src/test/java/software/amazon/lambda/powertools/tracing/internal/LambdaTracingAspectTest.java
@@ -33,6 +33,7 @@ import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
+import org.junitpioneer.jupiter.SetEnvironmentVariable;
 import org.mockito.Mock;
 import org.mockito.MockedStatic;
 import software.amazon.lambda.powertools.common.internal.LambdaHandlerProcessor;
@@ -47,7 +48,6 @@ import software.amazon.lambda.powertools.tracing.handlers.PowerTracerToolEnabledForStreamWithNoMetaData;
 import software.amazon.lambda.powertools.tracing.handlers.PowerTracerToolEnabledWithException;
 import software.amazon.lambda.powertools.tracing.handlers.PowerTracerToolEnabledWithNoMetaData;
-import software.amazon.lambda.powertools.tracing.handlers.PowerTracerToolEnabledWithNoMetaDataDeprecated;
 import software.amazon.lambda.powertools.tracing.nonhandler.PowerToolNonHandler;
 class LambdaTracingAspectTest {
@@ -114,6 +114,28 @@ void shouldCaptureNonHandlerMethodWithCustomSegmentName() {
     void shouldCaptureTraces() {
         requestHandler.handleRequest(new Object(), context);
+        assertThat(AWSXRay.getTraceEntity())
+                .isNotNull();
+
+        assertThat(AWSXRay.getTraceEntity().getSubsegmentsCopy())
+                .hasSize(1)
+                .allSatisfy(subsegment ->
+                {
+                    assertThat(subsegment.getAnnotations())
+                            .hasSize(2)
+                            .containsEntry("ColdStart", true)
+                            .containsEntry("Service", "lambdaHandler");
+
+                    assertThat(subsegment.getMetadata())
+                            .hasSize(0);
+                });
+    }
+
+    @Test
+    @SetEnvironmentVariable(key = "POWERTOOLS_TRACER_CAPTURE_RESPONSE", value = "true")
+    void shouldCaptureTracesWithResponseMetadata() {
+        requestHandler.handleRequest(new Object(), context);
+
         assertThat(AWSXRay.getTraceEntity())
                 .isNotNull();
@@ -133,6 +155,7 @@ void shouldCaptureTraces() {
     }
     @Test
+    @SetEnvironmentVariable(key = "POWERTOOLS_TRACER_CAPTURE_ERROR", value = "true")
     void shouldCaptureTracesWithExceptionMetaData() {
         requestHandler = new PowerTracerToolEnabledWithException();
@@ -164,6 +187,25 @@ void shouldCaptureTracesWithExceptionMetaData() {
     void shouldCaptureTracesForStream() throws IOException {
         streamHandler.handleRequest(new ByteArrayInputStream("test".getBytes()), new ByteArrayOutputStream(), context);
+        assertThat(AWSXRay.getTraceEntity())
+                .isNotNull();
+
+        assertThat(AWSXRay.getTraceEntity().getSubsegmentsCopy())
+                .hasSize(1)
+                .allSatisfy(subsegment ->
+                {
+                    assertThat(subsegment.getAnnotations())
+                            .hasSize(2)
+                            .containsEntry("ColdStart", true)
+                            .containsEntry("Service", "streamHandler");
+                });
+    }
+
+    @Test
+    @SetEnvironmentVariable(key = "POWERTOOLS_TRACER_CAPTURE_RESPONSE", value = "true")
+    void shouldCaptureTracesForStreamWithResponseMetadata() throws IOException {
+        streamHandler.handleRequest(new ByteArrayInputStream("test".getBytes()), new ByteArrayOutputStream(), context);
+
         assertThat(AWSXRay.getTraceEntity())
                 .isNotNull();
@@ -246,29 +288,6 @@ void shouldCaptureTracesForStreamWithNoMetadata() throws IOException {
         });
     }
-    @Test
-    void shouldCaptureTracesWithNoMetadataDeprecated() {
-        requestHandler = new PowerTracerToolEnabledWithNoMetaDataDeprecated();
-
-        requestHandler.handleRequest(new Object(), context);
-
-        assertThat(AWSXRay.getTraceEntity())
-                .isNotNull();
-
-        assertThat(AWSXRay.getTraceEntity().getSubsegmentsCopy())
-                .hasSize(1)
-                .allSatisfy(subsegment ->
-                {
-                    assertThat(subsegment.getAnnotations())
-                            .hasSize(2)
-                            .containsEntry("ColdStart", true)
-                            .containsEntry("Service", "service_undefined");
-
-                    assertThat(subsegment.getMetadata())
-                            .isEmpty();
-                });
-    }
-
     @Test
     void shouldNotCaptureTracesIfDisabledViaEnvironmentVariable() {
         try (MockedStatic mocked = mockStatic(SystemWrapper.class)) {
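The tests above lean on the junit-pioneer dependency added to the tracing pom earlier: @SetEnvironmentVariable scopes POWERTOOLS_TRACER_CAPTURE_RESPONSE and POWERTOOLS_TRACER_CAPTURE_ERROR to a single test method, which is what replaces the deleted deprecated fixtures. Reduced to a sketch with illustrative names:

```java
import org.junit.jupiter.api.Test;
import org.junitpioneer.jupiter.SetEnvironmentVariable;

class CapturePerTestSketch {

    @Test
    @SetEnvironmentVariable(key = "POWERTOOLS_TRACER_CAPTURE_RESPONSE", value = "true")
    void responseIsCapturedWhenVariableIsTrue() {
        // invoke the traced handler here; the variable only exists for this test method
    }

    @Test
    void nothingIsCapturedByDefault() {
        // with no variable set, the aspect's ENVIRONMENT_VAR branch now returns false
    }
}
```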
"true") + void shouldCaptureTracesForStreamWithResponseMetadata() throws IOException { + streamHandler.handleRequest(new ByteArrayInputStream("test".getBytes()), new ByteArrayOutputStream(), context); + assertThat(AWSXRay.getTraceEntity()) .isNotNull(); @@ -246,29 +288,6 @@ void shouldCaptureTracesForStreamWithNoMetadata() throws IOException { }); } - @Test - void shouldCaptureTracesWithNoMetadataDeprecated() { - requestHandler = new PowerTracerToolEnabledWithNoMetaDataDeprecated(); - - requestHandler.handleRequest(new Object(), context); - - assertThat(AWSXRay.getTraceEntity()) - .isNotNull(); - - assertThat(AWSXRay.getTraceEntity().getSubsegmentsCopy()) - .hasSize(1) - .allSatisfy(subsegment -> - { - assertThat(subsegment.getAnnotations()) - .hasSize(2) - .containsEntry("ColdStart", true) - .containsEntry("Service", "service_undefined"); - - assertThat(subsegment.getMetadata()) - .isEmpty(); - }); - } - @Test void shouldNotCaptureTracesIfDisabledViaEnvironmentVariable() { try (MockedStatic mocked = mockStatic(SystemWrapper.class)) { diff --git a/powertools-validation/pom.xml b/powertools-validation/pom.xml index 18c9b064a..7a686bfc2 100644 --- a/powertools-validation/pom.xml +++ b/powertools-validation/pom.xml @@ -65,7 +65,7 @@ com.networknt json-schema-validator - 1.4.3 + 1.5.1 com.amazonaws