Skip to content

Commit

Permalink
marko review update
Browse files Browse the repository at this point in the history
Signed-off-by: see-quick <maros.orsak159@gmail.com>
  • Loading branch information
see-quick committed Oct 30, 2024
1 parent a3a5fcb commit 2e8843e
Show file tree
Hide file tree
Showing 3 changed files with 130 additions and 128 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,6 @@ public enum AuthenticationType {
*/
OAUTH_BEARER,

/**
* Mutual TLS authentication.
*/
MUTUAL_TLS,

/**
* SCRAM-SHA-256 authentication.
*/
Expand Down
246 changes: 128 additions & 118 deletions src/main/java/io/strimzi/test/container/StrimziKafkaContainer.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,12 @@
import java.util.Base64;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
Expand Down Expand Up @@ -92,14 +95,15 @@ public class StrimziKafkaContainer extends GenericContainer<StrimziKafkaContaine
private ToxiproxyClient toxiproxyClient;
private Proxy proxy;

private Set<String> listenerNames = new HashSet<>();

// OAuth fields
private boolean oauthEnabled;
private String keycloakRealm;
private String realm;
private String clientId;
private String clientSecret;
private String keycloakOauthUri;
private String oauthPreferredUsername;
private List<String> superUsers;
private String oauthUri;
private String usernameClaim;

// OAuth over PLAIN
private String saslUsername;
Expand Down Expand Up @@ -170,12 +174,12 @@ protected void doStart() {

if (this.isOAuthEnabled()) {
// Set OAuth environment variables (using properties does not propagate to System properties)
this.addEnv("OAUTH_JWKS_ENDPOINT_URI", this.keycloakOauthUri + "/realms/" + this.keycloakRealm + "/protocol/openid-connect/certs");
this.addEnv("OAUTH_VALID_ISSUER_URI", this.keycloakOauthUri + "/realms/" + this.keycloakRealm);
this.addEnv("OAUTH_JWKS_ENDPOINT_URI", this.oauthUri + "/realms/" + this.realm + "/protocol/openid-connect/certs");
this.addEnv("OAUTH_VALID_ISSUER_URI", this.oauthUri + "/realms/" + this.realm);
this.addEnv("OAUTH_CLIENT_ID", this.clientId);
this.addEnv("OAUTH_CLIENT_SECRET", this.clientSecret);
this.addEnv("OAUTH_TOKEN_ENDPOINT_URI", this.keycloakOauthUri + "/realms/" + this.keycloakRealm + "/protocol/openid-connect/token");
this.addEnv("OAUTH_USERNAME_CLAIM", this.oauthPreferredUsername);
this.addEnv("OAUTH_TOKEN_ENDPOINT_URI", this.oauthUri + "/realms/" + this.realm + "/protocol/openid-connect/token");
this.addEnv("OAUTH_USERNAME_CLAIM", this.usernameClaim);
}

super.setCommand("sh", "-c", runStarterScript());
Expand Down Expand Up @@ -231,64 +235,16 @@ protected void containerIsStarting(final InspectContainerResponse containerInfo,

LOGGER.info("Mapped port: {}", kafkaExposedPort);

final String bootstrapServers = getBootstrapServers();
final String bsListenerName = extractListenerName(bootstrapServers);

StringBuilder advertisedListeners = new StringBuilder(bootstrapServers);

Collection<ContainerNetwork> cns = containerInfo.getNetworkSettings().getNetworks().values();

int advertisedListenerNumber = 1;
List<String> advertisedListenersNames = new ArrayList<>();

for (ContainerNetwork cn : cns) {
// must be always unique
final String advertisedName = "BROKER" + advertisedListenerNumber;
advertisedListeners.append(",").append(advertisedName).append("://").append(cn.getIpAddress()).append(":9093");
advertisedListenersNames.add(advertisedName);
advertisedListenerNumber++;
}

LOGGER.info("This is all advertised listeners for Kafka {}", advertisedListeners);

StringBuilder kafkaListeners = new StringBuilder();
StringBuilder kafkaListenerSecurityProtocol = new StringBuilder();

advertisedListenersNames.forEach(name -> {
// listeners
kafkaListeners
.append(name)
.append("://0.0.0.0:9093")
.append(",");
// listener.security.protocol.map
kafkaListenerSecurityProtocol
.append(name)
.append(":PLAINTEXT")
.append(",");
});

kafkaListeners.append(bsListenerName).append("://0.0.0.0:").append(KAFKA_PORT);
kafkaListenerSecurityProtocol.append("PLAINTEXT:PLAINTEXT");
if (!bsListenerName.equals("PLAINTEXT")) {
kafkaListenerSecurityProtocol.append(",").append(bsListenerName).append(":").append(bsListenerName);
}

if (this.useKraft) {
// adding Controller listener for Kraft mode
// (wildcard address for multi-node setup; that way we other nodes can connect and communicate between each other)
kafkaListeners.append(",").append("CONTROLLER://0.0.0.0:9094");
kafkaListenerSecurityProtocol.append(",").append("CONTROLLER:PLAINTEXT");
}

if (this.nodeId == null) {
LOGGER.warn("Node ID is not set. Using broker ID {} as the default node ID.", this.brokerId);
this.nodeId = this.brokerId;
}

final String[] listenersConfig = this.buildListenersConfig(containerInfo);
final Properties defaultServerProperties = this.buildDefaultServerProperties(
kafkaListeners.toString(),
advertisedListeners.toString(),
kafkaListenerSecurityProtocol.toString());
listenersConfig[0],
listenersConfig[1],
listenersConfig[2]);
final String serverPropertiesWithOverride = this.overrideProperties(defaultServerProperties, this.kafkaConfigurationMap);

// copy override file to the container
Expand Down Expand Up @@ -345,6 +301,84 @@ private String extractListenerName(String bootstrapServers) {
return strings[0];
}

/**
* Builds the listener configurations for the Kafka broker based on the container's network settings.
*
* @param containerInfo Container network information.
* @return An array containing:
* The 'listeners' configuration string.
* The 'advertised.listeners' configuration string.
* The 'listener.security.protocol.map' configuration string.
*/
private String[] buildListenersConfig(final InspectContainerResponse containerInfo) {
final String bootstrapServers = getBootstrapServers();
final String bsListenerName = extractListenerName(bootstrapServers);
final Collection<ContainerNetwork> networks = containerInfo.getNetworkSettings().getNetworks().values();
final List<String> advertisedListenersNames = new ArrayList<>();
final StringBuilder kafkaListeners = new StringBuilder();
final StringBuilder advertisedListeners = new StringBuilder();
final StringBuilder kafkaListenerSecurityProtocol = new StringBuilder();

// add first PLAINTEXT listener
advertisedListeners.append(bootstrapServers);
kafkaListeners.append(bsListenerName).append(":").append("//").append("0.0.0.0").append(":").append(KAFKA_PORT).append(",");
kafkaListenerSecurityProtocol.append(bsListenerName).append(":").append("PLAINTEXT").append(",");
this.listenerNames.add(bsListenerName);

int listenerNumber = 1;
int portNumber = 9091;

// configure advertised listeners
for (ContainerNetwork network : networks) {
String advertisedName = "BROKER" + listenerNumber;
advertisedListeners.append(",")
.append(advertisedName)
.append("://")
.append(network.getIpAddress())
.append(":")
.append(portNumber);
advertisedListenersNames.add(advertisedName);
listenerNumber++;
portNumber--;
}

portNumber = 9091;

// configure listeners and security.protocol.map
for (String listener : advertisedListenersNames) {
// listeners
kafkaListeners
.append(listener)
.append("://0.0.0.0:")
.append(portNumber)
.append(",");
// listener.security.protocol.map
kafkaListenerSecurityProtocol
.append(listener)
.append(":PLAINTEXT")
.append(",");
this.listenerNames.add(listener);
portNumber--;
}

if (this.useKraft) {
final String controllerListenerName = "CONTROLLER";
// adding Controller listener for Kraft mode
// (wildcard address for multi-node setup; that way we other nodes can connect and communicate between each other)
kafkaListeners.append(controllerListenerName).append("://0.0.0.0:9094");
kafkaListenerSecurityProtocol.append(controllerListenerName).append(":PLAINTEXT");
this.listenerNames.add(controllerListenerName);
}

LOGGER.info("This is all advertised listeners for Kafka {}", advertisedListeners);

return new String[] {
kafkaListeners.toString(),
advertisedListeners.toString(),
kafkaListenerSecurityProtocol.toString()
};
}

static String writeOverrideString(Map<String, String> kafkaConfigurationMap) {
StringBuilder kafkaConfiguration = new StringBuilder();
kafkaConfigurationMap.forEach((configName, configValue) ->
Expand Down Expand Up @@ -427,7 +461,6 @@ private Properties buildDefaultServerProperties(final String listeners,
break;
case SCRAM_SHA_256:
case SCRAM_SHA_512:
case MUTUAL_TLS:
case GSSAPI:
default:
throw new IllegalStateException("Unsupported authentication type: " + this.authenticationType);
Expand All @@ -453,30 +486,23 @@ private Properties buildDefaultServerProperties(final String listeners,
private void configureOAuthOverPlain(Properties properties) {
properties.setProperty("sasl.enabled.mechanisms", "PLAIN");
properties.setProperty("sasl.mechanism.inter.broker.protocol", "PLAIN");
properties.setProperty("listener.security.protocol.map", "PLAINTEXT:SASL_PLAINTEXT,BROKER1:SASL_PLAINTEXT,CONTROLLER:SASL_PLAINTEXT");
properties.setProperty("listener.security.protocol.map", this.configureListenerSecurityProtocolMap("SASL_PLAINTEXT"));
properties.setProperty("sasl.mechanism.controller.protocol", "PLAIN");
properties.setProperty("principal.builder.class", "io.strimzi.kafka.oauth.server.OAuthKafkaPrincipalBuilder");

// Dynamically build the 'super.users' property
this.setSuperUsersIntoProperties(properties);

// Construct the JAAS configuration with configurable username and password
final String jaasConfig = String.format(
"org.apache.kafka.common.security.plain.PlainLoginModule required username=\"%s\" password=\"%s\";",
this.saslUsername,
this.saslPassword
);

properties.setProperty("listener.name.plaintext.plain.sasl.jaas.config", jaasConfig);
properties.setProperty("listener.name.controller.plain.sasl.jaas.config", jaasConfig);
properties.setProperty("listener.name.broker1.plain.sasl.jaas.config", jaasConfig);

// Callback handler classes
final String callbackHandler = "io.strimzi.kafka.oauth.server.plain.JaasServerOauthOverPlainValidatorCallbackHandler";

properties.setProperty("listener.name.plaintext.plain.sasl.server.callback.handler.class", callbackHandler);
properties.setProperty("listener.name.broker1.plain.sasl.server.callback.handler.class", callbackHandler);
properties.setProperty("listener.name.controller.plain.sasl.server.callback.handler.class", callbackHandler);
for (String listenerName : this.listenerNames) {
properties.setProperty("listener.name." + listenerName.toLowerCase(Locale.ROOT) + ".plain.sasl.jaas.config", jaasConfig);
properties.setProperty("listener.name." + listenerName.toLowerCase(Locale.ROOT) + ".plain.sasl.server.callback.handler.class", callbackHandler);
}
}

/**
Expand All @@ -487,12 +513,10 @@ private void configureOAuthOverPlain(Properties properties) {
private void configureOAuthBearer(Properties properties) {
properties.setProperty("sasl.enabled.mechanisms", "OAUTHBEARER");
properties.setProperty("sasl.mechanism.inter.broker.protocol", "OAUTHBEARER");
properties.setProperty("listener.security.protocol.map", "PLAINTEXT:SASL_PLAINTEXT,BROKER1:SASL_PLAINTEXT,CONTROLLER:SASL_PLAINTEXT");
properties.setProperty("listener.security.protocol.map", this.configureListenerSecurityProtocolMap("SASL_PLAINTEXT"));
properties.setProperty("sasl.mechanism.controller.protocol", "OAUTHBEARER");
properties.setProperty("principal.builder.class", "io.strimzi.kafka.oauth.server.OAuthKafkaPrincipalBuilder");

this.setSuperUsersIntoProperties(properties);

// Construct JAAS configuration for OAUTHBEARER
final String jaasConfig = String.format(
"org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required " +
Expand All @@ -502,27 +526,30 @@ private void configureOAuthBearer(Properties properties) {
"oauth.username.claim=\"%s\";",
this.clientId,
this.clientSecret,
this.keycloakOauthUri + "/realms/" + this.keycloakRealm + "/protocol/openid-connect/token",
this.oauthPreferredUsername // e.g., "preferred_username"
this.oauthUri + "/realms/" + this.realm + "/protocol/openid-connect/token",
this.usernameClaim // e.g., "preferred_username"
);
// Set JAAS config for each listener
properties.setProperty("listener.name.plaintext.oauthbearer.sasl.jaas.config", jaasConfig);
properties.setProperty("listener.name.broker1.oauthbearer.sasl.jaas.config", jaasConfig);
properties.setProperty("listener.name.controller.oauthbearer.sasl.jaas.config", jaasConfig);

// Define Callback Handlers for OAUTHBEARER
final String serverCallbackHandler = "io.strimzi.kafka.oauth.server.JaasServerOauthValidatorCallbackHandler";

properties.setProperty("listener.name.plaintext.oauthbearer.sasl.server.callback.handler.class", serverCallbackHandler);
properties.setProperty("listener.name.broker1.oauthbearer.sasl.server.callback.handler.class", serverCallbackHandler);
properties.setProperty("listener.name.controller.oauthbearer.sasl.server.callback.handler.class", serverCallbackHandler);

final String clientSideCallbackHandler = "io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler";

// Optionally, define client-side callback handlers if using inter-broker communication
properties.setProperty("listener.name.plaintext.oauthbearer.sasl.login.callback.handler.class", clientSideCallbackHandler);
properties.setProperty("listener.name.broker1.oauthbearer.sasl.login.callback.handler.class", clientSideCallbackHandler);
properties.setProperty("listener.name.controller.oauthbearer.sasl.login.callback.handler.class", clientSideCallbackHandler);
for (final String listenerName : this.listenerNames) {
properties.setProperty("listener.name." + listenerName.toLowerCase(Locale.ROOT) + ".oauthbearer.sasl.jaas.config", jaasConfig);
properties.setProperty("listener.name." + listenerName.toLowerCase(Locale.ROOT) + ".oauthbearer.sasl.server.callback.handler.class", serverCallbackHandler);
properties.setProperty("listener.name." + listenerName.toLowerCase(Locale.ROOT) + ".oauthbearer.sasl.login.callback.handler.class", clientSideCallbackHandler);
}
}

/**
* Configures the listener.security.protocol.map property based on the listenerNames set and the given security protocol.
*
* @param securityProtocol The security protocol to map each listener to (e.g., PLAINTEXT, SASL_PLAINTEXT).
* @return The listener.security.protocol.map configuration string.
*/
private String configureListenerSecurityProtocolMap(String securityProtocol) {
return this.listenerNames.stream()
.map(listenerName -> listenerName + ":" + securityProtocol)
.collect(Collectors.joining(","));
}

/**
Expand Down Expand Up @@ -550,20 +577,6 @@ private String overrideProperties(Properties defaultProperties, Map<String, Stri
return writer.toString();
}

/**
* Sets the 'super.users' property in the Kafka server properties.
*
* @param properties The Kafka server properties.
*/
private void setSuperUsersIntoProperties(Properties properties) {
if (this.superUsers != null && !this.superUsers.isEmpty()) {
String superUsersProperty = this.superUsers.stream()
.map(user -> "User:" + user)
.collect(Collectors.joining(";"));
properties.setProperty("super.users", superUsersProperty);
}
}

/**
* Retrieves the internal ZooKeeper connection string.
*
Expand Down Expand Up @@ -739,27 +752,24 @@ public StrimziKafkaContainer withProxyContainer(final ToxiproxyContainer proxyCo
/**
* Fluent method to configure OAuth settings.
*
* @param keycloakRealm the Keycloak realm
* @param realm the realm
* @param clientId the OAuth client ID
* @param clientSecret the OAuth client secret
* @param keycloakOAuthUri the Keycloak OAuth URI
* @param oauthPreferredUsername the preferred username claim for OAuth
* @param superUsers the list of super users
* @param oauthUri the OAuth URI
* @param usernameClaim the preferred username claim for OAuth
* @return {@code StrimziKafkaContainer} instance
*/
public StrimziKafkaContainer withOAuthConfig(final String keycloakRealm,
public StrimziKafkaContainer withOAuthConfig(final String realm,
final String clientId,
final String clientSecret,
final String keycloakOAuthUri,
final String oauthPreferredUsername,
final List<String> superUsers) {
final String oauthUri,
final String usernameClaim) {
this.oauthEnabled = true;
this.keycloakRealm = keycloakRealm;
this.realm = realm;
this.clientId = clientId;
this.clientSecret = clientSecret;
this.keycloakOauthUri = keycloakOAuthUri;
this.oauthPreferredUsername = oauthPreferredUsername;
this.superUsers = superUsers;
this.oauthUri = oauthUri;
this.usernameClaim = usernameClaim;
return self();
}

Expand Down
Loading

0 comments on commit 2e8843e

Please sign in to comment.