From db7946e8265819cd3b3c47dd55512bd4cd2a112d Mon Sep 17 00:00:00 2001 From: Zixuan Liu Date: Tue, 15 Oct 2024 21:41:26 +0800 Subject: [PATCH] [fix][client] Use dedicated executor for requests in BinaryProtoLookupService (#23378) Signed-off-by: Zixuan Liu (cherry picked from commit f98297f3c9c052d7ddd8444bc0ef876ceeb924b1) --- .../apache/pulsar/broker/PulsarService.java | 5 ++ .../client/impl/BinaryProtoLookupService.java | 65 ++++++++++++++----- .../pulsar/client/impl/PulsarClientImpl.java | 38 +++++++++-- .../impl/BinaryProtoLookupServiceTest.java | 55 +++++++++++++++- 4 files changed, 140 insertions(+), 23 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java index 392e536e39db9..559db301158a4 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java @@ -251,6 +251,7 @@ public class PulsarService implements AutoCloseable, ShutdownService { private final ExecutorProvider brokerClientSharedExternalExecutorProvider; private final ScheduledExecutorProvider brokerClientSharedScheduledExecutorProvider; private final Timer brokerClientSharedTimer; + private final ExecutorProvider brokerClientSharedLookupExecutorProvider; private MetricsGenerator metricsGenerator; private final PulsarBrokerOpenTelemetry openTelemetry; @@ -370,6 +371,8 @@ public PulsarService(ServiceConfiguration config, new ScheduledExecutorProvider(1, "broker-client-shared-scheduled-executor"); this.brokerClientSharedTimer = new HashedWheelTimer(new DefaultThreadFactory("broker-client-shared-timer"), 1, TimeUnit.MILLISECONDS); + this.brokerClientSharedLookupExecutorProvider = + new ScheduledExecutorProvider(1, "broker-client-shared-lookup-executor"); // here in the constructor we don't have the offloader scheduler yet this.offloaderStats = LedgerOffloaderStats.create(false, false, null, 0); @@ -637,6 +640,7 @@ public CompletableFuture closeAsync() { brokerClientSharedExternalExecutorProvider.shutdownNow(); brokerClientSharedInternalExecutorProvider.shutdownNow(); brokerClientSharedScheduledExecutorProvider.shutdownNow(); + brokerClientSharedLookupExecutorProvider.shutdownNow(); brokerClientSharedTimer.stop(); monotonicSnapshotClock.close(); @@ -1598,6 +1602,7 @@ public PulsarClientImpl createClientImpl(ClientConfigurationData clientConf) .internalExecutorProvider(brokerClientSharedInternalExecutorProvider) .externalExecutorProvider(brokerClientSharedExternalExecutorProvider) .scheduledExecutorProvider(brokerClientSharedScheduledExecutorProvider) + .lookupExecutorProvider(brokerClientSharedLookupExecutorProvider) .build(); } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java index 6ee6fafde1c25..c4337f897d89d 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java @@ -21,6 +21,7 @@ import static java.lang.String.format; import static org.apache.pulsar.client.api.PulsarClientException.FailedFeatureCheck.SupportsGetPartitionedMetadataWithoutAutoCreation; import io.netty.buffer.ByteBuf; +import io.netty.util.concurrent.DefaultThreadFactory; import io.opentelemetry.api.common.Attributes; import java.net.InetSocketAddress; import java.net.URI; @@ -28,6 +29,7 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; @@ -56,9 +58,11 @@ public class BinaryProtoLookupService implements LookupService { private final PulsarClientImpl client; private final ServiceNameResolver serviceNameResolver; private final boolean useTls; - private final ExecutorService executor; + private final ExecutorService scheduleExecutor; private final String listenerName; private final int maxLookupRedirects; + private final ExecutorService lookupPinnedExecutor; + private final boolean createdLookupPinnedExecutor; private final ConcurrentHashMap> lookupInProgress = new ConcurrentHashMap<>(); @@ -71,23 +75,43 @@ public class BinaryProtoLookupService implements LookupService { private final LatencyHistogram histoGetSchema; private final LatencyHistogram histoListTopics; + /** + * @deprecated use {@link + * #BinaryProtoLookupService(PulsarClientImpl, String, String, boolean, ExecutorService, ExecutorService)} instead. + */ + @Deprecated + public BinaryProtoLookupService(PulsarClientImpl client, + String serviceUrl, + boolean useTls, + ExecutorService scheduleExecutor) + throws PulsarClientException { + this(client, serviceUrl, null, useTls, scheduleExecutor); + } + + /** + * @deprecated use {@link + * #BinaryProtoLookupService(PulsarClientImpl, String, String, boolean, ExecutorService, ExecutorService)} instead. + */ + @Deprecated public BinaryProtoLookupService(PulsarClientImpl client, String serviceUrl, + String listenerName, boolean useTls, - ExecutorService executor) + ExecutorService scheduleExecutor) throws PulsarClientException { - this(client, serviceUrl, null, useTls, executor); + this(client, serviceUrl, listenerName, useTls, scheduleExecutor, null); } public BinaryProtoLookupService(PulsarClientImpl client, String serviceUrl, String listenerName, boolean useTls, - ExecutorService executor) + ExecutorService scheduleExecutor, + ExecutorService lookupPinnedExecutor) throws PulsarClientException { this.client = client; this.useTls = useTls; - this.executor = executor; + this.scheduleExecutor = scheduleExecutor; this.maxLookupRedirects = client.getConfiguration().getMaxLookupRedirects(); this.serviceNameResolver = new PulsarServiceNameResolver(); this.listenerName = listenerName; @@ -101,6 +125,15 @@ public BinaryProtoLookupService(PulsarClientImpl client, histo.withAttributes(Attributes.builder().put("pulsar.lookup.type", "metadata").build()); histoGetSchema = histo.withAttributes(Attributes.builder().put("pulsar.lookup.type", "schema").build()); histoListTopics = histo.withAttributes(Attributes.builder().put("pulsar.lookup.type", "list-topics").build()); + + if (lookupPinnedExecutor == null) { + this.createdLookupPinnedExecutor = true; + this.lookupPinnedExecutor = + Executors.newSingleThreadExecutor(new DefaultThreadFactory("pulsar-client-binary-proto-lookup")); + } else { + this.createdLookupPinnedExecutor = false; + this.lookupPinnedExecutor = lookupPinnedExecutor; + } } @Override @@ -176,7 +209,7 @@ private CompletableFuture findBroker(InetSocketAddress socket return addressFuture; } - client.getCnxPool().getConnection(socketAddress).thenAccept(clientCnx -> { + client.getCnxPool().getConnection(socketAddress).thenAcceptAsync(clientCnx -> { long requestId = client.newRequestId(); ByteBuf request = Commands.newLookup(topicName.toString(), listenerName, authoritative, requestId); clientCnx.newLookup(request, requestId).whenComplete((r, t) -> { @@ -242,7 +275,7 @@ private CompletableFuture findBroker(InetSocketAddress socket } client.getCnxPool().releaseConnection(clientCnx); }); - }).exceptionally(connectionException -> { + }, lookupPinnedExecutor).exceptionally(connectionException -> { addressFuture.completeExceptionally(FutureUtil.unwrapCompletionException(connectionException)); return null; }); @@ -255,7 +288,7 @@ private CompletableFuture getPartitionedTopicMetadata( long startTime = System.nanoTime(); CompletableFuture partitionFuture = new CompletableFuture<>(); - client.getCnxPool().getConnection(socketAddress).thenAccept(clientCnx -> { + client.getCnxPool().getConnection(socketAddress).thenAcceptAsync(clientCnx -> { boolean finalAutoCreationEnabled = metadataAutoCreationEnabled; if (!metadataAutoCreationEnabled && !clientCnx.isSupportsGetPartitionedMetadataWithoutAutoCreation()) { if (useFallbackForNonPIP344Brokers) { @@ -296,7 +329,7 @@ private CompletableFuture getPartitionedTopicMetadata( } client.getCnxPool().releaseConnection(clientCnx); }); - }).exceptionally(connectionException -> { + }, lookupPinnedExecutor).exceptionally(connectionException -> { partitionFuture.completeExceptionally(FutureUtil.unwrapCompletionException(connectionException)); return null; }); @@ -319,7 +352,7 @@ public CompletableFuture> getSchema(TopicName topicName, by return schemaFuture; } InetSocketAddress socketAddress = serviceNameResolver.resolveHost(); - client.getCnxPool().getConnection(socketAddress).thenAccept(clientCnx -> { + client.getCnxPool().getConnection(socketAddress).thenAcceptAsync(clientCnx -> { long requestId = client.newRequestId(); ByteBuf request = Commands.newGetSchema(requestId, topicName.toString(), Optional.ofNullable(BytesSchemaVersion.of(version))); @@ -335,7 +368,7 @@ public CompletableFuture> getSchema(TopicName topicName, by } client.getCnxPool().releaseConnection(clientCnx); }); - }).exceptionally(ex -> { + }, lookupPinnedExecutor).exceptionally(ex -> { schemaFuture.completeExceptionally(FutureUtil.unwrapCompletionException(ex)); return null; }); @@ -380,7 +413,7 @@ private void getTopicsUnderNamespace(InetSocketAddress socketAddress, String topicsHash) { long startTime = System.nanoTime(); - client.getCnxPool().getConnection(socketAddress).thenAccept(clientCnx -> { + client.getCnxPool().getConnection(socketAddress).thenAcceptAsync(clientCnx -> { long requestId = client.newRequestId(); ByteBuf request = Commands.newGetTopicsOfNamespaceRequest( namespace.toString(), requestId, mode, topicsPattern, topicsHash); @@ -399,7 +432,7 @@ private void getTopicsUnderNamespace(InetSocketAddress socketAddress, } client.getCnxPool().releaseConnection(clientCnx); }); - }).exceptionally((e) -> { + }, lookupPinnedExecutor).exceptionally((e) -> { long nextDelay = Math.min(backoff.next(), remainingTime.get()); if (nextDelay <= 0) { getTopicsResultFuture.completeExceptionally( @@ -409,7 +442,7 @@ private void getTopicsUnderNamespace(InetSocketAddress socketAddress, return null; } - ((ScheduledExecutorService) executor).schedule(() -> { + ((ScheduledExecutorService) scheduleExecutor).schedule(() -> { log.warn("[namespace: {}] Could not get connection while getTopicsUnderNamespace -- Will try again in" + " {} ms", namespace, nextDelay); remainingTime.addAndGet(-nextDelay); @@ -423,7 +456,9 @@ private void getTopicsUnderNamespace(InetSocketAddress socketAddress, @Override public void close() throws Exception { - // no-op + if (createdLookupPinnedExecutor && lookupPinnedExecutor != null && !lookupPinnedExecutor.isShutdown()) { + lookupPinnedExecutor.shutdown(); + } } public static class LookupDataResult { diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java index f6fb763df1d4b..73e6fe9888c2c 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java @@ -111,6 +111,7 @@ public class PulsarClientImpl implements PulsarClient { private final boolean createdExecutorProviders; private final boolean createdScheduledProviders; + private final boolean createdLookupProviders; private LookupService lookup; private Map urlLookupMap = new ConcurrentHashMap<>(); private final ConnectionPool cnxPool; @@ -119,6 +120,7 @@ public class PulsarClientImpl implements PulsarClient { private boolean needStopTimer; private final ExecutorProvider externalExecutorProvider; private final ExecutorProvider internalExecutorProvider; + private final ExecutorProvider lookupExecutorProvider; private final ScheduledExecutorProvider scheduledExecutorProvider; private final boolean createdEventLoopGroup; @@ -161,29 +163,39 @@ public SchemaInfoProvider load(String topicName) { private TransactionCoordinatorClientImpl tcClient; public PulsarClientImpl(ClientConfigurationData conf) throws PulsarClientException { - this(conf, null, null, null, null, null, null); + this(conf, null, null, null, null, null, null, null); } public PulsarClientImpl(ClientConfigurationData conf, EventLoopGroup eventLoopGroup) throws PulsarClientException { - this(conf, eventLoopGroup, null, null, null, null, null); + this(conf, eventLoopGroup, null, null, null, null, null, null); } public PulsarClientImpl(ClientConfigurationData conf, EventLoopGroup eventLoopGroup, ConnectionPool cnxPool) throws PulsarClientException { - this(conf, eventLoopGroup, cnxPool, null, null, null, null); + this(conf, eventLoopGroup, cnxPool, null, null, null, null, null); } public PulsarClientImpl(ClientConfigurationData conf, EventLoopGroup eventLoopGroup, ConnectionPool cnxPool, Timer timer) throws PulsarClientException { - this(conf, eventLoopGroup, cnxPool, timer, null, null, null); + this(conf, eventLoopGroup, cnxPool, timer, null, null, null, null); + } + + public PulsarClientImpl(ClientConfigurationData conf, EventLoopGroup eventLoopGroup, ConnectionPool connectionPool, + Timer timer, ExecutorProvider externalExecutorProvider, + ExecutorProvider internalExecutorProvider, + ScheduledExecutorProvider scheduledExecutorProvider) + throws PulsarClientException { + this(conf, eventLoopGroup, connectionPool, timer, externalExecutorProvider, internalExecutorProvider, + scheduledExecutorProvider, null); } @Builder(builderClassName = "PulsarClientImplBuilder") private PulsarClientImpl(ClientConfigurationData conf, EventLoopGroup eventLoopGroup, ConnectionPool connectionPool, Timer timer, ExecutorProvider externalExecutorProvider, ExecutorProvider internalExecutorProvider, - ScheduledExecutorProvider scheduledExecutorProvider) throws PulsarClientException { + ScheduledExecutorProvider scheduledExecutorProvider, + ExecutorProvider lookupExecutorProvider) throws PulsarClientException { EventLoopGroup eventLoopGroupReference = null; ConnectionPool connectionPoolReference = null; @@ -196,6 +208,7 @@ private PulsarClientImpl(ClientConfigurationData conf, EventLoopGroup eventLoopG } this.createdExecutorProviders = externalExecutorProvider == null; this.createdScheduledProviders = scheduledExecutorProvider == null; + this.createdLookupProviders = lookupExecutorProvider == null; eventLoopGroupReference = eventLoopGroup != null ? eventLoopGroup : getEventLoopGroup(conf); this.eventLoopGroup = eventLoopGroupReference; if (conf == null || isBlank(conf.getServiceUrl())) { @@ -215,11 +228,14 @@ private PulsarClientImpl(ClientConfigurationData conf, EventLoopGroup eventLoopG new ExecutorProvider(conf.getNumIoThreads(), "pulsar-client-internal"); this.scheduledExecutorProvider = scheduledExecutorProvider != null ? scheduledExecutorProvider : new ScheduledExecutorProvider(conf.getNumIoThreads(), "pulsar-client-scheduled"); + this.lookupExecutorProvider = lookupExecutorProvider != null ? lookupExecutorProvider : + new ExecutorProvider(1, "pulsar-client-lookup"); if (conf.getServiceUrl().startsWith("http")) { lookup = new HttpLookupService(instrumentProvider, conf, this.eventLoopGroup); } else { lookup = new BinaryProtoLookupService(this, conf.getServiceUrl(), conf.getListenerName(), - conf.isUseTls(), this.scheduledExecutorProvider.getExecutor()); + conf.isUseTls(), this.scheduledExecutorProvider.getExecutor(), + this.lookupExecutorProvider.getExecutor()); } if (timer == null) { this.timer = new HashedWheelTimer(getThreadFactory("pulsar-timer"), 1, TimeUnit.MILLISECONDS); @@ -972,6 +988,16 @@ private void shutdownExecutors() throws PulsarClientException { pulsarClientException = PulsarClientException.unwrap(t); } } + + if (createdLookupProviders && lookupExecutorProvider != null && !lookupExecutorProvider.isShutdown()) { + try { + lookupExecutorProvider.shutdownNow(); + } catch (Throwable t) { + log.warn("Failed to shutdown lookupExecutorProvider", t); + pulsarClientException = PulsarClientException.unwrap(t); + } + } + if (pulsarClientException != null) { throw pulsarClientException; } diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BinaryProtoLookupServiceTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BinaryProtoLookupServiceTest.java index 983cd21a7a9d8..a5b6904d497ef 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BinaryProtoLookupServiceTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BinaryProtoLookupServiceTest.java @@ -25,25 +25,41 @@ import static org.mockito.Mockito.spy; import static org.mockito.Mockito.when; import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotNull; import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; import io.netty.buffer.ByteBuf; +import io.netty.util.concurrent.DefaultThreadFactory; import java.lang.reflect.Field; import java.net.InetSocketAddress; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicReference; import org.apache.pulsar.client.api.PulsarClientException.LookupException; import org.apache.pulsar.client.impl.BinaryProtoLookupService.LookupDataResult; import org.apache.pulsar.client.impl.conf.ClientConfigurationData; import org.apache.pulsar.client.impl.metrics.InstrumentProvider; +import org.apache.pulsar.common.api.proto.BaseCommand; +import org.apache.pulsar.common.api.proto.BaseCommand.Type; import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.common.protocol.Commands; +import org.awaitility.Awaitility; +import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; public class BinaryProtoLookupServiceTest { private BinaryProtoLookupService lookup; private TopicName topicName; + private ExecutorService internalExecutor; + + @AfterMethod + public void cleanup() throws Exception { + internalExecutor.shutdown(); + lookup.close(); + } @BeforeMethod public void setup() throws Exception { @@ -70,9 +86,13 @@ public void setup() throws Exception { doReturn(cnxPool).when(client).getCnxPool(); doReturn(clientConfig).when(client).getConfiguration(); doReturn(1L).when(client).newRequestId(); + internalExecutor = + Executors.newSingleThreadExecutor(new DefaultThreadFactory("pulsar-client-test-internal-executor")); + doReturn(internalExecutor).when(client).getInternalExecutorService(); + + lookup = spy(new BinaryProtoLookupService(client, "pulsar://localhost:6650", null, false, + mock(ExecutorService.class), internalExecutor)); - lookup = spy( - new BinaryProtoLookupService(client, "pulsar://localhost:6650", false, mock(ExecutorService.class))); topicName = TopicName.get("persistent://tenant1/ns1/t1"); } @@ -116,6 +136,37 @@ public void maxLookupRedirectsTest3() throws Exception { } } + @Test + public void testCommandUnChangedInDifferentThread() throws Exception { + BaseCommand successCommand = Commands.newSuccessCommand(10000); + lookup.getBroker(topicName).get(); + assertEquals(successCommand.getType(), Type.SUCCESS); + lookup.getPartitionedTopicMetadata(topicName, true, true).get(); + assertEquals(successCommand.getType(), Type.SUCCESS); + } + + @Test + public void testCommandChangedInSameThread() throws Exception { + AtomicReference successCommand = new AtomicReference<>(); + internalExecutor.execute(() -> successCommand.set(Commands.newSuccessCommand(10000))); + Awaitility.await().untilAsserted(() -> { + BaseCommand baseCommand = successCommand.get(); + assertNotNull(baseCommand); + assertEquals(baseCommand.getType(), Type.SUCCESS); + }); + lookup.getBroker(topicName).get(); + assertEquals(successCommand.get().getType(), Type.LOOKUP); + + internalExecutor.execute(() -> successCommand.set(Commands.newSuccessCommand(10000))); + Awaitility.await().untilAsserted(() -> { + BaseCommand baseCommand = successCommand.get(); + assertNotNull(baseCommand); + assertEquals(baseCommand.getType(), Type.SUCCESS); + }); + lookup.getPartitionedTopicMetadata(topicName, true, true).get(); + assertEquals(successCommand.get().getType(), Type.PARTITIONED_METADATA); + } + private static LookupDataResult createLookupDataResult(String brokerUrl, boolean redirect) throws Exception { LookupDataResult lookupResult = new LookupDataResult(-1);