Skip to content

Commit

Permalink
Merge pull request PaddlePaddle#62 from paddlebox-xpu/support_ascvrq
Browse files Browse the repository at this point in the history
for support ascvrq
  • Loading branch information
xymyeah authored Jun 6, 2024
2 parents 4acc69b + ef9e3b3 commit 783db57
Show file tree
Hide file tree
Showing 21 changed files with 1,044 additions and 324 deletions.
2 changes: 1 addition & 1 deletion cmake/external/xpu.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ if (WITH_BOX_PS OR WITH_XPU_KP)
CACHE STRING "" FORCE)
#"https://klx-sdk-release-public.su.bcebos.com/xdnn/release/2.6.0.1/${XPU_XDNN_DIR_NAME}.tar.gz"
set(XPU_XDNN_URL
"https://klx-sdk-release-public.su.bcebos.com/xdnn_train/dev/paddlebox/20240408/${XPU_XDNN_DIR_NAME}.tar.gz"
"https://klx-sdk-release-public.su.bcebos.com/xdnn_train/dev/paddlebox/20240605/${XPU_XDNN_DIR_NAME}.tar.gz"
CACHE STRING "" FORCE)
set(SCALOPUS_URL
"https://klx-sdk-release-public.su.bcebos.com/xdnn_train/dev/paddlebox/20230306/scalopus.tar.gz"
Expand Down
58 changes: 30 additions & 28 deletions paddle/fluid/framework/boxps_trainer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ void BoxPSTrainer::Initialize(const TrainerDesc& trainer_desc,
}

void BoxPSTrainer::InitOtherEnv(const ProgramDesc& main_program) {
if (need_dump_field_ || need_dump_param_) {
if (need_dump_field_) {
InitDumpEnv();
}
VLOG(3) << "init other env done.";
Expand Down Expand Up @@ -138,28 +138,29 @@ void BoxPSTrainer::DumpWork(int tid) {
}
}
void BoxPSTrainer::InitDumpEnv() {
queue_ = paddle::framework::MakeChannel<std::string>();
// Only set dump channel on the last section
for (int i = 0; i < thread_num_; ++i) {
workers_[i]->SetChannelWriter(queue_.get());
}
// TODO(hutuxian): should make it as a config
dump_futures_.clear();
auto pool = GetDumpThreadPool(dump_thread_num_);
for (int i = 0; i < dump_thread_num_; i++) {
dump_futures_.emplace_back(pool->Run([this, i]() { this->DumpWork(i); }));
}
VLOG(0) << "init dump write file thread num=" << dump_thread_num_;
// queue_ = paddle::framework::MakeChannel<std::string>();
// // Only set dump channel on the last section
// for (int i = 0; i < thread_num_; ++i) {
// workers_[i]->SetChannelWriter(queue_.get());
// }
// // TODO(hutuxian): should make it as a config
// dump_futures_.clear();
// auto pool = GetDumpThreadPool(dump_thread_num_);
// for (int i = 0; i < dump_thread_num_; i++) {
// dump_futures_.emplace_back(pool->Run([this, i]() { this->DumpWork(i); }));
// }
// VLOG(0) << "init dump write file thread num=" << dump_thread_num_;
localfs_mkdir(dump_fields_path_);
}
// final dump env
void BoxPSTrainer::FinalizeDumpEnv() {
queue_->Close();
for (auto& th : dump_futures_) {
th.get();
}
dump_futures_.clear();
queue_.reset();
VLOG(0) << "finalize dump write file thread";
// queue_->Close();
// for (auto& th : dump_futures_) {
// th.get();
// }
// dump_futures_.clear();
// queue_.reset();
// VLOG(0) << "finalize dump write file thread";
}

inline std::vector<std::shared_ptr<paddle::framework::ThreadPool>>&
Expand Down Expand Up @@ -221,15 +222,15 @@ void BoxPSTrainer::InitTrainerEnv(const ProgramDesc& main_program,
for (int i = 0; i < thread_num_; ++i) {
wait_futures_.emplace_back(
pool[i]->Run([this, i, &async_param_name, &main_program]() {
auto this_worker =
auto this_worker =
std::dynamic_pointer_cast<paddle::framework::BoxPSWorker>(
workers_[i]);
this_worker->SetRootScope(root_scope_);
if (async_mode_) {
this_worker->SetDenseTable(dense_table_.get());
this_worker->SetAsyncParamName(async_param_name);
}
this_worker->CreateDeviceResource(main_program);
this_worker->SetRootScope(root_scope_);
if (async_mode_) {
this_worker->SetDenseTable(dense_table_.get());
this_worker->SetAsyncParamName(async_param_name);
}
this_worker->CreateDeviceResource(main_program);
}));
}
RemoveOtherDeviceVars(main_program, root_scope_);
Expand Down Expand Up @@ -263,6 +264,7 @@ void BoxPSTrainer::RemoveOtherDeviceVars(const ProgramDesc& main_program,
unpersist_var_names.insert(name);
}
}

}
VLOG(0) << "root scope remove_params size = " << unpersist_var_names.size();
// 2. Get moment param
Expand Down Expand Up @@ -308,7 +310,7 @@ void BoxPSTrainer::Finalize() {
// must be after train thread, otherwise the ps_buffer_ will be closed first
dense_table_->Finalize();
}
if (need_dump_field_ || need_dump_param_) {
if (need_dump_field_) {
FinalizeDumpEnv();
}
root_scope_->DropKids();
Expand Down
Loading

0 comments on commit 783db57

Please sign in to comment.