From 216503873712f08aef644ae6b4c7ff4a30108943 Mon Sep 17 00:00:00 2001 From: Alex Potsides Date: Tue, 21 Dec 2021 09:49:21 +0100 Subject: [PATCH] fix: make mss abortable (#142) Allows passing an options object to `dialer.select` that can contain an `AbortSignal` that will throw if the operation is aborted due to a timeout or other criteria. Also adds types and swaps travis for gh actions. BREAKING CHANGE: This module now has types --- .github/dependabot.yml | 8 ++++ .github/workflows/test.yml | 78 ++++++++++++++++++++++++++++++++++++++ .travis.yml | 50 ------------------------ README.md | 45 +++++++++++++++++----- package.json | 16 +++++--- src/handle.js | 22 +++++++++-- src/index.js | 48 ++++++++++++++++++----- src/ls.js | 35 ++++++++++++++--- src/multistream.js | 72 ++++++++++++++++++++++++++++++----- src/select.js | 32 +++++++++++++--- src/types.ts | 5 +++ test/integration.spec.js | 4 +- test/listener.spec.js | 4 +- test/multistream.spec.js | 21 ++++++++-- tsconfig.json | 9 +++++ 15 files changed, 340 insertions(+), 109 deletions(-) create mode 100644 .github/dependabot.yml create mode 100644 .github/workflows/test.yml delete mode 100644 .travis.yml create mode 100644 src/types.ts create mode 100644 tsconfig.json diff --git a/.github/dependabot.yml b/.github/dependabot.yml new file mode 100644 index 0000000..de46e32 --- /dev/null +++ b/.github/dependabot.yml @@ -0,0 +1,8 @@ +version: 2 +updates: +- package-ecosystem: npm + directory: "/" + schedule: + interval: daily + time: "11:00" + open-pull-requests-limit: 10 diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml new file mode 100644 index 0000000..91b33ea --- /dev/null +++ b/.github/workflows/test.yml @@ -0,0 +1,78 @@ +name: ci +on: + push: + branches: + - master + pull_request: + branches: + - master + +jobs: + check: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v2 + - run: npm install + - run: npm run lint + - run: npm run build + - run: npm run dep-check + test-node: + needs: check + runs-on: ${{ matrix.os }} + strategy: + matrix: + os: [windows-latest, ubuntu-latest, macos-latest] + node: [16] + fail-fast: true + steps: + - uses: actions/checkout@v2 + - uses: actions/setup-node@v2 + with: + node-version: ${{ matrix.node }} + - run: npm install + - run: npm run build + - run: npm run test:node -- --bail --cov + - uses: codecov/codecov-action@v1 + test-chrome: + needs: check + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v2 + - uses: actions/setup-node@v2 + with: + node-version: 16 + - run: npm install + - run: npm run build + - run: npm run test:browser -- -t browser -t webworker --bail + test-firefox: + needs: check + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v2 + - uses: actions/setup-node@v2 + with: + node-version: 16 + - run: npm install + - run: npm run build + - run: npm run test:browser -- -t browser -t webworker --bail -- --browser firefox + test-electron-main: + needs: check + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v2 + - uses: actions/setup-node@v2 + with: + node-version: 16 + - run: npm install + - run: npm run build + - run: npx xvfb-maybe npm run test:electron --bail + test-electron-renderer: + needs: check + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v2 + - uses: actions/setup-node@v2 + with: + node-version: 16 + - run: npm install + - run: npx xvfb-maybe aegir test -t electron-renderer --bail diff --git a/.travis.yml b/.travis.yml deleted file mode 100644 index ad24ec5..0000000 --- a/.travis.yml +++ /dev/null @@ -1,50 +0,0 @@ -language: node_js -cache: npm -dist: bionic - -branches: - only: - - master - - /^release\/.*$/ - -stages: - - check - - test - - cov - -node_js: - - 'lts/*' - - 'node' - -os: - - linux - - osx - - windows - -script: npx nyc -s npm run test:node -- --bail -after_success: npx nyc report --reporter=text-lcov > coverage.lcov && npx codecov - -jobs: - include: - - os: windows - cache: false - - - stage: check - script: - - npx aegir dep-check - - npm run lint - - - stage: test - name: chrome - addons: - chrome: stable - script: npx aegir test -t browser -t webworker - - - stage: test - name: firefox - addons: - firefox: latest - script: npx aegir test -t browser -t webworker -- --browser firefox - -notifications: - email: false \ No newline at end of file diff --git a/README.md b/README.md index 2c5615f..3c9a65a 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,4 @@ -# js-multistream-select +# js-multistream-select [![](https://img.shields.io/badge/made%20by-Protocol%20Labs-blue.svg?style=flat-square)](https://protocol.ai) [![](https://img.shields.io/badge/project-multiformats-blue.svg?style=flat-square)](https://github.com/multiformats/multiformats) @@ -10,21 +10,40 @@ > JavaScript implementation of [multistream-select](https://github.com/multiformats/multistream-select) -## Lead Maintainer +## Lead Maintainer [Jacob Heun](https://github.com/jacobheun) -## Table of Contents +## Table of Contents - [Background](#background) - - [What is multistream-select?](#what-is-multistream-select) + - [What is `multistream-select`?](#what-is-multistream-select) - [Select a protocol flow](#select-a-protocol-flow) - [Install](#install) - [Usage](#usage) - - [Dialer](#dialer) - - [Listener](#listener) + - [Dialer](#dialer) + - [Listener](#listener) - [API](#api) -- [Maintainers](#maintainers) + - [`new MSS.Dialer(duplex)`](#new-mssdialerduplex) + - [Parameters](#parameters) + - [Returns](#returns) + - [Examples](#examples) + - [`dialer.select(protocols, [options])`](#dialerselectprotocols-options) + - [Parameters](#parameters-1) + - [Returns](#returns-1) + - [Examples](#examples-1) + - [`dialer.ls([options])`](#dialerlsoptions) + - [Parameters](#parameters-2) + - [Returns](#returns-2) + - [Examples](#examples-2) + - [`new MSS.Listener(duplex)`](#new-msslistenerduplex) + - [Parameters](#parameters-3) + - [Returns](#returns-3) + - [Examples](#examples-3) + - [`listener.handle(protocols, [options])`](#listenerhandleprotocols-options) + - [Parameters](#parameters-4) + - [Returns](#returns-4) + - [Examples](#examples-4) - [Contribute](#contribute) - [License](#license) @@ -164,13 +183,14 @@ A new multistream select dialer instance. const dialer = new MSS.Dialer(duplex) ``` -### `dialer.select(protocols)` +### `dialer.select(protocols, [options])` Negotiate a protocol to use from a list of protocols. #### Parameters * `protocols` (`String[]`/`String`) - A list of protocols (or single protocol) to negotiate with. Protocols are attempted in order until a match is made. +* `options` (`{ signal: AbortSignal }`) - an options object containing an AbortSignal #### Returns @@ -189,10 +209,14 @@ const { stream, protocol } = await dialer.select([ // Now talk `protocol` on `stream` ``` -### `dialer.ls()` +### `dialer.ls([options])` List protocols that the remote supports. +#### Parameters + +* `options` (`{ signal: AbortSignal }`) - an options object containing an AbortSignal + #### Returns `String[]` - A list of all the protocols the remote supports. @@ -228,13 +252,14 @@ A new multistream select listener instance. const listener = new MSS.Listener(duplex) ``` -### `listener.handle(protocols)` +### `listener.handle(protocols, [options])` Handle multistream protocol selections for the given list of protocols. #### Parameters * `protocols` (`String[]`/`String`) - A list of protocols (or single protocol) that this listener is able to speak. +* `options` (`{ signal: AbortSignal }`) - an options object containing an AbortSignal #### Returns diff --git a/package.json b/package.json index 85dd535..f4ddd2c 100644 --- a/package.json +++ b/package.json @@ -4,21 +4,25 @@ "leadMaintainer": "Jacob Heun ", "description": "JavaScript implementation of multistream-select", "main": "src/index.js", + "types": "dist/src/index.d.ts", "files": [ "dist", "src" ], "scripts": { - "lint": "aegir lint", + "lint": "aegir ts -p check && aegir lint", "build": "aegir build", "test": "aegir test", "test:node": "aegir test -t node", "test:browser": "aegir test -t browser", "test:webworker": "aegir test -t webworker", + "test:electron": "aegir test -t electron-main", + "test:electron-renderer": "aegir test -t electron-renderer", "coverage": "nyc --reporter=text --reporter=lcov npm run test:node", "release": "aegir release -t node -t browser", "release-minor": "aegir release --type minor -t node -t browser", - "release-major": "aegir release --type major -t node -t browser" + "release-major": "aegir release --type major -t node -t browser", + "dep-check": "aegir dep-check" }, "repository": { "type": "git", @@ -40,6 +44,7 @@ }, "homepage": "https://github.com/multiformats/js-multistream-select#readme", "dependencies": { + "abortable-iterator": "^3.0.0", "bl": "^5.0.0", "debug": "^4.1.1", "err-code": "^3.0.1", @@ -53,13 +58,12 @@ }, "devDependencies": { "@types/bl": "^5.0.1", - "aegir": "^35.0.1", - "chai": "^4.3.4", - "dirty-chai": "^2.0.1", + "@types/debug": "^4.1.7", + "aegir": "^36.1.1", "it-pair": "^1.0.0", - "mocha": "^8.1.1", "p-timeout": "^4.0.0", "streaming-iterables": "^5.0.2", + "timeout-abort-controller": "^2.0.0", "util": "^0.12.4", "varint": "^6.0.0" }, diff --git a/src/handle.js b/src/handle.js index 3fef764..9655082 100644 --- a/src/handle.js +++ b/src/handle.js @@ -1,17 +1,32 @@ 'use strict' -const log = require('debug')('mss:handle') +const debug = require('debug') const BufferList = require('bl/BufferList') const multistream = require('./multistream') +// @ts-expect-error no types const handshake = require('it-handshake') const { PROTOCOL_ID } = require('./constants') -module.exports = async (stream, protocols) => { +const log = Object.assign(debug('mss:handle'), { + error: debug('mss:handle:error') +}) + +/** + * @typedef {import('./types').DuplexStream} DuplexStream + */ + +/** + * @param {DuplexStream} stream + * @param {string | string[]} protocols + * @param {object} [options] + * @param {AbortSignal} options.signal + */ +module.exports = async function handle (stream, protocols, options) { protocols = Array.isArray(protocols) ? protocols : [protocols] const { writer, reader, rest, stream: shakeStream } = handshake(stream) while (true) { - const protocol = (await multistream.read(reader)).toString() + const protocol = (await multistream.read(reader, options)).toString() log('read "%s"', protocol) if (protocol === PROTOCOL_ID) { @@ -30,6 +45,7 @@ module.exports = async (stream, protocols) => { if (protocol === 'ls') { // \n\n\n multistream.write(writer, new BufferList( + // @ts-expect-error BufferList does not accept Uint8Array[] as a constructor arg protocols.map(p => multistream.encode(p)) )) log('respond with "%s" for %s', protocols, protocol) diff --git a/src/index.js b/src/index.js index 33fabde..e6a5691 100644 --- a/src/index.js +++ b/src/index.js @@ -7,29 +7,52 @@ const { PROTOCOL_ID } = require('./constants') exports.PROTOCOL_ID = PROTOCOL_ID +/** + * @typedef {import('./types').DuplexStream} DuplexStream + */ + class MultistreamSelect { + /** + * @param {DuplexStream} stream + */ constructor (stream) { this._stream = stream this._shaken = false } - // Perform the multistream-select handshake - async _handshake () { + /** + * Perform the multistream-select handshake + * + * @param {object} [options] + * @param {AbortSignal} options.signal + */ + async _handshake (options) { if (this._shaken) return - const { stream } = await select(this._stream, PROTOCOL_ID) + const { stream } = await select(this._stream, PROTOCOL_ID, undefined, options) this._stream = stream this._shaken = true } } class Dialer extends MultistreamSelect { - select (protocols) { - return select(this._stream, protocols, this._shaken ? null : PROTOCOL_ID) + /** + * @param {string | string[]} protocols + * @param {object} [options] + * @param {AbortSignal} options.signal + */ + select (protocols, options) { + return select(this._stream, protocols, this._shaken ? undefined : PROTOCOL_ID, options) } - async ls () { - await this._handshake() - const { stream, protocols } = await ls(this._stream) + /** + * @param {object} [options] + * @param {AbortSignal} options.signal + */ + async ls (options) { + await this._handshake(options) + /** @type {{ stream: DuplexStream, protocols: string[] }} */ + const res = await ls(this._stream, options) + const { stream, protocols } = res this._stream = stream return protocols } @@ -38,8 +61,13 @@ class Dialer extends MultistreamSelect { exports.Dialer = Dialer class Listener extends MultistreamSelect { - handle (protocols) { - return handle(this._stream, protocols) + /** + * @param {string | string[]} protocols + * @param {object} [options] + * @param {AbortSignal} options.signal + */ + handle (protocols, options) { + return handle(this._stream, protocols, options) } } diff --git a/src/ls.js b/src/ls.js index 07d0850..ea7d66b 100644 --- a/src/ls.js +++ b/src/ls.js @@ -1,13 +1,29 @@ 'use strict' +// @ts-expect-error no types const Reader = require('it-reader') -const log = require('debug')('it-multistream-select:ls') +const debug = require('debug') const multistream = require('./multistream') +// @ts-expect-error no types const handshake = require('it-handshake') const lp = require('it-length-prefixed') -const pipe = require('it-pipe') +const { pipe } = require('it-pipe') -module.exports = async stream => { +const log = Object.assign(debug('mss:ls'), { + error: debug('mss:ls:error') +}) + +/** + * @typedef {import('./types').DuplexStream} DuplexStream + * @typedef {import('bl/BufferList')} BufferList + */ + +/** + * @param {DuplexStream} stream + * @param {object} [options] + * @param {AbortSignal} options.signal + */ +module.exports = async function ls (stream, options) { const { reader, writer, rest, stream: shakeStream } = handshake(stream) log('write "ls"') @@ -16,18 +32,22 @@ module.exports = async stream => { // Next message from remote will be (e.g. for 2 protocols): // \n\n - const res = await multistream.read(reader) + const res = await multistream.read(reader, options) // After reading response we have: // \n\n const protocolsReader = Reader([res]) + + /** + * @type {string[]} + */ const protocols = [] // Decode each of the protocols from the reader await pipe( protocolsReader, lp.decode(), - async source => { + async (/** @type {AsyncIterable} */ source) => { for await (const protocol of source) { // Remove the newline protocols.push(protocol.shallowSlice(0, -1).toString()) @@ -35,5 +55,8 @@ module.exports = async stream => { } ) - return { stream: shakeStream, protocols } + /** @type {{ stream: DuplexStream, protocols: string[] }} */ + const output = { stream: shakeStream, protocols } + + return output } diff --git a/src/multistream.js b/src/multistream.js index 7c9982b..d154082 100644 --- a/src/multistream.js +++ b/src/multistream.js @@ -2,33 +2,78 @@ const BufferList = require('bl/BufferList') const lp = require('it-length-prefixed') -const pipe = require('it-pipe') +const { pipe } = require('it-pipe') const errCode = require('err-code') const { fromString: uint8ArrayFromString } = require('uint8arrays/from-string') const first = require('it-first') +const { source } = require('abortable-iterator') + +/** + * @typedef {import('it-pushable').Pushable} Pushable + */ const NewLine = uint8ArrayFromString('\n') -exports.encode = buffer => lp.encode.single(new BufferList([buffer, NewLine])) +/** + * @param {Uint8Array | BufferList | string} buffer + * @returns {Uint8Array} + */ +function encode (buffer) { + // @ts-expect-error BufferList accepts Buffer[], we pass Uint8Array[] + return lp.encode.single(new BufferList([buffer, NewLine])) +} -// `write` encodes and writes a single buffer -exports.write = (writer, buffer) => writer.push(exports.encode(buffer)) +/** + * `write` encodes and writes a single buffer + * + * @param {Pushable} writer + * @param {Uint8Array | BufferList | string} buffer + */ +function write (writer, buffer) { + writer.push(encode(buffer)) +} -// `writeAll` behaves like `write`, except it encodes an array of items as a single write -exports.writeAll = (writer, buffers) => { - writer.push(buffers.reduce((bl, buffer) => bl.append(exports.encode(buffer)), new BufferList())) +/** + * `writeAll` behaves like `write`, except it encodes an array of items as a single write + * + * @param {Pushable} writer + * @param {(Uint8Array | BufferList | string)[]} buffers + */ +async function writeAll (writer, buffers) { + // @ts-expect-error BufferList cannot append Uint8Arrays + writer.push(buffers.reduce((bl, buffer) => bl.append(encode(buffer)), new BufferList())) } -exports.read = async reader => { +/** + * @param {AsyncGenerator} reader + * @param {object} [options] + * @param {AbortSignal} options.signal + */ +async function read (reader, options) { let byteLength = 1 // Read single byte chunks until the length is known const varByteSource = { // No return impl - we want the reader to remain readable [Symbol.asyncIterator] () { return this }, next: () => reader.next(byteLength) } + /** @type {AsyncIterable} */ + let input = varByteSource + + // If we have been passed an abort signal, wrap the input source in an abortable + // iterator that will throw if the operation is aborted + if (options && options.signal) { + input = source(varByteSource, options.signal) + } + // Once the length has been parsed, read chunk for that length - const onLength = l => { byteLength = l } - const buf = await pipe(varByteSource, lp.decode({ onLength }), first) + const onLength = (/** @type {number} */ l) => { byteLength = l } + + /** @type {BufferList} */ + const buf = await pipe( + input, + lp.decode({ onLength }), + first + ) if (buf.get(buf.length - 1) !== NewLine[0]) { throw errCode(new Error('missing newline'), 'ERR_INVALID_MULTISTREAM_SELECT_MESSAGE') @@ -36,3 +81,10 @@ exports.read = async reader => { return buf.shallowSlice(0, -1) // Remove newline } + +module.exports = { + encode, + write, + writeAll, + read +} diff --git a/src/select.js b/src/select.js index 59af498..c2796b6 100644 --- a/src/select.js +++ b/src/select.js @@ -1,15 +1,37 @@ 'use strict' -const log = require('debug')('mss:select') +const debug = require('debug') const errCode = require('err-code') const multistream = require('./multistream') +// @ts-expect-error no types const handshake = require('it-handshake') -module.exports = async (stream, protocols, protocolId) => { +const log = Object.assign(debug('mss:select'), { + error: debug('mss:select:error') +}) + +/** + * @typedef {import('./types').DuplexStream} DuplexStream + * @typedef {import('bl/BufferList')} BufferList + */ + +/** + * @param {DuplexStream} stream + * @param {string | string[]} protocols + * @param {string} [protocolId] + * @param {object} [options] + * @param {AbortSignal} options.signal + */ +module.exports = async function select (stream, protocols, protocolId, options) { protocols = Array.isArray(protocols) ? [...protocols] : [protocols] const { reader, writer, rest, stream: shakeStream } = handshake(stream) const protocol = protocols.shift() + + if (!protocol) { + throw new Error('At least one protocol must be specified') + } + if (protocolId) { log('select: write ["%s", "%s"]', protocolId, protocol) multistream.writeAll(writer, [protocolId, protocol]) @@ -18,12 +40,12 @@ module.exports = async (stream, protocols, protocolId) => { multistream.write(writer, protocol) } - let response = (await multistream.read(reader)).toString() + let response = (await multistream.read(reader, options)).toString() log('select: read "%s"', response) // Read the protocol response if we got the protocolId in return if (response === protocolId) { - response = (await multistream.read(reader)).toString() + response = (await multistream.read(reader, options)).toString() log('select: read "%s"', response) } @@ -37,7 +59,7 @@ module.exports = async (stream, protocols, protocolId) => { for (const protocol of protocols) { log('select: write "%s"', protocol) multistream.write(writer, protocol) - const response = (await multistream.read(reader)).toString() + const response = (await multistream.read(reader, options)).toString() log('select: read "%s" for "%s"', response, protocol) if (response === protocol) { diff --git a/src/types.ts b/src/types.ts new file mode 100644 index 0000000..d994e7b --- /dev/null +++ b/src/types.ts @@ -0,0 +1,5 @@ + +export interface DuplexStream { + source: AsyncIterable + sink: (source: AsyncIterable) => void +} diff --git a/test/integration.spec.js b/test/integration.spec.js index eea721a..ad8f832 100644 --- a/test/integration.spec.js +++ b/test/integration.spec.js @@ -1,9 +1,7 @@ 'use strict' /* eslint-env mocha */ -const chai = require('chai') -chai.use(require('dirty-chai')) -const { expect } = chai +const { expect } = require('aegir/utils/chai') const pipe = require('it-pipe') const { collect } = require('streaming-iterables') const BufferList = require('bl/BufferList') diff --git a/test/listener.spec.js b/test/listener.spec.js index b8a385c..9cde013 100644 --- a/test/listener.spec.js +++ b/test/listener.spec.js @@ -1,9 +1,7 @@ 'use strict' /* eslint-env mocha */ -const chai = require('chai') -chai.use(require('dirty-chai')) -const { expect } = chai +const { expect } = require('aegir/utils/chai') const pipe = require('it-pipe') const BufferList = require('bl/BufferList') const Reader = require('it-reader') diff --git a/test/multistream.spec.js b/test/multistream.spec.js index d019108..c1e7fb2 100644 --- a/test/multistream.spec.js +++ b/test/multistream.spec.js @@ -1,9 +1,7 @@ 'use strict' /* eslint-env mocha */ -const chai = require('chai') -chai.use(require('dirty-chai')) -const { expect } = chai +const { expect } = require('aegir/utils/chai') const Varint = require('varint') const BufferList = require('bl/BufferList') const Reader = require('it-reader') @@ -85,5 +83,22 @@ describe('Multistream', () => { const err = await throwsAsync(Multistream.read(reader)) expect(err.code).to.equal('ERR_INVALID_MULTISTREAM_SELECT_MESSAGE') }) + + it('should be abortable', async () => { + const input = uint8ArrayFromString(`TEST${Date.now()}`) + + const reader = Reader([uint8ArrayConcat([ + Uint8Array.from(Varint.encode(input.length + 1)), // +1 to include newline + input, + uint8ArrayFromString('\n') + ])]) + + const controller = new AbortController() + controller.abort() + + await expect(Multistream.read(reader, { + signal: controller.signal + })).to.eventually.be.rejected().with.property('code', 'ABORT_ERR') + }) }) }) diff --git a/tsconfig.json b/tsconfig.json new file mode 100644 index 0000000..5fe8ea4 --- /dev/null +++ b/tsconfig.json @@ -0,0 +1,9 @@ +{ + "extends": "aegir/src/config/tsconfig.aegir.json", + "compilerOptions": { + "outDir": "dist" + }, + "include": [ + "src" + ] +}