From 2dfbbc4693e8fc95df921b95506bbba19ca90a7c Mon Sep 17 00:00:00 2001 From: Adin Schmahmann Date: Tue, 17 Mar 2020 15:05:49 -0400 Subject: [PATCH 01/18] tmp --- kpeerset/sorted_peerset.go | 58 +++++++++++++++++++++++++++++++++ query.go | 66 +++++++++++++++++++++++++++++++++++++- 2 files changed, 123 insertions(+), 1 deletion(-) diff --git a/kpeerset/sorted_peerset.go b/kpeerset/sorted_peerset.go index f4eac9b60..ef1ef9e05 100644 --- a/kpeerset/sorted_peerset.go +++ b/kpeerset/sorted_peerset.go @@ -160,6 +160,64 @@ func (ps *SortedPeerset) UnqueriedFromKClosest(getValue func(id peer.ID, distanc return peers } +func (ps *SortedPeerset) GetBestUnqueried(num int, getValue func(id peer.ID, distance *big.Int) interface{}, + sortWith peerheap.Comparator) []peer.ID { + ps.lock.Lock() + defer ps.lock.Unlock() + + unqueriedPeerItems := ps.heapKClosestPeers.FilterItems(ps.isPeerItemQueried) + + // create a min-heap to sort the unqueried peer Items using the given comparator + ph := peerheap.New(false, sortWith) + for _, i := range unqueriedPeerItems { + p := i.Peer + d := i.Value.(*big.Int) + heap.Push(ph, &peerheap.Item{Peer: p, Value: getValue(p, d)}) + } + // now pop so we get them in sorted order + peers := make([]peer.ID, 0, num) + for ph.Len() != 0 && len(peers) < num { + popped := heap.Pop(ph).(*peerheap.Item) + peers = append(peers, popped.Peer) + } + + return peers +} + +func (ps *SortedPeerset) GetClosestUnqueried(num int) []peer.ID { + ps.lock.Lock() + defer ps.lock.Unlock() + + sortWith := func(i1 peerheap.Item, i2 peerheap.Item) bool { + return i1.Value.(*big.Int).Cmp(i2.Value.(*big.Int)) == -1 + } + + peerItems := ps.heapKClosestPeers.FilterItems(func(peerheap.Item) bool { return true }) + + // create a min-heap to sort the unqueried peer Items using the given comparator + ph := peerheap.New(false, sortWith) + for _, i := range peerItems { + p := i.Peer + d := i.Value.(*big.Int) + heap.Push(ph, &peerheap.Item{Peer: p, Value: d}) + } + // now pop so we get them in sorted order + peers := make([]peer.ID, 0, num) + for ph.Len() != 0 && len(peers) < num { + popped := heap.Pop(ph).(*peerheap.Item) + peers = append(peers, popped.Peer) + } + + unqueriedPeers := make([]peer.ID, 0, num) + for _, p := range peers { + if _, queried := ps.queried[p]; !queried { + unqueriedPeers = append(unqueriedPeers, p) + } + } + + return unqueriedPeers +} + // LenUnqueriedFromKClosest returns the number of unqueried peers among // the K closest peers. func (ps *SortedPeerset) LenUnqueriedFromKClosest() int { diff --git a/query.go b/query.go index 6eddf8ff6..39b2624d2 100644 --- a/query.go +++ b/query.go @@ -9,6 +9,7 @@ import ( "github.com/libp2p/go-libp2p-core/routing" "github.com/libp2p/go-libp2p-kad-dht/kpeerset/peerheap" "math/big" + "sync" "time" "github.com/libp2p/go-libp2p-kad-dht/kpeerset" @@ -91,7 +92,8 @@ func (dht *IpfsDHT) runDisjointQueries(ctx context.Context, d int, target string for i := 0; i < d; i++ { query := queries[i] go func() { - strictParallelismQuery(query) + looseBParallelismQuery(query) + //strictParallelismQuery(query) queryDone <- struct{}{} }() } @@ -208,6 +210,68 @@ func strictParallelismQuery(q *query) { } } +// strictParallelismQuery concurrently sends the query RPC to all eligible peers +// and waits for ALL the RPC's to complete before starting the next round of RPC's. 
+func looseBParallelismQuery(q *query) { + alphaCh := make(chan bool, q.dht.alpha) + resultCh := make(chan *queryResult, q.dht.alpha) + + pathCtx, cancelPath := context.WithCancel(q.ctx) + defer cancelPath() + + scoreCmp := func(i1 peerheap.Item, i2 peerheap.Item) bool { + return i1.Value.(*big.Int).Cmp(i2.Value.(*big.Int)) == -1 + } + + alphaMx := sync.Mutex{} + + for i := 0; i < q.dht.alpha; i++ { + go func() { + for { + if len(q.localPeers.GetClosestUnqueried(3)) == 0 { + cancelPath() + } + + select { + case top := <-alphaCh: + alphaMx.Lock() + var peers []peer.ID + if !top { + peers = q.localPeers.GetBestUnqueried(1, q.scorePeerByDistanceAndLatency, scoreCmp) + } else { + peers = q.localPeers.GetClosestUnqueried(3) + } + var qp peer.ID + if len(peers) > 0 { + qp = peers[0] + } else { + continue + } + q.localPeers.MarkQueried(qp) + alphaMx.Unlock() + resultCh <- q.queryPeer(qp) + case <-pathCtx.Done(): + return + } + } + }() + } + + foundCloserCounter := 0 + for closest := q.localPeers.GetClosestUnqueried(3); len(closest) > 0; { + select { + case alphaCh <- foundCloserCounter >= q.dht.alpha: + case res := <-resultCh: + if res.foundCloserPeer { + foundCloserCounter++ + } else { + foundCloserCounter = 0 + } + case <-pathCtx.Done(): + } + } +} + type queryResult struct { // foundCloserPeer is true if the peer we're querying returns a peer // closer than the closest we've already heard about From 49b07fa585e6d6a27daf5e3e6b2b34807e9a3ac0 Mon Sep 17 00:00:00 2001 From: Petar Maymounkov Date: Thu, 19 Mar 2020 10:36:22 -0700 Subject: [PATCH 02/18] Fully async implementation of Kademlia lookup. --- qpeerset/qpeerset.go | 168 +++++++++++++++++++++++ query.go | 307 ++++++++++++++++++------------------------- 2 files changed, 294 insertions(+), 181 deletions(-) create mode 100644 qpeerset/qpeerset.go diff --git a/qpeerset/qpeerset.go b/qpeerset/qpeerset.go new file mode 100644 index 000000000..9938268e1 --- /dev/null +++ b/qpeerset/qpeerset.go @@ -0,0 +1,168 @@ +package qpeerset + +import ( + "math/big" + "sort" + + "github.com/libp2p/go-libp2p-core/peer" + ks "github.com/whyrusleeping/go-keyspace" +) + +type PeerState int + +const ( + PeerSeen PeerState = iota + PeerWaiting + PeerQueried + PeerUnreachable +) + +type QueryPeerset struct { + // the key being searched for + key ks.Key + + // all known peers + all []queryPeerState + + // sorted is true if all is currently in sorted order + sorted bool +} + +type queryPeerState struct { + id peer.ID + state PeerState +} + +type sortedQueryPeerset QueryPeerset + +func (sqp *sortedQueryPeerset) Len() int { + return len(sqp.all) +} + +func (sqp *sortedQueryPeerset) Swap(i, j int) { + sqp.all[i], sqp.all[j] = sqp.all[j], sqp.all[i] +} + +func (sqp *sortedQueryPeerset) Less(i, j int) bool { + di, dj := sqp.distanceToKey(i), sqp.distanceToKey(j) + return di.Cmp(dj) == -1 +} + +func (sqp *sortedQueryPeerset) distanceToKey(i int) *big.Int { + return ks.XORKeySpace.Key([]byte(sqp.all[i].id)).Distance(sqp.key) +} + +func NewQueryPeerset(key string) *QueryPeerset { + return &QueryPeerset{ + key: ks.XORKeySpace.Key([]byte(key)), + all: []queryPeerState{}, + sorted: false, + } +} + +func (qp *QueryPeerset) find(p peer.ID) int { + for i := range qp.all { + if qp.all[i].id == p { + return i + } + } + return -1 +} + +func (qp *QueryPeerset) TryAdd(p peer.ID) bool { + if qp.find(p) >= 0 { + return false + } else { + qp.all = append(qp.all, queryPeerState{id: p, state: PeerSeen}) + qp.sorted = false + return true + } +} + +func (qp *QueryPeerset) sort() { + if qp.sorted 
{ + return + } + sort.Sort((*sortedQueryPeerset)(qp)) + qp.sorted = true +} + +func (qp *QueryPeerset) MarkSeen(p peer.ID) { + qp.all[qp.find(p)].state = PeerSeen +} + +func (qp *QueryPeerset) MarkWaiting(p peer.ID) { + qp.all[qp.find(p)].state = PeerWaiting +} + +func (qp *QueryPeerset) MarkQueried(p peer.ID) { + qp.all[qp.find(p)].state = PeerQueried +} + +func (qp *QueryPeerset) MarkUnreachable(p peer.ID) { + qp.all[qp.find(p)].state = PeerUnreachable +} + +func (qp *QueryPeerset) IsSeen(p peer.ID) bool { + return qp.all[qp.find(p)].state == PeerSeen +} + +func (qp *QueryPeerset) IsWaiting(p peer.ID) bool { + return qp.all[qp.find(p)].state == PeerWaiting +} + +func (qp *QueryPeerset) IsQueried(p peer.ID) bool { + return qp.all[qp.find(p)].state == PeerQueried +} + +func (qp *QueryPeerset) IsUnreachable(p peer.ID) bool { + return qp.all[qp.find(p)].state == PeerUnreachable +} + +func (qp *QueryPeerset) NumWaiting() int { + return len(qp.GetWaitingPeers()) +} + +func (qp *QueryPeerset) GetWaitingPeers() (result []peer.ID) { + for _, p := range qp.all { + if p.state == PeerWaiting { + result = append(result, p.id) + } + } + return +} + +func (qp *QueryPeerset) GetClosestNotUnreachable(k int) (result []peer.ID) { + qp.sort() + for _, p := range qp.all { + if p.state != PeerUnreachable { + result = append(result, p.id) + } + } + return result[:max(len(result), k)] +} + +func (qp *QueryPeerset) NumSeen() int { + return len(qp.GetSeenPeers()) +} + +func (qp *QueryPeerset) GetSeenPeers() (result []peer.ID) { + for _, p := range qp.all { + if p.state == PeerSeen { + result = append(result, p.id) + } + } + return +} + +func (qp *QueryPeerset) GetSortedSeen() (result []peer.ID) { + qp.sort() + return qp.GetSeenPeers() +} + +func max(x, y int) int { + if x > y { + return x + } + return y +} diff --git a/query.go b/query.go index 39b2624d2..0cf9a3186 100644 --- a/query.go +++ b/query.go @@ -3,16 +3,13 @@ package dht import ( "context" "errors" + "github.com/libp2p/go-libp2p-core/network" "github.com/libp2p/go-libp2p-core/peer" pstore "github.com/libp2p/go-libp2p-core/peerstore" "github.com/libp2p/go-libp2p-core/routing" - "github.com/libp2p/go-libp2p-kad-dht/kpeerset/peerheap" - "math/big" - "sync" - "time" - "github.com/libp2p/go-libp2p-kad-dht/kpeerset" + "github.com/libp2p/go-libp2p-kad-dht/qpeerset" kb "github.com/libp2p/go-libp2p-kbucket" ) @@ -20,30 +17,39 @@ import ( var ErrNoPeersQueried = errors.New("failed to query any peers") type queryFn func(context.Context, peer.ID) ([]*peer.AddrInfo, error) -type stopFn func(*kpeerset.SortedPeerset) bool +type stopFn func(*qpeerset.QueryPeerset) bool // query represents a single disjoint query. type query struct { // the query context. ctx context.Context + // the cancellation function for the query context. cancel context.CancelFunc dht *IpfsDHT - // localPeers is the set of peers that need to be queried or have already been queried for this query. - localPeers *kpeerset.SortedPeerset + // seedPeers is the set of peers that seed the query + seedPeers []peer.ID + + // queryPeers is the set of peers known by this query and their respective states. + queryPeers *qpeerset.QueryPeerset + + // terminated is set when the first worker thread encounters the termination condition. + // Its role is to make sure that once termination is determined, it is sticky. + terminated bool // globallyQueriedPeers is the combined set of peers queried across ALL the disjoint queries. 
- globallyQueriedPeers *peer.Set + globallyQueriedPeers *peer.Set // TODO: abstract this away from specifics of disjoint paths // the function that will be used to query a single peer. queryFn queryFn // stopFn is used to determine if we should stop the WHOLE disjoint query. - stopFn stopFn + stopFn stopFn // TODO: can context cancel do the job? if, not abstract it like "cancel within the context" } +// d is the number of disjoint queries. func (dht *IpfsDHT) runDisjointQueries(ctx context.Context, d int, target string, queryFn queryFn, stopFn stopFn) ([]*query, error) { queryCtx, cancelQuery := context.WithCancel(ctx) @@ -74,7 +80,9 @@ func (dht *IpfsDHT) runDisjointQueries(ctx context.Context, d int, target string ctx: queryCtx, cancel: cancelQuery, dht: dht, - localPeers: kpeerset.NewSortedPeerset(dht.bucketSize, target), + queryPeers: qpeerset.NewQueryPeerset(target), + seedPeers: []peer.ID{}, + terminated: false, globallyQueriedPeers: peersQueried, queryFn: queryFn, stopFn: stopFn, @@ -85,21 +93,21 @@ func (dht *IpfsDHT) runDisjointQueries(ctx context.Context, d int, target string // distribute the shuffled K closest peers as seeds among the "d" disjoint queries for i := 0; i < len(seedPeers); i++ { - queries[i%d].localPeers.Add(seedPeers[i]) + queries[i%d].seedPeers = append(queries[i%d].seedPeers, seedPeers[i]) } // start the "d" disjoint queries for i := 0; i < d; i++ { query := queries[i] go func() { - looseBParallelismQuery(query) - //strictParallelismQuery(query) + query.runWithGreedyParallelism() queryDone <- struct{}{} }() } loop: // wait for all the "d" disjoint queries to complete before we return + // XXX: waiting until all queries are done is a security bug!!! for { select { case <-queryDone: @@ -115,207 +123,128 @@ loop: return queries, nil } -// TODO This function should be owned by the DHT as it dosen't really belong to "a query". -// scorePeerByDistanceAndLatency scores a peer using metrics such as connectendness of the peer, it's distance from the key -// and it's current known latency. -func (q query) scorePeerByDistanceAndLatency(p peer.ID, distanceFromKey *big.Int) interface{} { - connectedness := q.dht.host.Network().Connectedness(p) - latency := q.dht.host.Peerstore().LatencyEWMA(p) - - var c int64 - switch connectedness { - case network.Connected: - c = 1 - case network.CanConnect: - c = 5 - case network.CannotConnect: - c = 10000 - default: - c = 20 - } +type queryUpdate struct { + seen []peer.ID + queried []peer.ID + unreachable []peer.ID +} - l := int64(latency) - if l <= 0 { - l = int64(time.Second) * 10 - } +func (q *query) runWithGreedyParallelism() { + pathCtx, cancelPath := context.WithCancel(q.ctx) + defer cancelPath() - res := big.NewInt(c) - res.Mul(res, big.NewInt(l)) - res.Mul(res, distanceFromKey) + alpha := q.dht.alpha - return res -} + ch := make(chan *queryUpdate, alpha) + ch <- &queryUpdate{seen: q.seedPeers} -// strictParallelismQuery concurrently sends the query RPC to all eligible peers -// and waits for ALL the RPC's to complete before starting the next round of RPC's. -func strictParallelismQuery(q *query) { - foundCloser := false for { - // get the unqueried peers from among the K closest peers to the key sorted in ascending order - // of their 'distance-latency` score. - // We sort peers like this so that "better" peers are chosen to be in the α peers - // which get queried from among the unqueried K closet. 
- peersToQuery := q.localPeers.UnqueriedFromKClosest(q.scorePeerByDistanceAndLatency, - func(i1 peerheap.Item, i2 peerheap.Item) bool { - return i1.Value.(*big.Int).Cmp(i2.Value.(*big.Int)) == -1 - }) - - // The lookup terminates when the initiator has queried and gotten responses from the k - // closest nodes it has heard about. - if len(peersToQuery) == 0 { + select { + case sawPeers := <-ch: + q.populatePeers(<-ch) + case <-pathCtx.Done(): + q.terminate() return } - // Of the k nodes the initiator has heard of closest to the target, - // it picks α that it has not yet queried and resends the FIND NODE RPC to them. - numQuery := q.dht.alpha - - // However, If a round of RPC's fails to return a node any closer than the closest already heard about, - // the initiator resends the RPC'S to all of the k closest nodes it has - // not already queried. - if !foundCloser { - numQuery = len(peersToQuery) - } else if pqLen := len(peersToQuery); pqLen < numQuery { - // if we don't have α peers, pick whatever number we have. - numQuery = pqLen + // termination is triggered on end-of-lookup conditions or starvation of unused peers + if q.readyToTerminate() { + q.terminate() + return } - // reset foundCloser to false for the next round of RPC's - foundCloser = false - - queryResCh := make(chan *queryResult, numQuery) - resultsReceived := 0 - - // send RPC's to all the chosen peers concurrently - for _, p := range peersToQuery[:numQuery] { - go func(p peer.ID) { - queryResCh <- q.queryPeer(p) - }(p) + // if all "threads" are busy, wait until someone finishes + if q.queryPeers.NumWaiting() >= alpha { + continue } - loop: - // wait for all outstanding RPC's to complete before we start the next round. - for { - select { - case res := <-queryResCh: - foundCloser = foundCloser || res.foundCloserPeer - resultsReceived++ - if resultsReceived == numQuery { - break loop - } - case <-q.ctx.Done(): - return - } + // spawn new queries, up to the parallelism allowance + for j := 0; j < alpha-q.queryPeers.NumWaiting(); j++ { + q.spawnQuery(ch) } } } -// strictParallelismQuery concurrently sends the query RPC to all eligible peers -// and waits for ALL the RPC's to complete before starting the next round of RPC's. 
-func looseBParallelismQuery(q *query) { - alphaCh := make(chan bool, q.dht.alpha) - resultCh := make(chan *queryResult, q.dht.alpha) - - pathCtx, cancelPath := context.WithCancel(q.ctx) - defer cancelPath() - - scoreCmp := func(i1 peerheap.Item, i2 peerheap.Item) bool { - return i1.Value.(*big.Int).Cmp(i2.Value.(*big.Int)) == -1 +// spawnQuery starts one query, if an available seen peer is found +func (q *query) spawnQuery(ch chan<- *queryUpdate) { + if peers := q.queryPeers.GetSortedSeen(); len(peers) == 0 { + return + } else { + q.queryPeers.MarkWaiting(peers[0]) + go q.queryPeer(ch, peers[0]) } +} - alphaMx := sync.Mutex{} - - for i := 0; i < q.dht.alpha; i++ { - go func() { - for { - if len(q.localPeers.GetClosestUnqueried(3)) == 0 { - cancelPath() - } - - select { - case top := <-alphaCh: - alphaMx.Lock() - var peers []peer.ID - if !top { - peers = q.localPeers.GetBestUnqueried(1, q.scorePeerByDistanceAndLatency, scoreCmp) - } else { - peers = q.localPeers.GetClosestUnqueried(3) - } - var qp peer.ID - if len(peers) > 0 { - qp = peers[0] - } else { - continue - } - q.localPeers.MarkQueried(qp) - alphaMx.Unlock() - resultCh <- q.queryPeer(qp) - case <-pathCtx.Done(): - return - } - } - }() +func (q *query) readyToTerminate() bool { + // if termination has already been determined, the query is considered terminated forever, + // regardless of any change to queryPeers that might occur after the initial termination. + if q.terminated { + return true + } + // give the application logic a chance to terminate + if q.stopFn(q.queryPeers) { + return true + } + if q.isStarvationTermination() { + return true + } + if q.isLookupTermination() { + return true } + return false +} - foundCloserCounter := 0 - for closest := q.localPeers.GetClosestUnqueried(3); len(closest) > 0; { - select { - case alphaCh <- foundCloserCounter >= q.dht.alpha: - case res := <-resultCh: - if res.foundCloserPeer { - foundCloserCounter++ - } else { - foundCloserCounter = 0 - } - case <-pathCtx.Done(): +// Beta paramerizes the Kademlia terminatioin condition. +var Beta = 3 + +// From the set of all nodes that are not unreachable, +// if the closest beta nodes are all queried, the lookup can terminate. +func (q *query) isLookupTermination() bool { + var peers []peer.ID + peers = q.queryPeers.GetClosestNotUnreachable(Beta) + for _, p := range peers { + if !q.queryPeers.IsQueried(p) { + return false } } + return true +} + +func (q *query) isStarvationTermination() bool { + return q.queryPeers.NumSeen() == 0 && q.queryPeers.NumWaiting() == 0 } -type queryResult struct { - // foundCloserPeer is true if the peer we're querying returns a peer - // closer than the closest we've already heard about - foundCloserPeer bool +func (q *query) terminate() { + q.terminated = true } -// queryPeer queries a single peer. -func (q *query) queryPeer(p peer.ID) *queryResult { +// queryPeer queries a single peer and reports its findings on the channel. +// queryPeer does not access the query state in queryPeers! 
+func (q *query) queryPeer(ch chan<- *queryUpdate, p peer.ID) { dialCtx, queryCtx := q.ctx, q.ctx // dial the peer if err := q.dht.dialPeer(dialCtx, p); err != nil { - q.localPeers.Remove(p) - return &queryResult{} + ch <- &queryUpdate{unreachable: []peer.ID{p}} + return } // add the peer to the global set of queried peers since the dial was successful // so that no other disjoint query tries sending an RPC to the same peer if !q.globallyQueriedPeers.TryAdd(p) { - q.localPeers.Remove(p) - return &queryResult{} - } - - // did the dial fulfill the stop condition ? - if q.stopFn(q.localPeers) { - q.cancel() - return &queryResult{} + ch <- &queryUpdate{unreachable: []peer.ID{p}} + return } // send query RPC to the remote peer newPeers, err := q.queryFn(queryCtx, p) if err != nil { - q.localPeers.Remove(p) - return &queryResult{} + ch <- &queryUpdate{unreachable: []peer.ID{p}} + return } - // mark the peer as queried. - q.localPeers.MarkQueried(p) - - if len(newPeers) == 0 { - logger.Debugf("QUERY worker for: %v - not found, and no closer peers.", p) - } - - foundCloserPeer := false + // process new peers + saw := []peer.ID{} for _, next := range newPeers { if next.ID == q.dht.self { // don't add self. logger.Debugf("PEERS CLOSER -- worker for: %v found self", p) @@ -324,17 +253,33 @@ func (q *query) queryPeer(p peer.ID) *queryResult { // add their addresses to the dialer's peerstore q.dht.peerstore.AddAddrs(next.ID, next.Addrs, pstore.TempAddrTTL) - closer := q.localPeers.Add(next.ID) - foundCloserPeer = foundCloserPeer || closer + saw = append(saw, next.ID) } - // did the successful query RPC fulfill the query stop condition ? - if q.stopFn(q.localPeers) { - q.cancel() - } + ch <- &queryUpdate{seen: saw, queried: []peer.ID{p}} +} - return &queryResult{ - foundCloserPeer: foundCloserPeer, +func (q *query) populatePeers(up *queryUpdate) { + for _, p := range up.seen { + if p == q.dht.self { // don't add self. + continue + } + q.queryPeers.TryAdd(p) + q.queryPeers.MarkSeen(p) + } + for _, p := range up.queried { + if p == q.dht.self { // don't add self. + continue + } + q.queryPeers.TryAdd(p) + q.queryPeers.MarkQueried(p) + } + for _, p := range up.unreachable { + if p == q.dht.self { // don't add self. + continue + } + q.queryPeers.TryAdd(p) + q.queryPeers.MarkUnreachable(p) } } From 516cd0b49d5a40f71d90ab13f075d3a7c8989535 Mon Sep 17 00:00:00 2001 From: Adin Schmahmann Date: Fri, 20 Mar 2020 04:06:39 -0400 Subject: [PATCH 03/18] refactor(dht): stopFn no longer takes any state. Beta publicly exposed as the Resiliency parameter. 
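Illustrative sketch only (not part of this patch): a caller would opt into the
new knob roughly as below, assuming the package-level New constructor with
functional options and an existing ctx and libp2p host h:

    // require that the 5 closest peers respond before a query path terminates,
    // while keeping the default of 3 concurrent requests in flight per path
    kadDHT, err := dht.New(ctx, h, dht.Resiliency(5), dht.Concurrency(3))
    if err != nil {
        panic(err)
    }
    _ = kadDHT

Resiliency defaults to 3 and is a plain functional option like Concurrency, so
existing callers are unaffected.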
--- dht.go | 2 ++ dht_options.go | 13 +++++++++++++ lookup.go | 3 +-- query.go | 9 +++------ routing.go | 9 ++++----- 5 files changed, 23 insertions(+), 13 deletions(-) diff --git a/dht.go b/dht.go index f84ccab5d..e95682569 100644 --- a/dht.go +++ b/dht.go @@ -92,6 +92,7 @@ type IpfsDHT struct { bucketSize int alpha int // The concurrency parameter per path + beta int // The number of peers closest to a target that must have responded for a query path to terminate d int // Number of Disjoint Paths to query autoRefresh bool @@ -234,6 +235,7 @@ func makeDHT(ctx context.Context, h host.Host, cfg config) (*IpfsDHT, error) { serverProtocols: serverProtocols, bucketSize: cfg.bucketSize, alpha: cfg.concurrency, + beta: cfg.resiliency, d: cfg.disjointPaths, triggerRtRefresh: make(chan chan<- error), triggerSelfLookup: make(chan chan<- error), diff --git a/dht_options.go b/dht_options.go index 648cb5f67..ece3e54b9 100644 --- a/dht_options.go +++ b/dht_options.go @@ -34,6 +34,7 @@ type config struct { bucketSize int disjointPaths int concurrency int + resiliency int maxRecordAge time.Duration enableProviders bool enableValues bool @@ -87,6 +88,7 @@ var defaults = func(o *config) error { o.bucketSize = defaultBucketSize o.concurrency = 3 + o.resiliency = 3 return nil } @@ -239,6 +241,17 @@ func Concurrency(alpha int) Option { } } +// Resiliency configures the number of peers closest to a target that must have responded in order for a given query +// path to complete. +// +// The default value is 3. +func Resiliency(beta int) Option { + return func(c *config) error { + c.resiliency = beta + return nil + } +} + // DisjointPaths configures the number of disjoint paths (d in the S/Kademlia paper) taken per query. // // The default value is BucketSize/2. diff --git a/lookup.go b/lookup.go index aef06a671..f0086a89d 100644 --- a/lookup.go +++ b/lookup.go @@ -11,7 +11,6 @@ import ( "github.com/ipfs/go-cid" logging "github.com/ipfs/go-log" - "github.com/libp2p/go-libp2p-kad-dht/kpeerset" pb "github.com/libp2p/go-libp2p-kad-dht/pb" kb "github.com/libp2p/go-libp2p-kbucket" "github.com/multiformats/go-base32" @@ -100,7 +99,7 @@ func (dht *IpfsDHT) GetClosestPeers(ctx context.Context, key string) (<-chan pee return peers, err }, - func(peerset *kpeerset.SortedPeerset) bool { return false }, + func() bool { return false }, ) if err != nil { diff --git a/query.go b/query.go index 0cf9a3186..f5e1384ca 100644 --- a/query.go +++ b/query.go @@ -17,7 +17,7 @@ import ( var ErrNoPeersQueried = errors.New("failed to query any peers") type queryFn func(context.Context, peer.ID) ([]*peer.AddrInfo, error) -type stopFn func(*qpeerset.QueryPeerset) bool +type stopFn func() bool // query represents a single disjoint query. type query struct { @@ -182,7 +182,7 @@ func (q *query) readyToTerminate() bool { return true } // give the application logic a chance to terminate - if q.stopFn(q.queryPeers) { + if q.stopFn() { return true } if q.isStarvationTermination() { @@ -194,14 +194,11 @@ func (q *query) readyToTerminate() bool { return false } -// Beta paramerizes the Kademlia terminatioin condition. -var Beta = 3 - // From the set of all nodes that are not unreachable, // if the closest beta nodes are all queried, the lookup can terminate. 
func (q *query) isLookupTermination() bool { var peers []peer.ID - peers = q.queryPeers.GetClosestNotUnreachable(Beta) + peers = q.queryPeers.GetClosestNotUnreachable(q.dht.beta) for _, p := range peers { if !q.queryPeers.IsQueried(p) { return false diff --git a/routing.go b/routing.go index 34033fc8c..b9afce055 100644 --- a/routing.go +++ b/routing.go @@ -15,7 +15,6 @@ import ( "github.com/ipfs/go-cid" u "github.com/ipfs/go-ipfs-util" logging "github.com/ipfs/go-log" - "github.com/libp2p/go-libp2p-kad-dht/kpeerset" pb "github.com/libp2p/go-libp2p-kad-dht/pb" kb "github.com/libp2p/go-libp2p-kbucket" record "github.com/libp2p/go-libp2p-record" @@ -386,7 +385,7 @@ func (dht *IpfsDHT) getValues(ctx context.Context, key string, stopQuery chan st return peers, err }, - func(peerset *kpeerset.SortedPeerset) bool { + func() bool { select { case <-stopQuery: return true @@ -412,7 +411,7 @@ func (dht *IpfsDHT) getValues(ctx context.Context, key string, stopQuery chan st func (dht *IpfsDHT) refreshRTIfNoShortcut(key kb.ID, queries []*query) { shortcutTaken := false for _, q := range queries { - if q.localPeers.LenUnqueriedFromKClosest() > 0 { + if len(q.queryPeers.GetClosestNotUnreachable(3)) > 0 { shortcutTaken = true break } @@ -644,7 +643,7 @@ func (dht *IpfsDHT) findProvidersAsyncRoutine(ctx context.Context, key multihash return peers, nil }, - func(peerset *kpeerset.SortedPeerset) bool { + func() bool { return !findAll && ps.Size() >= count }, ) @@ -693,7 +692,7 @@ func (dht *IpfsDHT) FindPeer(ctx context.Context, id peer.ID) (_ peer.AddrInfo, return peers, err }, - func(peerset *kpeerset.SortedPeerset) bool { + func() bool { return dht.host.Network().Connectedness(id) == network.Connected }, ) From f65f5626b10dacd5a23ea55794424852a50a7dcd Mon Sep 17 00:00:00 2001 From: Adin Schmahmann Date: Fri, 20 Mar 2020 04:50:38 -0400 Subject: [PATCH 04/18] fix: simple async query bugs --- qpeerset/qpeerset.go | 12 ++++-------- query.go | 2 +- 2 files changed, 5 insertions(+), 9 deletions(-) diff --git a/qpeerset/qpeerset.go b/qpeerset/qpeerset.go index 9938268e1..b42995390 100644 --- a/qpeerset/qpeerset.go +++ b/qpeerset/qpeerset.go @@ -139,7 +139,10 @@ func (qp *QueryPeerset) GetClosestNotUnreachable(k int) (result []peer.ID) { result = append(result, p.id) } } - return result[:max(len(result), k)] + if len(result) > k { + return result[:k] + } + return result } func (qp *QueryPeerset) NumSeen() int { @@ -159,10 +162,3 @@ func (qp *QueryPeerset) GetSortedSeen() (result []peer.ID) { qp.sort() return qp.GetSeenPeers() } - -func max(x, y int) int { - if x > y { - return x - } - return y -} diff --git a/query.go b/query.go index f5e1384ca..4bc5ed51a 100644 --- a/query.go +++ b/query.go @@ -141,7 +141,7 @@ func (q *query) runWithGreedyParallelism() { for { select { case sawPeers := <-ch: - q.populatePeers(<-ch) + q.populatePeers(sawPeers) case <-pathCtx.Done(): q.terminate() return From 1c33021ceec086dd8507bdc68cf46e104b37b627 Mon Sep 17 00:00:00 2001 From: Adin Schmahmann Date: Fri, 20 Mar 2020 04:55:29 -0400 Subject: [PATCH 05/18] fix(test): give find peer query tests stable routing tables --- dht_test.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/dht_test.go b/dht_test.go index 8db21bbaf..43b67f311 100644 --- a/dht_test.go +++ b/dht_test.go @@ -1403,6 +1403,10 @@ func testFindPeerQuery(t *testing.T, connect(t, ctx, guy, others[i]) } + for _, d := range dhts { + d.RefreshRoutingTable() + } + var reachableIds []peer.ID for i, d := range dhts { lp := len(d.host.Network().Peers()) From 
7cf6b8f2e8b564f765bbc4b5bca48d58c9dc7127 Mon Sep 17 00:00:00 2001 From: Adin Schmahmann Date: Fri, 20 Mar 2020 10:23:02 -0400 Subject: [PATCH 06/18] fix: use dht.beta instead of 3 for refreshing cpl --- routing.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/routing.go b/routing.go index b9afce055..356b5306e 100644 --- a/routing.go +++ b/routing.go @@ -411,7 +411,7 @@ func (dht *IpfsDHT) getValues(ctx context.Context, key string, stopQuery chan st func (dht *IpfsDHT) refreshRTIfNoShortcut(key kb.ID, queries []*query) { shortcutTaken := false for _, q := range queries { - if len(q.queryPeers.GetClosestNotUnreachable(3)) > 0 { + if len(q.queryPeers.GetClosestNotUnreachable(dht.beta)) > 0 { shortcutTaken = true break } From f9f08b5f5a58a932bebc1ae805491ae8fb210d4b Mon Sep 17 00:00:00 2001 From: Petar Maymounkov Date: Fri, 20 Mar 2020 11:47:50 -0700 Subject: [PATCH 07/18] Addressing PR comments. Documentation. Naming fixes. --- kpeerset/peerheap/heap.go | 110 ------------ kpeerset/peerheap/heap_test.go | 91 ---------- kpeerset/sorted_peerset.go | 294 -------------------------------- kpeerset/sorted_peerset_test.go | 169 ------------------ qpeerset/qpeerset.go | 89 +++++----- query.go | 26 +-- 6 files changed, 59 insertions(+), 720 deletions(-) delete mode 100644 kpeerset/peerheap/heap.go delete mode 100644 kpeerset/peerheap/heap_test.go delete mode 100644 kpeerset/sorted_peerset.go delete mode 100644 kpeerset/sorted_peerset_test.go diff --git a/kpeerset/peerheap/heap.go b/kpeerset/peerheap/heap.go deleted file mode 100644 index e42abb405..000000000 --- a/kpeerset/peerheap/heap.go +++ /dev/null @@ -1,110 +0,0 @@ -package peerheap - -import ( - "github.com/libp2p/go-libp2p-core/peer" -) - -// Comparator is the type of a function that compares two peer Heap items to determine the ordering between them. -// It returns true if i1 is "less" than i2 and false otherwise. -type Comparator func(i1 Item, i2 Item) bool - -// Item is one "item" in the Heap. -// It contains the Id of the peer, an arbitrary value associated with the peer -// and the index of the "item" in the Heap. -type Item struct { - Peer peer.ID - Value interface{} - Index int -} - -// Heap implements a heap of peer Items. -// It uses the "compare" member function to compare two peers to determine the order between them. -// If isMaxHeap is set to true, this Heap is a maxHeap, otherwise it's a minHeap. -// -// Note: It is the responsibility of the caller to enforce locking & synchronization. -type Heap struct { - items []*Item - compare Comparator - isMaxHeap bool -} - -// New creates & returns a peer Heap. -func New(isMaxHeap bool, compare Comparator) *Heap { - return &Heap{isMaxHeap: isMaxHeap, compare: compare} -} - -// PeekTop returns a copy of the top/first Item in the heap. -// This would be the "maximum" or the "minimum" peer depending on whether -// the heap is a maxHeap or a minHeap. -// -// A call to PeekTop will panic if the Heap is empty. 
-func (ph *Heap) PeekTop() Item { - return *ph.items[0] -} - -// FilterItems returns Copies of ALL Items in the Heap that satisfy the given predicate -func (ph *Heap) FilterItems(p func(i Item) bool) []Item { - var items []Item - - for _, i := range ph.items { - ih := *i - if p(ih) { - items = append(items, ih) - } - } - return items -} - -// Peers returns all the peers currently in the heap -func (ph *Heap) Peers() []peer.ID { - peers := make([]peer.ID, 0, len(ph.items)) - - for _, i := range ph.items { - peers = append(peers, i.Peer) - } - return peers -} - -// Note: The functions below make the Heap satisfy the "heap.Interface" as required by the "heap" package in the -// standard library. Please refer to the docs for "heap.Interface" in the standard library for more details. - -func (ph *Heap) Len() int { - return len(ph.items) -} - -func (ph *Heap) Less(i, j int) bool { - h := ph.items - - isLess := ph.compare(*h[i], *h[j]) - - // because the "compare" function returns true if item1 is less than item2, - // we need to reverse it's result if the Heap is a maxHeap. - if ph.isMaxHeap { - return !isLess - } - return isLess -} - -func (ph *Heap) Swap(i, j int) { - h := ph.items - h[i], h[j] = h[j], h[i] - h[i].Index = i - h[j].Index = j -} - -func (ph *Heap) Push(x interface{}) { - n := len(ph.items) - item := x.(*Item) - item.Index = n - ph.items = append(ph.items, item) -} - -func (ph *Heap) Pop() interface{} { - old := ph.items - n := len(old) - item := old[n-1] - old[n-1] = nil // avoid memory leak - item.Index = -1 // for safety - ph.items = old[0 : n-1] - return item -} diff --git a/kpeerset/peerheap/heap_test.go b/kpeerset/peerheap/heap_test.go deleted file mode 100644 index 70a622eed..000000000 --- a/kpeerset/peerheap/heap_test.go +++ /dev/null @@ -1,91 +0,0 @@ -package peerheap - -import ( - "container/heap" - "testing" - - "github.com/libp2p/go-libp2p-core/peer" - - "github.com/stretchr/testify/require" -) - -// a comparator that compares peer Ids based on their length -var cmp = func(i1 Item, i2 Item) bool { - return len(i1.Peer) < len(i2.Peer) -} - -var ( - peer1 = peer.ID("22") - peer2 = peer.ID("1") - peer3 = peer.ID("333") -) - -func TestMinHeap(t *testing.T) { - // create new - ph := New(false, cmp) - require.Zero(t, ph.Len()) - - // push the element - heap.Push(ph, &Item{Peer: peer1}) - // assertions - require.True(t, ph.Len() == 1) - require.Equal(t, peer1, ph.PeekTop().Peer) - - // push another element - heap.Push(ph, &Item{Peer: peer2}) - // assertions - require.True(t, ph.Len() == 2) - require.Equal(t, peer2, ph.PeekTop().Peer) - - // push another element - heap.Push(ph, &Item{Peer: peer3}) - // assertions - require.True(t, ph.Len() == 3) - require.Equal(t, peer2, ph.PeekTop().Peer) - - // remove & add again - heap.Remove(ph, 1) - require.True(t, ph.Len() == 2) - heap.Remove(ph, 0) - require.True(t, ph.Len() == 1) - - heap.Push(ph, &Item{Peer: peer1}) - heap.Push(ph, &Item{Peer: peer2}) - - // test filter peers - filtered := ph.FilterItems(func(i Item) bool { - return len(i.Peer) != 2 - }) - require.Len(t, filtered, 2) - require.Contains(t, itemsToPeers(filtered), peer2) - require.Contains(t, itemsToPeers(filtered), peer3) - - // Assert Min Heap Order - require.Equal(t, peer2, heap.Pop(ph).(*Item).Peer) - require.Equal(t, peer1, heap.Pop(ph).(*Item).Peer) - require.Equal(t, peer3, heap.Pop(ph).(*Item).Peer) -} - -func itemsToPeers(is []Item) []peer.ID { - peers := make([]peer.ID, 0, len(is)) - for _, i := range is { - peers = append(peers, i.Peer) - } - return peers -} 
- -func TestMaxHeap(t *testing.T) { - // create new - ph := New(true, cmp) - require.Zero(t, ph.Len()) - - // push all three peers - heap.Push(ph, &Item{Peer: peer1}) - heap.Push(ph, &Item{Peer: peer3}) - heap.Push(ph, &Item{Peer: peer2}) - - // Assert Max Heap Order - require.Equal(t, peer3, heap.Pop(ph).(*Item).Peer) - require.Equal(t, peer1, heap.Pop(ph).(*Item).Peer) - require.Equal(t, peer2, heap.Pop(ph).(*Item).Peer) -} diff --git a/kpeerset/sorted_peerset.go b/kpeerset/sorted_peerset.go deleted file mode 100644 index ef1ef9e05..000000000 --- a/kpeerset/sorted_peerset.go +++ /dev/null @@ -1,294 +0,0 @@ -package kpeerset - -import ( - "container/heap" - "math/big" - "sync" - - "github.com/libp2p/go-libp2p-core/peer" - - "github.com/libp2p/go-libp2p-kad-dht/kpeerset/peerheap" - - ks "github.com/whyrusleeping/go-keyspace" -) - -// SortedPeerset is a data-structure that maintains the queried & unqueried peers for a query -// based on their distance from the key. -// It's main use is to allow peer addition, removal & retrieval for the query as per the -// semantics described in the Kad DHT paper. -type SortedPeerset struct { - // the key being searched for - key ks.Key - - // the K parameter in the Kad DHT paper - kvalue int - - // a maxHeap maintaining the K closest(Kademlia XOR distance) peers to the key. - // the topmost peer will be the peer furthest from the key in this heap. - heapKClosestPeers *peerheap.Heap - - // a minHeap for for rest of the peers ordered by their distance from the key. - // the topmost peer will be the peer closest to the key in this heap. - heapRestOfPeers *peerheap.Heap - - // pointer to the item in the heap of K closest peers. - kClosestPeers map[peer.ID]*peerheap.Item - - // pointer to the item in the heap of the rest of peers. - restOfPeers map[peer.ID]*peerheap.Item - - // peers that have already been queried. - queried map[peer.ID]struct{} - - // the closest peer to the key that we have heard about - closestKnownPeer peer.ID - // the distance of the closest known peer from the key - dClosestKnownPeer *big.Int - - lock sync.Mutex -} - -// NewSortedPeerset creates and returns a new SortedPeerset. -func NewSortedPeerset(kvalue int, key string) *SortedPeerset { - compare := func(i1 peerheap.Item, i2 peerheap.Item) bool { - // distance of the first peer from the key - d1 := i1.Value.(*big.Int) - // distance of the second peer from the key - d2 := i2.Value.(*big.Int) - - // Is the first peer closer to the key than the second peer ? - return d1.Cmp(d2) == -1 - } - - return &SortedPeerset{ - key: ks.XORKeySpace.Key([]byte(key)), - kvalue: kvalue, - heapKClosestPeers: peerheap.New(true, compare), - heapRestOfPeers: peerheap.New(false, compare), - kClosestPeers: make(map[peer.ID]*peerheap.Item), - restOfPeers: make(map[peer.ID]*peerheap.Item), - queried: make(map[peer.ID]struct{}), - } -} - -// Add adds the peer to the SortedPeerset. -// -// If there are less than K peers in the K closest peers, we add the peer to -// the K closest peers. -// -// Otherwise, we do one of the following: -// 1. If this peer is closer to the key than the peer furthest from the key in the -// K closest peers, we move that furthest peer to the rest of peers and then -// add this peer to the K closest peers. -// 2. If this peer is further from the key than the peer furthest from the key in the -// K closest peers, we add it to the rest of peers. -// -// Returns true if the peer is closer to key than the closet peer we've heard about. 
-func (ps *SortedPeerset) Add(p peer.ID) bool { - ps.lock.Lock() - defer ps.lock.Unlock() - - // we've already added the peer - if ps.kClosestPeers[p] != nil || ps.restOfPeers[p] != nil { - return false - } - - // calculate the distance of the given peer from the key - distancePeer := ks.XORKeySpace.Key([]byte(p)).Distance(ps.key) - item := &peerheap.Item{Peer: p, Value: distancePeer} - - if ps.heapKClosestPeers.Len() < ps.kvalue { - // add the peer to the K closest peers if we have space - heap.Push(ps.heapKClosestPeers, item) - ps.kClosestPeers[p] = item - } else if top := ps.heapKClosestPeers.PeekTop(); distancePeer.Cmp(top.Value.(*big.Int)) == -1 { - // peer is closer to the key than the top peer in the heap of K closest peers - // which is basically the peer furthest from the key because the K closest peers - // are stored in a maxHeap ordered by the distance from the key. - - // remove the top peer from the K closest peers & add it to the rest of peers. - bumpedPeer := heap.Pop(ps.heapKClosestPeers).(*peerheap.Item) - delete(ps.kClosestPeers, bumpedPeer.Peer) - heap.Push(ps.heapRestOfPeers, bumpedPeer) - ps.restOfPeers[bumpedPeer.Peer] = bumpedPeer - - // add the peer p to the K closest peers - heap.Push(ps.heapKClosestPeers, item) - ps.kClosestPeers[p] = item - } else { - // add the peer to the rest of peers. - heap.Push(ps.heapRestOfPeers, item) - ps.restOfPeers[p] = item - } - - if ps.closestKnownPeer == "" || (distancePeer.Cmp(ps.dClosestKnownPeer) == -1) { - // given peer is closer to the key than the current closest known peer. - // So, let's update the closest known peer - ps.closestKnownPeer = p - ps.dClosestKnownPeer = distancePeer - return true - } - - return false -} - -// UnqueriedFromKClosest returns the unqueried peers among the K closest peers AFTER -// sorting them in Ascending Order with the given comparator. -// It uses the `getValue` function to get the value with which to compare the peers for sorting -// and the `sortWith` function to compare two peerHeap items to determine the ordering between them. 
-func (ps *SortedPeerset) UnqueriedFromKClosest(getValue func(id peer.ID, distance *big.Int) interface{}, - sortWith peerheap.Comparator) []peer.ID { - ps.lock.Lock() - defer ps.lock.Unlock() - - unqueriedPeerItems := ps.heapKClosestPeers.FilterItems(ps.isPeerItemQueried) - - // create a min-heap to sort the unqueried peer Items using the given comparator - ph := peerheap.New(false, sortWith) - for _, i := range unqueriedPeerItems { - p := i.Peer - d := i.Value.(*big.Int) - heap.Push(ph, &peerheap.Item{Peer: p, Value: getValue(p, d)}) - } - // now pop so we get them in sorted order - peers := make([]peer.ID, 0, ph.Len()) - for ph.Len() != 0 { - popped := heap.Pop(ph).(*peerheap.Item) - peers = append(peers, popped.Peer) - } - - return peers -} - -func (ps *SortedPeerset) GetBestUnqueried(num int, getValue func(id peer.ID, distance *big.Int) interface{}, - sortWith peerheap.Comparator) []peer.ID { - ps.lock.Lock() - defer ps.lock.Unlock() - - unqueriedPeerItems := ps.heapKClosestPeers.FilterItems(ps.isPeerItemQueried) - - // create a min-heap to sort the unqueried peer Items using the given comparator - ph := peerheap.New(false, sortWith) - for _, i := range unqueriedPeerItems { - p := i.Peer - d := i.Value.(*big.Int) - heap.Push(ph, &peerheap.Item{Peer: p, Value: getValue(p, d)}) - } - // now pop so we get them in sorted order - peers := make([]peer.ID, 0, num) - for ph.Len() != 0 && len(peers) < num { - popped := heap.Pop(ph).(*peerheap.Item) - peers = append(peers, popped.Peer) - } - - return peers -} - -func (ps *SortedPeerset) GetClosestUnqueried(num int) []peer.ID { - ps.lock.Lock() - defer ps.lock.Unlock() - - sortWith := func(i1 peerheap.Item, i2 peerheap.Item) bool { - return i1.Value.(*big.Int).Cmp(i2.Value.(*big.Int)) == -1 - } - - peerItems := ps.heapKClosestPeers.FilterItems(func(peerheap.Item) bool { return true }) - - // create a min-heap to sort the unqueried peer Items using the given comparator - ph := peerheap.New(false, sortWith) - for _, i := range peerItems { - p := i.Peer - d := i.Value.(*big.Int) - heap.Push(ph, &peerheap.Item{Peer: p, Value: d}) - } - // now pop so we get them in sorted order - peers := make([]peer.ID, 0, num) - for ph.Len() != 0 && len(peers) < num { - popped := heap.Pop(ph).(*peerheap.Item) - peers = append(peers, popped.Peer) - } - - unqueriedPeers := make([]peer.ID, 0, num) - for _, p := range peers { - if _, queried := ps.queried[p]; !queried { - unqueriedPeers = append(unqueriedPeers, p) - } - } - - return unqueriedPeers -} - -// LenUnqueriedFromKClosest returns the number of unqueried peers among -// the K closest peers. -func (ps *SortedPeerset) LenUnqueriedFromKClosest() int { - ps.lock.Lock() - defer ps.lock.Unlock() - - unqueriedPeerItems := ps.heapKClosestPeers.FilterItems(ps.isPeerItemQueried) - - return len(unqueriedPeerItems) -} - -// caller is responsible for the locking -func (ps *SortedPeerset) isPeerItemQueried(i peerheap.Item) bool { - _, ok := ps.queried[i.Peer] - return !ok -} - -// MarkQueried marks the peer as queried. -// It should be called when we have successfully dialed to and gotten a response from the peer. -func (ps *SortedPeerset) MarkQueried(p peer.ID) { - ps.lock.Lock() - defer ps.lock.Unlock() - - ps.queried[p] = struct{}{} -} - -// Remove removes the peer from the SortedPeerset. -// -// If the removed peer was among the K closest peers, we pop a peer from the heap of rest of peers -// and add it to the K closest peers to replace the removed peer. 
The peer added to the K closest peers in this way -// would be the peer that was closest to the key among the rest of peers since the rest of peers are in a -// minHeap ordered on the distance from the key. -func (ps *SortedPeerset) Remove(p peer.ID) { - ps.lock.Lock() - defer ps.lock.Unlock() - - delete(ps.queried, p) - - if item, ok := ps.kClosestPeers[p]; ok { - // peer is among the K closest peers - - // remove it from the K closest peers - heap.Remove(ps.heapKClosestPeers, item.Index) - delete(ps.kClosestPeers, p) - // if this peer was the closest peer we knew, we need to find the new closest peer. - if ps.closestKnownPeer == p { - var minDistance *big.Int - var closest peer.ID - for _, i := range ps.kClosestPeers { - d := i.Value.(*big.Int) - if minDistance == nil || (d.Cmp(minDistance) == -1) { - minDistance = d - closest = i.Peer - } - } - ps.closestKnownPeer = closest - ps.dClosestKnownPeer = minDistance - } - - // we now need to add a peer to the K closest peers from the rest of peers - // to make up for the peer that was just removed - if ps.heapRestOfPeers.Len() > 0 { - // pop a peer from the rest of peers & add it to the K closest peers - upgrade := heap.Pop(ps.heapRestOfPeers).(*peerheap.Item) - delete(ps.restOfPeers, upgrade.Peer) - heap.Push(ps.heapKClosestPeers, upgrade) - ps.kClosestPeers[upgrade.Peer] = upgrade - } - } else if item, ok := ps.restOfPeers[p]; ok { - // peer is not among the K closest, so remove it from the rest of peers. - heap.Remove(ps.heapRestOfPeers, item.Index) - delete(ps.restOfPeers, p) - } -} diff --git a/kpeerset/sorted_peerset_test.go b/kpeerset/sorted_peerset_test.go deleted file mode 100644 index a496dade0..000000000 --- a/kpeerset/sorted_peerset_test.go +++ /dev/null @@ -1,169 +0,0 @@ -package kpeerset - -import ( - "math/big" - "testing" - - "github.com/libp2p/go-libp2p-core/peer" - "github.com/libp2p/go-libp2p-core/test" - - "github.com/libp2p/go-libp2p-kad-dht/kpeerset/peerheap" - kb "github.com/libp2p/go-libp2p-kbucket" - - "github.com/stretchr/testify/require" -) - -var noopCompare = func(i1 peerheap.Item, i2 peerheap.Item) bool { - return true -} - -var noopGetValue = func(p peer.ID, d *big.Int) interface{} { - return d -} - -func TestSortedPeerset(t *testing.T) { - key := "test" - sp := NewSortedPeerset(2, key) - require.Empty(t, sp.UnqueriedFromKClosest(noopGetValue, noopCompare)) - - // -----------------Ordering between peers for the Test ----- - // KEY < peer0 < peer3 < peer1 < peer4 < peer2 < peer5 - // ---------------------------------------------------------- - peer2 := test.RandPeerIDFatal(t) - - // add peer 2 & assert - require.True(t, sp.Add(peer2)) - require.Len(t, sp.UnqueriedFromKClosest(noopGetValue, noopCompare), 1) - require.True(t, sp.LenUnqueriedFromKClosest() == 1) - require.Equal(t, sp.UnqueriedFromKClosest(noopGetValue, noopCompare)[0], peer2) - assertClosestKnownPeer(t, sp, peer2) - - // add peer4 & assert - var peer4 peer.ID - for { - peer4 = test.RandPeerIDFatal(t) - if kb.Closer(peer4, peer2, key) { - break - } - } - require.True(t, sp.Add(peer4)) - require.Len(t, sp.UnqueriedFromKClosest(noopGetValue, noopCompare), 2) - require.True(t, sp.LenUnqueriedFromKClosest() == 2) - require.Contains(t, sp.UnqueriedFromKClosest(noopGetValue, noopCompare), peer2) - require.Contains(t, sp.UnqueriedFromKClosest(noopGetValue, noopCompare), peer4) - assertClosestKnownPeer(t, sp, peer4) - - // add peer1 which will displace peer2 in the kClosest - var peer1 peer.ID - for { - peer1 = test.RandPeerIDFatal(t) - if kb.Closer(peer1, 
peer4, key) { - break - } - } - require.True(t, sp.Add(peer1)) - require.Len(t, sp.UnqueriedFromKClosest(noopGetValue, noopCompare), 2) - require.Contains(t, sp.UnqueriedFromKClosest(noopGetValue, noopCompare), peer1) - require.Contains(t, sp.UnqueriedFromKClosest(noopGetValue, noopCompare), peer4) - assertClosestKnownPeer(t, sp, peer1) - - // add peer 3 which will displace peer4 in the kClosest - var peer3 peer.ID - for { - peer3 = test.RandPeerIDFatal(t) - if kb.Closer(peer3, peer1, key) { - break - } - } - require.True(t, sp.Add(peer3)) - require.Len(t, sp.UnqueriedFromKClosest(noopGetValue, noopCompare), 2) - require.Contains(t, sp.UnqueriedFromKClosest(noopGetValue, noopCompare), peer1) - require.Contains(t, sp.UnqueriedFromKClosest(noopGetValue, noopCompare), peer3) - assertClosestKnownPeer(t, sp, peer3) - - // removing peer1 moves peer4 to the KClosest - sp.Remove(peer1) - require.Len(t, sp.UnqueriedFromKClosest(noopGetValue, noopCompare), 2) - require.Contains(t, sp.UnqueriedFromKClosest(noopGetValue, noopCompare), peer3) - require.Contains(t, sp.UnqueriedFromKClosest(noopGetValue, noopCompare), peer4) - sp.lock.Lock() - require.True(t, sp.heapRestOfPeers.Len() == 1) - require.Contains(t, sp.heapRestOfPeers.Peers(), peer2) - sp.lock.Unlock() - - // mark a peer as queried so it's not returned as unqueried - sp.MarkQueried(peer4) - require.Len(t, sp.UnqueriedFromKClosest(noopGetValue, noopCompare), 1) - require.Contains(t, sp.UnqueriedFromKClosest(noopGetValue, noopCompare), peer3) - - // removing peer3 moves peer2 to the kClosest & updates the closest known peer to peer4 - sp.Remove(peer3) - require.Len(t, sp.UnqueriedFromKClosest(noopGetValue, noopCompare), 1) - require.Contains(t, sp.UnqueriedFromKClosest(noopGetValue, noopCompare), peer2) - sp.lock.Lock() - require.Empty(t, sp.heapRestOfPeers.Peers()) - sp.lock.Unlock() - assertClosestKnownPeer(t, sp, peer4) - - // adding peer5 does not change the closest known peer - var peer5 peer.ID - for { - peer5 = test.RandPeerIDFatal(t) - if kb.Closer(peer2, peer5, key) { - break - } - } - require.False(t, sp.Add(peer5)) - assertClosestKnownPeer(t, sp, peer4) - - // adding peer0 changes the closest known peer - var peer0 peer.ID - for { - peer0 = test.RandPeerIDFatal(t) - if kb.Closer(peer0, peer3, key) { - break - } - } - require.True(t, sp.Add(peer0)) - assertClosestKnownPeer(t, sp, peer0) -} - -func TestSortingUnqueriedFromKClosest(t *testing.T) { - p1 := peer.ID("1") - p2 := peer.ID("22") - p3 := peer.ID("333") - - key := "test" - sp := NewSortedPeerset(3, key) - sp.Add(p1) - sp.Add(p3) - sp.Add(p2) - - ps := sp.UnqueriedFromKClosest(noopGetValue, func(i1 peerheap.Item, i2 peerheap.Item) bool { - return len(i1.Peer) > len(i2.Peer) - }) - require.Len(t, ps, 3) - require.Equal(t, p3, ps[0]) - require.Equal(t, p2, ps[1]) - require.Equal(t, p1, ps[2]) - - // mark one as queried - scoref := func(p peer.ID, d *big.Int) interface{} { - return len(p) - } - - sp.MarkQueried(p3) - ps = sp.UnqueriedFromKClosest(scoref, func(i1 peerheap.Item, i2 peerheap.Item) bool { - return i1.Value.(int) > i2.Value.(int) - }) - require.Len(t, ps, 2) - require.Equal(t, p2, ps[0]) - require.Equal(t, p1, ps[1]) -} - -func assertClosestKnownPeer(t *testing.T, sp *SortedPeerset, p peer.ID) { - sp.lock.Lock() - defer sp.lock.Unlock() - - require.Equal(t, sp.closestKnownPeer, p) -} diff --git a/qpeerset/qpeerset.go b/qpeerset/qpeerset.go index b42995390..d64d39c8f 100644 --- a/qpeerset/qpeerset.go +++ b/qpeerset/qpeerset.go @@ -8,15 +8,22 @@ import ( ks 
"github.com/whyrusleeping/go-keyspace" ) +// PeerState describes the state of a peer ID during the lifecycle of an individual lookup. type PeerState int const ( - PeerSeen PeerState = iota + // PeerHeard is applied to peers which have not been queried yet. + PeerHeard PeerState = iota + // PeerWaiting is applied to peers that are currently being queried. PeerWaiting + // PeerQueried is applied to peers who have been queried and a response was retrieved successfully. PeerQueried + // PeerUnreachable is applied to peers who have been queried and a response was not retrieved successfully. PeerUnreachable ) +// QueryPeerset maintains the state of a Kademlia asynchronous lookup. +// The lookup state is a set of peers, each labeled with a peer state. type QueryPeerset struct { // the key being searched for key ks.Key @@ -29,8 +36,9 @@ type QueryPeerset struct { } type queryPeerState struct { - id peer.ID - state PeerState + id peer.ID + distance *big.Int + state PeerState } type sortedQueryPeerset QueryPeerset @@ -44,14 +52,12 @@ func (sqp *sortedQueryPeerset) Swap(i, j int) { } func (sqp *sortedQueryPeerset) Less(i, j int) bool { - di, dj := sqp.distanceToKey(i), sqp.distanceToKey(j) + di, dj := sqp.all[i].distance, sqp.all[j].distance return di.Cmp(dj) == -1 } -func (sqp *sortedQueryPeerset) distanceToKey(i int) *big.Int { - return ks.XORKeySpace.Key([]byte(sqp.all[i].id)).Distance(sqp.key) -} - +// NewQueryPeerset creates a new empty set of peers. +// key is the target key of the lookup that this peer set is for. func NewQueryPeerset(key string) *QueryPeerset { return &QueryPeerset{ key: ks.XORKeySpace.Key([]byte(key)), @@ -69,11 +75,19 @@ func (qp *QueryPeerset) find(p peer.ID) int { return -1 } +func (qp *QueryPeerset) distanceToKey(p peer.ID) *big.Int { + return ks.XORKeySpace.Key([]byte(p)).Distance(qp.key) +} + +// TryAdd adds the peer p to the peer set. +// If the peer is already present, no action is taken. +// Otherwise, the peer is added with state set to PeerHeard. +// TryAdd returns true iff the peer was not already present. func (qp *QueryPeerset) TryAdd(p peer.ID) bool { if qp.find(p) >= 0 { return false } else { - qp.all = append(qp.all, queryPeerState{id: p, state: PeerSeen}) + qp.all = append(qp.all, queryPeerState{id: p, distance: q.distanceToKey(p), state: PeerHeard}) qp.sorted = false return true } @@ -87,42 +101,24 @@ func (qp *QueryPeerset) sort() { qp.sorted = true } -func (qp *QueryPeerset) MarkSeen(p peer.ID) { - qp.all[qp.find(p)].state = PeerSeen -} - -func (qp *QueryPeerset) MarkWaiting(p peer.ID) { - qp.all[qp.find(p)].state = PeerWaiting -} - -func (qp *QueryPeerset) MarkQueried(p peer.ID) { - qp.all[qp.find(p)].state = PeerQueried -} - -func (qp *QueryPeerset) MarkUnreachable(p peer.ID) { - qp.all[qp.find(p)].state = PeerUnreachable -} - -func (qp *QueryPeerset) IsSeen(p peer.ID) bool { - return qp.all[qp.find(p)].state == PeerSeen -} - -func (qp *QueryPeerset) IsWaiting(p peer.ID) bool { - return qp.all[qp.find(p)].state == PeerWaiting -} - -func (qp *QueryPeerset) IsQueried(p peer.ID) bool { - return qp.all[qp.find(p)].state == PeerQueried +// SetState sets the state of peer p to s. +// If p is not in the peerset, SetState panics. +func (qp *QueryPeerset) SetState(p peer.ID, s PeerState) { + qp.all[qp.find(p)].state = s } -func (qp *QueryPeerset) IsUnreachable(p peer.ID) bool { - return qp.all[qp.find(p)].state == PeerUnreachable +// GetState returns the state of peer p. +// If p is not in the peerset, GetState panics. 
+func (qp *QueryPeerset) GetState(p peer.ID) PeerState { + return qp.all[qp.find(p)].state } +// NumWaiting returns the number of peers in state PeerWaiting. func (qp *QueryPeerset) NumWaiting() int { return len(qp.GetWaitingPeers()) } +// GetWaitingPeers returns a slice of all peers in state PeerWaiting, in an undefined order. func (qp *QueryPeerset) GetWaitingPeers() (result []peer.ID) { for _, p := range qp.all { if p.state == PeerWaiting { @@ -132,7 +128,9 @@ func (qp *QueryPeerset) GetWaitingPeers() (result []peer.ID) { return } -func (qp *QueryPeerset) GetClosestNotUnreachable(k int) (result []peer.ID) { +// GetClosestNotUnreachable returns the closest to the key peers, which are not in state PeerUnreachable. +// It returns count peers or less, if fewer peers meet the condition. +func (qp *QueryPeerset) GetClosestNotUnreachable(count int) (result []peer.ID) { qp.sort() for _, p := range qp.all { if p.state != PeerUnreachable { @@ -145,20 +143,23 @@ func (qp *QueryPeerset) GetClosestNotUnreachable(k int) (result []peer.ID) { return result } -func (qp *QueryPeerset) NumSeen() int { - return len(qp.GetSeenPeers()) +// NumHeard returns the number of peers in state PeerHeard. +func (qp *QueryPeerset) NumHeard() int { + return len(qp.GetHeardPeers()) } -func (qp *QueryPeerset) GetSeenPeers() (result []peer.ID) { +// GetHeardPeers returns a slice of all peers in state PeerHeard, in an undefined order. +func (qp *QueryPeerset) GetHeardPeers() (result []peer.ID) { for _, p := range qp.all { - if p.state == PeerSeen { + if p.state == PeerHeard { result = append(result, p.id) } } return } -func (qp *QueryPeerset) GetSortedSeen() (result []peer.ID) { +// GetSortedHeard returns a slice of all peers in state PeerHeard, ordered by ascending distance to the target key. +func (qp *QueryPeerset) GetSortedHeard() (result []peer.ID) { qp.sort() - return qp.GetSeenPeers() + return qp.GetHeardPeers() } diff --git a/query.go b/query.go index 4bc5ed51a..ee2594bab 100644 --- a/query.go +++ b/query.go @@ -46,7 +46,7 @@ type query struct { queryFn queryFn // stopFn is used to determine if we should stop the WHOLE disjoint query. - stopFn stopFn // TODO: can context cancel do the job? if, not abstract it like "cancel within the context" + stopFn stopFn } // d is the number of disjoint queries. @@ -107,7 +107,9 @@ func (dht *IpfsDHT) runDisjointQueries(ctx context.Context, d int, target string loop: // wait for all the "d" disjoint queries to complete before we return - // XXX: waiting until all queries are done is a security bug!!! + // XXX: Waiting until all queries are done is a vector for DoS attacks: + // The disjoint lookup paths that are taken over by adversarial peers + // can easily be fooled to go on forever. 
for { select { case <-queryDone: @@ -140,8 +142,8 @@ func (q *query) runWithGreedyParallelism() { for { select { - case sawPeers := <-ch: - q.populatePeers(sawPeers) + case update := <-ch: + q.updateState(update) case <-pathCtx.Done(): q.terminate() return @@ -167,10 +169,10 @@ func (q *query) runWithGreedyParallelism() { // spawnQuery starts one query, if an available seen peer is found func (q *query) spawnQuery(ch chan<- *queryUpdate) { - if peers := q.queryPeers.GetSortedSeen(); len(peers) == 0 { + if peers := q.queryPeers.GetSortedHeard(); len(peers) == 0 { return } else { - q.queryPeers.MarkWaiting(peers[0]) + q.queryPeers.SetState(peers[0], qpeerset.PeerWaiting) go q.queryPeer(ch, peers[0]) } } @@ -200,7 +202,7 @@ func (q *query) isLookupTermination() bool { var peers []peer.ID peers = q.queryPeers.GetClosestNotUnreachable(q.dht.beta) for _, p := range peers { - if !q.queryPeers.IsQueried(p) { + if q.queryPeers.GetState(p) != qpeerset.PeerQueried { return false } } @@ -208,7 +210,7 @@ func (q *query) isLookupTermination() bool { } func (q *query) isStarvationTermination() bool { - return q.queryPeers.NumSeen() == 0 && q.queryPeers.NumWaiting() == 0 + return q.queryPeers.NumHeard() == 0 && q.queryPeers.NumWaiting() == 0 } func (q *query) terminate() { @@ -256,27 +258,27 @@ func (q *query) queryPeer(ch chan<- *queryUpdate, p peer.ID) { ch <- &queryUpdate{seen: saw, queried: []peer.ID{p}} } -func (q *query) populatePeers(up *queryUpdate) { +func (q *query) updateState(up *queryUpdate) { for _, p := range up.seen { if p == q.dht.self { // don't add self. continue } q.queryPeers.TryAdd(p) - q.queryPeers.MarkSeen(p) + q.queryPeers.SetState(p, qpeerset.PeerHeard) } for _, p := range up.queried { if p == q.dht.self { // don't add self. continue } q.queryPeers.TryAdd(p) - q.queryPeers.MarkQueried(p) + q.queryPeers.SetState(p, qpeerset.PeerQueried) } for _, p := range up.unreachable { if p == q.dht.self { // don't add self. continue } q.queryPeers.TryAdd(p) - q.queryPeers.MarkUnreachable(p) + q.queryPeers.SetState(p, qpeerset.PeerUnreachable) } } From 86fd84adf4986d0ac718c0c712c5b1920d59143f Mon Sep 17 00:00:00 2001 From: Petar Maymounkov Date: Fri, 20 Mar 2020 11:49:19 -0700 Subject: [PATCH 08/18] Bug fixes. --- qpeerset/qpeerset.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/qpeerset/qpeerset.go b/qpeerset/qpeerset.go index d64d39c8f..a2113c9c1 100644 --- a/qpeerset/qpeerset.go +++ b/qpeerset/qpeerset.go @@ -87,7 +87,7 @@ func (qp *QueryPeerset) TryAdd(p peer.ID) bool { if qp.find(p) >= 0 { return false } else { - qp.all = append(qp.all, queryPeerState{id: p, distance: q.distanceToKey(p), state: PeerHeard}) + qp.all = append(qp.all, queryPeerState{id: p, distance: qp.distanceToKey(p), state: PeerHeard}) qp.sorted = false return true } @@ -137,8 +137,8 @@ func (qp *QueryPeerset) GetClosestNotUnreachable(count int) (result []peer.ID) { result = append(result, p.id) } } - if len(result) > k { - return result[:k] + if len(result) >= count { + return result[:count] } return result } From a4fcc846476f3acc26349699292a602603428171 Mon Sep 17 00:00:00 2001 From: Adin Schmahmann Date: Sun, 22 Mar 2020 01:24:09 -0400 Subject: [PATCH 09/18] add wrapper lookup function to ensure that the top k returned peers from a lookup have all been queried even for beta < k. Modified the structure returned from lookups to be a useful subset of the full query state. 
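The mechanism this patch adds boils down to the short sketch below. The helper name followUpUnqueried is hypothetical; the snippet assumes it sits in the dht package next to the lookupResult, queryFn and qpeerset definitions introduced in the diff that follows, and it uses a sync.WaitGroup where the patch itself uses a buffered done channel, purely to keep the sketch compact. The idea: once the disjoint queries finish, every returned peer whose end-of-lookup state is still PeerHeard or PeerWaiting gets one more pass of the query function, so all of the top K results have actually been queried even when beta < k.

// Sketch only: query every returned peer that the lookup itself did not
// manage to query successfully.
func followUpUnqueried(ctx context.Context, res *lookupResult, query queryFn) {
	var wg sync.WaitGroup
	for i, p := range res.peers {
		if st := res.state[i]; st != qpeerset.PeerHeard && st != qpeerset.PeerWaiting {
			continue // already queried or unreachable, nothing to follow up
		}
		wg.Add(1)
		go func(p peer.ID) {
			defer wg.Done()
			_, _ = query(ctx, p) // errors are ignored here, as in the patch
		}(p)
	}
	wg.Wait()
}
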
--- lookup.go | 11 ++---- query.go | 104 +++++++++++++++++++++++++++++++++++++++++++++++++++-- routing.go | 57 ++++++++++++++--------------- 3 files changed, 130 insertions(+), 42 deletions(-) diff --git a/lookup.go b/lookup.go index f0086a89d..71b360ceb 100644 --- a/lookup.go +++ b/lookup.go @@ -75,7 +75,7 @@ func (dht *IpfsDHT) GetClosestPeers(ctx context.Context, key string) (<-chan pee e := logger.EventBegin(ctx, "getClosestPeers", loggableKey(key)) defer e.Done() - queries, err := dht.runDisjointQueries(ctx, dht.d, key, + lookupRes, err := dht.runLookup(ctx, dht.d, key, func(ctx context.Context, p peer.ID) ([]*peer.AddrInfo, error) { // For DHT query command routing.PublishQueryEvent(ctx, &routing.QueryEvent{ @@ -109,18 +109,13 @@ func (dht *IpfsDHT) GetClosestPeers(ctx context.Context, key string) (<-chan pee out := make(chan peer.ID, dht.bucketSize) defer close(out) - kadID := kb.ConvertKey(key) - allPeers := kb.SortClosestPeers(queries[0].globallyQueriedPeers.Peers(), kadID) - for i, p := range allPeers { - if i == dht.bucketSize { - break - } + for _, p := range lookupRes.peers { out <- p } if ctx.Err() == nil { // refresh the cpl for this key as the query was successful - dht.routingTable.ResetCplRefreshedAtForID(kadID, time.Now()) + dht.routingTable.ResetCplRefreshedAtForID(kb.ConvertKey(key), time.Now()) } return out, ctx.Err() diff --git a/query.go b/query.go index ee2594bab..fa68b4ed2 100644 --- a/query.go +++ b/query.go @@ -3,7 +3,6 @@ package dht import ( "context" "errors" - "github.com/libp2p/go-libp2p-core/network" "github.com/libp2p/go-libp2p-core/peer" pstore "github.com/libp2p/go-libp2p-core/peerstore" @@ -49,8 +48,71 @@ type query struct { stopFn stopFn } +type lookupResult struct { + peers []peer.ID + state []qpeerset.PeerState + completed bool +} + +// runLookup executes the lookup on the target using the given query function and stopping when either the context is +// cancelled or the stop function returns true (if the stop function is not sticky it is not guaranteed to cause a stop +// to occur just because it momentarily returns true). Returns the top K peers that have not failed as well as their +// state at the end of the lookup. +// +// After the lookup is complete the query function is run (unless stopped) against all of the top K peers that have not +// already been successfully queried. +func (dht *IpfsDHT) runLookup(ctx context.Context, d int, target string, queryFn queryFn, stopFn stopFn) (*lookupResult, error) { + // run the query + lookupRes, err := dht.runDisjointQueries(ctx, d, target, queryFn, stopFn) + if err != nil { + return nil, err + } + + // return if the lookup has been externally stopped + if stopFn() || ctx.Err() != nil { + return lookupRes, nil + } + + // query all of the top K peers we've either Heard about or have outstanding queries we're Waiting on. + // This ensures that all of the top K results have been queried which adds to resiliency against churn for query + // functions that carry state (e.g. FindProviders and GetValue) as well as establish connections that are needed + // by stateless query functions (e.g. 
GetClosestPeers and therefore Provide and PutValue) + queryPeers := make([]peer.ID, 0, len(lookupRes.peers)) + for i, p := range lookupRes.peers { + if state := lookupRes.state[i]; state == qpeerset.PeerHeard || state == qpeerset.PeerWaiting { + queryPeers = append(queryPeers, p) + } + } + + doneCh := make(chan struct{}, 1) + followUpCtx, cancelFollowUp := context.WithCancel(ctx) + for _, p := range queryPeers { + qp := p + go func() { + _, _ = queryFn(followUpCtx, qp) + doneCh <- struct{}{} + }() + } + + // wait for all queries to complete before returning, aborting ongoing queries if we've been externally stopped + processFollowUp: + for i := 0; i < len(queryPeers); i++ { + select{ + case <-doneCh: + if stopFn() { + cancelFollowUp() + break processFollowUp + } + case <-ctx.Done(): + break processFollowUp + } + } + + return lookupRes, nil +} + // d is the number of disjoint queries. -func (dht *IpfsDHT) runDisjointQueries(ctx context.Context, d int, target string, queryFn queryFn, stopFn stopFn) ([]*query, error) { +func (dht *IpfsDHT) runDisjointQueries(ctx context.Context, d int, target string, queryFn queryFn, stopFn stopFn) (*lookupResult, error) { queryCtx, cancelQuery := context.WithCancel(ctx) numQueriesComplete := 0 @@ -122,7 +184,43 @@ loop: } } - return queries, nil + // determine if any queries terminated early + completed := true + for _, q := range queries { + if !(q.isLookupTermination() || q.isStarvationTermination()){ + completed = false + break + } + } + + // extract the top K not unreachable peers from each query path, as well as their states at the end of the queries + var peers []peer.ID + peerState := make(map[peer.ID]qpeerset.PeerState) + for _, q := range queries { + qp := q.queryPeers.GetClosestNotUnreachable(dht.bucketSize) + for _, p := range qp { + peerState[p] = q.queryPeers.GetState(p) + } + peers = append(peers , qp...) 
+ } + + // get the top K overall peers + sortedPeers := kb.SortClosestPeers(peers, kb.ConvertKey(target)) + if len(sortedPeers) > dht.bucketSize { + sortedPeers = sortedPeers[:dht.bucketSize] + } + + // return the top K not unreachable peers across all query paths as well as their states at the end of the queries + res := &lookupResult{ + peers: sortedPeers, + state: make([]qpeerset.PeerState, len(sortedPeers)), + completed: completed, + } + + for i, p := range sortedPeers { + res.state[i] = peerState[p] + } + return res, nil } type queryUpdate struct { diff --git a/routing.go b/routing.go index 356b5306e..23d33a83a 100644 --- a/routing.go +++ b/routing.go @@ -171,7 +171,7 @@ func (dht *IpfsDHT) SearchValue(ctx context.Context, key string, opts ...routing } stopCh := make(chan struct{}) - valCh, queries := dht.getValues(ctx, key, stopCh) + valCh, lookupRes := dht.getValues(ctx, key, stopCh) out := make(chan []byte) go func() { @@ -183,17 +183,12 @@ func (dht *IpfsDHT) SearchValue(ctx context.Context, key string, opts ...routing updatePeers := make([]peer.ID, 0, dht.bucketSize) select { - case q := <-queries: - if len(q) < 1 { + case l := <-lookupRes: + if l == nil { return } - peers := q[0].globallyQueriedPeers.Peers() - peers = kb.SortClosestPeers(peers, kb.ConvertKey(key)) - for i, p := range peers { - if i == dht.bucketSize { - break - } + for _, p := range l.peers { if _, ok := peersWithBest[p]; !ok { updatePeers = append(updatePeers, p) } @@ -319,9 +314,9 @@ func (dht *IpfsDHT) updatePeerValues(ctx context.Context, key string, val []byte } } -func (dht *IpfsDHT) getValues(ctx context.Context, key string, stopQuery chan struct{}) (<-chan RecvdVal, <-chan []*query) { +func (dht *IpfsDHT) getValues(ctx context.Context, key string, stopQuery chan struct{}) (<-chan RecvdVal, <-chan *lookupResult) { valCh := make(chan RecvdVal, 1) - queriesCh := make(chan []*query, 1) + lookupResCh := make(chan *lookupResult, 1) if rec, err := dht.getLocal(key); rec != nil && err == nil { select { @@ -335,8 +330,8 @@ func (dht *IpfsDHT) getValues(ctx context.Context, key string, stopQuery chan st go func() { defer close(valCh) - defer close(queriesCh) - queries, err := dht.runDisjointQueries(ctx, dht.d, key, + defer close(lookupResCh) + lookupRes, err := dht.runLookup(ctx, dht.d, key, func(ctx context.Context, p peer.ID) ([]*peer.AddrInfo, error) { // For DHT query command routing.PublishQueryEvent(ctx, &routing.QueryEvent{ @@ -398,26 +393,18 @@ func (dht *IpfsDHT) getValues(ctx context.Context, key string, stopQuery chan st if err != nil { return } - queriesCh <- queries + lookupResCh <- lookupRes if ctx.Err() == nil { - dht.refreshRTIfNoShortcut(kb.ConvertKey(key), queries) + dht.refreshRTIfNoShortcut(kb.ConvertKey(key), lookupRes) } }() - return valCh, queriesCh + return valCh, lookupResCh } -func (dht *IpfsDHT) refreshRTIfNoShortcut(key kb.ID, queries []*query) { - shortcutTaken := false - for _, q := range queries { - if len(q.queryPeers.GetClosestNotUnreachable(dht.beta)) > 0 { - shortcutTaken = true - break - } - } - - if !shortcutTaken { +func (dht *IpfsDHT) refreshRTIfNoShortcut(key kb.ID, lookupRes *lookupResult) { + if lookupRes.completed { // refresh the cpl for this key as the query was successful dht.routingTable.ResetCplRefreshedAtForID(key, time.Now()) } @@ -592,7 +579,7 @@ func (dht *IpfsDHT) findProvidersAsyncRoutine(ctx context.Context, key multihash } } - queries, err := dht.runDisjointQueries(ctx, dht.d, string(key), + queries, err := dht.runLookup(ctx, dht.d, string(key), func(ctx 
context.Context, p peer.ID) ([]*peer.AddrInfo, error) { // For DHT query command routing.PublishQueryEvent(ctx, &routing.QueryEvent{ @@ -668,7 +655,7 @@ func (dht *IpfsDHT) FindPeer(ctx context.Context, id peer.ID) (_ peer.AddrInfo, return pi, nil } - queries, err := dht.runDisjointQueries(ctx, dht.d, string(id), + lookupRes, err := dht.runLookup(ctx, dht.d, string(id), func(ctx context.Context, p peer.ID) ([]*peer.AddrInfo, error) { // For DHT query command routing.PublishQueryEvent(ctx, &routing.QueryEvent{ @@ -702,9 +689,17 @@ func (dht *IpfsDHT) FindPeer(ctx context.Context, id peer.ID) (_ peer.AddrInfo, } // refresh the cpl for this key if we discovered the peer because of the query - if ctx.Err() == nil && queries[0].globallyQueriedPeers.Contains(id) { - kadID := kb.ConvertPeerID(id) - dht.routingTable.ResetCplRefreshedAtForID(kadID, time.Now()) + if ctx.Err() == nil { + discoveredPeerDuringQuery := false + for _, p := range lookupRes.peers { + if p == id { + discoveredPeerDuringQuery = true + break + } + } + if discoveredPeerDuringQuery { + dht.routingTable.ResetCplRefreshedAtForID(kb.ConvertPeerID(id), time.Now()) + } } // TODO: Consider unlucky disconnect timing and potentially utilizing network.CanConnect or something similar From 9476ad8683c3fb357064666a407e1b8c4d6abbd0 Mon Sep 17 00:00:00 2001 From: Adin Schmahmann Date: Sun, 22 Mar 2020 04:52:13 -0400 Subject: [PATCH 10/18] query state transitions bug fixes. fix wrapper function to take into account disjoint paths. Adjust test to take into account new query behavior (i.e. GetClosestPeers is no longer guaranteed to find k peers if the routing tables are invalid, e.g. ring setups) --- dht_test.go | 8 ++--- lookup.go | 2 +- query.go | 86 +++++++++++++++++++++++++++++++++-------------------- routing.go | 6 ++-- 4 files changed, 60 insertions(+), 42 deletions(-) diff --git a/dht_test.go b/dht_test.go index 43b67f311..b60e8c265 100644 --- a/dht_test.go +++ b/dht_test.go @@ -1403,10 +1403,6 @@ func testFindPeerQuery(t *testing.T, connect(t, ctx, guy, others[i]) } - for _, d := range dhts { - d.RefreshRoutingTable() - } - var reachableIds []peer.ID for i, d := range dhts { lp := len(d.host.Network().Peers()) @@ -1471,8 +1467,8 @@ func TestFindClosestPeers(t *testing.T) { out = append(out, p) } - if len(out) != querier.bucketSize { - t.Fatalf("got wrong number of peers (got %d, expected %d)", len(out), querier.bucketSize) + if len(out) < querier.beta { + t.Fatalf("got wrong number of peers (got %d, expected at least %d)", len(out), querier.beta) } } diff --git a/lookup.go b/lookup.go index 71b360ceb..38c5b0036 100644 --- a/lookup.go +++ b/lookup.go @@ -113,7 +113,7 @@ func (dht *IpfsDHT) GetClosestPeers(ctx context.Context, key string) (<-chan pee out <- p } - if ctx.Err() == nil { + if ctx.Err() == nil && lookupRes.completed { // refresh the cpl for this key as the query was successful dht.routingTable.ResetCplRefreshedAtForID(kb.ConvertKey(key), time.Now()) } diff --git a/query.go b/query.go index fa68b4ed2..cc106eee2 100644 --- a/query.go +++ b/query.go @@ -68,11 +68,6 @@ func (dht *IpfsDHT) runLookup(ctx context.Context, d int, target string, queryFn return nil, err } - // return if the lookup has been externally stopped - if stopFn() || ctx.Err() != nil { - return lookupRes, nil - } - // query all of the top K peers we've either Heard about or have outstanding queries we're Waiting on. 
// This ensures that all of the top K results have been queried which adds to resiliency against churn for query // functions that carry state (e.g. FindProviders and GetValue) as well as establish connections that are needed @@ -84,6 +79,16 @@ func (dht *IpfsDHT) runLookup(ctx context.Context, d int, target string, queryFn } } + if len(queryPeers) == 0 { + return lookupRes, nil + } + + // return if the lookup has been externally stopped + if ctx.Err() != nil || stopFn() { + lookupRes.completed = false + return lookupRes, nil + } + doneCh := make(chan struct{}, 1) followUpCtx, cancelFollowUp := context.WithCancel(ctx) for _, p := range queryPeers { @@ -96,17 +101,22 @@ func (dht *IpfsDHT) runLookup(ctx context.Context, d int, target string, queryFn // wait for all queries to complete before returning, aborting ongoing queries if we've been externally stopped processFollowUp: - for i := 0; i < len(queryPeers); i++ { - select{ - case <-doneCh: - if stopFn() { - cancelFollowUp() + for i := 0; i < len(queryPeers); i++ { + select{ + case <-doneCh: + if stopFn() { + cancelFollowUp() + if i < len(queryPeers) - 1 { + lookupRes.completed = false + } + break processFollowUp + } + case <-ctx.Done(): + lookupRes.completed = false break processFollowUp } - case <-ctx.Done(): - break processFollowUp } - } + return lookupRes, nil } @@ -115,9 +125,6 @@ func (dht *IpfsDHT) runLookup(ctx context.Context, d int, target string, queryFn func (dht *IpfsDHT) runDisjointQueries(ctx context.Context, d int, target string, queryFn queryFn, stopFn stopFn) (*lookupResult, error) { queryCtx, cancelQuery := context.WithCancel(ctx) - numQueriesComplete := 0 - queryDone := make(chan struct{}, d) - // pick the K closest peers to the key in our Routing table and shuffle them. seedPeers := dht.routingTable.NearestPeers(kb.ConvertKey(target), dht.bucketSize) if len(seedPeers) == 0 { @@ -159,6 +166,7 @@ func (dht *IpfsDHT) runDisjointQueries(ctx context.Context, d int, target string } // start the "d" disjoint queries + queryDone := make(chan struct{}, d) for i := 0; i < d; i++ { query := queries[i] go func() { @@ -167,20 +175,16 @@ func (dht *IpfsDHT) runDisjointQueries(ctx context.Context, d int, target string }() } -loop: // wait for all the "d" disjoint queries to complete before we return // XXX: Waiting until all queries are done is a vector for DoS attacks: // The disjoint lookup paths that are taken over by adversarial peers // can easily be fooled to go on forever. + numQueriesComplete := 0 for { - select { - case <-queryDone: - numQueriesComplete++ - if numQueriesComplete == d { - break loop - } - case <-ctx.Done(): - break loop + <-queryDone + numQueriesComplete++ + if numQueriesComplete == d { + break } } @@ -199,9 +203,20 @@ loop: for _, q := range queries { qp := q.queryPeers.GetClosestNotUnreachable(dht.bucketSize) for _, p := range qp { - peerState[p] = q.queryPeers.GetState(p) + // Since the same peer can be seen in multiple queries use the "best" state for the peer + // Note: It's possible that a peer was marked undialable in one path, but wasn't yet tried in another path + // for now we're going to return that peer as long as some path does not think it is undialable. This may + // change in the future if we track addresses dialed per path. + state := q.queryPeers.GetState(p) + if currState, ok := peerState[p]; ok { + if state > currState { + peerState[p] = state + } + } else { + peerState[p] = state + peers = append(peers, p) + } } - peers = append(peers , qp...) 
} // get the top K overall peers @@ -244,13 +259,17 @@ func (q *query) runWithGreedyParallelism() { q.updateState(update) case <-pathCtx.Done(): q.terminate() - return } // termination is triggered on end-of-lookup conditions or starvation of unused peers if q.readyToTerminate() { q.terminate() - return + + // exit once all goroutines have been cleaned up + if q.queryPeers.NumWaiting() == 0 { + return + } + continue } // if all "threads" are busy, wait until someone finishes @@ -362,21 +381,24 @@ func (q *query) updateState(up *queryUpdate) { continue } q.queryPeers.TryAdd(p) - q.queryPeers.SetState(p, qpeerset.PeerHeard) } for _, p := range up.queried { if p == q.dht.self { // don't add self. continue } q.queryPeers.TryAdd(p) - q.queryPeers.SetState(p, qpeerset.PeerQueried) + if st := q.queryPeers.GetState(p); st == qpeerset.PeerWaiting { + q.queryPeers.SetState(p, qpeerset.PeerQueried) + } } for _, p := range up.unreachable { if p == q.dht.self { // don't add self. continue } q.queryPeers.TryAdd(p) - q.queryPeers.SetState(p, qpeerset.PeerUnreachable) + if st := q.queryPeers.GetState(p); st == qpeerset.PeerWaiting { + q.queryPeers.SetState(p, qpeerset.PeerUnreachable) + } } } diff --git a/routing.go b/routing.go index 23d33a83a..a32f168f1 100644 --- a/routing.go +++ b/routing.go @@ -579,7 +579,7 @@ func (dht *IpfsDHT) findProvidersAsyncRoutine(ctx context.Context, key multihash } } - queries, err := dht.runLookup(ctx, dht.d, string(key), + lookupRes, err := dht.runLookup(ctx, dht.d, string(key), func(ctx context.Context, p peer.ID) ([]*peer.AddrInfo, error) { // For DHT query command routing.PublishQueryEvent(ctx, &routing.QueryEvent{ @@ -636,7 +636,7 @@ func (dht *IpfsDHT) findProvidersAsyncRoutine(ctx context.Context, key multihash ) if err != nil && ctx.Err() == nil { - dht.refreshRTIfNoShortcut(kb.ConvertKey(string(key)), queries) + dht.refreshRTIfNoShortcut(kb.ConvertKey(string(key)), lookupRes) } } @@ -697,7 +697,7 @@ func (dht *IpfsDHT) FindPeer(ctx context.Context, id peer.ID) (_ peer.AddrInfo, break } } - if discoveredPeerDuringQuery { + if discoveredPeerDuringQuery || lookupRes.completed{ dht.routingTable.ResetCplRefreshedAtForID(kb.ConvertPeerID(id), time.Now()) } } From b04ec8ab88b47fa3cd953c75cbc892291c1ff37e Mon Sep 17 00:00:00 2001 From: Adin Schmahmann Date: Mon, 23 Mar 2020 01:02:15 -0400 Subject: [PATCH 11/18] move lookup result construction into its own function --- query.go | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/query.go b/query.go index cc106eee2..e2b4d4320 100644 --- a/query.go +++ b/query.go @@ -126,7 +126,8 @@ func (dht *IpfsDHT) runDisjointQueries(ctx context.Context, d int, target string queryCtx, cancelQuery := context.WithCancel(ctx) // pick the K closest peers to the key in our Routing table and shuffle them. 
- seedPeers := dht.routingTable.NearestPeers(kb.ConvertKey(target), dht.bucketSize) + targetKadID := kb.ConvertKey(target) + seedPeers := dht.routingTable.NearestPeers(targetKadID, dht.bucketSize) if len(seedPeers) == 0 { routing.PublishQueryEvent(ctx, &routing.QueryEvent{ Type: routing.QueryError, @@ -188,6 +189,12 @@ func (dht *IpfsDHT) runDisjointQueries(ctx context.Context, d int, target string } } + res := dht.constructLookupResult(queries, targetKadID) + return res, nil +} + +// constructLookupResult takes the query information and uses it to construct the lookup result +func (dht *IpfsDHT) constructLookupResult(queries []*query, target kb.ID) *lookupResult { // determine if any queries terminated early completed := true for _, q := range queries { @@ -220,7 +227,7 @@ func (dht *IpfsDHT) runDisjointQueries(ctx context.Context, d int, target string } // get the top K overall peers - sortedPeers := kb.SortClosestPeers(peers, kb.ConvertKey(target)) + sortedPeers := kb.SortClosestPeers(peers, target) if len(sortedPeers) > dht.bucketSize { sortedPeers = sortedPeers[:dht.bucketSize] } @@ -235,7 +242,8 @@ func (dht *IpfsDHT) runDisjointQueries(ctx context.Context, d int, target string for i, p := range sortedPeers { res.state[i] = peerState[p] } - return res, nil + + return res } type queryUpdate struct { From c9d93bdaa9e676e5c53341c32eacf4be2c81e2dd Mon Sep 17 00:00:00 2001 From: Adin Schmahmann Date: Mon, 23 Mar 2020 01:29:44 -0400 Subject: [PATCH 12/18] go fmt --- query.go | 33 ++++++++++++++++----------------- routing.go | 2 +- 2 files changed, 17 insertions(+), 18 deletions(-) diff --git a/query.go b/query.go index e2b4d4320..b1bc6f654 100644 --- a/query.go +++ b/query.go @@ -49,8 +49,8 @@ type query struct { } type lookupResult struct { - peers []peer.ID - state []qpeerset.PeerState + peers []peer.ID + state []qpeerset.PeerState completed bool } @@ -84,7 +84,7 @@ func (dht *IpfsDHT) runLookup(ctx context.Context, d int, target string, queryFn } // return if the lookup has been externally stopped - if ctx.Err() != nil || stopFn() { + if ctx.Err() != nil || stopFn() { lookupRes.completed = false return lookupRes, nil } @@ -100,23 +100,22 @@ func (dht *IpfsDHT) runLookup(ctx context.Context, d int, target string, queryFn } // wait for all queries to complete before returning, aborting ongoing queries if we've been externally stopped - processFollowUp: - for i := 0; i < len(queryPeers); i++ { - select{ - case <-doneCh: - if stopFn() { - cancelFollowUp() - if i < len(queryPeers) - 1 { - lookupRes.completed = false - } - break processFollowUp +processFollowUp: + for i := 0; i < len(queryPeers); i++ { + select { + case <-doneCh: + if stopFn() { + cancelFollowUp() + if i < len(queryPeers)-1 { + lookupRes.completed = false } - case <-ctx.Done(): - lookupRes.completed = false break processFollowUp } + case <-ctx.Done(): + lookupRes.completed = false + break processFollowUp } - + } return lookupRes, nil } @@ -198,7 +197,7 @@ func (dht *IpfsDHT) constructLookupResult(queries []*query, target kb.ID) *looku // determine if any queries terminated early completed := true for _, q := range queries { - if !(q.isLookupTermination() || q.isStarvationTermination()){ + if !(q.isLookupTermination() || q.isStarvationTermination()) { completed = false break } diff --git a/routing.go b/routing.go index a32f168f1..6695c9633 100644 --- a/routing.go +++ b/routing.go @@ -697,7 +697,7 @@ func (dht *IpfsDHT) FindPeer(ctx context.Context, id peer.ID) (_ peer.AddrInfo, break } } - if discoveredPeerDuringQuery || 
lookupRes.completed{ + if discoveredPeerDuringQuery || lookupRes.completed { dht.routingTable.ResetCplRefreshedAtForID(kb.ConvertPeerID(id), time.Now()) } } From 6a56205bab06929e1b55f7295b1dbbd4e0f1d19b Mon Sep 17 00:00:00 2001 From: Aarsh Shah Date: Mon, 23 Mar 2020 22:16:18 +0530 Subject: [PATCH 13/18] tests for qpeerset --- qpeerset/qpeerset_test.go | 84 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 84 insertions(+) create mode 100644 qpeerset/qpeerset_test.go diff --git a/qpeerset/qpeerset_test.go b/qpeerset/qpeerset_test.go new file mode 100644 index 000000000..a84095765 --- /dev/null +++ b/qpeerset/qpeerset_test.go @@ -0,0 +1,84 @@ +package qpeerset + +import ( + "testing" + + "github.com/libp2p/go-libp2p-core/peer" + "github.com/libp2p/go-libp2p-core/test" + + kb "github.com/libp2p/go-libp2p-kbucket" + + "github.com/stretchr/testify/require" +) + +func TestQPeerSet(t *testing.T) { + key := "test" + qp := NewQueryPeerset(key) + + // -----------------Ordering between peers for the Test ----- + // KEY < peer3 < peer1 < peer4 < peer2 + // ---------------------------------------------------------- + peer2 := test.RandPeerIDFatal(t) + var peer4 peer.ID + for { + peer4 = test.RandPeerIDFatal(t) + if kb.Closer(peer4, peer2, key) { + break + } + } + + var peer1 peer.ID + for { + peer1 = test.RandPeerIDFatal(t) + if kb.Closer(peer1, peer4, key) { + break + } + } + + var peer3 peer.ID + for { + peer3 = test.RandPeerIDFatal(t) + if kb.Closer(peer3, peer1, key) { + break + } + } + + // find fails + require.Equal(t, -1, qp.find(peer2)) + + // add peer2,assert state & then another add fails + require.True(t, qp.TryAdd(peer2)) + require.Equal(t, PeerHeard, qp.GetState(peer2)) + require.False(t, qp.TryAdd(peer2)) + require.Equal(t, 0, qp.NumWaiting()) + + // add peer4 + require.True(t, qp.TryAdd(peer4)) + cl := qp.GetClosestNotUnreachable(2) + require.Equal(t, []peer.ID{peer4, peer2}, cl) + cl = qp.GetClosestNotUnreachable(3) + require.Equal(t, []peer.ID{peer4, peer2}, cl) + cl = qp.GetClosestNotUnreachable(1) + require.Equal(t, []peer.ID{peer4}, cl) + + // mark as unreachable & try to get it + qp.SetState(peer4, PeerUnreachable) + cl = qp.GetClosestNotUnreachable(1) + require.Equal(t, []peer.ID{peer2}, cl) + + // add peer1 + require.True(t, qp.TryAdd(peer1)) + cl = qp.GetClosestNotUnreachable(1) + require.Equal(t, []peer.ID{peer1}, cl) + cl = qp.GetClosestNotUnreachable(2) + require.Equal(t, []peer.ID{peer1, peer2}, cl) + + // mark as waiting and assert + qp.SetState(peer2, PeerWaiting) + require.Equal(t, []peer.ID{peer2}, qp.GetWaitingPeers()) + + require.Equal(t, []peer.ID{peer1}, qp.GetHeardPeers()) + require.True(t, qp.TryAdd(peer3)) + require.Equal(t, []peer.ID{peer3, peer1}, qp.GetSortedHeard()) + require.Equal(t, 2, qp.NumHeard()) +} From 15e343b8b0427bc3d2a1c34a88e6368380ce271e Mon Sep 17 00:00:00 2001 From: Adin Schmahmann Date: Tue, 24 Mar 2020 02:01:06 -0400 Subject: [PATCH 14/18] do not mark the routing table as updated after a FindPeer query. bootstrap logic now uses GetClosestPeers instead of FindPeer. FindPeer can return addresses even if not Connected as long as it was either recently connected (CanConnect) or was discovered during the lookup. 
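The new return condition can be summed up by the sketch below. The helper name shouldReturnPeerInfo is hypothetical; it assumes the dht package context and the lookupResult type as it exists at this point in the series (renamed to lookupWithFollowupResult in a later patch). A peer counts as dialed during the query if its end-of-lookup state is anything other than PeerHeard, and CanConnect covers peers we were recently connected to.

// Sketch only: whether FindPeer may return address info for id after a lookup.
func shouldReturnPeerInfo(h host.Host, lookupRes *lookupResult, id peer.ID) bool {
	dialedDuringQuery := false
	for i, p := range lookupRes.peers {
		if p == id {
			// any state other than PeerHeard means a dial was attempted
			dialedDuringQuery = lookupRes.state[i] != qpeerset.PeerHeard
			break
		}
	}
	conn := h.Network().Connectedness(id)
	return dialedDuringQuery || conn == network.Connected || conn == network.CanConnect
}
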
--- dht_bootstrap.go | 10 +++------- routing.go | 26 +++++++++++++------------- 2 files changed, 16 insertions(+), 20 deletions(-) diff --git a/dht_bootstrap.go b/dht_bootstrap.go index 2ff90b6ae..e7442ae15 100644 --- a/dht_bootstrap.go +++ b/dht_bootstrap.go @@ -8,7 +8,6 @@ import ( multierror "github.com/hashicorp/go-multierror" process "github.com/jbenet/goprocess" processctx "github.com/jbenet/goprocess/context" - "github.com/libp2p/go-libp2p-core/routing" kbucket "github.com/libp2p/go-libp2p-kbucket" "github.com/multiformats/go-multiaddr" _ "github.com/multiformats/go-multiaddr-dns" @@ -58,8 +57,8 @@ func (dht *IpfsDHT) startSelfLookup() error { // Do a self walk queryCtx, cancel := context.WithTimeout(ctx, dht.rtRefreshQueryTimeout) - _, err := dht.FindPeer(queryCtx, dht.self) - if err == routing.ErrNotFound || err == kbucket.ErrLookupFailure { + _, err := dht.GetClosestPeers(queryCtx, string(dht.self)) + if err == kbucket.ErrLookupFailure { err = nil } else if err != nil { err = fmt.Errorf("failed to query self during routing table refresh: %s", err) @@ -207,10 +206,7 @@ func (dht *IpfsDHT) refreshCpls(ctx context.Context) error { // walk to the generated peer walkFnc := func(c context.Context) error { - _, err := dht.FindPeer(c, randPeer) - if err == routing.ErrNotFound { - return nil - } + _, err := dht.GetClosestPeers(c, string(randPeer)) return err } diff --git a/routing.go b/routing.go index 6695c9633..eb2b4ed22 100644 --- a/routing.go +++ b/routing.go @@ -5,6 +5,7 @@ import ( "context" "fmt" "github.com/libp2p/go-libp2p-core/network" + "github.com/libp2p/go-libp2p-kad-dht/qpeerset" "sync" "time" @@ -688,22 +689,21 @@ func (dht *IpfsDHT) FindPeer(ctx context.Context, id peer.ID) (_ peer.AddrInfo, return peer.AddrInfo{}, err } - // refresh the cpl for this key if we discovered the peer because of the query - if ctx.Err() == nil { - discoveredPeerDuringQuery := false - for _, p := range lookupRes.peers { - if p == id { - discoveredPeerDuringQuery = true - break - } - } - if discoveredPeerDuringQuery || lookupRes.completed { - dht.routingTable.ResetCplRefreshedAtForID(kb.ConvertPeerID(id), time.Now()) + dialedPeerDuringQuery := false + for i, p := range lookupRes.peers { + if p == id { + // Note: we consider PeerUnreachable to be a valid state because the peer may not support the DHT protocol + // and therefore the peer would fail the query. The fact that a peer that is returned can be a non-DHT + // server peer and is not identified as such is a bug. + dialedPeerDuringQuery = lookupRes.state[i] != qpeerset.PeerHeard + break } } - // TODO: Consider unlucky disconnect timing and potentially utilizing network.CanConnect or something similar - if dht.host.Network().Connectedness(id) == network.Connected { + // Return peer information if we tried to dial the peer during the query or we are (or recently were) connected + // to the peer. 
+ connectedness := dht.host.Network().Connectedness(id) + if dialedPeerDuringQuery || connectedness == network.Connected || connectedness == network.CanConnect { return dht.peerstore.PeerInfo(id), nil } From 8656be1b8676c6f5bfbce20fef2635b6ee8ec093 Mon Sep 17 00:00:00 2001 From: Adin Schmahmann Date: Tue, 24 Mar 2020 02:21:47 -0400 Subject: [PATCH 15/18] more comments --- lookup.go | 2 +- query.go | 29 ++++++++++++++++------------- routing.go | 12 ++++++------ 3 files changed, 23 insertions(+), 20 deletions(-) diff --git a/lookup.go b/lookup.go index 38c5b0036..7e59771da 100644 --- a/lookup.go +++ b/lookup.go @@ -75,7 +75,7 @@ func (dht *IpfsDHT) GetClosestPeers(ctx context.Context, key string) (<-chan pee e := logger.EventBegin(ctx, "getClosestPeers", loggableKey(key)) defer e.Done() - lookupRes, err := dht.runLookup(ctx, dht.d, key, + lookupRes, err := dht.runLookupWithFollowup(ctx, dht.d, key, func(ctx context.Context, p peer.ID) ([]*peer.AddrInfo, error) { // For DHT query command routing.PublishQueryEvent(ctx, &routing.QueryEvent{ diff --git a/query.go b/query.go index b1bc6f654..792e33f74 100644 --- a/query.go +++ b/query.go @@ -48,20 +48,23 @@ type query struct { stopFn stopFn } -type lookupResult struct { - peers []peer.ID - state []qpeerset.PeerState +type lookupWithFollowupResult struct { + peers []peer.ID // the top K not unreachable peers across all query paths + state []qpeerset.PeerState // the peer states at the end of the queries + + // indicates that neither the lookup nor the followup has been prematurely terminated by an external condition such + // as context cancellation or the stop function being called. completed bool } -// runLookup executes the lookup on the target using the given query function and stopping when either the context is -// cancelled or the stop function returns true (if the stop function is not sticky it is not guaranteed to cause a stop -// to occur just because it momentarily returns true). Returns the top K peers that have not failed as well as their -// state at the end of the lookup. +// runLookupWithFollowup executes the lookup on the target using the given query function and stopping when either the +// context is cancelled or the stop function returns true. Note: if the stop function is not sticky, i.e. it does not +// return true every time after the first time it returns true, it is not guaranteed to cause a stop to occur just +// because it momentarily returns true. // -// After the lookup is complete the query function is run (unless stopped) against all of the top K peers that have not -// already been successfully queried. -func (dht *IpfsDHT) runLookup(ctx context.Context, d int, target string, queryFn queryFn, stopFn stopFn) (*lookupResult, error) { +// After the lookup is complete the query function is run (unless stopped) against all of the top K peers from the +// lookup that have not already been successfully queried. +func (dht *IpfsDHT) runLookupWithFollowup(ctx context.Context, d int, target string, queryFn queryFn, stopFn stopFn) (*lookupWithFollowupResult, error) { // run the query lookupRes, err := dht.runDisjointQueries(ctx, d, target, queryFn, stopFn) if err != nil { @@ -121,7 +124,7 @@ processFollowUp: } // d is the number of disjoint queries. 
-func (dht *IpfsDHT) runDisjointQueries(ctx context.Context, d int, target string, queryFn queryFn, stopFn stopFn) (*lookupResult, error) { +func (dht *IpfsDHT) runDisjointQueries(ctx context.Context, d int, target string, queryFn queryFn, stopFn stopFn) (*lookupWithFollowupResult, error) { queryCtx, cancelQuery := context.WithCancel(ctx) // pick the K closest peers to the key in our Routing table and shuffle them. @@ -193,7 +196,7 @@ func (dht *IpfsDHT) runDisjointQueries(ctx context.Context, d int, target string } // constructLookupResult takes the query information and uses it to construct the lookup result -func (dht *IpfsDHT) constructLookupResult(queries []*query, target kb.ID) *lookupResult { +func (dht *IpfsDHT) constructLookupResult(queries []*query, target kb.ID) *lookupWithFollowupResult { // determine if any queries terminated early completed := true for _, q := range queries { @@ -232,7 +235,7 @@ func (dht *IpfsDHT) constructLookupResult(queries []*query, target kb.ID) *looku } // return the top K not unreachable peers across all query paths as well as their states at the end of the queries - res := &lookupResult{ + res := &lookupWithFollowupResult{ peers: sortedPeers, state: make([]qpeerset.PeerState, len(sortedPeers)), completed: completed, diff --git a/routing.go b/routing.go index eb2b4ed22..dacaf5fc6 100644 --- a/routing.go +++ b/routing.go @@ -315,9 +315,9 @@ func (dht *IpfsDHT) updatePeerValues(ctx context.Context, key string, val []byte } } -func (dht *IpfsDHT) getValues(ctx context.Context, key string, stopQuery chan struct{}) (<-chan RecvdVal, <-chan *lookupResult) { +func (dht *IpfsDHT) getValues(ctx context.Context, key string, stopQuery chan struct{}) (<-chan RecvdVal, <-chan *lookupWithFollowupResult) { valCh := make(chan RecvdVal, 1) - lookupResCh := make(chan *lookupResult, 1) + lookupResCh := make(chan *lookupWithFollowupResult, 1) if rec, err := dht.getLocal(key); rec != nil && err == nil { select { @@ -332,7 +332,7 @@ func (dht *IpfsDHT) getValues(ctx context.Context, key string, stopQuery chan st go func() { defer close(valCh) defer close(lookupResCh) - lookupRes, err := dht.runLookup(ctx, dht.d, key, + lookupRes, err := dht.runLookupWithFollowup(ctx, dht.d, key, func(ctx context.Context, p peer.ID) ([]*peer.AddrInfo, error) { // For DHT query command routing.PublishQueryEvent(ctx, &routing.QueryEvent{ @@ -404,7 +404,7 @@ func (dht *IpfsDHT) getValues(ctx context.Context, key string, stopQuery chan st return valCh, lookupResCh } -func (dht *IpfsDHT) refreshRTIfNoShortcut(key kb.ID, lookupRes *lookupResult) { +func (dht *IpfsDHT) refreshRTIfNoShortcut(key kb.ID, lookupRes *lookupWithFollowupResult) { if lookupRes.completed { // refresh the cpl for this key as the query was successful dht.routingTable.ResetCplRefreshedAtForID(key, time.Now()) @@ -580,7 +580,7 @@ func (dht *IpfsDHT) findProvidersAsyncRoutine(ctx context.Context, key multihash } } - lookupRes, err := dht.runLookup(ctx, dht.d, string(key), + lookupRes, err := dht.runLookupWithFollowup(ctx, dht.d, string(key), func(ctx context.Context, p peer.ID) ([]*peer.AddrInfo, error) { // For DHT query command routing.PublishQueryEvent(ctx, &routing.QueryEvent{ @@ -656,7 +656,7 @@ func (dht *IpfsDHT) FindPeer(ctx context.Context, id peer.ID) (_ peer.AddrInfo, return pi, nil } - lookupRes, err := dht.runLookup(ctx, dht.d, string(id), + lookupRes, err := dht.runLookupWithFollowup(ctx, dht.d, string(id), func(ctx context.Context, p peer.ID) ([]*peer.AddrInfo, error) { // For DHT query command 
routing.PublishQueryEvent(ctx, &routing.QueryEvent{ From de9d5ae745516f623126cd800ae9b5f5c7dfb89f Mon Sep 17 00:00:00 2001 From: Adin Schmahmann Date: Tue, 24 Mar 2020 02:42:09 -0400 Subject: [PATCH 16/18] fix lookup followup goroutine cancellation --- query.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/query.go b/query.go index 792e33f74..730375c4d 100644 --- a/query.go +++ b/query.go @@ -92,7 +92,7 @@ func (dht *IpfsDHT) runLookupWithFollowup(ctx context.Context, d int, target str return lookupRes, nil } - doneCh := make(chan struct{}, 1) + doneCh := make(chan struct{}, len(queryPeers)) followUpCtx, cancelFollowUp := context.WithCancel(ctx) for _, p := range queryPeers { qp := p From a21162d55cedeb49950c299e0e3a5d9597b647ff Mon Sep 17 00:00:00 2001 From: Adin Schmahmann Date: Tue, 24 Mar 2020 02:59:53 -0400 Subject: [PATCH 17/18] make the query logic panic if the state is invalid --- query.go | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/query.go b/query.go index 730375c4d..89bfcf67f 100644 --- a/query.go +++ b/query.go @@ -3,6 +3,7 @@ package dht import ( "context" "errors" + "fmt" "github.com/libp2p/go-libp2p-core/network" "github.com/libp2p/go-libp2p-core/peer" pstore "github.com/libp2p/go-libp2p-core/peerstore" @@ -396,18 +397,20 @@ func (q *query) updateState(up *queryUpdate) { if p == q.dht.self { // don't add self. continue } - q.queryPeers.TryAdd(p) if st := q.queryPeers.GetState(p); st == qpeerset.PeerWaiting { q.queryPeers.SetState(p, qpeerset.PeerQueried) + } else { + panic(fmt.Errorf("kademlia protocol error: tried to transition to the queried state from state %v", st)) } } for _, p := range up.unreachable { if p == q.dht.self { // don't add self. continue } - q.queryPeers.TryAdd(p) if st := q.queryPeers.GetState(p); st == qpeerset.PeerWaiting { q.queryPeers.SetState(p, qpeerset.PeerUnreachable) + } else { + panic(fmt.Errorf("kademlia protocol error: tried to transition to the unreachable state from state %v", st)) } } } From f9a79c5ca80b013f8cede9095c4f3e161631bd3b Mon Sep 17 00:00:00 2001 From: Adin Schmahmann Date: Tue, 24 Mar 2020 03:07:31 -0400 Subject: [PATCH 18/18] go fmt + import reordering --- query.go | 5 +++-- routing.go | 4 ++-- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/query.go b/query.go index 89bfcf67f..ce9b95c4f 100644 --- a/query.go +++ b/query.go @@ -4,6 +4,7 @@ import ( "context" "errors" "fmt" + "github.com/libp2p/go-libp2p-core/network" "github.com/libp2p/go-libp2p-core/peer" pstore "github.com/libp2p/go-libp2p-core/peerstore" @@ -50,8 +51,8 @@ type query struct { } type lookupWithFollowupResult struct { - peers []peer.ID // the top K not unreachable peers across all query paths - state []qpeerset.PeerState // the peer states at the end of the queries + peers []peer.ID // the top K not unreachable peers across all query paths + state []qpeerset.PeerState // the peer states at the end of the queries // indicates that neither the lookup nor the followup has been prematurely terminated by an external condition such // as context cancellation or the stop function being called. 
diff --git a/routing.go b/routing.go index dacaf5fc6..54dad76d1 100644 --- a/routing.go +++ b/routing.go @@ -4,11 +4,10 @@ import ( "bytes" "context" "fmt" - "github.com/libp2p/go-libp2p-core/network" - "github.com/libp2p/go-libp2p-kad-dht/qpeerset" "sync" "time" + "github.com/libp2p/go-libp2p-core/network" "github.com/libp2p/go-libp2p-core/peer" "github.com/libp2p/go-libp2p-core/peerstore" "github.com/libp2p/go-libp2p-core/routing" @@ -17,6 +16,7 @@ import ( u "github.com/ipfs/go-ipfs-util" logging "github.com/ipfs/go-log" pb "github.com/libp2p/go-libp2p-kad-dht/pb" + "github.com/libp2p/go-libp2p-kad-dht/qpeerset" kb "github.com/libp2p/go-libp2p-kbucket" record "github.com/libp2p/go-libp2p-record" "github.com/multiformats/go-multihash"
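
The net effect of the query state changes above is a small per-peer state machine, and the panics added in the state-transition patch are what enforce it: a peer enters the set as PeerHeard, moves to PeerWaiting when a query is dispatched to it, and ends as PeerQueried or PeerUnreachable; nothing ever moves back to PeerHeard. The helper below is a hypothetical summary of that invariant, not part of any patch in this series, and assumes the qpeerset package as defined above.

// Sketch only: the transitions the query logic treats as legal; anything
// else is a protocol error (and now a panic in updateState).
func validTransition(from, to qpeerset.PeerState) bool {
	switch to {
	case qpeerset.PeerWaiting:
		return from == qpeerset.PeerHeard
	case qpeerset.PeerQueried, qpeerset.PeerUnreachable:
		return from == qpeerset.PeerWaiting
	default:
		// PeerHeard is only an initial state; peers never return to it
		return false
	}
}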