Skip to content

Commit

Permalink
Consider cluster-throttling config when namespace-policy throttling i…
Browse files Browse the repository at this point in the history
…s disabled (#748)
  • Loading branch information
rdhabalia authored and merlimat committed Sep 13, 2017
1 parent 6e3ba01 commit c19a1ce
Show file tree
Hide file tree
Showing 4 changed files with 105 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1054,20 +1054,16 @@ private void updateConfigurationAndRegisterListeners() {
});
// add listener to update message-dispatch-rate in msg
registerConfigurationListener("dispatchThrottlingRatePerTopicInMsg", (dispatchRatePerTopicInMsg) -> {
DispatchRate dispatchRate = new DispatchRate((int) dispatchRatePerTopicInMsg,
pulsar.getConfiguration().getDispatchThrottlingRatePerTopicInByte(), 1);
updateTopicMessageDispatchRate(dispatchRate);
updateTopicMessageDispatchRate();
});
// add listener to update message-dispatch-rate in byte
registerConfigurationListener("dispatchThrottlingRatePerTopicInByte", (dispatchRatePerTopicInByte) -> {
DispatchRate dispatchRate = new DispatchRate(pulsar.getConfiguration().getDispatchThrottlingRatePerTopicInMsg(),
(long) dispatchRatePerTopicInByte, 1);
updateTopicMessageDispatchRate(dispatchRate);
updateTopicMessageDispatchRate();
});
// add more listeners here
}

private void updateTopicMessageDispatchRate(final DispatchRate dispatchRate) {
private void updateTopicMessageDispatchRate() {
this.pulsar().getExecutor().submit(() -> {
// update message-rate for each topic
topics.forEach((name, topicFuture) -> {
Expand All @@ -1077,14 +1073,11 @@ private void updateTopicMessageDispatchRate(final DispatchRate dispatchRate) {
if (topicFuture.get() instanceof PersistentTopic) {
PersistentTopic topic = (PersistentTopic) topicFuture.get();
topicName = topicFuture.get().getName();
// update broker-dispatch throttling only if namespace-policy is not configured
DispatchRateLimiter rateLimiter = topic.getDispatchRateLimiter();
if (rateLimiter.getPoliciesDispatchRate() == null) {
rateLimiter.updateDispatchRate(dispatchRate);
}
// it first checks namespace-policy rate and if not present then applies broker-config
topic.getDispatchRateLimiter().updateDispatchRate();
}
} catch (Exception e) {
log.warn("[{}] failed to update message-dispatch rate {}", topicName, dispatchRate);
log.warn("[{}] failed to update message-dispatch rate {}", topicName, e.getMessage());
}
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ public boolean isDispatchRateLimitingEnabled() {
* Update dispatch-throttling-rate. gives first priority to namespace-policy configured dispatch rate else applies
* default broker dispatch-throttling-rate
*/
private void updateDispatchRate() {
public void updateDispatchRate() {
DispatchRate dispatchRate = getPoliciesDispatchRate();
if (dispatchRate == null) {
dispatchRate = new DispatchRate(brokerService.pulsar().getConfiguration().getDispatchThrottlingRatePerTopicInMsg(),
Expand All @@ -121,14 +121,22 @@ private void registerLocalPoliciesListener() {
DispatchRate dispatchRate = data.clusterDispatchRate.get(cluster);
// update dispatch-rate only if it's configured in policies else ignore
if (dispatchRate != null) {
final DispatchRate clusterDispatchRate = new DispatchRate(
brokerService.pulsar().getConfiguration().getDispatchThrottlingRatePerTopicInMsg(),
brokerService.pulsar().getConfiguration().getDispatchThrottlingRatePerTopicInByte(), 1);
// if policy-throttling rate is disabled and cluster-throttling is enabled then apply
// cluster-throttling rate
if (!isDispatchRateEnabled(dispatchRate) && isDispatchRateEnabled(clusterDispatchRate)) {
dispatchRate = clusterDispatchRate;
}
updateDispatchRate(dispatchRate);
}
}
});
}

/**
* Gets configured dispatch-rate from namespace policies. Returns null if dispach-rate is not configured
* Gets configured dispatch-rate from namespace policies. Returns null if dispatch-rate is not configured
*
* @return
*/
Expand All @@ -141,7 +149,11 @@ public DispatchRate getPoliciesDispatchRate() {
.get(cacheTimeOutInSec, SECONDS);
if (policies.isPresent() && policies.get().clusterDispatchRate != null
&& policies.get().clusterDispatchRate.get(cluster) != null) {
return policies.get().clusterDispatchRate.get(cluster);
DispatchRate dispatchRate = policies.get().clusterDispatchRate.get(cluster);
// return policy-dispatch rate only if it's enabled in policies
if (isDispatchRateEnabled(dispatchRate)) {
return dispatchRate;
}
}
} catch (Exception e) {
log.warn("Failed to get message-rate for {}", this.topicName, e);
Expand Down Expand Up @@ -216,6 +228,12 @@ public long getDispatchRateOnByte() {
return dispatchRateLimiterOnByte != null ? dispatchRateLimiterOnByte.getRate() : -1;
}


private boolean isDispatchRateEnabled(DispatchRate dispatchRate) {
return dispatchRate != null && (dispatchRate.dispatchThrottlingRatePerTopicInMsg > 0
|| dispatchRate.dispatchThrottlingRatePerTopicInByte > 0);
}

public void close() {
// close rate-limiter
if (dispatchRateLimiterOnMessage != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -772,6 +772,80 @@ public void testNonBacklogConsumerWithThrottlingEnabled(SubscriptionType subscri
this.conf.setDispatchThrottlingOnNonBacklogConsumerEnabled(false);
log.info("-- Exiting {} test --", methodName);
}

/**
* <pre>
* It verifies that cluster-throttling value gets considered when namespace-policy throttling is disabled.
*
* 1. Update cluster-throttling-config: topic rate-limiter has cluster-config
* 2. Update namespace-throttling-config: topic rate-limiter has namespace-config
* 3. Disable namespace-throttling-config: topic rate-limiter has cluster-config
* 4. Create new topic with disable namespace-config and enabled cluster-config: it takes cluster-config
*
* </pre>
*
* @throws Exception
*/
@Test
public void testClusterPolicyOverrideConfiguration() throws Exception {
log.info("-- Starting {} test --", methodName);

final String namespace = "my-property/use/throttling_ns";
final String topicName1 = "persistent://" + namespace + "/throttlingOverride1";
final String topicName2 = "persistent://" + namespace + "/throttlingOverride2";
final int clusterMessageRate = 100;

int initValue = pulsar.getConfiguration().getDispatchThrottlingRatePerTopicInMsg();
// (1) Update message-dispatch-rate limit
admin.brokers().updateDynamicConfiguration("dispatchThrottlingRatePerTopicInMsg",
Integer.toString(clusterMessageRate));
// sleep incrementally as zk-watch notification is async and may take some time
for (int i = 0; i < 5; i++) {
if (pulsar.getConfiguration().getDispatchThrottlingRatePerTopicInMsg() != initValue) {
Thread.sleep(50 + (i * 10));
}
}
Assert.assertNotEquals(pulsar.getConfiguration().getDispatchThrottlingRatePerTopicInMsg(), initValue);

admin.namespaces().createNamespace(namespace);
// create producer and topic
Producer producer = pulsarClient.createProducer(topicName1);
PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getTopic(topicName1).get();

// (1) Update dispatch rate on cluster-config update
Assert.assertEquals(clusterMessageRate, topic.getDispatchRateLimiter().getDispatchRateOnMsg());

// (2) Update namespace throttling limit
int nsMessageRate = 500;
DispatchRate dispatchRate = new DispatchRate(nsMessageRate, 0, 1);
admin.namespaces().setDispatchRate(namespace, dispatchRate);
for (int i = 0; i < 5; i++) {
if (topic.getDispatchRateLimiter().getDispatchRateOnMsg() != nsMessageRate) {
Thread.sleep(50 + (i * 10));
}
}
Assert.assertEquals(nsMessageRate, topic.getDispatchRateLimiter().getDispatchRateOnMsg());

// (3) Disable namespace throttling limit will force to take cluster-config
dispatchRate = new DispatchRate(0, 0, 1);
admin.namespaces().setDispatchRate(namespace, dispatchRate);
for (int i = 0; i < 5; i++) {
if (topic.getDispatchRateLimiter().getDispatchRateOnMsg() == nsMessageRate) {
Thread.sleep(50 + (i * 10));
}
}
Assert.assertEquals(clusterMessageRate, topic.getDispatchRateLimiter().getDispatchRateOnMsg());

// (5) Namespace throttling is disabled so, new topic should take cluster throttling limit
Producer producer2 = pulsarClient.createProducer(topicName2);
PersistentTopic topic2 = (PersistentTopic) pulsar.getBrokerService().getTopic(topicName2).get();
Assert.assertEquals(clusterMessageRate, topic2.getDispatchRateLimiter().getDispatchRateOnMsg());

producer.close();
producer2.close();

log.info("-- Exiting {} test --", methodName);
}

private void deactiveCursors(ManagedLedgerImpl ledger) throws Exception {
Field statsUpdaterField = BrokerService.class.getDeclaredField("statsUpdater");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@
*/
package org.apache.pulsar.stats.client;

import static junit.framework.Assert.assertTrue;
import static junit.framework.Assert.assertEquals;
import static org.mockito.Mockito.spy;

import java.net.URL;
Expand Down Expand Up @@ -49,11 +47,12 @@
import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats.CursorStats;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.assertEquals;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

import com.google.common.collect.Sets;

public class PulsarBrokerStatsClientTest extends ProducerConsumerBase {

Expand Down Expand Up @@ -135,7 +134,8 @@ public void testTopicInternalStats() throws Exception {
PersistentTopicInternalStats internalStats = topic.getInternalStats();
CursorStats cursor = internalStats.cursors.get(subscriptionName);
assertEquals(cursor.numberOfEntriesSinceFirstNotAckedMessage, numberOfMsgs);
assertEquals(cursor.totalNonContiguousDeletedMessagesRange, numberOfMsgs / 2 - 1);
assertTrue(cursor.totalNonContiguousDeletedMessagesRange > 0
&& (cursor.totalNonContiguousDeletedMessagesRange) < numberOfMsgs / 2);

producer.close();
consumer.close();
Expand Down

0 comments on commit c19a1ce

Please sign in to comment.