Skip to content

Commit

Permalink
fix: update interfaces
Browse files Browse the repository at this point in the history
Updates to latest code from libp2p/js-libp2p-interfaces#180
  • Loading branch information
achingbrain committed Mar 15, 2022
1 parent 09b773b commit bfa7886
Show file tree
Hide file tree
Showing 3 changed files with 71 additions and 28 deletions.
18 changes: 9 additions & 9 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -130,23 +130,23 @@
"release": "semantic-release"
},
"dependencies": {
"@libp2p/logger": "^1.0.4",
"@multiformats/multiaddr": "^10.1.5",
"it-drain": "^1.0.3",
"@libp2p/interfaces": "^1.3.14",
"@libp2p/logger": "^1.1.2",
"@libp2p/peer-id": "^1.1.8",
"@multiformats/multiaddr": "^10.1.7",
"err-code": "^3.0.1",
"it-drain": "^1.0.5",
"p-defer": "^4.0.0",
"p-queue": "^7.2.0",
"peer-id": "^0.16.0"
"p-queue": "^7.2.0"
},
"devDependencies": {
"@libp2p/interfaces": "^1.3.11",
"@libp2p/peer-id": "^1.1.5",
"aegir": "^36.1.3",
"go-ipfs": "^0.12.0",
"ipfs-core-types": "^0.10.1",
"ipfs-http-client": "^56.0.1",
"ipfsd-ctl": "^10.0.6",
"it-all": "^1.0.0",
"it-drain": "^1.0.0",
"it-all": "^1.0.6",
"it-drain": "^1.0.5",
"uint8arrays": "^3.0.0",
"wherearewe": "^1.0.0"
},
Expand Down
63 changes: 60 additions & 3 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,14 @@ import { logger } from '@libp2p/logger'
import drain from 'it-drain'
import PQueue from 'p-queue'
import defer from 'p-defer'
import { peerIdFromString } from '@libp2p/peer-id'
import { Multiaddr } from '@multiformats/multiaddr'
import errCode from 'err-code'
import type { IPFSHTTPClient, CID } from 'ipfs-http-client'
import type { HTTPClientExtraOptions } from 'ipfs-http-client/types/src/types'
import type { AbortOptions } from 'ipfs-core-types/src/utils'
import type { ContentRouting } from '@libp2p/interfaces/content-routing'
import type { PeerData } from '@libp2p/interfaces/peer-data'

const log = logger('libp2p-delegated-content-routing')

Expand All @@ -15,7 +20,7 @@ const CONCURRENT_HTTP_REFS_REQUESTS = 2
/**
* An implementation of content routing, using a delegated peer
*/
export class DelegatedContentRouting {
export class DelegatedContentRouting implements ContentRouting {
private readonly client: IPFSHTTPClient
private readonly httpQueue: PQueue
private readonly httpQueueRefs: PQueue
Expand Down Expand Up @@ -70,9 +75,21 @@ export class DelegatedContentRouting {
try {
await onStart.promise

yield * this.client.dht.findProvs(key, {
for await (const event of this.client.dht.findProvs(key, {
timeout: options.timeout
})
})) {
if (event.name === 'PROVIDER') {
yield * event.providers.map(prov => {
const peerData: PeerData = {
id: peerIdFromString(prov.id),
protocols: [],
multiaddrs: prov.multiaddrs.map(m => new Multiaddr(m.toString()))
}

return peerData
})
}
}
} catch (err) {
log.error('findProviders errored:', err)
throw err
Expand Down Expand Up @@ -102,4 +119,44 @@ export class DelegatedContentRouting {
})
log('provide finished: %c', key)
}

/**
* Stores a value in the backing key/value store of the delegated content router.
* This may fail if the delegated node's content routing implementation does not
* use a key/value store, or if the delegated operation fails.
*/
async put (key: Uint8Array, value :Uint8Array, options: HTTPClientExtraOptions & AbortOptions = {}) {
const timeout = options.timeout || 3000
log(`put value start: ${key}`)

await this.httpQueue.add(async () => {
await drain(this.client.dht.put(key, value, { timeout }))
})

log(`put value finished: ${key}`)
}

/**
* Fetches an value from the backing key/value store of the delegated content router.
* This may fail if the delegated node's content routing implementation does not
* use a key/value store, or if the delegated operation fails.
*/
async get (key: Uint8Array, options: HTTPClientExtraOptions & AbortOptions = {}) {
const timeout = options.timeout || 3000
log(`get value start: ${key}`)

return await this.httpQueue.add(async () => {
for await (const event of this.client.dht.get(key, { timeout })) {
if (event.name === 'VALUE') {
log(`get value finished: ${key}`)
return {
from: peerIdFromString(event.from),
val: event.value
}
}
}

throw errCode(new Error('Not found'), 'ERR_NOT_FOUND')
})
}
}
18 changes: 2 additions & 16 deletions test/index.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -124,14 +124,7 @@ describe('DelegatedContentRouting', function () {
host: opts.host
}))

const events = await all(routing.findProviders(cid))
const providers: PeerData[] = []

for (const event of events) {
if (event.name === 'PEER_RESPONSE') {
providers.push(...event.providers)
}
}
const providers = await all(routing.findProviders(cid))

// We should get the bootstrap node as provider
// The delegate node is not included, because it is handling the requests
Expand All @@ -147,14 +140,7 @@ describe('DelegatedContentRouting', function () {
host: opts.host
}))

const events = await all(routing.findProviders(cid, { timeout: 5e3 }))
const providers: PeerData[] = []

for (const event of events) {
if (event.name === 'PEER_RESPONSE') {
providers.push(...event.providers)
}
}
const providers = await all(routing.findProviders(cid, { timeout: 5e3 }))

expect(providers.map((p) => p.id.toString())).to.include(bootstrapId.id, 'Did not include bootstrap node')
})
Expand Down

0 comments on commit bfa7886

Please sign in to comment.