Check entry empty state to ensure GC eligible #3634

Merged
merged 37 commits on Aug 19, 2021
Changes from 1 commit
37 commits
de7e6b9
WIP - refactor flush state marking to occur after index flush
rallen090 Jul 28, 2021
9fc4934
WIP - remove unused ns marking func
rallen090 Jul 28, 2021
e19a2ac
WIP - do not remove from index if series not empty
rallen090 Jul 29, 2021
ca70206
WIP - remove flushed block checks
rallen090 Jul 29, 2021
6a2eaf1
Cleanup 1
rallen090 Jul 29, 2021
5f1be6c
Mock gen
rallen090 Jul 29, 2021
de4c165
Fix tests 1
rallen090 Jul 29, 2021
f3117c9
Fix TestBlockWriteBackgroundCompact
rallen090 Aug 2, 2021
6568044
Lint
rallen090 Aug 2, 2021
7c1f06c
WIP - fix index flush conditions
rallen090 Aug 2, 2021
75842e3
WIP - fix index flush conditions 2
rallen090 Aug 2, 2021
ee6e6ea
Add test to verify warm flush ordering
rallen090 Aug 2, 2021
db5552f
Lint
rallen090 Aug 2, 2021
18984ca
Merge remote-tracking branch 'origin/r/index-active-block' into ra/in…
rallen090 Aug 3, 2021
1e81687
Experimental index flush matching
rallen090 Aug 5, 2021
8344c17
Use maps for shard flushes
rallen090 Aug 10, 2021
c6c3b4f
Mark flushed shards based on block size
rallen090 Aug 17, 2021
0b28fe7
Fixup shard marking logic
rallen090 Aug 17, 2021
3bf7412
Mock
rallen090 Aug 17, 2021
dd4dd0a
Fix test
rallen090 Aug 17, 2021
4084517
Fix test TestFlushManagerNamespaceIndexingEnabled
rallen090 Aug 17, 2021
f83f864
Lint
rallen090 Aug 17, 2021
e80e383
Add RelookupAndCheckIsEmpty
rallen090 Aug 18, 2021
db170c5
Mock
rallen090 Aug 18, 2021
db0b9c1
Fix OnIndexSeries ref type
rallen090 Aug 18, 2021
b2a016c
Cleanup feedback
rallen090 Aug 18, 2021
5c5d6e0
Fixing tests 1
rallen090 Aug 18, 2021
a4e1f0a
Fixing tests 2
rallen090 Aug 18, 2021
b5b3713
Mock
rallen090 Aug 18, 2021
cdcab19
Lint
rallen090 Aug 18, 2021
cad7fcd
More fixing tests 1
rallen090 Aug 18, 2021
7070278
Lint
rallen090 Aug 18, 2021
45e9a92
Split warm flush status into data and index
rallen090 Aug 19, 2021
3e58b36
Fix tests
rallen090 Aug 19, 2021
548591f
Fixing tests
rallen090 Aug 19, 2021
c68120b
Fixing tests 2
rallen090 Aug 19, 2021
6651096
For bootstrapping just use presence of datafiles
rallen090 Aug 19, 2021
116 changes: 18 additions & 98 deletions src/dbnode/storage/flush.go
@@ -54,20 +54,6 @@ const (
flushManagerIndexFlushInProgress
)

type namespaceFlushes map[string]namespaceFlush

type namespaceFlush struct {
namespace databaseNamespace
shardFlushes shardFlushes
}

type shardFlushes map[shardFlushKey]databaseShard

type shardFlushKey struct {
shardID uint32
blockStart xtime.UnixNano
}

type flushManagerMetrics struct {
isFlushing tally.Gauge
isSnapshotting tally.Gauge
@@ -158,8 +144,7 @@ func (m *flushManager) Flush(startTime xtime.UnixNano) error {
// will attempt to snapshot blocks w/ unflushed data which would be wasteful if
// the block is already flushable.
multiErr := xerrors.NewMultiError()
dataFlushes, err := m.dataWarmFlush(namespaces, startTime)
if err != nil {
if err := m.dataWarmFlush(namespaces, startTime); err != nil {
multiErr = multiErr.Add(err)
}

@@ -174,69 +159,26 @@ func (m *flushManager) Flush(startTime xtime.UnixNano) error {
multiErr = multiErr.Add(fmt.Errorf("error rotating commitlog in mediator tick: %v", err))
}

indexFlushes, err := m.indexFlush(namespaces)
if err != nil {
if err := m.indexFlush(namespaces); err != nil {
multiErr = multiErr.Add(err)
}

err = multiErr.FinalError()

// Mark all flushed shards as such.
// If index is not enabled, then a shard+blockStart is "flushed" if the data has been flushed.
// If index is enabled, then a shard+blockStart is "flushed" if the data AND index has been flushed.
for _, n := range namespaces {
var (
indexEnabled = n.Options().IndexOptions().Enabled()
flushed shardFlushes
)
if indexEnabled {
flushesForNs, ok := indexFlushes[n.ID().String()]
if !ok {
continue
}
flushed = flushesForNs.shardFlushes
} else {
flushesForNs, ok := dataFlushes[n.ID().String()]
if !ok {
continue
}
flushed = flushesForNs.shardFlushes
}

for k, v := range flushed {
// Block sizes for data and index can differ and so if we are driving the flushing by
// the index blockStarts, we must expand them to mark all containing data blockStarts.
// E.g. if blockSize == 2h and indexBlockSize == 4h and the flushed index time is 6:00pm,
// we should mark as flushed [6:00pm, 8:00pm].
if indexEnabled {
blockSize := n.Options().RetentionOptions().BlockSize()
indexBlockSize := n.Options().IndexOptions().BlockSize()
for start := k.blockStart; start < k.blockStart.Add(indexBlockSize); start = start.Add(blockSize) {
v.MarkWarmFlushStateSuccessOrError(start, err)
}
} else {
v.MarkWarmFlushStateSuccessOrError(k.blockStart, err)
}
}
}

return err
return multiErr.FinalError()
}

func (m *flushManager) dataWarmFlush(
namespaces []databaseNamespace,
startTime xtime.UnixNano,
) (namespaceFlushes, error) {
) error {
flushPersist, err := m.pm.StartFlushPersist()
if err != nil {
return nil, err
return err
}

m.setState(flushManagerFlushInProgress)
var (
start = m.nowFn()
multiErr = xerrors.NewMultiError()
allFlushes = make(map[string]namespaceFlush)
start = m.nowFn()
multiErr = xerrors.NewMultiError()
)
for _, ns := range namespaces {
// Flush first because we will only snapshot if there are no outstanding flushes.
@@ -245,11 +187,9 @@ func (m *flushManager) dataWarmFlush(
multiErr = multiErr.Add(err)
continue
}
flush, err := m.flushNamespaceWithTimes(ns, flushTimes, flushPersist)
if err != nil {
if err := m.flushNamespaceWithTimes(ns, flushTimes, flushPersist); err != nil {
multiErr = multiErr.Add(err)
}
allFlushes[ns.ID().String()] = flush
}

err = flushPersist.DoneFlush()
@@ -258,7 +198,7 @@
}

m.metrics.dataWarmFlushDuration.Record(m.nowFn().Sub(start))
return allFlushes, multiErr.FinalError()
return multiErr.FinalError()
}

func (m *flushManager) dataSnapshot(
@@ -312,17 +252,16 @@ func (m *flushManager) dataSnapshot(

func (m *flushManager) indexFlush(
namespaces []databaseNamespace,
) (namespaceFlushes, error) {
) error {
indexFlush, err := m.pm.StartIndexPersist()
if err != nil {
return nil, err
return err
}

m.setState(flushManagerIndexFlushInProgress)
var (
start = m.nowFn()
multiErr = xerrors.NewMultiError()
namespaceFlushes = make(map[string]namespaceFlush)
start = m.nowFn()
multiErr = xerrors.NewMultiError()
)
for _, ns := range namespaces {
var (
@@ -333,20 +272,14 @@ func (m *flushManager) indexFlush(
continue
}

flushes, err := ns.FlushIndex(indexFlush)
if err != nil {
if err := ns.FlushIndex(indexFlush); err != nil {
multiErr = multiErr.Add(err)
} else {
namespaceFlushes[ns.ID().String()] = namespaceFlush{
namespace: ns,
shardFlushes: flushes,
}
}
}
multiErr = multiErr.Add(indexFlush.DoneIndex())

m.metrics.indexFlushDuration.Record(m.nowFn().Sub(start))
return namespaceFlushes, multiErr.FinalError()
return multiErr.FinalError()
}

func (m *flushManager) Report() {
@@ -430,31 +363,18 @@ func (m *flushManager) flushNamespaceWithTimes(
ns databaseNamespace,
times []xtime.UnixNano,
flushPreparer persist.FlushPreparer,
) (namespaceFlush, error) {
flushes := make(shardFlushes)
) error {
multiErr := xerrors.NewMultiError()
for _, t := range times {
// NB(xichen): we still want to proceed if a namespace fails to flush its data.
// Probably want to emit a counter here, but for now just log it.
shards, err := ns.WarmFlush(t, flushPreparer)
if err != nil {
if err := ns.WarmFlush(t, flushPreparer); err != nil {
detailedErr := fmt.Errorf("namespace %s failed to flush data: %v",
ns.ID().String(), err)
multiErr = multiErr.Add(detailedErr)
continue
}

for _, s := range shards {
flushes[shardFlushKey{
shardID: s.ID(),
blockStart: t,
}] = s
}
}
return namespaceFlush{
namespace: ns,
shardFlushes: flushes,
}, multiErr.FinalError()
return multiErr.FinalError()
}

func (m *flushManager) LastSuccessfulSnapshotStartTime() (xtime.UnixNano, bool) {
16 changes: 5 additions & 11 deletions src/dbnode/storage/flush_test.go
@@ -321,12 +321,10 @@ func TestFlushManagerSkipNamespaceIndexingDisabled(t *testing.T) {
ns.EXPECT().Options().Return(nsOpts).AnyTimes()
ns.EXPECT().ID().Return(defaultTestNs1ID).AnyTimes()
ns.EXPECT().NeedsFlush(gomock.Any(), gomock.Any()).Return(true, nil).AnyTimes()
ns.EXPECT().WarmFlush(gomock.Any(), gomock.Any()).Return([]databaseShard{s1, s2}, nil).AnyTimes()
ns.EXPECT().WarmFlush(gomock.Any(), gomock.Any()).Return(nil).AnyTimes()
ns.EXPECT().Snapshot(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).AnyTimes()
s1.EXPECT().ID().Return(uint32(1)).AnyTimes()
s2.EXPECT().ID().Return(uint32(2)).AnyTimes()
s1.EXPECT().MarkWarmFlushStateSuccessOrError(gomock.Any(), nil).AnyTimes()
s2.EXPECT().MarkWarmFlushStateSuccessOrError(gomock.Any(), nil).AnyTimes()

var (
mockFlushPersist = persist.NewMockFlushPreparer(ctrl)
@@ -377,17 +375,13 @@ func TestFlushManagerNamespaceIndexingEnabled(t *testing.T) {
// Validate that the flush state is marked as successful only AFTER all prerequisite steps have been run.
// Order is important to avoid any edge case where data is GCed from memory without all flushing operations
// being completed.
mockFlushedShards := shardFlushes{
shardFlushKey{shardID: s1.ID(), blockStart: xtime.Now().Add(time.Minute * 1)}: s1,
shardFlushKey{shardID: s2.ID(), blockStart: xtime.Now().Add(time.Minute * 1)}: s2,
}
steps := make([]*gomock.Call, 0)
steps = append(steps,
ns.EXPECT().WarmFlush(gomock.Any(), gomock.Any()).Return([]databaseShard{s1, s2}, nil).Times(blocks),
ns.EXPECT().WarmFlush(gomock.Any(), gomock.Any()).Return(nil).Times(blocks),
ns.EXPECT().Snapshot(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).AnyTimes(),
ns.EXPECT().FlushIndex(gomock.Any()).Return(mockFlushedShards, nil),
s1.EXPECT().MarkWarmFlushStateSuccessOrError(gomock.Any(), nil),
s2.EXPECT().MarkWarmFlushStateSuccessOrError(gomock.Any(), nil),
ns.EXPECT().FlushIndex(gomock.Any()).Return(nil),
s1.EXPECT().MarkWarmIndexFlushStateSuccessOrError(gomock.Any(), nil),
s2.EXPECT().MarkWarmIndexFlushStateSuccessOrError(gomock.Any(), nil),
)
gomock.InOrder(steps...)

7 changes: 6 additions & 1 deletion src/dbnode/storage/fs.go
@@ -39,12 +39,17 @@ const (
fileOpFailed
)

type warmStatus struct {
DataFlushed fileOpStatus
IndexFlushed fileOpStatus
}

type fileOpState struct {
// WarmStatus is the status of data persistence for WarmWrites only.
// Each block will only be warm-flushed once, so not keeping track of a
// version here is okay. This is used in the buffer Tick to determine when
// a warm bucket is evictable from memory.
WarmStatus fileOpStatus
WarmStatus warmStatus
// ColdVersionRetrievable keeps track of data persistence for ColdWrites only.
// Each block can be cold-flushed multiple times, so this tracks which
// version of the flush completed successfully. This is ultimately used in
61 changes: 40 additions & 21 deletions src/dbnode/storage/index.go
@@ -1007,15 +1007,15 @@ func (i *nsIndex) tickingBlocks(
func (i *nsIndex) WarmFlush(
flush persist.IndexFlush,
shards []databaseShard,
) (shardFlushes, error) {
) error {
if len(shards) == 0 {
// No-op if no shards currently owned.
return nil, nil
return nil
}

flushable, err := i.flushableBlocks(shards, series.WarmWrite)
if err != nil {
return nil, err
return err
}

// Determine the current flush indexing concurrency.
@@ -1029,7 +1029,7 @@ func (i *nsIndex) WarmFlush(

builder, err := builder.NewBuilderFromDocuments(builderOpts)
if err != nil {
return nil, err
return err
}
defer builder.Close()

@@ -1039,11 +1039,10 @@ func (i *nsIndex) WarmFlush(
defer i.metrics.flushIndexingConcurrency.Update(0)

var evicted int
flushes := make(shardFlushes)
for _, block := range flushable {
immutableSegments, err := i.flushBlock(flush, block, shards, builder)
if err != nil {
return nil, err
return err
}
// Make a result that covers the entire time ranges for the
// block for each shard
@@ -1060,7 +1059,7 @@ func (i *nsIndex) WarmFlush(
results := result.NewIndexBlockByVolumeType(block.StartTime())
results.SetBlock(idxpersist.DefaultIndexVolumeType, blockResult)
if err := block.AddResults(results); err != nil {
return nil, err
return err
}

evicted++
@@ -1074,18 +1073,16 @@ func (i *nsIndex) WarmFlush(
zap.Error(err),
zap.Time("blockStart", block.StartTime().ToTime()),
)
continue
}

for _, s := range shards {
flushes[shardFlushKey{
shardID: s.ID(),
blockStart: block.StartTime(),
}] = s
for _, t := range i.blockStartsFromIndexBlockStart(block.StartTime()) {
for _, s := range shards {
s.MarkWarmIndexFlushStateSuccessOrError(t, err)
Collaborator:
So, IIUC, blockStartsFromIndexBlockStart returns data block starts between index block start and index block start + index block size. This is important because index block size >= data block size. If that's true, why do we need to MarkWarmIndexFlushStateSuccessOrError for each data block start time?

Collaborator (Author):
That's correct. The reason is that MarkWarmIndexFlushStateSuccessOrError still marks data blockStarts (not index blockStarts), but each block now carries a flag for both data and index. The state we are tracking is always at the data block size, not the index block size. E.g. with a 1h block and a 2h indexBlock, in-mem we now have:

1pm: {dataFlushed: bool, indexFlushed: bool}
2pm: {dataFlushed: bool, indexFlushed: bool}
3pm: {dataFlushed: bool, indexFlushed: bool}
4pm: {dataFlushed: bool, indexFlushed: bool}
...

When determining whether a given block is eligible for GC, we check both dataFlushed && indexFlushed. So when an index flush occurs, it actually covers 2 data blocks in this case.

This is a somewhat confusing nuance, though, so if you have any suggestions for making this easier to understand, let me know.
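
To illustrate the idea, here is a minimal, self-contained sketch: the warmStatus struct mirrors the one this PR adds to fs.go, while the fileOpStatus constants and the isWarmEvictable helper are hypothetical stand-ins (not the actual shard code), shown only to make the dataFlushed && indexFlushed check concrete.

```go
// Sketch only: warmStatus mirrors the struct added to fs.go in this PR;
// fileOpStatus values and isWarmEvictable are re-declared locally for the example.
package main

import "fmt"

type fileOpStatus int

const (
	fileOpNotStarted fileOpStatus = iota
	fileOpSuccess
	fileOpFailed
)

// warmStatus tracks warm-flush state per data blockStart, split into data and index.
type warmStatus struct {
	DataFlushed  fileOpStatus
	IndexFlushed fileOpStatus
}

// isWarmEvictable: a data block is GC-eligible only once both its data and the
// index block covering it have been warm-flushed successfully.
func isWarmEvictable(s warmStatus) bool {
	return s.DataFlushed == fileOpSuccess && s.IndexFlushed == fileOpSuccess
}

func main() {
	// 1h data blocks, 2h index blocks: flushing one index block marks
	// IndexFlushed on both data blockStarts it covers (e.g. 1pm and 2pm).
	blocks := map[string]warmStatus{
		"1pm": {DataFlushed: fileOpSuccess, IndexFlushed: fileOpSuccess},
		"2pm": {DataFlushed: fileOpSuccess, IndexFlushed: fileOpSuccess},
		"3pm": {DataFlushed: fileOpSuccess, IndexFlushed: fileOpNotStarted},
	}
	for t, s := range blocks {
		fmt.Printf("%s evictable: %v\n", t, isWarmEvictable(s))
	}
}
```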

Collaborator:
Yeah, it may be worth adding a comment here just to clarify.

}
}
}
i.metrics.blocksEvictedMutableSegments.Inc(int64(evicted))
return flushes, nil
return nil
}

func (i *nsIndex) ColdFlush(shards []databaseShard) (OnColdFlushDone, error) {
@@ -1115,6 +1112,18 @@ func (i *nsIndex) ColdFlush(shards []databaseShard) (OnColdFlushDone, error) {
}, nil
}

// WarmFlushedBlockStarts returns all index blockStarts which have been flushed to disk.
func (i *nsIndex) WarmFlushedBlockStarts() []xtime.UnixNano {
flushed := make([]xtime.UnixNano, 0)
infoFiles := i.readInfoFilesAsMap()
for blockStart := range infoFiles {
if i.hasIndexWarmFlushedToDisk(infoFiles, blockStart) {
flushed = append(flushed, blockStart)
}
}
return flushed
}

func (i *nsIndex) readInfoFilesAsMap() map[xtime.UnixNano]fs.ReadIndexInfoFileResult {
fsOpts := i.opts.CommitLogOptions().FilesystemOptions()
infoFiles := i.readIndexInfoFilesFn(fs.ReadIndexInfoFilesOptions{
@@ -1198,18 +1207,15 @@ func (i *nsIndex) canFlushBlockWithRLock(
Debug("skipping index cold flush due to shard not bootstrapped yet")
continue
}
start := blockStart
end := blockStart.Add(i.blockSize)
dataBlockSize := i.nsMetadata.Options().RetentionOptions().BlockSize()
for t := start; t.Before(end); t = t.Add(dataBlockSize) {

for _, t := range i.blockStartsFromIndexBlockStart(blockStart) {
flushState, err := shard.FlushState(t)
if err != nil {
return false, err
}

// Skip if the data flushing failed. We mark as "success" only once both
// data and index are flushed.
if flushState.WarmStatus == fileOpFailed {
// Skip if the data flushing failed. Data flushing precedes index flushing.
if flushState.WarmStatus.DataFlushed != fileOpSuccess {
return false, nil
}
}
Expand All @@ -1218,6 +1224,19 @@ func (i *nsIndex) canFlushBlockWithRLock(
return true, nil
}

// blockStartsFromIndexBlockStart returns the possibly many blockStarts that exist within
// a given index block (since index block size >= data block size).
func (i *nsIndex) blockStartsFromIndexBlockStart(blockStart xtime.UnixNano) []xtime.UnixNano {
start := blockStart
end := blockStart.Add(i.blockSize)
dataBlockSize := i.nsMetadata.Options().RetentionOptions().BlockSize()
blockStarts := make([]xtime.UnixNano, 0)
for t := start; t.Before(end); t = t.Add(dataBlockSize) {
blockStarts = append(blockStarts, t)
}
return blockStarts
}

func (i *nsIndex) hasIndexWarmFlushedToDisk(
infoFiles map[xtime.UnixNano]fs.ReadIndexInfoFileResult,
blockStart xtime.UnixNano,