Skip to content

Commit

Permalink
chore: improve docs
Browse files Browse the repository at this point in the history
  • Loading branch information
vasco-santos committed Jan 25, 2019
1 parent 7ca7f06 commit 326d73d
Show file tree
Hide file tree
Showing 4 changed files with 114 additions and 6 deletions.
17 changes: 13 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,9 @@ js-libp2p-pubsub

## Usage

Create your pubsub implementation extending the base protocol.
A pubsub implementation **MUST** override the `_processConnection`, `publish`, `subscribe` and `unsubscribe` functions. `add_peer` and `remove_peer` may be overwritten if the pubsub implementation needs to add custom logic when peers are added and remove. All the remaining functions **MUST NOT** be overwritten.

The following example aims to show how to create your pubsub implementation extending this base protocol. The pubsub implementation will handle the subscriptions logic.

```JavaScript
const Pubsub = require('libp2p-pubsub')
Expand All @@ -43,23 +45,30 @@ class PubsubImplementation extends Pubsub {
}

_processConnection(idB58Str, conn, peer) {
// Required to be implemented by the subclass
// Process each message accordingly
}

publish() {

// Required to be implemented by the subclass
}

subscribe() {

// Required to be implemented by the subclass
}

unsubscribe() {

// Required to be implemented by the subclass
}
}
```

## Implementations using this base protocol

You can use the following implementations as examples for building your own pubsub implementation.

- [libp2p/js-libp2p-floodsub](https://github.com/libp2p/js-libp2p-floodsub)

## Contribute

Feel free to join in. All welcome. Open an [issue](https://github.com/libp2p/js-libp2p-pubsub/issues)!
Expand Down
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
"dependencies": {
"async": "^2.6.1",
"debug": "^4.1.1",
"err-code": "^1.1.2",
"length-prefixed-stream": "^1.6.0",
"protons": "^1.0.1",
"pull-pushable": "^2.2.0"
Expand Down
90 changes: 88 additions & 2 deletions src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@ const EventEmitter = require('events')
const pull = require('pull-stream/pull')
const empty = require('pull-stream/sources/empty')
const asyncEach = require('async/each')

const debug = require('debug')
const errcode = require('err-code')

const Peer = require('./peer')
const message = require('./message')
Expand All @@ -18,7 +20,7 @@ class PubsubBaseProtocol extends EventEmitter {
/**
* @param {String} debugName
* @param {String} multicodec
* @param {Object} libp2p
* @param {Object} libp2p libp2p implementation
* @constructor
*/
constructor (debugName, multicodec, libp2p) {
Expand All @@ -44,6 +46,12 @@ class PubsubBaseProtocol extends EventEmitter {
this._dialPeer = this._dialPeer.bind(this)
}

/**
* Add a new connected peer to the peers map.
* @private
* @param {PeerInfo} peer peer info
* @returns {PeerInfo}
*/
_addPeer (peer) {
const id = peer.info.id.toB58String()

Expand All @@ -67,6 +75,12 @@ class PubsubBaseProtocol extends EventEmitter {
return existing
}

/**
* Remove a peer from the peers map if it has no references.
* @private
* @param {Peer} peer peer state
* @returns {PeerInfo}
*/
_removePeer (peer) {
const id = peer.info.id.toB58String()

Expand All @@ -80,6 +94,13 @@ class PubsubBaseProtocol extends EventEmitter {
return peer
}

/**
* Dial a received peer.
* @private
* @param {PeerInfo} peerInfo peer info
* @param {function} callback
* @returns {void}
*/
_dialPeer (peerInfo, callback) {
callback = callback || function noop () { }
const idB58Str = peerInfo.id.toB58String()
Expand Down Expand Up @@ -121,6 +142,13 @@ class PubsubBaseProtocol extends EventEmitter {
})
}

/**
* Dial a received peer.
* @private
* @param {PeerInfo} peerInfo peer info
* @param {Connection} conn connection to the peer
* @param {function} callback
*/
_onDial (peerInfo, conn, callback) {
const idB58Str = peerInfo.id.toB58String()
this.log('connected', idB58Str)
Expand All @@ -131,6 +159,12 @@ class PubsubBaseProtocol extends EventEmitter {
nextTick(() => callback())
}

/**
* On successful connection event.
* @private
* @param {String} protocol connection protocol
* @param {Connection} conn connection to the peer
*/
_onConnection (protocol, conn) {
conn.getPeerInfo((err, peerInfo) => {
if (err) {
Expand All @@ -145,10 +179,27 @@ class PubsubBaseProtocol extends EventEmitter {
})
}

/**
* Overriding the implementation of _processConnection should keep the connection and is
* responsible for processing each RPC message received by other peers.
* @abstract
* @param {string} idB58Str peer id string in base58
* @param {Connection} conn connection
* @param {PeerInfo} peer peer info
* @returns {undefined}
*
*/
_processConnection (idB58Str, conn, peer) {
throw new Error('_processConnection must be implemented by the subclass')
throw errcode('_processConnection must be implemented by the subclass', 'ERR_NOT_IMPLEMENTED')
}

/**
* On connection end event.
* @private
* @param {string} idB58Str peer id string in base58
* @param {PeerInfo} peer peer info
* @param {Error} err error for connection end
*/
_onConnectionEnd (idB58Str, peer, err) {
// socket hang up, means the one side canceled
if (err && err.message !== 'socket hang up') {
Expand All @@ -159,6 +210,41 @@ class PubsubBaseProtocol extends EventEmitter {
this._removePeer(peer)
}

/**
* Overriding the implementation of publish should handle the appropriate algorithms for the publish/subscriber implementation.
* For example, a Floodsub implementation might simply publish each message to each topic for every peer
* @abstract
* @param {Array<string>|string} topics
* @param {Array<any>|any} messages
* @returns {undefined}
*
*/
publish (topics, messages) {
throw errcode('publish must be implemented by the subclass', 'ERR_NOT_IMPLEMENTED')
}

/**
* Overriding the implementation of subscribe should handle the appropriate algorithms for the publish/subscriber implementation.
* For example, a Floodsub implementation might simply send a message for every peer showing interest in the topics
* @abstract
* @param {Array<string>|string} topics
* @returns {undefined}
*/
subscribe (topics) {
throw errcode('subscribe must be implemented by the subclass', 'ERR_NOT_IMPLEMENTED')
}

/**
* Overriding the implementation of unsubscribe should handle the appropriate algorithms for the publish/subscriber implementation.
* For example, a Floodsub implementation might simply send a message for every peer revoking interest in the topics
* @abstract
* @param {Array<string>|string} topics
* @returns {undefined}
*/
unsubscribe (topics) {
throw errcode('unsubscribe must be implemented by the subclass', 'ERR_NOT_IMPLEMENTED')
}

/**
* Mounts the pubsub protocol onto the libp2p node and sends our
* subscriptions to every peer conneceted
Expand Down
12 changes: 12 additions & 0 deletions test/pubsub.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,18 @@ class PubsubImplementation extends PubsubBaseProtocol {
super('libp2p:pubsub', 'libp2p:pubsub-implementation', libp2p)
}

publish (topics, messages) {
// ...
}

subscribe (topics) {
// ...
}

unsubscribe (topics) {
// ...
}

_processConnection (idB58Str, conn, peer) {
// ...
}
Expand Down

0 comments on commit 326d73d

Please sign in to comment.