Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement cudf construction with adapters. #5189

Merged
merged 2 commits into from
Jan 9, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion include/xgboost/data.h
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ struct Entry {
* \param index The feature or row index.
* \param fvalue The feature value.
*/
Entry(bst_feature_t index, bst_float fvalue) : index(index), fvalue(fvalue) {}
XGBOOST_DEVICE Entry(bst_feature_t index, bst_float fvalue) : index(index), fvalue(fvalue) {}
/*! \brief reversely compare feature values */
inline static bool CmpValue(const Entry& a, const Entry& b) {
return a.fvalue < b.fvalue;
Expand Down
9 changes: 4 additions & 5 deletions src/c_api/c_api.cc
Original file line number Diff line number Diff line change
Expand Up @@ -204,15 +204,14 @@ int XGDMatrixCreateFromDataIter(
API_END();
}

#ifndef XGBOOST_USE_CUDA
XGB_DLL int XGDMatrixCreateFromArrayInterfaces(
char const* c_json_strs, bst_int has_missing, bst_float missing, DMatrixHandle* out) {
char const* c_json_strs, bst_int has_missing, bst_float missing, DMatrixHandle* out) {
API_BEGIN();
std::string json_str {c_json_strs};
std::unique_ptr<data::SimpleCSRSource> source (new data::SimpleCSRSource());
source->CopyFrom(json_str, has_missing, missing);
*out = new std::shared_ptr<DMatrix>(DMatrix::Create(std::move(source)));
LOG(FATAL) << "Xgboost not compiled with cuda";
API_END();
}
#endif

XGB_DLL int XGDMatrixCreateFromCSREx(const size_t* indptr,
const unsigned* indices,
Expand Down
20 changes: 20 additions & 0 deletions src/c_api/c_api.cu
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
// Copyright (c) 2014-2019 by Contributors

#include "xgboost/data.h"
#include "xgboost/c_api.h"
#include "c_api_error.h"
#include "../data/simple_csr_source.h"
#include "../data/device_adapter.cuh"

namespace xgboost {
XGB_DLL int XGDMatrixCreateFromArrayInterfaces(char const* c_json_strs,
bst_int has_missing,
bst_float missing,
DMatrixHandle* out) {
API_BEGIN();
std::string json_str{c_json_strs};
data::CudfAdapter adapter(json_str);
*out = new std::shared_ptr<DMatrix>(DMatrix::Create(&adapter, missing, 1));
API_END();
}
} // namespace xgboost
5 changes: 5 additions & 0 deletions src/common/device_helpers.cuh
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#include <thrust/device_malloc_allocator.h>
#include <thrust/system/cuda/error.h>
#include <thrust/system_error.h>
#include <thrust/logical.h>

#include <omp.h>
#include <rabit/rabit.h>
Expand Down Expand Up @@ -372,6 +373,10 @@ public:
safe_cuda(cudaGetDevice(&current_device));
stats_.RegisterDeallocation(ptr, n, current_device);
}
size_t PeakMemory()
{
return stats_.peak_allocated_bytes;
}
void Log() {
if (!xgboost::ConsoleLogger::ShouldLog(xgboost::ConsoleLogger::LV::kDebug))
return;
Expand Down
4 changes: 3 additions & 1 deletion src/data/adapter.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,11 @@
*/
#ifndef XGBOOST_DATA_ADAPTER_H_
#define XGBOOST_DATA_ADAPTER_H_
#include <dmlc/data.h>
#include <limits>
#include <memory>
#include <string>

namespace xgboost {
namespace data {

Expand Down Expand Up @@ -56,7 +58,7 @@ namespace data {
constexpr size_t kAdapterUnknownSize = std::numeric_limits<size_t >::max();

struct COOTuple {
COOTuple(size_t row_idx, size_t column_idx, float value)
XGBOOST_DEVICE COOTuple(size_t row_idx, size_t column_idx, float value)
: row_idx(row_idx), column_idx(column_idx), value(value) {}

size_t row_idx{0};
Expand Down
1 change: 1 addition & 0 deletions src/data/columnar.h
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,7 @@ class Columnar {
using index_type = int32_t;

public:
Columnar() = default;
explicit Columnar(std::map<std::string, Json> const& column) {
ArrayInterfaceHandler::Validate(column);
data = ArrayInterfaceHandler::GetPtrFromArrayData<void*>(column);
Expand Down
15 changes: 15 additions & 0 deletions src/data/data.cu
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
#include "xgboost/json.h"
#include "columnar.h"
#include "../common/device_helpers.cuh"
#include "device_adapter.cuh"
#include "simple_dmatrix.h"

namespace xgboost {

Expand Down Expand Up @@ -67,4 +69,17 @@ void MetaInfo::SetInfo(const char * c_key, std::string const& interface_str) {
LOG(FATAL) << "Unknown metainfo: " << key;
}
}

template <typename AdapterT>
DMatrix* DMatrix::Create(AdapterT* adapter, float missing, int nthread,
const std::string& cache_prefix, size_t page_size) {
CHECK_EQ(cache_prefix.size(), 0)
<< "Device memory construction is not currently supported with external "
"memory.";
return new data::SimpleDMatrix(adapter, missing, nthread);
}

template DMatrix* DMatrix::Create<data::CudfAdapter>(
data::CudfAdapter* adapter, float missing, int nthread,
const std::string& cache_prefix, size_t page_size);
} // namespace xgboost
95 changes: 95 additions & 0 deletions src/data/device_adapter.cuh
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
/*!
* Copyright (c) 2019 by Contributors
* \file device_adapter.cuh
*/
#ifndef XGBOOST_DATA_DEVICE_ADAPTER_H_
#define XGBOOST_DATA_DEVICE_ADAPTER_H_
#include <limits>
#include <memory>
#include <string>
#include "columnar.h"
#include "adapter.h"
#include "../common/device_helpers.cuh"

namespace xgboost {
namespace data {

class CudfAdapterBatch : public detail::NoMetaInfo {
public:
CudfAdapterBatch() = default;
CudfAdapterBatch(common::Span<Columnar> columns,
common::Span<size_t> column_ptr, size_t num_elements)
: columns_(columns),column_ptr_(column_ptr), num_elements(num_elements) {}
size_t Size()const { return num_elements; }
__device__ COOTuple GetElement(size_t idx)const
{
size_t column_idx =
dh::UpperBound(column_ptr_.data(), column_ptr_.size(), idx) - 1;
auto& column = columns_[column_idx];
size_t row_idx = idx - column_ptr_[column_idx];
float value = column.valid.Data() == nullptr || column.valid.Check(row_idx)
? column.GetElement(row_idx)
: std::numeric_limits<float>::quiet_NaN();
return COOTuple(row_idx, column_idx, value);
}

private:
common::Span<Columnar> columns_;
common::Span<size_t> column_ptr_;
size_t num_elements;
};

class CudfAdapter : public detail::SingleBatchDataIter<CudfAdapterBatch> {
public:
explicit CudfAdapter(std::string cuda_interfaces_str) {
Json interfaces =
Json::Load({cuda_interfaces_str.c_str(), cuda_interfaces_str.size()});
std::vector<Json> const& json_columns = get<Array>(interfaces);
size_t n_columns = json_columns.size();
CHECK_GT(n_columns, 0) << "Number of columns must not equal to 0.";

auto const& typestr = get<String const>(json_columns[0]["typestr"]);
CHECK_EQ(typestr.size(), 3) << ColumnarErrors::TypestrFormat();
CHECK_NE(typestr.front(), '>') << ColumnarErrors::BigEndian();
std::vector<Columnar> columns;
std::vector<size_t> column_ptr({0});
auto first_column = Columnar(get<Object const>(json_columns[0]));
device_idx_ = dh::CudaGetPointerDevice(first_column.data);
CHECK_NE(device_idx_, -1);
dh::safe_cuda(cudaSetDevice(device_idx_));
num_rows_ = first_column.size;
for (auto& json_col : json_columns) {
auto column = Columnar(get<Object const>(json_col));
columns.push_back(column);
column_ptr.emplace_back(column_ptr.back() + column.size);
num_rows_ = std::max(num_rows_, size_t(column.size));
CHECK_EQ(device_idx_, dh::CudaGetPointerDevice(column.data))
<< "All columns should use the same device.";
CHECK_EQ(num_rows_, column.size)
<< "All columns should have same number of rows.";
}
columns_ = columns;
column_ptr_ = column_ptr;
batch = CudfAdapterBatch(dh::ToSpan(columns_), dh::ToSpan(column_ptr_),
column_ptr.back());
}
const CudfAdapterBatch& Value() const override { return batch; }

size_t NumRows() const { return num_rows_; }
size_t NumColumns() const { return columns_.size(); }
size_t DeviceIdx()const {
return device_idx_;
}

// Cudf is column major
bool IsRowMajor() { return false; }
private:
CudfAdapterBatch batch;
dh::device_vector<Columnar> columns_;
dh::device_vector<size_t> column_ptr_; // Exclusive scan of column sizes
size_t num_rows_{0};
int device_idx_;
};
}; // namespace data
} // namespace xgboost
#endif // XGBOOST_DATA_DEVICE_ADAPTER_H_
120 changes: 120 additions & 0 deletions src/data/simple_dmatrix.cu
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
/*!
* Copyright 2019 by Contributors
* \file simple_dmatrix.cu
*/
#include <thrust/copy.h>
#include <thrust/execution_policy.h>
#include <thrust/sort.h>
#include <xgboost/data.h>
#include "../common/random.h"
#include "./simple_dmatrix.h"
#include "device_adapter.cuh"

namespace xgboost {
namespace data {

XGBOOST_DEVICE bool IsValid(float value, float missing) {
if (common::CheckNAN(value) || value == missing) {
return false;
}
return true;
}

template <typename AdapterBatchT>
void CountRowOffsets(const AdapterBatchT& batch, common::Span<bst_row_t> offset,
int device_idx, float missing) {
// Count elements per row
dh::LaunchN(device_idx, batch.Size(), [=] __device__(size_t idx) {
auto element = batch.GetElement(idx);
if (IsValid(element.value, missing)) {
atomicAdd(reinterpret_cast<unsigned long long*>( // NOLINT
&offset[element.row_idx]),
static_cast<unsigned long long>(1)); // NOLINT
}
});

dh::XGBCachingDeviceAllocator<char> alloc;
thrust::exclusive_scan(thrust::cuda::par(alloc),
thrust::device_pointer_cast(offset.data()),
thrust::device_pointer_cast(offset.data() + offset.size()),
thrust::device_pointer_cast(offset.data()));
}

template <typename AdapterT>
void CopyDataColumnMajor(AdapterT* adapter, common::Span<Entry> data,
int device_idx, float missing,
common::Span<size_t> row_ptr) {
// Step 1: Get the sizes of the input columns
dh::device_vector<size_t> column_sizes(adapter->NumColumns());
auto d_column_sizes = column_sizes.data().get();
auto& batch = adapter->Value();
// Populate column sizes
dh::LaunchN(device_idx, batch.Size(), [=] __device__(size_t idx) {
const auto& e = batch.GetElement(idx);
atomicAdd(reinterpret_cast<unsigned long long*>( // NOLINT
&d_column_sizes[e.column_idx]),
static_cast<unsigned long long>(1)); // NOLINT
});

thrust::host_vector<size_t> host_column_sizes = column_sizes;

// Step 2: Iterate over columns, place elements in correct row, increment
// temporary row pointers
dh::device_vector<size_t> temp_row_ptr(
thrust::device_pointer_cast(row_ptr.data()),
thrust::device_pointer_cast(row_ptr.data() + row_ptr.size()));
auto d_temp_row_ptr = temp_row_ptr.data().get();
size_t begin = 0;
for (auto size : host_column_sizes) {
size_t end = begin + size;
dh::LaunchN(device_idx, end - begin, [=] __device__(size_t idx) {
const auto& e = batch.GetElement(idx + begin);
if (!IsValid(e.value, missing)) return;
data[d_temp_row_ptr[e.row_idx]] = Entry(e.column_idx, e.value);
d_temp_row_ptr[e.row_idx] += 1;
});

begin = end;
}
}

// Does not currently support metainfo as no on-device data source contains this
// Current implementation assumes a single batch. More batches can
// be supported in future. Does not currently support inferring row/column size
template <typename AdapterT>
SimpleDMatrix::SimpleDMatrix(AdapterT* adapter, float missing, int nthread) {
source_.reset(new SimpleCSRSource());
SimpleCSRSource& mat = *reinterpret_cast<SimpleCSRSource*>(source_.get());
CHECK(adapter->NumRows() != kAdapterUnknownSize);
CHECK(adapter->NumColumns() != kAdapterUnknownSize);

adapter->BeforeFirst();
adapter->Next();
auto& batch = adapter->Value();
mat.page_.offset.SetDevice(adapter->DeviceIdx());
mat.page_.data.SetDevice(adapter->DeviceIdx());

// Enforce single batch
CHECK(!adapter->Next());
mat.page_.offset.Resize(adapter->NumRows() + 1);
auto s_offset = mat.page_.offset.DeviceSpan();
CountRowOffsets(batch, s_offset, adapter->DeviceIdx(), missing);
mat.info.num_nonzero_ = mat.page_.offset.HostVector().back();
mat.page_.data.Resize(mat.info.num_nonzero_);
if (adapter->IsRowMajor()) {
LOG(FATAL) << "Not implemented.";
} else {
CopyDataColumnMajor(adapter, mat.page_.data.DeviceSpan(),
adapter->DeviceIdx(), missing, s_offset);
}

mat.info.num_col_ = adapter->NumColumns();
mat.info.num_row_ = adapter->NumRows();
// Synchronise worker columns
rabit::Allreduce<rabit::op::Max>(&mat.info.num_col_, 1);
}

template SimpleDMatrix::SimpleDMatrix(CudfAdapter* adapter, float missing,
int nthread);
} // namespace data
} // namespace xgboost
4 changes: 2 additions & 2 deletions tests/cpp/data/test_adapter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
#include "../../../src/common/timer.h"
#include "../helpers.h"
using namespace xgboost; // NOLINT
TEST(c_api, CSRAdapter) {
TEST(adapter, CSRAdapter) {
int m = 3;
int n = 2;
std::vector<float> data = {1, 2, 3, 4, 5};
Expand All @@ -29,7 +29,7 @@ TEST(c_api, CSRAdapter) {
EXPECT_EQ(line2 .GetElement(0).column_idx, 1);
}

TEST(c_api, CSCAdapterColsMoreThanRows) {
TEST(adapter, CSCAdapterColsMoreThanRows) {
std::vector<float> data = {1, 2, 3, 4, 5, 6, 7, 8};
std::vector<unsigned> row_idx = {0, 1, 0, 1, 0, 1, 0, 1};
std::vector<size_t> col_ptr = {0, 2, 4, 6, 8};
Expand Down
Loading