Skip to content
This repository has been archived by the owner on Feb 12, 2024. It is now read-only.

Commit

Permalink
feat: dht client (#3947)
Browse files Browse the repository at this point in the history
* Enables `libp2p-kad-dht` in client mode
* Updates types with new DHT events

BREAKING CHANGE: The DHT API has been refactored to return async iterators of query events
  • Loading branch information
achingbrain authored Dec 3, 2021
1 parent c272bfb commit 62d8ecb
Show file tree
Hide file tree
Showing 64 changed files with 1,300 additions and 668 deletions.
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
"description": "JavaScript implementation of the IPFS specification",
"scripts": {
"link": "lerna link",
"reset": "lerna run clean && rimraf packages/*/node_modules node_modules",
"reset": "lerna run clean && rimraf packages/*/node_modules node_modules package-lock.json packages/*/package-lock.json",
"test": "lerna run test",
"test:node": "lerna run test:node",
"test:browser": "lerna run test:browser",
Expand Down
10 changes: 5 additions & 5 deletions packages/interface-ipfs-core/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -76,9 +76,9 @@
"ipfs-unixfs": "^6.0.3",
"ipfs-unixfs-importer": "^9.0.3",
"ipfs-utils": "^9.0.2",
"ipns": "^0.15.0",
"ipns": "^0.16.0",
"is-ipfs": "^6.0.1",
"iso-random-stream": "^2.0.0",
"iso-random-stream": "^2.0.2",
"it-all": "^1.0.4",
"it-buffer-stream": "^2.0.0",
"it-concat": "^2.0.0",
Expand All @@ -90,7 +90,7 @@
"it-pushable": "^1.4.2",
"it-tar": "^4.0.0",
"it-to-buffer": "^2.0.0",
"libp2p-crypto": "^0.19.7",
"libp2p-crypto": "^0.21.0",
"libp2p-websockets": "^0.16.2",
"multiaddr": "^10.0.0",
"multiformats": "^9.4.13",
Expand All @@ -99,9 +99,9 @@
"p-map": "^4.0.0",
"p-retry": "^4.5.0",
"pako": "^1.0.2",
"peer-id": "^0.15.1",
"peer-id": "^0.16.0",
"readable-stream": "^3.4.0",
"sinon": "^11.1.1",
"sinon": "^12.0.01",
"uint8arrays": "^3.0.0"
},
"devDependencies": {
Expand Down
7 changes: 4 additions & 3 deletions packages/interface-ipfs-core/src/dht/disabled.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

import { expect } from 'aegir/utils/chai.js'
import { getDescribe, getIt } from '../utils/mocha.js'
import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string'
import all from 'it-all'

/**
* @typedef {import('ipfsd-ctl').Factory} Factory
Expand Down Expand Up @@ -42,8 +42,9 @@ export function testDisabled (factory, options) {
after(() => factory.clean())

it('should error when DHT not available', async () => {
await expect(nodeA.dht.get(uint8ArrayFromString('/ipns/Qme6KJdKcp85TYbLxuLV7oQzMiLremD7HMoXLZEmgo6Rnh')))
.to.eventually.be.rejected()
const events = await all(nodeA.dht.get('/ipns/12D3KooWQMSMXmsBvs5YDEQ6tXsaFv9tjuzmDmEvusaiQSFdrJdN'))

expect(events.filter(event => event.name === 'QUERY_ERROR')).to.not.be.empty()
})
})
}
36 changes: 24 additions & 12 deletions packages/interface-ipfs-core/src/dht/find-peer.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@
import { expect } from 'aegir/utils/chai.js'
import { getDescribe, getIt } from '../utils/mocha.js'
import testTimeout from '../utils/test-timeout.js'
import drain from 'it-drain'
import all from 'it-all'
import { ensureReachable } from './utils.js'

/**
* @typedef {import('ipfsd-ctl').Factory} Factory
Expand All @@ -23,41 +26,50 @@ export function testFindPeer (factory, options) {
let nodeA
/** @type {import('ipfs-core-types').IPFS} */
let nodeB
/** @type {import('ipfs-core-types/src/root').IDResult} */
let nodeBId

before(async () => {
nodeA = (await factory.spawn()).api
nodeB = (await factory.spawn()).api
nodeBId = await nodeB.id()

await nodeA.swarm.connect(nodeBId.addresses[0])
await ensureReachable(nodeA, nodeB)
})

after(() => factory.clean())

it('should respect timeout option when finding a peer on the DHT', async () => {
const nodeBId = await nodeB.id()

await testTimeout(() => nodeA.dht.findPeer(nodeBId.id, {
await testTimeout(() => drain(nodeA.dht.findPeer(nodeBId.id, {
timeout: 1
}))
})))
})

it('should find other peers', async () => {
const nodeBId = await nodeB.id()
const res = await nodeA.dht.findPeer(nodeBId.id)
const id = res.id.toString()

const results = await all(nodeA.dht.findPeer(nodeBId.id))
const finalPeer = results.filter(event => event.name === 'FINAL_PEER').pop()

if (!finalPeer || finalPeer.name !== 'FINAL_PEER') {
throw new Error('No finalPeer event received')
}

const id = finalPeer.peer.id
const nodeAddresses = nodeBId.addresses.map((addr) => addr.nodeAddress())
const peerAddresses = res.addrs.map(ma => ma.nodeAddress())
const peerAddresses = finalPeer.peer.multiaddrs.map(ma => ma.nodeAddress())

expect(id).to.be.eql(nodeBId.id)
expect(id).to.equal(nodeBId.id)
expect(peerAddresses).to.deep.include(nodeAddresses[0])
})

it('should fail to find other peer if peer does not exist', () => {
return expect(nodeA.dht.findPeer('Qmd7qZS4T7xXtsNFdRoK1trfMs5zU94EpokQ9WFtxdPxsZ')).to.eventually.be.rejected()
it('should fail to find other peer if peer does not exist', async () => {
const events = await all(nodeA.dht.findPeer('Qmd7qZS4T7xXtsNFdRoK1trfMs5zU94EpokQ9WFtxdPxsZ'))

// no finalPeer events found
expect(events.filter(event => event.name === 'FINAL_PEER')).to.be.empty()

// queryError events found
expect(events.filter(event => event.name === 'QUERY_ERROR')).to.not.be.empty()
})
})
}
60 changes: 14 additions & 46 deletions packages/interface-ipfs-core/src/dht/find-provs.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ import { expect } from 'aegir/utils/chai.js'
import { getDescribe, getIt } from '../utils/mocha.js'
import all from 'it-all'
import drain from 'it-drain'
import { fakeCid } from './utils.js'
import testTimeout from '../utils/test-timeout.js'
import { ensureReachable } from './utils.js'

/**
* @typedef {import('ipfsd-ctl').Factory} Factory
Expand All @@ -20,34 +20,22 @@ export function testFindProvs (factory, options) {
const it = getIt(options)

describe('.dht.findProvs', function () {
this.timeout(20000)
this.timeout(80 * 1000)

/** @type {import('ipfs-core-types').IPFS} */
let nodeA
/** @type {import('ipfs-core-types').IPFS} */
let nodeB
/** @type {import('ipfs-core-types').IPFS} */
let nodeC
/** @type {import('ipfs-core-types/src/root').IDResult} */
let nodeAId
/** @type {import('ipfs-core-types/src/root').IDResult} */
let nodeBId
/** @type {import('ipfs-core-types/src/root').IDResult} */
let nodeCId

before(async () => {
nodeA = (await factory.spawn()).api
nodeB = (await factory.spawn()).api
nodeC = (await factory.spawn()).api

nodeAId = await nodeA.id()
nodeBId = await nodeB.id()
nodeCId = await nodeC.id()

await Promise.all([
nodeB.swarm.connect(nodeAId.addresses[0]),
nodeC.swarm.connect(nodeBId.addresses[0])
])
await ensureReachable(nodeB, nodeA)
await ensureReachable(nodeC, nodeB)
})

after(() => factory.clean())
Expand All @@ -57,8 +45,6 @@ export function testFindProvs (factory, options) {
*/
let providedCid
before('add providers for the same cid', async function () {
this.timeout(10 * 1000)

const cids = await Promise.all([
nodeB.object.new('unixfs-dir'),
nodeC.object.new('unixfs-dir')
Expand All @@ -79,38 +65,20 @@ export function testFindProvs (factory, options) {
})

it('should be able to find providers', async function () {
// @ts-ignore this is mocha
this.timeout(20 * 1000)

const provs = await all(nodeA.dht.findProvs(providedCid, { numProviders: 2 }))
const providerIds = provs.map((p) => p.id.toString())
/** @type {string[]} */
const providerIds = []

expect(providerIds).to.have.members([
nodeBId.id,
nodeCId.id
])
})

it('should take options to override timeout config', async function () {
const options = {
timeout: 1
for await (const event of nodeA.dht.findProvs(providedCid)) {
if (event.name === 'PROVIDER') {
providerIds.push(...event.providers.map(prov => prov.id))
}
}

const cidV0 = await fakeCid()
const start = Date.now()
let res

try {
res = await all(nodeA.dht.findProvs(cidV0, options))
} catch (/** @type {any} */ err) {
// rejected by http client
expect(err).to.have.property('name', 'TimeoutError')
return
}
const nodeBId = await nodeB.id()
const nodeCId = await nodeC.id()

// rejected by the server, errors don't work over http - https://github.com/ipfs/js-ipfs/issues/2519
expect(res).to.be.an('array').with.lengthOf(0)
expect(Date.now() - start).to.be.lessThan(100)
expect(providerIds).to.include(nodeBId.id)
expect(providerIds).to.include(nodeCId.id)
})
})
}
37 changes: 24 additions & 13 deletions packages/interface-ipfs-core/src/dht/get.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@
import { expect } from 'aegir/utils/chai.js'
import { getDescribe, getIt } from '../utils/mocha.js'
import testTimeout from '../utils/test-timeout.js'
import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string'
import { toString as uint8ArrayToString } from 'uint8arrays/to-string'
import drain from 'it-drain'
import all from 'it-all'
import { ensureReachable } from './utils.js'

/**
* @typedef {import('ipfsd-ctl').Factory} Factory
Expand All @@ -19,19 +21,18 @@ export function testGet (factory, options) {
const it = getIt(options)

describe('.dht.get', function () {
this.timeout(80 * 1000)

/** @type {import('ipfs-core-types').IPFS} */
let nodeA
/** @type {import('ipfs-core-types').IPFS} */
let nodeB
/** @type {import('ipfs-core-types/src/root').IDResult} */
let nodeBId

before(async () => {
nodeA = (await factory.spawn()).api
nodeB = (await factory.spawn()).api
nodeBId = await nodeB.id()

await nodeA.swarm.connect(nodeBId.addresses[0])
await ensureReachable(nodeA, nodeB)
})

after(() => factory.clean())
Expand All @@ -40,23 +41,33 @@ export function testGet (factory, options) {
const data = await nodeA.add('should put a value to the DHT')
const publish = await nodeA.name.publish(data.cid)

await testTimeout(() => nodeB.dht.get(uint8ArrayFromString(`/ipns/${publish.name}`), {
await testTimeout(() => drain(nodeB.dht.get(`/ipns/${publish.name}`, {
timeout: 1
}))
})))
})

it('should error when getting a non-existent key from the DHT', () => {
return expect(nodeA.dht.get(uint8ArrayFromString('non-existing'), { timeout: 100 }))
.to.eventually.be.rejected
.and.be.an.instanceOf(Error)
it('should error when getting a non-existent key from the DHT', async () => {
const key = '/ipns/k51qzi5uqu5dl0dbfddy2wb42nvbc6anyxnkrguy5l0h0bv9kaih6j6vqdskqk'
const events = await all(nodeA.dht.get(key))

// no value events found
expect(events.filter(event => event.name === 'VALUE')).to.be.empty()

// queryError events found
expect(events.filter(event => event.name === 'QUERY_ERROR')).to.not.be.empty()
})

it('should get a value after it was put on another node', async () => {
const data = await nodeA.add('should put a value to the DHT')
const publish = await nodeA.name.publish(data.cid)
const record = await nodeA.dht.get(uint8ArrayFromString(`/ipns/${publish.name}`))
const events = await all(nodeA.dht.get(`/ipns/${publish.name}`))
const valueEvent = events.filter(event => event.name === 'VALUE').pop()

if (!valueEvent || valueEvent.name !== 'VALUE') {
throw new Error('Value event not found')
}

expect(uint8ArrayToString(record)).to.contain(data.cid.toString())
expect(uint8ArrayToString(valueEvent.value)).to.contain(data.cid.toString())
})
})
}
21 changes: 4 additions & 17 deletions packages/interface-ipfs-core/src/dht/provide.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import { CID } from 'multiformats/cid'
import all from 'it-all'
import { expect } from 'aegir/utils/chai.js'
import { getDescribe, getIt } from '../utils/mocha.js'
import { ensureReachable } from './utils.js'

/**
* @typedef {import('ipfsd-ctl').Factory} Factory
Expand All @@ -27,8 +28,8 @@ export function testProvide (factory, options) {
before(async () => {
ipfs = (await factory.spawn()).api
const nodeB = (await factory.spawn()).api
const nodeBId = await nodeB.id()
await ipfs.swarm.connect(nodeBId.addresses[0])

await ensureReachable(ipfs, nodeB)
})

after(() => factory.clean())
Expand All @@ -48,28 +49,14 @@ export function testProvide (factory, options) {
.that.include('not found locally')
})

it('should allow multiple CIDs to be passed', async () => {
const res = await all(ipfs.addAll([
{ content: uint8ArrayFromString('t0') },
{ content: uint8ArrayFromString('t1') }
]))

await all(ipfs.dht.provide(res.map(f => f.cid)))
})

it('should provide a CIDv1', async () => {
const res = await ipfs.add(uint8ArrayFromString('test'), { cidVersion: 1 })
await all(ipfs.dht.provide(res.cid))
})

it('should error on non CID arg', () => {
it('should error on non CID arg', async () => {
// @ts-expect-error invalid arg
return expect(all(ipfs.dht.provide({}))).to.eventually.be.rejected()
})

it('should error on array containing non CID arg', () => {
// @ts-expect-error invalid arg
return expect(all(ipfs.dht.provide([{}]))).to.eventually.be.rejected()
})
})
}
Loading

0 comments on commit 62d8ecb

Please sign in to comment.