Skip to content
This repository has been archived by the owner on Oct 19, 2022. It is now read-only.

Commit

Permalink
feat: consolidate select write (#53)
Browse files Browse the repository at this point in the history
* chore: update deps
* chore: remove package lock
  • Loading branch information
jacobheun authored Sep 26, 2019
1 parent 3d9cff9 commit e866e1d
Show file tree
Hide file tree
Showing 10 changed files with 68 additions and 17,836 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,4 @@ node_modules
.nyc_output
coverage
docs
package-lock.json
17,778 changes: 0 additions & 17,778 deletions package-lock.json

This file was deleted.

5 changes: 3 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,11 @@
"buffer": "^5.2.1",
"debug": "^4.1.1",
"err-code": "^2.0.0",
"it-length-prefixed": "jacobheun/pull-length-prefixed#v2.0.0-rc.0",
"it-handshake": "^1.0.0",
"it-length-prefixed": "^2.0.0",
"it-pipe": "^1.0.1",
"it-pushable": "^1.3.1",
"it-reader": "jacobheun/it-reader#v2.0.0-rc.0",
"it-reader": "^2.0.0",
"p-defer": "^3.0.0"
},
"devDependencies": {
Expand Down
3 changes: 3 additions & 0 deletions src/constants.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
'use strict'

exports.PROTOCOL_ID = '/multistream/1.0.0'
21 changes: 14 additions & 7 deletions src/handle.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,33 +3,40 @@
const log = require('debug')('mss:handle')
const BufferList = require('bl/BufferList')
const multistream = require('./multistream')
const toReaderWriter = require('./to-reader-writer')
const handshake = require('it-handshake')
const { PROTOCOL_ID } = require('./constants')

module.exports = async (stream, protocols) => {
protocols = Array.isArray(protocols) ? protocols : [protocols]
const { reader, writer, rest } = toReaderWriter(stream)
const { writer, reader, rest, stream: shakeStream } = handshake(stream)

while (true) {
const protocol = (await multistream.read(reader)).toString()
log('read "%s"', protocol)

if (protocol === PROTOCOL_ID) {
log('respond with "%s" for "%s"', PROTOCOL_ID, protocol)
multistream.write(writer, PROTOCOL_ID)
continue
}

if (protocols.includes(protocol)) {
multistream.write(writer, protocol)
log('write "%s" "%s"', protocol, protocol)
writer.end()
return { stream: rest, protocol }
log('respond with "%s" for "%s"', protocol, protocol)
rest()
return { stream: shakeStream, protocol }
}

if (protocol === 'ls') {
// <varint-msg-len><varint-proto-name-len><proto-name>\n<varint-proto-name-len><proto-name>\n\n
multistream.write(writer, new BufferList(
protocols.map(p => multistream.encode(p))
))
log('write "%s" %s', protocol, protocols)
log('respond with "%s" for %s', protocols, protocol)
continue
}

multistream.write(writer, 'na')
log('write "%s" "na"', protocol)
log('respond with "na" for "%s"', protocol)
}
}
11 changes: 4 additions & 7 deletions src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,7 @@
const select = require('./select')
const handle = require('./handle')
const ls = require('./ls')

const PROTOCOL_ID = '/multistream/1.0.0'
const { PROTOCOL_ID } = require('./constants')

exports.PROTOCOL_ID = PROTOCOL_ID

Expand All @@ -24,9 +23,8 @@ class MultistreamSelect {
}

class Dialer extends MultistreamSelect {
async select (protocols) {
await this._handshake()
return select(this._stream, protocols)
select (protocols) {
return select(this._stream, protocols, this._shaken ? null : PROTOCOL_ID)
}

async ls () {
Expand All @@ -40,8 +38,7 @@ class Dialer extends MultistreamSelect {
exports.Dialer = Dialer

class Listener extends MultistreamSelect {
async handle (protocols) {
await this._handshake()
handle (protocols) {
return handle(this._stream, protocols)
}
}
Expand Down
8 changes: 4 additions & 4 deletions src/ls.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,16 @@
const Reader = require('it-reader')
const log = require('debug')('it-multistream-select:ls')
const multistream = require('./multistream')
const toReaderWriter = require('./to-reader-writer')
const handshake = require('it-handshake')
const lp = require('it-length-prefixed')
const pipe = require('it-pipe')

module.exports = async stream => {
const { reader, writer, rest } = toReaderWriter(stream)
const { reader, writer, rest, stream: shakeStream } = handshake(stream)

log('write "ls"')
multistream.write(writer, 'ls')
writer.end()
rest()

// Next message from remote will be (e.g. for 2 protocols):
// <varint-msg-len><varint-proto-name-len><proto-name>\n<varint-proto-name-len><proto-name>\n
Expand All @@ -35,5 +35,5 @@ module.exports = async stream => {
}
)

return { stream: rest, protocols }
return { stream: shakeStream, protocols }
}
6 changes: 6 additions & 0 deletions src/multistream.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,14 @@ async function oneChunk (source) {

exports.encode = buffer => lp.encode.single(new BufferList([buffer, NewLine]))

// `write` encodes and writes a single buffer
exports.write = (writer, buffer) => writer.push(exports.encode(buffer))

// `writeAll` behaves like `write`, except it encodes an array of items as a single write
exports.writeAll = (writer, buffers) => {
writer.push(buffers.reduce((bl, buffer) => bl.append(exports.encode(buffer)), new BufferList()))
}

exports.read = async reader => {
let byteLength = 1 // Read single byte chunks until the length is known
const varByteSource = { // No return impl - we want the reader to remain readable
Expand Down
41 changes: 33 additions & 8 deletions src/select.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,24 +3,49 @@
const log = require('debug')('mss:select')
const errCode = require('err-code')
const multistream = require('./multistream')
const toReaderWriter = require('./to-reader-writer')
const handshake = require('it-handshake')

module.exports = async (stream, protocols) => {
module.exports = async (stream, protocols, protocolId) => {
protocols = Array.isArray(protocols) ? protocols : [protocols]
const { reader, writer, rest } = toReaderWriter(stream)
const { reader, writer, rest, stream: shakeStream } = handshake(stream)

const protocol = protocols.shift()
if (protocolId) {
log('select: write ["%s", "%s"]', protocolId, protocol)
multistream.writeAll(writer, [protocolId, protocol])
} else {
log('select: write "%s"', protocol)
multistream.write(writer, protocol)
}

let response = (await multistream.read(reader)).toString()
log('select: read "%s"', response)

// Read the protocol response if we got the protocolId in return
if (response === protocolId) {
response = (await multistream.read(reader)).toString()
log('select: read "%s"', response)
}

// We're done
if (response === protocol) {
rest()
return { stream: shakeStream, protocol }
}

// We haven't gotten a valid ack, try the other protocols
for (const protocol of protocols) {
log('write "%s"', protocol)
log('select: write "%s"', protocol)
multistream.write(writer, protocol)
const response = (await multistream.read(reader)).toString()
log('read "%s" "%s"', protocol, response)
log('select: read "%s" for "%s"', response, protocol)

if (response === protocol) {
writer.end() // End our writer so others can start writing to stream
return { stream: rest, protocol }
rest() // End our writer so others can start writing to stream
return { stream: shakeStream, protocol }
}
}

writer.end()
rest()
throw errCode(new Error('protocol selection failed'), 'ERR_UNSUPPORTED_PROTOCOL')
}
30 changes: 0 additions & 30 deletions src/to-reader-writer.js

This file was deleted.

0 comments on commit e866e1d

Please sign in to comment.