Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix][client] Use dedicated executor for requests in BinaryProtoLookupService #23378

Merged
merged 5 commits into from
Oct 15, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,7 @@ public class PulsarService implements AutoCloseable, ShutdownService {
private final ExecutorProvider brokerClientSharedExternalExecutorProvider;
private final ScheduledExecutorProvider brokerClientSharedScheduledExecutorProvider;
private final Timer brokerClientSharedTimer;
private final ExecutorProvider brokerClientSharedLookupExecutorProvider;

private MetricsGenerator metricsGenerator;
private final PulsarBrokerOpenTelemetry openTelemetry;
Expand Down Expand Up @@ -388,6 +389,8 @@ public PulsarService(ServiceConfiguration config,
new ScheduledExecutorProvider(1, "broker-client-shared-scheduled-executor");
this.brokerClientSharedTimer =
new HashedWheelTimer(new DefaultThreadFactory("broker-client-shared-timer"), 1, TimeUnit.MILLISECONDS);
this.brokerClientSharedLookupExecutorProvider =
new ScheduledExecutorProvider(1, "broker-client-shared-lookup-executor");

// here in the constructor we don't have the offloader scheduler yet
this.offloaderStats = LedgerOffloaderStats.create(false, false, null, 0);
Expand Down Expand Up @@ -1687,6 +1690,7 @@ public PulsarClientImpl createClientImpl(ClientConfigurationData clientConf)
.internalExecutorProvider(brokerClientSharedInternalExecutorProvider)
.externalExecutorProvider(brokerClientSharedExternalExecutorProvider)
.scheduledExecutorProvider(brokerClientSharedScheduledExecutorProvider)
nodece marked this conversation as resolved.
Show resolved Hide resolved
.lookupExecutorProvider(brokerClientSharedLookupExecutorProvider)
.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import static java.lang.String.format;
import static org.apache.pulsar.client.api.PulsarClientException.FailedFeatureCheck.SupportsGetPartitionedMetadataWithoutAutoCreation;
import io.netty.buffer.ByteBuf;
import io.netty.util.concurrent.DefaultThreadFactory;
import io.opentelemetry.api.common.Attributes;
import java.net.InetSocketAddress;
import java.net.URI;
Expand All @@ -29,6 +30,7 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
Expand Down Expand Up @@ -58,9 +60,11 @@ public class BinaryProtoLookupService implements LookupService {
private final PulsarClientImpl client;
private final ServiceNameResolver serviceNameResolver;
private final boolean useTls;
private final ExecutorService executor;
private final ExecutorService scheduleExecutor;
lhotari marked this conversation as resolved.
Show resolved Hide resolved
private final String listenerName;
private final int maxLookupRedirects;
private final ExecutorService lookupPinnedExecutor;
private final boolean createdLookupPinnedExecutor;

private final ConcurrentHashMap<Pair<TopicName, Map<String, String>>, CompletableFuture<LookupTopicResult>>
lookupInProgress = new ConcurrentHashMap<>();
Expand All @@ -73,23 +77,43 @@ public class BinaryProtoLookupService implements LookupService {
private final LatencyHistogram histoGetSchema;
private final LatencyHistogram histoListTopics;

/**
* @deprecated use {@link
* #BinaryProtoLookupService(PulsarClientImpl, String, String, boolean, ExecutorService, ExecutorService)} instead.
*/
@Deprecated
public BinaryProtoLookupService(PulsarClientImpl client,
String serviceUrl,
boolean useTls,
ExecutorService scheduleExecutor)
throws PulsarClientException {
this(client, serviceUrl, null, useTls, scheduleExecutor);
}

/**
* @deprecated use {@link
* #BinaryProtoLookupService(PulsarClientImpl, String, String, boolean, ExecutorService, ExecutorService)} instead.
*/
@Deprecated
public BinaryProtoLookupService(PulsarClientImpl client,
String serviceUrl,
String listenerName,
boolean useTls,
ExecutorService executor)
ExecutorService scheduleExecutor)
throws PulsarClientException {
this(client, serviceUrl, null, useTls, executor);
this(client, serviceUrl, listenerName, useTls, scheduleExecutor, null);
}

public BinaryProtoLookupService(PulsarClientImpl client,
String serviceUrl,
String listenerName,
boolean useTls,
ExecutorService executor)
ExecutorService scheduleExecutor,
ExecutorService lookupPinnedExecutor)
throws PulsarClientException {
this.client = client;
this.useTls = useTls;
this.executor = executor;
this.scheduleExecutor = scheduleExecutor;
this.maxLookupRedirects = client.getConfiguration().getMaxLookupRedirects();
this.serviceNameResolver = new PulsarServiceNameResolver();
this.listenerName = listenerName;
Expand All @@ -103,6 +127,15 @@ public BinaryProtoLookupService(PulsarClientImpl client,
histo.withAttributes(Attributes.builder().put("pulsar.lookup.type", "metadata").build());
histoGetSchema = histo.withAttributes(Attributes.builder().put("pulsar.lookup.type", "schema").build());
histoListTopics = histo.withAttributes(Attributes.builder().put("pulsar.lookup.type", "list-topics").build());

if (lookupPinnedExecutor == null) {
this.createdLookupPinnedExecutor = true;
this.lookupPinnedExecutor =
Executors.newSingleThreadExecutor(new DefaultThreadFactory("pulsar-client-binary-proto-lookup"));
} else {
this.createdLookupPinnedExecutor = false;
this.lookupPinnedExecutor = lookupPinnedExecutor;
}
}

@Override
Expand Down Expand Up @@ -180,7 +213,7 @@ private CompletableFuture<LookupTopicResult> findBroker(InetSocketAddress socket
return addressFuture;
}

client.getCnxPool().getConnection(socketAddress).thenAccept(clientCnx -> {
client.getCnxPool().getConnection(socketAddress).thenAcceptAsync(clientCnx -> {
long requestId = client.newRequestId();
ByteBuf request = Commands.newLookup(topicName.toString(), listenerName, authoritative, requestId,
properties);
Expand Down Expand Up @@ -247,7 +280,7 @@ private CompletableFuture<LookupTopicResult> findBroker(InetSocketAddress socket
}
client.getCnxPool().releaseConnection(clientCnx);
});
}).exceptionally(connectionException -> {
}, lookupPinnedExecutor).exceptionally(connectionException -> {
addressFuture.completeExceptionally(FutureUtil.unwrapCompletionException(connectionException));
return null;
});
Expand All @@ -260,7 +293,7 @@ private CompletableFuture<PartitionedTopicMetadata> getPartitionedTopicMetadata(
long startTime = System.nanoTime();
CompletableFuture<PartitionedTopicMetadata> partitionFuture = new CompletableFuture<>();

client.getCnxPool().getConnection(socketAddress).thenAccept(clientCnx -> {
client.getCnxPool().getConnection(socketAddress).thenAcceptAsync(clientCnx -> {
boolean finalAutoCreationEnabled = metadataAutoCreationEnabled;
if (!metadataAutoCreationEnabled && !clientCnx.isSupportsGetPartitionedMetadataWithoutAutoCreation()) {
if (useFallbackForNonPIP344Brokers) {
Expand Down Expand Up @@ -301,7 +334,7 @@ private CompletableFuture<PartitionedTopicMetadata> getPartitionedTopicMetadata(
}
client.getCnxPool().releaseConnection(clientCnx);
});
}).exceptionally(connectionException -> {
}, lookupPinnedExecutor).exceptionally(connectionException -> {
partitionFuture.completeExceptionally(FutureUtil.unwrapCompletionException(connectionException));
return null;
});
Expand All @@ -324,7 +357,7 @@ public CompletableFuture<Optional<SchemaInfo>> getSchema(TopicName topicName, by
return schemaFuture;
}
InetSocketAddress socketAddress = serviceNameResolver.resolveHost();
client.getCnxPool().getConnection(socketAddress).thenAccept(clientCnx -> {
client.getCnxPool().getConnection(socketAddress).thenAcceptAsync(clientCnx -> {
long requestId = client.newRequestId();
ByteBuf request = Commands.newGetSchema(requestId, topicName.toString(),
Optional.ofNullable(BytesSchemaVersion.of(version)));
Expand All @@ -340,7 +373,7 @@ public CompletableFuture<Optional<SchemaInfo>> getSchema(TopicName topicName, by
}
client.getCnxPool().releaseConnection(clientCnx);
});
}).exceptionally(ex -> {
}, lookupPinnedExecutor).exceptionally(ex -> {
schemaFuture.completeExceptionally(FutureUtil.unwrapCompletionException(ex));
return null;
});
Expand Down Expand Up @@ -385,7 +418,7 @@ private void getTopicsUnderNamespace(InetSocketAddress socketAddress,
String topicsHash) {
long startTime = System.nanoTime();

client.getCnxPool().getConnection(socketAddress).thenAccept(clientCnx -> {
client.getCnxPool().getConnection(socketAddress).thenAcceptAsync(clientCnx -> {
long requestId = client.newRequestId();
ByteBuf request = Commands.newGetTopicsOfNamespaceRequest(
namespace.toString(), requestId, mode, topicsPattern, topicsHash);
Expand All @@ -404,7 +437,7 @@ private void getTopicsUnderNamespace(InetSocketAddress socketAddress,
}
client.getCnxPool().releaseConnection(clientCnx);
});
}).exceptionally((e) -> {
}, lookupPinnedExecutor).exceptionally((e) -> {
long nextDelay = Math.min(backoff.next(), remainingTime.get());
if (nextDelay <= 0) {
getTopicsResultFuture.completeExceptionally(
Expand All @@ -414,7 +447,7 @@ private void getTopicsUnderNamespace(InetSocketAddress socketAddress,
return null;
}

((ScheduledExecutorService) executor).schedule(() -> {
((ScheduledExecutorService) scheduleExecutor).schedule(() -> {
log.warn("[namespace: {}] Could not get connection while getTopicsUnderNamespace -- Will try again in"
+ " {} ms", namespace, nextDelay);
remainingTime.addAndGet(-nextDelay);
Expand All @@ -428,7 +461,9 @@ private void getTopicsUnderNamespace(InetSocketAddress socketAddress,

@Override
public void close() throws Exception {
// no-op
if (createdLookupPinnedExecutor && lookupPinnedExecutor != null && !lookupPinnedExecutor.isShutdown()) {
lookupPinnedExecutor.shutdown();
}
}

public static class LookupDataResult {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ public class PulsarClientImpl implements PulsarClient {
private final boolean createdExecutorProviders;

private final boolean createdScheduledProviders;
private final boolean createdLookupProviders;
private LookupService lookup;
private Map<String, LookupService> urlLookupMap = new ConcurrentHashMap<>();
private final ConnectionPool cnxPool;
Expand All @@ -121,6 +122,7 @@ public class PulsarClientImpl implements PulsarClient {
private boolean needStopTimer;
private final ExecutorProvider externalExecutorProvider;
private final ExecutorProvider internalExecutorProvider;
private final ExecutorProvider lookupExecutorProvider;

private final ScheduledExecutorProvider scheduledExecutorProvider;
private final boolean createdEventLoopGroup;
Expand Down Expand Up @@ -163,29 +165,39 @@ public SchemaInfoProvider load(String topicName) {
private TransactionCoordinatorClientImpl tcClient;

public PulsarClientImpl(ClientConfigurationData conf) throws PulsarClientException {
this(conf, null, null, null, null, null, null);
this(conf, null, null, null, null, null, null, null);
}

public PulsarClientImpl(ClientConfigurationData conf, EventLoopGroup eventLoopGroup) throws PulsarClientException {
this(conf, eventLoopGroup, null, null, null, null, null);
this(conf, eventLoopGroup, null, null, null, null, null, null);
}

public PulsarClientImpl(ClientConfigurationData conf, EventLoopGroup eventLoopGroup, ConnectionPool cnxPool)
throws PulsarClientException {
this(conf, eventLoopGroup, cnxPool, null, null, null, null);
this(conf, eventLoopGroup, cnxPool, null, null, null, null, null);
}

public PulsarClientImpl(ClientConfigurationData conf, EventLoopGroup eventLoopGroup, ConnectionPool cnxPool,
Timer timer)
throws PulsarClientException {
this(conf, eventLoopGroup, cnxPool, timer, null, null, null);
this(conf, eventLoopGroup, cnxPool, timer, null, null, null, null);
}

public PulsarClientImpl(ClientConfigurationData conf, EventLoopGroup eventLoopGroup, ConnectionPool connectionPool,
Timer timer, ExecutorProvider externalExecutorProvider,
ExecutorProvider internalExecutorProvider,
ScheduledExecutorProvider scheduledExecutorProvider)
throws PulsarClientException {
this(conf, eventLoopGroup, connectionPool, timer, externalExecutorProvider, internalExecutorProvider,
scheduledExecutorProvider, null);
}

@Builder(builderClassName = "PulsarClientImplBuilder")
private PulsarClientImpl(ClientConfigurationData conf, EventLoopGroup eventLoopGroup, ConnectionPool connectionPool,
Timer timer, ExecutorProvider externalExecutorProvider,
ExecutorProvider internalExecutorProvider,
ScheduledExecutorProvider scheduledExecutorProvider) throws PulsarClientException {
ScheduledExecutorProvider scheduledExecutorProvider,
ExecutorProvider lookupExecutorProvider) throws PulsarClientException {
lhotari marked this conversation as resolved.
Show resolved Hide resolved

EventLoopGroup eventLoopGroupReference = null;
ConnectionPool connectionPoolReference = null;
Expand All @@ -198,6 +210,7 @@ private PulsarClientImpl(ClientConfigurationData conf, EventLoopGroup eventLoopG
}
this.createdExecutorProviders = externalExecutorProvider == null;
this.createdScheduledProviders = scheduledExecutorProvider == null;
this.createdLookupProviders = lookupExecutorProvider == null;
eventLoopGroupReference = eventLoopGroup != null ? eventLoopGroup : getEventLoopGroup(conf);
this.eventLoopGroup = eventLoopGroupReference;
if (conf == null || isBlank(conf.getServiceUrl())) {
Expand All @@ -218,11 +231,14 @@ private PulsarClientImpl(ClientConfigurationData conf, EventLoopGroup eventLoopG
new ExecutorProvider(conf.getNumListenerThreads(), "pulsar-external-listener");
this.internalExecutorProvider = internalExecutorProvider != null ? internalExecutorProvider :
new ExecutorProvider(conf.getNumIoThreads(), "pulsar-client-internal");
this.lookupExecutorProvider = lookupExecutorProvider != null ? lookupExecutorProvider :
new ExecutorProvider(1, "pulsar-client-lookup");
if (conf.getServiceUrl().startsWith("http")) {
lookup = new HttpLookupService(instrumentProvider, conf, this.eventLoopGroup);
} else {
lookup = new BinaryProtoLookupService(this, conf.getServiceUrl(), conf.getListenerName(),
conf.isUseTls(), this.scheduledExecutorProvider.getExecutor());
conf.isUseTls(), this.scheduledExecutorProvider.getExecutor(),
this.lookupExecutorProvider.getExecutor());
}
if (timer == null) {
this.timer = new HashedWheelTimer(getThreadFactory("pulsar-timer"), 1, TimeUnit.MILLISECONDS);
Expand Down Expand Up @@ -976,6 +992,16 @@ private void shutdownExecutors() throws PulsarClientException {
pulsarClientException = PulsarClientException.unwrap(t);
}
}

if (createdLookupProviders && lookupExecutorProvider != null && !lookupExecutorProvider.isShutdown()) {
try {
lookupExecutorProvider.shutdownNow();
} catch (Throwable t) {
log.warn("Failed to shutdown lookupExecutorProvider", t);
pulsarClientException = PulsarClientException.unwrap(t);
}
}

if (pulsarClientException != null) {
throw pulsarClientException;
}
Expand Down
Loading
Loading