Check for cluster formation using zookeeper-shell script
seglo committed Nov 11, 2019
1 parent 26f1881 commit f0a3dd3
Showing 2 changed files with 28 additions and 18 deletions.
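In short: start() no longer creates a Kafka AdminClient and polls describeCluster(); it now execs the Confluent zookeeper-shell script inside one of the broker containers and counts the broker ids registered under /brokers/ids, so the cluster class itself no longer needs the kafka-clients dependency flagged in the old TODO. Below is a minimal sketch of the same check written against Testcontainers' execInContainer helper, for illustration only; the commit itself drives docker-java's exec commands directly, and the method name and the broker/brokersNum parameters here are hypothetical.

    // Sketch only: assumes the ZooKeeper container has the network alias "zookeeper",
    // as in this example, and that the broker image ships the zookeeper-shell script.
    static boolean clusterFormed(KafkaContainer broker, int brokersNum) throws Exception {
        Container.ExecResult result = broker.execInContainer(
            "sh", "-c",
            "zookeeper-shell zookeeper:" + KafkaContainer.ZOOKEEPER_PORT + " ls /brokers/ids | tail -n 1"
        );
        // zookeeper-shell prints the registered broker ids, e.g. "[1, 2, 3]"
        return result.getStdout().split(",").length == brokersNum;
    }

In the diff below this check is retried for up to 30 seconds with ducttape's Unreliables.retryUntilTrue, so start() only returns once every broker has registered (or the retry gives up).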
@@ -1,15 +1,14 @@
package org.testcontainers.containers;

import com.google.common.collect.ImmutableMap;
import com.github.dockerjava.api.DockerClient;
import com.github.dockerjava.core.command.ExecStartResultCallback;
import lombok.SneakyThrows;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.DescribeClusterOptions;
import org.apache.kafka.clients.admin.DescribeClusterResult;
import org.rnorth.ducttape.unreliables.Unreliables;
import org.testcontainers.DockerClientFactory;
import org.testcontainers.lifecycle.Startable;
import org.testcontainers.lifecycle.Startables;

import java.io.ByteArrayOutputStream;
import java.util.Collection;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
@@ -28,6 +27,7 @@ public class KafkaContainerCluster implements Startable {
private final Network network;
private final GenericContainer zookeeper;
private final Collection<KafkaContainer> brokers;
private final DockerClient dockerClient = DockerClientFactory.instance().client();

public KafkaContainerCluster(int brokersNum, int internalTopicsRf) {
this(CONFLUENT_PLATFORM_VERSION, brokersNum, internalTopicsRf);
@@ -96,20 +96,31 @@ public void start() {
Stream<Startable> startables = this.brokers.stream().map(b -> (Startable) b);
Startables.deepStart(startables).get(60, SECONDS);

AdminClient adminClient = AdminClient.create(ImmutableMap.of(
AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, getBootstrapServers()
));
Unreliables.retryUntilTrue(30, TimeUnit.SECONDS, () -> this.brokers.stream().findFirst()
.map(this::clusterBrokers)
.filter(brokers -> {
System.out.println("Found broker str: " + brokers);
return brokers.split(",").length == this.brokersNum;
})
.isPresent());
}

// wait for cluster to form
// TODO: this requires the `AdminClient`. Maybe we could use Kafka bin scripts instead so we don't need to include the `kafka-clients` dep
Unreliables.retryUntilTrue(30, TimeUnit.SECONDS, () -> {
DescribeClusterResult cluster = adminClient.describeCluster(new DescribeClusterOptions().timeoutMs(5000));
return cluster.nodes().get().size() == this.brokersNum;
});
@SneakyThrows
private String clusterBrokers(KafkaContainer c) {
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
dockerClient
.execStartCmd(
dockerClient.execCreateCmd(c.getContainerId()).withAttachStdout(true)
.withCmd("sh", "-c", "zookeeper-shell zookeeper:" + KafkaContainer.ZOOKEEPER_PORT + " ls /brokers/ids | tail -n 1").exec().getId()
)
.exec(new ExecStartResultCallback(outputStream, null))
.awaitCompletion();
return outputStream.toString();
}

@Override
public void stop() {
// TODO: is there an inverse of `deepStart` to stop all dependent containers? Stopping zookeeper alone didn't seem to stop the brokers.
allContainers().parallel().forEach(GenericContainer::stop);
}

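For orientation, a hedged usage sketch of the class changed above (the broker count and replication factor are arbitrary; getBootstrapServers() appears in the removed AdminClient code, and its exact visibility is not shown in this diff):

    // Sketch only: three brokers, internal-topics replication factor of two.
    KafkaContainerCluster cluster = new KafkaContainerCluster(3, 2);
    cluster.start();   // returns once all three brokers show up under /brokers/ids (or the 30 s retry fails)
    String bootstrapServers = cluster.getBootstrapServers();
    // ... run producers, consumers, or an AdminClient against bootstrapServers ...
    cluster.stop();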
@@ -3,7 +3,6 @@
import com.google.common.collect.ImmutableMap;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.CreateTopicsOptions;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
@@ -14,14 +13,14 @@
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.jetbrains.annotations.NotNull;
import org.junit.Test;
import org.rnorth.ducttape.unreliables.Unreliables;

import java.time.Duration;
import java.util.*;
import java.util.Collection;
import java.util.Collections;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.assertj.core.api.Assertions.assertThat;
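The test body itself is collapsed in this view; its imports (AdminClient, NewTopic, KafkaProducer/KafkaConsumer with String serializers, Unreliables) suggest a flow along these lines, sketched here with a hypothetical topic name and settings rather than the actual test code:

    // Sketch only, inside a test method that declares `throws Exception`:
    // create a topic replicated across the cluster, then produce/consume through it.
    try (AdminClient admin = AdminClient.create(ImmutableMap.of(
            AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.getBootstrapServers()))) {
        admin.createTopics(Collections.singletonList(new NewTopic("messages", 1, (short) 2)))
            .all()
            .get();
    }
    // ... then send a record with KafkaProducer<String, String> and assert it is read
    // back by a KafkaConsumer<String, String> within an Unreliables retry window ...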
