diff --git a/src/replica/bulk_load/replica_bulk_loader.cpp b/src/replica/bulk_load/replica_bulk_loader.cpp index b58afccc70..e659f47b57 100644 --- a/src/replica/bulk_load/replica_bulk_loader.cpp +++ b/src/replica/bulk_load/replica_bulk_loader.cpp @@ -16,13 +16,14 @@ // under the License. #include +#include #include #include +#include #include #include #include -#include #include "block_service/block_service_manager.h" #include "common/bulk_load_common.h" #include "common/gpid.h" @@ -519,22 +520,32 @@ void replica_bulk_loader::download_files(const std::string &provider_name, } // download sst files asynchronously - if (!_metadata.files.empty()) { - const file_meta &f_meta = _metadata.files[0]; - _download_files_task[f_meta.name] = tasking::enqueue( - LPC_BACKGROUND_BULK_LOAD, - tracker(), - std::bind(&replica_bulk_loader::download_sst_file, this, remote_dir, local_dir, 0, fs)); + { + zauto_read_lock l(_lock); + if (!_metadata.files.empty()) { + _download_files_task[_metadata.files.back().name] = tasking::enqueue( + LPC_BACKGROUND_BULK_LOAD, + tracker(), + [this, remote_dir, local_dir, file_meta = _metadata.files, fs]() mutable { + this->download_sst_file(remote_dir, local_dir, std::move(file_meta), fs); + }); + } } } // ThreadPool: THREAD_POOL_DEFAULT -void replica_bulk_loader::download_sst_file(const std::string &remote_dir, - const std::string &local_dir, - int32_t file_index, - dist::block_service::block_filesystem *fs) +void replica_bulk_loader::download_sst_file( + const std::string &remote_dir, + const std::string &local_dir, + std::vector<::dsn::replication::file_meta> &&download_file_metas, + dist::block_service::block_filesystem *fs) { - const file_meta &f_meta = _metadata.files[file_index]; + if (_status != bulk_load_status::BLS_DOWNLOADING) { + LOG_WARNING_PREFIX("Cancel download_sst_file task, because bulk_load local_status is {}.", + enum_to_string(_status)); + return; + } + const file_meta &f_meta = download_file_metas.back(); uint64_t f_size = 0; std::string f_md5; error_code ec = _stub->_block_service_manager.download_file( @@ -585,22 +596,19 @@ void replica_bulk_loader::download_sst_file(const std::string &remote_dir, return; } // download file succeed, update progress + download_file_metas.pop_back(); update_bulk_load_download_progress(f_size, f_meta.name); METRIC_VAR_INCREMENT(bulk_load_download_file_successful_count); METRIC_VAR_INCREMENT_BY(bulk_load_download_file_bytes, f_size); // download next file - if (file_index + 1 < _metadata.files.size()) { - const file_meta &next_f_meta = _metadata.files[file_index + 1]; - _download_files_task[next_f_meta.name] = - tasking::enqueue(LPC_BACKGROUND_BULK_LOAD, - tracker(), - std::bind(&replica_bulk_loader::download_sst_file, - this, - remote_dir, - local_dir, - file_index + 1, - fs)); + if (!download_file_metas.empty()) { + _download_files_task[download_file_metas.back().name] = tasking::enqueue( + LPC_BACKGROUND_BULK_LOAD, + tracker(), + [this, remote_dir, local_dir, download_file_metas, fs]() mutable { + this->download_sst_file(remote_dir, local_dir, std::move(download_file_metas), fs); + }); } } diff --git a/src/replica/bulk_load/replica_bulk_loader.h b/src/replica/bulk_load/replica_bulk_loader.h index 92f76810cd..042f5a8628 100644 --- a/src/replica/bulk_load/replica_bulk_loader.h +++ b/src/replica/bulk_load/replica_bulk_loader.h @@ -22,6 +22,7 @@ #include #include #include +#include #include "bulk_load_types.h" #include "common/replication_other_types.h" @@ -89,7 +90,7 @@ class replica_bulk_loader : replica_base // download sst files from remote provider void download_sst_file(const std::string &remote_dir, const std::string &local_dir, - int32_t file_index, + std::vector<::dsn::replication::file_meta> &&download_file_metas, dist::block_service::block_filesystem *fs); // \return ERR_PATH_NOT_FOUND: file not exist