Skip to content
This repository has been archived by the owner on Aug 19, 2022. It is now read-only.

Commit

Permalink
Merge pull request #200 from libp2p/marco/with-clock-2
Browse files Browse the repository at this point in the history
feat: Use a clock interface in pstoreds as well
  • Loading branch information
MarcoPolo authored May 27, 2022
2 parents 3bf1f8c + 1f173bd commit 9f3a96b
Show file tree
Hide file tree
Showing 10 changed files with 123 additions and 56 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ go 1.17
retract v0.2.9 // Contains backwards-incompatible changes. Use v0.3.0 instead.

require (
github.com/benbjohnson/clock v1.3.0
github.com/gogo/protobuf v1.3.2
github.com/hashicorp/golang-lru v0.5.4
github.com/ipfs/go-datastore v0.5.0
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ github.com/OneOfOne/xxhash v1.2.2 h1:KMrpdQIwFcEqXDklaen+P1axHaj9BSKzvpUUfnHldSE
github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU=
github.com/aead/siphash v1.0.1/go.mod h1:Nywa3cDsYNNK3gaciGTWPwHt0wlpNV15vwmswBAUSII=
github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5doyWs3UAsr3K4I6qtAmlQcZDesFNEHPZAzj8=
github.com/benbjohnson/clock v1.3.0 h1:ip6w0uFQkncKQ979AypyG0ER7mqUSBdKLOgAle/AT8A=
github.com/benbjohnson/clock v1.3.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA=
github.com/btcsuite/btcd v0.20.1-beta h1:Ik4hyJqN8Jfyv3S4AGBOmyouMsYE3EdYODkMbQjwPGw=
github.com/btcsuite/btcd v0.20.1-beta/go.mod h1:wVuoA8VJLEcwgqHBwHmzLRazpKxTv13Px/pDuV7OomQ=
github.com/btcsuite/btclog v0.0.0-20170628155309-84c8d2346e9f/go.mod h1:TdznJufoqS23FtqVCzL0ZqgP5MqXbb4fg/WgDys70nA=
Expand Down
44 changes: 33 additions & 11 deletions pstoreds/addr_book.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,11 +83,11 @@ func (r *addrsRecord) flush(write ds.Write) (err error) {
// * after an entry has been modified (e.g. addresses have been added or removed, TTLs updated, etc.)
//
// If the return value is true, the caller should perform a flush immediately to sync the record with the store.
func (r *addrsRecord) clean() (chgd bool) {
now := time.Now().Unix()
func (r *addrsRecord) clean(now time.Time) (chgd bool) {
nowUnix := now.Unix()
addrsLen := len(r.Addrs)

if !r.dirty && !r.hasExpiredAddrs(now) {
if !r.dirty && !r.hasExpiredAddrs(nowUnix) {
// record is not dirty, and we have no expired entries to purge.
return false
}
Expand All @@ -104,7 +104,7 @@ func (r *addrsRecord) clean() (chgd bool) {
})
}

r.Addrs = removeExpired(r.Addrs, now)
r.Addrs = removeExpired(r.Addrs, nowUnix)

return r.dirty || len(r.Addrs) != addrsLen
}
Expand Down Expand Up @@ -144,6 +144,23 @@ type dsAddrBook struct {
// controls children goroutine lifetime.
childrenDone sync.WaitGroup
cancelFn func()

clock clock
}

type clock interface {
Now() time.Time
After(d time.Duration) <-chan time.Time
}

type realclock struct{}

func (rc realclock) Now() time.Time {
return time.Now()
}

func (rc realclock) After(d time.Duration) <-chan time.Time {
return time.After(d)
}

var _ pstore.AddrBook = (*dsAddrBook)(nil)
Expand Down Expand Up @@ -176,6 +193,11 @@ func NewAddrBook(ctx context.Context, store ds.Batching, opts Options) (ab *dsAd
opts: opts,
cancelFn: cancelFn,
subsManager: pstoremem.NewAddrSubManager(),
clock: realclock{},
}

if opts.Clock != nil {
ab.clock = opts.Clock
}

if opts.CacheSize > 0 {
Expand Down Expand Up @@ -212,7 +234,7 @@ func (ab *dsAddrBook) loadRecord(id peer.ID, cache bool, update bool) (pr *addrs
pr.Lock()
defer pr.Unlock()

if pr.clean() && update {
if pr.clean(ab.clock.Now()) && update {
err = pr.flush(ab.ds)
}
return pr, err
Expand All @@ -231,7 +253,7 @@ func (ab *dsAddrBook) loadRecord(id peer.ID, cache bool, update bool) (pr *addrs
return nil, err
}
// this record is new and local for now (not in cache), so we don't need to lock.
if pr.clean() && update {
if pr.clean(ab.clock.Now()) && update {
err = pr.flush(ab.ds)
}
default:
Expand Down Expand Up @@ -383,7 +405,7 @@ func (ab *dsAddrBook) UpdateAddrs(p peer.ID, oldTTL time.Duration, newTTL time.D
pr.Lock()
defer pr.Unlock()

newExp := time.Now().Add(newTTL).Unix()
newExp := ab.clock.Now().Add(newTTL).Unix()
for _, entry := range pr.Addrs {
if entry.Ttl != int64(oldTTL) {
continue
Expand All @@ -392,7 +414,7 @@ func (ab *dsAddrBook) UpdateAddrs(p peer.ID, oldTTL time.Duration, newTTL time.D
pr.dirty = true
}

if pr.clean() {
if pr.clean(ab.clock.Now()) {
pr.flush(ab.ds)
}
}
Expand Down Expand Up @@ -461,7 +483,7 @@ func (ab *dsAddrBook) setAddrs(p peer.ID, addrs []ma.Multiaddr, ttl time.Duratio
// return nil
// }

newExp := time.Now().Add(ttl).Unix()
newExp := ab.clock.Now().Add(ttl).Unix()
addrsMap := make(map[string]*pb.AddrBookRecord_AddrEntry, len(pr.Addrs))
for _, addr := range pr.Addrs {
addrsMap[string(addr.Addr.Bytes())] = addr
Expand Down Expand Up @@ -521,7 +543,7 @@ func (ab *dsAddrBook) setAddrs(p peer.ID, addrs []ma.Multiaddr, ttl time.Duratio
// }

pr.dirty = true
pr.clean()
pr.clean(ab.clock.Now())
return pr.flush(ab.ds)
}

Expand Down Expand Up @@ -567,7 +589,7 @@ func (ab *dsAddrBook) deleteAddrs(p peer.ID, addrs []ma.Multiaddr) (err error) {
pr.Addrs = deleteInPlace(pr.Addrs, addrs)

pr.dirty = true
pr.clean()
pr.clean(ab.clock.Now())
return pr.flush(ab.ds)
}

Expand Down
36 changes: 29 additions & 7 deletions pstoreds/addr_book_gc.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ var (
// queries
purgeLookaheadQuery = query.Query{
Prefix: gcLookaheadBase.String(),
Orders: []query.Order{query.OrderByKey{}},
Orders: []query.Order{query.OrderByFunction(orderByTimestampInKey)},
KeysOnly: true,
}

Expand Down Expand Up @@ -95,7 +95,7 @@ func (gc *dsAddrBookGc) background() {
defer gc.ab.childrenDone.Done()

select {
case <-time.After(gc.ab.opts.GCInitialDelay):
case <-gc.ab.clock.After(gc.ab.opts.GCInitialDelay):
case <-gc.ab.ctx.Done():
// yield if we have been cancelled/closed before the delay elapses.
return
Expand Down Expand Up @@ -180,7 +180,7 @@ func (gc *dsAddrBookGc) purgeLookahead() {
}
defer results.Close()

now := time.Now().Unix()
now := gc.ab.clock.Now().Unix()

// keys: /peers/gc/addrs/<unix timestamp of next visit>/<peer ID b32>
// values: nil
Expand Down Expand Up @@ -214,7 +214,7 @@ func (gc *dsAddrBookGc) purgeLookahead() {
if e, ok := gc.ab.cache.Peek(id); ok {
cached := e.(*addrsRecord)
cached.Lock()
if cached.clean() {
if cached.clean(gc.ab.clock.Now()) {
if err = cached.flush(batch); err != nil {
log.Warnf("failed to flush entry modified by GC for peer: &v, err: %v", id.Pretty(), err)
}
Expand All @@ -239,7 +239,7 @@ func (gc *dsAddrBookGc) purgeLookahead() {
dropInError(gcKey, err, "unmarshalling entry")
continue
}
if record.clean() {
if record.clean(gc.ab.clock.Now()) {
err = record.flush(batch)
if err != nil {
log.Warnf("failed to flush entry modified by GC for peer: &v, err: %v", id.Pretty(), err)
Expand Down Expand Up @@ -284,7 +284,7 @@ func (gc *dsAddrBookGc) purgeStore() {
}

id := record.Id.ID
if !record.clean() {
if !record.clean(gc.ab.clock.Now()) {
continue
}

Expand Down Expand Up @@ -317,7 +317,7 @@ func (gc *dsAddrBookGc) populateLookahead() {
return
}

until := time.Now().Add(gc.ab.opts.GCLookaheadInterval).Unix()
until := gc.ab.clock.Now().Add(gc.ab.opts.GCLookaheadInterval).Unix()

var id peer.ID
record := &addrsRecord{AddrBookRecord: &pb.AddrBookRecord{}}
Expand Down Expand Up @@ -386,3 +386,25 @@ func (gc *dsAddrBookGc) populateLookahead() {

gc.currWindowEnd = until
}

// orderByTimestampInKey orders the results by comparing the timestamp in the
// key. A lexiographic sort by itself is wrong since "10" is less than "2", but
// as an int 2 is obviously less than 10.
func orderByTimestampInKey(a, b query.Entry) int {
aKey := ds.RawKey(a.Key)
aInt, err := strconv.ParseInt(aKey.Parent().Name(), 10, 64)
if err != nil {
return -1
}
bKey := ds.RawKey(b.Key)
bInt, err := strconv.ParseInt(bKey.Parent().Name(), 10, 64)
if err != nil {
return -1
}
if aInt < bInt {
return -1
} else if aInt == bInt {
return 0
}
return 1
}
19 changes: 13 additions & 6 deletions pstoreds/addr_book_gc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"testing"
"time"

mockClock "github.com/benbjohnson/clock"
query "github.com/ipfs/go-datastore/query"
pstore "github.com/libp2p/go-libp2p-core/peerstore"
test "github.com/libp2p/go-libp2p-peerstore/test"
Expand Down Expand Up @@ -90,6 +91,8 @@ func TestGCPurging(t *testing.T) {
opts.GCInitialDelay = 90 * time.Hour
opts.GCLookaheadInterval = 20 * time.Second
opts.GCPurgeInterval = 1 * time.Second
clk := mockClock.NewMock()
opts.Clock = clk

factory := addressBookFactory(t, leveldbStore, opts)
ab, closeFn := factory()
Expand Down Expand Up @@ -120,7 +123,7 @@ func TestGCPurging(t *testing.T) {
t.Errorf("expected 4 GC lookahead entries, got: %v", i)
}

<-time.After(2 * time.Second)
clk.Add(2 * time.Second)
gc.purgeLookahead()
if i := tp.countLookaheadEntries(); i != 3 {
t.Errorf("expected 3 GC lookahead entries, got: %v", i)
Expand All @@ -129,13 +132,13 @@ func TestGCPurging(t *testing.T) {
// Purge the cache, to exercise a different path in the purge cycle.
tp.clearCache()

<-time.After(5 * time.Second)
clk.Add(5 * time.Second)
gc.purgeLookahead()
if i := tp.countLookaheadEntries(); i != 3 {
t.Errorf("expected 3 GC lookahead entries, got: %v", i)
}

<-time.After(5 * time.Second)
clk.Add(5 * time.Second)
gc.purgeLookahead()
if i := tp.countLookaheadEntries(); i != 1 {
t.Errorf("expected 1 GC lookahead entries, got: %v", i)
Expand All @@ -157,6 +160,8 @@ func TestGCDelay(t *testing.T) {
opts.GCInitialDelay = 2 * time.Second
opts.GCLookaheadInterval = 1 * time.Minute
opts.GCPurgeInterval = 30 * time.Second
clk := mockClock.NewMock()
opts.Clock = clk

factory := addressBookFactory(t, leveldbStore, opts)
ab, closeFn := factory()
Expand All @@ -172,7 +177,7 @@ func TestGCDelay(t *testing.T) {
}

// after the initial delay has passed.
<-time.After(3 * time.Second)
clk.Add(3 * time.Second)
if i := tp.countLookaheadEntries(); i != 1 {
t.Errorf("expected 1 lookahead entry, got: %d", i)
}
Expand All @@ -188,6 +193,8 @@ func TestGCLookaheadDisabled(t *testing.T) {
opts.GCInitialDelay = 90 * time.Hour
opts.GCLookaheadInterval = 0 // disable lookahead
opts.GCPurgeInterval = 9 * time.Hour
clk := mockClock.NewMock()
opts.Clock = clk

factory := addressBookFactory(t, leveldbStore, opts)
ab, closeFn := factory()
Expand All @@ -206,13 +213,13 @@ func TestGCLookaheadDisabled(t *testing.T) {
ab.AddAddrs(ids[2], addrs[30:40], 10*time.Hour)
ab.AddAddrs(ids[3], addrs[40:], 10*time.Hour)

time.Sleep(100 * time.Millisecond)
clk.Add(100 * time.Millisecond)

if i := tp.countLookaheadEntries(); i != 0 {
t.Errorf("expected no GC lookahead entries, got: %v", i)
}

time.Sleep(500 * time.Millisecond)
clk.Add(500 * time.Millisecond)
gc := ab.(*dsAddrBook).gc
gc.purgeFunc()

Expand Down
10 changes: 8 additions & 2 deletions pstoreds/ds_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ import (

pstore "github.com/libp2p/go-libp2p-core/peerstore"
pt "github.com/libp2p/go-libp2p-peerstore/test"

mockClock "github.com/benbjohnson/clock"
)

type datastoreFactory func(tb testing.TB) (ds.Batching, func())
Expand Down Expand Up @@ -50,16 +52,20 @@ func TestDsAddrBook(t *testing.T) {
opts := DefaultOpts()
opts.GCPurgeInterval = 1 * time.Second
opts.CacheSize = 1024
clk := mockClock.NewMock()
opts.Clock = clk

pt.TestAddrBook(t, addressBookFactory(t, dsFactory, opts))
pt.TestAddrBook(t, addressBookFactory(t, dsFactory, opts), clk)
})

t.Run(name+" Cacheless", func(t *testing.T) {
opts := DefaultOpts()
opts.GCPurgeInterval = 1 * time.Second
opts.CacheSize = 0
clk := mockClock.NewMock()
opts.Clock = clk

pt.TestAddrBook(t, addressBookFactory(t, dsFactory, opts))
pt.TestAddrBook(t, addressBookFactory(t, dsFactory, opts), clk)
})
}
}
Expand Down
3 changes: 3 additions & 0 deletions pstoreds/peerstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ type Options struct {
// Initial delay before GC processes start. Intended to give the system breathing room to fully boot
// before starting GC.
GCInitialDelay time.Duration

Clock clock
}

// DefaultOpts returns the default options for a persistent peerstore, with the full-purge GC algorithm:
Expand All @@ -50,6 +52,7 @@ func DefaultOpts() Options {
GCPurgeInterval: 2 * time.Hour,
GCLookaheadInterval: 0,
GCInitialDelay: 60 * time.Second,
Clock: realclock{},
}
}

Expand Down
6 changes: 4 additions & 2 deletions pstoremem/inmem_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (

pstore "github.com/libp2p/go-libp2p-core/peerstore"

mockClock "github.com/benbjohnson/clock"
"github.com/stretchr/testify/require"
"go.uber.org/goleak"
)
Expand Down Expand Up @@ -43,11 +44,12 @@ func TestPeerstoreProtoStoreLimits(t *testing.T) {
}

func TestInMemoryAddrBook(t *testing.T) {
clk := mockClock.NewMock()
pt.TestAddrBook(t, func() (pstore.AddrBook, func()) {
ps, err := NewPeerstore()
ps, err := NewPeerstore(WithClock(clk))
require.NoError(t, err)
return ps, func() { ps.Close() }
})
}, clk)
}

func TestInMemoryKeyBook(t *testing.T) {
Expand Down
Loading

0 comments on commit 9f3a96b

Please sign in to comment.