Skip to content

Commit

Permalink
[fix][client] Use dedicated executor for requests in BinaryProtoLooku…
Browse files Browse the repository at this point in the history
…pService (apache#23378) (#28)

* [fix][client] Use dedicated executor for requests in BinaryProtoLookupService (apache#23378)

Signed-off-by: Zixuan Liu <nodeces@gmail.com>

(cherry picked from commit f98297f)
Signed-off-by: Zixuan Liu <nodeces@gmail.com>

* Fix aircompressor license

Signed-off-by: Zixuan Liu <nodeces@gmail.com>

---------

Signed-off-by: Zixuan Liu <nodeces@gmail.com>
  • Loading branch information
nodece authored Dec 5, 2024
1 parent a1e5016 commit 038c42c
Show file tree
Hide file tree
Showing 5 changed files with 143 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,7 @@ public class PulsarService implements AutoCloseable, ShutdownService {
private final ExecutorProvider brokerClientSharedExternalExecutorProvider;
private final ScheduledExecutorProvider brokerClientSharedScheduledExecutorProvider;
private final Timer brokerClientSharedTimer;
private final ExecutorProvider brokerClientSharedLookupExecutorProvider;

private MetricsGenerator metricsGenerator;

Expand Down Expand Up @@ -342,6 +343,8 @@ public PulsarService(ServiceConfiguration config,
new ScheduledExecutorProvider(1, "broker-client-shared-scheduled-executor");
this.brokerClientSharedTimer =
new HashedWheelTimer(new DefaultThreadFactory("broker-client-shared-timer"), 1, TimeUnit.MILLISECONDS);
this.brokerClientSharedLookupExecutorProvider =
new ScheduledExecutorProvider(1, "broker-client-shared-lookup-executor");
}

public MetadataStore createConfigurationMetadataStore() throws MetadataStoreException {
Expand Down Expand Up @@ -551,6 +554,7 @@ public CompletableFuture<Void> closeAsync() {
brokerClientSharedExternalExecutorProvider.shutdownNow();
brokerClientSharedInternalExecutorProvider.shutdownNow();
brokerClientSharedScheduledExecutorProvider.shutdownNow();
brokerClientSharedLookupExecutorProvider.shutdownNow();
brokerClientSharedTimer.stop();
ioEventLoopGroup.shutdownGracefully();

Expand Down Expand Up @@ -1376,6 +1380,7 @@ public PulsarClientImpl createClientImpl(ClientConfigurationData clientConf)
.internalExecutorProvider(brokerClientSharedInternalExecutorProvider)
.externalExecutorProvider(brokerClientSharedExternalExecutorProvider)
.scheduledExecutorProvider(brokerClientSharedScheduledExecutorProvider)
.lookupExecutorProvider(brokerClientSharedLookupExecutorProvider)
.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import static java.lang.String.format;
import io.netty.buffer.ByteBuf;
import io.netty.util.concurrent.DefaultThreadFactory;
import java.net.InetSocketAddress;
import java.net.URI;
import java.util.ArrayList;
Expand All @@ -28,6 +29,7 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
Expand All @@ -53,37 +55,67 @@ public class BinaryProtoLookupService implements LookupService {
private final PulsarClientImpl client;
private final ServiceNameResolver serviceNameResolver;
private final boolean useTls;
private final ExecutorService executor;
private final ExecutorService scheduleExecutor;
private final String listenerName;
private final int maxLookupRedirects;
private final ExecutorService lookupPinnedExecutor;
private final boolean createdLookupPinnedExecutor;

private final ConcurrentHashMap<TopicName, CompletableFuture<Pair<InetSocketAddress, InetSocketAddress>>>
lookupInProgress = new ConcurrentHashMap<>();

private final ConcurrentHashMap<TopicName, CompletableFuture<PartitionedTopicMetadata>>
partitionedMetadataInProgress = new ConcurrentHashMap<>();

/**
* @deprecated use {@link
* #BinaryProtoLookupService(PulsarClientImpl, String, String, boolean, ExecutorService, ExecutorService)} instead.
*/
@Deprecated
public BinaryProtoLookupService(PulsarClientImpl client,
String serviceUrl,
boolean useTls,
ExecutorService executor)
ExecutorService scheduleExecutor)
throws PulsarClientException {
this(client, serviceUrl, null, useTls, executor);
this(client, serviceUrl, null, useTls, scheduleExecutor);
}

/**
* @deprecated use {@link
* #BinaryProtoLookupService(PulsarClientImpl, String, String, boolean, ExecutorService, ExecutorService)} instead.
*/
@Deprecated
public BinaryProtoLookupService(PulsarClientImpl client,
String serviceUrl,
String listenerName,
boolean useTls,
ExecutorService executor)
ExecutorService scheduleExecutor)
throws PulsarClientException {
this(client, serviceUrl, listenerName, useTls, scheduleExecutor, null);
}

public BinaryProtoLookupService(PulsarClientImpl client,
String serviceUrl,
String listenerName,
boolean useTls,
ExecutorService scheduleExecutor,
ExecutorService lookupPinnedExecutor)
throws PulsarClientException {
this.client = client;
this.useTls = useTls;
this.executor = executor;
this.scheduleExecutor = scheduleExecutor;
this.maxLookupRedirects = client.getConfiguration().getMaxLookupRedirects();
this.serviceNameResolver = new PulsarServiceNameResolver();
this.listenerName = listenerName;
updateServiceUrl(serviceUrl);
if (lookupPinnedExecutor == null) {
this.createdLookupPinnedExecutor = true;
this.lookupPinnedExecutor =
Executors.newSingleThreadExecutor(new DefaultThreadFactory("pulsar-client-binary-proto-lookup"));
} else {
this.createdLookupPinnedExecutor = false;
this.lookupPinnedExecutor = lookupPinnedExecutor;
}
}

@Override
Expand Down Expand Up @@ -148,7 +180,7 @@ private CompletableFuture<Pair<InetSocketAddress, InetSocketAddress>> findBroker
return addressFuture;
}

client.getCnxPool().getConnection(socketAddress).thenAccept(clientCnx -> {
client.getCnxPool().getConnection(socketAddress).thenAcceptAsync(clientCnx -> {
long requestId = client.newRequestId();
ByteBuf request = Commands.newLookup(topicName.toString(), listenerName, authoritative, requestId);
clientCnx.newLookup(request, requestId).whenComplete((r, t) -> {
Expand Down Expand Up @@ -213,7 +245,7 @@ private CompletableFuture<Pair<InetSocketAddress, InetSocketAddress>> findBroker
}
client.getCnxPool().releaseConnection(clientCnx);
});
}).exceptionally(connectionException -> {
}, lookupPinnedExecutor).exceptionally(connectionException -> {
addressFuture.completeExceptionally(FutureUtil.unwrapCompletionException(connectionException));
return null;
});
Expand All @@ -225,7 +257,7 @@ private CompletableFuture<PartitionedTopicMetadata> getPartitionedTopicMetadata(

CompletableFuture<PartitionedTopicMetadata> partitionFuture = new CompletableFuture<PartitionedTopicMetadata>();

client.getCnxPool().getConnection(socketAddress).thenAccept(clientCnx -> {
client.getCnxPool().getConnection(socketAddress).thenAcceptAsync(clientCnx -> {
long requestId = client.newRequestId();
ByteBuf request = Commands.newPartitionMetadataRequest(topicName.toString(), requestId);
clientCnx.newLookup(request, requestId).whenComplete((r, t) -> {
Expand All @@ -246,7 +278,7 @@ private CompletableFuture<PartitionedTopicMetadata> getPartitionedTopicMetadata(
}
client.getCnxPool().releaseConnection(clientCnx);
});
}).exceptionally(connectionException -> {
}, lookupPinnedExecutor).exceptionally(connectionException -> {
partitionFuture.completeExceptionally(FutureUtil.unwrapCompletionException(connectionException));
return null;
});
Expand All @@ -268,7 +300,7 @@ public CompletableFuture<Optional<SchemaInfo>> getSchema(TopicName topicName, by
return schemaFuture;
}
InetSocketAddress socketAddress = serviceNameResolver.resolveHost();
client.getCnxPool().getConnection(socketAddress).thenAccept(clientCnx -> {
client.getCnxPool().getConnection(socketAddress).thenAcceptAsync(clientCnx -> {
long requestId = client.newRequestId();
ByteBuf request = Commands.newGetSchema(requestId, topicName.toString(),
Optional.ofNullable(BytesSchemaVersion.of(version)));
Expand All @@ -282,7 +314,7 @@ public CompletableFuture<Optional<SchemaInfo>> getSchema(TopicName topicName, by
}
client.getCnxPool().releaseConnection(clientCnx);
});
}).exceptionally(ex -> {
}, lookupPinnedExecutor).exceptionally(ex -> {
schemaFuture.completeExceptionally(FutureUtil.unwrapCompletionException(ex));
return null;
});
Expand Down Expand Up @@ -314,7 +346,7 @@ private void getTopicsUnderNamespace(InetSocketAddress socketAddress,
AtomicLong remainingTime,
CompletableFuture<List<String>> topicsFuture,
Mode mode) {
client.getCnxPool().getConnection(socketAddress).thenAccept(clientCnx -> {
client.getCnxPool().getConnection(socketAddress).thenAcceptAsync(clientCnx -> {
long requestId = client.newRequestId();
ByteBuf request = Commands.newGetTopicsOfNamespaceRequest(
namespace.toString(), requestId, mode);
Expand All @@ -341,7 +373,7 @@ private void getTopicsUnderNamespace(InetSocketAddress socketAddress,
}
client.getCnxPool().releaseConnection(clientCnx);
});
}).exceptionally((e) -> {
}, lookupPinnedExecutor).exceptionally((e) -> {
long nextDelay = Math.min(backoff.next(), remainingTime.get());
if (nextDelay <= 0) {
topicsFuture.completeExceptionally(
Expand All @@ -351,7 +383,7 @@ private void getTopicsUnderNamespace(InetSocketAddress socketAddress,
return null;
}

((ScheduledExecutorService) executor).schedule(() -> {
((ScheduledExecutorService) scheduleExecutor).schedule(() -> {
log.warn("[namespace: {}] Could not get connection while getTopicsUnderNamespace -- Will try again in"
+ " {} ms", namespace, nextDelay);
remainingTime.addAndGet(-nextDelay);
Expand All @@ -364,7 +396,9 @@ private void getTopicsUnderNamespace(InetSocketAddress socketAddress,

@Override
public void close() throws Exception {
// no-op
if (createdLookupPinnedExecutor && lookupPinnedExecutor != null && !lookupPinnedExecutor.isShutdown()) {
lookupPinnedExecutor.shutdown();
}
}

public static class LookupDataResult {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,13 +98,15 @@ public class PulsarClientImpl implements PulsarClient {
private final boolean createdExecutorProviders;

private final boolean createdScheduledProviders;
private final boolean createdLookupProviders;
private LookupService lookup;
private final ConnectionPool cnxPool;
@Getter
private final Timer timer;
private boolean needStopTimer;
private final ExecutorProvider externalExecutorProvider;
private final ExecutorProvider internalExecutorProvider;
private final ExecutorProvider lookupExecutorProvider;

private final ScheduledExecutorProvider scheduledExecutorProvider;
private final boolean createdEventLoopGroup;
Expand Down Expand Up @@ -144,29 +146,39 @@ public SchemaInfoProvider load(String topicName) {
private TransactionCoordinatorClientImpl tcClient;

public PulsarClientImpl(ClientConfigurationData conf) throws PulsarClientException {
this(conf, null, null, null, null, null, null);
this(conf, null, null, null, null, null, null, null);
}

public PulsarClientImpl(ClientConfigurationData conf, EventLoopGroup eventLoopGroup) throws PulsarClientException {
this(conf, eventLoopGroup, null, null, null, null, null);
this(conf, eventLoopGroup, null, null, null, null, null, null);
}

public PulsarClientImpl(ClientConfigurationData conf, EventLoopGroup eventLoopGroup, ConnectionPool cnxPool)
throws PulsarClientException {
this(conf, eventLoopGroup, cnxPool, null, null, null, null);
this(conf, eventLoopGroup, cnxPool, null, null, null, null, null);
}

public PulsarClientImpl(ClientConfigurationData conf, EventLoopGroup eventLoopGroup, ConnectionPool cnxPool,
Timer timer)
throws PulsarClientException {
this(conf, eventLoopGroup, cnxPool, timer, null, null, null);
this(conf, eventLoopGroup, cnxPool, timer, null, null, null, null);
}

public PulsarClientImpl(ClientConfigurationData conf, EventLoopGroup eventLoopGroup, ConnectionPool connectionPool,
Timer timer, ExecutorProvider externalExecutorProvider,
ExecutorProvider internalExecutorProvider,
ScheduledExecutorProvider scheduledExecutorProvider)
throws PulsarClientException {
this(conf, eventLoopGroup, connectionPool, timer, externalExecutorProvider, internalExecutorProvider,
scheduledExecutorProvider, null);
}

@Builder(builderClassName = "PulsarClientImplBuilder")
private PulsarClientImpl(ClientConfigurationData conf, EventLoopGroup eventLoopGroup, ConnectionPool connectionPool,
Timer timer, ExecutorProvider externalExecutorProvider,
ExecutorProvider internalExecutorProvider,
ScheduledExecutorProvider scheduledExecutorProvider) throws PulsarClientException {
ScheduledExecutorProvider scheduledExecutorProvider,
ExecutorProvider lookupExecutorProvider) throws PulsarClientException {
EventLoopGroup eventLoopGroupReference = null;
ConnectionPool connectionPoolReference = null;
try {
Expand All @@ -178,6 +190,7 @@ private PulsarClientImpl(ClientConfigurationData conf, EventLoopGroup eventLoopG
}
this.createdExecutorProviders = externalExecutorProvider == null;
this.createdScheduledProviders = scheduledExecutorProvider == null;
this.createdLookupProviders = lookupExecutorProvider == null;
eventLoopGroupReference = eventLoopGroup != null ? eventLoopGroup : getEventLoopGroup(conf);
this.eventLoopGroup = eventLoopGroupReference;
if (conf == null || isBlank(conf.getServiceUrl())) {
Expand All @@ -193,13 +206,17 @@ private PulsarClientImpl(ClientConfigurationData conf, EventLoopGroup eventLoopG
new ExecutorProvider(conf.getNumListenerThreads(), "pulsar-external-listener");
this.internalExecutorProvider = internalExecutorProvider != null ? internalExecutorProvider :
new ExecutorProvider(conf.getNumIoThreads(), "pulsar-client-internal");

this.scheduledExecutorProvider = scheduledExecutorProvider != null ? scheduledExecutorProvider :
new ScheduledExecutorProvider(conf.getNumIoThreads(), "pulsar-client-scheduled");
this.lookupExecutorProvider = lookupExecutorProvider != null ? lookupExecutorProvider :
new ExecutorProvider(1, "pulsar-client-lookup");
if (conf.getServiceUrl().startsWith("http")) {
lookup = new HttpLookupService(conf, this.eventLoopGroup);
} else {
lookup = new BinaryProtoLookupService(this, conf.getServiceUrl(), conf.getListenerName(),
conf.isUseTls(), this.scheduledExecutorProvider.getExecutor());
conf.isUseTls(), this.scheduledExecutorProvider.getExecutor(),
this.lookupExecutorProvider.getExecutor());
}
if (timer == null) {
this.timer = new HashedWheelTimer(getThreadFactory("pulsar-timer"), 1, TimeUnit.MILLISECONDS);
Expand Down Expand Up @@ -858,6 +875,16 @@ private void shutdownExecutors() throws PulsarClientException {
pulsarClientException = PulsarClientException.unwrap(t);
}
}

if (createdLookupProviders && lookupExecutorProvider != null && !lookupExecutorProvider.isShutdown()) {
try {
lookupExecutorProvider.shutdownNow();
} catch (Throwable t) {
log.warn("Failed to shutdown lookupExecutorProvider", t);
pulsarClientException = PulsarClientException.unwrap(t);
}
}

if (pulsarClientException != null) {
throw pulsarClientException;
}
Expand Down
Loading

0 comments on commit 038c42c

Please sign in to comment.