Skip to content

Commit

Permalink
StrimziKafkaCluster KRaft mode (#89)
Browse files Browse the repository at this point in the history
* StrimziKafkaCluster KRaft ready

Signed-off-by: see-quick <maros.orsak159@gmail.com>

* finally its working!

Signed-off-by: see-quick <maros.orsak159@gmail.com>

* code sweep a little bit

Signed-off-by: see-quick <maros.orsak159@gmail.com>

* fix single-node

Signed-off-by: see-quick <maros.orsak159@gmail.com>

---------

Signed-off-by: see-quick <maros.orsak159@gmail.com>
  • Loading branch information
see-quick authored Oct 16, 2024
1 parent 0f91bb7 commit 408a2a2
Show file tree
Hide file tree
Showing 3 changed files with 413 additions and 52 deletions.
117 changes: 86 additions & 31 deletions src/main/java/io/strimzi/test/container/StrimziKafkaCluster.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
Expand All @@ -37,17 +38,19 @@ public class StrimziKafkaCluster implements KafkaContainer {
private static final Logger LOGGER = LoggerFactory.getLogger(StrimziKafkaCluster.class);

// instance attributes
private int brokersNum;
private int internalTopicReplicationFactor;
private final int brokersNum;
private final int internalTopicReplicationFactor;
private Map<String, String> additionalKafkaConfiguration;
private ToxiproxyContainer proxyContainer;
private boolean enableSharedNetwork;
private String kafkaVersion;
private boolean enableKraft;

// not editable
private final Network network;
private final StrimziZookeeperContainer zookeeper;
private StrimziZookeeperContainer zookeeper;
private Collection<KafkaContainer> brokers;
private String clusterId;

/**
* Constructor for @StrimziKafkaCluster class, which allows you to specify number of brokers @see{brokersNum},
Expand Down Expand Up @@ -141,12 +144,16 @@ private StrimziKafkaCluster(StrimziKafkaClusterBuilder builder) {
this.additionalKafkaConfiguration = builder.additionalKafkaConfiguration;
this.proxyContainer = builder.proxyContainer;
this.kafkaVersion = builder.kafkaVersion;
this.enableKraft = builder.enableKRaft;
this.clusterId = builder.clusterId;

validateBrokerNum(this.brokersNum);
validateInternalTopicReplicationFactor(this.internalTopicReplicationFactor, this.brokersNum);

this.zookeeper = new StrimziZookeeperContainer()
.withNetwork(this.network);
if (this.isZooKeeperBasedKafkaCluster()) {
this.zookeeper = new StrimziZookeeperContainer()
.withNetwork(this.network);
}

if (this.proxyContainer != null) {
this.proxyContainer.setNetwork(this.network);
Expand All @@ -162,6 +169,11 @@ private void prepareKafkaCluster(final Map<String, String> additionalKafkaConfig
defaultKafkaConfigurationForMultiNode.put("transaction.state.log.replication.factor", String.valueOf(internalTopicReplicationFactor));
defaultKafkaConfigurationForMultiNode.put("transaction.state.log.min.isr", String.valueOf(internalTopicReplicationFactor));

if (this.isKraftKafkaCluster()) {
// we have to configure quorum voters but also we simplify process because we use network aliases (i.e., broker-<id>)
this.configureQuorumVoters(additionalKafkaConfiguration);
}

if (additionalKafkaConfiguration != null) {
defaultKafkaConfigurationForMultiNode.putAll(additionalKafkaConfiguration);
}
Expand All @@ -172,15 +184,27 @@ private void prepareKafkaCluster(final Map<String, String> additionalKafkaConfig
.mapToObj(brokerId -> {
LOGGER.info("Starting broker with id {}", brokerId);
// adding broker id for each kafka container
KafkaContainer kafkaContainer = new StrimziKafkaContainer()
StrimziKafkaContainer kafkaContainer = new StrimziKafkaContainer()
.withBrokerId(brokerId)
.withKafkaConfigurationMap(defaultKafkaConfigurationForMultiNode)
.withExternalZookeeperConnect("zookeeper:" + StrimziZookeeperContainer.ZOOKEEPER_PORT)
.withNetwork(this.network)
.withProxyContainer(proxyContainer)
.withNetworkAliases("broker-" + brokerId)
.withKafkaVersion(kafkaVersion == null ? KafkaVersionService.getInstance().latestRelease().getVersion() : kafkaVersion)
.dependsOn(this.zookeeper);
.withKafkaVersion(kafkaVersion == null ? KafkaVersionService.getInstance().latestRelease().getVersion() : kafkaVersion);

// if it's ZK-based Kafka cluster we depend on ZK container and we need to specify external ZK connect
if (this.isZooKeeperBasedKafkaCluster()) {
kafkaContainer.withExternalZookeeperConnect("zookeeper:" + StrimziZookeeperContainer.ZOOKEEPER_PORT)
.dependsOn(this.zookeeper);
} else {
kafkaContainer
// if KRaft we need to enable it
.withKraft()
// One must set `node.id` to the same value as `broker.id` if we use KRaft mode
.withNodeId(brokerId)
// pass shared `cluster.id` to each broker
.withClusterId(this.clusterId)
.waitForRunning();
}

LOGGER.info("Started broker with id: {}", kafkaContainer);

Expand Down Expand Up @@ -208,6 +232,8 @@ public static class StrimziKafkaClusterBuilder {
private ToxiproxyContainer proxyContainer;
private boolean enableSharedNetwork;
private String kafkaVersion;
private boolean enableKRaft;
private String clusterId;

/**
* Sets the number of Kafka brokers in the cluster.
Expand Down Expand Up @@ -280,12 +306,20 @@ public StrimziKafkaClusterBuilder withKafkaVersion(String kafkaVersion) {
return this;
}

public StrimziKafkaClusterBuilder withKraftEnabled() {
this.enableKRaft = true;
return this;
}

/**
* Builds and returns a {@code StrimziKafkaCluster} instance based on the provided configurations.
*
* @return a new instance of {@code StrimziKafkaCluster}
*/
public StrimziKafkaCluster build() {
// Generate a single cluster ID, which will be shared by all brokers
this.clusterId = UUID.randomUUID().toString();

return new StrimziKafkaCluster(this);
}
}
Expand Down Expand Up @@ -319,6 +353,14 @@ public String getBootstrapServers() {
.collect(Collectors.joining(","));
}

public boolean isZooKeeperBasedKafkaCluster() {
return !this.enableKraft;
}

public boolean isKraftKafkaCluster() {
return this.enableKraft;
}

/* test */ int getInternalTopicReplicationFactor() {
return this.internalTopicReplicationFactor;
}
Expand All @@ -331,6 +373,15 @@ public String getBootstrapServers() {
return this.additionalKafkaConfiguration;
}

private void configureQuorumVoters(final Map<String, String> additionalKafkaConfiguration) {
// Construct controller.quorum.voters based on network aliases (broker-1, broker-2, etc.)
final String quorumVoters = IntStream.range(0, this.brokersNum)
.mapToObj(brokerId -> String.format("%d@" + StrimziKafkaContainer.NETWORK_ALIAS_PREFIX + "%d:9094", brokerId, brokerId))
.collect(Collectors.joining(","));

additionalKafkaConfiguration.put("controller.quorum.voters", quorumVoters);
}

@Override
public void start() {
Stream<KafkaContainer> startables = this.brokers.stream();
Expand All @@ -341,31 +392,35 @@ public void start() {
e.printStackTrace();
}

Utils.waitFor("Kafka brokers nodes to be connected to the ZooKeeper", Duration.ofSeconds(5).toMillis(), Duration.ofMinutes(1).toMillis(),
() -> {
Container.ExecResult result;
try {
result = this.zookeeper.execInContainer(
"sh", "-c",
"bin/zookeeper-shell.sh zookeeper:" + StrimziZookeeperContainer.ZOOKEEPER_PORT + " ls /brokers/ids | tail -n 1"
);
String brokers = result.getStdout();

LOGGER.info("Running Kafka brokers: {}", result.getStdout());

return brokers != null && brokers.split(",").length == this.brokersNum;
} catch (IOException | InterruptedException e) {
Thread.currentThread().interrupt();
e.printStackTrace();
return false;
}
});
if (this.isZooKeeperBasedKafkaCluster()) {
Utils.waitFor("Kafka brokers nodes to be connected to the ZooKeeper", Duration.ofSeconds(5).toMillis(), Duration.ofMinutes(1).toMillis(),
() -> {
Container.ExecResult result;
try {
result = this.zookeeper.execInContainer(
"sh", "-c",
"bin/zookeeper-shell.sh zookeeper:" + StrimziZookeeperContainer.ZOOKEEPER_PORT + " ls /brokers/ids | tail -n 1"
);
String brokers = result.getStdout();

LOGGER.info("Running Kafka brokers: {}", result.getStdout());

return brokers != null && brokers.split(",").length == this.brokersNum;
} catch (IOException | InterruptedException e) {
Thread.currentThread().interrupt();
e.printStackTrace();
return false;
}
});
}
}

@Override
public void stop() {
// firstly we shut-down zookeeper -> reason: 'On the command line if I kill ZK first it sometimes prevents a broker from shutting down quickly.'
this.zookeeper.stop();
if (this.isZooKeeperBasedKafkaCluster()) {
// firstly we shut-down zookeeper -> reason: 'On the command line if I kill ZK first it sometimes prevents a broker from shutting down quickly.'
this.zookeeper.stop();
}

// stop all kafka containers in parallel
this.brokers.stream()
Expand Down
Loading

0 comments on commit 408a2a2

Please sign in to comment.