Skip to content
This repository has been archived by the owner on Jan 24, 2024. It is now read-only.

[security] broker to broker hostname verification #1983

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,15 @@
import static io.streamnative.pulsar.handlers.kop.KafkaChannelInitializer.MAX_FRAME_LENGTH;
import static io.streamnative.pulsar.handlers.kop.KafkaProtocolHandler.TLS_HANDLER;

import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
import io.netty.handler.ssl.SslHandler;
import io.streamnative.pulsar.handlers.kop.KafkaServiceConfiguration;
import io.streamnative.pulsar.handlers.kop.utils.ssl.SSLUtils;
import java.util.concurrent.CompletableFuture;
import org.eclipse.jetty.util.ssl.SslContextFactory;

/**
Expand Down Expand Up @@ -50,13 +52,27 @@ public TransactionMarkerChannelInitializer(KafkaServiceConfiguration kafkaConfig

@Override
protected void initChannel(SocketChannel ch) throws Exception {
if (this.enableTls) {
ch.pipeline().addLast(TLS_HANDLER,
new SslHandler(SSLUtils.createClientSslEngine(sslContextFactory)));
}
ch.pipeline().addLast(lengthFieldPrepender);
ch.pipeline().addLast("frameDecoder",
new LengthFieldBasedFrameDecoder(MAX_FRAME_LENGTH, 0, 4, 0, 4));
ch.pipeline().addLast("txnHandler", new TransactionMarkerChannelHandler(transactionMarkerChannelManager));
}

protected CompletableFuture<Channel> initTls(Channel ch, String host, int port) {
if (this.enableTls) {
CompletableFuture<Channel> initTlsFuture = new CompletableFuture<>();
ch.eventLoop().execute(() -> {
try {
ch.pipeline().addFirst(TLS_HANDLER,
new SslHandler(SSLUtils.createClientSslEngine(sslContextFactory, host, port)));
initTlsFuture.complete(ch);
} catch (Throwable t) {
initTlsFuture.completeExceptionally(t);
}
});
return initTlsFuture;
} else {
return CompletableFuture.completedFuture(ch);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,8 @@ public class TransactionMarkerChannelManager {

private final Bootstrap bootstrap;

private final TransactionMarkerChannelInitializer transactionMarkerChannelInitializer;

private final Map<InetSocketAddress, CompletableFuture<TransactionMarkerChannelHandler>> handlerMap =
new ConcurrentHashMap<>();

Expand Down Expand Up @@ -182,7 +184,9 @@ public TransactionMarkerChannelManager(String tenant,
bootstrap = new Bootstrap();
bootstrap.group(eventLoopGroup);
bootstrap.channel(EventLoopUtil.getClientSocketChannelClass(eventLoopGroup));
bootstrap.handler(new TransactionMarkerChannelInitializer(kafkaConfig, enableTls, this));
transactionMarkerChannelInitializer =
new TransactionMarkerChannelInitializer(kafkaConfig, enableTls, this);
bootstrap.handler(transactionMarkerChannelInitializer);
}

public CompletableFuture<TransactionMarkerChannelHandler> getChannel(InetSocketAddress socketAddress) {
Expand All @@ -192,7 +196,10 @@ public CompletableFuture<TransactionMarkerChannelHandler> getChannel(InetSocketA
ensureDrainQueuedTransactionMarkersActivity();
return handlerMap.computeIfAbsent(socketAddress, address -> {
CompletableFuture<TransactionMarkerChannelHandler> handlerFuture = new CompletableFuture<>();
ChannelFutures.toCompletableFuture(bootstrap.connect(socketAddress))
ChannelFutures.toCompletableFuture(bootstrap.register())
.thenCompose(ch -> transactionMarkerChannelInitializer
.initTls(ch, socketAddress.getHostString(), socketAddress.getPort()))
.thenCompose(ch -> ChannelFutures.toCompletableFuture(ch.connect(socketAddress)))
.thenAccept(channel -> {
handlerFuture.complete(
(TransactionMarkerChannelHandler) channel.pipeline().get("txnHandler"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -257,9 +257,10 @@ public static SSLEngine createSslEngine(SslContextFactory.Server sslContextFacto
return engine;
}

public static SSLEngine createClientSslEngine(SslContextFactory.Client sslContextFactory) throws Exception {
public static SSLEngine createClientSslEngine(SslContextFactory.Client sslContextFactory,
String host, int port) throws Exception {
sslContextFactory.start();
SSLEngine engine = sslContextFactory.newSSLEngine();
SSLEngine engine = sslContextFactory.newSSLEngine(host, port);
engine.setUseClientMode(true);

return engine;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,20 +84,22 @@ public KafkaSSLChannelTest(final String entryFormat, boolean withCertHost) {
* @param withCertHost the keystore with certHost or not.
*/
private void setSslConfigurations(boolean withCertHost) {
String path = "./src/test/resources/ssl/certificate" + (withCertHost ? "2" : "") + "/";
if (!withCertHost) {
String path = "./src/test/resources/ssl/certificate" + (withCertHost ? "" : "2") + "/";
if (withCertHost) {
this.kopSslKeystoreLocation = path + "broker.keystore.jks";
this.kopSslKeystorePassword = "broker";
this.kopSslTruststoreLocation = path + "broker.truststore.jks";
this.kopSslTruststorePassword = "broker";
this.kopClientTruststoreLocation = path + "broker.truststore.jks";
this.kopClientTruststorePassword = "broker";
} else {
this.kopSslKeystoreLocation = path + "server.keystore.jks";
this.kopSslKeystorePassword = "server";
this.kopSslTruststoreLocation = path + "server.truststore.jks";
this.kopSslTruststorePassword = "server";
kopClientTruststorePassword = "client";
kopClientTruststoreLocation = path + "client.truststore.jks";
}
kopClientTruststoreLocation = path + "client.truststore.jks";
kopClientTruststorePassword = "client";
}

@Factory
Expand All @@ -111,6 +113,10 @@ public static Object[] instances() {
}

protected void sslSetUpForBroker() throws Exception {

// require TLS verification when hostname is on certificate
conf.setTlsHostnameVerificationEnabled(withCertHost);

conf.setKafkaTransactionCoordinatorEnabled(true);
conf.setKopTlsEnabledWithBroker(true);
conf.setKopSslKeystoreType("JKS");
Expand Down Expand Up @@ -153,7 +159,7 @@ public void testKafkaProduceSSL() throws Exception {
String messageStrPrefix = "Message_Kop_KafkaProduceKafkaConsume_" + partitionNumber + "_";

@Cleanup
SslProducer kProducer = new SslProducer(topicName, getKafkaBrokerPortTls(),
SslProducer kProducer = new SslProducer(topicName, getKafkaBrokerPortTls(), withCertHost,
kopClientTruststoreLocation, kopClientTruststorePassword);

for (int i = 0; i < totalMsgs; i++) {
Expand Down Expand Up @@ -188,7 +194,8 @@ public static class SslProducer implements Closeable {
private final KafkaProducer<Integer, String> producer;
private final String topic;

public SslProducer(String topic, int port, String truststoreLocation, String truststorePassword) {
public SslProducer(String topic, int port, boolean withCertHost, String truststoreLocation,
String truststorePassword) {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost" + ":" + port);
props.put(ProducerConfig.CLIENT_ID_CONFIG, "DemoKafkaOnPulsarProducerSSL");
Expand All @@ -201,7 +208,7 @@ public SslProducer(String topic, int port, String truststoreLocation, String tru
props.put("ssl.truststore.password", truststorePassword);

// default is https, here need to set empty.
props.put("ssl.endpoint.identification.algorithm", "");
props.put("ssl.endpoint.identification.algorithm", withCertHost ? "HTTPS" : "");

producer = new KafkaProducer<>(props);
this.topic = topic;
Expand Down Expand Up @@ -233,7 +240,7 @@ public void basicProduceAndConsumeWithTxTest() throws Exception {
producerProps.put("ssl.truststore.password", kopClientTruststorePassword);

// default is https, here need to set empty.
producerProps.put("ssl.endpoint.identification.algorithm", "");
producerProps.put("ssl.endpoint.identification.algorithm", withCertHost ? "HTTPS" : "");

@Cleanup
KafkaProducer<Integer, String> producer = new KafkaProducer<>(producerProps);
Expand Down Expand Up @@ -292,7 +299,7 @@ private void consumeTxData(String kafkaServer, String topicName, String isolatio
consumerProps.put("ssl.truststore.password", kopClientTruststorePassword);

// default is https, here need to set empty.
consumerProps.put("ssl.endpoint.identification.algorithm", "");
consumerProps.put("ssl.endpoint.identification.algorithm", withCertHost ? "HTTPS" : "");

@Cleanup
KafkaConsumer<Integer, String> consumer = new KafkaConsumer<>(consumerProps);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@

import static java.nio.charset.StandardCharsets.UTF_8;

import io.streamnative.pulsar.handlers.kop.KafkaServiceConfiguration;
import io.streamnative.pulsar.handlers.kop.KopProtocolHandlerTestBase;
import java.io.Closeable;
import java.util.Properties;
Expand Down Expand Up @@ -47,14 +46,11 @@ public class KafkaSSLChannelWithClientAuthTest extends KopProtocolHandlerTestBas
static {
final HostnameVerifier defaultHostnameVerifier = javax.net.ssl.HttpsURLConnection.getDefaultHostnameVerifier();

final HostnameVerifier localhostAcceptedHostnameVerifier = new HostnameVerifier() {

public boolean verify(String hostname, javax.net.ssl.SSLSession sslSession) {
if (hostname.equals("localhost")) {
return true;
}
return defaultHostnameVerifier.verify(hostname, sslSession);
final HostnameVerifier localhostAcceptedHostnameVerifier = (hostname, sslSession) -> {
if (hostname.equals("localhost")) {
return true;
}
return defaultHostnameVerifier.verify(hostname, sslSession);
};
javax.net.ssl.HttpsURLConnection.setDefaultHostnameVerifier(localhostAcceptedHostnameVerifier);
}
Expand All @@ -71,13 +67,13 @@ public static Object[] instances() {
};
}

protected void sslSetUpForBroker() throws Exception {
((KafkaServiceConfiguration) conf).setKopSslClientAuth("required");
((KafkaServiceConfiguration) conf).setKopSslKeystoreType("JKS");
((KafkaServiceConfiguration) conf).setKopSslKeystoreLocation(kopSslKeystoreLocation);
((KafkaServiceConfiguration) conf).setKopSslKeystorePassword(kopSslKeystorePassword);
((KafkaServiceConfiguration) conf).setKopSslTruststoreLocation(kopSslTruststoreLocation);
((KafkaServiceConfiguration) conf).setKopSslTruststorePassword(kopSslTruststorePassword);
protected void sslSetUpForBroker() {
conf.setKopSslClientAuth("required");
conf.setKopSslKeystoreType("JKS");
conf.setKopSslKeystoreLocation(kopSslKeystoreLocation);
conf.setKopSslKeystorePassword(kopSslKeystorePassword);
conf.setKopSslTruststoreLocation(kopSslTruststoreLocation);
conf.setKopSslTruststorePassword(kopSslTruststorePassword);
}

@BeforeMethod
Expand Down Expand Up @@ -161,9 +157,6 @@ public SslProducer(String topic, int port) {
props.put("ssl.keystore.location", "./src/test/resources/ssl/certificate/client.keystore.jks");
props.put("ssl.keystore.password", "client");

// default is https, here need to set empty.
props.put("ssl.endpoint.identification.algorithm", "");

producer = new KafkaProducer<>(props);
this.topic = topic;
}
Expand Down
Binary file not shown.
Binary file modified tests/src/test/resources/ssl/certificate/broker.truststore.jks
Binary file not shown.
Binary file not shown.
Binary file modified tests/src/test/resources/ssl/certificate/client.truststore.jks
Binary file not shown.
Loading