Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Edge feature #284

Open
wants to merge 31 commits into
base: gpugraph_new
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
425 changes: 414 additions & 11 deletions paddle/fluid/distributed/ps/table/common_graph_table.cc

Large diffs are not rendered by default.

32 changes: 28 additions & 4 deletions paddle/fluid/distributed/ps/table/common_graph_table.h
Original file line number Diff line number Diff line change
Expand Up @@ -589,7 +589,8 @@ class GraphTable : public Table {
int32_t load_edges(const std::string &path,
bool reverse,
const std::string &edge_type,
bool use_weight = false);
bool use_weight = false,
bool load_edge_slot = true);
int get_all_id(GraphTableType table_type,
int slice_num,
std::vector<std::vector<uint64_t>> *output);
Expand All @@ -616,7 +617,8 @@ class GraphTable : public Table {
std::pair<uint64_t, uint64_t> parse_edge_file(const std::string &path,
int idx,
bool reverse,
bool use_weight);
bool use_weight,
bool load_edge_slot);
std::pair<uint64_t, uint64_t> parse_node_file(const std::string &path,
const std::string &node_type,
int idx,
Expand Down Expand Up @@ -663,6 +665,10 @@ class GraphTable : public Table {
size_t len,
FeatureNode *node);

virtual int parse_edge_feature(int idx,
const char *feat_str,
size_t len,
GraphNode *node);
virtual int32_t get_node_feat(
int idx,
const std::vector<uint64_t> &node_ids,
Expand Down Expand Up @@ -723,6 +729,10 @@ class GraphTable : public Table {
int type_id, int idx, uint64_t src_id, char *data, int len);
virtual paddle::framework::GpuPsCommGraph make_gpu_ps_graph(
int idx, const std::vector<uint64_t> &ids);
paddle::framework::GpuPsCommGraphEdgeFea make_gpu_ps_graph_edge_fea(
int idx, const std::vector<uint64_t> &ids, int slot_num);
paddle::framework::GpuPsCommGraphEdgeFloatFea make_gpu_ps_graph_edge_float_fea(
int idx, const std::vector<uint64_t> &ids, int slot_num);
virtual paddle::framework::GpuPsCommGraphFea make_gpu_ps_graph_fea(
int gpu_id, std::vector<uint64_t> &node_ids, int slot_num); // NOLINT
virtual paddle::framework::GpuPsCommGraphFloatFea make_gpu_ps_graph_float_fea(
Expand Down Expand Up @@ -778,16 +788,29 @@ class GraphTable : public Table {
const int random_sample_nodes_ranges = 3;

std::vector<std::vector<std::unordered_map<uint64_t, double>>> node_weight;

std::vector<std::vector<std::string>> feat_name;
std::vector<std::vector<std::string>> feat_dtype;
std::vector<std::vector<int32_t>> feat_shape;
std::vector<std::vector<std::string>> float_feat_name;
std::vector<std::vector<std::string>> float_feat_dtype;
std::vector<std::vector<int32_t>> float_feat_shape;
// int slot_fea_num_{-1};
// int float_fea_num_{-1};
std::vector<std::unordered_map<std::string, int32_t>> feat_id_map;
std::vector<std::unordered_map<std::string, int32_t>> float_feat_id_map;

  // ==== edge feature ====
  // Per-edge-type slot metadata for uint64 (sparse-id) edge features;
  // presumably indexed as [edge_type_idx][slot_idx], mirroring the node
  // feature tables above — TODO confirm against common_graph_table.cc.
  std::vector<std::vector<std::string>> edge_feat_name;
  std::vector<std::vector<std::string>> edge_feat_dtype;
  std::vector<std::vector<int32_t>> edge_feat_shape;
  // Per-edge-type slot metadata for float edge features.
  std::vector<std::vector<std::string>> edge_float_feat_name;
  std::vector<std::vector<std::string>> edge_float_feat_dtype;
  std::vector<std::vector<int32_t>> edge_float_feat_shape;
  // Feature name -> slot index, one map per edge type (uint64 / float).
  std::vector<std::unordered_map<std::string, int32_t>> edge_feat_id_map;
  std::vector<std::unordered_map<std::string, int32_t>> edge_float_feat_id_map;
  // Edge feature name -> id, and the reverse id -> name lookup.
  std::unordered_map<std::string, int> edge_feature_to_id;
  std::vector<std::string> id_to_edge_feature;
  // ==== edge feature ====

std::unordered_map<std::string, int> feature_to_id, edge_to_id;
std::vector<std::string> id_to_feature, id_to_edge;
std::string table_name;
Expand Down Expand Up @@ -819,6 +842,7 @@ class GraphTable : public Table {
std::string feature_separator_ = std::string(" ");
std::vector<int> slot_feature_num_map_;
bool is_parse_node_fail_ = false;
bool is_parse_edge_fail_ = false;
int node_num_ = 1;
int node_id_ = 0;
bool is_weighted_ = false;
Expand Down
17 changes: 16 additions & 1 deletion paddle/fluid/distributed/ps/table/graph/graph_edge.cc
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
// limitations under the License.

#include "paddle/fluid/distributed/ps/table/graph/graph_edge.h"
#include <cstring>
// #include <cstring>
namespace paddle {
namespace distributed {

Expand All @@ -25,5 +25,20 @@ void WeightedGraphEdgeBlob::add_edge(int64_t id, float weight = 1) {
id_arr.push_back(id);
weight_arr.push_back((half)weight);
}

// Every new edge appends one feature vector that will hold all of that
// edge's features (slots are filled in later via mutable_feature /
// mutable_float_feature).
void GraphEdgeBlobWithFeature::add_edge(int64_t id, float weight = 1) {
  // `weight` is accepted for interface compatibility but ignored: this
  // blob variant stores no weights.
  id_arr.push_back(id);
  offsets.push_back(0);     // no uint64 slots recorded for this edge yet
  feature.emplace_back();   // fresh, empty per-edge slot list
}

void WeightedGraphEdgeBlobWithFeature::add_edge(int64_t id, float weight = 1) {
id_arr.push_back(id);
weight_arr.push_back((half)weight);
offsets.emplace_back(0);
feature.emplace_back(std::vector<std::string>());
}

} // namespace distributed
} // namespace paddle
219 changes: 219 additions & 0 deletions paddle/fluid/distributed/ps/table/graph/graph_edge.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@
#include <cstdint>
#include <vector>
#include <cuda_fp16.h>
#include <string>
#include "glog/logging.h"
#include "paddle/fluid/platform/enforce.h"
#include "paddle/fluid/string/string_helper.h"
namespace paddle {
namespace distributed {

Expand All @@ -30,6 +34,27 @@ class GraphEdgeBlob {
virtual half get_weight(int idx) { return (half)(1.0); }
std::vector<int64_t>& export_id_array() { return id_arr; }

  // ==== adapt for edge feature ===
  // Base-class no-op hooks; the *WithFeature subclasses below override them.
  // Appends the uint64 ids of slot `slot_idx` of edge `edge_idx`; the
  // featureless base blob stores none, so it reports 0.
  virtual int get_feature_ids(int edge_idx,
                              int slot_idx,
                              std::vector<uint64_t> &feature_id,  // NOLINT
                              std::vector<uint8_t> &slot_id) const {  // NOLINT
    return 0;
  }
  // Appends the float values of float slot `slot_idx` of edge `edge_idx`;
  // the featureless base blob stores none, so it reports 0.
  virtual int get_float_feature(int edge_idx,
                                int slot_idx,
                                std::vector<float> &feature_id,  // NOLINT
                                std::vector<uint8_t> &slot_id) const {  // NOLINT
    return 0;
  }
  // virtual void set_feature(int idx, const std::string &str) {}
  // virtual void set_feature_size(int size) {}
  // No-op: the base blob holds no per-edge feature storage to shrink.
  virtual void shrink_to_fit() {}
  // nullptr: no writable feature storage in the featureless base blob.
  virtual std::string *mutable_feature(int idx) { return nullptr; }
  virtual std::string *mutable_float_feature(int idx) { return nullptr; }
  // virtual int get_feature_size() { return 0; }
  // ==== adapt for edge feature ===

protected:
std::vector<int64_t> id_arr;
};
Expand All @@ -44,5 +69,199 @@ class WeightedGraphEdgeBlob : public GraphEdgeBlob {
protected:
std::vector<half> weight_arr;
};


// Weighted edge blob that additionally stores per-edge features.
// Storage layout: feature[i] is a vector of binary-packed strings for edge
// i; the first offsets[i] entries are uint64 (sparse-id) slots, and any
// entries after that are float slots.
class WeightedGraphEdgeBlobWithFeature : public GraphEdgeBlob {
 public:
  WeightedGraphEdgeBlobWithFeature() {}
  virtual ~WeightedGraphEdgeBlobWithFeature() {}
  // Appends an edge and opens an empty feature record for it
  // (defined in graph_edge.cc).
  virtual void add_edge(int64_t id, float weight);
  virtual half get_weight(int idx) { return weight_arr[idx]; }

  // === to adapt ====
  // Appends every uint64 id packed in slot `slot_idx` of edge `edge_idx`
  // to `feature_id`, mirroring `slot_idx` into `slot_id` once per id.
  // Returns the number of ids appended (0 if edge or slot is absent).
  virtual int get_feature_ids(int edge_idx,
                              int slot_idx,
                              std::vector<uint64_t> &feature_id,  // NOLINT
                              std::vector<uint8_t> &slot_id) const {  // NOLINT
    errno = 0;
    size_t num = 0;
    if (edge_idx < static_cast<int>(this->feature.size())) {
      // offsets[edge_idx] == number of uint64 slots stored for this edge.
      int offset = this->offsets[edge_idx];
      if (slot_idx < offset) {
        const std::string &s = this->feature[edge_idx][slot_idx];
        const uint64_t *feas = reinterpret_cast<const uint64_t *>(s.c_str());
        num = s.length() / sizeof(uint64_t);
        CHECK((s.length() % sizeof(uint64_t)) == 0)
            << "bad feature_item: [" << s << "]";
        for (size_t i = 0; i < num; ++i) {
          feature_id.push_back(feas[i]);
          slot_id.push_back(slot_idx);
        }
      }
    }
    PADDLE_ENFORCE_EQ(
        errno,
        0,
        paddle::platform::errors::InvalidArgument(
            "get_feature_ids get errno should be 0, but got %d.", errno));
    return static_cast<int>(num);
  }

  // Appends every float packed in float slot `slot_idx` of edge `edge_idx`
  // to `float_feature`, mirroring `slot_idx` into `slot_id` once per value.
  // Float slots are stored after the uint64 slots, hence `offset + slot_idx`.
  virtual int get_float_feature(int edge_idx,
                                int slot_idx,
                                std::vector<float> &float_feature,  // NOLINT
                                std::vector<uint8_t> &slot_id) const {  // NOLINT
    errno = 0;
    size_t num = 0;
    if (edge_idx < static_cast<int>(this->feature.size())) {
      int offset = this->offsets[edge_idx];
      if (offset + slot_idx <
          static_cast<int>(this->feature[edge_idx].size())) {
        const std::string &s = this->feature[edge_idx][offset + slot_idx];
        const float *feas = reinterpret_cast<const float *>(s.c_str());
        num = s.length() / sizeof(float);
        CHECK((s.length() % sizeof(float)) == 0)
            << "bad feature_item: [" << s << "]";
        for (size_t i = 0; i < num; ++i) {
          float_feature.push_back(feas[i]);
          slot_id.push_back(slot_idx);
        }
      }
    }
    // Fixed copy-paste: message previously named get_feature_ids.
    PADDLE_ENFORCE_EQ(
        errno,
        0,
        paddle::platform::errors::InvalidArgument(
            "get_float_feature get errno should be 0, but got %d.", errno));
    return static_cast<int>(num);
  }

  // Returns a writable string for uint64 slot `idx` of the most recently
  // added edge, growing the slot vector and the uint64-slot count
  // (offsets.back()) as needed. Precondition: add_edge() was called.
  virtual std::string *mutable_feature(int idx) {
    if (idx >= static_cast<int>(this->feature.back().size())) {
      this->feature.back().resize(idx + 1);
    }
    if (idx + 1 > this->offsets.back()) this->offsets.back() = idx + 1;
    return &(this->feature.back()[idx]);
  }

  // Returns a writable string for float slot `idx` of the most recently
  // added edge. Float slots live after the uint64 slots, so offsets.back()
  // is deliberately left untouched.
  virtual std::string *mutable_float_feature(int idx) {
    int offset = this->offsets.back();
    if (offset + idx >= static_cast<int>(this->feature.back().size())) {
      this->feature.back().resize(offset + idx + 1);
    }
    return &(this->feature.back()[offset + idx]);
  }

  // Releases excess capacity once edge loading is finished.
  virtual void shrink_to_fit() {
    feature.shrink_to_fit();
    for (auto &edge : feature) {
      edge.shrink_to_fit();
      for (auto &slot : edge) {
        slot.shrink_to_fit();
      }
    }
  }
  // === to adapt ====

 protected:
  std::vector<half> weight_arr;
  // offsets[i]: number of uint64 slots stored for edge i.
  std::vector<int> offsets;
  // feature[i]: packed slot strings for edge i (uint64 slots, then floats).
  std::vector<std::vector<std::string>> feature;
};

// Unweighted edge blob that stores per-edge features.
// Storage layout: feature[i] is a vector of binary-packed strings for edge
// i; the first offsets[i] entries are uint64 (sparse-id) slots, and any
// entries after that are float slots.
class GraphEdgeBlobWithFeature : public GraphEdgeBlob {
 public:
  GraphEdgeBlobWithFeature() {}
  virtual ~GraphEdgeBlobWithFeature() {}
  // Appends an edge and opens an empty feature record for it
  // (defined in graph_edge.cc); the weight argument is ignored.
  virtual void add_edge(int64_t id, float weight);

  // === to adapt ====
  // Appends every uint64 id packed in slot `slot_idx` of edge `edge_idx`
  // to `feature_id`, mirroring `slot_idx` into `slot_id` once per id.
  // Returns the number of ids appended (0 if edge or slot is absent).
  virtual int get_feature_ids(int edge_idx,
                              int slot_idx,
                              std::vector<uint64_t> &feature_id,  // NOLINT
                              std::vector<uint8_t> &slot_id) const {  // NOLINT
    errno = 0;
    size_t num = 0;
    if (edge_idx < static_cast<int>(this->feature.size())) {
      // offsets[edge_idx] == number of uint64 slots stored for this edge.
      int offset = this->offsets[edge_idx];
      if (slot_idx < offset) {
        const std::string &s = this->feature[edge_idx][slot_idx];
        const uint64_t *feas = reinterpret_cast<const uint64_t *>(s.c_str());
        num = s.length() / sizeof(uint64_t);
        CHECK((s.length() % sizeof(uint64_t)) == 0)
            << "bad feature_item: [" << s << "]";
        for (size_t i = 0; i < num; ++i) {
          feature_id.push_back(feas[i]);
          slot_id.push_back(slot_idx);
        }
      }
    }
    PADDLE_ENFORCE_EQ(
        errno,
        0,
        paddle::platform::errors::InvalidArgument(
            "get_feature_ids get errno should be 0, but got %d.", errno));
    return static_cast<int>(num);
  }

  // Appends every float packed in float slot `slot_idx` of edge `edge_idx`
  // to `float_feature`, mirroring `slot_idx` into `slot_id` once per value.
  // Float slots are stored after the uint64 slots, hence `offset + slot_idx`.
  virtual int get_float_feature(int edge_idx,
                                int slot_idx,
                                std::vector<float> &float_feature,  // NOLINT
                                std::vector<uint8_t> &slot_id) const {  // NOLINT
    errno = 0;
    size_t num = 0;
    if (edge_idx < static_cast<int>(this->feature.size())) {
      int offset = this->offsets[edge_idx];
      if (offset + slot_idx <
          static_cast<int>(this->feature[edge_idx].size())) {
        const std::string &s = this->feature[edge_idx][offset + slot_idx];
        const float *feas = reinterpret_cast<const float *>(s.c_str());
        num = s.length() / sizeof(float);
        CHECK((s.length() % sizeof(float)) == 0)
            << "bad feature_item: [" << s << "]";
        for (size_t i = 0; i < num; ++i) {
          float_feature.push_back(feas[i]);
          slot_id.push_back(slot_idx);
        }
      }
    }
    // Fixed copy-paste: message previously named get_feature_ids.
    PADDLE_ENFORCE_EQ(
        errno,
        0,
        paddle::platform::errors::InvalidArgument(
            "get_float_feature get errno should be 0, but got %d.", errno));
    return static_cast<int>(num);
  }

  // Returns a writable string for uint64 slot `idx` of the most recently
  // added edge, growing the slot vector and the uint64-slot count
  // (offsets.back()) as needed. Precondition: add_edge() was called.
  virtual std::string *mutable_feature(int idx) {
    if (idx >= static_cast<int>(this->feature.back().size())) {
      this->feature.back().resize(idx + 1);
    }
    if (idx + 1 > this->offsets.back()) this->offsets.back() = idx + 1;
    return &(this->feature.back()[idx]);
  }

  // Returns a writable string for float slot `idx` of the most recently
  // added edge. Float slots live after the uint64 slots, so offsets.back()
  // is deliberately left untouched.
  virtual std::string *mutable_float_feature(int idx) {
    int offset = this->offsets.back();
    if (offset + idx >= static_cast<int>(this->feature.back().size())) {
      this->feature.back().resize(offset + idx + 1);
    }
    return &(this->feature.back()[offset + idx]);
  }

  // Releases excess capacity once edge loading is finished.
  virtual void shrink_to_fit() {
    feature.shrink_to_fit();
    for (auto &edge : feature) {
      edge.shrink_to_fit();
      for (auto &slot : edge) {
        slot.shrink_to_fit();
      }
    }
  }
  // === to adapt ====

 protected:
  // offsets[i]: number of uint64 slots stored for edge i.
  std::vector<int> offsets;
  // feature[i]: packed slot strings for edge i (uint64 slots, then floats).
  std::vector<std::vector<std::string>> feature;
};



} // namespace distributed
} // namespace paddle
8 changes: 5 additions & 3 deletions paddle/fluid/distributed/ps/table/graph/graph_node.cc
Original file line number Diff line number Diff line change
Expand Up @@ -56,12 +56,14 @@ int FeatureNode::get_size(bool need_feature) {
return size;
}

void GraphNode::build_edges(bool is_weighted) {
void GraphNode::build_edges(bool is_weighted, bool has_edge_feature) {
if (edges == nullptr) {
if (is_weighted == true) {
edges = new WeightedGraphEdgeBlob();
if (!has_edge_feature) edges = new WeightedGraphEdgeBlob();
else edges = new WeightedGraphEdgeBlobWithFeature();
} else {
edges = new GraphEdgeBlob();
if (!has_edge_feature) edges = new GraphEdgeBlob();
else edges = new GraphEdgeBlobWithFeature();
}
}
}
Expand Down
Loading