Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(new_metrics): retire stale metric entities that are not used by any other object #1304

Merged
merged 18 commits into from
Feb 7, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
510fd0d
feat(new_metrics): retire old metrics
empiredan Dec 27, 2022
1e4cee3
feat(new_metrics): retire old metrics
empiredan Dec 28, 2022
a0bd674
feat(new_metrics): retire old metrics
empiredan Dec 28, 2022
1f3992e
feat(new_metrics): retire old metrics
empiredan Dec 28, 2022
af636ed
feat(new_metrics): retire old metrics
empiredan Dec 29, 2022
5967754
feat(new_metrics): retire the old metrics and entities that are not i…
empiredan Dec 29, 2022
0cf2c9c
feat(new_metrics): retire the old metrics and entities that are not i…
empiredan Dec 30, 2022
b48b7b6
feat(new_metrics): retire the old metrics and entities that are not i…
empiredan Jan 3, 2023
2ec9235
feat(new_metrics): retire the old metrics and entities that are not i…
empiredan Jan 3, 2023
9d77373
feat(new_metrics): retire the old metrics and entities that are not i…
empiredan Jan 5, 2023
2aeee49
feat(new_metrics): retire the old metrics and entities that are not i…
empiredan Jan 5, 2023
99d5e68
feat(new_metrics): retire the old metrics and entities that are not i…
empiredan Jan 6, 2023
5da991b
feat(new_metrics): retire the old metrics and entities that are not i…
empiredan Jan 11, 2023
506cf80
Merge remote-tracking branch 'apache/master' into retire-old-metrics
Jan 31, 2023
3b02d78
feat(new_metrics): retire stale metric entities that are not used by …
empiredan Feb 1, 2023
390ec9c
feat(new_metrics): retire stale metric entities that are not used by …
empiredan Feb 1, 2023
e042cb7
feat(new_metrics): retire stale metric entities that are not used by …
empiredan Feb 2, 2023
b39c572
feat(new_metrics): retire stale metric entities that are not used by …
empiredan Feb 7, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
191 changes: 173 additions & 18 deletions src/utils/metrics.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,25 @@

#include "utils/metrics.h"

#include "runtime/api_layer1.h"
#include "utils/api_utilities.h"
#include "utils/flags.h"
#include "utils/rand.h"
#include "utils/shared_io_service.h"
#include "utils/string_conv.h"
#include "utils/strings.h"

namespace dsn {

// How long (in milliseconds) an entity is retained after it has been found stale. The value
// is used both as the period of the registry's retirement timer and as the offset added to
// the current time when a retirement is scheduled. Defaults to 10 minutes.
DSN_DEFINE_uint64(metrics,
                  entity_retirement_delay_ms,
                  10 * 60 * 1000,
                  "The retention interval (milliseconds) for an entity after it becomes stale.");

metric_entity::metric_entity(const metric_entity_prototype *prototype,
const std::string &id,
const attr_map &attrs)
: _prototype(prototype), _id(id), _attrs(attrs)
: _prototype(prototype), _id(id), _attrs(attrs), _retire_time_ms(0)
{
}

Expand All @@ -45,13 +52,10 @@ void metric_entity::close(close_option option)
{
utils::auto_write_lock l(_lock);

// The reason why each metric is closed in the entity rather than in the destructor of each
// metric is that close() for the metric will return immediately without waiting for any close
// operation to be finished.
//
// Thus, to close all metrics owned by an entity, it's more efficient to firstly issue a close
// request for all metrics; then, just wait for all of the close operations to be finished.
// It's inefficient to wait for each metric to be closed one by one.
// To close all metrics owned by an entity, it's more efficient to firstly issue an asynchronous
// close request to each metric; then, just wait for all of the close operations to be finished.
// It's inefficient to wait for each metric to be closed one by one. Therefore, the metric is
// not closed in its destructor.
for (auto &m : _metrics) {
acelyc111 marked this conversation as resolved.
Show resolved Hide resolved
if (m.second->prototype()->type() == metric_type::kPercentile) {
auto p = down_cast<closeable_metric *>(m.second.get());
Expand Down Expand Up @@ -177,6 +181,17 @@ void metric_entity::take_snapshot(metric_json_writer &writer, const metric_filte
writer.EndObject();
}

// Tells whether this entity is stale, i.e. whether the only reference left to it is the one
// kept in the registry.
bool metric_entity::is_stale() const
{
    // Somebody is calling into this entity right now, so at least one reference must exist.
    CHECK_GE(get_count(), 1);

    // Exactly one reference means nobody but the registry is holding the entity any more.
    const bool only_registry_holds_it = (get_count() == 1);
    return only_registry_holds_it;
}

void metric_filters::extract_entity_metrics(const metric_entity::metric_map &candidates,
metric_entity::metric_map &target_metrics) const
{
Expand Down Expand Up @@ -312,10 +327,12 @@ metric_registry::metric_registry() : _http_service(this)
{
// We should ensure that metric_registry is destructed before shared_io_service is destructed.
// Once shared_io_service is destructed before metric_registry is destructed,
// boost::asio::io_service needed by metrics in metric_registry such as percentile_timer will
// boost::asio::io_service needed by metrics in metric_registry such as metric_timer will
// be released firstly, then will lead to heap-use-after-free error since percentiles in
// metric_registry are still running but the resources they needed have been released.
tools::shared_io_service::instance();

start_timer();
}

metric_registry::~metric_registry()
Expand All @@ -336,6 +353,39 @@ metric_registry::~metric_registry()
for (auto &entity : _entities) {
entity.second->close(metric_entity::close_option::kNoWait);
}

stop_timer();
}

// Intentionally empty: this is the close callback handed to the metric_timer created in
// start_timer(), and the registry performs no action of its own at that point.
void metric_registry::on_close() {}

void metric_registry::start_timer()
{
if (_timer) {
return;
}

// Once an entity is considered stale, it will be retired after the retention interval,
// namely FLAGS_entity_retirement_delay_ms milliseconds. Therefore, if the interval of
// the timer is also set to FLAGS_entity_retirement_delay_ms, in the next round, it's
// just about time to retire this entity.
_timer.reset(new metric_timer(FLAGS_entity_retirement_delay_ms,
std::bind(&metric_registry::process_stale_entities, this),
std::bind(&metric_registry::on_close, this)));
}

// Shuts the retirement timer down and waits until it has fully stopped. Does nothing if no
// timer is running.
void metric_registry::stop_timer()
{
    if (_timer) {
        // Request the close, then wait for it to complete, so that the shutdown as a whole
        // is synchronous.
        _timer->close();
        _timer->wait();

        // An empty pointer marks the timer as stopped, which allows start_timer() to create
        // a new one later.
        _timer.reset();
    }
}

metric_registry::entity_map metric_registry::entities() const
Expand Down Expand Up @@ -383,6 +433,111 @@ metric_entity_ptr metric_registry::find_or_create_entity(const metric_entity_pro
return entity;
}

// Scans the registry under the read lock and gathers the ids of the entities that
// retire_stale_entities() has to look at:
//   - entities that are in use again but still carry a scheduled retirement time;
//   - stale entities whose scheduled retirement time is not in the future (this includes
//     stale entities that have not been scheduled yet, whose time is 0).
// Stale entities that are still inside the retention interval are only counted. The total
// number of registered entities is reported as well.
metric_registry::collected_entities_info metric_registry::collect_stale_entities() const
{
    collected_entities_info info;

    const auto current_ms = dsn_now_ms();

    utils::auto_read_lock guard(_lock);

    for (const auto &kv : _entities) {
        // Bound by reference, never copied.
        // NOTE(review): a copy of the pointer would presumably add a reference and thus
        // distort is_stale() -- confirm against the pointer type.
        const auto &candidate = kv.second;

        if (candidate->is_stale()) {
            if (candidate->_retire_time_ms > current_ms) {
                // Already scheduled and the retention interval has not elapsed yet: just
                // count it, do not collect it.
                ++info.num_scheduled_entities;
            } else {
                info.collected_entities.insert(kv.first);
            }
        } else if (candidate->_retire_time_ms > 0) {
            // The entity was scheduled for retirement but has been reemployed since. Collect
            // it so that its scheduled time can be reset to 0 later.
            info.collected_entities.insert(kv.first);
        }
    }

    info.num_all_entities = _entities.size();
    return info;
}

// Processes the entities gathered by collect_stale_entities() under the write lock. Each
// collected entity is re-checked, since its state may have changed in the meantime, and then
// one of the following happens:
//   - in use again: its scheduled retirement time is cleared (counted as reemployed);
//   - stale and not scheduled yet: retirement is scheduled one retention interval from now
//     (counted as recently scheduled);
//   - stale and past its scheduled time: it is erased from the registry (counted as retired).
// Returns the counters of the actions taken.
metric_registry::retired_entities_stat
metric_registry::retire_stale_entities(const collected_entity_list &collected_entities)
{
    // Nothing was collected, so there is no reason to take the lock at all.
    if (collected_entities.empty()) {
        return retired_entities_stat();
    }

    retired_entities_stat stat;

    const auto current_ms = dsn_now_ms();

    utils::auto_write_lock guard(_lock);

    for (const auto &id : collected_entities) {
        auto pos = _entities.find(id);
        if (dsn_unlikely(pos == _entities.end())) {
            // Unusual: the entity is no longer in the registry.
            continue;
        }

        auto &entity = pos->second;

        if (!entity->is_stale()) {
            // Reemployed entity: drop any retirement that was scheduled for it earlier.
            if (entity->_retire_time_ms > 0) {
                entity->_retire_time_ms = 0;
                ++stat.num_reemployed_entities;
            }
            continue;
        }

        if (dsn_unlikely(entity->_retire_time_ms > current_ms)) {
            // collect_stale_entities() does not collect stale entities that are still within
            // the retention interval, so this is unlikely; it is checked again nevertheless.
            continue;
        }

        if (entity->_retire_time_ms != 0) {
            // Scheduled earlier and the retention interval is over: retire the entity from
            // the registry.
            _entities.erase(pos);
            ++stat.num_retired_entities;
        } else {
            // Stale but not yet scheduled: mark it with the time at which it may be retired.
            entity->_retire_time_ms = current_ms + FLAGS_entity_retirement_delay_ms;
            ++stat.num_recently_scheduled_entities;
        }
    }

    return stat;
}

// One round of stale-entity processing: collect the candidate entities, act on them, and log
// the resulting statistics. start_timer() registers this as the timer's exec callback.
void metric_registry::process_stale_entities()
{
    LOG_INFO("begin to process stale metric entities");

    const auto collected = collect_stale_entities();
    const auto stat = retire_stale_entities(collected.collected_entities);
    const auto num_collected = collected.collected_entities.size();

    LOG_INFO("stat for metric entities: total={}, collected={}, retired={}, scheduled={}, "
             "recently_scheduled={}, reemployed={}",
             collected.num_all_entities,
             num_collected,
             stat.num_retired_entities,
             collected.num_scheduled_entities,
             stat.num_recently_scheduled_entities,
             stat.num_reemployed_entities);
}

// Constructs a prototype, initializing `_args` from the given constructor arguments.
metric_prototype::metric_prototype(const ctor_args &args) : _args(args) {}

// Nothing to clean up explicitly; the definition is kept out of line in this file.
metric_prototype::~metric_prototype() = default;
Expand All @@ -391,7 +546,7 @@ metric::metric(const metric_prototype *prototype) : _prototype(prototype) {}

// Constructs a closeable metric by passing its prototype on to the `metric` constructor.
closeable_metric::closeable_metric(const metric_prototype *prototype) : metric(prototype) {}

uint64_t percentile_timer::generate_initial_delay_ms(uint64_t interval_ms)
uint64_t metric_timer::generate_initial_delay_ms(uint64_t interval_ms)
{
CHECK_GT(interval_ms, 0);

Expand All @@ -403,7 +558,7 @@ uint64_t percentile_timer::generate_initial_delay_ms(uint64_t interval_ms)
return (rand::next_u64() % interval_seconds + 1) * 1000 + rand::next_u64() % 1000;
}

percentile_timer::percentile_timer(uint64_t interval_ms, on_exec_fn on_exec, on_close_fn on_close)
metric_timer::metric_timer(uint64_t interval_ms, on_exec_fn on_exec, on_close_fn on_close)
: _initial_delay_ms(generate_initial_delay_ms(interval_ms)),
_interval_ms(interval_ms),
_on_exec(on_exec),
Expand All @@ -413,10 +568,10 @@ percentile_timer::percentile_timer(uint64_t interval_ms, on_exec_fn on_exec, on_
_timer(new boost::asio::deadline_timer(tools::shared_io_service::instance().ios))
{
_timer->expires_from_now(boost::posix_time::milliseconds(_initial_delay_ms));
_timer->async_wait(std::bind(&percentile_timer::on_timer, this, std::placeholders::_1));
_timer->async_wait(std::bind(&metric_timer::on_timer, this, std::placeholders::_1));
}

void percentile_timer::close()
void metric_timer::close()
{
// If the timer has already expired when cancel() is called, then the handlers for asynchronous
// wait operations will:
Expand All @@ -433,15 +588,15 @@ void percentile_timer::close()
}
}

void percentile_timer::wait() { _completed.wait(); }
// Waits on `_completed`, which on_close() notifies once the close callback has been run.
void metric_timer::wait() { _completed.wait(); }

void percentile_timer::on_close()
// Finishes closing the timer: runs the close callback supplied at construction, then
// notifies `_completed` so that a caller waiting in wait() can proceed.
void metric_timer::on_close()
{
// The callback runs before the notification, so it has finished by the time wait() returns.
_on_close();
_completed.notify();
}

void percentile_timer::on_timer(const boost::system::error_code &ec)
void metric_timer::on_timer(const boost::system::error_code &ec)
{
// This macro is defined for the case that handlers for asynchronous wait operations are no
// longer cancelled. It just checks the internal state atomically (since close() can also be
Expand All @@ -465,7 +620,7 @@ void percentile_timer::on_timer(const boost::system::error_code &ec)
// Cancel can only be launched by close().
auto expected_state = state::kClosing;
CHECK(_state.compare_exchange_strong(expected_state, state::kClosed),
"wrong state for percentile_timer: {}, while expecting closing state",
"wrong state for metric_timer: {}, while expecting closing state",
static_cast<int>(expected_state));
on_close();

Expand All @@ -477,7 +632,7 @@ void percentile_timer::on_timer(const boost::system::error_code &ec)

TRY_PROCESS_TIMER_CLOSING();
_timer->expires_from_now(boost::posix_time::milliseconds(_interval_ms));
_timer->async_wait(std::bind(&percentile_timer::on_timer, this, std::placeholders::_1));
_timer->async_wait(std::bind(&metric_timer::on_timer, this, std::placeholders::_1));
#undef TRY_PROCESS_TIMER_CLOSING
}

Expand Down
Loading