From 1ca88a8889d68046b1afd0d97ada1a064154f992 Mon Sep 17 00:00:00 2001 From: Richard Artoul Date: Thu, 7 Feb 2019 10:52:44 -0500 Subject: [PATCH] fix panic + add lock + add regression test --- src/dbnode/storage/index.go | 8 ++- .../storage/index_query_concurrent_test.go | 72 +++++++++++++------ 2 files changed, 57 insertions(+), 23 deletions(-) diff --git a/src/dbnode/storage/index.go b/src/dbnode/storage/index.go index 83810bb913..83298375b8 100644 --- a/src/dbnode/storage/index.go +++ b/src/dbnode/storage/index.go @@ -903,7 +903,9 @@ func (i *nsIndex) Query( defer func() { // Ensure that during early error returns we let any aborted // goroutines know not to try to modify/edit the result any longer. + results.Lock() results.returned = true + results.Unlock() }() execBlockQuery := func(block index.Block) { @@ -911,7 +913,7 @@ func (i *nsIndex) Query( blockResults.Reset(i.nsMetadata.ID()) blockExhaustive, err := block.Query(query, opts, blockResults) - if err != index.ErrUnableToQueryBlockClosed { + if err == index.ErrUnableToQueryBlockClosed { // NB(r): Because we query this block outside of the results lock, it's // possible this block may get closed if it slides out of retention, in // that case those results are no longer considered valid and outside of @@ -1068,10 +1070,10 @@ func (i *nsIndex) Query( // lock/unlock cleanup to not deadlock with this locked code block. exhaustive := results.exhaustive mergedResults := results.merged - errResults := results.multiErr.FinalError() + err = results.multiErr.FinalError() results.Unlock() - if errResults != nil { + if err != nil { return index.QueryResults{}, err } diff --git a/src/dbnode/storage/index_query_concurrent_test.go b/src/dbnode/storage/index_query_concurrent_test.go index fac9060f79..b597018866 100644 --- a/src/dbnode/storage/index_query_concurrent_test.go +++ b/src/dbnode/storage/index_query_concurrent_test.go @@ -23,6 +23,7 @@ package storage import ( + "errors" "fmt" "sync" "testing" @@ -38,7 +39,6 @@ import ( "github.com/fortytw2/leaktest" "github.com/golang/mock/gomock" - "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -64,15 +64,29 @@ func TestNamespaceIndexHighConcurrentQueriesWithTimeoutsAndForceTimeout(t *testi }) } +func TestNamespaceIndexHighConcurrentQueriesWithBlockErrors(t *testing.T) { + testNamespaceIndexHighConcurrentQueries(t, + testNamespaceIndexHighConcurrentQueriesOptions{ + withTimeouts: false, + forceTimeouts: false, + blockErrors: true, + }) +} + type testNamespaceIndexHighConcurrentQueriesOptions struct { withTimeouts bool forceTimeouts bool + blockErrors bool } func testNamespaceIndexHighConcurrentQueries( t *testing.T, opts testNamespaceIndexHighConcurrentQueriesOptions, ) { + if opts.forceTimeouts && opts.blockErrors { + t.Fatalf("force timeout and block errors cannot both be enabled") + } + ctrl := gomock.NewController(xtest.Reporter{t}) defer ctrl.Finish() @@ -184,10 +198,10 @@ func testNamespaceIndexHighConcurrentQueries( onIndexWg.Wait() } - // If force timeout, replace one of the blocks with a mock - // block that times out. + // If force timeout or block errors are enabled, replace one of the blocks + // with a mock block that times out or returns an error respectively. var timeoutWg, timedOutQueriesWg sync.WaitGroup - if opts.forceTimeouts { + if opts.forceTimeouts || opts.blockErrors { // Need to restore now as timeouts are measured by looking at time.Now restoreNow() @@ -196,6 +210,7 @@ func testNamespaceIndexHighConcurrentQueries( for start, block := range nsIdx.state.blocksByTime { block := block // Capture for lambda mockBlock := index.NewMockBlock(ctrl) + mockBlock.EXPECT(). StartTime(). DoAndReturn(func() time.Time { return block.StartTime() }). @@ -204,13 +219,24 @@ func testNamespaceIndexHighConcurrentQueries( EndTime(). DoAndReturn(func() time.Time { return block.EndTime() }). AnyTimes() - mockBlock.EXPECT(). - Query(gomock.Any(), gomock.Any(), gomock.Any()). - DoAndReturn(func(q index.Query, opts index.QueryOptions, r index.Results) (bool, error) { - timeoutWg.Wait() - return block.Query(q, opts, r) - }). - AnyTimes() + + if opts.blockErrors { + mockBlock.EXPECT(). + Query(gomock.Any(), gomock.Any(), gomock.Any()). + DoAndReturn(func(q index.Query, opts index.QueryOptions, r index.Results) (bool, error) { + return false, errors.New("some-error") + }). + AnyTimes() + } else { + mockBlock.EXPECT(). + Query(gomock.Any(), gomock.Any(), gomock.Any()). + DoAndReturn(func(q index.Query, opts index.QueryOptions, r index.Results) (bool, error) { + timeoutWg.Wait() + return block.Query(q, opts, r) + }). + AnyTimes() + } + mockBlock.EXPECT(). Stats(gomock.Any()). Return(nil). @@ -248,6 +274,8 @@ func testNamespaceIndexHighConcurrentQueries( rangeEnd := blockStarts[k].Add(test.indexBlockSize) ctx := context.NewContext() + defer ctx.Close() + if opts.forceTimeouts { // For the force timeout tests we just want to spin up the // contexts for timeouts. @@ -262,7 +290,7 @@ func testNamespaceIndexHighConcurrentQueries( StartInclusive: rangeStart, EndExclusive: rangeEnd, }) - assert.Error(t, err) + require.Error(t, err) timedOutQueriesWg.Done() }() continue @@ -274,7 +302,14 @@ func testNamespaceIndexHighConcurrentQueries( StartInclusive: rangeStart, EndExclusive: rangeEnd, }) - assert.NoError(t, err) + + if opts.blockErrors { + require.Error(t, err) + // Early return because we don't want to check the results. + return + } else { + require.NoError(t, err) + } // Read the results concurrently too hits := make(map[string]struct{}, results.Results.Size()) @@ -282,25 +317,22 @@ func testNamespaceIndexHighConcurrentQueries( id := entry.Key().String() doc, err := convert.FromMetricNoClone(entry.Key(), entry.Value()) - assert.NoError(t, err) + require.NoError(t, err) if err != nil { continue // this will fail the test anyway, but don't want to panic } expectedDoc, ok := expectedResults[id] - assert.True(t, ok) + require.True(t, ok) if !ok { continue // this will fail the test anyway, but don't want to panic } - assert.Equal(t, expectedDoc, doc) + require.Equal(t, expectedDoc, doc) hits[id] = struct{}{} } expectedHits := idsPerBlock * (k + 1) - assert.Equal(t, expectedHits, len(hits)) - - // Now safe to close the context after reading results - ctx.Close() + require.Equal(t, expectedHits, len(hits)) } }() }