diff --git a/conf/broker.conf b/conf/broker.conf index 63324c224a700e..1de4e3ceee84ce 100644 --- a/conf/broker.conf +++ b/conf/broker.conf @@ -871,6 +871,9 @@ managedLedgerDefaultAckQuorum=2 # Default is 60 seconds managedLedgerCursorPositionFlushSeconds = 60 +# How frequently to refresh the stats. (seconds). Default is 60 seconds +managedLedgerStatsPeriodSeconds = 60 + # Default type of checksum to use when writing to BookKeeper. Default is "CRC32C" # Other possible options are "CRC32", "MAC" or "DUMMY" (no checksum). managedLedgerDigestType=CRC32C diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactoryConfig.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactoryConfig.java index ef92957310fbce..a00c1616410830 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactoryConfig.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactoryConfig.java @@ -48,22 +48,22 @@ public class ManagedLedgerFactoryConfig { private double cacheEvictionFrequency = 100; /** - * All entries that have stayed in cache for more than the configured time, will be evicted + * All entries that have stayed in cache for more than the configured time, will be evicted. */ private long cacheEvictionTimeThresholdMillis = 1000; /** - * Whether we should make a copy of the entry payloads when inserting in cache + * Whether we should make a copy of the entry payloads when inserting in cache. */ private boolean copyEntriesInCache = false; /** - * Whether trace managed ledger task execution time + * Whether trace managed ledger task execution time. */ private boolean traceTaskExecution = true; /** - * Managed ledger prometheus stats Latency Rollover Seconds + * Managed ledger prometheus stats Latency Rollover Seconds. */ private int prometheusStatsLatencyRolloverSeconds = 60; @@ -73,7 +73,12 @@ public class ManagedLedgerFactoryConfig { private int cursorPositionFlushSeconds = 60; /** - * cluster name for prometheus stats + * How frequently to refresh the stats. + */ + private int statsPeriodSeconds = 60; + + /** + * cluster name for prometheus stats. */ private String clusterName; diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java index 16d577ab5232af..8453c1a4b2c858 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java @@ -102,8 +102,6 @@ public class ManagedLedgerFactoryImpl implements ManagedLedgerFactory { private final long cacheEvictionTimeThresholdNanos; private final MetadataStore metadataStore; - public static final int StatsPeriodSeconds = 60; - private static class PendingInitializeManagedLedger { private final ManagedLedgerImpl ledger; @@ -177,7 +175,7 @@ private ManagedLedgerFactoryImpl(MetadataStore metadataStore, this.mbean = new ManagedLedgerFactoryMBeanImpl(this); this.entryCacheManager = new EntryCacheManager(this); this.statsTask = scheduledExecutor.scheduleAtFixedRate(this::refreshStats, - 0, StatsPeriodSeconds, TimeUnit.SECONDS); + config.getStatsPeriodSeconds(), config.getStatsPeriodSeconds(), TimeUnit.SECONDS); this.flushCursorsTask = scheduledExecutor.scheduleAtFixedRate(this::flushCursors, config.getCursorPositionFlushSeconds(), config.getCursorPositionFlushSeconds(), TimeUnit.SECONDS); diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java index e9926f43fcf868..aaec952831c579 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java @@ -1389,6 +1389,11 @@ public class ServiceConfiguration implements PulsarConfiguration { doc = "How frequently to flush the cursor positions that were accumulated due to rate limiting. (seconds). Default is 60 seconds") private int managedLedgerCursorPositionFlushSeconds = 60; + @FieldContext(minValue = 1, + category = CATEGORY_STORAGE_ML, + doc = "How frequently to refresh the stats. (seconds). Default is 60 seconds") + private int managedLedgerStatsPeriodSeconds = 60; + // // @FieldContext( diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java index 431cb729b22fb1..52572cff4fd6a6 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java @@ -70,6 +70,7 @@ public void initialize(ServiceConfiguration conf, MetadataStore metadataStore, managedLedgerFactoryConfig.setTraceTaskExecution(conf.isManagedLedgerTraceTaskExecution()); managedLedgerFactoryConfig.setCursorPositionFlushSeconds(conf.getManagedLedgerCursorPositionFlushSeconds()); managedLedgerFactoryConfig.setManagedLedgerInfoCompressionType(conf.getManagedLedgerInfoCompressionType()); + managedLedgerFactoryConfig.setStatsPeriodSeconds(conf.getManagedLedgerStatsPeriodSeconds()); Configuration configuration = new ClientConfiguration(); if (conf.isBookkeeperClientExposeStatsToPrometheus()) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/metrics/ManagedLedgerMetrics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/metrics/ManagedLedgerMetrics.java index 889eb8ea5c1d70..8a9dd4a3da6cdb 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/metrics/ManagedLedgerMetrics.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/metrics/ManagedLedgerMetrics.java @@ -46,11 +46,15 @@ public class ManagedLedgerMetrics extends AbstractMetrics { private static final Buckets BRK_ML_ENTRYSIZEBUCKETS = new Buckets("brk_ml_EntrySizeBuckets", ENTRY_SIZE_BUCKETS_BYTES); + private int statsPeriodSeconds; + public ManagedLedgerMetrics(PulsarService pulsar) { super(pulsar); this.metricsCollection = Lists.newArrayList(); this.ledgersByDimensionMap = Maps.newHashMap(); this.tempAggregatedMetricsMap = Maps.newHashMap(); + this.statsPeriodSeconds = ((ManagedLedgerFactoryImpl) pulsar.getManagedLedgerFactory()) + .getConfig().getStatsPeriodSeconds(); } @Override @@ -112,16 +116,16 @@ private List aggregate(Map> ledgersByD // handle bucket entries initialization here BRK_ML_ADDENTRYLATENCYBUCKETS.populateBucketEntries(tempAggregatedMetricsMap, lStats.getAddEntryLatencyBuckets(), - ManagedLedgerFactoryImpl.StatsPeriodSeconds); + statsPeriodSeconds); BRK_ML_LEDGERADDENTRYLATENCYBUCKETS.populateBucketEntries(tempAggregatedMetricsMap, lStats.getLedgerAddEntryLatencyBuckets(), - ManagedLedgerFactoryImpl.StatsPeriodSeconds); + statsPeriodSeconds); BRK_ML_LEDGERSWITCHLATENCYBUCKETS.populateBucketEntries(tempAggregatedMetricsMap, lStats.getLedgerSwitchLatencyBuckets(), - ManagedLedgerFactoryImpl.StatsPeriodSeconds); + statsPeriodSeconds); BRK_ML_ENTRYSIZEBUCKETS.populateBucketEntries(tempAggregatedMetricsMap, lStats.getEntrySizeBuckets(), - ManagedLedgerFactoryImpl.StatsPeriodSeconds); + statsPeriodSeconds); populateAggregationMapWithSum(tempAggregatedMetricsMap, "brk_ml_MarkDeleteRate", lStats.getMarkDeleteRate()); }