Skip to content
This repository has been archived by the owner on Jul 21, 2023. It is now read-only.

Commit

Permalink
refactor: use async datastore (#140)
Browse files Browse the repository at this point in the history
BREAKING CHANGE: The DHT now requires its datastore to have
a promise based api, instead of callbacks. Datastores that use
ipfs/interface-datastore@0.7 or later should be used.
https://github.com/ipfs/interface-datastore/releases/tag/v0.7.0
  • Loading branch information
jacobheun authored and vasco-santos committed Aug 16, 2019
1 parent 83ce93d commit daf9b00
Show file tree
Hide file tree
Showing 16 changed files with 111 additions and 112 deletions.
7 changes: 3 additions & 4 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@
"err-code": "^1.1.2",
"hashlru": "^2.3.0",
"heap": "~0.2.6",
"interface-datastore": "~0.6.0",
"interface-datastore": "~0.7.0",
"k-bucket": "^5.0.0",
"libp2p-crypto": "~0.16.1",
"libp2p-record": "~0.6.2",
Expand All @@ -64,14 +64,13 @@
"protons": "^1.0.1",
"pull-length-prefixed": "^1.3.2",
"pull-stream": "^3.6.9",
"pull-stream-to-async-iterator": "^1.0.1",
"varint": "^5.0.0",
"xor-distance": "^2.0.0"
},
"devDependencies": {
"aegir": "^18.2.1",
"aegir": "^20.0.0",
"chai": "^4.2.0",
"datastore-level": "~0.10.0",
"datastore-level": "~0.12.1",
"dirty-chai": "^2.0.1",
"interface-connection": "~0.3.3",
"libp2p-mplex": "~0.8.5",
Expand Down
14 changes: 7 additions & 7 deletions src/private.js
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ module.exports = (dht) => ({
// Fetch value from ds
let rawRecord
try {
rawRecord = await promisify(cb => dht.datastore.get(dsKey, cb))()
rawRecord = await dht.datastore.get(dsKey)
} catch (err) {
if (err.code === 'ERR_NOT_FOUND') {
return undefined
Expand All @@ -114,7 +114,7 @@ module.exports = (dht) => ({
if (record.timeReceived == null ||
utils.now() - record.timeReceived > c.MAX_RECORD_AGE) {
// If record is bad delete it and return
await promisify(cb => dht.datastore.delete(dsKey, cb))()
await dht.datastore.delete(dsKey)
return undefined
}

Expand Down Expand Up @@ -214,7 +214,7 @@ module.exports = (dht) => ({
promiseToCallback(this._findPeerSingleAsync(peer, target))(callback)
},

async _findPeerSingleAsync (peer, target) {
async _findPeerSingleAsync (peer, target) { // eslint-disable-line require-await
dht._log('_findPeerSingle %s', peer.toB58String())
const msg = new Message(Message.TYPES.FIND_NODE, target.id, 0)
return promisify(callback => dht.network.sendRequest(peer, msg, callback))()
Expand Down Expand Up @@ -262,7 +262,7 @@ module.exports = (dht) => ({
},

async _putLocalAsync (key, rec) {
await promisify(cb => dht.datastore.put(utils.bufferToKey(key), rec, cb))()
await dht.datastore.put(utils.bufferToKey(key), rec)
return undefined
},

Expand Down Expand Up @@ -366,7 +366,7 @@ module.exports = (dht) => ({
async _getLocalAsync (key) {
dht._log('getLocal %b', key)

const raw = await promisify(cb => dht.datastore.get(utils.bufferToKey(key), cb))()
const raw = await dht.datastore.get(utils.bufferToKey(key))
dht._log('found %b in local datastore', key)
const rec = Record.deserialize(raw)

Expand Down Expand Up @@ -436,7 +436,7 @@ module.exports = (dht) => ({
promiseToCallback(this._getValueSingleAsync(peer, key))(callback)
},

async _getValueSingleAsync (peer, key) {
async _getValueSingleAsync (peer, key) { // eslint-disable-line require-await
const msg = new Message(Message.TYPES.GET_VALUE, key, 0)
return promisify(cb => dht.network.sendRequest(peer, msg, cb))()
},
Expand Down Expand Up @@ -598,7 +598,7 @@ module.exports = (dht) => ({
promiseToCallback(this._findProvidersSingleAsync(peer, key))(callback)
},

async _findProvidersSingleAsync (peer, key) {
async _findProvidersSingleAsync (peer, key) { // eslint-disable-line require-await
const msg = new Message(Message.TYPES.GET_PROVIDERS, key.buffer, 0)
return promisify(cb => dht.network.sendRequest(peer, msg, cb))()
}
Expand Down
16 changes: 7 additions & 9 deletions src/providers.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@ const varint = require('varint')
const PeerId = require('peer-id')
const Key = require('interface-datastore').Key
const Queue = require('p-queue')
const promisify = require('promisify-es6')
const toIterator = require('pull-stream-to-async-iterator')

const c = require('./constants')
const utils = require('./utils')
Expand Down Expand Up @@ -91,7 +89,7 @@ class Providers {

// Get all provider entries from the datastore
const query = this.datastore.query({ prefix: c.PROVIDERS_KEY_PREFIX })
for await (const entry of toIterator(query)) {
for await (const entry of query) {
try {
// Add a delete to the batch for each expired entry
const { cid, peerId } = parseProviderKey(entry.key)
Expand All @@ -117,7 +115,7 @@ class Providers {

// Commit the deletes to the datastore
if (deleted.size) {
await promisify(cb => batch.commit(cb))()
await batch.commit()
}

// Clear expired entries from the cache
Expand Down Expand Up @@ -182,7 +180,7 @@ class Providers {
* @param {PeerId} provider
* @returns {Promise}
*/
async addProvider (cid, provider) {
async addProvider (cid, provider) { // eslint-disable-line require-await
return this.syncQueue.add(async () => {
this._log('addProvider %s', cid.toBaseEncodedString())
const provs = await this._getProvidersMap(cid)
Expand All @@ -203,7 +201,7 @@ class Providers {
* @param {CID} cid
* @returns {Promise<Array<PeerId>>}
*/
async getProviders (cid) {
async getProviders (cid) { // eslint-disable-line require-await
return this.syncQueue.add(async () => {
this._log('getProviders %s', cid.toBaseEncodedString())
const provs = await this._getProvidersMap(cid)
Expand Down Expand Up @@ -238,7 +236,7 @@ function makeProviderKey (cid) {
*
* @private
*/
async function writeProviderEntry (store, cid, peer, time) {
async function writeProviderEntry (store, cid, peer, time) { // eslint-disable-line require-await
const dsKey = [
makeProviderKey(cid),
'/',
Expand All @@ -247,7 +245,7 @@ async function writeProviderEntry (store, cid, peer, time) {

const key = new Key(dsKey)
const buffer = Buffer.from(varint.encode(time))
return promisify(cb => store.put(key, buffer, cb))()
return store.put(key, buffer)
}

/**
Expand Down Expand Up @@ -282,7 +280,7 @@ function parseProviderKey (key) {
async function loadProviders (store, cid) {
const providers = new Map()
const query = store.query({ prefix: makeProviderKey(cid) })
for await (const entry of toIterator(query)) {
for await (const entry of query) {
const { peerId } = parseProviderKey(entry.key)
providers.set(peerId, readTime(entry.value))
}
Expand Down
2 changes: 1 addition & 1 deletion src/query/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ class Query {
* @param {Array<PeerId>} peers
* @returns {Promise}
*/
async run (peers) {
async run (peers) { // eslint-disable-line require-await
if (!this.dht._queryManager.running) {
this._log.error('Attempt to run query after shutdown')
return { finalSet: new Set(), paths: [] }
Expand Down
76 changes: 36 additions & 40 deletions src/rpc/handlers/get-providers.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
'use strict'

const CID = require('cids')
const parallel = require('async/parallel')
const PeerInfo = require('peer-info')
const promiseToCallback = require('promise-to-callback')
const errcode = require('err-code')
Expand All @@ -17,63 +16,60 @@ module.exports = (dht) => {
*
* @param {PeerInfo} peer
* @param {Message} msg
* @param {function(Error, Message)} callback
* @returns {undefined}
* @returns {Promise<Message>} Resolves a `Message` response
*/
return function getProviders (peer, msg, callback) {
async function getProvidersAsync (peer, msg) {
let cid
try {
cid = new CID(msg.key)
} catch (err) {
return callback(errcode(new Error(`Invalid CID: ${err.message}`), 'ERR_INVALID_CID'))
throw errcode(new Error(`Invalid CID: ${err.message}`), 'ERR_INVALID_CID')
}

log('%s', cid.toBaseEncodedString())

const dsKey = utils.bufferToKey(cid.buffer)

parallel([
(cb) => dht.datastore.has(dsKey, (err, exists) => {
if (err) {
log.error('Failed to check datastore existence', err)
return cb(null, false)
}
const [has, peers, closer] = await Promise.all([
dht.datastore.has(dsKey),
dht.providers.getProviders(cid),
dht._betterPeersToQueryAsync(msg, peer)
])

cb(null, exists)
}),
(cb) => promiseToCallback(dht.providers.getProviders(cid))(cb),
(cb) => dht._betterPeersToQuery(msg, peer, cb)
], (err, res) => {
if (err) {
return callback(err)
const providers = peers.map((p) => {
if (dht.peerBook.has(p)) {
return dht.peerBook.get(p)
}
const has = res[0]
const closer = res[2]
const providers = res[1].map((p) => {
if (dht.peerBook.has(p)) {
return dht.peerBook.get(p)
}

return dht.peerBook.put(new PeerInfo(p))
})
return dht.peerBook.put(new PeerInfo(p))
})

if (has) {
providers.push(dht.peerInfo)
}
if (has) {
providers.push(dht.peerInfo)
}

const response = new Message(msg.type, msg.key, msg.clusterLevel)
const response = new Message(msg.type, msg.key, msg.clusterLevel)

if (providers.length > 0) {
response.providerPeers = providers
}
if (providers.length > 0) {
response.providerPeers = providers
}

if (closer.length > 0) {
response.closerPeers = closer
}
if (closer.length > 0) {
response.closerPeers = closer
}

log('got %s providers %s closerPeers', providers.length, closer.length)
log('got %s providers %s closerPeers', providers.length, closer.length)
return response
}

callback(null, response)
})
/**
* Process `GetProviders` DHT messages.
*
* @param {PeerInfo} peer
* @param {Message} msg
* @param {function(Error, Message)} callback
* @returns {undefined}
*/
return function getProviders (peer, msg, callback) {
promiseToCallback(getProvidersAsync(peer, msg))(callback)
}
}
3 changes: 2 additions & 1 deletion src/rpc/handlers/put-value.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

const utils = require('../../utils')
const errcode = require('err-code')
const promiseToCallback = require('promise-to-callback')

module.exports = (dht) => {
const log = utils.logger(dht.peerInfo.id, 'rpc:put-value')
Expand Down Expand Up @@ -37,7 +38,7 @@ module.exports = (dht) => {

const key = utils.bufferToKey(record.key)

dht.datastore.put(key, record.serialize(), (err) => {
promiseToCallback(dht.datastore.put(key, record.serialize()))(err => {
if (err) {
return callback(err)
}
Expand Down
2 changes: 1 addition & 1 deletion src/utils.js
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ exports.TimeoutError = class TimeoutError extends Error {
* @private
*/
exports.withTimeout = (asyncFn, time) => {
return async (...args) => {
return async (...args) => { // eslint-disable-line require-await
return Promise.race([
asyncFn(...args),
new Promise((resolve, reject) => {
Expand Down
12 changes: 7 additions & 5 deletions test/kad-dht.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ const PeerBook = require('peer-book')
const Switch = require('libp2p-switch')
const TCP = require('libp2p-tcp')
const Mplex = require('libp2p-mplex')

const promiseToCallback = require('promise-to-callback')
const errcode = require('err-code')

const KadDHT = require('../src')
Expand Down Expand Up @@ -955,14 +955,16 @@ describe('KadDHT', () => {
Buffer.from('hello'),
Buffer.from('world')
)
let received = new Date()
const received = new Date()
received.setDate(received.getDate() - 2)

record.timeReceived = received

waterfall([
(cb) => dht._putLocal(record.key, record.serialize(), cb),
(cb) => dht.datastore.get(kadUtils.bufferToKey(record.key), cb),
(cb) => {
promiseToCallback(dht.datastore.get(kadUtils.bufferToKey(record.key)))(cb)
},
(lookup, cb) => {
expect(lookup).to.exist('Record should be in the local datastore')
cb()
Expand All @@ -972,7 +974,7 @@ describe('KadDHT', () => {
expect(err).to.not.exist()
expect(rec).to.not.exist('Record should have expired')

dht.datastore.get(kadUtils.bufferToKey(record.key), (err, lookup) => {
promiseToCallback(dht.datastore.get(kadUtils.bufferToKey(record.key)))((err, lookup) => {
expect(err).to.exist('Should throw error for not existing')
expect(lookup).to.not.exist('Record should be removed from datastore')
done()
Expand Down Expand Up @@ -1021,7 +1023,7 @@ describe('KadDHT', () => {
// Simulate returning a peer id to query
sinon.stub(dht.routingTable, 'closestPeers').returns([peerInfos[1].id]),
// Simulate going out to the network and returning the record
sinon.stub(dht, '_getValueOrPeersAsync').callsFake(async () => ({ record: rec }))
sinon.stub(dht, '_getValueOrPeersAsync').callsFake(async () => ({ record: rec })) // eslint-disable-line require-await
]

dht.getMany(key, 1, (err, res) => {
Expand Down
2 changes: 1 addition & 1 deletion test/kad-utils.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ describe('kad utils', () => {

describe('withTimeout', () => {
it('rejects with the error in the original function', async () => {
const original = async () => { throw new Error('explode') }
const original = async () => { throw new Error('explode') } // eslint-disable-line require-await
const asyncFn = utils.withTimeout(original, 100)
let err
try {
Expand Down
12 changes: 6 additions & 6 deletions test/providers.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -115,31 +115,31 @@ describe('Providers', () => {
const store = new LevelStore(p)
providers = new Providers(store, infos[2].id, 10)

console.log('starting')
console.log('starting') // eslint-disable-line no-console
const res = await Promise.all([
createValues(100),
createPeerInfo(600)
])

console.log('got values and peers')
console.log('got values and peers') // eslint-disable-line no-console
const values = res[0]
const peers = res[1]
let total = Date.now()
const total = Date.now()

for (const v of values) {
for (const p of peers) {
await providers.addProvider(v.cid, p.id)
}
}

console.log('addProvider %s peers %s cids in %sms', peers.length, values.length, Date.now() - total)
console.log('starting profile with %s peers and %s cids', peers.length, values.length)
console.log('addProvider %s peers %s cids in %sms', peers.length, values.length, Date.now() - total) // eslint-disable-line no-console
console.log('starting profile with %s peers and %s cids', peers.length, values.length) // eslint-disable-line no-console

for (let i = 0; i < 3; i++) {
const start = Date.now()
for (const v of values) {
await providers.getProviders(v.cid)
console.log('query %sms', (Date.now() - start))
console.log('query %sms', (Date.now() - start)) // eslint-disable-line no-console
}
}

Expand Down
Loading

0 comments on commit daf9b00

Please sign in to comment.