Skip to content

Commit

Permalink
feat: ping peers on routing table refresh
Browse files Browse the repository at this point in the history
We have seen in the past that there are peers in the IPFS DHT that let you connect to them but then refuse to speak any protocol. This was mainly due to the resource manager killing the connection if limits were exceeded. We have seen that such peers are pushed to the edge of the DHT - meaning, they already get pruned from lower buckets. However, they won't get pruned from higher ones, because we only try to connect to them and do not speak any protocol on that connection.

This change adds a ping message to the liveness check on routing table refreshes.
  • Loading branch information
dennis-tra committed Feb 3, 2023
1 parent dae5a9a commit eb6d2f7
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 1 deletion.
5 changes: 5 additions & 0 deletions dht.go
Original file line number Diff line number Diff line change
Expand Up @@ -365,10 +365,15 @@ func makeRtRefreshManager(dht *IpfsDHT, cfg dhtcfg.Config, maxLastSuccessfulOutb
return err
}

// pingFnc forwards a ping for peer p to dht.protoMessenger.Ping and returns
// its error unchanged. It is handed to rtrefresh.NewRtRefreshManager so the
// refresh manager can ping peers without referencing the DHT type directly.
pingFnc := func(ctx context.Context, p peer.ID) error {
return dht.protoMessenger.Ping(ctx, p)
}

r, err := rtrefresh.NewRtRefreshManager(
dht.host, dht.routingTable, cfg.RoutingTable.AutoRefresh,
keyGenFnc,
queryFnc,
pingFnc,
cfg.RoutingTable.RefreshQueryTimeout,
cfg.RoutingTable.RefreshInterval,
maxLastSuccessfulOutboundThreshold,
Expand Down
14 changes: 13 additions & 1 deletion rtrefresh/rt_refresh_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ type RtRefreshManager struct {
enableAutoRefresh bool // should run periodic refreshes ?
refreshKeyGenFnc func(cpl uint) (string, error) // generate the key for the query to refresh this cpl
refreshQueryFnc func(ctx context.Context, key string) error // query to run for a refresh.
refreshPingFnc func(ctx context.Context, p peer.ID) error // request to check liveness of remote peer
refreshQueryTimeout time.Duration // timeout for one refresh query

// interval between two periodic refreshes.
Expand All @@ -57,6 +58,7 @@ type RtRefreshManager struct {
func NewRtRefreshManager(h host.Host, rt *kbucket.RoutingTable, autoRefresh bool,
refreshKeyGenFnc func(cpl uint) (string, error),
refreshQueryFnc func(ctx context.Context, key string) error,
refreshPingFnc func(ctx context.Context, p peer.ID) error,
refreshQueryTimeout time.Duration,
refreshInterval time.Duration,
successfulOutboundQueryGracePeriod time.Duration,
Expand All @@ -73,6 +75,7 @@ func NewRtRefreshManager(h host.Host, rt *kbucket.RoutingTable, autoRefresh bool
enableAutoRefresh: autoRefresh,
refreshKeyGenFnc: refreshKeyGenFnc,
refreshQueryFnc: refreshQueryFnc,
refreshPingFnc: refreshPingFnc,

refreshQueryTimeout: refreshQueryTimeout,
refreshInterval: refreshInterval,
Expand Down Expand Up @@ -178,12 +181,21 @@ func (r *RtRefreshManager) loop() {
wg.Add(1)
// Check one peer's liveness in its own goroutine: first connect, then ping.
// A failure at either step removes the peer from the routing table.
go func(ps kbucket.PeerInfo) {
defer wg.Done()

// One deadline (peerPingTimeout) covers both the connect and the ping below.
livelinessCtx, cancel := context.WithTimeout(r.ctx, peerPingTimeout)
defer cancel()

// Step 1: the peer must accept a connection. Only the peer ID is passed
// in the AddrInfo; no addresses are supplied here.
if err := r.h.Connect(livelinessCtx, peer.AddrInfo{ID: ps.Id}); err != nil {
logger.Debugw("evicting peer after failed connection", "peer", ps.Id, "error", err)
r.rt.RemovePeer(ps.Id)
return
}

// Step 2: a successful connection is not enough; the peer must also answer
// refreshPingFnc, otherwise it is evicted just like an unreachable peer.
if err := r.refreshPingFnc(livelinessCtx, ps.Id); err != nil {
logger.Debugw("evicting peer after failed ping", "peer", ps.Id, "error", err)
r.rt.RemovePeer(ps.Id)
return
}
// Explicit cancel on the success path; the deferred cancel above also runs,
// which is harmless because repeated calls to a CancelFunc are no-ops.
cancel()
}(ps)
}
}
Expand Down

0 comments on commit eb6d2f7

Please sign in to comment.