Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Optimize] SparseKV speedup and memory savings #32048

Merged
merged 29 commits into from
Apr 20, 2021
Merged
Show file tree
Hide file tree
Changes from 23 commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
00738e2
add PullSparseValue for pull sparse
seiriosPlus Mar 16, 2021
2180218
fix bug for PullSparseValue
seiriosPlus Mar 16, 2021
4603446
fix bug for PullSparseValue
seiriosPlus Mar 17, 2021
67fe1ef
fix bug for PullSparseValue
seiriosPlus Mar 17, 2021
67861ed
add test mode in lookuptable
seiriosPlus Mar 18, 2021
15a2691
Merge branch 'develop' of https://github.com/PaddlePaddle/Paddle into…
seiriosPlus Mar 23, 2021
7e41ea3
fix bug
seiriosPlus Apr 1, 2021
b3ac492
revert API change
seiriosPlus Apr 2, 2021
f2d233b
merge develop
seiriosPlus Apr 2, 2021
00ffb70
merge develop
seiriosPlus Apr 2, 2021
5f5a4e1
large scale kv store optimize
seiriosPlus Apr 2, 2021
3af8fbc
replace std::unordered_map with robin_hood map
seiriosPlus Apr 2, 2021
ea42db7
replace std::unordered_map with robin_hood map
seiriosPlus Apr 6, 2021
a901f9f
merge sparse struct
seiriosPlus Apr 6, 2021
f8bfe1c
merge sparse struct
seiriosPlus Apr 6, 2021
1d798a0
merge develop
seiriosPlus Apr 7, 2021
b8e9e3c
Merge branch 'develop' into fix/sparse_embedding_test
seiriosPlus Apr 12, 2021
89eef16
remove cpplint
seiriosPlus Apr 12, 2021
454b9a8
remove cpplint
seiriosPlus Apr 13, 2021
c3c11f6
fix bug
seiriosPlus Apr 13, 2021
be27f4b
fix bug
seiriosPlus Apr 13, 2021
8065820
remove license
seiriosPlus Apr 14, 2021
b371177
remove license
seiriosPlus Apr 14, 2021
9785e47
Merge branch 'develop' into optimize/sparse_kv
seiriosPlus Apr 15, 2021
05c0838
merge develop
seiriosPlus Apr 15, 2021
d5305af
merge develop
seiriosPlus Apr 15, 2021
ac024bb
merge develop
seiriosPlus Apr 15, 2021
555e13a
fix cpp lint
seiriosPlus Apr 16, 2021
427c272
fix cpp lint
seiriosPlus Apr 16, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 38 additions & 0 deletions paddle/fluid/distributed/fleet.cc
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,44 @@ void FleetWrapper::CreateClient2ClientConnection() {
client2client_max_retry_);
}

// Asynchronously pull sparse feature embeddings from the parameter server.
//
// Gathers every non-zero feature id out of the LoDTensors named in
// `var_names` (id 0 is treated as the padding sign and skipped), sizes
// `fea_values` to one `fea_value_dim`-length row per key, and issues a
// single async pull RPC for all keys.
//
// Param<in>:     scope, table_id, var_names, fea_value_dim
// Param<in/out>: fea_keys   — cleared and refilled with the collected ids.
// Param<out>:    fea_values — rows are written by the server when the
//                             returned future completes.
// Returns: a std::future carrying the RPC status code. Callers must keep
//          `fea_keys`/`fea_values` alive until the future is ready.
std::future<int32_t> FleetWrapper::PullSparseVarsAsync(
    const Scope& scope, const uint64_t table_id,
    const std::vector<std::string>& var_names, std::vector<uint64_t>* fea_keys,
    std::vector<std::vector<float>>* fea_values, int fea_value_dim) {
  fea_keys->clear();
  fea_keys->reserve(MAX_FEASIGN_NUM);

  for (const auto& name : var_names) {  // const ref: avoid copying each name
    Variable* var = scope.FindVar(name);
    if (var == nullptr) {
      continue;
    }
    LoDTensor* tensor = var->GetMutable<LoDTensor>();
    CHECK(tensor != nullptr) << "tensor of var " << name << " is null";
    const int64_t* ids = tensor->data<int64_t>();
    size_t len = tensor->numel();
    for (size_t i = 0; i < len; ++i) {
      if (ids[i] == 0) {
        // Feature sign 0 is reserved (padding); do not pull it.
        continue;
      }
      fea_keys->push_back(static_cast<uint64_t>(ids[i]));
    }
  }

  // Exactly one output row per key. (The previous code resized to
  // fea_keys->size() + 1, allocating a spare row the RPC never touches —
  // pull_sparse is only told fea_keys->size() entries.)
  fea_values->resize(fea_keys->size());
  for (auto& t : *fea_values) {
    t.resize(fea_value_dim);
  }

  std::vector<float*> pull_result_ptr;
  pull_result_ptr.reserve(fea_values->size());
  for (auto& t : *fea_values) {
    pull_result_ptr.push_back(t.data());
  }

  bool training = true;
  return pserver_ptr_->_worker_ptr->pull_sparse(
      pull_result_ptr.data(), table_id, fea_keys->data(), fea_keys->size(),
      training);
}

void FleetWrapper::PullSparseVarsSync(
const Scope& scope, const uint64_t table_id,
const std::vector<std::string>& var_names, std::vector<uint64_t>* fea_keys,
Expand Down
9 changes: 9 additions & 0 deletions paddle/fluid/distributed/fleet.h
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,15 @@ class FleetWrapper {
int fea_dim,
const std::vector<std::string>& var_emb_names);

// Pull sparse variables from server in async mode.
// Param<in>:     scope, table_id, var_names, fea_dim
// Param<in/out>: fea_keys — refilled with the non-zero feature ids found in
//                the tensors named by var_names.
// Param<out>:    fea_values — resized to one fea_dim-length row per key;
//                rows are filled when the returned future completes.
// Returns: std::future carrying the RPC status of the pull request.
std::future<int32_t> PullSparseVarsAsync(
const Scope& scope, const uint64_t table_id,
const std::vector<std::string>& var_names,
std::vector<uint64_t>* fea_keys,
std::vector<std::vector<float>>* fea_values, int fea_dim);

// Pull sparse variables from server in sync mode
// pull immediately to tensors
// is_training is true means training, false means inference, the behavior is
Expand Down
12 changes: 6 additions & 6 deletions paddle/fluid/distributed/table/common_sparse_table.cc
Original file line number Diff line number Diff line change
Expand Up @@ -126,17 +126,17 @@ void ProcessALine(const std::vector<std::string>& columns, const Meta& meta,
int64_t SaveToText(std::ostream* os, std::shared_ptr<ValueBlock> block,
const int mode) {
int64_t not_save_num = 0;
for (auto value : block->values_) {
if (mode == SaveMode::delta && !value.second->need_save_) {
for (auto& value : block->values_) {
if (mode == SaveMode::delta && !value.second.need_save_) {
not_save_num++;
continue;
}

auto* vs = value.second->data_.data();
auto* vs = value.second.data_;
std::stringstream ss;
auto id = value.first;
ss << id << "\t" << value.second->count_ << "\t"
<< value.second->unseen_days_ << "\t" << value.second->is_entry_ << "\t";
ss << id << "\t" << value.second.count_ << "\t" << value.second.unseen_days_
<< "\t" << value.second.is_entry_ << "\t";

for (int i = 0; i < block->value_length_; i++) {
ss << vs[i];
Expand All @@ -148,7 +148,7 @@ int64_t SaveToText(std::ostream* os, std::shared_ptr<ValueBlock> block,
os->write(ss.str().c_str(), sizeof(char) * ss.str().size());

if (mode == SaveMode::base || mode == SaveMode::delta) {
value.second->need_save_ = false;
value.second.need_save_ = false;
}
}

Expand Down
69 changes: 50 additions & 19 deletions paddle/fluid/distributed/table/depends/large_scale_kv.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@

#include "paddle/fluid/distributed/common/utils.h"
#include "paddle/fluid/distributed/table/depends/initializers.h"
#include "paddle/fluid/distributed/thirdparty/round_robin.h"
#include "paddle/fluid/framework/generator.h"
#include "paddle/fluid/framework/lod_tensor.h"
#include "paddle/fluid/framework/rw_lock.h"
Expand All @@ -54,23 +55,53 @@ struct VALUE {
unseen_days_(0),
need_save_(false),
is_entry_(false) {
data_.resize(length);
memset(data_.data(), 0, sizeof(float) * length);
data_ = new float[length];
memset(data_, 0, sizeof(float) * length);
}

// Deep-copy constructor: allocates a fresh buffer and copies the payload,
// so no two VALUE instances ever share the same data_ allocation (data_
// is a raw owning float*, see the destructor).
VALUE(const VALUE &value) {
length_ = value.length_;
count_ = value.count_;
unseen_days_ = value.unseen_days_;
need_save_ = value.need_save_;
is_entry_ = value.is_entry_;
data_ = new float[length_];
memcpy(data_, value.data_, sizeof(float) * length_);
}

// Deep-copy assignment.
// The replacement buffer is allocated *before* the old one is released:
// the previous version did `delete[] data_` first, so a throwing
// `new float[]` would have left data_ dangling and caused a double
// delete in the destructor. Allocating first keeps *this valid if the
// allocation fails.
VALUE &operator=(const VALUE &value) {
  if (this != &value) {
    float *new_data = new float[value.length_];
    memcpy(new_data, value.data_, sizeof(float) * value.length_);

    delete[] data_;
    data_ = new_data;
    length_ = value.length_;
    count_ = value.count_;
    unseen_days_ = value.unseen_days_;
    need_save_ = value.need_save_;
    is_entry_ = value.is_entry_;
  }
  return *this;
}

// Releases the owned embedding buffer. data_ is nulled afterwards so an
// accidental use-after-destroy is easier to diagnose.
~VALUE() {
delete[] data_;
data_ = nullptr;
}

size_t length_;
std::vector<float> data_;
int count_;
int unseen_days_; // use to check knock-out
bool need_save_; // whether need to save
bool is_entry_; // whether knock-in
float *data_;
};

inline bool count_entry(std::shared_ptr<VALUE> value, int threshold) {
// Entry policy: a feature "knocks in" once it has been seen at least
// `threshold` times (count_ is accumulated by AttrUpdate).
inline bool count_entry(VALUE *value, int threshold) {
return value->count_ >= threshold;
}

inline bool probility_entry(std::shared_ptr<VALUE> value, float threshold) {
// Entry policy: admit the feature when a fresh uniform draw from [0, 1)
// is >= threshold, i.e. with probability (1 - threshold).
// NOTE(review): `value` is unused — admission here is independent of the
// feature's accumulated state; confirm that is intentional.
inline bool probility_entry(VALUE *value, float threshold) {
UniformInitializer uniform = UniformInitializer({"uniform", "0", "0", "1"});
return uniform.GetValue() >= threshold;
}
Expand Down Expand Up @@ -148,7 +179,7 @@ class ValueBlock {
PADDLE_ENFORCE_EQ(
value_dims[i], value_dims_[i],
platform::errors::InvalidArgument("value dims is not match"));
pts.push_back(values->data_.data() +
pts.push_back(values.data_ +
value_offsets_.at(value_idx_.at(value_names[i])));
}
return pts;
Expand All @@ -158,19 +189,19 @@ class ValueBlock {
float *Init(const uint64_t &id, const bool with_update = true,
const int counter = 1) {
if (!Has(id)) {
values_[id] = std::make_shared<VALUE>(value_length_);
values_.emplace(std::make_pair(id, VALUE(value_length_)));
}

auto &value = values_.at(id);

if (with_update) {
AttrUpdate(value, counter);
AttrUpdate(&value, counter);
}

return value->data_.data();
return value.data_;
}

void AttrUpdate(std::shared_ptr<VALUE> value, const int counter) {
void AttrUpdate(VALUE *value, const int counter) {
// update state
value->unseen_days_ = 0;
value->count_ += counter;
Expand All @@ -180,7 +211,7 @@ class ValueBlock {
if (value->is_entry_) {
// initialize
for (int x = 0; x < value_names_.size(); ++x) {
initializers_[x]->GetValue(value->data_.data() + value_offsets_[x],
initializers_[x]->GetValue(value->data_ + value_offsets_[x],
value_dims_[x]);
}
value->need_save_ = true;
Expand All @@ -195,27 +226,27 @@ class ValueBlock {
// dont jude if (has(id))
float *Get(const uint64_t &id) {
auto &value = values_.at(id);
return value->data_.data();
return value.data_;
}

// for load, to reset count, unseen_days
std::shared_ptr<VALUE> GetValue(const uint64_t &id) { return values_.at(id); }
VALUE *GetValue(const uint64_t &id) { return &values_.at(id); }

bool GetEntry(const uint64_t &id) {
auto &value = values_.at(id);
return value->is_entry_;
return value.is_entry_;
}

void SetEntry(const uint64_t &id, const bool state) {
auto &value = values_.at(id);
value->is_entry_ = state;
value.is_entry_ = state;
}

void Shrink(const int threshold) {
for (auto iter = values_.begin(); iter != values_.end();) {
auto &value = iter->second;
value->unseen_days_++;
if (value->unseen_days_ >= threshold) {
value.unseen_days_++;
if (value.unseen_days_ >= threshold) {
iter = values_.erase(iter);
} else {
++iter;
Expand All @@ -235,7 +266,7 @@ class ValueBlock {
}

public:
std::unordered_map<uint64_t, std::shared_ptr<VALUE>> values_;
robin_hood::unordered_map<uint64_t, VALUE> values_;
size_t value_length_ = 0;

private:
Expand All @@ -244,7 +275,7 @@ class ValueBlock {
const std::vector<int> &value_offsets_;
const std::unordered_map<std::string, int> &value_idx_;

std::function<bool(std::shared_ptr<VALUE>)> entry_func_;
std::function<bool(VALUE *)> entry_func_;
std::vector<std::shared_ptr<Initializer>> initializers_;
};

Expand Down
Loading