From 765a04d36d2a7591d5a6f5435cef99e2fffe1cfe Mon Sep 17 00:00:00 2001 From: Dennis Trautwein Date: Tue, 19 Sep 2023 09:40:00 +0200 Subject: [PATCH 1/6] add HookedDatastore to synchronize GC test --- v2/backend.go | 2 +- v2/backend_provider.go | 1 + v2/backend_provider_test.go | 73 +++++++++++++++++++++++--- v2/internal/kadtest/datastore.go | 89 ++++++++++++++++++++++++++++++++ 4 files changed, 158 insertions(+), 7 deletions(-) create mode 100644 v2/internal/kadtest/datastore.go diff --git a/v2/backend.go b/v2/backend.go index a8c7775a..4e5d313f 100644 --- a/v2/backend.go +++ b/v2/backend.go @@ -102,7 +102,7 @@ func NewBackendPublicKey(ds ds.TxnDatastore, cfg *RecordBackendConfig) (be *Reco // The values returned from [ProvidersBackend.Fetch] will be of type // [*providerSet] (unexported). The cfg parameter can be nil, in which case the // [DefaultProviderBackendConfig] will be used. -func NewBackendProvider(pstore peerstore.Peerstore, dstore ds.Batching, cfg *ProvidersBackendConfig) (be *ProvidersBackend, err error) { +func NewBackendProvider(pstore peerstore.Peerstore, dstore ds.Datastore, cfg *ProvidersBackendConfig) (be *ProvidersBackend, err error) { if cfg == nil { if cfg, err = DefaultProviderBackendConfig(); err != nil { return nil, fmt.Errorf("default provider backend config: %w", err) diff --git a/v2/backend_provider.go b/v2/backend_provider.go index 1ddd764f..aa3a5efb 100644 --- a/v2/backend_provider.go +++ b/v2/backend_provider.go @@ -265,6 +265,7 @@ func (p *ProvidersBackend) StartGarbageCollection() { ticker := p.cfg.clk.Ticker(p.cfg.GCInterval) defer ticker.Stop() + p.log.Info("Provider backend's started for loop") for { select { case <-ctx.Done(): diff --git a/v2/backend_provider_test.go b/v2/backend_provider_test.go index 10407e54..32118a70 100644 --- a/v2/backend_provider_test.go +++ b/v2/backend_provider_test.go @@ -3,14 +3,15 @@ package dht import ( "context" "io" + "os" "sync" "testing" "time" "github.com/benbjohnson/clock" ds "github.com/ipfs/go-datastore" - syncds "github.com/ipfs/go-datastore/sync" "github.com/libp2p/go-libp2p" + "github.com/libp2p/go-libp2p-kad-dht/v2/internal/kadtest" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "golang.org/x/exp/slog" @@ -22,7 +23,9 @@ func newBackendProvider(t testing.TB, cfg *ProvidersBackendConfig) *ProvidersBac h, err := libp2p.New(libp2p.NoListenAddrs) require.NoError(t, err) - dstore := syncds.MutexWrap(ds.NewMapDatastore()) + dstore, err := InMemoryDatastore() + require.NoError(t, err) + t.Cleanup(func() { if err = dstore.Close(); err != nil { t.Logf("closing datastore: %s", err) @@ -39,20 +42,50 @@ func newBackendProvider(t testing.TB, cfg *ProvidersBackendConfig) *ProvidersBac return b } +type SlogHandler struct { + clck clock.Clock + handler slog.Handler +} + +func (s SlogHandler) Enabled(ctx context.Context, level slog.Level) bool { + return s.handler.Enabled(ctx, level) +} + +func (s SlogHandler) Handle(ctx context.Context, record slog.Record) error { + record.Time = s.clck.Now() + return s.handler.Handle(ctx, record) +} + +func (s SlogHandler) WithAttrs(attrs []slog.Attr) slog.Handler { + return s.handler.WithAttrs(attrs) +} + +func (s SlogHandler) WithGroup(name string) slog.Handler { + return s.handler.WithGroup(name) +} + +var _ slog.Handler = (*SlogHandler)(nil) + func TestProvidersBackend_GarbageCollection(t *testing.T) { clk := clock.NewMock() cfg, err := DefaultProviderBackendConfig() require.NoError(t, err) + handler := &SlogHandler{ + clck: clk, + handler: slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelDebug}), + } + cfg.clk = clk - cfg.Logger = devnull + cfg.Logger = slog.New(handler) b := newBackendProvider(t, cfg) - // start the garbage collection process - b.StartGarbageCollection() - t.Cleanup(func() { b.StopGarbageCollection() }) + // wrap datastore into hooked datastore. This allows us to hook into calls + // to dstore.Delete and thus synchronize on that operation. See below. + dstore := kadtest.NewHookedDatastore(b.datastore) + b.datastore = dstore // write random record to datastore and peerstore ctx := context.Background() @@ -64,6 +97,17 @@ func TestProvidersBackend_GarbageCollection(t *testing.T) { err = b.datastore.Put(ctx, dsKey, rec.MarshalBinary()) require.NoError(t, err) + deleted := make(chan struct{}) + dstore.DeleteAfter = func(ctx context.Context, key ds.Key, err error) { + if key == dsKey { + close(deleted) + } + } + + // start the garbage collection process + b.StartGarbageCollection() + t.Cleanup(func() { b.StopGarbageCollection() }) + // write to peerstore b.addrBook.AddAddrs(p.ID, p.Addrs, time.Hour) @@ -71,13 +115,30 @@ func TestProvidersBackend_GarbageCollection(t *testing.T) { clk.Add(cfg.ProvideValidity / 2) // we expect the record to still be there after half the ProvideValidity + cfg.Logger.Debug("get record", "key", dsKey) _, err = b.datastore.Get(ctx, dsKey) require.NoError(t, err) // advance clock another time and check if the record was GC'd now clk.Add(cfg.ProvideValidity + cfg.GCInterval) + // advancing the clock's time does not guarantee that all other goroutines + // have been run before this test's go routine continues. This means that + // the dstore.Delete call from the garbage collection may not have been + // performed by the time we assert the non-existence of the record below. + // Therefore, we block here until the `deleted` channel was closed. This + // happens after the record was deleted. That way, the below dstore.Get call + // will return a ds.ErrNotFound. This synchronization issue is especially + // problematic on Windows and didn't surface on MacOS. + select { + case <-kadtest.CtxShort(t).Done(): + t.Fatal("garbage collection timeout") + case <-deleted: + // deleted! + } + // we expect the record to be GC'd now + cfg.Logger.Debug("get record", "key", dsKey) val, err := b.datastore.Get(ctx, dsKey) assert.ErrorIs(t, err, ds.ErrNotFound) assert.Nil(t, val) diff --git a/v2/internal/kadtest/datastore.go b/v2/internal/kadtest/datastore.go new file mode 100644 index 00000000..b12fc18f --- /dev/null +++ b/v2/internal/kadtest/datastore.go @@ -0,0 +1,89 @@ +package kadtest + +import ( + "context" + + ds "github.com/ipfs/go-datastore" + dsq "github.com/ipfs/go-datastore/query" +) + +// HookedDatastore provides hook into HookedDatastore operations. Use [NewHookedDatastore] to +// initialize this HookedDatastore with no-op methods for each operation. Additional +// hooks can be added as needed. Only a subset is supported at the moment +// +// The idea is to wrap an ordinary [ds.Datastore] with this [HookedDatastore] and +// overwrite the Before/After hooks to, e.g., synchronize tests and assert +// that certain operations have been performed. +type HookedDatastore struct { + dstore ds.Datastore + + GetBefore func(ctx context.Context, key ds.Key) + GetAfter func(ctx context.Context, key ds.Key, value []byte, err error) + + HasBefore func(ctx context.Context, key ds.Key) + HasAfter func(ctx context.Context, key ds.Key, exists bool, err error) + + PutBefore func(ctx context.Context, key ds.Key, value []byte) + PutAfter func(ctx context.Context, key ds.Key, value []byte, err error) + + DeleteBefore func(ctx context.Context, key ds.Key) + DeleteAfter func(ctx context.Context, key ds.Key, err error) +} + +var _ ds.Datastore = (*HookedDatastore)(nil) + +// NewHookedDatastore initializes a [HookedDatastore] with no-op hooks into calls to dstore. +// See [HookedDatastore] for more information. +func NewHookedDatastore(dstore ds.Datastore) *HookedDatastore { + return &HookedDatastore{ + dstore: dstore, + GetBefore: func(context.Context, ds.Key) {}, + GetAfter: func(context.Context, ds.Key, []byte, error) {}, + HasBefore: func(context.Context, ds.Key) {}, + HasAfter: func(context.Context, ds.Key, bool, error) {}, + PutBefore: func(context.Context, ds.Key, []byte) {}, + PutAfter: func(context.Context, ds.Key, []byte, error) {}, + DeleteBefore: func(context.Context, ds.Key) {}, + DeleteAfter: func(context.Context, ds.Key, error) {}, + } +} + +func (d HookedDatastore) Get(ctx context.Context, key ds.Key) (value []byte, err error) { + d.GetBefore(ctx, key) + defer d.GetAfter(ctx, key, value, err) + return d.dstore.Get(ctx, key) +} + +func (d HookedDatastore) Has(ctx context.Context, key ds.Key) (exists bool, err error) { + d.HasBefore(ctx, key) + defer d.HasAfter(ctx, key, exists, err) + return d.dstore.Has(ctx, key) +} + +func (d HookedDatastore) GetSize(ctx context.Context, key ds.Key) (size int, err error) { + return d.dstore.GetSize(ctx, key) +} + +func (d HookedDatastore) Query(ctx context.Context, q dsq.Query) (dsq.Results, error) { + return d.dstore.Query(ctx, q) +} + +func (d HookedDatastore) Put(ctx context.Context, key ds.Key, value []byte) (err error) { + d.PutBefore(ctx, key, value) + defer d.PutAfter(ctx, key, value, err) + return d.dstore.Put(ctx, key, value) +} + +func (d HookedDatastore) Delete(ctx context.Context, key ds.Key) (err error) { + d.DeleteBefore(ctx, key) + defer d.DeleteAfter(ctx, key, err) + return d.dstore.Delete(ctx, key) +} + +func (d HookedDatastore) Sync(ctx context.Context, prefix ds.Key) error { + return d.dstore.Sync(ctx, prefix) +} + +func (d HookedDatastore) Close() error { + return d.dstore.Close() +} From 91b5fdbb7774aab144324bfd4eb7c4fc8a6fd094 Mon Sep 17 00:00:00 2001 From: Dennis Trautwein Date: Tue, 19 Sep 2023 10:04:30 +0200 Subject: [PATCH 2/6] wip --- v2/backend_provider.go | 3 ++- v2/backend_provider_test.go | 1 + 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/v2/backend_provider.go b/v2/backend_provider.go index aa3a5efb..1d58f837 100644 --- a/v2/backend_provider.go +++ b/v2/backend_provider.go @@ -257,7 +257,7 @@ func (p *ProvidersBackend) StartGarbageCollection() { p.gcCancel = cancel p.gcDone = make(chan struct{}) - p.log.Info("Provider backend's started garbage collection schedule") + p.log.Info("Provider backend started garbage collection schedule") go func() { defer close(p.gcDone) @@ -269,6 +269,7 @@ func (p *ProvidersBackend) StartGarbageCollection() { for { select { case <-ctx.Done(): + p.log.Info("Provider backend's for loop done") return case <-ticker.C: p.collectGarbage(ctx) diff --git a/v2/backend_provider_test.go b/v2/backend_provider_test.go index 32118a70..fd4ebb4e 100644 --- a/v2/backend_provider_test.go +++ b/v2/backend_provider_test.go @@ -130,6 +130,7 @@ func TestProvidersBackend_GarbageCollection(t *testing.T) { // happens after the record was deleted. That way, the below dstore.Get call // will return a ds.ErrNotFound. This synchronization issue is especially // problematic on Windows and didn't surface on MacOS. + cfg.Logger.Debug("block until deleted") select { case <-kadtest.CtxShort(t).Done(): t.Fatal("garbage collection timeout") From 383eb0d84cc895a2459e3d3989128f0ae1c702e8 Mon Sep 17 00:00:00 2001 From: Dennis Trautwein Date: Tue, 19 Sep 2023 10:07:36 +0200 Subject: [PATCH 3/6] WIP --- v2/backend_provider.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/v2/backend_provider.go b/v2/backend_provider.go index 1d58f837..8ce5d530 100644 --- a/v2/backend_provider.go +++ b/v2/backend_provider.go @@ -259,10 +259,10 @@ func (p *ProvidersBackend) StartGarbageCollection() { p.log.Info("Provider backend started garbage collection schedule") + ticker := p.cfg.clk.Ticker(p.cfg.GCInterval) + go func() { defer close(p.gcDone) - - ticker := p.cfg.clk.Ticker(p.cfg.GCInterval) defer ticker.Stop() p.log.Info("Provider backend's started for loop") From a743aac9bca7d80a250cdb76be95dca3219d2212 Mon Sep 17 00:00:00 2001 From: Dennis Trautwein Date: Tue, 19 Sep 2023 10:10:50 +0200 Subject: [PATCH 4/6] WIP --- v2/backend_provider.go | 3 +-- v2/backend_provider_test.go | 1 + 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/v2/backend_provider.go b/v2/backend_provider.go index 8ce5d530..98b8cd90 100644 --- a/v2/backend_provider.go +++ b/v2/backend_provider.go @@ -259,10 +259,9 @@ func (p *ProvidersBackend) StartGarbageCollection() { p.log.Info("Provider backend started garbage collection schedule") - ticker := p.cfg.clk.Ticker(p.cfg.GCInterval) - go func() { defer close(p.gcDone) + ticker := p.cfg.clk.Ticker(p.cfg.GCInterval) defer ticker.Stop() p.log.Info("Provider backend's started for loop") diff --git a/v2/backend_provider_test.go b/v2/backend_provider_test.go index fd4ebb4e..2b64be46 100644 --- a/v2/backend_provider_test.go +++ b/v2/backend_provider_test.go @@ -120,6 +120,7 @@ func TestProvidersBackend_GarbageCollection(t *testing.T) { require.NoError(t, err) // advance clock another time and check if the record was GC'd now + cfg.Logger.Debug("advance time") clk.Add(cfg.ProvideValidity + cfg.GCInterval) // advancing the clock's time does not guarantee that all other goroutines From f369aa35b4317bac7e96e6590e10bc38b5a50d3e Mon Sep 17 00:00:00 2001 From: Dennis Trautwein Date: Tue, 19 Sep 2023 10:17:38 +0200 Subject: [PATCH 5/6] WIP --- v2/backend_provider.go | 8 ++--- v2/backend_provider_test.go | 70 +++---------------------------------- 2 files changed, 8 insertions(+), 70 deletions(-) diff --git a/v2/backend_provider.go b/v2/backend_provider.go index 98b8cd90..3be9d88a 100644 --- a/v2/backend_provider.go +++ b/v2/backend_provider.go @@ -257,18 +257,18 @@ func (p *ProvidersBackend) StartGarbageCollection() { p.gcCancel = cancel p.gcDone = make(chan struct{}) - p.log.Info("Provider backend started garbage collection schedule") + // init ticker outside the goroutine to prevent race condition with + // clock mock in garbage collection test. + ticker := p.cfg.clk.Ticker(p.cfg.GCInterval) go func() { defer close(p.gcDone) - ticker := p.cfg.clk.Ticker(p.cfg.GCInterval) defer ticker.Stop() - p.log.Info("Provider backend's started for loop") + p.log.Info("Provider backend started garbage collection schedule") for { select { case <-ctx.Done(): - p.log.Info("Provider backend's for loop done") return case <-ticker.C: p.collectGarbage(ctx) diff --git a/v2/backend_provider_test.go b/v2/backend_provider_test.go index 2b64be46..d3ab465d 100644 --- a/v2/backend_provider_test.go +++ b/v2/backend_provider_test.go @@ -3,7 +3,6 @@ package dht import ( "context" "io" - "os" "sync" "testing" "time" @@ -11,7 +10,6 @@ import ( "github.com/benbjohnson/clock" ds "github.com/ipfs/go-datastore" "github.com/libp2p/go-libp2p" - "github.com/libp2p/go-libp2p-kad-dht/v2/internal/kadtest" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "golang.org/x/exp/slog" @@ -42,50 +40,20 @@ func newBackendProvider(t testing.TB, cfg *ProvidersBackendConfig) *ProvidersBac return b } -type SlogHandler struct { - clck clock.Clock - handler slog.Handler -} - -func (s SlogHandler) Enabled(ctx context.Context, level slog.Level) bool { - return s.handler.Enabled(ctx, level) -} - -func (s SlogHandler) Handle(ctx context.Context, record slog.Record) error { - record.Time = s.clck.Now() - return s.handler.Handle(ctx, record) -} - -func (s SlogHandler) WithAttrs(attrs []slog.Attr) slog.Handler { - return s.handler.WithAttrs(attrs) -} - -func (s SlogHandler) WithGroup(name string) slog.Handler { - return s.handler.WithGroup(name) -} - -var _ slog.Handler = (*SlogHandler)(nil) - func TestProvidersBackend_GarbageCollection(t *testing.T) { clk := clock.NewMock() cfg, err := DefaultProviderBackendConfig() require.NoError(t, err) - handler := &SlogHandler{ - clck: clk, - handler: slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelDebug}), - } - cfg.clk = clk - cfg.Logger = slog.New(handler) + cfg.Logger = devnull b := newBackendProvider(t, cfg) - // wrap datastore into hooked datastore. This allows us to hook into calls - // to dstore.Delete and thus synchronize on that operation. See below. - dstore := kadtest.NewHookedDatastore(b.datastore) - b.datastore = dstore + // start the garbage collection process + b.StartGarbageCollection() + t.Cleanup(func() { b.StopGarbageCollection() }) // write random record to datastore and peerstore ctx := context.Background() @@ -97,17 +65,6 @@ func TestProvidersBackend_GarbageCollection(t *testing.T) { err = b.datastore.Put(ctx, dsKey, rec.MarshalBinary()) require.NoError(t, err) - deleted := make(chan struct{}) - dstore.DeleteAfter = func(ctx context.Context, key ds.Key, err error) { - if key == dsKey { - close(deleted) - } - } - - // start the garbage collection process - b.StartGarbageCollection() - t.Cleanup(func() { b.StopGarbageCollection() }) - // write to peerstore b.addrBook.AddAddrs(p.ID, p.Addrs, time.Hour) @@ -115,32 +72,13 @@ func TestProvidersBackend_GarbageCollection(t *testing.T) { clk.Add(cfg.ProvideValidity / 2) // we expect the record to still be there after half the ProvideValidity - cfg.Logger.Debug("get record", "key", dsKey) _, err = b.datastore.Get(ctx, dsKey) require.NoError(t, err) // advance clock another time and check if the record was GC'd now - cfg.Logger.Debug("advance time") clk.Add(cfg.ProvideValidity + cfg.GCInterval) - // advancing the clock's time does not guarantee that all other goroutines - // have been run before this test's go routine continues. This means that - // the dstore.Delete call from the garbage collection may not have been - // performed by the time we assert the non-existence of the record below. - // Therefore, we block here until the `deleted` channel was closed. This - // happens after the record was deleted. That way, the below dstore.Get call - // will return a ds.ErrNotFound. This synchronization issue is especially - // problematic on Windows and didn't surface on MacOS. - cfg.Logger.Debug("block until deleted") - select { - case <-kadtest.CtxShort(t).Done(): - t.Fatal("garbage collection timeout") - case <-deleted: - // deleted! - } - // we expect the record to be GC'd now - cfg.Logger.Debug("get record", "key", dsKey) val, err := b.datastore.Get(ctx, dsKey) assert.ErrorIs(t, err, ds.ErrNotFound) assert.Nil(t, val) From 593366fb755bb16e0d21368a27846c1d443b4b50 Mon Sep 17 00:00:00 2001 From: Dennis Trautwein Date: Tue, 19 Sep 2023 10:18:46 +0200 Subject: [PATCH 6/6] WIP --- v2/internal/kadtest/datastore.go | 89 -------------------------------- 1 file changed, 89 deletions(-) delete mode 100644 v2/internal/kadtest/datastore.go diff --git a/v2/internal/kadtest/datastore.go b/v2/internal/kadtest/datastore.go deleted file mode 100644 index b12fc18f..00000000 --- a/v2/internal/kadtest/datastore.go +++ /dev/null @@ -1,89 +0,0 @@ -package kadtest - -import ( - "context" - - ds "github.com/ipfs/go-datastore" - dsq "github.com/ipfs/go-datastore/query" -) - -// HookedDatastore provides hook into HookedDatastore operations. Use [NewHookedDatastore] to -// initialize this HookedDatastore with no-op methods for each operation. Additional -// hooks can be added as needed. Only a subset is supported at the moment -// -// The idea is to wrap an ordinary [ds.Datastore] with this [HookedDatastore] and -// overwrite the Before/After hooks to, e.g., synchronize tests and assert -// that certain operations have been performed. -type HookedDatastore struct { - dstore ds.Datastore - - GetBefore func(ctx context.Context, key ds.Key) - GetAfter func(ctx context.Context, key ds.Key, value []byte, err error) - - HasBefore func(ctx context.Context, key ds.Key) - HasAfter func(ctx context.Context, key ds.Key, exists bool, err error) - - PutBefore func(ctx context.Context, key ds.Key, value []byte) - PutAfter func(ctx context.Context, key ds.Key, value []byte, err error) - - DeleteBefore func(ctx context.Context, key ds.Key) - DeleteAfter func(ctx context.Context, key ds.Key, err error) -} - -var _ ds.Datastore = (*HookedDatastore)(nil) - -// NewHookedDatastore initializes a [HookedDatastore] with no-op hooks into calls to dstore. -// See [HookedDatastore] for more information. -func NewHookedDatastore(dstore ds.Datastore) *HookedDatastore { - return &HookedDatastore{ - dstore: dstore, - GetBefore: func(context.Context, ds.Key) {}, - GetAfter: func(context.Context, ds.Key, []byte, error) {}, - HasBefore: func(context.Context, ds.Key) {}, - HasAfter: func(context.Context, ds.Key, bool, error) {}, - PutBefore: func(context.Context, ds.Key, []byte) {}, - PutAfter: func(context.Context, ds.Key, []byte, error) {}, - DeleteBefore: func(context.Context, ds.Key) {}, - DeleteAfter: func(context.Context, ds.Key, error) {}, - } -} - -func (d HookedDatastore) Get(ctx context.Context, key ds.Key) (value []byte, err error) { - d.GetBefore(ctx, key) - defer d.GetAfter(ctx, key, value, err) - return d.dstore.Get(ctx, key) -} - -func (d HookedDatastore) Has(ctx context.Context, key ds.Key) (exists bool, err error) { - d.HasBefore(ctx, key) - defer d.HasAfter(ctx, key, exists, err) - return d.dstore.Has(ctx, key) -} - -func (d HookedDatastore) GetSize(ctx context.Context, key ds.Key) (size int, err error) { - return d.dstore.GetSize(ctx, key) -} - -func (d HookedDatastore) Query(ctx context.Context, q dsq.Query) (dsq.Results, error) { - return d.dstore.Query(ctx, q) -} - -func (d HookedDatastore) Put(ctx context.Context, key ds.Key, value []byte) (err error) { - d.PutBefore(ctx, key, value) - defer d.PutAfter(ctx, key, value, err) - return d.dstore.Put(ctx, key, value) -} - -func (d HookedDatastore) Delete(ctx context.Context, key ds.Key) (err error) { - d.DeleteBefore(ctx, key) - defer d.DeleteAfter(ctx, key, err) - return d.dstore.Delete(ctx, key) -} - -func (d HookedDatastore) Sync(ctx context.Context, prefix ds.Key) error { - return d.dstore.Sync(ctx, prefix) -} - -func (d HookedDatastore) Close() error { - return d.dstore.Close() -}