diff --git a/dht.go b/dht.go index f84ccab5d..e95682569 100644 --- a/dht.go +++ b/dht.go @@ -92,6 +92,7 @@ type IpfsDHT struct { bucketSize int alpha int // The concurrency parameter per path + beta int // The number of peers closest to a target that must have responded for a query path to terminate d int // Number of Disjoint Paths to query autoRefresh bool @@ -234,6 +235,7 @@ func makeDHT(ctx context.Context, h host.Host, cfg config) (*IpfsDHT, error) { serverProtocols: serverProtocols, bucketSize: cfg.bucketSize, alpha: cfg.concurrency, + beta: cfg.resiliency, d: cfg.disjointPaths, triggerRtRefresh: make(chan chan<- error), triggerSelfLookup: make(chan chan<- error), diff --git a/dht_bootstrap.go b/dht_bootstrap.go index 2ff90b6ae..e7442ae15 100644 --- a/dht_bootstrap.go +++ b/dht_bootstrap.go @@ -8,7 +8,6 @@ import ( multierror "github.com/hashicorp/go-multierror" process "github.com/jbenet/goprocess" processctx "github.com/jbenet/goprocess/context" - "github.com/libp2p/go-libp2p-core/routing" kbucket "github.com/libp2p/go-libp2p-kbucket" "github.com/multiformats/go-multiaddr" _ "github.com/multiformats/go-multiaddr-dns" @@ -58,8 +57,8 @@ func (dht *IpfsDHT) startSelfLookup() error { // Do a self walk queryCtx, cancel := context.WithTimeout(ctx, dht.rtRefreshQueryTimeout) - _, err := dht.FindPeer(queryCtx, dht.self) - if err == routing.ErrNotFound || err == kbucket.ErrLookupFailure { + _, err := dht.GetClosestPeers(queryCtx, string(dht.self)) + if err == kbucket.ErrLookupFailure { err = nil } else if err != nil { err = fmt.Errorf("failed to query self during routing table refresh: %s", err) @@ -207,10 +206,7 @@ func (dht *IpfsDHT) refreshCpls(ctx context.Context) error { // walk to the generated peer walkFnc := func(c context.Context) error { - _, err := dht.FindPeer(c, randPeer) - if err == routing.ErrNotFound { - return nil - } + _, err := dht.GetClosestPeers(c, string(randPeer)) return err } diff --git a/dht_options.go b/dht_options.go index 648cb5f67..ece3e54b9 100644 --- a/dht_options.go +++ b/dht_options.go @@ -34,6 +34,7 @@ type config struct { bucketSize int disjointPaths int concurrency int + resiliency int maxRecordAge time.Duration enableProviders bool enableValues bool @@ -87,6 +88,7 @@ var defaults = func(o *config) error { o.bucketSize = defaultBucketSize o.concurrency = 3 + o.resiliency = 3 return nil } @@ -239,6 +241,17 @@ func Concurrency(alpha int) Option { } } +// Resiliency configures the number of peers closest to a target that must have responded in order for a given query +// path to complete. +// +// The default value is 3. +func Resiliency(beta int) Option { + return func(c *config) error { + c.resiliency = beta + return nil + } +} + // DisjointPaths configures the number of disjoint paths (d in the S/Kademlia paper) taken per query. // // The default value is BucketSize/2. 
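For reference, here is a minimal sketch of how a caller might combine the new `Resiliency` option with the existing `Concurrency` option when constructing a DHT. The `newTunedDHT` helper and the concrete values are illustrative only, and it assumes the package's usual `New(ctx, host, ...Option)` constructor rather than anything introduced by this diff.

```go
package example

import (
	"context"

	"github.com/libp2p/go-libp2p-core/host"

	dht "github.com/libp2p/go-libp2p-kad-dht"
)

// newTunedDHT is a hypothetical helper showing how alpha and beta interact:
// each lookup path keeps up to 10 RPCs in flight (Concurrency), and a path
// may only terminate once the 4 closest reachable peers have all responded
// (Resiliency). The values are illustrative, not recommendations.
func newTunedDHT(ctx context.Context, h host.Host) (*dht.IpfsDHT, error) {
	return dht.New(ctx, h,
		dht.Concurrency(10), // alpha: concurrent requests per query path
		dht.Resiliency(4),   // beta: responses required from the closest peers
	)
}
```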
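Since the refresh logic above now calls `GetClosestPeers` instead of `FindPeer`, the following sketch shows how a caller might drain its result channel. The `drainClosest` helper is hypothetical; the `GetClosestPeers` signature is the one visible in the lookup.go hunk later in this diff, and the channel is closed when the lookup completes.

```go
package example

import (
	"context"

	"github.com/libp2p/go-libp2p-core/peer"

	dht "github.com/libp2p/go-libp2p-kad-dht"
)

// drainClosest is a hypothetical helper: it runs a GetClosestPeers lookup and
// collects the results. The returned channel carries up to K peers and is
// closed once the lookup finishes, so ranging over it is sufficient.
func drainClosest(ctx context.Context, d *dht.IpfsDHT, key string) ([]peer.ID, error) {
	ch, err := d.GetClosestPeers(ctx, key)
	if err != nil {
		return nil, err
	}
	var out []peer.ID
	for p := range ch {
		out = append(out, p)
	}
	return out, ctx.Err()
}
```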
diff --git a/dht_test.go b/dht_test.go index 8db21bbaf..b60e8c265 100644 --- a/dht_test.go +++ b/dht_test.go @@ -1467,8 +1467,8 @@ func TestFindClosestPeers(t *testing.T) { out = append(out, p) } - if len(out) != querier.bucketSize { - t.Fatalf("got wrong number of peers (got %d, expected %d)", len(out), querier.bucketSize) + if len(out) < querier.beta { + t.Fatalf("got wrong number of peers (got %d, expected at least %d)", len(out), querier.beta) } } diff --git a/kpeerset/peerheap/heap.go b/kpeerset/peerheap/heap.go deleted file mode 100644 index e42abb405..000000000 --- a/kpeerset/peerheap/heap.go +++ /dev/null @@ -1,110 +0,0 @@ -package peerheap - -import ( - "github.com/libp2p/go-libp2p-core/peer" -) - -// Comparator is the type of a function that compares two peer Heap items to determine the ordering between them. -// It returns true if i1 is "less" than i2 and false otherwise. -type Comparator func(i1 Item, i2 Item) bool - -// Item is one "item" in the Heap. -// It contains the Id of the peer, an arbitrary value associated with the peer -// and the index of the "item" in the Heap. -type Item struct { - Peer peer.ID - Value interface{} - Index int -} - -// Heap implements a heap of peer Items. -// It uses the "compare" member function to compare two peers to determine the order between them. -// If isMaxHeap is set to true, this Heap is a maxHeap, otherwise it's a minHeap. -// -// Note: It is the responsibility of the caller to enforce locking & synchronization. -type Heap struct { - items []*Item - compare Comparator - isMaxHeap bool -} - -// New creates & returns a peer Heap. -func New(isMaxHeap bool, compare Comparator) *Heap { - return &Heap{isMaxHeap: isMaxHeap, compare: compare} -} - -// PeekTop returns a copy of the top/first Item in the heap. -// This would be the "maximum" or the "minimum" peer depending on whether -// the heap is a maxHeap or a minHeap. -// -// A call to PeekTop will panic if the Heap is empty. -func (ph *Heap) PeekTop() Item { - return *ph.items[0] -} - -// FilterItems returns Copies of ALL Items in the Heap that satisfy the given predicate -func (ph *Heap) FilterItems(p func(i Item) bool) []Item { - var items []Item - - for _, i := range ph.items { - ih := *i - if p(ih) { - items = append(items, ih) - } - } - return items -} - -// Peers returns all the peers currently in the heap -func (ph *Heap) Peers() []peer.ID { - peers := make([]peer.ID, 0, len(ph.items)) - - for _, i := range ph.items { - peers = append(peers, i.Peer) - } - return peers -} - -// Note: The functions below make the Heap satisfy the "heap.Interface" as required by the "heap" package in the -// standard library. Please refer to the docs for "heap.Interface" in the standard library for more details. - -func (ph *Heap) Len() int { - return len(ph.items) -} - -func (ph *Heap) Less(i, j int) bool { - h := ph.items - - isLess := ph.compare(*h[i], *h[j]) - - // because the "compare" function returns true if item1 is less than item2, - // we need to reverse it's result if the Heap is a maxHeap. 
- if ph.isMaxHeap { - return !isLess - } - return isLess -} - -func (ph *Heap) Swap(i, j int) { - h := ph.items - h[i], h[j] = h[j], h[i] - h[i].Index = i - h[j].Index = j -} - -func (ph *Heap) Push(x interface{}) { - n := len(ph.items) - item := x.(*Item) - item.Index = n - ph.items = append(ph.items, item) -} - -func (ph *Heap) Pop() interface{} { - old := ph.items - n := len(old) - item := old[n-1] - old[n-1] = nil // avoid memory leak - item.Index = -1 // for safety - ph.items = old[0 : n-1] - return item -} diff --git a/kpeerset/peerheap/heap_test.go b/kpeerset/peerheap/heap_test.go deleted file mode 100644 index 70a622eed..000000000 --- a/kpeerset/peerheap/heap_test.go +++ /dev/null @@ -1,91 +0,0 @@ -package peerheap - -import ( - "container/heap" - "testing" - - "github.com/libp2p/go-libp2p-core/peer" - - "github.com/stretchr/testify/require" -) - -// a comparator that compares peer Ids based on their length -var cmp = func(i1 Item, i2 Item) bool { - return len(i1.Peer) < len(i2.Peer) -} - -var ( - peer1 = peer.ID("22") - peer2 = peer.ID("1") - peer3 = peer.ID("333") -) - -func TestMinHeap(t *testing.T) { - // create new - ph := New(false, cmp) - require.Zero(t, ph.Len()) - - // push the element - heap.Push(ph, &Item{Peer: peer1}) - // assertions - require.True(t, ph.Len() == 1) - require.Equal(t, peer1, ph.PeekTop().Peer) - - // push another element - heap.Push(ph, &Item{Peer: peer2}) - // assertions - require.True(t, ph.Len() == 2) - require.Equal(t, peer2, ph.PeekTop().Peer) - - // push another element - heap.Push(ph, &Item{Peer: peer3}) - // assertions - require.True(t, ph.Len() == 3) - require.Equal(t, peer2, ph.PeekTop().Peer) - - // remove & add again - heap.Remove(ph, 1) - require.True(t, ph.Len() == 2) - heap.Remove(ph, 0) - require.True(t, ph.Len() == 1) - - heap.Push(ph, &Item{Peer: peer1}) - heap.Push(ph, &Item{Peer: peer2}) - - // test filter peers - filtered := ph.FilterItems(func(i Item) bool { - return len(i.Peer) != 2 - }) - require.Len(t, filtered, 2) - require.Contains(t, itemsToPeers(filtered), peer2) - require.Contains(t, itemsToPeers(filtered), peer3) - - // Assert Min Heap Order - require.Equal(t, peer2, heap.Pop(ph).(*Item).Peer) - require.Equal(t, peer1, heap.Pop(ph).(*Item).Peer) - require.Equal(t, peer3, heap.Pop(ph).(*Item).Peer) -} - -func itemsToPeers(is []Item) []peer.ID { - peers := make([]peer.ID, 0, len(is)) - for _, i := range is { - peers = append(peers, i.Peer) - } - return peers -} - -func TestMaxHeap(t *testing.T) { - // create new - ph := New(true, cmp) - require.Zero(t, ph.Len()) - - // push all three peers - heap.Push(ph, &Item{Peer: peer1}) - heap.Push(ph, &Item{Peer: peer3}) - heap.Push(ph, &Item{Peer: peer2}) - - // Assert Max Heap Order - require.Equal(t, peer3, heap.Pop(ph).(*Item).Peer) - require.Equal(t, peer1, heap.Pop(ph).(*Item).Peer) - require.Equal(t, peer2, heap.Pop(ph).(*Item).Peer) -} diff --git a/kpeerset/sorted_peerset.go b/kpeerset/sorted_peerset.go deleted file mode 100644 index f4eac9b60..000000000 --- a/kpeerset/sorted_peerset.go +++ /dev/null @@ -1,236 +0,0 @@ -package kpeerset - -import ( - "container/heap" - "math/big" - "sync" - - "github.com/libp2p/go-libp2p-core/peer" - - "github.com/libp2p/go-libp2p-kad-dht/kpeerset/peerheap" - - ks "github.com/whyrusleeping/go-keyspace" -) - -// SortedPeerset is a data-structure that maintains the queried & unqueried peers for a query -// based on their distance from the key. 
-// It's main use is to allow peer addition, removal & retrieval for the query as per the -// semantics described in the Kad DHT paper. -type SortedPeerset struct { - // the key being searched for - key ks.Key - - // the K parameter in the Kad DHT paper - kvalue int - - // a maxHeap maintaining the K closest(Kademlia XOR distance) peers to the key. - // the topmost peer will be the peer furthest from the key in this heap. - heapKClosestPeers *peerheap.Heap - - // a minHeap for for rest of the peers ordered by their distance from the key. - // the topmost peer will be the peer closest to the key in this heap. - heapRestOfPeers *peerheap.Heap - - // pointer to the item in the heap of K closest peers. - kClosestPeers map[peer.ID]*peerheap.Item - - // pointer to the item in the heap of the rest of peers. - restOfPeers map[peer.ID]*peerheap.Item - - // peers that have already been queried. - queried map[peer.ID]struct{} - - // the closest peer to the key that we have heard about - closestKnownPeer peer.ID - // the distance of the closest known peer from the key - dClosestKnownPeer *big.Int - - lock sync.Mutex -} - -// NewSortedPeerset creates and returns a new SortedPeerset. -func NewSortedPeerset(kvalue int, key string) *SortedPeerset { - compare := func(i1 peerheap.Item, i2 peerheap.Item) bool { - // distance of the first peer from the key - d1 := i1.Value.(*big.Int) - // distance of the second peer from the key - d2 := i2.Value.(*big.Int) - - // Is the first peer closer to the key than the second peer ? - return d1.Cmp(d2) == -1 - } - - return &SortedPeerset{ - key: ks.XORKeySpace.Key([]byte(key)), - kvalue: kvalue, - heapKClosestPeers: peerheap.New(true, compare), - heapRestOfPeers: peerheap.New(false, compare), - kClosestPeers: make(map[peer.ID]*peerheap.Item), - restOfPeers: make(map[peer.ID]*peerheap.Item), - queried: make(map[peer.ID]struct{}), - } -} - -// Add adds the peer to the SortedPeerset. -// -// If there are less than K peers in the K closest peers, we add the peer to -// the K closest peers. -// -// Otherwise, we do one of the following: -// 1. If this peer is closer to the key than the peer furthest from the key in the -// K closest peers, we move that furthest peer to the rest of peers and then -// add this peer to the K closest peers. -// 2. If this peer is further from the key than the peer furthest from the key in the -// K closest peers, we add it to the rest of peers. -// -// Returns true if the peer is closer to key than the closet peer we've heard about. -func (ps *SortedPeerset) Add(p peer.ID) bool { - ps.lock.Lock() - defer ps.lock.Unlock() - - // we've already added the peer - if ps.kClosestPeers[p] != nil || ps.restOfPeers[p] != nil { - return false - } - - // calculate the distance of the given peer from the key - distancePeer := ks.XORKeySpace.Key([]byte(p)).Distance(ps.key) - item := &peerheap.Item{Peer: p, Value: distancePeer} - - if ps.heapKClosestPeers.Len() < ps.kvalue { - // add the peer to the K closest peers if we have space - heap.Push(ps.heapKClosestPeers, item) - ps.kClosestPeers[p] = item - } else if top := ps.heapKClosestPeers.PeekTop(); distancePeer.Cmp(top.Value.(*big.Int)) == -1 { - // peer is closer to the key than the top peer in the heap of K closest peers - // which is basically the peer furthest from the key because the K closest peers - // are stored in a maxHeap ordered by the distance from the key. - - // remove the top peer from the K closest peers & add it to the rest of peers. 
- bumpedPeer := heap.Pop(ps.heapKClosestPeers).(*peerheap.Item) - delete(ps.kClosestPeers, bumpedPeer.Peer) - heap.Push(ps.heapRestOfPeers, bumpedPeer) - ps.restOfPeers[bumpedPeer.Peer] = bumpedPeer - - // add the peer p to the K closest peers - heap.Push(ps.heapKClosestPeers, item) - ps.kClosestPeers[p] = item - } else { - // add the peer to the rest of peers. - heap.Push(ps.heapRestOfPeers, item) - ps.restOfPeers[p] = item - } - - if ps.closestKnownPeer == "" || (distancePeer.Cmp(ps.dClosestKnownPeer) == -1) { - // given peer is closer to the key than the current closest known peer. - // So, let's update the closest known peer - ps.closestKnownPeer = p - ps.dClosestKnownPeer = distancePeer - return true - } - - return false -} - -// UnqueriedFromKClosest returns the unqueried peers among the K closest peers AFTER -// sorting them in Ascending Order with the given comparator. -// It uses the `getValue` function to get the value with which to compare the peers for sorting -// and the `sortWith` function to compare two peerHeap items to determine the ordering between them. -func (ps *SortedPeerset) UnqueriedFromKClosest(getValue func(id peer.ID, distance *big.Int) interface{}, - sortWith peerheap.Comparator) []peer.ID { - ps.lock.Lock() - defer ps.lock.Unlock() - - unqueriedPeerItems := ps.heapKClosestPeers.FilterItems(ps.isPeerItemQueried) - - // create a min-heap to sort the unqueried peer Items using the given comparator - ph := peerheap.New(false, sortWith) - for _, i := range unqueriedPeerItems { - p := i.Peer - d := i.Value.(*big.Int) - heap.Push(ph, &peerheap.Item{Peer: p, Value: getValue(p, d)}) - } - // now pop so we get them in sorted order - peers := make([]peer.ID, 0, ph.Len()) - for ph.Len() != 0 { - popped := heap.Pop(ph).(*peerheap.Item) - peers = append(peers, popped.Peer) - } - - return peers -} - -// LenUnqueriedFromKClosest returns the number of unqueried peers among -// the K closest peers. -func (ps *SortedPeerset) LenUnqueriedFromKClosest() int { - ps.lock.Lock() - defer ps.lock.Unlock() - - unqueriedPeerItems := ps.heapKClosestPeers.FilterItems(ps.isPeerItemQueried) - - return len(unqueriedPeerItems) -} - -// caller is responsible for the locking -func (ps *SortedPeerset) isPeerItemQueried(i peerheap.Item) bool { - _, ok := ps.queried[i.Peer] - return !ok -} - -// MarkQueried marks the peer as queried. -// It should be called when we have successfully dialed to and gotten a response from the peer. -func (ps *SortedPeerset) MarkQueried(p peer.ID) { - ps.lock.Lock() - defer ps.lock.Unlock() - - ps.queried[p] = struct{}{} -} - -// Remove removes the peer from the SortedPeerset. -// -// If the removed peer was among the K closest peers, we pop a peer from the heap of rest of peers -// and add it to the K closest peers to replace the removed peer. The peer added to the K closest peers in this way -// would be the peer that was closest to the key among the rest of peers since the rest of peers are in a -// minHeap ordered on the distance from the key. -func (ps *SortedPeerset) Remove(p peer.ID) { - ps.lock.Lock() - defer ps.lock.Unlock() - - delete(ps.queried, p) - - if item, ok := ps.kClosestPeers[p]; ok { - // peer is among the K closest peers - - // remove it from the K closest peers - heap.Remove(ps.heapKClosestPeers, item.Index) - delete(ps.kClosestPeers, p) - // if this peer was the closest peer we knew, we need to find the new closest peer. 
- if ps.closestKnownPeer == p { - var minDistance *big.Int - var closest peer.ID - for _, i := range ps.kClosestPeers { - d := i.Value.(*big.Int) - if minDistance == nil || (d.Cmp(minDistance) == -1) { - minDistance = d - closest = i.Peer - } - } - ps.closestKnownPeer = closest - ps.dClosestKnownPeer = minDistance - } - - // we now need to add a peer to the K closest peers from the rest of peers - // to make up for the peer that was just removed - if ps.heapRestOfPeers.Len() > 0 { - // pop a peer from the rest of peers & add it to the K closest peers - upgrade := heap.Pop(ps.heapRestOfPeers).(*peerheap.Item) - delete(ps.restOfPeers, upgrade.Peer) - heap.Push(ps.heapKClosestPeers, upgrade) - ps.kClosestPeers[upgrade.Peer] = upgrade - } - } else if item, ok := ps.restOfPeers[p]; ok { - // peer is not among the K closest, so remove it from the rest of peers. - heap.Remove(ps.heapRestOfPeers, item.Index) - delete(ps.restOfPeers, p) - } -} diff --git a/kpeerset/sorted_peerset_test.go b/kpeerset/sorted_peerset_test.go deleted file mode 100644 index a496dade0..000000000 --- a/kpeerset/sorted_peerset_test.go +++ /dev/null @@ -1,169 +0,0 @@ -package kpeerset - -import ( - "math/big" - "testing" - - "github.com/libp2p/go-libp2p-core/peer" - "github.com/libp2p/go-libp2p-core/test" - - "github.com/libp2p/go-libp2p-kad-dht/kpeerset/peerheap" - kb "github.com/libp2p/go-libp2p-kbucket" - - "github.com/stretchr/testify/require" -) - -var noopCompare = func(i1 peerheap.Item, i2 peerheap.Item) bool { - return true -} - -var noopGetValue = func(p peer.ID, d *big.Int) interface{} { - return d -} - -func TestSortedPeerset(t *testing.T) { - key := "test" - sp := NewSortedPeerset(2, key) - require.Empty(t, sp.UnqueriedFromKClosest(noopGetValue, noopCompare)) - - // -----------------Ordering between peers for the Test ----- - // KEY < peer0 < peer3 < peer1 < peer4 < peer2 < peer5 - // ---------------------------------------------------------- - peer2 := test.RandPeerIDFatal(t) - - // add peer 2 & assert - require.True(t, sp.Add(peer2)) - require.Len(t, sp.UnqueriedFromKClosest(noopGetValue, noopCompare), 1) - require.True(t, sp.LenUnqueriedFromKClosest() == 1) - require.Equal(t, sp.UnqueriedFromKClosest(noopGetValue, noopCompare)[0], peer2) - assertClosestKnownPeer(t, sp, peer2) - - // add peer4 & assert - var peer4 peer.ID - for { - peer4 = test.RandPeerIDFatal(t) - if kb.Closer(peer4, peer2, key) { - break - } - } - require.True(t, sp.Add(peer4)) - require.Len(t, sp.UnqueriedFromKClosest(noopGetValue, noopCompare), 2) - require.True(t, sp.LenUnqueriedFromKClosest() == 2) - require.Contains(t, sp.UnqueriedFromKClosest(noopGetValue, noopCompare), peer2) - require.Contains(t, sp.UnqueriedFromKClosest(noopGetValue, noopCompare), peer4) - assertClosestKnownPeer(t, sp, peer4) - - // add peer1 which will displace peer2 in the kClosest - var peer1 peer.ID - for { - peer1 = test.RandPeerIDFatal(t) - if kb.Closer(peer1, peer4, key) { - break - } - } - require.True(t, sp.Add(peer1)) - require.Len(t, sp.UnqueriedFromKClosest(noopGetValue, noopCompare), 2) - require.Contains(t, sp.UnqueriedFromKClosest(noopGetValue, noopCompare), peer1) - require.Contains(t, sp.UnqueriedFromKClosest(noopGetValue, noopCompare), peer4) - assertClosestKnownPeer(t, sp, peer1) - - // add peer 3 which will displace peer4 in the kClosest - var peer3 peer.ID - for { - peer3 = test.RandPeerIDFatal(t) - if kb.Closer(peer3, peer1, key) { - break - } - } - require.True(t, sp.Add(peer3)) - require.Len(t, sp.UnqueriedFromKClosest(noopGetValue, 
noopCompare), 2) - require.Contains(t, sp.UnqueriedFromKClosest(noopGetValue, noopCompare), peer1) - require.Contains(t, sp.UnqueriedFromKClosest(noopGetValue, noopCompare), peer3) - assertClosestKnownPeer(t, sp, peer3) - - // removing peer1 moves peer4 to the KClosest - sp.Remove(peer1) - require.Len(t, sp.UnqueriedFromKClosest(noopGetValue, noopCompare), 2) - require.Contains(t, sp.UnqueriedFromKClosest(noopGetValue, noopCompare), peer3) - require.Contains(t, sp.UnqueriedFromKClosest(noopGetValue, noopCompare), peer4) - sp.lock.Lock() - require.True(t, sp.heapRestOfPeers.Len() == 1) - require.Contains(t, sp.heapRestOfPeers.Peers(), peer2) - sp.lock.Unlock() - - // mark a peer as queried so it's not returned as unqueried - sp.MarkQueried(peer4) - require.Len(t, sp.UnqueriedFromKClosest(noopGetValue, noopCompare), 1) - require.Contains(t, sp.UnqueriedFromKClosest(noopGetValue, noopCompare), peer3) - - // removing peer3 moves peer2 to the kClosest & updates the closest known peer to peer4 - sp.Remove(peer3) - require.Len(t, sp.UnqueriedFromKClosest(noopGetValue, noopCompare), 1) - require.Contains(t, sp.UnqueriedFromKClosest(noopGetValue, noopCompare), peer2) - sp.lock.Lock() - require.Empty(t, sp.heapRestOfPeers.Peers()) - sp.lock.Unlock() - assertClosestKnownPeer(t, sp, peer4) - - // adding peer5 does not change the closest known peer - var peer5 peer.ID - for { - peer5 = test.RandPeerIDFatal(t) - if kb.Closer(peer2, peer5, key) { - break - } - } - require.False(t, sp.Add(peer5)) - assertClosestKnownPeer(t, sp, peer4) - - // adding peer0 changes the closest known peer - var peer0 peer.ID - for { - peer0 = test.RandPeerIDFatal(t) - if kb.Closer(peer0, peer3, key) { - break - } - } - require.True(t, sp.Add(peer0)) - assertClosestKnownPeer(t, sp, peer0) -} - -func TestSortingUnqueriedFromKClosest(t *testing.T) { - p1 := peer.ID("1") - p2 := peer.ID("22") - p3 := peer.ID("333") - - key := "test" - sp := NewSortedPeerset(3, key) - sp.Add(p1) - sp.Add(p3) - sp.Add(p2) - - ps := sp.UnqueriedFromKClosest(noopGetValue, func(i1 peerheap.Item, i2 peerheap.Item) bool { - return len(i1.Peer) > len(i2.Peer) - }) - require.Len(t, ps, 3) - require.Equal(t, p3, ps[0]) - require.Equal(t, p2, ps[1]) - require.Equal(t, p1, ps[2]) - - // mark one as queried - scoref := func(p peer.ID, d *big.Int) interface{} { - return len(p) - } - - sp.MarkQueried(p3) - ps = sp.UnqueriedFromKClosest(scoref, func(i1 peerheap.Item, i2 peerheap.Item) bool { - return i1.Value.(int) > i2.Value.(int) - }) - require.Len(t, ps, 2) - require.Equal(t, p2, ps[0]) - require.Equal(t, p1, ps[1]) -} - -func assertClosestKnownPeer(t *testing.T, sp *SortedPeerset, p peer.ID) { - sp.lock.Lock() - defer sp.lock.Unlock() - - require.Equal(t, sp.closestKnownPeer, p) -} diff --git a/lookup.go b/lookup.go index aef06a671..7e59771da 100644 --- a/lookup.go +++ b/lookup.go @@ -11,7 +11,6 @@ import ( "github.com/ipfs/go-cid" logging "github.com/ipfs/go-log" - "github.com/libp2p/go-libp2p-kad-dht/kpeerset" pb "github.com/libp2p/go-libp2p-kad-dht/pb" kb "github.com/libp2p/go-libp2p-kbucket" "github.com/multiformats/go-base32" @@ -76,7 +75,7 @@ func (dht *IpfsDHT) GetClosestPeers(ctx context.Context, key string) (<-chan pee e := logger.EventBegin(ctx, "getClosestPeers", loggableKey(key)) defer e.Done() - queries, err := dht.runDisjointQueries(ctx, dht.d, key, + lookupRes, err := dht.runLookupWithFollowup(ctx, dht.d, key, func(ctx context.Context, p peer.ID) ([]*peer.AddrInfo, error) { // For DHT query command routing.PublishQueryEvent(ctx, 
&routing.QueryEvent{ @@ -100,7 +99,7 @@ func (dht *IpfsDHT) GetClosestPeers(ctx context.Context, key string) (<-chan pee return peers, err }, - func(peerset *kpeerset.SortedPeerset) bool { return false }, + func() bool { return false }, ) if err != nil { @@ -110,18 +109,13 @@ func (dht *IpfsDHT) GetClosestPeers(ctx context.Context, key string) (<-chan pee out := make(chan peer.ID, dht.bucketSize) defer close(out) - kadID := kb.ConvertKey(key) - allPeers := kb.SortClosestPeers(queries[0].globallyQueriedPeers.Peers(), kadID) - for i, p := range allPeers { - if i == dht.bucketSize { - break - } + for _, p := range lookupRes.peers { out <- p } - if ctx.Err() == nil { + if ctx.Err() == nil && lookupRes.completed { // refresh the cpl for this key as the query was successful - dht.routingTable.ResetCplRefreshedAtForID(kadID, time.Now()) + dht.routingTable.ResetCplRefreshedAtForID(kb.ConvertKey(key), time.Now()) } return out, ctx.Err() diff --git a/qpeerset/qpeerset.go b/qpeerset/qpeerset.go new file mode 100644 index 000000000..a2113c9c1 --- /dev/null +++ b/qpeerset/qpeerset.go @@ -0,0 +1,165 @@ +package qpeerset + +import ( + "math/big" + "sort" + + "github.com/libp2p/go-libp2p-core/peer" + ks "github.com/whyrusleeping/go-keyspace" +) + +// PeerState describes the state of a peer ID during the lifecycle of an individual lookup. +type PeerState int + +const ( + // PeerHeard is applied to peers which have not been queried yet. + PeerHeard PeerState = iota + // PeerWaiting is applied to peers that are currently being queried. + PeerWaiting + // PeerQueried is applied to peers who have been queried and a response was retrieved successfully. + PeerQueried + // PeerUnreachable is applied to peers who have been queried and a response was not retrieved successfully. + PeerUnreachable +) + +// QueryPeerset maintains the state of a Kademlia asynchronous lookup. +// The lookup state is a set of peers, each labeled with a peer state. +type QueryPeerset struct { + // the key being searched for + key ks.Key + + // all known peers + all []queryPeerState + + // sorted is true if all is currently in sorted order + sorted bool +} + +type queryPeerState struct { + id peer.ID + distance *big.Int + state PeerState +} + +type sortedQueryPeerset QueryPeerset + +func (sqp *sortedQueryPeerset) Len() int { + return len(sqp.all) +} + +func (sqp *sortedQueryPeerset) Swap(i, j int) { + sqp.all[i], sqp.all[j] = sqp.all[j], sqp.all[i] +} + +func (sqp *sortedQueryPeerset) Less(i, j int) bool { + di, dj := sqp.all[i].distance, sqp.all[j].distance + return di.Cmp(dj) == -1 +} + +// NewQueryPeerset creates a new empty set of peers. +// key is the target key of the lookup that this peer set is for. +func NewQueryPeerset(key string) *QueryPeerset { + return &QueryPeerset{ + key: ks.XORKeySpace.Key([]byte(key)), + all: []queryPeerState{}, + sorted: false, + } +} + +func (qp *QueryPeerset) find(p peer.ID) int { + for i := range qp.all { + if qp.all[i].id == p { + return i + } + } + return -1 +} + +func (qp *QueryPeerset) distanceToKey(p peer.ID) *big.Int { + return ks.XORKeySpace.Key([]byte(p)).Distance(qp.key) +} + +// TryAdd adds the peer p to the peer set. +// If the peer is already present, no action is taken. +// Otherwise, the peer is added with state set to PeerHeard. +// TryAdd returns true iff the peer was not already present. 
+func (qp *QueryPeerset) TryAdd(p peer.ID) bool { + if qp.find(p) >= 0 { + return false + } else { + qp.all = append(qp.all, queryPeerState{id: p, distance: qp.distanceToKey(p), state: PeerHeard}) + qp.sorted = false + return true + } +} + +func (qp *QueryPeerset) sort() { + if qp.sorted { + return + } + sort.Sort((*sortedQueryPeerset)(qp)) + qp.sorted = true +} + +// SetState sets the state of peer p to s. +// If p is not in the peerset, SetState panics. +func (qp *QueryPeerset) SetState(p peer.ID, s PeerState) { + qp.all[qp.find(p)].state = s +} + +// GetState returns the state of peer p. +// If p is not in the peerset, GetState panics. +func (qp *QueryPeerset) GetState(p peer.ID) PeerState { + return qp.all[qp.find(p)].state +} + +// NumWaiting returns the number of peers in state PeerWaiting. +func (qp *QueryPeerset) NumWaiting() int { + return len(qp.GetWaitingPeers()) +} + +// GetWaitingPeers returns a slice of all peers in state PeerWaiting, in an undefined order. +func (qp *QueryPeerset) GetWaitingPeers() (result []peer.ID) { + for _, p := range qp.all { + if p.state == PeerWaiting { + result = append(result, p.id) + } + } + return +} + +// GetClosestNotUnreachable returns the closest to the key peers, which are not in state PeerUnreachable. +// It returns count peers or less, if fewer peers meet the condition. +func (qp *QueryPeerset) GetClosestNotUnreachable(count int) (result []peer.ID) { + qp.sort() + for _, p := range qp.all { + if p.state != PeerUnreachable { + result = append(result, p.id) + } + } + if len(result) >= count { + return result[:count] + } + return result +} + +// NumHeard returns the number of peers in state PeerHeard. +func (qp *QueryPeerset) NumHeard() int { + return len(qp.GetHeardPeers()) +} + +// GetHeardPeers returns a slice of all peers in state PeerHeard, in an undefined order. +func (qp *QueryPeerset) GetHeardPeers() (result []peer.ID) { + for _, p := range qp.all { + if p.state == PeerHeard { + result = append(result, p.id) + } + } + return +} + +// GetSortedHeard returns a slice of all peers in state PeerHeard, ordered by ascending distance to the target key. 
+func (qp *QueryPeerset) GetSortedHeard() (result []peer.ID) { + qp.sort() + return qp.GetHeardPeers() +} diff --git a/qpeerset/qpeerset_test.go b/qpeerset/qpeerset_test.go new file mode 100644 index 000000000..a84095765 --- /dev/null +++ b/qpeerset/qpeerset_test.go @@ -0,0 +1,84 @@ +package qpeerset + +import ( + "testing" + + "github.com/libp2p/go-libp2p-core/peer" + "github.com/libp2p/go-libp2p-core/test" + + kb "github.com/libp2p/go-libp2p-kbucket" + + "github.com/stretchr/testify/require" +) + +func TestQPeerSet(t *testing.T) { + key := "test" + qp := NewQueryPeerset(key) + + // -----------------Ordering between peers for the Test ----- + // KEY < peer3 < peer1 < peer4 < peer2 + // ---------------------------------------------------------- + peer2 := test.RandPeerIDFatal(t) + var peer4 peer.ID + for { + peer4 = test.RandPeerIDFatal(t) + if kb.Closer(peer4, peer2, key) { + break + } + } + + var peer1 peer.ID + for { + peer1 = test.RandPeerIDFatal(t) + if kb.Closer(peer1, peer4, key) { + break + } + } + + var peer3 peer.ID + for { + peer3 = test.RandPeerIDFatal(t) + if kb.Closer(peer3, peer1, key) { + break + } + } + + // find fails + require.Equal(t, -1, qp.find(peer2)) + + // add peer2,assert state & then another add fails + require.True(t, qp.TryAdd(peer2)) + require.Equal(t, PeerHeard, qp.GetState(peer2)) + require.False(t, qp.TryAdd(peer2)) + require.Equal(t, 0, qp.NumWaiting()) + + // add peer4 + require.True(t, qp.TryAdd(peer4)) + cl := qp.GetClosestNotUnreachable(2) + require.Equal(t, []peer.ID{peer4, peer2}, cl) + cl = qp.GetClosestNotUnreachable(3) + require.Equal(t, []peer.ID{peer4, peer2}, cl) + cl = qp.GetClosestNotUnreachable(1) + require.Equal(t, []peer.ID{peer4}, cl) + + // mark as unreachable & try to get it + qp.SetState(peer4, PeerUnreachable) + cl = qp.GetClosestNotUnreachable(1) + require.Equal(t, []peer.ID{peer2}, cl) + + // add peer1 + require.True(t, qp.TryAdd(peer1)) + cl = qp.GetClosestNotUnreachable(1) + require.Equal(t, []peer.ID{peer1}, cl) + cl = qp.GetClosestNotUnreachable(2) + require.Equal(t, []peer.ID{peer1, peer2}, cl) + + // mark as waiting and assert + qp.SetState(peer2, PeerWaiting) + require.Equal(t, []peer.ID{peer2}, qp.GetWaitingPeers()) + + require.Equal(t, []peer.ID{peer1}, qp.GetHeardPeers()) + require.True(t, qp.TryAdd(peer3)) + require.Equal(t, []peer.ID{peer3, peer1}, qp.GetSortedHeard()) + require.Equal(t, 2, qp.NumHeard()) +} diff --git a/query.go b/query.go index 6eddf8ff6..ce9b95c4f 100644 --- a/query.go +++ b/query.go @@ -3,15 +3,14 @@ package dht import ( "context" "errors" + "fmt" + "github.com/libp2p/go-libp2p-core/network" "github.com/libp2p/go-libp2p-core/peer" pstore "github.com/libp2p/go-libp2p-core/peerstore" "github.com/libp2p/go-libp2p-core/routing" - "github.com/libp2p/go-libp2p-kad-dht/kpeerset/peerheap" - "math/big" - "time" - "github.com/libp2p/go-libp2p-kad-dht/kpeerset" + "github.com/libp2p/go-libp2p-kad-dht/qpeerset" kb "github.com/libp2p/go-libp2p-kbucket" ) @@ -19,22 +18,30 @@ import ( var ErrNoPeersQueried = errors.New("failed to query any peers") type queryFn func(context.Context, peer.ID) ([]*peer.AddrInfo, error) -type stopFn func(*kpeerset.SortedPeerset) bool +type stopFn func() bool // query represents a single disjoint query. type query struct { // the query context. ctx context.Context + // the cancellation function for the query context. cancel context.CancelFunc dht *IpfsDHT - // localPeers is the set of peers that need to be queried or have already been queried for this query. 
- localPeers *kpeerset.SortedPeerset + // seedPeers is the set of peers that seed the query + seedPeers []peer.ID + + // queryPeers is the set of peers known by this query and their respective states. + queryPeers *qpeerset.QueryPeerset + + // terminated is set when the first worker thread encounters the termination condition. + // Its role is to make sure that once termination is determined, it is sticky. + terminated bool // globallyQueriedPeers is the combined set of peers queried across ALL the disjoint queries. - globallyQueriedPeers *peer.Set + globallyQueriedPeers *peer.Set // TODO: abstract this away from specifics of disjoint paths // the function that will be used to query a single peer. queryFn queryFn @@ -43,14 +50,88 @@ type query struct { stopFn stopFn } -func (dht *IpfsDHT) runDisjointQueries(ctx context.Context, d int, target string, queryFn queryFn, stopFn stopFn) ([]*query, error) { - queryCtx, cancelQuery := context.WithCancel(ctx) +type lookupWithFollowupResult struct { + peers []peer.ID // the top K not unreachable peers across all query paths + state []qpeerset.PeerState // the peer states at the end of the queries - numQueriesComplete := 0 - queryDone := make(chan struct{}, d) + // indicates that neither the lookup nor the followup has been prematurely terminated by an external condition such + // as context cancellation or the stop function being called. + completed bool +} + +// runLookupWithFollowup executes the lookup on the target using the given query function and stopping when either the +// context is cancelled or the stop function returns true. Note: if the stop function is not sticky, i.e. it does not +// return true every time after the first time it returns true, it is not guaranteed to cause a stop to occur just +// because it momentarily returns true. +// +// After the lookup is complete the query function is run (unless stopped) against all of the top K peers from the +// lookup that have not already been successfully queried. +func (dht *IpfsDHT) runLookupWithFollowup(ctx context.Context, d int, target string, queryFn queryFn, stopFn stopFn) (*lookupWithFollowupResult, error) { + // run the query + lookupRes, err := dht.runDisjointQueries(ctx, d, target, queryFn, stopFn) + if err != nil { + return nil, err + } + + // query all of the top K peers we've either Heard about or have outstanding queries we're Waiting on. + // This ensures that all of the top K results have been queried which adds to resiliency against churn for query + // functions that carry state (e.g. FindProviders and GetValue) as well as establish connections that are needed + // by stateless query functions (e.g. 
GetClosestPeers and therefore Provide and PutValue) + queryPeers := make([]peer.ID, 0, len(lookupRes.peers)) + for i, p := range lookupRes.peers { + if state := lookupRes.state[i]; state == qpeerset.PeerHeard || state == qpeerset.PeerWaiting { + queryPeers = append(queryPeers, p) + } + } + + if len(queryPeers) == 0 { + return lookupRes, nil + } + + // return if the lookup has been externally stopped + if ctx.Err() != nil || stopFn() { + lookupRes.completed = false + return lookupRes, nil + } + + doneCh := make(chan struct{}, len(queryPeers)) + followUpCtx, cancelFollowUp := context.WithCancel(ctx) + for _, p := range queryPeers { + qp := p + go func() { + _, _ = queryFn(followUpCtx, qp) + doneCh <- struct{}{} + }() + } + + // wait for all queries to complete before returning, aborting ongoing queries if we've been externally stopped +processFollowUp: + for i := 0; i < len(queryPeers); i++ { + select { + case <-doneCh: + if stopFn() { + cancelFollowUp() + if i < len(queryPeers)-1 { + lookupRes.completed = false + } + break processFollowUp + } + case <-ctx.Done(): + lookupRes.completed = false + break processFollowUp + } + } + + return lookupRes, nil +} + +// d is the number of disjoint queries. +func (dht *IpfsDHT) runDisjointQueries(ctx context.Context, d int, target string, queryFn queryFn, stopFn stopFn) (*lookupWithFollowupResult, error) { + queryCtx, cancelQuery := context.WithCancel(ctx) // pick the K closest peers to the key in our Routing table and shuffle them. - seedPeers := dht.routingTable.NearestPeers(kb.ConvertKey(target), dht.bucketSize) + targetKadID := kb.ConvertKey(target) + seedPeers := dht.routingTable.NearestPeers(targetKadID, dht.bucketSize) if len(seedPeers) == 0 { routing.PublishQueryEvent(ctx, &routing.QueryEvent{ Type: routing.QueryError, @@ -73,7 +154,9 @@ func (dht *IpfsDHT) runDisjointQueries(ctx context.Context, d int, target string ctx: queryCtx, cancel: cancelQuery, dht: dht, - localPeers: kpeerset.NewSortedPeerset(dht.bucketSize, target), + queryPeers: qpeerset.NewQueryPeerset(target), + seedPeers: []peer.ID{}, + terminated: false, globallyQueriedPeers: peersQueried, queryFn: queryFn, stopFn: stopFn, @@ -84,174 +167,212 @@ func (dht *IpfsDHT) runDisjointQueries(ctx context.Context, d int, target string // distribute the shuffled K closest peers as seeds among the "d" disjoint queries for i := 0; i < len(seedPeers); i++ { - queries[i%d].localPeers.Add(seedPeers[i]) + queries[i%d].seedPeers = append(queries[i%d].seedPeers, seedPeers[i]) } // start the "d" disjoint queries + queryDone := make(chan struct{}, d) for i := 0; i < d; i++ { query := queries[i] go func() { - strictParallelismQuery(query) + query.runWithGreedyParallelism() queryDone <- struct{}{} }() } -loop: // wait for all the "d" disjoint queries to complete before we return + // XXX: Waiting until all queries are done is a vector for DoS attacks: + // The disjoint lookup paths that are taken over by adversarial peers + // can easily be fooled to go on forever. + numQueriesComplete := 0 for { - select { - case <-queryDone: - numQueriesComplete++ - if numQueriesComplete == d { - break loop - } - case <-ctx.Done(): - break loop + <-queryDone + numQueriesComplete++ + if numQueriesComplete == d { + break } } - return queries, nil + res := dht.constructLookupResult(queries, targetKadID) + return res, nil } -// TODO This function should be owned by the DHT as it dosen't really belong to "a query". 
-// scorePeerByDistanceAndLatency scores a peer using metrics such as connectendness of the peer, it's distance from the key -// and it's current known latency. -func (q query) scorePeerByDistanceAndLatency(p peer.ID, distanceFromKey *big.Int) interface{} { - connectedness := q.dht.host.Network().Connectedness(p) - latency := q.dht.host.Peerstore().LatencyEWMA(p) +// constructLookupResult takes the query information and uses it to construct the lookup result +func (dht *IpfsDHT) constructLookupResult(queries []*query, target kb.ID) *lookupWithFollowupResult { + // determine if any queries terminated early + completed := true + for _, q := range queries { + if !(q.isLookupTermination() || q.isStarvationTermination()) { + completed = false + break + } + } - var c int64 - switch connectedness { - case network.Connected: - c = 1 - case network.CanConnect: - c = 5 - case network.CannotConnect: - c = 10000 - default: - c = 20 + // extract the top K not unreachable peers from each query path, as well as their states at the end of the queries + var peers []peer.ID + peerState := make(map[peer.ID]qpeerset.PeerState) + for _, q := range queries { + qp := q.queryPeers.GetClosestNotUnreachable(dht.bucketSize) + for _, p := range qp { + // Since the same peer can be seen in multiple queries use the "best" state for the peer + // Note: It's possible that a peer was marked undialable in one path, but wasn't yet tried in another path + // for now we're going to return that peer as long as some path does not think it is undialable. This may + // change in the future if we track addresses dialed per path. + state := q.queryPeers.GetState(p) + if currState, ok := peerState[p]; ok { + if state > currState { + peerState[p] = state + } + } else { + peerState[p] = state + peers = append(peers, p) + } + } } - l := int64(latency) - if l <= 0 { - l = int64(time.Second) * 10 + // get the top K overall peers + sortedPeers := kb.SortClosestPeers(peers, target) + if len(sortedPeers) > dht.bucketSize { + sortedPeers = sortedPeers[:dht.bucketSize] } - res := big.NewInt(c) - res.Mul(res, big.NewInt(l)) - res.Mul(res, distanceFromKey) + // return the top K not unreachable peers across all query paths as well as their states at the end of the queries + res := &lookupWithFollowupResult{ + peers: sortedPeers, + state: make([]qpeerset.PeerState, len(sortedPeers)), + completed: completed, + } + + for i, p := range sortedPeers { + res.state[i] = peerState[p] + } return res } -// strictParallelismQuery concurrently sends the query RPC to all eligible peers -// and waits for ALL the RPC's to complete before starting the next round of RPC's. -func strictParallelismQuery(q *query) { - foundCloser := false - for { - // get the unqueried peers from among the K closest peers to the key sorted in ascending order - // of their 'distance-latency` score. - // We sort peers like this so that "better" peers are chosen to be in the α peers - // which get queried from among the unqueried K closet. - peersToQuery := q.localPeers.UnqueriedFromKClosest(q.scorePeerByDistanceAndLatency, - func(i1 peerheap.Item, i2 peerheap.Item) bool { - return i1.Value.(*big.Int).Cmp(i2.Value.(*big.Int)) == -1 - }) - - // The lookup terminates when the initiator has queried and gotten responses from the k - // closest nodes it has heard about. 
- if len(peersToQuery) == 0 { - return - } +type queryUpdate struct { + seen []peer.ID + queried []peer.ID + unreachable []peer.ID +} - // Of the k nodes the initiator has heard of closest to the target, - // it picks α that it has not yet queried and resends the FIND NODE RPC to them. - numQuery := q.dht.alpha - - // However, If a round of RPC's fails to return a node any closer than the closest already heard about, - // the initiator resends the RPC'S to all of the k closest nodes it has - // not already queried. - if !foundCloser { - numQuery = len(peersToQuery) - } else if pqLen := len(peersToQuery); pqLen < numQuery { - // if we don't have α peers, pick whatever number we have. - numQuery = pqLen - } +func (q *query) runWithGreedyParallelism() { + pathCtx, cancelPath := context.WithCancel(q.ctx) + defer cancelPath() - // reset foundCloser to false for the next round of RPC's - foundCloser = false + alpha := q.dht.alpha - queryResCh := make(chan *queryResult, numQuery) - resultsReceived := 0 + ch := make(chan *queryUpdate, alpha) + ch <- &queryUpdate{seen: q.seedPeers} - // send RPC's to all the chosen peers concurrently - for _, p := range peersToQuery[:numQuery] { - go func(p peer.ID) { - queryResCh <- q.queryPeer(p) - }(p) + for { + select { + case update := <-ch: + q.updateState(update) + case <-pathCtx.Done(): + q.terminate() } - loop: - // wait for all outstanding RPC's to complete before we start the next round. - for { - select { - case res := <-queryResCh: - foundCloser = foundCloser || res.foundCloserPeer - resultsReceived++ - if resultsReceived == numQuery { - break loop - } - case <-q.ctx.Done(): + // termination is triggered on end-of-lookup conditions or starvation of unused peers + if q.readyToTerminate() { + q.terminate() + + // exit once all goroutines have been cleaned up + if q.queryPeers.NumWaiting() == 0 { return } + continue + } + + // if all "threads" are busy, wait until someone finishes + if q.queryPeers.NumWaiting() >= alpha { + continue + } + + // spawn new queries, up to the parallelism allowance + for j := 0; j < alpha-q.queryPeers.NumWaiting(); j++ { + q.spawnQuery(ch) + } + } +} + +// spawnQuery starts one query, if an available seen peer is found +func (q *query) spawnQuery(ch chan<- *queryUpdate) { + if peers := q.queryPeers.GetSortedHeard(); len(peers) == 0 { + return + } else { + q.queryPeers.SetState(peers[0], qpeerset.PeerWaiting) + go q.queryPeer(ch, peers[0]) + } +} + +func (q *query) readyToTerminate() bool { + // if termination has already been determined, the query is considered terminated forever, + // regardless of any change to queryPeers that might occur after the initial termination. + if q.terminated { + return true + } + // give the application logic a chance to terminate + if q.stopFn() { + return true + } + if q.isStarvationTermination() { + return true + } + if q.isLookupTermination() { + return true + } + return false +} + +// From the set of all nodes that are not unreachable, +// if the closest beta nodes are all queried, the lookup can terminate. 
+func (q *query) isLookupTermination() bool { + var peers []peer.ID + peers = q.queryPeers.GetClosestNotUnreachable(q.dht.beta) + for _, p := range peers { + if q.queryPeers.GetState(p) != qpeerset.PeerQueried { + return false } } + return true +} + +func (q *query) isStarvationTermination() bool { + return q.queryPeers.NumHeard() == 0 && q.queryPeers.NumWaiting() == 0 } -type queryResult struct { - // foundCloserPeer is true if the peer we're querying returns a peer - // closer than the closest we've already heard about - foundCloserPeer bool +func (q *query) terminate() { + q.terminated = true } -// queryPeer queries a single peer. -func (q *query) queryPeer(p peer.ID) *queryResult { +// queryPeer queries a single peer and reports its findings on the channel. +// queryPeer does not access the query state in queryPeers! +func (q *query) queryPeer(ch chan<- *queryUpdate, p peer.ID) { dialCtx, queryCtx := q.ctx, q.ctx // dial the peer if err := q.dht.dialPeer(dialCtx, p); err != nil { - q.localPeers.Remove(p) - return &queryResult{} + ch <- &queryUpdate{unreachable: []peer.ID{p}} + return } // add the peer to the global set of queried peers since the dial was successful // so that no other disjoint query tries sending an RPC to the same peer if !q.globallyQueriedPeers.TryAdd(p) { - q.localPeers.Remove(p) - return &queryResult{} - } - - // did the dial fulfill the stop condition ? - if q.stopFn(q.localPeers) { - q.cancel() - return &queryResult{} + ch <- &queryUpdate{unreachable: []peer.ID{p}} + return } // send query RPC to the remote peer newPeers, err := q.queryFn(queryCtx, p) if err != nil { - q.localPeers.Remove(p) - return &queryResult{} - } - - // mark the peer as queried. - q.localPeers.MarkQueried(p) - - if len(newPeers) == 0 { - logger.Debugf("QUERY worker for: %v - not found, and no closer peers.", p) + ch <- &queryUpdate{unreachable: []peer.ID{p}} + return } - foundCloserPeer := false + // process new peers + saw := []peer.ID{} for _, next := range newPeers { if next.ID == q.dht.self { // don't add self. logger.Debugf("PEERS CLOSER -- worker for: %v found self", p) @@ -260,17 +381,38 @@ func (q *query) queryPeer(p peer.ID) *queryResult { // add their addresses to the dialer's peerstore q.dht.peerstore.AddAddrs(next.ID, next.Addrs, pstore.TempAddrTTL) - closer := q.localPeers.Add(next.ID) - foundCloserPeer = foundCloserPeer || closer + saw = append(saw, next.ID) } - // did the successful query RPC fulfill the query stop condition ? - if q.stopFn(q.localPeers) { - q.cancel() - } + ch <- &queryUpdate{seen: saw, queried: []peer.ID{p}} +} - return &queryResult{ - foundCloserPeer: foundCloserPeer, +func (q *query) updateState(up *queryUpdate) { + for _, p := range up.seen { + if p == q.dht.self { // don't add self. + continue + } + q.queryPeers.TryAdd(p) + } + for _, p := range up.queried { + if p == q.dht.self { // don't add self. + continue + } + if st := q.queryPeers.GetState(p); st == qpeerset.PeerWaiting { + q.queryPeers.SetState(p, qpeerset.PeerQueried) + } else { + panic(fmt.Errorf("kademlia protocol error: tried to transition to the queried state from state %v", st)) + } + } + for _, p := range up.unreachable { + if p == q.dht.self { // don't add self. 
+ continue + } + if st := q.queryPeers.GetState(p); st == qpeerset.PeerWaiting { + q.queryPeers.SetState(p, qpeerset.PeerUnreachable) + } else { + panic(fmt.Errorf("kademlia protocol error: tried to transition to the unreachable state from state %v", st)) + } } } diff --git a/routing.go b/routing.go index 34033fc8c..54dad76d1 100644 --- a/routing.go +++ b/routing.go @@ -4,10 +4,10 @@ import ( "bytes" "context" "fmt" - "github.com/libp2p/go-libp2p-core/network" "sync" "time" + "github.com/libp2p/go-libp2p-core/network" "github.com/libp2p/go-libp2p-core/peer" "github.com/libp2p/go-libp2p-core/peerstore" "github.com/libp2p/go-libp2p-core/routing" @@ -15,8 +15,8 @@ import ( "github.com/ipfs/go-cid" u "github.com/ipfs/go-ipfs-util" logging "github.com/ipfs/go-log" - "github.com/libp2p/go-libp2p-kad-dht/kpeerset" pb "github.com/libp2p/go-libp2p-kad-dht/pb" + "github.com/libp2p/go-libp2p-kad-dht/qpeerset" kb "github.com/libp2p/go-libp2p-kbucket" record "github.com/libp2p/go-libp2p-record" "github.com/multiformats/go-multihash" @@ -172,7 +172,7 @@ func (dht *IpfsDHT) SearchValue(ctx context.Context, key string, opts ...routing } stopCh := make(chan struct{}) - valCh, queries := dht.getValues(ctx, key, stopCh) + valCh, lookupRes := dht.getValues(ctx, key, stopCh) out := make(chan []byte) go func() { @@ -184,17 +184,12 @@ func (dht *IpfsDHT) SearchValue(ctx context.Context, key string, opts ...routing updatePeers := make([]peer.ID, 0, dht.bucketSize) select { - case q := <-queries: - if len(q) < 1 { + case l := <-lookupRes: + if l == nil { return } - peers := q[0].globallyQueriedPeers.Peers() - peers = kb.SortClosestPeers(peers, kb.ConvertKey(key)) - for i, p := range peers { - if i == dht.bucketSize { - break - } + for _, p := range l.peers { if _, ok := peersWithBest[p]; !ok { updatePeers = append(updatePeers, p) } @@ -320,9 +315,9 @@ func (dht *IpfsDHT) updatePeerValues(ctx context.Context, key string, val []byte } } -func (dht *IpfsDHT) getValues(ctx context.Context, key string, stopQuery chan struct{}) (<-chan RecvdVal, <-chan []*query) { +func (dht *IpfsDHT) getValues(ctx context.Context, key string, stopQuery chan struct{}) (<-chan RecvdVal, <-chan *lookupWithFollowupResult) { valCh := make(chan RecvdVal, 1) - queriesCh := make(chan []*query, 1) + lookupResCh := make(chan *lookupWithFollowupResult, 1) if rec, err := dht.getLocal(key); rec != nil && err == nil { select { @@ -336,8 +331,8 @@ func (dht *IpfsDHT) getValues(ctx context.Context, key string, stopQuery chan st go func() { defer close(valCh) - defer close(queriesCh) - queries, err := dht.runDisjointQueries(ctx, dht.d, key, + defer close(lookupResCh) + lookupRes, err := dht.runLookupWithFollowup(ctx, dht.d, key, func(ctx context.Context, p peer.ID) ([]*peer.AddrInfo, error) { // For DHT query command routing.PublishQueryEvent(ctx, &routing.QueryEvent{ @@ -386,7 +381,7 @@ func (dht *IpfsDHT) getValues(ctx context.Context, key string, stopQuery chan st return peers, err }, - func(peerset *kpeerset.SortedPeerset) bool { + func() bool { select { case <-stopQuery: return true @@ -399,26 +394,18 @@ func (dht *IpfsDHT) getValues(ctx context.Context, key string, stopQuery chan st if err != nil { return } - queriesCh <- queries + lookupResCh <- lookupRes if ctx.Err() == nil { - dht.refreshRTIfNoShortcut(kb.ConvertKey(key), queries) + dht.refreshRTIfNoShortcut(kb.ConvertKey(key), lookupRes) } }() - return valCh, queriesCh + return valCh, lookupResCh } -func (dht *IpfsDHT) refreshRTIfNoShortcut(key kb.ID, queries []*query) { - shortcutTaken := 
false - for _, q := range queries { - if q.localPeers.LenUnqueriedFromKClosest() > 0 { - shortcutTaken = true - break - } - } - - if !shortcutTaken { +func (dht *IpfsDHT) refreshRTIfNoShortcut(key kb.ID, lookupRes *lookupWithFollowupResult) { + if lookupRes.completed { // refresh the cpl for this key as the query was successful dht.routingTable.ResetCplRefreshedAtForID(key, time.Now()) } @@ -593,7 +580,7 @@ func (dht *IpfsDHT) findProvidersAsyncRoutine(ctx context.Context, key multihash } } - queries, err := dht.runDisjointQueries(ctx, dht.d, string(key), + lookupRes, err := dht.runLookupWithFollowup(ctx, dht.d, string(key), func(ctx context.Context, p peer.ID) ([]*peer.AddrInfo, error) { // For DHT query command routing.PublishQueryEvent(ctx, &routing.QueryEvent{ @@ -644,13 +631,13 @@ func (dht *IpfsDHT) findProvidersAsyncRoutine(ctx context.Context, key multihash return peers, nil }, - func(peerset *kpeerset.SortedPeerset) bool { + func() bool { return !findAll && ps.Size() >= count }, ) if err != nil && ctx.Err() == nil { - dht.refreshRTIfNoShortcut(kb.ConvertKey(string(key)), queries) + dht.refreshRTIfNoShortcut(kb.ConvertKey(string(key)), lookupRes) } } @@ -669,7 +656,7 @@ func (dht *IpfsDHT) FindPeer(ctx context.Context, id peer.ID) (_ peer.AddrInfo, return pi, nil } - queries, err := dht.runDisjointQueries(ctx, dht.d, string(id), + lookupRes, err := dht.runLookupWithFollowup(ctx, dht.d, string(id), func(ctx context.Context, p peer.ID) ([]*peer.AddrInfo, error) { // For DHT query command routing.PublishQueryEvent(ctx, &routing.QueryEvent{ @@ -693,7 +680,7 @@ func (dht *IpfsDHT) FindPeer(ctx context.Context, id peer.ID) (_ peer.AddrInfo, return peers, err }, - func(peerset *kpeerset.SortedPeerset) bool { + func() bool { return dht.host.Network().Connectedness(id) == network.Connected }, ) @@ -702,14 +689,21 @@ func (dht *IpfsDHT) FindPeer(ctx context.Context, id peer.ID) (_ peer.AddrInfo, return peer.AddrInfo{}, err } - // refresh the cpl for this key if we discovered the peer because of the query - if ctx.Err() == nil && queries[0].globallyQueriedPeers.Contains(id) { - kadID := kb.ConvertPeerID(id) - dht.routingTable.ResetCplRefreshedAtForID(kadID, time.Now()) + dialedPeerDuringQuery := false + for i, p := range lookupRes.peers { + if p == id { + // Note: we consider PeerUnreachable to be a valid state because the peer may not support the DHT protocol + // and therefore the peer would fail the query. The fact that a peer that is returned can be a non-DHT + // server peer and is not identified as such is a bug. + dialedPeerDuringQuery = lookupRes.state[i] != qpeerset.PeerHeard + break + } } - // TODO: Consider unlucky disconnect timing and potentially utilizing network.CanConnect or something similar - if dht.host.Network().Connectedness(id) == network.Connected { + // Return peer information if we tried to dial the peer during the query or we are (or recently were) connected + // to the peer. + connectedness := dht.host.Network().Connectedness(id) + if dialedPeerDuringQuery || connectedness == network.Connected || connectedness == network.CanConnect { return dht.peerstore.PeerInfo(id), nil }
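Taken together, the new qpeerset package and the reworked query.go replace the heap-based SortedPeerset with an explicit per-peer state machine. The sketch below walks through that lifecycle using only the API added in this diff; the `walkStates` helper, its seed handling, and the count of 20 are assumptions made purely for illustration.

```go
package example

import (
	"github.com/libp2p/go-libp2p-core/peer"

	"github.com/libp2p/go-libp2p-kad-dht/qpeerset"
)

// walkStates is an illustrative, non-production walkthrough of the peer
// lifecycle that runWithGreedyParallelism drives via queryUpdate messages:
// peers are Heard when first seen, Waiting while an RPC is in flight, and
// finish as Queried or Unreachable depending on the RPC outcome.
func walkStates(target string, seeds []peer.ID) []peer.ID {
	qp := qpeerset.NewQueryPeerset(target)

	// seed peers enter the set in state PeerHeard
	for _, p := range seeds {
		qp.TryAdd(p)
	}

	// take the closest heard peer and simulate querying it
	if heard := qp.GetSortedHeard(); len(heard) > 0 {
		next := heard[0]
		qp.SetState(next, qpeerset.PeerWaiting) // RPC dispatched
		qp.SetState(next, qpeerset.PeerQueried) // RPC succeeded; a failure would mean PeerUnreachable
	}

	// lookup results are drawn from the closest peers that never became unreachable
	return qp.GetClosestNotUnreachable(20)
}
```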