From 6eee86db6bbfd6ef191a8d76f454242d464011a2 Mon Sep 17 00:00:00 2001 From: huwei02 <53012141+huwei02@users.noreply.github.com> Date: Mon, 15 May 2023 22:20:54 +0800 Subject: [PATCH] change arg of FillWalkBuf to be vector (#286) Co-authored-by: root --- paddle/fluid/framework/data_feed.cu | 206 ++++++++++-------- paddle/fluid/framework/data_feed.h | 16 +- .../fleet/heter_ps/graph_gpu_wrapper.cu | 19 +- .../fleet/heter_ps/graph_gpu_wrapper.h | 8 +- 4 files changed, 136 insertions(+), 113 deletions(-) diff --git a/paddle/fluid/framework/data_feed.cu b/paddle/fluid/framework/data_feed.cu index 132be7d4b5f034..b3f2769425821b 100644 --- a/paddle/fluid/framework/data_feed.cu +++ b/paddle/fluid/framework/data_feed.cu @@ -661,7 +661,7 @@ int GraphDataGenerator::FillIdShowClkTensor(int total_instance, } else { // infer uint64_t *d_type_keys = - reinterpret_cast(d_device_keys_[infer_cursor_]->ptr()); + reinterpret_cast(d_device_keys_[0][infer_cursor_]->ptr()); d_type_keys += infer_node_start_; infer_node_start_ += total_instance / 2; CopyDuplicateKeys<<(d_walk_ntype_[0]->ptr()); - } - - auto gpu_graph_ptr = GraphGpuWrapper::GetInstance(); - if (FLAGS_graph_metapath_split_opt) { - train_flag = FillWalkBufMultiPath( - h_device_keys_len_, - d_device_keys_, - gpu_graph_ptr->meta_path_, - conf_, - &epoch_finish_, - ©_unique_len_, - place_, - gpu_graph_ptr->first_node_type_, - &(gpu_graph_ptr->node_type_start_[conf_.gpuid]), - reinterpret_cast(d_walk_[0]->ptr()), - walk_ntype, - &d_uniq_node_num_, - reinterpret_cast(d_random_row_[0]->ptr()), - reinterpret_cast(d_random_row_col_shift_[0]->ptr()), - &host_vec_, - &total_row_[0], - &jump_rows_, - &shuffle_seed_, - reinterpret_cast(d_train_metapath_keys_->ptr()), - &h_train_metapath_keys_len_, - table_, - &buf_state_[0], - sample_stream_); - } else { - train_flag = FillWalkBuf(h_device_keys_len_, - d_device_keys_, - gpu_graph_ptr->meta_path_, + for (int tensor_pair_idx = 0; tensor_pair_idx < conf_.tensor_pair_num; + ++tensor_pair_idx) { + uint8_t *walk_ntype = NULL; + if (conf_.need_walk_ntype) { + walk_ntype = reinterpret_cast(d_walk_ntype_[tensor_pair_idx]->ptr()); + } + + auto gpu_graph_ptr = GraphGpuWrapper::GetInstance(); + if (FLAGS_graph_metapath_split_opt) { + train_flag = FillWalkBufMultiPath( + h_device_keys_len_[tensor_pair_idx], + d_device_keys_[tensor_pair_idx], + gpu_graph_ptr->meta_path_[tensor_pair_idx], conf_, &epoch_finish_, - ©_unique_len_, + ©_unique_len_[tensor_pair_idx], place_, - gpu_graph_ptr->first_node_type_, - &(gpu_graph_ptr->node_type_start_[conf_.gpuid]), - &(gpu_graph_ptr->finish_node_type_[conf_.gpuid]), - reinterpret_cast(d_walk_[0]->ptr()), + gpu_graph_ptr->first_node_type_[tensor_pair_idx], + &(gpu_graph_ptr->node_type_start_[tensor_pair_idx][conf_.gpuid]), + reinterpret_cast(d_walk_[tensor_pair_idx]->ptr()), walk_ntype, - &d_uniq_node_num_, - reinterpret_cast(d_random_row_[0]->ptr()), - reinterpret_cast(d_random_row_col_shift_[0]->ptr()), - &multi_node_sync_stat_, + &d_uniq_node_num_[tensor_pair_idx], + reinterpret_cast(d_random_row_[tensor_pair_idx]->ptr()), + reinterpret_cast(d_random_row_col_shift_[tensor_pair_idx]->ptr()), &host_vec_, - &total_row_[0], - &jump_rows_, - &shuffle_seed_, + &total_row_[tensor_pair_idx], + &jump_rows_[tensor_pair_idx], + &shuffle_seed_[tensor_pair_idx], + reinterpret_cast(d_train_metapath_keys_[tensor_pair_idx]->ptr()), + &h_train_metapath_keys_len_[tensor_pair_idx], table_, - &buf_state_[0], + &buf_state_[tensor_pair_idx], sample_stream_); + } else { + train_flag = 
FillWalkBuf(h_device_keys_len_[tensor_pair_idx], + d_device_keys_[tensor_pair_idx], + gpu_graph_ptr->meta_path_[tensor_pair_idx], + conf_, + &epoch_finish_, + ©_unique_len_[tensor_pair_idx], + place_, + gpu_graph_ptr->first_node_type_[tensor_pair_idx], + &(gpu_graph_ptr->node_type_start_[tensor_pair_idx][conf_.gpuid]), + &(gpu_graph_ptr->finish_node_type_[tensor_pair_idx][conf_.gpuid]), + reinterpret_cast(d_walk_[tensor_pair_idx]->ptr()), + walk_ntype, + &d_uniq_node_num_[tensor_pair_idx], + reinterpret_cast(d_random_row_[tensor_pair_idx]->ptr()), + reinterpret_cast(d_random_row_col_shift_[tensor_pair_idx]->ptr()), + &multi_node_sync_stat_, + &host_vec_, + &total_row_[tensor_pair_idx], + &jump_rows_[tensor_pair_idx], + &shuffle_seed_[tensor_pair_idx], + table_, + &buf_state_[tensor_pair_idx], + sample_stream_); + } } return train_flag; @@ -3430,9 +3433,9 @@ void GraphDataGenerator::DoSageForTrain() { cudaStreamSynchronize(sample_stream_); InsertTable(reinterpret_cast(final_sage_nodes->ptr()), uniq_instance, - &d_uniq_node_num_, + &d_uniq_node_num_[0], conf_, - ©_unique_len_, + ©_unique_len_[0], place_, table_, &host_vec_, @@ -3458,7 +3461,7 @@ void GraphDataGenerator::DoSageForInfer() { total_instance *= 2; while (total_instance != 0) { uint64_t *d_type_keys = reinterpret_cast( - d_device_keys_[infer_cursor_]->ptr()); + d_device_keys_[0][infer_cursor_]->ptr()); d_type_keys += infer_node_start_; infer_node_start_ += total_instance / 2; auto node_buf = memory::AllocShared( @@ -3496,9 +3499,9 @@ void GraphDataGenerator::DoSageForInfer() { cudaStreamSynchronize(sample_stream_); InsertTable(reinterpret_cast(final_sage_nodes->ptr()), uniq_instance, - &d_uniq_node_num_, + &d_uniq_node_num_[0], conf_, - ©_unique_len_, + ©_unique_len_[0], place_, table_, &host_vec_, @@ -3526,16 +3529,16 @@ int GraphDataGenerator::FillInferBuf() { gpu_graph_ptr->global_infer_node_type_start_[conf_.gpuid]; auto &infer_cursor = gpu_graph_ptr->infer_cursor_[conf_.thread_id]; total_row_[0] = 0; - if (infer_cursor < h_device_keys_len_.size()) { + if (infer_cursor < h_device_keys_len_[0].size()) { while (global_infer_node_type_start[infer_cursor] >= - h_device_keys_len_[infer_cursor]) { + h_device_keys_len_[0][infer_cursor]) { infer_cursor++; - if (infer_cursor >= h_device_keys_len_.size()) { + if (infer_cursor >= h_device_keys_len_[0].size()) { return 0; } } if (!infer_node_type_index_set_.empty()) { - while (infer_cursor < h_device_keys_len_.size()) { + while (infer_cursor < h_device_keys_len_[0].size()) { if (infer_node_type_index_set_.find(infer_cursor) == infer_node_type_index_set_.end()) { VLOG(2) << "Skip cursor[" << infer_cursor << "]"; @@ -3546,12 +3549,12 @@ int GraphDataGenerator::FillInferBuf() { break; } } - if (infer_cursor >= h_device_keys_len_.size()) { + if (infer_cursor >= h_device_keys_len_[0].size()) { return 0; } } - size_t device_key_size = h_device_keys_len_[infer_cursor]; + size_t device_key_size = h_device_keys_len_[0][infer_cursor]; total_row_[0] = (global_infer_node_type_start[infer_cursor] + conf_.buf_size <= device_key_size) @@ -3559,7 +3562,7 @@ int GraphDataGenerator::FillInferBuf() { : device_key_size - global_infer_node_type_start[infer_cursor]; uint64_t *d_type_keys = - reinterpret_cast(d_device_keys_[infer_cursor]->ptr()); + reinterpret_cast(d_device_keys_[0][infer_cursor]->ptr()); if (!conf_.sage_mode) { host_vec_.resize(total_row_[0]); cudaMemcpyAsync(host_vec_.data(), @@ -3583,12 +3586,15 @@ int GraphDataGenerator::FillInferBuf() { void GraphDataGenerator::ClearSampleState() { auto 
gpu_graph_ptr = GraphGpuWrapper::GetInstance(); - auto &finish_node_type = gpu_graph_ptr->finish_node_type_[conf_.gpuid]; - auto &node_type_start = gpu_graph_ptr->node_type_start_[conf_.gpuid]; - finish_node_type.clear(); - for (auto iter = node_type_start.begin(); iter != node_type_start.end(); - iter++) { - iter->second = 0; + for (int tensor_pair_idx = 0; tensor_pair_idx < conf_.tensor_pair_num; + ++tensor_pair_idx) { + auto &finish_node_type = gpu_graph_ptr->finish_node_type_[tensor_pair_idx][conf_.gpuid]; + auto &node_type_start = gpu_graph_ptr->node_type_start_[tensor_pair_idx][conf_.gpuid]; + finish_node_type.clear(); + for (auto iter = node_type_start.begin(); iter != node_type_start.end(); + iter++) { + iter->second = 0; + } } } @@ -3666,33 +3672,47 @@ void GraphDataGenerator::AllocResource( // stream_)); // } if (conf_.gpu_graph_training && FLAGS_graph_metapath_split_opt) { - d_train_metapath_keys_ = - gpu_graph_ptr->d_node_iter_graph_metapath_keys_[thread_id]; - h_train_metapath_keys_len_ = - gpu_graph_ptr->h_node_iter_graph_metapath_keys_len_[thread_id]; - VLOG(2) << "h train metapaths key len: " << h_train_metapath_keys_len_; + d_train_metapath_keys_.resize(conf_.tensor_pair_num); + h_train_metapath_keys_len_.resize(conf_.tensor_pair_num); + for (int tensor_pair_idx = 0; tensor_pair_idx < conf_.tensor_pair_num; + ++tensor_pair_idx) { + d_train_metapath_keys_[tensor_pair_idx] = + gpu_graph_ptr->d_node_iter_graph_metapath_keys_[thread_id]; + h_train_metapath_keys_len_[tensor_pair_idx] = + gpu_graph_ptr->h_node_iter_graph_metapath_keys_len_[thread_id]; + VLOG(2) << "h train metapaths key len: " << h_train_metapath_keys_len_[tensor_pair_idx]; + } } else { - auto &d_graph_all_type_keys = - gpu_graph_ptr->d_node_iter_graph_all_type_keys_; - auto &h_graph_all_type_keys_len = - gpu_graph_ptr->h_node_iter_graph_all_type_keys_len_; - - for (size_t i = 0; i < d_graph_all_type_keys.size(); i++) { - d_device_keys_.push_back(d_graph_all_type_keys[i][thread_id]); - h_device_keys_len_.push_back(h_graph_all_type_keys_len[i][thread_id]); + d_device_keys_.resize(conf_.tensor_pair_num); + h_device_keys_len_.resize(conf_.tensor_pair_num); + for (int tensor_pair_idx = 0; tensor_pair_idx < conf_.tensor_pair_num; + ++tensor_pair_idx) { + auto &d_graph_all_type_keys = + gpu_graph_ptr->d_node_iter_graph_all_type_keys_; + auto &h_graph_all_type_keys_len = + gpu_graph_ptr->h_node_iter_graph_all_type_keys_len_; + + for (size_t i = 0; i < d_graph_all_type_keys.size(); i++) { + d_device_keys_[tensor_pair_idx].push_back(d_graph_all_type_keys[i][thread_id]); + h_device_keys_len_[tensor_pair_idx].push_back(h_graph_all_type_keys_len[i][thread_id]); + } + VLOG(2) << "h_device_keys size: " << h_device_keys_len_[tensor_pair_idx].size(); } - VLOG(2) << "h_device_keys size: " << h_device_keys_len_.size(); } infer_cursor_ = 0; - jump_rows_ = 0; - d_uniq_node_num_ = memory::AllocShared( - place_, - sizeof(uint64_t), - phi::Stream(reinterpret_cast(sample_stream_))); - cudaMemsetAsync(d_uniq_node_num_->ptr(), 0, sizeof(uint64_t), sample_stream_); + jump_rows_.assign(conf_.tensor_pair_num, 0); + d_uniq_node_num_.resize(conf_.tensor_pair_num); + for (int tensor_pair_idx = 0; tensor_pair_idx < conf_.tensor_pair_num; + ++tensor_pair_idx) { + d_uniq_node_num_[tensor_pair_idx] = memory::AllocShared( + place_, + sizeof(uint64_t), + phi::Stream(reinterpret_cast(sample_stream_))); + cudaMemsetAsync(d_uniq_node_num_[tensor_pair_idx]->ptr(), 0, sizeof(uint64_t), sample_stream_); + } - total_row_.resize(conf_.tensor_pair_num); + 
total_row_.assign(conf_.tensor_pair_num, 0); d_walk_.resize(conf_.tensor_pair_num); for (int tensor_pair_idx = 0; tensor_pair_idx < conf_.tensor_pair_num; ++tensor_pair_idx) { @@ -3742,14 +3762,12 @@ void GraphDataGenerator::AllocResource( phi::Stream(reinterpret_cast(sample_stream_))); } - shuffle_seed_ = 0; - - ins_buf_pair_len_.resize(conf_.tensor_pair_num); + shuffle_seed_.assign(conf_.tensor_pair_num, 0); + ins_buf_pair_len_.assign(conf_.tensor_pair_num, 0); d_ins_buf_.resize(conf_.tensor_pair_num); d_pair_num_.resize(conf_.tensor_pair_num); for (int tensor_pair_idx = 0; tensor_pair_idx < conf_.tensor_pair_num; ++tensor_pair_idx) { - ins_buf_pair_len_[tensor_pair_idx] = 0; d_ins_buf_[tensor_pair_idx] = memory::AllocShared( place_, (conf_.batch_size * 2 * 2) * sizeof(uint64_t), @@ -3931,7 +3949,7 @@ void GraphDataGenerator::SetConfig( int sample_size = std::stoi(samples[i]); conf_.samples.emplace_back(sample_size); } - copy_unique_len_ = 0; + copy_unique_len_.assign(conf_.tensor_pair_num, 0); if (!conf_.gpu_graph_training) { infer_node_type_ = graph_config.infer_node_type(); diff --git a/paddle/fluid/framework/data_feed.h b/paddle/fluid/framework/data_feed.h index 7929df2a7f42d0..d18113c31866dc 100644 --- a/paddle/fluid/framework/data_feed.h +++ b/paddle/fluid/framework/data_feed.h @@ -1027,7 +1027,7 @@ class GraphDataGenerator { HashTable* table_; GraphDataGeneratorConfig conf_; size_t infer_cursor_; - size_t jump_rows_; + std::vector jump_rows_; int64_t* id_tensor_ptr_; int* index_tensor_ptr_; int64_t* show_tensor_ptr_; @@ -1041,8 +1041,8 @@ class GraphDataGenerator { std::vector feed_vec_; std::vector* feed_info_; // adapt for float feature std::vector offset_; - std::vector> d_device_keys_; - std::shared_ptr d_train_metapath_keys_; + std::vector>> d_device_keys_; + std::vector> d_train_metapath_keys_; std::vector> d_walk_; std::vector> d_walk_ntype_; @@ -1050,7 +1050,7 @@ class GraphDataGenerator { std::shared_ptr d_feature_; std::vector> d_random_row_; std::vector> d_random_row_col_shift_; - std::shared_ptr d_uniq_node_num_; + std::vector> d_uniq_node_num_; std::shared_ptr d_slot_feature_num_map_; std::shared_ptr d_actual_slot_id_map_; std::shared_ptr d_fea_offset_map_; @@ -1086,13 +1086,13 @@ class GraphDataGenerator { int uint_slot_num_ = 0; // uint slot num std::vector h_slot_feature_num_map_; int fea_num_per_node_; - int shuffle_seed_; + std::vector shuffle_seed_; bool epoch_finish_; int pass_end_ = 0; std::vector host_vec_; - std::vector h_device_keys_len_; - uint64_t h_train_metapath_keys_len_; - uint64_t copy_unique_len_; + std::vector> h_device_keys_len_; + std::vector h_train_metapath_keys_len_; + std::vector copy_unique_len_; std::vector total_row_; size_t infer_node_start_; size_t infer_node_end_; diff --git a/paddle/fluid/framework/fleet/heter_ps/graph_gpu_wrapper.cu b/paddle/fluid/framework/fleet/heter_ps/graph_gpu_wrapper.cu index fce156124b19bd..0fb7fa595a19a9 100644 --- a/paddle/fluid/framework/fleet/heter_ps/graph_gpu_wrapper.cu +++ b/paddle/fluid/framework/fleet/heter_ps/graph_gpu_wrapper.cu @@ -48,6 +48,7 @@ void GraphGpuWrapper::init_conf(const std::string &first_node_type, auto node_types = paddle::string::split_string(first_node_type, ";"); VLOG(2) << "node_types: " << first_node_type; + first_node_type_.resize(1); for (auto &type : node_types) { auto iter = node_to_id.find(type); PADDLE_ENFORCE_NE( @@ -55,9 +56,10 @@ void GraphGpuWrapper::init_conf(const std::string &first_node_type, node_to_id.end(), platform::errors::NotFound("(%s) is not found in 
node_to_id.", type)); VLOG(2) << "node_to_id[" << type << "] = " << iter->second; - first_node_type_.push_back(iter->second); + first_node_type_[0].push_back(iter->second); } - meta_path_.resize(first_node_type_.size()); + meta_path_.resize(1); + meta_path_[0].resize(first_node_type_[0].size()); auto meta_paths = paddle::string::split_string(meta_path, ";"); for (size_t i = 0; i < meta_paths.size(); i++) { @@ -70,7 +72,7 @@ void GraphGpuWrapper::init_conf(const std::string &first_node_type, platform::errors::NotFound( "(%s) is not found in edge_to_id.", edge)); VLOG(2) << "edge_to_id[" << edge << "] = " << iter->second; - meta_path_[i].push_back(iter->second); + meta_path_[0][i].push_back(iter->second); if (edge_to_node_map_.find(iter->second) == edge_to_node_map_.end()) { auto nodes = get_ntype_from_etype(edge); uint64_t src_node_id = node_to_id.find(nodes[0])->second; @@ -135,14 +137,17 @@ void GraphGpuWrapper::init_conf(const std::string &first_node_type, max_dev_id = device_id_mapping[i]; } } - finish_node_type_.resize(max_dev_id + 1); - node_type_start_.resize(max_dev_id + 1); + + finish_node_type_.resize(1); + node_type_start_.resize(1); + finish_node_type_[0].resize(max_dev_id + 1); + node_type_start_[0].resize(max_dev_id + 1); global_infer_node_type_start_.resize(max_dev_id + 1); for (size_t i = 0; i < device_id_mapping.size(); i++) { int dev_id = device_id_mapping[i]; - auto &node_type_start = node_type_start_[i]; + auto &node_type_start = node_type_start_[0][i]; auto &infer_node_type_start = global_infer_node_type_start_[i]; - auto &finish_node_type = finish_node_type_[i]; + auto &finish_node_type = finish_node_type_[0][i]; finish_node_type.clear(); for (size_t idx = 0; idx < node_to_id.size(); idx++) { diff --git a/paddle/fluid/framework/fleet/heter_ps/graph_gpu_wrapper.h b/paddle/fluid/framework/fleet/heter_ps/graph_gpu_wrapper.h index 1f9f87e43e6020..2a5b22f226d8a1 100644 --- a/paddle/fluid/framework/fleet/heter_ps/graph_gpu_wrapper.h +++ b/paddle/fluid/framework/fleet/heter_ps/graph_gpu_wrapper.h @@ -230,13 +230,13 @@ class GraphGpuWrapper { std::string feature_separator_ = std::string(" "); bool conf_initialized_ = false; bool type_keys_initialized_ = false; - std::vector first_node_type_; + std::vector> first_node_type_; std::vector excluded_train_pair_; std::vector pair_label_conf_; - std::vector> meta_path_; + std::vector>> meta_path_; - std::vector> finish_node_type_; - std::vector> node_type_start_; + std::vector>> finish_node_type_; + std::vector>> node_type_start_; std::vector cur_metapath_start_; std::vector> global_infer_node_type_start_; std::vector infer_cursor_;
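A few closing notes on the pattern this patch introduces, each with a small self-contained sketch. None of these sketches are the actual Paddle code; they only illustrate the shape of the change.

Per-tensor-pair state: the core of the patch is that the walk-building state in GraphDataGenerator (total_row_, jump_rows_, shuffle_seed_, copy_unique_len_, d_uniq_node_num_, d_device_keys_, ...) goes from one value per generator to one slot per tensor pair, and the FillWalkBuf / FillWalkBufMultiPath call is wrapped in a loop over conf_.tensor_pair_num. The sketch below mirrors that shape; the member names follow the patch, but the element types, TensorPairConfig, and the cut-down FillWalkBuf signature are assumptions made for the example.

// Sketch only: per-tensor-pair walk state and the fill loop (hypothetical types).
#include <cstdint>
#include <iostream>
#include <vector>

struct TensorPairConfig {   // stand-in for GraphDataGeneratorConfig
  int tensor_pair_num = 2;
};

// Stand-in for the real FillWalkBuf: fills one pair's slots and returns a train flag.
static int FillWalkBuf(int pair_idx, int* total_row, int* jump_rows,
                       uint64_t* copy_unique_len) {
  *total_row = 16 * (pair_idx + 1);  // pretend this many rows were sampled
  *jump_rows = 4;
  *copy_unique_len = 0;
  return 1;
}

class WalkState {
 public:
  explicit WalkState(const TensorPairConfig& conf) : conf_(conf) {
    // What used to be scalars becomes one zero-initialized slot per tensor pair.
    total_row_.assign(conf_.tensor_pair_num, 0);
    jump_rows_.assign(conf_.tensor_pair_num, 0);
    copy_unique_len_.assign(conf_.tensor_pair_num, 0);
  }

  int FillAllPairs() {
    int train_flag = 0;
    for (int pair = 0; pair < conf_.tensor_pair_num; ++pair) {
      // Mirrors FillWalkBuf(..., &total_row_[pair], &jump_rows_[pair], ...).
      train_flag = FillWalkBuf(pair, &total_row_[pair], &jump_rows_[pair],
                               &copy_unique_len_[pair]);
    }
    return train_flag;
  }

  const std::vector<int>& total_row() const { return total_row_; }

 private:
  TensorPairConfig conf_;
  std::vector<int> total_row_;
  std::vector<int> jump_rows_;
  std::vector<uint64_t> copy_unique_len_;
};

int main() {
  TensorPairConfig conf;
  WalkState state(conf);
  state.FillAllPairs();
  for (size_t i = 0; i < state.total_row().size(); ++i) {
    std::cout << "pair " << i << " rows: " << state.total_row()[i] << "\n";
  }
  return 0;
}

Note that the inference and SAGE paths in the patch still index slot 0 only (d_device_keys_[0], total_row_[0], d_uniq_node_num_[0], copy_unique_len_[0]), so only the training walk fill is truly multi-pair at this point.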
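Initialization detail: AllocResource and SetConfig switch several of these members from resize() plus per-element zeroing to assign(conf_.tensor_pair_num, 0) (total_row_, jump_rows_, shuffle_seed_, ins_buf_pair_len_, copy_unique_len_). The difference matters if a vector is ever re-initialized: assign overwrites every element, while resize keeps whatever the existing elements already hold. A tiny, Paddle-independent illustration:

#include <cassert>
#include <vector>

int main() {
  std::vector<int> v{7, 7};

  v.resize(3);     // keeps the existing 7s and appends one value-initialized 0
  assert(v[0] == 7 && v[1] == 7 && v[2] == 0);

  v.assign(3, 0);  // sizes to 3 AND overwrites every element with 0
  assert(v[0] == 0 && v[1] == 0 && v[2] == 0);
  return 0;
}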
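GraphGpuWrapper side: first_node_type_, meta_path_, node_type_start_ and finish_node_type_ each gain an outer dimension indexed by tensor pair. init_conf currently sizes that dimension to 1 and fills slot 0, and GraphDataGenerator reads e.g. node_type_start_[tensor_pair_idx][conf_.gpuid]. A minimal sketch of the layout; the element types here (int ids, std::map) are simplifications assumed for the example, not the real container types:

#include <iostream>
#include <map>
#include <set>
#include <vector>

int main() {
  const int num_devices = 2;

  // Outer index: tensor pair. init_conf only populates slot 0 for now.
  std::vector<std::vector<int>> first_node_type(1);
  std::vector<std::vector<std::vector<int>>> meta_path(1);
  std::vector<std::vector<std::set<int>>> finish_node_type(1);
  std::vector<std::vector<std::map<int, int>>> node_type_start(1);

  first_node_type[0] = {0, 1};               // e.g. ids parsed from "user;item"
  meta_path[0].resize(first_node_type[0].size());
  meta_path[0][0] = {2, 3};                  // edge ids for the first node type
  finish_node_type[0].resize(num_devices);
  node_type_start[0].resize(num_devices);

  // A consumer (one tensor pair on one GPU) reads its own slice.
  int tensor_pair_idx = 0, gpuid = 1;
  node_type_start[tensor_pair_idx][gpuid][first_node_type[tensor_pair_idx][0]] = 0;

  std::cout << "meta paths stored for pair 0, first node type: "
            << meta_path[0][0].size() << "\n";
  return 0;
}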
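Device-side counters: in AllocResource, d_uniq_node_num_ becomes a vector with one single-uint64_t allocation per tensor pair, each zeroed with cudaMemsetAsync on sample_stream_. The sketch below shows that allocate-and-reset pattern with plain cudaMalloc standing in for memory::AllocShared; that substitution is a simplification for the example, not what the framework actually calls:

#include <cuda_runtime.h>
#include <cstdint>
#include <vector>

int main() {
  const int tensor_pair_num = 2;
  cudaStream_t stream;
  cudaStreamCreate(&stream);

  // One device-side uint64_t counter per tensor pair, zeroed asynchronously.
  std::vector<uint64_t*> d_uniq_node_num(tensor_pair_num, nullptr);
  for (int pair = 0; pair < tensor_pair_num; ++pair) {
    cudaMalloc(reinterpret_cast<void**>(&d_uniq_node_num[pair]), sizeof(uint64_t));
    cudaMemsetAsync(d_uniq_node_num[pair], 0, sizeof(uint64_t), stream);
  }
  cudaStreamSynchronize(stream);

  for (int pair = 0; pair < tensor_pair_num; ++pair) {
    cudaFree(d_uniq_node_num[pair]);
  }
  cudaStreamDestroy(stream);
  return 0;
}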