Skip to content

Commit

Permalink
fix multi-node
Browse files Browse the repository at this point in the history
Signed-off-by: see-quick <maros.orsak159@gmail.com>
  • Loading branch information
see-quick committed Oct 31, 2024
1 parent 2e8843e commit d6e8113
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,11 @@
*/
public enum AuthenticationType {

/**
* No authentication.
*/
NONE,

/**
* OAuth authentication over plain text.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -446,7 +446,7 @@ public void start() {
for (KafkaContainer kafkaContainer : this.brokers) {
Container.ExecResult result = ((StrimziKafkaContainer) kafkaContainer).execInContainer(
"bash", "-c",
"bin/kafka-metadata-quorum.sh --bootstrap-server localhost:9093 describe --status"
"bin/kafka-metadata-quorum.sh --bootstrap-server localhost:" + StrimziKafkaContainer.INTER_BROKER_LISTENER_PORT + " describe --status"
);
String output = result.getStdout();

Expand Down
46 changes: 20 additions & 26 deletions src/main/java/io/strimzi/test/container/StrimziKafkaContainer.java
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ public class StrimziKafkaContainer extends GenericContainer<StrimziKafkaContaine
* Prefix for network aliases.
*/
protected static final String NETWORK_ALIAS_PREFIX = "broker-";
protected static final int INTER_BROKER_LISTENER_PORT = 9091;

/**
* Lazy image name provider
Expand Down Expand Up @@ -109,7 +110,7 @@ public class StrimziKafkaContainer extends GenericContainer<StrimziKafkaContaine
private String saslUsername;
private String saslPassword;

private AuthenticationType authenticationType;
private AuthenticationType authenticationType = AuthenticationType.NONE;

/**
* Image name is specified lazily automatically in {@link #doStart()} method
Expand Down Expand Up @@ -243,8 +244,7 @@ protected void containerIsStarting(final InspectContainerResponse containerInfo,
final String[] listenersConfig = this.buildListenersConfig(containerInfo);
final Properties defaultServerProperties = this.buildDefaultServerProperties(
listenersConfig[0],
listenersConfig[1],
listenersConfig[2]);
listenersConfig[1]);
final String serverPropertiesWithOverride = this.overrideProperties(defaultServerProperties, this.kafkaConfigurationMap);

// copy override file to the container
Expand Down Expand Up @@ -308,7 +308,6 @@ private String extractListenerName(String bootstrapServers) {
* @return An array containing:
* The 'listeners' configuration string.
* The 'advertised.listeners' configuration string.
* The 'listener.security.protocol.map' configuration string.
*/
private String[] buildListenersConfig(final InspectContainerResponse containerInfo) {
final String bootstrapServers = getBootstrapServers();
Expand All @@ -317,16 +316,14 @@ private String[] buildListenersConfig(final InspectContainerResponse containerIn
final List<String> advertisedListenersNames = new ArrayList<>();
final StringBuilder kafkaListeners = new StringBuilder();
final StringBuilder advertisedListeners = new StringBuilder();
final StringBuilder kafkaListenerSecurityProtocol = new StringBuilder();

// add first PLAINTEXT listener
advertisedListeners.append(bootstrapServers);
kafkaListeners.append(bsListenerName).append(":").append("//").append("0.0.0.0").append(":").append(KAFKA_PORT).append(",");
kafkaListenerSecurityProtocol.append(bsListenerName).append(":").append("PLAINTEXT").append(",");
this.listenerNames.add(bsListenerName);

int listenerNumber = 1;
int portNumber = 9091;
int portNumber = INTER_BROKER_LISTENER_PORT;

// configure advertised listeners
for (ContainerNetwork network : networks) {
Expand All @@ -342,21 +339,15 @@ private String[] buildListenersConfig(final InspectContainerResponse containerIn
portNumber--;
}

portNumber = 9091;
portNumber = INTER_BROKER_LISTENER_PORT;

// configure listeners and security.protocol.map
// configure listeners
for (String listener : advertisedListenersNames) {
// listeners
kafkaListeners
.append(listener)
.append("://0.0.0.0:")
.append(portNumber)
.append(",");
// listener.security.protocol.map
kafkaListenerSecurityProtocol
.append(listener)
.append(":PLAINTEXT")
.append(",");
this.listenerNames.add(listener);
portNumber--;
}
Expand All @@ -366,16 +357,14 @@ private String[] buildListenersConfig(final InspectContainerResponse containerIn
// adding Controller listener for Kraft mode
// (wildcard address for multi-node setup; that way we other nodes can connect and communicate between each other)
kafkaListeners.append(controllerListenerName).append("://0.0.0.0:9094");
kafkaListenerSecurityProtocol.append(controllerListenerName).append(":PLAINTEXT");
this.listenerNames.add(controllerListenerName);
}

LOGGER.info("This is all advertised listeners for Kafka {}", advertisedListeners);

return new String[] {
kafkaListeners.toString(),
advertisedListeners.toString(),
kafkaListenerSecurityProtocol.toString()
advertisedListeners.toString()
};
}

Expand Down Expand Up @@ -414,12 +403,10 @@ private String randomUuid() {
*
* @param listeners the listeners configuration
* @param advertisedListeners the advertised listeners configuration
* @param listenerSecurityProtocolMap the listener security protocol map
* @return the default server properties
*/
private Properties buildDefaultServerProperties(final String listeners,
final String advertisedListeners,
final String listenerSecurityProtocolMap) {
final String advertisedListeners) {
// Default properties for server.properties
Properties properties = new Properties();

Expand All @@ -428,7 +415,7 @@ private Properties buildDefaultServerProperties(final String listeners,
properties.setProperty("inter.broker.listener.name", "BROKER1");
properties.setProperty("broker.id", String.valueOf(this.brokerId));
properties.setProperty("advertised.listeners", advertisedListeners);
properties.setProperty("listener.security.protocol.map", listenerSecurityProtocolMap);
properties.setProperty("listener.security.protocol.map", this.configureListenerSecurityProtocolMap("PLAINTEXT"));
properties.setProperty("num.network.threads", "3");
properties.setProperty("num.io.threads", "8");
properties.setProperty("socket.send.buffer.bytes", "102400");
Expand All @@ -450,14 +437,21 @@ private Properties buildDefaultServerProperties(final String listeners,
properties.setProperty("controller.quorum.voters", String.format("%d@" + NETWORK_ALIAS_PREFIX + this.nodeId + ":9094", this.nodeId));
properties.setProperty("controller.listener.names", "CONTROLLER");

// configure OAuth if enabled
if (this.isOAuthEnabled()) {
if (this.authenticationType != AuthenticationType.NONE) {
switch (this.authenticationType) {
case OAUTH_OVER_PLAIN:
configureOAuthOverPlain(properties);
if (this.isOAuthEnabled()) {
configureOAuthOverPlain(properties);
} else {
throw new IllegalStateException("OAuth2 is not enabled: " + this.oauthEnabled);
}
break;
case OAUTH_BEARER:
configureOAuthBearer(properties);
if (this.isOAuthEnabled()) {
configureOAuthBearer(properties);
} else {
throw new IllegalStateException("OAuth2 is not enabled: " + this.oauthEnabled);
}
break;
case SCRAM_SHA_256:
case SCRAM_SHA_512:
Expand Down

0 comments on commit d6e8113

Please sign in to comment.