Skip to content

Commit

Permalink
chore: remove peer-info usage
Browse files Browse the repository at this point in the history
BREAKING CHANGE: using new topology api with peer-id instead of peer-info and new pubsub internal peer data structure
  • Loading branch information
vasco-santos committed Apr 23, 2020
1 parent f2b67a2 commit 602ccaa
Show file tree
Hide file tree
Showing 15 changed files with 121 additions and 127 deletions.
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ const registrar = {
}
}

const gsub = new Gossipsub(peerInfo, registrar, options)
const gsub = new Gossipsub(peerId, registrar, options)

await gsub.start()

Expand All @@ -62,7 +62,7 @@ gsub.publish('fruit', new Buffer('banana'))

```js
const options = {…}
const gossipsub = new Gossipsub(peerInfo, registrar, options)
const gossipsub = new Gossipsub(peerId, registrar, options)
```

Options is an optional object with the following key-value pairs:
Expand Down
5 changes: 2 additions & 3 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,9 @@
"err-code": "^2.0.0",
"it-length-prefixed": "^3.0.0",
"it-pipe": "^1.0.1",
"libp2p-pubsub": "~0.4.1",
"libp2p-pubsub": "^0.5.0",
"p-map": "^3.0.0",
"peer-id": "~0.13.3",
"peer-info": "~0.17.0",
"protons": "^1.0.1",
"time-cache": "^0.3.0"
},
Expand All @@ -55,7 +54,7 @@
"detect-node": "^2.0.4",
"dirty-chai": "^2.0.1",
"it-pair": "^1.0.0",
"libp2p-floodsub": "^0.20.0",
"libp2p-floodsub": "^0.21.0",
"lodash": "^4.17.15",
"mocha": "^6.2.1",
"p-times": "^2.1.0",
Expand Down
4 changes: 2 additions & 2 deletions src/heartbeat.js
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ class Heartbeat {
return
}

this.gossipsub.log('HEARTBEAT: Add mesh link to %s in %s', peer.info.id.toB58String(), topic)
this.gossipsub.log('HEARTBEAT: Add mesh link to %s in %s', peer.id.toB58String(), topic)
peers.add(peer)
const peerGrafts = tograft.get(peer)
if (!peerGrafts) {
Expand All @@ -102,7 +102,7 @@ class Heartbeat {
peersArray = peersArray.slice(0, idontneed)

peersArray.forEach((peer) => {
this.gossipsub.log('HEARTBEAT: Remove mesh link to %s in %s', peer.info.id.toB58String(), topic)
this.gossipsub.log('HEARTBEAT: Remove mesh link to %s in %s', peer.id.toB58String(), topic)
peers.delete(peer)
const peerPrunes = toprune.get(peer)
if (!peerPrunes) {
Expand Down
4 changes: 2 additions & 2 deletions src/index.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

/// <reference types="node"/>

import PeerInfo = require('peer-info');
import PeerId = require('peer-id');

export interface Registrar {
handle: Function;
Expand All @@ -29,7 +29,7 @@ import * as Events from "events";
interface GossipSub extends Events.EventEmitter {}

declare class GossipSub {
constructor(peerInfo: PeerInfo, registrar: Registrar, options: Options);
constructor(peerId: PeerId, registrar: Registrar, options: Options);
publish(topic: string, data: Buffer): Promise<void>;
start(): Promise<void>;
stop(): Promise<void>;
Expand Down
42 changes: 21 additions & 21 deletions src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

const { utils } = require('libp2p-pubsub')

const PeerInfo = require('peer-info')
const PeerId = require('peer-id')

const BasicPubsub = require('./pubsub')
const { MessageCache } = require('./messageCache')
Expand All @@ -13,7 +13,7 @@ const Heartbeat = require('./heartbeat')

class GossipSub extends BasicPubsub {
/**
* @param {PeerInfo} peerInfo instance of the peer's PeerInfo
* @param {PeerId} peerId instance of the peer's PeerId
* @param {Object} registrar
* @param {function} registrar.handle
* @param {function} registrar.register
Expand All @@ -26,15 +26,15 @@ class GossipSub extends BasicPubsub {
* @param {Object} [options.messageCache] override the default MessageCache
* @constructor
*/
constructor (peerInfo, registrar, options = {}) {
if (!PeerInfo.isPeerInfo(peerInfo)) {
throw new Error('peer info must be an instance of `peer-info`')
constructor (peerId, registrar, options = {}) {
if (!PeerId.isPeerId(peerId)) {
throw new Error('peerId must be an instance of `peer-id`')
}

super({
debugName: 'libp2p:gossipsub',
multicodec: constants.GossipSubID,
peerInfo,
peerId,
registrar,
options
})
Expand Down Expand Up @@ -95,7 +95,7 @@ class GossipSub extends BasicPubsub {
* Removes a peer from the router
* @override
* @param {Peer} peer
* @returns {PeerInfo}
* @returns {Peer}
*/
_removePeer (peer) {
super._removePeer(peer)
Expand Down Expand Up @@ -162,13 +162,13 @@ class GossipSub extends BasicPubsub {

// Emit to floodsub peers
this.peers.forEach((peer) => {
if (peer.info.protocols.has(constants.FloodSubID) &&
peer.info.id.toB58String() !== msg.from &&
if (peer.protocols.includes(constants.FloodSubID) &&
peer.id.toB58String() !== msg.from &&
utils.anyMatch(peer.topics, topics) &&
peer.isWritable
) {
peer.sendMessages(utils.normalizeOutRpcMessages([msg]))
this.log('publish msg on topics - floodsub', topics, peer.info.id.toB58String())
this.log('publish msg on topics - floodsub', topics, peer.id.toB58String())
}
})

Expand All @@ -178,11 +178,11 @@ class GossipSub extends BasicPubsub {
return
}
this.mesh.get(topic).forEach((peer) => {
if (!peer.isWritable || peer.info.id.toB58String() === msg.from) {
if (!peer.isWritable || peer.id.toB58String() === msg.from) {
return
}
peer.sendMessages(utils.normalizeOutRpcMessages([msg]))
this.log('publish msg on topic - meshsub', topic, peer.info.id.toB58String())
this.log('publish msg on topic - meshsub', topic, peer.id.toB58String())
})
})
}
Expand Down Expand Up @@ -213,7 +213,7 @@ class GossipSub extends BasicPubsub {
return
}

this.log('IHAVE: Asking for %d messages from %s', iwant.size, peer.info.id.toB58String())
this.log('IHAVE: Asking for %d messages from %s', iwant.size, peer.id.toB58String())

return {
messageIDs: Array.from(iwant)
Expand Down Expand Up @@ -244,7 +244,7 @@ class GossipSub extends BasicPubsub {
return
}

this.log('IWANT: Sending %d messages to %s', ihave.size, peer.info.id.toB58String())
this.log('IWANT: Sending %d messages to %s', ihave.size, peer.id.toB58String())

return Array.from(ihave.values())
}
Expand All @@ -263,7 +263,7 @@ class GossipSub extends BasicPubsub {
if (!peers) {
prune.push(topicID)
} else {
this.log('GRAFT: Add mesh link from %s in %s', peer.info.id.toB58String(), topicID)
this.log('GRAFT: Add mesh link from %s in %s', peer.id.toB58String(), topicID)
peers.add(peer)
peer.topics.add(topicID)
this.mesh.set(topicID, peers)
Expand Down Expand Up @@ -293,7 +293,7 @@ class GossipSub extends BasicPubsub {
prune.forEach(({ topicID }) => {
const peers = this.mesh.get(topicID)
if (peers) {
this.log('PRUNE: Remove mesh link to %s in %s', peer.info.id.toB58String(), topicID)
this.log('PRUNE: Remove mesh link to %s in %s', peer.id.toB58String(), topicID)
peers.delete(peer)
peer.topics.delete(topicID)
}
Expand Down Expand Up @@ -352,7 +352,7 @@ class GossipSub extends BasicPubsub {
this.mesh.set(topic, peers)
}
this.mesh.get(topic).forEach((peer) => {
this.log('JOIN: Add mesh link to %s in %s', peer.info.id.toB58String(), topic)
this.log('JOIN: Add mesh link to %s in %s', peer.id.toB58String(), topic)
this._sendGraft(peer, topic)
})
})
Expand All @@ -373,7 +373,7 @@ class GossipSub extends BasicPubsub {
const meshPeers = this.mesh.get(topic)
if (meshPeers) {
meshPeers.forEach((peer) => {
this.log('LEAVE: Remove mesh link to %s in %s', peer.info.id.toB58String(), topic)
this.log('LEAVE: Remove mesh link to %s in %s', peer.id.toB58String(), topic)
this._sendPrune(peer, topic)
})
this.mesh.delete(topic)
Expand Down Expand Up @@ -405,7 +405,7 @@ class GossipSub extends BasicPubsub {

// floodsub peers
peersInTopic.forEach((peer) => {
if (peer.info.protocols.has(constants.FloodSubID)) {
if (peer.protocols.includes(constants.FloodSubID)) {
tosend.add(peer)
}
})
Expand Down Expand Up @@ -436,7 +436,7 @@ class GossipSub extends BasicPubsub {
})
// Publish messages to peers
tosend.forEach((peer) => {
if (peer.info.id.toB58String() === msgObj.from) {
if (peer.id.toB58String() === msgObj.from) {
return
}
this._sendRpc(peer, { msgs: [msgObj] })
Expand Down Expand Up @@ -591,7 +591,7 @@ class GossipSub extends BasicPubsub {
* @returns {void}
*/
_pushGossip (peer, controlIHaveMsgs) {
this.log('Add gossip to %s', peer.info.id.toB58String())
this.log('Add gossip to %s', peer.id.toB58String())
const gossip = this.gossip.get(peer) || []
this.gossip.set(peer, gossip.concat(controlIHaveMsgs))
}
Expand Down
30 changes: 15 additions & 15 deletions src/pubsub.js
Original file line number Diff line number Diff line change
Expand Up @@ -20,18 +20,18 @@ class BasicPubSub extends Pubsub {
* @param {Object} props
* @param {String} props.debugName log namespace
* @param {string} props.multicodec protocol identificer to connect
* @param {PeerInfo} props.peerInfo peer's peerInfo
* @param {PeerId} props.peerId peer's peerId
* @param {Object} props.registrar registrar for libp2p protocols
* @param {function} props.registrar.handle
* @param {function} props.registrar.register
* @param {function} props.registrar.unregister
* @param {Object} [props.options]
* @param {bool} [props.options.emitSelf] if publish should emit to self, if subscribed, defaults to false
* @param {bool} [props.options.gossipIncoming] if incoming messages on a subscribed topic should be automatically gossiped, defaults to true
* @param {bool} [props.options.fallbackToFloodsub] if dial should fallback to floodsub, defaults to true
* @param {boolean} [props.options.emitSelf] if publish should emit to self, if subscribed, defaults to false
* @param {boolean} [props.options.gossipIncoming] if incoming messages on a subscribed topic should be automatically gossiped, defaults to true
* @param {boolean} [props.options.fallbackToFloodsub] if dial should fallback to floodsub, defaults to true
* @constructor
*/
constructor ({ debugName, multicodec, peerInfo, registrar, options = {} }) {
constructor ({ debugName, multicodec, peerId, registrar, options = {} }) {
const multicodecs = [multicodec]
const _options = {
emitSelf: false,
Expand All @@ -48,7 +48,7 @@ class BasicPubSub extends Pubsub {
super({
debugName,
multicodecs,
peerInfo,
peerId,
registrar,
..._options
})
Expand Down Expand Up @@ -83,13 +83,13 @@ class BasicPubSub extends Pubsub {
/**
* Peer connected successfully with pubsub protocol.
* @override
* @param {PeerInfo} peerInfo peer info
* @param {PeerId} peerId peer id
* @param {Connection} conn connection to the peer
* @returns {Promise<void>}
*/
async _onPeerConnected (peerInfo, conn) {
await super._onPeerConnected(peerInfo, conn)
const idB58Str = peerInfo.id.toB58String()
async _onPeerConnected (peerId, conn) {
await super._onPeerConnected(peerId, conn)
const idB58Str = peerId.toB58String()
const peer = this.peers.get(idB58Str)

if (peer && peer.isWritable) {
Expand Down Expand Up @@ -123,7 +123,7 @@ class BasicPubSub extends Pubsub {
}
)
} catch (err) {
this._onPeerDisconnected(peer.info, err)
this._onPeerDisconnected(peer.id, err)
}
}

Expand Down Expand Up @@ -167,7 +167,7 @@ class BasicPubSub extends Pubsub {
topicSet.delete(peer)
}
})
this.emit('pubsub:subscription-change', peer.info, peer.topics, subs)
this.emit('pubsub:subscription-change', peer.id, peer.topics, subs)
}

if (msgs.length) {
Expand Down Expand Up @@ -207,7 +207,7 @@ class BasicPubSub extends Pubsub {
* @param {rpc.RPC.Message} msg
*/
_processRpcMessage (msg) {
if (this.peerInfo.id.toB58String() === msg.from && !this._options.emitSelf) {
if (this.peerId.toB58String() === msg.from && !this._options.emitSelf) {
return
}

Expand Down Expand Up @@ -369,7 +369,7 @@ class BasicPubSub extends Pubsub {
topics = utils.ensureArray(topics)
messages = utils.ensureArray(messages)

const from = this.peerInfo.id.toB58String()
const from = this.peerId.toB58String()

const buildMessage = (msg, cb) => {
const seqno = utils.randomSeqno()
Expand Down Expand Up @@ -448,7 +448,7 @@ class BasicPubSub extends Pubsub {
// Adds all peers using our protocol
let peers = []
peersInTopic.forEach((peer) => {
if (peer.info.protocols.has(GossipSubID)) {
if (peer.protocols.includes(GossipSubID)) {
peers.push(peer)
}
})
Expand Down
Loading

0 comments on commit 602ccaa

Please sign in to comment.