Make snapshot process snapshot all unflushed blocks #1017

Merged
merged 26 commits into from Nov 16, 2018
59 changes: 47 additions & 12 deletions src/dbnode/integration/disk_flush_helpers.go
@@ -46,23 +46,59 @@ var (
errDiskFlushTimedOut = errors.New("flushing data to disk took too long")
)

type snapshotID struct {
blockStart time.Time
minVolume int
}

func getLatestSnapshotVolumeIndex(
filePathPrefix string,
shardSet sharding.ShardSet,
namespace ident.ID,
blockStart time.Time,
) int {
latestVolumeIndex := -1

for _, shard := range shardSet.AllIDs() {
snapshotFiles, err := fs.SnapshotFiles(
filePathPrefix, namespace, shard)
if err != nil {
panic(err)
Collaborator:

don't panic here, propagate error

@prateek (Collaborator), Nov 15, 2018:

meh just saw it's only for integration tests, w/e. feel free to leave as is.
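
For reference, a minimal sketch of the error-propagating variant suggested above (hypothetical; not the code that was merged). The helper returns the error to the caller instead of panicking, and callers would then check the returned error:

func getLatestSnapshotVolumeIndex(
	filePathPrefix string,
	shardSet sharding.ShardSet,
	namespace ident.ID,
	blockStart time.Time,
) (int, error) {
	latestVolumeIndex := -1
	for _, shard := range shardSet.AllIDs() {
		// Propagate the error instead of panicking.
		snapshotFiles, err := fs.SnapshotFiles(filePathPrefix, namespace, shard)
		if err != nil {
			return -1, err
		}
		latestSnapshot, ok := snapshotFiles.LatestVolumeForBlock(blockStart)
		if !ok {
			continue
		}
		if latestSnapshot.ID.VolumeIndex > latestVolumeIndex {
			latestVolumeIndex = latestSnapshot.ID.VolumeIndex
		}
	}
	return latestVolumeIndex, nil
}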

}
latestSnapshot, ok := snapshotFiles.LatestVolumeForBlock(blockStart)
if !ok {
continue
}
if latestSnapshot.ID.VolumeIndex > latestVolumeIndex {
latestVolumeIndex = latestSnapshot.ID.VolumeIndex
}
}

return latestVolumeIndex
}

func waitUntilSnapshotFilesFlushed(
filePathPrefix string,
shardSet sharding.ShardSet,
namespace ident.ID,
expectedSnapshotTimes []time.Time,
expectedSnapshots []snapshotID,
timeout time.Duration,
) error {
dataFlushed := func() bool {
for _, shard := range shardSet.AllIDs() {
for _, t := range expectedSnapshotTimes {
exists, err := fs.SnapshotFileSetExistsAt(
filePathPrefix, namespace, shard, t)
for _, e := range expectedSnapshots {
snapshotFiles, err := fs.SnapshotFiles(
filePathPrefix, namespace, shard)
if err != nil {
panic(err)
}

if !exists {
latest, ok := snapshotFiles.LatestVolumeForBlock(e.blockStart)
if !ok {
return false
}

if !(latest.ID.VolumeIndex >= e.minVolume) {
return false
}
}
@@ -202,18 +238,17 @@ func verifySnapshottedDataFiles(
shardSet sharding.ShardSet,
storageOpts storage.Options,
namespace ident.ID,
snapshotTime time.Time,
seriesMaps map[xtime.UnixNano]generate.SeriesBlock,
) {
fsOpts := storageOpts.CommitLogOptions().FilesystemOptions()
reader, err := fs.NewReader(storageOpts.BytesPool(), fsOpts)
require.NoError(t, err)
iteratorPool := storageOpts.ReaderIteratorPool()
for _, ns := range testNamespaces {
for _, seriesList := range seriesMaps {
verifyForTime(
t, storageOpts, reader, shardSet, iteratorPool, snapshotTime,
ns, persist.FileSetSnapshotType, seriesList)
}
for blockStart, seriesList := range seriesMaps {

verifyForTime(
t, storageOpts, reader, shardSet, iteratorPool, blockStart.ToTime(),
namespace, persist.FileSetSnapshotType, seriesList)
}

}
113 changes: 92 additions & 21 deletions src/dbnode/integration/disk_snapshot_test.go
@@ -39,7 +39,19 @@ func TestDiskSnapshotSimple(t *testing.T) {
t.SkipNow() // Just skip if we're doing a short run
}
// Test setup
nOpts := namespace.NewOptions().SetSnapshotEnabled(true)
var (
nOpts = namespace.NewOptions().
SetSnapshotEnabled(true)
bufferPast = 50 * time.Minute
bufferFuture = 50 * time.Minute
blockSize = time.Hour
)

nOpts = nOpts.
SetRetentionOptions(nOpts.RetentionOptions().
SetBufferFuture(bufferFuture).
SetBufferPast(bufferPast).
SetBlockSize(blockSize))
md1, err := namespace.NewMetadata(testNamespaces[0], nOpts)
require.NoError(t, err)
md2, err := namespace.NewMetadata(testNamespaces[1], nOpts)
@@ -52,8 +64,7 @@ func TestDiskSnapshotSimple(t *testing.T) {
require.NoError(t, err)
defer testSetup.close()

blockSize := md1.Options().RetentionOptions().BlockSize()
filePathPrefix := testSetup.storageOpts.CommitLogOptions().FilesystemOptions().FilePathPrefix()
shardSet := testSetup.shardSet

// Start the server
log := testSetup.storageOpts.InstrumentOptions().Logger()
@@ -68,39 +79,99 @@ func TestDiskSnapshotSimple(t *testing.T) {
}()

// Write test data
now := testSetup.getNowFn()
seriesMaps := make(map[xtime.UnixNano]generate.SeriesBlock)
inputData := []generate.BlockConfig{
{IDs: []string{"foo", "bar", "baz"}, NumPoints: 100, Start: now},
}
var (
currBlock = testSetup.getNowFn().Truncate(blockSize)
now = currBlock.Add(11 * time.Minute)
assertTimeAllowsWritesToAllBlocks = func(ti time.Time) {
// Make sure now is within bufferPast of the previous block
require.True(t, ti.Before(ti.Truncate(blockSize).Add(bufferPast)))
// Make sure now is within bufferFuture of the next block
require.True(t, ti.After(ti.Truncate(blockSize).Add(blockSize).Add(-bufferFuture)))
}
)

assertTimeAllowsWritesToAllBlocks(now)
testSetup.setNowFn(now)

var (
seriesMaps = make(map[xtime.UnixNano]generate.SeriesBlock)
inputData = []generate.BlockConfig{
// Writes in the previous block which should be mutable due to bufferPast
{IDs: []string{"foo", "bar", "baz"}, NumPoints: 5, Start: currBlock.Add(-10 * time.Minute)},
// Writes in the current block
{IDs: []string{"a", "b", "c"}, NumPoints: 30, Start: currBlock},
// Writes in the next block which should be mutable due to bufferFuture
{IDs: []string{"1", "2", "3"}, NumPoints: 30, Start: currBlock.Add(blockSize)},
}
)
for _, input := range inputData {
testSetup.setNowFn(input.Start)
testData := generate.Block(input)
seriesMaps[xtime.ToUnixNano(input.Start)] = testData
seriesMaps[xtime.ToUnixNano(input.Start.Truncate(blockSize))] = testData
for _, ns := range testSetup.namespaces {
require.NoError(t, testSetup.writeBatch(ns.ID(), testData))
}
}

now = testSetup.getNowFn().Add(blockSize).Add(-10 * time.Minute)
// Now that we've completed the writes, we need to make sure that we wait for
// the next snapshot to guarantee that it contains all the writes. We do
// this by measuring the current highest snapshot volume index and then updating
// the time (to allow another snapshot process to occur due to the configurable
// minimum time between snapshots), and then waiting for the snapshot files with
// the measured volume index + 1.
var (
snapshotsToWaitForByNS = make([][]snapshotID, 0, len(testSetup.namespaces))
filePathPrefix = testSetup.storageOpts.
CommitLogOptions().
FilesystemOptions().
FilePathPrefix()
)
for _, ns := range testSetup.namespaces {
snapshotsToWaitForByNS = append(snapshotsToWaitForByNS, []snapshotID{
{
blockStart: currBlock.Add(-blockSize),
minVolume: getLatestSnapshotVolumeIndex(
filePathPrefix, shardSet, ns.ID(), currBlock.Add(-blockSize)) + 1,
},
{
blockStart: currBlock,
minVolume: getLatestSnapshotVolumeIndex(
filePathPrefix, shardSet, ns.ID(), currBlock) + 1,
},
{
blockStart: currBlock.Add(blockSize),
minVolume: getLatestSnapshotVolumeIndex(
filePathPrefix, shardSet, ns.ID(), currBlock.Add(blockSize)) + 1,
},
})
}

now = testSetup.getNowFn().Add(2 * time.Minute)
assertTimeAllowsWritesToAllBlocks(now)
testSetup.setNowFn(now)

maxWaitTime := time.Minute
for _, ns := range testSetup.namespaces {
require.NoError(t, waitUntilSnapshotFilesFlushed(filePathPrefix, testSetup.shardSet, ns.ID(), []time.Time{now.Truncate(blockSize)}, maxWaitTime))
verifySnapshottedDataFiles(t, testSetup.shardSet, testSetup.storageOpts, ns.ID(), now.Truncate(blockSize), seriesMaps)
for i, ns := range testSetup.namespaces {
log.Info("waiting for snapshot files to flush")
require.NoError(t, waitUntilSnapshotFilesFlushed(
filePathPrefix, shardSet, ns.ID(), snapshotsToWaitForByNS[i], maxWaitTime))
log.Info("verifying snapshot files")
verifySnapshottedDataFiles(t, shardSet, testSetup.storageOpts, ns.ID(), seriesMaps)
}

oldTime := testSetup.getNowFn()
newTime := oldTime.Add(blockSize * 2)
var (
oldTime = testSetup.getNowFn()
newTime = oldTime.Add(blockSize * 2)
)
testSetup.setNowFn(newTime)

testSetup.sleepFor10xTickMinimumInterval()
for _, ns := range testSetup.namespaces {
// Make sure new snapshot files are written out
require.NoError(t, waitUntilSnapshotFilesFlushed(filePathPrefix, testSetup.shardSet, ns.ID(), []time.Time{newTime.Truncate(blockSize)}, maxWaitTime))
for _, shard := range testSetup.shardSet.All() {
log.Info("waiting for new snapshot files to be written out")
snapshotsToWaitFor := []snapshotID{{blockStart: newTime.Truncate(blockSize)}}
require.NoError(t, waitUntilSnapshotFilesFlushed(
filePathPrefix, shardSet, ns.ID(), snapshotsToWaitFor, maxWaitTime))
log.Info("waiting for old snapshot files to be deleted")
for _, shard := range shardSet.All() {
waitUntil(func() bool {
// Make sure old snapshot files are deleted
exists, err := fs.SnapshotFileSetExistsAt(filePathPrefix, ns.ID(), shard.ID(), oldTime.Truncate(blockSize))
require.NoError(t, err)
return !exists
@@ -236,7 +236,9 @@ func TestFsCommitLogMixedModeReadWriteProp(t *testing.T) {
filePathPrefix,
setup.shardSet,
nsID,
[]time.Time{snapshotBlock}, maxFlushWaitTime)
[]snapshotID{{blockStart: snapshotBlock}},
maxFlushWaitTime,
)
if err != nil {
return false, fmt.Errorf("error waiting for snapshot files: %s", err.Error())
}