KoP supports authentication and SSL connection.
By default, KoP is installed with no encryption, authentication, or authorization. You can enable the authentication feature for KoP to improve security. Currently, this feature is only available in Java.
KoP's authentication mechanism uses Kafka SASL mechanisms and performs the authentication through Pulsar's token-based authentication. Consequently, if you want to enable the authentication feature for KoP, you need to enable authentication for the following components:
- Pulsar brokers
- KoP (some configurations of KoP rely on the configurations of Pulsar brokers)
- Kafka clients
Currently, KoP supports the following SASL mechanisms:
- `PLAIN`
- `OAUTHBEARER`

Both are based on JWT authentication, so you must configure `authenticationProviders` with `AuthenticationProviderToken`. See the following sections for more details.
If you want to enable the authentication feature for KoP using the `PLAIN` mechanism, follow the steps below.
- Enable authentication on Pulsar broker.

  For the `PLAIN` mechanism, the Kafka authentication is forwarded to the JWT authentication of Pulsar, so you need to configure the JWT authentication and set the following properties in the `conf/broker.conf` or `conf/standalone.conf` file.

  (1) Enable authentication for the Pulsar broker.

      authenticationEnabled=true
      authenticationProviders=org.apache.pulsar.broker.authentication.AuthenticationProviderToken

  (2) Enable authentication between Pulsar broker and KoP.

      brokerClientAuthenticationPlugin=org.apache.pulsar.client.impl.auth.AuthenticationToken
      brokerClientAuthenticationParameters=token:<token-of-super-user-role>
      superUserRoles=<super-user-roles>

  (3) Specify the key.

  For more information, see Enable Token Authentication on Brokers.

  - If you use a secret key, set the property as below.

        tokenSecretKey=file:///path/to/secret.key

    The key can also be passed inline.

        tokenSecretKey=data:;base64,FLFyW0oLJ2Fi22KKCm21J18mbAdztfSHN/lAT5ucEKU=

  - If you use a public/private key, set the property as below.

        tokenPublicKey=file:///path/to/public.key

- Enable authentication on KoP.

  Set the following property in the `conf/broker.conf` or `conf/standalone.conf` file.

      saslAllowedMechanisms=PLAIN

- Enable authentication on Kafka client.

  To forward your credentials, `SASL-PLAIN` is used on the Kafka client side. To enable `SASL-PLAIN`, you need to set the following properties through Kafka JAAS.

  | Property | Description | Example value |
  |---|---|---|
  | `username` | The username of Kafka JAAS is `tenant/namespace` or `tenant`, where Kafka's topics are stored in Pulsar. Note: In KoP 2.9.0 or higher, the username can be used with `kafkaEnableMultiTenantMetadata` to implement multi-tenancy for metadata. | empty string |
  | `password` | The password must be your token authentication parameters from Pulsar. The token can be created by Pulsar token tools. The role is the subject of the token. It is embedded in the created token, and the broker can get the role by parsing this token. | `token:xxx` |

```properties
# Use security.protocol=SASL_SSL instead if an SSL connection is used.
# (A trailing "# ..." on the same line would become part of the property value.)
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule \
required username="public/default" password="token:xxx";
```
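
The same client settings can also be assembled in code. The following is a minimal sketch, not KoP's own API: the class name `KopPlainClientConfig`, the method `build`, and the `localhost:9092` address are assumptions made for illustration. It only uses `java.util.Properties` and follows the JAAS format shown above, with the tenant/namespace as the username and `token:` plus the Pulsar JWT as the password.

```java
import java.util.Properties;

/** Illustrative helper (hypothetical name) that assembles Kafka client settings for the PLAIN mechanism. */
final class KopPlainClientConfig {

    /**
     * @param bootstrapServers address of a KoP listener, for example "localhost:9092" (assumed)
     * @param tenantNamespace  JAAS username: "tenant/namespace" or "tenant"
     * @param token            Pulsar JWT without the "token:" prefix
     */
    static Properties build(String bootstrapServers, String tenantNamespace, String token) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        // Switch to SASL_SSL when the client connects to an SSL listener.
        props.setProperty("security.protocol", "SASL_PLAINTEXT");
        props.setProperty("sasl.mechanism", "PLAIN");
        // The password is the Pulsar token authentication parameter: "token:" followed by the JWT.
        props.setProperty("sasl.jaas.config", String.format(
            "org.apache.kafka.common.security.plain.PlainLoginModule required "
                + "username=\"%s\" password=\"token:%s\";",
            tenantNamespace, token));
        return props;
    }

    public static void main(String[] args) {
        // "xxx" is a placeholder; a real token comes from the Pulsar token tools.
        Properties props = build("localhost:9092", "public/default", "xxx");
        props.forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

The resulting `Properties` object would typically be passed to a Kafka producer or consumer constructor after the usual serializer or deserializer settings are added.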
If you want to enable the authentication feature for KoP using the `OAUTHBEARER` mechanism, follow the steps below.
- Enable authentication on Pulsar broker.

  Set the following properties in the `conf/broker.conf` or `conf/standalone.conf` file.

  The properties here are the same as those of the `PLAIN` mechanism except `brokerClientAuthenticationPlugin` and `brokerClientAuthenticationParameters`.

  | Property | Description | Required or optional | Example value |
  |---|---|---|---|
  | `type` | OAuth 2.0 authentication type. The default value is `client_credentials`. | Optional | `client_credentials` |
  | `privateKey` | URL to a JSON credential file. The following pattern formats are supported: `file:///path/to/file`, `file:/path/to/file`, and `data:application/json;base64,<base64-encoded value>`. | Required | `file:///path/to/credentials_file.json` |
  | `issuerUrl` | URL of the authentication provider which allows the Pulsar client to obtain an access token. | Required | `https://accounts.google.com` |
  | `audience` | An OAuth 2.0 "resource server" identifier for the Pulsar cluster. | Optional | `https://broker.example.com` |
  | `scope` | The scope of the access request, expressed as a list of space-delimited, case-sensitive strings. | Optional | `api://pulsar-cluster-1/.default` |

      authenticationEnabled=true
      authenticationProviders=org.apache.pulsar.broker.authentication.AuthenticationProviderToken
      superUserRoles=<super-user-roles>
      brokerClientAuthenticationPlugin=org.apache.pulsar.client.impl.auth.oauth2.AuthenticationOAuth2
      brokerClientAuthenticationParameters={"type":"client_credentials","privateKey":"file:///path/to/credentials_file.json","issuerUrl":"<issuer-url>","audience":"<audience>"}
      tokenPublicKey=<token-public-key>

- Enable authentication on KoP.

  (1) Set the following property in the `conf/broker.conf` or `conf/standalone.conf` file.

      saslAllowedMechanisms=OAUTHBEARER

  (2) Specify the Kafka server callback handler.

  For the `OAUTHBEARER` mechanism, you can use `AuthenticationProviderToken` or customize your authentication provider to process the access tokens from the OAuth 2.0 server.

  KoP provides a built-in `AuthenticateCallbackHandler` that uses the authentication provider of Pulsar for authentication. You need to configure the following properties in the Pulsar broker's configuration file (e.g. `conf/broker.conf`).

      # Use KoP's built-in handler
      kopOauth2AuthenticateCallbackHandler=io.streamnative.pulsar.handlers.kop.security.oauth.OauthValidatorCallbackHandler
      # OauthValidatorCallbackHandler configuration file (Java Properties format)
      kopOauth2ConfigFile=conf/kop-handler.properties

  (3) Specify the authentication method name of the provider (that is, `oauth.validate.method`) in the `conf/kop-handler.properties` file. By default, it uses the `token` authentication method (`AuthenticationProviderToken`).

  - If you use `AuthenticationProviderToken`, set `oauth.validate.method` to `token` (since `AuthenticationProviderToken#getAuthMethodName()` returns `token`).
  - If you use other providers, set `oauth.validate.method` to the result of `getAuthMethodName()` of your `AuthenticationProvider`. For example, if your authentication provider in Pulsar is `AuthenticationProviderAthenz`, then set `oauth.validate.method=athenz`, as it has the following code in it:
```java
@Override
public String getAuthMethodName() {
    return "athenz";
}
```
- Enable authentication on Kafka client.

  (1) Install the KoP built-in callback handler to your local Maven repository.

  To get an access token from an OAuth 2.0 server, you need to use the KoP built-in callback handler instead of the Kafka login callback handler.

      mvn clean install -pl oauth-client -DskipTests

  (2) Add the following dependencies to the `pom.xml` file.

  For stable releases, the `pulsar.version` is the same as the `kop.version`.

      <dependency>
        <groupId>io.streamnative.pulsar.handlers</groupId>
        <artifactId>oauth-client</artifactId>
        <version>${kop.version}</version>
      </dependency>

  (3) Configure the producer or consumer with the following required properties.

  | Property | Description | Example value | Note |
  |---|---|---|---|
  | `sasl.login.callback.handler.class` | Class of the SASL login callback handler. | `io.streamnative.pulsar.handlers.kop.security.oauth.OauthLoginCallbackHandler` | Set this property to the same value as in the example below. |
  | `security.protocol` | Security protocol. | `SASL_PLAINTEXT` | |
  | `sasl.mechanism` | SASL mechanism. | `OAUTHBEARER` | |
  | `sasl.jaas.config` | JAAS configuration. | `org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule` | |
  | `oauth.issuer.url` | URL of the authentication provider which allows the Pulsar client to obtain an access token. | `https://accounts.google.com` | This property is the same as the `issuerUrl` property in Pulsar client credentials. |
  | `oauth.credentials.url` | URL to a JSON credentials file. The following pattern formats are supported: `file:///path/to/file`, `file:/path/to/file`, and `data:application/json;base64,[base64-encoded value]`. | `file:///path/to/credentials_file.json` | This property is the same as the `privateKey` property in Pulsar client credentials. |
  | `oauth.audience` | OAuth 2.0 "resource server" identifier for the Pulsar cluster. | `https://broker.example.com` | This property is the same as the `audience` property in Pulsar client credentials. |
  | `oauth.scope` | The scope of the access request, expressed as a list of space-delimited, case-sensitive strings. | `api://pulsar-cluster-1/.default` | This property is the same as the `scope` property in Pulsar client credentials. |

      sasl.login.callback.handler.class=io.streamnative.pulsar.handlers.kop.security.oauth.OauthLoginCallbackHandler
      # Use security.protocol=SASL_SSL instead if an SSL connection is used
      security.protocol=SASL_PLAINTEXT
      sasl.mechanism=OAUTHBEARER
      sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule \
         required oauth.issuer.url="https://accounts.google.com" \
         oauth.credentials.url="file:///path/to/credentials_file.json" \
         oauth.audience="https://broker.example.com";

  (4) Configure the `credentials_file.json` file. The `client_id` and `client_secret` fields are required. The `tenant` and `group_id` fields are optional. When you use the `group_id` field and set `kafkaEnableAuthorizationForceGroupIdCheck=true`, the client can only use this group ID to consume.

      {
        "client_id": "my-id",
        "client_secret": "my-secret",
        "tenant": "my-tenant",
        "group_id": "my-group-id"
      }

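
The client-side settings from steps (3) and (4) can be sketched in code as follows. This is an illustration under stated assumptions rather than part of KoP: the class `KopOauthClientConfig` and its methods are hypothetical names, and only the property keys and the `data:application/json;base64,` URL format come from the tables above. The helper `toDataUrl` shows how a credentials document might be passed inline instead of through a file.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.Properties;

/** Illustrative helper (hypothetical name) that assembles Kafka client settings for the OAUTHBEARER mechanism. */
final class KopOauthClientConfig {

    /** Encodes a credentials JSON document as a data: URL, one of the formats listed for oauth.credentials.url. */
    static String toDataUrl(String credentialsJson) {
        String encoded = Base64.getEncoder()
            .encodeToString(credentialsJson.getBytes(StandardCharsets.UTF_8));
        return "data:application/json;base64," + encoded;
    }

    /** Builds the client properties; credentialsUrl may be a file: URL or a data: URL. */
    static Properties build(String issuerUrl, String credentialsUrl, String audience) {
        Properties props = new Properties();
        props.setProperty("sasl.login.callback.handler.class",
            "io.streamnative.pulsar.handlers.kop.security.oauth.OauthLoginCallbackHandler");
        // Switch to SASL_SSL when the client connects to an SSL listener.
        props.setProperty("security.protocol", "SASL_PLAINTEXT");
        props.setProperty("sasl.mechanism", "OAUTHBEARER");
        props.setProperty("sasl.jaas.config",
            "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required"
                + " oauth.issuer.url=\"" + issuerUrl + "\""
                + " oauth.credentials.url=\"" + credentialsUrl + "\""
                + " oauth.audience=\"" + audience + "\";");
        return props;
    }

    public static void main(String[] args) {
        // Placeholder credentials taken from the example JSON in step (4).
        String json = "{\"client_id\":\"my-id\",\"client_secret\":\"my-secret\"}";
        Properties props = build("https://accounts.google.com", toDataUrl(json),
            "https://broker.example.com");
        System.out.println(props.getProperty("sasl.jaas.config"));
    }
}
```

The KoP `oauth-client` dependency from step (2) still has to be on the classpath at runtime, since the login callback handler class is loaded by name.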
KoP supports Confluent's Schema Registry since 2.11. See schema.md for a quick start.
When authentication is enabled for KoP, the Schema Registry also requires authentication from the Kafka client. You need to set the following properties on the client side.

    basic.auth.credentials.source=USER_INFO
    basic.auth.user.info=<tenant>:<token>

Since KoP reuses Pulsar's authentication providers for authentication, you can enable KoP's authentication and Pulsar authentication at the same time.
For example, you can enable KoP's `PLAIN` SASL mechanism and Pulsar's TLS authentication simultaneously.

    superUserRoles=admin
    authenticationEnabled=true
    # Enable both AuthenticationProviderToken and AuthenticationProviderTls here
    authenticationProviders=org.apache.pulsar.broker.authentication.AuthenticationProviderToken,org.apache.pulsar.broker.authentication.AuthenticationProviderTls
    # Enable JWT authentication at Pulsar side
    tokenPublicKey=/path/to/my-public.key
    # Enable PLAIN SASL mechanism for KoP, which reuses the JWT authentication
    saslAllowedMechanisms=PLAIN
    # Enable TLS authentication at Pulsar side
    tlsEnabled=true
    brokerServicePortTls=6651
    webServicePortTls=8443
    tlsRequireTrustedClientCertOnConnect=true
    tlsCertificateFilePath=/path/to/broker.cert.pem
    tlsKeyFilePath=/path/to/broker.key-pk8.pem
    tlsTrustCertsFilePath=/path/to/ca.cert.pem
    # Configure broker's built-in client
    brokerClientTlsEnabled=true
    brokerClientTlsTrustCertsFilePath=/path/to/ca.cert.pem
    brokerClientAuthenticationPlugin=org.apache.pulsar.client.impl.auth.AuthenticationTls
    brokerClientAuthenticationParameters=tlsCertFile:/path/to/admin.cert.pem,tlsKeyFile:/path/to/my-ca/admin.key-pk8.pem

Note: `tlsEnabled` is actually not required to enable TLS authentication at the broker side. However, for some legacy versions of KoP, you have to enable it.

The following versions of KoP do not need `tlsEnabled` to be enabled:
- Pulsar 3.x.y: KoP 3.0.0.1 or later
- Pulsar 2.11.x: KoP 2.11.1.2 or later
- Pulsar 2.10.x: KoP 2.10.4.3 or later
See Transport Encryption using TLS and Authentication using TLS for how to generate certificates and keys for TLS authentication.
To enable authorization on KoP, please make sure the authentication is enabled.
Note: For more information, see Authorization.
- Enable authorization and assign superusers for the Pulsar broker.

      authorizationEnabled=true

- Generate JWT tokens.

  A token is the credential associated with a user. The association is done through the `principal` or `role`. In the case of JWT tokens, this field is typically referred to as `subject`, though they are exactly the same concept. Use the following command to create a token with the `subject` field set.

      $ bin/pulsar tokens create --secret-key file:///path/to/secret.key \
          --subject <user-role>

  This command prints the token string on stdout.

- Grant permission to a specific role.

  The token itself does not have any permission associated. The authorization engine determines whether the token should have permissions or not. Once you have created the token, you can grant permission for this token to do certain actions.

  The following is an example.

      $ bin/pulsar-admin --auth-plugin "org.apache.pulsar.client.impl.auth.AuthenticationToken" --auth-params "token:<token-of-super-user-role>" \
          namespaces grant-permission <tenant>/<namespace> \
          --role <user-role> \
          --actions produce,consume

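
As the token generation step notes, the role that permissions are granted to is carried in the token's `subject` field. The sketch below illustrates that relationship by reading the `sub` claim from a JWT payload with the Java standard library only. It is a debugging aid under stated assumptions, not how the broker validates tokens: the class name `JwtSubjectPeek` is hypothetical, the signature is not verified, and the claim is extracted with a simple regular expression instead of a JSON parser.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Illustrative helper (hypothetical name): reads the "sub" claim of a JWT WITHOUT verifying its signature. */
final class JwtSubjectPeek {

    // Matches "sub":"value" in the decoded payload; sufficient for simple string claims without escapes.
    private static final Pattern SUB = Pattern.compile("\"sub\"\\s*:\\s*\"([^\"]*)\"");

    /** Returns the subject (the role) embedded in the token, or null if no "sub" claim is present. */
    static String subjectOf(String jwt) {
        String[] parts = jwt.split("\\.");
        if (parts.length < 2) {
            throw new IllegalArgumentException("Expected header.payload.signature");
        }
        // JWT segments are base64url-encoded, usually without padding.
        byte[] payloadBytes = Base64.getUrlDecoder().decode(parts[1]);
        String payload = new String(payloadBytes, StandardCharsets.UTF_8);
        Matcher matcher = SUB.matcher(payload);
        return matcher.find() ? matcher.group(1) : null;
    }

    public static void main(String[] args) {
        // Builds an unsigned demo token locally; real tokens come from "bin/pulsar tokens create".
        Base64.Encoder enc = Base64.getUrlEncoder().withoutPadding();
        String header = enc.encodeToString("{\"alg\":\"none\"}".getBytes(StandardCharsets.UTF_8));
        String payload = enc.encodeToString("{\"sub\":\"my-role\"}".getBytes(StandardCharsets.UTF_8));
        System.out.println(subjectOf(header + "." + payload + ".sig"));
    }
}
```

Checking the subject this way can help confirm that the value passed to `--role` in the grant command matches the role embedded in the token a client presents.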
KoP supports the following configuration types for Kafka listeners:
- PLAINTEXT
- SSL
Example

    listeners=PLAINTEXT://localhost:9092,SSL://localhost:9093

Tip: For how to configure SSL keys, see Kafka SSL.
The following example shows how to connect KoP through SSL.
- Create SSL related keys.

  This example creates the related CA and JKS files.

      # You need to input a password, for example "server-keystore".
      keytool -keystore server.keystore.jks -alias localhost -validity 365 -keyalg RSA -genkey
      # You need to input a password, for example "server".
      openssl req -new -x509 -keyout ca-key -out ca-cert -days 365
      # You need to input a password, for example "server-truststore"
      keytool -keystore server.truststore.jks -alias CARoot -import -file ca-cert
      # You need to input a password, for example "client-truststore"
      keytool -keystore client.truststore.jks -alias CARoot -import -file ca-cert
      # You must input the password of server.keystore.jks: "server-keystore"
      keytool -keystore server.keystore.jks -alias localhost -certreq -file cert-file
      # NOTE: the password followed by `-passin pass:` is the password of ca-cert: "server"
      openssl x509 -req -CA ca-cert -CAkey ca-key -in cert-file -out cert-signed -days 365 -CAcreateserial -passin pass:server
      # You must input the password of server.keystore.jks: "server-keystore"
      keytool -keystore server.keystore.jks -alias CARoot -import -file ca-cert
      # You must input the password of server.keystore.jks: "server-keystore"
      keytool -keystore server.keystore.jks -alias localhost -import -file cert-signed

  In the above example, four passwords are entered:

  - `server-keystore` for the `server.keystore.jks` file
  - `server` for the `ca-cert` and `ca-key` files
  - `server-truststore` for the `server.truststore.jks` file
  - `client-truststore` for the `client.truststore.jks` file

- Configure the KoP broker.

  In the StreamNative Platform configuration file (`${PLATFORM_HOME}/etc/pulsar/broker.conf` or `${PLATFORM_HOME}/etc/pulsar/standalone.conf`), add the related configurations that use the JKS files created in Step 1:

      listeners=PLAINTEXT://localhost:9092,SSL://localhost:9093
      # You need to use the full path of server.keystore.jks
      kopSslKeystoreLocation=server.keystore.jks
      kopSslKeystorePassword=server-keystore
      kopSslKeyPassword=server-keystore
      # You need to use the full path of server.truststore.jks
      kopSslTruststoreLocation=server.truststore.jks
      kopSslTruststorePassword=server-truststore

- Configure the Kafka client.

  (1) Prepare a file named `client-ssl.properties`. The file contains the following information.

      security.protocol=SSL
      # You need to use the full path of client.truststore.jks
      ssl.truststore.location=client.truststore.jks
      ssl.truststore.password=client-truststore
      # The identification algorithm must be empty
      ssl.endpoint.identification.algorithm=

  (2) Verify the console-producer and the console-consumer.

      kafka-console-producer.sh --broker-list localhost:9093 --topic test --producer.config client-ssl.properties
      kafka-console-consumer.sh --bootstrap-server localhost:9093 --topic test --consumer.config client-ssl.properties

Tip: For more information, see Configure Kafka client.
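
For a Java client, the contents of `client-ssl.properties` map directly onto client properties. The following sketch mirrors that file; the class name `KopSslClientConfig` and its `build` method are hypothetical, and the truststore path and password are the example values from the steps above.

```java
import java.util.Properties;

/** Illustrative helper (hypothetical name) that mirrors client-ssl.properties for a Java Kafka client. */
final class KopSslClientConfig {

    /**
     * @param bootstrapServers   address of the SSL listener, for example "localhost:9093"
     * @param truststorePath     full path of client.truststore.jks
     * @param truststorePassword password chosen when the truststore was created
     */
    static Properties build(String bootstrapServers, String truststorePath, String truststorePassword) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("security.protocol", "SSL");
        props.setProperty("ssl.truststore.location", truststorePath);
        props.setProperty("ssl.truststore.password", truststorePassword);
        // An empty value matches the client-ssl.properties example, which leaves the algorithm unset.
        props.setProperty("ssl.endpoint.identification.algorithm", "");
        return props;
    }

    public static void main(String[] args) {
        Properties props = build("localhost:9093", "/path/to/client.truststore.jks", "client-truststore");
        props.forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

Leaving the endpoint identification algorithm empty disables hostname verification, which suits the self-signed example certificate but is usually reconsidered for production deployments.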