diff --git a/packages/interface/src/index.ts b/packages/interface/src/index.ts index 13c381a15b..eb29f1d7f1 100644 --- a/packages/interface/src/index.ts +++ b/packages/interface/src/index.ts @@ -161,9 +161,10 @@ export interface Libp2pEvents { 'peer:connect': CustomEvent /** - * This event will be triggered any time we are disconnected from another peer, regardless of - * the circumstances of that disconnection. If we happen to have multiple connections to a - * peer, this event will **only** be triggered when the last connection is closed. + * This event will be triggered any time we are disconnected from another + * peer, regardless of the circumstances of that disconnection. If we happen + * to have multiple connections to a peer, this event will **only** be + * triggered when the last connection is closed. * * @example * @@ -177,9 +178,26 @@ export interface Libp2pEvents { 'peer:disconnect': CustomEvent /** - * This event is dispatched after a remote peer has successfully responded to the identify - * protocol. Note that for this to be emitted, both peers must have an identify service - * configured. + * When a peer tagged with `keep-alive` disconnects, we will make multiple + * attempts to reconnect to it with a backoff factor (see the connection + * manager settings for details). If these all fail, the `keep-alive` tag will + * be removed and this event will be emitted. + * + * @example + * + * ```TypeScript + * libp2p.addEventListener('peer:reconnect-failure', (event) => { + * const peerId = event.detail + * // ... + * }) + * ``` + */ + 'peer:reconnect-failure': CustomEvent + + /** + * This event is dispatched after a remote peer has successfully responded to + * the identify protocol. Note that for this to be emitted, both peers must + * have an identify service configured. * * @example * diff --git a/packages/libp2p/src/connection-manager/reconnect-queue.ts b/packages/libp2p/src/connection-manager/reconnect-queue.ts index c3dff8bca5..aa1490a014 100644 --- a/packages/libp2p/src/connection-manager/reconnect-queue.ts +++ b/packages/libp2p/src/connection-manager/reconnect-queue.ts @@ -33,6 +33,7 @@ export class ReconnectQueue implements Startable { private readonly retryInterval?: number private readonly backoffFactor?: number private readonly connectionManager: ConnectionManager + private readonly events: TypedEventTarget constructor (components: ReconnectQueueComponents, init: ReconnectQueueInit = {}) { this.log = components.logger.forComponent('libp2p:reconnect-queue') @@ -47,11 +48,12 @@ export class ReconnectQueue implements Startable { this.retries = init.retries ?? 5 this.backoffFactor = init.backoffFactor this.retryInterval = init.retryInterval + this.events = components.events components.events.addEventListener('peer:disconnect', (evt) => { this.maybeReconnect(evt.detail) .catch(err => { - this.log.error('failed to maybe reconnect to %p', evt.detail, err) + this.log.error('failed to maybe reconnect to %p - %e', evt.detail, err) }) }) } @@ -82,7 +84,7 @@ export class ReconnectQueue implements Startable { signal: options?.signal }) } catch (err) { - this.log('reconnecting to %p attempt %d of %d failed', peerId, attempt, this.retries, err) + this.log('reconnecting to %p attempt %d of %d failed - %e', peerId, attempt, this.retries, err) throw err } }, { @@ -108,6 +110,10 @@ export class ReconnectQueue implements Startable { await this.peerStore.merge(peerId, { tags }) + + this.events.safeDispatchEvent('peer:reconnect-failure', { + detail: peerId + }) }) .catch(async err => { this.log.error('failed to remove keep-alive tag from %p - %e', peerId, err) diff --git a/packages/transport-circuit-relay-v2/src/index.ts b/packages/transport-circuit-relay-v2/src/index.ts index 98d94b0259..4a55cdf2bb 100644 --- a/packages/transport-circuit-relay-v2/src/index.ts +++ b/packages/transport-circuit-relay-v2/src/index.ts @@ -64,10 +64,10 @@ export interface CircuitRelayService extends TypedEventEmitter private readonly log: Logger private readonly relayFilter: Filter - constructor (components: RelayStoreComponents, init?: RelayStoreInit) { + constructor (components: ReservationStoreComponents, init?: ReservationStoreInit) { super() this.log = components.logger.forComponent('libp2p:circuit-relay:transport:reservation-store') @@ -120,7 +120,7 @@ export class ReservationStore extends TypedEventEmitter // When a peer disconnects, if we had a reservation on that peer // remove the reservation and multiaddr and maybe trigger search // for new relays - this.events.addEventListener('peer:disconnect', (evt) => { + this.events.addEventListener('peer:reconnect-failure', (evt) => { this.#removeRelay(evt.detail) }) } @@ -134,10 +134,37 @@ export class ReservationStore extends TypedEventEmitter } afterStart (): void { - if (this.reservations.size < this.maxDiscoveredRelays) { - this.log('not enough relays %d/%d', this.reservations.size, this.maxDiscoveredRelays) - this.safeDispatchEvent('relay:not-enough-relays', {}) - } + // remove old relay tags + void Promise.resolve() + .then(async () => { + const relayPeers: Peer[] = await this.peerStore.all({ + filters: [(peer) => { + return peer.tags.has(RELAY_TAG) + }] + }) + + this.log('removing tag from %d old relays', relayPeers.length) + + // remove old relay tag and redial + await Promise.all( + relayPeers.map(async peer => { + await this.peerStore.merge(peer.id, { + tags: { + [RELAY_TAG]: undefined, + [KEEP_ALIVE_TAG]: undefined + } + }) + }) + ) + + if (this.reservations.size < this.maxDiscoveredRelays) { + this.log('not enough relays %d/%d', this.reservations.size, this.maxDiscoveredRelays) + this.safeDispatchEvent('relay:not-enough-relays', {}) + } + }) + .catch(err => { + this.log.error(err) + }) } stop (): void { @@ -360,11 +387,23 @@ export class ReservationStore extends TypedEventEmitter clearTimeout(existingReservation.timeout) this.reservations.delete(peerId) - this.safeDispatchEvent('relay:removed', { detail: peerId }) + // ensure we don't close the connection to the relay + this.peerStore.merge(peerId, { + tags: { + [RELAY_TAG]: undefined, + [KEEP_ALIVE_TAG]: undefined + } + }) + .then(() => { + this.safeDispatchEvent('relay:removed', { detail: peerId }) - if (this.reservations.size < this.maxDiscoveredRelays) { - this.log('not enough relays %d/%d', this.reservations.size, this.maxDiscoveredRelays) - this.safeDispatchEvent('relay:not-enough-relays', {}) - } + if (this.reservations.size < this.maxDiscoveredRelays) { + this.log('not enough relays %d/%d', this.reservations.size, this.maxDiscoveredRelays) + this.safeDispatchEvent('relay:not-enough-relays', {}) + } + }) + .catch(err => { + this.log('could not update tags for relay %p - %e', peerId, err) + }) } } diff --git a/packages/transport-circuit-relay-v2/test/transport/reservation-store.spec.ts b/packages/transport-circuit-relay-v2/test/transport/reservation-store.spec.ts new file mode 100644 index 0000000000..a6466d868d --- /dev/null +++ b/packages/transport-circuit-relay-v2/test/transport/reservation-store.spec.ts @@ -0,0 +1,64 @@ +import { generateKeyPair } from '@libp2p/crypto/keys' +import { TypedEventEmitter, start } from '@libp2p/interface' +import { defaultLogger } from '@libp2p/logger' +import { peerIdFromPrivateKey } from '@libp2p/peer-id' +import { expect } from 'aegir/chai' +import delay from 'delay' +import { stubInterface } from 'sinon-ts' +import { KEEP_ALIVE_TAG, RELAY_TAG } from '../../src/constants.js' +import { ReservationStore } from '../../src/transport/reservation-store.js' +import type { ComponentLogger, Libp2pEvents, Peer, PeerId, PeerStore, TypedEventTarget } from '@libp2p/interface' +import type { ConnectionManager, TransportManager } from '@libp2p/interface-internal' +import type { StubbedInstance } from 'sinon-ts' + +export interface StubbedReservationStoreComponents { + peerId: PeerId + connectionManager: StubbedInstance + transportManager: StubbedInstance + peerStore: StubbedInstance + events: TypedEventTarget + logger: ComponentLogger +} + +describe('transport reservation-store', () => { + let store: ReservationStore + let components: StubbedReservationStoreComponents + + beforeEach(async () => { + const privateKey = await generateKeyPair('Ed25519') + + components = { + peerId: peerIdFromPrivateKey(privateKey), + connectionManager: stubInterface(), + transportManager: stubInterface(), + peerStore: stubInterface(), + events: new TypedEventEmitter(), + logger: defaultLogger() + } + + store = new ReservationStore(components) + }) + + it('should remove relay tags on start', async () => { + const peer: Peer = { + id: peerIdFromPrivateKey(await generateKeyPair('Ed25519')), + addresses: [], + metadata: new Map(), + tags: new Map([[RELAY_TAG, { value: 1 }]]), + protocols: [] + } + + components.peerStore.all.resolves([peer]) + + await start(store) + + await delay(100) + + expect(components.peerStore.merge.calledWith(peer.id, { + tags: { + [RELAY_TAG]: undefined, + [KEEP_ALIVE_TAG]: undefined + } + })).to.be.true() + }) +})