Skip to content

Commit

Permalink
Multi-broker Kafka support and test
Browse files Browse the repository at this point in the history
  • Loading branch information
seglo committed Oct 20, 2019
1 parent 3806235 commit 904782e
Show file tree
Hide file tree
Showing 3 changed files with 175 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,38 +14,51 @@
*
*/
public class KafkaContainer extends GenericContainer<KafkaContainer> {
public static final int DEFAULT_KAFKA_PORT = 9093;

public static final int KAFKA_PORT = 9093;
public static final String DEFAULT_KAFKA_BROKER_ID = "1";

public static final int DEFAULT_INTERNAL_TOPIC_RF = 1;

public static final int ZOOKEEPER_PORT = 2181;

public static final String CONFLUENT_PLATFORM_VERSION = "5.2.1";

private static final int PORT_NOT_ASSIGNED = -1;

protected String externalZookeeperConnect = null;

private int port = PORT_NOT_ASSIGNED;
private int exposedPort;

public KafkaContainer() {
this("5.2.1");
this(CONFLUENT_PLATFORM_VERSION);
}

public KafkaContainer(String confluentPlatformVersion) {
this(confluentPlatformVersion, DEFAULT_KAFKA_PORT, DEFAULT_KAFKA_BROKER_ID, DEFAULT_INTERNAL_TOPIC_RF);
}

public KafkaContainer(String confluentPlatformVersion, int exposedPort, String brokerId, int internalTopicRf) {
super(TestcontainersConfiguration.getInstance().getKafkaImage() + ":" + confluentPlatformVersion);

this.exposedPort = exposedPort;
// TODO Only for backward compatibility
withNetwork(Network.newNetwork());
withNetworkAliases("kafka-" + Base58.randomString(6));
withExposedPorts(KAFKA_PORT);
withExposedPorts(exposedPort);

// Use two listeners with different names, it will force Kafka to communicate with itself via internal
// listener when KAFKA_INTER_BROKER_LISTENER_NAME is set, otherwise Kafka will try to use the advertised listener
withEnv("KAFKA_LISTENERS", "PLAINTEXT://0.0.0.0:" + KAFKA_PORT + ",BROKER://0.0.0.0:9092");
withEnv("KAFKA_LISTENERS", "PLAINTEXT://0.0.0.0:" + exposedPort + ",BROKER://0.0.0.0:9092");
withEnv("KAFKA_LISTENER_SECURITY_PROTOCOL_MAP", "BROKER:PLAINTEXT,PLAINTEXT:PLAINTEXT");
withEnv("KAFKA_INTER_BROKER_LISTENER_NAME", "BROKER");

withEnv("KAFKA_BROKER_ID", "1");
withEnv("KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR", "1");
withEnv("KAFKA_OFFSETS_TOPIC_NUM_PARTITIONS", "1");
withEnv("KAFKA_BROKER_ID", brokerId);
withEnv("KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR", internalTopicRf + "");
withEnv("KAFKA_OFFSETS_TOPIC_NUM_PARTITIONS", internalTopicRf + "");
withEnv("KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR", internalTopicRf + "");
withEnv("KAFKA_TRANSACTION_STATE_LOG_MIN_ISR", internalTopicRf + "");
withEnv("KAFKA_LOG_FLUSH_INTERVAL_MESSAGES", Long.MAX_VALUE + "");
withEnv("KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS", "0");
}
Expand Down Expand Up @@ -83,7 +96,7 @@ protected void doStart() {
protected void containerIsStarting(InspectContainerResponse containerInfo) {
super.containerIsStarting(containerInfo);

port = getMappedPort(KAFKA_PORT);
port = getMappedPort(this.exposedPort);

final String zookeeperConnect;
if (externalZookeeperConnect != null) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
package org.testcontainers.containers;

import java.util.Collection;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;

import static org.testcontainers.containers.KafkaContainer.CONFLUENT_PLATFORM_VERSION;

/**
* Provides an easy way to launch a Kafka cluster with multiple brokers.
*/
public class KafkaContainerCluster implements AutoCloseable {

private final int startPort;
private final Network network;
private final GenericContainer zookeeper;
private final Collection<KafkaContainer> brokers;

public KafkaContainerCluster(int brokersNum, int internalTopicsRf, int startPort) {
this(CONFLUENT_PLATFORM_VERSION, brokersNum, internalTopicsRf, startPort);
}

public KafkaContainerCluster(String confluentPlatformVersion, int brokersNum, int internalTopicsRf, int startPort) {
if (brokersNum < 0) {
throw new IllegalArgumentException("brokersNum '" + brokersNum + "' must be greater than 0");
}
if (internalTopicsRf < 0 || internalTopicsRf > brokersNum) {
throw new IllegalArgumentException("internalTopicsRf '" + internalTopicsRf + "' must be less than brokersNum and greater than 0");
}

this.startPort = startPort;

this.network = Network.newNetwork();

this.zookeeper = new GenericContainer("confluentinc/cp-zookeeper:" + confluentPlatformVersion)
.withNetwork(network)
.withNetworkAliases("zookeeper")
.withEnv("ZOOKEEPER_CLIENT_PORT", String.valueOf(KafkaContainer.ZOOKEEPER_PORT));

this.brokers = IntStream
.range(0, brokersNum)
.mapToObj(brokerNum ->
new KafkaContainer(confluentPlatformVersion, this.startPort + brokerNum, String.valueOf(brokerNum), internalTopicsRf)
.withNetwork(this.network)
.withExternalZookeeper("zookeeper:" + KafkaContainer.ZOOKEEPER_PORT)
)
.collect(Collectors.toList());
}

public Network getNetwork() {
return this.network;
}

public GenericContainer getZooKeeper() {
return this.zookeeper;
}

public Collection<KafkaContainer> getBrokers() {
return this.brokers;
}

public String getBootstrapServers() {
return brokers.stream()
.map(KafkaContainer::getBootstrapServers)
.collect(Collectors.joining(","));
}

private Stream<GenericContainer> allContainers() {
Stream<GenericContainer> genericBrokers = this.brokers.stream().map(b -> (GenericContainer) b);
Stream<GenericContainer> zookeeper = Stream.of(this.zookeeper);
return Stream.concat(genericBrokers, zookeeper);
}

public void startAll() {
allContainers().parallel().forEach(GenericContainer::start);
}

public void stopAll() {
allContainers().parallel().forEach(GenericContainer::stop);
}

@Override
public void close() throws Exception {
this.stopAll();
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
package org.testcontainers.containers;

import com.google.common.collect.ImmutableMap;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.CreateTopicsOptions;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
Expand All @@ -10,16 +14,19 @@
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.jetbrains.annotations.NotNull;
import org.junit.Test;
import org.rnorth.ducttape.unreliables.Unreliables;

import java.util.Arrays;
import java.util.UUID;
import java.time.Duration;
import java.util.*;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.tuple;
import static org.testcontainers.containers.KafkaContainer.CONFLUENT_PLATFORM_VERSION;

public class KafkaContainerTest {

Expand Down Expand Up @@ -68,8 +75,60 @@ public void testExternalZookeeperWithExternalNetwork() throws Exception {
}
}

@Test
public void testMultiKafkaBrokerCluster() throws Exception {
try (
Network network = Network.newNetwork();

KafkaContainer kafka1 = new KafkaContainer(CONFLUENT_PLATFORM_VERSION, 9093, "1", 2)
.withNetwork(network)
.withExternalZookeeper("zookeeper:2181");
KafkaContainer kafka2 = new KafkaContainer(CONFLUENT_PLATFORM_VERSION, 9193, "2", 2)
.withNetwork(network)
.withExternalZookeeper("zookeeper:2181");
KafkaContainer kafka3 = new KafkaContainer(CONFLUENT_PLATFORM_VERSION, 9293, "3", 2)
.withNetwork(network)
.withExternalZookeeper("zookeeper:2181");

GenericContainer zookeeper = new GenericContainer("confluentinc/cp-zookeeper:" + CONFLUENT_PLATFORM_VERSION)
.withNetwork(network)
.withNetworkAliases("zookeeper")
.withEnv("ZOOKEEPER_CLIENT_PORT", "2181");
) {
Stream.of(kafka1, kafka2, kafka3, zookeeper).parallel().forEach(GenericContainer::start);
String bootstrapServers = Stream
.of(kafka1, kafka2, kafka3)
.map(KafkaContainer::getBootstrapServers)
.collect(Collectors.joining(","));

testKafkaFunctionality(bootstrapServers, 3, 2);
}
}

@Test
public void testKafkaContainerCluster() throws Exception {
try (
KafkaContainerCluster cluster = new KafkaContainerCluster(CONFLUENT_PLATFORM_VERSION, 3, 2, 9093)
) {
cluster.startAll();
String bootstrapServers = cluster.getBootstrapServers();

assertThat(cluster.getBrokers()).hasSize(3);

testKafkaFunctionality(bootstrapServers, 3, 2);
}
}

protected void testKafkaFunctionality(String bootstrapServers) throws Exception {
testKafkaFunctionality(bootstrapServers, 1, 1);
}

protected void testKafkaFunctionality(String bootstrapServers, int partitions, int rf) throws Exception {
try (
AdminClient adminClient = AdminClient.create(ImmutableMap.of(
AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers
));

KafkaProducer<String, String> producer = new KafkaProducer<>(
ImmutableMap.of(
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers,
Expand All @@ -90,12 +149,16 @@ protected void testKafkaFunctionality(String bootstrapServers) throws Exception
);
) {
String topicName = "messages";
consumer.subscribe(Arrays.asList(topicName));

Collection<NewTopic> topics = Stream.of(new NewTopic(topicName, partitions, (short) rf)).collect(Collectors.toList());
adminClient.createTopics(topics).all().get();

consumer.subscribe(Collections.singletonList(topicName));

producer.send(new ProducerRecord<>(topicName, "testcontainers", "rulezzz")).get();

Unreliables.retryUntilTrue(10, TimeUnit.SECONDS, () -> {
ConsumerRecords<String, String> records = consumer.poll(100);
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));

if (records.isEmpty()) {
return false;
Expand Down

0 comments on commit 904782e

Please sign in to comment.