diff --git a/src/dist/replication/meta_server/meta_backup_service.cpp b/src/dist/replication/meta_server/meta_backup_service.cpp index a404c7938b..7caf4e76e5 100644 --- a/src/dist/replication/meta_server/meta_backup_service.cpp +++ b/src/dist/replication/meta_server/meta_backup_service.cpp @@ -77,7 +77,7 @@ void policy_context::start_backup_app_meta_unlocked(int32_t app_id) _backup_sig.c_str(), create_file_req.file_name.c_str()); tasking::enqueue(LPC_DEFAULT_CALLBACK, - nullptr, + &_tracker, [this, app_id]() { zauto_lock l(_lock); start_backup_app_meta_unlocked(app_id); @@ -113,7 +113,7 @@ void policy_context::start_backup_app_meta_unlocked(int32_t app_id) remote_file->file_name().c_str(), resp.err.to_string()); tasking::enqueue(LPC_DEFAULT_CALLBACK, - nullptr, + &_tracker, [this, app_id]() { zauto_lock l(_lock); start_backup_app_meta_unlocked(app_id); @@ -122,7 +122,7 @@ void policy_context::start_backup_app_meta_unlocked(int32_t app_id) _backup_service->backup_option().block_retry_delay_ms); } }, - nullptr); + &_tracker); } void policy_context::start_backup_app_partitions_unlocked(int32_t app_id) @@ -181,7 +181,7 @@ void policy_context::write_backup_app_finish_flag_unlocked(int32_t app_id, _backup_sig.c_str(), create_file_req.file_name.c_str()); tasking::enqueue(LPC_DEFAULT_CALLBACK, - nullptr, + &_tracker, [this, app_id, write_callback]() { zauto_lock l(_lock); write_backup_app_finish_flag_unlocked(app_id, write_callback); @@ -223,7 +223,7 @@ void policy_context::write_backup_app_finish_flag_unlocked(int32_t app_id, remote_file->file_name().c_str(), resp.err.to_string()); tasking::enqueue(LPC_DEFAULT_CALLBACK, - nullptr, + &_tracker, [this, app_id, write_callback]() { zauto_lock l(_lock); write_backup_app_finish_flag_unlocked(app_id, write_callback); @@ -245,9 +245,9 @@ void policy_context::finish_backup_app_unlocked(int32_t app_id) _cur_backup.end_time_ms = dsn_now_ms(); task_ptr write_backup_info_callback = - tasking::create_task(LPC_DEFAULT_CALLBACK, nullptr, [this]() { + tasking::create_task(LPC_DEFAULT_CALLBACK, &_tracker, [this]() { task_ptr start_a_new_backup = - tasking::create_task(LPC_DEFAULT_CALLBACK, nullptr, [this]() { + tasking::create_task(LPC_DEFAULT_CALLBACK, &_tracker, [this]() { zauto_lock l(_lock); auto iter = _backup_history.emplace(_cur_backup.backup_id, _cur_backup); dassert(iter.second, @@ -293,7 +293,7 @@ void policy_context::write_backup_info_unlocked(const backup_info &b_info, _backup_sig.c_str(), create_file_req.file_name.c_str()); tasking::enqueue(LPC_DEFAULT_CALLBACK, - nullptr, + &_tracker, [this, b_info, write_callback]() { zauto_lock l(_lock); write_backup_info_unlocked(b_info, write_callback); @@ -326,7 +326,7 @@ void policy_context::write_backup_info_unlocked(const backup_info &b_info, resp.err.to_string()); tasking::enqueue( LPC_DEFAULT_CALLBACK, - nullptr, + &_tracker, [this, b_info, write_callback]() { zauto_lock l(_lock); write_backup_info_unlocked(b_info, write_callback); @@ -383,7 +383,7 @@ bool policy_context::update_partition_progress_unlocked(gpid pid, // let's update the progress-chain: partition => app => current_backup_instance if (--_progress.unfinished_partitions_per_app[pid.get_app_id()] == 0) { dsn::task_ptr task_after_write_finish_flag = - tasking::create_task(LPC_DEFAULT_CALLBACK, nullptr, [this, pid]() { + tasking::create_task(LPC_DEFAULT_CALLBACK, &_tracker, [this, pid]() { zauto_lock l(_lock); finish_backup_app_unlocked(pid.get_app_id()); }); @@ -428,7 +428,7 @@ void policy_context::start_backup_partition_unlocked(gpid pid) pid.get_app_id(), pid.get_partition_index()); tasking::enqueue(LPC_DEFAULT_CALLBACK, - nullptr, + &_tracker, [this, pid]() { zauto_lock l(_lock); start_backup_partition_unlocked(pid); @@ -446,7 +446,7 @@ void policy_context::start_backup_partition_unlocked(gpid pid) dsn::marshall(request, req); dsn::rpc_response_task_ptr rpc_callback = rpc::create_rpc_response_task( request, - nullptr, + &_tracker, [this, pid, partition_primary](error_code err, backup_response &&response) { on_backup_reply(err, std::move(response), pid, partition_primary); }); @@ -521,7 +521,7 @@ void policy_context::on_backup_reply(error_code err, // start another turn of backup no matter we encounter error or not finished tasking::enqueue(LPC_DEFAULT_CALLBACK, - nullptr, + &_tracker, [this, pid]() { zauto_lock l(_lock); start_backup_partition_unlocked(pid); @@ -602,7 +602,7 @@ void policy_context::sync_backup_to_remote_storage_unlocked(const backup_info &b _policy.policy_name.c_str(), b_info.backup_id); tasking::enqueue(LPC_DEFAULT_CALLBACK, - nullptr, + &_tracker, [this, b_info, sync_callback, create_new_node]() { zauto_lock l(_lock); sync_backup_to_remote_storage_unlocked( @@ -635,7 +635,7 @@ void policy_context::continue_current_backup_unlocked() start_backup_app_meta_unlocked(app); } else { dsn::task_ptr task_after_write_finish_flag = - tasking::create_task(LPC_DEFAULT_CALLBACK, nullptr, [this, app]() { + tasking::create_task(LPC_DEFAULT_CALLBACK, &_tracker, [this, app]() { zauto_lock l(_lock); finish_backup_app_unlocked(app); }); @@ -705,7 +705,7 @@ void policy_context::issue_new_backup_unlocked() ddebug("%s: policy is disable, just ignore backup, try it later", _policy.policy_name.c_str()); tasking::enqueue(LPC_DEFAULT_CALLBACK, - nullptr, + &_tracker, [this]() { zauto_lock l(_lock); issue_new_backup_unlocked(); @@ -717,7 +717,7 @@ void policy_context::issue_new_backup_unlocked() if (!should_start_backup_unlocked()) { tasking::enqueue(LPC_DEFAULT_CALLBACK, - nullptr, + &_tracker, [this]() { zauto_lock l(_lock); issue_new_backup_unlocked(); @@ -737,7 +737,7 @@ void policy_context::issue_new_backup_unlocked() dwarn("%s: all apps have been dropped, ignore this backup and retry it later", _backup_sig.c_str()); tasking::enqueue(LPC_DEFAULT_CALLBACK, - nullptr, + &_tracker, [this]() { zauto_lock l(_lock); issue_new_backup_unlocked(); @@ -745,10 +745,11 @@ void policy_context::issue_new_backup_unlocked() 0, _backup_service->backup_option().issue_backup_interval_ms); } else { - task_ptr continue_to_backup = tasking::create_task(LPC_DEFAULT_CALLBACK, nullptr, [this]() { - zauto_lock l(_lock); - continue_current_backup_unlocked(); - }); + task_ptr continue_to_backup = + tasking::create_task(LPC_DEFAULT_CALLBACK, &_tracker, [this]() { + zauto_lock l(_lock); + continue_current_backup_unlocked(); + }); sync_backup_to_remote_storage_unlocked(_cur_backup, continue_to_backup, true); } } @@ -895,7 +896,7 @@ void policy_context::gc_backup_info_unlocked(const backup_info &info_to_gc) end_time); dsn::task_ptr sync_callback = - ::dsn::tasking::create_task(LPC_DEFAULT_CALLBACK, nullptr, [this, info_to_gc]() { + ::dsn::tasking::create_task(LPC_DEFAULT_CALLBACK, &_tracker, [this, info_to_gc]() { dist::block_service::remove_path_request req; req.path = cold_backup::get_backup_path( _backup_service->backup_root(), _policy.policy_name, info_to_gc.backup_id); @@ -907,7 +908,7 @@ void policy_context::gc_backup_info_unlocked(const backup_info &info_to_gc) // remove dir ok or dir is not exist if (resp.err == ERR_OK || resp.err == ERR_OBJECT_NOT_FOUND) { dsn::task_ptr remove_local_backup_info_task = tasking::create_task( - LPC_DEFAULT_CALLBACK, nullptr, [this, info_to_gc]() { + LPC_DEFAULT_CALLBACK, &_tracker, [this, info_to_gc]() { zauto_lock l(_lock); _backup_history.erase(info_to_gc.backup_id); issue_gc_backup_info_task_unlocked(); @@ -935,14 +936,14 @@ void policy_context::issue_gc_backup_info_task_unlocked() _policy.policy_name.c_str(), info.backup_id); - tasking::create_task(LPC_DEFAULT_CALLBACK, nullptr, [this, info]() { + tasking::create_task(LPC_DEFAULT_CALLBACK, &_tracker, [this, info]() { gc_backup_info_unlocked(info); })->enqueue(); } else { // there is no extra backup to gc, we just issue a new task to call // issue_gc_backup_info_task_unlocked later dinfo("%s: no need to gc backup info, start it later", _policy.policy_name.c_str()); - tasking::create_task(LPC_DEFAULT_CALLBACK, nullptr, [this]() { + tasking::create_task(LPC_DEFAULT_CALLBACK, &_tracker, [this]() { zauto_lock l(_lock); issue_gc_backup_info_task_unlocked(); })->enqueue(std::chrono::minutes(3)); @@ -983,7 +984,7 @@ void policy_context::sync_remove_backup_info(const backup_info &info, dsn::task_ _policy.policy_name.c_str()); tasking::enqueue( LPC_DEFAULT_CALLBACK, - nullptr, + &_tracker, [this, info, sync_callback]() { sync_remove_backup_info(info, sync_callback); }, 0, _backup_service->backup_option().meta_retry_delay_ms); @@ -1035,7 +1036,7 @@ void backup_service::start_create_policy_meta_root(dsn::task_ptr callback) _policy_meta_root.c_str()); dsn::tasking::enqueue( LPC_DEFAULT_CALLBACK, - nullptr, + &_tracker, std::bind(&backup_service::start_create_policy_meta_root, this, callback), 0, _opt.meta_retry_delay_ms); @@ -1063,7 +1064,7 @@ void backup_service::start_sync_policies() } else if (err == dsn::ERR_TIMEOUT) { derror("sync policies got timeout, retry it later"); dsn::tasking::enqueue(LPC_DEFAULT_CALLBACK, - nullptr, + &_tracker, std::bind(&backup_service::start_sync_policies, this), 0, _opt.meta_retry_delay_ms); @@ -1196,7 +1197,7 @@ error_code backup_service::sync_policies_from_remote_storage() void backup_service::start() { dsn::task_ptr after_create_policy_meta_root = - tasking::create_task(LPC_DEFAULT_CALLBACK, nullptr, [this]() { start_sync_policies(); }); + tasking::create_task(LPC_DEFAULT_CALLBACK, &_tracker, [this]() { start_sync_policies(); }); start_create_policy_meta_root(after_create_policy_meta_root); } @@ -1308,7 +1309,7 @@ void backup_service::do_add_policy(dsn::message_ex *req, "(ms)", _opt.meta_retry_delay_ms.count()); tasking::enqueue(LPC_DEFAULT_CALLBACK, - nullptr, + &_tracker, std::bind(&backup_service::do_add_policy, this, req, p, hint_msg), 0, _opt.meta_retry_delay_ms); @@ -1343,7 +1344,7 @@ void backup_service::do_update_policy_to_remote_storage( p.policy_name.c_str(), _opt.meta_retry_delay_ms.count()); tasking::enqueue(LPC_DEFAULT_CALLBACK, - nullptr, + &_tracker, std::bind(&backup_service::do_update_policy_to_remote_storage, this, req, diff --git a/src/dist/replication/meta_server/meta_backup_service.h b/src/dist/replication/meta_server/meta_backup_service.h index 3b4d55aa3e..eeebdf89b8 100644 --- a/src/dist/replication/meta_server/meta_backup_service.h +++ b/src/dist/replication/meta_server/meta_backup_service.h @@ -286,6 +286,7 @@ mock_private : perf_counter_wrapper _counter_policy_recent_backup_duration_ms; //clang-format on + dsn::task_tracker _tracker; }; class backup_service @@ -363,6 +364,7 @@ class backup_service backup_opt _opt; std::atomic_bool _in_initialize; + dsn::task_tracker _tracker; }; } } // namespace diff --git a/src/dist/replication/meta_server/server_state.cpp b/src/dist/replication/meta_server/server_state.cpp index 2a5f6f883d..af9478e629 100644 --- a/src/dist/replication/meta_server/server_state.cpp +++ b/src/dist/replication/meta_server/server_state.cpp @@ -990,7 +990,7 @@ void server_state::init_app_partition_node(std::shared_ptr &app, // TODO: add parameter of the retry time interval in config file tasking::enqueue( LPC_META_STATE_HIGH, - nullptr, + tracker(), std::bind(&server_state::init_app_partition_node, this, app, pidx, callback), 0, std::chrono::milliseconds(1000)); @@ -1022,7 +1022,7 @@ void server_state::do_app_create(std::shared_ptr &app) } else if (ERR_TIMEOUT == ec) { dwarn("the storage service is not available currently, continue to create later"); tasking::enqueue(LPC_META_STATE_HIGH, - nullptr, + tracker(), std::bind(&server_state::do_app_create, this, app), 0, std::chrono::seconds(1)); @@ -1127,7 +1127,7 @@ void server_state::do_app_drop(std::shared_ptr &app) } else if (ERR_TIMEOUT == ec) { dinfo("drop app(%s) prepare timeout, continue to drop later", app->get_logname()); tasking::enqueue(LPC_META_STATE_HIGH, - nullptr, + tracker(), std::bind(&server_state::do_app_drop, this, app), 0, std::chrono::seconds(1)); @@ -1510,7 +1510,7 @@ task_ptr server_state::update_configuration_on_remote( // NOTICE: pending_sync_task need to be reassigned return tasking::enqueue( LPC_META_STATE_HIGH, - nullptr, + tracker(), [this, config_request]() mutable { std::shared_ptr app = get_app(config_request->config.pid.get_app_id()); config_context &cc = @@ -1532,7 +1532,8 @@ task_ptr server_state::update_configuration_on_remote( std::bind(&server_state::on_update_configuration_on_remote_reply, this, std::placeholders::_1, - config_request)); + config_request), + tracker()); } void server_state::on_update_configuration_on_remote_reply( @@ -1549,7 +1550,7 @@ void server_state::on_update_configuration_on_remote_reply( if (ec == ERR_TIMEOUT) { cc.pending_sync_task = tasking::enqueue(LPC_META_STATE_HIGH, - nullptr, + tracker(), [this, config_request, &cc]() mutable { cc.pending_sync_task = update_configuration_on_remote(config_request); @@ -1603,7 +1604,7 @@ void server_state::recall_partition(std::shared_ptr &app, int pidx) process_one_partition(app); } else if (error == dsn::ERR_TIMEOUT) { tasking::enqueue(LPC_META_STATE_HIGH, - nullptr, + tracker(), std::bind(&server_state::recall_partition, this, app, pidx), server_state::sStateHash, std::chrono::seconds(1));