Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ManagedLedger] Make 'StatsPeroidSeconds' configurable #11584

Merged
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions conf/broker.conf
Original file line number Diff line number Diff line change
Expand Up @@ -871,6 +871,9 @@ managedLedgerDefaultAckQuorum=2
# Default is 60 seconds
managedLedgerCursorPositionFlushSeconds = 60

# How frequently to refresh the stats. (seconds). Default is 60 seconds
managedLedgerStatsPeriodSeconds = 60
Technoboy- marked this conversation as resolved.
Show resolved Hide resolved

# Default type of checksum to use when writing to BookKeeper. Default is "CRC32C"
# Other possible options are "CRC32", "MAC" or "DUMMY" (no checksum).
managedLedgerDigestType=CRC32C
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,22 +48,22 @@ public class ManagedLedgerFactoryConfig {
private double cacheEvictionFrequency = 100;

/**
* All entries that have stayed in cache for more than the configured time, will be evicted
* All entries that have stayed in cache for more than the configured time, will be evicted.
*/
private long cacheEvictionTimeThresholdMillis = 1000;

/**
* Whether we should make a copy of the entry payloads when inserting in cache
* Whether we should make a copy of the entry payloads when inserting in cache.
*/
private boolean copyEntriesInCache = false;

/**
* Whether trace managed ledger task execution time
* Whether trace managed ledger task execution time.
*/
private boolean traceTaskExecution = true;

/**
* Managed ledger prometheus stats Latency Rollover Seconds
* Managed ledger prometheus stats Latency Rollover Seconds.
*/
private int prometheusStatsLatencyRolloverSeconds = 60;

Expand All @@ -73,7 +73,12 @@ public class ManagedLedgerFactoryConfig {
private int cursorPositionFlushSeconds = 60;

/**
* cluster name for prometheus stats
* How frequently to refresh the stats.
*/
private int statsPeriodSeconds = 60;

/**
* cluster name for prometheus stats.
*/
private String clusterName;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,8 +102,6 @@ public class ManagedLedgerFactoryImpl implements ManagedLedgerFactory {
private final long cacheEvictionTimeThresholdNanos;
private final MetadataStore metadataStore;

public static final int StatsPeriodSeconds = 60;

private static class PendingInitializeManagedLedger {

private final ManagedLedgerImpl ledger;
Expand Down Expand Up @@ -177,7 +175,7 @@ private ManagedLedgerFactoryImpl(MetadataStore metadataStore,
this.mbean = new ManagedLedgerFactoryMBeanImpl(this);
this.entryCacheManager = new EntryCacheManager(this);
this.statsTask = scheduledExecutor.scheduleAtFixedRate(this::refreshStats,
0, StatsPeriodSeconds, TimeUnit.SECONDS);
config.getStatsPeriodSeconds(), config.getStatsPeriodSeconds(), TimeUnit.SECONDS);
Technoboy- marked this conversation as resolved.
Show resolved Hide resolved
this.flushCursorsTask = scheduledExecutor.scheduleAtFixedRate(this::flushCursors,
config.getCursorPositionFlushSeconds(), config.getCursorPositionFlushSeconds(), TimeUnit.SECONDS);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1389,6 +1389,11 @@ public class ServiceConfiguration implements PulsarConfiguration {
doc = "How frequently to flush the cursor positions that were accumulated due to rate limiting. (seconds). Default is 60 seconds")
private int managedLedgerCursorPositionFlushSeconds = 60;

@FieldContext(minValue = 1,
category = CATEGORY_STORAGE_ML,
doc = "How frequently to refresh the stats. (seconds). Default is 60 seconds")
private int managedLedgerStatsPeriodSeconds = 60;

//
//
@FieldContext(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ public void initialize(ServiceConfiguration conf, MetadataStore metadataStore,
managedLedgerFactoryConfig.setTraceTaskExecution(conf.isManagedLedgerTraceTaskExecution());
managedLedgerFactoryConfig.setCursorPositionFlushSeconds(conf.getManagedLedgerCursorPositionFlushSeconds());
managedLedgerFactoryConfig.setManagedLedgerInfoCompressionType(conf.getManagedLedgerInfoCompressionType());
managedLedgerFactoryConfig.setStatsPeriodSeconds(conf.getManagedLedgerStatsPeriodSeconds());

Configuration configuration = new ClientConfiguration();
if (conf.isBookkeeperClientExposeStatsToPrometheus()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,15 @@ public class ManagedLedgerMetrics extends AbstractMetrics {
private static final Buckets
BRK_ML_ENTRYSIZEBUCKETS = new Buckets("brk_ml_EntrySizeBuckets", ENTRY_SIZE_BUCKETS_BYTES);

private int statsPeriodSeconds;

public ManagedLedgerMetrics(PulsarService pulsar) {
super(pulsar);
this.metricsCollection = Lists.newArrayList();
this.ledgersByDimensionMap = Maps.newHashMap();
this.tempAggregatedMetricsMap = Maps.newHashMap();
this.statsPeriodSeconds = ((ManagedLedgerFactoryImpl) pulsar.getManagedLedgerFactory())
.getConfig().getStatsPeriodSeconds();
}

@Override
Expand Down Expand Up @@ -112,16 +116,16 @@ private List<Metrics> aggregate(Map<Metrics, List<ManagedLedgerImpl>> ledgersByD
// handle bucket entries initialization here
BRK_ML_ADDENTRYLATENCYBUCKETS.populateBucketEntries(tempAggregatedMetricsMap,
lStats.getAddEntryLatencyBuckets(),
ManagedLedgerFactoryImpl.StatsPeriodSeconds);
statsPeriodSeconds);
BRK_ML_LEDGERADDENTRYLATENCYBUCKETS.populateBucketEntries(tempAggregatedMetricsMap,
lStats.getLedgerAddEntryLatencyBuckets(),
ManagedLedgerFactoryImpl.StatsPeriodSeconds);
statsPeriodSeconds);
BRK_ML_LEDGERSWITCHLATENCYBUCKETS.populateBucketEntries(tempAggregatedMetricsMap,
lStats.getLedgerSwitchLatencyBuckets(),
ManagedLedgerFactoryImpl.StatsPeriodSeconds);
statsPeriodSeconds);
BRK_ML_ENTRYSIZEBUCKETS.populateBucketEntries(tempAggregatedMetricsMap,
lStats.getEntrySizeBuckets(),
ManagedLedgerFactoryImpl.StatsPeriodSeconds);
statsPeriodSeconds);
populateAggregationMapWithSum(tempAggregatedMetricsMap, "brk_ml_MarkDeleteRate",
lStats.getMarkDeleteRate());
}
Expand Down