Skip to content

Commit

Permalink
Add pulsar-client-binary-proto-lookup executor
Browse files Browse the repository at this point in the history
Signed-off-by: Zixuan Liu <nodeces@gmail.com>
  • Loading branch information
nodece committed Oct 10, 2024
1 parent e11061a commit df0949b
Show file tree
Hide file tree
Showing 3 changed files with 52 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import static java.lang.String.format;
import static org.apache.pulsar.client.api.PulsarClientException.FailedFeatureCheck.SupportsGetPartitionedMetadataWithoutAutoCreation;
import io.netty.buffer.ByteBuf;
import io.netty.util.concurrent.DefaultThreadFactory;
import io.opentelemetry.api.common.Attributes;
import java.net.InetSocketAddress;
import java.net.URI;
Expand All @@ -29,6 +30,7 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
Expand Down Expand Up @@ -58,7 +60,7 @@ public class BinaryProtoLookupService implements LookupService {
private final PulsarClientImpl client;
private final ServiceNameResolver serviceNameResolver;
private final boolean useTls;
private final ExecutorService executor;
private final ExecutorService scheduleExecutor;
private final String listenerName;
private final int maxLookupRedirects;
private final ExecutorService internalPinnedExecutor;
Expand All @@ -74,23 +76,44 @@ public class BinaryProtoLookupService implements LookupService {
private final LatencyHistogram histoGetSchema;
private final LatencyHistogram histoListTopics;

/**
* @deprecated use {@link
* #BinaryProtoLookupService(PulsarClientImpl, String, String, boolean, ExecutorService, ExecutorService)} instead.
*/
@Deprecated
public BinaryProtoLookupService(PulsarClientImpl client,
String serviceUrl,
boolean useTls,
ExecutorService scheduleExecutor)
throws PulsarClientException {
this(client, serviceUrl, null, useTls, scheduleExecutor);
}

/**
* @deprecated use {@link
* #BinaryProtoLookupService(PulsarClientImpl, String, String, boolean, ExecutorService, ExecutorService)} instead.
*/
@Deprecated
public BinaryProtoLookupService(PulsarClientImpl client,
String serviceUrl,
String listenerName,
boolean useTls,
ExecutorService executor)
ExecutorService scheduleExecutor)
throws PulsarClientException {
this(client, serviceUrl, null, useTls, executor);
this(client, serviceUrl, listenerName, useTls, scheduleExecutor,
Executors.newSingleThreadExecutor(new DefaultThreadFactory("pulsar-client-binary-proto-lookup")));
}

public BinaryProtoLookupService(PulsarClientImpl client,
String serviceUrl,
String listenerName,
boolean useTls,
ExecutorService executor)
ExecutorService scheduleExecutor,
ExecutorService internalPinnedExecutor)
throws PulsarClientException {
this.client = client;
this.useTls = useTls;
this.executor = executor;
this.scheduleExecutor = scheduleExecutor;
this.maxLookupRedirects = client.getConfiguration().getMaxLookupRedirects();
this.serviceNameResolver = new PulsarServiceNameResolver();
this.listenerName = listenerName;
Expand All @@ -105,7 +128,7 @@ public BinaryProtoLookupService(PulsarClientImpl client,
histoGetSchema = histo.withAttributes(Attributes.builder().put("pulsar.lookup.type", "schema").build());
histoListTopics = histo.withAttributes(Attributes.builder().put("pulsar.lookup.type", "list-topics").build());

internalPinnedExecutor = client.getInternalExecutorService();
this.internalPinnedExecutor = internalPinnedExecutor;
}

@Override
Expand Down Expand Up @@ -417,7 +440,7 @@ private void getTopicsUnderNamespace(InetSocketAddress socketAddress,
return null;
}

((ScheduledExecutorService) executor).schedule(() -> {
((ScheduledExecutorService) scheduleExecutor).schedule(() -> {
log.warn("[namespace: {}] Could not get connection while getTopicsUnderNamespace -- Will try again in"
+ " {} ms", namespace, nextDelay);
remainingTime.addAndGet(-nextDelay);
Expand All @@ -431,7 +454,9 @@ private void getTopicsUnderNamespace(InetSocketAddress socketAddress,

@Override
public void close() throws Exception {
// no-op
if (internalPinnedExecutor != null && !internalPinnedExecutor.isShutdown()) {
internalPinnedExecutor.shutdown();
}
}

public static class LookupDataResult {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import io.netty.channel.EventLoopGroup;
import io.netty.util.HashedWheelTimer;
import io.netty.util.Timer;
import io.netty.util.concurrent.DefaultThreadFactory;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.time.Clock;
Expand Down Expand Up @@ -121,6 +122,7 @@ public class PulsarClientImpl implements PulsarClient {
private boolean needStopTimer;
private final ExecutorProvider externalExecutorProvider;
private final ExecutorProvider internalExecutorProvider;
private ExecutorService binaryProtoLookupExecutor = null;

private final ScheduledExecutorProvider scheduledExecutorProvider;
private final boolean createdEventLoopGroup;
Expand Down Expand Up @@ -221,8 +223,10 @@ private PulsarClientImpl(ClientConfigurationData conf, EventLoopGroup eventLoopG
if (conf.getServiceUrl().startsWith("http")) {
lookup = new HttpLookupService(instrumentProvider, conf, this.eventLoopGroup);
} else {
binaryProtoLookupExecutor = Executors.newSingleThreadExecutor(
new DefaultThreadFactory("pulsar-client-binary-proto-lookup"));
lookup = new BinaryProtoLookupService(this, conf.getServiceUrl(), conf.getListenerName(),
conf.isUseTls(), this.scheduledExecutorProvider.getExecutor());
conf.isUseTls(), this.scheduledExecutorProvider.getExecutor(), binaryProtoLookupExecutor);
}
if (timer == null) {
this.timer = new HashedWheelTimer(getThreadFactory("pulsar-timer"), 1, TimeUnit.MILLISECONDS);
Expand Down Expand Up @@ -976,6 +980,16 @@ private void shutdownExecutors() throws PulsarClientException {
pulsarClientException = PulsarClientException.unwrap(t);
}
}

if (binaryProtoLookupExecutor != null && !binaryProtoLookupExecutor.isShutdown()) {
try {
binaryProtoLookupExecutor.shutdownNow();
} catch (Throwable t) {
log.warn("Failed to shutdown binaryProtoLookupExecutor", t);
pulsarClientException = PulsarClientException.unwrap(t);
}
}

if (pulsarClientException != null) {
throw pulsarClientException;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ public class BinaryProtoLookupServiceTest {
@AfterMethod
public void cleanup() throws Exception {
internalExecutor.shutdownNow();
lookup.close();
}

@BeforeMethod
Expand Down Expand Up @@ -91,8 +92,9 @@ public void setup() throws Exception {
Executors.newSingleThreadExecutor(new DefaultThreadFactory("pulsar-client-test-internal-executor"));
doReturn(internalExecutor).when(client).getInternalExecutorService();

lookup = spy(
new BinaryProtoLookupService(client, "pulsar://localhost:6650", false, mock(ExecutorService.class)));
lookup = spy(new BinaryProtoLookupService(client, "pulsar://localhost:6650", null, false,
mock(ExecutorService.class), internalExecutor));

topicName = TopicName.get("persistent://tenant1/ns1/t1");
}

Expand Down

0 comments on commit df0949b

Please sign in to comment.