diff --git a/src/dbnode/integration/graphite_find_test.go b/src/dbnode/integration/graphite_find_test.go new file mode 100644 index 0000000000..d655c1fd67 --- /dev/null +++ b/src/dbnode/integration/graphite_find_test.go @@ -0,0 +1,473 @@ +//go:build integration +// +build integration + +// Copyright (c) 2021 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package integration + +import ( + "context" + "encoding/json" + "fmt" + "math/rand" + "net/http" + "net/url" + "reflect" + "runtime" + "sort" + "strings" + "sync" + "testing" + "time" + + // nolint: gci + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.uber.org/atomic" + "go.uber.org/zap" + + "github.com/m3db/m3/src/dbnode/integration/generate" + "github.com/m3db/m3/src/dbnode/namespace" + "github.com/m3db/m3/src/dbnode/retention" + graphitehandler "github.com/m3db/m3/src/query/api/v1/handler/graphite" + "github.com/m3db/m3/src/query/graphite/graphite" + "github.com/m3db/m3/src/x/ident" + xhttp "github.com/m3db/m3/src/x/net/http" + xsync "github.com/m3db/m3/src/x/sync" + xtest "github.com/m3db/m3/src/x/test" +) + +type testGraphiteFindDatasetSize uint + +const ( + smallDatasetSize testGraphiteFindDatasetSize = iota + largeDatasetSize +) + +type testGraphiteFindOptions struct { + checkConcurrency int + datasetSize testGraphiteFindDatasetSize +} + +func TestGraphiteFindSequential(t *testing.T) { + // NB(rob): We need to investigate why using high concurrency (and hence + // need to use small dataset size since otherwise verification takes + // forever) encounters errors running on CI. + testGraphiteFind(t, testGraphiteFindOptions{ + checkConcurrency: 1, + datasetSize: smallDatasetSize, + }) +} + +func TestGraphiteFindParallel(t *testing.T) { + // Skip until investigation of why check concurrency encounters errors on CI. + t.SkipNow() + testGraphiteFind(t, testGraphiteFindOptions{ + checkConcurrency: runtime.NumCPU(), + datasetSize: largeDatasetSize, + }) +} + +func testGraphiteFind(tt *testing.T, testOpts testGraphiteFindOptions) { + if testing.Short() { + tt.SkipNow() // Just skip if we're doing a short run + } + + // Make sure that parallel assertions fail test immediately + // by using a TestingT that panics when FailNow is called. 
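+	// (Note: testify's FailNow only exits the goroutine it is called from, so a
+	// failed assertion on a worker goroutine would not otherwise abort the test;
+	// panicking surfaces the failure immediately.)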
+ t := xtest.FailNowPanicsTestingT(tt) + + const queryConfigYAML = ` +listenAddress: 127.0.0.1:7201 + +logging: + level: info + +metrics: + scope: + prefix: "coordinator" + prometheus: + handlerPath: /metrics + listenAddress: "127.0.0.1:0" + sanitization: prometheus + samplingRate: 1.0 + +local: + namespaces: + - namespace: default + type: unaggregated + retention: 12h + - namespace: testns + type: aggregated + retention: 12h + resolution: 1m +` + + var ( + blockSize = 2 * time.Hour + retentionPeriod = 6 * blockSize + rOpts = retention.NewOptions(). + SetRetentionPeriod(retentionPeriod). + SetBlockSize(blockSize) + idxOpts = namespace.NewIndexOptions(). + SetEnabled(true). + SetBlockSize(2 * blockSize) + nOpts = namespace.NewOptions(). + SetRetentionOptions(rOpts). + SetIndexOptions(idxOpts) + ) + ns, err := namespace.NewMetadata(ident.StringID("testns"), nOpts) + require.NoError(t, err) + + opts := NewTestOptions(tt). + SetNamespaces([]namespace.Metadata{ns}) + + // Test setup. + setup, err := NewTestSetup(tt, opts, nil) + require.NoError(t, err) + defer setup.Close() + + log := setup.StorageOpts().InstrumentOptions().Logger(). + With(zap.String("ns", ns.ID().String())) + + require.NoError(t, setup.InitializeBootstrappers(InitializeBootstrappersOptions{ + WithFileSystem: true, + })) + + // Write test data. + now := setup.NowFn()() + + // Create graphite node tree for tests. + var ( + // nolint: gosec + randConstSeedSrc = rand.NewSource(123456789) + // nolint: gosec + randGen = rand.New(randConstSeedSrc) + rootNode = &graphiteNode{} + buildNodes func(node *graphiteNode, level int) + generateSeries []generate.Series + levels int + entriesPerLevelMin int + entriesPerLevelMax int + ) + switch testOpts.datasetSize { + case smallDatasetSize: + levels = 4 + entriesPerLevelMin = 5 + entriesPerLevelMax = 7 + case largeDatasetSize: + // Ideally we'd always use a large dataset size, however you do need + // high concurrency to validate this entire dataset and CI can't seem + // to handle high concurrency without encountering errors. + levels = 5 + entriesPerLevelMin = 6 + entriesPerLevelMax = 9 + default: + require.FailNow(t, fmt.Sprintf("invalid test dataset size set: %d", testOpts.datasetSize)) + } + + buildNodes = func(node *graphiteNode, level int) { + entries := entriesPerLevelMin + + randGen.Intn(entriesPerLevelMax-entriesPerLevelMin) + for entry := 0; entry < entries; entry++ { + name := fmt.Sprintf("lvl%02d_entry%02d", level, entry) + + // Create a directory node and spawn more underneath. + if nextLevel := level + 1; nextLevel <= levels { + childDir := node.child(name+"_dir", graphiteNodeChildOptions{ + isLeaf: false, + }) + buildNodes(childDir, nextLevel) + } + + // Create a leaf node. + childLeaf := node.child(name+"_leaf", graphiteNodeChildOptions{ + isLeaf: true, + }) + + // Create series to generate data for the leaf node. + tags := make([]ident.Tag, 0, len(childLeaf.pathParts)) + for i, pathPartValue := range childLeaf.pathParts { + tags = append(tags, ident.Tag{ + Name: graphite.TagNameID(i), + Value: ident.StringID(pathPartValue), + }) + } + series := generate.Series{ + ID: ident.StringID(strings.Join(childLeaf.pathParts, ".")), + Tags: ident.NewTags(tags...), + } + generateSeries = append(generateSeries, series) + } + } + + // Build tree. + log.Info("building graphite data set series") + buildNodes(rootNode, 0) + + // Generate and write test data. 
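+	// Each leaf series gets one datapoint in the previous block and one in the
+	// current block so the filesystem bootstrapper has data on disk to load for
+	// every metric in the tree.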
+ log.Info("generating graphite data set datapoints", + zap.Int("seriesSize", len(generateSeries))) + generateBlocks := make([]generate.BlockConfig, 0, len(generateSeries)) + for _, series := range generateSeries { + generateBlocks = append(generateBlocks, []generate.BlockConfig{ + { + IDs: []string{series.ID.String()}, + Tags: series.Tags, + NumPoints: 1, + Start: now.Add(-1 * blockSize), + }, + { + IDs: []string{series.ID.String()}, + Tags: series.Tags, + NumPoints: 1, + Start: now, + }, + }...) + } + seriesMaps := generate.BlocksByStart(generateBlocks) + log.Info("writing graphite data set to disk", + zap.Int("seriesMapSize", len(seriesMaps))) + require.NoError(t, writeTestDataToDisk(ns, setup, seriesMaps, 0)) + + // Start the server with filesystem bootstrapper. + log.Info("starting server") + require.NoError(t, setup.StartServer()) + log.Info("server is now up") + + // Stop the server. + defer func() { + require.NoError(t, setup.StopServer()) + log.Info("server is now down") + }() + + // Start the query server + log.Info("starting query server") + require.NoError(t, setup.StartQuery(queryConfigYAML)) + log.Info("started query server", zap.String("addr", setup.QueryAddress())) + + // Stop the query server. + defer func() { + require.NoError(t, setup.StopQuery()) + log.Info("query server is now down") + }() + + // Check each level of the tree can answer expected queries. + type checkResult struct { + leavesVerified int + } + type checkFailure struct { + expected graphiteFindResults + actual graphiteFindResults + failMsg string + } + var ( + verifyFindQueries func(node *graphiteNode, level int) (checkResult, *checkFailure, error) + parallelVerifyFindQueries func(node *graphiteNode, level int) + checkedSeriesAbort = atomic.NewBool(false) + numSeriesChecking = uint64(len(generateSeries)) + checkedSeriesLogEvery = numSeriesChecking / 10 + checkedSeries = atomic.NewUint64(0) + checkedSeriesLog = atomic.NewUint64(0) + // Use custom http client for higher number of max idle conns. + httpClient = xhttp.NewHTTPClient(xhttp.DefaultHTTPClientOptions()) + wg sync.WaitGroup + workerPool = xsync.NewWorkerPool(testOpts.checkConcurrency) + ) + workerPool.Init() + parallelVerifyFindQueries = func(node *graphiteNode, level int) { + // Verify this node at level. + wg.Add(1) + workerPool.Go(func() { + defer wg.Done() + + if checkedSeriesAbort.Load() { + // Do not execute if aborted. + return + } + + result, failure, err := verifyFindQueries(node, level) + if failure == nil && err == nil { + // Account for series checked (for progress report). + checkedSeries.Add(uint64(result.leavesVerified)) + return + } + + // Bail parallel execution (failed require/assert won't stop execution). + if checkedSeriesAbort.CAS(false, true) { + switch { + case failure != nil: + // Assert an error result and log once. + assert.Equal(t, failure.expected, failure.actual, failure.failMsg) + log.Error("aborting checks due to mismatch") + case err != nil: + assert.NoError(t, err) + log.Error("aborting checks due to error") + default: + require.FailNow(t, "unknown error condition") + log.Error("aborting checks due to unknown condition") + } + } + }) + + // Verify children of children. + for _, child := range node.children { + parallelVerifyFindQueries(child, level+1) + } + } + verifyFindQueries = func(node *graphiteNode, level int) (checkResult, *checkFailure, error) { + var r checkResult + + // Write progress report if progress made. 
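+		// (Progress is reported roughly every tenth of the total series count; the
+		// atomic swap helps keep concurrent workers from logging the same step twice.)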
+ checked := checkedSeries.Load() + nextLog := checked - (checked % checkedSeriesLogEvery) + if lastLog := checkedSeriesLog.Swap(nextLog); lastLog < nextLog { + log.Info("checked series progressing", zap.Int("checked", int(checked))) + } + + // Verify at depth. + numPathParts := len(node.pathParts) + queryPathParts := make([]string, 0, 1+numPathParts) + if numPathParts > 0 { + queryPathParts = append(queryPathParts, node.pathParts...) + } + queryPathParts = append(queryPathParts, "*") + query := strings.Join(queryPathParts, ".") + + params := make(url.Values) + params.Set("query", query) + + url := fmt.Sprintf("http://%s%s?%s", setup.QueryAddress(), + graphitehandler.FindURL, params.Encode()) + + req, err := http.NewRequestWithContext(context.Background(), + http.MethodGet, url, nil) + require.NoError(t, err) + + res, err := httpClient.Do(req) + if err != nil { + return r, nil, err + } + if res.StatusCode != http.StatusOK { + return r, nil, fmt.Errorf("bad response code: expected=%d, actual=%d", + http.StatusOK, res.StatusCode) + } + + defer res.Body.Close() + + // Compare results. + var actual graphiteFindResults + if err := json.NewDecoder(res.Body).Decode(&actual); err != nil { + return r, nil, err + } + + expected := make(graphiteFindResults, 0, len(node.children)) + for _, child := range node.children { + leaf := 0 + if child.isLeaf { + leaf = 1 + r.leavesVerified++ + } + expected = append(expected, graphiteFindResult{ + Text: child.name, + Leaf: leaf, + }) + } + + sortGraphiteFindResults(actual) + sortGraphiteFindResults(expected) + + if !reflect.DeepEqual(expected, actual) { + failMsg := fmt.Sprintf("invalid results: level=%d, parts=%d, query=%s", + level, len(node.pathParts), query) + failMsg += fmt.Sprintf("\n\ndiff:\n%s\n\n", + xtest.Diff(xtest.MustPrettyJSONObject(t, expected), + xtest.MustPrettyJSONObject(t, actual))) + return r, &checkFailure{ + expected: expected, + actual: actual, + failMsg: failMsg, + }, nil + } + + return r, nil, nil + } + + // Check all top level entries and recurse. + log.Info("checking series", + zap.Int("checkConcurrency", testOpts.checkConcurrency), + zap.Uint64("numSeriesChecking", numSeriesChecking)) + parallelVerifyFindQueries(rootNode, 0) + + // Wait for execution. + wg.Wait() + + // Allow for debugging by issuing queries, etc. + if DebugTest() { + log.Info("debug test set, pausing for investigate") + <-make(chan struct{}) + } +} + +type graphiteFindResults []graphiteFindResult + +type graphiteFindResult struct { + Text string `json:"text"` + Leaf int `json:"leaf"` +} + +func sortGraphiteFindResults(r graphiteFindResults) { + sort.Slice(r, func(i, j int) bool { + if r[i].Leaf != r[j].Leaf { + return r[i].Leaf < r[j].Leaf + } + return r[i].Text < r[j].Text + }) +} + +type graphiteNode struct { + name string + pathParts []string + isLeaf bool + children []*graphiteNode +} + +type graphiteNodeChildOptions struct { + isLeaf bool +} + +func (n *graphiteNode) child( + name string, + opts graphiteNodeChildOptions, +) *graphiteNode { + pathParts := append(make([]string, 0, 1+len(n.pathParts)), n.pathParts...) 
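+	// Copy into a fresh slice rather than appending to n.pathParts directly so
+	// that sibling children never share (and overwrite) the same backing array.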
+ pathParts = append(pathParts, name) + + child := &graphiteNode{ + name: name, + pathParts: pathParts, + isLeaf: opts.isLeaf, + } + + n.children = append(n.children, child) + + return child +} diff --git a/src/dbnode/integration/setup.go b/src/dbnode/integration/setup.go index 9c3a3c1360..b0f7da8684 100644 --- a/src/dbnode/integration/setup.go +++ b/src/dbnode/integration/setup.go @@ -25,6 +25,7 @@ import ( "flag" "fmt" "io/ioutil" + "net" "os" "os/exec" "strings" @@ -32,8 +33,18 @@ import ( "testing" "time" + // nolint: gci + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/uber-go/tally" + "github.com/uber/tchannel-go" + "go.uber.org/zap" + "go.uber.org/zap/zapcore" + + clusterclient "github.com/m3db/m3/src/cluster/client" "github.com/m3db/m3/src/cluster/services" "github.com/m3db/m3/src/cluster/shard" + queryconfig "github.com/m3db/m3/src/cmd/services/m3query/config" "github.com/m3db/m3/src/dbnode/client" "github.com/m3db/m3/src/dbnode/generated/thrift/rpc" "github.com/m3db/m3/src/dbnode/integration/fake" @@ -58,17 +69,13 @@ import ( "github.com/m3db/m3/src/dbnode/testdata/prototest" "github.com/m3db/m3/src/dbnode/topology" "github.com/m3db/m3/src/dbnode/ts" + queryserver "github.com/m3db/m3/src/query/server" "github.com/m3db/m3/src/x/clock" + xconfig "github.com/m3db/m3/src/x/config" "github.com/m3db/m3/src/x/ident" "github.com/m3db/m3/src/x/instrument" xsync "github.com/m3db/m3/src/x/sync" xtime "github.com/m3db/m3/src/x/time" - - "github.com/stretchr/testify/require" - "github.com/uber-go/tally" - "github.com/uber/tchannel-go" - "go.uber.org/zap" - "go.uber.org/zap/zapcore" ) var ( @@ -127,6 +134,7 @@ type testSetup struct { m3dbAdminClient client.AdminClient m3dbVerificationAdminClient client.AdminClient workerPool xsync.WorkerPool + queryAddress string // compare expected with actual data function assertEqual assertTestDataEqual @@ -137,8 +145,10 @@ type testSetup struct { namespaces []namespace.Metadata // signals - doneCh chan struct{} - closedCh chan struct{} + doneCh chan struct{} + closedCh chan struct{} + queryInterruptCh chan error + queryDoneCh chan struct{} } type xNowFn func() xtime.UnixNano @@ -170,6 +180,9 @@ type TestSetup interface { StopServerAndVerifyOpenFilesAreClosed() error StartServer() error StartServerDontWaitBootstrap() error + StopQuery() error + StartQuery(configYAML string) error + QueryAddress() string NowFn() xNowFn ClockNowFn() clock.NowFn SetNowFn(xtime.UnixNano) @@ -843,6 +856,66 @@ func openFiles(parentDir string) []string { return strings.Split(string(out), "\n") } +func (ts *testSetup) StartQuery(configYAML string) error { + m3dbClient := ts.m3dbClient + if m3dbClient == nil { + return fmt.Errorf("dbnode admin client not set") + } + + configFile, cleanup := newTestFile(ts.t, "config.yaml", configYAML) + defer cleanup() + + var cfg queryconfig.Configuration + err := xconfig.LoadFile(&cfg, configFile.Name(), xconfig.Options{}) + if err != nil { + return err + } + + dbClientCh := make(chan client.Client, 1) + dbClientCh <- m3dbClient + clusterClientCh := make(chan clusterclient.Client, 1) + listenerCh := make(chan net.Listener, 1) + localSessionReadyCh := make(chan struct{}, 1) + + ts.queryInterruptCh = make(chan error, 1) + ts.queryDoneCh = make(chan struct{}, 1) + + go func() { + queryserver.Run(queryserver.RunOptions{ + Config: cfg, + InterruptCh: ts.queryInterruptCh, + ListenerCh: listenerCh, + LocalSessionReadyCh: localSessionReadyCh, + DBClient: dbClientCh, + ClusterClient: clusterClientCh, + }) + 
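+		// Run blocks until it receives an interrupt; once it returns, signal done
+		// so StopQuery can unblock.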
ts.queryDoneCh <- struct{}{} + }() + + // Wait for local session to connect. + <-localSessionReadyCh + + // Wait for listener. + listener := <-listenerCh + ts.queryAddress = listener.Addr().String() + + return nil +} + +func (ts *testSetup) StopQuery() error { + // Send interrupt. + ts.queryInterruptCh <- fmt.Errorf("interrupt") + + // Wait for done. + <-ts.queryDoneCh + + return nil +} + +func (ts *testSetup) QueryAddress() string { + return ts.queryAddress +} + func (ts *testSetup) TChannelClient() *TestTChannelClient { return ts.tchannelClient } @@ -1189,3 +1262,21 @@ func mustInspectFilesystem(fsOpts fs.Options) fs.Inspection { return inspection } + +func newTestFile(t *testing.T, fileName, contents string) (*os.File, closeFn) { + tmpFile, err := ioutil.TempFile("", fileName) + require.NoError(t, err) + + _, err = tmpFile.WriteString(contents) + require.NoError(t, err) + + return tmpFile, func() { + assert.NoError(t, tmpFile.Close()) + assert.NoError(t, os.Remove(tmpFile.Name())) + } +} + +// DebugTest allows testing to see if a standard debug test env var is set. +func DebugTest() bool { + return os.Getenv("DEBUG_TEST") == "true" +} diff --git a/src/dbnode/storage/coldflush.go b/src/dbnode/storage/coldflush.go index 23b7f4ae4b..9e341241f2 100644 --- a/src/dbnode/storage/coldflush.go +++ b/src/dbnode/storage/coldflush.go @@ -108,9 +108,8 @@ func (m *coldFlushManager) Run(t xtime.UnixNano) bool { m.Unlock() }() - debugLog := m.log.Check(zapcore.DebugLevel, "cold flush run") - if debugLog != nil { - debugLog.Write(zap.String("status", "starting cold flush"), zap.Time("time", t.ToTime())) + if log := m.log.Check(zapcore.DebugLevel, "cold flush run start"); log != nil { + log.Write(zap.Time("time", t.ToTime())) } // NB(xichen): perform data cleanup and flushing sequentially to minimize the impact of disk seeks. @@ -133,8 +132,8 @@ func (m *coldFlushManager) Run(t xtime.UnixNano) bool { }) } - if debugLog != nil { - debugLog.Write(zap.String("status", "completed cold flush"), zap.Time("time", t.ToTime())) + if log := m.log.Check(zapcore.DebugLevel, "cold flush run complete"); log != nil { + log.Write(zap.Time("time", t.ToTime())) } return true diff --git a/src/query/server/query.go b/src/query/server/query.go index bb4487b31f..f4ed82eb7c 100644 --- a/src/query/server/query.go +++ b/src/query/server/query.go @@ -136,6 +136,10 @@ type RunOptions struct { // ready signal once it is open. DownsamplerReadyCh chan<- struct{} + // LocalSessionReadyCh is a programmatic channel to receive the + // local DB session ready signal once it is open. + LocalSessionReadyCh chan struct{} + // InstrumentOptionsReadyCh is a programmatic channel to receive a set of // instrument options and metric reporters that is delivered when // constructed. @@ -472,7 +476,8 @@ func Run(runOpts RunOptions) RunResult { // which generates a session and use the storage with the session. 
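+		// The local session ready channel is threaded through to initClusters so
+		// callers (e.g. integration tests) can wait for the DB session to open.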
m3dbClusters, m3dbPoolWrapper, err = initClusters(cfg, runOpts.DBConfig, clusterNamespacesWatcher, runOpts.DBClient, encodingOpts, - instrumentOptions, tsdbOpts.CustomAdminOptions()) + runOpts.LocalSessionReadyCh, instrumentOptions, + tsdbOpts.CustomAdminOptions()) if err != nil { logger.Fatal("unable to init clusters", zap.Error(err)) } @@ -968,6 +973,7 @@ func initClusters( clusterNamespacesWatcher m3.ClusterNamespacesWatcher, dbClientCh <-chan client.Client, encodingOpts encoding.Options, + localSessionReadyCh chan struct{}, instrumentOpts instrument.Options, customAdminOptions []client.CustomAdminOption, ) (m3.Clusters, *pools.PoolWrapper, error) { @@ -1018,10 +1024,12 @@ func initClusters( return nil, nil, errors.New("no clusters configured and not running local cluster") } - sessionInitChan := make(chan struct{}) + if localSessionReadyCh == nil { + localSessionReadyCh = make(chan struct{}) + } session := m3db.NewAsyncSession(func() (client.Client, error) { return <-dbClientCh, nil - }, sessionInitChan) + }, localSessionReadyCh) clusterStaticConfig := m3.ClusterStaticConfiguration{ Namespaces: localCfg.Namespaces, @@ -1065,7 +1073,7 @@ func initClusters( poolWrapper = pools.NewAsyncPoolsWrapper() go func() { - <-sessionInitChan + <-localSessionReadyCh poolWrapper.Init(session.IteratorPools()) }() } diff --git a/src/query/stores/m3db/async_session.go b/src/query/stores/m3db/async_session.go index 4c3fbf4bf0..316f0f7d22 100644 --- a/src/query/stores/m3db/async_session.go +++ b/src/query/stores/m3db/async_session.go @@ -64,9 +64,7 @@ func NewAsyncSession(fn NewClientFn, done chan<- struct{}) *AsyncSession { go func() { if asyncSession.done != nil { - defer func() { - asyncSession.done <- struct{}{} - }() + defer close(asyncSession.done) } c, err := fn() diff --git a/src/x/test/diff.go b/src/x/test/diff.go index 4cf1309d9d..2ff4833fa0 100644 --- a/src/x/test/diff.go +++ b/src/x/test/diff.go @@ -22,12 +22,11 @@ package test import ( "encoding/json" - "testing" - - xjson "github.com/m3db/m3/src/x/json" "github.com/sergi/go-diff/diffmatchpatch" "github.com/stretchr/testify/require" + + xjson "github.com/m3db/m3/src/x/json" ) // Diff is a helper method to print a terminal pretty diff of two strings @@ -39,21 +38,28 @@ func Diff(expected, actual string) string { } // MustPrettyJSONMap returns an indented JSON string of the object. -func MustPrettyJSONMap(t *testing.T, value xjson.Map) string { +func MustPrettyJSONMap(t require.TestingT, value xjson.Map) string { pretty, err := json.MarshalIndent(value, "", " ") require.NoError(t, err) return string(pretty) } // MustPrettyJSONArray returns an indented JSON string of the object. -func MustPrettyJSONArray(t *testing.T, value xjson.Array) string { +func MustPrettyJSONArray(t require.TestingT, value xjson.Array) string { + pretty, err := json.MarshalIndent(value, "", " ") + require.NoError(t, err) + return string(pretty) +} + +// MustPrettyJSONObject returns an indented JSON string of the object. +func MustPrettyJSONObject(t require.TestingT, value interface{}) string { pretty, err := json.MarshalIndent(value, "", " ") require.NoError(t, err) return string(pretty) } // MustPrettyJSONString returns an indented version of the JSON. 
-func MustPrettyJSONString(t *testing.T, str string) string { +func MustPrettyJSONString(t require.TestingT, str string) string { var unmarshalled map[string]interface{} err := json.Unmarshal([]byte(str), &unmarshalled) require.NoError(t, err) diff --git a/src/x/test/util.go b/src/x/test/util.go index f10f1cb5d6..8ee893eb2c 100644 --- a/src/x/test/util.go +++ b/src/x/test/util.go @@ -22,9 +22,31 @@ package test import ( "reflect" + "testing" "unsafe" + + "github.com/stretchr/testify/require" ) +// FailNowPanicsTestingT returns a TestingT that panics on a failed assertion. +// This is useful for aborting a test on failed assertions from an asynchronous +// goroutine (since stretchr calls FailNow on testing.T but it will not abort +// the test unless the goroutine is the one running the benchmark or test). +// For more info see: https://github.com/stretchr/testify/issues/652. +func FailNowPanicsTestingT(t *testing.T) require.TestingT { + return failNowPanicsTestingT{TestingT: t} +} + +var _ require.TestingT = failNowPanicsTestingT{} + +type failNowPanicsTestingT struct { + require.TestingT +} + +func (t failNowPanicsTestingT) FailNow() { + panic("failed assertion") +} + // ByteSlicesBackedBySameData returns a bool indicating if the raw backing bytes // under the []byte slice point to the same memory. func ByteSlicesBackedBySameData(a, b []byte) bool {