[dbnode] Optimize snapshotting which can speed up bootstrapping considerably

robskillington committed Apr 1, 2022
1 parent 866fa71 commit 6b3b33b
Showing 8 changed files with 307 additions and 65 deletions.
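What changed, in brief: the flush manager used to call the namespace-level Snapshot once per block start, and each dbShard.Snapshot walked every series entry just to decide whether that single block needed snapshotting at all. The namespace Snapshot now takes the full list of candidate block starts, and each shard filters that list in one pass over its series via the new FilterBlocksNeedSnapshot, so shards and blocks with empty buffers are skipped before any snapshot work begins, which is the common case during bootstrap and the source of the speedup. Below is a simplified, self-contained sketch of the filtering idea; the series type and bufferEmptyAt helper are invented stand-ins for this example, not the repository's API.

package main

import "fmt"

// series is a stand-in for a shard's series entry; only the
// buffer-emptiness check matters for this sketch.
type series struct {
    nonEmptyBlocks map[int64]bool
}

func (s series) bufferEmptyAt(blockStart int64) bool {
    return !s.nonEmptyBlocks[blockStart]
}

// filterBlocksNeedSnapshot mirrors the shape of the new
// dbShard.FilterBlocksNeedSnapshot: one pass over the shard's series,
// stopping early once every candidate block is known to need a snapshot.
func filterBlocksNeedSnapshot(all []series, blockStarts []int64) []int64 {
    needs := map[int64]struct{}{}
    for _, sr := range all {
        for _, bl := range blockStarts {
            if _, ok := needs[bl]; ok {
                continue // already known to need a snapshot
            }
            if !sr.bufferEmptyAt(bl) {
                needs[bl] = struct{}{}
            }
        }
        if len(needs) == len(blockStarts) {
            break // early exit: nothing more to learn from remaining series
        }
    }
    // Keep the caller's ordering of block starts, as the real code does.
    filtered := make([]int64, 0, len(needs))
    for _, bl := range blockStarts {
        if _, ok := needs[bl]; ok {
            filtered = append(filtered, bl)
        }
    }
    return filtered
}

func main() {
    shardSeries := []series{
        {nonEmptyBlocks: map[int64]bool{7200: true}},
        {nonEmptyBlocks: map[int64]bool{}},
    }
    // Only the block with buffered writes survives the filter.
    fmt.Println(filterBlocksNeedSnapshot(shardSeries, []int64{0, 3600, 7200}))
    // Output: [7200]
}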
23 changes: 9 additions & 14 deletions src/dbnode/storage/flush.go
@@ -38,9 +38,7 @@ import (
 	"go.uber.org/zap"
 )
 
-var (
-	errFlushOperationsInProgress = errors.New("flush operations already in progress")
-)
+var errFlushOperationsInProgress = errors.New("flush operations already in progress")
 
 type flushManagerState int

@@ -224,17 +222,14 @@ func (m *flushManager) dataSnapshot(
 		if len(snapshotBlockStarts) > maxBlocksSnapshottedByNamespace {
 			maxBlocksSnapshottedByNamespace = len(snapshotBlockStarts)
 		}
-		for _, snapshotBlockStart := range snapshotBlockStarts {
-			err := ns.Snapshot(
-				snapshotBlockStart, startTime, snapshotPersist)
-
-			if err != nil {
-				detailedErr := fmt.Errorf(
-					"namespace %s failed to snapshot data for blockStart %s: %v",
-					ns.ID().String(), snapshotBlockStart.String(), err)
-				multiErr = multiErr.Add(detailedErr)
-				continue
-			}
-		}
+
+		err := ns.Snapshot(snapshotBlockStarts, startTime, snapshotPersist)
+		if err != nil {
+			detailedErr := fmt.Errorf(
+				"namespace %s failed to snapshot data for some blocks: %w",
+				ns.ID().String(), err)
+			multiErr = multiErr.Add(detailedErr)
+			continue
+		}
 	}
 	m.metrics.maxBlocksSnapshottedByNamespace.Update(float64(maxBlocksSnapshottedByNamespace))
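Aside from batching, note the error verb change from %v to %w: wrapping with %w keeps the underlying cause visible to errors.Is and errors.As after the namespace annotation is added. A minimal, self-contained illustration (errSnapshotFailed is invented for this example):

package main

import (
    "errors"
    "fmt"
)

// errSnapshotFailed is a hypothetical sentinel error, standing in for
// whatever a namespace's Snapshot might return.
var errSnapshotFailed = errors.New("snapshot failed")

func main() {
    // %w preserves the error chain; %v would flatten it to a plain string.
    wrapped := fmt.Errorf("namespace %s failed to snapshot data for some blocks: %w",
        "metrics_ns", errSnapshotFailed)
    fmt.Println(errors.Is(wrapped, errSnapshotFailed)) // true
}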
11 changes: 7 additions & 4 deletions src/dbnode/storage/flush_test.go
@@ -573,12 +573,15 @@ func TestFlushManagerFlushSnapshot(t *testing.T) {
 			ns.EXPECT().NeedsFlush(st, st).Return(false, nil)
 		}
 
-		snapshotEnd := now.Add(bufferFuture).Truncate(blockSize)
+		var (
+			snapshotEnd    = now.Add(bufferFuture).Truncate(blockSize)
+			snapshotBlocks []xtime.UnixNano
+		)
 		num = numIntervals(start, snapshotEnd, blockSize)
-		for i := 0; i < num; i++ {
-			st := start.Add(time.Duration(i) * blockSize)
-			ns.EXPECT().Snapshot(st, now, gomock.Any())
+		for i := num - 1; i >= 0; i-- {
+			snapshotBlocks = append(snapshotBlocks, start.Add(time.Duration(i)*blockSize))
 		}
+		ns.EXPECT().Snapshot(snapshotBlocks, now, gomock.Any())
 	}
 
 	require.NoError(t, fm.Flush(now))
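The expected blocks are now assembled newest-first (i counts down from num-1), presumably mirroring the order in which the flush manager builds its snapshot block list; since the whole slice is asserted in a single call, ordering matters. gomock compares plain (non-matcher) arguments with reflect.DeepEqual semantics, which is order-sensitive for slices, as this small illustration shows:

package main

import (
    "fmt"
    "reflect"
)

func main() {
    newestFirst := []int64{7200, 3600, 0}
    oldestFirst := []int64{0, 3600, 7200}
    // Same elements, different order: DeepEqual (and therefore a gomock
    // expectation on the slice argument) does not match.
    fmt.Println(reflect.DeepEqual(newestFirst, oldestFirst)) // false
}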
28 changes: 17 additions & 11 deletions src/dbnode/storage/namespace.go
@@ -1343,7 +1343,7 @@ func (n *dbNamespace) FlushIndex(flush persist.IndexFlush) error {
 }
 
 func (n *dbNamespace) Snapshot(
-	blockStart xtime.UnixNano,
+	blockStarts []xtime.UnixNano,
 	snapshotTime xtime.UnixNano,
 	snapshotPersist persist.SnapshotPreparer,
 ) error {
@@ -1374,21 +1374,27 @@ func (n *dbNamespace) Snapshot(
 		seriesPersist int
 		multiErr      xerrors.MultiError
 	)
 
 	for _, shard := range n.OwnedShards() {
+		log := n.log.With(zap.Uint32("shard", shard.ID()))
 		if !shard.IsBootstrapped() {
-			n.log.
-				With(zap.Uint32("shard", shard.ID())).
-				Debug("skipping snapshot due to shard not bootstrapped yet")
+			log.Debug("skipping snapshot due to shard not bootstrapped yet")
 			continue
 		}
-		result, err := shard.Snapshot(blockStart, snapshotTime, snapshotPersist, nsCtx)
-		if err != nil {
-			detailedErr := fmt.Errorf("shard %d failed to snapshot: %v", shard.ID(), err)
-			multiErr = multiErr.Add(detailedErr)
-			// Continue with remaining shards
-			continue
-		}
-
-		seriesPersist += result.SeriesPersist
+		snapshotBlockStarts := shard.FilterBlocksNeedSnapshot(blockStarts)
+		if len(snapshotBlockStarts) == 0 {
+			log.Debug("skipping shard snapshot since no blocks need it")
+			continue
+		}
+		for _, blockStart := range snapshotBlockStarts {
+			snapshotResult, err := shard.Snapshot(blockStart, snapshotTime, snapshotPersist, nsCtx)
+			if err != nil {
+				detailedErr := fmt.Errorf("shard %d failed to snapshot %v block: %w", shard.ID(), blockStart, err)
+				multiErr = multiErr.Add(detailedErr)
+				continue
+			}
+			seriesPersist += snapshotResult.SeriesPersist
+		}
 	}
 
 	n.metrics.snapshotSeriesPersist.Inc(int64(seriesPersist))
83 changes: 81 additions & 2 deletions src/dbnode/storage/namespace_test.go
@@ -699,7 +699,7 @@ func TestNamespaceSnapshotNotBootstrapped(t *testing.T) {
 
 	blockSize := ns.Options().RetentionOptions().BlockSize()
 	blockStart := xtime.Now().Truncate(blockSize)
-	require.Equal(t, errNamespaceNotBootstrapped, ns.Snapshot(blockStart, blockStart, nil))
+	require.Equal(t, errNamespaceNotBootstrapped, ns.Snapshot([]xtime.UnixNano{blockStart}, blockStart, nil))
 }
 
 func TestNamespaceSnapshotAllShardsSuccess(t *testing.T) {
@@ -763,6 +763,74 @@ func TestNamespaceSnapshotShardSkipNotBootstrapped(t *testing.T) {
 	require.NoError(t, testSnapshotWithShardSnapshotErrs(t, shardMethodResults))
 }
 
+func TestNamespaceSnapshotShardBlockFiltered(t *testing.T) {
+	var (
+		ctrl       = xtest.NewController(t)
+		ctx        = context.NewBackground()
+		now        = xtime.Now()
+		ns, closer = newTestNamespaceWithIDOpts(t, defaultTestNs1ID,
+			namespace.NewOptions().SetSnapshotEnabled(true))
+	)
+	defer func() {
+		ctrl.Finish()
+		ctx.Close()
+		closer()
+	}()
+
+	var (
+		shardBootstrapStates = ShardBootstrapStates{}
+		blockSize            = ns.Options().RetentionOptions().BlockSize()
+		block1               = now.Truncate(blockSize)
+		block2               = block1.Add(-blockSize) // earlier block start, expected to be filtered out
+		blocks               = []xtime.UnixNano{block2, block1}
+		filteredBlocks       = []xtime.UnixNano{block1}
+	)
+
+	ns.bootstrapState = Bootstrapped
+	ns.nowFn = func() time.Time { return now.ToTime() }
+	shard := newTestShard(ctrl)
+	ns.shards[shard.ID()] = shard
+	shardBootstrapStates[shard.ID()] = Bootstrapped
+	shard.EXPECT().FilterBlocksNeedSnapshot(blocks).Return(filteredBlocks)
+	shard.EXPECT().
+		Snapshot(block1, now, gomock.Any(), gomock.Any()).
+		Return(ShardSnapshotResult{}, nil)
+
+	err := ns.Snapshot(blocks, now, nil)
+	require.NoError(t, err)
+}
+
+func TestNamespaceSnapshotShardBlockAllShardsFiltered(t *testing.T) {
+	var (
+		ctrl       = xtest.NewController(t)
+		ctx        = context.NewBackground()
+		now        = xtime.Now()
+		ns, closer = newTestNamespaceWithIDOpts(t, defaultTestNs1ID,
+			namespace.NewOptions().SetSnapshotEnabled(true))
+	)
+	defer func() {
+		ctrl.Finish()
+		ctx.Close()
+		closer()
+	}()
+
+	var (
+		shardBootstrapStates = ShardBootstrapStates{}
+		blockSize            = ns.Options().RetentionOptions().BlockSize()
+		blocks               = []xtime.UnixNano{now.Truncate(blockSize)}
+	)
+
+	ns.bootstrapState = Bootstrapped
+	ns.nowFn = func() time.Time { return now.ToTime() }
+	shard := newTestShard(ctrl)
+	ns.shards[shard.ID()] = shard
+	shardBootstrapStates[shard.ID()] = Bootstrapped
+	shard.EXPECT().FilterBlocksNeedSnapshot(blocks).Return([]xtime.UnixNano{})
+
+	err := ns.Snapshot(blocks, now, nil)
+	require.NoError(t, err)
+}
+
 func testSnapshotWithShardSnapshotErrs(
 	t *testing.T,
 	shardMethodResults []snapshotTestCase,
Expand Down Expand Up @@ -796,6 +864,9 @@ func testSnapshotWithShardSnapshotErrs(
if !tc.isBootstrapped {
continue
}
shard.EXPECT().
FilterBlocksNeedSnapshot([]xtime.UnixNano{blockStart}).
Return([]xtime.UnixNano{blockStart})
if tc.expectSnapshot {
shard.EXPECT().
Snapshot(blockStart, now, gomock.Any(), gomock.Any()).
Expand All @@ -805,7 +876,7 @@ func testSnapshotWithShardSnapshotErrs(
shardBootstrapStates[shardID] = tc.shardBootstrapStateBeforeTick
}

return ns.Snapshot(blockStart, now, nil)
return ns.Snapshot([]xtime.UnixNano{blockStart}, now, nil)
}

func TestNamespaceTruncate(t *testing.T) {
@@ -1780,3 +1851,11 @@ func contains(c []uint32, x uint32) bool {
 	}
 	return false
 }
+
+func newTestShard(ctrl *gomock.Controller) *MockdatabaseShard {
+	s := NewMockdatabaseShard(ctrl)
+	shardID := testShardIDs[0].ID()
+	s.EXPECT().ID().Return(shardID).AnyTimes()
+	s.EXPECT().IsBootstrapped().Return(true)
+	return s
+}
60 changes: 38 additions & 22 deletions src/dbnode/storage/shard.go
@@ -177,7 +177,6 @@ type dbShardMetrics struct {
 	insertAsyncWriteInvalidParamsErrors tally.Counter
 	insertAsyncIndexErrors              tally.Counter
 	snapshotTotalLatency                tally.Timer
-	snapshotCheckNeedsSnapshotLatency   tally.Timer
 	snapshotPrepareLatency              tally.Timer
 	snapshotMergeByBucketLatency        tally.Timer
 	snapshotMergeAcrossBucketsLatency   tally.Timer
@@ -216,7 +215,6 @@ func newDatabaseShardMetrics(shardID uint32, scope tally.Scope) dbShardMetrics {
 			"suberror_type": "write-batch-error",
 		}).Counter(insertErrorName),
 		snapshotTotalLatency:              snapshotScope.Timer("total-latency"),
-		snapshotCheckNeedsSnapshotLatency: snapshotScope.Timer("check-needs-snapshot-latency"),
 		snapshotPrepareLatency:            snapshotScope.Timer("prepare-latency"),
 		snapshotMergeByBucketLatency:      snapshotScope.Timer("merge-by-bucket-latency"),
 		snapshotMergeAcrossBucketsLatency: snapshotScope.Timer("merge-across-buckets-latency"),
@@ -2388,41 +2386,59 @@ func (s *dbShard) ColdFlush(
 	return flush, multiErr.FinalError()
 }
 
+func (s *dbShard) FilterBlocksNeedSnapshot(blockStarts []xtime.UnixNano) []xtime.UnixNano {
+	if !s.IsBootstrapped() {
+		return nil
+	}
+
+	needs := map[xtime.UnixNano]struct{}{}
+	s.forEachShardEntry(blocksNeedSnapshotFilter(blockStarts, needs))
+
+	// Note: doing this to keep original ordering. Not sure if that matters though.
+	filtered := make([]xtime.UnixNano, 0, len(needs))
+	for _, bl := range blockStarts {
+		if _, ok := needs[bl]; ok {
+			filtered = append(filtered, bl)
+		}
+	}
+	return filtered
+}
+
+func blocksNeedSnapshotFilter(
+	blockStarts []xtime.UnixNano,
+	needs map[xtime.UnixNano]struct{},
+) func(entry *Entry) bool {
+	return func(entry *Entry) bool {
+		for _, blockStart := range blockStarts {
+			if _, ok := needs[blockStart]; ok {
+				continue
+			}
+			if !entry.Series.IsBufferEmptyAtBlockStart(blockStart) {
+				needs[blockStart] = struct{}{}
+				continue
+			}
+		}
+
+		return len(needs) < len(blockStarts)
+	}
+}
+
 func (s *dbShard) Snapshot(
 	blockStart xtime.UnixNano,
 	snapshotTime xtime.UnixNano,
 	snapshotPreparer persist.SnapshotPreparer,
 	nsCtx namespace.Context,
 ) (ShardSnapshotResult, error) {
-	// We don't snapshot data when the shard is still bootstrapping
-	s.RLock()
-	if s.bootstrapState != Bootstrapped {
-		s.RUnlock()
-		return ShardSnapshotResult{}, errShardNotBootstrappedToSnapshot
-	}
-
-	s.RUnlock()
+	if !s.IsBootstrapped() {
+		return ShardSnapshotResult{}, errShardNotBootstrappedToSnapshot
+	}
 
 	// Record per-shard snapshot latency, not many shards so safe
 	// to use a timer.
 	totalTimer := s.metrics.snapshotTotalLatency.Start()
 	defer totalTimer.Stop()
 
-	var needsSnapshot bool
-	checkNeedsSnapshotTimer := s.metrics.snapshotCheckNeedsSnapshotLatency.Start()
-	s.forEachShardEntry(func(entry *Entry) bool {
-		if !entry.Series.IsBufferEmptyAtBlockStart(blockStart) {
-			needsSnapshot = true
-			return false
-		}
-		return true
-	})
-	checkNeedsSnapshotTimer.Stop()
-
-	if !needsSnapshot {
-		return ShardSnapshotResult{}, nil
-	}
-
 	prepareOpts := persist.DataPrepareOptions{
 		NamespaceMetadata: s.namespace,
 		Shard:             s.ID(),
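One design note on the new filter: forEachShardEntry stops iterating once its callback returns false, so the closure's return len(needs) < len(blockStarts) doubles as an early exit; as soon as every candidate block is known to need a snapshot, the remaining series are skipped. This replaces the old per-block, per-series scan (and its check-needs-snapshot-latency timer) with at most one scan per shard. A toy model of that stop-early contract, with forEach and plain int entries as simplified stand-ins for the repository's types:

package main

import "fmt"

// forEach mimics forEachShardEntry's contract: iteration stops as soon
// as fn returns false.
func forEach(entries []int, fn func(int) bool) {
    for _, e := range entries {
        if !fn(e) {
            return
        }
    }
}

func main() {
    visited := 0
    target := 3
    forEach([]int{1, 2, 3, 4, 5, 6}, func(e int) bool {
        visited++
        return e != target // returning false at the target ends the scan
    })
    fmt.Println("visited:", visited) // visited: 3, not 6
}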