From 5d7e272a63f90c9d95add9471571a6fbcd84a5ac Mon Sep 17 00:00:00 2001 From: GuoJiwei Date: Mon, 9 Aug 2021 17:31:53 +0800 Subject: [PATCH] [ManagedLedger] Make 'StatsPeroidSeconds' configurable (#11584) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Make `StatsPeroidSeconds` configurable. - Move ‘StatsPeriodSeconds’ from ManagedLedgerFactoryImpl to ManagedLedgerFactoryConfig. - Add config `managedLedgerStatsPeriodSeconds`. --- conf/broker.conf | 5 ++++- .../mledger/ManagedLedgerFactoryConfig.java | 15 ++++++++++----- .../mledger/impl/ManagedLedgerFactoryImpl.java | 4 +--- .../pulsar/broker/ServiceConfiguration.java | 5 +++++ .../pulsar/broker/ManagedLedgerClientFactory.java | 1 + .../stats/metrics/ManagedLedgerMetrics.java | 12 ++++++++---- 6 files changed, 29 insertions(+), 13 deletions(-) diff --git a/conf/broker.conf b/conf/broker.conf index 196ee5f1c5eaa..1688aaf7743ad 100644 --- a/conf/broker.conf +++ b/conf/broker.conf @@ -855,7 +855,10 @@ managedLedgerDefaultAckQuorum=2 # How frequently to flush the cursor positions that were accumulated due to rate limiting. (seconds). # Default is 60 seconds -managedLedgerCursorPositionFlushSeconds = 60 +managedLedgerCursorPositionFlushSeconds=60 + +# How frequently to refresh the stats. (seconds). Default is 60 seconds +managedLedgerStatsPeriodSeconds=60 # Default type of checksum to use when writing to BookKeeper. Default is "CRC32C" # Other possible options are "CRC32", "MAC" or "DUMMY" (no checksum). diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactoryConfig.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactoryConfig.java index d2109eaac9dcd..1f9cb317a9e92 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactoryConfig.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactoryConfig.java @@ -47,22 +47,22 @@ public class ManagedLedgerFactoryConfig { private double cacheEvictionFrequency = 100; /** - * All entries that have stayed in cache for more than the configured time, will be evicted + * All entries that have stayed in cache for more than the configured time, will be evicted. */ private long cacheEvictionTimeThresholdMillis = 1000; /** - * Whether we should make a copy of the entry payloads when inserting in cache + * Whether we should make a copy of the entry payloads when inserting in cache. */ private boolean copyEntriesInCache = false; /** - * Whether trace managed ledger task execution time + * Whether trace managed ledger task execution time. */ private boolean traceTaskExecution = true; /** - * Managed ledger prometheus stats Latency Rollover Seconds + * Managed ledger prometheus stats Latency Rollover Seconds. */ private int prometheusStatsLatencyRolloverSeconds = 60; @@ -72,7 +72,12 @@ public class ManagedLedgerFactoryConfig { private int cursorPositionFlushSeconds = 60; /** - * cluster name for prometheus stats + * How frequently to refresh the stats. + */ + private int statsPeriodSeconds = 60; + + /** + * cluster name for prometheus stats. */ private String clusterName; } diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java index b00cd0ce02f92..39c4e9b39ddba 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java @@ -104,8 +104,6 @@ public class ManagedLedgerFactoryImpl implements ManagedLedgerFactory { private final long cacheEvictionTimeThresholdNanos; private final MetadataStore metadataStore; - public static final int StatsPeriodSeconds = 60; - private static class PendingInitializeManagedLedger { private final ManagedLedgerImpl ledger; @@ -179,7 +177,7 @@ private ManagedLedgerFactoryImpl(MetadataStore metadataStore, this.mbean = new ManagedLedgerFactoryMBeanImpl(this); this.entryCacheManager = new EntryCacheManager(this); this.statsTask = scheduledExecutor.scheduleAtFixedRate(catchingAndLoggingThrowables(this::refreshStats), - 0, StatsPeriodSeconds, TimeUnit.SECONDS); + 0, config.getStatsPeriodSeconds(), TimeUnit.SECONDS); this.flushCursorsTask = scheduledExecutor.scheduleAtFixedRate(catchingAndLoggingThrowables(this::flushCursors), config.getCursorPositionFlushSeconds(), config.getCursorPositionFlushSeconds(), TimeUnit.SECONDS); diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java index 5b608552c66c1..07f6924afe5ea 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java @@ -1375,6 +1375,11 @@ public class ServiceConfiguration implements PulsarConfiguration { doc = "How frequently to flush the cursor positions that were accumulated due to rate limiting. (seconds). Default is 60 seconds") private int managedLedgerCursorPositionFlushSeconds = 60; + @FieldContext(minValue = 1, + category = CATEGORY_STORAGE_ML, + doc = "How frequently to refresh the stats. (seconds). Default is 60 seconds") + private int managedLedgerStatsPeriodSeconds = 60; + // // @FieldContext( diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java index 3b684bf6180f3..68348e2ca2c23 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java @@ -69,6 +69,7 @@ public void initialize(ServiceConfiguration conf, MetadataStore metadataStore, conf.getManagedLedgerPrometheusStatsLatencyRolloverSeconds()); managedLedgerFactoryConfig.setTraceTaskExecution(conf.isManagedLedgerTraceTaskExecution()); managedLedgerFactoryConfig.setCursorPositionFlushSeconds(conf.getManagedLedgerCursorPositionFlushSeconds()); + managedLedgerFactoryConfig.setStatsPeriodSeconds(conf.getManagedLedgerStatsPeriodSeconds()); Configuration configuration = new ClientConfiguration(); if (conf.isBookkeeperClientExposeStatsToPrometheus()) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/metrics/ManagedLedgerMetrics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/metrics/ManagedLedgerMetrics.java index 889eb8ea5c1d7..8a9dd4a3da6cd 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/metrics/ManagedLedgerMetrics.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/metrics/ManagedLedgerMetrics.java @@ -46,11 +46,15 @@ public class ManagedLedgerMetrics extends AbstractMetrics { private static final Buckets BRK_ML_ENTRYSIZEBUCKETS = new Buckets("brk_ml_EntrySizeBuckets", ENTRY_SIZE_BUCKETS_BYTES); + private int statsPeriodSeconds; + public ManagedLedgerMetrics(PulsarService pulsar) { super(pulsar); this.metricsCollection = Lists.newArrayList(); this.ledgersByDimensionMap = Maps.newHashMap(); this.tempAggregatedMetricsMap = Maps.newHashMap(); + this.statsPeriodSeconds = ((ManagedLedgerFactoryImpl) pulsar.getManagedLedgerFactory()) + .getConfig().getStatsPeriodSeconds(); } @Override @@ -112,16 +116,16 @@ private List aggregate(Map> ledgersByD // handle bucket entries initialization here BRK_ML_ADDENTRYLATENCYBUCKETS.populateBucketEntries(tempAggregatedMetricsMap, lStats.getAddEntryLatencyBuckets(), - ManagedLedgerFactoryImpl.StatsPeriodSeconds); + statsPeriodSeconds); BRK_ML_LEDGERADDENTRYLATENCYBUCKETS.populateBucketEntries(tempAggregatedMetricsMap, lStats.getLedgerAddEntryLatencyBuckets(), - ManagedLedgerFactoryImpl.StatsPeriodSeconds); + statsPeriodSeconds); BRK_ML_LEDGERSWITCHLATENCYBUCKETS.populateBucketEntries(tempAggregatedMetricsMap, lStats.getLedgerSwitchLatencyBuckets(), - ManagedLedgerFactoryImpl.StatsPeriodSeconds); + statsPeriodSeconds); BRK_ML_ENTRYSIZEBUCKETS.populateBucketEntries(tempAggregatedMetricsMap, lStats.getEntrySizeBuckets(), - ManagedLedgerFactoryImpl.StatsPeriodSeconds); + statsPeriodSeconds); populateAggregationMapWithSum(tempAggregatedMetricsMap, "brk_ml_MarkDeleteRate", lStats.getMarkDeleteRate()); }