Float feature final #261

Merged 44 commits on May 9, 2023

Commits (44)
5eb7b8c adapt float feature (zmxdream, Apr 3, 2023)
8c6e185 ad float feature (zmxdream, Apr 10, 2023)
24ded28 fix float feature (zmxdream, Apr 12, 2023)
d411d28 fix float feature (zmxdream, Apr 12, 2023)
a99e3f6 fix float feature (zmxdream, Apr 13, 2023)
724cb05 fix float feature (zmxdream, Apr 13, 2023)
41121e2 fix float feature (zmxdream, Apr 13, 2023)
056dfae fix float feature (zmxdream, Apr 14, 2023)
ae9d110 fix float feature (zmxdream, Apr 15, 2023)
3525c85 fix float feature (zmxdream, Apr 17, 2023)
09e102f fix float feature (zmxdream, Apr 17, 2023)
37a614f fix float feature (zmxdream, Apr 17, 2023)
f144d73 fix float feature (zmxdream, Apr 19, 2023)
af73ba7 fix float feature (zmxdream, Apr 19, 2023)
d1c044a fix float feature (zmxdream, Apr 20, 2023)
d0b44c2 fix float feature (zmxdream, Apr 21, 2023)
dec4195 fix float feature (zmxdream, Apr 21, 2023)
131b47a fix float feature (zmxdream, Apr 21, 2023)
26767ab fix float feature (zmxdream, Apr 22, 2023)
b19897b fix float feature (zmxdream, Apr 22, 2023)
72f3883 fix float feature (zmxdream, Apr 23, 2023)
5c1c15e fix float feature (zmxdream, Apr 23, 2023)
6c023fc fix float feature (zmxdream, Apr 23, 2023)
9a8940c fix float feature (zmxdream, Apr 23, 2023)
09b2918 fix float feature (zmxdream, Apr 23, 2023)
9b704f3 fix float feature (zmxdream, Apr 23, 2023)
7ed9e47 fix float feature (zmxdream, Apr 23, 2023)
3d97c68 fix float feature (zmxdream, Apr 23, 2023)
05aa8bf fix float feature (zmxdream, Apr 24, 2023)
ec3c14f fix float feature (zmxdream, Apr 24, 2023)
99c34bb fix float feature (zmxdream, Apr 24, 2023)
8149a4c fix float feature (zmxdream, Apr 25, 2023)
3a0b4c5 fix float feature (zmxdream, Apr 25, 2023)
a950c02 fix float feature (zmxdream, Apr 26, 2023)
35a727a fix float feature (zmxdream, Apr 26, 2023)
5ef3a5a fix float feature (zmxdream, Apr 26, 2023)
aec8010 fix float feature (zmxdream, Apr 26, 2023)
4b454cb fix memory increase (zmxdream, Apr 28, 2023)
63af2eb code clean (zmxdream, Apr 28, 2023)
69424d4 code clean (zmxdream, Apr 28, 2023)
17b4892 code clean (zmxdream, Apr 28, 2023)
de193bd Merge remote-tracking branch 'origin/gpugraph_new' into fix_memory (zmxdream, May 9, 2023)
0e41e35 code clean (zmxdream, May 9, 2023)
ecf5787 code clean (zmxdream, May 9, 2023)
paddle/fluid/distributed/ps/table/common_graph_table.cc (156 additions, 32 deletions)
@@ -177,6 +177,96 @@ paddle::framework::GpuPsCommGraphFea GraphTable::make_gpu_ps_graph_fea(
return res;
}

paddle::framework::GpuPsCommGraphFloatFea GraphTable::make_gpu_ps_graph_float_fea(
int gpu_id, std::vector<uint64_t> &node_ids, int float_slot_num) {
size_t shard_num = 64;
std::vector<std::vector<uint64_t>> bags(shard_num);
std::vector<float> feature_array[shard_num];
std::vector<uint8_t> slot_id_array[shard_num];
std::vector<uint64_t> node_id_array[shard_num];
std::vector<paddle::framework::GpuPsFeaInfo> node_fea_info_array[shard_num];
for (size_t i = 0; i < shard_num; i++) {
auto predsize = node_ids.size() / shard_num;
bags[i].reserve(predsize * 1.2);
feature_array[i].reserve(predsize * 1.2 * float_slot_num);
slot_id_array[i].reserve(predsize * 1.2 * float_slot_num);
node_id_array[i].reserve(predsize * 1.2);
node_fea_info_array[i].reserve(predsize * 1.2);
}

for (auto x : node_ids) {
int location = x % shard_num;
bags[location].push_back(x);
}

std::vector<std::future<int>> tasks;

for (size_t i = 0; i < bags.size(); i++) {
if (bags[i].size() > 0) {
tasks.push_back(_cpu_worker_pool[gpu_id]->enqueue([&, i, this]() -> int {
uint64_t node_id;
paddle::framework::GpuPsFeaInfo x;
// std::vector<uint64_t> feature_ids;
for (size_t j = 0; j < bags[i].size(); j++) {
Node *v = find_node(GraphTableType::FEATURE_TABLE, bags[i][j]);
node_id = bags[i][j];
if (v == NULL) {
x.feature_size = 0;
x.feature_offset = 0;
node_fea_info_array[i].push_back(x);
} else {
// x <- v
x.feature_offset = feature_array[i].size();
int total_feature_size = 0;
for (int k = 0; k < float_slot_num; ++k) {
auto float_feature_size =
v->get_float_feature(k, feature_array[i], slot_id_array[i]);
total_feature_size += float_feature_size;
}
x.feature_size = total_feature_size;
node_fea_info_array[i].push_back(x);
}
node_id_array[i].push_back(node_id);
}
return 0;
}));
}
}
for (size_t i = 0; i < tasks.size(); i++) tasks[i].get();

tasks.clear();

paddle::framework::GpuPsCommGraphFloatFea res;
uint64_t tot_len = 0;
for (size_t i = 0; i < shard_num; i++) {
tot_len += feature_array[i].size();
}
VLOG(1) << "Loaded float feature table on cpu, float feature_list_size[" << tot_len
<< "] node_ids_size[" << node_ids.size() << "]";
res.init_on_cpu(tot_len, (unsigned int)node_ids.size(), float_slot_num);
unsigned int offset = 0, ind = 0;
for (size_t i = 0; i < shard_num; i++) {
tasks.push_back(
_cpu_worker_pool[gpu_id]->enqueue([&, i, ind, offset, this]() -> int {
auto start = ind;
for (size_t j = 0; j < node_id_array[i].size(); j++) {
res.node_list[start] = node_id_array[i][j];
res.fea_info_list[start] = node_fea_info_array[i][j];
res.fea_info_list[start++].feature_offset += offset;
}
for (size_t j = 0; j < feature_array[i].size(); j++) {
res.feature_list[offset + j] = feature_array[i][j];
res.slot_id_list[offset + j] = slot_id_array[i][j];
}
return 0;
}));
offset += feature_array[i].size();
ind += node_id_array[i].size();
}
for (size_t i = 0; i < tasks.size(); i++) tasks[i].get();
return res;
}

paddle::framework::GpuPsCommGraph GraphTable::make_gpu_ps_graph(
int idx, const std::vector<uint64_t> &ids) {
std::vector<std::vector<uint64_t>> bags(task_pool_size_);
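As an aside for readers new to this code path: make_gpu_ps_graph_float_fea above follows the same shard-then-merge pattern as its feasign counterpart. Below is a minimal, self-contained sketch of that pattern; only shard_num and the id % shard_num bucketing rule are taken from the function, everything else (the ids, the printout) is illustrative.

#include <cstdint>
#include <cstdio>
#include <vector>

int main() {
  const size_t shard_num = 64;  // matches the constant in the function above
  std::vector<uint64_t> node_ids = {3, 64, 67, 130};  // hypothetical ids
  std::vector<std::vector<uint64_t>> bags(shard_num);
  for (auto x : node_ids) bags[x % shard_num].push_back(x);  // shard by id % shard_num

  // The real code enqueues one worker task per non-empty bag; results are
  // merged afterwards by giving each shard a contiguous [offset, offset+size) range.
  size_t offset = 0;
  for (size_t i = 0; i < shard_num; ++i) {
    if (bags[i].empty()) continue;
    std::printf("shard %zu -> slots [%zu, %zu)\n", i, offset, offset + bags[i].size());
    offset += bags[i].size();
  }
  return 0;
}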
@@ -1231,16 +1321,19 @@ GraphNode *GraphShard::add_graph_node(Node *node) {
return reinterpret_cast<GraphNode *>(bucket[node_location[id]]);
}

FeatureNode *GraphShard::add_feature_node(uint64_t id, bool is_overlap) {
FeatureNode *GraphShard::add_feature_node(uint64_t id, bool is_overlap, int float_fea_num) {
if (node_location.find(id) == node_location.end()) {
node_location[id] = bucket.size();
bucket.push_back(new FeatureNode(id));
if (float_fea_num > 0) {
bucket.push_back(new FloatFeatureNode(id));
} else {
bucket.push_back(new FeatureNode(id));
}
return reinterpret_cast<FeatureNode *>(bucket[node_location[id]]);
}
if (is_overlap) {
return reinterpret_cast<FeatureNode *>(bucket[node_location[id]]);
}

return NULL;
}

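A short usage sketch for the new third parameter (the ids and slot count are hypothetical, and an existing GraphShard instance named shard is assumed): a positive float_fea_num makes the shard allocate a FloatFeatureNode, so float storage is only paid for by node types that declare float32 slots.

// Hypothetical calls against an existing GraphShard `shard`:
// a node type with 3 float32 slots gets a FloatFeatureNode...
FeatureNode *with_floats = shard.add_feature_node(1001, /*is_overlap=*/false, /*float_fea_num=*/3);
// ...while a feasign-only type keeps the plain FeatureNode (default float_fea_num = 0).
FeatureNode *feasign_only = shard.add_feature_node(1002, /*is_overlap=*/false);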
@@ -1851,10 +1944,15 @@ std::pair<uint64_t, uint64_t> GraphTable::parse_node_file(
local_count++;

size_t index = shard_id - shard_start;
int slot_fea_num = 0;
if (feat_name.size() > 0) slot_fea_num = feat_name[idx].size();
int float_fea_num = 0;
if (float_feat_id_map.size() > 0) float_fea_num = float_feat_id_map[idx].size();
if (load_slot) {
auto node = feature_shards[idx][index]->add_feature_node(id, false);
auto node = feature_shards[idx][index]->add_feature_node(id, false, float_fea_num);
if (node != NULL) {
node->set_feature_size(feat_name[idx].size());
if (slot_fea_num > 0) node->set_feature_size(slot_fea_num);
if (float_fea_num > 0) node->set_float_feature_size(float_fea_num);
for (int i = 1; i < num; ++i) {
auto &v = vals[i];
int ret = parse_feature(idx, v.ptr, v.len, node);
@@ -1866,7 +1964,7 @@
}
}
} else {
node_shards[idx][index]->add_feature_node(id, false);
node_shards[idx][index]->add_feature_node(id, false, float_fea_num);
}
local_valid_count++;
}
@@ -1920,8 +2018,10 @@ std::pair<uint64_t, uint64_t> GraphTable::parse_node_file(
}
}
size_t index = shard_id - shard_start;
int float_fea_num = 0;
if (float_feat_id_map.size() > 0) float_fea_num = float_feat_id_map[idx].size();
if (load_slot) {
auto node = feature_shards[idx][index]->add_feature_node(id, false);
auto node = feature_shards[idx][index]->add_feature_node(id, false, float_fea_num);
if (node != NULL) {
for (int i = 2; i < num; ++i) {
auto &v = vals[i];
@@ -1934,7 +2034,7 @@
}
}
} else {
node_shards[idx][index]->add_feature_node(id, false);
node_shards[idx][index]->add_feature_node(id, false, float_fea_num);
}
local_valid_count++;
}
@@ -2529,22 +2629,6 @@ int GraphTable::parse_feature(int idx,
string_vector_2_string(
fea_fields.begin(), fea_fields.end(), ' ', fea_ptr);
return 0;
} else if (dtype == "float32") {
int ret = FeatureNode::parse_value_to_bytes<float>(
fea_fields.begin(), fea_fields.end(), fea_ptr);
if (ret != 0) {
VLOG(0) << "Fail to parse value";
return -1;
}
return 0;
} else if (dtype == "float64") {
int ret = FeatureNode::parse_value_to_bytes<double>(
fea_fields.begin(), fea_fields.end(), fea_ptr);
if (ret != 0) {
VLOG(0) << "Fail to parse value";
return -1;
}
return 0;
} else if (dtype == "int32") {
int ret = FeatureNode::parse_value_to_bytes<int32_t>(
fea_fields.begin(), fea_fields.end(), fea_ptr);
@@ -2563,10 +2647,36 @@
return 0;
}
} else {
VLOG(10) << "feature_name[" << name << "] is not in feat_id_map, ntype_id["
<< idx << "] feat_id_map_size[" << feat_id_map.size() << "]";
if (float_feat_id_map.size() > (size_t)idx) {
auto float_it = float_feat_id_map[idx].find(name);
if (float_it != float_feat_id_map[idx].end()) {
int32_t id = float_it->second;
std::string *fea_ptr = node->mutable_float_feature(id);
std::string dtype = this->float_feat_dtype[idx][id];
if (dtype == "float32") {
int ret = FeatureNode::parse_value_to_bytes<float>(
fea_fields.begin(), fea_fields.end(), fea_ptr);
if (ret != 0) {
VLOG(0) << "Fail to parse value";
return -1;
}
return 0;
}
// else if (dtype == "float64") { // not used
// int ret = FeatureNode::parse_value_to_bytes<double>(
// fea_fields.begin(), fea_fields.end(), fea_ptr);
// if (ret != 0) {
// VLOG(0) << "Fail to parse value";
// return -1;
// }
// return 0;
// }
} else {
VLOG(4) << "feature_name[" << name << "] is not in feat_id_map, ntype_id["
<< idx << "] feat_id_map_size[" << feat_id_map.size() << "]";
}
}
}

return 0;
}
// thread safe shard vector merge
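The new float32 branch in parse_feature defers to FeatureNode::parse_value_to_bytes<float>. As a rough, self-contained stand-in, assuming the helper packs each parsed value's raw bytes into the output string (the real implementation lives in FeatureNode and may differ in detail):

#include <cstdio>
#include <string>
#include <vector>

// Illustrative stand-in for parse_value_to_bytes<float>: parse each token
// as a float and append its raw 4-byte representation to *out.
int parse_floats_to_bytes(const std::vector<std::string> &fields, std::string *out) {
  for (const auto &f : fields) {
    try {
      float v = std::stof(f);
      out->append(reinterpret_cast<const char *>(&v), sizeof(v));
    } catch (...) {
      return -1;  // mirrors the "Fail to parse value" error path above
    }
  }
  return 0;
}

int main() {
  std::string bytes;
  int ret = parse_floats_to_bytes({"0.5", "1.25"}, &bytes);
  std::printf("ret=%d packed=%zu bytes\n", ret, bytes.size());  // 8 bytes for 2 floats
  return ret;
}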
@@ -2930,18 +3040,32 @@ int32_t GraphTable::Initialize(const GraphParameter &graph) {
auto feature = graph_feature[k];
id_to_feature.push_back(node_type);
int feat_conf_size = static_cast<int>(feature.name().size());

int feasign_idx = 0, float_idx = 0;
for (int i = 0; i < feat_conf_size; i++) {
// auto &f_name = common.attributes()[i];
// auto &f_shape = common.dims()[i];
// auto &f_dtype = common.params()[i];
auto &f_name = feature.name()[i];
auto &f_shape = feature.shape()[i];
auto &f_dtype = feature.dtype()[i];
feat_name[k].push_back(f_name);
feat_shape[k].push_back(f_shape);
feat_dtype[k].push_back(f_dtype);
feat_id_map[k][f_name] = i;
if (f_dtype == "feasign" || f_dtype == "int64") {
feat_name[k].push_back(f_name);
feat_shape[k].push_back(f_shape);
feat_dtype[k].push_back(f_dtype);
feat_id_map[k][f_name] = feasign_idx++;
} else if (f_dtype == "float32") {
if (float_feat_id_map.size() < (size_t)node_types.size()) {
float_feat_name.resize(node_types.size());
float_feat_shape.resize(node_types.size());
float_feat_dtype.resize(node_types.size());
float_feat_id_map.resize(node_types.size());
}
float_feat_name[k].push_back(f_name);
float_feat_shape[k].push_back(f_shape);
float_feat_dtype[k].push_back(f_dtype);
float_feat_id_map[k][f_name] = float_idx++;
}
VLOG(0) << "init graph table feat conf name:" << f_name
<< " shape:" << f_shape << " dtype:" << f_dtype;
}
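To make the new numbering in Initialize concrete: feasign/int64 slots and float32 slots are now indexed independently, so feat_id_map and float_feat_id_map are each dense from zero. A self-contained walk-through of that split (the slot names are made up):

#include <cstdio>
#include <string>
#include <unordered_map>
#include <utility>
#include <vector>

int main() {
  // Hypothetical feature config for one node type: (name, dtype) pairs.
  std::vector<std::pair<std::string, std::string>> conf = {
      {"click", "feasign"}, {"ctr", "float32"}, {"age", "int64"}, {"emb", "float32"}};
  std::unordered_map<std::string, int> feat_id_map, float_feat_id_map;
  int feasign_idx = 0, float_idx = 0;
  for (const auto &kv : conf) {
    if (kv.second == "feasign" || kv.second == "int64")
      feat_id_map[kv.first] = feasign_idx++;        // click -> 0, age -> 1
    else if (kv.second == "float32")
      float_feat_id_map[kv.first] = float_idx++;    // ctr -> 0, emb -> 1
  }
  std::printf("feasign slots: %d, float slots: %d\n", feasign_idx, float_idx);
  return 0;
}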
paddle/fluid/distributed/ps/table/common_graph_table.h (9 additions, 1 deletion)
@@ -122,7 +122,7 @@ class GraphShard {
}
GraphNode *add_graph_node(uint64_t id);
GraphNode *add_graph_node(Node *node);
FeatureNode *add_feature_node(uint64_t id, bool is_overlap = true);
FeatureNode *add_feature_node(uint64_t id, bool is_overlap = true, int float_fea_num = 0);
Node *find_node(uint64_t id);
void delete_node(uint64_t id);
void clear();
@@ -725,6 +725,8 @@ class GraphTable : public Table {
int idx, const std::vector<uint64_t> &ids);
virtual paddle::framework::GpuPsCommGraphFea make_gpu_ps_graph_fea(
int gpu_id, std::vector<uint64_t> &node_ids, int slot_num); // NOLINT
virtual paddle::framework::GpuPsCommGraphFloatFea make_gpu_ps_graph_float_fea(
int gpu_id, std::vector<uint64_t> &node_ids, int float_slot_num); // NOLINT
int32_t Load_to_ssd(const std::string &path, const std::string &param);
int64_t load_graph_to_memory_from_ssd(int idx,
std::vector<uint64_t> &ids); // NOLINT
@@ -779,7 +781,13 @@ class GraphTable : public Table {
std::vector<std::vector<std::string>> feat_name;
std::vector<std::vector<std::string>> feat_dtype;
std::vector<std::vector<int32_t>> feat_shape;
std::vector<std::vector<std::string>> float_feat_name;
std::vector<std::vector<std::string>> float_feat_dtype;
std::vector<std::vector<int32_t>> float_feat_shape;
// int slot_fea_num_{-1};
// int float_fea_num_{-1};
std::vector<std::unordered_map<std::string, int32_t>> feat_id_map;
std::vector<std::unordered_map<std::string, int32_t>> float_feat_id_map;
std::unordered_map<std::string, int> feature_to_id, edge_to_id;
std::vector<std::string> id_to_feature, id_to_edge;
std::string table_name;
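For orientation, a tiny self-contained sketch of how the new per-type containers stay in step: for node type k and float slot j, float_feat_name[k][j], float_feat_dtype[k][j], and float_feat_shape[k][j] describe the slot, while float_feat_id_map[k] maps the name back to j (the slot name "ctr" is hypothetical).

#include <cassert>
#include <cstdint>
#include <string>
#include <unordered_map>
#include <vector>

int main() {
  // One node type (k = 0) registering a single float32 slot named "ctr".
  std::vector<std::vector<std::string>> float_feat_name(1), float_feat_dtype(1);
  std::vector<std::vector<int32_t>> float_feat_shape(1);
  std::vector<std::unordered_map<std::string, int32_t>> float_feat_id_map(1);
  float_feat_name[0].push_back("ctr");
  float_feat_dtype[0].push_back("float32");
  float_feat_shape[0].push_back(1);
  float_feat_id_map[0]["ctr"] = 0;
  // The id map inverts the name list: the same index j is used everywhere.
  assert(float_feat_name[0][float_feat_id_map[0]["ctr"]] == "ctr");
  return 0;
}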