Cherry-pick recent updates in graph-engine to release/2.3 #42027

Merged: 2 commits, Apr 20, 2022
180 changes: 157 additions & 23 deletions paddle/fluid/distributed/ps/table/common_graph_table.cc
@@ -28,7 +28,112 @@ namespace paddle {
namespace distributed {

#ifdef PADDLE_WITH_HETERPS
paddle::framework::GpuPsCommGraph GraphTable::make_gpu_ps_graph(
    std::vector<int64_t> ids) {
  // Bucket the requested ids by the shard-pool thread that owns them.
  std::vector<std::vector<int64_t>> bags(task_pool_size_);
  for (auto x : ids) {
    int location = x % shard_num % task_pool_size_;
    bags[location].push_back(x);
  }
  std::vector<std::future<int>> tasks;
  std::vector<int64_t> edge_array[task_pool_size_];
  std::vector<paddle::framework::GpuPsGraphNode> node_array[task_pool_size_];
  for (int i = 0; i < (int)bags.size(); i++) {
    if (bags[i].size() > 0) {
      tasks.push_back(_shards_task_pool[i]->enqueue([&, i, this]() -> int {
        paddle::framework::GpuPsGraphNode x;
        for (int j = 0; j < (int)bags[i].size(); j++) {
          Node *v = find_node(bags[i][j]);
          x.node_id = bags[i][j];
          if (v == NULL) {
            x.neighbor_size = 0;
            x.neighbor_offset = 0;
            node_array[i].push_back(x);
          } else {
            x.neighbor_size = v->get_neighbor_size();
            x.neighbor_offset = edge_array[i].size();
            node_array[i].push_back(x);
            for (int k = 0; k < x.neighbor_size; k++) {
              edge_array[i].push_back(v->get_neighbor_id(k));
            }
          }
        }
        return 0;
      }));
    }
  }
  for (int i = 0; i < (int)tasks.size(); i++) tasks[i].get();
  // Merge the per-thread node/edge arrays into one flat CSR-style graph.
  paddle::framework::GpuPsCommGraph res;
  int tot_len = 0;
  for (int i = 0; i < task_pool_size_; i++) {
    tot_len += (int)edge_array[i].size();
  }
  res.neighbor_size = tot_len;
  res.node_size = ids.size();
  res.neighbor_list = new int64_t[tot_len];
  res.node_list = new paddle::framework::GpuPsGraphNode[ids.size()];
  int offset = 0, ind = 0;
  for (int i = 0; i < task_pool_size_; i++) {
    for (int j = 0; j < (int)node_array[i].size(); j++) {
      res.node_list[ind] = node_array[i][j];
      res.node_list[ind++].neighbor_offset += offset;
    }
    for (int j = 0; j < (int)edge_array[i].size(); j++) {
      res.neighbor_list[offset + j] = edge_array[i][j];
    }
    offset += edge_array[i].size();
  }
  return res;
}
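The merged result is a CSR-style (compressed sparse row) layout: node_list[i].neighbor_offset indexes into neighbor_list, and neighbor_size ids follow contiguously. A minimal consumer-side sketch, assuming only the fields assigned above and the Paddle headers already included by this file (illustrative, not part of the PR):

#include <cstdio>

void dump_gpu_ps_graph(const paddle::framework::GpuPsCommGraph &g) {
  for (int i = 0; i < g.node_size; i++) {
    const auto &node = g.node_list[i];
    std::printf("node %lld:", (long long)node.node_id);
    for (int k = 0; k < node.neighbor_size; k++) {
      std::printf(" %lld",
                  (long long)g.neighbor_list[node.neighbor_offset + k]);
    }
    std::printf("\n");
  }
}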
int32_t GraphTable::add_node_to_ssd(int64_t src_id, char *data, int len) {
  // Key: the node id; value: the raw neighbor-id array (len int64 entries).
  if (_db != NULL)
    _db->put(src_id % shard_num % task_pool_size_, (char *)&src_id,
             sizeof(uint64_t), (char *)data, sizeof(int64_t) * len);
  return 0;
}
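A hedged usage sketch (names and values illustrative): the stored value is the raw neighbor-id array, so len counts int64_t elements, not bytes.

// Sketch: persist node 7's neighbors, assuming `table` is a GraphTable
// configured with search_level >= 2 so that _db is initialized.
std::vector<int64_t> nbrs = {101, 102, 103};
table.add_node_to_ssd(7, (char *)nbrs.data(), (int)nbrs.size());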
char *GraphTable::random_sample_neighbor_from_ssd(
    int64_t id, int sample_size, const std::shared_ptr<std::mt19937_64> rng,
    int &actual_size) {
  if (_db == NULL) {
    actual_size = 0;
    return NULL;
  }
  std::string str;
  if (_db->get(id % shard_num % task_pool_size_, (char *)&id, sizeof(uint64_t),
               str) == 0) {
    int64_t *data = ((int64_t *)str.c_str());
    int n = str.size() / sizeof(int64_t);
    // Partial Fisher-Yates shuffle: `m` records virtual swaps so neighbors
    // are sampled without replacement and the stored list is never modified.
    std::unordered_map<int, int> m;
    int sm_size = std::min(n, sample_size);
    actual_size = sm_size * Node::id_size;
    char *buff = new char[actual_size];
    for (int i = 0; i < sm_size; i++) {
      std::uniform_int_distribution<int> distrib(0, n - i - 1);
      int t = distrib(*rng);
      int pos = 0;
      auto iter = m.find(t);
      if (iter != m.end()) {
        pos = iter->second;
      } else {
        pos = t;
      }
      // The element at the last unsampled slot moves into slot t.
      auto iter2 = m.find(n - i - 1);
      int key2 = iter2 == m.end() ? n - i - 1 : iter2->second;
      m[t] = key2;
      m.erase(n - i - 1);
      memcpy(buff + i * Node::id_size, &data[pos], Node::id_size);
    }
    return buff;
  }
  actual_size = 0;
  return NULL;
}
#endif
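The unordered_map above is a standard way to sample k of n items without replacement in O(k) expected time and space: each draw comes from a shrinking range, and the map records virtual swaps so no length-n array is ever built or shuffled. The same technique in isolation, independent of the Paddle types (a sketch, not part of the PR):

#include <algorithm>
#include <random>
#include <unordered_map>
#include <vector>

// Sample k distinct indices from [0, n) in O(k) expected time and space.
std::vector<int> sample_without_replacement(int n, int k,
                                            std::mt19937_64 &rng) {
  std::unordered_map<int, int> remap;  // records virtual Fisher-Yates swaps
  std::vector<int> out;
  k = std::min(n, k);
  out.reserve(k);
  for (int i = 0; i < k; i++) {
    std::uniform_int_distribution<int> dist(0, n - i - 1);
    int t = dist(rng);
    auto hit = remap.find(t);
    out.push_back(hit == remap.end() ? t : hit->second);
    // Whatever occupies the last slot of the shrinking range moves to t.
    auto last = remap.find(n - i - 1);
    remap[t] = (last == remap.end()) ? n - i - 1 : last->second;
    remap.erase(n - i - 1);
  }
  return out;
}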
/*
int CompleteGraphSampler::run_graph_sampling() {
  pthread_rwlock_t *rw_lock = graph_table->rw_lock.get();
  pthread_rwlock_rdlock(rw_lock);
@@ -136,7 +241,8 @@ int BasicBfsGraphSampler::run_graph_sampling() {
  int task_size = 0;
  std::vector<std::future<int>> tasks;
  int init_size = 0;
  std::function<int(int, int64_t)> bfs = [&, this](int i, int64_t id) -> int {
  //__sync_fetch_and_add
  std::function<int(int, int64_t)> bfs = [&, this](int i, int id) -> int {
    if (this->status == GraphSamplerStatus::terminating) {
      int task_left = __sync_sub_and_fetch(&task_size, 1);
      if (task_left == 0) {
@@ -289,6 +395,7 @@ int BasicBfsGraphSampler::run_graph_sampling() {
std::this_thread::sleep_for(std::chrono::seconds(1));
}
}
VLOG(0) << "bfs returning";
}
return 0;
}
@@ -304,7 +411,7 @@ void BasicBfsGraphSampler::init(size_t gpu_num, GraphTable *graph_table,
}

#endif

*/
std::vector<Node *> GraphShard::get_batch(int start, int end, int step) {
  if (start < 0) start = 0;
  std::vector<Node *> res;
@@ -316,6 +423,18 @@ std::vector<Node *> GraphShard::get_batch(int start, int end, int step) {

size_t GraphShard::get_size() { return bucket.size(); }

int32_t GraphTable::add_comm_edge(int64_t src_id, int64_t dst_id) {
  size_t src_shard_id = src_id % shard_num;

  if (src_shard_id >= shard_end || src_shard_id < shard_start) {
    return -1;
  }
  size_t index = src_shard_id - shard_start;
  VLOG(0) << "index add edge " << src_id << " " << dst_id;
  shards[index]->add_graph_node(src_id)->build_edges(false);
  shards[index]->add_neighbor(src_id, dst_id, 1.0);
  return 0;
}
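For concreteness (numbers hypothetical): with shard_num = 127 and a server owning shards [64, 127), src_id = 200 maps to shard 200 % 127 = 73, which is in range, so the edge lands in shards[73 - 64]; an id mapping to shard 5 would make the call return -1 instead.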
int32_t GraphTable::add_graph_node(std::vector<int64_t> &id_list,
                                   std::vector<bool> &is_weight_list) {
  size_t node_size = id_list.size();
@@ -554,9 +673,9 @@ int32_t GraphTable::load_nodes(const std::string &path, std::string node_type) {
}

int32_t GraphTable::load_edges(const std::string &path, bool reverse_edge) {
#ifdef PADDLE_WITH_HETERPS
  if (gpups_mode) pthread_rwlock_rdlock(rw_lock.get());
#endif
  // #ifdef PADDLE_WITH_HETERPS
  // if (gpups_mode) pthread_rwlock_rdlock(rw_lock.get());
  // #endif
  auto paths = paddle::string::split_string<std::string>(path, ";");
  int64_t count = 0;
  std::string sample_type = "random";
@@ -633,9 +752,9 @@ int32_t GraphTable::load_edges(const std::string &path, bool reverse_edge) {
relocate the duplicate nodes to make them distributed evenly among threads.
*/
  if (!use_duplicate_nodes) {
#ifdef PADDLE_WITH_HETERPS
    if (gpups_mode) pthread_rwlock_unlock(rw_lock.get());
#endif
    // #ifdef PADDLE_WITH_HETERPS
    // if (gpups_mode) pthread_rwlock_unlock(rw_lock.get());
    // #endif

    return 0;
  }
@@ -712,9 +831,9 @@ int32_t GraphTable::load_edges(const std::string &path, bool reverse_edge) {
    delete extra_shards[i];
    extra_shards[i] = extra_shards_copy[i];
  }
#ifdef PADDLE_WITH_HETERPS
  if (gpups_mode) pthread_rwlock_unlock(rw_lock.get());
#endif
  // #ifdef PADDLE_WITH_HETERPS
  // if (gpups_mode) pthread_rwlock_unlock(rw_lock.get());
  // #endif
  return 0;
}

@@ -878,6 +997,17 @@ int32_t GraphTable::random_sample_neighbors(
      idx = seq_id[i][k];
      int &actual_size = actual_sizes[idx];
      if (node == nullptr) {
#ifdef PADDLE_WITH_HETERPS
        // In-memory lookup missed; at search_level 2, fall back to the
        // SSD-backed neighbor store.
        if (search_level == 2) {
          char *buffer_addr = random_sample_neighbor_from_ssd(
              node_id, sample_size, rng, actual_size);
          if (actual_size != 0) {
            std::shared_ptr<char> &buffer = buffers[idx];
            buffer.reset(buffer_addr, char_del);
          }
          continue;
        }
#endif
        actual_size = 0;
        continue;
      }
@@ -1085,25 +1215,29 @@ int32_t GraphTable::Initialize(const TableParameter &config,
return Initialize(graph);
}
int32_t GraphTable::Initialize(const GraphParameter &graph) {
  task_pool_size_ = graph.task_pool_size();
#ifdef PADDLE_WITH_HETERPS
  if (graph.gpups_mode()) {
    gpups_mode = true;
    auto *sampler =
        CREATE_PSCORE_CLASS(GraphSampler, graph.gpups_graph_sample_class());
    auto slices =
        string::split_string<std::string>(graph.gpups_graph_sample_args(), ",");
    std::cout << "slices" << std::endl;
    for (auto x : slices) std::cout << x << std::endl;
    sampler->init(graph.gpu_num(), this, slices);
    graph_sampler.reset(sampler);
  }
  _db = NULL;
  search_level = graph.search_level();
  if (search_level >= 2) {
    // search_level >= 2 enables the SSD fallback: neighbor lists are kept in
    // a RocksDB instance shared across the task pool.
    _db = paddle::distributed::RocksDBHandler::GetInstance();
    _db->initialize("./temp_gpups_db", task_pool_size_);
  }
  // gpups_mode = true;
  // auto *sampler =
  //     CREATE_PSCORE_CLASS(GraphSampler, graph.gpups_graph_sample_class());
  // auto slices =
  //     string::split_string<std::string>(graph.gpups_graph_sample_args(), ",");
  // std::cout << "slices" << std::endl;
  // for (auto x : slices) std::cout << x << std::endl;
  // sampler->init(graph.gpu_num(), this, slices);
  // graph_sampler.reset(sampler);
#endif
  if (shard_num == 0) {
    server_num = 1;
    _shard_idx = 0;
    shard_num = graph.shard_num();
  }
  task_pool_size_ = graph.task_pool_size();
  use_cache = graph.use_cache();
  if (use_cache) {
    cache_size_limit = graph.cache_size_limit();
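If GraphParameter exposes the usual protobuf-generated setters (an assumption; only the getters above appear in this diff), enabling the SSD-backed path might look like:

// Illustrative configuration sketch, not taken from the PR. Assumes
// set_search_level()/set_task_pool_size() exist to match the getters
// used in Initialize().
paddle::distributed::GraphParameter param;
param.set_search_level(2);     // >= 2 turns on the RocksDB-backed storage
param.set_task_pool_size(24);  // also sizes the RocksDB handler
table.Initialize(param);       // table is a GraphTable instance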
54 changes: 33 additions & 21 deletions paddle/fluid/distributed/ps/table/common_graph_table.h
@@ -38,6 +38,7 @@
#include <vector>
#include "paddle/fluid/distributed/ps/table/accessor.h"
#include "paddle/fluid/distributed/ps/table/common_table.h"
#include "paddle/fluid/distributed/ps/table/depends/rocksdb_warpper.h"
#include "paddle/fluid/distributed/ps/table/graph/class_macro.h"
#include "paddle/fluid/distributed/ps/table/graph/graph_node.h"
#include "paddle/fluid/string/string_helper.h"
@@ -351,6 +352,7 @@ class ScaledLRU {
friend class RandomSampleLRU<K, V>;
};

/*
#ifdef PADDLE_WITH_HETERPS
enum GraphSamplerStatus { waiting = 0, running = 1, terminating = 2 };
class GraphTable;
@@ -363,6 +365,9 @@ class GraphSampler {
return;
};
}
  virtual int loadData(const std::string &path) { return 0; }
virtual int run_graph_sampling() = 0;
virtual int start_graph_sampling() {
if (status != GraphSamplerStatus::waiting) {
@@ -403,15 +408,13 @@ class GraphSampler {
std::vector<paddle::framework::GpuPsCommGraph> sample_res;
};
#endif
*/

class GraphTable : public Table {
 public:
  GraphTable() {
    use_cache = false;
    shard_num = 0;
#ifdef PADDLE_WITH_HETERPS
    gpups_mode = false;
#endif
    rw_lock.reset(new pthread_rwlock_t());
  }
  virtual ~GraphTable();
@@ -516,21 +519,28 @@ class GraphTable : public Table {
    return 0;
  }
#ifdef PADDLE_WITH_HETERPS
  virtual int32_t start_graph_sampling() {
    return this->graph_sampler->start_graph_sampling();
  }
  virtual int32_t end_graph_sampling() {
    return this->graph_sampler->end_graph_sampling();
  }
  virtual int32_t set_graph_sample_callback(
      std::function<void(std::vector<paddle::framework::GpuPsCommGraph> &)>
          callback) {
    graph_sampler->set_graph_sample_callback(callback);
    return 0;
  }
  // virtual GraphSampler *get_graph_sampler() { return graph_sampler.get(); }
  // virtual int32_t start_graph_sampling() {
  //   return this->graph_sampler->start_graph_sampling();
  // }
  // virtual int32_t end_graph_sampling() {
  //   return this->graph_sampler->end_graph_sampling();
  // }
  // virtual int32_t set_graph_sample_callback(
  //     std::function<void(std::vector<paddle::framework::GpuPsCommGraph> &)>
  //         callback) {
  //   graph_sampler->set_graph_sample_callback(callback);
  //   return 0;
  // }
  virtual char *random_sample_neighbor_from_ssd(
      int64_t id, int sample_size, const std::shared_ptr<std::mt19937_64> rng,
      int &actual_size);
  virtual int32_t add_node_to_ssd(int64_t id, char *data, int len);
  virtual paddle::framework::GpuPsCommGraph make_gpu_ps_graph(
      std::vector<int64_t> ids);
  // virtual GraphSampler *get_graph_sampler() { return graph_sampler.get(); }
  int search_level;
#endif
 protected:
  virtual int32_t add_comm_edge(int64_t src_id, int64_t dst_id);
  std::vector<GraphShard *> shards, extra_shards;
  size_t shard_start, shard_end, server_num, shard_num_per_server, shard_num;
  int task_pool_size_ = 24;
@@ -555,13 +565,14 @@
  std::shared_ptr<pthread_rwlock_t> rw_lock;
#ifdef PADDLE_WITH_HETERPS
  // paddle::framework::GpuPsGraphTable gpu_graph_table;
  bool gpups_mode;
  // std::shared_ptr<::ThreadPool> graph_sample_pool;
  std::shared_ptr<GraphSampler> graph_sampler;
  REGISTER_GRAPH_FRIEND_CLASS(2, CompleteGraphSampler, BasicBfsGraphSampler)
  paddle::distributed::RocksDBHandler *_db;
  // std::shared_ptr<::ThreadPool> graph_sample_pool;
  // std::shared_ptr<GraphSampler> graph_sampler;
  // REGISTER_GRAPH_FRIEND_CLASS(2, CompleteGraphSampler, BasicBfsGraphSampler)
#endif
};

/*
#ifdef PADDLE_WITH_HETERPS
REGISTER_PSCORE_REGISTERER(GraphSampler);
class CompleteGraphSampler : public GraphSampler {
@@ -603,6 +614,7 @@ class BasicBfsGraphSampler : public GraphSampler {
sample_neighbors_map;
};
#endif
*/
} // namespace distributed

}; // namespace paddle
paddle/fluid/distributed/ps/table/depends/rocksdb_warpper.h
@@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#pragma once
#ifdef PADDLE_WITH_HETERPS
#include <glog/logging.h>
#include <rocksdb/db.h>
4 changes: 0 additions & 4 deletions paddle/fluid/distributed/ps/table/table.cc
@@ -31,10 +31,6 @@ namespace paddle {
namespace distributed {
REGISTER_PSCORE_CLASS(Table, GraphTable);
REGISTER_PSCORE_CLASS(Table, MemoryDenseTable);
#ifdef PADDLE_WITH_HETERPS
REGISTER_PSCORE_CLASS(GraphSampler, CompleteGraphSampler);
REGISTER_PSCORE_CLASS(GraphSampler, BasicBfsGraphSampler);
#endif
REGISTER_PSCORE_CLASS(Table, BarrierTable);
REGISTER_PSCORE_CLASS(Table, TensorTable);
REGISTER_PSCORE_CLASS(Table, DenseTensorTable);
2 changes: 1 addition & 1 deletion paddle/fluid/distributed/test/CMakeLists.txt
@@ -25,7 +25,7 @@ set_source_files_properties(graph_node_split_test.cc PROPERTIES COMPILE_FLAGS ${
cc_test(graph_node_split_test SRCS graph_node_split_test.cc DEPS graph_py_service scope server client communicator ps_service boost table ps_framework_proto ${COMMON_DEPS})

set_source_files_properties(graph_table_sample_test.cc PROPERTIES COMPILE_FLAGS ${DISTRIBUTE_COMPILE_FLAGS})
cc_test(graph_table_sample_test SRCS graph_table_sample_test.cc DEPS scope server communicator ps_service boost table ps_framework_proto ${COMMON_DEPS})
cc_test(graph_table_sample_test SRCS graph_table_sample_test.cc DEPS table ps_framework_proto ${COMMON_DEPS})

set_source_files_properties(feature_value_test.cc PROPERTIES COMPILE_FLAGS ${DISTRIBUTE_COMPILE_FLAGS})
cc_test(feature_value_test SRCS feature_value_test.cc DEPS ${COMMON_DEPS} boost table)