Skip to content

Commit

Permalink
Support adam&&passid (PaddlePaddle#45)
Browse files Browse the repository at this point in the history
* add general accessor

* fix general accessor

* code clean

* code clean

* code clean

* code clean

* code clean

* fix type

* code clean

* adapt kernel optimize for general accessor

* fix typo

* fix kernel optimizer

* remove HeterComm::set_gpu_accessor

* code clean

* code clean

* code clean

* code clean

* code clean

* code clean

* code clean

* support adam optimizer && pass-id cache strategy

* support adam optimizer && pass-id cache strategy

* fix

* fix

* fix

* fix

* fix
  • Loading branch information
zmxdream committed Jul 25, 2022
1 parent a571fa6 commit 99e949b
Show file tree
Hide file tree
Showing 13 changed files with 97 additions and 42 deletions.
14 changes: 13 additions & 1 deletion paddle/fluid/framework/data_set.h
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,9 @@ class Dataset {

virtual std::vector<std::string> GetSlots() = 0;

virtual void SetPassId(uint32_t pass_id) = 0;
virtual uint32_t GetPassID() = 0;

protected:
virtual int ReceiveFromClient(int msg_type, int client_id,
const std::string& msg) = 0;
Expand Down Expand Up @@ -248,7 +251,15 @@ class DatasetImpl : public Dataset {
bool discard_remaining_ins = false);
virtual void DynamicAdjustReadersNum(int thread_num);
virtual void SetFleetSendSleepSeconds(int seconds);
virtual std::vector<std::string> GetSlots();
virtual std::vector<std::string> GetSlots();

virtual void SetPassId(uint32_t pass_id) {
pass_id_ = pass_id;
}
virtual uint32_t GetPassID() {
return pass_id_;
}

/* for enable_heterps_
virtual void EnableHeterps(bool enable_heterps) {
enable_heterps_ = enable_heterps;
Expand All @@ -275,6 +286,7 @@ class DatasetImpl : public Dataset {
// TODO(yaoxuefeng) for SlotRecordDataset
return -1;
}
uint32_t pass_id_ = 0;
std::vector<std::shared_ptr<paddle::framework::DataFeed>> readers_;
std::vector<std::shared_ptr<paddle::framework::DataFeed>> preload_readers_;
paddle::framework::Channel<T> input_channel_;
Expand Down
4 changes: 2 additions & 2 deletions paddle/fluid/framework/fleet/fleet_wrapper.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1402,9 +1402,9 @@ void FleetWrapper::SetDate(const uint64_t table_id, const std::string& date) {
#endif
}

void FleetWrapper::PrintTableStat(const uint64_t table_id) {
void FleetWrapper::PrintTableStat(const uint64_t table_id, uint32_t pass_id, size_t threshold) {
#ifdef PADDLE_WITH_PSLIB
auto ret = pslib_ptr_->_worker_ptr->print_table_stat(table_id);
auto ret = pslib_ptr_->_worker_ptr->print_table_stat(table_id, pass_id, threshold);
ret.wait();
int32_t err_code = ret.get();
if (err_code == -1) {
Expand Down
2 changes: 1 addition & 1 deletion paddle/fluid/framework/fleet/fleet_wrapper.h
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,7 @@ class FleetWrapper {
std::vector<std::string> table_var_list,
bool load_combine);

void PrintTableStat(const uint64_t table_id);
void PrintTableStat(const uint64_t table_id, uint32_t pass_id, uint64_t threshold);
void SetFileNumOneShard(const uint64_t table_id, int file_num);
// mode = 0, load all feature
// mode = 1, load delta feature, which means load diff
Expand Down
1 change: 1 addition & 0 deletions paddle/fluid/framework/fleet/heter_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,7 @@ class HeterContext {
t.join();
}
}
uint16_t pass_id_;
};

} // end namespace framework
Expand Down
60 changes: 35 additions & 25 deletions paddle/fluid/framework/fleet/heter_ps/feature_value.h
Original file line number Diff line number Diff line change
Expand Up @@ -168,9 +168,9 @@ class CommonFeatureValueAccessor {
// 根据mf_dim计算的总长度
__host__ __device__ int Dim(int mf_dim) {
int tmp_embedx_sgd_dim = 1;
if (optimizer_type_ == 3) {//adam
if (mf_optimizer_type_ == 3) {//adam
tmp_embedx_sgd_dim = mf_dim * 2 + 2;
} else if (optimizer_type_ == 4) { //shared_adam
} else if (mf_optimizer_type_ == 4) { //shared_adam
tmp_embedx_sgd_dim = 4;
}
return 9 + embed_sgd_dim + tmp_embedx_sgd_dim + mf_dim;
Expand All @@ -184,9 +184,9 @@ class CommonFeatureValueAccessor {
// 根据mf_dim 计算的 mf_size byte数
__host__ __device__ int MFSize(int mf_dim) {
int tmp_embedx_sgd_dim = 1;
if (optimizer_type_ == 3) { //adam
if (mf_optimizer_type_ == 3) { //adam
tmp_embedx_sgd_dim = mf_dim * 2 + 2;
} else if (optimizer_type_ == 4) { //shared_adam
} else if (mf_optimizer_type_ == 4) { //shared_adam
tmp_embedx_sgd_dim = 4;
}
return (tmp_embedx_sgd_dim + mf_dim) * sizeof(float);
Expand All @@ -198,9 +198,9 @@ class CommonFeatureValueAccessor {
// has mf
int tmp_embedx_sgd_dim = 1;
if ((int)MfSize(val) > 0) {
if (optimizer_type_ == 3) {//adam
if (mf_optimizer_type_ == 3) {//adam
tmp_embedx_sgd_dim = int(MfDim(val)) * 2 + 2;
} else if (optimizer_type_ == 4) { //shared_adam
} else if (mf_optimizer_type_ == 4) { //shared_adam
tmp_embedx_sgd_dim = 4;
}
return EmbedxG2SumIndex() + tmp_embedx_sgd_dim;
Expand All @@ -225,7 +225,8 @@ class CommonFeatureValueAccessor {
int embed_sgd_dim;
int embedx_dim;
int embedx_sgd_dim;
int optimizer_type_ = 1; // default optimizer is adagrad
int optimizer_type_ = 1; // default embed optimizer is adagrad
int mf_optimizer_type_ = 1; // default embedx optimizer is adagrad
};

struct CommonPushValue {
Expand Down Expand Up @@ -301,15 +302,21 @@ class CommonFeatureValueAccessor {
__host__ __device__ ~CommonFeatureValueAccessor() {}

__host__ int Initialize() {

// TODO(zhangminxu): support adam/shared_adam
// NOTE(zhangminxu): gpups' sparse table optimizer type,
// now only support embed&embedx 's sparse optimizer is the same
int optimizer_type = (_config.find("optimizer_type") == _config.end())
? 1
: int(_config["optimizer_type"]);
int mf_optimizer_type = (_config.find("mf_optimizer_type") == _config.end())
? 1
: int(_config["mf_optimizer_type"]);
int sparse_embedx_dim = (_config.find("mf_embedx_dim") == _config.end())
? 8
: int(_config["mf_embedx_dim"]);

// NOTE(zhangminxu): gpups' sparse table optimizer type,
// now only support embed&embedx 's sparse optimizer is the same
// we will set embedx_sgd_dim according to mf_optimizer_type later
if (optimizer_type == 3) { //adam
common_feature_value.embed_sgd_dim = 4;
common_feature_value.embedx_sgd_dim = sparse_embedx_dim * 2 + 2;
Expand All @@ -321,9 +328,11 @@ class CommonFeatureValueAccessor {
common_feature_value.embedx_sgd_dim = 1;
}
common_feature_value.optimizer_type_ = optimizer_type;
common_feature_value.mf_optimizer_type_ = mf_optimizer_type;
common_feature_value.embedx_dim = sparse_embedx_dim;

VLOG(0) << "Initialize optimizer type: " << common_feature_value.optimizer_type_
<< " mf_optimizer_type: " << common_feature_value.mf_optimizer_type_
<< " embed_sgd_dim: " << common_feature_value.embed_sgd_dim
<< " embedx_sgd_dim: " << common_feature_value.embedx_sgd_dim;

Expand All @@ -336,24 +345,24 @@ __host__ void BuildFill(float* gpu_val,
::paddle::ps::ValueAccessor* _cpu_accessor,
int mf_dim) {
#if defined PADDLE_WITH_PSLIB
// auto* cpu_accessor = dynamic_cast<::paddle::ps::DownpourCtrDymfAccessor*>(_cpu_accessor);
auto* cpu_accessor = dynamic_cast<::paddle::ps::DownpourCtrDymfAccessor*>(_cpu_accessor);
auto* cpu_val = reinterpret_cast<::paddle::ps::DownpourFixedFeatureValue*>(_cpu_val);
float* ptr_val = cpu_val->data();
size_t cpu_dim = cpu_val->size();

gpu_val[common_feature_value.DeltaScoreIndex()] =
ptr_val[::paddle::ps::DownpourCtrDymfAccessor::DownpourCtrDymfFeatureValue::delta_score_index()];
ptr_val[cpu_accessor->get_delta_score_index()];
gpu_val[common_feature_value.ShowIndex()] =
ptr_val[::paddle::ps::DownpourCtrDymfAccessor::DownpourCtrDymfFeatureValue::show_index()];
ptr_val[cpu_accessor->get_show_index()];
gpu_val[common_feature_value.ClickIndex()] =
ptr_val[::paddle::ps::DownpourCtrDymfAccessor::DownpourCtrDymfFeatureValue::click_index()];
ptr_val[cpu_accessor->get_click_index()];

gpu_val[common_feature_value.SlotIndex()] =
ptr_val[::paddle::ps::DownpourCtrDymfAccessor::DownpourCtrDymfFeatureValue::slot_index()];
ptr_val[cpu_accessor->get_slot_index()];

// lr
gpu_val[common_feature_value.EmbedWIndex()] =
ptr_val[::paddle::ps::DownpourCtrDymfAccessor::DownpourCtrDymfFeatureValue::embed_w_index()];
ptr_val[cpu_accessor->get_embed_w_index()];

// cpu_ptr
*(reinterpret_cast<uint64_t*>(gpu_val + common_feature_value.CpuPtrIndex())) = (uint64_t)(cpu_val);
Expand All @@ -362,10 +371,10 @@ __host__ void BuildFill(float* gpu_val,
// for dymf && adagrad, embed_dim = 1
for (int i = 0; i < common_feature_value.EmbedDim(); i++) {
gpu_val[common_feature_value.EmbedG2SumIndex() + i] =
ptr_val[::paddle::ps::DownpourCtrDymfAccessor::DownpourCtrDymfFeatureValue::embed_g2sum_index() + i];
ptr_val[cpu_accessor->get_embed_g2sum_index() + i];
}

ptr_val[::paddle::ps::DownpourCtrDymfAccessor::DownpourCtrDymfFeatureValue::mf_dim_index()] = float(mf_dim);
ptr_val[cpu_accessor->get_mf_dim_index()] = float(mf_dim);
gpu_val[common_feature_value.MfDimIndex()] = float(mf_dim);

if (cpu_dim > 8) {
Expand All @@ -392,7 +401,7 @@ __host__ void DumpFill(float* gpu_val,
::paddle::ps::ValueAccessor* _cpu_accessor,
int mf_dim) {
#if defined PADDLE_WITH_PSLIB
// auto* cpu_accessor = dynamic_cast<::paddle::ps::DownpourCtrDymfAccessor*>(_cpu_accessor);
auto* cpu_accessor = dynamic_cast<::paddle::ps::DownpourCtrDymfAccessor*>(_cpu_accessor);
uint64_t cpu_addr = *(uint64_t*)(gpu_val + common_feature_value.CpuPtrIndex());
auto* downpour_value = (::paddle::ps::DownpourFixedFeatureValue*)cpu_addr;
int downpour_value_size = downpour_value->size();
Expand All @@ -402,19 +411,20 @@ __host__ void DumpFill(float* gpu_val,
}
float* cpu_val = downpour_value->data();

cpu_val[paddle::ps::DownpourCtrDymfAccessor::DownpourCtrDymfFeatureValue::delta_score_index()] =
cpu_val[cpu_accessor->get_delta_score_index()] =
gpu_val[common_feature_value.DeltaScoreIndex()];
cpu_val[paddle::ps::DownpourCtrDymfAccessor::DownpourCtrDymfFeatureValue::show_index()] =
cpu_val[cpu_accessor->get_show_index()] =
gpu_val[common_feature_value.ShowIndex()];
cpu_val[paddle::ps::DownpourCtrDymfAccessor::DownpourCtrDymfFeatureValue::click_index()] =
cpu_val[cpu_accessor->get_click_index()] =
gpu_val[common_feature_value.ClickIndex()];
cpu_val[paddle::ps::DownpourCtrDymfAccessor::DownpourCtrDymfFeatureValue::embed_w_index()] =
cpu_val[cpu_accessor->get_embed_w_index()] =
gpu_val[common_feature_value.EmbedWIndex()];
cpu_val[paddle::ps::DownpourCtrDymfAccessor::DownpourCtrDymfFeatureValue::slot_index()] =
cpu_val[cpu_accessor->get_slot_index()] =
gpu_val[common_feature_value.SlotIndex()];
// for dymf, i = 0

// for dymf && adagrad, embed_dim = 1
for (int i = 0; i < common_feature_value.EmbedDim(); i++) {
cpu_val[paddle::ps::DownpourCtrDymfAccessor::DownpourCtrDymfFeatureValue::embed_g2sum_index() + i] =
cpu_val[cpu_accessor->get_embed_g2sum_index() + i] =
gpu_val[common_feature_value.EmbedG2SumIndex() + i];
}
if ((int)gpu_val[common_feature_value.MfSizeIndex()] > 0) {
Expand Down
1 change: 1 addition & 0 deletions paddle/fluid/framework/fleet/heter_ps/heter_comm_inl.h
Original file line number Diff line number Diff line change
Expand Up @@ -308,6 +308,7 @@ void HeterComm<KeyType, ValType, GradType, GPUAccessor>::init_path() {
}
}
}
VLOG(0) << "HeterComm init_path done";
}

template <typename KeyType, typename ValType, typename GradType, typename GPUAccessor>
Expand Down
3 changes: 3 additions & 0 deletions paddle/fluid/framework/fleet/heter_ps/heter_ps.cu
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ namespace framework {

HeterPsBase* HeterPsBase::get_instance(
size_t capacity, std::shared_ptr<HeterPsResource> resource, std::string accessor_type, int optimizer_type) {
// NOTE(zhangminxu): gpups' sparse table optimizer type,
// now only support embed&embedx 's sparse optimizer is the same
// we will support using diff optimizer for embed&embedx
if (accessor_type == "DownpourCtrDymfAccessor" && optimizer_type == 1) { // optimizer_type == 1 means adagrad
auto* accessor_wrapper_ptr =
GlobalAccessorFactory::GetInstance().GetAccessorWrapper();
Expand Down
21 changes: 13 additions & 8 deletions paddle/fluid/framework/fleet/ps_gpu_wrapper.cc
Original file line number Diff line number Diff line change
Expand Up @@ -363,7 +363,7 @@ void PSGPUWrapper::BuildPull(std::shared_ptr<HeterContext> gpu_task) {
#endif

timeline.Start();
auto ptl_func = [this, &local_keys, &local_ptr, &fleet_ptr](int i) {
auto ptl_func = [this, &local_keys, &local_ptr, &fleet_ptr, &gpu_task](int i) {
size_t key_size = local_keys[i].size();
int32_t status = -1;
#ifdef PADDLE_WITH_PSLIB
Expand All @@ -374,7 +374,7 @@ void PSGPUWrapper::BuildPull(std::shared_ptr<HeterContext> gpu_task) {
while (true) {
auto tt = fleet_ptr->pslib_ptr_->_worker_ptr->pull_sparse_ptr(i,
reinterpret_cast<char**>(local_ptr[i].data()), this->table_id_,
local_keys[i].data(), key_size);
local_keys[i].data(), key_size, gpu_task->pass_id_);
bool flag = true;

tt.wait();
Expand Down Expand Up @@ -444,15 +444,15 @@ void PSGPUWrapper::BuildPull(std::shared_ptr<HeterContext> gpu_task) {
};

auto ptl_dynamic_mf_func = [this, &local_dim_keys, &local_dim_ptr,
&fleet_ptr](int i, int j) {
&fleet_ptr, &gpu_task](int i, int j) {
#ifdef PADDLE_WITH_PSLIB
size_t key_size = local_dim_keys[i][j].size();
int32_t status = -1;
int32_t cnt = 0;
while (true) {
auto tt = fleet_ptr->pslib_ptr_->_worker_ptr->pull_sparse_ptr(i,
reinterpret_cast<char**>(local_dim_ptr[i][j].data()), this->table_id_,
local_dim_keys[i][j].data(), key_size);
local_dim_keys[i][j].data(), key_size, gpu_task->pass_id_);
bool flag = true;

tt.wait();
Expand Down Expand Up @@ -818,7 +818,6 @@ void PSGPUWrapper::BuildGPUTask(std::shared_ptr<HeterContext> gpu_task) {
//};

// multi-thread process

auto build_dymf_mem_pool = [this, &gpu_task, &accessor_wrapper_ptr](int i, int j) {
this->HeterPs_->set_multi_mf_dim(multi_mf_dim_, max_mf_dim_);
int mf_dim = this->index_dim_vec_[j];
Expand Down Expand Up @@ -942,7 +941,8 @@ void PSGPUWrapper::LoadIntoMemory(bool is_shuffle) {
InitSlotInfo();
std::shared_ptr<HeterContext> gpu_task = gpu_task_pool_.Get();
gpu_task->Reset();

gpu_task->pass_id_ = (uint16_t)(dataset_->GetPassID());

dataset_mutex_.lock();
dataset_pipe_.push(dataset_);
dataset_mutex_.unlock();
Expand Down Expand Up @@ -1367,13 +1367,15 @@ void add_sparse_optimizer(
const std::string& prefix = "") {
auto optimizer_name = sgd_param.name();
if (optimizer_name == "naive") {
config[prefix + "optimizer_type"] = 0;
config[prefix + "learning_rate"] = sgd_param.naive().learning_rate();
config[prefix + "initial_range"] = sgd_param.naive().initial_range();
if (sgd_param.naive().weight_bounds_size() == 2) {
config[prefix + "min_bound"] = sgd_param.naive().weight_bounds()[0];
config[prefix + "max_bound"] = sgd_param.naive().weight_bounds()[1];
}
} else if (optimizer_name == "adagrad") {
config[prefix + "optimizer_type"] = 1;
config[prefix + "learning_rate"] = sgd_param.adagrad().learning_rate();
config[prefix + "initial_range"] = sgd_param.adagrad().initial_range();
config[prefix + "initial_g2sum"] = sgd_param.adagrad().initial_g2sum();
Expand All @@ -1382,6 +1384,7 @@ void add_sparse_optimizer(
config[prefix + "max_bound"] = sgd_param.adagrad().weight_bounds()[1];
}
} else if (optimizer_name == "std_adagrad") {
config[prefix + "optimizer_type"] = 2;
config[prefix + "learning_rate"] = sgd_param.adagrad().learning_rate();
config[prefix + "initial_range"] = sgd_param.adagrad().initial_range();
config[prefix + "initial_g2sum"] = sgd_param.adagrad().initial_g2sum();
Expand All @@ -1390,6 +1393,7 @@ void add_sparse_optimizer(
config[prefix + "max_bound"] = sgd_param.adagrad().weight_bounds()[1];
}
} else if (optimizer_name == "adam") {
config[prefix + "optimizer_type"] = 3;
config[prefix + "learning_rate"] = sgd_param.adam().learning_rate();
config[prefix + "initial_range"] = sgd_param.adam().initial_range();
if (sgd_param.adam().weight_bounds_size() == 2) {
Expand All @@ -1415,7 +1419,7 @@ void PSGPUWrapper::InitializeGPUServer(const std::string& fleet_desc) {
std::unordered_map<std::string, float> config;
if (accessor_class == "DownpourFeatureValueAccessor" ||
accessor_class == "DownpourCtrAccessor" ||
accessor_class == "DownpourCtrDoubleAccessor" || accessor_class == "DownpourCtrDymfAccessor") {
accessor_class == "DownpourCtrDoubleAccessor") {

config["nonclk_coeff"] = sparse_table_accessor_parameter.nonclk_coeff();
config["clk_coeff"] = sparse_table_accessor_parameter.click_coeff();
Expand Down Expand Up @@ -1498,7 +1502,8 @@ void PSGPUWrapper::InitializeGPUServer(const std::string& fleet_desc) {
}
}
} else if (accessor_class == "DownpourUnitAccessor" ||
accessor_class == "DownpourDoubleUnitAccessor") {
accessor_class == "DownpourDoubleUnitAccessor" ||
accessor_class == "DownpourCtrDymfAccessor") {
config["nonclk_coeff"] = sparse_table_accessor_parameter.nonclk_coeff();
config["clk_coeff"] = sparse_table_accessor_parameter.click_coeff();
config["mf_create_thresholds"] = sparse_table_accessor.embedx_threshold();
Expand Down
8 changes: 7 additions & 1 deletion paddle/fluid/framework/fleet/ps_gpu_wrapper.h
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,12 @@ class PSGPUWrapper {
#ifdef PADDLE_WITH_PSLIB
auto fleet_ptr = FleetWrapper::GetInstance();
std::string dist_desc = fleet_ptr->GetDistDesc();
fleet_ptr->GetCPUAccessor(cpu_accessor_);
PADDLE_ENFORCE_EQ(
cpu_accessor_ != nullptr, true,
platform::errors::PreconditionNotMet(
"You must initialize the cpu accessor first to use it."));
VLOG(0) << "PSGPUWrapper initialize cpu accessor";
InitializeGPUServer(dist_desc);
#endif
}
Expand Down Expand Up @@ -413,7 +419,7 @@ class PSGPUWrapper {
private:

#ifdef PADDLE_WITH_PSLIB
::paddle::ps::ValueAccessor* cpu_accessor_;
::paddle::ps::ValueAccessor* cpu_accessor_ = nullptr;
#endif

static std::shared_ptr<PSGPUWrapper> s_instance_;
Expand Down
2 changes: 2 additions & 0 deletions paddle/fluid/pybind/data_set_py.cc
Original file line number Diff line number Diff line change
Expand Up @@ -313,6 +313,8 @@ void BindDataset(py::module *m) {
&framework::Dataset::SetFleetSendSleepSeconds,
py::call_guard<py::gil_scoped_release>())
.def("enable_pv_merge", &framework::Dataset::EnablePvMerge,
py::call_guard<py::gil_scoped_release>())
.def("set_pass_id", &framework::Dataset::SetPassId,
py::call_guard<py::gil_scoped_release>());

py::class_<IterableDatasetWrapper>(*m, "IterableDatasetWrapper")
Expand Down
Loading

0 comments on commit 99e949b

Please sign in to comment.