Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(idempotency): handle lambda timeout scenarios for INPROGRESS records #933

Merged
merged 9 commits into from
Dec 14, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,8 @@ Desktop.ini
######################
/bin/
/deploy/
/dist/
/site/

######################
# Logs
Expand Down
126 changes: 124 additions & 2 deletions docs/utilities/idempotency.md
Original file line number Diff line number Diff line change
Expand Up @@ -355,19 +355,141 @@ Imagine the function executes successfully, but the client never receives the re

This sequence diagram shows an example flow of what happens in the payment scenario:

![Idempotent sequence](../media/idempotent_sequence.png)
<center>
```mermaid
sequenceDiagram
participant Client
participant Lambda
participant Persistence Layer
alt initial request
Client->>Lambda: Invoke (event)
Lambda->>Persistence Layer: Get or set (id=event.search(payload))
activate Persistence Layer
Note right of Persistence Layer: Locked to prevent concurrent<br/>invocations with <br/> the same payload.
Lambda-->>Lambda: Call handler (event)
Lambda->>Persistence Layer: Update record with result
deactivate Persistence Layer
Persistence Layer-->>Persistence Layer: Update record with result
Lambda-->>Client: Response sent to client
else retried request
Client->>Lambda: Invoke (event)
Lambda->>Persistence Layer: Get or set (id=event.search(payload))
Persistence Layer-->>Lambda: Already exists in persistence layer. Return result
Lambda-->>Client: Response sent to client
end
```
<i>Idempotent sequence</i>
</center>

The client was successful in receiving the result after the retry. Since the Lambda handler was only executed once, our customer hasn't been charged twice.

!!! note
Bear in mind that the entire Lambda handler is treated as a single idempotent operation. If your Lambda handler can cause multiple side effects, consider splitting it into separate functions.

#### Lambda timeouts

This is automatically done when you annotate your Lambda handler with [@Idempotent annotation](#idempotent-annotation).

To prevent against extended failed retries when a [Lambda function times out](https://aws.amazon.com/premiumsupport/knowledge-center/lambda-verify-invocation-timeouts/), Powertools calculates and includes the remaining invocation available time as part of the idempotency record.

!!! example
If a second invocation happens **after** this timestamp, and the record is marked as `INPROGRESS`, we will execute the invocation again as if it was in the `EXPIRED` state.
This means that if an invocation expired during execution, it will be quickly executed again on the next retry.

!!! important
If you are using the [@Idempotent annotation on another method](#idempotent-annotation-on-another-method) to guard isolated parts of your code, you must use `registerLambdaContext` method available in the `Idempotency` object to benefit from this protection.

Here is an example on how you register the Lambda context in your handler:

```java hl_lines="13-19" title="Registering the Lambda context"
public class PaymentHandler implements RequestHandler<SQSEvent, List<String>> {

public PaymentHandler() {
Idempotency.config()
.withPersistenceStore(
DynamoDBPersistenceStore.builder()
.withTableName(System.getenv("IDEMPOTENCY_TABLE"))
.build())
.configure();
}

@Override
public List<String> handleRequest(SQSEvent sqsEvent, Context context) {
Idempotency.registerLambdaContext(context);
return sqsEvent.getRecords().stream().map(record -> process(record.getMessageId(), record.getBody())).collect(Collectors.toList());
}

@Idempotent
private String process(String messageId, @IdempotencyKey String messageBody) {
logger.info("Processing messageId: {}", messageId);
PaymentRequest request = extractDataFrom(messageBody).as(PaymentRequest.class);
return paymentService.process(request);
}

}
```

#### Lambda timeout sequence diagram

This sequence diagram shows an example flow of what happens if a Lambda function times out:

<center>
```mermaid
sequenceDiagram
participant Client
participant Lambda
participant Persistence Layer
alt initial request
Client->>Lambda: Invoke (event)
Lambda->>Persistence Layer: Get or set (id=event.search(payload))
activate Persistence Layer
Note right of Persistence Layer: Locked to prevent concurrent<br/>invocations with <br/> the same payload.
Note over Lambda: Time out
Lambda--xLambda: Call handler (event)
Lambda-->>Client: Return error response
deactivate Persistence Layer
else concurrent request before timeout
Client->>Lambda: Invoke (event)
Lambda->>Persistence Layer: Get or set (id=event.search(payload))
Persistence Layer-->>Lambda: Request already INPROGRESS
Lambda--xClient: Return IdempotencyAlreadyInProgressError
else retry after Lambda timeout
Client->>Lambda: Invoke (event)
Lambda->>Persistence Layer: Get or set (id=event.search(payload))
activate Persistence Layer
Note right of Persistence Layer: Locked to prevent concurrent<br/>invocations with <br/> the same payload.
Lambda-->>Lambda: Call handler (event)
Lambda->>Persistence Layer: Update record with result
deactivate Persistence Layer
Persistence Layer-->>Persistence Layer: Update record with result
Lambda-->>Client: Response sent to client
end
```
<i>Idempotent sequence for Lambda timeouts</i>
</center>

### Handling exceptions

If you are using the `@Idempotent` annotation on your Lambda handler or any other method, any unhandled exceptions that are thrown during the code execution will cause **the record in the persistence layer to be deleted**.
This means that new invocations will execute your code again despite having the same payload. If you don't want the record to be deleted, you need to catch exceptions within the idempotent function and return a successful response.

![Idempotent sequence exception](../media/idempotent_sequence_exception.png)
<center>
```mermaid
sequenceDiagram
participant Client
participant Lambda
participant Persistence Layer
Client->>Lambda: Invoke (event)
Lambda->>Persistence Layer: Get or set (id=event.search(payload))
activate Persistence Layer
Note right of Persistence Layer: Locked during this time. Prevents multiple<br/>Lambda invocations with the same<br/>payload running concurrently.
Lambda--xLambda: Call handler (event).<br/>Raises exception
Lambda->>Persistence Layer: Delete record (id=event.search(payload))
deactivate Persistence Layer
Lambda-->>Client: Return error response
```
<i>Idempotent sequence exception</i>
</center>

If an Exception is raised _outside_ the scope of a decorated method and after your method has been called, the persistent record will not be affected. In this case, idempotency will be maintained for your decorated function. Example:

Expand Down
5 changes: 5 additions & 0 deletions mkdocs.yml
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,11 @@ markdown_extensions:
- pymdownx.snippets:
base_path: '.'
check_paths: true
- pymdownx.superfences:
custom_fences:
- name: mermaid
class: mermaid
format: !!python/name:pymdownx.superfences.fence_code_format
- meta
- toc:
permalink: true
Expand Down
1 change: 1 addition & 0 deletions powertools-idempotency/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-lambda-java-tests</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.amazonaws</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,16 @@ public static Idempotency getInstance() {
return Holder.instance;
}

/**
* Can be used in a method which is not the handler to capture the Lambda context,
* to calculate the remaining time before the invocation times out.
*
* @param lambdaContext
*/
public static void registerLambdaContext(Context lambdaContext) {
getInstance().getConfig().setLambdaContext(lambdaContext);
}

/**
* Acts like a builder that can be used to configure {@link Idempotency}
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
*/
package software.amazon.lambda.powertools.idempotency;

import com.amazonaws.services.lambda.runtime.Context;
import software.amazon.lambda.powertools.idempotency.internal.cache.LRUCache;

import java.time.Duration;
Expand All @@ -28,6 +29,7 @@ public class IdempotencyConfig {
private final String payloadValidationJMESPath;
private final boolean throwOnNoIdempotencyKey;
private final String hashFunction;
private Context lambdaContext;

private IdempotencyConfig(String eventKeyJMESPath, String payloadValidationJMESPath, boolean throwOnNoIdempotencyKey, boolean useLocalCache, int localCacheMaxItems, long expirationInSeconds, String hashFunction) {
this.localCacheMaxItems = localCacheMaxItems;
Expand Down Expand Up @@ -71,12 +73,20 @@ public String getHashFunction() {
/**
* Create a builder that can be used to configure and create a {@link IdempotencyConfig}.
*
* @return a new instance of {@link IdempotencyConfig.Builder}
* @return a new instance of {@link Builder}
*/
public static Builder builder() {
return new Builder();
}

public void setLambdaContext(Context lambdaContext) {
this.lambdaContext = lambdaContext;
}

public Context getLambdaContext() {
return lambdaContext;
}

public static class Builder {

private int localCacheMaxItems = 256;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@

/**
* IdempotencyInconsistentStateException can happen under rare but expected cases
* when persistent state changes in the small-time between put & get requests.
* when persistent state changes in the small-time between put &amp; get requests.
*/
public class IdempotencyInconsistentStateException extends RuntimeException {
private static final long serialVersionUID = -4293951999802300672L;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
*/
package software.amazon.lambda.powertools.idempotency.internal;

import com.amazonaws.services.lambda.runtime.Context;
import com.fasterxml.jackson.databind.JsonNode;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.reflect.MethodSignature;
Expand All @@ -25,6 +26,7 @@
import software.amazon.lambda.powertools.utilities.JsonConfig;

import java.time.Instant;
import java.util.OptionalInt;

import static software.amazon.lambda.powertools.idempotency.persistence.DataRecord.Status.EXPIRED;
import static software.amazon.lambda.powertools.idempotency.persistence.DataRecord.Status.INPROGRESS;
Expand All @@ -40,10 +42,12 @@ public class IdempotencyHandler {
private final ProceedingJoinPoint pjp;
private final JsonNode data;
private final BasePersistenceStore persistenceStore;
private final Context lambdaContext;

public IdempotencyHandler(ProceedingJoinPoint pjp, String functionName, JsonNode payload) {
public IdempotencyHandler(ProceedingJoinPoint pjp, String functionName, JsonNode payload, Context lambdaContext) {
this.pjp = pjp;
this.data = payload;
this.lambdaContext = lambdaContext;
persistenceStore = Idempotency.getInstance().getPersistenceStore();
persistenceStore.configure(Idempotency.getInstance().getConfig(), functionName);
}
Expand Down Expand Up @@ -77,7 +81,7 @@ private Object processIdempotency() throws Throwable {
try {
// We call saveInProgress first as an optimization for the most common case where no idempotent record
// already exists. If it succeeds, there's no need to call getRecord.
persistenceStore.saveInProgress(data, Instant.now());
persistenceStore.saveInProgress(data, Instant.now(), getRemainingTimeInMillis());
} catch (IdempotencyItemAlreadyExistsException iaee) {
DataRecord record = getIdempotencyRecord();
return handleForStatus(record);
Expand All @@ -89,6 +93,21 @@ private Object processIdempotency() throws Throwable {
return getFunctionResponse();
}

/**
* Tries to determine the remaining time available for the current lambda invocation.
* Currently, it only works if the idempotent handler decorator is used or using {@link Idempotency#registerLambdaContext(Context)}
*
* @return the remaining time in milliseconds or empty if the context was not provided/found
*/
private OptionalInt getRemainingTimeInMillis() {
if (lambdaContext != null) {
return OptionalInt.of(lambdaContext.getRemainingTimeInMillis());
} else {
LOG.warn("Couldn't determine the remaining time left. Did you call registerLambdaContext on Idempotency?");
}
return OptionalInt.empty();
}

/**
* Retrieve the idempotency record from the persistence layer.
*
Expand Down Expand Up @@ -121,12 +140,18 @@ private Object handleForStatus(DataRecord record) {
}

if (INPROGRESS.equals(record.getStatus())) {
if (record.getInProgressExpiryTimestamp().isPresent()
&& record.getInProgressExpiryTimestamp().getAsLong() < Instant.now().toEpochMilli()) {
throw new IdempotencyInconsistentStateException("Item should have been expired in-progress because it already time-outed.");
}
throw new IdempotencyAlreadyInProgressException("Execution already in progress with idempotency key: " + record.getIdempotencyKey());
}

Class<?> returnType = ((MethodSignature) pjp.getSignature()).getReturnType();
try {
LOG.debug("Response for key '{}' retrieved from idempotency store, skipping the function", record.getIdempotencyKey());
if (returnType.equals(String.class))
return record.getResponseData();
return JsonConfig.get().getObjectMapper().reader().readValue(record.getResponseData(), returnType);
} catch (Exception e) {
throw new IdempotencyPersistenceLayerException("Unable to get function response as " + returnType.getSimpleName(), e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Pointcut;
import org.aspectj.lang.reflect.MethodSignature;
import com.amazonaws.services.lambda.runtime.Context;
import software.amazon.lambda.powertools.idempotency.Constants;
import software.amazon.lambda.powertools.idempotency.Idempotency;
import software.amazon.lambda.powertools.idempotency.IdempotencyKey;
import software.amazon.lambda.powertools.idempotency.Idempotent;
import software.amazon.lambda.powertools.idempotency.exceptions.IdempotencyConfigurationException;
Expand Down Expand Up @@ -56,12 +58,20 @@ public Object around(ProceedingJoinPoint pjp,
throw new IdempotencyConfigurationException("The annotated method doesn't return anything. Unable to perform idempotency on void return type");
}

JsonNode payload = getPayload(pjp, method);
boolean isHandler = (isHandlerMethod(pjp) && placedOnRequestHandler(pjp));
JsonNode payload = getPayload(pjp, method, isHandler);
if (payload == null) {
throw new IdempotencyConfigurationException("Unable to get payload from the method. Ensure there is at least one parameter or that you use @IdempotencyKey");
}

IdempotencyHandler idempotencyHandler = new IdempotencyHandler(pjp, method.getName(), payload);
Context lambdaContext;
if (isHandler) {
lambdaContext = (Context) pjp.getArgs()[1];
} else {
lambdaContext = Idempotency.getInstance().getConfig().getLambdaContext();
}

IdempotencyHandler idempotencyHandler = new IdempotencyHandler(pjp, method.getName(), payload, lambdaContext);
return idempotencyHandler.handle();
}

Expand All @@ -71,11 +81,10 @@ public Object around(ProceedingJoinPoint pjp,
* @param method the annotated method
* @return the payload used for idempotency
*/
private JsonNode getPayload(ProceedingJoinPoint pjp, Method method) {
private JsonNode getPayload(ProceedingJoinPoint pjp, Method method, boolean isHandler) {
JsonNode payload = null;
// handleRequest or method with one parameter: get the first one
if ((isHandlerMethod(pjp) && placedOnRequestHandler(pjp))
|| pjp.getArgs().length == 1) {
if (isHandler || pjp.getArgs().length == 1) {
payload = JsonConfig.get().getObjectMapper().valueToTree(pjp.getArgs()[0]);
} else {
// Look for a parameter annotated with @IdempotencyKey
Expand Down
Loading