Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PIP-91: Separate lookup timeout from operation timeout #11627

Merged
merged 6 commits into from
Aug 17, 2021
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
import static org.apache.pulsar.broker.resourcegroup.ResourceUsageTransportManager.DISABLE_RESOURCE_USAGE_TRANSPORT_MANAGER;
import static org.apache.pulsar.transaction.coordinator.impl.MLTransactionLogImpl.TRANSACTION_LOG_PREFIX;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
Expand Down Expand Up @@ -241,7 +242,7 @@ public class PulsarService implements AutoCloseable {
private ProtocolHandlers protocolHandlers = null;

private final ShutdownService shutdownService;
private final EventLoopGroup ioEventLoopGroup;
protected final EventLoopGroup ioEventLoopGroup;

private MetricsGenerator metricsGenerator;

Expand Down Expand Up @@ -658,7 +659,7 @@ public void start() throws PulsarServerException {
config, localMetadataStore, getZkClient(), bkClientFactory, ioEventLoopGroup
);

this.brokerService = new BrokerService(this, ioEventLoopGroup);
this.brokerService = newBrokerService(this);

// Start load management service (even if load balancing is disabled)
this.loadManager.set(LoadManager.create(this));
Expand Down Expand Up @@ -1678,4 +1679,8 @@ private static boolean isTransactionSystemTopic(TopicName topicName) {
|| topic.endsWith(MLPendingAckStore.PENDING_ACK_STORE_SUFFIX);
}

@VisibleForTesting
protected BrokerService newBrokerService(PulsarService pulsar) throws Exception {
return new BrokerService(pulsar, ioEventLoopGroup);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
import static org.apache.pulsar.common.events.EventsTopicNames.checkTopicIsEventsNames;
import com.fasterxml.jackson.core.type.TypeReference;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Queues;
Expand Down Expand Up @@ -254,6 +255,7 @@ public class BrokerService implements Closeable {
@Getter
private final BundlesQuotas bundlesQuotas;

private PulsarChannelInitializer.Factory pulsarChannelInitFactory = PulsarChannelInitializer.DEFAULT_FACTORY;
private Channel listenChannel;
private Channel listenChannelTls;

Expand Down Expand Up @@ -410,7 +412,8 @@ public void start() throws Exception {

ServiceConfiguration serviceConfig = pulsar.getConfiguration();

bootstrap.childHandler(new PulsarChannelInitializer(pulsar, false));
bootstrap.childHandler(
pulsarChannelInitFactory.newPulsarChannelInitializer(pulsar, false));

Optional<Integer> port = serviceConfig.getBrokerServicePort();
if (port.isPresent()) {
Expand All @@ -427,7 +430,8 @@ public void start() throws Exception {
Optional<Integer> tlsPort = serviceConfig.getBrokerServicePortTls();
if (tlsPort.isPresent()) {
ServerBootstrap tlsBootstrap = bootstrap.clone();
tlsBootstrap.childHandler(new PulsarChannelInitializer(pulsar, true));
tlsBootstrap.childHandler(
pulsarChannelInitFactory.newPulsarChannelInitializer(pulsar, true));
try {
listenChannelTls = tlsBootstrap.bind(new InetSocketAddress(
pulsar.getBindAddress(), tlsPort.get())).sync()
Expand Down Expand Up @@ -2649,4 +2653,9 @@ public void resumedConnections(int numberOfConnections) {
public long getPausedConnections() {
return pausedConnections.longValue();
}

@VisibleForTesting
public void setPulsarChannelInitializerFactory(PulsarChannelInitializer.Factory factory) {
this.pulsarChannelInitFactory = factory;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import static org.apache.bookkeeper.util.SafeRunnable.safeRun;
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.google.common.annotations.VisibleForTesting;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
Expand Down Expand Up @@ -129,7 +130,7 @@ protected void initChannel(SocketChannel ch) throws Exception {
// ServerCnx ends up reading higher number of messages and broker can not throttle the messages by disabling
// auto-read.
ch.pipeline().addLast("flowController", new FlowControlHandler());
ServerCnx cnx = new ServerCnx(pulsar);
ServerCnx cnx = newServerCnx(pulsar);
ch.pipeline().addLast("handler", cnx);

connections.put(ch.remoteAddress(), cnx);
Expand All @@ -144,4 +145,16 @@ private void refreshAuthenticationCredentials() {
}
});
}

@VisibleForTesting
protected ServerCnx newServerCnx(PulsarService pulsar) throws Exception {
return new ServerCnx(pulsar);
}

public interface Factory {
PulsarChannelInitializer newPulsarChannelInitializer(PulsarService pulsar, boolean enableTLS) throws Exception;
}

public static final Factory DEFAULT_FACTORY =
(pulsar, tls) -> new PulsarChannelInitializer(pulsar, tls);
}
Original file line number Diff line number Diff line change
Expand Up @@ -280,13 +280,12 @@ protected void customizeNewPulsarAdminBuilder(PulsarAdminBuilder pulsarAdminBuil
}

protected PulsarService startBroker(ServiceConfiguration conf) throws Exception {

return startBrokerWithoutAuthorization(conf);
}

protected PulsarService startBrokerWithoutAuthorization(ServiceConfiguration conf) throws Exception {
conf.setBrokerShutdownTimeoutMs(0L);
PulsarService pulsar = spy(new PulsarService(conf));
PulsarService pulsar = spy(newPulsarService(conf));
setupBrokerMocks(pulsar);
beforePulsarStartMocks(pulsar);
pulsar.start();
Expand All @@ -295,6 +294,10 @@ protected PulsarService startBrokerWithoutAuthorization(ServiceConfiguration con
return pulsar;
}

protected PulsarService newPulsarService(ServiceConfiguration conf) throws Exception {
return new PulsarService(conf);
}

protected void setupBrokerMocks(PulsarService pulsar) throws Exception {
// Override default providers with mocked ones
doReturn(mockZooKeeperClientFactory).when(pulsar).getZooKeeperClientFactory();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -527,74 +527,6 @@ public void testResetCursor(SubscriptionType subType) throws Exception {
restartBroker();
}

/**
* <pre>
* Verifies: that client-cnx gets closed when server gives TooManyRequestException in certain time frame
* 1. Client1: which has set MaxNumberOfRejectedRequestPerConnection=0
* 2. Client2: which has set MaxNumberOfRejectedRequestPerConnection=100
* 3. create multiple producer and make lookup-requests simultaneously
* 4. Client1 receives TooManyLookupException and should close connection
* </pre>
*
* @throws Exception
*/
@Test
public void testCloseConnectionOnBrokerRejectedRequest() throws Exception {

final String topicName = "persistent://prop/usw/my-ns/newTopic";
final int maxConccurentLookupRequest = pulsar.getConfiguration().getMaxConcurrentLookupRequest();
final int concurrentLookupRequests = 20;
@Cleanup("shutdownNow")
ExecutorService executor = Executors.newFixedThreadPool(concurrentLookupRequests);
try {
stopBroker();
conf.setMaxConcurrentLookupRequest(1);
startBroker();
String lookupUrl = pulsar.getBrokerServiceUrl();

@Cleanup
PulsarClient pulsarClient = PulsarClient.builder().serviceUrl(lookupUrl).statsInterval(0, TimeUnit.SECONDS)
.maxNumberOfRejectedRequestPerConnection(0).build();

@Cleanup
PulsarClient pulsarClient2 = PulsarClient.builder().serviceUrl(lookupUrl).statsInterval(0, TimeUnit.SECONDS)
.ioThreads(concurrentLookupRequests).connectionsPerBroker(20).build();

ProducerImpl<byte[]> producer = (ProducerImpl<byte[]>) pulsarClient.newProducer().topic(topicName).create();
ClientCnx cnx = producer.cnx();
assertTrue(cnx.channel().isActive());

final int totalProducer = 100;
CountDownLatch latch = new CountDownLatch(totalProducer * 2);
AtomicInteger failed = new AtomicInteger(0);
for (int i = 0; i < totalProducer; i++) {
executor.submit(() -> {
pulsarClient2.newProducer().topic(topicName).createAsync().handle((ok, e) -> {
if (e != null) {
failed.set(1);
}
latch.countDown();
return null;
});
pulsarClient.newProducer().topic(topicName).createAsync().handle((ok, e) -> {
if (e != null) {
failed.set(1);
}
latch.countDown();
return null;
});
});

}

latch.await(10, TimeUnit.SECONDS);
// connection must be closed
assertEquals(failed.get(), 1);
} finally {
conf.setMaxConcurrentLookupRequest(maxConccurentLookupRequest);
}
}

/**
* It verifies that broker throttles down configured concurrent topic loading requests
*
Expand Down Expand Up @@ -1021,4 +953,4 @@ public void testPooledMessageWithAckTimeout(boolean isBatchingEnabled) throws Ex
consumer.close();
producer.close();
}
}
}
Loading