From 99e949bd7f102f6ac59f0326be8587694c8ec95b Mon Sep 17 00:00:00 2001 From: zmxdream Date: Mon, 25 Jul 2022 11:46:42 +0800 Subject: [PATCH] Support adam&&passid (#45) * add general accessor * fix general accessor * code clean * code clean * code clean * code clean * code clean * fix type * code clean * adapt kernel optimize for general accessor * fix typo * fix kernel optimizer * remove HeterComm::set_gpu_accessor * code clean * code clean * code clean * code clean * code clean * code clean * code clean * support adam optimizer&&pass id cache stratege * support adam optimizer&&pass id cache stratege * fix * fix * fix * fix * fix --- paddle/fluid/framework/data_set.h | 14 ++++- paddle/fluid/framework/fleet/fleet_wrapper.cc | 4 +- paddle/fluid/framework/fleet/fleet_wrapper.h | 2 +- paddle/fluid/framework/fleet/heter_context.h | 1 + .../framework/fleet/heter_ps/feature_value.h | 60 +++++++++++-------- .../framework/fleet/heter_ps/heter_comm_inl.h | 1 + .../framework/fleet/heter_ps/heter_ps.cu | 3 + .../fluid/framework/fleet/ps_gpu_wrapper.cc | 21 ++++--- paddle/fluid/framework/fleet/ps_gpu_wrapper.h | 8 ++- paddle/fluid/pybind/data_set_py.cc | 2 + python/paddle/fluid/dataset.py | 14 +++++ .../fleet/parameter_server/pslib/__init__.py | 4 +- .../fleet/parameter_server/pslib/node.py | 5 +- 13 files changed, 97 insertions(+), 42 deletions(-) diff --git a/paddle/fluid/framework/data_set.h b/paddle/fluid/framework/data_set.h index 100fa9b776292..1989e2e57a796 100644 --- a/paddle/fluid/framework/data_set.h +++ b/paddle/fluid/framework/data_set.h @@ -161,6 +161,9 @@ class Dataset { virtual std::vector GetSlots() = 0; + virtual void SetPassId(uint32_t pass_id) = 0; + virtual uint32_t GetPassID() = 0; + protected: virtual int ReceiveFromClient(int msg_type, int client_id, const std::string& msg) = 0; @@ -248,7 +251,15 @@ class DatasetImpl : public Dataset { bool discard_remaining_ins = false); virtual void DynamicAdjustReadersNum(int thread_num); virtual void SetFleetSendSleepSeconds(int seconds); - virtual std::vector GetSlots(); + virtual std::vector GetSlots(); + + virtual void SetPassId(uint32_t pass_id) { + pass_id_ = pass_id; + } + virtual uint32_t GetPassID() { + return pass_id_; + } + /* for enable_heterps_ virtual void EnableHeterps(bool enable_heterps) { enable_heterps_ = enable_heterps; @@ -275,6 +286,7 @@ class DatasetImpl : public Dataset { // TODO(yaoxuefeng) for SlotRecordDataset return -1; } + uint32_t pass_id_ = 0; std::vector> readers_; std::vector> preload_readers_; paddle::framework::Channel input_channel_; diff --git a/paddle/fluid/framework/fleet/fleet_wrapper.cc b/paddle/fluid/framework/fleet/fleet_wrapper.cc index 7daa93ba00c4d..8c118e245c898 100644 --- a/paddle/fluid/framework/fleet/fleet_wrapper.cc +++ b/paddle/fluid/framework/fleet/fleet_wrapper.cc @@ -1402,9 +1402,9 @@ void FleetWrapper::SetDate(const uint64_t table_id, const std::string& date) { #endif } -void FleetWrapper::PrintTableStat(const uint64_t table_id) { +void FleetWrapper::PrintTableStat(const uint64_t table_id, uint32_t pass_id, size_t threshold) { #ifdef PADDLE_WITH_PSLIB - auto ret = pslib_ptr_->_worker_ptr->print_table_stat(table_id); + auto ret = pslib_ptr_->_worker_ptr->print_table_stat(table_id, pass_id, threshold); ret.wait(); int32_t err_code = ret.get(); if (err_code == -1) { diff --git a/paddle/fluid/framework/fleet/fleet_wrapper.h b/paddle/fluid/framework/fleet/fleet_wrapper.h index c264f37ba93bb..a1a73ee98bf8d 100644 --- a/paddle/fluid/framework/fleet/fleet_wrapper.h +++ 
b/paddle/fluid/framework/fleet/fleet_wrapper.h @@ -272,7 +272,7 @@ class FleetWrapper { std::vector table_var_list, bool load_combine); - void PrintTableStat(const uint64_t table_id); + void PrintTableStat(const uint64_t table_id, uint32_t pass_id, uint64_t threshold); void SetFileNumOneShard(const uint64_t table_id, int file_num); // mode = 0, load all feature // mode = 1, load delta feature, which means load diff diff --git a/paddle/fluid/framework/fleet/heter_context.h b/paddle/fluid/framework/fleet/heter_context.h index 1c78c3d5355ae..9f25aa6f90a1d 100644 --- a/paddle/fluid/framework/fleet/heter_context.h +++ b/paddle/fluid/framework/fleet/heter_context.h @@ -250,6 +250,7 @@ class HeterContext { t.join(); } } + uint16_t pass_id_; }; } // end namespace framework diff --git a/paddle/fluid/framework/fleet/heter_ps/feature_value.h b/paddle/fluid/framework/fleet/heter_ps/feature_value.h index b201c31489cd5..0147622a84f6f 100644 --- a/paddle/fluid/framework/fleet/heter_ps/feature_value.h +++ b/paddle/fluid/framework/fleet/heter_ps/feature_value.h @@ -168,9 +168,9 @@ class CommonFeatureValueAccessor { // 根据mf_dim计算的总长度 __host__ __device__ int Dim(int mf_dim) { int tmp_embedx_sgd_dim = 1; - if (optimizer_type_ == 3) {//adam + if (mf_optimizer_type_ == 3) {//adam tmp_embedx_sgd_dim = mf_dim * 2 + 2; - } else if (optimizer_type_ == 4) { //shared_adam + } else if (mf_optimizer_type_ == 4) { //shared_adam tmp_embedx_sgd_dim = 4; } return 9 + embed_sgd_dim + tmp_embedx_sgd_dim + mf_dim; @@ -184,9 +184,9 @@ class CommonFeatureValueAccessor { // 根据mf_dim 计算的 mf_size byte数 __host__ __device__ int MFSize(int mf_dim) { int tmp_embedx_sgd_dim = 1; - if (optimizer_type_ == 3) { //adam + if (mf_optimizer_type_ == 3) { //adam tmp_embedx_sgd_dim = mf_dim * 2 + 2; - } else if (optimizer_type_ == 4) { //shared_adam + } else if (mf_optimizer_type_ == 4) { //shared_adam tmp_embedx_sgd_dim = 4; } return (tmp_embedx_sgd_dim + mf_dim) * sizeof(float); @@ -198,9 +198,9 @@ class CommonFeatureValueAccessor { // has mf int tmp_embedx_sgd_dim = 1; if ((int)MfSize(val) > 0) { - if (optimizer_type_ == 3) {//adam + if (mf_optimizer_type_ == 3) {//adam tmp_embedx_sgd_dim = int(MfDim(val)) * 2 + 2; - } else if (optimizer_type_ == 4) { //shared_adam + } else if (mf_optimizer_type_ == 4) { //shared_adam tmp_embedx_sgd_dim = 4; } return EmbedxG2SumIndex() + tmp_embedx_sgd_dim; @@ -225,7 +225,8 @@ class CommonFeatureValueAccessor { int embed_sgd_dim; int embedx_dim; int embedx_sgd_dim; - int optimizer_type_ = 1; // default optimizer is adagrad + int optimizer_type_ = 1; // default embed optimizer is adagrad + int mf_optimizer_type_ = 1; // default embedx optimizer is adagrad }; struct CommonPushValue { @@ -301,15 +302,21 @@ class CommonFeatureValueAccessor { __host__ __device__ ~CommonFeatureValueAccessor() {} __host__ int Initialize() { - - // TODO(zhangminxu): support adam/shared_adam + // NOTE(zhangminxu): gpups' sparse table optimizer type, + // now only support embed&embedx 's sparse optimizer is the same int optimizer_type = (_config.find("optimizer_type") == _config.end()) ? 1 : int(_config["optimizer_type"]); + int mf_optimizer_type = (_config.find("mf_optimizer_type") == _config.end()) + ? 1 + : int(_config["mf_optimizer_type"]); int sparse_embedx_dim = (_config.find("mf_embedx_dim") == _config.end()) ? 
8 : int(_config["mf_embedx_dim"]); + // NOTE(zhangminxu): gpups' sparse table optimizer type, + // now only support embed&embedx 's sparse optimizer is the same + // we will set embedx_sgd_dim according to mf_optimizer_type later if (optimizer_type == 3) { //adam common_feature_value.embed_sgd_dim = 4; common_feature_value.embedx_sgd_dim = sparse_embedx_dim * 2 + 2; @@ -321,9 +328,11 @@ class CommonFeatureValueAccessor { common_feature_value.embedx_sgd_dim = 1; } common_feature_value.optimizer_type_ = optimizer_type; + common_feature_value.mf_optimizer_type_ = mf_optimizer_type; common_feature_value.embedx_dim = sparse_embedx_dim; VLOG(0) << "Initialize optimizer type: " << common_feature_value.optimizer_type_ + << " mf_optimizer_type: " << common_feature_value.mf_optimizer_type_ << " embed_sgd_dim: " << common_feature_value.embed_sgd_dim << " embedx_sgd_dim: " << common_feature_value.embedx_sgd_dim; @@ -336,24 +345,24 @@ __host__ void BuildFill(float* gpu_val, ::paddle::ps::ValueAccessor* _cpu_accessor, int mf_dim) { #if defined PADDLE_WITH_PSLIB - // auto* cpu_accessor = dynamic_cast<::paddle::ps::DownpourCtrDymfAccessor*>(_cpu_accessor); + auto* cpu_accessor = dynamic_cast<::paddle::ps::DownpourCtrDymfAccessor*>(_cpu_accessor); auto* cpu_val = reinterpret_cast<::paddle::ps::DownpourFixedFeatureValue*>(_cpu_val); float* ptr_val = cpu_val->data(); size_t cpu_dim = cpu_val->size(); gpu_val[common_feature_value.DeltaScoreIndex()] = - ptr_val[::paddle::ps::DownpourCtrDymfAccessor::DownpourCtrDymfFeatureValue::delta_score_index()]; + ptr_val[cpu_accessor->get_delta_score_index()]; gpu_val[common_feature_value.ShowIndex()] = - ptr_val[::paddle::ps::DownpourCtrDymfAccessor::DownpourCtrDymfFeatureValue::show_index()]; + ptr_val[cpu_accessor->get_show_index()]; gpu_val[common_feature_value.ClickIndex()] = - ptr_val[::paddle::ps::DownpourCtrDymfAccessor::DownpourCtrDymfFeatureValue::click_index()]; + ptr_val[cpu_accessor->get_click_index()]; gpu_val[common_feature_value.SlotIndex()] = - ptr_val[::paddle::ps::DownpourCtrDymfAccessor::DownpourCtrDymfFeatureValue::slot_index()]; + ptr_val[cpu_accessor->get_slot_index()]; // lr gpu_val[common_feature_value.EmbedWIndex()] = - ptr_val[::paddle::ps::DownpourCtrDymfAccessor::DownpourCtrDymfFeatureValue::embed_w_index()]; + ptr_val[cpu_accessor->get_embed_w_index()]; // cpu_ptr *(reinterpret_cast(gpu_val + common_feature_value.CpuPtrIndex())) = (uint64_t)(cpu_val); @@ -362,10 +371,10 @@ __host__ void BuildFill(float* gpu_val, // for dymf && adagrad, embed_dim = 1 for (int i = 0; i < common_feature_value.EmbedDim(); i++) { gpu_val[common_feature_value.EmbedG2SumIndex() + i] = - ptr_val[::paddle::ps::DownpourCtrDymfAccessor::DownpourCtrDymfFeatureValue::embed_g2sum_index() + i]; + ptr_val[cpu_accessor->get_embed_g2sum_index() + i]; } - ptr_val[::paddle::ps::DownpourCtrDymfAccessor::DownpourCtrDymfFeatureValue::mf_dim_index()] = float(mf_dim); + ptr_val[cpu_accessor->get_mf_dim_index()] = float(mf_dim); gpu_val[common_feature_value.MfDimIndex()] = float(mf_dim); if (cpu_dim > 8) { @@ -392,7 +401,7 @@ __host__ void DumpFill(float* gpu_val, ::paddle::ps::ValueAccessor* _cpu_accessor, int mf_dim) { #if defined PADDLE_WITH_PSLIB - // auto* cpu_accessor = dynamic_cast<::paddle::ps::DownpourCtrDymfAccessor*>(_cpu_accessor); + auto* cpu_accessor = dynamic_cast<::paddle::ps::DownpourCtrDymfAccessor*>(_cpu_accessor); uint64_t cpu_addr = *(uint64_t*)(gpu_val + common_feature_value.CpuPtrIndex()); auto* downpour_value = 
(::paddle::ps::DownpourFixedFeatureValue*)cpu_addr; int downpour_value_size = downpour_value->size(); @@ -402,19 +411,20 @@ __host__ void DumpFill(float* gpu_val, } float* cpu_val = downpour_value->data(); - cpu_val[paddle::ps::DownpourCtrDymfAccessor::DownpourCtrDymfFeatureValue::delta_score_index()] = + cpu_val[cpu_accessor->get_delta_score_index()] = gpu_val[common_feature_value.DeltaScoreIndex()]; - cpu_val[paddle::ps::DownpourCtrDymfAccessor::DownpourCtrDymfFeatureValue::show_index()] = + cpu_val[cpu_accessor->get_show_index()] = gpu_val[common_feature_value.ShowIndex()]; - cpu_val[paddle::ps::DownpourCtrDymfAccessor::DownpourCtrDymfFeatureValue::click_index()] = + cpu_val[cpu_accessor->get_click_index()] = gpu_val[common_feature_value.ClickIndex()]; - cpu_val[paddle::ps::DownpourCtrDymfAccessor::DownpourCtrDymfFeatureValue::embed_w_index()] = + cpu_val[cpu_accessor->get_embed_w_index()] = gpu_val[common_feature_value.EmbedWIndex()]; - cpu_val[paddle::ps::DownpourCtrDymfAccessor::DownpourCtrDymfFeatureValue::slot_index()] = + cpu_val[cpu_accessor->get_slot_index()] = gpu_val[common_feature_value.SlotIndex()]; - // for dymf, i = 0 + + // for dymf && adagrad, embed_dim = 1 for (int i = 0; i < common_feature_value.EmbedDim(); i++) { - cpu_val[paddle::ps::DownpourCtrDymfAccessor::DownpourCtrDymfFeatureValue::embed_g2sum_index() + i] = + cpu_val[cpu_accessor->get_embed_g2sum_index() + i] = gpu_val[common_feature_value.EmbedG2SumIndex() + i]; } if ((int)gpu_val[common_feature_value.MfSizeIndex()] > 0) { diff --git a/paddle/fluid/framework/fleet/heter_ps/heter_comm_inl.h b/paddle/fluid/framework/fleet/heter_ps/heter_comm_inl.h index ccabdff467f14..a692d1f0a51e7 100644 --- a/paddle/fluid/framework/fleet/heter_ps/heter_comm_inl.h +++ b/paddle/fluid/framework/fleet/heter_ps/heter_comm_inl.h @@ -308,6 +308,7 @@ void HeterComm::init_path() { } } } + VLOG(0) << "HeterComm init_path done"; } template diff --git a/paddle/fluid/framework/fleet/heter_ps/heter_ps.cu b/paddle/fluid/framework/fleet/heter_ps/heter_ps.cu index a9e3c5216f2ae..0acb7086fabb9 100644 --- a/paddle/fluid/framework/fleet/heter_ps/heter_ps.cu +++ b/paddle/fluid/framework/fleet/heter_ps/heter_ps.cu @@ -22,6 +22,9 @@ namespace framework { HeterPsBase* HeterPsBase::get_instance( size_t capacity, std::shared_ptr resource, std::string accessor_type, int optimizer_type) { + // NOTE(zhangminxu): gpups' sparse table optimizer type, + // now only support embed&embedx 's sparse optimizer is the same + // we will support using diff optimizer for embed&embedx if (accessor_type == "DownpourCtrDymfAccessor" && optimizer_type == 1) { // optimizer_type == 1 means adagrad auto* accessor_wrapper_ptr = GlobalAccessorFactory::GetInstance().GetAccessorWrapper(); diff --git a/paddle/fluid/framework/fleet/ps_gpu_wrapper.cc b/paddle/fluid/framework/fleet/ps_gpu_wrapper.cc index 7ad8c2b9669e6..4e10d22d02de1 100644 --- a/paddle/fluid/framework/fleet/ps_gpu_wrapper.cc +++ b/paddle/fluid/framework/fleet/ps_gpu_wrapper.cc @@ -363,7 +363,7 @@ void PSGPUWrapper::BuildPull(std::shared_ptr gpu_task) { #endif timeline.Start(); - auto ptl_func = [this, &local_keys, &local_ptr, &fleet_ptr](int i) { + auto ptl_func = [this, &local_keys, &local_ptr, &fleet_ptr, &gpu_task](int i) { size_t key_size = local_keys[i].size(); int32_t status = -1; #ifdef PADDLE_WITH_PSLIB @@ -374,7 +374,7 @@ void PSGPUWrapper::BuildPull(std::shared_ptr gpu_task) { while (true) { auto tt = fleet_ptr->pslib_ptr_->_worker_ptr->pull_sparse_ptr(i, reinterpret_cast(local_ptr[i].data()), 
this->table_id_, - local_keys[i].data(), key_size); + local_keys[i].data(), key_size, gpu_task->pass_id_); bool flag = true; tt.wait(); @@ -444,7 +444,7 @@ void PSGPUWrapper::BuildPull(std::shared_ptr gpu_task) { }; auto ptl_dynamic_mf_func = [this, &local_dim_keys, &local_dim_ptr, - &fleet_ptr](int i, int j) { + &fleet_ptr, &gpu_task](int i, int j) { #ifdef PADDLE_WITH_PSLIB size_t key_size = local_dim_keys[i][j].size(); int32_t status = -1; @@ -452,7 +452,7 @@ void PSGPUWrapper::BuildPull(std::shared_ptr gpu_task) { while (true) { auto tt = fleet_ptr->pslib_ptr_->_worker_ptr->pull_sparse_ptr(i, reinterpret_cast(local_dim_ptr[i][j].data()), this->table_id_, - local_dim_keys[i][j].data(), key_size); + local_dim_keys[i][j].data(), key_size, gpu_task->pass_id_); bool flag = true; tt.wait(); @@ -818,7 +818,6 @@ void PSGPUWrapper::BuildGPUTask(std::shared_ptr gpu_task) { //}; // multi-thread process - auto build_dymf_mem_pool = [this, &gpu_task, &accessor_wrapper_ptr](int i, int j) { this->HeterPs_->set_multi_mf_dim(multi_mf_dim_, max_mf_dim_); int mf_dim = this->index_dim_vec_[j]; @@ -942,7 +941,8 @@ void PSGPUWrapper::LoadIntoMemory(bool is_shuffle) { InitSlotInfo(); std::shared_ptr gpu_task = gpu_task_pool_.Get(); gpu_task->Reset(); - + gpu_task->pass_id_ = (uint16_t)(dataset_->GetPassID()); + dataset_mutex_.lock(); dataset_pipe_.push(dataset_); dataset_mutex_.unlock(); @@ -1367,6 +1367,7 @@ void add_sparse_optimizer( const std::string& prefix = "") { auto optimizer_name = sgd_param.name(); if (optimizer_name == "naive") { + config[prefix + "optimizer_type"] = 0; config[prefix + "learning_rate"] = sgd_param.naive().learning_rate(); config[prefix + "initial_range"] = sgd_param.naive().initial_range(); if (sgd_param.naive().weight_bounds_size() == 2) { @@ -1374,6 +1375,7 @@ void add_sparse_optimizer( config[prefix + "max_bound"] = sgd_param.naive().weight_bounds()[1]; } } else if (optimizer_name == "adagrad") { + config[prefix + "optimizer_type"] = 1; config[prefix + "learning_rate"] = sgd_param.adagrad().learning_rate(); config[prefix + "initial_range"] = sgd_param.adagrad().initial_range(); config[prefix + "initial_g2sum"] = sgd_param.adagrad().initial_g2sum(); @@ -1382,6 +1384,7 @@ void add_sparse_optimizer( config[prefix + "max_bound"] = sgd_param.adagrad().weight_bounds()[1]; } } else if (optimizer_name == "std_adagrad") { + config[prefix + "optimizer_type"] = 2; config[prefix + "learning_rate"] = sgd_param.adagrad().learning_rate(); config[prefix + "initial_range"] = sgd_param.adagrad().initial_range(); config[prefix + "initial_g2sum"] = sgd_param.adagrad().initial_g2sum(); @@ -1390,6 +1393,7 @@ void add_sparse_optimizer( config[prefix + "max_bound"] = sgd_param.adagrad().weight_bounds()[1]; } } else if (optimizer_name == "adam") { + config[prefix + "optimizer_type"] = 3; config[prefix + "learning_rate"] = sgd_param.adam().learning_rate(); config[prefix + "initial_range"] = sgd_param.adam().initial_range(); if (sgd_param.adam().weight_bounds_size() == 2) { @@ -1415,7 +1419,7 @@ void PSGPUWrapper::InitializeGPUServer(const std::string& fleet_desc) { std::unordered_map config; if (accessor_class == "DownpourFeatureValueAccessor" || accessor_class == "DownpourCtrAccessor" || - accessor_class == "DownpourCtrDoubleAccessor" || accessor_class == "DownpourCtrDymfAccessor") { + accessor_class == "DownpourCtrDoubleAccessor") { config["nonclk_coeff"] = sparse_table_accessor_parameter.nonclk_coeff(); config["clk_coeff"] = sparse_table_accessor_parameter.click_coeff(); @@ -1498,7 +1502,8 @@ void 
PSGPUWrapper::InitializeGPUServer(const std::string& fleet_desc) { } } } else if (accessor_class == "DownpourUnitAccessor" || - accessor_class == "DownpourDoubleUnitAccessor") { + accessor_class == "DownpourDoubleUnitAccessor" || + accessor_class == "DownpourCtrDymfAccessor") { config["nonclk_coeff"] = sparse_table_accessor_parameter.nonclk_coeff(); config["clk_coeff"] = sparse_table_accessor_parameter.click_coeff(); config["mf_create_thresholds"] = sparse_table_accessor.embedx_threshold(); diff --git a/paddle/fluid/framework/fleet/ps_gpu_wrapper.h b/paddle/fluid/framework/fleet/ps_gpu_wrapper.h index ad821fa8bf8a6..405fecca1da04 100755 --- a/paddle/fluid/framework/fleet/ps_gpu_wrapper.h +++ b/paddle/fluid/framework/fleet/ps_gpu_wrapper.h @@ -210,6 +210,12 @@ class PSGPUWrapper { #ifdef PADDLE_WITH_PSLIB auto fleet_ptr = FleetWrapper::GetInstance(); std::string dist_desc = fleet_ptr->GetDistDesc(); + fleet_ptr->GetCPUAccessor(cpu_accessor_); + PADDLE_ENFORCE_EQ( + cpu_accessor_ != nullptr, true, + platform::errors::PreconditionNotMet( + "You must initialize the cpu accessor first to use it.")); + VLOG(0) << "PSGPUWrapper initialize cpu accessor"; InitializeGPUServer(dist_desc); #endif } @@ -413,7 +419,7 @@ class PSGPUWrapper { private: #ifdef PADDLE_WITH_PSLIB - ::paddle::ps::ValueAccessor* cpu_accessor_; + ::paddle::ps::ValueAccessor* cpu_accessor_ = nullptr; #endif static std::shared_ptr s_instance_; diff --git a/paddle/fluid/pybind/data_set_py.cc b/paddle/fluid/pybind/data_set_py.cc index 5e2274cb65138..dee112179be32 100644 --- a/paddle/fluid/pybind/data_set_py.cc +++ b/paddle/fluid/pybind/data_set_py.cc @@ -313,6 +313,8 @@ void BindDataset(py::module *m) { &framework::Dataset::SetFleetSendSleepSeconds, py::call_guard()) .def("enable_pv_merge", &framework::Dataset::EnablePvMerge, + py::call_guard()) + .def("set_pass_id", &framework::Dataset::SetPassId, py::call_guard()); py::class_(*m, "IterableDatasetWrapper") diff --git a/python/paddle/fluid/dataset.py b/python/paddle/fluid/dataset.py index 84064669c0dc6..1cb58af07c296 100644 --- a/python/paddle/fluid/dataset.py +++ b/python/paddle/fluid/dataset.py @@ -73,10 +73,24 @@ def __init__(self): self.proto_desc.pipe_command = "cat" self.dataset = core.Dataset("MultiSlotDataset") self.thread_num = 1 + self.pass_id = 0 self.filelist = [] self.use_ps_gpu = False self.psgpu = None + def set_pass_id(self, pass_id): + """ + set_pass_id + """ + self.pass_id = pass_id + self.dataset.set_pass_id(pass_id) + + def get_pass_id(self): + """ + get_pass_id + """ + return self.pass_id + def set_pipe_command(self, pipe_command): """ Set pipe command of current dataset diff --git a/python/paddle/fluid/incubate/fleet/parameter_server/pslib/__init__.py b/python/paddle/fluid/incubate/fleet/parameter_server/pslib/__init__.py index 8d803c0d5bd7d..9bcf78b6c9f02 100644 --- a/python/paddle/fluid/incubate/fleet/parameter_server/pslib/__init__.py +++ b/python/paddle/fluid/incubate/fleet/parameter_server/pslib/__init__.py @@ -313,7 +313,7 @@ def save_inference_model(self, """ self._fleet_ptr.save_model(dirname, 0) - def print_table_stat(self, table_id): + def print_table_stat(self, table_id, pass_id, threshold): """ print stat info of table_id, format: tableid, feasign size, mf size @@ -325,7 +325,7 @@ def print_table_stat(self, table_id): """ self._role_maker._barrier_worker() if self._role_maker.is_first_worker(): - self._fleet_ptr.print_table_stat(table_id) + self._fleet_ptr.print_table_stat(table_id, pass_id, threshold) self._role_maker._barrier_worker() def 
set_file_num_one_shard(self, table_id, file_num): diff --git a/python/paddle/fluid/incubate/fleet/parameter_server/pslib/node.py b/python/paddle/fluid/incubate/fleet/parameter_server/pslib/node.py index b66ba0830ec00..91790af0b4804 100644 --- a/python/paddle/fluid/incubate/fleet/parameter_server/pslib/node.py +++ b/python/paddle/fluid/incubate/fleet/parameter_server/pslib/node.py @@ -141,7 +141,6 @@ def add_sparse_table(self, table_id, strategy): if accessor_class == 'DownpourFeatureValueAccessor' \ or accessor_class == 'DownpourCtrAccessor' \ - or accessor_class == 'DownpourCtrDymfAccessor' \ or accessor_class == 'DownpourCtrDoubleAccessor': table.accessor.sparse_sgd_param.learning_rate = strategy.get( 'sparse_learning_rate', 0.05) @@ -257,7 +256,9 @@ def add_sparse_table(self, table_id, strategy): table2.param = 2 table2.converter = converter table2.deconverter = deconverter - elif accessor_class == 'DownpourUnitAccessor' or accessor_class == 'DownpourDoubleUnitAccessor': + elif accessor_class == 'DownpourUnitAccessor' \ + or accessor_class == 'DownpourDoubleUnitAccessor' \ + or accessor_class == 'DownpourCtrDymfAccessor': self.add_sparse_table_common_config(table, strategy) self.add_sparse_optimizer(table.accessor.embed_sgd_param, strategy, "embed_")
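Usage sketch (illustrative, not part of the patch): with these changes a PSLIB training loop can tag each pass on the dataset and hand the same id, plus a feasign-size threshold, to print_table_stat. Only set_pass_id and the new print_table_stat signature come from this patch; the slot variables, filelist, program, executor, pass count, table_id and threshold below are assumed placeholders.

import paddle.fluid as fluid
from paddle.fluid.incubate.fleet.parameter_server.pslib import fleet

# assumed to be defined by the surrounding training script:
#   slots_vars (slot Variables), filelist (training files),
#   main_program, num_passes
exe = fluid.Executor(fluid.CPUPlace())

dataset = fluid.DatasetFactory().create_dataset("InMemoryDataset")
dataset.set_use_var(slots_vars)
dataset.set_filelist(filelist)

for pass_id in range(1, num_passes + 1):
    dataset.set_pass_id(pass_id)        # forwarded to Dataset::SetPassId via pybind
    dataset.load_into_memory()          # PSGPUWrapper::LoadIntoMemory reads GetPassID()
    exe.train_from_dataset(program=main_program, dataset=dataset)
    # new signature: table_id, pass_id, feasign-size threshold (values illustrative)
    fleet.print_table_stat(0, pass_id, 100000)
    dataset.release_memory()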
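Layout note: with adam/shared_adam support in CommonFeatureValueAccessor, the per-key value length is 9 header floats (cpu_ptr occupies two of them) plus embed_sgd_dim, embedx_sgd_dim and mf_dim, where the sgd dims depend on the optimizer type (0=naive, 1=adagrad, 2=std_adagrad, 3=adam as written by add_sparse_optimizer; 4 is shared_adam per the accessor comments). The short Python sketch below is not part of the patch; it only reproduces the arithmetic of Dim()/MFSize() to make the resulting sizes concrete.

FLOAT_BYTES = 4  # the accessor stores each value as an array of float

def sgd_dims(optimizer_type, mf_dim):
    # (embed_sgd_dim, embedx_sgd_dim) as set in Initialize()
    if optimizer_type == 3:   # adam: per-dim moments plus 2 extra scalars
        return 4, mf_dim * 2 + 2
    if optimizer_type == 4:   # shared_adam: moments shared across dims
        return 4, 4
    return 1, 1               # naive / adagrad / std_adagrad

def value_dim(optimizer_type, mf_dim):
    embed_sgd_dim, embedx_sgd_dim = sgd_dims(optimizer_type, mf_dim)
    return 9 + embed_sgd_dim + embedx_sgd_dim + mf_dim    # mirrors Dim(mf_dim)

def mf_size_bytes(optimizer_type, mf_dim):
    _, embedx_sgd_dim = sgd_dims(optimizer_type, mf_dim)
    return (embedx_sgd_dim + mf_dim) * FLOAT_BYTES        # mirrors MFSize(mf_dim)

for opt in (1, 3, 4):
    print(opt, value_dim(opt, mf_dim=8), mf_size_bytes(opt, mf_dim=8))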