From eb6d2f78c91f152b5f8d6b086e25248f7cb1f091 Mon Sep 17 00:00:00 2001 From: Dennis Trautwein Date: Fri, 3 Feb 2023 13:12:22 +0100 Subject: [PATCH 1/5] feat: ping peers on routing table refresh We have seen in the past that there are peers in the IPFS DHT that let you connect to them but then refuse to speak any protocol. This was mainly due to resource manager killing the connection if limits were exceeded. We have seen that such peers are pushed to the edge of the DHT - meaning, they get already pruned from lower buckets. However, they won't get pruned from higher ones, because we only try to connect to them and not speak anything on that connection. This change adds a ping message to the liveness check on routing table refreshes. --- dht.go | 5 +++++ rtrefresh/rt_refresh_manager.go | 14 +++++++++++++- 2 files changed, 18 insertions(+), 1 deletion(-) diff --git a/dht.go b/dht.go index 924c9bb6a..1bb75dfb0 100644 --- a/dht.go +++ b/dht.go @@ -365,10 +365,15 @@ func makeRtRefreshManager(dht *IpfsDHT, cfg dhtcfg.Config, maxLastSuccessfulOutb return err } + pingFnc := func(ctx context.Context, p peer.ID) error { + return dht.protoMessenger.Ping(ctx, p) + } + r, err := rtrefresh.NewRtRefreshManager( dht.host, dht.routingTable, cfg.RoutingTable.AutoRefresh, keyGenFnc, queryFnc, + pingFnc, cfg.RoutingTable.RefreshQueryTimeout, cfg.RoutingTable.RefreshInterval, maxLastSuccessfulOutboundThreshold, diff --git a/rtrefresh/rt_refresh_manager.go b/rtrefresh/rt_refresh_manager.go index cb0eefc25..ff8f27e8d 100644 --- a/rtrefresh/rt_refresh_manager.go +++ b/rtrefresh/rt_refresh_manager.go @@ -41,6 +41,7 @@ type RtRefreshManager struct { enableAutoRefresh bool // should run periodic refreshes ? refreshKeyGenFnc func(cpl uint) (string, error) // generate the key for the query to refresh this cpl refreshQueryFnc func(ctx context.Context, key string) error // query to run for a refresh. + refreshPingFnc func(ctx context.Context, p peer.ID) error // request to check liveness of remote peer refreshQueryTimeout time.Duration // timeout for one refresh query // interval between two periodic refreshes. @@ -57,6 +58,7 @@ type RtRefreshManager struct { func NewRtRefreshManager(h host.Host, rt *kbucket.RoutingTable, autoRefresh bool, refreshKeyGenFnc func(cpl uint) (string, error), refreshQueryFnc func(ctx context.Context, key string) error, + refreshPingFnc func(ctx context.Context, p peer.ID) error, refreshQueryTimeout time.Duration, refreshInterval time.Duration, successfulOutboundQueryGracePeriod time.Duration, @@ -73,6 +75,7 @@ func NewRtRefreshManager(h host.Host, rt *kbucket.RoutingTable, autoRefresh bool enableAutoRefresh: autoRefresh, refreshKeyGenFnc: refreshKeyGenFnc, refreshQueryFnc: refreshQueryFnc, + refreshPingFnc: refreshPingFnc, refreshQueryTimeout: refreshQueryTimeout, refreshInterval: refreshInterval, @@ -178,12 +181,21 @@ func (r *RtRefreshManager) loop() { wg.Add(1) go func(ps kbucket.PeerInfo) { defer wg.Done() + livelinessCtx, cancel := context.WithTimeout(r.ctx, peerPingTimeout) + defer cancel() + if err := r.h.Connect(livelinessCtx, peer.AddrInfo{ID: ps.Id}); err != nil { + logger.Debugw("evicting peer after failed connection", "peer", ps.Id, "error", err) + r.rt.RemovePeer(ps.Id) + return + } + + if err := r.refreshPingFnc(livelinessCtx, ps.Id); err != nil { logger.Debugw("evicting peer after failed ping", "peer", ps.Id, "error", err) r.rt.RemovePeer(ps.Id) + return } - cancel() }(ps) } } From 16823c3a5818ce23c577b2b0c889cd91857f9366 Mon Sep 17 00:00:00 2001 From: Dennis Trautwein Date: Sun, 5 Feb 2023 11:08:34 +0100 Subject: [PATCH 2/5] use ping protocol instead of DHT ping --- dht.go | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) diff --git a/dht.go b/dht.go index 1bb75dfb0..fed07a5b9 100644 --- a/dht.go +++ b/dht.go @@ -26,6 +26,7 @@ import ( "github.com/libp2p/go-libp2p-kbucket/peerdiversity" record "github.com/libp2p/go-libp2p-record" recpb "github.com/libp2p/go-libp2p-record/pb" + "github.com/libp2p/go-libp2p/p2p/protocol/ping" "github.com/gogo/protobuf/proto" ds "github.com/ipfs/go-datastore" @@ -366,7 +367,19 @@ func makeRtRefreshManager(dht *IpfsDHT, cfg dhtcfg.Config, maxLastSuccessfulOutb } pingFnc := func(ctx context.Context, p peer.ID) error { - return dht.protoMessenger.Ping(ctx, p) + timeoutCtx, cancel := context.WithTimeout(ctx, 5*time.Second) + defer cancel() + + // Just wait for a single ping + select { + case res, more := <-ping.Ping(timeoutCtx, dht.host, p): + if !more { + return timeoutCtx.Err() + } + return res.Error + case <-timeoutCtx.Done(): + return timeoutCtx.Err() + } } r, err := rtrefresh.NewRtRefreshManager( From d5ea78fdc990550b4f8ee9a5344649e5a71cfa1a Mon Sep 17 00:00:00 2001 From: Dennis Trautwein Date: Sun, 5 Feb 2023 11:13:42 +0100 Subject: [PATCH 3/5] extract global ping timeout constant --- dht.go | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/dht.go b/dht.go index fed07a5b9..ae07fa700 100644 --- a/dht.go +++ b/dht.go @@ -69,6 +69,11 @@ const ( protectedBuckets = 2 ) +const ( + // MAGIC: timeout for receiving a pong from a peer in our routing table + pingTimeout = 5 * time.Second +) + type addPeerRTReq struct { p peer.ID queryPeer bool @@ -367,7 +372,7 @@ func makeRtRefreshManager(dht *IpfsDHT, cfg dhtcfg.Config, maxLastSuccessfulOutb } pingFnc := func(ctx context.Context, p peer.ID) error { - timeoutCtx, cancel := context.WithTimeout(ctx, 5*time.Second) + timeoutCtx, cancel := context.WithTimeout(ctx, pingTimeout) defer cancel() // Just wait for a single ping From 5e68507d821a427fde7d8dfd1baa1cfb982728c2 Mon Sep 17 00:00:00 2001 From: Dennis Trautwein Date: Tue, 7 Feb 2023 18:08:52 +0100 Subject: [PATCH 4/5] Revert "use ping protocol instead of DHT ping" This reverts commit 16823c3a5818ce23c577b2b0c889cd91857f9366. --- dht.go | 20 +------------------- 1 file changed, 1 insertion(+), 19 deletions(-) diff --git a/dht.go b/dht.go index ae07fa700..1bb75dfb0 100644 --- a/dht.go +++ b/dht.go @@ -26,7 +26,6 @@ import ( "github.com/libp2p/go-libp2p-kbucket/peerdiversity" record "github.com/libp2p/go-libp2p-record" recpb "github.com/libp2p/go-libp2p-record/pb" - "github.com/libp2p/go-libp2p/p2p/protocol/ping" "github.com/gogo/protobuf/proto" ds "github.com/ipfs/go-datastore" @@ -69,11 +68,6 @@ const ( protectedBuckets = 2 ) -const ( - // MAGIC: timeout for receiving a pong from a peer in our routing table - pingTimeout = 5 * time.Second -) - type addPeerRTReq struct { p peer.ID queryPeer bool @@ -372,19 +366,7 @@ func makeRtRefreshManager(dht *IpfsDHT, cfg dhtcfg.Config, maxLastSuccessfulOutb } pingFnc := func(ctx context.Context, p peer.ID) error { - timeoutCtx, cancel := context.WithTimeout(ctx, pingTimeout) - defer cancel() - - // Just wait for a single ping - select { - case res, more := <-ping.Ping(timeoutCtx, dht.host, p): - if !more { - return timeoutCtx.Err() - } - return res.Error - case <-timeoutCtx.Done(): - return timeoutCtx.Err() - } + return dht.protoMessenger.Ping(ctx, p) } r, err := rtrefresh.NewRtRefreshManager( From 62c832eb2a185d596272f278899380a18f19dc0d Mon Sep 17 00:00:00 2001 From: Dennis Trautwein Date: Thu, 9 Feb 2023 19:32:53 +0100 Subject: [PATCH 5/5] refactor: use the FIND_NODE RPC to ping peer --- dht.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/dht.go b/dht.go index 1bb75dfb0..f79acf9ed 100644 --- a/dht.go +++ b/dht.go @@ -366,7 +366,8 @@ func makeRtRefreshManager(dht *IpfsDHT, cfg dhtcfg.Config, maxLastSuccessfulOutb } pingFnc := func(ctx context.Context, p peer.ID) error { - return dht.protoMessenger.Ping(ctx, p) + _, err := dht.protoMessenger.GetClosestPeers(ctx, p, p) // don't use the PING message type as it's deprecated + return err } r, err := rtrefresh.NewRtRefreshManager(