From d7b617cb83faa16e15b76c8b239c6b965e54759d Mon Sep 17 00:00:00 2001 From: Andrew Mains Date: Mon, 29 Aug 2022 15:52:07 -0400 Subject: [PATCH] etcd_docker 4: Incorporate docker based etcd into Go integration tests PR 4 for https://github.com/m3db/m3/issues/4144 High level approach is as described in https://github.com/m3db/m3/issues/4144 . This PR integrates docker based etcd into our Go integration tests. It removes the need to have the embed package running in m3db for them, but doesn't yet touch that functionality. commit-id:3ae12ffd --- src/integration/aggregator/aggregator.go | 11 ++ src/integration/aggregator/aggregator_test.go | 2 + .../repair/repair_and_replication_test.go | 74 +++++--- .../resources/coordinator_client.go | 50 +++--- .../resources/docker/dockerexternal/etcd.go | 2 - .../resources/inprocess/aggregator.go | 14 ++ .../resources/inprocess/cluster.go | 165 ++++++++++-------- .../resources/inprocess/coordinator_test.go | 16 ++ .../resources/inprocess/dbnode_test.go | 2 + .../resources/inprocess/inprocess.go | 11 ++ src/integration/resources/options.go | 23 +++ 11 files changed, 254 insertions(+), 116 deletions(-) diff --git a/src/integration/aggregator/aggregator.go b/src/integration/aggregator/aggregator.go index 5e2dd0878f..c1fad2922e 100644 --- a/src/integration/aggregator/aggregator.go +++ b/src/integration/aggregator/aggregator.go @@ -117,6 +117,17 @@ ingest: maxBackoff: 10s jitter: true storeMetricsType: true + +clusterManagement: + etcd: + env: default_env + zone: embedded + service: m3db + cacheDir: /var/lib/m3kv + etcdClusters: + - zone: embedded + endpoints: + - 127.0.0.1:2379 ` // TestAggregatorAggregatorConfig is the test config for the aggregators. diff --git a/src/integration/aggregator/aggregator_test.go b/src/integration/aggregator/aggregator_test.go index 91bf35082f..0e2d48a289 100644 --- a/src/integration/aggregator/aggregator_test.go +++ b/src/integration/aggregator/aggregator_test.go @@ -1,4 +1,6 @@ +//go:build cluster_integration // +build cluster_integration + // // Copyright (c) 2021 Uber Technologies, Inc. // diff --git a/src/integration/repair/repair_and_replication_test.go b/src/integration/repair/repair_and_replication_test.go index d122650225..2e88acfb8d 100644 --- a/src/integration/repair/repair_and_replication_test.go +++ b/src/integration/repair/repair_and_replication_test.go @@ -1,4 +1,6 @@ +//go:build cluster_integration // +build cluster_integration + // // Copyright (c) 2021 Uber Technologies, Inc. // @@ -23,16 +25,21 @@ package repair import ( + "context" "testing" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - "github.com/m3db/m3/src/integration/resources" + "github.com/m3db/m3/src/integration/resources/docker/dockerexternal" "github.com/m3db/m3/src/integration/resources/inprocess" + "github.com/m3db/m3/src/x/instrument" + + "github.com/ory/dockertest/v3" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) func TestRepairAndReplication(t *testing.T) { + t.Skip("failing after etcd containerization; fix.") cluster1, cluster2, closer := testSetup(t) defer closer() @@ -40,11 +47,23 @@ func TestRepairAndReplication(t *testing.T) { } func testSetup(t *testing.T) (resources.M3Resources, resources.M3Resources, func()) { - fullCfgs1 := getClusterFullConfgs(t) - fullCfgs2 := getClusterFullConfgs(t) + pool, err := dockertest.NewPool("") + require.NoError(t, err) - ep1 := fullCfgs1.Configs.Coordinator.Clusters[0].Client.EnvironmentConfig.Services[0].Service.ETCDClusters[0].Endpoints - ep2 := fullCfgs2.Configs.Coordinator.Clusters[0].Client.EnvironmentConfig.Services[0].Service.ETCDClusters[0].Endpoints + etcd1 := mustNewStartedEtcd(t, pool) + etcd2 := mustNewStartedEtcd(t, pool) + + ep1 := []string{etcd1.Address()} + ep2 := []string{etcd2.Address()} + + cluster1Opts := newTestClusterOptions() + cluster1Opts.EtcdEndpoints = ep1 + + cluster2Opts := newTestClusterOptions() + cluster2Opts.EtcdEndpoints = ep2 + + fullCfgs1 := getClusterFullConfgs(t, cluster1Opts) + fullCfgs2 := getClusterFullConfgs(t, cluster2Opts) setRepairAndReplicationCfg( &fullCfgs1, @@ -57,19 +76,28 @@ func testSetup(t *testing.T) (resources.M3Resources, resources.M3Resources, func ep1, ) - cluster1, err := inprocess.NewClusterFromSpecification(fullCfgs1, clusterOptions) + cluster1, err := inprocess.NewClusterFromSpecification(fullCfgs1, cluster1Opts) require.NoError(t, err) - cluster2, err := inprocess.NewClusterFromSpecification(fullCfgs2, clusterOptions) + cluster2, err := inprocess.NewClusterFromSpecification(fullCfgs2, cluster2Opts) require.NoError(t, err) return cluster1, cluster2, func() { + etcd1.Close(context.TODO()) + etcd2.Close(context.TODO()) assert.NoError(t, cluster1.Cleanup()) assert.NoError(t, cluster2.Cleanup()) } } -func getClusterFullConfgs(t *testing.T) inprocess.ClusterSpecification { +func mustNewStartedEtcd(t *testing.T, pool *dockertest.Pool) *dockerexternal.EtcdNode { + etcd, err := dockerexternal.NewEtcd(pool, instrument.NewOptions()) + require.NoError(t, err) + require.NoError(t, etcd.Setup(context.TODO())) + return etcd +} + +func getClusterFullConfgs(t *testing.T, clusterOptions resources.ClusterOptions) inprocess.ClusterSpecification { cfgs, err := inprocess.NewClusterConfigsFromYAML( TestRepairDBNodeConfig, TestRepairCoordinatorConfig, "", ) @@ -84,18 +112,22 @@ func getClusterFullConfgs(t *testing.T) inprocess.ClusterSpecification { func setRepairAndReplicationCfg(fullCfg *inprocess.ClusterSpecification, clusterName string, endpoints []string) { for _, dbnode := range fullCfg.Configs.DBNodes { dbnode.DB.Replication.Clusters[0].Name = clusterName - dbnode.DB.Replication.Clusters[0].Client.EnvironmentConfig.Services[0].Service.ETCDClusters[0].Endpoints = endpoints + etcdService := &(dbnode.DB.Replication.Clusters[0].Client.EnvironmentConfig.Services[0].Service.ETCDClusters[0]) + etcdService.AutoSyncInterval = -1 + etcdService.Endpoints = endpoints } } -var clusterOptions = resources.ClusterOptions{ - DBNode: &resources.DBNodeClusterOptions{ - RF: 2, - NumShards: 4, - NumInstances: 1, - NumIsolationGroups: 2, - }, - Coordinator: resources.CoordinatorClusterOptions{ - GeneratePorts: true, - }, +func newTestClusterOptions() resources.ClusterOptions { + return resources.ClusterOptions{ + DBNode: &resources.DBNodeClusterOptions{ + RF: 2, + NumShards: 4, + NumInstances: 1, + NumIsolationGroups: 2, + }, + Coordinator: resources.CoordinatorClusterOptions{ + GeneratePorts: true, + }, + } } diff --git a/src/integration/resources/coordinator_client.go b/src/integration/resources/coordinator_client.go index b930fbc52b..72a0dc74d3 100644 --- a/src/integration/resources/coordinator_client.go +++ b/src/integration/resources/coordinator_client.go @@ -59,8 +59,8 @@ var errUnknownServiceType = errors.New("unknown service type") // operation until successful. type RetryFunc func(op func() error) error -// ZapMethod appends the method as a log field. -func ZapMethod(s string) zapcore.Field { return zap.String("method", s) } +// zapMethod appends the method as a log field. +func zapMethod(s string) zapcore.Field { return zap.String("method", s) } // CoordinatorClient is a client use to invoke API calls // on a coordinator @@ -97,7 +97,7 @@ func (c *CoordinatorClient) makeURL(resource string) string { func (c *CoordinatorClient) GetNamespace() (admin.NamespaceGetResponse, error) { url := c.makeURL("api/v1/services/m3db/namespace") logger := c.logger.With( - ZapMethod("getNamespace"), zap.String("url", url)) + zapMethod("getNamespace"), zap.String("url", url)) //nolint:noctx resp, err := c.client.Get(url) @@ -129,7 +129,7 @@ func (c *CoordinatorClient) GetPlacement(opts PlacementRequestOptions) (admin.Pl } url := c.makeURL(handlerurl) logger := c.logger.With( - ZapMethod("getPlacement"), zap.String("url", url)) + zapMethod("getPlacement"), zap.String("url", url)) resp, err := c.makeRequest(logger, url, placementhandler.GetHTTPMethod, nil, placementOptsToMap(opts)) if err != nil { @@ -163,7 +163,7 @@ func (c *CoordinatorClient) InitPlacement( } url := c.makeURL(handlerurl) logger := c.logger.With( - ZapMethod("initPlacement"), zap.String("url", url)) + zapMethod("initPlacement"), zap.String("url", url)) resp, err := c.makeRequest(logger, url, placementhandler.InitHTTPMethod, &initRequest, placementOptsToMap(opts)) if err != nil { @@ -194,7 +194,7 @@ func (c *CoordinatorClient) DeleteAllPlacements(opts PlacementRequestOptions) er } url := c.makeURL(handlerurl) logger := c.logger.With( - ZapMethod("deleteAllPlacements"), zap.String("url", url)) + zapMethod("deleteAllPlacements"), zap.String("url", url)) resp, err := c.makeRequest( logger, url, placementhandler.DeleteAllHTTPMethod, nil, placementOptsToMap(opts), @@ -221,7 +221,7 @@ func (c *CoordinatorClient) DeleteAllPlacements(opts PlacementRequestOptions) er // NB: if the name string is empty, this will instead // check for a successful response. func (c *CoordinatorClient) WaitForNamespace(name string) error { - logger := c.logger.With(ZapMethod("waitForNamespace")) + logger := c.logger.With(zapMethod("waitForNamespace")) return c.retryFunc(func() error { ns, err := c.GetNamespace() if err != nil { @@ -250,7 +250,7 @@ func (c *CoordinatorClient) WaitForNamespace(name string) error { func (c *CoordinatorClient) WaitForInstances( ids []string, ) error { - logger := c.logger.With(ZapMethod("waitForPlacement")) + logger := c.logger.With(zapMethod("waitForPlacement")) return c.retryFunc(func() error { placement, err := c.GetPlacement(PlacementRequestOptions{Service: ServiceTypeM3DB}) if err != nil { @@ -282,7 +282,7 @@ func (c *CoordinatorClient) WaitForInstances( // WaitForShardsReady waits until all shards gets ready. func (c *CoordinatorClient) WaitForShardsReady() error { - logger := c.logger.With(ZapMethod("waitForShards")) + logger := c.logger.With(zapMethod("waitForShards")) return c.retryFunc(func() error { placement, err := c.GetPlacement(PlacementRequestOptions{Service: ServiceTypeM3DB}) if err != nil { @@ -307,7 +307,7 @@ func (c *CoordinatorClient) WaitForShardsReady() error { func (c *CoordinatorClient) WaitForClusterReady() error { var ( url = c.makeURL("ready") - logger = c.logger.With(ZapMethod("waitForClusterReady"), zap.String("url", url)) + logger = c.logger.With(zapMethod("waitForClusterReady"), zap.String("url", url)) ) return c.retryFunc(func() error { req, err := http.NewRequestWithContext(context.Background(), http.MethodGet, url, nil) @@ -350,7 +350,7 @@ func (c *CoordinatorClient) CreateDatabase( ) (admin.DatabaseCreateResponse, error) { url := c.makeURL("api/v1/database/create") logger := c.logger.With( - ZapMethod("createDatabase"), zap.String("url", url), + zapMethod("createDatabase"), zap.String("url", url), zap.String("request", addRequest.String())) resp, err := c.makeRequest(logger, url, http.MethodPost, &addRequest, nil) @@ -383,7 +383,7 @@ func (c *CoordinatorClient) AddNamespace( ) (admin.NamespaceGetResponse, error) { url := c.makeURL("api/v1/services/m3db/namespace") logger := c.logger.With( - ZapMethod("addNamespace"), zap.String("url", url), + zapMethod("addNamespace"), zap.String("url", url), zap.String("request", addRequest.String())) resp, err := c.makeRequest(logger, url, http.MethodPost, &addRequest, nil) @@ -411,7 +411,7 @@ func (c *CoordinatorClient) UpdateNamespace( ) (admin.NamespaceGetResponse, error) { url := c.makeURL("api/v1/services/m3db/namespace") logger := c.logger.With( - ZapMethod("updateNamespace"), zap.String("url", url), + zapMethod("updateNamespace"), zap.String("url", url), zap.String("request", req.String())) resp, err := c.makeRequest(logger, url, http.MethodPut, &req, nil) @@ -431,7 +431,7 @@ func (c *CoordinatorClient) UpdateNamespace( func (c *CoordinatorClient) setNamespaceReady(name string) error { url := c.makeURL("api/v1/services/m3db/namespace/ready") logger := c.logger.With( - ZapMethod("setNamespaceReady"), zap.String("url", url), + zapMethod("setNamespaceReady"), zap.String("url", url), zap.String("namespace", name)) _, err := c.makeRequest(logger, url, http.MethodPost, // nolint: bodyclose @@ -445,7 +445,7 @@ func (c *CoordinatorClient) setNamespaceReady(name string) error { // DeleteNamespace removes the namespace. func (c *CoordinatorClient) DeleteNamespace(namespaceID string) error { url := c.makeURL("api/v1/services/m3db/namespace/" + namespaceID) - logger := c.logger.With(ZapMethod("deleteNamespace"), zap.String("url", url)) + logger := c.logger.With(zapMethod("deleteNamespace"), zap.String("url", url)) if _, err := c.makeRequest(logger, url, http.MethodDelete, nil, nil); err != nil { // nolint: bodyclose logger.Error("failed to delete namespace", zap.Error(err)) @@ -462,7 +462,7 @@ func (c *CoordinatorClient) InitM3msgTopic( ) (admin.TopicGetResponse, error) { url := c.makeURL(topic.InitURL) logger := c.logger.With( - ZapMethod("initM3msgTopic"), + zapMethod("initM3msgTopic"), zap.String("url", url), zap.String("request", initRequest.String()), zap.String("topic", fmt.Sprintf("%v", topicOpts))) @@ -489,7 +489,7 @@ func (c *CoordinatorClient) GetM3msgTopic( ) (admin.TopicGetResponse, error) { url := c.makeURL(topic.GetURL) logger := c.logger.With( - ZapMethod("getM3msgTopic"), zap.String("url", url), + zapMethod("getM3msgTopic"), zap.String("url", url), zap.String("topic", fmt.Sprintf("%v", topicOpts))) resp, err := c.makeRequest(logger, url, topic.GetHTTPMethod, nil, m3msgTopicOptionsToMap(topicOpts)) @@ -516,7 +516,7 @@ func (c *CoordinatorClient) AddM3msgTopicConsumer( ) (admin.TopicGetResponse, error) { url := c.makeURL(topic.AddURL) logger := c.logger.With( - ZapMethod("addM3msgTopicConsumer"), + zapMethod("addM3msgTopicConsumer"), zap.String("url", url), zap.String("request", addRequest.String()), zap.String("topic", fmt.Sprintf("%v", topicOpts))) @@ -557,7 +557,7 @@ func (c *CoordinatorClient) WriteCarbon( url string, metric string, v float64, t time.Time, ) error { logger := c.logger.With( - ZapMethod("writeCarbon"), zap.String("url", url), + zapMethod("writeCarbon"), zap.String("url", url), zap.String("at time", time.Now().String()), zap.String("at ts", t.String())) @@ -623,7 +623,7 @@ func (c *CoordinatorClient) WritePromWithRequest(writeRequest prompb.WriteReques url := c.makeURL("api/v1/prom/remote/write") logger := c.logger.With( - ZapMethod("writeProm"), zap.String("url", url), + zapMethod("writeProm"), zap.String("url", url), zap.String("request", writeRequest.String())) body, err := proto.Marshal(&writeRequest) @@ -697,7 +697,7 @@ func (c *CoordinatorClient) ApplyKVUpdate(update string) error { url := c.makeURL("api/v1/kvstore") logger := c.logger.With( - ZapMethod("ApplyKVUpdate"), zap.String("url", url), + zapMethod("ApplyKVUpdate"), zap.String("url", url), zap.String("update", update)) data := bytes.NewBuffer([]byte(update)) @@ -731,7 +731,7 @@ func (c *CoordinatorClient) query( ) error { url := c.makeURL(query) logger := c.logger.With( - ZapMethod("query"), zap.String("url", url), zap.Any("headers", headers)) + zapMethod("query"), zap.String("url", url), zap.Any("headers", headers)) logger.Info("running") req, err := http.NewRequestWithContext(context.Background(), http.MethodGet, url, nil) if err != nil { @@ -962,7 +962,7 @@ func (c *CoordinatorClient) runQuery( ) (string, error) { url := c.makeURL(query) logger := c.logger.With( - ZapMethod("query"), zap.String("url", url), zap.Any("headers", headers)) + zapMethod("query"), zap.String("url", url), zap.Any("headers", headers)) logger.Info("running") req, err := http.NewRequestWithContext(context.Background(), http.MethodGet, url, nil) if err != nil { @@ -1000,7 +1000,7 @@ func (c *CoordinatorClient) runQuery( func (c *CoordinatorClient) RunQuery( verifier ResponseVerifier, query string, headers map[string][]string, ) error { - logger := c.logger.With(ZapMethod("runQuery"), + logger := c.logger.With(zapMethod("runQuery"), zap.String("query", query)) err := c.retryFunc(func() error { err := c.query(verifier, query, headers) @@ -1067,7 +1067,7 @@ func (c *CoordinatorClient) GraphiteQuery( url := c.makeURL(queryStr) logger := c.logger.With( - ZapMethod("graphiteQuery"), zap.String("url", url)) + zapMethod("graphiteQuery"), zap.String("url", url)) logger.Info("running") req, err := http.NewRequestWithContext(context.Background(), http.MethodGet, url, nil) if err != nil { diff --git a/src/integration/resources/docker/dockerexternal/etcd.go b/src/integration/resources/docker/dockerexternal/etcd.go index 207ec26091..2e28743bee 100644 --- a/src/integration/resources/docker/dockerexternal/etcd.go +++ b/src/integration/resources/docker/dockerexternal/etcd.go @@ -152,8 +152,6 @@ func (c *EtcdNode) Setup(ctx context.Context) (closeErr error) { // This is coming from the equivalent of docker inspect portBinds := container.NetworkSettings.Ports["2379/tcp"] - // If running in a docker container e.g. on buildkite, route to etcd using the published port on the *host* machine. - // See also http://github.com/m3db/m3/blob/master/docker-compose.yml#L16-L16 ipAddr := "127.0.0.1" _, err = net.ResolveIPAddr("ip4", "host.docker.internal") if err == nil { diff --git a/src/integration/resources/inprocess/aggregator.go b/src/integration/resources/inprocess/aggregator.go index 0192d7822f..fc76aef7c0 100644 --- a/src/integration/resources/inprocess/aggregator.go +++ b/src/integration/resources/inprocess/aggregator.go @@ -33,6 +33,7 @@ import ( m3agg "github.com/m3db/m3/src/aggregator/aggregator" "github.com/m3db/m3/src/aggregator/server" "github.com/m3db/m3/src/aggregator/tools/deploy" + etcdclient "github.com/m3db/m3/src/cluster/client/etcd" "github.com/m3db/m3/src/cmd/services/m3aggregator/config" "github.com/m3db/m3/src/integration/resources" nettest "github.com/m3db/m3/src/integration/resources/net" @@ -63,12 +64,16 @@ type Aggregator struct { // AggregatorOptions are options of starting an in-process aggregator. type AggregatorOptions struct { + // EtcdEndpoints are the endpoints this aggregator should use to connect to etcd. + EtcdEndpoints []string + // Logger is the logger to use for the in-process aggregator. Logger *zap.Logger // StartFn is a custom function that can be used to start the Aggregator. StartFn AggregatorStartFn // Start indicates whether to start the aggregator instance Start bool + // GeneratePorts will automatically update the config to use open ports // if set to true. If false, configuration is used as-is re: ports. GeneratePorts bool @@ -286,6 +291,10 @@ func updateAggregatorConfig( } } + kvCfg := cfg.KVClientOrDefault() + cfg.KVClient = &kvCfg + updateEtcdEndpoints(opts.EtcdEndpoints, cfg.KVClient.Etcd) + // Replace any filepath with a temporary directory cfg, tmpDirs, err = updateAggregatorFilepaths(cfg) if err != nil { @@ -295,6 +304,11 @@ func updateAggregatorConfig( return cfg, tmpDirs, nil } +func updateEtcdEndpoints(etcdEndpoints []string, etcdCfg *etcdclient.Configuration) { + etcdCfg.ETCDClusters[0].Endpoints = etcdEndpoints + etcdCfg.ETCDClusters[0].AutoSyncInterval = -1 +} + func updateAggregatorHostID(cfg config.Configuration) config.Configuration { hostID := uuid.New().String() aggCfg := cfg.AggregatorOrDefault() diff --git a/src/integration/resources/inprocess/cluster.go b/src/integration/resources/inprocess/cluster.go index 2b2dbc2303..259f4d3c36 100644 --- a/src/integration/resources/inprocess/cluster.go +++ b/src/integration/resources/inprocess/cluster.go @@ -21,11 +21,12 @@ package inprocess import ( + "context" "errors" "fmt" - "net" - "strconv" + "time" + etcdclient "github.com/m3db/m3/src/cluster/client/etcd" aggcfg "github.com/m3db/m3/src/cmd/services/m3aggregator/config" dbcfg "github.com/m3db/m3/src/cmd/services/m3dbnode/config" coordinatorcfg "github.com/m3db/m3/src/cmd/services/m3query/config" @@ -34,13 +35,15 @@ import ( "github.com/m3db/m3/src/dbnode/environment" "github.com/m3db/m3/src/dbnode/persist/fs" "github.com/m3db/m3/src/integration/resources" - nettest "github.com/m3db/m3/src/integration/resources/net" + "github.com/m3db/m3/src/integration/resources/docker/dockerexternal" "github.com/m3db/m3/src/query/storage/m3" xconfig "github.com/m3db/m3/src/x/config" "github.com/m3db/m3/src/x/config/hostid" xerrors "github.com/m3db/m3/src/x/errors" + "github.com/m3db/m3/src/x/instrument" "github.com/google/uuid" + "github.com/ory/dockertest/v3" "go.uber.org/zap" "gopkg.in/yaml.v2" ) @@ -122,17 +125,51 @@ func NewClusterConfigsFromConfigFile( // NewClusterConfigsFromYAML creates a new ClusterConfigs object from YAML strings // representing component configs. func NewClusterConfigsFromYAML(dbnodeYaml string, coordYaml string, aggYaml string) (ClusterConfigs, error) { - var dbCfg dbcfg.Configuration + // "db": + // discovery: + // "config": + // "service": + // "etcdClusters": + // - "endpoints": ["http://127.0.0.1:2379"] + // "zone": "embedded" + // "service": "m3db" + // "zone": "embedded" + // "env": "default_env" + etcdClientCfg := &etcdclient.Configuration{ + Zone: "embedded", + Env: "default_env", + Service: "m3db", + ETCDClusters: []etcdclient.ClusterConfig{{ + Zone: "embedded", + Endpoints: []string{"http://127.0.0.1:2379"}, + }}, + } + var dbCfg = dbcfg.Configuration{ + DB: &dbcfg.DBConfiguration{ + Discovery: &discovery.Configuration{ + Config: &environment.Configuration{ + Services: environment.DynamicConfiguration{{ + Service: etcdClientCfg, + }}, + }, + }, + }, + } if err := yaml.Unmarshal([]byte(dbnodeYaml), &dbCfg); err != nil { return ClusterConfigs{}, err } - var coordCfg coordinatorcfg.Configuration + var coordCfg = coordinatorcfg.Configuration{ + ClusterManagement: coordinatorcfg.ClusterManagementConfiguration{ + Etcd: etcdClientCfg, + }, + } if err := yaml.Unmarshal([]byte(coordYaml), &coordCfg); err != nil { return ClusterConfigs{}, err } - var aggCfg aggcfg.Configuration + var aggCfg = aggcfg.Configuration{} + if aggYaml != "" { if err := yaml.Unmarshal([]byte(aggYaml), &aggCfg); err != nil { return ClusterConfigs{}, err @@ -164,7 +201,7 @@ func NewCluster( func NewClusterFromSpecification( specs ClusterSpecification, opts resources.ClusterOptions, -) (resources.M3Resources, error) { +) (_ resources.M3Resources, finalErr error) { if err := opts.Validate(); err != nil { return nil, err } @@ -175,6 +212,7 @@ func NewClusterFromSpecification( } var ( + etcd *dockerexternal.EtcdNode coord resources.Coordinator nodes = make(resources.Nodes, 0, len(specs.Configs.DBNodes)) aggs = make(resources.Aggregators, 0, len(specs.Configs.Aggregators)) @@ -185,13 +223,38 @@ func NewClusterFromSpecification( // Ensure that once we start creating resources, they all get cleaned up even if the function // fails half way. defer func() { - if err != nil { - cleanup(logger, nodes, coord, aggs) + if finalErr != nil { + cleanup(logger, etcd, nodes, coord, aggs) } }() + etcdEndpoints := opts.EtcdEndpoints + if len(opts.EtcdEndpoints) == 0 { + // TODO: amainsd: maybe not the cleanest place to do this. + pool, err := dockertest.NewPool("") + if err != nil { + return nil, err + } + etcd, err = dockerexternal.NewEtcd(pool, instrument.NewOptions()) + if err != nil { + return nil, err + } + + // TODO(amains): etcd *needs* to be setup before the coordinator, because ConfigurePlacementsForAggregation spins + // up a dedicated coordinator for some reason. Either clean this up or just accept it. + if err := etcd.Setup(context.TODO()); err != nil { + return nil, err + } + etcdEndpoints = []string{fmt.Sprintf(etcd.Address())} + } + + updateEtcdEndpoints := func(etcdCfg *etcdclient.Configuration) { + etcdCfg.ETCDClusters[0].Endpoints = etcdEndpoints + etcdCfg.ETCDClusters[0].AutoSyncInterval = -1 + } for i := 0; i < len(specs.Configs.DBNodes); i++ { var node resources.Node + updateEtcdEndpoints(specs.Configs.DBNodes[i].DB.Discovery.Config.Services[0].Service) node, err = NewDBNode(specs.Configs.DBNodes[i], specs.Options.DBNode[i]) if err != nil { return nil, err @@ -204,6 +267,7 @@ func NewClusterFromSpecification( agg, err = NewAggregator(aggCfg, AggregatorOptions{ GeneratePorts: true, GenerateHostID: false, + EtcdEndpoints: etcdEndpoints, }) if err != nil { return nil, err @@ -211,6 +275,7 @@ func NewClusterFromSpecification( aggs = append(aggs, agg) } + updateEtcdEndpoints(specs.Configs.Coordinator.ClusterManagement.Etcd) coord, err = NewCoordinator( specs.Configs.Coordinator, CoordinatorOptions{GeneratePorts: opts.Coordinator.GeneratePorts}, @@ -220,7 +285,7 @@ func NewClusterFromSpecification( } if err = ConfigurePlacementsForAggregation(nodes, coord, aggs, specs, opts); err != nil { - return nil, err + return nil, fmt.Errorf("failed to setup placements for aggregation: %w", err) } // Start all the configured resources. @@ -228,6 +293,7 @@ func NewClusterFromSpecification( Coordinator: coord, DBNodes: nodes, Aggregators: aggs, + Etcd: etcd, }) m3.Start() @@ -371,13 +437,13 @@ func GenerateDBNodeConfigsForCluster( // the etcd server (i.e. seed node). hostID := uuid.NewString() defaultDBNodesCfg := configs.DBNode - discoveryCfg, envConfig, err := generateDefaultDiscoveryConfig( - defaultDBNodesCfg, - hostID, - generatePortsAndIDs) - if err != nil { - return nil, nil, environment.Configuration{}, err + + if configs.DBNode.DB.Discovery == nil { + return nil, nil, environment.Configuration{}, errors.New( + "configuration must specify at least `discovery`" + + " in order to construct an etcd client") } + discoveryCfg, envConfig := configs.DBNode.DB.Discovery, configs.DBNode.DB.Discovery.Config var ( defaultDBNodeOpts = DBNodeOptions{ @@ -390,7 +456,7 @@ func GenerateDBNodeConfigsForCluster( ) for i := 0; i < int(numNodes); i++ { var cfg dbcfg.Configuration - cfg, err = defaultDBNodesCfg.DeepCopy() + cfg, err := defaultDBNodesCfg.DeepCopy() if err != nil { return nil, nil, environment.Configuration{}, err } @@ -404,68 +470,31 @@ func GenerateDBNodeConfigsForCluster( Value: &hostID, } } - cfg.DB.Discovery = &discoveryCfg + cfg.DB.Discovery = discoveryCfg cfgs = append(cfgs, cfg) nodeOpts = append(nodeOpts, dbnodeOpts) } - return cfgs, nodeOpts, envConfig, nil + return cfgs, nodeOpts, *envConfig, nil } -// generateDefaultDiscoveryConfig handles creating the correct config -// for having an embedded ETCD server with the correct server and -// client configuration. -func generateDefaultDiscoveryConfig( - cfg dbcfg.Configuration, - hostID string, - generateETCDPorts bool, -) (discovery.Configuration, environment.Configuration, error) { - discoveryConfig := cfg.DB.DiscoveryOrDefault() - envConfig, err := discoveryConfig.EnvironmentConfig(hostID) - if err != nil { - return discovery.Configuration{}, environment.Configuration{}, err - } - - var ( - etcdClientPort = dbcfg.DefaultEtcdClientPort - etcdServerPort = dbcfg.DefaultEtcdServerPort - ) - if generateETCDPorts { - etcdClientPort, err = nettest.GetAvailablePort() - if err != nil { - return discovery.Configuration{}, environment.Configuration{}, err - } - - etcdServerPort, err = nettest.GetAvailablePort() - if err != nil { - return discovery.Configuration{}, environment.Configuration{}, err - } - } +func cleanup( + logger *zap.Logger, + etcd *dockerexternal.EtcdNode, + nodes resources.Nodes, + coord resources.Coordinator, + aggs resources.Aggregators, +) { + var multiErr xerrors.MultiError - etcdServerURL := fmt.Sprintf("http://0.0.0.0:%d", etcdServerPort) - etcdClientAddr := net.JoinHostPort("0.0.0.0", strconv.Itoa(etcdClientPort)) - etcdClientURL := fmt.Sprintf("http://0.0.0.0:%d", etcdClientPort) + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() - envConfig.SeedNodes.InitialCluster[0].Endpoint = etcdServerURL - envConfig.SeedNodes.InitialCluster[0].HostID = hostID - envConfig.Services[0].Service.ETCDClusters[0].Endpoints = []string{etcdClientAddr} - if generateETCDPorts { - envConfig.SeedNodes.ListenPeerUrls = []string{etcdServerURL} - envConfig.SeedNodes.ListenClientUrls = []string{etcdClientURL} - envConfig.SeedNodes.InitialAdvertisePeerUrls = []string{etcdServerURL} - envConfig.SeedNodes.AdvertiseClientUrls = []string{etcdClientURL} + if etcd != nil { + multiErr = multiErr.Add(etcd.Close(ctx)) } - configType := discovery.ConfigType - return discovery.Configuration{ - Type: &configType, - Config: &envConfig, - }, envConfig, nil -} - -func cleanup(logger *zap.Logger, nodes resources.Nodes, coord resources.Coordinator, aggs resources.Aggregators) { - var multiErr xerrors.MultiError for _, n := range nodes { multiErr = multiErr.Add(n.Close()) } diff --git a/src/integration/resources/inprocess/coordinator_test.go b/src/integration/resources/inprocess/coordinator_test.go index e818926439..9f9e776139 100644 --- a/src/integration/resources/inprocess/coordinator_test.go +++ b/src/integration/resources/inprocess/coordinator_test.go @@ -1,4 +1,6 @@ +//go:build test_harness // +build test_harness + // Copyright (c) 2021 Uber Technologies, Inc. // // Permission is hereby granted, free of charge, to any person obtaining a copy @@ -22,17 +24,21 @@ package inprocess import ( + "context" "testing" "github.com/m3db/m3/src/cluster/generated/proto/placementpb" "github.com/m3db/m3/src/cluster/placement" "github.com/m3db/m3/src/integration/resources" + "github.com/m3db/m3/src/integration/resources/docker/dockerexternal" "github.com/m3db/m3/src/msg/generated/proto/topicpb" "github.com/m3db/m3/src/msg/topic" "github.com/m3db/m3/src/query/generated/proto/admin" "github.com/m3db/m3/src/query/generated/proto/prompb" "github.com/m3db/m3/src/query/storage" + "github.com/m3db/m3/src/x/instrument" xtime "github.com/m3db/m3/src/x/time" + "github.com/ory/dockertest/v3" "github.com/prometheus/common/model" "github.com/stretchr/testify/assert" @@ -40,6 +46,16 @@ import ( ) func TestNewCoordinator(t *testing.T) { + pool, err := dockertest.NewPool("") + require.NoError(t, err) + + etcd, err := dockerexternal.NewEtcd(pool, instrument.NewOptions(), dockerexternal.EtcdClusterPort(2379)) + require.NoError(t, err) + require.NoError(t, etcd.Setup(context.TODO())) + t.Cleanup(func() { + require.NoError(t, etcd.Close(context.TODO())) + }) + dbnode, err := NewDBNodeFromYAML(defaultDBNodeConfig, DBNodeOptions{Start: true}) require.NoError(t, err) defer func() { diff --git a/src/integration/resources/inprocess/dbnode_test.go b/src/integration/resources/inprocess/dbnode_test.go index 4528a7b58a..f08fa81c00 100644 --- a/src/integration/resources/inprocess/dbnode_test.go +++ b/src/integration/resources/inprocess/dbnode_test.go @@ -1,4 +1,6 @@ +//go:build test_harness // +build test_harness + // Copyright (c) 2021 Uber Technologies, Inc. // // Permission is hereby granted, free of charge, to any person obtaining a copy diff --git a/src/integration/resources/inprocess/inprocess.go b/src/integration/resources/inprocess/inprocess.go index a7a5af95f2..b0e9711bf0 100644 --- a/src/integration/resources/inprocess/inprocess.go +++ b/src/integration/resources/inprocess/inprocess.go @@ -21,7 +21,10 @@ package inprocess import ( + "context" + "github.com/m3db/m3/src/integration/resources" + "github.com/m3db/m3/src/integration/resources/docker/dockerexternal" "github.com/m3db/m3/src/x/errors" ) @@ -29,11 +32,13 @@ type inprocessM3Resources struct { coordinator resources.Coordinator dbNodes resources.Nodes aggregators resources.Aggregators + etcd *dockerexternal.EtcdNode } // ResourceOptions are the options for creating new // resources.M3Resources. type ResourceOptions struct { + Etcd *dockerexternal.EtcdNode Coordinator resources.Coordinator DBNodes resources.Nodes Aggregators resources.Aggregators @@ -43,6 +48,7 @@ type ResourceOptions struct { // backed by in-process implementations of the M3 components. func NewM3Resources(options ResourceOptions) resources.M3Resources { return &inprocessM3Resources{ + etcd: options.Etcd, coordinator: options.Coordinator, dbNodes: options.DBNodes, aggregators: options.Aggregators, @@ -73,6 +79,11 @@ func (i *inprocessM3Resources) Cleanup() error { err = err.Add(a.Close()) } + if i.etcd != nil { + ctx := context.TODO() + err = err.Add(i.etcd.Close(ctx)) + } + return err.FinalError() } diff --git a/src/integration/resources/options.go b/src/integration/resources/options.go index d980c154f7..88eeddfe95 100644 --- a/src/integration/resources/options.go +++ b/src/integration/resources/options.go @@ -1,3 +1,23 @@ +// Copyright (c) 2022 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + package resources import ( @@ -8,6 +28,9 @@ import ( // ClusterOptions contains options for spinning up a new M3 cluster // composed of in-process components. type ClusterOptions struct { + // EtcdEndpoints if provided, will be used directly instead of spinning up a dedicated etcd node for the cluster. + // By default, NewClusterFromSpecification will spin up and manage an etcd node itself. + EtcdEndpoints []string // DBNode contains cluster options for spinning up dbnodes. DBNode *DBNodeClusterOptions // Aggregator is the optional cluster options for spinning up aggregators.