Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix][broker] change name limitTime to limitTimeInSec #19053

Merged
merged 1 commit into from
Dec 25, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -394,7 +394,7 @@ protected boolean checkBacklogQuota(BacklogQuota quota, RetentionPolicies retent
}
// time based quota is in second
if (retention.getRetentionTimeInMinutes() > 0
&& quota.getLimitTime() >= retention.getRetentionTimeInMinutes() * 60) {
&& quota.getLimitTimeInSec() >= retention.getRetentionTimeInMinutes() * 60) {
return false;
}
return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ private void dropBacklogForTimeLimit(PersistentTopic persistentTopic, BacklogQuo
if (preciseTimeBasedBacklogQuotaCheck) {
// Set the reduction factor to 90%. The aim is to drop down the backlog to 90% of the quota limit.
double reductionFactor = 0.9;
int target = (int) (reductionFactor * quota.getLimitTime());
int target = (int) (reductionFactor * quota.getLimitTimeInSec());
if (log.isDebugEnabled()) {
log.debug("[{}] target backlog expire time is [{}]", persistentTopic.getName(), target);
}
Expand Down Expand Up @@ -226,7 +226,7 @@ private void dropBacklogForTimeLimit(PersistentTopic persistentTopic, BacklogQuo
}
// Timestamp only > 0 if ledger has been closed
if (ledgerInfo.getTimestamp() > 0
&& currentMillis - ledgerInfo.getTimestamp() > quota.getLimitTime() * 1000) {
&& currentMillis - ledgerInfo.getTimestamp() > quota.getLimitTimeInSec() * 1000) {
// skip whole ledger for the slowest cursor
PositionImpl nextPosition =
PositionImpl.get(mLedger.getNextValidLedger(ledgerInfo.getLedgerId()), -1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2724,7 +2724,7 @@ public boolean isSizeBacklogExceeded() {
*/
public CompletableFuture<Boolean> checkTimeBacklogExceeded() {
TopicName topicName = TopicName.get(getName());
int backlogQuotaLimitInSecond = getBacklogQuota(BacklogQuotaType.message_age).getLimitTime();
int backlogQuotaLimitInSecond = getBacklogQuota(BacklogQuotaType.message_age).getLimitTimeInSec();

// If backlog quota by time is not set and we have no durable cursor.
if (backlogQuotaLimitInSecond <= 0
Expand Down Expand Up @@ -2780,7 +2780,7 @@ public void readEntryFailed(ManagedLedgerException exception, Object ctx) {

private CompletableFuture<Boolean> slowestReaderTimeBasedBacklogQuotaCheck(PositionImpl slowestPosition)
throws ExecutionException, InterruptedException {
int backlogQuotaLimitInSecond = getBacklogQuota(BacklogQuotaType.message_age).getLimitTime();
int backlogQuotaLimitInSecond = getBacklogQuota(BacklogQuotaType.message_age).getLimitTimeInSec();
Long ledgerId = slowestPosition.getLedgerId();
if (((ManagedLedgerImpl) ledger).getLedgersInfo().lastKey().equals(ledgerId)) {
return CompletableFuture.completedFuture(false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ private static void getTopicStats(Topic topic, TopicStats stats, boolean include
stats.backlogQuotaLimit = topic
.getBacklogQuota(BacklogQuota.BacklogQuotaType.destination_storage).getLimitSize();
stats.backlogQuotaLimitTime = topic
.getBacklogQuota(BacklogQuota.BacklogQuotaType.message_age).getLimitTime();
.getBacklogQuota(BacklogQuota.BacklogQuotaType.message_age).getLimitTimeInSec();

stats.managedLedgerStats.storageWriteLatencyBuckets
.addAll(mlStats.getInternalAddEntryLatencyBuckets());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ public void testReplicateQuotaTopicPolicies() throws Exception {
// set BacklogQuota
BacklogQuotaImpl backlogQuota = new BacklogQuotaImpl();
backlogQuota.setLimitSize(1);
backlogQuota.setLimitTime(2);
backlogQuota.setLimitTimeInSec(2);
// local
admin1.topicPolicies().setBacklogQuota(topic, backlogQuota);
Awaitility.await().untilAsserted(() ->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public interface BacklogQuota {
*
* @return quota limit in second
*/
int getLimitTime();
int getLimitTimeInSec();

RetentionPolicy getPolicy();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,12 +45,12 @@ public class BacklogQuotaImpl implements BacklogQuota {
/**
* backlog quota by time in second.
*/
private int limitTime;
private int limitTimeInSec;
private RetentionPolicy policy;

public BacklogQuotaImpl(long limitSize, int limitTime, RetentionPolicy policy) {
public BacklogQuotaImpl(long limitSize, int limitTimeInSec, RetentionPolicy policy) {
this.limitSize = limitSize;
this.limitTime = limitTime;
this.limitTimeInSec = limitTimeInSec;
this.policy = policy;
}

Expand Down Expand Up @@ -80,12 +80,12 @@ public void setLimitSize(long limitSize) {
this.limit = limitSize;
}

public int getLimitTime() {
return limitTime;
public int getLimitTimeInSec() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a breaking change without any backwards compatibility.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's better to revert this change and instead focus on improving documentation.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree with that approach. There are many names in Pulsar that are slightly confusing. It'd make more sense to define a paradigm for naming before making many one-off changes.

return limitTimeInSec;
}

public void setLimitTime(int limitTime) {
this.limitTime = limitTime;
public void setLimitTimeInSec(int limitTimeInSec) {
this.limitTimeInSec = limitTimeInSec;
}

public RetentionPolicy getPolicy() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ public void testV27ClientSetV28BrokerRead() throws Exception {
Policies writePolicy = new Policies();
BacklogQuotaImpl writeBacklogQuota = new BacklogQuotaImpl();
writeBacklogQuota.setLimit(1024);
writeBacklogQuota.setLimitTime(60);
writeBacklogQuota.setLimitTimeInSec(60);
writeBacklogQuota.setPolicy(testPolicy);
HashMap<BacklogQuota.BacklogQuotaType, BacklogQuota> quotaHashMap = new HashMap<>();
quotaHashMap.put(BacklogQuota.BacklogQuotaType.destination_storage, writeBacklogQuota);
Expand All @@ -56,7 +56,7 @@ public void testV27ClientSetV28BrokerRead() throws Exception {
Policies policies = simpleType.deserialize("/path", serialize, null);
BacklogQuota readBacklogQuota = policies.backlog_quota_map.get(BacklogQuota.BacklogQuotaType.destination_storage);
Assert.assertEquals(readBacklogQuota.getLimitSize(), 1024);
Assert.assertEquals(readBacklogQuota.getLimitTime(), 60);
Assert.assertEquals(readBacklogQuota.getLimitTimeInSec(), 60);
Assert.assertEquals(readBacklogQuota.getPolicy(), testPolicy);
}

Expand All @@ -65,7 +65,7 @@ public void testV28ClientSetV28BrokerRead() throws Exception {
Policies writePolicy = new Policies();
BacklogQuotaImpl writeBacklogQuota = new BacklogQuotaImpl();
writeBacklogQuota.setLimitSize(1024);
writeBacklogQuota.setLimitTime(60);
writeBacklogQuota.setLimitTimeInSec(60);
writeBacklogQuota.setPolicy(testPolicy);
HashMap<BacklogQuota.BacklogQuotaType, BacklogQuota> quotaHashMap = new HashMap<>();
quotaHashMap.put(BacklogQuota.BacklogQuotaType.destination_storage, writeBacklogQuota);
Expand All @@ -74,7 +74,7 @@ public void testV28ClientSetV28BrokerRead() throws Exception {
Policies policies = simpleType.deserialize("/path", serialize, null);
BacklogQuota readBacklogQuota = policies.backlog_quota_map.get(BacklogQuota.BacklogQuotaType.destination_storage);
Assert.assertEquals(readBacklogQuota.getLimit(), 1024);
Assert.assertEquals(readBacklogQuota.getLimitTime(), 60);
Assert.assertEquals(readBacklogQuota.getLimitTimeInSec(), 60);
Assert.assertEquals(readBacklogQuota.getPolicy(), testPolicy);
}

Expand Down Expand Up @@ -103,7 +103,7 @@ public void testBackwardCompatibility() throws IOException {
Policies policies = simpleType.deserialize(null, oldPolicyStr.getBytes(), null);
assertEquals(policies.backlog_quota_map.get(BacklogQuota.BacklogQuotaType.destination_storage).getLimitSize(),
1001);
assertEquals(policies.backlog_quota_map.get(BacklogQuota.BacklogQuotaType.destination_storage).getLimitTime(),
assertEquals(policies.backlog_quota_map.get(BacklogQuota.BacklogQuotaType.destination_storage).getLimitTimeInSec(),
0);
assertEquals(policies.backlog_quota_map.get(BacklogQuota.BacklogQuotaType.destination_storage).getPolicy(),
BacklogQuota.RetentionPolicy.consumer_backlog_eviction);
Expand All @@ -125,7 +125,7 @@ public void testBackwardCompatibilityNullLimitAndLimitSize() throws IOException
Policies policies = simpleType.deserialize(null, oldPolicyStr.getBytes(), null);
assertEquals(policies.backlog_quota_map.get(BacklogQuota.BacklogQuotaType.destination_storage).getLimitSize(),
0);
assertEquals(policies.backlog_quota_map.get(BacklogQuota.BacklogQuotaType.destination_storage).getLimitTime(),
assertEquals(policies.backlog_quota_map.get(BacklogQuota.BacklogQuotaType.destination_storage).getLimitTimeInSec(),
0);
}

Expand Down