Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

deps: swap ipfs-http-client for kubo-rpc-client #145

Merged
merged 3 commits into from
May 31, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .aegir.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { createServer } from 'ipfsd-ctl'
import * as ipfsHttpModule from 'ipfs-http-client'
import * as ipfsHttpModule from 'kubo-rpc-client'
import goIpfsModule from 'go-ipfs'

let server
Expand Down
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
node_modules
build
dist
.docs
.coverage
node_modules
package-lock.json
yarn.lock
.vscode
6 changes: 3 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,18 +37,18 @@ Requires access to `/api/v0/dht/findprovs` and `/api/v0/refs` HTTP API endpoints

## Requirements

`@libp2p/delegated-content-routing` leverages the `ipfs-http-client` library and requires an instance of it as a constructor argument.
`@libp2p/delegated-content-routing` leverages the `kubo-rpc-client` library and requires an instance of it as a constructor argument.

```sh
npm install ipfs-http-client @libp2p/delegated-content-routing
npm install kubo-rpc-client @libp2p/delegated-content-routing
```

## Example

```js
import { createLibp2p } from 'libp2p'
import { delegatedContentRouting } from '@libp2p/delegated-content-routing'
import { create as createIpfsHttpClient } from 'ipfs-http-client')
import { create as createIpfsHttpClient } from 'kubo-rpc-client'

// default is to use ipfs.io
const client = createIpfsHttpClient({
Expand Down
10 changes: 3 additions & 7 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,6 @@
"bugs": {
"url": "https://github.com/libp2p/js-libp2p-delegated-content-routing/issues"
},
"engines": {
"node": ">=16.0.0",
"npm": ">=7.0.0"
},
"type": "module",
"types": "./dist/src/index.d.ts",
"files": [
Expand Down Expand Up @@ -141,7 +137,7 @@
"@libp2p/interface-peer-info": "^1.0.3",
"@libp2p/interfaces": "^3.0.3",
"@libp2p/logger": "^2.0.1",
"any-signal": "^3.0.1",
"any-signal": "^4.1.1",
"err-code": "^3.0.1",
"it-drain": "^3.0.2",
"multiformats": "^11.0.0",
Expand All @@ -151,11 +147,11 @@
"devDependencies": {
"@libp2p/peer-id": "^2.0.0",
"aegir": "^39.0.9",
"go-ipfs": "^0.17.0",
"go-ipfs": "^0.20.0",
"ipfs-core-types": "^0.14.0",
"ipfs-http-client": "^60.0.0",
"ipfsd-ctl": "^13.0.0",
"it-all": "^3.0.2",
"kubo-rpc-client": "^3.0.1",
"uint8arrays": "^4.0.2",
"wherearewe": "^2.0.1"
},
Expand Down
98 changes: 77 additions & 21 deletions src/index.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { logger } from '@libp2p/logger'
import anySignal from 'any-signal'
import { anySignal } from 'any-signal'
import errCode from 'err-code'
import drain from 'it-drain'
import defer from 'p-defer'
Expand Down Expand Up @@ -197,8 +197,8 @@ class DelegatedContentRouting implements ContentRouting, Startable {
async * findProviders (key: CID, options: HTTPClientExtraOptions & AbortOptions = {}): AsyncIterable<PeerInfo> {
log('findProviders starts: %c', key)
options.timeout = options.timeout ?? DEFAULT_TIMEOUT
options.signal = anySignal([this.abortController.signal].concat((options.signal != null) ? [options.signal] : []))

const signal = anySignal([this.abortController.signal, options.signal])
const onStart = defer()
const onFinish = defer()

Expand All @@ -210,7 +210,10 @@ class DelegatedContentRouting implements ContentRouting, Startable {
try {
await onStart.promise

for await (const event of this.client.dht.findProvs(key, options)) {
for await (const event of this.client.dht.findProvs(key, {
...options,
signal
})) {
if (event.name === 'PROVIDER') {
yield * event.providers.map(prov => {
const peerInfo: PeerInfo = {
Expand All @@ -227,6 +230,7 @@ class DelegatedContentRouting implements ContentRouting, Startable {
log.error('findProviders errored:', err)
throw err
} finally {
signal.clear()
onFinish.resolve()
log('findProviders finished: %c', key)
}
Expand All @@ -247,13 +251,34 @@ class DelegatedContentRouting implements ContentRouting, Startable {
async provide (key: CID, options: HTTPClientExtraOptions & AbortOptions = {}): Promise<void> {
log('provide starts: %c', key)
options.timeout = options.timeout ?? DEFAULT_TIMEOUT
options.signal = anySignal([this.abortController.signal].concat((options.signal != null) ? [options.signal] : []))
const signal = anySignal([this.abortController.signal, options.signal])
const onStart = defer()
const onFinish = defer()

await this.httpQueueRefs.add(async () => {
await this.client.block.stat(key, options)
await drain(this.client.dht.provide(key, options))
void this.httpQueue.add(async () => {
onStart.resolve()
return onFinish.promise
})
log('provide finished: %c', key)

try {
await onStart.promise

await this.client.block.stat(key, {
...options,
signal
})
await drain(this.client.dht.provide(key, {
...options,
signal
}))
} catch (err) {
log.error('provide errored:', err)
throw err
} finally {
signal.clear()
onFinish.resolve()
log('provide finished: %c', key)
}
}

/**
Expand All @@ -264,13 +289,30 @@ class DelegatedContentRouting implements ContentRouting, Startable {
async put (key: Uint8Array, value: Uint8Array, options: HTTPClientExtraOptions & AbortOptions = {}): Promise<void> {
log('put value start: %b', key)
options.timeout = options.timeout ?? DEFAULT_TIMEOUT
options.signal = anySignal([this.abortController.signal].concat((options.signal != null) ? [options.signal] : []))
const signal = anySignal([this.abortController.signal, options.signal])
const onStart = defer()
const onFinish = defer()

await this.httpQueue.add(async () => {
await drain(this.client.dht.put(key, value, options))
void this.httpQueue.add(async () => {
onStart.resolve()
return onFinish.promise
})

log('put value finished: %b', key)
try {
await onStart.promise

await drain(this.client.dht.put(key, value, {
...options,
signal
}))
} catch (err) {
log.error('put errored:', err)
throw err
} finally {
signal.clear()
onFinish.resolve()
log('put finished: %b', key)
}
}

/**
Expand All @@ -281,23 +323,37 @@ class DelegatedContentRouting implements ContentRouting, Startable {
async get (key: Uint8Array, options: HTTPClientExtraOptions & AbortOptions = {}): Promise<Uint8Array> {
log('get value start: %b', key)
options.timeout = options.timeout ?? DEFAULT_TIMEOUT
options.signal = anySignal([this.abortController.signal].concat((options.signal != null) ? [options.signal] : []))

const value = await this.httpQueue.add(async () => {
for await (const event of this.client.dht.get(key, options)) {
const signal = anySignal([this.abortController.signal, options.signal])
const onStart = defer()
const onFinish = defer()

void this.httpQueue.add(async () => {
onStart.resolve()
return onFinish.promise
})

try {
await onStart.promise

for await (const event of this.client.dht.get(key, {
...options,
signal
})) {
if (event.name === 'VALUE') {
log('get value finished: %b', key)
return event.value
}
}

throw errCode(new Error('Not found'), 'ERR_NOT_FOUND')
})

if (value === undefined) {
throw errCode(new Error('Not found'), 'ERR_NOT_FOUND')
} else {
return value
} catch (err) {
log.error('put errored:', err)
throw err
} finally {
signal.clear()
onFinish.resolve()
log('put finished: %b', key)
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion test/index.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,10 @@ import { peerIdFromString } from '@libp2p/peer-id'
import { expect } from 'aegir/chai'
// @ts-expect-error no types
import goIpfs from 'go-ipfs'
import { create, type Options, CID as IPFSCID } from 'ipfs-http-client'
import { type Controller, createFactory } from 'ipfsd-ctl'
import all from 'it-all'
import drain from 'it-drain'
import { create, type Options, CID as IPFSCID } from 'kubo-rpc-client'
import { CID } from 'multiformats/cid'
import pDefer from 'p-defer'
import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string'
Expand Down