Skip to content

Commit

Permalink
FAB-10627 NodeSDK - use multiple orderers
Browse files Browse the repository at this point in the history
The sendTransaction() should try all orderers assigned
to the channel if the orderer is not specified.

Change-Id: I0a5ddf6bd9e7a41ae5b18319782a4e957e5b5d38
Signed-off-by: Bret Harrison <beharrison@nc.rr.com>
  • Loading branch information
harrisob committed Jun 14, 2018
1 parent f1340c8 commit e48c3b7
Show file tree
Hide file tree
Showing 11 changed files with 494 additions and 31 deletions.
3 changes: 2 additions & 1 deletion fabric-client/config/default.json
Original file line number Diff line number Diff line change
Expand Up @@ -26,5 +26,6 @@
"initialize-with-discovery": false,
"discovery-cache-life": 300000,
"discovery-protocol": "grpcs",
"endorsement-handler": "fabric-client/lib/impl/DiscoveryEndorsementHandler.js"
"endorsement-handler": "fabric-client/lib/impl/DiscoveryEndorsementHandler.js",
"commit-handler": "fabric-client/lib/impl/BasicCommitHandler.js"
}
78 changes: 64 additions & 14 deletions fabric-client/lib/Channel.js
Original file line number Diff line number Diff line change
Expand Up @@ -115,12 +115,20 @@ const Channel = class {

// setup the endorsement handler
this._endorsement_handler = null;
const handler_path = sdk_utils.getConfigSetting('endorsement-handler');
let handler_path = sdk_utils.getConfigSetting('endorsement-handler');
if(handler_path) {
this._endorsement_handler = require(handler_path).create(this);
this._endorsement_handler.initialize();
}

// setup the commit handler
this._commit_handler = null;
handler_path = sdk_utils.getConfigSetting('commit-handler');
if(handler_path) {
this._commit_handler = require(handler_path).create(this);
this._commit_handler.initialize();
}


logger.debug('Constructed Channel instance: name - %s, network mode: %s', this._name, !this._devMode);
}
Expand Down Expand Up @@ -2189,9 +2197,9 @@ const Channel = class {
*
* @param {ChaincodeInvokeRequest} request
* @param {Number} timeout - A number indicating milliseconds to wait on the
* response before rejecting the promise with a
* timeout error. This overrides the default timeout
* of the Peer instance and the global timeout in the config settings.
* response before rejecting the promise with a timeout error. This
* overrides the default timeout of the Peer instance and the global
* timeout in the config settings.
* @returns {Promise} A Promise for the {@link ProposalResponseObject}
*/
sendTransactionProposal(request, timeout) {
Expand Down Expand Up @@ -2359,12 +2367,18 @@ const Channel = class {
* <li>[sendUpgradeProposal()]{@link Channel#sendUpgradeProposal}
* <li>[sendTransactionProposal()]{@link Channel#sendTransactionProposal}
*
* @param {TransactionRequest} request
* @returns {Promise} A Promise for a "BroadcastResponse" message returned by the orderer that contains a
* single "status" field for a standard [HTTP response code]{@link https://github.com/hyperledger/fabric/blob/v1.0.0/protos/common/common.proto#L27}.
* This will be an acknowledgement from the orderer of successfully submitted transaction.
*/
sendTransaction(request) {
* @param {TransactionRequest} request - {@link TransactionRequest}
* @param {Number} timeout - A number indicating milliseconds to wait on the
* response before rejecting the promise with a timeout error. This
* overrides the default timeout of the Orderer instance and the global
* timeout in the config settings.
* @returns {Promise} A Promise for a "BroadcastResponse" message returned by
* the orderer that contains a single "status" field for a
* standard [HTTP response code]{@link https://github.com/hyperledger/fabric/blob/v1.0.0/protos/common/common.proto#L27}.
* This will be an acknowledgement from the orderer of a successfully
* submitted transaction.
*/
sendTransaction(request, timeout) {
logger.debug('sendTransaction - start :: channel %s', this);
let errorMsg = null;

Expand Down Expand Up @@ -2410,9 +2424,6 @@ const Channel = class {
throw new Error('no valid endorsements found');
}

// verify that we have an orderer configured
const orderer = this._clientContext.getTargetOrderer(request.orderer, this.getOrderers(), this._name);

let use_admin_signer = false;
if (request.txId) {
use_admin_signer = request.txId.isAdmin();
Expand Down Expand Up @@ -2464,7 +2475,19 @@ const Channel = class {
payload: payload_bytes
};

return orderer.sendBroadcast(envelope);
if(this._commit_handler) {
const params = {
signed_envelope: envelope,
request: request,
timeout: timeout
};
return this._commit_handler.commit(params);

} else {
// verify that we have an orderer configured
const orderer = this._clientContext.getTargetOrderer(request.orderer, this.getOrderers(), this._name);
return orderer.sendBroadcast(envelope);
}
}

/**
Expand Down Expand Up @@ -2833,6 +2856,33 @@ const Channel = class {
return targets;
}

/*
* utility method to decide on the orderer
*/
_getOrderer(request_orderer) {
let orderer = null;
if(request_orderer) {
if(typeof request_orderer === 'string') {
orderer = this._orderers.get(request_orderer);
if(!orderer) {
throw new Error(util.format('Orderer %s not assigned to the channel', request_orderer));
}
} else if(request_orderer && request_orderer.constructor && request_orderer.constructor.name === 'Orderer') {
orderer = request_orderer;
} else {
throw new Error('Orderer is not a valid orderer object instance');
}
} else {
const orderers = this.getOrderers();
orderer = orderers[0];
if(!orderer) {
throw new Error('No Orderers assigned to this channel');
}
}

return orderer;
}

// internal utility method to build chaincode policy
_buildEndorsementPolicy(policy) {
return Policy.buildPolicy(this.getMSPManager().getMSPs(), policy);
Expand Down
30 changes: 21 additions & 9 deletions fabric-client/lib/Orderer.js
Original file line number Diff line number Diff line change
Expand Up @@ -66,15 +66,19 @@ var Orderer = class extends Remote {
/**
* Send a Broadcast message to the orderer service.
*
* @param {byte[]} envelope - Byte data to be included in the broadcast. This must
* be a protobuf encoded byte array of the
* [common.Envelope]{@link https://github.com/hyperledger/fabric/blob/v1.0.0/protos/common/common.proto#L132}
* that contains either a [ConfigUpdateEnvelope]{@link https://github.com/hyperledger/fabric/blob/v1.0.0/protos/common/configtx.proto#L70}
* or a [Transaction]{@link https://github.com/hyperledger/fabric/blob/v1.0.0/protos/peer/transaction.proto#L70}
* in the <code>payload.data</code> property of the envelope.
* @param {byte[]} envelope - Byte data to be included in the broadcast.
* This must be a protobuf encoded byte array of the
* [common.Envelope]{@link https://github.com/hyperledger/fabric/blob/v1.0.0/protos/common/common.proto#L132}
* that contains either a [ConfigUpdateEnvelope]{@link https://github.com/hyperledger/fabric/blob/v1.0.0/protos/common/configtx.proto#L70}
* or a [Transaction]{@link https://github.com/hyperledger/fabric/blob/v1.0.0/protos/peer/transaction.proto#L70}
* in the <code>payload.data</code> property of the envelope.
* @param {Number} timeout - A number indicating milliseconds to wait on the
* response before rejecting the promise with a timeout error. This
* overrides the default timeout of the Peer instance and the global
* timeout in the config settings.
* @returns {Promise} A Promise for a {@link BroadcastResponse} object
*/
sendBroadcast(envelope) {
sendBroadcast(envelope, timeout) {
logger.debug('sendBroadcast - start');

if (!envelope || envelope == '') {
Expand All @@ -84,17 +88,20 @@ var Orderer = class extends Remote {
}

var self = this;
let rto = self._request_timeout;
if (typeof timeout === 'number')
rto = timeout;

return this.waitForReady(this._ordererClient).then(() => {
// Send the envelope to the orderer via grpc
return new Promise(function (resolve, reject) {
var broadcast = self._ordererClient.broadcast();

var broadcast_timeout = setTimeout(function () {
logger.error('sendBroadcast - timed out after:%s', self._request_timeout);
logger.error('sendBroadcast - timed out after:%s', rto);
broadcast.end();
return reject(new Error('REQUEST_TIMEOUT'));
}, self._request_timeout);
}, rto);

broadcast.on('data', function (response) {
logger.debug('sendBroadcast - on data response: %j', response);
Expand Down Expand Up @@ -139,6 +146,11 @@ var Orderer = class extends Remote {
// broadcast.end();
logger.debug('sendBroadcast - sent message');
});
},
(error) =>{
logger.error('Orderer %s has an error %s ', self.getUrl(), error.toString());
self.close();
return Promise.reject(error);
});
}

Expand Down
3 changes: 2 additions & 1 deletion fabric-client/lib/Remote.js
Original file line number Diff line number Diff line change
Expand Up @@ -131,12 +131,13 @@ class Remote {
if (!client) {
throw new Error('Missing required gRPC client');
}

const timeout = new Date().getTime() + this._grpc_wait_for_ready_timeout;

return new Promise((resolve, reject) => {
client.waitForReady(timeout, (err) => {
if (err) {
logger.error(err);

return reject(err);
}
logger.debug('Successfully connected to remote gRPC server');
Expand Down
52 changes: 50 additions & 2 deletions fabric-client/lib/api.js
Original file line number Diff line number Diff line change
Expand Up @@ -318,7 +318,7 @@ module.exports.EndorsementHandler = class {

/**
* @typedef {Object} EndorsementHandlerParameters
* @property {Peer[]} request - {@link ChaincodeInvokeRequest}
* @property {Object} request - {@link ChaincodeInvokeRequest}
* @property {Object} signed_proposal - the encoded protobuf "SignedProposal"
* created by the sendTransactionProposal method before calling the
* handler. Will be the object to be endorsed by the target peers.
Expand All @@ -332,7 +332,7 @@ module.exports.EndorsementHandler = class {
* transaction to all targets. The results will be analized to see if
* enough good endorsments have been received.
*
* @param {EndorsementHandlerRequest} params - A {@link EndorsementHandlerParameters}
* @param {EndorsementHandlerParameters} params - A {@link EndorsementHandlerParameters}
* that contains enough information to determine the targets and contains
* a {@link ChaincodeInvokeRequest} to be sent using the included channel
* with the {@link Channel} 'sendTransactionProposal' method.
Expand Down Expand Up @@ -360,3 +360,51 @@ module.exports.EndorsementHandler = class {
throw new Error('The "create" method must be implemented');
}
};

/**
* Base class for commit handling
* @class
*/
module.exports.CommitHandler = class {

/**
* @typedef {Object} CommitHandlerParameters
* @property {Object} request - {@link TransactionRequest}
* @property {Object} signed_envelope - An object that will be sent to the
* orderer that contains the encoded endorsed proposals and the
* signature of the sender.
* @property {Number} timeout - the timeout setting passed on sendTransaction
* method.
*/

/**
* This method will process the parameters to determine the orderers.
* The handler will use the provided orderers or use the orderers assigned to
* the channel. The handler is expected to preform failover and use all available
* orderers to send the endorsed transaction.
*
* @param {CommitHandlerParameters} params - A {@link EndorsementHandlerParameters}
* @returns {Promise} A Promise for the {@link ProposalResponseObject}, the
* same results as calling the {@link Channel} 'sendTransactionProposal'
* method directly.
*/
commit(params) {
throw new Error('The "commit" method must be implemented');
}

/**
* This method will be called by the channel when the channel is initialzied.
*/
initialize() {
throw new Error('The "initialize" method must be implemented');
}

/**
* This static method will be called by the channel to create an instance of
* this handler. It will be passed the channel object this handler is working
* with.
*/
static create(channel) {
throw new Error('The "create" method must be implemented');
}
};
Loading

0 comments on commit e48c3b7

Please sign in to comment.