This repository has been archived by the owner on Jun 23, 2022. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 58
replica-server: remove checkpoint dirs when cold backup completed #216
Merged
Changes from 15 commits
Commits
Show all changes
16 commits
Select commit
Hold shift + click to select a range
c3cb025
replica-server: delete checkpoint dir when backup completed
qinzuoyan 8dcf7b5
add logging
qinzuoyan f0f7d81
improve
qinzuoyan 4fb589b
smallfix
qinzuoyan aeee6a2
improve
qinzuoyan 66ca280
improve
qinzuoyan c21c80a
improve
qinzuoyan 41d8b74
improve
qinzuoyan d53f853
improve
qinzuoyan ab5eb6b
add comment
qinzuoyan aa512dd
smallfix
qinzuoyan 53fa032
fix typo
qinzuoyan 65be386
fix test
qinzuoyan 7c966e2
fix test
qinzuoyan 626f1d5
fix test
qinzuoyan 117dfc9
Merge branch 'master' into qinzuoyan
qinzuoyan File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,6 +1,7 @@ | ||
#include <boost/lexical_cast.hpp> | ||
|
||
#include <dsn/utility/filesystem.h> | ||
#include <dsn/dist/fmt_logging.h> | ||
#include <dsn/dist/replication/replication_app_base.h> | ||
|
||
#include "dist/replication/common/block_service_manager.h" | ||
|
@@ -13,6 +14,7 @@ | |
namespace dsn { | ||
namespace replication { | ||
|
||
// backup_id == 0 means clear backup context and checkpoint dirs of the policy. | ||
void replica::on_cold_backup(const backup_request &request, /*out*/ backup_response &response) | ||
{ | ||
_checker.only_one_thread_access(); | ||
|
@@ -22,13 +24,30 @@ void replica::on_cold_backup(const backup_request &request, /*out*/ backup_respo | |
cold_backup_context_ptr new_context( | ||
new cold_backup_context(this, request, _options->max_concurrent_uploading_file_count)); | ||
|
||
ddebug("%s: received cold backup request, partition_status = %s%s", | ||
new_context->name, | ||
enum_to_string(status()), | ||
backup_id == 0 ? ", this is a clear request" : ""); | ||
|
||
if (status() == partition_status::type::PS_PRIMARY || | ||
status() == partition_status::type::PS_SECONDARY) { | ||
cold_backup_context_ptr backup_context = nullptr; | ||
auto find = _cold_backup_contexts.find(policy_name); | ||
if (find != _cold_backup_contexts.end()) { | ||
backup_context = find->second; | ||
} else { | ||
if (backup_id == 0) { | ||
if (status() == partition_status::type::PS_PRIMARY) { | ||
// send clear request to secondaries | ||
send_backup_request_to_secondary(request); | ||
} | ||
// clear local checkpoint dirs in background thread | ||
tasking::enqueue(LPC_BACKGROUND_COLD_BACKUP, &_tracker, [this, policy_name]() { | ||
clear_backup_checkpoint(policy_name); | ||
}); | ||
return; | ||
} | ||
|
||
/// TODO: policy may change provider | ||
dist::block_service::block_filesystem *block_service = | ||
_stub->_block_service_manager.get_block_filesystem( | ||
|
@@ -55,17 +74,29 @@ void replica::on_cold_backup(const backup_request &request, /*out*/ backup_respo | |
policy_name.c_str()); | ||
cold_backup_status backup_status = backup_context->status(); | ||
|
||
if (backup_context->request.backup_id < backup_id || backup_status == ColdBackupCanceled) { | ||
// clear obsoleted backup firstly | ||
/// TODO: clear dir | ||
ddebug("%s: clear obsoleted cold backup, old_backup_id = %" PRId64 | ||
if (backup_id == 0 || backup_context->request.backup_id < backup_id || | ||
backup_status == ColdBackupCanceled) { | ||
// clear obsoleted backup context firstly | ||
ddebug("%s: clear obsoleted cold backup context, old_backup_id = %" PRId64 | ||
", old_backup_status = %s", | ||
new_context->name, | ||
backup_context->request.backup_id, | ||
cold_backup_status_to_string(backup_status)); | ||
backup_context->cancel(); | ||
_cold_backup_contexts.erase(policy_name); | ||
on_cold_backup(request, response); | ||
if (backup_id != 0) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 现在的backup_id是开始备份的时间戳吧?所以肯定不是0? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 是的,是时间戳 |
||
// go to another round | ||
on_cold_backup(request, response); | ||
} else { // backup_id == 0 | ||
if (status() == partition_status::type::PS_PRIMARY) { | ||
// send clear request to secondaries | ||
send_backup_request_to_secondary(request); | ||
} | ||
// clear local checkpoint dirs in background thread | ||
tasking::enqueue(LPC_BACKGROUND_COLD_BACKUP, &_tracker, [this, policy_name]() { | ||
clear_backup_checkpoint(policy_name); | ||
}); | ||
} | ||
return; | ||
} | ||
|
||
|
@@ -144,6 +175,14 @@ void replica::on_cold_backup(const backup_request &request, /*out*/ backup_respo | |
_cold_backup_contexts.erase(policy_name); | ||
} else if (backup_status == ColdBackupCompleted) { | ||
ddebug("%s: upload checkpoint completed, response ERR_OK", backup_context->name); | ||
// send clear request to secondaries | ||
backup_request new_request = request; | ||
new_request.backup_id = 0; | ||
send_backup_request_to_secondary(new_request); | ||
// clear local checkpoint dirs in background thread | ||
tasking::enqueue(LPC_BACKGROUND_COLD_BACKUP, &_tracker, [this, policy_name]() { | ||
clear_backup_checkpoint(policy_name); | ||
}); | ||
response.err = ERR_OK; | ||
} else { | ||
dwarn( | ||
|
@@ -201,6 +240,39 @@ backup_get_tmp_dir_name(const std::string &policy_name, int64_t backup_id, int64 | |
return std::string(buffer); | ||
} | ||
|
||
// returns true if this checkpoint dir belongs to the policy | ||
static bool is_policy_checkpoint(const std::string &chkpt_dirname, const std::string &policy_name) | ||
{ | ||
std::vector<std::string> strs; | ||
utils::split_args(chkpt_dirname.c_str(), strs, '.'); | ||
// backup_tmp.<policy_name>.* or backup.<policy_name>.* | ||
return strs.size() >= 2 && | ||
(strs[0] == std::string("backup_tmp") || strs[0] == std::string("backup")) && | ||
strs[1] == policy_name; | ||
} | ||
|
||
// get all backup checkpoint dirs which belong to the policy | ||
static bool get_policy_checkpoint_dirs(const std::string &dir, | ||
const std::string &policy, | ||
/*out*/ std::vector<std::string> &chkpt_dirs) | ||
{ | ||
chkpt_dirs.clear(); | ||
// list sub dirs | ||
std::vector<std::string> sub_dirs; | ||
if (!utils::filesystem::get_subdirectories(dir, sub_dirs, false)) { | ||
derror("list sub dirs of dir %s failed", dir.c_str()); | ||
return false; | ||
} | ||
|
||
for (std::string &d : sub_dirs) { | ||
std::string dirname = utils::filesystem::get_file_name(d); | ||
if (is_policy_checkpoint(dirname, policy)) { | ||
chkpt_dirs.emplace_back(std::move(dirname)); | ||
} | ||
} | ||
return true; | ||
} | ||
|
||
// returns: | ||
// 0 : not related | ||
// 1 : related (belong to this policy but not belong to this backup_context) | ||
|
@@ -263,13 +335,13 @@ static bool filter_checkpoint(const std::string &dir, | |
related_chkpt_dirs.clear(); | ||
// list sub dirs | ||
std::vector<std::string> sub_dirs; | ||
if (!dsn::utils::filesystem::get_subdirectories(dir, sub_dirs, false)) { | ||
if (!utils::filesystem::get_subdirectories(dir, sub_dirs, false)) { | ||
derror("%s: list sub dirs of dir %s failed", backup_context->name, dir.c_str()); | ||
return false; | ||
} | ||
|
||
for (std::string &d : sub_dirs) { | ||
std::string dirname = ::dsn::utils::filesystem::get_file_name(d); | ||
std::string dirname = utils::filesystem::get_file_name(d); | ||
int ret = is_related_or_valid_checkpoint(dirname, backup_context); | ||
if (ret == 1) { | ||
related_chkpt_dirs.emplace_back(std::move(dirname)); | ||
|
@@ -291,7 +363,7 @@ statistic_file_infos_under_dir(const std::string &dir, | |
/*out*/ int64_t &total_size) | ||
{ | ||
std::vector<std::string> sub_files; | ||
if (!dsn::utils::filesystem::get_subfiles(dir, sub_files, false)) { | ||
if (!utils::filesystem::get_subfiles(dir, sub_files, false)) { | ||
derror("list sub files of dir %s failed", dir.c_str()); | ||
return false; | ||
} | ||
|
@@ -302,11 +374,11 @@ statistic_file_infos_under_dir(const std::string &dir, | |
for (std::string &file : sub_files) { | ||
std::pair<std::string, int64_t> file_info; | ||
|
||
if (!::dsn::utils::filesystem::file_size(file, file_info.second)) { | ||
if (!utils::filesystem::file_size(file, file_info.second)) { | ||
derror("get file size of %s failed", file.c_str()); | ||
return false; | ||
} | ||
file_info.first = ::dsn::utils::filesystem::get_file_name(file); | ||
file_info.first = utils::filesystem::get_file_name(file); | ||
total_size += file_info.second; | ||
|
||
file_infos.emplace_back(std::move(file_info)); | ||
|
@@ -334,6 +406,29 @@ static bool backup_parse_dir_name(const char *name, | |
} | ||
} | ||
|
||
// clear all checkpoint dirs of the policy | ||
void replica::clear_backup_checkpoint(const std::string &policy_name) | ||
{ | ||
ddebug_replica("clear all checkpoint dirs of policy({})", policy_name); | ||
auto backup_dir = _app->backup_dir(); | ||
if (!utils::filesystem::directory_exists(backup_dir)) { | ||
return; | ||
} | ||
std::vector<std::string> chkpt_dirs; | ||
if (!get_policy_checkpoint_dirs(backup_dir, policy_name, chkpt_dirs)) { | ||
dwarn_replica("get checkpoint dirs in backup dir({}) failed", backup_dir); | ||
return; | ||
} | ||
for (const std::string &dirname : chkpt_dirs) { | ||
std::string full_path = utils::filesystem::path_combine(backup_dir, dirname); | ||
if (utils::filesystem::remove_path(full_path)) { | ||
ddebug_replica("remove backup checkpoint dir({}) succeed", full_path); | ||
} else { | ||
dwarn_replica("remove backup checkpoint dir({}) failed", full_path); | ||
} | ||
} | ||
} | ||
|
||
// run in REPLICATION_LONG thread | ||
// Effection: | ||
// - may ignore_checkpoint() if in invalid status | ||
|
@@ -353,8 +448,8 @@ void replica::generate_backup_checkpoint(cold_backup_context_ptr backup_context) | |
|
||
// prepare back dir | ||
auto backup_dir = _app->backup_dir(); | ||
if (!dsn::utils::filesystem::directory_exists(backup_dir) && | ||
!dsn::utils::filesystem::create_directory(backup_dir)) { | ||
if (!utils::filesystem::directory_exists(backup_dir) && | ||
!utils::filesystem::create_directory(backup_dir)) { | ||
derror("%s: create backup dir %s failed", backup_context->name, backup_dir.c_str()); | ||
backup_context->fail_checkpoint("create backup dir failed"); | ||
return; | ||
|
@@ -372,7 +467,7 @@ void replica::generate_backup_checkpoint(cold_backup_context_ptr backup_context) | |
std::vector<std::pair<std::string, int64_t>> file_infos; | ||
int64_t total_size = 0; | ||
std::string valid_chkpt_full_path = | ||
::dsn::utils::filesystem::path_combine(backup_dir, valid_backup_chkpt_dirname); | ||
utils::filesystem::path_combine(backup_dir, valid_backup_chkpt_dirname); | ||
// parse checkpoint dirname | ||
std::string policy_name; | ||
int64_t backup_id = 0, decree = 0, timestamp = 0; | ||
|
@@ -423,11 +518,11 @@ void replica::generate_backup_checkpoint(cold_backup_context_ptr backup_context) | |
|
||
// clear related but not valid checkpoint | ||
for (const std::string &dirname : related_backup_chkpt_dirname) { | ||
std::string full_path = ::dsn::utils::filesystem::path_combine(backup_dir, dirname); | ||
std::string full_path = utils::filesystem::path_combine(backup_dir, dirname); | ||
ddebug("%s: found obsolete backup checkpoint dir(%s), remove it", | ||
backup_context->name, | ||
full_path.c_str()); | ||
if (!dsn::utils::filesystem::remove_path(full_path)) { | ||
if (!utils::filesystem::remove_path(full_path)) { | ||
dwarn("%s: remove obsolete backup checkpoint dir(%s) failed", | ||
backup_context->name, | ||
full_path.c_str()); | ||
|
@@ -578,7 +673,7 @@ void replica::local_create_backup_checkpoint(cold_backup_context_ptr backup_cont | |
|
||
// the real checkpoint decree may be larger than backup_context->checkpoint_decree, | ||
// so we need copy checkpoint to backup_checkpoint_tmp_dir_path, and then rename it. | ||
std::string backup_checkpoint_tmp_dir_path = ::dsn::utils::filesystem::path_combine( | ||
std::string backup_checkpoint_tmp_dir_path = utils::filesystem::path_combine( | ||
_app->backup_dir(), | ||
backup_get_tmp_dir_name(backup_context->request.policy.policy_name, | ||
backup_context->request.backup_id, | ||
|
@@ -592,7 +687,7 @@ void replica::local_create_backup_checkpoint(cold_backup_context_ptr backup_cont | |
"local_create_backup_checkpoint 10s later", | ||
backup_context->name, | ||
err.to_string()); | ||
dsn::utils::filesystem::remove_path(backup_checkpoint_tmp_dir_path); | ||
utils::filesystem::remove_path(backup_checkpoint_tmp_dir_path); | ||
tasking::enqueue( | ||
LPC_BACKGROUND_COLD_BACKUP, | ||
&_tracker, | ||
|
@@ -605,20 +700,20 @@ void replica::local_create_backup_checkpoint(cold_backup_context_ptr backup_cont | |
last_decree, | ||
backup_context->checkpoint_decree); | ||
backup_context->checkpoint_decree = last_decree; // update to real decree | ||
std::string backup_checkpoint_dir_path = ::dsn::utils::filesystem::path_combine( | ||
std::string backup_checkpoint_dir_path = utils::filesystem::path_combine( | ||
_app->backup_dir(), | ||
backup_get_dir_name(backup_context->request.policy.policy_name, | ||
backup_context->request.backup_id, | ||
backup_context->checkpoint_decree, | ||
backup_context->checkpoint_timestamp)); | ||
if (!dsn::utils::filesystem::rename_path(backup_checkpoint_tmp_dir_path, | ||
backup_checkpoint_dir_path)) { | ||
if (!utils::filesystem::rename_path(backup_checkpoint_tmp_dir_path, | ||
backup_checkpoint_dir_path)) { | ||
derror("%s: rename checkpoint dir(%s) to dir(%s) failed", | ||
backup_context->name, | ||
backup_checkpoint_tmp_dir_path.c_str(), | ||
backup_checkpoint_dir_path.c_str()); | ||
dsn::utils::filesystem::remove_path(backup_checkpoint_tmp_dir_path); | ||
dsn::utils::filesystem::remove_path(backup_checkpoint_dir_path); | ||
utils::filesystem::remove_path(backup_checkpoint_tmp_dir_path); | ||
utils::filesystem::remove_path(backup_checkpoint_dir_path); | ||
backup_context->fail_checkpoint("rename checkpoint dir failed"); | ||
return; | ||
} | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
感觉这个 check 应该放在 send_backup_request_to_secondary 里面,这样就不用每次调用都写一遍
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
也就多写一遍,而且外面确实就需要检查一下,调用这个函数的角色就应当是primary