From c862356f624ba46b1a457da90c513b2470c026b0 Mon Sep 17 00:00:00 2001 From: Michael Marshall Date: Sun, 8 Jan 2023 01:12:45 -0600 Subject: [PATCH] [revert] "[fix][broker] change name limitTime to limitTimeInSec (#19053)" (#19152) --- .../apache/pulsar/broker/admin/AdminResource.java | 2 +- .../pulsar/broker/service/BacklogQuotaManager.java | 4 ++-- .../broker/service/persistent/PersistentTopic.java | 4 ++-- .../stats/prometheus/NamespaceStatsAggregator.java | 2 +- .../service/ReplicatorTopicPoliciesTest.java | 2 +- .../pulsar/common/policies/data/BacklogQuota.java | 2 +- .../policies/data/impl/BacklogQuotaImpl.java | 14 +++++++------- .../metadata/BacklogQuotaCompatibilityTest.java | 12 ++++++------ 8 files changed, 21 insertions(+), 21 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java index 09c9e2b032c6e..a8a7570b9ae4b 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java @@ -394,7 +394,7 @@ protected boolean checkBacklogQuota(BacklogQuota quota, RetentionPolicies retent } // time based quota is in second if (retention.getRetentionTimeInMinutes() > 0 - && quota.getLimitTimeInSec() >= retention.getRetentionTimeInMinutes() * 60) { + && quota.getLimitTime() >= retention.getRetentionTimeInMinutes() * 60) { return false; } return true; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java index c92ff3fd636fe..bc2541c802e63 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java @@ -197,7 +197,7 @@ private void dropBacklogForTimeLimit(PersistentTopic persistentTopic, BacklogQuo if (preciseTimeBasedBacklogQuotaCheck) { // Set the reduction factor to 90%. The aim is to drop down the backlog to 90% of the quota limit. double reductionFactor = 0.9; - int target = (int) (reductionFactor * quota.getLimitTimeInSec()); + int target = (int) (reductionFactor * quota.getLimitTime()); if (log.isDebugEnabled()) { log.debug("[{}] target backlog expire time is [{}]", persistentTopic.getName(), target); } @@ -226,7 +226,7 @@ private void dropBacklogForTimeLimit(PersistentTopic persistentTopic, BacklogQuo } // Timestamp only > 0 if ledger has been closed if (ledgerInfo.getTimestamp() > 0 - && currentMillis - ledgerInfo.getTimestamp() > quota.getLimitTimeInSec() * 1000) { + && currentMillis - ledgerInfo.getTimestamp() > quota.getLimitTime() * 1000) { // skip whole ledger for the slowest cursor PositionImpl nextPosition = PositionImpl.get(mLedger.getNextValidLedger(ledgerInfo.getLedgerId()), -1); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 04c95126490fb..65831d996c4f7 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -2729,7 +2729,7 @@ public boolean isSizeBacklogExceeded() { */ public CompletableFuture checkTimeBacklogExceeded() { TopicName topicName = TopicName.get(getName()); - int backlogQuotaLimitInSecond = getBacklogQuota(BacklogQuotaType.message_age).getLimitTimeInSec(); + int backlogQuotaLimitInSecond = getBacklogQuota(BacklogQuotaType.message_age).getLimitTime(); // If backlog quota by time is not set and we have no durable cursor. if (backlogQuotaLimitInSecond <= 0 @@ -2785,7 +2785,7 @@ public void readEntryFailed(ManagedLedgerException exception, Object ctx) { private CompletableFuture slowestReaderTimeBasedBacklogQuotaCheck(PositionImpl slowestPosition) throws ExecutionException, InterruptedException { - int backlogQuotaLimitInSecond = getBacklogQuota(BacklogQuotaType.message_age).getLimitTimeInSec(); + int backlogQuotaLimitInSecond = getBacklogQuota(BacklogQuotaType.message_age).getLimitTime(); Long ledgerId = slowestPosition.getLedgerId(); if (((ManagedLedgerImpl) ledger).getLedgersInfo().lastKey().equals(ledgerId)) { return CompletableFuture.completedFuture(false); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java index bc588ca50c119..26aae969aab42 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java @@ -162,7 +162,7 @@ private static void getTopicStats(Topic topic, TopicStats stats, boolean include stats.backlogQuotaLimit = topic .getBacklogQuota(BacklogQuota.BacklogQuotaType.destination_storage).getLimitSize(); stats.backlogQuotaLimitTime = topic - .getBacklogQuota(BacklogQuota.BacklogQuotaType.message_age).getLimitTimeInSec(); + .getBacklogQuota(BacklogQuota.BacklogQuotaType.message_age).getLimitTime(); stats.managedLedgerStats.storageWriteLatencyBuckets .addAll(mlStats.getInternalAddEntryLatencyBuckets()); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTopicPoliciesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTopicPoliciesTest.java index 1a048e408586d..c0281f073cfd4 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTopicPoliciesTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTopicPoliciesTest.java @@ -83,7 +83,7 @@ public void testReplicateQuotaTopicPolicies() throws Exception { // set BacklogQuota BacklogQuotaImpl backlogQuota = new BacklogQuotaImpl(); backlogQuota.setLimitSize(1); - backlogQuota.setLimitTimeInSec(2); + backlogQuota.setLimitTime(2); // local admin1.topicPolicies().setBacklogQuota(topic, backlogQuota); Awaitility.await().untilAsserted(() -> diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/BacklogQuota.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/BacklogQuota.java index cb7bc46c61dbc..4045eade3db87 100644 --- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/BacklogQuota.java +++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/BacklogQuota.java @@ -49,7 +49,7 @@ public interface BacklogQuota { * * @return quota limit in second */ - int getLimitTimeInSec(); + int getLimitTime(); RetentionPolicy getPolicy(); diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/impl/BacklogQuotaImpl.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/impl/BacklogQuotaImpl.java index 704c6f0cc0a0b..05da03f1090b6 100644 --- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/impl/BacklogQuotaImpl.java +++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/impl/BacklogQuotaImpl.java @@ -45,12 +45,12 @@ public class BacklogQuotaImpl implements BacklogQuota { /** * backlog quota by time in second. */ - private int limitTimeInSec; + private int limitTime; private RetentionPolicy policy; - public BacklogQuotaImpl(long limitSize, int limitTimeInSec, RetentionPolicy policy) { + public BacklogQuotaImpl(long limitSize, int limitTime, RetentionPolicy policy) { this.limitSize = limitSize; - this.limitTimeInSec = limitTimeInSec; + this.limitTime = limitTime; this.policy = policy; } @@ -80,12 +80,12 @@ public void setLimitSize(long limitSize) { this.limit = limitSize; } - public int getLimitTimeInSec() { - return limitTimeInSec; + public int getLimitTime() { + return limitTime; } - public void setLimitTimeInSec(int limitTimeInSec) { - this.limitTimeInSec = limitTimeInSec; + public void setLimitTime(int limitTime) { + this.limitTime = limitTime; } public RetentionPolicy getPolicy() { diff --git a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/BacklogQuotaCompatibilityTest.java b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/BacklogQuotaCompatibilityTest.java index c4ecf750c7ad0..02b88361357eb 100644 --- a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/BacklogQuotaCompatibilityTest.java +++ b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/BacklogQuotaCompatibilityTest.java @@ -47,7 +47,7 @@ public void testV27ClientSetV28BrokerRead() throws Exception { Policies writePolicy = new Policies(); BacklogQuotaImpl writeBacklogQuota = new BacklogQuotaImpl(); writeBacklogQuota.setLimit(1024); - writeBacklogQuota.setLimitTimeInSec(60); + writeBacklogQuota.setLimitTime(60); writeBacklogQuota.setPolicy(testPolicy); HashMap quotaHashMap = new HashMap<>(); quotaHashMap.put(BacklogQuota.BacklogQuotaType.destination_storage, writeBacklogQuota); @@ -56,7 +56,7 @@ public void testV27ClientSetV28BrokerRead() throws Exception { Policies policies = simpleType.deserialize("/path", serialize, null); BacklogQuota readBacklogQuota = policies.backlog_quota_map.get(BacklogQuota.BacklogQuotaType.destination_storage); Assert.assertEquals(readBacklogQuota.getLimitSize(), 1024); - Assert.assertEquals(readBacklogQuota.getLimitTimeInSec(), 60); + Assert.assertEquals(readBacklogQuota.getLimitTime(), 60); Assert.assertEquals(readBacklogQuota.getPolicy(), testPolicy); } @@ -65,7 +65,7 @@ public void testV28ClientSetV28BrokerRead() throws Exception { Policies writePolicy = new Policies(); BacklogQuotaImpl writeBacklogQuota = new BacklogQuotaImpl(); writeBacklogQuota.setLimitSize(1024); - writeBacklogQuota.setLimitTimeInSec(60); + writeBacklogQuota.setLimitTime(60); writeBacklogQuota.setPolicy(testPolicy); HashMap quotaHashMap = new HashMap<>(); quotaHashMap.put(BacklogQuota.BacklogQuotaType.destination_storage, writeBacklogQuota); @@ -74,7 +74,7 @@ public void testV28ClientSetV28BrokerRead() throws Exception { Policies policies = simpleType.deserialize("/path", serialize, null); BacklogQuota readBacklogQuota = policies.backlog_quota_map.get(BacklogQuota.BacklogQuotaType.destination_storage); Assert.assertEquals(readBacklogQuota.getLimit(), 1024); - Assert.assertEquals(readBacklogQuota.getLimitTimeInSec(), 60); + Assert.assertEquals(readBacklogQuota.getLimitTime(), 60); Assert.assertEquals(readBacklogQuota.getPolicy(), testPolicy); } @@ -103,7 +103,7 @@ public void testBackwardCompatibility() throws IOException { Policies policies = simpleType.deserialize(null, oldPolicyStr.getBytes(), null); assertEquals(policies.backlog_quota_map.get(BacklogQuota.BacklogQuotaType.destination_storage).getLimitSize(), 1001); - assertEquals(policies.backlog_quota_map.get(BacklogQuota.BacklogQuotaType.destination_storage).getLimitTimeInSec(), + assertEquals(policies.backlog_quota_map.get(BacklogQuota.BacklogQuotaType.destination_storage).getLimitTime(), 0); assertEquals(policies.backlog_quota_map.get(BacklogQuota.BacklogQuotaType.destination_storage).getPolicy(), BacklogQuota.RetentionPolicy.consumer_backlog_eviction); @@ -125,7 +125,7 @@ public void testBackwardCompatibilityNullLimitAndLimitSize() throws IOException Policies policies = simpleType.deserialize(null, oldPolicyStr.getBytes(), null); assertEquals(policies.backlog_quota_map.get(BacklogQuota.BacklogQuotaType.destination_storage).getLimitSize(), 0); - assertEquals(policies.backlog_quota_map.get(BacklogQuota.BacklogQuotaType.destination_storage).getLimitTimeInSec(), + assertEquals(policies.backlog_quota_map.get(BacklogQuota.BacklogQuotaType.destination_storage).getLimitTime(), 0); }