Skip to content
This repository has been archived by the owner on Jul 21, 2023. It is now read-only.

[WIP] feat: initial implementation of pull-streams based multiplexer #76

Closed
wants to merge 12 commits into from
13 changes: 13 additions & 0 deletions benchmarks/libp2p-mplex/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package-lock.json
yarn.lock
docs

**/node_modules
**/*.log
test/setup/tmp-disposable-nodes-addrs.json
dist
coverage
**/*.swp
examples/sub-module/**/bundle.js
examples/sub-module/**/*-minified.js
examples/sub-module/*-bundle.js
18 changes: 18 additions & 0 deletions benchmarks/libp2p-mplex/package.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
{
"name": "libp2p-mplex-profile",
"version": "1.0.0",
"description": "",
"main": "profile.js",
"scripts": {
"test": "echo \"Error: no test specified\" && exit 1"
},
"author": "",
"license": "MIT",
"dependencies": {
"async": "^2.6.0",
"libp2p-mplex": "0.7.0",
"pull-generate": "^2.2.0",
"pull-pair": "^1.1.0",
"pull-stream": "^3.6.8"
}
}
85 changes: 85 additions & 0 deletions benchmarks/libp2p-mplex/profile.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
'use strict'

const pair = require('pull-pair/duplex')
const pull = require('pull-stream')
const generate = require('pull-generate')
const each = require('async/each')
const eachLimit = require('async/eachLimit')
const setImmediate = require('async/setImmediate')

const Plex = require('libp2p-mplex')

const spawn = (nStreams, nMsg, done, limit) => {
const p = pair()

const check = marker(2 * nStreams, done)

const msg = 'simple msg'

const listener = Plex.dialer(p[0])
const dialer = Plex.listener(p[1])

listener.on('stream', (stream) => {
pull(
stream,
pull.onEnd((err) => {
if (err) { return done(err) }
check()
pull(pull.empty(), stream)
})
)
})

const numbers = []
for (let i = 0; i < nStreams; i++) {
numbers.push(i)
}

const spawnStream = (n, cb) => {
const stream = dialer.newStream()
pull(
generate(0, (s, cb) => {
setImmediate(() => {
cb(s === nMsg ? true : null, msg, s + 1)
})
}),
stream,
pull.collect((err) => {
if (err) { return done(err) }
check()
cb()
})
)
}

if (limit) {
eachLimit(numbers, limit, spawnStream, () => {})
} else {
each(numbers, spawnStream, () => {})
}
}

function marker (n, done) {
let i = 0
return (err) => {
i++

// console.log(`${i} out of ${n} interactions`)
if (err) {
console.error('Failed after %s iterations', i)
return done(err)
}

if (i === n) {
done()
}
}
}

spawn(1000, 1000, (err) => {
if (err) {
throw err
}
console.log('Done')
process.exit(0)
}, 50000)
13 changes: 13 additions & 0 deletions benchmarks/libp2p-pull-mplex/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package-lock.json
yarn.lock
docs

**/node_modules
**/*.log
test/setup/tmp-disposable-nodes-addrs.json
dist
coverage
**/*.swp
examples/sub-module/**/bundle.js
examples/sub-module/**/*-minified.js
examples/sub-module/*-bundle.js
17 changes: 17 additions & 0 deletions benchmarks/libp2p-pull-mplex/package.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
{
"name": "libp2p-pull-mplex-profile",
"version": "1.0.0",
"description": "",
"main": "profile.js",
"scripts": {
"test": "echo \"Error: no test specified\" && exit 1"
},
"author": "",
"license": "ISC",
"dependencies": {
"async": "^2.6.0",
"pull-generate": "^2.2.0",
"pull-pair": "^1.1.0",
"pull-stream": "^3.6.8"
}
}
85 changes: 85 additions & 0 deletions benchmarks/libp2p-pull-mplex/profile.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
'use strict'

const pair = require('pull-pair/duplex')
const pull = require('pull-stream')
const generate = require('pull-generate')
const each = require('async/each')
const eachLimit = require('async/eachLimit')
const setImmediate = require('async/setImmediate')

const Plex = require('../../src')

const spawn = (nStreams, nMsg, done, limit) => {
const p = pair()

const check = marker(2 * nStreams, done)

const msg = 'simple msg'

const listener = Plex.dialer(p[0])
const dialer = Plex.listener(p[1])

listener.on('stream', (stream) => {
pull(
stream,
pull.onEnd((err) => {
if (err) { return done(err) }
check()
pull(pull.empty(), stream)
})
)
})

const numbers = []
for (let i = 0; i < nStreams; i++) {
numbers.push(i)
}

const spawnStream = (n, cb) => {
const stream = dialer.newStream()
pull(
generate(0, (s, cb) => {
setImmediate(() => {
cb(s === nMsg ? true : null, msg, s + 1)
})
}),
stream,
pull.collect((err) => {
if (err) { return done(err) }
check()
cb()
})
)
}

if (limit) {
eachLimit(numbers, limit, spawnStream, () => {})
} else {
each(numbers, spawnStream, () => {})
}
}

function marker (n, done) {
let i = 0
return (err) => {
i++

// console.log(`${i} out of ${n} interactions`)
if (err) {
console.error('Failed after %s iterations', i)
return done(err)
}

if (i === n) {
done()
}
}
}

spawn(1000, 1000, (err) => {
if (err) {
throw err
}
console.log('Done')
process.exit(0)
}, 50000)
13 changes: 13 additions & 0 deletions benchmarks/multiplex/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package-lock.json
yarn.lock
docs

**/node_modules
**/*.log
test/setup/tmp-disposable-nodes-addrs.json
dist
coverage
**/*.swp
examples/sub-module/**/bundle.js
examples/sub-module/**/*-minified.js
examples/sub-module/*-bundle.js
15 changes: 15 additions & 0 deletions benchmarks/multiplex/package.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
{
"name": "multiplex-profile",
"version": "1.0.0",
"description": "",
"main": "profile.js",
"scripts": {
"test": "echo \"Error: no test specified\" && exit 1"
},
"author": "",
"license": "ISC",
"dependencies": {
"async": "^2.6.0",
"multiplex": "^6.7.0"
}
}
79 changes: 79 additions & 0 deletions benchmarks/multiplex/profile.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
'use strict'

const each = require('async/each')
const eachLimit = require('async/eachLimit')

const Plex = require('multiplex')

const spawn = (nStreams, nMsg, done, limit) => {
const check = marker(2 * nStreams, done)

const msg = 'simple msg'

const listener = new Plex()
const dialer = new Plex()

listener.on('stream', (stream) => {
stream.once('data', () => {
check()
})

stream.once('error', (err) => {
done(err)
})
})

dialer.pipe(listener).pipe(dialer)

const numbers = []
for (let i = 1; i <= nStreams; i++) {
numbers.push(i)
}

const spawnStream = (n, cb) => {
const stream = dialer.createStream()

stream.once('error', (err) => {
return done(err)
})

for (let i = 0; i <= nMsg; i++) {
stream.write(msg + i)
}

check()
stream.end()
cb()
}

if (limit) {
eachLimit(numbers, limit, spawnStream, () => {})
} else {
each(numbers, spawnStream, () => {})
}
}

function marker (n, done) {
let i = 0
return (err) => {
i++

// console.log(`${i} out of ${n} interactions`)
if (err) {
console.error('Failed after %s iterations', i)
return done(err)
}

if (i === n) {
done()
}
}
}

spawn(1000, 10000, (err) => {
if (err) {
throw err
}
console.log('Done')
process.exit(0)
}, 50000)
13 changes: 13 additions & 0 deletions benchmarks/pull-mplex/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package-lock.json
yarn.lock
docs

**/node_modules
**/*.log
test/setup/tmp-disposable-nodes-addrs.json
dist
coverage
**/*.swp
examples/sub-module/**/bundle.js
examples/sub-module/**/*-minified.js
examples/sub-module/*-bundle.js
18 changes: 18 additions & 0 deletions benchmarks/pull-mplex/package.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
{
"name": "libp2p-pull-mplex-profile",
"version": "1.0.0",
"description": "",
"main": "profile.js",
"scripts": {
"test": "echo \"Error: no test specified\" && exit 1"
},
"author": "",
"license": "MIT",
"dependencies": {
"async": "^2.6.0",
"pull-generate": "^2.2.0",
"pull-pair": "^1.1.0",
"pull-stream": "^3.6.8",
"pull-mplex": "0.0.1"
}
}
Loading