Skip to content

Commit

Permalink
[fix][client] Use dedicated executor for requests in BinaryProtoLooku…
Browse files Browse the repository at this point in the history
…pService (apache#23378)

Signed-off-by: Zixuan Liu <nodeces@gmail.com>

(cherry picked from commit f98297f)
Signed-off-by: Zixuan Liu <nodeces@gmail.com>
  • Loading branch information
nodece committed Oct 16, 2024
1 parent 76cbaf1 commit 154cf62
Show file tree
Hide file tree
Showing 4 changed files with 143 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,7 @@ public class PulsarService implements AutoCloseable, ShutdownService {
private final ExecutorProvider brokerClientSharedExternalExecutorProvider;
private final ScheduledExecutorProvider brokerClientSharedScheduledExecutorProvider;
private final Timer brokerClientSharedTimer;
private final ExecutorProvider brokerClientSharedLookupExecutorProvider;

private MetricsGenerator metricsGenerator;

Expand Down Expand Up @@ -353,6 +354,8 @@ public PulsarService(ServiceConfiguration config,
new ScheduledExecutorProvider(1, "broker-client-shared-scheduled-executor");
this.brokerClientSharedTimer =
new HashedWheelTimer(new DefaultThreadFactory("broker-client-shared-timer"), 1, TimeUnit.MILLISECONDS);
this.brokerClientSharedLookupExecutorProvider =
new ScheduledExecutorProvider(1, "broker-client-shared-lookup-executor");

// here in the constructor we don't have the offloader scheduler yet
this.offloaderStats = LedgerOffloaderStats.create(false, false, null, 0);
Expand Down Expand Up @@ -593,6 +596,7 @@ public CompletableFuture<Void> closeAsync() {
brokerClientSharedExternalExecutorProvider.shutdownNow();
brokerClientSharedInternalExecutorProvider.shutdownNow();
brokerClientSharedScheduledExecutorProvider.shutdownNow();
brokerClientSharedLookupExecutorProvider.shutdownNow();
brokerClientSharedTimer.stop();

asyncCloseFutures.add(EventLoopUtil.shutdownGracefully(ioEventLoopGroup));
Expand Down Expand Up @@ -1587,6 +1591,7 @@ public PulsarClientImpl createClientImpl(ClientConfigurationData clientConf)
.internalExecutorProvider(brokerClientSharedInternalExecutorProvider)
.externalExecutorProvider(brokerClientSharedExternalExecutorProvider)
.scheduledExecutorProvider(brokerClientSharedScheduledExecutorProvider)
.lookupExecutorProvider(brokerClientSharedLookupExecutorProvider)
.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,14 @@
import static java.lang.String.format;
import static org.apache.pulsar.client.api.PulsarClientException.FailedFeatureCheck.SupportsGetPartitionedMetadataWithoutAutoCreation;
import io.netty.buffer.ByteBuf;
import io.netty.util.concurrent.DefaultThreadFactory;
import java.net.InetSocketAddress;
import java.net.URI;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
Expand Down Expand Up @@ -55,37 +57,68 @@ public class BinaryProtoLookupService implements LookupService {
private final PulsarClientImpl client;
private final ServiceNameResolver serviceNameResolver;
private final boolean useTls;
private final ExecutorService executor;
private final ExecutorService scheduleExecutor;
private final String listenerName;
private final int maxLookupRedirects;
private final ExecutorService lookupPinnedExecutor;
private final boolean createdLookupPinnedExecutor;

private final ConcurrentHashMap<TopicName, CompletableFuture<Pair<InetSocketAddress, InetSocketAddress>>>
lookupInProgress = new ConcurrentHashMap<>();

private final ConcurrentHashMap<TopicName, CompletableFuture<PartitionedTopicMetadata>>
partitionedMetadataInProgress = new ConcurrentHashMap<>();

/**
* @deprecated use {@link
* #BinaryProtoLookupService(PulsarClientImpl, String, String, boolean, ExecutorService, ExecutorService)} instead.
*/
@Deprecated
public BinaryProtoLookupService(PulsarClientImpl client,
String serviceUrl,
boolean useTls,
ExecutorService scheduleExecutor)
throws PulsarClientException {
this(client, serviceUrl, null, useTls, scheduleExecutor);
}

/**
* @deprecated use {@link
* #BinaryProtoLookupService(PulsarClientImpl, String, String, boolean, ExecutorService, ExecutorService)} instead.
*/
@Deprecated
public BinaryProtoLookupService(PulsarClientImpl client,
String serviceUrl,
String listenerName,
boolean useTls,
ExecutorService executor)
ExecutorService scheduleExecutor)
throws PulsarClientException {
this(client, serviceUrl, null, useTls, executor);
this(client, serviceUrl, listenerName, useTls, scheduleExecutor, null);
}

public BinaryProtoLookupService(PulsarClientImpl client,
String serviceUrl,
String listenerName,
boolean useTls,
ExecutorService executor)
ExecutorService scheduleExecutor,
ExecutorService lookupPinnedExecutor)
throws PulsarClientException {
this.client = client;
this.useTls = useTls;
this.executor = executor;
this.scheduleExecutor = scheduleExecutor;
this.maxLookupRedirects = client.getConfiguration().getMaxLookupRedirects();
this.serviceNameResolver = new PulsarServiceNameResolver();
this.listenerName = listenerName;
updateServiceUrl(serviceUrl);

if (lookupPinnedExecutor == null) {
this.createdLookupPinnedExecutor = true;
this.lookupPinnedExecutor =
Executors.newSingleThreadExecutor(new DefaultThreadFactory("pulsar-client-binary-proto-lookup"));
} else {
this.createdLookupPinnedExecutor = false;
this.lookupPinnedExecutor = lookupPinnedExecutor;
}
}

@Override
Expand Down Expand Up @@ -153,7 +186,7 @@ private CompletableFuture<Pair<InetSocketAddress, InetSocketAddress>> findBroker
return addressFuture;
}

client.getCnxPool().getConnection(socketAddress).thenAccept(clientCnx -> {
client.getCnxPool().getConnection(socketAddress).thenAcceptAsync(clientCnx -> {
long requestId = client.newRequestId();
ByteBuf request = Commands.newLookup(topicName.toString(), listenerName, authoritative, requestId);
clientCnx.newLookup(request, requestId).whenComplete((r, t) -> {
Expand Down Expand Up @@ -218,7 +251,7 @@ private CompletableFuture<Pair<InetSocketAddress, InetSocketAddress>> findBroker
}
client.getCnxPool().releaseConnection(clientCnx);
});
}).exceptionally(connectionException -> {
}, lookupPinnedExecutor).exceptionally(connectionException -> {
addressFuture.completeExceptionally(FutureUtil.unwrapCompletionException(connectionException));
return null;
});
Expand All @@ -230,7 +263,7 @@ private CompletableFuture<PartitionedTopicMetadata> getPartitionedTopicMetadata(

CompletableFuture<PartitionedTopicMetadata> partitionFuture = new CompletableFuture<>();

client.getCnxPool().getConnection(socketAddress).thenAccept(clientCnx -> {
client.getCnxPool().getConnection(socketAddress).thenAcceptAsync(clientCnx -> {
boolean finalAutoCreationEnabled = metadataAutoCreationEnabled;
if (!metadataAutoCreationEnabled && !clientCnx.isSupportsGetPartitionedMetadataWithoutAutoCreation()) {
if (useFallbackForNonPIP344Brokers) {
Expand Down Expand Up @@ -269,7 +302,7 @@ private CompletableFuture<PartitionedTopicMetadata> getPartitionedTopicMetadata(
}
client.getCnxPool().releaseConnection(clientCnx);
});
}).exceptionally(connectionException -> {
}, lookupPinnedExecutor).exceptionally(connectionException -> {
partitionFuture.completeExceptionally(FutureUtil.unwrapCompletionException(connectionException));
return null;
});
Expand All @@ -291,7 +324,7 @@ public CompletableFuture<Optional<SchemaInfo>> getSchema(TopicName topicName, by
return schemaFuture;
}
InetSocketAddress socketAddress = serviceNameResolver.resolveHost();
client.getCnxPool().getConnection(socketAddress).thenAccept(clientCnx -> {
client.getCnxPool().getConnection(socketAddress).thenAcceptAsync(clientCnx -> {
long requestId = client.newRequestId();
ByteBuf request = Commands.newGetSchema(requestId, topicName.toString(),
Optional.ofNullable(BytesSchemaVersion.of(version)));
Expand All @@ -305,7 +338,7 @@ public CompletableFuture<Optional<SchemaInfo>> getSchema(TopicName topicName, by
}
client.getCnxPool().releaseConnection(clientCnx);
});
}).exceptionally(ex -> {
}, lookupPinnedExecutor).exceptionally(ex -> {
schemaFuture.completeExceptionally(FutureUtil.unwrapCompletionException(ex));
return null;
});
Expand Down Expand Up @@ -348,7 +381,7 @@ private void getTopicsUnderNamespace(InetSocketAddress socketAddress,
Mode mode,
String topicsPattern,
String topicsHash) {
client.getCnxPool().getConnection(socketAddress).thenAccept(clientCnx -> {
client.getCnxPool().getConnection(socketAddress).thenAcceptAsync(clientCnx -> {
long requestId = client.newRequestId();
ByteBuf request = Commands.newGetTopicsOfNamespaceRequest(
namespace.toString(), requestId, mode, topicsPattern, topicsHash);
Expand All @@ -365,7 +398,7 @@ private void getTopicsUnderNamespace(InetSocketAddress socketAddress,
}
client.getCnxPool().releaseConnection(clientCnx);
});
}).exceptionally((e) -> {
}, lookupPinnedExecutor).exceptionally((e) -> {
long nextDelay = Math.min(backoff.next(), remainingTime.get());
if (nextDelay <= 0) {
getTopicsResultFuture.completeExceptionally(
Expand All @@ -375,7 +408,7 @@ private void getTopicsUnderNamespace(InetSocketAddress socketAddress,
return null;
}

((ScheduledExecutorService) executor).schedule(() -> {
((ScheduledExecutorService) scheduleExecutor).schedule(() -> {
log.warn("[namespace: {}] Could not get connection while getTopicsUnderNamespace -- Will try again in"
+ " {} ms", namespace, nextDelay);
remainingTime.addAndGet(-nextDelay);
Expand All @@ -389,7 +422,9 @@ private void getTopicsUnderNamespace(InetSocketAddress socketAddress,

@Override
public void close() throws Exception {
// no-op
if (createdLookupPinnedExecutor && lookupPinnedExecutor != null && !lookupPinnedExecutor.isShutdown()) {
lookupPinnedExecutor.shutdown();
}
}

public static class LookupDataResult {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ public class PulsarClientImpl implements PulsarClient {
private final boolean createdExecutorProviders;

private final boolean createdScheduledProviders;
private final boolean createdLookupProviders;
private LookupService lookup;
private Map<String, LookupService> urlLookupMap = new ConcurrentHashMap<>();
private final ConnectionPool cnxPool;
Expand All @@ -117,6 +118,7 @@ public class PulsarClientImpl implements PulsarClient {
private boolean needStopTimer;
private final ExecutorProvider externalExecutorProvider;
private final ExecutorProvider internalExecutorProvider;
private final ExecutorProvider lookupExecutorProvider;

private final ScheduledExecutorProvider scheduledExecutorProvider;
private final boolean createdEventLoopGroup;
Expand Down Expand Up @@ -157,29 +159,40 @@ public SchemaInfoProvider load(String topicName) {
private TransactionCoordinatorClientImpl tcClient;

public PulsarClientImpl(ClientConfigurationData conf) throws PulsarClientException {
this(conf, null, null, null, null, null, null);
this(conf, null, null, null, null, null, null, null);
}

public PulsarClientImpl(ClientConfigurationData conf, EventLoopGroup eventLoopGroup) throws PulsarClientException {
this(conf, eventLoopGroup, null, null, null, null, null);
this(conf, eventLoopGroup, null, null, null, null, null, null);
}

public PulsarClientImpl(ClientConfigurationData conf, EventLoopGroup eventLoopGroup, ConnectionPool cnxPool)
throws PulsarClientException {
this(conf, eventLoopGroup, cnxPool, null, null, null, null);
this(conf, eventLoopGroup, cnxPool, null, null, null, null, null);
}

public PulsarClientImpl(ClientConfigurationData conf, EventLoopGroup eventLoopGroup, ConnectionPool cnxPool,
Timer timer)
throws PulsarClientException {
this(conf, eventLoopGroup, cnxPool, timer, null, null, null);
this(conf, eventLoopGroup, cnxPool, timer, null, null, null, null);
}

public PulsarClientImpl(ClientConfigurationData conf, EventLoopGroup eventLoopGroup, ConnectionPool connectionPool,
Timer timer, ExecutorProvider externalExecutorProvider,
ExecutorProvider internalExecutorProvider,
ScheduledExecutorProvider scheduledExecutorProvider)
throws PulsarClientException {
this(conf, eventLoopGroup, connectionPool, timer, externalExecutorProvider, internalExecutorProvider,
scheduledExecutorProvider, null);
}

@Builder(builderClassName = "PulsarClientImplBuilder")
private PulsarClientImpl(ClientConfigurationData conf, EventLoopGroup eventLoopGroup, ConnectionPool connectionPool,
Timer timer, ExecutorProvider externalExecutorProvider,
ExecutorProvider internalExecutorProvider,
ScheduledExecutorProvider scheduledExecutorProvider) throws PulsarClientException {
ScheduledExecutorProvider scheduledExecutorProvider,
ExecutorProvider lookupExecutorProvider) throws PulsarClientException {

EventLoopGroup eventLoopGroupReference = null;
ConnectionPool connectionPoolReference = null;
try {
Expand All @@ -191,6 +204,7 @@ private PulsarClientImpl(ClientConfigurationData conf, EventLoopGroup eventLoopG
}
this.createdExecutorProviders = externalExecutorProvider == null;
this.createdScheduledProviders = scheduledExecutorProvider == null;
this.createdLookupProviders = lookupExecutorProvider == null;
eventLoopGroupReference = eventLoopGroup != null ? eventLoopGroup : getEventLoopGroup(conf);
this.eventLoopGroup = eventLoopGroupReference;
if (conf == null || isBlank(conf.getServiceUrl())) {
Expand All @@ -206,13 +220,16 @@ private PulsarClientImpl(ClientConfigurationData conf, EventLoopGroup eventLoopG
new ExecutorProvider(conf.getNumListenerThreads(), "pulsar-external-listener");
this.internalExecutorProvider = internalExecutorProvider != null ? internalExecutorProvider :
new ExecutorProvider(conf.getNumIoThreads(), "pulsar-client-internal");
this.lookupExecutorProvider = lookupExecutorProvider != null ? lookupExecutorProvider :
new ExecutorProvider(1, "pulsar-client-lookup");
this.scheduledExecutorProvider = scheduledExecutorProvider != null ? scheduledExecutorProvider :
new ScheduledExecutorProvider(conf.getNumIoThreads(), "pulsar-client-scheduled");
if (conf.getServiceUrl().startsWith("http")) {
lookup = new HttpLookupService(conf, this.eventLoopGroup);
} else {
lookup = new BinaryProtoLookupService(this, conf.getServiceUrl(), conf.getListenerName(),
conf.isUseTls(), this.scheduledExecutorProvider.getExecutor());
conf.isUseTls(), this.scheduledExecutorProvider.getExecutor(),
this.lookupExecutorProvider.getExecutor());
}
if (timer == null) {
this.timer = new HashedWheelTimer(getThreadFactory("pulsar-timer"), 1, TimeUnit.MILLISECONDS);
Expand Down Expand Up @@ -965,6 +982,16 @@ private void shutdownExecutors() throws PulsarClientException {
pulsarClientException = PulsarClientException.unwrap(t);
}
}

if (createdLookupProviders && lookupExecutorProvider != null && !lookupExecutorProvider.isShutdown()) {
try {
lookupExecutorProvider.shutdownNow();
} catch (Throwable t) {
log.warn("Failed to shutdown lookupExecutorProvider", t);
pulsarClientException = PulsarClientException.unwrap(t);
}
}

if (pulsarClientException != null) {
throw pulsarClientException;
}
Expand Down
Loading

0 comments on commit 154cf62

Please sign in to comment.