Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PIP-52: [pulsar-sever] Add support of dispatch throttling relative to publish-rate #5797

Merged
merged 1 commit into from
Dec 8, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@

import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;

Expand All @@ -44,17 +45,23 @@ public enum Type {
REPLICATOR
}

private final PersistentTopic topic;
private final String topicName;
private final Type type;

private final BrokerService brokerService;
private RateLimiter dispatchRateLimiterOnMessage;
private RateLimiter dispatchRateLimiterOnByte;
private long subscriptionRelativeRatelimiterOnMessage;
private long subscriptionRelativeRatelimiterOnByte;

public DispatchRateLimiter(PersistentTopic topic, Type type) {
this.topic = topic;
this.topicName = topic.getName();
this.brokerService = topic.getBrokerService();
this.type = type;
this.subscriptionRelativeRatelimiterOnMessage = -1;
this.subscriptionRelativeRatelimiterOnByte = -1;
updateDispatchRate();
}

Expand Down Expand Up @@ -272,14 +279,17 @@ public synchronized void updateDispatchRate(DispatchRate dispatchRate) {
long byteRate = dispatchRate.dispatchThrottlingRateInByte;
long ratePeriod = dispatchRate.ratePeriodInSecond;

Supplier<Long> permitUpdaterMsg = dispatchRate.relativeToPublishRate
? () -> getRelativeDispatchRateInMsg(dispatchRate)
: null;
// update msg-rateLimiter
if (msgRate > 0) {
if (this.dispatchRateLimiterOnMessage == null) {
this.dispatchRateLimiterOnMessage = new RateLimiter(brokerService.pulsar().getExecutor(), msgRate,
ratePeriod, TimeUnit.SECONDS);
ratePeriod, TimeUnit.SECONDS, permitUpdaterMsg);
} else {
this.dispatchRateLimiterOnMessage.setRate(msgRate, dispatchRate.ratePeriodInSecond,
TimeUnit.SECONDS);
TimeUnit.SECONDS, permitUpdaterMsg);
}
} else {
// message-rate should be disable and close
Expand All @@ -289,14 +299,17 @@ public synchronized void updateDispatchRate(DispatchRate dispatchRate) {
}
}

Supplier<Long> permitUpdaterByte = dispatchRate.relativeToPublishRate
? () -> getRelativeDispatchRateInByte(dispatchRate)
: null;
// update byte-rateLimiter
if (byteRate > 0) {
if (this.dispatchRateLimiterOnByte == null) {
this.dispatchRateLimiterOnByte = new RateLimiter(brokerService.pulsar().getExecutor(), byteRate,
ratePeriod, TimeUnit.SECONDS);
ratePeriod, TimeUnit.SECONDS, permitUpdaterByte);
} else {
this.dispatchRateLimiterOnByte.setRate(byteRate, dispatchRate.ratePeriodInSecond,
TimeUnit.SECONDS);
TimeUnit.SECONDS, permitUpdaterByte);
}
} else {
// message-rate should be disable and close
Expand All @@ -307,6 +320,18 @@ public synchronized void updateDispatchRate(DispatchRate dispatchRate) {
}
}

private long getRelativeDispatchRateInMsg(DispatchRate dispatchRate) {
return (topic != null && dispatchRate != null)
? (long) topic.getLastUpdatedAvgPublishRateInMsg() + dispatchRate.dispatchThrottlingRateInMsg
: 0;
}

private long getRelativeDispatchRateInByte(DispatchRate dispatchRate) {
return (topic != null && dispatchRate != null)
? (long) topic.getLastUpdatedAvgPublishRateInByte() + dispatchRate.dispatchThrottlingRateInByte
: 0;
}

/**
* Get configured msg dispatch-throttling rate. Returns -1 if not configured
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,8 @@ protected TopicStatsHelper initialValue() {
};

private final AtomicLong pendingWriteOps = new AtomicLong(0);
private volatile double lastUpdatedAvgPublishRateInMsg = 0;
private volatile double lastUpdatedAvgPublishRateInByte = 0;

private static class TopicStatsHelper {
public double averageMsgSize;
Expand Down Expand Up @@ -1283,7 +1285,14 @@ public void updateRates(NamespaceStats nsStats, NamespaceBundleStats bundleStats
}
});
topicStatsStream.endList();

// if publish-rate increases (eg: 0 to 1K) then pick max publish-rate and if publish-rate decreases then keep
// average rate.
lastUpdatedAvgPublishRateInMsg = topicStatsHelper.aggMsgRateIn > lastUpdatedAvgPublishRateInMsg
? topicStatsHelper.aggMsgRateIn
: (topicStatsHelper.aggMsgRateIn + lastUpdatedAvgPublishRateInMsg) / 2;
lastUpdatedAvgPublishRateInByte = topicStatsHelper.aggMsgThroughputIn > lastUpdatedAvgPublishRateInByte
? topicStatsHelper.aggMsgThroughputIn
: (topicStatsHelper.aggMsgThroughputIn + lastUpdatedAvgPublishRateInByte) / 2;
// Start replicator stats
topicStatsStream.startObject("replication");
nsStats.replicatorCount += topicStatsHelper.remotePublishersStats.size();
Expand Down Expand Up @@ -1447,6 +1456,14 @@ public void updateRates(NamespaceStats nsStats, NamespaceBundleStats bundleStats
this.addEntryLatencyStatsUsec.reset();
}

public double getLastUpdatedAvgPublishRateInMsg() {
return lastUpdatedAvgPublishRateInMsg;
}

public double getLastUpdatedAvgPublishRateInByte() {
return lastUpdatedAvgPublishRateInByte;
}

public TopicStats getStats() {

TopicStats stats = new TopicStats();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,6 @@

import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.SubscribeRate;
import org.apache.pulsar.common.util.RateLimiter;
Expand All @@ -38,10 +36,6 @@
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

import static java.util.concurrent.TimeUnit.SECONDS;
import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
import static org.apache.pulsar.broker.web.PulsarWebResource.path;

public class SubscribeRateLimiter {

private final String topicName;
Expand Down Expand Up @@ -121,17 +115,17 @@ private synchronized void removeSubscribeLimiter(ConsumerIdentifier consumerIden
* @param subscribeRate
*/
private synchronized void updateSubscribeRate(ConsumerIdentifier consumerIdentifier, SubscribeRate subscribeRate) {

long ratePerConsumer = subscribeRate.subscribeThrottlingRatePerConsumer;
long ratePeriod = subscribeRate.ratePeriodInSecond;

// update subscribe-rateLimiter
if (ratePerConsumer > 0) {
if (this.subscribeRateLimiter.get(consumerIdentifier) == null) {
this.subscribeRateLimiter.put(consumerIdentifier, new RateLimiter(brokerService.pulsar().getExecutor(), ratePerConsumer,
ratePeriod, TimeUnit.SECONDS));
ratePeriod, TimeUnit.SECONDS, null));
} else {
this.subscribeRateLimiter.get(consumerIdentifier).setRate(ratePerConsumer, ratePeriod, TimeUnit.SECONDS);
this.subscribeRateLimiter.get(consumerIdentifier).setRate(ratePerConsumer, ratePeriod, TimeUnit.SECONDS,
null);
}
} else {
// subscribe-rate should be disable and close
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,15 @@

import com.google.common.collect.Sets;

import static org.testng.Assert.assertNotNull;

import java.lang.reflect.Field;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
Expand Down Expand Up @@ -904,4 +907,75 @@ protected void deactiveCursors(ManagedLedgerImpl ledger) throws Exception {
});
}

/**
* It verifies that relative throttling at least dispatch messages as publish-rate.
*
* @param subscription
* @throws Exception
*/
@Test(dataProvider = "subscriptions")
public void testRelativeMessageRateLimitingThrottling(SubscriptionType subscription) throws Exception {
log.info("-- Starting {} test --", methodName);

final String namespace = "my-property/relative_throttling_ns";
final String topicName = "persistent://" + namespace + "/relative-throttle";

final int messageRate = 1;
DispatchRate dispatchRate = new DispatchRate(messageRate, -1, 1, true);
admin.namespaces().createNamespace(namespace, Sets.newHashSet("test"));
admin.namespaces().setDispatchRate(namespace, dispatchRate);
// create producer and topic
Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).enableBatching(false).create();
PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getOrCreateTopic(topicName).get();
boolean isMessageRateUpdate = false;
int retry = 10;
for (int i = 0; i < retry; i++) {
if (topic.getDispatchRateLimiter().get().getDispatchRateOnMsg() > 0) {
isMessageRateUpdate = true;
break;
} else {
if (i != retry - 1) {
Thread.sleep(100);
}
}
}
Assert.assertTrue(isMessageRateUpdate);
Assert.assertEquals(admin.namespaces().getDispatchRate(namespace), dispatchRate);
Thread.sleep(2000);

final int numProducedMessages = 1000;

Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName("my-subscriber-name")
.subscriptionType(subscription).subscribe();
// deactive cursors
deactiveCursors((ManagedLedgerImpl) topic.getManagedLedger());

// send a message, which will make dispatcher-ratelimiter initialize and schedule renew task
producer.send("test".getBytes());
assertNotNull(consumer.receive(100, TimeUnit.MILLISECONDS));

Field lastUpdatedMsgRateIn = PersistentTopic.class.getDeclaredField("lastUpdatedAvgPublishRateInMsg");
lastUpdatedMsgRateIn.setAccessible(true);
lastUpdatedMsgRateIn.set(topic, numProducedMessages);

for (int i = 0; i < numProducedMessages; i++) {
final String message = "my-message-" + i;
producer.send(message.getBytes());
}

int totalReceived = 0;
// Relative throttling will let it drain immediately because it allows to dispatch = (publish-rate +
// dispatch-rate)
for (int i = 0; i < numProducedMessages; i++) {
Message<byte[]> msg = consumer.receive(100, TimeUnit.MILLISECONDS);
totalReceived++;
assertNotNull(msg);
}

Assert.assertEquals(totalReceived, numProducedMessages);

consumer.close();
producer.close();
log.info("-- Exiting {} test --", methodName);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -536,11 +536,15 @@ private class SetDispatchRate extends CliCommand {
"-dt" }, description = "dispatch-rate-period in second type (default 1 second will be overwrite if not passed)\n", required = false)
private int dispatchRatePeriodSec = 1;

@Parameter(names = { "--relative-to-publish-rate",
"-rp" }, description = "dispatch rate relative to publish-rate (if publish-relative flag is enabled then broker will apply throttling value to (publish-rate + dispatch rate))\n", required = false)
private boolean relativeToPublishRate = false;

@Override
void run() throws PulsarAdminException {
String namespace = validateNamespace(params);
admin.namespaces().setDispatchRate(namespace,
new DispatchRate(msgDispatchRate, byteDispatchRate, dispatchRatePeriodSec));
new DispatchRate(msgDispatchRate, byteDispatchRate, dispatchRatePeriodSec, relativeToPublishRate));
}
}

Expand Down Expand Up @@ -608,11 +612,15 @@ private class SetSubscriptionDispatchRate extends CliCommand {
"-dt" }, description = "dispatch-rate-period in second type (default 1 second will be overwrite if not passed)\n", required = false)
private int dispatchRatePeriodSec = 1;

@Parameter(names = { "--relative-to-publish-rate",
"-rp" }, description = "dispatch rate relative to publish-rate (if publish-relative flag is enabled then broker will apply throttling value to (publish-rate + dispatch rate))\n", required = false)
private boolean relativeToPublishRate = false;

@Override
void run() throws PulsarAdminException {
String namespace = validateNamespace(params);
admin.namespaces().setSubscriptionDispatchRate(namespace,
new DispatchRate(msgDispatchRate, byteDispatchRate, dispatchRatePeriodSec));
new DispatchRate(msgDispatchRate, byteDispatchRate, dispatchRatePeriodSec, relativeToPublishRate));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ public class DispatchRate {

public int dispatchThrottlingRateInMsg = -1;
public long dispatchThrottlingRateInByte = -1;
public boolean relativeToPublishRate = false; /* throttles dispatch relatively publish-rate */
public int ratePeriodInSecond = 1; /* by default dispatch-rate will be calculate per 1 second */

public DispatchRate() {
Expand All @@ -45,6 +46,12 @@ public DispatchRate(int dispatchThrottlingRateInMsg, long dispatchThrottlingRate
this.ratePeriodInSecond = ratePeriodInSecond;
}

public DispatchRate(int dispatchThrottlingRateInMsg, long dispatchThrottlingRateInByte,
int ratePeriodInSecond, boolean relativeToPublishRate) {
this(dispatchThrottlingRateInMsg, dispatchThrottlingRateInByte, ratePeriodInSecond);
this.relativeToPublishRate = relativeToPublishRate;
}

@Override
public int hashCode() {
return Objects.hash(dispatchThrottlingRateInMsg, dispatchThrottlingRateInByte,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

/**
* A Rate Limiter that distributes permits at a configurable rate. Each {@link #acquire()} blocks if necessary until a
Expand Down Expand Up @@ -57,19 +58,22 @@ public class RateLimiter implements AutoCloseable{
private long permits;
private long acquiredPermits;
private boolean isClosed;
// permitUpdate helps to update permit-rate at runtime
private Supplier<Long> permitUpdater;

public RateLimiter(final long permits, final long rateTime, final TimeUnit timeUnit) {
this(null, permits, rateTime, timeUnit);
this(null, permits, rateTime, timeUnit, null);
}

public RateLimiter(final ScheduledExecutorService service, final long permits, final long rateTime,
final TimeUnit timeUnit) {
final TimeUnit timeUnit, Supplier<Long> permitUpdater) {
checkArgument(permits > 0, "rate must be > 0");
checkArgument(rateTime > 0, "Renew permit time must be > 0");

this.rateTime = rateTime;
this.timeUnit = timeUnit;
this.permits = permits;
this.permitUpdater = permitUpdater;

if (service != null) {
this.executorService = service;
Expand Down Expand Up @@ -198,14 +202,16 @@ public synchronized void setRate(long permits) {
* @param permits
* @param rateTime
* @param timeUnit
* @param permitUpdaterByte
*/
public synchronized void setRate(long permits, long rateTime, TimeUnit timeUnit) {
public synchronized void setRate(long permits, long rateTime, TimeUnit timeUnit, Supplier<Long> permitUpdaterByte) {
if (renewTask != null) {
renewTask.cancel(false);
}
this.permits = permits;
this.rateTime = rateTime;
this.timeUnit = timeUnit;
this.permitUpdater = permitUpdaterByte;
this.renewTask = createTask();
}

Expand All @@ -232,6 +238,12 @@ protected ScheduledFuture<?> createTask() {

synchronized void renew() {
acquiredPermits = 0;
if (permitUpdater != null) {
long newPermitRate = permitUpdater.get();
if (newPermitRate > 0) {
setRate(newPermitRate);
}
}
notifyAll();
}

Expand Down
Loading