Skip to content

Commit

Permalink
[fix][client] Use dedicated executor for requests in BinaryProtoLooku…
Browse files Browse the repository at this point in the history
…pService

Signed-off-by: Zixuan Liu <nodeces@gmail.com>
  • Loading branch information
nodece committed Sep 30, 2024
1 parent 5e832a1 commit e11061a
Show file tree
Hide file tree
Showing 2 changed files with 60 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ public class BinaryProtoLookupService implements LookupService {
private final ExecutorService executor;
private final String listenerName;
private final int maxLookupRedirects;
private final ExecutorService internalPinnedExecutor;

private final ConcurrentHashMap<Pair<TopicName, Map<String, String>>, CompletableFuture<LookupTopicResult>>
lookupInProgress = new ConcurrentHashMap<>();
Expand Down Expand Up @@ -103,6 +104,8 @@ public BinaryProtoLookupService(PulsarClientImpl client,
histo.withAttributes(Attributes.builder().put("pulsar.lookup.type", "metadata").build());
histoGetSchema = histo.withAttributes(Attributes.builder().put("pulsar.lookup.type", "schema").build());
histoListTopics = histo.withAttributes(Attributes.builder().put("pulsar.lookup.type", "list-topics").build());

internalPinnedExecutor = client.getInternalExecutorService();
}

@Override
Expand Down Expand Up @@ -180,7 +183,7 @@ private CompletableFuture<LookupTopicResult> findBroker(InetSocketAddress socket
return addressFuture;
}

client.getCnxPool().getConnection(socketAddress).thenAccept(clientCnx -> {
client.getCnxPool().getConnection(socketAddress).thenAcceptAsync(clientCnx -> {
long requestId = client.newRequestId();
ByteBuf request = Commands.newLookup(topicName.toString(), listenerName, authoritative, requestId,
properties);
Expand Down Expand Up @@ -247,7 +250,7 @@ private CompletableFuture<LookupTopicResult> findBroker(InetSocketAddress socket
}
client.getCnxPool().releaseConnection(clientCnx);
});
}).exceptionally(connectionException -> {
}, internalPinnedExecutor).exceptionally(connectionException -> {
addressFuture.completeExceptionally(FutureUtil.unwrapCompletionException(connectionException));
return null;
});
Expand All @@ -260,7 +263,7 @@ private CompletableFuture<PartitionedTopicMetadata> getPartitionedTopicMetadata(
long startTime = System.nanoTime();
CompletableFuture<PartitionedTopicMetadata> partitionFuture = new CompletableFuture<>();

client.getCnxPool().getConnection(socketAddress).thenAccept(clientCnx -> {
client.getCnxPool().getConnection(socketAddress).thenAcceptAsync(clientCnx -> {
boolean finalAutoCreationEnabled = metadataAutoCreationEnabled;
if (!metadataAutoCreationEnabled && !clientCnx.isSupportsGetPartitionedMetadataWithoutAutoCreation()) {
if (useFallbackForNonPIP344Brokers) {
Expand Down Expand Up @@ -301,7 +304,7 @@ private CompletableFuture<PartitionedTopicMetadata> getPartitionedTopicMetadata(
}
client.getCnxPool().releaseConnection(clientCnx);
});
}).exceptionally(connectionException -> {
}, internalPinnedExecutor).exceptionally(connectionException -> {
partitionFuture.completeExceptionally(FutureUtil.unwrapCompletionException(connectionException));
return null;
});
Expand All @@ -324,7 +327,7 @@ public CompletableFuture<Optional<SchemaInfo>> getSchema(TopicName topicName, by
return schemaFuture;
}
InetSocketAddress socketAddress = serviceNameResolver.resolveHost();
client.getCnxPool().getConnection(socketAddress).thenAccept(clientCnx -> {
client.getCnxPool().getConnection(socketAddress).thenAcceptAsync(clientCnx -> {
long requestId = client.newRequestId();
ByteBuf request = Commands.newGetSchema(requestId, topicName.toString(),
Optional.ofNullable(BytesSchemaVersion.of(version)));
Expand All @@ -340,7 +343,7 @@ public CompletableFuture<Optional<SchemaInfo>> getSchema(TopicName topicName, by
}
client.getCnxPool().releaseConnection(clientCnx);
});
}).exceptionally(ex -> {
}, internalPinnedExecutor).exceptionally(ex -> {
schemaFuture.completeExceptionally(FutureUtil.unwrapCompletionException(ex));
return null;
});
Expand Down Expand Up @@ -385,7 +388,7 @@ private void getTopicsUnderNamespace(InetSocketAddress socketAddress,
String topicsHash) {
long startTime = System.nanoTime();

client.getCnxPool().getConnection(socketAddress).thenAccept(clientCnx -> {
client.getCnxPool().getConnection(socketAddress).thenAcceptAsync(clientCnx -> {
long requestId = client.newRequestId();
ByteBuf request = Commands.newGetTopicsOfNamespaceRequest(
namespace.toString(), requestId, mode, topicsPattern, topicsHash);
Expand All @@ -404,7 +407,7 @@ private void getTopicsUnderNamespace(InetSocketAddress socketAddress,
}
client.getCnxPool().releaseConnection(clientCnx);
});
}).exceptionally((e) -> {
}, internalPinnedExecutor).exceptionally((e) -> {
long nextDelay = Math.min(backoff.next(), remainingTime.get());
if (nextDelay <= 0) {
getTopicsResultFuture.completeExceptionally(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,25 +25,40 @@
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
import io.netty.buffer.ByteBuf;
import io.netty.util.concurrent.DefaultThreadFactory;
import java.lang.reflect.Field;
import java.net.InetSocketAddress;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.pulsar.client.api.PulsarClientException.LookupException;
import org.apache.pulsar.client.impl.BinaryProtoLookupService.LookupDataResult;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.client.impl.metrics.InstrumentProvider;
import org.apache.pulsar.common.api.proto.BaseCommand;
import org.apache.pulsar.common.api.proto.BaseCommand.Type;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.protocol.Commands;
import org.awaitility.Awaitility;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

public class BinaryProtoLookupServiceTest {
private BinaryProtoLookupService lookup;
private TopicName topicName;
private ExecutorService internalExecutor;

@AfterMethod
public void cleanup() throws Exception {
internalExecutor.shutdownNow();
}

@BeforeMethod
public void setup() throws Exception {
Expand Down Expand Up @@ -72,6 +87,9 @@ public void setup() throws Exception {
doReturn(1L).when(client).newRequestId();
ClientConfigurationData data = new ClientConfigurationData();
doReturn(data).when(client).getConfiguration();
internalExecutor =
Executors.newSingleThreadExecutor(new DefaultThreadFactory("pulsar-client-test-internal-executor"));
doReturn(internalExecutor).when(client).getInternalExecutorService();

lookup = spy(
new BinaryProtoLookupService(client, "pulsar://localhost:6650", false, mock(ExecutorService.class)));
Expand Down Expand Up @@ -118,6 +136,37 @@ public void maxLookupRedirectsTest3() throws Exception {
}
}

@Test
public void testCommandUnChangedInDifferentThread() throws Exception {
BaseCommand successCommand = Commands.newSuccessCommand(10000);
lookup.getBroker(topicName).get();
assertEquals(successCommand.getType(), Type.SUCCESS);
lookup.getPartitionedTopicMetadata(topicName, true, true).get();
assertEquals(successCommand.getType(), Type.SUCCESS);
}

@Test
public void testCommandChangedInSameThread() throws Exception {
AtomicReference<BaseCommand> successCommand = new AtomicReference<>();
internalExecutor.execute(() -> successCommand.set(Commands.newSuccessCommand(10000)));
Awaitility.await().untilAsserted(() -> {
BaseCommand baseCommand = successCommand.get();
assertNotNull(baseCommand);
assertEquals(baseCommand.getType(), Type.SUCCESS);
});
lookup.getBroker(topicName).get();
assertEquals(successCommand.get().getType(), Type.LOOKUP);

internalExecutor.execute(() -> successCommand.set(Commands.newSuccessCommand(10000)));
Awaitility.await().untilAsserted(() -> {
BaseCommand baseCommand = successCommand.get();
assertNotNull(baseCommand);
assertEquals(baseCommand.getType(), Type.SUCCESS);
});
lookup.getPartitionedTopicMetadata(topicName, true, true).get();
assertEquals(successCommand.get().getType(), Type.PARTITIONED_METADATA);
}

private static LookupDataResult createLookupDataResult(String brokerUrl, boolean redirect) throws Exception {
LookupDataResult lookupResult = new LookupDataResult(-1);

Expand Down

0 comments on commit e11061a

Please sign in to comment.