[GraphBolt][CUDA] Overlap original edge ids fetch. (#7714)
mfbalin authored Aug 18, 2024
1 parent b1e3943 commit fc29d0e
Showing 7 changed files with 231 additions and 91 deletions.
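The headline change in the diffs below is a new `has_original_edge_ids` flag on `GpuGraphCache::Create` and the constructor: when the cached graph already carries original edge ids, the cache skips tracking its own edge-id offsets (`offset_`). A minimal call-site sketch; only the argument list is taken from this diff, while the namespace, include path, capacities, and dtypes are illustrative assumptions:

```cpp
#include <torch/torch.h>

// Path as it appears in this commit; adjust for your include directories.
#include "graphbolt/src/cuda/extension/gpu_graph_cache.h"

// Hypothetical call site: the capacities and dtypes below are made up.
c10::intrusive_ptr<graphbolt::cuda::GpuGraphCache> MakeCache() {
  return graphbolt::cuda::GpuGraphCache::Create(
      /*num_edges=*/1'000'000,           // edge capacity of the cache
      /*threshold=*/3,                   // miss count before a node is admitted
      /*indptr_dtype=*/torch::kInt64,    // node id / indptr dtype
      /*dtypes=*/{torch::kInt64},        // dtypes[0]: the indices edge tensor
      /*has_original_edge_ids=*/false);  // false => the cache allocates offset_
                                         // and appends edge ids in Replace()
}
```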
101 changes: 64 additions & 37 deletions graphbolt/src/cuda/extension/gpu_graph_cache.cu
@@ -115,14 +115,16 @@ constexpr int kIntBlockSize = 512;

c10::intrusive_ptr<GpuGraphCache> GpuGraphCache::Create(
const int64_t num_edges, const int64_t threshold,
- torch::ScalarType indptr_dtype, std::vector<torch::ScalarType> dtypes) {
+ torch::ScalarType indptr_dtype, std::vector<torch::ScalarType> dtypes,
+ bool has_original_edge_ids) {
return c10::make_intrusive<GpuGraphCache>(
- num_edges, threshold, indptr_dtype, dtypes);
+ num_edges, threshold, indptr_dtype, dtypes, has_original_edge_ids);
}

GpuGraphCache::GpuGraphCache(
const int64_t num_edges, const int64_t threshold,
- torch::ScalarType indptr_dtype, std::vector<torch::ScalarType> dtypes) {
+ torch::ScalarType indptr_dtype, std::vector<torch::ScalarType> dtypes,
+ bool has_original_edge_ids) {
const int64_t initial_node_capacity = 1024;
AT_DISPATCH_INDEX_TYPES(
dtypes.at(0), "GpuGraphCache::GpuGraphCache", ([&] {
@@ -149,7 +151,9 @@ GpuGraphCache::GpuGraphCache(
num_edges_ = 0;
indptr_ =
torch::zeros(initial_node_capacity + 1, options.dtype(indptr_dtype));
- offset_ = torch::empty(indptr_.size(0) - 1, indptr_.options());
+ if (!has_original_edge_ids) {
+ offset_ = torch::empty(indptr_.size(0) - 1, indptr_.options());
+ }
for (auto dtype : dtypes) {
cached_edge_tensors_.push_back(
torch::empty(num_edges, options.dtype(dtype)));
@@ -249,8 +253,9 @@ std::tuple<torch::Tensor, std::vector<torch::Tensor>> GpuGraphCache::Replace(
torch::Tensor seeds, torch::Tensor indices, torch::Tensor positions,
int64_t num_hit, int64_t num_threshold, torch::Tensor indptr,
std::vector<torch::Tensor> edge_tensors) {
+ const auto with_edge_ids = offset_.has_value();
// The last element of edge_tensors has the edge ids.
- const auto num_tensors = edge_tensors.size() - 1;
+ const auto num_tensors = edge_tensors.size() - with_edge_ids;
TORCH_CHECK(
num_tensors == cached_edge_tensors_.size(),
"Same number of tensors need to be passed!");
@@ -312,21 +317,28 @@ std::tuple<torch::Tensor, std::vector<torch::Tensor>> GpuGraphCache::Replace(
auto input = allocator.AllocateStorage<std::byte*>(num_buffers);
auto input_size =
allocator.AllocateStorage<size_t>(num_buffers + 1);
- auto edge_id_offsets = torch::empty(
- num_nodes, seeds.options().dtype(offset_.scalar_type()));
+ torch::optional<torch::Tensor> edge_id_offsets;
+ if (with_edge_ids) {
+ edge_id_offsets = torch::empty(
+ num_nodes,
+ seeds.options().dtype(offset_.value().scalar_type()));
+ }
const auto cache_missing_dtype_dev_ptr =
cache_missing_dtype_dev.get();
const auto indices_ptr = indices.data_ptr<indices_t>();
const auto positions_ptr = positions.data_ptr<indices_t>();
const auto input_ptr = input.get();
const auto input_size_ptr = input_size.get();
const auto edge_id_offsets_ptr =
- edge_id_offsets.data_ptr<indptr_t>();
+ edge_id_offsets ? edge_id_offsets->data_ptr<indptr_t>()
+ : nullptr;
const auto cache_indptr = indptr_.data_ptr<indptr_t>();
const auto missing_indptr = indptr.data_ptr<indptr_t>();
- const auto cache_offset = offset_.data_ptr<indptr_t>();
+ const auto cache_offset =
+ offset_ ? offset_->data_ptr<indptr_t>() : nullptr;
const auto missing_edge_ids =
- edge_tensors.back().data_ptr<indptr_t>();
+ edge_id_offsets ? edge_tensors.back().data_ptr<indptr_t>()
+ : nullptr;
CUB_CALL(DeviceFor::Bulk, num_buffers, [=] __device__(int64_t i) {
const auto tensor_idx = i / num_nodes;
const auto idx = i % num_nodes;
@@ -340,14 +352,14 @@ std::tuple<torch::Tensor, std::vector<torch::Tensor>> GpuGraphCache::Replace(
const auto offset_end = is_cached
? cache_indptr[pos + 1]
: missing_indptr[idx - num_hit + 1];
- const auto edge_id =
- is_cached ? cache_offset[pos] : missing_edge_ids[offset];
const auto out_idx = tensor_idx * num_nodes + original_idx;

input_ptr[out_idx] =
(is_cached ? cache_ptr : missing_ptr) + offset * size;
input_size_ptr[out_idx] = size * (offset_end - offset);
- if (i < num_nodes) {
+ if (edge_id_offsets_ptr && i < num_nodes) {
+ const auto edge_id =
+ is_cached ? cache_offset[pos] : missing_edge_ids[offset];
edge_id_offsets_ptr[out_idx] = edge_id;
}
});
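The hunk above folds the optional edge-id write into the same bulk device loop rather than a second pass, guarding it behind a possibly-null pointer. A self-contained Thrust sketch of that guarded-write pattern, with illustrative names that are not GraphBolt APIs:

```cpp
#include <cstdint>
#include <thrust/device_ptr.h>
#include <thrust/device_vector.h>
#include <thrust/execution_policy.h>
#include <thrust/for_each.h>
#include <thrust/iterator/counting_iterator.h>

// The optional output is a possibly-null device pointer; the per-index functor
// writes through it only when the caller actually asked for that output.
struct GuardedWrite {
  const int* in;
  int* maybe_out;  // nullptr when the edge-id-style output is not requested

  __device__ void operator()(int64_t i) const {
    if (maybe_out) maybe_out[i] = in[i] * 2;  // skip the write, not the kernel
  }
};

int main() {
  thrust::device_vector<int> in(8, 1);
  thrust::device_vector<int> out(8, 0);
  // Pass nullptr instead of out's pointer to disable the optional output.
  GuardedWrite op{thrust::raw_pointer_cast(in.data()),
                  thrust::raw_pointer_cast(out.data())};
  thrust::for_each_n(
      thrust::device, thrust::counting_iterator<int64_t>(0), in.size(), op);
}
```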
@@ -390,10 +402,12 @@ std::tuple<torch::Tensor, std::vector<torch::Tensor>> GpuGraphCache::Replace(
indptr_.size(0) * kIntGrowthFactor, indptr_.options());
new_indptr.slice(0, 0, indptr_.size(0)) = indptr_;
indptr_ = new_indptr;
- auto new_offset =
- torch::empty(indptr_.size(0) - 1, offset_.options());
- new_offset.slice(0, 0, offset_.size(0)) = offset_;
- offset_ = new_offset;
+ if (offset_) {
+ auto new_offset =
+ torch::empty(indptr_.size(0) - 1, offset_->options());
+ new_offset.slice(0, 0, offset_->size(0)) = *offset_;
+ offset_ = new_offset;
+ }
}
torch::Tensor sindptr;
bool enough_space;
@@ -415,22 +429,32 @@ std::tuple<torch::Tensor, std::vector<torch::Tensor>> GpuGraphCache::Replace(
}
if (enough_space) {
auto num_edges = num_edges_;
- auto transform_input_it = thrust::make_zip_iterator(
- sindptr.data_ptr<indptr_t>() + 1,
- sliced_indptr.data_ptr<indptr_t>());
- auto transform_output_it = thrust::make_zip_iterator(
- indptr_.data_ptr<indptr_t>() + num_nodes_ + 1,
- offset_.data_ptr<indptr_t>() + num_nodes_);
- THRUST_CALL(
- transform, transform_input_it,
- transform_input_it + sindptr.size(0) - 1,
- transform_output_it,
- [=] __host__ __device__(
- const thrust::tuple<indptr_t, indptr_t>& x) {
- return thrust::make_tuple(
- thrust::get<0>(x) + num_edges,
- missing_edge_ids[thrust::get<1>(x)]);
- });
+ if (offset_) {
+ auto transform_input_it = thrust::make_zip_iterator(
+ sindptr.data_ptr<indptr_t>() + 1,
+ sliced_indptr.data_ptr<indptr_t>());
+ auto transform_output_it = thrust::make_zip_iterator(
+ indptr_.data_ptr<indptr_t>() + num_nodes_ + 1,
+ offset_->data_ptr<indptr_t>() + num_nodes_);
+ THRUST_CALL(
+ transform, transform_input_it,
+ transform_input_it + sindptr.size(0) - 1,
+ transform_output_it,
+ [=] __host__ __device__(
+ const thrust::tuple<indptr_t, indptr_t>& x) {
+ return thrust::make_tuple(
+ thrust::get<0>(x) + num_edges,
+ missing_edge_ids[thrust::get<1>(x)]);
+ });
+ } else {
+ THRUST_CALL(
+ transform, sindptr.data_ptr<indptr_t>() + 1,
+ sindptr.data_ptr<indptr_t>() + sindptr.size(0),
+ indptr_.data_ptr<indptr_t>() + num_nodes_ + 1,
+ [=] __host__ __device__(const indptr_t& x) {
+ return x + num_edges;
+ });
+ }
auto map = reinterpret_cast<map_t<indices_t>*>(map_);
const dim3 block(kIntBlockSize);
const dim3 grid(
@@ -467,10 +491,13 @@ std::tuple<torch::Tensor, std::vector<torch::Tensor>> GpuGraphCache::Replace(
.view(edge_tensors[i].scalar_type())
.slice(0, 0, static_cast<indptr_t>(output_size)));
}
- // Append the edge ids as the last element of the output.
- output_edge_tensors.push_back(ops::IndptrEdgeIdsImpl(
- output_indptr, output_indptr.scalar_type(), edge_id_offsets,
- static_cast<int64_t>(static_cast<indptr_t>(output_size))));
+ if (edge_id_offsets) {
+ // Append the edge ids as the last element of the output.
+ output_edge_tensors.push_back(ops::IndptrEdgeIdsImpl(
+ output_indptr, output_indptr.scalar_type(),
+ *edge_id_offsets,
+ static_cast<int64_t>(static_cast<indptr_t>(output_size))));
+ }

{
thrust::counting_iterator<int64_t> iota{0};
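Taking stock of the `Replace` changes in this file: when `offset_` is present, a single zip-iterator transform both shifts the appended indptr entries by the number of edges already cached and gathers each new node's original-graph offset; otherwise only the shift runs. A self-contained Thrust sketch of that fused transform, using illustrative names and data rather than the real `THRUST_CALL` macro:

```cpp
#include <thrust/device_ptr.h>
#include <thrust/device_vector.h>
#include <thrust/execution_policy.h>
#include <thrust/iterator/zip_iterator.h>
#include <thrust/sequence.h>
#include <thrust/transform.h>
#include <thrust/tuple.h>

// Illustrative only; the real code uses indptr_t pointers from torch tensors.
struct ShiftAndGather {
  const long* missing_edge_ids;  // edge ids of the freshly fetched subgraph
  long num_edges;                // edges already resident in the cache

  __host__ __device__ thrust::tuple<long, long> operator()(
      const thrust::tuple<long, long>& x) const {
    return thrust::make_tuple(
        thrust::get<0>(x) + num_edges,         // shifted cache indptr entry
        missing_edge_ids[thrust::get<1>(x)]);  // gathered original offset
  }
};

int main() {
  const int num_new_nodes = 3;
  thrust::device_vector<long> sindptr(num_new_nodes + 1);    // 0, 3, 6, 9
  thrust::device_vector<long> sliced_indptr(num_new_nodes);  // 0, 3, 6
  thrust::device_vector<long> missing_edge_ids(9);           // 100 .. 108
  thrust::sequence(sindptr.begin(), sindptr.end(), 0L, 3L);
  thrust::sequence(sliced_indptr.begin(), sliced_indptr.end(), 0L, 3L);
  thrust::sequence(missing_edge_ids.begin(), missing_edge_ids.end(), 100L);
  thrust::device_vector<long> new_indptr(num_new_nodes), new_offset(num_new_nodes);

  auto in = thrust::make_zip_iterator(
      thrust::make_tuple(sindptr.begin() + 1, sliced_indptr.begin()));
  auto out = thrust::make_zip_iterator(
      thrust::make_tuple(new_indptr.begin(), new_offset.begin()));
  ShiftAndGather op{thrust::raw_pointer_cast(missing_edge_ids.data()),
                    /*num_edges=*/1000};
  thrust::transform(thrust::device, in, in + num_new_nodes, out, op);
  // new_indptr == {1003, 1006, 1009}, new_offset == {100, 103, 106}
}
```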
11 changes: 8 additions & 3 deletions graphbolt/src/cuda/extension/gpu_graph_cache.h
@@ -47,10 +47,13 @@ class GpuGraphCache : public torch::CustomClassHolder {
* @param indptr_dtype The node id datatype.
* @param dtypes The dtypes of the edge tensors to be cached. dtypes[0] is
* reserved for the indices edge tensor holding node ids.
+ * @param has_original_edge_ids Whether the graph to be cached has original
+ * edge ids.
*/
GpuGraphCache(
const int64_t num_edges, const int64_t threshold,
- torch::ScalarType indptr_dtype, std::vector<torch::ScalarType> dtypes);
+ torch::ScalarType indptr_dtype, std::vector<torch::ScalarType> dtypes,
+ bool has_original_edge_ids);

GpuGraphCache() = default;

@@ -109,7 +112,8 @@ class GpuGraphCache : public torch::CustomClassHolder {

static c10::intrusive_ptr<GpuGraphCache> Create(
const int64_t num_edges, const int64_t threshold,
- torch::ScalarType indptr_dtype, std::vector<torch::ScalarType> dtypes);
+ torch::ScalarType indptr_dtype, std::vector<torch::ScalarType> dtypes,
+ bool has_original_edge_ids);

private:
void* map_; // pointer to the hash table.
@@ -119,7 +123,8 @@ class GpuGraphCache : public torch::CustomClassHolder {
int64_t num_nodes_; // The number of cached nodes in the cache.
int64_t num_edges_; // The number of cached edges in the cache.
torch::Tensor indptr_; // The cached graph structure indptr tensor.
- torch::Tensor offset_; // The original graph's sliced_indptr tensor.
+ torch::optional<torch::Tensor>
+ offset_; // The original graph's sliced_indptr tensor.
std::vector<torch::Tensor> cached_edge_tensors_; // The cached graph
// structure edge tensors.
std::mutex mtx_; // Protects the data structure and makes it threadsafe.
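With `offset_` now a `torch::optional<torch::Tensor>`, call sites test it before dereferencing, as the .cu hunks above do. A small standalone sketch of the idioms involved, assuming only that `torch::optional` behaves like `std::optional` (it aliases `c10::optional`):

```cpp
#include <torch/torch.h>

#include <iostream>

int main() {
  // Disengaged: the graph already has original edge ids, nothing to track.
  torch::optional<torch::Tensor> offset;
  std::cout << std::boolalpha << offset.has_value() << '\n';  // false

  // Engaged: the cache tracks the sliced_indptr offsets itself.
  offset = torch::zeros({4}, torch::kInt64);
  if (offset) {
    // operator-> and operator* reach the wrapped tensor, mirroring
    // offset_->data_ptr<T>() and *edge_id_offsets in the hunks above.
    std::cout << offset->size(0) << ' ' << (*offset).scalar_type() << '\n';
  }
}
```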