From 45fcaa10a6a3215089340ff2eff117d7fd1100e7 Mon Sep 17 00:00:00 2001 From: Alex Potsides Date: Fri, 11 Feb 2022 18:50:23 +0200 Subject: [PATCH] fix: simpler topologies (#164) A topology is a topology, let the registrar worry if it's handling multiple protocols. Also, make the registrar do the work of calling topology methods instead of each topology maintaining its own set of event listeners. --- .../package.json | 35 +---- .../connection-gater.ts} | 0 .../connection-manager.ts} | 0 .../connection.ts} | 43 +++++- .../src/mocks/index.ts | 9 ++ .../multiaddr-connection.ts} | 6 +- .../{utils/mock-muxer.ts => mocks/muxer.ts} | 0 .../src/mocks/registrar.ts | 71 +++++++++ .../mock-upgrader.ts => mocks/upgrader.ts} | 4 +- .../src/topology/multicodec-topology.ts | 136 ----------------- .../src/topology/topology.ts | 15 +- .../src/utils/mock-registrar.ts | 35 ----- packages/libp2p-interfaces/src/dht/index.ts | 8 +- packages/libp2p-interfaces/src/index.ts | 7 +- .../libp2p-interfaces/src/registrar/index.ts | 9 +- .../libp2p-interfaces/src/topology/index.ts | 28 ++-- packages/libp2p-peer-id/src/index.ts | 14 ++ packages/libp2p-peer-record/package.json | 3 +- .../libp2p-peer-record/src/envelope/index.ts | 5 +- .../src/peer-record/index.ts | 5 +- packages/libp2p-peer-store/package.json | 7 +- .../libp2p-peer-store/src/address-book.ts | 11 +- packages/libp2p-peer-store/src/key-book.ts | 15 +- .../libp2p-peer-store/src/metadata-book.ts | 15 +- packages/libp2p-peer-store/src/proto-book.ts | 11 +- packages/libp2p-peer-store/src/store.ts | 5 +- .../test/address-book.spec.ts | 7 +- .../libp2p-peer-store/test/key-book.spec.ts | 6 - .../libp2p-peer-store/test/peer-store.spec.ts | 6 +- packages/libp2p-peer-store/tsconfig.json | 9 ++ packages/libp2p-pubsub/package.json | 3 +- packages/libp2p-pubsub/src/index.ts | 31 ++-- packages/libp2p-pubsub/src/message/sign.ts | 5 +- packages/libp2p-pubsub/test/lifecycle.spec.ts | 24 +-- packages/libp2p-pubsub/test/pubsub.spec.ts | 34 +++-- packages/libp2p-pubsub/test/utils/index.ts | 72 +++++++-- packages/libp2p-topology/src/index.ts | 18 +-- .../src/multicodec-topology.ts | 140 ------------------ 38 files changed, 325 insertions(+), 527 deletions(-) rename packages/libp2p-interface-compliance-tests/src/{utils/mock-connection-gater.ts => mocks/connection-gater.ts} (100%) rename packages/libp2p-interface-compliance-tests/src/{utils/mock-connection-manager.ts => mocks/connection-manager.ts} (100%) rename packages/libp2p-interface-compliance-tests/src/{utils/mock-connection.ts => mocks/connection.ts} (62%) create mode 100644 packages/libp2p-interface-compliance-tests/src/mocks/index.ts rename packages/libp2p-interface-compliance-tests/src/{utils/mock-multiaddr-connection.ts => mocks/multiaddr-connection.ts} (74%) rename packages/libp2p-interface-compliance-tests/src/{utils/mock-muxer.ts => mocks/muxer.ts} (100%) create mode 100644 packages/libp2p-interface-compliance-tests/src/mocks/registrar.ts rename packages/libp2p-interface-compliance-tests/src/{utils/mock-upgrader.ts => mocks/upgrader.ts} (91%) delete mode 100644 packages/libp2p-interface-compliance-tests/src/topology/multicodec-topology.ts delete mode 100644 packages/libp2p-interface-compliance-tests/src/utils/mock-registrar.ts delete mode 100644 packages/libp2p-topology/src/multicodec-topology.ts diff --git a/packages/libp2p-interface-compliance-tests/package.json b/packages/libp2p-interface-compliance-tests/package.json index 8342bb1fa..79b497e52 100644 --- a/packages/libp2p-interface-compliance-tests/package.json +++ b/packages/libp2p-interface-compliance-tests/package.json @@ -88,37 +88,9 @@ "import": "./dist/src/transport/utils/index.js", "types": "./dist/src/transport/utils/index.d.ts" }, - "./utils/mock-connection": { - "import": "./dist/src/utils/mock-connection.js", - "types": "./dist/src/utils/mock-connection.d.ts" - }, - "./utils/mock-connection-gater": { - "import": "./dist/src/utils/mock-connection-gater.js", - "types": "./dist/src/utils/mock-connection-gater.d.ts" - }, - "./utils/mock-connection-manager": { - "import": "./dist/src/utils/mock-connection-manager.js", - "types": "./dist/src/utils/mock-connection-manager.d.ts" - }, - "./utils/mock-multiaddr-connection": { - "import": "./dist/src/utils/mock-multiaddr-connection.js", - "types": "./dist/src/utils/mock-multiaddr-connection.d.ts" - }, - "./utils/mock-muxer": { - "import": "./dist/src/utils/mock-muxer.js", - "types": "./dist/src/utils/mock-muxer.d.ts" - }, - "./utils/mock-peer-store": { - "import": "./dist/src/utils/mock-peer-store.js", - "types": "./dist/src/utils/mock-peer-store.d.ts" - }, - "./utils/mock-registrar": { - "import": "./dist/src/utils/mock-registrar.js", - "types": "./dist/src/utils/mock-registrar.d.ts" - }, - "./utils/mock-upgrader": { - "import": "./dist/src/utils/mock-upgrader.js", - "types": "./dist/src/utils/mock-upgrader.d.ts" + "./mocks": { + "import": "./dist/src/mocks/index.js", + "types": "./dist/src/mocks/index.d.ts" }, "./utils/peers": { "import": "./dist/src/utils/peers.js", @@ -242,6 +214,7 @@ "it-pair": "^2.0.0", "it-pipe": "^2.0.2", "it-pushable": "^2.0.1", + "it-stream-types": "^1.0.4", "multiformats": "^9.4.10", "p-defer": "^4.0.0", "p-limit": "^4.0.0", diff --git a/packages/libp2p-interface-compliance-tests/src/utils/mock-connection-gater.ts b/packages/libp2p-interface-compliance-tests/src/mocks/connection-gater.ts similarity index 100% rename from packages/libp2p-interface-compliance-tests/src/utils/mock-connection-gater.ts rename to packages/libp2p-interface-compliance-tests/src/mocks/connection-gater.ts diff --git a/packages/libp2p-interface-compliance-tests/src/utils/mock-connection-manager.ts b/packages/libp2p-interface-compliance-tests/src/mocks/connection-manager.ts similarity index 100% rename from packages/libp2p-interface-compliance-tests/src/utils/mock-connection-manager.ts rename to packages/libp2p-interface-compliance-tests/src/mocks/connection-manager.ts diff --git a/packages/libp2p-interface-compliance-tests/src/utils/mock-connection.ts b/packages/libp2p-interface-compliance-tests/src/mocks/connection.ts similarity index 62% rename from packages/libp2p-interface-compliance-tests/src/utils/mock-connection.ts rename to packages/libp2p-interface-compliance-tests/src/mocks/connection.ts index ad70f5951..53b7604d3 100644 --- a/packages/libp2p-interface-compliance-tests/src/utils/mock-connection.ts +++ b/packages/libp2p-interface-compliance-tests/src/mocks/connection.ts @@ -1,20 +1,21 @@ import { PeerId } from '@libp2p/peer-id' import { createEd25519PeerId } from '@libp2p/peer-id-factory' import { pipe } from 'it-pipe' +import { duplexPair } from 'it-pair/duplex' import type { MultiaddrConnection } from '@libp2p/interfaces/transport' import type { Connection, Stream, Metadata, ProtocolStream } from '@libp2p/interfaces/connection' import type { Muxer } from '@libp2p/interfaces/stream-muxer' +import type { Duplex } from 'it-stream-types' +import { mockMuxer } from './muxer.js' -export async function mockConnection (maConn: MultiaddrConnection, direction: 'inbound' | 'outbound', muxer: Muxer): Promise { +export async function mockConnection (maConn: MultiaddrConnection, direction: 'inbound' | 'outbound' = 'inbound', muxer: Muxer = mockMuxer()): Promise { const remoteAddr = maConn.remoteAddr const remotePeerIdStr = remoteAddr.getPeerId() const remotePeer = remotePeerIdStr != null ? PeerId.fromString(remotePeerIdStr) : await createEd25519PeerId() - + const registry = new Map() const streams: Stream[] = [] let streamId = 0 - const registry = new Map() - void pipe( maConn, muxer, maConn ) @@ -64,3 +65,37 @@ export async function mockConnection (maConn: MultiaddrConnection, direction: 'i } } } + +export function mockStream (stream: Duplex): Stream { + return { + ...stream, + close: () => {}, + abort: () => {}, + reset: () => {}, + timeline: { + open: Date.now() + }, + id: `stream-${Date.now()}` + } +} + +export function connectionPair (): [ Connection, Connection ] { + const [d0, d1] = duplexPair() + + return [ + // @ts-expect-error not a complete implementation + { + newStream: async (multicodecs: string[]) => await Promise.resolve({ + stream: mockStream(d0), + protocol: multicodecs[0] + }) + }, + // @ts-expect-error not a complete implementation + { + newStream: async (multicodecs: string[]) => await Promise.resolve({ + stream: mockStream(d1), + protocol: multicodecs[0] + }) + } + ] +} diff --git a/packages/libp2p-interface-compliance-tests/src/mocks/index.ts b/packages/libp2p-interface-compliance-tests/src/mocks/index.ts new file mode 100644 index 000000000..dec9cef1d --- /dev/null +++ b/packages/libp2p-interface-compliance-tests/src/mocks/index.ts @@ -0,0 +1,9 @@ + +export { mockConnectionGater } from './connection-gater.js' +export { mockConnectionManager } from './connection-manager.js' +export { mockConnection, mockStream, connectionPair } from './connection.js' +export { mockMultiaddrConnection } from './multiaddr-connection.js' +export { mockMuxer } from './muxer.js' +export { mockRegistrar } from './registrar.js' +export { mockUpgrader } from './upgrader.js' +export type { MockUpgraderOptions } from './upgrader.js' diff --git a/packages/libp2p-interface-compliance-tests/src/utils/mock-multiaddr-connection.ts b/packages/libp2p-interface-compliance-tests/src/mocks/multiaddr-connection.ts similarity index 74% rename from packages/libp2p-interface-compliance-tests/src/utils/mock-multiaddr-connection.ts rename to packages/libp2p-interface-compliance-tests/src/mocks/multiaddr-connection.ts index 7e8144f0c..d49dfd3a7 100644 --- a/packages/libp2p-interface-compliance-tests/src/utils/mock-multiaddr-connection.ts +++ b/packages/libp2p-interface-compliance-tests/src/mocks/multiaddr-connection.ts @@ -2,16 +2,16 @@ import { Multiaddr } from '@multiformats/multiaddr' import type { MultiaddrConnection } from '@libp2p/interfaces/transport' import type { Duplex } from 'it-stream-types' -export function mockMultiaddrConnection (source: Duplex): MultiaddrConnection { +export function mockMultiaddrConnection (source: Duplex & Partial): MultiaddrConnection { const maConn: MultiaddrConnection = { - ...source, async close () { }, timeline: { open: Date.now() }, - remoteAddr: new Multiaddr('/ip4/127.0.0.1/tcp/4001') + remoteAddr: new Multiaddr('/ip4/127.0.0.1/tcp/4001'), + ...source } return maConn diff --git a/packages/libp2p-interface-compliance-tests/src/utils/mock-muxer.ts b/packages/libp2p-interface-compliance-tests/src/mocks/muxer.ts similarity index 100% rename from packages/libp2p-interface-compliance-tests/src/utils/mock-muxer.ts rename to packages/libp2p-interface-compliance-tests/src/mocks/muxer.ts diff --git a/packages/libp2p-interface-compliance-tests/src/mocks/registrar.ts b/packages/libp2p-interface-compliance-tests/src/mocks/registrar.ts new file mode 100644 index 000000000..4d7819526 --- /dev/null +++ b/packages/libp2p-interface-compliance-tests/src/mocks/registrar.ts @@ -0,0 +1,71 @@ +import type { Registrar, StreamHandler } from '@libp2p/interfaces/registrar' +import type { Topology } from '@libp2p/interfaces/topology' + +export class MockRegistrar implements Registrar { + private readonly topologies: Map = new Map() + private readonly handlers: Map = new Map() + + async handle (protocols: string | string[], handler: StreamHandler) { + if (!Array.isArray(protocols)) { + protocols = [protocols] + } + + const id = `handler-id-${Math.random()}` + + this.handlers.set(id, { + handler, + protocols + }) + + return id + } + + async unhandle (id: string) { + this.handlers.delete(id) + } + + getHandlers (protocol: string) { + const output: StreamHandler[] = [] + + for (const { handler, protocols } of this.handlers.values()) { + if (protocols.includes(protocol)) { + output.push(handler) + } + } + + return output + } + + register (protocols: string | string[], topology: Topology) { + if (!Array.isArray(protocols)) { + protocols = [protocols] + } + + const id = `topology-id-${Math.random()}` + + this.topologies.set(id, { + topology, + protocols + }) + + return id + } + + unregister (id: string | string[]) { + if (!Array.isArray(id)) { + id = [id] + } + + id.forEach(id => this.topologies.delete(id)) + } + + getTopologies (protocol: string) { + const output: Topology[] = [] + + return output + } +} + +export function mockRegistrar () { + return new MockRegistrar() +} diff --git a/packages/libp2p-interface-compliance-tests/src/utils/mock-upgrader.ts b/packages/libp2p-interface-compliance-tests/src/mocks/upgrader.ts similarity index 91% rename from packages/libp2p-interface-compliance-tests/src/utils/mock-upgrader.ts rename to packages/libp2p-interface-compliance-tests/src/mocks/upgrader.ts index aa5879b2d..39d92dc29 100644 --- a/packages/libp2p-interface-compliance-tests/src/utils/mock-upgrader.ts +++ b/packages/libp2p-interface-compliance-tests/src/mocks/upgrader.ts @@ -1,6 +1,6 @@ import { expect } from 'aegir/utils/chai.js' -import { mockMuxer } from './mock-muxer.js' -import { mockConnection } from './mock-connection.js' +import { mockMuxer } from './muxer.js' +import { mockConnection } from './connection.js' import type { Upgrader, MultiaddrConnection } from '@libp2p/interfaces/transport' import type { Muxer } from '@libp2p/interfaces/stream-muxer' diff --git a/packages/libp2p-interface-compliance-tests/src/topology/multicodec-topology.ts b/packages/libp2p-interface-compliance-tests/src/topology/multicodec-topology.ts deleted file mode 100644 index 2e6750ea2..000000000 --- a/packages/libp2p-interface-compliance-tests/src/topology/multicodec-topology.ts +++ /dev/null @@ -1,136 +0,0 @@ -import { expect } from 'aegir/utils/chai.js' -import sinon from 'sinon' -import * as PeerIdFactory from '@libp2p/peer-id-factory' -import peers from '../utils/peers.js' -import type { TestSetup } from '../index.js' -import type { MulticodecTopology } from '@libp2p/interfaces/topology' -import type { PeerId } from '@libp2p/interfaces/peer-id' - -export default (test: TestSetup) => { - describe('multicodec topology', () => { - let topology: MulticodecTopology, id: PeerId - - beforeEach(async () => { - topology = await test.setup() - - id = await PeerIdFactory.createFromJSON(peers[0]) - }) - - afterEach(async () => { - sinon.restore() - await test.teardown() - }) - - it('should have properties set', () => { - expect(topology.multicodecs).to.exist() - expect(topology.peers).to.exist() - }) - - it('should trigger "onDisconnect" on peer disconnected', () => { - // @ts-expect-error protected property - sinon.spy(topology, '_onDisconnect') - topology.disconnect(id) - - expect(topology).to.have.nested.property('_onDisconnect.callCount', 1) - }) - - it('should update peers on protocol change', async () => { - // @ts-expect-error protected property - sinon.spy(topology, '_updatePeers') - expect(topology.peers.size).to.eql(0) - - // @ts-expect-error protected property - const peerStore = topology._registrar.peerStore - - const id2 = await PeerIdFactory.createFromJSON(peers[1]) - peerStore.peers.set(id2.toString(), { - id: id2, - protocols: Array.from(topology.multicodecs) - }) - - peerStore.emit('change:protocols', { - peerId: id2, - protocols: Array.from(topology.multicodecs) - }) - - expect(topology).to.have.nested.property('_updatePeers.callCount', 1) - expect(topology.peers.size).to.eql(1) - }) - - it('should disconnect if peer no longer supports a protocol', async () => { - // @ts-expect-error protected property - sinon.spy(topology, '_onDisconnect') - expect(topology.peers.size).to.eql(0) - - // @ts-expect-error protected property - const peerStore = topology._registrar.peerStore - - const id2 = await PeerIdFactory.createFromJSON(peers[1]) - peerStore.peers.set(id2.toString(), { - id: id2, - protocols: Array.from(topology.multicodecs) - }) - - peerStore.emit('change:protocols', { - peerId: id2, - protocols: Array.from(topology.multicodecs) - }) - - expect(topology.peers.size).to.eql(1) - - peerStore.peers.set(id2.toString(), { - id: id2, - protocols: [] - }) - // Peer does not support the protocol anymore - peerStore.emit('change:protocols', { - peerId: id2, - protocols: [] - }) - - expect(topology.peers.size).to.eql(1) - expect(topology).to.have.nested.property('_onDisconnect.callCount', 1) - // @ts-expect-error protected property - expect(topology._onDisconnect.calledWith(id2)).to.equal(true) - }) - - it('should trigger "onConnect" when a peer connects and has one of the topology multicodecs in its known protocols', () => { - // @ts-expect-error protected property - sinon.spy(topology, '_onConnect') - // @ts-expect-error protected property - sinon.stub(topology._registrar.peerStore.protoBook, 'get').returns(topology.multicodecs) - // @ts-expect-error protected property - topology._registrar.connectionManager.emit('peer:connect', { - remotePeer: id - }) - - expect(topology).to.have.nested.property('_onConnect.callCount', 1) - }) - - it('should not trigger "onConnect" when a peer connects and has none of the topology multicodecs in its known protocols', () => { - // @ts-expect-error protected property - sinon.spy(topology, '_onConnect') - // @ts-expect-error protected property - sinon.stub(topology._registrar.peerStore.protoBook, 'get').returns([]) - // @ts-expect-error protected property - topology._registrar.connectionManager.emit('peer:connect', { - remotePeer: id - }) - - expect(topology).to.have.nested.property('_onConnect.callCount', 0) - }) - - it('should not trigger "onConnect" when a peer connects and its protocols are not known', () => { - // @ts-expect-error protected property - sinon.spy(topology, '_onConnect') - // @ts-expect-error protected property - sinon.stub(topology._registrar.peerStore.protoBook, 'get').returns(undefined) - // @ts-expect-error protected property - topology._registrar.connectionManager.emit('peer:connect', { - remotePeer: id - }) - - expect(topology).to.have.nested.property('_onConnect.callCount', 0) - }) - }) -} diff --git a/packages/libp2p-interface-compliance-tests/src/topology/topology.ts b/packages/libp2p-interface-compliance-tests/src/topology/topology.ts index 648c0f812..280fbe02d 100644 --- a/packages/libp2p-interface-compliance-tests/src/topology/topology.ts +++ b/packages/libp2p-interface-compliance-tests/src/topology/topology.ts @@ -1,19 +1,14 @@ import { expect } from 'aegir/utils/chai.js' import sinon from 'sinon' -import * as PeerIdFactory from '@libp2p/peer-id-factory' -import peers from '../utils/peers.js' import type { TestSetup } from '../index.js' import type { Topology } from '@libp2p/interfaces/topology' -import type { PeerId } from '@libp2p/interfaces/peer-id' export default (test: TestSetup) => { describe('topology', () => { - let topology: Topology, id: PeerId + let topology: Topology beforeEach(async () => { topology = await test.setup() - - id = await PeerIdFactory.createFromJSON(peers[0]) }) afterEach(async () => { @@ -26,13 +21,5 @@ export default (test: TestSetup) => { expect(topology.max).to.exist() expect(topology.peers).to.exist() }) - - it('should trigger "onDisconnect" on peer disconnected', () => { - // @ts-expect-error protected property - sinon.spy(topology, '_onDisconnect') - topology.disconnect(id) - - expect(topology).to.have.nested.property('_onDisconnect.callCount', 1) - }) }) } diff --git a/packages/libp2p-interface-compliance-tests/src/utils/mock-registrar.ts b/packages/libp2p-interface-compliance-tests/src/utils/mock-registrar.ts deleted file mode 100644 index 86c8835cf..000000000 --- a/packages/libp2p-interface-compliance-tests/src/utils/mock-registrar.ts +++ /dev/null @@ -1,35 +0,0 @@ -import type { Registrar, StreamHandler } from '@libp2p/interfaces/registrar' -import type { MulticodecTopology } from '../../../libp2p-topology/src/multicodec-topology' - -export class MockRegistrar implements Registrar { - public readonly topologies: Map = new Map() - public readonly streamHandlers: Map = new Map() - - async handle (multicodecs: string | string[], handler: StreamHandler) { - if (!Array.isArray(multicodecs)) { - multicodecs = [multicodecs] - } - - this.streamHandlers.set(multicodecs[0], handler) - } - - async unhandle (multicodec: string) { - this.streamHandlers.delete(multicodec) - } - - register (topology: MulticodecTopology) { - const { multicodecs } = topology - - this.topologies.set(multicodecs[0], topology) - - return multicodecs[0] - } - - unregister (id: string) { - this.topologies.delete(id) - } -} - -export function mockRegistrar () { - return new MockRegistrar() -} diff --git a/packages/libp2p-interfaces/src/dht/index.ts b/packages/libp2p-interfaces/src/dht/index.ts index 7a3390de4..6f0ac9f2f 100644 --- a/packages/libp2p-interfaces/src/dht/index.ts +++ b/packages/libp2p-interfaces/src/dht/index.ts @@ -161,14 +161,14 @@ export interface DHT extends PeerDiscovery, Startable { put: (key: Uint8Array, value: Uint8Array, options?: QueryOptions) => AsyncIterable /** - * Enable server mode (e.g. allow publishing provider records) + * Returns the mode this node is in */ - enableServerMode: () => void + getMode: () => Promise<'client' | 'server'> /** - * Enable server mode (e.g. disallow publishing provider records) + * If 'server' this node will respond to DHT queries, if 'client' this node will not */ - enableClientMode: () => void + setMode: (mode: 'client' | 'server') => Promise /** * Force a routing table refresh diff --git a/packages/libp2p-interfaces/src/index.ts b/packages/libp2p-interfaces/src/index.ts index 6acffaaaf..d5a55f0df 100644 --- a/packages/libp2p-interfaces/src/index.ts +++ b/packages/libp2p-interfaces/src/index.ts @@ -1,6 +1,6 @@ import type { PeerId } from './peer-id/index.js' import type { Multiaddr } from '@multiformats/multiaddr' -import type { Duplex } from 'it-stream-types' +import type { ProtocolStream, Connection } from './connection/index.js' export interface AbortOptions { signal?: AbortSignal @@ -12,12 +12,11 @@ export interface Startable { isStarted: () => boolean } -// Implemented by libp2p, should be moved to libp2p-interfaces eventually export interface Dialer { - dialProtocol: (peer: PeerId, protocol: string, options?: { signal?: AbortSignal }) => Promise<{ stream: Duplex }> + dial: (peer: PeerId, options?: { signal?: AbortSignal }) => Promise + dialProtocol: (peer: PeerId, protocol: string, options?: { signal?: AbortSignal }) => Promise } -// Implemented by libp2p, should be moved to libp2p-interfaces eventually export interface Addressable { multiaddrs: Multiaddr[] } diff --git a/packages/libp2p-interfaces/src/registrar/index.ts b/packages/libp2p-interfaces/src/registrar/index.ts index 005b001cc..d9098e2e8 100644 --- a/packages/libp2p-interfaces/src/registrar/index.ts +++ b/packages/libp2p-interfaces/src/registrar/index.ts @@ -1,6 +1,7 @@ import type { EventEmitter } from '../index.js' import type { Connection, Stream } from '../connection/index.js' import type { PeerId } from '../peer-id/index.js' +import type { Topology } from '../topology/index.js' export interface IncomingStreamData { protocol: string @@ -21,9 +22,11 @@ export interface StreamHandler { } export interface Registrar { - handle: (multicodec: string | string[], handler: StreamHandler) => Promise - unhandle: (multicodec: string) => Promise + handle: (protocol: string | string[], handler: StreamHandler) => Promise + unhandle: (id: string) => Promise + getHandlers: (protocol: string) => StreamHandler[] - register: (topology: any) => string + register: (protocols: string | string[], topology: Topology) => string unregister: (id: string) => void + getTopologies: (protocol: string) => Topology[] } diff --git a/packages/libp2p-interfaces/src/topology/index.ts b/packages/libp2p-interfaces/src/topology/index.ts index 5730c1396..c1ffeb7d6 100644 --- a/packages/libp2p-interfaces/src/topology/index.ts +++ b/packages/libp2p-interfaces/src/topology/index.ts @@ -1,14 +1,12 @@ import type { PeerId } from '../peer-id/index.js' import type { Connection } from '../connection/index.js' -import type { ConnectionManager } from '../registrar/index.js' -import type { PeerStore } from '../peer-store/index.js' -export interface onConnectHandler { (peerId: PeerId, conn: Connection): void } -export interface onDisconnectHandler { (peerId: PeerId, conn?: Connection): void } +export interface onConnectHandler { + (peerId: PeerId, conn: Connection): void +} -export interface Handlers { - onConnect?: onConnectHandler - onDisconnect?: onDisconnectHandler +export interface onDisconnectHandler { + (peerId: PeerId, conn?: Connection): void } export interface TopologyOptions { @@ -21,7 +19,8 @@ export interface TopologyOptions { * maximum needed connections */ max?: number - handlers: Handlers + onConnect?: onConnectHandler + onDisconnect?: onDisconnectHandler } export interface Topology { @@ -29,15 +28,6 @@ export interface Topology { max: number peers: Set - disconnect: (id: PeerId) => void -} - -export interface MulticodecTopologyOptions extends TopologyOptions { - multicodecs: string[] - peerStore: PeerStore - connectionManager: ConnectionManager -} - -export interface MulticodecTopology extends Topology { - multicodecs: string[] + onConnect: (peerId: PeerId, conn: Connection) => void + onDisconnect: (peerId: PeerId) => void } diff --git a/packages/libp2p-peer-id/src/index.ts b/packages/libp2p-peer-id/src/index.ts index eaa60a3dc..2dd093e16 100644 --- a/packages/libp2p-peer-id/src/index.ts +++ b/packages/libp2p-peer-id/src/index.ts @@ -9,6 +9,8 @@ import errcode from 'err-code' import type { MultibaseDecoder, MultibaseEncoder } from 'multiformats/bases/interface' import type { MultihashDigest } from 'multiformats/hashes/interface' +const peerIdSymbol = Symbol.for('@libp2p/peer-id') + const baseDecoder = Object .values(bases) .map(codec => codec.decoder) @@ -55,6 +57,18 @@ export class PeerId { this.privateKey = opts.privateKey } + static isPeerId (other: any): other is PeerId { + return peerIdSymbol in other + } + + get [Symbol.toStringTag] () { + return peerIdSymbol.toString() + } + + get [peerIdSymbol] () { + return true + } + toString (codec?: MultibaseEncoder) { if (codec == null) { codec = base58btc diff --git a/packages/libp2p-peer-record/package.json b/packages/libp2p-peer-record/package.json index 18ae0cbdc..dc0258806 100644 --- a/packages/libp2p-peer-record/package.json +++ b/packages/libp2p-peer-record/package.json @@ -127,8 +127,7 @@ "scripts": { "lint": "aegir lint", "dep-check": "aegir dep-check dist/src/**/*.js dist/test/**/*.js", - "build": "tsc", - "postbuild": "npm run build:copy-proto-files", + "build": "tsc && npm run build:copy-proto-files", "generate": "npm run generate:envelope && npm run generate:envelope-types && npm run generate:peer-record && npm run generate:peer-record-types", "generate:envelope": "pbjs -t static-module -w es6 -r libp2p-peer-record-envelope --force-number --no-verify --no-delimited --no-create --no-beautify --no-defaults --lint eslint-disable -o src/envelope/envelope.js ./src/envelope/envelope.proto", "generate:envelope-types": "pbts -o src/envelope/envelope.d.ts src/envelope/envelope.js", diff --git a/packages/libp2p-peer-record/src/envelope/index.ts b/packages/libp2p-peer-record/src/envelope/index.ts index 4dba02870..6dca90b7c 100644 --- a/packages/libp2p-peer-record/src/envelope/index.ts +++ b/packages/libp2p-peer-record/src/envelope/index.ts @@ -6,7 +6,8 @@ import varint from 'varint' import { equals as uint8arraysEquals } from 'uint8arrays/equals' import { codes } from '../errors.js' import { Envelope as Protobuf } from './envelope.js' -import { PeerId } from '@libp2p/peer-id' +import { PeerId as PeerIdImpl } from '@libp2p/peer-id' +import type { PeerId } from '@libp2p/interfaces/peer-id' import type { Record, Envelope } from '@libp2p/interfaces/record' export interface EnvelopeOptions { @@ -22,7 +23,7 @@ export class RecordEnvelope implements Envelope { */ static createFromProtobuf = async (data: Uint8Array) => { const envelopeData = Protobuf.decode(data) - const peerId = await PeerId.fromKeys(envelopeData.publicKey) + const peerId = await PeerIdImpl.fromKeys(envelopeData.publicKey) return new RecordEnvelope({ peerId, diff --git a/packages/libp2p-peer-record/src/peer-record/index.ts b/packages/libp2p-peer-record/src/peer-record/index.ts index 80fb03634..f2870bd0f 100644 --- a/packages/libp2p-peer-record/src/peer-record/index.ts +++ b/packages/libp2p-peer-record/src/peer-record/index.ts @@ -1,6 +1,7 @@ import { Multiaddr } from '@multiformats/multiaddr' -import { PeerId } from '@libp2p/peer-id' +import type { PeerId } from '@libp2p/interfaces/peer-id' import { arrayEquals } from '@libp2p/utils/array-equals' +import { PeerId as PeerIdImpl } from '@libp2p/peer-id' import { PeerRecord as Protobuf } from './peer-record.js' import { ENVELOPE_DOMAIN_PEER_RECORD, @@ -34,7 +35,7 @@ export class PeerRecord { */ static createFromProtobuf = (buf: Uint8Array) => { const peerRecord = Protobuf.decode(buf) - const peerId = PeerId.fromBytes(peerRecord.peerId) + const peerId = PeerIdImpl.fromBytes(peerRecord.peerId) const multiaddrs = (peerRecord.addresses ?? []).map((a) => new Multiaddr(a.multiaddr)) const seqNumber = Number(peerRecord.seq) diff --git a/packages/libp2p-peer-store/package.json b/packages/libp2p-peer-store/package.json index 38f79a0c1..5be9a8996 100644 --- a/packages/libp2p-peer-store/package.json +++ b/packages/libp2p-peer-store/package.json @@ -125,8 +125,7 @@ "scripts": { "lint": "aegir lint", "dep-check": "aegir dep-check dist/src/**/*.js dist/test/**/*.js", - "build": "tsc", - "postbuild": "npm run build:copy-proto-files", + "build": "tsc && npm run build:copy-proto-files", "generate": "npm run generate:proto && npm run generate:proto-types && tsc", "generate:proto": "pbjs -t static-module -w es6 -r libp2p-peer-store --force-number --no-verify --no-delimited --no-create --no-beautify --no-defaults --lint eslint-disable -o src/pb/peer.js ./src/pb/peer.proto", "generate:proto-types": "pbts -o src/pb/peer.d.ts src/pb/peer.js", @@ -143,7 +142,7 @@ "dependencies": { "@libp2p/interfaces": "^1.0.0", "@libp2p/logger": "^1.0.1", - "@libp2p/peer-record": "^0.0.0", + "@libp2p/peer-record": "^1.0.1", "@multiformats/multiaddr": "^10.1.5", "interface-datastore": "^6.1.0", "it-all": "^1.0.6", @@ -156,7 +155,7 @@ "protobufjs": "^6.10.2" }, "devDependencies": { - "@libp2p/interface-compliance-tests": "^1.1.0", + "@libp2p/interface-compliance-tests": "^1.1.2", "@libp2p/peer-id": "^1.1.0", "@libp2p/peer-id-factory": "^1.0.3", "@libp2p/utils": "^1.0.5", diff --git a/packages/libp2p-peer-store/src/address-book.ts b/packages/libp2p-peer-store/src/address-book.ts index 9be6313e9..2133f4912 100644 --- a/packages/libp2p-peer-store/src/address-book.ts +++ b/packages/libp2p-peer-store/src/address-book.ts @@ -9,12 +9,13 @@ import filter from 'it-filter' import map from 'it-map' import each from 'it-foreach' import { base58btc } from 'multiformats/bases/base58' -import { PeerId } from '@libp2p/peer-id' +import { PeerId as PeerIdImpl } from '@libp2p/peer-id' import { CustomEvent } from '@libp2p/interfaces' import type { PeerStore } from '@libp2p/interfaces/peer-store' import type { Store } from './store.js' import type { AddressFilter, AddressSorter } from './index.js' import type { Envelope } from '@libp2p/interfaces/record' +import type { PeerId } from '@libp2p/interfaces/peer-id' const log = logger('libp2p:peer-store:address-book') const EVENT_NAME = 'change:multiaddrs' @@ -138,7 +139,7 @@ export class PeerStoreAddressBook { } async get (peerId: PeerId) { - peerId = PeerId.fromPeerId(peerId) + peerId = PeerIdImpl.fromPeerId(peerId) log('get wait for read lock') const release = await this.store.lock.readLock() @@ -161,7 +162,7 @@ export class PeerStoreAddressBook { } async set (peerId: PeerId, multiaddrs: Multiaddr[]) { - peerId = PeerId.fromPeerId(peerId) + peerId = PeerIdImpl.fromPeerId(peerId) if (!Array.isArray(multiaddrs)) { log.error('multiaddrs must be an array of Multiaddrs') @@ -223,7 +224,7 @@ export class PeerStoreAddressBook { } async add (peerId: PeerId, multiaddrs: Multiaddr[]) { - peerId = PeerId.fromPeerId(peerId) + peerId = PeerIdImpl.fromPeerId(peerId) if (!Array.isArray(multiaddrs)) { log.error('multiaddrs must be an array of Multiaddrs') @@ -282,7 +283,7 @@ export class PeerStoreAddressBook { } async delete (peerId: PeerId) { - peerId = PeerId.fromPeerId(peerId) + peerId = PeerIdImpl.fromPeerId(peerId) log('delete await write lock') const release = await this.store.lock.writeLock() diff --git a/packages/libp2p-peer-store/src/key-book.ts b/packages/libp2p-peer-store/src/key-book.ts index dae8f262a..3132e211f 100644 --- a/packages/libp2p-peer-store/src/key-book.ts +++ b/packages/libp2p-peer-store/src/key-book.ts @@ -1,17 +1,12 @@ import { logger } from '@libp2p/logger' import errcode from 'err-code' import { codes } from './errors.js' -import { PeerId } from '@libp2p/peer-id' +import { PeerId as PeerIdImpl } from '@libp2p/peer-id' import { equals as uint8arrayEquals } from 'uint8arrays/equals' import { CustomEvent } from '@libp2p/interfaces' import type { Store } from './store.js' import type { PeerStore, KeyBook } from '@libp2p/interfaces/src/peer-store' - -/** - * @typedef {import('./types').PeerStore} PeerStore - * @typedef {import('./types').KeyBook} KeyBook - * @typedef {import('libp2p-interfaces/src/keys/types').PublicKey} PublicKey - */ +import type { PeerId } from '@libp2p/interfaces/peer-id' const log = logger('libp2p:peer-store:key-book') @@ -33,7 +28,7 @@ export class PeerStoreKeyBook implements KeyBook { * Set the Peer public key */ async set (peerId: PeerId, publicKey: Uint8Array) { - peerId = PeerId.fromPeerId(peerId) + peerId = PeerIdImpl.fromPeerId(peerId) if (!(publicKey instanceof Uint8Array)) { log.error('publicKey must be an instance of Uint8Array to store data') @@ -79,7 +74,7 @@ export class PeerStoreKeyBook implements KeyBook { * Get Public key of the given PeerId, if stored */ async get (peerId: PeerId) { - peerId = PeerId.fromPeerId(peerId) + peerId = PeerIdImpl.fromPeerId(peerId) log('get await write lock') const release = await this.store.lock.readLock() @@ -100,7 +95,7 @@ export class PeerStoreKeyBook implements KeyBook { } async delete (peerId: PeerId) { - peerId = PeerId.fromPeerId(peerId) + peerId = PeerIdImpl.fromPeerId(peerId) log('delete await write lock') const release = await this.store.lock.writeLock() diff --git a/packages/libp2p-peer-store/src/metadata-book.ts b/packages/libp2p-peer-store/src/metadata-book.ts index 731d54fcd..317ce0f8e 100644 --- a/packages/libp2p-peer-store/src/metadata-book.ts +++ b/packages/libp2p-peer-store/src/metadata-book.ts @@ -1,11 +1,12 @@ import { logger } from '@libp2p/logger' import errcode from 'err-code' import { codes } from './errors.js' -import { PeerId } from '@libp2p/peer-id' +import { PeerId as PeerIdImpl } from '@libp2p/peer-id' import { equals as uint8ArrayEquals } from 'uint8arrays/equals' import { CustomEvent } from '@libp2p/interfaces' import type { Store } from './store.js' import type { PeerStore, MetadataBook } from '@libp2p/interfaces/src/peer-store' +import type { PeerId } from '@libp2p/interfaces/peer-id' const log = logger('libp2p:peer-store:metadata-book') @@ -28,7 +29,7 @@ export class PeerStoreMetadataBook implements MetadataBook { * Get the known data of a provided peer */ async get (peerId: PeerId) { - peerId = PeerId.fromPeerId(peerId) + peerId = PeerIdImpl.fromPeerId(peerId) log('get await read lock') const release = await this.store.lock.readLock() @@ -54,7 +55,7 @@ export class PeerStoreMetadataBook implements MetadataBook { * Get specific metadata value, if it exists */ async getValue (peerId: PeerId, key: string) { - peerId = PeerId.fromPeerId(peerId) + peerId = PeerIdImpl.fromPeerId(peerId) log('getValue await read lock') const release = await this.store.lock.readLock() @@ -75,7 +76,7 @@ export class PeerStoreMetadataBook implements MetadataBook { } async set (peerId: PeerId, metadata: Map) { - peerId = PeerId.fromPeerId(peerId) + peerId = PeerIdImpl.fromPeerId(peerId) if (!(metadata instanceof Map)) { log.error('valid metadata must be provided to store data') @@ -104,7 +105,7 @@ export class PeerStoreMetadataBook implements MetadataBook { * Set metadata key and value of a provided peer */ async setValue (peerId: PeerId, key: string, value: Uint8Array) { - peerId = PeerId.fromPeerId(peerId) + peerId = PeerIdImpl.fromPeerId(peerId) if (typeof key !== 'string' || !(value instanceof Uint8Array)) { log.error('valid key and value must be provided to store data') @@ -145,7 +146,7 @@ export class PeerStoreMetadataBook implements MetadataBook { } async delete (peerId: PeerId) { - peerId = PeerId.fromPeerId(peerId) + peerId = PeerIdImpl.fromPeerId(peerId) log('delete await write lock') const release = await this.store.lock.writeLock() @@ -174,7 +175,7 @@ export class PeerStoreMetadataBook implements MetadataBook { } async deleteValue (peerId: PeerId, key: string) { - peerId = PeerId.fromPeerId(peerId) + peerId = PeerIdImpl.fromPeerId(peerId) log('deleteValue await write lock') const release = await this.store.lock.writeLock() diff --git a/packages/libp2p-peer-store/src/proto-book.ts b/packages/libp2p-peer-store/src/proto-book.ts index 8e5bde68b..9e7ad1e8d 100644 --- a/packages/libp2p-peer-store/src/proto-book.ts +++ b/packages/libp2p-peer-store/src/proto-book.ts @@ -1,11 +1,12 @@ import { logger } from '@libp2p/logger' import errcode from 'err-code' import { codes } from './errors.js' -import { PeerId } from '@libp2p/peer-id' +import { PeerId as PeerIdImpl } from '@libp2p/peer-id' import { base58btc } from 'multiformats/bases/base58' import { CustomEvent } from '@libp2p/interfaces' import type { Store } from './store.js' import type { PeerStore, ProtoBook } from '@libp2p/interfaces/src/peer-store' +import type { PeerId } from '@libp2p/interfaces/peer-id' const log = logger('libp2p:peer-store:proto-book') @@ -46,7 +47,7 @@ export class PeerStoreProtoBook implements ProtoBook { } async set (peerId: PeerId, protocols: string[]) { - peerId = PeerId.fromPeerId(peerId) + peerId = PeerIdImpl.fromPeerId(peerId) if (!Array.isArray(protocols)) { log.error('protocols must be provided to store data') @@ -90,7 +91,7 @@ export class PeerStoreProtoBook implements ProtoBook { } async add (peerId: PeerId, protocols: string[]) { - peerId = PeerId.fromPeerId(peerId) + peerId = PeerIdImpl.fromPeerId(peerId) if (!Array.isArray(protocols)) { log.error('protocols must be provided to store data') @@ -135,7 +136,7 @@ export class PeerStoreProtoBook implements ProtoBook { } async remove (peerId: PeerId, protocols: string[]) { - peerId = PeerId.fromPeerId(peerId) + peerId = PeerIdImpl.fromPeerId(peerId) if (!Array.isArray(protocols)) { log.error('protocols must be provided to store data') @@ -182,7 +183,7 @@ export class PeerStoreProtoBook implements ProtoBook { } async delete (peerId: PeerId) { - peerId = PeerId.fromPeerId(peerId) + peerId = PeerIdImpl.fromPeerId(peerId) log('delete await write lock') const release = await this.store.lock.writeLock() diff --git a/packages/libp2p-peer-store/src/store.ts b/packages/libp2p-peer-store/src/store.ts index 2050f593b..ed598a55c 100644 --- a/packages/libp2p-peer-store/src/store.ts +++ b/packages/libp2p-peer-store/src/store.ts @@ -1,5 +1,5 @@ import { logger } from '@libp2p/logger' -import { PeerId } from '@libp2p/peer-id' +import { PeerId as PeerIdImpl } from '@libp2p/peer-id' import errcode from 'err-code' import { codes } from './errors.js' import { Key } from 'interface-datastore/key' @@ -10,6 +10,7 @@ import mortice from 'mortice' import { equals as uint8arrayEquals } from 'uint8arrays/equals' import type { Peer } from '@libp2p/interfaces/peer-store' import type { Datastore } from 'interface-datastore' +import type { PeerId } from '@libp2p/interfaces/peer-id' const log = logger('libp2p:peer-store:store') @@ -218,7 +219,7 @@ export class PersistentStore { const base32Str = key.toString().split('/')[2] const buf = base32.decode(base32Str) - yield this.load(PeerId.fromBytes(buf)) + yield this.load(PeerIdImpl.fromBytes(buf)) } } } diff --git a/packages/libp2p-peer-store/test/address-book.spec.ts b/packages/libp2p-peer-store/test/address-book.spec.ts index 468df9eea..514bdadaf 100644 --- a/packages/libp2p-peer-store/test/address-book.spec.ts +++ b/packages/libp2p-peer-store/test/address-book.spec.ts @@ -10,18 +10,13 @@ import pDefer from 'p-defer' import { MemoryDatastore } from 'datastore-core/memory' import { DefaultPeerStore } from '../src/index.js' import { RecordEnvelope, PeerRecord } from '@libp2p/peer-record' -import { mockConnectionGater } from '@libp2p/interface-compliance-tests/utils/mock-connection-gater' +import { mockConnectionGater } from '@libp2p/interface-compliance-tests/mocks' import { codes } from '../src/errors.js' import { createEd25519PeerId } from '@libp2p/peer-id-factory' import type { PeerStore, AddressBook } from '@libp2p/interfaces/peer-store' import { base58btc } from 'multiformats/bases/base58' import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string' -/** - * @typedef {import('../../src/peer-store/types').PeerStore} PeerStore - * @typedef {import('../../src/peer-store/types').AddressBook} AddressBook - */ - const addr1 = new Multiaddr('/ip4/127.0.0.1/tcp/8000') const addr2 = new Multiaddr('/ip4/20.0.0.1/tcp/8001') const addr3 = new Multiaddr('/ip4/127.0.0.1/tcp/8002') diff --git a/packages/libp2p-peer-store/test/key-book.spec.ts b/packages/libp2p-peer-store/test/key-book.spec.ts index e55e257c9..0dd61b67f 100644 --- a/packages/libp2p-peer-store/test/key-book.spec.ts +++ b/packages/libp2p-peer-store/test/key-book.spec.ts @@ -11,12 +11,6 @@ import type { PeerId } from '@libp2p/interfaces/peer-id' import type { PeerStore, KeyBook } from '@libp2p/interfaces/peer-store' import { base58btc } from 'multiformats/bases/base58' -/** - * @typedef {import('../../src/peer-store/types').PeerStore} PeerStore - * @typedef {import('../../src/peer-store/types').KeyBook} KeyBook - * @typedef {import('peer-id')} PeerId - */ - describe('keyBook', () => { let peerId: PeerId let peerStore: PeerStore diff --git a/packages/libp2p-peer-store/test/peer-store.spec.ts b/packages/libp2p-peer-store/test/peer-store.spec.ts index 6bcd1b794..9f5f801bd 100644 --- a/packages/libp2p-peer-store/test/peer-store.spec.ts +++ b/packages/libp2p-peer-store/test/peer-store.spec.ts @@ -10,7 +10,7 @@ import { createEd25519PeerId } from '@libp2p/peer-id-factory' import type { PeerId } from '@libp2p/interfaces/peer-id' import type { PeerStore } from '@libp2p/interfaces/peer-store' -import { mockConnectionGater } from '@libp2p/interface-compliance-tests/utils/mock-connection-gater' +import { mockConnectionGater } from '@libp2p/interface-compliance-tests/mocks' import { base58btc } from 'multiformats/bases/base58' const addr1 = new Multiaddr('/ip4/127.0.0.1/tcp/8000') @@ -22,10 +22,6 @@ const proto1 = '/protocol1' const proto2 = '/protocol2' const proto3 = '/protocol3' -/** - * @typedef {import('../../src/peer-store/types').PeerStore} PeerStore - */ - describe('peer-store', () => { const connectionGater = mockConnectionGater() let peerIds: PeerId[] diff --git a/packages/libp2p-peer-store/tsconfig.json b/packages/libp2p-peer-store/tsconfig.json index bec6fed0c..ce3b69b28 100644 --- a/packages/libp2p-peer-store/tsconfig.json +++ b/packages/libp2p-peer-store/tsconfig.json @@ -21,6 +21,15 @@ }, { "path": "../libp2p-peer-record" + }, + { + "path": "../libp2p-peer-id" + }, + { + "path": "../libp2p-peer-id-factory" + }, + { + "path": "../libp2p-interface-compliance-tests" } ] } diff --git a/packages/libp2p-pubsub/package.json b/packages/libp2p-pubsub/package.json index 57114a57f..b759b67b1 100644 --- a/packages/libp2p-pubsub/package.json +++ b/packages/libp2p-pubsub/package.json @@ -167,8 +167,7 @@ "scripts": { "lint": "aegir lint", "dep-check": "aegir dep-check dist/src/**/*.js dist/test/**/*.js", - "build": "tsc", - "postbuild": "npm run build:copy-proto-files", + "build": "tsc && npm run build:copy-proto-files", "pretest": "npm run build", "test": "aegir test -f ./dist/test/*.js -f ./dist/test/**/*.js", "test:chrome": "npm run test -- -t browser", diff --git a/packages/libp2p-pubsub/src/index.ts b/packages/libp2p-pubsub/src/index.ts index 9fc8e45b7..4b86bcaca 100644 --- a/packages/libp2p-pubsub/src/index.ts +++ b/packages/libp2p-pubsub/src/index.ts @@ -3,7 +3,7 @@ import { EventEmitter, CustomEvent } from '@libp2p/interfaces' import errcode from 'err-code' import { pipe } from 'it-pipe' import Queue from 'p-queue' -import { MulticodecTopology } from '@libp2p/topology/multicodec-topology' +import { Topology } from '@libp2p/topology' import { codes } from './errors.js' import { RPC, IRPC } from './message/rpc.js' import { PeerStreams } from './peer-streams.js' @@ -65,7 +65,8 @@ export abstract class PubsubBaseProtocol extends EventEmitter extends EventEmitter extends EventEmitter { pubsubB.start() ]) - expect(registrarA.streamHandlers).to.have.lengthOf(1) - expect(registrarB.streamHandlers).to.have.lengthOf(1) + expect(registrarA.getHandlers(protocol)).to.have.lengthOf(1) + expect(registrarB.getHandlers(protocol)).to.have.lengthOf(1) }) afterEach(async () => { @@ -125,8 +125,8 @@ describe('pubsub base lifecycle', () => { }) it('should handle onConnect as expected', async () => { - const topologyA = registrarA.topologies.get(protocol) - const handlerB = registrarB.streamHandlers.get(protocol) + const topologyA = registrarA.getTopologies(protocol)[0] + const handlerB = registrarB.getHandlers(protocol)[0] if (topologyA == null || handlerB == null) { throw new Error(`No handler registered for ${protocol}`) @@ -143,8 +143,8 @@ describe('pubsub base lifecycle', () => { }) it('should use the latest connection if onConnect is called more than once', async () => { - const topologyA = registrarA.topologies.get(protocol) - const handlerB = registrarB.streamHandlers.get(protocol) + const topologyA = registrarA.getTopologies(protocol)[0] + const handlerB = registrarB.getHandlers(protocol)[0] if (topologyA == null || handlerB == null) { throw new Error(`No handler registered for ${protocol}`) @@ -184,8 +184,8 @@ describe('pubsub base lifecycle', () => { }) it('should handle newStream errors in onConnect', async () => { - const topologyA = registrarA.topologies.get(protocol) - const handlerB = registrarB.streamHandlers.get(protocol) + const topologyA = registrarA.getTopologies(protocol)[0] + const handlerB = registrarB.getHandlers(protocol)[0] if (topologyA == null || handlerB == null) { throw new Error(`No handler registered for ${protocol}`) @@ -203,9 +203,9 @@ describe('pubsub base lifecycle', () => { }) it('should handle onDisconnect as expected', async () => { - const topologyA = registrarA.topologies.get(protocol) - const topologyB = registrarB.topologies.get(protocol) - const handlerB = registrarB.streamHandlers.get(protocol) + const topologyA = registrarA.getTopologies(protocol)[0] + const topologyB = registrarB.getTopologies(protocol)[0] + const handlerB = registrarB.getHandlers(protocol)[0] if (topologyA == null || handlerB == null) { throw new Error(`No handler registered for ${protocol}`) @@ -226,7 +226,7 @@ describe('pubsub base lifecycle', () => { }) it('should handle onDisconnect for unknown peers', () => { - const topologyA = registrarA.topologies.get(protocol) + const topologyA = registrarA.getTopologies(protocol)[0] expect(pubsubA.peers.size).to.be.eql(0) diff --git a/packages/libp2p-pubsub/test/pubsub.spec.ts b/packages/libp2p-pubsub/test/pubsub.spec.ts index a1bc850fb..99ab59a67 100644 --- a/packages/libp2p-pubsub/test/pubsub.spec.ts +++ b/packages/libp2p-pubsub/test/pubsub.spec.ts @@ -32,7 +32,7 @@ describe('pubsub base implementation', () => { }) }) - afterEach(() => pubsub.stop()) + afterEach(async () => await pubsub.stop()) it('calls _publish for router to forward messages', async () => { sinon.spy(pubsub, '_publish') @@ -74,7 +74,7 @@ describe('pubsub base implementation', () => { await pubsub.start() }) - afterEach(() => pubsub.stop()) + afterEach(async () => await pubsub.stop()) it('should add subscription', () => { pubsub.subscribe(topic) @@ -119,8 +119,8 @@ describe('pubsub base implementation', () => { pubsubA.start(), pubsubB.start() ]) - const topologyA = registrarA.topologies.get(protocol) - const handlerB = registrarB.streamHandlers.get(protocol) + const topologyA = registrarA.getTopologies(protocol)[0] + const handlerB = registrarB.getHandlers(protocol)[0] if (topologyA == null || handlerB == null) { throw new Error(`No handler registered for ${protocol}`) @@ -133,9 +133,11 @@ describe('pubsub base implementation', () => { await handlerB(await mockIncomingStreamEvent(protocol, c1, peerIdA)) }) - afterEach(() => { - pubsubA.stop() - pubsubB.stop() + afterEach(async () => { + await Promise.all([ + pubsubA.stop(), + pubsubB.stop() + ]) }) it('should send subscribe message to connected peers', async () => { @@ -177,7 +179,7 @@ describe('pubsub base implementation', () => { await pubsub.start() }) - afterEach(() => pubsub.stop()) + afterEach(async () => await pubsub.stop()) it('should remove all subscriptions for a topic', () => { pubsub.subscribe(topic) @@ -227,8 +229,8 @@ describe('pubsub base implementation', () => { pubsubB.start() ]) - const topologyA = registrarA.topologies.get(protocol) - const handlerB = registrarB.streamHandlers.get(protocol) + const topologyA = registrarA.getTopologies(protocol)[0] + const handlerB = registrarB.getHandlers(protocol)[0] if (topologyA == null || handlerB == null) { throw new Error(`No handler registered for ${protocol}`) @@ -241,9 +243,11 @@ describe('pubsub base implementation', () => { await handlerB(await mockIncomingStreamEvent(protocol, c1, peerIdA)) }) - afterEach(() => { - pubsubA.stop() - pubsubB.stop() + afterEach(async () => { + await Promise.all([ + pubsubA.stop(), + pubsubB.stop() + ]) }) it('should send unsubscribe message to connected peers', async () => { @@ -313,7 +317,7 @@ describe('pubsub base implementation', () => { await pubsub.start() }) - afterEach(() => pubsub.stop()) + afterEach(async () => await pubsub.stop()) it('returns the subscribed topics', () => { let subsTopics = pubsub.getTopics() @@ -342,7 +346,7 @@ describe('pubsub base implementation', () => { }) }) - afterEach(() => pubsub.stop()) + afterEach(async () => await pubsub.stop()) it('should fail if pubsub is not started', () => { const topic = 'topic-test' diff --git a/packages/libp2p-pubsub/test/utils/index.ts b/packages/libp2p-pubsub/test/utils/index.ts index b4bc223ba..7aff9699b 100644 --- a/packages/libp2p-pubsub/test/utils/index.ts +++ b/packages/libp2p-pubsub/test/utils/index.ts @@ -5,7 +5,7 @@ import { RPC, IRPC } from '../../src/message/rpc.js' import { CustomEvent } from '@libp2p/interfaces' import type { IncomingStreamData, Registrar, StreamHandler } from '@libp2p/interfaces/registrar' import type { Ed25519PeerId } from '@libp2p/peer-id' -import type { MulticodecTopology } from '@libp2p/topology/multicodec-topology' +import type { Topology } from '@libp2p/interfaces/topology' import type { Connection } from '@libp2p/interfaces/src/connection' import type { PeerId } from '@libp2p/interfaces/src/peer-id' @@ -34,31 +34,73 @@ export class PubsubImplementation extends PubsubBaseProtocol { } export class MockRegistrar implements Registrar { - public readonly topologies: Map = new Map() - public readonly streamHandlers: Map = new Map() + private readonly topologies: Map = new Map() + private readonly handlers: Map = new Map() - async handle (multicodecs: string | string[], handler: StreamHandler) { - if (!Array.isArray(multicodecs)) { - multicodecs = [multicodecs] + async handle (protocols: string | string[], handler: StreamHandler) { + if (!Array.isArray(protocols)) { + protocols = [protocols] } - this.streamHandlers.set(multicodecs[0], handler) + const id = `handler-id-${Math.random()}` + + this.handlers.set(id, { + handler, + protocols + }) + + return id } - async unhandle (multicodec: string) { - this.streamHandlers.delete(multicodec) + async unhandle (id: string) { + this.handlers.delete(id) } - register (topology: MulticodecTopology) { - const { multicodecs } = topology + getHandlers (protocol: string) { + const output: StreamHandler[] = [] + + for (const { handler, protocols } of this.handlers.values()) { + if (protocols.includes(protocol)) { + output.push(handler) + } + } + + return output + } + + register (protocols: string | string[], topology: Topology) { + if (!Array.isArray(protocols)) { + protocols = [protocols] + } + + const id = `topology-id-${Math.random()}` + + this.topologies.set(id, { + topology, + protocols + }) - this.topologies.set(multicodecs[0], topology) + return id + } + + unregister (id: string | string[]) { + if (!Array.isArray(id)) { + id = [id] + } - return multicodecs[0] + id.forEach(id => this.topologies.delete(id)) } - unregister (id: string) { - this.topologies.delete(id) + getTopologies (protocol: string) { + const output: Topology[] = [] + + for (const { topology, protocols } of this.topologies.values()) { + if (protocols.includes(protocol)) { + output.push(topology) + } + } + + return output } } diff --git a/packages/libp2p-topology/src/index.ts b/packages/libp2p-topology/src/index.ts index 3677ce2bd..b6f03970d 100644 --- a/packages/libp2p-topology/src/index.ts +++ b/packages/libp2p-topology/src/index.ts @@ -3,7 +3,7 @@ import type { TopologyOptions, onConnectHandler, onDisconnectHandler } from '@li import type { Registrar } from '@libp2p/interfaces/registrar' const noop = () => {} -const topologySymbol = Symbol.for('@libp2p/js-interfaces/topology') +const topologySymbol = Symbol.for('@libp2p/topology') export class Topology { public min: number @@ -23,12 +23,12 @@ export class Topology { this.max = options.max ?? Infinity this.peers = new Set() - this.onConnect = options.handlers?.onConnect == null ? noop : options.handlers?.onConnect - this.onDisconnect = options.handlers?.onDisconnect == null ? noop : options.handlers?.onDisconnect + this.onConnect = options.onConnect ?? noop + this.onDisconnect = options.onDisconnect ?? noop } get [Symbol.toStringTag] () { - return 'Topology' + return topologySymbol.toString() } get [topologySymbol] () { @@ -38,18 +38,10 @@ export class Topology { /** * Checks if the given value is a Topology instance */ - static isTopology (other: any) { + static isTopology (other: any): other is Topology { return topologySymbol in other } - set registrar (registrar: Registrar | undefined) { - this._registrar = registrar - } - - get registrar () { - return this._registrar - } - /** * Notify about peer disconnected event */ diff --git a/packages/libp2p-topology/src/multicodec-topology.ts b/packages/libp2p-topology/src/multicodec-topology.ts deleted file mode 100644 index 5eab5488a..000000000 --- a/packages/libp2p-topology/src/multicodec-topology.ts +++ /dev/null @@ -1,140 +0,0 @@ -import { Topology } from './index.js' -import all from 'it-all' -import { logger } from '@libp2p/logger' -import type { PeerId } from '@libp2p/interfaces/peer-id' -import type { Peer, PeerStore } from '@libp2p/interfaces/peer-store' -import type { Connection } from '@libp2p/interfaces/connection' -import type { ConnectionManager, Registrar } from '@libp2p/interfaces/registrar' -import type { MulticodecTopologyOptions } from '@libp2p/interfaces/topology' - -const log = logger('libp2p:topology:multicodec-topology') - -interface ChangeProtocolsEvent { - peerId: PeerId - protocols: string[] -} - -const multicodecTopologySymbol = Symbol.for('@libp2p/js-interfaces/topology/multicodec-topology') - -export class MulticodecTopology extends Topology { - public readonly multicodecs: string[] - private readonly peerStore: PeerStore - private readonly connectionManager: ConnectionManager - - constructor (options: MulticodecTopologyOptions) { - super(options) - - this.multicodecs = options.multicodecs - this.peerStore = options.peerStore - this.connectionManager = options.connectionManager - } - - get [Symbol.toStringTag] () { - return 'Topology' - } - - get [multicodecTopologySymbol] () { - return true - } - - /** - * Checks if the given value is a `MulticodecTopology` instance. - */ - static isMulticodecTopology (other: any) { - return Boolean(multicodecTopologySymbol in other) - } - - async setRegistrar (registrar: Registrar | undefined) { - if (registrar == null) { - return - } - - this._registrar = registrar - this.peerStore.addEventListener('change:protocols', (evt) => { - this._onProtocolChange(evt.detail) - }) - this.connectionManager.addEventListener('peer:connect', (evt) => { - this._onPeerConnect(evt.detail) - }) - - // Update topology peers - await this._updatePeers(this.peerStore.getPeers()) - } - - get registrar () { - return this._registrar - } - - /** - * Update topology - */ - async _updatePeers (peerDataIterable: Iterable | AsyncIterable) { - const peerDatas = await all(peerDataIterable) - - for await (const { id, protocols } of peerDatas) { - if (this.multicodecs.filter(multicodec => protocols.includes(multicodec)).length > 0) { - // Add the peer regardless of whether or not there is currently a connection - this.peers.add(id.toString()) - // If there is a connection, call _onConnect - if (this.connectionManager != null) { - const connection = this.connectionManager.getConnection(id) - ;(connection != null) && this.onConnect(id, connection) - } - } else { - // Remove any peers we might be tracking that are no longer of value to us - this.peers.delete(id.toString()) - } - } - } - - /** - * Check if a new peer support the multicodecs for this topology - */ - _onProtocolChange (event: ChangeProtocolsEvent) { - if (this._registrar == null) { - return - } - - const { peerId, protocols } = event - const hadPeer = this.peers.has(peerId.toString()) - const hasProtocol = protocols.filter(protocol => this.multicodecs.includes(protocol)) - - // Not supporting the protocol any more? - if (hadPeer && hasProtocol.length === 0) { - this.onDisconnect(peerId) - } - - let p: Promise | undefined - - // New to protocol support - for (const protocol of protocols) { - if (this.multicodecs.includes(protocol)) { - p = this.peerStore.get(peerId).then(async peerData => await this._updatePeers([peerData])) - break - } - } - - if (p != null) { - p.catch(err => log.error(err)) - } - } - - /** - * Verify if a new connected peer has a topology multicodec and call _onConnect - */ - _onPeerConnect (connection: Connection) { - if (this._registrar == null) { - return - } - - const peerId = connection.remotePeer - this.peerStore.protoBook.get(peerId) - .then(protocols => { - if (this.multicodecs.find(multicodec => protocols.includes(multicodec)) != null) { - this.peers.add(peerId.toString()) - this.onConnect(peerId, connection) - } - }) - .catch(err => log.error(err)) - } -}