From d5f3f31337024a312106f5c0a3d3be893f5b2838 Mon Sep 17 00:00:00 2001 From: Rob Skillington Date: Fri, 30 Jul 2021 06:59:24 -0400 Subject: [PATCH 01/19] [query] Add Graphite find integration test that verifies complex trees return valid results --- src/dbnode/integration/graphite_find_test.go | 381 +++++++++++++++++++ src/dbnode/integration/setup.go | 100 ++++- src/query/server/query.go | 17 +- src/query/stores/m3db/async_session.go | 4 +- src/x/test/diff.go | 18 +- src/x/test/util.go | 22 ++ 6 files changed, 526 insertions(+), 16 deletions(-) create mode 100644 src/dbnode/integration/graphite_find_test.go diff --git a/src/dbnode/integration/graphite_find_test.go b/src/dbnode/integration/graphite_find_test.go new file mode 100644 index 0000000000..ee6b21bf44 --- /dev/null +++ b/src/dbnode/integration/graphite_find_test.go @@ -0,0 +1,381 @@ +// +build integration + +// Copyright (c) 2021 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package integration + +import ( + "encoding/json" + "fmt" + "math/rand" + "net/http" + "net/url" + "reflect" + "runtime" + "sort" + "strings" + "sync" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.uber.org/atomic" + "go.uber.org/zap" + + "github.com/m3db/m3/src/dbnode/integration/generate" + "github.com/m3db/m3/src/dbnode/namespace" + "github.com/m3db/m3/src/dbnode/retention" + graphitehandler "github.com/m3db/m3/src/query/api/v1/handler/graphite" + "github.com/m3db/m3/src/query/graphite/graphite" + "github.com/m3db/m3/src/x/ident" + "github.com/m3db/m3/src/x/net/http" + xsync "github.com/m3db/m3/src/x/sync" + xtest "github.com/m3db/m3/src/x/test" +) + +func TestGraphiteFind(tt *testing.T) { + if testing.Short() { + tt.SkipNow() // Just skip if we're doing a short run + } + + // Make sure that parallel assertions fail test immediately + // by using a TestingT that panics when FailNow is called. + t := xtest.FailNowPanicsTestingT(tt) + + const queryConfigYAML = ` +listenAddress: 127.0.0.1:7201 + +logging: + level: info + +metrics: + scope: + prefix: "coordinator" + prometheus: + handlerPath: /metrics + listenAddress: "127.0.0.1:0" + sanitization: prometheus + samplingRate: 1.0 + +local: + namespaces: + - namespace: testns + type: unaggregated + retention: 12h +` + + var ( + blockSize = 2 * time.Hour + retentionPeriod = 6 * blockSize + rOpts = retention.NewOptions(). + SetRetentionPeriod(retentionPeriod). + SetBlockSize(blockSize) + idxOpts = namespace.NewIndexOptions(). + SetEnabled(true). + SetBlockSize(2 * blockSize) + nOpts = namespace.NewOptions(). + SetRetentionOptions(rOpts). + SetIndexOptions(idxOpts) + ) + ns, err := namespace.NewMetadata(ident.StringID("testns"), nOpts) + require.NoError(t, err) + + opts := NewTestOptions(tt). + SetNamespaces([]namespace.Metadata{ns}) + + // Test setup. + setup, err := NewTestSetup(tt, opts, nil) + require.NoError(t, err) + defer setup.Close() + + log := setup.StorageOpts().InstrumentOptions().Logger(). + With(zap.String("ns", ns.ID().String())) + + require.NoError(t, setup.InitializeBootstrappers(InitializeBootstrappersOptions{ + WithFileSystem: true, + })) + + // Write test data. + now := setup.NowFn()() + + // Create graphite node tree for tests. + var ( + levels = 5 + entriesPerLevelMin = 6 + entriesPerLevelMax = 9 + randConstSeedSrc = rand.NewSource(123456789) + randGen = rand.New(randConstSeedSrc) + rootNode = &graphiteNode{} + buildNodes func(node *graphiteNode, level int) + generateSeries []generate.Series + ) + buildNodes = func(node *graphiteNode, level int) { + entries := entriesPerLevelMin + + randGen.Intn(entriesPerLevelMax-entriesPerLevelMin) + for entry := 0; entry < entries; entry++ { + name := fmt.Sprintf("lvl%02d_entry%02d", level, entry) + + // Create a directory node and spawn more underneath. + if nextLevel := level + 1; nextLevel <= levels { + childDir := node.child(name+"_dir", graphiteNodeChildOptions{ + isLeaf: false, + }) + buildNodes(childDir, nextLevel) + } + + // Create a leaf node. + childLeaf := node.child(name+"_leaf", graphiteNodeChildOptions{ + isLeaf: true, + }) + + // Create series to generate data for the leaf node. + tags := make([]ident.Tag, 0, len(childLeaf.pathParts)) + for i, pathPartValue := range childLeaf.pathParts { + tags = append(tags, ident.Tag{ + Name: graphite.TagNameID(i), + Value: ident.StringID(pathPartValue), + }) + } + series := generate.Series{ + ID: ident.StringID(strings.Join(childLeaf.pathParts, ".")), + Tags: ident.NewTags(tags...), + } + generateSeries = append(generateSeries, series) + } + } + + // Build tree. + log.Info("building graphite data set series") + buildNodes(rootNode, 0) + + // Generate and write test data. + log.Info("generating graphite data set datapoints", + zap.Int("seriesSize", len(generateSeries))) + generateBlocks := make([]generate.BlockConfig, 0, len(generateSeries)) + for _, series := range generateSeries { + generateBlocks = append(generateBlocks, []generate.BlockConfig{ + { + IDs: []string{series.ID.String()}, + Tags: series.Tags, + NumPoints: 1, + Start: now.Add(-1 * blockSize), + }, + { + IDs: []string{series.ID.String()}, + Tags: series.Tags, + NumPoints: 1, + Start: now, + }, + }...) + } + seriesMaps := generate.BlocksByStart(generateBlocks) + log.Info("writing graphite data set to disk", + zap.Int("seriesMapSize", len(seriesMaps))) + require.NoError(t, writeTestDataToDisk(ns, setup, seriesMaps, 0)) + + // Start the server with filesystem bootstrapper. + log.Info("starting server") + require.NoError(t, setup.StartServer()) + log.Info("server is now up") + + // Stop the server. + defer func() { + require.NoError(t, setup.StopServer()) + log.Info("server is now down") + }() + + // Start the query server + log.Info("starting query server") + require.NoError(t, setup.StartQuery(queryConfigYAML)) + log.Info("started query server", zap.String("addr", setup.QueryAddress())) + + // Stop the query server. + defer func() { + require.NoError(t, setup.StopQuery()) + log.Info("query server is now down") + }() + + // Check each level of the tree can answer expected queries. + var ( + verifyFindQueries func(node *graphiteNode, level int) + parallelVerifyFindQueries func(node *graphiteNode, level int) + checkedSeriesAbort = atomic.NewBool(false) + numSeriesChecking = uint64(len(generateSeries)) + checkedSeriesLogEvery = numSeriesChecking / 10 + checkedSeries = atomic.NewUint64(0) + checkedSeriesLog = atomic.NewUint64(0) + // Use custom http client for higher number of max idle conns. + httpClient = xhttp.NewHTTPClient(xhttp.DefaultHTTPClientOptions()) + wg sync.WaitGroup + workerConcurrency = runtime.NumCPU() + workerPool = xsync.NewWorkerPool(workerConcurrency) + ) + workerPool.Init() + parallelVerifyFindQueries = func(node *graphiteNode, level int) { + wg.Add(1) + workerPool.Go(func() { + verifyFindQueries(node, level) + wg.Done() + }) + + // Verify children of children. + for _, child := range node.children { + parallelVerifyFindQueries(child, level+1) + } + } + verifyFindQueries = func(node *graphiteNode, level int) { + if checkedSeriesAbort.Load() { + // Do not execute if aborted. + return + } + + // Write progress report if progress made. + checked := checkedSeries.Load() + nextLog := checked - (checked % checkedSeriesLogEvery) + if lastLog := checkedSeriesLog.Swap(nextLog); lastLog < nextLog { + log.Info("checked series progressing", zap.Int("checked", int(checked))) + } + + // Verify at depth. + numPathParts := len(node.pathParts) + queryPathParts := make([]string, 0, 1+numPathParts) + if numPathParts > 0 { + queryPathParts = append(queryPathParts, node.pathParts...) + } + queryPathParts = append(queryPathParts, "*") + query := strings.Join(queryPathParts, ".") + + params := make(url.Values) + params.Set("query", query) + + url := fmt.Sprintf("http://%s%s?%s", setup.QueryAddress(), + graphitehandler.FindURL, params.Encode()) + + req, err := http.NewRequest(http.MethodGet, url, nil) + require.NoError(t, err) + + res, err := httpClient.Do(req) + require.NoError(t, err) + require.Equal(t, http.StatusOK, res.StatusCode) + + defer res.Body.Close() + + // Compare results. + var actual graphiteFindResults + require.NoError(t, json.NewDecoder(res.Body).Decode(&actual)) + + expected := make(graphiteFindResults, 0, len(node.children)) + leaves := 0 + for _, child := range node.children { + leaf := 0 + if child.isLeaf { + leaf = 1 + leaves++ + } + expected = append(expected, graphiteFindResult{ + Text: child.name, + Leaf: leaf, + }) + } + + sortGraphiteFindResults(actual) + sortGraphiteFindResults(expected) + + failMsg := fmt.Sprintf("invalid results: level=%d, parts=%d, query=%s", + level, len(node.pathParts), query) + failMsg += fmt.Sprintf("\n\ndiff:\n%s\n\n", + xtest.Diff(xtest.MustPrettyJSONObject(t, expected), + xtest.MustPrettyJSONObject(t, actual))) + if !reflect.DeepEqual(expected, actual) { + // Bail parallel execution (failed require/assert won't stop execution). + if checkedSeriesAbort.CAS(false, true) { + // Assert an error result and log once. + assert.Equal(t, expected, actual, failMsg) + log.Error("aborting checks") + } + return + } + + // Account for series checked (for progress report). + checkedSeries.Add(uint64(leaves)) + } + + // Check all top level entries and recurse. + log.Info("checking series", + zap.Int("workerConcurrency", workerConcurrency), + zap.Uint64("numSeriesChecking", numSeriesChecking)) + parallelVerifyFindQueries(rootNode, 0) + + // Wait for execution. + wg.Wait() + + // Allow for debugging by issuing queries, etc. + if DebugTest() { + log.Info("debug test set, pausing for investigate") + <-make(chan struct{}) + } +} + +type graphiteFindResults []graphiteFindResult + +type graphiteFindResult struct { + Text string `json:"text"` + Leaf int `json:"leaf"` +} + +func sortGraphiteFindResults(r graphiteFindResults) { + sort.Slice(r, func(i, j int) bool { + if r[i].Leaf != r[j].Leaf { + return r[i].Leaf < r[j].Leaf + } + return r[i].Text < r[j].Text + }) +} + +type graphiteNode struct { + name string + pathParts []string + isLeaf bool + children []*graphiteNode +} + +type graphiteNodeChildOptions struct { + isLeaf bool +} + +func (n *graphiteNode) child( + name string, + opts graphiteNodeChildOptions, +) *graphiteNode { + pathParts := append(make([]string, 0, 1+len(n.pathParts)), n.pathParts...) + pathParts = append(pathParts, name) + + child := &graphiteNode{ + name: name, + pathParts: pathParts, + isLeaf: opts.isLeaf, + } + + n.children = append(n.children, child) + + return child +} diff --git a/src/dbnode/integration/setup.go b/src/dbnode/integration/setup.go index 9c3a3c1360..0c4efe1520 100644 --- a/src/dbnode/integration/setup.go +++ b/src/dbnode/integration/setup.go @@ -25,6 +25,7 @@ import ( "flag" "fmt" "io/ioutil" + "net" "os" "os/exec" "strings" @@ -32,8 +33,17 @@ import ( "testing" "time" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/uber-go/tally" + "github.com/uber/tchannel-go" + "go.uber.org/zap" + "go.uber.org/zap/zapcore" + + clusterclient "github.com/m3db/m3/src/cluster/client" "github.com/m3db/m3/src/cluster/services" "github.com/m3db/m3/src/cluster/shard" + queryconfig "github.com/m3db/m3/src/cmd/services/m3query/config" "github.com/m3db/m3/src/dbnode/client" "github.com/m3db/m3/src/dbnode/generated/thrift/rpc" "github.com/m3db/m3/src/dbnode/integration/fake" @@ -58,7 +68,9 @@ import ( "github.com/m3db/m3/src/dbnode/testdata/prototest" "github.com/m3db/m3/src/dbnode/topology" "github.com/m3db/m3/src/dbnode/ts" + queryserver "github.com/m3db/m3/src/query/server" "github.com/m3db/m3/src/x/clock" + xconfig "github.com/m3db/m3/src/x/config" "github.com/m3db/m3/src/x/ident" "github.com/m3db/m3/src/x/instrument" xsync "github.com/m3db/m3/src/x/sync" @@ -127,6 +139,7 @@ type testSetup struct { m3dbAdminClient client.AdminClient m3dbVerificationAdminClient client.AdminClient workerPool xsync.WorkerPool + queryAddress string // compare expected with actual data function assertEqual assertTestDataEqual @@ -137,8 +150,10 @@ type testSetup struct { namespaces []namespace.Metadata // signals - doneCh chan struct{} - closedCh chan struct{} + doneCh chan struct{} + closedCh chan struct{} + queryInterruptCh chan error + queryDoneCh chan struct{} } type xNowFn func() xtime.UnixNano @@ -170,6 +185,9 @@ type TestSetup interface { StopServerAndVerifyOpenFilesAreClosed() error StartServer() error StartServerDontWaitBootstrap() error + StopQuery() error + StartQuery(configYAML string) error + QueryAddress() string NowFn() xNowFn ClockNowFn() clock.NowFn SetNowFn(xtime.UnixNano) @@ -843,6 +861,66 @@ func openFiles(parentDir string) []string { return strings.Split(string(out), "\n") } +func (ts *testSetup) StartQuery(configYAML string) error { + m3dbClient := ts.m3dbClient + if m3dbClient == nil { + return fmt.Errorf("dbnode admin client not set") + } + + configFile, close := newTestFile(ts.t, "config.yaml", configYAML) + defer close() + + var cfg queryconfig.Configuration + err := xconfig.LoadFile(&cfg, configFile.Name(), xconfig.Options{}) + if err != nil { + return err + } + + dbClientCh := make(chan client.Client, 1) + dbClientCh <- m3dbClient + clusterClientCh := make(chan clusterclient.Client, 1) + listenerCh := make(chan net.Listener, 1) + localSessionReadyCh := make(chan struct{}, 1) + + ts.queryInterruptCh = make(chan error, 1) + ts.queryDoneCh = make(chan struct{}, 1) + + go func() { + queryserver.Run(queryserver.RunOptions{ + Config: cfg, + InterruptCh: ts.queryInterruptCh, + ListenerCh: listenerCh, + LocalSessionReadyCh: localSessionReadyCh, + DBClient: dbClientCh, + ClusterClient: clusterClientCh, + }) + ts.queryDoneCh <- struct{}{} + }() + + // Wait for local session to connect. + <-localSessionReadyCh + + // Wait for listener. + listener := <-listenerCh + ts.queryAddress = listener.Addr().String() + + return nil +} + +func (ts *testSetup) StopQuery() error { + // Send interrupt. + ts.queryInterruptCh <- fmt.Errorf("interrupt") + + // Wait for done. + <-ts.queryDoneCh + + return nil +} + +func (ts *testSetup) QueryAddress() string { + return ts.queryAddress +} + func (ts *testSetup) TChannelClient() *TestTChannelClient { return ts.tchannelClient } @@ -1189,3 +1267,21 @@ func mustInspectFilesystem(fsOpts fs.Options) fs.Inspection { return inspection } + +func newTestFile(t *testing.T, fileName, contents string) (*os.File, closeFn) { + tmpFile, err := ioutil.TempFile("", fileName) + require.NoError(t, err) + + _, err = tmpFile.Write([]byte(contents)) + require.NoError(t, err) + + return tmpFile, func() { + assert.NoError(t, tmpFile.Close()) + assert.NoError(t, os.Remove(tmpFile.Name())) + } +} + +// DebugTest allows testing to see if a standard debug test env var is set. +func DebugTest() bool { + return os.Getenv("DEBUG_TEST") == "true" +} diff --git a/src/query/server/query.go b/src/query/server/query.go index bb4487b31f..f4a36a9362 100644 --- a/src/query/server/query.go +++ b/src/query/server/query.go @@ -136,6 +136,10 @@ type RunOptions struct { // ready signal once it is open. DownsamplerReadyCh chan<- struct{} + // LocalSessionReadyCh is a programmatic channel to receive the + // local DB session ready signal once it is open. + LocalSessionReadyCh chan struct{} + // InstrumentOptionsReadyCh is a programmatic channel to receive a set of // instrument options and metric reporters that is delivered when // constructed. @@ -470,8 +474,8 @@ func Run(runOpts RunOptions) RunResult { case "", config.M3DBStorageType: // For m3db backend, we need to make connections to the m3db cluster // which generates a session and use the storage with the session. - m3dbClusters, m3dbPoolWrapper, err = initClusters(cfg, runOpts.DBConfig, - clusterNamespacesWatcher, runOpts.DBClient, encodingOpts, + m3dbClusters, clusterNamespacesWatcher, m3dbPoolWrapper, err = initClusters(cfg, + runOpts.DBConfig, runOpts.DBClient, encodingOpts, runOpts.LocalSessionReadyCh, instrumentOptions, tsdbOpts.CustomAdminOptions()) if err != nil { logger.Fatal("unable to init clusters", zap.Error(err)) @@ -968,6 +972,7 @@ func initClusters( clusterNamespacesWatcher m3.ClusterNamespacesWatcher, dbClientCh <-chan client.Client, encodingOpts encoding.Options, + localSessionReadyCh chan struct{}, instrumentOpts instrument.Options, customAdminOptions []client.CustomAdminOption, ) (m3.Clusters, *pools.PoolWrapper, error) { @@ -1018,10 +1023,12 @@ func initClusters( return nil, nil, errors.New("no clusters configured and not running local cluster") } - sessionInitChan := make(chan struct{}) + if localSessionReadyCh == nil { + localSessionReadyCh = make(chan struct{}) + } session := m3db.NewAsyncSession(func() (client.Client, error) { return <-dbClientCh, nil - }, sessionInitChan) + }, localSessionReadyCh) clusterStaticConfig := m3.ClusterStaticConfiguration{ Namespaces: localCfg.Namespaces, @@ -1065,7 +1072,7 @@ func initClusters( poolWrapper = pools.NewAsyncPoolsWrapper() go func() { - <-sessionInitChan + <-localSessionReadyCh poolWrapper.Init(session.IteratorPools()) }() } diff --git a/src/query/stores/m3db/async_session.go b/src/query/stores/m3db/async_session.go index 4c3fbf4bf0..316f0f7d22 100644 --- a/src/query/stores/m3db/async_session.go +++ b/src/query/stores/m3db/async_session.go @@ -64,9 +64,7 @@ func NewAsyncSession(fn NewClientFn, done chan<- struct{}) *AsyncSession { go func() { if asyncSession.done != nil { - defer func() { - asyncSession.done <- struct{}{} - }() + defer close(asyncSession.done) } c, err := fn() diff --git a/src/x/test/diff.go b/src/x/test/diff.go index 4cf1309d9d..2ff4833fa0 100644 --- a/src/x/test/diff.go +++ b/src/x/test/diff.go @@ -22,12 +22,11 @@ package test import ( "encoding/json" - "testing" - - xjson "github.com/m3db/m3/src/x/json" "github.com/sergi/go-diff/diffmatchpatch" "github.com/stretchr/testify/require" + + xjson "github.com/m3db/m3/src/x/json" ) // Diff is a helper method to print a terminal pretty diff of two strings @@ -39,21 +38,28 @@ func Diff(expected, actual string) string { } // MustPrettyJSONMap returns an indented JSON string of the object. -func MustPrettyJSONMap(t *testing.T, value xjson.Map) string { +func MustPrettyJSONMap(t require.TestingT, value xjson.Map) string { pretty, err := json.MarshalIndent(value, "", " ") require.NoError(t, err) return string(pretty) } // MustPrettyJSONArray returns an indented JSON string of the object. -func MustPrettyJSONArray(t *testing.T, value xjson.Array) string { +func MustPrettyJSONArray(t require.TestingT, value xjson.Array) string { + pretty, err := json.MarshalIndent(value, "", " ") + require.NoError(t, err) + return string(pretty) +} + +// MustPrettyJSONObject returns an indented JSON string of the object. +func MustPrettyJSONObject(t require.TestingT, value interface{}) string { pretty, err := json.MarshalIndent(value, "", " ") require.NoError(t, err) return string(pretty) } // MustPrettyJSONString returns an indented version of the JSON. -func MustPrettyJSONString(t *testing.T, str string) string { +func MustPrettyJSONString(t require.TestingT, str string) string { var unmarshalled map[string]interface{} err := json.Unmarshal([]byte(str), &unmarshalled) require.NoError(t, err) diff --git a/src/x/test/util.go b/src/x/test/util.go index f10f1cb5d6..8ee893eb2c 100644 --- a/src/x/test/util.go +++ b/src/x/test/util.go @@ -22,9 +22,31 @@ package test import ( "reflect" + "testing" "unsafe" + + "github.com/stretchr/testify/require" ) +// FailNowPanicsTestingT returns a TestingT that panics on a failed assertion. +// This is useful for aborting a test on failed assertions from an asynchronous +// goroutine (since stretchr calls FailNow on testing.T but it will not abort +// the test unless the goroutine is the one running the benchmark or test). +// For more info see: https://github.com/stretchr/testify/issues/652. +func FailNowPanicsTestingT(t *testing.T) require.TestingT { + return failNowPanicsTestingT{TestingT: t} +} + +var _ require.TestingT = failNowPanicsTestingT{} + +type failNowPanicsTestingT struct { + require.TestingT +} + +func (t failNowPanicsTestingT) FailNow() { + panic("failed assertion") +} + // ByteSlicesBackedBySameData returns a bool indicating if the raw backing bytes // under the []byte slice point to the same memory. func ByteSlicesBackedBySameData(a, b []byte) bool { From 0fc32f2c21dbb9b82678742edad72a8bffc553c6 Mon Sep 17 00:00:00 2001 From: Rob Skillington Date: Fri, 30 Jul 2021 07:06:13 -0400 Subject: [PATCH 02/19] Fix imports after merge --- src/dbnode/integration/setup.go | 6 ------ 1 file changed, 6 deletions(-) diff --git a/src/dbnode/integration/setup.go b/src/dbnode/integration/setup.go index 0c4efe1520..d1d10c6e45 100644 --- a/src/dbnode/integration/setup.go +++ b/src/dbnode/integration/setup.go @@ -75,12 +75,6 @@ import ( "github.com/m3db/m3/src/x/instrument" xsync "github.com/m3db/m3/src/x/sync" xtime "github.com/m3db/m3/src/x/time" - - "github.com/stretchr/testify/require" - "github.com/uber-go/tally" - "github.com/uber/tchannel-go" - "go.uber.org/zap" - "go.uber.org/zap/zapcore" ) var ( From 635af99cbec32e9e79762d412332361847b19e0f Mon Sep 17 00:00:00 2001 From: Rob Skillington Date: Tue, 31 May 2022 16:11:39 -0400 Subject: [PATCH 03/19] Fix lint --- src/dbnode/integration/graphite_find_test.go | 7 +++++-- src/dbnode/integration/setup.go | 4 ++-- src/query/server/query.go | 7 ++++--- 3 files changed, 11 insertions(+), 7 deletions(-) diff --git a/src/dbnode/integration/graphite_find_test.go b/src/dbnode/integration/graphite_find_test.go index ee6b21bf44..a9a80512fa 100644 --- a/src/dbnode/integration/graphite_find_test.go +++ b/src/dbnode/integration/graphite_find_test.go @@ -1,3 +1,4 @@ +//go:build integration // +build integration // Copyright (c) 2021 Uber Technologies, Inc. @@ -23,6 +24,7 @@ package integration import ( + "context" "encoding/json" "fmt" "math/rand" @@ -122,7 +124,7 @@ local: levels = 5 entriesPerLevelMin = 6 entriesPerLevelMax = 9 - randConstSeedSrc = rand.NewSource(123456789) + randConstSeedSrc = rand.NewSource(123456789) // nolint: gosec randGen = rand.New(randConstSeedSrc) rootNode = &graphiteNode{} buildNodes func(node *graphiteNode, level int) @@ -270,7 +272,8 @@ local: url := fmt.Sprintf("http://%s%s?%s", setup.QueryAddress(), graphitehandler.FindURL, params.Encode()) - req, err := http.NewRequest(http.MethodGet, url, nil) + req, err := http.NewRequestWithContext(context.Background(), + http.MethodGet, url, nil) require.NoError(t, err) res, err := httpClient.Do(req) diff --git a/src/dbnode/integration/setup.go b/src/dbnode/integration/setup.go index d1d10c6e45..18c63436b0 100644 --- a/src/dbnode/integration/setup.go +++ b/src/dbnode/integration/setup.go @@ -861,8 +861,8 @@ func (ts *testSetup) StartQuery(configYAML string) error { return fmt.Errorf("dbnode admin client not set") } - configFile, close := newTestFile(ts.t, "config.yaml", configYAML) - defer close() + configFile, cleanup := newTestFile(ts.t, "config.yaml", configYAML) + defer cleanup() var cfg queryconfig.Configuration err := xconfig.LoadFile(&cfg, configFile.Name(), xconfig.Options{}) diff --git a/src/query/server/query.go b/src/query/server/query.go index f4a36a9362..f4ed82eb7c 100644 --- a/src/query/server/query.go +++ b/src/query/server/query.go @@ -474,9 +474,10 @@ func Run(runOpts RunOptions) RunResult { case "", config.M3DBStorageType: // For m3db backend, we need to make connections to the m3db cluster // which generates a session and use the storage with the session. - m3dbClusters, clusterNamespacesWatcher, m3dbPoolWrapper, err = initClusters(cfg, - runOpts.DBConfig, runOpts.DBClient, encodingOpts, runOpts.LocalSessionReadyCh, - instrumentOptions, tsdbOpts.CustomAdminOptions()) + m3dbClusters, m3dbPoolWrapper, err = initClusters(cfg, runOpts.DBConfig, + clusterNamespacesWatcher, runOpts.DBClient, encodingOpts, + runOpts.LocalSessionReadyCh, instrumentOptions, + tsdbOpts.CustomAdminOptions()) if err != nil { logger.Fatal("unable to init clusters", zap.Error(err)) } From a94176dd5dfdea10eba777d9908d5c85501c705a Mon Sep 17 00:00:00 2001 From: Rob Skillington Date: Tue, 31 May 2022 16:57:21 -0400 Subject: [PATCH 04/19] Fix namespace cfg --- src/dbnode/integration/graphite_find_test.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/src/dbnode/integration/graphite_find_test.go b/src/dbnode/integration/graphite_find_test.go index a9a80512fa..3bb22a7d60 100644 --- a/src/dbnode/integration/graphite_find_test.go +++ b/src/dbnode/integration/graphite_find_test.go @@ -80,9 +80,13 @@ metrics: local: namespaces: - - namespace: testns + - namespace: default type: unaggregated retention: 12h + - namespace: testns + type: aggregated + retention: 12h + resolution: 1m ` var ( From f953b5f7eccf595e84f2f037c423cf226df69c01 Mon Sep 17 00:00:00 2001 From: Rob Skillington Date: Tue, 31 May 2022 17:11:20 -0400 Subject: [PATCH 05/19] Fix lint again --- src/dbnode/integration/graphite_find_test.go | 7 ++++--- src/dbnode/integration/setup.go | 2 +- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/src/dbnode/integration/graphite_find_test.go b/src/dbnode/integration/graphite_find_test.go index 3bb22a7d60..2bdf59c386 100644 --- a/src/dbnode/integration/graphite_find_test.go +++ b/src/dbnode/integration/graphite_find_test.go @@ -49,7 +49,7 @@ import ( graphitehandler "github.com/m3db/m3/src/query/api/v1/handler/graphite" "github.com/m3db/m3/src/query/graphite/graphite" "github.com/m3db/m3/src/x/ident" - "github.com/m3db/m3/src/x/net/http" + xhttp "github.com/m3db/m3/src/x/net/http" xsync "github.com/m3db/m3/src/x/sync" xtest "github.com/m3db/m3/src/x/test" ) @@ -125,11 +125,12 @@ local: // Create graphite node tree for tests. var ( + // nolint: gosec + randConstSeedSrc = rand.NewSource(123456789) + randGen = rand.New(randConstSeedSrc) levels = 5 entriesPerLevelMin = 6 entriesPerLevelMax = 9 - randConstSeedSrc = rand.NewSource(123456789) // nolint: gosec - randGen = rand.New(randConstSeedSrc) rootNode = &graphiteNode{} buildNodes func(node *graphiteNode, level int) generateSeries []generate.Series diff --git a/src/dbnode/integration/setup.go b/src/dbnode/integration/setup.go index 18c63436b0..f17bc2b44c 100644 --- a/src/dbnode/integration/setup.go +++ b/src/dbnode/integration/setup.go @@ -1266,7 +1266,7 @@ func newTestFile(t *testing.T, fileName, contents string) (*os.File, closeFn) { tmpFile, err := ioutil.TempFile("", fileName) require.NoError(t, err) - _, err = tmpFile.Write([]byte(contents)) + _, err = tmpFile.WriteString(contents) require.NoError(t, err) return tmpFile, func() { From c6b1f769b5f9d66a4a0d3231331050ce29c2bf46 Mon Sep 17 00:00:00 2001 From: Rob Skillington Date: Tue, 31 May 2022 17:21:27 -0400 Subject: [PATCH 06/19] Fix lint once more --- src/dbnode/integration/graphite_find_test.go | 4 +++- src/dbnode/integration/setup.go | 1 + 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/src/dbnode/integration/graphite_find_test.go b/src/dbnode/integration/graphite_find_test.go index 2bdf59c386..4814e391f7 100644 --- a/src/dbnode/integration/graphite_find_test.go +++ b/src/dbnode/integration/graphite_find_test.go @@ -38,6 +38,7 @@ import ( "testing" "time" + // nolint: gci "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.uber.org/atomic" @@ -126,7 +127,8 @@ local: // Create graphite node tree for tests. var ( // nolint: gosec - randConstSeedSrc = rand.NewSource(123456789) + randConstSeedSrc = rand.NewSource(123456789) + // nolint: gosec randGen = rand.New(randConstSeedSrc) levels = 5 entriesPerLevelMin = 6 diff --git a/src/dbnode/integration/setup.go b/src/dbnode/integration/setup.go index f17bc2b44c..b0f7da8684 100644 --- a/src/dbnode/integration/setup.go +++ b/src/dbnode/integration/setup.go @@ -33,6 +33,7 @@ import ( "testing" "time" + // nolint: gci "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/uber-go/tally" From 36b17f0fbea482d196480015f081ee28e859aafa Mon Sep 17 00:00:00 2001 From: Rob Skillington Date: Fri, 3 Jun 2022 15:32:16 -0400 Subject: [PATCH 07/19] Fix parallel use of terms iterable --- src/m3ninx/index/segment/fst/segment.go | 17 +++++++---------- 1 file changed, 7 insertions(+), 10 deletions(-) diff --git a/src/m3ninx/index/segment/fst/segment.go b/src/m3ninx/index/segment/fst/segment.go index b8608f0912..d927761e82 100644 --- a/src/m3ninx/index/segment/fst/segment.go +++ b/src/m3ninx/index/segment/fst/segment.go @@ -873,10 +873,9 @@ var _ sgmt.Reader = (*fsSegmentReader)(nil) // fsSegmentReader is not thread safe for use and relies on the underlying // segment for synchronization. type fsSegmentReader struct { - closed bool - ctx context.Context - fsSegment *fsSegment - termsIterable *termsIterable + closed bool + ctx context.Context + fsSegment *fsSegment } func newReader( @@ -927,9 +926,9 @@ func (sr *fsSegmentReader) FieldsPostingsList() (sgmt.FieldsPostingsListIterator if sr.closed { return nil, errReaderClosed } - fieldsIterable := newTermsIterable(sr.fsSegment) + iterable := newTermsIterable(sr.fsSegment) sr.fsSegment.RLock() - iter, err := fieldsIterable.fieldsNotClosedMaybeFinalizedWithRLock() + iter, err := iterable.fieldsNotClosedMaybeFinalizedWithRLock() sr.fsSegment.RUnlock() return iter, err } @@ -938,11 +937,9 @@ func (sr *fsSegmentReader) Terms(field []byte) (sgmt.TermsIterator, error) { if sr.closed { return nil, errReaderClosed } - if sr.termsIterable == nil { - sr.termsIterable = newTermsIterable(sr.fsSegment) - } + iterable := newTermsIterable(sr.fsSegment) sr.fsSegment.RLock() - iter, err := sr.termsIterable.termsNotClosedMaybeFinalizedWithRLock(field) + iter, err := iterable.termsNotClosedMaybeFinalizedWithRLock(field) sr.fsSegment.RUnlock() return iter, err } From 30937e2d6cfafb11aacfe9c311ae1e331276baa6 Mon Sep 17 00:00:00 2001 From: Rob Skillington Date: Fri, 3 Jun 2022 15:35:08 -0400 Subject: [PATCH 08/19] Allocate error only on test failure --- src/dbnode/integration/graphite_find_test.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/src/dbnode/integration/graphite_find_test.go b/src/dbnode/integration/graphite_find_test.go index 4814e391f7..9b80df4d93 100644 --- a/src/dbnode/integration/graphite_find_test.go +++ b/src/dbnode/integration/graphite_find_test.go @@ -310,12 +310,12 @@ local: sortGraphiteFindResults(actual) sortGraphiteFindResults(expected) - failMsg := fmt.Sprintf("invalid results: level=%d, parts=%d, query=%s", - level, len(node.pathParts), query) - failMsg += fmt.Sprintf("\n\ndiff:\n%s\n\n", - xtest.Diff(xtest.MustPrettyJSONObject(t, expected), - xtest.MustPrettyJSONObject(t, actual))) if !reflect.DeepEqual(expected, actual) { + failMsg := fmt.Sprintf("invalid results: level=%d, parts=%d, query=%s", + level, len(node.pathParts), query) + failMsg += fmt.Sprintf("\n\ndiff:\n%s\n\n", + xtest.Diff(xtest.MustPrettyJSONObject(t, expected), + xtest.MustPrettyJSONObject(t, actual))) // Bail parallel execution (failed require/assert won't stop execution). if checkedSeriesAbort.CAS(false, true) { // Assert an error result and log once. From 1c92098659973c029a6e5d70440a28c7320279fb Mon Sep 17 00:00:00 2001 From: Rob Skillington Date: Fri, 3 Jun 2022 16:33:48 -0400 Subject: [PATCH 09/19] Hold rlock for entire lifetime of FST segment reader --- src/m3ninx/index/segment/fst/segment.go | 100 ++++++++---------------- 1 file changed, 32 insertions(+), 68 deletions(-) diff --git a/src/m3ninx/index/segment/fst/segment.go b/src/m3ninx/index/segment/fst/segment.go index d927761e82..87fe282e98 100644 --- a/src/m3ninx/index/segment/fst/segment.go +++ b/src/m3ninx/index/segment/fst/segment.go @@ -245,14 +245,11 @@ func (r *fsSegment) ContainsField(field []byte) (bool, error) { } func (r *fsSegment) Reader() (sgmt.Reader, error) { - r.RLock() - defer r.RUnlock() - if r.closed { - return nil, errReaderClosed + reader, err := newReaderOpenAndHoldRLockAndCheckedNotClosed(r, r.opts) + if err != nil { + return nil, err } - reader := newReader(r, r.opts) - // NB(r): Ensure that we do not release, mmaps, etc // until all readers have been closed. r.ctx.DependsOn(reader.ctx) @@ -410,16 +407,6 @@ func (i *termsIterable) fieldsNotClosedMaybeFinalizedWithRLock() (sgmt.FieldsPos return i.postingsIter, nil } -func (r *fsSegment) UnmarshalPostingsListBitmap(b *pilosaroaring.Bitmap, offset uint64) error { - r.RLock() - defer r.RUnlock() - if r.closed { - return errReaderClosed - } - - return r.unmarshalPostingsListBitmapNotClosedMaybeFinalizedWithLock(b, offset) -} - func (r *fsSegment) unmarshalPostingsListBitmapNotClosedMaybeFinalizedWithLock(b *pilosaroaring.Bitmap, offset uint64) error { if r.finalized { return errReaderFinalized @@ -873,19 +860,27 @@ var _ sgmt.Reader = (*fsSegmentReader)(nil) // fsSegmentReader is not thread safe for use and relies on the underlying // segment for synchronization. type fsSegmentReader struct { - closed bool - ctx context.Context - fsSegment *fsSegment + closed bool + ctx context.Context + fsSegment *fsSegment + onCloseRUnlocker sync.Locker } -func newReader( +func newReaderOpenAndHoldRLockAndCheckedNotClosed( fsSegment *fsSegment, opts Options, -) *fsSegmentReader { - return &fsSegmentReader{ - ctx: opts.ContextPool().Get(), - fsSegment: fsSegment, +) (*fsSegmentReader, error) { + // NB(rob): We unlock this on close so we reliably extend the lifetime + // of the segment until all readers are closed. + fsSegment.RLock() + if fsSegment.closed { + return nil, errReaderClosed } + return &fsSegmentReader{ + ctx: opts.ContextPool().Get(), + fsSegment: fsSegment, + onCloseRUnlocker: fsSegment.RLocker(), + }, nil } func (sr *fsSegmentReader) Fields() (sgmt.FieldsIterator, error) { @@ -893,8 +888,6 @@ func (sr *fsSegmentReader) Fields() (sgmt.FieldsIterator, error) { return nil, errReaderClosed } - sr.fsSegment.RLock() - defer sr.fsSegment.RUnlock() if sr.fsSegment.finalized { return nil, errReaderFinalized } @@ -913,8 +906,6 @@ func (sr *fsSegmentReader) ContainsField(field []byte) (bool, error) { return false, errReaderClosed } - sr.fsSegment.RLock() - defer sr.fsSegment.RUnlock() if sr.fsSegment.finalized { return false, errReaderFinalized } @@ -927,10 +918,7 @@ func (sr *fsSegmentReader) FieldsPostingsList() (sgmt.FieldsPostingsListIterator return nil, errReaderClosed } iterable := newTermsIterable(sr.fsSegment) - sr.fsSegment.RLock() - iter, err := iterable.fieldsNotClosedMaybeFinalizedWithRLock() - sr.fsSegment.RUnlock() - return iter, err + return iterable.fieldsNotClosedMaybeFinalizedWithRLock() } func (sr *fsSegmentReader) Terms(field []byte) (sgmt.TermsIterator, error) { @@ -938,10 +926,7 @@ func (sr *fsSegmentReader) Terms(field []byte) (sgmt.TermsIterator, error) { return nil, errReaderClosed } iterable := newTermsIterable(sr.fsSegment) - sr.fsSegment.RLock() - iter, err := iterable.termsNotClosedMaybeFinalizedWithRLock(field) - sr.fsSegment.RUnlock() - return iter, err + return iterable.termsNotClosedMaybeFinalizedWithRLock(field) } func (sr *fsSegmentReader) MatchField(field []byte) (postings.List, error) { @@ -962,10 +947,7 @@ func (sr *fsSegmentReader) MatchTerm(field []byte, term []byte) (postings.List, } // NB(r): We are allowed to call match field after Close called on // the segment but not after it is finalized. - sr.fsSegment.RLock() - pl, err := sr.fsSegment.matchTermNotClosedMaybeFinalizedWithRLock(field, term) - sr.fsSegment.RUnlock() - return pl, err + return sr.fsSegment.matchTermNotClosedMaybeFinalizedWithRLock(field, term) } func (sr *fsSegmentReader) MatchRegexp( @@ -977,10 +959,7 @@ func (sr *fsSegmentReader) MatchRegexp( } // NB(r): We are allowed to call match field after Close called on // the segment but not after it is finalized. - sr.fsSegment.RLock() - pl, err := sr.fsSegment.matchRegexpNotClosedMaybeFinalizedWithRLock(field, compiled) - sr.fsSegment.RUnlock() - return pl, err + return sr.fsSegment.matchRegexpNotClosedMaybeFinalizedWithRLock(field, compiled) } func (sr *fsSegmentReader) MatchAll() (postings.List, error) { @@ -989,10 +968,7 @@ func (sr *fsSegmentReader) MatchAll() (postings.List, error) { } // NB(r): We are allowed to call match field after Close called on // the segment but not after it is finalized. - sr.fsSegment.RLock() - pl, err := sr.fsSegment.matchAllNotClosedMaybeFinalizedWithRLock() - sr.fsSegment.RUnlock() - return pl, err + return sr.fsSegment.matchAllNotClosedMaybeFinalizedWithRLock() } func (sr *fsSegmentReader) Metadata(id postings.ID) (doc.Metadata, error) { @@ -1001,10 +977,7 @@ func (sr *fsSegmentReader) Metadata(id postings.ID) (doc.Metadata, error) { } // NB(r): We are allowed to call match field after Close called on // the segment but not after it is finalized. - sr.fsSegment.RLock() - pl, err := sr.fsSegment.metadataNotClosedMaybeFinalizedWithRLock(id) - sr.fsSegment.RUnlock() - return pl, err + return sr.fsSegment.metadataNotClosedMaybeFinalizedWithRLock(id) } func (sr *fsSegmentReader) MetadataIterator(pl postings.List) (doc.MetadataIterator, error) { @@ -1015,10 +988,7 @@ func (sr *fsSegmentReader) MetadataIterator(pl postings.List) (doc.MetadataItera // the segment but not after it is finalized. // Also make sure the doc retriever is the reader not the segment so that // is closed check is not performed and only the is finalized check. - sr.fsSegment.RLock() - iter, err := sr.fsSegment.metadataIteratorNotClosedMaybeFinalizedWithRLock(sr, pl) - sr.fsSegment.RUnlock() - return iter, err + return sr.fsSegment.metadataIteratorNotClosedMaybeFinalizedWithRLock(sr, pl) } func (sr *fsSegmentReader) Doc(id postings.ID) (doc.Document, error) { @@ -1027,10 +997,7 @@ func (sr *fsSegmentReader) Doc(id postings.ID) (doc.Document, error) { } // NB(r): We are allowed to call match field after Close called on // the segment but not after it is finalized. - sr.fsSegment.RLock() - pl, err := sr.fsSegment.docNotClosedMaybeFinalizedWithRLock(id) - sr.fsSegment.RUnlock() - return pl, err + return sr.fsSegment.docNotClosedMaybeFinalizedWithRLock(id) } func (sr *fsSegmentReader) Docs(pl postings.List) (doc.Iterator, error) { @@ -1041,10 +1008,7 @@ func (sr *fsSegmentReader) Docs(pl postings.List) (doc.Iterator, error) { // the segment but not after it is finalized. // Also make sure the doc retriever is the reader not the segment so that // is closed check is not performed and only the is finalized check. - sr.fsSegment.RLock() - iter, err := sr.fsSegment.docsNotClosedMaybeFinalizedWithRLock(sr, pl) - sr.fsSegment.RUnlock() - return iter, err + return sr.fsSegment.docsNotClosedMaybeFinalizedWithRLock(sr, pl) } func (sr *fsSegmentReader) AllDocs() (index.IDDocIterator, error) { @@ -1055,10 +1019,7 @@ func (sr *fsSegmentReader) AllDocs() (index.IDDocIterator, error) { // the segment but not after it is finalized. // Also make sure the doc retriever is the reader not the segment so that // is closed check is not performed and only the is finalized check. - sr.fsSegment.RLock() - iter, err := sr.fsSegment.allDocsNotClosedMaybeFinalizedWithRLock(sr) - sr.fsSegment.RUnlock() - return iter, err + return sr.fsSegment.allDocsNotClosedMaybeFinalizedWithRLock(sr) } func (sr *fsSegmentReader) Close() error { @@ -1068,5 +1029,8 @@ func (sr *fsSegmentReader) Close() error { sr.closed = true // Close the context so that segment doesn't need to track this any longer. sr.ctx.Close() + // This unlocks the reader lock that was established to extend the lifetime + // of the segment. + sr.onCloseRUnlocker.Unlock() return nil } From b456c0d4dd742f0d11f7d9740ca6cbdc9705ffe2 Mon Sep 17 00:00:00 2001 From: Rob Skillington Date: Mon, 6 Jun 2022 12:10:28 -0400 Subject: [PATCH 10/19] Split test into sequential/parallel check --- src/dbnode/integration/graphite_find_test.go | 64 +++++++++++++++++--- 1 file changed, 55 insertions(+), 9 deletions(-) diff --git a/src/dbnode/integration/graphite_find_test.go b/src/dbnode/integration/graphite_find_test.go index 9b80df4d93..112a9643c9 100644 --- a/src/dbnode/integration/graphite_find_test.go +++ b/src/dbnode/integration/graphite_find_test.go @@ -55,7 +55,38 @@ import ( xtest "github.com/m3db/m3/src/x/test" ) -func TestGraphiteFind(tt *testing.T) { +type testGraphiteFindDatasetSize uint + +const ( + smallDatasetSize testGraphiteFindDatasetSize = iota + largeDatasetSize +) + +type testGraphiteFindOptions struct { + checkConcurrency int + datasetSize testGraphiteFindDatasetSize +} + +func TestGraphiteFindSequential(t *testing.T) { + // NB(rob): We need to investigate why using high concurrency (and hence + // need to use small dataset size since otherwise verification takes + // forever) encounters errors running on CI. + testGraphiteFind(t, testGraphiteFindOptions{ + checkConcurrency: 1, + datasetSize: smallDatasetSize, + }) +} + +func TestGraphiteFindParallel(t *testing.T) { + // Skip until investigation of why check concurrency encounters errors on CI. + t.SkipNow() + testGraphiteFind(t, testGraphiteFindOptions{ + checkConcurrency: runtime.NumCPU(), + datasetSize: largeDatasetSize, + }) +} + +func testGraphiteFind(tt *testing.T, testOpts testGraphiteFindOptions) { if testing.Short() { tt.SkipNow() // Just skip if we're doing a short run } @@ -130,13 +161,29 @@ local: randConstSeedSrc = rand.NewSource(123456789) // nolint: gosec randGen = rand.New(randConstSeedSrc) - levels = 5 - entriesPerLevelMin = 6 - entriesPerLevelMax = 9 rootNode = &graphiteNode{} buildNodes func(node *graphiteNode, level int) generateSeries []generate.Series + levels int + entriesPerLevelMin int + entriesPerLevelMax int ) + switch testOpts.datasetSize { + case smallDatasetSize: + levels = 5 + entriesPerLevelMin = 5 + entriesPerLevelMax = 7 + case largeDatasetSize: + // Ideally we'd always use a large dataset size, however you do need + // high concurrency to validate this entire dataset and CI can't seem + // to handle high concurrency without encountering errors. + levels = 5 + entriesPerLevelMin = 6 + entriesPerLevelMax = 9 + default: + require.FailNow(t, fmt.Sprintf("invalid test dataset size set: %d", testOpts.datasetSize)) + } + buildNodes = func(node *graphiteNode, level int) { entries := entriesPerLevelMin + randGen.Intn(entriesPerLevelMax-entriesPerLevelMin) @@ -233,10 +280,9 @@ local: checkedSeries = atomic.NewUint64(0) checkedSeriesLog = atomic.NewUint64(0) // Use custom http client for higher number of max idle conns. - httpClient = xhttp.NewHTTPClient(xhttp.DefaultHTTPClientOptions()) - wg sync.WaitGroup - workerConcurrency = runtime.NumCPU() - workerPool = xsync.NewWorkerPool(workerConcurrency) + httpClient = xhttp.NewHTTPClient(xhttp.DefaultHTTPClientOptions()) + wg sync.WaitGroup + workerPool = xsync.NewWorkerPool(testOpts.checkConcurrency) ) workerPool.Init() parallelVerifyFindQueries = func(node *graphiteNode, level int) { @@ -331,7 +377,7 @@ local: // Check all top level entries and recurse. log.Info("checking series", - zap.Int("workerConcurrency", workerConcurrency), + zap.Int("checkConcurrency", testOpts.checkConcurrency), zap.Uint64("numSeriesChecking", numSeriesChecking)) parallelVerifyFindQueries(rootNode, 0) From a0143de7a02938675c4bf55a8dda6136e016d085 Mon Sep 17 00:00:00 2001 From: Rob Skillington Date: Mon, 6 Jun 2022 14:06:18 -0400 Subject: [PATCH 11/19] Restore fst/segment.go changes --- src/m3ninx/index/segment/fst/segment.go | 107 ++++++++++++++++-------- 1 file changed, 73 insertions(+), 34 deletions(-) diff --git a/src/m3ninx/index/segment/fst/segment.go b/src/m3ninx/index/segment/fst/segment.go index 87fe282e98..b8608f0912 100644 --- a/src/m3ninx/index/segment/fst/segment.go +++ b/src/m3ninx/index/segment/fst/segment.go @@ -245,11 +245,14 @@ func (r *fsSegment) ContainsField(field []byte) (bool, error) { } func (r *fsSegment) Reader() (sgmt.Reader, error) { - reader, err := newReaderOpenAndHoldRLockAndCheckedNotClosed(r, r.opts) - if err != nil { - return nil, err + r.RLock() + defer r.RUnlock() + if r.closed { + return nil, errReaderClosed } + reader := newReader(r, r.opts) + // NB(r): Ensure that we do not release, mmaps, etc // until all readers have been closed. r.ctx.DependsOn(reader.ctx) @@ -407,6 +410,16 @@ func (i *termsIterable) fieldsNotClosedMaybeFinalizedWithRLock() (sgmt.FieldsPos return i.postingsIter, nil } +func (r *fsSegment) UnmarshalPostingsListBitmap(b *pilosaroaring.Bitmap, offset uint64) error { + r.RLock() + defer r.RUnlock() + if r.closed { + return errReaderClosed + } + + return r.unmarshalPostingsListBitmapNotClosedMaybeFinalizedWithLock(b, offset) +} + func (r *fsSegment) unmarshalPostingsListBitmapNotClosedMaybeFinalizedWithLock(b *pilosaroaring.Bitmap, offset uint64) error { if r.finalized { return errReaderFinalized @@ -860,27 +873,20 @@ var _ sgmt.Reader = (*fsSegmentReader)(nil) // fsSegmentReader is not thread safe for use and relies on the underlying // segment for synchronization. type fsSegmentReader struct { - closed bool - ctx context.Context - fsSegment *fsSegment - onCloseRUnlocker sync.Locker + closed bool + ctx context.Context + fsSegment *fsSegment + termsIterable *termsIterable } -func newReaderOpenAndHoldRLockAndCheckedNotClosed( +func newReader( fsSegment *fsSegment, opts Options, -) (*fsSegmentReader, error) { - // NB(rob): We unlock this on close so we reliably extend the lifetime - // of the segment until all readers are closed. - fsSegment.RLock() - if fsSegment.closed { - return nil, errReaderClosed - } +) *fsSegmentReader { return &fsSegmentReader{ - ctx: opts.ContextPool().Get(), - fsSegment: fsSegment, - onCloseRUnlocker: fsSegment.RLocker(), - }, nil + ctx: opts.ContextPool().Get(), + fsSegment: fsSegment, + } } func (sr *fsSegmentReader) Fields() (sgmt.FieldsIterator, error) { @@ -888,6 +894,8 @@ func (sr *fsSegmentReader) Fields() (sgmt.FieldsIterator, error) { return nil, errReaderClosed } + sr.fsSegment.RLock() + defer sr.fsSegment.RUnlock() if sr.fsSegment.finalized { return nil, errReaderFinalized } @@ -906,6 +914,8 @@ func (sr *fsSegmentReader) ContainsField(field []byte) (bool, error) { return false, errReaderClosed } + sr.fsSegment.RLock() + defer sr.fsSegment.RUnlock() if sr.fsSegment.finalized { return false, errReaderFinalized } @@ -917,16 +927,24 @@ func (sr *fsSegmentReader) FieldsPostingsList() (sgmt.FieldsPostingsListIterator if sr.closed { return nil, errReaderClosed } - iterable := newTermsIterable(sr.fsSegment) - return iterable.fieldsNotClosedMaybeFinalizedWithRLock() + fieldsIterable := newTermsIterable(sr.fsSegment) + sr.fsSegment.RLock() + iter, err := fieldsIterable.fieldsNotClosedMaybeFinalizedWithRLock() + sr.fsSegment.RUnlock() + return iter, err } func (sr *fsSegmentReader) Terms(field []byte) (sgmt.TermsIterator, error) { if sr.closed { return nil, errReaderClosed } - iterable := newTermsIterable(sr.fsSegment) - return iterable.termsNotClosedMaybeFinalizedWithRLock(field) + if sr.termsIterable == nil { + sr.termsIterable = newTermsIterable(sr.fsSegment) + } + sr.fsSegment.RLock() + iter, err := sr.termsIterable.termsNotClosedMaybeFinalizedWithRLock(field) + sr.fsSegment.RUnlock() + return iter, err } func (sr *fsSegmentReader) MatchField(field []byte) (postings.List, error) { @@ -947,7 +965,10 @@ func (sr *fsSegmentReader) MatchTerm(field []byte, term []byte) (postings.List, } // NB(r): We are allowed to call match field after Close called on // the segment but not after it is finalized. - return sr.fsSegment.matchTermNotClosedMaybeFinalizedWithRLock(field, term) + sr.fsSegment.RLock() + pl, err := sr.fsSegment.matchTermNotClosedMaybeFinalizedWithRLock(field, term) + sr.fsSegment.RUnlock() + return pl, err } func (sr *fsSegmentReader) MatchRegexp( @@ -959,7 +980,10 @@ func (sr *fsSegmentReader) MatchRegexp( } // NB(r): We are allowed to call match field after Close called on // the segment but not after it is finalized. - return sr.fsSegment.matchRegexpNotClosedMaybeFinalizedWithRLock(field, compiled) + sr.fsSegment.RLock() + pl, err := sr.fsSegment.matchRegexpNotClosedMaybeFinalizedWithRLock(field, compiled) + sr.fsSegment.RUnlock() + return pl, err } func (sr *fsSegmentReader) MatchAll() (postings.List, error) { @@ -968,7 +992,10 @@ func (sr *fsSegmentReader) MatchAll() (postings.List, error) { } // NB(r): We are allowed to call match field after Close called on // the segment but not after it is finalized. - return sr.fsSegment.matchAllNotClosedMaybeFinalizedWithRLock() + sr.fsSegment.RLock() + pl, err := sr.fsSegment.matchAllNotClosedMaybeFinalizedWithRLock() + sr.fsSegment.RUnlock() + return pl, err } func (sr *fsSegmentReader) Metadata(id postings.ID) (doc.Metadata, error) { @@ -977,7 +1004,10 @@ func (sr *fsSegmentReader) Metadata(id postings.ID) (doc.Metadata, error) { } // NB(r): We are allowed to call match field after Close called on // the segment but not after it is finalized. - return sr.fsSegment.metadataNotClosedMaybeFinalizedWithRLock(id) + sr.fsSegment.RLock() + pl, err := sr.fsSegment.metadataNotClosedMaybeFinalizedWithRLock(id) + sr.fsSegment.RUnlock() + return pl, err } func (sr *fsSegmentReader) MetadataIterator(pl postings.List) (doc.MetadataIterator, error) { @@ -988,7 +1018,10 @@ func (sr *fsSegmentReader) MetadataIterator(pl postings.List) (doc.MetadataItera // the segment but not after it is finalized. // Also make sure the doc retriever is the reader not the segment so that // is closed check is not performed and only the is finalized check. - return sr.fsSegment.metadataIteratorNotClosedMaybeFinalizedWithRLock(sr, pl) + sr.fsSegment.RLock() + iter, err := sr.fsSegment.metadataIteratorNotClosedMaybeFinalizedWithRLock(sr, pl) + sr.fsSegment.RUnlock() + return iter, err } func (sr *fsSegmentReader) Doc(id postings.ID) (doc.Document, error) { @@ -997,7 +1030,10 @@ func (sr *fsSegmentReader) Doc(id postings.ID) (doc.Document, error) { } // NB(r): We are allowed to call match field after Close called on // the segment but not after it is finalized. - return sr.fsSegment.docNotClosedMaybeFinalizedWithRLock(id) + sr.fsSegment.RLock() + pl, err := sr.fsSegment.docNotClosedMaybeFinalizedWithRLock(id) + sr.fsSegment.RUnlock() + return pl, err } func (sr *fsSegmentReader) Docs(pl postings.List) (doc.Iterator, error) { @@ -1008,7 +1044,10 @@ func (sr *fsSegmentReader) Docs(pl postings.List) (doc.Iterator, error) { // the segment but not after it is finalized. // Also make sure the doc retriever is the reader not the segment so that // is closed check is not performed and only the is finalized check. - return sr.fsSegment.docsNotClosedMaybeFinalizedWithRLock(sr, pl) + sr.fsSegment.RLock() + iter, err := sr.fsSegment.docsNotClosedMaybeFinalizedWithRLock(sr, pl) + sr.fsSegment.RUnlock() + return iter, err } func (sr *fsSegmentReader) AllDocs() (index.IDDocIterator, error) { @@ -1019,7 +1058,10 @@ func (sr *fsSegmentReader) AllDocs() (index.IDDocIterator, error) { // the segment but not after it is finalized. // Also make sure the doc retriever is the reader not the segment so that // is closed check is not performed and only the is finalized check. - return sr.fsSegment.allDocsNotClosedMaybeFinalizedWithRLock(sr) + sr.fsSegment.RLock() + iter, err := sr.fsSegment.allDocsNotClosedMaybeFinalizedWithRLock(sr) + sr.fsSegment.RUnlock() + return iter, err } func (sr *fsSegmentReader) Close() error { @@ -1029,8 +1071,5 @@ func (sr *fsSegmentReader) Close() error { sr.closed = true // Close the context so that segment doesn't need to track this any longer. sr.ctx.Close() - // This unlocks the reader lock that was established to extend the lifetime - // of the segment. - sr.onCloseRUnlocker.Unlock() return nil } From e8744b427ed4d807ae943656c262f903b4ad94cb Mon Sep 17 00:00:00 2001 From: Rob Skillington Date: Tue, 7 Jun 2022 12:39:29 -0400 Subject: [PATCH 12/19] Add debug log level --- src/dbnode/integration/graphite_find_test.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/dbnode/integration/graphite_find_test.go b/src/dbnode/integration/graphite_find_test.go index 112a9643c9..e3079989ce 100644 --- a/src/dbnode/integration/graphite_find_test.go +++ b/src/dbnode/integration/graphite_find_test.go @@ -30,6 +30,7 @@ import ( "math/rand" "net/http" "net/url" + "os" "reflect" "runtime" "sort" @@ -137,6 +138,7 @@ local: ns, err := namespace.NewMetadata(ident.StringID("testns"), nOpts) require.NoError(t, err) + os.Setenv("TEST_DEBUG_LOG", "true") opts := NewTestOptions(tt). SetNamespaces([]namespace.Metadata{ns}) From fda0e980ef8484c332c88bd732754de511210b53 Mon Sep 17 00:00:00 2001 From: Rob Skillington Date: Tue, 7 Jun 2022 15:14:20 -0400 Subject: [PATCH 13/19] Add retry to recover from transient errors when checking the tens of thousands of series entries --- src/dbnode/integration/graphite_find_test.go | 91 +++++++++++++++----- src/dbnode/storage/coldflush.go | 12 +-- 2 files changed, 75 insertions(+), 28 deletions(-) diff --git a/src/dbnode/integration/graphite_find_test.go b/src/dbnode/integration/graphite_find_test.go index e3079989ce..08455b6bda 100644 --- a/src/dbnode/integration/graphite_find_test.go +++ b/src/dbnode/integration/graphite_find_test.go @@ -273,8 +273,16 @@ local: }() // Check each level of the tree can answer expected queries. + type checkResult struct { + leavesVerified int + } + type checkFailure struct { + expected graphiteFindResults + actual graphiteFindResults + failMsg string + } var ( - verifyFindQueries func(node *graphiteNode, level int) + verifyFindQueries func(node *graphiteNode, level int) (checkResult, *checkFailure, error) parallelVerifyFindQueries func(node *graphiteNode, level int) checkedSeriesAbort = atomic.NewBool(false) numSeriesChecking = uint64(len(generateSeries)) @@ -288,10 +296,49 @@ local: ) workerPool.Init() parallelVerifyFindQueries = func(node *graphiteNode, level int) { + // Verify this node at level. wg.Add(1) workerPool.Go(func() { - verifyFindQueries(node, level) - wg.Done() + defer wg.Done() + + if checkedSeriesAbort.Load() { + // Do not execute if aborted. + return + } + + var ( + result checkResult + failure = &checkFailure{} + err = fmt.Errorf("initial error") + ) + for attempt := 0; (failure != nil || err != nil) && attempt < 2; attempt++ { + if attempt > 0 { + // Retry transient errors (should add a strict mode for this test + // avoid allowing transient errors too). + time.Sleep(5*time.Millsecond) + } + result, failure, err = verifyFindQueries(node, level) + } + if failure == nil && err == nil { + // Account for series checked (for progress report). + checkedSeries.Add(uint64(result.leavesVerified)) + return + } + + // Bail parallel execution (failed require/assert won't stop execution). + if checkedSeriesAbort.CAS(false, true) { + switch { + case failure != nil: + // Assert an error result and log once. + assert.Equal(t, failure.expected, failure.actual, failure.failMsg) + log.Error("aborting checks due to mismatch") + case err != nil: + assert.NoError(t, err) + log.Error("aborting checks due to error") + default: + + } + } }) // Verify children of children. @@ -299,11 +346,8 @@ local: parallelVerifyFindQueries(child, level+1) } } - verifyFindQueries = func(node *graphiteNode, level int) { - if checkedSeriesAbort.Load() { - // Do not execute if aborted. - return - } + verifyFindQueries = func(node *graphiteNode, level int) (checkResult, *checkFailure, error) { + var r checkResult // Write progress report if progress made. checked := checkedSeries.Load() @@ -332,22 +376,28 @@ local: require.NoError(t, err) res, err := httpClient.Do(req) - require.NoError(t, err) - require.Equal(t, http.StatusOK, res.StatusCode) + if err != nil { + return r, nil, err + } + if res.StatusCode != http.StatusOK { + return r, nil, fmt.Errorf("bad response code: expected=%d, actual=%d", + http.StatusOK, res.StatusCode) + } defer res.Body.Close() // Compare results. var actual graphiteFindResults - require.NoError(t, json.NewDecoder(res.Body).Decode(&actual)) + if err := json.NewDecoder(res.Body).Decode(&actual); err != nil { + return r, nil, err + } expected := make(graphiteFindResults, 0, len(node.children)) - leaves := 0 for _, child := range node.children { leaf := 0 if child.isLeaf { leaf = 1 - leaves++ + r.leavesVerified++ } expected = append(expected, graphiteFindResult{ Text: child.name, @@ -364,17 +414,14 @@ local: failMsg += fmt.Sprintf("\n\ndiff:\n%s\n\n", xtest.Diff(xtest.MustPrettyJSONObject(t, expected), xtest.MustPrettyJSONObject(t, actual))) - // Bail parallel execution (failed require/assert won't stop execution). - if checkedSeriesAbort.CAS(false, true) { - // Assert an error result and log once. - assert.Equal(t, expected, actual, failMsg) - log.Error("aborting checks") - } - return + return r, &checkFailure{ + expected: expected, + actual: actual, + failMsg: failMsg, + }, nil } - // Account for series checked (for progress report). - checkedSeries.Add(uint64(leaves)) + return r, nil, nil } // Check all top level entries and recurse. diff --git a/src/dbnode/storage/coldflush.go b/src/dbnode/storage/coldflush.go index 23b7f4ae4b..d0b1f7c46d 100644 --- a/src/dbnode/storage/coldflush.go +++ b/src/dbnode/storage/coldflush.go @@ -108,9 +108,9 @@ func (m *coldFlushManager) Run(t xtime.UnixNano) bool { m.Unlock() }() - debugLog := m.log.Check(zapcore.DebugLevel, "cold flush run") - if debugLog != nil { - debugLog.Write(zap.String("status", "starting cold flush"), zap.Time("time", t.ToTime())) + + if log := m.log.Check(zapcore.DebugLevel, "cold flush run start"); log != nil { + log.Write(zap.Time("time", t.ToTime())) } // NB(xichen): perform data cleanup and flushing sequentially to minimize the impact of disk seeks. @@ -132,9 +132,9 @@ func (m *coldFlushManager) Run(t xtime.UnixNano) bool { zap.Time("time", t.ToTime()), zap.Error(err)) }) } - - if debugLog != nil { - debugLog.Write(zap.String("status", "completed cold flush"), zap.Time("time", t.ToTime())) + + if log := m.log.Check(zapcore.DebugLevel, "cold flush run complete"); log != nil { + log.Write(zap.Time("time", t.ToTime())) } return true From 75569baed67e2247fe8ee320531d10e1b495d167 Mon Sep 17 00:00:00 2001 From: Rob Skillington Date: Tue, 7 Jun 2022 15:55:41 -0400 Subject: [PATCH 14/19] Fix up formatting --- src/dbnode/integration/graphite_find_test.go | 20 +++++++++----------- 1 file changed, 9 insertions(+), 11 deletions(-) diff --git a/src/dbnode/integration/graphite_find_test.go b/src/dbnode/integration/graphite_find_test.go index 08455b6bda..1aacc89eb8 100644 --- a/src/dbnode/integration/graphite_find_test.go +++ b/src/dbnode/integration/graphite_find_test.go @@ -30,7 +30,6 @@ import ( "math/rand" "net/http" "net/url" - "os" "reflect" "runtime" "sort" @@ -138,7 +137,6 @@ local: ns, err := namespace.NewMetadata(ident.StringID("testns"), nOpts) require.NoError(t, err) - os.Setenv("TEST_DEBUG_LOG", "true") opts := NewTestOptions(tt). SetNamespaces([]namespace.Metadata{ns}) @@ -278,8 +276,8 @@ local: } type checkFailure struct { expected graphiteFindResults - actual graphiteFindResults - failMsg string + actual graphiteFindResults + failMsg string } var ( verifyFindQueries func(node *graphiteNode, level int) (checkResult, *checkFailure, error) @@ -307,15 +305,15 @@ local: } var ( - result checkResult + result checkResult failure = &checkFailure{} - err = fmt.Errorf("initial error") + err = fmt.Errorf("initial error") ) for attempt := 0; (failure != nil || err != nil) && attempt < 2; attempt++ { if attempt > 0 { - // Retry transient errors (should add a strict mode for this test + // Retry transient errors (should add a strict mode for this test // avoid allowing transient errors too). - time.Sleep(5*time.Millsecond) + time.Sleep(5 * time.Millisecond) } result, failure, err = verifyFindQueries(node, level) } @@ -380,7 +378,7 @@ local: return r, nil, err } if res.StatusCode != http.StatusOK { - return r, nil, fmt.Errorf("bad response code: expected=%d, actual=%d", + return r, nil, fmt.Errorf("bad response code: expected=%d, actual=%d", http.StatusOK, res.StatusCode) } @@ -416,8 +414,8 @@ local: xtest.MustPrettyJSONObject(t, actual))) return r, &checkFailure{ expected: expected, - actual: actual, - failMsg: failMsg, + actual: actual, + failMsg: failMsg, }, nil } From e130a66e10b058050b5404d8492cefd18bcb2014 Mon Sep 17 00:00:00 2001 From: Rob Skillington Date: Tue, 7 Jun 2022 16:18:29 -0400 Subject: [PATCH 15/19] Change levels, fix lint and add proper abort message if/when failure condition unknown --- src/dbnode/integration/graphite_find_test.go | 9 +++++---- src/dbnode/storage/coldflush.go | 3 +-- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/src/dbnode/integration/graphite_find_test.go b/src/dbnode/integration/graphite_find_test.go index 1aacc89eb8..393caab752 100644 --- a/src/dbnode/integration/graphite_find_test.go +++ b/src/dbnode/integration/graphite_find_test.go @@ -170,9 +170,9 @@ local: ) switch testOpts.datasetSize { case smallDatasetSize: - levels = 5 - entriesPerLevelMin = 5 - entriesPerLevelMax = 7 + levels = 4 + entriesPerLevelMin = 7 + entriesPerLevelMax = 10 case largeDatasetSize: // Ideally we'd always use a large dataset size, however you do need // high concurrency to validate this entire dataset and CI can't seem @@ -334,7 +334,8 @@ local: assert.NoError(t, err) log.Error("aborting checks due to error") default: - + require.FailNow(t, "unknown error condition") + log.Error("aborting checks due to unknown condition") } } }) diff --git a/src/dbnode/storage/coldflush.go b/src/dbnode/storage/coldflush.go index d0b1f7c46d..9e341241f2 100644 --- a/src/dbnode/storage/coldflush.go +++ b/src/dbnode/storage/coldflush.go @@ -108,7 +108,6 @@ func (m *coldFlushManager) Run(t xtime.UnixNano) bool { m.Unlock() }() - if log := m.log.Check(zapcore.DebugLevel, "cold flush run start"); log != nil { log.Write(zap.Time("time", t.ToTime())) } @@ -132,7 +131,7 @@ func (m *coldFlushManager) Run(t xtime.UnixNano) bool { zap.Time("time", t.ToTime()), zap.Error(err)) }) } - + if log := m.log.Check(zapcore.DebugLevel, "cold flush run complete"); log != nil { log.Write(zap.Time("time", t.ToTime())) } From ce97d178a533d33fef216071ebfedf9a252cffc8 Mon Sep 17 00:00:00 2001 From: Rob Skillington Date: Wed, 8 Jun 2022 11:29:16 -0400 Subject: [PATCH 16/19] Raise retry time and multiply on each subsequent retry --- src/dbnode/integration/graphite_find_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/dbnode/integration/graphite_find_test.go b/src/dbnode/integration/graphite_find_test.go index 393caab752..e0620cff12 100644 --- a/src/dbnode/integration/graphite_find_test.go +++ b/src/dbnode/integration/graphite_find_test.go @@ -309,11 +309,11 @@ local: failure = &checkFailure{} err = fmt.Errorf("initial error") ) - for attempt := 0; (failure != nil || err != nil) && attempt < 2; attempt++ { + for attempt := 0; (failure != nil || err != nil) && attempt < 3; attempt++ { if attempt > 0 { // Retry transient errors (should add a strict mode for this test // avoid allowing transient errors too). - time.Sleep(5 * time.Millisecond) + time.Sleep(time.Duration(attempt) * 500 * time.Millisecond) } result, failure, err = verifyFindQueries(node, level) } From a809d82d68b44e6d21488f27ca88e47991798494 Mon Sep 17 00:00:00 2001 From: Rob Skillington Date: Wed, 8 Jun 2022 12:04:21 -0400 Subject: [PATCH 17/19] Increase timeout of retries --- src/dbnode/integration/graphite_find_test.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/dbnode/integration/graphite_find_test.go b/src/dbnode/integration/graphite_find_test.go index e0620cff12..5bc92eb56d 100644 --- a/src/dbnode/integration/graphite_find_test.go +++ b/src/dbnode/integration/graphite_find_test.go @@ -313,7 +313,8 @@ local: if attempt > 0 { // Retry transient errors (should add a strict mode for this test // avoid allowing transient errors too). - time.Sleep(time.Duration(attempt) * 500 * time.Millisecond) + seconds := 5 * attempt + time.Sleep(time.Duration(seconds) * time.Second) } result, failure, err = verifyFindQueries(node, level) } From 5b79d554169424a21d375cc39c5bec42c5b38ecf Mon Sep 17 00:00:00 2001 From: Rob Skillington Date: Wed, 8 Jun 2022 13:06:07 -0400 Subject: [PATCH 18/19] Lower levels --- src/dbnode/integration/graphite_find_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/dbnode/integration/graphite_find_test.go b/src/dbnode/integration/graphite_find_test.go index 5bc92eb56d..866dc38614 100644 --- a/src/dbnode/integration/graphite_find_test.go +++ b/src/dbnode/integration/graphite_find_test.go @@ -171,8 +171,8 @@ local: switch testOpts.datasetSize { case smallDatasetSize: levels = 4 - entriesPerLevelMin = 7 - entriesPerLevelMax = 10 + entriesPerLevelMin = 5 + entriesPerLevelMax = 7 case largeDatasetSize: // Ideally we'd always use a large dataset size, however you do need // high concurrency to validate this entire dataset and CI can't seem From d55db2c66c8eaeeaff2e156e60418b142e9d8b03 Mon Sep 17 00:00:00 2001 From: Rob Skillington Date: Wed, 8 Jun 2022 14:09:36 -0400 Subject: [PATCH 19/19] Remove retries --- src/dbnode/integration/graphite_find_test.go | 15 +-------------- 1 file changed, 1 insertion(+), 14 deletions(-) diff --git a/src/dbnode/integration/graphite_find_test.go b/src/dbnode/integration/graphite_find_test.go index 866dc38614..d655c1fd67 100644 --- a/src/dbnode/integration/graphite_find_test.go +++ b/src/dbnode/integration/graphite_find_test.go @@ -304,20 +304,7 @@ local: return } - var ( - result checkResult - failure = &checkFailure{} - err = fmt.Errorf("initial error") - ) - for attempt := 0; (failure != nil || err != nil) && attempt < 3; attempt++ { - if attempt > 0 { - // Retry transient errors (should add a strict mode for this test - // avoid allowing transient errors too). - seconds := 5 * attempt - time.Sleep(time.Duration(seconds) * time.Second) - } - result, failure, err = verifyFindQueries(node, level) - } + result, failure, err := verifyFindQueries(node, level) if failure == nil && err == nil { // Account for series checked (for progress report). checkedSeries.Add(uint64(result.leavesVerified))