Commit

PR feedback
seglo committed Oct 20, 2019
1 parent 904782e commit cfe9f72
Showing 3 changed files with 38 additions and 64 deletions.
@@ -14,51 +14,42 @@
*
*/
public class KafkaContainer extends GenericContainer<KafkaContainer> {
public static final int DEFAULT_KAFKA_PORT = 9093;

public static final String DEFAULT_KAFKA_BROKER_ID = "1";

public static final int DEFAULT_INTERNAL_TOPIC_RF = 1;
public static final int KAFKA_PORT = 9093;

public static final int ZOOKEEPER_PORT = 2181;

public static final String DEFAULT_INTERNAL_TOPIC_RF = "1";

public static final String CONFLUENT_PLATFORM_VERSION = "5.2.1";

private static final int PORT_NOT_ASSIGNED = -1;

protected String externalZookeeperConnect = null;

private int port = PORT_NOT_ASSIGNED;
private int exposedPort;

public KafkaContainer() {
this(CONFLUENT_PLATFORM_VERSION);
}

public KafkaContainer(String confluentPlatformVersion) {
this(confluentPlatformVersion, DEFAULT_KAFKA_PORT, DEFAULT_KAFKA_BROKER_ID, DEFAULT_INTERNAL_TOPIC_RF);
}

public KafkaContainer(String confluentPlatformVersion, int exposedPort, String brokerId, int internalTopicRf) {
super(TestcontainersConfiguration.getInstance().getKafkaImage() + ":" + confluentPlatformVersion);

this.exposedPort = exposedPort;
// TODO Only for backward compatibility
withNetwork(Network.newNetwork());
withNetworkAliases("kafka-" + Base58.randomString(6));
withExposedPorts(exposedPort);
withExposedPorts(KAFKA_PORT);

// Use two listeners with different names: this forces Kafka to communicate with itself via the internal
// listener when KAFKA_INTER_BROKER_LISTENER_NAME is set; otherwise Kafka will try to use the advertised listener
withEnv("KAFKA_LISTENERS", "PLAINTEXT://0.0.0.0:" + exposedPort + ",BROKER://0.0.0.0:9092");
withEnv("KAFKA_LISTENERS", "PLAINTEXT://0.0.0.0:" + KAFKA_PORT + ",BROKER://0.0.0.0:9092");
withEnv("KAFKA_LISTENER_SECURITY_PROTOCOL_MAP", "BROKER:PLAINTEXT,PLAINTEXT:PLAINTEXT");
withEnv("KAFKA_INTER_BROKER_LISTENER_NAME", "BROKER");

withEnv("KAFKA_BROKER_ID", brokerId);
withEnv("KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR", internalTopicRf + "");
withEnv("KAFKA_OFFSETS_TOPIC_NUM_PARTITIONS", internalTopicRf + "");
withEnv("KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR", internalTopicRf + "");
withEnv("KAFKA_TRANSACTION_STATE_LOG_MIN_ISR", internalTopicRf + "");
withEnv("KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR", DEFAULT_INTERNAL_TOPIC_RF);
withEnv("KAFKA_OFFSETS_TOPIC_NUM_PARTITIONS", DEFAULT_INTERNAL_TOPIC_RF);
withEnv("KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR", DEFAULT_INTERNAL_TOPIC_RF);
withEnv("KAFKA_TRANSACTION_STATE_LOG_MIN_ISR", DEFAULT_INTERNAL_TOPIC_RF);
withEnv("KAFKA_LOG_FLUSH_INTERVAL_MESSAGES", Long.MAX_VALUE + "");
withEnv("KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS", "0");
}
@@ -96,7 +87,7 @@ protected void doStart() {
protected void containerIsStarting(InspectContainerResponse containerInfo) {
super.containerIsStarting(containerInfo);

port = getMappedPort(this.exposedPort);
port = getMappedPort(KAFKA_PORT);

final String zookeeperConnect;
if (externalZookeeperConnect != null) {
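
(Aside, not part of the diff.) After this change the single-broker container always listens on KAFKA_PORT (9093) internally and no longer takes a port, broker id, or replication-factor constructor argument. A minimal usage sketch, assuming the existing getBootstrapServers() accessor and standard Kafka client imports (java.util.Properties, org.apache.kafka.clients.producer.ProducerConfig):

// Sketch only (not part of this commit): typical single-broker usage after the change.
try (KafkaContainer kafka = new KafkaContainer(KafkaContainer.CONFLUENT_PLATFORM_VERSION)) {
    kafka.start();

    // KAFKA_PORT (9093) is mapped to a random host port; getBootstrapServers() returns the advertised address.
    String bootstrapServers = kafka.getBootstrapServers();

    Properties props = new Properties();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    // ...configure serializers and create producers/consumers as usual.
}
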
@@ -1,36 +1,38 @@
package org.testcontainers.containers;

import lombok.SneakyThrows;
import org.testcontainers.lifecycle.Startable;
import org.testcontainers.lifecycle.Startables;

import java.util.Collection;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;

import static java.util.concurrent.TimeUnit.SECONDS;
import static org.testcontainers.containers.KafkaContainer.CONFLUENT_PLATFORM_VERSION;

/**
* Provides an easy way to launch a Kafka cluster with multiple brokers.
*/
public class KafkaContainerCluster implements AutoCloseable {
public class KafkaContainerCluster implements Startable {

private final int startPort;
private final Network network;
private final GenericContainer zookeeper;
private final Collection<KafkaContainer> brokers;

public KafkaContainerCluster(int brokersNum, int internalTopicsRf, int startPort) {
this(CONFLUENT_PLATFORM_VERSION, brokersNum, internalTopicsRf, startPort);
public KafkaContainerCluster(int brokersNum, int internalTopicsRf) {
this(CONFLUENT_PLATFORM_VERSION, brokersNum, internalTopicsRf);
}

public KafkaContainerCluster(String confluentPlatformVersion, int brokersNum, int internalTopicsRf, int startPort) {
public KafkaContainerCluster(String confluentPlatformVersion, int brokersNum, int internalTopicsRf) {
if (brokersNum < 0) {
throw new IllegalArgumentException("brokersNum '" + brokersNum + "' must be greater than 0");
}
if (internalTopicsRf < 0 || internalTopicsRf > brokersNum) {
throw new IllegalArgumentException("internalTopicsRf '" + internalTopicsRf + "' must be less than brokersNum and greater than 0");
}

this.startPort = startPort;

this.network = Network.newNetwork();

this.zookeeper = new GenericContainer("confluentinc/cp-zookeeper:" + confluentPlatformVersion)
@@ -41,9 +43,16 @@ public KafkaContainerCluster(String confluentPlatformVersion, int brokersNum, int internalTopicsRf) {
this.brokers = IntStream
.range(0, brokersNum)
.mapToObj(brokerNum ->
new KafkaContainer(confluentPlatformVersion, this.startPort + brokerNum, String.valueOf(brokerNum), internalTopicsRf)
new KafkaContainer(confluentPlatformVersion)
.withNetwork(this.network)
.withNetworkAliases("broker-" + brokerNum)
.dependsOn(this.zookeeper)
.withExternalZookeeper("zookeeper:" + KafkaContainer.ZOOKEEPER_PORT)
.withEnv("KAFKA_BROKER_ID", brokerNum + "")
.withEnv("KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR", internalTopicsRf + "")
.withEnv("KAFKA_OFFSETS_TOPIC_NUM_PARTITIONS", internalTopicsRf + "")
.withEnv("KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR", internalTopicsRf + "")
.withEnv("KAFKA_TRANSACTION_STATE_LOG_MIN_ISR", internalTopicsRf + "")
)
.collect(Collectors.toList());
}
@@ -72,16 +81,20 @@ private Stream<GenericContainer> allContainers() {
return Stream.concat(genericBrokers, zookeeper);
}

public void startAll() {
allContainers().parallel().forEach(GenericContainer::start);
@Override
@SneakyThrows
public void start() {
Stream<Startable> startables = this.brokers.stream().map(b -> (Startable) b);
Startables.deepStart(startables).get(60, SECONDS);
}

public void stopAll() {
@Override
public void stop() {
allContainers().parallel().forEach(GenericContainer::stop);
}

@Override
public void close() throws Exception {
this.stopAll();
public void close() {
this.stop();
}
}
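
(Aside, not part of the diff.) With Startable implemented, the cluster can now be started like any other container, which is what the updated test further down does. A short usage sketch, assuming getBootstrapServers() joins each broker's advertised address with commas, as the removed hand-rolled test did:

// Sketch only: mirrors the updated testKafkaContainerCluster test below.
try (KafkaContainerCluster cluster =
         new KafkaContainerCluster(KafkaContainer.CONFLUENT_PLATFORM_VERSION, 3, 2)) {
    cluster.start();   // Startables.deepStart on the brokers; ZooKeeper comes up via dependsOn
    String bootstrapServers = cluster.getBootstrapServers();
    // Hand bootstrapServers to any Kafka client; topics with replication factor 2
    // are possible because the cluster was created with internalTopicsRf = 2.
}
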
@@ -75,42 +75,12 @@ public void testExternalZookeeperWithExternalNetwork() throws Exception {
}
}

@Test
public void testMultiKafkaBrokerCluster() throws Exception {
try (
Network network = Network.newNetwork();

KafkaContainer kafka1 = new KafkaContainer(CONFLUENT_PLATFORM_VERSION, 9093, "1", 2)
.withNetwork(network)
.withExternalZookeeper("zookeeper:2181");
KafkaContainer kafka2 = new KafkaContainer(CONFLUENT_PLATFORM_VERSION, 9193, "2", 2)
.withNetwork(network)
.withExternalZookeeper("zookeeper:2181");
KafkaContainer kafka3 = new KafkaContainer(CONFLUENT_PLATFORM_VERSION, 9293, "3", 2)
.withNetwork(network)
.withExternalZookeeper("zookeeper:2181");

GenericContainer zookeeper = new GenericContainer("confluentinc/cp-zookeeper:" + CONFLUENT_PLATFORM_VERSION)
.withNetwork(network)
.withNetworkAliases("zookeeper")
.withEnv("ZOOKEEPER_CLIENT_PORT", "2181");
) {
Stream.of(kafka1, kafka2, kafka3, zookeeper).parallel().forEach(GenericContainer::start);
String bootstrapServers = Stream
.of(kafka1, kafka2, kafka3)
.map(KafkaContainer::getBootstrapServers)
.collect(Collectors.joining(","));

testKafkaFunctionality(bootstrapServers, 3, 2);
}
}

@Test
public void testKafkaContainerCluster() throws Exception {
try (
KafkaContainerCluster cluster = new KafkaContainerCluster(CONFLUENT_PLATFORM_VERSION, 3, 2, 9093)
KafkaContainerCluster cluster = new KafkaContainerCluster(CONFLUENT_PLATFORM_VERSION, 3, 2)
) {
cluster.startAll();
cluster.start();
String bootstrapServers = cluster.getBootstrapServers();

assertThat(cluster.getBrokers()).hasSize(3);
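
(Aside, not part of the diff.) The testKafkaFunctionality(bootstrapServers, 3, 2) helper is not shown in this commit; a plausible sketch of such a produce/consume smoke test, using only the standard Apache Kafka client API, might look roughly like this:

// Hypothetical sketch of a produce/consume check in the spirit of
// testKafkaFunctionality(bootstrapServers, partitions, rf); the real helper
// is not part of this diff.
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;

class KafkaSmokeTest {

    static void verify(String bootstrapServers, int partitions, int rf) throws Exception {
        // Create a topic with the requested partition count and replication factor.
        Properties adminProps = new Properties();
        adminProps.put("bootstrap.servers", bootstrapServers);
        try (AdminClient admin = AdminClient.create(adminProps)) {
            admin.createTopics(Collections.singleton(new NewTopic("smoke-test", partitions, (short) rf))).all().get();
        }

        Properties producerProps = new Properties();
        producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        Properties consumerProps = new Properties();
        consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "smoke-test-group");
        consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        // Produce one record, then consume it back to prove the brokers are reachable.
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps);
             KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps)) {
            consumer.subscribe(Collections.singleton("smoke-test"));
            producer.send(new ProducerRecord<>("smoke-test", "key", "value")).get();

            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(10));
            if (records.isEmpty()) {
                throw new AssertionError("expected at least one record from the cluster");
            }
        }
    }
}
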
