From ae0bcf57cd6ff8cc12082ac533861cbd1138cf56 Mon Sep 17 00:00:00 2001 From: Aarsh Shah Date: Fri, 14 Feb 2020 13:45:03 +0530 Subject: [PATCH 1/5] disassociate RT membership from connectivity --- bucket.go | 93 +++++++--- cpl_replacement_cache.go | 98 +++++++++++ cpl_replacement_cache_test.go | 82 +++++++++ go.mod | 1 + go.sum | 4 + options.go | 72 ++++++++ sorting.go | 2 +- table.go | 235 ++++++++++++++++++++------ table_test.go | 310 +++++++++++++++++++++++++++++----- 9 files changed, 781 insertions(+), 116 deletions(-) create mode 100644 cpl_replacement_cache.go create mode 100644 cpl_replacement_cache_test.go create mode 100644 options.go diff --git a/bucket.go b/bucket.go index 6a26f7b..0108799 100644 --- a/bucket.go +++ b/bucket.go @@ -9,45 +9,93 @@ import ( "github.com/libp2p/go-libp2p-core/peer" ) -// Bucket holds a list of peers. -type Bucket struct { +// PeerState is the state of the peer as seen by the Routing Table. +type PeerState int + +const ( + // PeerStateActive indicates that we know the peer is active/alive. + PeerStateActive PeerState = iota + // PeerStateMissing indicates that we do not know the state of the peer. + PeerStateMissing +) + +// PeerInfo holds all related information for a peer in the K-Bucket. +type PeerInfo struct { + Id peer.ID + State PeerState +} + +// bucket holds a list of peers. +type bucket struct { lk sync.RWMutex list *list.List } -func newBucket() *Bucket { - b := new(Bucket) +func newBucket() *bucket { + b := new(bucket) b.list = list.New() return b } -func (b *Bucket) Peers() []peer.ID { +// returns all peers in the bucket +func (b *bucket) peers() []PeerInfo { + b.lk.RLock() + defer b.lk.RUnlock() + var ps []PeerInfo + for e := b.list.Front(); e != nil; e = e.Next() { + p := e.Value.(PeerInfo) + ps = append(ps, p) + } + return ps +} + +// return the Ids of all the peers in the bucket. +func (b *bucket) peerIds() []peer.ID { b.lk.RLock() defer b.lk.RUnlock() ps := make([]peer.ID, 0, b.list.Len()) for e := b.list.Front(); e != nil; e = e.Next() { - id := e.Value.(peer.ID) - ps = append(ps, id) + p := e.Value.(PeerInfo) + ps = append(ps, p.Id) } return ps } -func (b *Bucket) Has(id peer.ID) bool { +// returns the peer with the given Id and true if peer exists +// returns false if the peerId does not exist +func (b *bucket) getPeer(p peer.ID) (PeerInfo, bool) { b.lk.RLock() defer b.lk.RUnlock() for e := b.list.Front(); e != nil; e = e.Next() { - if e.Value.(peer.ID) == id { + if e.Value.(PeerInfo).Id == p { + return e.Value.(PeerInfo), true + } + } + return PeerInfo{}, false +} + +// replaces the peer based on the Id. +// returns true if the replace was successful, false otherwise. +func (b *bucket) replace(p PeerInfo) bool { + b.lk.Lock() + defer b.lk.Unlock() + for e := b.list.Front(); e != nil; e = e.Next() { + if e.Value.(PeerInfo).Id == p.Id { + b.list.Remove(e) + b.list.PushBack(p) return true } } return false } -func (b *Bucket) Remove(id peer.ID) bool { +// removes the peer with the given Id from the bucket. +// returns true if successful, false otherwise. 
+func (b *bucket) remove(id peer.ID) bool { b.lk.Lock() defer b.lk.Unlock() for e := b.list.Front(); e != nil; e = e.Next() { - if e.Value.(peer.ID) == id { + if e.Value.(PeerInfo).Id == id { b.list.Remove(e) return true } @@ -55,40 +103,33 @@ func (b *Bucket) Remove(id peer.ID) bool { return false } -func (b *Bucket) MoveToFront(id peer.ID) { +func (b *bucket) moveToFront(id peer.ID) { b.lk.Lock() defer b.lk.Unlock() + for e := b.list.Front(); e != nil; e = e.Next() { - if e.Value.(peer.ID) == id { + if e.Value.(PeerInfo).Id == id { b.list.MoveToFront(e) } } } -func (b *Bucket) PushFront(p peer.ID) { +func (b *bucket) pushFront(p PeerInfo) { b.lk.Lock() b.list.PushFront(p) b.lk.Unlock() } -func (b *Bucket) PopBack() peer.ID { - b.lk.Lock() - defer b.lk.Unlock() - last := b.list.Back() - b.list.Remove(last) - return last.Value.(peer.ID) -} - -func (b *Bucket) Len() int { +func (b *bucket) len() int { b.lk.RLock() defer b.lk.RUnlock() return b.list.Len() } -// Split splits a buckets peers into two buckets, the methods receiver will have +// splits a buckets peers into two buckets, the methods receiver will have // peers with CPL equal to cpl, the returned bucket will have peers with CPL // greater than cpl (returned bucket has closer peers) -func (b *Bucket) Split(cpl int, target ID) *Bucket { +func (b *bucket) split(cpl int, target ID) *bucket { b.lk.Lock() defer b.lk.Unlock() @@ -97,7 +138,7 @@ func (b *Bucket) Split(cpl int, target ID) *Bucket { newbuck.list = out e := b.list.Front() for e != nil { - peerID := ConvertPeerID(e.Value.(peer.ID)) + peerID := ConvertPeerID(e.Value.(PeerInfo).Id) peerCPL := CommonPrefixLen(peerID, target) if peerCPL > cpl { cur := e diff --git a/cpl_replacement_cache.go b/cpl_replacement_cache.go new file mode 100644 index 0000000..7f17991 --- /dev/null +++ b/cpl_replacement_cache.go @@ -0,0 +1,98 @@ +package kbucket + +import ( + "sync" + + "github.com/libp2p/go-libp2p-core/peer" + + "github.com/wangjia184/sortedset" +) + +// TODO Should ideally use a Circular queue for this +// maintains a bounded, de-duplicated and FIFO peer candidate queue for each Cpl +type cplReplacementCache struct { + localPeer ID + maxQueueSize int + + sync.Mutex + candidates map[uint]*sortedset.SortedSet // candidates for a Cpl +} + +func newCplReplacementCache(localPeer ID, maxQueueSize int) *cplReplacementCache { + return &cplReplacementCache{ + localPeer: localPeer, + maxQueueSize: maxQueueSize, + candidates: make(map[uint]*sortedset.SortedSet), + } +} + +// pushes a candidate to the end of the queue for the corresponding Cpl +// returns false if the queue is full or it already has the peer +// returns true if was successfully added +func (c *cplReplacementCache) push(p peer.ID) bool { + c.Lock() + defer c.Unlock() + + // create queue if not created + cpl := uint(CommonPrefixLen(c.localPeer, ConvertPeerID(p))) + if c.candidates[cpl] == nil { + c.candidates[cpl] = sortedset.New() + } + + q := c.candidates[cpl] + + // queue is full + if (q.GetCount()) >= c.maxQueueSize { + return false + } + // queue already has the peer + if q.GetByKey(string(p)) != nil { + return false + } + + // push + q.AddOrUpdate(string(p), sortedset.SCORE(q.GetCount()+1), nil) + return true +} + +// pops a candidate from the top of the candidate queue for the given Cpl +// returns false if the queue is empty +// returns the peerId and true if successful +func (c *cplReplacementCache) pop(cpl uint) (peer.ID, bool) { + c.Lock() + c.Unlock() + + q := c.candidates[cpl] + if q != nil && q.GetCount() > 0 { + n := 
q.PopMin() + + // delete the queue if it's empty + if q.GetCount() == 0 { + delete(c.candidates, cpl) + } + + return peer.ID(n.Key()), true + } + return "", false +} + +// removes a given peer if it's present +// returns false if the peer is absent +func (c *cplReplacementCache) remove(p peer.ID) bool { + c.Lock() + defer c.Unlock() + + cpl := uint(CommonPrefixLen(c.localPeer, ConvertPeerID(p))) + q := c.candidates[cpl] + if q != nil { + q.Remove(string(p)) + + // remove the queue if it's empty + if q.GetCount() == 0 { + delete(c.candidates, cpl) + } + + return true + } + return false +} diff --git a/cpl_replacement_cache_test.go b/cpl_replacement_cache_test.go new file mode 100644 index 0000000..2d34bea --- /dev/null +++ b/cpl_replacement_cache_test.go @@ -0,0 +1,82 @@ +package kbucket + +import ( + "testing" + + "github.com/libp2p/go-libp2p-core/peer" + "github.com/libp2p/go-libp2p-core/test" + + "github.com/stretchr/testify/require" +) + +func TestCandidateQueue(t *testing.T) { + t.Parallel() + + maxQSize := 2 + local := ConvertPeerID(test.RandPeerIDFatal(t)) + c := newCplReplacementCache(local, maxQSize) + + // pop an empty queue fails + p, b := c.pop(1) + require.Empty(t, p) + require.False(t, b) + + // push two elements to an empty queue works + testPeer1 := genPeer(t, local, 1) + testPeer2 := genPeer(t, local, 1) + + // pushing first peer works + require.True(t, c.push(testPeer1)) + // pushing a duplicate fails + require.False(t, c.push(testPeer1)) + // pushing another peers works + require.True(t, c.push(testPeer2)) + + // popping the above pushes works + p, b = c.pop(1) + require.True(t, b) + require.Equal(t, testPeer1, p) + p, b = c.pop(1) + require.True(t, b) + require.Equal(t, testPeer2, p) + + // pushing & popping again works + require.True(t, c.push(testPeer1)) + require.True(t, c.push(testPeer2)) + p, b = c.pop(1) + require.True(t, b) + require.Equal(t, testPeer1, p) + p, b = c.pop(1) + require.True(t, b) + require.Equal(t, testPeer2, p) + + // fill up a queue + p1 := genPeer(t, local, 2) + p2 := genPeer(t, local, 2) + require.True(t, c.push(p1)) + require.True(t, c.push(p2)) + + // push should not work on a full queue + p3 := genPeer(t, local, 2) + require.False(t, c.push(p3)) + + // remove a peer & verify it's been removed + require.NotNil(t, c.candidates[2].GetByKey(string(p2))) + require.True(t, c.remove(p2)) + c.Lock() + require.Nil(t, c.candidates[2].GetByKey(string(p2))) + c.Unlock() + + // now push should work + require.True(t, c.push(p3)) +} + +func genPeer(t *testing.T, local ID, cpl int) peer.ID { + var p peer.ID + for { + p = test.RandPeerIDFatal(t) + if CommonPrefixLen(local, ConvertPeerID(p)) == cpl { + return p + } + } +} diff --git a/go.mod b/go.mod index 980d86b..c4125e8 100644 --- a/go.mod +++ b/go.mod @@ -8,6 +8,7 @@ require ( github.com/minio/sha256-simd v0.1.1 github.com/multiformats/go-multihash v0.0.13 github.com/stretchr/testify v1.4.0 + github.com/wangjia184/sortedset v0.0.0-20160527075905-f5d03557ba30 ) go 1.13 diff --git a/go.sum b/go.sum index 46342c1..593300d 100644 --- a/go.sum +++ b/go.sum @@ -174,6 +174,8 @@ github.com/onsi/ginkgo v1.7.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+W github.com/onsi/gomega v1.4.3/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY= github.com/opentracing/opentracing-go v1.0.2 h1:3jA2P6O1F9UOrWVpwrIo17pu01KWvNWg4X946/Y5Zwg= github.com/opentracing/opentracing-go v1.0.2/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o= +github.com/patrickmn/go-cache v2.1.0+incompatible 
h1:HRMgzkcYKYpi3C8ajMPV8OFXaaRUnok+kx1WdO15EQc= +github.com/patrickmn/go-cache v2.1.0+incompatible/go.mod h1:3Qf8kWWT7OJRJbdiICTKqZju1ZixQ/KpMGzzAfe6+WQ= github.com/pelletier/go-toml v1.2.0/go.mod h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/94hg7ilaic= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= @@ -204,6 +206,8 @@ github.com/stretchr/testify v1.4.0 h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJy github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= github.com/syndtr/goleveldb v1.0.0/go.mod h1:ZVVdQEZoIme9iO1Ch2Jdy24qqXrMMOU6lpPAyBWyWuQ= github.com/ugorji/go/codec v0.0.0-20181204163529-d75b2dcb6bc8/go.mod h1:VFNgLljTbGfSG7qAOspJ7OScBnGdDN/yBr0sguwnwf0= +github.com/wangjia184/sortedset v0.0.0-20160527075905-f5d03557ba30 h1:kZiWylALnUy4kzoKJemjH8eqwCl3RjW1r1ITCjjW7G8= +github.com/wangjia184/sortedset v0.0.0-20160527075905-f5d03557ba30/go.mod h1:YkocrP2K2tcw938x9gCOmT5G5eCD6jsTz0SZuyAqwIE= github.com/whyrusleeping/go-keyspace v0.0.0-20160322163242-5b898ac5add1/go.mod h1:8UvriyWtv5Q5EOgjHaSseUEdkQfvwFv1I/In/O2M9gc= github.com/whyrusleeping/go-logging v0.0.0-20170515211332-0457bb6b88fc h1:9lDbC6Rz4bwmou+oE6Dt4Cb2BGMur5eR/GYptkKUVHo= github.com/whyrusleeping/go-logging v0.0.0-20170515211332-0457bb6b88fc/go.mod h1:bopw91TMyo8J3tvftk8xmU2kPmlrt4nScJQZU2hE5EM= diff --git a/options.go b/options.go new file mode 100644 index 0000000..730d96c --- /dev/null +++ b/options.go @@ -0,0 +1,72 @@ +package kbucket + +import ( + "fmt" + "time" +) + +// Option is the Routing Table functional option type. +type Option func(*Options) error + +// Options is a structure containing all the functional options that can be used when constructing a Routing Table. +type Options struct { + TableCleanup struct { + PeersForValidationFnc PeerSelectionFnc + PeerValidationTimeout time.Duration + Interval time.Duration + } +} + +// Apply applies the given options to this Option. +func (o *Options) Apply(opts ...Option) error { + for i, opt := range opts { + if err := opt(o); err != nil { + return fmt.Errorf("routing table option %d failed: %s", i, err) + } + } + return nil +} + +// PeersForValidationFnc configures the function that will be used to select the peers that need to be validated during cleanup. +func PeersForValidationFnc(f PeerSelectionFnc) Option { + return func(o *Options) error { + o.TableCleanup.PeersForValidationFnc = f + return nil + } +} + +// TableCleanupInterval configures the interval between two runs of the Routing Table cleanup routine. +func TableCleanupInterval(i time.Duration) Option { + return func(o *Options) error { + o.TableCleanup.Interval = i + return nil + } +} + +// PeerValidationTimeout sets the timeout for a single peer validation during cleanup. +func PeerValidationTimeout(timeout time.Duration) Option { + return func(o *Options) error { + o.TableCleanup.PeerValidationTimeout = timeout + return nil + } +} + +// Defaults are the default options. This option will be automatically +// prepended to any options you pass to the Routing Table constructor. +var Defaults = func(o *Options) error { + o.TableCleanup.PeerValidationTimeout = 30 * time.Second + o.TableCleanup.Interval = 2 * time.Minute + + // default selector function selects all peers that are in missing state. 
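+	// (A different selector can be supplied via the PeersForValidationFnc option.)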
+ o.TableCleanup.PeersForValidationFnc = func(peers []PeerInfo) []PeerInfo { + var selectedPeers []PeerInfo + for _, p := range peers { + if p.State == PeerStateMissing { + selectedPeers = append(selectedPeers, p) + } + } + return selectedPeers + } + + return nil +} diff --git a/sorting.go b/sorting.go index 3b67072..b5c2a8c 100644 --- a/sorting.go +++ b/sorting.go @@ -36,7 +36,7 @@ func (pds *peerDistanceSorter) appendPeer(p peer.ID) { // Append the peer.ID values in the list to the sorter's slice. It may no longer be sorted. func (pds *peerDistanceSorter) appendPeersFromList(l *list.List) { for e := l.Front(); e != nil; e = e.Next() { - pds.appendPeer(e.Value.(peer.ID)) + pds.appendPeer(e.Value.(PeerInfo).Id) } } diff --git a/table.go b/table.go index 0b4455e..59b9512 100644 --- a/table.go +++ b/table.go @@ -2,6 +2,7 @@ package kbucket import ( + "context" "encoding/binary" "errors" "fmt" @@ -11,9 +12,9 @@ import ( "github.com/libp2p/go-libp2p-core/peer" "github.com/libp2p/go-libp2p-core/peerstore" - mh "github.com/multiformats/go-multihash" logging "github.com/ipfs/go-log" + mh "github.com/multiformats/go-multihash" ) var log = logging.Logger("table") @@ -21,6 +22,13 @@ var log = logging.Logger("table") var ErrPeerRejectedHighLatency = errors.New("peer rejected; latency too high") var ErrPeerRejectedNoCapacity = errors.New("peer rejected; insufficient capacity") +// PeerSelectionFnc is the signature of a function that selects zero or more peers from the given peers +// based on some criteria. +type PeerSelectionFnc func(peers []PeerInfo) []PeerInfo + +// PeerValidationFnc is the signature of a function that determines the validity a peer for Routing Table membership. +type PeerValidationFnc func(ctx context.Context, p peer.ID) bool + // maxCplForRefresh is the maximum cpl we support for refresh. // This limit exists because we can only generate 'maxCplForRefresh' bit prefixes for now. const maxCplForRefresh uint = 15 @@ -34,6 +42,8 @@ type CplRefresh struct { // RoutingTable defines the routing table. type RoutingTable struct { + ctx context.Context + // ID of the local peer local ID @@ -47,31 +57,133 @@ type RoutingTable struct { maxLatency time.Duration // kBuckets define all the fingers to other nodes. - Buckets []*Bucket + Buckets []*bucket bucketsize int cplRefreshLk sync.RWMutex cplRefreshedAt map[uint]time.Time + // replacement candidates for a Cpl + cplReplacementCache *cplReplacementCache + // notification functions PeerRemoved func(peer.ID) PeerAdded func(peer.ID) + + // function to determine the validity of a peer for RT membership + PeerValidationFnc PeerValidationFnc + + // timeout for a single call to the peer validation function + peerValidationTimeout time.Duration + // interval between two runs of the table cleanup routine + tableCleanupInterval time.Duration + // function to select peers that need to be validated + peersForValidationFnc PeerSelectionFnc } // NewRoutingTable creates a new routing table with a given bucketsize, local ID, and latency tolerance. -func NewRoutingTable(bucketsize int, localID ID, latency time.Duration, m peerstore.Metrics) *RoutingTable { +// Passing a nil PeerValidationFnc disables periodic table cleanup. 
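+//
+// A minimal usage sketch (illustrative only; ctx, local, m and validatePeer are
+// assumed to be supplied by the caller):
+//
+//	rt, err := NewRoutingTable(ctx, 20, ConvertPeerID(local), time.Hour, m,
+//		validatePeer, TableCleanupInterval(5*time.Minute))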
+func NewRoutingTable(ctx context.Context, bucketsize int, localID ID, latency time.Duration, m peerstore.Metrics, + peerValidationFnc PeerValidationFnc, options ...Option) (*RoutingTable, error) { + + var cfg Options + if err := cfg.Apply(append([]Option{Defaults}, options...)...); err != nil { + return nil, err + } + rt := &RoutingTable{ - Buckets: []*Bucket{newBucket()}, - bucketsize: bucketsize, - local: localID, - maxLatency: latency, - metrics: m, + ctx: ctx, + Buckets: []*bucket{newBucket()}, + bucketsize: bucketsize, + local: localID, + + maxLatency: latency, + metrics: m, + cplRefreshedAt: make(map[uint]time.Time), - PeerRemoved: func(peer.ID) {}, - PeerAdded: func(peer.ID) {}, + + PeerRemoved: func(peer.ID) {}, + PeerAdded: func(peer.ID) {}, + + PeerValidationFnc: peerValidationFnc, + peersForValidationFnc: cfg.TableCleanup.PeersForValidationFnc, + peerValidationTimeout: cfg.TableCleanup.PeerValidationTimeout, + tableCleanupInterval: cfg.TableCleanup.Interval, } - return rt + rt.cplReplacementCache = newCplReplacementCache(rt.local, rt.bucketsize) + + // schedule periodic RT cleanup + if peerValidationFnc != nil { + go rt.cleanup() + } + + return rt, nil +} + +func (rt *RoutingTable) cleanup() { + validatePeerF := func(p peer.ID) bool { + queryCtx, cancel := context.WithTimeout(rt.ctx, rt.peerValidationTimeout) + defer cancel() + return rt.PeerValidationFnc(queryCtx, p) + } + + cleanupTickr := time.NewTicker(rt.tableCleanupInterval) + defer cleanupTickr.Stop() + for { + select { + case <-rt.ctx.Done(): + return + case <-cleanupTickr.C: + ps := rt.peersToValidate() + for _, pinfo := range ps { + // continue if we are able to successfully validate the peer + // it will be marked alive in the RT when the DHT connection notification handler calls RT.HandlePeerAlive() + // TODO Should we revisit this ? It makes more sense for the RT to mark it as active here + if validatePeerF(pinfo.Id) { + log.Infof("successfully validated missing peer=%s", pinfo.Id) + continue + } + + // peer does not seem to be alive, let's try candidates now + log.Infof("failed to validate missing peer=%s, will try candidates now...", pinfo.Id) + // evict missing peer + rt.HandlePeerDead(pinfo.Id) + + // keep trying replacement candidates for the missing peer till we get a successful validation or + // we run out of candidates + cpl := uint(CommonPrefixLen(ConvertPeerID(pinfo.Id), rt.local)) + c, notEmpty := rt.cplReplacementCache.pop(cpl) + for notEmpty { + if validatePeerF(c) { + log.Infof("successfully validated candidate=%s for missing peer=%s", c, pinfo.Id) + break + } + log.Infof("failed to validated candidate=%s", c) + // remove candidate + rt.HandlePeerDead(c) + + c, notEmpty = rt.cplReplacementCache.pop(cpl) + } + + if !notEmpty { + log.Infof("failed to replace missing peer=%s as all candidates were invalid", pinfo.Id) + } + } + } + } +} + +// returns the peers that need to be validated. +func (rt *RoutingTable) peersToValidate() []PeerInfo { + rt.tabLock.RLock() + defer rt.tabLock.RUnlock() + + var peers []PeerInfo + for _, b := range rt.Buckets { + peers = append(peers, b.peers()...) + } + return rt.peersForValidationFnc(peers) } // GetTrackedCplsForRefresh returns the Cpl's we are tracking for refresh. 
@@ -128,24 +240,41 @@ func (rt *RoutingTable) ResetCplRefreshedAtForID(id ID, newTime time.Time) { rt.cplRefreshedAt[uint(cpl)] = newTime } -// Update adds or moves the given peer to the front of its respective bucket -func (rt *RoutingTable) Update(p peer.ID) (evicted peer.ID, err error) { - peerID := ConvertPeerID(p) - cpl := CommonPrefixLen(peerID, rt.local) - +// HandlePeerDisconnect should be called when the caller detects a disconnection with the peer. +// This enables the Routing Table to mark the peer as missing. +func (rt *RoutingTable) HandlePeerDisconnect(p peer.ID) { rt.tabLock.Lock() defer rt.tabLock.Unlock() - bucketID := cpl - if bucketID >= len(rt.Buckets) { - bucketID = len(rt.Buckets) - 1 + + // mark the peer as missing + bucketId := rt.bucketIdForPeer(p) + b := rt.Buckets[bucketId] + if peer, has := b.getPeer(p); has { + peer.State = PeerStateMissing + b.replace(peer) } +} + +// HandlePeerAlive should be called when the caller detects that a peer is alive. +// This could be a successful incoming/outgoing connection with the peer or even a successful message delivery to/from the peer. +// This enables the RT to update it's internal state to mark the peer as active. +func (rt *RoutingTable) HandlePeerAlive(p peer.ID) (evicted peer.ID, err error) { + rt.tabLock.Lock() + defer rt.tabLock.Unlock() + bucketID := rt.bucketIdForPeer(p) bucket := rt.Buckets[bucketID] - if bucket.Has(p) { + if peer, has := bucket.getPeer(p); has { + // mark the peer as active if it was missing + if peer.State == PeerStateMissing { + peer.State = PeerStateActive + bucket.replace(peer) + } + // If the peer is already in the table, move it to the front. // This signifies that it it "more active" and the less active nodes // Will as a result tend towards the back of the list - bucket.MoveToFront(p) + bucket.moveToFront(p) return "", nil } @@ -155,8 +284,8 @@ func (rt *RoutingTable) Update(p peer.ID) (evicted peer.ID, err error) { } // We have enough space in the bucket (whether spawned or grouped). - if bucket.Len() < rt.bucketsize { - bucket.PushFront(p) + if bucket.len() < rt.bucketsize { + bucket.pushFront(PeerInfo{p, PeerStateActive}) rt.PeerAdded(p) return "", nil } @@ -165,39 +294,34 @@ func (rt *RoutingTable) Update(p peer.ID) (evicted peer.ID, err error) { // if the bucket is too large and this is the last bucket (i.e. wildcard), unfold it. rt.nextBucket() // the structure of the table has changed, so let's recheck if the peer now has a dedicated bucket. - bucketID = cpl - if bucketID >= len(rt.Buckets) { - bucketID = len(rt.Buckets) - 1 - } + bucketID = rt.bucketIdForPeer(p) bucket = rt.Buckets[bucketID] - if bucket.Len() >= rt.bucketsize { - // if after all the unfolding, we're unable to find room for this peer, scrap it. - return "", ErrPeerRejectedNoCapacity + + // push the peer only if the bucket isn't overflowing after slitting + if bucket.len() < rt.bucketsize { + bucket.pushFront(PeerInfo{p, PeerStateActive}) + rt.PeerAdded(p) + return "", nil } - bucket.PushFront(p) - rt.PeerAdded(p) - return "", nil } + // try to push it as a candidate in the replacement cache + rt.cplReplacementCache.push(p) return "", ErrPeerRejectedNoCapacity } -// Remove deletes a peer from the routing table. This is to be used -// when we are sure a node has disconnected completely. -func (rt *RoutingTable) Remove(p peer.ID) { - peerID := ConvertPeerID(p) - cpl := CommonPrefixLen(peerID, rt.local) +// HandlePeerDead should be called when the caller is sure that a peer is dead/not dialable. 
+// It evicts the peer from the Routing Table and also removes it as a replacement candidate if it is one. +func (rt *RoutingTable) HandlePeerDead(p peer.ID) { + // remove it as a candidate + rt.cplReplacementCache.remove(p) + // remove it from the RT rt.tabLock.Lock() defer rt.tabLock.Unlock() - - bucketID := cpl - if bucketID >= len(rt.Buckets) { - bucketID = len(rt.Buckets) - 1 - } - + bucketID := rt.bucketIdForPeer(p) bucket := rt.Buckets[bucketID] - if bucket.Remove(p) { + if bucket.remove(p) { rt.PeerRemoved(p) } } @@ -207,11 +331,11 @@ func (rt *RoutingTable) nextBucket() { // _allegedly_ is used here to denote that *all* peers in the last bucket might feasibly belong to another bucket. // This could happen if e.g. we've unfolded 4 buckets, and all peers in folded bucket 5 really belong in bucket 8. bucket := rt.Buckets[len(rt.Buckets)-1] - newBucket := bucket.Split(len(rt.Buckets)-1, rt.local) + newBucket := bucket.split(len(rt.Buckets)-1, rt.local) rt.Buckets = append(rt.Buckets, newBucket) // The newly formed bucket still contains too many peers. We probably just unfolded a empty bucket. - if newBucket.Len() >= rt.bucketsize { + if newBucket.len() >= rt.bucketsize { // Keep unfolding the table until the last bucket is not overflowing. rt.nextBucket() } @@ -248,7 +372,7 @@ func (rt *RoutingTable) NearestPeers(id ID, count int) []peer.ID { // It's assumed that this also protects the buckets. rt.tabLock.RLock() - // Get bucket index or last bucket + // getPeer bucket index or last bucket if cpl >= len(rt.Buckets) { cpl = len(rt.Buckets) - 1 } @@ -307,7 +431,7 @@ func (rt *RoutingTable) Size() int { var tot int rt.tabLock.RLock() for _, buck := range rt.Buckets { - tot += buck.Len() + tot += buck.len() } rt.tabLock.RUnlock() return tot @@ -318,7 +442,7 @@ func (rt *RoutingTable) ListPeers() []peer.ID { var peers []peer.ID rt.tabLock.RLock() for _, buck := range rt.Buckets { - peers = append(peers, buck.Peers()...) + peers = append(peers, buck.peerIds()...) 
} rt.tabLock.RUnlock() return peers @@ -341,3 +465,14 @@ func (rt *RoutingTable) Print() { } rt.tabLock.RUnlock() } + +// the caller is responsible for the locking +func (rt *RoutingTable) bucketIdForPeer(p peer.ID) int { + peerID := ConvertPeerID(p) + cpl := CommonPrefixLen(peerID, rt.local) + bucketID := cpl + if bucketID >= len(rt.Buckets) { + bucketID = len(rt.Buckets) - 1 + } + return bucketID +} diff --git a/table_test.go b/table_test.go index ce83aff..e02219c 100644 --- a/table_test.go +++ b/table_test.go @@ -1,16 +1,22 @@ package kbucket import ( + "context" "math/rand" "testing" "time" "github.com/libp2p/go-libp2p-core/peer" "github.com/libp2p/go-libp2p-core/test" + pstore "github.com/libp2p/go-libp2p-peerstore" "github.com/stretchr/testify/require" ) +var PeerAlwaysValidFnc = func(ctx context.Context, p peer.ID) bool { + return true +} + // Test basic features of the bucket struct func TestBucket(t *testing.T) { t.Parallel() @@ -20,43 +26,55 @@ func TestBucket(t *testing.T) { peers := make([]peer.ID, 100) for i := 0; i < 100; i++ { peers[i] = test.RandPeerIDFatal(t) - b.PushFront(peers[i]) + b.pushFront(PeerInfo{peers[i], PeerStateActive}) } local := test.RandPeerIDFatal(t) localID := ConvertPeerID(local) - i := rand.Intn(len(peers)) - if !b.Has(peers[i]) { - t.Errorf("Failed to find peer: %v", peers[i]) - } + infos := b.peers() + require.Len(t, infos, 100) - spl := b.Split(0, ConvertPeerID(local)) + i := rand.Intn(len(peers)) + p, has := b.getPeer(peers[i]) + require.True(t, has) + require.Equal(t, peers[i], p.Id) + require.Equal(t, PeerStateActive, p.State) + + // replace + require.True(t, b.replace(PeerInfo{peers[i], PeerStateMissing})) + p, has = b.getPeer(peers[i]) + require.True(t, has) + require.Equal(t, PeerStateMissing, p.State) + + spl := b.split(0, ConvertPeerID(local)) llist := b.list for e := llist.Front(); e != nil; e = e.Next() { - p := ConvertPeerID(e.Value.(peer.ID)) + p := ConvertPeerID(e.Value.(PeerInfo).Id) cpl := CommonPrefixLen(p, localID) if cpl > 0 { - t.Fatalf("Split failed. found id with cpl > 0 in 0 bucket") + t.Fatalf("split failed. found id with cpl > 0 in 0 bucket") } } rlist := spl.list for e := rlist.Front(); e != nil; e = e.Next() { - p := ConvertPeerID(e.Value.(peer.ID)) + p := ConvertPeerID(e.Value.(PeerInfo).Id) cpl := CommonPrefixLen(p, localID) if cpl == 0 { - t.Fatalf("Split failed. found id with cpl == 0 in non 0 bucket") + t.Fatalf("split failed. 
found id with cpl == 0 in non 0 bucket") } } } func TestGenRandPeerID(t *testing.T) { t.Parallel() + ctx := context.Background() local := test.RandPeerIDFatal(t) m := pstore.NewMetrics() - rt := NewRoutingTable(1, ConvertPeerID(local), time.Hour, m) + rt, err := NewRoutingTable(ctx, 1, ConvertPeerID(local), time.Hour, m, PeerAlwaysValidFnc) + require.NoError(t, err) // generate above maxCplForRefresh fails p, err := rt.GenRandPeerID(maxCplForRefresh + 1) @@ -74,12 +92,14 @@ func TestGenRandPeerID(t *testing.T) { func TestRefreshAndGetTrackedCpls(t *testing.T) { t.Parallel() + ctx := context.Background() local := test.RandPeerIDFatal(t) m := pstore.NewMetrics() - rt := NewRoutingTable(1, ConvertPeerID(local), time.Hour, m) + rt, err := NewRoutingTable(ctx, 1, ConvertPeerID(local), time.Hour, m, PeerAlwaysValidFnc) + require.NoError(t, err) - // add cpl's for tracking + // push cpl's for tracking for cpl := uint(0); cpl < maxCplForRefresh; cpl++ { peerID, err := rt.GenRandPeerID(cpl) require.NoError(t, err) @@ -100,12 +120,52 @@ func TestRefreshAndGetTrackedCpls(t *testing.T) { } } +func TestHandlePeerDead(t *testing.T) { + t.Parallel() + ctx := context.Background() + + local := test.RandPeerIDFatal(t) + m := pstore.NewMetrics() + rt, err := NewRoutingTable(ctx, 2, ConvertPeerID(local), time.Hour, m, PeerAlwaysValidFnc) + require.NoError(t, err) + + // push 3 peers -> 2 for the first bucket, and 1 as candidates + var peers []peer.ID + for i := 0; i < 3; i++ { + p, err := rt.GenRandPeerID(uint(0)) + require.NoError(t, err) + require.NotEmpty(t, p) + rt.HandlePeerAlive(p) + peers = append(peers, p) + } + + // ensure we have 1 candidate + rt.cplReplacementCache.Lock() + require.NotNil(t, rt.cplReplacementCache.candidates[uint(0)]) + require.True(t, rt.cplReplacementCache.candidates[uint(0)].GetCount() == 1) + rt.cplReplacementCache.Unlock() + + // mark a peer as dead and ensure it's not in the RT + require.NotEmpty(t, rt.Find(peers[0])) + rt.HandlePeerDead(peers[0]) + require.Empty(t, rt.Find(peers[0])) + + // mark the peer as dead & verify we don't get it as a candidate + rt.HandlePeerDead(peers[2]) + + rt.cplReplacementCache.Lock() + require.Nil(t, rt.cplReplacementCache.candidates[uint(0)]) + rt.cplReplacementCache.Unlock() +} + func TestTableCallbacks(t *testing.T) { t.Parallel() + ctx := context.Background() local := test.RandPeerIDFatal(t) m := pstore.NewMetrics() - rt := NewRoutingTable(10, ConvertPeerID(local), time.Hour, m) + rt, err := NewRoutingTable(ctx, 10, ConvertPeerID(local), time.Hour, m, PeerAlwaysValidFnc) + require.NoError(t, err) peers := make([]peer.ID, 100) for i := 0; i < 100; i++ { @@ -120,18 +180,18 @@ func TestTableCallbacks(t *testing.T) { delete(pset, p) } - rt.Update(peers[0]) + rt.HandlePeerAlive(peers[0]) if _, ok := pset[peers[0]]; !ok { t.Fatal("should have this peer") } - rt.Remove(peers[0]) + rt.HandlePeerDead(peers[0]) if _, ok := pset[peers[0]]; ok { t.Fatal("should not have this peer") } for _, p := range peers { - rt.Update(p) + rt.HandlePeerAlive(p) } out := rt.ListPeers() @@ -147,22 +207,55 @@ func TestTableCallbacks(t *testing.T) { } } +func TestHandlePeerDisconnect(t *testing.T) { + t.Parallel() + ctx := context.Background() + + local := test.RandPeerIDFatal(t) + m := pstore.NewMetrics() + rt, err := NewRoutingTable(ctx, 10, ConvertPeerID(local), time.Hour, m, PeerAlwaysValidFnc) + require.NoError(t, err) + + p := test.RandPeerIDFatal(t) + // mark a peer as alive + rt.HandlePeerAlive(p) + + // verify it's active + rt.tabLock.Lock() + bp, has := 
rt.Buckets[0].getPeer(p) + require.True(t, has) + require.NotNil(t, bp) + require.Equal(t, PeerStateActive, bp.State) + rt.tabLock.Unlock() + + //now mark it as disconnected & verify it's in missing state + rt.HandlePeerDisconnect(p) + rt.tabLock.Lock() + bp, has = rt.Buckets[0].getPeer(p) + require.True(t, has) + require.NotNil(t, bp) + require.Equal(t, PeerStateMissing, bp.State) + rt.tabLock.Unlock() +} + // Right now, this just makes sure that it doesnt hang or crash -func TestTableUpdate(t *testing.T) { +func TestHandlePeerAlive(t *testing.T) { t.Parallel() + ctx := context.Background() local := test.RandPeerIDFatal(t) m := pstore.NewMetrics() - rt := NewRoutingTable(10, ConvertPeerID(local), time.Hour, m) + rt, err := NewRoutingTable(ctx, 10, ConvertPeerID(local), time.Hour, m, PeerAlwaysValidFnc) + require.NoError(t, err) peers := make([]peer.ID, 100) for i := 0; i < 100; i++ { peers[i] = test.RandPeerIDFatal(t) } - // Testing Update + // Testing HandlePeerAlive for i := 0; i < 10000; i++ { - rt.Update(peers[rand.Intn(len(peers))]) + rt.HandlePeerAlive(peers[rand.Intn(len(peers))]) } for i := 0; i < 100; i++ { @@ -176,15 +269,17 @@ func TestTableUpdate(t *testing.T) { func TestTableFind(t *testing.T) { t.Parallel() + ctx := context.Background() local := test.RandPeerIDFatal(t) m := pstore.NewMetrics() - rt := NewRoutingTable(10, ConvertPeerID(local), time.Hour, m) + rt, err := NewRoutingTable(ctx, 10, ConvertPeerID(local), time.Hour, m, PeerAlwaysValidFnc) + require.NoError(t, err) peers := make([]peer.ID, 100) for i := 0; i < 5; i++ { peers[i] = test.RandPeerIDFatal(t) - rt.Update(peers[i]) + rt.HandlePeerAlive(peers[i]) } t.Logf("Searching for peer: '%s'", peers[2]) @@ -194,12 +289,45 @@ func TestTableFind(t *testing.T) { } } +func TestCandidateAddition(t *testing.T) { + t.Parallel() + ctx := context.Background() + + local := test.RandPeerIDFatal(t) + m := pstore.NewMetrics() + rt, err := NewRoutingTable(ctx, 3, ConvertPeerID(local), time.Hour, m, PeerAlwaysValidFnc) + require.NoError(t, err) + + // generate 6 peers for the first bucket, 3 to push to it, and 3 as candidates + var peers []peer.ID + for i := 0; i < 6; i++ { + p, err := rt.GenRandPeerID(uint(0)) + require.NoError(t, err) + require.NotEmpty(t, p) + rt.HandlePeerAlive(p) + peers = append(peers, p) + } + + // fetch & verify candidates + for _, p := range peers[3:] { + ap, b := rt.cplReplacementCache.pop(0) + require.True(t, b) + require.Equal(t, p, ap) + } + + // now pop should fail as queue should be empty + _, b := rt.cplReplacementCache.pop(0) + require.False(t, b) +} + func TestTableEldestPreferred(t *testing.T) { t.Parallel() + ctx := context.Background() local := test.RandPeerIDFatal(t) m := pstore.NewMetrics() - rt := NewRoutingTable(10, ConvertPeerID(local), time.Hour, m) + rt, err := NewRoutingTable(ctx, 10, ConvertPeerID(local), time.Hour, m, PeerAlwaysValidFnc) + require.NoError(t, err) // generate size + 1 peers to saturate a bucket peers := make([]peer.ID, 15) @@ -212,14 +340,14 @@ func TestTableEldestPreferred(t *testing.T) { // test 10 first peers are accepted. for _, p := range peers[:10] { - if _, err := rt.Update(p); err != nil { + if _, err := rt.HandlePeerAlive(p); err != nil { t.Errorf("expected all 10 peers to be accepted; instead got: %v", err) } } // test next 5 peers are rejected. 
for _, p := range peers[10:] { - if _, err := rt.Update(p); err != ErrPeerRejectedNoCapacity { + if _, err := rt.HandlePeerAlive(p); err != ErrPeerRejectedNoCapacity { t.Errorf("expected extra 5 peers to be rejected; instead got: %v", err) } } @@ -227,15 +355,17 @@ func TestTableEldestPreferred(t *testing.T) { func TestTableFindMultiple(t *testing.T) { t.Parallel() + ctx := context.Background() local := test.RandPeerIDFatal(t) m := pstore.NewMetrics() - rt := NewRoutingTable(20, ConvertPeerID(local), time.Hour, m) + rt, err := NewRoutingTable(ctx, 20, ConvertPeerID(local), time.Hour, m, PeerAlwaysValidFnc) + require.NoError(t, err) peers := make([]peer.ID, 100) for i := 0; i < 18; i++ { peers[i] = test.RandPeerIDFatal(t) - rt.Update(peers[i]) + rt.HandlePeerAlive(peers[i]) } t.Logf("Searching for peer: '%s'", peers[2]) @@ -259,17 +389,19 @@ func assertSortedPeerIdsEqual(t *testing.T, a, b []peer.ID) { func TestTableFindMultipleBuckets(t *testing.T) { t.Parallel() + ctx := context.Background() local := test.RandPeerIDFatal(t) localID := ConvertPeerID(local) m := pstore.NewMetrics() - rt := NewRoutingTable(5, localID, time.Hour, m) + rt, err := NewRoutingTable(ctx, 5, localID, time.Hour, m, PeerAlwaysValidFnc) + require.NoError(t, err) peers := make([]peer.ID, 100) for i := 0; i < 100; i++ { peers[i] = test.RandPeerIDFatal(t) - rt.Update(peers[i]) + rt.HandlePeerAlive(peers[i]) } targetID := ConvertPeerID(peers[2]) @@ -277,7 +409,7 @@ func TestTableFindMultipleBuckets(t *testing.T) { closest := SortClosestPeers(rt.ListPeers(), targetID) targetCpl := CommonPrefixLen(localID, targetID) - // Split the peers into closer, same, and further from the key (than us). + // split the peers into closer, same, and further from the key (than us). var ( closer, same, further []peer.ID ) @@ -374,10 +506,12 @@ func TestTableFindMultipleBuckets(t *testing.T) { // and set GOMAXPROCS above 1 func TestTableMultithreaded(t *testing.T) { t.Parallel() + ctx := context.Background() local := peer.ID("localPeer") m := pstore.NewMetrics() - tab := NewRoutingTable(20, ConvertPeerID(local), time.Hour, m) + tab, err := NewRoutingTable(ctx, 20, ConvertPeerID(local), time.Hour, m, PeerAlwaysValidFnc) + require.NoError(t, err) var peers []peer.ID for i := 0; i < 500; i++ { peers = append(peers, test.RandPeerIDFatal(t)) @@ -387,7 +521,7 @@ func TestTableMultithreaded(t *testing.T) { go func() { for i := 0; i < 1000; i++ { n := rand.Intn(len(peers)) - tab.Update(peers[n]) + tab.HandlePeerAlive(peers[n]) } done <- struct{}{} }() @@ -395,7 +529,7 @@ func TestTableMultithreaded(t *testing.T) { go func() { for i := 0; i < 1000; i++ { n := rand.Intn(len(peers)) - tab.Update(peers[n]) + tab.HandlePeerAlive(peers[n]) } done <- struct{}{} }() @@ -412,11 +546,107 @@ func TestTableMultithreaded(t *testing.T) { <-done } -func BenchmarkUpdates(b *testing.B) { +func TestTableCleanup(t *testing.T) { + t.Parallel() + ctx := context.Background() + local := test.RandPeerIDFatal(t) + + // Generate: + // 6 peers with CPL 0 + // 6 peers with CPL 1 + cplPeerMap := make(map[int][]peer.ID) + for cpl := 0; cpl < 2; cpl++ { + i := 0 + + for { + p := test.RandPeerIDFatal(t) + if CommonPrefixLen(ConvertPeerID(local), ConvertPeerID(p)) == cpl { + cplPeerMap[cpl] = append(cplPeerMap[cpl], p) + + i++ + if i == 6 { + break + } + } + } + } + + // create RT with a very short cleanup interval + rt, err := NewRoutingTable(ctx, 3, ConvertPeerID(local), time.Hour, pstore.NewMetrics(), PeerAlwaysValidFnc, + TableCleanupInterval(100*time.Millisecond)) + 
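+	// the 100ms cleanup interval above is intentionally short so that the cleanup
+	// goroutine runs several times before the assertions at the end of this test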
require.NoError(t, err) + + // mock peer validation fnc that successfully validates p[1], p[3] & p[5] + f := func(ctx context.Context, p peer.ID) bool { + cpl := CommonPrefixLen(rt.local, ConvertPeerID(p)) + if cplPeerMap[cpl][1] == p || cplPeerMap[cpl][3] == p || cplPeerMap[cpl][5] == p { + rt.HandlePeerAlive(p) + return true + + } else { + return false + } + } + + // for each CPL, p[0], p[1] & p[2] got the bucket & p[3], p[4] & p[5] become candidates + for _, peers := range cplPeerMap { + for _, p := range peers { + rt.HandlePeerAlive(p) + + } + } + + // validate current state + rt.tabLock.RLock() + require.Len(t, rt.ListPeers(), 6) + ps0 := rt.Buckets[0].peerIds() + require.Len(t, ps0, 3) + ps1 := rt.Buckets[1].peerIds() + require.Len(t, ps1, 3) + require.Contains(t, ps0, cplPeerMap[0][0]) + require.Contains(t, ps0, cplPeerMap[0][1]) + require.Contains(t, ps0, cplPeerMap[0][2]) + require.Contains(t, ps1, cplPeerMap[1][0]) + require.Contains(t, ps1, cplPeerMap[1][1]) + require.Contains(t, ps1, cplPeerMap[1][2]) + rt.tabLock.RUnlock() + + // now change peer validation fnc + rt.PeerValidationFnc = f + + // now mark p[0],p[1] & p[2] as dead so p[3] & p[5] replace p[0] and p[2] + for _, peers := range cplPeerMap { + rt.HandlePeerDisconnect(peers[0]) + rt.HandlePeerDisconnect(peers[1]) + rt.HandlePeerDisconnect(peers[2]) + } + + // let RT cleanup complete + time.Sleep(1 * time.Second) + + // verify RT state + rt.tabLock.RLock() + require.Len(t, rt.ListPeers(), 6) + ps0 = rt.Buckets[0].peerIds() + require.Len(t, ps0, 3) + ps1 = rt.Buckets[1].peerIds() + require.Len(t, ps1, 3) + require.Contains(t, ps0, cplPeerMap[0][1]) + require.Contains(t, ps0, cplPeerMap[0][3]) + require.Contains(t, ps0, cplPeerMap[0][5]) + require.Contains(t, ps1, cplPeerMap[1][1]) + require.Contains(t, ps1, cplPeerMap[1][3]) + require.Contains(t, ps1, cplPeerMap[1][5]) + rt.tabLock.RUnlock() +} + +func BenchmarkHandlePeerAlive(b *testing.B) { + ctx := context.Background() b.StopTimer() local := ConvertKey("localKey") m := pstore.NewMetrics() - tab := NewRoutingTable(20, local, time.Hour, m) + tab, err := NewRoutingTable(ctx, 20, local, time.Hour, m, PeerAlwaysValidFnc) + require.NoError(b, err) var peers []peer.ID for i := 0; i < b.N; i++ { @@ -425,20 +655,22 @@ func BenchmarkUpdates(b *testing.B) { b.StartTimer() for i := 0; i < b.N; i++ { - tab.Update(peers[i]) + tab.HandlePeerAlive(peers[i]) } } func BenchmarkFinds(b *testing.B) { + ctx := context.Background() b.StopTimer() local := ConvertKey("localKey") m := pstore.NewMetrics() - tab := NewRoutingTable(20, local, time.Hour, m) + tab, err := NewRoutingTable(ctx, 20, local, time.Hour, m, PeerAlwaysValidFnc) + require.NoError(b, err) var peers []peer.ID for i := 0; i < b.N; i++ { peers = append(peers, test.RandPeerIDFatal(b)) - tab.Update(peers[i]) + tab.HandlePeerAlive(peers[i]) } b.StartTimer() From 1c814a57e2999c0f1b21a59d6e6262b93ba90301 Mon Sep 17 00:00:00 2001 From: Aarsh Shah Date: Mon, 17 Feb 2020 13:56:03 +0530 Subject: [PATCH 2/5] changes as per review --- bucket.go | 42 +++++-------- cpl_replacement_cache.go | 47 +++++++------- cpl_replacement_cache_test.go | 17 ++++-- options.go | 14 ++++- sorting.go | 2 +- table.go | 92 +++++++++++++--------------- table_test.go | 111 ++++++++++++++++++---------------- 7 files changed, 163 insertions(+), 162 deletions(-) diff --git a/bucket.go b/bucket.go index 0108799..f556e81 100644 --- a/bucket.go +++ b/bucket.go @@ -38,13 +38,14 @@ func newBucket() *bucket { } // returns all peers in the bucket +// it is safe for the 
caller to modify the returned objects as it is a defensive copy func (b *bucket) peers() []PeerInfo { b.lk.RLock() defer b.lk.RUnlock() var ps []PeerInfo for e := b.list.Front(); e != nil; e = e.Next() { - p := e.Value.(PeerInfo) - ps = append(ps, p) + p := e.Value.(*PeerInfo) + ps = append(ps, *p) } return ps } @@ -55,38 +56,23 @@ func (b *bucket) peerIds() []peer.ID { defer b.lk.RUnlock() ps := make([]peer.ID, 0, b.list.Len()) for e := b.list.Front(); e != nil; e = e.Next() { - p := e.Value.(PeerInfo) + p := e.Value.(*PeerInfo) ps = append(ps, p.Id) } return ps } -// returns the peer with the given Id and true if peer exists -// returns false if the peerId does not exist -func (b *bucket) getPeer(p peer.ID) (PeerInfo, bool) { +// returns the peer with the given Id if it exists +// returns nil if the peerId does not exist +func (b *bucket) getPeer(p peer.ID) *PeerInfo { b.lk.RLock() defer b.lk.RUnlock() for e := b.list.Front(); e != nil; e = e.Next() { - if e.Value.(PeerInfo).Id == p { - return e.Value.(PeerInfo), true + if e.Value.(*PeerInfo).Id == p { + return e.Value.(*PeerInfo) } } - return PeerInfo{}, false -} - -// replaces the peer based on the Id. -// returns true if the replace was successful, false otherwise. -func (b *bucket) replace(p PeerInfo) bool { - b.lk.Lock() - defer b.lk.Unlock() - for e := b.list.Front(); e != nil; e = e.Next() { - if e.Value.(PeerInfo).Id == p.Id { - b.list.Remove(e) - b.list.PushBack(p) - return true - } - } - return false + return nil } // removes the peer with the given Id from the bucket. @@ -95,7 +81,7 @@ func (b *bucket) remove(id peer.ID) bool { b.lk.Lock() defer b.lk.Unlock() for e := b.list.Front(); e != nil; e = e.Next() { - if e.Value.(PeerInfo).Id == id { + if e.Value.(*PeerInfo).Id == id { b.list.Remove(e) return true } @@ -108,13 +94,13 @@ func (b *bucket) moveToFront(id peer.ID) { defer b.lk.Unlock() for e := b.list.Front(); e != nil; e = e.Next() { - if e.Value.(PeerInfo).Id == id { + if e.Value.(*PeerInfo).Id == id { b.list.MoveToFront(e) } } } -func (b *bucket) pushFront(p PeerInfo) { +func (b *bucket) pushFront(p *PeerInfo) { b.lk.Lock() b.list.PushFront(p) b.lk.Unlock() @@ -138,7 +124,7 @@ func (b *bucket) split(cpl int, target ID) *bucket { newbuck.list = out e := b.list.Front() for e != nil { - peerID := ConvertPeerID(e.Value.(PeerInfo).Id) + peerID := ConvertPeerID(e.Value.(*PeerInfo).Id) peerCPL := CommonPrefixLen(peerID, target) if peerCPL > cpl { cur := e diff --git a/cpl_replacement_cache.go b/cpl_replacement_cache.go index 7f17991..2f521f7 100644 --- a/cpl_replacement_cache.go +++ b/cpl_replacement_cache.go @@ -4,8 +4,6 @@ import ( "sync" "github.com/libp2p/go-libp2p-core/peer" - - "github.com/wangjia184/sortedset" ) // TODO Should ideally use a Circular queue for this @@ -15,14 +13,14 @@ type cplReplacementCache struct { maxQueueSize int sync.Mutex - candidates map[uint]*sortedset.SortedSet // candidates for a Cpl + candidates map[uint][]peer.ID // candidates for a Cpl } func newCplReplacementCache(localPeer ID, maxQueueSize int) *cplReplacementCache { return &cplReplacementCache{ localPeer: localPeer, maxQueueSize: maxQueueSize, - candidates: make(map[uint]*sortedset.SortedSet), + candidates: make(map[uint][]peer.ID), } } @@ -33,25 +31,21 @@ func (c *cplReplacementCache) push(p peer.ID) bool { c.Lock() defer c.Unlock() - // create queue if not created cpl := uint(CommonPrefixLen(c.localPeer, ConvertPeerID(p))) - if c.candidates[cpl] == nil { - c.candidates[cpl] = sortedset.New() - } - - q := c.candidates[cpl] // queue is 
full - if (q.GetCount()) >= c.maxQueueSize { + if len(c.candidates[cpl]) >= c.maxQueueSize { return false } // queue already has the peer - if q.GetByKey(string(p)) != nil { - return false + for _, pr := range c.candidates[cpl] { + if pr == p { + return false + } } // push - q.AddOrUpdate(string(p), sortedset.SCORE(q.GetCount()+1), nil) + c.candidates[cpl] = append(c.candidates[cpl], p) return true } @@ -60,18 +54,18 @@ func (c *cplReplacementCache) push(p peer.ID) bool { // returns the peerId and true if successful func (c *cplReplacementCache) pop(cpl uint) (peer.ID, bool) { c.Lock() - c.Unlock() + defer c.Unlock() - q := c.candidates[cpl] - if q != nil && q.GetCount() > 0 { - n := q.PopMin() + if len(c.candidates[cpl]) != 0 { + p := c.candidates[cpl][0] + c.candidates[cpl] = c.candidates[cpl][1:] // delete the queue if it's empty - if q.GetCount() == 0 { + if len(c.candidates[cpl]) == 0 { delete(c.candidates, cpl) } - return peer.ID(n.Key()), true + return p, true } return "", false } @@ -83,12 +77,17 @@ func (c *cplReplacementCache) remove(p peer.ID) bool { defer c.Unlock() cpl := uint(CommonPrefixLen(c.localPeer, ConvertPeerID(p))) - q := c.candidates[cpl] - if q != nil { - q.Remove(string(p)) + + if len(c.candidates[cpl]) != 0 { + // remove the peer if it's present + for i, pr := range c.candidates[cpl] { + if pr == p { + c.candidates[cpl] = append(c.candidates[cpl][:i], c.candidates[cpl][i+1:]...) + } + } // remove the queue if it's empty - if q.GetCount() == 0 { + if len(c.candidates[cpl]) == 0 { delete(c.candidates, cpl) } diff --git a/cpl_replacement_cache_test.go b/cpl_replacement_cache_test.go index 2d34bea..98981a4 100644 --- a/cpl_replacement_cache_test.go +++ b/cpl_replacement_cache_test.go @@ -9,7 +9,7 @@ import ( "github.com/stretchr/testify/require" ) -func TestCandidateQueue(t *testing.T) { +func TestCplReplacementCache(t *testing.T) { t.Parallel() maxQSize := 2 @@ -61,14 +61,19 @@ func TestCandidateQueue(t *testing.T) { require.False(t, c.push(p3)) // remove a peer & verify it's been removed - require.NotNil(t, c.candidates[2].GetByKey(string(p2))) - require.True(t, c.remove(p2)) + p4 := genPeer(t, local, 0) + require.True(t, c.push(p4)) + c.Lock() - require.Nil(t, c.candidates[2].GetByKey(string(p2))) + require.Len(t, c.candidates[0], 1) + require.True(t, c.candidates[0][0] == p4) c.Unlock() - // now push should work - require.True(t, c.push(p3)) + require.True(t, c.remove(p4)) + + c.Lock() + require.Len(t, c.candidates[0], 0) + c.Unlock() } func genPeer(t *testing.T, local ID, cpl int) peer.ID { diff --git a/options.go b/options.go index 730d96c..4e1bdfa 100644 --- a/options.go +++ b/options.go @@ -11,7 +11,8 @@ type Option func(*Options) error // Options is a structure containing all the functional options that can be used when constructing a Routing Table. type Options struct { TableCleanup struct { - PeersForValidationFnc PeerSelectionFnc + PeerValidationFnc PeerValidationFunc + PeersForValidationFnc PeerSelectionFunc PeerValidationTimeout time.Duration Interval time.Duration } @@ -27,8 +28,17 @@ func (o *Options) Apply(opts ...Option) error { return nil } +// PeerValidationFnc configures the Peer Validation function used for RT cleanup. +// Not configuring this disables Routing Table cleanup. 
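+// The supplied function must have the PeerValidationFunc signature,
+// i.e. func(ctx context.Context, p peer.ID) bool; for example, a DHT could pass
+// a function that attempts to connect to or query the peer within ctx.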
+func PeerValidationFnc(f PeerValidationFunc) Option { + return func(o *Options) error { + o.TableCleanup.PeerValidationFnc = f + return nil + } +} + // PeersForValidationFnc configures the function that will be used to select the peers that need to be validated during cleanup. -func PeersForValidationFnc(f PeerSelectionFnc) Option { +func PeersForValidationFnc(f PeerSelectionFunc) Option { return func(o *Options) error { o.TableCleanup.PeersForValidationFnc = f return nil diff --git a/sorting.go b/sorting.go index b5c2a8c..10aaf11 100644 --- a/sorting.go +++ b/sorting.go @@ -36,7 +36,7 @@ func (pds *peerDistanceSorter) appendPeer(p peer.ID) { // Append the peer.ID values in the list to the sorter's slice. It may no longer be sorted. func (pds *peerDistanceSorter) appendPeersFromList(l *list.List) { for e := l.Front(); e != nil; e = e.Next() { - pds.appendPeer(e.Value.(PeerInfo).Id) + pds.appendPeer(e.Value.(*PeerInfo).Id) } } diff --git a/table.go b/table.go index 59b9512..d8c4c57 100644 --- a/table.go +++ b/table.go @@ -22,12 +22,12 @@ var log = logging.Logger("table") var ErrPeerRejectedHighLatency = errors.New("peer rejected; latency too high") var ErrPeerRejectedNoCapacity = errors.New("peer rejected; insufficient capacity") -// PeerSelectionFnc is the signature of a function that selects zero or more peers from the given peers +// PeerSelectionFunc is the signature of a function that selects zero or more peers from the given peers // based on some criteria. -type PeerSelectionFnc func(peers []PeerInfo) []PeerInfo +type PeerSelectionFunc func(peers []PeerInfo) []PeerInfo -// PeerValidationFnc is the signature of a function that determines the validity a peer for Routing Table membership. -type PeerValidationFnc func(ctx context.Context, p peer.ID) bool +// PeerValidationFunc is the signature of a function that determines the validity a peer for Routing Table membership. +type PeerValidationFunc func(ctx context.Context, p peer.ID) bool // maxCplForRefresh is the maximum cpl we support for refresh. // This limit exists because we can only generate 'maxCplForRefresh' bit prefixes for now. @@ -57,7 +57,7 @@ type RoutingTable struct { maxLatency time.Duration // kBuckets define all the fingers to other nodes. - Buckets []*bucket + buckets []*bucket bucketsize int cplRefreshLk sync.RWMutex @@ -71,20 +71,20 @@ type RoutingTable struct { PeerAdded func(peer.ID) // function to determine the validity of a peer for RT membership - PeerValidationFnc PeerValidationFnc + peerValidationFnc PeerValidationFunc // timeout for a single call to the peer validation function peerValidationTimeout time.Duration // interval between two runs of the table cleanup routine tableCleanupInterval time.Duration // function to select peers that need to be validated - peersForValidationFnc PeerSelectionFnc + peersForValidationFnc PeerSelectionFunc } // NewRoutingTable creates a new routing table with a given bucketsize, local ID, and latency tolerance. -// Passing a nil PeerValidationFnc disables periodic table cleanup. +// Passing a nil PeerValidationFunc disables periodic table cleanup. 
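+//
+// A minimal usage sketch (illustrative only; ctx, local, m and validatePeer are
+// assumed to be supplied by the caller):
+//
+//	rt, err := NewRoutingTable(ctx, 20, ConvertPeerID(local), time.Hour, m,
+//		PeerValidationFnc(validatePeer))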
func NewRoutingTable(ctx context.Context, bucketsize int, localID ID, latency time.Duration, m peerstore.Metrics, - peerValidationFnc PeerValidationFnc, options ...Option) (*RoutingTable, error) { + options ...Option) (*RoutingTable, error) { var cfg Options if err := cfg.Apply(append([]Option{Defaults}, options...)...); err != nil { @@ -93,7 +93,7 @@ func NewRoutingTable(ctx context.Context, bucketsize int, localID ID, latency ti rt := &RoutingTable{ ctx: ctx, - Buckets: []*bucket{newBucket()}, + buckets: []*bucket{newBucket()}, bucketsize: bucketsize, local: localID, @@ -105,7 +105,7 @@ func NewRoutingTable(ctx context.Context, bucketsize int, localID ID, latency ti PeerRemoved: func(peer.ID) {}, PeerAdded: func(peer.ID) {}, - PeerValidationFnc: peerValidationFnc, + peerValidationFnc: cfg.TableCleanup.PeerValidationFnc, peersForValidationFnc: cfg.TableCleanup.PeersForValidationFnc, peerValidationTimeout: cfg.TableCleanup.PeerValidationTimeout, tableCleanupInterval: cfg.TableCleanup.Interval, @@ -113,8 +113,8 @@ func NewRoutingTable(ctx context.Context, bucketsize int, localID ID, latency ti rt.cplReplacementCache = newCplReplacementCache(rt.local, rt.bucketsize) - // schedule periodic RT cleanup - if peerValidationFnc != nil { + // schedule periodic RT cleanup if peer validation function has been passed + if rt.peerValidationFnc != nil { go rt.cleanup() } @@ -125,7 +125,7 @@ func (rt *RoutingTable) cleanup() { validatePeerF := func(p peer.ID) bool { queryCtx, cancel := context.WithTimeout(rt.ctx, rt.peerValidationTimeout) defer cancel() - return rt.PeerValidationFnc(queryCtx, p) + return rt.peerValidationFnc(queryCtx, p) } cleanupTickr := time.NewTicker(rt.tableCleanupInterval) @@ -180,7 +180,7 @@ func (rt *RoutingTable) peersToValidate() []PeerInfo { defer rt.tabLock.RUnlock() var peers []PeerInfo - for _, b := range rt.Buckets { + for _, b := range rt.buckets { peers = append(peers, b.peers()...) } return rt.peersForValidationFnc(peers) @@ -246,12 +246,11 @@ func (rt *RoutingTable) HandlePeerDisconnect(p peer.ID) { rt.tabLock.Lock() defer rt.tabLock.Unlock() - // mark the peer as missing bucketId := rt.bucketIdForPeer(p) - b := rt.Buckets[bucketId] - if peer, has := b.getPeer(p); has { + // mark the peer as missing + b := rt.buckets[bucketId] + if peer := b.getPeer(p); peer != nil { peer.State = PeerStateMissing - b.replace(peer) } } @@ -263,18 +262,11 @@ func (rt *RoutingTable) HandlePeerAlive(p peer.ID) (evicted peer.ID, err error) defer rt.tabLock.Unlock() bucketID := rt.bucketIdForPeer(p) - bucket := rt.Buckets[bucketID] - if peer, has := bucket.getPeer(p); has { - // mark the peer as active if it was missing - if peer.State == PeerStateMissing { - peer.State = PeerStateActive - bucket.replace(peer) - } + bucket := rt.buckets[bucketID] + if peer := bucket.getPeer(p); peer != nil { + // mark the peer as active + peer.State = PeerStateActive - // If the peer is already in the table, move it to the front. - // This signifies that it it "more active" and the less active nodes - // Will as a result tend towards the back of the list - bucket.moveToFront(p) return "", nil } @@ -285,21 +277,21 @@ func (rt *RoutingTable) HandlePeerAlive(p peer.ID) (evicted peer.ID, err error) // We have enough space in the bucket (whether spawned or grouped). 
if bucket.len() < rt.bucketsize { - bucket.pushFront(PeerInfo{p, PeerStateActive}) + bucket.pushFront(&PeerInfo{p, PeerStateActive}) rt.PeerAdded(p) return "", nil } - if bucketID == len(rt.Buckets)-1 { + if bucketID == len(rt.buckets)-1 { // if the bucket is too large and this is the last bucket (i.e. wildcard), unfold it. rt.nextBucket() // the structure of the table has changed, so let's recheck if the peer now has a dedicated bucket. bucketID = rt.bucketIdForPeer(p) - bucket = rt.Buckets[bucketID] + bucket = rt.buckets[bucketID] // push the peer only if the bucket isn't overflowing after slitting if bucket.len() < rt.bucketsize { - bucket.pushFront(PeerInfo{p, PeerStateActive}) + bucket.pushFront(&PeerInfo{p, PeerStateActive}) rt.PeerAdded(p) return "", nil } @@ -320,7 +312,7 @@ func (rt *RoutingTable) HandlePeerDead(p peer.ID) { rt.tabLock.Lock() defer rt.tabLock.Unlock() bucketID := rt.bucketIdForPeer(p) - bucket := rt.Buckets[bucketID] + bucket := rt.buckets[bucketID] if bucket.remove(p) { rt.PeerRemoved(p) } @@ -330,9 +322,9 @@ func (rt *RoutingTable) nextBucket() { // This is the last bucket, which allegedly is a mixed bag containing peers not belonging in dedicated (unfolded) buckets. // _allegedly_ is used here to denote that *all* peers in the last bucket might feasibly belong to another bucket. // This could happen if e.g. we've unfolded 4 buckets, and all peers in folded bucket 5 really belong in bucket 8. - bucket := rt.Buckets[len(rt.Buckets)-1] - newBucket := bucket.split(len(rt.Buckets)-1, rt.local) - rt.Buckets = append(rt.Buckets, newBucket) + bucket := rt.buckets[len(rt.buckets)-1] + newBucket := bucket.split(len(rt.buckets)-1, rt.local) + rt.buckets = append(rt.buckets, newBucket) // The newly formed bucket still contains too many peers. We probably just unfolded a empty bucket. if newBucket.len() >= rt.bucketsize { @@ -372,9 +364,9 @@ func (rt *RoutingTable) NearestPeers(id ID, count int) []peer.ID { // It's assumed that this also protects the buckets. rt.tabLock.RLock() - // getPeer bucket index or last bucket - if cpl >= len(rt.Buckets) { - cpl = len(rt.Buckets) - 1 + // Get bucket index or last bucket + if cpl >= len(rt.buckets) { + cpl = len(rt.buckets) - 1 } pds := peerDistanceSorter{ @@ -383,7 +375,7 @@ func (rt *RoutingTable) NearestPeers(id ID, count int) []peer.ID { } // Add peers from the target bucket (cpl+1 shared bits). - pds.appendPeersFromList(rt.Buckets[cpl].list) + pds.appendPeersFromList(rt.buckets[cpl].list) // If we're short, add peers from buckets to the right until we have // enough. All buckets to the right share exactly cpl bits (as opposed @@ -395,8 +387,8 @@ func (rt *RoutingTable) NearestPeers(id ID, count int) []peer.ID { // // However, we're going to do that anyways as it's "good enough" - for i := cpl + 1; i < len(rt.Buckets) && pds.Len() < count; i++ { - pds.appendPeersFromList(rt.Buckets[i].list) + for i := cpl + 1; i < len(rt.buckets) && pds.Len() < count; i++ { + pds.appendPeersFromList(rt.buckets[i].list) } // If we're still short, add in buckets that share _fewer_ bits. We can @@ -407,7 +399,7 @@ func (rt *RoutingTable) NearestPeers(id ID, count int) []peer.ID { // * bucket cpl-2: cpl-2 shared bits. // ... 
for i := cpl - 1; i >= 0 && pds.Len() < count; i-- { - pds.appendPeersFromList(rt.Buckets[i].list) + pds.appendPeersFromList(rt.buckets[i].list) } rt.tabLock.RUnlock() @@ -430,7 +422,7 @@ func (rt *RoutingTable) NearestPeers(id ID, count int) []peer.ID { func (rt *RoutingTable) Size() int { var tot int rt.tabLock.RLock() - for _, buck := range rt.Buckets { + for _, buck := range rt.buckets { tot += buck.len() } rt.tabLock.RUnlock() @@ -441,7 +433,7 @@ func (rt *RoutingTable) Size() int { func (rt *RoutingTable) ListPeers() []peer.ID { var peers []peer.ID rt.tabLock.RLock() - for _, buck := range rt.Buckets { + for _, buck := range rt.buckets { peers = append(peers, buck.peerIds()...) } rt.tabLock.RUnlock() @@ -453,7 +445,7 @@ func (rt *RoutingTable) Print() { fmt.Printf("Routing Table, bs = %d, Max latency = %d\n", rt.bucketsize, rt.maxLatency) rt.tabLock.RLock() - for i, b := range rt.Buckets { + for i, b := range rt.buckets { fmt.Printf("\tbucket: %d\n", i) b.lk.RLock() @@ -471,8 +463,8 @@ func (rt *RoutingTable) bucketIdForPeer(p peer.ID) int { peerID := ConvertPeerID(p) cpl := CommonPrefixLen(peerID, rt.local) bucketID := cpl - if bucketID >= len(rt.Buckets) { - bucketID = len(rt.Buckets) - 1 + if bucketID >= len(rt.buckets) { + bucketID = len(rt.buckets) - 1 } return bucketID } diff --git a/table_test.go b/table_test.go index e02219c..5d8b150 100644 --- a/table_test.go +++ b/table_test.go @@ -3,6 +3,7 @@ package kbucket import ( "context" "math/rand" + "sync" "testing" "time" @@ -26,7 +27,7 @@ func TestBucket(t *testing.T) { peers := make([]peer.ID, 100) for i := 0; i < 100; i++ { peers[i] = test.RandPeerIDFatal(t) - b.pushFront(PeerInfo{peers[i], PeerStateActive}) + b.pushFront(&PeerInfo{peers[i], PeerStateActive}) } local := test.RandPeerIDFatal(t) @@ -36,21 +37,21 @@ func TestBucket(t *testing.T) { require.Len(t, infos, 100) i := rand.Intn(len(peers)) - p, has := b.getPeer(peers[i]) - require.True(t, has) + p := b.getPeer(peers[i]) + require.NotNil(t, p) require.Equal(t, peers[i], p.Id) require.Equal(t, PeerStateActive, p.State) - // replace - require.True(t, b.replace(PeerInfo{peers[i], PeerStateMissing})) - p, has = b.getPeer(peers[i]) - require.True(t, has) + // mark as missing + p.State = PeerStateMissing + p = b.getPeer(peers[i]) + require.NotNil(t, p) require.Equal(t, PeerStateMissing, p.State) spl := b.split(0, ConvertPeerID(local)) llist := b.list for e := llist.Front(); e != nil; e = e.Next() { - p := ConvertPeerID(e.Value.(PeerInfo).Id) + p := ConvertPeerID(e.Value.(*PeerInfo).Id) cpl := CommonPrefixLen(p, localID) if cpl > 0 { t.Fatalf("split failed. found id with cpl > 0 in 0 bucket") @@ -59,7 +60,7 @@ func TestBucket(t *testing.T) { rlist := spl.list for e := rlist.Front(); e != nil; e = e.Next() { - p := ConvertPeerID(e.Value.(PeerInfo).Id) + p := ConvertPeerID(e.Value.(*PeerInfo).Id) cpl := CommonPrefixLen(p, localID) if cpl == 0 { t.Fatalf("split failed. 
found id with cpl == 0 in non 0 bucket") @@ -73,7 +74,7 @@ func TestGenRandPeerID(t *testing.T) { local := test.RandPeerIDFatal(t) m := pstore.NewMetrics() - rt, err := NewRoutingTable(ctx, 1, ConvertPeerID(local), time.Hour, m, PeerAlwaysValidFnc) + rt, err := NewRoutingTable(ctx, 1, ConvertPeerID(local), time.Hour, m, PeerValidationFnc(PeerAlwaysValidFnc)) require.NoError(t, err) // generate above maxCplForRefresh fails @@ -96,7 +97,7 @@ func TestRefreshAndGetTrackedCpls(t *testing.T) { local := test.RandPeerIDFatal(t) m := pstore.NewMetrics() - rt, err := NewRoutingTable(ctx, 1, ConvertPeerID(local), time.Hour, m, PeerAlwaysValidFnc) + rt, err := NewRoutingTable(ctx, 1, ConvertPeerID(local), time.Hour, m, PeerValidationFnc(PeerAlwaysValidFnc)) require.NoError(t, err) // push cpl's for tracking @@ -126,7 +127,7 @@ func TestHandlePeerDead(t *testing.T) { local := test.RandPeerIDFatal(t) m := pstore.NewMetrics() - rt, err := NewRoutingTable(ctx, 2, ConvertPeerID(local), time.Hour, m, PeerAlwaysValidFnc) + rt, err := NewRoutingTable(ctx, 2, ConvertPeerID(local), time.Hour, m, PeerValidationFnc(PeerAlwaysValidFnc)) require.NoError(t, err) // push 3 peers -> 2 for the first bucket, and 1 as candidates @@ -142,7 +143,7 @@ func TestHandlePeerDead(t *testing.T) { // ensure we have 1 candidate rt.cplReplacementCache.Lock() require.NotNil(t, rt.cplReplacementCache.candidates[uint(0)]) - require.True(t, rt.cplReplacementCache.candidates[uint(0)].GetCount() == 1) + require.True(t, len(rt.cplReplacementCache.candidates[uint(0)]) == 1) rt.cplReplacementCache.Unlock() // mark a peer as dead and ensure it's not in the RT @@ -164,7 +165,7 @@ func TestTableCallbacks(t *testing.T) { local := test.RandPeerIDFatal(t) m := pstore.NewMetrics() - rt, err := NewRoutingTable(ctx, 10, ConvertPeerID(local), time.Hour, m, PeerAlwaysValidFnc) + rt, err := NewRoutingTable(ctx, 10, ConvertPeerID(local), time.Hour, m, PeerValidationFnc(PeerAlwaysValidFnc)) require.NoError(t, err) peers := make([]peer.ID, 100) @@ -213,7 +214,7 @@ func TestHandlePeerDisconnect(t *testing.T) { local := test.RandPeerIDFatal(t) m := pstore.NewMetrics() - rt, err := NewRoutingTable(ctx, 10, ConvertPeerID(local), time.Hour, m, PeerAlwaysValidFnc) + rt, err := NewRoutingTable(ctx, 10, ConvertPeerID(local), time.Hour, m, PeerValidationFnc(PeerAlwaysValidFnc)) require.NoError(t, err) p := test.RandPeerIDFatal(t) @@ -222,8 +223,8 @@ func TestHandlePeerDisconnect(t *testing.T) { // verify it's active rt.tabLock.Lock() - bp, has := rt.Buckets[0].getPeer(p) - require.True(t, has) + bp := rt.buckets[0].getPeer(p) + require.NotNil(t, bp) require.NotNil(t, bp) require.Equal(t, PeerStateActive, bp.State) rt.tabLock.Unlock() @@ -231,8 +232,7 @@ func TestHandlePeerDisconnect(t *testing.T) { //now mark it as disconnected & verify it's in missing state rt.HandlePeerDisconnect(p) rt.tabLock.Lock() - bp, has = rt.Buckets[0].getPeer(p) - require.True(t, has) + bp = rt.buckets[0].getPeer(p) require.NotNil(t, bp) require.Equal(t, PeerStateMissing, bp.State) rt.tabLock.Unlock() @@ -245,7 +245,7 @@ func TestHandlePeerAlive(t *testing.T) { local := test.RandPeerIDFatal(t) m := pstore.NewMetrics() - rt, err := NewRoutingTable(ctx, 10, ConvertPeerID(local), time.Hour, m, PeerAlwaysValidFnc) + rt, err := NewRoutingTable(ctx, 10, ConvertPeerID(local), time.Hour, m, PeerValidationFnc(PeerAlwaysValidFnc)) require.NoError(t, err) peers := make([]peer.ID, 100) @@ -273,7 +273,7 @@ func TestTableFind(t *testing.T) { local := test.RandPeerIDFatal(t) m := pstore.NewMetrics() - 
rt, err := NewRoutingTable(ctx, 10, ConvertPeerID(local), time.Hour, m, PeerAlwaysValidFnc) + rt, err := NewRoutingTable(ctx, 10, ConvertPeerID(local), time.Hour, m, PeerValidationFnc(PeerAlwaysValidFnc)) require.NoError(t, err) peers := make([]peer.ID, 100) @@ -295,7 +295,7 @@ func TestCandidateAddition(t *testing.T) { local := test.RandPeerIDFatal(t) m := pstore.NewMetrics() - rt, err := NewRoutingTable(ctx, 3, ConvertPeerID(local), time.Hour, m, PeerAlwaysValidFnc) + rt, err := NewRoutingTable(ctx, 3, ConvertPeerID(local), time.Hour, m, PeerValidationFnc(PeerAlwaysValidFnc)) require.NoError(t, err) // generate 6 peers for the first bucket, 3 to push to it, and 3 as candidates @@ -326,7 +326,7 @@ func TestTableEldestPreferred(t *testing.T) { local := test.RandPeerIDFatal(t) m := pstore.NewMetrics() - rt, err := NewRoutingTable(ctx, 10, ConvertPeerID(local), time.Hour, m, PeerAlwaysValidFnc) + rt, err := NewRoutingTable(ctx, 10, ConvertPeerID(local), time.Hour, m, PeerValidationFnc(PeerAlwaysValidFnc)) require.NoError(t, err) // generate size + 1 peers to saturate a bucket @@ -359,7 +359,7 @@ func TestTableFindMultiple(t *testing.T) { local := test.RandPeerIDFatal(t) m := pstore.NewMetrics() - rt, err := NewRoutingTable(ctx, 20, ConvertPeerID(local), time.Hour, m, PeerAlwaysValidFnc) + rt, err := NewRoutingTable(ctx, 20, ConvertPeerID(local), time.Hour, m, PeerValidationFnc(PeerAlwaysValidFnc)) require.NoError(t, err) peers := make([]peer.ID, 100) @@ -395,7 +395,7 @@ func TestTableFindMultipleBuckets(t *testing.T) { localID := ConvertPeerID(local) m := pstore.NewMetrics() - rt, err := NewRoutingTable(ctx, 5, localID, time.Hour, m, PeerAlwaysValidFnc) + rt, err := NewRoutingTable(ctx, 5, localID, time.Hour, m, PeerValidationFnc(PeerAlwaysValidFnc)) require.NoError(t, err) peers := make([]peer.ID, 100) @@ -510,7 +510,7 @@ func TestTableMultithreaded(t *testing.T) { local := peer.ID("localPeer") m := pstore.NewMetrics() - tab, err := NewRoutingTable(ctx, 20, ConvertPeerID(local), time.Hour, m, PeerAlwaysValidFnc) + tab, err := NewRoutingTable(ctx, 20, ConvertPeerID(local), time.Hour, m, PeerValidationFnc(PeerAlwaysValidFnc)) require.NoError(t, err) var peers []peer.ID for i := 0; i < 500; i++ { @@ -571,23 +571,30 @@ func TestTableCleanup(t *testing.T) { } } - // create RT with a very short cleanup interval - rt, err := NewRoutingTable(ctx, 3, ConvertPeerID(local), time.Hour, pstore.NewMetrics(), PeerAlwaysValidFnc, - TableCleanupInterval(100*time.Millisecond)) - require.NoError(t, err) - // mock peer validation fnc that successfully validates p[1], p[3] & p[5] + var addedCandidatesLk sync.Mutex + addedCandidates := make(map[peer.ID]struct{}) f := func(ctx context.Context, p peer.ID) bool { - cpl := CommonPrefixLen(rt.local, ConvertPeerID(p)) + cpl := CommonPrefixLen(ConvertPeerID(local), ConvertPeerID(p)) if cplPeerMap[cpl][1] == p || cplPeerMap[cpl][3] == p || cplPeerMap[cpl][5] == p { - rt.HandlePeerAlive(p) - return true + // 1 is already in the RT, but 3 & 5 are candidates + if cplPeerMap[cpl][3] == p || cplPeerMap[cpl][5] == p { + addedCandidatesLk.Lock() + addedCandidates[p] = struct{}{} + addedCandidatesLk.Unlock() + } + return true } else { return false } } + // create RT with a very short cleanup interval + rt, err := NewRoutingTable(ctx, 3, ConvertPeerID(local), time.Hour, pstore.NewMetrics(), PeerValidationFnc(f), + TableCleanupInterval(100*time.Millisecond)) + require.NoError(t, err) + // for each CPL, p[0], p[1] & p[2] got the bucket & p[3], p[4] & p[5] become candidates for _, 
peers := range cplPeerMap { for _, p := range peers { @@ -599,9 +606,9 @@ func TestTableCleanup(t *testing.T) { // validate current state rt.tabLock.RLock() require.Len(t, rt.ListPeers(), 6) - ps0 := rt.Buckets[0].peerIds() + ps0 := rt.buckets[0].peerIds() require.Len(t, ps0, 3) - ps1 := rt.Buckets[1].peerIds() + ps1 := rt.buckets[1].peerIds() require.Len(t, ps1, 3) require.Contains(t, ps0, cplPeerMap[0][0]) require.Contains(t, ps0, cplPeerMap[0][1]) @@ -611,10 +618,7 @@ func TestTableCleanup(t *testing.T) { require.Contains(t, ps1, cplPeerMap[1][2]) rt.tabLock.RUnlock() - // now change peer validation fnc - rt.PeerValidationFnc = f - - // now mark p[0],p[1] & p[2] as dead so p[3] & p[5] replace p[0] and p[2] + // now disconnect peers 0, 1 & 2 in both buckets so that, after cleanup validation, each bucket is left with only peer 1 for _, peers := range cplPeerMap { rt.HandlePeerDisconnect(peers[0]) rt.HandlePeerDisconnect(peers[1]) @@ -626,18 +630,23 @@ func TestTableCleanup(t *testing.T) { // verify RT state rt.tabLock.RLock() - require.Len(t, rt.ListPeers(), 6) - ps0 = rt.Buckets[0].peerIds() - require.Len(t, ps0, 3) - ps1 = rt.Buckets[1].peerIds() - require.Len(t, ps1, 3) + require.Len(t, rt.ListPeers(), 2) + ps0 = rt.buckets[0].peerIds() + require.Len(t, ps0, 1) + ps1 = rt.buckets[1].peerIds() + require.Len(t, ps1, 1) require.Contains(t, ps0, cplPeerMap[0][1]) - require.Contains(t, ps0, cplPeerMap[0][3]) - require.Contains(t, ps0, cplPeerMap[0][5]) require.Contains(t, ps1, cplPeerMap[1][1]) - require.Contains(t, ps1, cplPeerMap[1][3]) - require.Contains(t, ps1, cplPeerMap[1][5]) rt.tabLock.RUnlock() + + // verify candidate state + addedCandidatesLk.Lock() + require.Len(t, addedCandidates, 4) + require.Contains(t, addedCandidates, cplPeerMap[0][3]) + require.Contains(t, addedCandidates, cplPeerMap[0][5]) + require.Contains(t, addedCandidates, cplPeerMap[1][3]) + require.Contains(t, addedCandidates, cplPeerMap[1][5]) + addedCandidatesLk.Unlock() } func BenchmarkHandlePeerAlive(b *testing.B) { @@ -645,7 +654,7 @@ func BenchmarkHandlePeerAlive(b *testing.B) { b.StopTimer() local := ConvertKey("localKey") m := pstore.NewMetrics() - tab, err := NewRoutingTable(ctx, 20, local, time.Hour, m, PeerAlwaysValidFnc) + tab, err := NewRoutingTable(ctx, 20, local, time.Hour, m, PeerValidationFnc(PeerAlwaysValidFnc)) require.NoError(b, err) var peers []peer.ID @@ -664,7 +673,7 @@ func BenchmarkFinds(b *testing.B) { b.StopTimer() local := ConvertKey("localKey") m := pstore.NewMetrics() - tab, err := NewRoutingTable(ctx, 20, local, time.Hour, m, PeerAlwaysValidFnc) + tab, err := NewRoutingTable(ctx, 20, local, time.Hour, m, PeerValidationFnc(PeerAlwaysValidFnc)) require.NoError(b, err) var peers []peer.ID From acf009a77dab7fbccc08abd89696815cde14f44b Mon Sep 17 00:00:00 2001 From: Aarsh Shah Date: Tue, 18 Feb 2020 12:36:17 +0530 Subject: [PATCH 3/5] implement idiomatic RT shutdown --- go.mod | 1 + table.go | 24 +++++++++++++++++------- table_test.go | 49 ++++++++++++++++++------------------------------- 3 files changed, 36 insertions(+), 38 deletions(-) diff --git a/go.mod b/go.mod index c4125e8..1af884a 100644 --- a/go.mod +++ b/go.mod @@ -3,6 +3,7 @@ module github.com/libp2p/go-libp2p-kbucket require ( github.com/ipfs/go-ipfs-util v0.0.1 github.com/ipfs/go-log v0.0.1 + github.com/jbenet/goprocess v0.1.3 github.com/libp2p/go-libp2p-core v0.3.0 github.com/libp2p/go-libp2p-peerstore v0.1.4 github.com/minio/sha256-simd v0.1.1 diff --git a/table.go b/table.go index d8c4c57..2f2e297 100644 --- a/table.go +++ b/table.go @@ -14,6 +14,8
@@ import ( "github.com/libp2p/go-libp2p-core/peerstore" logging "github.com/ipfs/go-log" + "github.com/jbenet/goprocess" + goprocessctx "github.com/jbenet/goprocess/context" mh "github.com/multiformats/go-multihash" ) @@ -43,7 +45,6 @@ type CplRefresh struct { // RoutingTable defines the routing table. type RoutingTable struct { ctx context.Context - // ID of the local peer local ID @@ -79,11 +80,13 @@ type RoutingTable struct { tableCleanupInterval time.Duration // function to select peers that need to be validated peersForValidationFnc PeerSelectionFunc + + proc goprocess.Process } // NewRoutingTable creates a new routing table with a given bucketsize, local ID, and latency tolerance. // Passing a nil PeerValidationFunc disables periodic table cleanup. -func NewRoutingTable(ctx context.Context, bucketsize int, localID ID, latency time.Duration, m peerstore.Metrics, +func NewRoutingTable(bucketsize int, localID ID, latency time.Duration, m peerstore.Metrics, options ...Option) (*RoutingTable, error) { var cfg Options @@ -92,7 +95,7 @@ func NewRoutingTable(ctx context.Context, bucketsize int, localID ID, latency ti } rt := &RoutingTable{ - ctx: ctx, + ctx: context.Background(), buckets: []*bucket{newBucket()}, bucketsize: bucketsize, local: localID, @@ -112,16 +115,23 @@ func NewRoutingTable(ctx context.Context, bucketsize int, localID ID, latency ti } rt.cplReplacementCache = newCplReplacementCache(rt.local, rt.bucketsize) + rt.proc = goprocessctx.WithContext(rt.ctx) // schedule periodic RT cleanup if peer validation function has been passed if rt.peerValidationFnc != nil { - go rt.cleanup() + rt.proc.Go(rt.cleanup) } return rt, nil } -func (rt *RoutingTable) cleanup() { +// Close shuts down the Routing Table & all associated processes. +// It is safe to call this multiple times. 
+func (rt *RoutingTable) Close() error { + return rt.proc.Close() +} + +func (rt *RoutingTable) cleanup(proc goprocess.Process) { validatePeerF := func(p peer.ID) bool { queryCtx, cancel := context.WithTimeout(rt.ctx, rt.peerValidationTimeout) defer cancel() @@ -132,8 +142,6 @@ func (rt *RoutingTable) cleanup() { defer cleanupTickr.Stop() for { select { - case <-rt.ctx.Done(): - return case <-cleanupTickr.C: ps := rt.peersToValidate() for _, pinfo := range ps { @@ -170,6 +178,8 @@ func (rt *RoutingTable) cleanup() { log.Infof("failed to replace missing peer=%s as all candidates were invalid", pinfo.Id) } } + case <-proc.Closing(): + return } } } diff --git a/table_test.go b/table_test.go index 5d8b150..c72d289 100644 --- a/table_test.go +++ b/table_test.go @@ -70,11 +70,10 @@ func TestBucket(t *testing.T) { func TestGenRandPeerID(t *testing.T) { t.Parallel() - ctx := context.Background() local := test.RandPeerIDFatal(t) m := pstore.NewMetrics() - rt, err := NewRoutingTable(ctx, 1, ConvertPeerID(local), time.Hour, m, PeerValidationFnc(PeerAlwaysValidFnc)) + rt, err := NewRoutingTable(1, ConvertPeerID(local), time.Hour, m, PeerValidationFnc(PeerAlwaysValidFnc)) require.NoError(t, err) // generate above maxCplForRefresh fails @@ -93,11 +92,9 @@ func TestGenRandPeerID(t *testing.T) { func TestRefreshAndGetTrackedCpls(t *testing.T) { t.Parallel() - ctx := context.Background() - local := test.RandPeerIDFatal(t) m := pstore.NewMetrics() - rt, err := NewRoutingTable(ctx, 1, ConvertPeerID(local), time.Hour, m, PeerValidationFnc(PeerAlwaysValidFnc)) + rt, err := NewRoutingTable(1, ConvertPeerID(local), time.Hour, m, PeerValidationFnc(PeerAlwaysValidFnc)) require.NoError(t, err) // push cpl's for tracking @@ -123,11 +120,10 @@ func TestRefreshAndGetTrackedCpls(t *testing.T) { func TestHandlePeerDead(t *testing.T) { t.Parallel() - ctx := context.Background() local := test.RandPeerIDFatal(t) m := pstore.NewMetrics() - rt, err := NewRoutingTable(ctx, 2, ConvertPeerID(local), time.Hour, m, PeerValidationFnc(PeerAlwaysValidFnc)) + rt, err := NewRoutingTable(2, ConvertPeerID(local), time.Hour, m, PeerValidationFnc(PeerAlwaysValidFnc)) require.NoError(t, err) // push 3 peers -> 2 for the first bucket, and 1 as candidates @@ -161,11 +157,10 @@ func TestHandlePeerDead(t *testing.T) { func TestTableCallbacks(t *testing.T) { t.Parallel() - ctx := context.Background() local := test.RandPeerIDFatal(t) m := pstore.NewMetrics() - rt, err := NewRoutingTable(ctx, 10, ConvertPeerID(local), time.Hour, m, PeerValidationFnc(PeerAlwaysValidFnc)) + rt, err := NewRoutingTable(10, ConvertPeerID(local), time.Hour, m, PeerValidationFnc(PeerAlwaysValidFnc)) require.NoError(t, err) peers := make([]peer.ID, 100) @@ -210,11 +205,10 @@ func TestTableCallbacks(t *testing.T) { func TestHandlePeerDisconnect(t *testing.T) { t.Parallel() - ctx := context.Background() local := test.RandPeerIDFatal(t) m := pstore.NewMetrics() - rt, err := NewRoutingTable(ctx, 10, ConvertPeerID(local), time.Hour, m, PeerValidationFnc(PeerAlwaysValidFnc)) + rt, err := NewRoutingTable(10, ConvertPeerID(local), time.Hour, m, PeerValidationFnc(PeerAlwaysValidFnc)) require.NoError(t, err) p := test.RandPeerIDFatal(t) @@ -241,11 +235,10 @@ func TestHandlePeerDisconnect(t *testing.T) { // Right now, this just makes sure that it doesnt hang or crash func TestHandlePeerAlive(t *testing.T) { t.Parallel() - ctx := context.Background() local := test.RandPeerIDFatal(t) m := pstore.NewMetrics() - rt, err := NewRoutingTable(ctx, 10, ConvertPeerID(local), time.Hour, m, 
PeerValidationFnc(PeerAlwaysValidFnc)) + rt, err := NewRoutingTable(10, ConvertPeerID(local), time.Hour, m, PeerValidationFnc(PeerAlwaysValidFnc)) require.NoError(t, err) peers := make([]peer.ID, 100) @@ -269,11 +262,10 @@ func TestHandlePeerAlive(t *testing.T) { func TestTableFind(t *testing.T) { t.Parallel() - ctx := context.Background() local := test.RandPeerIDFatal(t) m := pstore.NewMetrics() - rt, err := NewRoutingTable(ctx, 10, ConvertPeerID(local), time.Hour, m, PeerValidationFnc(PeerAlwaysValidFnc)) + rt, err := NewRoutingTable(10, ConvertPeerID(local), time.Hour, m, PeerValidationFnc(PeerAlwaysValidFnc)) require.NoError(t, err) peers := make([]peer.ID, 100) @@ -291,11 +283,10 @@ func TestTableFind(t *testing.T) { func TestCandidateAddition(t *testing.T) { t.Parallel() - ctx := context.Background() local := test.RandPeerIDFatal(t) m := pstore.NewMetrics() - rt, err := NewRoutingTable(ctx, 3, ConvertPeerID(local), time.Hour, m, PeerValidationFnc(PeerAlwaysValidFnc)) + rt, err := NewRoutingTable(3, ConvertPeerID(local), time.Hour, m, PeerValidationFnc(PeerAlwaysValidFnc)) require.NoError(t, err) // generate 6 peers for the first bucket, 3 to push to it, and 3 as candidates @@ -322,11 +313,10 @@ func TestCandidateAddition(t *testing.T) { func TestTableEldestPreferred(t *testing.T) { t.Parallel() - ctx := context.Background() local := test.RandPeerIDFatal(t) m := pstore.NewMetrics() - rt, err := NewRoutingTable(ctx, 10, ConvertPeerID(local), time.Hour, m, PeerValidationFnc(PeerAlwaysValidFnc)) + rt, err := NewRoutingTable(10, ConvertPeerID(local), time.Hour, m, PeerValidationFnc(PeerAlwaysValidFnc)) require.NoError(t, err) // generate size + 1 peers to saturate a bucket @@ -355,11 +345,10 @@ func TestTableEldestPreferred(t *testing.T) { func TestTableFindMultiple(t *testing.T) { t.Parallel() - ctx := context.Background() local := test.RandPeerIDFatal(t) m := pstore.NewMetrics() - rt, err := NewRoutingTable(ctx, 20, ConvertPeerID(local), time.Hour, m, PeerValidationFnc(PeerAlwaysValidFnc)) + rt, err := NewRoutingTable(20, ConvertPeerID(local), time.Hour, m, PeerValidationFnc(PeerAlwaysValidFnc)) require.NoError(t, err) peers := make([]peer.ID, 100) @@ -389,13 +378,12 @@ func assertSortedPeerIdsEqual(t *testing.T, a, b []peer.ID) { func TestTableFindMultipleBuckets(t *testing.T) { t.Parallel() - ctx := context.Background() local := test.RandPeerIDFatal(t) localID := ConvertPeerID(local) m := pstore.NewMetrics() - rt, err := NewRoutingTable(ctx, 5, localID, time.Hour, m, PeerValidationFnc(PeerAlwaysValidFnc)) + rt, err := NewRoutingTable(5, localID, time.Hour, m, PeerValidationFnc(PeerAlwaysValidFnc)) require.NoError(t, err) peers := make([]peer.ID, 100) @@ -506,11 +494,10 @@ func TestTableFindMultipleBuckets(t *testing.T) { // and set GOMAXPROCS above 1 func TestTableMultithreaded(t *testing.T) { t.Parallel() - ctx := context.Background() local := peer.ID("localPeer") m := pstore.NewMetrics() - tab, err := NewRoutingTable(ctx, 20, ConvertPeerID(local), time.Hour, m, PeerValidationFnc(PeerAlwaysValidFnc)) + tab, err := NewRoutingTable(20, ConvertPeerID(local), time.Hour, m, PeerValidationFnc(PeerAlwaysValidFnc)) require.NoError(t, err) var peers []peer.ID for i := 0; i < 500; i++ { @@ -548,7 +535,6 @@ func TestTableMultithreaded(t *testing.T) { func TestTableCleanup(t *testing.T) { t.Parallel() - ctx := context.Background() local := test.RandPeerIDFatal(t) // Generate: @@ -591,7 +577,7 @@ func TestTableCleanup(t *testing.T) { } // create RT with a very short cleanup interval - rt, err := 
NewRoutingTable(ctx, 3, ConvertPeerID(local), time.Hour, pstore.NewMetrics(), PeerValidationFnc(f), + rt, err := NewRoutingTable(3, ConvertPeerID(local), time.Hour, pstore.NewMetrics(), PeerValidationFnc(f), TableCleanupInterval(100*time.Millisecond)) require.NoError(t, err) @@ -647,14 +633,16 @@ func TestTableCleanup(t *testing.T) { require.Contains(t, addedCandidates, cplPeerMap[1][3]) require.Contains(t, addedCandidates, cplPeerMap[1][5]) addedCandidatesLk.Unlock() + + // close RT + require.NoError(t, rt.Close()) } func BenchmarkHandlePeerAlive(b *testing.B) { - ctx := context.Background() b.StopTimer() local := ConvertKey("localKey") m := pstore.NewMetrics() - tab, err := NewRoutingTable(ctx, 20, local, time.Hour, m, PeerValidationFnc(PeerAlwaysValidFnc)) + tab, err := NewRoutingTable(20, local, time.Hour, m, PeerValidationFnc(PeerAlwaysValidFnc)) require.NoError(b, err) var peers []peer.ID @@ -669,11 +657,10 @@ func BenchmarkHandlePeerAlive(b *testing.B) { } func BenchmarkFinds(b *testing.B) { - ctx := context.Background() b.StopTimer() local := ConvertKey("localKey") m := pstore.NewMetrics() - tab, err := NewRoutingTable(ctx, 20, local, time.Hour, m, PeerValidationFnc(PeerAlwaysValidFnc)) + tab, err := NewRoutingTable(20, local, time.Hour, m, PeerValidationFnc(PeerAlwaysValidFnc)) require.NoError(b, err) var peers []peer.ID From 6cceecfeb50243c6bde852e7ebb2a56a879be68d Mon Sep 17 00:00:00 2001 From: Aarsh Shah Date: Wed, 19 Feb 2020 20:25:00 +0530 Subject: [PATCH 4/5] do not export options --- options.go | 54 +++++++++++++++++++++++++++--------------------------- table.go | 14 +++++++------- 2 files changed, 34 insertions(+), 34 deletions(-) diff --git a/options.go b/options.go index 4e1bdfa..f2bbeaf 100644 --- a/options.go +++ b/options.go @@ -5,21 +5,21 @@ import ( "time" ) -// Option is the Routing Table functional option type. -type Option func(*Options) error +// option is the Routing Table functional option type. +type option func(*options) error -// Options is a structure containing all the functional options that can be used when constructing a Routing Table. -type Options struct { - TableCleanup struct { - PeerValidationFnc PeerValidationFunc - PeersForValidationFnc PeerSelectionFunc - PeerValidationTimeout time.Duration - Interval time.Duration +// options is a structure containing all the functional options that can be used when constructing a Routing Table. +type options struct { + tableCleanup struct { + peerValidationFnc PeerValidationFunc + peersForValidationFnc PeerSelectionFunc + peerValidationTimeout time.Duration + interval time.Duration } } -// Apply applies the given options to this Option. -func (o *Options) Apply(opts ...Option) error { +// Apply applies the given options to this option. +func (o *options) Apply(opts ...option) error { for i, opt := range opts { if err := opt(o); err != nil { return fmt.Errorf("routing table option %d failed: %s", i, err) @@ -30,45 +30,45 @@ func (o *Options) Apply(opts ...Option) error { // PeerValidationFnc configures the Peer Validation function used for RT cleanup. // Not configuring this disables Routing Table cleanup. -func PeerValidationFnc(f PeerValidationFunc) Option { - return func(o *Options) error { - o.TableCleanup.PeerValidationFnc = f +func PeerValidationFnc(f PeerValidationFunc) option { + return func(o *options) error { + o.tableCleanup.peerValidationFnc = f return nil } } // PeersForValidationFnc configures the function that will be used to select the peers that need to be validated during cleanup. 
-func PeersForValidationFnc(f PeerSelectionFunc) Option { - return func(o *Options) error { - o.TableCleanup.PeersForValidationFnc = f +func PeersForValidationFnc(f PeerSelectionFunc) option { + return func(o *options) error { + o.tableCleanup.peersForValidationFnc = f return nil } } // TableCleanupInterval configures the interval between two runs of the Routing Table cleanup routine. -func TableCleanupInterval(i time.Duration) Option { - return func(o *Options) error { - o.TableCleanup.Interval = i +func TableCleanupInterval(i time.Duration) option { + return func(o *options) error { + o.tableCleanup.interval = i return nil } } // PeerValidationTimeout sets the timeout for a single peer validation during cleanup. -func PeerValidationTimeout(timeout time.Duration) Option { - return func(o *Options) error { - o.TableCleanup.PeerValidationTimeout = timeout +func PeerValidationTimeout(timeout time.Duration) option { + return func(o *options) error { + o.tableCleanup.peerValidationTimeout = timeout return nil } } // Defaults are the default options. This option will be automatically // prepended to any options you pass to the Routing Table constructor. -var Defaults = func(o *Options) error { - o.TableCleanup.PeerValidationTimeout = 30 * time.Second - o.TableCleanup.Interval = 2 * time.Minute +var Defaults = func(o *options) error { + o.tableCleanup.peerValidationTimeout = 30 * time.Second + o.tableCleanup.interval = 2 * time.Minute // default selector function selects all peers that are in missing state. - o.TableCleanup.PeersForValidationFnc = func(peers []PeerInfo) []PeerInfo { + o.tableCleanup.peersForValidationFnc = func(peers []PeerInfo) []PeerInfo { var selectedPeers []PeerInfo for _, p := range peers { if p.State == PeerStateMissing { diff --git a/table.go b/table.go index 2f2e297..a605c54 100644 --- a/table.go +++ b/table.go @@ -87,10 +87,10 @@ type RoutingTable struct { // NewRoutingTable creates a new routing table with a given bucketsize, local ID, and latency tolerance. // Passing a nil PeerValidationFunc disables periodic table cleanup. 
func NewRoutingTable(bucketsize int, localID ID, latency time.Duration, m peerstore.Metrics, - options ...Option) (*RoutingTable, error) { + opts ...option) (*RoutingTable, error) { - var cfg Options - if err := cfg.Apply(append([]Option{Defaults}, options...)...); err != nil { + var cfg options + if err := cfg.Apply(append([]option{Defaults}, opts...)...); err != nil { return nil, err } @@ -108,10 +108,10 @@ func NewRoutingTable(bucketsize int, localID ID, latency time.Duration, m peerst PeerRemoved: func(peer.ID) {}, PeerAdded: func(peer.ID) {}, - peerValidationFnc: cfg.TableCleanup.PeerValidationFnc, - peersForValidationFnc: cfg.TableCleanup.PeersForValidationFnc, - peerValidationTimeout: cfg.TableCleanup.PeerValidationTimeout, - tableCleanupInterval: cfg.TableCleanup.Interval, + peerValidationFnc: cfg.tableCleanup.peerValidationFnc, + peersForValidationFnc: cfg.tableCleanup.peersForValidationFnc, + peerValidationTimeout: cfg.tableCleanup.peerValidationTimeout, + tableCleanupInterval: cfg.tableCleanup.interval, } rt.cplReplacementCache = newCplReplacementCache(rt.local, rt.bucketsize) From 5e0665910af372583700ea800ed86aed88410625 Mon Sep 17 00:00:00 2001 From: Aarsh Shah Date: Thu, 20 Feb 2020 13:11:11 +0530 Subject: [PATCH 5/5] remove locking from buckets --- bucket.go | 24 ++++-------------------- table.go | 2 -- 2 files changed, 4 insertions(+), 22 deletions(-) diff --git a/bucket.go b/bucket.go index f556e81..4165cd4 100644 --- a/bucket.go +++ b/bucket.go @@ -4,8 +4,6 @@ package kbucket import ( "container/list" - "sync" - "github.com/libp2p/go-libp2p-core/peer" ) @@ -26,8 +24,11 @@ type PeerInfo struct { } // bucket holds a list of peers. +// we synchronize on the Routing Table lock for all access to the bucket +// and so do not need any locks in the bucket. +// if we want/need to avoid locking the table for accessing a bucket in the future, +// it WILL be the caller's responsibility to synchronize all access to a bucket. type bucket struct { - lk sync.RWMutex list *list.List } @@ -40,8 +41,6 @@ func newBucket() *bucket { // returns all peers in the bucket // it is safe for the caller to modify the returned objects as it is a defensive copy func (b *bucket) peers() []PeerInfo { - b.lk.RLock() - defer b.lk.RUnlock() var ps []PeerInfo for e := b.list.Front(); e != nil; e = e.Next() { p := e.Value.(*PeerInfo) @@ -52,8 +51,6 @@ func (b *bucket) peers() []PeerInfo { // return the Ids of all the peers in the bucket. func (b *bucket) peerIds() []peer.ID { - b.lk.RLock() - defer b.lk.RUnlock() ps := make([]peer.ID, 0, b.list.Len()) for e := b.list.Front(); e != nil; e = e.Next() { p := e.Value.(*PeerInfo) @@ -65,8 +62,6 @@ func (b *bucket) peerIds() []peer.ID { // returns the peer with the given Id if it exists // returns nil if the peerId does not exist func (b *bucket) getPeer(p peer.ID) *PeerInfo { - b.lk.RLock() - defer b.lk.RUnlock() for e := b.list.Front(); e != nil; e = e.Next() { if e.Value.(*PeerInfo).Id == p { return e.Value.(*PeerInfo) @@ -78,8 +73,6 @@ func (b *bucket) getPeer(p peer.ID) *PeerInfo { // removes the peer with the given Id from the bucket. // returns true if successful, false otherwise. 
func (b *bucket) remove(id peer.ID) bool { - b.lk.Lock() - defer b.lk.Unlock() for e := b.list.Front(); e != nil; e = e.Next() { if e.Value.(*PeerInfo).Id == id { b.list.Remove(e) @@ -90,8 +83,6 @@ func (b *bucket) remove(id peer.ID) bool { } func (b *bucket) moveToFront(id peer.ID) { - b.lk.Lock() - defer b.lk.Unlock() for e := b.list.Front(); e != nil; e = e.Next() { if e.Value.(*PeerInfo).Id == id { @@ -101,14 +92,10 @@ func (b *bucket) moveToFront(id peer.ID) { } func (b *bucket) pushFront(p *PeerInfo) { - b.lk.Lock() b.list.PushFront(p) - b.lk.Unlock() } func (b *bucket) len() int { - b.lk.RLock() - defer b.lk.RUnlock() return b.list.Len() } @@ -116,9 +103,6 @@ func (b *bucket) len() int { // peers with CPL equal to cpl, the returned bucket will have peers with CPL // greater than cpl (returned bucket has closer peers) func (b *bucket) split(cpl int, target ID) *bucket { - b.lk.Lock() - defer b.lk.Unlock() - out := list.New() newbuck := newBucket() newbuck.list = out diff --git a/table.go b/table.go index a605c54..0e1cfa9 100644 --- a/table.go +++ b/table.go @@ -458,12 +458,10 @@ func (rt *RoutingTable) Print() { for i, b := range rt.buckets { fmt.Printf("\tbucket: %d\n", i) - b.lk.RLock() for e := b.list.Front(); e != nil; e = e.Next() { p := e.Value.(peer.ID) fmt.Printf("\t\t- %s %s\n", p.Pretty(), rt.metrics.LatencyEWMA(p).String()) } - b.lk.RUnlock() } rt.tabLock.RUnlock() }
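Taken together, the series changes how a routing table is constructed and shut down: NewRoutingTable no longer takes a context, periodic cleanup only runs when a PeerValidationFnc option is supplied, and the cleanup goprocess is stopped via Close(). The following is a minimal caller-side sketch against the final state of the series; the package aliases (kb, pstore), the placeholder peer IDs, and the interval/timeout values are illustrative assumptions, not defaults or requirements.

package main

import (
	"context"
	"time"

	"github.com/libp2p/go-libp2p-core/peer"
	kb "github.com/libp2p/go-libp2p-kbucket"
	pstore "github.com/libp2p/go-libp2p-peerstore" // assumed alias/path, matching the pstore.NewMetrics() used in the tests above
)

func main() {
	local := peer.ID("localPeer") // placeholder local peer for the sketch

	// A real DHT would dial or ping the peer here; this stub treats every peer as alive.
	alwaysAlive := func(ctx context.Context, p peer.ID) bool { return true }

	rt, err := kb.NewRoutingTable(20, kb.ConvertPeerID(local), time.Hour, pstore.NewMetrics(),
		kb.PeerValidationFnc(alwaysAlive),       // enables the periodic cleanup routine
		kb.TableCleanupInterval(10*time.Minute), // illustrative values, not the defaults
		kb.PeerValidationTimeout(30*time.Second),
	)
	if err != nil {
		panic(err)
	}
	// Close stops the cleanup goprocess started by the constructor.
	defer rt.Close()

	p := peer.ID("remotePeer")
	rt.HandlePeerAlive(p)      // add p, or queue it as a replacement candidate if its bucket is full
	rt.HandlePeerDisconnect(p) // keep p in the table but mark it missing; cleanup will validate it later
	rt.HandlePeerDead(p)       // drop p from the table immediately
}

In a real DHT, the validation function would typically attempt to dial the peer, and HandlePeerAlive / HandlePeerDisconnect / HandlePeerDead would be driven by connection notifications rather than called directly.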