Skip to content

Commit

Permalink
[fix][broker] change name limitTime to limitTimeInSec (apache#19053)
Browse files Browse the repository at this point in the history
Co-authored-by: lushiji <lushiji@didiglobal.com>
  • Loading branch information
2 people authored and tisonkun committed Dec 27, 2022
1 parent 02d7f65 commit d680084
Show file tree
Hide file tree
Showing 8 changed files with 21 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -394,7 +394,7 @@ protected boolean checkBacklogQuota(BacklogQuota quota, RetentionPolicies retent
}
// time based quota is in second
if (retention.getRetentionTimeInMinutes() > 0
&& quota.getLimitTime() >= retention.getRetentionTimeInMinutes() * 60) {
&& quota.getLimitTimeInSec() >= retention.getRetentionTimeInMinutes() * 60) {
return false;
}
return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ private void dropBacklogForTimeLimit(PersistentTopic persistentTopic, BacklogQuo
if (preciseTimeBasedBacklogQuotaCheck) {
// Set the reduction factor to 90%. The aim is to drop down the backlog to 90% of the quota limit.
double reductionFactor = 0.9;
int target = (int) (reductionFactor * quota.getLimitTime());
int target = (int) (reductionFactor * quota.getLimitTimeInSec());
if (log.isDebugEnabled()) {
log.debug("[{}] target backlog expire time is [{}]", persistentTopic.getName(), target);
}
Expand Down Expand Up @@ -226,7 +226,7 @@ private void dropBacklogForTimeLimit(PersistentTopic persistentTopic, BacklogQuo
}
// Timestamp only > 0 if ledger has been closed
if (ledgerInfo.getTimestamp() > 0
&& currentMillis - ledgerInfo.getTimestamp() > quota.getLimitTime() * 1000) {
&& currentMillis - ledgerInfo.getTimestamp() > quota.getLimitTimeInSec() * 1000) {
// skip whole ledger for the slowest cursor
PositionImpl nextPosition =
PositionImpl.get(mLedger.getNextValidLedger(ledgerInfo.getLedgerId()), -1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2724,7 +2724,7 @@ public boolean isSizeBacklogExceeded() {
*/
public CompletableFuture<Boolean> checkTimeBacklogExceeded() {
TopicName topicName = TopicName.get(getName());
int backlogQuotaLimitInSecond = getBacklogQuota(BacklogQuotaType.message_age).getLimitTime();
int backlogQuotaLimitInSecond = getBacklogQuota(BacklogQuotaType.message_age).getLimitTimeInSec();

// If backlog quota by time is not set and we have no durable cursor.
if (backlogQuotaLimitInSecond <= 0
Expand Down Expand Up @@ -2780,7 +2780,7 @@ public void readEntryFailed(ManagedLedgerException exception, Object ctx) {

private CompletableFuture<Boolean> slowestReaderTimeBasedBacklogQuotaCheck(PositionImpl slowestPosition)
throws ExecutionException, InterruptedException {
int backlogQuotaLimitInSecond = getBacklogQuota(BacklogQuotaType.message_age).getLimitTime();
int backlogQuotaLimitInSecond = getBacklogQuota(BacklogQuotaType.message_age).getLimitTimeInSec();
Long ledgerId = slowestPosition.getLedgerId();
if (((ManagedLedgerImpl) ledger).getLedgersInfo().lastKey().equals(ledgerId)) {
return CompletableFuture.completedFuture(false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ private static void getTopicStats(Topic topic, TopicStats stats, boolean include
stats.backlogQuotaLimit = topic
.getBacklogQuota(BacklogQuota.BacklogQuotaType.destination_storage).getLimitSize();
stats.backlogQuotaLimitTime = topic
.getBacklogQuota(BacklogQuota.BacklogQuotaType.message_age).getLimitTime();
.getBacklogQuota(BacklogQuota.BacklogQuotaType.message_age).getLimitTimeInSec();

stats.managedLedgerStats.storageWriteLatencyBuckets
.addAll(mlStats.getInternalAddEntryLatencyBuckets());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ public void testReplicateQuotaTopicPolicies() throws Exception {
// set BacklogQuota
BacklogQuotaImpl backlogQuota = new BacklogQuotaImpl();
backlogQuota.setLimitSize(1);
backlogQuota.setLimitTime(2);
backlogQuota.setLimitTimeInSec(2);
// local
admin1.topicPolicies().setBacklogQuota(topic, backlogQuota);
Awaitility.await().untilAsserted(() ->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public interface BacklogQuota {
*
* @return quota limit in second
*/
int getLimitTime();
int getLimitTimeInSec();

RetentionPolicy getPolicy();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,12 +45,12 @@ public class BacklogQuotaImpl implements BacklogQuota {
/**
* backlog quota by time in second.
*/
private int limitTime;
private int limitTimeInSec;
private RetentionPolicy policy;

public BacklogQuotaImpl(long limitSize, int limitTime, RetentionPolicy policy) {
public BacklogQuotaImpl(long limitSize, int limitTimeInSec, RetentionPolicy policy) {
this.limitSize = limitSize;
this.limitTime = limitTime;
this.limitTimeInSec = limitTimeInSec;
this.policy = policy;
}

Expand Down Expand Up @@ -80,12 +80,12 @@ public void setLimitSize(long limitSize) {
this.limit = limitSize;
}

public int getLimitTime() {
return limitTime;
public int getLimitTimeInSec() {
return limitTimeInSec;
}

public void setLimitTime(int limitTime) {
this.limitTime = limitTime;
public void setLimitTimeInSec(int limitTimeInSec) {
this.limitTimeInSec = limitTimeInSec;
}

public RetentionPolicy getPolicy() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ public void testV27ClientSetV28BrokerRead() throws Exception {
Policies writePolicy = new Policies();
BacklogQuotaImpl writeBacklogQuota = new BacklogQuotaImpl();
writeBacklogQuota.setLimit(1024);
writeBacklogQuota.setLimitTime(60);
writeBacklogQuota.setLimitTimeInSec(60);
writeBacklogQuota.setPolicy(testPolicy);
HashMap<BacklogQuota.BacklogQuotaType, BacklogQuota> quotaHashMap = new HashMap<>();
quotaHashMap.put(BacklogQuota.BacklogQuotaType.destination_storage, writeBacklogQuota);
Expand All @@ -56,7 +56,7 @@ public void testV27ClientSetV28BrokerRead() throws Exception {
Policies policies = simpleType.deserialize("/path", serialize, null);
BacklogQuota readBacklogQuota = policies.backlog_quota_map.get(BacklogQuota.BacklogQuotaType.destination_storage);
Assert.assertEquals(readBacklogQuota.getLimitSize(), 1024);
Assert.assertEquals(readBacklogQuota.getLimitTime(), 60);
Assert.assertEquals(readBacklogQuota.getLimitTimeInSec(), 60);
Assert.assertEquals(readBacklogQuota.getPolicy(), testPolicy);
}

Expand All @@ -65,7 +65,7 @@ public void testV28ClientSetV28BrokerRead() throws Exception {
Policies writePolicy = new Policies();
BacklogQuotaImpl writeBacklogQuota = new BacklogQuotaImpl();
writeBacklogQuota.setLimitSize(1024);
writeBacklogQuota.setLimitTime(60);
writeBacklogQuota.setLimitTimeInSec(60);
writeBacklogQuota.setPolicy(testPolicy);
HashMap<BacklogQuota.BacklogQuotaType, BacklogQuota> quotaHashMap = new HashMap<>();
quotaHashMap.put(BacklogQuota.BacklogQuotaType.destination_storage, writeBacklogQuota);
Expand All @@ -74,7 +74,7 @@ public void testV28ClientSetV28BrokerRead() throws Exception {
Policies policies = simpleType.deserialize("/path", serialize, null);
BacklogQuota readBacklogQuota = policies.backlog_quota_map.get(BacklogQuota.BacklogQuotaType.destination_storage);
Assert.assertEquals(readBacklogQuota.getLimit(), 1024);
Assert.assertEquals(readBacklogQuota.getLimitTime(), 60);
Assert.assertEquals(readBacklogQuota.getLimitTimeInSec(), 60);
Assert.assertEquals(readBacklogQuota.getPolicy(), testPolicy);
}

Expand Down Expand Up @@ -103,7 +103,7 @@ public void testBackwardCompatibility() throws IOException {
Policies policies = simpleType.deserialize(null, oldPolicyStr.getBytes(), null);
assertEquals(policies.backlog_quota_map.get(BacklogQuota.BacklogQuotaType.destination_storage).getLimitSize(),
1001);
assertEquals(policies.backlog_quota_map.get(BacklogQuota.BacklogQuotaType.destination_storage).getLimitTime(),
assertEquals(policies.backlog_quota_map.get(BacklogQuota.BacklogQuotaType.destination_storage).getLimitTimeInSec(),
0);
assertEquals(policies.backlog_quota_map.get(BacklogQuota.BacklogQuotaType.destination_storage).getPolicy(),
BacklogQuota.RetentionPolicy.consumer_backlog_eviction);
Expand All @@ -125,7 +125,7 @@ public void testBackwardCompatibilityNullLimitAndLimitSize() throws IOException
Policies policies = simpleType.deserialize(null, oldPolicyStr.getBytes(), null);
assertEquals(policies.backlog_quota_map.get(BacklogQuota.BacklogQuotaType.destination_storage).getLimitSize(),
0);
assertEquals(policies.backlog_quota_map.get(BacklogQuota.BacklogQuotaType.destination_storage).getLimitTime(),
assertEquals(policies.backlog_quota_map.get(BacklogQuota.BacklogQuotaType.destination_storage).getLimitTimeInSec(),
0);
}

Expand Down

0 comments on commit d680084

Please sign in to comment.