Skip to content

Commit

Permalink
Kafka cluster example (#1984, #3758)
Browse files Browse the repository at this point in the history
Co-authored-by: Sean Glover <sean@seanglover.com>
  • Loading branch information
bsideup and seglo authored Feb 6, 2021
1 parent 73da361 commit 625ddc3
Show file tree
Hide file tree
Showing 6 changed files with 239 additions and 2 deletions.
17 changes: 17 additions & 0 deletions examples/kafka-cluster/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
plugins {
id 'java'
}

repositories {
jcenter()
}

dependencies {
testCompileOnly "org.projectlombok:lombok:1.18.10"
testAnnotationProcessor "org.projectlombok:lombok:1.18.10"
testCompile 'org.testcontainers:kafka'
testCompile 'org.apache.kafka:kafka-clients:2.3.1'
testCompile 'org.assertj:assertj-core:3.14.0'
testCompile 'com.google.guava:guava:23.0'
testCompile 'org.slf4j:slf4j-simple:1.7.30'
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
package com.example.kafkacluster;

import lombok.SneakyThrows;
import org.rnorth.ducttape.unreliables.Unreliables;
import org.testcontainers.containers.Container;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.Network;
import org.testcontainers.lifecycle.Startable;
import org.testcontainers.lifecycle.Startables;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.utility.DockerImageName;

import java.util.Collection;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;

import static java.util.concurrent.TimeUnit.SECONDS;

/**
* Provides an easy way to launch a Kafka cluster with multiple brokers.
*/
public class KafkaContainerCluster implements Startable {

private final int brokersNum;
private final Network network;
private final GenericContainer<?> zookeeper;
private final Collection<KafkaContainer> brokers;

public KafkaContainerCluster(String confluentPlatformVersion, int brokersNum, int internalTopicsRf) {
if (brokersNum < 0) {
throw new IllegalArgumentException("brokersNum '" + brokersNum + "' must be greater than 0");
}
if (internalTopicsRf < 0 || internalTopicsRf > brokersNum) {
throw new IllegalArgumentException("internalTopicsRf '" + internalTopicsRf + "' must be less than brokersNum and greater than 0");
}

this.brokersNum = brokersNum;
this.network = Network.newNetwork();

this.zookeeper = new GenericContainer<>(DockerImageName.parse("confluentinc/cp-zookeeper").withTag(confluentPlatformVersion))
.withNetwork(network)
.withNetworkAliases("zookeeper")
.withEnv("ZOOKEEPER_CLIENT_PORT", String.valueOf(KafkaContainer.ZOOKEEPER_PORT));

this.brokers = IntStream
.range(0, this.brokersNum)
.mapToObj(brokerNum -> {
return new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka").withTag(confluentPlatformVersion))
.withNetwork(this.network)
.withNetworkAliases("broker-" + brokerNum)
.dependsOn(this.zookeeper)
.withExternalZookeeper("zookeeper:" + KafkaContainer.ZOOKEEPER_PORT)
.withEnv("KAFKA_BROKER_ID", brokerNum + "")
.withEnv("KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR", internalTopicsRf + "")
.withEnv("KAFKA_OFFSETS_TOPIC_NUM_PARTITIONS", internalTopicsRf + "")
.withEnv("KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR", internalTopicsRf + "")
.withEnv("KAFKA_TRANSACTION_STATE_LOG_MIN_ISR", internalTopicsRf + "");
})
.collect(Collectors.toList());
}

public Collection<KafkaContainer> getBrokers() {
return this.brokers;
}

public String getBootstrapServers() {
return brokers.stream()
.map(KafkaContainer::getBootstrapServers)
.collect(Collectors.joining(","));
}

private Stream<GenericContainer<?>> allContainers() {
return Stream.concat(
this.brokers.stream(),
Stream.of(this.zookeeper)
);
}

@Override
@SneakyThrows
public void start() {
Stream<Startable> startables = this.brokers.stream().map(Startable.class::cast);
Startables.deepStart(startables).get(60, SECONDS);

Unreliables.retryUntilTrue(30, TimeUnit.SECONDS, () -> {
Container.ExecResult result = this.zookeeper.execInContainer(
"sh", "-c",
"zookeeper-shell zookeeper:" + KafkaContainer.ZOOKEEPER_PORT + " ls /brokers/ids | tail -n 1"
);
String brokers = result.getStdout();

return brokers != null && brokers.split(",").length == this.brokersNum;
});
}

@Override
public void stop() {
allContainers().parallel().forEach(GenericContainer::stop);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
package com.example.kafkacluster;

import com.google.common.collect.ImmutableMap;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.junit.Test;
import org.rnorth.ducttape.unreliables.Unreliables;

import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.UUID;
import java.util.concurrent.TimeUnit;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.tuple;

public class KafkaContainerClusterTest {

@Test
public void testKafkaContainerCluster() throws Exception {
try (
KafkaContainerCluster cluster = new KafkaContainerCluster("5.2.1", 3, 2)
) {
cluster.start();
String bootstrapServers = cluster.getBootstrapServers();

assertThat(cluster.getBrokers()).hasSize(3);

testKafkaFunctionality(bootstrapServers, 3, 2);
}
}

protected void testKafkaFunctionality(String bootstrapServers, int partitions, int rf) throws Exception {
try (
AdminClient adminClient = AdminClient.create(ImmutableMap.of(
AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers
));

KafkaProducer<String, String> producer = new KafkaProducer<>(
ImmutableMap.of(
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers,
ProducerConfig.CLIENT_ID_CONFIG, UUID.randomUUID().toString()
),
new StringSerializer(),
new StringSerializer()
);

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(
ImmutableMap.of(
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers,
ConsumerConfig.GROUP_ID_CONFIG, "tc-" + UUID.randomUUID(),
ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"
),
new StringDeserializer(),
new StringDeserializer()
);
) {
String topicName = "messages";

Collection<NewTopic> topics = Collections.singletonList(new NewTopic(topicName, partitions, (short) rf));
adminClient.createTopics(topics).all().get(30, TimeUnit.SECONDS);

consumer.subscribe(Collections.singletonList(topicName));

producer.send(new ProducerRecord<>(topicName, "testcontainers", "rulezzz")).get();

Unreliables.retryUntilTrue(10, TimeUnit.SECONDS, () -> {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));

if (records.isEmpty()) {
return false;
}

assertThat(records)
.hasSize(1)
.extracting(ConsumerRecord::topic, ConsumerRecord::key, ConsumerRecord::value)
.containsExactly(tuple(topicName, "testcontainers", "rulezzz"));

return true;
});

consumer.unsubscribe();
}
}

}
1 change: 1 addition & 0 deletions examples/settings.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ includeBuild '..'

// explicit include to allow Dependabot to autodiscover subprojects
include 'disque-job-queue'
include 'kafka-cluster'
include 'linked-container'
include 'mongodb-container'
include 'redis-backed-cache'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ public class KafkaContainer extends GenericContainer<KafkaContainer> {

public static final int ZOOKEEPER_PORT = 2181;

private static final String DEFAULT_INTERNAL_TOPIC_RF = "1";

private static final int PORT_NOT_ASSIGNED = -1;

protected String externalZookeeperConnect = null;
Expand Down Expand Up @@ -60,8 +62,10 @@ public KafkaContainer(final DockerImageName dockerImageName) {
withEnv("KAFKA_INTER_BROKER_LISTENER_NAME", "BROKER");

withEnv("KAFKA_BROKER_ID", "1");
withEnv("KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR", "1");
withEnv("KAFKA_OFFSETS_TOPIC_NUM_PARTITIONS", "1");
withEnv("KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR", DEFAULT_INTERNAL_TOPIC_RF);
withEnv("KAFKA_OFFSETS_TOPIC_NUM_PARTITIONS", DEFAULT_INTERNAL_TOPIC_RF);
withEnv("KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR", DEFAULT_INTERNAL_TOPIC_RF);
withEnv("KAFKA_TRANSACTION_STATE_LOG_MIN_ISR", DEFAULT_INTERNAL_TOPIC_RF);
withEnv("KAFKA_LOG_FLUSH_INTERVAL_MESSAGES", Long.MAX_VALUE + "");
withEnv("KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS", "0");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,12 @@

import com.google.common.collect.ImmutableMap;
import java.time.Duration;
import java.util.Collection;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
Expand Down Expand Up @@ -110,7 +114,15 @@ public void testConfluentPlatformVersion6() throws Exception {
}

protected void testKafkaFunctionality(String bootstrapServers) throws Exception {
testKafkaFunctionality(bootstrapServers, 1, 1);
}

protected void testKafkaFunctionality(String bootstrapServers, int partitions, int rf) throws Exception {
try (
AdminClient adminClient = AdminClient.create(ImmutableMap.of(
AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers
));

KafkaProducer<String, String> producer = new KafkaProducer<>(
ImmutableMap.of(
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers,
Expand All @@ -131,6 +143,10 @@ protected void testKafkaFunctionality(String bootstrapServers) throws Exception
);
) {
String topicName = "messages-" + UUID.randomUUID();

Collection<NewTopic> topics = singletonList(new NewTopic(topicName, partitions, (short) rf));
adminClient.createTopics(topics).all().get(30, TimeUnit.SECONDS);

consumer.subscribe(singletonList(topicName));

producer.send(new ProducerRecord<>(topicName, "testcontainers", "rulezzz")).get();
Expand Down

0 comments on commit 625ddc3

Please sign in to comment.