diff --git a/src/dbnode/generated/mocks/generate.go b/src/dbnode/generated/mocks/generate.go
index 0c8ad0f3f5..83b5e48972 100644
--- a/src/dbnode/generated/mocks/generate.go
+++ b/src/dbnode/generated/mocks/generate.go
@@ -24,7 +24,7 @@
 //go:generate sh -c "mockgen -package=xio $PACKAGE/src/dbnode/x/xio SegmentReader,SegmentReaderPool | genclean -pkg $PACKAGE/src/dbnode/x/xio -out $GOPATH/src/$PACKAGE/src/dbnode/x/xio/io_mock.go"
 //go:generate sh -c "mockgen -package=digest -destination=$GOPATH/src/$PACKAGE/src/dbnode/digest/digest_mock.go $PACKAGE/src/dbnode/digest ReaderWithDigest"
 //go:generate sh -c "mockgen -package=series $PACKAGE/src/dbnode/storage/series DatabaseSeries,QueryableBlockRetriever | genclean -pkg $PACKAGE/src/dbnode/storage/series -out $GOPATH/src/$PACKAGE/src/dbnode/storage/series/series_mock.go"
-//go:generate sh -c "mockgen -package=lookup $PACKAGE/src/dbnode/storage/series/lookup IndexWriter | genclean -pkg $PACKAGE/src/dbnode/storage/series/lookup -out $GOPATH/src/$PACKAGE/src/dbnode/storage/series/lookup/lookup_mock.go"
+//go:generate sh -c "mockgen -package=storage $PACKAGE/src/dbnode/storage IndexWriter | genclean -pkg $PACKAGE/src/dbnode/storage -out $GOPATH/src/$PACKAGE/src/dbnode/storage/lookup_mock.go"
 
 // mockgen rules for generating mocks for unexported interfaces (file mode)
 //go:generate sh -c "mockgen -package=encoding -destination=$GOPATH/src/$PACKAGE/src/dbnode/encoding/encoding_mock.go -source=$GOPATH/src/$PACKAGE/src/dbnode/encoding/types.go"
diff --git a/src/dbnode/integration/index_active_block_rotate_test.go b/src/dbnode/integration/index_active_block_rotate_test.go
index 0abc5e8924..87d96902bb 100644
--- a/src/dbnode/integration/index_active_block_rotate_test.go
+++ b/src/dbnode/integration/index_active_block_rotate_test.go
@@ -51,7 +51,7 @@ func TestIndexActiveBlockRotate(t *testing.T) {
        numWrites       = 50
        numTags         = 10
        blockSize       = 2 * time.Hour
-       indexBlockSize  = blockSize
+       indexBlockSize  = blockSize * 2
        retentionPeriod = 12 * blockSize
        bufferPast      = 10 * time.Minute
        rOpts           = retention.NewOptions().
diff --git a/src/dbnode/storage/series/lookup/entry.go b/src/dbnode/storage/entry.go
similarity index 85%
rename from src/dbnode/storage/series/lookup/entry.go
rename to src/dbnode/storage/entry.go
index 31209dd20f..49c6508863 100644
--- a/src/dbnode/storage/series/lookup/entry.go
+++ b/src/dbnode/storage/entry.go
@@ -18,7 +18,7 @@
 // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
 // THE SOFTWARE.
 
-package lookup
+package storage
 
 import (
     "sync"
@@ -30,6 +30,7 @@ import (
     "github.com/m3db/m3/src/dbnode/storage/index"
     "github.com/m3db/m3/src/dbnode/storage/series"
     "github.com/m3db/m3/src/dbnode/ts/writes"
+    "github.com/m3db/m3/src/m3ninx/doc"
     "github.com/m3db/m3/src/x/clock"
     "github.com/m3db/m3/src/x/context"
     xtime "github.com/m3db/m3/src/x/time"
@@ -58,18 +59,18 @@
 // members to track lifecycle and minimize indexing overhead.
 // NB: users are expected to use `NewEntry` to construct these objects.
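+// The Shard reference (below) replaces the former relookup closure: it lets an
+// entry re-resolve itself via Shard.TryRetrieveWritableSeries when the index
+// needs to check whether the underlying series is still non-empty.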
 type Entry struct {
-    relookupAndIncrementReaderWriterCount func() (index.OnIndexSeries, bool)
-    Series                                series.DatabaseSeries
-    Index                                 uint64
-    indexWriter                           IndexWriter
-    curReadWriters                        int32
-    reverseIndex                          entryIndexState
-    nowFn                                 clock.NowFn
-    pendingIndexBatchSizeOne              []writes.PendingIndexInsert
+    Shard                    Shard
+    Series                   series.DatabaseSeries
+    Index                    uint64
+    indexWriter              IndexWriter
+    curReadWriters           int32
+    reverseIndex             entryIndexState
+    nowFn                    clock.NowFn
+    pendingIndexBatchSizeOne []writes.PendingIndexInsert
 }
 
-// ensure Entry satisfies the `index.OnIndexSeries` interface.
-var _ index.OnIndexSeries = &Entry{}
+// ensure Entry satisfies the `doc.OnIndexSeries` interface.
+var _ doc.OnIndexSeries = &Entry{}
 
 // ensure Entry satisfies the `bootstrap.SeriesRef` interface.
 var _ bootstrap.SeriesRef = &Entry{}
@@ -79,11 +80,11 @@
 var _ bootstrap.SeriesRefResolver = &Entry{}
 
 // NewEntryOptions supplies options for a new entry.
 type NewEntryOptions struct {
-    RelookupAndIncrementReaderWriterCount func() (index.OnIndexSeries, bool)
-    Series                                series.DatabaseSeries
-    Index                                 uint64
-    IndexWriter                           IndexWriter
-    NowFn                                 clock.NowFn
+    Shard       Shard
+    Series      series.DatabaseSeries
+    Index       uint64
+    IndexWriter IndexWriter
+    NowFn       clock.NowFn
 }
 
 // NewEntry returns a new Entry.
@@ -93,22 +94,17 @@ func NewEntry(opts NewEntryOptions) *Entry {
         nowFn = opts.NowFn
     }
     entry := &Entry{
-        relookupAndIncrementReaderWriterCount: opts.RelookupAndIncrementReaderWriterCount,
-        Series:                                opts.Series,
-        Index:                                 opts.Index,
-        indexWriter:                           opts.IndexWriter,
-        nowFn:                                 nowFn,
-        pendingIndexBatchSizeOne:              make([]writes.PendingIndexInsert, 1),
-        reverseIndex:                          newEntryIndexState(),
+        Shard:                    opts.Shard,
+        Series:                   opts.Series,
+        Index:                    opts.Index,
+        indexWriter:              opts.IndexWriter,
+        nowFn:                    nowFn,
+        pendingIndexBatchSizeOne: make([]writes.PendingIndexInsert, 1),
+        reverseIndex:             newEntryIndexState(),
     }
     return entry
 }
 
-// RelookupAndIncrementReaderWriterCount will relookup the entry.
-func (entry *Entry) RelookupAndIncrementReaderWriterCount() (index.OnIndexSeries, bool) {
-    return entry.relookupAndIncrementReaderWriterCount()
-}
-
 // ReaderWriterCount returns the current ref count on the Entry.
 func (entry *Entry) ReaderWriterCount() int32 {
     return atomic.LoadInt32(&entry.curReadWriters)
@@ -124,6 +120,14 @@ func (entry *Entry) DecrementReaderWriterCount() {
     atomic.AddInt32(&entry.curReadWriters, -1)
 }
 
+// IndexedBlockCount returns the count of indexed block states.
+func (entry *Entry) IndexedBlockCount() int {
+    entry.reverseIndex.RLock()
+    count := len(entry.reverseIndex.states)
+    entry.reverseIndex.RUnlock()
+    return count
+}
+
 // IndexedForBlockStart returns a bool to indicate if the Entry has been successfully
 // indexed for the given index blockstart.
 func (entry *Entry) IndexedForBlockStart(indexBlockStart xtime.UnixNano) bool {
@@ -231,23 +235,17 @@ func (entry *Entry) IfAlreadyIndexedMarkIndexSuccessAndFinalize(
     return successAlready
 }
 
-// RemoveIndexedForBlockStarts removes the entry for the index for all blockStarts.
-func (entry *Entry) RemoveIndexedForBlockStarts(
-    blockStarts map[xtime.UnixNano]struct{},
-) index.RemoveIndexedForBlockStartsResult {
-    var result index.RemoveIndexedForBlockStartsResult
-    entry.reverseIndex.Lock()
-    for k, state := range entry.reverseIndex.states {
-        _, ok := blockStarts[k]
-        if ok && state.success {
-            delete(entry.reverseIndex.states, k)
-            result.IndexedBlockStartsRemoved++
-            continue
-        }
-        result.IndexedBlockStartsRemaining++
+// RelookupAndCheckIsEmpty looks up the series and checks if it is empty.
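+// It is used by the background compaction GC path to decide whether this
+// entry's documents can be dropped from the index's mutable segments.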
+// The first result indicates if the series is empty.
+// The second result indicates if the series can be looked up at all.
+func (entry *Entry) RelookupAndCheckIsEmpty() (bool, bool) {
+    e, _, err := entry.Shard.TryRetrieveWritableSeries(entry.Series.ID())
+    if err != nil || e == nil {
+        return false, false
     }
-    }
-    entry.reverseIndex.Unlock()
-    return result
+    defer entry.DecrementReaderWriterCount()
+
+    return entry.Series.IsEmpty(), true
 }
 
 // Write writes a new value.
diff --git a/src/dbnode/storage/series/lookup/entry_blackbox_test.go b/src/dbnode/storage/entry_blackbox_test.go
similarity index 91%
rename from src/dbnode/storage/series/lookup/entry_blackbox_test.go
rename to src/dbnode/storage/entry_blackbox_test.go
index e7a059ed93..97447e4fff 100644
--- a/src/dbnode/storage/series/lookup/entry_blackbox_test.go
+++ b/src/dbnode/storage/entry_blackbox_test.go
@@ -18,14 +18,13 @@
 // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
 // THE SOFTWARE.
 
-package lookup_test
+package storage
 
 import (
     "sync"
     "testing"
     "time"
 
-    "github.com/m3db/m3/src/dbnode/storage/series/lookup"
     xtime "github.com/m3db/m3/src/x/time"
 
     "github.com/fortytw2/leaktest"
@@ -43,7 +42,7 @@ func newTime(n int) xtime.UnixNano {
 }
 
 func TestEntryReaderWriterCount(t *testing.T) {
-    e := lookup.NewEntry(lookup.NewEntryOptions{})
+    e := NewEntry(NewEntryOptions{})
     require.Equal(t, int32(0), e.ReaderWriterCount())
 
     e.IncrementReaderWriterCount()
@@ -54,7 +53,7 @@
 }
 
 func TestEntryIndexSuccessPath(t *testing.T) {
-    e := lookup.NewEntry(lookup.NewEntryOptions{})
+    e := NewEntry(NewEntryOptions{})
     t0 := newTime(0)
     require.False(t, e.IndexedForBlockStart(t0))
 
@@ -69,7 +68,7 @@
 }
 
 func TestEntryIndexFailPath(t *testing.T) {
-    e := lookup.NewEntry(lookup.NewEntryOptions{})
+    e := NewEntry(NewEntryOptions{})
     t0 := newTime(0)
     require.False(t, e.IndexedForBlockStart(t0))
 
@@ -85,7 +84,7 @@
 func TestEntryMultipleGoroutinesRaceIndexUpdate(t *testing.T) {
     defer leaktest.CheckTimeout(t, time.Second)()
 
-    e := lookup.NewEntry(lookup.NewEntryOptions{})
+    e := NewEntry(NewEntryOptions{})
     t0 := newTime(0)
     require.False(t, e.IndexedForBlockStart(t0))
 
diff --git a/src/dbnode/storage/series/lookup/entry_whitebox_test.go b/src/dbnode/storage/entry_whitebox_test.go
similarity index 89%
rename from src/dbnode/storage/series/lookup/entry_whitebox_test.go
rename to src/dbnode/storage/entry_whitebox_test.go
index 52de0011b7..34a552cfb0 100644
--- a/src/dbnode/storage/series/lookup/entry_whitebox_test.go
+++ b/src/dbnode/storage/entry_whitebox_test.go
@@ -18,7 +18,7 @@
 // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
 // THE SOFTWARE.
 
-package lookup
+package storage
 
 import (
     "testing"
@@ -35,22 +35,12 @@ import (
     "github.com/stretchr/testify/require"
 )
 
-var (
-    initTime      = time.Date(2018, time.May, 12, 15, 55, 0, 0, time.UTC)
-    testBlockSize = 24 * time.Hour
-)
-
-func newTime(n int) xtime.UnixNano {
-    t := initTime.Truncate(testBlockSize).Add(time.Duration(n) * testBlockSize)
-    return xtime.ToUnixNano(t)
-}
-
 func TestEntryIndexAttemptRotatesSlice(t *testing.T) {
     e := NewEntry(NewEntryOptions{})
     for i := 0; i < 10; i++ {
         ti := newTime(i)
         require.True(t, e.NeedsIndexUpdate(ti))
-        require.Equal(t, i+1, len(e.reverseIndex.states))
+        require.Equal(t, i+1, e.IndexedBlockCount())
     }
 
     // ensure only the latest ones are held on to
diff --git a/src/dbnode/storage/flush.go b/src/dbnode/storage/flush.go
index 242c676d83..7e206ffa71 100644
--- a/src/dbnode/storage/flush.go
+++ b/src/dbnode/storage/flush.go
@@ -144,7 +144,7 @@ func (m *flushManager) Flush(startTime xtime.UnixNano) error {
     // will attempt to snapshot blocks w/ unflushed data which would be wasteful if
     // the block is already flushable.
     multiErr := xerrors.NewMultiError()
-    if err = m.dataWarmFlush(namespaces, startTime); err != nil {
+    if err := m.dataWarmFlush(namespaces, startTime); err != nil {
         multiErr = multiErr.Add(err)
     }
 
@@ -159,7 +159,7 @@
         multiErr = multiErr.Add(fmt.Errorf("error rotating commitlog in mediator tick: %v", err))
     }
 
-    if err = m.indexFlush(namespaces); err != nil {
+    if err := m.indexFlush(namespaces); err != nil {
         multiErr = multiErr.Add(err)
     }
 
@@ -187,8 +187,7 @@ func (m *flushManager) dataWarmFlush(
             multiErr = multiErr.Add(err)
             continue
         }
-        err = m.flushNamespaceWithTimes(ns, flushTimes, flushPersist)
-        if err != nil {
+        if err := m.flushNamespaceWithTimes(ns, flushTimes, flushPersist); err != nil {
             multiErr = multiErr.Add(err)
         }
     }
@@ -272,7 +271,10 @@ func (m *flushManager) indexFlush(
         if !indexEnabled {
             continue
         }
-        multiErr = multiErr.Add(ns.FlushIndex(indexFlush))
+
+        if err := ns.FlushIndex(indexFlush); err != nil {
+            multiErr = multiErr.Add(err)
+        }
     }
     multiErr = multiErr.Add(indexFlush.DoneIndex())
 
diff --git a/src/dbnode/storage/flush_test.go b/src/dbnode/storage/flush_test.go
index 667c1f05ff..b072dd6e03 100644
--- a/src/dbnode/storage/flush_test.go
+++ b/src/dbnode/storage/flush_test.go
@@ -315,12 +315,16 @@ func TestFlushManagerSkipNamespaceIndexingDisabled(t *testing.T) {
     defer ctrl.Finish()
 
     nsOpts := defaultTestNs1Opts.SetIndexOptions(namespace.NewIndexOptions().SetEnabled(false))
+    s1 := NewMockdatabaseShard(ctrl)
+    s2 := NewMockdatabaseShard(ctrl)
     ns := NewMockdatabaseNamespace(ctrl)
     ns.EXPECT().Options().Return(nsOpts).AnyTimes()
     ns.EXPECT().ID().Return(defaultTestNs1ID).AnyTimes()
     ns.EXPECT().NeedsFlush(gomock.Any(), gomock.Any()).Return(true, nil).AnyTimes()
     ns.EXPECT().WarmFlush(gomock.Any(), gomock.Any()).Return(nil).AnyTimes()
     ns.EXPECT().Snapshot(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).AnyTimes()
+    s1.EXPECT().ID().Return(uint32(1)).AnyTimes()
+    s2.EXPECT().ID().Return(uint32(2)).AnyTimes()
 
     var (
         mockFlushPersist = persist.NewMockFlushPreparer(ctrl)
@@ -357,14 +361,25 @@ func TestFlushManagerNamespaceIndexingEnabled(t *testing.T) {
     ctrl := xtest.NewController(t)
     defer ctrl.Finish()
 
+    blocks := 24
     nsOpts := defaultTestNs1Opts.SetIndexOptions(namespace.NewIndexOptions().SetEnabled(true))
+    s1 := NewMockdatabaseShard(ctrl)
+    s2 := NewMockdatabaseShard(ctrl)
     ns := NewMockdatabaseNamespace(ctrl)
     ns.EXPECT().Options().Return(nsOpts).AnyTimes()
     ns.EXPECT().ID().Return(defaultTestNs1ID).AnyTimes()
     ns.EXPECT().NeedsFlush(gomock.Any(), gomock.Any()).Return(true, nil).AnyTimes()
-    ns.EXPECT().WarmFlush(gomock.Any(), gomock.Any()).Return(nil).AnyTimes()
-    ns.EXPECT().Snapshot(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).AnyTimes()
-    ns.EXPECT().FlushIndex(gomock.Any()).Return(nil)
+    s1.EXPECT().ID().Return(uint32(1)).AnyTimes()
+    s2.EXPECT().ID().Return(uint32(2)).AnyTimes()
+
+    // Validate that the flush state is marked as successful only AFTER all prerequisite steps have been run.
+    // Order is important to avoid any edge case where data is GCed from memory without all flushing operations
+    // being completed.
+    gomock.InOrder(
+        ns.EXPECT().WarmFlush(gomock.Any(), gomock.Any()).Return(nil).Times(blocks),
+        ns.EXPECT().Snapshot(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).AnyTimes(),
+        ns.EXPECT().FlushIndex(gomock.Any()).Return(nil),
+    )
 
     var (
         mockFlushPersist = persist.NewMockFlushPreparer(ctrl)
diff --git a/src/dbnode/storage/fs.go b/src/dbnode/storage/fs.go
index c6f99366df..7c247b0a65 100644
--- a/src/dbnode/storage/fs.go
+++ b/src/dbnode/storage/fs.go
@@ -39,12 +39,17 @@ const (
     fileOpFailed
 )
 
+type warmStatus struct {
+    DataFlushed  fileOpStatus
+    IndexFlushed fileOpStatus
+}
+
 type fileOpState struct {
     // WarmStatus is the status of data persistence for WarmWrites only.
     // Each block will only be warm-flushed once, so not keeping track of a
     // version here is okay. This is used in the buffer Tick to determine when
     // a warm bucket is evictable from memory.
-    WarmStatus fileOpStatus
+    WarmStatus warmStatus
     // ColdVersionRetrievable keeps track of data persistence for ColdWrites only.
     // Each block can be cold-flushed multiple times, so this tracks which
     // version of the flush completed successfully. This is ultimately used in
diff --git a/src/dbnode/storage/index.go b/src/dbnode/storage/index.go
index 7892c0e06c..595bd0a3d8 100644
--- a/src/dbnode/storage/index.go
+++ b/src/dbnode/storage/index.go
@@ -929,9 +929,6 @@ func (i *nsIndex) Tick(
     // such as notify of sealed blocks.
     tickingBlocks, multiErr := i.tickingBlocks(startTime)
 
-    // Track blocks that are flushed (and have their bootstrapped results).
-    flushedBlocks := make([]xtime.UnixNano, 0, len(tickingBlocks.tickingBlocks))
-
     result.NumBlocks = int64(tickingBlocks.totalBlocks)
     for _, block := range tickingBlocks.tickingBlocks {
         if c.IsCancelled() {
@@ -946,10 +943,6 @@
         result.NumSegmentsMutable += blockTickResult.NumSegmentsMutable
         result.NumTotalDocs += blockTickResult.NumDocs
         result.FreeMmap += blockTickResult.FreeMmap
-
-        if tickErr == nil && blockTickResult.NumSegmentsBootstrapped != 0 {
-            flushedBlocks = append(flushedBlocks, block.StartTime())
-        }
     }
 
     blockTickResult, tickErr := tickingBlocks.activeBlock.Tick(c)
@@ -960,18 +953,6 @@
     result.NumSegmentsMutable += blockTickResult.NumSegmentsMutable
     result.NumTotalDocs += blockTickResult.NumDocs
     result.FreeMmap += blockTickResult.FreeMmap
 
-    // Notify in memory block of sealed and flushed blocks
-    // and make sure to do this out of the lock since
-    // this can take a considerable amount of time
-    // and is an expensive task that doesn't require
-    // holding the index lock.
-    notifyErr := tickingBlocks.activeBlock.ActiveBlockNotifyFlushedBlocks(flushedBlocks)
-    if notifyErr != nil {
-        multiErr = multiErr.Add(notifyErr)
-    } else {
-        i.metrics.blocksNotifyFlushed.Inc(int64(len(flushedBlocks)))
-    }
-
     i.metrics.tick.Inc(1)
 
     return result, multiErr.FinalError()
@@ -1093,6 +1074,12 @@ func (i *nsIndex) WarmFlush(
                 zap.Time("blockStart", block.StartTime().ToTime()),
             )
         }
+
+        for _, t := range i.blockStartsFromIndexBlockStart(block.StartTime()) {
+            for _, s := range shards {
+                s.MarkWarmIndexFlushStateSuccessOrError(t, err)
+            }
+        }
     }
     i.metrics.blocksEvictedMutableSegments.Inc(int64(evicted))
     return nil
@@ -1208,15 +1195,15 @@ func (i *nsIndex) canFlushBlockWithRLock(
                 Debug("skipping index cold flush due to shard not bootstrapped yet")
             continue
         }
-        start := blockStart
-        end := blockStart.Add(i.blockSize)
-        dataBlockSize := i.nsMetadata.Options().RetentionOptions().BlockSize()
-        for t := start; t.Before(end); t = t.Add(dataBlockSize) {
+
+        for _, t := range i.blockStartsFromIndexBlockStart(blockStart) {
             flushState, err := shard.FlushState(t)
             if err != nil {
                 return false, err
             }
-            if flushState.WarmStatus != fileOpSuccess {
+
+            // Skip if the data flushing failed. Data flushing precedes index flushing.
+            if flushState.WarmStatus.DataFlushed != fileOpSuccess {
                 return false, nil
             }
         }
@@ -1225,6 +1212,19 @@
     return true, nil
 }
 
+// blockStartsFromIndexBlockStart returns the possibly many blockStarts that exist within
+// a given index block (since index block size >= data block size).
+func (i *nsIndex) blockStartsFromIndexBlockStart(blockStart xtime.UnixNano) []xtime.UnixNano {
+    start := blockStart
+    end := blockStart.Add(i.blockSize)
+    dataBlockSize := i.nsMetadata.Options().RetentionOptions().BlockSize()
+    blockStarts := make([]xtime.UnixNano, 0)
+    for t := start; t.Before(end); t = t.Add(dataBlockSize) {
+        blockStarts = append(blockStarts, t)
+    }
+    return blockStarts
+}
+
 func (i *nsIndex) hasIndexWarmFlushedToDisk(
     infoFiles map[xtime.UnixNano]fs.ReadIndexInfoFileResult,
     blockStart xtime.UnixNano,
@@ -2508,7 +2508,6 @@ type nsIndexMetrics struct {
     forwardIndexCounter          tally.Counter
     insertEndToEndLatency        tally.Timer
     blocksEvictedMutableSegments tally.Counter
-    blocksNotifyFlushed          tally.Counter
     blockMetrics                 nsIndexBlocksMetrics
     indexingConcurrencyMin       tally.Gauge
     indexingConcurrencyMax       tally.Gauge
@@ -2576,7 +2575,6 @@ func newNamespaceIndexMetrics(
         insertEndToEndLatency: instrument.NewTimer(scope,
             "insert-end-to-end-latency", iopts.TimerOptions()),
         blocksEvictedMutableSegments: scope.Counter("blocks-evicted-mutable-segments"),
-        blocksNotifyFlushed:          scope.Counter("blocks-notify-flushed"),
         blockMetrics: newNamespaceIndexBlocksMetrics(opts, blocksScope),
         indexingConcurrencyMin: scope.Tagged(map[string]string{
             "stat": "min",
diff --git a/src/dbnode/storage/index/block.go b/src/dbnode/storage/index/block.go
index e4dfe18aec..c5ff50a472 100644
--- a/src/dbnode/storage/index/block.go
+++ b/src/dbnode/storage/index/block.go
@@ -280,15 +280,6 @@ func NewBlock(
     return b, nil
 }
 
-func (b *block) ActiveBlockNotifyFlushedBlocks(
-    flushed []xtime.UnixNano,
-) error {
-    if !b.blockOpts.ActiveBlock {
-        return fmt.Errorf("block not in-memory block: start=%v", b.StartTime())
-    }
-    return b.mutableSegments.NotifyFlushedBlocks(flushed)
-}
-
 func (b *block) StartTime() xtime.UnixNano {
     return b.blockStart
 }
@@ -532,23 +523,21 @@ func (b *block) queryWithSpan(
         // Ensure that the block contains any of the relevant time segments for the query range.
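+        // (A single entry may be indexed for several block starts; the loop below
+        // walks each queried block start until one is found to be indexed.)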
         doc := iter.Current()
-        if md, ok := doc.Metadata(); ok {
-            if entry, ok := md.Ref.(OnIndexSeries); ok {
-                var (
-                    inBlock      bool
-                    currentBlock = opts.StartInclusive.Truncate(b.blockSize)
-                )
-                for !inBlock {
-                    inBlock = entry.IndexedForBlockStart(currentBlock)
-                    currentBlock = currentBlock.Add(b.blockSize)
-                    if !currentBlock.Before(opts.EndExclusive) {
-                        break
-                    }
+        if md, ok := doc.Metadata(); ok && md.OnIndexSeries != nil {
+            var (
+                inBlock      bool
+                currentBlock = opts.StartInclusive.Truncate(b.blockSize)
+            )
+            for !inBlock {
+                inBlock = md.OnIndexSeries.IndexedForBlockStart(currentBlock)
+                currentBlock = currentBlock.Add(b.blockSize)
+                if !currentBlock.Before(opts.EndExclusive) {
+                    break
                 }
+            }
 
-            if !inBlock {
-                continue
-            }
+            if !inBlock {
+                continue
             }
         }
diff --git a/src/dbnode/storage/index/block_bench_test.go b/src/dbnode/storage/index/block_bench_test.go
index 70b1e68aff..9d3f209bce 100644
--- a/src/dbnode/storage/index/block_bench_test.go
+++ b/src/dbnode/storage/index/block_bench_test.go
@@ -115,7 +115,7 @@ func BenchmarkBlockWrite(b *testing.B) {
 // useless to use in benchmarks
 type mockOnIndexSeries struct{}
 
-var _ OnIndexSeries = mockOnIndexSeries{}
+var _ doc.OnIndexSeries = mockOnIndexSeries{}
 
 func (m mockOnIndexSeries) OnIndexSuccess(_ xtime.UnixNano)  {}
 func (m mockOnIndexSeries) OnIndexFinalize(_ xtime.UnixNano) {}
@@ -133,7 +133,5 @@ func (m mockOnIndexSeries) RemoveIndexedForBlockStarts(
 ) RemoveIndexedForBlockStartsResult {
     return RemoveIndexedForBlockStartsResult{}
 }
-func (m mockOnIndexSeries) IndexedOrAttemptedAny() bool { return false }
-func (m mockOnIndexSeries) RelookupAndIncrementReaderWriterCount() (OnIndexSeries, bool) {
-    return m, false
-}
+func (m mockOnIndexSeries) IndexedOrAttemptedAny() bool          { return false }
+func (m mockOnIndexSeries) RelookupAndCheckIsEmpty() (bool, bool) { return false, false }
diff --git a/src/dbnode/storage/index/block_test.go b/src/dbnode/storage/index/block_test.go
index bac240fd14..d92ebd040d 100644
--- a/src/dbnode/storage/index/block_test.go
+++ b/src/dbnode/storage/index/block_test.go
@@ -111,7 +111,7 @@ func TestBlockWriteAfterClose(t *testing.T) {
     require.NoError(t, err)
     require.NoError(t, b.Close())
 
-    lifecycle := NewMockOnIndexSeries(ctrl)
+    lifecycle := doc.NewMockOnIndexSeries(ctrl)
     lifecycle.EXPECT().OnIndexFinalize(blockStart)
 
     batch := NewWriteBatch(WriteBatchOptions{
@@ -160,7 +160,7 @@ func TestBlockWriteAfterSeal(t *testing.T) {
     require.NoError(t, err)
     require.NoError(t, b.Seal())
 
-    lifecycle := NewMockOnIndexSeries(ctrl)
+    lifecycle := doc.NewMockOnIndexSeries(ctrl)
     lifecycle.EXPECT().OnIndexFinalize(blockStart)
 
     batch := NewWriteBatch(WriteBatchOptions{
@@ -214,11 +214,11 @@ func TestBlockWrite(t *testing.T) {
     b, ok := blk.(*block)
     require.True(t, ok)
 
-    h1 := NewMockOnIndexSeries(ctrl)
+    h1 := doc.NewMockOnIndexSeries(ctrl)
     h1.EXPECT().OnIndexFinalize(blockStart)
     h1.EXPECT().OnIndexSuccess(blockStart)
 
-    h2 := NewMockOnIndexSeries(ctrl)
+    h2 := doc.NewMockOnIndexSeries(ctrl)
     h2.EXPECT().OnIndexFinalize(blockStart)
     h2.EXPECT().OnIndexSuccess(blockStart)
 
@@ -260,11 +260,11 @@ func TestBlockWriteActualSegmentPartialFailure(t *testing.T) {
     b, ok := blk.(*block)
     require.True(t, ok)
 
-    h1 := NewMockOnIndexSeries(ctrl)
+    h1 := doc.NewMockOnIndexSeries(ctrl)
     h1.EXPECT().OnIndexFinalize(blockStart)
     h1.EXPECT().OnIndexSuccess(blockStart)
 
-    h2 := NewMockOnIndexSeries(ctrl)
+    h2 := doc.NewMockOnIndexSeries(ctrl)
     h2.EXPECT().OnIndexFinalize(blockStart)
 
     batch := NewWriteBatch(WriteBatchOptions{
@@ -321,11 +321,11 @@ func TestBlockWritePartialFailure(t *testing.T) {
     b, ok := blk.(*block)
     require.True(t, ok)
 
-    h1 := NewMockOnIndexSeries(ctrl)
+    h1 := doc.NewMockOnIndexSeries(ctrl)
     h1.EXPECT().OnIndexFinalize(blockStart)
     h1.EXPECT().OnIndexSuccess(blockStart)
 
-    h2 := NewMockOnIndexSeries(ctrl)
+    h2 := doc.NewMockOnIndexSeries(ctrl)
     h2.EXPECT().OnIndexFinalize(blockStart)
 
     batch := NewWriteBatch(WriteBatchOptions{
@@ -1236,7 +1236,7 @@ func TestBlockNeedsMutableSegmentsEvicted(t *testing.T) {
     require.False(t, b.NeedsMutableSegmentsEvicted())
 
     // perform write and ensure it says it needs eviction
-    h1 := NewMockOnIndexSeries(ctrl)
+    h1 := doc.NewMockOnIndexSeries(ctrl)
     h1.EXPECT().OnIndexFinalize(start)
     h1.EXPECT().OnIndexSuccess(start)
     batch := NewWriteBatch(WriteBatchOptions{
@@ -1372,11 +1372,11 @@ func TestBlockE2EInsertQuery(t *testing.T) {
     b, ok := blk.(*block)
     require.True(t, ok)
 
-    h1 := NewMockOnIndexSeries(ctrl)
+    h1 := doc.NewMockOnIndexSeries(ctrl)
     h1.EXPECT().OnIndexFinalize(blockStart)
     h1.EXPECT().OnIndexSuccess(blockStart)
 
-    h2 := NewMockOnIndexSeries(ctrl)
+    h2 := doc.NewMockOnIndexSeries(ctrl)
     h2.EXPECT().OnIndexFinalize(blockStart)
     h2.EXPECT().OnIndexSuccess(blockStart)
 
@@ -1456,14 +1456,14 @@ func TestBlockE2EInsertQueryLimit(t *testing.T) {
     b, ok := blk.(*block)
     require.True(t, ok)
 
-    h1 := NewMockOnIndexSeries(ctrl)
+    h1 := doc.NewMockOnIndexSeries(ctrl)
     h1.EXPECT().OnIndexFinalize(blockStart)
     h1.EXPECT().OnIndexSuccess(blockStart)
     h1.EXPECT().IndexedForBlockStart(blockStart).
         Return(true).
         AnyTimes()
 
-    h2 := NewMockOnIndexSeries(ctrl)
+    h2 := doc.NewMockOnIndexSeries(ctrl)
     h2.EXPECT().OnIndexFinalize(blockStart)
     h2.EXPECT().OnIndexSuccess(blockStart)
     h1.EXPECT().IndexedForBlockStart(blockStart).
@@ -1548,14 +1548,14 @@ func TestBlockE2EInsertAddResultsQuery(t *testing.T) {
     b, ok := blk.(*block)
     require.True(t, ok)
 
-    h1 := NewMockOnIndexSeries(ctrl)
+    h1 := doc.NewMockOnIndexSeries(ctrl)
     h1.EXPECT().OnIndexFinalize(blockStart)
     h1.EXPECT().OnIndexSuccess(blockStart)
     h1.EXPECT().IndexedForBlockStart(blockStart).
         Return(true).
         AnyTimes()
 
-    h2 := NewMockOnIndexSeries(ctrl)
+    h2 := doc.NewMockOnIndexSeries(ctrl)
     h2.EXPECT().OnIndexFinalize(blockStart)
     h2.EXPECT().OnIndexSuccess(blockStart)
     h2.EXPECT().IndexedForBlockStart(blockStart).
@@ -1650,7 +1650,7 @@ func TestBlockE2EInsertAddResultsMergeQuery(t *testing.T) {
     b, ok := blk.(*block)
     require.True(t, ok)
 
-    h1 := NewMockOnIndexSeries(ctrl)
+    h1 := doc.NewMockOnIndexSeries(ctrl)
     h1.EXPECT().OnIndexFinalize(blockStart)
     h1.EXPECT().OnIndexSuccess(blockStart)
     h1.EXPECT().IndexedForBlockStart(blockStart).
@@ -1750,12 +1750,15 @@ func TestBlockWriteBackgroundCompact(t *testing.T) {
     b, ok := blk.(*block)
     require.True(t, ok)
 
+    // Testing compaction only, so mark GC as already running so the test is limited only to compaction.
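+    // (backgroundCompactWithLock treats compactingBackgroundGarbageCollect as
+    // an in-flight GC and will then only schedule standard compaction tasks.)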
+    b.mutableSegments.compact.compactingBackgroundGarbageCollect = true
+
     // First write
-    h1 := NewMockOnIndexSeries(ctrl)
+    h1 := doc.NewMockOnIndexSeries(ctrl)
     h1.EXPECT().OnIndexFinalize(blockStart)
     h1.EXPECT().OnIndexSuccess(blockStart)
 
-    h2 := NewMockOnIndexSeries(ctrl)
+    h2 := doc.NewMockOnIndexSeries(ctrl)
     h2.EXPECT().OnIndexFinalize(blockStart)
     h2.EXPECT().OnIndexSuccess(blockStart)
 
@@ -1784,7 +1787,7 @@ func TestBlockWriteBackgroundCompact(t *testing.T) {
     b.Unlock()
 
     // Second write
-    h1 = NewMockOnIndexSeries(ctrl)
+    h1 = doc.NewMockOnIndexSeries(ctrl)
     h1.EXPECT().OnIndexFinalize(blockStart)
     h1.EXPECT().OnIndexSuccess(blockStart)
 
@@ -2181,15 +2184,15 @@ func TestBlockE2EInsertAggregate(t *testing.T) {
     b, ok := blk.(*block)
     require.True(t, ok)
 
-    h1 := NewMockOnIndexSeries(ctrl)
+    h1 := doc.NewMockOnIndexSeries(ctrl)
     h1.EXPECT().OnIndexFinalize(blockStart)
     h1.EXPECT().OnIndexSuccess(blockStart)
 
-    h2 := NewMockOnIndexSeries(ctrl)
+    h2 := doc.NewMockOnIndexSeries(ctrl)
     h2.EXPECT().OnIndexFinalize(blockStart)
     h2.EXPECT().OnIndexSuccess(blockStart)
 
-    h3 := NewMockOnIndexSeries(ctrl)
+    h3 := doc.NewMockOnIndexSeries(ctrl)
     h3.EXPECT().OnIndexFinalize(blockStart)
     h3.EXPECT().OnIndexSuccess(blockStart)
 
diff --git a/src/dbnode/storage/index/for_each_test.go b/src/dbnode/storage/index/for_each_test.go
index b2dc532922..94e042ab0f 100644
--- a/src/dbnode/storage/index/for_each_test.go
+++ b/src/dbnode/storage/index/for_each_test.go
@@ -53,7 +53,7 @@ func TestWriteBatchForEachUnmarkedBatchByBlockStart(t *testing.T) {
     for _, n := range []int64{2, 0, 1} {
         batch.Append(WriteBatchEntry{
             Timestamp:     tn(n),
-            OnIndexSeries: NewMockOnIndexSeries(ctrl),
+            OnIndexSeries: doc.NewMockOnIndexSeries(ctrl),
         }, d(n))
     }
 
@@ -109,7 +109,7 @@ func TestWriteBatchForEachUnmarkedBatchByBlockStartMore(t *testing.T) {
     } {
         batch.Append(WriteBatchEntry{
             Timestamp:     tn(v.nTime),
-            OnIndexSeries: NewMockOnIndexSeries(ctrl),
+            OnIndexSeries: doc.NewMockOnIndexSeries(ctrl),
         }, d(v.nDoc))
     }
 
diff --git a/src/dbnode/storage/index/index_mock.go b/src/dbnode/storage/index/index_mock.go
index 3c78ffa986..7c6840382c 100644
--- a/src/dbnode/storage/index/index_mock.go
+++ b/src/dbnode/storage/index/index_mock.go
@@ -790,148 +790,6 @@ func (mr *MockAggregateValuesPoolMockRecorder) Put(value interface{}) *gomock.Ca
     return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Put", reflect.TypeOf((*MockAggregateValuesPool)(nil).Put), value)
 }
 
-// MockOnIndexSeries is a mock of OnIndexSeries interface.
-type MockOnIndexSeries struct {
-    ctrl     *gomock.Controller
-    recorder *MockOnIndexSeriesMockRecorder
-}
-
-// MockOnIndexSeriesMockRecorder is the mock recorder for MockOnIndexSeries.
-type MockOnIndexSeriesMockRecorder struct {
-    mock *MockOnIndexSeries
-}
-
-// NewMockOnIndexSeries creates a new mock instance.
-func NewMockOnIndexSeries(ctrl *gomock.Controller) *MockOnIndexSeries {
-    mock := &MockOnIndexSeries{ctrl: ctrl}
-    mock.recorder = &MockOnIndexSeriesMockRecorder{mock}
-    return mock
-}
-
-// EXPECT returns an object that allows the caller to indicate expected use.
-func (m *MockOnIndexSeries) EXPECT() *MockOnIndexSeriesMockRecorder {
-    return m.recorder
-}
-
-// DecrementReaderWriterCount mocks base method.
-func (m *MockOnIndexSeries) DecrementReaderWriterCount() {
-    m.ctrl.T.Helper()
-    m.ctrl.Call(m, "DecrementReaderWriterCount")
-}
-
-// DecrementReaderWriterCount indicates an expected call of DecrementReaderWriterCount.
-func (mr *MockOnIndexSeriesMockRecorder) DecrementReaderWriterCount() *gomock.Call {
-    mr.mock.ctrl.T.Helper()
-    return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DecrementReaderWriterCount", reflect.TypeOf((*MockOnIndexSeries)(nil).DecrementReaderWriterCount))
-}
-
-// IfAlreadyIndexedMarkIndexSuccessAndFinalize mocks base method.
-func (m *MockOnIndexSeries) IfAlreadyIndexedMarkIndexSuccessAndFinalize(blockStart time0.UnixNano) bool {
-    m.ctrl.T.Helper()
-    ret := m.ctrl.Call(m, "IfAlreadyIndexedMarkIndexSuccessAndFinalize", blockStart)
-    ret0, _ := ret[0].(bool)
-    return ret0
-}
-
-// IfAlreadyIndexedMarkIndexSuccessAndFinalize indicates an expected call of IfAlreadyIndexedMarkIndexSuccessAndFinalize.
-func (mr *MockOnIndexSeriesMockRecorder) IfAlreadyIndexedMarkIndexSuccessAndFinalize(blockStart interface{}) *gomock.Call {
-    mr.mock.ctrl.T.Helper()
-    return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "IfAlreadyIndexedMarkIndexSuccessAndFinalize", reflect.TypeOf((*MockOnIndexSeries)(nil).IfAlreadyIndexedMarkIndexSuccessAndFinalize), blockStart)
-}
-
-// IndexedForBlockStart mocks base method.
-func (m *MockOnIndexSeries) IndexedForBlockStart(indexBlockStart time0.UnixNano) bool {
-    m.ctrl.T.Helper()
-    ret := m.ctrl.Call(m, "IndexedForBlockStart", indexBlockStart)
-    ret0, _ := ret[0].(bool)
-    return ret0
-}
-
-// IndexedForBlockStart indicates an expected call of IndexedForBlockStart.
-func (mr *MockOnIndexSeriesMockRecorder) IndexedForBlockStart(indexBlockStart interface{}) *gomock.Call {
-    mr.mock.ctrl.T.Helper()
-    return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "IndexedForBlockStart", reflect.TypeOf((*MockOnIndexSeries)(nil).IndexedForBlockStart), indexBlockStart)
-}
-
-// NeedsIndexUpdate mocks base method.
-func (m *MockOnIndexSeries) NeedsIndexUpdate(indexBlockStartForWrite time0.UnixNano) bool {
-    m.ctrl.T.Helper()
-    ret := m.ctrl.Call(m, "NeedsIndexUpdate", indexBlockStartForWrite)
-    ret0, _ := ret[0].(bool)
-    return ret0
-}
-
-// NeedsIndexUpdate indicates an expected call of NeedsIndexUpdate.
-func (mr *MockOnIndexSeriesMockRecorder) NeedsIndexUpdate(indexBlockStartForWrite interface{}) *gomock.Call {
-    mr.mock.ctrl.T.Helper()
-    return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "NeedsIndexUpdate", reflect.TypeOf((*MockOnIndexSeries)(nil).NeedsIndexUpdate), indexBlockStartForWrite)
-}
-
-// OnIndexFinalize mocks base method.
-func (m *MockOnIndexSeries) OnIndexFinalize(blockStart time0.UnixNano) {
-    m.ctrl.T.Helper()
-    m.ctrl.Call(m, "OnIndexFinalize", blockStart)
-}
-
-// OnIndexFinalize indicates an expected call of OnIndexFinalize.
-func (mr *MockOnIndexSeriesMockRecorder) OnIndexFinalize(blockStart interface{}) *gomock.Call {
-    mr.mock.ctrl.T.Helper()
-    return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "OnIndexFinalize", reflect.TypeOf((*MockOnIndexSeries)(nil).OnIndexFinalize), blockStart)
-}
-
-// OnIndexPrepare mocks base method.
-func (m *MockOnIndexSeries) OnIndexPrepare(blockStart time0.UnixNano) {
-    m.ctrl.T.Helper()
-    m.ctrl.Call(m, "OnIndexPrepare", blockStart)
-}
-
-// OnIndexPrepare indicates an expected call of OnIndexPrepare.
-func (mr *MockOnIndexSeriesMockRecorder) OnIndexPrepare(blockStart interface{}) *gomock.Call {
-    mr.mock.ctrl.T.Helper()
-    return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "OnIndexPrepare", reflect.TypeOf((*MockOnIndexSeries)(nil).OnIndexPrepare), blockStart)
-}
-
-// OnIndexSuccess mocks base method.
-func (m *MockOnIndexSeries) OnIndexSuccess(blockStart time0.UnixNano) {
-    m.ctrl.T.Helper()
-    m.ctrl.Call(m, "OnIndexSuccess", blockStart)
-}
-
-// OnIndexSuccess indicates an expected call of OnIndexSuccess.
-func (mr *MockOnIndexSeriesMockRecorder) OnIndexSuccess(blockStart interface{}) *gomock.Call {
-    mr.mock.ctrl.T.Helper()
-    return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "OnIndexSuccess", reflect.TypeOf((*MockOnIndexSeries)(nil).OnIndexSuccess), blockStart)
-}
-
-// RelookupAndIncrementReaderWriterCount mocks base method.
-func (m *MockOnIndexSeries) RelookupAndIncrementReaderWriterCount() (OnIndexSeries, bool) {
-    m.ctrl.T.Helper()
-    ret := m.ctrl.Call(m, "RelookupAndIncrementReaderWriterCount")
-    ret0, _ := ret[0].(OnIndexSeries)
-    ret1, _ := ret[1].(bool)
-    return ret0, ret1
-}
-
-// RelookupAndIncrementReaderWriterCount indicates an expected call of RelookupAndIncrementReaderWriterCount.
-func (mr *MockOnIndexSeriesMockRecorder) RelookupAndIncrementReaderWriterCount() *gomock.Call {
-    mr.mock.ctrl.T.Helper()
-    return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RelookupAndIncrementReaderWriterCount", reflect.TypeOf((*MockOnIndexSeries)(nil).RelookupAndIncrementReaderWriterCount))
-}
-
-// RemoveIndexedForBlockStarts mocks base method.
-func (m *MockOnIndexSeries) RemoveIndexedForBlockStarts(blockStarts map[time0.UnixNano]struct{}) RemoveIndexedForBlockStartsResult {
-    m.ctrl.T.Helper()
-    ret := m.ctrl.Call(m, "RemoveIndexedForBlockStarts", blockStarts)
-    ret0, _ := ret[0].(RemoveIndexedForBlockStartsResult)
-    return ret0
-}
-
-// RemoveIndexedForBlockStarts indicates an expected call of RemoveIndexedForBlockStarts.
-func (mr *MockOnIndexSeriesMockRecorder) RemoveIndexedForBlockStarts(blockStarts interface{}) *gomock.Call {
-    mr.mock.ctrl.T.Helper()
-    return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RemoveIndexedForBlockStarts", reflect.TypeOf((*MockOnIndexSeries)(nil).RemoveIndexedForBlockStarts), blockStarts)
-}
-
 // MockBlock is a mock of Block interface.
 type MockBlock struct {
     ctrl     *gomock.Controller
     recorder *MockBlockMockRecorder
 }
@@ -955,20 +813,6 @@ func (m *MockBlock) EXPECT() *MockBlockMockRecorder {
     return m.recorder
 }
 
-// ActiveBlockNotifyFlushedBlocks mocks base method.
-func (m *MockBlock) ActiveBlockNotifyFlushedBlocks(sealed []time0.UnixNano) error {
-    m.ctrl.T.Helper()
-    ret := m.ctrl.Call(m, "ActiveBlockNotifyFlushedBlocks", sealed)
-    ret0, _ := ret[0].(error)
-    return ret0
-}
-
-// ActiveBlockNotifyFlushedBlocks indicates an expected call of ActiveBlockNotifyFlushedBlocks.
-func (mr *MockBlockMockRecorder) ActiveBlockNotifyFlushedBlocks(sealed interface{}) *gomock.Call {
-    mr.mock.ctrl.T.Helper()
-    return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ActiveBlockNotifyFlushedBlocks", reflect.TypeOf((*MockBlock)(nil).ActiveBlockNotifyFlushedBlocks), sealed)
-}
-
 // AddResults mocks base method.
 func (m *MockBlock) AddResults(resultsByVolumeType result.IndexBlockByVolumeType) error {
     m.ctrl.T.Helper()
     ret := m.ctrl.Call(m, "AddResults", resultsByVolumeType)
     ret0, _ := ret[0].(error)
     return ret0
 }
diff --git a/src/dbnode/storage/index/mutable_segments.go b/src/dbnode/storage/index/mutable_segments.go
index 72c3e5f2ba..c6868e4c57 100644
--- a/src/dbnode/storage/index/mutable_segments.go
+++ b/src/dbnode/storage/index/mutable_segments.go
@@ -82,10 +82,6 @@ type mutableSegments struct {
     optsListener             xresource.SimpleCloser
     writeIndexingConcurrency int
 
-    flushedBlockStarts         map[xtime.UnixNano]struct{}
-    backgroundCompactGCPending bool
-    backgroundCompactDisable   bool
-
     metrics mutableSegmentsMetrics
     logger  *zap.Logger
 }
@@ -130,48 +126,19 @@ func newMutableSegments(
     iopts instrument.Options,
 ) *mutableSegments {
     m := &mutableSegments{
-        blockStart:         blockStart,
-        blockSize:          md.Options().IndexOptions().BlockSize(),
-        opts:               opts,
-        blockOpts:          blockOpts,
-        compact:            mutableSegmentsCompact{opts: opts, blockOpts: blockOpts},
-        flushedBlockStarts: make(map[xtime.UnixNano]struct{}),
-        iopts:              iopts,
-        metrics:            newMutableSegmentsMetrics(iopts.MetricsScope()),
-        logger:             iopts.Logger(),
+        blockStart: blockStart,
+        blockSize:  md.Options().IndexOptions().BlockSize(),
+        opts:       opts,
+        blockOpts:  blockOpts,
+        compact:    mutableSegmentsCompact{opts: opts, blockOpts: blockOpts},
+        iopts:      iopts,
+        metrics:    newMutableSegmentsMetrics(iopts.MetricsScope()),
+        logger:     iopts.Logger(),
     }
     m.optsListener = namespaceRuntimeOptsMgr.RegisterListener(m)
     return m
 }
 
-func (m *mutableSegments) NotifyFlushedBlocks(
-    flushed []xtime.UnixNano,
-) error {
-    if len(flushed) == 0 {
-        return nil
-    }
-
-    m.Lock()
-    updated := false
-    for _, blockStart := range flushed {
-        _, exists := m.flushedBlockStarts[blockStart]
-        if exists {
-            continue
-        }
-        m.flushedBlockStarts[blockStart] = struct{}{}
-        updated = true
-    }
-    if updated {
-        // Only trigger background compact GC if
-        // and only if updated the sealed block starts.
-        m.backgroundCompactGCPending = true
-        m.maybeBackgroundCompactWithLock()
-    }
-    m.Unlock()
-
-    return nil
-}
-
 func (m *mutableSegments) SetNamespaceRuntimeOptions(opts namespace.RuntimeOptions) {
     m.Lock()
     // Update current runtime opts for segment builders created in future.
@@ -229,7 +196,7 @@ func (m *mutableSegments) WriteBatch(inserts *WriteBatch) (MutableSegmentsStats,
     // Set the doc ref for later recall.
     for i := range entries {
-        docs[i].Ref = entries[i].OnIndexSeries
+        docs[i].OnIndexSeries = entries[i].OnIndexSeries
     }
 
     segmentBuilder.Reset()
@@ -393,9 +360,6 @@ func (m *mutableSegments) Close() {
 }
 
 func (m *mutableSegments) maybeBackgroundCompactWithLock() {
-    if m.backgroundCompactDisable {
-        return
-    }
     if m.compact.compactingBackgroundStandard {
         return
     }
@@ -429,19 +393,13 @@ func (m *mutableSegments) backgroundCompactWithLock() {
     }
 
     var (
-        gcRequired         = false
-        gcPlan             = &compaction.Plan{}
-        gcAlreadyRunning   = m.compact.compactingBackgroundGarbageCollect
-        flushedBlockStarts = make(map[xtime.UnixNano]struct{}, len(m.flushedBlockStarts))
+        gcRequired       = false
+        gcPlan           = &compaction.Plan{}
+        gcAlreadyRunning = m.compact.compactingBackgroundGarbageCollect
     )
-    // Take copy of sealed block starts so can act on this
-    // async.
-    for k, v := range m.flushedBlockStarts {
-        flushedBlockStarts[k] = v
-    }
-    if !gcAlreadyRunning && m.backgroundCompactGCPending {
+
+    if !gcAlreadyRunning {
         gcRequired = true
-        m.backgroundCompactGCPending = false
 
         for _, seg := range m.backgroundSegments {
             alreadyHasTask := false
@@ -488,8 +446,7 @@
     // Kick off compaction.
     m.compact.compactingBackgroundStandard = true
     go func() {
-        m.backgroundCompactWithPlan(plan, m.compact.backgroundCompactors,
-            gcRequired, flushedBlockStarts)
+        m.backgroundCompactWithPlan(plan, m.compact.backgroundCompactors, gcRequired)
 
         m.Lock()
         m.compact.compactingBackgroundStandard = false
@@ -508,8 +465,7 @@
                 l.Error("error background gc segments", zap.Error(err))
             })
         } else {
-            m.backgroundCompactWithPlan(gcPlan, compactors,
-                gcRequired, flushedBlockStarts)
+            m.backgroundCompactWithPlan(gcPlan, compactors, gcRequired)
 
             m.closeCompactors(compactors)
         }
@@ -580,7 +536,6 @@ func (m *mutableSegments) backgroundCompactWithPlan(
     plan *compaction.Plan,
     compactors chan *compaction.Compactor,
     gcRequired bool,
-    flushedBlocks map[xtime.UnixNano]struct{},
 ) {
     sw := m.metrics.backgroundCompactionPlanRunLatency.Start()
     defer sw.Stop()
@@ -617,7 +572,7 @@
                 wg.Done()
             }()
             err := m.backgroundCompactWithTask(task, compactor, gcRequired,
-                flushedBlocks, log, logger.With(zap.Int("task", i)))
+                log, logger.With(zap.Int("task", i)))
             if err != nil {
                 instrument.EmitAndLogInvariantViolation(m.iopts, func(l *zap.Logger) {
                     l.Error("error compacting segments", zap.Error(err))
@@ -638,7 +593,6 @@ func (m *mutableSegments) backgroundCompactWithTask(
     task compaction.Task,
     compactor *compaction.Compactor,
     gcRequired bool,
-    flushedBlocks map[xtime.UnixNano]struct{},
     log bool,
     logger *zap.Logger,
 ) error {
@@ -657,22 +611,14 @@
         documentsFilter = segment.DocumentsFilterFn(func(d doc.Metadata) bool {
             // Filter out any documents that only were indexed for
             // sealed blocks.
-            if d.Ref == nil {
-                instrument.EmitAndLogInvariantViolation(m.iopts, func(l *zap.Logger) {
-                    l.Error("unexpected nil for document ref for background compact")
-                })
-                return true
-            }
-
-            entry, ok := d.Ref.(OnIndexSeries)
-            if !ok {
+            if d.OnIndexSeries == nil {
                 instrument.EmitAndLogInvariantViolation(m.iopts, func(l *zap.Logger) {
-                    l.Error("unexpected type for document ref for background compact")
+                    l.Error("unexpected nil for document index entry for background compact")
                 })
                 return true
             }
 
-            latestEntry, ok := entry.RelookupAndIncrementReaderWriterCount()
+            isEmpty, ok := d.OnIndexSeries.RelookupAndCheckIsEmpty()
             if !ok {
                 // Should not happen since shard will not expire until
                 // no more block starts are indexed.
                 // we open up a race condition where the entry is not
                 // in the shard yet and we GC it since we can't find it
                 // due to an asynchronous insert.
+                instrument.EmitAndLogInvariantViolation(m.iopts, func(l *zap.Logger) {
+                    l.Error("unexpected missing series entry for background compact")
+                })
                 return true
             }
 
-            result := latestEntry.RemoveIndexedForBlockStarts(flushedBlocks)
-            latestEntry.DecrementReaderWriterCount()
-
-            // Keep the series if and only if there are remaining
-            // index block starts outside of the sealed blocks starts.
-            return result.IndexedBlockStartsRemaining > 0
+            // Keep if not yet empty (i.e. there is still in-memory data associated with the series).
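+            // (An empty series has no in-memory data left to query, so its
+            // documents no longer need to live in the mutable segments.)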
+            return !isEmpty
         })
     }
diff --git a/src/dbnode/storage/index/types.go b/src/dbnode/storage/index/types.go
index f617242dc8..9371ded4f9 100644
--- a/src/dbnode/storage/index/types.go
+++ b/src/dbnode/storage/index/types.go
@@ -365,52 +365,6 @@ type AggregateResultsEntry struct {
     Terms []ident.ID
 }
 
-// OnIndexSeries provides a set of callback hooks to allow the reverse index
-// to do lifecycle management of any resources retained during indexing.
-type OnIndexSeries interface {
-    // OnIndexSuccess is executed when an entry is successfully indexed. The
-    // provided value for `blockStart` is the blockStart for which the write
-    // was indexed.
-    OnIndexSuccess(blockStart xtime.UnixNano)
-
-    // OnIndexFinalize is executed when the index no longer holds any references
-    // to the provided resources. It can be used to cleanup any resources held
-    // during the course of indexing. `blockStart` is the startTime of the index
-    // block for which the write was attempted.
-    OnIndexFinalize(blockStart xtime.UnixNano)
-
-    // OnIndexPrepare prepares the Entry to be handed off to the indexing sub-system.
-    // NB(prateek): we retain the ref count on the entry while the indexing is pending,
-    // the callback executed on the entry once the indexing is completed releases this
-    // reference.
-    OnIndexPrepare(blockStart xtime.UnixNano)
-
-    // NeedsIndexUpdate returns a bool to indicate if the Entry needs to be indexed
-    // for the provided blockStart. It only allows a single index attempt at a time
-    // for a single entry.
-    // NB(prateek): NeedsIndexUpdate is a CAS, i.e. when this method returns true, it
-    // also sets state on the entry to indicate that a write for the given blockStart
-    // is going to be sent to the index, and other go routines should not attempt the
-    // same write. Callers are expected to ensure they follow this guideline.
-    // Further, every call to NeedsIndexUpdate which returns true needs to have a corresponding
-    // OnIndexFinalze() call. This is required for correct lifecycle maintenance.
-    NeedsIndexUpdate(indexBlockStartForWrite xtime.UnixNano) bool
-
-    IfAlreadyIndexedMarkIndexSuccessAndFinalize(
-        blockStart xtime.UnixNano,
-    ) bool
-
-    RemoveIndexedForBlockStarts(
-        blockStarts map[xtime.UnixNano]struct{},
-    ) RemoveIndexedForBlockStartsResult
-
-    RelookupAndIncrementReaderWriterCount() (OnIndexSeries, bool)
-
-    DecrementReaderWriterCount()
-
-    IndexedForBlockStart(indexBlockStart xtime.UnixNano) bool
-}
-
 // RemoveIndexedForBlockStartsResult is the result from calling
 // RemoveIndexedForBlockStarts.
 type RemoveIndexedForBlockStartsResult struct {
@@ -459,10 +413,6 @@ type Block interface {
     // AddResults adds bootstrap results to the block.
     AddResults(resultsByVolumeType result.IndexBlockByVolumeType) error
 
-    // ActiveBlockNotifyFlushedBlocks notifies an active in-memory block of
-    // sealed blocks.
-    ActiveBlockNotifyFlushedBlocks(sealed []xtime.UnixNano) error
-
     // Tick does internal house keeping operations.
     Tick(c context.Cancellable) (BlockTickResult, error)
@@ -943,7 +893,7 @@ type WriteBatchEntry struct {
     Timestamp xtime.UnixNano
     // OnIndexSeries is a listener/callback for when this entry is marked done
     // it is set to nil when the entry is marked done
-    OnIndexSeries OnIndexSeries
+    OnIndexSeries doc.OnIndexSeries
     // EnqueuedAt is the timestamp that this entry was enqueued for indexing
     // so that we can calculate the latency it takes to index the entry
     EnqueuedAt time.Time
diff --git a/src/dbnode/storage/index/write_batch_test.go b/src/dbnode/storage/index/write_batch_test.go
index 1f7edef18a..323896e3f5 100644
--- a/src/dbnode/storage/index/write_batch_test.go
+++ b/src/dbnode/storage/index/write_batch_test.go
@@ -28,6 +28,7 @@ import (
     "github.com/stretchr/testify/require"
 
     "github.com/golang/mock/gomock"
+    "github.com/m3db/m3/src/m3ninx/doc"
     xtime "github.com/m3db/m3/src/x/time"
 )
 
@@ -44,14 +45,14 @@ func TestWriteBatchSortByUnmarkedAndIndexBlockStart(t *testing.T) {
         Truncate(blockSize).
         Add(time.Minute)
 
-    h1 := NewMockOnIndexSeries(ctrl)
+    h1 := doc.NewMockOnIndexSeries(ctrl)
     h1.EXPECT().OnIndexFinalize(blockStart)
     h1.EXPECT().OnIndexSuccess(blockStart)
 
-    h2 := NewMockOnIndexSeries(ctrl)
+    h2 := doc.NewMockOnIndexSeries(ctrl)
     h2.EXPECT().OnIndexFinalize(blockStart)
 
-    h3 := NewMockOnIndexSeries(ctrl)
+    h3 := doc.NewMockOnIndexSeries(ctrl)
     h3.EXPECT().OnIndexFinalize(blockStart)
     h3.EXPECT().OnIndexSuccess(blockStart)
 
diff --git a/src/dbnode/storage/index_block_test.go b/src/dbnode/storage/index_block_test.go
index caa7de660f..6b6029b410 100644
--- a/src/dbnode/storage/index_block_test.go
+++ b/src/dbnode/storage/index_block_test.go
@@ -98,7 +98,7 @@ func testWriteBatchEntry(
     id ident.ID,
     tags ident.Tags,
     timestamp xtime.UnixNano,
-    fns index.OnIndexSeries,
+    fns doc.OnIndexSeries,
 ) (index.WriteBatchEntry, doc.Metadata) {
     d := doc.Metadata{ID: copyBytes(id.Bytes())}
     for _, tag := range tags.Values() {
@@ -256,7 +256,7 @@ func TestNamespaceIndexWrite(t *testing.T) {
     id := ident.StringID("foo")
     tag := ident.StringTag("name", "value")
     tags := ident.NewTags(tag)
-    lifecycle := index.NewMockOnIndexSeries(ctrl)
+    lifecycle := doc.NewMockOnIndexSeries(ctrl)
     mockWriteBatch(t, &now, lifecycle, mockBlock, &tag)
     lifecycle.EXPECT().IfAlreadyIndexedMarkIndexSuccessAndFinalize(gomock.Any()).Return(false)
     batch := index.NewWriteBatch(index.WriteBatchOptions{
@@ -325,7 +325,7 @@ func TestNamespaceIndexWriteCreatesBlock(t *testing.T) {
     id := ident.StringID("foo")
     tag := ident.StringTag("name", "value")
     tags := ident.NewTags(tag)
-    lifecycle := index.NewMockOnIndexSeries(ctrl)
+    lifecycle := doc.NewMockOnIndexSeries(ctrl)
     mockWriteBatch(t, &now, lifecycle, bActive, &tag)
     lifecycle.EXPECT().IfAlreadyIndexedMarkIndexSuccessAndFinalize(gomock.Any()).
         Return(false).
         AnyTimes()
@@ -460,7 +460,6 @@ func TestNamespaceIndexTickExpire(t *testing.T) {
 
     c := context.NewCancellable()
     bActive.EXPECT().Tick(c).Return(index.BlockTickResult{}, nil)
-    bActive.EXPECT().ActiveBlockNotifyFlushedBlocks(gomock.Any()).Return(nil)
 
     b0.EXPECT().Close().Return(nil)
 
@@ -531,9 +530,6 @@ func TestNamespaceIndexTick(t *testing.T) {
         }, nil).
         AnyTimes()
     bActive.EXPECT().IsSealed().Return(false).AnyTimes()
-    bActive.EXPECT().ActiveBlockNotifyFlushedBlocks(gomock.Any()).
-        Return(nil).
-        AnyTimes()
 
     b0.EXPECT().Tick(c).
         Return(index.BlockTickResult{
@@ -1599,7 +1595,7 @@ func TestNamespaceIndexBlockAggregateQueryAggPath(t *testing.T) {
 func mockWriteBatch(t *testing.T,
     now *xtime.UnixNano,
-    lifecycle *index.MockOnIndexSeries,
+    lifecycle *doc.MockOnIndexSeries,
     block *index.MockBlock,
     tag *ident.Tag,
 ) {
diff --git a/src/dbnode/storage/index_insert_queue_test.go b/src/dbnode/storage/index_insert_queue_test.go
index b52aac8052..b69a3005c3 100644
--- a/src/dbnode/storage/index_insert_queue_test.go
+++ b/src/dbnode/storage/index_insert_queue_test.go
@@ -29,6 +29,7 @@ import (
 
     "github.com/m3db/m3/src/dbnode/namespace"
     "github.com/m3db/m3/src/dbnode/storage/index"
+    "github.com/m3db/m3/src/m3ninx/doc"
     "github.com/m3db/m3/src/x/ident"
     xtest "github.com/m3db/m3/src/x/test"
     xtime "github.com/m3db/m3/src/x/time"
@@ -89,7 +90,7 @@ func TestIndexInsertQueueCallback(t *testing.T) {
         q               = newTestIndexInsertQueue(newTestNamespaceMetadata(t))
         insertLock      sync.Mutex
         insertedBatches []*index.WriteBatch
-        callback        = index.NewMockOnIndexSeries(ctrl)
+        callback        = doc.NewMockOnIndexSeries(ctrl)
     )
     q.indexBatchFn = func(inserts *index.WriteBatch) {
         insertLock.Lock()
@@ -150,7 +151,7 @@ func TestIndexInsertQueueBatchBackoff(t *testing.T) {
     }
     q.indexBatchBackoff = backoff
 
-    callback := index.NewMockOnIndexSeries(ctrl)
+    callback := doc.NewMockOnIndexSeries(ctrl)
 
     var slept time.Duration
     var numSleeps int
diff --git a/src/dbnode/storage/index_query_concurrent_test.go b/src/dbnode/storage/index_query_concurrent_test.go
index fdae6c74f8..49048eabc6 100644
--- a/src/dbnode/storage/index_query_concurrent_test.go
+++ b/src/dbnode/storage/index_query_concurrent_test.go
@@ -162,7 +162,7 @@ func testNamespaceIndexHighConcurrentQueries(
             var onIndexWg sync.WaitGroup
             onIndexWg.Add(idsPerBlock)
-            onIndexSeries := index.NewMockOnIndexSeries(ctrl)
+            onIndexSeries := doc.NewMockOnIndexSeries(ctrl)
             onIndexSeries.EXPECT().
                 OnIndexSuccess(gomock.Any()).
                 Times(idsPerBlock).
diff --git a/src/dbnode/storage/index_queue_forward_write_test.go b/src/dbnode/storage/index_queue_forward_write_test.go
index c083128f9b..ea1138fdca 100644
--- a/src/dbnode/storage/index_queue_forward_write_test.go
+++ b/src/dbnode/storage/index_queue_forward_write_test.go
@@ -107,7 +107,7 @@ func setupForwardIndex(
         tags = ident.NewTags(
             ident.StringTag("name", "value"),
         )
-        lifecycle = index.NewMockOnIndexSeries(ctrl)
+        lifecycle = doc.NewMockOnIndexSeries(ctrl)
     )
 
     gomock.InOrder(
@@ -282,7 +282,7 @@ func setupMockBlock(
     ts xtime.UnixNano,
     id ident.ID,
     tag ident.Tag,
-    lifecycle index.OnIndexSeries,
+    lifecycle doc.OnIndexSeries,
 ) {
     bl.EXPECT().
         WriteBatch(gomock.Any()).
@@ -377,7 +377,7 @@ func TestNamespaceIndexForwardWrite(t *testing.T) {
     id := ident.StringID("foo")
     tag := ident.StringTag("name", "value")
     tags := ident.NewTags(tag)
-    lifecycle := index.NewMockOnIndexSeries(ctrl)
+    lifecycle := doc.NewMockOnIndexSeries(ctrl)
 
     var (
         ts = idx.(*nsIndex).state.latestBlock.StartTime()
@@ -420,7 +420,7 @@ func TestNamespaceIndexForwardWriteCreatesBlock(t *testing.T) {
     id := ident.StringID("foo")
     tag := ident.StringTag("name", "value")
     tags := ident.NewTags(tag)
-    lifecycle := index.NewMockOnIndexSeries(ctrl)
+    lifecycle := doc.NewMockOnIndexSeries(ctrl)
 
     var (
         ts = idx.(*nsIndex).state.latestBlock.StartTime()
@@ -598,7 +598,7 @@ func testShardForwardWriteTaggedSyncRefCount(
     // ensure all entries have no references left
     for _, id := range []string{"foo", "bar", "baz"} {
         shard.Lock()
-        entry, _, err := shard.lookupEntryWithLock(ident.StringID(id))
+        entry, err := shard.lookupEntryWithLock(ident.StringID(id))
         shard.Unlock()
         require.NoError(t, err)
         require.Equal(t, int32(0), entry.ReaderWriterCount(), id)
@@ -614,7 +614,7 @@ //
     // ensure all entries have no references left
     for _, id := range []string{"foo", "bar", "baz"} {
         shard.Lock()
-        entry, _, err := shard.lookupEntryWithLock(ident.StringID(id))
+        entry, err := shard.lookupEntryWithLock(ident.StringID(id))
         shard.Unlock()
         require.NoError(t, err)
         require.Equal(t, int32(0), entry.ReaderWriterCount(), id)
@@ -658,7 +658,7 @@ func testShardForwardWriteTaggedAsyncRefCount(
     // ensure all entries have no references left
     for _, id := range []string{"foo", "bar", "baz"} {
         shard.Lock()
-        entry, _, err := shard.lookupEntryWithLock(ident.StringID(id))
+        entry, err := shard.lookupEntryWithLock(ident.StringID(id))
         shard.Unlock()
         require.NoError(t, err)
         require.Equal(t, int32(0), entry.ReaderWriterCount(), id)
@@ -673,7 +673,7 @@
     // ensure all entries have no references left
     for _, id := range []string{"foo", "bar", "baz"} {
         shard.Lock()
-        entry, _, err := shard.lookupEntryWithLock(ident.StringID(id))
+        entry, err := shard.lookupEntryWithLock(ident.StringID(id))
         shard.Unlock()
         require.NoError(t, err)
         require.Equal(t, int32(0), entry.ReaderWriterCount(), id)
diff --git a/src/dbnode/storage/index_queue_test.go b/src/dbnode/storage/index_queue_test.go
index ea181e61a6..cd8790ae20 100644
--- a/src/dbnode/storage/index_queue_test.go
+++ b/src/dbnode/storage/index_queue_test.go
@@ -145,7 +145,7 @@ func TestNamespaceIndexWriteAfterClose(t *testing.T) {
 
     now := xtime.Now()
 
-    lifecycle := index.NewMockOnIndexSeries(ctrl)
+    lifecycle := doc.NewMockOnIndexSeries(ctrl)
     lifecycle.EXPECT().OnIndexFinalize(now.Truncate(idx.blockSize))
     lifecycle.EXPECT().IfAlreadyIndexedMarkIndexSuccessAndFinalize(gomock.Any()).
         Return(false).
@@ -169,7 +169,7 @@ func TestNamespaceIndexWriteQueueError(t *testing.T) {
     )
 
     n := xtime.Now()
-    lifecycle := index.NewMockOnIndexSeries(ctrl)
+    lifecycle := doc.NewMockOnIndexSeries(ctrl)
     lifecycle.EXPECT().OnIndexFinalize(n.Truncate(idx.blockSize))
     lifecycle.EXPECT().IfAlreadyIndexedMarkIndexSuccessAndFinalize(gomock.Any()).Return(false)
     q.EXPECT().
@@ -213,7 +213,7 @@ func TestNamespaceIndexInsertOlderThanRetentionPeriod(t *testing.T) {
         tags = ident.NewTags(
             ident.StringTag("name", "value"),
         )
-        lifecycle = index.NewMockOnIndexSeries(ctrl)
+        lifecycle = doc.NewMockOnIndexSeries(ctrl)
     )
 
     tooOld := now.Add(-1 * idx.bufferPast).Add(-1 * time.Second)
@@ -279,7 +279,7 @@ func TestNamespaceIndexInsertQueueInteraction(t *testing.T) {
 
     now := xtime.Now()
 
     var wg sync.WaitGroup
-    lifecycle := index.NewMockOnIndexSeries(ctrl)
+    lifecycle := doc.NewMockOnIndexSeries(ctrl)
     q.EXPECT().InsertBatch(gomock.Any()).Return(&wg, nil)
     lifecycle.EXPECT().IfAlreadyIndexedMarkIndexSuccessAndFinalize(gomock.Any()).
         Return(false).
@@ -319,7 +319,7 @@ func setupIndex(t *testing.T,
         tags = ident.NewTags(
             ident.StringTag("name", "value"),
         )
-        lifecycleFns = index.NewMockOnIndexSeries(ctrl)
+        lifecycleFns = doc.NewMockOnIndexSeries(ctrl)
     )
 
     lifecycleFns.EXPECT().OnIndexFinalize(ts)
diff --git a/src/dbnode/storage/index_test.go b/src/dbnode/storage/index_test.go
index e5d0d5c543..9802a54d82 100644
--- a/src/dbnode/storage/index_test.go
+++ b/src/dbnode/storage/index_test.go
@@ -644,12 +644,15 @@ func TestNamespaceIndexFlushShardStateNotSuccess(t *testing.T) {
     mockShard := NewMockdatabaseShard(ctrl)
     mockShard.EXPECT().IsBootstrapped().Return(true).AnyTimes()
     mockShard.EXPECT().ID().Return(uint32(0)).AnyTimes()
-    mockShard.EXPECT().FlushState(gomock.Any()).Return(fileOpState{WarmStatus: fileOpFailed}, nil).AnyTimes()
+    mockShard.EXPECT().FlushState(gomock.Any()).Return(fileOpState{WarmStatus: warmStatus{
+        IndexFlushed: fileOpFailed,
+    }}, nil).AnyTimes()
     shards := []databaseShard{mockShard}
 
     mockFlush := persist.NewMockIndexFlush(ctrl)
 
-    require.NoError(t, idx.WarmFlush(mockFlush, shards))
+    err := idx.WarmFlush(mockFlush, shards)
+    require.NoError(t, err)
 }
 
 func TestNamespaceIndexQueryNoMatchingBlocks(t *testing.T) {
@@ -811,7 +814,9 @@ func TestNamespaceIndexFlushSkipBootstrappingShards(t *testing.T) {
         mockShard.EXPECT().IsBootstrapped().Return(shardInfo.isBootstrapped).AnyTimes()
         mockShard.EXPECT().ID().Return(shardInfo.id).AnyTimes()
         if shardInfo.isBootstrapped {
-            mockShard.EXPECT().FlushState(gomock.Any()).Return(fileOpState{WarmStatus: fileOpSuccess}, nil).AnyTimes()
+            mockShard.EXPECT().FlushState(gomock.Any()).Return(fileOpState{WarmStatus: warmStatus{
+                IndexFlushed: fileOpSuccess,
+            }}, nil).AnyTimes()
         }
         shards = append(shards, mockShard)
     }
@@ -902,8 +907,14 @@ func verifyFlushForShards(
     for _, mockShard := range mockShards {
         mockShard.EXPECT().IsBootstrapped().Return(true)
-        mockShard.EXPECT().FlushState(blockStart).Return(fileOpState{WarmStatus: fileOpSuccess}, nil)
-        mockShard.EXPECT().FlushState(blockStart.Add(blockSize)).Return(fileOpState{WarmStatus: fileOpSuccess}, nil)
+        mockShard.EXPECT().FlushState(blockStart).Return(fileOpState{WarmStatus: warmStatus{
+            // Index flushing requires that the data flush has already happened.
+            DataFlushed: fileOpSuccess,
+        }}, nil)
+        mockShard.EXPECT().FlushState(blockStart.Add(blockSize)).Return(fileOpState{WarmStatus: warmStatus{
+            // Index flushing requires that the data flush has already happened.
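+            // (canFlushBlockWithRLock checks WarmStatus.DataFlushed for each data
+            // block within the index block before allowing the index warm flush.)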
+ DataFlushed: fileOpSuccess, + }}, nil) resultsTags1 := ident.NewTagsIterator(ident.NewTags()) resultsTags2 := ident.NewTagsIterator(ident.NewTags()) @@ -925,13 +936,19 @@ func verifyFlushForShards( mockShard.EXPECT().FetchBlocksMetadataV2(gomock.Any(), blockStart, blockStart.Add(idx.blockSize), gomock.Any(), gomock.Any(), block.FetchBlocksMetadataOptions{OnlyDisk: true}).Return(results, nil, nil) + + // For a given index block, which in this test is 2x the size of a block, we expect that + // we mark as flushed 2 blockStarts that fall within the index block. + mockShard.EXPECT().MarkWarmIndexFlushStateSuccessOrError(blockStart, nil) + mockShard.EXPECT().MarkWarmIndexFlushStateSuccessOrError(blockStart.Add(blockSize), nil) } mockBlock.EXPECT().IsSealed().Return(true) mockBlock.EXPECT().AddResults(gomock.Any()).Return(nil) mockBlock.EXPECT().EvictMutableSegments().Return(nil) } - require.NoError(t, idx.WarmFlush(mockFlush, dbShards)) + err := idx.WarmFlush(mockFlush, dbShards) + require.NoError(t, err) require.Equal(t, numBlocks, persistClosedTimes) require.Equal(t, numBlocks, persistCalledTimes) require.Equal(t, expectedDocs, actualDocs) diff --git a/src/dbnode/storage/series/lookup/lookup_mock.go b/src/dbnode/storage/lookup_mock.go similarity index 95% rename from src/dbnode/storage/series/lookup/lookup_mock.go rename to src/dbnode/storage/lookup_mock.go index 77e76f9073..57f9dd3a25 100644 --- a/src/dbnode/storage/series/lookup/lookup_mock.go +++ b/src/dbnode/storage/lookup_mock.go @@ -1,5 +1,5 @@ // Code generated by MockGen. DO NOT EDIT. -// Source: github.com/m3db/m3/src/dbnode/storage/series/lookup (interfaces: IndexWriter) +// Source: github.com/m3db/m3/src/dbnode/storage (interfaces: IndexWriter) // Copyright (c) 2021 Uber Technologies, Inc. // @@ -21,8 +21,8 @@ // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN // THE SOFTWARE. -// Package lookup is a generated GoMock package. -package lookup +// Package storage is a generated GoMock package. 
+package storage import ( "reflect" diff --git a/src/dbnode/storage/namespace.go b/src/dbnode/storage/namespace.go index 0cd2a3b63b..3a57c4104f 100644 --- a/src/dbnode/storage/namespace.go +++ b/src/dbnode/storage/namespace.go @@ -1221,7 +1221,7 @@ func (n *dbNamespace) WarmFlush( return err } // skip flushing if the shard has already flushed data for the `blockStart` - if flushState.WarmStatus == fileOpSuccess { + if flushState.WarmStatus.DataFlushed == fileOpSuccess { continue } @@ -1499,7 +1499,7 @@ func (n *dbNamespace) needsFlushWithLock( if err != nil { return false, err } - if flushState.WarmStatus != fileOpSuccess { + if flushState.WarmStatus.DataFlushed != fileOpSuccess { return true, nil } } diff --git a/src/dbnode/storage/namespace_test.go b/src/dbnode/storage/namespace_test.go index 74abfcc679..435b7517c7 100644 --- a/src/dbnode/storage/namespace_test.go +++ b/src/dbnode/storage/namespace_test.go @@ -607,7 +607,8 @@ func testNamespaceBootstrapUnfulfilledShards( func TestNamespaceFlushNotBootstrapped(t *testing.T) { ns, closer := newTestNamespace(t) defer closer() - require.Equal(t, errNamespaceNotBootstrapped, ns.WarmFlush(xtime.Now(), nil)) + err := ns.WarmFlush(xtime.Now(), nil) + require.Equal(t, errNamespaceNotBootstrapped, err) require.Equal(t, errNamespaceNotBootstrapped, ns.ColdFlush(nil)) } @@ -617,7 +618,8 @@ func TestNamespaceFlushDontNeedFlush(t *testing.T) { defer close() ns.bootstrapState = Bootstrapped - require.NoError(t, ns.WarmFlush(xtime.Now(), nil)) + err := ns.WarmFlush(xtime.Now(), nil) + require.NoError(t, err) require.NoError(t, ns.ColdFlush(nil)) } @@ -627,7 +629,8 @@ func TestNamespaceSkipFlushIfReadOnly(t *testing.T) { ns.bootstrapState = Bootstrapped ns.SetReadOnly(true) - require.NoError(t, ns.WarmFlush(xtime.Now(), nil)) + err := ns.WarmFlush(xtime.Now(), nil) + require.NoError(t, err) require.NoError(t, ns.ColdFlush(nil)) } @@ -645,20 +648,21 @@ func TestNamespaceFlushSkipFlushed(t *testing.T) { blockStart := xtime.Now().Truncate(ns.Options().RetentionOptions().BlockSize()) states := []fileOpState{ - {WarmStatus: fileOpNotStarted}, - {WarmStatus: fileOpSuccess}, + {WarmStatus: warmStatus{DataFlushed: fileOpNotStarted}}, + {WarmStatus: warmStatus{DataFlushed: fileOpSuccess}}, } for i, s := range states { shard := NewMockdatabaseShard(ctrl) shard.EXPECT().IsBootstrapped().Return(true).AnyTimes() shard.EXPECT().FlushState(blockStart).Return(s, nil) - if s.WarmStatus != fileOpSuccess { + if s.WarmStatus.DataFlushed != fileOpSuccess { shard.EXPECT().WarmFlush(blockStart, gomock.Any(), gomock.Any()).Return(nil) } ns.shards[testShardIDs[i].ID()] = shard } - require.NoError(t, ns.WarmFlush(blockStart, nil)) + err := ns.WarmFlush(blockStart, nil) + require.NoError(t, err) } func TestNamespaceFlushSkipShardNotBootstrapped(t *testing.T) { @@ -679,7 +683,8 @@ func TestNamespaceFlushSkipShardNotBootstrapped(t *testing.T) { shard.EXPECT().IsBootstrapped().Return(false) ns.shards[testShardIDs[0].ID()] = shard - require.NoError(t, ns.WarmFlush(blockStart, nil)) + err := ns.WarmFlush(blockStart, nil) + require.NoError(t, err) require.NoError(t, ns.ColdFlush(nil)) } @@ -1007,11 +1012,15 @@ func setShardExpects(ns *dbNamespace, ctrl *gomock.Controller, cases []needsFlus for t, needFlush := range cs.needsFlush { if needFlush { shard.EXPECT().FlushState(t).Return(fileOpState{ - WarmStatus: fileOpNotStarted, + WarmStatus: warmStatus{ + DataFlushed: fileOpNotStarted, + }, }, nil).AnyTimes() } else { shard.EXPECT().FlushState(t).Return(fileOpState{ - WarmStatus: 
fileOpSuccess, + WarmStatus: warmStatus{ + DataFlushed: fileOpSuccess, + }, }, nil).AnyTimes() } } @@ -1140,7 +1149,9 @@ func TestNamespaceNeedsFlushAllSuccess(t *testing.T) { shard := NewMockdatabaseShard(ctrl) shard.EXPECT().ID().Return(s.ID()).AnyTimes() shard.EXPECT().FlushState(blockStart).Return(fileOpState{ - WarmStatus: fileOpSuccess, + WarmStatus: warmStatus{ + DataFlushed: fileOpSuccess, + }, }, nil).AnyTimes() ns.shards[s.ID()] = shard } @@ -1184,15 +1195,21 @@ func TestNamespaceNeedsFlushAnyFailed(t *testing.T) { switch shard.ID() { case shards[0].ID(): shard.EXPECT().FlushState(blockStart).Return(fileOpState{ - WarmStatus: fileOpSuccess, + WarmStatus: warmStatus{ + DataFlushed: fileOpSuccess, + }, }, nil).AnyTimes() case shards[1].ID(): shard.EXPECT().FlushState(blockStart).Return(fileOpState{ - WarmStatus: fileOpSuccess, + WarmStatus: warmStatus{ + DataFlushed: fileOpSuccess, + }, }, nil).AnyTimes() case shards[2].ID(): shard.EXPECT().FlushState(blockStart).Return(fileOpState{ - WarmStatus: fileOpFailed, + WarmStatus: warmStatus{ + DataFlushed: fileOpFailed, + }, NumFailures: 999, }, nil).AnyTimes() } @@ -1238,15 +1255,21 @@ func TestNamespaceNeedsFlushAnyNotStarted(t *testing.T) { switch shard.ID() { case shards[0].ID(): shard.EXPECT().FlushState(blockStart).Return(fileOpState{ - WarmStatus: fileOpSuccess, + WarmStatus: warmStatus{ + DataFlushed: fileOpSuccess, + }, }, nil).AnyTimes() case shards[1].ID(): shard.EXPECT().FlushState(blockStart).Return(fileOpState{ - WarmStatus: fileOpNotStarted, + WarmStatus: warmStatus{ + DataFlushed: fileOpNotStarted, + }, }, nil).AnyTimes() case shards[2].ID(): shard.EXPECT().FlushState(blockStart).Return(fileOpState{ - WarmStatus: fileOpSuccess, + WarmStatus: warmStatus{ + DataFlushed: fileOpSuccess, + }, }, nil).AnyTimes() } ns.shards[s.ID()] = shard diff --git a/src/dbnode/storage/series_resolver.go b/src/dbnode/storage/series_resolver.go index 8e3b2798c5..a425a2f180 100644 --- a/src/dbnode/storage/series_resolver.go +++ b/src/dbnode/storage/series_resolver.go @@ -25,12 +25,11 @@ import ( "sync" "github.com/m3db/m3/src/dbnode/storage/bootstrap" - "github.com/m3db/m3/src/dbnode/storage/series/lookup" "github.com/m3db/m3/src/x/ident" ) // retrieveWritableSeriesFn represents the function to retrieve series entry. -type retrieveWritableSeriesFn func(id ident.ID) (*lookup.Entry, error) +type retrieveWritableSeriesFn func(id ident.ID) (*Entry, error) type seriesResolver struct { sync.RWMutex @@ -41,7 +40,7 @@ type seriesResolver struct { resolved bool resolvedErr error - entry *lookup.Entry + entry *Entry } // NewSeriesResolver creates new series ref resolver. 
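Review note: the resolver callback now returns this package's *Entry, which is what lets the lookup import be dropped. A sketch of the wiring in package storage, under the same assumptions as the tests below (the function name is hypothetical; in production the callback is the shard's retrieveWritableSeries):

    func exampleResolve(wg *sync.WaitGroup, id ident.ID) error {
        resolver := NewSeriesResolver(wg, id, func(id ident.ID) (*Entry, error) {
            entry := NewEntry(NewEntryOptions{Index: 42})
            // Mirror retrieveWritableSeries, which hands back entries with a
            // reader/writer reference already taken.
            entry.IncrementReaderWriterCount()
            return entry, nil
        })
        // SeriesRef waits on the insert wait group, then resolves exactly
        // once; subsequent calls reuse the resolved entry without waiting.
        ref, err := resolver.SeriesRef()
        if err != nil {
            return err
        }
        _ = ref
        // ReleaseRef drops the reference taken by the callback.
        return resolver.ReleaseRef()
    }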
diff --git a/src/dbnode/storage/series_resolver_test.go b/src/dbnode/storage/series_resolver_test.go index b55e6581f6..99b9198b7a 100644 --- a/src/dbnode/storage/series_resolver_test.go +++ b/src/dbnode/storage/series_resolver_test.go @@ -27,14 +27,13 @@ import ( "github.com/stretchr/testify/require" - "github.com/m3db/m3/src/dbnode/storage/series/lookup" "github.com/m3db/m3/src/x/ident" ) func TestResolveError(t *testing.T) { wg := sync.WaitGroup{} id := ident.StringID("foo") - sut := NewSeriesResolver(&wg, id, func(id ident.ID) (*lookup.Entry, error) { + sut := NewSeriesResolver(&wg, id, func(id ident.ID) (*Entry, error) { return nil, fmt.Errorf("unable to resolve series") }) _, err := sut.SeriesRef() @@ -44,7 +43,7 @@ func TestResolveError(t *testing.T) { func TestResolveNilEntry(t *testing.T) { wg := sync.WaitGroup{} id := ident.StringID("foo") - sut := NewSeriesResolver(&wg, id, func(id ident.ID) (*lookup.Entry, error) { + sut := NewSeriesResolver(&wg, id, func(id ident.ID) (*Entry, error) { return nil, nil }) _, err := sut.SeriesRef() @@ -54,53 +53,53 @@ func TestResolveNilEntry(t *testing.T) { func TestResolve(t *testing.T) { wg := sync.WaitGroup{} id := ident.StringID("foo") - sut := NewSeriesResolver(&wg, id, func(id ident.ID) (*lookup.Entry, error) { - return lookup.NewEntry(lookup.NewEntryOptions{ + sut := NewSeriesResolver(&wg, id, func(id ident.ID) (*Entry, error) { + return NewEntry(NewEntryOptions{ Index: 11, }), nil }) seriesRef, err := sut.SeriesRef() require.NoError(t, err) - require.IsType(t, &lookup.Entry{}, seriesRef) - entry := seriesRef.(*lookup.Entry) + require.IsType(t, &Entry{}, seriesRef) + entry := seriesRef.(*Entry) require.Equal(t, uint64(11), entry.Index) } func TestSecondResolveWontWait(t *testing.T) { wg := sync.WaitGroup{} id := ident.StringID("foo") - sut := NewSeriesResolver(&wg, id, func(id ident.ID) (*lookup.Entry, error) { - return lookup.NewEntry(lookup.NewEntryOptions{ + sut := NewSeriesResolver(&wg, id, func(id ident.ID) (*Entry, error) { + return NewEntry(NewEntryOptions{ Index: 11, }), nil }) seriesRef, err := sut.SeriesRef() require.NoError(t, err) - require.IsType(t, &lookup.Entry{}, seriesRef) - entry := seriesRef.(*lookup.Entry) + require.IsType(t, &Entry{}, seriesRef) + entry := seriesRef.(*Entry) require.Equal(t, uint64(11), entry.Index) wg.Add(1) seriesRef2, err := sut.SeriesRef() require.NoError(t, err) - require.IsType(t, &lookup.Entry{}, seriesRef2) - entry2 := seriesRef2.(*lookup.Entry) + require.IsType(t, &Entry{}, seriesRef2) + entry2 := seriesRef2.(*Entry) require.Equal(t, entry, entry2) } func TestReleaseRef(t *testing.T) { wg := sync.WaitGroup{} id := ident.StringID("foo") - sut := NewSeriesResolver(&wg, id, func(id ident.ID) (*lookup.Entry, error) { - entry := lookup.NewEntry(lookup.NewEntryOptions{}) + sut := NewSeriesResolver(&wg, id, func(id ident.ID) (*Entry, error) { + entry := NewEntry(NewEntryOptions{}) entry.IncrementReaderWriterCount() return entry, nil }) seriesRef, err := sut.SeriesRef() require.NoError(t, err) - require.IsType(t, &lookup.Entry{}, seriesRef) + require.IsType(t, &Entry{}, seriesRef) - entry := seriesRef.(*lookup.Entry) + entry := seriesRef.(*Entry) require.Equal(t, int32(1), entry.ReaderWriterCount()) err = sut.ReleaseRef() require.NoError(t, err) @@ -110,7 +109,7 @@ func TestReleaseRef(t *testing.T) { func TestReleaseRefError(t *testing.T) { wg := sync.WaitGroup{} id := ident.StringID("foo") - sut := NewSeriesResolver(&wg, id, func(id ident.ID) (*lookup.Entry, error) { + sut := NewSeriesResolver(&wg, 
id, func(id ident.ID) (*Entry, error) { return nil, fmt.Errorf("unable to resolve series") }) err := sut.ReleaseRef() @@ -120,8 +119,8 @@ func TestReleaseRefError(t *testing.T) { func TestReleaseRefWithoutSeriesRef(t *testing.T) { wg := sync.WaitGroup{} id := ident.StringID("foo") - sut := NewSeriesResolver(&wg, id, func(id ident.ID) (*lookup.Entry, error) { - entry := lookup.NewEntry(lookup.NewEntryOptions{}) + sut := NewSeriesResolver(&wg, id, func(id ident.ID) (*Entry, error) { + entry := NewEntry(NewEntryOptions{}) entry.IncrementReaderWriterCount() return entry, nil }) diff --git a/src/dbnode/storage/series_wired_list_interaction_test.go b/src/dbnode/storage/series_wired_list_interaction_test.go index 52a2b6ab04..ef242c7080 100644 --- a/src/dbnode/storage/series_wired_list_interaction_test.go +++ b/src/dbnode/storage/series_wired_list_interaction_test.go @@ -29,7 +29,6 @@ import ( "github.com/m3db/m3/src/dbnode/runtime" "github.com/m3db/m3/src/dbnode/storage/block" "github.com/m3db/m3/src/dbnode/storage/series" - "github.com/m3db/m3/src/dbnode/storage/series/lookup" "github.com/m3db/m3/src/dbnode/ts" "github.com/m3db/m3/src/x/clock" "github.com/m3db/m3/src/x/context" @@ -116,7 +115,7 @@ func TestSeriesWiredListConcurrentInteractions(t *testing.T) { require.NoError(t, err) shard.Lock() - shard.insertNewShardEntryWithLock(lookup.NewEntry(lookup.NewEntryOptions{ + shard.insertNewShardEntryWithLock(NewEntry(NewEntryOptions{ Series: seriesEntry, })) shard.Unlock() diff --git a/src/dbnode/storage/shard.go b/src/dbnode/storage/shard.go index 10bc935234..a22825a2aa 100644 --- a/src/dbnode/storage/shard.go +++ b/src/dbnode/storage/shard.go @@ -43,7 +43,6 @@ import ( "github.com/m3db/m3/src/dbnode/storage/index/convert" "github.com/m3db/m3/src/dbnode/storage/repair" "github.com/m3db/m3/src/dbnode/storage/series" - "github.com/m3db/m3/src/dbnode/storage/series/lookup" "github.com/m3db/m3/src/dbnode/tracepoint" "github.com/m3db/m3/src/dbnode/ts" "github.com/m3db/m3/src/dbnode/ts/writes" @@ -223,9 +222,9 @@ func newDatabaseShardMetrics(shardID uint32, scope tally.Scope) dbShardMetrics { } } -type dbShardEntryWorkFn func(entry *lookup.Entry) bool +type dbShardEntryWorkFn func(entry *Entry) bool -type dbShardEntryBatchWorkFn func(entries []*lookup.Entry) bool +type dbShardEntryBatchWorkFn func(entries []*Entry) bool type shardListElement *list.Element @@ -376,7 +375,21 @@ func (s *dbShard) hasWarmFlushed(blockStart xtime.UnixNano) (bool, error) { if err != nil { return false, err } - return statusIsRetrievable(flushState.WarmStatus), nil + return s.warmStatusIsRetrievable(flushState.WarmStatus), nil +} + +func (s *dbShard) warmStatusIsRetrievable(status warmStatus) bool { + if !statusIsRetrievable(status.DataFlushed) { + return false + } + + // If the index is disabled, then we only are tracking data flushing. + // Otherwise, warm status requires both data and index flushed. 
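+	// Tracking the two halves separately matters because series garbage
+	// collection decisions key off warm retrievability, and index flushes
+	// can complete later than data flushes for the same blockStart.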
+ if !s.namespace.Options().IndexOptions().Enabled() { + return true + } + + return statusIsRetrievable(status.IndexFlushed) } func statusIsRetrievable(status fileOpStatus) bool { @@ -425,7 +438,7 @@ func (s *dbShard) blockStatesSnapshotWithRLock() series.ShardBlockStateSnapshot snapshot := make(map[xtime.UnixNano]series.BlockState, len(s.flushState.statesByTime)) for time, state := range s.flushState.statesByTime { snapshot[time] = series.BlockState{ - WarmRetrievable: statusIsRetrievable(state.WarmStatus), + WarmRetrievable: s.warmStatusIsRetrievable(state.WarmStatus), // Use ColdVersionRetrievable instead of ColdVersionFlushed since the snapshot // will be used to make eviction decisions and we don't want to evict data before // it is retrievable. @@ -446,7 +459,7 @@ func (s *dbShard) OnRetrieveBlock( nsCtx namespace.Context, ) { s.RLock() - entry, _, err := s.lookupEntryWithLock(id) + entry, err := s.lookupEntryWithLock(id) if entry != nil { entry.IncrementReaderWriterCount() defer entry.DecrementReaderWriterCount() @@ -500,7 +513,7 @@ func (s *dbShard) OnRetrieveBlock( func (s *dbShard) OnEvictedFromWiredList(id ident.ID, blockStart xtime.UnixNano) { s.RLock() - entry, _, err := s.lookupEntryWithLock(id) + entry, err := s.lookupEntryWithLock(id) s.RUnlock() if err != nil && err != errShardEntryNotFound { @@ -524,8 +537,8 @@ func (s *dbShard) OnEvictedFromWiredList(id ident.ID, blockStart xtime.UnixNano) entry.Series.OnEvictedFromWiredList(id, blockStart) } -func (s *dbShard) forEachShardEntry(entryFn dbShardEntryWorkFn) error { - return s.forEachShardEntryBatch(func(currEntries []*lookup.Entry) bool { +func (s *dbShard) forEachShardEntry(entryFn dbShardEntryWorkFn) { + s.forEachShardEntryBatch(func(currEntries []*Entry) bool { for _, entry := range currEntries { if continueForEach := entryFn(entry); !continueForEach { return false @@ -543,7 +556,7 @@ func iterateBatchSize(elemsLen int) int { return int(math.Max(shardIterateBatchMinSize, t)) } -func (s *dbShard) forEachShardEntryBatch(entriesBatchFn dbShardEntryBatchWorkFn) error { +func (s *dbShard) forEachShardEntryBatch(entriesBatchFn dbShardEntryBatchWorkFn) { // NB(r): consider using a lockless list for ticking. s.RLock() elemsLen := s.list.Len() @@ -554,11 +567,11 @@ func (s *dbShard) forEachShardEntryBatch(entriesBatchFn dbShardEntryBatchWorkFn) if e == nil { return } - e.Value.(*lookup.Entry).DecrementReaderWriterCount() + e.Value.(*Entry).DecrementReaderWriterCount() } var ( - currEntries = make([]*lookup.Entry, 0, batchSize) + currEntries = make([]*Entry, 0, batchSize) first = true nextElem *list.Element ) @@ -578,7 +591,7 @@ func (s *dbShard) forEachShardEntryBatch(entriesBatchFn dbShardEntryBatchWorkFn) elem := nextElem for ticked := 0; ticked < batchSize && elem != nil; ticked++ { nextElem = elem.Next() - entry := elem.Value.(*lookup.Entry) + entry := elem.Value.(*Entry) entry.IncrementReaderWriterCount() currEntries = append(currEntries, entry) elem = nextElem @@ -587,7 +600,7 @@ func (s *dbShard) forEachShardEntryBatch(entriesBatchFn dbShardEntryBatchWorkFn) // NB(prateek): inc a reference to the next element while we have a lock, // to guarantee the element pointer cannot be changed from under us. 
if nextElem != nil { - nextElem.Value.(*lookup.Entry).IncrementReaderWriterCount() + nextElem.Value.(*Entry).IncrementReaderWriterCount() } s.RUnlock() @@ -599,11 +612,9 @@ func (s *dbShard) forEachShardEntryBatch(entriesBatchFn dbShardEntryBatchWorkFn) currEntries = currEntries[:0] if !continueExecution { decRefElem(nextElem) - return nil + return } } - - return nil } func (s *dbShard) IsBootstrapped() bool { @@ -707,7 +718,7 @@ func (s *dbShard) tickAndExpire( terminatedTickingDueToClosing bool i int slept time.Duration - expired []*lookup.Entry + expired []*Entry ) s.RLock() tickSleepBatch := s.currRuntimeOptions.tickSleepSeriesBatchSize @@ -718,7 +729,7 @@ func (s *dbShard) tickAndExpire( // future read lock attempts. blockStates := s.blockStatesSnapshotWithRLock() s.RUnlock() - s.forEachShardEntryBatch(func(currEntries []*lookup.Entry) bool { + s.forEachShardEntryBatch(func(currEntries []*Entry) bool { // re-using `expired` to amortize allocs, still need to reset it // to be safe for re-use. for i := range expired { @@ -802,7 +813,7 @@ func (s *dbShard) tickAndExpire( // Currently, this function is only called by the lambda inside `tickAndExpire`'s `forEachShardEntryBatch` // call. This satisfies the contract of all entries it operating upon being guaranteed to have a // readerWriterEntryCount of at least 1, by virtue of the implementation of `forEachShardEntryBatch`. -func (s *dbShard) purgeExpiredSeries(expiredEntries []*lookup.Entry) { +func (s *dbShard) purgeExpiredSeries(expiredEntries []*Entry) { // Remove all expired series from lookup and list. s.Lock() for _, entry := range expiredEntries { @@ -887,7 +898,7 @@ func (s *dbShard) writeAndIndex( shouldReverseIndex bool, ) (SeriesWrite, error) { // Prepare write - entry, opts, err := s.tryRetrieveWritableSeries(id) + entry, opts, err := s.TryRetrieveWritableSeries(id) if err != nil { return SeriesWrite{}, err } @@ -895,7 +906,7 @@ func (s *dbShard) writeAndIndex( writable := entry != nil // If no entry and we are not writing new series asynchronously. - if !writable && !opts.writeNewSeriesAsync { + if !writable && !opts.WriteNewSeriesAsync { // Avoid double lookup by enqueueing insert immediately. result, err := s.insertSeriesAsyncBatched(id, tagResolver, dbShardInsertAsyncOptions{ hasPendingIndexing: shouldReverseIndex, @@ -947,7 +958,7 @@ func (s *dbShard) writeAndIndex( commitLogSeriesUniqueIndex = entry.Index if err == nil && shouldReverseIndex { if entry.NeedsIndexUpdate(s.reverseIndex.BlockStartForWriteTime(timestamp)) { - if !opts.writeNewSeriesAsync { + if !opts.WriteNewSeriesAsync { return SeriesWrite{}, fmt.Errorf("to index async need write new series to be enabled") } needsIndex = true @@ -986,7 +997,7 @@ func (s *dbShard) writeAndIndex( } if shouldReverseIndex { - if !opts.writeNewSeriesAsync { + if !opts.WriteNewSeriesAsync { return SeriesWrite{}, fmt.Errorf("to index async need write new series to be enabled") } needsIndex = true @@ -1067,7 +1078,7 @@ func (s *dbShard) ReadEncoded( nsCtx namespace.Context, ) (series.BlockReaderIter, error) { s.RLock() - entry, _, err := s.lookupEntryWithLock(id) + entry, err := s.lookupEntryWithLock(id) if entry != nil { // NB(r): Ensure readers have consistent view of this series, do // not expire the series while being read from. @@ -1112,20 +1123,20 @@ func (s *dbShard) FetchWideEntry( } // lookupEntryWithLock returns the entry for a given id while holding a read lock or a write lock. 
-func (s *dbShard) lookupEntryWithLock(id ident.ID) (*lookup.Entry, *list.Element, error) { +func (s *dbShard) lookupEntryWithLock(id ident.ID) (*Entry, error) { if s.state != dbShardStateOpen { // NB(r): Return an invalid params error here so any upstream // callers will not retry this operation - return nil, nil, xerrors.NewInvalidParamsError(errShardNotOpen) + return nil, xerrors.NewInvalidParamsError(errShardNotOpen) } elem, exists := s.lookup.Get(id) if !exists { - return nil, nil, errShardEntryNotFound + return nil, errShardEntryNotFound } - return elem.Value.(*lookup.Entry), elem, nil + return elem.Value.(*Entry), nil } -func (s *dbShard) writableSeries(id ident.ID, tagResolver convert.TagMetadataResolver) (*lookup.Entry, error) { +func (s *dbShard) writableSeries(id ident.ID, tagResolver convert.TagMetadataResolver) (*Entry, error) { for { entry, err := s.retrieveWritableSeries(id) if entry != nil { @@ -1146,20 +1157,23 @@ func (s *dbShard) writableSeries(id ident.ID, tagResolver convert.TagMetadataRes } } -type writableSeriesOptions struct { - writeNewSeriesAsync bool +// WritableSeriesOptions defines writable series options. +type WritableSeriesOptions struct { + // WriteNewSeriesAsync specifies if the series should be async written. + WriteNewSeriesAsync bool } -func (s *dbShard) tryRetrieveWritableSeries(id ident.ID) ( - *lookup.Entry, - writableSeriesOptions, +// TryRetrieveWritableSeries attempts to retrieve a writable series. +func (s *dbShard) TryRetrieveWritableSeries(id ident.ID) ( + *Entry, + WritableSeriesOptions, error, ) { s.RLock() - opts := writableSeriesOptions{ - writeNewSeriesAsync: s.currRuntimeOptions.writeNewSeriesAsync, + opts := WritableSeriesOptions{ + WriteNewSeriesAsync: s.currRuntimeOptions.writeNewSeriesAsync, } - if entry, _, err := s.lookupEntryWithLock(id); err == nil { + if entry, err := s.lookupEntryWithLock(id); err == nil { entry.IncrementReaderWriterCount() s.RUnlock() return entry, opts, nil @@ -1171,15 +1185,15 @@ func (s *dbShard) tryRetrieveWritableSeries(id ident.ID) ( return nil, opts, nil } -func (s *dbShard) retrieveWritableSeries(id ident.ID) (*lookup.Entry, error) { - entry, _, err := s.tryRetrieveWritableSeries(id) +func (s *dbShard) retrieveWritableSeries(id ident.ID) (*Entry, error) { + entry, _, err := s.TryRetrieveWritableSeries(id) return entry, err } func (s *dbShard) newShardEntry( id ident.ID, tagResolver convert.TagMetadataResolver, -) (*lookup.Entry, error) { +) (*Entry, error) { // NB(r): As documented in storage/series.DatabaseSeries the series IDs // and metadata are garbage collected, hence we cast the ID to a BytesID // that can't be finalized. 
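Review note: the relookup closure previously captured by each entry is replaced by the Shard back-reference set in newShardEntry below. A sketch of the equivalent relookup under that change (the helper name is hypothetical; the body mirrors the removed closure):

    // relookupEntry re-finds the live entry for a series through its owning
    // shard, returning it with a reader/writer reference already taken.
    func relookupEntry(e *Entry, id ident.ID) (*Entry, bool) {
        found, _, err := e.Shard.TryRetrieveWritableSeries(id)
        if err != nil || found == nil {
            return nil, false
        }
        return found, true
    }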
@@ -1215,14 +1229,8 @@ func (s *dbShard) newShardEntry( OnEvictedFromWiredList: s, Options: s.seriesOpts, }) - return lookup.NewEntry(lookup.NewEntryOptions{ - RelookupAndIncrementReaderWriterCount: func() (index.OnIndexSeries, bool) { - e, _, err := s.tryRetrieveWritableSeries(seriesID) - if err != nil || e == nil { - return nil, false - } - return e, true - }, + return NewEntry(NewEntryOptions{ + Shard: s, Series: newSeries, Index: uniqueIndex, IndexWriter: s.reverseIndex, @@ -1236,11 +1244,11 @@ type insertAsyncResult struct { // entry is not guaranteed to be the final entry // inserted into the shard map in case there is already // an existing entry waiting in the insert queue - entry *lookup.Entry + entry *Entry } func (s *dbShard) pendingIndexInsert( - entry *lookup.Entry, + entry *Entry, timestamp xtime.UnixNano, ) writes.PendingIndexInsert { // inc a ref on the entry to ensure it's valid until the queue acts upon it. @@ -1256,7 +1264,7 @@ func (s *dbShard) pendingIndexInsert( } func (s *dbShard) insertSeriesForIndexingAsyncBatched( - entry *lookup.Entry, + entry *Entry, timestamp xtime.UnixNano, async bool, ) error { @@ -1340,7 +1348,7 @@ func (s *dbShard) insertSeriesSync( id ident.ID, tagResolver convert.TagMetadataResolver, opts insertSyncOptions, -) (*lookup.Entry, error) { +) (*Entry, error) { // NB(r): Create new shard entry outside of write lock to reduce // time using write lock. newEntry, err := s.newShardEntry(id, tagResolver) @@ -1363,7 +1371,7 @@ func (s *dbShard) insertSeriesSync( } }() - existingEntry, _, err := s.lookupEntryWithLock(id) + existingEntry, err := s.lookupEntryWithLock(id) if err != nil && err != errShardEntryNotFound { // Shard not taking inserts likely. return nil, err @@ -1405,7 +1413,7 @@ func (s *dbShard) insertSeriesSync( return newEntry, nil } -func (s *dbShard) insertNewShardEntryWithLock(entry *lookup.Entry) { +func (s *dbShard) insertNewShardEntryWithLock(entry *Entry) { // Set the lookup value, we use the copied ID and since it is GC'd // we explicitly set it with options to not copy the key and not to // finalize it. @@ -1448,7 +1456,7 @@ func (s *dbShard) insertSeriesBatch(inserts []dbShardInsert) error { // i.e. we don't have a ref on provided entry, so we check if between the operation being // enqueue in the shard insert queue, and this function executing, an entry was created // for the same ID. - entry, _, err := s.lookupEntryWithLock(inserts[i].entry.Series.ID()) + entry, err := s.lookupEntryWithLock(inserts[i].entry.Series.ID()) if entry != nil { // Already exists so update the entry we're pointed at for this insert. inserts[i].entry = entry @@ -1582,7 +1590,7 @@ func (s *dbShard) FetchBlocks( nsCtx namespace.Context, ) ([]block.FetchBlockResult, error) { s.RLock() - entry, _, err := s.lookupEntryWithLock(id) + entry, err := s.lookupEntryWithLock(id) if entry != nil { // NB(r): Ensure readers have consistent view of this series, do // not expire the series while being read from. 
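Review note: every call site discarded the *list.Element, so lookupEntryWithLock now returns just (entry, err). The read path is uniformly the following shape (lifted from OnRetrieveBlock and FetchBlocks in this file; a fragment, not a complete function):

    s.RLock()
    entry, err := s.lookupEntryWithLock(id)
    if entry != nil {
        // Pin the series so it cannot be expired while in use.
        entry.IncrementReaderWriterCount()
        defer entry.DecrementReaderWriterCount()
    }
    s.RUnlock()
    if err != nil {
        return nil, err
    }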
@@ -1623,7 +1631,7 @@ func (s *dbShard) FetchBlocksForColdFlush( nsCtx namespace.Context, ) (block.FetchBlockResult, error) { s.RLock() - entry, _, err := s.lookupEntryWithLock(seriesID) + entry, err := s.lookupEntryWithLock(seriesID) s.RUnlock() if entry == nil || err != nil { return block.FetchBlockResult{}, err @@ -1646,7 +1654,7 @@ func (s *dbShard) fetchActiveBlocksMetadata( ) var loopErr error - s.forEachShardEntry(func(entry *lookup.Entry) bool { + s.forEachShardEntry(func(entry *Entry) bool { // Break out of the iteration loop once we've accumulated enough entries. if int64(len(res.Results())) >= limit { next := int64(entry.Index) @@ -1956,8 +1964,18 @@ func (s *dbShard) UpdateFlushStates() { info := result.Info at := xtime.UnixNano(info.BlockStart) currState := s.flushStateNoBootstrapCheck(at) - if currState.WarmStatus != fileOpSuccess { - s.markWarmFlushStateSuccess(at) + + // When initializing from disk, the data files being present are sufficient + // for considering the data+index are flushed because that distinction is only + // needed to account for the raciness surrounding GCing series based on when + // data + index flushes have occurred. For the purposes of just initializing + // the state of which blocks have been flushed when bootstrapping, we can + // just use the data being present as the indicator. + if currState.WarmStatus.DataFlushed != fileOpSuccess { + s.markWarmDataFlushStateSuccess(at) + } + if currState.WarmStatus.IndexFlushed != fileOpSuccess { + s.markWarmIndexFlushStateSuccess(at) } // Cold version needs to get bootstrapped so that the 1:1 relationship @@ -2012,7 +2030,7 @@ func (s *dbShard) Bootstrap( } // Move any bootstrap buffers into position for reading. - s.forEachShardEntry(func(entry *lookup.Entry) bool { + s.forEachShardEntry(func(entry *Entry) bool { if err := entry.Series.Bootstrap(nsCtx); err != nil { multiErr = multiErr.Add(err) } @@ -2088,7 +2106,7 @@ func (s *dbShard) loadBlock( ) // First lookup if series already exists. - entry, shardOpts, err := s.tryRetrieveWritableSeries(id) + entry, shardOpts, err := s.TryRetrieveWritableSeries(id) if err != nil && err != errShardEntryNotFound { return result, err } @@ -2144,7 +2162,7 @@ func (s *dbShard) loadBlock( if s.reverseIndex != nil && entry.NeedsIndexUpdate(s.reverseIndex.BlockStartForWriteTime(timestamp)) { err = s.insertSeriesForIndexingAsyncBatched(entry, timestamp, - shardOpts.writeNewSeriesAsync) + shardOpts.WriteNewSeriesAsync) if err != nil { return result, err } @@ -2202,14 +2220,14 @@ func (s *dbShard) WarmFlush( } prepared, err := flushPreparer.PrepareData(prepareOpts) if err != nil { - return s.markWarmFlushStateSuccessOrError(blockStart, err) + return err } var multiErr xerrors.MultiError flushCtx := s.contextPool.Get() // From pool so finalizers are from pool. flushResult := dbShardFlushResult{} - s.forEachShardEntry(func(entry *lookup.Entry) bool { + s.forEachShardEntry(func(entry *Entry) bool { curr := entry.Series // Use a temporary context here so the stream readers can be returned to // the pool after we finish fetching flushing the series. 
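Review note: warm flush success is now recorded in two halves. The shard's data flush marks WarmStatus.DataFlushed (markWarmDataFlushStateSuccessOrError below), while the namespace index later marks WarmStatus.IndexFlushed via MarkWarmIndexFlushStateSuccessOrError. A sketch of the transitions for a single blockStart, assuming a namespace with indexing enabled:

    state := fileOpState{WarmStatus: warmStatus{
        DataFlushed:  fileOpNotStarted,
        IndexFlushed: fileOpNotStarted,
    }}
    // warmStatusIsRetrievable(state.WarmStatus) == false

    // 1) shard.WarmFlush(blockStart, ...) succeeds:
    state.WarmStatus.DataFlushed = fileOpSuccess
    // Still not retrievable: the index segments are not yet durable.

    // 2) The namespace index warm-flushes the enclosing index block and calls
    //    MarkWarmIndexFlushStateSuccessOrError(blockStart, nil):
    state.WarmStatus.IndexFlushed = fileOpSuccess
    // Now warmStatusIsRetrievable(state.WarmStatus) == true.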
@@ -2236,7 +2254,7 @@ func (s *dbShard) WarmFlush( multiErr = multiErr.Add(err) } - return s.markWarmFlushStateSuccessOrError(blockStart, multiErr.FinalError()) + return s.markWarmDataFlushStateSuccessOrError(blockStart, multiErr.FinalError()) } func (s *dbShard) ColdFlush( @@ -2275,7 +2293,7 @@ func (s *dbShard) ColdFlush( ) // First, loop through all series to capture data on which blocks have dirty // series and add them to the resources for further processing. - s.forEachShardEntry(func(entry *lookup.Entry) bool { + s.forEachShardEntry(func(entry *Entry) bool { curr := entry.Series seriesMetadata := curr.Metadata() blockStarts := curr.ColdFlushBlockStarts(blockStatesSnapshot) @@ -2385,7 +2403,7 @@ func (s *dbShard) Snapshot( var needsSnapshot bool checkNeedsSnapshotTimer := s.metrics.snapshotCheckNeedsSnapshotLatency.Start() - s.forEachShardEntry(func(entry *lookup.Entry) bool { + s.forEachShardEntry(func(entry *Entry) bool { if !entry.Series.IsBufferEmptyAtBlockStart(blockStart) { needsSnapshot = true return false @@ -2425,7 +2443,7 @@ func (s *dbShard) Snapshot( stats series.SnapshotResultStats multiErr xerrors.MultiError ) - s.forEachShardEntry(func(entry *lookup.Entry) bool { + s.forEachShardEntry(func(entry *Entry) bool { series := entry.Series // Use a temporary context here so the stream readers can be returned to // pool after we finish fetching flushing the series @@ -2493,34 +2511,64 @@ func (s *dbShard) flushStateNoBootstrapCheck(blockStart xtime.UnixNano) fileOpSt func (s *dbShard) flushStateWithRLock(blockStart xtime.UnixNano) fileOpState { state, ok := s.flushState.statesByTime[blockStart] if !ok { - return fileOpState{WarmStatus: fileOpNotStarted} + return fileOpState{WarmStatus: warmStatus{ + DataFlushed: fileOpNotStarted, + IndexFlushed: fileOpNotStarted, + }} } return state } -func (s *dbShard) markWarmFlushStateSuccessOrError(blockStart xtime.UnixNano, err error) error { +func (s *dbShard) markWarmDataFlushStateSuccessOrError(blockStart xtime.UnixNano, err error) error { // Track flush state for block state if err == nil { - s.markWarmFlushStateSuccess(blockStart) + s.markWarmDataFlushStateSuccess(blockStart) } else { - s.markWarmFlushStateFail(blockStart) + s.markWarmDataFlushStateFail(blockStart) } return err } -func (s *dbShard) markWarmFlushStateSuccess(blockStart xtime.UnixNano) { +func (s *dbShard) markWarmDataFlushStateSuccess(blockStart xtime.UnixNano) { s.flushState.Lock() - s.flushState.statesByTime[blockStart] = - fileOpState{ - WarmStatus: fileOpSuccess, - } + state := s.flushState.statesByTime[blockStart] + state.WarmStatus.DataFlushed = fileOpSuccess + s.flushState.statesByTime[blockStart] = state + s.flushState.Unlock() +} + +func (s *dbShard) markWarmDataFlushStateFail(blockStart xtime.UnixNano) { + s.flushState.Lock() + state := s.flushState.statesByTime[blockStart] + state.WarmStatus.DataFlushed = fileOpFailed + state.NumFailures++ + s.flushState.statesByTime[blockStart] = state + s.flushState.Unlock() +} + +// MarkWarmIndexFlushStateSuccessOrError marks the blockStart as +// success or fail based on the provided err. 
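+// It is invoked by the namespace index after warm-flushing an index block,
+// once per data blockStart covered by that (possibly larger) index block.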
+func (s *dbShard) MarkWarmIndexFlushStateSuccessOrError(blockStart xtime.UnixNano, err error) { + // Track flush state for block state + if err == nil { + s.markWarmIndexFlushStateSuccess(blockStart) + } else { + s.markWarmIndexFlushStateFail(blockStart) + } +} + +func (s *dbShard) markWarmIndexFlushStateSuccess(blockStart xtime.UnixNano) { + s.flushState.Lock() + state := s.flushState.statesByTime[blockStart] + state.WarmStatus.IndexFlushed = fileOpSuccess + s.flushState.statesByTime[blockStart] = state s.flushState.Unlock() } -func (s *dbShard) markWarmFlushStateFail(blockStart xtime.UnixNano) { +func (s *dbShard) markWarmIndexFlushStateFail(blockStart xtime.UnixNano) { s.flushState.Lock() state := s.flushState.statesByTime[blockStart] - state.WarmStatus = fileOpFailed + state.WarmStatus.IndexFlushed = fileOpFailed state.NumFailures++ s.flushState.statesByTime[blockStart] = state s.flushState.Unlock() @@ -2651,7 +2699,7 @@ func (s *dbShard) DocRef(id ident.ID) (doc.Metadata, bool, error) { s.RLock() defer s.RUnlock() - entry, _, err := s.lookupEntryWithLock(id) + entry, err := s.lookupEntryWithLock(id) if err == nil { return entry.Series.Metadata(), true, nil } @@ -2707,7 +2755,7 @@ func (s *dbShard) finishWriting( markWarmFlushStateSuccess bool, ) error { if markWarmFlushStateSuccess { - s.markWarmFlushStateSuccess(blockStart) + s.markWarmDataFlushStateSuccess(blockStart) } // After writing the full block successfully update the ColdVersionFlushed number. This will diff --git a/src/dbnode/storage/shard_foreachentry_prop_test.go b/src/dbnode/storage/shard_foreachentry_prop_test.go index b286eebb7e..bcbcb7863f 100644 --- a/src/dbnode/storage/shard_foreachentry_prop_test.go +++ b/src/dbnode/storage/shard_foreachentry_prop_test.go @@ -31,7 +31,6 @@ import ( "time" "github.com/m3db/m3/src/dbnode/namespace" - "github.com/m3db/m3/src/dbnode/storage/series/lookup" "github.com/m3db/m3/src/x/context" "github.com/m3db/m3/src/x/ident" xtime "github.com/m3db/m3/src/x/time" @@ -205,7 +204,7 @@ func shardEntriesAreEqual(shard *dbShard, expectedEntries []shardEntryState) err return fmt.Errorf("expected to have %d idx, but did not see anything", idx) } nextElem := elem.Next() - entry := elem.Value.(*lookup.Entry) + entry := elem.Value.(*Entry) if !entry.Series.ID().Equal(expectedEntry.id) { return fmt.Errorf("expected id: %s at %d, observed: %s", expectedEntry.id.String(), idx, entry.Series.ID().String()) @@ -253,7 +252,7 @@ func genBatchWorkFn() gopter.Gen { return gen.UInt8(). 
Map(func(n uint8) dbShardEntryBatchWorkFn { i := uint8(0) - return func([]*lookup.Entry) bool { + return func([]*Entry) bool { i++ return i < n } diff --git a/src/dbnode/storage/shard_index_test.go b/src/dbnode/storage/shard_index_test.go index 4341f782ab..e7b9ee586d 100644 --- a/src/dbnode/storage/shard_index_test.go +++ b/src/dbnode/storage/shard_index_test.go @@ -140,7 +140,7 @@ func TestShardAsyncInsertMarkIndexedForBlockStart(t *testing.T) { start := time.Now() for time.Since(start) < 10*time.Second { - entry, _, err := shard.tryRetrieveWritableSeries(ident.StringID("foo")) + entry, _, err := shard.TryRetrieveWritableSeries(ident.StringID("foo")) require.NoError(t, err) if entry == nil { time.Sleep(10 * time.Millisecond) @@ -190,7 +190,7 @@ func TestShardAsyncIndexIfExpired(t *testing.T) { // make sure next block not marked as indexed start := time.Now() for time.Since(start) < 10*time.Second { - entry, _, err := shard.tryRetrieveWritableSeries(ident.StringID("foo")) + entry, _, err := shard.TryRetrieveWritableSeries(ident.StringID("foo")) require.NoError(t, err) if entry == nil { time.Sleep(10 * time.Millisecond) diff --git a/src/dbnode/storage/shard_insert_queue.go b/src/dbnode/storage/shard_insert_queue.go index 927f0868e8..6f93c05ade 100644 --- a/src/dbnode/storage/shard_insert_queue.go +++ b/src/dbnode/storage/shard_insert_queue.go @@ -29,7 +29,6 @@ import ( "github.com/m3db/m3/src/dbnode/namespace" "github.com/m3db/m3/src/dbnode/runtime" "github.com/m3db/m3/src/dbnode/storage/series" - "github.com/m3db/m3/src/dbnode/storage/series/lookup" "github.com/m3db/m3/src/dbnode/ts" "github.com/m3db/m3/src/x/checked" "github.com/m3db/m3/src/x/clock" @@ -335,7 +334,7 @@ type dbShardInsertsByCPUCore struct { } type dbShardInsert struct { - entry *lookup.Entry + entry *Entry opts dbShardInsertAsyncOptions } diff --git a/src/dbnode/storage/shard_insert_queue_test.go b/src/dbnode/storage/shard_insert_queue_test.go index 1190a80bb3..b11464c0ce 100644 --- a/src/dbnode/storage/shard_insert_queue_test.go +++ b/src/dbnode/storage/shard_insert_queue_test.go @@ -26,8 +26,6 @@ import ( "testing" "time" - "github.com/m3db/m3/src/dbnode/storage/series/lookup" - "github.com/fortytw2/leaktest" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -90,16 +88,16 @@ func TestShardInsertQueueBatchBackoff(t *testing.T) { }() // first insert - _, err := q.Insert(dbShardInsert{entry: &lookup.Entry{Index: 0}}) + _, err := q.Insert(dbShardInsert{entry: &Entry{Index: 0}}) require.NoError(t, err) // wait for first insert batch to complete insertWgs[0].Wait() // now next batch will need to wait as we haven't progressed time - _, err = q.Insert(dbShardInsert{entry: &lookup.Entry{Index: 1}}) + _, err = q.Insert(dbShardInsert{entry: &Entry{Index: 1}}) require.NoError(t, err) - _, err = q.Insert(dbShardInsert{entry: &lookup.Entry{Index: 2}}) + _, err = q.Insert(dbShardInsert{entry: &Entry{Index: 2}}) require.NoError(t, err) // allow first insert to finish @@ -112,7 +110,7 @@ func TestShardInsertQueueBatchBackoff(t *testing.T) { assert.Equal(t, 1, numSleeps) // insert third batch, will also need to wait - _, err = q.Insert(dbShardInsert{entry: &lookup.Entry{Index: 3}}) + _, err = q.Insert(dbShardInsert{entry: &Entry{Index: 3}}) require.NoError(t, err) // allow second batch to finish diff --git a/src/dbnode/storage/shard_ref_count_test.go b/src/dbnode/storage/shard_ref_count_test.go index 74cbffac9d..f0bfb49edc 100644 --- a/src/dbnode/storage/shard_ref_count_test.go +++ 
b/src/dbnode/storage/shard_ref_count_test.go @@ -90,7 +90,7 @@ func testShardWriteSyncRefCount(t *testing.T, opts Options) { // ensure all entries have no references left for _, id := range []string{"foo", "bar", "baz"} { shard.Lock() - entry, _, err := shard.lookupEntryWithLock(ident.StringID(id)) + entry, err := shard.lookupEntryWithLock(ident.StringID(id)) shard.Unlock() assert.NoError(t, err) assert.Equal(t, int32(0), entry.ReaderWriterCount(), id) @@ -114,7 +114,7 @@ func testShardWriteSyncRefCount(t *testing.T, opts Options) { // ensure all entries have no references left for _, id := range []string{"foo", "bar", "baz"} { shard.Lock() - entry, _, err := shard.lookupEntryWithLock(ident.StringID(id)) + entry, err := shard.lookupEntryWithLock(ident.StringID(id)) shard.Unlock() assert.NoError(t, err) assert.Equal(t, int32(0), entry.ReaderWriterCount(), id) @@ -213,7 +213,7 @@ func testShardWriteTaggedSyncRefCount(t *testing.T, idx NamespaceIndex) { // ensure all entries have no references left for _, id := range []string{"foo", "bar", "baz"} { shard.Lock() - entry, _, err := shard.lookupEntryWithLock(ident.StringID(id)) + entry, err := shard.lookupEntryWithLock(ident.StringID(id)) shard.Unlock() assert.NoError(t, err) assert.Equal(t, int32(0), entry.ReaderWriterCount(), id) @@ -240,7 +240,7 @@ func testShardWriteTaggedSyncRefCount(t *testing.T, idx NamespaceIndex) { // ensure all entries have no references left for _, id := range []string{"foo", "bar", "baz"} { shard.Lock() - entry, _, err := shard.lookupEntryWithLock(ident.StringID(id)) + entry, err := shard.lookupEntryWithLock(ident.StringID(id)) shard.Unlock() assert.NoError(t, err) assert.Equal(t, int32(0), entry.ReaderWriterCount(), id) @@ -293,7 +293,7 @@ func TestShardWriteAsyncRefCount(t *testing.T) { // ensure all entries have no references left for _, id := range []string{"foo", "bar", "baz"} { shard.Lock() - entry, _, err := shard.lookupEntryWithLock(ident.StringID(id)) + entry, err := shard.lookupEntryWithLock(ident.StringID(id)) shard.Unlock() assert.NoError(t, err) assert.Equal(t, int32(0), entry.ReaderWriterCount(), id) @@ -317,7 +317,7 @@ func TestShardWriteAsyncRefCount(t *testing.T) { // ensure all entries have no references left for _, id := range []string{"foo", "bar", "baz"} { shard.Lock() - entry, _, err := shard.lookupEntryWithLock(ident.StringID(id)) + entry, err := shard.lookupEntryWithLock(ident.StringID(id)) shard.Unlock() assert.NoError(t, err) assert.Equal(t, int32(0), entry.ReaderWriterCount(), id) @@ -459,7 +459,7 @@ func testShardWriteTaggedAsyncRefCount(t *testing.T, idx NamespaceIndex, nowFn f // ensure all entries have no references left for _, id := range []string{"foo", "bar", "baz"} { shard.Lock() - entry, _, err := shard.lookupEntryWithLock(ident.StringID(id)) + entry, err := shard.lookupEntryWithLock(ident.StringID(id)) shard.Unlock() assert.NoError(t, err) assert.Equal(t, int32(0), entry.ReaderWriterCount(), id) @@ -486,7 +486,7 @@ func testShardWriteTaggedAsyncRefCount(t *testing.T, idx NamespaceIndex, nowFn f // ensure all entries have no references left for _, id := range []string{"foo", "bar", "baz"} { shard.Lock() - entry, _, err := shard.lookupEntryWithLock(ident.StringID(id)) + entry, err := shard.lookupEntryWithLock(ident.StringID(id)) shard.Unlock() assert.NoError(t, err) assert.Equal(t, int32(0), entry.ReaderWriterCount(), id) diff --git a/src/dbnode/storage/shard_test.go b/src/dbnode/storage/shard_test.go index 93ab49f37b..af270ffbc3 100644 --- a/src/dbnode/storage/shard_test.go +++ 
b/src/dbnode/storage/shard_test.go @@ -42,7 +42,6 @@ import ( "github.com/m3db/m3/src/dbnode/storage/bootstrap/result" "github.com/m3db/m3/src/dbnode/storage/index/convert" "github.com/m3db/m3/src/dbnode/storage/series" - "github.com/m3db/m3/src/dbnode/storage/series/lookup" "github.com/m3db/m3/src/dbnode/ts" xmetrics "github.com/m3db/m3/src/dbnode/x/metrics" "github.com/m3db/m3/src/dbnode/x/xio" @@ -100,7 +99,7 @@ func addMockSeries(ctrl *gomock.Controller, shard *dbShard, id ident.ID, tags id series.EXPECT().ID().Return(id).AnyTimes() series.EXPECT().IsEmpty().Return(false).AnyTimes() shard.Lock() - shard.insertNewShardEntryWithLock(lookup.NewEntry(lookup.NewEntryOptions{ + shard.insertNewShardEntryWithLock(NewEntry(NewEntryOptions{ Series: series, Index: index, })) @@ -177,7 +176,9 @@ func TestShardFlushStateNotStarted(t *testing.T) { nsCtx := namespace.Context{ID: ident.StringID("foo")} s.Bootstrap(ctx, nsCtx) - notStarted := fileOpState{WarmStatus: fileOpNotStarted} + notStarted := fileOpState{WarmStatus: warmStatus{ + DataFlushed: fileOpNotStarted, + }} for st := earliest; !st.After(latest); st = st.Add(ropts.BlockSize()) { flushState, err := s.FlushState(earliest) require.NoError(t, err) @@ -215,7 +216,7 @@ func TestShardBootstrapWithFlushVersion(t *testing.T) { // Load the mock into the shard as an expected series so that we can assert // on the call to its Bootstrap() method below. - entry := lookup.NewEntry(lookup.NewEntryOptions{ + entry := NewEntry(NewEntryOptions{ Series: mockSeries, }) s.Lock() @@ -431,8 +432,10 @@ func TestShardFlushSeriesFlushError(t *testing.T) { s.Bootstrap(ctx, nsCtx) s.flushState.statesByTime[blockStart] = fileOpState{ - WarmStatus: fileOpFailed, - NumFailures: 1, + WarmStatus: warmStatus{ + DataFlushed: fileOpNotStarted, + }, + NumFailures: 0, } var closed bool @@ -464,12 +467,12 @@ func TestShardFlushSeriesFlushError(t *testing.T) { flushed[i] = struct{}{} }). Return(series.FlushOutcomeErr, expectedErr) - s.list.PushBack(lookup.NewEntry(lookup.NewEntryOptions{ + s.list.PushBack(NewEntry(NewEntryOptions{ Series: curr, })) } - err := s.WarmFlush(blockStart, flush, namespace.Context{}) + flushErr := s.WarmFlush(blockStart, flush, namespace.Context{}) require.Equal(t, len(flushed), 2) for i := 0; i < 2; i++ { @@ -478,14 +481,16 @@ func TestShardFlushSeriesFlushError(t *testing.T) { } require.True(t, closed) - require.NotNil(t, err) - require.Equal(t, "error bar", err.Error()) + require.NotNil(t, flushErr) + require.Equal(t, "error bar", flushErr.Error()) flushState, err := s.FlushState(blockStart) require.NoError(t, err) require.Equal(t, fileOpState{ - WarmStatus: fileOpFailed, - NumFailures: 2, + WarmStatus: warmStatus{ + DataFlushed: fileOpFailed, + }, + NumFailures: 1, }, flushState) } @@ -511,8 +516,10 @@ func TestShardFlushSeriesFlushSuccess(t *testing.T) { s.Bootstrap(ctx, nsCtx) s.flushState.statesByTime[blockStart] = fileOpState{ - WarmStatus: fileOpFailed, - NumFailures: 1, + WarmStatus: warmStatus{ + DataFlushed: fileOpNotStarted, + }, + NumFailures: 0, } var closed bool @@ -541,7 +548,7 @@ func TestShardFlushSeriesFlushSuccess(t *testing.T) { flushed[i] = struct{}{} }). 
Return(series.FlushOutcomeFlushedToDisk, nil) - s.list.PushBack(lookup.NewEntry(lookup.NewEntryOptions{ + s.list.PushBack(NewEntry(NewEntryOptions{ Series: curr, })) } @@ -557,10 +564,13 @@ func TestShardFlushSeriesFlushSuccess(t *testing.T) { require.True(t, closed) require.Nil(t, err) + // State not yet updated since an explicit call to MarkWarmFlushStateSuccessOrError is required. flushState, err := s.FlushState(blockStart) require.NoError(t, err) require.Equal(t, fileOpState{ - WarmStatus: fileOpSuccess, + WarmStatus: warmStatus{ + DataFlushed: fileOpSuccess, + }, ColdVersionRetrievable: 0, NumFailures: 0, }, flushState) @@ -621,13 +631,13 @@ func TestShardColdFlush(t *testing.T) { // happen after a successful warm flush because warm flushes currently don't // have merging logic. This means that all blocks except t7 should // successfully cold flush. - shard.markWarmFlushStateSuccess(t0) - shard.markWarmFlushStateSuccess(t1) - shard.markWarmFlushStateSuccess(t2) - shard.markWarmFlushStateSuccess(t3) - shard.markWarmFlushStateSuccess(t4) - shard.markWarmFlushStateSuccess(t5) - shard.markWarmFlushStateSuccess(t6) + shard.markWarmDataFlushStateSuccess(t0) + shard.markWarmDataFlushStateSuccess(t1) + shard.markWarmDataFlushStateSuccess(t2) + shard.markWarmDataFlushStateSuccess(t3) + shard.markWarmDataFlushStateSuccess(t4) + shard.markWarmDataFlushStateSuccess(t5) + shard.markWarmDataFlushStateSuccess(t6) dirtyData := []testDirtySeries{ {id: ident.StringID("id0"), dirtyTimes: []xtime.UnixNano{t0, t2, t3, t4}}, @@ -641,7 +651,7 @@ func TestShardColdFlush(t *testing.T) { curr.EXPECT().Metadata().Return(doc.Metadata{ID: ds.id.Bytes()}).AnyTimes() curr.EXPECT().ColdFlushBlockStarts(gomock.Any()). Return(optimizedTimesFromTimes(ds.dirtyTimes)) - shard.list.PushBack(lookup.NewEntry(lookup.NewEntryOptions{ + shard.list.PushBack(NewEntry(NewEntryOptions{ Series: curr, })) } @@ -702,10 +712,10 @@ func TestShardColdFlushNoMergeIfNothingDirty(t *testing.T) { t1 := t0.Add(1 * blockSize) t2 := t0.Add(2 * blockSize) t3 := t0.Add(3 * blockSize) - shard.markWarmFlushStateSuccess(t0) - shard.markWarmFlushStateSuccess(t1) - shard.markWarmFlushStateSuccess(t2) - shard.markWarmFlushStateSuccess(t3) + shard.markWarmDataFlushStateSuccess(t0) + shard.markWarmDataFlushStateSuccess(t1) + shard.markWarmDataFlushStateSuccess(t2) + shard.markWarmDataFlushStateSuccess(t3) preparer := persist.NewMockFlushPreparer(ctrl) fsReader := fs.NewMockDataFileSetReader(ctrl) @@ -845,7 +855,7 @@ func TestShardSnapshotSeriesSnapshotSuccess(t *testing.T) { snapshotted[i] = struct{}{} }). 
Return(series.SnapshotResult{}, nil) - s.list.PushBack(lookup.NewEntry(lookup.NewEntryOptions{ + s.list.PushBack(NewEntry(NewEntryOptions{ Series: entry, })) } @@ -865,7 +875,7 @@ func addMockTestSeries(ctrl *gomock.Controller, shard *dbShard, id ident.ID) *se series := series.NewMockDatabaseSeries(ctrl) series.EXPECT().ID().AnyTimes().Return(id) shard.Lock() - shard.insertNewShardEntryWithLock(lookup.NewEntry(lookup.NewEntryOptions{ + shard.insertNewShardEntryWithLock(NewEntry(NewEntryOptions{ Series: series, })) shard.Unlock() @@ -883,7 +893,7 @@ func addTestSeriesWithCount(shard *dbShard, id ident.ID, count int32) series.Dat Options: shard.seriesOpts, }) shard.Lock() - entry := lookup.NewEntry(lookup.NewEntryOptions{ + entry := NewEntry(NewEntryOptions{ Series: seriesEntry, }) for i := int32(0); i < count; i++ { @@ -971,10 +981,14 @@ func TestShardTick(t *testing.T) { // Also check that it expires flush states by time shard.flushState.statesByTime[earliestFlush] = fileOpState{ - WarmStatus: fileOpSuccess, + WarmStatus: warmStatus{ + DataFlushed: fileOpSuccess, + }, } shard.flushState.statesByTime[beforeEarliestFlush] = fileOpState{ - WarmStatus: fileOpSuccess, + WarmStatus: warmStatus{ + DataFlushed: fileOpSuccess, + }, } assert.Equal(t, 2, len(shard.flushState.statesByTime)) @@ -1142,10 +1156,14 @@ func testShardWriteAsync(t *testing.T, writes []testWrite) { // Also check that it expires flush states by time shard.flushState.statesByTime[earliestFlush] = fileOpState{ - WarmStatus: fileOpSuccess, + WarmStatus: warmStatus{ + DataFlushed: fileOpSuccess, + }, } shard.flushState.statesByTime[beforeEarliestFlush] = fileOpState{ - WarmStatus: fileOpSuccess, + WarmStatus: warmStatus{ + DataFlushed: fileOpSuccess, + }, } assert.Equal(t, 2, len(shard.flushState.statesByTime)) @@ -1455,7 +1473,7 @@ func TestPurgeExpiredSeriesWriteAfterPurging(t *testing.T) { ctrl := xtest.NewController(t) defer ctrl.Finish() - var entry *lookup.Entry + var entry *Entry opts := DefaultTestOptions() shard := testDatabaseShard(t, opts) @@ -1488,7 +1506,7 @@ func TestForEachShardEntry(t *testing.T) { } count := 0 - entryFn := func(entry *lookup.Entry) bool { + entryFn := func(entry *Entry) bool { if entry.Series.ID().String() == "foo.8" { return false } @@ -1508,7 +1526,7 @@ func TestForEachShardEntry(t *testing.T) { // Ensure that reader writer count gets reset shard.RLock() for elem := shard.list.Front(); elem != nil; elem = elem.Next() { - entry := elem.Value.(*lookup.Entry) + entry := elem.Value.(*Entry) assert.Equal(t, int32(0), entry.ReaderWriterCount()) } shard.RUnlock() @@ -1639,8 +1657,8 @@ func TestShardFetchIndexChecksum(t *testing.T) { ropts := shard.seriesOpts.RetentionOptions() end := xtime.ToUnixNano(opts.ClockOptions().NowFn()()).Truncate(ropts.BlockSize()) start := end.Add(-2 * ropts.BlockSize()) - shard.markWarmFlushStateSuccess(start) - shard.markWarmFlushStateSuccess(start.Add(ropts.BlockSize())) + shard.markWarmDataFlushStateSuccess(start) + shard.markWarmDataFlushStateSuccess(start.Add(ropts.BlockSize())) retriever := block.NewMockDatabaseBlockRetriever(ctrl) shard.setBlockRetriever(retriever) @@ -1678,7 +1696,7 @@ func TestShardFetchIndexChecksum(t *testing.T) { time.Sleep(time.Second) shard.RLock() - entry, _, err := shard.lookupEntryWithLock(ident.StringID("foo")) + entry, err := shard.lookupEntryWithLock(ident.StringID("foo")) shard.RUnlock() require.Equal(t, err, errShardEntryNotFound) @@ -1713,8 +1731,8 @@ func TestShardReadEncodedCachesSeriesWithRecentlyReadPolicy(t *testing.T) { ropts := 
shard.seriesOpts.RetentionOptions() end := xtime.ToUnixNano(opts.ClockOptions().NowFn()()).Truncate(ropts.BlockSize()) start := end.Add(-2 * ropts.BlockSize()) - shard.markWarmFlushStateSuccess(start) - shard.markWarmFlushStateSuccess(start.Add(ropts.BlockSize())) + shard.markWarmDataFlushStateSuccess(start) + shard.markWarmDataFlushStateSuccess(start.Add(ropts.BlockSize())) retriever := block.NewMockDatabaseBlockRetriever(ctrl) shard.setBlockRetriever(retriever) @@ -1772,7 +1790,7 @@ func TestShardReadEncodedCachesSeriesWithRecentlyReadPolicy(t *testing.T) { begin := time.Now() for time.Since(begin) < 10*time.Second { shard.RLock() - entry, _, err := shard.lookupEntryWithLock(ident.StringID("foo")) + entry, err := shard.lookupEntryWithLock(ident.StringID("foo")) shard.RUnlock() if err == errShardEntryNotFound { time.Sleep(5 * time.Millisecond) @@ -1786,7 +1804,7 @@ func TestShardReadEncodedCachesSeriesWithRecentlyReadPolicy(t *testing.T) { } shard.RLock() - entry, _, err := shard.lookupEntryWithLock(ident.StringID("foo")) + entry, err := shard.lookupEntryWithLock(ident.StringID("foo")) shard.RUnlock() require.NoError(t, err) require.NotNil(t, entry) @@ -1866,7 +1884,7 @@ func TestShardNewEntryDoesNotAlterIDOrTags(t *testing.T) { shard.insertNewShardEntryWithLock(entry) shard.Unlock() - entry, _, err = shard.tryRetrieveWritableSeries(seriesID) + entry, _, err = shard.TryRetrieveWritableSeries(seriesID) require.NoError(t, err) entryIDBytes := entry.Series.ID().Bytes() @@ -2002,7 +2020,7 @@ func TestSeriesRefResolver(t *testing.T) { // should return already inserted entry as series. resolverEntry, err := shard.SeriesRefResolver(seriesID, iter) require.NoError(t, err) - require.IsType(t, &lookup.Entry{}, resolverEntry) + require.IsType(t, &Entry{}, resolverEntry) refEntry, err := resolverEntry.SeriesRef() require.NoError(t, err) require.Equal(t, seriesRef, refEntry) diff --git a/src/dbnode/storage/storage_mock.go b/src/dbnode/storage/storage_mock.go index 256e576658..5e71c3c284 100644 --- a/src/dbnode/storage/storage_mock.go +++ b/src/dbnode/storage/storage_mock.go @@ -1949,6 +1949,22 @@ func (mr *MockShardMockRecorder) OpenStreamingReader(blockStart interface{}) *go return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "OpenStreamingReader", reflect.TypeOf((*MockShard)(nil).OpenStreamingReader), blockStart) } +// TryRetrieveWritableSeries mocks base method. +func (m *MockShard) TryRetrieveWritableSeries(id ident.ID) (*Entry, WritableSeriesOptions, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "TryRetrieveWritableSeries", id) + ret0, _ := ret[0].(*Entry) + ret1, _ := ret[1].(WritableSeriesOptions) + ret2, _ := ret[2].(error) + return ret0, ret1, ret2 +} + +// TryRetrieveWritableSeries indicates an expected call of TryRetrieveWritableSeries. +func (mr *MockShardMockRecorder) TryRetrieveWritableSeries(id interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "TryRetrieveWritableSeries", reflect.TypeOf((*MockShard)(nil).TryRetrieveWritableSeries), id) +} + // MockdatabaseShard is a mock of databaseShard interface. type MockdatabaseShard struct { ctrl *gomock.Controller @@ -2221,6 +2237,18 @@ func (mr *MockdatabaseShardMockRecorder) LoadBlocks(series interface{}) *gomock. return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "LoadBlocks", reflect.TypeOf((*MockdatabaseShard)(nil).LoadBlocks), series) } +// MarkWarmIndexFlushStateSuccessOrError mocks base method. 
+func (m *MockdatabaseShard) MarkWarmIndexFlushStateSuccessOrError(blockStart time0.UnixNano, err error) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "MarkWarmIndexFlushStateSuccessOrError", blockStart, err) +} + +// MarkWarmIndexFlushStateSuccessOrError indicates an expected call of MarkWarmIndexFlushStateSuccessOrError. +func (mr *MockdatabaseShardMockRecorder) MarkWarmIndexFlushStateSuccessOrError(blockStart, err interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "MarkWarmIndexFlushStateSuccessOrError", reflect.TypeOf((*MockdatabaseShard)(nil).MarkWarmIndexFlushStateSuccessOrError), blockStart, err) +} + // NumSeries mocks base method. func (m *MockdatabaseShard) NumSeries() int64 { m.ctrl.T.Helper() @@ -2351,6 +2379,22 @@ func (mr *MockdatabaseShardMockRecorder) Tick(c, startTime, nsCtx interface{}) * return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Tick", reflect.TypeOf((*MockdatabaseShard)(nil).Tick), c, startTime, nsCtx) } +// TryRetrieveWritableSeries mocks base method. +func (m *MockdatabaseShard) TryRetrieveWritableSeries(id ident.ID) (*Entry, WritableSeriesOptions, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "TryRetrieveWritableSeries", id) + ret0, _ := ret[0].(*Entry) + ret1, _ := ret[1].(WritableSeriesOptions) + ret2, _ := ret[2].(error) + return ret0, ret1, ret2 +} + +// TryRetrieveWritableSeries indicates an expected call of TryRetrieveWritableSeries. +func (mr *MockdatabaseShardMockRecorder) TryRetrieveWritableSeries(id interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "TryRetrieveWritableSeries", reflect.TypeOf((*MockdatabaseShard)(nil).TryRetrieveWritableSeries), id) +} + // UpdateFlushStates mocks base method. func (m *MockdatabaseShard) UpdateFlushStates() { m.ctrl.T.Helper() diff --git a/src/dbnode/storage/types.go b/src/dbnode/storage/types.go index 61317be6e4..4c9a0bd9c1 100644 --- a/src/dbnode/storage/types.go +++ b/src/dbnode/storage/types.go @@ -502,6 +502,9 @@ type Shard interface { // OpenStreamingDataReader creates and opens a streaming fs.DataFileSetReader // on the latest volume of the given block. OpenStreamingReader(blockStart xtime.UnixNano) (fs.DataFileSetReader, error) + + // TryRetrieveWritableSeries attempts to retrieve a writable series. + TryRetrieveWritableSeries(id ident.ID) (*Entry, WritableSeriesOptions, error) } type databaseShard interface { @@ -611,6 +614,10 @@ type databaseShard interface { nsCtx namespace.Context, ) error + // MarkWarmIndexFlushStateSuccessOrError marks the blockStart as + // success or fail based on the provided err. + MarkWarmIndexFlushStateSuccessOrError(blockStart xtime.UnixNano, err error) + // ColdFlush flushes the unflushed ColdWrites in this shard. ColdFlush( flush persist.FlushPreparer, diff --git a/src/m3ninx/doc/doc_mock.go b/src/m3ninx/doc/doc_mock.go index a44db060d3..897732adab 100644 --- a/src/m3ninx/doc/doc_mock.go +++ b/src/m3ninx/doc/doc_mock.go @@ -27,6 +27,8 @@ package doc import ( "reflect" + "github.com/m3db/m3/src/x/time" + "github.com/golang/mock/gomock" ) @@ -280,3 +282,131 @@ func (mr *MockQueryDocIteratorMockRecorder) Next() *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Next", reflect.TypeOf((*MockQueryDocIterator)(nil).Next)) } + +// MockOnIndexSeries is a mock of OnIndexSeries interface. 
+type MockOnIndexSeries struct { + ctrl *gomock.Controller + recorder *MockOnIndexSeriesMockRecorder +} + +// MockOnIndexSeriesMockRecorder is the mock recorder for MockOnIndexSeries. +type MockOnIndexSeriesMockRecorder struct { + mock *MockOnIndexSeries +} + +// NewMockOnIndexSeries creates a new mock instance. +func NewMockOnIndexSeries(ctrl *gomock.Controller) *MockOnIndexSeries { + mock := &MockOnIndexSeries{ctrl: ctrl} + mock.recorder = &MockOnIndexSeriesMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockOnIndexSeries) EXPECT() *MockOnIndexSeriesMockRecorder { + return m.recorder +} + +// DecrementReaderWriterCount mocks base method. +func (m *MockOnIndexSeries) DecrementReaderWriterCount() { + m.ctrl.T.Helper() + m.ctrl.Call(m, "DecrementReaderWriterCount") +} + +// DecrementReaderWriterCount indicates an expected call of DecrementReaderWriterCount. +func (mr *MockOnIndexSeriesMockRecorder) DecrementReaderWriterCount() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DecrementReaderWriterCount", reflect.TypeOf((*MockOnIndexSeries)(nil).DecrementReaderWriterCount)) +} + +// IfAlreadyIndexedMarkIndexSuccessAndFinalize mocks base method. +func (m *MockOnIndexSeries) IfAlreadyIndexedMarkIndexSuccessAndFinalize(blockStart time.UnixNano) bool { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "IfAlreadyIndexedMarkIndexSuccessAndFinalize", blockStart) + ret0, _ := ret[0].(bool) + return ret0 +} + +// IfAlreadyIndexedMarkIndexSuccessAndFinalize indicates an expected call of IfAlreadyIndexedMarkIndexSuccessAndFinalize. +func (mr *MockOnIndexSeriesMockRecorder) IfAlreadyIndexedMarkIndexSuccessAndFinalize(blockStart interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "IfAlreadyIndexedMarkIndexSuccessAndFinalize", reflect.TypeOf((*MockOnIndexSeries)(nil).IfAlreadyIndexedMarkIndexSuccessAndFinalize), blockStart) +} + +// IndexedForBlockStart mocks base method. +func (m *MockOnIndexSeries) IndexedForBlockStart(indexBlockStart time.UnixNano) bool { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "IndexedForBlockStart", indexBlockStart) + ret0, _ := ret[0].(bool) + return ret0 +} + +// IndexedForBlockStart indicates an expected call of IndexedForBlockStart. +func (mr *MockOnIndexSeriesMockRecorder) IndexedForBlockStart(indexBlockStart interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "IndexedForBlockStart", reflect.TypeOf((*MockOnIndexSeries)(nil).IndexedForBlockStart), indexBlockStart) +} + +// NeedsIndexUpdate mocks base method. +func (m *MockOnIndexSeries) NeedsIndexUpdate(indexBlockStartForWrite time.UnixNano) bool { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "NeedsIndexUpdate", indexBlockStartForWrite) + ret0, _ := ret[0].(bool) + return ret0 +} + +// NeedsIndexUpdate indicates an expected call of NeedsIndexUpdate. +func (mr *MockOnIndexSeriesMockRecorder) NeedsIndexUpdate(indexBlockStartForWrite interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "NeedsIndexUpdate", reflect.TypeOf((*MockOnIndexSeries)(nil).NeedsIndexUpdate), indexBlockStartForWrite) +} + +// OnIndexFinalize mocks base method. +func (m *MockOnIndexSeries) OnIndexFinalize(blockStart time.UnixNano) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "OnIndexFinalize", blockStart) +} + +// OnIndexFinalize indicates an expected call of OnIndexFinalize. 
+func (mr *MockOnIndexSeriesMockRecorder) OnIndexFinalize(blockStart interface{}) *gomock.Call {
+	mr.mock.ctrl.T.Helper()
+	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "OnIndexFinalize", reflect.TypeOf((*MockOnIndexSeries)(nil).OnIndexFinalize), blockStart)
+}
+
+// OnIndexPrepare mocks base method.
+func (m *MockOnIndexSeries) OnIndexPrepare(blockStart time.UnixNano) {
+	m.ctrl.T.Helper()
+	m.ctrl.Call(m, "OnIndexPrepare", blockStart)
+}
+
+// OnIndexPrepare indicates an expected call of OnIndexPrepare.
+func (mr *MockOnIndexSeriesMockRecorder) OnIndexPrepare(blockStart interface{}) *gomock.Call {
+	mr.mock.ctrl.T.Helper()
+	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "OnIndexPrepare", reflect.TypeOf((*MockOnIndexSeries)(nil).OnIndexPrepare), blockStart)
+}
+
+// OnIndexSuccess mocks base method.
+func (m *MockOnIndexSeries) OnIndexSuccess(blockStart time.UnixNano) {
+	m.ctrl.T.Helper()
+	m.ctrl.Call(m, "OnIndexSuccess", blockStart)
+}
+
+// OnIndexSuccess indicates an expected call of OnIndexSuccess.
+func (mr *MockOnIndexSeriesMockRecorder) OnIndexSuccess(blockStart interface{}) *gomock.Call {
+	mr.mock.ctrl.T.Helper()
+	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "OnIndexSuccess", reflect.TypeOf((*MockOnIndexSeries)(nil).OnIndexSuccess), blockStart)
+}
+
+// RelookupAndCheckIsEmpty mocks base method.
+func (m *MockOnIndexSeries) RelookupAndCheckIsEmpty() (bool, bool) {
+	m.ctrl.T.Helper()
+	ret := m.ctrl.Call(m, "RelookupAndCheckIsEmpty")
+	ret0, _ := ret[0].(bool)
+	ret1, _ := ret[1].(bool)
+	return ret0, ret1
+}
+
+// RelookupAndCheckIsEmpty indicates an expected call of RelookupAndCheckIsEmpty.
+func (mr *MockOnIndexSeriesMockRecorder) RelookupAndCheckIsEmpty() *gomock.Call {
+	mr.mock.ctrl.T.Helper()
+	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RelookupAndCheckIsEmpty", reflect.TypeOf((*MockOnIndexSeries)(nil).RelookupAndCheckIsEmpty))
+}
diff --git a/src/m3ninx/doc/document.go b/src/m3ninx/doc/document.go
index 8faefedf80..741c10efec 100644
--- a/src/m3ninx/doc/document.go
+++ b/src/m3ninx/doc/document.go
@@ -89,9 +89,9 @@ func (f Fields) shallowCopy() Fields {
 
 // Metadata represents a document to be indexed.
 type Metadata struct {
-	ID     []byte
-	Fields []Field
-	Ref    interface{}
+	ID            []byte
+	Fields        []Field
+	OnIndexSeries OnIndexSeries
 }
 
 // Get returns the value of the specified field name in the document if it exists.
diff --git a/src/m3ninx/doc/types.go b/src/m3ninx/doc/types.go
index 879172719f..3ed6cb3ffa 100644
--- a/src/m3ninx/doc/types.go
+++ b/src/m3ninx/doc/types.go
@@ -20,6 +20,10 @@
 
 package doc
 
+import (
+	xtime "github.com/m3db/m3/src/x/time"
+)
+
 // MetadataIterator provides an iterator over a collection of document metadata. It is NOT
 // safe for multiple goroutines to invoke methods on a MetadataIterator simultaneously.
 type MetadataIterator interface {
@@ -72,3 +76,56 @@ type QueryDocIterator interface {
 	// worker.
 	Done() bool
 }
+
+// OnIndexSeries provides a set of callback hooks to allow the reverse index
+// to do lifecycle management of any resources retained during indexing.
+type OnIndexSeries interface {
+	// OnIndexSuccess is executed when an entry is successfully indexed. The
+	// provided value for `blockStart` is the blockStart for which the write
+	// was indexed.
+	OnIndexSuccess(blockStart xtime.UnixNano)
+
+	// OnIndexFinalize is executed when the index no longer holds any references
+	// to the provided resources. It can be used to clean up any resources held
+	// during the course of indexing.
+	// `blockStart` is the startTime of the index block for which the write
+	// was attempted.
+	OnIndexFinalize(blockStart xtime.UnixNano)
+
+	// OnIndexPrepare prepares the Entry to be handed off to the indexing sub-system.
+	// NB(prateek): we retain the ref count on the entry while the indexing is pending;
+	// the callback executed on the entry once the indexing is completed releases this
+	// reference.
+	OnIndexPrepare(blockStart xtime.UnixNano)
+
+	// NeedsIndexUpdate returns a bool to indicate if the Entry needs to be indexed
+	// for the provided blockStart. It only allows a single index attempt at a time
+	// for a single entry.
+	// NB(prateek): NeedsIndexUpdate is a CAS, i.e. when this method returns true, it
+	// also sets state on the entry to indicate that a write for the given blockStart
+	// is going to be sent to the index, and other goroutines should not attempt the
+	// same write. Callers are expected to ensure they follow this guideline.
+	// Further, every call to NeedsIndexUpdate that returns true needs to have a
+	// corresponding OnIndexFinalize() call. This is required for correct lifecycle
+	// maintenance.
+	NeedsIndexUpdate(indexBlockStartForWrite xtime.UnixNano) bool
+
+	// IfAlreadyIndexedMarkIndexSuccessAndFinalize marks the blockStart as
+	// successfully indexed and finalizes the entry if it was already indexed,
+	// returning true if so.
+	IfAlreadyIndexedMarkIndexSuccessAndFinalize(
+		blockStart xtime.UnixNano,
+	) bool
+
+	// RelookupAndCheckIsEmpty looks the series up again and returns whether it
+	// is empty, and whether the entry could be found at all.
+	RelookupAndCheckIsEmpty() (bool, bool)
+
+	// DecrementReaderWriterCount decrements the reader/writer ref count on the
+	// entry.
+	DecrementReaderWriterCount()
+
+	// IndexedForBlockStart returns true if the entry has been successfully
+	// indexed for the given index blockStart.
+	IndexedForBlockStart(indexBlockStart xtime.UnixNano) bool
+}
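Reviewer note: the following is a minimal sketch, not part of this change, of how a caller is expected to drive the OnIndexSeries lifecycle hooks introduced above. The package name, the tryIndex helper, and its insert callback are all hypothetical; only the hook ordering follows the contract documented on the interface (a true NeedsIndexUpdate must be paired with OnIndexFinalize, and OnIndexPrepare retains a ref that the completion callbacks release).

// Hypothetical illustration of the doc.OnIndexSeries lifecycle contract.
package example

import (
	"github.com/m3db/m3/src/m3ninx/doc"
	xtime "github.com/m3db/m3/src/x/time"
)

// tryIndex is a hypothetical helper (not an API from this diff). insert
// stands in for whatever enqueues the write into the reverse index.
func tryIndex(s doc.OnIndexSeries, blockStart xtime.UnixNano, insert func() error) {
	// CAS-style claim: only one goroutine wins the right to index this
	// entry for the given blockStart.
	if !s.NeedsIndexUpdate(blockStart) {
		return
	}
	// Retain a ref on the entry while the index insert is pending.
	s.OnIndexPrepare(blockStart)
	if err := insert(); err != nil {
		// Failed attempt: the paired finalize is still required to
		// release the claimed state and the ref.
		s.OnIndexFinalize(blockStart)
		return
	}
	// Success: record the indexed blockStart, then release the ref
	// taken by OnIndexPrepare.
	s.OnIndexSuccess(blockStart)
	s.OnIndexFinalize(blockStart)
}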