From c46a4c66fc1b23ab8b41c5aa4e3936d3f7a0eafc Mon Sep 17 00:00:00 2001 From: Andrew Mains Date: Fri, 24 Mar 2023 12:10:04 -0400 Subject: [PATCH] etcd_docker 1: Refactor resources/docker package into resources/dockerexternal, resources/dockerm3, x/dockertest (#4145) (#4153) PR 1 of etcd test refactor I am introducing a docker based etcd test framework. Overall structure I'm going for: resources/dockerm3 -- m3 docker containers resources/dockerexternal -- dependencies (prometheus, etcd) The reason I need this refactor: I'm going to pull the dockerexternal package into a bunch of internal tests. This introduces circular dependencies with the M3 resources, thus splitting out here. commit-id:f922a2aa --- src/integration/prometheus/prometheus.go | 12 +- src/integration/prometheus/prometheus_test.go | 13 +- .../config/prometheus.yml | 0 .../docker/{ => dockerexternal}/prometheus.go | 34 ++--- .../{ => dockerexternal}/prometheus_test.go | 12 +- .../resources/docker/dockerm3/common.go | 29 +++++ .../docker/{ => dockerm3}/coordinator.go | 119 +++++++++--------- .../resources/docker/{ => dockerm3}/dbnode.go | 102 ++++++++------- .../docker/{ => dockerm3}/harness.go | 17 +-- .../docker/{ => dockerm3}/options.go | 16 +-- src/integration/resources/types.go | 5 +- .../docker => x/dockertest}/common.go | 47 +++++-- .../dockertest}/docker_resource.go | 88 +++++++------ src/x/retry/retry.go | 21 ++++ src/x/retry/types.go | 4 + 15 files changed, 318 insertions(+), 201 deletions(-) rename src/integration/resources/docker/{ => dockerexternal}/config/prometheus.yml (100%) rename src/integration/resources/docker/{ => dockerexternal}/prometheus.go (85%) rename src/integration/resources/docker/{ => dockerexternal}/prometheus_test.go (87%) create mode 100644 src/integration/resources/docker/dockerm3/common.go rename src/integration/resources/docker/{ => dockerm3}/coordinator.go (78%) rename src/integration/resources/docker/{ => dockerm3}/dbnode.go (71%) rename src/integration/resources/docker/{ => dockerm3}/harness.go (90%) rename src/integration/resources/docker/{ => dockerm3}/options.go (89%) rename src/{integration/resources/docker => x/dockertest}/common.go (75%) rename src/{integration/resources/docker => x/dockertest}/docker_resource.go (77%) diff --git a/src/integration/prometheus/prometheus.go b/src/integration/prometheus/prometheus.go index f2f6c8c8de..f8efe6e65f 100644 --- a/src/integration/prometheus/prometheus.go +++ b/src/integration/prometheus/prometheus.go @@ -37,7 +37,7 @@ import ( "github.com/m3db/m3/src/dbnode/generated/thrift/rpc" "github.com/m3db/m3/src/dbnode/kvconfig" "github.com/m3db/m3/src/integration/resources" - "github.com/m3db/m3/src/integration/resources/docker" + "github.com/m3db/m3/src/integration/resources/docker/dockerexternal" "github.com/m3db/m3/src/query/api/v1/handler/database" "github.com/m3db/m3/src/query/api/v1/options" "github.com/m3db/m3/src/query/generated/proto/prompb" @@ -89,7 +89,7 @@ func RunTest(t *testing.T, m3 resources.M3Resources, prom resources.ExternalReso logger.Info("running prometheus tests") - p := prom.(*docker.Prometheus) + p := prom.(*dockerexternal.Prometheus) testPrometheusRemoteRead(t, p, logger) testPrometheusRemoteWriteMultiNamespaces(t, p, logger) @@ -118,7 +118,7 @@ func RunTest(t *testing.T, m3 resources.M3Resources, prom resources.ExternalReso testDebugPromReturnsDuplicates(t, m3, logger) } -func testPrometheusRemoteRead(t *testing.T, p *docker.Prometheus, logger *zap.Logger) { +func testPrometheusRemoteRead(t *testing.T, p *dockerexternal.Prometheus, logger *zap.Logger) { // Ensure Prometheus can proxy a Prometheus query logger.Info("testing prometheus remote read") verifyPrometheusQuery(t, p, "prometheus_remote_storage_samples_total", 100) @@ -126,7 +126,7 @@ func testPrometheusRemoteRead(t *testing.T, p *docker.Prometheus, logger *zap.Lo func testPrometheusRemoteWriteMultiNamespaces( t *testing.T, - p *docker.Prometheus, + p *dockerexternal.Prometheus, logger *zap.Logger, ) { logger.Info("testing remote write to multiple namespaces") @@ -1839,9 +1839,9 @@ func requireSeriesSuccess( })) } -func verifyPrometheusQuery(t *testing.T, p *docker.Prometheus, query string, threshold float64) { +func verifyPrometheusQuery(t *testing.T, p *dockerexternal.Prometheus, query string, threshold float64) { require.NoError(t, resources.Retry(func() error { - res, err := p.Query(docker.PrometheusQueryRequest{ + res, err := p.Query(dockerexternal.PrometheusQueryRequest{ Query: query, }) if err != nil { diff --git a/src/integration/prometheus/prometheus_test.go b/src/integration/prometheus/prometheus_test.go index 4bc87054af..778102c9c6 100644 --- a/src/integration/prometheus/prometheus_test.go +++ b/src/integration/prometheus/prometheus_test.go @@ -1,4 +1,6 @@ +//go:build cluster_integration // +build cluster_integration + // // Copyright (c) 2021 Uber Technologies, Inc. // @@ -23,12 +25,13 @@ package prometheus import ( + "context" "path" "runtime" "testing" "github.com/m3db/m3/src/integration/resources" - "github.com/m3db/m3/src/integration/resources/docker" + "github.com/m3db/m3/src/integration/resources/docker/dockerexternal" "github.com/m3db/m3/src/integration/resources/inprocess" "github.com/ory/dockertest/v3" @@ -60,14 +63,14 @@ func testSetup(t *testing.T) (resources.M3Resources, resources.ExternalResources require.NoError(t, err) _, filename, _, _ := runtime.Caller(0) - prom := docker.NewPrometheus(docker.PrometheusOptions{ + prom := dockerexternal.NewPrometheus(dockerexternal.PrometheusOptions{ Pool: pool, - PathToCfg: path.Join(path.Dir(filename), "../resources/docker/config/prometheus.yml"), + PathToCfg: path.Join(path.Dir(filename), "../resources/docker/dockerexternal/config/prometheus.yml"), }) - require.NoError(t, prom.Setup()) + require.NoError(t, prom.Setup(context.TODO())) return m3, prom, func() { - assert.NoError(t, prom.Close()) + assert.NoError(t, prom.Close(context.TODO())) assert.NoError(t, m3.Cleanup()) } } diff --git a/src/integration/resources/docker/config/prometheus.yml b/src/integration/resources/docker/dockerexternal/config/prometheus.yml similarity index 100% rename from src/integration/resources/docker/config/prometheus.yml rename to src/integration/resources/docker/dockerexternal/config/prometheus.yml diff --git a/src/integration/resources/docker/prometheus.go b/src/integration/resources/docker/dockerexternal/prometheus.go similarity index 85% rename from src/integration/resources/docker/prometheus.go rename to src/integration/resources/docker/dockerexternal/prometheus.go index ff6e6ced46..7adc580d21 100644 --- a/src/integration/resources/docker/prometheus.go +++ b/src/integration/resources/docker/dockerexternal/prometheus.go @@ -1,4 +1,4 @@ -// Copyright (c) 2021 Uber Technologies, Inc. +// Copyright (c) 2021 Uber Technologies, Inc. // // Permission is hereby granted, free of charge, to any person obtaining a copy // of this software and associated documentation files (the "Software"), to deal @@ -18,7 +18,7 @@ // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN // THE SOFTWARE. -package docker +package dockerexternal import ( "context" @@ -29,9 +29,10 @@ import ( "net/http" "time" - "github.com/m3db/m3/src/integration/resources" + xdockertest "github.com/m3db/m3/src/x/dockertest" "github.com/m3db/m3/src/x/instrument" + "github.com/cenkalti/backoff/v3" "github.com/ory/dockertest/v3" "github.com/prometheus/common/model" ) @@ -42,7 +43,7 @@ type Prometheus struct { pathToCfg string iOpts instrument.Options - resource *Resource + resource *xdockertest.Resource } // PrometheusOptions contains the options for @@ -60,7 +61,7 @@ type PrometheusOptions struct { // NewPrometheus creates a new docker-backed Prometheus // that implements the resources.ExternalResources interface. -func NewPrometheus(opts PrometheusOptions) resources.ExternalResources { +func NewPrometheus(opts PrometheusOptions) *Prometheus { if opts.InstrumentOptions == nil { opts.InstrumentOptions = instrument.NewOptions() } @@ -72,19 +73,19 @@ func NewPrometheus(opts PrometheusOptions) resources.ExternalResources { } // Setup is a method that setups up the prometheus instance. -func (p *Prometheus) Setup() error { +func (p *Prometheus) Setup(ctx context.Context) error { if p.resource != nil { return errors.New("prometheus already setup. must close resource " + "before attempting to setup again") } - if err := SetupNetwork(p.pool); err != nil { + if err := xdockertest.SetupNetwork(p.pool, true); err != nil { return err } - res, err := NewDockerResource(p.pool, ResourceOptions{ + res, err := xdockertest.NewDockerResource(p.pool, xdockertest.ResourceOptions{ ContainerName: "prometheus", - Image: Image{ + Image: xdockertest.Image{ Name: "prom/prometheus", Tag: "latest", }, @@ -100,11 +101,12 @@ func (p *Prometheus) Setup() error { p.resource = res - return p.waitForHealthy() + return p.waitForHealthy(ctx) } -func (p *Prometheus) waitForHealthy() error { - return resources.Retry(func() error { +func (p *Prometheus) waitForHealthy(ctx context.Context) error { + retrier := backoff.WithContext(backoff.NewExponentialBackOff(), ctx) + return backoff.Retry(func() error { req, err := http.NewRequestWithContext( context.Background(), http.MethodGet, @@ -126,7 +128,7 @@ func (p *Prometheus) waitForHealthy() error { } return errors.New("prometheus not ready") - }) + }, retrier) } // PrometheusQueryRequest contains the parameters for making a query request. @@ -152,7 +154,7 @@ func (p *PrometheusQueryRequest) String() string { // Query executes a query request against the prometheus resource. func (p *Prometheus) Query(req PrometheusQueryRequest) (model.Vector, error) { if p.resource.Closed() { - return nil, errClosed + return nil, xdockertest.ErrClosed } r, err := http.NewRequestWithContext( @@ -201,9 +203,9 @@ type vectorResult struct { } // Close cleans up the prometheus instance. -func (p *Prometheus) Close() error { +func (p *Prometheus) Close(context.Context) error { if p.resource.Closed() { - return errClosed + return xdockertest.ErrClosed } if err := p.resource.Close(); err != nil { diff --git a/src/integration/resources/docker/prometheus_test.go b/src/integration/resources/docker/dockerexternal/prometheus_test.go similarity index 87% rename from src/integration/resources/docker/prometheus_test.go rename to src/integration/resources/docker/dockerexternal/prometheus_test.go index 199095904a..3571e8dccf 100644 --- a/src/integration/resources/docker/prometheus_test.go +++ b/src/integration/resources/docker/dockerexternal/prometheus_test.go @@ -1,5 +1,4 @@ -// +build integration_v2 -// Copyright (c) 2021 Uber Technologies, Inc. +// Copyright (c) 2021 Uber Technologies, Inc. // // Permission is hereby granted, free of charge, to any person obtaining a copy // of this software and associated documentation files (the "Software"), to deal @@ -19,9 +18,12 @@ // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN // THE SOFTWARE. -package docker +//go:build integration_v2 + +package dockerexternal import ( + "context" "path" "runtime" "testing" @@ -39,6 +41,6 @@ func TestNewPrometheus(t *testing.T) { Pool: pool, PathToCfg: path.Join(path.Dir(filename), "config/prometheus.yml"), }) - require.NoError(t, prom.Setup()) - require.NoError(t, prom.Close()) + require.NoError(t, prom.Setup(context.TODO())) + require.NoError(t, prom.Close(context.TODO())) } diff --git a/src/integration/resources/docker/dockerm3/common.go b/src/integration/resources/docker/dockerm3/common.go new file mode 100644 index 0000000000..0c1513824f --- /dev/null +++ b/src/integration/resources/docker/dockerm3/common.go @@ -0,0 +1,29 @@ +// Copyright (c) 2022 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package dockerm3 + +import ( + "go.uber.org/zap" + "go.uber.org/zap/zapcore" +) + +// zapMethod appends the method as a log field. +func zapMethod(s string) zapcore.Field { return zap.String("method", s) } diff --git a/src/integration/resources/docker/coordinator.go b/src/integration/resources/docker/dockerm3/coordinator.go similarity index 78% rename from src/integration/resources/docker/coordinator.go rename to src/integration/resources/docker/dockerm3/coordinator.go index 7a82408b39..d607bccd96 100644 --- a/src/integration/resources/docker/coordinator.go +++ b/src/integration/resources/docker/dockerm3/coordinator.go @@ -18,7 +18,7 @@ // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN // THE SOFTWARE. -package docker +package dockerm3 import ( "errors" @@ -30,6 +30,7 @@ import ( "github.com/m3db/m3/src/query/api/v1/options" "github.com/m3db/m3/src/query/generated/proto/admin" "github.com/m3db/m3/src/query/generated/proto/prompb" + xdockertest "github.com/m3db/m3/src/x/dockertest" "github.com/ory/dockertest/v3" "github.com/prometheus/common/model" @@ -43,7 +44,7 @@ const ( var ( defaultCoordinatorList = []int{7201, 7203, 7204} - defaultCoordinatorOptions = ResourceOptions{ + defaultCoordinatorOptions = xdockertest.ResourceOptions{ Source: defaultCoordinatorSource, ContainerName: defaultCoordinatorName, PortList: defaultCoordinatorList, @@ -51,18 +52,18 @@ var ( ) type coordinator struct { - resource *Resource + resource *xdockertest.Resource client resources.CoordinatorClient } func newDockerHTTPCoordinator( pool *dockertest.Pool, - opts ResourceOptions, + opts xdockertest.ResourceOptions, ) (resources.Coordinator, error) { - opts = opts.withDefaults(defaultCoordinatorOptions) + opts = opts.WithDefaults(defaultCoordinatorOptions) opts.TmpfsMounts = []string{"/etc/m3coordinator/"} - resource, err := NewDockerResource(pool, opts) + resource, err := xdockertest.NewDockerResource(pool, opts) if err != nil { return nil, err } @@ -72,8 +73,8 @@ func newDockerHTTPCoordinator( client: resources.NewCoordinatorClient(resources.CoordinatorClientOptions{ Client: http.DefaultClient, HTTPPort: 7201, - Logger: resource.logger, - RetryFunc: resource.pool.Retry, + Logger: opts.InstrumentOpts.Logger(), + RetryFunc: pool.Retry, }), }, nil } @@ -88,8 +89,8 @@ func (c *coordinator) HostDetails() (*resources.InstanceInfo, error) { } func (c *coordinator) GetNamespace() (admin.NamespaceGetResponse, error) { - if c.resource.closed { - return admin.NamespaceGetResponse{}, errClosed + if c.resource.Closed() { + return admin.NamespaceGetResponse{}, xdockertest.ErrClosed } return c.client.GetNamespace() @@ -98,8 +99,8 @@ func (c *coordinator) GetNamespace() (admin.NamespaceGetResponse, error) { func (c *coordinator) GetPlacement( opts resources.PlacementRequestOptions, ) (admin.PlacementGetResponse, error) { - if c.resource.closed { - return admin.PlacementGetResponse{}, errClosed + if c.resource.Closed() { + return admin.PlacementGetResponse{}, xdockertest.ErrClosed } return c.client.GetPlacement(opts) @@ -109,8 +110,8 @@ func (c *coordinator) InitPlacement( opts resources.PlacementRequestOptions, req admin.PlacementInitRequest, ) (admin.PlacementGetResponse, error) { - if c.resource.closed { - return admin.PlacementGetResponse{}, errClosed + if c.resource.Closed() { + return admin.PlacementGetResponse{}, xdockertest.ErrClosed } return c.client.InitPlacement(opts, req) @@ -119,16 +120,16 @@ func (c *coordinator) InitPlacement( func (c *coordinator) DeleteAllPlacements( opts resources.PlacementRequestOptions, ) error { - if c.resource.closed { - return errClosed + if c.resource.Closed() { + return xdockertest.ErrClosed } return c.client.DeleteAllPlacements(opts) } func (c *coordinator) WaitForNamespace(name string) error { - if c.resource.closed { - return errClosed + if c.resource.Closed() { + return xdockertest.ErrClosed } return c.client.WaitForNamespace(name) @@ -137,24 +138,24 @@ func (c *coordinator) WaitForNamespace(name string) error { func (c *coordinator) WaitForInstances( ids []string, ) error { - if c.resource.closed { - return errClosed + if c.resource.Closed() { + return xdockertest.ErrClosed } return c.client.WaitForInstances(ids) } func (c *coordinator) WaitForShardsReady() error { - if c.resource.closed { - return errClosed + if c.resource.Closed() { + return xdockertest.ErrClosed } return c.client.WaitForShardsReady() } func (c *coordinator) WaitForClusterReady() error { - if c.resource.closed { - return errClosed + if c.resource.Closed() { + return xdockertest.ErrClosed } return c.client.WaitForClusterReady() @@ -163,8 +164,8 @@ func (c *coordinator) WaitForClusterReady() error { func (c *coordinator) CreateDatabase( addRequest admin.DatabaseCreateRequest, ) (admin.DatabaseCreateResponse, error) { - if c.resource.closed { - return admin.DatabaseCreateResponse{}, errClosed + if c.resource.Closed() { + return admin.DatabaseCreateResponse{}, xdockertest.ErrClosed } return c.client.CreateDatabase(addRequest) @@ -173,8 +174,8 @@ func (c *coordinator) CreateDatabase( func (c *coordinator) AddNamespace( addRequest admin.NamespaceAddRequest, ) (admin.NamespaceGetResponse, error) { - if c.resource.closed { - return admin.NamespaceGetResponse{}, errClosed + if c.resource.Closed() { + return admin.NamespaceGetResponse{}, xdockertest.ErrClosed } return c.client.AddNamespace(addRequest) @@ -183,16 +184,16 @@ func (c *coordinator) AddNamespace( func (c *coordinator) UpdateNamespace( req admin.NamespaceUpdateRequest, ) (admin.NamespaceGetResponse, error) { - if c.resource.closed { - return admin.NamespaceGetResponse{}, errClosed + if c.resource.Closed() { + return admin.NamespaceGetResponse{}, xdockertest.ErrClosed } return c.client.UpdateNamespace(req) } func (c *coordinator) DeleteNamespace(namespaceID string) error { - if c.resource.closed { - return errClosed + if c.resource.Closed() { + return xdockertest.ErrClosed } return c.client.DeleteNamespace(namespaceID) @@ -201,11 +202,11 @@ func (c *coordinator) DeleteNamespace(namespaceID string) error { func (c *coordinator) WriteCarbon( port int, metric string, v float64, t time.Time, ) error { - if c.resource.closed { - return errClosed + if c.resource.Closed() { + return xdockertest.ErrClosed } - url := c.resource.resource.GetHostPort(fmt.Sprintf("%d/tcp", port)) + url := c.resource.Resource().GetHostPort(fmt.Sprintf("%d/tcp", port)) return c.client.WriteCarbon(url, metric, v, t) } @@ -216,8 +217,8 @@ func (c *coordinator) WriteProm( samples []prompb.Sample, headers resources.Headers, ) error { - if c.resource.closed { - return errClosed + if c.resource.Closed() { + return xdockertest.ErrClosed } return c.client.WriteProm(name, tags, samples, headers) @@ -227,16 +228,16 @@ func (c *coordinator) WritePromWithRequest( writeRequest prompb.WriteRequest, headers resources.Headers, ) error { - if c.resource.closed { - return errClosed + if c.resource.Closed() { + return xdockertest.ErrClosed } return c.client.WritePromWithRequest(writeRequest, headers) } func (c *coordinator) ApplyKVUpdate(update string) error { - if c.resource.closed { - return errClosed + if c.resource.Closed() { + return xdockertest.ErrClosed } return c.client.ApplyKVUpdate(update) @@ -247,8 +248,8 @@ func (c *coordinator) RunQuery( query string, headers resources.Headers, ) error { - if c.resource.closed { - return errClosed + if c.resource.Closed() { + return xdockertest.ErrClosed } return c.client.RunQuery(verifier, query, headers) @@ -258,8 +259,8 @@ func (c *coordinator) InstantQuery( req resources.QueryRequest, headers resources.Headers, ) (model.Vector, error) { - if c.resource.closed { - return nil, errClosed + if c.resource.Closed() { + return nil, xdockertest.ErrClosed } return c.client.InstantQuery(req, headers) } @@ -271,8 +272,8 @@ func (c *coordinator) InstantQueryWithEngine( engine options.QueryEngine, headers resources.Headers, ) (model.Vector, error) { - if c.resource.closed { - return nil, errClosed + if c.resource.Closed() { + return nil, xdockertest.ErrClosed } return c.client.InstantQueryWithEngine(req, engine, headers) } @@ -282,8 +283,8 @@ func (c *coordinator) RangeQuery( req resources.RangeQueryRequest, headers resources.Headers, ) (model.Matrix, error) { - if c.resource.closed { - return nil, errClosed + if c.resource.Closed() { + return nil, xdockertest.ErrClosed } return c.client.RangeQuery(req, headers) } @@ -300,8 +301,8 @@ func (c *coordinator) RangeQueryWithEngine( engine options.QueryEngine, headers resources.Headers, ) (model.Matrix, error) { - if c.resource.closed { - return nil, errClosed + if c.resource.Closed() { + return nil, xdockertest.ErrClosed } return c.client.RangeQueryWithEngine(req, engine, headers) } @@ -311,8 +312,8 @@ func (c *coordinator) LabelNames( req resources.LabelNamesRequest, headers resources.Headers, ) (model.LabelNames, error) { - if c.resource.closed { - return nil, errClosed + if c.resource.Closed() { + return nil, xdockertest.ErrClosed } return c.client.LabelNames(req, headers) } @@ -322,8 +323,8 @@ func (c *coordinator) LabelValues( req resources.LabelValuesRequest, headers resources.Headers, ) (model.LabelValues, error) { - if c.resource.closed { - return nil, errClosed + if c.resource.Closed() { + return nil, xdockertest.ErrClosed } return c.client.LabelValues(req, headers) } @@ -333,15 +334,15 @@ func (c *coordinator) Series( req resources.SeriesRequest, headers resources.Headers, ) ([]model.Metric, error) { - if c.resource.closed { - return nil, errClosed + if c.resource.Closed() { + return nil, xdockertest.ErrClosed } return c.client.Series(req, headers) } func (c *coordinator) Close() error { - if c.resource.closed { - return errClosed + if c.resource.Closed() { + return xdockertest.ErrClosed } return c.resource.Close() diff --git a/src/integration/resources/docker/dbnode.go b/src/integration/resources/docker/dockerm3/dbnode.go similarity index 71% rename from src/integration/resources/docker/dbnode.go rename to src/integration/resources/docker/dockerm3/dbnode.go index 758ac005ea..0053b0729b 100644 --- a/src/integration/resources/docker/dbnode.go +++ b/src/integration/resources/docker/dockerm3/dbnode.go @@ -18,7 +18,7 @@ // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN // THE SOFTWARE. -package docker +package dockerm3 import ( "fmt" @@ -28,6 +28,7 @@ import ( "github.com/m3db/m3/src/dbnode/integration" "github.com/m3db/m3/src/integration/resources" "github.com/m3db/m3/src/query/generated/proto/admin" + xdockertest "github.com/m3db/m3/src/x/dockertest" "github.com/ory/dockertest/v3" "github.com/ory/dockertest/v3/docker" @@ -42,7 +43,7 @@ const ( var ( defaultDBNodePortList = []int{2379, 2380, 9000, 9001, 9002, 9003, 9004} - defaultDBNodeOptions = ResourceOptions{ + defaultDBNodeOptions = xdockertest.ResourceOptions{ Source: defaultDBNodeSource, ContainerName: defaultDBNodeContainerName, PortList: defaultDBNodePortList, @@ -51,15 +52,17 @@ var ( type dbNode struct { tchanClient *integration.TestTChannelClient - resource *Resource + resource *xdockertest.Resource + pool *dockertest.Pool + logger *zap.Logger } func newDockerHTTPNode( pool *dockertest.Pool, - opts ResourceOptions, + opts xdockertest.ResourceOptions, ) (resources.Node, error) { - opts = opts.withDefaults(defaultDBNodeOptions) - resource, err := NewDockerResource(pool, opts) + opts = opts.WithDefaults(defaultDBNodeOptions) + resource, err := xdockertest.NewDockerResource(pool, opts) if err != nil { return nil, err } @@ -71,17 +74,20 @@ func newDockerHTTPNode( } }() - addr := resource.resource.GetHostPort("9000/tcp") + logger := opts.InstrumentOpts.Logger() + addr := resource.Resource().GetHostPort("9000/tcp") tchanClient, err := integration.NewTChannelClient("client", addr) if err != nil { return nil, err } - resource.logger.Info("set up tchanClient", zap.String("node_addr", addr)) + logger.Info("set up tchanClient", zap.String("node_addr", addr)) completed = true return &dbNode{ tchanClient: tchanClient, + pool: pool, resource: resource, + logger: logger, }, nil } @@ -91,14 +97,14 @@ func (c *dbNode) Start() { func (c *dbNode) HostDetails(p int) (*admin.Host, error) { var network docker.ContainerNetwork - for _, n := range c.resource.resource.Container.NetworkSettings.Networks { // nolint: gocritic + for _, n := range c.resource.Resource().Container.NetworkSettings.Networks { // nolint: gocritic network = n } - host := strings.TrimLeft(c.resource.resource.Container.Name, "/") + host := strings.TrimLeft(c.resource.Resource().Container.Name, "/") return &admin.Host{ Id: host, - IsolationGroup: "rack-a-" + c.resource.resource.Container.Name, + IsolationGroup: "rack-a-" + c.resource.Resource().Container.Name, Zone: "embedded", Weight: 1024, Address: network.IPAddress, @@ -107,11 +113,11 @@ func (c *dbNode) HostDetails(p int) (*admin.Host, error) { } func (c *dbNode) Health() (*rpc.NodeHealthResult_, error) { - if c.resource.closed { - return nil, errClosed + if c.resource.Closed() { + return nil, xdockertest.ErrClosed } - logger := c.resource.logger.With(resources.ZapMethod("health")) + logger := c.logger.With(zapMethod("health")) res, err := c.tchanClient.TChannelClientHealth(timeout) if err != nil { logger.Error("failed get", zap.Error(err), zap.Any("res", res)) @@ -121,12 +127,12 @@ func (c *dbNode) Health() (*rpc.NodeHealthResult_, error) { } func (c *dbNode) WaitForBootstrap() error { - if c.resource.closed { - return errClosed + if c.resource.Closed() { + return xdockertest.ErrClosed } - logger := c.resource.logger.With(resources.ZapMethod("waitForBootstrap")) - return c.resource.pool.Retry(func() error { + logger := c.logger.With(zapMethod("waitForBootstrap")) + return c.pool.Retry(func() error { health, err := c.Health() if err != nil { return err @@ -143,11 +149,11 @@ func (c *dbNode) WaitForBootstrap() error { } func (c *dbNode) WritePoint(req *rpc.WriteRequest) error { - if c.resource.closed { - return errClosed + if c.resource.Closed() { + return xdockertest.ErrClosed } - logger := c.resource.logger.With(resources.ZapMethod("write")) + logger := c.logger.With(zapMethod("write")) err := c.tchanClient.TChannelClientWrite(timeout, req) if err != nil { logger.Error("could not write", zap.Error(err)) @@ -159,11 +165,11 @@ func (c *dbNode) WritePoint(req *rpc.WriteRequest) error { } func (c *dbNode) WriteTaggedPoint(req *rpc.WriteTaggedRequest) error { - if c.resource.closed { - return errClosed + if c.resource.Closed() { + return xdockertest.ErrClosed } - logger := c.resource.logger.With(resources.ZapMethod("write-tagged")) + logger := c.logger.With(zapMethod("write-tagged")) err := c.tchanClient.TChannelClientWriteTagged(timeout, req) if err != nil { logger.Error("could not write-tagged", zap.Error(err)) @@ -176,11 +182,11 @@ func (c *dbNode) WriteTaggedPoint(req *rpc.WriteTaggedRequest) error { // WriteTaggedBatchRaw writes a batch of writes to the node directly. func (c *dbNode) WriteTaggedBatchRaw(req *rpc.WriteTaggedBatchRawRequest) error { - if c.resource.closed { - return errClosed + if c.resource.Closed() { + return xdockertest.ErrClosed } - logger := c.resource.logger.With(resources.ZapMethod("write-tagged-batch-raw")) + logger := c.logger.With(zapMethod("write-tagged-batch-raw")) err := c.tchanClient.TChannelClientWriteTaggedBatchRaw(timeout, req) if err != nil { logger.Error("writeTaggedBatchRaw call failed", zap.Error(err)) @@ -192,11 +198,11 @@ func (c *dbNode) WriteTaggedBatchRaw(req *rpc.WriteTaggedBatchRawRequest) error } func (c *dbNode) AggregateTiles(req *rpc.AggregateTilesRequest) (int64, error) { - if c.resource.closed { - return 0, errClosed + if c.resource.Closed() { + return 0, xdockertest.ErrClosed } - logger := c.resource.logger.With(resources.ZapMethod("aggregate-tiles")) + logger := c.logger.With(zapMethod("aggregate-tiles")) rsp, err := c.tchanClient.TChannelClientAggregateTiles(timeout, req) if err != nil { logger.Error("could not aggregate tiles", zap.Error(err)) @@ -208,11 +214,11 @@ func (c *dbNode) AggregateTiles(req *rpc.AggregateTilesRequest) (int64, error) { } func (c *dbNode) Fetch(req *rpc.FetchRequest) (*rpc.FetchResult_, error) { - if c.resource.closed { - return nil, errClosed + if c.resource.Closed() { + return nil, xdockertest.ErrClosed } - logger := c.resource.logger.With(resources.ZapMethod("fetch")) + logger := c.logger.With(zapMethod("fetch")) dps, err := c.tchanClient.TChannelClientFetch(timeout, req) if err != nil { logger.Error("could not fetch", zap.Error(err)) @@ -224,11 +230,11 @@ func (c *dbNode) Fetch(req *rpc.FetchRequest) (*rpc.FetchResult_, error) { } func (c *dbNode) FetchTagged(req *rpc.FetchTaggedRequest) (*rpc.FetchTaggedResult_, error) { - if c.resource.closed { - return nil, errClosed + if c.resource.Closed() { + return nil, xdockertest.ErrClosed } - logger := c.resource.logger.With(resources.ZapMethod("fetchtagged")) + logger := c.logger.With(zapMethod("fetchtagged")) result, err := c.tchanClient.TChannelClientFetchTagged(timeout, req) if err != nil { logger.Error("could not fetch", zap.Error(err)) @@ -240,14 +246,14 @@ func (c *dbNode) FetchTagged(req *rpc.FetchTaggedRequest) (*rpc.FetchTaggedResul } func (c *dbNode) Restart() error { - if c.resource.closed { - return errClosed + if c.resource.Closed() { + return xdockertest.ErrClosed } - cName := c.resource.resource.Container.Name - logger := c.resource.logger.With(resources.ZapMethod("restart")) + cName := c.resource.Resource().Container.Name + logger := c.logger.With(zapMethod("restart")) logger.Info("restarting container", zap.String("container", cName)) - err := c.resource.pool.Client.RestartContainer(cName, 60) + err := c.pool.Client.RestartContainer(cName, 60) if err != nil { logger.Error("could not restart", zap.Error(err)) return err @@ -257,8 +263,8 @@ func (c *dbNode) Restart() error { } func (c *dbNode) Exec(commands ...string) (string, error) { - if c.resource.closed { - return "", errClosed + if c.resource.Closed() { + return "", xdockertest.ErrClosed } return c.resource.Exec(commands...) @@ -268,16 +274,16 @@ func (c *dbNode) GoalStateExec( verifier resources.GoalStateVerifier, commands ...string, ) error { - if c.resource.closed { - return errClosed + if c.resource.Closed() { + return xdockertest.ErrClosed } - return c.resource.GoalStateExec(verifier, commands...) + return c.resource.GoalStateExec(xdockertest.GoalStateVerifier(verifier), commands...) } func (c *dbNode) Close() error { - if c.resource.closed { - return errClosed + if c.resource.Closed() { + return xdockertest.ErrClosed } return c.resource.Close() diff --git a/src/integration/resources/docker/harness.go b/src/integration/resources/docker/dockerm3/harness.go similarity index 90% rename from src/integration/resources/docker/harness.go rename to src/integration/resources/docker/dockerm3/harness.go index 201946d699..8eb4fe3f7c 100644 --- a/src/integration/resources/docker/harness.go +++ b/src/integration/resources/docker/dockerm3/harness.go @@ -18,13 +18,14 @@ // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN // THE SOFTWARE. -// Package docker contains resources needed to setup docker containers for M3 tests. -package docker +// Package dockerm3 contains resources needed to setup docker containers for M3 tests. +package dockerm3 import ( "time" "github.com/m3db/m3/src/integration/resources" + xdockertest "github.com/m3db/m3/src/x/dockertest" xerrors "github.com/m3db/m3/src/x/errors" "github.com/m3db/m3/src/x/instrument" @@ -57,17 +58,17 @@ func SetupSingleM3DBNode(opts ...SetupOptions) (resources.M3Resources, error) { pool.MaxWait = timeout if !options.existingCluster { - if err := SetupNetwork(pool); err != nil { + if err := xdockertest.SetupNetwork(pool, true); err != nil { return nil, err } - if err := setupVolume(pool); err != nil { + if err := xdockertest.SetupVolume(pool); err != nil { return nil, err } } iOpts := instrument.NewOptions() - dbNode, err := newDockerHTTPNode(pool, ResourceOptions{ + dbNode, err := newDockerHTTPNode(pool, xdockertest.ResourceOptions{ Image: options.dbNodeImage, ContainerName: options.dbNodeContainerName, InstrumentOpts: iOpts, @@ -91,7 +92,7 @@ func SetupSingleM3DBNode(opts ...SetupOptions) (resources.M3Resources, error) { return nil, err } - coordinator, err := newDockerHTTPCoordinator(pool, ResourceOptions{ + coordinator, err := newDockerHTTPCoordinator(pool, xdockertest.ResourceOptions{ Image: options.coordinatorImage, ContainerName: options.coordinatorContainerName, InstrumentOpts: iOpts, @@ -137,7 +138,7 @@ func AttachToExistingContainers( iOpts := instrument.NewOptions() dbNodes := resources.Nodes{} for _, containerName := range dbNodesContainersNames { - dbNode, err := newDockerHTTPNode(pool, ResourceOptions{ + dbNode, err := newDockerHTTPNode(pool, xdockertest.ResourceOptions{ InstrumentOpts: iOpts, ContainerName: containerName, }) @@ -149,7 +150,7 @@ func AttachToExistingContainers( coordinator, err := newDockerHTTPCoordinator( pool, - ResourceOptions{ + xdockertest.ResourceOptions{ InstrumentOpts: iOpts, ContainerName: coordinatorContainerName, }, diff --git a/src/integration/resources/docker/options.go b/src/integration/resources/docker/dockerm3/options.go similarity index 89% rename from src/integration/resources/docker/options.go rename to src/integration/resources/docker/dockerm3/options.go index 5293a7ba84..62d7b45896 100644 --- a/src/integration/resources/docker/options.go +++ b/src/integration/resources/docker/dockerm3/options.go @@ -18,17 +18,13 @@ // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN // THE SOFTWARE. -package docker +package dockerm3 -// Image represents a docker image. -type Image struct { - Name string - Tag string -} +import "github.com/m3db/m3/src/x/dockertest" type setupOptions struct { - dbNodeImage Image - coordinatorImage Image + dbNodeImage dockertest.Image + coordinatorImage dockertest.Image existingCluster bool dbNodeContainerName string @@ -41,14 +37,14 @@ type SetupOptions func(*setupOptions) // WithDBNodeImage sets an option to use an image name and tag for the DB node. func WithDBNodeImage(name, tag string) SetupOptions { return func(o *setupOptions) { - o.dbNodeImage = Image{Name: name, Tag: tag} + o.dbNodeImage = dockertest.Image{Name: name, Tag: tag} } } // WithCoordinatorImage sets an option to use an image name and tag for the coordinator. func WithCoordinatorImage(name, tag string) SetupOptions { return func(o *setupOptions) { - o.coordinatorImage = Image{Name: name, Tag: tag} + o.coordinatorImage = dockertest.Image{Name: name, Tag: tag} } } diff --git a/src/integration/resources/types.go b/src/integration/resources/types.go index 2becae9745..6cc518b752 100644 --- a/src/integration/resources/types.go +++ b/src/integration/resources/types.go @@ -23,6 +23,7 @@ package resources import ( + "context" "fmt" "strconv" "strings" @@ -206,11 +207,11 @@ type M3Resources interface { type ExternalResources interface { // Setup sets up the external resource so that it's ready // for use. - Setup() error + Setup(ctx context.Context) error // Close stops and cleans up all the resources associated with // the external resource. - Close() error + Close(ctx context.Context) error } // InstanceInfo represents the host information for an instance. diff --git a/src/integration/resources/docker/common.go b/src/x/dockertest/common.go similarity index 75% rename from src/integration/resources/docker/common.go rename to src/x/dockertest/common.go index fb2338dd58..01558b0d98 100644 --- a/src/integration/resources/docker/common.go +++ b/src/x/dockertest/common.go @@ -18,7 +18,7 @@ // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN // THE SOFTWARE. -package docker +package dockertest import ( "errors" @@ -34,17 +34,35 @@ var ( networkName = "d-test" volumeName = "d-test" - errClosed = errors.New("container has been closed") + // ErrClosed is a common error for use when a container has been closed. + ErrClosed = errors.New("container has been closed") ) +// Image represents a docker image. +type Image struct { + Name string + Tag string +} + +// GoalStateVerifier asserts that a resource is in a particular state. +// TODO: more info here; this interface is unclear from usage +type GoalStateVerifier func(output string, err error) error + // ResourceOptions returns options for creating // a Resource. +//nolint:maligned type ResourceOptions struct { OverrideDefaults bool Source string ContainerName string Image Image PortList []int + PortMappings map[dc.Port][]dc.PortBinding + NoNetworkOverlay bool + + // Env is the environment for the docker container; it corresponds 1:1 with dockertest.RunOptions. + // Format should be: VAR=value + Env []string // Mounts creates mounts in the container that map back to a resource // on the host system. Mounts []string @@ -54,7 +72,7 @@ type ResourceOptions struct { } // NB: this will fill unset fields with given default values. -func (o ResourceOptions) withDefaults( +func (o ResourceOptions) WithDefaults( defaultOpts ResourceOptions) ResourceOptions { if o.OverrideDefaults { return o @@ -105,7 +123,7 @@ func useImage(opts *dockertest.RunOptions, image Image) *dockertest.RunOptions { } // SetupNetwork sets up a network within docker. -func SetupNetwork(pool *dockertest.Pool) error { +func SetupNetwork(pool *dockertest.Pool, cleanIfExists bool) error { networks, err := pool.Client.ListNetworks() if err != nil { return err @@ -113,6 +131,9 @@ func SetupNetwork(pool *dockertest.Pool) error { for _, n := range networks { if n.Name == networkName { + if !cleanIfExists { + return nil + } if err := pool.Client.RemoveNetwork(networkName); err != nil { return err } @@ -125,7 +146,8 @@ func SetupNetwork(pool *dockertest.Pool) error { return err } -func setupVolume(pool *dockertest.Pool) error { +// SetupVolume creates a default docker volume, with name volumeName (in this package) +func SetupVolume(pool *dockertest.Pool) error { volumes, err := pool.Client.ListVolumes(dc.ListVolumesOptions{}) if err != nil { return err @@ -151,7 +173,8 @@ func setupVolume(pool *dockertest.Pool) error { func exposePorts( opts *dockertest.RunOptions, portList []int, -) *dockertest.RunOptions { + mappings map[dc.Port][]dc.PortBinding, +) (*dockertest.RunOptions, error) { ports := make(map[dc.Port][]dc.PortBinding, len(portList)) for _, p := range portList { port := fmt.Sprintf("%d", p) @@ -168,6 +191,16 @@ func exposePorts( ports[portRepresentation] = entry } + for k, v := range mappings { + if _, ok := ports[k]; ok { + return nil, fmt.Errorf("mapping %s already specified by PortList; "+ + "mappings should be in PortList or PortMappings but not both", + k, + ) + } + ports[k] = v + } + opts.PortBindings = ports - return opts + return opts, nil } diff --git a/src/integration/resources/docker/docker_resource.go b/src/x/dockertest/docker_resource.go similarity index 77% rename from src/integration/resources/docker/docker_resource.go rename to src/x/dockertest/docker_resource.go index 667386d851..c47ca1f69e 100644 --- a/src/integration/resources/docker/docker_resource.go +++ b/src/x/dockertest/docker_resource.go @@ -18,7 +18,7 @@ // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN // THE SOFTWARE. -package docker +package dockertest import ( "bytes" @@ -32,22 +32,22 @@ import ( dc "github.com/ory/dockertest/v3/docker" "github.com/ory/dockertest/v3/docker/types/mount" "go.uber.org/zap" - - "github.com/m3db/m3/src/integration/resources" ) // Resource is an object that provides a handle // to a service being spun up via docker. type Resource struct { - closed bool + resource *dockertest.Resource + closed bool logger *zap.Logger - resource *dockertest.Resource - pool *dockertest.Pool + pool *dockertest.Pool } // NewDockerResource creates a new DockerResource. +// If resourceOpts.Image is empty, it will attempt to connect to an existing container. +// Otherwise, it will start the container with the specified image. func NewDockerResource( pool *dockertest.Pool, resourceOpts ResourceOptions, @@ -65,11 +65,33 @@ func NewDockerResource( ) ) - opts := exposePorts(newOptions(containerName), portList) + // TODO: this seems hard to use; a different method might be more appropriate. + if image.Name == "" { + logger.Info("connecting to existing container", zap.String("container", containerName)) + var ok bool + resource, ok := pool.ContainerByName(containerName) + if !ok { + logger.Error("could not find container") + return nil, fmt.Errorf("could not find container %v", containerName) + } + + return &Resource{ + logger: logger, + resource: resource, + pool: nil, + }, nil + } + + opts := newOptions(containerName) + opts, err := exposePorts(opts, portList, resourceOpts.PortMappings) + if err != nil { + return nil, err + } hostConfigOpts := func(c *dc.HostConfig) { - c.AutoRemove = true - c.NetworkMode = networkName + if !resourceOpts.NoNetworkOverlay { + c.NetworkMode = networkName + } // Allow the docker container to call services on the host machine. // Docker for OS X and Windows support the host.docker.internal hostname // natively, but Docker for Linux requires us to register host.docker.internal @@ -88,24 +110,14 @@ func NewDockerResource( c.Mounts = mounts } - var resource *dockertest.Resource - var err error - if image.Name == "" { - logger.Info("connecting to existing container", zap.String("container", containerName)) - var ok bool - resource, ok = pool.ContainerByName(containerName) - if !ok { - logger.Error("could not find container", zap.Error(err)) - return nil, fmt.Errorf("could not find container %v", containerName) - } - } else { - opts = useImage(opts, image) - opts.Mounts = resourceOpts.Mounts - imageWithTag := fmt.Sprintf("%v:%v", image.Name, image.Tag) - logger.Info("running container with options", - zap.String("image", imageWithTag), zap.Any("options", opts)) - resource, err = pool.RunWithOptions(opts, hostConfigOpts) - } + opts = useImage(opts, image) + opts.Mounts = resourceOpts.Mounts + opts.Env = resourceOpts.Env + + imageWithTag := fmt.Sprintf("%v:%v", image.Name, image.Tag) + logger.Info("running container with options", + zap.String("image", imageWithTag), zap.Any("options", opts)) + resource, err := pool.RunWithOptions(opts, hostConfigOpts) if err != nil { logger.Error("could not run container", zap.Error(err)) @@ -135,12 +147,12 @@ func (c *Resource) GetURL(port int, path string) string { // Exec runs commands within a docker container. func (c *Resource) Exec(commands ...string) (string, error) { if c.closed { - return "", errClosed + return "", ErrClosed } // NB: this is prefixed with a `/` that should be trimmed off. name := strings.TrimLeft(c.resource.Container.Name, "/") - logger := c.logger.With(resources.ZapMethod("exec")) + logger := c.logger.With(zap.String("method", "exec")) client := c.pool.Client exec, err := client.CreateExec(dc.CreateExecOptions{ AttachStdout: true, @@ -185,14 +197,14 @@ func (c *Resource) Exec(commands ...string) (string, error) { // GoalStateExec runs commands within a container until // a specified goal state is met. func (c *Resource) GoalStateExec( - verifier resources.GoalStateVerifier, + verifier GoalStateVerifier, commands ...string, ) error { if c.closed { - return errClosed + return ErrClosed } - logger := c.logger.With(resources.ZapMethod("GoalStateExec")) + logger := c.logger.With(zap.String("method", "GoalStateExec")) return c.pool.Retry(func() error { err := verifier(c.Exec(commands...)) if err != nil { @@ -208,16 +220,22 @@ func (c *Resource) GoalStateExec( // Close closes and cleans up the resource. func (c *Resource) Close() error { if c.closed { - c.logger.Error("closing closed resource", zap.Error(errClosed)) - return errClosed + c.logger.Error("closing closed resource", zap.Error(ErrClosed)) + return ErrClosed } c.closed = true c.logger.Info("closing resource") - return c.pool.Purge(c.resource) + return c.pool.Purge(c.Resource()) } // Closed returns true if the resource has been closed. func (c *Resource) Closed() bool { return c.closed } + +// Resource is the underlying dockertest resource used by this Resource. It can be used to perform more advanced +// operations not exposed by this class. +func (c *Resource) Resource() *dockertest.Resource { + return c.resource +} diff --git a/src/x/retry/retry.go b/src/x/retry/retry.go index 0155ad6026..8ec130ebc8 100644 --- a/src/x/retry/retry.go +++ b/src/x/retry/retry.go @@ -21,7 +21,9 @@ package retry import ( + "context" "errors" + "fmt" "math" "time" @@ -112,6 +114,25 @@ func (r *retrier) AttemptWhile(continueFn ContinueFn, fn Fn) error { return r.attempt(continueFn, fn) } +func (r *retrier) AttemptContext(ctx context.Context, fn Fn) error { + contextNotCancelled := func(attempt int) bool { + select { + case <-ctx.Done(): + return false + default: + return true + } + } + err := r.attempt(contextNotCancelled, fn) + if err != nil { + if errors.Is(err, ErrWhileConditionFalse) { + return fmt.Errorf("context canceled while retrying: %w", ctx.Err()) + } + return err + } + return nil +} + func (r *retrier) attempt(continueFn ContinueFn, fn Fn) error { // Always track a call, useful for counting number of total operations. r.metrics.calls.Inc(1) diff --git a/src/x/retry/types.go b/src/x/retry/types.go index ebbcc1aeb5..b388b7a0fe 100644 --- a/src/x/retry/types.go +++ b/src/x/retry/types.go @@ -22,6 +22,7 @@ package retry import ( + "context" "time" "github.com/m3db/m3/src/x/errors" @@ -59,6 +60,9 @@ type Retrier interface { // AttemptWhile will attempt to perform a function with retries. AttemptWhile(continueFn ContinueFn, fn Fn) error + + // AttemptContext attempts fn with retries until ctx is canceled (e.g., due to timeout). + AttemptContext(ctx context.Context, fn Fn) error } // Options is a set of retry options.