Skip to content

Commit

Permalink
Merge "[FAB-2864] Replace hashtable module"
Browse files Browse the repository at this point in the history
  • Loading branch information
christo4ferris authored and Gerrit Code Review committed May 23, 2017
2 parents 9d3de19 + 392dd9f commit ff69ba4
Show file tree
Hide file tree
Showing 3 changed files with 111 additions and 46 deletions.
84 changes: 45 additions & 39 deletions fabric-client/lib/EventHub.js
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ var utils = require('./utils.js');
var Remote = require('./Remote.js');
var BlockDecoder = require('./BlockDecoder.js');
var grpc = require('grpc');
var HashTable = require('hashtable');
var logger = utils.getLogger('EventHub.js');

var _events = grpc.load(__dirname + '/protos/peer/events.proto').protos;
Expand Down Expand Up @@ -87,14 +86,14 @@ var EventHub = class {
constructor(clientContext) {
logger.debug('const ');
// hashtable of clients registered for chaincode events
this.chaincodeRegistrants = new HashTable();
this.chaincodeRegistrants = {};
// set of clients registered for block events
this.block_registrant_count = 1;
this.blockOnEvents = new HashTable();
this.blockOnErrors = new HashTable();
this.blockOnEvents = {};
this.blockOnErrors = {};
// hashtable of clients registered for transactional events
this.transactionOnEvents = new HashTable();
this.transactionOnErrors = new HashTable();
this.transactionOnEvents = {};
this.transactionOnErrors = {};
// peer node to connect to
this.ep = null;
// grpc event client interface
Expand Down Expand Up @@ -265,22 +264,27 @@ var EventHub = class {
_closeAllCallbacks(err) {
logger.debug('_closeAllCallbacks - start');

var closer = function(key, cb) {
logger.debug('_closeAllCallbacks - closing this callback %s',key);
cb(err);
var closer = function(list) {
for (let key in list) {
let cb = list[key];
logger.debug('_closeAllCallbacks - closing this callback %s',key);
cb(err);
}
};

logger.debug('_closeAllCallbacks - blockOnErrors %s',this.blockOnErrors.size());
this.blockOnErrors.forEach(closer);
this.blockOnEvents.clear();
this.blockOnErrors.clear();
logger.debug('_closeAllCallbacks - blockOnErrors %s', Object.keys(this.blockOnErrors).length);
closer(this.blockOnErrors);
this.blockOnEvents = {};
this.blockOnErrors = {};

logger.debug('_closeAllCallbacks - transactionOnErrors %s',this.transactionOnErrors.size());
this.transactionOnErrors.forEach(closer);
this.transactionOnEvents.clear();
this.transactionOnErrors.clear();
logger.debug('_closeAllCallbacks - transactionOnErrors %s', Object.keys(this.transactionOnErrors).length);
closer(this.transactionOnErrors);
this.transactionOnEvents = {};
this.transactionOnErrors = {};

var cc_closer = function(key, cbtable) {
var self = this;
var cc_closer = function(key) {
var cbtable = self.chaincodeRegistrants[key];
cbtable.forEach(function(cbe) {
logger.debug('_closeAllCallbacks - closing this chaincode event %s %s',cbe.ccid, cbe.eventNameFilter);
if(cbe.onError) {
Expand All @@ -289,9 +293,9 @@ var EventHub = class {
});
};

logger.debug('_closeAllCallbacks - chaincodeRegistrants %s',this.chaincodeRegistrants.size());
this.chaincodeRegistrants.forEach(cc_closer);
this.chaincodeRegistrants.clear();
logger.debug('_closeAllCallbacks - chaincodeRegistrants %s', Object.keys(this.chaincodeRegistrants).length);
Object.keys(this.chaincodeRegistrants).forEach(cc_closer);
this.chaincodeRegistrants = {};
}

/*
Expand Down Expand Up @@ -368,10 +372,10 @@ var EventHub = class {
this._checkConnection(!have_error_cb, false);

var cbe = new ChainCodeCBE(ccid, eventname, onEvent, onError);
var cbtable = this.chaincodeRegistrants.get(ccid);
var cbtable = this.chaincodeRegistrants[ccid];
if (!cbtable) {
cbtable = new Set();
this.chaincodeRegistrants.put(ccid, cbtable);
this.chaincodeRegistrants[ccid] = cbtable;
}
cbtable.add(cbe);

Expand All @@ -394,14 +398,14 @@ var EventHub = class {
if(!cbe) {
throw new Error('Missing "cbe" parameter');
}
var cbtable = this.chaincodeRegistrants.get(cbe.ccid);
var cbtable = this.chaincodeRegistrants[cbe.ccid];
if (!cbtable) {
logger.debug('No event registration for ccid %s ', cbe.ccid);
return;
}
cbtable.delete(cbe);
if (cbtable.size <= 0) {
this.chaincodeRegistrants.remove(cbe.ccid);
delete this.chaincodeRegistrants[cbe.ccid];
}
}

Expand Down Expand Up @@ -430,12 +434,12 @@ var EventHub = class {
this._checkConnection(!have_error_cb, false);

var block_registration_number = this.block_registrant_count++;
this.blockOnEvents.put(block_registration_number, onEvent);
this.blockOnEvents[block_registration_number] = onEvent;

// when there is an error callback try to reconnect this
// event hub if is not connected
if(have_error_cb) {
this.blockOnErrors.put(block_registration_number, onError);
this.blockOnErrors[block_registration_number] = onError;
this._checkConnection(false, this.force_reconnect);
}

Expand All @@ -453,8 +457,8 @@ var EventHub = class {
if(!block_registration_number) {
throw new Error('Missing "block_registration_number" parameter');
}
this.blockOnEvents.remove(block_registration_number);
this.blockOnErrors.remove(block_registration_number);
delete this.blockOnEvents[block_registration_number];
delete this.blockOnErrors[block_registration_number];
}

/**
Expand Down Expand Up @@ -483,12 +487,12 @@ var EventHub = class {
// when this hub is not connected
this._checkConnection(!have_error_cb, false);

this.transactionOnEvents.put(txid, onEvent);
this.transactionOnEvents[txid] = onEvent;

// when there is an onError callback try to reconnect this
// event hub if is not connected
if(have_error_cb) {
this.transactionOnErrors.put(txid, onError);
this.transactionOnErrors[txid] = onError;
this._checkConnection(false, this.force_reconnect);
}
}
Expand All @@ -502,8 +506,8 @@ var EventHub = class {
if(!txid) {
throw new Error('Missing "txid" parameter');
}
this.transactionOnEvents.remove(txid);
this.transactionOnErrors.remove(txid);
delete this.transactionOnEvents[txid];
delete this.transactionOnErrors[txid];
}

/*
Expand All @@ -512,13 +516,15 @@ var EventHub = class {
*/
_processBlockOnEvents(block) {
logger.debug('_processBlockOnEvents block=%s', block.header.number);
if(this.blockOnEvents.size() == 0) {
if(Object.keys(this.blockOnEvents).length == 0) {
logger.debug('_processBlockOnEvents - no registered block event "listeners"');
return;
}

// send to all registered block listeners
this.blockOnEvents.forEach(function(key, cb) {
let self = this;
Object.keys(this.blockOnEvents).forEach(function(key) {
var cb = self.blockOnEvents[key];
cb(block);
});
}
Expand All @@ -529,7 +535,7 @@ var EventHub = class {
*/
_processTxOnEvents(block) {
logger.debug('_processTxOnEvents block=%s', block.header.number);
if(this.transactionOnEvents.size() == 0) {
if(Object.keys(this.transactionOnEvents).length == 0) {
logger.debug('_processTxOnEvents - no registered transaction event "listeners"');
return;
}
Expand All @@ -541,7 +547,7 @@ var EventHub = class {
var channel_header = block.data.data[index].payload.header.channel_header;
var val_code = convertValidationCode(txStatusCodes[index]);
logger.debug('_processTxOnEvents - txid=%s val_code=%s', channel_header.tx_id, val_code);
var cb = this.transactionOnEvents.get(channel_header.tx_id);
var cb = this.transactionOnEvents[channel_header.tx_id];
if (cb){
logger.debug('_processTxOnEvents - about to stream the transaction call back for code=%s tx=%s', val_code, channel_header.tx_id);
cb(channel_header.tx_id, val_code);
Expand All @@ -555,7 +561,7 @@ var EventHub = class {
*/
_processChainCodeOnEvents(block) {
logger.debug('_processChainCodeOnEvents block=%s', block.header.number);
if(this.chaincodeRegistrants.size() == 0) {
if(Object.keys(this.chaincodeRegistrants).length == 0) {
logger.debug('_processChainCodeOnEvents - no registered chaincode event "listeners"');
return;
}
Expand All @@ -573,7 +579,7 @@ var EventHub = class {
var caPayload = propRespPayload.extension;
var ccEvent = caPayload.events;
logger.debug('_processChainCodeOnEvents - ccEvent %s',ccEvent);
var cbtable = this.chaincodeRegistrants.get(ccEvent.chaincode_id);
var cbtable = this.chaincodeRegistrants[ccEvent.chaincode_id];
if (!cbtable) {
return;
}
Expand Down
1 change: 0 additions & 1 deletion fabric-client/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
"fs": "0.0.2",
"fs-extra": ">=0.30.0 <0.31.0",
"grpc": ">=1.1.2 <1.3.0",
"hashtable": "^2.0.2",
"js-sha3": "^0.5.1",
"jsrsasign": "6.2.2",
"jssha": "^2.1.0",
Expand Down
72 changes: 66 additions & 6 deletions test/unit/event-hub.js
Original file line number Diff line number Diff line change
Expand Up @@ -196,14 +196,27 @@ test('\n\n** EventHub block callback \n\n', (t) => {
eh.connected = true; //force this into connected state
eh.force_reconnect = false;

eh.registerBlockEvent((block) => {
t.fail('Should not have called success callback');
var index = eh.registerBlockEvent((block) => {
t.fail('Should not have called success callback when disconnect() is called');
t.end();
}, (error) =>{
t.pass('Successfully called error callback');
t.pass('Successfully called error callback from disconnect()');
t.end();
});

t.pass('successfully registered block callbacks');
t.equal(index, 1, 'Check the first block listener is at index 1');

index = eh.registerBlockEvent(() => {
// empty method body
}, () => {
// empty method body
});

t.equal(index, 2, 'Check the 2nd block listener is at index 2');
t.equal(Object.keys(eh.blockOnEvents).length, 2, 'Check the size of the blockOnEvents hash table');
t.equal(Object.keys(eh.blockOnErrors).length, 2, 'Check the size of the blockOnErrors hash table');

eh.disconnect();
});

Expand All @@ -213,14 +226,36 @@ test('\n\n** EventHub transaction callback \n\n', (t) => {
eh.connected = true; //force this into connected state
eh.force_reconnect = false;

eh.registerTxEvent('txid', (block) => {
eh.registerTxEvent('txid1', (block) => {
// empty method body
}, (error) =>{
// empty method body
});
t.pass('successfully registered transaction callbacks');
t.equal(Object.keys(eh.transactionOnEvents).length, 1, 'Check the size of the transactionOnEvents hash table');
t.equal(Object.keys(eh.transactionOnErrors).length, 1, 'Check the size of the transactionOnErrors hash table');

eh.registerTxEvent('txid1', (block) => {
t.fail('Should not have called success callback');
t.end();
}, (error) =>{
t.pass('Successfully called error callback');
t.end();
});
t.pass('successfully registered transaction callbacks');
t.equal(Object.keys(eh.transactionOnEvents).length, 1,
'Size of the transactionOnEvents hash table should still be 1 since the listeners are for the same txId');
t.equal(Object.keys(eh.transactionOnErrors).length, 1,
'Size of the transactionOnErrors hash table should still be 1 since the listeners are for the same txId');

eh.registerTxEvent('txid2', (block) => {
// empty method body
}, (error) =>{
// empty method body
});

t.equal(Object.keys(eh.transactionOnEvents).length, 2, 'Check the size of the transactionOnEvents hash table');
t.equal(Object.keys(eh.transactionOnErrors).length, 2, 'Check the size of the transactionOnErrors hash table');

eh.disconnect();
});

Expand All @@ -230,14 +265,35 @@ test('\n\n** EventHub chaincode callback \n\n', (t) => {
eh.connected = true; //force this into connected state
eh.force_reconnect = false;

eh.registerChaincodeEvent('ccid', 'eventfilter', (block) => {
eh.registerChaincodeEvent('ccid1', 'eventfilter', (block) => {
t.fail('Should not have called success callback');
t.end();
}, (error) =>{
t.pass('Successfully called error callback');
t.end();
});
t.pass('successfully registered chaincode callbacks');

t.equal(Object.keys(eh.chaincodeRegistrants).length, 1, 'Check the size of the chaincodeRegistrants hash table');

eh.registerChaincodeEvent('ccid1', 'eventfilter', (block) => {
// empty method body
}, (error) =>{
// empty method body
});

t.equal(Object.keys(eh.chaincodeRegistrants).length, 1,
'Size of the chaincodeRegistrants hash table should still be 1 because both listeners are for the same chaincode');

eh.registerChaincodeEvent('ccid2', 'eventfilter', (block) => {
// empty method body
}, (error) =>{
// empty method body
});

t.equal(Object.keys(eh.chaincodeRegistrants).length, 2,
'Size of the chaincodeRegistrants hash table should still be 2');

eh.disconnect();
});

Expand Down Expand Up @@ -303,6 +359,7 @@ test('\n\n** EventHub remove block callback \n\n', (t) => {
var brn = eh.registerBlockEvent( blockcallback, blockerrorcallback);
t.pass('successfully registered block callbacks');
eh.unregisterBlockEvent(brn);
t.equal(Object.keys(eh.blockOnEvents).length, 0, 'Check the size of the blockOnEvents hash table');
t.pass('successfuly unregistered block callback');
eh.disconnect();
t.pass('successfuly disconnected eventhub');
Expand All @@ -326,6 +383,7 @@ test('\n\n** EventHub remove transaction callback \n\n', (t) => {
t.pass('successfully registered transaction callbacks');
eh.unregisterTxEvent(txid);
t.pass('successfuly unregistered transaction callback');
t.equal(Object.keys(eh.transactionOnEvents).length, 0, 'Check the size of the transactionOnEvents hash table');
eh.disconnect();
t.pass('successfuly disconnected eventhub');
t.end();
Expand All @@ -347,6 +405,8 @@ test('\n\n** EventHub remove chaincode callback \n\n', (t) => {
t.pass('successfully registered chaincode callbacks');
eh.unregisterChaincodeEvent(cbe);
t.pass('successfuly unregistered chaincode callback');
t.equal(Object.keys(eh.chaincodeRegistrants).length, 0,
'Size of the chaincodeRegistrants hash table should be 0');
eh.disconnect();
t.pass('successfuly disconnected eventhub');
t.end();
Expand Down

0 comments on commit ff69ba4

Please sign in to comment.