From b4ec814cb4d0fba972abdc46eabba9610fd97394 Mon Sep 17 00:00:00 2001 From: Sergei Egorov Date: Tue, 16 Jan 2018 20:37:37 +0100 Subject: [PATCH 1/8] Add Kafka module --- .../testcontainers/containers/Network.java | 2 +- .../containers/SocatContainer.java | 40 ++++++++++ .../utility/TestcontainersConfiguration.java | 4 + modules/kafka/pom.xml | 38 ++++++++++ .../KafkaContainer.java | 61 +++++++++++++++ .../main/resources/tc-zookeeper.properties | 3 + .../containers/KafkaContainerTest.java | 76 +++++++++++++++++++ .../kafka/src/test/resources/logback-test.xml | 18 +++++ pom.xml | 1 + 9 files changed, 242 insertions(+), 1 deletion(-) create mode 100644 core/src/main/java/org/testcontainers/containers/SocatContainer.java create mode 100644 modules/kafka/pom.xml create mode 100644 modules/kafka/src/main/java/org.testcontainers.containers/KafkaContainer.java create mode 100644 modules/kafka/src/main/resources/tc-zookeeper.properties create mode 100644 modules/kafka/src/test/java/org/testcontainers/containers/KafkaContainerTest.java create mode 100644 modules/kafka/src/test/resources/logback-test.xml diff --git a/core/src/main/java/org/testcontainers/containers/Network.java b/core/src/main/java/org/testcontainers/containers/Network.java index dc848b02b8c..c2ea8a7b993 100644 --- a/core/src/main/java/org/testcontainers/containers/Network.java +++ b/core/src/main/java/org/testcontainers/containers/Network.java @@ -52,7 +52,7 @@ class NetworkImpl extends ExternalResource implements Network { private final AtomicBoolean initialized = new AtomicBoolean(); @Override - public String getId() { + public synchronized String getId() { if (initialized.compareAndSet(false, true)) { id = create(); } diff --git a/core/src/main/java/org/testcontainers/containers/SocatContainer.java b/core/src/main/java/org/testcontainers/containers/SocatContainer.java new file mode 100644 index 00000000000..7b460c21db7 --- /dev/null +++ b/core/src/main/java/org/testcontainers/containers/SocatContainer.java @@ -0,0 +1,40 @@ +package org.testcontainers.containers; + +import org.testcontainers.utility.TestcontainersConfiguration; + +import java.util.HashMap; +import java.util.Map; +import java.util.stream.Collectors; + +/** + * A socat container is used as a TCP proxy, enabling any TCP port of another container to be exposed + * publicly, even if that container does not make the port public itself. + */ +public class SocatContainer extends GenericContainer { + + private final Map targets = new HashMap<>(); + + public SocatContainer() { + super(TestcontainersConfiguration.getInstance().getSocatContainerImage()); + withCreateContainerCmdModifier(it -> it.withEntrypoint("/bin/sh")); + } + + public SocatContainer withTarget(int exposedPort, String host) { + return withTarget(exposedPort, host, exposedPort); + } + + public SocatContainer withTarget(int exposedPort, String host, int internalPort) { + addExposedPort(exposedPort); + targets.put(exposedPort, String.format("%s:%s", host, internalPort)); + return self(); + } + + @Override + protected void configure() { + withCommand("-c", + targets.entrySet().stream() + .map(entry -> "socat TCP-LISTEN:" + entry.getKey() + ",fork,reuseaddr TCP:" + entry.getValue()) + .collect(Collectors.joining(" & ")) + ); + } +} \ No newline at end of file diff --git a/core/src/main/java/org/testcontainers/utility/TestcontainersConfiguration.java b/core/src/main/java/org/testcontainers/utility/TestcontainersConfiguration.java index bb7acb0bc9e..890cd0d4279 100644 --- a/core/src/main/java/org/testcontainers/utility/TestcontainersConfiguration.java +++ b/core/src/main/java/org/testcontainers/utility/TestcontainersConfiguration.java @@ -30,6 +30,10 @@ public String getAmbassadorContainerImage() { return (String) properties.getOrDefault("ambassador.container.image", "richnorth/ambassador:latest"); } + public String getSocatContainerImage() { + return (String) properties.getOrDefault("socat.container.image", "alpine/socat:latest"); + } + public String getVncRecordedContainerImage() { return (String) properties.getOrDefault("vncrecorder.container.image", "richnorth/vnc-recorder:latest"); } diff --git a/modules/kafka/pom.xml b/modules/kafka/pom.xml new file mode 100644 index 00000000000..526bfe03af5 --- /dev/null +++ b/modules/kafka/pom.xml @@ -0,0 +1,38 @@ + + + 4.0.0 + + + org.testcontainers + testcontainers-parent + 0-SNAPSHOT + ../../pom.xml + + + kafka + TestContainers :: Apache Kafka + + + + ${project.groupId} + testcontainers + ${project.version} + + + + org.apache.kafka + kafka-clients + 1.0.0 + test + + + + org.assertj + assertj-core + 3.8.0 + test + + + + \ No newline at end of file diff --git a/modules/kafka/src/main/java/org.testcontainers.containers/KafkaContainer.java b/modules/kafka/src/main/java/org.testcontainers.containers/KafkaContainer.java new file mode 100644 index 00000000000..ac0ffba5391 --- /dev/null +++ b/modules/kafka/src/main/java/org.testcontainers.containers/KafkaContainer.java @@ -0,0 +1,61 @@ +package org.testcontainers.containers; + +import org.testcontainers.utility.Base58; + +import java.util.stream.Stream; + +public class KafkaContainer extends GenericContainer { + + public static final int KAFKA_PORT = 9092; + + public static final int ZOOKEEPER_PORT = 2181; + + protected SocatContainer proxy; + + public KafkaContainer() { + this("4.0.0"); + } + + public KafkaContainer(String confluencePlatformVersion) { + super("confluentinc/cp-kafka:" + confluencePlatformVersion); + + withNetwork(Network.newNetwork()); + withNetworkAliases("kafka-" + Base58.randomString(6)); + withExposedPorts(KAFKA_PORT, ZOOKEEPER_PORT); + + withEnv("KAFKA_BROKER_ID", "1"); + withEnv("KAFKA_LISTENERS", "PLAINTEXT://0.0.0.0:9092,BROKER://127.0.0.1:9093"); + withEnv("KAFKA_LISTENER_SECURITY_PROTOCOL_MAP", "BROKER:PLAINTEXT,PLAINTEXT:PLAINTEXT"); + withEnv("KAFKA_INTER_BROKER_LISTENER_NAME", "BROKER"); + + withEnv("KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR", "1"); + withEnv("KAFKA_OFFSETS_TOPIC_NUM_PARTITIONS", "1"); + withEnv("KAFKA_LOG_FLUSH_INTERVAL_MESSAGES", Long.MAX_VALUE + ""); + + withEnv("KAFKA_ZOOKEEPER_CONNECT", "localhost:2181"); + withClasspathResourceMapping("tc-zookeeper.properties", "/zookeeper.properties", BindMode.READ_ONLY); + withCommand("sh", "-c", "zookeeper-server-start /zookeeper.properties & /etc/confluent/docker/run"); + } + + public String getBootstrapServers() { + return String.format("PLAINTEXT://%s:%s", proxy.getContainerIpAddress(), proxy.getFirstMappedPort()); + } + + @Override + public void start() { + proxy = new SocatContainer() + .withNetwork(getNetwork()) + .withTarget(9092, getNetworkAliases().get(0)) + .withTarget(2181, getNetworkAliases().get(0)); + + proxy.start(); + withEnv("KAFKA_ADVERTISED_LISTENERS", "BROKER://127.0.0.1:9093,PLAINTEXT://" + proxy.getContainerIpAddress() + ":" + proxy.getFirstMappedPort()); + + super.start(); + } + + @Override + public void stop() { + Stream.of(super::stop, proxy::stop).parallel().forEach(Runnable::run); + } +} \ No newline at end of file diff --git a/modules/kafka/src/main/resources/tc-zookeeper.properties b/modules/kafka/src/main/resources/tc-zookeeper.properties new file mode 100644 index 00000000000..f0cb437927d --- /dev/null +++ b/modules/kafka/src/main/resources/tc-zookeeper.properties @@ -0,0 +1,3 @@ +clientPort=2181 +dataDir=/var/lib/zookeeper/data +dataLogDir=/var/lib/zookeeper/log \ No newline at end of file diff --git a/modules/kafka/src/test/java/org/testcontainers/containers/KafkaContainerTest.java b/modules/kafka/src/test/java/org/testcontainers/containers/KafkaContainerTest.java new file mode 100644 index 00000000000..e05580c521a --- /dev/null +++ b/modules/kafka/src/test/java/org/testcontainers/containers/KafkaContainerTest.java @@ -0,0 +1,76 @@ +package org.testcontainers.containers; + +import com.google.common.collect.ImmutableMap; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.serialization.StringSerializer; +import org.junit.Test; +import org.rnorth.ducttape.unreliables.Unreliables; + +import java.util.Arrays; +import java.util.UUID; +import java.util.concurrent.TimeUnit; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.tuple; + +public class KafkaContainerTest { + + @Test + public void testUsage() throws Exception { + try (KafkaContainer kafka = new KafkaContainer()) { + kafka.start(); + + try ( + KafkaProducer producer = new KafkaProducer<>( + ImmutableMap.of( + ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers(), + ProducerConfig.CLIENT_ID_CONFIG, UUID.randomUUID().toString() + ), + new StringSerializer(), + new StringSerializer() + ); + + KafkaConsumer consumer = new KafkaConsumer<>( + ImmutableMap.of( + ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers(), + ConsumerConfig.GROUP_ID_CONFIG, "tc-" + UUID.randomUUID(), + ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest" + ), + new StringDeserializer(), + new StringDeserializer() + ); + ) { + String topicName = "messages"; + consumer.subscribe(Arrays.asList(topicName)); + + producer.send(new ProducerRecord<>(topicName, "testcontainers", "rulezzz")).get(); + + Unreliables.retryUntilTrue(10, TimeUnit.SECONDS, () -> { + ConsumerRecords records = consumer.poll(100); + + if (records.isEmpty()) { + return false; + } + + assertThat(records) + .hasSize(1) + .extracting(ConsumerRecord::topic, ConsumerRecord::key, ConsumerRecord::value) + .containsExactly(tuple(topicName, "testcontainers", "rulezzz")); + + return true; + }); + + consumer.unsubscribe(); + } + + } + } + +} \ No newline at end of file diff --git a/modules/kafka/src/test/resources/logback-test.xml b/modules/kafka/src/test/resources/logback-test.xml new file mode 100644 index 00000000000..7bd6a94d827 --- /dev/null +++ b/modules/kafka/src/test/resources/logback-test.xml @@ -0,0 +1,18 @@ + + + + + + %d{HH:mm:ss.SSS} %-5level %logger - %msg%n + + + + + + + + + + + \ No newline at end of file diff --git a/pom.xml b/pom.xml index bf158718386..c73552febcf 100644 --- a/pom.xml +++ b/pom.xml @@ -165,6 +165,7 @@ modules/postgresql modules/selenium modules/nginx + modules/kafka modules/jdbc-test From 1c4e0df8e9b4a8591fe4ef45be928add47126534 Mon Sep 17 00:00:00 2001 From: Sergei Egorov Date: Wed, 17 Jan 2018 08:11:53 +0100 Subject: [PATCH 2/8] Replace AmbassadorContainer with SocatContainer in DockerComposeContainer --- .../containers/AmbassadorContainer.java | 3 + .../containers/DockerComposeContainer.java | 72 +++++++------------ 2 files changed, 28 insertions(+), 47 deletions(-) diff --git a/core/src/main/java/org/testcontainers/containers/AmbassadorContainer.java b/core/src/main/java/org/testcontainers/containers/AmbassadorContainer.java index 756b88d05ab..0b86821d293 100644 --- a/core/src/main/java/org/testcontainers/containers/AmbassadorContainer.java +++ b/core/src/main/java/org/testcontainers/containers/AmbassadorContainer.java @@ -15,9 +15,12 @@ * An ambassador container is used as a TCP proxy, enabling any TCP port of another linked container to be exposed * publicly, even if that container does not make the port public itself. The richnorth/ambassador:latest * container is used (based on HAProxy). + * + * @deprecated use {@link SocatContainer} */ @EqualsAndHashCode(callSuper = false) @Data +@Deprecated public class AmbassadorContainer> extends GenericContainer { private final String otherContainerName; diff --git a/core/src/main/java/org/testcontainers/containers/DockerComposeContainer.java b/core/src/main/java/org/testcontainers/containers/DockerComposeContainer.java index d4061d9e499..b718351c86b 100644 --- a/core/src/main/java/org/testcontainers/containers/DockerComposeContainer.java +++ b/core/src/main/java/org/testcontainers/containers/DockerComposeContainer.java @@ -11,9 +11,6 @@ import org.apache.commons.lang.StringUtils; import org.apache.commons.lang.SystemUtils; import org.junit.runner.Description; -import org.rnorth.ducttape.ratelimits.RateLimiter; -import org.rnorth.ducttape.ratelimits.RateLimiterBuilder; -import org.rnorth.ducttape.unreliables.Unreliables; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.slf4j.profiler.Profiler; @@ -27,8 +24,15 @@ import org.zeroturnaround.exec.stream.slf4j.Slf4jStream; import java.io.File; -import java.util.*; +import java.util.Arrays; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; import static com.google.common.base.Preconditions.checkArgument; @@ -46,7 +50,6 @@ public class DockerComposeContainer> e * Random identifier which will become part of spawned containers names, so we can shut them down */ private final String identifier; - private final Map ambassadorContainers = new HashMap<>(); private final List composeFiles; private final Set spawnedContainerIds = new HashSet<>(); private final Set spawnedNetworkIds = new HashSet<>(); @@ -56,6 +59,10 @@ public class DockerComposeContainer> e private boolean pull = true; private boolean tailChildContainers; + private final AtomicInteger nextAmbassadorPort = new AtomicInteger(2000); + private final Map> ambassadorPortMappings = new ConcurrentHashMap<>(); + private final SocatContainer ambassadorContainer = new SocatContainer(); + private static final Object MUTEX = new Object(); /** @@ -64,12 +71,6 @@ public class DockerComposeContainer> e */ private Map env = new HashMap<>(); - private static final RateLimiter AMBASSADOR_CREATION_RATE_LIMITER = RateLimiterBuilder - .newBuilder() - .withRate(6, TimeUnit.MINUTES) - .withConstantThroughput() - .build(); - @Deprecated public DockerComposeContainer(File composeFile, String identifier) { this(identifier, composeFile); @@ -201,31 +202,9 @@ private List listChildContainers() { } private void startAmbassadorContainers(Profiler profiler) { - for (final Map.Entry address : ambassadorContainers.entrySet()) { - - try { - // Start any ambassador containers we need - profiler.start("Ambassador container startup"); - - final AmbassadorContainer ambassadorContainer = address.getValue(); - Unreliables.retryUntilSuccess(120, TimeUnit.SECONDS, () -> { - - AMBASSADOR_CREATION_RATE_LIMITER.doWhenReady(() -> { - Profiler localProfiler = profiler.startNested("Ambassador container: " + ambassadorContainer.getContainerName()); - - localProfiler.start("Start ambassador container"); - - ambassadorContainer.start(); - }); - - return null; - }); - } catch (Exception e) { - logger().warn("Exception during ambassador container startup!", e); - } finally { - profiler.stop().log(); - } - } + profiler.start("Ambassador container startup"); + ambassadorContainer.start(); + profiler.stop().log(); } private Logger logger() { @@ -237,8 +216,8 @@ public void finished(Description description) { synchronized (MUTEX) { - // shut down all the ambassador containers - ambassadorContainers.forEach((String address, AmbassadorContainer container) -> container.stop()); + // shut down the ambassador container + ambassadorContainer.stop(); // Kill the services using docker-compose try { @@ -270,7 +249,7 @@ public SELF withExposedService(String serviceName, int servicePort) { } /* - * For every service/port pair that needs to be exposed, we have to start an 'ambassador container'. + * For every service/port pair that needs to be exposed, we register a target on an 'ambassador container'. * * The ambassador container's role is to link (within the Docker network) to one of the * compose services, and proxy TCP network I/O out to a port that the ambassador container @@ -282,13 +261,12 @@ public SELF withExposedService(String serviceName, int servicePort) { * {@link GenericContainer} should ensure that the ambassador container is on the same network * as the rest of the compose environment. */ - AmbassadorContainer ambassadorContainer = - new AmbassadorContainer<>(new FutureContainer(this.identifier + "_" + serviceName), serviceName, servicePort) - .withEnv(env); - - // Ambassador containers will all be started together after docker compose has started - ambassadorContainers.put(serviceName + ":" + servicePort, ambassadorContainer); + // Ambassador container will be started together after docker compose has started + int ambassadorPort = nextAmbassadorPort.getAndIncrement(); + ambassadorPortMappings.computeIfAbsent(serviceName, __ -> new ConcurrentHashMap<>()).put(servicePort, ambassadorPort); + ambassadorContainer.withTarget(ambassadorPort, serviceName, servicePort); + ambassadorContainer.addLink(new FutureContainer(this.identifier + "_" + serviceName), serviceName); return self(); } @@ -307,7 +285,7 @@ public DockerComposeContainer withExposedService(String serviceName, int instanc * @return a host IP address or hostname that can be used for accessing the service container. */ public String getServiceHost(String serviceName, Integer servicePort) { - return ambassadorContainers.get(serviceName + ":" + servicePort).getContainerIpAddress(); + return ambassadorContainer.getContainerIpAddress(); } /** @@ -321,7 +299,7 @@ public String getServiceHost(String serviceName, Integer servicePort) { * @return a port that can be used for accessing the service container. */ public Integer getServicePort(String serviceName, Integer servicePort) { - return ambassadorContainers.get(serviceName + ":" + servicePort).getMappedPort(servicePort); + return ambassadorContainer.getMappedPort(ambassadorPortMappings.get(serviceName).get(servicePort)); } public SELF withScaledService(String serviceBaseName, int numInstances) { From 5bb2e25f432f54a728fbe342ea71939d540269fd Mon Sep 17 00:00:00 2001 From: Sergei Egorov Date: Wed, 17 Jan 2018 11:24:26 +0100 Subject: [PATCH 3/8] make it possible to use KafkaContainer with external Zookeeper --- .../KafkaContainer.java | 25 +++- .../containers/KafkaContainerTest.java | 124 ++++++++++++------ 2 files changed, 103 insertions(+), 46 deletions(-) diff --git a/modules/kafka/src/main/java/org.testcontainers.containers/KafkaContainer.java b/modules/kafka/src/main/java/org.testcontainers.containers/KafkaContainer.java index ac0ffba5391..683431a683e 100644 --- a/modules/kafka/src/main/java/org.testcontainers.containers/KafkaContainer.java +++ b/modules/kafka/src/main/java/org.testcontainers.containers/KafkaContainer.java @@ -10,6 +10,8 @@ public class KafkaContainer extends GenericContainer { public static final int ZOOKEEPER_PORT = 2181; + protected String externalZookeeperConnect = null; + protected SocatContainer proxy; public KafkaContainer() { @@ -21,7 +23,7 @@ public KafkaContainer(String confluencePlatformVersion) { withNetwork(Network.newNetwork()); withNetworkAliases("kafka-" + Base58.randomString(6)); - withExposedPorts(KAFKA_PORT, ZOOKEEPER_PORT); + withExposedPorts(KAFKA_PORT); withEnv("KAFKA_BROKER_ID", "1"); withEnv("KAFKA_LISTENERS", "PLAINTEXT://0.0.0.0:9092,BROKER://127.0.0.1:9093"); @@ -31,10 +33,16 @@ public KafkaContainer(String confluencePlatformVersion) { withEnv("KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR", "1"); withEnv("KAFKA_OFFSETS_TOPIC_NUM_PARTITIONS", "1"); withEnv("KAFKA_LOG_FLUSH_INTERVAL_MESSAGES", Long.MAX_VALUE + ""); + } - withEnv("KAFKA_ZOOKEEPER_CONNECT", "localhost:2181"); - withClasspathResourceMapping("tc-zookeeper.properties", "/zookeeper.properties", BindMode.READ_ONLY); - withCommand("sh", "-c", "zookeeper-server-start /zookeeper.properties & /etc/confluent/docker/run"); + public KafkaContainer withEmbeddedZookeeper() { + externalZookeeperConnect = null; + return self(); + } + + public KafkaContainer withExternalZookeeper(String connectString) { + externalZookeeperConnect = connectString; + return self(); } public String getBootstrapServers() { @@ -51,6 +59,15 @@ public void start() { proxy.start(); withEnv("KAFKA_ADVERTISED_LISTENERS", "BROKER://127.0.0.1:9093,PLAINTEXT://" + proxy.getContainerIpAddress() + ":" + proxy.getFirstMappedPort()); + if (externalZookeeperConnect != null) { + withEnv("KAFKA_ZOOKEEPER_CONNECT", externalZookeeperConnect); + } else { + addExposedPort(ZOOKEEPER_PORT); + withEnv("KAFKA_ZOOKEEPER_CONNECT", "localhost:2181"); + withClasspathResourceMapping("tc-zookeeper.properties", "/zookeeper.properties", BindMode.READ_ONLY); + withCommand("sh", "-c", "zookeeper-server-start /zookeeper.properties & /etc/confluent/docker/run"); + } + super.start(); } diff --git a/modules/kafka/src/test/java/org/testcontainers/containers/KafkaContainerTest.java b/modules/kafka/src/test/java/org/testcontainers/containers/KafkaContainerTest.java index e05580c521a..f88cb1703ed 100644 --- a/modules/kafka/src/test/java/org/testcontainers/containers/KafkaContainerTest.java +++ b/modules/kafka/src/test/java/org/testcontainers/containers/KafkaContainerTest.java @@ -16,6 +16,7 @@ import java.util.Arrays; import java.util.UUID; import java.util.concurrent.TimeUnit; +import java.util.stream.Stream; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.tuple; @@ -26,50 +27,89 @@ public class KafkaContainerTest { public void testUsage() throws Exception { try (KafkaContainer kafka = new KafkaContainer()) { kafka.start(); + testKafkaFunctionality(kafka.getBootstrapServers()); + } + } + + @Test + public void testExternalZookeeperWithKafkaNetwork() throws Exception { + try ( + KafkaContainer kafka = new KafkaContainer() + .withExternalZookeeper("zookeeper:2181"); + + GenericContainer zookeeper = new GenericContainer("confluentinc/cp-zookeeper:4.0.0") + .withNetwork(kafka.getNetwork()) + .withNetworkAliases("zookeeper") + .withEnv("ZOOKEEPER_CLIENT_PORT", "2181"); + ) { + Stream.of(kafka, zookeeper).parallel().forEach(GenericContainer::start); + + testKafkaFunctionality(kafka.getBootstrapServers()); + } + } + + @Test + public void testExternalZookeeperWithExternalNetwork() throws Exception { + try ( + Network network = Network.newNetwork(); + + KafkaContainer kafka = new KafkaContainer() + .withNetwork(network) + .withExternalZookeeper("zookeeper:2181"); + + GenericContainer zookeeper = new GenericContainer("confluentinc/cp-zookeeper:4.0.0") + .withNetwork(network) + .withNetworkAliases("zookeeper") + .withEnv("ZOOKEEPER_CLIENT_PORT", "2181"); + ) { + Stream.of(kafka, zookeeper).parallel().forEach(GenericContainer::start); + + testKafkaFunctionality(kafka.getBootstrapServers()); + } + } + + protected void testKafkaFunctionality(String bootstrapServers) throws Exception { + try ( + KafkaProducer producer = new KafkaProducer<>( + ImmutableMap.of( + ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers, + ProducerConfig.CLIENT_ID_CONFIG, UUID.randomUUID().toString() + ), + new StringSerializer(), + new StringSerializer() + ); + + KafkaConsumer consumer = new KafkaConsumer<>( + ImmutableMap.of( + ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers, + ConsumerConfig.GROUP_ID_CONFIG, "tc-" + UUID.randomUUID(), + ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest" + ), + new StringDeserializer(), + new StringDeserializer() + ); + ) { + String topicName = "messages"; + consumer.subscribe(Arrays.asList(topicName)); + + producer.send(new ProducerRecord<>(topicName, "testcontainers", "rulezzz")).get(); + + Unreliables.retryUntilTrue(10, TimeUnit.SECONDS, () -> { + ConsumerRecords records = consumer.poll(100); + + if (records.isEmpty()) { + return false; + } + + assertThat(records) + .hasSize(1) + .extracting(ConsumerRecord::topic, ConsumerRecord::key, ConsumerRecord::value) + .containsExactly(tuple(topicName, "testcontainers", "rulezzz")); - try ( - KafkaProducer producer = new KafkaProducer<>( - ImmutableMap.of( - ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers(), - ProducerConfig.CLIENT_ID_CONFIG, UUID.randomUUID().toString() - ), - new StringSerializer(), - new StringSerializer() - ); - - KafkaConsumer consumer = new KafkaConsumer<>( - ImmutableMap.of( - ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers(), - ConsumerConfig.GROUP_ID_CONFIG, "tc-" + UUID.randomUUID(), - ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest" - ), - new StringDeserializer(), - new StringDeserializer() - ); - ) { - String topicName = "messages"; - consumer.subscribe(Arrays.asList(topicName)); - - producer.send(new ProducerRecord<>(topicName, "testcontainers", "rulezzz")).get(); - - Unreliables.retryUntilTrue(10, TimeUnit.SECONDS, () -> { - ConsumerRecords records = consumer.poll(100); - - if (records.isEmpty()) { - return false; - } - - assertThat(records) - .hasSize(1) - .extracting(ConsumerRecord::topic, ConsumerRecord::key, ConsumerRecord::value) - .containsExactly(tuple(topicName, "testcontainers", "rulezzz")); - - return true; - }); - - consumer.unsubscribe(); - } + return true; + }); + consumer.unsubscribe(); } } From 6a893dcdd301b6c6fe14db299a6eb3c204aa560b Mon Sep 17 00:00:00 2001 From: Sergei Egorov Date: Wed, 17 Jan 2018 15:17:29 +0100 Subject: [PATCH 4/8] fix typo --- .../java/org.testcontainers.containers/KafkaContainer.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/modules/kafka/src/main/java/org.testcontainers.containers/KafkaContainer.java b/modules/kafka/src/main/java/org.testcontainers.containers/KafkaContainer.java index 683431a683e..60edc1ec94f 100644 --- a/modules/kafka/src/main/java/org.testcontainers.containers/KafkaContainer.java +++ b/modules/kafka/src/main/java/org.testcontainers.containers/KafkaContainer.java @@ -18,8 +18,8 @@ public KafkaContainer() { this("4.0.0"); } - public KafkaContainer(String confluencePlatformVersion) { - super("confluentinc/cp-kafka:" + confluencePlatformVersion); + public KafkaContainer(String confluentPlatformVersion) { + super("confluentinc/cp-kafka:" + confluentPlatformVersion); withNetwork(Network.newNetwork()); withNetworkAliases("kafka-" + Base58.randomString(6)); From 47bedb65a189767b94464e843dd6939249ca4388 Mon Sep 17 00:00:00 2001 From: Sergei Egorov Date: Wed, 17 Jan 2018 15:26:55 +0100 Subject: [PATCH 5/8] fix Kafka tests --- modules/kafka/pom.xml | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/modules/kafka/pom.xml b/modules/kafka/pom.xml index 526bfe03af5..09a20d01fe0 100644 --- a/modules/kafka/pom.xml +++ b/modules/kafka/pom.xml @@ -34,5 +34,12 @@ test + + com.google.guava + guava + 23.0 + test + + \ No newline at end of file From e93a7c1f63bad4ef03f307014fd3d98dabd5f2e7 Mon Sep 17 00:00:00 2001 From: Sergei Egorov Date: Mon, 22 Jan 2018 08:36:43 +0100 Subject: [PATCH 6/8] Add to CHANGELOG.md, name SocatContainer, add listeners explanation comment to KafkaContainer --- CHANGELOG.md | 3 +++ .../java/org/testcontainers/containers/SocatContainer.java | 2 ++ .../java/org.testcontainers.containers/KafkaContainer.java | 4 +++- 3 files changed, 8 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 6bf12b52095..54f50a67436 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,9 @@ All notable changes to this project will be documented in this file. ### Fixed - Fixed retrieval of Docker host IP when running inside Docker. ([\#479](https://github.com/testcontainers/testcontainers-java/issues/479)) +### Changed +- Added Kafka module ([\#546](https://github.com/testcontainers/testcontainers-java/pull/546)) + ## [1.5.1] - 2017-12-19 ### Fixed diff --git a/core/src/main/java/org/testcontainers/containers/SocatContainer.java b/core/src/main/java/org/testcontainers/containers/SocatContainer.java index 7b460c21db7..5949a14490d 100644 --- a/core/src/main/java/org/testcontainers/containers/SocatContainer.java +++ b/core/src/main/java/org/testcontainers/containers/SocatContainer.java @@ -1,5 +1,6 @@ package org.testcontainers.containers; +import org.testcontainers.utility.Base58; import org.testcontainers.utility.TestcontainersConfiguration; import java.util.HashMap; @@ -17,6 +18,7 @@ public class SocatContainer extends GenericContainer { public SocatContainer() { super(TestcontainersConfiguration.getInstance().getSocatContainerImage()); withCreateContainerCmdModifier(it -> it.withEntrypoint("/bin/sh")); + withCreateContainerCmdModifier(it -> it.withName("testcontainers-socat-" + Base58.randomString(8))); } public SocatContainer withTarget(int exposedPort, String host) { diff --git a/modules/kafka/src/main/java/org.testcontainers.containers/KafkaContainer.java b/modules/kafka/src/main/java/org.testcontainers.containers/KafkaContainer.java index 60edc1ec94f..8179dd84443 100644 --- a/modules/kafka/src/main/java/org.testcontainers.containers/KafkaContainer.java +++ b/modules/kafka/src/main/java/org.testcontainers.containers/KafkaContainer.java @@ -25,11 +25,13 @@ public KafkaContainer(String confluentPlatformVersion) { withNetworkAliases("kafka-" + Base58.randomString(6)); withExposedPorts(KAFKA_PORT); - withEnv("KAFKA_BROKER_ID", "1"); + // Use two listeners with different names, it will force Kafka to communicate with itself via internal + // listener when KAFKA_INTER_BROKER_LISTENER_NAME is set, otherwise Kafka will try to use the advertised listener withEnv("KAFKA_LISTENERS", "PLAINTEXT://0.0.0.0:9092,BROKER://127.0.0.1:9093"); withEnv("KAFKA_LISTENER_SECURITY_PROTOCOL_MAP", "BROKER:PLAINTEXT,PLAINTEXT:PLAINTEXT"); withEnv("KAFKA_INTER_BROKER_LISTENER_NAME", "BROKER"); + withEnv("KAFKA_BROKER_ID", "1"); withEnv("KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR", "1"); withEnv("KAFKA_OFFSETS_TOPIC_NUM_PARTITIONS", "1"); withEnv("KAFKA_LOG_FLUSH_INTERVAL_MESSAGES", Long.MAX_VALUE + ""); From e02ab6e5c4be2a93265534849cc3ef7de90c17f7 Mon Sep 17 00:00:00 2001 From: Sergei Egorov Date: Wed, 24 Jan 2018 14:14:37 +0100 Subject: [PATCH 7/8] listen on alias --- .../KafkaContainer.java | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/modules/kafka/src/main/java/org.testcontainers.containers/KafkaContainer.java b/modules/kafka/src/main/java/org.testcontainers.containers/KafkaContainer.java index 8179dd84443..a28867a41c8 100644 --- a/modules/kafka/src/main/java/org.testcontainers.containers/KafkaContainer.java +++ b/modules/kafka/src/main/java/org.testcontainers.containers/KafkaContainer.java @@ -22,12 +22,13 @@ public KafkaContainer(String confluentPlatformVersion) { super("confluentinc/cp-kafka:" + confluentPlatformVersion); withNetwork(Network.newNetwork()); - withNetworkAliases("kafka-" + Base58.randomString(6)); + String myNetworkAlias = "kafka-" + Base58.randomString(6); + withNetworkAliases(myNetworkAlias); withExposedPorts(KAFKA_PORT); // Use two listeners with different names, it will force Kafka to communicate with itself via internal // listener when KAFKA_INTER_BROKER_LISTENER_NAME is set, otherwise Kafka will try to use the advertised listener - withEnv("KAFKA_LISTENERS", "PLAINTEXT://0.0.0.0:9092,BROKER://127.0.0.1:9093"); + withEnv("KAFKA_LISTENERS", "PLAINTEXT://0.0.0.0:9092,BROKER://" + myNetworkAlias + ":9093"); withEnv("KAFKA_LISTENER_SECURITY_PROTOCOL_MAP", "BROKER:PLAINTEXT,PLAINTEXT:PLAINTEXT"); withEnv("KAFKA_INTER_BROKER_LISTENER_NAME", "BROKER"); @@ -53,13 +54,14 @@ public String getBootstrapServers() { @Override public void start() { + String myNetworkAlias = getNetworkAliases().get(0); proxy = new SocatContainer() .withNetwork(getNetwork()) - .withTarget(9092, getNetworkAliases().get(0)) - .withTarget(2181, getNetworkAliases().get(0)); + .withTarget(9092, myNetworkAlias) + .withTarget(2181, myNetworkAlias); proxy.start(); - withEnv("KAFKA_ADVERTISED_LISTENERS", "BROKER://127.0.0.1:9093,PLAINTEXT://" + proxy.getContainerIpAddress() + ":" + proxy.getFirstMappedPort()); + withEnv("KAFKA_ADVERTISED_LISTENERS", "BROKER://" + myNetworkAlias + ":9093,PLAINTEXT://" + proxy.getContainerIpAddress() + ":" + proxy.getFirstMappedPort()); if (externalZookeeperConnect != null) { withEnv("KAFKA_ZOOKEEPER_CONNECT", externalZookeeperConnect); From b9608e311f831df50b74377cc58e9979bde3e368 Mon Sep 17 00:00:00 2001 From: Sergei Egorov Date: Thu, 25 Jan 2018 08:07:21 +0100 Subject: [PATCH 8/8] rename myNetworkAlias -> networkAlias --- .../KafkaContainer.java | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/modules/kafka/src/main/java/org.testcontainers.containers/KafkaContainer.java b/modules/kafka/src/main/java/org.testcontainers.containers/KafkaContainer.java index a28867a41c8..228dc8a3399 100644 --- a/modules/kafka/src/main/java/org.testcontainers.containers/KafkaContainer.java +++ b/modules/kafka/src/main/java/org.testcontainers.containers/KafkaContainer.java @@ -22,13 +22,13 @@ public KafkaContainer(String confluentPlatformVersion) { super("confluentinc/cp-kafka:" + confluentPlatformVersion); withNetwork(Network.newNetwork()); - String myNetworkAlias = "kafka-" + Base58.randomString(6); - withNetworkAliases(myNetworkAlias); + String networkAlias = "kafka-" + Base58.randomString(6); + withNetworkAliases(networkAlias); withExposedPorts(KAFKA_PORT); // Use two listeners with different names, it will force Kafka to communicate with itself via internal // listener when KAFKA_INTER_BROKER_LISTENER_NAME is set, otherwise Kafka will try to use the advertised listener - withEnv("KAFKA_LISTENERS", "PLAINTEXT://0.0.0.0:9092,BROKER://" + myNetworkAlias + ":9093"); + withEnv("KAFKA_LISTENERS", "PLAINTEXT://0.0.0.0:9092,BROKER://" + networkAlias + ":9093"); withEnv("KAFKA_LISTENER_SECURITY_PROTOCOL_MAP", "BROKER:PLAINTEXT,PLAINTEXT:PLAINTEXT"); withEnv("KAFKA_INTER_BROKER_LISTENER_NAME", "BROKER"); @@ -54,14 +54,14 @@ public String getBootstrapServers() { @Override public void start() { - String myNetworkAlias = getNetworkAliases().get(0); + String networkAlias = getNetworkAliases().get(0); proxy = new SocatContainer() .withNetwork(getNetwork()) - .withTarget(9092, myNetworkAlias) - .withTarget(2181, myNetworkAlias); + .withTarget(9092, networkAlias) + .withTarget(2181, networkAlias); proxy.start(); - withEnv("KAFKA_ADVERTISED_LISTENERS", "BROKER://" + myNetworkAlias + ":9093,PLAINTEXT://" + proxy.getContainerIpAddress() + ":" + proxy.getFirstMappedPort()); + withEnv("KAFKA_ADVERTISED_LISTENERS", "BROKER://" + networkAlias + ":9093,PLAINTEXT://" + proxy.getContainerIpAddress() + ":" + proxy.getFirstMappedPort()); if (externalZookeeperConnect != null) { withEnv("KAFKA_ZOOKEEPER_CONNECT", externalZookeeperConnect);