From cb4691c1d97a75ca2f1253266251200db4f755bf Mon Sep 17 00:00:00 2001 From: Richard Artoul Date: Thu, 4 Oct 2018 09:15:45 -0400 Subject: [PATCH 01/26] interim --- src/dbnode/storage/flush.go | 81 ++++++++++++----------- src/dbnode/storage/flush_test.go | 106 +++++-------------------------- src/dbnode/storage/util.go | 2 +- 3 files changed, 57 insertions(+), 132 deletions(-) diff --git a/src/dbnode/storage/flush.go b/src/dbnode/storage/flush.go index bde972724c..cdd457caaa 100644 --- a/src/dbnode/storage/flush.go +++ b/src/dbnode/storage/flush.go @@ -103,19 +103,19 @@ func (m *flushManager) Flush( } multiErr := xerrors.NewMultiError() - m.setState(flushManagerFlushInProgress) - for _, ns := range namespaces { - // Flush first because we will only snapshot if there are no outstanding flushes - flushTimes := m.namespaceFlushTimes(ns, tickStart) - shardBootstrapTimes, ok := dbBootstrapStateAtTickStart.NamespaceBootstrapStates[ns.ID().String()] - if !ok { - // Could happen if namespaces are added / removed. - multiErr = multiErr.Add(fmt.Errorf( - "tried to flush ns: %s, but did not have shard bootstrap times", ns.ID().String())) - continue - } - multiErr = multiErr.Add(m.flushNamespaceWithTimes(ns, shardBootstrapTimes, flushTimes, flush)) - } + // m.setState(flushManagerFlushInProgress) + // for _, ns := range namespaces { + // // Flush first because we will only snapshot if there are no outstanding flushes + // flushTimes := m.namespaceFlushTimes(ns, tickStart) + // shardBootstrapTimes, ok := dbBootstrapStateAtTickStart.NamespaceBootstrapStates[ns.ID().String()] + // if !ok { + // // Could happen if namespaces are added / removed. + // multiErr = multiErr.Add(fmt.Errorf( + // "tried to flush ns: %s, but did not have shard bootstrap times", ns.ID().String())) + // continue + // } + // multiErr = multiErr.Add(m.flushNamespaceWithTimes(ns, shardBootstrapTimes, flushTimes, flush)) + // } // Perform two separate loops through all the namespaces so that we can emit better // gauges I.E all the flushing for all the namespaces happens at once and then all @@ -123,15 +123,8 @@ func (m *flushManager) Flush( // better semantically because flushing should take priority over snapshotting. m.setState(flushManagerSnapshotInProgress) for _, ns := range namespaces { - var ( - blockSize = ns.Options().RetentionOptions().BlockSize() - snapshotBlockStart = m.snapshotBlockStart(ns, tickStart) - prevBlockStart = snapshotBlockStart.Add(-blockSize) - ) - - // Only perform snapshots if the previous block (I.E the block directly before - // the block that we would snapshot) has been flushed. - if !ns.NeedsFlush(prevBlockStart, prevBlockStart) { + snapshotBlockStarts := m.namespaceSnapshotTimes(ns, tickStart) + for _, snapshotBlockStart := range snapshotBlockStarts { if err := ns.Snapshot(snapshotBlockStart, tickStart, flush); err != nil { detailedErr := fmt.Errorf("namespace %s failed to snapshot data: %v", ns.ID().String(), err) @@ -198,26 +191,8 @@ func (m *flushManager) setState(state flushManagerState) { m.Unlock() } -func (m *flushManager) snapshotBlockStart(ns databaseNamespace, curr time.Time) time.Time { - var ( - rOpts = ns.Options().RetentionOptions() - blockSize = rOpts.BlockSize() - bufferPast = rOpts.BufferPast() - ) - // Only begin snapshotting a new block once the previous one is immutable. 
I.E if we have - // a 2-hour blocksize, and bufferPast is 10 minutes and our blocks are aligned on even hours, - // then at: - // 1) 1:30PM we want to snapshot with a 12PM block start and 1:30.Add(-10min).Truncate(2hours) = 12PM - // 2) 1:59PM we want to snapshot with a 12PM block start and 1:59.Add(-10min).Truncate(2hours) = 12PM - // 3) 2:09PM we want to snapshot with a 12PM block start (because the 12PM block can still be receiving - // "buffer past" writes) and 2:09.Add(-10min).Truncate(2hours) = 12PM - // 4) 2:10PM we want to snapshot with a 2PM block start (because the 12PM block can no long receive - // "buffer past" writes) and 2:10.Add(-10min).Truncate(2hours) = 2PM - return curr.Add(-bufferPast).Truncate(blockSize) -} - -func (m *flushManager) flushRange(ropts retention.Options, t time.Time) (time.Time, time.Time) { - return retention.FlushTimeStart(ropts, t), retention.FlushTimeEnd(ropts, t) +func (m *flushManager) flushRange(rOpts retention.Options, t time.Time) (time.Time, time.Time) { + return retention.FlushTimeStart(rOpts, t), retention.FlushTimeEnd(rOpts, t) } func (m *flushManager) namespaceFlushTimes(ns databaseNamespace, curr time.Time) []time.Time { @@ -229,6 +204,28 @@ func (m *flushManager) namespaceFlushTimes(ns databaseNamespace, curr time.Time) candidateTimes := timesInRange(earliest, latest, blockSize) return filterTimes(candidateTimes, func(t time.Time) bool { + fmt.Println("flush: ", t) + return ns.NeedsFlush(t, t) + }) +} + +func (m *flushManager) namespaceSnapshotTimes(ns databaseNamespace, curr time.Time) []time.Time { + var ( + rOpts = ns.Options().RetentionOptions() + blockSize = rOpts.BlockSize() + // Earliest possible snapshottable block is the earliest possible flush + // block start which is the first block in the retention period. + earliest = retention.FlushTimeStart(rOpts, curr) + // Latest possible snapshotting block is either the current block OR the + // next block if the current time and bufferFuture configuration would + // allow writes to be written into the next block. 
+ latest = curr.Add(rOpts.BufferFuture()).Truncate(blockSize) + ) + + candidateTimes := timesInRange(earliest, latest, blockSize) + return filterTimes(candidateTimes, func(t time.Time) bool { + // Snapshot anything that is unflushed + fmt.Println("snapshot: ", t) return ns.NeedsFlush(t, t) }) } diff --git a/src/dbnode/storage/flush_test.go b/src/dbnode/storage/flush_test.go index 8dd1525e5e..675dce36a4 100644 --- a/src/dbnode/storage/flush_test.go +++ b/src/dbnode/storage/flush_test.go @@ -22,6 +22,7 @@ package storage import ( "errors" + "fmt" "sort" "sync" "testing" @@ -172,6 +173,7 @@ func TestFlushManagerSkipNamespaceIndexingDisabled(t *testing.T) { ns.EXPECT().ID().Return(defaultTestNs1ID).AnyTimes() ns.EXPECT().NeedsFlush(gomock.Any(), gomock.Any()).Return(true).AnyTimes() ns.EXPECT().Flush(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).AnyTimes() + ns.EXPECT().Snapshot(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).AnyTimes() mockFlusher := persist.NewMockDataFlush(ctrl) mockFlusher.EXPECT().DoneData().Return(nil) @@ -209,6 +211,7 @@ func TestFlushManagerNamespaceIndexingEnabled(t *testing.T) { ns.EXPECT().ID().Return(defaultTestNs1ID).AnyTimes() ns.EXPECT().NeedsFlush(gomock.Any(), gomock.Any()).Return(true).AnyTimes() ns.EXPECT().Flush(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).AnyTimes() + ns.EXPECT().Snapshot(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).AnyTimes() ns.EXPECT().FlushIndex(gomock.Any()).Return(nil) mockFlusher := persist.NewMockDataFlush(ctrl) @@ -352,56 +355,25 @@ func TestFlushManagerFlushSnapshot(t *testing.T) { for _, ns := range []*MockdatabaseNamespace{ns1, ns2} { rOpts := ns.Options().RetentionOptions() blockSize := rOpts.BlockSize() - bufferPast := rOpts.BufferPast() + bufferFuture := rOpts.BufferFuture() start := retention.FlushTimeStart(ns.Options().RetentionOptions(), now) - end := retention.FlushTimeEnd(ns.Options().RetentionOptions(), now) - num := numIntervals(start, end, blockSize) - - for i := 0; i < num; i++ { - st := start.Add(time.Duration(i) * blockSize) - ns.EXPECT().NeedsFlush(st, st).Return(false) - } - - currBlockStart := now.Add(-bufferPast).Truncate(blockSize) - prevBlockStart := currBlockStart.Add(-blockSize) - ns.EXPECT().NeedsFlush(prevBlockStart, prevBlockStart).Return(false) - ns.EXPECT().Snapshot(currBlockStart, now, gomock.Any()) - } - - bootstrapStates := DatabaseBootstrapState{ - NamespaceBootstrapStates: map[string]ShardBootstrapStates{ - ns1.ID().String(): ShardBootstrapStates{}, - ns2.ID().String(): ShardBootstrapStates{}, - }, - } - require.NoError(t, fm.Flush(now, bootstrapStates)) -} - -func TestFlushManagerFlushNoSnapshotWhileFlushPending(t *testing.T) { - ctrl := gomock.NewController(t) - defer ctrl.Finish() - - fm, ns1, ns2 := newMultipleFlushManagerNeedsFlush(t, ctrl) - now := time.Now() - - for _, ns := range []*MockdatabaseNamespace{ns1, ns2} { - rOpts := ns.Options().RetentionOptions() - blockSize := rOpts.BlockSize() - bufferPast := rOpts.BufferPast() - - start := retention.FlushTimeStart(ns.Options().RetentionOptions(), now) - end := retention.FlushTimeEnd(ns.Options().RetentionOptions(), now) - num := numIntervals(start, end, blockSize) - + flushEnd := now.Add(bufferFuture).Truncate(blockSize) + num := numIntervals(start, snapshotEnd, blockSize) + + // for i := 0; i < num; i++ { + // st := start.Add(time.Duration(i) * blockSize) + // ns.EXPECT().NeedsFlush(st, st).Return(false) + // ns.EXPECT().Flush(gomock.Any(), gomock.Any(), gomock.Any()) + // } + snapshotEnd := 
now.Add(bufferFuture).Truncate(blockSize) + num = numIntervals(start, snapshotEnd, blockSize) for i := 0; i < num; i++ { st := start.Add(time.Duration(i) * blockSize) - ns.EXPECT().NeedsFlush(st, st).Return(false) + fmt.Println(st) + ns.EXPECT().NeedsFlush(st, st).Return(true) + ns.EXPECT().Snapshot(st, now, gomock.Any()) } - - currBlockStart := now.Add(-bufferPast).Truncate(blockSize) - prevBlockStart := currBlockStart.Add(-blockSize) - ns.EXPECT().NeedsFlush(prevBlockStart, prevBlockStart).Return(true) } bootstrapStates := DatabaseBootstrapState{ @@ -413,50 +385,6 @@ func TestFlushManagerFlushNoSnapshotWhileFlushPending(t *testing.T) { require.NoError(t, fm.Flush(now, bootstrapStates)) } -func TestFlushManagerSnapshotBlockStart(t *testing.T) { - ctrl := gomock.NewController(t) - defer ctrl.Finish() - - fm, _, _ := newMultipleFlushManagerNeedsFlush(t, ctrl) - now := time.Now() - - nsOpts := namespace.NewOptions() - rOpts := nsOpts.RetentionOptions(). - SetBlockSize(2 * time.Hour). - SetBufferPast(10 * time.Minute) - blockSize := rOpts.BlockSize() - nsOpts = nsOpts.SetRetentionOptions(rOpts) - ns := NewMockdatabaseNamespace(ctrl) - ns.EXPECT().Options().Return(nsOpts).AnyTimes() - - testCases := []struct { - currTime time.Time - expectedBlockStart time.Time - }{ - // Set comment in snapshotBlockStart for explanation of these test cases - { - currTime: now.Truncate(blockSize).Add(30 * time.Minute), - expectedBlockStart: now.Truncate(blockSize), - }, - { - currTime: now.Truncate(blockSize).Add(119 * time.Minute), - expectedBlockStart: now.Truncate(blockSize), - }, - { - currTime: now.Truncate(blockSize).Add(129 * time.Minute), - expectedBlockStart: now.Truncate(blockSize), - }, - { - currTime: now.Truncate(blockSize).Add(130 * time.Minute), - expectedBlockStart: now.Truncate(blockSize).Add(blockSize), - }, - } - - for _, tc := range testCases { - require.Equal(t, tc.expectedBlockStart, fm.snapshotBlockStart(ns, tc.currTime)) - } -} - type timesInOrder []time.Time func (a timesInOrder) Len() int { return len(a) } diff --git a/src/dbnode/storage/util.go b/src/dbnode/storage/util.go index 87faa3a8a2..9ce57935e7 100644 --- a/src/dbnode/storage/util.go +++ b/src/dbnode/storage/util.go @@ -47,7 +47,7 @@ func timesInRange(startInclusive, endInclusive time.Time, windowSize time.Durati return nil } times := make([]time.Time, 0, ni) - for t := endInclusive; !t.Before(startInclusive); t = t.Add(-windowSize) { + for t := startInclusive; !t.After(endInclusive); t = t.Add(windowSize) { times = append(times, t) } return times From 1ee2d0d617da22b630109ac3617baf5a28fc8144 Mon Sep 17 00:00:00 2001 From: Richard Artoul Date: Thu, 4 Oct 2018 19:30:30 -0400 Subject: [PATCH 02/26] Snapshot everything --- src/dbnode/storage/flush.go | 26 +++++++++++++------------- src/dbnode/storage/flush_test.go | 16 ++++++++-------- src/dbnode/storage/util_test.go | 14 +++++++------- 3 files changed, 28 insertions(+), 28 deletions(-) diff --git a/src/dbnode/storage/flush.go b/src/dbnode/storage/flush.go index cdd457caaa..e534329315 100644 --- a/src/dbnode/storage/flush.go +++ b/src/dbnode/storage/flush.go @@ -103,19 +103,19 @@ func (m *flushManager) Flush( } multiErr := xerrors.NewMultiError() - // m.setState(flushManagerFlushInProgress) - // for _, ns := range namespaces { - // // Flush first because we will only snapshot if there are no outstanding flushes - // flushTimes := m.namespaceFlushTimes(ns, tickStart) - // shardBootstrapTimes, ok := dbBootstrapStateAtTickStart.NamespaceBootstrapStates[ns.ID().String()] - // if !ok { 
- // // Could happen if namespaces are added / removed. - // multiErr = multiErr.Add(fmt.Errorf( - // "tried to flush ns: %s, but did not have shard bootstrap times", ns.ID().String())) - // continue - // } - // multiErr = multiErr.Add(m.flushNamespaceWithTimes(ns, shardBootstrapTimes, flushTimes, flush)) - // } + m.setState(flushManagerFlushInProgress) + for _, ns := range namespaces { + // Flush first because we will only snapshot if there are no outstanding flushes + flushTimes := m.namespaceFlushTimes(ns, tickStart) + shardBootstrapTimes, ok := dbBootstrapStateAtTickStart.NamespaceBootstrapStates[ns.ID().String()] + if !ok { + // Could happen if namespaces are added / removed. + multiErr = multiErr.Add(fmt.Errorf( + "tried to flush ns: %s, but did not have shard bootstrap times", ns.ID().String())) + continue + } + multiErr = multiErr.Add(m.flushNamespaceWithTimes(ns, shardBootstrapTimes, flushTimes, flush)) + } // Perform two separate loops through all the namespaces so that we can emit better // gauges I.E all the flushing for all the namespaces happens at once and then all diff --git a/src/dbnode/storage/flush_test.go b/src/dbnode/storage/flush_test.go index 675dce36a4..8d4c641ba0 100644 --- a/src/dbnode/storage/flush_test.go +++ b/src/dbnode/storage/flush_test.go @@ -358,14 +358,14 @@ func TestFlushManagerFlushSnapshot(t *testing.T) { bufferFuture := rOpts.BufferFuture() start := retention.FlushTimeStart(ns.Options().RetentionOptions(), now) - flushEnd := now.Add(bufferFuture).Truncate(blockSize) - num := numIntervals(start, snapshotEnd, blockSize) - - // for i := 0; i < num; i++ { - // st := start.Add(time.Duration(i) * blockSize) - // ns.EXPECT().NeedsFlush(st, st).Return(false) - // ns.EXPECT().Flush(gomock.Any(), gomock.Any(), gomock.Any()) - // } + flushEnd := retention.FlushTimeEnd(ns.Options().RetentionOptions(), now) + num := numIntervals(start, flushEnd, blockSize) + + for i := 0; i < num; i++ { + st := start.Add(time.Duration(i) * blockSize) + ns.EXPECT().NeedsFlush(st, st).Return(false) + } + snapshotEnd := now.Add(bufferFuture).Truncate(blockSize) num = numIntervals(start, snapshotEnd, blockSize) for i := 0; i < num; i++ { diff --git a/src/dbnode/storage/util_test.go b/src/dbnode/storage/util_test.go index 6264412ec3..b9600748a1 100644 --- a/src/dbnode/storage/util_test.go +++ b/src/dbnode/storage/util_test.go @@ -77,14 +77,14 @@ func TestTimesInRange(t *testing.T) { } timesForN = func(i, j int64) []time.Time { times := make([]time.Time, 0, j-i+1) - for k := j; k >= i; k-- { + for k := i; k <= j; k++ { times = append(times, timeFor(k)) } return times } ) // [0, 2] with a gap of 1 ==> [0, 1, 2] - require.Equal(t, []time.Time{timeFor(2), timeFor(1), timeFor(0)}, + require.Equal(t, []time.Time{timeFor(0), timeFor(1), timeFor(2)}, timesInRange(timeFor(0), timeFor(2), w)) // [2, 1] with a gap of 1 ==> empty result @@ -94,14 +94,14 @@ func TestTimesInRange(t *testing.T) { require.Equal(t, []time.Time{timeFor(2)}, timesInRange(timeFor(2), timeFor(2), w)) - // [0, 9] with a gap of 3 ==> [9, 6, 3, 0] - require.Equal(t, []time.Time{timeFor(9), timeFor(6), timeFor(3), timeFor(0)}, + // [0, 9] with a gap of 3 ==> [0, 3, 6, 9] + require.Equal(t, []time.Time{timeFor(0), timeFor(3), timeFor(6), timeFor(9)}, timesInRange(timeFor(0), timeFor(9), 3*w)) - // [1, 9] with a gap of 3 ==> [9, 6, 3] - require.Equal(t, []time.Time{timeFor(9), timeFor(6), timeFor(3)}, + // [1, 9] with a gap of 3 ==> [1, 4, 7] + require.Equal(t, []time.Time{timeFor(1), timeFor(4), timeFor(7)}, 
timesInRange(timeFor(1), timeFor(9), 3*w)) - // [1, 100] with a gap of 1 ==> [100, 99, ..., 1] + // [1, 100] with a gap of 1 ==> [1, ..., 99, 100] require.Equal(t, timesForN(1, 100), timesInRange(timeFor(1), timeFor(100), w)) } From e4c1fef3d8def589a1b05bac544570e23c25b286 Mon Sep 17 00:00:00 2001 From: Richard Artoul Date: Thu, 4 Oct 2018 19:31:59 -0400 Subject: [PATCH 03/26] remove println --- src/dbnode/storage/flush.go | 1 - 1 file changed, 1 deletion(-) diff --git a/src/dbnode/storage/flush.go b/src/dbnode/storage/flush.go index e534329315..d2c90fa717 100644 --- a/src/dbnode/storage/flush.go +++ b/src/dbnode/storage/flush.go @@ -204,7 +204,6 @@ func (m *flushManager) namespaceFlushTimes(ns databaseNamespace, curr time.Time) candidateTimes := timesInRange(earliest, latest, blockSize) return filterTimes(candidateTimes, func(t time.Time) bool { - fmt.Println("flush: ", t) return ns.NeedsFlush(t, t) }) } From 7a7924743b9f568c7827c25644b46ee56a526856 Mon Sep 17 00:00:00 2001 From: Richard Artoul Date: Thu, 4 Oct 2018 19:32:44 -0400 Subject: [PATCH 04/26] Refactor comment --- src/dbnode/storage/flush.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/dbnode/storage/flush.go b/src/dbnode/storage/flush.go index d2c90fa717..b1a9fab2e2 100644 --- a/src/dbnode/storage/flush.go +++ b/src/dbnode/storage/flush.go @@ -212,7 +212,7 @@ func (m *flushManager) namespaceSnapshotTimes(ns databaseNamespace, curr time.Ti var ( rOpts = ns.Options().RetentionOptions() blockSize = rOpts.BlockSize() - // Earliest possible snapshottable block is the earliest possible flush + // Earliest possible snapshottable block is the earliest possible flushable // block start which is the first block in the retention period. earliest = retention.FlushTimeStart(rOpts, curr) // Latest possible snapshotting block is either the current block OR the From a7c4c1f12fccb59abdd7432621f3f064ade25b8a Mon Sep 17 00:00:00 2001 From: Richard Artoul Date: Thu, 4 Oct 2018 19:32:56 -0400 Subject: [PATCH 05/26] delete print statement --- src/dbnode/storage/flush.go | 1 - 1 file changed, 1 deletion(-) diff --git a/src/dbnode/storage/flush.go b/src/dbnode/storage/flush.go index b1a9fab2e2..dbc62af234 100644 --- a/src/dbnode/storage/flush.go +++ b/src/dbnode/storage/flush.go @@ -224,7 +224,6 @@ func (m *flushManager) namespaceSnapshotTimes(ns databaseNamespace, curr time.Ti candidateTimes := timesInRange(earliest, latest, blockSize) return filterTimes(candidateTimes, func(t time.Time) bool { // Snapshot anything that is unflushed - fmt.Println("snapshot: ", t) return ns.NeedsFlush(t, t) }) } From eb5ab830132aeedef94cc2a580f2e2347bbd2488 Mon Sep 17 00:00:00 2001 From: Richard Artoul Date: Thu, 4 Oct 2018 19:34:26 -0400 Subject: [PATCH 06/26] Add better comment --- src/dbnode/storage/flush.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/src/dbnode/storage/flush.go b/src/dbnode/storage/flush.go index dbc62af234..8795715873 100644 --- a/src/dbnode/storage/flush.go +++ b/src/dbnode/storage/flush.go @@ -120,7 +120,11 @@ func (m *flushManager) Flush( // Perform two separate loops through all the namespaces so that we can emit better // gauges I.E all the flushing for all the namespaces happens at once and then all // the snapshotting for all the namespaces happens at once. This is also slightly - // better semantically because flushing should take priority over snapshotting. + // better semantically because flushing should take priority over snapshotting. 
In + // addition, we need to make sure that for any given shard/blockStart combination, + // we attempt a flush befor a snapshot as the snapshotting process will attempt to + // snapshot any unflushed blocks which would be wasteful if the block is already + // flushable. m.setState(flushManagerSnapshotInProgress) for _, ns := range namespaces { snapshotBlockStarts := m.namespaceSnapshotTimes(ns, tickStart) From 811c7e32abf7b54bffce4eca2fd7fd165d92260a Mon Sep 17 00:00:00 2001 From: Richard Artoul Date: Thu, 4 Oct 2018 19:34:44 -0400 Subject: [PATCH 07/26] Restructure comment --- src/dbnode/storage/flush.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/dbnode/storage/flush.go b/src/dbnode/storage/flush.go index 8795715873..e4a8c6bcf7 100644 --- a/src/dbnode/storage/flush.go +++ b/src/dbnode/storage/flush.go @@ -120,8 +120,9 @@ func (m *flushManager) Flush( // Perform two separate loops through all the namespaces so that we can emit better // gauges I.E all the flushing for all the namespaces happens at once and then all // the snapshotting for all the namespaces happens at once. This is also slightly - // better semantically because flushing should take priority over snapshotting. In - // addition, we need to make sure that for any given shard/blockStart combination, + // better semantically because flushing should take priority over snapshotting. + // + // In addition, we need to make sure that for any given shard/blockStart combination, // we attempt a flush befor a snapshot as the snapshotting process will attempt to // snapshot any unflushed blocks which would be wasteful if the block is already // flushable. From 5d7dd526d6e6d54c91642c40f387f8f933d7a507 Mon Sep 17 00:00:00 2001 From: Richard Artoul Date: Thu, 4 Oct 2018 22:37:51 -0400 Subject: [PATCH 08/26] fix integration test --- src/dbnode/integration/disk_flush_helpers.go | 11 +++++------ src/dbnode/integration/disk_snapshot_test.go | 7 +++++-- src/dbnode/storage/flush_test.go | 2 -- 3 files changed, 10 insertions(+), 10 deletions(-) diff --git a/src/dbnode/integration/disk_flush_helpers.go b/src/dbnode/integration/disk_flush_helpers.go index d550fe8198..b680a92b32 100644 --- a/src/dbnode/integration/disk_flush_helpers.go +++ b/src/dbnode/integration/disk_flush_helpers.go @@ -209,11 +209,10 @@ func verifySnapshottedDataFiles( reader, err := fs.NewReader(storageOpts.BytesPool(), fsOpts) require.NoError(t, err) iteratorPool := storageOpts.ReaderIteratorPool() - for _, ns := range testNamespaces { - for _, seriesList := range seriesMaps { - verifyForTime( - t, storageOpts, reader, shardSet, iteratorPool, snapshotTime, - ns, persist.FileSetSnapshotType, seriesList) - } + for _, seriesList := range seriesMaps { + verifyForTime( + t, storageOpts, reader, shardSet, iteratorPool, snapshotTime, + namespace, persist.FileSetSnapshotType, seriesList) } + } diff --git a/src/dbnode/integration/disk_snapshot_test.go b/src/dbnode/integration/disk_snapshot_test.go index 842554b5e4..1dda858e5a 100644 --- a/src/dbnode/integration/disk_snapshot_test.go +++ b/src/dbnode/integration/disk_snapshot_test.go @@ -86,7 +86,9 @@ func TestDiskSnapshotSimple(t *testing.T) { testSetup.setNowFn(now) maxWaitTime := time.Minute for _, ns := range testSetup.namespaces { + log.Debug("waiting for snapshot files to flush") require.NoError(t, waitUntilSnapshotFilesFlushed(filePathPrefix, testSetup.shardSet, ns.ID(), []time.Time{now.Truncate(blockSize)}, maxWaitTime)) + log.Debug("verifying snapshot files") verifySnapshottedDataFiles(t, 
testSetup.shardSet, testSetup.storageOpts, ns.ID(), now.Truncate(blockSize), seriesMaps) } @@ -94,13 +96,14 @@ func TestDiskSnapshotSimple(t *testing.T) { newTime := oldTime.Add(blockSize * 2) testSetup.setNowFn(newTime) + log.Debug("sleeping for 10x minimum tick interval") testSetup.sleepFor10xTickMinimumInterval() for _, ns := range testSetup.namespaces { - // Make sure new snapshotfiles are written out + log.Debug("waiting for new snapshot files to be written out") require.NoError(t, waitUntilSnapshotFilesFlushed(filePathPrefix, testSetup.shardSet, ns.ID(), []time.Time{newTime.Truncate(blockSize)}, maxWaitTime)) for _, shard := range testSetup.shardSet.All() { + log.Debug("waiting for old snapshot files to be deleted") waitUntil(func() bool { - // Make sure old snapshot files are deleted exists, err := fs.SnapshotFileSetExistsAt(filePathPrefix, ns.ID(), shard.ID(), oldTime.Truncate(blockSize)) require.NoError(t, err) return !exists diff --git a/src/dbnode/storage/flush_test.go b/src/dbnode/storage/flush_test.go index 8d4c641ba0..33aaa19483 100644 --- a/src/dbnode/storage/flush_test.go +++ b/src/dbnode/storage/flush_test.go @@ -22,7 +22,6 @@ package storage import ( "errors" - "fmt" "sort" "sync" "testing" @@ -370,7 +369,6 @@ func TestFlushManagerFlushSnapshot(t *testing.T) { num = numIntervals(start, snapshotEnd, blockSize) for i := 0; i < num; i++ { st := start.Add(time.Duration(i) * blockSize) - fmt.Println(st) ns.EXPECT().NeedsFlush(st, st).Return(true) ns.EXPECT().Snapshot(st, now, gomock.Any()) } From f60fb669eb3f8f9fa9f84ea101f5ffb4928c930b Mon Sep 17 00:00:00 2001 From: Richard Artoul Date: Fri, 5 Oct 2018 09:42:18 -0400 Subject: [PATCH 09/26] interim --- src/dbnode/storage/flush.go | 6 ++++++ src/dbnode/storage/namespace.go | 1 + src/dbnode/storage/shard.go | 2 ++ 3 files changed, 9 insertions(+) diff --git a/src/dbnode/storage/flush.go b/src/dbnode/storage/flush.go index e4a8c6bcf7..da7f3ed2fe 100644 --- a/src/dbnode/storage/flush.go +++ b/src/dbnode/storage/flush.go @@ -104,6 +104,7 @@ func (m *flushManager) Flush( multiErr := xerrors.NewMultiError() m.setState(flushManagerFlushInProgress) + fmt.Println("START") for _, ns := range namespaces { // Flush first because we will only snapshot if there are no outstanding flushes flushTimes := m.namespaceFlushTimes(ns, tickStart) @@ -116,6 +117,7 @@ func (m *flushManager) Flush( } multiErr = multiErr.Add(m.flushNamespaceWithTimes(ns, shardBootstrapTimes, flushTimes, flush)) } + fmt.Println("END") // Perform two separate loops through all the namespaces so that we can emit better // gauges I.E all the flushing for all the namespaces happens at once and then all @@ -126,10 +128,12 @@ func (m *flushManager) Flush( // we attempt a flush befor a snapshot as the snapshotting process will attempt to // snapshot any unflushed blocks which would be wasteful if the block is already // flushable. 
+ fmt.Println("START1") m.setState(flushManagerSnapshotInProgress) for _, ns := range namespaces { snapshotBlockStarts := m.namespaceSnapshotTimes(ns, tickStart) for _, snapshotBlockStart := range snapshotBlockStarts { + fmt.Println("snapshotting: ", snapshotBlockStart) if err := ns.Snapshot(snapshotBlockStart, tickStart, flush); err != nil { detailedErr := fmt.Errorf("namespace %s failed to snapshot data: %v", ns.ID().String(), err) @@ -137,6 +141,7 @@ func (m *flushManager) Flush( } } } + fmt.Println("END1") // mark data flush finished multiErr = multiErr.Add(flush.DoneData()) @@ -243,6 +248,7 @@ func (m *flushManager) flushNamespaceWithTimes( ) error { multiErr := xerrors.NewMultiError() for _, t := range times { + fmt.Println("flushing: ", t) // NB(xichen): we still want to proceed if a namespace fails to flush its data. // Probably want to emit a counter here, but for now just log it. if err := ns.Flush(t, ShardBootstrapStates, flush); err != nil { diff --git a/src/dbnode/storage/namespace.go b/src/dbnode/storage/namespace.go index 2e165989dc..466ad23930 100644 --- a/src/dbnode/storage/namespace.go +++ b/src/dbnode/storage/namespace.go @@ -825,6 +825,7 @@ func (n *dbNamespace) Flush( // used to determine if all of the bootstrapped blocks have been merged / drained (ticked) // and are ready to be flushed. shardBootstrapStateBeforeTick, ok := shardBootstrapStatesAtTickStart[shard.ID()] + // TODO: Apply this same logic to snapshots? if !ok || shardBootstrapStateBeforeTick != Bootstrapped { // We don't own this shard anymore (!ok) or the shard was not bootstrapped // before the previous tick which means that we have no guarantee that all diff --git a/src/dbnode/storage/shard.go b/src/dbnode/storage/shard.go index 67a1a942b1..e2ac4aaba0 100644 --- a/src/dbnode/storage/shard.go +++ b/src/dbnode/storage/shard.go @@ -1922,8 +1922,10 @@ func (s *dbShard) FlushState(blockStart time.Time) fileOpState { func (s *dbShard) markFlushStateSuccessOrError(blockStart time.Time, err error) error { // Track flush state for block state if err == nil { + fmt.Println("mark success") s.markFlushStateSuccess(blockStart) } else { + fmt.Println("mark error") s.markFlushStateFail(blockStart) } return err From 30edd70f0334255f06a14b44c733f51f865280e9 Mon Sep 17 00:00:00 2001 From: Richard Artoul Date: Fri, 5 Oct 2018 11:28:38 -0400 Subject: [PATCH 10/26] Make snapshot wait for tick --- src/dbnode/storage/flush.go | 18 ++++++++++++++++-- src/dbnode/storage/namespace.go | 15 +++++++++++++-- src/dbnode/storage/types.go | 7 ++++++- 3 files changed, 35 insertions(+), 5 deletions(-) diff --git a/src/dbnode/storage/flush.go b/src/dbnode/storage/flush.go index da7f3ed2fe..a91965a59b 100644 --- a/src/dbnode/storage/flush.go +++ b/src/dbnode/storage/flush.go @@ -131,10 +131,24 @@ func (m *flushManager) Flush( fmt.Println("START1") m.setState(flushManagerSnapshotInProgress) for _, ns := range namespaces { - snapshotBlockStarts := m.namespaceSnapshotTimes(ns, tickStart) + var ( + snapshotBlockStarts = m.namespaceSnapshotTimes(ns, tickStart) + shardBootstrapTimes, ok = dbBootstrapStateAtTickStart.NamespaceBootstrapStates[ns.ID().String()] + ) + + if !ok { + // Could happen if namespaces are added / removed. 
+ multiErr = multiErr.Add(fmt.Errorf( + "tried to flush ns: %s, but did not have shard bootstrap times", ns.ID().String())) + continue + } + for _, snapshotBlockStart := range snapshotBlockStarts { fmt.Println("snapshotting: ", snapshotBlockStart) - if err := ns.Snapshot(snapshotBlockStart, tickStart, flush); err != nil { + err := ns.Snapshot( + snapshotBlockStart, tickStart, shardBootstrapTimes, flush) + + if err != nil { detailedErr := fmt.Errorf("namespace %s failed to snapshot data: %v", ns.ID().String(), err) multiErr = multiErr.Add(detailedErr) diff --git a/src/dbnode/storage/namespace.go b/src/dbnode/storage/namespace.go index 466ad23930..772b1d4998 100644 --- a/src/dbnode/storage/namespace.go +++ b/src/dbnode/storage/namespace.go @@ -825,7 +825,6 @@ func (n *dbNamespace) Flush( // used to determine if all of the bootstrapped blocks have been merged / drained (ticked) // and are ready to be flushed. shardBootstrapStateBeforeTick, ok := shardBootstrapStatesAtTickStart[shard.ID()] - // TODO: Apply this same logic to snapshots? if !ok || shardBootstrapStateBeforeTick != Bootstrapped { // We don't own this shard anymore (!ok) or the shard was not bootstrapped // before the previous tick which means that we have no guarantee that all @@ -874,7 +873,11 @@ func (n *dbNamespace) FlushIndex( return err } -func (n *dbNamespace) Snapshot(blockStart, snapshotTime time.Time, flush persist.DataFlush) error { +func (n *dbNamespace) Snapshot( + blockStart, + snapshotTime time.Time, + shardBootstrapStatesAtTickStart ShardBootstrapStates, + flush persist.DataFlush) error { // NB(rartoul): This value can be used for emitting metrics, but should not be used // for business logic. callStart := n.nowFn() @@ -910,6 +913,14 @@ func (n *dbNamespace) Snapshot(blockStart, snapshotTime time.Time, flush persist continue } + // We don't need to perform this check for correctness, but we apply the same logic + // here as we do in the Flush() method so that we don't end up snapshotting a bunch + // of shards/blocks that would have been flushed after the next tick. + shardBootstrapStateBeforeTick, ok := shardBootstrapStatesAtTickStart[shard.ID()] + if !ok || shardBootstrapStateBeforeTick != Bootstrapped { + continue + } + err := shard.Snapshot(blockStart, snapshotTime, flush) if err != nil { detailedErr := fmt.Errorf("shard %d failed to snapshot: %v", shard.ID(), err) diff --git a/src/dbnode/storage/types.go b/src/dbnode/storage/types.go index df41cefcf0..9efceccf21 100644 --- a/src/dbnode/storage/types.go +++ b/src/dbnode/storage/types.go @@ -282,7 +282,12 @@ type databaseNamespace interface { ) error // Snapshot snapshots unflushed in-memory data - Snapshot(blockStart, snapshotTime time.Time, flush persist.DataFlush) error + Snapshot( + blockStart, + snapshotTime time.Time, + shardBootstrapStatesAtTickStart ShardBootstrapStates, + flush persist.DataFlush, + ) error // NeedsFlush returns true if the namespace needs a flush for the // period: [start, end] (both inclusive). 
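
As an aside on the two pieces this patch ties together: namespaceSnapshotTimes (patch 01) computes candidate block starts from the first block in the retention period up to the block that curr.Add(bufferFuture) lands in, and the snapshot loop above then visits each candidate that still needs a flush. Below is a minimal, standalone Go sketch of that window computation, for illustration only and not the m3dbnode code: retention.FlushTimeStart is approximated here as (now - retention period) truncated to the block size, the ns.NeedsFlush filter is reduced to a comment, and the block size, bufferFuture, and retention values are invented for the example.

package main

import (
	"fmt"
	"time"
)

// timesInRange mirrors the ascending iteration util.go uses after patch 01:
// startInclusive through endInclusive, stepping one window at a time.
func timesInRange(startInclusive, endInclusive time.Time, windowSize time.Duration) []time.Time {
	var times []time.Time
	for t := startInclusive; !t.After(endInclusive); t = t.Add(windowSize) {
		times = append(times, t)
	}
	return times
}

func main() {
	var (
		blockSize       = 2 * time.Hour
		bufferFuture    = 10 * time.Minute
		retentionPeriod = 8 * time.Hour
		now             = time.Date(2018, 10, 5, 13, 55, 0, 0, time.UTC)
	)

	// Approximates retention.FlushTimeStart: the oldest block still in retention.
	earliest := now.Add(-retentionPeriod).Truncate(blockSize)

	// Latest snapshottable block: the current block, or the next one once
	// now+bufferFuture crosses into it (13:55 + 10m = 14:05, i.e. the 14:00 block).
	latest := now.Add(bufferFuture).Truncate(blockSize)

	for _, t := range timesInRange(earliest, latest, blockSize) {
		// The real code keeps only candidates for which ns.NeedsFlush(t, t)
		// returns true, and patch 10 additionally skips shards that were not
		// bootstrapped before the tick started.
		fmt.Println("candidate snapshot block start:", t)
	}
}

Run as-is this prints the six block starts from 04:00 through 14:00 UTC; the 14:00 entry is exactly the bufferFuture case described in the namespaceSnapshotTimes comment.
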
From d004f4460473b253be1d95ff2a2ec943976e5dbb Mon Sep 17 00:00:00 2001 From: Richard Artoul Date: Fri, 5 Oct 2018 11:31:54 -0400 Subject: [PATCH 11/26] Add debug log --- src/dbnode/storage/namespace.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/src/dbnode/storage/namespace.go b/src/dbnode/storage/namespace.go index 772b1d4998..46dee61643 100644 --- a/src/dbnode/storage/namespace.go +++ b/src/dbnode/storage/namespace.go @@ -918,6 +918,11 @@ func (n *dbNamespace) Snapshot( // of shards/blocks that would have been flushed after the next tick. shardBootstrapStateBeforeTick, ok := shardBootstrapStatesAtTickStart[shard.ID()] if !ok || shardBootstrapStateBeforeTick != Bootstrapped { + n.log. + WithFields(xlog.NewField("shard", shard.ID())). + WithFields(xlog.NewField("bootstrapStateBeforeTick", shardBootstrapStateBeforeTick)). + WithFields(xlog.NewField("bootstrapStateExists", ok)). + Debug("skipping snapshot") continue } From b5e266ea335fa76cf49af22c0ede60546e8557bb Mon Sep 17 00:00:00 2001 From: Richard Artoul Date: Fri, 5 Oct 2018 11:33:19 -0400 Subject: [PATCH 12/26] more debug logs --- src/dbnode/storage/namespace.go | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/src/dbnode/storage/namespace.go b/src/dbnode/storage/namespace.go index 46dee61643..d6d62817b9 100644 --- a/src/dbnode/storage/namespace.go +++ b/src/dbnode/storage/namespace.go @@ -830,6 +830,11 @@ func (n *dbNamespace) Flush( // before the previous tick which means that we have no guarantee that all // bootstrapped blocks have been rotated out of the series buffer buckets, // so we wait until the next opportunity. + n.log. + WithFields(xlog.NewField("shard", shard.ID())). + WithFields(xlog.NewField("bootstrapStateBeforeTick", shardBootstrapStateBeforeTick)). + WithFields(xlog.NewField("bootstrapStateExists", ok)). + Debug("skipping snapshot due to shard bootstrap state before tick") continue } @@ -922,7 +927,7 @@ func (n *dbNamespace) Snapshot( WithFields(xlog.NewField("shard", shard.ID())). WithFields(xlog.NewField("bootstrapStateBeforeTick", shardBootstrapStateBeforeTick)). WithFields(xlog.NewField("bootstrapStateExists", ok)). 
- Debug("skipping snapshot") + Debug("skipping snapshot due to shard bootstrap state before tick") continue } From 80ba4145ab345a27a647ae3ad56c09002d9d7ff1 Mon Sep 17 00:00:00 2001 From: Richard Artoul Date: Fri, 5 Oct 2018 11:45:52 -0400 Subject: [PATCH 13/26] Refactor namespace tests to support shardBootstrapBeforeTick --- src/dbnode/storage/flush_test.go | 6 +- src/dbnode/storage/namespace_test.go | 83 ++++++++++++++++++++-------- 2 files changed, 64 insertions(+), 25 deletions(-) diff --git a/src/dbnode/storage/flush_test.go b/src/dbnode/storage/flush_test.go index 33aaa19483..1b3df42cdb 100644 --- a/src/dbnode/storage/flush_test.go +++ b/src/dbnode/storage/flush_test.go @@ -172,7 +172,7 @@ func TestFlushManagerSkipNamespaceIndexingDisabled(t *testing.T) { ns.EXPECT().ID().Return(defaultTestNs1ID).AnyTimes() ns.EXPECT().NeedsFlush(gomock.Any(), gomock.Any()).Return(true).AnyTimes() ns.EXPECT().Flush(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).AnyTimes() - ns.EXPECT().Snapshot(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).AnyTimes() + ns.EXPECT().Snapshot(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).AnyTimes() mockFlusher := persist.NewMockDataFlush(ctrl) mockFlusher.EXPECT().DoneData().Return(nil) @@ -210,7 +210,7 @@ func TestFlushManagerNamespaceIndexingEnabled(t *testing.T) { ns.EXPECT().ID().Return(defaultTestNs1ID).AnyTimes() ns.EXPECT().NeedsFlush(gomock.Any(), gomock.Any()).Return(true).AnyTimes() ns.EXPECT().Flush(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).AnyTimes() - ns.EXPECT().Snapshot(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).AnyTimes() + ns.EXPECT().Snapshot(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).AnyTimes() ns.EXPECT().FlushIndex(gomock.Any()).Return(nil) mockFlusher := persist.NewMockDataFlush(ctrl) @@ -370,7 +370,7 @@ func TestFlushManagerFlushSnapshot(t *testing.T) { for i := 0; i < num; i++ { st := start.Add(time.Duration(i) * blockSize) ns.EXPECT().NeedsFlush(st, st).Return(true) - ns.EXPECT().Snapshot(st, now, gomock.Any()) + ns.EXPECT().Snapshot(st, now, gomock.Any(), gomock.Any()) } } diff --git a/src/dbnode/storage/namespace_test.go b/src/dbnode/storage/namespace_test.go index 4395768797..33708520e9 100644 --- a/src/dbnode/storage/namespace_test.go +++ b/src/dbnode/storage/namespace_test.go @@ -405,20 +405,21 @@ func TestNamespaceFlushSkipShardNotBootstrappedBeforeTick(t *testing.T) { blockStart := time.Now().Truncate(ns.Options().RetentionOptions().BlockSize()) shard := NewMockdatabaseShard(ctrl) - shard.EXPECT().ID().Return(testShardIDs[0].ID()) + shard.EXPECT().ID().Return(testShardIDs[0].ID()).AnyTimes() ns.shards[testShardIDs[0].ID()] = shard - ShardBootstrapStates := ShardBootstrapStates{} - ShardBootstrapStates[testShardIDs[0].ID()] = Bootstrapping + shardBootstrapStates := ShardBootstrapStates{} + shardBootstrapStates[testShardIDs[0].ID()] = Bootstrapping - require.NoError(t, ns.Flush(blockStart, ShardBootstrapStates, nil)) + require.NoError(t, ns.Flush(blockStart, shardBootstrapStates, nil)) } type snapshotTestCase struct { - isSnapshotting bool - expectSnapshot bool - lastSnapshotTime func(blockStart time.Time, blockSize time.Duration) time.Time - snapshotErr error + isSnapshotting bool + expectSnapshot bool + shardBootstrapStateBeforeTick BootstrapState + lastSnapshotTime func(blockStart time.Time, blockSize time.Duration) time.Time + snapshotErr error } func TestNamespaceSnapshotNotBootstrapped(t *testing.T) { @@ -435,22 +436,24 @@ func 
TestNamespaceSnapshotNotBootstrapped(t *testing.T) { blockSize := ns.Options().RetentionOptions().BlockSize() blockStart := time.Now().Truncate(blockSize) - require.Equal(t, errNamespaceNotBootstrapped, ns.Snapshot(blockStart, blockStart, nil)) + require.Equal(t, errNamespaceNotBootstrapped, ns.Snapshot(blockStart, blockStart, nil, nil)) } func TestNamespaceSnapshotNotEnoughTimeSinceLastSnapshot(t *testing.T) { shardMethodResults := []snapshotTestCase{ snapshotTestCase{ - isSnapshotting: false, - expectSnapshot: false, + isSnapshotting: false, + expectSnapshot: false, + shardBootstrapStateBeforeTick: Bootstrapped, lastSnapshotTime: func(curr time.Time, blockSize time.Duration) time.Time { return curr }, snapshotErr: nil, }, snapshotTestCase{ - isSnapshotting: false, - expectSnapshot: true, + isSnapshotting: false, + expectSnapshot: true, + shardBootstrapStateBeforeTick: Bootstrapped, lastSnapshotTime: func(curr time.Time, blockSize time.Duration) time.Time { return curr.Add(-2 * defaultMinSnapshotInterval) }, @@ -462,24 +465,54 @@ func TestNamespaceSnapshotNotEnoughTimeSinceLastSnapshot(t *testing.T) { func TestNamespaceSnapshotShardIsSnapshotting(t *testing.T) { shardMethodResults := []snapshotTestCase{ - snapshotTestCase{isSnapshotting: false, snapshotErr: nil, expectSnapshot: true}, - snapshotTestCase{isSnapshotting: true, snapshotErr: nil, expectSnapshot: false}, + snapshotTestCase{ + isSnapshotting: false, + expectSnapshot: true, + shardBootstrapStateBeforeTick: Bootstrapped, + snapshotErr: nil, + }, + snapshotTestCase{ + isSnapshotting: true, + expectSnapshot: false, + shardBootstrapStateBeforeTick: Bootstrapped, + snapshotErr: nil, + }, } require.NoError(t, testSnapshotWithShardSnapshotErrs(t, shardMethodResults)) } func TestNamespaceSnapshotAllShardsSuccess(t *testing.T) { shardMethodResults := []snapshotTestCase{ - snapshotTestCase{isSnapshotting: false, snapshotErr: nil, expectSnapshot: true}, - snapshotTestCase{isSnapshotting: false, snapshotErr: nil, expectSnapshot: true}, + snapshotTestCase{ + isSnapshotting: false, + expectSnapshot: true, + shardBootstrapStateBeforeTick: Bootstrapped, + snapshotErr: nil, + }, + snapshotTestCase{ + isSnapshotting: false, + expectSnapshot: true, + shardBootstrapStateBeforeTick: Bootstrapped, + snapshotErr: nil, + }, } require.NoError(t, testSnapshotWithShardSnapshotErrs(t, shardMethodResults)) } func TestNamespaceSnapshotShardError(t *testing.T) { shardMethodResults := []snapshotTestCase{ - snapshotTestCase{isSnapshotting: false, snapshotErr: nil, expectSnapshot: true}, - snapshotTestCase{isSnapshotting: false, snapshotErr: errors.New("err"), expectSnapshot: true}, + snapshotTestCase{ + isSnapshotting: false, + expectSnapshot: true, + shardBootstrapStateBeforeTick: Bootstrapped, + snapshotErr: nil, + }, + snapshotTestCase{ + isSnapshotting: false, + expectSnapshot: true, + shardBootstrapStateBeforeTick: Bootstrapped, + snapshotErr: errors.New("err"), + }, } require.Error(t, testSnapshotWithShardSnapshotErrs(t, shardMethodResults)) } @@ -499,8 +532,12 @@ func testSnapshotWithShardSnapshotErrs(t *testing.T, shardMethodResults []snapsh ns.nowFn = func() time.Time { return now } - blockSize := ns.Options().RetentionOptions().BlockSize() - blockStart := now.Truncate(blockSize) + + var ( + shardBootstrapStates = ShardBootstrapStates{} + blockSize = ns.Options().RetentionOptions().BlockSize() + blockStart = now.Truncate(blockSize) + ) for i, tc := range shardMethodResults { shard := NewMockdatabaseShard(ctrl) @@ -511,14 +548,16 @@ func 
testSnapshotWithShardSnapshotErrs(t *testing.T, shardMethodResults []snapsh lastSnapshotTime = tc.lastSnapshotTime(now, blockSize) } shard.EXPECT().SnapshotState().Return(tc.isSnapshotting, lastSnapshotTime) + shardID := uint32(i) shard.EXPECT().ID().Return(uint32(i)).AnyTimes() if tc.expectSnapshot { shard.EXPECT().Snapshot(blockStart, now, nil).Return(tc.snapshotErr) } ns.shards[testShardIDs[i].ID()] = shard + shardBootstrapStates[shardID] = tc.shardBootstrapStateBeforeTick } - return ns.Snapshot(blockStart, now, nil) + return ns.Snapshot(blockStart, now, shardBootstrapStates, nil) } func TestNamespaceTruncate(t *testing.T) { From 9ad6cef4031f7c8146657a63d48c3f5859042f8b Mon Sep 17 00:00:00 2001 From: Richard Artoul Date: Fri, 5 Oct 2018 11:49:13 -0400 Subject: [PATCH 14/26] git status --- src/dbnode/storage/namespace_test.go | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/src/dbnode/storage/namespace_test.go b/src/dbnode/storage/namespace_test.go index 33708520e9..76b45a863a 100644 --- a/src/dbnode/storage/namespace_test.go +++ b/src/dbnode/storage/namespace_test.go @@ -517,6 +517,18 @@ func TestNamespaceSnapshotShardError(t *testing.T) { require.Error(t, testSnapshotWithShardSnapshotErrs(t, shardMethodResults)) } +func TestNamespaceSnapshotShardNotBootstrappedBeforeTick(t *testing.T) { + shardMethodResults := []snapshotTestCase{ + snapshotTestCase{ + isSnapshotting: false, + expectSnapshot: false, + shardBootstrapStateBeforeTick: Bootstrapping, + snapshotErr: nil, + }, + } + require.NoError(t, testSnapshotWithShardSnapshotErrs(t, shardMethodResults)) +} + func testSnapshotWithShardSnapshotErrs(t *testing.T, shardMethodResults []snapshotTestCase) error { ctrl := gomock.NewController(t) defer ctrl.Finish() From 45df380ca26ba1152eb1bd3a1a9292ebe13574c9 Mon Sep 17 00:00:00 2001 From: Richard Artoul Date: Fri, 5 Oct 2018 11:49:19 -0400 Subject: [PATCH 15/26] Rename struct field --- src/dbnode/storage/namespace_test.go | 22 +++++++++++----------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/src/dbnode/storage/namespace_test.go b/src/dbnode/storage/namespace_test.go index 76b45a863a..bb850308e2 100644 --- a/src/dbnode/storage/namespace_test.go +++ b/src/dbnode/storage/namespace_test.go @@ -419,7 +419,7 @@ type snapshotTestCase struct { expectSnapshot bool shardBootstrapStateBeforeTick BootstrapState lastSnapshotTime func(blockStart time.Time, blockSize time.Duration) time.Time - snapshotErr error + shardSnapshotErr error } func TestNamespaceSnapshotNotBootstrapped(t *testing.T) { @@ -448,7 +448,7 @@ func TestNamespaceSnapshotNotEnoughTimeSinceLastSnapshot(t *testing.T) { lastSnapshotTime: func(curr time.Time, blockSize time.Duration) time.Time { return curr }, - snapshotErr: nil, + shardSnapshotErr: nil, }, snapshotTestCase{ isSnapshotting: false, @@ -457,7 +457,7 @@ func TestNamespaceSnapshotNotEnoughTimeSinceLastSnapshot(t *testing.T) { lastSnapshotTime: func(curr time.Time, blockSize time.Duration) time.Time { return curr.Add(-2 * defaultMinSnapshotInterval) }, - snapshotErr: nil, + shardSnapshotErr: nil, }, } require.NoError(t, testSnapshotWithShardSnapshotErrs(t, shardMethodResults)) @@ -469,13 +469,13 @@ func TestNamespaceSnapshotShardIsSnapshotting(t *testing.T) { isSnapshotting: false, expectSnapshot: true, shardBootstrapStateBeforeTick: Bootstrapped, - snapshotErr: nil, + shardSnapshotErr: nil, }, snapshotTestCase{ isSnapshotting: true, expectSnapshot: false, shardBootstrapStateBeforeTick: Bootstrapped, - snapshotErr: nil, + shardSnapshotErr: nil, }, 
} require.NoError(t, testSnapshotWithShardSnapshotErrs(t, shardMethodResults)) @@ -487,13 +487,13 @@ func TestNamespaceSnapshotAllShardsSuccess(t *testing.T) { isSnapshotting: false, expectSnapshot: true, shardBootstrapStateBeforeTick: Bootstrapped, - snapshotErr: nil, + shardSnapshotErr: nil, }, snapshotTestCase{ isSnapshotting: false, expectSnapshot: true, shardBootstrapStateBeforeTick: Bootstrapped, - snapshotErr: nil, + shardSnapshotErr: nil, }, } require.NoError(t, testSnapshotWithShardSnapshotErrs(t, shardMethodResults)) @@ -505,13 +505,13 @@ func TestNamespaceSnapshotShardError(t *testing.T) { isSnapshotting: false, expectSnapshot: true, shardBootstrapStateBeforeTick: Bootstrapped, - snapshotErr: nil, + shardSnapshotErr: nil, }, snapshotTestCase{ isSnapshotting: false, expectSnapshot: true, shardBootstrapStateBeforeTick: Bootstrapped, - snapshotErr: errors.New("err"), + shardSnapshotErr: errors.New("err"), }, } require.Error(t, testSnapshotWithShardSnapshotErrs(t, shardMethodResults)) @@ -523,7 +523,7 @@ func TestNamespaceSnapshotShardNotBootstrappedBeforeTick(t *testing.T) { isSnapshotting: false, expectSnapshot: false, shardBootstrapStateBeforeTick: Bootstrapping, - snapshotErr: nil, + shardSnapshotErr: nil, }, } require.NoError(t, testSnapshotWithShardSnapshotErrs(t, shardMethodResults)) @@ -563,7 +563,7 @@ func testSnapshotWithShardSnapshotErrs(t *testing.T, shardMethodResults []snapsh shardID := uint32(i) shard.EXPECT().ID().Return(uint32(i)).AnyTimes() if tc.expectSnapshot { - shard.EXPECT().Snapshot(blockStart, now, nil).Return(tc.snapshotErr) + shard.EXPECT().Snapshot(blockStart, now, nil).Return(tc.shardSnapshotErr) } ns.shards[testShardIDs[i].ID()] = shard shardBootstrapStates[shardID] = tc.shardBootstrapStateBeforeTick From 2c8648c31b4285e797b8b63628df2d35016fdfca Mon Sep 17 00:00:00 2001 From: Richard Artoul Date: Fri, 5 Oct 2018 11:53:03 -0400 Subject: [PATCH 16/26] remove prints --- src/dbnode/storage/flush.go | 6 ------ src/dbnode/storage/shard.go | 2 -- 2 files changed, 8 deletions(-) diff --git a/src/dbnode/storage/flush.go b/src/dbnode/storage/flush.go index a91965a59b..6472db4db8 100644 --- a/src/dbnode/storage/flush.go +++ b/src/dbnode/storage/flush.go @@ -104,7 +104,6 @@ func (m *flushManager) Flush( multiErr := xerrors.NewMultiError() m.setState(flushManagerFlushInProgress) - fmt.Println("START") for _, ns := range namespaces { // Flush first because we will only snapshot if there are no outstanding flushes flushTimes := m.namespaceFlushTimes(ns, tickStart) @@ -117,7 +116,6 @@ func (m *flushManager) Flush( } multiErr = multiErr.Add(m.flushNamespaceWithTimes(ns, shardBootstrapTimes, flushTimes, flush)) } - fmt.Println("END") // Perform two separate loops through all the namespaces so that we can emit better // gauges I.E all the flushing for all the namespaces happens at once and then all @@ -128,7 +126,6 @@ func (m *flushManager) Flush( // we attempt a flush befor a snapshot as the snapshotting process will attempt to // snapshot any unflushed blocks which would be wasteful if the block is already // flushable. 
- fmt.Println("START1") m.setState(flushManagerSnapshotInProgress) for _, ns := range namespaces { var ( @@ -144,7 +141,6 @@ func (m *flushManager) Flush( } for _, snapshotBlockStart := range snapshotBlockStarts { - fmt.Println("snapshotting: ", snapshotBlockStart) err := ns.Snapshot( snapshotBlockStart, tickStart, shardBootstrapTimes, flush) @@ -155,7 +151,6 @@ func (m *flushManager) Flush( } } } - fmt.Println("END1") // mark data flush finished multiErr = multiErr.Add(flush.DoneData()) @@ -262,7 +257,6 @@ func (m *flushManager) flushNamespaceWithTimes( ) error { multiErr := xerrors.NewMultiError() for _, t := range times { - fmt.Println("flushing: ", t) // NB(xichen): we still want to proceed if a namespace fails to flush its data. // Probably want to emit a counter here, but for now just log it. if err := ns.Flush(t, ShardBootstrapStates, flush); err != nil { diff --git a/src/dbnode/storage/shard.go b/src/dbnode/storage/shard.go index e2ac4aaba0..67a1a942b1 100644 --- a/src/dbnode/storage/shard.go +++ b/src/dbnode/storage/shard.go @@ -1922,10 +1922,8 @@ func (s *dbShard) FlushState(blockStart time.Time) fileOpState { func (s *dbShard) markFlushStateSuccessOrError(blockStart time.Time, err error) error { // Track flush state for block state if err == nil { - fmt.Println("mark success") s.markFlushStateSuccess(blockStart) } else { - fmt.Println("mark error") s.markFlushStateFail(blockStart) } return err From 9c42b2ffc8b20bc7fdcd6191dc70e8f62fd3a608 Mon Sep 17 00:00:00 2001 From: Richard Artoul Date: Fri, 5 Oct 2018 19:00:09 -0400 Subject: [PATCH 17/26] rewrite snapshot integration tests --- src/dbnode/integration/disk_flush_helpers.go | 52 ++++++-- src/dbnode/integration/disk_snapshot_test.go | 111 ++++++++++++++---- ...napshot_mixed_mode_read_write_prop_test.go | 4 +- 3 files changed, 134 insertions(+), 33 deletions(-) diff --git a/src/dbnode/integration/disk_flush_helpers.go b/src/dbnode/integration/disk_flush_helpers.go index b680a92b32..def6fce34e 100644 --- a/src/dbnode/integration/disk_flush_helpers.go +++ b/src/dbnode/integration/disk_flush_helpers.go @@ -46,23 +46,59 @@ var ( errDiskFlushTimedOut = errors.New("flushing data to disk took too long") ) +type snapshotID struct { + blockStart time.Time + minVolume int +} + +func getLatestSnapshotVolumeIndex( + filePathPrefix string, + shardSet sharding.ShardSet, + namespace ident.ID, + blockStart time.Time, +) int { + latestVolumeIndex := -1 + + for _, shard := range shardSet.AllIDs() { + snapshotFiles, err := fs.SnapshotFiles( + filePathPrefix, namespace, shard) + if err != nil { + panic(err) + } + latestSnapshot, ok := snapshotFiles.LatestVolumeForBlock(blockStart) + if !ok { + continue + } + if latestSnapshot.ID.VolumeIndex > latestVolumeIndex { + latestVolumeIndex = latestSnapshot.ID.VolumeIndex + } + } + + return latestVolumeIndex +} + func waitUntilSnapshotFilesFlushed( filePathPrefix string, shardSet sharding.ShardSet, namespace ident.ID, - expectedSnapshotTimes []time.Time, + expectedSnapshots []snapshotID, timeout time.Duration, ) error { dataFlushed := func() bool { for _, shard := range shardSet.AllIDs() { - for _, t := range expectedSnapshotTimes { - exists, err := fs.SnapshotFileSetExistsAt( - filePathPrefix, namespace, shard, t) + for _, e := range expectedSnapshots { + snapshotFiles, err := fs.SnapshotFiles( + filePathPrefix, namespace, shard) if err != nil { panic(err) } - if !exists { + latest, ok := snapshotFiles.LatestVolumeForBlock(e.blockStart) + if !ok { + return false + } + + if !(latest.ID.VolumeIndex >= 
e.minVolume) { return false } } @@ -202,16 +238,16 @@ func verifySnapshottedDataFiles( shardSet sharding.ShardSet, storageOpts storage.Options, namespace ident.ID, - snapshotTime time.Time, seriesMaps map[xtime.UnixNano]generate.SeriesBlock, ) { fsOpts := storageOpts.CommitLogOptions().FilesystemOptions() reader, err := fs.NewReader(storageOpts.BytesPool(), fsOpts) require.NoError(t, err) iteratorPool := storageOpts.ReaderIteratorPool() - for _, seriesList := range seriesMaps { + for blockStart, seriesList := range seriesMaps { + verifyForTime( - t, storageOpts, reader, shardSet, iteratorPool, snapshotTime, + t, storageOpts, reader, shardSet, iteratorPool, blockStart.ToTime(), namespace, persist.FileSetSnapshotType, seriesList) } diff --git a/src/dbnode/integration/disk_snapshot_test.go b/src/dbnode/integration/disk_snapshot_test.go index 1dda858e5a..9abfea2be2 100644 --- a/src/dbnode/integration/disk_snapshot_test.go +++ b/src/dbnode/integration/disk_snapshot_test.go @@ -39,12 +39,26 @@ func TestDiskSnapshotSimple(t *testing.T) { t.SkipNow() // Just skip if we're doing a short run } // Test setup - nOpts := namespace.NewOptions().SetSnapshotEnabled(true) + var ( + nOpts = namespace.NewOptions(). + SetSnapshotEnabled(true) + bufferPast = 50 * time.Minute + bufferFuture = 50 * time.Minute + blockSize = time.Hour + ) + + nOpts = nOpts. + SetRetentionOptions(nOpts.RetentionOptions(). + SetBufferFuture(bufferFuture). + SetBufferPast(bufferPast). + SetBlockSize(blockSize)) md1, err := namespace.NewMetadata(testNamespaces[0], nOpts) require.NoError(t, err) md2, err := namespace.NewMetadata(testNamespaces[1], nOpts) require.NoError(t, err) + // md1.SetOptions() + testOpts := newTestOptions(t). SetTickMinimumInterval(time.Second). SetNamespaces([]namespace.Metadata{md1, md2}) @@ -52,8 +66,7 @@ func TestDiskSnapshotSimple(t *testing.T) { require.NoError(t, err) defer testSetup.close() - blockSize := md1.Options().RetentionOptions().BlockSize() - filePathPrefix := testSetup.storageOpts.CommitLogOptions().FilesystemOptions().FilePathPrefix() + shardSet := testSetup.shardSet // Start the server log := testSetup.storageOpts.InstrumentOptions().Logger() @@ -68,41 +81,91 @@ func TestDiskSnapshotSimple(t *testing.T) { }() // Write test data - now := testSetup.getNowFn() - seriesMaps := make(map[xtime.UnixNano]generate.SeriesBlock) - inputData := []generate.BlockConfig{ - {IDs: []string{"foo", "bar", "baz"}, NumPoints: 100, Start: now}, - } + var ( + currBlock = testSetup.getNowFn().Truncate(blockSize) + now = currBlock.Add(11 * time.Minute) + ) + // Make sure now is within bufferPast of the previous block + require.True(t, now.Before(now.Truncate(blockSize).Add(bufferPast))) + // Make sure now is within bufferFuture of the next block + require.True(t, now.After(now.Truncate(blockSize).Add(blockSize).Add(-bufferFuture))) + testSetup.setNowFn(now) + + var ( + seriesMaps = make(map[xtime.UnixNano]generate.SeriesBlock) + inputData = []generate.BlockConfig{ + // Writes in the previous block which should be mutable due to bufferPast + {IDs: []string{"foo", "bar", "baz"}, NumPoints: 5, Start: currBlock.Add(-10 * time.Minute)}, + // Writes in the current block + {IDs: []string{"a", "b", "c"}, NumPoints: 30, Start: currBlock}, + // Writes in the next block which should be mutable due to bufferFuture + {IDs: []string{"1", "2", "3"}, NumPoints: 30, Start: currBlock.Add(blockSize)}, + } + ) for _, input := range inputData { - testSetup.setNowFn(input.Start) + // testSetup.setNowFn(input.Start) testData := 
generate.Block(input) - seriesMaps[xtime.ToUnixNano(input.Start)] = testData + seriesMaps[xtime.ToUnixNano(input.Start.Truncate(blockSize))] = testData for _, ns := range testSetup.namespaces { require.NoError(t, testSetup.writeBatch(ns.ID(), testData)) } } - now = testSetup.getNowFn().Add(blockSize).Add(-10 * time.Minute) + now = testSetup.getNowFn().Add(2 * time.Minute) + // TODO: Make this a function that can be repeated + // Make sure now is within bufferPast of the previous block + require.True(t, now.Before(now.Truncate(blockSize).Add(bufferPast))) + // Make sure now is within bufferFuture of the next block + require.True(t, now.After(now.Truncate(blockSize).Add(blockSize).Add(-bufferFuture))) testSetup.setNowFn(now) - maxWaitTime := time.Minute + + var ( + maxWaitTime = time.Minute + filePathPrefix = testSetup.storageOpts. + CommitLogOptions(). + FilesystemOptions(). + FilePathPrefix() + ) + for _, ns := range testSetup.namespaces { - log.Debug("waiting for snapshot files to flush") - require.NoError(t, waitUntilSnapshotFilesFlushed(filePathPrefix, testSetup.shardSet, ns.ID(), []time.Time{now.Truncate(blockSize)}, maxWaitTime)) - log.Debug("verifying snapshot files") - verifySnapshottedDataFiles(t, testSetup.shardSet, testSetup.storageOpts, ns.ID(), now.Truncate(blockSize), seriesMaps) + snapshotsToWaitFor := []snapshotID{ + { + blockStart: currBlock.Add(-blockSize), + minVolume: getLatestSnapshotVolumeIndex( + filePathPrefix, shardSet, ns.ID(), currBlock.Add(-blockSize)), + }, + { + blockStart: currBlock, + minVolume: getLatestSnapshotVolumeIndex( + filePathPrefix, shardSet, ns.ID(), currBlock), + }, + { + blockStart: currBlock.Add(blockSize), + minVolume: getLatestSnapshotVolumeIndex( + filePathPrefix, shardSet, ns.ID(), currBlock.Add(blockSize)), + }, + } + + log.Info("waiting for snapshot files to flush") + require.NoError(t, waitUntilSnapshotFilesFlushed( + filePathPrefix, shardSet, ns.ID(), snapshotsToWaitFor, maxWaitTime)) + log.Info("verifying snapshot files") + verifySnapshottedDataFiles(t, shardSet, testSetup.storageOpts, ns.ID(), seriesMaps) } - oldTime := testSetup.getNowFn() - newTime := oldTime.Add(blockSize * 2) + var ( + oldTime = testSetup.getNowFn() + newTime = oldTime.Add(blockSize * 2) + ) testSetup.setNowFn(newTime) - log.Debug("sleeping for 10x minimum tick interval") - testSetup.sleepFor10xTickMinimumInterval() for _, ns := range testSetup.namespaces { - log.Debug("waiting for new snapshot files to be written out") - require.NoError(t, waitUntilSnapshotFilesFlushed(filePathPrefix, testSetup.shardSet, ns.ID(), []time.Time{newTime.Truncate(blockSize)}, maxWaitTime)) - for _, shard := range testSetup.shardSet.All() { - log.Debug("waiting for old snapshot files to be deleted") + log.Info("waiting for new snapshot files to be written out") + snapshotsToWaitFor := []snapshotID{{blockStart: newTime.Truncate(blockSize)}} + require.NoError(t, waitUntilSnapshotFilesFlushed( + filePathPrefix, shardSet, ns.ID(), snapshotsToWaitFor, maxWaitTime)) + log.Info("waiting for old snapshot files to be deleted") + for _, shard := range shardSet.All() { waitUntil(func() bool { exists, err := fs.SnapshotFileSetExistsAt(filePathPrefix, ns.ID(), shard.ID(), oldTime.Truncate(blockSize)) require.NoError(t, err) diff --git a/src/dbnode/integration/fs_commitlog_snapshot_mixed_mode_read_write_prop_test.go b/src/dbnode/integration/fs_commitlog_snapshot_mixed_mode_read_write_prop_test.go index 4949bcc735..b6926c29a8 100644 --- 
diff --git a/src/dbnode/integration/fs_commitlog_snapshot_mixed_mode_read_write_prop_test.go b/src/dbnode/integration/fs_commitlog_snapshot_mixed_mode_read_write_prop_test.go
index 4949bcc735..b6926c29a8 100644
--- a/src/dbnode/integration/fs_commitlog_snapshot_mixed_mode_read_write_prop_test.go
+++ b/src/dbnode/integration/fs_commitlog_snapshot_mixed_mode_read_write_prop_test.go
@@ -236,7 +236,9 @@ func TestFsCommitLogMixedModeReadWriteProp(t *testing.T) {
 					filePathPrefix,
 					setup.shardSet,
 					nsID,
-					[]time.Time{snapshotBlock}, maxFlushWaitTime)
+					[]snapshotID{{blockStart: snapshotBlock}},
+					maxFlushWaitTime,
+				)
 				if err != nil {
 					return false, fmt.Errorf("error waiting for snapshot files: %s", err.Error())
 				}
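Both call sites now pass []snapshotID instead of []time.Time. The shape implied by the test code is roughly the following sketch; the field names come from the diffs above, but the exact types are assumptions:

// snapshotID identifies a snapshot fileset to wait for: the block it covers
// and the smallest volume index that counts as flushed.
type snapshotID struct {
	blockStart time.Time
	minVolume  int
}

Passing {blockStart: snapshotBlock} with a zero-valued minVolume, as the property test does here, presumably preserves the old behaviour of waiting for any snapshot of that block.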
From 869c4dc514e5b826e8ab6eeae4dbe322ae1a0e83 Mon Sep 17 00:00:00 2001
From: Richard Artoul
Date: Fri, 5 Oct 2018 19:04:48 -0400
Subject: [PATCH 18/26] Refactor test

---
 src/dbnode/integration/disk_snapshot_test.go | 24 +++++++++-----------
 1 file changed, 11 insertions(+), 13 deletions(-)

diff --git a/src/dbnode/integration/disk_snapshot_test.go b/src/dbnode/integration/disk_snapshot_test.go
index 9abfea2be2..31453f0278 100644
--- a/src/dbnode/integration/disk_snapshot_test.go
+++ b/src/dbnode/integration/disk_snapshot_test.go
@@ -57,8 +57,6 @@ func TestDiskSnapshotSimple(t *testing.T) {
 	md2, err := namespace.NewMetadata(testNamespaces[1], nOpts)
 	require.NoError(t, err)
 
-	// md1.SetOptions()
-
 	testOpts := newTestOptions(t).
 		SetTickMinimumInterval(time.Second).
 		SetNamespaces([]namespace.Metadata{md1, md2})
@@ -82,13 +80,17 @@ func TestDiskSnapshotSimple(t *testing.T) {
 
 	// Write test data
 	var (
-		currBlock = testSetup.getNowFn().Truncate(blockSize)
-		now       = currBlock.Add(11 * time.Minute)
+		currBlock                         = testSetup.getNowFn().Truncate(blockSize)
+		now                               = currBlock.Add(11 * time.Minute)
+		assertTimeAllowsWritesToAllBlocks = func(t time.Time) {
+			// Make sure now is within bufferPast of the previous block
+			require.True(t, now.Before(now.Truncate(blockSize).Add(bufferPast)))
+			// Make sure now is within bufferFuture of the next block
+			require.True(t, now.After(now.Truncate(blockSize).Add(blockSize).Add(-bufferFuture)))
+		}
 	)
-	// Make sure now is within bufferPast of the previous block
-	require.True(t, now.Before(now.Truncate(blockSize).Add(bufferPast)))
-	// Make sure now is within bufferFuture of the next block
-	require.True(t, now.After(now.Truncate(blockSize).Add(blockSize).Add(-bufferFuture)))
+
+	assertTimeAllowsWritesToAllBlocks(now)
 	testSetup.setNowFn(now)
 
 	var (
@@ -112,11 +114,7 @@ func TestDiskSnapshotSimple(t *testing.T) {
 	}
 
 	now = testSetup.getNowFn().Add(2 * time.Minute)
-	// TODO: Make this a function that can be repeated
-	// Make sure now is within bufferPast of the previous block
-	require.True(t, now.Before(now.Truncate(blockSize).Add(bufferPast)))
-	// Make sure now is within bufferFuture of the next block
-	require.True(t, now.After(now.Truncate(blockSize).Add(blockSize).Add(-bufferFuture)))
+	assertTimeAllowsWritesToAllBlocks(now)
 	testSetup.setNowFn(now)
 
 	var (

From b60e33024478ffec858bbe915af45fcd59d6a296 Mon Sep 17 00:00:00 2001
From: Richard Artoul
Date: Fri, 5 Oct 2018 19:12:40 -0400
Subject: [PATCH 19/26] Refactor integration test

---
 src/dbnode/integration/disk_snapshot_test.go | 47 +++++++++++---------
 1 file changed, 27 insertions(+), 20 deletions(-)

diff --git a/src/dbnode/integration/disk_snapshot_test.go b/src/dbnode/integration/disk_snapshot_test.go
index 31453f0278..8d5dc287da 100644
--- a/src/dbnode/integration/disk_snapshot_test.go
+++ b/src/dbnode/integration/disk_snapshot_test.go
@@ -82,11 +82,11 @@ func TestDiskSnapshotSimple(t *testing.T) {
 	var (
 		currBlock = testSetup.getNowFn().Truncate(blockSize)
 		now       = currBlock.Add(11 * time.Minute)
-		assertTimeAllowsWritesToAllBlocks = func(t time.Time) {
+		assertTimeAllowsWritesToAllBlocks = func(ti time.Time) {
 			// Make sure now is within bufferPast of the previous block
-			require.True(t, now.Before(now.Truncate(blockSize).Add(bufferPast)))
+			require.True(t, ti.Before(ti.Truncate(blockSize).Add(bufferPast)))
 			// Make sure now is within bufferFuture of the next block
-			require.True(t, now.After(now.Truncate(blockSize).Add(blockSize).Add(-bufferFuture)))
+			require.True(t, ti.After(ti.Truncate(blockSize).Add(blockSize).Add(-bufferFuture)))
 		}
 	)
@@ -105,7 +105,6 @@ func TestDiskSnapshotSimple(t *testing.T) {
 		}
 	)
 	for _, input := range inputData {
-		// testSetup.setNowFn(input.Start)
 		testData := generate.Block(input)
 		seriesMaps[xtime.ToUnixNano(input.Start.Truncate(blockSize))] = testData
 		for _, ns := range testSetup.namespaces {
@@ -113,40 +112,48 @@ func TestDiskSnapshotSimple(t *testing.T) {
 		}
 	}
 
-	now = testSetup.getNowFn().Add(2 * time.Minute)
-	assertTimeAllowsWritesToAllBlocks(now)
-	testSetup.setNowFn(now)
-
+	// Now that we've completed the writes, we need to make sure that we wait for
+	// the next snapshot to guarantee that it should contain all the writes. We do
+	// this by measuring the current highest snapshot volume index and then updating
+	// the time (to allow another snapshot process to occur due to the configurable
+	// minimum time between snapshots), and then waiting for the snapshot files with
+	// the measured volume index + 1.
 	var (
-		maxWaitTime    = time.Minute
-		filePathPrefix = testSetup.storageOpts.
-			CommitLogOptions().
-			FilesystemOptions().
-			FilePathPrefix()
+		snapshotsToWaitForByNS = make([][]snapshotID, 0, len(testSetup.namespaces))
+		filePathPrefix         = testSetup.storageOpts.
+			CommitLogOptions().
+			FilesystemOptions().
+			FilePathPrefix()
 	)
-
 	for _, ns := range testSetup.namespaces {
-		snapshotsToWaitFor := []snapshotID{
+		snapshotsToWaitForByNS = append(snapshotsToWaitForByNS, []snapshotID{
 			{
 				blockStart: currBlock.Add(-blockSize),
 				minVolume: getLatestSnapshotVolumeIndex(
-					filePathPrefix, shardSet, ns.ID(), currBlock.Add(-blockSize)),
+					filePathPrefix, shardSet, ns.ID(), currBlock.Add(-blockSize)) + 1,
 			},
 			{
 				blockStart: currBlock,
 				minVolume: getLatestSnapshotVolumeIndex(
-					filePathPrefix, shardSet, ns.ID(), currBlock),
+					filePathPrefix, shardSet, ns.ID(), currBlock) + 1,
 			},
 			{
 				blockStart: currBlock.Add(blockSize),
 				minVolume: getLatestSnapshotVolumeIndex(
-					filePathPrefix, shardSet, ns.ID(), currBlock.Add(blockSize)),
+					filePathPrefix, shardSet, ns.ID(), currBlock.Add(blockSize)) + 1,
 			},
-		}
+		})
+	}
+
+	now = testSetup.getNowFn().Add(2 * time.Minute)
+	assertTimeAllowsWritesToAllBlocks(now)
+	testSetup.setNowFn(now)
 
+	maxWaitTime := time.Minute
+	for i, ns := range testSetup.namespaces {
 		log.Info("waiting for snapshot files to flush")
 		require.NoError(t, waitUntilSnapshotFilesFlushed(
-			filePathPrefix, shardSet, ns.ID(), snapshotsToWaitFor, maxWaitTime))
+			filePathPrefix, shardSet, ns.ID(), snapshotsToWaitForByNS[i], maxWaitTime))
 		log.Info("verifying snapshot files")
 		verifySnapshottedDataFiles(t, shardSet, testSetup.storageOpts, ns.ID(), seriesMaps)
 	}
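Restated outside the diff, the measure-then-wait pattern the comment above describes looks roughly like this sketch. It assumes the integration test's helpers (getLatestSnapshotVolumeIndex, waitUntilSnapshotFilesFlushed, the snapshotID type) and a testSetup with getNowFn/setNowFn are in scope; waitForNextSnapshot is a hypothetical name, not code from the patch:

// waitForNextSnapshot records the highest snapshot volume index currently on
// disk for blockStart, advances the clock so another snapshot run is allowed
// to start, then blocks until a strictly newer volume appears.
func waitForNextSnapshot(
	filePathPrefix string,
	shardSet sharding.ShardSet,
	nsID ident.ID,
	blockStart time.Time,
) error {
	minVolume := getLatestSnapshotVolumeIndex(
		filePathPrefix, shardSet, nsID, blockStart) + 1
	testSetup.setNowFn(testSetup.getNowFn().Add(2 * time.Minute))
	return waitUntilSnapshotFilesFlushed(
		filePathPrefix, shardSet, nsID,
		[]snapshotID{{blockStart: blockStart, minVolume: minVolume}},
		time.Minute)
}

Waiting for volume index + 1 rather than for the file's mere existence is what makes the check race-free: a snapshot that was already on disk before the writes completed cannot satisfy it.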
@@ -123,7 +123,7 @@ func (m *flushManager) Flush(
 	// better semantically because flushing should take priority over snapshotting.
 	//
 	// In addition, we need to make sure that for any given shard/blockStart combination,
-	// we attempt a flush befor a snapshot as the snapshotting process will attempt to
+	// we attempt a flush before a snapshot as the snapshotting process will attempt to
 	// snapshot any unflushed blocks which would be wasteful if the block is already
 	// flushable.
 	m.setState(flushManagerSnapshotInProgress)

From ee4f425581be99d2e9b48a670e6dee06df22bb19 Mon Sep 17 00:00:00 2001
From: Richard Artoul
Date: Fri, 5 Oct 2018 19:14:20 -0400
Subject: [PATCH 21/26] Fix typo

---
 src/dbnode/storage/flush.go | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/dbnode/storage/flush.go b/src/dbnode/storage/flush.go
index e11a2651f7..d9834e75f7 100644
--- a/src/dbnode/storage/flush.go
+++ b/src/dbnode/storage/flush.go
@@ -232,7 +232,7 @@ func (m *flushManager) namespaceSnapshotTimes(ns databaseNamespace, curr time.Ti
 		rOpts     = ns.Options().RetentionOptions()
 		blockSize = rOpts.BlockSize()
 		// Earliest possible snapshottable block is the earliest possible flushable
-		// block start which is the first block in the retention period.
+		// blockStart which is the first block in the retention period.
 		earliest = retention.FlushTimeStart(rOpts, curr)
 		// Latest possible snapshotting block is either the current block OR the
 		// next block if the current time and bufferFuture configuration would
From 1822b83bae3f7af6dade2cff9da51181992f2702 Mon Sep 17 00:00:00 2001
From: Richard Artoul
Date: Thu, 15 Nov 2018 16:01:50 -0500
Subject: [PATCH 22/26] Dont reverse order

---
 src/dbnode/storage/storage_mock.go | 8 ++++----
 src/dbnode/storage/util.go         | 2 +-
 src/dbnode/storage/util_test.go    | 10 +++++-----
 3 files changed, 10 insertions(+), 10 deletions(-)

diff --git a/src/dbnode/storage/storage_mock.go b/src/dbnode/storage/storage_mock.go
index a70330f9f1..ec9c87fda8 100644
--- a/src/dbnode/storage/storage_mock.go
+++ b/src/dbnode/storage/storage_mock.go
@@ -930,15 +930,15 @@ func (mr *MockdatabaseNamespaceMockRecorder) FlushIndex(flush interface{}) *gomo
 }
 
 // Snapshot mocks base method
-func (m *MockdatabaseNamespace) Snapshot(blockStart, snapshotTime time.Time, flush persist.DataFlush) error {
-	ret := m.ctrl.Call(m, "Snapshot", blockStart, snapshotTime, flush)
+func (m *MockdatabaseNamespace) Snapshot(blockStart, snapshotTime time.Time, shardBootstrapStatesAtTickStart ShardBootstrapStates, flush persist.DataFlush) error {
+	ret := m.ctrl.Call(m, "Snapshot", blockStart, snapshotTime, shardBootstrapStatesAtTickStart, flush)
 	ret0, _ := ret[0].(error)
 	return ret0
 }
 
 // Snapshot indicates an expected call of Snapshot
-func (mr *MockdatabaseNamespaceMockRecorder) Snapshot(blockStart, snapshotTime, flush interface{}) *gomock.Call {
-	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Snapshot", reflect.TypeOf((*MockdatabaseNamespace)(nil).Snapshot), blockStart, snapshotTime, flush)
+func (mr *MockdatabaseNamespaceMockRecorder) Snapshot(blockStart, snapshotTime, shardBootstrapStatesAtTickStart, flush interface{}) *gomock.Call {
+	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Snapshot", reflect.TypeOf((*MockdatabaseNamespace)(nil).Snapshot), blockStart, snapshotTime, shardBootstrapStatesAtTickStart, flush)
 }
 
 // NeedsFlush mocks base method
diff --git a/src/dbnode/storage/util.go b/src/dbnode/storage/util.go
index 9ce57935e7..87faa3a8a2 100644
--- a/src/dbnode/storage/util.go
+++ b/src/dbnode/storage/util.go
@@ -47,7 +47,7 @@ func timesInRange(startInclusive, endInclusive time.Time, windowSize time.Durati
 		return nil
 	}
 	times := make([]time.Time, 0, ni)
-	for t := startInclusive; !t.After(endInclusive); t = t.Add(windowSize) {
+	for t := endInclusive; !t.Before(startInclusive); t = t.Add(-windowSize) {
 		times = append(times, t)
 	}
 	return times
diff --git a/src/dbnode/storage/util_test.go b/src/dbnode/storage/util_test.go
index b9600748a1..b750aa4447 100644
--- a/src/dbnode/storage/util_test.go
+++ b/src/dbnode/storage/util_test.go
@@ -77,14 +77,14 @@ func TestTimesInRange(t *testing.T) {
 		}
 		timesForN = func(i, j int64) []time.Time {
 			times := make([]time.Time, 0, j-i+1)
-			for k := i; k <= j; k++ {
+			for k := j; k >= i; k-- {
 				times = append(times, timeFor(k))
 			}
 			return times
 		}
 	)
 	// [0, 2] with a gap of 1 ==> [0, 1, 2]
-	require.Equal(t, []time.Time{timeFor(0), timeFor(1), timeFor(2)},
+	require.Equal(t, []time.Time{timeFor(2), timeFor(1), timeFor(0)},
 		timesInRange(timeFor(0), timeFor(2), w))
 
 	// [2, 1] with a gap of 1 ==> empty result
@@ -95,13 +95,13 @@ func TestTimesInRange(t *testing.T) {
 		timesInRange(timeFor(2), timeFor(2), w))
 
 	// [0, 9] with a gap of 3 ==> [0, 3, 6, 9]
-	require.Equal(t, []time.Time{timeFor(0), timeFor(3), timeFor(6), timeFor(9)},
+	require.Equal(t, []time.Time{timeFor(9), timeFor(6), timeFor(3), timeFor(0)},
 		timesInRange(timeFor(0), timeFor(9), 3*w))
 
 	// [1, 9] with a gap of 3 ==> [1, 4, 7]
-	require.Equal(t, []time.Time{timeFor(1), timeFor(4), timeFor(7)},
+	require.Equal(t, []time.Time{timeFor(9), timeFor(6), timeFor(3)},
 		timesInRange(timeFor(1), timeFor(9), 3*w))
 
-	// [1, 100] with a gap of 1 ==> [1, ..., 99, 100]
+	// [1, 100] with a gap of 1 ==> [100, 99, ..., 1]
 	require.Equal(t, timesForN(1, 100), timesInRange(timeFor(1), timeFor(100), w))
 }
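Read on its own, the reversed generator now walks from endInclusive down to startInclusive, so callers such as namespaceFlushTimes and namespaceSnapshotTimes see the most recent block starts first. A self-contained restatement, with the capacity computation simplified relative to the original's precomputed ni:

// timesInRange steps backwards by windowSize from endInclusive, stopping once
// a step would pass startInclusive, and returns the times newest first.
func timesInRange(startInclusive, endInclusive time.Time, windowSize time.Duration) []time.Time {
	if endInclusive.Before(startInclusive) {
		return nil
	}
	n := int(endInclusive.Sub(startInclusive)/windowSize) + 1
	times := make([]time.Time, 0, n)
	for t := endInclusive; !t.Before(startInclusive); t = t.Add(-windowSize) {
		times = append(times, t)
	}
	return times
}

Note the behavioural wrinkle the updated tests pin down: because iteration now anchors on endInclusive, [1, 9] with a window of 3 yields [9, 6, 3] rather than the old [1, 4, 7].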
From 1d4a7b73095ca85aa64b7d7a610d303fbda18b0f Mon Sep 17 00:00:00 2001
From: Richard Artoul
Date: Thu, 15 Nov 2018 16:02:39 -0500
Subject: [PATCH 23/26] fix comment

---
 src/dbnode/storage/util_test.go | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/src/dbnode/storage/util_test.go b/src/dbnode/storage/util_test.go
index b750aa4447..6264412ec3 100644
--- a/src/dbnode/storage/util_test.go
+++ b/src/dbnode/storage/util_test.go
@@ -94,11 +94,11 @@ func TestTimesInRange(t *testing.T) {
 	require.Equal(t, []time.Time{timeFor(2)},
 		timesInRange(timeFor(2), timeFor(2), w))
 
-	// [0, 9] with a gap of 3 ==> [0, 3, 6, 9]
+	// [0, 9] with a gap of 3 ==> [9, 6, 3, 0]
 	require.Equal(t, []time.Time{timeFor(9), timeFor(6), timeFor(3), timeFor(0)},
 		timesInRange(timeFor(0), timeFor(9), 3*w))
 
-	// [1, 9] with a gap of 3 ==> [1, 4, 7]
+	// [1, 9] with a gap of 3 ==> [9, 6, 3]
 	require.Equal(t, []time.Time{timeFor(9), timeFor(6), timeFor(3)},
 		timesInRange(timeFor(1), timeFor(9), 3*w))
 

From efd5bdfc8e510c691b9c9e2e226ef70b45172c08 Mon Sep 17 00:00:00 2001
From: Richard Artoul
Date: Thu, 15 Nov 2018 16:08:28 -0500
Subject: [PATCH 24/26] Add comment

---
 src/dbnode/storage/flush.go | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/src/dbnode/storage/flush.go b/src/dbnode/storage/flush.go
index d9834e75f7..1bc2a69ac4 100644
--- a/src/dbnode/storage/flush.go
+++ b/src/dbnode/storage/flush.go
@@ -236,7 +236,9 @@ func (m *flushManager) namespaceSnapshotTimes(ns databaseNamespace, curr time.Ti
 		earliest = retention.FlushTimeStart(rOpts, curr)
 		// Latest possible snapshotting block is either the current block OR the
 		// next block if the current time and bufferFuture configuration would
-		// allow writes to be written into the next block.
+		// allow writes to be written into the next block. Note that "current time"
+		// here is defined as "tick start time" because all the guarantees about
+		// snapshotting are based around the tick start time, not the current time.
 		latest = curr.Add(rOpts.BufferFuture()).Truncate(blockSize)
 	)

From e11b8a5749b8d43bc955ef24c9f2874beeb236e9 Mon Sep 17 00:00:00 2001
From: Richard Artoul
Date: Thu, 15 Nov 2018 16:09:15 -0500
Subject: [PATCH 25/26] Fix comment

---
 src/dbnode/storage/flush.go | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/dbnode/storage/flush.go b/src/dbnode/storage/flush.go
index 1bc2a69ac4..cea5482739 100644
--- a/src/dbnode/storage/flush.go
+++ b/src/dbnode/storage/flush.go
@@ -244,7 +244,7 @@ func (m *flushManager) namespaceSnapshotTimes(ns databaseNamespace, curr time.Ti
 
 	candidateTimes := timesInRange(earliest, latest, blockSize)
 	return filterTimes(candidateTimes, func(t time.Time) bool {
-		// Snapshot anything that is unflushed
+		// Snapshot anything that is unflushed.
 		return ns.NeedsFlush(t, t)
 	})
 }
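A small worked example (not from the patch) of the latest bound described above: with 2h blocks and a 10m bufferFuture, curr.Add(bufferFuture).Truncate(blockSize) selects the next block only once the tick start time is within bufferFuture of it:

package main

import (
	"fmt"
	"time"
)

func main() {
	var (
		blockSize    = 2 * time.Hour
		bufferFuture = 10 * time.Minute
	)
	// 1:55PM + 10m = 2:05PM, truncated to 2PM: the 2PM block is already
	// accepting writes, so it is the latest snapshottable block.
	curr := time.Date(2018, 11, 15, 13, 55, 0, 0, time.UTC)
	fmt.Println(curr.Add(bufferFuture).Truncate(blockSize)) // 2018-11-15 14:00:00 +0000 UTC
	// At 1:45PM the next block is not yet writable, so the latest
	// snapshottable block is still the current (12PM) block.
	curr = time.Date(2018, 11, 15, 13, 45, 0, 0, time.UTC)
	fmt.Println(curr.Add(bufferFuture).Truncate(blockSize)) // 2018-11-15 12:00:00 +0000 UTC
}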
From aa34af75f1b04b57ecda64e8cd0a23d96a0af77a Mon Sep 17 00:00:00 2001
From: Richard Artoul
Date: Thu, 15 Nov 2018 16:19:50 -0500
Subject: [PATCH 26/26] Add gauge

---
 src/dbnode/storage/flush.go | 40 +++++++++++++++++++++++--------------
 1 file changed, 25 insertions(+), 15 deletions(-)

diff --git a/src/dbnode/storage/flush.go b/src/dbnode/storage/flush.go
index cea5482739..38bb78c4aa 100644
--- a/src/dbnode/storage/flush.go
+++ b/src/dbnode/storage/flush.go
@@ -62,17 +62,22 @@ type flushManager struct {
 	isFlushing      tally.Gauge
 	isSnapshotting  tally.Gauge
 	isIndexFlushing tally.Gauge
+
+	// This is a "debug" metric for making sure that the snapshotting process
+	// is not overly aggressive.
+	maxBlocksSnapshottedByNamespace tally.Gauge
 }
 
 func newFlushManager(database database, scope tally.Scope) databaseFlushManager {
 	opts := database.Options()
 	return &flushManager{
-		database:        database,
-		opts:            opts,
-		pm:              opts.PersistManager(),
-		isFlushing:      scope.Gauge("flush"),
-		isSnapshotting:  scope.Gauge("snapshot"),
-		isIndexFlushing: scope.Gauge("index-flush"),
+		database:                        database,
+		opts:                            opts,
+		pm:                              opts.PersistManager(),
+		isFlushing:                      scope.Gauge("flush"),
+		isSnapshotting:                  scope.Gauge("snapshot"),
+		isIndexFlushing:                 scope.Gauge("index-flush"),
+		maxBlocksSnapshottedByNamespace: scope.Gauge("max-blocks-snapshotted-by-namespace"),
 	}
 }
 
@@ -102,6 +107,15 @@ func (m *flushManager) Flush(
 		return err
 	}
 
+	// Perform two separate loops through all the namespaces so that we can emit better
+	// gauges I.E all the flushing for all the namespaces happens at once and then all
+	// the snapshotting for all the namespaces happens at once. This is also slightly
+	// better semantically because flushing should take priority over snapshotting.
+	//
+	// In addition, we need to make sure that for any given shard/blockStart combination,
+	// we attempt a flush before a snapshot as the snapshotting process will attempt to
+	// snapshot any unflushed blocks which would be wasteful if the block is already
+	// flushable.
 	multiErr := xerrors.NewMultiError()
 	m.setState(flushManagerFlushInProgress)
 	for _, ns := range namespaces {
@@ -117,16 +131,8 @@ func (m *flushManager) Flush(
 		multiErr = multiErr.Add(m.flushNamespaceWithTimes(ns, shardBootstrapTimes, flushTimes, flush))
 	}
 
-	// Perform two separate loops through all the namespaces so that we can emit better
-	// gauges I.E all the flushing for all the namespaces happens at once and then all
-	// the snapshotting for all the namespaces happens at once. This is also slightly
-	// better semantically because flushing should take priority over snapshotting.
-	//
-	// In addition, we need to make sure that for any given shard/blockStart combination,
-	// we attempt a flush before a snapshot as the snapshotting process will attempt to
-	// snapshot any unflushed blocks which would be wasteful if the block is already
-	// flushable.
 	m.setState(flushManagerSnapshotInProgress)
+	maxBlocksSnapshottedByNamespace := 0
 	for _, ns := range namespaces {
 		var (
 			snapshotBlockStarts = m.namespaceSnapshotTimes(ns, tickStart)
@@ -140,6 +146,9 @@ func (m *flushManager) Flush(
 			continue
 		}
 
+		if len(snapshotBlockStarts) > maxBlocksSnapshottedByNamespace {
+			maxBlocksSnapshottedByNamespace = len(snapshotBlockStarts)
+		}
 		for _, snapshotBlockStart := range snapshotBlockStarts {
 			err := ns.Snapshot(
 				snapshotBlockStart, tickStart, shardBootstrapTimes, flush)
@@ -151,6 +160,7 @@ func (m *flushManager) Flush(
 			}
 		}
 	}
+	m.maxBlocksSnapshottedByNamespace.Update(float64(maxBlocksSnapshottedByNamespace))
 
 	// mark data flush finished
 	multiErr = multiErr.Add(flush.DoneData())
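The new gauge follows the usual tally pattern of computing a per-run maximum inside the loop and emitting a single value afterwards. A minimal sketch using uber-go/tally's test scope; the per-namespace counts are invented for illustration:

package main

import "github.com/uber-go/tally"

func main() {
	scope := tally.NewTestScope("", nil)
	gauge := scope.Gauge("max-blocks-snapshotted-by-namespace")

	// Stand-in for len(snapshotBlockStarts) observed for each namespace.
	maxBlocks := 0
	for _, numBlocks := range []int{1, 3, 2} {
		if numBlocks > maxBlocks {
			maxBlocks = numBlocks
		}
	}
	// One Update per flush run, mirroring the patch.
	gauge.Update(float64(maxBlocks))
}

A gauge (last value wins) rather than a counter fits here because the metric answers how aggressive the most recent snapshot pass was, not how many blocks have ever been snapshotted.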