diff --git a/bucket.go b/bucket.go index 6a26f7b..4165cd4 100644 --- a/bucket.go +++ b/bucket.go @@ -4,50 +4,77 @@ package kbucket import ( "container/list" - "sync" - "github.com/libp2p/go-libp2p-core/peer" ) -// Bucket holds a list of peers. -type Bucket struct { - lk sync.RWMutex +// PeerState is the state of the peer as seen by the Routing Table. +type PeerState int + +const ( + // PeerStateActive indicates that we know the peer is active/alive. + PeerStateActive PeerState = iota + // PeerStateMissing indicates that we do not know the state of the peer. + PeerStateMissing +) + +// PeerInfo holds all related information for a peer in the K-Bucket. +type PeerInfo struct { + Id peer.ID + State PeerState +} + +// bucket holds a list of peers. +// we synchronize on the Routing Table lock for all access to the bucket +// and so do not need any locks in the bucket. +// if we want/need to avoid locking the table for accessing a bucket in the future, +// it WILL be the caller's responsibility to synchronize all access to a bucket. +type bucket struct { list *list.List } -func newBucket() *Bucket { - b := new(Bucket) +func newBucket() *bucket { + b := new(bucket) b.list = list.New() return b } -func (b *Bucket) Peers() []peer.ID { - b.lk.RLock() - defer b.lk.RUnlock() +// returns all peers in the bucket +// it is safe for the caller to modify the returned objects as it is a defensive copy +func (b *bucket) peers() []PeerInfo { + var ps []PeerInfo + for e := b.list.Front(); e != nil; e = e.Next() { + p := e.Value.(*PeerInfo) + ps = append(ps, *p) + } + return ps +} + +// return the Ids of all the peers in the bucket. +func (b *bucket) peerIds() []peer.ID { ps := make([]peer.ID, 0, b.list.Len()) for e := b.list.Front(); e != nil; e = e.Next() { - id := e.Value.(peer.ID) - ps = append(ps, id) + p := e.Value.(*PeerInfo) + ps = append(ps, p.Id) } return ps } -func (b *Bucket) Has(id peer.ID) bool { - b.lk.RLock() - defer b.lk.RUnlock() +// returns the peer with the given Id if it exists +// returns nil if the peerId does not exist +func (b *bucket) getPeer(p peer.ID) *PeerInfo { for e := b.list.Front(); e != nil; e = e.Next() { - if e.Value.(peer.ID) == id { - return true + if e.Value.(*PeerInfo).Id == p { + return e.Value.(*PeerInfo) } } - return false + return nil } -func (b *Bucket) Remove(id peer.ID) bool { - b.lk.Lock() - defer b.lk.Unlock() +// removes the peer with the given Id from the bucket. +// returns true if successful, false otherwise. 
+func (b *bucket) remove(id peer.ID) bool { for e := b.list.Front(); e != nil; e = e.Next() { - if e.Value.(peer.ID) == id { + if e.Value.(*PeerInfo).Id == id { b.list.Remove(e) return true } @@ -55,49 +82,33 @@ func (b *Bucket) Remove(id peer.ID) bool { return false } -func (b *Bucket) MoveToFront(id peer.ID) { - b.lk.Lock() - defer b.lk.Unlock() +func (b *bucket) moveToFront(id peer.ID) { + for e := b.list.Front(); e != nil; e = e.Next() { - if e.Value.(peer.ID) == id { + if e.Value.(*PeerInfo).Id == id { b.list.MoveToFront(e) } } } -func (b *Bucket) PushFront(p peer.ID) { - b.lk.Lock() +func (b *bucket) pushFront(p *PeerInfo) { b.list.PushFront(p) - b.lk.Unlock() -} - -func (b *Bucket) PopBack() peer.ID { - b.lk.Lock() - defer b.lk.Unlock() - last := b.list.Back() - b.list.Remove(last) - return last.Value.(peer.ID) } -func (b *Bucket) Len() int { - b.lk.RLock() - defer b.lk.RUnlock() +func (b *bucket) len() int { return b.list.Len() } -// Split splits a buckets peers into two buckets, the methods receiver will have +// splits a buckets peers into two buckets, the methods receiver will have // peers with CPL equal to cpl, the returned bucket will have peers with CPL // greater than cpl (returned bucket has closer peers) -func (b *Bucket) Split(cpl int, target ID) *Bucket { - b.lk.Lock() - defer b.lk.Unlock() - +func (b *bucket) split(cpl int, target ID) *bucket { out := list.New() newbuck := newBucket() newbuck.list = out e := b.list.Front() for e != nil { - peerID := ConvertPeerID(e.Value.(peer.ID)) + peerID := ConvertPeerID(e.Value.(*PeerInfo).Id) peerCPL := CommonPrefixLen(peerID, target) if peerCPL > cpl { cur := e diff --git a/cpl_replacement_cache.go b/cpl_replacement_cache.go new file mode 100644 index 0000000..2f521f7 --- /dev/null +++ b/cpl_replacement_cache.go @@ -0,0 +1,97 @@ +package kbucket + +import ( + "sync" + + "github.com/libp2p/go-libp2p-core/peer" +) + +// TODO Should ideally use a Circular queue for this +// maintains a bounded, de-duplicated and FIFO peer candidate queue for each Cpl +type cplReplacementCache struct { + localPeer ID + maxQueueSize int + + sync.Mutex + candidates map[uint][]peer.ID // candidates for a Cpl +} + +func newCplReplacementCache(localPeer ID, maxQueueSize int) *cplReplacementCache { + return &cplReplacementCache{ + localPeer: localPeer, + maxQueueSize: maxQueueSize, + candidates: make(map[uint][]peer.ID), + } +} + +// pushes a candidate to the end of the queue for the corresponding Cpl +// returns false if the queue is full or it already has the peer +// returns true if was successfully added +func (c *cplReplacementCache) push(p peer.ID) bool { + c.Lock() + defer c.Unlock() + + cpl := uint(CommonPrefixLen(c.localPeer, ConvertPeerID(p))) + + // queue is full + if len(c.candidates[cpl]) >= c.maxQueueSize { + return false + } + // queue already has the peer + for _, pr := range c.candidates[cpl] { + if pr == p { + return false + } + } + + // push + c.candidates[cpl] = append(c.candidates[cpl], p) + return true +} + +// pops a candidate from the top of the candidate queue for the given Cpl +// returns false if the queue is empty +// returns the peerId and true if successful +func (c *cplReplacementCache) pop(cpl uint) (peer.ID, bool) { + c.Lock() + defer c.Unlock() + + if len(c.candidates[cpl]) != 0 { + p := c.candidates[cpl][0] + c.candidates[cpl] = c.candidates[cpl][1:] + + // delete the queue if it's empty + if len(c.candidates[cpl]) == 0 { + delete(c.candidates, cpl) + } + + return p, true + } + return "", false +} + +// removes a given 
peer if it's present +// returns false if the peer is absent +func (c *cplReplacementCache) remove(p peer.ID) bool { + c.Lock() + defer c.Unlock() + + cpl := uint(CommonPrefixLen(c.localPeer, ConvertPeerID(p))) + + if len(c.candidates[cpl]) != 0 { + // remove the peer if it's present + for i, pr := range c.candidates[cpl] { + if pr == p { + c.candidates[cpl] = append(c.candidates[cpl][:i], c.candidates[cpl][i+1:]...) + } + } + + // remove the queue if it's empty + if len(c.candidates[cpl]) == 0 { + delete(c.candidates, cpl) + } + + return true + } + return false +} diff --git a/cpl_replacement_cache_test.go b/cpl_replacement_cache_test.go new file mode 100644 index 0000000..98981a4 --- /dev/null +++ b/cpl_replacement_cache_test.go @@ -0,0 +1,87 @@ +package kbucket + +import ( + "testing" + + "github.com/libp2p/go-libp2p-core/peer" + "github.com/libp2p/go-libp2p-core/test" + + "github.com/stretchr/testify/require" +) + +func TestCplReplacementCache(t *testing.T) { + t.Parallel() + + maxQSize := 2 + local := ConvertPeerID(test.RandPeerIDFatal(t)) + c := newCplReplacementCache(local, maxQSize) + + // pop an empty queue fails + p, b := c.pop(1) + require.Empty(t, p) + require.False(t, b) + + // push two elements to an empty queue works + testPeer1 := genPeer(t, local, 1) + testPeer2 := genPeer(t, local, 1) + + // pushing first peer works + require.True(t, c.push(testPeer1)) + // pushing a duplicate fails + require.False(t, c.push(testPeer1)) + // pushing another peers works + require.True(t, c.push(testPeer2)) + + // popping the above pushes works + p, b = c.pop(1) + require.True(t, b) + require.Equal(t, testPeer1, p) + p, b = c.pop(1) + require.True(t, b) + require.Equal(t, testPeer2, p) + + // pushing & popping again works + require.True(t, c.push(testPeer1)) + require.True(t, c.push(testPeer2)) + p, b = c.pop(1) + require.True(t, b) + require.Equal(t, testPeer1, p) + p, b = c.pop(1) + require.True(t, b) + require.Equal(t, testPeer2, p) + + // fill up a queue + p1 := genPeer(t, local, 2) + p2 := genPeer(t, local, 2) + require.True(t, c.push(p1)) + require.True(t, c.push(p2)) + + // push should not work on a full queue + p3 := genPeer(t, local, 2) + require.False(t, c.push(p3)) + + // remove a peer & verify it's been removed + p4 := genPeer(t, local, 0) + require.True(t, c.push(p4)) + + c.Lock() + require.Len(t, c.candidates[0], 1) + require.True(t, c.candidates[0][0] == p4) + c.Unlock() + + require.True(t, c.remove(p4)) + + c.Lock() + require.Len(t, c.candidates[0], 0) + c.Unlock() +} + +func genPeer(t *testing.T, local ID, cpl int) peer.ID { + var p peer.ID + for { + p = test.RandPeerIDFatal(t) + if CommonPrefixLen(local, ConvertPeerID(p)) == cpl { + return p + } + } +} diff --git a/go.mod b/go.mod index 980d86b..1af884a 100644 --- a/go.mod +++ b/go.mod @@ -3,11 +3,13 @@ module github.com/libp2p/go-libp2p-kbucket require ( github.com/ipfs/go-ipfs-util v0.0.1 github.com/ipfs/go-log v0.0.1 + github.com/jbenet/goprocess v0.1.3 github.com/libp2p/go-libp2p-core v0.3.0 github.com/libp2p/go-libp2p-peerstore v0.1.4 github.com/minio/sha256-simd v0.1.1 github.com/multiformats/go-multihash v0.0.13 github.com/stretchr/testify v1.4.0 + github.com/wangjia184/sortedset v0.0.0-20160527075905-f5d03557ba30 ) go 1.13 diff --git a/go.sum b/go.sum index 46342c1..593300d 100644 --- a/go.sum +++ b/go.sum @@ -174,6 +174,8 @@ github.com/onsi/ginkgo v1.7.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+W github.com/onsi/gomega v1.4.3/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY= 
github.com/opentracing/opentracing-go v1.0.2 h1:3jA2P6O1F9UOrWVpwrIo17pu01KWvNWg4X946/Y5Zwg= github.com/opentracing/opentracing-go v1.0.2/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o= +github.com/patrickmn/go-cache v2.1.0+incompatible h1:HRMgzkcYKYpi3C8ajMPV8OFXaaRUnok+kx1WdO15EQc= +github.com/patrickmn/go-cache v2.1.0+incompatible/go.mod h1:3Qf8kWWT7OJRJbdiICTKqZju1ZixQ/KpMGzzAfe6+WQ= github.com/pelletier/go-toml v1.2.0/go.mod h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/94hg7ilaic= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= @@ -204,6 +206,8 @@ github.com/stretchr/testify v1.4.0 h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJy github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= github.com/syndtr/goleveldb v1.0.0/go.mod h1:ZVVdQEZoIme9iO1Ch2Jdy24qqXrMMOU6lpPAyBWyWuQ= github.com/ugorji/go/codec v0.0.0-20181204163529-d75b2dcb6bc8/go.mod h1:VFNgLljTbGfSG7qAOspJ7OScBnGdDN/yBr0sguwnwf0= +github.com/wangjia184/sortedset v0.0.0-20160527075905-f5d03557ba30 h1:kZiWylALnUy4kzoKJemjH8eqwCl3RjW1r1ITCjjW7G8= +github.com/wangjia184/sortedset v0.0.0-20160527075905-f5d03557ba30/go.mod h1:YkocrP2K2tcw938x9gCOmT5G5eCD6jsTz0SZuyAqwIE= github.com/whyrusleeping/go-keyspace v0.0.0-20160322163242-5b898ac5add1/go.mod h1:8UvriyWtv5Q5EOgjHaSseUEdkQfvwFv1I/In/O2M9gc= github.com/whyrusleeping/go-logging v0.0.0-20170515211332-0457bb6b88fc h1:9lDbC6Rz4bwmou+oE6Dt4Cb2BGMur5eR/GYptkKUVHo= github.com/whyrusleeping/go-logging v0.0.0-20170515211332-0457bb6b88fc/go.mod h1:bopw91TMyo8J3tvftk8xmU2kPmlrt4nScJQZU2hE5EM= diff --git a/options.go b/options.go new file mode 100644 index 0000000..f2bbeaf --- /dev/null +++ b/options.go @@ -0,0 +1,82 @@ +package kbucket + +import ( + "fmt" + "time" +) + +// option is the Routing Table functional option type. +type option func(*options) error + +// options is a structure containing all the functional options that can be used when constructing a Routing Table. +type options struct { + tableCleanup struct { + peerValidationFnc PeerValidationFunc + peersForValidationFnc PeerSelectionFunc + peerValidationTimeout time.Duration + interval time.Duration + } +} + +// Apply applies the given options to this option. +func (o *options) Apply(opts ...option) error { + for i, opt := range opts { + if err := opt(o); err != nil { + return fmt.Errorf("routing table option %d failed: %s", i, err) + } + } + return nil +} + +// PeerValidationFnc configures the Peer Validation function used for RT cleanup. +// Not configuring this disables Routing Table cleanup. +func PeerValidationFnc(f PeerValidationFunc) option { + return func(o *options) error { + o.tableCleanup.peerValidationFnc = f + return nil + } +} + +// PeersForValidationFnc configures the function that will be used to select the peers that need to be validated during cleanup. +func PeersForValidationFnc(f PeerSelectionFunc) option { + return func(o *options) error { + o.tableCleanup.peersForValidationFnc = f + return nil + } +} + +// TableCleanupInterval configures the interval between two runs of the Routing Table cleanup routine. +func TableCleanupInterval(i time.Duration) option { + return func(o *options) error { + o.tableCleanup.interval = i + return nil + } +} + +// PeerValidationTimeout sets the timeout for a single peer validation during cleanup. 
+func PeerValidationTimeout(timeout time.Duration) option { + return func(o *options) error { + o.tableCleanup.peerValidationTimeout = timeout + return nil + } +} + +// Defaults are the default options. This option will be automatically +// prepended to any options you pass to the Routing Table constructor. +var Defaults = func(o *options) error { + o.tableCleanup.peerValidationTimeout = 30 * time.Second + o.tableCleanup.interval = 2 * time.Minute + + // default selector function selects all peers that are in missing state. + o.tableCleanup.peersForValidationFnc = func(peers []PeerInfo) []PeerInfo { + var selectedPeers []PeerInfo + for _, p := range peers { + if p.State == PeerStateMissing { + selectedPeers = append(selectedPeers, p) + } + } + return selectedPeers + } + + return nil +} diff --git a/sorting.go b/sorting.go index 3b67072..10aaf11 100644 --- a/sorting.go +++ b/sorting.go @@ -36,7 +36,7 @@ func (pds *peerDistanceSorter) appendPeer(p peer.ID) { // Append the peer.ID values in the list to the sorter's slice. It may no longer be sorted. func (pds *peerDistanceSorter) appendPeersFromList(l *list.List) { for e := l.Front(); e != nil; e = e.Next() { - pds.appendPeer(e.Value.(peer.ID)) + pds.appendPeer(e.Value.(*PeerInfo).Id) } } diff --git a/table.go b/table.go index 0b4455e..0e1cfa9 100644 --- a/table.go +++ b/table.go @@ -2,6 +2,7 @@ package kbucket import ( + "context" "encoding/binary" "errors" "fmt" @@ -11,9 +12,11 @@ import ( "github.com/libp2p/go-libp2p-core/peer" "github.com/libp2p/go-libp2p-core/peerstore" - mh "github.com/multiformats/go-multihash" logging "github.com/ipfs/go-log" + "github.com/jbenet/goprocess" + goprocessctx "github.com/jbenet/goprocess/context" + mh "github.com/multiformats/go-multihash" ) var log = logging.Logger("table") @@ -21,6 +24,13 @@ var log = logging.Logger("table") var ErrPeerRejectedHighLatency = errors.New("peer rejected; latency too high") var ErrPeerRejectedNoCapacity = errors.New("peer rejected; insufficient capacity") +// PeerSelectionFunc is the signature of a function that selects zero or more peers from the given peers +// based on some criteria. +type PeerSelectionFunc func(peers []PeerInfo) []PeerInfo + +// PeerValidationFunc is the signature of a function that determines the validity a peer for Routing Table membership. +type PeerValidationFunc func(ctx context.Context, p peer.ID) bool + // maxCplForRefresh is the maximum cpl we support for refresh. // This limit exists because we can only generate 'maxCplForRefresh' bit prefixes for now. const maxCplForRefresh uint = 15 @@ -34,6 +44,7 @@ type CplRefresh struct { // RoutingTable defines the routing table. type RoutingTable struct { + ctx context.Context // ID of the local peer local ID @@ -47,31 +58,142 @@ type RoutingTable struct { maxLatency time.Duration // kBuckets define all the fingers to other nodes. 
- Buckets []*Bucket + buckets []*bucket bucketsize int cplRefreshLk sync.RWMutex cplRefreshedAt map[uint]time.Time + // replacement candidates for a Cpl + cplReplacementCache *cplReplacementCache + // notification functions PeerRemoved func(peer.ID) PeerAdded func(peer.ID) + + // function to determine the validity of a peer for RT membership + peerValidationFnc PeerValidationFunc + + // timeout for a single call to the peer validation function + peerValidationTimeout time.Duration + // interval between two runs of the table cleanup routine + tableCleanupInterval time.Duration + // function to select peers that need to be validated + peersForValidationFnc PeerSelectionFunc + + proc goprocess.Process } // NewRoutingTable creates a new routing table with a given bucketsize, local ID, and latency tolerance. -func NewRoutingTable(bucketsize int, localID ID, latency time.Duration, m peerstore.Metrics) *RoutingTable { +// Passing a nil PeerValidationFunc disables periodic table cleanup. +func NewRoutingTable(bucketsize int, localID ID, latency time.Duration, m peerstore.Metrics, + opts ...option) (*RoutingTable, error) { + + var cfg options + if err := cfg.Apply(append([]option{Defaults}, opts...)...); err != nil { + return nil, err + } + rt := &RoutingTable{ - Buckets: []*Bucket{newBucket()}, - bucketsize: bucketsize, - local: localID, - maxLatency: latency, - metrics: m, + ctx: context.Background(), + buckets: []*bucket{newBucket()}, + bucketsize: bucketsize, + local: localID, + + maxLatency: latency, + metrics: m, + cplRefreshedAt: make(map[uint]time.Time), - PeerRemoved: func(peer.ID) {}, - PeerAdded: func(peer.ID) {}, + + PeerRemoved: func(peer.ID) {}, + PeerAdded: func(peer.ID) {}, + + peerValidationFnc: cfg.tableCleanup.peerValidationFnc, + peersForValidationFnc: cfg.tableCleanup.peersForValidationFnc, + peerValidationTimeout: cfg.tableCleanup.peerValidationTimeout, + tableCleanupInterval: cfg.tableCleanup.interval, + } + + rt.cplReplacementCache = newCplReplacementCache(rt.local, rt.bucketsize) + rt.proc = goprocessctx.WithContext(rt.ctx) + + // schedule periodic RT cleanup if peer validation function has been passed + if rt.peerValidationFnc != nil { + rt.proc.Go(rt.cleanup) + } + + return rt, nil +} + +// Close shuts down the Routing Table & all associated processes. +// It is safe to call this multiple times. +func (rt *RoutingTable) Close() error { + return rt.proc.Close() +} + +func (rt *RoutingTable) cleanup(proc goprocess.Process) { + validatePeerF := func(p peer.ID) bool { + queryCtx, cancel := context.WithTimeout(rt.ctx, rt.peerValidationTimeout) + defer cancel() + return rt.peerValidationFnc(queryCtx, p) } - return rt + cleanupTickr := time.NewTicker(rt.tableCleanupInterval) + defer cleanupTickr.Stop() + for { + select { + case <-cleanupTickr.C: + ps := rt.peersToValidate() + for _, pinfo := range ps { + // continue if we are able to successfully validate the peer + // it will be marked alive in the RT when the DHT connection notification handler calls RT.HandlePeerAlive() + // TODO Should we revisit this ? 
It makes more sense for the RT to mark it as active here + if validatePeerF(pinfo.Id) { + log.Infof("successfully validated missing peer=%s", pinfo.Id) + continue + } + + // peer does not seem to be alive, let's try candidates now + log.Infof("failed to validate missing peer=%s, will try candidates now...", pinfo.Id) + // evict missing peer + rt.HandlePeerDead(pinfo.Id) + + // keep trying replacement candidates for the missing peer till we get a successful validation or + // we run out of candidates + cpl := uint(CommonPrefixLen(ConvertPeerID(pinfo.Id), rt.local)) + c, notEmpty := rt.cplReplacementCache.pop(cpl) + for notEmpty { + if validatePeerF(c) { + log.Infof("successfully validated candidate=%s for missing peer=%s", c, pinfo.Id) + break + } + log.Infof("failed to validate candidate=%s", c) + // remove candidate + rt.HandlePeerDead(c) + + c, notEmpty = rt.cplReplacementCache.pop(cpl) + } + + if !notEmpty { + log.Infof("failed to replace missing peer=%s as all candidates were invalid", pinfo.Id) + } + } + case <-proc.Closing(): + return + } + } +} + +// returns the peers that need to be validated. +func (rt *RoutingTable) peersToValidate() []PeerInfo { + rt.tabLock.RLock() + defer rt.tabLock.RUnlock() + + var peers []PeerInfo + for _, b := range rt.buckets { + peers = append(peers, b.peers()...) + } + return rt.peersForValidationFnc(peers) } // GetTrackedCplsForRefresh returns the Cpl's we are tracking for refresh. @@ -128,24 +250,33 @@ func (rt *RoutingTable) ResetCplRefreshedAtForID(id ID, newTime time.Time) { rt.cplRefreshedAt[uint(cpl)] = newTime } -// Update adds or moves the given peer to the front of its respective bucket -func (rt *RoutingTable) Update(p peer.ID) (evicted peer.ID, err error) { - peerID := ConvertPeerID(p) - cpl := CommonPrefixLen(peerID, rt.local) - +// HandlePeerDisconnect should be called when the caller detects a disconnection with the peer. +// This enables the Routing Table to mark the peer as missing. +func (rt *RoutingTable) HandlePeerDisconnect(p peer.ID) { rt.tabLock.Lock() defer rt.tabLock.Unlock() - bucketID := cpl - if bucketID >= len(rt.Buckets) { - bucketID = len(rt.Buckets) - 1 + + bucketId := rt.bucketIdForPeer(p) + // mark the peer as missing + b := rt.buckets[bucketId] + if peer := b.getPeer(p); peer != nil { + peer.State = PeerStateMissing } +} + +// HandlePeerAlive should be called when the caller detects that a peer is alive. +// This could be a successful incoming/outgoing connection with the peer or even a successful message delivery to/from the peer. +// This enables the RT to update its internal state to mark the peer as active. +func (rt *RoutingTable) HandlePeerAlive(p peer.ID) (evicted peer.ID, err error) { + rt.tabLock.Lock() + defer rt.tabLock.Unlock() + + bucketID := rt.bucketIdForPeer(p) + bucket := rt.buckets[bucketID] + if peer := bucket.getPeer(p); peer != nil { + // mark the peer as active + peer.State = PeerStateActive - bucket := rt.Buckets[bucketID] - if bucket.Has(p) { - // If the peer is already in the table, move it to the front. - // This signifies that it it "more active" and the less active nodes - // Will as a result tend towards the back of the list - bucket.MoveToFront(p) return "", nil } @@ -155,49 +286,44 @@ func (rt *RoutingTable) Update(p peer.ID) (evicted peer.ID, err error) { } // We have enough space in the bucket (whether spawned or grouped).
- if bucket.Len() < rt.bucketsize { - bucket.PushFront(p) + if bucket.len() < rt.bucketsize { + bucket.pushFront(&PeerInfo{p, PeerStateActive}) rt.PeerAdded(p) return "", nil } - if bucketID == len(rt.Buckets)-1 { + if bucketID == len(rt.buckets)-1 { // if the bucket is too large and this is the last bucket (i.e. wildcard), unfold it. rt.nextBucket() // the structure of the table has changed, so let's recheck if the peer now has a dedicated bucket. - bucketID = cpl - if bucketID >= len(rt.Buckets) { - bucketID = len(rt.Buckets) - 1 - } - bucket = rt.Buckets[bucketID] - if bucket.Len() >= rt.bucketsize { - // if after all the unfolding, we're unable to find room for this peer, scrap it. - return "", ErrPeerRejectedNoCapacity + bucketID = rt.bucketIdForPeer(p) + bucket = rt.buckets[bucketID] + + // push the peer only if the bucket isn't overflowing after splitting + if bucket.len() < rt.bucketsize { + bucket.pushFront(&PeerInfo{p, PeerStateActive}) + rt.PeerAdded(p) + return "", nil } - bucket.PushFront(p) - rt.PeerAdded(p) - return "", nil } + // try to push it as a candidate in the replacement cache + rt.cplReplacementCache.push(p) return "", ErrPeerRejectedNoCapacity } -// Remove deletes a peer from the routing table. This is to be used -// when we are sure a node has disconnected completely. -func (rt *RoutingTable) Remove(p peer.ID) { - peerID := ConvertPeerID(p) - cpl := CommonPrefixLen(peerID, rt.local) +// HandlePeerDead should be called when the caller is sure that a peer is dead/not dialable. +// It evicts the peer from the Routing Table and also removes it as a replacement candidate if it is one. +func (rt *RoutingTable) HandlePeerDead(p peer.ID) { + // remove it as a candidate + rt.cplReplacementCache.remove(p) + // remove it from the RT rt.tabLock.Lock() defer rt.tabLock.Unlock() - - bucketID := cpl - if bucketID >= len(rt.Buckets) { - bucketID = len(rt.Buckets) - 1 - } - - bucket := rt.Buckets[bucketID] - if bucket.Remove(p) { + bucketID := rt.bucketIdForPeer(p) + bucket := rt.buckets[bucketID] + if bucket.remove(p) { rt.PeerRemoved(p) } } @@ -206,12 +332,12 @@ func (rt *RoutingTable) nextBucket() { // This is the last bucket, which allegedly is a mixed bag containing peers not belonging in dedicated (unfolded) buckets. // _allegedly_ is used here to denote that *all* peers in the last bucket might feasibly belong to another bucket. // This could happen if e.g. we've unfolded 4 buckets, and all peers in folded bucket 5 really belong in bucket 8. - bucket := rt.Buckets[len(rt.Buckets)-1] - newBucket := bucket.Split(len(rt.Buckets)-1, rt.local) - rt.Buckets = append(rt.Buckets, newBucket) + bucket := rt.buckets[len(rt.buckets)-1] + newBucket := bucket.split(len(rt.buckets)-1, rt.local) + rt.buckets = append(rt.buckets, newBucket) // The newly formed bucket still contains too many peers. We probably just unfolded a empty bucket. - if newBucket.Len() >= rt.bucketsize { + if newBucket.len() >= rt.bucketsize { // Keep unfolding the table until the last bucket is not overflowing. rt.nextBucket() } @@ -249,8 +375,8 @@ func (rt *RoutingTable) NearestPeers(id ID, count int) []peer.ID { rt.tabLock.RLock() // Get bucket index or last bucket - if cpl >= len(rt.Buckets) { - cpl = len(rt.Buckets) - 1 + if cpl >= len(rt.buckets) { + cpl = len(rt.buckets) - 1 } pds := peerDistanceSorter{ peers: make([]peerDistance, 0, count), target: id, } // Add peers from the target bucket (cpl+1 shared bits).
- pds.appendPeersFromList(rt.Buckets[cpl].list) + pds.appendPeersFromList(rt.buckets[cpl].list) // If we're short, add peers from buckets to the right until we have // enough. All buckets to the right share exactly cpl bits (as opposed @@ -271,8 +397,8 @@ func (rt *RoutingTable) NearestPeers(id ID, count int) []peer.ID { // // However, we're going to do that anyways as it's "good enough" - for i := cpl + 1; i < len(rt.Buckets) && pds.Len() < count; i++ { - pds.appendPeersFromList(rt.Buckets[i].list) + for i := cpl + 1; i < len(rt.buckets) && pds.Len() < count; i++ { + pds.appendPeersFromList(rt.buckets[i].list) } // If we're still short, add in buckets that share _fewer_ bits. We can @@ -283,7 +409,7 @@ func (rt *RoutingTable) NearestPeers(id ID, count int) []peer.ID { // * bucket cpl-2: cpl-2 shared bits. // ... for i := cpl - 1; i >= 0 && pds.Len() < count; i-- { - pds.appendPeersFromList(rt.Buckets[i].list) + pds.appendPeersFromList(rt.buckets[i].list) } rt.tabLock.RUnlock() @@ -306,8 +432,8 @@ func (rt *RoutingTable) NearestPeers(id ID, count int) []peer.ID { func (rt *RoutingTable) Size() int { var tot int rt.tabLock.RLock() - for _, buck := range rt.Buckets { - tot += buck.Len() + for _, buck := range rt.buckets { + tot += buck.len() } rt.tabLock.RUnlock() return tot @@ -317,8 +443,8 @@ func (rt *RoutingTable) Size() int { func (rt *RoutingTable) ListPeers() []peer.ID { var peers []peer.ID rt.tabLock.RLock() - for _, buck := range rt.Buckets { - peers = append(peers, buck.Peers()...) + for _, buck := range rt.buckets { + peers = append(peers, buck.peerIds()...) } rt.tabLock.RUnlock() return peers @@ -329,15 +455,24 @@ func (rt *RoutingTable) Print() { fmt.Printf("Routing Table, bs = %d, Max latency = %d\n", rt.bucketsize, rt.maxLatency) rt.tabLock.RLock() - for i, b := range rt.Buckets { + for i, b := range rt.buckets { fmt.Printf("\tbucket: %d\n", i) - b.lk.RLock() for e := b.list.Front(); e != nil; e = e.Next() { p := e.Value.(peer.ID) fmt.Printf("\t\t- %s %s\n", p.Pretty(), rt.metrics.LatencyEWMA(p).String()) } - b.lk.RUnlock() } rt.tabLock.RUnlock() } + +// the caller is responsible for the locking +func (rt *RoutingTable) bucketIdForPeer(p peer.ID) int { + peerID := ConvertPeerID(p) + cpl := CommonPrefixLen(peerID, rt.local) + bucketID := cpl + if bucketID >= len(rt.buckets) { + bucketID = len(rt.buckets) - 1 + } + return bucketID +} diff --git a/table_test.go b/table_test.go index ce83aff..c72d289 100644 --- a/table_test.go +++ b/table_test.go @@ -1,16 +1,23 @@ package kbucket import ( + "context" "math/rand" + "sync" "testing" "time" "github.com/libp2p/go-libp2p-core/peer" "github.com/libp2p/go-libp2p-core/test" + pstore "github.com/libp2p/go-libp2p-peerstore" "github.com/stretchr/testify/require" ) +var PeerAlwaysValidFnc = func(ctx context.Context, p peer.ID) bool { + return true +} + // Test basic features of the bucket struct func TestBucket(t *testing.T) { t.Parallel() @@ -20,33 +27,43 @@ func TestBucket(t *testing.T) { peers := make([]peer.ID, 100) for i := 0; i < 100; i++ { peers[i] = test.RandPeerIDFatal(t) - b.PushFront(peers[i]) + b.pushFront(&PeerInfo{peers[i], PeerStateActive}) } local := test.RandPeerIDFatal(t) localID := ConvertPeerID(local) - i := rand.Intn(len(peers)) - if !b.Has(peers[i]) { - t.Errorf("Failed to find peer: %v", peers[i]) - } + infos := b.peers() + require.Len(t, infos, 100) - spl := b.Split(0, ConvertPeerID(local)) + i := rand.Intn(len(peers)) + p := b.getPeer(peers[i]) + require.NotNil(t, p) + require.Equal(t, peers[i], p.Id) + 
require.Equal(t, PeerStateActive, p.State) + + // mark as missing + p.State = PeerStateMissing + p = b.getPeer(peers[i]) + require.NotNil(t, p) + require.Equal(t, PeerStateMissing, p.State) + + spl := b.split(0, ConvertPeerID(local)) llist := b.list for e := llist.Front(); e != nil; e = e.Next() { - p := ConvertPeerID(e.Value.(peer.ID)) + p := ConvertPeerID(e.Value.(*PeerInfo).Id) cpl := CommonPrefixLen(p, localID) if cpl > 0 { - t.Fatalf("Split failed. found id with cpl > 0 in 0 bucket") + t.Fatalf("split failed. found id with cpl > 0 in 0 bucket") } } rlist := spl.list for e := rlist.Front(); e != nil; e = e.Next() { - p := ConvertPeerID(e.Value.(peer.ID)) + p := ConvertPeerID(e.Value.(*PeerInfo).Id) cpl := CommonPrefixLen(p, localID) if cpl == 0 { - t.Fatalf("Split failed. found id with cpl == 0 in non 0 bucket") + t.Fatalf("split failed. found id with cpl == 0 in non 0 bucket") } } } @@ -56,7 +73,8 @@ func TestGenRandPeerID(t *testing.T) { local := test.RandPeerIDFatal(t) m := pstore.NewMetrics() - rt := NewRoutingTable(1, ConvertPeerID(local), time.Hour, m) + rt, err := NewRoutingTable(1, ConvertPeerID(local), time.Hour, m, PeerValidationFnc(PeerAlwaysValidFnc)) + require.NoError(t, err) // generate above maxCplForRefresh fails p, err := rt.GenRandPeerID(maxCplForRefresh + 1) @@ -74,12 +92,12 @@ func TestGenRandPeerID(t *testing.T) { func TestRefreshAndGetTrackedCpls(t *testing.T) { t.Parallel() - local := test.RandPeerIDFatal(t) m := pstore.NewMetrics() - rt := NewRoutingTable(1, ConvertPeerID(local), time.Hour, m) + rt, err := NewRoutingTable(1, ConvertPeerID(local), time.Hour, m, PeerValidationFnc(PeerAlwaysValidFnc)) + require.NoError(t, err) - // add cpl's for tracking + // push cpl's for tracking for cpl := uint(0); cpl < maxCplForRefresh; cpl++ { peerID, err := rt.GenRandPeerID(cpl) require.NoError(t, err) @@ -100,12 +118,50 @@ func TestRefreshAndGetTrackedCpls(t *testing.T) { } } +func TestHandlePeerDead(t *testing.T) { + t.Parallel() + + local := test.RandPeerIDFatal(t) + m := pstore.NewMetrics() + rt, err := NewRoutingTable(2, ConvertPeerID(local), time.Hour, m, PeerValidationFnc(PeerAlwaysValidFnc)) + require.NoError(t, err) + + // push 3 peers -> 2 for the first bucket, and 1 as candidates + var peers []peer.ID + for i := 0; i < 3; i++ { + p, err := rt.GenRandPeerID(uint(0)) + require.NoError(t, err) + require.NotEmpty(t, p) + rt.HandlePeerAlive(p) + peers = append(peers, p) + } + + // ensure we have 1 candidate + rt.cplReplacementCache.Lock() + require.NotNil(t, rt.cplReplacementCache.candidates[uint(0)]) + require.True(t, len(rt.cplReplacementCache.candidates[uint(0)]) == 1) + rt.cplReplacementCache.Unlock() + + // mark a peer as dead and ensure it's not in the RT + require.NotEmpty(t, rt.Find(peers[0])) + rt.HandlePeerDead(peers[0]) + require.Empty(t, rt.Find(peers[0])) + + // mark the peer as dead & verify we don't get it as a candidate + rt.HandlePeerDead(peers[2]) + + rt.cplReplacementCache.Lock() + require.Nil(t, rt.cplReplacementCache.candidates[uint(0)]) + rt.cplReplacementCache.Unlock() +} + func TestTableCallbacks(t *testing.T) { t.Parallel() local := test.RandPeerIDFatal(t) m := pstore.NewMetrics() - rt := NewRoutingTable(10, ConvertPeerID(local), time.Hour, m) + rt, err := NewRoutingTable(10, ConvertPeerID(local), time.Hour, m, PeerValidationFnc(PeerAlwaysValidFnc)) + require.NoError(t, err) peers := make([]peer.ID, 100) for i := 0; i < 100; i++ { @@ -120,18 +176,18 @@ func TestTableCallbacks(t *testing.T) { delete(pset, p) } - rt.Update(peers[0]) + 
rt.HandlePeerAlive(peers[0]) if _, ok := pset[peers[0]]; !ok { t.Fatal("should have this peer") } - rt.Remove(peers[0]) + rt.HandlePeerDead(peers[0]) if _, ok := pset[peers[0]]; ok { t.Fatal("should not have this peer") } for _, p := range peers { - rt.Update(p) + rt.HandlePeerAlive(p) } out := rt.ListPeers() @@ -147,22 +203,52 @@ func TestTableCallbacks(t *testing.T) { } } +func TestHandlePeerDisconnect(t *testing.T) { + t.Parallel() + + local := test.RandPeerIDFatal(t) + m := pstore.NewMetrics() + rt, err := NewRoutingTable(10, ConvertPeerID(local), time.Hour, m, PeerValidationFnc(PeerAlwaysValidFnc)) + require.NoError(t, err) + + p := test.RandPeerIDFatal(t) + // mark a peer as alive + rt.HandlePeerAlive(p) + + // verify it's active + rt.tabLock.Lock() + bp := rt.buckets[0].getPeer(p) + require.NotNil(t, bp) + require.NotNil(t, bp) + require.Equal(t, PeerStateActive, bp.State) + rt.tabLock.Unlock() + + //now mark it as disconnected & verify it's in missing state + rt.HandlePeerDisconnect(p) + rt.tabLock.Lock() + bp = rt.buckets[0].getPeer(p) + require.NotNil(t, bp) + require.Equal(t, PeerStateMissing, bp.State) + rt.tabLock.Unlock() +} + // Right now, this just makes sure that it doesnt hang or crash -func TestTableUpdate(t *testing.T) { +func TestHandlePeerAlive(t *testing.T) { t.Parallel() local := test.RandPeerIDFatal(t) m := pstore.NewMetrics() - rt := NewRoutingTable(10, ConvertPeerID(local), time.Hour, m) + rt, err := NewRoutingTable(10, ConvertPeerID(local), time.Hour, m, PeerValidationFnc(PeerAlwaysValidFnc)) + require.NoError(t, err) peers := make([]peer.ID, 100) for i := 0; i < 100; i++ { peers[i] = test.RandPeerIDFatal(t) } - // Testing Update + // Testing HandlePeerAlive for i := 0; i < 10000; i++ { - rt.Update(peers[rand.Intn(len(peers))]) + rt.HandlePeerAlive(peers[rand.Intn(len(peers))]) } for i := 0; i < 100; i++ { @@ -179,12 +265,13 @@ func TestTableFind(t *testing.T) { local := test.RandPeerIDFatal(t) m := pstore.NewMetrics() - rt := NewRoutingTable(10, ConvertPeerID(local), time.Hour, m) + rt, err := NewRoutingTable(10, ConvertPeerID(local), time.Hour, m, PeerValidationFnc(PeerAlwaysValidFnc)) + require.NoError(t, err) peers := make([]peer.ID, 100) for i := 0; i < 5; i++ { peers[i] = test.RandPeerIDFatal(t) - rt.Update(peers[i]) + rt.HandlePeerAlive(peers[i]) } t.Logf("Searching for peer: '%s'", peers[2]) @@ -194,12 +281,43 @@ func TestTableFind(t *testing.T) { } } +func TestCandidateAddition(t *testing.T) { + t.Parallel() + + local := test.RandPeerIDFatal(t) + m := pstore.NewMetrics() + rt, err := NewRoutingTable(3, ConvertPeerID(local), time.Hour, m, PeerValidationFnc(PeerAlwaysValidFnc)) + require.NoError(t, err) + + // generate 6 peers for the first bucket, 3 to push to it, and 3 as candidates + var peers []peer.ID + for i := 0; i < 6; i++ { + p, err := rt.GenRandPeerID(uint(0)) + require.NoError(t, err) + require.NotEmpty(t, p) + rt.HandlePeerAlive(p) + peers = append(peers, p) + } + + // fetch & verify candidates + for _, p := range peers[3:] { + ap, b := rt.cplReplacementCache.pop(0) + require.True(t, b) + require.Equal(t, p, ap) + } + + // now pop should fail as queue should be empty + _, b := rt.cplReplacementCache.pop(0) + require.False(t, b) +} + func TestTableEldestPreferred(t *testing.T) { t.Parallel() local := test.RandPeerIDFatal(t) m := pstore.NewMetrics() - rt := NewRoutingTable(10, ConvertPeerID(local), time.Hour, m) + rt, err := NewRoutingTable(10, ConvertPeerID(local), time.Hour, m, PeerValidationFnc(PeerAlwaysValidFnc)) + require.NoError(t, err) 
// generate size + 1 peers to saturate a bucket peers := make([]peer.ID, 15) @@ -212,14 +330,14 @@ func TestTableEldestPreferred(t *testing.T) { // test 10 first peers are accepted. for _, p := range peers[:10] { - if _, err := rt.Update(p); err != nil { + if _, err := rt.HandlePeerAlive(p); err != nil { t.Errorf("expected all 10 peers to be accepted; instead got: %v", err) } } // test next 5 peers are rejected. for _, p := range peers[10:] { - if _, err := rt.Update(p); err != ErrPeerRejectedNoCapacity { + if _, err := rt.HandlePeerAlive(p); err != ErrPeerRejectedNoCapacity { t.Errorf("expected extra 5 peers to be rejected; instead got: %v", err) } } @@ -230,12 +348,13 @@ func TestTableFindMultiple(t *testing.T) { local := test.RandPeerIDFatal(t) m := pstore.NewMetrics() - rt := NewRoutingTable(20, ConvertPeerID(local), time.Hour, m) + rt, err := NewRoutingTable(20, ConvertPeerID(local), time.Hour, m, PeerValidationFnc(PeerAlwaysValidFnc)) + require.NoError(t, err) peers := make([]peer.ID, 100) for i := 0; i < 18; i++ { peers[i] = test.RandPeerIDFatal(t) - rt.Update(peers[i]) + rt.HandlePeerAlive(peers[i]) } t.Logf("Searching for peer: '%s'", peers[2]) @@ -264,12 +383,13 @@ func TestTableFindMultipleBuckets(t *testing.T) { localID := ConvertPeerID(local) m := pstore.NewMetrics() - rt := NewRoutingTable(5, localID, time.Hour, m) + rt, err := NewRoutingTable(5, localID, time.Hour, m, PeerValidationFnc(PeerAlwaysValidFnc)) + require.NoError(t, err) peers := make([]peer.ID, 100) for i := 0; i < 100; i++ { peers[i] = test.RandPeerIDFatal(t) - rt.Update(peers[i]) + rt.HandlePeerAlive(peers[i]) } targetID := ConvertPeerID(peers[2]) @@ -277,7 +397,7 @@ func TestTableFindMultipleBuckets(t *testing.T) { closest := SortClosestPeers(rt.ListPeers(), targetID) targetCpl := CommonPrefixLen(localID, targetID) - // Split the peers into closer, same, and further from the key (than us). + // split the peers into closer, same, and further from the key (than us). 
var ( closer, same, further []peer.ID ) @@ -377,7 +497,8 @@ func TestTableMultithreaded(t *testing.T) { local := peer.ID("localPeer") m := pstore.NewMetrics() - tab := NewRoutingTable(20, ConvertPeerID(local), time.Hour, m) + tab, err := NewRoutingTable(20, ConvertPeerID(local), time.Hour, m, PeerValidationFnc(PeerAlwaysValidFnc)) + require.NoError(t, err) var peers []peer.ID for i := 0; i < 500; i++ { peers = append(peers, test.RandPeerIDFatal(t)) @@ -387,7 +508,7 @@ func TestTableMultithreaded(t *testing.T) { go func() { for i := 0; i < 1000; i++ { n := rand.Intn(len(peers)) - tab.Update(peers[n]) + tab.HandlePeerAlive(peers[n]) } done <- struct{}{} }() @@ -395,7 +516,7 @@ func TestTableMultithreaded(t *testing.T) { go func() { for i := 0; i < 1000; i++ { n := rand.Intn(len(peers)) - tab.Update(peers[n]) + tab.HandlePeerAlive(peers[n]) } done <- struct{}{} }() @@ -412,11 +533,117 @@ func TestTableMultithreaded(t *testing.T) { <-done } -func BenchmarkUpdates(b *testing.B) { +func TestTableCleanup(t *testing.T) { + t.Parallel() + local := test.RandPeerIDFatal(t) + + // Generate: + // 6 peers with CPL 0 + // 6 peers with CPL 1 + cplPeerMap := make(map[int][]peer.ID) + for cpl := 0; cpl < 2; cpl++ { + i := 0 + + for { + p := test.RandPeerIDFatal(t) + if CommonPrefixLen(ConvertPeerID(local), ConvertPeerID(p)) == cpl { + cplPeerMap[cpl] = append(cplPeerMap[cpl], p) + + i++ + if i == 6 { + break + } + } + } + } + + // mock peer validation fnc that successfully validates p[1], p[3] & p[5] + var addedCandidatesLk sync.Mutex + addedCandidates := make(map[peer.ID]struct{}) + f := func(ctx context.Context, p peer.ID) bool { + cpl := CommonPrefixLen(ConvertPeerID(local), ConvertPeerID(p)) + if cplPeerMap[cpl][1] == p || cplPeerMap[cpl][3] == p || cplPeerMap[cpl][5] == p { + // 1 is already in the RT, but 3 & 5 are candidates + if cplPeerMap[cpl][3] == p || cplPeerMap[cpl][5] == p { + addedCandidatesLk.Lock() + addedCandidates[p] = struct{}{} + addedCandidatesLk.Unlock() + } + + return true + } else { + return false + } + } + + // create RT with a very short cleanup interval + rt, err := NewRoutingTable(3, ConvertPeerID(local), time.Hour, pstore.NewMetrics(), PeerValidationFnc(f), + TableCleanupInterval(100*time.Millisecond)) + require.NoError(t, err) + + // for each CPL, p[0], p[1] & p[2] got the bucket & p[3], p[4] & p[5] become candidates + for _, peers := range cplPeerMap { + for _, p := range peers { + rt.HandlePeerAlive(p) + + } + } + + // validate current state + rt.tabLock.RLock() + require.Len(t, rt.ListPeers(), 6) + ps0 := rt.buckets[0].peerIds() + require.Len(t, ps0, 3) + ps1 := rt.buckets[1].peerIds() + require.Len(t, ps1, 3) + require.Contains(t, ps0, cplPeerMap[0][0]) + require.Contains(t, ps0, cplPeerMap[0][1]) + require.Contains(t, ps0, cplPeerMap[0][2]) + require.Contains(t, ps1, cplPeerMap[1][0]) + require.Contains(t, ps1, cplPeerMap[1][1]) + require.Contains(t, ps1, cplPeerMap[1][2]) + rt.tabLock.RUnlock() + + // now disconnect peers 0 1 & 2 from both buckets so it has only 0 left after it gets validated + for _, peers := range cplPeerMap { + rt.HandlePeerDisconnect(peers[0]) + rt.HandlePeerDisconnect(peers[1]) + rt.HandlePeerDisconnect(peers[2]) + } + + // let RT cleanup complete + time.Sleep(1 * time.Second) + + // verify RT state + rt.tabLock.RLock() + require.Len(t, rt.ListPeers(), 2) + ps0 = rt.buckets[0].peerIds() + require.Len(t, ps0, 1) + ps1 = rt.buckets[1].peerIds() + require.Len(t, ps1, 1) + require.Contains(t, ps0, cplPeerMap[0][1]) + require.Contains(t, ps1, 
cplPeerMap[1][1]) + rt.tabLock.RUnlock() + + // verify candidate state + addedCandidatesLk.Lock() + require.Len(t, addedCandidates, 4) + require.Contains(t, addedCandidates, cplPeerMap[0][3]) + require.Contains(t, addedCandidates, cplPeerMap[0][5]) + require.Contains(t, addedCandidates, cplPeerMap[1][3]) + require.Contains(t, addedCandidates, cplPeerMap[1][5]) + addedCandidatesLk.Unlock() + + // close RT + require.NoError(t, rt.Close()) +} + +func BenchmarkHandlePeerAlive(b *testing.B) { b.StopTimer() local := ConvertKey("localKey") m := pstore.NewMetrics() - tab := NewRoutingTable(20, local, time.Hour, m) + tab, err := NewRoutingTable(20, local, time.Hour, m, PeerValidationFnc(PeerAlwaysValidFnc)) + require.NoError(b, err) var peers []peer.ID for i := 0; i < b.N; i++ { @@ -425,7 +652,7 @@ func BenchmarkUpdates(b *testing.B) { b.StartTimer() for i := 0; i < b.N; i++ { - tab.Update(peers[i]) + tab.HandlePeerAlive(peers[i]) } } @@ -433,12 +660,13 @@ func BenchmarkFinds(b *testing.B) { b.StopTimer() local := ConvertKey("localKey") m := pstore.NewMetrics() - tab := NewRoutingTable(20, local, time.Hour, m) + tab, err := NewRoutingTable(20, local, time.Hour, m, PeerValidationFnc(PeerAlwaysValidFnc)) + require.NoError(b, err) var peers []peer.ID for i := 0; i < b.N; i++ { peers = append(peers, test.RandPeerIDFatal(b)) - tab.Update(peers[i]) + tab.HandlePeerAlive(peers[i]) } b.StartTimer()
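
To make the API introduced by this diff concrete, here is a minimal usage sketch written in the same test style the repo already uses. It is not part of the patch: the validator accepts every peer and the interval/timeout values are arbitrary placeholders; a real caller would plug in its own dial- or query-based check.

```go
package kbucket_test // hypothetical external example, not part of the patch

import (
	"context"
	"testing"
	"time"

	"github.com/libp2p/go-libp2p-core/peer"
	"github.com/libp2p/go-libp2p-core/test"
	kbucket "github.com/libp2p/go-libp2p-kbucket"
	pstore "github.com/libp2p/go-libp2p-peerstore"
)

func TestRoutingTableUsageSketch(t *testing.T) {
	local := test.RandPeerIDFatal(t)

	// Accept every peer; passing a validator is what enables the periodic cleanup routine.
	alwaysValid := func(ctx context.Context, p peer.ID) bool { return true }

	rt, err := kbucket.NewRoutingTable(
		20,                           // bucket size (k)
		kbucket.ConvertPeerID(local), // local key in XOR space
		time.Hour,                    // max tolerated peer latency
		pstore.NewMetrics(),
		kbucket.PeerValidationFnc(alwaysValid),
		kbucket.TableCleanupInterval(10*time.Minute),  // arbitrary value for the example
		kbucket.PeerValidationTimeout(20*time.Second), // arbitrary value for the example
	)
	if err != nil {
		t.Fatal(err)
	}
	defer rt.Close()

	p := test.RandPeerIDFatal(t)

	// A successful connection or message exchange marks the peer active (adding it if there is room).
	if _, err := rt.HandlePeerAlive(p); err == kbucket.ErrPeerRejectedNoCapacity {
		t.Log("bucket full; peer kept only as a replacement candidate")
	}

	// A dropped connection merely marks the peer missing; cleanup will try to re-validate it later.
	rt.HandlePeerDisconnect(p)

	// A definitively failed dial evicts the peer and drops it from the candidate cache.
	rt.HandlePeerDead(p)
}
```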
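The patch leaves the implementation of PeerValidationFunc entirely to the caller. One plausible implementation, shown below as an assumption rather than something this diff ships, is to treat "dialable" as "valid" using a go-libp2p host. The context handed to the validator is already bounded by PeerValidationTimeout, so the dial cannot stall the cleanup loop.

```go
package dhtexample // hypothetical caller-side package

import (
	"context"

	"github.com/libp2p/go-libp2p-core/host"
	"github.com/libp2p/go-libp2p-core/peer"

	kbucket "github.com/libp2p/go-libp2p-kbucket"
)

// dialValidator treats a peer as valid iff we can (re)connect to it before ctx expires.
func dialValidator(h host.Host) kbucket.PeerValidationFunc {
	return func(ctx context.Context, p peer.ID) bool {
		return h.Connect(ctx, peer.AddrInfo{ID: p}) == nil
	}
}
```

It would be passed at construction time as `kbucket.PeerValidationFnc(dialValidator(h))`.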
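The default selector installed by Defaults validates every missing peer on each cleanup pass. If that is too aggressive for a large table, the PeersForValidationFnc option accepts any PeerSelectionFunc; the sketch below (illustrative only, using just the exported types from this diff) keeps the "missing peers first" behaviour but caps how many are checked per run.

```go
package dhtexample // hypothetical caller-side package

import (
	kbucket "github.com/libp2p/go-libp2p-kbucket"
)

// cappedMissingSelector picks peers in the missing state, but at most max per cleanup run,
// so a large table cannot trigger a burst of validation dials.
func cappedMissingSelector(max int) kbucket.PeerSelectionFunc {
	return func(peers []kbucket.PeerInfo) []kbucket.PeerInfo {
		var selected []kbucket.PeerInfo
		for _, p := range peers {
			if p.State == kbucket.PeerStateMissing {
				selected = append(selected, p)
				if len(selected) == max {
					break
				}
			}
		}
		return selected
	}
}
```

This would be wired in via `kbucket.PeersForValidationFnc(cappedMissingSelector(16))`.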
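HandlePeerAlive and HandlePeerDisconnect are meant to be driven by the caller's connection events. A minimal way a caller might wire that up (again an assumption about the consumer, not part of this patch) is a network.NotifyBundle on the host. Note the asymmetry: disconnection only marks the peer missing; eviction is left to the cleanup routine, or to an explicit HandlePeerDead once a dial has definitively failed.

```go
package dhtexample // hypothetical caller-side package

import (
	"github.com/libp2p/go-libp2p-core/host"
	"github.com/libp2p/go-libp2p-core/network"

	kbucket "github.com/libp2p/go-libp2p-kbucket"
)

// wireLivenessEvents feeds the table from the host's connection notifications.
func wireLivenessEvents(h host.Host, rt *kbucket.RoutingTable) {
	h.Network().Notify(&network.NotifyBundle{
		ConnectedF: func(_ network.Network, c network.Conn) {
			rt.HandlePeerAlive(c.RemotePeer()) // marks active / tries to add
		},
		DisconnectedF: func(_ network.Network, c network.Conn) {
			rt.HandlePeerDisconnect(c.RemotePeer()) // marks missing only
		},
	})
}
```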
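Finally, for readers unfamiliar with the bucketIdForPeer helper the patch adds, the mapping is simply the common prefix length between the peer's key and the local key, clamped to the last (wildcard) bucket. A stand-alone restatement using the package's exported helpers:

```go
package dhtexample // hypothetical caller-side package

import (
	"github.com/libp2p/go-libp2p-core/peer"

	kbucket "github.com/libp2p/go-libp2p-kbucket"
)

// bucketIndexFor restates the unexported bucketIdForPeer logic from the patch.
func bucketIndexFor(local kbucket.ID, p peer.ID, numBuckets int) int {
	cpl := kbucket.CommonPrefixLen(kbucket.ConvertPeerID(p), local)
	if cpl >= numBuckets {
		cpl = numBuckets - 1 // overflow CPLs all land in the last (wildcard) bucket
	}
	return cpl
}
```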