Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

close client connectionon tooManyRequest and internal-server error #274

Merged
merged 2 commits into from
Mar 11, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1030,11 +1030,11 @@ public static CompletableFuture<PartitionedTopicMetadata> getPartitionedTopicMet
clientAppId, dn.toString(), authException.getMessage()));
}
} catch (Exception ex) {
// unknown error marked as internal server error
// throw without wrapping to PulsarClientException that considers: unknown error marked as internal
// server error
log.warn("Failed to authorize {} on cluster {} with unexpected exception {}", clientAppId,
dn.toString(), ex.getMessage(), ex);
throw new PulsarClientException(String.format("Authorization failed %s on cluster %s with error %s",
clientAppId, dn.toString(), ex.getMessage()));
throw ex;
}
String path = path(PARTITIONED_TOPIC_PATH_ZNODE, dn.getProperty(), dn.getCluster(),
dn.getNamespacePortion(), "persistent", dn.getEncodedLocalName());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,6 @@ public BrokerService(PulsarService pulsar) throws Exception {
this.backlogQuotaChecker = Executors
.newSingleThreadScheduledExecutor(new DefaultThreadFactory("pulsar-backlog-quota-checker"));
this.authenticationService = new AuthenticationService(pulsar.getConfiguration());

this.dynamicConfigurationCache = new ZooKeeperDataCache<Map<String, String>>(pulsar().getLocalZkCache()) {
@Override
public Map<String, String> deserialize(String key, byte[] content) throws Exception {
Expand Down Expand Up @@ -852,7 +851,7 @@ public Map<String, PersistentTopicStats> getTopicStats() {
public AuthenticationService getAuthenticationService() {
return authenticationService;
}

public List<PersistentTopic> getAllTopicsFromNamespaceBundle(String namespace, String bundle) {
return multiLayerTopicsMap.get(namespace).get(bundle).values();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,10 +154,17 @@ protected void validateSuperUserAccess() {
* if not authorized
*/
protected void validateAdminAccessOnProperty(String property) {
validateAdminAccessOnProperty(pulsar(), clientAppId(), property);
try {
validateAdminAccessOnProperty(pulsar(), clientAppId(), property);
} catch (RestException e) {
throw e;
} catch (Exception e) {
log.error("Failed to get property admin data for property");
throw new RestException(e);
}
}

protected static void validateAdminAccessOnProperty(PulsarService pulsar, String clientAppId, String property) {
protected static void validateAdminAccessOnProperty(PulsarService pulsar, String clientAppId, String property) throws RestException, Exception{
if (pulsar.getConfiguration().isAuthenticationEnabled() && pulsar.getConfiguration().isAuthorizationEnabled()) {
log.debug("check admin access on property: {} - Authenticated: {} -- role: {}", property,
(isClientAuthenticated(clientAppId)), clientAppId);
Expand All @@ -178,9 +185,6 @@ protected static void validateAdminAccessOnProperty(PulsarService pulsar, String
} catch (KeeperException.NoNodeException e) {
log.warn("Failed to get property admin data for non existing property {}", property);
throw new RestException(Status.UNAUTHORIZED, "Property does not exist");
} catch (Exception e) {
log.error("Failed to get property admin data for property");
throw new RestException(e);
}

if (!propertyAdmin.getAdminRoles().contains(clientAppId)) {
Expand Down Expand Up @@ -565,20 +569,16 @@ protected void checkConnect(DestinationName destination) throws RestException, E
checkAuthorization(pulsar(), destination, clientAppId());
}

protected static void checkAuthorization(PulsarService pulsarService, DestinationName destination, String role) throws RestException, Exception{
protected static void checkAuthorization(PulsarService pulsarService, DestinationName destination, String role)
throws RestException, Exception {
if (!pulsarService.getConfiguration().isAuthorizationEnabled()) {
// No enforcing of authorization policies
return;
}
try {
// get zk policy manager
if (!pulsarService.getBrokerService().getAuthorizationManager().canLookup(destination, role)) {
log.warn("[{}] Role {} is not allowed to lookup topic", destination, role);
throw new RestException(Status.UNAUTHORIZED, "Don't have permission to connect to this namespace");
}
} catch (RestException e) {
// Let it through
throw e;
// get zk policy manager
if (!pulsarService.getBrokerService().getAuthorizationManager().canLookup(destination, role)) {
log.warn("[{}] Role {} is not allowed to lookup topic", destination, role);
throw new RestException(Status.UNAUTHORIZED, "Don't have permission to connect to this namespace");
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -728,5 +728,4 @@ public void testLookupThrottlingForClientByClient() throws Exception {
// ok as throttling set to 0
}
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,20 @@
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;

import java.lang.reflect.Field;
import java.net.URI;
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.Set;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import org.slf4j.Logger;
Expand All @@ -43,6 +48,7 @@
import com.google.common.collect.Sets;
import com.yahoo.pulsar.broker.namespace.OwnershipCache;
import com.yahoo.pulsar.broker.service.Topic;
import com.yahoo.pulsar.client.api.ClientConfiguration;
import com.yahoo.pulsar.client.api.Consumer;
import com.yahoo.pulsar.client.api.ConsumerConfiguration;
import com.yahoo.pulsar.client.api.Message;
Expand Down Expand Up @@ -75,23 +81,22 @@ protected void setup() throws Exception {
protected void cleanup() throws Exception {
super.internalCleanup();
}

@DataProvider
public Object[][] subType() {
return new Object[][] {{SubscriptionType.Shared}, {SubscriptionType.Failover}};
return new Object[][] { { SubscriptionType.Shared }, { SubscriptionType.Failover } };
}


/**
* Verifies unload namespace-bundle doesn't close shared connection used by other namespace-bundle.
*
* <pre>
* 1. after disabling broker fron loadbalancer
* 2. unload namespace-bundle "my-ns1" which disconnects client (producer/consumer) connected on that namespacebundle
* 3. but doesn't close the connection for namesapce-bundle "my-ns2" and clients are still connected
* 4. verifies unloaded "my-ns1" should not connected again with the broker as broker is disabled
* 5. unload "my-ns2" which closes the connection as broker doesn't have any more client connected on that connection
* 6. all namespace-bundles are in "connecting" state and waiting for available broker
*
* </pre>
*
* @throws Exception
*/
Expand All @@ -105,7 +110,8 @@ public void testDisconnectClientWithoutClosingConnection() throws Exception {

final String dn1 = "persistent://" + ns1 + "/my-topic";
final String dn2 = "persistent://" + ns2 + "/my-topic";
ConsumerImpl cons1 = (ConsumerImpl) pulsarClient.subscribe(dn1, "my-subscriber-name", new ConsumerConfiguration());
ConsumerImpl cons1 = (ConsumerImpl) pulsarClient.subscribe(dn1, "my-subscriber-name",
new ConsumerConfiguration());
ProducerImpl prod1 = (ProducerImpl) pulsarClient.createProducer(dn1, new ProducerConfiguration());
ProducerImpl prod2 = (ProducerImpl) pulsarClient.createProducer(dn2, new ProducerConfiguration());
ConsumerImpl consumer1 = spy(cons1);
Expand Down Expand Up @@ -182,7 +188,6 @@ public void testDisconnectClientWithoutClosingConnection() throws Exception {
assertTrue(prod2.getClientCnx() != null);
assertTrue(prod2.getState().equals(State.Ready));


// unload ns-bundle2 as well
pulsar.getNamespaceService().unloadNamespaceBundle((NamespaceBundle) bundle2);
verify(producer2, atLeastOnce()).connectionClosed(anyObject());
Expand All @@ -208,10 +213,9 @@ public void testDisconnectClientWithoutClosingConnection() throws Exception {

}


/**
* Verifies: 1. Closing of Broker service unloads all bundle gracefully and there must not be any connected bundles
* after closing broker service
* after closing broker service
*
* @throws Exception
*/
Expand All @@ -225,18 +229,19 @@ public void testCloseBrokerService() throws Exception {

final String dn1 = "persistent://" + ns1 + "/my-topic";
final String dn2 = "persistent://" + ns2 + "/my-topic";

ConsumerImpl consumer1 = (ConsumerImpl) pulsarClient.subscribe(dn1, "my-subscriber-name", new ConsumerConfiguration());

ConsumerImpl consumer1 = (ConsumerImpl) pulsarClient.subscribe(dn1, "my-subscriber-name",
new ConsumerConfiguration());
ProducerImpl producer1 = (ProducerImpl) pulsarClient.createProducer(dn1, new ProducerConfiguration());
ProducerImpl producer2 = (ProducerImpl) pulsarClient.createProducer(dn2, new ProducerConfiguration());

//unload all other namespace
// unload all other namespace
pulsar.getBrokerService().close();

// [1] OwnershipCache should not contain any more namespaces
OwnershipCache ownershipCache = pulsar.getNamespaceService().getOwnershipCache();
assertTrue(ownershipCache.getOwnedBundles().keySet().isEmpty());

// [2] All clients must be disconnected and in connecting state
// producer1 must not be able to connect again
assertTrue(producer1.getClientCnx() == null);
Expand All @@ -247,11 +252,11 @@ public void testCloseBrokerService() throws Exception {
// producer2 must not be able to connect again
assertTrue(producer2.getClientCnx() == null);
assertTrue(producer2.getState().equals(State.Connecting));

producer1.close();
producer2.close();
consumer1.close();

}

/**
Expand Down Expand Up @@ -449,6 +454,105 @@ public void testResetCursor(SubscriptionType subType) throws Exception {
Assert.assertEquals(totalReceived, totalExpected, "did not receive all messages on replay after reset");
}

/**
* <pre>
* Verifies: that client-cnx gets closed when server gives TooManyRequestException in certain time frame
* 1. Client1: which has set MaxNumberOfRejectedRequestPerConnection=0
* 2. Client2: which has set MaxNumberOfRejectedRequestPerConnection=100
* 3. create multiple producer and make lookup-requests simultaneously
* 4. Client1 receives TooManyLookupException and should close connection
* </pre>
*
* @throws Exception
*/
@Test(timeOut = 5000)
public void testCloseConnectionOnBrokerRejectedRequest() throws Exception {

final PulsarClient pulsarClient;
final PulsarClient pulsarClient2;

final String topicName = "persistent://prop/usw/my-ns/newTopic";

final int concurrentLookupRequests = 20;
ClientConfiguration clientConf = new ClientConfiguration();
clientConf.setStatsInterval(0, TimeUnit.SECONDS);
clientConf.setMaxNumberOfRejectedRequestPerConnection(0);
stopBroker();
pulsar.getConfiguration().setMaxConcurrentLookupRequest(1);
startBroker();
String lookupUrl = new URI("pulsar://localhost:" + BROKER_PORT).toString();
pulsarClient = PulsarClient.create(lookupUrl, clientConf);

ClientConfiguration clientConf2 = new ClientConfiguration();
clientConf2.setStatsInterval(0, TimeUnit.SECONDS);
clientConf2.setIoThreads(concurrentLookupRequests);
clientConf2.setConnectionsPerBroker(20);
pulsarClient2 = PulsarClient.create(lookupUrl, clientConf2);

ProducerImpl producer = (ProducerImpl) pulsarClient.createProducer(topicName);
ClientCnx cnx = producer.cnx();
assertTrue(cnx.channel().isActive());
ExecutorService executor = Executors.newFixedThreadPool(concurrentLookupRequests);
for (int i = 0; i < 100; i++) {
executor.submit(() -> {
pulsarClient2.createProducerAsync(topicName).handle((ok, e) -> {
return null;
});
pulsarClient.createProducerAsync(topicName).handle((ok, e) -> {
return null;
});

});
if (!cnx.channel().isActive()) {
break;
}
if (i % 10 == 0) {
Thread.sleep(100);
}
}
// connection must be closed
assertFalse(cnx.channel().isActive());
pulsarClient.close();
pulsarClient2.close();
}

/**
* It verifies that client closes the connection on internalSerevrError which is "ServiceNotReady" from Broker-side
*
* @throws Exception
*/
@Test(timeOut = 5000)
public void testCloseConnectionOnInternalServerError() throws Exception {

try {
final PulsarClient pulsarClient;

final String topicName = "persistent://prop/usw/my-ns/newTopic";

ClientConfiguration clientConf = new ClientConfiguration();
clientConf.setStatsInterval(0, TimeUnit.SECONDS);
String lookupUrl = new URI("pulsar://localhost:" + BROKER_PORT).toString();
pulsarClient = PulsarClient.create(lookupUrl, clientConf);

ProducerImpl producer = (ProducerImpl) pulsarClient.createProducer(topicName);
ClientCnx cnx = producer.cnx();
assertTrue(cnx.channel().isActive());
// this will throw NPE at broker while authorizing and it will throw InternalServerError
pulsar.getConfiguration().setAuthorizationEnabled(true);
try {
pulsarClient.createProducer(topicName);
fail("it should have fail with lookup-exception:");
} catch (Exception e) {
// ok
}
// connection must be closed
assertFalse(cnx.channel().isActive());
pulsarClient.close();
} finally {
pulsar.getConfiguration().setAuthorizationEnabled(false);
}
}

private static class TimestampEntryCount {
private final long timestamp;
private int numMessages;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ public class ClientConfiguration implements Serializable {
private String tlsTrustCertsFilePath = "";
private boolean tlsAllowInsecureConnection = false;
private int concurrentLookupRequest = 5000;
private int maxNumberOfRejectedRequestPerConnection = 50;

/**
* @return the authentication provider to be used
Expand Down Expand Up @@ -330,4 +331,25 @@ public int getConcurrentLookupRequest() {
public void setConcurrentLookupRequest(int concurrentLookupRequest) {
this.concurrentLookupRequest = concurrentLookupRequest;
}

/**
* Get configured max number of reject-request in a time-frame (30 seconds) after which connection will be closed
*
* @return
*/
public int getMaxNumberOfRejectedRequestPerConnection() {
return maxNumberOfRejectedRequestPerConnection;
}

/**
* Set max number of broker-rejected requests in a certain time-frame (30 seconds) after which current connection
* will be closed and client creates a new connection that give chance to connect a different broker <i>(default:
* 50)</i>
*
* @param maxNumberOfRejectedRequestPerConnection
*/
public void setMaxNumberOfRejectedRequestPerConnection(int maxNumberOfRejectedRequestPerConnection) {
this.maxNumberOfRejectedRequestPerConnection = maxNumberOfRejectedRequestPerConnection;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,8 @@ private CompletableFuture<PartitionedTopicMetadata> getPartitionedTopicMetadata(
lookupDataResult.redirect, lookupDataResult.partitions, e.getMessage())));
}
}).exceptionally((e) -> {
log.warn("[{}] failed to get Partitioned metadata : {}", destination.toString(), e.getMessage(), e);
log.warn("[{}] failed to get Partitioned metadata : {}", destination.toString(),
e.getCause().getMessage(), e);
partitionFuture.completeExceptionally(e);
return null;
});
Expand Down
Loading