Skip to content

Commit

Permalink
FABN-857, FABN-858
Browse files Browse the repository at this point in the history
FABN-857: event handler for submitTransaction()

- Minor tidy-up of existing code.
- Add default transaction event handler

FABN-858: Additional selectable event handling strategies

- Add multiple event handling strategies, and allow selection when
creating a Network.

Change-Id: I370a50619b659eaf399cc37cb8f48472de2b8237
Signed-off-by: Mark S. Lewis <mark_lewis@uk.ibm.com>
  • Loading branch information
bestbeforetoday committed Sep 3, 2018
1 parent 117a00f commit 5ce3363
Show file tree
Hide file tree
Showing 19 changed files with 1,461 additions and 135 deletions.
1 change: 1 addition & 0 deletions fabric-network/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,4 @@ module.exports.Network = require('./lib/network');
module.exports.InMemoryWallet = require('./lib/impl/wallet/inmemorywallet');
module.exports.X509WalletMixin = require('./lib/impl/wallet/x509walletmixin');
module.exports.FileSystemWallet = require('./lib/impl/wallet/filesystemwallet');
module.exports.EventStrategies = require('./lib/eventstrategies');
21 changes: 20 additions & 1 deletion fabric-network/lib/channel.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
const FabricConstants = require('fabric-client/lib/Constants');
const Contract = require('./contract');
const logger = require('./logger').getLogger('FabricNetwork.Channel');
const EventHubFactory = require('./impl/event/eventhubfactory');
const TransactionEventHandler = require('./impl/event/transactioneventhandler');
const util = require('util');

class Channel {
Expand All @@ -23,6 +25,22 @@ class Channel {

this.network = network;
this.channel = channel;

this.eventHandlerFactory = {
createTxEventHandler: () => null
};
const createEventStrategyFn = network.getOptions().eventStrategy;
if (createEventStrategyFn) {
const self = this;
const eventHubFactory = new EventHubFactory(channel);
const mspId = network.getCurrentIdentity()._mspId;
const commitTimeout = network.getOptions().commitTimeout;
this.eventHandlerFactory.createTxEventHandler = (txId) => {
const eventStrategy = createEventStrategyFn(eventHubFactory, self, mspId);
return new TransactionEventHandler(txId, eventStrategy, { timeout: commitTimeout });
};
}

this.contracts = new Map();
this.initialized = false;
this.queryHandler;
Expand Down Expand Up @@ -156,7 +174,8 @@ class Channel {
this.channel,
chaincodeId,
this.network,
this.queryHandler
this.queryHandler,
this.eventHandlerFactory
);
this.contracts.set(chaincodeId, contract);
}
Expand Down
106 changes: 61 additions & 45 deletions fabric-network/lib/contract.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,14 @@ const util = require('util');

class Contract {

constructor(channel, chaincodeId, network, queryHandler) {
constructor(channel, chaincodeId, network, queryHandler, eventHandlerFactory) {
logger.debug('in Contract constructor');

this.channel = channel;
this.chaincodeId = chaincodeId;
this.network = network;
this.queryHandler = queryHandler;

this.eventHandlerFactory = eventHandlerFactory;
}

/**
Expand Down Expand Up @@ -78,48 +78,20 @@ class Contract {
return {validResponses, invalidResponses, invalidResponseMsgs};
}

_checkTransactionParameters(method, transactionName, parameters) {
if(typeof transactionName !== 'string' || transactionName.length === 0) {
const msg = util.format('transactionName must be a non-empty string: %j', transactionName);
logger.error('%s: %s', method, msg);
throw new Error(msg);
}
parameters.forEach((parameter) => {
if(typeof parameter !== 'string') {
const msg = util.format('transaction parameters must be strings: %j', parameter);
logger.error('%s: %s', method, msg);
throw new Error(msg);
}
});

}

/**
* Submit a transaction to the contract.
* @param {string} transactionName Transaction function name
* @param {...string} parameters Transaction function parameters
* @returns {Buffer} Payload response
*/
async submitTransaction(transactionName, ...parameters) {
logger.debug('in submitTransaction: ' + transactionName);

// check parameters
this._checkTransactionParameters('submitTransaction', transactionName, parameters);
/*
if(typeof transactionName !== 'string' || transactionName.length === 0) {
const msg = util.format('transactionName must be a non-empty string: %j', transactionName);
logger.error('submitTransaction: ' + msg);
throw new Error(msg);
}
parameters.forEach((parameter) => {
if(typeof parameter !== 'string') {
const msg = util.format('transaction parameters must be strings: %j', parameter);
logger.error('submitTransaction: ' + msg);
throw new Error(msg);
}
});
*/
this._verifyTransactionDetails('submitTransaction', transactionName, parameters);

const txId = this.network.getClient().newTransactionID();
// createTxEventHandler() will return null if no event handler is requested
const eventHandler = this.eventHandlerFactory.createTxEventHandler(txId.getTransactionID());

// Submit the transaction to the endorsers.
const request = {
Expand All @@ -132,39 +104,83 @@ class Contract {
// node sdk will target all peers on the channel that are endorsingPeer or do something special for a discovery environment
const results = await this.channel.sendTransactionProposal(request);
const proposalResponses = results[0];
const proposal = results[1];

//TODO: what to do about invalidResponses
const {validResponses} = this._validatePeerResponses(proposalResponses);
if (validResponses.length === 0) {
//TODO: include the invalidResponsesMsgs ?
const msg = 'No valid responses from any peers';
logger.error('submitTransaction: ' + msg);
throw new Error(msg);
}

// Submit the endorsed transaction to the primary orderers.
const proposal = results[1];

//TODO: more to do regarding checking the response (see hlfconnection.invokeChaincode)

eventHandler && await eventHandler.startListening();

// Submit the endorsed transaction to the primary orderers.
const response = await this.channel.sendTransaction({
proposalResponses: validResponses,
proposal
});

if (response.status !== 'SUCCESS') {
const msg = util.format('Failed to send peer responses for transaction \'%j\' to orderer. Response status: %j', txId.getTransactionID(), response.status);
logger.error('submitTransaction: ' + msg);
logger.error('submitTransaction:', msg);
eventHandler && eventHandler.cancelListening();
throw new Error(msg);
}

eventHandler && await eventHandler.waitForEvents();

// return the payload from the invoked chaincode
let result = null;
if (validResponses[0].response.payload && validResponses[0].response.payload.length > 0) {
result = validResponses[0].response.payload;
}
return result;
}

/**
* Verify the supplied transaction details.
* @private
* @param {String} methodName Requesting method name, used for logging.
* @param {String} transactionName Name of a transaction.
* @param {String[]} parameters transaction parameters.
* @throws {Error} if the details are not acceptable.
*/
_verifyTransactionDetails(methodName, transactionName, parameters) {
this._verifyTransactionName(methodName, transactionName);
this._verifyTransactionParameters(methodName, parameters);
}

/**
* Ensure a supplied transaction name is valid.
* @private
* @param {String} methodName Requesting method name, used for logging.
* @param {String} transactionName Name of a transaction.
* @throws {Error} if the name is not valid.
*/
_verifyTransactionName(methodName, transactionName) {
if(typeof transactionName !== 'string' || transactionName.length === 0) {
const msg = util.format('Transaction name must be a non-empty string: %j', transactionName);
logger.error(methodName + ':', msg);
throw new Error(msg);
}
}

/**
* Ensure supplied transaction parameters are valid.
* @private
* @param {String} methodName Requesting method name, used for logging.
* @param {String[]} parameters transaction parameters.
* @throws {Error} if any parameters are invalid.
*/
_verifyTransactionParameters(methodName, parameters) {
const invalidParameters = parameters.filter((parameter) => typeof parameter !== 'string');
if (invalidParameters.length > 0) {
const invalidParamString = invalidParameters
.map((parameter) => util.format('%j', parameter))
.join(', ');
const msg = 'Transaction parameters must be strings: ' + invalidParamString;
logger.error(methodName + ':', msg);
throw new Error(msg);
}
}

/**
Expand All @@ -173,7 +189,7 @@ class Contract {
* @returns {byte[]} payload response
*/
async executeTransaction(transactionName, ...parameters) {
this._checkTransactionParameters('executeTransaction', transactionName, parameters);
this._verifyTransactionDetails('executeTransaction', transactionName, parameters);
const txId = this.network.getClient().newTransactionID();
const result = await this.queryHandler.queryChaincode(this.chaincodeId, txId, transactionName, parameters);
return result ? result : null;
Expand Down
37 changes: 37 additions & 0 deletions fabric-network/lib/eventstrategies.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/**
* Copyright 2018 IBM All Rights Reserved.
*
* SPDX-License-Identifier: Apache-2.0
*/

'use strict';

const AllForTxStrategy = require('fabric-network/lib/impl/event/allfortxstrategy');
const AnyForTxStrategy = require('fabric-network/lib/impl/event/anyfortxstrategy');

function MSPID_SCOPE_ALLFORTX(eventHubFactory, channel, mspId) {
const peers = channel.getPeerMap().get(mspId);
return new AllForTxStrategy(eventHubFactory, peers);
}

function MSPID_SCOPE_ANYFORTX(eventHubFactory, channel, mspId) {
const peers = channel.getPeerMap().get(mspId);
return new AnyForTxStrategy(eventHubFactory, peers);
}

function CHANNEL_SCOPE_ALLFORTX(eventHubFactory, channel, mspId) {
const peers = channel.getInternalChannel().getPeers();
return new AllForTxStrategy(eventHubFactory, peers);
}

function CHANNEL_SCOPE_ANYFORTX(eventHubFactory, channel, mspId) {
const peers = channel.getInternalChannel().getPeers();
return new AnyForTxStrategy(eventHubFactory, peers);
}

module.exports = {
MSPID_SCOPE_ALLFORTX,
MSPID_SCOPE_ANYFORTX,
CHANNEL_SCOPE_ALLFORTX,
CHANNEL_SCOPE_ANYFORTX
};
106 changes: 106 additions & 0 deletions fabric-network/lib/impl/event/abstracteventstrategy.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
/**
* Copyright 2018 IBM All Rights Reserved.
*
* SPDX-License-Identifier: Apache-2.0
*/

const logger = require('fabric-network/lib/logger').getLogger('AbstractStrategy');

/**
* Event handling strategy base class that keeps counts of success and fail events to allow
* subclasses to implement concrete event handling strategies. On each success or fail event,
* the checkCompletion() function is called, which must be implemented by
* subclasses.
*
* Instances of the strategy are stateful and must only be used for a single transaction.
* @private
* @class
*/
class AbstractEventStrategy {
/**
* Constructor.
* @param {EventHubFactory} eventHubFactory Factory for obtaining event hubs for peers.
* @param {ChannelPeer[]} peers Peers from which to process events.
*/
constructor(eventHubFactory, peers) {
if (!eventHubFactory) {
const message = 'Event hub factory not set';
logger.error('constructor:', message);
throw new Error(message);
}
if (!peers || peers.length === 0) {
const message = 'Peers not set';
logger.error('constructor:', message);
throw new Error(message);
}

this.eventHubFactory = eventHubFactory;
this.peers = peers;
this.counts = {
success: 0,
fail: 0,
expected: 0
};
}

/**
* Called by event handler to obtain the event hubs to which it should listen. Gives an opportunity for
* the strategy to store information on the events it expects to receive for later use in event handling.
* @async
* @throws {Error} if the connected event hubs do not satisfy the strategy.
*/
async getConnectedEventHubs() {
const eventHubs = await this.eventHubFactory.getEventHubs(this.peers);
const connectedEventHubs = eventHubs.filter((eventHub) => eventHub.isconnected());

if (connectedEventHubs.length === 0) {
const message = 'No available event hubs found for strategy';
const eventHubNames = eventHubs.map((eventHub) => eventHub.getName());
logger.error('getConnectedEventHubs:', message, eventHubNames);
throw new Error(message);
}

this.counts.expected = connectedEventHubs.length;

return connectedEventHubs;
}

/**
* Called when an event is received.
* @param {Function} successFn Callback function to invoke if this event satisfies the strategy.
* @param {Function} failFn Callback function to invoke if this event fails the strategy.
*/
eventReceived(successFn, failFn) {
this.counts.success++;
this.checkCompletion(this.counts, successFn, failFn);
}

/**
* Called when an error is received.
* @param {Function} successFn Callback function to invoke if this error satisfies the strategy.
* @param {Function} failFn Callback function to invoke if this error fails the strategy.
*/
errorReceived(successFn, failFn) {
this.counts.fail++;
this.checkCompletion(this.counts, successFn, failFn);
}
/**
* @typedef {Object} EventCount
* @property {Number} success Number of successful events received.
* @property {Number} fail Number of errors received.
* @property {Number} expected Number of event hubs for which response events (or errors) are expected.
*/

/**
* Called when a successful event or error is received.
* @private
* @param {EventCount} counts Count of events received.
* @param {Function} successFn Callback function to invoke if the strategy is successful.
* @param {Function} failFn Callback function to invoke if the strategy fails.
*/
checkCompletion(counts, successFn, failFn) {
throw new Error('AbstractEventStrategy.checkCompletion() not implemented');
}
}

module.exports = AbstractEventStrategy;
Loading

0 comments on commit 5ce3363

Please sign in to comment.