diff --git a/docker-compose.yml b/docker-compose.yml index f8146f372c..b45b16aea8 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -8,6 +8,17 @@ services: volumes: - .:/go/src/github.com/m3db/m3 - /usr/bin/buildkite-agent:/usr/bin/buildkite-agent + # Support running docker within docker. That is, buildkite jobs themselves run in a container; that container + # needs to be able to spin up functioning docker containers. + - /var/run/docker.sock:/var/run/docker.sock + extra_hosts: + # Allow routing from the buildkite container to the host machine, as host.docker.internal. This allows us to do + # the following: + # - Spin up an etcd container with ports published to the host machine + # - Connect to the etcd container from the buildkite test process using host.docker.internal + # See + # https://medium.com/@TimvanBaarsen/how-to-connect-to-the-docker-host-from-inside-a-docker-container-112b4c71bc66 + - "host.docker.internal:host-gateway" environment: - CI - BUILDKITE diff --git a/src/integration/resources/docker/dockerexternal/etcd.go b/src/integration/resources/docker/dockerexternal/etcd.go new file mode 100644 index 0000000000..71383df7cc --- /dev/null +++ b/src/integration/resources/docker/dockerexternal/etcd.go @@ -0,0 +1,279 @@ +// Copyright (c) 2022 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package dockerexternal + +import ( + "context" + "errors" + "fmt" + "math/rand" + "net" + "strconv" + "time" + + "github.com/m3db/m3/src/integration/resources/docker/dockerexternal/etcdintegration/bridge" + xdockertest "github.com/m3db/m3/src/x/dockertest" + xerrors "github.com/m3db/m3/src/x/errors" + "github.com/m3db/m3/src/x/instrument" + "github.com/m3db/m3/src/x/retry" + + "github.com/ory/dockertest/v3" + "github.com/ory/dockertest/v3/docker" + clientv3 "go.etcd.io/etcd/client/v3" + "go.uber.org/zap" + "google.golang.org/grpc" +) + +var ( + etcdImage = xdockertest.Image{ + Name: "bitnami/etcd", + Tag: "3.5.4", + } +) + +// NewEtcd constructs a single etcd node, running in a docker container. +func NewEtcd( + pool *dockertest.Pool, + instrumentOpts instrument.Options, + options ...EtcdClusterOption, +) (*EtcdNode, error) { + logger := instrumentOpts.Logger() + if logger == nil { + logger = zap.NewNop() + instrumentOpts = instrumentOpts.SetLogger(logger) + } + + var opts etcdClusterOptions + for _, o := range options { + o.apply(&opts) + } + + return &EtcdNode{ + pool: pool, + instrumentOpts: instrumentOpts, + logger: logger, + opts: opts, + // Solely for mocking in tests--unfortunately we don't want to take in the etcd client as a dependency here + // (we don't know the endpoints, and therefore need to construct it ourselves). + // Thus, we do two hops (mock newClient returning mock memberClient) + newClient: func(config clientv3.Config) (memberClient, error) { + return clientv3.New(config) + }, + }, nil +} + +// EtcdNode is a single etcd node, running via a docker container. +//nolint:maligned +type EtcdNode struct { + instrumentOpts instrument.Options + logger *zap.Logger + pool *dockertest.Pool + opts etcdClusterOptions + + // namePrefix is used to name the cluster. Exists solely for unittests in this package; otherwise a const + namePrefix string + newClient func(config clientv3.Config) (memberClient, error) + + // initialized by Setup + address string + resource *xdockertest.Resource + etcdCli *clientv3.Client + bridge *bridge.Bridge + + stopped bool +} + +// Setup starts the docker container. +func (c *EtcdNode) Setup(ctx context.Context) (closeErr error) { + if c.resource != nil { + return errors.New("etcd cluster already started") + } + + // nolint:gosec + id := rand.New(rand.NewSource(time.Now().UnixNano())).Int() + + namePrefix := "m3-test-etcd-" + if c.namePrefix != "" { + // support overriding for tests + namePrefix = c.namePrefix + } + + // Roughly, runs: + // docker run --rm --env ALLOW_NONE_AUTHENTICATION=yes -it --name Etcd bitnami/etcd + // Port 2379 on the container is bound to a free port on the host + resource, err := xdockertest.NewDockerResource(c.pool, xdockertest.ResourceOptions{ + OverrideDefaults: false, + // TODO: what even is this? + Source: "etcd", + ContainerName: fmt.Sprintf("%s%d", namePrefix, id), + Image: etcdImage, + Env: []string{"ALLOW_NONE_AUTHENTICATION=yes"}, + InstrumentOpts: c.instrumentOpts, + PortMappings: map[docker.Port][]docker.PortBinding{ + "2379/tcp": {{ + HostIP: "0.0.0.0", + HostPort: strconv.Itoa(c.opts.port), + }}, + }, + NoNetworkOverlay: true, + }) + + if err != nil { + return fmt.Errorf("starting etcd container: %w", err) + } + + container := resource.Resource().Container + c.logger.Info("etcd container started", + zap.String("containerID", container.ID), + zap.Any("ports", container.NetworkSettings.Ports), + // Uncomment if you need gory details about the container printed; equivalent of `docker inspect + // zap.Any("container", container), + ) + // Extract the port on which we are listening. + // This is coming from the equivalent of docker inspect + portBinds := container.NetworkSettings.Ports["2379/tcp"] + + // If running in a docker container e.g. on buildkite, route to etcd using the published port on the *host* machine. + // See also http://github.com/m3db/m3/blob/master/docker-compose.yml#L16-L16 + ipAddr := "127.0.0.1" + _, err = net.ResolveIPAddr("ip4", "host.docker.internal") + if err == nil { + c.logger.Info("Running tests within a docker container (e.g. for buildkite. " + + "Using host.docker.internal to talk to etcd") + ipAddr = "host.docker.internal" + } + + c.resource = resource + c.address = fmt.Sprintf("%s:%s", ipAddr, portBinds[0].HostPort) + + etcdCli, err := clientv3.New( + clientv3.Config{ + Endpoints: []string{c.address}, + DialTimeout: 5 * time.Second, + DialOptions: []grpc.DialOption{grpc.WithBlock()}, + Logger: c.logger, + }, + ) + if err != nil { + return fmt.Errorf("constructing etcd client: %w", err) + } + + defer func() { + if err := etcdCli.Close(); err != nil { + var merr xerrors.MultiError + closeErr = merr. + Add(closeErr). + Add(fmt.Errorf("closing etcd client: %w", err)). + FinalError() + } + }() + + return c.waitForHealth(ctx, etcdCli) +} + +func (c *EtcdNode) containerHostPort() string { + portBinds := c.resource.Resource().Container.NetworkSettings.Ports["2379/tcp"] + + return fmt.Sprintf("127.0.0.1:%s", portBinds[0].HostPort) +} + +func (c *EtcdNode) waitForHealth(ctx context.Context, memberCli memberClient) error { + retrier := retry.NewRetrier(retry.NewOptions(). + SetForever(true). + SetMaxBackoff(5 * time.Second), + ) + + var timeout time.Duration + deadline, ok := ctx.Deadline() + if ok { + timeout = deadline.Sub(time.Now()) + } + c.logger.Info( + "Waiting for etcd to report healthy (via member list)", + zap.String("timeout", timeout.String()), + ) + err := retrier.AttemptContext(ctx, func() error { + _, err := memberCli.MemberList(ctx) + if err != nil { + c.logger.Info( + "Failed connecting to etcd while waiting for container to come up", + zap.Error(err), + zap.String("endpoints", c.address), + ) + } + return err + }) + if err == nil { + c.logger.Info("etcd is healthy") + return nil + } + return fmt.Errorf("waiting for etcd to become healthy: %w", err) +} + +// Close stops the etcd node, and removes it. +func (c *EtcdNode) Close(ctx context.Context) error { + var err xerrors.MultiError + err = err. + Add(c.resource.Close()) + return err.FinalError() +} + +// Address is the host:port of the etcd node for use by etcd clients. +func (c *EtcdNode) Address() string { + return c.address +} + +// Stop stops the etcd container, but does not purge it. A stopped container can be restarted with Restart. +func (c *EtcdNode) Stop(ctx context.Context) error { + if c.stopped { + return errors.New("etcd node is already stopped") + } + if err := c.pool.Client.StopContainerWithContext(c.resource.Resource().Container.ID, 0, ctx); err != nil { + return err + } + c.stopped = true + return nil +} + +// Restart restarts the etcd container. If it isn't currently stopped, the etcd container will be stopped and then +// started; else it will just be start. +func (c *EtcdNode) Restart(ctx context.Context) error { + if !c.stopped { + c.logger.Info("Stopping etcd node") + + if err := c.Stop(ctx); err != nil { + return fmt.Errorf("stopping etcd node for Restart: %w", err) + } + } + err := c.pool.Client.StartContainerWithContext(c.resource.Resource().Container.ID, nil, ctx) + if err != nil { + return fmt.Errorf("starting etcd node for Restart: %w", err) + } + c.stopped = false + return nil +} + +var _ memberClient = (*clientv3.Client)(nil) + +// memberClient exposes just one method of *clientv3.Client, for purposes of tests. +type memberClient interface { + MemberList(ctx context.Context) (*clientv3.MemberListResponse, error) +} diff --git a/src/integration/resources/docker/dockerexternal/etcd_options.go b/src/integration/resources/docker/dockerexternal/etcd_options.go new file mode 100644 index 0000000000..e4a9addee5 --- /dev/null +++ b/src/integration/resources/docker/dockerexternal/etcd_options.go @@ -0,0 +1,57 @@ +// Copyright (c) 2022 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package dockerexternal + +// EtcdClusterOption configure an etcd cluster. +type EtcdClusterOption interface { + apply(opts *etcdClusterOptions) +} + +type etcdClusterOptions struct { + useBridge bool + port int +} + +// EtcdClusterUseBridge configures an EtcdNode to insert a networking "bridge" between the etcd container and the +// calling processes. The bridge intercepts network traffic, and forwards it, unless told not to via e.g. Blackhole(). +// See the bridge package. +// As noted in that package, this implementation is lifted directly from the etcd/integration package; all credit goes +// to etcd authors for the approach. +func EtcdClusterUseBridge(shouldUseBridge bool) EtcdClusterOption { + return useBridge(shouldUseBridge) +} + +type useBridge bool + +func (u useBridge) apply(opts *etcdClusterOptions) { + opts.useBridge = bool(u) +} + +// EtcdClusterPort sets a specific port for etcd to listen on. Default is to listen on :0 (any free port). +func EtcdClusterPort(port int) EtcdClusterOption { + return withPort(port) +} + +type withPort int + +func (w withPort) apply(opts *etcdClusterOptions) { + opts.port = int(w) +} diff --git a/src/integration/resources/docker/dockerexternal/etcd_test.go b/src/integration/resources/docker/dockerexternal/etcd_test.go new file mode 100644 index 0000000000..a267afb75c --- /dev/null +++ b/src/integration/resources/docker/dockerexternal/etcd_test.go @@ -0,0 +1,184 @@ +// Copyright (c) 2022 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package dockerexternal + +import ( + "context" + "math/rand" + "strconv" + "strings" + "testing" + "time" + + "github.com/m3db/m3/src/x/instrument" + + "github.com/ory/dockertest/v3" + "github.com/ory/dockertest/v3/docker" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + clientv3 "go.etcd.io/etcd/client/v3" + "go.uber.org/zap" +) + +const ( + testKey = "etcd-test" +) + +var ( + testLogger, _ = zap.NewDevelopment() +) + +type etcdTestDeps struct { + Pool *dockertest.Pool + InstrumentOpts instrument.Options + Etcd *EtcdNode +} + +func setupEtcdTest(t *testing.T) etcdTestDeps { + pool, err := dockertest.NewPool("") + require.NoError(t, err) + + iopts := instrument.NewOptions().SetLogger(testLogger) + c, err := NewEtcd(pool, iopts) + require.NoError(t, err) + + return etcdTestDeps{ + Pool: pool, + Etcd: c, + InstrumentOpts: iopts, + } +} + +func TestCluster(t *testing.T) { + t.Run("starts a functioning cluster", func(t *testing.T) { + ctx, cancel := newTestContext() + defer cancel() + + deps := setupEtcdTest(t) + require.NoError(t, deps.Etcd.Setup(ctx)) + + t.Cleanup(func() { + require.NoError(t, deps.Etcd.Close(ctx)) + }) + + cli, err := clientv3.New( + clientv3.Config{ + Endpoints: []string{deps.Etcd.Address()}, + }, + ) + require.NoError(t, err) + + //nolint:gosec + testVal := strconv.Itoa(rand.Intn(10000)) + _, err = cli.Put(ctx, testKey, testVal) + require.NoError(t, err) + + actualVal, err := cli.Get(ctx, testKey) + require.NoError(t, err) + + assert.Equal(t, testVal, string(actualVal.Kvs[0].Value)) + }) + + t.Run("can run multiple at once", func(t *testing.T) { + ctx, cancel := newTestContext() + defer cancel() + + deps := setupEtcdTest(t) + + require.NoError(t, deps.Etcd.Setup(ctx)) + defer func() { + require.NoError(t, deps.Etcd.Close(ctx)) + }() + + c2, err := NewEtcd(deps.Pool, deps.InstrumentOpts) + require.NoError(t, err) + require.NoError(t, c2.Setup(ctx)) + defer func() { + require.NoError(t, c2.Close(ctx)) + }() + }) + + t.Run("cleans up containers on shutdown", func(t *testing.T) { + ctx, cancel := newTestContext() + defer cancel() + + deps := setupEtcdTest(t) + testPrefix := "cleanup-test-" + deps.Etcd.namePrefix = testPrefix + + findContainers := func(namePrefix string, pool *dockertest.Pool) ([]docker.APIContainers, error) { + containers, err := deps.Pool.Client.ListContainers(docker.ListContainersOptions{}) + if err != nil { + return nil, err + } + + var rtn []docker.APIContainers + for _, ct := range containers { + for _, name := range ct.Names { + // Docker response prefixes the container name with / regardless of what you give it as input. + if strings.HasPrefix(name, "/"+namePrefix) { + rtn = append(rtn, ct) + break + } + } + } + return rtn, nil + } + + require.NoError(t, deps.Etcd.Setup(ctx)) + + cts, err := findContainers(testPrefix, deps.Pool) + require.NoError(t, err) + assert.Len(t, cts, 1) + + require.NoError(t, deps.Etcd.Close(ctx)) + cts, err = findContainers(testPrefix, deps.Pool) + require.NoError(t, err) + assert.Len(t, cts, 0) + }) +} + +func TestCluster_waitForHealth(t *testing.T) { + t.Run("errors when context is canceled", func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + + deps := setupEtcdTest(t) + etcdCli := fakeMemberClient{err: assert.AnError} + cancel() + require.EqualError( + t, + deps.Etcd.waitForHealth(ctx, etcdCli), + "waiting for etcd to become healthy: context canceled while retrying: context canceled", + ) + }) +} + +type fakeMemberClient struct { + err error +} + +func (f fakeMemberClient) MemberList(ctx context.Context) (*clientv3.MemberListResponse, error) { + return nil, f.err +} + +func newTestContext() (context.Context, func()) { + return context.WithTimeout(context.Background(), 10*time.Second) +} diff --git a/src/integration/resources/docker/dockerexternal/etcdintegration/bridge/README.md b/src/integration/resources/docker/dockerexternal/etcdintegration/bridge/README.md new file mode 100644 index 0000000000..860656226a --- /dev/null +++ b/src/integration/resources/docker/dockerexternal/etcdintegration/bridge/README.md @@ -0,0 +1,11 @@ +# Fork Notice + +This code is taken very directly from the etcd codebase (as is allowed by the Apache License). +The original license is included in all files. + +The file was copied from: https://github.com/etcd-io/etcd/blob/cdd2b737f04812a919a5735380fdaa1f932346d0/tests/framework/integration/bridge.go + +As of this notice only slight modifications have been made--primarily to satisfy linters +(lint ignores, public method comments). + +Thank you to the etcd maintainers for use of this code! \ No newline at end of file diff --git a/src/integration/resources/docker/dockerexternal/etcdintegration/bridge/bridge.go b/src/integration/resources/docker/dockerexternal/etcdintegration/bridge/bridge.go new file mode 100644 index 0000000000..f8ff1525e4 --- /dev/null +++ b/src/integration/resources/docker/dockerexternal/etcdintegration/bridge/bridge.go @@ -0,0 +1,256 @@ +// Copyright (c) 2022 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +// Copyright 2016 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package bridge + +import ( + "io" + "net" + "sync" +) + +// Dialer makes TCP connections. +type Dialer interface { + Dial() (net.Conn, error) +} + +// Bridge proxies connections between listener and dialer, making it possible +// to disconnect grpc network connections without closing the logical grpc connection. +type Bridge struct { + dialer Dialer + l net.Listener + conns map[*bridgeConn]struct{} + + stopc chan struct{} + pausec chan struct{} + blackholec chan struct{} + wg sync.WaitGroup + + mu sync.Mutex +} + +// New constructs a bridge listening to the given listener and connecting using the given dialer. +func New(dialer Dialer, listener net.Listener) (*Bridge, error) { + b := &Bridge{ + // Bridge "port" is ("%05d%05d0", port, pid) since go1.8 expects the port to be a number + dialer: dialer, + l: listener, + conns: make(map[*bridgeConn]struct{}), + stopc: make(chan struct{}), + pausec: make(chan struct{}), + blackholec: make(chan struct{}), + } + close(b.pausec) + b.wg.Add(1) + go b.serveListen() + return b, nil +} + +// Close stops the bridge. +func (b *Bridge) Close() { + //nolint:errcheck + b.l.Close() + b.mu.Lock() + select { + case <-b.stopc: + default: + close(b.stopc) + } + b.mu.Unlock() + b.wg.Wait() +} + +// DropConnections drops connections to the bridge. +func (b *Bridge) DropConnections() { + b.mu.Lock() + defer b.mu.Unlock() + for bc := range b.conns { + bc.Close() + } + b.conns = make(map[*bridgeConn]struct{}) +} + +// PauseConnections pauses all connections. +func (b *Bridge) PauseConnections() { + b.mu.Lock() + b.pausec = make(chan struct{}) + b.mu.Unlock() +} + +// UnpauseConnections unpauses all connections. +func (b *Bridge) UnpauseConnections() { + b.mu.Lock() + select { + case <-b.pausec: + default: + close(b.pausec) + } + b.mu.Unlock() +} + +func (b *Bridge) serveListen() { + defer func() { + //nolint:errcheck + b.l.Close() + b.mu.Lock() + for bc := range b.conns { + bc.Close() + } + b.mu.Unlock() + b.wg.Done() + }() + + for { + inc, ierr := b.l.Accept() + if ierr != nil { + return + } + b.mu.Lock() + pausec := b.pausec + b.mu.Unlock() + select { + case <-b.stopc: + //nolint:errcheck + inc.Close() + return + case <-pausec: + } + + outc, oerr := b.dialer.Dial() + if oerr != nil { + //nolint:errcheck + inc.Close() + return + } + + bc := &bridgeConn{inc, outc, make(chan struct{})} + b.wg.Add(1) + b.mu.Lock() + b.conns[bc] = struct{}{} + go b.serveConn(bc) + b.mu.Unlock() + } +} + +func (b *Bridge) serveConn(bc *bridgeConn) { + defer func() { + close(bc.donec) + bc.Close() + b.mu.Lock() + delete(b.conns, bc) + b.mu.Unlock() + b.wg.Done() + }() + + var wg sync.WaitGroup + wg.Add(2) + go func() { + //nolint:errcheck + b.ioCopy(bc.out, bc.in) + bc.close() + wg.Done() + }() + go func() { + //nolint:errcheck + b.ioCopy(bc.in, bc.out) + bc.close() + wg.Done() + }() + wg.Wait() +} + +type bridgeConn struct { + in net.Conn + out net.Conn + donec chan struct{} +} + +func (bc *bridgeConn) Close() { + bc.close() + <-bc.donec +} + +func (bc *bridgeConn) close() { + //nolint:errcheck + bc.in.Close() + //nolint:errcheck + bc.out.Close() +} + +// Blackhole stops connections to the bridge. +func (b *Bridge) Blackhole() { + b.mu.Lock() + close(b.blackholec) + b.mu.Unlock() +} + +// Unblackhole stops connections to the bridge. +func (b *Bridge) Unblackhole() { + b.mu.Lock() + for bc := range b.conns { + bc.Close() + } + b.conns = make(map[*bridgeConn]struct{}) + b.blackholec = make(chan struct{}) + b.mu.Unlock() +} + +// ref. https://github.com/golang/go/blob/master/src/io/io.go copyBuffer +func (b *Bridge) ioCopy(dst io.Writer, src io.Reader) (err error) { + buf := make([]byte, 32*1024) + for { + select { + case <-b.blackholec: + //nolint:errcheck + io.Copy(io.Discard, src) + return nil + default: + } + nr, er := src.Read(buf) + if nr > 0 { + nw, ew := dst.Write(buf[0:nr]) + if ew != nil { + return ew + } + if nr != nw { + return io.ErrShortWrite + } + } + if er != nil { + err = er + break + } + } + return err +} diff --git a/src/integration/resources/docker/dockerexternal/etcdintegration/cluster.go b/src/integration/resources/docker/dockerexternal/etcdintegration/cluster.go new file mode 100644 index 0000000000..a83af92fe1 --- /dev/null +++ b/src/integration/resources/docker/dockerexternal/etcdintegration/cluster.go @@ -0,0 +1,287 @@ +// Copyright (c) 2022 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +// Package etcdintegration is a mostly drop-in replacement for the etcd integration +// (github.com/etcd-io/etcd/tests/v3/framework/integration) package. +// Instead of starting etcd within this Go process, it starts etcd using a docker container. +package etcdintegration + +import ( + "context" + "fmt" + "math/rand" + "net" + "time" + + "github.com/m3db/m3/src/integration/resources/docker/dockerexternal" + "github.com/m3db/m3/src/integration/resources/docker/dockerexternal/etcdintegration/bridge" + xerrors "github.com/m3db/m3/src/x/errors" + "github.com/m3db/m3/src/x/instrument" + "github.com/m3db/m3/src/x/retry" + + "github.com/ory/dockertest/v3" + "github.com/stretchr/testify/require" + clientv3 "go.etcd.io/etcd/client/v3" + "go.uber.org/zap" + "go.uber.org/zap/zaptest" + "google.golang.org/grpc" +) + +const ( + startTimeout = 30 * time.Second + stopTimeout = 30 * time.Second + + clientHealthTimeout = 30 * time.Second +) + +// ClusterConfig configures an etcd integration test cluster. +type ClusterConfig struct { + // Size is the number of nodes in the cluster. Provided as a parameter to be API compatible with the etcd package, + // but currently only one node is supported. + Size int + + // UseBridge enables a networking bridge on etcd members, accessible via Node.Bridge(). This allows manipulation + // of connections to particular members. + UseBridge bool +} + +// Cluster is an etcd cluster. Currently, the implementation is such that only one node clusters are allowed. +type Cluster struct { + // Members are the etcd nodes that make up the cluster. + Members []*Node + + terminated bool +} + +// NewCluster starts an etcd cluster using docker. +func NewCluster(t testingT, cfg *ClusterConfig) *Cluster { + if cfg.Size > 1 { + t.Errorf("NewCluster currently only supports single node clusters") + t.FailNow() + return nil + } + + logger := zaptest.NewLogger(t) + + pool, err := dockertest.NewPool("") + require.NoError(t, err) + + r, err := dockerexternal.NewEtcd(pool, instrument.NewOptions(), dockerexternal.EtcdClusterUseBridge(cfg.UseBridge)) + require.NoError(t, err) + + ctx, cancel := context.WithTimeout(context.Background(), startTimeout) + defer cancel() + + cluster := &Cluster{ + Members: []*Node{newNode(r, logger, cfg)}, + } + + require.NoError(t, cluster.start(ctx)) + + // Paranoia: try to ensure that we cleanup the containers, even if our callers mess up. + t.Cleanup(func() { + if !cluster.terminated { + cluster.Terminate(t) + } + }) + return cluster +} + +// start is private because NewCluster is intended to always start the cluster. +func (c *Cluster) start(ctx context.Context) error { + var merr xerrors.MultiError + for _, m := range c.Members { + merr = merr.Add(m.start(ctx)) + } + if err := merr.FinalError(); err != nil { + return fmt.Errorf("failed starting etcd cluster: %w", err) + } + return nil +} + +// RandClient returns a client from any member in the cluster. +func (c *Cluster) RandClient() *clientv3.Client { + //nolint:gosec + return c.Members[rand.Intn(len(c.Members))].Client +} + +// Terminate stops all nodes in the cluster. +func (c *Cluster) Terminate(t testingT) { + ctx, cancel := context.WithTimeout(context.Background(), stopTimeout) + defer cancel() + + c.terminated = true + + var err xerrors.MultiError + for _, node := range c.Members { + err = err.Add(node.close(ctx)) + } + require.NoError(t, err.FinalError()) +} + +// Node is a single etcd server process, running in a docker container. +type Node struct { + Client *clientv3.Client + + resource dockerEtcd + cfg *ClusterConfig + logger *zap.Logger + bridge *bridge.Bridge +} + +func newNode(r dockerEtcd, logger *zap.Logger, cfg *ClusterConfig) *Node { + return &Node{ + resource: r, + logger: logger, + cfg: cfg, + } +} + +// Stop stops the etcd container, but doesn't remove it. +func (n *Node) Stop(t testingT) { + ctx, cancel := context.WithTimeout(context.Background(), stopTimeout) + defer cancel() + require.NoError(t, n.resource.Stop(ctx)) + + if n.bridge != nil { + n.bridge.Close() + } +} + +// Bridge can be used to manipulate connections to this etcd node. It +// is a man-in-the-middle listener which mostly transparently forwards connections, unless told to drop them via e.g. +// the Blackhole method. +// Bridge will only be active if cfg.UseBridge is true; calling this method otherwise will panic. +func (n *Node) Bridge() *bridge.Bridge { + if !n.cfg.UseBridge { + panic("EtcdNode wasn't configured to use a Bridge; pass EtcdClusterUseBridge(true) to enable.") + } + return n.bridge +} + +// Restart starts a stopped etcd container, stopping it first if it's not already. +func (n *Node) Restart(t testingT) error { + ctx, cancel := context.WithTimeout(context.Background(), startTimeout) + defer cancel() + require.NoError(t, n.resource.Restart(ctx)) + return nil +} + +// start starts the etcd node. It is private because it isn't part of the etcd/integration package API, and +// should only be called by Cluster.start. +func (n *Node) start(ctx context.Context) error { + ctx, cancel := context.WithTimeout(ctx, startTimeout) + defer cancel() + + if err := n.resource.Setup(ctx); err != nil { + return err + } + + address := n.resource.Address() + if n.cfg.UseBridge { + addr, err := n.setupBridge() + if err != nil { + return fmt.Errorf("setting up connection bridge for etcd node: %w", err) + } + address = addr + } + + etcdCli, err := clientv3.New(clientv3.Config{ + Endpoints: []string{"http://" + address}, + DialOptions: []grpc.DialOption{grpc.WithBlock()}, + DialTimeout: 5 * time.Second, + Logger: n.logger, + }) + + if err != nil { + return fmt.Errorf("constructing etcd client for member: %w", err) + } + + n.logger.Info("Connecting to docker etcd using host machine port", + zap.String("endpoint", address), + ) + + n.Client = etcdCli + return nil +} + +// setupBridge puts a man-in-the-middle listener in between the etcd docker process and the client. See Bridge() for +// details. +// Returns the new address of the bridge, which clients should connect to. +func (n *Node) setupBridge() (string, error) { + listener, err := net.Listen("tcp", "127.0.0.1:0") + if err != nil { + return "", fmt.Errorf("setting up listener for bridge: %w", err) + } + + n.logger.Info("etcd bridge is listening", zap.String("addr", listener.Addr().String())) + + // dialer = make connections to the etcd container + // listener = the bridge's inbounds + n.bridge, err = bridge.New(dialer{hostport: n.resource.Address()}, listener) + if err != nil { + return "", err + } + + return listener.Addr().String(), nil +} + +func (n *Node) close(ctx context.Context) error { + var err xerrors.MultiError + err = err.Add(n.Client.Close()) + return err.Add(n.resource.Close(ctx)).FinalError() +} + +type dialer struct { + hostport string +} + +func (d dialer) Dial() (net.Conn, error) { + return net.Dial("tcp", d.hostport) +} + +// testingT wraps *testing.T. Allows us to not directly depend on *testing package. +type testingT interface { + zaptest.TestingT + require.TestingT + + Cleanup(func()) +} + +// BeforeTestExternal -- solely here to match etcd API's. +func BeforeTestExternal(t testingT) {} + +// WaitClientV3 waits for an etcd client to be healthy. +func WaitClientV3(t testingT, kv clientv3.KV) { + ctx, cancel := context.WithTimeout(context.Background(), clientHealthTimeout) + defer cancel() + + err := retry.NewRetrier(retry.NewOptions().SetForever(true)).AttemptContext( + ctx, + func() error { + ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) + defer cancel() + _, err := kv.Get(ctx, "/") + return err + }, + ) + + require.NoError(t, err) +} diff --git a/src/integration/resources/docker/dockerexternal/etcdintegration/cluster_test.go b/src/integration/resources/docker/dockerexternal/etcdintegration/cluster_test.go new file mode 100644 index 0000000000..ed84e57362 --- /dev/null +++ b/src/integration/resources/docker/dockerexternal/etcdintegration/cluster_test.go @@ -0,0 +1,80 @@ +// Copyright (c) 2022 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package etcdintegration + +import ( + "context" + "fmt" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +const ( + testKey = "test-key" +) + +func TestCluster(t *testing.T) { + c := NewCluster(t, &ClusterConfig{Size: 1}) + + defer c.Terminate(t) + + ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) + defer cancel() + + testVal := newTestValue(t) + _, err := c.RandClient().Put(ctx, testKey, testVal) + require.NoError(t, err) + + resp, err := c.RandClient().Get(ctx, testKey) + require.NoError(t, err) + assert.Equal(t, testVal, string(resp.Kvs[0].Value)) +} + +func TestCluster_withBridge(t *testing.T) { + c := NewCluster(t, &ClusterConfig{Size: 1, UseBridge: true}) + + defer c.Terminate(t) + + c.Members[0].Bridge().DropConnections() + c.Members[0].Bridge().Blackhole() + + ctx := context.Background() + + blackholedCtx, cancel := context.WithTimeout(ctx, 1*time.Second) + defer cancel() + + _, err := c.RandClient().MemberList(blackholedCtx) + require.EqualError(t, err, context.DeadlineExceeded.Error()) + + c.Members[0].Bridge().Unblackhole() + + availCtx, cancel := context.WithTimeout(ctx, 5*time.Second) + defer cancel() + _, err = c.RandClient().MemberList(availCtx) + require.NoError(t, err) +} + +func newTestValue(t *testing.T) string { + return fmt.Sprintf("%s-%d", t.Name(), time.Now().Unix()) +} diff --git a/src/integration/resources/docker/dockerexternal/etcdintegration/types.go b/src/integration/resources/docker/dockerexternal/etcdintegration/types.go new file mode 100644 index 0000000000..7e9dc35ca9 --- /dev/null +++ b/src/integration/resources/docker/dockerexternal/etcdintegration/types.go @@ -0,0 +1,34 @@ +// Copyright (c) 2022 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package etcdintegration + +import "context" + +// dockerEtcd are the operations we need out of a docker based etcd process. It's implemented by at least +// resources/dockerexternal.EtcdNode +type dockerEtcd interface { + Setup(ctx context.Context) error + Close(ctx context.Context) error + + Stop(ctx context.Context) error + Restart(ctx context.Context) error + Address() string +} diff --git a/src/x/dockertest/common.go b/src/x/dockertest/common.go index 01558b0d98..0cb4938651 100644 --- a/src/x/dockertest/common.go +++ b/src/x/dockertest/common.go @@ -58,6 +58,8 @@ type ResourceOptions struct { Image Image PortList []int PortMappings map[dc.Port][]dc.PortBinding + + // NoNetworkOverlay if set, disables use of the default integration testing network we create (networkName). NoNetworkOverlay bool // Env is the environment for the docker container; it corresponds 1:1 with dockertest.RunOptions. @@ -111,8 +113,7 @@ func (o ResourceOptions) WithDefaults( func newOptions(name string) *dockertest.RunOptions { return &dockertest.RunOptions{ - Name: name, - NetworkID: networkName, + Name: name, } } diff --git a/src/x/dockertest/docker_resource.go b/src/x/dockertest/docker_resource.go index c47ca1f69e..927539bcea 100644 --- a/src/x/dockertest/docker_resource.go +++ b/src/x/dockertest/docker_resource.go @@ -83,6 +83,9 @@ func NewDockerResource( } opts := newOptions(containerName) + if !resourceOpts.NoNetworkOverlay { + opts.NetworkID = networkName + } opts, err := exposePorts(opts, portList, resourceOpts.PortMappings) if err != nil { return nil, err