From a268786b9d56a5bde2565a7420b673d5704587ca Mon Sep 17 00:00:00 2001 From: Linas Medziunas Date: Mon, 30 Nov 2020 10:02:51 +0200 Subject: [PATCH] [dbnode] Additional params for Aggregator --- src/dbnode/storage/database.go | 2 +- src/dbnode/storage/database_test.go | 6 ++-- src/dbnode/storage/namespace.go | 6 ++-- src/dbnode/storage/namespace_test.go | 18 +++++++----- src/dbnode/storage/options.go | 7 +++-- src/dbnode/storage/shard.go | 12 +++++--- src/dbnode/storage/shard_test.go | 41 +++++++++++++++++----------- src/dbnode/storage/storage_mock.go | 24 ++++++++-------- src/dbnode/storage/types.go | 14 +++++++--- 9 files changed, 79 insertions(+), 51 deletions(-) diff --git a/src/dbnode/storage/database.go b/src/dbnode/storage/database.go index 5d0a645d12..5a0434ff1e 100644 --- a/src/dbnode/storage/database.go +++ b/src/dbnode/storage/database.go @@ -1292,7 +1292,7 @@ func (d *db) AggregateTiles( return 0, err } - processedTileCount, err := targetNs.AggregateTiles(sourceNs, opts) + processedTileCount, err := targetNs.AggregateTiles(ctx, sourceNs, opts) if err != nil { d.log.Error("error writing large tiles", zap.String("sourceNs", sourceNsID.String()), diff --git a/src/dbnode/storage/database_test.go b/src/dbnode/storage/database_test.go index a843400f67..928a6fa8c2 100644 --- a/src/dbnode/storage/database_test.go +++ b/src/dbnode/storage/database_test.go @@ -1433,6 +1433,9 @@ func TestDatabaseAggregateTiles(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() + ctx := context.NewContext() + defer ctx.Close() + d, mapCh, _ := defaultTestDatabase(t, ctrl, Bootstrapped) defer func() { close(mapCh) @@ -1441,7 +1444,6 @@ func TestDatabaseAggregateTiles(t *testing.T) { var ( sourceNsID = ident.StringID("source") targetNsID = ident.StringID("target") - ctx = context.NewContext() start = time.Now().Truncate(time.Hour) ) @@ -1451,7 +1453,7 @@ func TestDatabaseAggregateTiles(t *testing.T) { sourceNs := dbAddNewMockNamespace(ctrl, d, sourceNsID.String()) targetNs := dbAddNewMockNamespace(ctrl, d, targetNsID.String()) - targetNs.EXPECT().AggregateTiles(sourceNs, opts).Return(int64(4), nil) + targetNs.EXPECT().AggregateTiles(ctx, sourceNs, opts).Return(int64(4), nil) processedTileCount, err := d.AggregateTiles(ctx, sourceNsID, targetNsID, opts) require.NoError(t, err) diff --git a/src/dbnode/storage/namespace.go b/src/dbnode/storage/namespace.go index 6a45b1cf71..e19693b9d5 100644 --- a/src/dbnode/storage/namespace.go +++ b/src/dbnode/storage/namespace.go @@ -1698,17 +1698,19 @@ func (n *dbNamespace) nsContextWithRLock() namespace.Context { } func (n *dbNamespace) AggregateTiles( + ctx context.Context, sourceNs databaseNamespace, opts AggregateTilesOptions, ) (int64, error) { callStart := n.nowFn() - processedTileCount, err := n.aggregateTiles(sourceNs, opts) + processedTileCount, err := n.aggregateTiles(ctx, sourceNs, opts) n.metrics.aggregateTiles.ReportSuccessOrError(err, n.nowFn().Sub(callStart)) return processedTileCount, err } func (n *dbNamespace) aggregateTiles( + ctx context.Context, sourceNs databaseNamespace, opts AggregateTilesOptions, ) (int64, error) { @@ -1776,7 +1778,7 @@ func (n *dbNamespace) aggregateTiles( } shardProcessedTileCount, err := targetShard.AggregateTiles( - sourceNs.ID(), n, sourceShard.ID(), blockReaders, writer, sourceBlockVolumes, + ctx, sourceNs, n, sourceShard.ID(), blockReaders, writer, sourceBlockVolumes, onColdFlushNs, opts) processedTileCount += shardProcessedTileCount diff --git a/src/dbnode/storage/namespace_test.go b/src/dbnode/storage/namespace_test.go index 8f64e67613..0c564a612b 100644 --- a/src/dbnode/storage/namespace_test.go +++ b/src/dbnode/storage/namespace_test.go @@ -1377,6 +1377,9 @@ func TestNamespaceFlushState(t *testing.T) { } func TestNamespaceAggregateTilesFailUntilBootstrapped(t *testing.T) { + ctx := context.NewContext() + defer ctx.Close() + var ( sourceNsID = ident.StringID("source") targetNsID = ident.StringID("target") @@ -1390,12 +1393,12 @@ func TestNamespaceAggregateTilesFailUntilBootstrapped(t *testing.T) { targetNs, targetCloser := newTestNamespaceWithIDOpts(t, targetNsID, namespace.NewOptions()) defer targetCloser() - _, err := targetNs.AggregateTiles(sourceNs, opts) + _, err := targetNs.AggregateTiles(ctx, sourceNs, opts) require.Equal(t, errNamespaceNotBootstrapped, err) sourceNs.bootstrapState = Bootstrapped - _, err = targetNs.AggregateTiles(sourceNs, opts) + _, err = targetNs.AggregateTiles(ctx, sourceNs, opts) require.Equal(t, errNamespaceNotBootstrapped, err) } @@ -1403,6 +1406,9 @@ func TestNamespaceAggregateTiles(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() + ctx := context.NewContext() + defer ctx.Close() + var ( sourceNsID = ident.StringID("source") targetNsID = ident.StringID("target") @@ -1463,21 +1469,19 @@ func TestNamespaceAggregateTiles(t *testing.T) { sourceBlockVolumes0 := []shardBlockVolume{{start, 5}, {secondSourceBlockStart, 15}} sourceBlockVolumes1 := []shardBlockVolume{{start, 7}, {secondSourceBlockStart, 17}} - sourceNsIDMatcher := ident.NewIDMatcher(sourceNsID.String()) - targetShard0.EXPECT(). AggregateTiles( - sourceNsIDMatcher, targetNs, shard0ID, gomock.Len(2), gomock.Any(), + ctx, sourceNs, targetNs, shard0ID, gomock.Len(2), gomock.Any(), sourceBlockVolumes0, gomock.Any(), opts). Return(int64(3), nil) targetShard1.EXPECT(). AggregateTiles( - sourceNsIDMatcher, targetNs, shard1ID, gomock.Len(2), gomock.Any(), + ctx, sourceNs, targetNs, shard1ID, gomock.Len(2), gomock.Any(), sourceBlockVolumes1, gomock.Any(), opts). Return(int64(2), nil) - processedTileCount, err := targetNs.AggregateTiles(sourceNs, opts) + processedTileCount, err := targetNs.AggregateTiles(ctx, sourceNs, opts) require.NoError(t, err) assert.Equal(t, int64(3+2), processedTileCount) diff --git a/src/dbnode/storage/options.go b/src/dbnode/storage/options.go index 05bf1764e0..9da8bc65f4 100644 --- a/src/dbnode/storage/options.go +++ b/src/dbnode/storage/options.go @@ -942,12 +942,13 @@ func (h *noopNamespaceHooks) OnCreatedNamespace(Namespace, GetNamespaceFn) error type noopTileAggregator struct{} func (a *noopTileAggregator) AggregateTiles( - opts AggregateTilesOptions, - ns Namespace, + ctx context.Context, + sourceNs, targetNs Namespace, shardID uint32, - readers []fs.DataFileSetReader, + blockReaders []fs.DataFileSetReader, writer fs.StreamingWriter, onFlushSeries persist.OnFlushSeries, + opts AggregateTilesOptions, ) (int64, error) { return 0, nil } diff --git a/src/dbnode/storage/shard.go b/src/dbnode/storage/shard.go index 591686e98f..fad6e770cb 100644 --- a/src/dbnode/storage/shard.go +++ b/src/dbnode/storage/shard.go @@ -2659,8 +2659,8 @@ func (s *dbShard) Repair( } func (s *dbShard) AggregateTiles( - sourceNsID ident.ID, - targetNs Namespace, + ctx context.Context, + sourceNs, targetNs Namespace, shardID uint32, blockReaders []fs.DataFileSetReader, writer fs.StreamingWriter, @@ -2684,7 +2684,11 @@ func (s *dbShard) AggregateTiles( } }() - maxEntries := 0 + var ( + sourceNsID = sourceNs.ID() + maxEntries = 0 + ) + for sourceBlockPos, blockReader := range blockReaders { sourceBlockVolume := sourceBlockVolumes[sourceBlockPos] openOpts := fs.DataReaderOpenOptions{ @@ -2739,7 +2743,7 @@ func (s *dbShard) AggregateTiles( var multiErr xerrors.MultiError processedTileCount, err := s.tileAggregator.AggregateTiles( - opts, targetNs, s.ID(), openBlockReaders, writer, onFlushSeries) + ctx, sourceNs, targetNs, s.ID(), openBlockReaders, writer, onFlushSeries, opts) if err != nil { // NB: cannot return on the error here, must finish writing. multiErr = multiErr.Add(err) diff --git a/src/dbnode/storage/shard_test.go b/src/dbnode/storage/shard_test.go index 6e4e6a989f..65c4dbabf8 100644 --- a/src/dbnode/storage/shard_test.go +++ b/src/dbnode/storage/shard_test.go @@ -1844,6 +1844,9 @@ func TestShardAggregateTiles(t *testing.T) { ctrl := xtest.NewController(t) defer ctrl.Finish() + ctx := context.NewContext() + defer ctx.Close() + var ( sourceBlockSize = time.Hour targetBlockSize = 2 * time.Hour @@ -1865,8 +1868,6 @@ func TestShardAggregateTiles(t *testing.T) { sourceShard := testDatabaseShard(t, testOpts) defer assert.NoError(t, sourceShard.Close()) - sourceNsID := sourceShard.namespace.ID() - reader0, volume0 := getMockReader(ctrl, t, sourceShard, start, true) reader0.EXPECT().Entries().Return(firstSourceBlockEntries) @@ -1899,16 +1900,22 @@ func TestShardAggregateTiles(t *testing.T) { }), writer.EXPECT().Close(), ) - noOpColdFlushNs := &persist.NoOpColdFlushNamespace{} - targetNs := NewMockNamespace(ctrl) + var ( + noOpColdFlushNs = &persist.NoOpColdFlushNamespace{} + sourceNs = NewMockNamespace(ctrl) + targetNs = NewMockNamespace(ctrl) + ) + + sourceNs.EXPECT().ID().Return(sourceShard.namespace.ID()) + aggregator.EXPECT(). - AggregateTiles(opts, targetNs, sourceShard.ID(), gomock.Len(2), writer, - noOpColdFlushNs). + AggregateTiles(ctx, sourceNs, targetNs, sourceShard.ID(), gomock.Len(2), writer, + noOpColdFlushNs, opts). Return(expectedProcessedTileCount, nil) processedTileCount, err := targetShard.AggregateTiles( - sourceNsID, targetNs, sourceShard.ID(), blockReaders, writer, + ctx, sourceNs, targetNs, sourceShard.ID(), blockReaders, writer, sourceBlockVolumes, noOpColdFlushNs, opts) require.NoError(t, err) assert.Equal(t, expectedProcessedTileCount, processedTileCount) @@ -1918,21 +1925,23 @@ func TestShardAggregateTilesVerifySliceLengths(t *testing.T) { ctrl := xtest.NewController(t) defer ctrl.Finish() - var ( - srcNsID = ident.StringID("src") - start = time.Now() - ) + ctx := context.NewContext() + defer ctx.Close() targetShard := testDatabaseShardWithIndexFn(t, DefaultTestOptions(), nil, true) defer assert.NoError(t, targetShard.Close()) - var blockReaders []fs.DataFileSetReader - sourceBlockVolumes := []shardBlockVolume{{start, 0}} - - writer := fs.NewMockStreamingWriter(ctrl) + var ( + start = time.Now() + blockReaders []fs.DataFileSetReader + sourceBlockVolumes = []shardBlockVolume{{start, 0}} + writer = fs.NewMockStreamingWriter(ctrl) + sourceNs = NewMockNamespace(ctrl) + targetNs = NewMockNamespace(ctrl) + ) _, err := targetShard.AggregateTiles( - srcNsID, nil, 1, blockReaders, writer, sourceBlockVolumes, + ctx, sourceNs, targetNs, 1, blockReaders, writer, sourceBlockVolumes, &persist.NoOpColdFlushNamespace{}, AggregateTilesOptions{}) require.EqualError(t, err, "blockReaders and sourceBlockVolumes length mismatch (0 != 1)") } diff --git a/src/dbnode/storage/storage_mock.go b/src/dbnode/storage/storage_mock.go index bf9c30d220..565419cfb0 100644 --- a/src/dbnode/storage/storage_mock.go +++ b/src/dbnode/storage/storage_mock.go @@ -1765,18 +1765,18 @@ func (mr *MockdatabaseNamespaceMockRecorder) WritePendingIndexInserts(pending in } // AggregateTiles mocks base method -func (m *MockdatabaseNamespace) AggregateTiles(sourceNs databaseNamespace, opts AggregateTilesOptions) (int64, error) { +func (m *MockdatabaseNamespace) AggregateTiles(ctx context.Context, sourceNs databaseNamespace, opts AggregateTilesOptions) (int64, error) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "AggregateTiles", sourceNs, opts) + ret := m.ctrl.Call(m, "AggregateTiles", ctx, sourceNs, opts) ret0, _ := ret[0].(int64) ret1, _ := ret[1].(error) return ret0, ret1 } // AggregateTiles indicates an expected call of AggregateTiles -func (mr *MockdatabaseNamespaceMockRecorder) AggregateTiles(sourceNs, opts interface{}) *gomock.Call { +func (mr *MockdatabaseNamespaceMockRecorder) AggregateTiles(ctx, sourceNs, opts interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AggregateTiles", reflect.TypeOf((*MockdatabaseNamespace)(nil).AggregateTiles), sourceNs, opts) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AggregateTiles", reflect.TypeOf((*MockdatabaseNamespace)(nil).AggregateTiles), ctx, sourceNs, opts) } // ReadableShardAt mocks base method @@ -2288,18 +2288,18 @@ func (mr *MockdatabaseShardMockRecorder) DocRef(id interface{}) *gomock.Call { } // AggregateTiles mocks base method -func (m *MockdatabaseShard) AggregateTiles(sourceNsID ident.ID, targetNs Namespace, shardID uint32, blockReaders []fs.DataFileSetReader, writer fs.StreamingWriter, sourceBlockVolumes []shardBlockVolume, onFlushSeries persist.OnFlushSeries, opts AggregateTilesOptions) (int64, error) { +func (m *MockdatabaseShard) AggregateTiles(ctx context.Context, sourceNs, targetNs Namespace, shardID uint32, blockReaders []fs.DataFileSetReader, writer fs.StreamingWriter, sourceBlockVolumes []shardBlockVolume, onFlushSeries persist.OnFlushSeries, opts AggregateTilesOptions) (int64, error) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "AggregateTiles", sourceNsID, targetNs, shardID, blockReaders, writer, sourceBlockVolumes, onFlushSeries, opts) + ret := m.ctrl.Call(m, "AggregateTiles", ctx, sourceNs, targetNs, shardID, blockReaders, writer, sourceBlockVolumes, onFlushSeries, opts) ret0, _ := ret[0].(int64) ret1, _ := ret[1].(error) return ret0, ret1 } // AggregateTiles indicates an expected call of AggregateTiles -func (mr *MockdatabaseShardMockRecorder) AggregateTiles(sourceNsID, targetNs, shardID, blockReaders, writer, sourceBlockVolumes, onFlushSeries, opts interface{}) *gomock.Call { +func (mr *MockdatabaseShardMockRecorder) AggregateTiles(ctx, sourceNs, targetNs, shardID, blockReaders, writer, sourceBlockVolumes, onFlushSeries, opts interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AggregateTiles", reflect.TypeOf((*MockdatabaseShard)(nil).AggregateTiles), sourceNsID, targetNs, shardID, blockReaders, writer, sourceBlockVolumes, onFlushSeries, opts) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AggregateTiles", reflect.TypeOf((*MockdatabaseShard)(nil).AggregateTiles), ctx, sourceNs, targetNs, shardID, blockReaders, writer, sourceBlockVolumes, onFlushSeries, opts) } // LatestVolume mocks base method @@ -5202,18 +5202,18 @@ func (m *MockTileAggregator) EXPECT() *MockTileAggregatorMockRecorder { } // AggregateTiles mocks base method -func (m *MockTileAggregator) AggregateTiles(opts AggregateTilesOptions, ns Namespace, shardID uint32, readers []fs.DataFileSetReader, writer fs.StreamingWriter, onFlushSeries persist.OnFlushSeries) (int64, error) { +func (m *MockTileAggregator) AggregateTiles(ctx context.Context, sourceNs, targetNs Namespace, shardID uint32, readers []fs.DataFileSetReader, writer fs.StreamingWriter, onFlushSeries persist.OnFlushSeries, opts AggregateTilesOptions) (int64, error) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "AggregateTiles", opts, ns, shardID, readers, writer, onFlushSeries) + ret := m.ctrl.Call(m, "AggregateTiles", ctx, sourceNs, targetNs, shardID, readers, writer, onFlushSeries, opts) ret0, _ := ret[0].(int64) ret1, _ := ret[1].(error) return ret0, ret1 } // AggregateTiles indicates an expected call of AggregateTiles -func (mr *MockTileAggregatorMockRecorder) AggregateTiles(opts, ns, shardID, readers, writer, onFlushSeries interface{}) *gomock.Call { +func (mr *MockTileAggregatorMockRecorder) AggregateTiles(ctx, sourceNs, targetNs, shardID, readers, writer, onFlushSeries, opts interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AggregateTiles", reflect.TypeOf((*MockTileAggregator)(nil).AggregateTiles), opts, ns, shardID, readers, writer, onFlushSeries) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AggregateTiles", reflect.TypeOf((*MockTileAggregator)(nil).AggregateTiles), ctx, sourceNs, targetNs, shardID, readers, writer, onFlushSeries, opts) } // MockNamespaceHooks is a mock of NamespaceHooks interface diff --git a/src/dbnode/storage/types.go b/src/dbnode/storage/types.go index 20705b0abe..db9a9e2d9f 100644 --- a/src/dbnode/storage/types.go +++ b/src/dbnode/storage/types.go @@ -456,7 +456,11 @@ type databaseNamespace interface { WritePendingIndexInserts(pending []writes.PendingIndexInsert) error // AggregateTiles does large tile aggregation from source namespace into this namespace. - AggregateTiles(sourceNs databaseNamespace, opts AggregateTilesOptions) (int64, error) + AggregateTiles( + ctx context.Context, + sourceNs databaseNamespace, + opts AggregateTilesOptions, + ) (int64, error) // ReadableShardAt returns a shard of this namespace by shardID. ReadableShardAt(shardID uint32) (databaseShard, namespace.Context, error) @@ -647,7 +651,8 @@ type databaseShard interface { // AggregateTiles does large tile aggregation from source shards into this shard. AggregateTiles( - sourceNsID ident.ID, + ctx context.Context, + sourceNs Namespace, targetNs Namespace, shardID uint32, blockReaders []fs.DataFileSetReader, @@ -1399,12 +1404,13 @@ type AggregateTilesOptions struct { type TileAggregator interface { // AggregateTiles does tile aggregation. AggregateTiles( - opts AggregateTilesOptions, - ns Namespace, + ctx context.Context, + sourceNs, targetNs Namespace, shardID uint32, readers []fs.DataFileSetReader, writer fs.StreamingWriter, onFlushSeries persist.OnFlushSeries, + opts AggregateTilesOptions, ) (int64, error) }