Skip to content

Commit

Permalink
Fix active block test
Browse files Browse the repository at this point in the history
  • Loading branch information
rallen090 committed Aug 26, 2021
1 parent 5131acd commit ad10d33
Show file tree
Hide file tree
Showing 6 changed files with 40 additions and 0 deletions.
11 changes: 11 additions & 0 deletions src/dbnode/integration/index_active_block_rotate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,17 @@ func TestIndexActiveBlockRotate(t *testing.T) {

expectedNumGCSeries := prevNumGCSeries + numWrites - minCompactSize
gcSeries := xclock.WaitUntil(func() bool {
// Run background compaction path to check that this path correctly
// identifies these series as "empty" post the warm flush above.
// Note: typically this path gets called from just WriteBatch calls but
// for this test we just explcitly invoke the background compact path.
for _, ns := range testSetup.DB().Namespaces() {
idx, err := ns.Index()
require.NoError(t, err)

idx.BackgroundCompact()
}

numGCSeries := counterValue(t, scope, metricGCSeries)
return numGCSeries >= expectedNumGCSeries
}, defaultTimeout)
Expand Down
10 changes: 10 additions & 0 deletions src/dbnode/storage/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -1125,6 +1125,16 @@ func (i *nsIndex) WarmFlushBlockStarts() []xtime.UnixNano {
return flushed
}

// BackgroundCompact background compacts eligible segments.
func (i *nsIndex) BackgroundCompact() {
if i.activeBlock != nil {
i.activeBlock.BackgroundCompact()
}
for _, b := range i.state.blocksByTime {
b.BackgroundCompact()
}
}

func (i *nsIndex) readInfoFilesAsMap() map[xtime.UnixNano][]fs.ReadIndexInfoFileResult {
fsOpts := i.opts.CommitLogOptions().FilesystemOptions()
infoFiles := i.readIndexInfoFilesFn(fs.ReadIndexInfoFilesOptions{
Expand Down
5 changes: 5 additions & 0 deletions src/dbnode/storage/index/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,11 @@ func (b *block) EndTime() xtime.UnixNano {
return b.blockEnd
}

// BackgroundCompact background compacts eligible segments.
func (b *block) BackgroundCompact() {
b.mutableSegments.BackgroundCompact()
}

func (b *block) WriteBatch(inserts *WriteBatch) (WriteBatchResult, error) {
b.RLock()
if !b.writesAcceptedWithRLock() {
Expand Down
8 changes: 8 additions & 0 deletions src/dbnode/storage/index/mutable_segments.go
Original file line number Diff line number Diff line change
Expand Up @@ -384,6 +384,14 @@ func (m *mutableSegments) maybeBackgroundCompactWithLock() {
m.backgroundCompactWithLock()
}

// BackgroundCompact background compacts eligible segments.
func (m *mutableSegments) BackgroundCompact() {
m.Lock()
defer m.Unlock()

m.backgroundCompactWithLock()
}

func (m *mutableSegments) backgroundCompactWithLock() {
// Create a logical plan.
segs := make([]compaction.Segment, 0, len(m.backgroundSegments))
Expand Down
3 changes: 3 additions & 0 deletions src/dbnode/storage/index/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -449,6 +449,9 @@ type Block interface {
// MemorySegmentsData returns all in memory segments data.
MemorySegmentsData(ctx context.Context) ([]fst.SegmentData, error)

// BackgroundCompact background compacts eligible segments.
BackgroundCompact()

// Close will release any held resources and close the Block.
Close() error
}
Expand Down
3 changes: 3 additions & 0 deletions src/dbnode/storage/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -780,6 +780,9 @@ type NamespaceIndex interface {
// DebugMemorySegments allows for debugging memory segments.
DebugMemorySegments(opts DebugMemorySegmentsOptions) error

// BackgroundCompact background compacts eligible segments.
BackgroundCompact()

// Close will release the index resources and close the index.
Close() error
}
Expand Down

0 comments on commit ad10d33

Please sign in to comment.