From c91d6c2a91cacf6edbba5227b514c828274ece54 Mon Sep 17 00:00:00 2001 From: achingbrain Date: Wed, 9 Oct 2024 14:08:55 +0100 Subject: [PATCH 1/2] fix: remove old relay tags on start To ensure we don't think we have a reserved slot on a relay, remove the old tags on startup. --- packages/interface/src/index.ts | 30 +++++++-- .../src/connection-manager/reconnect-queue.ts | 10 ++- .../transport-circuit-relay-v2/src/index.ts | 4 +- .../src/transport/index.ts | 4 +- .../src/transport/reservation-store.ts | 67 +++++++++++++++---- .../test/transport/reservation-store.spec.ts | 64 ++++++++++++++++++ 6 files changed, 153 insertions(+), 26 deletions(-) create mode 100644 packages/transport-circuit-relay-v2/test/transport/reservation-store.spec.ts diff --git a/packages/interface/src/index.ts b/packages/interface/src/index.ts index 13c381a15b..eb29f1d7f1 100644 --- a/packages/interface/src/index.ts +++ b/packages/interface/src/index.ts @@ -161,9 +161,10 @@ export interface Libp2pEvents { 'peer:connect': CustomEvent /** - * This event will be triggered any time we are disconnected from another peer, regardless of - * the circumstances of that disconnection. If we happen to have multiple connections to a - * peer, this event will **only** be triggered when the last connection is closed. + * This event will be triggered any time we are disconnected from another + * peer, regardless of the circumstances of that disconnection. If we happen + * to have multiple connections to a peer, this event will **only** be + * triggered when the last connection is closed. * * @example * @@ -177,9 +178,26 @@ export interface Libp2pEvents { 'peer:disconnect': CustomEvent /** - * This event is dispatched after a remote peer has successfully responded to the identify - * protocol. Note that for this to be emitted, both peers must have an identify service - * configured. + * When a peer tagged with `keep-alive` disconnects, we will make multiple + * attempts to reconnect to it with a backoff factor (see the connection + * manager settings for details). If these all fail, the `keep-alive` tag will + * be removed and this event will be emitted. + * + * @example + * + * ```TypeScript + * libp2p.addEventListener('peer:reconnect-failure', (event) => { + * const peerId = event.detail + * // ... + * }) + * ``` + */ + 'peer:reconnect-failure': CustomEvent + + /** + * This event is dispatched after a remote peer has successfully responded to + * the identify protocol. Note that for this to be emitted, both peers must + * have an identify service configured. * * @example * diff --git a/packages/libp2p/src/connection-manager/reconnect-queue.ts b/packages/libp2p/src/connection-manager/reconnect-queue.ts index c3dff8bca5..aa1490a014 100644 --- a/packages/libp2p/src/connection-manager/reconnect-queue.ts +++ b/packages/libp2p/src/connection-manager/reconnect-queue.ts @@ -33,6 +33,7 @@ export class ReconnectQueue implements Startable { private readonly retryInterval?: number private readonly backoffFactor?: number private readonly connectionManager: ConnectionManager + private readonly events: TypedEventTarget constructor (components: ReconnectQueueComponents, init: ReconnectQueueInit = {}) { this.log = components.logger.forComponent('libp2p:reconnect-queue') @@ -47,11 +48,12 @@ export class ReconnectQueue implements Startable { this.retries = init.retries ?? 5 this.backoffFactor = init.backoffFactor this.retryInterval = init.retryInterval + this.events = components.events components.events.addEventListener('peer:disconnect', (evt) => { this.maybeReconnect(evt.detail) .catch(err => { - this.log.error('failed to maybe reconnect to %p', evt.detail, err) + this.log.error('failed to maybe reconnect to %p - %e', evt.detail, err) }) }) } @@ -82,7 +84,7 @@ export class ReconnectQueue implements Startable { signal: options?.signal }) } catch (err) { - this.log('reconnecting to %p attempt %d of %d failed', peerId, attempt, this.retries, err) + this.log('reconnecting to %p attempt %d of %d failed - %e', peerId, attempt, this.retries, err) throw err } }, { @@ -108,6 +110,10 @@ export class ReconnectQueue implements Startable { await this.peerStore.merge(peerId, { tags }) + + this.events.safeDispatchEvent('peer:reconnect-failure', { + detail: peerId + }) }) .catch(async err => { this.log.error('failed to remove keep-alive tag from %p - %e', peerId, err) diff --git a/packages/transport-circuit-relay-v2/src/index.ts b/packages/transport-circuit-relay-v2/src/index.ts index 98d94b0259..4a55cdf2bb 100644 --- a/packages/transport-circuit-relay-v2/src/index.ts +++ b/packages/transport-circuit-relay-v2/src/index.ts @@ -64,10 +64,10 @@ export interface CircuitRelayService extends TypedEventEmitter private readonly log: Logger private readonly relayFilter: Filter - constructor (components: RelayStoreComponents, init?: RelayStoreInit) { + constructor (components: ReservationStoreComponents, init?: ReservationStoreInit) { super() this.log = components.logger.forComponent('libp2p:circuit-relay:transport:reservation-store') @@ -120,7 +120,7 @@ export class ReservationStore extends TypedEventEmitter // When a peer disconnects, if we had a reservation on that peer // remove the reservation and multiaddr and maybe trigger search // for new relays - this.events.addEventListener('peer:disconnect', (evt) => { + this.events.addEventListener('peer:reconnect-failure', (evt) => { this.#removeRelay(evt.detail) }) } @@ -134,10 +134,37 @@ export class ReservationStore extends TypedEventEmitter } afterStart (): void { - if (this.reservations.size < this.maxDiscoveredRelays) { - this.log('not enough relays %d/%d', this.reservations.size, this.maxDiscoveredRelays) - this.safeDispatchEvent('relay:not-enough-relays', {}) - } + // remove old relay tags + void Promise.resolve() + .then(async () => { + const relayPeers: Peer[] = await this.peerStore.all({ + filters: [(peer) => { + return peer.tags.has(RELAY_TAG) + }] + }) + + this.log('removing tag from %d old relays', relayPeers.length) + + // remove old relay tag and redial + await Promise.all( + relayPeers.map(async peer => { + await this.peerStore.merge(peer.id, { + tags: { + [RELAY_TAG]: undefined, + [KEEP_ALIVE_TAG]: undefined + } + }) + }) + ) + + if (this.reservations.size < this.maxDiscoveredRelays) { + this.log('not enough relays %d/%d', this.reservations.size, this.maxDiscoveredRelays) + this.safeDispatchEvent('relay:not-enough-relays', {}) + } + }) + .catch(err => { + this.log.error(err) + }) } stop (): void { @@ -360,11 +387,23 @@ export class ReservationStore extends TypedEventEmitter clearTimeout(existingReservation.timeout) this.reservations.delete(peerId) - this.safeDispatchEvent('relay:removed', { detail: peerId }) + // ensure we don't close the connection to the relay + this.peerStore.merge(peerId, { + tags: { + [RELAY_TAG]: undefined, + [KEEP_ALIVE_TAG]: undefined + } + }) + .then(() => { + this.safeDispatchEvent('relay:removed', { detail: peerId }) - if (this.reservations.size < this.maxDiscoveredRelays) { - this.log('not enough relays %d/%d', this.reservations.size, this.maxDiscoveredRelays) - this.safeDispatchEvent('relay:not-enough-relays', {}) - } + if (this.reservations.size < this.maxDiscoveredRelays) { + this.log('not enough relays %d/%d', this.reservations.size, this.maxDiscoveredRelays) + this.safeDispatchEvent('relay:not-enough-relays', {}) + } + }) + .catch(err => { + this.log('could not update tags for relay %p - %e', peerId, err) + }) } } diff --git a/packages/transport-circuit-relay-v2/test/transport/reservation-store.spec.ts b/packages/transport-circuit-relay-v2/test/transport/reservation-store.spec.ts new file mode 100644 index 0000000000..a6466d868d --- /dev/null +++ b/packages/transport-circuit-relay-v2/test/transport/reservation-store.spec.ts @@ -0,0 +1,64 @@ +import { generateKeyPair } from '@libp2p/crypto/keys' +import { TypedEventEmitter, start } from '@libp2p/interface' +import { defaultLogger } from '@libp2p/logger' +import { peerIdFromPrivateKey } from '@libp2p/peer-id' +import { expect } from 'aegir/chai' +import delay from 'delay' +import { stubInterface } from 'sinon-ts' +import { KEEP_ALIVE_TAG, RELAY_TAG } from '../../src/constants.js' +import { ReservationStore } from '../../src/transport/reservation-store.js' +import type { ComponentLogger, Libp2pEvents, Peer, PeerId, PeerStore, TypedEventTarget } from '@libp2p/interface' +import type { ConnectionManager, TransportManager } from '@libp2p/interface-internal' +import type { StubbedInstance } from 'sinon-ts' + +export interface StubbedReservationStoreComponents { + peerId: PeerId + connectionManager: StubbedInstance + transportManager: StubbedInstance + peerStore: StubbedInstance + events: TypedEventTarget + logger: ComponentLogger +} + +describe('transport reservation-store', () => { + let store: ReservationStore + let components: StubbedReservationStoreComponents + + beforeEach(async () => { + const privateKey = await generateKeyPair('Ed25519') + + components = { + peerId: peerIdFromPrivateKey(privateKey), + connectionManager: stubInterface(), + transportManager: stubInterface(), + peerStore: stubInterface(), + events: new TypedEventEmitter(), + logger: defaultLogger() + } + + store = new ReservationStore(components) + }) + + it('should remove relay tags on start', async () => { + const peer: Peer = { + id: peerIdFromPrivateKey(await generateKeyPair('Ed25519')), + addresses: [], + metadata: new Map(), + tags: new Map([[RELAY_TAG, { value: 1 }]]), + protocols: [] + } + + components.peerStore.all.resolves([peer]) + + await start(store) + + await delay(100) + + expect(components.peerStore.merge.calledWith(peer.id, { + tags: { + [RELAY_TAG]: undefined, + [KEEP_ALIVE_TAG]: undefined + } + })).to.be.true() + }) +}) From 9ea80a4c8ef28037f6f8078b4f65688f3aaa9871 Mon Sep 17 00:00:00 2001 From: achingbrain Date: Wed, 9 Oct 2024 17:04:24 +0100 Subject: [PATCH 2/2] fix: search for new relays on disconnect --- .../test/circuit-relay.node.ts | 9 +- .../transport-circuit-relay-v2/package.json | 1 - .../src/constants.ts | 5 +- .../src/server/index.ts | 107 ++++++++-------- .../src/transport/discovery.ts | 2 +- .../src/transport/listener.ts | 53 ++++---- .../src/transport/reservation-store.ts | 115 ++++++++++-------- .../test/hop.spec.ts | 6 +- 8 files changed, 163 insertions(+), 135 deletions(-) diff --git a/packages/integration-tests/test/circuit-relay.node.ts b/packages/integration-tests/test/circuit-relay.node.ts index d874c5408f..014fe7ce24 100644 --- a/packages/integration-tests/test/circuit-relay.node.ts +++ b/packages/integration-tests/test/circuit-relay.node.ts @@ -624,15 +624,16 @@ describe('circuit-relay', () => { return circuitMultiaddrs.length > 0 }) - expect(circuitListener[0].relayStore.listenerCount('relay:removed')).to.equal(2) + expect(circuitListener[0].reservationStore.listenerCount('relay:removed')).to.equal(2) - // remove one listener - await local.hangUp(relay1.peerId) + // stop the listener + await circuitListener[0].close() + // not using the relay any more await notUsingAsRelay(local, relay1) // expect 1 listener - expect(circuitListener[0].relayStore.listenerCount('relay:removed')).to.equal(1) + expect(circuitListener[0].reservationStore.listenerCount('relay:removed')).to.equal(1) }) it('should mark an outgoing relayed connection as limited', async () => { diff --git a/packages/transport-circuit-relay-v2/package.json b/packages/transport-circuit-relay-v2/package.json index 3fee78a504..5ec8bd170d 100644 --- a/packages/transport-circuit-relay-v2/package.json +++ b/packages/transport-circuit-relay-v2/package.json @@ -64,7 +64,6 @@ "it-protobuf-stream": "^1.1.3", "it-stream-types": "^2.0.1", "multiformats": "^13.1.0", - "p-defer": "^4.0.1", "progress-events": "^1.0.0", "protons-runtime": "^5.4.0", "race-signal": "^1.0.2", diff --git a/packages/transport-circuit-relay-v2/src/constants.ts b/packages/transport-circuit-relay-v2/src/constants.ts index 7701773af6..b77fdb57ad 100644 --- a/packages/transport-circuit-relay-v2/src/constants.ts +++ b/packages/transport-circuit-relay-v2/src/constants.ts @@ -29,9 +29,9 @@ export const DEFAULT_MAX_RESERVATION_TTL = 2 * 60 * minute export const DEFAULT_RESERVATION_CONCURRENCY = 1 /** - * How long to wait for a reservation attempt to finsih + * How long to wait for a reservation attempt to finish */ -export const DEFAULT_RESERVATION_COMPLETION_TIMEOUT = 1000 +export const DEFAULT_RESERVATION_COMPLETION_TIMEOUT = 2000 /** * How long to let the reservation attempt queue to grow @@ -43,6 +43,7 @@ export const RELAY_SOURCE_TAG = 'circuit-relay-source' export const RELAY_TAG = 'circuit-relay-relay' export const KEEP_ALIVE_TAG = `${KEEP_ALIVE}-circuit-relay` +export const KEEP_ALIVE_SOURCE_TAG = `${KEEP_ALIVE}-circuit-relay-source` // circuit v2 connection limits // https://github.com/libp2p/go-libp2p/blob/master/p2p/protocol/circuitv2/relay/resources.go#L61-L66 diff --git a/packages/transport-circuit-relay-v2/src/server/index.ts b/packages/transport-circuit-relay-v2/src/server/index.ts index 2edef91f7e..79da7d3966 100644 --- a/packages/transport-circuit-relay-v2/src/server/index.ts +++ b/packages/transport-circuit-relay-v2/src/server/index.ts @@ -4,10 +4,10 @@ import { RecordEnvelope } from '@libp2p/peer-record' import { type Multiaddr, multiaddr } from '@multiformats/multiaddr' import { pbStream, type ProtobufStream } from 'it-protobuf-stream' import * as Digest from 'multiformats/hashes/digest' -import pDefer from 'p-defer' import { CIRCUIT_PROTO_CODE, DEFAULT_HOP_TIMEOUT, + KEEP_ALIVE_SOURCE_TAG, MAX_CONNECTIONS, RELAY_SOURCE_TAG, RELAY_V2_HOP_CODEC, @@ -18,7 +18,7 @@ import { createLimitedRelay } from '../utils.js' import { ReservationStore, type ReservationStoreInit } from './reservation-store.js' import { ReservationVoucherRecord } from './reservation-voucher.js' import type { CircuitRelayService, RelayReservation } from '../index.js' -import type { ComponentLogger, Logger, Connection, Stream, ConnectionGater, PeerId, PeerStore, Startable, PrivateKey, Metrics } from '@libp2p/interface' +import type { ComponentLogger, Logger, Connection, Stream, ConnectionGater, PeerId, PeerStore, Startable, PrivateKey, Metrics, AbortOptions } from '@libp2p/interface' import type { AddressManager, ConnectionManager, IncomingStreamData, Registrar } from '@libp2p/interface-internal' import type { PeerMap } from '@libp2p/peer-collections' @@ -175,17 +175,13 @@ class CircuitRelayServer extends TypedEventEmitter implements async onHop ({ connection, stream }: IncomingStreamData): Promise { this.log('received circuit v2 hop protocol stream from %p', connection.remotePeer) - const hopTimeoutPromise = pDefer() - const timeout = setTimeout(() => { - hopTimeoutPromise.reject('timed out') - }, this.hopTimeout) + const options = { + signal: AbortSignal.timeout(this.hopTimeout) + } const pbstr = pbStream(stream) try { - const request: HopMessage = await Promise.race([ - pbstr.pb(HopMessage).read(), - hopTimeoutPromise.promise - ]) + const request: HopMessage = await pbstr.pb(HopMessage).read(options) if (request?.type == null) { throw new Error('request was invalid, could not read from stream') @@ -193,31 +189,26 @@ class CircuitRelayServer extends TypedEventEmitter implements this.log('received', request.type) - await Promise.race([ - this.handleHopProtocol({ - connection, - stream: pbstr, - request - }), - hopTimeoutPromise.promise - ]) + await this.handleHopProtocol({ + connection, + stream: pbstr, + request + }, options) } catch (err: any) { this.log.error('error while handling hop', err) await pbstr.pb(HopMessage).write({ type: HopMessage.Type.STATUS, status: Status.MALFORMED_MESSAGE - }) + }, options) stream.abort(err) - } finally { - clearTimeout(timeout) } } - async handleHopProtocol ({ stream, request, connection }: HopProtocolOptions): Promise { + async handleHopProtocol ({ stream, request, connection }: HopProtocolOptions, options: AbortOptions): Promise { this.log('received hop message') switch (request.type) { - case HopMessage.Type.RESERVE: await this.handleReserve({ stream, request, connection }); break - case HopMessage.Type.CONNECT: await this.handleConnect({ stream, request, connection }); break + case HopMessage.Type.RESERVE: await this.handleReserve({ stream, request, connection }, options); break + case HopMessage.Type.CONNECT: await this.handleConnect({ stream, request, connection }, options); break default: { this.log.error('invalid hop request type %s via peer %p', request.type, connection.remotePeer) await stream.pb(HopMessage).write({ type: HopMessage.Type.STATUS, status: Status.UNEXPECTED_MESSAGE }) @@ -225,37 +216,38 @@ class CircuitRelayServer extends TypedEventEmitter implements } } - async handleReserve ({ stream, request, connection }: HopProtocolOptions): Promise { + async handleReserve ({ stream, connection }: HopProtocolOptions, options: AbortOptions): Promise { const hopstr = stream.pb(HopMessage) this.log('hop reserve request from %p', connection.remotePeer) if (isRelayAddr(connection.remoteAddr)) { this.log.error('relay reservation over circuit connection denied for peer: %p', connection.remotePeer) - await hopstr.write({ type: HopMessage.Type.STATUS, status: Status.PERMISSION_DENIED }) + await hopstr.write({ type: HopMessage.Type.STATUS, status: Status.PERMISSION_DENIED }, options) return } if ((await this.connectionGater.denyInboundRelayReservation?.(connection.remotePeer)) === true) { this.log.error('reservation for %p denied by connection gater', connection.remotePeer) - await hopstr.write({ type: HopMessage.Type.STATUS, status: Status.PERMISSION_DENIED }) + await hopstr.write({ type: HopMessage.Type.STATUS, status: Status.PERMISSION_DENIED }, options) return } const result = this.reservationStore.reserve(connection.remotePeer, connection.remoteAddr) - if (result.status !== Status.OK) { - await hopstr.write({ type: HopMessage.Type.STATUS, status: result.status }) - return - } - try { + if (result.status !== Status.OK) { + await hopstr.write({ type: HopMessage.Type.STATUS, status: result.status }, options) + return + } + // tag relay target peer // result.expire is non-null if `ReservationStore.reserve` returns with status == OK if (result.expire != null) { const ttl = (result.expire * 1000) - Date.now() await this.peerStore.merge(connection.remotePeer, { tags: { - [RELAY_SOURCE_TAG]: { value: 1, ttl } + [RELAY_SOURCE_TAG]: { value: 1, ttl }, + [KEEP_ALIVE_SOURCE_TAG]: { value: 1, ttl } } }) } @@ -265,11 +257,22 @@ class CircuitRelayServer extends TypedEventEmitter implements status: Status.OK, reservation: await this.makeReservation(connection.remotePeer, BigInt(result.expire ?? 0)), limit: this.reservationStore.get(connection.remotePeer)?.limit - }) + }, options) this.log('sent confirmation response to %s', connection.remotePeer) } catch (err) { - this.log.error('failed to send confirmation response to %p', connection.remotePeer, err) + this.log.error('failed to send confirmation response to %p - %e', connection.remotePeer, err) this.reservationStore.removeReservation(connection.remotePeer) + + try { + await this.peerStore.merge(connection.remotePeer, { + tags: { + [RELAY_SOURCE_TAG]: undefined, + [KEEP_ALIVE_SOURCE_TAG]: undefined + } + }) + } catch (err) { + this.log.error('failed to untag relay source peer %p - %e', connection.remotePeer, err) + } } } @@ -300,12 +303,12 @@ class CircuitRelayServer extends TypedEventEmitter implements } } - async handleConnect ({ stream, request, connection }: HopProtocolOptions): Promise { + async handleConnect ({ stream, request, connection }: HopProtocolOptions, options: AbortOptions): Promise { const hopstr = stream.pb(HopMessage) if (isRelayAddr(connection.remoteAddr)) { this.log.error('relay reservation over circuit connection denied for peer: %p', connection.remotePeer) - await hopstr.write({ type: HopMessage.Type.STATUS, status: Status.PERMISSION_DENIED }) + await hopstr.write({ type: HopMessage.Type.STATUS, status: Status.PERMISSION_DENIED }, options) return } @@ -323,19 +326,19 @@ class CircuitRelayServer extends TypedEventEmitter implements dstPeer = peerIdFromMultihash(Digest.decode(request.peer.id)) } catch (err) { this.log.error('invalid hop connect request via peer %p %s', connection.remotePeer, err) - await hopstr.write({ type: HopMessage.Type.STATUS, status: Status.MALFORMED_MESSAGE }) + await hopstr.write({ type: HopMessage.Type.STATUS, status: Status.MALFORMED_MESSAGE }, options) return } if (!this.reservationStore.hasReservation(dstPeer)) { this.log.error('hop connect denied for destination peer %p not having a reservation for %p with status %s', dstPeer, connection.remotePeer, Status.NO_RESERVATION) - await hopstr.write({ type: HopMessage.Type.STATUS, status: Status.NO_RESERVATION }) + await hopstr.write({ type: HopMessage.Type.STATUS, status: Status.NO_RESERVATION }, options) return } if ((await this.connectionGater.denyOutboundRelayedConnection?.(connection.remotePeer, dstPeer)) === true) { this.log.error('hop connect for %p to %p denied by connection gater', connection.remotePeer, dstPeer) - await hopstr.write({ type: HopMessage.Type.STATUS, status: Status.PERMISSION_DENIED }) + await hopstr.write({ type: HopMessage.Type.STATUS, status: Status.PERMISSION_DENIED }, options) return } @@ -343,7 +346,7 @@ class CircuitRelayServer extends TypedEventEmitter implements if (connections.length === 0) { this.log('hop connect denied for destination peer %p not having a connection for %p as there is no destination connection', dstPeer, connection.remotePeer) - await hopstr.write({ type: HopMessage.Type.STATUS, status: Status.NO_RESERVATION }) + await hopstr.write({ type: HopMessage.Type.STATUS, status: Status.NO_RESERVATION }, options) return } @@ -360,11 +363,11 @@ class CircuitRelayServer extends TypedEventEmitter implements }, limit } - }) + }, options) if (destinationStream == null) { this.log.error('failed to open stream to destination peer %p', destinationConnection?.remotePeer) - await hopstr.write({ type: HopMessage.Type.STATUS, status: Status.CONNECTION_FAILED }) + await hopstr.write({ type: HopMessage.Type.STATUS, status: Status.CONNECTION_FAILED }, options) return } @@ -372,7 +375,7 @@ class CircuitRelayServer extends TypedEventEmitter implements type: HopMessage.Type.STATUS, status: Status.OK, limit - }) + }, options) const sourceStream = stream.unwrap() this.log('connection from %p to %p established - merging streams', connection.remotePeer, dstPeer) @@ -385,29 +388,27 @@ class CircuitRelayServer extends TypedEventEmitter implements /** * Send a STOP request to the target peer that the dialing peer wants to contact */ - async stopHop ({ - connection, - request - }: StopOptions): Promise { + async stopHop ({ connection, request }: StopOptions, options: AbortOptions): Promise { this.log('starting circuit relay v2 stop request to %s', connection.remotePeer) const stream = await connection.newStream([RELAY_V2_STOP_CODEC], { maxOutboundStreams: this.maxOutboundStopStreams, - runOnLimitedConnection: true + runOnLimitedConnection: true, + ...options }) const pbstr = pbStream(stream) const stopstr = pbstr.pb(StopMessage) - await stopstr.write(request) + await stopstr.write(request, options) let response try { - response = await stopstr.read() + response = await stopstr.read(options) } catch (err) { this.log.error('error parsing stop message response from %p', connection.remotePeer) } if (response == null) { this.log.error('could not read response from %p', connection.remotePeer) - await stream.close() + await stream.close(options) return } @@ -417,7 +418,7 @@ class CircuitRelayServer extends TypedEventEmitter implements } this.log('stop request failed with code %d', response.status) - await stream.close() + await stream.close(options) } get reservations (): PeerMap { diff --git a/packages/transport-circuit-relay-v2/src/transport/discovery.ts b/packages/transport-circuit-relay-v2/src/transport/discovery.ts index 71ac88eb08..091954c9bc 100644 --- a/packages/transport-circuit-relay-v2/src/transport/discovery.ts +++ b/packages/transport-circuit-relay-v2/src/transport/discovery.ts @@ -66,7 +66,7 @@ export class RelayDiscovery extends TypedEventEmitter impl this.topologyId = await this.registrar.register(RELAY_V2_HOP_CODEC, { filter: this.filter, onConnect: (peerId) => { - this.log('discovered relay %p', peerId) + this.log.trace('discovered relay %p', peerId) this.safeDispatchEvent('relay:discover', { detail: peerId }) } }) diff --git a/packages/transport-circuit-relay-v2/src/transport/listener.ts b/packages/transport-circuit-relay-v2/src/transport/listener.ts index 239345370f..7164aacaad 100644 --- a/packages/transport-circuit-relay-v2/src/transport/listener.ts +++ b/packages/transport-circuit-relay-v2/src/transport/listener.ts @@ -14,7 +14,7 @@ export interface CircuitRelayTransportListenerComponents { class CircuitRelayTransportListener extends TypedEventEmitter implements Listener { private readonly connectionManager: ConnectionManager - private readonly relayStore: ReservationStore + private readonly reservationStore: ReservationStore private readonly listeningAddrs: PeerMap private readonly log: Logger @@ -23,15 +23,26 @@ class CircuitRelayTransportListener extends TypedEventEmitter im this.log = components.logger.forComponent('libp2p:circuit-relay:transport:listener') this.connectionManager = components.connectionManager - this.relayStore = components.relayStore + this.reservationStore = components.relayStore this.listeningAddrs = new PeerMap() // remove listening addrs when a relay is removed - this.relayStore.addEventListener('relay:removed', this._onRemoveRelayPeer) + this.reservationStore.addEventListener('relay:removed', this._onRemoveRelayPeer) } _onRemoveRelayPeer = (evt: CustomEvent): void => { - this.#removeRelayPeer(evt.detail) + const had = this.listeningAddrs.has(evt.detail) + + this.log('relay peer removed %p - had reservation', evt.detail, had) + + if (!had) { + return + } + + this.listeningAddrs.delete(evt.detail) + + // announce listen addresses change + this.safeDispatchEvent('listening') } async listen (addr: Multiaddr): Promise { @@ -41,14 +52,14 @@ class CircuitRelayTransportListener extends TypedEventEmitter im const relayAddr = addr.decapsulate('/p2p-circuit') const relayConn = await this.connectionManager.openConnection(relayAddr) - if (!this.relayStore.hasReservation(relayConn.remotePeer)) { + if (!this.reservationStore.hasReservation(relayConn.remotePeer)) { this.log('making reservation on peer %p', relayConn.remotePeer) // addRelay calls transportManager.listen which calls this listen method - await this.relayStore.addRelay(relayConn.remotePeer, 'configured') + await this.reservationStore.addRelay(relayConn.remotePeer, 'configured') return } - const reservation = this.relayStore.getReservation(relayConn.remotePeer) + const reservation = this.reservationStore.getReservation(relayConn.remotePeer) if (reservation == null) { throw new ListenError('Did not have reservation after making reservation') @@ -60,11 +71,11 @@ class CircuitRelayTransportListener extends TypedEventEmitter im } // add all addresses from the relay reservation - this.listeningAddrs.set(relayConn.remotePeer, reservation.addrs.map(buf => { - return multiaddr(buf).encapsulate('/p2p-circuit') - })) + this.listeningAddrs.set(relayConn.remotePeer, reservation.addrs + .map(buf => multiaddr(buf).encapsulate('/p2p-circuit')) + ) - this.safeDispatchEvent('listening', {}) + this.safeDispatchEvent('listening') } getAddrs (): Multiaddr[] { @@ -72,22 +83,14 @@ class CircuitRelayTransportListener extends TypedEventEmitter im } async close (): Promise { + await this.reservationStore.cancelReservations() + this.listeningAddrs.clear() - } - - #removeRelayPeer (peerId: PeerId): void { - const had = this.listeningAddrs.has(peerId) - - this.log('relay peer removed %p - had reservation', peerId, had) + // remove listener + this.reservationStore.removeEventListener('relay:removed', this._onRemoveRelayPeer) - this.listeningAddrs.delete(peerId) - - if (had) { - this.log.trace('removing relay event listener for peer %p', peerId) - this.relayStore.removeEventListener('relay:removed', this._onRemoveRelayPeer) - // Announce listen addresses change - this.safeDispatchEvent('close', {}) - } + // announce listen addresses change + this.safeDispatchEvent('close') } } diff --git a/packages/transport-circuit-relay-v2/src/transport/reservation-store.ts b/packages/transport-circuit-relay-v2/src/transport/reservation-store.ts index e67b96ebe1..3252a892a0 100644 --- a/packages/transport-circuit-relay-v2/src/transport/reservation-store.ts +++ b/packages/transport-circuit-relay-v2/src/transport/reservation-store.ts @@ -4,7 +4,6 @@ import { createBloomFilter } from '@libp2p/utils/filters' import { PeerQueue } from '@libp2p/utils/peer-queue' import { multiaddr } from '@multiformats/multiaddr' import { pbStream } from 'it-protobuf-stream' -import { equals as uint8ArrayEquals } from 'uint8arrays/equals' import { DEFAULT_MAX_RESERVATION_QUEUE_LENGTH, DEFAULT_RESERVATION_COMPLETION_TIMEOUT, DEFAULT_RESERVATION_CONCURRENCY, KEEP_ALIVE_TAG, RELAY_TAG, RELAY_V2_HOP_CODEC } from '../constants.js' import { HopMessage, Status } from '../pb/index.js' import { getExpirationMilliseconds } from '../utils.js' @@ -71,6 +70,11 @@ interface RelayEntry { timeout: ReturnType type: RelayType reservation: Reservation + + /** + * Stores the id of the connection we have to the relay + */ + connection: string } export interface ReservationStoreEvents { @@ -117,11 +121,21 @@ export class ReservationStore extends TypedEventEmitter metrics: components.metrics }) - // When a peer disconnects, if we had a reservation on that peer - // remove the reservation and multiaddr and maybe trigger search - // for new relays - this.events.addEventListener('peer:reconnect-failure', (evt) => { - this.#removeRelay(evt.detail) + // reservations are only valid while we are still connected to the relay. + // if we had a reservation opened via that connection, remove it and maybe + // trigger a search for new relays + this.events.addEventListener('connection:close', (evt) => { + const reservation = [...this.reservations.values()] + .find(reservation => reservation.connection === evt.detail.id) + + if (reservation == null) { + return + } + + this.#removeReservation(evt.detail.remotePeer, reservation) + .catch(err => { + this.log('could not remove relay %p - %e', evt.detail, err) + }) }) } @@ -184,26 +198,26 @@ export class ReservationStore extends TypedEventEmitter */ async addRelay (peerId: PeerId, type: RelayType): Promise { if (this.peerId.equals(peerId)) { - this.log('not trying to use self as relay') + this.log.trace('not trying to use self as relay') return } if (this.reserveQueue.size > this.maxReservationQueueLength) { - this.log('not adding potential relay peer %p as the queue is full', peerId) + this.log.trace('not adding potential relay peer %p as the queue is full', peerId) return } if (this.reserveQueue.has(peerId)) { - this.log('potential relay peer %p is already in the reservation queue', peerId) + this.log.trace('potential relay peer %p is already in the reservation queue', peerId) return } if (this.relayFilter.has(peerId.toMultihash().bytes)) { - this.log('potential relay peer %p has failed previously, not trying again', peerId) + this.log.trace('potential relay peer %p has failed previously, not trying again', peerId) return } - this.log('try to reserve relay slot with %p', peerId) + this.log.trace('try to reserve relay slot with %p', peerId) await this.reserveQueue.add(async () => { const start = Date.now() @@ -213,13 +227,13 @@ export class ReservationStore extends TypedEventEmitter const existingReservation = this.reservations.get(peerId) if (existingReservation != null) { - if (getExpirationMilliseconds(existingReservation.reservation.expire) > REFRESH_WINDOW) { - this.log('already have reservation on relay peer %p and it expires in more than 10 minutes', peerId) + if (this.connectionManager.getConnections(peerId).map(conn => conn.id).includes(existingReservation.connection) && getExpirationMilliseconds(existingReservation.reservation.expire) > REFRESH_WINDOW) { + this.log('already have relay reservation with %p but we are still connected and it does not expire soon', peerId) return } - clearTimeout(existingReservation.timeout) - this.reservations.delete(peerId) + this.log('already have relay reservation with %p but the original connection is no longer open', peerId) + await this.#removeReservation(peerId, existingReservation) } if (type === 'discovered' && [...this.reservations.values()].reduce((acc, curr) => { @@ -229,7 +243,7 @@ export class ReservationStore extends TypedEventEmitter return acc }, 0) >= this.maxDiscoveredRelays) { - this.log('already have enough discovered relays') + this.log.trace('already have enough discovered relays') return } @@ -267,7 +281,8 @@ export class ReservationStore extends TypedEventEmitter this.reservations.set(peerId, { timeout, reservation, - type + type, + connection: connection.id }) // ensure we don't close the connection to the relay @@ -323,6 +338,14 @@ export class ReservationStore extends TypedEventEmitter return this.reservations.size } + async cancelReservations (): Promise { + await Promise.all( + [...this.reservations.entries()].map(async ([peerId, reservation]) => { + await this.#removeReservation(peerId, reservation) + }) + ) + } + async #createReservation (connection: Connection, options: AbortOptions): Promise { options.signal?.throwIfAborted() @@ -345,24 +368,31 @@ export class ReservationStore extends TypedEventEmitter } } - if (response.status === Status.OK && (response.reservation != null)) { + if (response.status === Status.OK && response.reservation != null) { // check that the returned relay has the relay address - this can be // omitted when requesting a reservation from a go-libp2p relay we // already have a reservation on - let hasRelayAddress = false - const relayAddressBytes = connection.remoteAddr.bytes + const addresses = new Set() + addresses.add(connection.remoteAddr.toString()) for (const buf of response.reservation.addrs) { - if (uint8ArrayEquals(relayAddressBytes, buf)) { - hasRelayAddress = true - break + let ma = multiaddr(buf) + + if (ma.getPeerId() == null) { + ma = ma.encapsulate(`/p2p/${connection.remotePeer}`) } - } - if (!hasRelayAddress) { - response.reservation.addrs.push(relayAddressBytes) + // TODO: workaround for https://github.com/libp2p/go-libp2p/issues/3003 + ma = multiaddr(ma.toString().replace( + `/p2p/${connection.remotePeer}/p2p/${connection.remotePeer}`, + `/p2p/${connection.remotePeer}` + )) + + addresses.add(ma.toString()) } + response.reservation.addrs = [...addresses].map(str => multiaddr(str).bytes) + return response.reservation } @@ -375,35 +405,24 @@ export class ReservationStore extends TypedEventEmitter /** * Remove listen relay */ - #removeRelay (peerId: PeerId): void { - const existingReservation = this.reservations.get(peerId) - - if (existingReservation == null) { - return - } - - this.log('connection to relay %p closed, removing reservation from local store', peerId) - - clearTimeout(existingReservation.timeout) + async #removeReservation (peerId: PeerId, reservation: RelayEntry): Promise { + this.log('removing relay reservation with %p from local store', peerId) + clearTimeout(reservation.timeout) this.reservations.delete(peerId) - // ensure we don't close the connection to the relay - this.peerStore.merge(peerId, { + // untag the relay + await this.peerStore.merge(peerId, { tags: { [RELAY_TAG]: undefined, [KEEP_ALIVE_TAG]: undefined } }) - .then(() => { - this.safeDispatchEvent('relay:removed', { detail: peerId }) - if (this.reservations.size < this.maxDiscoveredRelays) { - this.log('not enough relays %d/%d', this.reservations.size, this.maxDiscoveredRelays) - this.safeDispatchEvent('relay:not-enough-relays', {}) - } - }) - .catch(err => { - this.log('could not update tags for relay %p - %e', peerId, err) - }) + this.safeDispatchEvent('relay:removed', { detail: peerId }) + + if (this.reservations.size < this.maxDiscoveredRelays) { + this.log('not enough relays %d/%d', this.reservations.size, this.maxDiscoveredRelays) + this.safeDispatchEvent('relay:not-enough-relays', {}) + } } } diff --git a/packages/transport-circuit-relay-v2/test/hop.spec.ts b/packages/transport-circuit-relay-v2/test/hop.spec.ts index 2a7f4e7685..f0f275da00 100644 --- a/packages/transport-circuit-relay-v2/test/hop.spec.ts +++ b/packages/transport-circuit-relay-v2/test/hop.spec.ts @@ -12,7 +12,7 @@ import { expect } from 'aegir/chai' import { type MessageStream, pbStream } from 'it-protobuf-stream' import Sinon from 'sinon' import { type StubbedInstance, stubInterface } from 'sinon-ts' -import { DEFAULT_MAX_RESERVATION_STORE_SIZE, RELAY_SOURCE_TAG, RELAY_V2_HOP_CODEC } from '../src/constants.js' +import { DEFAULT_MAX_RESERVATION_STORE_SIZE, KEEP_ALIVE_SOURCE_TAG, RELAY_SOURCE_TAG, RELAY_V2_HOP_CODEC } from '../src/constants.js' import { circuitRelayServer, type CircuitRelayService, circuitRelayTransport } from '../src/index.js' import { HopMessage, Status } from '../src/pb/index.js' import type { CircuitRelayServerInit } from '../src/server/index.js' @@ -296,6 +296,10 @@ describe('circuit-relay hop protocol', function () { [RELAY_SOURCE_TAG]: { value: 1, ttl: Sinon.match.number as unknown as number + }, + [KEEP_ALIVE_SOURCE_TAG]: { + value: 1, + ttl: Sinon.match.number as unknown as number } } })).to.be.true()