Skip to content

Commit

Permalink
[receiver/kafkametrics] Using unique container networks and container…
Browse files Browse the repository at this point in the history
… names and attempt to fix flaky tests (open-telemetry#28903)

**Description:** <Describe what has changed.>
Using unique container networks and container names and attempt to fix
flaky tests

**Link to tracking Issue:**

open-telemetry#26293

**Testing:**
**Preparation:** 
    DIR = receiver/kafkametricsreceiver
CMD = go test -v -count=1 -race -timeout 360s -parallel 4
-tags=integration,"" -run=Integration ./...

**Tests:**
1. If we manually modify the code(as shown below) and use invalid kafka
broker, such as `localhost:invalid-port`, the same error as shown in the
issue may occur.
    ```
    // receiver/kafkametricsreceiver/integration_test.go
    scraperinttest.WithCustomConfig(
func(t *testing.T, cfg component.Config, ci
*scraperinttest.ContainerInfo) {
            rCfg := cfg.(*Config)
            rCfg.CollectionInterval = 5 * time.Second
            rCfg.Brokers = []string{"localhost:invalid-port"}
            rCfg.Scrapers = []string{"brokers", "consumers", "topics"}
        }),
    ```

2. If we execute the test commands **sequentially** , it seems that the
execution results are all correct.
    ```
    # all result are correct
    for i in {1..100}; do echo "Run $i"; ./${CMD} ; done
    ```

3. If we execute the commands in **parallel** end with **`&`**,
sometimes the error shown in the issue may occur.
    ```
    # sometimes result occur error
    for i in {1..20}; do echo "Run $i"; ./${CMD} &; done
    ```

**Inference:**
I have found that duplicate container networks and container names can
cause container creation to fail or result in successfully created
containers with unavailable ports, which may lead to issues similar to
the one shown.

**Additional information:** 
Since Kafka's startup relies on ZooKeeper (which waits for the default
`zookeeper.connection.timeout.ms=18000`), if Kafka starts first and
ZooKeeper fails to start properly after the timeout duration, it will
cause the Kafka container to fail to start correctly. I found the issue
testcontainers/testcontainers-go#1791 wants to
support that.

**Documentation:**

---------

Signed-off-by: sakulali <sakulali@126.com>
  • Loading branch information
sakulali authored and jmsnll committed Nov 12, 2023
1 parent 9839efa commit 523ff27
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 9 deletions.
2 changes: 1 addition & 1 deletion receiver/kafkametricsreceiver/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ go 1.20
require (
github.com/IBM/sarama v1.41.3
github.com/google/go-cmp v0.6.0
github.com/google/uuid v1.4.0
github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal v0.88.0
github.com/open-telemetry/opentelemetry-collector-contrib/internal/kafka v0.88.0
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest v0.88.0
Expand Down Expand Up @@ -44,7 +45,6 @@ require (
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/protobuf v1.5.3 // indirect
github.com/golang/snappy v0.0.4 // indirect
github.com/google/uuid v1.4.0 // indirect
github.com/hashicorp/errwrap v1.1.0 // indirect
github.com/hashicorp/go-multierror v1.1.1 // indirect
github.com/hashicorp/go-uuid v1.0.3 // indirect
Expand Down
20 changes: 12 additions & 8 deletions receiver/kafkametricsreceiver/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"testing"
"time"

"github.com/google/uuid"
"github.com/testcontainers/testcontainers-go"
"github.com/testcontainers/testcontainers-go/wait"
"go.opentelemetry.io/collector/component"
Expand All @@ -20,13 +21,16 @@ import (
)

const (
networkName = "kafka-network"
kafkaPort = "9092"
zookeeperPort = "2181"
zookeeperHost = "zookeeper"
)

func TestIntegration(t *testing.T) {
uid := fmt.Sprintf("-%s", uuid.NewString())
networkName := "kafka-network" + uid
zkContainerName := "zookeeper" + uid
kafkaContainerName := "kafka" + uid

scraperinttest.NewIntegrationTest(
NewFactory(),
scraperinttest.WithNetworkRequest(
Expand All @@ -37,23 +41,23 @@ func TestIntegration(t *testing.T) {
),
scraperinttest.WithContainerRequest(
testcontainers.ContainerRequest{
Name: "zookeeper",
Name: zkContainerName,
Image: "ubuntu/zookeeper:3.1-22.04_beta",
Networks: []string{networkName},
Hostname: zookeeperHost,
Hostname: zkContainerName,
ExposedPorts: []string{zookeeperPort},
WaitingFor: wait.ForAll(
wait.ForListeningPort(zookeeperPort).WithStartupTimeout(2 * time.Minute),
),
}),
scraperinttest.WithContainerRequest(
testcontainers.ContainerRequest{
Name: "kafka",
Name: kafkaContainerName,
Image: "ubuntu/kafka:3.1-22.04_beta",
Networks: []string{networkName},
ExposedPorts: []string{kafkaPort},
Env: map[string]string{
"ZOOKEEPER_HOST": zookeeperHost,
"ZOOKEEPER_HOST": zkContainerName,
"ZOOKEEPER_PORT": zookeeperPort,
},
WaitingFor: wait.ForAll(
Expand All @@ -65,8 +69,8 @@ func TestIntegration(t *testing.T) {
rCfg := cfg.(*Config)
rCfg.CollectionInterval = 5 * time.Second
rCfg.Brokers = []string{fmt.Sprintf("%s:%s",
ci.HostForNamedContainer(t, "kafka"),
ci.MappedPortForNamedContainer(t, "kafka", kafkaPort))}
ci.HostForNamedContainer(t, kafkaContainerName),
ci.MappedPortForNamedContainer(t, kafkaContainerName, kafkaPort))}
rCfg.Scrapers = []string{"brokers", "consumers", "topics"}
}),
// scraperinttest.WriteExpected(), // TODO remove
Expand Down

0 comments on commit 523ff27

Please sign in to comment.