Skip to content

Commit

Permalink
feat: specify port binding for docker (#592)
Browse files Browse the repository at this point in the history
  • Loading branch information
lvrach authored Aug 22, 2024
1 parent c31d397 commit ff8a8e2
Show file tree
Hide file tree
Showing 22 changed files with 221 additions and 152 deletions.
20 changes: 8 additions & 12 deletions kafkaclient/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func TestClient_Ping(t *testing.T) {
kafkaContainer, err := dockerKafka.Setup(pool, t)
require.NoError(t, err)

kafkaHost := fmt.Sprintf("localhost:%s", kafkaContainer.Ports[0])
kafkaHost := kafkaContainer.Brokers[0]
c, err := New("tcp", []string{"bad-host", kafkaHost}, Config{})
require.NoError(t, err)

Expand Down Expand Up @@ -63,11 +63,7 @@ func TestProducerBatchConsumerGroup(t *testing.T) {
dockerKafka.WithBrokers(3))
require.NoError(t, err)

addresses := make([]string, 0, len(kafkaContainer.Ports))
for i := 0; i < len(kafkaContainer.Ports); i++ {
addresses = append(addresses, fmt.Sprintf("localhost:%s", kafkaContainer.Ports[i]))
}
c, err := New("tcp", addresses, Config{ClientID: "some-client", DialTimeout: 5 * time.Second})
c, err := New("tcp", kafkaContainer.Brokers, Config{ClientID: "some-client", DialTimeout: 5 * time.Second})
require.NoError(t, err)

var (
Expand Down Expand Up @@ -201,7 +197,7 @@ func TestConsumer_Partition(t *testing.T) {
dockerKafka.WithBrokers(1))
require.NoError(t, err)

kafkaHost := fmt.Sprintf("localhost:%s", kafkaContainer.Ports[0])
kafkaHost := kafkaContainer.Brokers[0]
c, err := New("tcp", []string{"bad-host", kafkaHost}, Config{ClientID: "some-client", DialTimeout: 5 * time.Second})
require.NoError(t, err)

Expand Down Expand Up @@ -354,7 +350,7 @@ func TestWithSASL(t *testing.T) {
kafkaContainer, err := dockerKafka.Setup(pool, t, containerOptions...)
require.NoError(t, err)

kafkaHost := fmt.Sprintf("localhost:%s", kafkaContainer.Ports[0])
kafkaHost := kafkaContainer.Brokers[0]
c, err := New("tcp", []string{"bad-host", kafkaHost}, Config{
ClientID: "some-client",
DialTimeout: 10 * time.Second,
Expand Down Expand Up @@ -432,7 +428,7 @@ func TestWithSASLBadCredentials(t *testing.T) {
kafkaContainer, err := dockerKafka.Setup(pool, t, containerOptions...)
require.NoError(t, err)

kafkaHost := fmt.Sprintf("localhost:%s", kafkaContainer.Ports[0])
kafkaHost := kafkaContainer.Brokers[0]
c, err := New("tcp", []string{"bad-host", kafkaHost}, Config{
ClientID: "some-client",
DialTimeout: 10 * time.Second,
Expand Down Expand Up @@ -465,7 +461,7 @@ func TestProducer_Timeout(t *testing.T) {
dockerKafka.WithBrokers(1))
require.NoError(t, err)

kafkaHost := fmt.Sprintf("localhost:%s", kafkaContainer.Ports[0])
kafkaHost := kafkaContainer.Brokers[0]
c, err := New("tcp", []string{"bad-host", kafkaHost}, Config{ClientID: "some-client", DialTimeout: 5 * time.Second})
require.NoError(t, err)

Expand Down Expand Up @@ -534,7 +530,7 @@ func TestIsProducerErrTemporary(t *testing.T) {
dockerKafka.WithBrokers(1))
require.NoError(t, err)

kafkaHost := fmt.Sprintf("localhost:%s", kafkaContainer.Ports[0])
kafkaHost := kafkaContainer.Brokers[0]
c, err := New("tcp", []string{"bad-host", kafkaHost}, Config{ClientID: "some-client", DialTimeout: 5 * time.Second})
require.NoError(t, err)

Expand Down Expand Up @@ -707,7 +703,7 @@ func TestConsumerACK(t *testing.T) {
dockerKafka.WithBrokers(1))
require.NoError(t, err)

kafkaHost := fmt.Sprintf("localhost:%s", kafkaContainer.Ports[0])
kafkaHost := kafkaContainer.Brokers[0]
kafkaClient, err := New("tcp", []string{"bad-host", kafkaHost}, Config{ClientID: "some-client", DialTimeout: 5 * time.Second})
require.NoError(t, err)

Expand Down
2 changes: 1 addition & 1 deletion kafkaclient/compression_benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func BenchmarkCompression(b *testing.B) {
kafkaContainer, err := kafka.Setup(pool, b, kafka.WithCustomAdvertisedListener(proxyHost))
require.NoError(b, err)

return "localhost:" + kafkaContainer.Ports[0]
return kafkaContainer.Brokers[0]
}

setupProxy := func(b *testing.B, kafkaAddr string, c Compression, bs int, bt time.Duration) (
Expand Down
2 changes: 1 addition & 1 deletion stats/internal/otel/traces_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ func TestZipkinIntegration(t *testing.T) {
var (
om Manager
ctx = context.Background()
zipkinURL = "http://localhost:" + zipkinContainer.Port + "/api/v2/spans"
zipkinURL = zipkinContainer.URL + "/api/v2/spans"
)
tp, _, err := om.Setup(ctx, res,
WithTracerProvider(zipkinURL,
Expand Down
2 changes: 1 addition & 1 deletion stats/otel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -886,7 +886,7 @@ func TestZipkin(t *testing.T) {
zipkinContainer, err := zipkin.Setup(pool, t)
require.NoError(t, err)

zipkinURL := "http://localhost:" + zipkinContainer.Port + "/api/v2/spans"
zipkinURL := zipkinContainer.URL + "/api/v2/spans"

conf := config.New()
conf.Set("INSTANCE_ID", t.Name())
Expand Down
10 changes: 5 additions & 5 deletions stats/traces_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ func TestSpanFromContext(t *testing.T) {
zipkinContainer, err := zipkin.Setup(pool, t)
require.NoError(t, err)

zipkinURL := "http://localhost:" + zipkinContainer.Port + "/api/v2/spans"
zipkinTracesURL := "http://localhost:" + zipkinContainer.Port + "/api/v2/traces?serviceName=" + t.Name()
zipkinURL := zipkinContainer.URL + "/api/v2/spans"
zipkinTracesURL := zipkinContainer.URL + "/api/v2/traces?serviceName=" + t.Name()

c := config.New()
c.Set("INSTANCE_ID", t.Name())
Expand Down Expand Up @@ -98,8 +98,8 @@ func TestAsyncTracePropagation(t *testing.T) {
zipkinContainer, err := zipkin.Setup(pool, t)
require.NoError(t, err)

zipkinURL := "http://localhost:" + zipkinContainer.Port + "/api/v2/spans"
zipkinTracesURL := "http://localhost:" + zipkinContainer.Port + "/api/v2/traces?serviceName=" + t.Name()
zipkinURL := zipkinContainer.URL + "/api/v2/spans"
zipkinTracesURL := zipkinContainer.URL + "/api/v2/traces?serviceName=" + t.Name()

c := config.New()
c.Set("INSTANCE_ID", t.Name())
Expand Down Expand Up @@ -166,7 +166,7 @@ func TestZipkinDownIsNotBlocking(t *testing.T) {
zipkinContainer, err := zipkin.Setup(pool, t)
require.NoError(t, err)

zipkinURL := "http://localhost:" + zipkinContainer.Port + "/api/v2/spans"
zipkinURL := zipkinContainer.URL + "/api/v2/spans"

c := config.New()
c.Set("INSTANCE_ID", t.Name())
Expand Down
20 changes: 20 additions & 0 deletions testhelper/docker/docker.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,14 @@ import (
"regexp"
"strconv"
"testing"
"time"

"github.com/ory/dockertest/v3"
"github.com/ory/dockertest/v3/docker"
"github.com/stretchr/testify/require"

"github.com/rudderlabs/rudder-go-kit/testhelper/docker/resource"
"github.com/rudderlabs/rudder-go-kit/testhelper/rand"
)

// GetHostPort returns the desired port mapping
Expand All @@ -26,3 +31,18 @@ func GetHostPort(t testing.TB, port string, container *docker.Container) int {
func ToInternalDockerHost(url string) string {
return regexp.MustCompile(`(localhost|127\.0\.0\.1)`).ReplaceAllString(url, "host.docker.internal")
}

func CreateNetwork(pool *dockertest.Pool, cln resource.Cleaner, prefix string) (*docker.Network, error) {
network, err := pool.Client.CreateNetwork(docker.CreateNetworkOptions{Name: prefix + "_test_" + time.Now().Format("YY-MM-DD-") + rand.String(6)})
if err != nil {
return nil, err
}

cln.Cleanup(func() {
if err := pool.Client.RemoveNetwork(network.ID); err != nil {
cln.Logf("Error while removing Docker network: %v", err)
}
})

return network, nil
}
23 changes: 7 additions & 16 deletions testhelper/docker/resource/etcd/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,22 +3,21 @@ package etcd
import (
"context"
"fmt"
"strconv"
"time"

"github.com/ory/dockertest/v3"
"github.com/ory/dockertest/v3/docker"
etcd "go.etcd.io/etcd/client/v3"

"github.com/rudderlabs/rudder-go-kit/testhelper/docker/resource"
"github.com/rudderlabs/rudder-go-kit/testhelper/docker/resource/internal"
)

type Resource struct {
Client *etcd.Client
Hosts []string
// HostsInNetwork is the list of ETCD hosts accessible from the provided Docker network (if any).
HostsInNetwork []string
Port int
}

type config struct {
Expand Down Expand Up @@ -50,29 +49,22 @@ func Setup(pool *dockertest.Pool, cln resource.Cleaner, opts ...Option) (*Resour
Env: []string{
"ALLOW_NONE_AUTHENTICATION=yes",
},
})
if err != nil {
return nil, fmt.Errorf("could not create container: %v", err)
}
PortBindings: internal.IPv4PortBindings([]string{"2379"}),
}, internal.DefaultHostConfig)
cln.Cleanup(func() {
if err := pool.Purge(container); err != nil {
cln.Log(fmt.Errorf("could not purge ETCD resource: %v", err))
}
})
if err != nil {
return nil, fmt.Errorf("could not create container: %v", err)
}

var (
etcdClient *etcd.Client
etcdHosts []string
etcdPort int

etcdPortStr = container.GetPort("2379/tcp")
)
etcdPort, err = strconv.Atoi(etcdPortStr)
if err != nil {
return nil, fmt.Errorf("could not convert port %q to int: %v", etcdPortStr, err)
}

etcdHosts = []string{"http://localhost:" + etcdPortStr}
etcdHosts = []string{"http://" + container.GetBoundIP("2379/tcp") + ":" + container.GetPort("2379/tcp")}

etcdClient, err = etcd.New(etcd.Config{
Endpoints: etcdHosts,
Expand Down Expand Up @@ -102,6 +94,5 @@ func Setup(pool *dockertest.Pool, cln resource.Cleaner, opts ...Option) (*Resour
Client: etcdClient,
Hosts: etcdHosts,
HostsInNetwork: hostsInNetwork,
Port: etcdPort,
}, nil
}
27 changes: 27 additions & 0 deletions testhelper/docker/resource/internal/ports.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package internal

import (
"github.com/ory/dockertest/v3/docker"
)

const BindHostIP = "127.0.0.1"

// IPv4PortBindings returns the port bindings for the given exposed ports forcing ipv4 address.
func IPv4PortBindings(exposedPorts []string) map[docker.Port][]docker.PortBinding {
portBindings := make(map[docker.Port][]docker.PortBinding)

for _, exposedPort := range exposedPorts {
portBindings[docker.Port(exposedPort)+"/tcp"] = []docker.PortBinding{
{
HostIP: BindHostIP,
HostPort: "0",
},
}
}

return portBindings
}

func DefaultHostConfig(hc *docker.HostConfig) {
hc.PublishAllPorts = false
}
Loading

0 comments on commit ff8a8e2

Please sign in to comment.