Skip to content

Commit

Permalink
Implement cudf construction with adapters. (#5189)
Browse files Browse the repository at this point in the history
  • Loading branch information
RAMitchell authored Jan 9, 2020
1 parent 9559f81 commit 87ebfc1
Show file tree
Hide file tree
Showing 14 changed files with 705 additions and 34 deletions.
2 changes: 1 addition & 1 deletion include/xgboost/data.h
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ struct Entry {
* \param index The feature or row index.
* \param fvalue The feature value.
*/
Entry(bst_feature_t index, bst_float fvalue) : index(index), fvalue(fvalue) {}
XGBOOST_DEVICE Entry(bst_feature_t index, bst_float fvalue) : index(index), fvalue(fvalue) {}
/*! \brief reversely compare feature values */
inline static bool CmpValue(const Entry& a, const Entry& b) {
return a.fvalue < b.fvalue;
Expand Down
9 changes: 4 additions & 5 deletions src/c_api/c_api.cc
Original file line number Diff line number Diff line change
Expand Up @@ -204,15 +204,14 @@ int XGDMatrixCreateFromDataIter(
API_END();
}

#ifndef XGBOOST_USE_CUDA
XGB_DLL int XGDMatrixCreateFromArrayInterfaces(
char const* c_json_strs, bst_int has_missing, bst_float missing, DMatrixHandle* out) {
char const* c_json_strs, bst_int has_missing, bst_float missing, DMatrixHandle* out) {
API_BEGIN();
std::string json_str {c_json_strs};
std::unique_ptr<data::SimpleCSRSource> source (new data::SimpleCSRSource());
source->CopyFrom(json_str, has_missing, missing);
*out = new std::shared_ptr<DMatrix>(DMatrix::Create(std::move(source)));
LOG(FATAL) << "Xgboost not compiled with cuda";
API_END();
}
#endif

XGB_DLL int XGDMatrixCreateFromCSREx(const size_t* indptr,
const unsigned* indices,
Expand Down
20 changes: 20 additions & 0 deletions src/c_api/c_api.cu
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
// Copyright (c) 2014-2019 by Contributors

#include "xgboost/data.h"
#include "xgboost/c_api.h"
#include "c_api_error.h"
#include "../data/simple_csr_source.h"
#include "../data/device_adapter.cuh"

namespace xgboost {
XGB_DLL int XGDMatrixCreateFromArrayInterfaces(char const* c_json_strs,
bst_int has_missing,
bst_float missing,
DMatrixHandle* out) {
API_BEGIN();
std::string json_str{c_json_strs};
data::CudfAdapter adapter(json_str);
*out = new std::shared_ptr<DMatrix>(DMatrix::Create(&adapter, missing, 1));
API_END();
}
} // namespace xgboost
5 changes: 5 additions & 0 deletions src/common/device_helpers.cuh
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#include <thrust/device_malloc_allocator.h>
#include <thrust/system/cuda/error.h>
#include <thrust/system_error.h>
#include <thrust/logical.h>

#include <omp.h>
#include <rabit/rabit.h>
Expand Down Expand Up @@ -372,6 +373,10 @@ public:
safe_cuda(cudaGetDevice(&current_device));
stats_.RegisterDeallocation(ptr, n, current_device);
}
size_t PeakMemory()
{
return stats_.peak_allocated_bytes;
}
void Log() {
if (!xgboost::ConsoleLogger::ShouldLog(xgboost::ConsoleLogger::LV::kDebug))
return;
Expand Down
4 changes: 3 additions & 1 deletion src/data/adapter.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,11 @@
*/
#ifndef XGBOOST_DATA_ADAPTER_H_
#define XGBOOST_DATA_ADAPTER_H_
#include <dmlc/data.h>
#include <limits>
#include <memory>
#include <string>

namespace xgboost {
namespace data {

Expand Down Expand Up @@ -56,7 +58,7 @@ namespace data {
constexpr size_t kAdapterUnknownSize = std::numeric_limits<size_t >::max();

struct COOTuple {
COOTuple(size_t row_idx, size_t column_idx, float value)
XGBOOST_DEVICE COOTuple(size_t row_idx, size_t column_idx, float value)
: row_idx(row_idx), column_idx(column_idx), value(value) {}

size_t row_idx{0};
Expand Down
1 change: 1 addition & 0 deletions src/data/columnar.h
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,7 @@ class Columnar {
using index_type = int32_t;

public:
Columnar() = default;
explicit Columnar(std::map<std::string, Json> const& column) {
ArrayInterfaceHandler::Validate(column);
data = ArrayInterfaceHandler::GetPtrFromArrayData<void*>(column);
Expand Down
15 changes: 15 additions & 0 deletions src/data/data.cu
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
#include "xgboost/json.h"
#include "columnar.h"
#include "../common/device_helpers.cuh"
#include "device_adapter.cuh"
#include "simple_dmatrix.h"

namespace xgboost {

Expand Down Expand Up @@ -67,4 +69,17 @@ void MetaInfo::SetInfo(const char * c_key, std::string const& interface_str) {
LOG(FATAL) << "Unknown metainfo: " << key;
}
}

template <typename AdapterT>
DMatrix* DMatrix::Create(AdapterT* adapter, float missing, int nthread,
const std::string& cache_prefix, size_t page_size) {
CHECK_EQ(cache_prefix.size(), 0)
<< "Device memory construction is not currently supported with external "
"memory.";
return new data::SimpleDMatrix(adapter, missing, nthread);
}

template DMatrix* DMatrix::Create<data::CudfAdapter>(
data::CudfAdapter* adapter, float missing, int nthread,
const std::string& cache_prefix, size_t page_size);
} // namespace xgboost
95 changes: 95 additions & 0 deletions src/data/device_adapter.cuh
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
/*!
* Copyright (c) 2019 by Contributors
* \file device_adapter.cuh
*/
#ifndef XGBOOST_DATA_DEVICE_ADAPTER_H_
#define XGBOOST_DATA_DEVICE_ADAPTER_H_
#include <limits>
#include <memory>
#include <string>
#include "columnar.h"
#include "adapter.h"
#include "../common/device_helpers.cuh"

namespace xgboost {
namespace data {

class CudfAdapterBatch : public detail::NoMetaInfo {
public:
CudfAdapterBatch() = default;
CudfAdapterBatch(common::Span<Columnar> columns,
common::Span<size_t> column_ptr, size_t num_elements)
: columns_(columns),column_ptr_(column_ptr), num_elements(num_elements) {}
size_t Size()const { return num_elements; }
__device__ COOTuple GetElement(size_t idx)const
{
size_t column_idx =
dh::UpperBound(column_ptr_.data(), column_ptr_.size(), idx) - 1;
auto& column = columns_[column_idx];
size_t row_idx = idx - column_ptr_[column_idx];
float value = column.valid.Data() == nullptr || column.valid.Check(row_idx)
? column.GetElement(row_idx)
: std::numeric_limits<float>::quiet_NaN();
return COOTuple(row_idx, column_idx, value);
}

private:
common::Span<Columnar> columns_;
common::Span<size_t> column_ptr_;
size_t num_elements;
};

class CudfAdapter : public detail::SingleBatchDataIter<CudfAdapterBatch> {
public:
explicit CudfAdapter(std::string cuda_interfaces_str) {
Json interfaces =
Json::Load({cuda_interfaces_str.c_str(), cuda_interfaces_str.size()});
std::vector<Json> const& json_columns = get<Array>(interfaces);
size_t n_columns = json_columns.size();
CHECK_GT(n_columns, 0) << "Number of columns must not equal to 0.";

auto const& typestr = get<String const>(json_columns[0]["typestr"]);
CHECK_EQ(typestr.size(), 3) << ColumnarErrors::TypestrFormat();
CHECK_NE(typestr.front(), '>') << ColumnarErrors::BigEndian();
std::vector<Columnar> columns;
std::vector<size_t> column_ptr({0});
auto first_column = Columnar(get<Object const>(json_columns[0]));
device_idx_ = dh::CudaGetPointerDevice(first_column.data);
CHECK_NE(device_idx_, -1);
dh::safe_cuda(cudaSetDevice(device_idx_));
num_rows_ = first_column.size;
for (auto& json_col : json_columns) {
auto column = Columnar(get<Object const>(json_col));
columns.push_back(column);
column_ptr.emplace_back(column_ptr.back() + column.size);
num_rows_ = std::max(num_rows_, size_t(column.size));
CHECK_EQ(device_idx_, dh::CudaGetPointerDevice(column.data))
<< "All columns should use the same device.";
CHECK_EQ(num_rows_, column.size)
<< "All columns should have same number of rows.";
}
columns_ = columns;
column_ptr_ = column_ptr;
batch = CudfAdapterBatch(dh::ToSpan(columns_), dh::ToSpan(column_ptr_),
column_ptr.back());
}
const CudfAdapterBatch& Value() const override { return batch; }

size_t NumRows() const { return num_rows_; }
size_t NumColumns() const { return columns_.size(); }
size_t DeviceIdx()const {
return device_idx_;
}

// Cudf is column major
bool IsRowMajor() { return false; }
private:
CudfAdapterBatch batch;
dh::device_vector<Columnar> columns_;
dh::device_vector<size_t> column_ptr_; // Exclusive scan of column sizes
size_t num_rows_{0};
int device_idx_;
};
}; // namespace data
} // namespace xgboost
#endif // XGBOOST_DATA_DEVICE_ADAPTER_H_
120 changes: 120 additions & 0 deletions src/data/simple_dmatrix.cu
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
/*!
* Copyright 2019 by Contributors
* \file simple_dmatrix.cu
*/
#include <thrust/copy.h>
#include <thrust/execution_policy.h>
#include <thrust/sort.h>
#include <xgboost/data.h>
#include "../common/random.h"
#include "./simple_dmatrix.h"
#include "device_adapter.cuh"

namespace xgboost {
namespace data {

XGBOOST_DEVICE bool IsValid(float value, float missing) {
if (common::CheckNAN(value) || value == missing) {
return false;
}
return true;
}

template <typename AdapterBatchT>
void CountRowOffsets(const AdapterBatchT& batch, common::Span<bst_row_t> offset,
int device_idx, float missing) {
// Count elements per row
dh::LaunchN(device_idx, batch.Size(), [=] __device__(size_t idx) {
auto element = batch.GetElement(idx);
if (IsValid(element.value, missing)) {
atomicAdd(reinterpret_cast<unsigned long long*>( // NOLINT
&offset[element.row_idx]),
static_cast<unsigned long long>(1)); // NOLINT
}
});

dh::XGBCachingDeviceAllocator<char> alloc;
thrust::exclusive_scan(thrust::cuda::par(alloc),
thrust::device_pointer_cast(offset.data()),
thrust::device_pointer_cast(offset.data() + offset.size()),
thrust::device_pointer_cast(offset.data()));
}

template <typename AdapterT>
void CopyDataColumnMajor(AdapterT* adapter, common::Span<Entry> data,
int device_idx, float missing,
common::Span<size_t> row_ptr) {
// Step 1: Get the sizes of the input columns
dh::device_vector<size_t> column_sizes(adapter->NumColumns());
auto d_column_sizes = column_sizes.data().get();
auto& batch = adapter->Value();
// Populate column sizes
dh::LaunchN(device_idx, batch.Size(), [=] __device__(size_t idx) {
const auto& e = batch.GetElement(idx);
atomicAdd(reinterpret_cast<unsigned long long*>( // NOLINT
&d_column_sizes[e.column_idx]),
static_cast<unsigned long long>(1)); // NOLINT
});

thrust::host_vector<size_t> host_column_sizes = column_sizes;

// Step 2: Iterate over columns, place elements in correct row, increment
// temporary row pointers
dh::device_vector<size_t> temp_row_ptr(
thrust::device_pointer_cast(row_ptr.data()),
thrust::device_pointer_cast(row_ptr.data() + row_ptr.size()));
auto d_temp_row_ptr = temp_row_ptr.data().get();
size_t begin = 0;
for (auto size : host_column_sizes) {
size_t end = begin + size;
dh::LaunchN(device_idx, end - begin, [=] __device__(size_t idx) {
const auto& e = batch.GetElement(idx + begin);
if (!IsValid(e.value, missing)) return;
data[d_temp_row_ptr[e.row_idx]] = Entry(e.column_idx, e.value);
d_temp_row_ptr[e.row_idx] += 1;
});

begin = end;
}
}

// Does not currently support metainfo as no on-device data source contains this
// Current implementation assumes a single batch. More batches can
// be supported in future. Does not currently support inferring row/column size
template <typename AdapterT>
SimpleDMatrix::SimpleDMatrix(AdapterT* adapter, float missing, int nthread) {
source_.reset(new SimpleCSRSource());
SimpleCSRSource& mat = *reinterpret_cast<SimpleCSRSource*>(source_.get());
CHECK(adapter->NumRows() != kAdapterUnknownSize);
CHECK(adapter->NumColumns() != kAdapterUnknownSize);

adapter->BeforeFirst();
adapter->Next();
auto& batch = adapter->Value();
mat.page_.offset.SetDevice(adapter->DeviceIdx());
mat.page_.data.SetDevice(adapter->DeviceIdx());

// Enforce single batch
CHECK(!adapter->Next());
mat.page_.offset.Resize(adapter->NumRows() + 1);
auto s_offset = mat.page_.offset.DeviceSpan();
CountRowOffsets(batch, s_offset, adapter->DeviceIdx(), missing);
mat.info.num_nonzero_ = mat.page_.offset.HostVector().back();
mat.page_.data.Resize(mat.info.num_nonzero_);
if (adapter->IsRowMajor()) {
LOG(FATAL) << "Not implemented.";
} else {
CopyDataColumnMajor(adapter, mat.page_.data.DeviceSpan(),
adapter->DeviceIdx(), missing, s_offset);
}

mat.info.num_col_ = adapter->NumColumns();
mat.info.num_row_ = adapter->NumRows();
// Synchronise worker columns
rabit::Allreduce<rabit::op::Max>(&mat.info.num_col_, 1);
}

template SimpleDMatrix::SimpleDMatrix(CudfAdapter* adapter, float missing,
int nthread);
} // namespace data
} // namespace xgboost
4 changes: 2 additions & 2 deletions tests/cpp/data/test_adapter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
#include "../../../src/common/timer.h"
#include "../helpers.h"
using namespace xgboost; // NOLINT
TEST(c_api, CSRAdapter) {
TEST(adapter, CSRAdapter) {
int m = 3;
int n = 2;
std::vector<float> data = {1, 2, 3, 4, 5};
Expand All @@ -29,7 +29,7 @@ TEST(c_api, CSRAdapter) {
EXPECT_EQ(line2 .GetElement(0).column_idx, 1);
}

TEST(c_api, CSCAdapterColsMoreThanRows) {
TEST(adapter, CSCAdapterColsMoreThanRows) {
std::vector<float> data = {1, 2, 3, 4, 5, 6, 7, 8};
std::vector<unsigned> row_idx = {0, 1, 0, 1, 0, 1, 0, 1};
std::vector<size_t> col_ptr = {0, 2, 4, 6, 8};
Expand Down
Loading

0 comments on commit 87ebfc1

Please sign in to comment.