Skip to content
This repository has been archived by the owner on Apr 24, 2023. It is now read-only.

follow new transport and connection spec #6

Merged
merged 1 commit into from
Jun 22, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,14 @@
"bl": "^1.1.2",
"chai": "^3.5.0",
"pre-commit": "^1.1.3",
"run-series": "^1.1.4",
"webrtcsupport": "^2.2.0"
},
"dependencies": {
"debug": "^2.2.0",
"duplexify": "^3.4.3",
"hapi": "^13.4.1",
"interface-connection": "^0.1.3",
"mafmt": "^2.1.0",
"minimist": "^1.2.0",
"peer-id": "^0.7.0",
Expand All @@ -61,4 +63,4 @@
"David Dias <daviddias.p@gmail.com>",
"dignifiedquire <dignifiedquire@gmail.com>"
]
}
}
181 changes: 104 additions & 77 deletions src/webrtc-star/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,135 +4,161 @@ const debug = require('debug')
const log = debug('libp2p:webrtc-star')
const multiaddr = require('multiaddr')
const mafmt = require('mafmt')
const parallel = require('run-parallel')
const io = require('socket.io-client')
const EE = require('events').EventEmitter
const SimplePeer = require('simple-peer')
const Duplexify = require('duplexify')
const peerId = require('peer-id')
const PeerInfo = require('peer-info')
const Connection = require('interface-connection').Connection

exports = module.exports = WebRTCStar

const sioOptions = {
transports: ['websocket'],
'force new connection': true
}

function WebRTCStar () {
if (!(this instanceof WebRTCStar)) {
return new WebRTCStar()
}

const listeners = []
let mhSelf
let maSelf
const listeners = {}
this.discovery = new EE()

this.dial = function (multiaddr, options) {
if (!options) {
this.dial = function (ma, options, callback) {
if (typeof options === 'function') {
callback = options
options = {}
}

options.ready = options.ready || function noop () {}
const pt = new Duplexify()
if (!callback) {
callback = function noop () {}
}

const conn = new Connection()

const intentId = (~~(Math.random() * 1e9)).toString(36) + Date.now()
const sioClient = listeners[0]
const conn = new SimplePeer({ initiator: true, trickle: false })
const sioClient = listeners[Object.keys(listeners)[0]].io
const channel = new SimplePeer({ initiator: true, trickle: false })

conn.on('signal', function (signal) {
channel.on('signal', function (signal) {
sioClient.emit('ss-handshake', {
intentId: intentId,
srcMultiaddr: mhSelf.toString(),
dstMultiaddr: multiaddr.toString(),
srcMultiaddr: maSelf.toString(),
dstMultiaddr: ma.toString(),
signal: signal
})
})

channel.on('timeout', () => {
conn.emit('timeout')
})

channel.on('error', (err) => {
callback(err)
conn.emit('error', err)
})

sioClient.on('ws-handshake', (offer) => {
if (offer.intentId !== intentId || !offer.answer) {
return
}

conn.on('connect', () => {
pt.setReadable(conn)
pt.setWritable(conn)
channel.on('connect', () => {
conn.setInnerConn(channel)

pt.destroy = conn.destroy.bind(conn)
conn.destroy = channel.destroy.bind(channel)

conn.on('close', () => {
pt.emit('close')
channel.on('close', () => {
conn.emit('close')
})

pt.getObservedAddrs = () => {
return [multiaddr]
conn.getObservedAddrs = () => {
return [ma]
}
options.ready(null, pt)

conn.emit('connect')
callback(null, conn)
})
conn.signal(offer.signal)

channel.signal(offer.signal)
})

return pt
return conn
}

this.createListener = (multiaddrs, handler, callback) => {
if (!Array.isArray(multiaddrs)) {
multiaddrs = [multiaddrs]
this.createListener = (options, handler) => {
if (typeof options === 'function') {
handler = options
options = {}
}

const sioOptions = {
transports: ['websocket'],
'force new connection': true
}
// for now it only supports listening in one signalling server
// no technical limitation why not to do more :)
const mh = multiaddrs[0]
mhSelf = mh
// I know.. "websockets connects on a http endpoint, but through a
// tcp port"
const sioUrl = 'http://' + mh.toString().split('/')[3] + ':' + mh.toString().split('/')[5]
const sioClient = io.connect(sioUrl, sioOptions)
sioClient.on('connect_error', callback)
sioClient.on('connect', () => {
sioClient.emit('ss-join', multiaddrs[0].toString())
sioClient.on('ws-handshake', incommingDial)
sioClient.on('ws-peer', peerDiscovered.bind(this))
listeners.push(sioClient)
callback()
})
const listener = new EE()

function incommingDial (offer) {
if (offer.answer) {
return
listener.listen = (ma, callback) => {
if (!callback) {
callback = function noop () {}
}
maSelf = ma

const sioUrl = 'http://' + ma.toString().split('/')[3] + ':' + ma.toString().split('/')[5]

listener.io = io.connect(sioUrl, sioOptions)
listener.io.on('connect_error', callback)
listener.io.on('connect', () => {
listener.io.emit('ss-join', ma.toString())
listener.io.on('ws-handshake', incommingDial)
listener.io.on('ws-peer', peerDiscovered.bind(this))
listener.emit('listening')
callback()
})

const conn = new SimplePeer({ trickle: false })
function incommingDial (offer) {
if (offer.answer) { return }

conn.on('connect', () => {
conn.getObservedAddrs = () => {
return []
}
const channel = new SimplePeer({ trickle: false })
const conn = Connection(channel)

handler(conn)
})
channel.on('connect', () => {
conn.getObservedAddrs = () => {
return [offer.srcMultiaddr]
}

conn.on('signal', function (signal) {
offer.signal = signal
offer.answer = true
sioClient.emit('ss-handshake', offer)
})
listener.emit('connection', conn)
handler(conn)
})

conn.signal(offer.signal)
}
}
channel.on('signal', (signal) => {
offer.signal = signal
offer.answer = true
listener.io.emit('ss-handshake', offer)
})

this.close = (callback) => {
if (listeners.length === 0) {
log('Called close with no active listeners')
return callback()
channel.signal(offer.signal)
}
}

parallel(listeners.map((listener) => {
return (cb) => {
listener.emit('ss-leave')
cb()
listener.close = (callback) => {
if (!callback) {
callback = function noop () {}
}
}), callback)
listener.io.emit('ss-leave')
setTimeout(() => {
listener.emit('close')
callback()
}, 100)
}

listener.getAddrs = (callback) => {
process.nextTick(() => {
callback(null, [maSelf])
})
}

listeners[multiaddr.toString()] = listener
return listener
}

this.filter = (multiaddrs) => {
Expand All @@ -144,10 +170,11 @@ function WebRTCStar () {
})
}

function peerDiscovered (mh) {
const id = peerId.createFromB58String(mh.split('/')[8])
function peerDiscovered (maStr) {
log('Peer Discovered:', maStr)
const id = peerId.createFromB58String(maStr.split('/')[8])
const peer = new PeerInfo(id)
peer.multiaddr.add(multiaddr(mh))
peer.multiaddr.add(multiaddr(maStr))
this.discovery.emit('peer', peer)
}
}
7 changes: 4 additions & 3 deletions test/browser.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,12 @@

const webrtcSupport = require('webrtcsupport')

require('./webrtc-star/test-instance.js')
require('./webrtc-star/test-filter.js')
require('./webrtc-star/test-join-and-leave.js')
require('./webrtc-star/test-listen.js')

if (webrtcSupport.support) {
require('./webrtc-star/test-dial-and-listen.js')
require('./webrtc-star/test-dial-and-destroy.js')
require('./webrtc-star/test-dial.js')
require('./webrtc-star/test-discovery.js')
require('./webrtc-star/test-valid-connection.js')
}
73 changes: 0 additions & 73 deletions test/webrtc-star/test-dial-and-destroy.js

This file was deleted.

Loading