Skip to content
This repository has been archived by the owner on Jul 21, 2023. It is now read-only.

fix: actually send the add provider rpc with addresses #201

Merged
merged 1 commit into from
Jul 10, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@
"delay": "^4.3.0",
"dirty-chai": "^2.0.1",
"it-pair": "^1.0.0",
"libp2p": "^0.28.0",
"libp2p": "^0.28.5",
"lodash": "^4.17.11",
"lodash.random": "^3.2.0",
"lodash.range": "^3.2.0",
Expand Down
4 changes: 3 additions & 1 deletion src/content-routing/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,11 @@ module.exports = (dht) => {
// Add peer as provider
await dht.providers.addProvider(key, dht.peerId)

const multiaddrs = dht.libp2p ? dht.libp2p.multiaddrs : []
const msg = new Message(Message.TYPES.ADD_PROVIDER, key.buffer, 0)
msg.providerPeers = [{
id: dht.peerId
id: dht.peerId,
multiaddrs
}]

// Notify closest peers
Expand Down
8 changes: 8 additions & 0 deletions src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ class KadDHT extends EventEmitter {
/**
* Create a new KadDHT.
* @param {Object} props
* @param {Libp2p} [props.libp2p] the libp2p instance
* @param {Dialer} props.dialer libp2p dialer instance
* @param {PeerId} props.peerId peer's peerId
* @param {PeerStore} props.peerStore libp2p peerStore
Expand All @@ -54,6 +55,7 @@ class KadDHT extends EventEmitter {
* @param {randomWalkOptions} options.randomWalk randomWalk options
*/
constructor ({
libp2p,
dialer,
peerId,
peerStore,
Expand All @@ -72,6 +74,12 @@ class KadDHT extends EventEmitter {
throw new Error('libp2p-kad-dht requires an instance of Dialer')
}

/**
* Local reference to the libp2p instance. May be undefined.
* @type {Libp2p}
*/
this.libp2p = libp2p

/**
* Local reference to the libp2p dialer instance
* @type {Dialer}
Expand Down
4 changes: 3 additions & 1 deletion src/network.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ const errcode = require('err-code')
const pipe = require('it-pipe')
const lp = require('it-length-prefixed')
const pTimeout = require('p-timeout')
const { consume } = require('streaming-iterables')

const MulticodecTopology = require('libp2p-interfaces/src/topology/multicodec-topology')

Expand Down Expand Up @@ -186,7 +187,8 @@ class Network {
return pipe(
[msg],
lp.encode(),
stream
stream,
consume
)
}
}
Expand Down
57 changes: 57 additions & 0 deletions test/kad-dht.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -528,6 +528,63 @@ describe('KadDHT', () => {
return tdht.teardown()
})

it('find providers from client', async function () {
this.timeout(20 * 1000)

const val = values[0]
const tdht = new TestDHT()
const dhts = await tdht.spawn(2)
const [clientDHT] = await tdht.spawn(1, { clientMode: true })

// Connect
await Promise.all([
tdht.connect(clientDHT, dhts[0]),
tdht.connect(dhts[0], dhts[1])
])

await Promise.all(dhts.map((dht) => dht.provide(val.cid)))

const res0 = await all(clientDHT.findProviders(val.cid))
const res1 = await all(clientDHT.findProviders(val.cid, { maxNumProviders: 1 }))

// find providers find all the 2 providers
expect(res0).to.exist()
expect(res0).to.have.length(2)

// find providers limited to a maxium of 1 providers
expect(res1).to.exist()
expect(res1).to.have.length(1)

return tdht.teardown()
})

it('find client provider', async function () {
this.timeout(20 * 1000)

const val = values[0]
const tdht = new TestDHT()
const dhts = await tdht.spawn(2)
const [clientDHT] = await tdht.spawn(1, { clientMode: true })

// Connect
await Promise.all([
tdht.connect(clientDHT, dhts[0]),
tdht.connect(dhts[0], dhts[1])
])

await clientDHT.provide(val.cid)

await delay(1e3)

const res = await all(dhts[1].findProviders(val.cid))

// find providers find the client provider
expect(res).to.exist()
expect(res).to.have.length(1)

return tdht.teardown()
})

it('find one provider locally', async function () {
this.timeout(20 * 1000)
const val = values[0]
Expand Down
63 changes: 41 additions & 22 deletions test/utils/test-dht.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
const PeerStore = require('libp2p/src/peer-store')
const pRetry = require('p-retry')
const delay = require('delay')
const multiaddr = require('multiaddr')

const KadDHT = require('../../src')
const { PROTOCOL_DHT } = require('../../src/constants')
Expand Down Expand Up @@ -39,7 +40,7 @@ class TestDHT {

const [peerId] = await createPeerId(1)

const connectToPeer = async (peer) => {
const connectToPeer = (localDHT, peer) => {
const remotePeerB58 = peer.toB58String()
const remoteDht = this.nodes.find(
(node) => node.peerId.toB58String() === remotePeerB58
Expand All @@ -52,27 +53,35 @@ class TestDHT {

// Notice peers of connection
const [c0, c1] = ConnectionPair()
await localOnConnect(remoteDht.peerId, c1)
await remoteOnConnect(peerId, c0)

await remoteHandler({
protocol: PROTOCOL_DHT,
stream: c0.stream,
connection: {
remotePeer: peerId
}
})

return {
newStream: () => {
newStream: async () => {
if (remoteDht._clientMode) {
throw new Error('unsupported protocol')
}

// Trigger on connect for servers connecting
if (!remoteDht._clientMode) await localOnConnect(remoteDht.peerId, c1)
if (!localDHT._clientMode) await remoteOnConnect(peerId, c0)

await remoteHandler({
protocol: PROTOCOL_DHT,
stream: c0.stream,
connection: {
remotePeer: peerId
}
})
return { stream: c1.stream }
}
}
}

const dht = new KadDHT({
libp2p: {
multiaddrs: [multiaddr('/ip4/0.0.0.0/tcp/4002')]
},
dialer: {
connectToPeer
connectToPeer: (peer) => connectToPeer(dht, peer)
},
registrar: createMockRegistrar(regRecord),
peerStore,
Expand Down Expand Up @@ -112,12 +121,13 @@ class TestDHT {

const [c0, c1] = ConnectionPair()

// Notice peers of connection
await onConnectA(dhtB.peerId, c0)
await onConnectB(dhtA.peerId, c1)
const routingTableChecks = []

return Promise.all([
pRetry(async () => {
// Notice peers of connection
if (!dhtB._clientMode) {
// B is a server, trigger connect events on A
await onConnectA(dhtB.peerId, c0)
routingTableChecks.push(async () => {
const match = await dhtA.routingTable.find(dhtB.peerId)

if (!match) {
Expand All @@ -126,8 +136,12 @@ class TestDHT {
}

return match
}, { retries: 50 }),
pRetry(async () => {
})
}
if (!dhtA._clientMode) {
// A is a server, trigger connect events on B
await onConnectB(dhtA.peerId, c1)
routingTableChecks.push(async () => {
const match = await dhtB.routingTable.find(dhtA.peerId)

if (!match) {
Expand All @@ -136,8 +150,13 @@ class TestDHT {
}

return match
}, { retries: 50 })
])
})
}

// Check routing tables
return Promise.all(routingTableChecks.map(check => {
pRetry(check, { retries: 50 })
}))
}

async teardown () {
Expand Down