diff --git a/.gitignore b/.gitignore
index 12a60ce4d..04e85211d 100644
--- a/.gitignore
+++ b/.gitignore
@@ -82,6 +82,8 @@ Desktop.ini
######################
/bin/
/deploy/
+/dist/
+/site/
######################
# Logs
diff --git a/docs/utilities/idempotency.md b/docs/utilities/idempotency.md
index 56b9acb78..17391218d 100644
--- a/docs/utilities/idempotency.md
+++ b/docs/utilities/idempotency.md
@@ -355,19 +355,141 @@ Imagine the function executes successfully, but the client never receives the re
This sequence diagram shows an example flow of what happens in the payment scenario:
-![Idempotent sequence](../media/idempotent_sequence.png)
+
+```mermaid
+sequenceDiagram
+ participant Client
+ participant Lambda
+ participant Persistence Layer
+ alt initial request
+ Client->>Lambda: Invoke (event)
+ Lambda->>Persistence Layer: Get or set (id=event.search(payload))
+ activate Persistence Layer
+ Note right of Persistence Layer: Locked to prevent concurrent
invocations with
the same payload.
+ Lambda-->>Lambda: Call handler (event)
+ Lambda->>Persistence Layer: Update record with result
+ deactivate Persistence Layer
+ Persistence Layer-->>Persistence Layer: Update record with result
+ Lambda-->>Client: Response sent to client
+ else retried request
+ Client->>Lambda: Invoke (event)
+ Lambda->>Persistence Layer: Get or set (id=event.search(payload))
+ Persistence Layer-->>Lambda: Already exists in persistence layer. Return result
+ Lambda-->>Client: Response sent to client
+ end
+```
+Idempotent sequence
+
The client was successful in receiving the result after the retry. Since the Lambda handler was only executed once, our customer hasn't been charged twice.
!!! note
Bear in mind that the entire Lambda handler is treated as a single idempotent operation. If your Lambda handler can cause multiple side effects, consider splitting it into separate functions.
+#### Lambda timeouts
+
+This is automatically done when you annotate your Lambda handler with [@Idempotent annotation](#idempotent-annotation).
+
+To prevent against extended failed retries when a [Lambda function times out](https://aws.amazon.com/premiumsupport/knowledge-center/lambda-verify-invocation-timeouts/), Powertools calculates and includes the remaining invocation available time as part of the idempotency record.
+
+!!! example
+ If a second invocation happens **after** this timestamp, and the record is marked as `INPROGRESS`, we will execute the invocation again as if it was in the `EXPIRED` state.
+ This means that if an invocation expired during execution, it will be quickly executed again on the next retry.
+
+!!! important
+ If you are using the [@Idempotent annotation on another method](#idempotent-annotation-on-another-method) to guard isolated parts of your code, you must use `registerLambdaContext` method available in the `Idempotency` object to benefit from this protection.
+
+ Here is an example on how you register the Lambda context in your handler:
+
+ ```java hl_lines="13-19" title="Registering the Lambda context"
+ public class PaymentHandler implements RequestHandler> {
+
+ public PaymentHandler() {
+ Idempotency.config()
+ .withPersistenceStore(
+ DynamoDBPersistenceStore.builder()
+ .withTableName(System.getenv("IDEMPOTENCY_TABLE"))
+ .build())
+ .configure();
+ }
+
+ @Override
+ public List handleRequest(SQSEvent sqsEvent, Context context) {
+ Idempotency.registerLambdaContext(context);
+ return sqsEvent.getRecords().stream().map(record -> process(record.getMessageId(), record.getBody())).collect(Collectors.toList());
+ }
+
+ @Idempotent
+ private String process(String messageId, @IdempotencyKey String messageBody) {
+ logger.info("Processing messageId: {}", messageId);
+ PaymentRequest request = extractDataFrom(messageBody).as(PaymentRequest.class);
+ return paymentService.process(request);
+ }
+
+ }
+ ```
+
+#### Lambda timeout sequence diagram
+
+This sequence diagram shows an example flow of what happens if a Lambda function times out:
+
+
+```mermaid
+sequenceDiagram
+ participant Client
+ participant Lambda
+ participant Persistence Layer
+ alt initial request
+ Client->>Lambda: Invoke (event)
+ Lambda->>Persistence Layer: Get or set (id=event.search(payload))
+ activate Persistence Layer
+ Note right of Persistence Layer: Locked to prevent concurrent
invocations with
the same payload.
+ Note over Lambda: Time out
+ Lambda--xLambda: Call handler (event)
+ Lambda-->>Client: Return error response
+ deactivate Persistence Layer
+ else concurrent request before timeout
+ Client->>Lambda: Invoke (event)
+ Lambda->>Persistence Layer: Get or set (id=event.search(payload))
+ Persistence Layer-->>Lambda: Request already INPROGRESS
+ Lambda--xClient: Return IdempotencyAlreadyInProgressError
+ else retry after Lambda timeout
+ Client->>Lambda: Invoke (event)
+ Lambda->>Persistence Layer: Get or set (id=event.search(payload))
+ activate Persistence Layer
+ Note right of Persistence Layer: Locked to prevent concurrent
invocations with
the same payload.
+ Lambda-->>Lambda: Call handler (event)
+ Lambda->>Persistence Layer: Update record with result
+ deactivate Persistence Layer
+ Persistence Layer-->>Persistence Layer: Update record with result
+ Lambda-->>Client: Response sent to client
+ end
+```
+Idempotent sequence for Lambda timeouts
+
+
### Handling exceptions
If you are using the `@Idempotent` annotation on your Lambda handler or any other method, any unhandled exceptions that are thrown during the code execution will cause **the record in the persistence layer to be deleted**.
This means that new invocations will execute your code again despite having the same payload. If you don't want the record to be deleted, you need to catch exceptions within the idempotent function and return a successful response.
-![Idempotent sequence exception](../media/idempotent_sequence_exception.png)
+
+```mermaid
+sequenceDiagram
+ participant Client
+ participant Lambda
+ participant Persistence Layer
+ Client->>Lambda: Invoke (event)
+ Lambda->>Persistence Layer: Get or set (id=event.search(payload))
+ activate Persistence Layer
+ Note right of Persistence Layer: Locked during this time. Prevents multiple
Lambda invocations with the same
payload running concurrently.
+ Lambda--xLambda: Call handler (event).
Raises exception
+ Lambda->>Persistence Layer: Delete record (id=event.search(payload))
+ deactivate Persistence Layer
+ Lambda-->>Client: Return error response
+```
+Idempotent sequence exception
+
If an Exception is raised _outside_ the scope of a decorated method and after your method has been called, the persistent record will not be affected. In this case, idempotency will be maintained for your decorated function. Example:
diff --git a/mkdocs.yml b/mkdocs.yml
index 574b8b30a..58e7fe757 100644
--- a/mkdocs.yml
+++ b/mkdocs.yml
@@ -56,6 +56,11 @@ markdown_extensions:
- pymdownx.snippets:
base_path: '.'
check_paths: true
+ - pymdownx.superfences:
+ custom_fences:
+ - name: mermaid
+ class: mermaid
+ format: !!python/name:pymdownx.superfences.fence_code_format
- meta
- toc:
permalink: true
diff --git a/powertools-idempotency/pom.xml b/powertools-idempotency/pom.xml
index 632619d45..396cbe8a5 100644
--- a/powertools-idempotency/pom.xml
+++ b/powertools-idempotency/pom.xml
@@ -122,6 +122,7 @@
com.amazonaws
aws-lambda-java-tests
+ test
com.amazonaws
diff --git a/powertools-idempotency/src/main/java/software/amazon/lambda/powertools/idempotency/Idempotency.java b/powertools-idempotency/src/main/java/software/amazon/lambda/powertools/idempotency/Idempotency.java
index 1ff2ed47f..ce652791b 100644
--- a/powertools-idempotency/src/main/java/software/amazon/lambda/powertools/idempotency/Idempotency.java
+++ b/powertools-idempotency/src/main/java/software/amazon/lambda/powertools/idempotency/Idempotency.java
@@ -65,6 +65,16 @@ public static Idempotency getInstance() {
return Holder.instance;
}
+ /**
+ * Can be used in a method which is not the handler to capture the Lambda context,
+ * to calculate the remaining time before the invocation times out.
+ *
+ * @param lambdaContext
+ */
+ public static void registerLambdaContext(Context lambdaContext) {
+ getInstance().getConfig().setLambdaContext(lambdaContext);
+ }
+
/**
* Acts like a builder that can be used to configure {@link Idempotency}
*
diff --git a/powertools-idempotency/src/main/java/software/amazon/lambda/powertools/idempotency/IdempotencyConfig.java b/powertools-idempotency/src/main/java/software/amazon/lambda/powertools/idempotency/IdempotencyConfig.java
index 4089d3ed8..28f20ffa9 100644
--- a/powertools-idempotency/src/main/java/software/amazon/lambda/powertools/idempotency/IdempotencyConfig.java
+++ b/powertools-idempotency/src/main/java/software/amazon/lambda/powertools/idempotency/IdempotencyConfig.java
@@ -13,6 +13,7 @@
*/
package software.amazon.lambda.powertools.idempotency;
+import com.amazonaws.services.lambda.runtime.Context;
import software.amazon.lambda.powertools.idempotency.internal.cache.LRUCache;
import java.time.Duration;
@@ -28,6 +29,7 @@ public class IdempotencyConfig {
private final String payloadValidationJMESPath;
private final boolean throwOnNoIdempotencyKey;
private final String hashFunction;
+ private Context lambdaContext;
private IdempotencyConfig(String eventKeyJMESPath, String payloadValidationJMESPath, boolean throwOnNoIdempotencyKey, boolean useLocalCache, int localCacheMaxItems, long expirationInSeconds, String hashFunction) {
this.localCacheMaxItems = localCacheMaxItems;
@@ -71,12 +73,20 @@ public String getHashFunction() {
/**
* Create a builder that can be used to configure and create a {@link IdempotencyConfig}.
*
- * @return a new instance of {@link IdempotencyConfig.Builder}
+ * @return a new instance of {@link Builder}
*/
public static Builder builder() {
return new Builder();
}
+ public void setLambdaContext(Context lambdaContext) {
+ this.lambdaContext = lambdaContext;
+ }
+
+ public Context getLambdaContext() {
+ return lambdaContext;
+ }
+
public static class Builder {
private int localCacheMaxItems = 256;
diff --git a/powertools-idempotency/src/main/java/software/amazon/lambda/powertools/idempotency/exceptions/IdempotencyInconsistentStateException.java b/powertools-idempotency/src/main/java/software/amazon/lambda/powertools/idempotency/exceptions/IdempotencyInconsistentStateException.java
index c6fe38d23..40c90dcab 100644
--- a/powertools-idempotency/src/main/java/software/amazon/lambda/powertools/idempotency/exceptions/IdempotencyInconsistentStateException.java
+++ b/powertools-idempotency/src/main/java/software/amazon/lambda/powertools/idempotency/exceptions/IdempotencyInconsistentStateException.java
@@ -15,7 +15,7 @@
/**
* IdempotencyInconsistentStateException can happen under rare but expected cases
- * when persistent state changes in the small-time between put & get requests.
+ * when persistent state changes in the small-time between put & get requests.
*/
public class IdempotencyInconsistentStateException extends RuntimeException {
private static final long serialVersionUID = -4293951999802300672L;
diff --git a/powertools-idempotency/src/main/java/software/amazon/lambda/powertools/idempotency/internal/IdempotencyHandler.java b/powertools-idempotency/src/main/java/software/amazon/lambda/powertools/idempotency/internal/IdempotencyHandler.java
index 1f3724919..c7e910bce 100644
--- a/powertools-idempotency/src/main/java/software/amazon/lambda/powertools/idempotency/internal/IdempotencyHandler.java
+++ b/powertools-idempotency/src/main/java/software/amazon/lambda/powertools/idempotency/internal/IdempotencyHandler.java
@@ -13,6 +13,7 @@
*/
package software.amazon.lambda.powertools.idempotency.internal;
+import com.amazonaws.services.lambda.runtime.Context;
import com.fasterxml.jackson.databind.JsonNode;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.reflect.MethodSignature;
@@ -25,6 +26,7 @@
import software.amazon.lambda.powertools.utilities.JsonConfig;
import java.time.Instant;
+import java.util.OptionalInt;
import static software.amazon.lambda.powertools.idempotency.persistence.DataRecord.Status.EXPIRED;
import static software.amazon.lambda.powertools.idempotency.persistence.DataRecord.Status.INPROGRESS;
@@ -40,10 +42,12 @@ public class IdempotencyHandler {
private final ProceedingJoinPoint pjp;
private final JsonNode data;
private final BasePersistenceStore persistenceStore;
+ private final Context lambdaContext;
- public IdempotencyHandler(ProceedingJoinPoint pjp, String functionName, JsonNode payload) {
+ public IdempotencyHandler(ProceedingJoinPoint pjp, String functionName, JsonNode payload, Context lambdaContext) {
this.pjp = pjp;
this.data = payload;
+ this.lambdaContext = lambdaContext;
persistenceStore = Idempotency.getInstance().getPersistenceStore();
persistenceStore.configure(Idempotency.getInstance().getConfig(), functionName);
}
@@ -77,7 +81,7 @@ private Object processIdempotency() throws Throwable {
try {
// We call saveInProgress first as an optimization for the most common case where no idempotent record
// already exists. If it succeeds, there's no need to call getRecord.
- persistenceStore.saveInProgress(data, Instant.now());
+ persistenceStore.saveInProgress(data, Instant.now(), getRemainingTimeInMillis());
} catch (IdempotencyItemAlreadyExistsException iaee) {
DataRecord record = getIdempotencyRecord();
return handleForStatus(record);
@@ -89,6 +93,21 @@ private Object processIdempotency() throws Throwable {
return getFunctionResponse();
}
+ /**
+ * Tries to determine the remaining time available for the current lambda invocation.
+ * Currently, it only works if the idempotent handler decorator is used or using {@link Idempotency#registerLambdaContext(Context)}
+ *
+ * @return the remaining time in milliseconds or empty if the context was not provided/found
+ */
+ private OptionalInt getRemainingTimeInMillis() {
+ if (lambdaContext != null) {
+ return OptionalInt.of(lambdaContext.getRemainingTimeInMillis());
+ } else {
+ LOG.warn("Couldn't determine the remaining time left. Did you call registerLambdaContext on Idempotency?");
+ }
+ return OptionalInt.empty();
+ }
+
/**
* Retrieve the idempotency record from the persistence layer.
*
@@ -121,12 +140,18 @@ private Object handleForStatus(DataRecord record) {
}
if (INPROGRESS.equals(record.getStatus())) {
+ if (record.getInProgressExpiryTimestamp().isPresent()
+ && record.getInProgressExpiryTimestamp().getAsLong() < Instant.now().toEpochMilli()) {
+ throw new IdempotencyInconsistentStateException("Item should have been expired in-progress because it already time-outed.");
+ }
throw new IdempotencyAlreadyInProgressException("Execution already in progress with idempotency key: " + record.getIdempotencyKey());
}
Class> returnType = ((MethodSignature) pjp.getSignature()).getReturnType();
try {
LOG.debug("Response for key '{}' retrieved from idempotency store, skipping the function", record.getIdempotencyKey());
+ if (returnType.equals(String.class))
+ return record.getResponseData();
return JsonConfig.get().getObjectMapper().reader().readValue(record.getResponseData(), returnType);
} catch (Exception e) {
throw new IdempotencyPersistenceLayerException("Unable to get function response as " + returnType.getSimpleName(), e);
diff --git a/powertools-idempotency/src/main/java/software/amazon/lambda/powertools/idempotency/internal/IdempotentAspect.java b/powertools-idempotency/src/main/java/software/amazon/lambda/powertools/idempotency/internal/IdempotentAspect.java
index 088e81249..8bc3c8c8a 100644
--- a/powertools-idempotency/src/main/java/software/amazon/lambda/powertools/idempotency/internal/IdempotentAspect.java
+++ b/powertools-idempotency/src/main/java/software/amazon/lambda/powertools/idempotency/internal/IdempotentAspect.java
@@ -19,7 +19,9 @@
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Pointcut;
import org.aspectj.lang.reflect.MethodSignature;
+import com.amazonaws.services.lambda.runtime.Context;
import software.amazon.lambda.powertools.idempotency.Constants;
+import software.amazon.lambda.powertools.idempotency.Idempotency;
import software.amazon.lambda.powertools.idempotency.IdempotencyKey;
import software.amazon.lambda.powertools.idempotency.Idempotent;
import software.amazon.lambda.powertools.idempotency.exceptions.IdempotencyConfigurationException;
@@ -56,12 +58,20 @@ public Object around(ProceedingJoinPoint pjp,
throw new IdempotencyConfigurationException("The annotated method doesn't return anything. Unable to perform idempotency on void return type");
}
- JsonNode payload = getPayload(pjp, method);
+ boolean isHandler = (isHandlerMethod(pjp) && placedOnRequestHandler(pjp));
+ JsonNode payload = getPayload(pjp, method, isHandler);
if (payload == null) {
throw new IdempotencyConfigurationException("Unable to get payload from the method. Ensure there is at least one parameter or that you use @IdempotencyKey");
}
- IdempotencyHandler idempotencyHandler = new IdempotencyHandler(pjp, method.getName(), payload);
+ Context lambdaContext;
+ if (isHandler) {
+ lambdaContext = (Context) pjp.getArgs()[1];
+ } else {
+ lambdaContext = Idempotency.getInstance().getConfig().getLambdaContext();
+ }
+
+ IdempotencyHandler idempotencyHandler = new IdempotencyHandler(pjp, method.getName(), payload, lambdaContext);
return idempotencyHandler.handle();
}
@@ -71,11 +81,10 @@ public Object around(ProceedingJoinPoint pjp,
* @param method the annotated method
* @return the payload used for idempotency
*/
- private JsonNode getPayload(ProceedingJoinPoint pjp, Method method) {
+ private JsonNode getPayload(ProceedingJoinPoint pjp, Method method, boolean isHandler) {
JsonNode payload = null;
// handleRequest or method with one parameter: get the first one
- if ((isHandlerMethod(pjp) && placedOnRequestHandler(pjp))
- || pjp.getArgs().length == 1) {
+ if (isHandler || pjp.getArgs().length == 1) {
payload = JsonConfig.get().getObjectMapper().valueToTree(pjp.getArgs()[0]);
} else {
// Look for a parameter annotated with @IdempotencyKey
diff --git a/powertools-idempotency/src/main/java/software/amazon/lambda/powertools/idempotency/persistence/BasePersistenceStore.java b/powertools-idempotency/src/main/java/software/amazon/lambda/powertools/idempotency/persistence/BasePersistenceStore.java
index db0aaa688..79db29f05 100644
--- a/powertools-idempotency/src/main/java/software/amazon/lambda/powertools/idempotency/persistence/BasePersistenceStore.java
+++ b/powertools-idempotency/src/main/java/software/amazon/lambda/powertools/idempotency/persistence/BasePersistenceStore.java
@@ -36,9 +36,11 @@
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.Map;
-import java.util.Spliterator;
-import java.util.Spliterators;
+import java.util.OptionalLong;
+import java.util.OptionalInt;
import java.util.stream.Stream;
+import java.util.Spliterators;
+import java.util.Spliterator;
import java.util.stream.StreamSupport;
/**
@@ -108,7 +110,12 @@ public void configure(IdempotencyConfig config, String functionName) {
public void saveSuccess(JsonNode data, Object result, Instant now) {
ObjectWriter writer = JsonConfig.get().getObjectMapper().writer();
try {
- String responseJson = writer.writeValueAsString(result);
+ String responseJson;
+ if (result instanceof String) {
+ responseJson = (String) result;
+ } else {
+ responseJson = writer.writeValueAsString(result);
+ }
DataRecord record = new DataRecord(
getHashedIdempotencyKey(data),
DataRecord.Status.COMPLETED,
@@ -131,19 +138,25 @@ public void saveSuccess(JsonNode data, Object result, Instant now) {
* @param data Payload
* @param now
*/
- public void saveInProgress(JsonNode data, Instant now) throws IdempotencyItemAlreadyExistsException {
+ public void saveInProgress(JsonNode data, Instant now, OptionalInt remainingTimeInMs) throws IdempotencyItemAlreadyExistsException {
String idempotencyKey = getHashedIdempotencyKey(data);
if (retrieveFromCache(idempotencyKey, now) != null) {
throw new IdempotencyItemAlreadyExistsException();
}
+ OptionalLong inProgressExpirationMsTimestamp = OptionalLong.empty();
+ if (remainingTimeInMs.isPresent()) {
+ inProgressExpirationMsTimestamp = OptionalLong.of(now.plus(remainingTimeInMs.getAsInt(), ChronoUnit.MILLIS).toEpochMilli());
+ }
+
DataRecord record = new DataRecord(
idempotencyKey,
DataRecord.Status.INPROGRESS,
getExpiryEpochSecond(now),
null,
- getHashedPayload(data)
+ getHashedPayload(data),
+ inProgressExpirationMsTimestamp
);
LOG.debug("saving in progress record for idempotency key: {}", record.getIdempotencyKey());
putRecord(record, now);
@@ -212,7 +225,8 @@ private String getHashedIdempotencyKey(JsonNode data) {
}
String hash = generateHash(node);
- return functionName + "#" + hash;
+ hash = functionName + "#" + hash;
+ return hash;
}
private boolean isMissingIdemPotencyKey(JsonNode data) {
diff --git a/powertools-idempotency/src/main/java/software/amazon/lambda/powertools/idempotency/persistence/DataRecord.java b/powertools-idempotency/src/main/java/software/amazon/lambda/powertools/idempotency/persistence/DataRecord.java
index b4f58a73d..934ec3d09 100644
--- a/powertools-idempotency/src/main/java/software/amazon/lambda/powertools/idempotency/persistence/DataRecord.java
+++ b/powertools-idempotency/src/main/java/software/amazon/lambda/powertools/idempotency/persistence/DataRecord.java
@@ -13,8 +13,12 @@
*/
package software.amazon.lambda.powertools.idempotency.persistence;
+import software.amazon.lambda.powertools.idempotency.IdempotencyConfig;
+
import java.time.Instant;
import java.util.Objects;
+import java.util.OptionalInt;
+import java.util.OptionalLong;
/**
* Data Class for idempotency records. This is actually the item that will be stored in the persistence layer.
@@ -25,6 +29,7 @@ public class DataRecord {
private final long expiryTimestamp;
private final String responseData;
private final String payloadHash;
+ private final OptionalLong inProgressExpiryTimestamp;
public DataRecord(String idempotencyKey, Status status, long expiryTimestamp, String responseData, String payloadHash) {
this.idempotencyKey = idempotencyKey;
@@ -32,6 +37,16 @@ public DataRecord(String idempotencyKey, Status status, long expiryTimestamp, St
this.expiryTimestamp = expiryTimestamp;
this.responseData = responseData;
this.payloadHash = payloadHash;
+ this.inProgressExpiryTimestamp = OptionalLong.empty();
+ }
+
+ public DataRecord(String idempotencyKey, Status status, long expiryTimestamp, String responseData, String payloadHash, OptionalLong inProgressExpiryTimestamp) {
+ this.idempotencyKey = idempotencyKey;
+ this.status = status.toString();
+ this.expiryTimestamp = expiryTimestamp;
+ this.responseData = responseData;
+ this.payloadHash = payloadHash;
+ this.inProgressExpiryTimestamp = inProgressExpiryTimestamp;
}
public String getIdempotencyKey() {
@@ -39,7 +54,7 @@ public String getIdempotencyKey() {
}
/**
- * Check if data record is expired (based on expiration configured in the {@link software.amazon.lambda.powertools.idempotency.IdempotencyConfig})
+ * Check if data record is expired (based on expiration configured in the {@link IdempotencyConfig})
*
* @return Whether the record is currently expired or not
*/
@@ -60,6 +75,10 @@ public long getExpiryTimestamp() {
return expiryTimestamp;
}
+ public OptionalLong getInProgressExpiryTimestamp() {
+ return inProgressExpiryTimestamp;
+ }
+
public String getResponseData() {
return responseData;
}
@@ -85,6 +104,7 @@ public int hashCode() {
return Objects.hash(idempotencyKey, status, expiryTimestamp, responseData, payloadHash);
}
+
/**
* Status of the record:
*
diff --git a/powertools-idempotency/src/main/java/software/amazon/lambda/powertools/idempotency/persistence/DynamoDBPersistenceStore.java b/powertools-idempotency/src/main/java/software/amazon/lambda/powertools/idempotency/persistence/DynamoDBPersistenceStore.java
index 4985f845d..e48cb78e2 100644
--- a/powertools-idempotency/src/main/java/software/amazon/lambda/powertools/idempotency/persistence/DynamoDBPersistenceStore.java
+++ b/powertools-idempotency/src/main/java/software/amazon/lambda/powertools/idempotency/persistence/DynamoDBPersistenceStore.java
@@ -28,13 +28,14 @@
import java.time.Instant;
import java.util.AbstractMap;
-import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
+import java.util.OptionalLong;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static software.amazon.lambda.powertools.idempotency.Constants.AWS_REGION_ENV;
+import static software.amazon.lambda.powertools.idempotency.persistence.DataRecord.Status.INPROGRESS;
/**
* DynamoDB version of the {@link PersistenceStore}. Will store idempotency data in DynamoDB.
@@ -49,6 +50,8 @@ public class DynamoDBPersistenceStore extends BasePersistenceStore implements Pe
private final String staticPkValue;
private final String sortKeyAttr;
private final String expiryAttr;
+
+ private final String inProgressExpiryAttr;
private final String statusAttr;
private final String dataAttr;
private final String validationAttr;
@@ -62,6 +65,7 @@ private DynamoDBPersistenceStore(String tableName,
String staticPkValue,
String sortKeyAttr,
String expiryAttr,
+ String inProgressExpiryAttr,
String statusAttr,
String dataAttr,
String validationAttr,
@@ -71,6 +75,7 @@ private DynamoDBPersistenceStore(String tableName,
this.staticPkValue = staticPkValue;
this.sortKeyAttr = sortKeyAttr;
this.expiryAttr = expiryAttr;
+ this.inProgressExpiryAttr = inProgressExpiryAttr;
this.statusAttr = statusAttr;
this.dataAttr = dataAttr;
this.validationAttr = validationAttr;
@@ -115,6 +120,11 @@ public void putRecord(DataRecord record, Instant now) throws IdempotencyItemAlre
Map item = new HashMap<>(getKey(record.getIdempotencyKey()));
item.put(this.expiryAttr, AttributeValue.builder().n(String.valueOf(record.getExpiryTimestamp())).build());
item.put(this.statusAttr, AttributeValue.builder().s(record.getStatus().toString()).build());
+
+ if (record.getInProgressExpiryTimestamp().isPresent()) {
+ item.put(this.inProgressExpiryAttr, AttributeValue.builder().n(String.valueOf(record.getInProgressExpiryTimestamp().getAsLong())).build());
+ }
+
if (this.payloadValidationEnabled) {
item.put(this.validationAttr, AttributeValue.builder().s(record.getPayloadHash()).build());
}
@@ -124,16 +134,24 @@ public void putRecord(DataRecord record, Instant now) throws IdempotencyItemAlre
Map expressionAttributeNames = Stream.of(
new AbstractMap.SimpleEntry<>("#id", this.keyAttr),
- new AbstractMap.SimpleEntry<>("#expiry", this.expiryAttr))
+ new AbstractMap.SimpleEntry<>("#expiry", this.expiryAttr),
+ new AbstractMap.SimpleEntry<>("#in_progress_expiry", this.inProgressExpiryAttr),
+ new AbstractMap.SimpleEntry<>("#status", this.statusAttr))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+ Map expressionAttributeValues = Stream.of(
+ new AbstractMap.SimpleEntry<>(":now", AttributeValue.builder().n(String.valueOf(now.getEpochSecond())).build()),
+ new AbstractMap.SimpleEntry<>(":inprogress", AttributeValue.builder().s(INPROGRESS.toString()).build())
+ ).collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+
+
dynamoDbClient.putItem(
PutItemRequest.builder()
.tableName(tableName)
.item(item)
- .conditionExpression("attribute_not_exists(#id) OR #expiry < :now")
+ .conditionExpression("attribute_not_exists(#id) OR #expiry < :now OR (attribute_exists(#in_progress_expiry) AND #in_progress_expiry < :now AND #status = :inprogress)")
.expressionAttributeNames(expressionAttributeNames)
- .expressionAttributeValues(Collections.singletonMap(":now", AttributeValue.builder().n(String.valueOf(now.getEpochSecond())).build()))
+ .expressionAttributeValues(expressionAttributeValues)
.build()
);
} catch (ConditionalCheckFailedException e) {
@@ -212,12 +230,12 @@ private DataRecord itemToRecord(Map item) {
// data and validation payload may be null
AttributeValue data = item.get(this.dataAttr);
AttributeValue validation = item.get(this.validationAttr);
-
return new DataRecord(item.get(sortKeyAttr != null ? sortKeyAttr: keyAttr).s(),
DataRecord.Status.valueOf(item.get(this.statusAttr).s()),
Long.parseLong(item.get(this.expiryAttr).n()),
data != null ? data.s() : null,
- validation != null ? validation.s() : null);
+ validation != null ? validation.s() : null,
+ item.get(this.inProgressExpiryAttr) != null ? OptionalLong.of(Long.parseLong(item.get(this.inProgressExpiryAttr).n())) : OptionalLong.empty());
}
public static Builder builder() {
@@ -238,6 +256,8 @@ public static class Builder {
private String staticPkValue = String.format("idempotency#%s", funcEnv != null ? funcEnv : "");
private String sortKeyAttr;
private String expiryAttr = "expiration";
+
+ private String inProgressExpiryAttr = "in_progress_expiration";
private String statusAttr = "status";
private String dataAttr = "data";
private String validationAttr = "validation";
@@ -256,7 +276,7 @@ public DynamoDBPersistenceStore build() {
if (StringUtils.isEmpty(tableName)) {
throw new IllegalArgumentException("Table name is not specified");
}
- return new DynamoDBPersistenceStore(tableName, keyAttr, staticPkValue, sortKeyAttr, expiryAttr, statusAttr, dataAttr, validationAttr, dynamoDbClient);
+ return new DynamoDBPersistenceStore(tableName, keyAttr, staticPkValue, sortKeyAttr, expiryAttr, inProgressExpiryAttr, statusAttr, dataAttr, validationAttr, dynamoDbClient);
}
/**
@@ -315,6 +335,17 @@ public Builder withExpiryAttr(String expiryAttr) {
return this;
}
+ /**
+ * DynamoDB attribute name for in progress expiry timestamp (optional), by default "in_progress_expiration"
+ *
+ * @param inProgressExpiryAttr name of the attribute in the table
+ * @return the builder instance (to chain operations)
+ */
+ public Builder withInProgressExpiryAttr(String inProgressExpiryAttr) {
+ this.inProgressExpiryAttr = inProgressExpiryAttr;
+ return this;
+ }
+
/**
* DynamoDB attribute name for status (optional), by default "status"
*
diff --git a/powertools-idempotency/src/test/java/software/amazon/lambda/powertools/idempotency/handlers/IdempotencyInternalFunction.java b/powertools-idempotency/src/test/java/software/amazon/lambda/powertools/idempotency/handlers/IdempotencyInternalFunction.java
index 549d9e7ed..f3c1bdbc9 100644
--- a/powertools-idempotency/src/test/java/software/amazon/lambda/powertools/idempotency/handlers/IdempotencyInternalFunction.java
+++ b/powertools-idempotency/src/test/java/software/amazon/lambda/powertools/idempotency/handlers/IdempotencyInternalFunction.java
@@ -15,6 +15,7 @@
import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestHandler;
+import software.amazon.lambda.powertools.idempotency.Idempotency;
import software.amazon.lambda.powertools.idempotency.IdempotencyKey;
import software.amazon.lambda.powertools.idempotency.Idempotent;
import software.amazon.lambda.powertools.idempotency.model.Basket;
@@ -25,10 +26,19 @@
*/
public class IdempotencyInternalFunction implements RequestHandler {
+ private final boolean registerContext;
private boolean called = false;
+ public IdempotencyInternalFunction(boolean registerContext) {
+ this.registerContext = registerContext;
+ }
+
@Override
public Basket handleRequest(Product input, Context context) {
+ if (registerContext) {
+ Idempotency.registerLambdaContext(context);
+ }
+
return createBasket("fake", input);
}
diff --git a/powertools-idempotency/src/test/java/software/amazon/lambda/powertools/idempotency/handlers/IdempotencyStringFunction.java b/powertools-idempotency/src/test/java/software/amazon/lambda/powertools/idempotency/handlers/IdempotencyStringFunction.java
new file mode 100644
index 000000000..e6d719598
--- /dev/null
+++ b/powertools-idempotency/src/test/java/software/amazon/lambda/powertools/idempotency/handlers/IdempotencyStringFunction.java
@@ -0,0 +1,22 @@
+package software.amazon.lambda.powertools.idempotency.handlers;
+
+import com.amazonaws.services.lambda.runtime.Context;
+import com.amazonaws.services.lambda.runtime.RequestHandler;
+import software.amazon.lambda.powertools.idempotency.Idempotent;
+import software.amazon.lambda.powertools.idempotency.model.Product;
+
+public class IdempotencyStringFunction implements RequestHandler {
+
+ private boolean handlerCalled = false;
+
+ public boolean handlerCalled() {
+ return handlerCalled;
+ }
+
+ @Override
+ @Idempotent
+ public String handleRequest(Product input, Context context) {
+ handlerCalled = true;
+ return input.getName();
+ }
+}
diff --git a/powertools-idempotency/src/test/java/software/amazon/lambda/powertools/idempotency/internal/IdempotencyAspectTest.java b/powertools-idempotency/src/test/java/software/amazon/lambda/powertools/idempotency/internal/IdempotencyAspectTest.java
index fc91c6c61..56687e338 100644
--- a/powertools-idempotency/src/test/java/software/amazon/lambda/powertools/idempotency/internal/IdempotencyAspectTest.java
+++ b/powertools-idempotency/src/test/java/software/amazon/lambda/powertools/idempotency/internal/IdempotencyAspectTest.java
@@ -27,6 +27,7 @@
import software.amazon.lambda.powertools.idempotency.IdempotencyConfig;
import software.amazon.lambda.powertools.idempotency.exceptions.IdempotencyAlreadyInProgressException;
import software.amazon.lambda.powertools.idempotency.exceptions.IdempotencyConfigurationException;
+import software.amazon.lambda.powertools.idempotency.exceptions.IdempotencyInconsistentStateException;
import software.amazon.lambda.powertools.idempotency.exceptions.IdempotencyItemAlreadyExistsException;
import software.amazon.lambda.powertools.idempotency.handlers.*;
import software.amazon.lambda.powertools.idempotency.model.Basket;
@@ -36,6 +37,8 @@
import software.amazon.lambda.powertools.utilities.JsonConfig;
import java.time.Instant;
+import java.util.OptionalInt;
+import java.util.OptionalLong;
import static java.time.temporal.ChronoUnit.SECONDS;
import static org.assertj.core.api.Assertions.assertThat;
@@ -67,17 +70,22 @@ public void firstCall_shouldPutInStore() {
IdempotencyEnabledFunction function = new IdempotencyEnabledFunction();
+ when(context.getRemainingTimeInMillis()).thenReturn(30000);
+
Product p = new Product(42, "fake product", 12);
Basket basket = function.handleRequest(p, context);
assertThat(basket.getProducts()).hasSize(1);
assertThat(function.handlerCalled()).isTrue();
ArgumentCaptor nodeCaptor = ArgumentCaptor.forClass(JsonNode.class);
- verify(store).saveInProgress(nodeCaptor.capture(), any());
+ ArgumentCaptor expiryCaptor = ArgumentCaptor.forClass(OptionalInt.class);
+ verify(store).saveInProgress(nodeCaptor.capture(), any(), expiryCaptor.capture());
assertThat(nodeCaptor.getValue().get("id").asLong()).isEqualTo(p.getId());
assertThat(nodeCaptor.getValue().get("name").asText()).isEqualTo(p.getName());
assertThat(nodeCaptor.getValue().get("price").asDouble()).isEqualTo(p.getPrice());
+ assertThat(expiryCaptor.getValue().orElse(-1)).isEqualTo(30000);
+
ArgumentCaptor resultCaptor = ArgumentCaptor.forClass(Basket.class);
verify(store).saveSuccess(any(), resultCaptor.capture(), any());
assertThat(resultCaptor.getValue()).isEqualTo(basket);
@@ -93,7 +101,7 @@ public void secondCall_notExpired_shouldGetFromStore() throws JsonProcessingExce
.build()
).configure();
- doThrow(IdempotencyItemAlreadyExistsException.class).when(store).saveInProgress(any(), any());
+ doThrow(IdempotencyItemAlreadyExistsException.class).when(store).saveInProgress(any(), any(), any());
Product p = new Product(42, "fake product", 12);
Basket b = new Basket(p);
@@ -114,6 +122,36 @@ public void secondCall_notExpired_shouldGetFromStore() throws JsonProcessingExce
assertThat(function.handlerCalled()).isFalse();
}
+ @Test
+ public void secondCall_notExpired_shouldGetStringFromStore() {
+ // GIVEN
+ Idempotency.config()
+ .withPersistenceStore(store)
+ .withConfig(IdempotencyConfig.builder()
+ .withEventKeyJMESPath("id")
+ .build()
+ ).configure();
+
+ doThrow(IdempotencyItemAlreadyExistsException.class).when(store).saveInProgress(any(), any(), any());
+
+ Product p = new Product(42, "fake product", 12);
+ DataRecord record = new DataRecord(
+ "42",
+ DataRecord.Status.COMPLETED,
+ Instant.now().plus(356, SECONDS).getEpochSecond(),
+ p.getName(),
+ null);
+ doReturn(record).when(store).getRecord(any(), any());
+
+ // WHEN
+ IdempotencyStringFunction function = new IdempotencyStringFunction();
+ String name = function.handleRequest(p, context);
+
+ // THEN
+ assertThat(name).isEqualTo(p.getName());
+ assertThat(function.handlerCalled()).isFalse();
+ }
+
@Test
public void secondCall_inProgress_shouldThrowIdempotencyAlreadyInProgressException() throws JsonProcessingException {
// GIVEN
@@ -124,16 +162,18 @@ public void secondCall_inProgress_shouldThrowIdempotencyAlreadyInProgressExcepti
.build()
).configure();
- doThrow(IdempotencyItemAlreadyExistsException.class).when(store).saveInProgress(any(), any());
+ doThrow(IdempotencyItemAlreadyExistsException.class).when(store).saveInProgress(any(), any(), any());
Product p = new Product(42, "fake product", 12);
Basket b = new Basket(p);
+ OptionalLong timestampInFuture = OptionalLong.of(Instant.now().toEpochMilli() + 1000); // timeout not expired (in 1sec)
DataRecord record = new DataRecord(
"42",
DataRecord.Status.INPROGRESS,
Instant.now().plus(356, SECONDS).getEpochSecond(),
JsonConfig.get().getObjectMapper().writer().writeValueAsString(b),
- null);
+ null,
+ timestampInFuture);
doReturn(record).when(store).getRecord(any(), any());
// THEN
@@ -141,6 +181,35 @@ public void secondCall_inProgress_shouldThrowIdempotencyAlreadyInProgressExcepti
assertThatThrownBy(() -> function.handleRequest(p, context)).isInstanceOf(IdempotencyAlreadyInProgressException.class);
}
+ @Test
+ public void secondCall_inProgress_lambdaTimeout_timeoutExpired_shouldThrowInconsistentState() throws JsonProcessingException {
+ // GIVEN
+ Idempotency.config()
+ .withPersistenceStore(store)
+ .withConfig(IdempotencyConfig.builder()
+ .withEventKeyJMESPath("id")
+ .build()
+ ).configure();
+
+ doThrow(IdempotencyItemAlreadyExistsException.class).when(store).saveInProgress(any(), any(), any());
+
+ Product p = new Product(42, "fake product", 12);
+ Basket b = new Basket(p);
+ OptionalLong timestampInThePast = OptionalLong.of(Instant.now().toEpochMilli() - 100); // timeout expired 100ms ago
+ DataRecord record = new DataRecord(
+ "42",
+ DataRecord.Status.INPROGRESS,
+ Instant.now().plus(356, SECONDS).getEpochSecond(),
+ JsonConfig.get().getObjectMapper().writer().writeValueAsString(b),
+ null,
+ timestampInThePast);
+ doReturn(record).when(store).getRecord(any(), any());
+
+ // THEN
+ IdempotencyEnabledFunction function = new IdempotencyEnabledFunction();
+ assertThatThrownBy(() -> function.handleRequest(p, context)).isInstanceOf(IdempotencyInconsistentStateException.class);
+ }
+
@Test
public void functionThrowException_shouldDeleteRecord_andThrowFunctionException() {
// GIVEN
@@ -190,7 +259,37 @@ public void idempotencyOnSubMethodAnnotated_firstCall_shouldPutInStore() {
.configure();
// WHEN
- IdempotencyInternalFunction function = new IdempotencyInternalFunction();
+ boolean registerContext = true;
+ when(context.getRemainingTimeInMillis()).thenReturn(30000);
+
+ IdempotencyInternalFunction function = new IdempotencyInternalFunction(registerContext);
+ Product p = new Product(42, "fake product", 12);
+ Basket basket = function.handleRequest(p, context);
+
+ // THEN
+ assertThat(basket.getProducts()).hasSize(2);
+ assertThat(function.subMethodCalled()).isTrue();
+
+ ArgumentCaptor nodeCaptor = ArgumentCaptor.forClass(JsonNode.class);
+ ArgumentCaptor expiryCaptor = ArgumentCaptor.forClass(OptionalInt.class);
+ verify(store).saveInProgress(nodeCaptor.capture(), any(), expiryCaptor.capture());
+ assertThat(nodeCaptor.getValue().asText()).isEqualTo("fake");
+ assertThat(expiryCaptor.getValue().orElse(-1)).isEqualTo(30000);
+
+ ArgumentCaptor resultCaptor = ArgumentCaptor.forClass(Basket.class);
+ verify(store).saveSuccess(any(), resultCaptor.capture(), any());
+ assertThat(resultCaptor.getValue().getProducts()).contains(basket.getProducts().get(0), new Product(0, "fake", 0));
+ }
+
+ @Test
+ public void idempotencyOnSubMethodAnnotated_firstCall_contextNotRegistered_shouldPutInStore() {
+ Idempotency.config()
+ .withPersistenceStore(store)
+ .configure();
+
+ // WHEN
+ boolean registerContext = false;
+ IdempotencyInternalFunction function = new IdempotencyInternalFunction(registerContext);
Product p = new Product(42, "fake product", 12);
Basket basket = function.handleRequest(p, context);
@@ -199,8 +298,10 @@ public void idempotencyOnSubMethodAnnotated_firstCall_shouldPutInStore() {
assertThat(function.subMethodCalled()).isTrue();
ArgumentCaptor nodeCaptor = ArgumentCaptor.forClass(JsonNode.class);
- verify(store).saveInProgress(nodeCaptor.capture(), any());
+ ArgumentCaptor expiryCaptor = ArgumentCaptor.forClass(OptionalInt.class);
+ verify(store).saveInProgress(nodeCaptor.capture(), any(), expiryCaptor.capture());
assertThat(nodeCaptor.getValue().asText()).isEqualTo("fake");
+ assertThat(expiryCaptor.getValue()).isEmpty();
ArgumentCaptor resultCaptor = ArgumentCaptor.forClass(Basket.class);
verify(store).saveSuccess(any(), resultCaptor.capture(), any());
@@ -214,7 +315,7 @@ public void idempotencyOnSubMethodAnnotated_secondCall_notExpired_shouldGetFromS
.withPersistenceStore(store)
.configure();
- doThrow(IdempotencyItemAlreadyExistsException.class).when(store).saveInProgress(any(), any());
+ doThrow(IdempotencyItemAlreadyExistsException.class).when(store).saveInProgress(any(), any(), any());
Product p = new Product(42, "fake product", 12);
Basket b = new Basket(p);
@@ -227,7 +328,7 @@ public void idempotencyOnSubMethodAnnotated_secondCall_notExpired_shouldGetFromS
doReturn(record).when(store).getRecord(any(), any());
// WHEN
- IdempotencyInternalFunction function = new IdempotencyInternalFunction();
+ IdempotencyInternalFunction function = new IdempotencyInternalFunction(false);
Basket basket = function.handleRequest(p, context);
// THEN
@@ -285,4 +386,5 @@ public void idempotencyOnSubMethodVoid_shouldThrowException() {
// THEN
assertThatThrownBy(() -> function.handleRequest(p, context)).isInstanceOf(IdempotencyConfigurationException.class);
}
+
}
diff --git a/powertools-idempotency/src/test/java/software/amazon/lambda/powertools/idempotency/persistence/BasePersistenceStoreTest.java b/powertools-idempotency/src/test/java/software/amazon/lambda/powertools/idempotency/persistence/BasePersistenceStoreTest.java
index dac9a9288..e77a995a4 100644
--- a/powertools-idempotency/src/test/java/software/amazon/lambda/powertools/idempotency/persistence/BasePersistenceStoreTest.java
+++ b/powertools-idempotency/src/test/java/software/amazon/lambda/powertools/idempotency/persistence/BasePersistenceStoreTest.java
@@ -32,6 +32,7 @@
import java.time.Duration;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
+import java.util.OptionalInt;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
@@ -83,12 +84,30 @@ public void saveInProgress_defaultConfig() {
persistenceStore.configure(IdempotencyConfig.builder().build(), null);
Instant now = Instant.now();
- persistenceStore.saveInProgress(JsonConfig.get().getObjectMapper().valueToTree(event), now);
+ persistenceStore.saveInProgress(JsonConfig.get().getObjectMapper().valueToTree(event), now, OptionalInt.empty());
assertThat(dr.getStatus()).isEqualTo(DataRecord.Status.INPROGRESS);
assertThat(dr.getExpiryTimestamp()).isEqualTo(now.plus(3600, ChronoUnit.SECONDS).getEpochSecond());
assertThat(dr.getResponseData()).isNull();
assertThat(dr.getIdempotencyKey()).isEqualTo("testFunction#47261bd5b456f400f8d191cfb3a7482f");
assertThat(dr.getPayloadHash()).isEqualTo("");
+ assertThat(dr.getInProgressExpiryTimestamp()).isEmpty();
+ assertThat(status).isEqualTo(1);
+ }
+
+ @Test
+ public void saveInProgress_withRemainingTime() {
+ APIGatewayProxyRequestEvent event = EventLoader.loadApiGatewayRestEvent("apigw_event.json");
+ persistenceStore.configure(IdempotencyConfig.builder().build(), null);
+
+ int lambdaTimeoutMs = 30000;
+ Instant now = Instant.now();
+ persistenceStore.saveInProgress(JsonConfig.get().getObjectMapper().valueToTree(event), now, OptionalInt.of(lambdaTimeoutMs));
+ assertThat(dr.getStatus()).isEqualTo(DataRecord.Status.INPROGRESS);
+ assertThat(dr.getExpiryTimestamp()).isEqualTo(now.plus(3600, ChronoUnit.SECONDS).getEpochSecond());
+ assertThat(dr.getResponseData()).isNull();
+ assertThat(dr.getIdempotencyKey()).isEqualTo("testFunction#47261bd5b456f400f8d191cfb3a7482f");
+ assertThat(dr.getPayloadHash()).isEqualTo("");
+ assertThat(dr.getInProgressExpiryTimestamp().orElse(-1)).isEqualTo(now.plus(lambdaTimeoutMs, ChronoUnit.MILLIS).toEpochMilli());
assertThat(status).isEqualTo(1);
}
@@ -100,7 +119,7 @@ public void saveInProgress_jmespath() {
.build(), "myfunc");
Instant now = Instant.now();
- persistenceStore.saveInProgress(JsonConfig.get().getObjectMapper().valueToTree(event), now);
+ persistenceStore.saveInProgress(JsonConfig.get().getObjectMapper().valueToTree(event), now, OptionalInt.empty());
assertThat(dr.getStatus()).isEqualTo(DataRecord.Status.INPROGRESS);
assertThat(dr.getExpiryTimestamp()).isEqualTo(now.plus(3600, ChronoUnit.SECONDS).getEpochSecond());
assertThat(dr.getResponseData()).isNull();
@@ -117,7 +136,7 @@ public void saveInProgress_jmespath_NotFound_shouldThrowException() {
.withThrowOnNoIdempotencyKey(true) // should throw
.build(), "");
Instant now = Instant.now();
- assertThatThrownBy(() -> persistenceStore.saveInProgress(JsonConfig.get().getObjectMapper().valueToTree(event), now))
+ assertThatThrownBy(() -> persistenceStore.saveInProgress(JsonConfig.get().getObjectMapper().valueToTree(event), now, OptionalInt.empty()))
.isInstanceOf(IdempotencyKeyException.class)
.hasMessageContaining("No data found to create a hashed idempotency key");
assertThat(status).isEqualTo(-1);
@@ -130,7 +149,7 @@ public void saveInProgress_jmespath_NotFound_shouldNotThrowException() {
.withEventKeyJMESPath("unavailable")
.build(), "");
Instant now = Instant.now();
- persistenceStore.saveInProgress(JsonConfig.get().getObjectMapper().valueToTree(event), now);
+ persistenceStore.saveInProgress(JsonConfig.get().getObjectMapper().valueToTree(event), now, OptionalInt.empty());
assertThat(dr.getStatus()).isEqualTo(DataRecord.Status.INPROGRESS);
assertThat(status).isEqualTo(1);
}
@@ -151,7 +170,7 @@ public void saveInProgress_withLocalCache_NotExpired_ShouldThrowException() {
now.plus(3600, ChronoUnit.SECONDS).getEpochSecond(),
null, null)
);
- assertThatThrownBy(() -> persistenceStore.saveInProgress(JsonConfig.get().getObjectMapper().valueToTree(event), now))
+ assertThatThrownBy(() -> persistenceStore.saveInProgress(JsonConfig.get().getObjectMapper().valueToTree(event), now, OptionalInt.empty()))
.isInstanceOf(IdempotencyItemAlreadyExistsException.class);
assertThat(status).isEqualTo(-1);
}
@@ -173,7 +192,7 @@ public void saveInProgress_withLocalCache_Expired_ShouldRemoveFromCache() {
now.minus(3, ChronoUnit.SECONDS).getEpochSecond(),
null, null)
);
- persistenceStore.saveInProgress(JsonConfig.get().getObjectMapper().valueToTree(event), now);
+ persistenceStore.saveInProgress(JsonConfig.get().getObjectMapper().valueToTree(event), now, OptionalInt.empty());
assertThat(dr.getStatus()).isEqualTo(DataRecord.Status.INPROGRESS);
assertThat(cache).isEmpty();
assertThat(status).isEqualTo(1);
@@ -362,4 +381,20 @@ public void generateHashDouble_shouldGenerateMd5ofDouble() {
String generatedHash = persistenceStore.generateHash(new DoubleNode(256.42));
assertThat(generatedHash).isEqualTo(expectedHash);
}
+
+ @Test
+ public void generateHashString_withSha256Algorithm_shouldGenerateSha256ofString() {
+ persistenceStore.configure(IdempotencyConfig.builder().withHashFunction("SHA-256").build(), null);
+ String expectedHash = "e6139efa88ef3337e901e826e6f327337f414860fb499d9f26eefcff21d719af"; // SHA-256(Lambda rocks)
+ String generatedHash = persistenceStore.generateHash(new TextNode("Lambda rocks"));
+ assertThat(generatedHash).isEqualTo(expectedHash);
+ }
+
+ @Test
+ public void generateHashString_unknownAlgorithm_shouldGenerateMd5ofString() {
+ persistenceStore.configure(IdempotencyConfig.builder().withHashFunction("HASH").build(), null);
+ String expectedHash = "70c24d88041893f7fbab4105b76fd9e1"; // MD5(Lambda rocks)
+ String generatedHash = persistenceStore.generateHash(new TextNode("Lambda rocks"));
+ assertThat(generatedHash).isEqualTo(expectedHash);
+ }
}
diff --git a/powertools-idempotency/src/test/java/software/amazon/lambda/powertools/idempotency/persistence/DynamoDBPersistenceStoreTest.java b/powertools-idempotency/src/test/java/software/amazon/lambda/powertools/idempotency/persistence/DynamoDBPersistenceStoreTest.java
index 0da516324..86e35cd33 100644
--- a/powertools-idempotency/src/test/java/software/amazon/lambda/powertools/idempotency/persistence/DynamoDBPersistenceStoreTest.java
+++ b/powertools-idempotency/src/test/java/software/amazon/lambda/powertools/idempotency/persistence/DynamoDBPersistenceStoreTest.java
@@ -57,6 +57,68 @@ public void putRecord_shouldCreateRecordInDynamoDB() throws IdempotencyItemAlrea
assertThat(item.get("expiration").n()).isEqualTo(String.valueOf(expiry));
}
+ @Test
+ public void putRecord_shouldCreateRecordInDynamoDB_IfPreviousExpired() {
+ key = Collections.singletonMap("id", AttributeValue.builder().s("key").build());
+
+ // GIVEN: Insert a fake item with same id and expired
+ Map item = new HashMap<>(key);
+ Instant now = Instant.now();
+ long expiry = now.minus(30, ChronoUnit.SECONDS).getEpochSecond();
+ item.put("expiration", AttributeValue.builder().n(String.valueOf(expiry)).build());
+ item.put("status", AttributeValue.builder().s(DataRecord.Status.COMPLETED.toString()).build());
+ item.put("data", AttributeValue.builder().s("Fake Data").build());
+ client.putItem(PutItemRequest.builder().tableName(TABLE_NAME).item(item).build());
+
+ // WHEN: call putRecord
+ long expiry2 = now.plus(3600, ChronoUnit.SECONDS).getEpochSecond();
+ dynamoDBPersistenceStore.putRecord(
+ new DataRecord("key",
+ DataRecord.Status.INPROGRESS,
+ expiry2,
+ null,
+ null
+ ), now);
+
+ // THEN: an item is inserted
+ Map itemInDb = client.getItem(GetItemRequest.builder().tableName(TABLE_NAME).key(key).build()).item();
+ assertThat(itemInDb).isNotNull();
+ assertThat(itemInDb.get("status").s()).isEqualTo("INPROGRESS");
+ assertThat(itemInDb.get("expiration").n()).isEqualTo(String.valueOf(expiry2));
+ }
+
+ @Test
+ public void putRecord_shouldCreateRecordInDynamoDB_IfLambdaWasInProgressAndTimedOut() {
+ key = Collections.singletonMap("id", AttributeValue.builder().s("key").build());
+
+ // GIVEN: Insert a fake item with same id and progress expired (Lambda timed out before and we allow a new execution)
+ Map item = new HashMap<>(key);
+ Instant now = Instant.now();
+ long expiry = now.plus(30, ChronoUnit.SECONDS).getEpochSecond();
+ long progressExpiry = now.minus(30, ChronoUnit.SECONDS).getEpochSecond();
+ item.put("expiration", AttributeValue.builder().n(String.valueOf(expiry)).build());
+ item.put("status", AttributeValue.builder().s(DataRecord.Status.INPROGRESS.toString()).build());
+ item.put("data", AttributeValue.builder().s("Fake Data").build());
+ item.put("in_progress_expiration", AttributeValue.builder().n(String.valueOf(progressExpiry)).build());
+ client.putItem(PutItemRequest.builder().tableName(TABLE_NAME).item(item).build());
+
+ // WHEN: call putRecord
+ long expiry2 = now.plus(3600, ChronoUnit.SECONDS).getEpochSecond();
+ dynamoDBPersistenceStore.putRecord(
+ new DataRecord("key",
+ DataRecord.Status.INPROGRESS,
+ expiry2,
+ null,
+ null
+ ), now);
+
+ // THEN: an item is inserted
+ Map itemInDb = client.getItem(GetItemRequest.builder().tableName(TABLE_NAME).key(key).build()).item();
+ assertThat(itemInDb).isNotNull();
+ assertThat(itemInDb.get("status").s()).isEqualTo("INPROGRESS");
+ assertThat(itemInDb.get("expiration").n()).isEqualTo(String.valueOf(expiry2));
+ }
+
@Test
public void putRecord_shouldThrowIdempotencyItemAlreadyExistsException_IfRecordAlreadyExist() {
key = Collections.singletonMap("id", AttributeValue.builder().s("key").build());
@@ -65,7 +127,7 @@ public void putRecord_shouldThrowIdempotencyItemAlreadyExistsException_IfRecordA
Map item = new HashMap<>(key);
Instant now = Instant.now();
long expiry = now.plus(30, ChronoUnit.SECONDS).getEpochSecond();
- item.put("expiration", AttributeValue.builder().n(String.valueOf(expiry)).build());
+ item.put("expiration", AttributeValue.builder().n(String.valueOf(expiry)).build()); // not expired
item.put("status", AttributeValue.builder().s(DataRecord.Status.COMPLETED.toString()).build());
item.put("data", AttributeValue.builder().s("Fake Data").build());
client.putItem(PutItemRequest.builder().tableName(TABLE_NAME).item(item).build());
@@ -89,6 +151,41 @@ public void putRecord_shouldThrowIdempotencyItemAlreadyExistsException_IfRecordA
assertThat(itemInDb.get("data").s()).isEqualTo("Fake Data");
}
+ @Test
+ public void putRecord_shouldThrowIdempotencyItemAlreadyExistsException_IfRecordAlreadyExistAndProgressNotExpiredAfterLambdaTimedOut() {
+ key = Collections.singletonMap("id", AttributeValue.builder().s("key").build());
+
+ // GIVEN: Insert a fake item with same id
+ Map item = new HashMap<>(key);
+ Instant now = Instant.now();
+ long expiry = now.plus(30, ChronoUnit.SECONDS).getEpochSecond(); // not expired
+ long progressExpiry = now.plus(30, ChronoUnit.SECONDS).getEpochSecond(); // not expired
+ item.put("expiration", AttributeValue.builder().n(String.valueOf(expiry)).build());
+ item.put("status", AttributeValue.builder().s(DataRecord.Status.INPROGRESS.toString()).build());
+ item.put("data", AttributeValue.builder().s("Fake Data").build());
+ item.put("in_progress_expiration", AttributeValue.builder().n(String.valueOf(progressExpiry)).build());
+ client.putItem(PutItemRequest.builder().tableName(TABLE_NAME).item(item).build());
+
+ // WHEN: call putRecord
+ long expiry2 = now.plus(3600, ChronoUnit.SECONDS).getEpochSecond();
+ assertThatThrownBy(() -> dynamoDBPersistenceStore.putRecord(
+ new DataRecord("key",
+ DataRecord.Status.INPROGRESS,
+ expiry2,
+ null,
+ null
+ ), now)
+ ).isInstanceOf(IdempotencyItemAlreadyExistsException.class);
+
+ // THEN: item was not updated, retrieve the initial one
+ Map itemInDb = client.getItem(GetItemRequest.builder().tableName(TABLE_NAME).key(key).build()).item();
+ assertThat(itemInDb).isNotNull();
+ assertThat(itemInDb.get("status").s()).isEqualTo("INPROGRESS");
+ assertThat(itemInDb.get("expiration").n()).isEqualTo(String.valueOf(expiry));
+ assertThat(itemInDb.get("data").s()).isEqualTo("Fake Data");
+ }
+
+
//
// =================================================================
diff --git a/spotbugs-exclude.xml b/spotbugs-exclude.xml
index 0c8d1d8f8..fe4194bfd 100644
--- a/spotbugs-exclude.xml
+++ b/spotbugs-exclude.xml
@@ -32,6 +32,10 @@
+
+
+
+
@@ -69,6 +73,10 @@
+
+
+
+