From d8a042a32eff8c088119a33f35a5861d08c85fb5 Mon Sep 17 00:00:00 2001 From: achingbrain Date: Tue, 19 Jul 2022 12:41:19 +0100 Subject: [PATCH 01/10] feat: add autonat support Implements the [autonat](https://github.com/libp2p/specs/blob/master/autonat/README.md) spec to give us confidence in our external addresses and to open the door to things like switching DHT server mode on automatically, hole punching, v2 circuit relay etc. Depends on: - [ ] https://github.com/libp2p/js-libp2p-interfaces/pull/269 --- package.json | 1 + src/address-manager/index.ts | 86 +++- src/autonat/constants.ts | 4 + src/autonat/index.ts | 560 +++++++++++++++++++++ src/autonat/pb/index.proto | 35 ++ src/autonat/pb/index.ts | 133 +++++ src/config.ts | 8 + src/identify/index.ts | 11 +- src/index.ts | 2 + src/libp2p.ts | 6 + src/nat-manager.ts | 6 +- test/addresses/address-manager.spec.ts | 30 +- test/addresses/addresses.node.ts | 2 +- test/autonat/index.spec.ts | 669 +++++++++++++++++++++++++ test/core/consume-peer-record.spec.ts | 4 +- 15 files changed, 1512 insertions(+), 45 deletions(-) create mode 100644 src/autonat/constants.ts create mode 100644 src/autonat/index.ts create mode 100644 src/autonat/pb/index.proto create mode 100644 src/autonat/pb/index.ts create mode 100644 test/autonat/index.spec.ts diff --git a/package.json b/package.json index f254345301..a99e1f0d51 100644 --- a/package.json +++ b/package.json @@ -83,6 +83,7 @@ "prepublishOnly": "node scripts/update-version.js", "build": "aegir build", "generate": "run-s generate:proto:*", + "generate:proto:autonat": "protons ./src/autonat/pb/index.proto", "generate:proto:circuit": "protons ./src/circuit/pb/index.proto", "generate:proto:fetch": "protons ./src/fetch/pb/proto.proto", "generate:proto:identify": "protons ./src/identify/pb/message.proto", diff --git a/src/address-manager/index.ts b/src/address-manager/index.ts index e011bde4f7..59536323c4 100644 --- a/src/address-manager/index.ts +++ b/src/address-manager/index.ts @@ -3,6 +3,7 @@ import { CustomEvent, EventEmitter } from '@libp2p/interfaces/events' import { Multiaddr } from '@multiformats/multiaddr' import { peerIdFromString } from '@libp2p/peer-id' import type { Components } from '@libp2p/components' +import type { PeerId } from '@libp2p/interface-peer-id' export interface AddressManagerInit { announceFilter?: AddressFilter @@ -29,11 +30,31 @@ export interface AddressFilter { const defaultAddressFilter = (addrs: Multiaddr[]): Multiaddr[] => addrs +interface ObservedAddressMetadata { + confident: boolean +} + +function stripPeerId (ma: Multiaddr, peerId: PeerId) { + const observedPeerId = ma.getPeerId() + + // strip our peer id if it has been passed + if (observedPeerId != null) { + const peerId = peerIdFromString(observedPeerId) + + // use same encoding for comparison + if (peerId.equals(peerId)) { + ma = ma.decapsulate(new Multiaddr(`/p2p/${peerId.toString()}`)) + } + } + + return ma +} + export class DefaultAddressManager extends EventEmitter { private readonly components: Components private readonly listen: Set private readonly announce: Set - private readonly observed: Set + private readonly observed: Map private readonly announceFilter: AddressFilter /** @@ -50,7 +71,7 @@ export class DefaultAddressManager extends EventEmitter { this.components = components this.listen = new Set(listen.map(ma => ma.toString())) this.announce = new Set(announce.map(ma => ma.toString())) - this.observed = new Set() + this.observed = new Map() this.announceFilter = init.announceFilter ?? defaultAddressFilter } @@ -72,35 +93,51 @@ export class DefaultAddressManager extends EventEmitter { * Get observed multiaddrs */ getObservedAddrs (): Multiaddr[] { - return Array.from(this.observed).map((a) => new Multiaddr(a)) + return Array.from(this.observed) + .map(([ma]) => new Multiaddr(ma)) } /** * Add peer observed addresses */ - addObservedAddr (addr: string | Multiaddr): void { - let ma = new Multiaddr(addr) - const remotePeer = ma.getPeerId() - - // strip our peer id if it has been passed - if (remotePeer != null) { - const remotePeerId = peerIdFromString(remotePeer) - - // use same encoding for comparison - if (remotePeerId.equals(this.components.getPeerId())) { - ma = ma.decapsulate(new Multiaddr(`/p2p/${this.components.getPeerId().toString()}`)) - } - } - - const addrString = ma.toString() + addObservedAddr (addr: Multiaddr): void { + addr = stripPeerId(addr, this.components.getPeerId()) + const addrString = addr.toString() - // do not trigger the change:addresses event if we already know about this address if (this.observed.has(addrString)) { return } - this.observed.add(addrString) - this.dispatchEvent(new CustomEvent('change:addresses')) + this.observed.set(addrString, { + confident: false + }) + } + + confirmObservedAddr (addr: Multiaddr) { + addr = stripPeerId(addr, this.components.getPeerId()) + const addrString = addr.toString() + + const metadata = this.observed.get(addrString) ?? { + confident: false + } + + const startingConfidence = metadata.confident + + this.observed.set(addrString, { + confident: true + }) + + // only trigger the change:addresses event if our confidence in an address has changed + if (startingConfidence === false) { + this.dispatchEvent(new CustomEvent('change:addresses')) + } + } + + removeObservedAddr (addr: Multiaddr) { + addr = stripPeerId(addr, this.components.getPeerId()) + const addrString = addr.toString() + + this.observed.delete(addrString) } getAddresses (): Multiaddr[] { @@ -111,7 +148,12 @@ export class DefaultAddressManager extends EventEmitter { addrs = this.components.getTransportManager().getAddrs().map(ma => ma.toString()) } - addrs = addrs.concat(this.getObservedAddrs().map(ma => ma.toString())) + // add observed addresses we are confident in + addrs = addrs.concat( + Array.from(this.observed) + .filter(([ma, metadata]) => metadata.confident) + .map(([ma]) => ma) + ) // dedupe multiaddrs const addrSet = new Set(addrs) diff --git a/src/autonat/constants.ts b/src/autonat/constants.ts new file mode 100644 index 0000000000..d76113de8f --- /dev/null +++ b/src/autonat/constants.ts @@ -0,0 +1,4 @@ + +export const PROTOCOL = '/libp2p/autonat/1.0.0' +export const PROTOCOL_VERSION = '1.0.0' +export const PROTOCOL_NAME = 'autonat' diff --git a/src/autonat/index.ts b/src/autonat/index.ts new file mode 100644 index 0000000000..b0352be3b9 --- /dev/null +++ b/src/autonat/index.ts @@ -0,0 +1,560 @@ +import { logger } from '@libp2p/logger' +import { + PROTOCOL +} from './constants.js' +import type { IncomingStreamData } from '@libp2p/interface-registrar' +import type { Startable } from '@libp2p/interfaces/startable' +import type { Components } from '@libp2p/components' +import { TimeoutController } from 'timeout-abort-controller' +import { abortableDuplex } from 'abortable-iterator' +import { Message } from './pb/index.js' +import { pipe } from 'it-pipe' +import first from 'it-first' +import isPrivateIp from 'private-ip' +import { peerIdFromBytes } from '@libp2p/peer-id' +import { multiaddr, protocols } from '@multiformats/multiaddr' +import type { PeerId } from '@libp2p/interface-peer-id' +import type { DefaultConnectionManager } from '../connection-manager/index.js' +import type { Connection } from '@libp2p/interface-connection' +import parallel from 'it-parallel' +import map from 'it-map' +import type { PeerInfo } from '@libp2p/interface-peer-info' +import { createEd25519PeerId } from '@libp2p/peer-id-factory' +import type { DefaultAddressManager } from '../address-manager/index.js' +import * as lp from 'it-length-prefixed' +import { setMaxListeners } from 'events' + +const log = logger('libp2p:autonat') + +// if more than 3 peers manage to dial us on what we believe to be our external +// IP then we are convinced that it is, in fact, our external IP +// https://github.com/libp2p/specs/blob/master/autonat/README.md#autonat-protocol +const REQUIRED_SUCCESSFUL_DIALS = 4 + +// Wait this long before we start to query autonat nodes +const AUTONAT_STARTUP_DELAY = 5000 + +// Only try to verify our external address this often +const AUTONAT_REFRESH_INTERVAL = 60000 + +export interface AutonatServiceInit { + /** + * Allows overriding the protocol prefix used + */ + protocolPrefix: string + + /** + * How long we should wait for a remote peer to verify our external address + */ + timeout: number + + /** + * How long to wait after startup before trying to verify our external address + */ + startupDelay: number + + /** + * Verify our external addresses this often + */ + refreshInterval: number + + /** + * How many parallel inbound autonat streams we allow per-connection + */ + maxInboundStreams: number + + /** + * How many parallel outbound autonat streams we allow per-connection + */ + maxOutboundStreams: number +} + +export class AutonatService implements Startable { + private readonly components: Components + private readonly _init: AutonatServiceInit + private readonly startupDelay: number + private readonly refreshInterval: number + private verifyAddressTimeout?: ReturnType + private started: boolean + + constructor (components: Components, init: AutonatServiceInit) { + this.components = components + this.started = false + this._init = init + this.startupDelay = init.startupDelay ?? AUTONAT_STARTUP_DELAY + this.refreshInterval = init.refreshInterval ?? AUTONAT_REFRESH_INTERVAL + + this._verifyExternalAddresses = this._verifyExternalAddresses.bind(this) + } + + isStarted () { + return this.started + } + + async start () { + if (this.started) { + return + } + + await this.components.getRegistrar().handle(PROTOCOL, (data) => { + void this.handleIncomingAutonatStream(data) + .catch(err => { + log.error(err) + }) + }, { + maxInboundStreams: this._init.maxInboundStreams, + maxOutboundStreams: this._init.maxOutboundStreams + }) + + this.verifyAddressTimeout = setTimeout(this._verifyExternalAddresses, this.startupDelay) + + this.started = true + } + + async stop () { + await this.components.getRegistrar().unhandle(PROTOCOL) + clearTimeout(this.verifyAddressTimeout) + + this.started = false + } + + /** + * Handle an incoming autonat request + */ + async handleIncomingAutonatStream (data: IncomingStreamData): Promise { + const controller = new TimeoutController(this._init.timeout) + + // this controller may be used while dialing lots of peers so prevent MaxListenersExceededWarning + // appearing in the console + try { + // fails on node < 15.4 + setMaxListeners?.(Infinity, controller.signal) + } catch {} + + const ourHosts = this.components.getAddressManager().getAddresses() + .map(ma => ma.toOptions().host) + + try { + const source = abortableDuplex(data.stream, controller.signal) + const self = this + + await pipe( + source, + lp.decode(), + async function * (stream) { + const buf = await first(stream) + + if (buf == null) { + log('No message received') + yield Message.encode({ + type: Message.MessageType.DIAL_RESPONSE, + dialResponse: { + status: Message.ResponseStatus.E_BAD_REQUEST, + statusText: 'No message was sent' + } + }) + + return + } + + let request: Message + + try { + request = Message.decode(buf) + } catch (err) { + log.error('Could not decode message', err) + + yield Message.encode({ + type: Message.MessageType.DIAL_RESPONSE, + dialResponse: { + status: Message.ResponseStatus.E_BAD_REQUEST, + statusText: 'Could not decode message' + } + }) + + return + } + + const dialRequest = request.dial + + if (dialRequest == null) { + log.error('Dial was missing from message') + + yield Message.encode({ + type: Message.MessageType.DIAL_RESPONSE, + dialResponse: { + status: Message.ResponseStatus.E_BAD_REQUEST, + statusText: 'No Dial message found in message' + } + }) + + return + } + + let peerId: PeerId + const peer = dialRequest.peer + + if (peer == null || peer.id == null) { + log.error('PeerId missing from message') + + yield Message.encode({ + type: Message.MessageType.DIAL_RESPONSE, + dialResponse: { + status: Message.ResponseStatus.E_BAD_REQUEST, + statusText: 'missing peer info' + } + }) + + return + } + + try { + peerId = peerIdFromBytes(peer.id) + } catch (err) { + log.error('Invalid PeerId', err) + + yield Message.encode({ + type: Message.MessageType.DIAL_RESPONSE, + dialResponse: { + status: Message.ResponseStatus.E_BAD_REQUEST, + statusText: 'bad peer id' + } + }) + + return + } + + log('Incoming request from %p', peerId) + + // reject any dial requests that arrive via relays + if (!data.connection.remotePeer.equals(peerId)) { + log('Target peer %p did not equal sending peer %p', peerId, data.connection.remotePeer) + + yield Message.encode({ + type: Message.MessageType.DIAL_RESPONSE, + dialResponse: { + status: Message.ResponseStatus.E_BAD_REQUEST, + statusText: 'peer id mismatch' + } + }) + + return + } + + // get a list of multiaddrs to dial + const multiaddrs = peer.addrs + .map(buf => multiaddr(buf)) + .filter(ma => { + const isFromSameHost = ma.toOptions().host === data.connection.remoteAddr.toOptions().host + + log.trace('Request to dial %s was sent from %s is same host %s', ma, data.connection.remoteAddr, isFromSameHost) + // skip any Multiaddrs where the target node's IP does not match the sending node's IP + return isFromSameHost + }) + .filter(ma => { + const host = ma.toOptions().host + const isPublicIp = !isPrivateIp(host) + + log.trace('Host %s was public %s', host, isPublicIp) + // don't try to dial private addresses + return isPublicIp + }) + .filter(ma => { + const host = ma.toOptions().host + const isNotOurHost = !ourHosts.includes(host) + + log.trace('Host %s was not our host %s', host, isNotOurHost) + // don't try to dial nodes on the same host as us + return isNotOurHost + }) + .filter(ma => { + const isSupportedTransport = Boolean(self.components.getTransportManager().transportForMultiaddr(ma)) + + log.trace('Transport for %s is supported %s', ma, isSupportedTransport) + // skip any Multiaddrs that have transports we do not support + return isSupportedTransport + }) + .map(ma => { + if (ma.getPeerId() == null) { + // make sure we have the PeerId as part of the Multiaddr + ma = ma.encapsulate(`/p2p/${peerId.toString()}`) + } + + return ma + }) + + // make sure we have something to dial + if (multiaddrs.length === 0) { + log('No valid multiaddrs for %p in message', peerId) + + yield Message.encode({ + type: Message.MessageType.DIAL_RESPONSE, + dialResponse: { + status: Message.ResponseStatus.E_DIAL_REFUSED, + statusText: 'no dialable addresses' + } + }) + + return + } + + log('Dial multiaddrs %s for peer %p', multiaddrs.map(ma => ma.toString()).join(', '), peerId) + + let errorMessage = '' + let lastMultiaddr = multiaddrs[0] + + for await (const multiaddr of multiaddrs) { + let connection: Connection | undefined + lastMultiaddr = multiaddr + + try { + // use the dialer so we can dial a specific multiaddr instead of every known + // multiaddr for the peer + const dialer = (self.components.getConnectionManager() as DefaultConnectionManager).dialer + connection = await dialer.dial(multiaddr, { + signal: controller.signal + }) + + if (!connection.remoteAddr.equals(multiaddr)) { + log.error('Tried to dial %s but dialed %s', multiaddr, connection.remoteAddr) + throw new Error('Unexpected remote address') + } + + log('Success %p', peerId) + + yield Message.encode({ + type: Message.MessageType.DIAL_RESPONSE, + dialResponse: { + status: Message.ResponseStatus.OK, + addr: connection.remoteAddr.decapsulateCode(protocols('p2p').code).bytes + } + }) + + return + } catch (err: any) { + log('Could not dial %p', peerId, err) + errorMessage = err.message + } finally { + if (connection != null) { + await connection.close() + } + } + } + + yield Message.encode({ + type: Message.MessageType.DIAL_RESPONSE, + dialResponse: { + status: Message.ResponseStatus.E_DIAL_ERROR, + statusText: errorMessage, + addr: lastMultiaddr.bytes + } + }) + }, + lp.encode(), + // pipe to the stream, not the abortable source other wise we + // can't tell the remote when a dial timed out.. + data.stream + ) + } finally { + controller.clear() + } + } + + _verifyExternalAddresses () { + void this.verifyExternalAddresses() + .catch(err => { + log.error(err) + }) + } + + /** + * Our multicodec topology noticed a new peer that supports autonat + */ + async verifyExternalAddresses () { + clearTimeout(this.verifyAddressTimeout) + + // Do not try to push if we are not running + if (!this.isStarted()) { + return + } + + const addressManager = this.components.getAddressManager() as DefaultAddressManager + + const multiaddrs = addressManager.getObservedAddrs() + .filter(ma => { + const options = ma.toOptions() + + return !isPrivateIp(options.host) + }) + + if (multiaddrs.length === 0) { + log('No public addresses found, not requesting verification') + this.verifyAddressTimeout = setTimeout(this._verifyExternalAddresses, this.refreshInterval) + + return + } + + const controller = new TimeoutController(this._init.timeout) + + // this controller may be used while dialing lots of peers so prevent MaxListenersExceededWarning + // appearing in the console + try { + // fails on node < 15.4 + setMaxListeners?.(Infinity, controller.signal) + } catch {} + + const self = this + + try { + log(`verify multiaddrs %s`, multiaddrs.map(ma => ma.toString()).join(', ')) + + const request = Message.encode({ + type: Message.MessageType.DIAL, + dial: { + peer: { + id: this.components.getPeerId().toBytes(), + addrs: multiaddrs.map(map => map.bytes) + } + } + }) + + // find some random peers + const randomPeer = await createEd25519PeerId() + const randomCid = randomPeer.toBytes() + + const results: Record = {} + const networkSegments: string[] = [] + + async function verifyAddress (peer: PeerInfo): Promise { + try { + log('Asking %p to verify multiaddr', peer.id) + + const connection = await self.components.getConnectionManager().openConnection(peer.id, { + signal: controller.signal + }) + + const stream = await connection.newStream(PROTOCOL, { + signal: controller.signal + }) + const source = abortableDuplex(stream, controller.signal) + + const buf = await pipe( + [request], + lp.encode(), + source, + lp.decode(), + async (stream) => await first(stream) + ) + + if (buf == null) { + log('No response received from %s', connection.remotePeer) + return undefined + } + + const response = Message.decode(buf) + + if (response.type !== Message.MessageType.DIAL_RESPONSE || response.dialResponse == null) { + log('Invalid autonat response from %s', connection.remotePeer) + return undefined + } + + if (response.dialResponse.status === Message.ResponseStatus.OK) { + // make sure we use different network segments + const options = connection.remoteAddr.toOptions() + let segment: string + + if (options.family === 4) { + const octets = options.host.split('.') + segment = octets[0] + } else if (options.family === 6) { + const octets = options.host.split(':') + segment = octets[0] + } else { + log('Remote address "%s" was not IP4 or IP6?', options.host) + return undefined + } + + if (networkSegments.includes(segment)) { + log('Already have response from network segment %d - %s', segment, options.host) + return undefined + } + + networkSegments.push(segment) + } + + return response.dialResponse + } catch (err) { + log.error(err) + } + } + + for await (const dialResponse of parallel(map(this.components.getPeerRouting().getClosestPeers(randomCid, { + signal: controller.signal + }), (peer) => async () => await verifyAddress(peer)), { + concurrency: REQUIRED_SUCCESSFUL_DIALS + })) { + try { + if (dialResponse == null) { + continue + } + + // they either told us which address worked/didn't work, or we only sent them one address + const addr = dialResponse.addr == null ? multiaddrs[0] : multiaddr(dialResponse.addr) + + log('Autonat response for %s is %s', addr, dialResponse.status) + + if (dialResponse.status === Message.ResponseStatus.E_BAD_REQUEST) { + // the remote could not parse our request + continue + } + + if (dialResponse.status === Message.ResponseStatus.E_DIAL_REFUSED) { + // the remote could not honour our request + continue + } + + if (dialResponse.addr == null && multiaddrs.length > 1) { + // we sent the remote multiple addrs but they didn't tell us which ones worked/didn't work + continue + } + + if (!multiaddrs.some(ma => ma.equals(addr))) { + log('Peer reported %s as %s but it was not in our observed address list', addr, dialResponse.status) + continue + } + + const addrStr = addr.toString() + + if (results[addrStr] == null) { + results[addrStr] = { success: 0, failure: 0 } + } + + if (dialResponse.status === Message.ResponseStatus.OK) { + results[addrStr].success++ + } else if (dialResponse.status === Message.ResponseStatus.E_DIAL_ERROR) { + results[addrStr].failure++ + } + + if (results[addrStr].success === REQUIRED_SUCCESSFUL_DIALS) { + // we are now convinced + log('%s is externally dialable', addr) + addressManager.confirmObservedAddr(addr) + return + } + + if (results[addrStr].failure === REQUIRED_SUCCESSFUL_DIALS) { + // we are now unconvinced + log('%s is not externally dialable', addr) + addressManager.removeObservedAddr(addr) + return + } + } catch (err) { + log.error('Could not verify external address', err) + } + } + } finally { + controller.clear() + this.verifyAddressTimeout = setTimeout(this._verifyExternalAddresses, this.refreshInterval) + } + } +} diff --git a/src/autonat/pb/index.proto b/src/autonat/pb/index.proto new file mode 100644 index 0000000000..0bcd0db561 --- /dev/null +++ b/src/autonat/pb/index.proto @@ -0,0 +1,35 @@ +syntax = "proto3"; + +message Message { + enum MessageType { + DIAL = 0; + DIAL_RESPONSE = 1; + } + + enum ResponseStatus { + OK = 0; + E_DIAL_ERROR = 100; + E_DIAL_REFUSED = 101; + E_BAD_REQUEST = 200; + E_INTERNAL_ERROR = 300; + } + + message PeerInfo { + optional bytes id = 1; + repeated bytes addrs = 2; + } + + message Dial { + optional PeerInfo peer = 1; + } + + message DialResponse { + optional ResponseStatus status = 1; + optional string statusText = 2; + optional bytes addr = 3; + } + + optional MessageType type = 1; + optional Dial dial = 2; + optional DialResponse dialResponse = 3; +} \ No newline at end of file diff --git a/src/autonat/pb/index.ts b/src/autonat/pb/index.ts new file mode 100644 index 0000000000..7efe88a1b9 --- /dev/null +++ b/src/autonat/pb/index.ts @@ -0,0 +1,133 @@ +/* eslint-disable import/export */ +/* eslint-disable @typescript-eslint/no-namespace */ + +import { enumeration, encodeMessage, decodeMessage, message, bytes, string } from 'protons-runtime' +import type { Codec } from 'protons-runtime' + +export interface Message { + type?: Message.MessageType + dial?: Message.Dial + dialResponse?: Message.DialResponse +} + +export namespace Message { + export enum MessageType { + DIAL = 'DIAL', + DIAL_RESPONSE = 'DIAL_RESPONSE' + } + + enum __MessageTypeValues { + DIAL = 0, + DIAL_RESPONSE = 1 + } + + export namespace MessageType { + export const codec = () => { + return enumeration(__MessageTypeValues) + } + } + + export enum ResponseStatus { + OK = 'OK', + E_DIAL_ERROR = 'E_DIAL_ERROR', + E_DIAL_REFUSED = 'E_DIAL_REFUSED', + E_BAD_REQUEST = 'E_BAD_REQUEST', + E_INTERNAL_ERROR = 'E_INTERNAL_ERROR' + } + + enum __ResponseStatusValues { + OK = 0, + E_DIAL_ERROR = 100, + E_DIAL_REFUSED = 101, + E_BAD_REQUEST = 200, + E_INTERNAL_ERROR = 300 + } + + export namespace ResponseStatus { + export const codec = () => { + return enumeration(__ResponseStatusValues) + } + } + + export interface PeerInfo { + id?: Uint8Array + addrs: Uint8Array[] + } + + export namespace PeerInfo { + export const codec = (): Codec => { + return message({ + 1: { name: 'id', codec: bytes, optional: true }, + 2: { name: 'addrs', codec: bytes, repeats: true } + }) + } + + export const encode = (obj: PeerInfo): Uint8Array => { + return encodeMessage(obj, PeerInfo.codec()) + } + + export const decode = (buf: Uint8Array): PeerInfo => { + return decodeMessage(buf, PeerInfo.codec()) + } + } + + export interface Dial { + peer?: Message.PeerInfo + } + + export namespace Dial { + export const codec = (): Codec => { + return message({ + 1: { name: 'peer', codec: Message.PeerInfo.codec(), optional: true } + }) + } + + export const encode = (obj: Dial): Uint8Array => { + return encodeMessage(obj, Dial.codec()) + } + + export const decode = (buf: Uint8Array): Dial => { + return decodeMessage(buf, Dial.codec()) + } + } + + export interface DialResponse { + status?: Message.ResponseStatus + statusText?: string + addr?: Uint8Array + } + + export namespace DialResponse { + export const codec = (): Codec => { + return message({ + 1: { name: 'status', codec: Message.ResponseStatus.codec(), optional: true }, + 2: { name: 'statusText', codec: string, optional: true }, + 3: { name: 'addr', codec: bytes, optional: true } + }) + } + + export const encode = (obj: DialResponse): Uint8Array => { + return encodeMessage(obj, DialResponse.codec()) + } + + export const decode = (buf: Uint8Array): DialResponse => { + return decodeMessage(buf, DialResponse.codec()) + } + } + + export const codec = (): Codec => { + return message({ + 1: { name: 'type', codec: Message.MessageType.codec(), optional: true }, + 2: { name: 'dial', codec: Message.Dial.codec(), optional: true }, + 3: { name: 'dialResponse', codec: Message.DialResponse.codec(), optional: true } + }) + } + + export const encode = (obj: Message): Uint8Array => { + return encodeMessage(obj, Message.codec()) + } + + export const decode = (buf: Uint8Array): Message => { + return decodeMessage(buf, Message.codec()) + } +} diff --git a/src/config.ts b/src/config.ts index a3acf51a2c..954c971971 100644 --- a/src/config.ts +++ b/src/config.ts @@ -100,6 +100,14 @@ const DefaultConfig: Partial = { maxInboundStreams: 1, maxOutboundStreams: 1, timeout: 10000 + }, + autonat: { + protocolPrefix: 'libp2p', + maxInboundStreams: 1, + maxOutboundStreams: 1, + timeout: 30000, + startupDelay: 5000, + refreshInterval: 60000 } } diff --git a/src/identify/index.ts b/src/identify/index.ts index f868a631ac..3088a1cfd6 100644 --- a/src/identify/index.ts +++ b/src/identify/index.ts @@ -371,10 +371,15 @@ export class IdentifyService implements Startable { } log('identify completed for peer %p and protocols %o', id, protocols) + log('our observed address is %s', cleanObservedAddr) - // TODO: Add and score our observed addr - log('received observed address of %s', cleanObservedAddr?.toString()) - // this.components.getAddressManager().addObservedAddr(observedAddr) + /* + if (cleanObservedAddr != null) { + // TODO: Add and score our observed addr + log('received observed address of %s', cleanObservedAddr?.toString()) + this.components.getAddressManager().addObservedAddr(cleanObservedAddr) + } + */ } /** diff --git a/src/index.ts b/src/index.ts index d5c3830709..137b1495ef 100644 --- a/src/index.ts +++ b/src/index.ts @@ -26,6 +26,7 @@ import type { KeyChain } from './keychain/index.js' import type { ConnectionManagerInit } from './connection-manager/index.js' import type { PingServiceInit } from './ping/index.js' import type { FetchServiceInit } from './fetch/index.js' +import type { AutonatServiceInit } from './autonat/index.js' export interface PersistentPeerStoreOptions { threshold?: number @@ -112,6 +113,7 @@ export interface Libp2pInit { identify: IdentifyServiceInit ping: PingServiceInit fetch: FetchServiceInit + autonat: AutonatServiceInit transports: Transport[] streamMuxers?: StreamMuxerFactory[] diff --git a/src/libp2p.ts b/src/libp2p.ts index 0a5a435df5..64fc77c3ce 100644 --- a/src/libp2p.ts +++ b/src/libp2p.ts @@ -28,6 +28,7 @@ import { PersistentPeerStore } from '@libp2p/peer-store' import { DHTContentRouting } from './dht/dht-content-routing.js' import { AutoDialer } from './connection-manager/dialer/auto-dialer.js' import { Initializable, Components, isInitializable } from '@libp2p/components' +import { AutonatService } from './autonat/index.js' import type { PeerId } from '@libp2p/interface-peer-id' import type { Connection } from '@libp2p/interface-connection' import type { PeerRouting } from '@libp2p/interface-peer-routing' @@ -59,6 +60,7 @@ export class Libp2pNode extends EventEmitter implements Libp2p { public identifyService?: IdentifyService public fetchService: FetchService public pingService: PingService + public autonatService: AutonatService public components: Components public peerStore: PeerStore public contentRouting: ContentRouting @@ -234,6 +236,10 @@ export class Libp2pNode extends EventEmitter implements Libp2p { ...init.ping })) + this.autonatService = this.configureComponent(new AutonatService(this.components, { + ...init.autonat + })) + const autoDialer = this.configureComponent(new AutoDialer(this.components, { enabled: init.connectionManager.autoDial !== false, minConnections: init.connectionManager.minConnections, diff --git a/src/nat-manager.ts b/src/nat-manager.ts index 7e9cb0c61b..056f7fec69 100644 --- a/src/nat-manager.ts +++ b/src/nat-manager.ts @@ -157,11 +157,13 @@ export class NatManager implements Startable { protocol: transport.toUpperCase() === 'TCP' ? 'TCP' : 'UDP' }) - this.components.getAddressManager().addObservedAddr(Multiaddr.fromNodeAddress({ + const publicAddr = Multiaddr.fromNodeAddress({ family: 4, address: publicIp, port: publicPort - }, transport)) + }, transport) + + this.components.getAddressManager().addObservedAddr(publicAddr) } } diff --git a/test/addresses/address-manager.spec.ts b/test/addresses/address-manager.spec.ts index 9095e62080..49d367d6c8 100644 --- a/test/addresses/address-manager.spec.ts +++ b/test/addresses/address-manager.spec.ts @@ -1,7 +1,7 @@ /* eslint-env mocha */ import { expect } from 'aegir/chai' -import { Multiaddr, protocols } from '@multiformats/multiaddr' +import { multiaddr, Multiaddr, protocols } from '@multiformats/multiaddr' import { AddressFilter, DefaultAddressManager } from '../../src/address-manager/index.js' import { createNode } from '../utils/creators/peer.js' import { createFromJSON } from '@libp2p/peer-id-factory' @@ -80,13 +80,13 @@ describe('Address Manager', () => { expect(am.getObservedAddrs()).to.be.empty() - am.addObservedAddr('/ip4/123.123.123.123/tcp/39201') + am.addObservedAddr(multiaddr('/ip4/123.123.123.123/tcp/39201')) expect(am.getObservedAddrs()).to.have.lengthOf(1) }) it('should dedupe added observed addresses', () => { - const ma = '/ip4/123.123.123.123/tcp/39201' + const ma = multiaddr('/ip4/123.123.123.123/tcp/39201') const am = new DefaultAddressManager(new Components({ peerId, transportManager: stubInterface() @@ -101,11 +101,11 @@ describe('Address Manager', () => { am.addObservedAddr(ma) expect(am.getObservedAddrs()).to.have.lengthOf(1) - expect(am.getObservedAddrs().map(ma => ma.toString())).to.include(ma) + expect(am.getObservedAddrs().map(ma => ma.toString())).to.include(ma.toString()) }) it('should only emit one change:addresses event', () => { - const ma = '/ip4/123.123.123.123/tcp/39201' + const ma = multiaddr('/ip4/123.123.123.123/tcp/39201') const am = new DefaultAddressManager(new Components({ peerId, transportManager: stubInterface() @@ -118,16 +118,16 @@ describe('Address Manager', () => { eventCount++ }) - am.addObservedAddr(ma) - am.addObservedAddr(ma) - am.addObservedAddr(ma) - am.addObservedAddr(`${ma}/p2p/${peerId.toString()}`) + am.confirmObservedAddr(ma) + am.confirmObservedAddr(ma) + am.confirmObservedAddr(ma) + am.confirmObservedAddr(multiaddr(`${ma}/p2p/${peerId.toString()}`)) expect(eventCount).to.equal(1) }) it('should strip our peer address from added observed addresses', () => { - const ma = '/ip4/123.123.123.123/tcp/39201' + const ma = multiaddr('/ip4/123.123.123.123/tcp/39201') const am = new DefaultAddressManager(new Components({ peerId, transportManager: stubInterface() @@ -138,14 +138,14 @@ describe('Address Manager', () => { expect(am.getObservedAddrs()).to.be.empty() am.addObservedAddr(ma) - am.addObservedAddr(`${ma}/p2p/${peerId.toString()}`) + am.addObservedAddr(multiaddr(`${ma}/p2p/${peerId.toString()}`)) expect(am.getObservedAddrs()).to.have.lengthOf(1) - expect(am.getObservedAddrs().map(ma => ma.toString())).to.include(ma) + expect(am.getObservedAddrs().map(ma => ma.toString())).to.include(ma.toString()) }) it('should strip our peer address from added observed addresses in difference formats', () => { - const ma = '/ip4/123.123.123.123/tcp/39201' + const ma = multiaddr('/ip4/123.123.123.123/tcp/39201') const am = new DefaultAddressManager(new Components({ peerId, transportManager: stubInterface() @@ -156,10 +156,10 @@ describe('Address Manager', () => { expect(am.getObservedAddrs()).to.be.empty() am.addObservedAddr(ma) - am.addObservedAddr(`${ma}/p2p/${peerId.toString()}`) + am.addObservedAddr(multiaddr(`${ma}/p2p/${peerId.toString()}`)) expect(am.getObservedAddrs()).to.have.lengthOf(1) - expect(am.getObservedAddrs().map(ma => ma.toString())).to.include(ma) + expect(am.getObservedAddrs().map(ma => ma.toString())).to.include(ma.toString()) }) }) diff --git a/test/addresses/addresses.node.ts b/test/addresses/addresses.node.ts index 836a0f11fa..e6b77f9391 100644 --- a/test/addresses/addresses.node.ts +++ b/test/addresses/addresses.node.ts @@ -165,7 +165,7 @@ describe('libp2p.multiaddrs', () => { expect(libp2p.components.getAddressManager().getAddresses()).to.have.lengthOf(listenAddresses.length) - libp2p.components.getAddressManager().addObservedAddr(new Multiaddr(ma)) + libp2p.components.getAddressManager().confirmObservedAddr(new Multiaddr(ma)) expect(libp2p.components.getAddressManager().getAddresses()).to.have.lengthOf(listenAddresses.length + 1) expect(libp2p.components.getAddressManager().getAddresses().map(ma => ma.decapsulateCode(protocols('p2p').code).toString())).to.include(ma) diff --git a/test/autonat/index.spec.ts b/test/autonat/index.spec.ts new file mode 100644 index 0000000000..cb2fa9d5e0 --- /dev/null +++ b/test/autonat/index.spec.ts @@ -0,0 +1,669 @@ +/* eslint-env mocha */ +/* eslint max-nested-callbacks: ["error", 5] */ + +import { expect } from 'aegir/chai' +import sinon from 'sinon' +import { createEd25519PeerId } from '@libp2p/peer-id-factory' +import { Components } from '@libp2p/components' +import { start, stop } from '@libp2p/interfaces/startable' +import { AutonatService, AutonatServiceInit } from '../../src/autonat/index.js' +import { StubbedInstance, stubInterface } from 'ts-sinon' +import type { PeerRouting } from '@libp2p/interface-peer-routing' +import { Multiaddr, multiaddr } from '@multiformats/multiaddr' +import type { Registrar } from '@libp2p/interface-registrar' +import type { AddressManager } from '@libp2p/interface-address-manager' +import type { Connection, Stream } from '@libp2p/interface-connection' +import { PROTOCOL } from '../../src/autonat/constants.js' +import { Message } from '../../src/autonat/pb/index.js' +import type { PeerId } from '@libp2p/interface-peer-id' +import { pushable } from 'it-pushable' +import type { Transport, TransportManager } from '@libp2p/interface-transport' +import type { AddressBook, PeerStore } from '@libp2p/interface-peer-store' +import type { DefaultConnectionManager } from '../../src/connection-manager/index.js' +import type { Dialer } from '../../src/connection-manager/dialer/index.js' +import * as lp from 'it-length-prefixed' +import all from 'it-all' +import { pipe } from 'it-pipe' + +const defaultInit: AutonatServiceInit = { + protocolPrefix: 'libp2p', + maxInboundStreams: 1, + maxOutboundStreams: 1, + timeout: 100, + startupDelay: 120000, + refreshInterval: 120000 +} + +describe('autonat', () => { + let service: AutonatService + let components: Components + let peerRouting: StubbedInstance + let registrar: StubbedInstance + let addressManager: StubbedInstance + let connectionManager: StubbedInstance + let dialer: StubbedInstance + let transportManager: StubbedInstance + let peerStore: StubbedInstance + + beforeEach(async () => { + peerRouting = stubInterface() + registrar = stubInterface() + addressManager = stubInterface() + addressManager.getAddresses.returns([]) + + dialer = stubInterface() + connectionManager = stubInterface() + // @ts-expect-error read-only property + connectionManager.dialer = dialer + transportManager = stubInterface() + peerStore = stubInterface() + peerStore.addressBook = stubInterface() + + components = new Components({ + peerId: await createEd25519PeerId(), + peerRouting, + registrar, + addressManager, + connectionManager, + transportManager, + peerStore + }) + + service = new AutonatService(components, defaultInit) + + await start(components) + await start(service) + }) + + afterEach(async () => { + sinon.restore() + + await stop(service) + await stop(components) + }) + + describe('verify our observed addresses', () => { + async function stubPeerResponse (host: string, dialResponse: Message.DialResponse, peerId?: PeerId) { + // stub random peer lookup + const peer = { + id: peerId ?? await createEd25519PeerId(), + multiaddrs: [], + protocols: [] + } + + // stub connection to remote peer + const connection = stubInterface() + connection.remoteAddr = multiaddr(`/ip4/${host}/tcp/28319/p2p/${peer.id.toString()}`) + connectionManager.openConnection.withArgs(peer.id).resolves(connection) + + // stub autonat protocol stream + const stream = stubInterface() + connection.newStream.withArgs(PROTOCOL).resolves(stream) + + // stub autonat response + const response = Message.encode({ + type: Message.MessageType.DIAL_RESPONSE, + dialResponse + }) + stream.source = (async function * () { + yield lp.varintEncode(response.length) + yield response + }()) + stream.sink.returns(Promise.resolve()) + + return peer + } + + it('should request peers verify our observed address', async () => { + const observedAddress = multiaddr('/ip4/123.123.123.123/tcp/28319') + addressManager.getObservedAddrs.returns([observedAddress]) + + // The network says OK + const peers = [ + await stubPeerResponse('124.124.124.124', { + status: Message.ResponseStatus.OK + }), + await stubPeerResponse('125.124.124.124', { + status: Message.ResponseStatus.OK + }), + await stubPeerResponse('126.124.124.124', { + status: Message.ResponseStatus.OK + }), + await stubPeerResponse('127.124.124.124', { + status: Message.ResponseStatus.OK + }) + ] + + peerRouting.getClosestPeers.returns(async function * () { + yield * peers + }()) + + await service.verifyExternalAddresses() + + expect(addressManager.confirmObservedAddr.calledWith(observedAddress)) + .to.be.true('Did not confirm observed multiaddr') + }) + + it('should mark observed address as low confidence when dialing fails', async () => { + const observedAddress = multiaddr('/ip4/123.123.123.123/tcp/28319') + addressManager.getObservedAddrs.returns([observedAddress]) + + // The network says ERROR + const peers = [ + await stubPeerResponse('124.124.124.124', { + status: Message.ResponseStatus.E_DIAL_ERROR + }), + await stubPeerResponse('125.124.124.124', { + status: Message.ResponseStatus.E_DIAL_ERROR + }), + await stubPeerResponse('126.124.124.124', { + status: Message.ResponseStatus.E_DIAL_ERROR + }), + await stubPeerResponse('127.124.124.124', { + status: Message.ResponseStatus.E_DIAL_ERROR + }) + ] + + peerRouting.getClosestPeers.returns(async function * () { + yield * peers + }()) + + await service.verifyExternalAddresses() + + expect(addressManager.removeObservedAddr.calledWith(observedAddress)) + .to.be.true('Did not verify external multiaddr') + }) + + it('should ignore non error or success statuses', async () => { + const observedAddress = multiaddr('/ip4/123.123.123.123/tcp/28319') + addressManager.getObservedAddrs.returns([observedAddress]) + + // Mix of responses, mostly OK + const peers = [ + await stubPeerResponse('124.124.124.124', { + status: Message.ResponseStatus.OK + }), + await stubPeerResponse('125.124.124.124', { + status: Message.ResponseStatus.OK + }), + await stubPeerResponse('126.124.124.124', { + status: Message.ResponseStatus.OK + }), + await stubPeerResponse('127.124.124.124', { + status: Message.ResponseStatus.E_DIAL_ERROR + }), + await stubPeerResponse('128.124.124.124', { + status: Message.ResponseStatus.E_DIAL_REFUSED + }), + await stubPeerResponse('129.124.124.124', { + status: Message.ResponseStatus.E_INTERNAL_ERROR + }), + await stubPeerResponse('139.124.124.124', { + status: Message.ResponseStatus.OK + }) + ] + + peerRouting.getClosestPeers.returns(async function * () { + yield * peers + }()) + + await service.verifyExternalAddresses() + + expect(addressManager.confirmObservedAddr.calledWith(observedAddress)) + .to.be.true('Did not confirm external multiaddr') + + expect(connectionManager.openConnection.callCount) + .to.equal(peers.length, 'Did not open connections to all peers') + }) + + it('should require confirmation from diverse networks', async () => { + const observedAddress = multiaddr('/ip4/123.123.123.123/tcp/28319') + addressManager.getObservedAddrs.returns([observedAddress]) + + // an attacker says OK, the rest of the network says ERROR + const peers = [ + await stubPeerResponse('124.124.124.124', { + status: Message.ResponseStatus.OK + }), + await stubPeerResponse('124.124.124.125', { + status: Message.ResponseStatus.OK + }), + await stubPeerResponse('124.124.124.126', { + status: Message.ResponseStatus.OK + }), + await stubPeerResponse('124.124.124.127', { + status: Message.ResponseStatus.OK + }), + await stubPeerResponse('127.124.124.124', { + status: Message.ResponseStatus.E_DIAL_ERROR + }), + await stubPeerResponse('128.124.124.124', { + status: Message.ResponseStatus.E_DIAL_ERROR + }), + await stubPeerResponse('129.124.124.124', { + status: Message.ResponseStatus.E_DIAL_ERROR + }), + await stubPeerResponse('130.124.124.124', { + status: Message.ResponseStatus.E_DIAL_ERROR + }) + ] + + peerRouting.getClosestPeers.returns(async function * () { + yield * peers + }()) + + await service.verifyExternalAddresses() + + expect(addressManager.removeObservedAddr.calledWith(observedAddress)) + .to.be.true('Did not verify external multiaddr') + + expect(connectionManager.openConnection.callCount) + .to.equal(peers.length, 'Did not open connections to all peers') + }) + + it('should require confirmation from diverse peers', async () => { + const observedAddress = multiaddr('/ip4/123.123.123.123/tcp/28319') + addressManager.getObservedAddrs.returns([observedAddress]) + + const peerId = await createEd25519PeerId() + + // an attacker says OK, the rest of the network says ERROR + const peers = [ + await stubPeerResponse('124.124.124.124', { + status: Message.ResponseStatus.OK + }, peerId), + await stubPeerResponse('125.124.124.125', { + status: Message.ResponseStatus.OK + }, peerId), + await stubPeerResponse('126.124.124.126', { + status: Message.ResponseStatus.OK + }, peerId), + await stubPeerResponse('127.124.124.127', { + status: Message.ResponseStatus.OK + }, peerId), + await stubPeerResponse('128.124.124.124', { + status: Message.ResponseStatus.E_DIAL_ERROR + }), + await stubPeerResponse('129.124.124.124', { + status: Message.ResponseStatus.E_DIAL_ERROR + }), + await stubPeerResponse('130.124.124.124', { + status: Message.ResponseStatus.E_DIAL_ERROR + }), + await stubPeerResponse('131.124.124.124', { + status: Message.ResponseStatus.E_DIAL_ERROR + }) + ] + + peerRouting.getClosestPeers.returns(async function * () { + yield * peers + }()) + + await service.verifyExternalAddresses() + + expect(addressManager.removeObservedAddr.calledWith(observedAddress)) + .to.be.true('Did not verify external multiaddr') + + expect(connectionManager.openConnection.callCount) + .to.equal(peers.length, 'Did not open connections to all peers') + }) + + it('should only accept observed addresses', async () => { + const observedAddress = multiaddr('/ip4/123.123.123.123/tcp/28319') + const reportedAddress = multiaddr('/ip4/100.100.100.100/tcp/28319') + + // our observed addresses + addressManager.getObservedAddrs.returns([observedAddress]) + + // an attacker says OK, the rest of the network says ERROR + const peers = [ + await stubPeerResponse('124.124.124.124', { + status: Message.ResponseStatus.OK, + addr: reportedAddress.bytes + }), + await stubPeerResponse('125.124.124.125', { + status: Message.ResponseStatus.OK, + addr: reportedAddress.bytes + }), + await stubPeerResponse('126.124.124.126', { + status: Message.ResponseStatus.OK, + addr: reportedAddress.bytes + }), + await stubPeerResponse('127.124.124.127', { + status: Message.ResponseStatus.OK, + addr: reportedAddress.bytes + }), + await stubPeerResponse('128.124.124.124', { + status: Message.ResponseStatus.E_DIAL_ERROR + }), + await stubPeerResponse('129.124.124.124', { + status: Message.ResponseStatus.E_DIAL_ERROR + }), + await stubPeerResponse('130.124.124.124', { + status: Message.ResponseStatus.E_DIAL_ERROR + }), + await stubPeerResponse('131.124.124.124', { + status: Message.ResponseStatus.E_DIAL_ERROR + }) + ] + + peerRouting.getClosestPeers.returns(async function * () { + yield * peers + }()) + + await service.verifyExternalAddresses() + + expect(addressManager.removeObservedAddr.calledWith(observedAddress)) + .to.be.true('Did not verify external multiaddr') + + expect(connectionManager.openConnection.callCount) + .to.equal(peers.length, 'Did not open connections to all peers') + }) + + it('should time out when verifying an observed address', async () => { + const observedAddress = multiaddr('/ip4/123.123.123.123/tcp/28319') + addressManager.getObservedAddrs.returns([observedAddress]) + + // The network says OK + const peers = [ + await stubPeerResponse('124.124.124.124', { + status: Message.ResponseStatus.OK + }), + await stubPeerResponse('125.124.124.124', { + status: Message.ResponseStatus.OK + }), + await stubPeerResponse('126.124.124.124', { + status: Message.ResponseStatus.OK + }), + await stubPeerResponse('127.124.124.124', { + status: Message.ResponseStatus.OK + }) + ] + + peerRouting.getClosestPeers.returns(async function * () { + yield * peers + }()) + + connectionManager.openConnection.reset() + connectionManager.openConnection.callsFake(async (peer, options = {}) => { + return await Promise.race([ + new Promise((resolve, reject) => { + options.signal?.addEventListener('abort', () => { + reject(new Error('Dial aborted!')) + }) + }), + new Promise((resolve, reject) => { + // longer than the timeout + setTimeout(() => { + reject(new Error('Dial Timeout!')) + }, 1000) + }) + ]) + }) + + await service.verifyExternalAddresses() + + expect(addressManager.addObservedAddr.called) + .to.be.false('Verify external multiaddr when we should have timed out') + }) + }) + + describe('verify others observed addresses', () => { + async function stubIncomingStream (opts: { + requestingPeer?: PeerId + remotePeer?: PeerId + observedAddress?: Multiaddr + remoteAddr?: Multiaddr + message?: Message | Uint8Array | boolean + transportSupported?: boolean + canDial?: boolean + } = {}) { + const requestingPeer = opts.requestingPeer ?? await createEd25519PeerId() + const remotePeer = opts.remotePeer ?? requestingPeer + const observedAddress = opts.observedAddress ?? multiaddr('/ip4/124.124.124.124/tcp/28319') + const remoteAddr = opts.remoteAddr ?? observedAddress.encapsulate(`/p2p/${remotePeer.toString()}`) + const source = pushable() + const sink = pushable() + const stream: Stream = { + ...stubInterface(), + source, + sink: async (stream) => { + for await (const buf of stream) { + sink.push(buf) + } + + sink.end() + } + } + const connection = { + ...stubInterface(), + remotePeer, + remoteAddr + } + + // we might support this transport + transportManager.transportForMultiaddr.withArgs(observedAddress) + .returns(opts.transportSupported === false ? undefined : stubInterface()) + + // we might open a new connection + const newConnection = stubInterface() + newConnection.remotePeer = remotePeer + newConnection.remoteAddr = remoteAddr + + if (opts.canDial === false) { + dialer.dial.rejects(new Error('Could not dial')) + } else if (opts.canDial === true) { + dialer.dial.resolves(newConnection) + } + + let buf: Uint8Array | undefined + + if (opts.message instanceof Uint8Array) { + buf = opts.message + } else if (opts.message == null) { + buf = Message.encode({ + type: Message.MessageType.DIAL, + dial: { + peer: { + id: requestingPeer.toBytes(), + addrs: [ + observedAddress.bytes + ] + } + } + }) + } else if (opts.message !== false && opts.message !== true) { + buf = Message.encode(opts.message) + } + + if (buf != null) { + source.push(lp.varintEncode(buf.byteLength)) + source.push(buf) + } + + source.end() + + await service.handleIncomingAutonatStream({ + stream, + connection + }) + + const slice = await pipe( + sink, + lp.decode(), + source => all(source) + ) + + if (slice.length !== 1) { + throw new Error('Response was not length encoded') + } + + const message = Message.decode(slice[0]) + + if (message.dialResponse?.status === Message.ResponseStatus.OK) { + expect(newConnection.close.called).to.be.true('Did not close connection after dial') + } + + return message + } + + it('should dial a requested address', async () => { + const message = await stubIncomingStream({ + canDial: true + }) + + expect(message).to.have.property('type', Message.MessageType.DIAL_RESPONSE) + expect(message).to.have.nested.property('dialResponse.status', Message.ResponseStatus.OK) + }) + + it('should expect a message', async () => { + const message = await stubIncomingStream({ + message: false + }) + + expect(message).to.have.property('type', Message.MessageType.DIAL_RESPONSE) + expect(message).to.have.nested.property('dialResponse.status', Message.ResponseStatus.E_BAD_REQUEST) + expect(message).to.have.nested.property('dialResponse.statusText', 'No message was sent') + }) + + it('should expect a valid message', async () => { + const message = await stubIncomingStream({ + message: Uint8Array.from([3, 2, 1, 0]) + }) + + expect(message).to.have.property('type', Message.MessageType.DIAL_RESPONSE) + expect(message).to.have.nested.property('dialResponse.status', Message.ResponseStatus.E_BAD_REQUEST) + expect(message).to.have.nested.property('dialResponse.statusText', 'Could not decode message') + }) + + it('should expect a dial message', async () => { + const message = await stubIncomingStream({ + message: {} + }) + + expect(message).to.have.property('type', Message.MessageType.DIAL_RESPONSE) + expect(message).to.have.nested.property('dialResponse.status', Message.ResponseStatus.E_BAD_REQUEST) + expect(message).to.have.nested.property('dialResponse.statusText', 'No Dial message found in message') + }) + + it('should expect a message with a peer id', async () => { + const observedAddress = multiaddr('/ip4/124.124.124.124/tcp/28319') + const message = await stubIncomingStream({ + observedAddress, + message: { + type: Message.MessageType.DIAL, + dial: { + peer: { + addrs: [ + observedAddress.bytes + ] + } + } + } + }) + + expect(message).to.have.property('type', Message.MessageType.DIAL_RESPONSE) + expect(message).to.have.nested.property('dialResponse.status', Message.ResponseStatus.E_BAD_REQUEST) + expect(message).to.have.nested.property('dialResponse.statusText', 'missing peer info') + }) + + it('should expect a message with a valid peer id', async () => { + const observedAddress = multiaddr('/ip4/124.124.124.124/tcp/28319') + const message = await stubIncomingStream({ + observedAddress, + message: { + type: Message.MessageType.DIAL, + dial: { + peer: { + id: Uint8Array.from([0, 1, 2, 3]), + addrs: [ + observedAddress.bytes + ] + } + } + } + }) + + expect(message).to.have.property('type', Message.MessageType.DIAL_RESPONSE) + expect(message).to.have.nested.property('dialResponse.status', Message.ResponseStatus.E_BAD_REQUEST) + expect(message).to.have.nested.property('dialResponse.statusText', 'bad peer id') + }) + + it('should fail to dial a requested address when it arrives via a relay', async () => { + const remotePeer = await createEd25519PeerId() + const requestingPeer = await createEd25519PeerId() + + const message = await stubIncomingStream({ + remotePeer, + remoteAddr: multiaddr(`/ip4/223.223.223.223/tcp/27132/p2p/${remotePeer.toString()}/p2p-circuit/p2p/${requestingPeer.toString()}`), + requestingPeer + }) + + expect(message).to.have.property('type', Message.MessageType.DIAL_RESPONSE) + expect(message).to.have.nested.property('dialResponse.status', Message.ResponseStatus.E_BAD_REQUEST) + expect(message).to.have.nested.property('dialResponse.statusText', 'peer id mismatch') + }) + + it('should refuse to dial a requested address when it is from a different host', async () => { + const requestingPeer = await createEd25519PeerId() + const observedAddress = multiaddr('/ip4/10.10.10.10/tcp/27132') + const remoteAddr = multiaddr(`/ip4/129.129.129.129/tcp/27132/p2p/${requestingPeer.toString()}`) + + const message = await stubIncomingStream({ + requestingPeer, + remoteAddr, + observedAddress + }) + + expect(message).to.have.property('type', Message.MessageType.DIAL_RESPONSE) + expect(message).to.have.nested.property('dialResponse.status', Message.ResponseStatus.E_DIAL_REFUSED) + expect(message).to.have.nested.property('dialResponse.statusText', 'no dialable addresses') + }) + + it('should refuse to dial a requested address when it is on an unsupported transport', async () => { + const message = await stubIncomingStream({ + transportSupported: false + }) + + expect(message).to.have.property('type', Message.MessageType.DIAL_RESPONSE) + expect(message).to.have.nested.property('dialResponse.status', Message.ResponseStatus.E_DIAL_REFUSED) + expect(message).to.have.nested.property('dialResponse.statusText', 'no dialable addresses') + }) + + it('should error when to dialing a requested address', async () => { + const message = await stubIncomingStream({ + canDial: false + }) + + expect(message).to.have.property('type', Message.MessageType.DIAL_RESPONSE) + expect(message).to.have.nested.property('dialResponse.status', Message.ResponseStatus.E_DIAL_ERROR) + expect(message).to.have.nested.property('dialResponse.statusText', 'Could not dial') + }) + + it('should time out when dialing a requested address', async () => { + dialer.dial.callsFake(async function (ma, options = {}) { + return await Promise.race([ + new Promise((resolve, reject) => { + options.signal?.addEventListener('abort', () => { + reject(new Error('Dial aborted!')) + }) + }), + new Promise((resolve, reject) => { + // longer than the timeout + setTimeout(() => { + reject(new Error('Dial Timeout!')) + }, 1000) + }) + ]) + }) + + const message = await stubIncomingStream({ + canDial: undefined + }) + + expect(message).to.have.property('type', Message.MessageType.DIAL_RESPONSE) + expect(message).to.have.nested.property('dialResponse.status', Message.ResponseStatus.E_DIAL_ERROR) + expect(message).to.have.nested.property('dialResponse.statusText', 'Dial aborted!') + }) + }) +}) diff --git a/test/core/consume-peer-record.spec.ts b/test/core/consume-peer-record.spec.ts index 70dad598dd..e819fc48cb 100644 --- a/test/core/consume-peer-record.spec.ts +++ b/test/core/consume-peer-record.spec.ts @@ -28,7 +28,7 @@ describe('Consume peer record', () => { await libp2p.stop() }) - it('should consume peer record when observed addrs are added', async () => { + it('should consume peer record when observed addrs are confirmed', async () => { let done: () => void libp2p.components.getPeerStore().addressBook.consumePeerRecord = async () => { @@ -42,7 +42,7 @@ describe('Consume peer record', () => { await libp2p.start() - libp2p.components.getAddressManager().addObservedAddr(new Multiaddr('/ip4/123.123.123.123/tcp/3983')) + libp2p.components.getAddressManager().confirmObservedAddr(new Multiaddr('/ip4/123.123.123.123/tcp/3983')) await p From 3c81306381dd621566ef063c8727cbec6dbe6bdb Mon Sep 17 00:00:00 2001 From: achingbrain Date: Tue, 19 Jul 2022 12:49:59 +0100 Subject: [PATCH 02/10] chore: linting --- src/address-manager/index.ts | 2 +- src/autonat/index.ts | 2 +- test/addresses/address-manager.spec.ts | 6 +++--- test/autonat/index.spec.ts | 2 +- 4 files changed, 6 insertions(+), 6 deletions(-) diff --git a/src/address-manager/index.ts b/src/address-manager/index.ts index 59536323c4..72bc9bfd0a 100644 --- a/src/address-manager/index.ts +++ b/src/address-manager/index.ts @@ -128,7 +128,7 @@ export class DefaultAddressManager extends EventEmitter { }) // only trigger the change:addresses event if our confidence in an address has changed - if (startingConfidence === false) { + if (!startingConfidence) { this.dispatchEvent(new CustomEvent('change:addresses')) } } diff --git a/src/autonat/index.ts b/src/autonat/index.ts index b0352be3b9..0fd1875530 100644 --- a/src/autonat/index.ts +++ b/src/autonat/index.ts @@ -406,7 +406,7 @@ export class AutonatService implements Startable { const self = this try { - log(`verify multiaddrs %s`, multiaddrs.map(ma => ma.toString()).join(', ')) + log('verify multiaddrs %s', multiaddrs.map(ma => ma.toString()).join(', ')) const request = Message.encode({ type: Message.MessageType.DIAL, diff --git a/test/addresses/address-manager.spec.ts b/test/addresses/address-manager.spec.ts index 49d367d6c8..1c5e7fcaba 100644 --- a/test/addresses/address-manager.spec.ts +++ b/test/addresses/address-manager.spec.ts @@ -121,7 +121,7 @@ describe('Address Manager', () => { am.confirmObservedAddr(ma) am.confirmObservedAddr(ma) am.confirmObservedAddr(ma) - am.confirmObservedAddr(multiaddr(`${ma}/p2p/${peerId.toString()}`)) + am.confirmObservedAddr(multiaddr(`${ma.toString()}/p2p/${peerId.toString()}`)) expect(eventCount).to.equal(1) }) @@ -138,7 +138,7 @@ describe('Address Manager', () => { expect(am.getObservedAddrs()).to.be.empty() am.addObservedAddr(ma) - am.addObservedAddr(multiaddr(`${ma}/p2p/${peerId.toString()}`)) + am.addObservedAddr(multiaddr(`${ma.toString()}/p2p/${peerId.toString()}`)) expect(am.getObservedAddrs()).to.have.lengthOf(1) expect(am.getObservedAddrs().map(ma => ma.toString())).to.include(ma.toString()) @@ -156,7 +156,7 @@ describe('Address Manager', () => { expect(am.getObservedAddrs()).to.be.empty() am.addObservedAddr(ma) - am.addObservedAddr(multiaddr(`${ma}/p2p/${peerId.toString()}`)) + am.addObservedAddr(multiaddr(`${ma.toString()}/p2p/${peerId.toString()}`)) expect(am.getObservedAddrs()).to.have.lengthOf(1) expect(am.getObservedAddrs().map(ma => ma.toString())).to.include(ma.toString()) diff --git a/test/autonat/index.spec.ts b/test/autonat/index.spec.ts index cb2fa9d5e0..a92633babe 100644 --- a/test/autonat/index.spec.ts +++ b/test/autonat/index.spec.ts @@ -491,7 +491,7 @@ describe('autonat', () => { const slice = await pipe( sink, lp.decode(), - source => all(source) + async source => await all(source) ) if (slice.length !== 1) { From a24a5e1127621bcfeedf4ef74607282744d955b7 Mon Sep 17 00:00:00 2001 From: achingbrain Date: Tue, 19 Jul 2022 14:35:44 +0100 Subject: [PATCH 03/10] chore: fix node tests --- test/nat-manager/nat-manager.node.ts | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/test/nat-manager/nat-manager.node.ts b/test/nat-manager/nat-manager.node.ts index 01049a2a0d..d85cd473f8 100644 --- a/test/nat-manager/nat-manager.node.ts +++ b/test/nat-manager/nat-manager.node.ts @@ -14,6 +14,7 @@ import { Components } from '@libp2p/components' import type { NatAPI } from '@achingbrain/nat-port-mapper' import { StubbedInstance, stubInterface } from 'ts-sinon' import { start, stop } from '@libp2p/interfaces/startable' +import { multiaddr } from '@multiformats/multiaddr' const DEFAULT_ADDRESSES = [ '/ip4/127.0.0.1/tcp/0', @@ -99,6 +100,9 @@ describe('Nat Manager (TCP)', () => { }) }) + // simulate autonat having run + components.getAddressManager().confirmObservedAddr(multiaddr('/ip4/82.3.1.5/tcp/4002')) + expect(addressChangedEventFired).to.be.true() }) From 00af7a02e479c14e26459d0eafca37e4d67c9d3d Mon Sep 17 00:00:00 2001 From: Marin Petrunic Date: Mon, 13 Feb 2023 16:34:14 +0100 Subject: [PATCH 04/10] fix: tests --- src/autonat/pb/index.ts | 32 ++++++++++++-------------------- src/circuit/pb/index.ts | 16 ++++++---------- src/fetch/pb/proto.ts | 12 ++++++------ src/identify/pb/message.ts | 4 ++-- src/insecure/pb/proto.ts | 14 ++++++-------- test/autonat/index.spec.ts | 3 ++- 6 files changed, 34 insertions(+), 47 deletions(-) diff --git a/src/autonat/pb/index.ts b/src/autonat/pb/index.ts index 4c3c068301..0d9e840f03 100644 --- a/src/autonat/pb/index.ts +++ b/src/autonat/pb/index.ts @@ -5,8 +5,8 @@ /* eslint-disable @typescript-eslint/no-empty-interface */ import { enumeration, encodeMessage, decodeMessage, message } from 'protons-runtime' -import type { Uint8ArrayList } from 'uint8arraylist' import type { Codec } from 'protons-runtime' +import type { Uint8ArrayList } from 'uint8arraylist' export interface Message { type?: Message.MessageType @@ -40,7 +40,7 @@ export namespace Message { } enum __ResponseStatusValues { - OK = 1, + OK = 0, E_DIAL_ERROR = 100, E_DIAL_REFUSED = 101, E_BAD_REQUEST = 200, @@ -113,7 +113,7 @@ export namespace Message { return _codec } - export const encode = (obj: PeerInfo): Uint8Array => { + export const encode = (obj: Partial): Uint8Array => { return encodeMessage(obj, PeerInfo.codec()) } @@ -138,9 +138,7 @@ export namespace Message { if (obj.peer != null) { w.uint32(10) - Message.PeerInfo.codec().encode(obj.peer, w, { - writeDefaults: false - }) + Message.PeerInfo.codec().encode(obj.peer, w) } if (opts.lengthDelimited !== false) { @@ -171,7 +169,7 @@ export namespace Message { return _codec } - export const encode = (obj: Dial): Uint8Array => { + export const encode = (obj: Partial): Uint8Array => { return encodeMessage(obj, Dial.codec()) } @@ -197,10 +195,8 @@ export namespace Message { } if (obj.status != null) { - w.uint32(10) - Message.ResponseStatus.codec().encode(obj.status, w, { - writeDefaults: false - }) + w.uint32(8) + Message.ResponseStatus.codec().encode(obj.status, w) } if (obj.statusText != null) { @@ -226,7 +222,7 @@ export namespace Message { switch (tag >>> 3) { case 1: - obj.status = Message.ResponseStatus.codec().decode(reader, reader.uint32()) + obj.status = Message.ResponseStatus.codec().decode(reader) break case 2: obj.statusText = reader.string() @@ -247,7 +243,7 @@ export namespace Message { return _codec } - export const encode = (obj: DialResponse): Uint8Array => { + export const encode = (obj: Partial): Uint8Array => { return encodeMessage(obj, DialResponse.codec()) } @@ -272,16 +268,12 @@ export namespace Message { if (obj.dial != null) { w.uint32(18) - Message.Dial.codec().encode(obj.dial, w, { - writeDefaults: false - }) + Message.Dial.codec().encode(obj.dial, w) } if (obj.dialResponse != null) { w.uint32(26) - Message.DialResponse.codec().encode(obj.dialResponse, w, { - writeDefaults: false - }) + Message.DialResponse.codec().encode(obj.dialResponse, w) } if (opts.lengthDelimited !== false) { @@ -318,7 +310,7 @@ export namespace Message { return _codec } - export const encode = (obj: Message): Uint8Array => { + export const encode = (obj: Partial): Uint8Array => { return encodeMessage(obj, Message.codec()) } diff --git a/src/circuit/pb/index.ts b/src/circuit/pb/index.ts index a0134c400f..a32487a18c 100644 --- a/src/circuit/pb/index.ts +++ b/src/circuit/pb/index.ts @@ -5,8 +5,8 @@ /* eslint-disable @typescript-eslint/no-empty-interface */ import { enumeration, encodeMessage, decodeMessage, message } from 'protons-runtime' -import type { Uint8ArrayList } from 'uint8arraylist' import type { Codec } from 'protons-runtime' +import type { Uint8ArrayList } from 'uint8arraylist' export interface CircuitRelay { type?: CircuitRelay.Type @@ -95,7 +95,7 @@ export namespace CircuitRelay { w.fork() } - if (opts.writeDefaults === true || (obj.id != null && obj.id.byteLength > 0)) { + if ((obj.id != null && obj.id.byteLength > 0)) { w.uint32(10) w.bytes(obj.id) } @@ -141,7 +141,7 @@ export namespace CircuitRelay { return _codec } - export const encode = (obj: Peer): Uint8Array => { + export const encode = (obj: Partial): Uint8Array => { return encodeMessage(obj, Peer.codec()) } @@ -166,16 +166,12 @@ export namespace CircuitRelay { if (obj.srcPeer != null) { w.uint32(18) - CircuitRelay.Peer.codec().encode(obj.srcPeer, w, { - writeDefaults: false - }) + CircuitRelay.Peer.codec().encode(obj.srcPeer, w) } if (obj.dstPeer != null) { w.uint32(26) - CircuitRelay.Peer.codec().encode(obj.dstPeer, w, { - writeDefaults: false - }) + CircuitRelay.Peer.codec().encode(obj.dstPeer, w) } if (obj.code != null) { @@ -220,7 +216,7 @@ export namespace CircuitRelay { return _codec } - export const encode = (obj: CircuitRelay): Uint8Array => { + export const encode = (obj: Partial): Uint8Array => { return encodeMessage(obj, CircuitRelay.codec()) } diff --git a/src/fetch/pb/proto.ts b/src/fetch/pb/proto.ts index c607724c95..4eae212dcd 100644 --- a/src/fetch/pb/proto.ts +++ b/src/fetch/pb/proto.ts @@ -5,8 +5,8 @@ /* eslint-disable @typescript-eslint/no-empty-interface */ import { encodeMessage, decodeMessage, message, enumeration } from 'protons-runtime' -import type { Uint8ArrayList } from 'uint8arraylist' import type { Codec } from 'protons-runtime' +import type { Uint8ArrayList } from 'uint8arraylist' export interface FetchRequest { identifier: string @@ -22,7 +22,7 @@ export namespace FetchRequest { w.fork() } - if (opts.writeDefaults === true || obj.identifier !== '') { + if ((obj.identifier != null && obj.identifier !== '')) { w.uint32(10) w.string(obj.identifier) } @@ -57,7 +57,7 @@ export namespace FetchRequest { return _codec } - export const encode = (obj: FetchRequest): Uint8Array => { + export const encode = (obj: Partial): Uint8Array => { return encodeMessage(obj, FetchRequest.codec()) } @@ -99,12 +99,12 @@ export namespace FetchResponse { w.fork() } - if (opts.writeDefaults === true || (obj.status != null && __StatusCodeValues[obj.status] !== 0)) { + if (obj.status != null && __StatusCodeValues[obj.status] !== 0) { w.uint32(8) FetchResponse.StatusCode.codec().encode(obj.status, w) } - if (opts.writeDefaults === true || (obj.data != null && obj.data.byteLength > 0)) { + if ((obj.data != null && obj.data.byteLength > 0)) { w.uint32(18) w.bytes(obj.data) } @@ -143,7 +143,7 @@ export namespace FetchResponse { return _codec } - export const encode = (obj: FetchResponse): Uint8Array => { + export const encode = (obj: Partial): Uint8Array => { return encodeMessage(obj, FetchResponse.codec()) } diff --git a/src/identify/pb/message.ts b/src/identify/pb/message.ts index a78e23255d..2f6c2b276b 100644 --- a/src/identify/pb/message.ts +++ b/src/identify/pb/message.ts @@ -5,8 +5,8 @@ /* eslint-disable @typescript-eslint/no-empty-interface */ import { encodeMessage, decodeMessage, message } from 'protons-runtime' -import type { Uint8ArrayList } from 'uint8arraylist' import type { Codec } from 'protons-runtime' +import type { Uint8ArrayList } from 'uint8arraylist' export interface Identify { protocolVersion?: string @@ -116,7 +116,7 @@ export namespace Identify { return _codec } - export const encode = (obj: Identify): Uint8Array => { + export const encode = (obj: Partial): Uint8Array => { return encodeMessage(obj, Identify.codec()) } diff --git a/src/insecure/pb/proto.ts b/src/insecure/pb/proto.ts index 6098e23eed..d1ebbd510c 100644 --- a/src/insecure/pb/proto.ts +++ b/src/insecure/pb/proto.ts @@ -5,8 +5,8 @@ /* eslint-disable @typescript-eslint/no-empty-interface */ import { encodeMessage, decodeMessage, message, enumeration } from 'protons-runtime' -import type { Uint8ArrayList } from 'uint8arraylist' import type { Codec } from 'protons-runtime' +import type { Uint8ArrayList } from 'uint8arraylist' export interface Exchange { id?: Uint8Array @@ -30,9 +30,7 @@ export namespace Exchange { if (obj.pubkey != null) { w.uint32(18) - PublicKey.codec().encode(obj.pubkey, w, { - writeDefaults: false - }) + PublicKey.codec().encode(obj.pubkey, w) } if (opts.lengthDelimited !== false) { @@ -66,7 +64,7 @@ export namespace Exchange { return _codec } - export const encode = (obj: Exchange): Uint8Array => { + export const encode = (obj: Partial): Uint8Array => { return encodeMessage(obj, Exchange.codec()) } @@ -109,12 +107,12 @@ export namespace PublicKey { w.fork() } - if (opts.writeDefaults === true || (obj.Type != null && __KeyTypeValues[obj.Type] !== 0)) { + if (obj.Type != null && __KeyTypeValues[obj.Type] !== 0) { w.uint32(8) KeyType.codec().encode(obj.Type, w) } - if (opts.writeDefaults === true || (obj.Data != null && obj.Data.byteLength > 0)) { + if ((obj.Data != null && obj.Data.byteLength > 0)) { w.uint32(18) w.bytes(obj.Data) } @@ -153,7 +151,7 @@ export namespace PublicKey { return _codec } - export const encode = (obj: PublicKey): Uint8Array => { + export const encode = (obj: Partial): Uint8Array => { return encodeMessage(obj, PublicKey.codec()) } diff --git a/test/autonat/index.spec.ts b/test/autonat/index.spec.ts index dbed3835a1..e8f12328c6 100644 --- a/test/autonat/index.spec.ts +++ b/test/autonat/index.spec.ts @@ -62,6 +62,7 @@ describe('autonat', () => { components = new DefaultComponents({ peerId: await createEd25519PeerId(), + dialer, peerRouting, registrar, addressManager, @@ -114,7 +115,7 @@ describe('autonat', () => { return peer } - it.only('should request peers verify our observed address', async () => { + it('should request peers verify our observed address', async () => { const observedAddress = multiaddr('/ip4/123.123.123.123/tcp/28319') addressManager.getObservedAddrs.returns([observedAddress]) From 6e038f2c2fd98e05c31a58a329064ab2e3cf186a Mon Sep 17 00:00:00 2001 From: Marin Petrunic Date: Mon, 13 Feb 2023 16:42:48 +0100 Subject: [PATCH 05/10] fix depcheck --- package.json | 1 + test/autonat/index.spec.ts | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/package.json b/package.json index bc3754e25b..33aee2c254 100644 --- a/package.json +++ b/package.json @@ -139,6 +139,7 @@ "it-drain": "^2.0.0", "it-filter": "^2.0.0", "it-first": "^2.0.0", + "it-parallel": "^3.0.0", "it-handshake": "^4.1.2", "it-length-prefixed": "^8.0.2", "it-map": "^2.0.0", diff --git a/test/autonat/index.spec.ts b/test/autonat/index.spec.ts index e8f12328c6..96c2139631 100644 --- a/test/autonat/index.spec.ts +++ b/test/autonat/index.spec.ts @@ -6,7 +6,7 @@ import sinon from 'sinon' import { createEd25519PeerId } from '@libp2p/peer-id-factory' import { start, stop } from '@libp2p/interfaces/startable' import { AutonatService, AutonatServiceInit } from '../../src/autonat/index.js' -import { StubbedInstance, stubInterface } from 'ts-sinon' +import { StubbedInstance, stubInterface } from 'sinon-ts' import type { PeerRouting } from '@libp2p/interface-peer-routing' import { Multiaddr, multiaddr } from '@multiformats/multiaddr' import type { Registrar } from '@libp2p/interface-registrar' From e9b26300de468a1e110d11d30dc051276867d854 Mon Sep 17 00:00:00 2001 From: Marin Petrunic Date: Tue, 28 Feb 2023 13:21:10 +0100 Subject: [PATCH 06/10] fix: limit number of observed addresses from identify --- src/config.ts | 3 ++- src/identify/index.ts | 14 +++++--------- 2 files changed, 7 insertions(+), 10 deletions(-) diff --git a/src/config.ts b/src/config.ts index 547cac639e..e5151a14e0 100644 --- a/src/config.ts +++ b/src/config.ts @@ -76,7 +76,8 @@ const DefaultConfig: Partial = { maxInboundStreams: 1, maxOutboundStreams: 1, maxPushIncomingStreams: 1, - maxPushOutgoingStreams: 1 + maxPushOutgoingStreams: 1, + maxObservedAddresses: 10 }, ping: { protocolPrefix: 'ipfs', diff --git a/src/identify/index.ts b/src/identify/index.ts index 74f381d7e1..e9eb604a13 100644 --- a/src/identify/index.ts +++ b/src/identify/index.ts @@ -65,6 +65,7 @@ export interface IdentifyServiceInit { maxPushIncomingStreams: number maxPushOutgoingStreams: number + maxObservedAddresses?: number } export interface IdentifyServiceComponents { @@ -380,16 +381,11 @@ export class IdentifyService implements Startable { log('identify completed for peer %p and protocols %o', id, protocols) log('our observed address is %s', cleanObservedAddr) - /* - if (cleanObservedAddr != null) { - // TODO: Add and score our observed addr - log('received observed address of %s', cleanObservedAddr?.toString()) - this.components.getAddressManager().addObservedAddr(cleanObservedAddr) + if (cleanObservedAddr != null && + this.components.addressManager.getObservedAddrs().length < (this.init.maxObservedAddresses ?? Infinity)) { + log('storing our observed address %s', cleanObservedAddr?.toString()) + this.components.addressManager.addObservedAddr(cleanObservedAddr) } - */ - // TODO: Add and score our observed addr - log('received observed address of %s', cleanObservedAddr?.toString()) - // this.components.addressManager.addObservedAddr(observedAddr) } /** From a551e7ff7b537c642d1d7a1a59857978db3f9771 Mon Sep 17 00:00:00 2001 From: achingbrain Date: Tue, 14 Mar 2023 10:20:59 +0100 Subject: [PATCH 07/10] chore: fix linting --- src/address-manager/index.ts | 6 +++--- src/autonat/index.ts | 10 +++++----- test/autonat/index.spec.ts | 5 +++-- 3 files changed, 11 insertions(+), 10 deletions(-) diff --git a/src/address-manager/index.ts b/src/address-manager/index.ts index 5005f8b2d6..2f5c1cddcd 100644 --- a/src/address-manager/index.ts +++ b/src/address-manager/index.ts @@ -48,7 +48,7 @@ interface ObservedAddressMetadata { confident: boolean } -function stripPeerId (ma: Multiaddr, peerId: PeerId) { +function stripPeerId (ma: Multiaddr, peerId: PeerId): Multiaddr { const observedPeerId = ma.getPeerId() // strip our peer id if it has been passed @@ -140,7 +140,7 @@ export class DefaultAddressManager extends EventEmitter { }) } - confirmObservedAddr (addr: Multiaddr) { + confirmObservedAddr (addr: Multiaddr): void { addr = stripPeerId(addr, this.components.peerId) const addrString = addr.toString() @@ -160,7 +160,7 @@ export class DefaultAddressManager extends EventEmitter { } } - removeObservedAddr (addr: Multiaddr) { + removeObservedAddr (addr: Multiaddr): void { addr = stripPeerId(addr, this.components.peerId) const addrString = addr.toString() diff --git a/src/autonat/index.ts b/src/autonat/index.ts index 5815ba1a5f..a4608332b8 100644 --- a/src/autonat/index.ts +++ b/src/autonat/index.ts @@ -92,11 +92,11 @@ export class AutonatService implements Startable { this._verifyExternalAddresses = this._verifyExternalAddresses.bind(this) } - isStarted () { + isStarted (): boolean { return this.started } - async start () { + async start (): Promise { if (this.started) { return } @@ -116,7 +116,7 @@ export class AutonatService implements Startable { this.started = true } - async stop () { + async stop (): Promise { await this.components.registrar.unhandle(PROTOCOL) clearTimeout(this.verifyAddressTimeout) @@ -364,7 +364,7 @@ export class AutonatService implements Startable { } } - _verifyExternalAddresses () { + _verifyExternalAddresses (): void { void this.verifyExternalAddresses() .catch(err => { log.error(err) @@ -374,7 +374,7 @@ export class AutonatService implements Startable { /** * Our multicodec topology noticed a new peer that supports autonat */ - async verifyExternalAddresses () { + async verifyExternalAddresses (): Promise { clearTimeout(this.verifyAddressTimeout) // Do not try to push if we are not running diff --git a/test/autonat/index.spec.ts b/test/autonat/index.spec.ts index 96c2139631..27c2043325 100644 --- a/test/autonat/index.spec.ts +++ b/test/autonat/index.spec.ts @@ -25,6 +25,7 @@ import { pipe } from 'it-pipe' import { Components, DefaultComponents } from '../../src/components.js' import type { Dialer } from '@libp2p/interface-connection-manager' import { Uint8ArrayList } from 'uint8arraylist' +import { PeerInfo } from '@libp2p/interface-peer-info' const defaultInit: AutonatServiceInit = { protocolPrefix: 'libp2p', @@ -85,7 +86,7 @@ describe('autonat', () => { }) describe('verify our observed addresses', () => { - async function stubPeerResponse (host: string, dialResponse: Message.DialResponse, peerId?: PeerId) { + async function stubPeerResponse (host: string, dialResponse: Message.DialResponse, peerId?: PeerId): Promise { // stub random peer lookup const peer = { id: peerId ?? await createEd25519PeerId(), @@ -418,7 +419,7 @@ describe('autonat', () => { message?: Message | Uint8Array | boolean transportSupported?: boolean canDial?: boolean - } = {}) { + } = {}): Promise { const requestingPeer = opts.requestingPeer ?? await createEd25519PeerId() const remotePeer = opts.remotePeer ?? requestingPeer const observedAddress = opts.observedAddress ?? multiaddr('/ip4/124.124.124.124/tcp/28319') From d6b50a54fe14d4e26606ab1d2837be460aac9e86 Mon Sep 17 00:00:00 2001 From: achingbrain Date: Tue, 14 Mar 2023 10:43:45 +0100 Subject: [PATCH 08/10] chore: fix type --- test/autonat/index.spec.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/autonat/index.spec.ts b/test/autonat/index.spec.ts index 27c2043325..4f2f133dd4 100644 --- a/test/autonat/index.spec.ts +++ b/test/autonat/index.spec.ts @@ -25,7 +25,7 @@ import { pipe } from 'it-pipe' import { Components, DefaultComponents } from '../../src/components.js' import type { Dialer } from '@libp2p/interface-connection-manager' import { Uint8ArrayList } from 'uint8arraylist' -import { PeerInfo } from '@libp2p/interface-peer-info' +import type { PeerInfo } from '@libp2p/interface-peer-info' const defaultInit: AutonatServiceInit = { protocolPrefix: 'libp2p', From 05ea09540af238637d7101d6944fa49df42bd397 Mon Sep 17 00:00:00 2001 From: achingbrain Date: Tue, 14 Mar 2023 11:33:36 +0100 Subject: [PATCH 09/10] chore: be explicit about component dependencies --- src/address-manager/index.ts | 29 +++++++++----------------- src/autonat/index.ts | 22 ++++++++++++------- src/autonat/pb/index.proto | 2 +- src/index.ts | 10 ++++++--- test/addresses/address-manager.spec.ts | 6 +++--- 5 files changed, 35 insertions(+), 34 deletions(-) diff --git a/src/address-manager/index.ts b/src/address-manager/index.ts index 2f5c1cddcd..35834b3b0c 100644 --- a/src/address-manager/index.ts +++ b/src/address-manager/index.ts @@ -48,15 +48,18 @@ interface ObservedAddressMetadata { confident: boolean } +/** + * If the passed multiaddr contains the passed peer id, remove it + */ function stripPeerId (ma: Multiaddr, peerId: PeerId): Multiaddr { - const observedPeerId = ma.getPeerId() + const observedPeerIdStr = ma.getPeerId() // strip our peer id if it has been passed - if (observedPeerId != null) { - const peerId = peerIdFromString(observedPeerId) + if (observedPeerIdStr != null) { + const observedPeerId = peerIdFromString(observedPeerIdStr) // use same encoding for comparison - if (peerId.equals(peerId)) { + if (observedPeerId.equals(peerId)) { ma = ma.decapsulate(multiaddr(`/p2p/${peerId.toString()}`)) } } @@ -114,21 +117,9 @@ export class DefaultAddressManager extends EventEmitter { /** * Add peer observed addresses */ - addObservedAddr (addr: string | Multiaddr): void { - let ma = multiaddr(addr) - const remotePeer = ma.getPeerId() - - // strip our peer id if it has been passed - if (remotePeer != null) { - const remotePeerId = peerIdFromString(remotePeer) - - // use same encoding for comparison - if (remotePeerId.equals(this.components.peerId)) { - ma = ma.decapsulate(multiaddr(`/p2p/${this.components.peerId.toString()}`)) - } - } - - const addrString = ma.toString() + addObservedAddr (addr: Multiaddr): void { + addr = stripPeerId(addr, this.components.peerId) + const addrString = addr.toString() // do not trigger the change:addresses event if we already know about this address if (this.observed.has(addrString)) { diff --git a/src/autonat/index.ts b/src/autonat/index.ts index a4608332b8..14ce263d97 100644 --- a/src/autonat/index.ts +++ b/src/autonat/index.ts @@ -1,7 +1,11 @@ +import { AddressManager } from '@libp2p/interface-address-manager' import type { Connection } from '@libp2p/interface-connection' +import { ConnectionManager, Dialer } from '@libp2p/interface-connection-manager' import type { PeerId } from '@libp2p/interface-peer-id' import type { PeerInfo } from '@libp2p/interface-peer-info' -import type { IncomingStreamData } from '@libp2p/interface-registrar' +import { PeerRouting } from '@libp2p/interface-peer-routing' +import type { IncomingStreamData, Registrar } from '@libp2p/interface-registrar' +import { TransportManager } from '@libp2p/interface-transport' import type { Startable } from '@libp2p/interfaces/startable' import { logger } from '@libp2p/logger' import { peerIdFromBytes } from '@libp2p/peer-id' @@ -16,7 +20,6 @@ import parallel from 'it-parallel' import { pipe } from 'it-pipe' import isPrivateIp from 'private-ip' import { TimeoutController } from 'timeout-abort-controller' -import type { Components } from '../components.js' import { PROTOCOL } from './constants.js' @@ -67,12 +70,15 @@ export interface AutonatServiceInit { maxOutboundStreams: number } -export type DefaultAutonatComponents = Pick< -Components, -'registrar' | 'addressManager' | -'transportManager' | 'dialer' | -'peerId' | 'connectionManager' | 'peerRouting' -> +export interface DefaultAutonatComponents { + registrar: Registrar + addressManager: AddressManager + transportManager: TransportManager + dialer: Dialer + peerId: PeerId + connectionManager: ConnectionManager + peerRouting: PeerRouting +} export class AutonatService implements Startable { private readonly components: DefaultAutonatComponents diff --git a/src/autonat/pb/index.proto b/src/autonat/pb/index.proto index 0bcd0db561..3e71e23197 100644 --- a/src/autonat/pb/index.proto +++ b/src/autonat/pb/index.proto @@ -32,4 +32,4 @@ message Message { optional MessageType type = 1; optional Dial dial = 2; optional DialResponse dialResponse = 3; -} \ No newline at end of file +} diff --git a/src/index.ts b/src/index.ts index d9659c3a74..ebe9b60b2a 100644 --- a/src/index.ts +++ b/src/index.ts @@ -108,19 +108,23 @@ export interface Libp2pInit { relay: (components: Components) => CircuitRelayService /** - * libp2p identify protocol options + * identify protocol options */ identify: IdentifyServiceInit /** - * libp2p ping protocol options + * ping protocol options */ ping: PingServiceInit /** - * libp2p fetch protocol options + * fetch protocol options */ fetch: FetchServiceInit + + /** + * autonat protocol options + */ autonat: AutonatServiceInit /** diff --git a/test/addresses/address-manager.spec.ts b/test/addresses/address-manager.spec.ts index 823ce469e9..41274f2da9 100644 --- a/test/addresses/address-manager.spec.ts +++ b/test/addresses/address-manager.spec.ts @@ -104,7 +104,7 @@ describe('Address Manager', () => { }) it('should dedupe added observed addresses', () => { - const ma = '/ip4/123.123.123.123/tcp/39201' + const ma = multiaddr('/ip4/123.123.123.123/tcp/39201') const am = new DefaultAddressManager({ peerId, transportManager: stubInterface() @@ -145,7 +145,7 @@ describe('Address Manager', () => { }) it('should strip our peer address from added observed addresses', () => { - const ma = '/ip4/123.123.123.123/tcp/39201' + const ma = multiaddr('/ip4/123.123.123.123/tcp/39201') const am = new DefaultAddressManager({ peerId, transportManager: stubInterface() @@ -163,7 +163,7 @@ describe('Address Manager', () => { }) it('should strip our peer address from added observed addresses in difference formats', () => { - const ma = '/ip4/123.123.123.123/tcp/39201' + const ma = multiaddr('/ip4/123.123.123.123/tcp/39201') const am = new DefaultAddressManager({ peerId, transportManager: stubInterface() From a3ec6969f8576cafa13c0e6bd1cd4829b1cfe7fb Mon Sep 17 00:00:00 2001 From: achingbrain Date: Tue, 14 Mar 2023 11:41:01 +0100 Subject: [PATCH 10/10] chore: linting again --- src/autonat/index.ts | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/autonat/index.ts b/src/autonat/index.ts index 14ce263d97..62003c97e3 100644 --- a/src/autonat/index.ts +++ b/src/autonat/index.ts @@ -1,11 +1,11 @@ -import { AddressManager } from '@libp2p/interface-address-manager' +import type { AddressManager } from '@libp2p/interface-address-manager' import type { Connection } from '@libp2p/interface-connection' -import { ConnectionManager, Dialer } from '@libp2p/interface-connection-manager' +import type { ConnectionManager, Dialer } from '@libp2p/interface-connection-manager' import type { PeerId } from '@libp2p/interface-peer-id' import type { PeerInfo } from '@libp2p/interface-peer-info' -import { PeerRouting } from '@libp2p/interface-peer-routing' +import type { PeerRouting } from '@libp2p/interface-peer-routing' import type { IncomingStreamData, Registrar } from '@libp2p/interface-registrar' -import { TransportManager } from '@libp2p/interface-transport' +import type { TransportManager } from '@libp2p/interface-transport' import type { Startable } from '@libp2p/interfaces/startable' import { logger } from '@libp2p/logger' import { peerIdFromBytes } from '@libp2p/peer-id'