diff --git a/.aegir.cjs b/.aegir.cjs index a8682e0..63ea976 100644 --- a/.aegir.cjs +++ b/.aegir.cjs @@ -11,19 +11,21 @@ module.exports = { const protocol = '/echo/1.0.0' const registrar = mockRegistrar() - registrar.handle(protocol, (evt) => { + registrar.handle(protocol, ({ stream }) => { void pipe( - evt.detail.stream, - evt.detail.stream + stream, + stream ) }) const upgrader = mockUpgrader({ registrar }) - const ws = new WebSockets({ upgrader }) + const ws = new WebSockets() const ma = new Multiaddr('/ip4/127.0.0.1/tcp/9095/ws') - const listener = ws.createListener() + const listener = ws.createListener({ + upgrader + }) await listener.listen(ma) listener.addEventListener('error', (evt) => { console.error(evt.detail) diff --git a/package.json b/package.json index bde42a4..43cb301 100644 --- a/package.json +++ b/package.json @@ -152,10 +152,10 @@ "release": "semantic-release" }, "dependencies": { - "@libp2p/logger": "^1.0.2", - "@libp2p/utils": "^1.0.0", - "@multiformats/mafmt": "^11.0.1", - "@multiformats/multiaddr": "^10.0.0", + "@libp2p/logger": "^1.1.2", + "@libp2p/utils": "^1.0.9", + "@multiformats/mafmt": "^11.0.2", + "@multiformats/multiaddr": "^10.1.5", "@multiformats/multiaddr-to-uri": "^9.0.0", "abortable-iterator": "^4.0.2", "err-code": "^3.0.1", @@ -165,19 +165,18 @@ "wherearewe": "^1.0.0" }, "devDependencies": { - "@libp2p/interface-compliance-tests": "^1.1.2", - "@libp2p/interfaces": "^1.3.2", + "@libp2p/interface-compliance-tests": "^1.1.17", + "@libp2p/interfaces": "^1.3.14", "@types/ws": "^8.2.2", "aegir": "^36.1.3", "is-loopback-addr": "^2.0.1", "it-all": "^1.0.6", "it-drain": "^1.0.5", "it-goodbye": "^4.0.1", - "it-pipe": "^2.0.2", + "it-pipe": "^2.0.3", "it-take": "^1.0.2", "p-wait-for": "^4.1.0", - "uint8arrays": "^3.0.0", - "util": "^0.12.3" + "uint8arrays": "^3.0.0" }, "browser": { "./dist/src/listener.js": "./dist/src/listener.browser.js" diff --git a/src/index.ts b/src/index.ts index 3b7e75e..eaa8b42 100644 --- a/src/index.ts +++ b/src/index.ts @@ -7,35 +7,37 @@ import env from 'wherearewe' import { createListener } from './listener.js' import { socketToMaConn } from './socket-to-conn.js' import * as filters from './filters.js' -import type { Transport, Upgrader, MultiaddrFilter } from '@libp2p/interfaces/transport' +import { Transport, MultiaddrFilter, symbol, CreateListenerOptions, DialOptions } from '@libp2p/interfaces/transport' import type { AbortOptions } from '@libp2p/interfaces' -import type { WebSocketListenerOptions } from './listener.js' import type { Multiaddr } from '@multiformats/multiaddr' -import type { DuplexWebSocket } from 'it-ws/dist/src/duplex' +import type { DuplexWebSocket } from 'it-ws/duplex' +import type { ClientOptions } from 'ws' +import type { Server } from 'http' const log = logger('libp2p:websockets') -/** - * @class WebSockets - */ -export class WebSockets implements Transport { - private readonly upgrader: Upgrader - private readonly _filter?: MultiaddrFilter +export interface WebSocketsInit extends AbortOptions, WebSocketOptions { + filter?: MultiaddrFilter + websocket?: ClientOptions + server?: Server +} - constructor (opts: { upgrader: Upgrader, filter?: MultiaddrFilter }) { - const { upgrader, filter } = opts +export class WebSockets implements Transport { + private readonly init?: WebSocketsInit - if (upgrader == null) { - throw new Error('An upgrader must be provided. See https://github.com/libp2p/js-libp2p-interfaces/tree/master/packages/libp2p-interfaces/src/transport#upgrader') - } + constructor (init?: WebSocketsInit) { + this.init = init + } - this.upgrader = upgrader - this._filter = filter + get [Symbol.toStringTag] () { + return this.constructor.name } - [Symbol.toStringTag] = 'WebSockets' + get [symbol] (): true { + return true + } - async dial (ma: Multiaddr, options?: AbortOptions & WebSocketOptions) { + async dial (ma: Multiaddr, options: DialOptions) { log('dialing %s', ma) options = options ?? {} @@ -43,12 +45,12 @@ export class WebSockets implements Transport { + async _connect (ma: Multiaddr, options: AbortOptions): Promise { if (options?.signal?.aborted === true) { throw new AbortError() } @@ -62,7 +64,7 @@ export class WebSockets implements Transport implements Listener private listeningMultiaddr?: Multiaddr private readonly server: WebSocketServer - constructor (upgrader: Upgrader, options: WebSocketListenerOptions) { + constructor (init: WebSocketListenerInit) { super() // Keep track of open connections to destroy when the listener is closed @@ -26,7 +27,7 @@ class WebSocketListener extends EventEmitter implements Listener const self = this // eslint-disable-line @typescript-eslint/no-this-alias this.server = createServer({ - ...options, + ...init, onConnection: (stream: DuplexWebSocket) => { const maConn = socketToMaConn(stream, toMultiaddr(stream.remoteAddress ?? '', stream.remotePort ?? 0)) log('new inbound connection %s', maConn.remoteAddr) @@ -38,15 +39,15 @@ class WebSocketListener extends EventEmitter implements Listener }) try { - void upgrader.upgradeInbound(maConn) + void init.upgrader.upgradeInbound(maConn) .then((conn) => { log('inbound connection %s upgraded', maConn.remoteAddr) - if (options?.handler != null) { - options?.handler(conn) + if (init?.handler != null) { + init?.handler(conn) } - self.dispatchEvent(new CustomEvent('connection', { + self.dispatchEvent(new CustomEvent('connection', { detail: conn })) }) @@ -149,12 +150,10 @@ class WebSocketListener extends EventEmitter implements Listener } } -export interface WebSocketListenerOptions extends ListenerOptions { +export interface WebSocketListenerInit extends CreateListenerOptions { server?: Server } -export function createListener (upgrader: Upgrader, options?: WebSocketListenerOptions): Listener { - options = options ?? {} - - return new WebSocketListener(upgrader, options) +export function createListener (init: WebSocketListenerInit): Listener { + return new WebSocketListener(init) } diff --git a/test/browser.ts b/test/browser.ts index 3df3ca1..85128fa 100644 --- a/test/browser.ts +++ b/test/browser.ts @@ -18,8 +18,8 @@ describe('libp2p-websockets', () => { let conn: Connection beforeEach(async () => { - ws = new WebSockets({ upgrader: mockUpgrader() }) - conn = await ws.dial(ma) + ws = new WebSockets() + conn = await ws.dial(ma, { upgrader: mockUpgrader() }) }) afterEach(async () => { @@ -86,6 +86,6 @@ describe('libp2p-websockets', () => { }) it('.createServer throws in browser', () => { - expect(new WebSockets({ upgrader: mockUpgrader() }).createListener).to.throw() + expect(new WebSockets().createListener).to.throw() }) }) diff --git a/test/compliance.node.ts b/test/compliance.node.ts index e034cca..4abeab9 100644 --- a/test/compliance.node.ts +++ b/test/compliance.node.ts @@ -5,17 +5,12 @@ import { Multiaddr } from '@multiformats/multiaddr' import http from 'http' import { WebSockets } from '../src/index.js' import * as filters from '../src/filters.js' -import type { WebSocketListenerOptions } from '../src/listener.js' +import type { WebSocketListenerInit } from '../src/listener.js' describe('interface-transport compliance', () => { tests({ - async setup (args) { - if (args == null) { - throw new Error('No args') - } - - const { upgrader } = args - const ws = new WebSockets({ upgrader, filter: filters.all }) + async setup () { + const ws = new WebSockets({ filter: filters.all }) const addrs = [ new Multiaddr('/ip4/127.0.0.1/tcp/9091/ws'), new Multiaddr('/ip4/127.0.0.1/tcp/9092/ws'), @@ -24,9 +19,7 @@ describe('interface-transport compliance', () => { ] let delayMs = 0 - const delayedCreateListener = (options?: WebSocketListenerOptions) => { - options = options ?? {} - + const delayedCreateListener = (options: WebSocketListenerInit) => { // A server that will delay the upgrade event by delayMs options.server = new Proxy(http.createServer(), { get (server, prop) { diff --git a/test/node.ts b/test/node.ts index 4c7b6ef..1ba6d0a 100644 --- a/test/node.ts +++ b/test/node.ts @@ -26,7 +26,7 @@ void registrar.handle(protocol, (evt) => { void pipe([ uint8ArrayFromString('hey') ], - evt.detail.stream, + evt.stream, drain ) }) @@ -36,7 +36,7 @@ const upgrader = mockUpgrader({ describe('instantiate the transport', () => { it('create', () => { - const ws = new WebSockets({ upgrader }) + const ws = new WebSockets() expect(ws).to.exist() }) }) @@ -45,17 +45,20 @@ describe('listen', () => { it('should close connections when stopping the listener', async () => { const ma = new Multiaddr('/ip4/127.0.0.1/tcp/47382/ws') - const ws = new WebSockets({ upgrader }) + const ws = new WebSockets() const listener = ws.createListener({ handler: (conn) => { void conn.newStream([protocol]).then(async ({ stream }) => { return await pipe(stream, stream) }) - } + }, + upgrader }) await listener.listen(ma) - const conn = await ws.dial(ma) + const conn = await ws.dial(ma, { + upgrader + }) const { stream } = await conn.newStream([protocol]) void pipe(stream, stream) @@ -70,7 +73,7 @@ describe('listen', () => { let listener: Listener beforeEach(() => { - ws = new WebSockets({ upgrader }) + ws = new WebSockets() }) afterEach(async () => { @@ -78,12 +81,12 @@ describe('listen', () => { }) it('listen, check for promise', async () => { - listener = ws.createListener() + listener = ws.createListener({ upgrader }) await listener.listen(ma) }) it('listen, check for listening event', (done) => { - listener = ws.createListener() + listener = ws.createListener({ upgrader }) listener.addEventListener('listening', () => { done() @@ -93,7 +96,7 @@ describe('listen', () => { }) it('listen, check for the close event', (done) => { - const listener = ws.createListener() + const listener = ws.createListener({ upgrader }) listener.addEventListener('listening', () => { listener.addEventListener('close', () => done()) @@ -105,14 +108,14 @@ describe('listen', () => { it('listen on addr with /ipfs/QmHASH', async () => { const ma = new Multiaddr('/ip4/127.0.0.1/tcp/47382/ws/ipfs/Qmb6owHp6eaWArVbcJJbQSyifyJBttMMjYV76N2hMbf5Vw') - listener = ws.createListener() + listener = ws.createListener({ upgrader }) await listener.listen(ma) }) it('listen on port 0', async () => { const ma = new Multiaddr('/ip4/127.0.0.1/tcp/0/ws/ipfs/Qmb6owHp6eaWArVbcJJbQSyifyJBttMMjYV76N2hMbf5Vw') - listener = ws.createListener() + listener = ws.createListener({ upgrader }) await listener.listen(ma) const addrs = await listener.getAddrs() @@ -121,7 +124,7 @@ describe('listen', () => { it('listen on any Interface', async () => { const ma = new Multiaddr('/ip4/0.0.0.0/tcp/0/ws/ipfs/Qmb6owHp6eaWArVbcJJbQSyifyJBttMMjYV76N2hMbf5Vw') - listener = ws.createListener() + listener = ws.createListener({ upgrader }) await listener.listen(ma) const addrs = await listener.getAddrs() @@ -129,7 +132,7 @@ describe('listen', () => { }) it('getAddrs', async () => { - listener = ws.createListener() + listener = ws.createListener({ upgrader }) await listener.listen(ma) const addrs = await listener.getAddrs() expect(addrs.length).to.equal(1) @@ -138,7 +141,7 @@ describe('listen', () => { it('getAddrs on port 0 listen', async () => { const addr = new Multiaddr('/ip4/127.0.0.1/tcp/0/ws') - listener = ws.createListener() + listener = ws.createListener({ upgrader }) await listener.listen(addr) const addrs = await listener.getAddrs() expect(addrs.length).to.equal(1) @@ -147,7 +150,7 @@ describe('listen', () => { it('getAddrs from listening on 0.0.0.0', async () => { const addr = new Multiaddr('/ip4/0.0.0.0/tcp/47382/ws') - listener = ws.createListener() + listener = ws.createListener({ upgrader }) await listener.listen(addr) const addrs = await listener.getAddrs() expect(addrs.map((a) => a.toOptions().host)).to.not.include('0.0.0.0') @@ -155,7 +158,7 @@ describe('listen', () => { it('getAddrs from listening on 0.0.0.0 and port 0', async () => { const addr = new Multiaddr('/ip4/0.0.0.0/tcp/0/ws') - listener = ws.createListener() + listener = ws.createListener({ upgrader }) await listener.listen(addr) const addrs = await listener.getAddrs() expect(addrs.map((a) => a.toOptions().host)).to.not.include('0.0.0.0') @@ -164,7 +167,7 @@ describe('listen', () => { it('getAddrs preserves p2p Id', async () => { const ma = new Multiaddr('/ip4/127.0.0.1/tcp/47382/ws/p2p/Qmb6owHp6eaWArVbcJJbQSyifyJBttMMjYV76N2hMbf5Vw') - listener = ws.createListener() + listener = ws.createListener({ upgrader }) await listener.listen(ma) const addrs = await listener.getAddrs() @@ -178,17 +181,17 @@ describe('listen', () => { const ma = new Multiaddr('/ip6/::1/tcp/9091/ws') beforeEach(() => { - ws = new WebSockets({ upgrader }) + ws = new WebSockets() }) it('listen, check for promise', async () => { - const listener = ws.createListener() + const listener = ws.createListener({ upgrader }) await listener.listen(ma) await listener.close() }) it('listen, check for listening event', (done) => { - const listener = ws.createListener() + const listener = ws.createListener({ upgrader }) listener.addEventListener('listening', () => { void listener.close().then(done, done) @@ -198,7 +201,7 @@ describe('listen', () => { }) it('listen, check for the close event', (done) => { - const listener = ws.createListener() + const listener = ws.createListener({ upgrader }) listener.addEventListener('listening', () => { listener.addEventListener('close', () => done()) @@ -210,7 +213,7 @@ describe('listen', () => { it('listen on addr with /ipfs/QmHASH', async () => { const ma = new Multiaddr('/ip6/::1/tcp/9091/ws/ipfs/Qmb6owHp6eaWArVbcJJbQSyifyJBttMMjYV76N2hMbf5Vw') - const listener = ws.createListener() + const listener = ws.createListener({ upgrader }) await listener.listen(ma) await listener.close() }) @@ -224,15 +227,15 @@ describe('dial', () => { const ma = new Multiaddr('/ip4/127.0.0.1/tcp/9091/ws') beforeEach(async () => { - ws = new WebSockets({ upgrader }) - listener = ws.createListener() + ws = new WebSockets() + listener = ws.createListener({ upgrader }) return await listener.listen(ma) }) afterEach(async () => await listener.close()) it('dial', async () => { - const conn = await ws.dial(ma) + const conn = await ws.dial(ma, { upgrader }) const { stream } = await conn.newStream([protocol]) await expect(all(stream.source)).to.eventually.deep.equal([uint8ArrayFromString('hey')]) @@ -241,7 +244,7 @@ describe('dial', () => { it('dial with p2p Id', async () => { const ma = new Multiaddr('/ip4/127.0.0.1/tcp/9091/ws/p2p/Qmb6owHp6eaWArVbcJJbQSyifyJBttMMjYV76N2hMbf5Vw') - const conn = await ws.dial(ma) + const conn = await ws.dial(ma, { upgrader }) const { stream } = await conn.newStream([protocol]) await expect(all(stream.source)).to.eventually.deep.equal([uint8ArrayFromString('hey')]) @@ -252,7 +255,7 @@ describe('dial', () => { const ma = new Multiaddr('/ip4/127.0.0.1/tcp/0/ws') const controller = new AbortController() - const conn = ws.dial(ma, { signal: controller.signal }) + const conn = ws.dial(ma, { signal: controller.signal, upgrader }) controller.abort() await expect(conn).to.eventually.be.rejected() @@ -260,12 +263,12 @@ describe('dial', () => { it('should resolve port 0', async () => { const ma = new Multiaddr('/ip4/127.0.0.1/tcp/0/ws') - const ws = new WebSockets({ upgrader }) + const ws = new WebSockets() // Create a Promise that resolves when a connection is handled const deferred = defer() - const listener = ws.createListener({ handler: deferred.resolve }) + const listener = ws.createListener({ handler: deferred.resolve, upgrader }) // Listen on the multiaddr await listener.listen(ma) @@ -274,7 +277,7 @@ describe('dial', () => { expect(localAddrs.length).to.equal(1) // Dial to that address - await ws.dial(localAddrs[0]) + await ws.dial(localAddrs[0], { upgrader }) // Wait for the incoming dial to be handled await deferred.promise @@ -290,13 +293,14 @@ describe('dial', () => { const ma = new Multiaddr('/ip4/0.0.0.0/tcp/0/ws') beforeEach(async () => { - ws = new WebSockets({ upgrader }) + ws = new WebSockets() listener = ws.createListener({ handler: (conn) => { void conn.newStream([protocol]).then(async ({ stream }) => { return await pipe(stream, stream) }) - } + }, + upgrader }) return await listener.listen(ma) }) @@ -311,7 +315,7 @@ describe('dial', () => { }) // Dial first no loopback address - const conn = await ws.dial(addrs[0]) + const conn = await ws.dial(addrs[0], { upgrader }) const s = goodbye({ source: [uint8ArrayFromString('hey')], sink: all }) const { stream } = await conn.newStream([protocol]) @@ -330,14 +334,14 @@ describe('dial', () => { cert: fs.readFileSync('./test/fixtures/certificate.pem'), key: fs.readFileSync('./test/fixtures/key.pem') }) - ws = new WebSockets({ upgrader }) + ws = new WebSockets({ websocket: { rejectUnauthorized: false }, server }) listener = ws.createListener({ - server, handler: (conn) => { void conn.newStream([protocol]).then(async ({ stream }) => { return await pipe(stream, stream) }) - } + }, + upgrader }) return await listener.listen(ma) }) @@ -355,7 +359,7 @@ describe('dial', () => { }) it('dial ip4', async () => { - const conn = await ws.dial(ma, { websocket: { rejectUnauthorized: false } }) + const conn = await ws.dial(ma, { upgrader }) const s = goodbye({ source: [uint8ArrayFromString('hey')], sink: all }) const { stream } = await conn.newStream([protocol]) @@ -372,13 +376,14 @@ describe('dial', () => { const ma = new Multiaddr('/ip6/::1/tcp/9091/ws') beforeEach(async () => { - ws = new WebSockets({ upgrader }) + ws = new WebSockets() listener = ws.createListener({ handler: (conn) => { void conn.newStream([protocol]).then(async ({ stream }) => { return await pipe(stream, stream) }) - } + }, + upgrader }) return await listener.listen(ma) }) @@ -386,7 +391,7 @@ describe('dial', () => { afterEach(async () => await listener.close()) it('dial ip6', async () => { - const conn = await ws.dial(ma) + const conn = await ws.dial(ma, { upgrader }) const s = goodbye({ source: [uint8ArrayFromString('hey')], sink: all }) const { stream } = await conn.newStream([protocol]) @@ -395,7 +400,7 @@ describe('dial', () => { it('dial with p2p Id', async () => { const ma = new Multiaddr('/ip6/::1/tcp/9091/ws/p2p/Qmb6owHp6eaWArVbcJJbQSyifyJBttMMjYV76N2hMbf5Vw') - const conn = await ws.dial(ma) + const conn = await ws.dial(ma, { upgrader }) const s = goodbye({ source: [uint8ArrayFromString('hey')], @@ -413,7 +418,7 @@ describe('filter addrs', () => { describe('default filter addrs with only dns', () => { before(() => { - ws = new WebSockets({ upgrader }) + ws = new WebSockets() }) it('should filter out invalid WS addresses', function () { @@ -481,7 +486,7 @@ describe('filter addrs', () => { describe('custom filter addrs', () => { before(() => { - ws = new WebSockets({ upgrader, filter: filters.all }) + ws = new WebSockets({ filter: filters.all }) }) it('should fail invalid WS addresses', function () {