Skip to content
This repository has been archived by the owner on Jun 23, 2022. It is now read-only.

Commit

Permalink
fix(meta): bind task_tracker for each task in meta server (#344)
Browse files Browse the repository at this point in the history
  • Loading branch information
Smityz authored and acelyc111 committed Dec 3, 2019
1 parent d926d3a commit 0de9e39
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 39 deletions.
65 changes: 33 additions & 32 deletions src/dist/replication/meta_server/meta_backup_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ void policy_context::start_backup_app_meta_unlocked(int32_t app_id)
_backup_sig.c_str(),
create_file_req.file_name.c_str());
tasking::enqueue(LPC_DEFAULT_CALLBACK,
nullptr,
&_tracker,
[this, app_id]() {
zauto_lock l(_lock);
start_backup_app_meta_unlocked(app_id);
Expand Down Expand Up @@ -113,7 +113,7 @@ void policy_context::start_backup_app_meta_unlocked(int32_t app_id)
remote_file->file_name().c_str(),
resp.err.to_string());
tasking::enqueue(LPC_DEFAULT_CALLBACK,
nullptr,
&_tracker,
[this, app_id]() {
zauto_lock l(_lock);
start_backup_app_meta_unlocked(app_id);
Expand All @@ -122,7 +122,7 @@ void policy_context::start_backup_app_meta_unlocked(int32_t app_id)
_backup_service->backup_option().block_retry_delay_ms);
}
},
nullptr);
&_tracker);
}

void policy_context::start_backup_app_partitions_unlocked(int32_t app_id)
Expand Down Expand Up @@ -181,7 +181,7 @@ void policy_context::write_backup_app_finish_flag_unlocked(int32_t app_id,
_backup_sig.c_str(),
create_file_req.file_name.c_str());
tasking::enqueue(LPC_DEFAULT_CALLBACK,
nullptr,
&_tracker,
[this, app_id, write_callback]() {
zauto_lock l(_lock);
write_backup_app_finish_flag_unlocked(app_id, write_callback);
Expand Down Expand Up @@ -223,7 +223,7 @@ void policy_context::write_backup_app_finish_flag_unlocked(int32_t app_id,
remote_file->file_name().c_str(),
resp.err.to_string());
tasking::enqueue(LPC_DEFAULT_CALLBACK,
nullptr,
&_tracker,
[this, app_id, write_callback]() {
zauto_lock l(_lock);
write_backup_app_finish_flag_unlocked(app_id, write_callback);
Expand All @@ -245,9 +245,9 @@ void policy_context::finish_backup_app_unlocked(int32_t app_id)
_cur_backup.end_time_ms = dsn_now_ms();

task_ptr write_backup_info_callback =
tasking::create_task(LPC_DEFAULT_CALLBACK, nullptr, [this]() {
tasking::create_task(LPC_DEFAULT_CALLBACK, &_tracker, [this]() {
task_ptr start_a_new_backup =
tasking::create_task(LPC_DEFAULT_CALLBACK, nullptr, [this]() {
tasking::create_task(LPC_DEFAULT_CALLBACK, &_tracker, [this]() {
zauto_lock l(_lock);
auto iter = _backup_history.emplace(_cur_backup.backup_id, _cur_backup);
dassert(iter.second,
Expand Down Expand Up @@ -293,7 +293,7 @@ void policy_context::write_backup_info_unlocked(const backup_info &b_info,
_backup_sig.c_str(),
create_file_req.file_name.c_str());
tasking::enqueue(LPC_DEFAULT_CALLBACK,
nullptr,
&_tracker,
[this, b_info, write_callback]() {
zauto_lock l(_lock);
write_backup_info_unlocked(b_info, write_callback);
Expand Down Expand Up @@ -326,7 +326,7 @@ void policy_context::write_backup_info_unlocked(const backup_info &b_info,
resp.err.to_string());
tasking::enqueue(
LPC_DEFAULT_CALLBACK,
nullptr,
&_tracker,
[this, b_info, write_callback]() {
zauto_lock l(_lock);
write_backup_info_unlocked(b_info, write_callback);
Expand Down Expand Up @@ -383,7 +383,7 @@ bool policy_context::update_partition_progress_unlocked(gpid pid,
// let's update the progress-chain: partition => app => current_backup_instance
if (--_progress.unfinished_partitions_per_app[pid.get_app_id()] == 0) {
dsn::task_ptr task_after_write_finish_flag =
tasking::create_task(LPC_DEFAULT_CALLBACK, nullptr, [this, pid]() {
tasking::create_task(LPC_DEFAULT_CALLBACK, &_tracker, [this, pid]() {
zauto_lock l(_lock);
finish_backup_app_unlocked(pid.get_app_id());
});
Expand Down Expand Up @@ -428,7 +428,7 @@ void policy_context::start_backup_partition_unlocked(gpid pid)
pid.get_app_id(),
pid.get_partition_index());
tasking::enqueue(LPC_DEFAULT_CALLBACK,
nullptr,
&_tracker,
[this, pid]() {
zauto_lock l(_lock);
start_backup_partition_unlocked(pid);
Expand All @@ -446,7 +446,7 @@ void policy_context::start_backup_partition_unlocked(gpid pid)
dsn::marshall(request, req);
dsn::rpc_response_task_ptr rpc_callback = rpc::create_rpc_response_task(
request,
nullptr,
&_tracker,
[this, pid, partition_primary](error_code err, backup_response &&response) {
on_backup_reply(err, std::move(response), pid, partition_primary);
});
Expand Down Expand Up @@ -521,7 +521,7 @@ void policy_context::on_backup_reply(error_code err,

// start another round of backup regardless of whether we hit an error or the backup has not finished yet
tasking::enqueue(LPC_DEFAULT_CALLBACK,
nullptr,
&_tracker,
[this, pid]() {
zauto_lock l(_lock);
start_backup_partition_unlocked(pid);
Expand Down Expand Up @@ -602,7 +602,7 @@ void policy_context::sync_backup_to_remote_storage_unlocked(const backup_info &b
_policy.policy_name.c_str(),
b_info.backup_id);
tasking::enqueue(LPC_DEFAULT_CALLBACK,
nullptr,
&_tracker,
[this, b_info, sync_callback, create_new_node]() {
zauto_lock l(_lock);
sync_backup_to_remote_storage_unlocked(
Expand Down Expand Up @@ -635,7 +635,7 @@ void policy_context::continue_current_backup_unlocked()
start_backup_app_meta_unlocked(app);
} else {
dsn::task_ptr task_after_write_finish_flag =
tasking::create_task(LPC_DEFAULT_CALLBACK, nullptr, [this, app]() {
tasking::create_task(LPC_DEFAULT_CALLBACK, &_tracker, [this, app]() {
zauto_lock l(_lock);
finish_backup_app_unlocked(app);
});
Expand Down Expand Up @@ -705,7 +705,7 @@ void policy_context::issue_new_backup_unlocked()
ddebug("%s: policy is disable, just ignore backup, try it later",
_policy.policy_name.c_str());
tasking::enqueue(LPC_DEFAULT_CALLBACK,
nullptr,
&_tracker,
[this]() {
zauto_lock l(_lock);
issue_new_backup_unlocked();
Expand All @@ -717,7 +717,7 @@ void policy_context::issue_new_backup_unlocked()

if (!should_start_backup_unlocked()) {
tasking::enqueue(LPC_DEFAULT_CALLBACK,
nullptr,
&_tracker,
[this]() {
zauto_lock l(_lock);
issue_new_backup_unlocked();
Expand All @@ -737,18 +737,19 @@ void policy_context::issue_new_backup_unlocked()
dwarn("%s: all apps have been dropped, ignore this backup and retry it later",
_backup_sig.c_str());
tasking::enqueue(LPC_DEFAULT_CALLBACK,
nullptr,
&_tracker,
[this]() {
zauto_lock l(_lock);
issue_new_backup_unlocked();
},
0,
_backup_service->backup_option().issue_backup_interval_ms);
} else {
task_ptr continue_to_backup = tasking::create_task(LPC_DEFAULT_CALLBACK, nullptr, [this]() {
zauto_lock l(_lock);
continue_current_backup_unlocked();
});
task_ptr continue_to_backup =
tasking::create_task(LPC_DEFAULT_CALLBACK, &_tracker, [this]() {
zauto_lock l(_lock);
continue_current_backup_unlocked();
});
sync_backup_to_remote_storage_unlocked(_cur_backup, continue_to_backup, true);
}
}
Expand Down Expand Up @@ -895,7 +896,7 @@ void policy_context::gc_backup_info_unlocked(const backup_info &info_to_gc)
end_time);

dsn::task_ptr sync_callback =
::dsn::tasking::create_task(LPC_DEFAULT_CALLBACK, nullptr, [this, info_to_gc]() {
::dsn::tasking::create_task(LPC_DEFAULT_CALLBACK, &_tracker, [this, info_to_gc]() {
dist::block_service::remove_path_request req;
req.path = cold_backup::get_backup_path(
_backup_service->backup_root(), _policy.policy_name, info_to_gc.backup_id);
Expand All @@ -907,7 +908,7 @@ void policy_context::gc_backup_info_unlocked(const backup_info &info_to_gc)
// the directory was removed successfully, or it did not exist
if (resp.err == ERR_OK || resp.err == ERR_OBJECT_NOT_FOUND) {
dsn::task_ptr remove_local_backup_info_task = tasking::create_task(
LPC_DEFAULT_CALLBACK, nullptr, [this, info_to_gc]() {
LPC_DEFAULT_CALLBACK, &_tracker, [this, info_to_gc]() {
zauto_lock l(_lock);
_backup_history.erase(info_to_gc.backup_id);
issue_gc_backup_info_task_unlocked();
Expand Down Expand Up @@ -935,14 +936,14 @@ void policy_context::issue_gc_backup_info_task_unlocked()
_policy.policy_name.c_str(),
info.backup_id);

tasking::create_task(LPC_DEFAULT_CALLBACK, nullptr, [this, info]() {
tasking::create_task(LPC_DEFAULT_CALLBACK, &_tracker, [this, info]() {
gc_backup_info_unlocked(info);
})->enqueue();
} else {
// there is no extra backup to gc, we just issue a new task to call
// issue_gc_backup_info_task_unlocked later
dinfo("%s: no need to gc backup info, start it later", _policy.policy_name.c_str());
tasking::create_task(LPC_DEFAULT_CALLBACK, nullptr, [this]() {
tasking::create_task(LPC_DEFAULT_CALLBACK, &_tracker, [this]() {
zauto_lock l(_lock);
issue_gc_backup_info_task_unlocked();
})->enqueue(std::chrono::minutes(3));
Expand Down Expand Up @@ -983,7 +984,7 @@ void policy_context::sync_remove_backup_info(const backup_info &info, dsn::task_
_policy.policy_name.c_str());
tasking::enqueue(
LPC_DEFAULT_CALLBACK,
nullptr,
&_tracker,
[this, info, sync_callback]() { sync_remove_backup_info(info, sync_callback); },
0,
_backup_service->backup_option().meta_retry_delay_ms);
Expand Down Expand Up @@ -1035,7 +1036,7 @@ void backup_service::start_create_policy_meta_root(dsn::task_ptr callback)
_policy_meta_root.c_str());
dsn::tasking::enqueue(
LPC_DEFAULT_CALLBACK,
nullptr,
&_tracker,
std::bind(&backup_service::start_create_policy_meta_root, this, callback),
0,
_opt.meta_retry_delay_ms);
Expand Down Expand Up @@ -1063,7 +1064,7 @@ void backup_service::start_sync_policies()
} else if (err == dsn::ERR_TIMEOUT) {
derror("sync policies got timeout, retry it later");
dsn::tasking::enqueue(LPC_DEFAULT_CALLBACK,
nullptr,
&_tracker,
std::bind(&backup_service::start_sync_policies, this),
0,
_opt.meta_retry_delay_ms);
Expand Down Expand Up @@ -1196,7 +1197,7 @@ error_code backup_service::sync_policies_from_remote_storage()
void backup_service::start()
{
dsn::task_ptr after_create_policy_meta_root =
tasking::create_task(LPC_DEFAULT_CALLBACK, nullptr, [this]() { start_sync_policies(); });
tasking::create_task(LPC_DEFAULT_CALLBACK, &_tracker, [this]() { start_sync_policies(); });
start_create_policy_meta_root(after_create_policy_meta_root);
}

Expand Down Expand Up @@ -1308,7 +1309,7 @@ void backup_service::do_add_policy(dsn::message_ex *req,
"(ms)",
_opt.meta_retry_delay_ms.count());
tasking::enqueue(LPC_DEFAULT_CALLBACK,
nullptr,
&_tracker,
std::bind(&backup_service::do_add_policy, this, req, p, hint_msg),
0,
_opt.meta_retry_delay_ms);
Expand Down Expand Up @@ -1343,7 +1344,7 @@ void backup_service::do_update_policy_to_remote_storage(
p.policy_name.c_str(),
_opt.meta_retry_delay_ms.count());
tasking::enqueue(LPC_DEFAULT_CALLBACK,
nullptr,
&_tracker,
std::bind(&backup_service::do_update_policy_to_remote_storage,
this,
req,
Expand Down
2 changes: 2 additions & 0 deletions src/dist/replication/meta_server/meta_backup_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,7 @@ mock_private :

perf_counter_wrapper _counter_policy_recent_backup_duration_ms;
//clang-format on
dsn::task_tracker _tracker;
};

class backup_service
Expand Down Expand Up @@ -363,6 +364,7 @@ class backup_service

backup_opt _opt;
std::atomic_bool _in_initialize;
dsn::task_tracker _tracker;
};
}
} // namespace
15 changes: 8 additions & 7 deletions src/dist/replication/meta_server/server_state.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -990,7 +990,7 @@ void server_state::init_app_partition_node(std::shared_ptr<app_state> &app,
// TODO: add parameter of the retry time interval in config file
tasking::enqueue(
LPC_META_STATE_HIGH,
nullptr,
tracker(),
std::bind(&server_state::init_app_partition_node, this, app, pidx, callback),
0,
std::chrono::milliseconds(1000));
Expand Down Expand Up @@ -1022,7 +1022,7 @@ void server_state::do_app_create(std::shared_ptr<app_state> &app)
} else if (ERR_TIMEOUT == ec) {
dwarn("the storage service is not available currently, continue to create later");
tasking::enqueue(LPC_META_STATE_HIGH,
nullptr,
tracker(),
std::bind(&server_state::do_app_create, this, app),
0,
std::chrono::seconds(1));
Expand Down Expand Up @@ -1127,7 +1127,7 @@ void server_state::do_app_drop(std::shared_ptr<app_state> &app)
} else if (ERR_TIMEOUT == ec) {
dinfo("drop app(%s) prepare timeout, continue to drop later", app->get_logname());
tasking::enqueue(LPC_META_STATE_HIGH,
nullptr,
tracker(),
std::bind(&server_state::do_app_drop, this, app),
0,
std::chrono::seconds(1));
Expand Down Expand Up @@ -1510,7 +1510,7 @@ task_ptr server_state::update_configuration_on_remote(
// NOTICE: pending_sync_task need to be reassigned
return tasking::enqueue(
LPC_META_STATE_HIGH,
nullptr,
tracker(),
[this, config_request]() mutable {
std::shared_ptr<app_state> app = get_app(config_request->config.pid.get_app_id());
config_context &cc =
Expand All @@ -1532,7 +1532,8 @@ task_ptr server_state::update_configuration_on_remote(
std::bind(&server_state::on_update_configuration_on_remote_reply,
this,
std::placeholders::_1,
config_request));
config_request),
tracker());
}

void server_state::on_update_configuration_on_remote_reply(
Expand All @@ -1549,7 +1550,7 @@ void server_state::on_update_configuration_on_remote_reply(
if (ec == ERR_TIMEOUT) {
cc.pending_sync_task =
tasking::enqueue(LPC_META_STATE_HIGH,
nullptr,
tracker(),
[this, config_request, &cc]() mutable {
cc.pending_sync_task =
update_configuration_on_remote(config_request);
Expand Down Expand Up @@ -1603,7 +1604,7 @@ void server_state::recall_partition(std::shared_ptr<app_state> &app, int pidx)
process_one_partition(app);
} else if (error == dsn::ERR_TIMEOUT) {
tasking::enqueue(LPC_META_STATE_HIGH,
nullptr,
tracker(),
std::bind(&server_state::recall_partition, this, app, pidx),
server_state::sStateHash,
std::chrono::seconds(1));
Expand Down

0 comments on commit 0de9e39

Please sign in to comment.