From d680084d56687909909b2a9ff4135198f32e7219 Mon Sep 17 00:00:00 2001 From: StevenLuMT Date: Mon, 26 Dec 2022 00:45:22 +0800 Subject: [PATCH] [fix][broker] change name limitTime to limitTimeInSec (#19053) Co-authored-by: lushiji --- .../apache/pulsar/broker/admin/AdminResource.java | 2 +- .../pulsar/broker/service/BacklogQuotaManager.java | 4 ++-- .../broker/service/persistent/PersistentTopic.java | 4 ++-- .../stats/prometheus/NamespaceStatsAggregator.java | 2 +- .../service/ReplicatorTopicPoliciesTest.java | 2 +- .../pulsar/common/policies/data/BacklogQuota.java | 2 +- .../policies/data/impl/BacklogQuotaImpl.java | 14 +++++++------- .../metadata/BacklogQuotaCompatibilityTest.java | 12 ++++++------ 8 files changed, 21 insertions(+), 21 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java index ad141c5884fc15..90eafa7c35cccc 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java @@ -394,7 +394,7 @@ protected boolean checkBacklogQuota(BacklogQuota quota, RetentionPolicies retent } // time based quota is in second if (retention.getRetentionTimeInMinutes() > 0 - && quota.getLimitTime() >= retention.getRetentionTimeInMinutes() * 60) { + && quota.getLimitTimeInSec() >= retention.getRetentionTimeInMinutes() * 60) { return false; } return true; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java index bc2541c802e637..c92ff3fd636fee 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java @@ -197,7 +197,7 @@ private void dropBacklogForTimeLimit(PersistentTopic persistentTopic, BacklogQuo if (preciseTimeBasedBacklogQuotaCheck) { // Set the reduction factor to 90%. The aim is to drop down the backlog to 90% of the quota limit. double reductionFactor = 0.9; - int target = (int) (reductionFactor * quota.getLimitTime()); + int target = (int) (reductionFactor * quota.getLimitTimeInSec()); if (log.isDebugEnabled()) { log.debug("[{}] target backlog expire time is [{}]", persistentTopic.getName(), target); } @@ -226,7 +226,7 @@ private void dropBacklogForTimeLimit(PersistentTopic persistentTopic, BacklogQuo } // Timestamp only > 0 if ledger has been closed if (ledgerInfo.getTimestamp() > 0 - && currentMillis - ledgerInfo.getTimestamp() > quota.getLimitTime() * 1000) { + && currentMillis - ledgerInfo.getTimestamp() > quota.getLimitTimeInSec() * 1000) { // skip whole ledger for the slowest cursor PositionImpl nextPosition = PositionImpl.get(mLedger.getNextValidLedger(ledgerInfo.getLedgerId()), -1); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 2066ac2ee29888..42f73a70328355 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -2724,7 +2724,7 @@ public boolean isSizeBacklogExceeded() { */ public CompletableFuture checkTimeBacklogExceeded() { TopicName topicName = TopicName.get(getName()); - int backlogQuotaLimitInSecond = getBacklogQuota(BacklogQuotaType.message_age).getLimitTime(); + int backlogQuotaLimitInSecond = getBacklogQuota(BacklogQuotaType.message_age).getLimitTimeInSec(); // If backlog quota by time is not set and we have no durable cursor. if (backlogQuotaLimitInSecond <= 0 @@ -2780,7 +2780,7 @@ public void readEntryFailed(ManagedLedgerException exception, Object ctx) { private CompletableFuture slowestReaderTimeBasedBacklogQuotaCheck(PositionImpl slowestPosition) throws ExecutionException, InterruptedException { - int backlogQuotaLimitInSecond = getBacklogQuota(BacklogQuotaType.message_age).getLimitTime(); + int backlogQuotaLimitInSecond = getBacklogQuota(BacklogQuotaType.message_age).getLimitTimeInSec(); Long ledgerId = slowestPosition.getLedgerId(); if (((ManagedLedgerImpl) ledger).getLedgersInfo().lastKey().equals(ledgerId)) { return CompletableFuture.completedFuture(false); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java index 26aae969aab422..bc588ca50c1196 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java @@ -162,7 +162,7 @@ private static void getTopicStats(Topic topic, TopicStats stats, boolean include stats.backlogQuotaLimit = topic .getBacklogQuota(BacklogQuota.BacklogQuotaType.destination_storage).getLimitSize(); stats.backlogQuotaLimitTime = topic - .getBacklogQuota(BacklogQuota.BacklogQuotaType.message_age).getLimitTime(); + .getBacklogQuota(BacklogQuota.BacklogQuotaType.message_age).getLimitTimeInSec(); stats.managedLedgerStats.storageWriteLatencyBuckets .addAll(mlStats.getInternalAddEntryLatencyBuckets()); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTopicPoliciesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTopicPoliciesTest.java index c0281f073cfd4c..1a048e408586d6 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTopicPoliciesTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTopicPoliciesTest.java @@ -83,7 +83,7 @@ public void testReplicateQuotaTopicPolicies() throws Exception { // set BacklogQuota BacklogQuotaImpl backlogQuota = new BacklogQuotaImpl(); backlogQuota.setLimitSize(1); - backlogQuota.setLimitTime(2); + backlogQuota.setLimitTimeInSec(2); // local admin1.topicPolicies().setBacklogQuota(topic, backlogQuota); Awaitility.await().untilAsserted(() -> diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/BacklogQuota.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/BacklogQuota.java index 4045eade3db87e..cb7bc46c61dbc8 100644 --- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/BacklogQuota.java +++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/BacklogQuota.java @@ -49,7 +49,7 @@ public interface BacklogQuota { * * @return quota limit in second */ - int getLimitTime(); + int getLimitTimeInSec(); RetentionPolicy getPolicy(); diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/impl/BacklogQuotaImpl.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/impl/BacklogQuotaImpl.java index 05da03f1090b6b..704c6f0cc0a0bc 100644 --- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/impl/BacklogQuotaImpl.java +++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/impl/BacklogQuotaImpl.java @@ -45,12 +45,12 @@ public class BacklogQuotaImpl implements BacklogQuota { /** * backlog quota by time in second. */ - private int limitTime; + private int limitTimeInSec; private RetentionPolicy policy; - public BacklogQuotaImpl(long limitSize, int limitTime, RetentionPolicy policy) { + public BacklogQuotaImpl(long limitSize, int limitTimeInSec, RetentionPolicy policy) { this.limitSize = limitSize; - this.limitTime = limitTime; + this.limitTimeInSec = limitTimeInSec; this.policy = policy; } @@ -80,12 +80,12 @@ public void setLimitSize(long limitSize) { this.limit = limitSize; } - public int getLimitTime() { - return limitTime; + public int getLimitTimeInSec() { + return limitTimeInSec; } - public void setLimitTime(int limitTime) { - this.limitTime = limitTime; + public void setLimitTimeInSec(int limitTimeInSec) { + this.limitTimeInSec = limitTimeInSec; } public RetentionPolicy getPolicy() { diff --git a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/BacklogQuotaCompatibilityTest.java b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/BacklogQuotaCompatibilityTest.java index 02b88361357eba..c4ecf750c7ad0b 100644 --- a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/BacklogQuotaCompatibilityTest.java +++ b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/BacklogQuotaCompatibilityTest.java @@ -47,7 +47,7 @@ public void testV27ClientSetV28BrokerRead() throws Exception { Policies writePolicy = new Policies(); BacklogQuotaImpl writeBacklogQuota = new BacklogQuotaImpl(); writeBacklogQuota.setLimit(1024); - writeBacklogQuota.setLimitTime(60); + writeBacklogQuota.setLimitTimeInSec(60); writeBacklogQuota.setPolicy(testPolicy); HashMap quotaHashMap = new HashMap<>(); quotaHashMap.put(BacklogQuota.BacklogQuotaType.destination_storage, writeBacklogQuota); @@ -56,7 +56,7 @@ public void testV27ClientSetV28BrokerRead() throws Exception { Policies policies = simpleType.deserialize("/path", serialize, null); BacklogQuota readBacklogQuota = policies.backlog_quota_map.get(BacklogQuota.BacklogQuotaType.destination_storage); Assert.assertEquals(readBacklogQuota.getLimitSize(), 1024); - Assert.assertEquals(readBacklogQuota.getLimitTime(), 60); + Assert.assertEquals(readBacklogQuota.getLimitTimeInSec(), 60); Assert.assertEquals(readBacklogQuota.getPolicy(), testPolicy); } @@ -65,7 +65,7 @@ public void testV28ClientSetV28BrokerRead() throws Exception { Policies writePolicy = new Policies(); BacklogQuotaImpl writeBacklogQuota = new BacklogQuotaImpl(); writeBacklogQuota.setLimitSize(1024); - writeBacklogQuota.setLimitTime(60); + writeBacklogQuota.setLimitTimeInSec(60); writeBacklogQuota.setPolicy(testPolicy); HashMap quotaHashMap = new HashMap<>(); quotaHashMap.put(BacklogQuota.BacklogQuotaType.destination_storage, writeBacklogQuota); @@ -74,7 +74,7 @@ public void testV28ClientSetV28BrokerRead() throws Exception { Policies policies = simpleType.deserialize("/path", serialize, null); BacklogQuota readBacklogQuota = policies.backlog_quota_map.get(BacklogQuota.BacklogQuotaType.destination_storage); Assert.assertEquals(readBacklogQuota.getLimit(), 1024); - Assert.assertEquals(readBacklogQuota.getLimitTime(), 60); + Assert.assertEquals(readBacklogQuota.getLimitTimeInSec(), 60); Assert.assertEquals(readBacklogQuota.getPolicy(), testPolicy); } @@ -103,7 +103,7 @@ public void testBackwardCompatibility() throws IOException { Policies policies = simpleType.deserialize(null, oldPolicyStr.getBytes(), null); assertEquals(policies.backlog_quota_map.get(BacklogQuota.BacklogQuotaType.destination_storage).getLimitSize(), 1001); - assertEquals(policies.backlog_quota_map.get(BacklogQuota.BacklogQuotaType.destination_storage).getLimitTime(), + assertEquals(policies.backlog_quota_map.get(BacklogQuota.BacklogQuotaType.destination_storage).getLimitTimeInSec(), 0); assertEquals(policies.backlog_quota_map.get(BacklogQuota.BacklogQuotaType.destination_storage).getPolicy(), BacklogQuota.RetentionPolicy.consumer_backlog_eviction); @@ -125,7 +125,7 @@ public void testBackwardCompatibilityNullLimitAndLimitSize() throws IOException Policies policies = simpleType.deserialize(null, oldPolicyStr.getBytes(), null); assertEquals(policies.backlog_quota_map.get(BacklogQuota.BacklogQuotaType.destination_storage).getLimitSize(), 0); - assertEquals(policies.backlog_quota_map.get(BacklogQuota.BacklogQuotaType.destination_storage).getLimitTime(), + assertEquals(policies.backlog_quota_map.get(BacklogQuota.BacklogQuotaType.destination_storage).getLimitTimeInSec(), 0); }