Skip to content

Commit

Permalink
FABN-1034: Prevent hang with concurrent transactions
Browse files Browse the repository at this point in the history
Fixes for fabric-network.

- Don't wait for event hub connections to complete in commit event handling.
- Add integration test for concurrent transactions.

Change-Id: I9d2bfa9cee58f31fb25a6d84762785b2438e6295
Signed-off-by: Mark S. Lewis <mark_lewis@uk.ibm.com>
  • Loading branch information
bestbeforetoday committed Nov 27, 2018
1 parent 7051035 commit d999468
Show file tree
Hide file tree
Showing 9 changed files with 84 additions and 150 deletions.
36 changes: 10 additions & 26 deletions fabric-network/lib/impl/event/abstracteventstrategy.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@

'use strict';

const isPromise = require('is-promise');
const logger = require('fabric-network/lib/logger').getLogger('AbstractStrategy');

/**
Expand All @@ -22,44 +21,30 @@ const logger = require('fabric-network/lib/logger').getLogger('AbstractStrategy'
class AbstractEventStrategy {
/**
* Constructor.
* @param {Promise.ChannelEventHub[]} eventHubsPromise Promise to event hubs for which to process events.
* @param {ChannelEventHub[]} eventHubs Event hubs for which to process events.
*/
constructor(eventHubsPromise) {
if (!isPromise(eventHubsPromise)) {
const message = 'Expected event hubs to be a Promise but was ' + typeof eventHubsPromise;
constructor(eventHubs) {
if (eventHubs.length < 1) {
const message = 'No event hubs for strategy';
logger.error('constructor:', message);
throw new Error(message);
}

this.eventHubsPromise = eventHubsPromise;
this.eventHubs = eventHubs;
this.counts = {
success: 0,
fail: 0,
expected: 0
expected: eventHubs.length
};
}

/**
* Called by event handler to obtain the event hubs to which it should listen. Gives an opportunity for
* the strategy to store information on the events it expects to receive for later use in event handling.
* @async
* @returns {ChannelEventHubs[]} connected event hubs.
* @throws {Error} if the connected event hubs do not satisfy the strategy.
* @returns {ChannelEventHubs[]} Event hubs.
*/
async getConnectedEventHubs() {
const eventHubs = await this.eventHubsPromise;
const connectedEventHubs = eventHubs.filter((eventHub) => eventHub.isconnected());

if (connectedEventHubs.length === 0) {
const message = 'No available event hubs found for strategy';
const eventHubNames = eventHubs.map((eventHub) => eventHub.getName());
logger.error('getConnectedEventHubs:', message, eventHubNames);
throw new Error(message);
}

this.counts.expected = connectedEventHubs.length;

return connectedEventHubs;
getEventHubs() {
return this.eventHubs;
}

/**
Expand Down Expand Up @@ -95,8 +80,7 @@ class AbstractEventStrategy {
* @param {Function} successFn Callback function to invoke if the strategy is successful.
* @param {Function} failFn Callback function to invoke if the strategy fails.
*/
// eslint-disable-next-line no-unused-vars
checkCompletion(counts, successFn, failFn) {
checkCompletion(counts, successFn, failFn) { // eslint-disable-line no-unused-vars
throw new Error('AbstractEventStrategy.checkCompletion() not implemented');
}
}
Expand Down
51 changes: 5 additions & 46 deletions fabric-network/lib/impl/event/eventhubfactory.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,6 @@

'use strict';

const util = require('util');

const logger = require('fabric-network/lib/logger').getLogger('EventHubFactory');

/**
Expand All @@ -32,51 +30,12 @@ class EventHubFactory {
}

/**
* Gets event hubs for all specified peers. Where possible, the event hubs will be connected.
* @async
* @param {ChannelPeer[]} peers Peers for which event hubs should be connected.
* @returns {ChannelEventHub[]} Event hubs, which may or may not have successfully connected.
*/
async getEventHubs(peers) {
// Get event hubs in parallel as each may take some time
const eventHubPromises = peers.map((peer) => this.getEventHub(peer));
return Promise.all(eventHubPromises);
}

/**
* Get the event hub for a specific peer. Where possible, the event hub will be connected.
* @private
* @async
* @param {ChannelPeer} peer Peer for which the event hub should be connected.
* @returns {ChannelEventHub} Event hub, which may or may not have successfully connected.
* Gets event hubs for all specified peers.
* @param {ChannelPeer[]} peers Peers for which event hubs should be obtained.
* @returns {ChannelEventHub[]} Event hubs, which may or may not be connected.
*/
async getEventHub(peer) {
const eventHub = this.channel.getChannelEventHub(peer.getName());
if (!eventHub.isconnected()) {
await this.connectEventHub(eventHub);
} else {
// event hub is already connected, nothing else needs to be done
logger.debug('getEventHub:', 'event hub already connected:', eventHub.getName());
}
return eventHub;
}

/**
* Attempt to connect a given event hub. Resolves successfully regardless of whether or the event hub connection
* was successful or failed.
* @private
* @async
* @param {ChannelEventHub} eventHub An event hub.
*/
async connectEventHub(eventHub) {
try {
// Need to wrap in an arrow function to protect the value of this in connect()
const connect = util.promisify((callback) => eventHub.connect({}, callback));
await connect();
logger.debug('connectEventHub:', 'successfully connected event hub:', eventHub.getName());
} catch (error) {
logger.info('connectEventHub:', 'failed to connect event hub:', eventHub.getName(), error);
}
getEventHubs(peers) {
return peers.map((peer) => this.channel.getChannelEventHub(peer.getName()));
}
}

Expand Down
5 changes: 2 additions & 3 deletions fabric-network/lib/impl/event/transactioneventhandler.js
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ class TransactionEventHandler {

logger.debug('constructor:', util.format('transactionId = %s, options = %j', this.transactionId, this.options));

this.eventHubs = [];
this.eventHubs = strategy.getEventHubs();
this.respondedEventHubs = new Set();

this.notificationPromise = new Promise((resolve, reject) => {
Expand All @@ -56,7 +56,6 @@ class TransactionEventHandler {
* @async
*/
async startListening() {
this.eventHubs = await this.strategy.getConnectedEventHubs();
if (this.eventHubs.length > 0) {
this._setListenTimeout();
await this._registerTxEventListeners();
Expand All @@ -83,7 +82,7 @@ class TransactionEventHandler {

const promises = this.eventHubs.map((eventHub) => {
return new Promise((resolve) => {
logger.debug('_registerAllEventListeners:', `registerTxEvent(${this.transactionId}) for event hub:`, eventHub.getName());
logger.debug('_registerTxEventListeners:', `registerTxEvent(${this.transactionId}) for event hub:`, eventHub.getName());

eventHub.registerTxEvent(
this.transactionId,
Expand Down
1 change: 0 additions & 1 deletion fabric-network/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
},
"types": "./types/index.d.ts",
"dependencies": {
"is-promise": "^2.1.0",
"nano": "^6.4.4",
"rimraf": "^2.6.2",
"uuid": "^3.2.1"
Expand Down
45 changes: 7 additions & 38 deletions fabric-network/test/impl/event/eventhubfactory.js
Original file line number Diff line number Diff line change
Expand Up @@ -42,22 +42,12 @@ describe('EventHubFactory', () => {
stubEventHub1 = sinon.createStubInstance(ChannelEventHub);
stubEventHub1._stubInfo = 'eventHub1';
stubEventHub1.getName.returns('eventHub1');
stubEventHub1.isconnected.returns(true);

// Unconnected event hub that will successfully connect
stubEventHub2 = sinon.createStubInstance(ChannelEventHub);
stubEventHub2._stubInfo = 'eventHub2';
stubEventHub2.getName.returns('eventHub2');
stubEventHub2.isconnected.returns(false);
// Fake a connection success callback
stubEventHub2.connect.callsFake((fullBlocks, callback) => {
// Invoke callback manually rather than using stub.callsArgWith() to ensure the code will hang rather than
// error if the callback is not passed correctly. An error will be swallowed by the code in EventHubFactory
// and produce a false positive test result.
if (typeof callback === 'function') {
callback(null, stubEventHub2);
}
});

stubChannel = sinon.createStubInstance(Channel);
stubChannel.getName.returns('channel');
stubChannel.getChannelEventHub.withArgs(stubPeer1.getName()).returns(stubEventHub1);
Expand Down Expand Up @@ -85,40 +75,19 @@ describe('EventHubFactory', () => {
factory = new EventHubFactory(stubChannel);
});

it('returns empty array for no peer arguments', async () => {
const results = await factory.getEventHubs([]);
it('returns empty array for no peer arguments', () => {
const results = factory.getEventHubs([]);
expect(results).to.be.an('Array').that.is.empty;
});

it('returns eventHub for peer1', async () => {
const results = await factory.getEventHubs([stubPeer1]);
it('returns eventHub for peer1', () => {
const results = factory.getEventHubs([stubPeer1]);
expect(results).to.have.members([stubEventHub1]);
});

it('returns eventHubs for peer1 and peer2', async () => {
const results = await factory.getEventHubs([stubPeer1, stubPeer2]);
it('returns eventHubs for peer1 and peer2', () => {
const results = factory.getEventHubs([stubPeer1, stubPeer2]);
expect(results).to.have.members([stubEventHub1, stubEventHub2]);
});

it('does not reconnect a connected event hub', async () => {
const results = await factory.getEventHubs([stubPeer1]);
expect(results[0].connect.notCalled).to.be.true;
});

it('connects an unconnected event hub', async () => {
const results = await factory.getEventHubs([stubPeer2]);
expect(results[0].connect.called).to.be.true;
});

it('does not fail on error connecting event hub', async () => {
// Fake a connection failure callback
stubEventHub2.connect.callsFake((fullBlocks, callback) => {
if (typeof callback === 'function') {
callback(new Error('connect failed'));
}
});
const results = await factory.getEventHubs([stubPeer2]);
expect(results[0].connect.called).to.be.true;
});
});
});
40 changes: 11 additions & 29 deletions fabric-network/test/impl/event/eventstrategy.js
Original file line number Diff line number Diff line change
Expand Up @@ -20,30 +20,19 @@ const AnyForTxStrategy = require('fabric-network/lib/impl/event/anyfortxstrategy
describe('Event Strategy Implementations', () => {
let stubEventHub1;
let stubEventHub2;
let stubEventHub3;
let stubSuccessFn;
let stubFailFn;

beforeEach(() => {
// Include _stubInfo property on stubs to enable easier equality comparison in tests

// Connected event hub
stubEventHub1 = sinon.createStubInstance(ChannelEventHub);
stubEventHub1._stubInfo = 'eventHub1';
stubEventHub1.getName.returns('eventHub1');
stubEventHub1.isconnected.returns(true);

// Unconnected event hub
stubEventHub2 = sinon.createStubInstance(ChannelEventHub);
stubEventHub2._stubInfo = 'eventHub2';
stubEventHub2.getName.returns('eventHub2');
stubEventHub2.isconnected.returns(false);

// Connected event hub
stubEventHub3 = sinon.createStubInstance(ChannelEventHub);
stubEventHub3._stubInfo = 'eventHub3';
stubEventHub3.getName.returns('eventHub3');
stubEventHub3.isconnected.returns(true);

stubSuccessFn = sinon.stub();
stubFailFn = sinon.stub();
Expand All @@ -56,29 +45,24 @@ describe('Event Strategy Implementations', () => {
// Common behaviour for all implementations
[AllForTxStrategy, AnyForTxStrategy].forEach((StrategyClass) => describe(StrategyClass.name + ' common behaviour', () => {
describe('#constructor', () => {
it('throws if event hubs argument is not a promise', () => {
expect(() => new StrategyClass('I am not a promise')).to.throw();
it('throws if no event hubs supplied', () => {
expect(() => new StrategyClass([])).to.throw('No event hubs');
});
});

describe('#getConnectedEventHubs', () => {
it('returns only the connected event hubs', async () => {
const strategy = new StrategyClass(Promise.resolve([stubEventHub1, stubEventHub2]));
const results = await strategy.getConnectedEventHubs();
expect(results).to.have.members([stubEventHub1]);
});

it('throws if no event hubs are connected', async () => {
const strategy = new StrategyClass(Promise.resolve([stubEventHub2]));
await expect(strategy.getConnectedEventHubs())
.to.be.rejectedWith('No available event hubs found for strategy');
describe('#getEventHubs', () => {
it('returns the event hubs', () => {
const eventHubs = [stubEventHub1, stubEventHub2];
const strategy = new StrategyClass(eventHubs);
const results = strategy.getEventHubs();
expect(results).to.equal(eventHubs);
});
});
}));

describe('AbstractEventStrategy', () => {
it('#checkCompletion (abstract) throws if not overridden', () => {
const strategy = new AbstractEventStrategy(Promise.resolve([stubEventHub1, stubEventHub2, stubEventHub3]));
const strategy = new AbstractEventStrategy([stubEventHub1, stubEventHub2]);
expect(() => strategy.checkCompletion()).to.throw();
});
});
Expand All @@ -88,8 +72,7 @@ describe('Event Strategy Implementations', () => {

beforeEach(async () => {
// Two connected and one disconnected event hubs
strategy = new AllForTxStrategy(Promise.resolve([stubEventHub1, stubEventHub2, stubEventHub3]));
await strategy.getConnectedEventHubs();
strategy = new AllForTxStrategy([stubEventHub1, stubEventHub2]);
});

it('does not call callbacks on first event of two expected events', () => {
Expand Down Expand Up @@ -139,8 +122,7 @@ describe('Event Strategy Implementations', () => {

beforeEach(async () => {
// Two connected and one disconnected event hubs
strategy = new AnyForTxStrategy(Promise.resolve([stubEventHub1, stubEventHub2, stubEventHub3]));
await strategy.getConnectedEventHubs();
strategy = new AnyForTxStrategy([stubEventHub1, stubEventHub2]);
});

it('calls success callback on first event of two expected events', () => {
Expand Down
8 changes: 4 additions & 4 deletions fabric-network/test/impl/event/transactioneventhandler.js
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,11 @@ describe('TransactionEventHandler', () => {
});

stubStrategy = {
getConnectedEventHubs: sinon.stub(),
getEventHubs: sinon.stub(),
eventReceived: sinon.stub(),
errorReceived: sinon.stub()
};
stubStrategy.getConnectedEventHubs.resolves([stubEventHub]);
stubStrategy.getEventHubs.returns([stubEventHub]);
});

afterEach(() => {
Expand Down Expand Up @@ -157,7 +157,7 @@ describe('TransactionEventHandler', () => {
});

it('succeeds immediately with no event hubs', async () => {
stubStrategy.getConnectedEventHubs.resolves([]);
stubStrategy.getEventHubs.returns([]);
handler = new TransactionEventHandler(transactionId, stubStrategy);
await handler.startListening();
return expect(handler.waitForEvents()).to.be.fulfilled;
Expand Down Expand Up @@ -208,7 +208,7 @@ describe('TransactionEventHandler', () => {
});

it('does not timeout if no event hubs', async () => {
stubStrategy.getConnectedEventHubs.resolves([]);
stubStrategy.getEventHubs.returns([]);
const options = {commitTimeout: 418};
handler = new TransactionEventHandler(transactionId, stubStrategy, options);
await handler.startListening();
Expand Down
2 changes: 1 addition & 1 deletion test/fixtures/src/node_cc/example_cc/chaincode.js
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,7 @@ const Chaincode = class {

async echo(stub, args) {
if (args.length > 0) {
return shim.success(args[0]);
return shim.success(Buffer.from(args[0]));
} else {
return shim.success();
}
Expand Down
Loading

0 comments on commit d999468

Please sign in to comment.