diff --git a/packages/connection-encrypter-plaintext/package.json b/packages/connection-encrypter-plaintext/package.json index c42ae291e1..394c599fff 100644 --- a/packages/connection-encrypter-plaintext/package.json +++ b/packages/connection-encrypter-plaintext/package.json @@ -63,10 +63,9 @@ }, "devDependencies": { "@libp2p/crypto": "^5.0.6", - "@libp2p/interface-compliance-tests": "^6.1.8", "@libp2p/logger": "^5.1.3", - "@multiformats/multiaddr": "^12.2.3", "aegir": "^44.0.1", + "it-pair": "^2.0.6", "protons": "^7.5.0", "sinon": "^18.0.0" }, diff --git a/packages/connection-encrypter-plaintext/test/index.spec.ts b/packages/connection-encrypter-plaintext/test/index.spec.ts index 3ae8de7beb..7a0a4eacce 100644 --- a/packages/connection-encrypter-plaintext/test/index.spec.ts +++ b/packages/connection-encrypter-plaintext/test/index.spec.ts @@ -1,27 +1,22 @@ /* eslint-env mocha */ import { generateKeyPair } from '@libp2p/crypto/keys' -import { mockMultiaddrConnPair } from '@libp2p/interface-compliance-tests/mocks' import { defaultLogger } from '@libp2p/logger' -import { peerIdFromMultihash, peerIdFromPrivateKey } from '@libp2p/peer-id' -import { multiaddr } from '@multiformats/multiaddr' +import { peerIdFromPrivateKey } from '@libp2p/peer-id' import { expect } from 'aegir/chai' +import { duplexPair } from 'it-pair/duplex' import sinon from 'sinon' import { plaintext } from '../src/index.js' import type { ConnectionEncrypter, PeerId } from '@libp2p/interface' describe('plaintext', () => { let localPeer: PeerId - let remotePeer: PeerId let wrongPeer: PeerId let encrypter: ConnectionEncrypter let encrypterRemote: ConnectionEncrypter beforeEach(async () => { - [remotePeer, wrongPeer] = await Promise.all([ - peerIdFromPrivateKey(await generateKeyPair('Ed25519')), - peerIdFromPrivateKey(await generateKeyPair('Ed25519')) - ]) + wrongPeer = peerIdFromPrivateKey(await generateKeyPair('Ed25519')) const localKeyPair = await generateKeyPair('Ed25519') localPeer = peerIdFromPrivateKey(localKeyPair) @@ -41,13 +36,7 @@ describe('plaintext', () => { }) it('should verify the public key and id match', async () => { - const { inbound, outbound } = mockMultiaddrConnPair({ - remotePeer, - addrs: [ - multiaddr('/ip4/127.0.0.1/tcp/1234'), - multiaddr('/ip4/127.0.0.1/tcp/1235') - ] - }) + const [inbound, outbound] = duplexPair() await Promise.all([ encrypter.secureInbound(inbound), @@ -62,21 +51,13 @@ describe('plaintext', () => { it('should fail if the peer does not provide its public key', async () => { const keyPair = await generateKeyPair('RSA', 512) - const peer = peerIdFromPrivateKey(keyPair) - remotePeer = peerIdFromMultihash(peer.toMultihash()) encrypter = plaintext()({ privateKey: keyPair, logger: defaultLogger() }) - const { inbound, outbound } = mockMultiaddrConnPair({ - remotePeer, - addrs: [ - multiaddr('/ip4/127.0.0.1/tcp/1234'), - multiaddr('/ip4/127.0.0.1/tcp/1235') - ] - }) + const [inbound, outbound] = duplexPair() await expect(Promise.all([ encrypter.secureInbound(inbound), diff --git a/packages/connection-encrypter-tls/package.json b/packages/connection-encrypter-tls/package.json index 1a8ca8a6c8..40da424ec3 100644 --- a/packages/connection-encrypter-tls/package.json +++ b/packages/connection-encrypter-tls/package.json @@ -63,12 +63,14 @@ "uint8arrays": "^5.1.0" }, "devDependencies": { - "@libp2p/interface-compliance-tests": "^6.1.8", "@libp2p/logger": "^5.1.3", - "@multiformats/multiaddr": "^12.2.3", "aegir": "^44.0.1", + "it-pair": "^2.0.6", "protons": "^7.5.0", "sinon": "^18.0.0" }, + "browser": { + "./dist/src/tls.js": "./dist/src/tls.browser.js" + }, "sideEffects": false } diff --git a/packages/connection-encrypter-tls/src/tls.browser.ts b/packages/connection-encrypter-tls/src/tls.browser.ts new file mode 100644 index 0000000000..a7078170cf --- /dev/null +++ b/packages/connection-encrypter-tls/src/tls.browser.ts @@ -0,0 +1,27 @@ +import { serviceCapabilities } from '@libp2p/interface' +import { PROTOCOL } from './index.js' +import type { MultiaddrConnection, ConnectionEncrypter, SecuredConnection, SecureConnectionOptions } from '@libp2p/interface' +import type { Duplex } from 'it-stream-types' +import type { Uint8ArrayList } from 'uint8arraylist' + +export class TLS implements ConnectionEncrypter { + public protocol: string = PROTOCOL + + constructor () { + throw new Error('TLS encryption is not possible in browsers') + } + + readonly [Symbol.toStringTag] = '@libp2p/tls' + + readonly [serviceCapabilities]: string[] = [ + '@libp2p/connection-encryption' + ] + + async secureInbound > = MultiaddrConnection> (conn: Stream, options?: SecureConnectionOptions): Promise> { + throw new Error('TLS encryption is not possible in browsers') + } + + async secureOutbound > = MultiaddrConnection> (conn: Stream, options?: SecureConnectionOptions): Promise> { + throw new Error('TLS encryption is not possible in browsers') + } +} diff --git a/packages/connection-encrypter-tls/test/index.spec.ts b/packages/connection-encrypter-tls/test/index.spec.ts index 0a529d8c45..8637f0ee66 100644 --- a/packages/connection-encrypter-tls/test/index.spec.ts +++ b/packages/connection-encrypter-tls/test/index.spec.ts @@ -1,11 +1,10 @@ /* eslint-env mocha */ import { generateKeyPair } from '@libp2p/crypto/keys' -import { mockMultiaddrConnPair } from '@libp2p/interface-compliance-tests/mocks' import { defaultLogger } from '@libp2p/logger' import { peerIdFromMultihash, peerIdFromPrivateKey } from '@libp2p/peer-id' -import { multiaddr } from '@multiformats/multiaddr' import { expect } from 'aegir/chai' +import { duplexPair } from 'it-pair/duplex' import sinon from 'sinon' import { tls } from '../src/index.js' import type { ConnectionEncrypter, PeerId } from '@libp2p/interface' @@ -36,13 +35,7 @@ describe('tls', () => { }) it('should verify the public key and id match', async () => { - const { inbound, outbound } = mockMultiaddrConnPair({ - remotePeer, - addrs: [ - multiaddr('/ip4/127.0.0.1/tcp/1234'), - multiaddr('/ip4/127.0.0.1/tcp/1235') - ] - }) + const [inbound, outbound] = duplexPair() await Promise.all([ encrypter.secureInbound(inbound, { @@ -67,13 +60,7 @@ describe('tls', () => { logger: defaultLogger() }) - const { inbound, outbound } = mockMultiaddrConnPair({ - remotePeer, - addrs: [ - multiaddr('/ip4/127.0.0.1/tcp/1234'), - multiaddr('/ip4/127.0.0.1/tcp/1235') - ] - }) + const [inbound, outbound] = duplexPair() await expect(Promise.all([ encrypter.secureInbound(inbound, { diff --git a/packages/integration-tests/.aegir.js b/packages/integration-tests/.aegir.js index 9a334a8fdd..45b0d3b2b2 100644 --- a/packages/integration-tests/.aegir.js +++ b/packages/integration-tests/.aegir.js @@ -13,12 +13,13 @@ export default { const { mplex } = await import('@libp2p/mplex') const { noise } = await import('@chainsafe/libp2p-noise') const { yamux } = await import('@chainsafe/libp2p-yamux') - const { WebSockets } = await import('@multiformats/mafmt') + const { WebSockets } = await import('@multiformats/multiaddr-matcher') const { createLibp2p } = await import('libp2p') const { plaintext } = await import('@libp2p/plaintext') const { circuitRelayServer, circuitRelayTransport } = await import('@libp2p/circuit-relay-v2') const { identify } = await import('@libp2p/identify') const { echo } = await import('@libp2p/echo') + const { mockMuxer } = await import('@libp2p/interface-compliance-tests/mocks') const libp2p = await createLibp2p({ connectionManager: { @@ -26,6 +27,7 @@ export default { }, addresses: { listen: [ + '/ip4/127.0.0.1/tcp/0/ws', '/ip4/127.0.0.1/tcp/0/ws' ] }, @@ -35,7 +37,8 @@ export default { ], streamMuxers: [ yamux(), - mplex() + mplex(), + () => mockMuxer() ], connectionEncrypters: [ noise(), @@ -48,17 +51,22 @@ export default { maxReservations: Infinity } }), - echo: echo() + echo: echo({ + maxInboundStreams: 5 + }) } }) const goLibp2pRelay = await createGoLibp2pRelay() + const wsAddresses = libp2p.getMultiaddrs().filter(ma => WebSockets.exactMatch(ma)) return { libp2p, goLibp2pRelay, env: { - RELAY_MULTIADDR: libp2p.getMultiaddrs().filter(ma => WebSockets.matches(ma)).pop(), + RELAY_MULTIADDR: wsAddresses[0], + RELAY_WS_MULTIADDR_0: wsAddresses[0], + RELAY_WS_MULTIADDR_1: wsAddresses[1], GO_RELAY_PEER: goLibp2pRelay.peerId, GO_RELAY_MULTIADDRS: goLibp2pRelay.multiaddrs, GO_RELAY_APIADDR: goLibp2pRelay.apiAddr diff --git a/packages/connection-encrypter-plaintext/test/compliance.spec.ts b/packages/integration-tests/test/compliance/connection-encryption/plaintext.spec.ts similarity index 78% rename from packages/connection-encrypter-plaintext/test/compliance.spec.ts rename to packages/integration-tests/test/compliance/connection-encryption/plaintext.spec.ts index 30570115a3..cf9a824245 100644 --- a/packages/connection-encrypter-plaintext/test/compliance.spec.ts +++ b/packages/integration-tests/test/compliance/connection-encryption/plaintext.spec.ts @@ -3,9 +3,9 @@ import { generateKeyPair } from '@libp2p/crypto/keys' import suite from '@libp2p/interface-compliance-tests/connection-encryption' import { defaultLogger } from '@libp2p/logger' -import { plaintext } from '../src/index.js' +import { plaintext } from '@libp2p/plaintext' -describe('plaintext compliance', () => { +describe('plaintext connection encrypter interface compliance', () => { suite({ async setup (opts) { return plaintext()({ diff --git a/packages/connection-encrypter-tls/test/compliance.spec.ts b/packages/integration-tests/test/compliance/connection-encryption/tls.spec.ts similarity index 67% rename from packages/connection-encrypter-tls/test/compliance.spec.ts rename to packages/integration-tests/test/compliance/connection-encryption/tls.spec.ts index ed31f94411..3388654cb4 100644 --- a/packages/connection-encrypter-tls/test/compliance.spec.ts +++ b/packages/integration-tests/test/compliance/connection-encryption/tls.spec.ts @@ -3,9 +3,14 @@ import { generateKeyPair } from '@libp2p/crypto/keys' import suite from '@libp2p/interface-compliance-tests/connection-encryption' import { defaultLogger } from '@libp2p/logger' -import { tls } from '../src/index.js' +import { tls } from '@libp2p/tls' +import { isBrowser, isWebWorker } from 'wherearewe' + +describe('tls connection encrypter interface compliance', () => { + if (isBrowser || isWebWorker) { + return + } -describe('tls compliance', () => { suite({ async setup (opts) { return tls()({ diff --git a/packages/integration-tests/test/compliance/transport/memory.spec.ts b/packages/integration-tests/test/compliance/transport/memory.spec.ts new file mode 100644 index 0000000000..d92f19b2e1 --- /dev/null +++ b/packages/integration-tests/test/compliance/transport/memory.spec.ts @@ -0,0 +1,19 @@ +import tests from '@libp2p/interface-compliance-tests/transport' +import { memory } from '@libp2p/memory' +import { multiaddr } from '@multiformats/multiaddr' +import type { Multiaddr } from '@multiformats/multiaddr' + +describe('memory transport interface compliance tests', () => { + tests({ + async setup () { + const transport = memory() + const dialAddrs: [Multiaddr, Multiaddr] = [ + multiaddr('/memory/addr-1'), + multiaddr('/memory/addr-2') + ] + + return { transport, dialAddrs } + }, + async teardown () {} + }) +}) diff --git a/packages/integration-tests/test/compliance/transport/tcp.spec.ts b/packages/integration-tests/test/compliance/transport/tcp.spec.ts new file mode 100644 index 0000000000..10401b8ce7 --- /dev/null +++ b/packages/integration-tests/test/compliance/transport/tcp.spec.ts @@ -0,0 +1,43 @@ +import tests from '@libp2p/interface-compliance-tests/transport' +import { tcp } from '@libp2p/tcp' +import { multiaddr } from '@multiformats/multiaddr' +import { isBrowser, isWebWorker } from 'wherearewe' +import type { Multiaddr } from '@multiformats/multiaddr' + +describe('tcp transport interface compliance IPv4', () => { + if (isBrowser || isWebWorker) { + return + } + + tests({ + async setup () { + const transport = tcp() + const dialAddrs: [Multiaddr, Multiaddr] = [ + multiaddr('/ip4/127.0.0.1/tcp/9091'), + multiaddr('/ip4/127.0.0.1/tcp/9092') + ] + + return { transport, dialAddrs } + }, + async teardown () {} + }) +}) + +describe('tcp transport interface compliance IPv6', () => { + if (isBrowser || isWebWorker) { + return + } + + tests({ + async setup () { + const transport = tcp() + const dialAddrs: [Multiaddr, Multiaddr] = [ + multiaddr('/ip6/::/tcp/9093'), + multiaddr('/ip6/::/tcp/9094') + ] + + return { transport, dialAddrs } + }, + async teardown () {} + }) +}) diff --git a/packages/integration-tests/test/compliance/transport/websockets.spec.ts b/packages/integration-tests/test/compliance/transport/websockets.spec.ts new file mode 100644 index 0000000000..e2a341a265 --- /dev/null +++ b/packages/integration-tests/test/compliance/transport/websockets.spec.ts @@ -0,0 +1,37 @@ +/* eslint-env mocha */ + +import tests from '@libp2p/interface-compliance-tests/transport' +import { webSockets } from '@libp2p/websockets' +import * as filters from '@libp2p/websockets/filters' +import { multiaddr } from '@multiformats/multiaddr' +import { isElectronMain, isNode } from 'wherearewe' +import type { Multiaddr } from '@multiformats/multiaddr' + +describe('websocket transport interface compliance', () => { + tests({ + async setup () { + const dialOnly = !isNode && !isElectronMain + + const transport = webSockets({ filter: filters.all }) + let dialAddrs: [Multiaddr, Multiaddr] = [ + multiaddr('/ip4/127.0.0.1/tcp/9096/ws'), + multiaddr('/ip4/127.0.0.1/tcp/9097/ws') + ] + + if (dialOnly) { + dialAddrs = [ + multiaddr(process.env.RELAY_WS_MULTIADDR_0), + multiaddr(process.env.RELAY_WS_MULTIADDR_1) + ] + } + + return { + transport, + listenAddrs: dialOnly ? undefined : dialAddrs, + dialAddrs, + dialOnly + } + }, + async teardown () {} + }) +}) diff --git a/packages/integration-tests/test/fixtures/utils.ts b/packages/integration-tests/test/fixtures/utils.ts index 87d81c968a..c844f48c20 100644 --- a/packages/integration-tests/test/fixtures/utils.ts +++ b/packages/integration-tests/test/fixtures/utils.ts @@ -63,13 +63,20 @@ export async function hasRelay (node: Libp2p, opts?: PWaitForOptions): P // Wait for peer to be used as a relay await pWaitFor(() => { - const relayAddrs = node.getMultiaddrs().filter(addr => addr.protoNames().includes('p2p-circuit')) + const relayHosts = new Set() + + const relayAddrs = node.getMultiaddrs().filter(addr => { + const options = addr.toOptions() + relayHosts.add(options.host) + + return addr.protoNames().includes('p2p-circuit') + }) if (relayAddrs.length === 0) { return false } - if (relayAddrs.length !== 1) { + if (relayHosts.size !== 1) { throw new Error(`node listening on too many relays - ${relayAddrs.length}`) } diff --git a/packages/interface-compliance-tests/package.json b/packages/interface-compliance-tests/package.json index 1054ef424b..ea54c9ba19 100644 --- a/packages/interface-compliance-tests/package.json +++ b/packages/interface-compliance-tests/package.json @@ -48,10 +48,6 @@ "types": "./dist/src/index.d.ts", "import": "./dist/src/index.js" }, - "./connection": { - "types": "./dist/src/connection/index.d.ts", - "import": "./dist/src/connection/index.js" - }, "./connection-encryption": { "types": "./dist/src/connection-encryption/index.d.ts", "import": "./dist/src/connection-encryption/index.js" @@ -109,12 +105,15 @@ }, "dependencies": { "@libp2p/crypto": "^5.0.6", + "@libp2p/echo": "^2.1.1", "@libp2p/interface": "^2.2.0", "@libp2p/interface-internal": "^2.0.10", "@libp2p/logger": "^5.1.3", + "@libp2p/memory": "^0.0.0", "@libp2p/multistream-select": "^6.0.8", "@libp2p/peer-collections": "^6.0.10", "@libp2p/peer-id": "^5.0.7", + "@libp2p/plaintext": "^2.0.10", "@libp2p/utils": "^6.1.3", "@multiformats/multiaddr": "^12.2.3", "abortable-iterator": "^5.0.1", @@ -131,12 +130,14 @@ "it-pushable": "^3.2.3", "it-stream-types": "^2.0.1", "it-to-buffer": "^4.0.7", + "libp2p": "^2.2.1", "merge-options": "^3.0.4", "p-defer": "^4.0.1", "p-event": "^6.0.1", "p-limit": "^6.0.0", "p-wait-for": "^5.0.2", "protons-runtime": "^5.4.0", + "race-signal": "^1.1.0", "sinon": "^18.0.0", "tdigest": "^0.1.2", "uint8arraylist": "^2.4.8", diff --git a/packages/interface-compliance-tests/src/connection/index.ts b/packages/interface-compliance-tests/src/connection/index.ts deleted file mode 100644 index 657b53ff1f..0000000000 --- a/packages/interface-compliance-tests/src/connection/index.ts +++ /dev/null @@ -1,166 +0,0 @@ -import { expect } from 'aegir/chai' -import sinon from 'sinon' -import type { TestSetup } from '../index.js' -import type { Connection } from '@libp2p/interface' - -export default (test: TestSetup): void => { - describe('connection', () => { - describe('open connection', () => { - let connection: Connection - - beforeEach(async () => { - connection = await test.setup() - }) - - afterEach(async () => { - await connection.close() - await test.teardown() - }) - - it('should have properties set', () => { - expect(connection.id).to.exist() - expect(connection.remotePeer).to.exist() - expect(connection.remoteAddr).to.exist() - expect(connection.status).to.equal('open') - expect(connection.timeline.open).to.exist() - expect(connection.timeline.close).to.not.exist() - expect(connection.direction).to.exist() - expect(connection.streams).to.eql([]) - expect(connection.tags).to.eql([]) - }) - - it('should get the metadata of an open connection', () => { - expect(connection.status).to.equal('open') - expect(connection.direction).to.exist() - expect(connection.timeline.open).to.exist() - expect(connection.timeline.close).to.not.exist() - }) - - it('should return an empty array of streams', () => { - const streams = connection.streams - - expect(streams).to.eql([]) - }) - - it('should be able to create a new stream', async () => { - expect(connection.streams).to.be.empty() - - const protocolToUse = '/echo/0.0.1' - const stream = await connection.newStream([protocolToUse]) - - expect(stream).to.have.property('protocol', protocolToUse) - - const connStreams = connection.streams - - expect(stream).to.exist() - expect(connStreams).to.exist() - expect(connStreams).to.have.lengthOf(1) - expect(connStreams[0]).to.equal(stream) - }) - }) - - describe('close connection', () => { - let connection: Connection - let timelineProxy - const proxyHandler = { - set () { - // @ts-expect-error - TS fails to infer here - return Reflect.set(...arguments) - } - } - - beforeEach(async () => { - timelineProxy = new Proxy({ - open: Date.now() - 10, - upgraded: Date.now() - }, proxyHandler) - - connection = await test.setup() - connection.timeline = timelineProxy - }) - - afterEach(async () => { - await test.teardown() - }) - - it('should be able to close the connection after being created', async () => { - expect(connection.timeline.close).to.not.exist() - await connection.close() - - expect(connection.timeline.close).to.exist() - expect(connection.status).to.equal('closed') - }) - - it('should be able to close the connection after opening a stream', async () => { - // Open stream - const protocol = '/echo/0.0.1' - await connection.newStream([protocol]) - - // Close connection - expect(connection.timeline.close).to.not.exist() - await connection.close() - - expect(connection.timeline.close).to.exist() - expect(connection.status).to.equal('closed') - }) - - it('should properly track streams', async () => { - // Open stream - const protocol = '/echo/0.0.1' - const stream = await connection.newStream([protocol]) - expect(stream).to.have.property('protocol', protocol) - - // Close stream - await stream.close() - - expect(connection.streams.filter(s => s.id === stream.id)).to.be.empty() - }) - - it('should track outbound streams', async () => { - // Open stream - const protocol = '/echo/0.0.1' - const stream = await connection.newStream(protocol) - expect(stream).to.have.property('direction', 'outbound') - }) - - it('should support a proxy on the timeline', async () => { - sinon.spy(proxyHandler, 'set') - expect(connection.timeline.close).to.not.exist() - - await connection.close() - // @ts-expect-error - fails to infer callCount - expect(proxyHandler.set.callCount).to.equal(1) - // @ts-expect-error - fails to infer getCall - const [obj, key, value] = proxyHandler.set.getCall(0).args - expect(obj).to.eql(connection.timeline) - expect(key).to.equal('close') - expect(value).to.be.a('number').that.equals(connection.timeline.close) - }) - - it('should fail to create a new stream if the connection is closing', async () => { - expect(connection.timeline.close).to.not.exist() - const p = connection.close() - - try { - const protocol = '/echo/0.0.1' - await connection.newStream([protocol]) - } catch (err: any) { - expect(err).to.exist() - return - } finally { - await p - } - - throw new Error('should fail to create a new stream if the connection is closing') - }) - - it('should fail to create a new stream if the connection is closed', async () => { - expect(connection.timeline.close).to.not.exist() - await connection.close() - - await expect(connection.newStream(['/echo/0.0.1'])).to.eventually.be.rejected - .with.property('name', 'ConnectionClosedError') - }) - }) - }) -} diff --git a/packages/interface-compliance-tests/src/matchers.ts b/packages/interface-compliance-tests/src/matchers.ts index d4f56c4a4a..2850a86db1 100644 --- a/packages/interface-compliance-tests/src/matchers.ts +++ b/packages/interface-compliance-tests/src/matchers.ts @@ -2,10 +2,16 @@ import Sinon from 'sinon' import type { PeerId } from '@libp2p/interface' import type { Multiaddr } from '@multiformats/multiaddr' +/** + * @deprecated PeerIds can be passed to sinon matchers directly + */ export function matchPeerId (peerId: PeerId): Sinon.SinonMatcher { return Sinon.match(p => p.toString() === peerId.toString()) } +/** + * @deprecated Multiaddrs can be passed to sinon matchers directly + */ export function matchMultiaddr (ma: Multiaddr): Sinon.SinonMatcher { return Sinon.match(m => m.toString() === ma.toString()) } diff --git a/packages/interface-compliance-tests/src/mocks/muxer.ts b/packages/interface-compliance-tests/src/mocks/muxer.ts index 96f2218faa..3e1d08aae2 100644 --- a/packages/interface-compliance-tests/src/mocks/muxer.ts +++ b/packages/interface-compliance-tests/src/mocks/muxer.ts @@ -27,9 +27,15 @@ interface ResetMessage { direction: Direction } -interface CloseMessage { +interface CloseWriteMessage { id: string - type: 'close' + type: 'closeWrite' + direction: Direction +} + +interface CloseReadMessage { + id: string + type: 'closeRead' direction: Direction } @@ -39,7 +45,7 @@ interface CreateMessage { direction: 'outbound' } -type StreamMessage = DataMessage | ResetMessage | CloseMessage | CreateMessage +type StreamMessage = DataMessage | ResetMessage | CloseWriteMessage | CloseReadMessage | CreateMessage export interface MockMuxedStreamInit extends AbstractStreamInit { push: Pushable @@ -84,16 +90,21 @@ class MuxedStream extends AbstractStream { } sendCloseWrite (): void { - const closeMsg: CloseMessage = { + const closeMsg: CloseWriteMessage = { id: this.id, - type: 'close', + type: 'closeWrite', direction: this.direction } this.push.push(closeMsg) } sendCloseRead (): void { - // does not support close read, only close write + const closeMsg: CloseReadMessage = { + id: this.id, + type: 'closeRead', + direction: this.direction + } + this.push.push(closeMsg) } } @@ -153,10 +164,10 @@ class MockMuxer implements StreamMuxer { } ) - this.log('muxed stream ended') + this.log('muxer ended') this.input.end() } catch (err: any) { - this.log('muxed stream errored', err) + this.log.error('muxer errored - %e', err) this.input.end(err) } } @@ -192,9 +203,12 @@ class MockMuxer implements StreamMuxer { } else if (message.type === 'reset') { this.log('-> reset stream %s %s', muxedStream.direction, muxedStream.id) muxedStream.reset() - } else if (message.type === 'close') { - this.log('-> closing stream %s %s', muxedStream.direction, muxedStream.id) + } else if (message.type === 'closeWrite') { + this.log('-> closing writeable end of stream %s %s', muxedStream.direction, muxedStream.id) muxedStream.remoteCloseWrite() + } else if (message.type === 'closeRead') { + this.log('-> closing readable end of stream %s %s', muxedStream.direction, muxedStream.id) + muxedStream.remoteCloseRead() } } diff --git a/packages/interface-compliance-tests/src/transport/dial-test.ts b/packages/interface-compliance-tests/src/transport/dial-test.ts deleted file mode 100644 index 942c74771b..0000000000 --- a/packages/interface-compliance-tests/src/transport/dial-test.ts +++ /dev/null @@ -1,125 +0,0 @@ -import { TypedEventEmitter } from '@libp2p/interface' -import { expect } from 'aegir/chai' -import all from 'it-all' -import drain from 'it-drain' -import { pipe } from 'it-pipe' -import sinon from 'sinon' -import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string' -import { isValidTick } from '../is-valid-tick.js' -import { mockUpgrader, mockRegistrar } from '../mocks/index.js' -import type { TransportTestFixtures, Connector } from './index.js' -import type { TestSetup } from '../index.js' -import type { Listener, Transport, Upgrader } from '@libp2p/interface' -import type { Registrar } from '@libp2p/interface-internal' -import type { Multiaddr } from '@multiformats/multiaddr' - -export default (common: TestSetup): void => { - describe('dial', () => { - let upgrader: Upgrader - let registrar: Registrar - let listenAddrs: Multiaddr[] - let dialAddrs: Multiaddr[] - let dialer: Transport - let listener: Transport - let connector: Connector - let listen: Listener - - before(async () => { - registrar = mockRegistrar() - upgrader = mockUpgrader({ - registrar, - events: new TypedEventEmitter() - }); - - ({ listenAddrs, dialAddrs, dialer, listener, connector } = await common.setup()) - }) - - after(async () => { - await common.teardown() - }) - - beforeEach(async () => { - listen = listener.createListener({ - upgrader - }) - await listen.listen(listenAddrs[0]) - }) - - afterEach(async () => { - sinon.restore() - connector.restore() - await listen.close() - }) - - it('simple', async () => { - const protocol = '/hello/1.0.0' - void registrar.handle(protocol, (data) => { - void pipe([ - uint8ArrayFromString('hey') - ], - data.stream, - drain - ) - }) - - const upgradeSpy = sinon.spy(upgrader, 'upgradeOutbound') - const conn = await dialer.dial(dialAddrs[0], { - upgrader - }) - - const stream = await conn.newStream([protocol]) - const result = await all(stream.source) - - expect(upgradeSpy.callCount).to.equal(1) - await expect(upgradeSpy.getCall(0).returnValue).to.eventually.equal(conn) - expect(result.length).to.equal(1) - expect(result[0].subarray()).to.equalBytes(uint8ArrayFromString('hey')) - await conn.close() - }) - - it('can close connections', async () => { - const upgradeSpy = sinon.spy(upgrader, 'upgradeOutbound') - const conn = await dialer.dial(dialAddrs[0], { - upgrader - }) - - expect(upgradeSpy.callCount).to.equal(1) - await expect(upgradeSpy.getCall(0).returnValue).to.eventually.equal(conn) - await conn.close() - expect(isValidTick(conn.timeline.close)).to.equal(true) - }) - - it('to non existent listener', async () => { - const upgradeSpy = sinon.spy(upgrader, 'upgradeOutbound') - - await expect(dialer.dial(dialAddrs[1], { - upgrader - })).to.eventually.be.rejected() - expect(upgradeSpy.callCount).to.equal(0) - }) - - it('abort before dialing throws AbortError', async () => { - const upgradeSpy = sinon.spy(upgrader, 'upgradeOutbound') - const controller = new AbortController() - controller.abort() - const conn = dialer.dial(dialAddrs[0], { signal: controller.signal, upgrader }) - - await expect(conn).to.eventually.be.rejected().with.property('name', 'AbortError') - expect(upgradeSpy.callCount).to.equal(0) - }) - - it('abort while dialing throws AbortError', async () => { - const upgradeSpy = sinon.spy(upgrader, 'upgradeOutbound') - // Add a delay to connect() so that we can abort while the dial is in - // progress - connector.delay(100) - - const controller = new AbortController() - const conn = dialer.dial(dialAddrs[0], { signal: controller.signal, upgrader }) - setTimeout(() => { controller.abort() }, 50) - - await expect(conn).to.eventually.be.rejected().with.property('name', 'AbortError') - expect(upgradeSpy.callCount).to.equal(0) - }) - }) -} diff --git a/packages/interface-compliance-tests/src/transport/filter-test.ts b/packages/interface-compliance-tests/src/transport/filter-test.ts deleted file mode 100644 index 79c940abde..0000000000 --- a/packages/interface-compliance-tests/src/transport/filter-test.ts +++ /dev/null @@ -1,32 +0,0 @@ -import { expect } from 'aegir/chai' -import type { TransportTestFixtures } from './index.js' -import type { TestSetup } from '../index.js' -import type { Transport } from '@libp2p/interface' -import type { Multiaddr } from '@multiformats/multiaddr' - -export default (common: TestSetup): void => { - describe('filter', () => { - let listenAddrs: Multiaddr[] - let dialAddrs: Multiaddr[] - let dialer: Transport - let listener: Transport - - before(async () => { - ({ listenAddrs, dialAddrs, dialer, listener } = await common.setup()) - }) - - after(async () => { - await common.teardown() - }) - - it('filters listen addresses', () => { - const filteredAddrs = listener.listenFilter(listenAddrs) - expect(filteredAddrs).to.eql(listenAddrs) - }) - - it('filters dial addresses', () => { - const filteredAddrs = dialer.dialFilter(dialAddrs) - expect(filteredAddrs).to.eql(dialAddrs) - }) - }) -} diff --git a/packages/interface-compliance-tests/src/transport/index.ts b/packages/interface-compliance-tests/src/transport/index.ts index 9210ddbb06..c94b5314f7 100644 --- a/packages/interface-compliance-tests/src/transport/index.ts +++ b/packages/interface-compliance-tests/src/transport/index.ts @@ -1,27 +1,415 @@ -import dial from './dial-test.js' -import filter from './filter-test.js' -import listen from './listen-test.js' +import { echo } from '@libp2p/echo' +import { stop } from '@libp2p/interface' +import { expect } from 'aegir/chai' +import delay from 'delay' +import { pEvent } from 'p-event' +import pWaitFor from 'p-wait-for' +import { raceSignal } from 'race-signal' +import { isValidTick } from '../is-valid-tick.js' +import { createPeer, getTransportManager, getUpgrader, slowNetwork } from './utils.js' import type { TestSetup } from '../index.js' -import type { Transport } from '@libp2p/interface' +import type { Echo } from '@libp2p/echo' +import type { Connection, Libp2p, Stream, Transport } from '@libp2p/interface' import type { Multiaddr } from '@multiformats/multiaddr' -export interface Connector { - delay(ms: number): void - restore(): void -} - export interface TransportTestFixtures { - listenAddrs: Multiaddr[] - dialAddrs: Multiaddr[] - dialer: Transport - listener: Transport - connector: Connector + /** + * Addresses that will be used to dial listeners from `listenAddrs` + */ + dialAddrs: [Multiaddr, Multiaddr] + + /** + * Addresses that will be used to create listeners to dial + */ + listenAddrs?: [Multiaddr, Multiaddr] + + /** + * Only run the dial portion of the tests, do not try to create listeners + */ + dialOnly?: boolean + + transport(components: any): Transport } export default (common: TestSetup): void => { describe('interface-transport', () => { - dial(common) - listen(common) - filter(common) + let dialAddrs: Multiaddr[] + let listenAddrs: Multiaddr[] + let transport: (components: any) => Transport + let dialer: Libp2p<{ echo: Echo }> + let listener: Libp2p<{ echo: Echo }> | undefined + let dialOnly: boolean + + beforeEach(async () => { + ({ dialAddrs, listenAddrs = dialAddrs, transport, dialOnly = false } = await common.setup()) + }) + + afterEach(async () => { + await stop(dialer, listener) + await common.teardown() + }) + + it('simple', async () => { + dialer = await createPeer({ + transports: [ + transport + ] + }) + + if (!dialOnly) { + listener = await createPeer({ + addresses: { + listen: [listenAddrs[0].toString()] + }, + transports: [ + transport + ] + }) + } + + const input = Uint8Array.from([0, 1, 2, 3, 4]) + const output = await dialer.services.echo.echo(dialAddrs[0], input, { + signal: AbortSignal.timeout(5000) + }) + + expect(output).to.equalBytes(input) + }) + + it('should listen on multiple addresses', async () => { + dialer = await createPeer({ + transports: [ + transport + ] + }) + + if (!dialOnly) { + listener = await createPeer({ + addresses: { + listen: [ + listenAddrs[0].toString(), + listenAddrs[1].toString() + ] + }, + transports: [ + transport + ] + }) + } + + const input = Uint8Array.from([0, 1, 2, 3, 4]) + + await expect(dialer.services.echo.echo(dialAddrs[0], input, { + signal: AbortSignal.timeout(5000) + })).to.eventually.deep.equal(input) + + await expect(dialer.services.echo.echo(dialAddrs[1], input, { + signal: AbortSignal.timeout(5000) + })).to.eventually.deep.equal(input) + }) + + it('can close connections', async () => { + dialer = await createPeer({ + transports: [ + transport + ] + }) + + if (!dialOnly) { + listener = await createPeer({ + addresses: { + listen: [listenAddrs[0].toString()] + }, + transports: [ + transport + ] + }) + } + + const conn = await dialer.dial(dialAddrs[0], { + signal: AbortSignal.timeout(5000) + }) + + await conn.close() + expect(isValidTick(conn.timeline.close)).to.equal(true) + }) + + it('abort before dialing throws AbortError', async () => { + dialer = await createPeer({ + transports: [ + transport + ] + }) + + if (!dialOnly) { + listener = await createPeer({ + addresses: { + listen: [listenAddrs[0].toString()] + }, + transports: [ + transport + ] + }) + } + + const controller = new AbortController() + controller.abort() + + await expect(dialer.dial(dialAddrs[0], { + signal: controller.signal + })).to.eventually.be.rejected() + .with.property('name', 'AbortError') + }) + + it('abort while dialing throws AbortError', async () => { + dialer = await createPeer({ + transports: [ + transport + ] + }) + + if (!dialOnly) { + listener = await createPeer({ + addresses: { + listen: [listenAddrs[0].toString()] + }, + transports: [ + transport + ] + }) + } + slowNetwork(dialer, 100) + + const controller = new AbortController() + setTimeout(() => { controller.abort() }, 50) + + await expect(dialer.dial(dialAddrs[0], { + signal: controller.signal + })).to.eventually.be.rejected() + .with.property('name', 'AbortError') + }) + + it('should close all streams when the connection closes', async () => { + dialer = await createPeer({ + transports: [ + transport + ] + }) + + if (!dialOnly) { + listener = await createPeer({ + addresses: { + listen: [listenAddrs[0].toString()] + }, + transports: [ + transport + ], + services: { + echo: echo({ + maxInboundStreams: 5 + }) + } + }) + } + + const connection = await dialer.dial(listenAddrs[0]) + let remoteConn: Connection | undefined + + if (listener != null) { + const remoteConnections = listener.getConnections(dialer.peerId) + expect(remoteConnections).to.have.lengthOf(1) + remoteConn = remoteConnections[0] + } + + const streams: Stream[] = [] + + for (let i = 0; i < 5; i++) { + streams.push(await connection.newStream('/echo/1.0.0', { + maxOutboundStreams: 5 + })) + } + + // Close the connection and verify all streams have been closed + await connection.close() + await pWaitFor(() => connection.streams.length === 0) + + if (remoteConn != null) { + await pWaitFor(() => remoteConn.streams.length === 0) + } + + expect(streams.find(stream => stream.status !== 'closed')).to.be.undefined() + }) + + it('should not handle connection if upgradeInbound rejects', async function () { + if (dialOnly) { + return this.skip() + } + + dialer = await createPeer({ + transports: [ + transport + ] + }) + + if (!dialOnly) { + listener = await createPeer({ + addresses: { + listen: [listenAddrs[0].toString()] + }, + transports: [ + transport + ] + }) + } + + const upgrader = getUpgrader(listener) + upgrader.upgradeInbound = async () => { + await delay(100) + throw new Error('Oh noes!') + } + + await expect(dialer.dial(listenAddrs[0])).to.eventually.be.rejected + .with.property('name', 'EncryptionFailedError') + + expect(dialer.getConnections()).to.have.lengthOf(0) + + if (listener != null) { + expect(listener.getConnections()).to.have.lengthOf(0) + } + }) + }) + + describe('events', () => { + let listenAddrs: Multiaddr[] + let dialAddrs: Multiaddr[] + let transport: (components: any) => Transport + let dialer: Libp2p<{ echo: Echo }> + let listener: Libp2p<{ echo: Echo }> | undefined + let dialOnly: boolean + + beforeEach(async () => { + ({ dialAddrs, listenAddrs = dialAddrs, transport, dialOnly = false } = await common.setup()) + }) + + afterEach(async () => { + await stop(dialer, listener) + await common.teardown() + }) + + it('emits connection', async function () { + if (dialOnly) { + return this.skip() + } + + dialer = await createPeer({ + transports: [ + transport + ] + }) + + if (!dialOnly) { + listener = await createPeer({ + addresses: { + listen: [listenAddrs[0].toString()] + }, + transports: [ + transport + ] + }) + } + + const transportManager = getTransportManager(listener) + const transportListener = transportManager.getListeners()[0] + + const p = pEvent(transportListener, 'connection') + + await expect(dialer.dial(dialAddrs[0])).to.eventually.be.ok() + + await raceSignal(p, AbortSignal.timeout(1000), { + errorMessage: 'Did not emit connection event' + }) + }) + + it('emits listening', async function () { + if (dialOnly) { + return this.skip() + } + + dialer = await createPeer({ + transports: [ + transport + ] + }) + + if (!dialOnly) { + listener = await createPeer({ + addresses: { + listen: [listenAddrs[0].toString()] + }, + transports: [ + transport + ] + }) + } + + const transportManager = getTransportManager(listener) + const t = transportManager.dialTransportForMultiaddr(dialAddrs[0]) + + if (t == null) { + throw new Error(`No transport configured for dial address ${dialAddrs[0]}`) + } + + let p: Promise | undefined + const originalCreateListener = t.createListener.bind(t) + + t.createListener = (opts) => { + const listener = originalCreateListener(opts) + p = pEvent(listener, 'listening') + + return listener + } + + await transportManager.listen([ + listenAddrs[1] + ]) + + if (p == null) { + throw new Error('Listener was not created') + } + + await raceSignal(p, AbortSignal.timeout(1000), { + errorMessage: 'Did not emit connection event' + }) + }) + + it('emits close', async function () { + if (dialOnly) { + return this.skip() + } + + dialer = await createPeer({ + transports: [ + transport + ] + }) + listener = await createPeer({ + addresses: { + listen: [listenAddrs[0].toString()] + }, + transports: [ + transport + ] + }) + + const transportManager = getTransportManager(listener) + const transportListener = transportManager.getListeners()[0] + + const p = pEvent(transportListener, 'close') + + await listener.stop() + + await raceSignal(p, AbortSignal.timeout(1000), { + errorMessage: 'Did not emit close event' + }) + }) }) } diff --git a/packages/interface-compliance-tests/src/transport/listen-test.ts b/packages/interface-compliance-tests/src/transport/listen-test.ts deleted file mode 100644 index eadf69db60..0000000000 --- a/packages/interface-compliance-tests/src/transport/listen-test.ts +++ /dev/null @@ -1,192 +0,0 @@ -/* eslint max-nested-callbacks: ["error", 8] */ -import { TypedEventEmitter } from '@libp2p/interface' -import { expect } from 'aegir/chai' -import drain from 'it-drain' -import { pipe } from 'it-pipe' -import defer from 'p-defer' -import pWaitFor from 'p-wait-for' -import sinon from 'sinon' -import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string' -import { isValidTick } from '../is-valid-tick.js' -import { mockUpgrader, mockRegistrar } from '../mocks/index.js' -import type { TransportTestFixtures } from './index.js' -import type { TestSetup } from '../index.js' -import type { Connection, Transport, Upgrader } from '@libp2p/interface' -import type { Registrar } from '@libp2p/interface-internal' -import type { Multiaddr } from '@multiformats/multiaddr' - -export default (common: TestSetup): void => { - describe('listen', () => { - let upgrader: Upgrader - let listenAddrs: Multiaddr[] - let dialAddrs: Multiaddr[] - let dialer: Transport - let listener: Transport - let registrar: Registrar - - before(async () => { - registrar = mockRegistrar() - upgrader = mockUpgrader({ - registrar, - events: new TypedEventEmitter() - }); - - ({ dialer, listener, listenAddrs, dialAddrs } = await common.setup()) - }) - - after(async () => { - await common.teardown() - }) - - afterEach(() => { - sinon.restore() - }) - - it('simple', async () => { - const listen = listener.createListener({ - upgrader - }) - await listen.listen(listenAddrs[0]) - await listen.close() - }) - - it('close listener with connections, through timeout', async () => { - const upgradeSpy = sinon.spy(upgrader, 'upgradeInbound') - const listenerConns: Connection[] = [] - - const protocol = '/test/protocol' - void registrar.handle(protocol, (data) => { - void drain(data.stream.source) - }) - - const listen = listener.createListener({ - upgrader, - handler: (conn) => { - listenerConns.push(conn) - } - }) - - // Listen - await listen.listen(listenAddrs[0]) - - // Create two connections to the listener - const [conn1] = await Promise.all([ - dialer.dial(dialAddrs[0], { - upgrader - }), - dialer.dial(dialAddrs[0], { - upgrader - }) - ]) - - // Give the listener a chance to finish its upgrade - await pWaitFor(() => listenerConns.length === 2) - - const stream1 = await conn1.newStream([protocol]) - - // Wait for the data send and close to finish - await Promise.all([ - pipe( - [uint8ArrayFromString('Some data that is never handled')], - stream1 - ), - // Closer the listener (will take a couple of seconds to time out) - listen.close() - ]) - - await stream1.close() - await conn1.close() - - expect(isValidTick(conn1.timeline.close)).to.equal(true) - listenerConns.forEach(conn => { - expect(isValidTick(conn.timeline.close)).to.equal(true) - }) - - // 2 dials = 2 connections upgraded - expect(upgradeSpy.callCount).to.equal(2) - }) - - it('should not handle connection if upgradeInbound rejects', async () => { - sinon.stub(upgrader, 'upgradeInbound').rejects() - - const listen = listener.createListener({ - upgrader - }) - - // Listen - await listen.listen(listenAddrs[0]) - - // Create a connection to the listener - const conn = await dialer.dial(dialAddrs[0], { - upgrader - }) - - await pWaitFor(() => typeof conn.timeline.close === 'number') - await listen.close() - }) - - describe('events', () => { - it('connection', async () => { - const upgradeSpy = sinon.spy(upgrader, 'upgradeInbound') - const listen = listener.createListener({ - upgrader - }) - const deferred = defer() - let conn - - listen.addEventListener('connection', (evt) => { - conn = evt.detail - deferred.resolve() - }) - - void (async () => { - await listen.listen(listenAddrs[0]) - await dialer.dial(dialAddrs[0], { - upgrader - }) - })() - - await deferred.promise - - await expect(upgradeSpy.getCall(0).returnValue).to.eventually.equal(conn) - expect(upgradeSpy.callCount).to.equal(1) - await listen.close() - }) - - it('listening', (done) => { - const listen = listener.createListener({ - upgrader - }) - listen.addEventListener('listening', () => { - listen.close().then(done, done) - }) - void listen.listen(listenAddrs[0]) - }) - - it('error', (done) => { - const listen = listener.createListener({ - upgrader - }) - listen.addEventListener('error', (evt) => { - expect(evt.detail).to.be.an.instanceOf(Error) - listen.close().then(done, done) - }) - listen.dispatchEvent(new CustomEvent('error', { - detail: new Error('my err') - })) - }) - - it('close', (done) => { - const listen = listener.createListener({ - upgrader - }) - listen.addEventListener('close', () => { done() }) - - void (async () => { - await listen.listen(listenAddrs[0]) - await listen.close() - })() - }) - }) - }) -} diff --git a/packages/interface-compliance-tests/src/transport/utils.ts b/packages/interface-compliance-tests/src/transport/utils.ts new file mode 100644 index 0000000000..b56d79a348 --- /dev/null +++ b/packages/interface-compliance-tests/src/transport/utils.ts @@ -0,0 +1,76 @@ +/* eslint-env mocha */ + +import { echo } from '@libp2p/echo' +import { memory } from '@libp2p/memory' +import { plaintext } from '@libp2p/plaintext' +import delay from 'delay' +import map from 'it-map' +import { createLibp2p } from 'libp2p' +import { mockMuxer } from '../mocks/muxer.js' +import type { Echo } from '@libp2p/echo' +import type { Libp2p, Upgrader } from '@libp2p/interface' +import type { TransportManager } from '@libp2p/interface-internal' +import type { Libp2pOptions } from 'libp2p' + +export async function createPeer (config: Partial> = {}): Promise> { + const node = await createLibp2p({ + transports: [ + memory() + ], + connectionEncrypters: [ + plaintext() + ], + streamMuxers: [ + () => mockMuxer() + ], + connectionGater: { + denyDialMultiaddr: () => false + }, + ...config, + services: { + echo: echo(), + ...config.services + } + }) + + return node +} + +/** + * Monkey patch the upgrader in the passed libp2p to add latency to any + * multiaddr connections upgraded to connections - this is to work with + * transports that have their own muxers/encrypters and do not support + * connection protection + */ +export function slowNetwork (libp2p: any, latency: number): void { + const upgrader: Upgrader = getUpgrader(libp2p) + + const originalUpgradeInbound = upgrader.upgradeInbound.bind(upgrader) + const originalUpgradeOutbound = upgrader.upgradeOutbound.bind(upgrader) + + upgrader.upgradeInbound = async (maConn, opts) => { + maConn.source = map(maConn.source, async (buf) => { + await delay(latency) + return buf + }) + + return originalUpgradeInbound(maConn, opts) + } + + upgrader.upgradeOutbound = async (maConn, opts) => { + maConn.source = map(maConn.source, async (buf) => { + await delay(latency) + return buf + }) + + return originalUpgradeOutbound(maConn, opts) + } +} + +export function getUpgrader (libp2p: any): Upgrader { + return libp2p.components.upgrader +} + +export function getTransportManager (libp2p: any): TransportManager { + return libp2p.components.transportManager +} diff --git a/packages/interface-compliance-tests/test/mocks/connection.spec.ts b/packages/interface-compliance-tests/test/mocks/connection.spec.ts deleted file mode 100644 index d8c2654e1a..0000000000 --- a/packages/interface-compliance-tests/test/mocks/connection.spec.ts +++ /dev/null @@ -1,41 +0,0 @@ -import { generateKeyPair } from '@libp2p/crypto/keys' -import { peerIdFromPrivateKey } from '@libp2p/peer-id' -import { pipe } from 'it-pipe' -import tests from '../../src/connection/index.js' -import { connectionPair } from '../../src/mocks/connection.js' -import { mockRegistrar } from '../../src/mocks/registrar.js' -import type { Connection } from '@libp2p/interface' - -describe('mock connection compliance tests', () => { - let connections: Connection[] = [] - - tests({ - async setup () { - const privateKeyA = await generateKeyPair('Ed25519') - const componentsA = { - peerId: peerIdFromPrivateKey(privateKeyA), - registrar: mockRegistrar() - } - const privateKeyB = await generateKeyPair('Ed25519') - const componentsB = { - peerId: peerIdFromPrivateKey(privateKeyB), - registrar: mockRegistrar() - } - connections = connectionPair(componentsA, componentsB) - - await componentsB.registrar.handle('/echo/0.0.1', (data) => { - void pipe( - data.stream, - data.stream - ) - }) - - return connections[0] - }, - async teardown () { - await Promise.all(connections.map(async conn => { - await conn.close() - })) - } - }) -}) diff --git a/packages/interface/src/transport/index.ts b/packages/interface/src/transport/index.ts index b0db5d0a42..082c232ad3 100644 --- a/packages/interface/src/transport/index.ts +++ b/packages/interface/src/transport/index.ts @@ -36,7 +36,6 @@ export interface ConnectionHandler { (connection: Connection): void } export interface MultiaddrFilter { (multiaddrs: Multiaddr[]): Multiaddr[] } export interface CreateListenerOptions { - handler?: ConnectionHandler upgrader: Upgrader } diff --git a/packages/transport-memory/src/listener.ts b/packages/transport-memory/src/listener.ts index 9320b3c2a8..6bcefc92a7 100644 --- a/packages/transport-memory/src/listener.ts +++ b/packages/transport-memory/src/listener.ts @@ -120,7 +120,6 @@ export class MemoryTransportListener extends TypedEventEmitter i signal }) .then(connection => { - this.init.handler?.(connection) this.safeDispatchEvent('connection', { detail: connection }) diff --git a/packages/transport-tcp/package.json b/packages/transport-tcp/package.json index 7df5c1edee..b150733fe2 100644 --- a/packages/transport-tcp/package.json +++ b/packages/transport-tcp/package.json @@ -72,13 +72,10 @@ "stream-to-it": "^1.0.1" }, "devDependencies": { - "@libp2p/interface-compliance-tests": "^6.1.8", "@libp2p/logger": "^5.1.3", "aegir": "^44.0.1", - "it-all": "^3.0.6", - "it-pipe": "^3.0.1", "sinon": "^18.0.0", - "uint8arrays": "^5.1.0", + "sinon-ts": "^2.0.0", "wherearewe": "^2.0.1" }, "browser": { diff --git a/packages/transport-tcp/src/tcp.browser.ts b/packages/transport-tcp/src/tcp.browser.ts index 079fda23e4..85da1cac58 100644 --- a/packages/transport-tcp/src/tcp.browser.ts +++ b/packages/transport-tcp/src/tcp.browser.ts @@ -1,32 +1,3 @@ -/** - * @packageDocumentation - * - * A [libp2p transport](https://docs.libp2p.io/concepts/transports/overview/) based on the TCP networking stack. - * - * @example - * - * ```TypeScript - * import { createLibp2p } from 'libp2p' - * import { tcp } from '@libp2p/tcp' - * import { multiaddr } from '@multiformats/multiaddr' - * - * const node = await createLibp2p({ - * transports: [ - * tcp() - * ] - * }) - * - * const ma = multiaddr('/ip4/123.123.123.123/tcp/1234') - * - * // dial a TCP connection, timing out after 10 seconds - * const connection = await node.dial(ma, { - * signal: AbortSignal.timeout(10_000) - * }) - * - * // use connection... - * ``` - */ - import { serviceCapabilities, transportSymbol } from '@libp2p/interface' import type { TCPComponents, TCPDialEvents, TCPMetrics, TCPOptions } from './index.js' import type { Logger, Connection, Transport, Listener } from '@libp2p/interface' diff --git a/packages/transport-tcp/test/compliance.spec.ts b/packages/transport-tcp/test/compliance.spec.ts deleted file mode 100644 index 33a945a4d4..0000000000 --- a/packages/transport-tcp/test/compliance.spec.ts +++ /dev/null @@ -1,45 +0,0 @@ -import net from 'net' -import tests from '@libp2p/interface-compliance-tests/transport' -import { defaultLogger } from '@libp2p/logger' -import { multiaddr } from '@multiformats/multiaddr' -import sinon from 'sinon' -import { tcp } from '../src/index.js' - -describe('interface-transport compliance', () => { - tests({ - async setup () { - const transport = tcp()({ - logger: defaultLogger() - }) - const addrs = [ - multiaddr('/ip4/127.0.0.1/tcp/9091'), - multiaddr('/ip4/127.0.0.1/tcp/9092'), - multiaddr('/ip4/127.0.0.1/tcp/9093'), - multiaddr('/ip6/::/tcp/9094') - ] - - // Used by the dial tests to simulate a delayed connect - const connector = { - delay (delayMs: number) { - const netConnect = net.connect - sinon.replace(net, 'connect', (opts: any) => { - const socket = netConnect(opts) - const socketEmit = socket.emit.bind(socket) - sinon.replace(socket, 'emit', (...args: [string]) => { - const time = args[0] === 'connect' ? delayMs : 0 - setTimeout(() => socketEmit(...args), time) - return true - }) - return socket - }) - }, - restore () { - sinon.restore() - } - } - - return { dialer: transport, listener: transport, listenAddrs: addrs, dialAddrs: addrs, connector } - }, - async teardown () {} - }) -}) diff --git a/packages/transport-tcp/test/connection-limits.spec.ts b/packages/transport-tcp/test/connection-limits.spec.ts index b43163dade..47684f579b 100644 --- a/packages/transport-tcp/test/connection-limits.spec.ts +++ b/packages/transport-tcp/test/connection-limits.spec.ts @@ -1,11 +1,12 @@ import net from 'node:net' import { promisify } from 'util' -import { mockUpgrader } from '@libp2p/interface-compliance-tests/mocks' import { defaultLogger } from '@libp2p/logger' import { multiaddr } from '@multiformats/multiaddr' import { expect } from 'aegir/chai' +import { stubInterface } from 'sinon-ts' import { tcp } from '../src/index.js' import type { TCPListener } from '../src/listener.js' +import type { Connection, Upgrader } from '@libp2p/interface' const buildSocketAssertions = (port: number, closeCallbacks: Array<() => Promise | any>): { assertConnectedSocket(i: number): Promise, assertRefusedSocket(i: number): Promise } => { function createSocket (i: number): net.Socket { @@ -75,9 +76,19 @@ async function assertServerConnections (listener: TCPListener, connections: numb describe('closeAbove/listenBelow', () => { let afterEachCallbacks: Array<() => Promise | any> = [] + let upgrader: Upgrader beforeEach(() => { afterEachCallbacks = [] + + upgrader = stubInterface({ + upgradeInbound: async (maConn) => { + return stubInterface() + }, + upgradeOutbound: async (maConn) => { + return stubInterface() + } + }) }) afterEach(async () => { @@ -93,7 +104,6 @@ describe('closeAbove/listenBelow', () => { logger: defaultLogger() }) - const upgrader = mockUpgrader() const listener = transport.createListener({ upgrader }) as TCPListener afterEachCallbacks.push(async () => listener.close()) @@ -120,7 +130,6 @@ describe('closeAbove/listenBelow', () => { logger: defaultLogger() }) - const upgrader = mockUpgrader() const listener = transport.createListener({ upgrader }) as TCPListener afterEachCallbacks.push(async () => listener.close()) @@ -155,7 +164,6 @@ describe('closeAbove/listenBelow', () => { logger: defaultLogger() }) - const upgrader = mockUpgrader() const listener = transport.createListener({ upgrader }) as TCPListener afterEachCallbacks.push(async () => listener.close()) @@ -185,7 +193,6 @@ describe('closeAbove/listenBelow', () => { logger: defaultLogger() }) - const upgrader = mockUpgrader() const listener = transport.createListener({ upgrader }) as TCPListener afterEachCallbacks.push(async () => listener.close()) diff --git a/packages/transport-tcp/test/connection.spec.ts b/packages/transport-tcp/test/connection.spec.ts index 06174b685c..3f194ec325 100644 --- a/packages/transport-tcp/test/connection.spec.ts +++ b/packages/transport-tcp/test/connection.spec.ts @@ -1,8 +1,8 @@ -import { TypedEventEmitter } from '@libp2p/interface' -import { mockUpgrader } from '@libp2p/interface-compliance-tests/mocks' import { defaultLogger } from '@libp2p/logger' import { multiaddr } from '@multiformats/multiaddr' import { expect } from 'aegir/chai' +import { pEvent } from 'p-event' +import { stubInterface } from 'sinon-ts' import { tcp } from '../src/index.js' import type { Connection, Transport, Upgrader } from '@libp2p/interface' @@ -14,23 +14,25 @@ describe('valid localAddr and remoteAddr', () => { transport = tcp()({ logger: defaultLogger() }) - upgrader = mockUpgrader({ - events: new TypedEventEmitter() + upgrader = stubInterface({ + upgradeInbound: async (maConn) => { + return stubInterface({ + remoteAddr: maConn.remoteAddr + }) + }, + upgradeOutbound: async (maConn) => { + return stubInterface({ + remoteAddr: maConn.remoteAddr + }) + } }) }) const ma = multiaddr('/ip4/127.0.0.1/tcp/0') it('should resolve port 0', async () => { - // Create a Promise that resolves when a connection is handled - let handled: (conn: Connection) => void - const handlerPromise = new Promise(resolve => { handled = resolve }) - - const handler = (conn: Connection): void => { handled(conn) } - - // Create a listener with the handler + // Create a listener const listener = transport.createListener({ - handler, upgrader }) @@ -40,31 +42,29 @@ describe('valid localAddr and remoteAddr', () => { const localAddrs = listener.getAddrs() expect(localAddrs.length).to.equal(1) + const p = pEvent(listener, 'connection') + // Dial to that address await transport.dial(localAddrs[0], { upgrader }) // Wait for the incoming dial to be handled - await handlerPromise + await p // Close the listener await listener.close() }) it('should handle multiple simultaneous closes', async () => { - // Create a Promise that resolves when a connection is handled - let handled: (conn: Connection) => void - const handlerPromise = new Promise(resolve => { handled = resolve }) - - const handler = (conn: Connection): void => { handled(conn) } - - // Create a listener with the handler + // Create a listener const listener = transport.createListener({ - handler, upgrader }) + // Create a Promise that resolves when a connection is handled + const p = pEvent(listener, 'connection') + // Listen on the multi-address await listener.listen(ma) @@ -77,7 +77,7 @@ describe('valid localAddr and remoteAddr', () => { }) // Wait for the incoming dial to be handled - await handlerPromise + await p // Close the dialer with two simultaneous calls to `close` await Promise.race([ diff --git a/packages/transport-tcp/test/listen-dial.spec.ts b/packages/transport-tcp/test/listen-dial.spec.ts index 81e47a58dc..ff9d9dbed2 100644 --- a/packages/transport-tcp/test/listen-dial.spec.ts +++ b/packages/transport-tcp/test/listen-dial.spec.ts @@ -1,16 +1,12 @@ import os from 'os' import path from 'path' -import { AbortError, TypedEventEmitter } from '@libp2p/interface' -import { mockRegistrar, mockUpgrader } from '@libp2p/interface-compliance-tests/mocks' import { defaultLogger } from '@libp2p/logger' import { multiaddr } from '@multiformats/multiaddr' import { expect } from 'aegir/chai' -import all from 'it-all' -import { pipe } from 'it-pipe' import pDefer from 'p-defer' -import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string' +import { stubInterface } from 'sinon-ts' import { tcp } from '../src/index.js' -import type { MultiaddrConnection, Transport, Upgrader } from '@libp2p/interface' +import type { Connection, Transport, Upgrader } from '@libp2p/interface' const isCI = process.env.CI @@ -23,8 +19,17 @@ describe('listen', () => { transport = tcp()({ logger: defaultLogger() }) - upgrader = mockUpgrader({ - events: new TypedEventEmitter() + upgrader = stubInterface({ + upgradeInbound: async (maConn) => { + return stubInterface({ + remoteAddr: maConn.remoteAddr + }) + }, + upgradeOutbound: async (maConn) => { + return stubInterface({ + remoteAddr: maConn.remoteAddr + }) + } }) }) @@ -38,7 +43,7 @@ describe('listen', () => { } }) - it('listen on path', async () => { + it('listen on unix domain socket', async () => { const mh = multiaddr(`/unix/${path.resolve(os.tmpdir(), `/tmp/p2pd-${Date.now()}.sock`)}`) listener = transport.createListener({ @@ -163,21 +168,21 @@ describe('listen', () => { }) describe('dial', () => { - const protocol = '/echo/1.0.0' let transport: Transport let upgrader: Upgrader beforeEach(async () => { - const registrar = mockRegistrar() - void registrar.handle(protocol, (evt) => { - void pipe( - evt.stream, - evt.stream - ) - }) - upgrader = mockUpgrader({ - registrar, - events: new TypedEventEmitter() + upgrader = stubInterface({ + upgradeInbound: async (maConn) => { + return stubInterface({ + remoteAddr: maConn.remoteAddr + }) + }, + upgradeOutbound: async (maConn) => { + return stubInterface({ + remoteAddr: maConn.remoteAddr + }) + } }) transport = tcp()({ @@ -185,30 +190,21 @@ describe('dial', () => { }) }) - it('dial on IPv4', async () => { + it('dial IPv4', async () => { const ma = multiaddr('/ip4/127.0.0.1/tcp/9090') const listener = transport.createListener({ upgrader }) await listener.listen(ma) - const conn = await transport.dial(ma, { + await expect(transport.dial(ma, { upgrader - }) - const stream = await conn.newStream([protocol]) + })).to.eventually.be.ok() - const values = await pipe( - [uint8ArrayFromString('hey')], - stream, - async (source) => all(source) - ) - - expect(values[0].subarray()).to.equalBytes(uint8ArrayFromString('hey')) - await conn.close() await listener.close() }) - it('dial on IPv6', async () => { + it('dial IPv6', async () => { if (isCI != null) { return } @@ -218,179 +214,39 @@ describe('dial', () => { upgrader }) await listener.listen(ma) - const conn = await transport.dial(ma, { - upgrader - }) - const stream = await conn.newStream([protocol]) - - const values = await pipe( - [uint8ArrayFromString('hey')], - stream, - async (source) => all(source) - ) - expect(values[0].subarray()).to.equalBytes(uint8ArrayFromString('hey')) - await conn.close() - await listener.close() - }) - - it('dial on path', async () => { - const ma = multiaddr(`/unix/${path.resolve(os.tmpdir(), `/tmp/p2pd-${Date.now()}.sock`)}`) - - const listener = transport.createListener({ - upgrader - }) - await listener.listen(ma) - const conn = await transport.dial(ma, { - upgrader - }) - const stream = await conn.newStream([protocol]) - - const values = await pipe( - [uint8ArrayFromString('hey')], - stream, - async (source) => all(source) - ) - - expect(values[0].subarray()).to.equalBytes(uint8ArrayFromString('hey')) - await conn.close() - await listener.close() - }) - - it('dial and destroy on listener', async () => { - let handled: () => void - const handledPromise = new Promise(resolve => { handled = resolve }) - - const ma = multiaddr('/ip6/::/tcp/9090') - - const listener = transport.createListener({ - handler: (conn) => { - // let multistream select finish before closing - setTimeout(() => { - void conn.close() - .then(() => { handled() }) - }, 100) - }, - upgrader - }) - - await listener.listen(ma) - const addrs = listener.getAddrs() - const conn = await transport.dial(addrs[0], { + await expect(transport.dial(ma, { upgrader - }) - const stream = await conn.newStream([protocol]) - pipe(stream) + })).to.eventually.be.ok() - await handledPromise - await conn.close() await listener.close() }) - it('dial and destroy on dialer', async () => { - if (isCI != null) { - return - } - - let handled: () => void - const handledPromise = new Promise(resolve => { handled = resolve }) - - const ma = multiaddr('/ip6/::/tcp/9090') - - const listener = transport.createListener({ - handler: () => { - handled() - }, - upgrader - }) - - await listener.listen(ma) - const addrs = listener.getAddrs() - const conn = await transport.dial(addrs[0], { - upgrader - }) - - await conn.close() - await handledPromise - await listener.close() - }) + it('dial unix domain socket', async () => { + const ma = multiaddr(`/unix/${path.resolve(os.tmpdir(), `/tmp/p2pd-${Date.now()}.sock`)}`) - it('dials on IPv4 with IPFS Id', async () => { - const ma = multiaddr('/ip4/127.0.0.1/tcp/9090/ipfs/Qmb6owHp6eaWArVbcJJbQSyifyJBttMMjYV76N2hMbf5Vw') const listener = transport.createListener({ upgrader }) await listener.listen(ma) - const conn = await transport.dial(ma, { + await expect(transport.dial(ma, { upgrader - }) - const stream = await conn.newStream([protocol]) + })).to.eventually.be.ok() - const values = await pipe( - [uint8ArrayFromString('hey')], - stream, - async (source) => all(source) - ) - expect(values[0].subarray()).to.equalBytes(uint8ArrayFromString('hey')) - - await conn.close() await listener.close() }) - it('aborts during dial', async () => { + it('dials IPv4 with IPFS Id', async () => { const ma = multiaddr('/ip4/127.0.0.1/tcp/9090/ipfs/Qmb6owHp6eaWArVbcJJbQSyifyJBttMMjYV76N2hMbf5Vw') - const maConnPromise = pDefer() - - // @ts-expect-error missing return value - upgrader.upgradeOutbound = async (maConn, opts) => { - maConnPromise.resolve(maConn) - - // abort the upgrade if the signal aborts - await new Promise((resolve, reject) => { - opts?.signal?.addEventListener('abort', () => { - reject(new AbortError()) - }) - }) - } - const listener = transport.createListener({ upgrader }) await listener.listen(ma) - const abortController = new AbortController() - - // abort once the upgrade process has started - void maConnPromise.promise.then(() => { - abortController.abort() - }) - await expect(transport.dial(ma, { - upgrader, - signal: abortController.signal - })).to.eventually.be.rejected('The operation was aborted') - - await expect(maConnPromise.promise).to.eventually.have.nested.property('timeline.close') - .that.is.ok('did not gracefully close maConn') - - await listener.close() - }) - - it('aborts before dial', async () => { - const ma = multiaddr('/ip4/127.0.0.1/tcp/9090/ipfs/Qmb6owHp6eaWArVbcJJbQSyifyJBttMMjYV76N2hMbf5Vw') - const listener = transport.createListener({ upgrader - }) - await listener.listen(ma) - - const abortController = new AbortController() - abortController.abort() - - await expect(transport.dial(ma, { - upgrader, - signal: abortController.signal - })).to.eventually.be.rejected('The operation was aborted') + })).to.eventually.be.ok() await listener.close() }) diff --git a/packages/transport-tcp/test/max-connections.spec.ts b/packages/transport-tcp/test/max-connections.spec.ts index dc3339061d..c834f2bfe0 100644 --- a/packages/transport-tcp/test/max-connections.spec.ts +++ b/packages/transport-tcp/test/max-connections.spec.ts @@ -1,14 +1,31 @@ import net from 'node:net' import { promisify } from 'node:util' -import { TypedEventEmitter } from '@libp2p/interface' -import { mockUpgrader } from '@libp2p/interface-compliance-tests/mocks' import { defaultLogger } from '@libp2p/logger' import { multiaddr } from '@multiformats/multiaddr' import { expect } from 'aegir/chai' +import { stubInterface } from 'sinon-ts' import { tcp } from '../src/index.js' +import type { Connection, Upgrader } from '@libp2p/interface' describe('maxConnections', () => { const afterEachCallbacks: Array<() => Promise | any> = [] + let upgrader: Upgrader + + beforeEach(() => { + upgrader = stubInterface({ + upgradeInbound: async (maConn) => { + return stubInterface({ + remoteAddr: maConn.remoteAddr + }) + }, + upgradeOutbound: async (maConn) => { + return stubInterface({ + remoteAddr: maConn.remoteAddr + }) + } + }) + }) + afterEach(async () => { await Promise.all(afterEachCallbacks.map(fn => fn())) afterEachCallbacks.length = 0 @@ -24,12 +41,8 @@ describe('maxConnections', () => { logger: defaultLogger() }) - const upgrader = mockUpgrader({ - events: new TypedEventEmitter() - }) const listener = transport.createListener({ upgrader }) - // eslint-disable-next-line @typescript-eslint/promise-function-async - afterEachCallbacks.push(() => listener.close()) + afterEachCallbacks.push(async () => listener.close()) await listener.listen(multiaddr(`/ip4/127.0.0.1/tcp/${port}`)) listener.addEventListener('connection', (conn) => { @@ -42,7 +55,6 @@ describe('maxConnections', () => { const socket = net.connect({ host: '127.0.0.1', port }) sockets.push(socket) - // eslint-disable-next-line @typescript-eslint/promise-function-async afterEachCallbacks.unshift(async () => { if (!socket.destroyed) { socket.destroy() diff --git a/packages/transport-websockets/package.json b/packages/transport-websockets/package.json index 2d0a6ad697..9adfc3f3b6 100644 --- a/packages/transport-websockets/package.json +++ b/packages/transport-websockets/package.json @@ -97,6 +97,7 @@ "it-goodbye": "^4.0.6", "it-pipe": "^3.0.1", "it-stream-types": "^2.0.1", + "p-event": "^6.0.1", "p-wait-for": "^5.0.2", "uint8arraylist": "^2.4.8", "uint8arrays": "^5.1.0" diff --git a/packages/transport-websockets/src/listener.ts b/packages/transport-websockets/src/listener.ts index 84905da253..d4d48a49ca 100644 --- a/packages/transport-websockets/src/listener.ts +++ b/packages/transport-websockets/src/listener.ts @@ -66,10 +66,6 @@ class WebSocketListener extends TypedEventEmitter implements Lis .then((conn) => { this.log('inbound connection %s upgraded', maConn.remoteAddr) - if (init?.handler != null) { - init?.handler(conn) - } - self.dispatchEvent(new CustomEvent('connection', { detail: conn })) diff --git a/packages/transport-websockets/test/compliance.node.ts b/packages/transport-websockets/test/compliance.node.ts deleted file mode 100644 index c204accde3..0000000000 --- a/packages/transport-websockets/test/compliance.node.ts +++ /dev/null @@ -1,63 +0,0 @@ -/* eslint-env mocha */ - -import http from 'http' -import tests from '@libp2p/interface-compliance-tests/transport' -import { defaultLogger } from '@libp2p/logger' -import { multiaddr } from '@multiformats/multiaddr' -import * as filters from '../src/filters.js' -import { webSockets } from '../src/index.js' -import type { WebSocketListenerInit } from '../src/listener.js' -import type { Listener } from '@libp2p/interface' - -describe('interface-transport compliance', () => { - tests({ - async setup () { - const ws = webSockets({ filter: filters.all })({ - logger: defaultLogger() - }) - const addrs = [ - multiaddr('/ip4/127.0.0.1/tcp/9096/ws'), - multiaddr('/ip4/127.0.0.1/tcp/9097/ws'), - multiaddr('/dns4/ipfs.io/tcp/9097/ws'), - multiaddr('/dns4/ipfs.io/tcp/9097/wss') - ] - - let delayMs = 0 - const delayedCreateListener = (options: WebSocketListenerInit): Listener => { - // A server that will delay the upgrade event by delayMs - options.server = new Proxy(http.createServer(), { - get (server, prop) { - if (prop === 'on') { - return (event: string, handler: (...args: any[]) => void) => { - server.on(event, (...args) => { - if (event !== 'upgrade' || delayMs === 0) { - handler(...args); return - } - setTimeout(() => { handler(...args) }, delayMs) - }) - } - } - // @ts-expect-error cannot access props with a string - return server[prop] - } - }) - - return ws.createListener(options) - } - - const wsProxy = new Proxy(ws, { - // @ts-expect-error cannot access props with a string - get: (_, prop) => prop === 'createListener' ? delayedCreateListener : ws[prop] - }) - - // Used by the dial tests to simulate a delayed connect - const connector = { - delay (ms: number) { delayMs = ms }, - restore () { delayMs = 0 } - } - - return { dialer: wsProxy, listener: wsProxy, listenAddrs: addrs, dialAddrs: addrs, connector } - }, - async teardown () {} - }) -}) diff --git a/packages/transport-websockets/test/node.ts b/packages/transport-websockets/test/node.ts index c4cae7a2af..1765a9e0cd 100644 --- a/packages/transport-websockets/test/node.ts +++ b/packages/transport-websockets/test/node.ts @@ -14,7 +14,7 @@ import all from 'it-all' import drain from 'it-drain' import { goodbye } from 'it-goodbye' import { pipe } from 'it-pipe' -import defer from 'p-defer' +import { pEvent } from 'p-event' import waitFor from 'p-wait-for' import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string' import * as filters from '../src/filters.js' @@ -22,7 +22,6 @@ import { webSockets } from '../src/index.js' import type { Listener, Transport } from '@libp2p/interface' import type { Source } from 'it-stream-types' import type { Uint8ArrayList } from 'uint8arraylist' -import './compliance.node.js' async function * toBuffers (source: Source): AsyncGenerator { for await (const list of source) { @@ -62,13 +61,13 @@ describe('listen', () => { logger: defaultLogger() }) const listener = ws.createListener({ - handler: (conn) => { - void conn.newStream([protocol]).then(async (stream) => { - await pipe(stream, stream) - }) - }, upgrader }) + listener.addEventListener('connection', (event) => { + void event.detail.newStream([protocol]).then(async (stream) => { + await pipe(stream, stream) + }) + }) await listener.listen(ma) const conn = await ws.dial(ma, { @@ -296,10 +295,10 @@ describe('dial', () => { logger: defaultLogger() }) - // Create a Promise that resolves when a connection is handled - const deferred = defer() + const listener = ws.createListener({ upgrader }) - const listener = ws.createListener({ handler: deferred.resolve, upgrader }) + // Create a Promise that resolves when a connection is handled + const p = pEvent(listener, 'connection') // Listen on the multiaddr await listener.listen(ma) @@ -311,7 +310,7 @@ describe('dial', () => { await ws.dial(localAddrs[0], { upgrader }) // Wait for the incoming dial to be handled - await deferred.promise + await p // close the listener await listener.close() @@ -328,13 +327,13 @@ describe('dial', () => { logger: defaultLogger() }) listener = ws.createListener({ - handler: (conn) => { - void conn.newStream([protocol]).then(async (stream) => { - await pipe(stream, stream) - }) - }, upgrader }) + listener.addEventListener('connection', (event) => { + void event.detail.newStream([protocol]).then(async (stream) => { + await pipe(stream, stream) + }) + }) await listener.listen(ma) }) @@ -385,13 +384,13 @@ describe('dial', () => { logger: defaultLogger() }) listener = ws.createListener({ - handler: (conn) => { - void conn.newStream([protocol]).then(async (stream) => { - await pipe(stream, stream) - }) - }, upgrader }) + listener.addEventListener('connection', (event) => { + void event.detail.newStream([protocol]).then(async (stream) => { + await pipe(stream, stream) + }) + }) await listener.listen(ma) }) @@ -434,13 +433,13 @@ describe('dial', () => { logger: defaultLogger() }) listener = ws.createListener({ - handler: (conn) => { - void conn.newStream([protocol]).then(async (stream) => { - await pipe(stream, stream) - }) - }, upgrader }) + listener.addEventListener('connection', (event) => { + void event.detail.newStream([protocol]).then(async (stream) => { + await pipe(stream, stream) + }) + }) await listener.listen(ma) })