Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix KeycloakRBACAuthorizer to work with StandardAuthorizer in KRAFT mode #188

Merged
merged 21 commits into from
Jun 28, 2023
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions .travis/build.sh
Original file line number Diff line number Diff line change
Expand Up @@ -68,24 +68,24 @@ elif [[ "$arch" != 'ppc64le' ]]; then
exitIfError

clearDockerEnv
mvn -e -V -B clean install -f testsuite -Pkafka-3_2_3
mvn -e -V -B clean install -f testsuite -Pkafka-3_2_3 -DfailIfNoTests=false -Dtest=\!KeycloakKRaftAuthorizationTests
EXIT=$?
exitIfError

# Excluded by default to not exceed Travis job timeout
if [ "SKIP_DISABLED" == "false" ]; then
if [ "$SKIP_DISABLED" == "false" ]; then
clearDockerEnv
mvn -e -V -B clean install -f testsuite -Pkafka-3_1_2
mvn -e -V -B clean install -f testsuite -Pkafka-3_1_2 -DfailIfNoTests=false -Dtest=\!KeycloakKRaftAuthorizationTests,\!KeycloakZKAuthorizationTests
EXIT=$?
exitIfError

clearDockerEnv
mvn -e -V -B clean install -f testsuite -Pkafka-3_0_0
mvn -e -V -B clean install -f testsuite -Pkafka-3_0_0 -DfailIfNoTests=false -Dtest=\!KeycloakKRaftAuthorizationTests,\!KeycloakZKAuthorizationTests
EXIT=$?
exitIfError

clearDockerEnv
mvn -e -V -B clean install -f testsuite -Pkafka-2_8_1
mvn -e -V -B clean install -f testsuite -Pkafka-2_8_1 -DfailIfNoTests=false -Dtest=\!KeycloakKRaftAuthorizationTests,\!KeycloakZKAuthorizationTests
EXIT=$?
exitIfError
fi
Expand Down
134 changes: 77 additions & 57 deletions README.md

Large diffs are not rendered by default.

94 changes: 94 additions & 0 deletions examples/docker/kafka-oauth-strimzi/compose-authz-kraft.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
version: '3.5'

services:

  #################################### KAFKA BROKER ####################################
  kafka:
    image: strimzi/example-kafka
    build: kafka-oauth-strimzi/kafka/target
    container_name: kafka
    ports:
      # quoted so YAML never mis-types the host:container mapping
      - "9091:9091"
      - "9092:9092"

      # javaagent debug port
      #- "5006:5006"
    command:
      - /bin/bash
      - -c
      # --kraft makes start.sh generate a KRaft (ZooKeeper-less) broker config
      - cd /opt/kafka && ./start.sh --kraft

    environment:

      # Java Debug
      #KAFKA_DEBUG: y
      #DEBUG_SUSPEND_FLAG: y
      #JAVA_DEBUG_PORT: 5006

      #
      # KAFKA Configuration
      #
      LOG_DIR: /home/kafka/logs

      # Single combined-mode KRaft node: acts as both broker and controller
      KAFKA_PROCESS_ROLES: "broker,controller"
      KAFKA_NODE_ID: "1"
      KAFKA_CONTROLLER_QUORUM_VOTERS: "1@kafka:9091"
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_SASL_MECHANISM_CONTROLLER_PROTOCOL: PLAIN

      KAFKA_LISTENERS: "CONTROLLER://kafka:9091,CLIENT://kafka:9092"
      KAFKA_ADVERTISED_LISTENERS: "CLIENT://kafka:9092"
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: "CONTROLLER:SASL_PLAINTEXT,CLIENT:SASL_PLAINTEXT"

      KAFKA_SASL_ENABLED_MECHANISMS: OAUTHBEARER
      KAFKA_INTER_BROKER_LISTENER_NAME: CLIENT
      KAFKA_SASL_MECHANISM_INTER_BROKER_PROTOCOL: OAUTHBEARER

      KAFKA_PRINCIPAL_BUILDER_CLASS: io.strimzi.kafka.oauth.server.OAuthKafkaPrincipalBuilder

      # Controller listener authenticates with static PLAIN credentials (not OAuth)
      KAFKA_LISTENER_NAME_CONTROLLER_SASL_ENABLED_MECHANISMS: PLAIN
      KAFKA_LISTENER_NAME_CONTROLLER_PLAIN_SASL_JAAS_CONFIG: "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"admin\" password=\"admin-password\" user_admin=\"admin-password\" user_bobby=\"bobby-secret\" ;"

      # Client listener authenticates with OAuth bearer tokens via Strimzi callbacks
      KAFKA_LISTENER_NAME_CLIENT_SASL_ENABLED_MECHANISMS: OAUTHBEARER
      KAFKA_LISTENER_NAME_CLIENT_OAUTHBEARER_SASL_JAAS_CONFIG: "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;"
      KAFKA_LISTENER_NAME_CLIENT_OAUTHBEARER_SASL_LOGIN_CALLBACK_HANDLER_CLASS: io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler
      KAFKA_LISTENER_NAME_CLIENT_OAUTHBEARER_SASL_SERVER_CALLBACK_HANDLER_CLASS: io.strimzi.kafka.oauth.server.JaasServerOauthValidatorCallbackHandler

      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: "1"

      # Force session re-authentication once per hour (ms)
      KAFKA_LISTENER_NAME_CLIENT_OAUTHBEARER_CONNECTIONS_MAX_REAUTH_MS: "3600000"

      #
      # Strimzi OAuth Configuration
      #

      # Authentication config
      OAUTH_CLIENT_ID: "kafka"
      OAUTH_CLIENT_SECRET: "kafka-secret"
      OAUTH_TOKEN_ENDPOINT_URI: "http://${KEYCLOAK_HOST:-keycloak}:8080/auth/realms/${REALM:-kafka-authz}/protocol/openid-connect/token"

      # Validation config
      OAUTH_VALID_ISSUER_URI: "http://${KEYCLOAK_HOST:-keycloak}:8080/auth/realms/${REALM:-kafka-authz}"
      OAUTH_JWKS_ENDPOINT_URI: "http://${KEYCLOAK_HOST:-keycloak}:8080/auth/realms/${REALM:-kafka-authz}/protocol/openid-connect/certs"
      #OAUTH_INTROSPECTION_ENDPOINT_URI: "http://${KEYCLOAK_HOST:-keycloak}:8080/auth/realms/${REALM:-kafka-authz}/protocol/openid-connect/token/introspect"

      # username extraction from JWT token claim
      OAUTH_USERNAME_CLAIM: preferred_username

      # timeouts / refresh config
      OAUTH_JWKS_REFRESH_MIN_PAUSE_SECONDS: "5"
      OAUTH_CONNECT_TIMEOUT_SECONDS: "20"
      OAUTH_READ_TIMEOUT_SECONDS: "20"

      # Keycloak authorization
      KAFKA_AUTHORIZER_CLASS_NAME: io.strimzi.kafka.oauth.server.authorizer.KeycloakAuthorizer

      KAFKA_STRIMZI_AUTHORIZATION_KAFKA_CLUSTER_NAME: my-cluster
      KAFKA_STRIMZI_AUTHORIZATION_DELEGATE_TO_KAFKA_ACL: "true"
      KAFKA_STRIMZI_AUTHORIZATION_GRANTS_REFRESH_POOL_SIZE: "4"
      KAFKA_STRIMZI_AUTHORIZATION_GRANTS_REFRESH_PERIOD_SECONDS: "60"

      # super.users is a semicolon-separated list of principals
      KAFKA_SUPER_USERS: "User:admin;User:service-account-kafka-broker"

      # For start.sh script to know where the keycloak is listening
      KEYCLOAK_HOST: "${KEYCLOAK_HOST:-keycloak}"
      REALM: "${REALM:-kafka-authz}"
2 changes: 1 addition & 1 deletion examples/docker/kafka-oauth-strimzi/compose-authz.yml
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ services:

KAFKA_AUTHORIZER_CLASS_NAME: io.strimzi.kafka.oauth.server.OAuthSessionAuthorizer
KAFKA_PRINCIPAL_BUILDER_CLASS: io.strimzi.kafka.oauth.server.OAuthKafkaPrincipalBuilder
KAFKA_STRIMZI_AUTHORIZER_DELEGATE_CLASS_NAME: io.strimzi.kafka.oauth.server.authorizer.KeycloakRBACAuthorizer
KAFKA_STRIMZI_AUTHORIZER_DELEGATE_CLASS_NAME: io.strimzi.kafka.oauth.server.authorizer.KeycloakAuthorizer

KAFKA_STRIMZI_AUTHORIZATION_KAFKA_CLUSTER_NAME: my-cluster
KAFKA_STRIMZI_AUTHORIZATION_DELEGATE_TO_KAFKA_ACL: "true"
Expand Down
81 changes: 81 additions & 0 deletions examples/docker/kafka-oauth-strimzi/compose-kraft.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
version: '3.5'

services:

  #################################### KAFKA BROKER ####################################
  kafka:
    image: strimzi/example-kafka
    build: kafka-oauth-strimzi/kafka/target
    container_name: kafka
    ports:
      # quoted so YAML never mis-types the host:container mapping
      - "9091:9091"
      - "9092:9092"

      # javaagent debug port
      #- "5005:5005"
    command:
      - /bin/bash
      - -c
      # --kraft makes start.sh generate a KRaft (ZooKeeper-less) broker config
      - cd /opt/kafka && ./start.sh --kraft

    environment:

      # Java Debug
      #KAFKA_DEBUG: y
      #DEBUG_SUSPEND_FLAG: y
      #JAVA_DEBUG_PORT: 5005

      #
      # KAFKA Configuration
      #
      LOG_DIR: /home/kafka/logs

      # Single combined-mode KRaft node: acts as both broker and controller
      KAFKA_PROCESS_ROLES: "broker,controller"
      KAFKA_NODE_ID: "1"
      KAFKA_CONTROLLER_QUORUM_VOTERS: "1@kafka:9091"
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_SASL_MECHANISM_CONTROLLER_PROTOCOL: PLAIN

      KAFKA_LISTENERS: "CONTROLLER://kafka:9091,CLIENT://kafka:9092"
      KAFKA_ADVERTISED_LISTENERS: "CLIENT://kafka:9092"
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: "CONTROLLER:SASL_PLAINTEXT,CLIENT:SASL_PLAINTEXT"

      KAFKA_INTER_BROKER_LISTENER_NAME: CLIENT
      KAFKA_SASL_MECHANISM_INTER_BROKER_PROTOCOL: OAUTHBEARER

      KAFKA_PRINCIPAL_BUILDER_CLASS: "io.strimzi.kafka.oauth.server.OAuthKafkaPrincipalBuilder"

      # Controller listener authenticates with static PLAIN credentials (not OAuth)
      KAFKA_LISTENER_NAME_CONTROLLER_SASL_ENABLED_MECHANISMS: PLAIN
      KAFKA_LISTENER_NAME_CONTROLLER_PLAIN_SASL_JAAS_CONFIG: "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"admin\" password=\"admin-password\" user_admin=\"admin-password\" user_bobby=\"bobby-secret\" ;"

      # Client listener authenticates with OAuth bearer tokens via Strimzi callbacks
      KAFKA_LISTENER_NAME_CLIENT_SASL_ENABLED_MECHANISMS: OAUTHBEARER
      KAFKA_LISTENER_NAME_CLIENT_OAUTHBEARER_SASL_JAAS_CONFIG: "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;"
      KAFKA_LISTENER_NAME_CLIENT_OAUTHBEARER_SASL_LOGIN_CALLBACK_HANDLER_CLASS: io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler
      KAFKA_LISTENER_NAME_CLIENT_OAUTHBEARER_SASL_SERVER_CALLBACK_HANDLER_CLASS: io.strimzi.kafka.oauth.server.JaasServerOauthValidatorCallbackHandler

      # FIX: Kafka's 'super.users' is a SEMICOLON-separated list (principals contain
      # colons/commas). The comma variant would be parsed as one malformed principal,
      # silently granting super-user rights to nobody. Matches compose-authz-kraft.yml.
      KAFKA_SUPER_USERS: "User:admin;User:service-account-kafka-broker"

      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: "1"

      #
      # Strimzi OAuth Configuration
      #

      # Authentication config
      OAUTH_CLIENT_ID: "kafka-broker"
      OAUTH_CLIENT_SECRET: "kafka-broker-secret"
      OAUTH_TOKEN_ENDPOINT_URI: "http://${KEYCLOAK_HOST:-keycloak}:8080/auth/realms/${REALM:-demo}/protocol/openid-connect/token"

      # Validation config
      OAUTH_VALID_ISSUER_URI: "http://${KEYCLOAK_HOST:-keycloak}:8080/auth/realms/${REALM:-demo}"
      OAUTH_JWKS_ENDPOINT_URI: "http://${KEYCLOAK_HOST:-keycloak}:8080/auth/realms/${REALM:-demo}/protocol/openid-connect/certs"
      #OAUTH_INTROSPECTION_ENDPOINT_URI: "http://${KEYCLOAK_HOST:-keycloak}:8080/auth/realms/${REALM:-demo}/protocol/openid-connect/token/introspect"

      # username extraction from JWT token claim
      OAUTH_USERNAME_CLAIM: preferred_username
      OAUTH_CONNECT_TIMEOUT_SECONDS: "20"

      # For start.sh script to know where the keycloak is listening
      KEYCLOAK_HOST: "${KEYCLOAK_HOST:-keycloak}"
      REALM: "${REALM:-demo}"
Original file line number Diff line number Diff line change
Expand Up @@ -62,11 +62,11 @@ log4j.logger.org.I0Itec.zkclient.ZkClient=INFO
log4j.logger.org.apache.zookeeper=INFO

# Change the two lines below to adjust the general broker logging level (output to server.log and stdout)
log4j.logger.kafka=DEBUG
log4j.logger.org.apache.kafka=DEBUG
log4j.logger.kafka=INFO
log4j.logger.org.apache.kafka=INFO

# Control Strimzi OAuth logging
log4j.logger.io.strimzi=TRACE
log4j.logger.io.strimzi=DEBUG

# Change to DEBUG or TRACE to enable request logging
log4j.logger.kafka.request.logger=WARN, requestAppender
Expand Down
33 changes: 25 additions & 8 deletions examples/docker/kafka-oauth-strimzi/kafka/simple_kafka_config.sh
Original file line number Diff line number Diff line change
Expand Up @@ -52,17 +52,37 @@ done
#
# Generate output
#
echo "#"
echo "# strimzi.properties"
echo "#"

echo broker.id=`pop_value broker.id 0`
if [[ "$1" == "--kraft" ]]; then
#
# Output kraft version of server.properties
#
echo "#"
echo "# strimzi.properties (kraft)"
echo "#"

echo process.roles=`pop_value process.roles broker,controller`
echo node.id=`pop_value node.id 1`
echo log.dirs=`pop_value log.dirs /tmp/kraft-combined-logs`

elif [[ "$1" == "" ]]; then
echo "#"
echo "# strimzi.properties"
echo "#"

echo broker.id=`pop_value broker.id 0`
echo log.dirs=`pop_value log.dirs /tmp/kafka-logs`
echo group.initial.rebalance.delay.ms=`pop_value group.initial.rebalance.delay.ms 0`
else
echo "Unsupported argument: $1"
exit 1
fi

echo num.network.threads=`pop_value num.network.threads 3`
echo num.io.threads=`pop_value num.io.threads 8`
echo socket.send.buffer.bytes=`pop_value socket.send.buffer.bytes 102400`
echo socket.receive.buffer.bytes=`pop_value socket.receive.buffer.bytes 102400`
echo socket.request.max.bytes=`pop_value socket.request.max.bytes 104857600`
echo log.dirs=`pop_value log.dirs /tmp/kafka-logs`
echo num.partitions=`pop_value num.partitions 1`
echo num.recovery.threads.per.data.dir=`pop_value num.recovery.threads.per.data.dir 1`
echo offsets.topic.replication.factor=`pop_value offsets.topic.replication.factor 1`
Expand All @@ -71,9 +91,6 @@ echo transaction.state.log.min.isr=`pop_value transaction.state.log.min.isr 1`
echo log.retention.hours=`pop_value log.retention.hours 168`
echo log.segment.bytes=`pop_value log.segment.bytes 1073741824`
echo log.retention.check.interval.ms=`pop_value log.retention.check.interval.ms 300000`
echo zookeeper.connect=`pop_value zookeeper.connect localhost:2181`
echo zookeeper.connection.timeout.ms=`pop_value zookeeper.connection.timeout.ms 6000`
echo group.initial.rebalance.delay.ms=`pop_value group.initial.rebalance.delay.ms 0`

#
# Add what remains of KAFKA_* env vars
Expand Down
7 changes: 6 additions & 1 deletion examples/docker/kafka-oauth-strimzi/kafka/start.sh
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,17 @@ wait_for_url "$URI/realms/${REALM:-demo}" "Waiting for realm '${REALM}' to be av

if [ "$SERVER_PROPERTIES_FILE" == "" ]; then
echo "Generating a new strimzi.properties file using ENV vars"
./simple_kafka_config.sh | tee /tmp/strimzi.properties
./simple_kafka_config.sh $1 | tee /tmp/strimzi.properties
else
echo "Using provided server.properties file: $SERVER_PROPERTIES_FILE"
cp $SERVER_PROPERTIES_FILE /tmp/strimzi.properties
fi

if [[ "$1" == "--kraft" ]]; then
KAFKA_CLUSTER_ID="$(/opt/kafka/bin/kafka-storage.sh random-uuid)"
/opt/kafka/bin/kafka-storage.sh format -t $KAFKA_CLUSTER_ID -c /tmp/strimzi.properties
fi

# add Strimzi kafka-oauth-* jars and their dependencies to classpath
export CLASSPATH="/opt/kafka/libs/strimzi/*:$CLASSPATH"

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,38 +4,39 @@
*/
package io.strimzi.kafka.oauth.common;

import com.fasterxml.jackson.databind.JsonNode;
import org.apache.kafka.common.security.oauthbearer.OAuthBearerToken;
import com.fasterxml.jackson.databind.node.ObjectNode;
import java.util.Set;

/**
* This extension of OAuthBearerToken provides a way to associate any additional information with the token
* at run time, that is cached for the duration of the client session.
*
* <p>
* This class is the only notion of client session that we can get. Kafka code holds on to it for as long as the session is alive,
* and then the object can be garbage collected.
*
* <p>
* Successful re-authentication starts a new session without disconnecting the current connection, avoiding the need to re-establish
* any existing TLS connection for example.
*
* <p>
* Token is instantiated during authentication, but the 'payload' methods can be accessed later by custom extensions.
* For example, it can be used by a custom authorizer to cache a parsed JWT token payload or to cache authorization grants for current session.
*/
public interface BearerTokenWithPayload extends OAuthBearerToken {

/**
* Get the usage dependent object previously associated with this instance by calling {@link BearerTokenWithPayload#setPayload(Object)}
* Get the usage dependent object previously associated with this instance by calling {@link BearerTokenWithPayload#setPayload(com.fasterxml.jackson.databind.JsonNode)}
*
* @return The associated object
*/
Object getPayload();
JsonNode getPayload();

/**
* Associate a usage dependent object with this instance
*
* @param payload The object to associate with this instance
*/
void setPayload(Object payload);
void setPayload(JsonNode payload);

/**
* Get groups associated with this token (principal).
Expand All @@ -50,7 +51,7 @@ public interface BearerTokenWithPayload extends OAuthBearerToken {
*
* @return Token content / details as a JSON object
*/
ObjectNode getJSON();
ObjectNode getClaimsJSON();

/**
* This method returns an id of the current instance of this object.
Expand Down
Loading