From ebea662591796035b3994a89cb7f040bd5a3be5a Mon Sep 17 00:00:00 2001 From: Zixuan Liu Date: Fri, 11 Oct 2024 00:10:42 +0800 Subject: [PATCH 1/5] [fix][client] Use dedicated executor for requests in BinaryProtoLookupService Signed-off-by: Zixuan Liu --- .../apache/pulsar/broker/PulsarService.java | 5 +- .../client/impl/BinaryProtoLookupService.java | 65 ++++++++++++++----- .../pulsar/client/impl/PulsarClientImpl.java | 29 +++++++-- .../impl/BinaryProtoLookupServiceTest.java | 55 +++++++++++++++- 4 files changed, 130 insertions(+), 24 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java index dcc0e961275bd..2551d7f7cabaa 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java @@ -263,6 +263,7 @@ public class PulsarService implements AutoCloseable, ShutdownService { private final ExecutorProvider brokerClientSharedExternalExecutorProvider; private final ScheduledExecutorProvider brokerClientSharedScheduledExecutorProvider; private final Timer brokerClientSharedTimer; + private final ExecutorProvider brokerClientSharedLookupExecutorProvider; private MetricsGenerator metricsGenerator; private final PulsarBrokerOpenTelemetry openTelemetry; @@ -388,6 +389,8 @@ public PulsarService(ServiceConfiguration config, new ScheduledExecutorProvider(1, "broker-client-shared-scheduled-executor"); this.brokerClientSharedTimer = new HashedWheelTimer(new DefaultThreadFactory("broker-client-shared-timer"), 1, TimeUnit.MILLISECONDS); + this.brokerClientSharedLookupExecutorProvider = + new ScheduledExecutorProvider(1, "broker-client-shared-lookup-executor"); // here in the constructor we don't have the offloader scheduler yet this.offloaderStats = LedgerOffloaderStats.create(false, false, null, 0); @@ -1686,7 +1689,7 @@ public PulsarClientImpl createClientImpl(ClientConfigurationData clientConf) .timer(brokerClientSharedTimer) .internalExecutorProvider(brokerClientSharedInternalExecutorProvider) .externalExecutorProvider(brokerClientSharedExternalExecutorProvider) - .scheduledExecutorProvider(brokerClientSharedScheduledExecutorProvider) + .lookupExecutorProvider(brokerClientSharedLookupExecutorProvider) .build(); } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java index b45d6e9f6a80a..e6be88d2c1aa1 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java @@ -21,6 +21,7 @@ import static java.lang.String.format; import static org.apache.pulsar.client.api.PulsarClientException.FailedFeatureCheck.SupportsGetPartitionedMetadataWithoutAutoCreation; import io.netty.buffer.ByteBuf; +import io.netty.util.concurrent.DefaultThreadFactory; import io.opentelemetry.api.common.Attributes; import java.net.InetSocketAddress; import java.net.URI; @@ -29,6 +30,7 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; @@ -58,9 +60,11 @@ public class BinaryProtoLookupService implements LookupService { private final PulsarClientImpl client; private final ServiceNameResolver serviceNameResolver; private final boolean useTls; - private final ExecutorService executor; + private final ExecutorService scheduleExecutor; private final String listenerName; private final int maxLookupRedirects; + private final ExecutorService lookupPinnedExecutor; + private final boolean createdLookupPinnedExecutor; private final ConcurrentHashMap>, CompletableFuture> lookupInProgress = new ConcurrentHashMap<>(); @@ -73,23 +77,43 @@ public class BinaryProtoLookupService implements LookupService { private final LatencyHistogram histoGetSchema; private final LatencyHistogram histoListTopics; + /** + * @deprecated use {@link + * #BinaryProtoLookupService(PulsarClientImpl, String, String, boolean, ExecutorService, ExecutorService)} instead. + */ + @Deprecated + public BinaryProtoLookupService(PulsarClientImpl client, + String serviceUrl, + boolean useTls, + ExecutorService scheduleExecutor) + throws PulsarClientException { + this(client, serviceUrl, null, useTls, scheduleExecutor); + } + + /** + * @deprecated use {@link + * #BinaryProtoLookupService(PulsarClientImpl, String, String, boolean, ExecutorService, ExecutorService)} instead. + */ + @Deprecated public BinaryProtoLookupService(PulsarClientImpl client, String serviceUrl, + String listenerName, boolean useTls, - ExecutorService executor) + ExecutorService scheduleExecutor) throws PulsarClientException { - this(client, serviceUrl, null, useTls, executor); + this(client, serviceUrl, listenerName, useTls, scheduleExecutor, null); } public BinaryProtoLookupService(PulsarClientImpl client, String serviceUrl, String listenerName, boolean useTls, - ExecutorService executor) + ExecutorService scheduleExecutor, + ExecutorService lookupPinnedExecutor) throws PulsarClientException { this.client = client; this.useTls = useTls; - this.executor = executor; + this.scheduleExecutor = scheduleExecutor; this.maxLookupRedirects = client.getConfiguration().getMaxLookupRedirects(); this.serviceNameResolver = new PulsarServiceNameResolver(); this.listenerName = listenerName; @@ -103,6 +127,15 @@ public BinaryProtoLookupService(PulsarClientImpl client, histo.withAttributes(Attributes.builder().put("pulsar.lookup.type", "metadata").build()); histoGetSchema = histo.withAttributes(Attributes.builder().put("pulsar.lookup.type", "schema").build()); histoListTopics = histo.withAttributes(Attributes.builder().put("pulsar.lookup.type", "list-topics").build()); + + if (lookupPinnedExecutor == null) { + this.createdLookupPinnedExecutor = false; + this.lookupPinnedExecutor = + Executors.newSingleThreadExecutor(new DefaultThreadFactory("pulsar-client-binary-proto-lookup")); + } else { + this.createdLookupPinnedExecutor = true; + this.lookupPinnedExecutor = lookupPinnedExecutor; + } } @Override @@ -180,7 +213,7 @@ private CompletableFuture findBroker(InetSocketAddress socket return addressFuture; } - client.getCnxPool().getConnection(socketAddress).thenAccept(clientCnx -> { + client.getCnxPool().getConnection(socketAddress).thenAcceptAsync(clientCnx -> { long requestId = client.newRequestId(); ByteBuf request = Commands.newLookup(topicName.toString(), listenerName, authoritative, requestId, properties); @@ -247,7 +280,7 @@ private CompletableFuture findBroker(InetSocketAddress socket } client.getCnxPool().releaseConnection(clientCnx); }); - }).exceptionally(connectionException -> { + }, lookupPinnedExecutor).exceptionally(connectionException -> { addressFuture.completeExceptionally(FutureUtil.unwrapCompletionException(connectionException)); return null; }); @@ -260,7 +293,7 @@ private CompletableFuture getPartitionedTopicMetadata( long startTime = System.nanoTime(); CompletableFuture partitionFuture = new CompletableFuture<>(); - client.getCnxPool().getConnection(socketAddress).thenAccept(clientCnx -> { + client.getCnxPool().getConnection(socketAddress).thenAcceptAsync(clientCnx -> { boolean finalAutoCreationEnabled = metadataAutoCreationEnabled; if (!metadataAutoCreationEnabled && !clientCnx.isSupportsGetPartitionedMetadataWithoutAutoCreation()) { if (useFallbackForNonPIP344Brokers) { @@ -301,7 +334,7 @@ private CompletableFuture getPartitionedTopicMetadata( } client.getCnxPool().releaseConnection(clientCnx); }); - }).exceptionally(connectionException -> { + }, lookupPinnedExecutor).exceptionally(connectionException -> { partitionFuture.completeExceptionally(FutureUtil.unwrapCompletionException(connectionException)); return null; }); @@ -324,7 +357,7 @@ public CompletableFuture> getSchema(TopicName topicName, by return schemaFuture; } InetSocketAddress socketAddress = serviceNameResolver.resolveHost(); - client.getCnxPool().getConnection(socketAddress).thenAccept(clientCnx -> { + client.getCnxPool().getConnection(socketAddress).thenAcceptAsync(clientCnx -> { long requestId = client.newRequestId(); ByteBuf request = Commands.newGetSchema(requestId, topicName.toString(), Optional.ofNullable(BytesSchemaVersion.of(version))); @@ -340,7 +373,7 @@ public CompletableFuture> getSchema(TopicName topicName, by } client.getCnxPool().releaseConnection(clientCnx); }); - }).exceptionally(ex -> { + }, lookupPinnedExecutor).exceptionally(ex -> { schemaFuture.completeExceptionally(FutureUtil.unwrapCompletionException(ex)); return null; }); @@ -385,7 +418,7 @@ private void getTopicsUnderNamespace(InetSocketAddress socketAddress, String topicsHash) { long startTime = System.nanoTime(); - client.getCnxPool().getConnection(socketAddress).thenAccept(clientCnx -> { + client.getCnxPool().getConnection(socketAddress).thenAcceptAsync(clientCnx -> { long requestId = client.newRequestId(); ByteBuf request = Commands.newGetTopicsOfNamespaceRequest( namespace.toString(), requestId, mode, topicsPattern, topicsHash); @@ -404,7 +437,7 @@ private void getTopicsUnderNamespace(InetSocketAddress socketAddress, } client.getCnxPool().releaseConnection(clientCnx); }); - }).exceptionally((e) -> { + }, lookupPinnedExecutor).exceptionally((e) -> { long nextDelay = Math.min(backoff.next(), remainingTime.get()); if (nextDelay <= 0) { getTopicsResultFuture.completeExceptionally( @@ -414,7 +447,7 @@ private void getTopicsUnderNamespace(InetSocketAddress socketAddress, return null; } - ((ScheduledExecutorService) executor).schedule(() -> { + ((ScheduledExecutorService) scheduleExecutor).schedule(() -> { log.warn("[namespace: {}] Could not get connection while getTopicsUnderNamespace -- Will try again in" + " {} ms", namespace, nextDelay); remainingTime.addAndGet(-nextDelay); @@ -428,7 +461,9 @@ private void getTopicsUnderNamespace(InetSocketAddress socketAddress, @Override public void close() throws Exception { - // no-op + if (!createdLookupPinnedExecutor && lookupPinnedExecutor != null && !lookupPinnedExecutor.isShutdown()) { + lookupPinnedExecutor.shutdown(); + } } public static class LookupDataResult { diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java index e0d4bf35f8a22..03435e416cd38 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java @@ -113,6 +113,7 @@ public class PulsarClientImpl implements PulsarClient { private final boolean createdExecutorProviders; private final boolean createdScheduledProviders; + private final boolean createdLookupProviders; private LookupService lookup; private Map urlLookupMap = new ConcurrentHashMap<>(); private final ConnectionPool cnxPool; @@ -121,6 +122,7 @@ public class PulsarClientImpl implements PulsarClient { private boolean needStopTimer; private final ExecutorProvider externalExecutorProvider; private final ExecutorProvider internalExecutorProvider; + private final ExecutorProvider lookupExecutorProvider; private final ScheduledExecutorProvider scheduledExecutorProvider; private final boolean createdEventLoopGroup; @@ -163,29 +165,30 @@ public SchemaInfoProvider load(String topicName) { private TransactionCoordinatorClientImpl tcClient; public PulsarClientImpl(ClientConfigurationData conf) throws PulsarClientException { - this(conf, null, null, null, null, null, null); + this(conf, null, null, null, null, null, null, null); } public PulsarClientImpl(ClientConfigurationData conf, EventLoopGroup eventLoopGroup) throws PulsarClientException { - this(conf, eventLoopGroup, null, null, null, null, null); + this(conf, eventLoopGroup, null, null, null, null, null, null); } public PulsarClientImpl(ClientConfigurationData conf, EventLoopGroup eventLoopGroup, ConnectionPool cnxPool) throws PulsarClientException { - this(conf, eventLoopGroup, cnxPool, null, null, null, null); + this(conf, eventLoopGroup, cnxPool, null, null, null, null, null); } public PulsarClientImpl(ClientConfigurationData conf, EventLoopGroup eventLoopGroup, ConnectionPool cnxPool, Timer timer) throws PulsarClientException { - this(conf, eventLoopGroup, cnxPool, timer, null, null, null); + this(conf, eventLoopGroup, cnxPool, timer, null, null, null, null); } @Builder(builderClassName = "PulsarClientImplBuilder") private PulsarClientImpl(ClientConfigurationData conf, EventLoopGroup eventLoopGroup, ConnectionPool connectionPool, Timer timer, ExecutorProvider externalExecutorProvider, ExecutorProvider internalExecutorProvider, - ScheduledExecutorProvider scheduledExecutorProvider) throws PulsarClientException { + ScheduledExecutorProvider scheduledExecutorProvider, + ExecutorProvider lookupExecutorProvider) throws PulsarClientException { EventLoopGroup eventLoopGroupReference = null; ConnectionPool connectionPoolReference = null; @@ -198,6 +201,7 @@ private PulsarClientImpl(ClientConfigurationData conf, EventLoopGroup eventLoopG } this.createdExecutorProviders = externalExecutorProvider == null; this.createdScheduledProviders = scheduledExecutorProvider == null; + this.createdLookupProviders = lookupExecutorProvider == null; eventLoopGroupReference = eventLoopGroup != null ? eventLoopGroup : getEventLoopGroup(conf); this.eventLoopGroup = eventLoopGroupReference; if (conf == null || isBlank(conf.getServiceUrl())) { @@ -218,11 +222,14 @@ private PulsarClientImpl(ClientConfigurationData conf, EventLoopGroup eventLoopG new ExecutorProvider(conf.getNumListenerThreads(), "pulsar-external-listener"); this.internalExecutorProvider = internalExecutorProvider != null ? internalExecutorProvider : new ExecutorProvider(conf.getNumIoThreads(), "pulsar-client-internal"); + this.lookupExecutorProvider = lookupExecutorProvider != null ? lookupExecutorProvider : + new ExecutorProvider(conf.getNumIoThreads(), "pulsar-client-lookup"); if (conf.getServiceUrl().startsWith("http")) { lookup = new HttpLookupService(instrumentProvider, conf, this.eventLoopGroup); } else { lookup = new BinaryProtoLookupService(this, conf.getServiceUrl(), conf.getListenerName(), - conf.isUseTls(), this.scheduledExecutorProvider.getExecutor()); + conf.isUseTls(), this.scheduledExecutorProvider.getExecutor(), + this.lookupExecutorProvider.getExecutor()); } if (timer == null) { this.timer = new HashedWheelTimer(getThreadFactory("pulsar-timer"), 1, TimeUnit.MILLISECONDS); @@ -976,6 +983,16 @@ private void shutdownExecutors() throws PulsarClientException { pulsarClientException = PulsarClientException.unwrap(t); } } + + if (createdLookupProviders && lookupExecutorProvider != null && !lookupExecutorProvider.isShutdown()) { + try { + lookupExecutorProvider.shutdownNow(); + } catch (Throwable t) { + log.warn("Failed to shutdown lookupExecutorProvider", t); + pulsarClientException = PulsarClientException.unwrap(t); + } + } + if (pulsarClientException != null) { throw pulsarClientException; } diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BinaryProtoLookupServiceTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BinaryProtoLookupServiceTest.java index f691215b04e08..11e00eefcfdde 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BinaryProtoLookupServiceTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BinaryProtoLookupServiceTest.java @@ -25,25 +25,41 @@ import static org.mockito.Mockito.spy; import static org.mockito.Mockito.when; import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotNull; import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; import io.netty.buffer.ByteBuf; +import io.netty.util.concurrent.DefaultThreadFactory; import java.lang.reflect.Field; import java.net.InetSocketAddress; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicReference; import org.apache.pulsar.client.api.PulsarClientException.LookupException; import org.apache.pulsar.client.impl.BinaryProtoLookupService.LookupDataResult; import org.apache.pulsar.client.impl.conf.ClientConfigurationData; import org.apache.pulsar.client.impl.metrics.InstrumentProvider; +import org.apache.pulsar.common.api.proto.BaseCommand; +import org.apache.pulsar.common.api.proto.BaseCommand.Type; import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.common.protocol.Commands; +import org.awaitility.Awaitility; +import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; public class BinaryProtoLookupServiceTest { private BinaryProtoLookupService lookup; private TopicName topicName; + private ExecutorService internalExecutor; + + @AfterMethod + public void cleanup() throws Exception { + internalExecutor.shutdown(); + lookup.close(); + } @BeforeMethod public void setup() throws Exception { @@ -72,9 +88,13 @@ public void setup() throws Exception { doReturn(1L).when(client).newRequestId(); ClientConfigurationData data = new ClientConfigurationData(); doReturn(data).when(client).getConfiguration(); + internalExecutor = + Executors.newSingleThreadExecutor(new DefaultThreadFactory("pulsar-client-test-internal-executor")); + doReturn(internalExecutor).when(client).getInternalExecutorService(); + + lookup = spy(new BinaryProtoLookupService(client, "pulsar://localhost:6650", null, false, + mock(ExecutorService.class), internalExecutor)); - lookup = spy( - new BinaryProtoLookupService(client, "pulsar://localhost:6650", false, mock(ExecutorService.class))); topicName = TopicName.get("persistent://tenant1/ns1/t1"); } @@ -118,6 +138,37 @@ public void maxLookupRedirectsTest3() throws Exception { } } + @Test + public void testCommandUnChangedInDifferentThread() throws Exception { + BaseCommand successCommand = Commands.newSuccessCommand(10000); + lookup.getBroker(topicName).get(); + assertEquals(successCommand.getType(), Type.SUCCESS); + lookup.getPartitionedTopicMetadata(topicName, true, true).get(); + assertEquals(successCommand.getType(), Type.SUCCESS); + } + + @Test + public void testCommandChangedInSameThread() throws Exception { + AtomicReference successCommand = new AtomicReference<>(); + internalExecutor.execute(() -> successCommand.set(Commands.newSuccessCommand(10000))); + Awaitility.await().untilAsserted(() -> { + BaseCommand baseCommand = successCommand.get(); + assertNotNull(baseCommand); + assertEquals(baseCommand.getType(), Type.SUCCESS); + }); + lookup.getBroker(topicName).get(); + assertEquals(successCommand.get().getType(), Type.LOOKUP); + + internalExecutor.execute(() -> successCommand.set(Commands.newSuccessCommand(10000))); + Awaitility.await().untilAsserted(() -> { + BaseCommand baseCommand = successCommand.get(); + assertNotNull(baseCommand); + assertEquals(baseCommand.getType(), Type.SUCCESS); + }); + lookup.getPartitionedTopicMetadata(topicName, true, true).get(); + assertEquals(successCommand.get().getType(), Type.PARTITIONED_METADATA); + } + private static LookupDataResult createLookupDataResult(String brokerUrl, boolean redirect) throws Exception { LookupDataResult lookupResult = new LookupDataResult(-1); From e59a1f9285a8d785be9f6ee535b16417c8b3964c Mon Sep 17 00:00:00 2001 From: Zixuan Liu Date: Fri, 11 Oct 2024 23:56:31 +0800 Subject: [PATCH 2/5] Revert scheduledExecutorProvider Signed-off-by: Zixuan Liu --- .../src/main/java/org/apache/pulsar/broker/PulsarService.java | 1 + 1 file changed, 1 insertion(+) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java index 2551d7f7cabaa..8535b01f5dfb6 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java @@ -1689,6 +1689,7 @@ public PulsarClientImpl createClientImpl(ClientConfigurationData clientConf) .timer(brokerClientSharedTimer) .internalExecutorProvider(brokerClientSharedInternalExecutorProvider) .externalExecutorProvider(brokerClientSharedExternalExecutorProvider) + .scheduledExecutorProvider(brokerClientSharedScheduledExecutorProvider) .lookupExecutorProvider(brokerClientSharedLookupExecutorProvider) .build(); } From 7ac2448ed7761359f918009e3f386112e4f8ea03 Mon Sep 17 00:00:00 2001 From: Zixuan Liu Date: Mon, 14 Oct 2024 19:12:12 +0800 Subject: [PATCH 3/5] inverse createdLookupPinnedExecutor and fix constructor Signed-off-by: Zixuan Liu --- .../pulsar/client/impl/BinaryProtoLookupService.java | 6 +++--- .../org/apache/pulsar/client/impl/PulsarClientImpl.java | 9 +++++++++ 2 files changed, 12 insertions(+), 3 deletions(-) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java index e6be88d2c1aa1..795cdc6d69383 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java @@ -129,11 +129,11 @@ public BinaryProtoLookupService(PulsarClientImpl client, histoListTopics = histo.withAttributes(Attributes.builder().put("pulsar.lookup.type", "list-topics").build()); if (lookupPinnedExecutor == null) { - this.createdLookupPinnedExecutor = false; + this.createdLookupPinnedExecutor = true; this.lookupPinnedExecutor = Executors.newSingleThreadExecutor(new DefaultThreadFactory("pulsar-client-binary-proto-lookup")); } else { - this.createdLookupPinnedExecutor = true; + this.createdLookupPinnedExecutor = false; this.lookupPinnedExecutor = lookupPinnedExecutor; } } @@ -461,7 +461,7 @@ private void getTopicsUnderNamespace(InetSocketAddress socketAddress, @Override public void close() throws Exception { - if (!createdLookupPinnedExecutor && lookupPinnedExecutor != null && !lookupPinnedExecutor.isShutdown()) { + if (createdLookupPinnedExecutor && lookupPinnedExecutor != null && !lookupPinnedExecutor.isShutdown()) { lookupPinnedExecutor.shutdown(); } } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java index 03435e416cd38..6448a3651aa93 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java @@ -183,6 +183,15 @@ public PulsarClientImpl(ClientConfigurationData conf, EventLoopGroup eventLoopGr this(conf, eventLoopGroup, cnxPool, timer, null, null, null, null); } + public PulsarClientImpl(ClientConfigurationData conf, EventLoopGroup eventLoopGroup, ConnectionPool connectionPool, + Timer timer, ExecutorProvider externalExecutorProvider, + ExecutorProvider internalExecutorProvider, + ScheduledExecutorProvider scheduledExecutorProvider) + throws PulsarClientException { + this(conf, eventLoopGroup, connectionPool, timer, externalExecutorProvider, internalExecutorProvider, + scheduledExecutorProvider, null); + } + @Builder(builderClassName = "PulsarClientImplBuilder") private PulsarClientImpl(ClientConfigurationData conf, EventLoopGroup eventLoopGroup, ConnectionPool connectionPool, Timer timer, ExecutorProvider externalExecutorProvider, From e52bd097d268c0ecb2d3fc5a89fd4578d325358c Mon Sep 17 00:00:00 2001 From: Zixuan Liu Date: Mon, 14 Oct 2024 21:35:22 +0800 Subject: [PATCH 4/5] use the single thread as lookupExecutorProvider Signed-off-by: Zixuan Liu --- .../java/org/apache/pulsar/client/impl/PulsarClientImpl.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java index 6448a3651aa93..603844eeb786e 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java @@ -232,7 +232,7 @@ private PulsarClientImpl(ClientConfigurationData conf, EventLoopGroup eventLoopG this.internalExecutorProvider = internalExecutorProvider != null ? internalExecutorProvider : new ExecutorProvider(conf.getNumIoThreads(), "pulsar-client-internal"); this.lookupExecutorProvider = lookupExecutorProvider != null ? lookupExecutorProvider : - new ExecutorProvider(conf.getNumIoThreads(), "pulsar-client-lookup"); + new ExecutorProvider(1, "pulsar-client-lookup"); if (conf.getServiceUrl().startsWith("http")) { lookup = new HttpLookupService(instrumentProvider, conf, this.eventLoopGroup); } else { From 4c3db767398defe57ae0d266692483aa71789db4 Mon Sep 17 00:00:00 2001 From: Zixuan Liu Date: Tue, 15 Oct 2024 17:37:15 +0800 Subject: [PATCH 5/5] close brokerClientSharedLookupExecutorProvider Signed-off-by: Zixuan Liu --- .../src/main/java/org/apache/pulsar/broker/PulsarService.java | 1 + 1 file changed, 1 insertion(+) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java index 8535b01f5dfb6..05491d9c281c6 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java @@ -699,6 +699,7 @@ public CompletableFuture closeAsync() { brokerClientSharedExternalExecutorProvider.shutdownNow(); brokerClientSharedInternalExecutorProvider.shutdownNow(); brokerClientSharedScheduledExecutorProvider.shutdownNow(); + brokerClientSharedLookupExecutorProvider.shutdownNow(); brokerClientSharedTimer.stop(); monotonicSnapshotClock.close();