-
-
Notifications
You must be signed in to change notification settings - Fork 1.7k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add Kafka module #546
Add Kafka module #546
Changes from 1 commit
b4ec814
1c4e0df
5bb2e25
6a893dc
47bedb6
e93a7c1
e02ab6e
b9608e3
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,40 @@ | ||
package org.testcontainers.containers; | ||
|
||
import org.testcontainers.utility.TestcontainersConfiguration; | ||
|
||
import java.util.HashMap; | ||
import java.util.Map; | ||
import java.util.stream.Collectors; | ||
|
||
/** | ||
* A socat container is used as a TCP proxy, enabling any TCP port of another container to be exposed | ||
* publicly, even if that container does not make the port public itself. | ||
*/ | ||
public class SocatContainer extends GenericContainer<SocatContainer> { | ||
|
||
private final Map<Integer, String> targets = new HashMap<>(); | ||
|
||
public SocatContainer() { | ||
super(TestcontainersConfiguration.getInstance().getSocatContainerImage()); | ||
withCreateContainerCmdModifier(it -> it.withEntrypoint("/bin/sh")); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Just a thought - please could we set the container's name to something that people will be able to identify? (i.e. stop people wondering "what's this random socat container?"). A name like I realise that the image |
||
} | ||
|
||
public SocatContainer withTarget(int exposedPort, String host) { | ||
return withTarget(exposedPort, host, exposedPort); | ||
} | ||
|
||
public SocatContainer withTarget(int exposedPort, String host, int internalPort) { | ||
addExposedPort(exposedPort); | ||
targets.put(exposedPort, String.format("%s:%s", host, internalPort)); | ||
return self(); | ||
} | ||
|
||
@Override | ||
protected void configure() { | ||
withCommand("-c", | ||
targets.entrySet().stream() | ||
.map(entry -> "socat TCP-LISTEN:" + entry.getKey() + ",fork,reuseaddr TCP:" + entry.getValue()) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Are we sure that strange hostname values won't break the shell if we concat the String like this? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. AFAIK any valid hostname should be valid shell argument |
||
.collect(Collectors.joining(" & ")) | ||
); | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -30,6 +30,10 @@ public String getAmbassadorContainerImage() { | |
return (String) properties.getOrDefault("ambassador.container.image", "richnorth/ambassador:latest"); | ||
} | ||
|
||
public String getSocatContainerImage() { | ||
return (String) properties.getOrDefault("socat.container.image", "alpine/socat:latest"); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I had to use Even tho there is a tag, it seems to be old and didn't work for me. But I assume it's fine to use There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Or would it be more responsible to maintain our own image? 😕 There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. once we get our name on Docker Hub - sure, for now I would stick with Alpine's |
||
} | ||
|
||
public String getVncRecordedContainerImage() { | ||
return (String) properties.getOrDefault("vncrecorder.container.image", "richnorth/vnc-recorder:latest"); | ||
} | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,38 @@ | ||
<?xml version="1.0" encoding="UTF-8"?> | ||
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" | ||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> | ||
<modelVersion>4.0.0</modelVersion> | ||
|
||
<parent> | ||
<groupId>org.testcontainers</groupId> | ||
<artifactId>testcontainers-parent</artifactId> | ||
<version>0-SNAPSHOT</version> | ||
<relativePath>../../pom.xml</relativePath> | ||
</parent> | ||
|
||
<artifactId>kafka</artifactId> | ||
<name>TestContainers :: Apache Kafka</name> | ||
|
||
<dependencies> | ||
<dependency> | ||
<groupId>${project.groupId}</groupId> | ||
<artifactId>testcontainers</artifactId> | ||
<version>${project.version}</version> | ||
</dependency> | ||
|
||
<dependency> | ||
<groupId>org.apache.kafka</groupId> | ||
<artifactId>kafka-clients</artifactId> | ||
<version>1.0.0</version> | ||
<scope>test</scope> | ||
</dependency> | ||
|
||
<dependency> | ||
<groupId>org.assertj</groupId> | ||
<artifactId>assertj-core</artifactId> | ||
<version>3.8.0</version> | ||
<scope>test</scope> | ||
</dependency> | ||
|
||
</dependencies> | ||
</project> |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,61 @@ | ||
package org.testcontainers.containers; | ||
|
||
import org.testcontainers.utility.Base58; | ||
|
||
import java.util.stream.Stream; | ||
|
||
public class KafkaContainer extends GenericContainer<KafkaContainer> { | ||
|
||
public static final int KAFKA_PORT = 9092; | ||
|
||
public static final int ZOOKEEPER_PORT = 2181; | ||
|
||
protected SocatContainer proxy; | ||
|
||
public KafkaContainer() { | ||
this("4.0.0"); | ||
} | ||
|
||
public KafkaContainer(String confluencePlatformVersion) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. AFAIK it's WDYT about allowing non-confluent images? We are i.e. using https://hub.docker.com/r/ches/kafka/ for our acceptance-tests. This image is missing There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
👍 😄
|
||
super("confluentinc/cp-kafka:" + confluencePlatformVersion); | ||
|
||
withNetwork(Network.newNetwork()); | ||
withNetworkAliases("kafka-" + Base58.randomString(6)); | ||
withExposedPorts(KAFKA_PORT, ZOOKEEPER_PORT); | ||
|
||
withEnv("KAFKA_BROKER_ID", "1"); | ||
withEnv("KAFKA_LISTENERS", "PLAINTEXT://0.0.0.0:9092,BROKER://127.0.0.1:9093"); | ||
withEnv("KAFKA_LISTENER_SECURITY_PROTOCOL_MAP", "BROKER:PLAINTEXT,PLAINTEXT:PLAINTEXT"); | ||
withEnv("KAFKA_INTER_BROKER_LISTENER_NAME", "BROKER"); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This line makes the whole thing work :D Otherwise, Kafka would try to communicate with itself with host's port (yes, even in 1-node scenario (sic!) ) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm not directly familiar with Kafka or the 7 environment variables that are being set in this method. Is it possible that someone might want to use alternative values for some of these? If so, they could happily subclass and re-set the env vars they want, so that's OK. However, would it be worth having a comment in the code (similar to your Github comment!) to say which settings are just critical to getting Kafka to work in the Dockerised environment and shouldn't be changed? |
||
|
||
withEnv("KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR", "1"); | ||
withEnv("KAFKA_OFFSETS_TOPIC_NUM_PARTITIONS", "1"); | ||
withEnv("KAFKA_LOG_FLUSH_INTERVAL_MESSAGES", Long.MAX_VALUE + ""); | ||
|
||
withEnv("KAFKA_ZOOKEEPER_CONNECT", "localhost:2181"); | ||
withClasspathResourceMapping("tc-zookeeper.properties", "/zookeeper.properties", BindMode.READ_ONLY); | ||
withCommand("sh", "-c", "zookeeper-server-start /zookeeper.properties & /etc/confluent/docker/run"); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I've decided to implement all-in-one container simply to speed it up. Starting from Kafka 0.11, Zookeeper is no longer have to be exposed to the clients and becomes a detail of implementation of Kafka, so IMO it's fine There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I would personally prefer to have those as separate containers. or at least provide this as an option to configure. this will allow testing scenarios of network partitions between zookeeper and kafka nodes. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. done ;) |
||
} | ||
|
||
public String getBootstrapServers() { | ||
return String.format("PLAINTEXT://%s:%s", proxy.getContainerIpAddress(), proxy.getFirstMappedPort()); | ||
} | ||
|
||
@Override | ||
public void start() { | ||
proxy = new SocatContainer() | ||
.withNetwork(getNetwork()) | ||
.withTarget(9092, getNetworkAliases().get(0)) | ||
.withTarget(2181, getNetworkAliases().get(0)); | ||
|
||
proxy.start(); | ||
withEnv("KAFKA_ADVERTISED_LISTENERS", "BROKER://127.0.0.1:9093,PLAINTEXT://" + proxy.getContainerIpAddress() + ":" + proxy.getFirstMappedPort()); | ||
|
||
super.start(); | ||
} | ||
|
||
@Override | ||
public void stop() { | ||
Stream.<Runnable>of(super::stop, proxy::stop).parallel().forEach(Runnable::run); | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,3 @@ | ||
clientPort=2181 | ||
dataDir=/var/lib/zookeeper/data | ||
dataLogDir=/var/lib/zookeeper/log |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,76 @@ | ||
package org.testcontainers.containers; | ||
|
||
import com.google.common.collect.ImmutableMap; | ||
import org.apache.kafka.clients.consumer.ConsumerConfig; | ||
import org.apache.kafka.clients.consumer.ConsumerRecord; | ||
import org.apache.kafka.clients.consumer.ConsumerRecords; | ||
import org.apache.kafka.clients.consumer.KafkaConsumer; | ||
import org.apache.kafka.clients.producer.KafkaProducer; | ||
import org.apache.kafka.clients.producer.ProducerConfig; | ||
import org.apache.kafka.clients.producer.ProducerRecord; | ||
import org.apache.kafka.common.serialization.StringDeserializer; | ||
import org.apache.kafka.common.serialization.StringSerializer; | ||
import org.junit.Test; | ||
import org.rnorth.ducttape.unreliables.Unreliables; | ||
|
||
import java.util.Arrays; | ||
import java.util.UUID; | ||
import java.util.concurrent.TimeUnit; | ||
|
||
import static org.assertj.core.api.Assertions.assertThat; | ||
import static org.assertj.core.api.Assertions.tuple; | ||
|
||
public class KafkaContainerTest { | ||
|
||
@Test | ||
public void testUsage() throws Exception { | ||
try (KafkaContainer kafka = new KafkaContainer()) { | ||
kafka.start(); | ||
|
||
try ( | ||
KafkaProducer<String, String> producer = new KafkaProducer<>( | ||
ImmutableMap.of( | ||
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers(), | ||
ProducerConfig.CLIENT_ID_CONFIG, UUID.randomUUID().toString() | ||
), | ||
new StringSerializer(), | ||
new StringSerializer() | ||
); | ||
|
||
KafkaConsumer<String, String> consumer = new KafkaConsumer<>( | ||
ImmutableMap.of( | ||
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers(), | ||
ConsumerConfig.GROUP_ID_CONFIG, "tc-" + UUID.randomUUID(), | ||
ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest" | ||
), | ||
new StringDeserializer(), | ||
new StringDeserializer() | ||
); | ||
) { | ||
String topicName = "messages"; | ||
consumer.subscribe(Arrays.asList(topicName)); | ||
|
||
producer.send(new ProducerRecord<>(topicName, "testcontainers", "rulezzz")).get(); | ||
|
||
Unreliables.retryUntilTrue(10, TimeUnit.SECONDS, () -> { | ||
ConsumerRecords<String, String> records = consumer.poll(100); | ||
|
||
if (records.isEmpty()) { | ||
return false; | ||
} | ||
|
||
assertThat(records) | ||
.hasSize(1) | ||
.extracting(ConsumerRecord::topic, ConsumerRecord::key, ConsumerRecord::value) | ||
.containsExactly(tuple(topicName, "testcontainers", "rulezzz")); | ||
|
||
return true; | ||
}); | ||
|
||
consumer.unsubscribe(); | ||
} | ||
|
||
} | ||
} | ||
|
||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,18 @@ | ||
<configuration> | ||
|
||
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender"> | ||
<!-- encoders are assigned the type | ||
ch.qos.logback.classic.encoder.PatternLayoutEncoder by default --> | ||
<encoder> | ||
<pattern>%d{HH:mm:ss.SSS} %-5level %logger - %msg%n</pattern> | ||
</encoder> | ||
</appender> | ||
|
||
<root level="INFO"> | ||
<appender-ref ref="STDOUT"/> | ||
</root> | ||
|
||
<logger name="org.testcontainers" level="DEBUG"/> | ||
<logger name="org.testcontainers.shaded" level="WARN"/> | ||
|
||
</configuration> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nasty bug discovered during my experiments with Kafka module, concurrent access was giving
null
for the second invocationThere was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Have you tried making
id
volatile. This should work and is cheaper under contention thatsynchronized
methodThere was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
not a performance critical part (being called just a few times), doesn't make sense to over-optimize it :)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Train your good habits! :)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
keep it simple 😎
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Isn't
volatile
as simple? Doesn't really matter for me in this case 😁