From 08aabc5e44e835eb48059fe7b3cac5fa2814d4b2 Mon Sep 17 00:00:00 2001 From: lu_peng_fan Date: Thu, 30 May 2024 09:54:23 +0800 Subject: [PATCH 1/7] fix(bulkload) bulkload downloading may cause many node coredump Resolve https://github.com/apache/incubator-pegasus/issues/2006 --- src/replica/bulk_load/replica_bulk_loader.cpp | 39 +++++++++++++++++-- 1 file changed, 35 insertions(+), 4 deletions(-) diff --git a/src/replica/bulk_load/replica_bulk_loader.cpp b/src/replica/bulk_load/replica_bulk_loader.cpp index b58afccc70..efe93a2c2e 100644 --- a/src/replica/bulk_load/replica_bulk_loader.cpp +++ b/src/replica/bulk_load/replica_bulk_loader.cpp @@ -534,7 +534,30 @@ void replica_bulk_loader::download_sst_file(const std::string &remote_dir, int32_t file_index, dist::block_service::block_filesystem *fs) { - const file_meta &f_meta = _metadata.files[file_index]; + if (_status != bulk_load_status::BLS_DOWNLOADING) { + LOG_WARNING_PREFIX("Cancel download_sst_file task, because bulk_load local_status is {}. 
" + "local_dir: {} , file_index is {}.", + enum_to_string(_status), + local_dir, + file_index); + return; + } + file_meta f_meta; + bool get_f_meta = true; + { + zauto_read_lock l(_lock); + if (file_index < _metadata.files.size()) { + f_meta = _metadata.files[file_index]; + } else { + get_f_meta = false; + } + } + if (!get_f_meta) { + LOG_WARNING_PREFIX("sst file index {} exceeds number of bulkload sst files, Cancel " + "download_sst_file task.", + file_index); + return; + } uint64_t f_size = 0; std::string f_md5; error_code ec = _stub->_block_service_manager.download_file( @@ -590,9 +613,17 @@ void replica_bulk_loader::download_sst_file(const std::string &remote_dir, METRIC_VAR_INCREMENT_BY(bulk_load_download_file_bytes, f_size); // download next file - if (file_index + 1 < _metadata.files.size()) { - const file_meta &next_f_meta = _metadata.files[file_index + 1]; - _download_files_task[next_f_meta.name] = + get_f_meta = true; + { + zauto_read_lock l(_lock); + if (file_index + 1 < _metadata.files.size()) { + f_meta = _metadata.files[file_index + 1]; + } else { + get_f_meta = false; + } + } + if (get_f_meta) { + _download_files_task[f_meta.name] = tasking::enqueue(LPC_BACKGROUND_BULK_LOAD, tracker(), std::bind(&replica_bulk_loader::download_sst_file, From 2b90137a7cae311dbe0fa7adf11ea6513e966490 Mon Sep 17 00:00:00 2001 From: lupengfan1 Date: Wed, 24 Jul 2024 20:54:51 +0800 Subject: [PATCH 2/7] fix(bulkload) Follow acelyc111's advice, remove get_f_meta variable. 
--- src/replica/bulk_load/replica_bulk_loader.cpp | 13 +++---------- 1 file changed, 3 insertions(+), 10 deletions(-) diff --git a/src/replica/bulk_load/replica_bulk_loader.cpp b/src/replica/bulk_load/replica_bulk_loader.cpp index efe93a2c2e..697083cb76 100644 --- a/src/replica/bulk_load/replica_bulk_loader.cpp +++ b/src/replica/bulk_load/replica_bulk_loader.cpp @@ -543,18 +543,14 @@ void replica_bulk_loader::download_sst_file(const std::string &remote_dir, return; } file_meta f_meta; - bool get_f_meta = true; { zauto_read_lock l(_lock); if (file_index < _metadata.files.size()) { f_meta = _metadata.files[file_index]; - } else { - get_f_meta = false; } } - if (!get_f_meta) { - LOG_WARNING_PREFIX("sst file index {} exceeds number of bulkload sst files, Cancel " - "download_sst_file task.", + if (f_meta.name.empty()) { + LOG_WARNING_PREFIX("Cannot get file_meta of {}, cancel download_sst_file task.", file_index); return; } @@ -613,16 +609,13 @@ void replica_bulk_loader::download_sst_file(const std::string &remote_dir, METRIC_VAR_INCREMENT_BY(bulk_load_download_file_bytes, f_size); // download next file - get_f_meta = true; { zauto_read_lock l(_lock); if (file_index + 1 < _metadata.files.size()) { f_meta = _metadata.files[file_index + 1]; - } else { - get_f_meta = false; } } - if (get_f_meta) { + if (!f_meta.name.empty()) { _download_files_task[f_meta.name] = tasking::enqueue(LPC_BACKGROUND_BULK_LOAD, tracker(), From d040484bf89e69f273838ca9c8aaf256aed4c31e Mon Sep 17 00:00:00 2001 From: lupengfan1 Date: Tue, 6 Aug 2024 17:46:42 +0800 Subject: [PATCH 3/7] fix(Bulkload): Avoid use reference of `_metadata.files` in download_sst_file (#2006) `replica_bulk_loader::clear_bulk_load_states` cannot cancel an sst download task that is already running, and such a task accesses `_metadata.files` by reference. Because `clear_bulk_load_states` also clears `_metadata.files`, this causes a core dump. This patch solves the problem by making the download task use a copy of `_metadata.files`. 
--- src/replica/bulk_load/replica_bulk_loader.cpp | 83 +++++++++---------- src/replica/bulk_load/replica_bulk_loader.h | 2 +- 2 files changed, 38 insertions(+), 47 deletions(-) diff --git a/src/replica/bulk_load/replica_bulk_loader.cpp b/src/replica/bulk_load/replica_bulk_loader.cpp index 697083cb76..bda9fa8a97 100644 --- a/src/replica/bulk_load/replica_bulk_loader.cpp +++ b/src/replica/bulk_load/replica_bulk_loader.cpp @@ -474,11 +474,12 @@ error_code replica_bulk_loader::start_download(const std::string &remote_dir, // start download _is_downloading.store(true); - _download_task = tasking::enqueue( - LPC_BACKGROUND_BULK_LOAD, - tracker(), - std::bind( - &replica_bulk_loader::download_files, this, provider_name, remote_dir, local_dir)); + _download_task = + tasking::enqueue(LPC_BACKGROUND_BULK_LOAD, + tracker(), + [this, remote_dir, local_dir, download_file_metas, fs] { + download_sst_file(remote_dir, local_dir, download_file_metas, fs); + }); return ERR_OK; } @@ -519,41 +520,39 @@ void replica_bulk_loader::download_files(const std::string &provider_name, } // download sst files asynchronously - if (!_metadata.files.empty()) { - const file_meta &f_meta = _metadata.files[0]; - _download_files_task[f_meta.name] = tasking::enqueue( - LPC_BACKGROUND_BULK_LOAD, - tracker(), - std::bind(&replica_bulk_loader::download_sst_file, this, remote_dir, local_dir, 0, fs)); + std::vector<::dsn::replication::file_meta> download_file_metas; + { + zauto_read_lock l(_lock); + std::copy(_metadata.files.begin(), + _metadata.files.end(), + std::back_inserter(download_file_metas)); + } + if (!download_file_metas.empty()) { + _download_files_task[download_file_metas.back().name] = + tasking::enqueue(LPC_BACKGROUND_BULK_LOAD, + tracker(), + std::bind(&replica_bulk_loader::download_sst_file, + this, + remote_dir, + local_dir, + download_file_metas, + fs)); } } // ThreadPool: THREAD_POOL_DEFAULT -void replica_bulk_loader::download_sst_file(const std::string &remote_dir, - const std::string 
&local_dir, - int32_t file_index, - dist::block_service::block_filesystem *fs) +void replica_bulk_loader::download_sst_file( + const std::string &remote_dir, + const std::string &local_dir, + std::vector<::dsn::replication::file_meta> &download_file_metas, + dist::block_service::block_filesystem *fs) { if (_status != bulk_load_status::BLS_DOWNLOADING) { - LOG_WARNING_PREFIX("Cancel download_sst_file task, because bulk_load local_status is {}. " - "local_dir: {} , file_index is {}.", - enum_to_string(_status), - local_dir, - file_index); - return; - } - file_meta f_meta; - { - zauto_read_lock l(_lock); - if (file_index < _metadata.files.size()) { - f_meta = _metadata.files[file_index]; - } - } - if (f_meta.name.empty()) { - LOG_WARNING_PREFIX("Cannot get file_meta of {}, cancel download_sst_file task.", - file_index); + LOG_WARNING_PREFIX("Cancel download_sst_file task, because bulk_load local_status is {}.", + enum_to_string(_status)); return; } + const file_meta &f_meta = download_file_metas.back(); uint64_t f_size = 0; std::string f_md5; error_code ec = _stub->_block_service_manager.download_file( @@ -604,27 +603,19 @@ void replica_bulk_loader::download_sst_file(const std::string &remote_dir, return; } // download file succeed, update progress + download_file_metas.pop_back(); update_bulk_load_download_progress(f_size, f_meta.name); METRIC_VAR_INCREMENT(bulk_load_download_file_successful_count); METRIC_VAR_INCREMENT_BY(bulk_load_download_file_bytes, f_size); // download next file - { - zauto_read_lock l(_lock); - if (file_index + 1 < _metadata.files.size()) { - f_meta = _metadata.files[file_index + 1]; - } - } - if (!f_meta.name.empty()) { - _download_files_task[f_meta.name] = + if (!download_file_metas.empty()) { + _download_files_task[download_file_metas.back().name] = tasking::enqueue(LPC_BACKGROUND_BULK_LOAD, tracker(), - std::bind(&replica_bulk_loader::download_sst_file, - this, - remote_dir, - local_dir, - file_index + 1, - fs)); + [this, remote_dir, 
local_dir, download_file_metas, fs] { + download_sst_file(remote_dir, local_dir, download_file_metas, fs); + }); } } diff --git a/src/replica/bulk_load/replica_bulk_loader.h b/src/replica/bulk_load/replica_bulk_loader.h index 92f76810cd..18daa45a19 100644 --- a/src/replica/bulk_load/replica_bulk_loader.h +++ b/src/replica/bulk_load/replica_bulk_loader.h @@ -89,7 +89,7 @@ class replica_bulk_loader : replica_base // download sst files from remote provider void download_sst_file(const std::string &remote_dir, const std::string &local_dir, - int32_t file_index, + std::vector<::dsn::replication::file_meta> &download_file_metas, dist::block_service::block_filesystem *fs); // \return ERR_PATH_NOT_FOUND: file not exist From 020b105effaabb295dc23a7162f2173a0c2ef632 Mon Sep 17 00:00:00 2001 From: lupengfan1 Date: Wed, 21 Aug 2024 17:16:46 +0800 Subject: [PATCH 4/7] fix(bulkload) follow clang-tidy advice --- src/replica/bulk_load/replica_bulk_loader.cpp | 42 +++++++++---------- src/replica/bulk_load/replica_bulk_loader.h | 1 + 2 files changed, 21 insertions(+), 22 deletions(-) diff --git a/src/replica/bulk_load/replica_bulk_loader.cpp b/src/replica/bulk_load/replica_bulk_loader.cpp index bda9fa8a97..0028657b86 100644 --- a/src/replica/bulk_load/replica_bulk_loader.cpp +++ b/src/replica/bulk_load/replica_bulk_loader.cpp @@ -16,13 +16,15 @@ // under the License. 
#include +#include #include +#include #include +#include #include #include #include -#include #include "block_service/block_service_manager.h" #include "common/bulk_load_common.h" #include "common/gpid.h" @@ -474,12 +476,11 @@ error_code replica_bulk_loader::start_download(const std::string &remote_dir, // start download _is_downloading.store(true); - _download_task = - tasking::enqueue(LPC_BACKGROUND_BULK_LOAD, - tracker(), - [this, remote_dir, local_dir, download_file_metas, fs] { - download_sst_file(remote_dir, local_dir, download_file_metas, fs); - }); + _download_task = tasking::enqueue( + LPC_BACKGROUND_BULK_LOAD, + tracker(), + std::bind( + &replica_bulk_loader::download_files, this, provider_name, remote_dir, local_dir)); return ERR_OK; } @@ -528,15 +529,12 @@ void replica_bulk_loader::download_files(const std::string &provider_name, std::back_inserter(download_file_metas)); } if (!download_file_metas.empty()) { - _download_files_task[download_file_metas.back().name] = - tasking::enqueue(LPC_BACKGROUND_BULK_LOAD, - tracker(), - std::bind(&replica_bulk_loader::download_sst_file, - this, - remote_dir, - local_dir, - download_file_metas, - fs)); + _download_files_task[download_file_metas.back().name] = tasking::enqueue( + LPC_BACKGROUND_BULK_LOAD, + tracker(), + [this, remote_dir, local_dir, download_file_metas, fs]() mutable { + this->download_sst_file(remote_dir, local_dir, download_file_metas, fs); + }); } } @@ -610,12 +608,12 @@ void replica_bulk_loader::download_sst_file( // download next file if (!download_file_metas.empty()) { - _download_files_task[download_file_metas.back().name] = - tasking::enqueue(LPC_BACKGROUND_BULK_LOAD, - tracker(), - [this, remote_dir, local_dir, download_file_metas, fs] { - download_sst_file(remote_dir, local_dir, download_file_metas, fs); - }); + _download_files_task[download_file_metas.back().name] = tasking::enqueue( + LPC_BACKGROUND_BULK_LOAD, + tracker(), + [this, remote_dir, local_dir, download_file_metas, fs]() mutable 
{ + this->download_sst_file(remote_dir, local_dir, download_file_metas, fs); + }); } } diff --git a/src/replica/bulk_load/replica_bulk_loader.h b/src/replica/bulk_load/replica_bulk_loader.h index 18daa45a19..61156acf9e 100644 --- a/src/replica/bulk_load/replica_bulk_loader.h +++ b/src/replica/bulk_load/replica_bulk_loader.h @@ -22,6 +22,7 @@ #include #include #include +#include #include "bulk_load_types.h" #include "common/replication_other_types.h" From 46d15af3300522927d1e7e0ddd95546a1bf4baf4 Mon Sep 17 00:00:00 2001 From: lupengfan1 Date: Fri, 23 Aug 2024 13:11:46 +0800 Subject: [PATCH 5/7] fix(bulkload) fix unit test. --- src/replica/bulk_load/replica_bulk_loader.cpp | 14 +++++--------- src/replica/bulk_load/replica_bulk_loader.h | 2 +- 2 files changed, 6 insertions(+), 10 deletions(-) diff --git a/src/replica/bulk_load/replica_bulk_loader.cpp b/src/replica/bulk_load/replica_bulk_loader.cpp index 0028657b86..0f887bf61d 100644 --- a/src/replica/bulk_load/replica_bulk_loader.cpp +++ b/src/replica/bulk_load/replica_bulk_loader.cpp @@ -530,10 +530,8 @@ void replica_bulk_loader::download_files(const std::string &provider_name, } if (!download_file_metas.empty()) { _download_files_task[download_file_metas.back().name] = tasking::enqueue( - LPC_BACKGROUND_BULK_LOAD, - tracker(), - [this, remote_dir, local_dir, download_file_metas, fs]() mutable { - this->download_sst_file(remote_dir, local_dir, download_file_metas, fs); + LPC_BACKGROUND_BULK_LOAD, tracker(), [=, file_metas = download_file_metas]() mutable { + this->download_sst_file(remote_dir, local_dir, std::move(file_metas), fs); }); } } @@ -542,7 +540,7 @@ void replica_bulk_loader::download_files(const std::string &provider_name, void replica_bulk_loader::download_sst_file( const std::string &remote_dir, const std::string &local_dir, - std::vector<::dsn::replication::file_meta> &download_file_metas, + std::vector<::dsn::replication::file_meta> &&download_file_metas, dist::block_service::block_filesystem *fs) { if 
(_status != bulk_load_status::BLS_DOWNLOADING) { @@ -609,10 +607,8 @@ void replica_bulk_loader::download_sst_file( // download next file if (!download_file_metas.empty()) { _download_files_task[download_file_metas.back().name] = tasking::enqueue( - LPC_BACKGROUND_BULK_LOAD, - tracker(), - [this, remote_dir, local_dir, download_file_metas, fs]() mutable { - this->download_sst_file(remote_dir, local_dir, download_file_metas, fs); + LPC_BACKGROUND_BULK_LOAD, tracker(), [=, file_metas = download_file_metas]() mutable { + this->download_sst_file(remote_dir, local_dir, std::move(file_metas), fs); }); } } diff --git a/src/replica/bulk_load/replica_bulk_loader.h b/src/replica/bulk_load/replica_bulk_loader.h index 61156acf9e..042f5a8628 100644 --- a/src/replica/bulk_load/replica_bulk_loader.h +++ b/src/replica/bulk_load/replica_bulk_loader.h @@ -90,7 +90,7 @@ class replica_bulk_loader : replica_base // download sst files from remote provider void download_sst_file(const std::string &remote_dir, const std::string &local_dir, - std::vector<::dsn::replication::file_meta> &download_file_metas, + std::vector<::dsn::replication::file_meta> &&download_file_metas, dist::block_service::block_filesystem *fs); // \return ERR_PATH_NOT_FOUND: file not exist From ac4dcfbd7ac1370d38f2f74d59a4c97ba969fc4f Mon Sep 17 00:00:00 2001 From: lupengfan1 Date: Wed, 25 Sep 2024 11:20:18 +0800 Subject: [PATCH 6/7] fix(bulkload) remove file_metas --- src/replica/bulk_load/replica_bulk_loader.cpp | 24 +++++++++---------- 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/src/replica/bulk_load/replica_bulk_loader.cpp b/src/replica/bulk_load/replica_bulk_loader.cpp index 0f887bf61d..7e97b8ffd0 100644 --- a/src/replica/bulk_load/replica_bulk_loader.cpp +++ b/src/replica/bulk_load/replica_bulk_loader.cpp @@ -521,18 +521,16 @@ void replica_bulk_loader::download_files(const std::string &provider_name, } // download sst files asynchronously - std::vector<::dsn::replication::file_meta> 
download_file_metas; { zauto_read_lock l(_lock); - std::copy(_metadata.files.begin(), - _metadata.files.end(), - std::back_inserter(download_file_metas)); - } - if (!download_file_metas.empty()) { - _download_files_task[download_file_metas.back().name] = tasking::enqueue( - LPC_BACKGROUND_BULK_LOAD, tracker(), [=, file_metas = download_file_metas]() mutable { - this->download_sst_file(remote_dir, local_dir, std::move(file_metas), fs); - }); + if (!_metadata.files.empty()) { + _download_files_task[_metadata.files.back().name] = tasking::enqueue( + LPC_BACKGROUND_BULK_LOAD, + tracker(), + [this, remote_dir, local_dir, file_meta = _metadata.files, fs]() mutable { + this->download_sst_file(remote_dir, local_dir, std::move(file_meta), fs); + }); + } } } @@ -607,8 +605,10 @@ void replica_bulk_loader::download_sst_file( // download next file if (!download_file_metas.empty()) { _download_files_task[download_file_metas.back().name] = tasking::enqueue( - LPC_BACKGROUND_BULK_LOAD, tracker(), [=, file_metas = download_file_metas]() mutable { - this->download_sst_file(remote_dir, local_dir, std::move(file_metas), fs); + LPC_BACKGROUND_BULK_LOAD, + tracker(), + [this, remote_dir, local_dir, download_file_metas, fs]() mutable { + this->download_sst_file(remote_dir, local_dir, std::move(download_file_metas), fs); }); } } From f34680ff595831a389d8733a40dbfa6ed10aae02 Mon Sep 17 00:00:00 2001 From: lupengfan1 Date: Wed, 25 Sep 2024 14:23:00 +0800 Subject: [PATCH 7/7] fix(bulkload) follow IWYU suggestion. --- src/replica/bulk_load/replica_bulk_loader.cpp | 1 - 1 file changed, 1 deletion(-) diff --git a/src/replica/bulk_load/replica_bulk_loader.cpp b/src/replica/bulk_load/replica_bulk_loader.cpp index 7e97b8ffd0..e659f47b57 100644 --- a/src/replica/bulk_load/replica_bulk_loader.cpp +++ b/src/replica/bulk_load/replica_bulk_loader.cpp @@ -18,7 +18,6 @@ #include #include #include -#include #include #include #include