Skip to content
This repository has been archived by the owner on Feb 12, 2024. It is now read-only.

Commit

Permalink
feat: ipns over pubsub (#1559)
Browse files Browse the repository at this point in the history
Co-Authored-By: vasco-santos <vasco.santos@ua.pt>
  • Loading branch information
vasco-santos authored and Alan Shaw committed Dec 4, 2018
1 parent 6f1381f commit 8712542
Show file tree
Hide file tree
Showing 23 changed files with 818 additions and 31 deletions.
4 changes: 4 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -303,6 +303,7 @@ Configure remote preload nodes. The remote will preload content added on this no
Enable and configure experimental features.

- `pubsub` (boolean): Enable libp2p pub-sub. (Default: `false`)
- `ipnsPubsub` (boolean): Enable pub-sub on IPNS. (Default: `false`)
- `sharding` (boolean): Enable directory sharding. Directories that have many child objects will be represented by multiple DAG nodes instead of just one. It can improve lookup performance when a directory has several thousand files or more. (Default: `false`)
- `dht` (boolean): Enable KadDHT. **This is currently not interoperable with `go-ipfs`.**

Expand Down Expand Up @@ -561,6 +562,9 @@ The core API is grouped into several areas:

- [name](https://github.com/ipfs/interface-ipfs-core/blob/master/SPEC/NAME.md)
- [`ipfs.name.publish(value, [options], [callback])`](https://github.com/ipfs/interface-ipfs-core/blob/master/SPEC/NAME.md#namepublish)
- [`ipfs.name.pubsub.cancel(arg, [callback])`](https://github.com/ipfs/interface-ipfs-core/blob/master/SPEC/NAME.md#namepubsubcancel)
- [`ipfs.name.pubsub.state([callback])`](https://github.com/ipfs/interface-ipfs-core/blob/master/SPEC/NAME.md#namepubsubstate)
- [`ipfs.name.pubsub.subs([callback])`](https://github.com/ipfs/interface-ipfs-core/blob/master/SPEC/NAME.md#namepubsubsubs)
- [`ipfs.name.resolve(value, [options], [callback])`](https://github.com/ipfs/interface-ipfs-core/blob/master/SPEC/NAME.md#nameresolve)

#### Crypto and Key Management
Expand Down
6 changes: 4 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@
"execa": "^1.0.0",
"form-data": "^2.3.3",
"hat": "0.0.3",
"interface-ipfs-core": "~0.88.0",
"interface-ipfs-core": "~0.89.0",
"ipfsd-ctl": "~0.40.1",
"ncp": "^2.0.0",
"qs": "^6.5.2",
Expand All @@ -88,7 +88,9 @@
"byteman": "^1.3.5",
"cid-tool": "~0.2.0",
"cids": "~0.5.5",
"class-is": "^1.1.0",
"datastore-core": "~0.6.0",
"datastore-pubsub": "~0.1.1",
"debug": "^4.1.0",
"deep-extend": "~0.6.0",
"err-code": "^1.1.2",
Expand Down Expand Up @@ -118,7 +120,7 @@
"ipld-ethereum": "^2.0.1",
"ipld-git": "~0.2.2",
"ipld-zcash": "~0.1.6",
"ipns": "~0.3.0",
"ipns": "~0.4.3",
"is-ipfs": "~0.4.7",
"is-pull-stream": "~0.0.0",
"is-stream": "^1.1.0",
Expand Down
3 changes: 3 additions & 0 deletions src/cli/commands/daemon.js
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@ module.exports = {
})
.option('local', {
desc: 'Run commands locally to the daemon',
default: false
})
.option('enable-namesys-pubsub', {
type: 'boolean',
default: false
})
Expand Down
18 changes: 18 additions & 0 deletions src/cli/commands/name/pubsub.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
'use strict'

/*
Manage and inspect the state of the IPNS pubsub resolver.
Note: this command is experimental and subject to change as the system is refined.
*/
module.exports = {
command: 'pubsub',

description: 'IPNS pubsub management.',

builder (yargs) {
return yargs.commandDir('pubsub')
},

handler (argv) {
}
}
19 changes: 19 additions & 0 deletions src/cli/commands/name/pubsub/cancel.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
'use strict'

const print = require('../../../utils').print

module.exports = {
command: 'cancel <name>',

describe: 'Cancel a name subscription.',

handler (argv) {
argv.ipfs.name.pubsub.cancel(argv.name, (err, result) => {
if (err) {
throw err
} else {
print(result.canceled ? 'canceled' : 'no subscription')
}
})
}
}
19 changes: 19 additions & 0 deletions src/cli/commands/name/pubsub/state.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
'use strict'

const print = require('../../../utils').print

module.exports = {
command: 'state',

describe: 'Query the state of IPNS pubsub.',

handler (argv) {
argv.ipfs.name.pubsub.state((err, result) => {
if (err) {
throw err
} else {
print(result.enabled ? 'enabled' : 'disabled')
}
})
}
}
21 changes: 21 additions & 0 deletions src/cli/commands/name/pubsub/subs.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
'use strict'

const print = require('../../../utils').print

module.exports = {
command: 'subs',

describe: 'Show current name subscriptions.',

handler (argv) {
argv.ipfs.name.pubsub.subs((err, result) => {
if (err) {
throw err
} else {
result.forEach((s) => {
print(s)
})
}
})
}
}
92 changes: 92 additions & 0 deletions src/core/components/name-pubsub.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
'use strict'

const debug = require('debug')
const errcode = require('err-code')
const promisify = require('promisify-es6')

const IpnsPubsubDatastore = require('../ipns/routing/pubsub-datastore')

const log = debug('jsipfs:name-pubsub')
log.error = debug('jsipfs:name-pubsub:error')

// Is pubsub enabled
const isNamePubsubEnabled = (node) => {
try {
return Boolean(getPubsubRouting(node))
} catch (err) {
return false
}
}

// Get pubsub from IPNS routing
const getPubsubRouting = (node) => {
if (!node._ipns || !node._options.EXPERIMENTAL.ipnsPubsub) {
const errMsg = 'IPNS pubsub subsystem is not enabled'

throw errcode(errMsg, 'ERR_IPNS_PUBSUB_NOT_ENABLED')
}

// Only one store and it is pubsub
if (IpnsPubsubDatastore.isIpnsPubsubDatastore(node._ipns.routing)) {
return node._ipns.routing
}

// Find in tiered
const pubsub = (node._ipns.routing.stores || []).find(s => IpnsPubsubDatastore.isIpnsPubsubDatastore(s))

if (!pubsub) {
const errMsg = 'IPNS pubsub datastore not found'

throw errcode(errMsg, 'ERR_PUBSUB_DATASTORE_NOT_FOUND')
}

return pubsub
}

module.exports = function namePubsub (self) {
return {
/**
* Query the state of IPNS pubsub.
*
* @returns {Promise|void}
*/
state: promisify((callback) => {
callback(null, {
enabled: isNamePubsubEnabled(self)
})
}),
/**
* Cancel a name subscription.
*
* @param {String} name subscription name.
* @param {function(Error)} [callback]
* @returns {Promise|void}
*/
cancel: promisify((name, callback) => {
let pubsub
try {
pubsub = getPubsubRouting(self)
} catch (err) {
return callback(err)
}

pubsub.cancel(name, callback)
}),
/**
* Show current name subscriptions.
*
* @param {function(Error)} [callback]
* @returns {Promise|void}
*/
subs: promisify((callback) => {
let pubsub
try {
pubsub = getPubsubRouting(self)
} catch (err) {
return callback(err)
}

pubsub.getSubscriptions(callback)
})
}
}
4 changes: 3 additions & 1 deletion src/core/components/name.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ const errcode = require('err-code')
const log = debug('jsipfs:name')
log.error = debug('jsipfs:name:error')

const namePubsub = require('./name-pubsub')
const utils = require('../utils')
const path = require('../ipns/path')

Expand Down Expand Up @@ -161,6 +162,7 @@ module.exports = function name (self) {
}

self._ipns.resolve(name, resolveOptions, callback)
})
}),
pubsub: namePubsub(self)
}
}
13 changes: 12 additions & 1 deletion src/core/components/start.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,13 @@

const series = require('async/series')
const Bitswap = require('ipfs-bitswap')
const get = require('lodash/get')
const setImmediate = require('async/setImmediate')
const promisify = require('promisify-es6')
const { TieredDatastore } = require('datastore-core')

const IPNS = require('../ipns')
const PubsubDatastore = require('../ipns/routing/pubsub-datastore')
const OfflineDatastore = require('../ipns/routing/offline-datastore')

module.exports = (self) => {
Expand Down Expand Up @@ -41,7 +43,16 @@ module.exports = (self) => {
// Setup online routing for IPNS with a tiered routing composed by a DHT and a Pubsub router (if properly enabled)
const ipnsStores = []

// TODO Add IPNS pubsub if enabled
// Add IPNS pubsub if enabled
let pubsubDs
if (get(self._options, 'EXPERIMENTAL.ipnsPubsub', false)) {
const pubsub = self._libp2pNode.pubsub
const localDatastore = self._repo.datastore
const peerId = self._peerInfo.id

pubsubDs = new PubsubDatastore(pubsub, localDatastore, peerId)
ipnsStores.push(pubsubDs)
}

// NOTE: IPNS routing is being replaced by the local repo datastore while the IPNS over DHT is not ready
// When DHT is added, if local option enabled, should receive offlineDatastore as well
Expand Down
1 change: 1 addition & 0 deletions src/core/config.js
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ const schema = Joi.object().keys({
}).allow(null),
EXPERIMENTAL: Joi.object().keys({
pubsub: Joi.boolean(),
ipnsPubsub: Joi.boolean(),
sharding: Joi.boolean(),
dht: Joi.boolean()
}).allow(null),
Expand Down
8 changes: 8 additions & 0 deletions src/core/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,14 @@ class IPFS extends EventEmitter {
if (this._options.EXPERIMENTAL.pubsub) {
this.log('EXPERIMENTAL pubsub is enabled')
}
if (this._options.EXPERIMENTAL.ipnsPubsub) {
if (!this._options.EXPERIMENTAL.pubsub) {
this.log('EXPERIMENTAL pubsub is enabled to use IPNS pubsub')
this._options.EXPERIMENTAL.pubsub = true
}

this.log('EXPERIMENTAL IPNS pubsub is enabled')
}
if (this._options.EXPERIMENTAL.sharding) {
this.log('EXPERIMENTAL sharding is enabled')
}
Expand Down
20 changes: 4 additions & 16 deletions src/core/ipns/publisher.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
'use strict'

const PeerId = require('peer-id')
const Record = require('libp2p-record').Record
const { Key } = require('interface-datastore')
const series = require('async/series')
const errcode = require('err-code')
Expand Down Expand Up @@ -97,19 +96,17 @@ class IpnsPublisher {
return callback(errcode(new Error(errMsg), 'ERR_INVALID_DATASTORE_KEY'))
}

let rec
let entryData
try {
// Marshal record
const entryData = ipns.marshal(entry)
// Marshal to libp2p record
rec = new Record(key.toBuffer(), entryData)
entryData = ipns.marshal(entry)
} catch (err) {
log.error(err)
return callback(err)
}

// Add record to routing (buffer key)
this._routing.put(key.toBuffer(), rec.serialize(), (err, res) => {
this._routing.put(key.toBuffer(), entryData, (err, res) => {
if (err) {
const errMsg = `ipns record for ${key.toString()} could not be stored in the routing`

Expand Down Expand Up @@ -137,17 +134,8 @@ class IpnsPublisher {
return callback(errcode(new Error(errMsg), 'ERR_UNDEFINED_PARAMETER'))
}

let rec
try {
// Marshal to libp2p record
rec = new Record(key.toBuffer(), publicKey.bytes)
} catch (err) {
log.error(err)
return callback(err)
}

// Add public key to routing (buffer key)
this._routing.put(key.toBuffer(), rec.serialize(), (err, res) => {
this._routing.put(key.toBuffer(), publicKey.bytes, (err, res) => {
if (err) {
const errMsg = `public key for ${key.toString()} could not be stored in the routing`

Expand Down
4 changes: 1 addition & 3 deletions src/core/ipns/resolver.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
'use strict'

const ipns = require('ipns')
const Record = require('libp2p-record').Record
const PeerId = require('peer-id')
const errcode = require('err-code')

Expand Down Expand Up @@ -119,8 +118,7 @@ class IpnsResolver {

let ipnsEntry
try {
const record = Record.deserialize(res)
ipnsEntry = ipns.unmarshal(record.value)
ipnsEntry = ipns.unmarshal(res)
} catch (err) {
const errMsg = `found ipns record that we couldn't convert to a value`

Expand Down
Loading

0 comments on commit 8712542

Please sign in to comment.