Skip to content
This repository has been archived by the owner on Jul 21, 2023. It is now read-only.

fix: no more circular dependency, become a good block of libp2p #13

Merged
merged 5 commits into from
Jul 17, 2017
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 3 additions & 10 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
"protocol-buffers": "^3.2.1",
"pull-length-prefixed": "^1.3.0",
"pull-stream": "^3.6.0",
"safe-buffer": "^5.1.1",
"varint": "^5.0.0",
"xor-distance": "^1.0.0"
},
Expand All @@ -67,17 +68,9 @@
"datastore-level": "^0.4.2",
"dirty-chai": "^2.0.1",
"interface-connection": "^0.3.2",
"left-pad": "^1.1.3",
"libp2p": "^0.10.1",
"libp2p-mdns": "^0.7.1",
"libp2p-multiplex": "^0.4.4",
"libp2p-railing": "^0.5.2",
"libp2p-secio": "^0.6.8",
"libp2p-spdy": "^0.10.6",
"libp2p-swarm": "^0.29.2",
"libp2p-swarm": "^0.30.0",
"libp2p-tcp": "^0.10.1",
"libp2p-webrtc-star": "^0.11.0",
"libp2p-websockets": "^0.10.0",
"lodash": "^4.17.4",
"lodash.random": "^3.2.0",
"lodash.range": "^3.2.0",
Expand All @@ -90,4 +83,4 @@
"Friedel Ziegelmayer <dignifiedquire@gmail.com>",
"Pedro Teixeira <i@pgte.me>"
]
}
}
91 changes: 42 additions & 49 deletions src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ const errors = require('./errors')
const privateApi = require('./private')
const Providers = require('./providers')
const Message = require('./message')
const assert = require('assert')

/**
* A DHT implementation modeled after Kademlia with Coral and S/Kademlia modifications.
Expand All @@ -28,70 +29,74 @@ class KadDHT {
/**
* Create a new KadDHT.
*
* @param {Libp2p} libp2p
* @param {number} [kBucketSize=20]
* @param {Datastore} [datastore=MemoryDatastore]
* @param {swarm} Swarm
* @param {options} {kBucketSize=20, datastore=MemoryDatastore}
*/
constructor (libp2p, kBucketSize, datastore) {
constructor (swarm, options) {
assert(swarm, 'libp2p-kad-dht requires a instance of kad-dht')
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

don't you mean "libp2p-kad-dht requires a instance of swarm"?

options = options || {}

/**
* Local reference to libp2p.
* Local reference to libp2p-swarm.
*
* @type {Libp2p}
* @type {Swarm}
*/
this.libp2p = libp2p
this.swarm = swarm

/**
* k-bucket size, defaults to 20.
*
* @type {number}
*/
this.kBucketSize = kBucketSize || 20
this.kBucketSize = options.kBucketSize || 20

/**
* Number of closest peers to return on kBucket search
* Number of closest peers to return on kBucket search, default 6
*
* @type {number}
*/
this.ncp = 6
this.ncp = options.ncp || 6

/**
* The routing table.
*
* @type {RoutingTable}
*/
this.routingTable = new RoutingTable(this.self.id, this.kBucketSize)
this.routingTable = new RoutingTable(this.peerInfo.id, this.kBucketSize)

/**
* Reference to the datastore, uses an in-memory store if none given.
*
* @type {Datastore}
*/
this.datastore = datastore || new MemoryStore()
this.datastore = options.datastore || new MemoryStore()

/**
* Provider management
*
* @type {Providers}
*/
this.providers = new Providers(this.datastore, this.self.id)

this.validators = {
pk: libp2pRecord.validator.validators.pk
}
this.providers = new Providers(this.datastore, this.peerInfo.id)

this.selectors = {
pk: libp2pRecord.selection.selectors.pk
}
this.validators = { pk: libp2pRecord.validator.validators.pk }
this.selectors = { pk: libp2pRecord.selection.selectors.pk }

this.network = new Network(this, this.libp2p)
this.network = new Network(this)

this._log = utils.logger(this.self.id)
this._log = utils.logger(this.peerInfo.id)

// Inject private apis so we don't clutter up this file
const pa = privateApi(this)
Object.keys(pa).forEach((name) => {
this[name] = pa[name]
})
Object.keys(pa).forEach((name) => { this[name] = pa[name] })
}

/**
* Is this DHT running.
*
* @type {bool}
*/
get isStarted () {
return this._running
}

/**
Expand All @@ -118,29 +123,17 @@ class KadDHT {
this.network.stop(callback)
}

/**
* Alias to the peerbook from libp2p
*/
get peerBook () {
return this.libp2p.peerBook
}

/**
* Is this DHT running.
*
* @type {bool}
*/
get isRunning () {
return this._running
}

/**
* Local peer (yourself)
*
* @type {PeerInfo}
*/
get self () {
return this.libp2p.peerInfo
get peerInfo () {
return this.swarm._peerInfo
}

get peerBook () {
return this.swarm._peerBook
}

/**
Expand Down Expand Up @@ -205,7 +198,7 @@ class KadDHT {
}

waterfall([
(cb) => utils.createPutRecord(key, value, this.self.id, sign, cb),
(cb) => utils.createPutRecord(key, value, this.peerInfo.id, sign, cb),
(rec, cb) => waterfall([
(cb) => this._putLocal(key, rec, cb),
(cb) => this.getClosestPeers(key, cb),
Expand Down Expand Up @@ -266,7 +259,7 @@ class KadDHT {
if (err == null) {
vals.push({
val: localRec.value,
from: this.self.id
from: this.peerInfo.id
})
}

Expand Down Expand Up @@ -342,7 +335,7 @@ class KadDHT {
// local check
let info
if (this.peerBook.has(peer)) {
info = this.libp2p.peerBook.get(peer)
info = this.peerBook.get(peer)

if (info && info.id.pubKey) {
this._log('getPublicKey: found local copy')
Expand All @@ -355,7 +348,7 @@ class KadDHT {
this._getPublicKeyFromNode(peer, (err, pk) => {
if (!err) {
info.id = new PeerId(peer.id, null, pk)
this.libp2p.peerBook.put(info)
this.peerBook.put(info)

return callback(null, pk)
}
Expand All @@ -369,7 +362,7 @@ class KadDHT {

const pk = crypto.unmarshalPublicKey(value)
info.id = new PeerId(peer, null, pk)
this.libp2p.peerBook.put(info)
this.peerBook.put(info)

callback(null, pk)
})
Expand All @@ -389,7 +382,7 @@ class KadDHT {
this._log('provide: %s', key.toBaseEncodedString())

waterfall([
(cb) => this.providers.addProvider(key, this.self.id, cb),
(cb) => this.providers.addProvider(key, this.peerInfo.id, cb),
(cb) => this.getClosestPeers(key.buffer, cb),
(peers, cb) => {
const msg = new Message(Message.TYPES.ADD_PROVIDER, key.buffer, 0)
Expand Down
44 changes: 22 additions & 22 deletions src/network.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,15 @@ class Network {
/**
* Create a new network.
*
* @param {DHT} dht
* @param {Libp2p} libp2p
* @param {KadDHT} self
*/
constructor (dht, libp2p) {
this.dht = dht
this.libp2p = libp2p
constructor (self) {
this.dht = self
this.readMessageTimeout = c.READ_MESSAGE_TIMEOUT
this._log = utils.logger(this.dht.self.id, 'net')
this._log = utils.logger(this.dht.peerInfo.id, 'net')
this._rpc = rpc(this.dht)
this._onPeerConnected = this._onPeerConnected.bind(this)
this._online = false
this._running = false
}

/**
Expand All @@ -43,17 +41,18 @@ class Network {
return cb(new Error('Network is already running'))
}

if (!this.dht.isRunning || !this.dht.libp2p.isStarted()) {
// TODO add a way to check if swarm has started or not
if (!this.dht.isStarted) {
return cb(new Error('Can not start network'))
}

this._online = true
this._running = true

// handle incoming connections
this.libp2p.swarm.handle(c.PROTOCOL_DHT, this._rpc)
this.dht.swarm.handle(c.PROTOCOL_DHT, this._rpc)

// handle new connections
this.libp2p.on('peer:connect', this._onPeerConnected)
this.dht.swarm.on('peer-mux-established', this._onPeerConnected)

cb()
}
Expand All @@ -67,13 +66,13 @@ class Network {
stop (callback) {
const cb = (err) => setImmediate(() => callback(err))

if (!this.isOnline) {
if (!this.dht.isStarted && !this.isStarted) {
return cb(new Error('Network is already stopped'))
}
this._online = false
this.libp2p.removeListener('peer:connect', this._onPeerConnected)
this._running = false
this.dht.swarm.removeListener('peer-mux-established', this._onPeerConnected)

this.libp2p.swarm.unhandle(c.PROTOCOL_DHT)
this.dht.swarm.unhandle(c.PROTOCOL_DHT)
cb()
}

Expand All @@ -82,8 +81,8 @@ class Network {
*
* @type {bool}
*/
get isOnline () {
return this._online
get isStarted () {
return this._running
}

/**
Expand All @@ -92,7 +91,8 @@ class Network {
* @type {bool}
*/
get isConnected () {
return this.dht.libp2p.isStarted() && this.dht.isRunning && this.isOnline
// TODO add a way to check if swarm has started or not
return this.dht.isStarted && this.isStarted
}

/**
Expand All @@ -107,7 +107,7 @@ class Network {
return this._log.error('Network is offline')
}

this.libp2p.dial(peer, c.PROTOCOL_DHT, (err, conn) => {
this.dht.swarm.dial(peer, c.PROTOCOL_DHT, (err, conn) => {
if (err) {
return this._log('%s does not support protocol: %s', peer.id.toB58String(), c.PROTOCOL_DHT)
}
Expand Down Expand Up @@ -140,7 +140,7 @@ class Network {
}

this._log('sending to: %s', to.toB58String())
this.dht.libp2p.dial(to, c.PROTOCOL_DHT, (err, conn) => {
this.dht.swarm.dial(to, c.PROTOCOL_DHT, (err, conn) => {
if (err) {
return callback(err)
}
Expand All @@ -159,12 +159,12 @@ class Network {
*/
sendMessage (to, msg, callback) {
if (!this.isConnected) {
return callback(new Error('Network is offline'))
return setImmediate(() => callback(new Error('Network is offline')))
}

this._log('sending to: %s', to.toB58String())

this.dht.libp2p.dial(to, c.PROTOCOL_DHT, (err, conn) => {
this.dht.swarm.dial(to, c.PROTOCOL_DHT, (err, conn) => {
if (err) {
return callback(err)
}
Expand Down
8 changes: 4 additions & 4 deletions src/private.js
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ module.exports = (dht) => ({
// 5. check validity

// 5. if: we are the author, all good
if (record.author.isEqual(dht.self.id)) {
if (record.author.isEqual(dht.peerInfo.id)) {
return callback(null, record)
}

Expand Down Expand Up @@ -226,7 +226,7 @@ module.exports = (dht) => ({
* @private
*/
_isSelf (other) {
return other && dht.self.id.id.equals(other.id)
return other && dht.peerInfo.id.id.equals(other.id)
},
/**
* Ask peer `peer` if they know where the peer with id `target` is.
Expand Down Expand Up @@ -308,7 +308,7 @@ module.exports = (dht) => ({

// Send out correction record
waterfall([
(cb) => utils.createPutRecord(key, best, dht.self.id, true, cb),
(cb) => utils.createPutRecord(key, best, dht.peerInfo.id, true, cb),
(fixupRec, cb) => each(vals, (v, cb) => {
// no need to do anything
if (v.val.equals(best)) {
Expand Down Expand Up @@ -523,7 +523,7 @@ module.exports = (dht) => ({
(cb) => dht._findProvidersSingle(peer, key, cb),
(msg, cb) => {
const provs = msg.providerPeers
dht._log('(%s) found %s provider entries', dht.self.id.toB58String(), provs.length)
dht._log('(%s) found %s provider entries', dht.peerInfo.id.toB58String(), provs.length)

provs.forEach((prov) => {
out.push(dht.peerBook.put(prov))
Expand Down
2 changes: 1 addition & 1 deletion src/query.js
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ class Query {
this.key = key
this.query = query
this.concurrency = c.ALPHA
this._log = utils.logger(this.dht.self.id, 'query:' + key.toString())
this._log = utils.logger(this.dht.peerInfo.id, 'query:' + key.toString())
}

/**
Expand Down
Loading