From 91c1f170d8f1f5a342ba01099427d25a20fb2137 Mon Sep 17 00:00:00 2001 From: zhaochy Date: Fri, 7 Sep 2018 16:25:09 +0800 Subject: [PATCH] [FABN-908] using channel eventhub offline support sign the payload for channel eventhub registration offline Change-Id: I3671348f2d24fa5abb801b319cd7505006ad5a2b Signed-off-by: zhaochy --- fabric-client/lib/Channel.js | 6 +- fabric-client/lib/ChannelEventHub.js | 237 +++++++++++++++++---- test/integration/signTransactionOffline.js | 47 +++- test/unit/channel-event-hub.js | 2 +- 4 files changed, 237 insertions(+), 55 deletions(-) diff --git a/fabric-client/lib/Channel.js b/fabric-client/lib/Channel.js index 09c5f86385..a7e976e671 100755 --- a/fabric-client/lib/Channel.js +++ b/fabric-client/lib/Channel.js @@ -2914,8 +2914,8 @@ const Channel = class { }; // certificate, publicKey, mspId, cryptoSuite - const signer = new Identity(certificate, null, mspId); - const txId = new TransactionID(signer, admin); + const identity = new Identity(certificate, null, mspId); + const txId = new TransactionID(identity, admin); const channelHeader = client_utils.buildChannelHeader( _commonProto.HeaderType.ENDORSER_TRANSACTION, @@ -2927,7 +2927,7 @@ const Channel = class { this._clientContext.getClientCertHash() ); - const header = client_utils.buildHeader(signer, channelHeader, txId.getNonce()); + const header = client_utils.buildHeader(identity, channelHeader, txId.getNonce()); const proposal = client_utils.buildProposal(invokeSpec, header, request.transientMap); return { proposal, txId }; } diff --git a/fabric-client/lib/ChannelEventHub.js b/fabric-client/lib/ChannelEventHub.js index 8e894d5411..c1657f2a25 100644 --- a/fabric-client/lib/ChannelEventHub.js +++ b/fabric-client/lib/ChannelEventHub.js @@ -10,6 +10,9 @@ const Long = require('long'); const utils = require('./utils.js'); const clientUtils = require('./client-utils.js'); const logger = utils.getLogger('ChannelEventHub.js'); +const { Identity } = require('./msp/identity'); +const TransactionID = require('./TransactionID'); +const util = require('util'); const BlockDecoder = require('./BlockDecoder.js'); @@ -125,7 +128,7 @@ const NEWEST = 'newest'; * * @class */ -const ChannelEventHub = class { +class ChannelEventHub { /** * Constructs a ChannelEventHub object @@ -230,6 +233,27 @@ const ChannelEventHub = class { return this._connected; } + /** + * @typedef {Object} SignedEvent + * @property {Buffer} signature the signature over this payload + * @property {Buffer} payload the payload byte array to be send to peer + */ + + /** + * @typedef {ConnectOptions} + * @property {boolean} full_block - Optional. to indicated that the connection + * with the peer will be sending full blocks or filtered blocks to this + * ChannelEventHub. + * The default will be to establish a connection using filtered blocks. + * Filtered blocks have the required information to provided transaction + * status and chaincode events. When using the non filtered blocks the user + * will be required to have access to establish the connection to + * receive full blocks. + * Registering a block listener on a filtered block connection may not + * provide sufficient information. + * @property {SignedEvent} signedEvent - Optional. the signed event to be send to peer + */ + /** * Establishes a connection with the peer event source. * @@ -242,21 +266,24 @@ const ChannelEventHub = class { * [registerChaincodeEvent]{@link ChannelEventHub#registerChaincodeEvent} * methods, before calling connect(). * - * @param {boolean} full_block - to indicated that the connection with the peer - * will be sending full blocks or filtered blocks to this ChannelEventHub. - * The default - * will be to establish a connection using filtered blocks. Filtered - * blocks have the required information to provided transaction status - * and chaincode events. When using the non filtered blocks the user - * will be required to have access to establish the connection to - * receive full blocks. - * Registering a block listener on a filtered block connection may not - * provide sufficient information. + * @param {ConnectOptions|boolean} options - Optional */ - connect(full_block) { - logger.debug('connect - start %s', this.getPeerAddr()); - if (!this._clientContext._userContext && !this._clientContext._adminSigningIdentity) { - throw new Error('The clientContext has not been properly initialized, missing userContext or admin identity'); + connect(options) { + let signedEvent = null; + let full_block = null; + if (typeof options === 'boolean') { + full_block = options; + } + if (typeof options === 'object') { + signedEvent = options.signedEvent || null; + full_block = options.full_block || null; + } + if (signedEvent) { + signedEvent = this._validateSignedEvent(signedEvent); + } + logger.debug('connect - start peerAddr:%s', this.getPeerAddr()); + if (!this._clientContext._userContext && !this._clientContext._adminSigningIdentity && !signedEvent) { + throw new Error('Error connect the ChannelEventhub to peer, either the clientContext has not been properly initialized, missing userContext or admin identity or missing signedEvent'); } if (typeof full_block === 'boolean') { @@ -268,17 +295,32 @@ const ChannelEventHub = class { throw new Error('"filtered" parameter is invalid'); } - this._connect(); + logger.debug('connect - signed event:%s', !!signedEvent); + this._connect({ signedEvent }); logger.debug('connect - end %s', this.getPeerAddr()); } + /** + * @typedef {InternalConnectOptions} + * @property {boolean} force - Optional. internal use only, will reestablish the + * the connection to the peer event hub + * @property {SignedEvent} signedEvent - Optional. the signed event to be send to peer + */ + + /* * Internal use only * Establishes a connection with the peer event source - * @param {boolean} force - internal use only, will reestablish the - * the connection to the peer event hub + * @param {InternalConnectOptions} request - internal use only, the options to be passed + * to the internal method _connect() */ - _connect(force) { + _connect(request) { + let force = false; + let signedEvent = null; + if (request) { + force = request.force; + signedEvent = request.signedEvent; + } logger.debug('_connect - start - %s', new Date()); if (this._connect_running) { logger.debug('_connect - connect is running'); @@ -368,7 +410,7 @@ const ChannelEventHub = class { } } else if (deliverResponse.Type === 'status') { - if( deliverResponse.status === 'SUCCESS') { + if (deliverResponse.status === 'SUCCESS') { if (self._ending_block_seen) { // this is normal after the last block comes in when we set an ending block logger.debug('on.data - status received after last block seen: %s block_num:', deliverResponse.status, self._last_block_seen); @@ -427,7 +469,11 @@ const ChannelEventHub = class { } }); - this._sendRegistration(); + if (signedEvent) { + this._sendSignedRegistration(signedEvent); + } else { + this._sendRegistration(); + } logger.debug('_connect - end stream:', stream_id); } @@ -495,6 +541,114 @@ const ChannelEventHub = class { * and sends it to the peer's event hub. */ _sendRegistration() { + // use the admin if available + const txId = this._clientContext.newTransactionID(true); + const signer = this._clientContext._getSigningIdentity(true); + + const opt = { + identity: signer, + txId, + }; + const seekPayloadBytes = this.generateUnsignedRegistration(opt); + + const sig = signer.sign(seekPayloadBytes); + const signature = Buffer.from(sig); + + // building manually or will get protobuf errors on send + const envelope = { + signature: signature, + payload: seekPayloadBytes + }; + + this._stream.write(envelope); + } + + /** + * Internal method + * validate the signedEvent has signature and payload + * and return the signedEvent + * + * @param {SignedEvent} signedEvent the signed event to be send to peer + */ + _validateSignedEvent(signedEvent) { + const method = '_validateSignedEvent'; + logger.debug('%s - enter', method); + if (!signedEvent.signature) { + throw new Error('Empty signature in signed event'); + } + if (!signedEvent.payload) { + throw new Error('Empty payload for signed event'); + } + logger.debug('%s - exit', method); + return { + signature: signedEvent.signature, + payload: signedEvent.payload, + }; + } + + /** + * Internal method + * Send a signed event registration to the peer's eventhub + */ + _sendSignedRegistration(signedEvent) { + const method = '_sendSignedRegistration'; + logger.debug('%s - enter', method); + this._stream.write(signedEvent); + } + + /** + * @typedef {Object} EventHubRegistrationRequest + * + * @property {Identity} identity the identity who is doing this registration + * @property {TransactionID} txId a transaction id for this registration + * @property {string} certificate The certificate file, in PEM format + * @property {string} mspId The member service provider Id used to process the identity + */ + + /** + * generate the unsigned eventhub registration, the returned payload should be signed by the + * identity's private key. + * + * @param {EventHubRegistrationRequest} options the options for register the eventhub at peer, + * Notice the options should contain either both + * identity and txId, or both certificate or mspId + * + * @returns {Buffer} the byte array contains the registration payload + */ + generateUnsignedRegistration(options) { + const method = 'generateUnsignedRegistration'; + logger.debug('%s - enter', method); + if (!options) { + throw new Error(util.format('%s - Missing Required Argument "options"', method)); + } + let { identity, txId, certificate, mspId } = options; + // either we have both identity and txId, or we have both certificate or mspId + if (identity || txId) { + if (!txId) { + throw new Error('"options.txId" is required to generate unsigned event'); + } + if (!identity) { + throw new Error('"options.identity" is required to generate unsigned event'); + } + if (certificate || mspId) { + certificate = null; + mspId = null; + } + } + if (certificate || mspId) { + if (!certificate) { + throw new Error('"options.certificate" is required to generate unsigned event'); + } + if (!mspId) { + throw new Error('"options.mspId" is required to generate unsigned event'); + } + } + + if (!identity) { + identity = new Identity(certificate, null, mspId); + txId = new TransactionID(identity, options.admin === true); + } + // The behavior when a missing block is encountered let behavior = _abProto.SeekInfo.SeekBehavior.BLOCK_UNTIL_READY; // build start @@ -510,7 +664,7 @@ const ChannelEventHub = class { // build stop const seekStop = new _abProto.SeekPosition(); - if(this._ending_block_newest) { + if (this._ending_block_newest) { const seekNewest = new _abProto.SeekNewest(); seekStop.setNewest(seekNewest); behavior = _abProto.SeekInfo.SeekBehavior.FAIL_IF_NOT_READY; @@ -535,37 +689,25 @@ const ChannelEventHub = class { // FAIL_IF_NOT_READY will mean if the block is not there throw an error seekInfo.setBehavior(behavior); - // use the admin if available - const tx_id = this._clientContext.newTransactionID(true); - const signer = this._clientContext._getSigningIdentity(true); // build the header for use with the seekInfo payload const seekInfoHeader = clientUtils.buildChannelHeader( _commonProto.HeaderType.DELIVER_SEEK_INFO, this._channel._name, - tx_id.getTransactionID(), + txId.getTransactionID(), this._initial_epoch, null, clientUtils.buildCurrentTimestamp(), this._clientContext.getClientCertHash() ); - const seekHeader = clientUtils.buildHeader(signer, seekInfoHeader, tx_id.getNonce()); + const seekHeader = clientUtils.buildHeader(identity, seekInfoHeader, txId.getNonce()); const seekPayload = new _commonProto.Payload(); seekPayload.setHeader(seekHeader); seekPayload.setData(seekInfo.toBuffer()); const seekPayloadBytes = seekPayload.toBuffer(); - const sig = signer.sign(seekPayloadBytes); - const signature = Buffer.from(sig); - - // building manually or will get protobuf errors on send - const envelope = { - signature: signature, - payload: seekPayloadBytes - }; - - this._stream.write(envelope); + return seekPayloadBytes; } /* @@ -642,8 +784,8 @@ const ChannelEventHub = class { if (options && typeof options.endBlock !== 'undefined') { try { let end_block = options.endBlock; - if(typeof end_block === 'string') { - if(end_block.toLowerCase() === NEWEST) { + if (typeof end_block === 'string') { + if (end_block.toLowerCase() === NEWEST) { end_block = Long.MAX_VALUE; this._ending_block_newest = true; } @@ -739,14 +881,14 @@ const ChannelEventHub = class { } else if (state !== 2) { // try to reconnect this._connect_running = false; - this._connect(true); + this._connect({ force: true }); } } else { logger.debug('checkConnection - stream was shutdown - will reconnected'); // try to reconnect this._connect_running = false; - this._connect(true); + this._connect({ force: true }); } } catch (error) { @@ -1023,7 +1165,7 @@ const ChannelEventHub = class { if (!txid) { throw new Error('Missing "txid" parameter'); } - if(typeof txid !== 'string') { + if (typeof txid !== 'string') { throw new Error('"txid" parameter is not a string'); } if (!onEvent) { @@ -1034,7 +1176,7 @@ const ChannelEventHub = class { let default_unregister = true; let _txid = txid; - if(txid.toLowerCase() === ALL) { + if (txid.toLowerCase() === ALL) { _txid = txid.toLowerCase(); default_unregister = false; } @@ -1136,7 +1278,7 @@ const ChannelEventHub = class { if (all_trans_reg) { this._callTransactionListener(tx_id, val_code, block_num, all_trans_reg); } - if (trans_reg || all_trans_reg){ + if (trans_reg || all_trans_reg) { logger.debug('_callTransactionListener - no call backs found for this transaction %s', tx_id); } } @@ -1193,7 +1335,7 @@ const ChannelEventHub = class { if (channel_header.type === 3) { //only ENDORSER_TRANSACTION have chaincode events const tx = payload.data; if (tx && tx.actions) { - for (const {payload} of tx.actions) { + for (const { payload } of tx.actions) { const chaincode_event = payload.action.proposal_response_payload.extension.events; logger.debug('_processChaincodeEvents - chaincode_event %s', chaincode_event); @@ -1289,7 +1431,8 @@ const ChannelEventHub = class { this._start_stop_registration = null; } -}; +} + module.exports = ChannelEventHub; function convertValidationCode(code) { @@ -1370,7 +1513,7 @@ class EventRegistration { this.onError = onError; this.unregister = default_unregister; this.disconnect = default_disconnect; - this.unregister_action = () => {}; // do nothing by default + this.unregister_action = () => { }; // do nothing by default if (options) { if (typeof options.unregister === 'undefined' || options.unregister === null) { diff --git a/test/integration/signTransactionOffline.js b/test/integration/signTransactionOffline.js index 8f1dab3030..3eaf1cbafc 100644 --- a/test/integration/signTransactionOffline.js +++ b/test/integration/signTransactionOffline.js @@ -82,8 +82,7 @@ function sign(privateKey, proposalBytes, algorithm, keySize) { return Buffer.from(sig.toDER()); } -function signProposal(proposal) { - const proposalBytes = proposal.toBuffer(); +function signProposal(proposalBytes) { const signature = sign(privateKeyPem, proposalBytes, 'sha2', 256); const signedProposal = { signature, proposal_bytes: proposalBytes }; return signedProposal; @@ -136,6 +135,42 @@ async function setupChannel() { return channel; } +async function transactionMonitor(txId, eh, t) { + return new Promise((resolve, reject) => { + const handle = setTimeout(() => { + t.fail('Timeout - Failed to receive event for txId ' + txId); + eh.disconnect(); //shutdown + throw new Error('TIMEOUT - no event received'); + }, 60000); + + eh.registerTxEvent(txId, (txnid, code, block_num) => { + clearTimeout(handle); + t.pass('Event has been seen with transaction code:' + code + ' for transaction id:' + txnid + ' for block_num:' + block_num); + resolve('Got the replayed transaction'); + }, (error) => { + clearTimeout(handle); + t.fail('Failed to receive event replay for Event for transaction id ::' + txId); + reject(error); + }, { disconnect: true } + // Setting the disconnect to true as we do not want to use this + // ChannelEventHub after the event we are looking for comes in + ); + t.pass('Successfully registered event for ' + txId); + + const unsignedEvent = eh.generateUnsignedRegistration({ + certificate: certPem, + mspId, + }); + const signedProposal = signProposal(unsignedEvent); + const signedEvent = { + signature: signedProposal.signature, + payload: signedProposal.proposal_bytes, + }; + eh.connect({ signedEvent }); + t.pass('Successfully called connect on ' + eh.getPeerAddr()); + }); +} + test('Test sign a contract with a private key offline', async (t) => { try { const channel = await setupChannel(); @@ -148,7 +183,7 @@ test('Test sign a contract with a private key offline', async (t) => { }; const { proposal, txId } = channel.generateUnsignedProposal(transactionProposalReq, mspId, certPem); - const signedProposal = signProposal(proposal); + const signedProposal = signProposal(proposal.toBuffer()); t.pass('Successfully build endorse transaction proposal'); const peer = channel.getPeer('localhost:7051'); @@ -174,7 +209,7 @@ test('Test sign a contract with a private key offline', async (t) => { t.pass('Successfully build commit transaction proposal'); // sign this commit proposal at local - const signedCommitProposal = signProposal(commitProposal); + const signedCommitProposal = signProposal(commitProposal.toBuffer()); const response = await channel.sendSignedTransaction({ signedProposal: signedCommitProposal, @@ -182,6 +217,10 @@ test('Test sign a contract with a private key offline', async (t) => { }); t.equal(response.status, 'SUCCESS', 'commit should response success'); + const eh = channel.newChannelEventHub(peer); + await transactionMonitor(txId.getTransactionID(), eh, t); + t.pass(`Successfully listened the event for transaction ${txId.getTransactionID()}`); + t.end(); } catch (e) { t.fail(e.message); diff --git a/test/unit/channel-event-hub.js b/test/unit/channel-event-hub.js index 29456451a0..57798170ca 100644 --- a/test/unit/channel-event-hub.js +++ b/test/unit/channel-event-hub.js @@ -55,7 +55,7 @@ test('\n\n** ChannelEventHub tests\n\n', (t) => { eh.registerBlockEvent({}); eh.connect(); }, - /The clientContext has not been properly initialized, missing userContext/, + /Error connect the ChannelEventhub to peer, either the clientContext has not been properly initialized, missing userContext or admin identity or missing signedEvent/, 'Must pass in a clientContext that has the user context already initialized' );