Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[DBnode] Add Load() API to shard #1831

Merged
merged 23 commits into from
Jul 29, 2019
Merged
Show file tree
Hide file tree
Changes from 22 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions src/dbnode/integration/truncate_namespace_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,8 +114,7 @@ func TestTruncateNamespace(t *testing.T) {

log.Sugar().Debugf("fetching data from namespace %s again", testNamespaces[0])
res, err = testSetup.fetch(fetchReq)
require.NoError(t, err)
require.Equal(t, 0, len(res))
require.Error(t, err)

log.Sugar().Debugf("fetching data from a different namespace %s", testNamespaces[1])
fetchReq.ID = "bar"
Expand Down
10 changes: 5 additions & 5 deletions src/dbnode/storage/cleanup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -295,7 +295,7 @@ func TestCleanupManagerCleanupCommitlogsAndSnapshots(t *testing.T) {
ns := NewMockdatabaseNamespace(ctrl)
ns.EXPECT().ID().Return(ident.StringID(fmt.Sprintf("ns%d", i))).AnyTimes()
ns.EXPECT().Options().Return(nsOpts).AnyTimes()
ns.EXPECT().NeedsFlush(gomock.Any(), gomock.Any()).Return(false).AnyTimes()
ns.EXPECT().NeedsFlush(gomock.Any(), gomock.Any()).Return(false, nil).AnyTimes()
ns.EXPECT().GetOwnedShards().Return(shards).AnyTimes()
namespaces = append(namespaces, ns)
}
Expand Down Expand Up @@ -347,7 +347,7 @@ func TestCleanupManagerNamespaceCleanup(t *testing.T) {
ns := NewMockdatabaseNamespace(ctrl)
ns.EXPECT().ID().Return(ident.StringID("ns")).AnyTimes()
ns.EXPECT().Options().Return(nsOpts).AnyTimes()
ns.EXPECT().NeedsFlush(gomock.Any(), gomock.Any()).Return(false).AnyTimes()
ns.EXPECT().NeedsFlush(gomock.Any(), gomock.Any()).Return(false, nil).AnyTimes()
ns.EXPECT().GetOwnedShards().Return(nil).AnyTimes()

idx := NewMocknamespaceIndex(ctrl)
Expand Down Expand Up @@ -376,7 +376,7 @@ func TestCleanupManagerDoesntNeedCleanup(t *testing.T) {
for range namespaces {
ns := NewMockdatabaseNamespace(ctrl)
ns.EXPECT().Options().Return(nsOpts).AnyTimes()
ns.EXPECT().NeedsFlush(gomock.Any(), gomock.Any()).Return(false).AnyTimes()
ns.EXPECT().NeedsFlush(gomock.Any(), gomock.Any()).Return(false, nil).AnyTimes()
namespaces = append(namespaces, ns)
}
db := newMockdatabase(ctrl, namespaces...)
Expand Down Expand Up @@ -411,7 +411,7 @@ func TestCleanupDataAndSnapshotFileSetFiles(t *testing.T) {
shard.EXPECT().ID().Return(uint32(0)).AnyTimes()
ns.EXPECT().GetOwnedShards().Return([]databaseShard{shard}).AnyTimes()
ns.EXPECT().ID().Return(ident.StringID("nsID")).AnyTimes()
ns.EXPECT().NeedsFlush(gomock.Any(), gomock.Any()).Return(false).AnyTimes()
ns.EXPECT().NeedsFlush(gomock.Any(), gomock.Any()).Return(false, nil).AnyTimes()
namespaces := []databaseNamespace{ns}

db := newMockdatabase(ctrl, namespaces...)
Expand Down Expand Up @@ -440,7 +440,7 @@ func TestDeleteInactiveDataAndSnapshotFileSetFiles(t *testing.T) {
shard.EXPECT().ID().Return(uint32(0)).AnyTimes()
ns.EXPECT().GetOwnedShards().Return([]databaseShard{shard}).AnyTimes()
ns.EXPECT().ID().Return(ident.StringID("nsID")).AnyTimes()
ns.EXPECT().NeedsFlush(gomock.Any(), gomock.Any()).Return(false).AnyTimes()
ns.EXPECT().NeedsFlush(gomock.Any(), gomock.Any()).Return(false, nil).AnyTimes()
namespaces := []databaseNamespace{ns}

db := newMockdatabase(ctrl, namespaces...)
Expand Down
46 changes: 36 additions & 10 deletions src/dbnode/storage/flush.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,12 @@ func (m *flushManager) dataWarmFlush(
multiErr := xerrors.NewMultiError()
for _, ns := range namespaces {
// Flush first because we will only snapshot if there are no outstanding flushes
flushTimes := m.namespaceFlushTimes(ns, tickStart)
flushTimes, err := m.namespaceFlushTimes(ns, tickStart)
if err != nil {
multiErr = multiErr.Add(fmt.Errorf(
"error determining namespace flush times for ns: %s, err: %v", ns.ID().String(), err))
continue
}
shardBootstrapTimes, ok := dbBootstrapStateAtTickStart.NamespaceBootstrapStates[ns.ID().String()]
if !ok {
// Could happen if namespaces are added / removed.
Expand Down Expand Up @@ -234,7 +239,14 @@ func (m *flushManager) dataSnapshot(
multiErr = xerrors.NewMultiError()
)
for _, ns := range namespaces {
snapshotBlockStarts := m.namespaceSnapshotTimes(ns, tickStart)
snapshotBlockStarts, err := m.namespaceSnapshotTimes(ns, tickStart)
if err != nil {
detailedErr := fmt.Errorf(
"namespace %s failed to determine snapshot times: %v",
ns.ID().String(), err)
multiErr = multiErr.Add(detailedErr)
continue
}

if len(snapshotBlockStarts) > maxBlocksSnapshottedByNamespace {
maxBlocksSnapshottedByNamespace = len(snapshotBlockStarts)
Expand All @@ -244,9 +256,11 @@ func (m *flushManager) dataSnapshot(
snapshotBlockStart, tickStart, snapshotPersist)

if err != nil {
detailedErr := fmt.Errorf("namespace %s failed to snapshot data: %v",
ns.ID().String(), err)
detailedErr := fmt.Errorf(
"namespace %s failed to snapshot data for blockStart %s: %v",
ns.ID().String(), snapshotBlockStart.String(), err)
multiErr = multiErr.Add(detailedErr)
continue
}
}
}
Expand Down Expand Up @@ -327,20 +341,26 @@ func (m *flushManager) flushRange(rOpts retention.Options, t time.Time) (time.Ti
return retention.FlushTimeStart(rOpts, t), retention.FlushTimeEnd(rOpts, t)
}

func (m *flushManager) namespaceFlushTimes(ns databaseNamespace, curr time.Time) []time.Time {
func (m *flushManager) namespaceFlushTimes(ns databaseNamespace, curr time.Time) ([]time.Time, error) {
var (
rOpts = ns.Options().RetentionOptions()
blockSize = rOpts.BlockSize()
earliest, latest = m.flushRange(rOpts, curr)
)

candidateTimes := timesInRange(earliest, latest, blockSize)
var loopErr error
return filterTimes(candidateTimes, func(t time.Time) bool {
return ns.NeedsFlush(t, t)
})
needsFlush, err := ns.NeedsFlush(t, t)
if err != nil {
loopErr = err
return false
}
return needsFlush
}), loopErr
}

func (m *flushManager) namespaceSnapshotTimes(ns databaseNamespace, curr time.Time) []time.Time {
func (m *flushManager) namespaceSnapshotTimes(ns databaseNamespace, curr time.Time) ([]time.Time, error) {
var (
rOpts = ns.Options().RetentionOptions()
blockSize = rOpts.BlockSize()
Expand All @@ -356,10 +376,16 @@ func (m *flushManager) namespaceSnapshotTimes(ns databaseNamespace, curr time.Ti
)

candidateTimes := timesInRange(earliest, latest, blockSize)
var loopErr error
return filterTimes(candidateTimes, func(t time.Time) bool {
// Snapshot anything that is unflushed.
return ns.NeedsFlush(t, t)
})
needsFlush, err := ns.NeedsFlush(t, t)
if err != nil {
loopErr = err
return false
}
return needsFlush
}), loopErr
}

// flushWithTime flushes in-memory data for a given namespace, at a given
Expand Down
25 changes: 14 additions & 11 deletions src/dbnode/storage/flush_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,7 @@ func TestFlushManagerSkipNamespaceIndexingDisabled(t *testing.T) {
ns := NewMockdatabaseNamespace(ctrl)
ns.EXPECT().Options().Return(nsOpts).AnyTimes()
ns.EXPECT().ID().Return(defaultTestNs1ID).AnyTimes()
ns.EXPECT().NeedsFlush(gomock.Any(), gomock.Any()).Return(true).AnyTimes()
ns.EXPECT().NeedsFlush(gomock.Any(), gomock.Any()).Return(true, nil).AnyTimes()
ns.EXPECT().WarmFlush(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).AnyTimes()
ns.EXPECT().ColdFlush(gomock.Any()).Return(nil).AnyTimes()
ns.EXPECT().Snapshot(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).AnyTimes()
Expand Down Expand Up @@ -329,7 +329,7 @@ func TestFlushManagerNamespaceIndexingEnabled(t *testing.T) {
ns := NewMockdatabaseNamespace(ctrl)
ns.EXPECT().Options().Return(nsOpts).AnyTimes()
ns.EXPECT().ID().Return(defaultTestNs1ID).AnyTimes()
ns.EXPECT().NeedsFlush(gomock.Any(), gomock.Any()).Return(true).AnyTimes()
ns.EXPECT().NeedsFlush(gomock.Any(), gomock.Any()).Return(true, nil).AnyTimes()
ns.EXPECT().WarmFlush(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).AnyTimes()
ns.EXPECT().ColdFlush(gomock.Any()).Return(nil).AnyTimes()
ns.EXPECT().Snapshot(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).AnyTimes()
Expand Down Expand Up @@ -418,8 +418,9 @@ func TestFlushManagerNamespaceFlushTimesNoNeedFlush(t *testing.T) {
fm, ns1, _, _ := newMultipleFlushManagerNeedsFlush(t, ctrl)
now := time.Now()

ns1.EXPECT().NeedsFlush(gomock.Any(), gomock.Any()).Return(false).AnyTimes()
flushTimes := fm.namespaceFlushTimes(ns1, now)
ns1.EXPECT().NeedsFlush(gomock.Any(), gomock.Any()).Return(false, nil).AnyTimes()
flushTimes, err := fm.namespaceFlushTimes(ns1, now)
require.NoError(t, err)
require.Empty(t, flushTimes)
}

Expand All @@ -430,8 +431,9 @@ func TestFlushManagerNamespaceFlushTimesAllNeedFlush(t *testing.T) {
fm, ns1, _, _ := newMultipleFlushManagerNeedsFlush(t, ctrl)
now := time.Now()

ns1.EXPECT().NeedsFlush(gomock.Any(), gomock.Any()).Return(true).AnyTimes()
times := fm.namespaceFlushTimes(ns1, now)
ns1.EXPECT().NeedsFlush(gomock.Any(), gomock.Any()).Return(true, nil).AnyTimes()
times, err := fm.namespaceFlushTimes(ns1, now)
require.NoError(t, err)
sort.Sort(timesInOrder(times))

blockSize := ns1.Options().RetentionOptions().BlockSize()
Expand Down Expand Up @@ -462,15 +464,16 @@ func TestFlushManagerNamespaceFlushTimesSomeNeedFlush(t *testing.T) {

// skip 1/3 of input
if i%3 == 0 {
ns1.EXPECT().NeedsFlush(st, st).Return(false)
ns1.EXPECT().NeedsFlush(st, st).Return(false, nil)
continue
}

ns1.EXPECT().NeedsFlush(st, st).Return(true)
ns1.EXPECT().NeedsFlush(st, st).Return(true, nil)
expectedTimes = append(expectedTimes, st)
}

times := fm.namespaceFlushTimes(ns1, now)
times, err := fm.namespaceFlushTimes(ns1, now)
require.NoError(t, err)
require.NotEmpty(t, times)
sort.Sort(timesInOrder(times))
require.Equal(t, expectedTimes, times)
Expand All @@ -494,7 +497,7 @@ func TestFlushManagerFlushSnapshot(t *testing.T) {

for i := 0; i < num; i++ {
st := start.Add(time.Duration(i) * blockSize)
ns.EXPECT().NeedsFlush(st, st).Return(false)
ns.EXPECT().NeedsFlush(st, st).Return(false, nil)
}

ns.EXPECT().ColdFlush(gomock.Any())
Expand All @@ -503,7 +506,7 @@ func TestFlushManagerFlushSnapshot(t *testing.T) {
num = numIntervals(start, snapshotEnd, blockSize)
for i := 0; i < num; i++ {
st := start.Add(time.Duration(i) * blockSize)
ns.EXPECT().NeedsFlush(st, st).Return(true)
ns.EXPECT().NeedsFlush(st, st).Return(true, nil)
ns.EXPECT().Snapshot(st, now, gomock.Any())
}
}
Expand Down
6 changes: 5 additions & 1 deletion src/dbnode/storage/fs_merge_with_mem.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,11 @@ func (m *fsMergeWithMem) fetchBlocks(
nsCtx namespace.Context,
) ([]xio.BlockReader, bool, error) {
startTime := blockStart.ToTime()
nextVersion := m.retriever.RetrievableBlockColdVersion(startTime) + 1
currVersion, err := m.retriever.RetrievableBlockColdVersion(startTime)
if err != nil {
return nil, false, err
}
nextVersion := currVersion + 1

blocks, err := m.shard.FetchBlocksForColdFlush(ctx, id, startTime, nextVersion, nsCtx)
if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions src/dbnode/storage/fs_merge_with_mem_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func TestRead(t *testing.T) {
ctx := context.NewContext()
nsCtx := namespace.Context{}
fetchedBlocks := []xio.BlockReader{xio.BlockReader{}}
retriever.EXPECT().RetrievableBlockColdVersion(gomock.Any()).Return(version).AnyTimes()
retriever.EXPECT().RetrievableBlockColdVersion(gomock.Any()).Return(version, nil).AnyTimes()

dirtySeries := newDirtySeriesMap(dirtySeriesMapOptions{})
dirtySeriesToWrite := make(map[xtime.UnixNano]*idList)
Expand Down Expand Up @@ -129,7 +129,7 @@ func TestForEachRemaining(t *testing.T) {
ctx := context.NewContext()
nsCtx := namespace.Context{}
fetchedBlocks := []xio.BlockReader{xio.BlockReader{}}
retriever.EXPECT().RetrievableBlockColdVersion(gomock.Any()).Return(version).AnyTimes()
retriever.EXPECT().RetrievableBlockColdVersion(gomock.Any()).Return(version, nil).AnyTimes()

dirtySeries := newDirtySeriesMap(dirtySeriesMapOptions{})
dirtySeriesToWrite := make(map[xtime.UnixNano]*idList)
Expand Down
20 changes: 14 additions & 6 deletions src/dbnode/storage/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -799,7 +799,11 @@ func (i *nsIndex) flushableBlocks(
}
flushable := make([]index.Block, 0, len(i.state.blocksByTime))
for _, block := range i.state.blocksByTime {
if !i.canFlushBlock(block, shards) {
canFlush, err := i.canFlushBlock(block, shards)
if err != nil {
return nil, err
}
if !canFlush {
continue
}
flushable = append(flushable, block)
Expand All @@ -810,25 +814,29 @@ func (i *nsIndex) flushableBlocks(
func (i *nsIndex) canFlushBlock(
block index.Block,
shards []databaseShard,
) bool {
) (bool, error) {
// Check the block needs flushing because it is sealed and has
// any mutable segments that need to be evicted from memory
if !block.IsSealed() || !block.NeedsMutableSegmentsEvicted() {
return false
return false, nil
}

// Check all data files exist for the shards we own
for _, shard := range shards {
start := block.StartTime()
dataBlockSize := i.nsMetadata.Options().RetentionOptions().BlockSize()
for t := start; t.Before(block.EndTime()); t = t.Add(dataBlockSize) {
if shard.FlushState(t).WarmStatus != fileOpSuccess {
return false
flushState, err := shard.FlushState(t)
if err != nil {
return false, err
}
if flushState.WarmStatus != fileOpSuccess {
return false, nil
}
}
}

return true
return true, nil
}

func (i *nsIndex) flushBlock(
Expand Down
16 changes: 8 additions & 8 deletions src/dbnode/storage/index_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,8 +118,8 @@ func TestNamespaceIndexFlushSuccess(t *testing.T) {

mockShard := NewMockdatabaseShard(ctrl)
mockShard.EXPECT().ID().Return(uint32(0)).AnyTimes()
mockShard.EXPECT().FlushState(blockTime).Return(fileOpState{WarmStatus: fileOpSuccess})
mockShard.EXPECT().FlushState(blockTime.Add(test.blockSize)).Return(fileOpState{WarmStatus: fileOpSuccess})
mockShard.EXPECT().FlushState(blockTime).Return(fileOpState{WarmStatus: fileOpSuccess}, nil)
mockShard.EXPECT().FlushState(blockTime.Add(test.blockSize)).Return(fileOpState{WarmStatus: fileOpSuccess}, nil)
shards := []databaseShard{mockShard}

mockFlush := persist.NewMockIndexFlush(ctrl)
Expand Down Expand Up @@ -185,8 +185,8 @@ func TestNamespaceIndexFlushShardStateNotSuccess(t *testing.T) {

mockShard := NewMockdatabaseShard(ctrl)
mockShard.EXPECT().ID().Return(uint32(0)).AnyTimes()
mockShard.EXPECT().FlushState(blockTime).Return(fileOpState{WarmStatus: fileOpSuccess})
mockShard.EXPECT().FlushState(blockTime.Add(test.blockSize)).Return(fileOpState{WarmStatus: fileOpFailed})
mockShard.EXPECT().FlushState(blockTime).Return(fileOpState{WarmStatus: fileOpSuccess}, nil)
mockShard.EXPECT().FlushState(blockTime.Add(test.blockSize)).Return(fileOpState{WarmStatus: fileOpFailed}, nil)
shards := []databaseShard{mockShard}

mockFlush := persist.NewMockIndexFlush(ctrl)
Expand Down Expand Up @@ -220,13 +220,13 @@ func TestNamespaceIndexFlushSuccessMultipleShards(t *testing.T) {

mockShard1 := NewMockdatabaseShard(ctrl)
mockShard1.EXPECT().ID().Return(uint32(0)).AnyTimes()
mockShard1.EXPECT().FlushState(blockTime).Return(fileOpState{WarmStatus: fileOpSuccess})
mockShard1.EXPECT().FlushState(blockTime.Add(test.blockSize)).Return(fileOpState{WarmStatus: fileOpSuccess})
mockShard1.EXPECT().FlushState(blockTime).Return(fileOpState{WarmStatus: fileOpSuccess}, nil)
mockShard1.EXPECT().FlushState(blockTime.Add(test.blockSize)).Return(fileOpState{WarmStatus: fileOpSuccess}, nil)

mockShard2 := NewMockdatabaseShard(ctrl)
mockShard2.EXPECT().ID().Return(uint32(1)).AnyTimes()
mockShard2.EXPECT().FlushState(blockTime).Return(fileOpState{WarmStatus: fileOpSuccess})
mockShard2.EXPECT().FlushState(blockTime.Add(test.blockSize)).Return(fileOpState{WarmStatus: fileOpSuccess})
mockShard2.EXPECT().FlushState(blockTime).Return(fileOpState{WarmStatus: fileOpSuccess}, nil)
mockShard2.EXPECT().FlushState(blockTime.Add(test.blockSize)).Return(fileOpState{WarmStatus: fileOpSuccess}, nil)

shards := []databaseShard{mockShard1, mockShard2}

Expand Down
Loading