Skip to content

Commit

Permalink
Merge pull request #97 from pagopa/PRDP-261-update-retry-function
Browse files Browse the repository at this point in the history
[PRDP-261] Updated RetryReviewedPoisonMessages function
  • Loading branch information
pasqualespica authored Dec 4, 2023
2 parents ddd1043 + 75ca90f commit 2df85a2
Show file tree
Hide file tree
Showing 12 changed files with 348 additions and 99 deletions.
11 changes: 7 additions & 4 deletions integration-test/src/features/receipt_pdf_datastore.feature
Original file line number Diff line number Diff line change
@@ -1,27 +1,29 @@
Feature: All about payment events consumed by Azure functions receipt-pdf-generator

Scenario: a biz event enqueued on receipts queue trigger the PDF receipt generation that is stored on receipts generator and blob storage
Given a receipt with id "receipt-generator-int-test-id-1" stored into receipt datastore
Given a receipt with id "receipt-generator-int-test-id-1" and status "INSERTED" stored into receipt datastore
And a random biz event with id "receipt-generator-int-test-id-1" enqueued on receipts queue
When the PDF receipt has been properly generate from biz event after 20000 ms
Then the receipts datastore returns the receipt
And the receipt has eventId "receipt-generator-int-test-id-1"
And the receipt has not the status "TO_REVIEW"
And the receipt has not the status "NOT_QUEUE_SENT"
And the receipt has not the status "INSERTED"
And the blob storage has the PDF document

Scenario: a biz event enqueued on receipts poison queue is enqueued on receipt queue that trigger the PDF receipt generation
Given a receipt with id "receipt-generator-int-test-id-2" stored into receipt datastore
Given a receipt with id "receipt-generator-int-test-id-2" and status "INSERTED" stored into receipt datastore
And a random biz event with id "receipt-generator-int-test-id-2" enqueued on receipts poison queue with poison retry "false"
When the PDF receipt has been properly generate from biz event after 20000 ms
Then the receipts datastore returns the receipt
And the receipt has eventId "receipt-generator-int-test-id-2"
And the receipt has not the status "TO_REVIEW"
And the receipt has not the status "NOT_QUEUE_SENT"
And the receipt has not the status "INSERTED"
And the blob storage has the PDF document

Scenario: a biz event enqueued on receipts poison queue is stored on receipt-message-error datastore and the receipt status is updated to TO_REVIEW
Given a receipt with id "receipt-generator-int-test-id-3" stored into receipt datastore
Given a receipt with id "receipt-generator-int-test-id-3" and status "INSERTED" stored into receipt datastore
And a random biz event with id "receipt-generator-int-test-id-3" enqueued on receipts poison queue with poison retry "true"
When the biz event has been properly stored on receipt-message-error datastore after 20000 ms
Then the receipt-message-error datastore returns the error receipt
Expand All @@ -30,11 +32,12 @@ Feature: All about payment events consumed by Azure functions receipt-pdf-genera
And the receipt has the status "TO_REVIEW"

Scenario: a biz event stored on receipt-message-error is enqueued on receipt queue that trigger the PDF receipt generation
Given a receipt with id "receipt-generator-int-test-id-4" stored into receipt datastore
Given a receipt with id "receipt-generator-int-test-id-4" and status "TO_REVIEW" stored into receipt datastore
And a error receipt with id "receipt-generator-int-test-id-4" stored into receipt-message-error datastore with status REVIEWED
When the PDF receipt has been properly generate from biz event after 20000 ms
Then the receipts datastore returns the receipt
And the receipt has eventId "receipt-generator-int-test-id-4"
And the receipt has not the status "TO_REVIEW"
And the receipt has not the status "NOT_QUEUE_SENT"
And the receipt has not the status "INSERTED"
And the blob storage has the PDF document
4 changes: 2 additions & 2 deletions integration-test/src/step_definitions/common.js
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ function createEventForPoisonQueue(id, attemptedPoisonRetry) {
return json_event
}

function createReceipt(id) {
function createReceipt(id, status) {
let receipt =
{
"eventId": id,
Expand All @@ -141,7 +141,7 @@ function createReceipt(id) {
}
]
},
"status": "INSERTED",
"status": status,
"numRetry": 0,
"id": id,
"_rid": "Z9AJAMdamqNjAAAAAAAAAA==",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ this.event = null;
// After each Scenario
After(async function () {
// remove documents
if (this.eventId != null && this.receiptId != null) {
await deleteDocumentFromReceiptsDatastore(this.receiptId, this.eventId);
if (this.receiptId != null) {
await deleteDocumentFromReceiptsDatastore(this.receiptId);
}
if (this.errorReceiptId != null) {
await deleteDocumentFromErrorReceiptsDatastore(this.errorReceiptId);
Expand All @@ -32,12 +32,12 @@ After(async function () {
});


Given('a receipt with id {string} stored into receipt datastore', async function (id) {
Given('a receipt with id {string} and status {string} stored into receipt datastore', async function (id, status) {
this.eventId = id;
// prior cancellation to avoid dirty cases
await deleteDocumentFromReceiptsDatastore(this.eventId, this.eventId);
await deleteDocumentFromReceiptsDatastore(this.eventId);

let receiptsStoreResponse = await createDocumentInReceiptsDatastore(this.eventId);
let receiptsStoreResponse = await createDocumentInReceiptsDatastore(this.eventId, status);
assert.strictEqual(receiptsStoreResponse.statusCode, 201);
this.receiptId = this.eventId;
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,22 +25,22 @@ async function deleteDocumentFromReceiptsDatastoreByEventId(eventId){
let documents = await getDocumentByIdFromReceiptsDatastore(eventId);

documents?.resources?.forEach(el => {
deleteDocumentFromReceiptsDatastore(el.id, eventId);
deleteDocumentFromReceiptsDatastore(el.id);
})
}

async function createDocumentInReceiptsDatastore(id) {
let receipt = createReceipt(id);
async function createDocumentInReceiptsDatastore(id, status) {
let receipt = createReceipt(id, status);
try {
return await receiptContainer.items.create(receipt);
} catch (err) {
console.log(err);
}
}

async function deleteDocumentFromReceiptsDatastore(id, partitionKey) {
async function deleteDocumentFromReceiptsDatastore(id) {
try {
return await receiptContainer.item(id, partitionKey).delete();
return await receiptContainer.item(id, id).delete();
} catch (error) {
if (error.code !== 404) {
console.log(error)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,20 @@
import com.microsoft.azure.functions.annotation.CosmosDBTrigger;
import com.microsoft.azure.functions.annotation.ExponentialBackoffRetry;
import com.microsoft.azure.functions.annotation.FunctionName;
import it.gov.pagopa.receipt.pdf.generator.client.ReceiptQueueClient;
import it.gov.pagopa.receipt.pdf.generator.client.impl.ReceiptQueueClientImpl;
import it.gov.pagopa.receipt.pdf.generator.entity.event.BizEvent;
import it.gov.pagopa.receipt.pdf.generator.entity.receipt.Receipt;
import it.gov.pagopa.receipt.pdf.generator.entity.receipt.ReceiptError;
import it.gov.pagopa.receipt.pdf.generator.entity.receipt.enumeration.ReceiptErrorStatusType;
import it.gov.pagopa.receipt.pdf.generator.entity.receipt.enumeration.ReceiptStatusType;
import it.gov.pagopa.receipt.pdf.generator.exception.ReceiptNotFoundException;
import it.gov.pagopa.receipt.pdf.generator.exception.UnableToQueueException;
import it.gov.pagopa.receipt.pdf.generator.exception.UnableToSaveException;
import it.gov.pagopa.receipt.pdf.generator.service.ReceiptCosmosService;
import it.gov.pagopa.receipt.pdf.generator.service.impl.ReceiptCosmosServiceImpl;
import it.gov.pagopa.receipt.pdf.generator.utils.Aes256Utils;
import it.gov.pagopa.receipt.pdf.generator.utils.ObjectMapperUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -28,12 +37,24 @@ public class RetryReviewedPoisonMessages {

private final Logger logger = LoggerFactory.getLogger(RetryReviewedPoisonMessages.class);

private final ReceiptCosmosService receiptCosmosService;
private final ReceiptQueueClient queueService;

public RetryReviewedPoisonMessages() {
this.receiptCosmosService = new ReceiptCosmosServiceImpl();
this.queueService = ReceiptQueueClientImpl.getInstance();
}
RetryReviewedPoisonMessages(ReceiptCosmosService receiptCosmosService, ReceiptQueueClient receiptQueueClient) {
this.receiptCosmosService = receiptCosmosService;
this.queueService = receiptQueueClient;
}

/**
* This function will be invoked when an CosmosDB trigger occurs
*
* When an updated document in the receipt-message-errors CosmosDB has status REVIEWED attempts
* to send it back to the provided output topic.
* If succeeds saves the element with status REQUEUED
* If succeeds saves the element with status REQUEUED and updated the relative receipt's status to INSERTED
* If fails updates the document back in status TO_REVIEW with an updated error description
*
* @param items Reviewed Receipt Errors that triggered the function from the Cosmos database
Expand Down Expand Up @@ -65,8 +86,6 @@ public void processRetryReviewedPoisonMessages(
logger.info("[{}] documentCaptorValue stat {} function - num errors reviewed triggered {}",
context.getFunctionName(), context.getInvocationId(), items.size());

ReceiptQueueClientImpl queueService = ReceiptQueueClientImpl.getInstance();

//Retrieve receipt data from biz-event
for (ReceiptError receiptError : items) {

Expand All @@ -75,15 +94,20 @@ public void processRetryReviewedPoisonMessages(

try {
String decodedEvent = Aes256Utils.decrypt(receiptError.getMessagePayload());

//Find and update Receipt with bizEventId
BizEvent bizEvent = ObjectMapperUtils.mapString(decodedEvent, BizEvent.class);
updateReceiptToInserted(context, bizEvent.getId());

//Send decoded BizEvent to queue
Response<SendMessageResult> sendMessageResult =
queueService.sendMessageToQueue(Base64.getMimeEncoder().encodeToString(decodedEvent.getBytes()));
this.queueService.sendMessageToQueue(Base64.getMimeEncoder().encodeToString(decodedEvent.getBytes()));
if (sendMessageResult.getStatusCode() != HttpStatus.CREATED.value()) {
throw new UnableToQueueException("Unable to queue due to error: " +
sendMessageResult.getStatusCode());
}

receiptError.setStatus(ReceiptErrorStatusType.REQUEUED);

} catch (Exception e) {
//Error info
logger.error("[{}] Error to process receiptError with id {}",
Expand All @@ -96,6 +120,15 @@ public void processRetryReviewedPoisonMessages(
}
}

documentdb.setValue(itemsDone);
if(!itemsDone.isEmpty()){
documentdb.setValue(itemsDone);
}
}

private void updateReceiptToInserted(ExecutionContext context, String bizEventId) throws ReceiptNotFoundException, UnableToSaveException {
Receipt receipt = this.receiptCosmosService.getReceipt(bizEventId);
receipt.setStatus(ReceiptStatusType.INSERTED);
logger.info("[{}] updating receipt with id {} to status {}", context.getFunctionName(), receipt.getId(), ReceiptStatusType.INSERTED);
this.receiptCosmosService.saveReceipt(receipt);
}
}
Original file line number Diff line number Diff line change
@@ -1,9 +1,25 @@
package it.gov.pagopa.receipt.pdf.generator.client;

import com.azure.cosmos.models.CosmosItemResponse;
import it.gov.pagopa.receipt.pdf.generator.entity.receipt.Receipt;
import it.gov.pagopa.receipt.pdf.generator.exception.ReceiptNotFoundException;

public interface ReceiptCosmosClient {

Receipt getReceiptDocument(String receiptId) throws ReceiptNotFoundException;
/**
* Retrieve receipt document from CosmosDB database
*
* @param eventId Biz-event id
* @return receipt document
* @throws ReceiptNotFoundException in case no receipt has been found with the given idEvent
*/
Receipt getReceiptDocument(String eventId) throws ReceiptNotFoundException;

/**
* Save Receipts on CosmosDB database
*
* @param receipt Receipts to save
* @return receipt documents
*/
CosmosItemResponse<Receipt> saveReceipt(Receipt receipt);
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import com.azure.cosmos.CosmosClientBuilder;
import com.azure.cosmos.CosmosContainer;
import com.azure.cosmos.CosmosDatabase;
import com.azure.cosmos.models.CosmosItemResponse;
import com.azure.cosmos.models.CosmosQueryRequestOptions;
import com.azure.cosmos.util.CosmosPagedIterable;
import it.gov.pagopa.receipt.pdf.generator.client.ReceiptCosmosClient;
Expand Down Expand Up @@ -45,12 +46,9 @@ public static ReceiptCosmosClientImpl getInstance() {
}

/**
* Retrieve receipt document from CosmosDB database
*
* @param eventId Biz-event id
* @return receipt document
* @throws ReceiptNotFoundException in case no receipt has been found with the given idEvent
* {@inheritDoc}
*/
@Override
public Receipt getReceiptDocument(String eventId) throws ReceiptNotFoundException {
CosmosDatabase cosmosDatabase = this.cosmosClient.getDatabase(databaseId);

Expand All @@ -70,4 +68,16 @@ public Receipt getReceiptDocument(String eventId) throws ReceiptNotFoundExceptio
}

}

/**
* {@inheritDoc}
*/
@Override
public CosmosItemResponse<Receipt> saveReceipt(Receipt receipt) {
CosmosDatabase cosmosDatabase = this.cosmosClient.getDatabase(databaseId);

CosmosContainer cosmosContainer = cosmosDatabase.getContainer(containerId);

return cosmosContainer.createItem(receipt);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package it.gov.pagopa.receipt.pdf.generator.exception;

public class UnableToSaveException extends Exception {

/**
* Constructs new exception with provided message and cause
*
* @param message Detail message
*/
public UnableToSaveException(String message) {
super(message);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

import it.gov.pagopa.receipt.pdf.generator.entity.receipt.Receipt;
import it.gov.pagopa.receipt.pdf.generator.exception.ReceiptNotFoundException;
import it.gov.pagopa.receipt.pdf.generator.client.ReceiptCosmosClient;
import it.gov.pagopa.receipt.pdf.generator.exception.UnableToSaveException;

public interface ReceiptCosmosService {

Expand All @@ -12,4 +14,11 @@ public interface ReceiptCosmosService {
* @throws ReceiptNotFoundException when no receipt has been found
*/
Receipt getReceipt(String bizEventId) throws ReceiptNotFoundException;

/**
* Saves receipts on CosmosDB using {@link ReceiptCosmosClient}
*
* @param receipt Receipt to save
*/
void saveReceipt(Receipt receipt) throws UnableToSaveException;
}
Original file line number Diff line number Diff line change
@@ -1,13 +1,18 @@
package it.gov.pagopa.receipt.pdf.generator.service.impl;

import com.azure.cosmos.models.CosmosItemResponse;
import it.gov.pagopa.receipt.pdf.generator.client.ReceiptCosmosClient;
import it.gov.pagopa.receipt.pdf.generator.client.impl.ReceiptCosmosClientImpl;
import it.gov.pagopa.receipt.pdf.generator.entity.receipt.Receipt;
import it.gov.pagopa.receipt.pdf.generator.exception.ReceiptNotFoundException;
import it.gov.pagopa.receipt.pdf.generator.exception.UnableToSaveException;
import it.gov.pagopa.receipt.pdf.generator.service.ReceiptCosmosService;
import org.apache.http.HttpStatus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ReceiptCosmosServiceImpl implements ReceiptCosmosService {

private final Logger logger = LoggerFactory.getLogger(ReceiptCosmosServiceImpl.class);
private final ReceiptCosmosClient receiptCosmosClient;

public ReceiptCosmosServiceImpl() {
Expand All @@ -27,7 +32,7 @@ public Receipt getReceipt(String bizEventId) throws ReceiptNotFoundException {
try {
receipt = receiptCosmosClient.getReceiptDocument(bizEventId);
} catch (ReceiptNotFoundException e) {
String errorMsg = String.format("Receipt not found with the biz-event id %s",bizEventId);
String errorMsg = String.format("Receipt not found with the biz-event id %s", bizEventId);
throw new ReceiptNotFoundException(errorMsg, e);
}

Expand All @@ -37,4 +42,25 @@ public Receipt getReceipt(String bizEventId) throws ReceiptNotFoundException {
}
return receipt;
}

/**
* {@inheritDoc}
*/
@Override
public void saveReceipt(Receipt receipt) throws UnableToSaveException {
int statusCode;

try{
CosmosItemResponse<Receipt> response = receiptCosmosClient.saveReceipt(receipt);
statusCode = response.getStatusCode();
} catch (Exception e) {
statusCode = HttpStatus.SC_INTERNAL_SERVER_ERROR;
logger.error(String.format("Save receipt with eventId %s on cosmos failed", receipt.getEventId()), e);
}

if(statusCode != com.microsoft.azure.functions.HttpStatus.CREATED.value()){
String errorMsg = String.format("Save receipt with eventId %s on cosmos failed with status %s", receipt.getEventId(), statusCode);
throw new UnableToSaveException(errorMsg);
}
}
}
Loading

0 comments on commit 2df85a2

Please sign in to comment.