From f2b8cd2922ecf62f3e6208a1b49d988375c75cf2 Mon Sep 17 00:00:00 2001 From: Jiaming Yuan Date: Tue, 25 Feb 2020 23:42:01 +0800 Subject: [PATCH] Add number of columns to native data iterator. (#5202) * Change native data iter into an adapter. --- include/xgboost/c_api.h | 4 +- .../xgboost4j/src/native/xgboost4j.cpp | 5 +- src/c_api/c_api.cc | 132 ++---------------- src/common/group_data.h | 20 +-- src/data/adapter.h | 118 +++++++++++++++- src/data/data.cc | 13 +- src/data/simple_dmatrix.cc | 6 +- src/objective/regression_obj.cu | 5 +- tests/cpp/data/test_adapter.cc | 78 ++++++++++- tests/cpp/data/test_simple_dmatrix.cc | 17 ++- tests/cpp/data/test_sparse_page_dmatrix.cc | 2 + 11 files changed, 244 insertions(+), 156 deletions(-) diff --git a/include/xgboost/c_api.h b/include/xgboost/c_api.h index 64685ab9f1d6..52b837e26e9d 100644 --- a/include/xgboost/c_api.h +++ b/include/xgboost/c_api.h @@ -1,5 +1,5 @@ /*! - * Copyright (c) 2015 by Contributors + * Copyright (c) 2015~2020 by Contributors * \file c_api.h * \author Tianqi Chen * \brief C API of XGBoost, used for interfacing to other languages. @@ -40,6 +40,8 @@ typedef void *DataHolderHandle; // NOLINT(*) typedef struct { // NOLINT(*) /*! \brief number of rows in the minibatch */ size_t size; + /* \brief number of columns in the minibatch. */ + size_t columns; /*! \brief row pointer to the rows in the data */ #ifdef __APPLE__ /* Necessary as Java on MacOS defines jlong as long int diff --git a/jvm-packages/xgboost4j/src/native/xgboost4j.cpp b/jvm-packages/xgboost4j/src/native/xgboost4j.cpp index 3111b957006a..5bc6f13b11a9 100644 --- a/jvm-packages/xgboost4j/src/native/xgboost4j.cpp +++ b/jvm-packages/xgboost4j/src/native/xgboost4j.cpp @@ -12,7 +12,9 @@ limitations under the License. */ +#include #include +#include #include #include #include @@ -88,9 +90,10 @@ XGB_EXTERN_C int XGBoost4jCallbackDataIterNext( jintArray jindex = (jintArray)jenv->GetObjectField( batch, jenv->GetFieldID(batchClass, "featureIndex", "[I")); jfloatArray jvalue = (jfloatArray)jenv->GetObjectField( - batch, jenv->GetFieldID(batchClass, "featureValue", "[F")); + batch, jenv->GetFieldID(batchClass, "featureValue", "[F")); XGBoostBatchCSR cbatch; cbatch.size = jenv->GetArrayLength(joffset) - 1; + cbatch.columns = std::numeric_limits::max(); cbatch.offset = reinterpret_cast( jenv->GetLongArrayElements(joffset, 0)); if (jlabel != nullptr) { diff --git a/src/c_api/c_api.cc b/src/c_api/c_api.cc index 7283c70d4dc8..5c034bb076ef 100644 --- a/src/c_api/c_api.cc +++ b/src/c_api/c_api.cc @@ -11,7 +11,7 @@ #include #include - +#include "xgboost/base.h" #include "xgboost/data.h" #include "xgboost/learner.h" #include "xgboost/c_api.h" @@ -24,116 +24,6 @@ #include "../data/adapter.h" #include "../data/simple_dmatrix.h" -namespace xgboost { -// declare the data callback. -XGB_EXTERN_C int XGBoostNativeDataIterSetData( - void *handle, XGBoostBatchCSR batch); - -/*! \brief Native data iterator that takes callback to return data */ -class NativeDataIter : public dmlc::Parser { - public: - NativeDataIter(DataIterHandle data_handle, - XGBCallbackDataIterNext* next_callback) - : at_first_(true), bytes_read_(0), - data_handle_(data_handle), next_callback_(next_callback) { - } - - // override functions - void BeforeFirst() override { - CHECK(at_first_) << "cannot reset NativeDataIter"; - } - - bool Next() override { - if ((*next_callback_)(data_handle_, - XGBoostNativeDataIterSetData, - this) != 0) { - at_first_ = false; - return true; - } else { - return false; - } - } - - const dmlc::RowBlock& Value() const override { - return block_; - } - - size_t BytesRead() const override { - return bytes_read_; - } - - // callback to set the data - void SetData(const XGBoostBatchCSR& batch) { - offset_.clear(); - label_.clear(); - weight_.clear(); - index_.clear(); - value_.clear(); - offset_.insert(offset_.end(), batch.offset, batch.offset + batch.size + 1); - if (batch.label != nullptr) { - label_.insert(label_.end(), batch.label, batch.label + batch.size); - } - if (batch.weight != nullptr) { - weight_.insert(weight_.end(), batch.weight, batch.weight + batch.size); - } - if (batch.index != nullptr) { - index_.insert(index_.end(), batch.index + offset_[0], batch.index + offset_.back()); - } - if (batch.value != nullptr) { - value_.insert(value_.end(), batch.value + offset_[0], batch.value + offset_.back()); - } - if (offset_[0] != 0) { - size_t base = offset_[0]; - for (size_t& item : offset_) { - item -= base; - } - } - block_.size = batch.size; - block_.offset = dmlc::BeginPtr(offset_); - block_.label = dmlc::BeginPtr(label_); - block_.weight = dmlc::BeginPtr(weight_); - block_.qid = nullptr; - block_.field = nullptr; - block_.index = dmlc::BeginPtr(index_); - block_.value = dmlc::BeginPtr(value_); - bytes_read_ += offset_.size() * sizeof(size_t) + - label_.size() * sizeof(dmlc::real_t) + - weight_.size() * sizeof(dmlc::real_t) + - index_.size() * sizeof(uint32_t) + - value_.size() * sizeof(dmlc::real_t); - } - - private: - // at the beinning. - bool at_first_; - // bytes that is read. - size_t bytes_read_; - // handle to the iterator, - DataIterHandle data_handle_; - // call back to get the data. - XGBCallbackDataIterNext* next_callback_; - // internal offset - std::vector offset_; - // internal label data - std::vector label_; - // internal weight data - std::vector weight_; - // internal index. - std::vector index_; - // internal value. - std::vector value_; - // internal Rowblock - dmlc::RowBlock block_; -}; - -int XGBoostNativeDataIterSetData( - void *handle, XGBoostBatchCSR batch) { - API_BEGIN(); - static_cast(handle)->SetData(batch); - API_END(); -} -} // namespace xgboost - using namespace xgboost; // NOLINT(*); /*! \brief entry to to easily hold returning information */ @@ -186,21 +76,23 @@ int XGDMatrixCreateFromFile(const char *fname, API_END(); } -int XGDMatrixCreateFromDataIter( - void* data_handle, - XGBCallbackDataIterNext* callback, - const char *cache_info, - DMatrixHandle *out) { +XGB_DLL int XGDMatrixCreateFromDataIter( + void *data_handle, // a Java interator + XGBCallbackDataIterNext *callback, // C++ callback defined in xgboost4j.cpp + const char *cache_info, DMatrixHandle *out) { API_BEGIN(); std::string scache; if (cache_info != nullptr) { scache = cache_info; } - NativeDataIter parser(data_handle, callback); - data::FileAdapter adapter(&parser); - *out = new std::shared_ptr(DMatrix::Create( - &adapter, std::numeric_limits::quiet_NaN(), 1, scache)); + xgboost::data::IteratorAdapter adapter(data_handle, callback); + *out = new std::shared_ptr { + DMatrix::Create( + &adapter, std::numeric_limits::quiet_NaN(), + 1, scache + ) + }; API_END(); } diff --git a/src/common/group_data.h b/src/common/group_data.h index 22c1431905aa..0144d8099926 100644 --- a/src/common/group_data.h +++ b/src/common/group_data.h @@ -1,5 +1,5 @@ /*! - * Copyright 2014 by Contributors + * Copyright 2014-2020 by Contributors * \file group_data.h * \brief this file defines utils to group data by integer keys * Input: given input sequence (key,value), (k1,v1), (k2,v2) @@ -14,6 +14,7 @@ #ifndef XGBOOST_COMMON_GROUP_DATA_H_ #define XGBOOST_COMMON_GROUP_DATA_H_ +#include #include #include @@ -44,15 +45,6 @@ class ParallelGroupBuilder { size_t base_row_offset = 0) : rptr_(*p_rptr), data_(*p_data), - thread_rptr_(tmp_thread_rptr_), - base_row_offset_(base_row_offset) {} - ParallelGroupBuilder(std::vector *p_rptr, - std::vector *p_data, - std::vector > *p_thread_rptr, - size_t base_row_offset = 0) - : rptr_(*p_rptr), - data_(*p_data), - thread_rptr_(*p_thread_rptr), base_row_offset_(base_row_offset) {} /*! @@ -61,7 +53,7 @@ class ParallelGroupBuilder { * \param max_key number of keys in the matrix, can be smaller than expected * \param nthread number of thread that will be used in construction */ - inline void InitBudget(std::size_t max_key, int nthread) { + void InitBudget(std::size_t max_key, int nthread) { thread_rptr_.resize(nthread); for (std::size_t i = 0; i < thread_rptr_.size(); ++i) { thread_rptr_[i].resize(max_key - std::min(base_row_offset_, max_key)); @@ -74,7 +66,7 @@ class ParallelGroupBuilder { * \param threadid the id of thread that calls this function * \param nelem number of element budget add to this row */ - inline void AddBudget(std::size_t key, int threadid, SizeType nelem = 1) { + void AddBudget(std::size_t key, int threadid, SizeType nelem = 1) { std::vector &trptr = thread_rptr_[threadid]; size_t offset_key = key - base_row_offset_; if (trptr.size() < offset_key + 1) { @@ -129,9 +121,7 @@ class ParallelGroupBuilder { /*! \brief index of nonzero entries in each row */ std::vector &data_; /*! \brief thread local data structure */ - std::vector > &thread_rptr_; - /*! \brief local temp thread ptr, use this if not specified by the constructor */ - std::vector > tmp_thread_rptr_; + std::vector > thread_rptr_; /** \brief Used when rows being pushed into the builder are strictly above some number. */ size_t base_row_offset_; }; diff --git a/src/data/adapter.h b/src/data/adapter.h index 57a0d77909e0..ad40e390246f 100644 --- a/src/data/adapter.h +++ b/src/data/adapter.h @@ -1,18 +1,26 @@ /*! - * Copyright (c) 2019 by Contributors + * Copyright (c) 2019~2020 by Contributors * \file adapter.h */ #ifndef XGBOOST_DATA_ADAPTER_H_ #define XGBOOST_DATA_ADAPTER_H_ #include + +#include +#include #include #include #include #include #include +#include "xgboost/logging.h" #include "xgboost/base.h" #include "xgboost/data.h" +#include "xgboost/span.h" +#include "xgboost/c_api.h" + +#include "../c_api/c_api_error.h" namespace xgboost { namespace data { @@ -418,7 +426,7 @@ class FileAdapterBatch { public: class Line { public: - Line(size_t row_idx, const uint32_t* feature_idx, const float* value, + Line(size_t row_idx, const uint32_t *feature_idx, const float *value, size_t size) : row_idx_(row_idx), feature_idx_(feature_idx), @@ -485,6 +493,112 @@ class FileAdapter : dmlc::DataIter { dmlc::Parser* parser_; }; +/*! \brief Data iterator that takes callback to return data, used in JVM package for + * accepting data iterator. */ +class IteratorAdapter : public dmlc::DataIter { + public: + IteratorAdapter(DataIterHandle data_handle, + XGBCallbackDataIterNext* next_callback) + : columns_{data::kAdapterUnknownSize}, row_offset_{0}, + at_first_(true), + data_handle_(data_handle), next_callback_(next_callback) {} + + // override functions + void BeforeFirst() override { + CHECK(at_first_) << "Cannot reset IteratorAdapter"; + } + + bool Next() override { + if ((*next_callback_)( + data_handle_, + [](void *handle, XGBoostBatchCSR batch) -> int { + API_BEGIN(); + static_cast(handle)->SetData(batch); + API_END(); + }, + this) != 0) { + at_first_ = false; + return true; + } else { + return false; + } + } + + FileAdapterBatch const& Value() const override { + return *batch_.get(); + } + + // callback to set the data + void SetData(const XGBoostBatchCSR& batch) { + offset_.clear(); + label_.clear(); + weight_.clear(); + index_.clear(); + value_.clear(); + offset_.insert(offset_.end(), batch.offset, batch.offset + batch.size + 1); + + if (batch.label != nullptr) { + label_.insert(label_.end(), batch.label, batch.label + batch.size); + } + if (batch.weight != nullptr) { + weight_.insert(weight_.end(), batch.weight, batch.weight + batch.size); + } + if (batch.index != nullptr) { + index_.insert(index_.end(), batch.index + offset_[0], + batch.index + offset_.back()); + } + if (batch.value != nullptr) { + value_.insert(value_.end(), batch.value + offset_[0], + batch.value + offset_.back()); + } + if (offset_[0] != 0) { + size_t base = offset_[0]; + for (size_t &item : offset_) { + item -= base; + } + } + CHECK(columns_ == data::kAdapterUnknownSize || columns_ == batch.columns) + << "Number of columns between batches changed from " << columns_ + << " to " << batch.columns; + + columns_ = batch.columns; + block_.size = batch.size; + + block_.offset = dmlc::BeginPtr(offset_); + block_.label = dmlc::BeginPtr(label_); + block_.weight = dmlc::BeginPtr(weight_); + block_.qid = nullptr; + block_.field = nullptr; + block_.index = dmlc::BeginPtr(index_); + block_.value = dmlc::BeginPtr(value_); + + batch_.reset(new FileAdapterBatch(&block_, row_offset_)); + row_offset_ += offset_.size() - 1; + } + + size_t NumColumns() const { return columns_; } + size_t NumRows() const { return kAdapterUnknownSize; } + + private: + std::vector offset_; + std::vector label_; + std::vector weight_; + std::vector index_; + std::vector value_; + + size_t columns_; + size_t row_offset_; + // at the beinning. + bool at_first_; + // handle to the iterator, + DataIterHandle data_handle_; + // call back to get the data. + XGBCallbackDataIterNext *next_callback_; + // internal Rowblock + dmlc::RowBlock block_; + std::unique_ptr batch_; +}; + class DMatrixSliceAdapterBatch { public: // Fetch metainfo values according to sliced rows diff --git a/src/data/data.cc b/src/data/data.cc index 9fca3c3058e5..92b2840f4b8d 100644 --- a/src/data/data.cc +++ b/src/data/data.cc @@ -463,6 +463,9 @@ template DMatrix* DMatrix::Create( template DMatrix* DMatrix::Create( data::DMatrixSliceAdapter* adapter, float missing, int nthread, const std::string& cache_prefix, size_t page_size); +template DMatrix* DMatrix::Create( + data::IteratorAdapter* adapter, float missing, int nthread, + const std::string& cache_prefix, size_t page_size); SparsePage SparsePage::GetTranspose(int num_columns) const { SparsePage transpose; @@ -544,15 +547,15 @@ uint64_t SparsePage::Push(const AdapterBatchT& batch, float missing, int nthread int tid = omp_get_thread_num(); auto line = batch.GetLine(i); for (auto j = 0ull; j < line.Size(); j++) { - auto element = line.GetElement(j); + data::COOTuple element = line.GetElement(j); max_columns = std::max(max_columns, static_cast(element.column_idx + 1)); if (!common::CheckNAN(element.value) && element.value != missing) { - size_t key = element.row_idx - - base_rowid; // Adapter row index is absolute, here we want - // it relative to current page + size_t key = element.row_idx - base_rowid; + // Adapter row index is absolute, here we want it relative to + // current page CHECK_GE(key, builder_base_row_offset); - builder.AddBudget(element.row_idx - base_rowid, tid); + builder.AddBudget(key, tid); } } } diff --git a/src/data/simple_dmatrix.cc b/src/data/simple_dmatrix.cc index 86974e09cf83..833949eabb5c 100644 --- a/src/data/simple_dmatrix.cc +++ b/src/data/simple_dmatrix.cc @@ -1,5 +1,5 @@ /*! - * Copyright 2014 by Contributors + * Copyright 2014~2020 by Contributors * \file simple_dmatrix.cc * \brief the input data structure for gradient boosting * \author Tianqi Chen @@ -8,7 +8,7 @@ #include #include "./simple_batch_iterator.h" #include "../common/random.h" -#include "../data/adapter.h" +#include "adapter.h" namespace xgboost { namespace data { @@ -175,5 +175,7 @@ template SimpleDMatrix::SimpleDMatrix(FileAdapter* adapter, float missing, int nthread); template SimpleDMatrix::SimpleDMatrix(DMatrixSliceAdapter* adapter, float missing, int nthread); +template SimpleDMatrix::SimpleDMatrix(IteratorAdapter* adapter, float missing, + int nthread); } // namespace data } // namespace xgboost diff --git a/src/objective/regression_obj.cu b/src/objective/regression_obj.cu index cd9b1e4ce594..90799d226dfb 100644 --- a/src/objective/regression_obj.cu +++ b/src/objective/regression_obj.cu @@ -58,8 +58,9 @@ class RegLossObj : public ObjFunction { LOG(WARNING) << "Label set is empty."; } CHECK_EQ(preds.Size(), info.labels_.Size()) - << "labels are not correctly provided" - << "preds.size=" << preds.Size() << ", label.size=" << info.labels_.Size(); + << " " << "labels are not correctly provided" + << "preds.size=" << preds.Size() << ", label.size=" << info.labels_.Size() << ", " + << "Loss: " << Loss::Name(); size_t const ndata = preds.Size(); out_gpair->Resize(ndata); auto device = tparam_->gpu_id; diff --git a/tests/cpp/data/test_adapter.cc b/tests/cpp/data/test_adapter.cc index 4c048ff97488..343c6e085132 100644 --- a/tests/cpp/data/test_adapter.cc +++ b/tests/cpp/data/test_adapter.cc @@ -1,18 +1,24 @@ // Copyright (c) 2019 by Contributors #include +#include +#include #include #include "../../../src/data/adapter.h" #include "../../../src/data/simple_dmatrix.h" #include "../../../src/common/timer.h" #include "../helpers.h" -using namespace xgboost; // NOLINT -TEST(adapter, CSRAdapter) { + +#include "xgboost/base.h" +#include "xgboost/c_api.h" + +namespace xgboost { +TEST(Adapter, CSRAdapter) { int n = 2; std::vector data = {1, 2, 3, 4, 5}; std::vector feature_idx = {0, 1, 0, 1, 1}; std::vector row_ptr = {0, 2, 4, 5}; data::CSRAdapter adapter(row_ptr.data(), feature_idx.data(), data.data(), - row_ptr.size() - 1, data.size(), n); + row_ptr.size() - 1, data.size(), n); adapter.Next(); auto & batch = adapter.Value(); auto line0 = batch.GetLine(0); @@ -28,7 +34,7 @@ TEST(adapter, CSRAdapter) { EXPECT_EQ(line2 .GetElement(0).column_idx, 1); } -TEST(adapter, CSCAdapterColsMoreThanRows) { +TEST(Adapter, CSCAdapterColsMoreThanRows) { std::vector data = {1, 2, 3, 4, 5, 6, 7, 8}; std::vector row_idx = {0, 1, 0, 1, 0, 1, 0, 1}; std::vector col_ptr = {0, 2, 4, 6, 8}; @@ -88,3 +94,67 @@ TEST(c_api, DMatrixSliceAdapterFromSimpleDMatrix) { delete pp_dmat; } + +// A mock for JVM data iterator. +class DataIterForTest { + std::vector data_ {1, 2, 3, 4, 5}; + std::vector().index)>::type> + feature_idx_ {0, 1, 0, 1, 1}; + std::vector().offset)>::type> + row_ptr_ {0, 2, 4, 5}; + size_t iter_ {0}; + + public: + size_t static constexpr kCols { 13 }; // Test for having some missing columns + + XGBoostBatchCSR Next() { + for (auto& v : data_) { + v += iter_; + } + XGBoostBatchCSR batch; + batch.columns = 2; + batch.offset = dmlc::BeginPtr(row_ptr_); + batch.index = dmlc::BeginPtr(feature_idx_); + batch.value = dmlc::BeginPtr(data_); + batch.size = 3; + + batch.label = nullptr; + batch.weight = nullptr; + + iter_++; + + return batch; + } + size_t Iter() const { return iter_; } +}; + +size_t constexpr DataIterForTest::kCols; + +int SetDataNextForTest(DataIterHandle data_handle, + XGBCallbackSetData *set_function, + DataHolderHandle set_function_handle) { + size_t constexpr kIters { 2 }; + auto iter = static_cast(data_handle); + if (iter->Iter() < kIters) { + auto batch = iter->Next(); + batch.columns = DataIterForTest::kCols; + set_function(set_function_handle, batch); + return 1; + } else { + return 0; // stoping condition + } +} + +TEST(Adapter, IteratorAdaper) { + DataIterForTest iter; + data::IteratorAdapter adapter{&iter, SetDataNextForTest}; + constexpr size_t kRows { 6 }; + + std::unique_ptr data { + DMatrix::Create(&adapter, std::numeric_limits::quiet_NaN(), 1) + }; + ASSERT_EQ(data->Info().num_col_, DataIterForTest::kCols); + ASSERT_EQ(data->Info().num_row_, kRows); +} + +} // namespace xgboost diff --git a/tests/cpp/data/test_simple_dmatrix.cc b/tests/cpp/data/test_simple_dmatrix.cc index b5295bd3a62b..b792cc9e3bfa 100644 --- a/tests/cpp/data/test_simple_dmatrix.cc +++ b/tests/cpp/data/test_simple_dmatrix.cc @@ -5,6 +5,7 @@ #include "../../../src/data/adapter.h" #include "../helpers.h" +#include "xgboost/base.h" using namespace xgboost; // NOLINT @@ -185,10 +186,8 @@ TEST(SimpleDMatrix, FromFile) { CreateBigTestData(filename, 3 * 5); std::unique_ptr> parser( dmlc::Parser::Create(filename.c_str(), 0, 1, "auto")); - data::FileAdapter adapter(parser.get()); - data::SimpleDMatrix dmat(&adapter, std::numeric_limits::quiet_NaN(), - 1); - for (auto &batch : dmat.GetBatches()) { + + auto verify_batch = [](SparsePage const &batch) { EXPECT_EQ(batch.Size(), 5); EXPECT_EQ(batch.offset.HostVector(), std::vector({0, 3, 6, 9, 12, 15})); @@ -205,6 +204,16 @@ TEST(SimpleDMatrix, FromFile) { EXPECT_EQ(batch[i][2].index, 4); } } + }; + + constexpr bst_feature_t kCols = 5; + data::FileAdapter adapter(parser.get()); + data::SimpleDMatrix dmat(&adapter, std::numeric_limits::quiet_NaN(), + 1); + ASSERT_EQ(dmat.Info().num_col_, kCols); + + for (auto &batch : dmat.GetBatches()) { + verify_batch(batch); } } diff --git a/tests/cpp/data/test_sparse_page_dmatrix.cc b/tests/cpp/data/test_sparse_page_dmatrix.cc index 356b2fba4dfa..341630016a15 100644 --- a/tests/cpp/data/test_sparse_page_dmatrix.cc +++ b/tests/cpp/data/test_sparse_page_dmatrix.cc @@ -263,8 +263,10 @@ TEST(SparsePageDMatrix, FromFile) { data::FileAdapter adapter(parser.get()); dmlc::TemporaryDirectory tempdir; const std::string tmp_file = tempdir.path + "/simple.libsvm"; + data::SparsePageDMatrix dmat( &adapter, std::numeric_limits::quiet_NaN(), -1, tmp_file, 1); + ASSERT_EQ(dmat.Info().num_col_, 5); for (auto &batch : dmat.GetBatches()) { std::vector expected_offset(batch.Size() + 1);