Skip to content

Commit

Permalink
[FABN-1100] Event handling abstractions
Browse files Browse the repository at this point in the history
- Handle commit, contract and block events
- Checkpoint each event listener to allow easy event catch up
- New event hub manager to centralise all event hubs used by fabric-network
- Automatic reconnect to an event hub when a registered listener loses connection

Change-Id: I93f3df089e5393fbaf90592528463f1ee5d79b1d
Signed-off-by: Liam Grace <liamgrace.896@gmail.com>
  • Loading branch information
liam-grace committed Mar 27, 2019
1 parent f20b918 commit 404adbe
Show file tree
Hide file tree
Showing 62 changed files with 3,835 additions and 137 deletions.
75 changes: 75 additions & 0 deletions docs/tutorials/event-checkpointer.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
This tutorial describes the approaches that can be selected by users of the fabric-network module for replaying missed events emitted by peers.

### Overview

Events are emitted by peers when blocks are committed. Two types of events support checkpointing:
1. Contract events (also known as chaincode events) - Defined in transactions to be emitted. E.g. an event emitted when a commercial paper is sold
2. Block Events - Emitted when a block is committed

In the case of an application crashing and events being missed, applications may still want to execute the event callback for the event it missed. Peers in a Fabric network support event replay, and to support this, the fabric-network module supports checkpointing strategies that track the last block and transactions in that block, that have been seen by the client.

### Checkpointers

The `BaseCheckpoint` class is an interface that is to be used by all Checkpoint classes. fabric-network has one default class, `FileSystemCheckpointer` that is exported as a factory in the `CheckpointFactories`. The `FILE_SYSTEM_CHECKPOINTER` is the default checkpointer.

A checkpoint factory is a function that returns an instance with `BaseCheckpointer` as a parent class. These classes implement the `async save(channelName, listenerName)` and `async load()` functions.

A checkpointer is called each time the event callback is triggered.

The checkpointer can be set when connecting to a gateway or when creating the event listener.
```javascript
const { Gateway, CheckpointFactories } = require('fabric-network');

const connectOptions = {
checkpointer: {
factory: CheckpointFactories.FILE_SYSTEM_CHECKPOINTER,
options: {} // Options usable by the factory
}
};

const gateway = new Gateway()
await gateway.connect(connectionProfile, connectOptions);
```

Configuring a listener to be checkpointed required two properties:
1. `replay : boolean` - Tells the listener to record a checkpoint. Required if checkpointing is desired
2. `checkpointer : BaseCheckpointer` - If a checkpointer is not specified in the gateway, it must be specified here
```javascript
const listener = await contract.addContractListener('saleEventListener', 'sale', (err, event, blockNumber, txId) => {
if (err) {
console.error(err);
return;
}
// -- Do something
}, {replay: true, checkpointer: {factory: MyCheckpointer});
```
### Custom Checkpointer
Users can configure their own checkpointer. This requires two components to be created:
1. The Checkpointer class
2. The Factory
```javascript
class DbCheckpointer extends BaseCheckpointer {
constructor(channelName, listenerName, dbOptions) {
super(channelName, listenerName);
this.db = new Db(dbOptions);
}

async save(transactionId, blockNumber) { /* Your implementation using a database */ }

async load() { /* Your implementation using a database*/ }
}

function BD_CHECKPOINTER_FACTORY(channelName, listenerName, options) {
return new DbCheckpointer(channelName, listenerName, options);
}

const gateway = new Gateway();
await gateway.connect({
checkpointer: {
factory: DB_CHECKPOINTER_FACTORY,
options: {host: 'http://localhost'}
});
```
43 changes: 43 additions & 0 deletions docs/tutorials/event-hub-management.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
This tutorial describes how to define the behavior of the event hub selection strategy used when event hubs disconnect or new event hubs are required.

The `ChannelEventHub` is a fabric-client class that receives contract, commit and block events from the event hub within a peer. The `fabric-network` abstracts the event hub away, and instead uses an event hub selection strategy to create new event hub instances or reuse existing instances.

The interface for an event hub selection strategy is as follows:

```javascript
class BaseEventHubSelectionStrategy {
/**
* Returns the next peer in the list per the strategy implementation
* @returns {ChannelPeer}
*/
getNextPeer() {
// Peer selection logic. Called whenever an event hub is required
}

/**
* Updates the status of a peers event hub
* @param {ChannelPeer} deadPeer The peer that needs its status updating
*/
updateEventHubAvailability(deadPeer) {
// Peer availability update logic. Called whenever the event hub disconnects.
}
}
```

The event hub strategy exists at a gateway level, and is included in the `GatewayOptions` in the form of a factory function. The factory gives the event hub selection strategy instance a list of peers that it can select event hubs from.

```javascript
function EXAMPLE_EVENT_HUB_SELECTION_FACTORY(network) {
const orgPeers = getOrganizationPeers(network);
const eventEmittingPeers = filterEventEmittingPeers(orgPeers);
return new ExampleEventHubSelectionStrategy(eventEmittingPeers);
}

const gateway = new Gateway();
await gateway.connect(connectionProfile, {
...
eventHubSelectionOptions: {
strategy: EXAMPLE_EVENT_HUB_SELECTION_FACTORY
}
})
```
111 changes: 111 additions & 0 deletions docs/tutorials/listening-to-events.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
This tutorial describes the different ways to listen to events emitted by a network using the fabric-network module.

### Overview

There are three event types that can be subscribed to:
1. Contract events - Those emitted explicitly by the chaincode developer within a transaction
2. Transaction (Commit) events - Those emitted automatically when a transaction is committed after an invoke
3. Block events - Those emitted automatically when a block is committed

Listening for these events allows the application to react without directly calling a transaction. This is ideal in use cases such as tracking network analytics.

### Usage

Each listener type takes at least one parameter, the event callback. This is the function that is called when an event is detected. This callback is overridden by the `fabric-network` in order to support `Checkpointing`.

#### Contract events

```javascript
const gateway = new Gateway();
await gateway.connect(connectionProfile, gatewayOptions);
const network = await gateway.getNetwork('my-channel');
const contract = network.getContract('my-contract');

/**
* @param {String} listenerName the name of the event listener
* @param {String} eventName the name of the event being listened to
* @param {Function} callback the callback function with signature (error, event, blockNumber, transactionId, status)
* @param {Object} options
**/
const listener = await contract.addContractListener('my-contract-listener', 'sale', (error, event, blockNumber, transactionId, status) => {
if (err) {
console.error(err);
return;
}
console.log(`Block Number: ${blockNumber} Transaction ID: ${transactionId} Status: ${status}`);
})
```
Notice that there is no need to specify an event hub, as the `EventHubSelectionStrategy` will select it automatically.

#### Block events

```javascript
const gateway = new Gateway();
await gateway.connect(connectionProfile, gatewayOptions);
const network = await gateway.getNetwork('my-channel');

/**
* @param {String} listenerName the name of the event listener
* @param {Function} callback the callback function with signature (error, blockNumber, transactionId, status)
* @param {Object} options
**/
const listener = await network.addBlockListener('my-block-listener', (error, block) => {
if (err) {
console.error(err);
return;
}
console.log(`Block: ${block}`);
}, {filtered: true /*false*/})
```
When listening for block events, it is important to specify if you want a filtered or none filtered event, as this determines which event hub is compatible with the request.

#### Commit events

Option 1:
```javascript
const gateway = new Gateway();
await gateway.connect(connectionProfile, gatewayOptions);
const network = await gateway.getNetwork('my-channel');
const contract = network.getContract('my-contract');

const transaction = contract.newTransaction('sell');
/**
* @param {String} transactionId the name of the event listener
* @param {Function} callback the callback function with signature (error, transactionId, status, blockNumber)
* @param {Object} options
**/
const listener = await network.addCommitListener(transaction.getTransactionID().getTransactionID(), (error, transactionId, status, blockNumber) => {
if (err) {
console.error(err);
return;
}
console.log(`Transaction ID: ${transactionId} Status: ${status} Block number: ${blockNumber}`);
}, {});
```

Option 2:
```javascript
const gateway = new Gateway();
await gateway.connect(connectionProfile, gatewayOptions);
const network = await gateway.getNetwork('my-channel');
const contract = network.getContract('my-contract');

const transaction = contract.newTransaction('sell');
/**
* @param {String} transactionId the name of the event listener
* @param {Function} callback the callback function with signature (error, transactionId, status, blockNumber)
* @param {Object} options
**/
const listener = await transaction.addCommitListener((error, transactionId, status, blockNumber) => {
if (err) {
console.error(err);
return;
}
console.log(`Transaction ID: ${transactionId} Status: ${status} Block number: ${blockNumber}`);
});
```





4 changes: 2 additions & 2 deletions docs/tutorials/transaction-commit-events.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
This tutorial describes the approaches that can be selected by users of the
fabric-network module for ensuring that submitted transactions are commited
fabric-network module for ensuring that submitted transactions are committed
on peers.

### Overview
Expand Down Expand Up @@ -61,7 +61,7 @@ strategies, it is possible to implement your own event handling. This is
achieved by specifying your own factory function as the event handling
strategy. The factory function should return a *transaction event handler*
object and take two parameters:
1. Transaction ID: `String`
1. Transaction: `Transaction`
2. Blockchain network: `Network`

The Network provides access to peers and event hubs from which events should
Expand Down
9 changes: 9 additions & 0 deletions docs/tutorials/tutorials.json
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,15 @@
"query-peers": {
"title": "fabric-network: How to select peers for evaluating transactions (queries)"
},
"event-checkpointer": {
"title": "fabric-network: How to replay missed events"
},
"event-hub-management": {
"title": "fabric-network: How to automatically select and reconnect to event hubs"
},
"listening-to-events": {
"title": "fabric-network: How to listen to events"
},
"grpc-settings": {
"title": "fabric-client: How to set gRPC settings"
},
Expand Down
17 changes: 11 additions & 6 deletions fabric-client/lib/ChannelEventHub.js
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ const logger = utils.getLogger('ChannelEventHub.js');
const {Identity} = require('./msp/identity');
const TransactionID = require('./TransactionID');
const util = require('util');
const EventHubDisconnectError = require('./errors/EventHubDisconnectError');

const BlockDecoder = require('./BlockDecoder.js');

Expand Down Expand Up @@ -588,7 +589,7 @@ class ChannelEventHub {

/**
* Disconnects the ChannelEventHub from the peer event source.
* Will close all event listeners and send an Error object
* Will close all event listeners and send an EventHubDisconnectError object
* with the message "ChannelEventHub has been shutdown" to
* all listeners that provided an "onError" callback.
*/
Expand All @@ -597,14 +598,14 @@ class ChannelEventHub {
logger.debug('disconnect - disconnect is running');
} else {
this._disconnect_running = true;
this._disconnect(new Error('ChannelEventHub has been shutdown'));
this._disconnect(new EventHubDisconnectError('ChannelEventHub has been shutdown'));
this._disconnect_running = false;
}
}

/**
* Disconnects the ChannelEventHub from the fabric peer service.
* Will close all event listeners and send an Error object
* Will close all event listeners and send an EventHubDisconnectError object
* with the message "ChannelEventHub has been shutdown" to
* all listeners that provided an "onError" callback.
*/
Expand Down Expand Up @@ -1387,6 +1388,10 @@ class ChannelEventHub {
}
}

isFiltered() {
return !!this._filtered_stream;
}

/*
* private internal method for processing block events
* @param {Object} block protobuf object
Expand Down Expand Up @@ -1462,7 +1467,7 @@ class ChannelEventHub {
}
if (trans_reg.disconnect) {
logger.debug('_callTransactionListener - automatically disconnect');
this._disconnect(new Error('Shutdown due to disconnect on transaction id registration'));
this._disconnect(new EventHubDisconnectError('Shutdown due to disconnect on transaction id registration'));
}
}

Expand Down Expand Up @@ -1556,7 +1561,7 @@ class ChannelEventHub {
logger.debug('_callChaincodeListener - automatically unregister tx listener for %s', tx_id);
}
if (chaincode_reg.event_reg.disconnect) {
this._disconnect(new Error('Shutdown due to disconnect on transaction id registration'));
this._disconnect(new EventHubDisconnectError('Shutdown due to disconnect on transaction id registration'));
}
} else {
logger.debug('_callChaincodeListener - NOT calling chaincode listener callback');
Expand All @@ -1577,7 +1582,7 @@ class ChannelEventHub {
this._start_stop_registration.unregister_action();
}
if (this._start_stop_registration.disconnect) {
this._disconnect(new Error('Shutdown due to end block number has been seen'));
this._disconnect(new EventHubDisconnectError('Shutdown due to end block number has been seen'));
}
}
}
Expand Down
22 changes: 22 additions & 0 deletions fabric-client/lib/errors/EventHubDisconnectError.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/*
Copyright 2019, 2018 IBM All Rights Reserved.
SPDX-License-Identifier: Apache-2.0
*/

'use strict';

/**
* Error when an event hub is disconnected.
* @interface
* @memberof module:fabric-network
* @property {String} [message] The error message
*/
class EventHubDisconnectError extends Error {
constructor(message) {
super(message);
}
}

module.exports = EventHubDisconnectError;
2 changes: 2 additions & 0 deletions fabric-network/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -60,4 +60,6 @@ module.exports.FileSystemWallet = require('./lib/impl/wallet/filesystemwallet');
module.exports.CouchDBWallet = require('./lib/impl/wallet/couchdbwallet');
module.exports.DefaultEventHandlerStrategies = require('fabric-network/lib/impl/event/defaulteventhandlerstrategies');
module.exports.DefaultQueryHandlerStrategies = require('fabric-network/lib/impl/query/defaultqueryhandlerstrategies');
module.exports.CheckpointFactories = require('fabric-network/lib/impl/event/checkpointfactories');
module.exports.EventHubSelectionStrategies = require('fabric-network/lib/impl/event/defaulteventhubselectionstrategies');
module.exports.TimeoutError = require('fabric-network/lib/errors/timeouterror');
Loading

0 comments on commit 404adbe

Please sign in to comment.