Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PIP-91: Separate lookup timeout from operation timeout #11627

Merged
merged 6 commits into from
Aug 17, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
import static org.apache.pulsar.broker.resourcegroup.ResourceUsageTransportManager.DISABLE_RESOURCE_USAGE_TRANSPORT_MANAGER;
import static org.apache.pulsar.transaction.coordinator.impl.MLTransactionLogImpl.TRANSACTION_LOG_PREFIX;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
Expand Down Expand Up @@ -241,7 +242,7 @@ public class PulsarService implements AutoCloseable {
private ProtocolHandlers protocolHandlers = null;

private final ShutdownService shutdownService;
private final EventLoopGroup ioEventLoopGroup;
protected final EventLoopGroup ioEventLoopGroup;

private MetricsGenerator metricsGenerator;

Expand Down Expand Up @@ -658,7 +659,7 @@ public void start() throws PulsarServerException {
config, localMetadataStore, getZkClient(), bkClientFactory, ioEventLoopGroup
);

this.brokerService = new BrokerService(this, ioEventLoopGroup);
this.brokerService = newBrokerService(this);

// Start load management service (even if load balancing is disabled)
this.loadManager.set(LoadManager.create(this));
Expand Down Expand Up @@ -1678,4 +1679,8 @@ private static boolean isTransactionSystemTopic(TopicName topicName) {
|| topic.endsWith(MLPendingAckStore.PENDING_ACK_STORE_SUFFIX);
}

@VisibleForTesting
protected BrokerService newBrokerService(PulsarService pulsar) throws Exception {
return new BrokerService(pulsar, ioEventLoopGroup);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
import static org.apache.pulsar.common.events.EventsTopicNames.checkTopicIsEventsNames;
import com.fasterxml.jackson.core.type.TypeReference;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Queues;
Expand Down Expand Up @@ -254,6 +255,7 @@ public class BrokerService implements Closeable {
@Getter
private final BundlesQuotas bundlesQuotas;

private PulsarChannelInitializer.Factory pulsarChannelInitFactory = PulsarChannelInitializer.DEFAULT_FACTORY;
private Channel listenChannel;
private Channel listenChannelTls;

Expand Down Expand Up @@ -410,7 +412,8 @@ public void start() throws Exception {

ServiceConfiguration serviceConfig = pulsar.getConfiguration();

bootstrap.childHandler(new PulsarChannelInitializer(pulsar, false));
bootstrap.childHandler(
pulsarChannelInitFactory.newPulsarChannelInitializer(pulsar, false));

Optional<Integer> port = serviceConfig.getBrokerServicePort();
if (port.isPresent()) {
Expand All @@ -427,7 +430,8 @@ public void start() throws Exception {
Optional<Integer> tlsPort = serviceConfig.getBrokerServicePortTls();
if (tlsPort.isPresent()) {
ServerBootstrap tlsBootstrap = bootstrap.clone();
tlsBootstrap.childHandler(new PulsarChannelInitializer(pulsar, true));
tlsBootstrap.childHandler(
pulsarChannelInitFactory.newPulsarChannelInitializer(pulsar, true));
try {
listenChannelTls = tlsBootstrap.bind(new InetSocketAddress(
pulsar.getBindAddress(), tlsPort.get())).sync()
Expand Down Expand Up @@ -2649,4 +2653,9 @@ public void resumedConnections(int numberOfConnections) {
public long getPausedConnections() {
return pausedConnections.longValue();
}

@VisibleForTesting
public void setPulsarChannelInitializerFactory(PulsarChannelInitializer.Factory factory) {
this.pulsarChannelInitFactory = factory;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import static org.apache.bookkeeper.util.SafeRunnable.safeRun;
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.google.common.annotations.VisibleForTesting;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
Expand Down Expand Up @@ -129,7 +130,7 @@ protected void initChannel(SocketChannel ch) throws Exception {
// ServerCnx ends up reading higher number of messages and broker can not throttle the messages by disabling
// auto-read.
ch.pipeline().addLast("flowController", new FlowControlHandler());
ServerCnx cnx = new ServerCnx(pulsar);
ServerCnx cnx = newServerCnx(pulsar);
ch.pipeline().addLast("handler", cnx);

connections.put(ch.remoteAddress(), cnx);
Expand All @@ -144,4 +145,16 @@ private void refreshAuthenticationCredentials() {
}
});
}

@VisibleForTesting
protected ServerCnx newServerCnx(PulsarService pulsar) throws Exception {
return new ServerCnx(pulsar);
}

public interface Factory {
PulsarChannelInitializer newPulsarChannelInitializer(PulsarService pulsar, boolean enableTLS) throws Exception;
}

public static final Factory DEFAULT_FACTORY =
(pulsar, tls) -> new PulsarChannelInitializer(pulsar, tls);
}
Original file line number Diff line number Diff line change
Expand Up @@ -280,13 +280,12 @@ protected void customizeNewPulsarAdminBuilder(PulsarAdminBuilder pulsarAdminBuil
}

protected PulsarService startBroker(ServiceConfiguration conf) throws Exception {

return startBrokerWithoutAuthorization(conf);
}

protected PulsarService startBrokerWithoutAuthorization(ServiceConfiguration conf) throws Exception {
conf.setBrokerShutdownTimeoutMs(0L);
PulsarService pulsar = spy(new PulsarService(conf));
PulsarService pulsar = spy(newPulsarService(conf));
setupBrokerMocks(pulsar);
beforePulsarStartMocks(pulsar);
pulsar.start();
Expand All @@ -295,6 +294,10 @@ protected PulsarService startBrokerWithoutAuthorization(ServiceConfiguration con
return pulsar;
}

protected PulsarService newPulsarService(ServiceConfiguration conf) throws Exception {
return new PulsarService(conf);
}

protected void setupBrokerMocks(PulsarService pulsar) throws Exception {
// Override default providers with mocked ones
doReturn(mockZooKeeperClientFactory).when(pulsar).getZooKeeperClientFactory();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,9 @@ private void checkLookupException(String tenant, String namespace, PulsarClient
.create();
} catch (PulsarClientException t) {
Assert.assertTrue(t instanceof PulsarClientException.LookupException);
Assert.assertEquals(t.getMessage(), "java.lang.IllegalStateException: The leader election has not yet been completed!");
Assert.assertTrue(
t.getMessage().contains(
"java.lang.IllegalStateException: The leader election has not yet been completed!"));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@
import com.google.gson.Gson;
import com.google.gson.JsonArray;
import com.google.gson.JsonObject;
import io.netty.buffer.ByteBuf;
import io.netty.channel.EventLoopGroup;
import io.netty.util.concurrent.DefaultThreadFactory;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
Expand Down Expand Up @@ -79,13 +82,18 @@
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.ConnectionPool;
import org.apache.pulsar.client.impl.PulsarServiceNameResolver;
import org.apache.pulsar.client.impl.auth.AuthenticationTls;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.BundlesData;
import org.apache.pulsar.common.policies.data.LocalPolicies;
import org.apache.pulsar.common.policies.data.SubscriptionStats;
import org.apache.pulsar.common.policies.data.TopicStats;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.util.netty.EventLoopUtil;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
Expand Down Expand Up @@ -879,42 +887,114 @@ public void testTlsAuthUseTrustCert() throws Exception {
*/
@Test
public void testLookupThrottlingForClientByClient() throws Exception {
// This test looks like it could be flakey, if the broker responds
// quickly enough, there may never be concurrency in requests
final String topicName = "persistent://prop/ns-abc/newTopic";

@Cleanup
PulsarClient pulsarClient = PulsarClient.builder()
.serviceUrl(pulsar.getBrokerServiceUrl())
.statsInterval(0, TimeUnit.SECONDS)
.maxConcurrentLookupRequests(1)
.maxLookupRequests(2)
.build();
PulsarServiceNameResolver resolver = new PulsarServiceNameResolver();
resolver.updateServiceUrl(pulsar.getBrokerServiceUrl());
ClientConfigurationData conf = new ClientConfigurationData();
conf.setConcurrentLookupRequest(1);
conf.setMaxLookupRequest(2);

EventLoopGroup eventLoop = EventLoopUtil.newEventLoopGroup(20, false,
new DefaultThreadFactory("test-pool", Thread.currentThread().isDaemon()));
long reqId = 0xdeadbeef;
try (ConnectionPool pool = new ConnectionPool(conf, eventLoop)) {
// for PMR
// 2 lookup will succeed
long reqId1 = reqId++;
ByteBuf request1 = Commands.newPartitionMetadataRequest(topicName, reqId1);
CompletableFuture<?> f1 = pool.getConnection(resolver.resolveHost())
.thenCompose(clientCnx -> clientCnx.newLookup(request1, reqId1));

long reqId2 = reqId++;
ByteBuf request2 = Commands.newPartitionMetadataRequest(topicName, reqId2);
CompletableFuture<?> f2 = pool.getConnection(resolver.resolveHost())
.thenCompose(clientCnx -> clientCnx.newLookup(request2, reqId2));

f1.get();
f2.get();

// 3 lookup will fail
long reqId3 = reqId++;
ByteBuf request3 = Commands.newPartitionMetadataRequest(topicName, reqId3);
f1 = pool.getConnection(resolver.resolveHost())
.thenCompose(clientCnx -> clientCnx.newLookup(request3, reqId3));

long reqId4 = reqId++;
ByteBuf request4 = Commands.newPartitionMetadataRequest(topicName, reqId4);
f2 = pool.getConnection(resolver.resolveHost())
.thenCompose(clientCnx -> clientCnx.newLookup(request4, reqId4));

long reqId5 = reqId++;
ByteBuf request5 = Commands.newPartitionMetadataRequest(topicName, reqId5);
CompletableFuture<?> f3 = pool.getConnection(resolver.resolveHost())
.thenCompose(clientCnx -> clientCnx.newLookup(request5, reqId5));

// 2 lookup will success.
try {
CompletableFuture<Consumer<byte[]>> consumer1 = pulsarClient.newConsumer().topic(topicName).subscriptionName("mysub1").subscribeAsync();
CompletableFuture<Consumer<byte[]>> consumer2 = pulsarClient.newConsumer().topic(topicName).subscriptionName("mysub2").subscribeAsync();
try {
f1.get();
f2.get();
f3.get();
fail("At least one should fail");
} catch (ExecutionException e) {
Throwable rootCause = e;
while (rootCause instanceof ExecutionException) {
rootCause = rootCause.getCause();
}
if (!(rootCause instanceof
org.apache.pulsar.client.api.PulsarClientException.TooManyRequestsException)) {
throw e;
}
}

consumer1.get().close();
consumer2.get().close();
} catch (Exception e) {
fail("Subscribe should success with 2 requests");
}
// for Lookup
// 2 lookup will succeed
long reqId6 = reqId++;
ByteBuf request6 = Commands.newLookup(topicName, true, reqId6);
f1 = pool.getConnection(resolver.resolveHost())
.thenCompose(clientCnx -> clientCnx.newLookup(request6, reqId6));

long reqId7 = reqId++;
ByteBuf request7 = Commands.newLookup(topicName, true, reqId7);
f2 = pool.getConnection(resolver.resolveHost())
.thenCompose(clientCnx -> clientCnx.newLookup(request7, reqId7));

f1.get();
f2.get();

// 3 lookup will fail
long reqId8 = reqId++;
ByteBuf request8 = Commands.newLookup(topicName, true, reqId8);
f1 = pool.getConnection(resolver.resolveHost())
.thenCompose(clientCnx -> clientCnx.newLookup(request8, reqId8));

long reqId9 = reqId++;
ByteBuf request9 = Commands.newLookup(topicName, true, reqId9);
f2 = pool.getConnection(resolver.resolveHost())
.thenCompose(clientCnx -> clientCnx.newLookup(request9, reqId9));

long reqId10 = reqId++;
ByteBuf request10 = Commands.newLookup(topicName, true, reqId10);
f3 = pool.getConnection(resolver.resolveHost())
.thenCompose(clientCnx -> clientCnx.newLookup(request10, reqId10));

// 3 lookup will fail
try {
CompletableFuture<Consumer<byte[]>> consumer1 = pulsarClient.newConsumer().topic(topicName).subscriptionName("mysub11").subscribeAsync();
CompletableFuture<Consumer<byte[]>> consumer2 = pulsarClient.newConsumer().topic(topicName).subscriptionName("mysub22").subscribeAsync();
CompletableFuture<Consumer<byte[]>> consumer3 = pulsarClient.newConsumer().topic(topicName).subscriptionName("mysub33").subscribeAsync();

consumer1.get().close();
consumer2.get().close();
consumer3.get().close();
fail("It should fail as throttling should only receive 2 requests");
} catch (Exception e) {
if (!(e.getCause() instanceof
org.apache.pulsar.client.api.PulsarClientException.TooManyRequestsException)) {
fail("Subscribe should fail with TooManyRequestsException");
try {
f1.get();
f2.get();
f3.get();
fail("At least one should fail");
} catch (ExecutionException e) {
Throwable rootCause = e;
while (rootCause instanceof ExecutionException) {
rootCause = rootCause.getCause();
}
if (!(rootCause instanceof
org.apache.pulsar.client.api.PulsarClientException.TooManyRequestsException)) {
throw e;
}
}

}
}

Expand Down
Loading