Skip to content

Commit

Permalink
fix: use keep-alive as a tag prefix (#2757)
Browse files Browse the repository at this point in the history
To prevent components removing each other's keep-alive tag, treat it as a prefix.
  • Loading branch information
achingbrain authored Oct 9, 2024
1 parent e99e8f4 commit 29b47ad
Show file tree
Hide file tree
Showing 8 changed files with 97 additions and 15 deletions.
8 changes: 8 additions & 0 deletions packages/interface/src/peer-store/tags.ts
Original file line number Diff line number Diff line change
@@ -1 +1,9 @@
/**
* When a peer that is tagged with this prefix disconnects, we will attempt to
* redial it, up to a limit.
*
* To allow multiple components to add/remove their own keep-alive tags without
* accidentally overwriting those of other components, attach a unique suffix to
* the tag, e.g. `keep-alive-circuit-relay` or `keep-alive-kad-dht`, etc.
*/
export const KEEP_ALIVE = 'keep-alive'
5 changes: 5 additions & 0 deletions packages/kad-dht/src/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
// until the year 2020 (a great time in the future). For that record to stick around
// it must be rebroadcasted more frequently than once every 'MaxRecordAge'

import { KEEP_ALIVE } from '@libp2p/interface'

export const second = 1000
export const minute = 60 * second
export const hour = 60 * minute
Expand Down Expand Up @@ -51,3 +53,6 @@ export const TABLE_REFRESH_QUERY_TIMEOUT = 30 * second

// When a timeout is not specified, run a query for this long
export const DEFAULT_QUERY_TIMEOUT = 180 * second

// used to ensure connections to our closest peers remain open
export const KEEP_ALIVE_TAG = `${KEEP_ALIVE}-kad-dht`
6 changes: 3 additions & 3 deletions packages/kad-dht/src/routing-table/closest-peers.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { KEEP_ALIVE } from '@libp2p/interface'
import { PeerSet } from '@libp2p/peer-collections'
import { KEEP_ALIVE_TAG } from '../constants.js'
import { PeerDistanceList } from '../peer-distance-list.js'
import { convertPeerId } from '../utils.js'
import type { RoutingTable } from './index.js'
Expand Down Expand Up @@ -94,7 +94,7 @@ export class ClosestPeers implements Startable {
[this.closeTagName]: {
value: this.closeTagValue
},
[KEEP_ALIVE]: {
[KEEP_ALIVE_TAG]: {
value: 1
}
}
Expand All @@ -104,7 +104,7 @@ export class ClosestPeers implements Startable {
await this.components.peerStore.merge(peerId, {
tags: {
[this.closeTagName]: undefined,
[KEEP_ALIVE]: undefined
[KEEP_ALIVE_TAG]: undefined
}
})
})
Expand Down
7 changes: 4 additions & 3 deletions packages/kad-dht/test/closest-peers.spec.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
import { generateKeyPair } from '@libp2p/crypto/keys'
import { KEEP_ALIVE, start, stop } from '@libp2p/interface'
import { start, stop } from '@libp2p/interface'
import { defaultLogger } from '@libp2p/logger'
import { peerIdFromPrivateKey, peerIdFromString } from '@libp2p/peer-id'
import { expect } from 'aegir/chai'
import delay from 'delay'
import { stubInterface } from 'sinon-ts'
import { xor } from 'uint8arrays/xor'
import { xorCompare } from 'uint8arrays/xor-compare'
import { KEEP_ALIVE_TAG } from '../src/constants.js'
import { ClosestPeers } from '../src/routing-table/closest-peers.js'
import { convertPeerId } from '../src/utils.js'
import type { RoutingTable } from '../src/routing-table/index.js'
Expand Down Expand Up @@ -112,7 +113,7 @@ function assertTagged (peerId: PeerId, peerStore: StubbedInstance<PeerStore>): v
'kad-close': {
value: 50
},
[KEEP_ALIVE]: {
[KEEP_ALIVE_TAG]: {
value: 1
}
}
Expand All @@ -123,7 +124,7 @@ function assertUnTagged (peerId: PeerId, peerStore: StubbedInstance<PeerStore>):
expect(peerStore.merge.calledWith(peerId, {
tags: {
'kad-close': undefined,
[KEEP_ALIVE]: undefined
[KEEP_ALIVE_TAG]: undefined
}
})).to.be.true()
}
37 changes: 31 additions & 6 deletions packages/libp2p/src/connection-manager/reconnect-queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ export class ReconnectQueue implements Startable {

const peer = await this.peerStore.get(peerId)

if (!peer.tags.has(KEEP_ALIVE)) {
if (!hasKeepAliveTag(peer)) {
return
}

Expand Down Expand Up @@ -94,8 +94,23 @@ export class ReconnectQueue implements Startable {
}, {
peerId
})
.catch(err => {
this.log.error('failed to reconnect to %p', peerId, err)
.catch(async err => {
this.log.error('failed to reconnect to %p - %e', peerId, err)

const tags: Record<string, undefined> = {}

;[...peer.tags.keys()].forEach(key => {
if (key.startsWith(KEEP_ALIVE)) {
tags[key] = undefined
}
})

await this.peerStore.merge(peerId, {
tags
})
})
.catch(async err => {
this.log.error('failed to remove keep-alive tag from %p - %e', peerId, err)
})
}

Expand All @@ -108,9 +123,9 @@ export class ReconnectQueue implements Startable {
void Promise.resolve()
.then(async () => {
const keepAlivePeers: Peer[] = await this.peerStore.all({
filters: [(peer) => {
return peer.tags.has(KEEP_ALIVE)
}]
filters: [
(peer) => hasKeepAliveTag(peer)
]
})

await Promise.all(
Expand All @@ -132,3 +147,13 @@ export class ReconnectQueue implements Startable {
this.queue.abort()
}
}

function hasKeepAliveTag (peer: Peer): boolean {
for (const tag of peer.tags.keys()) {
if (tag.startsWith(KEEP_ALIVE)) {
return true
}
}

return false
}
39 changes: 39 additions & 0 deletions packages/libp2p/test/connection-manager/reconnect-queue.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -115,4 +115,43 @@ describe('reconnect queue', () => {

expect(components.connectionManager.openConnection.calledWith(nonKeepAlivePeer)).to.be.false()
})

it('should remove KEEP_ALIVE tags when reconnecting fails', async () => {
queue = new ReconnectQueue(components, {
retries: 1,
retryInterval: 10,
backoffFactor: 1
})

const keepAlivePeer = peerIdFromPrivateKey(await generateKeyPair('Ed25519'))

components.peerStore.all.resolves([])
components.peerStore.get.withArgs(keepAlivePeer).resolves(
stubInterface<Peer>({
id: keepAlivePeer,
tags: new Map([[KEEP_ALIVE, {
value: 1
}]])
})
)

await start(queue)

components.connectionManager.openConnection.withArgs(keepAlivePeer).rejects(new Error('Dial failed'))

components.events.safeDispatchEvent('peer:disconnect', new CustomEvent('peer:disconnect', {
detail: keepAlivePeer
}))

await pRetry(() => {
expect(components.peerStore.merge.calledWith(keepAlivePeer, {
tags: {
[KEEP_ALIVE]: undefined
}
})).to.be.true()
}, {
retries: 5,
factor: 1
})
})
})
4 changes: 4 additions & 0 deletions packages/transport-circuit-relay-v2/src/constants.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import { KEEP_ALIVE } from '@libp2p/interface'

const second = 1000
const minute = 60 * second

Expand Down Expand Up @@ -40,6 +42,8 @@ export const RELAY_SOURCE_TAG = 'circuit-relay-source'

export const RELAY_TAG = 'circuit-relay-relay'

export const KEEP_ALIVE_TAG = `${KEEP_ALIVE}-circuit-relay`

// circuit v2 connection limits
// https://github.com/libp2p/go-libp2p/blob/master/p2p/protocol/circuitv2/relay/resources.go#L61-L66

Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
import { KEEP_ALIVE, TypedEventEmitter, setMaxListeners } from '@libp2p/interface'
import { TypedEventEmitter, setMaxListeners } from '@libp2p/interface'
import { PeerMap } from '@libp2p/peer-collections'
import { createBloomFilter } from '@libp2p/utils/filters'
import { PeerQueue } from '@libp2p/utils/peer-queue'
import { multiaddr } from '@multiformats/multiaddr'
import { pbStream } from 'it-protobuf-stream'
import { equals as uint8ArrayEquals } from 'uint8arrays/equals'
import { DEFAULT_MAX_RESERVATION_QUEUE_LENGTH, DEFAULT_RESERVATION_COMPLETION_TIMEOUT, DEFAULT_RESERVATION_CONCURRENCY, RELAY_TAG, RELAY_V2_HOP_CODEC } from '../constants.js'
import { DEFAULT_MAX_RESERVATION_QUEUE_LENGTH, DEFAULT_RESERVATION_COMPLETION_TIMEOUT, DEFAULT_RESERVATION_CONCURRENCY, KEEP_ALIVE_TAG, RELAY_TAG, RELAY_V2_HOP_CODEC } from '../constants.js'
import { HopMessage, Status } from '../pb/index.js'
import { getExpirationMilliseconds } from '../utils.js'
import type { Reservation } from '../pb/index.js'
Expand Down Expand Up @@ -250,7 +250,7 @@ export class ReservationStore extends TypedEventEmitter<ReservationStoreEvents>
value: 1,
ttl: expiration
},
[KEEP_ALIVE]: {
[KEEP_ALIVE_TAG]: {
value: 1,
ttl: expiration
}
Expand Down

0 comments on commit 29b47ad

Please sign in to comment.