Skip to content

Commit

Permalink
FABN-865: Simplify sample plug-in event handler
Browse files Browse the repository at this point in the history
- Register and connect event hubs in parallel.
- Remove plug-in strategy prototype.
- Documentation for plug-in commit event handling.

Change-Id: I9e402ef14d32f931ff2f62d60e94d2f1d0c01287
Signed-off-by: Mark S. Lewis <mark_lewis@uk.ibm.com>
  • Loading branch information
bestbeforetoday committed Oct 24, 2018
1 parent 96cfd8f commit db0f4cf
Show file tree
Hide file tree
Showing 7 changed files with 207 additions and 539 deletions.
112 changes: 112 additions & 0 deletions docs/tutorials/transaction-commit-events.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
This tutorial describes the approaches that can be selected by users of the
SDK for ensuring that submitted transactions are commited on peers.

### Overview

The submit of a transaction involves several steps:
1. Send proposals to endorsing peers.
2. Send the endorsed transaction to the orderer.
3. The transaction is *eventually* committed on all peers in the network.

In some cases a client application might be happy to proceed immediately after
the transaction is successfully sent to the orderer. In other cases a client
application might need to ensure that the transaction has been committed on
certain peers with which it wants to interact before proceeding.

It is important to note that the blockchain state visible from a specific peer
will remain unchanged until a transaction is committed on that peer. If a
client application queries a peer for state after an endorsed transaction has
been successfully sent to the orderer but before the transaction has been
committed on that peer, the state returned will still be that prior to the
transaction. For example, a query of a bank balance after a transaction to
deduct funds from that bank account is submitted to the orderer will return
the old balance until the transaction is eventually committed on the peer
being queried.

### Event handling strategies

The SDK provides several selectable strategies for how it should wait for
commit events following a transaction invocation. The available strategies
are defined in `DefaultEventHandlerStrategies`. The desired strategy is
(optionally) specified as an argument to `connect()` on the `Gateway`, and
is used for all transaction invocations on Contracts obtained from that
Gateway instance.

If no event handling strategy is specified, `MSPID_SCOPE_ALLFORTX` is used
by default.

```javascript
const { Gateway, DefaultEventHandlerStrategies } = require('fabric-network');

const connectOptions = {
eventHandlerOptions: {
strategy: DefaultEventHandlerStrategies.MSPID_SCOPE_ALLFORTX
}
}

const gateway = new Gateway();
await gateway.connect(connectionProfile, connectOptions);
```

Specifying `null` as the event handling strategy will cause transaction
invocations to return immediately after successfully sending the endorsed
transaction to the orderer. It will not wait for any commit events to be
received from peers.

### Plug-in event handlers

If behavior not provided by the default event handling strategies is
required, it is possible to implement your own event handling. This is
achieved by specifying your own factory function as the event handling
strategy. The factory function should return a *transaction event handler*
object and take two parameters:
1. Transaction ID: `String`
2. Blockchain network: `Network`

The Network provides access to peers and event hubs from which events should
be recieved.

```javascript
function createTransactionEventHandler(transactionId, network) {
/* Your implementation here */
return new MyTransactionEventHandler(transactionId, eventHubs);
}

const connectOptions = {
eventHandlerOptions: {
strategy: createTransactionEventhandler
}
}

const gateway = new Gateway();
await gateway.connect(connectionProfile, connectOptions);
```

The *transaction event handler* object returned must implement the following
lifecycle functions.

```javascript
class MyTransactionEventHandler {
/**
* Called to initiate listening for transaction events.
* @async
* @throws {Error} if not in a state where the handling strategy can be satified and the transaction should
* be aborted. For example, if insufficient event hubs are available.
*/
async startListening() { /* Your implementation here */ }

/**
* Wait until enough events have been received from the event hubs to satisfy the event handling strategy.
* @async
* @throws {Error} if the transaction commit is not successfully confirmed.
*/
async waitForEvents() { /* Your implementation here */ }

/**
* Cancel listening for events.
*/
cancelListening() { /* Your imeplementation here */ }
}
```

For a complete sample plug-in event handler implementation, see [sample-transaction-event-handler.js](https://github.com/hyperledger/fabric-sdk-node/blob/master/test/integration/network-e2e/sample-transaction-event-handler.js).
43 changes: 25 additions & 18 deletions fabric-network/lib/impl/event/transactioneventhandler.js
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,8 @@ class TransactionEventHandler {
this.respondedEventHubs = new Set();

this.notificationPromise = new Promise((resolve, reject) => {
this._txResolve = resolve;
this._txReject = reject;
this._resolveNotificationPromise = resolve;
this._rejectNotificationPromise = reject;
});
}

Expand All @@ -58,11 +58,11 @@ class TransactionEventHandler {
async startListening() {
this.eventHubs = await this.strategy.getConnectedEventHubs();
if (this.eventHubs.length > 0) {
this._registerTxEventListeners();
this._setListenTimeout();
await this._registerTxEventListeners();
} else {
logger.debug('startListening: No event hubs');
this._txResolve();
this._resolveNotificationPromise();
}
}

Expand All @@ -78,14 +78,25 @@ class TransactionEventHandler {
}, this.options.commitTimeout * 1000);
}

_registerTxEventListeners() {
for (const eventHub of this.eventHubs) {
logger.debug('_registerTxEventListeners:', `registerTxEvent(${this.transactionId}) for event hub:`, eventHub.getName());
async _registerTxEventListeners() {
const registrationOptions = { unregister: true };

const promises = this.eventHubs.map((eventHub) => {
return new Promise((resolve) => {
logger.debug('_registerAllEventListeners:', `registerTxEvent(${this.transactionId}) for event hub:`, eventHub.getName());

eventHub.registerTxEvent(
this.transactionId,
(txId, code) => this._onEvent(eventHub, txId, code),
(err) => this._onError(eventHub, err),
registrationOptions
);
eventHub.connect();
resolve();
});
});

eventHub.registerTxEvent(this.transactionId,
(txId, code) => this._onEvent(eventHub, txId, code),
(err) => this._onError(eventHub, err));
}
await Promise.all(promises);
}

_timeoutFail() {
Expand All @@ -100,7 +111,6 @@ class TransactionEventHandler {
_onEvent(eventHub, txId, code) {
logger.debug('_onEvent:', util.format('received event for %j with code %j', txId, code));

eventHub.unregisterTxEvent(this.transactionId);
this._receivedEventHubResponse(eventHub);
if (code !== 'VALID') {
const message = util.format('Peer %s has rejected transaction %j with code %j', eventHub.getPeerAddr(), txId, code);
Expand All @@ -113,7 +123,6 @@ class TransactionEventHandler {
_onError(eventHub, err) {
logger.info('_onError:', util.format('received error from peer %s: %s', eventHub.getPeerAddr(), err));

eventHub.unregisterTxEvent(this.transactionId);
this._receivedEventHubResponse(eventHub);
this.strategy.errorReceived(this._strategySuccess.bind(this), this._strategyFail.bind(this));
}
Expand All @@ -130,7 +139,7 @@ class TransactionEventHandler {
logger.info('_strategySuccess:', util.format('strategy success for transaction %j', this.transactionId));

this.cancelListening();
this._txResolve();
this._resolveNotificationPromise();
}

/**
Expand All @@ -142,7 +151,7 @@ class TransactionEventHandler {
logger.warn('_strategyFail:', util.format('strategy fail for transaction %j: %s', this.transactionId, error));

this.cancelListening();
this._txReject(error);
this._rejectNotificationPromise(error);
}

/**
Expand All @@ -162,9 +171,7 @@ class TransactionEventHandler {
logger.debug('cancelListening called');

clearTimeout(this.timeoutHandler);
for (const eventHub of this.eventHubs) {
eventHub.unregisterTxEvent(this.transactionId);
}
this.eventHubs.forEach((eventHub) => eventHub.unregisterTxEvent(this.transactionId));
}

}
Expand Down
36 changes: 21 additions & 15 deletions fabric-network/test/impl/event/transactioneventhandler.js
Original file line number Diff line number Diff line change
Expand Up @@ -69,9 +69,27 @@ describe('TransactionEventHandler', () => {
handler.cancelListening();
});

it('calls registerTxEvent() on event hub with transaction ID', async () => {
await handler.startListening();
sinon.assert.calledWith(stubEventHub.registerTxEvent, transactionId);
describe('#startListening', () => {
it('calls registerTxEvent() on event hub with transaction ID', async () => {
await handler.startListening();
sinon.assert.calledWith(stubEventHub.registerTxEvent, transactionId);
});

it('sets auto-unregister option when calling registerTxEvent() on event hub', async () => {
await handler.startListening();
sinon.assert.calledWith(
stubEventHub.registerTxEvent,
sinon.match.any,
sinon.match.any,
sinon.match.any,
sinon.match.has('unregister', true)
);
});

it('calls connect() on event hub', async () => {
await handler.startListening();
sinon.assert.called(stubEventHub.connect);
});
});

it('calls eventReceived() on strategy when event hub sends valid event', async () => {
Expand All @@ -98,18 +116,6 @@ describe('TransactionEventHandler', () => {
sinon.assert.notCalled(stubStrategy.eventReceived);
});

it('calls unregisterTxEvent() on event hub when event hub sends an event', async () => {
await handler.startListening();
stubEventHub._onEventFn(transactionId, 'VALID');
sinon.assert.calledWith(stubEventHub.unregisterTxEvent, transactionId);
});

it('calls unregisterTxEvent() on event hub when event hub sends an error', async () => {
await handler.startListening();
stubEventHub._onEventFn(transactionId, 'VALID');
sinon.assert.calledWith(stubEventHub.unregisterTxEvent, transactionId);
});

it('fails when event hub sends an invalid event', async () => {
const code = 'ERROR_CODE';
await handler.startListening();
Expand Down
Loading

0 comments on commit db0f4cf

Please sign in to comment.