Skip to content

Commit

Permalink
Merge branch 'dmlc:master' into add-igb-to-sage
Browse files Browse the repository at this point in the history
  • Loading branch information
BowenYao18 authored Aug 17, 2024
2 parents c5e10fd + 09ea319 commit c5285d1
Show file tree
Hide file tree
Showing 28 changed files with 663 additions and 190 deletions.
87 changes: 51 additions & 36 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -129,11 +129,11 @@ if (${BUILD_TYPE} STREQUAL "dev")
endif()
else()
if (MSVC)
set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} /O2 /DNDEBUG")
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} /O2 /DNDEBUG")
set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} /O2")
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} /O2")
else()
set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -O2 -DNDEBUG")
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -O2 -DNDEBUG")
set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -O2")
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -O2")
endif()
endif()

Expand Down Expand Up @@ -186,11 +186,48 @@ else(MSVC)
endif(NOT APPLE)
endif(MSVC)

if(USE_OPENMP)
include(FindOpenMP)
if(OPENMP_FOUND)
set(CMAKE_C_FLAGS "${OpenMP_C_FLAGS} ${CMAKE_C_FLAGS}")
set(CMAKE_CXX_FLAGS "${OpenMP_CXX_FLAGS} ${CMAKE_CXX_FLAGS}")
endif(OPENMP_FOUND)
message(STATUS "Build with OpenMP.")
endif(USE_OPENMP)

if(NOT CMAKE_SYSTEM_PROCESSOR MATCHES "(x86)|(X86)|(amd64)|(AMD64)")
message(STATUS "Disabling LIBXSMM on ${CMAKE_SYSTEM_PROCESSOR}.")
set(USE_LIBXSMM OFF)
endif()

if(USE_LIBXSMM)
set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -DUSE_LIBXSMM -DDGL_CPU_LLC_SIZE=40000000 -D__BLAS=0")
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -DUSE_LIBXSMM -DDGL_CPU_LLC_SIZE=40000000 -D__BLAS=0")
message(STATUS "Build with LIBXSMM optimization.")
endif(USE_LIBXSMM)

if ((NOT MSVC) AND USE_EPOLL)
INCLUDE(CheckIncludeFile)
check_include_file("sys/epoll.h" EPOLL_AVAILABLE)
if (EPOLL_AVAILABLE)
set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -DUSE_EPOLL")
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -DUSE_EPOLL")
else()
message(WARNING "EPOLL is not available on this platform...")
endif()
endif ()

# To compile METIS correct for DGL.
if(MSVC)
set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} /DIDXTYPEWIDTH=64 /DREALTYPEWIDTH=32")
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} /DIDXTYPEWIDTH=64 /DREALTYPEWIDTH=32")
else(MSVC)
set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -DIDXTYPEWIDTH=64 -DREALTYPEWIDTH=32")
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -DIDXTYPEWIDTH=64 -DREALTYPEWIDTH=32")
endif(MSVC)

# configure minigun
add_definitions(-DENABLE_PARTIAL_FRONTIER=0) # disable minigun partial frontier compile
# Source file lists
file(GLOB DGL_SRC
src/*.cc
Expand Down Expand Up @@ -219,12 +256,6 @@ else()
endif()
list(APPEND DGL_SRC ${DGL_RPC_SRC})

if(USE_OPENMP)
find_package(OpenMP REQUIRED)
list(APPEND DGL_LINKER_LIBS OpenMP::OpenMP_CXX)
message(STATUS "Build with OpenMP.")
endif(USE_OPENMP)

# Configure cuda
if(USE_CUDA)
file(GLOB_RECURSE DGL_CUDA_SRC
Expand All @@ -248,16 +279,6 @@ else(USE_CUDA)
add_library(dgl SHARED ${DGL_SRC})
endif(USE_CUDA)

if ((NOT MSVC) AND USE_EPOLL)
INCLUDE(CheckIncludeFile)
check_include_file("sys/epoll.h" EPOLL_AVAILABLE)
if (EPOLL_AVAILABLE)
target_compile_definitions(dgl PRIVATE USE_EPOLL)
else()
message(WARNING "EPOLL is not available on this platform...")
endif()
endif ()

# include directories
target_include_directories(dgl PRIVATE "include")
# check for conda includes
Expand Down Expand Up @@ -330,26 +351,18 @@ else(EXTERNAL_NANOFLANN_PATH)
endif(EXTERNAL_NANOFLANN_PATH)

if (USE_LIBXSMM)
target_compile_definitions(dgl PRIVATE USE_LIBXSMM DGL_CPU_LLC_SIZE=40000000 __BLAS=0)
target_include_directories(dgl PRIVATE "third_party/libxsmm/include")
message(STATUS "Build with LIBXSMM optimization.")
endif()

# To compile METIS correct for DGL.
add_compile_definitions(IDXTYPEWIDTH=64 REALTYPEWIDTH=32)
if (EXTERNAL_METIS_PATH)
# To compile METIS correct for DGL.
if(MSVC)
set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} /DIDXTYPEWIDTH=64 /DREALTYPEWIDTH=32")
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} /DIDXTYPEWIDTH=64 /DREALTYPEWIDTH=32")
else(MSVC)
set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -DIDXTYPEWIDTH=64 -DREALTYPEWIDTH=32")
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -DIDXTYPEWIDTH=64 -DREALTYPEWIDTH=32")
endif(MSVC)
find_package(METIS REQUIRED)
message(STATUS "Found METIS library")
target_include_directories(dgl SYSTEM PUBLIC ${METIS_INCLUDE_DIR})
list(APPEND DGL_LINKER_LIBS ${METIS_LIBRARIES})
if (NOT METIS_FOUND)
message(FATAL_ERROR "Failed to find METIS library")
else()
message(STATUS "Found METIS library")
target_include_directories(dgl SYSTEM PUBLIC ${METIS_INCLUDE_DIR})
list(APPEND DGL_LINKER_LIBS ${METIS_LIBRARIES})
endif()
else(EXTERNAL_METIS_PATH)
target_include_directories(dgl PRIVATE "third_party/METIS/include")
# Compile METIS
Expand Down Expand Up @@ -378,6 +391,8 @@ endif()

# Compile gpu_cache
if(USE_CUDA)
set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -DUSE_GPU_CACHE")
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -DUSE_GPU_CACHE")
# Manually build gpu_cache because CMake always builds it as shared
file(GLOB gpu_cache_src
third_party/HugeCTR/gpu_cache/src/nv_gpu_cache.cu
Expand All @@ -391,7 +406,7 @@ endif(USE_CUDA)

# support PARALLEL_ALGORITHMS
if (LIBCXX_ENABLE_PARALLEL_ALGORITHMS)
target_compile_definitions(dgl PRIVATE PARALLEL_ALGORITHMS)
add_definitions(-DPARALLEL_ALGORITHMS)
endif(LIBCXX_ENABLE_PARALLEL_ALGORITHMS)

target_link_libraries(dgl ${DGL_LINKER_LIBS} ${DGL_RUNTIME_LINKER_LIBS})
Expand Down
4 changes: 0 additions & 4 deletions cmake/modules/CUDA.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -230,10 +230,6 @@ macro(dgl_config_cuda linker_libs)
string(CONCAT CXX_HOST_FLAGS ${CXX_HOST_FLAGS} ",/MD")
endif()
list(APPEND CUDA_NVCC_FLAGS "-Xcompiler" "${CXX_HOST_FLAGS}")
if(USE_OPENMP)
# Needed by CUDA disjoint union source file.
list(APPEND CUDA_NVCC_FLAGS "-Xcompiler" "${OpenMP_CXX_FLAGS}")
endif(USE_OPENMP)

# 1. Add arch flags
dgl_select_nvcc_arch_flags(NVCC_FLAGS_ARCH)
Expand Down
5 changes: 4 additions & 1 deletion examples/multigpu/graphbolt/node_classification.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,10 @@ def create_dataloader(
if args.storage_device != "cpu":
datapipe = datapipe.copy_to(device)
datapipe = datapipe.sample_neighbor(
graph, args.fanout, overlap_fetch=args.storage_device == "pinned"
graph,
args.fanout,
overlap_fetch=args.storage_device == "pinned",
asynchronous=args.storage_device != "cpu",
)
datapipe = datapipe.fetch_feature(features, node_feature_keys=["feat"])
if args.storage_device == "cpu":
Expand Down
78 changes: 67 additions & 11 deletions graphbolt/include/graphbolt/async.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
#include <future>
#include <memory>
#include <mutex>
#include <variant>

#ifdef BUILD_WITH_TASKFLOW
#include <taskflow/algorithm/for_each.hpp>
Expand All @@ -37,6 +38,7 @@
#endif

#ifdef GRAPHBOLT_USE_CUDA
#include <ATen/cuda/CUDAEvent.h>
#include <c10/cuda/CUDAGuard.h>
#include <c10/cuda/CUDAStream.h>
#include <torch/csrc/api/include/torch/cuda.h>
Expand Down Expand Up @@ -92,15 +94,50 @@ inline int get_num_interop_threads() {

template <typename T>
class Future : public torch::CustomClassHolder {
#ifdef GRAPHBOLT_USE_CUDA
using T_no_event = std::conditional_t<std::is_void_v<T>, std::monostate, T>;
using T_with_event = std::conditional_t<
std::is_void_v<T>, at::cuda::CUDAEvent,
std::pair<T, at::cuda::CUDAEvent>>;
using future_type = std::future<std::variant<T_no_event, T_with_event>>;
#else
using future_type = std::future<T>;
#endif

public:
Future(std::future<T>&& future) : future_(std::move(future)) {}
#ifdef GRAPHBOLT_USE_CUDA
using return_type = std::variant<T_no_event, T_with_event>;
#else
using return_type = T;
#endif

Future(future_type&& future) : future_(std::move(future)) {}

Future() = default;

T Wait() { return future_.get(); }
T Wait() {
#ifdef GRAPHBOLT_USE_CUDA
auto result = future_.get();
if constexpr (std::is_void_v<T>) {
if (std::holds_alternative<T_with_event>(result)) {
auto&& event = std::get<T_with_event>(result);
event.block(c10::cuda::getCurrentCUDAStream());
}
return;
} else if (std::holds_alternative<T_with_event>(result)) {
auto&& [value, event] = std::get<T_with_event>(result);
event.block(c10::cuda::getCurrentCUDAStream());
return value;
} else {
return std::get<T_no_event>(result);
}
#else
return future_.get();
#endif
}

private:
std::future<T> future_;
future_type future_;
};

/**
Expand All @@ -109,36 +146,55 @@ class Future : public torch::CustomClassHolder {
* task to avoid spawning a new OpenMP threadpool on each interop thread.
*/
template <typename F>
inline auto async(F&& function) {
inline auto async(F&& function, bool is_cuda = false) {
using T = decltype(function());
#ifdef GRAPHBOLT_USE_CUDA
const auto is_cuda_available = torch::cuda::is_available();
struct c10::StreamData3 stream_data;
if (is_cuda_available) {
if (is_cuda) {
stream_data = c10::cuda::getCurrentCUDAStream().pack3();
}
#endif
auto fn = [=, func = std::move(function)] {
using return_type = typename Future<T>::return_type;
auto fn = [=, func = std::move(function)]() -> return_type {
#ifdef GRAPHBOLT_USE_CUDA
// We make sure to use the same CUDA stream as the thread launching the
// async operation.
if (is_cuda_available) {
if (is_cuda) {
auto stream = c10::cuda::CUDAStream::unpack3(
stream_data.stream_id, stream_data.device_index,
stream_data.device_type);
c10::cuda::CUDAStreamGuard guard(stream);
at::cuda::CUDAEvent event;
// Might be executed on the GPU so we record an event to be able to
// synchronize with it later, in case it is executed on an alternative
// CUDA stream.
if constexpr (std::is_void_v<T>) {
func();
event.record();
return event;
} else {
auto result = func();
event.record();
return std::make_pair(std::move(result), std::move(event));
}
}
if constexpr (std::is_void_v<T>) {
func();
return std::monostate{};
} else {
return func();
}
#endif
#else
return func();
#endif
};
#ifdef BUILD_WITH_TASKFLOW
auto future = interop_pool().async(std::move(fn));
#else
auto promise = std::make_shared<std::promise<T>>();
auto promise = std::make_shared<std::promise<return_type>>();
auto future = promise->get_future();
at::launch([promise, func = std::move(fn)]() {
if constexpr (std::is_void_v<T>) {
if constexpr (std::is_void_v<return_type>) {
func();
promise->set_value();
} else
Expand Down
18 changes: 13 additions & 5 deletions graphbolt/include/graphbolt/fused_sampled_subgraph.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,8 @@ struct FusedSampledSubgraph : torch::CustomClassHolder {
* graph.
* @param original_row_node_ids Column's reverse node ids in the original
* graph.
* @param original_edge_ids Reverse edge ids in the original graph.
* @param original_edge_ids Mapping of subgraph edge IDs to original
* FusedCSCSamplingGraph edge IDs.
* @param type_per_edge Type id of each edge.
* @param etype_offsets Edge offsets for the sampled edges for the sampled
* edges that are sorted w.r.t. edge types.
Expand Down Expand Up @@ -91,10 +92,17 @@ struct FusedSampledSubgraph : torch::CustomClassHolder {
torch::optional<torch::Tensor> indices;

/**
* @brief Reverse edge ids in the original graph, the edge with id
* `original_edge_ids[i]` in the original graph is mapped to `i` in this
* subgraph. This is useful when edge features are needed. The edges are
* sorted w.r.t. their edge types for the heterogenous case.
* @brief Mapping of subgraph edge IDs to original FusedCSCSamplingGraph
* edge IDs.
*
* In this subgraph, the edge at index i corresponds to the edge with ID
* original_edge_ids[i] in the original FusedCSCSamplingGraph. Edges are
* sorted by type for heterogeneous graphs.
*
* Note: To retrieve the actual original edge IDs for feature fetching, use
* the `_ORIGINAL_EDGE_ID` edge attribute in FusedCSCSamplingGraph to map the
* `original_edge_ids` agin, as IDs may have been remapped during conversion
* to FusedCSCSamplingGraph.
*/
torch::Tensor original_edge_ids;

Expand Down
10 changes: 10 additions & 0 deletions graphbolt/src/cuda/extension/gpu_cache.cu
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,16 @@ std::tuple<torch::Tensor, torch::Tensor, torch::Tensor> GpuCache::Query(
return std::make_tuple(values, missing_index, missing_keys);
}

c10::intrusive_ptr<Future<std::vector<torch::Tensor>>> GpuCache::QueryAsync(
torch::Tensor keys) {
return async(
[=] {
auto [values, missing_index, missing_keys] = Query(keys);
return std::vector{values, missing_index, missing_keys};
},
true);
}

void GpuCache::Replace(torch::Tensor keys, torch::Tensor values) {
TORCH_CHECK(keys.device().is_cuda(), "Keys should be on a CUDA device.");
TORCH_CHECK(
Expand Down
4 changes: 4 additions & 0 deletions graphbolt/src/cuda/extension/gpu_cache.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#ifndef GRAPHBOLT_GPU_CACHE_H_
#define GRAPHBOLT_GPU_CACHE_H_

#include <graphbolt/async.h>
#include <torch/custom_class.h>
#include <torch/torch.h>

Expand Down Expand Up @@ -53,6 +54,9 @@ class GpuCache : public torch::CustomClassHolder {
std::tuple<torch::Tensor, torch::Tensor, torch::Tensor> Query(
torch::Tensor keys);

c10::intrusive_ptr<Future<std::vector<torch::Tensor>>> QueryAsync(
torch::Tensor keys);

void Replace(torch::Tensor keys, torch::Tensor values);

static c10::intrusive_ptr<GpuCache> Create(
Expand Down
Loading

0 comments on commit c5285d1

Please sign in to comment.