diff --git a/conf/broker.conf b/conf/broker.conf index 9a00ba1137076..a4273685cd99d 100644 --- a/conf/broker.conf +++ b/conf/broker.conf @@ -102,6 +102,9 @@ tlsAllowInsecureConnection=false # Using a value of 0, is disabling unackeMessage limit check and consumer can receive messages without any restriction maxUnackedMessagesPerConsumer=50000 +# Max number of concurrent lookup request broker allows to throttle heavy incoming lookup traffic +maxConcurrentLookupRequest=10000 + ### --- Authentication --- ### # Enable authentication diff --git a/pulsar-broker-common/src/main/java/com/yahoo/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/com/yahoo/pulsar/broker/ServiceConfiguration.java index 87553b3fe13a8..980031fa746b7 100644 --- a/pulsar-broker-common/src/main/java/com/yahoo/pulsar/broker/ServiceConfiguration.java +++ b/pulsar-broker-common/src/main/java/com/yahoo/pulsar/broker/ServiceConfiguration.java @@ -88,6 +88,9 @@ public class ServiceConfiguration implements PulsarConfiguration{ // messages to consumer once, this limit reaches until consumer starts acknowledging messages back // Using a value of 0, is disabling unackedMessage-limit check and consumer can receive messages without any restriction private int maxUnackedMessagesPerConsumer = 50000; + // Max number of concurrent lookup request broker allows to throttle heavy incoming lookup traffic + @FieldContext(dynamic = true) + private int maxConcurrentLookupRequest = 10000; /***** --- TLS --- ****/ // Enable TLS @@ -415,6 +418,14 @@ public void setMaxUnackedMessagesPerConsumer(int maxUnackedMessagesPerConsumer) this.maxUnackedMessagesPerConsumer = maxUnackedMessagesPerConsumer; } + public int getMaxConcurrentLookupRequest() { + return maxConcurrentLookupRequest; + } + + public void setMaxConcurrentLookupRequest(int maxConcurrentLookupRequest) { + this.maxConcurrentLookupRequest = maxConcurrentLookupRequest; + } + public boolean isTlsEnabled() { return tlsEnabled; } diff --git a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/lookup/DestinationLookup.java b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/lookup/DestinationLookup.java index 019da964b88e3..1c1b3653b7149 100644 --- a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/lookup/DestinationLookup.java +++ b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/lookup/DestinationLookup.java @@ -67,6 +67,12 @@ public void lookupDestinationAsync(@PathParam("property") String property, @Path @Suspended AsyncResponse asyncResponse) { dest = Codec.decode(dest); DestinationName topic = DestinationName.get("persistent", property, cluster, namespace, dest); + + if (!pulsar().getBrokerService().getLookupRequestSemaphore().tryAcquire()) { + log.warn("No broker was found available for topic {}", topic); + asyncResponse.resume(new WebApplicationException(Response.Status.SERVICE_UNAVAILABLE)); + return; + } try { validateClusterOwnership(topic.getCluster()); @@ -75,12 +81,12 @@ public void lookupDestinationAsync(@PathParam("property") String property, @Path } catch (WebApplicationException we) { // Validation checks failed log.error("Validation check failed: {}", we.getMessage()); - asyncResponse.resume(we); + completeLookupResponseExceptionally(asyncResponse, we); return; } catch (Throwable t) { // Validation checks failed with unknown error log.error("Validation check failed: {}", t.getMessage(), t); - asyncResponse.resume(new RestException(t)); + completeLookupResponseExceptionally(asyncResponse, new RestException(t)); return; } @@ -90,7 +96,7 @@ public void lookupDestinationAsync(@PathParam("property") String property, @Path lookupFuture.thenAccept(result -> { if (result == null) { log.warn("No broker was found available for topic {}", topic); - asyncResponse.resume(new WebApplicationException(Response.Status.SERVICE_UNAVAILABLE)); + completeLookupResponseExceptionally(asyncResponse, new WebApplicationException(Response.Status.SERVICE_UNAVAILABLE)); return; } @@ -105,24 +111,24 @@ public void lookupDestinationAsync(@PathParam("property") String property, @Path topic.getLookupName(), newAuthoritative)); } catch (URISyntaxException e) { log.error("Error in preparing redirect url for {}: {}", topic, e.getMessage(), e); - asyncResponse.resume(e); + completeLookupResponseExceptionally(asyncResponse, e); return; } if (log.isDebugEnabled()) { log.debug("Redirect lookup for topic {} to {}", topic, redirect); } - asyncResponse.resume(new WebApplicationException(Response.temporaryRedirect(redirect).build())); + completeLookupResponseExceptionally(asyncResponse, new WebApplicationException(Response.temporaryRedirect(redirect).build())); } else { // Found broker owning the topic if (log.isDebugEnabled()) { log.debug("Lookup succeeded for topic {} -- broker: {}", topic, result.getLookupData()); } - asyncResponse.resume(result.getLookupData()); + completeLookupResponseSuccessfully(asyncResponse, result.getLookupData()); } }).exceptionally(exception -> { log.warn("Failed to lookup broker for topic {}: {}", topic, exception.getMessage(), exception); - asyncResponse.resume(exception); + completeLookupResponseExceptionally(asyncResponse, exception); return null; }); @@ -236,6 +242,16 @@ public static CompletableFuture lookupDestinationAsync(PulsarService pu return lookupfuture; } + + private void completeLookupResponseExceptionally(AsyncResponse asyncResponse, Throwable t) { + pulsar().getBrokerService().getLookupRequestSemaphore().release(); + asyncResponse.resume(t); + } + + private void completeLookupResponseSuccessfully(AsyncResponse asyncResponse, LookupData lookupData) { + pulsar().getBrokerService().getLookupRequestSemaphore().release(); + asyncResponse.resume(lookupData); + } private static final Logger log = LoggerFactory.getLogger(DestinationLookup.class); } diff --git a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/BrokerService.java index aa10c9331aead..cfd9bc658eb0d 100644 --- a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/BrokerService.java @@ -34,7 +34,9 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; import org.apache.bookkeeper.client.BookKeeper.DigestType; @@ -128,6 +130,8 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener lookupRequestSemaphore; private final ScheduledExecutorService inactivityMonitor; private final ScheduledExecutorService messageExpiryMonitor; @@ -206,7 +210,10 @@ public Map deserialize(String key, byte[] content) throws Except return ObjectMapperFactory.getThreadLocal().readValue(content, HashMap.class); } }; + // update dynamic configuration and register-listener updateConfigurationAndRegisterListeners(); + this.lookupRequestSemaphore = new AtomicReference( + new Semaphore(pulsar.getConfiguration().getMaxConcurrentLookupRequest(), true)); PersistentReplicator.setReplicatorQueueSize(pulsar.getConfiguration().getReplicationProducerQueueSize()); } @@ -619,6 +626,10 @@ public List getDestinationMetrics() { public Map getBundleStats() { return pulsarStats.getBundleStats(); } + + public Semaphore getLookupRequestSemaphore() { + return lookupRequestSemaphore.get(); + } public void checkGC(int gcIntervalInSeconds) { topics.forEach((n, t) -> { @@ -841,7 +852,7 @@ public Map getTopicStats() { public AuthenticationService getAuthenticationService() { return authenticationService; } - + public List getAllTopicsFromNamespaceBundle(String namespace, String bundle) { return multiLayerTopicsMap.get(namespace).get(bundle).values(); } @@ -857,7 +868,10 @@ public ZooKeeperDataCache> getDynamicConfigurationCache() { private void updateConfigurationAndRegisterListeners() { // update ServiceConfiguration value by reading zk-configuration-map updateDynamicServiceConfiguration(); - //add more listeners here + // add listener on "maxConcurrentLookupRequest" value change + registerConfigurationListener("maxConcurrentLookupRequest", + (pendingLookupRequest) -> lookupRequestSemaphore.set(new Semaphore((int) pendingLookupRequest, true))); + // add more listeners here } /** diff --git a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/BrokerServiceException.java b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/BrokerServiceException.java index 4e4653a2934ae..c426aaa50bb3f 100644 --- a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/BrokerServiceException.java +++ b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/BrokerServiceException.java @@ -124,4 +124,4 @@ public static PulsarApi.ServerError getClientErrorCode(Throwable t) { return PulsarApi.ServerError.UnknownError; } } -} +} \ No newline at end of file diff --git a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/ServerCnx.java index ee8a66fbac439..544f643940c17 100644 --- a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/ServerCnx.java +++ b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/ServerCnx.java @@ -159,15 +159,28 @@ protected void handleLookup(CommandLookupTopic lookup) { } final long requestId = lookup.getRequestId(); final String topic = lookup.getTopic(); - lookupDestinationAsync(getBrokerService().pulsar(), DestinationName.get(topic), lookup.getAuthoritative(), - getRole(), lookup.getRequestId()).thenAccept(lookupResponse -> { - ctx.writeAndFlush(lookupResponse); - }).exceptionally(ex -> { - // it should never happen - log.warn("[{}] lookup failed with error {}, {}", remoteAddress, topic, ex.getMessage(), ex); - ctx.writeAndFlush(newLookupResponse(ServerError.ServiceNotReady, ex.getMessage(), requestId)); - return null; - }); + if (service.getLookupRequestSemaphore().tryAcquire()) { + lookupDestinationAsync(getBrokerService().pulsar(), DestinationName.get(topic), lookup.getAuthoritative(), + getRole(), lookup.getRequestId()).handle((lookupResponse, ex) -> { + if (ex == null) { + ctx.writeAndFlush(lookupResponse); + } else { + // it should never happen + log.warn("[{}] lookup failed with error {}, {}", remoteAddress, topic, ex.getMessage(), ex); + ctx.writeAndFlush( + newLookupResponse(ServerError.ServiceNotReady, ex.getMessage(), requestId)); + } + service.getLookupRequestSemaphore().release(); + return null; + }); + } else { + if (log.isDebugEnabled()) { + log.debug("[{}] Failed lookup due to too many lookup-requets {}", remoteAddress, topic); + } + ctx.writeAndFlush(newLookupResponse(ServerError.TooManyRequests, + "Failed due to too many pending lookup requests", requestId)); + } + } @Override @@ -177,24 +190,36 @@ protected void handlePartitionMetadataRequest(CommandPartitionedTopicMetadata pa } final long requestId = partitionMetadata.getRequestId(); final String topic = partitionMetadata.getTopic(); - getPartitionedTopicMetadata(getBrokerService().pulsar(), getRole(), DestinationName.get(topic)) - .thenAccept(metadata -> { - int partitions = metadata.partitions; - ctx.writeAndFlush(Commands.newPartitionMetadataResponse(partitions, requestId)); - }).exceptionally(ex -> { - if (ex instanceof PulsarClientException) { - log.warn("Failed to authorize {} at [{}] on topic {} : {}", getRole(), remoteAddress, topic, - ex.getMessage()); - ctx.writeAndFlush(Commands.newPartitionMetadataResponse(ServerError.AuthorizationError, - ex.getMessage(), requestId)); - } else { - log.warn("Failed to get Partitioned Metadata [{}] {}: {}", remoteAddress, topic, - ex.getMessage(), ex); - ctx.writeAndFlush(Commands.newPartitionMetadataResponse(ServerError.ServiceNotReady, - ex.getMessage(), requestId)); - } - return null; - }); + if (service.getLookupRequestSemaphore().tryAcquire()) { + getPartitionedTopicMetadata(getBrokerService().pulsar(), getRole(), DestinationName.get(topic)) + .handle((metadata, ex) -> { + if (ex == null) { + int partitions = metadata.partitions; + ctx.writeAndFlush(Commands.newPartitionMetadataResponse(partitions, requestId)); + } else { + if (ex instanceof PulsarClientException) { + log.warn("Failed to authorize {} at [{}] on topic {} : {}", getRole(), remoteAddress, + topic, ex.getMessage()); + ctx.writeAndFlush(Commands.newPartitionMetadataResponse(ServerError.AuthorizationError, + ex.getMessage(), requestId)); + } else { + log.warn("Failed to get Partitioned Metadata [{}] {}: {}", remoteAddress, topic, + ex.getMessage(), ex); + ctx.writeAndFlush(Commands.newPartitionMetadataResponse(ServerError.ServiceNotReady, + ex.getMessage(), requestId)); + } + } + service.getLookupRequestSemaphore().release(); + return null; + }); + } else { + if (log.isDebugEnabled()) { + log.debug("[{}] Failed Partition-Metadata lookup due to too many lookup-requets {}", remoteAddress, + topic); + } + ctx.writeAndFlush(newLookupResponse(ServerError.TooManyRequests, + "Failed due to too many pending lookup requests", requestId)); + } } @Override @@ -543,7 +568,6 @@ protected void handleProducer(final CommandProducer cmdProducer) { }); } - @Override protected void handleSend(CommandSend send, ByteBuf headersAndPayload) { checkArgument(state == State.Connected); diff --git a/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/lookup/http/HttpDestinationLookupv2Test.java b/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/lookup/http/HttpDestinationLookupv2Test.java index 0b10a86f5bbbf..cc980976ac67b 100644 --- a/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/lookup/http/HttpDestinationLookupv2Test.java +++ b/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/lookup/http/HttpDestinationLookupv2Test.java @@ -27,6 +27,8 @@ import java.util.Set; import java.util.TreeSet; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Semaphore; +import java.util.concurrent.atomic.AtomicReference; import javax.ws.rs.WebApplicationException; import javax.ws.rs.container.AsyncResponse; @@ -108,6 +110,7 @@ public void setUp() throws Exception { BrokerService brokerService = mock(BrokerService.class); doReturn(brokerService).when(pulsar).getBrokerService(); doReturn(auth).when(brokerService).getAuthorizationManager(); + doReturn(new Semaphore(1000)).when(brokerService).getLookupRequestSemaphore(); } @Test @@ -134,6 +137,35 @@ public void crossColoLookup() throws Exception { WebApplicationException wae = (WebApplicationException) arg.getValue(); assertEquals(wae.getResponse().getStatus(), Status.TEMPORARY_REDIRECT.getStatusCode()); } + + + @Test + public void testNotEnoughLookupPermits() throws Exception { + + BrokerService brokerService = pulsar.getBrokerService(); + doReturn(new Semaphore(0)).when(brokerService).getLookupRequestSemaphore(); + + DestinationLookup destLookup = spy(new DestinationLookup()); + doReturn(false).when(destLookup).isRequestHttps(); + destLookup.setPulsar(pulsar); + doReturn("null").when(destLookup).clientAppId(); + Field uriField = PulsarWebResource.class.getDeclaredField("uri"); + uriField.setAccessible(true); + UriInfo uriInfo = mock(UriInfo.class); + uriField.set(destLookup, uriInfo); + URI uri = URI.create("http://localhost:8080/lookup/v2/destination/topic/myprop/usc/ns2/topic1"); + doReturn(uri).when(uriInfo).getRequestUri(); + doReturn(true).when(config).isAuthorizationEnabled(); + + AsyncResponse asyncResponse1 = mock(AsyncResponse.class); + destLookup.lookupDestinationAsync("myprop", "usc", "ns2", "topic1", false, asyncResponse1); + + ArgumentCaptor arg = ArgumentCaptor.forClass(Throwable.class); + verify(asyncResponse1).resume(arg.capture()); + assertEquals(arg.getValue().getClass(), WebApplicationException.class); + WebApplicationException wae = (WebApplicationException) arg.getValue(); + assertEquals(wae.getResponse().getStatus(), Status.SERVICE_UNAVAILABLE.getStatusCode()); + } @Test public void testValidateReplicationSettingsOnNamespace() throws Exception { diff --git a/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/service/BrokerServiceThrottlingTest.java b/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/service/BrokerServiceThrottlingTest.java new file mode 100644 index 0000000000000..170046cc4bc8a --- /dev/null +++ b/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/service/BrokerServiceThrottlingTest.java @@ -0,0 +1,287 @@ +/** + * Copyright 2016 Yahoo Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.yahoo.pulsar.broker.service; + +import static com.yahoo.pulsar.broker.service.BrokerService.BROKER_SERVICE_CONFIGURATION_PATH; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotEquals; +import static org.testng.Assert.fail; + +import java.lang.reflect.Method; +import java.net.URI; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +import org.apache.bookkeeper.util.ZkUtils; +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.ZooDefs; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.yahoo.pulsar.client.api.Consumer; +import com.yahoo.pulsar.client.api.ConsumerConfiguration; +import com.yahoo.pulsar.client.api.PulsarClient; +import com.yahoo.pulsar.client.api.PulsarClientException; +import com.yahoo.pulsar.client.api.SubscriptionType; +import com.yahoo.pulsar.client.impl.ConsumerImpl; +import com.yahoo.pulsar.common.util.ObjectMapperFactory; + +/** + */ +public class BrokerServiceThrottlingTest extends BrokerTestBase { + + @BeforeMethod + @Override + protected void setup() throws Exception { + super.baseSetup(); + } + + @AfterMethod + @Override + protected void cleanup() throws Exception { + super.internalCleanup(); + } + + /** + * Verifies: updating zk-thottling node reflects broker-maxConcurrentLookupRequest and updates semaphore. + * + * @throws Exception + */ + @Test + public void testThrottlingLookupRequestSemaphore() throws Exception { + // create configuration znode + ZkUtils.createFullPathOptimistic(mockZookKeeper, BROKER_SERVICE_CONFIGURATION_PATH, "{}".getBytes(), + ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); + // Now, znode is created: set the watch and listener on the znode + setWatchOnThrottlingZnode(); + + BrokerService service = pulsar.getBrokerService(); + assertNotEquals(service.lookupRequestSemaphore.get().availablePermits(), 0); + admin.brokers().updateDynamicConfiguration("maxConcurrentLookupRequest", Integer.toString(0)); + Thread.sleep(1000); + assertEquals(service.lookupRequestSemaphore.get().availablePermits(), 0); + } + + /** + * Broker has maxConcurrentLookupRequest = 0 so, it rejects incoming lookup request and it cause consumer creation + * failure. + * + * @throws Exception + */ + @Test + public void testLookupThrottlingForClientByBroker0Permit() throws Exception { + + // create configuration znode + ZkUtils.createFullPathOptimistic(mockZookKeeper, BROKER_SERVICE_CONFIGURATION_PATH, "{}".getBytes(), + ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); + // Now, znode is created: set the watch and listener on the znode + setWatchOnThrottlingZnode(); + + final String topicName = "persistent://prop/usw/my-ns/newTopic"; + + com.yahoo.pulsar.client.api.ClientConfiguration clientConf = new com.yahoo.pulsar.client.api.ClientConfiguration(); + clientConf.setStatsInterval(0, TimeUnit.SECONDS); + String lookupUrl = new URI("pulsar://localhost:" + BROKER_PORT).toString(); + PulsarClient pulsarClient = PulsarClient.create(lookupUrl, clientConf); + + ConsumerConfiguration consumerConfig = new ConsumerConfiguration(); + Consumer consumer = pulsarClient.subscribe(topicName, "mysub", consumerConfig); + consumer.close(); + + int newPermits = 0; + admin.brokers().updateDynamicConfiguration("maxConcurrentLookupRequest", Integer.toString(newPermits)); + // wait config to be updated + for (int i = 0; i < 5; i++) { + if (pulsar.getConfiguration().getMaxConcurrentLookupRequest() != newPermits) { + Thread.sleep(100 + (i * 10)); + } else { + break; + } + } + + try { + consumer = pulsarClient.subscribe(topicName, "mysub", consumerConfig); + consumer.close(); + fail("It should fail as throttling should not receive any request"); + } catch (com.yahoo.pulsar.client.api.PulsarClientException.TooManyLookupRequestException e) { + // ok as throttling set to 0 + } + } + + /** + * Verifies: Broker side throttling: + * + *
+     * 1. concurrent_consumer_creation > maxConcurrentLookupRequest at broker 
+     * 2. few of the consumer creation must fail with TooManyLookupRequestException.
+     * 
+ * + * @throws Exception + */ + @Test + public void testLookupThrottlingForClientByBroker() throws Exception { + + // create configuration znode + ZkUtils.createFullPathOptimistic(mockZookKeeper, BROKER_SERVICE_CONFIGURATION_PATH, "{}".getBytes(), + ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); + // Now, znode is created: set the watch and listener on the znode + setWatchOnThrottlingZnode(); + + final String topicName = "persistent://prop/usw/my-ns/newTopic"; + + com.yahoo.pulsar.client.api.ClientConfiguration clientConf = new com.yahoo.pulsar.client.api.ClientConfiguration(); + clientConf.setStatsInterval(0, TimeUnit.SECONDS); + clientConf.setIoThreads(20); + clientConf.setConnectionsPerBroker(20); + String lookupUrl = new URI("pulsar://localhost:" + BROKER_PORT).toString(); + PulsarClient pulsarClient = PulsarClient.create(lookupUrl, clientConf); + + ConsumerConfiguration consumerConfig = new ConsumerConfiguration(); + consumerConfig.setSubscriptionType(SubscriptionType.Shared); + + int newPermits = 1; + admin.brokers().updateDynamicConfiguration("maxConcurrentLookupRequest", Integer.toString(newPermits)); + // wait config to be updated + for (int i = 0; i < 5; i++) { + if (pulsar.getConfiguration().getMaxConcurrentLookupRequest() != newPermits) { + Thread.sleep(100 + (i * 10)); + } else { + break; + } + } + + List successfulConsumers = Lists.newArrayList(); + ExecutorService executor = Executors.newFixedThreadPool(10); + final int totalConsumers = 20; + CountDownLatch latch = new CountDownLatch(totalConsumers); + for (int i = 0; i < totalConsumers; i++) { + executor.execute(() -> { + try { + successfulConsumers.add(pulsarClient.subscribe(topicName, "mysub", consumerConfig)); + } catch (PulsarClientException.TooManyLookupRequestException e) { + // ok + } catch (Exception e) { + fail("it shouldn't failed"); + } + latch.countDown(); + }); + } + latch.await(); + + for (int i = 0; i < successfulConsumers.size(); i++) { + successfulConsumers.get(i).close(); + } + pulsarClient.close(); + assertNotEquals(successfulConsumers.size(), totalConsumers); + } + + /** + * This testcase make sure that once consumer lost connection with broker, it always reconnects with broker by + * retrying on throttling-error exception also. + * + *
+     * 1. all consumers get connected 
+     * 2. broker restarts with maxConcurrentLookupRequest = 1 
+     * 3. consumers reconnect and some get TooManyRequestException and again retries
+     * 4. eventually all consumers will successfully connect to broker
+     * 
+ * + * @throws Exception + */ + @Test + public void testLookupThrottlingForClientByBrokerInternalRetry() throws Exception { + + // create configuration znode + ZkUtils.createFullPathOptimistic(mockZookKeeper, BROKER_SERVICE_CONFIGURATION_PATH, "{}".getBytes(), + ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); + // Now, znode is created: set the watch and listener on the znode + setWatchOnThrottlingZnode(); + + final String topicName = "persistent://prop/usw/my-ns/newTopic"; + + com.yahoo.pulsar.client.api.ClientConfiguration clientConf = new com.yahoo.pulsar.client.api.ClientConfiguration(); + clientConf.setStatsInterval(0, TimeUnit.SECONDS); + clientConf.setIoThreads(20); + clientConf.setConnectionsPerBroker(20); + String lookupUrl = new URI("pulsar://localhost:" + BROKER_PORT).toString(); + PulsarClient pulsarClient = PulsarClient.create(lookupUrl, clientConf); + upsertLookupPermits(100); + ConsumerConfiguration consumerConfig = new ConsumerConfiguration(); + consumerConfig.setSubscriptionType(SubscriptionType.Shared); + List consumers = Lists.newArrayList(); + ExecutorService executor = Executors.newFixedThreadPool(10); + final int totalConsumers = 8; + CountDownLatch latch = new CountDownLatch(totalConsumers); + for (int i = 0; i < totalConsumers; i++) { + executor.execute(() -> { + try { + consumers.add(pulsarClient.subscribe(topicName, "mysub", consumerConfig)); + } catch (PulsarClientException.TooManyLookupRequestException e) { + // ok + } catch (Exception e) { + fail("it shouldn't failed"); + } + latch.countDown(); + }); + } + latch.await(); + + stopBroker(); + conf.setMaxConcurrentLookupRequest(1); + startBroker(); + // wait for consumer to reconnect + Thread.sleep(3000); + + int totalConnectedConsumers = 0; + for (int i = 0; i < consumers.size(); i++) { + if (((ConsumerImpl) consumers.get(i)).isConnected()) { + totalConnectedConsumers++; + } + consumers.get(i).close(); + + } + assertEquals(totalConnectedConsumers, totalConsumers); + + pulsarClient.close(); + } + + private void upsertLookupPermits(int permits) throws Exception { + Map throttlingMap = Maps.newHashMap(); + throttlingMap.put("maxConcurrentLookupRequest", Integer.toString(permits)); + byte[] content = ObjectMapperFactory.getThreadLocal().writeValueAsBytes(throttlingMap); + if (mockZookKeeper.exists(BROKER_SERVICE_CONFIGURATION_PATH, false) != null) { + mockZookKeeper.setData(BROKER_SERVICE_CONFIGURATION_PATH, content, -1); + } else { + ZkUtils.createFullPathOptimistic(mockZookKeeper, BROKER_SERVICE_CONFIGURATION_PATH, content, + ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); + } + } + + private void setWatchOnThrottlingZnode() throws Exception { + Method updateConfigListenerMethod = BrokerService.class + .getDeclaredMethod("updateConfigurationAndRegisterListeners"); + updateConfigListenerMethod.setAccessible(true); + updateConfigListenerMethod.invoke(pulsar.getBrokerService()); + } + +} \ No newline at end of file diff --git a/pulsar-client/src/main/java/com/yahoo/pulsar/client/impl/ClientCnx.java b/pulsar-client/src/main/java/com/yahoo/pulsar/client/impl/ClientCnx.java index 0953161c9b044..51ac4e08eac25 100644 --- a/pulsar-client/src/main/java/com/yahoo/pulsar/client/impl/ClientCnx.java +++ b/pulsar-client/src/main/java/com/yahoo/pulsar/client/impl/ClientCnx.java @@ -413,6 +413,8 @@ private PulsarClientException getPulsarClientException(ServerError error, String return new PulsarClientException.BrokerPersistenceException(errorMsg); case ServiceNotReady: return new PulsarClientException.LookupException(errorMsg); + case TooManyRequests: + return new PulsarClientException.TooManyLookupRequestException(errorMsg); case ProducerBlockedQuotaExceededError: return new PulsarClientException.ProducerBlockedQuotaExceededError(errorMsg); case ProducerBlockedQuotaExceededException: diff --git a/pulsar-common/src/main/java/com/yahoo/pulsar/common/api/proto/PulsarApi.java b/pulsar-common/src/main/java/com/yahoo/pulsar/common/api/proto/PulsarApi.java index a26e4a4fa12a1..09b468819b8a4 100644 --- a/pulsar-common/src/main/java/com/yahoo/pulsar/common/api/proto/PulsarApi.java +++ b/pulsar-common/src/main/java/com/yahoo/pulsar/common/api/proto/PulsarApi.java @@ -68,6 +68,7 @@ public enum ServerError TopicNotFound(11, 11), SubscriptionNotFound(12, 12), ConsumerNotFound(13, 13), + TooManyRequests(14, 14), ; public static final int UnknownError_VALUE = 0; @@ -84,6 +85,7 @@ public enum ServerError public static final int TopicNotFound_VALUE = 11; public static final int SubscriptionNotFound_VALUE = 12; public static final int ConsumerNotFound_VALUE = 13; + public static final int TooManyRequests_VALUE = 14; public final int getNumber() { return value; } @@ -104,6 +106,7 @@ public static ServerError valueOf(int value) { case 11: return TopicNotFound; case 12: return SubscriptionNotFound; case 13: return ConsumerNotFound; + case 14: return TooManyRequests; default: return null; } } diff --git a/pulsar-common/src/main/proto/PulsarApi.proto b/pulsar-common/src/main/proto/PulsarApi.proto index 07da8a7878fc9..5d255126a198b 100644 --- a/pulsar-common/src/main/proto/PulsarApi.proto +++ b/pulsar-common/src/main/proto/PulsarApi.proto @@ -81,6 +81,7 @@ enum ServerError { TopicNotFound = 11; // Topic not found SubscriptionNotFound = 12; // Subscription not found ConsumerNotFound = 13; // Consumer not found + TooManyRequests = 14; // Error with too many simultaneously request } enum AuthMethod {