feat(bulk/#2802): merge integration tests for bulk parties and quotes (#376)

* updated deps

* docker compose for end-end testing

* updated dependencies

* updates to end-end docker compose

* added docker compose for local testing

* added local-testing folder

* test case for bulk quote accept event

* Refactored the func test files and p2p is working

* running tests to check they are working

* chore: added sample ttk env file

* chore: updated sdk-out api in TTK func test framework

* fix

* fix

* added kafka related env vars

* removed duplicate env vars

* fix: func-tests-kafka-consumer-group-config

* fix: ttk-func-config

* TTK test cases for bulk - initial commit

* feat: added put bulktransactions to mojaloop sim api in TTK

* chore: added some sample code

* chore: some ttk rules changes

* removed ttk for bulk

* added redis insights to docker compose

* feat(mojaloop/#2802): rework ProcessBulkQuotesCallback logic and add int tests for bulk quotes (#366)

* feat: add tests for bulk quotes

* chore: more

* chore: format

* chore: lock

* chore: cleanup

* chore: dep test

* chore: file dep test

* chore: dep and bug fix

* chore: address comments

* chore: address more comments

* chore: add assert

* chore: changes

* chore: add assertion

* chore: revert

* chore: shorthand

* chore: remove unnecessary asserts

* chore: update logic

* chore: update tests

* chore: time

* chore: timeout

* chore: update logic

* end-end testing diagram

* folder restructure for tests

* refactor folders for discovery integration tests

* updated bulk ttk test case

* changed mvp bulk testcase

* fixed ttk rule

* renamed files

* added more test assertions

* feat: added parties error response rule

* refactor(mojaloop/#2802): rework party lookup loop and count increment logic (#378)

* refactor: rework party lookup loop

* chore: fix seq doc

* chore: doc

* chore: fix comment

* chore: resolve duplicate callback timing issue

* chore: test fix and bulk quote increment

* chore: dep up

* chore: missing await

* chore: increase wait

* chore: wait

* chore: dep

* chore: update

* chore: give docker more time to boot

* chore: fix

* chore: ci wait

* chore: diff

* wait

* chore: log

* chore: try topics

* chore: config

* fix: fix build issue

* fix: fix some merge issue

* chore: updated dependencies and lint

Co-authored-by: Vijay Kumar <vijaya.guthi@modusbox.com>
Co-authored-by: Kevin Leyow <kleyow@gmail.com>
3 people authored Sep 22, 2022
1 parent 3ee88b1 commit 73217eb
Showing 66 changed files with 12,444 additions and 1,783 deletions.
4 changes: 2 additions & 2 deletions .circleci/config.yml
@@ -518,7 +518,7 @@ jobs:
at: /tmp
- run:
name: Load the pre-built docker image from workspace
command: |
command: |
docker load -i /tmp/docker-image.tar
- run:
name: Login to Docker Hub
@@ -576,7 +576,7 @@ jobs:
at: /tmp
- run:
name: Load the pre-built docker image from workspace
command: |
command: |
docker load -i /tmp/docker-image.tar
- run:
name: Login to Docker Hub
20 changes: 20 additions & 0 deletions docker-compose.yml
@@ -150,3 +150,23 @@ services:
- kafka
profiles:
- debug

init-kafka:
networks:
- mojaloop-net
image: docker.io/bitnami/kafka:3.2
depends_on:
- kafka
entrypoint: [ '/bin/sh', '-c' ]
command: |
"
# blocks until kafka is reachable
kafka-topics.sh --bootstrap-server kafka:9093 --list
echo -e 'Creating kafka topics'
kafka-topics.sh --bootstrap-server kafka:9093 --create --if-not-exists --topic topic-sdk-outbound-command-events --replication-factor 1 --partitions 1
kafka-topics.sh --bootstrap-server kafka:9093 --create --if-not-exists --topic topic-sdk-outbound-domain-events --replication-factor 1 --partitions 1
echo -e 'Successfully created the following topics:'
kafka-topics.sh --bootstrap-server kafka:9093 --list
"
11 changes: 6 additions & 5 deletions docs/design-bulk-transfers/outbound-sequence.md
@@ -47,13 +47,14 @@ sequenceDiagram
SDKOutboundCommandEventHandler->>SDKOutboundCommandEventHandler: Update the party response
SDKOutboundCommandEventHandler->>SDKOutboundDomainEventHandler: PartyInfoCallbackProcessed
Note right of SDKOutboundDomainEventHandler: topic-sdk-outbound-domain-events
SDKOutboundDomainEventHandler->>SDKOutboundDomainEventHandler: Check the status of the remaining items in the bulk
SDKOutboundCommandEventHandler->>SDKOutboundCommandEventHandler: Check the status of the remaining items in the bulk
end
SDKOutboundDomainEventHandler->>SDKOutboundCommandEventHandler: ProcessSDKOutboundBulkPartyInfoRequestComplete
Note left of SDKOutboundCommandEventHandler: topic-sdk-outbound-command-events
SDKOutboundCommandEventHandler->>SDKOutboundCommandEventHandler: Update global state "DISCOVERY_COMPLETED"
SDKOutboundCommandEventHandler->>SDKOutboundCommandEventHandler: check optiions.autoAcceptParty in redis
SDKOutboundCommandEventHandler->>SDKOutboundDomainEventHandler: SDKOutboundBulkPartyInfoRequestProcessed
Note right of SDKOutboundDomainEventHandler: topic-sdk-outbound-domain-events
SDKOutboundCommandEventHandler->>SDKOutboundCommandEventHandler: check options.autoAcceptParty in redis
alt autoAcceptParty == false
SDKOutboundCommandEventHandler->>SDKOutboundAPI: SDKOutboundBulkAcceptPartyInfoRequested
@@ -72,7 +72,7 @@ module.exports.handlePartyInfoRequestedDmEvt = async (
await options.producer.sendDomainEvent(partyInfoCallbackReceivedDmEvt);
} catch (err) {
logger.push({ err }).log('Error in handlePartyInfoRequestedDmEvt');
const { code, message } = Errors.MojaloopApiErrorCodes.SERVER_TIMED_OUT
const { code, message } = Errors.MojaloopApiErrorCodes.SERVER_TIMED_OUT;
const partyInfoCallbackReceivedDmEvt = new PartyInfoCallbackReceivedDmEvt({
bulkId: event.getKey(),
content: {
4 changes: 2 additions & 2 deletions modules/api-svc/src/config.js
@@ -93,7 +93,7 @@ module.exports = {
enabled: env.get('ENABLE_BACKEND_EVENT_HANDLER').default('true').asBool(),
domainEventConsumer: {
brokerList: env.get('BACKEND_EVENT_CONSUMER_BROKER_LIST').default('localhost:9092').asString(),
groupId: env.get('BACKEND_EVENT_CONSUMER_GROUP_ID').default('domain_events_consumer_group').asString(),
groupId: env.get('BACKEND_EVENT_CONSUMER_GROUP_ID').default('domain_events_consumer_api_svc_backend_group').asString(),
clientId: env.get('BACKEND_EVENT_CONSUMER_CLIENT_ID').default('backend_consumer_client_id').asString(),
topics: env.get('BACKEND_EVENT_CONSUMER_TOPICS').default('topic-sdk-outbound-domain-events').asArray(),
},
@@ -107,7 +107,7 @@ module.exports = {
enabled: env.get('ENABLE_FSPIOP_EVENT_HANDLER').default('true').asBool(),
domainEventConsumer: {
brokerList: env.get('FSPIOP_EVENT_CONSUMER_BROKER_LIST').default('localhost:9092').asString(),
groupId: env.get('FSPIOP_EVENT_CONSUMER_GROUP_ID').default('domain_events_consumer_group').asString(),
groupId: env.get('FSPIOP_EVENT_CONSUMER_GROUP_ID').default('domain_events_consumer_api_svc_fspiop_group').asString(),
clientId: env.get('FSPIOP_EVENT_CONSUMER_CLIENT_ID').default('fspiop_consumer_client_id').asString(),
topics: env.get('FSPIOP_EVENT_CONSUMER_TOPICS').default('topic-sdk-outbound-domain-events').asArray(),
},
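
The two group-ID defaults above are the substance of this fix: the backend and FSPIOP event handlers previously shared the default group domain_events_consumer_group, and because Kafka divides a topic's partitions among consumers in the same group, each handler saw only part of the domain-event stream. Giving each handler its own group means each one receives every event. The sketch below is illustrative only, assuming the kafkajs client; the service's own consumer wrapper may be implemented differently.

// Minimal sketch (kafkajs assumed): consumers in different groups each get
// the full topic, while consumers in the same group would split it.
import { Kafka } from 'kafkajs';

const kafka = new Kafka({ clientId: 'group-id-demo', brokers: ['localhost:9092'] });

async function subscribe(groupId: string): Promise<void> {
    const consumer = kafka.consumer({ groupId });
    await consumer.connect();
    await consumer.subscribe({ topic: 'topic-sdk-outbound-domain-events', fromBeginning: false });
    await consumer.run({
        eachMessage: async ({ message }) => {
            console.log(`[${groupId}] ${message.value?.toString()}`);
        },
    });
}

// Each group receives its own copy of every domain event.
subscribe('domain_events_consumer_api_svc_backend_group').catch(console.error);
subscribe('domain_events_consumer_api_svc_fspiop_group').catch(console.error);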
@@ -38,7 +38,6 @@ import {
ProcessSDKOutboundBulkRequestCmdEvt,
ProcessSDKOutboundBulkPartyInfoRequestCmdEvt,
ProcessPartyInfoCallbackCmdEvt,
ProcessSDKOutboundBulkPartyInfoRequestCompleteCmdEvt,
ProcessSDKOutboundBulkAcceptPartyInfoCmdEvt,
ProcessSDKOutboundBulkQuotesRequestCmdEvt,
ProcessBulkQuotesCallbackCmdEvt,
@@ -126,14 +125,6 @@ export class OutboundEventHandler implements IRunHandler {
);
break;
}
case ProcessSDKOutboundBulkPartyInfoRequestCompleteCmdEvt.name: {
BulkTransactionAgg.ProcessCommandEvent(
ProcessSDKOutboundBulkPartyInfoRequestCompleteCmdEvt.CreateFromCommandEvent(message),
this._commandEventHandlerOptions,
this._logger,
);
break;
}
case ProcessSDKOutboundBulkAcceptPartyInfoCmdEvt.name: {
BulkTransactionAgg.ProcessCommandEvent(
ProcessSDKOutboundBulkAcceptPartyInfoCmdEvt.CreateFromCommandEvent(message),
@@ -24,10 +24,9 @@

import * as ProcessSDKOutboundBulkRequestHandler from './process_sdk_outbound_bulk_request';
import * as ProcessSDKOutboundBulkPartyInfoRequestHandler from './process_sdk_outbound_bulk_party_info_request';
import * as ProcessSDKOutboundBulkPartyInfoRequestCompleteHandler from './process_sdk_outbound_bulk_party_info_request_complete';
import * as ProcessPartyInfoCallbackHandler from './process_party_info_callback';
import * as ProcessSDKOutboundBulkAcceptPartyInfoHandler from './process_sdk_outbound_bulk_accept_party_info';
import * as ProcessBulkQuotesCallbackHandler from './process-bulk-quotes-callback';
import * as ProcessBulkQuotesCallbackHandler from './process_bulk_quotes_callback';
import * as ProcessSDKOutboundBulkQuotesRequestHandler from './process_sdk_outbound_bulk_quotes_request';

import { CommandEvent } from '@mojaloop/sdk-scheme-adapter-private-shared-lib';
@@ -37,7 +36,6 @@ import { ILogger } from '@mojaloop/logging-bc-public-types-lib';
export default {
...ProcessSDKOutboundBulkRequestHandler,
...ProcessSDKOutboundBulkPartyInfoRequestHandler,
...ProcessSDKOutboundBulkPartyInfoRequestCompleteHandler,
...ProcessPartyInfoCallbackHandler,
...ProcessSDKOutboundBulkAcceptPartyInfoHandler,
...ProcessBulkQuotesCallbackHandler,
@@ -38,6 +38,8 @@ export async function handleProcessBulkQuotesCallbackCmdEvt(
const processBulkQuotesCallbackMessage = message as ProcessBulkQuotesCallbackCmdEvt;
try {
logger.info(`Got ProcessBulkQuotesCallbackCmdEvt: id=${processBulkQuotesCallbackMessage.getKey()}`);
let successCountAfterIncrement;
let failedCountAfterIncrement;

// Create aggregate
const bulkTransactionAgg = await BulkTransactionAgg.CreateFromRepo(
@@ -50,30 +52,45 @@ export async function handleProcessBulkQuotesCallbackCmdEvt(
processBulkQuotesCallbackMessage.batchId,
);
const bulkQuotesResult = processBulkQuotesCallbackMessage.bulkQuotesResult;
// TODO: There is no currentState in the bulkQuotesResult specification, but its there in the response. Need to discuss on this.
// if(bulkQuotesResult.currentState && bulkQuotesResult.currentState === 'COMPLETED') {
if(bulkQuotesResult.individualQuoteResults.length > 0) {
bulkBatch.setState(BulkBatchInternalState.AGREEMENT_SUCCESS);
bulkTransactionAgg.incrementBulkQuotesSuccessCount();

// If individual quote result contains `lastError` the individual transfer state should be AGREEMENT_FAILED.
// bulkQuotesResult.currentState === 'ERROR_OCCURRED' necessitates erroring out all individual transfers in that bulk batch.
if(bulkQuotesResult.currentState &&
bulkQuotesResult.currentState === 'COMPLETED') {
bulkBatch.setState(BulkBatchInternalState.AGREEMENT_COMPLETED);
successCountAfterIncrement = await bulkTransactionAgg.incrementBulkQuotesSuccessCount();

// Iterate through items in batch and update the individual states
for await (const quoteResult of bulkQuotesResult.individualQuoteResults) {
if(quoteResult.quoteId && !quoteResult.lastError) {
const individualTransferId = bulkBatch.getReferenceIdForQuoteId(quoteResult.quoteId);
const individualTransfer = await bulkTransactionAgg.getIndividualTransferById(individualTransferId);
individualTransfer.setTransferState(IndividualTransferInternalState.AGREEMENT_SUCCESS);
individualTransfer.setQuoteResponse(quoteResult);
await bulkTransactionAgg.setIndividualTransferById(individualTransfer.id, individualTransfer);
} else {
const individualTransferId = bulkBatch.getReferenceIdForQuoteId(quoteResult.quoteId);
const individualTransfer = await bulkTransactionAgg.getIndividualTransferById(individualTransferId);
individualTransfer.setTransferState(IndividualTransferInternalState.AGREEMENT_FAILED);
individualTransfer.setQuoteResponse(quoteResult);
await bulkTransactionAgg.setIndividualTransferById(individualTransfer.id, individualTransfer);
}
}
// If the bulk quote is in any other state, update the bulk batch and all individual transfers
// to AGREEMENT_FAILED.
} else {
bulkBatch.setState(BulkBatchInternalState.AGREEMENT_FAILED);
bulkTransactionAgg.incrementBulkQuotesFailedCount();
}
bulkBatch.setBulkQuotesResponse(bulkQuotesResult);
await bulkTransactionAgg.setBulkBatchById(bulkBatch.id, bulkBatch);

failedCountAfterIncrement = await bulkTransactionAgg.incrementBulkQuotesFailedCount();

// Iterate through items in batch and update the individual states
for await (const quoteResult of bulkQuotesResult.individualQuoteResults) {
// TODO: quoteId should be required field in the quoteResponse. But it is optional now, so if it is empty we are ignoring the result for now
if(quoteResult.quoteId) {
const individualTransferId = bulkBatch.getReferenceIdForQuoteId(quoteResult.quoteId);
const individualTransferIds = Object.values(bulkBatch.quoteIdReferenceIdMap);
for await (const individualTransferId of individualTransferIds) {
const individualTransfer = await bulkTransactionAgg.getIndividualTransferById(individualTransferId);
individualTransfer.setTransferState(IndividualTransferInternalState.AGREEMENT_SUCCESS);
individualTransfer.setQuoteResponse(quoteResult);
individualTransfer.setTransferState(IndividualTransferInternalState.AGREEMENT_FAILED);
await bulkTransactionAgg.setIndividualTransferById(individualTransfer.id, individualTransfer);
}
}
bulkBatch.setBulkQuotesResponse(bulkQuotesResult);
await bulkTransactionAgg.setBulkBatchById(bulkBatch.id, bulkBatch);

const bulkQuotesCallbackProcessedDmEvt = new BulkQuotesCallbackProcessedDmEvt({
bulkId: bulkTransactionAgg.bulkId,
@@ -88,12 +105,12 @@ export async function handleProcessBulkQuotesCallbackCmdEvt(
// Progressing to the next step
// Check the status of the remaining items in the bulk
const bulkQuotesTotalCount = await bulkTransactionAgg.getBulkQuotesTotalCount();
const bulkQuotesSuccessCount = await bulkTransactionAgg.getBulkQuotesSuccessCount();
const bulkQuotesFailedCount = await bulkTransactionAgg.getBulkQuotesFailedCount();
const bulkQuotesSuccessCount = successCountAfterIncrement || await bulkTransactionAgg.getBulkQuotesSuccessCount();
const bulkQuotesFailedCount = failedCountAfterIncrement || await bulkTransactionAgg.getBulkQuotesFailedCount();
if(bulkQuotesTotalCount === (bulkQuotesSuccessCount + bulkQuotesFailedCount)) {
// Update global state "AGREEMENT_COMPLETED"
await bulkTransactionAgg.setGlobalState(BulkTransactionInternalState.AGREEMENT_COMPLETED);

// Send the domain message SDKOutboundBulkQuotesRequestProcessed
const sdkOutboundBulkQuotesRequestProcessedDmEvt = new SDKOutboundBulkQuotesRequestProcessedDmEvt({
bulkId: bulkTransactionAgg.bulkId,
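
A detail worth noting in the hunk above: the handler now keeps the value returned by incrementBulkQuotesSuccessCount() / incrementBulkQuotesFailedCount() and only falls back to re-reading the counter when no increment happened in this invocation. Using the value returned by an atomic increment avoids a stale read when callbacks for several batches of the same bulk are processed concurrently. A minimal sketch of that pattern, assuming an ioredis-backed counter (the aggregate's actual repository may differ):

// Illustrative only; key names and the ioredis client are assumptions.
import Redis from 'ioredis';

const redis = new Redis('redis://localhost:6379');

async function recordBatchResult(bulkId: string, succeeded: boolean, totalCount: number): Promise<void> {
    // INCR is atomic and returns the new value, so the caller never needs a
    // separate GET that could observe a count missing its own update.
    const key = succeeded ? `${bulkId}:quotesSuccessCount` : `${bulkId}:quotesFailedCount`;
    const countAfterIncrement = await redis.incr(key);

    const otherKey = succeeded ? `${bulkId}:quotesFailedCount` : `${bulkId}:quotesSuccessCount`;
    const otherCount = Number(await redis.get(otherKey)) || 0;

    if (countAfterIncrement + otherCount === totalCount) {
        // All bulk-quote batches are accounted for; the real handler would now
        // set AGREEMENT_COMPLETED and emit SDKOutboundBulkQuotesRequestProcessedDmEvt.
        console.log(`bulk ${bulkId}: agreement phase complete`);
    }
}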
@@ -30,8 +30,11 @@ import {
IndividualTransferInternalState,
ProcessPartyInfoCallbackCmdEvt,
PartyInfoCallbackProcessedDmEvt,
IPartyResult,
SDKOutboundTransferState,
BulkTransactionInternalState,
SDKOutboundBulkPartyInfoRequestProcessedDmEvt,
SDKOutboundBulkAutoAcceptPartyInfoRequestedDmEvt,
SDKOutboundBulkAcceptPartyInfoRequestedDmEvt,
} from '@mojaloop/sdk-scheme-adapter-private-shared-lib';
import { BulkTransactionAgg } from '..';
import { ICommandEventHandlerOptions } from '@module-types';
Expand All @@ -43,6 +46,8 @@ export async function handleProcessPartyInfoCallbackCmdEvt(
): Promise<void> {
const processPartyInfoCallback = message as ProcessPartyInfoCallbackCmdEvt;
try {
let successCountAfterIncrement;
let failedCountAfterIncrement;
logger.info(`Got ProcessPartyInfoCallbackCmdEvt: id=${processPartyInfoCallback.getKey()}`);

// Create aggregate
@@ -58,13 +63,12 @@ export async function handleProcessPartyInfoCallbackCmdEvt(
const partyResult = processPartyInfoCallback.getPartyResult();
if(partyResult.currentState && partyResult.currentState === SDKOutboundTransferState.COMPLETED && !partyResult.errorInformation) {
individualTransfer.setTransferState(IndividualTransferInternalState.DISCOVERY_SUCCESS);
await bulkTransactionAgg.incrementPartyLookupSuccessCount();
successCountAfterIncrement = await bulkTransactionAgg.incrementPartyLookupSuccessCount();
} else {
individualTransfer.setTransferState(IndividualTransferInternalState.DISCOVERY_FAILED);
await bulkTransactionAgg.incrementPartyLookupFailedCount();
failedCountAfterIncrement = await bulkTransactionAgg.incrementPartyLookupFailedCount();
}
individualTransfer.setPartyResponse(partyResult);

await bulkTransactionAgg.setIndividualTransferById(individualTransfer.id, individualTransfer);

const msg = new PartyInfoCallbackProcessedDmEvt({
@@ -76,6 +80,69 @@ export async function handleProcessPartyInfoCallbackCmdEvt(
headers: [],
});
await options.domainProducer.sendDomainEvent(msg);

// Progressing to the next step
// Check the status of the remaining party lookups
const partyLookupTotalCount = await bulkTransactionAgg.getPartyLookupTotalCount();
const partyLookupSuccessCount = successCountAfterIncrement || await bulkTransactionAgg.getPartyLookupSuccessCount();
const partyLookupFailedCount = failedCountAfterIncrement || await bulkTransactionAgg.getPartyLookupFailedCount();
if(partyLookupTotalCount === (partyLookupSuccessCount + partyLookupFailedCount)) {
// Update global state "DISCOVERY_COMPLETED"
await bulkTransactionAgg.setGlobalState(BulkTransactionInternalState.DISCOVERY_COMPLETED);

// Send the domain message SDKOutboundBulkPartyInfoRequestProcessedDmEvt
const sdkOutboundBulkPartyInfoRequestProcessedDmEvt = new SDKOutboundBulkPartyInfoRequestProcessedDmEvt({
bulkId: bulkTransactionAgg.bulkId,
timestamp: Date.now(),
headers: [],
});
await options.domainProducer.sendDomainEvent(sdkOutboundBulkPartyInfoRequestProcessedDmEvt);
logger.info(`Sent domain event message ${SDKOutboundBulkPartyInfoRequestProcessedDmEvt.name}`);

// Progressing to the next step
// Check configuration parameter isAutoAcceptPartyEnabled
const bulkTx = bulkTransactionAgg.getBulkTransaction();
if(bulkTx.isAutoAcceptPartyEnabled()) {
const autoAcceptPartyMsg = new SDKOutboundBulkAutoAcceptPartyInfoRequestedDmEvt({
bulkId: bulkTx.id,
timestamp: Date.now(),
headers: [],
});
await options.domainProducer.sendDomainEvent(autoAcceptPartyMsg);
} else {
const individualTransferResults = [];
const allIndividualTransferIds = await bulkTransactionAgg.getAllIndividualTransferIds();
for await (const individualTransferId of allIndividualTransferIds) {
const individualTransferData = await bulkTransactionAgg
.getIndividualTransferById(individualTransferId);
if(individualTransferData.partyResponse) {
individualTransferResults.push({
homeTransactionId: individualTransferData.request.homeTransactionId,
transactionId: individualTransferData.id,
to: individualTransferData.partyResponse?.party,
lastError: individualTransferData.partyResponse?.errorInformation && {
mojaloopError: individualTransferData.partyResponse?.errorInformation,
},
});
}
}
const sdkOutboundBulkAcceptPartyInfoRequestedDmEvt = new SDKOutboundBulkAcceptPartyInfoRequestedDmEvt({
bulkId: bulkTransactionAgg.bulkId,
request: {
bulkHomeTransactionID: bulkTx.bulkHomeTransactionID,
bulkTransactionId: bulkTransactionAgg.bulkId,
individualTransferResults,
},
timestamp: Date.now(),
headers: [],
});
await options.domainProducer.sendDomainEvent(sdkOutboundBulkAcceptPartyInfoRequestedDmEvt);
logger.info(`Sent domain event message ${SDKOutboundBulkAcceptPartyInfoRequestedDmEvt.name}`);

bulkTx.setTxState(BulkTransactionInternalState.DISCOVERY_ACCEPTANCE_PENDING);
}
await bulkTransactionAgg.setTransaction(bulkTx);
}
} catch (err) {
logger.error(`Failed to create BulkTransactionAggregate. ${(err as Error).message}`);
}