Skip to content

Commit

Permalink
Use NettyFutureUtil to manage future
Browse files Browse the repository at this point in the history
  • Loading branch information
michaeljmarshall committed Jan 26, 2023
1 parent dd40ff9 commit 684a135
Showing 1 changed file with 5 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.util.SecurityUtility;
import org.apache.pulsar.common.util.keystoretls.NettySSLContextAutoRefreshBuilder;
import org.apache.pulsar.common.util.netty.NettyFutureUtil;

@Slf4j
public class PulsarChannelInitializer extends ChannelInitializer<SocketChannel> {
Expand Down Expand Up @@ -213,13 +214,11 @@ CompletableFuture<Channel> initSocks5IfConfig(Channel ch) {
CompletableFuture<Channel> initializeClientCnx(Channel ch,
InetSocketAddress logicalAddress,
InetSocketAddress resolvedPhysicalAddress) {
CompletableFuture<Channel> initFuture = new CompletableFuture<>();
ch.eventLoop().execute(() -> {
return NettyFutureUtil.toCompletableFuture(ch.eventLoop().submit(() -> {
final ClientCnx cnx = (ClientCnx) ch.pipeline().get("handler");

if (cnx == null) {
initFuture
.completeExceptionally(new IllegalStateException("Missing ClientCnx. This should not happen."));
throw new IllegalStateException("Missing ClientCnx. This should not happen.");
}

// Need to do our own equality because the physical address is resolved already
Expand All @@ -232,10 +231,8 @@ CompletableFuture<Channel> initializeClientCnx(Channel ch,

cnx.setRemoteHostName(resolvedPhysicalAddress.getHostString());

initFuture.complete(ch);
});

return initFuture;
return ch;
}));
}
}

0 comments on commit 684a135

Please sign in to comment.