Skip to content

Commit

Permalink
PIP-52: [pulsar-sever] Add support of dispatch throttling relative to…
Browse files Browse the repository at this point in the history
… publish-rate (#5797)

### Motivation
With [PIP-3](https://github.com/apache/pulsar/wiki/PIP-3:-Message-dispatch-throttling) , Pulsar broker already supports to configure dispatch rate-limiting for a given topic. Dispatch-throttling feature allows user to configure absolute dispatch rate based on current publish-rate for a given topic or subscriber, and broker will make sure to dispatch only configured number of messages to the consumers regardless current publish-rate or backlog on that topic.

Current dispatch-rate limiting doesn't consider change in publish-rate so, increasing publish-rate on the topic might be larger than configured dispatch-rate which will cause backlog on the topic and consumers will never be able to catch up the backlog unless user again reconfigured the dispatch-rate based on current publish-rate. Reconfiguring dispatch-rate based on publish-rate requires human interaction and monitoring. Therefore, we need a mechanism to configure dispatch rate relative to the current publish-rate on the topic.

### Modification
`set-dispatch-rate` cli have a flag `--relative-to-publish-rate` to enable relative dispatch throttling.

```
pulsar-admin namespaces <property/cluster/namespace> set-dispatch-rate --msg-dispatch-rate 1000 --relative-to-publish-rate
```

### Note:
I will add broker-level configuration and documentation into separate PR.
  • Loading branch information
rdhabalia authored and jiazhai committed Dec 8, 2019
1 parent 5b1ad00 commit 02bf9a0
Show file tree
Hide file tree
Showing 11 changed files with 175 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@

import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;

Expand All @@ -44,17 +45,23 @@ public enum Type {
REPLICATOR
}

private final PersistentTopic topic;
private final String topicName;
private final Type type;

private final BrokerService brokerService;
private RateLimiter dispatchRateLimiterOnMessage;
private RateLimiter dispatchRateLimiterOnByte;
private long subscriptionRelativeRatelimiterOnMessage;
private long subscriptionRelativeRatelimiterOnByte;

public DispatchRateLimiter(PersistentTopic topic, Type type) {
this.topic = topic;
this.topicName = topic.getName();
this.brokerService = topic.getBrokerService();
this.type = type;
this.subscriptionRelativeRatelimiterOnMessage = -1;
this.subscriptionRelativeRatelimiterOnByte = -1;
updateDispatchRate();
}

Expand Down Expand Up @@ -272,14 +279,17 @@ public synchronized void updateDispatchRate(DispatchRate dispatchRate) {
long byteRate = dispatchRate.dispatchThrottlingRateInByte;
long ratePeriod = dispatchRate.ratePeriodInSecond;

Supplier<Long> permitUpdaterMsg = dispatchRate.relativeToPublishRate
? () -> getRelativeDispatchRateInMsg(dispatchRate)
: null;
// update msg-rateLimiter
if (msgRate > 0) {
if (this.dispatchRateLimiterOnMessage == null) {
this.dispatchRateLimiterOnMessage = new RateLimiter(brokerService.pulsar().getExecutor(), msgRate,
ratePeriod, TimeUnit.SECONDS);
ratePeriod, TimeUnit.SECONDS, permitUpdaterMsg);
} else {
this.dispatchRateLimiterOnMessage.setRate(msgRate, dispatchRate.ratePeriodInSecond,
TimeUnit.SECONDS);
TimeUnit.SECONDS, permitUpdaterMsg);
}
} else {
// message-rate should be disable and close
Expand All @@ -289,14 +299,17 @@ public synchronized void updateDispatchRate(DispatchRate dispatchRate) {
}
}

Supplier<Long> permitUpdaterByte = dispatchRate.relativeToPublishRate
? () -> getRelativeDispatchRateInByte(dispatchRate)
: null;
// update byte-rateLimiter
if (byteRate > 0) {
if (this.dispatchRateLimiterOnByte == null) {
this.dispatchRateLimiterOnByte = new RateLimiter(brokerService.pulsar().getExecutor(), byteRate,
ratePeriod, TimeUnit.SECONDS);
ratePeriod, TimeUnit.SECONDS, permitUpdaterByte);
} else {
this.dispatchRateLimiterOnByte.setRate(byteRate, dispatchRate.ratePeriodInSecond,
TimeUnit.SECONDS);
TimeUnit.SECONDS, permitUpdaterByte);
}
} else {
// message-rate should be disable and close
Expand All @@ -307,6 +320,18 @@ public synchronized void updateDispatchRate(DispatchRate dispatchRate) {
}
}

private long getRelativeDispatchRateInMsg(DispatchRate dispatchRate) {
return (topic != null && dispatchRate != null)
? (long) topic.getLastUpdatedAvgPublishRateInMsg() + dispatchRate.dispatchThrottlingRateInMsg
: 0;
}

private long getRelativeDispatchRateInByte(DispatchRate dispatchRate) {
return (topic != null && dispatchRate != null)
? (long) topic.getLastUpdatedAvgPublishRateInByte() + dispatchRate.dispatchThrottlingRateInByte
: 0;
}

/**
* Get configured msg dispatch-throttling rate. Returns -1 if not configured
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,8 @@ protected TopicStatsHelper initialValue() {
};

private final AtomicLong pendingWriteOps = new AtomicLong(0);
private volatile double lastUpdatedAvgPublishRateInMsg = 0;
private volatile double lastUpdatedAvgPublishRateInByte = 0;

private static class TopicStatsHelper {
public double averageMsgSize;
Expand Down Expand Up @@ -1283,7 +1285,14 @@ public void updateRates(NamespaceStats nsStats, NamespaceBundleStats bundleStats
}
});
topicStatsStream.endList();

// if publish-rate increases (eg: 0 to 1K) then pick max publish-rate and if publish-rate decreases then keep
// average rate.
lastUpdatedAvgPublishRateInMsg = topicStatsHelper.aggMsgRateIn > lastUpdatedAvgPublishRateInMsg
? topicStatsHelper.aggMsgRateIn
: (topicStatsHelper.aggMsgRateIn + lastUpdatedAvgPublishRateInMsg) / 2;
lastUpdatedAvgPublishRateInByte = topicStatsHelper.aggMsgThroughputIn > lastUpdatedAvgPublishRateInByte
? topicStatsHelper.aggMsgThroughputIn
: (topicStatsHelper.aggMsgThroughputIn + lastUpdatedAvgPublishRateInByte) / 2;
// Start replicator stats
topicStatsStream.startObject("replication");
nsStats.replicatorCount += topicStatsHelper.remotePublishersStats.size();
Expand Down Expand Up @@ -1447,6 +1456,14 @@ public void updateRates(NamespaceStats nsStats, NamespaceBundleStats bundleStats
this.addEntryLatencyStatsUsec.reset();
}

public double getLastUpdatedAvgPublishRateInMsg() {
return lastUpdatedAvgPublishRateInMsg;
}

public double getLastUpdatedAvgPublishRateInByte() {
return lastUpdatedAvgPublishRateInByte;
}

public TopicStats getStats() {

TopicStats stats = new TopicStats();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,6 @@

import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.SubscribeRate;
import org.apache.pulsar.common.util.RateLimiter;
Expand All @@ -38,10 +36,6 @@
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

import static java.util.concurrent.TimeUnit.SECONDS;
import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
import static org.apache.pulsar.broker.web.PulsarWebResource.path;

public class SubscribeRateLimiter {

private final String topicName;
Expand Down Expand Up @@ -121,17 +115,17 @@ private synchronized void removeSubscribeLimiter(ConsumerIdentifier consumerIden
* @param subscribeRate
*/
private synchronized void updateSubscribeRate(ConsumerIdentifier consumerIdentifier, SubscribeRate subscribeRate) {

long ratePerConsumer = subscribeRate.subscribeThrottlingRatePerConsumer;
long ratePeriod = subscribeRate.ratePeriodInSecond;

// update subscribe-rateLimiter
if (ratePerConsumer > 0) {
if (this.subscribeRateLimiter.get(consumerIdentifier) == null) {
this.subscribeRateLimiter.put(consumerIdentifier, new RateLimiter(brokerService.pulsar().getExecutor(), ratePerConsumer,
ratePeriod, TimeUnit.SECONDS));
ratePeriod, TimeUnit.SECONDS, null));
} else {
this.subscribeRateLimiter.get(consumerIdentifier).setRate(ratePerConsumer, ratePeriod, TimeUnit.SECONDS);
this.subscribeRateLimiter.get(consumerIdentifier).setRate(ratePerConsumer, ratePeriod, TimeUnit.SECONDS,
null);
}
} else {
// subscribe-rate should be disable and close
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,15 @@

import com.google.common.collect.Sets;

import static org.testng.Assert.assertNotNull;

import java.lang.reflect.Field;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
Expand Down Expand Up @@ -904,4 +907,75 @@ protected void deactiveCursors(ManagedLedgerImpl ledger) throws Exception {
});
}

/**
* It verifies that relative throttling at least dispatch messages as publish-rate.
*
* @param subscription
* @throws Exception
*/
@Test(dataProvider = "subscriptions")
public void testRelativeMessageRateLimitingThrottling(SubscriptionType subscription) throws Exception {
log.info("-- Starting {} test --", methodName);

final String namespace = "my-property/relative_throttling_ns";
final String topicName = "persistent://" + namespace + "/relative-throttle";

final int messageRate = 1;
DispatchRate dispatchRate = new DispatchRate(messageRate, -1, 1, true);
admin.namespaces().createNamespace(namespace, Sets.newHashSet("test"));
admin.namespaces().setDispatchRate(namespace, dispatchRate);
// create producer and topic
Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).enableBatching(false).create();
PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getOrCreateTopic(topicName).get();
boolean isMessageRateUpdate = false;
int retry = 10;
for (int i = 0; i < retry; i++) {
if (topic.getDispatchRateLimiter().get().getDispatchRateOnMsg() > 0) {
isMessageRateUpdate = true;
break;
} else {
if (i != retry - 1) {
Thread.sleep(100);
}
}
}
Assert.assertTrue(isMessageRateUpdate);
Assert.assertEquals(admin.namespaces().getDispatchRate(namespace), dispatchRate);
Thread.sleep(2000);

final int numProducedMessages = 1000;

Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName("my-subscriber-name")
.subscriptionType(subscription).subscribe();
// deactive cursors
deactiveCursors((ManagedLedgerImpl) topic.getManagedLedger());

// send a message, which will make dispatcher-ratelimiter initialize and schedule renew task
producer.send("test".getBytes());
assertNotNull(consumer.receive(100, TimeUnit.MILLISECONDS));

Field lastUpdatedMsgRateIn = PersistentTopic.class.getDeclaredField("lastUpdatedAvgPublishRateInMsg");
lastUpdatedMsgRateIn.setAccessible(true);
lastUpdatedMsgRateIn.set(topic, numProducedMessages);

for (int i = 0; i < numProducedMessages; i++) {
final String message = "my-message-" + i;
producer.send(message.getBytes());
}

int totalReceived = 0;
// Relative throttling will let it drain immediately because it allows to dispatch = (publish-rate +
// dispatch-rate)
for (int i = 0; i < numProducedMessages; i++) {
Message<byte[]> msg = consumer.receive(100, TimeUnit.MILLISECONDS);
totalReceived++;
assertNotNull(msg);
}

Assert.assertEquals(totalReceived, numProducedMessages);

consumer.close();
producer.close();
log.info("-- Exiting {} test --", methodName);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -536,11 +536,15 @@ private class SetDispatchRate extends CliCommand {
"-dt" }, description = "dispatch-rate-period in second type (default 1 second will be overwrite if not passed)\n", required = false)
private int dispatchRatePeriodSec = 1;

@Parameter(names = { "--relative-to-publish-rate",
"-rp" }, description = "dispatch rate relative to publish-rate (if publish-relative flag is enabled then broker will apply throttling value to (publish-rate + dispatch rate))\n", required = false)
private boolean relativeToPublishRate = false;

@Override
void run() throws PulsarAdminException {
String namespace = validateNamespace(params);
admin.namespaces().setDispatchRate(namespace,
new DispatchRate(msgDispatchRate, byteDispatchRate, dispatchRatePeriodSec));
new DispatchRate(msgDispatchRate, byteDispatchRate, dispatchRatePeriodSec, relativeToPublishRate));
}
}

Expand Down Expand Up @@ -608,11 +612,15 @@ private class SetSubscriptionDispatchRate extends CliCommand {
"-dt" }, description = "dispatch-rate-period in second type (default 1 second will be overwrite if not passed)\n", required = false)
private int dispatchRatePeriodSec = 1;

@Parameter(names = { "--relative-to-publish-rate",
"-rp" }, description = "dispatch rate relative to publish-rate (if publish-relative flag is enabled then broker will apply throttling value to (publish-rate + dispatch rate))\n", required = false)
private boolean relativeToPublishRate = false;

@Override
void run() throws PulsarAdminException {
String namespace = validateNamespace(params);
admin.namespaces().setSubscriptionDispatchRate(namespace,
new DispatchRate(msgDispatchRate, byteDispatchRate, dispatchRatePeriodSec));
new DispatchRate(msgDispatchRate, byteDispatchRate, dispatchRatePeriodSec, relativeToPublishRate));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ public class DispatchRate {

public int dispatchThrottlingRateInMsg = -1;
public long dispatchThrottlingRateInByte = -1;
public boolean relativeToPublishRate = false; /* throttles dispatch relatively publish-rate */
public int ratePeriodInSecond = 1; /* by default dispatch-rate will be calculate per 1 second */

public DispatchRate() {
Expand All @@ -45,6 +46,12 @@ public DispatchRate(int dispatchThrottlingRateInMsg, long dispatchThrottlingRate
this.ratePeriodInSecond = ratePeriodInSecond;
}

public DispatchRate(int dispatchThrottlingRateInMsg, long dispatchThrottlingRateInByte,
int ratePeriodInSecond, boolean relativeToPublishRate) {
this(dispatchThrottlingRateInMsg, dispatchThrottlingRateInByte, ratePeriodInSecond);
this.relativeToPublishRate = relativeToPublishRate;
}

@Override
public int hashCode() {
return Objects.hash(dispatchThrottlingRateInMsg, dispatchThrottlingRateInByte,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

/**
* A Rate Limiter that distributes permits at a configurable rate. Each {@link #acquire()} blocks if necessary until a
Expand Down Expand Up @@ -57,19 +58,22 @@ public class RateLimiter implements AutoCloseable{
private long permits;
private long acquiredPermits;
private boolean isClosed;
// permitUpdate helps to update permit-rate at runtime
private Supplier<Long> permitUpdater;

public RateLimiter(final long permits, final long rateTime, final TimeUnit timeUnit) {
this(null, permits, rateTime, timeUnit);
this(null, permits, rateTime, timeUnit, null);
}

public RateLimiter(final ScheduledExecutorService service, final long permits, final long rateTime,
final TimeUnit timeUnit) {
final TimeUnit timeUnit, Supplier<Long> permitUpdater) {
checkArgument(permits > 0, "rate must be > 0");
checkArgument(rateTime > 0, "Renew permit time must be > 0");

this.rateTime = rateTime;
this.timeUnit = timeUnit;
this.permits = permits;
this.permitUpdater = permitUpdater;

if (service != null) {
this.executorService = service;
Expand Down Expand Up @@ -198,14 +202,16 @@ public synchronized void setRate(long permits) {
* @param permits
* @param rateTime
* @param timeUnit
* @param permitUpdaterByte
*/
public synchronized void setRate(long permits, long rateTime, TimeUnit timeUnit) {
public synchronized void setRate(long permits, long rateTime, TimeUnit timeUnit, Supplier<Long> permitUpdaterByte) {
if (renewTask != null) {
renewTask.cancel(false);
}
this.permits = permits;
this.rateTime = rateTime;
this.timeUnit = timeUnit;
this.permitUpdater = permitUpdaterByte;
this.renewTask = createTask();
}

Expand All @@ -232,6 +238,12 @@ protected ScheduledFuture<?> createTask() {

synchronized void renew() {
acquiredPermits = 0;
if (permitUpdater != null) {
long newPermitRate = permitUpdater.get();
if (newPermitRate > 0) {
setRate(newPermitRate);
}
}
notifyAll();
}

Expand Down
Loading

0 comments on commit 02bf9a0

Please sign in to comment.