Skip to content

Commit

Permalink
[fix][client] Set fields earlier for correct ClientCnx initialization (
Browse files Browse the repository at this point in the history
…#19327)

(cherry picked from commit 3d8b52a)
  • Loading branch information
michaeljmarshall committed Jan 31, 2023
1 parent 80bb1b7 commit f0f9b90
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ private CompletableFuture<ClientCnx> createConnection(InetSocketAddress logicalA
final CompletableFuture<ClientCnx> cnxFuture = new CompletableFuture<>();

// Trigger async connect to broker
createConnection(physicalAddress).thenAccept(channel -> {
createConnection(logicalAddress, physicalAddress).thenAccept(channel -> {
log.info("[{}] Connected to server", channel);

channel.closeFuture().addListener(v -> {
Expand All @@ -201,16 +201,6 @@ private CompletableFuture<ClientCnx> createConnection(InetSocketAddress logicalA
return;
}

if (!logicalAddress.equals(physicalAddress)) {
// We are connecting through a proxy. We need to set the target broker in the ClientCnx object so that
// it can be specified when sending the CommandConnect.
// That phase will happen in the ClientCnx.connectionActive() which will be invoked immediately after
// this method.
cnx.setTargetBroker(logicalAddress);
}

cnx.setRemoteHostName(physicalAddress.getHostString());

cnx.connectionFuture().thenRun(() -> {
if (log.isDebugEnabled()) {
log.debug("[{}] Connection handshake completed", cnx.channel());
Expand Down Expand Up @@ -238,19 +228,20 @@ private CompletableFuture<ClientCnx> createConnection(InetSocketAddress logicalA
/**
* Resolve DNS asynchronously and attempt to connect to any IP address returned by DNS server.
*/
private CompletableFuture<Channel> createConnection(InetSocketAddress unresolvedAddress) {
private CompletableFuture<Channel> createConnection(InetSocketAddress logicalAddress,
InetSocketAddress unresolvedPhysicalAddress) {
CompletableFuture<List<InetSocketAddress>> resolvedAddress;
try {
if (isSniProxy) {
URI proxyURI = new URI(clientConfig.getProxyServiceUrl());
resolvedAddress =
resolveName(InetSocketAddress.createUnresolved(proxyURI.getHost(), proxyURI.getPort()));
} else {
resolvedAddress = resolveName(unresolvedAddress);
resolvedAddress = resolveName(unresolvedPhysicalAddress);
}
return resolvedAddress.thenCompose(
inetAddresses -> connectToResolvedAddresses(inetAddresses.iterator(),
isSniProxy ? unresolvedAddress : null));
inetAddresses -> connectToResolvedAddresses(logicalAddress, inetAddresses.iterator(),
isSniProxy ? unresolvedPhysicalAddress : null));
} catch (URISyntaxException e) {
log.error("Invalid Proxy url {}", clientConfig.getProxyServiceUrl(), e);
return FutureUtil
Expand All @@ -262,17 +253,19 @@ private CompletableFuture<Channel> createConnection(InetSocketAddress unresolved
* Try to connect to a sequence of IP addresses until a successful connection can be made, or fail if no
* address is working.
*/
private CompletableFuture<Channel> connectToResolvedAddresses(Iterator<InetSocketAddress> unresolvedAddresses,
private CompletableFuture<Channel> connectToResolvedAddresses(InetSocketAddress logicalAddress,
Iterator<InetSocketAddress> resolvedPhysicalAddress,
InetSocketAddress sniHost) {
CompletableFuture<Channel> future = new CompletableFuture<>();

// Successfully connected to server
connectToAddress(unresolvedAddresses.next(), sniHost)
connectToAddress(logicalAddress, resolvedPhysicalAddress.next(), sniHost)
.thenAccept(future::complete)
.exceptionally(exception -> {
if (unresolvedAddresses.hasNext()) {
if (resolvedPhysicalAddress.hasNext()) {
// Try next IP address
connectToResolvedAddresses(unresolvedAddresses, sniHost).thenAccept(future::complete)
connectToResolvedAddresses(logicalAddress, resolvedPhysicalAddress, sniHost)
.thenAccept(future::complete)
.exceptionally(ex -> {
// This is already unwinding the recursive call
future.completeExceptionally(ex);
Expand Down Expand Up @@ -303,14 +296,20 @@ CompletableFuture<List<InetSocketAddress>> resolveName(InetSocketAddress unresol
/**
* Attempt to establish a TCP connection to an already resolved single IP address.
*/
private CompletableFuture<Channel> connectToAddress(InetSocketAddress remoteAddress, InetSocketAddress sniHost) {
private CompletableFuture<Channel> connectToAddress(InetSocketAddress logicalAddress,
InetSocketAddress physicalAddress, InetSocketAddress sniHost) {
if (clientConfig.isUseTls()) {
return toCompletableFuture(bootstrap.register())
.thenCompose(channel -> channelInitializerHandler
.initTls(channel, sniHost != null ? sniHost : remoteAddress))
.thenCompose(channel -> toCompletableFuture(channel.connect(remoteAddress)));
.initTls(channel, sniHost != null ? sniHost : physicalAddress))
.thenCompose(ch ->
channelInitializerHandler.initializeClientCnx(ch, logicalAddress, physicalAddress))
.thenCompose(channel -> toCompletableFuture(channel.connect(physicalAddress)));
} else {
return toCompletableFuture(bootstrap.connect(remoteAddress));
return toCompletableFuture(bootstrap.register())
.thenCompose(ch ->
channelInitializerHandler.initializeClientCnx(ch, logicalAddress, physicalAddress))
.thenCompose(channel -> toCompletableFuture(channel.connect(physicalAddress)));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.util.SecurityUtility;
import org.apache.pulsar.common.util.keystoretls.NettySSLContextAutoRefreshBuilder;
import org.apache.pulsar.common.util.netty.NettyFutureUtil;

import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
Expand Down Expand Up @@ -176,5 +177,29 @@ CompletableFuture<Channel> initTls(Channel ch, InetSocketAddress sniHost) {

return initTlsFuture;
}

CompletableFuture<Channel> initializeClientCnx(Channel ch,
InetSocketAddress logicalAddress,
InetSocketAddress resolvedPhysicalAddress) {
return NettyFutureUtil.toCompletableFuture(ch.eventLoop().submit(() -> {
final ClientCnx cnx = (ClientCnx) ch.pipeline().get("handler");

if (cnx == null) {
throw new IllegalStateException("Missing ClientCnx. This should not happen.");
}

// Need to do our own equality because the physical address is resolved already
if (!(logicalAddress.getHostString().equalsIgnoreCase(resolvedPhysicalAddress.getHostString())
&& logicalAddress.getPort() == resolvedPhysicalAddress.getPort())) {
// We are connecting through a proxy. We need to set the target broker in the ClientCnx object so that
// it can be specified when sending the CommandConnect.
cnx.setTargetBroker(logicalAddress);
}

cnx.setRemoteHostName(resolvedPhysicalAddress.getHostString());

return ch;
}));
}
}

0 comments on commit f0f9b90

Please sign in to comment.