Skip to content
This repository has been archived by the owner on Jul 21, 2023. It is now read-only.

Commit

Permalink
fix: no more circular dependency, become a good block of libp2p (#13)
Browse files Browse the repository at this point in the history
* fix: no more circular dependency, become a good block of libp2p

* there were more tests :) fix remaining things, test with new swarm ✔️

* apply cr

* fix docs

* apply cr
  • Loading branch information
daviddias authored Jul 17, 2017
1 parent 9c2e022 commit 810be4d
Show file tree
Hide file tree
Showing 30 changed files with 329 additions and 462 deletions.
13 changes: 3 additions & 10 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
"protocol-buffers": "^3.2.1",
"pull-length-prefixed": "^1.3.0",
"pull-stream": "^3.6.0",
"safe-buffer": "^5.1.1",
"varint": "^5.0.0",
"xor-distance": "^1.0.0"
},
Expand All @@ -67,17 +68,9 @@
"datastore-level": "^0.4.2",
"dirty-chai": "^2.0.1",
"interface-connection": "^0.3.2",
"left-pad": "^1.1.3",
"libp2p": "^0.10.1",
"libp2p-mdns": "^0.7.1",
"libp2p-multiplex": "^0.4.4",
"libp2p-railing": "^0.5.2",
"libp2p-secio": "^0.6.8",
"libp2p-spdy": "^0.10.6",
"libp2p-swarm": "^0.29.2",
"libp2p-swarm": "^0.30.0",
"libp2p-tcp": "^0.10.1",
"libp2p-webrtc-star": "^0.11.0",
"libp2p-websockets": "^0.10.0",
"lodash": "^4.17.4",
"lodash.random": "^3.2.0",
"lodash.range": "^3.2.0",
Expand All @@ -90,4 +83,4 @@
"Friedel Ziegelmayer <dignifiedquire@gmail.com>",
"Pedro Teixeira <i@pgte.me>"
]
}
}
91 changes: 42 additions & 49 deletions src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ const errors = require('./errors')
const privateApi = require('./private')
const Providers = require('./providers')
const Message = require('./message')
const assert = require('assert')

/**
* A DHT implementation modeled after Kademlia with Coral and S/Kademlia modifications.
Expand All @@ -28,70 +29,74 @@ class KadDHT {
/**
* Create a new KadDHT.
*
* @param {Libp2p} libp2p
* @param {number} [kBucketSize=20]
* @param {Datastore} [datastore=MemoryDatastore]
* @param {swarm} Swarm
* @param {options} {kBucketSize=20, datastore=MemoryDatastore}
*/
constructor (libp2p, kBucketSize, datastore) {
constructor (swarm, options) {
assert(swarm, 'libp2p-kad-dht requires a instance of swarmt a')
options = options || {}

/**
* Local reference to libp2p.
* Local reference to libp2p-swarm.
*
* @type {Libp2p}
* @type {Swarm}
*/
this.libp2p = libp2p
this.swarm = swarm

/**
* k-bucket size, defaults to 20.
*
* @type {number}
*/
this.kBucketSize = kBucketSize || 20
this.kBucketSize = options.kBucketSize || 20

/**
* Number of closest peers to return on kBucket search
* Number of closest peers to return on kBucket search, default 6
*
* @type {number}
*/
this.ncp = 6
this.ncp = options.ncp || 6

/**
* The routing table.
*
* @type {RoutingTable}
*/
this.routingTable = new RoutingTable(this.self.id, this.kBucketSize)
this.routingTable = new RoutingTable(this.peerInfo.id, this.kBucketSize)

/**
* Reference to the datastore, uses an in-memory store if none given.
*
* @type {Datastore}
*/
this.datastore = datastore || new MemoryStore()
this.datastore = options.datastore || new MemoryStore()

/**
* Provider management
*
* @type {Providers}
*/
this.providers = new Providers(this.datastore, this.self.id)

this.validators = {
pk: libp2pRecord.validator.validators.pk
}
this.providers = new Providers(this.datastore, this.peerInfo.id)

this.selectors = {
pk: libp2pRecord.selection.selectors.pk
}
this.validators = { pk: libp2pRecord.validator.validators.pk }
this.selectors = { pk: libp2pRecord.selection.selectors.pk }

this.network = new Network(this, this.libp2p)
this.network = new Network(this)

this._log = utils.logger(this.self.id)
this._log = utils.logger(this.peerInfo.id)

// Inject private apis so we don't clutter up this file
const pa = privateApi(this)
Object.keys(pa).forEach((name) => {
this[name] = pa[name]
})
Object.keys(pa).forEach((name) => { this[name] = pa[name] })
}

/**
* Is this DHT running.
*
* @type {bool}
*/
get isStarted () {
return this._running
}

/**
Expand All @@ -118,29 +123,17 @@ class KadDHT {
this.network.stop(callback)
}

/**
* Alias to the peerbook from libp2p
*/
get peerBook () {
return this.libp2p.peerBook
}

/**
* Is this DHT running.
*
* @type {bool}
*/
get isRunning () {
return this._running
}

/**
* Local peer (yourself)
*
* @type {PeerInfo}
*/
get self () {
return this.libp2p.peerInfo
get peerInfo () {
return this.swarm._peerInfo
}

get peerBook () {
return this.swarm._peerBook
}

/**
Expand Down Expand Up @@ -205,7 +198,7 @@ class KadDHT {
}

waterfall([
(cb) => utils.createPutRecord(key, value, this.self.id, sign, cb),
(cb) => utils.createPutRecord(key, value, this.peerInfo.id, sign, cb),
(rec, cb) => waterfall([
(cb) => this._putLocal(key, rec, cb),
(cb) => this.getClosestPeers(key, cb),
Expand Down Expand Up @@ -266,7 +259,7 @@ class KadDHT {
if (err == null) {
vals.push({
val: localRec.value,
from: this.self.id
from: this.peerInfo.id
})
}

Expand Down Expand Up @@ -342,7 +335,7 @@ class KadDHT {
// local check
let info
if (this.peerBook.has(peer)) {
info = this.libp2p.peerBook.get(peer)
info = this.peerBook.get(peer)

if (info && info.id.pubKey) {
this._log('getPublicKey: found local copy')
Expand All @@ -355,7 +348,7 @@ class KadDHT {
this._getPublicKeyFromNode(peer, (err, pk) => {
if (!err) {
info.id = new PeerId(peer.id, null, pk)
this.libp2p.peerBook.put(info)
this.peerBook.put(info)

return callback(null, pk)
}
Expand All @@ -369,7 +362,7 @@ class KadDHT {

const pk = crypto.unmarshalPublicKey(value)
info.id = new PeerId(peer, null, pk)
this.libp2p.peerBook.put(info)
this.peerBook.put(info)

callback(null, pk)
})
Expand All @@ -389,7 +382,7 @@ class KadDHT {
this._log('provide: %s', key.toBaseEncodedString())

waterfall([
(cb) => this.providers.addProvider(key, this.self.id, cb),
(cb) => this.providers.addProvider(key, this.peerInfo.id, cb),
(cb) => this.getClosestPeers(key.buffer, cb),
(peers, cb) => {
const msg = new Message(Message.TYPES.ADD_PROVIDER, key.buffer, 0)
Expand Down
44 changes: 22 additions & 22 deletions src/network.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,15 @@ class Network {
/**
* Create a new network.
*
* @param {DHT} dht
* @param {Libp2p} libp2p
* @param {KadDHT} self
*/
constructor (dht, libp2p) {
this.dht = dht
this.libp2p = libp2p
constructor (self) {
this.dht = self
this.readMessageTimeout = c.READ_MESSAGE_TIMEOUT
this._log = utils.logger(this.dht.self.id, 'net')
this._log = utils.logger(this.dht.peerInfo.id, 'net')
this._rpc = rpc(this.dht)
this._onPeerConnected = this._onPeerConnected.bind(this)
this._online = false
this._running = false
}

/**
Expand All @@ -43,17 +41,18 @@ class Network {
return cb(new Error('Network is already running'))
}

if (!this.dht.isRunning || !this.dht.libp2p.isStarted()) {
// TODO add a way to check if swarm has started or not
if (!this.dht.isStarted) {
return cb(new Error('Can not start network'))
}

this._online = true
this._running = true

// handle incoming connections
this.libp2p.swarm.handle(c.PROTOCOL_DHT, this._rpc)
this.dht.swarm.handle(c.PROTOCOL_DHT, this._rpc)

// handle new connections
this.libp2p.on('peer:connect', this._onPeerConnected)
this.dht.swarm.on('peer-mux-established', this._onPeerConnected)

cb()
}
Expand All @@ -67,13 +66,13 @@ class Network {
stop (callback) {
const cb = (err) => setImmediate(() => callback(err))

if (!this.isOnline) {
if (!this.dht.isStarted && !this.isStarted) {
return cb(new Error('Network is already stopped'))
}
this._online = false
this.libp2p.removeListener('peer:connect', this._onPeerConnected)
this._running = false
this.dht.swarm.removeListener('peer-mux-established', this._onPeerConnected)

this.libp2p.swarm.unhandle(c.PROTOCOL_DHT)
this.dht.swarm.unhandle(c.PROTOCOL_DHT)
cb()
}

Expand All @@ -82,8 +81,8 @@ class Network {
*
* @type {bool}
*/
get isOnline () {
return this._online
get isStarted () {
return this._running
}

/**
Expand All @@ -92,7 +91,8 @@ class Network {
* @type {bool}
*/
get isConnected () {
return this.dht.libp2p.isStarted() && this.dht.isRunning && this.isOnline
// TODO add a way to check if swarm has started or not
return this.dht.isStarted && this.isStarted
}

/**
Expand All @@ -107,7 +107,7 @@ class Network {
return this._log.error('Network is offline')
}

this.libp2p.dial(peer, c.PROTOCOL_DHT, (err, conn) => {
this.dht.swarm.dial(peer, c.PROTOCOL_DHT, (err, conn) => {
if (err) {
return this._log('%s does not support protocol: %s', peer.id.toB58String(), c.PROTOCOL_DHT)
}
Expand Down Expand Up @@ -140,7 +140,7 @@ class Network {
}

this._log('sending to: %s', to.toB58String())
this.dht.libp2p.dial(to, c.PROTOCOL_DHT, (err, conn) => {
this.dht.swarm.dial(to, c.PROTOCOL_DHT, (err, conn) => {
if (err) {
return callback(err)
}
Expand All @@ -159,12 +159,12 @@ class Network {
*/
sendMessage (to, msg, callback) {
if (!this.isConnected) {
return callback(new Error('Network is offline'))
return setImmediate(() => callback(new Error('Network is offline')))
}

this._log('sending to: %s', to.toB58String())

this.dht.libp2p.dial(to, c.PROTOCOL_DHT, (err, conn) => {
this.dht.swarm.dial(to, c.PROTOCOL_DHT, (err, conn) => {
if (err) {
return callback(err)
}
Expand Down
8 changes: 4 additions & 4 deletions src/private.js
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ module.exports = (dht) => ({
// 5. check validity

// 5. if: we are the author, all good
if (record.author.isEqual(dht.self.id)) {
if (record.author.isEqual(dht.peerInfo.id)) {
return callback(null, record)
}

Expand Down Expand Up @@ -226,7 +226,7 @@ module.exports = (dht) => ({
* @private
*/
_isSelf (other) {
return other && dht.self.id.id.equals(other.id)
return other && dht.peerInfo.id.id.equals(other.id)
},
/**
* Ask peer `peer` if they know where the peer with id `target` is.
Expand Down Expand Up @@ -308,7 +308,7 @@ module.exports = (dht) => ({

// Send out correction record
waterfall([
(cb) => utils.createPutRecord(key, best, dht.self.id, true, cb),
(cb) => utils.createPutRecord(key, best, dht.peerInfo.id, true, cb),
(fixupRec, cb) => each(vals, (v, cb) => {
// no need to do anything
if (v.val.equals(best)) {
Expand Down Expand Up @@ -523,7 +523,7 @@ module.exports = (dht) => ({
(cb) => dht._findProvidersSingle(peer, key, cb),
(msg, cb) => {
const provs = msg.providerPeers
dht._log('(%s) found %s provider entries', dht.self.id.toB58String(), provs.length)
dht._log('(%s) found %s provider entries', dht.peerInfo.id.toB58String(), provs.length)

provs.forEach((prov) => {
out.push(dht.peerBook.put(prov))
Expand Down
2 changes: 1 addition & 1 deletion src/query.js
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ class Query {
this.key = key
this.query = query
this.concurrency = c.ALPHA
this._log = utils.logger(this.dht.self.id, 'query:' + key.toString())
this._log = utils.logger(this.dht.peerInfo.id, 'query:' + key.toString())
}

/**
Expand Down
Loading

0 comments on commit 810be4d

Please sign in to comment.