Skip to content

Commit

Permalink
feat(new_metrics): retire stale metric entities that are not used by …
Browse files Browse the repository at this point in the history
…any other object (#1304)
  • Loading branch information
empiredan authored Feb 7, 2023
1 parent 390ccf8 commit 624cae5
Show file tree
Hide file tree
Showing 3 changed files with 529 additions and 70 deletions.
191 changes: 173 additions & 18 deletions src/utils/metrics.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,25 @@

#include "utils/metrics.h"

#include "runtime/api_layer1.h"
#include "utils/api_utilities.h"
#include "utils/flags.h"
#include "utils/rand.h"
#include "utils/shared_io_service.h"
#include "utils/string_conv.h"
#include "utils/strings.h"

namespace dsn {

// Retention interval for stale metric entities, in milliseconds. The default is
// 10 * 60 * 1000 ms, i.e. 10 minutes. The value is read as
// FLAGS_entity_retirement_delay_ms by the registry, both as the period of its timer and
// as the delay added to "now" when a stale entity is scheduled for retirement.
DSN_DEFINE_uint64(metrics,
                  entity_retirement_delay_ms,
                  10 * 60 * 1000,
                  "The retention interval (milliseconds) for an entity after it becomes stale.");

metric_entity::metric_entity(const metric_entity_prototype *prototype,
const std::string &id,
const attr_map &attrs)
: _prototype(prototype), _id(id), _attrs(attrs)
: _prototype(prototype), _id(id), _attrs(attrs), _retire_time_ms(0)
{
}

Expand All @@ -45,13 +52,10 @@ void metric_entity::close(close_option option)
{
utils::auto_write_lock l(_lock);

// The reason why each metric is closed in the entity rather than in the destructor of each
// metric is that close() for the metric will return immediately without waiting for any close
// operation to be finished.
//
// Thus, to close all metrics owned by an entity, it's more efficient to firstly issue a close
// request for all metrics; then, just wait for all of the close operations to be finished.
// It's inefficient to wait for each metric to be closed one by one.
// To close all metrics owned by an entity, it's more efficient to firstly issue an asynchronous
// close request to each metric; then, just wait for all of the close operations to be finished.
// It's inefficient to wait for each metric to be closed one by one. Therefore, the metric is
// not closed in its destructor.
for (auto &m : _metrics) {
if (m.second->prototype()->type() == metric_type::kPercentile) {
auto p = down_cast<closeable_metric *>(m.second.get());
Expand Down Expand Up @@ -177,6 +181,17 @@ void metric_entity::take_snapshot(metric_json_writer &writer, const metric_filte
writer.EndObject();
}

bool metric_entity::is_stale() const
{
    // Whoever is calling us is accessing this entity, hence at least one reference to it
    // must exist right now.
    CHECK_GE(get_count(), 1);

    // A single remaining reference is the one kept in the registry: no other object uses
    // this entity any more, which is exactly what "stale" means here.
    const bool only_one_reference_left = (get_count() == 1);
    return only_one_reference_left;
}

void metric_filters::extract_entity_metrics(const metric_entity::metric_map &candidates,
metric_entity::metric_map &target_metrics) const
{
Expand Down Expand Up @@ -312,10 +327,12 @@ metric_registry::metric_registry() : _http_service(this)
{
// We should ensure that metric_registry is destructed before shared_io_service is destructed.
// Once shared_io_service is destructed before metric_registry is destructed,
// boost::asio::io_service needed by metrics in metric_registry such as percentile_timer will
// boost::asio::io_service needed by metrics in metric_registry such as metric_timer will
// be released firstly, then will lead to heap-use-after-free error since percentiles in
// metric_registry are still running but the resources they needed have been released.
tools::shared_io_service::instance();

start_timer();
}

metric_registry::~metric_registry()
Expand All @@ -336,6 +353,39 @@ metric_registry::~metric_registry()
for (auto &entity : _entities) {
entity.second->close(metric_entity::close_option::kNoWait);
}

stop_timer();
}

void metric_registry::on_close()
{
    // Intentionally empty. This is bound as the close callback of the timer created in
    // start_timer(); the registry has nothing to do at that point.
}

void metric_registry::start_timer()
{
if (_timer) {
return;
}

// Once an entity is considered stale, it will be retired after the retention interval,
// namely FLAGS_entity_retirement_delay_ms milliseconds. Therefore, if the interval of
// the timer is also set to FLAGS_entity_retirement_delay_ms, in the next round, it's
// just about time to retire this entity.
_timer.reset(new metric_timer(FLAGS_entity_retirement_delay_ms,
std::bind(&metric_registry::process_stale_entities, this),
std::bind(&metric_registry::on_close, this)));
}

void metric_registry::stop_timer()
{
    // Nothing to stop if the timer has never been started (or has been stopped already).
    if (_timer) {
        // Request the close, then block until it has completed, so that the stop is
        // synchronous from the caller's point of view.
        _timer->close();
        _timer->wait();

        // Dropping the timer marks the registry as "stopped", which allows a later
        // start_timer() to create a fresh one.
        _timer.reset();
    }
}

metric_registry::entity_map metric_registry::entities() const
Expand Down Expand Up @@ -383,6 +433,111 @@ metric_entity_ptr metric_registry::find_or_create_entity(const metric_entity_pro
return entity;
}

metric_registry::collected_entities_info metric_registry::collect_stale_entities() const
{
    collected_entities_info info;

    const auto current_ms = dsn_now_ms();

    utils::auto_read_lock guard(_lock);

    for (const auto &kv : _entities) {
        const auto &candidate = kv.second;

        if (candidate->is_stale()) {
            if (candidate->_retire_time_ms > current_ms) {
                // Already scheduled for retirement, but the retention interval has not
                // elapsed yet: only count it, do not collect it.
                ++info.num_scheduled_entities;
            } else {
                // Either not scheduled yet, or scheduled and now due: hand it over to
                // the retirement step.
                info.collected_entities.insert(kv.first);
            }
        } else if (candidate->_retire_time_ms > 0) {
            // The entity was scheduled for retirement but has been reemployed since.
            // Collect it so that its scheduled retirement time can be reset to 0.
            info.collected_entities.insert(kv.first);
        }
    }

    info.num_all_entities = _entities.size();
    return info;
}

metric_registry::retired_entities_stat
metric_registry::retire_stale_entities(const collected_entity_list &collected_entities)
{
    // An empty list needs no work at all; in particular, avoid taking the write lock.
    if (collected_entities.empty()) {
        return retired_entities_stat();
    }

    retired_entities_stat stat;

    const auto current_ms = dsn_now_ms();

    utils::auto_write_lock guard(_lock);

    for (const auto &id : collected_entities) {
        auto found = _entities.find(id);
        if (dsn_unlikely(found == _entities.end())) {
            // Unusual: the entity is no longer in the registry. Nothing to do for it.
            continue;
        }

        auto &entity = found->second;

        if (!entity->is_stale()) {
            // The entity is in use. If it had been scheduled for retirement earlier, it
            // has been reemployed: cancel the schedule by resetting the time to 0.
            if (entity->_retire_time_ms > 0) {
                entity->_retire_time_ms = 0;
                ++stat.num_reemployed_entities;
            }
        } else if (dsn_unlikely(entity->_retire_time_ms > current_ms)) {
            // Still within the retention interval. collect_stale_entities() filters such
            // entities out beforehand, so this is unlikely; it is checked again anyway.
            continue;
        } else if (entity->_retire_time_ms == 0) {
            // Stale but not scheduled yet: give it a retirement time one retention
            // interval from now.
            entity->_retire_time_ms = current_ms + FLAGS_entity_retirement_delay_ms;
            ++stat.num_recently_scheduled_entities;
        } else {
            // Scheduled and past the retention interval: remove it from the registry.
            // `entity` refers into the erased element and must not be used afterwards.
            _entities.erase(found);
            ++stat.num_retired_entities;
        }
    }

    return stat;
}

void metric_registry::process_stale_entities()
{
    LOG_INFO("begin to process stale metric entities");

    // First pass (read lock): pick the entities that need attention. Second pass (write
    // lock): reset, schedule or retire each of them.
    const auto collected = collect_stale_entities();
    const auto retired = retire_stale_entities(collected.collected_entities);

    LOG_INFO("stat for metric entities: total={}, collected={}, retired={}, scheduled={}, "
             "recently_scheduled={}, reemployed={}",
             collected.num_all_entities,
             collected.collected_entities.size(),
             retired.num_retired_entities,
             collected.num_scheduled_entities,
             retired.num_recently_scheduled_entities,
             retired.num_reemployed_entities);
}

// Stores a copy of the construction arguments in _args; does nothing else.
metric_prototype::metric_prototype(const ctor_args &args)
    : _args(args)
{
}

// No explicit cleanup is performed here.
metric_prototype::~metric_prototype()
{
}
Expand All @@ -391,7 +546,7 @@ metric::metric(const metric_prototype *prototype) : _prototype(prototype) {}

// Forwards the prototype to the metric base class; adds no state of its own here.
closeable_metric::closeable_metric(const metric_prototype *prototype)
    : metric(prototype)
{
}

uint64_t percentile_timer::generate_initial_delay_ms(uint64_t interval_ms)
uint64_t metric_timer::generate_initial_delay_ms(uint64_t interval_ms)
{
CHECK_GT(interval_ms, 0);

Expand All @@ -403,7 +558,7 @@ uint64_t percentile_timer::generate_initial_delay_ms(uint64_t interval_ms)
return (rand::next_u64() % interval_seconds + 1) * 1000 + rand::next_u64() % 1000;
}

percentile_timer::percentile_timer(uint64_t interval_ms, on_exec_fn on_exec, on_close_fn on_close)
metric_timer::metric_timer(uint64_t interval_ms, on_exec_fn on_exec, on_close_fn on_close)
: _initial_delay_ms(generate_initial_delay_ms(interval_ms)),
_interval_ms(interval_ms),
_on_exec(on_exec),
Expand All @@ -413,10 +568,10 @@ percentile_timer::percentile_timer(uint64_t interval_ms, on_exec_fn on_exec, on_
_timer(new boost::asio::deadline_timer(tools::shared_io_service::instance().ios))
{
_timer->expires_from_now(boost::posix_time::milliseconds(_initial_delay_ms));
_timer->async_wait(std::bind(&percentile_timer::on_timer, this, std::placeholders::_1));
_timer->async_wait(std::bind(&metric_timer::on_timer, this, std::placeholders::_1));
}

void percentile_timer::close()
void metric_timer::close()
{
// If the timer has already expired when cancel() is called, then the handlers for asynchronous
// wait operations will:
Expand All @@ -433,15 +588,15 @@ void percentile_timer::close()
}
}

void percentile_timer::wait() { _completed.wait(); }
// Blocks on _completed, which on_close() notifies once the close callback has run.
void metric_timer::wait()
{
    _completed.wait();
}

void percentile_timer::on_close()
// Finishes closing the timer: invokes the stored _on_close callback, then notifies
// _completed so that a caller blocked in wait() can proceed.
void metric_timer::on_close()
{
// Run the callback first, so that it has returned before any waiter is released.
_on_close();
_completed.notify();
}

void percentile_timer::on_timer(const boost::system::error_code &ec)
void metric_timer::on_timer(const boost::system::error_code &ec)
{
// This macro is defined for the case that handlers for asynchronous wait operations are no
// longer cancelled. It just checks the internal state atomically (since close() can also be
Expand All @@ -465,7 +620,7 @@ void percentile_timer::on_timer(const boost::system::error_code &ec)
// Cancel can only be launched by close().
auto expected_state = state::kClosing;
CHECK(_state.compare_exchange_strong(expected_state, state::kClosed),
"wrong state for percentile_timer: {}, while expecting closing state",
"wrong state for metric_timer: {}, while expecting closing state",
static_cast<int>(expected_state));
on_close();

Expand All @@ -477,7 +632,7 @@ void percentile_timer::on_timer(const boost::system::error_code &ec)

TRY_PROCESS_TIMER_CLOSING();
_timer->expires_from_now(boost::posix_time::milliseconds(_interval_ms));
_timer->async_wait(std::bind(&percentile_timer::on_timer, this, std::placeholders::_1));
_timer->async_wait(std::bind(&metric_timer::on_timer, this, std::placeholders::_1));
#undef TRY_PROCESS_TIMER_CLOSING
}

Expand Down
Loading

0 comments on commit 624cae5

Please sign in to comment.