Skip to content
This repository has been archived by the owner on Oct 19, 2022. It is now read-only.

feat: consolidate select write #53

Merged
merged 4 commits into from
Sep 26, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,4 @@ node_modules
.nyc_output
coverage
docs
package-lock.json
17,778 changes: 0 additions & 17,778 deletions package-lock.json

This file was deleted.

5 changes: 3 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,11 @@
"buffer": "^5.2.1",
"debug": "^4.1.1",
"err-code": "^2.0.0",
"it-length-prefixed": "jacobheun/pull-length-prefixed#v2.0.0-rc.0",
"it-handshake": "^1.0.0",
"it-length-prefixed": "^2.0.0",
"it-pipe": "^1.0.1",
"it-pushable": "^1.3.1",
"it-reader": "jacobheun/it-reader#v2.0.0-rc.0",
"it-reader": "^2.0.0",
"p-defer": "^3.0.0"
},
"devDependencies": {
Expand Down
3 changes: 3 additions & 0 deletions src/constants.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
'use strict'

exports.PROTOCOL_ID = '/multistream/1.0.0'
21 changes: 14 additions & 7 deletions src/handle.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,33 +3,40 @@
const log = require('debug')('mss:handle')
const BufferList = require('bl/BufferList')
const multistream = require('./multistream')
const toReaderWriter = require('./to-reader-writer')
const handshake = require('it-handshake')
const { PROTOCOL_ID } = require('./constants')

module.exports = async (stream, protocols) => {
protocols = Array.isArray(protocols) ? protocols : [protocols]
const { reader, writer, rest } = toReaderWriter(stream)
const { writer, reader, rest, stream: shakeStream } = handshake(stream)

while (true) {
const protocol = (await multistream.read(reader)).toString()
log('read "%s"', protocol)

if (protocol === PROTOCOL_ID) {
log('respond with "%s" for "%s"', PROTOCOL_ID, protocol)
multistream.write(writer, PROTOCOL_ID)
continue
}

if (protocols.includes(protocol)) {
multistream.write(writer, protocol)
log('write "%s" "%s"', protocol, protocol)
writer.end()
return { stream: rest, protocol }
log('respond with "%s" for "%s"', protocol, protocol)
rest()
return { stream: shakeStream, protocol }
}

if (protocol === 'ls') {
// <varint-msg-len><varint-proto-name-len><proto-name>\n<varint-proto-name-len><proto-name>\n\n
multistream.write(writer, new BufferList(
protocols.map(p => multistream.encode(p))
))
log('write "%s" %s', protocol, protocols)
log('respond with "%s" for %s', protocols, protocol)
continue
}

multistream.write(writer, 'na')
log('write "%s" "na"', protocol)
log('respond with "na" for "%s"', protocol)
}
}
11 changes: 4 additions & 7 deletions src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,7 @@
const select = require('./select')
const handle = require('./handle')
const ls = require('./ls')

const PROTOCOL_ID = '/multistream/1.0.0'
const { PROTOCOL_ID } = require('./constants')

exports.PROTOCOL_ID = PROTOCOL_ID

Expand All @@ -24,9 +23,8 @@ class MultistreamSelect {
}

class Dialer extends MultistreamSelect {
async select (protocols) {
await this._handshake()
return select(this._stream, protocols)
select (protocols) {
return select(this._stream, protocols, this._shaken ? null : PROTOCOL_ID)
}

async ls () {
Expand All @@ -40,8 +38,7 @@ class Dialer extends MultistreamSelect {
exports.Dialer = Dialer

class Listener extends MultistreamSelect {
async handle (protocols) {
await this._handshake()
handle (protocols) {
return handle(this._stream, protocols)
}
}
Expand Down
8 changes: 4 additions & 4 deletions src/ls.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,16 @@
const Reader = require('it-reader')
const log = require('debug')('it-multistream-select:ls')
const multistream = require('./multistream')
const toReaderWriter = require('./to-reader-writer')
const handshake = require('it-handshake')
const lp = require('it-length-prefixed')
const pipe = require('it-pipe')

module.exports = async stream => {
const { reader, writer, rest } = toReaderWriter(stream)
const { reader, writer, rest, stream: shakeStream } = handshake(stream)

log('write "ls"')
multistream.write(writer, 'ls')
writer.end()
rest()

// Next message from remote will be (e.g. for 2 protocols):
// <varint-msg-len><varint-proto-name-len><proto-name>\n<varint-proto-name-len><proto-name>\n
Expand All @@ -35,5 +35,5 @@ module.exports = async stream => {
}
)

return { stream: rest, protocols }
return { stream: shakeStream, protocols }
}
6 changes: 6 additions & 0 deletions src/multistream.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,14 @@ async function oneChunk (source) {

exports.encode = buffer => lp.encode.single(new BufferList([buffer, NewLine]))

// `write` encodes and writes a single buffer
exports.write = (writer, buffer) => writer.push(exports.encode(buffer))

// `writeAll` behaves like `write`, except it encodes an array of items as a single write
exports.writeAll = (writer, buffers) => {
writer.push(buffers.reduce((bl, buffer) => bl.append(exports.encode(buffer)), new BufferList()))
}

exports.read = async reader => {
let byteLength = 1 // Read single byte chunks until the length is known
const varByteSource = { // No return impl - we want the reader to remain readable
Expand Down
41 changes: 33 additions & 8 deletions src/select.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,24 +3,49 @@
const log = require('debug')('mss:select')
const errCode = require('err-code')
const multistream = require('./multistream')
const toReaderWriter = require('./to-reader-writer')
const handshake = require('it-handshake')

module.exports = async (stream, protocols) => {
module.exports = async (stream, protocols, protocolId) => {
protocols = Array.isArray(protocols) ? protocols : [protocols]
const { reader, writer, rest } = toReaderWriter(stream)
const { reader, writer, rest, stream: shakeStream } = handshake(stream)

const protocol = protocols.shift()
if (protocolId) {
log('select: write ["%s", "%s"]', protocolId, protocol)
multistream.writeAll(writer, [protocolId, protocol])
} else {
log('select: write "%s"', protocol)
multistream.write(writer, protocol)
}

let response = (await multistream.read(reader)).toString()
log('select: read "%s"', response)

// Read the protocol response if we got the protocolId in return
if (response === protocolId) {
response = (await multistream.read(reader)).toString()
log('select: read "%s"', response)
}

// We're done
if (response === protocol) {
rest()
return { stream: shakeStream, protocol }
}

// We haven't gotten a valid ack, try the other protocols
for (const protocol of protocols) {
log('write "%s"', protocol)
log('select: write "%s"', protocol)
multistream.write(writer, protocol)
const response = (await multistream.read(reader)).toString()
log('read "%s" "%s"', protocol, response)
log('select: read "%s" for "%s"', response, protocol)

if (response === protocol) {
writer.end() // End our writer so others can start writing to stream
return { stream: rest, protocol }
rest() // End our writer so others can start writing to stream
return { stream: shakeStream, protocol }
}
}

writer.end()
rest()
throw errCode(new Error('protocol selection failed'), 'ERR_UNSUPPORTED_PROTOCOL')
}
30 changes: 0 additions & 30 deletions src/to-reader-writer.js

This file was deleted.