Skip to content
This repository has been archived by the owner on Jul 21, 2023. It is now read-only.

refactor: switch to async iterators #82

Closed
wants to merge 17 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions .aegir.js
Original file line number Diff line number Diff line change
@@ -1,21 +1,21 @@
'use strict'

const multiaddr = require('multiaddr')
const pull = require('pull-stream')

const pipe = require('it-pipe')
const WS = require('./src')

let listener

function boot (done) {
const ws = new WS()
const ma = multiaddr('/ip4/127.0.0.1/tcp/9095/ws')
listener = ws.createListener((conn) => pull(conn, conn))
listener.listen(ma, done)
listener = ws.createListener(conn => pipe(conn, conn))
listener.listen(ma).then(() => done()).catch(done)
listener.on('error', console.error)
}

function shutdown (done) {
listener.close(done)
listener.close().then(done).catch(done)
}

module.exports = {
Expand Down
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ lib-cov

# Coverage directory used by tools like istanbul
coverage
.nyc_output

# Grunt intermediate storage (http://gruntjs.com/creating-plugins#storing-task-files)
.grunt
Expand All @@ -40,4 +41,3 @@ node_modules
*.swp

dist

2 changes: 0 additions & 2 deletions ci/Jenkinsfile

This file was deleted.

16 changes: 9 additions & 7 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,7 @@
"release": "aegir release -t node -t browser ",
"release-minor": "aegir release --type minor -t node -t browser",
"release-major": "aegir release --type major -t node -t browser",
"coverage": "aegir coverage",
"coverage-publish": "aegir coverage --provider coveralls"
"coverage": "nyc --reporter=lcov --reporter=text npm run test:node"
},
"browser": {
"src/listener": "./src/listener.browser.js"
Expand All @@ -40,21 +39,24 @@
},
"homepage": "https://github.com/libp2p/js-libp2p-websockets#readme",
"dependencies": {
"abortable-iterator": "^2.0.0",
"class-is": "^1.1.0",
"debug": "^4.1.1",
"interface-connection": "~0.3.3",
"it-ws": "^2.1.0",
"mafmt": "^6.0.7",
"multiaddr-to-uri": "^4.0.1",
"pull-ws": "hugomrdias/pull-ws#fix/bundle-size"
"multiaddr-to-uri": "^4.0.1"
},
"devDependencies": {
"abort-controller": "^3.0.0",
"aegir": "^18.2.1",
"chai": "^4.2.0",
"dirty-chai": "^2.0.1",
"interface-transport": "~0.3.7",
"interface-transport": "github:libp2p/interface-transport#master",
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If #master, why not released? //cc @jacobheun

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's wrong, this is outdated. We're tackling tcp first and then will circle back to this and the other transports.

"it-goodbye": "^2.0.0",
"it-pipe": "^1.0.0",
"multiaddr": "^6.0.6",
"pull-goodbye": "0.0.2",
"pull-stream": "^3.6.9"
"streaming-iterables": "^4.0.2"
},
"contributors": [
"Chris Campbell <christopher.d.campbell@gmail.com>",
Expand Down
17 changes: 17 additions & 0 deletions src/adapter.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
'use strict'
dirkmc marked this conversation as resolved.
Show resolved Hide resolved

const { Adapter } = require('interface-transport')
const withIs = require('class-is')
const WebSockets = require('./')

// Legacy adapter to old transport & connection interface
class WebSocketsAdapter extends Adapter {
constructor () {
super(new WebSockets())
}
}

module.exports = withIs(WebSocketsAdapter, {
className: 'WebSockets',
symbolName: '@libp2p/js-libp2p-websockets/websockets'
})
85 changes: 52 additions & 33 deletions src/index.js
Original file line number Diff line number Diff line change
@@ -1,54 +1,71 @@
'use strict'

const connect = require('pull-ws/client')
const connect = require('it-ws/client')
const mafmt = require('mafmt')
const withIs = require('class-is')
const Connection = require('interface-connection').Connection

const toUri = require('multiaddr-to-uri')
const debug = require('debug')
const log = debug('libp2p:websockets:dialer')

const log = require('debug')('libp2p:websockets:transport')
const abortable = require('abortable-iterator')
const { AbortError } = require('interface-transport')
const createListener = require('./listener')

class WebSockets {
dial (ma, options, callback) {
if (typeof options === 'function') {
callback = options
options = {}
}
async dial (ma, options) {
dirkmc marked this conversation as resolved.
Show resolved Hide resolved
options = options || {}
log('dialing %s', ma)

callback = callback || function () { }
const socket = connect(toUri(ma), Object.assign({ binary: true }, options))
const getObservedAddrs = () => [ma]

const url = toUri(ma)
log('dialing %s', url)
const socket = connect(url, {
binary: true,
onConnect: (err) => {
callback(err)
if (!options.signal) {
socket.getObservedAddrs = getObservedAddrs
await socket.connected()
log('connected %s', ma)
return socket
}

// Allow abort via signal during connect
let onAbort
const abort = new Promise((resolve, reject) => {
onAbort = () => {
reject(new AbortError())
socket.close()
}

// Already aborted?
if (options.signal.aborted) return onAbort()
options.signal.addEventListener('abort', onAbort)
})

const conn = new Connection(socket)
conn.getObservedAddrs = (cb) => cb(null, [ma])
conn.close = (cb) => socket.close(cb)
try {
await Promise.race([abort, socket.connected()])
} finally {
options.signal.removeEventListener('abort', onAbort)
}

return conn
log('connected %s', ma)
return {
sink: async source => {
try {
await socket.sink(abortable(source, options.signal))
} catch (err) {
// Re-throw non-aborted errors
if (err.type !== 'aborted') throw err
// Otherwise, this is fine...
await socket.close()
}
},
source: abortable(socket.source, options.signal),
getObservedAddrs
}
}

createListener (options, handler) {
if (typeof options === 'function') {
handler = options
options = {}
}

return createListener(options, handler)
}

filter (multiaddrs) {
if (!Array.isArray(multiaddrs)) {
multiaddrs = [multiaddrs]
}
multiaddrs = Array.isArray(multiaddrs) ? multiaddrs : [multiaddrs]

return multiaddrs.filter((ma) => {
if (ma.protoNames().includes('p2p-circuit')) {
Expand All @@ -59,10 +76,12 @@ class WebSockets {
ma = ma.decapsulate('ipfs')
}

return mafmt.WebSockets.matches(ma) ||
mafmt.WebSocketsSecure.matches(ma)
return mafmt.WebSockets.matches(ma) || mafmt.WebSocketsSecure.matches(ma)
})
}
}

module.exports = withIs(WebSockets, { className: 'WebSockets', symbolName: '@libp2p/js-libp2p-websockets/websockets' })
module.exports = withIs(WebSockets, {
className: 'WebSockets',
symbolName: '@libp2p/js-libp2p-websockets/websockets'
})
40 changes: 19 additions & 21 deletions src/listener.js
Original file line number Diff line number Diff line change
@@ -1,43 +1,41 @@
'use strict'

const Connection = require('interface-connection').Connection
const multiaddr = require('multiaddr')
const os = require('os')

function noop () {}

const createServer = require('pull-ws/server') || noop
const { createServer } = require('it-ws')

module.exports = (options, handler) => {
const listener = createServer(options, (socket) => {
socket.getObservedAddrs = (callback) => {
// TODO research if we can reuse the address in anyway
return callback(null, [])
}
if (typeof options === 'function') {
handler = options
options = {}
}

options = options || {}

handler(new Connection(socket))
})
const server = createServer(options, handler ? socket => {
socket.getObservedAddrs = () => []
handler(socket)
} : null)

let listeningMultiaddr

listener._listen = listener.listen
listener.listen = (ma, callback) => {
callback = callback || noop
const listen = server.listen
server.listen = ma => {
listeningMultiaddr = ma

if (ma.protoNames().includes('ipfs')) {
ma = ma.decapsulate('ipfs')
}

listener._listen(ma.toOptions(), callback)
return listen(ma.toOptions())
}

listener.getAddrs = (callback) => {
server.getAddrs = () => {
const multiaddrs = []
const address = listener.address()
const address = server.address()

if (!address) {
return callback(new Error('Listener is not ready yet'))
throw new Error('Listener is not ready yet')
}

let ipfsId = listeningMultiaddr.getPeerId()
Expand Down Expand Up @@ -65,8 +63,8 @@ module.exports = (options, handler) => {
}
}

callback(null, multiaddrs)
return multiaddrs
}

return listener
return server
}
74 changes: 29 additions & 45 deletions test/browser.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,9 @@ const expect = chai.expect
chai.use(dirtyChai)

const multiaddr = require('multiaddr')
const pull = require('pull-stream')
const goodbye = require('pull-goodbye')
const pipe = require('it-pipe')
const goodbye = require('it-goodbye')
const { collect, take } = require('streaming-iterables')

const WS = require('../src')

Expand All @@ -17,65 +18,48 @@ describe('libp2p-websockets', () => {
let ws
let conn

beforeEach((done) => {
beforeEach(async () => {
ws = new WS()
expect(ws).to.exist()
conn = ws.dial(ma, (err, res) => {
expect(err).to.not.exist()
done()
})
conn = await ws.dial(ma)
})

it('echo', (done) => {
const message = 'Hello World!'
it('echo', async () => {
const message = Buffer.from('Hello World!')
const s = goodbye({ source: [message], sink: collect })

const s = goodbye({
source: pull.values([message]),
sink: pull.collect((err, results) => {
expect(err).to.not.exist()
expect(results).to.eql([message])
done()
})
})

pull(s, conn, s)
const results = await pipe(s, conn, s)
expect(results).to.eql([message])
})

describe('stress', () => {
it('one big write', (done) => {
it('one big write', async () => {
const rawMessage = Buffer.allocUnsafe(1000000).fill('a')

const s = goodbye({
source: pull.values([rawMessage]),
sink: pull.collect((err, results) => {
expect(err).to.not.exist()
expect(results).to.eql([rawMessage])
done()
})
})
pull(s, conn, s)
const s = goodbye({ source: [rawMessage], sink: collect })

const results = await pipe(s, conn, s)
expect(results).to.eql([rawMessage])
})

it('many writes', function (done) {
this.timeout(10000)
it('many writes', async function () {
this.timeout(100000)
const s = goodbye({
source: pull(
pull.infinite(),
pull.take(1000),
pull.map((val) => Buffer.from(val.toString()))
source: pipe(
{
[Symbol.iterator] () { return this },
next: () => ({ done: false, value: Buffer.from(Math.random().toString()) })
},
take(20000)
),
sink: pull.collect((err, result) => {
expect(err).to.not.exist()
expect(result).to.have.length(1000)
done()
})
sink: collect
})

pull(s, conn, s)
const result = await pipe(s, conn, s)
expect(result).to.have.length(20000)
})
})
})

it('.createServer throws in browser', () => {
expect(new WS().createListener).to.throw()
it('.createServer throws in browser', () => {
expect(new WS().createListener).to.throw()
})
})
Loading