Skip to content

Commit

Permalink
[ManagedLedger] Make 'StatsPeroidSeconds' configurable (apache#11584)
Browse files Browse the repository at this point in the history
Make `StatsPeroidSeconds` configurable.

- Move ‘StatsPeriodSeconds’ from ManagedLedgerFactoryImpl to ManagedLedgerFactoryConfig.
- Add config `managedLedgerStatsPeriodSeconds`.

(cherry picked from commit ddeae65)
  • Loading branch information
Technoboy- authored and dlg99 committed May 12, 2022
1 parent 246e38e commit ca2f1fd
Show file tree
Hide file tree
Showing 6 changed files with 29 additions and 13 deletions.
5 changes: 4 additions & 1 deletion conf/broker.conf
Original file line number Diff line number Diff line change
Expand Up @@ -896,7 +896,10 @@ managedLedgerDefaultAckQuorum=2

# How frequently to flush the cursor positions that were accumulated due to rate limiting. (seconds).
# Default is 60 seconds
managedLedgerCursorPositionFlushSeconds = 60
managedLedgerCursorPositionFlushSeconds=60

# How frequently to refresh the stats. (seconds). Default is 60 seconds
managedLedgerStatsPeriodSeconds=60

# Default type of checksum to use when writing to BookKeeper. Default is "CRC32C"
# Other possible options are "CRC32", "MAC" or "DUMMY" (no checksum).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,22 +48,22 @@ public class ManagedLedgerFactoryConfig {
private double cacheEvictionFrequency = 100;

/**
* All entries that have stayed in cache for more than the configured time, will be evicted
* All entries that have stayed in cache for more than the configured time, will be evicted.
*/
private long cacheEvictionTimeThresholdMillis = 1000;

/**
* Whether we should make a copy of the entry payloads when inserting in cache
* Whether we should make a copy of the entry payloads when inserting in cache.
*/
private boolean copyEntriesInCache = false;

/**
* Whether trace managed ledger task execution time
* Whether trace managed ledger task execution time.
*/
private boolean traceTaskExecution = true;

/**
* Managed ledger prometheus stats Latency Rollover Seconds
* Managed ledger prometheus stats Latency Rollover Seconds.
*/
private int prometheusStatsLatencyRolloverSeconds = 60;

Expand All @@ -73,7 +73,12 @@ public class ManagedLedgerFactoryConfig {
private int cursorPositionFlushSeconds = 60;

/**
* cluster name for prometheus stats
* How frequently to refresh the stats.
*/
private int statsPeriodSeconds = 60;

/**
* cluster name for prometheus stats.
*/
private String clusterName;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,8 +104,6 @@ public class ManagedLedgerFactoryImpl implements ManagedLedgerFactory {
private final long cacheEvictionTimeThresholdNanos;
private final MetadataStore metadataStore;

public static final int StatsPeriodSeconds = 60;

private static class PendingInitializeManagedLedger {

private final ManagedLedgerImpl ledger;
Expand Down Expand Up @@ -179,7 +177,7 @@ private ManagedLedgerFactoryImpl(MetadataStore metadataStore,
this.mbean = new ManagedLedgerFactoryMBeanImpl(this);
this.entryCacheManager = new EntryCacheManager(this);
this.statsTask = scheduledExecutor.scheduleWithFixedDelay(catchingAndLoggingThrowables(this::refreshStats),
0, StatsPeriodSeconds, TimeUnit.SECONDS);
0, config.getStatsPeriodSeconds(), TimeUnit.SECONDS);
this.flushCursorsTask = scheduledExecutor.scheduleAtFixedRate(catchingAndLoggingThrowables(this::flushCursors),
config.getCursorPositionFlushSeconds(), config.getCursorPositionFlushSeconds(), TimeUnit.SECONDS);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1449,6 +1449,11 @@ public class ServiceConfiguration implements PulsarConfiguration {
doc = "How frequently to flush the cursor positions that were accumulated due to rate limiting. (seconds). Default is 60 seconds")
private int managedLedgerCursorPositionFlushSeconds = 60;

@FieldContext(minValue = 1,
category = CATEGORY_STORAGE_ML,
doc = "How frequently to refresh the stats. (seconds). Default is 60 seconds")
private int managedLedgerStatsPeriodSeconds = 60;

//
//
@FieldContext(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ public void initialize(ServiceConfiguration conf, MetadataStore metadataStore,
managedLedgerFactoryConfig.setTraceTaskExecution(conf.isManagedLedgerTraceTaskExecution());
managedLedgerFactoryConfig.setCursorPositionFlushSeconds(conf.getManagedLedgerCursorPositionFlushSeconds());
managedLedgerFactoryConfig.setManagedLedgerInfoCompressionType(conf.getManagedLedgerInfoCompressionType());
managedLedgerFactoryConfig.setStatsPeriodSeconds(conf.getManagedLedgerStatsPeriodSeconds());

Configuration configuration = new ClientConfiguration();
if (conf.isBookkeeperClientExposeStatsToPrometheus()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,15 @@ public class ManagedLedgerMetrics extends AbstractMetrics {
private static final Buckets
BRK_ML_ENTRYSIZEBUCKETS = new Buckets("brk_ml_EntrySizeBuckets", ENTRY_SIZE_BUCKETS_BYTES);

private int statsPeriodSeconds;

public ManagedLedgerMetrics(PulsarService pulsar) {
super(pulsar);
this.metricsCollection = Lists.newArrayList();
this.ledgersByDimensionMap = Maps.newHashMap();
this.tempAggregatedMetricsMap = Maps.newHashMap();
this.statsPeriodSeconds = ((ManagedLedgerFactoryImpl) pulsar.getManagedLedgerFactory())
.getConfig().getStatsPeriodSeconds();
}

@Override
Expand Down Expand Up @@ -112,16 +116,16 @@ private List<Metrics> aggregate(Map<Metrics, List<ManagedLedgerImpl>> ledgersByD
// handle bucket entries initialization here
BRK_ML_ADDENTRYLATENCYBUCKETS.populateBucketEntries(tempAggregatedMetricsMap,
lStats.getAddEntryLatencyBuckets(),
ManagedLedgerFactoryImpl.StatsPeriodSeconds);
statsPeriodSeconds);
BRK_ML_LEDGERADDENTRYLATENCYBUCKETS.populateBucketEntries(tempAggregatedMetricsMap,
lStats.getLedgerAddEntryLatencyBuckets(),
ManagedLedgerFactoryImpl.StatsPeriodSeconds);
statsPeriodSeconds);
BRK_ML_LEDGERSWITCHLATENCYBUCKETS.populateBucketEntries(tempAggregatedMetricsMap,
lStats.getLedgerSwitchLatencyBuckets(),
ManagedLedgerFactoryImpl.StatsPeriodSeconds);
statsPeriodSeconds);
BRK_ML_ENTRYSIZEBUCKETS.populateBucketEntries(tempAggregatedMetricsMap,
lStats.getEntrySizeBuckets(),
ManagedLedgerFactoryImpl.StatsPeriodSeconds);
statsPeriodSeconds);
populateAggregationMapWithSum(tempAggregatedMetricsMap, "brk_ml_MarkDeleteRate",
lStats.getMarkDeleteRate());
}
Expand Down

0 comments on commit ca2f1fd

Please sign in to comment.