diff --git a/go.mod b/go.mod index c40d0e3a82..485f992594 100644 --- a/go.mod +++ b/go.mod @@ -70,7 +70,6 @@ require ( go.etcd.io/etcd/client/pkg/v3 v3.6.0-alpha.0 go.etcd.io/etcd/client/v3 v3.6.0-alpha.0 go.etcd.io/etcd/server/v3 v3.6.0-alpha.0 - go.etcd.io/etcd/tests/v3 v3.6.0-alpha.0 go.opentelemetry.io/collector v0.45.0 go.opentelemetry.io/otel v1.4.1 go.opentelemetry.io/otel/bridge/opentracing v1.4.1 @@ -121,7 +120,6 @@ require ( github.com/go-playground/locales v0.13.0 // indirect github.com/go-playground/universal-translator v0.17.0 // indirect github.com/golang-jwt/jwt v3.2.2+incompatible // indirect - github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect github.com/google/btree v1.0.1 // indirect github.com/gorilla/handlers v1.5.1 // indirect github.com/gorilla/websocket v1.4.2 // indirect diff --git a/go.sum b/go.sum index 770fa1c583..22fa99d1b7 100644 --- a/go.sum +++ b/go.sum @@ -1592,8 +1592,6 @@ go.etcd.io/etcd/raft/v3 v3.6.0-alpha.0 h1:BQ6CnNP4pIpy5rusFlTBxAacDgPXhuiHFwoTsB go.etcd.io/etcd/raft/v3 v3.6.0-alpha.0/go.mod h1:/kZdrBXlc5fUgYXfIEQ0B5sb7ejXPKbtF4jWzF1exiQ= go.etcd.io/etcd/server/v3 v3.6.0-alpha.0 h1:BQUVqBqNFZZyrRbfydrRLzq9hYvCcRj97SsX1YwD7CA= go.etcd.io/etcd/server/v3 v3.6.0-alpha.0/go.mod h1:3QM2rLq3B3hSXmVEvgVt3vEEbG/AumSs0Is7EgrlKzU= -go.etcd.io/etcd/tests/v3 v3.6.0-alpha.0 h1:3qrZ3p/E7CxdV1kKtAU75hHOcUoXcSTwC7ELKWyzMJo= -go.etcd.io/etcd/tests/v3 v3.6.0-alpha.0/go.mod h1:hFQkP/cTsZIXXvUv+BsGHZ3TK+76XZMi5GToYA94iac= go.mongodb.org/mongo-driver v1.0.3/go.mod h1:u7ryQJ+DOzQmeO7zB6MHyr8jkEQvC8vH7qLUO4lqsUM= go.mongodb.org/mongo-driver v1.1.1/go.mod h1:u7ryQJ+DOzQmeO7zB6MHyr8jkEQvC8vH7qLUO4lqsUM= go.mongodb.org/mongo-driver v1.1.2/go.mod h1:u7ryQJ+DOzQmeO7zB6MHyr8jkEQvC8vH7qLUO4lqsUM= diff --git a/src/aggregator/integration/custom_aggregations_test.go b/src/aggregator/integration/custom_aggregations_test.go index d9c58cbe41..6bae441f46 100644 --- a/src/aggregator/integration/custom_aggregations_test.go +++ 
b/src/aggregator/integration/custom_aggregations_test.go @@ -1,4 +1,4 @@ -// +build integration +//go:build integration // Copyright (c) 2016 Uber Technologies, Inc. // @@ -68,7 +68,6 @@ func testCustomAggregations(t *testing.T, metadataFns [4]metadataFn) { if testing.Short() { t.SkipNow() } - aggTypesOpts := aggregation.NewTypesOptions(). SetCounterTypeStringTransformFn(aggregation.SuffixTransform). SetTimerTypeStringTransformFn(aggregation.SuffixTransform). @@ -179,7 +178,7 @@ func testCustomAggregations(t *testing.T, metadataFns [4]metadataFn) { // must be the longer than the lowest resolution across all policies. finalTime := end.Add(6 * time.Second) clock.SetNow(finalTime) - time.Sleep(6 * time.Second) + time.Sleep(waitForDataToFlush) require.NoError(t, client.close()) diff --git a/src/aggregator/integration/election.go b/src/aggregator/integration/election.go index 1f24e02263..146055078d 100644 --- a/src/aggregator/integration/election.go +++ b/src/aggregator/integration/election.go @@ -26,9 +26,9 @@ import ( "github.com/m3db/m3/src/cluster/services" "github.com/m3db/m3/src/cluster/services/leader" + integration "github.com/m3db/m3/src/integration/resources/docker/dockerexternal/etcdintegration" "github.com/stretchr/testify/require" clientv3 "go.etcd.io/etcd/client/v3" - "go.etcd.io/etcd/tests/v3/framework/integration" ) var ( @@ -40,27 +40,38 @@ var ( ) type testCluster struct { - t *testing.T - cluster *integration.Cluster + t *testing.T + cluster *integration.Cluster + leaderService services.LeaderService } func newTestCluster(t *testing.T) *testCluster { integration.BeforeTestExternal(t) - return &testCluster{ + cluster := &testCluster{ t: t, cluster: integration.NewCluster(t, &integration.ClusterConfig{ Size: testClusterSize, }), } + return cluster } func (tc *testCluster) LeaderService() services.LeaderService { + if tc.leaderService != nil { + return tc.leaderService + } + svc, err := leader.NewService(tc.etcdClient(), tc.options()) 
require.NoError(tc.t, err) - return svc + tc.leaderService = svc + return tc.leaderService } func (tc *testCluster) Close() { + if tc.leaderService != nil { + // TODO: surface this error instead of discarding it; failures here are hidden during test teardown. + _ = tc.leaderService.Close() + } tc.cluster.Terminate(tc.t) } diff --git a/src/aggregator/integration/metadata_change_test.go b/src/aggregator/integration/metadata_change_test.go index fa93ce7baf..9822579101 100644 --- a/src/aggregator/integration/metadata_change_test.go +++ b/src/aggregator/integration/metadata_change_test.go @@ -1,4 +1,4 @@ -// +build integration +//go:build integration // Copyright (c) 2016 Uber Technologies, Inc. // @@ -138,7 +138,7 @@ func testMetadataChange(t *testing.T, oldMetadataFn, newMetadataFn metadataFn) { // must be the longer than the lowest resolution across all policies. finalTime := end.Add(6 * time.Second) clock.SetNow(finalTime) - time.Sleep(6 * time.Second) + time.Sleep(waitForDataToFlush) require.NoError(t, client.close()) diff --git a/src/aggregator/integration/multi_client_one_type_test.go b/src/aggregator/integration/multi_client_one_type_test.go index d05185a9ab..2af620205f 100644 --- a/src/aggregator/integration/multi_client_one_type_test.go +++ b/src/aggregator/integration/multi_client_one_type_test.go @@ -1,4 +1,4 @@ -// +build integration +//go:build integration // Copyright (c) 2016 Uber Technologies, Inc. // @@ -126,7 +126,7 @@ func testMultiClientOneType(t *testing.T, metadataFn metadataFn) { // must be the longer than the lowest resolution across all policies. 
finalTime := stop.Add(6 * time.Second) clock.SetNow(finalTime) - time.Sleep(4 * time.Second) + time.Sleep(waitForDataToFlush) for i := 0; i < numClients; i++ { require.NoError(t, clients[i].close()) diff --git a/src/aggregator/integration/one_client_multi_type_forwarded_test.go b/src/aggregator/integration/one_client_multi_type_forwarded_test.go index 9ead187784..039eeae391 100644 --- a/src/aggregator/integration/one_client_multi_type_forwarded_test.go +++ b/src/aggregator/integration/one_client_multi_type_forwarded_test.go @@ -1,4 +1,4 @@ -// +build integration +//go:build integration // Copyright (c) 2018 Uber Technologies, Inc. // @@ -121,7 +121,7 @@ func TestOneClientMultiTypeForwardedMetrics(t *testing.T) { // Move time forward and wait for flushing to happen. finalTime := stop.Add(2 * time.Second) clock.SetNow(finalTime) - time.Sleep(2 * time.Second) + time.Sleep(waitForDataToFlush) require.NoError(t, client.close()) diff --git a/src/aggregator/integration/one_client_multi_type_timed_test.go b/src/aggregator/integration/one_client_multi_type_timed_test.go index 8449f5858b..80f8d6ec3f 100644 --- a/src/aggregator/integration/one_client_multi_type_timed_test.go +++ b/src/aggregator/integration/one_client_multi_type_timed_test.go @@ -1,4 +1,4 @@ -// +build integration +//go:build integration // Copyright (c) 2018 Uber Technologies, Inc. // @@ -119,7 +119,7 @@ func TestOneClientMultiTypeTimedMetrics(t *testing.T) { // Move time forward and wait for flushing to happen. 
finalTime := stop.Add(time.Minute + 2*time.Second) clock.SetNow(finalTime) - time.Sleep(2 * time.Second) + time.Sleep(waitForDataToFlush) require.NoError(t, client.close()) diff --git a/src/aggregator/integration/one_client_multi_type_untimed_test.go b/src/aggregator/integration/one_client_multi_type_untimed_test.go index 59f6800b66..0adc3a5c2d 100644 --- a/src/aggregator/integration/one_client_multi_type_untimed_test.go +++ b/src/aggregator/integration/one_client_multi_type_untimed_test.go @@ -1,4 +1,4 @@ -// +build integration +//go:build integration // Copyright (c) 2016 Uber Technologies, Inc. // @@ -26,9 +26,17 @@ import ( "testing" "time" + "github.com/m3db/m3/src/cluster/placement" + "github.com/stretchr/testify/require" +) - "github.com/m3db/m3/src/cluster/placement" +const ( + // waitForDataToFlush is the amount of time we will wait in these tests between finishing writing data to + // the aggregator, and attempting to assert that data went through. + // The aggregator generally, and these tests specifically are quite sensitive to time. + // The tests probably need a bit of a rethink to wait on (or poll for) an actual condition instead of sleeping. + waitForDataToFlush = 10 * time.Second ) func TestOneClientMultiTypeUntimedMetricsWithStagedMetadatas(t *testing.T) { @@ -114,7 +122,7 @@ func testOneClientMultiType(t *testing.T, metadataFn metadataFn) { // must be the longer than the lowest resolution across all policies. 
finalTime := stop.Add(6 * time.Second) clock.SetNow(finalTime) - time.Sleep(4 * time.Second) + time.Sleep(waitForDataToFlush) require.NoError(t, client.close()) diff --git a/src/aggregator/integration/one_client_passthru_test.go b/src/aggregator/integration/one_client_passthru_test.go index 75a6f49e25..489730a123 100644 --- a/src/aggregator/integration/one_client_passthru_test.go +++ b/src/aggregator/integration/one_client_passthru_test.go @@ -1,4 +1,4 @@ -// +build integration +//go:build integration // Copyright (c) 2020 Uber Technologies, Inc. // diff --git a/src/aggregator/integration/placement_change_test.go b/src/aggregator/integration/placement_change_test.go index 99683d7f3d..9f798b0ee1 100644 --- a/src/aggregator/integration/placement_change_test.go +++ b/src/aggregator/integration/placement_change_test.go @@ -1,5 +1,4 @@ //go:build integration -// +build integration // Copyright (c) 2018 Uber Technologies, Inc. // @@ -227,9 +226,9 @@ func TestPlacementChange(t *testing.T) { } clock.SetNow(start2) - time.Sleep(6 * time.Second) + time.Sleep(waitForDataToFlush) setPlacement(t, placementKey, clusterClient, finalPlacement) - time.Sleep(6 * time.Second) + time.Sleep(waitForDataToFlush) for _, data := range datasets[1] { clock.SetNow(data.timestamp) @@ -245,7 +244,7 @@ func TestPlacementChange(t *testing.T) { // Move time forward and wait for flushing to happen. clock.SetNow(finalTime) - time.Sleep(6 * time.Second) + time.Sleep(waitForDataToFlush) // Remove all the topic consumers before closing clients and servers. This allows to close the // connections between servers while they still are running. 
Otherwise, during server shutdown, diff --git a/src/aggregator/integration/resend_stress_test.go b/src/aggregator/integration/resend_stress_test.go index 1db0e5543c..e4934bf1db 100644 --- a/src/aggregator/integration/resend_stress_test.go +++ b/src/aggregator/integration/resend_stress_test.go @@ -1,5 +1,4 @@ //go:build integration -// +build integration // Copyright (c) 2018 Uber Technologies, Inc. // diff --git a/src/aggregator/integration/same_id_multi_type_test.go b/src/aggregator/integration/same_id_multi_type_test.go index 09974adb2d..fe6b70d999 100644 --- a/src/aggregator/integration/same_id_multi_type_test.go +++ b/src/aggregator/integration/same_id_multi_type_test.go @@ -1,4 +1,4 @@ -// +build integration +//go:build integration // Copyright (c) 2016 Uber Technologies, Inc. // @@ -138,7 +138,7 @@ func testSameIDMultiType(t *testing.T, metadataFn metadataFn) { // must be the longer than the lowest resolution across all policies. finalTime := stop.Add(6 * time.Second) clock.SetNow(finalTime) - time.Sleep(4 * time.Second) + time.Sleep(waitForDataToFlush) require.NoError(t, client.close()) diff --git a/src/aggregator/integration/setup.go b/src/aggregator/integration/setup.go index 07ed781b0b..80eea426b2 100644 --- a/src/aggregator/integration/setup.go +++ b/src/aggregator/integration/setup.go @@ -92,6 +92,7 @@ type testServerSetup struct { // Signals. 
doneCh chan struct{} closedCh chan struct{} + stopped bool } func newTestServerSetup(t *testing.T, opts testServerOptions) *testServerSetup { @@ -452,6 +453,10 @@ func (ts *testServerSetup) sortedResults() []aggregated.MetricWithStoragePolicy } func (ts *testServerSetup) stopServer() error { + if ts.stopped { + return nil + } + ts.stopped = true if err := ts.aggregator.Close(); err != nil { return err } @@ -464,6 +469,9 @@ func (ts *testServerSetup) stopServer() error { func (ts *testServerSetup) close() { ts.electionCluster.Close() + if err := ts.stopServer(); err != nil { + panic(err.Error()) + } } func (tss testServerSetups) newClient(t *testing.T) *client { diff --git a/src/cluster/client/etcd/client.go b/src/cluster/client/etcd/client.go index af4a71828a..6dd5f0a338 100644 --- a/src/cluster/client/etcd/client.go +++ b/src/cluster/client/etcd/client.go @@ -339,8 +339,14 @@ func newConfigFromCluster(rnd randInt63N, cluster Cluster) (clientv3.Config, err if err != nil { return clientv3.Config{}, err } + + // Support disabling autosync if a user very explicitly requests it (via negative duration). 
+ autoSyncInterval := cluster.AutoSyncInterval() + if autoSyncInterval < 0 { + autoSyncInterval = 0 + } cfg := clientv3.Config{ - AutoSyncInterval: cluster.AutoSyncInterval(), + AutoSyncInterval: autoSyncInterval, DialTimeout: cluster.DialTimeout(), DialOptions: cluster.DialOptions(), Endpoints: cluster.Endpoints(), diff --git a/src/cluster/client/etcd/client_test.go b/src/cluster/client/etcd/client_test.go index 3d1d0b60db..343842eb4e 100644 --- a/src/cluster/client/etcd/client_test.go +++ b/src/cluster/client/etcd/client_test.go @@ -25,18 +25,23 @@ import ( "testing" "time" + "github.com/m3db/m3/src/cluster/kv" + "github.com/m3db/m3/src/cluster/services" + integration "github.com/m3db/m3/src/integration/resources/docker/dockerexternal/etcdintegration" + "github.com/m3db/m3/src/x/retry" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" clientv3 "go.etcd.io/etcd/client/v3" - "go.etcd.io/etcd/tests/v3/framework/integration" "google.golang.org/grpc" - - "github.com/m3db/m3/src/cluster/kv" - "github.com/m3db/m3/src/cluster/services" ) func TestETCDClientGen(t *testing.T) { - cs, err := NewConfigServiceClient(testOptions()) + cs, err := NewConfigServiceClient( + testOptions(). + // These are error cases; don't retry for no reason. + SetRetryOptions(retry.NewOptions().SetMaxRetries(0)), + ) require.NoError(t, err) c := cs.(*csclient) @@ -414,6 +419,15 @@ func Test_newConfigFromCluster(t *testing.T) { ) }) + t.Run("negative autosync on M3 disables autosync for etcd", func(t *testing.T) { + inputCfg := newFullConfig() + inputCfg.AutoSyncInterval = -1 + etcdCfg, err := newConfigFromCluster(testRnd, inputCfg.NewCluster()) + require.NoError(t, err) + + assert.Equal(t, time.Duration(0), etcdCfg.AutoSyncInterval) + }) + // Separate test just because the assert.Equal won't work for functions. 
t.Run("passes through dial options", func(t *testing.T) { clusterCfg := newFullConfig() diff --git a/src/cluster/client/etcd/config.go b/src/cluster/client/etcd/config.go index 877f2051e5..520afa385e 100644 --- a/src/cluster/client/etcd/config.go +++ b/src/cluster/client/etcd/config.go @@ -35,12 +35,22 @@ import ( // ClusterConfig is the config for a zoned etcd cluster. type ClusterConfig struct { - Zone string `yaml:"zone"` - Endpoints []string `yaml:"endpoints"` - KeepAlive *KeepAliveConfig `yaml:"keepAlive"` - TLS *TLSConfig `yaml:"tls"` - AutoSyncInterval time.Duration `yaml:"autoSyncInterval"` - DialTimeout time.Duration `yaml:"dialTimeout"` + Zone string `yaml:"zone"` + Endpoints []string `yaml:"endpoints"` + KeepAlive *KeepAliveConfig `yaml:"keepAlive"` + TLS *TLSConfig `yaml:"tls"` + // AutoSyncInterval configures the etcd client's AutoSyncInterval + // (go.etcd.io/etcd/client/v3@v3.6.0-alpha.0/config.go:32). + // By default, it is 1m. + // + // Advanced: + // + // One important difference from the etcd config: we have autosync *on* by default (unlike etcd), meaning that + // the zero value here doesn't indicate autosync off. + // Instead, users should pass in a negative value to indicate "disable autosync" + // Only do this if you truly have a good reason for it! Most production use cases want autosync on. + AutoSyncInterval time.Duration `yaml:"autoSyncInterval"` + DialTimeout time.Duration `yaml:"dialTimeout"` DialOptions []grpc.DialOption `yaml:"-"` // nonserializable } @@ -59,7 +69,10 @@ func (c ClusterConfig) NewCluster() Cluster { SetKeepAliveOptions(keepAliveOpts). SetTLSOptions(c.TLS.newOptions()) - if c.AutoSyncInterval > 0 { + // Autosync should *always* be on, unless the user very explicitly requests it to be off. They can do this via a + // negative value (in which case we can assume they know what they're doing). + // Therefore, only update if it's nonzero, on the assumption that zero is just the empty value. 
+ if c.AutoSyncInterval != 0 { cluster = cluster.SetAutoSyncInterval(c.AutoSyncInterval) } diff --git a/src/cluster/client/etcd/config_test.go b/src/cluster/client/etcd/config_test.go index 1bc8959117..278b41bb1f 100644 --- a/src/cluster/client/etcd/config_test.go +++ b/src/cluster/client/etcd/config_test.go @@ -181,3 +181,10 @@ func TestDefaultConfig(t *testing.T) { require.Equal(t, defaultDialTimeout, cluster.DialTimeout()) require.Equal(t, defaultAutoSyncInterval, cluster.AutoSyncInterval()) } + +func TestConfig_negativeAutosync(t *testing.T) { + cluster := ClusterConfig{ + AutoSyncInterval: -5, + }.NewCluster() + require.Equal(t, time.Duration(-5), cluster.AutoSyncInterval()) +} diff --git a/src/cluster/client/etcd/types.go b/src/cluster/client/etcd/types.go index 9caf012e70..5b60832801 100644 --- a/src/cluster/client/etcd/types.go +++ b/src/cluster/client/etcd/types.go @@ -159,6 +159,11 @@ type Cluster interface { SetTLSOptions(TLSOptions) Cluster AutoSyncInterval() time.Duration + + // SetAutoSyncInterval sets the etcd client to autosync cluster endpoints periodically. This defaults to + // 1 minute (defaultAutoSyncInterval). If negative or zero, it will disable autosync. This differs slightly + // from the underlying etcd configuration's setting, which only supports 0 for disabling. We do this because + // there's otherwise no good way to specify "disable" in our configs (which default to SetAutoSyncInterval(1m)). 
SetAutoSyncInterval(value time.Duration) Cluster DialTimeout() time.Duration diff --git a/src/cluster/etcd/watchmanager/manager_test.go b/src/cluster/etcd/watchmanager/manager_test.go index d65758e8ff..ddc8df306c 100644 --- a/src/cluster/etcd/watchmanager/manager_test.go +++ b/src/cluster/etcd/watchmanager/manager_test.go @@ -22,16 +22,15 @@ package watchmanager import ( "fmt" - "runtime" "sync/atomic" "testing" "time" + integration "github.com/m3db/m3/src/integration/resources/docker/dockerexternal/etcdintegration" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/uber-go/tally" clientv3 "go.etcd.io/etcd/client/v3" - "go.etcd.io/etcd/tests/v3/framework/integration" "golang.org/x/net/context" "github.com/m3db/m3/src/x/clock" @@ -164,114 +163,117 @@ func TestWatchRecreate(t *testing.T) { <-doneCh } -func TestWatchNoLeader(t *testing.T) { - t.Skip("flaky, started to fail very consistently on CI") - const ( - watchInitAndRetryDelay = 200 * time.Millisecond - watchCheckInterval = 50 * time.Millisecond - ) - - integration.BeforeTestExternal(t) - ecluster := integration.NewCluster(t, &integration.ClusterConfig{Size: 3}) - defer ecluster.Terminate(t) - - var ( - ec = ecluster.Client(0) - tickDuration = 10 * time.Millisecond - electionTimeout = time.Duration(3*ecluster.Members[0].ElectionTicks) * tickDuration - doneCh = make(chan struct{}, 1) - eventLog = []*clientv3.Event{} - updateCalled int32 - shouldStop int32 - ) - - opts := NewOptions(). - SetClient(ec). - SetUpdateFn( - func(_ string, e []*clientv3.Event) error { - atomic.AddInt32(&updateCalled, 1) - if len(e) > 0 { - eventLog = append(eventLog, e...) - } - return nil - }, - ). - SetTickAndStopFn( - func(string) bool { - if atomic.LoadInt32(&shouldStop) == 0 { - return false - } - - close(doneCh) - - return true - }, - ). - SetWatchChanInitTimeout(watchInitAndRetryDelay). - SetWatchChanResetInterval(watchInitAndRetryDelay). 
- SetWatchChanCheckInterval(watchCheckInterval) - - integration.WaitClientV3(t, ec) - - wh, err := NewWatchManager(opts) - require.NoError(t, err) - - go wh.Watch("foo") - - runtime.Gosched() - time.Sleep(10 * time.Millisecond) - - // there should be a valid watch now, trigger a notification - _, err = ec.Put(context.Background(), "foo", "bar") - require.NoError(t, err) - - leaderIdx := ecluster.WaitLeader(t) - require.True(t, leaderIdx >= 0 && leaderIdx < len(ecluster.Members), "got invalid leader") - - // simulate quorum loss - ecluster.Members[1].Stop(t) - ecluster.Members[2].Stop(t) - - // wait for election timeout, then member[0] will not have a leader. - time.Sleep(electionTimeout) - - require.NoError(t, ecluster.Members[1].Restart(t)) - require.NoError(t, ecluster.Members[2].Restart(t)) - - // wait for leader + election delay just in case - time.Sleep(time.Duration(3*ecluster.Members[0].ElectionTicks) * tickDuration) - - leaderIdx = ecluster.WaitLeader(t) - require.True(t, leaderIdx >= 0 && leaderIdx < len(ecluster.Members), "got invalid leader") - integration.WaitClientV3(t, ec) // wait for client to be ready again - - _, err = ec.Put(context.Background(), "foo", "baz") - require.NoError(t, err) - - // give some time for watch to be updated - require.True(t, clock.WaitUntil(func() bool { - return atomic.LoadInt32(&updateCalled) >= 2 - }, 10*time.Second)) - - updates := atomic.LoadInt32(&updateCalled) - if updates < 2 { - require.Fail(t, - "insufficient update calls", - "expected at least 2 update attempts, got %d during a partition", - updates) - } - - atomic.AddInt32(&shouldStop, 1) - <-doneCh - - require.Len(t, eventLog, 2) - require.NotNil(t, eventLog[0]) - require.Equal(t, eventLog[0].Kv.Key, []byte("foo")) - require.Equal(t, eventLog[0].Kv.Value, []byte("bar")) - require.NotNil(t, eventLog[1]) - require.Equal(t, eventLog[1].Kv.Key, []byte("foo")) - require.Equal(t, eventLog[1].Kv.Value, []byte("baz")) -} +// TODO: this test has been skipped for a 
while, and now doesn't work with the docker based etcd integration package. +// Revive it if it's useful, and make it no longer flake. +//nolint:gocritic +//func TestWatchNoLeader(t *testing.T) { +// t.Skip("flaky, started to fail very consistently on CI") +// const ( +// watchInitAndRetryDelay = 200 * time.Millisecond +// watchCheckInterval = 50 * time.Millisecond +// ) +// +// integration.BeforeTestExternal(t) +// ecluster := integration.NewCluster(t, &integration.ClusterConfig{Size: 3}) +// defer ecluster.Terminate(t) +// +// var ( +// ec = ecluster.Client(0) +// tickDuration = 10 * time.Millisecond +// electionTimeout = time.Duration(3*ecluster.Address[0].ElectionTicks) * tickDuration +// doneCh = make(chan struct{}, 1) +// eventLog = []*clientv3.Event{} +// updateCalled int32 +// shouldStop int32 +// ) +// +// opts := NewOptions(). +// SetClient(ec). +// SetUpdateFn( +// func(_ string, e []*clientv3.Event) error { +// atomic.AddInt32(&updateCalled, 1) +// if len(e) > 0 { +// eventLog = append(eventLog, e...) +// } +// return nil +// }, +// ). +// SetTickAndStopFn( +// func(string) bool { +// if atomic.LoadInt32(&shouldStop) == 0 { +// return false +// } +// +// close(doneCh) +// +// return true +// }, +// ). +// SetWatchChanInitTimeout(watchInitAndRetryDelay). +// SetWatchChanResetInterval(watchInitAndRetryDelay). 
+// SetWatchChanCheckInterval(watchCheckInterval) +// +// integration.WaitClientV3(t, ec) +// +// wh, err := NewWatchManager(opts) +// require.NoError(t, err) +// +// go wh.Watch("foo") +// +// runtime.Gosched() +// time.Sleep(10 * time.Millisecond) +// +// // there should be a valid watch now, trigger a notification +// _, err = ec.Put(context.Background(), "foo", "bar") +// require.NoError(t, err) +// +// leaderIdx := ecluster.WaitLeader(t) +// require.True(t, leaderIdx >= 0 && leaderIdx < len(ecluster.Address), "got invalid leader") +// +// // simulate quorum loss +// ecluster.Address[1].Stop(t) +// ecluster.Address[2].Stop(t) +// +// // wait for election timeout, then member[0] will not have a leader. +// time.Sleep(electionTimeout) +// +// require.NoError(t, ecluster.Address[1].Restart(t)) +// require.NoError(t, ecluster.Address[2].Restart(t)) +// +// // wait for leader + election delay just in case +// time.Sleep(time.Duration(3*ecluster.Address[0].ElectionTicks) * tickDuration) +// +// leaderIdx = ecluster.WaitLeader(t) +// require.True(t, leaderIdx >= 0 && leaderIdx < len(ecluster.Address), "got invalid leader") +// integration.WaitClientV3(t, ec) // wait for client to be ready again +// +// _, err = ec.Put(context.Background(), "foo", "baz") +// require.NoError(t, err) +// +// // give some time for watch to be updated +// require.True(t, clock.WaitUntil(func() bool { +// return atomic.LoadInt32(&updateCalled) >= 2 +// }, 10*time.Second)) +// +// updates := atomic.LoadInt32(&updateCalled) +// if updates < 2 { +// require.Fail(t, +// "insufficient update calls", +// "expected at least 2 update attempts, got %d during a partition", +// updates) +// } +// +// atomic.AddInt32(&shouldStop, 1) +// <-doneCh +// +// require.Len(t, eventLog, 2) +// require.NotNil(t, eventLog[0]) +// require.Equal(t, eventLog[0].Kv.Key, []byte("foo")) +// require.Equal(t, eventLog[0].Kv.Value, []byte("bar")) +// require.NotNil(t, eventLog[1]) +// require.Equal(t, eventLog[1].Kv.Key, 
[]byte("foo")) +// require.Equal(t, eventLog[1].Kv.Value, []byte("baz")) +//} func TestWatchCompactedRevision(t *testing.T) { wh, ec, updateCalled, shouldStop, doneCh, closer := testSetup(t) diff --git a/src/cluster/integration/etcd/etcd.go b/src/cluster/integration/etcd/etcd.go index 2c93bf1200..b836e36967 100644 --- a/src/cluster/integration/etcd/etcd.go +++ b/src/cluster/integration/etcd/etcd.go @@ -21,26 +21,24 @@ package etcd import ( + "context" "encoding/json" "fmt" "io/ioutil" "net/http" - "net/url" - "os" "strings" - "time" "github.com/m3db/m3/src/cluster/client" etcdclient "github.com/m3db/m3/src/cluster/client/etcd" "github.com/m3db/m3/src/cluster/services" - xclock "github.com/m3db/m3/src/x/clock" - "github.com/m3db/m3/src/x/errors" + "github.com/m3db/m3/src/integration/resources/docker/dockerexternal" + "github.com/m3db/m3/src/x/instrument" - "go.etcd.io/etcd/server/v3/embed" + "github.com/ory/dockertest/v3" ) type embeddedKV struct { - etcd *embed.Etcd + etcd *dockerexternal.EtcdNode opts Options dir string } @@ -51,12 +49,12 @@ func New(opts Options) (EmbeddedKV, error) { if err != nil { return nil, err } - cfg := embed.NewConfig() - cfg.Dir = dir - cfg.Logger = "zap" - setRandomPorts(cfg) - e, err := embed.StartEtcd(cfg) + pool, err := dockertest.NewPool("") + if err != nil { + return nil, fmt.Errorf("constructing dockertest.Pool for EmbeddedKV: %w", err) + } + e, err := dockerexternal.NewEtcd(pool, instrument.NewOptions()) if err != nil { return nil, fmt.Errorf("unable to start etcd, err: %v", err) } @@ -67,56 +65,16 @@ func New(opts Options) (EmbeddedKV, error) { }, nil } -func setRandomPorts(cfg *embed.Config) { - randomPortURL, err := url.Parse("http://localhost:0") - if err != nil { - panic(err.Error()) - } - - cfg.LPUrls = []url.URL{*randomPortURL} - cfg.APUrls = []url.URL{*randomPortURL} - cfg.LCUrls = []url.URL{*randomPortURL} - cfg.ACUrls = []url.URL{*randomPortURL} - - cfg.InitialCluster = cfg.InitialClusterFromName(cfg.Name) -} - func (e 
*embeddedKV) Close() error { - var multi errors.MultiError - - // see if there's any errors - select { - case err := <-e.etcd.Err(): - multi = multi.Add(err) - default: - } - - // shutdown and release - e.etcd.Server.Stop() - e.etcd.Close() - - multi = multi.Add(os.RemoveAll(e.dir)) - return multi.FinalError() + return e.etcd.Close(context.TODO()) } func (e *embeddedKV) Start() error { timeout := e.opts.InitTimeout() - select { - case <-e.etcd.Server.ReadyNotify(): - break - case <-time.After(timeout): - return fmt.Errorf("etcd server took too long to start") - } - - // ensure v3 api endpoints are available, https://github.com/coreos/etcd/pull/7075 - apiVersionEndpoint := fmt.Sprintf("http://%s/version", e.etcd.Clients[0].Addr().String()) - fn := func() bool { return version3Available(apiVersionEndpoint) } - ok := xclock.WaitUntil(fn, timeout) - if !ok { - return fmt.Errorf("api version 3 not available") - } + ctx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() - return nil + return e.etcd.Setup(ctx) } type versionResponse struct { @@ -144,11 +102,7 @@ func version3Available(endpoint string) bool { } func (e *embeddedKV) Endpoints() []string { - addresses := make([]string, 0, len(e.etcd.Clients)) - for _, c := range e.etcd.Clients { - addresses = append(addresses, c.Addr().String()) - } - return addresses + return []string{e.etcd.Address()} } func (e *embeddedKV) ConfigServiceClient(fns ...ClientOptFn) (client.Client, error) { @@ -156,7 +110,9 @@ func (e *embeddedKV) ConfigServiceClient(fns ...ClientOptFn) (client.Client, err SetInstrumentOptions(e.opts.InstrumentOptions()). SetServicesOptions(services.NewOptions().SetInitTimeout(e.opts.InitTimeout())). SetClusters([]etcdclient.Cluster{ - etcdclient.NewCluster().SetZone(e.opts.Zone()).SetEndpoints(e.Endpoints()), + etcdclient.NewCluster().SetZone(e.opts.Zone()). + SetEndpoints(e.Endpoints()). + SetAutoSyncInterval(-1), }). SetService(e.opts.ServiceID()). SetEnv(e.opts.Environment()). 
diff --git a/src/cluster/integration/etcd/options.go b/src/cluster/integration/etcd/options.go index 2c986bebfb..4382996a3a 100644 --- a/src/cluster/integration/etcd/options.go +++ b/src/cluster/integration/etcd/options.go @@ -27,7 +27,7 @@ import ( ) const ( - defaulTimeout = 5 * time.Second + defaultTimeout = 30 * time.Second defaultDir = "etcd.dir" defaultServiceID = "integration.service" defaultEnv = "integration.env" @@ -48,7 +48,7 @@ func NewOptions() Options { return &opts{ iopts: instrument.NewOptions(), workingDir: defaultDir, - initTimeout: defaulTimeout, + initTimeout: defaultTimeout, serviceID: defaultServiceID, env: defaultEnv, zone: defaultZone, diff --git a/src/cluster/kv/etcd/store_test.go b/src/cluster/kv/etcd/store_test.go index a4f4773934..64c7e781d2 100644 --- a/src/cluster/kv/etcd/store_test.go +++ b/src/cluster/kv/etcd/store_test.go @@ -36,10 +36,10 @@ import ( "github.com/m3db/m3/src/x/retry" "github.com/golang/protobuf/proto" + integration "github.com/m3db/m3/src/integration/resources/docker/dockerexternal/etcdintegration" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" clientv3 "go.etcd.io/etcd/client/v3" - "go.etcd.io/etcd/tests/v3/framework/integration" "golang.org/x/net/context" ) @@ -513,7 +513,7 @@ func TestWatchNonBlocking(t *testing.T) { ecluster, opts, closeFn := testCluster(t) defer closeFn() - ec := ecluster.Client(0) + ec := ecluster.RandClient() opts = opts.SetWatchChanResetInterval(200 * time.Millisecond).SetWatchChanInitTimeout(500 * time.Millisecond) diff --git a/src/cluster/services/heartbeat/etcd/store_test.go b/src/cluster/services/heartbeat/etcd/store_test.go index 4950becc1e..b38025259b 100644 --- a/src/cluster/services/heartbeat/etcd/store_test.go +++ b/src/cluster/services/heartbeat/etcd/store_test.go @@ -28,9 +28,9 @@ import ( "github.com/m3db/m3/src/cluster/placement" "github.com/m3db/m3/src/cluster/services" + integration 
"github.com/m3db/m3/src/integration/resources/docker/dockerexternal/etcdintegration" "github.com/stretchr/testify/require" clientv3 "go.etcd.io/etcd/client/v3" - "go.etcd.io/etcd/tests/v3/framework/integration" ) func TestKeys(t *testing.T) { diff --git a/src/cluster/services/leader/client_test.go b/src/cluster/services/leader/client_test.go index 766d9e59fd..0fb7b75d4c 100644 --- a/src/cluster/services/leader/client_test.go +++ b/src/cluster/services/leader/client_test.go @@ -30,10 +30,10 @@ import ( "github.com/m3db/m3/src/cluster/services/leader/campaign" "github.com/m3db/m3/src/cluster/services/leader/election" + integration "github.com/m3db/m3/src/integration/resources/docker/dockerexternal/etcdintegration" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" clientv3 "go.etcd.io/etcd/client/v3" - "go.etcd.io/etcd/tests/v3/framework/integration" "golang.org/x/net/context" ) @@ -127,11 +127,15 @@ func TestNewClient(t *testing.T) { assert.NotNil(t, svc) } +// TODO: this test most likely wasn't testing what we thought it was. 
While using etcd/testing/framework/integration, +// the client gets closed along with the cluster, so newClient was likely failing due to the closed client rather than the bad cluster. func TestNewClient_BadCluster(t *testing.T) { + t.Skip("This test only works with the etcd/testing/framework/integration package, " + + "and doesn't provide much signal on correctness of our code.") tc := newTestCluster(t) cl := tc.etcdClient() tc.close() - + require.NoError(t, cl.Close()) _, err := newClient(cl, tc.options(), "") assert.Error(t, err) } diff --git a/src/cluster/services/leader/election/client_test.go b/src/cluster/services/leader/election/client_test.go index af9c0b1759..6cc2db348d 100644 --- a/src/cluster/services/leader/election/client_test.go +++ b/src/cluster/services/leader/election/client_test.go @@ -25,11 +25,11 @@ import ( "testing" "time" + integration "github.com/m3db/m3/src/integration/resources/docker/dockerexternal/etcdintegration" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" clientv3 "go.etcd.io/etcd/client/v3" "go.etcd.io/etcd/client/v3/concurrency" - "go.etcd.io/etcd/tests/v3/framework/integration" ) type testCluster struct { diff --git a/src/cmd/services/m3dbnode/main/main_test.go b/src/cmd/services/m3dbnode/main/main_test.go index f488662251..f46e6c39aa 100644 --- a/src/cmd/services/m3dbnode/main/main_test.go +++ b/src/cmd/services/m3dbnode/main/main_test.go @@ -1,4 +1,6 @@ +//go:build big // +build big + // // Copyright (c) 2017 Uber Technologies, Inc. // @@ -51,7 +53,7 @@ import ( // TestConfig tests booting a server using file based configuration. func TestConfig(t *testing.T) { // Embedded kv - embeddedKV, err := etcd.New(etcd.NewOptions()) + embeddedKV, err := etcd.New(etcd.NewOptions().SetInitTimeout(30 * time.Second)) require.NoError(t, err) defer func() { require.NoError(t, embeddedKV.Close()) @@ -631,6 +633,7 @@ db: etcdClusters: - zone: {{.ServiceZone}} endpoints: {{.EtcdEndpoints}} + autoSyncInterval: -1 ` embeddedKVConfigPortion = `