diff --git a/pom.xml b/pom.xml index 62ca46e..8b9407e 100644 --- a/pom.xml +++ b/pom.xml @@ -89,14 +89,15 @@ 11 - 1.17.4 + 1.19.3 4.7.2 2.17.1 - 3.2.13 + 3.3.3 3.2.3 1.7.32 2.13.4 2.13.4.2 + 2.1.7 5.9.1 @@ -172,6 +173,11 @@ jackson-databind ${fasterxml.jackson-databind.version} + + eu.rekawek.toxiproxy + toxiproxy-java + ${toxiproxy.java.version} + diff --git a/src/main/java/io/strimzi/test/container/StrimziKafkaContainer.java b/src/main/java/io/strimzi/test/container/StrimziKafkaContainer.java index 484cee6..f6df73c 100644 --- a/src/main/java/io/strimzi/test/container/StrimziKafkaContainer.java +++ b/src/main/java/io/strimzi/test/container/StrimziKafkaContainer.java @@ -6,6 +6,8 @@ import com.github.dockerjava.api.command.InspectContainerResponse; import com.github.dockerjava.api.model.ContainerNetwork; +import eu.rekawek.toxiproxy.Proxy; +import eu.rekawek.toxiproxy.ToxiproxyClient; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.testcontainers.containers.GenericContainer; @@ -16,6 +18,7 @@ import org.testcontainers.images.builder.Transferable; import org.testcontainers.utility.MountableFile; +import java.io.IOException; import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; import java.util.ArrayList; @@ -75,7 +78,8 @@ public class StrimziKafkaContainer extends GenericContainer imageName) { @Override protected void doStart() { - if (proxyContainer != null && !proxyContainer.isRunning()) { - proxyContainer.start(); + if (this.proxyContainer != null && !this.proxyContainer.isRunning()) { + this.proxyContainer.start(); + + // Instantiate a ToxiproxyClient if it has not been previously provided via configuration settings. + if (toxiproxyClient == null) { + toxiproxyClient = new ToxiproxyClient(this.proxyContainer.getHost(), this.proxyContainer.getControlPort()); + } } + if (!this.imageNameProvider.isDone()) { this.imageNameProvider.complete(KafkaVersionService.strimziTestContainerImageName(this.kafkaVersion)); } @@ -337,9 +347,7 @@ public String getInternalZooKeeperConnect() { public String getBootstrapServers() { if (proxyContainer != null) { // returning the proxy host and port for indirect connection - return String.format("PLAINTEXT://%s:%d", - getProxy().getContainerIpAddress(), - getProxy().getProxyPort()); + return String.format("PLAINTEXT://%s", getProxy().getListen()); } return bootstrapServersProvider.apply(this); } @@ -470,17 +478,33 @@ public StrimziKafkaContainer withProxyContainer(final ToxiproxyContainer proxyCo } /** - * Returns the proxy for this Kafka broker if configured. + * Retrieves a synchronized Proxy instance for this Kafka broker. + * + * This method ensures that only one instance of Proxy is created per broker. If the proxy has not been + * initialized, it attempts to create one using the Toxiproxy client. If the Toxiproxy client is not initialized, + * it is created using the host and control port of the proxy container. * - * @return ToxiproxyContainer.ContainerProxy instance + * @return Proxy instance for this Kafka broker. + * @throws IllegalStateException if the proxy container has not been configured. + * @throws RuntimeException if an IOException occurs during the creation of the Proxy. */ - public synchronized ToxiproxyContainer.ContainerProxy getProxy() { - if (proxyContainer == null) { + public synchronized Proxy getProxy() { + if (this.proxyContainer == null) { throw new IllegalStateException("The proxy container has not been configured"); } - if (proxy == null) { - this.proxy = proxyContainer.getProxy(this, KAFKA_PORT); + + if (this.proxy == null) { + if (this.toxiproxyClient == null) { + this.toxiproxyClient = new ToxiproxyClient(proxyContainer.getHost(), proxyContainer.getControlPort()); + } + try { + final int listenPort = 8666 + this.brokerId; + this.proxy = this.toxiproxyClient.createProxy("kafka" + this.brokerId, "0.0.0.0:" + listenPort, "toxiproxy:" + Utils.getFreePort()); + } catch (IOException e) { + LOGGER.error("Error happened during creation of the Proxy: {}", e.getMessage()); + throw new RuntimeException(e); + } } - return proxy; + return this.proxy; } } diff --git a/src/main/java/io/strimzi/test/container/Utils.java b/src/main/java/io/strimzi/test/container/Utils.java index 786122a..8b35808 100644 --- a/src/main/java/io/strimzi/test/container/Utils.java +++ b/src/main/java/io/strimzi/test/container/Utils.java @@ -4,8 +4,10 @@ */ package io.strimzi.test.container; +import java.io.IOException; import java.io.PrintWriter; import java.io.StringWriter; +import java.net.ServerSocket; import java.util.function.BooleanSupplier; import org.slf4j.Logger; @@ -79,5 +81,16 @@ static long waitFor(String description, long pollIntervalMs, long timeoutMs, Boo } } - + /** + * Finds a free server port which can be used by the web server + * + * @return A free TCP port + */ + public static int getFreePort() { + try (ServerSocket serverSocket = new ServerSocket(0)) { + return serverSocket.getLocalPort(); + } catch (IOException e) { + throw new RuntimeException("Failed to find free port", e); + } + } } diff --git a/src/test/java/io/strimzi/test/container/StrimziKafkaClusterIT.java b/src/test/java/io/strimzi/test/container/StrimziKafkaClusterIT.java index e27e5d6..686a76d 100644 --- a/src/test/java/io/strimzi/test/container/StrimziKafkaClusterIT.java +++ b/src/test/java/io/strimzi/test/container/StrimziKafkaClusterIT.java @@ -7,6 +7,8 @@ import java.util.ArrayList; import java.util.List; import java.util.stream.Collectors; + +import eu.rekawek.toxiproxy.Proxy; import org.apache.kafka.clients.admin.AdminClient; import org.apache.kafka.clients.admin.AdminClientConfig; import org.apache.kafka.clients.admin.NewTopic; @@ -130,7 +132,7 @@ void testKafkaClusterFunctionality() throws InterruptedException, ExecutionExcep @Test void testStartClusterWithProxyContainer() { ToxiproxyContainer proxyContainer = new ToxiproxyContainer( - DockerImageName.parse("ghcr.io/shopify/toxiproxy:2.4.0") + DockerImageName.parse("ghcr.io/shopify/toxiproxy:2.6.0") .asCompatibleSubstituteFor("shopify/toxiproxy")); StrimziKafkaCluster kafkaCluster = new StrimziKafkaCluster(3, proxyContainer); @@ -138,7 +140,7 @@ void testStartClusterWithProxyContainer() { List bootstrapUrls = new ArrayList<>(); for (KafkaContainer kafkaContainer : kafkaCluster.getBrokers()) { - ToxiproxyContainer.ContainerProxy proxy = ((StrimziKafkaContainer) kafkaContainer).getProxy(); + Proxy proxy = ((StrimziKafkaContainer) kafkaContainer).getProxy(); assertThat(proxy, notNullValue()); bootstrapUrls.add(kafkaContainer.getBootstrapServers()); } diff --git a/src/test/java/io/strimzi/test/container/StrimziKafkaContainerIT.java b/src/test/java/io/strimzi/test/container/StrimziKafkaContainerIT.java index 03e95c2..5f15753 100644 --- a/src/test/java/io/strimzi/test/container/StrimziKafkaContainerIT.java +++ b/src/test/java/io/strimzi/test/container/StrimziKafkaContainerIT.java @@ -319,7 +319,7 @@ void testStartBrokerWithProxyContainer(final String imageName) { assumeDocker(); ToxiproxyContainer proxyContainer = new ToxiproxyContainer( - DockerImageName.parse("ghcr.io/shopify/toxiproxy:2.4.0") + DockerImageName.parse("ghcr.io/shopify/toxiproxy:2.6.0") .asCompatibleSubstituteFor("shopify/toxiproxy")); systemUnderTest = new StrimziKafkaContainer(imageName) @@ -327,9 +327,8 @@ void testStartBrokerWithProxyContainer(final String imageName) { .waitForRunning(); systemUnderTest.start(); - ToxiproxyContainer.ContainerProxy proxy = systemUnderTest.getProxy(); assertThat(systemUnderTest.getBootstrapServers(), - is(String.format("PLAINTEXT://%s:%d", proxy.getContainerIpAddress(), proxy.getProxyPort()))); + is(String.format("PLAINTEXT://%s", systemUnderTest.getProxy().getListen()))); systemUnderTest.stop(); }