From 690b267161c51a5bfd43b3793a6fdb5425540e73 Mon Sep 17 00:00:00 2001 From: rdhabalia Date: Mon, 30 Jan 2017 17:18:46 -0800 Subject: [PATCH 1/2] Add server-side lookup throttling --- .../java/com/yahoo/pulsar/broker/service/BrokerService.java | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/BrokerService.java index cfd9bc658eb0d..f6856c4ef9f41 100644 --- a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/BrokerService.java @@ -140,6 +140,9 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener>(pulsar().getLocalZkCache()) { @Override public Map deserialize(String key, byte[] content) throws Exception { @@ -853,6 +856,7 @@ public AuthenticationService getAuthenticationService() { return authenticationService; } + public List getAllTopicsFromNamespaceBundle(String namespace, String bundle) { return multiLayerTopicsMap.get(namespace).get(bundle).values(); } From 0fe349b241eb6d216c5fe414fe400e576a37429d Mon Sep 17 00:00:00 2001 From: rdhabalia Date: Fri, 3 Mar 2017 10:29:22 -0800 Subject: [PATCH 2/2] close client connection tooManyRequest and internal-server error --- .../pulsar/broker/admin/PersistentTopics.java | 6 +- .../pulsar/broker/service/BrokerService.java | 5 - .../pulsar/broker/web/PulsarWebResource.java | 30 ++-- .../broker/service/BrokerServiceTest.java | 3 +- .../impl/BrokerClientIntegrationTest.java | 134 ++++++++++++++++-- .../client/api/ClientConfiguration.java | 22 +++ .../client/impl/BinaryProtoLookupService.java | 3 +- .../yahoo/pulsar/client/impl/ClientCnx.java | 49 ++++++- 8 files changed, 208 insertions(+), 44 deletions(-) diff --git a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/admin/PersistentTopics.java b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/admin/PersistentTopics.java index 794cf9783cdb4..b0de3644fce75 100644 --- a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/admin/PersistentTopics.java +++ b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/admin/PersistentTopics.java @@ -1030,11 +1030,11 @@ public static CompletableFuture getPartitionedTopicMet clientAppId, dn.toString(), authException.getMessage())); } } catch (Exception ex) { - // unknown error marked as internal server error + // throw without wrapping to PulsarClientException that considers: unknown error marked as internal + // server error log.warn("Failed to authorize {} on cluster {} with unexpected exception {}", clientAppId, dn.toString(), ex.getMessage(), ex); - throw new PulsarClientException(String.format("Authorization failed %s on cluster %s with error %s", - clientAppId, dn.toString(), ex.getMessage())); + throw ex; } String path = path(PARTITIONED_TOPIC_PATH_ZNODE, dn.getProperty(), dn.getCluster(), dn.getNamespacePortion(), "persistent", dn.getEncodedLocalName()); diff --git a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/BrokerService.java index f6856c4ef9f41..05b4297da4599 100644 --- a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/BrokerService.java @@ -140,9 +140,6 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener>(pulsar().getLocalZkCache()) { @Override public Map deserialize(String key, byte[] content) throws Exception { @@ -855,7 +851,6 @@ public Map getTopicStats() { public AuthenticationService getAuthenticationService() { return authenticationService; } - public List getAllTopicsFromNamespaceBundle(String namespace, String bundle) { return multiLayerTopicsMap.get(namespace).get(bundle).values(); diff --git a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/web/PulsarWebResource.java b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/web/PulsarWebResource.java index 7f94c290eb94b..ac6b924ea7bd4 100644 --- a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/web/PulsarWebResource.java +++ b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/web/PulsarWebResource.java @@ -154,10 +154,17 @@ protected void validateSuperUserAccess() { * if not authorized */ protected void validateAdminAccessOnProperty(String property) { - validateAdminAccessOnProperty(pulsar(), clientAppId(), property); + try { + validateAdminAccessOnProperty(pulsar(), clientAppId(), property); + } catch (RestException e) { + throw e; + } catch (Exception e) { + log.error("Failed to get property admin data for property"); + throw new RestException(e); + } } - protected static void validateAdminAccessOnProperty(PulsarService pulsar, String clientAppId, String property) { + protected static void validateAdminAccessOnProperty(PulsarService pulsar, String clientAppId, String property) throws RestException, Exception{ if (pulsar.getConfiguration().isAuthenticationEnabled() && pulsar.getConfiguration().isAuthorizationEnabled()) { log.debug("check admin access on property: {} - Authenticated: {} -- role: {}", property, (isClientAuthenticated(clientAppId)), clientAppId); @@ -178,9 +185,6 @@ protected static void validateAdminAccessOnProperty(PulsarService pulsar, String } catch (KeeperException.NoNodeException e) { log.warn("Failed to get property admin data for non existing property {}", property); throw new RestException(Status.UNAUTHORIZED, "Property does not exist"); - } catch (Exception e) { - log.error("Failed to get property admin data for property"); - throw new RestException(e); } if (!propertyAdmin.getAdminRoles().contains(clientAppId)) { @@ -565,20 +569,16 @@ protected void checkConnect(DestinationName destination) throws RestException, E checkAuthorization(pulsar(), destination, clientAppId()); } - protected static void checkAuthorization(PulsarService pulsarService, DestinationName destination, String role) throws RestException, Exception{ + protected static void checkAuthorization(PulsarService pulsarService, DestinationName destination, String role) + throws RestException, Exception { if (!pulsarService.getConfiguration().isAuthorizationEnabled()) { // No enforcing of authorization policies return; } - try { - // get zk policy manager - if (!pulsarService.getBrokerService().getAuthorizationManager().canLookup(destination, role)) { - log.warn("[{}] Role {} is not allowed to lookup topic", destination, role); - throw new RestException(Status.UNAUTHORIZED, "Don't have permission to connect to this namespace"); - } - } catch (RestException e) { - // Let it through - throw e; + // get zk policy manager + if (!pulsarService.getBrokerService().getAuthorizationManager().canLookup(destination, role)) { + log.warn("[{}] Role {} is not allowed to lookup topic", destination, role); + throw new RestException(Status.UNAUTHORIZED, "Don't have permission to connect to this namespace"); } } diff --git a/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/service/BrokerServiceTest.java b/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/service/BrokerServiceTest.java index f9910b484deb1..32e2c2709d82c 100644 --- a/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/service/BrokerServiceTest.java +++ b/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/service/BrokerServiceTest.java @@ -728,5 +728,4 @@ public void testLookupThrottlingForClientByClient() throws Exception { // ok as throttling set to 0 } } - -} +} \ No newline at end of file diff --git a/pulsar-broker/src/test/java/com/yahoo/pulsar/client/impl/BrokerClientIntegrationTest.java b/pulsar-broker/src/test/java/com/yahoo/pulsar/client/impl/BrokerClientIntegrationTest.java index dae05f424f47e..6c6dba930c904 100644 --- a/pulsar-broker/src/test/java/com/yahoo/pulsar/client/impl/BrokerClientIntegrationTest.java +++ b/pulsar-broker/src/test/java/com/yahoo/pulsar/client/impl/BrokerClientIntegrationTest.java @@ -21,15 +21,20 @@ import static org.mockito.Mockito.never; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.verify; +import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertNull; import static org.testng.Assert.assertTrue; +import static org.testng.Assert.fail; import java.lang.reflect.Field; +import java.net.URI; import java.util.ArrayList; import java.util.List; import java.util.NavigableMap; import java.util.Set; import java.util.concurrent.ConcurrentSkipListMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import org.slf4j.Logger; @@ -43,6 +48,7 @@ import com.google.common.collect.Sets; import com.yahoo.pulsar.broker.namespace.OwnershipCache; import com.yahoo.pulsar.broker.service.Topic; +import com.yahoo.pulsar.client.api.ClientConfiguration; import com.yahoo.pulsar.client.api.Consumer; import com.yahoo.pulsar.client.api.ConsumerConfiguration; import com.yahoo.pulsar.client.api.Message; @@ -75,23 +81,22 @@ protected void setup() throws Exception { protected void cleanup() throws Exception { super.internalCleanup(); } - + @DataProvider public Object[][] subType() { - return new Object[][] {{SubscriptionType.Shared}, {SubscriptionType.Failover}}; + return new Object[][] { { SubscriptionType.Shared }, { SubscriptionType.Failover } }; } - /** * Verifies unload namespace-bundle doesn't close shared connection used by other namespace-bundle. - * + *
      * 1. after disabling broker fron loadbalancer
      * 2. unload namespace-bundle "my-ns1" which disconnects client (producer/consumer) connected on that namespacebundle
      * 3. but doesn't close the connection for namesapce-bundle "my-ns2" and clients are still connected
      * 4. verifies unloaded "my-ns1" should not connected again with the broker as broker is disabled
      * 5. unload "my-ns2" which closes the connection as broker doesn't have any more client connected on that connection
      * 6. all namespace-bundles are in "connecting" state and waiting for available broker
-     * 
+     * 
* * @throws Exception */ @@ -105,7 +110,8 @@ public void testDisconnectClientWithoutClosingConnection() throws Exception { final String dn1 = "persistent://" + ns1 + "/my-topic"; final String dn2 = "persistent://" + ns2 + "/my-topic"; - ConsumerImpl cons1 = (ConsumerImpl) pulsarClient.subscribe(dn1, "my-subscriber-name", new ConsumerConfiguration()); + ConsumerImpl cons1 = (ConsumerImpl) pulsarClient.subscribe(dn1, "my-subscriber-name", + new ConsumerConfiguration()); ProducerImpl prod1 = (ProducerImpl) pulsarClient.createProducer(dn1, new ProducerConfiguration()); ProducerImpl prod2 = (ProducerImpl) pulsarClient.createProducer(dn2, new ProducerConfiguration()); ConsumerImpl consumer1 = spy(cons1); @@ -182,7 +188,6 @@ public void testDisconnectClientWithoutClosingConnection() throws Exception { assertTrue(prod2.getClientCnx() != null); assertTrue(prod2.getState().equals(State.Ready)); - // unload ns-bundle2 as well pulsar.getNamespaceService().unloadNamespaceBundle((NamespaceBundle) bundle2); verify(producer2, atLeastOnce()).connectionClosed(anyObject()); @@ -208,10 +213,9 @@ public void testDisconnectClientWithoutClosingConnection() throws Exception { } - /** * Verifies: 1. Closing of Broker service unloads all bundle gracefully and there must not be any connected bundles - * after closing broker service + * after closing broker service * * @throws Exception */ @@ -225,18 +229,19 @@ public void testCloseBrokerService() throws Exception { final String dn1 = "persistent://" + ns1 + "/my-topic"; final String dn2 = "persistent://" + ns2 + "/my-topic"; - - ConsumerImpl consumer1 = (ConsumerImpl) pulsarClient.subscribe(dn1, "my-subscriber-name", new ConsumerConfiguration()); + + ConsumerImpl consumer1 = (ConsumerImpl) pulsarClient.subscribe(dn1, "my-subscriber-name", + new ConsumerConfiguration()); ProducerImpl producer1 = (ProducerImpl) pulsarClient.createProducer(dn1, new ProducerConfiguration()); ProducerImpl producer2 = (ProducerImpl) pulsarClient.createProducer(dn2, new ProducerConfiguration()); - //unload all other namespace + // unload all other namespace pulsar.getBrokerService().close(); // [1] OwnershipCache should not contain any more namespaces OwnershipCache ownershipCache = pulsar.getNamespaceService().getOwnershipCache(); assertTrue(ownershipCache.getOwnedBundles().keySet().isEmpty()); - + // [2] All clients must be disconnected and in connecting state // producer1 must not be able to connect again assertTrue(producer1.getClientCnx() == null); @@ -247,11 +252,11 @@ public void testCloseBrokerService() throws Exception { // producer2 must not be able to connect again assertTrue(producer2.getClientCnx() == null); assertTrue(producer2.getState().equals(State.Connecting)); - + producer1.close(); producer2.close(); consumer1.close(); - + } /** @@ -449,6 +454,105 @@ public void testResetCursor(SubscriptionType subType) throws Exception { Assert.assertEquals(totalReceived, totalExpected, "did not receive all messages on replay after reset"); } + /** + *
+     * Verifies: that client-cnx gets closed when server gives TooManyRequestException in certain time frame
+     * 1. Client1: which has set MaxNumberOfRejectedRequestPerConnection=0
+     * 2. Client2: which has set MaxNumberOfRejectedRequestPerConnection=100
+     * 3. create multiple producer and make lookup-requests simultaneously
+     * 4. Client1 receives TooManyLookupException and should close connection
+     * 
+ * + * @throws Exception + */ + @Test(timeOut = 5000) + public void testCloseConnectionOnBrokerRejectedRequest() throws Exception { + + final PulsarClient pulsarClient; + final PulsarClient pulsarClient2; + + final String topicName = "persistent://prop/usw/my-ns/newTopic"; + + final int concurrentLookupRequests = 20; + ClientConfiguration clientConf = new ClientConfiguration(); + clientConf.setStatsInterval(0, TimeUnit.SECONDS); + clientConf.setMaxNumberOfRejectedRequestPerConnection(0); + stopBroker(); + pulsar.getConfiguration().setMaxConcurrentLookupRequest(1); + startBroker(); + String lookupUrl = new URI("pulsar://localhost:" + BROKER_PORT).toString(); + pulsarClient = PulsarClient.create(lookupUrl, clientConf); + + ClientConfiguration clientConf2 = new ClientConfiguration(); + clientConf2.setStatsInterval(0, TimeUnit.SECONDS); + clientConf2.setIoThreads(concurrentLookupRequests); + clientConf2.setConnectionsPerBroker(20); + pulsarClient2 = PulsarClient.create(lookupUrl, clientConf2); + + ProducerImpl producer = (ProducerImpl) pulsarClient.createProducer(topicName); + ClientCnx cnx = producer.cnx(); + assertTrue(cnx.channel().isActive()); + ExecutorService executor = Executors.newFixedThreadPool(concurrentLookupRequests); + for (int i = 0; i < 100; i++) { + executor.submit(() -> { + pulsarClient2.createProducerAsync(topicName).handle((ok, e) -> { + return null; + }); + pulsarClient.createProducerAsync(topicName).handle((ok, e) -> { + return null; + }); + + }); + if (!cnx.channel().isActive()) { + break; + } + if (i % 10 == 0) { + Thread.sleep(100); + } + } + // connection must be closed + assertFalse(cnx.channel().isActive()); + pulsarClient.close(); + pulsarClient2.close(); + } + + /** + * It verifies that client closes the connection on internalSerevrError which is "ServiceNotReady" from Broker-side + * + * @throws Exception + */ + @Test(timeOut = 5000) + public void testCloseConnectionOnInternalServerError() throws Exception { + + try { + final PulsarClient pulsarClient; + + final String topicName = "persistent://prop/usw/my-ns/newTopic"; + + ClientConfiguration clientConf = new ClientConfiguration(); + clientConf.setStatsInterval(0, TimeUnit.SECONDS); + String lookupUrl = new URI("pulsar://localhost:" + BROKER_PORT).toString(); + pulsarClient = PulsarClient.create(lookupUrl, clientConf); + + ProducerImpl producer = (ProducerImpl) pulsarClient.createProducer(topicName); + ClientCnx cnx = producer.cnx(); + assertTrue(cnx.channel().isActive()); + // this will throw NPE at broker while authorizing and it will throw InternalServerError + pulsar.getConfiguration().setAuthorizationEnabled(true); + try { + pulsarClient.createProducer(topicName); + fail("it should have fail with lookup-exception:"); + } catch (Exception e) { + // ok + } + // connection must be closed + assertFalse(cnx.channel().isActive()); + pulsarClient.close(); + } finally { + pulsar.getConfiguration().setAuthorizationEnabled(false); + } + } + private static class TimestampEntryCount { private final long timestamp; private int numMessages; diff --git a/pulsar-client/src/main/java/com/yahoo/pulsar/client/api/ClientConfiguration.java b/pulsar-client/src/main/java/com/yahoo/pulsar/client/api/ClientConfiguration.java index 6a2ab81035b4a..8b8ca088d5b53 100644 --- a/pulsar-client/src/main/java/com/yahoo/pulsar/client/api/ClientConfiguration.java +++ b/pulsar-client/src/main/java/com/yahoo/pulsar/client/api/ClientConfiguration.java @@ -49,6 +49,7 @@ public class ClientConfiguration implements Serializable { private String tlsTrustCertsFilePath = ""; private boolean tlsAllowInsecureConnection = false; private int concurrentLookupRequest = 5000; + private int maxNumberOfRejectedRequestPerConnection = 50; /** * @return the authentication provider to be used @@ -330,4 +331,25 @@ public int getConcurrentLookupRequest() { public void setConcurrentLookupRequest(int concurrentLookupRequest) { this.concurrentLookupRequest = concurrentLookupRequest; } + + /** + * Get configured max number of reject-request in a time-frame (30 seconds) after which connection will be closed + * + * @return + */ + public int getMaxNumberOfRejectedRequestPerConnection() { + return maxNumberOfRejectedRequestPerConnection; + } + + /** + * Set max number of broker-rejected requests in a certain time-frame (30 seconds) after which current connection + * will be closed and client creates a new connection that give chance to connect a different broker (default: + * 50) + * + * @param maxNumberOfRejectedRequestPerConnection + */ + public void setMaxNumberOfRejectedRequestPerConnection(int maxNumberOfRejectedRequestPerConnection) { + this.maxNumberOfRejectedRequestPerConnection = maxNumberOfRejectedRequestPerConnection; + } + } diff --git a/pulsar-client/src/main/java/com/yahoo/pulsar/client/impl/BinaryProtoLookupService.java b/pulsar-client/src/main/java/com/yahoo/pulsar/client/impl/BinaryProtoLookupService.java index 98e905ad2532c..5697dcd2e60d0 100644 --- a/pulsar-client/src/main/java/com/yahoo/pulsar/client/impl/BinaryProtoLookupService.java +++ b/pulsar-client/src/main/java/com/yahoo/pulsar/client/impl/BinaryProtoLookupService.java @@ -144,7 +144,8 @@ private CompletableFuture getPartitionedTopicMetadata( lookupDataResult.redirect, lookupDataResult.partitions, e.getMessage()))); } }).exceptionally((e) -> { - log.warn("[{}] failed to get Partitioned metadata : {}", destination.toString(), e.getMessage(), e); + log.warn("[{}] failed to get Partitioned metadata : {}", destination.toString(), + e.getCause().getMessage(), e); partitionFuture.completeExceptionally(e); return null; }); diff --git a/pulsar-client/src/main/java/com/yahoo/pulsar/client/impl/ClientCnx.java b/pulsar-client/src/main/java/com/yahoo/pulsar/client/impl/ClientCnx.java index 51ac4e08eac25..a498cc0adc9da 100644 --- a/pulsar-client/src/main/java/com/yahoo/pulsar/client/impl/ClientCnx.java +++ b/pulsar-client/src/main/java/com/yahoo/pulsar/client/impl/ClientCnx.java @@ -21,6 +21,7 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -47,6 +48,7 @@ import io.netty.buffer.ByteBuf; import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.EventLoopGroup; import io.netty.util.concurrent.Promise; public class ClientCnx extends PulsarHandler { @@ -61,6 +63,12 @@ public class ClientCnx extends PulsarHandler { private final CompletableFuture connectionFuture = new CompletableFuture(); private final Semaphore pendingLookupRequestSemaphore; + private final EventLoopGroup eventLoopGroup; + private static final AtomicIntegerFieldUpdater NUMBER_OF_REJECTED_REQUESTS_UPDATER = AtomicIntegerFieldUpdater + .newUpdater(ClientCnx.class, "numberOfRejectRequests"); + private volatile int numberOfRejectRequests = 0; + private final int maxNumberOfRejectedRequestPerConnection; + private final int rejectedRequestResetTimeSec = 60; enum State { None, SentConnectFrame, Ready @@ -70,8 +78,10 @@ public ClientCnx(PulsarClientImpl pulsarClient) { super(30, TimeUnit.SECONDS); this.pendingLookupRequestSemaphore = new Semaphore(pulsarClient.getConfiguration().getConcurrentLookupRequest(), true); - authentication = pulsarClient.getConfiguration().getAuthentication(); - state = State.None; + this.authentication = pulsarClient.getConfiguration().getAuthentication(); + this.eventLoopGroup = pulsarClient.eventLoopGroup(); + this.maxNumberOfRejectedRequestPerConnection = pulsarClient.getConfiguration().getMaxNumberOfRejectedRequestPerConnection(); + this.state = State.None; } @Override @@ -109,7 +119,7 @@ public void channelInactive(ChannelHandlerContext ctx) throws Exception { // Fail out all the pending ops pendingRequests.forEach((key, future) -> future.completeExceptionally(e)); - pendingLookupRequests.forEach((key, future) -> getAndRemovePendingLookupRequest(key).completeExceptionally(e)); + pendingLookupRequests.forEach((key, future) -> future.completeExceptionally(e)); // Notify all attached producers/consumers so they have a chance to reconnect producers.forEach((id, producer) -> producer.connectionClosed(this)); @@ -212,6 +222,7 @@ protected void handleLookupResponse(CommandLookupTopicResponse lookupResult) { // Complete future with exception if : Result.response=fail/null if (!lookupResult.hasResponse() || CommandLookupTopicResponse.LookupType.Failed.equals(lookupResult.getResponse())) { if (lookupResult.hasError()) { + checkServerError(lookupResult.getError(), lookupResult.getMessage()); requestFuture.completeExceptionally( getPulsarClientException(lookupResult.getError(), lookupResult.getMessage())); } else { @@ -240,6 +251,7 @@ protected void handlePartitionResponse(CommandPartitionedTopicMetadataResponse l // Complete future with exception if : Result.response=fail/null if (!lookupResult.hasResponse() || CommandPartitionedTopicMetadataResponse.LookupType.Failed.equals(lookupResult.getResponse())) { if (lookupResult.hasError()) { + checkServerError(lookupResult.getError(), lookupResult.getMessage()); requestFuture.completeExceptionally( getPulsarClientException(lookupResult.getError(), lookupResult.getMessage())); } else { @@ -338,6 +350,7 @@ CompletableFuture newLookup(ByteBuf request, long requestId) { if (!writeFuture.isSuccess()) { log.warn("{} Failed to send request {} to broker: {}", ctx.channel(), requestId, writeFuture.cause().getMessage()); + getAndRemovePendingLookupRequest(requestId); future.completeExceptionally(writeFuture.cause()); } }); @@ -377,12 +390,42 @@ CompletableFuture sendRequestWithId(ByteBuf cmd, long requestId) { ctx.writeAndFlush(cmd).addListener(writeFuture -> { if (!writeFuture.isSuccess()) { log.warn("{} Failed to send request to broker: {}", ctx.channel(), writeFuture.cause().getMessage()); + pendingRequests.remove(requestId); future.completeExceptionally(writeFuture.cause()); } }); return future; } + /** + * check serverError and take appropriate action + *
    + *
  • InternalServerError: close connection immediately
  • + *
  • TooManyRequest: received error count is more than maxNumberOfRejectedRequestPerConnection in + * #rejectedRequestResetTimeSec
  • + *
+ * + * @param error + * @param errMsg + */ + private void checkServerError(ServerError error, String errMsg) { + if (ServerError.ServiceNotReady.equals(error)) { + log.error("{} Close connection becaues received internal-server error {}", ctx.channel(), errMsg); + ctx.close(); + } else if (ServerError.TooManyRequests.equals(error)) { + long rejectedRequests = NUMBER_OF_REJECTED_REQUESTS_UPDATER.getAndIncrement(this); + if (rejectedRequests == 0) { + // schedule timer + eventLoopGroup.schedule(() -> NUMBER_OF_REJECTED_REQUESTS_UPDATER.set(ClientCnx.this, 0), + rejectedRequestResetTimeSec, TimeUnit.SECONDS); + } else if (rejectedRequests >= maxNumberOfRejectedRequestPerConnection) { + log.error("{} Close connection becaues received {} rejected request in {} seconds ", ctx.channel(), + NUMBER_OF_REJECTED_REQUESTS_UPDATER.get(ClientCnx.this), rejectedRequestResetTimeSec); + ctx.close(); + } + } + } + void registerConsumer(final long consumerId, final ConsumerImpl consumer) { consumers.put(consumerId, consumer); }