Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix race/panic in index query code #1356

Merged
merged 1 commit into from
Feb 7, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions src/dbnode/storage/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -903,15 +903,17 @@ func (i *nsIndex) Query(
defer func() {
// Ensure that during early error returns we let any aborted
// goroutines know not to try to modify/edit the result any longer.
results.Lock()
results.returned = true
results.Unlock()
}()

execBlockQuery := func(block index.Block) {
blockResults := i.resultsPool.Get()
blockResults.Reset(i.nsMetadata.ID())

blockExhaustive, err := block.Query(query, opts, blockResults)
if err != index.ErrUnableToQueryBlockClosed {
if err == index.ErrUnableToQueryBlockClosed {
// NB(r): Because we query this block outside of the results lock, it's
// possible this block may get closed if it slides out of retention, in
// that case those results are no longer considered valid and outside of
Expand Down Expand Up @@ -1068,10 +1070,10 @@ func (i *nsIndex) Query(
// lock/unlock cleanup to not deadlock with this locked code block.
exhaustive := results.exhaustive
mergedResults := results.merged
errResults := results.multiErr.FinalError()
err = results.multiErr.FinalError()
results.Unlock()

if errResults != nil {
if err != nil {
return index.QueryResults{}, err
}

Expand Down
72 changes: 52 additions & 20 deletions src/dbnode/storage/index_query_concurrent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
package storage

import (
"errors"
"fmt"
"sync"
"testing"
Expand All @@ -38,7 +39,6 @@ import (

"github.com/fortytw2/leaktest"
"github.com/golang/mock/gomock"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

Expand All @@ -64,15 +64,29 @@ func TestNamespaceIndexHighConcurrentQueriesWithTimeoutsAndForceTimeout(t *testi
})
}

func TestNamespaceIndexHighConcurrentQueriesWithBlockErrors(t *testing.T) {
testNamespaceIndexHighConcurrentQueries(t,
testNamespaceIndexHighConcurrentQueriesOptions{
withTimeouts: false,
forceTimeouts: false,
blockErrors: true,
})
}

type testNamespaceIndexHighConcurrentQueriesOptions struct {
withTimeouts bool
forceTimeouts bool
blockErrors bool
}

func testNamespaceIndexHighConcurrentQueries(
t *testing.T,
opts testNamespaceIndexHighConcurrentQueriesOptions,
) {
if opts.forceTimeouts && opts.blockErrors {
t.Fatalf("force timeout and block errors cannot both be enabled")
}

ctrl := gomock.NewController(xtest.Reporter{t})
defer ctrl.Finish()

Expand Down Expand Up @@ -184,10 +198,10 @@ func testNamespaceIndexHighConcurrentQueries(
onIndexWg.Wait()
}

// If force timeout, replace one of the blocks with a mock
// block that times out.
// If force timeout or block errors are enabled, replace one of the blocks
// with a mock block that times out or returns an error respectively.
var timeoutWg, timedOutQueriesWg sync.WaitGroup
if opts.forceTimeouts {
if opts.forceTimeouts || opts.blockErrors {
// Need to restore now as timeouts are measured by looking at time.Now
restoreNow()

Expand All @@ -196,6 +210,7 @@ func testNamespaceIndexHighConcurrentQueries(
for start, block := range nsIdx.state.blocksByTime {
block := block // Capture for lambda
mockBlock := index.NewMockBlock(ctrl)

mockBlock.EXPECT().
StartTime().
DoAndReturn(func() time.Time { return block.StartTime() }).
Expand All @@ -204,13 +219,24 @@ func testNamespaceIndexHighConcurrentQueries(
EndTime().
DoAndReturn(func() time.Time { return block.EndTime() }).
AnyTimes()
mockBlock.EXPECT().
Query(gomock.Any(), gomock.Any(), gomock.Any()).
DoAndReturn(func(q index.Query, opts index.QueryOptions, r index.Results) (bool, error) {
timeoutWg.Wait()
return block.Query(q, opts, r)
}).
AnyTimes()

if opts.blockErrors {
mockBlock.EXPECT().
Query(gomock.Any(), gomock.Any(), gomock.Any()).
DoAndReturn(func(q index.Query, opts index.QueryOptions, r index.Results) (bool, error) {
return false, errors.New("some-error")
}).
AnyTimes()
} else {
mockBlock.EXPECT().
Query(gomock.Any(), gomock.Any(), gomock.Any()).
DoAndReturn(func(q index.Query, opts index.QueryOptions, r index.Results) (bool, error) {
timeoutWg.Wait()
return block.Query(q, opts, r)
}).
AnyTimes()
}

mockBlock.EXPECT().
Stats(gomock.Any()).
Return(nil).
Expand Down Expand Up @@ -248,6 +274,8 @@ func testNamespaceIndexHighConcurrentQueries(
rangeEnd := blockStarts[k].Add(test.indexBlockSize)

ctx := context.NewContext()
defer ctx.Close()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Really shouldn't defer from within a for loop, but.. ok haha.


if opts.forceTimeouts {
// For the force timeout tests we just want to spin up the
// contexts for timeouts.
Expand All @@ -262,7 +290,7 @@ func testNamespaceIndexHighConcurrentQueries(
StartInclusive: rangeStart,
EndExclusive: rangeEnd,
})
assert.Error(t, err)
require.Error(t, err)
timedOutQueriesWg.Done()
}()
continue
Expand All @@ -274,33 +302,37 @@ func testNamespaceIndexHighConcurrentQueries(
StartInclusive: rangeStart,
EndExclusive: rangeEnd,
})
assert.NoError(t, err)

if opts.blockErrors {
require.Error(t, err)
// Early return because we don't want to check the results.
return
} else {
require.NoError(t, err)
}

// Read the results concurrently too
hits := make(map[string]struct{}, results.Results.Size())
for _, entry := range results.Results.Map().Iter() {
id := entry.Key().String()

doc, err := convert.FromMetricNoClone(entry.Key(), entry.Value())
assert.NoError(t, err)
require.NoError(t, err)
if err != nil {
continue // this will fail the test anyway, but don't want to panic
}

expectedDoc, ok := expectedResults[id]
assert.True(t, ok)
require.True(t, ok)
if !ok {
continue // this will fail the test anyway, but don't want to panic
}

assert.Equal(t, expectedDoc, doc)
require.Equal(t, expectedDoc, doc)
hits[id] = struct{}{}
}
expectedHits := idsPerBlock * (k + 1)
assert.Equal(t, expectedHits, len(hits))

// Now safe to close the context after reading results
ctx.Close()
require.Equal(t, expectedHits, len(hits))
}
}()
}
Expand Down