Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(bulk/#2802): merge integration tests for bulk parties and quotes #376

Merged
merged 57 commits into from
Sep 22, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
57 commits
Select commit Hold shift + click to select a range
27495e2
updated deps
Sep 7, 2022
f670ebd
docker compose for end-end testing
Sep 9, 2022
e7130e5
updated dependencies
Sep 9, 2022
0643d2d
updates to end-end docker compose
Sep 9, 2022
1c339ad
merge latest changes from bulk-sdk
Sep 9, 2022
ce02117
added docker compose for local testing
Sep 9, 2022
7631679
added local-testing folder
Sep 9, 2022
2095e43
Merge branch 'mvp/bulk-sdk' into feat/#2802-bulk-quotes-intg-tests
Sep 9, 2022
a714ab1
test case for bulk quote accept event
Sep 9, 2022
49dd0f6
Refactored the func test files and p2p is working
Sep 9, 2022
9b9454d
Merge branch 'feat/#2802-bulk-quotes-intg-tests' of https://github.co…
Sep 9, 2022
ca98b9a
running tests to check they are working
Sep 9, 2022
17f4395
Merge branch 'feat/#2802-bulk-quotes-intg-tests' of https://github.co…
Sep 9, 2022
3b08d5d
chore: added sample ttk env file
Sep 9, 2022
5339bf9
chore: updated sdk-out api in TTK func test framework
Sep 9, 2022
bcd8d87
Merge branch 'feat/#2802-bulk-quotes-intg-tests' of https://github.co…
Sep 9, 2022
a5ad7d7
Merge branch 'mvp/bulk-sdk' of https://github.com/mojaloop/sdk-scheme…
Sep 9, 2022
acb0a2b
fix
Sep 9, 2022
4a76817
Merge branch 'feat/#2802-bulk-quotes-intg-tests' of https://github.co…
Sep 9, 2022
8ea8b64
fix
Sep 9, 2022
b133aef
Merge branch 'feat/#2802-bulk-quotes-intg-tests' of https://github.co…
Sep 9, 2022
48521d8
added kafka related env vars
Sep 10, 2022
7dc5272
Merge branch 'feat/#2802-bulk-quotes-intg-tests' of https://github.co…
Sep 10, 2022
2f3416d
removed duplicate env vars
Sep 12, 2022
d6ecba7
Merge branch 'mvp/bulk-sdk' of https://github.com/mojaloop/sdk-scheme…
Sep 13, 2022
f5e622c
fix: func-tests-kafka-consumer-group-config
Sep 13, 2022
687e2b3
fix: ttk-func-config
Sep 13, 2022
c0aff4d
Merge branch 'mvp/bulk-sdk' of https://github.com/mojaloop/sdk-scheme…
Sep 13, 2022
ebeba48
Merge branch 'mvp/bulk-sdk' of https://github.com/mojaloop/sdk-scheme…
Sep 14, 2022
448be4c
TTK test cases for bulk - initial commit
Sep 14, 2022
7b552f1
feat: added put bulktransactions to mojaloop sim api in TTK
Sep 14, 2022
1beb568
chore: added some sample code
Sep 14, 2022
9f71903
Merge branch 'feat/#2802-bulk-quotes-intg-tests' of https://github.co…
Sep 14, 2022
8f995b9
chore: some ttk rules changes
Sep 14, 2022
401b1a3
Merge branch 'mvp/bulk-sdk' of https://github.com/mojaloop/sdk-scheme…
Sep 14, 2022
de977b5
removed ttk for bulk
Sep 14, 2022
33e4982
Merge branch 'feat/#2802-bulk-quotes-intg-tests' of https://github.co…
Sep 14, 2022
8065f54
added redis insights to docker compose
Sep 14, 2022
d0939bd
Merge branch 'mvp/bulk-sdk' into feat/#2802-bulk-quotes-intg-tests
Sep 14, 2022
261df59
feat(mojaloop/#2802): rework ProcessBulkQuotesCallback logic and add …
kleyow Sep 15, 2022
3f8aa02
end-end testing diagram
Sep 15, 2022
ad0231a
Merge branch 'feat/#2802-bulk-quotes-intg-tests' of https://github.co…
Sep 15, 2022
1d9fa94
folder restructure for tests
Sep 15, 2022
4695578
refactor folders for discovery integration tests
Sep 15, 2022
52100ca
updated bulk ttk test case
Sep 15, 2022
f2b6fb8
changed mvp bulk testcase
Sep 15, 2022
3f80925
fixed ttk rule
Sep 15, 2022
13abf93
renamed files
Sep 15, 2022
0061d6f
added more test assertions
Sep 19, 2022
64a7d13
Merge branch 'mvp/bulk-sdk' into feat/#2802-bulk-quotes-intg-tests
Sep 19, 2022
f772235
feat: added parties error response rule
Sep 19, 2022
fcbe86b
Merge branch 'mvp/bulk-sdk' of https://github.com/mojaloop/sdk-scheme…
Sep 20, 2022
bd580c3
refactor(mojaloop/#2802): rework party lookup loop and count incremen…
kleyow Sep 22, 2022
6a200cb
fix: merge mvp-bulk-sdk branch
Sep 22, 2022
6bfd784
fix: fix build issue
Sep 22, 2022
78940d1
fix: fix some merge issue
Sep 22, 2022
de7ab28
chore: updated dependencies and lint
Sep 22, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -518,7 +518,7 @@ jobs:
at: /tmp
- run:
name: Load the pre-built docker image from workspace
command: |
command: |
docker load -i /tmp/docker-image.tar
- run:
name: Login to Docker Hub
Expand Down Expand Up @@ -576,7 +576,7 @@ jobs:
at: /tmp
- run:
name: Load the pre-built docker image from workspace
command: |
command: |
docker load -i /tmp/docker-image.tar
- run:
name: Login to Docker Hub
Expand Down
20 changes: 20 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -150,3 +150,23 @@ services:
- kafka
profiles:
- debug

init-kafka:
networks:
- mojaloop-net
image: docker.io/bitnami/kafka:3.2
depends_on:
- kafka
entrypoint: [ '/bin/sh', '-c' ]
command: |
"
# blocks until kafka is reachable
kafka-topics.sh --bootstrap-server kafka:9093 --list

echo -e 'Creating kafka topics'
kafka-topics.sh --bootstrap-server kafka:9093 --create --if-not-exists --topic topic-sdk-outbound-command-events --replication-factor 1 --partitions 1
kafka-topics.sh --bootstrap-server kafka:9093 --create --if-not-exists --topic topic-sdk-outbound-domain-events --replication-factor 1 --partitions 1

echo -e 'Successfully created the following topics:'
kafka-topics.sh --bootstrap-server kafka:9093 --list
"
11 changes: 6 additions & 5 deletions docs/design-bulk-transfers/outbound-sequence.md
Original file line number Diff line number Diff line change
Expand Up @@ -47,13 +47,14 @@ sequenceDiagram
SDKOutboundCommandEventHandler->>SDKOutboundCommandEventHandler: Update the party response
SDKOutboundCommandEventHandler->>SDKOutboundDomainEventHandler: PartyInfoCallbackProcessed
Note right of SDKOutboundDomainEventHandler: topic-sdk-outbound-domain-events
SDKOutboundDomainEventHandler->>SDKOutboundDomainEventHandler: Check the status of the remaining items in the bulk
SDKOutboundCommandEventHandler->>SDKOutboundCommandEventHandler: Check the status of the remaining items in the bulk
end
SDKOutboundDomainEventHandler->>SDKOutboundCommandEventHandler: ProcessSDKOutboundBulkPartyInfoRequestComplete
Note left of SDKOutboundCommandEventHandler: topic-sdk-outbound-command-events

SDKOutboundCommandEventHandler->>SDKOutboundCommandEventHandler: Update global state "DISCOVERY_COMPLETED"
SDKOutboundCommandEventHandler->>SDKOutboundCommandEventHandler: check optiions.autoAcceptParty in redis

SDKOutboundCommandEventHandler->>SDKOutboundDomainEventHandler: SDKOutboundBulkPartyInfoRequestProcessed
Note right of SDKOutboundDomainEventHandler: topic-sdk-outbound-domain-events

SDKOutboundCommandEventHandler->>SDKOutboundCommandEventHandler: check options.autoAcceptParty in redis

alt autoAcceptParty == false
SDKOutboundCommandEventHandler->>SDKOutboundAPI: SDKOutboundBulkAcceptPartyInfoRequested
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ module.exports.handlePartyInfoRequestedDmEvt = async (
await options.producer.sendDomainEvent(partyInfoCallbackReceivedDmEvt);
} catch (err) {
logger.push({ err }).log('Error in handlePartyInfoRequestedDmEvt');
const { code, message } = Errors.MojaloopApiErrorCodes.SERVER_TIMED_OUT
const { code, message } = Errors.MojaloopApiErrorCodes.SERVER_TIMED_OUT;
const partyInfoCallbackReceivedDmEvt = new PartyInfoCallbackReceivedDmEvt({
bulkId: event.getKey(),
content: {
Expand Down
4 changes: 2 additions & 2 deletions modules/api-svc/src/config.js
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ module.exports = {
enabled: env.get('ENABLE_BACKEND_EVENT_HANDLER').default('true').asBool(),
domainEventConsumer: {
brokerList: env.get('BACKEND_EVENT_CONSUMER_BROKER_LIST').default('localhost:9092').asString(),
groupId: env.get('BACKEND_EVENT_CONSUMER_GROUP_ID').default('domain_events_consumer_group').asString(),
groupId: env.get('BACKEND_EVENT_CONSUMER_GROUP_ID').default('domain_events_consumer_api_svc_backend_group').asString(),
clientId: env.get('BACKEND_EVENT_CONSUMER_CLIENT_ID').default('backend_consumer_client_id').asString(),
topics: env.get('BACKEND_EVENT_CONSUMER_TOPICS').default('topic-sdk-outbound-domain-events').asArray(),
},
Expand All @@ -107,7 +107,7 @@ module.exports = {
enabled: env.get('ENABLE_FSPIOP_EVENT_HANDLER').default('true').asBool(),
domainEventConsumer: {
brokerList: env.get('FSPIOP_EVENT_CONSUMER_BROKER_LIST').default('localhost:9092').asString(),
groupId: env.get('FSPIOP_EVENT_CONSUMER_GROUP_ID').default('domain_events_consumer_group').asString(),
groupId: env.get('FSPIOP_EVENT_CONSUMER_GROUP_ID').default('domain_events_consumer_api_svc_fspiop_group').asString(),
clientId: env.get('FSPIOP_EVENT_CONSUMER_CLIENT_ID').default('fspiop_consumer_client_id').asString(),
topics: env.get('FSPIOP_EVENT_CONSUMER_TOPICS').default('topic-sdk-outbound-domain-events').asArray(),
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ import {
ProcessSDKOutboundBulkRequestCmdEvt,
ProcessSDKOutboundBulkPartyInfoRequestCmdEvt,
ProcessPartyInfoCallbackCmdEvt,
ProcessSDKOutboundBulkPartyInfoRequestCompleteCmdEvt,
ProcessSDKOutboundBulkAcceptPartyInfoCmdEvt,
ProcessSDKOutboundBulkQuotesRequestCmdEvt,
ProcessBulkQuotesCallbackCmdEvt,
Expand Down Expand Up @@ -126,14 +125,6 @@ export class OutboundEventHandler implements IRunHandler {
);
break;
}
case ProcessSDKOutboundBulkPartyInfoRequestCompleteCmdEvt.name: {
BulkTransactionAgg.ProcessCommandEvent(
ProcessSDKOutboundBulkPartyInfoRequestCompleteCmdEvt.CreateFromCommandEvent(message),
this._commandEventHandlerOptions,
this._logger,
);
break;
}
case ProcessSDKOutboundBulkAcceptPartyInfoCmdEvt.name: {
BulkTransactionAgg.ProcessCommandEvent(
ProcessSDKOutboundBulkAcceptPartyInfoCmdEvt.CreateFromCommandEvent(message),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,9 @@

import * as ProcessSDKOutboundBulkRequestHandler from './process_sdk_outbound_bulk_request';
import * as ProcessSDKOutboundBulkPartyInfoRequestHandler from './process_sdk_outbound_bulk_party_info_request';
import * as ProcessSDKOutboundBulkPartyInfoRequestCompleteHandler from './process_sdk_outbound_bulk_party_info_request_complete';
import * as ProcessPartyInfoCallbackHandler from './process_party_info_callback';
import * as ProcessSDKOutboundBulkAcceptPartyInfoHandler from './process_sdk_outbound_bulk_accept_party_info';
import * as ProcessBulkQuotesCallbackHandler from './process-bulk-quotes-callback';
import * as ProcessBulkQuotesCallbackHandler from './process_bulk_quotes_callback';
import * as ProcessSDKOutboundBulkQuotesRequestHandler from './process_sdk_outbound_bulk_quotes_request';

import { CommandEvent } from '@mojaloop/sdk-scheme-adapter-private-shared-lib';
Expand All @@ -37,7 +36,6 @@ import { ILogger } from '@mojaloop/logging-bc-public-types-lib';
export default {
...ProcessSDKOutboundBulkRequestHandler,
...ProcessSDKOutboundBulkPartyInfoRequestHandler,
...ProcessSDKOutboundBulkPartyInfoRequestCompleteHandler,
...ProcessPartyInfoCallbackHandler,
...ProcessSDKOutboundBulkAcceptPartyInfoHandler,
...ProcessBulkQuotesCallbackHandler,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ export async function handleProcessBulkQuotesCallbackCmdEvt(
const processBulkQuotesCallbackMessage = message as ProcessBulkQuotesCallbackCmdEvt;
try {
logger.info(`Got ProcessBulkQuotesCallbackCmdEvt: id=${processBulkQuotesCallbackMessage.getKey()}`);
let successCountAfterIncrement;
let failedCountAfterIncrement;

// Create aggregate
const bulkTransactionAgg = await BulkTransactionAgg.CreateFromRepo(
Expand All @@ -50,30 +52,45 @@ export async function handleProcessBulkQuotesCallbackCmdEvt(
processBulkQuotesCallbackMessage.batchId,
);
const bulkQuotesResult = processBulkQuotesCallbackMessage.bulkQuotesResult;
// TODO: There is no currentState in the bulkQuotesResult specification, but its there in the response. Need to discuss on this.
// if(bulkQuotesResult.currentState && bulkQuotesResult.currentState === 'COMPLETED') {
if(bulkQuotesResult.individualQuoteResults.length > 0) {
bulkBatch.setState(BulkBatchInternalState.AGREEMENT_SUCCESS);
bulkTransactionAgg.incrementBulkQuotesSuccessCount();

// If individual quote result contains `lastError` the individual transfer state should be AGREEMENT_FAILED.
// bulkQuotesResult.currentState === 'ERROR_OCCURRED' necessitates erroring out all individual transfers in that bulk batch.
if(bulkQuotesResult.currentState &&
bulkQuotesResult.currentState === 'COMPLETED') {
bulkBatch.setState(BulkBatchInternalState.AGREEMENT_COMPLETED);
successCountAfterIncrement = await bulkTransactionAgg.incrementBulkQuotesSuccessCount();

// Iterate through items in batch and update the individual states
for await (const quoteResult of bulkQuotesResult.individualQuoteResults) {
if(quoteResult.quoteId && !quoteResult.lastError) {
const individualTransferId = bulkBatch.getReferenceIdForQuoteId(quoteResult.quoteId);
const individualTransfer = await bulkTransactionAgg.getIndividualTransferById(individualTransferId);
individualTransfer.setTransferState(IndividualTransferInternalState.AGREEMENT_SUCCESS);
individualTransfer.setQuoteResponse(quoteResult);
await bulkTransactionAgg.setIndividualTransferById(individualTransfer.id, individualTransfer);
} else {
const individualTransferId = bulkBatch.getReferenceIdForQuoteId(quoteResult.quoteId);
const individualTransfer = await bulkTransactionAgg.getIndividualTransferById(individualTransferId);
individualTransfer.setTransferState(IndividualTransferInternalState.AGREEMENT_FAILED);
individualTransfer.setQuoteResponse(quoteResult);
await bulkTransactionAgg.setIndividualTransferById(individualTransfer.id, individualTransfer);
}
}
// If the bulk quote is in any other state, update the bulk batch and all individual transfers
// to AGREEMENT_FAILED.
} else {
bulkBatch.setState(BulkBatchInternalState.AGREEMENT_FAILED);
bulkTransactionAgg.incrementBulkQuotesFailedCount();
}
bulkBatch.setBulkQuotesResponse(bulkQuotesResult);
await bulkTransactionAgg.setBulkBatchById(bulkBatch.id, bulkBatch);

failedCountAfterIncrement = await bulkTransactionAgg.incrementBulkQuotesFailedCount();

// Iterate through items in batch and update the individual states
for await (const quoteResult of bulkQuotesResult.individualQuoteResults) {
// TODO: quoteId should be required field in the quoteResponse. But it is optional now, so if it is empty we are ignoring the result for now
if(quoteResult.quoteId) {
const individualTransferId = bulkBatch.getReferenceIdForQuoteId(quoteResult.quoteId);
const individualTransferIds = Object.values(bulkBatch.quoteIdReferenceIdMap);
for await (const individualTransferId of individualTransferIds) {
const individualTransfer = await bulkTransactionAgg.getIndividualTransferById(individualTransferId);
individualTransfer.setTransferState(IndividualTransferInternalState.AGREEMENT_SUCCESS);
individualTransfer.setQuoteResponse(quoteResult);
individualTransfer.setTransferState(IndividualTransferInternalState.AGREEMENT_FAILED);
await bulkTransactionAgg.setIndividualTransferById(individualTransfer.id, individualTransfer);
}
}
bulkBatch.setBulkQuotesResponse(bulkQuotesResult);
await bulkTransactionAgg.setBulkBatchById(bulkBatch.id, bulkBatch);

const bulkQuotesCallbackProcessedDmEvt = new BulkQuotesCallbackProcessedDmEvt({
bulkId: bulkTransactionAgg.bulkId,
Expand All @@ -88,12 +105,12 @@ export async function handleProcessBulkQuotesCallbackCmdEvt(
// Progressing to the next step
// Check the status of the remaining items in the bulk
const bulkQuotesTotalCount = await bulkTransactionAgg.getBulkQuotesTotalCount();
const bulkQuotesSuccessCount = await bulkTransactionAgg.getBulkQuotesSuccessCount();
const bulkQuotesFailedCount = await bulkTransactionAgg.getBulkQuotesFailedCount();
const bulkQuotesSuccessCount = successCountAfterIncrement || await bulkTransactionAgg.getBulkQuotesSuccessCount();
const bulkQuotesFailedCount = failedCountAfterIncrement || await bulkTransactionAgg.getBulkQuotesFailedCount();
if(bulkQuotesTotalCount === (bulkQuotesSuccessCount + bulkQuotesFailedCount)) {
// Update global state "AGREEMENT_COMPLETED"
await bulkTransactionAgg.setGlobalState(BulkTransactionInternalState.AGREEMENT_COMPLETED);

// Send the domain message SDKOutboundBulkQuotesRequestProcessed
const sdkOutboundBulkQuotesRequestProcessedDmEvt = new SDKOutboundBulkQuotesRequestProcessedDmEvt({
bulkId: bulkTransactionAgg.bulkId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,11 @@ import {
IndividualTransferInternalState,
ProcessPartyInfoCallbackCmdEvt,
PartyInfoCallbackProcessedDmEvt,
IPartyResult,
SDKOutboundTransferState,
BulkTransactionInternalState,
SDKOutboundBulkPartyInfoRequestProcessedDmEvt,
SDKOutboundBulkAutoAcceptPartyInfoRequestedDmEvt,
SDKOutboundBulkAcceptPartyInfoRequestedDmEvt,
} from '@mojaloop/sdk-scheme-adapter-private-shared-lib';
import { BulkTransactionAgg } from '..';
import { ICommandEventHandlerOptions } from '@module-types';
Expand All @@ -43,6 +46,8 @@ export async function handleProcessPartyInfoCallbackCmdEvt(
): Promise<void> {
const processPartyInfoCallback = message as ProcessPartyInfoCallbackCmdEvt;
try {
let successCountAfterIncrement;
let failedCountAfterIncrement;
logger.info(`Got ProcessPartyInfoCallbackCmdEvt: id=${processPartyInfoCallback.getKey()}`);

// Create aggregate
Expand All @@ -58,13 +63,12 @@ export async function handleProcessPartyInfoCallbackCmdEvt(
const partyResult = processPartyInfoCallback.getPartyResult();
if(partyResult.currentState && partyResult.currentState === SDKOutboundTransferState.COMPLETED && !partyResult.errorInformation) {
individualTransfer.setTransferState(IndividualTransferInternalState.DISCOVERY_SUCCESS);
await bulkTransactionAgg.incrementPartyLookupSuccessCount();
successCountAfterIncrement = await bulkTransactionAgg.incrementPartyLookupSuccessCount();
} else {
individualTransfer.setTransferState(IndividualTransferInternalState.DISCOVERY_FAILED);
await bulkTransactionAgg.incrementPartyLookupFailedCount();
failedCountAfterIncrement = await bulkTransactionAgg.incrementPartyLookupFailedCount();
}
individualTransfer.setPartyResponse(partyResult);

await bulkTransactionAgg.setIndividualTransferById(individualTransfer.id, individualTransfer);

const msg = new PartyInfoCallbackProcessedDmEvt({
Expand All @@ -76,6 +80,69 @@ export async function handleProcessPartyInfoCallbackCmdEvt(
headers: [],
});
await options.domainProducer.sendDomainEvent(msg);

// Progressing to the next step
// Check the status of the remaining party lookups
const partyLookupTotalCount = await bulkTransactionAgg.getPartyLookupTotalCount();
const partyLookupSuccessCount = successCountAfterIncrement || await bulkTransactionAgg.getPartyLookupSuccessCount();
const partyLookupFailedCount = failedCountAfterIncrement || await bulkTransactionAgg.getPartyLookupFailedCount();
if(partyLookupTotalCount === (partyLookupSuccessCount + partyLookupFailedCount)) {
// Update global state "DISCOVERY_COMPLETED"
await bulkTransactionAgg.setGlobalState(BulkTransactionInternalState.DISCOVERY_COMPLETED);

// Send the domain message SDKOutboundBulkPartyInfoRequestProcessedDmEvt
const sdkOutboundBulkPartyInfoRequestProcessedDmEvt = new SDKOutboundBulkPartyInfoRequestProcessedDmEvt({
bulkId: bulkTransactionAgg.bulkId,
timestamp: Date.now(),
headers: [],
});
await options.domainProducer.sendDomainEvent(sdkOutboundBulkPartyInfoRequestProcessedDmEvt);
logger.info(`Sent domain event message ${SDKOutboundBulkPartyInfoRequestProcessedDmEvt.name}`);

// Progressing to the next step
// Check configuration parameter isAutoAcceptPartyEnabled
const bulkTx = bulkTransactionAgg.getBulkTransaction();
if(bulkTx.isAutoAcceptPartyEnabled()) {
const autoAcceptPartyMsg = new SDKOutboundBulkAutoAcceptPartyInfoRequestedDmEvt({
bulkId: bulkTx.id,
timestamp: Date.now(),
headers: [],
});
await options.domainProducer.sendDomainEvent(autoAcceptPartyMsg);
} else {
const individualTransferResults = [];
const allIndividualTransferIds = await bulkTransactionAgg.getAllIndividualTransferIds();
for await (const individualTransferId of allIndividualTransferIds) {
const individualTransferData = await bulkTransactionAgg
.getIndividualTransferById(individualTransferId);
if(individualTransferData.partyResponse) {
individualTransferResults.push({
homeTransactionId: individualTransferData.request.homeTransactionId,
transactionId: individualTransferData.id,
to: individualTransferData.partyResponse?.party,
lastError: individualTransferData.partyResponse?.errorInformation && {
mojaloopError: individualTransferData.partyResponse?.errorInformation,
},
});
}
}
const sdkOutboundBulkAcceptPartyInfoRequestedDmEvt = new SDKOutboundBulkAcceptPartyInfoRequestedDmEvt({
bulkId: bulkTransactionAgg.bulkId,
request: {
bulkHomeTransactionID: bulkTx.bulkHomeTransactionID,
bulkTransactionId: bulkTransactionAgg.bulkId,
individualTransferResults,
},
timestamp: Date.now(),
headers: [],
});
await options.domainProducer.sendDomainEvent(sdkOutboundBulkAcceptPartyInfoRequestedDmEvt);
logger.info(`Sent domain event message ${SDKOutboundBulkAcceptPartyInfoRequestedDmEvt.name}`);

bulkTx.setTxState(BulkTransactionInternalState.DISCOVERY_ACCEPTANCE_PENDING);
}
await bulkTransactionAgg.setTransaction(bulkTx);
}
} catch (err) {
logger.error(`Failed to create BulkTransactionAggregate. ${(err as Error).message}`);
}
Expand Down
Loading