Skip to content

Commit

Permalink
Add ssl listeners and sslCtxRefresher in channelInitializer (apache#45)
Browse files Browse the repository at this point in the history
Add ssl listeners and sslCtxRefresher in channelInitializer
  • Loading branch information
jiazhai authored and sijie committed Nov 13, 2019
1 parent 117f367 commit 3424abc
Show file tree
Hide file tree
Showing 18 changed files with 871 additions and 111 deletions.
57 changes: 57 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -125,3 +125,60 @@ bin/java-consumer-demo.sh
```
bin/java-producer-demo.sh
```

#### SSL Connection

KOP support Kafka listeners config of type "PLAINTEXT" and "SSL".
You could set config like `listeners=PLAINTEXT://localhost:9092,SSL://localhost:9093`.
Please reference [Kafka SSL document](https://kafka.apache.org/documentation/#security_ssl) for how to config SSL keys.
Here is some steps that you need to be able to connect KOP through SSL.

1. create SSL related Keys.

Here is an example of a bash script to create related CA and jks files.
```access transformers
#!/bin/bash
#Step 1
keytool -keystore server.keystore.jks -alias localhost -validity 365 -keyalg RSA -genkey
#Step 2
openssl req -new -x509 -keyout ca-key -out ca-cert -days 365
keytool -keystore server.truststore.jks -alias CARoot -import -file ca-cert
keytool -keystore client.truststore.jks -alias CARoot -import -file ca-cert
#Step 3
keytool -keystore server.keystore.jks -alias localhost -certreq -file cert-file
openssl x509 -req -CA ca-cert -CAkey ca-key -in cert-file -out cert-signed -days 365 -CAcreateserial -passin pass:test1234
keytool -keystore server.keystore.jks -alias CARoot -import -file ca-cert
keytool -keystore server.keystore.jks -alias localhost -import -file cert-signed
```

2. config KOP Broker.

In configration file, e.g. [`kop_standalone.conf`](https://github.com/streamnative/kop/blob/master/conf/kop_standalone.conf),
Add related configurations that using the jks configs that create in step1:
```access transformers
listeners=PLAINTEXT://localhost:9092,SSL://localhost:9093
kopSslKeystoreLocation=/Users/kop/server.keystore.jks
kopSslKeystorePassword=test1234
kopSslKeyPassword=test1234
kopSslTruststoreLocation=/Users/kop/server.truststore.jks
kopSslTruststorePassword=test1234
```

3. config kafka clients

This is similar to [Kafka client config doc](https://kafka.apache.org/documentation/#security_configclients).

Prepare a file named `client-ssl.properties`, which contains:
```
security.protocol=SSL
ssl.truststore.location=client.truststore.jks
ssl.truststore.password=test1234
ssl.endpoint.identification.algorithm=
```

And verify us console-producer and console-consumer:
```access transformers
kafka-console-producer.sh --broker-list localhost:9093 --topic test --producer.config client-ssl.properties
kafka-console-consumer.sh --bootstrap-server localhost:9093 --topic test --consumer.config client-ssl.properties
```
2 changes: 1 addition & 1 deletion conf/kop_standalone.conf
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,12 @@

### --- Kafka broker settings --- ###

kafkaServicePort=9092

enableGroupCoordinator=true

messagingProtocols=kafka

listeners=PLAINTEXT://localhost:9092,SSL://localhost:9093
### --- General broker settings --- ###

# Zookeeper quorum connection string
Expand Down
24 changes: 20 additions & 4 deletions src/main/java/io/streamnative/kop/KafkaChannelInitializer.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,20 +13,25 @@
*/
package io.streamnative.kop;

import static io.streamnative.kop.KafkaProtocolHandler.TLS_HANDLER;

import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
import io.netty.handler.ssl.SslHandler;
import io.streamnative.kop.coordinator.group.GroupCoordinator;
import io.streamnative.kop.utils.ssl.SSLUtils;
import lombok.Getter;
import org.apache.pulsar.broker.PulsarService;
import org.eclipse.jetty.util.ssl.SslContextFactory;

/**
* A channel initializer that initialize channels for kafka protocol.
*/
public class KafkaChannelInitializer extends ChannelInitializer<SocketChannel> {

static final int MAX_FRAME_LENGTH = 100 * 1024 * 1024; // 100MB
public static final int MAX_FRAME_LENGTH = 100 * 1024 * 1024; // 100MB

@Getter
private final PulsarService pulsarService;
Expand All @@ -36,9 +41,10 @@ public class KafkaChannelInitializer extends ChannelInitializer<SocketChannel> {
private final KafkaTopicManager kafkaTopicManager;
@Getter
private final GroupCoordinator groupCoordinator;
// TODO: handle TLS -- https://github.com/streamnative/kop/issues/2
// can turn into get this config from kafkaConfig.
@Getter
private final boolean enableTls;
@Getter
private final SslContextFactory sslContextFactory;

public KafkaChannelInitializer(PulsarService pulsarService,
KafkaServiceConfiguration kafkaConfig,
Expand All @@ -51,14 +57,24 @@ public KafkaChannelInitializer(PulsarService pulsarService,
this.kafkaTopicManager = kafkaTopicManager;
this.groupCoordinator = groupCoordinator;
this.enableTls = enableTLS;

if (enableTls) {
sslContextFactory = SSLUtils.createSslContextFactory(kafkaConfig);
} else {
sslContextFactory = null;
}
}

@Override
protected void initChannel(SocketChannel ch) throws Exception {
if (this.enableTls) {
ch.pipeline().addLast(TLS_HANDLER, new SslHandler(SSLUtils.createSslEngine(sslContextFactory)));
}
ch.pipeline().addLast(new LengthFieldPrepender(4));
ch.pipeline().addLast("frameDecoder",
new LengthFieldBasedFrameDecoder(MAX_FRAME_LENGTH, 0, 4, 0, 4));
ch.pipeline().addLast("handler",
new KafkaRequestHandler(pulsarService, kafkaConfig, kafkaTopicManager, groupCoordinator));
new KafkaRequestHandler(pulsarService, kafkaConfig, kafkaTopicManager, groupCoordinator, enableTls));
}

}
105 changes: 92 additions & 13 deletions src/main/java/io/streamnative/kop/KafkaProtocolHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import lombok.Getter;
Expand All @@ -37,6 +36,7 @@
import org.apache.kafka.common.utils.Time;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.ServiceConfigurationUtils;
import org.apache.pulsar.broker.protocol.ProtocolHandler;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.client.admin.PulsarAdmin;
Expand All @@ -58,6 +58,19 @@
public class KafkaProtocolHandler implements ProtocolHandler {

public static final String PROTOCOL_NAME = "kafka";
public static final String SSL_PREFIX = "SSL://";
public static final String PLAINTEXT_PREFIX = "PLAINTEXT://";
public static final String LISTENER_DEL = ",";
public static final String TLS_HANDLER = "tls";
public static final String LISTENER_PATTEN = "^(PLAINTEXT?|SSL)://[-a-zA-Z0-9+&@#/%?=~_|!:,.;]*[-0-9+]";

/**
* Kafka Listener Type.
*/
public enum ListenerType {
PLAINTEXT,
SSL
}

@Getter
private KafkaServiceConfiguration kafkaConfig;
Expand All @@ -67,6 +80,9 @@ public class KafkaProtocolHandler implements ProtocolHandler {
private KafkaTopicManager kafkaTopicManager;
@Getter
private GroupCoordinator groupCoordinator;
@Getter
private String bindAddress;


@Override
public String protocolName() {
Expand All @@ -88,12 +104,16 @@ public void initialize(ServiceConfiguration conf) throws Exception {
// when loaded with PulsarService as NAR, `conf` will be type of ServiceConfiguration
kafkaConfig = ConfigurationUtils.create(conf.getProperties(), KafkaServiceConfiguration.class);
}
this.bindAddress = ServiceConfigurationUtils.getDefaultOrConfiguredAddress(kafkaConfig.getBindAddress());
}

// This method is called after initialize
@Override
public String getProtocolDataToAdvertise() {
// TODO: support data register, when do https://github.com/streamnative/kop/issues/2
return "mock-data-for-kafka";
if (log.isDebugEnabled()) {
log.debug("Get configured listeners", kafkaConfig.getListeners());
}
return kafkaConfig.getListeners();
}

@Override
Expand All @@ -114,30 +134,50 @@ public void start(BrokerService service) {
}
}

// this is called after init, and with kafkaTopicManager, kafkaConfig, brokerService all set.
// this is called after initialize, and with kafkaTopicManager, kafkaConfig, brokerService all set.
@Override
public Map<InetSocketAddress, ChannelInitializer<SocketChannel>> newChannelInitializers() {
checkState(kafkaConfig != null);
checkState(kafkaConfig.getListeners() != null);
checkState(brokerService != null);
checkState(kafkaTopicManager != null);
if (kafkaConfig.isEnableGroupCoordinator()) {
checkState(groupCoordinator != null);
}

Optional<Integer> port = kafkaConfig.getKafkaServicePort();
InetSocketAddress addr = new InetSocketAddress(brokerService.pulsar().getBindAddress(), port.get());
String listeners = kafkaConfig.getListeners();
String[] parts = listeners.split(LISTENER_DEL);

try {
Map<InetSocketAddress, ChannelInitializer<SocketChannel>> initializerMap =
ImmutableMap.<InetSocketAddress, ChannelInitializer<SocketChannel>>builder()
.put(addr,
(new KafkaChannelInitializer(brokerService.pulsar(),
ImmutableMap.Builder<InetSocketAddress, ChannelInitializer<SocketChannel>> builder =
ImmutableMap.<InetSocketAddress, ChannelInitializer<SocketChannel>>builder();

for (String listener: parts) {
if (listener.startsWith(PLAINTEXT_PREFIX)) {
builder.put(
// TODO: consider using the address in the listener as the bind address.
// https://github.com/streamnative/kop/issues/46
new InetSocketAddress(brokerService.pulsar().getBindAddress(), getListenerPort(listener)),
new KafkaChannelInitializer(brokerService.pulsar(),
kafkaConfig,
kafkaTopicManager,
groupCoordinator,
false));
} else if (listener.startsWith(SSL_PREFIX)) {
builder.put(
new InetSocketAddress(brokerService.pulsar().getBindAddress(), getListenerPort(listener)),
new KafkaChannelInitializer(brokerService.pulsar(),
kafkaConfig,
kafkaTopicManager,
groupCoordinator,
false)))
.build();
return initializerMap;
true));
} else {
log.error("Kafka listener {} not supported. supports {} and {}",
listener, PLAINTEXT_PREFIX, SSL_PREFIX);
}
}

return builder.build();
} catch (Exception e){
log.error("KafkaProtocolHandler newChannelInitializers failed with", e);
return null;
Expand Down Expand Up @@ -256,4 +296,43 @@ private String createKafkaOffsetsTopic(BrokerService service) throws PulsarServe

return offsetsTopic;
}

public static int getListenerPort(String listener) {
checkState(listener.matches(LISTENER_PATTEN), "listener not match patten");

int lastIndex = listener.lastIndexOf(':');
return Integer.parseInt(listener.substring(lastIndex + 1));
}

public static int getListenerPort(String listeners, ListenerType type) {
String[] parts = listeners.split(LISTENER_DEL);

for (String listener: parts) {
if (type == ListenerType.PLAINTEXT && listener.startsWith(PLAINTEXT_PREFIX)) {
return getListenerPort(listener);
}
if (type == ListenerType.SSL && listener.startsWith(SSL_PREFIX)) {
return getListenerPort(listener);
}
}

log.error("KafkaProtocolHandler listeners {} not contains type {}", listeners, type);
return -1;
}

public static String getBrokerUrl(String listeners, Boolean tlsEnabled) {
String[] parts = listeners.split(LISTENER_DEL);

for (String listener: parts) {
if (tlsEnabled && listener.startsWith(SSL_PREFIX)) {
return listener;
}
if (!tlsEnabled && listener.startsWith(PLAINTEXT_PREFIX)) {
return listener;
}
}

log.error("listener {} not contains a valid SSL or PLAINTEXT address", listeners);
return null;
}
}
Loading

0 comments on commit 3424abc

Please sign in to comment.