diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index 9869d6e302..0b5b160e44 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -236,7 +236,7 @@ jobs: id-token: write pull-requests: write steps: - - uses: google-github-actions/release-please-action@v4 + - uses: googleapis/release-please-action@v4 id: release with: token: ${{ secrets.UCI_GITHUB_TOKEN || github.token }} diff --git a/.release-please-manifest.json b/.release-please-manifest.json index e08af06f65..40bd99c43c 100644 --- a/.release-please-manifest.json +++ b/.release-please-manifest.json @@ -1 +1 @@ -{"interop":"2.0.33","packages/connection-encrypter-plaintext":"1.0.24","packages/connection-encrypter-tls":"1.0.11","packages/crypto":"4.1.2","packages/interface":"1.4.0","packages/interface-compliance-tests":"5.4.5","packages/interface-internal":"1.2.2","packages/kad-dht":"12.0.17","packages/keychain":"4.0.15","packages/libp2p":"1.6.0","packages/logger":"4.0.13","packages/metrics-prometheus":"3.0.24","packages/metrics-simple":"1.0.2","packages/multistream-select":"5.1.10","packages/peer-collections":"5.2.2","packages/peer-discovery-bootstrap":"10.0.24","packages/peer-discovery-mdns":"10.0.24","packages/peer-id":"4.1.2","packages/peer-id-factory":"4.1.2","packages/peer-record":"7.0.18","packages/peer-store":"10.0.19","packages/protocol-autonat":"1.0.21","packages/protocol-dcutr":"1.0.21","packages/protocol-echo":"1.0.7","packages/protocol-fetch":"1.0.18","packages/protocol-identify":"2.0.2","packages/protocol-perf":"3.0.24","packages/protocol-ping":"1.0.19","packages/pubsub":"9.0.19","packages/pubsub-floodsub":"9.0.20","packages/record":"4.0.2","packages/stream-multiplexer-mplex":"10.0.24","packages/transport-circuit-relay-v2":"1.0.24","packages/transport-tcp":"9.0.26","packages/transport-webrtc":"4.0.33","packages/transport-websockets":"8.0.24","packages/transport-webtransport":"4.0.32","packages/upnp-nat":"1.0.22","packages/utils":"5.4.2"} \ No newline at end of file +{"packages/connection-encrypter-plaintext":"1.0.24","packages/connection-encrypter-tls":"1.0.12","packages/crypto":"4.1.2","packages/interface":"1.4.0","packages/interface-compliance-tests":"5.4.5","packages/interface-internal":"1.2.2","packages/kad-dht":"12.0.17","packages/keychain":"4.0.15","packages/libp2p":"1.6.0","packages/logger":"4.0.13","packages/metrics-devtools":"0.1.0","packages/metrics-prometheus":"3.0.24","packages/metrics-simple":"1.0.2","packages/multistream-select":"5.1.10","packages/peer-collections":"5.2.2","packages/peer-discovery-bootstrap":"10.0.24","packages/peer-discovery-mdns":"10.0.24","packages/peer-id":"4.1.2","packages/peer-id-factory":"4.1.2","packages/peer-record":"7.0.18","packages/peer-store":"10.0.19","packages/protocol-autonat":"1.0.21","packages/protocol-dcutr":"1.0.21","packages/protocol-echo":"1.0.7","packages/protocol-fetch":"1.0.18","packages/protocol-identify":"2.0.2","packages/protocol-perf":"3.0.24","packages/protocol-ping":"1.0.19","packages/pubsub":"9.0.19","packages/pubsub-floodsub":"9.0.20","packages/record":"4.0.2","packages/stream-multiplexer-mplex":"10.0.24","packages/transport-circuit-relay-v2":"1.0.24","packages/transport-tcp":"9.0.26","packages/transport-webrtc":"4.0.33","packages/transport-websockets":"8.0.24","packages/transport-webtransport":"4.0.32","packages/upnp-nat":"1.0.22","packages/utils":"5.4.2"} diff --git a/.release-please.json b/.release-please.json index c558e0fcb0..8b8b5e462b 100644 --- a/.release-please.json +++ b/.release-please.json @@ -9,7 +9,6 @@ { "type": "refactor", "section": "Refactors", "hidden": false } ], "packages": { - "interop": {}, "packages/connection-encrypter-plaintext": {}, "packages/connection-encrypter-tls": {}, "packages/crypto": {}, @@ -20,6 +19,7 @@ "packages/keychain": {}, "packages/libp2p": {}, "packages/logger": {}, + "packages/metrics-devtools": {}, "packages/metrics-prometheus": {}, "packages/metrics-simple": {}, "packages/multistream-select": {}, diff --git a/doc/package.json b/doc/package.json index 792bf07988..eaf777b78d 100644 --- a/doc/package.json +++ b/doc/package.json @@ -24,7 +24,7 @@ "doc-check": "aegir doc-check" }, "devDependencies": { - "aegir": "^42.2.11" + "aegir": "^43.0.1" }, "private": true } diff --git a/interop/package.json b/interop/package.json index 9194c81d0b..f2ade900f6 100644 --- a/interop/package.json +++ b/interop/package.json @@ -30,7 +30,7 @@ "@libp2p/websockets": "^8.0.24", "@libp2p/webtransport": "^4.0.32", "@multiformats/multiaddr": "^12.2.3", - "aegir": "^42.2.11", + "aegir": "^43.0.1", "libp2p": "^1.6.0", "p-event": "^6.0.1", "redis": "^4.6.14" diff --git a/package.json b/package.json index c90f6c85ee..41e54bccb4 100644 --- a/package.json +++ b/package.json @@ -36,7 +36,7 @@ "docs:no-publish": "aegir docs --publish false -- --exclude interop --exclude doc" }, "devDependencies": { - "aegir": "^42.0.0", + "aegir": "^43.0.1", "npm-run-all": "^4.1.5" }, "eslintConfig": { diff --git a/packages/connection-encrypter-plaintext/package.json b/packages/connection-encrypter-plaintext/package.json index 6286827aa1..8860635781 100644 --- a/packages/connection-encrypter-plaintext/package.json +++ b/packages/connection-encrypter-plaintext/package.json @@ -64,7 +64,7 @@ "@libp2p/logger": "^4.0.13", "@libp2p/peer-id-factory": "^4.1.2", "@multiformats/multiaddr": "^12.2.3", - "aegir": "^42.2.11", + "aegir": "^43.0.1", "protons": "^7.5.0", "sinon": "^18.0.0" }, diff --git a/packages/connection-encrypter-tls/CHANGELOG.md b/packages/connection-encrypter-tls/CHANGELOG.md index d7ced9b1e0..59d46ff25f 100644 --- a/packages/connection-encrypter-tls/CHANGELOG.md +++ b/packages/connection-encrypter-tls/CHANGELOG.md @@ -6,6 +6,13 @@ * devDependencies * @libp2p/interface-compliance-tests bumped from ^5.3.0 to ^5.3.1 +## [1.0.12](https://github.com/libp2p/js-libp2p/compare/tls-v1.0.11...tls-v1.0.12) (2024-05-28) + + +### Bug Fixes + +* export tls key as pkcs8 ([#2562](https://github.com/libp2p/js-libp2p/issues/2562)) ([167bf2b](https://github.com/libp2p/js-libp2p/commit/167bf2b3cf0aa741c8118e241c3668e8ef91c549)) + ## [1.0.11](https://github.com/libp2p/js-libp2p/compare/tls-v1.0.10...tls-v1.0.11) (2024-05-17) diff --git a/packages/connection-encrypter-tls/package.json b/packages/connection-encrypter-tls/package.json index 4fc1fa26d8..d7d989d4e1 100644 --- a/packages/connection-encrypter-tls/package.json +++ b/packages/connection-encrypter-tls/package.json @@ -1,6 +1,6 @@ { "name": "@libp2p/tls", - "version": "1.0.11", + "version": "1.0.12", "description": "A connection encrypter that uses TLS 1.3", "license": "Apache-2.0 OR MIT", "homepage": "https://github.com/libp2p/js-libp2p/tree/main/packages/connection-encrypter-tls#readme", @@ -67,7 +67,7 @@ "@libp2p/logger": "^4.0.13", "@libp2p/peer-id-factory": "^4.1.2", "@multiformats/multiaddr": "^12.2.3", - "aegir": "^42.2.11", + "aegir": "^43.0.1", "protons": "^7.5.0", "sinon": "^18.0.0" }, diff --git a/packages/connection-encrypter-tls/src/utils.ts b/packages/connection-encrypter-tls/src/utils.ts index e093e33616..e95c3694c4 100644 --- a/packages/connection-encrypter-tls/src/utils.ts +++ b/packages/connection-encrypter-tls/src/utils.ts @@ -179,11 +179,11 @@ export async function generateCertificate (peerId: PeerId): Promise<{ cert: stri ] }) - const certPrivateKeySpki = await crypto.subtle.exportKey('spki', keys.privateKey) + const certPrivateKeyPkcs8 = await crypto.subtle.exportKey('pkcs8', keys.privateKey) return { cert: selfCert.toString(), - key: spkiToPEM(certPrivateKeySpki) + key: pkcs8ToPEM(certPrivateKeyPkcs8) } } @@ -213,7 +213,7 @@ export function encodeSignatureData (certPublicKey: ArrayBuffer): Uint8Array { ]) } -function spkiToPEM (keydata: ArrayBuffer): string { +function pkcs8ToPEM (keydata: ArrayBuffer): string { return formatAsPem(uint8ArrayToString(new Uint8Array(keydata), 'base64')) } diff --git a/packages/crypto/package.json b/packages/crypto/package.json index 2c973d0e7a..069f148dc4 100644 --- a/packages/crypto/package.json +++ b/packages/crypto/package.json @@ -95,7 +95,7 @@ }, "devDependencies": { "@types/mocha": "^10.0.6", - "aegir": "^42.2.11", + "aegir": "^43.0.1", "benchmark": "^2.1.4", "protons": "^7.5.0" }, diff --git a/packages/integration-tests/.aegir.js b/packages/integration-tests/.aegir.js index 6517c7b1ea..078dd47d7d 100644 --- a/packages/integration-tests/.aegir.js +++ b/packages/integration-tests/.aegir.js @@ -1,4 +1,5 @@ -import { pipe } from 'it-pipe' +import { execa } from 'execa' +import pDefer from 'p-defer' /** @type {import('aegir').PartialOptions} */ export default { @@ -18,6 +19,7 @@ export default { const { plaintext } = await import('@libp2p/plaintext') const { circuitRelayServer, circuitRelayTransport } = await import('@libp2p/circuit-relay-v2') const { identify } = await import('@libp2p/identify') + const { echo } = await import('@libp2p/echo') const peerId = await createEd25519PeerId() const libp2p = await createLibp2p({ @@ -49,24 +51,73 @@ export default { reservations: { maxReservations: Infinity } - }) + }), + echo: echo() } }) - // Add the echo protocol - await libp2p.handle('/echo/1.0.0', ({ stream }) => { - pipe(stream, stream) - .catch() // sometimes connections are closed before multistream-select finishes which causes an error - }) + + const goLibp2pRelay = await createGoLibp2pRelay() return { libp2p, + goLibp2pRelay, env: { - RELAY_MULTIADDR: libp2p.getMultiaddrs().filter(ma => WebSockets.matches(ma)).pop() + RELAY_MULTIADDR: libp2p.getMultiaddrs().filter(ma => WebSockets.matches(ma)).pop(), + GO_RELAY_PEER: goLibp2pRelay.peerId, + GO_RELAY_MULTIADDRS: goLibp2pRelay.multiaddrs, + GO_RELAY_APIADDR: goLibp2pRelay.apiAddr } } }, after: async (_, before) => { await before.libp2p.stop() + await before.goLibp2pRelay.proc.kill() + } + } +} + +async function createGoLibp2pRelay () { + const { multiaddr } = await import('@multiformats/multiaddr') + const { path: p2pd } = await import('go-libp2p') + const { createClient } = await import('@libp2p/daemon-client') + + const controlPort = Math.floor(Math.random() * (50000 - 10000 + 1)) + 10000 + const apiAddr = multiaddr(`/ip4/127.0.0.1/tcp/${controlPort}`) + const deferred = pDefer() + const proc = execa(p2pd(), [ + `-listen=${apiAddr.toString()}`, + // listen on TCP, WebSockets and WebTransport + '-hostAddrs=/ip4/127.0.0.1/tcp/0,/ip4/127.0.0.1/tcp/0/ws,/ip4/127.0.0.1/udp/0/quic-v1/webtransport', + '-noise=true', + '-dhtServer', + '-relay', + '-muxer=mplex' + ], { + env: { + GOLOG_LOG_LEVEL: 'debug' + } + }) + proc.catch(() => { + // go-libp2p daemon throws when killed + }) + + proc.stdout?.on('data', (buf) => { + const str = buf.toString() + + // daemon has started + if (str.includes('Control socket:')) { + deferred.resolve() } + }) + await deferred.promise + + const daemonClient = createClient(apiAddr) + const id = await daemonClient.identify() + + return { + apiAddr, + peerId: id.peerId.toString(), + multiaddrs: id.addrs.map(ma => ma.toString()).join(','), + proc } } diff --git a/packages/integration-tests/package.json b/packages/integration-tests/package.json index cfa11c3b72..718d7ac0fe 100644 --- a/packages/integration-tests/package.json +++ b/packages/integration-tests/package.json @@ -42,6 +42,7 @@ "@libp2p/daemon-client": "^8.0.5", "@libp2p/daemon-server": "^7.0.5", "@libp2p/dcutr": "^1.0.20", + "@libp2p/echo": "^1.0.7", "@libp2p/fetch": "^1.0.17", "@libp2p/floodsub": "^9.0.19", "@libp2p/identify": "^2.0.1", @@ -61,10 +62,13 @@ "@libp2p/tls": "^1.0.10", "@libp2p/webrtc": "^4.0.32", "@libp2p/websockets": "^8.0.23", + "@libp2p/webtransport": "^4.0.32", "@multiformats/mafmt": "^12.1.6", "@multiformats/multiaddr": "^12.2.3", - "aegir": "^42.2.11", + "@multiformats/multiaddr-matcher": "^1.2.1", + "aegir": "^43.0.1", "delay": "^6.0.0", + "detect-browser": "^5.3.0", "execa": "^9.1.0", "go-libp2p": "^1.2.0", "it-all": "^3.0.6", diff --git a/packages/integration-tests/test/circuit-relay-discovery.node.ts b/packages/integration-tests/test/circuit-relay-discovery.node.ts index 41f6490e80..40ab730da2 100644 --- a/packages/integration-tests/test/circuit-relay-discovery.node.ts +++ b/packages/integration-tests/test/circuit-relay-discovery.node.ts @@ -2,21 +2,27 @@ import { yamux } from '@chainsafe/libp2p-yamux' import { circuitRelayServer, type CircuitRelayService, circuitRelayTransport } from '@libp2p/circuit-relay-v2' +import { identify } from '@libp2p/identify' +import { stop } from '@libp2p/interface' +import { kadDHT, passthroughMapper } from '@libp2p/kad-dht' import { plaintext } from '@libp2p/plaintext' import { tcp } from '@libp2p/tcp' import { expect } from 'aegir/chai' import { createLibp2p } from 'libp2p' -import { pEvent } from 'p-event' -import { getRelayAddress, hasRelay, MockContentRouting, mockContentRouting } from './fixtures/utils.js' +import pDefer from 'p-defer' +import { getRelayAddress, hasRelay } from './fixtures/utils.js' import type { Libp2p } from '@libp2p/interface' +import type { KadDHT } from '@libp2p/kad-dht' + +const DHT_PROTOCOL = '/integration-test/circuit-relay/1.0.0' describe('circuit-relay discovery', () => { let local: Libp2p let remote: Libp2p let relay: Libp2p<{ relay: CircuitRelayService }> + let bootstrapper: Libp2p<{ kadDht: KadDHT }> beforeEach(async () => { - // create relay first so it has time to advertise itself via content routing relay = await createLibp2p({ addresses: { listen: ['/ip4/127.0.0.1/tcp/0'] @@ -30,20 +36,57 @@ describe('circuit-relay discovery', () => { connectionEncryption: [ plaintext() ], - contentRouters: [ - mockContentRouting() - ], services: { relay: circuitRelayServer({ - advertise: { - bootDelay: 10 + reservations: { + maxReservations: Infinity } + }), + identify: identify(), + kadDht: kadDHT({ + protocol: DHT_PROTOCOL, + peerInfoMapper: passthroughMapper, + clientMode: false + }) + } + }) + + bootstrapper = await createLibp2p({ + addresses: { + listen: ['/ip4/127.0.0.1/tcp/0'] + }, + transports: [ + tcp() + ], + streamMuxers: [ + yamux() + ], + connectionEncryption: [ + plaintext() + ], + services: { + identify: identify(), + kadDht: kadDHT({ + protocol: DHT_PROTOCOL, + peerInfoMapper: passthroughMapper, + clientMode: false }) } }) - // wait for relay to advertise service successfully - await pEvent(relay.services.relay, 'relay:advert:success') + // connect the bootstrapper to the relay + await bootstrapper.dial(relay.getMultiaddrs()) + + // bootstrapper should be able to locate relay via DHT + const foundRelay = pDefer() + void Promise.resolve().then(async () => { + for await (const event of bootstrapper.services.kadDht.findPeer(relay.peerId)) { + if (event.name === 'FINAL_PEER') { + foundRelay.resolve() + } + } + }) + await foundRelay.promise // now create client nodes ;[local, remote] = await Promise.all([ @@ -63,9 +106,14 @@ describe('circuit-relay discovery', () => { connectionEncryption: [ plaintext() ], - contentRouters: [ - mockContentRouting() - ] + services: { + identify: identify(), + kadDht: kadDHT({ + protocol: DHT_PROTOCOL, + peerInfoMapper: passthroughMapper, + clientMode: true + }) + } }), createLibp2p({ addresses: { @@ -83,25 +131,35 @@ describe('circuit-relay discovery', () => { connectionEncryption: [ plaintext() ], - contentRouters: [ - mockContentRouting() - ] + services: { + identify: identify(), + kadDht: kadDHT({ + protocol: DHT_PROTOCOL, + peerInfoMapper: passthroughMapper, + clientMode: true + }) + } }) ]) + + // connect both nodes to the bootstrapper + await Promise.all([ + local.dial(bootstrapper.getMultiaddrs()), + remote.dial(bootstrapper.getMultiaddrs()) + ]) }) afterEach(async () => { - MockContentRouting.reset() - // Stop each node - return Promise.all([local, remote, relay].map(async libp2p => { - if (libp2p != null) { - await libp2p.stop() - } - })) + await stop( + local, + remote, + bootstrapper, + relay + ) }) - it('should find provider for relay and add it as listen relay', async () => { + it('should discover relay and add it as listen relay', async () => { // both nodes should discover the relay - they have no direct connection // so it will be via content routing const localRelayPeerId = await hasRelay(local) diff --git a/packages/integration-tests/test/circuit-relay-discovery.spec.ts b/packages/integration-tests/test/circuit-relay-discovery.spec.ts new file mode 100644 index 0000000000..8cc3722b03 --- /dev/null +++ b/packages/integration-tests/test/circuit-relay-discovery.spec.ts @@ -0,0 +1,99 @@ +/* eslint-env mocha */ +/* eslint max-nested-callbacks: ['error', 6] */ + +import { noise } from '@chainsafe/libp2p-noise' +import { yamux } from '@chainsafe/libp2p-yamux' +import { circuitRelayTransport } from '@libp2p/circuit-relay-v2' +import { identify } from '@libp2p/identify' +import { stop } from '@libp2p/interface' +import { mplex } from '@libp2p/mplex' +import { plaintext } from '@libp2p/plaintext' +import { webSockets } from '@libp2p/websockets' +import * as filters from '@libp2p/websockets/filters' +import { webTransport } from '@libp2p/webtransport' +import { multiaddr } from '@multiformats/multiaddr' +import { WebSockets, WebTransport } from '@multiformats/multiaddr-matcher' +import { createLibp2p } from 'libp2p' +import { hasRelay, isFirefox } from './fixtures/utils.js' +import type { Libp2p } from '@libp2p/interface' + +describe('circuit-relay discovery', () => { + let node: Libp2p + + beforeEach(async () => { + node = await createLibp2p({ + transports: [ + webSockets({ + filter: filters.all + }), + circuitRelayTransport({ + discoverRelays: 1 + }), + webTransport() + ], + streamMuxers: [ + yamux(), + mplex() + ], + connectionEncryption: [ + plaintext(), + noise() + ], + connectionGater: { + denyDialMultiaddr: () => false + }, + services: { + identify: identify() + } + }) + }) + + afterEach(async () => { + await stop(node) + }) + + it('should reserve slot on go relay via WebSockets', async () => { + const ma = (process.env.GO_RELAY_MULTIADDRS ?? '') + .split(',') + .map(ma => multiaddr(ma)) + .filter(ma => WebSockets.matches(ma)) + .pop() + + if (ma == null) { + throw new Error('Could not detect go relay WebSocket address') + } + + // dial the relay + await node.dial(ma) + + // wait for a reservation to be made + await hasRelay(node) + }) + + it('should reserve slot on go relay via WebTransport', async function () { + if (globalThis.WebTransport == null) { + return this.skip() + } + + if (isFirefox) { + // https://bugzilla.mozilla.org/show_bug.cgi?id=1899812 + return this.skip() + } + + const ma = (process.env.GO_RELAY_MULTIADDRS ?? '') + .split(',') + .map(ma => multiaddr(`${ma}/p2p/${process.env.GO_RELAY_PEER}`)) + .filter(ma => WebTransport.matches(ma)) + .pop() + + if (ma == null) { + throw new Error('Could not detect go relay WebSocket address') + } + + // dial the relay + await node.dial(ma) + + // wait for a reservation to be made + await hasRelay(node) + }) +}) diff --git a/packages/integration-tests/test/circuit-relay.node.ts b/packages/integration-tests/test/circuit-relay.node.ts index ad4721df1e..fe5600b198 100644 --- a/packages/integration-tests/test/circuit-relay.node.ts +++ b/packages/integration-tests/test/circuit-relay.node.ts @@ -638,7 +638,7 @@ describe('circuit-relay', () => { expect(circuitListener[0].relayStore.listenerCount('relay:removed')).to.equal(1) }) - it('should mark a relayed connection as transient', async () => { + it('should mark an outgoing relayed connection as transient', async () => { // discover relay and make reservation const connectionToRelay = await remote.dial(relay1.getMultiaddrs()[0]) @@ -655,6 +655,25 @@ describe('circuit-relay', () => { expect(connection).to.have.property('transient', true) }) + it('should mark an incoming relayed connection as transient', async () => { + // discover relay and make reservation + const connectionToRelay = await remote.dial(relay1.getMultiaddrs()[0]) + + // connection to relay should not be marked transient + expect(connectionToRelay).to.have.property('transient', false) + + await usingAsRelay(remote, relay1) + + // dial the remote through the relay + const ma = getRelayAddress(remote) + await local.dial(ma) + + // connection from local through relay should be marked transient + const connections = remote.getConnections(local.peerId) + expect(connections).to.have.lengthOf(1) + expect(connections).to.have.nested.property('[0].transient', true) + }) + it('should not open streams on a transient connection', async () => { // discover relay and make reservation await remote.dial(relay1.getMultiaddrs()[0]) @@ -1073,5 +1092,22 @@ describe('circuit-relay', () => { // for longer than that expect(finish - start).to.be.greaterThan(defaultDurationLimit) }) + + it('should not mark an outgoing connection as transient', async () => { + const ma = getRelayAddress(remote) + + const connection = await local.dial(ma) + expect(connection).to.have.property('transient', false) + }) + + it('should not mark an incoming connection as transient', async () => { + const ma = getRelayAddress(remote) + + await local.dial(ma) + + const connections = remote.getConnections(local.peerId) + expect(connections).to.have.lengthOf(1) + expect(connections).to.have.nested.property('[0].transient', false) + }) }) }) diff --git a/packages/integration-tests/test/fixtures/utils.ts b/packages/integration-tests/test/fixtures/utils.ts index ddc7b7b822..2491f4a7dc 100644 --- a/packages/integration-tests/test/fixtures/utils.ts +++ b/packages/integration-tests/test/fixtures/utils.ts @@ -1,5 +1,6 @@ import { RELAY_V2_HOP_CODEC } from '@libp2p/circuit-relay-v2' import { peerIdFromString } from '@libp2p/peer-id' +import { detect } from 'detect-browser' import pWaitFor from 'p-wait-for' import { toString as uint8ArrayToString } from 'uint8arrays/to-string' import type { Libp2p, AbortOptions, ContentRouting, PeerId, PeerInfo } from '@libp2p/interface' @@ -8,6 +9,9 @@ import type { Multiaddr } from '@multiformats/multiaddr' import type { CID, Version } from 'multiformats' import type { Options as PWaitForOptions } from 'p-wait-for' +const browser = detect() +export const isFirefox = ((browser != null) && browser.name === 'firefox') + export async function usingAsRelay (node: Libp2p, relay: Libp2p, opts?: PWaitForOptions): Promise { // Wait for peer to be used as a relay await pWaitFor(() => { diff --git a/packages/interface-compliance-tests/package.json b/packages/interface-compliance-tests/package.json index 70e32ef8ff..70304d695a 100644 --- a/packages/interface-compliance-tests/package.json +++ b/packages/interface-compliance-tests/package.json @@ -122,7 +122,7 @@ "@libp2p/utils": "^5.4.2", "@multiformats/multiaddr": "^12.2.3", "abortable-iterator": "^5.0.1", - "aegir": "^42.2.11", + "aegir": "^43.0.1", "delay": "^6.0.0", "it-all": "^3.0.6", "it-byte-stream": "^1.0.10", diff --git a/packages/interface-internal/package.json b/packages/interface-internal/package.json index 893147cb72..f40b4a6968 100644 --- a/packages/interface-internal/package.json +++ b/packages/interface-internal/package.json @@ -54,7 +54,7 @@ "uint8arraylist": "^2.4.8" }, "devDependencies": { - "aegir": "^42.2.11" + "aegir": "^43.0.1" }, "sideEffects": false } diff --git a/packages/interface/package.json b/packages/interface/package.json index e6525ba461..4483105ed0 100644 --- a/packages/interface/package.json +++ b/packages/interface/package.json @@ -56,7 +56,7 @@ "uint8arraylist": "^2.4.8" }, "devDependencies": { - "aegir": "^42.2.11" + "aegir": "^43.0.1" }, "browser": { "events": "./dist/src/events.browser.js" diff --git a/packages/kad-dht/package.json b/packages/kad-dht/package.json index 338ea52518..8976ff5f4f 100644 --- a/packages/kad-dht/package.json +++ b/packages/kad-dht/package.json @@ -97,7 +97,7 @@ "@types/lodash.range": "^3.2.9", "@types/sinon": "^17.0.3", "@types/which": "^3.0.3", - "aegir": "^42.2.11", + "aegir": "^43.0.1", "datastore-core": "^9.2.9", "datastore-level": "^10.1.8", "delay": "^6.0.0", diff --git a/packages/keychain/package.json b/packages/keychain/package.json index ca7c325daa..356df3abd0 100644 --- a/packages/keychain/package.json +++ b/packages/keychain/package.json @@ -71,7 +71,7 @@ "devDependencies": { "@libp2p/logger": "^4.0.13", "@libp2p/peer-id-factory": "^4.1.2", - "aegir": "^42.2.11", + "aegir": "^43.0.1", "datastore-core": "^9.2.9" }, "sideEffects": false diff --git a/packages/libp2p/package.json b/packages/libp2p/package.json index 278b44b64b..8a96a71c21 100644 --- a/packages/libp2p/package.json +++ b/packages/libp2p/package.json @@ -120,7 +120,7 @@ "@libp2p/tcp": "^9.0.26", "@libp2p/websockets": "^8.0.24", "@multiformats/mafmt": "^12.1.6", - "aegir": "^42.2.11", + "aegir": "^43.0.1", "delay": "^6.0.0", "it-all": "^3.0.6", "it-drain": "^3.0.7", diff --git a/packages/libp2p/src/peer-routing.ts b/packages/libp2p/src/peer-routing.ts index 53f0572498..5e4315ad41 100644 --- a/packages/libp2p/src/peer-routing.ts +++ b/packages/libp2p/src/peer-routing.ts @@ -1,5 +1,5 @@ import { CodeError } from '@libp2p/interface' -import { PeerSet } from '@libp2p/peer-collections' +import { createScalableCuckooFilter } from '@libp2p/utils/filters' import merge from 'it-merge' import parallel from 'it-parallel' import { codes, messages } from './errors.js' @@ -79,7 +79,7 @@ export class DefaultPeerRouting implements PeerRouting { } const self = this - const seen = new PeerSet() + const seen = createScalableCuckooFilter(1024) for await (const peer of parallel( async function * () { @@ -119,11 +119,11 @@ export class DefaultPeerRouting implements PeerRouting { } // deduplicate peers - if (seen.has(peer.id)) { + if (seen.has(peer.id.toBytes())) { continue } - seen.add(peer.id) + seen.add(peer.id.toBytes()) yield peer } diff --git a/packages/libp2p/src/random-walk.ts b/packages/libp2p/src/random-walk.ts index d116706aa8..2146b541f4 100644 --- a/packages/libp2p/src/random-walk.ts +++ b/packages/libp2p/src/random-walk.ts @@ -103,10 +103,17 @@ export class RandomWalk extends TypedEventEmitter implements R // find peers until no more consumers are interested while (this.walkers > 0) { try { - for await (const peer of this.peerRouting.getClosestPeers(randomBytes(32), { signal })) { + const data = randomBytes(32) + let s = Date.now() + + for await (const peer of this.peerRouting.getClosestPeers(data, { signal })) { + if (signal.aborted) { + this.log('aborting walk') + } + signal.throwIfAborted() - this.log('found peer %p', peer.id) + this.log('found peer %p after %dms for %d walkers', peer.id, Date.now() - s, this.walkers) found++ this.safeDispatchEvent('walk:peer', { detail: peer @@ -115,9 +122,14 @@ export class RandomWalk extends TypedEventEmitter implements R // if we only have one consumer, pause the query until they request // another random peer or they signal they are no longer interested if (this.walkers === 1 && this.needNext != null) { + this.log('wait for need next') await raceSignal(this.needNext.promise, signal) } + + s = Date.now() } + + this.log('walk iteration for %b and %d walkers finished, found %d peers', data, this.walkers, found) } catch (err) { this.log.error('randomwalk errored', err) @@ -126,6 +138,8 @@ export class RandomWalk extends TypedEventEmitter implements R }) } } + + this.log('no walkers left, ended walk') }) .catch(err => { this.log.error('randomwalk errored', err) diff --git a/packages/logger/package.json b/packages/logger/package.json index 777b0ef684..acc438deb2 100644 --- a/packages/logger/package.json +++ b/packages/logger/package.json @@ -63,7 +63,7 @@ "devDependencies": { "@libp2p/peer-id": "^4.1.2", "@types/debug": "^4.1.12", - "aegir": "^42.2.11", + "aegir": "^43.0.1", "sinon": "^18.0.0", "uint8arrays": "^5.1.0" }, diff --git a/packages/metrics-devtools/CHANGELOG.md b/packages/metrics-devtools/CHANGELOG.md new file mode 100644 index 0000000000..fc17e98d26 --- /dev/null +++ b/packages/metrics-devtools/CHANGELOG.md @@ -0,0 +1,8 @@ +# Changelog + +## [0.1.0](https://github.com/libp2p/js-libp2p/compare/devtools-metrics-v0.0.1...devtools-metrics-v0.1.0) (2024-05-28) + + +### Features + +* add devtools metrics ([#2551](https://github.com/libp2p/js-libp2p/issues/2551)) ([7464dc0](https://github.com/libp2p/js-libp2p/commit/7464dc00caef2d95bfcfc75346f48e0901458df6)) diff --git a/packages/metrics-devtools/LICENSE b/packages/metrics-devtools/LICENSE new file mode 100644 index 0000000000..20ce483c86 --- /dev/null +++ b/packages/metrics-devtools/LICENSE @@ -0,0 +1,4 @@ +This project is dual licensed under MIT and Apache-2.0. + +MIT: https://www.opensource.org/licenses/mit +Apache-2.0: https://www.apache.org/licenses/license-2.0 diff --git a/packages/metrics-devtools/LICENSE-APACHE b/packages/metrics-devtools/LICENSE-APACHE new file mode 100644 index 0000000000..14478a3b60 --- /dev/null +++ b/packages/metrics-devtools/LICENSE-APACHE @@ -0,0 +1,5 @@ +Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at + +http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. diff --git a/packages/metrics-devtools/LICENSE-MIT b/packages/metrics-devtools/LICENSE-MIT new file mode 100644 index 0000000000..72dc60d84b --- /dev/null +++ b/packages/metrics-devtools/LICENSE-MIT @@ -0,0 +1,19 @@ +The MIT License (MIT) + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in +all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +THE SOFTWARE. diff --git a/packages/metrics-devtools/README.md b/packages/metrics-devtools/README.md new file mode 100644 index 0000000000..f862d9dc9b --- /dev/null +++ b/packages/metrics-devtools/README.md @@ -0,0 +1,68 @@ +# @libp2p/devtools-metrics + +[![libp2p.io](https://img.shields.io/badge/project-libp2p-yellow.svg?style=flat-square)](http://libp2p.io/) +[![Discuss](https://img.shields.io/discourse/https/discuss.libp2p.io/posts.svg?style=flat-square)](https://discuss.libp2p.io) +[![codecov](https://img.shields.io/codecov/c/github/libp2p/js-libp2p.svg?style=flat-square)](https://codecov.io/gh/libp2p/js-libp2p) +[![CI](https://img.shields.io/github/actions/workflow/status/libp2p/js-libp2p/main.yml?branch=main\&style=flat-square)](https://github.com/libp2p/js-libp2p/actions/workflows/main.yml?query=branch%3Amain) + +> Collect libp2p metrics and send them to browser DevTools + +# About + + + +Configure your browser-based libp2p node with DevTools metrics: + +```typescript +import { createLibp2p } from 'libp2p' +import { devToolsMetrics } from '@libp2p/devtools-metrics' + +const node = await createLibp2p({ + metrics: devToolsMetrics() +}) +``` + +Then use the [DevTools plugin](https://github.com/ipfs-shipyard/js-libp2p-devtools) +for Chrome or Firefox to inspect the state of your running node. + +# Install + +```console +$ npm i @libp2p/devtools-metrics +``` + +## Browser ` +``` + +# API Docs + +- + +# License + +Licensed under either of + +- Apache 2.0, ([LICENSE-APACHE](https://github.com/libp2p/js-libp2p/blob/main/packages/metrics-devtools/LICENSE-APACHE) / ) +- MIT ([LICENSE-MIT](https://github.com/libp2p/js-libp2p/blob/main/packages/metrics-devtools/LICENSE-MIT) / ) + +# Contribution + +Unless you explicitly state otherwise, any contribution intentionally submitted for inclusion in the work by you, as defined in the Apache-2.0 license, shall be dual licensed as above, without any additional terms or conditions. diff --git a/packages/metrics-devtools/package.json b/packages/metrics-devtools/package.json new file mode 100644 index 0000000000..1b0d7765fa --- /dev/null +++ b/packages/metrics-devtools/package.json @@ -0,0 +1,63 @@ +{ + "name": "@libp2p/devtools-metrics", + "version": "0.1.0", + "description": "Collect libp2p metrics and send them to browser DevTools", + "author": "", + "license": "Apache-2.0 OR MIT", + "homepage": "https://github.com/libp2p/js-libp2p/tree/main/packages/metrics-devtools#readme", + "repository": { + "type": "git", + "url": "git+https://github.com/libp2p/js-libp2p.git" + }, + "bugs": { + "url": "https://github.com/libp2p/js-libp2p/issues" + }, + "publishConfig": { + "access": "public", + "provenance": true + }, + "type": "module", + "types": "./dist/src/index.d.ts", + "files": [ + "src", + "dist", + "!dist/test", + "!**/*.tsbuildinfo" + ], + "exports": { + ".": { + "types": "./dist/src/index.d.ts", + "import": "./dist/src/index.js" + } + }, + "eslintConfig": { + "extends": "ipfs", + "parserOptions": { + "project": true, + "sourceType": "module" + } + }, + "scripts": { + "clean": "aegir clean", + "lint": "aegir lint", + "dep-check": "aegir dep-check", + "doc-check": "aegir doc-check", + "build": "aegir build", + "test": "aegir test -t browser", + "test:chrome": "aegir test -t browser --cov" + }, + "dependencies": { + "@libp2p/interface": "^1.3.1", + "@libp2p/interface-internal": "^1.2.1", + "@libp2p/logger": "^4.0.12", + "@libp2p/simple-metrics": "^1.0.1", + "multiformats": "^13.1.0" + }, + "devDependencies": { + "@libp2p/peer-id-factory": "^4.1.1", + "aegir": "^43.0.1", + "race-event": "^1.3.0", + "sinon-ts": "^2.0.0" + }, + "sideEffects": false +} diff --git a/packages/metrics-devtools/src/index.ts b/packages/metrics-devtools/src/index.ts new file mode 100644 index 0000000000..a0118a4543 --- /dev/null +++ b/packages/metrics-devtools/src/index.ts @@ -0,0 +1,469 @@ +/** + * @packageDocumentation + * + * Configure your browser-based libp2p node with DevTools metrics: + * + * ```typescript + * import { createLibp2p } from 'libp2p' + * import { devToolsMetrics } from '@libp2p/devtools-metrics' + * + * const node = await createLibp2p({ + * metrics: devToolsMetrics() + * }) + * ``` + * + * Then use the [DevTools plugin](https://github.com/ipfs-shipyard/js-libp2p-devtools) + * for Chrome or Firefox to inspect the state of your running node. + */ + +import { start, stop } from '@libp2p/interface' +import { enable, disable } from '@libp2p/logger' +import { simpleMetrics } from '@libp2p/simple-metrics' +import { base64 } from 'multiformats/bases/base64' +import type { ComponentLogger, Connection, Libp2pEvents, Logger, Metrics, MultiaddrConnection, PeerId, Peer as PeerStorePeer, PeerStore, PeerUpdate, Stream, TypedEventEmitter } from '@libp2p/interface' +import type { TransportManager, Registrar, ConnectionManager } from '@libp2p/interface-internal' + +export const SOURCE_DEVTOOLS = '@libp2p/devtools-metrics:devtools' +export const SOURCE_SERVICE_WORKER = '@libp2p/devtools-metrics:worker' +export const SOURCE_CONTENT_SCRIPT = '@libp2p/devtools-metrics:content' +export const SOURCE_METRICS = '@libp2p/devtools-metrics:metrics' +export const LIBP2P_DEVTOOLS_METRICS_KEY = '________libp2p_devtools_metrics' + +// let devtools know we are here +Object.defineProperty(globalThis, LIBP2P_DEVTOOLS_METRICS_KEY, { + value: true, + enumerable: false, + writable: false +}) + +/** + * Sent when new metrics are available + */ +export interface MetricsMessage { + source: typeof SOURCE_METRICS + type: 'metrics' + metrics: Record +} + +/** + * This message represents the current state of the libp2p node + */ +export interface SelfMessage { + source: typeof SOURCE_METRICS + type: 'self' + peer: SelfPeer +} + +/** + * This message represents the current state of the libp2p node + */ +export interface PeersMessage { + source: typeof SOURCE_METRICS + type: 'peers' + peers: Peer[] +} + +/** + * Sent by the DevTools service worker to the DevTools panel when the inspected + * page has finished (re)loading + */ +export interface PageLoadedMessage { + source: '@libp2p/devtools-metrics:devtools' + type: 'page-loaded' + tabId: number +} + +/** + * Sent by the DevTools service worker to the DevTools panel when it has failed + * to send a message to the inspected page as there is no receiving end present. + * + * This normally means the content script has not been loaded due to the user + * not having granted permission for the script to run. + */ +export interface PermissionsErrorMessage { + source: '@libp2p/devtools-metrics:devtools' + type: 'permissions-error' + tabId: number +} + +/** + * This message is sent by DevTools when no `self` message has been received + */ +export interface IdentifyMessage { + source: '@libp2p/devtools-metrics:devtools' + type: 'identify' + tabId: number +} + +/** + * This message is sent by DevTools when no `self` message has been received + */ +export interface EnableDebugMessage { + source: '@libp2p/devtools-metrics:devtools' + type: 'debug' + namespace: string + tabId: number +} + +/** + * We cannot use the web extension API to copy text to the cliboard yet as it's + * not supported in Firefox yet, so get the page to do it + * + * @see https://developer.mozilla.org/en-US/docs/Mozilla/Add-ons/WebExtensions/Interact_with_the_clipboard#writing_to_the_clipboard + */ +export interface CopyToClipboardMessage { + source: '@libp2p/devtools-metrics:devtools' + type: 'copy-to-clipboard' + value: string + tabId: number +} + +/** + * Messages that are sent from the application page to the DevTools panel + */ +export type ApplicationMessage = MetricsMessage | SelfMessage | PeersMessage + +/** + * Messages that are sent from the service worker + */ +export type WorkerMessage = PageLoadedMessage | PermissionsErrorMessage + +/** + * Messages that are sent from the DevTools panel page to the application page + */ +export type DevToolsMessage = IdentifyMessage | EnableDebugMessage | CopyToClipboardMessage + +export interface SelfPeer { + /** + * The identifier of the peer + */ + id: string + + /** + * The list of multiaddrs the peer is listening on + */ + multiaddrs: string[] + + /** + * Any peer store tags the peer has + */ + tags: Record + + /** + * Any peer store metadata the peer has + */ + metadata: Record + + /** + * The protocols the peer supports + */ + protocols: string[] +} + +export interface Address { + /** + * The multiaddr this address represents + */ + multiaddr: string + + /** + * If `true`, this multiaddr came from a signed peer record + */ + isCertified?: boolean + + /** + * If `true`, the current node has an active connection to this peer via this + * address + */ + isConnected?: boolean +} + +export interface Peer { + /** + * The identifier of the remote peer + */ + id: string + + /** + * The list of addresses the peer has that we know about + */ + addresses: Address[] + + /** + * Any peer store tags the peer has + */ + tags: Record + + /** + * Any peer store metadata the peer has + */ + metadata: Record + + /** + * The protocols the peer supports, if known + */ + protocols: string[] +} + +export interface DevToolsMetricsInit { + /** + * How often to pass metrics to the DevTools panel + */ + intervalMs?: number +} + +export interface DevToolsMetricsComponents { + logger: ComponentLogger + events: TypedEventEmitter + peerId: PeerId + transportManager: TransportManager + registrar: Registrar + connectionManager: ConnectionManager + peerStore: PeerStore +} + +class DevToolsMetrics implements Metrics { + private readonly log: Logger + private readonly components: DevToolsMetricsComponents + private readonly simpleMetrics: Metrics + private readonly intervalMs?: number + + constructor (components: DevToolsMetricsComponents, init?: Partial) { + this.log = components.logger.forComponent('libp2p:devtools-metrics') + this.intervalMs = init?.intervalMs + this.components = components + + // collect information on current peers and sent it to the dev tools panel + this.onPeersUpdate = debounce(this.onPeersUpdate.bind(this), 1000) + this.onSelfUpdate = this.onSelfUpdate.bind(this) + this.onIncomingMessage = this.onIncomingMessage.bind(this) + + // collect metrics + this.simpleMetrics = simpleMetrics({ + intervalMs: this.intervalMs, + onMetrics: (metrics) => { + const message: MetricsMessage = { + source: SOURCE_METRICS, + type: 'metrics', + metrics + } + + this.log('post metrics message') + window.postMessage(message, '*') + } + })({}) + } + + trackMultiaddrConnection (maConn: MultiaddrConnection): void { + this.simpleMetrics.trackMultiaddrConnection(maConn) + } + + trackProtocolStream (stream: Stream, connection: Connection): void { + this.simpleMetrics.trackProtocolStream(stream, connection) + } + + registerMetric (name: any, options: any): any { + return this.simpleMetrics.registerMetric(name, options) + } + + registerMetricGroup (name: any, options: any): any { + return this.simpleMetrics.registerMetricGroup(name, options) + } + + registerCounter (name: any, options: any): any { + return this.simpleMetrics.registerCounter(name, options) + } + + registerCounterGroup (name: any, options: any): any { + return this.simpleMetrics.registerCounterGroup(name, options) + } + + async start (): Promise { + // send peer updates + this.components.events.addEventListener('peer:connect', this.onPeersUpdate) + this.components.events.addEventListener('peer:disconnect', this.onPeersUpdate) + this.components.events.addEventListener('peer:identify', this.onPeersUpdate) + this.components.events.addEventListener('peer:update', this.onPeersUpdate) + + // send node status updates + this.components.events.addEventListener('self:peer:update', this.onSelfUpdate) + + // process incoming messages from devtools + window.addEventListener('message', this.onIncomingMessage) + + // send metrics + await start(this.simpleMetrics) + } + + async stop (): Promise { + window.removeEventListener('message', this.onIncomingMessage) + this.components.events.removeEventListener('self:peer:update', this.onSelfUpdate) + this.components.events.removeEventListener('peer:connect', this.onPeersUpdate) + this.components.events.removeEventListener('peer:disconnect', this.onPeersUpdate) + this.components.events.removeEventListener('peer:identify', this.onPeersUpdate) + this.components.events.removeEventListener('peer:update', this.onPeersUpdate) + await stop(this.simpleMetrics) + } + + private onPeersUpdate (): void { + Promise.resolve().then(async () => { + const message: PeersMessage = { + source: SOURCE_METRICS, + type: 'peers', + peers: [] + } + + const connections = this.components.connectionManager.getConnectionsMap() + const connectedAddresses = [...connections.values()].flatMap(conn => conn).map(conn => conn.remoteAddr.toString()) + + for (const [peerId, conns] of connections.entries()) { + try { + const peer = await this.components.peerStore.get(peerId) + + message.peers.push({ + id: peerId.toString(), + addresses: peer.addresses.map(({ isCertified, multiaddr }) => { + const addr = multiaddr.toString() + + return { + multiaddr: addr, + isCertified, + isConnected: connectedAddresses.includes(addr) + } + }), + protocols: [...peer.protocols], + tags: toObject(peer.tags, (t) => t.value), + metadata: toObject(peer.metadata, (buf) => base64.encode(buf)) + }) + } catch (err) { + this.log.error('could not load peer data from peer store', err) + + message.peers.push({ + id: peerId.toString(), + addresses: conns.map(conn => { + const addr = conn.remoteAddr.toString() + + return { + multiaddr: addr, + isConnected: connectedAddresses.includes(addr) + } + }), + protocols: [], + tags: {}, + metadata: {} + }) + } + } + + window.postMessage(message, '*') + }) + .catch(err => { + this.log.error('error sending peers message', err) + }) + } + + private onSelfUpdate (evt: CustomEvent): void { + this.sendSelfUpdate(evt.detail.peer) + } + + private sendSelfUpdate (peer: PeerStorePeer): void { + Promise.resolve() + .then(async () => { + const message: SelfMessage = { + source: SOURCE_METRICS, + type: 'self', + peer: { + id: peer.id.toString(), + multiaddrs: peer.addresses.map(({ multiaddr }) => multiaddr.toString()), + protocols: [...peer.protocols], + tags: toObject(peer.tags, (t) => t.value), + metadata: toObject(peer.metadata, (buf) => base64.encode(buf)) + } + } + + this.log('post node update message') + window.postMessage(message, '*') + }) + .catch(err => { + this.log.error('error sending self update', err) + }) + } + + private onIncomingMessage (event: MessageEvent): void { + // Only accept messages from same frame + if (event.source !== window) { + return + } + + const message = event.data + + // Only accept messages of correct format (our messages) + if (message?.source !== SOURCE_DEVTOOLS) { + return + } + + // respond to identify request + if (message.type === 'identify') { + Promise.resolve() + .then(async () => { + const peer = await this.components.peerStore.get(this.components.peerId) + + this.sendSelfUpdate(peer) + // also send our current peer list + this.onPeersUpdate() + }) + .catch(err => { + this.log.error('error sending identify response', err) + }) + } + + // handle enabling/disabling debug namespaces + if (message.type === 'debug') { + if (message.namespace.length > 0) { + enable(message.namespace) + } else { + disable() + } + } + } +} + +export function devToolsMetrics (init?: Partial): (components: DevToolsMetricsComponents) => Metrics { + return (components) => { + return new DevToolsMetrics(components, init) + } +} + +function toObject (map: Map, transform: (value: T) => R): Record { + const output: Record = {} + + for (const [key, value] of map.entries()) { + output[key] = transform(value) + } + + return output +} + +function debounce (callback: () => void, wait: number = 100): () => void { + let timeout: ReturnType + let start: number | undefined + + return (): void => { + if (start == null) { + start = Date.now() + } + + if (timeout != null && Date.now() - start > wait) { + clearTimeout(timeout) + start = undefined + callback() + return + } + + clearTimeout(timeout) + timeout = setTimeout(() => { + start = undefined + callback() + }, wait) + } +} diff --git a/packages/metrics-devtools/test/index.spec.ts b/packages/metrics-devtools/test/index.spec.ts new file mode 100644 index 0000000000..d20ce2cbbf --- /dev/null +++ b/packages/metrics-devtools/test/index.spec.ts @@ -0,0 +1,86 @@ +import { TypedEventEmitter, start, stop } from '@libp2p/interface' +import { defaultLogger } from '@libp2p/logger' +import { createEd25519PeerId } from '@libp2p/peer-id-factory' +import { expect } from 'aegir/chai' +import { raceEvent } from 'race-event' +import { stubInterface, type StubbedInstance } from 'sinon-ts' +import { LIBP2P_DEVTOOLS_METRICS_KEY, SOURCE_METRICS, SOURCE_DEVTOOLS, devToolsMetrics, type ApplicationMessage } from '../src/index.js' +import type { ComponentLogger, Libp2pEvents, Metrics, PeerId, PeerStore } from '@libp2p/interface' +import type { ConnectionManager, Registrar, TransportManager } from '@libp2p/interface-internal' + +interface StubbedComponents { + logger: ComponentLogger + events: TypedEventEmitter + peerId: PeerId + transportManager: StubbedInstance + registrar: StubbedInstance + connectionManager: StubbedInstance + peerStore: StubbedInstance +} + +describe('devtools-metrics', () => { + let components: StubbedComponents + let metrics: Metrics + + beforeEach(async () => { + components = { + logger: defaultLogger(), + events: new TypedEventEmitter(), + peerId: await createEd25519PeerId(), + transportManager: stubInterface(), + registrar: stubInterface(), + connectionManager: stubInterface(), + peerStore: stubInterface() + } + + metrics = devToolsMetrics({ + intervalMs: 10 + })(components) + + await start(metrics) + }) + + afterEach(async () => { + await stop(metrics) + }) + + it('should broadcast metrics', async () => { + const event = await raceEvent>(window, 'message', AbortSignal.timeout(1000), { + filter: (evt) => { + return evt.data.source === SOURCE_METRICS && evt.data.type === 'metrics' + } + }) + + expect(event).to.have.nested.property('data.metrics') + }) + + it('should identify node', async () => { + components.transportManager.getListeners.returns([]) + components.registrar.getProtocols.returns([]) + components.peerStore.get.withArgs(components.peerId).resolves({ + id: components.peerId, + addresses: [], + metadata: new Map(), + protocols: [], + tags: new Map() + }) + + // devtools asks the node to reveal itself + window.postMessage({ + source: SOURCE_DEVTOOLS, + type: 'identify' + }, '*') + + const event = await raceEvent>(window, 'message', AbortSignal.timeout(1000), { + filter: (evt) => { + return evt.data.source === SOURCE_METRICS && evt.data.type === 'self' + } + }) + + expect(event).to.have.nested.property('data.peer.id') + }) + + it('should signal presence of metrics', () => { + expect(globalThis).to.have.property(LIBP2P_DEVTOOLS_METRICS_KEY).that.is.true() + }) +}) diff --git a/packages/metrics-devtools/tsconfig.json b/packages/metrics-devtools/tsconfig.json new file mode 100644 index 0000000000..4c8f70ddb3 --- /dev/null +++ b/packages/metrics-devtools/tsconfig.json @@ -0,0 +1,27 @@ +{ + "extends": "aegir/src/config/tsconfig.aegir.json", + "compilerOptions": { + "outDir": "dist" + }, + "include": [ + "src", + "test" + ], + "references": [ + { + "path": "../interface" + }, + { + "path": "../interface-internal" + }, + { + "path": "../logger" + }, + { + "path": "../peer-id-factory" + }, + { + "path": "../metrics-simple" + } + ] +} diff --git a/packages/metrics-devtools/typedoc.json b/packages/metrics-devtools/typedoc.json new file mode 100644 index 0000000000..f599dc728d --- /dev/null +++ b/packages/metrics-devtools/typedoc.json @@ -0,0 +1,5 @@ +{ + "entryPoints": [ + "./src/index.ts" + ] +} diff --git a/packages/metrics-prometheus/package.json b/packages/metrics-prometheus/package.json index a722b15fd2..379e4c84d8 100644 --- a/packages/metrics-prometheus/package.json +++ b/packages/metrics-prometheus/package.json @@ -59,7 +59,7 @@ "@libp2p/logger": "^4.0.13", "@libp2p/peer-id-factory": "^4.1.2", "@multiformats/multiaddr": "^12.2.3", - "aegir": "^42.2.11", + "aegir": "^43.0.1", "it-drain": "^3.0.7", "it-pipe": "^3.0.1", "p-defer": "^4.0.1" diff --git a/packages/metrics-simple/package.json b/packages/metrics-simple/package.json index d0f5ad82f7..bc981453c4 100644 --- a/packages/metrics-simple/package.json +++ b/packages/metrics-simple/package.json @@ -56,7 +56,7 @@ "it-stream-types": "^2.0.1" }, "devDependencies": { - "aegir": "^42.2.11", + "aegir": "^43.0.1", "p-defer": "^4.0.1" } } diff --git a/packages/multistream-select/package.json b/packages/multistream-select/package.json index 9a3a3d5875..47114cd036 100644 --- a/packages/multistream-select/package.json +++ b/packages/multistream-select/package.json @@ -70,7 +70,7 @@ }, "devDependencies": { "@libp2p/logger": "^4.0.13", - "aegir": "^42.2.11", + "aegir": "^43.0.1", "iso-random-stream": "^2.0.2", "it-all": "^3.0.6", "it-drain": "^3.0.7", diff --git a/packages/peer-collections/package.json b/packages/peer-collections/package.json index 73cbd6eb90..a64803513c 100644 --- a/packages/peer-collections/package.json +++ b/packages/peer-collections/package.json @@ -61,7 +61,7 @@ "devDependencies": { "@libp2p/peer-id-factory": "^4.1.2", "@types/sinon": "^17.0.3", - "aegir": "^42.2.11", + "aegir": "^43.0.1", "sinon": "^18.0.0", "sinon-ts": "^2.0.0" }, diff --git a/packages/peer-discovery-bootstrap/package.json b/packages/peer-discovery-bootstrap/package.json index 963a005381..1e166227ad 100644 --- a/packages/peer-discovery-bootstrap/package.json +++ b/packages/peer-discovery-bootstrap/package.json @@ -62,7 +62,7 @@ "devDependencies": { "@libp2p/interface-compliance-tests": "^5.4.5", "@libp2p/logger": "^4.0.13", - "aegir": "^42.2.11", + "aegir": "^43.0.1", "sinon-ts": "^2.0.0" }, "sideEffects": false diff --git a/packages/peer-discovery-mdns/package.json b/packages/peer-discovery-mdns/package.json index ce177635c6..05e59e603e 100644 --- a/packages/peer-discovery-mdns/package.json +++ b/packages/peer-discovery-mdns/package.json @@ -63,7 +63,7 @@ "@libp2p/interface-compliance-tests": "^5.4.5", "@libp2p/logger": "^4.0.13", "@libp2p/peer-id-factory": "^4.1.2", - "aegir": "^42.2.11", + "aegir": "^43.0.1", "p-wait-for": "^5.0.2", "sinon-ts": "^2.0.0" }, diff --git a/packages/peer-id-factory/package.json b/packages/peer-id-factory/package.json index ee4b9225e7..7bc7a5baf2 100644 --- a/packages/peer-id-factory/package.json +++ b/packages/peer-id-factory/package.json @@ -66,7 +66,7 @@ "uint8arrays": "^5.1.0" }, "devDependencies": { - "aegir": "^42.2.11", + "aegir": "^43.0.1", "multiformats": "^13.1.0", "protons": "^7.5.0" }, diff --git a/packages/peer-id/package.json b/packages/peer-id/package.json index 2aa24401c0..c466996d94 100644 --- a/packages/peer-id/package.json +++ b/packages/peer-id/package.json @@ -59,7 +59,7 @@ "uint8arrays": "^5.1.0" }, "devDependencies": { - "aegir": "^42.2.11" + "aegir": "^43.0.1" }, "sideEffects": false } diff --git a/packages/peer-record/package.json b/packages/peer-record/package.json index e3b27035f3..f279aff3b3 100644 --- a/packages/peer-record/package.json +++ b/packages/peer-record/package.json @@ -73,7 +73,7 @@ }, "devDependencies": { "@libp2p/peer-id-factory": "^4.1.2", - "aegir": "^42.2.11", + "aegir": "^43.0.1", "protons": "^7.5.0" }, "sideEffects": false diff --git a/packages/peer-store/package.json b/packages/peer-store/package.json index 507856b7d5..ded7889b22 100644 --- a/packages/peer-store/package.json +++ b/packages/peer-store/package.json @@ -76,7 +76,7 @@ "@libp2p/logger": "^4.0.13", "@libp2p/peer-id-factory": "^4.1.2", "@types/sinon": "^17.0.3", - "aegir": "^42.2.11", + "aegir": "^43.0.1", "datastore-core": "^9.2.9", "delay": "^6.0.0", "p-defer": "^4.0.1", diff --git a/packages/pnet/package.json b/packages/pnet/package.json index 3074bf7eef..9d0b1e3577 100644 --- a/packages/pnet/package.json +++ b/packages/pnet/package.json @@ -67,7 +67,7 @@ "@libp2p/peer-id-factory": "^4.1.1", "@multiformats/multiaddr": "^12.2.3", "@types/xsalsa20": "^1.1.3", - "aegir": "^42.2.11", + "aegir": "^43.0.1", "it-all": "^3.0.6" }, "sideEffects": false diff --git a/packages/protocol-autonat/package.json b/packages/protocol-autonat/package.json index 97f7af9774..68ccc87d3d 100644 --- a/packages/protocol-autonat/package.json +++ b/packages/protocol-autonat/package.json @@ -68,7 +68,7 @@ }, "devDependencies": { "@libp2p/logger": "^4.0.13", - "aegir": "^42.2.11", + "aegir": "^43.0.1", "it-all": "^3.0.6", "it-pushable": "^3.2.3", "protons": "^7.5.0", diff --git a/packages/protocol-autonat/src/autonat.ts b/packages/protocol-autonat/src/autonat.ts index 8e825e34a3..3c9b04c977 100644 --- a/packages/protocol-autonat/src/autonat.ts +++ b/packages/protocol-autonat/src/autonat.ts @@ -15,7 +15,7 @@ import { } from './constants.js' import { Message } from './pb/index.js' import type { AutoNATComponents, AutoNATServiceInit } from './index.js' -import type { Logger, Connection, PeerId, PeerInfo, Startable } from '@libp2p/interface' +import type { Logger, Connection, PeerId, PeerInfo, Startable, AbortOptions } from '@libp2p/interface' import type { IncomingStreamData } from '@libp2p/interface-internal' // if more than 3 peers manage to dial us on what we believe to be our external @@ -95,9 +95,6 @@ export class AutoNATService implements Startable { // appearing in the console setMaxListeners(Infinity, signal) - const ourHosts = this.components.addressManager.getAddresses() - .map(ma => ma.toOptions().host) - try { const self = this @@ -138,193 +135,188 @@ export class AutoNATService implements Startable { return } - const dialRequest = request.dial - - if (dialRequest == null) { - self.log.error('dial was missing from message') + yield Message.encode(await self.handleAutonatMessage(request, data.connection, { + signal + })) + }, + (source) => lp.encode(source), + data.stream + ) + } catch (err) { + this.log.error('error handling incoming autonat stream', err) + } finally { + signal.removeEventListener('abort', onAbort) + } + } - yield Message.encode({ - type: Message.MessageType.DIAL_RESPONSE, - dialResponse: { - status: Message.ResponseStatus.E_BAD_REQUEST, - statusText: 'No Dial message found in message' - } - }) + _verifyExternalAddresses (): void { + void this.verifyExternalAddresses() + .catch(err => { + this.log.error('error verifying external address', err) + }) + } - return - } + private async handleAutonatMessage (message: Message, connection: Connection, options?: AbortOptions): Promise { + const ourHosts = this.components.addressManager.getAddresses() + .map(ma => ma.toOptions().host) - let peerId: PeerId - const peer = dialRequest.peer + const dialRequest = message.dial - if (peer == null || peer.id == null) { - self.log.error('PeerId missing from message') + if (dialRequest == null) { + this.log.error('dial was missing from message') - yield Message.encode({ - type: Message.MessageType.DIAL_RESPONSE, - dialResponse: { - status: Message.ResponseStatus.E_BAD_REQUEST, - statusText: 'missing peer info' - } - }) + return { + type: Message.MessageType.DIAL_RESPONSE, + dialResponse: { + status: Message.ResponseStatus.E_BAD_REQUEST, + statusText: 'No Dial message found in message' + } + } + } - return - } + let peerId: PeerId + const peer = dialRequest.peer - try { - peerId = peerIdFromBytes(peer.id) - } catch (err) { - self.log.error('invalid PeerId', err) + if (peer == null || peer.id == null) { + this.log.error('PeerId missing from message') - yield Message.encode({ - type: Message.MessageType.DIAL_RESPONSE, - dialResponse: { - status: Message.ResponseStatus.E_BAD_REQUEST, - statusText: 'bad peer id' - } - }) + return { + type: Message.MessageType.DIAL_RESPONSE, + dialResponse: { + status: Message.ResponseStatus.E_BAD_REQUEST, + statusText: 'missing peer info' + } + } + } - return - } + try { + peerId = peerIdFromBytes(peer.id) + } catch (err) { + this.log.error('invalid PeerId', err) - self.log('incoming request from %p', peerId) + return { + type: Message.MessageType.DIAL_RESPONSE, + dialResponse: { + status: Message.ResponseStatus.E_BAD_REQUEST, + statusText: 'bad peer id' + } + } + } - // reject any dial requests that arrive via relays - if (!data.connection.remotePeer.equals(peerId)) { - self.log('target peer %p did not equal sending peer %p', peerId, data.connection.remotePeer) + this.log('incoming request from %p', peerId) - yield Message.encode({ - type: Message.MessageType.DIAL_RESPONSE, - dialResponse: { - status: Message.ResponseStatus.E_BAD_REQUEST, - statusText: 'peer id mismatch' - } - }) + // reject any dial requests that arrive via relays + if (!connection.remotePeer.equals(peerId)) { + this.log('target peer %p did not equal sending peer %p', peerId, connection.remotePeer) - return - } + return { + type: Message.MessageType.DIAL_RESPONSE, + dialResponse: { + status: Message.ResponseStatus.E_BAD_REQUEST, + statusText: 'peer id mismatch' + } + } + } - // get a list of multiaddrs to dial - const multiaddrs = peer.addrs - .map(buf => multiaddr(buf)) - .filter(ma => { - const isFromSameHost = ma.toOptions().host === data.connection.remoteAddr.toOptions().host + // get a list of multiaddrs to dial + const multiaddrs = peer.addrs + .map(buf => multiaddr(buf)) + .filter(ma => { + const isFromSameHost = ma.toOptions().host === connection.remoteAddr.toOptions().host - self.log.trace('request to dial %a was sent from %a is same host %s', ma, data.connection.remoteAddr, isFromSameHost) - // skip any Multiaddrs where the target node's IP does not match the sending node's IP - return isFromSameHost - }) - .filter(ma => { - const host = ma.toOptions().host - const isPublicIp = !(isPrivateIp(host) ?? false) + this.log.trace('request to dial %a was sent from %a is same host %s', ma, connection.remoteAddr, isFromSameHost) + // skip any Multiaddrs where the target node's IP does not match the sending node's IP + return isFromSameHost + }) + .filter(ma => { + const host = ma.toOptions().host + const isPublicIp = !(isPrivateIp(host) ?? false) - self.log.trace('host %s was public %s', host, isPublicIp) - // don't try to dial private addresses - return isPublicIp - }) - .filter(ma => { - const host = ma.toOptions().host - const isNotOurHost = !ourHosts.includes(host) + this.log.trace('host %s was public %s', host, isPublicIp) + // don't try to dial private addresses + return isPublicIp + }) + .filter(ma => { + const host = ma.toOptions().host + const isNotOurHost = !ourHosts.includes(host) - self.log.trace('host %s was not our host %s', host, isNotOurHost) - // don't try to dial nodes on the same host as us - return isNotOurHost - }) - .filter(ma => { - const isSupportedTransport = Boolean(self.components.transportManager.dialTransportForMultiaddr(ma)) + this.log.trace('host %s was not our host %s', host, isNotOurHost) + // don't try to dial nodes on the same host as us + return isNotOurHost + }) + .filter(ma => { + const isSupportedTransport = Boolean(this.components.transportManager.dialTransportForMultiaddr(ma)) - self.log.trace('transport for %a is supported %s', ma, isSupportedTransport) - // skip any Multiaddrs that have transports we do not support - return isSupportedTransport - }) - .map(ma => { - if (ma.getPeerId() == null) { - // make sure we have the PeerId as part of the Multiaddr - ma = ma.encapsulate(`/p2p/${peerId.toString()}`) - } + this.log.trace('transport for %a is supported %s', ma, isSupportedTransport) + // skip any Multiaddrs that have transports we do not support + return isSupportedTransport + }) + .map(ma => { + if (ma.getPeerId() == null) { + // make sure we have the PeerId as part of the Multiaddr + ma = ma.encapsulate(`/p2p/${peerId.toString()}`) + } - return ma - }) + return ma + }) - // make sure we have something to dial - if (multiaddrs.length === 0) { - self.log('no valid multiaddrs for %p in message', peerId) + // make sure we have something to dial + if (multiaddrs.length === 0) { + this.log('no valid multiaddrs for %p in message', peerId) - yield Message.encode({ - type: Message.MessageType.DIAL_RESPONSE, - dialResponse: { - status: Message.ResponseStatus.E_DIAL_REFUSED, - statusText: 'no dialable addresses' - } - }) + return { + type: Message.MessageType.DIAL_RESPONSE, + dialResponse: { + status: Message.ResponseStatus.E_DIAL_REFUSED, + statusText: 'no dialable addresses' + } + } + } - return - } + this.log('dial multiaddrs %s for peer %p', multiaddrs.map(ma => ma.toString()).join(', '), peerId) - self.log('dial multiaddrs %s for peer %p', multiaddrs.map(ma => ma.toString()).join(', '), peerId) + let errorMessage = '' + let lastMultiaddr = multiaddrs[0] - let errorMessage = '' - let lastMultiaddr = multiaddrs[0] + for await (const multiaddr of multiaddrs) { + let connection: Connection | undefined + lastMultiaddr = multiaddr - for await (const multiaddr of multiaddrs) { - let connection: Connection | undefined - lastMultiaddr = multiaddr + try { + connection = await this.components.connectionManager.openConnection(multiaddr, options) - try { - connection = await self.components.connectionManager.openConnection(multiaddr, { - signal - }) + if (!connection.remoteAddr.equals(multiaddr)) { + this.log.error('tried to dial %a but dialed %a', multiaddr, connection.remoteAddr) + throw new Error('Unexpected remote address') + } - if (!connection.remoteAddr.equals(multiaddr)) { - self.log.error('tried to dial %a but dialed %a', multiaddr, connection.remoteAddr) - throw new Error('Unexpected remote address') - } + this.log('Success %p', peerId) - self.log('Success %p', peerId) - - yield Message.encode({ - type: Message.MessageType.DIAL_RESPONSE, - dialResponse: { - status: Message.ResponseStatus.OK, - addr: connection.remoteAddr.decapsulateCode(protocols('p2p').code).bytes - } - }) - - return - } catch (err: any) { - self.log('could not dial %p', peerId, err) - errorMessage = err.message - } finally { - if (connection != null) { - await connection.close() - } - } + return { + type: Message.MessageType.DIAL_RESPONSE, + dialResponse: { + status: Message.ResponseStatus.OK, + addr: connection.remoteAddr.decapsulateCode(protocols('p2p').code).bytes } - - yield Message.encode({ - type: Message.MessageType.DIAL_RESPONSE, - dialResponse: { - status: Message.ResponseStatus.E_DIAL_ERROR, - statusText: errorMessage, - addr: lastMultiaddr.bytes - } - }) - }, - (source) => lp.encode(source), - data.stream - ) - } catch (err) { - this.log.error('error handling incoming autonat stream', err) - } finally { - signal.removeEventListener('abort', onAbort) + } + } catch (err: any) { + this.log('could not dial %p', peerId, err) + errorMessage = err.message + } finally { + if (connection != null) { + await connection.close() + } + } } - } - _verifyExternalAddresses (): void { - void this.verifyExternalAddresses() - .catch(err => { - this.log.error('error verifying external address', err) - }) + return { + type: Message.MessageType.DIAL_RESPONSE, + dialResponse: { + status: Message.ResponseStatus.E_DIAL_ERROR, + statusText: errorMessage, + addr: lastMultiaddr.bytes + } + } } /** diff --git a/packages/protocol-dcutr/package.json b/packages/protocol-dcutr/package.json index ca317f7e14..8f5bcc3f72 100644 --- a/packages/protocol-dcutr/package.json +++ b/packages/protocol-dcutr/package.json @@ -63,7 +63,7 @@ "uint8arraylist": "^2.4.8" }, "devDependencies": { - "aegir": "^42.2.11", + "aegir": "^43.0.1", "protons": "^7.5.0", "sinon": "^18.0.0", "sinon-ts": "^2.0.0" diff --git a/packages/protocol-echo/package.json b/packages/protocol-echo/package.json index b0be5852c1..74fcb25f24 100644 --- a/packages/protocol-echo/package.json +++ b/packages/protocol-echo/package.json @@ -58,7 +58,7 @@ }, "devDependencies": { "@libp2p/logger": "^4.0.13", - "aegir": "^42.2.11", + "aegir": "^43.0.1", "it-all": "^3.0.6", "it-pair": "^2.0.6", "sinon": "^18.0.0", diff --git a/packages/protocol-fetch/package.json b/packages/protocol-fetch/package.json index ff16368940..11c1686cdf 100644 --- a/packages/protocol-fetch/package.json +++ b/packages/protocol-fetch/package.json @@ -62,7 +62,7 @@ "devDependencies": { "@libp2p/logger": "^4.0.13", "@libp2p/peer-id-factory": "^4.1.2", - "aegir": "^42.2.11", + "aegir": "^43.0.1", "it-pair": "^2.0.6", "protons": "^7.5.0", "sinon": "^18.0.0", diff --git a/packages/protocol-identify/package.json b/packages/protocol-identify/package.json index 3648773a7a..159dead77c 100644 --- a/packages/protocol-identify/package.json +++ b/packages/protocol-identify/package.json @@ -70,7 +70,7 @@ "@libp2p/interface-compliance-tests": "^5.4.5", "@libp2p/logger": "^4.0.13", "@libp2p/peer-id-factory": "^4.1.2", - "aegir": "^42.2.11", + "aegir": "^43.0.1", "delay": "^6.0.0", "it-length-prefixed": "^9.0.4", "it-pair": "^2.0.6", diff --git a/packages/protocol-perf/package.json b/packages/protocol-perf/package.json index 87b9f85e47..eea0544e4d 100644 --- a/packages/protocol-perf/package.json +++ b/packages/protocol-perf/package.json @@ -60,7 +60,7 @@ "devDependencies": { "@libp2p/interface-compliance-tests": "^5.4.5", "@libp2p/logger": "^4.0.13", - "aegir": "^42.2.11", + "aegir": "^43.0.1", "it-last": "^3.0.6", "it-pair": "^2.0.6", "sinon-ts": "^2.0.0" diff --git a/packages/protocol-ping/package.json b/packages/protocol-ping/package.json index dc5529334f..c8a269dcc9 100644 --- a/packages/protocol-ping/package.json +++ b/packages/protocol-ping/package.json @@ -61,7 +61,7 @@ "devDependencies": { "@libp2p/logger": "^4.0.13", "@libp2p/peer-id-factory": "^4.1.2", - "aegir": "^42.2.11", + "aegir": "^43.0.1", "it-byte-stream": "^1.0.10", "it-pair": "^2.0.6", "p-defer": "^4.0.1", diff --git a/packages/pubsub-floodsub/package.json b/packages/pubsub-floodsub/package.json index 6229b0e97c..331c54186e 100644 --- a/packages/pubsub-floodsub/package.json +++ b/packages/pubsub-floodsub/package.json @@ -73,7 +73,7 @@ "@libp2p/peer-id-factory": "^4.1.2", "@multiformats/multiaddr": "^12.2.3", "@types/sinon": "^17.0.3", - "aegir": "^42.2.11", + "aegir": "^43.0.1", "multiformats": "^13.1.0", "p-wait-for": "^5.0.2", "protons": "^7.5.0", diff --git a/packages/pubsub/package.json b/packages/pubsub/package.json index fa4b8f163b..f94b74cc6f 100644 --- a/packages/pubsub/package.json +++ b/packages/pubsub/package.json @@ -102,7 +102,7 @@ "@libp2p/logger": "^4.0.13", "@libp2p/peer-id-factory": "^4.1.2", "@types/sinon": "^17.0.3", - "aegir": "^42.2.11", + "aegir": "^43.0.1", "delay": "^6.0.0", "it-pair": "^2.0.6", "p-defer": "^4.0.1", diff --git a/packages/record/package.json b/packages/record/package.json index 83a617de55..6828f75415 100644 --- a/packages/record/package.json +++ b/packages/record/package.json @@ -65,7 +65,7 @@ "@types/lodash.random": "^3.2.9", "@types/lodash.range": "^3.2.9", "@types/which": "^3.0.3", - "aegir": "^42.2.11", + "aegir": "^43.0.1", "multiformats": "^13.1.0", "protons": "^7.5.0" }, diff --git a/packages/stream-multiplexer-mplex/package.json b/packages/stream-multiplexer-mplex/package.json index a5c022ea3c..8b1340ce81 100644 --- a/packages/stream-multiplexer-mplex/package.json +++ b/packages/stream-multiplexer-mplex/package.json @@ -74,7 +74,7 @@ "devDependencies": { "@libp2p/interface-compliance-tests": "^5.4.5", "@libp2p/logger": "^4.0.13", - "aegir": "^42.2.11", + "aegir": "^43.0.1", "benchmark": "^2.1.4", "cborg": "^4.2.0", "delay": "^6.0.0", diff --git a/packages/transport-circuit-relay-v2/package.json b/packages/transport-circuit-relay-v2/package.json index 1f0e712b40..d4b0a2cc73 100644 --- a/packages/transport-circuit-relay-v2/package.json +++ b/packages/transport-circuit-relay-v2/package.json @@ -65,8 +65,8 @@ "it-stream-types": "^2.0.1", "multiformats": "^13.1.0", "p-defer": "^4.0.1", - "p-retry": "^6.2.0", "protons-runtime": "^5.4.0", + "race-signal": "^1.0.2", "uint8arraylist": "^2.4.8", "uint8arrays": "^5.1.0" }, @@ -74,7 +74,7 @@ "@libp2p/interface-compliance-tests": "^5.4.5", "@libp2p/logger": "^4.0.13", "@libp2p/peer-id-factory": "^4.1.2", - "aegir": "^42.2.11", + "aegir": "^43.0.1", "delay": "^6.0.0", "it-drain": "^3.0.7", "it-pair": "^2.0.6", @@ -82,7 +82,6 @@ "it-to-buffer": "^4.0.7", "p-wait-for": "^5.0.2", "protons": "^7.5.0", - "race-signal": "^1.0.2", "sinon": "^18.0.0", "sinon-ts": "^2.0.0" }, diff --git a/packages/transport-circuit-relay-v2/src/constants.ts b/packages/transport-circuit-relay-v2/src/constants.ts index 50ed7a1134..7a47d1ba40 100644 --- a/packages/transport-circuit-relay-v2/src/constants.ts +++ b/packages/transport-circuit-relay-v2/src/constants.ts @@ -1,26 +1,11 @@ const second = 1000 const minute = 60 * second -/** - * Delay before HOP relay service is advertised on the network - */ -export const ADVERTISE_BOOT_DELAY = 15 * minute - -/** - * Delay Between HOP relay service advertisements on the network - */ -export const ADVERTISE_TTL = 30 * minute - /** * Multicodec code */ export const CIRCUIT_PROTO_CODE = 290 -/** - * Relay HOP relay service namespace for discovery - */ -export const RELAY_RENDEZVOUS_NS = '/libp2p/relay' - /** * The maximum number of relay reservations the relay server will accept */ @@ -36,8 +21,21 @@ export const DEFAULT_MAX_RESERVATION_CLEAR_INTERVAL = 300 * second */ export const DEFAULT_MAX_RESERVATION_TTL = 2 * 60 * minute +/** + * How many reservation attempts to make in parallel + */ export const DEFAULT_RESERVATION_CONCURRENCY = 1 +/** + * How long to wait for a reservation attempt to finsih + */ +export const DEFAULT_RESERVATION_COMPLETION_TIMEOUT = 1000 + +/** + * How long to let the reservation attempt queue to grow + */ +export const DEFAULT_MAX_RESERVATION_QUEUE_LENGTH = 100 + export const RELAY_SOURCE_TAG = 'circuit-relay-source' export const RELAY_TAG = 'circuit-relay-relay' @@ -77,3 +75,6 @@ export const ERR_NO_ROUTERS_AVAILABLE = 'ERR_NO_ROUTERS_AVAILABLE' export const ERR_RELAYED_DIAL = 'ERR_RELAYED_DIAL' export const ERR_HOP_REQUEST_FAILED = 'ERR_HOP_REQUEST_FAILED' export const ERR_TRANSFER_LIMIT_EXCEEDED = 'ERR_TRANSFER_LIMIT_EXCEEDED' + +export const DEFAULT_DISCOVERY_FILTER_SIZE = 4096 +export const DEFAULT_DISCOVERY_FILTER_ERROR_RATE = 0.001 diff --git a/packages/transport-circuit-relay-v2/src/pb/index.ts b/packages/transport-circuit-relay-v2/src/pb/index.ts index be6d4dbf33..9073458e46 100644 --- a/packages/transport-circuit-relay-v2/src/pb/index.ts +++ b/packages/transport-circuit-relay-v2/src/pb/index.ts @@ -4,8 +4,8 @@ /* eslint-disable @typescript-eslint/no-unnecessary-boolean-literal-compare */ /* eslint-disable @typescript-eslint/no-empty-interface */ -import { enumeration, encodeMessage, decodeMessage, message } from 'protons-runtime' -import type { Codec } from 'protons-runtime' +import { type Codec, CodeError, decodeMessage, type DecodeOptions, encodeMessage, enumeration, message } from 'protons-runtime' +import { alloc as uint8ArrayAlloc } from 'uint8arrays/alloc' import type { Uint8ArrayList } from 'uint8arraylist' export interface HopMessage { @@ -72,7 +72,7 @@ export namespace HopMessage { if (opts.lengthDelimited !== false) { w.ldelim() } - }, (reader, length) => { + }, (reader, length, opts = {}) => { const obj: any = {} const end = length == null ? reader.len : reader.pos + length @@ -81,24 +81,36 @@ export namespace HopMessage { const tag = reader.uint32() switch (tag >>> 3) { - case 1: + case 1: { obj.type = HopMessage.Type.codec().decode(reader) break - case 2: - obj.peer = Peer.codec().decode(reader, reader.uint32()) + } + case 2: { + obj.peer = Peer.codec().decode(reader, reader.uint32(), { + limits: opts.limits?.peer + }) break - case 3: - obj.reservation = Reservation.codec().decode(reader, reader.uint32()) + } + case 3: { + obj.reservation = Reservation.codec().decode(reader, reader.uint32(), { + limits: opts.limits?.reservation + }) break - case 4: - obj.limit = Limit.codec().decode(reader, reader.uint32()) + } + case 4: { + obj.limit = Limit.codec().decode(reader, reader.uint32(), { + limits: opts.limits?.limit + }) break - case 5: + } + case 5: { obj.status = Status.codec().decode(reader) break - default: + } + default: { reader.skipType(tag & 7) break + } } } @@ -113,8 +125,8 @@ export namespace HopMessage { return encodeMessage(obj, HopMessage.codec()) } - export const decode = (buf: Uint8Array | Uint8ArrayList): HopMessage => { - return decodeMessage(buf, HopMessage.codec()) + export const decode = (buf: Uint8Array | Uint8ArrayList, opts?: DecodeOptions): HopMessage => { + return decodeMessage(buf, HopMessage.codec(), opts) } } @@ -174,7 +186,7 @@ export namespace StopMessage { if (opts.lengthDelimited !== false) { w.ldelim() } - }, (reader, length) => { + }, (reader, length, opts = {}) => { const obj: any = {} const end = length == null ? reader.len : reader.pos + length @@ -183,21 +195,30 @@ export namespace StopMessage { const tag = reader.uint32() switch (tag >>> 3) { - case 1: + case 1: { obj.type = StopMessage.Type.codec().decode(reader) break - case 2: - obj.peer = Peer.codec().decode(reader, reader.uint32()) + } + case 2: { + obj.peer = Peer.codec().decode(reader, reader.uint32(), { + limits: opts.limits?.peer + }) break - case 3: - obj.limit = Limit.codec().decode(reader, reader.uint32()) + } + case 3: { + obj.limit = Limit.codec().decode(reader, reader.uint32(), { + limits: opts.limits?.limit + }) break - case 4: + } + case 4: { obj.status = Status.codec().decode(reader) break - default: + } + default: { reader.skipType(tag & 7) break + } } } @@ -212,8 +233,8 @@ export namespace StopMessage { return encodeMessage(obj, StopMessage.codec()) } - export const decode = (buf: Uint8Array | Uint8ArrayList): StopMessage => { - return decodeMessage(buf, StopMessage.codec()) + export const decode = (buf: Uint8Array | Uint8ArrayList, opts?: DecodeOptions): StopMessage => { + return decodeMessage(buf, StopMessage.codec(), opts) } } @@ -247,9 +268,9 @@ export namespace Peer { if (opts.lengthDelimited !== false) { w.ldelim() } - }, (reader, length) => { + }, (reader, length, opts = {}) => { const obj: any = { - id: new Uint8Array(0), + id: uint8ArrayAlloc(0), addrs: [] } @@ -259,15 +280,22 @@ export namespace Peer { const tag = reader.uint32() switch (tag >>> 3) { - case 1: + case 1: { obj.id = reader.bytes() break - case 2: + } + case 2: { + if (opts.limits?.addrs != null && obj.addrs.length === opts.limits.addrs) { + throw new CodeError('decode error - map field "addrs" had too many elements', 'ERR_MAX_LENGTH') + } + obj.addrs.push(reader.bytes()) break - default: + } + default: { reader.skipType(tag & 7) break + } } } @@ -282,8 +310,8 @@ export namespace Peer { return encodeMessage(obj, Peer.codec()) } - export const decode = (buf: Uint8Array | Uint8ArrayList): Peer => { - return decodeMessage(buf, Peer.codec()) + export const decode = (buf: Uint8Array | Uint8ArrayList, opts?: DecodeOptions): Peer => { + return decodeMessage(buf, Peer.codec(), opts) } } @@ -323,7 +351,7 @@ export namespace Reservation { if (opts.lengthDelimited !== false) { w.ldelim() } - }, (reader, length) => { + }, (reader, length, opts = {}) => { const obj: any = { expire: 0n, addrs: [] @@ -335,18 +363,26 @@ export namespace Reservation { const tag = reader.uint32() switch (tag >>> 3) { - case 1: + case 1: { obj.expire = reader.uint64() break - case 2: + } + case 2: { + if (opts.limits?.addrs != null && obj.addrs.length === opts.limits.addrs) { + throw new CodeError('decode error - map field "addrs" had too many elements', 'ERR_MAX_LENGTH') + } + obj.addrs.push(reader.bytes()) break - case 3: + } + case 3: { obj.voucher = reader.bytes() break - default: + } + default: { reader.skipType(tag & 7) break + } } } @@ -361,8 +397,8 @@ export namespace Reservation { return encodeMessage(obj, Reservation.codec()) } - export const decode = (buf: Uint8Array | Uint8ArrayList): Reservation => { - return decodeMessage(buf, Reservation.codec()) + export const decode = (buf: Uint8Array | Uint8ArrayList, opts?: DecodeOptions): Reservation => { + return decodeMessage(buf, Reservation.codec(), opts) } } @@ -394,7 +430,7 @@ export namespace Limit { if (opts.lengthDelimited !== false) { w.ldelim() } - }, (reader, length) => { + }, (reader, length, opts = {}) => { const obj: any = {} const end = length == null ? reader.len : reader.pos + length @@ -403,15 +439,18 @@ export namespace Limit { const tag = reader.uint32() switch (tag >>> 3) { - case 1: + case 1: { obj.duration = reader.uint32() break - case 2: + } + case 2: { obj.data = reader.uint64() break - default: + } + default: { reader.skipType(tag & 7) break + } } } @@ -426,8 +465,8 @@ export namespace Limit { return encodeMessage(obj, Limit.codec()) } - export const decode = (buf: Uint8Array | Uint8ArrayList): Limit => { - return decodeMessage(buf, Limit.codec()) + export const decode = (buf: Uint8Array | Uint8ArrayList, opts?: DecodeOptions): Limit => { + return decodeMessage(buf, Limit.codec(), opts) } } @@ -494,10 +533,10 @@ export namespace ReservationVoucher { if (opts.lengthDelimited !== false) { w.ldelim() } - }, (reader, length) => { + }, (reader, length, opts = {}) => { const obj: any = { - relay: new Uint8Array(0), - peer: new Uint8Array(0), + relay: uint8ArrayAlloc(0), + peer: uint8ArrayAlloc(0), expiration: 0n } @@ -507,18 +546,22 @@ export namespace ReservationVoucher { const tag = reader.uint32() switch (tag >>> 3) { - case 1: + case 1: { obj.relay = reader.bytes() break - case 2: + } + case 2: { obj.peer = reader.bytes() break - case 3: + } + case 3: { obj.expiration = reader.uint64() break - default: + } + default: { reader.skipType(tag & 7) break + } } } @@ -533,7 +576,7 @@ export namespace ReservationVoucher { return encodeMessage(obj, ReservationVoucher.codec()) } - export const decode = (buf: Uint8Array | Uint8ArrayList): ReservationVoucher => { - return decodeMessage(buf, ReservationVoucher.codec()) + export const decode = (buf: Uint8Array | Uint8ArrayList, opts?: DecodeOptions): ReservationVoucher => { + return decodeMessage(buf, ReservationVoucher.codec(), opts) } } diff --git a/packages/transport-circuit-relay-v2/src/server/advert-service.ts b/packages/transport-circuit-relay-v2/src/server/advert-service.ts deleted file mode 100644 index 4b8f55dddb..0000000000 --- a/packages/transport-circuit-relay-v2/src/server/advert-service.ts +++ /dev/null @@ -1,107 +0,0 @@ -import { TypedEventEmitter } from '@libp2p/interface' -import pRetry from 'p-retry' -import { - DEFAULT_ADVERT_BOOT_DELAY, - ERR_NO_ROUTERS_AVAILABLE, - RELAY_RENDEZVOUS_NS -} from '../constants.js' -import { namespaceToCid } from '../utils.js' -import type { ComponentLogger, Logger, ContentRouting, Startable } from '@libp2p/interface' - -export interface AdvertServiceInit { - /** - * How long to wait after startup to begin advertising the service - * - if some configured content routers take a while to warm up (for - * example, the DHT needs some peers to be able to publish) this - * value should be high enough that they will have warmed up - */ - bootDelay?: number -} - -export interface AdvertServiceComponents { - contentRouting: ContentRouting - logger: ComponentLogger -} - -export interface AdvertServiceEvents { - 'advert:success': CustomEvent - 'advert:error': CustomEvent -} - -export class AdvertService extends TypedEventEmitter implements Startable { - private readonly contentRouting: ContentRouting - private timeout?: any - private started: boolean - private readonly bootDelay: number - private readonly log: Logger - - /** - * Creates an instance of Relay - */ - constructor (components: AdvertServiceComponents, init?: AdvertServiceInit) { - super() - - this.log = components.logger.forComponent('libp2p:circuit-relay:advert-service') - this.contentRouting = components.contentRouting - this.bootDelay = init?.bootDelay ?? DEFAULT_ADVERT_BOOT_DELAY - this.started = false - } - - isStarted (): boolean { - return this.started - } - - /** - * Start Relay service - */ - start (): void { - if (this.started) { - return - } - - // Advertise service if HOP enabled and advertising enabled - this.timeout = setTimeout(() => { - this._advertiseService().catch(err => { - this.log.error('could not advertise service', err) - }) - }, this.bootDelay) - - this.started = true - } - - /** - * Stop Relay service - */ - stop (): void { - try { - clearTimeout(this.timeout) - } catch (err) { } - - this.started = false - } - - /** - * Advertise hop relay service in the network. - */ - async _advertiseService (): Promise { - await pRetry(async () => { - try { - const cid = await namespaceToCid(RELAY_RENDEZVOUS_NS) - await this.contentRouting.provide(cid) - - this.safeDispatchEvent('advert:success', { detail: undefined }) - } catch (err: any) { - this.safeDispatchEvent('advert:error', { detail: err }) - - if (err.code === ERR_NO_ROUTERS_AVAILABLE) { - this.log.error('a content router, such as a DHT, must be provided in order to advertise the relay service', err) - this.stop() - return - } - - this.log.error('could not advertise service', err) - throw err - } - }) - } -} diff --git a/packages/transport-circuit-relay-v2/src/server/index.ts b/packages/transport-circuit-relay-v2/src/server/index.ts index 36eb9fdfa8..16996249fd 100644 --- a/packages/transport-circuit-relay-v2/src/server/index.ts +++ b/packages/transport-circuit-relay-v2/src/server/index.ts @@ -14,7 +14,6 @@ import { } from '../constants.js' import { HopMessage, type Reservation, Status, StopMessage } from '../pb/index.js' import { createLimitedRelay } from '../utils.js' -import { AdvertService, type AdvertServiceComponents, type AdvertServiceInit } from './advert-service.js' import { ReservationStore, type ReservationStoreInit } from './reservation-store.js' import { ReservationVoucherRecord } from './reservation-voucher.js' import type { CircuitRelayService, RelayReservation } from '../index.js' @@ -31,12 +30,6 @@ export interface CircuitRelayServerInit { */ hopTimeout?: number - /** - * If true, advertise this service via libp2p content routing to allow - * peers to locate us on the network (default: false) - */ - advertise?: boolean | AdvertServiceInit - /** * Configuration of reservations */ @@ -70,7 +63,7 @@ export interface StopOptions { request: StopMessage } -export interface CircuitRelayServerComponents extends AdvertServiceComponents { +export interface CircuitRelayServerComponents { registrar: Registrar peerStore: PeerStore addressManager: AddressManager @@ -98,7 +91,6 @@ class CircuitRelayServer extends TypedEventEmitter implements private readonly connectionManager: ConnectionManager private readonly connectionGater: ConnectionGater private readonly reservationStore: ReservationStore - private readonly advertService: AdvertService | undefined private started: boolean private readonly hopTimeout: number private readonly shutdownController: AbortController @@ -122,24 +114,13 @@ class CircuitRelayServer extends TypedEventEmitter implements this.connectionGater = components.connectionGater this.started = false this.hopTimeout = init?.hopTimeout ?? DEFAULT_HOP_TIMEOUT - this.shutdownController = new AbortController() this.maxInboundHopStreams = init.maxInboundHopStreams this.maxOutboundHopStreams = init.maxOutboundHopStreams this.maxOutboundStopStreams = init.maxOutboundStopStreams ?? defaults.maxOutboundStopStreams + this.reservationStore = new ReservationStore(init.reservations) + this.shutdownController = new AbortController() setMaxListeners(Infinity, this.shutdownController.signal) - - if (init.advertise != null && init.advertise !== false) { - this.advertService = new AdvertService(components, init.advertise === true ? undefined : init.advertise) - this.advertService.addEventListener('advert:success', () => { - this.safeDispatchEvent('relay:advert:success', {}) - }) - this.advertService.addEventListener('advert:error', (evt) => { - this.safeDispatchEvent('relay:advert:error', { detail: evt.detail }) - }) - } - - this.reservationStore = new ReservationStore(init.reservations) } isStarted (): boolean { @@ -154,9 +135,6 @@ class CircuitRelayServer extends TypedEventEmitter implements return } - // Advertise service if HOP enabled and advertising enabled - this.advertService?.start() - await this.registrar.handle(RELAY_V2_HOP_CODEC, (data) => { void this.onHop(data).catch(err => { this.log.error(err) @@ -176,7 +154,6 @@ class CircuitRelayServer extends TypedEventEmitter implements * Stop Relay service */ async stop (): Promise { - this.advertService?.stop() this.reservationStore.stop() this.shutdownController.abort() await this.registrar.unhandle(RELAY_V2_HOP_CODEC) @@ -359,6 +336,7 @@ class CircuitRelayServer extends TypedEventEmitter implements return } + const limit = this.reservationStore.get(dstPeer)?.limit const destinationConnection = connections[0] const destinationStream = await this.stopHop({ @@ -368,7 +346,8 @@ class CircuitRelayServer extends TypedEventEmitter implements peer: { id: connection.remotePeer.toBytes(), addrs: [] - } + }, + limit } }) @@ -378,11 +357,14 @@ class CircuitRelayServer extends TypedEventEmitter implements return } - await hopstr.write({ type: HopMessage.Type.STATUS, status: Status.OK }) + await hopstr.write({ + type: HopMessage.Type.STATUS, + status: Status.OK, + limit + }) const sourceStream = stream.unwrap() this.log('connection from %p to %p established - merging streams', connection.remotePeer, dstPeer) - const limit = this.reservationStore.get(dstPeer)?.limit // Short circuit the two streams to create the relayed connection createLimitedRelay(sourceStream, destinationStream, this.shutdownController.signal, limit, { log: this.log diff --git a/packages/transport-circuit-relay-v2/src/transport/discovery.ts b/packages/transport-circuit-relay-v2/src/transport/discovery.ts index 1958974e0b..71ac88eb08 100644 --- a/packages/transport-circuit-relay-v2/src/transport/discovery.ts +++ b/packages/transport-circuit-relay-v2/src/transport/discovery.ts @@ -1,24 +1,28 @@ -import { TypedEventEmitter } from '@libp2p/interface' +import { TypedEventEmitter, setMaxListeners } from '@libp2p/interface' +import { PeerQueue } from '@libp2p/utils/peer-queue' +import { anySignal } from 'any-signal' +import { raceSignal } from 'race-signal' import { - RELAY_RENDEZVOUS_NS, RELAY_V2_HOP_CODEC } from '../constants.js' -import { namespaceToCid } from '../utils.js' -import type { ComponentLogger, Logger, ContentRouting, PeerId, PeerStore, Startable } from '@libp2p/interface' -import type { ConnectionManager, Registrar, TransportManager } from '@libp2p/interface-internal' +import type { ComponentLogger, Logger, PeerId, PeerStore, Startable, TopologyFilter } from '@libp2p/interface' +import type { ConnectionManager, RandomWalk, Registrar, TransportManager } from '@libp2p/interface-internal' export interface RelayDiscoveryEvents { 'relay:discover': CustomEvent } export interface RelayDiscoveryComponents { - peerId: PeerId peerStore: PeerStore connectionManager: ConnectionManager transportManager: TransportManager - contentRouting: ContentRouting registrar: Registrar logger: ComponentLogger + randomWalk: RandomWalk +} + +export interface RelayDiscoveryInit { + filter?: TopologyFilter } /** @@ -26,23 +30,30 @@ export interface RelayDiscoveryComponents { * peers that support the circuit v2 HOP protocol. */ export class RelayDiscovery extends TypedEventEmitter implements Startable { - private readonly peerId: PeerId private readonly peerStore: PeerStore - private readonly contentRouting: ContentRouting private readonly registrar: Registrar + private readonly connectionManager: ConnectionManager + private readonly randomWalk: RandomWalk private started: boolean + private running: boolean private topologyId?: string private readonly log: Logger + private discoveryController: AbortController + private readonly filter?: TopologyFilter - constructor (components: RelayDiscoveryComponents) { + constructor (components: RelayDiscoveryComponents, init: RelayDiscoveryInit = {}) { super() this.log = components.logger.forComponent('libp2p:circuit-relay:discover-relays') this.started = false - this.peerId = components.peerId + this.running = false this.peerStore = components.peerStore - this.contentRouting = components.contentRouting this.registrar = components.registrar + this.connectionManager = components.connectionManager + this.randomWalk = components.randomWalk + this.filter = init.filter + this.discoveryController = new AbortController() + setMaxListeners(Infinity, this.discoveryController.signal) } isStarted (): boolean { @@ -53,8 +64,9 @@ export class RelayDiscovery extends TypedEventEmitter impl // register a topology listener for when new peers are encountered // that support the hop protocol this.topologyId = await this.registrar.register(RELAY_V2_HOP_CODEC, { - notifyOnTransient: true, + filter: this.filter, onConnect: (peerId) => { + this.log('discovered relay %p', peerId) this.safeDispatchEvent('relay:discover', { detail: peerId }) } }) @@ -62,18 +74,12 @@ export class RelayDiscovery extends TypedEventEmitter impl this.started = true } - afterStart (): void { - void this.discover() - .catch(err => { - this.log.error('error discovering relays', err) - }) - } - stop (): void { if (this.topologyId != null) { this.registrar.unregister(this.topologyId) } + this.discoveryController?.abort() this.started = false } @@ -81,54 +87,113 @@ export class RelayDiscovery extends TypedEventEmitter impl * Try to listen on available hop relay connections. * The following order will happen while we do not have enough relays: * - * 1. Check the metadata store for known relays, try to listen on the ones we are already connected + * 1. Check the metadata store for known relays, try to listen on the ones we are already connected to * 2. Dial and try to listen on the peers we know that support hop but are not connected * 3. Search the network */ - async discover (): Promise { - this.log('searching peer store for relays') - const peers = (await this.peerStore.all({ - filters: [ - // filter by a list of peers supporting RELAY_V2_HOP and ones we are not listening on - (peer) => { - return peer.protocols.includes(RELAY_V2_HOP_CODEC) - } - ], - orders: [ - () => Math.random() < 0.5 ? 1 : -1 - ] - })) - - for (const peer of peers) { - this.log('found relay peer %p in content peer store', peer.id) - this.safeDispatchEvent('relay:discover', { detail: peer.id }) + startDiscovery (): void { + if (this.running) { + return } - this.log('found %d relay peers in peer store', peers.length) + this.log('start discovery') + this.running = true + this.discoveryController = new AbortController() + setMaxListeners(Infinity, this.discoveryController.signal) + + Promise.resolve() + .then(async () => { + this.log('searching peer store for relays') + + const peers = (await this.peerStore.all({ + filters: [ + // filter by a list of peers supporting RELAY_V2_HOP and ones we are not listening on + (peer) => { + return peer.protocols.includes(RELAY_V2_HOP_CODEC) + } + ], + orders: [ + () => Math.random() < 0.5 ? 1 : -1 + ] + })) + + for (const peer of peers) { + this.log.trace('found relay peer %p in peer store', peer.id) + this.safeDispatchEvent('relay:discover', { detail: peer.id }) + } + + this.log('found %d relay peers in peer store', peers.length) + + // perform random walk and dial peers - after identify has run, the network + // topology will be notified of new relays + const queue = new PeerQueue({ + concurrency: 5 + }) + + this.log('start random walk') + for await (const peer of this.randomWalk.walk({ signal: this.discoveryController.signal })) { + this.log.trace('found random peer %p', peer.id) + + if (queue.has(peer.id)) { + this.log.trace('random peer %p was already in queue', peer.id) + + // skip peers already in the queue + continue + } - try { - this.log('searching content routing for relays') - const cid = await namespaceToCid(RELAY_RENDEZVOUS_NS) + if (this.connectionManager.getConnections(peer.id)?.length > 0) { + this.log.trace('random peer %p was already connected', peer.id) - let found = 0 + // skip peers we are already connected to + continue + } - for await (const provider of this.contentRouting.findProviders(cid)) { - if (provider.multiaddrs.length > 0 && !provider.id.equals(this.peerId)) { - const peerId = provider.id + if (!(await this.connectionManager.isDialable(peer.multiaddrs))) { + this.log.trace('random peer %p was not dialable', peer.id, peer.multiaddrs.map(ma => ma.toString())) - found++ - await this.peerStore.merge(peerId, { - multiaddrs: provider.multiaddrs + // skip peers we can't dial + continue + } + + this.log.trace('wait for space in queue for %p', peer.id) + + // pause the random walk until there is space in the queue + await raceSignal(queue.onSizeLessThan(10), this.discoveryController.signal) + + this.log('adding random peer %p to dial queue (length: %d)', peer.id, queue.size) + + // dial the peer - this will cause identify to run and our topology to + // be notified and we'll attempt to create reservations + queue.add(async () => { + const signal = anySignal([this.discoveryController.signal, AbortSignal.timeout(5000)]) + setMaxListeners(Infinity, signal) + + try { + await this.connectionManager.openConnection(peer.id, { signal }) + } finally { + signal.clear() + } + }, { + peerId: peer.id, + signal: this.discoveryController.signal }) + .catch(err => { + this.log.error('error opening connection to random peer %p', peer.id, err) + }) + } - this.log('found relay peer %p in content routing', peerId) - this.safeDispatchEvent('relay:discover', { detail: peerId }) + await queue.onIdle() + }) + .catch(err => { + if (!this.discoveryController.signal.aborted) { + this.log.error('failed when finding relays on the network', err) } - } + }) + } - this.log('found %d relay peers in content routing', found) - } catch (err: any) { - this.log.error('failed when finding relays on the network', err) - } + stopDiscovery (): void { + this.log('stop discovery') + this.running = false + this.discoveryController?.abort() } } diff --git a/packages/transport-circuit-relay-v2/src/transport/index.ts b/packages/transport-circuit-relay-v2/src/transport/index.ts index be25883e4b..77841f932f 100644 --- a/packages/transport-circuit-relay-v2/src/transport/index.ts +++ b/packages/transport-circuit-relay-v2/src/transport/index.ts @@ -1,20 +1,16 @@ -import { type Transport, type Upgrader, type Libp2pEvents, type ComponentLogger, type ConnectionGater, type ContentRouting, type TypedEventTarget, type PeerId, type PeerStore } from '@libp2p/interface' -import { type RelayDiscoveryComponents } from './discovery.js' -import { type RelayStoreInit } from './reservation-store.js' import { CircuitRelayTransport } from './transport.js' -import type { AddressManager, ConnectionManager, Registrar } from '@libp2p/interface-internal' +import type { RelayDiscoveryComponents } from './discovery.js' +import type { RelayStoreInit } from './reservation-store.js' +import type { Transport, Upgrader, Libp2pEvents, ConnectionGater, TypedEventTarget, PeerId, TopologyFilter } from '@libp2p/interface' +import type { AddressManager, Registrar } from '@libp2p/interface-internal' export interface CircuitRelayTransportComponents extends RelayDiscoveryComponents { peerId: PeerId - peerStore: PeerStore registrar: Registrar - connectionManager: ConnectionManager upgrader: Upgrader addressManager: AddressManager - contentRouting: ContentRouting connectionGater: ConnectionGater events: TypedEventTarget - logger: ComponentLogger } /** @@ -22,34 +18,48 @@ export interface CircuitRelayTransportComponents extends RelayDiscoveryComponent */ export interface CircuitRelayTransportInit extends RelayStoreInit { /** - * The number of peers running diable relays to search for and - * connect to. (default: 0) + * The number of peers running diable relays to search for and connect to + * + * @default 0 */ discoverRelays?: number + /** + * An optional filter used to prevent duplicate attempts to reserve relay + * slots on the same peer + */ + discoveryFilter?: TopologyFilter + /** * The maximum number of simultaneous STOP inbound streams that can be open at - * once - each inbound relayed connection uses a STOP stream (default: 300) + * once - each inbound relayed connection uses a STOP stream + * + * @default 300 */ maxInboundStopStreams?: number /** - * The maximum number of simultaneous STOP outbound streams that can be open at - * once. If this transport is used along with the relay server these settings - * should be set to the same value (default: 300) + * The maximum number of simultaneous STOP outbound streams that can be open + * at once. If this transport is used along with the relay server these + * settings should be set to the same value + * + * @default 300 */ maxOutboundStopStreams?: number /** - * Incoming STOP requests (e.g. when a remote peer wants to dial us via a relay) - * must finish the initial protocol negotiation within this timeout in ms - * (default: 30000) + * Incoming STOP requests (e.g. when a remote peer wants to dial us via a + * relay) must finish the initial protocol negotiation within this timeout in + * ms + * + * @default 30000 */ stopTimeout?: number /** * When creating a reservation it must complete within this number of ms - * (default: 10000) + * + * @default 10000 */ reservationCompletionTimeout?: number } diff --git a/packages/transport-circuit-relay-v2/src/transport/reservation-store.ts b/packages/transport-circuit-relay-v2/src/transport/reservation-store.ts index 4557c85301..280d6347b8 100644 --- a/packages/transport-circuit-relay-v2/src/transport/reservation-store.ts +++ b/packages/transport-circuit-relay-v2/src/transport/reservation-store.ts @@ -1,15 +1,17 @@ -import { TypedEventEmitter } from '@libp2p/interface' +import { TypedEventEmitter, setMaxListeners } from '@libp2p/interface' import { PeerMap } from '@libp2p/peer-collections' +import { createBloomFilter } from '@libp2p/utils/filters' import { PeerQueue } from '@libp2p/utils/peer-queue' import { multiaddr } from '@multiformats/multiaddr' import { pbStream } from 'it-protobuf-stream' import { equals as uint8ArrayEquals } from 'uint8arrays/equals' -import { DEFAULT_RESERVATION_CONCURRENCY, RELAY_TAG, RELAY_V2_HOP_CODEC } from '../constants.js' +import { DEFAULT_MAX_RESERVATION_QUEUE_LENGTH, DEFAULT_RESERVATION_COMPLETION_TIMEOUT, DEFAULT_RESERVATION_CONCURRENCY, RELAY_TAG, RELAY_V2_HOP_CODEC } from '../constants.js' import { HopMessage, Status } from '../pb/index.js' import { getExpirationMilliseconds } from '../utils.js' import type { Reservation } from '../pb/index.js' import type { TypedEventTarget, Libp2pEvents, AbortOptions, ComponentLogger, Logger, Connection, PeerId, PeerStore, Startable, Metrics } from '@libp2p/interface' import type { ConnectionManager, TransportManager } from '@libp2p/interface-internal' +import type { Filter } from '@libp2p/utils/filters' // allow refreshing a relay reservation if it will expire in the next 10 minutes const REFRESH_WINDOW = (60 * 1000) * 10 @@ -69,6 +71,7 @@ interface RelayEntry { export interface ReservationStoreEvents { 'relay:not-enough-relays': CustomEvent 'relay:removed': CustomEvent + 'relay:created-reservation': CustomEvent } export class ReservationStore extends TypedEventEmitter implements Startable { @@ -84,6 +87,7 @@ export class ReservationStore extends TypedEventEmitter private readonly reservationCompletionTimeout: number private started: boolean private readonly log: Logger + private readonly relayFilter: Filter constructor (components: RelayStoreComponents, init?: RelayStoreInit) { super() @@ -96,9 +100,10 @@ export class ReservationStore extends TypedEventEmitter this.events = components.events this.reservations = new PeerMap() this.maxDiscoveredRelays = init?.discoverRelays ?? 0 - this.maxReservationQueueLength = init?.maxReservationQueueLength ?? 100 - this.reservationCompletionTimeout = init?.reservationCompletionTimeout ?? 10000 + this.maxReservationQueueLength = init?.maxReservationQueueLength ?? DEFAULT_MAX_RESERVATION_QUEUE_LENGTH + this.reservationCompletionTimeout = init?.reservationCompletionTimeout ?? DEFAULT_RESERVATION_COMPLETION_TIMEOUT this.started = false + this.relayFilter = createBloomFilter(100) // ensure we don't listen on multiple relays simultaneously this.reserveQueue = new PeerQueue({ @@ -123,6 +128,13 @@ export class ReservationStore extends TypedEventEmitter this.started = true } + afterStart (): void { + if (this.reservations.size < this.maxDiscoveredRelays) { + this.log('not enough relays %d/%d', this.reservations.size, this.maxDiscoveredRelays) + this.safeDispatchEvent('relay:not-enough-relays', {}) + } + } + stop (): void { this.reserveQueue.clear() this.reservations.forEach(({ timeout }) => { @@ -134,9 +146,9 @@ export class ReservationStore extends TypedEventEmitter /** * If the number of current relays is beneath the configured `maxReservations` - * value, and the passed peer id is not our own, and we have a non-relayed connection - * to the remote, and the remote peer speaks the hop protocol, try to reserve a slot - * on the remote peer + * value, and the passed peer id is not our own, and we have a non-relayed + * connection to the remote, and the remote peer speaks the hop protocol, try + * to reserve a slot on the remote peer */ async addRelay (peerId: PeerId, type: RelayType): Promise { if (this.peerId.equals(peerId)) { @@ -145,18 +157,25 @@ export class ReservationStore extends TypedEventEmitter } if (this.reserveQueue.size > this.maxReservationQueueLength) { - this.log('not adding relay as the queue is full') + this.log('not adding potential relay peer %p as the queue is full', peerId) return } if (this.reserveQueue.has(peerId)) { - this.log('relay peer is already in the reservation queue') + this.log('potential relay peer %p is already in the reservation queue', peerId) + return + } + + if (this.relayFilter.has(peerId.toBytes())) { + this.log('potential relay peer %p has failed previously, not trying again', peerId, new Error('where').stack) return } - this.log('add relay %p', peerId) + this.log('try to reserve relay slot with %p', peerId) await this.reserveQueue.add(async () => { + const start = Date.now() + try { // allow refresh of an existing reservation if it is about to expire const existingReservation = this.reservations.get(peerId) @@ -183,6 +202,7 @@ export class ReservationStore extends TypedEventEmitter } const signal = AbortSignal.timeout(this.reservationCompletionTimeout) + setMaxListeners(Infinity, signal) const connection = await this.connectionManager.openConnection(peerId, { signal @@ -230,8 +250,12 @@ export class ReservationStore extends TypedEventEmitter // listen on multiaddr that only the circuit transport is listening for await this.transportManager.listen([multiaddr(`/p2p/${peerId.toString()}/p2p-circuit`)]) + + this.safeDispatchEvent('relay:created-reservation', { + detail: peerId + }) } catch (err) { - this.log.error('could not reserve slot on %p', peerId, err) + this.log.error('could not reserve slot on %p after %dms', peerId, Date.now() - start, err) // cancel the renewal timeout if it's been set const reservation = this.reservations.get(peerId) @@ -242,6 +266,9 @@ export class ReservationStore extends TypedEventEmitter // if listening failed, remove the reservation this.reservations.delete(peerId) + + // don't try this peer again + this.relayFilter.add(peerId.toBytes()) } }, { peerId @@ -256,6 +283,10 @@ export class ReservationStore extends TypedEventEmitter return this.reservations.get(peerId)?.reservation } + reservationCount (): number { + return this.reservations.size + } + async #createReservation (connection: Connection, options: AbortOptions): Promise { options.signal?.throwIfAborted() @@ -270,11 +301,12 @@ export class ReservationStore extends TypedEventEmitter try { response = await hopstr.read(options) } catch (err: any) { - this.log.error('error parsing reserve message response from %p because', connection.remotePeer, err) stream.abort(err) throw err } finally { - await stream.close() + if (stream.status !== 'closed') { + await stream.close(options) + } } if (response.status === Status.OK && (response.reservation != null)) { diff --git a/packages/transport-circuit-relay-v2/src/transport/transport.ts b/packages/transport-circuit-relay-v2/src/transport/transport.ts index cc1978276b..31e41b89f8 100644 --- a/packages/transport-circuit-relay-v2/src/transport/transport.ts +++ b/packages/transport-circuit-relay-v2/src/transport/transport.ts @@ -1,11 +1,12 @@ -import { CodeError } from '@libp2p/interface' +import { CodeError, start, stop } from '@libp2p/interface' import { transportSymbol, type Transport, type CreateListenerOptions, type Listener, type Upgrader, type AbortOptions, type ComponentLogger, type Logger, type Connection, type Stream, type ConnectionGater, type PeerId, type PeerStore } from '@libp2p/interface' +import { peerFilter } from '@libp2p/peer-collections' import { peerIdFromBytes, peerIdFromString } from '@libp2p/peer-id' import { streamToMaConnection } from '@libp2p/utils/stream-to-ma-conn' import * as mafmt from '@multiformats/mafmt' import { multiaddr } from '@multiformats/multiaddr' import { pbStream } from 'it-protobuf-stream' -import { CIRCUIT_PROTO_CODE, ERR_HOP_REQUEST_FAILED, ERR_RELAYED_DIAL, MAX_CONNECTIONS, RELAY_V2_HOP_CODEC, RELAY_V2_STOP_CODEC } from '../constants.js' +import { CIRCUIT_PROTO_CODE, DEFAULT_DISCOVERY_FILTER_ERROR_RATE, DEFAULT_DISCOVERY_FILTER_SIZE, ERR_HOP_REQUEST_FAILED, ERR_RELAYED_DIAL, MAX_CONNECTIONS, RELAY_V2_HOP_CODEC, RELAY_V2_STOP_CODEC } from '../constants.js' import { StopMessage, HopMessage, Status } from '../pb/index.js' import { RelayDiscovery } from './discovery.js' import { createListener } from './listener.js' @@ -77,8 +78,12 @@ export class CircuitRelayTransport implements Transport { this.maxOutboundStopStreams = init.maxOutboundStopStreams ?? defaults.maxOutboundStopStreams this.stopTimeout = init.stopTimeout ?? defaults.stopTimeout - if (init.discoverRelays != null && init.discoverRelays > 0) { - this.discovery = new RelayDiscovery(components) + const discoverRelays = init.discoverRelays ?? 0 + + if (discoverRelays > 0) { + this.discovery = new RelayDiscovery(components, { + filter: init.discoveryFilter ?? peerFilter(DEFAULT_DISCOVERY_FILTER_SIZE, DEFAULT_DISCOVERY_FILTER_ERROR_RATE) + }) this.discovery.addEventListener('relay:discover', (evt) => { this.reservationStore.addRelay(evt.detail, 'discovered') .catch(err => { @@ -89,10 +94,12 @@ export class CircuitRelayTransport implements Transport { this.reservationStore = new ReservationStore(components, init) this.reservationStore.addEventListener('relay:not-enough-relays', () => { - this.discovery?.discover() - .catch(err => { - this.log.error('could not discover relays', err) - }) + this.discovery?.startDiscovery() + }) + this.reservationStore.addEventListener('relay:created-reservation', () => { + if (this.reservationStore.reservationCount() >= discoverRelays) { + this.discovery?.stopDiscovery() + } }) this.started = false @@ -103,8 +110,6 @@ export class CircuitRelayTransport implements Transport { } async start (): Promise { - this.reservationStore.start() - await this.registrar.handle(RELAY_V2_STOP_CODEC, (data) => { void this.onStop(data).catch(err => { this.log.error('error while handling STOP protocol', err) @@ -116,18 +121,13 @@ export class CircuitRelayTransport implements Transport { runOnTransientConnection: true }) - await this.discovery?.start() + await start(this.discovery, this.reservationStore) this.started = true } - afterStart (): void { - this.discovery?.afterStart() - } - async stop (): Promise { - this.discovery?.stop() - this.reservationStore.stop() + await stop(this.discovery, this.reservationStore) await this.registrar.unhandle(RELAY_V2_STOP_CODEC) this.started = false @@ -231,9 +231,9 @@ export class CircuitRelayTransport implements Transport { logger: this.logger }) - this.log('new outbound transient connection %a', maConn.remoteAddr) + this.log('new outbound relayed connection %a', maConn.remoteAddr) return await this.upgrader.upgradeOutbound(maConn, { - transient: true + transient: status.limit != null }) } catch (err: any) { this.log.error(`Circuit relay dial to destination ${destinationPeer.toString()} via relay ${connection.remotePeer.toString()} failed`, err) @@ -346,9 +346,9 @@ export class CircuitRelayTransport implements Transport { logger: this.logger }) - this.log('new inbound transient connection %a', maConn.remoteAddr) + this.log('new inbound relayed connection %a', maConn.remoteAddr) await this.upgrader.upgradeInbound(maConn, { - transient: true + transient: request.limit != null }) this.log('%s connection %a upgraded', 'inbound', maConn.remoteAddr) } diff --git a/packages/transport-circuit-relay-v2/test/hop.spec.ts b/packages/transport-circuit-relay-v2/test/hop.spec.ts index 39a004780c..97c10a913e 100644 --- a/packages/transport-circuit-relay-v2/test/hop.spec.ts +++ b/packages/transport-circuit-relay-v2/test/hop.spec.ts @@ -1,7 +1,6 @@ /* eslint-disable max-nested-callbacks */ -import { TypedEventEmitter, type TypedEventTarget, type ComponentLogger, type Libp2pEvents, type Connection, type Stream, type ConnectionGater, type ContentRouting, type PeerId, type PeerStore, type Transport, type Upgrader } from '@libp2p/interface' -import { isStartable } from '@libp2p/interface' +import { TypedEventEmitter, isStartable } from '@libp2p/interface' import { matchPeerId } from '@libp2p/interface-compliance-tests/matchers' import { mockRegistrar, mockUpgrader, mockNetwork, mockConnectionManager, mockConnectionGater } from '@libp2p/interface-compliance-tests/mocks' import { defaultLogger } from '@libp2p/logger' @@ -16,7 +15,8 @@ import { DEFAULT_MAX_RESERVATION_STORE_SIZE, RELAY_SOURCE_TAG, RELAY_V2_HOP_CODE import { circuitRelayServer, type CircuitRelayService, circuitRelayTransport } from '../src/index.js' import { HopMessage, Status } from '../src/pb/index.js' import type { CircuitRelayServerInit } from '../src/server/index.js' -import type { AddressManager, ConnectionManager, Registrar, TransportManager } from '@libp2p/interface-internal' +import type { TypedEventTarget, ComponentLogger, Libp2pEvents, Connection, Stream, ConnectionGater, PeerId, PeerStore, Upgrader, Transport } from '@libp2p/interface' +import type { RandomWalk, AddressManager, ConnectionManager, Registrar, TransportManager } from '@libp2p/interface-internal' interface Node { peerId: PeerId @@ -82,7 +82,6 @@ describe('circuit-relay hop protocol', function () { const service = circuitRelayServer(circuitRelayInit)({ addressManager, - contentRouting: stubInterface(), connectionManager, peerId, peerStore, @@ -98,9 +97,9 @@ describe('circuit-relay hop protocol', function () { const transport = circuitRelayTransport({})({ addressManager, connectionManager, - contentRouting: stubInterface(), peerId, peerStore, + randomWalk: stubInterface(), registrar, transportManager: stubInterface(), upgrader, diff --git a/packages/transport-circuit-relay-v2/test/stop.spec.ts b/packages/transport-circuit-relay-v2/test/stop.spec.ts index 57a9926754..7590c030dc 100644 --- a/packages/transport-circuit-relay-v2/test/stop.spec.ts +++ b/packages/transport-circuit-relay-v2/test/stop.spec.ts @@ -1,7 +1,6 @@ /* eslint-env mocha */ -import { TypedEventEmitter, type TypedEventTarget, type ComponentLogger, type Libp2pEvents, type Connection, type Stream, type ConnectionGater, type ContentRouting, type PeerId, type PeerStore, type Upgrader } from '@libp2p/interface' -import { isStartable } from '@libp2p/interface' +import { TypedEventEmitter, isStartable } from '@libp2p/interface' import { mockStream } from '@libp2p/interface-compliance-tests/mocks' import { defaultLogger } from '@libp2p/logger' import { createEd25519PeerId } from '@libp2p/peer-id-factory' @@ -14,7 +13,8 @@ import Sinon from 'sinon' import { stubInterface, type StubbedInstance } from 'sinon-ts' import { Status, StopMessage } from '../src/pb/index.js' import { CircuitRelayTransport } from '../src/transport/transport.js' -import type { AddressManager, ConnectionManager, Registrar, StreamHandler, TransportManager } from '@libp2p/interface-internal' +import type { TypedEventTarget, ComponentLogger, Libp2pEvents, Connection, Stream, ConnectionGater, PeerId, PeerStore, Upgrader } from '@libp2p/interface' +import type { AddressManager, ConnectionManager, RandomWalk, Registrar, StreamHandler, TransportManager } from '@libp2p/interface-internal' interface StubbedCircuitRelayTransportComponents { peerId: PeerId @@ -24,7 +24,7 @@ interface StubbedCircuitRelayTransportComponents { transportManager: StubbedInstance upgrader: StubbedInstance addressManager: StubbedInstance - contentRouting: StubbedInstance + randomWalk: StubbedInstance connectionGater: StubbedInstance events: TypedEventTarget logger: ComponentLogger @@ -44,9 +44,9 @@ describe('circuit-relay stop protocol', function () { components = { addressManager: stubInterface(), connectionManager: stubInterface(), - contentRouting: stubInterface(), peerId: await createEd25519PeerId(), peerStore: stubInterface(), + randomWalk: stubInterface(), registrar: stubInterface(), transportManager: stubInterface(), upgrader: stubInterface(), diff --git a/packages/transport-tcp/package.json b/packages/transport-tcp/package.json index ab720914cf..84583bcf0d 100644 --- a/packages/transport-tcp/package.json +++ b/packages/transport-tcp/package.json @@ -66,7 +66,7 @@ "devDependencies": { "@libp2p/interface-compliance-tests": "^5.4.5", "@libp2p/logger": "^4.0.13", - "aegir": "^42.2.11", + "aegir": "^43.0.1", "it-all": "^3.0.6", "it-pipe": "^3.0.1", "p-defer": "^4.0.1", diff --git a/packages/transport-webrtc/.aegir.js b/packages/transport-webrtc/.aegir.js index 02e55efc8a..e95fcc9e5e 100644 --- a/packages/transport-webrtc/.aegir.js +++ b/packages/transport-webrtc/.aegir.js @@ -1,4 +1,3 @@ - /** @type {import('aegir').PartialOptions} */ export default { build: { diff --git a/packages/transport-webrtc/README.md b/packages/transport-webrtc/README.md index 24c7f89ac3..ceb4b492c8 100644 --- a/packages/transport-webrtc/README.md +++ b/packages/transport-webrtc/README.md @@ -52,7 +52,7 @@ At the time of writing, WebRTC Direct is dial-only in browsers and not supported Support in Node.js is possible but PRs will need to be opened to [libdatachannel](https://github.com/paullouisageneau/libdatachannel) and the appropriate APIs exposed in [node-datachannel](https://github.com/murat-dogan/node-datachannel). -For both WebRTC and WebRTC Direct, support is arriving soon in go-libp2p but they are unsupported in rust-libp2p. +WebRTC Direct support is available in rust-libp2p and arriving soon in go-libp2p. See the WebRTC section of for more information. diff --git a/packages/transport-webrtc/package.json b/packages/transport-webrtc/package.json index 0ad02ab631..74e0b2bafd 100644 --- a/packages/transport-webrtc/package.json +++ b/packages/transport-webrtc/package.json @@ -40,7 +40,7 @@ "generate": "protons src/private-to-private/pb/message.proto src/pb/message.proto", "build": "aegir build", "test": "aegir test -t node -t browser", - "test:node": "aegir test -t node --cov", + "test:node": "aegir test -t node --cov -- --exit", "test:chrome": "aegir test -t browser --cov", "test:firefox": "aegir test -t browser -- --browser firefox", "lint": "aegir lint", @@ -83,7 +83,7 @@ "@libp2p/peer-id-factory": "^4.1.2", "@libp2p/websockets": "^8.0.24", "@types/sinon": "^17.0.3", - "aegir": "^42.2.11", + "aegir": "^43.0.1", "delay": "^6.0.0", "it-drain": "^3.0.7", "it-length": "^3.0.6", diff --git a/packages/transport-webrtc/src/index.ts b/packages/transport-webrtc/src/index.ts index 0e1aba251c..ffda128049 100644 --- a/packages/transport-webrtc/src/index.ts +++ b/packages/transport-webrtc/src/index.ts @@ -29,7 +29,7 @@ * * Support in Node.js is possible but PRs will need to be opened to [libdatachannel](https://github.com/paullouisageneau/libdatachannel) and the appropriate APIs exposed in [node-datachannel](https://github.com/murat-dogan/node-datachannel). * - * For both WebRTC and WebRTC Direct, support is arriving soon in go-libp2p but they are unsupported in rust-libp2p. + * WebRTC Direct support is available in rust-libp2p and arriving soon in go-libp2p. * * See the WebRTC section of https://connectivity.libp2p.io for more information. * diff --git a/packages/transport-webrtc/test/transport.spec.ts b/packages/transport-webrtc/test/transport.spec.ts index 790e0c3709..f28ac2cc58 100644 --- a/packages/transport-webrtc/test/transport.spec.ts +++ b/packages/transport-webrtc/test/transport.spec.ts @@ -70,7 +70,7 @@ describe('WebRTCDirect Transport', () => { ] const invalid = [ multiaddr('/ip4/1.2.3.4/udp/1234/webrtc/certhash/uEiAUqV7kzvM1wI5DYDc1RbcekYVmXli_Qprlw3IkiEg6tQ'), - multiaddr('/ip4/1.2.3.4/udp/1234/webrtc-direct/p2p/12D3KooWGDMwwqrpcYKpKCgxuKT2NfqPqa94QnkoBBpqvCaiCzWd'), + multiaddr('/ip4/1.2.3.4/tcp/1234/webrtc-direct/p2p/12D3KooWGDMwwqrpcYKpKCgxuKT2NfqPqa94QnkoBBpqvCaiCzWd'), multiaddr('/ip4/1.2.3.4/udp/1234/certhash/uEiAUqV7kzvM1wI5DYDc1RbcekYVmXli_Qprlw3IkiEg6tQ/p2p/12D3KooWGDMwwqrpcYKpKCgxuKT2NfqPqa94QnkoBBpqvCaiCzWd') ] diff --git a/packages/transport-websockets/package.json b/packages/transport-websockets/package.json index 6d6c918237..8acaa4c873 100644 --- a/packages/transport-websockets/package.json +++ b/packages/transport-websockets/package.json @@ -88,7 +88,7 @@ "devDependencies": { "@libp2p/interface-compliance-tests": "^5.4.5", "@libp2p/logger": "^4.0.13", - "aegir": "^42.2.11", + "aegir": "^43.0.1", "is-loopback-addr": "^2.0.2", "it-all": "^3.0.6", "it-drain": "^3.0.7", diff --git a/packages/transport-webtransport/.aegir.js b/packages/transport-webtransport/.aegir.js index 6a6b15a72c..4fa1f3a589 100644 --- a/packages/transport-webtransport/.aegir.js +++ b/packages/transport-webtransport/.aegir.js @@ -1,5 +1,3 @@ -import { createClient } from '@libp2p/daemon-client' -import { multiaddr } from '@multiformats/multiaddr' import { execa } from 'execa' import { path as p2pd } from 'go-libp2p' import pDefer from 'p-defer' @@ -27,6 +25,10 @@ export default { } async function createGoLibp2p () { + // dynamic import is necessary because these modules have dependencies on + // modules in this monorepo which may not have been built yet + const { multiaddr } = await import('@multiformats/multiaddr') + const { createClient } = await import('@libp2p/daemon-client') const controlPort = Math.floor(Math.random() * (50000 - 10000 + 1)) + 10000 const apiAddr = multiaddr(`/ip4/127.0.0.1/tcp/${controlPort}`) const deferred = pDefer() diff --git a/packages/transport-webtransport/package.json b/packages/transport-webtransport/package.json index 7a833df242..950d05f77c 100644 --- a/packages/transport-webtransport/package.json +++ b/packages/transport-webtransport/package.json @@ -68,7 +68,7 @@ "@libp2p/peer-id-factory": "^4.1.2", "@libp2p/ping": "^1.0.19", "@noble/hashes": "^1.4.0", - "aegir": "^42.2.11", + "aegir": "^43.0.1", "execa": "^9.1.0", "go-libp2p": "^1.2.0", "it-map": "^3.1.0", diff --git a/packages/transport-webtransport/src/muxer.ts b/packages/transport-webtransport/src/muxer.ts index fa5c1e0492..a20eb0bff8 100644 --- a/packages/transport-webtransport/src/muxer.ts +++ b/packages/transport-webtransport/src/muxer.ts @@ -23,38 +23,42 @@ export function webtransportMuxer (wt: Pick { - //! TODO unclear how to add backpressure here? - while (true) { - const { done, value: wtStream } = await reader.read() + Promise.resolve() + .then(async () => { + //! TODO unclear how to add backpressure here? + while (true) { + const { done, value: wtStream } = await reader.read() - if (done) { - break - } + if (done) { + break + } - if (activeStreams.length >= config.maxInboundStreams) { - log(`too many inbound streams open - ${activeStreams.length}/${config.maxInboundStreams}, closing new incoming stream`) - // We've reached our limit, close this stream. - wtStream.writable.close().catch((err: Error) => { - log.error(`failed to close inbound stream that crossed our maxInboundStream limit: ${err.message}`) - }) - wtStream.readable.cancel().catch((err: Error) => { - log.error(`failed to close inbound stream that crossed our maxInboundStream limit: ${err.message}`) - }) - } else { - const stream = await webtransportBiDiStreamToStream( - wtStream, - String(streamIDCounter++), - 'inbound', - activeStreams, - init?.onStreamEnd, - logger - ) - activeStreams.push(stream) - init?.onIncomingStream?.(stream) + if (activeStreams.length >= config.maxInboundStreams) { + log(`too many inbound streams open - ${activeStreams.length}/${config.maxInboundStreams}, closing new incoming stream`) + // We've reached our limit, close this stream. + wtStream.writable.close().catch((err: Error) => { + log.error(`failed to close inbound stream that crossed our maxInboundStream limit: ${err.message}`) + }) + wtStream.readable.cancel().catch((err: Error) => { + log.error(`failed to close inbound stream that crossed our maxInboundStream limit: ${err.message}`) + }) + } else { + const stream = await webtransportBiDiStreamToStream( + wtStream, + String(streamIDCounter++), + 'inbound', + activeStreams, + init?.onStreamEnd, + logger + ) + activeStreams.push(stream) + init?.onIncomingStream?.(stream) + } } - } - }) + }) + .catch(err => { + log.error('could not create a new stream', err) + }) const muxer: StreamMuxer = { protocol: 'webtransport', @@ -74,7 +78,12 @@ export function webtransportMuxer (wt: Pick { log('closing webtransport muxer gracefully') - wt.close() + + try { + wt.close() + } catch (err: any) { + muxer.abort(err) + } }, /** @@ -82,7 +91,12 @@ export function webtransportMuxer (wt: Pick { log('closing webtransport muxer with err:', err) - wt.close() + + try { + wt.close() + } catch (err: any) { + log.error('webtransport session threw error during close', err) + } }, // This stream muxer is webtransport native. Therefore it doesn't plug in with any other duplex. diff --git a/packages/upnp-nat/package.json b/packages/upnp-nat/package.json index 0f4cf95fec..918c7b40ee 100644 --- a/packages/upnp-nat/package.json +++ b/packages/upnp-nat/package.json @@ -60,7 +60,7 @@ "devDependencies": { "@libp2p/logger": "^4.0.13", "@libp2p/peer-id-factory": "^4.1.2", - "aegir": "^42.2.11", + "aegir": "^43.0.1", "sinon-ts": "^2.0.0" }, "sideEffects": false diff --git a/packages/upnp-nat/src/index.ts b/packages/upnp-nat/src/index.ts index 7cbc1d1efa..f22f1f200b 100644 --- a/packages/upnp-nat/src/index.ts +++ b/packages/upnp-nat/src/index.ts @@ -35,10 +35,12 @@ * ``` */ -import { UPnPNAT } from './upnp-nat.js' +import { UPnPNAT as UPnPNATClass, type NatAPI, type MapPortOptions } from './upnp-nat.js' import type { ComponentLogger, NodeInfo, PeerId } from '@libp2p/interface' import type { AddressManager, TransportManager } from '@libp2p/interface-internal' +export type { NatAPI, MapPortOptions } + export interface PMPOptions { /** * Whether to enable PMP as well as UPnP @@ -86,8 +88,12 @@ export interface UPnPNATComponents { addressManager: AddressManager } -export function uPnPNAT (init: UPnPNATInit = {}): (components: UPnPNATComponents) => unknown { +export interface UPnPNAT { + client: NatAPI +} + +export function uPnPNAT (init: UPnPNATInit = {}): (components: UPnPNATComponents) => UPnPNAT { return (components: UPnPNATComponents) => { - return new UPnPNAT(components, init) + return new UPnPNATClass(components, init) } } diff --git a/packages/upnp-nat/src/upnp-nat.ts b/packages/upnp-nat/src/upnp-nat.ts index 03bcffce3d..1fd56a3896 100644 --- a/packages/upnp-nat/src/upnp-nat.ts +++ b/packages/upnp-nat/src/upnp-nat.ts @@ -1,19 +1,22 @@ -import { upnpNat, type NatAPI } from '@achingbrain/nat-port-mapper' +import { upnpNat, type NatAPI, type MapPortOptions } from '@achingbrain/nat-port-mapper' import { CodeError, ERR_INVALID_PARAMETERS } from '@libp2p/interface' import { isLoopback } from '@libp2p/utils/multiaddr/is-loopback' import { isPrivateIp } from '@libp2p/utils/private-ip' import { fromNodeAddress } from '@multiformats/multiaddr' import { isBrowser } from 'wherearewe' -import type { UPnPNATComponents, UPnPNATInit } from './index.js' +import type { UPnPNATComponents, UPnPNATInit, UPnPNAT as UPnPNATInterface } from './index.js' import type { Logger, Startable } from '@libp2p/interface' const DEFAULT_TTL = 7200 +export type { NatAPI, MapPortOptions } + function highPort (min = 1024, max = 65535): number { return Math.floor(Math.random() * (max - min + 1) + min) } -export class UPnPNAT implements Startable { +export class UPnPNAT implements Startable, UPnPNATInterface { + public client: NatAPI private readonly components: UPnPNATComponents private readonly externalAddress?: string private readonly localAddress?: string @@ -22,7 +25,6 @@ export class UPnPNAT implements Startable { private readonly keepAlive: boolean private readonly gateway?: string private started: boolean - private client?: NatAPI private readonly log: Logger constructor (components: UPnPNATComponents, init: UPnPNATInit) { @@ -40,6 +42,13 @@ export class UPnPNAT implements Startable { if (this.ttl < DEFAULT_TTL) { throw new CodeError(`NatManager ttl should be at least ${DEFAULT_TTL} seconds`, ERR_INVALID_PARAMETERS) } + + this.client = upnpNat({ + description: this.description, + ttl: this.ttl, + keepAlive: this.keepAlive, + gateway: this.gateway + }) } isStarted (): boolean { @@ -93,8 +102,7 @@ export class UPnPNAT implements Startable { continue } - const client = this._getClient() - const publicIp = this.externalAddress ?? await client.externalIp() + const publicIp = this.externalAddress ?? await this.client.externalIp() const isPrivate = isPrivateIp(publicIp) if (isPrivate === true) { @@ -109,7 +117,7 @@ export class UPnPNAT implements Startable { this.log(`opening uPnP connection from ${publicIp}:${publicPort} to ${host}:${port}`) - await client.map({ + await this.client.map({ publicPort, localPort: port, localAddress: this.localAddress, @@ -124,21 +132,6 @@ export class UPnPNAT implements Startable { } } - _getClient (): NatAPI { - if (this.client != null) { - return this.client - } - - this.client = upnpNat({ - description: this.description, - ttl: this.ttl, - keepAlive: this.keepAlive, - gateway: this.gateway - }) - - return this.client - } - /** * Stops the NAT manager */ @@ -149,7 +142,6 @@ export class UPnPNAT implements Startable { try { await this.client.close() - this.client = undefined } catch (err: any) { this.log.error(err) } diff --git a/packages/upnp-nat/test/index.spec.ts b/packages/upnp-nat/test/index.spec.ts index 53382be4fb..43ad25badc 100644 --- a/packages/upnp-nat/test/index.spec.ts +++ b/packages/upnp-nat/test/index.spec.ts @@ -39,9 +39,7 @@ describe('UPnP NAT (TCP)', () => { client = stubInterface() - natManager._getClient = () => { - return client - } + natManager.client = client teardown.push(async () => { await stop(natManager) diff --git a/packages/utils/package.json b/packages/utils/package.json index 9c12eda5b9..8229467cb8 100644 --- a/packages/utils/package.json +++ b/packages/utils/package.json @@ -176,7 +176,7 @@ "devDependencies": { "@libp2p/peer-id-factory": "^4.1.2", "@types/netmask": "^2.0.5", - "aegir": "^42.2.11", + "aegir": "^43.0.1", "delay": "^6.0.0", "it-all": "^3.0.6", "it-drain": "^3.0.7", diff --git a/packages/utils/src/queue/index.ts b/packages/utils/src/queue/index.ts index e843d69511..7059e84e50 100644 --- a/packages/utils/src/queue/index.ts +++ b/packages/utils/src/queue/index.ts @@ -210,8 +210,11 @@ export class Queue(fn, options) + this.enqueue(job) + this.safeDispatchEvent('add') + this.tryToStartAnother() - const p = job.join(options) + return job.join(options) .then(result => { this.safeDispatchEvent('completed', { detail: result }) this.safeDispatchEvent('success', { detail: { job, result } }) @@ -234,12 +237,6 @@ export class Queue