Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add server-side lookup throttling #181

Merged
merged 2 commits into from
Mar 6, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions conf/broker.conf
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,9 @@ tlsAllowInsecureConnection=false
# Using a value of 0, is disabling unackeMessage limit check and consumer can receive messages without any restriction
maxUnackedMessagesPerConsumer=50000

# Max number of concurrent lookup request broker allows to throttle heavy incoming lookup traffic
maxConcurrentLookupRequest=10000

### --- Authentication --- ###

# Enable authentication
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,9 @@ public class ServiceConfiguration implements PulsarConfiguration{
// messages to consumer once, this limit reaches until consumer starts acknowledging messages back
// Using a value of 0, is disabling unackedMessage-limit check and consumer can receive messages without any restriction
private int maxUnackedMessagesPerConsumer = 50000;
// Max number of concurrent lookup request broker allows to throttle heavy incoming lookup traffic
@FieldContext(dynamic = true)
private int maxConcurrentLookupRequest = 10000;

/***** --- TLS --- ****/
// Enable TLS
Expand Down Expand Up @@ -415,6 +418,14 @@ public void setMaxUnackedMessagesPerConsumer(int maxUnackedMessagesPerConsumer)
this.maxUnackedMessagesPerConsumer = maxUnackedMessagesPerConsumer;
}

public int getMaxConcurrentLookupRequest() {
return maxConcurrentLookupRequest;
}

public void setMaxConcurrentLookupRequest(int maxConcurrentLookupRequest) {
this.maxConcurrentLookupRequest = maxConcurrentLookupRequest;
}

public boolean isTlsEnabled() {
return tlsEnabled;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,12 @@ public void lookupDestinationAsync(@PathParam("property") String property, @Path
@Suspended AsyncResponse asyncResponse) {
dest = Codec.decode(dest);
DestinationName topic = DestinationName.get("persistent", property, cluster, namespace, dest);

if (!pulsar().getBrokerService().getLookupRequestSemaphore().tryAcquire()) {
log.warn("No broker was found available for topic {}", topic);
asyncResponse.resume(new WebApplicationException(Response.Status.SERVICE_UNAVAILABLE));
return;
}

try {
validateClusterOwnership(topic.getCluster());
Expand All @@ -75,12 +81,12 @@ public void lookupDestinationAsync(@PathParam("property") String property, @Path
} catch (WebApplicationException we) {
// Validation checks failed
log.error("Validation check failed: {}", we.getMessage());
asyncResponse.resume(we);
completeLookupResponseExceptionally(asyncResponse, we);
return;
} catch (Throwable t) {
// Validation checks failed with unknown error
log.error("Validation check failed: {}", t.getMessage(), t);
asyncResponse.resume(new RestException(t));
completeLookupResponseExceptionally(asyncResponse, new RestException(t));
return;
}

Expand All @@ -90,7 +96,7 @@ public void lookupDestinationAsync(@PathParam("property") String property, @Path
lookupFuture.thenAccept(result -> {
if (result == null) {
log.warn("No broker was found available for topic {}", topic);
asyncResponse.resume(new WebApplicationException(Response.Status.SERVICE_UNAVAILABLE));
completeLookupResponseExceptionally(asyncResponse, new WebApplicationException(Response.Status.SERVICE_UNAVAILABLE));
return;
}

Expand All @@ -105,24 +111,24 @@ public void lookupDestinationAsync(@PathParam("property") String property, @Path
topic.getLookupName(), newAuthoritative));
} catch (URISyntaxException e) {
log.error("Error in preparing redirect url for {}: {}", topic, e.getMessage(), e);
asyncResponse.resume(e);
completeLookupResponseExceptionally(asyncResponse, e);
return;
}
if (log.isDebugEnabled()) {
log.debug("Redirect lookup for topic {} to {}", topic, redirect);
}
asyncResponse.resume(new WebApplicationException(Response.temporaryRedirect(redirect).build()));
completeLookupResponseExceptionally(asyncResponse, new WebApplicationException(Response.temporaryRedirect(redirect).build()));

} else {
// Found broker owning the topic
if (log.isDebugEnabled()) {
log.debug("Lookup succeeded for topic {} -- broker: {}", topic, result.getLookupData());
}
asyncResponse.resume(result.getLookupData());
completeLookupResponseSuccessfully(asyncResponse, result.getLookupData());
}
}).exceptionally(exception -> {
log.warn("Failed to lookup broker for topic {}: {}", topic, exception.getMessage(), exception);
asyncResponse.resume(exception);
completeLookupResponseExceptionally(asyncResponse, exception);
return null;
});

Expand Down Expand Up @@ -236,6 +242,16 @@ public static CompletableFuture<ByteBuf> lookupDestinationAsync(PulsarService pu

return lookupfuture;
}

private void completeLookupResponseExceptionally(AsyncResponse asyncResponse, Throwable t) {
pulsar().getBrokerService().getLookupRequestSemaphore().release();
asyncResponse.resume(t);
}

private void completeLookupResponseSuccessfully(AsyncResponse asyncResponse, LookupData lookupData) {
pulsar().getBrokerService().getLookupRequestSemaphore().release();
asyncResponse.resume(lookupData);
}

private static final Logger log = LoggerFactory.getLogger(DestinationLookup.class);
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,9 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

import org.apache.bookkeeper.client.BookKeeper.DigestType;
Expand Down Expand Up @@ -128,6 +130,8 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies
private AuthorizationManager authorizationManager = null;
private final ScheduledExecutorService statsUpdater;
private final ScheduledExecutorService backlogQuotaChecker;

protected final AtomicReference<Semaphore> lookupRequestSemaphore;

private final ScheduledExecutorService inactivityMonitor;
private final ScheduledExecutorService messageExpiryMonitor;
Expand Down Expand Up @@ -206,7 +210,10 @@ public Map<String, String> deserialize(String key, byte[] content) throws Except
return ObjectMapperFactory.getThreadLocal().readValue(content, HashMap.class);
}
};
// update dynamic configuration and register-listener
updateConfigurationAndRegisterListeners();
this.lookupRequestSemaphore = new AtomicReference<Semaphore>(
new Semaphore(pulsar.getConfiguration().getMaxConcurrentLookupRequest(), true));

PersistentReplicator.setReplicatorQueueSize(pulsar.getConfiguration().getReplicationProducerQueueSize());
}
Expand Down Expand Up @@ -619,6 +626,10 @@ public List<Metrics> getDestinationMetrics() {
public Map<String, NamespaceBundleStats> getBundleStats() {
return pulsarStats.getBundleStats();
}

public Semaphore getLookupRequestSemaphore() {
return lookupRequestSemaphore.get();
}

public void checkGC(int gcIntervalInSeconds) {
topics.forEach((n, t) -> {
Expand Down Expand Up @@ -841,7 +852,7 @@ public Map<String, PersistentTopicStats> getTopicStats() {
public AuthenticationService getAuthenticationService() {
return authenticationService;
}

public List<PersistentTopic> getAllTopicsFromNamespaceBundle(String namespace, String bundle) {
return multiLayerTopicsMap.get(namespace).get(bundle).values();
}
Expand All @@ -857,7 +868,10 @@ public ZooKeeperDataCache<Map<String, String>> getDynamicConfigurationCache() {
private void updateConfigurationAndRegisterListeners() {
// update ServiceConfiguration value by reading zk-configuration-map
updateDynamicServiceConfiguration();
//add more listeners here
// add listener on "maxConcurrentLookupRequest" value change
registerConfigurationListener("maxConcurrentLookupRequest",
(pendingLookupRequest) -> lookupRequestSemaphore.set(new Semaphore((int) pendingLookupRequest, true)));
// add more listeners here
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,4 +124,4 @@ public static PulsarApi.ServerError getClientErrorCode(Throwable t) {
return PulsarApi.ServerError.UnknownError;
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -159,15 +159,28 @@ protected void handleLookup(CommandLookupTopic lookup) {
}
final long requestId = lookup.getRequestId();
final String topic = lookup.getTopic();
lookupDestinationAsync(getBrokerService().pulsar(), DestinationName.get(topic), lookup.getAuthoritative(),
getRole(), lookup.getRequestId()).thenAccept(lookupResponse -> {
ctx.writeAndFlush(lookupResponse);
}).exceptionally(ex -> {
// it should never happen
log.warn("[{}] lookup failed with error {}, {}", remoteAddress, topic, ex.getMessage(), ex);
ctx.writeAndFlush(newLookupResponse(ServerError.ServiceNotReady, ex.getMessage(), requestId));
return null;
});
if (service.getLookupRequestSemaphore().tryAcquire()) {
lookupDestinationAsync(getBrokerService().pulsar(), DestinationName.get(topic), lookup.getAuthoritative(),
getRole(), lookup.getRequestId()).handle((lookupResponse, ex) -> {
if (ex == null) {
ctx.writeAndFlush(lookupResponse);
} else {
// it should never happen
log.warn("[{}] lookup failed with error {}, {}", remoteAddress, topic, ex.getMessage(), ex);
ctx.writeAndFlush(
newLookupResponse(ServerError.ServiceNotReady, ex.getMessage(), requestId));
}
service.getLookupRequestSemaphore().release();
return null;
});
} else {
if (log.isDebugEnabled()) {
log.debug("[{}] Failed lookup due to too many lookup-requets {}", remoteAddress, topic);
}
ctx.writeAndFlush(newLookupResponse(ServerError.TooManyRequests,
"Failed due to too many pending lookup requests", requestId));
}

}

@Override
Expand All @@ -177,24 +190,36 @@ protected void handlePartitionMetadataRequest(CommandPartitionedTopicMetadata pa
}
final long requestId = partitionMetadata.getRequestId();
final String topic = partitionMetadata.getTopic();
getPartitionedTopicMetadata(getBrokerService().pulsar(), getRole(), DestinationName.get(topic))
.thenAccept(metadata -> {
int partitions = metadata.partitions;
ctx.writeAndFlush(Commands.newPartitionMetadataResponse(partitions, requestId));
}).exceptionally(ex -> {
if (ex instanceof PulsarClientException) {
log.warn("Failed to authorize {} at [{}] on topic {} : {}", getRole(), remoteAddress, topic,
ex.getMessage());
ctx.writeAndFlush(Commands.newPartitionMetadataResponse(ServerError.AuthorizationError,
ex.getMessage(), requestId));
} else {
log.warn("Failed to get Partitioned Metadata [{}] {}: {}", remoteAddress, topic,
ex.getMessage(), ex);
ctx.writeAndFlush(Commands.newPartitionMetadataResponse(ServerError.ServiceNotReady,
ex.getMessage(), requestId));
}
return null;
});
if (service.getLookupRequestSemaphore().tryAcquire()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To avoid indenting the whole section, what about doing:

if (!service.getLookupRequestSemaphore().tryAcquire()) {
    /// deal with error
    return;
}

// continue normal

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, here I have replaced thenAccept/exceptionally with handle(data, ex) so, don't have to keep duplicate service.getLookupRequestSemaphore().release(). So, this change anyway changed entire section so, kept into if/else.

getPartitionedTopicMetadata(getBrokerService().pulsar(), getRole(), DestinationName.get(topic))
.handle((metadata, ex) -> {
if (ex == null) {
int partitions = metadata.partitions;
ctx.writeAndFlush(Commands.newPartitionMetadataResponse(partitions, requestId));
} else {
if (ex instanceof PulsarClientException) {
log.warn("Failed to authorize {} at [{}] on topic {} : {}", getRole(), remoteAddress,
topic, ex.getMessage());
ctx.writeAndFlush(Commands.newPartitionMetadataResponse(ServerError.AuthorizationError,
ex.getMessage(), requestId));
} else {
log.warn("Failed to get Partitioned Metadata [{}] {}: {}", remoteAddress, topic,
ex.getMessage(), ex);
ctx.writeAndFlush(Commands.newPartitionMetadataResponse(ServerError.ServiceNotReady,
ex.getMessage(), requestId));
}
}
service.getLookupRequestSemaphore().release();
return null;
});
} else {
if (log.isDebugEnabled()) {
log.debug("[{}] Failed Partition-Metadata lookup due to too many lookup-requets {}", remoteAddress,
topic);
}
ctx.writeAndFlush(newLookupResponse(ServerError.TooManyRequests,
"Failed due to too many pending lookup requests", requestId));
}
}

@Override
Expand Down Expand Up @@ -543,7 +568,6 @@ protected void handleProducer(final CommandProducer cmdProducer) {
});
}


@Override
protected void handleSend(CommandSend send, ByteBuf headersAndPayload) {
checkArgument(state == State.Connected);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicReference;

import javax.ws.rs.WebApplicationException;
import javax.ws.rs.container.AsyncResponse;
Expand Down Expand Up @@ -108,6 +110,7 @@ public void setUp() throws Exception {
BrokerService brokerService = mock(BrokerService.class);
doReturn(brokerService).when(pulsar).getBrokerService();
doReturn(auth).when(brokerService).getAuthorizationManager();
doReturn(new Semaphore(1000)).when(brokerService).getLookupRequestSemaphore();
}

@Test
Expand All @@ -134,6 +137,35 @@ public void crossColoLookup() throws Exception {
WebApplicationException wae = (WebApplicationException) arg.getValue();
assertEquals(wae.getResponse().getStatus(), Status.TEMPORARY_REDIRECT.getStatusCode());
}


@Test
public void testNotEnoughLookupPermits() throws Exception {

BrokerService brokerService = pulsar.getBrokerService();
doReturn(new Semaphore(0)).when(brokerService).getLookupRequestSemaphore();

DestinationLookup destLookup = spy(new DestinationLookup());
doReturn(false).when(destLookup).isRequestHttps();
destLookup.setPulsar(pulsar);
doReturn("null").when(destLookup).clientAppId();
Field uriField = PulsarWebResource.class.getDeclaredField("uri");
uriField.setAccessible(true);
UriInfo uriInfo = mock(UriInfo.class);
uriField.set(destLookup, uriInfo);
URI uri = URI.create("http://localhost:8080/lookup/v2/destination/topic/myprop/usc/ns2/topic1");
doReturn(uri).when(uriInfo).getRequestUri();
doReturn(true).when(config).isAuthorizationEnabled();

AsyncResponse asyncResponse1 = mock(AsyncResponse.class);
destLookup.lookupDestinationAsync("myprop", "usc", "ns2", "topic1", false, asyncResponse1);

ArgumentCaptor<Throwable> arg = ArgumentCaptor.forClass(Throwable.class);
verify(asyncResponse1).resume(arg.capture());
assertEquals(arg.getValue().getClass(), WebApplicationException.class);
WebApplicationException wae = (WebApplicationException) arg.getValue();
assertEquals(wae.getResponse().getStatus(), Status.SERVICE_UNAVAILABLE.getStatusCode());
}

@Test
public void testValidateReplicationSettingsOnNamespace() throws Exception {
Expand Down
Loading