-
Notifications
You must be signed in to change notification settings - Fork 0
/
index.js
95 lines (81 loc) · 2.67 KB
/
index.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
var muxrpc = require('muxrpc')
var pull = require('pull-stream')
var Obv = require('obv')
var plugify = require('./plugify')
const {syncToAsync} = require('./util')
module.exports = function (inputChannel, outputChannel, opts) {
opts = opts || {}
// muxrpc factory method, wait until muxrpc api is ready, then connect
// any streams that are waiting, and wait for more connections
var outputApi = Obv()
function onConnect (stream) {
if(outputApi.value) {
var pipe = muxrpc(null, outputApi.value.manifest) (outputApi.value.api)
pull(stream, pipe.createStream(), stream)
} else {
outputApi.once(val => {
var pipe = muxrpc(null, val.manifest) (val.api)
pull(stream, pipe.createStream(), stream)
})
}
}
// If outputChannel is a duplex stream, just use that, otherwise
// connect the outputChannel to the muxrpc factory
if (outputChannel.sink && outputChannel.source) {
onConnect(outputChannel)
} else {
outputChannel(onConnect)
}
return new Promise(function (resolve, reject) {
// connect to the pipe input
var sbot = muxrpc((err, manifest, sbot) => {
if (err) {
return reject(err)
}
// If we forward piped in methods, include them in the manifest
// otherwise exclude them, so they're local methods only
if (opts.forward)
sbot.manifest = syncToAsync(manifest)
sbot = plugify(sbot, opts)
var started
var createPipe = function (options) {
started = true
// Wrap api in another object to override manifest as a getter
// without changing api exposed to plugins
var serveApi = {
manifest (cb) {
cb(null, sbot.manifest)
}
}
if (!sbot.manifest['manifest']) {
sbot.manifest['manifest'] = 'async'
}
serveApi.__proto__ = sbot
// set the api to be served, which should then start hooking
// up active connects to the rpc endpoint
outputApi.set({manifest: sbot.manifest, api: serveApi})
return sbot
}
createPipe.close = function () {
if (outputChannel.close) {
outputChannel.close()
sbot.close()
}
}
createPipe.use = function (plugin, overwrite) {
if (started)
throw new Error('pipe already started, cannot load plugins anymore')
sbot.use(plugin, overwrite)
return createPipe
}
resolve(createPipe)
}) ()
if (inputChannel.then) {
inputChannel.then(function (stream) {
pull(stream, sbot.createStream(), stream)
}, reject)
} else {
pull(inputChannel, sbot.createStream(), inputChannel)
}
})
}