From 4667e81182cce3d559d3d784a289309311129376 Mon Sep 17 00:00:00 2001 From: guillaumemichel Date: Thu, 8 Aug 2024 09:27:52 +0200 Subject: [PATCH 1/4] updated TestRTEvictionOnFailedQuery --- query_test.go | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/query_test.go b/query_test.go index 99c10d96..ee4f8e1c 100644 --- a/query_test.go +++ b/query_test.go @@ -34,18 +34,18 @@ func TestRTEvictionOnFailedQuery(t *testing.T) { // peers should be in the RT because of fixLowPeers require.NoError(t, tu.WaitFor(ctx, func() error { if !checkRoutingTable(d1, d2) { - return fmt.Errorf("should have routes 0") + return fmt.Errorf("should have routes") } return nil })) - // close both hosts so query fails - require.NoError(t, d1.host.Close()) - require.NoError(t, d2.host.Close()) - // peers will still be in the RT because we have decoupled membership from connectivity + // clear the addresses of the peers so that the next queries fail + d1.host.Peerstore().ClearAddrs(d2.self) + d2.host.Peerstore().ClearAddrs(d1.self) + // peers will still be in the RT because RT is decoupled with the host and peerstore require.NoError(t, tu.WaitFor(ctx, func() error { if !checkRoutingTable(d1, d2) { - return fmt.Errorf("should have routes 1") + return fmt.Errorf("should have routes") } return nil })) @@ -59,7 +59,7 @@ func TestRTEvictionOnFailedQuery(t *testing.T) { require.NoError(t, tu.WaitFor(ctx, func() error { if checkRoutingTable(d1, d2) { - return fmt.Errorf("should not have routes 2") + return fmt.Errorf("should not have routes") } return nil })) From 53fb5ad10bd44d4773b3935973126f8d45abfb9d Mon Sep 17 00:00:00 2001 From: Daniel N <2color@users.noreply.github.com> Date: Wed, 14 Aug 2024 12:39:25 +0200 Subject: [PATCH 2/4] fix: treat limited connections as also connectable --- dht.go | 6 ++++-- fullrt/dht.go | 8 +++++--- routing.go | 5 +++-- 3 files changed, 12 insertions(+), 7 deletions(-) diff --git a/dht.go b/dht.go index 43223290..bd28a256 100644 --- a/dht.go +++ b/dht.go @@ -737,7 +737,8 @@ func (dht *IpfsDHT) FindLocal(ctx context.Context, id peer.ID) peer.AddrInfo { _, span := internal.StartSpan(ctx, "IpfsDHT.FindLocal", trace.WithAttributes(attribute.Stringer("PeerID", id))) defer span.End() - if dht.host.Network().Connectedness(id) == network.Connected { + connectedness := dht.host.Network().Connectedness(id) + if connectedness == network.Connected || connectedness == network.Limited { return dht.peerstore.PeerInfo(id) } return peer.AddrInfo{} @@ -926,7 +927,8 @@ func (dht *IpfsDHT) newContextWithLocalTags(ctx context.Context, extraTags ...ta func (dht *IpfsDHT) maybeAddAddrs(p peer.ID, addrs []ma.Multiaddr, ttl time.Duration) { // Don't add addresses for self or our connected peers. We have better ones. - if p == dht.self || dht.host.Network().Connectedness(p) == network.Connected { + connectedness := dht.host.Network().Connectedness(p) + if p == dht.self || connectedness == network.Connected || connectedness == network.Limited { return } dht.peerstore.AddAddrs(p, dht.filterAddrs(addrs), ttl) diff --git a/fullrt/dht.go b/fullrt/dht.go index 99cf188d..91e519ae 100644 --- a/fullrt/dht.go +++ b/fullrt/dht.go @@ -1460,7 +1460,7 @@ func (dht *FullRT) FindPeer(ctx context.Context, id peer.ID) (pi peer.AddrInfo, // Return peer information if we tried to dial the peer during the query or we are (or recently were) connected // to the peer. connectedness := dht.h.Network().Connectedness(id) - if connectedness == network.Connected { + if connectedness == network.Connected || connectedness == network.Limited { return dht.h.Peerstore().PeerInfo(id), nil } @@ -1538,7 +1538,8 @@ func (dht *FullRT) getRecordFromDatastore(ctx context.Context, dskey ds.Key) (*r // FindLocal looks for a peer with a given ID connected to this dht and returns the peer and the table it was found in. func (dht *FullRT) FindLocal(id peer.ID) peer.AddrInfo { - if dht.h.Network().Connectedness(id) == network.Connected { + connectedness := dht.h.Network().Connectedness(id) + if connectedness == network.Connected || connectedness == network.Limited { return dht.h.Peerstore().PeerInfo(id) } return peer.AddrInfo{} @@ -1546,7 +1547,8 @@ func (dht *FullRT) FindLocal(id peer.ID) peer.AddrInfo { func (dht *FullRT) maybeAddAddrs(p peer.ID, addrs []multiaddr.Multiaddr, ttl time.Duration) { // Don't add addresses for self or our connected peers. We have better ones. - if p == dht.h.ID() || dht.h.Network().Connectedness(p) == network.Connected { + connectedness := dht.h.Network().Connectedness(p) + if p == dht.h.ID() || connectedness == network.Connected || connectedness == network.Limited { return } dht.h.Peerstore().AddAddrs(p, addrs, ttl) diff --git a/routing.go b/routing.go index 890cec82..83f09535 100644 --- a/routing.go +++ b/routing.go @@ -664,7 +664,8 @@ func (dht *IpfsDHT) FindPeer(ctx context.Context, id peer.ID) (pi peer.AddrInfo, return peers, err }, func(*qpeerset.QueryPeerset) bool { - return dht.host.Network().Connectedness(id) == network.Connected + connectedness := dht.host.Network().Connectedness(id) + return connectedness == network.Connected || connectedness == network.Limited }, ) @@ -686,7 +687,7 @@ func (dht *IpfsDHT) FindPeer(ctx context.Context, id peer.ID) (pi peer.AddrInfo, // Return peer information if we tried to dial the peer during the query or we are (or recently were) connected // to the peer. connectedness := dht.host.Network().Connectedness(id) - if dialedPeerDuringQuery || connectedness == network.Connected { + if dialedPeerDuringQuery || connectedness == network.Connected || connectedness == network.Limited { return dht.peerstore.PeerInfo(id), nil } From be9a8e7afbb6fe7ee6afa3361ffd0975a347a2c8 Mon Sep 17 00:00:00 2001 From: Daniel N <2color@users.noreply.github.com> Date: Wed, 14 Aug 2024 14:29:47 +0200 Subject: [PATCH 3/4] fix: check connectedness in separate function --- dht.go | 6 ++---- dht_filters.go | 5 +++++ fullrt/dht.go | 9 +++------ query.go | 3 +-- routing.go | 7 ++----- 5 files changed, 13 insertions(+), 17 deletions(-) diff --git a/dht.go b/dht.go index bd28a256..824c17e5 100644 --- a/dht.go +++ b/dht.go @@ -737,8 +737,7 @@ func (dht *IpfsDHT) FindLocal(ctx context.Context, id peer.ID) peer.AddrInfo { _, span := internal.StartSpan(ctx, "IpfsDHT.FindLocal", trace.WithAttributes(attribute.Stringer("PeerID", id))) defer span.End() - connectedness := dht.host.Network().Connectedness(id) - if connectedness == network.Connected || connectedness == network.Limited { + if HasValidConnectedness(dht.host, id) { return dht.peerstore.PeerInfo(id) } return peer.AddrInfo{} @@ -927,8 +926,7 @@ func (dht *IpfsDHT) newContextWithLocalTags(ctx context.Context, extraTags ...ta func (dht *IpfsDHT) maybeAddAddrs(p peer.ID, addrs []ma.Multiaddr, ttl time.Duration) { // Don't add addresses for self or our connected peers. We have better ones. - connectedness := dht.host.Network().Connectedness(p) - if p == dht.self || connectedness == network.Connected || connectedness == network.Limited { + if p == dht.self || HasValidConnectedness(dht.host, p) { return } dht.peerstore.AddAddrs(p, dht.filterAddrs(addrs), ttl) diff --git a/dht_filters.go b/dht_filters.go index 27e9e8d9..c56c3ca1 100644 --- a/dht_filters.go +++ b/dht_filters.go @@ -238,3 +238,8 @@ func inAddrRange(ip net.IP, ipnets []*net.IPNet) bool { return false } + +func HasValidConnectedness(host host.Host, id peer.ID) bool { + connectedness := host.Network().Connectedness(id) + return connectedness == network.Connected || connectedness == network.Limited +} diff --git a/fullrt/dht.go b/fullrt/dht.go index 91e519ae..3383b27e 100644 --- a/fullrt/dht.go +++ b/fullrt/dht.go @@ -1459,8 +1459,7 @@ func (dht *FullRT) FindPeer(ctx context.Context, id peer.ID) (pi peer.AddrInfo, // Return peer information if we tried to dial the peer during the query or we are (or recently were) connected // to the peer. - connectedness := dht.h.Network().Connectedness(id) - if connectedness == network.Connected || connectedness == network.Limited { + if kaddht.HasValidConnectedness(dht.h, id) { return dht.h.Peerstore().PeerInfo(id), nil } @@ -1538,8 +1537,7 @@ func (dht *FullRT) getRecordFromDatastore(ctx context.Context, dskey ds.Key) (*r // FindLocal looks for a peer with a given ID connected to this dht and returns the peer and the table it was found in. func (dht *FullRT) FindLocal(id peer.ID) peer.AddrInfo { - connectedness := dht.h.Network().Connectedness(id) - if connectedness == network.Connected || connectedness == network.Limited { + if kaddht.HasValidConnectedness(dht.h, id) { return dht.h.Peerstore().PeerInfo(id) } return peer.AddrInfo{} @@ -1547,8 +1545,7 @@ func (dht *FullRT) FindLocal(id peer.ID) peer.AddrInfo { func (dht *FullRT) maybeAddAddrs(p peer.ID, addrs []multiaddr.Multiaddr, ttl time.Duration) { // Don't add addresses for self or our connected peers. We have better ones. - connectedness := dht.h.Network().Connectedness(p) - if p == dht.h.ID() || connectedness == network.Connected || connectedness == network.Limited { + if p == dht.h.ID() || kaddht.HasValidConnectedness(dht.h, p) { return } dht.h.Peerstore().AddAddrs(p, addrs, ttl) diff --git a/query.go b/query.go index 7c01a2af..f7e10e9b 100644 --- a/query.go +++ b/query.go @@ -8,7 +8,6 @@ import ( "sync" "time" - "github.com/libp2p/go-libp2p/core/network" "github.com/libp2p/go-libp2p/core/peer" pstore "github.com/libp2p/go-libp2p/core/peerstore" "github.com/libp2p/go-libp2p/core/routing" @@ -530,7 +529,7 @@ func (dht *IpfsDHT) dialPeer(ctx context.Context, p peer.ID) error { defer span.End() // short-circuit if we're already connected. - if dht.host.Network().Connectedness(p) == network.Connected { + if HasValidConnectedness(dht.host, p) { return nil } diff --git a/routing.go b/routing.go index 83f09535..bba9bdb1 100644 --- a/routing.go +++ b/routing.go @@ -8,7 +8,6 @@ import ( "sync" "time" - "github.com/libp2p/go-libp2p/core/network" "github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-libp2p/core/peerstore" "github.com/libp2p/go-libp2p/core/routing" @@ -664,8 +663,7 @@ func (dht *IpfsDHT) FindPeer(ctx context.Context, id peer.ID) (pi peer.AddrInfo, return peers, err }, func(*qpeerset.QueryPeerset) bool { - connectedness := dht.host.Network().Connectedness(id) - return connectedness == network.Connected || connectedness == network.Limited + return HasValidConnectedness(dht.host, id) }, ) @@ -686,8 +684,7 @@ func (dht *IpfsDHT) FindPeer(ctx context.Context, id peer.ID) (pi peer.AddrInfo, // Return peer information if we tried to dial the peer during the query or we are (or recently were) connected // to the peer. - connectedness := dht.host.Network().Connectedness(id) - if dialedPeerDuringQuery || connectedness == network.Connected || connectedness == network.Limited { + if dialedPeerDuringQuery || HasValidConnectedness(dht.host, id) { return dht.peerstore.PeerInfo(id), nil } From 7004c773fd1248c28662ad5247ebbfe38d6c2f97 Mon Sep 17 00:00:00 2001 From: Daniel N <2color@users.noreply.github.com> Date: Mon, 19 Aug 2024 11:17:27 +0200 Subject: [PATCH 4/4] fix: use a non-limited connection for querying --- query.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/query.go b/query.go index f7e10e9b..7c01a2af 100644 --- a/query.go +++ b/query.go @@ -8,6 +8,7 @@ import ( "sync" "time" + "github.com/libp2p/go-libp2p/core/network" "github.com/libp2p/go-libp2p/core/peer" pstore "github.com/libp2p/go-libp2p/core/peerstore" "github.com/libp2p/go-libp2p/core/routing" @@ -529,7 +530,7 @@ func (dht *IpfsDHT) dialPeer(ctx context.Context, p peer.ID) error { defer span.End() // short-circuit if we're already connected. - if HasValidConnectedness(dht.host, p) { + if dht.host.Network().Connectedness(p) == network.Connected { return nil }