Skip to content

Commit

Permalink
FABN-852 NodeSDK - monitor all transactions
Browse files Browse the repository at this point in the history
Allow the registration of transaction events to
listen to all transaction status events. Convert
channel event test to await and verify feature
coverage.

Change-Id: I1df6eb10fc2d34adc11fc486ddcf98f646b6b0ff
Signed-off-by: Bret Harrison <beharrison@nc.rr.com>
  • Loading branch information
harrisob committed Aug 7, 2018
1 parent 249eaa2 commit 79b464c
Show file tree
Hide file tree
Showing 3 changed files with 511 additions and 604 deletions.
69 changes: 47 additions & 22 deletions fabric-client/lib/ChannelEventHub.js
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,10 @@ const END_ONLY = 2;

const five_minutes_ms = 5 * 60 * 1000;

// Special transaction id to indicate that the transaction listener will be
// notified of all transactions
const ALL = 'all';


/**
* Transaction processing in fabric v1.1 is a long operation spanning multiple
Expand Down Expand Up @@ -97,7 +101,7 @@ const five_minutes_ms = 5 * 60 * 1000;
* the fabric event stream and a message queue:
*
* @example
* var eh = channel.newChannelEventHub(peer);
* const eh = channel.newChannelEventHub(peer);
*
* // register the listeners before calling "connect()" so there
* // is an error callback ready to process an error in case the
Expand Down Expand Up @@ -978,12 +982,22 @@ const ChannelEventHub = class {
if (!txid) {
throw new Error('Missing "txid" parameter');
}
if(typeof txid !== 'string') {
throw new Error('"txid" parameter is not a string');
}
if (!onEvent) {
throw new Error('Missing "onEvent" parameter');
}

this._checkAllowRegistrations();
const temp = this._transactionRegistrations[txid];

let default_unregister = true;
let _txid = txid;
if(txid.toLowerCase() === ALL) {
_txid = txid.toLowerCase();
default_unregister = false;
}
const temp = this._transactionRegistrations[_txid];
if (temp) {
throw new Error(`TransactionId (${txid}) has already been registered`);
}
Expand All @@ -993,17 +1007,17 @@ const ChannelEventHub = class {
default_disconnect = true;
}

const trans_registration = new EventRegistration(onEvent, onError, options, true, default_disconnect);
this._transactionRegistrations[txid] = trans_registration;
const trans_registration = new EventRegistration(onEvent, onError, options, default_unregister, default_disconnect);
this._transactionRegistrations[_txid] = trans_registration;
if (startstop_mode > NO_START_STOP) {
this._start_stop_registration = trans_registration;
trans_registration.unregister_action = () => {
this.unregisterTxEvent(txid);
this.unregisterTxEvent(_txid);
};
}
this._checkConnection();

return txid;
return _txid;
}

/**
Expand Down Expand Up @@ -1054,7 +1068,7 @@ const ChannelEventHub = class {
logger.debug(`_processTxEvents filtered block num=${block.number}`);
if (block.filtered_transactions) {
for (const filtered_transaction of block.filtered_transactions) {
this._callTransactionListener(filtered_transaction.txid,
this._checkTransactionId(filtered_transaction.txid,
filtered_transaction.tx_validation_code,
block.number);
}
Expand All @@ -1064,32 +1078,43 @@ const ChannelEventHub = class {
const txStatusCodes = block.metadata.metadata[_commonProto.BlockMetadataIndex.TRANSACTIONS_FILTER];
for (let index = 0; index < block.data.data.length; index++) {
const channel_header = block.data.data[index].payload.header.channel_header;
this._callTransactionListener(channel_header.tx_id,
this._checkTransactionId(channel_header.tx_id,
txStatusCodes[index],
block.header.number);
}
}
}

/* internal utility method */
_callTransactionListener(tx_id, val_code, block_num) {
_checkTransactionId(tx_id, val_code, block_num) {
const trans_reg = this._transactionRegistrations[tx_id];
if (trans_reg) {
logger.debug('_callTransactionListener - about to call the transaction call back for code=%s tx=%s', val_code, tx_id);
const status = convertValidationCode(val_code);
trans_reg.onEvent(tx_id, status, block_num);
if (trans_reg.unregister) {
this.unregisterTxEvent(tx_id);
logger.debug('_callTransactionListener - automatically unregister tx listener for %s', tx_id);
}
if (trans_reg.disconnect) {
this._disconnect(new Error('Shutdown due to disconnect on transaction id registration'));
}
} else {
this._callTransactionListener(tx_id, val_code, block_num, trans_reg);
}
const all_trans_reg = this._transactionRegistrations[ALL];
if (all_trans_reg) {
this._callTransactionListener(tx_id, val_code, block_num, all_trans_reg);
}
if (trans_reg || all_trans_reg){
logger.debug('_callTransactionListener - no call backs found for this transaction %s', tx_id);
}
}

/* internal utility method */
_callTransactionListener(tx_id, val_code, block_num, trans_reg) {
logger.debug('_callTransactionListener - about to call the transaction call back for code=%s tx=%s', val_code, tx_id);
const status = convertValidationCode(val_code);
trans_reg.onEvent(tx_id, status, block_num);
if (trans_reg.unregister) {
this.unregisterTxEvent(tx_id);
logger.debug('_callTransactionListener - automatically unregister tx listener for %s', tx_id);
}
if (trans_reg.disconnect) {
logger.debug('_callTransactionListener - automatically disconnect');
this._disconnect(new Error('Shutdown due to disconnect on transaction id registration'));
}
}

/*
* private internal method for processing chaincode events
* @param {Object} block protobuf object which might contain the chaincode event from the fabric
Expand Down Expand Up @@ -1288,8 +1313,8 @@ class EventRegistration {
this.onError = onError;
this.unregister = default_unregister;
this.disconnect = default_disconnect;
this.unregister_action = function () {
}; // do nothing by default
this.unregister_action = () => {}; // do nothing by default

if (options) {
if (typeof options.unregister === 'undefined' || options.unregister === null) {
logger.debug('const-EventRegistration - unregister was not defined');
Expand Down
Loading

0 comments on commit 79b464c

Please sign in to comment.