Skip to content

Commit

Permalink
[dbnode] Additional params for Aggregator (#2959)
Browse files Browse the repository at this point in the history
  • Loading branch information
linasm authored Nov 30, 2020
1 parent 060d253 commit e03804b
Show file tree
Hide file tree
Showing 9 changed files with 79 additions and 51 deletions.
2 changes: 1 addition & 1 deletion src/dbnode/storage/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -1290,7 +1290,7 @@ func (d *db) AggregateTiles(
return 0, err
}

processedTileCount, err := targetNs.AggregateTiles(sourceNs, opts)
processedTileCount, err := targetNs.AggregateTiles(ctx, sourceNs, opts)
if err != nil {
d.log.Error("error writing large tiles",
zap.String("sourceNs", sourceNsID.String()),
Expand Down
6 changes: 4 additions & 2 deletions src/dbnode/storage/database_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1433,6 +1433,9 @@ func TestDatabaseAggregateTiles(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()

ctx := context.NewContext()
defer ctx.Close()

d, mapCh, _ := defaultTestDatabase(t, ctrl, Bootstrapped)
defer func() {
close(mapCh)
Expand All @@ -1441,7 +1444,6 @@ func TestDatabaseAggregateTiles(t *testing.T) {
var (
sourceNsID = ident.StringID("source")
targetNsID = ident.StringID("target")
ctx = context.NewContext()
start = time.Now().Truncate(time.Hour)
)

Expand All @@ -1451,7 +1453,7 @@ func TestDatabaseAggregateTiles(t *testing.T) {

sourceNs := dbAddNewMockNamespace(ctrl, d, sourceNsID.String())
targetNs := dbAddNewMockNamespace(ctrl, d, targetNsID.String())
targetNs.EXPECT().AggregateTiles(sourceNs, opts).Return(int64(4), nil)
targetNs.EXPECT().AggregateTiles(ctx, sourceNs, opts).Return(int64(4), nil)

processedTileCount, err := d.AggregateTiles(ctx, sourceNsID, targetNsID, opts)
require.NoError(t, err)
Expand Down
6 changes: 4 additions & 2 deletions src/dbnode/storage/namespace.go
Original file line number Diff line number Diff line change
Expand Up @@ -1700,17 +1700,19 @@ func (n *dbNamespace) nsContextWithRLock() namespace.Context {
}

func (n *dbNamespace) AggregateTiles(
ctx context.Context,
sourceNs databaseNamespace,
opts AggregateTilesOptions,
) (int64, error) {
callStart := n.nowFn()
processedTileCount, err := n.aggregateTiles(sourceNs, opts)
processedTileCount, err := n.aggregateTiles(ctx, sourceNs, opts)
n.metrics.aggregateTiles.ReportSuccessOrError(err, n.nowFn().Sub(callStart))

return processedTileCount, err
}

func (n *dbNamespace) aggregateTiles(
ctx context.Context,
sourceNs databaseNamespace,
opts AggregateTilesOptions,
) (int64, error) {
Expand Down Expand Up @@ -1778,7 +1780,7 @@ func (n *dbNamespace) aggregateTiles(
}

shardProcessedTileCount, err := targetShard.AggregateTiles(
sourceNs.ID(), n, sourceShard.ID(), blockReaders, writer, sourceBlockVolumes,
ctx, sourceNs, n, sourceShard.ID(), blockReaders, writer, sourceBlockVolumes,
onColdFlushNs, opts)

processedTileCount += shardProcessedTileCount
Expand Down
18 changes: 11 additions & 7 deletions src/dbnode/storage/namespace_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1419,6 +1419,9 @@ func TestNamespaceFlushState(t *testing.T) {
}

func TestNamespaceAggregateTilesFailUntilBootstrapped(t *testing.T) {
ctx := context.NewContext()
defer ctx.Close()

var (
sourceNsID = ident.StringID("source")
targetNsID = ident.StringID("target")
Expand All @@ -1432,19 +1435,22 @@ func TestNamespaceAggregateTilesFailUntilBootstrapped(t *testing.T) {
targetNs, targetCloser := newTestNamespaceWithIDOpts(t, targetNsID, namespace.NewOptions())
defer targetCloser()

_, err := targetNs.AggregateTiles(sourceNs, opts)
_, err := targetNs.AggregateTiles(ctx, sourceNs, opts)
require.Equal(t, errNamespaceNotBootstrapped, err)

sourceNs.bootstrapState = Bootstrapped

_, err = targetNs.AggregateTiles(sourceNs, opts)
_, err = targetNs.AggregateTiles(ctx, sourceNs, opts)
require.Equal(t, errNamespaceNotBootstrapped, err)
}

func TestNamespaceAggregateTiles(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()

ctx := context.NewContext()
defer ctx.Close()

var (
sourceNsID = ident.StringID("source")
targetNsID = ident.StringID("target")
Expand Down Expand Up @@ -1505,21 +1511,19 @@ func TestNamespaceAggregateTiles(t *testing.T) {
sourceBlockVolumes0 := []shardBlockVolume{{start, 5}, {secondSourceBlockStart, 15}}
sourceBlockVolumes1 := []shardBlockVolume{{start, 7}, {secondSourceBlockStart, 17}}

sourceNsIDMatcher := ident.NewIDMatcher(sourceNsID.String())

targetShard0.EXPECT().
AggregateTiles(
sourceNsIDMatcher, targetNs, shard0ID, gomock.Len(2), gomock.Any(),
ctx, sourceNs, targetNs, shard0ID, gomock.Len(2), gomock.Any(),
sourceBlockVolumes0, gomock.Any(), opts).
Return(int64(3), nil)

targetShard1.EXPECT().
AggregateTiles(
sourceNsIDMatcher, targetNs, shard1ID, gomock.Len(2), gomock.Any(),
ctx, sourceNs, targetNs, shard1ID, gomock.Len(2), gomock.Any(),
sourceBlockVolumes1, gomock.Any(), opts).
Return(int64(2), nil)

processedTileCount, err := targetNs.AggregateTiles(sourceNs, opts)
processedTileCount, err := targetNs.AggregateTiles(ctx, sourceNs, opts)

require.NoError(t, err)
assert.Equal(t, int64(3+2), processedTileCount)
Expand Down
7 changes: 4 additions & 3 deletions src/dbnode/storage/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -942,12 +942,13 @@ func (h *noopNamespaceHooks) OnCreatedNamespace(Namespace, GetNamespaceFn) error
type noopTileAggregator struct{}

func (a *noopTileAggregator) AggregateTiles(
opts AggregateTilesOptions,
ns Namespace,
ctx context.Context,
sourceNs, targetNs Namespace,
shardID uint32,
readers []fs.DataFileSetReader,
blockReaders []fs.DataFileSetReader,
writer fs.StreamingWriter,
onFlushSeries persist.OnFlushSeries,
opts AggregateTilesOptions,
) (int64, error) {
return 0, nil
}
12 changes: 8 additions & 4 deletions src/dbnode/storage/shard.go
Original file line number Diff line number Diff line change
Expand Up @@ -2662,8 +2662,8 @@ func (s *dbShard) Repair(
}

func (s *dbShard) AggregateTiles(
sourceNsID ident.ID,
targetNs Namespace,
ctx context.Context,
sourceNs, targetNs Namespace,
shardID uint32,
blockReaders []fs.DataFileSetReader,
writer fs.StreamingWriter,
Expand All @@ -2687,7 +2687,11 @@ func (s *dbShard) AggregateTiles(
}
}()

maxEntries := 0
var (
sourceNsID = sourceNs.ID()
maxEntries = 0
)

for sourceBlockPos, blockReader := range blockReaders {
sourceBlockVolume := sourceBlockVolumes[sourceBlockPos]
openOpts := fs.DataReaderOpenOptions{
Expand Down Expand Up @@ -2742,7 +2746,7 @@ func (s *dbShard) AggregateTiles(
var multiErr xerrors.MultiError

processedTileCount, err := s.tileAggregator.AggregateTiles(
opts, targetNs, s.ID(), openBlockReaders, writer, onFlushSeries)
ctx, sourceNs, targetNs, s.ID(), openBlockReaders, writer, onFlushSeries, opts)
if err != nil {
// NB: cannot return on the error here, must finish writing.
multiErr = multiErr.Add(err)
Expand Down
41 changes: 25 additions & 16 deletions src/dbnode/storage/shard_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1848,6 +1848,9 @@ func TestShardAggregateTiles(t *testing.T) {
ctrl := xtest.NewController(t)
defer ctrl.Finish()

ctx := context.NewContext()
defer ctx.Close()

var (
sourceBlockSize = time.Hour
targetBlockSize = 2 * time.Hour
Expand All @@ -1869,8 +1872,6 @@ func TestShardAggregateTiles(t *testing.T) {
sourceShard := testDatabaseShard(t, testOpts)
defer assert.NoError(t, sourceShard.Close())

sourceNsID := sourceShard.namespace.ID()

reader0, volume0 := getMockReader(ctrl, t, sourceShard, start, true)
reader0.EXPECT().Entries().Return(firstSourceBlockEntries)

Expand Down Expand Up @@ -1903,16 +1904,22 @@ func TestShardAggregateTiles(t *testing.T) {
}),
writer.EXPECT().Close(),
)
noOpColdFlushNs := &persist.NoOpColdFlushNamespace{}

targetNs := NewMockNamespace(ctrl)
var (
noOpColdFlushNs = &persist.NoOpColdFlushNamespace{}
sourceNs = NewMockNamespace(ctrl)
targetNs = NewMockNamespace(ctrl)
)

sourceNs.EXPECT().ID().Return(sourceShard.namespace.ID())

aggregator.EXPECT().
AggregateTiles(opts, targetNs, sourceShard.ID(), gomock.Len(2), writer,
noOpColdFlushNs).
AggregateTiles(ctx, sourceNs, targetNs, sourceShard.ID(), gomock.Len(2), writer,
noOpColdFlushNs, opts).
Return(expectedProcessedTileCount, nil)

processedTileCount, err := targetShard.AggregateTiles(
sourceNsID, targetNs, sourceShard.ID(), blockReaders, writer,
ctx, sourceNs, targetNs, sourceShard.ID(), blockReaders, writer,
sourceBlockVolumes, noOpColdFlushNs, opts)
require.NoError(t, err)
assert.Equal(t, expectedProcessedTileCount, processedTileCount)
Expand All @@ -1922,21 +1929,23 @@ func TestShardAggregateTilesVerifySliceLengths(t *testing.T) {
ctrl := xtest.NewController(t)
defer ctrl.Finish()

var (
srcNsID = ident.StringID("src")
start = time.Now()
)
ctx := context.NewContext()
defer ctx.Close()

targetShard := testDatabaseShardWithIndexFn(t, DefaultTestOptions(), nil, true)
defer assert.NoError(t, targetShard.Close())

var blockReaders []fs.DataFileSetReader
sourceBlockVolumes := []shardBlockVolume{{start, 0}}

writer := fs.NewMockStreamingWriter(ctrl)
var (
start = time.Now()
blockReaders []fs.DataFileSetReader
sourceBlockVolumes = []shardBlockVolume{{start, 0}}
writer = fs.NewMockStreamingWriter(ctrl)
sourceNs = NewMockNamespace(ctrl)
targetNs = NewMockNamespace(ctrl)
)

_, err := targetShard.AggregateTiles(
srcNsID, nil, 1, blockReaders, writer, sourceBlockVolumes,
ctx, sourceNs, targetNs, 1, blockReaders, writer, sourceBlockVolumes,
&persist.NoOpColdFlushNamespace{}, AggregateTilesOptions{})
require.EqualError(t, err, "blockReaders and sourceBlockVolumes length mismatch (0 != 1)")
}
Expand Down
24 changes: 12 additions & 12 deletions src/dbnode/storage/storage_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 10 additions & 4 deletions src/dbnode/storage/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -470,7 +470,11 @@ type databaseNamespace interface {
WritePendingIndexInserts(pending []writes.PendingIndexInsert) error

// AggregateTiles does large tile aggregation from source namespace into this namespace.
AggregateTiles(sourceNs databaseNamespace, opts AggregateTilesOptions) (int64, error)
AggregateTiles(
ctx context.Context,
sourceNs databaseNamespace,
opts AggregateTilesOptions,
) (int64, error)

// ReadableShardAt returns a shard of this namespace by shardID.
ReadableShardAt(shardID uint32) (databaseShard, namespace.Context, error)
Expand Down Expand Up @@ -662,7 +666,8 @@ type databaseShard interface {

// AggregateTiles does large tile aggregation from source shards into this shard.
AggregateTiles(
sourceNsID ident.ID,
ctx context.Context,
sourceNs Namespace,
targetNs Namespace,
shardID uint32,
blockReaders []fs.DataFileSetReader,
Expand Down Expand Up @@ -1420,12 +1425,13 @@ type AggregateTilesOptions struct {
type TileAggregator interface {
// AggregateTiles does tile aggregation.
AggregateTiles(
opts AggregateTilesOptions,
ns Namespace,
ctx context.Context,
sourceNs, targetNs Namespace,
shardID uint32,
readers []fs.DataFileSetReader,
writer fs.StreamingWriter,
onFlushSeries persist.OnFlushSeries,
opts AggregateTilesOptions,
) (int64, error)
}

Expand Down

0 comments on commit e03804b

Please sign in to comment.