diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index 20e91a5d93f6..133e151e5e4f 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -174,7 +174,7 @@ jobs: - uses: actions/checkout@e2f20e631ae6d7dd3b768f56a5d2af784dd54791 # v2.5.0 with: submodules: 'true' - - uses: actions/setup-python@7f80679172b057fc5e90d70d197929d454754a5a # v4.3.0 + - uses: actions/setup-python@0a5c61591373683505ea898e09a3ea4f39ef2b9c # v5.0.0 with: python-version: "3.8" architecture: 'x64' diff --git a/.github/workflows/python_tests.yml b/.github/workflows/python_tests.yml index 0fca76673962..3fbcc7a01acf 100644 --- a/.github/workflows/python_tests.yml +++ b/.github/workflows/python_tests.yml @@ -310,7 +310,7 @@ jobs: submodules: 'true' - name: Set up Python 3.8 - uses: actions/setup-python@v4 + uses: actions/setup-python@0a5c61591373683505ea898e09a3ea4f39ef2b9c # v5.0.0 with: python-version: 3.8 diff --git a/.github/workflows/python_wheels.yml b/.github/workflows/python_wheels.yml index f46b772950c9..12ae8a244e4f 100644 --- a/.github/workflows/python_wheels.yml +++ b/.github/workflows/python_wheels.yml @@ -21,7 +21,7 @@ jobs: with: submodules: 'true' - name: Setup Python - uses: actions/setup-python@7f80679172b057fc5e90d70d197929d454754a5a # v4.3.0 + uses: actions/setup-python@0a5c61591373683505ea898e09a3ea4f39ef2b9c # v5.0.0 with: python-version: "3.8" - name: Build wheels diff --git a/.github/workflows/r_tests.yml b/.github/workflows/r_tests.yml index d004ab15ca15..ad6853281c1f 100644 --- a/.github/workflows/r_tests.yml +++ b/.github/workflows/r_tests.yml @@ -74,7 +74,7 @@ jobs: key: ${{ runner.os }}-r-${{ matrix.config.r }}-6-${{ hashFiles('R-package/DESCRIPTION') }} restore-keys: ${{ runner.os }}-r-${{ matrix.config.r }}-6-${{ hashFiles('R-package/DESCRIPTION') }} - - uses: actions/setup-python@7f80679172b057fc5e90d70d197929d454754a5a # v4.3.0 + - uses: actions/setup-python@0a5c61591373683505ea898e09a3ea4f39ef2b9c # v5.0.0 with: python-version: "3.8" architecture: 'x64' diff --git a/R-package/src/Makevars.in b/R-package/src/Makevars.in index dd13983f5b59..0f4b3ac6f6a7 100644 --- a/R-package/src/Makevars.in +++ b/R-package/src/Makevars.in @@ -104,6 +104,7 @@ OBJECTS= \ $(PKGROOT)/src/collective/broadcast.o \ $(PKGROOT)/src/collective/comm.o \ $(PKGROOT)/src/collective/coll.o \ + $(PKGROOT)/src/collective/communicator-inl.o \ $(PKGROOT)/src/collective/tracker.o \ $(PKGROOT)/src/collective/communicator.o \ $(PKGROOT)/src/collective/in_memory_communicator.o \ diff --git a/R-package/src/Makevars.win b/R-package/src/Makevars.win index 46a862711dc6..0c2084de940c 100644 --- a/R-package/src/Makevars.win +++ b/R-package/src/Makevars.win @@ -104,6 +104,7 @@ OBJECTS= \ $(PKGROOT)/src/collective/broadcast.o \ $(PKGROOT)/src/collective/comm.o \ $(PKGROOT)/src/collective/coll.o \ + $(PKGROOT)/src/collective/communicator-inl.o \ $(PKGROOT)/src/collective/tracker.o \ $(PKGROOT)/src/collective/communicator.o \ $(PKGROOT)/src/collective/in_memory_communicator.o \ diff --git a/src/collective/communicator-inl.cc b/src/collective/communicator-inl.cc new file mode 100644 index 000000000000..4164855f1cef --- /dev/null +++ b/src/collective/communicator-inl.cc @@ -0,0 +1,34 @@ +/** + * Copyright 2024, XGBoost contributors + */ +#include "communicator-inl.h" + +namespace xgboost::collective { +[[nodiscard]] std::vector> VectorAllgatherV( + std::vector> const &input) { + auto n_inputs = input.size(); + std::vector sizes(n_inputs); + std::transform(input.cbegin(), input.cend(), sizes.begin(), + [](auto const &vec) { return vec.size(); }); + + std::vector global_sizes = AllgatherV(sizes); + std::vector offset(global_sizes.size() + 1); + offset[0] = 0; + for (std::size_t i = 1; i < offset.size(); i++) { + offset[i] = offset[i - 1] + global_sizes[i - 1]; + } + + std::vector collected; + for (auto const &vec : input) { + collected.insert(collected.end(), vec.cbegin(), vec.cend()); + } + auto out = AllgatherV(collected); + + std::vector> result; + for (std::size_t i = 1; i < offset.size(); ++i) { + std::vector local(out.cbegin() + offset[i - 1], out.cbegin() + offset[i]); + result.emplace_back(std::move(local)); + } + return result; +} +} // namespace xgboost::collective diff --git a/src/collective/communicator-inl.h b/src/collective/communicator-inl.h index 34212def2377..991e19f2c65a 100644 --- a/src/collective/communicator-inl.h +++ b/src/collective/communicator-inl.h @@ -1,5 +1,5 @@ /** - * Copyright 2022-2023 by XGBoost contributors + * Copyright 2022-2024, XGBoost contributors */ #pragma once #include @@ -192,6 +192,18 @@ inline std::vector AllgatherV(std::vector const &input) { return result; } +/** + * @brief Gathers variable-length data from all processes and distributes it to all processes. + * + * @param inputs All the inputs from the local worker. The number of inputs can vary + * across different workers. Along with which, the size of each vector in + * the input can also vary. + * + * @return The AllgatherV result, containing vectors from all workers. + */ +[[nodiscard]] std::vector> VectorAllgatherV( + std::vector> const &input); + /** * @brief Gathers variable-length strings from all processes and distributes them to all processes. * @param input Variable-length list of variable-length strings. @@ -294,38 +306,5 @@ template inline void Allreduce(double *send_receive_buffer, size_t count) { Communicator::Get()->AllReduce(send_receive_buffer, count, DataType::kDouble, op); } - -template -struct SpecialAllgatherVResult { - std::vector offsets; - std::vector sizes; - std::vector result; -}; - -/** - * @brief Gathers variable-length data from all processes and distributes it to all processes. - * - * We assume each worker has the same number of inputs, but each input may be of a different size. - * - * @param inputs All the inputs from the local worker. - * @param sizes Sizes of each input. - */ -template -inline SpecialAllgatherVResult SpecialAllgatherV(std::vector const &inputs, - std::vector const &sizes) { - // Gather the sizes across all workers. - auto const all_sizes = Allgather(sizes); - - // Calculate input offsets (std::exclusive_scan). - std::vector offsets(all_sizes.size()); - for (std::size_t i = 1; i < offsets.size(); i++) { - offsets[i] = offsets[i - 1] + all_sizes[i - 1]; - } - - // Gather all the inputs. - auto const all_inputs = AllgatherV(inputs); - - return {offsets, all_sizes, all_inputs}; -} } // namespace collective } // namespace xgboost diff --git a/src/tree/hist/evaluate_splits.h b/src/tree/hist/evaluate_splits.h index bc534d351f17..d25a41cb0b3c 100644 --- a/src/tree/hist/evaluate_splits.h +++ b/src/tree/hist/evaluate_splits.h @@ -1,5 +1,5 @@ /** - * Copyright 2021-2023 by XGBoost Contributors + * Copyright 2021-2024, XGBoost Contributors */ #ifndef XGBOOST_TREE_HIST_EVALUATE_SPLITS_H_ #define XGBOOST_TREE_HIST_EVALUATE_SPLITS_H_ @@ -26,6 +26,47 @@ #include "xgboost/linalg.h" // for Constants, Vector namespace xgboost::tree { +/** + * @brief Gather the expand entries from all the workers. + * @param entries Local expand entries on this worker. + * @return Global expand entries gathered from all workers. + */ +template +std::enable_if_t || + std::is_same_v, + std::vector> +AllgatherColumnSplit(std::vector const &entries) { + auto const n_entries = entries.size(); + + // First, gather all the primitive fields. + std::vector local_entries(n_entries); + + // Collect and serialize all entries + std::vector> serialized_entries; + for (std::size_t i = 0; i < n_entries; ++i) { + Json jentry{Object{}}; + entries[i].Save(&jentry); + + std::vector out; + Json::Dump(jentry, &out, std::ios::binary); + + serialized_entries.emplace_back(std::move(out)); + } + auto all_serialized = collective::VectorAllgatherV(serialized_entries); + CHECK_GE(all_serialized.size(), local_entries.size()); + + std::vector all_entries(all_serialized.size()); + std::transform(all_serialized.cbegin(), all_serialized.cend(), all_entries.begin(), + [](std::vector const &e) { + ExpandEntry entry; + auto je = Json::Load(StringView{e.data(), e.size()}, std::ios::binary); + entry.Load(je); + return entry; + }); + + return all_entries; +} + class HistEvaluator { private: struct NodeEntry { @@ -36,8 +77,8 @@ class HistEvaluator { }; private: - Context const* ctx_; - TrainParam const* param_; + Context const *ctx_; + TrainParam const *param_; std::shared_ptr column_sampler_; TreeEvaluator tree_evaluator_; bool is_col_split_{false}; @@ -202,7 +243,7 @@ class HistEvaluator { common::CatBitField cat_bits{best.cat_bits}; bst_bin_t partition = d_step == 1 ? (best_thresh - it_begin + 1) : (best_thresh - f_begin); CHECK_GT(partition, 0); - std::for_each(sorted_idx.begin(), sorted_idx.begin() + partition, [&](size_t c) { + std::for_each(sorted_idx.begin(), sorted_idx.begin() + partition, [&](std::size_t c) { auto cat = cut_val[c + f_begin]; cat_bits.Set(cat); }); @@ -285,57 +326,23 @@ class HistEvaluator { return left_sum; } - /** - * @brief Gather the expand entries from all the workers. - * @param entries Local expand entries on this worker. - * @return Global expand entries gathered from all workers. - */ - std::vector Allgather(std::vector const &entries) { - auto const world = collective::GetWorldSize(); - auto const num_entries = entries.size(); - - // First, gather all the primitive fields. - std::vector local_entries(num_entries); - std::vector cat_bits; - std::vector cat_bits_sizes; - for (std::size_t i = 0; i < num_entries; i++) { - local_entries[i].CopyAndCollect(entries[i], &cat_bits, &cat_bits_sizes); - } - auto all_entries = collective::Allgather(local_entries); - - // Gather all the cat_bits. - auto gathered = collective::SpecialAllgatherV(cat_bits, cat_bits_sizes); - - common::ParallelFor(num_entries * world, ctx_->Threads(), [&] (auto i) { - // Copy the cat_bits back into all expand entries. - all_entries[i].split.cat_bits.resize(gathered.sizes[i]); - std::copy_n(gathered.result.cbegin() + gathered.offsets[i], gathered.sizes[i], - all_entries[i].split.cat_bits.begin()); - }); - - return all_entries; - } - public: void EvaluateSplits(const BoundedHistCollection &hist, common::HistogramCuts const &cut, common::Span feature_types, const RegTree &tree, std::vector *p_entries) { auto n_threads = ctx_->Threads(); - auto& entries = *p_entries; + auto &entries = *p_entries; // All nodes are on the same level, so we can store the shared ptr. - std::vector>> features( - entries.size()); + std::vector>> features(entries.size()); for (size_t nidx_in_set = 0; nidx_in_set < entries.size(); ++nidx_in_set) { auto nidx = entries[nidx_in_set].nid; - features[nidx_in_set] = - column_sampler_->GetFeatureSet(tree.GetDepth(nidx)); + features[nidx_in_set] = column_sampler_->GetFeatureSet(tree.GetDepth(nidx)); } CHECK(!features.empty()); - const size_t grain_size = - std::max(1, features.front()->Size() / n_threads); - common::BlockedSpace2d space(entries.size(), [&](size_t nidx_in_set) { - return features[nidx_in_set]->Size(); - }, grain_size); + const size_t grain_size = std::max(1, features.front()->Size() / n_threads); + common::BlockedSpace2d space( + entries.size(), [&](size_t nidx_in_set) { return features[nidx_in_set]->Size(); }, + grain_size); std::vector tloc_candidates(n_threads * entries.size()); for (size_t i = 0; i < entries.size(); ++i) { @@ -344,7 +351,7 @@ class HistEvaluator { } } auto evaluator = tree_evaluator_.GetEvaluator(); - auto const& cut_ptrs = cut.Ptrs(); + auto const &cut_ptrs = cut.Ptrs(); common::ParallelFor2d(space, n_threads, [&](size_t nidx_in_set, common::Range1d r) { auto tidx = omp_get_thread_num(); @@ -385,18 +392,16 @@ class HistEvaluator { } }); - for (unsigned nidx_in_set = 0; nidx_in_set < entries.size(); - ++nidx_in_set) { + for (unsigned nidx_in_set = 0; nidx_in_set < entries.size(); ++nidx_in_set) { for (auto tidx = 0; tidx < n_threads; ++tidx) { - entries[nidx_in_set].split.Update( - tloc_candidates[n_threads * nidx_in_set + tidx].split); + entries[nidx_in_set].split.Update(tloc_candidates[n_threads * nidx_in_set + tidx].split); } } if (is_col_split_) { // With column-wise data split, we gather the best splits from all the workers and update the // expand entries accordingly. - auto all_entries = Allgather(entries); + auto all_entries = AllgatherColumnSplit(entries); for (auto worker = 0; worker < collective::GetWorldSize(); ++worker) { for (std::size_t nidx_in_set = 0; nidx_in_set < entries.size(); ++nidx_in_set) { entries[nidx_in_set].split.Update( @@ -407,7 +412,7 @@ class HistEvaluator { } // Add splits to tree, handles all statistic - void ApplyTreeSplit(CPUExpandEntry const& candidate, RegTree *p_tree) { + void ApplyTreeSplit(CPUExpandEntry const &candidate, RegTree *p_tree) { auto evaluator = tree_evaluator_.GetEvaluator(); RegTree &tree = *p_tree; @@ -437,8 +442,7 @@ class HistEvaluator { auto left_child = tree[candidate.nid].LeftChild(); auto right_child = tree[candidate.nid].RightChild(); tree_evaluator_.AddSplit(candidate.nid, left_child, right_child, - tree[candidate.nid].SplitIndex(), left_weight, - right_weight); + tree[candidate.nid].SplitIndex(), left_weight, right_weight); evaluator = tree_evaluator_.GetEvaluator(); snode_.resize(tree.GetNodes().size()); @@ -449,8 +453,7 @@ class HistEvaluator { snode_.at(right_child).root_gain = evaluator.CalcGain(candidate.nid, *param_, GradStats{candidate.split.right_sum}); - interaction_constraints_.Split(candidate.nid, - tree[candidate.nid].SplitIndex(), left_child, + interaction_constraints_.Split(candidate.nid, tree[candidate.nid].SplitIndex(), left_child, right_child); } @@ -571,53 +574,6 @@ class HistMultiEvaluator { return false; } - /** - * @brief Gather the expand entries from all the workers. - * @param entries Local expand entries on this worker. - * @return Global expand entries gathered from all workers. - */ - std::vector Allgather(std::vector const &entries) { - auto const world = collective::GetWorldSize(); - auto const num_entries = entries.size(); - - // First, gather all the primitive fields. - std::vector local_entries(num_entries); - std::vector cat_bits; - std::vector cat_bits_sizes; - std::vector gradients; - for (std::size_t i = 0; i < num_entries; i++) { - local_entries[i].CopyAndCollect(entries[i], &cat_bits, &cat_bits_sizes, &gradients); - } - auto all_entries = collective::Allgather(local_entries); - - // Gather all the cat_bits. - auto gathered_cat_bits = collective::SpecialAllgatherV(cat_bits, cat_bits_sizes); - - // Gather all the gradients. - auto const num_gradients = gradients.size(); - auto const all_gradients = collective::Allgather(gradients); - - auto const total_entries = num_entries * world; - auto const gradients_per_entry = num_gradients / num_entries; - auto const gradients_per_side = gradients_per_entry / 2; - common::ParallelFor(total_entries, ctx_->Threads(), [&] (auto i) { - // Copy the cat_bits back into all expand entries. - all_entries[i].split.cat_bits.resize(gathered_cat_bits.sizes[i]); - std::copy_n(gathered_cat_bits.result.cbegin() + gathered_cat_bits.offsets[i], - gathered_cat_bits.sizes[i], all_entries[i].split.cat_bits.begin()); - - // Copy the gradients back into all expand entries. - all_entries[i].split.left_sum.resize(gradients_per_side); - std::copy_n(all_gradients.cbegin() + i * gradients_per_entry, gradients_per_side, - all_entries[i].split.left_sum.begin()); - all_entries[i].split.right_sum.resize(gradients_per_side); - std::copy_n(all_gradients.cbegin() + i * gradients_per_entry + gradients_per_side, - gradients_per_side, all_entries[i].split.right_sum.begin()); - }); - - return all_entries; - } - public: void EvaluateSplits(RegTree const &tree, common::Span hist, common::HistogramCuts const &cut, std::vector *p_entries) { @@ -676,7 +632,7 @@ class HistMultiEvaluator { if (is_col_split_) { // With column-wise data split, we gather the best splits from all the workers and update the // expand entries accordingly. - auto all_entries = Allgather(entries); + auto all_entries = AllgatherColumnSplit(entries); for (auto worker = 0; worker < collective::GetWorldSize(); ++worker) { for (std::size_t nidx_in_set = 0; nidx_in_set < entries.size(); ++nidx_in_set) { entries[nidx_in_set].split.Update( diff --git a/src/tree/hist/expand_entry.h b/src/tree/hist/expand_entry.h index d6315877d1e0..fd16397e1193 100644 --- a/src/tree/hist/expand_entry.h +++ b/src/tree/hist/expand_entry.h @@ -90,7 +90,6 @@ struct ExpandEntryImpl { } self->split.is_cat = get(split["is_cat"]); - self->LoadGrad(split); } }; @@ -106,8 +105,8 @@ struct CPUExpandEntry : public ExpandEntryImpl { void SaveGrad(Json* p_out) const { auto& out = *p_out; auto save = [&](std::string const& name, GradStats const& sum) { - out[name] = F32Array{2}; - auto& array = get(out[name]); + out[name] = F64Array{2}; + auto& array = get(out[name]); array[0] = sum.GetGrad(); array[1] = sum.GetHess(); }; @@ -115,9 +114,9 @@ struct CPUExpandEntry : public ExpandEntryImpl { save("right_sum", this->split.right_sum); } void LoadGrad(Json const& in) { - auto const& left_sum = get(in["left_sum"]); + auto const& left_sum = get(in["left_sum"]); this->split.left_sum = GradStats{left_sum[0], left_sum[1]}; - auto const& right_sum = get(in["right_sum"]); + auto const& right_sum = get(in["right_sum"]); this->split.right_sum = GradStats{right_sum[0], right_sum[1]}; } @@ -173,8 +172,8 @@ struct MultiExpandEntry : public ExpandEntryImpl { void SaveGrad(Json* p_out) const { auto& out = *p_out; auto save = [&](std::string const& name, std::vector const& sum) { - out[name] = F32Array{sum.size() * 2}; - auto& array = get(out[name]); + out[name] = F64Array{sum.size() * 2}; + auto& array = get(out[name]); for (std::size_t i = 0, j = 0; i < sum.size(); i++, j += 2) { array[j] = sum[i].GetGrad(); array[j + 1] = sum[i].GetHess(); @@ -185,7 +184,7 @@ struct MultiExpandEntry : public ExpandEntryImpl { } void LoadGrad(Json const& in) { auto load = [&](std::string const& name, std::vector* p_sum) { - auto const& array = get(in[name]); + auto const& array = get(in[name]); auto& sum = *p_sum; sum.resize(array.size() / 2); for (std::size_t i = 0, j = 0; i < sum.size(); ++i, j += 2) { diff --git a/src/tree/updater_quantile_hist.cc b/src/tree/updater_quantile_hist.cc index c2aaedafac95..cd60e6602cf7 100644 --- a/src/tree/updater_quantile_hist.cc +++ b/src/tree/updater_quantile_hist.cc @@ -1,5 +1,5 @@ /** - * Copyright 2017-2023, XGBoost Contributors + * Copyright 2017-2024, XGBoost Contributors * \file updater_quantile_hist.cc * \brief use quantized feature values to construct a tree * \author Philip Cho, Tianqi Checn, Egor Smirnov @@ -149,9 +149,6 @@ class MultiTargetHistBuilder { } void InitData(DMatrix *p_fmat, RegTree const *p_tree) { - if (collective::IsDistributed()) { - LOG(FATAL) << "Distributed training for vector-leaf is not yet supported."; - } monitor_->Start(__func__); p_last_fmat_ = p_fmat; diff --git a/tests/cpp/collective/test_rabit_communicator.cc b/tests/cpp/collective/test_rabit_communicator.cc index ba22d8fdb84f..9711e1aede71 100644 --- a/tests/cpp/collective/test_rabit_communicator.cc +++ b/tests/cpp/collective/test_rabit_communicator.cc @@ -1,13 +1,12 @@ -/*! - * Copyright 2022 XGBoost contributors +/** + * Copyright 2022-2024, XGBoost contributors */ #include #include "../../../src/collective/rabit_communicator.h" +#include "../helpers.h" -namespace xgboost { -namespace collective { - +namespace xgboost::collective { TEST(RabitCommunicatorSimpleTest, ThrowOnWorldSizeTooSmall) { auto construct = []() { RabitCommunicator comm{0, 0}; }; EXPECT_THROW(construct(), dmlc::Error); @@ -35,5 +34,37 @@ TEST(RabitCommunicatorSimpleTest, IsNotDistributed) { EXPECT_FALSE(comm.IsDistributed()); } -} // namespace collective -} // namespace xgboost +namespace { +void VerifyVectorAllgatherV() { + auto n_workers = collective::GetWorldSize(); + ASSERT_EQ(n_workers, 3); + auto rank = collective::GetRank(); + // Construct input that has different length for each worker. + std::vector> inputs; + for (std::int32_t i = 0; i < rank + 1; ++i) { + std::vector in; + for (std::int32_t j = 0; j < rank + 1; ++j) { + in.push_back(static_cast(j)); + } + inputs.emplace_back(std::move(in)); + } + + auto outputs = VectorAllgatherV(inputs); + + ASSERT_EQ(outputs.size(), (1 + n_workers) * n_workers / 2); + auto const& res = outputs; + + for (std::int32_t i = 0; i < n_workers; ++i) { + std::int32_t k = 0; + for (auto v : res[i]) { + ASSERT_EQ(v, k++); + } + } +} +} // namespace + +TEST(VectorAllgatherV, Basic) { + std::int32_t n_workers{3}; + RunWithInMemoryCommunicator(n_workers, VerifyVectorAllgatherV); +} +} // namespace xgboost::collective diff --git a/tests/cpp/common/test_json.cc b/tests/cpp/common/test_json.cc index 72163efd78cc..3ee041a339ed 100644 --- a/tests/cpp/common/test_json.cc +++ b/tests/cpp/common/test_json.cc @@ -1,5 +1,5 @@ /** - * Copyright 2019-2023, XGBoost Contributors + * Copyright 2019-2024, XGBoost Contributors */ #include diff --git a/tests/cpp/tree/test_quantile_hist.cc b/tests/cpp/tree/test_quantile_hist.cc index cf806536a861..4021c9959440 100644 --- a/tests/cpp/tree/test_quantile_hist.cc +++ b/tests/cpp/tree/test_quantile_hist.cc @@ -1,5 +1,5 @@ /** - * Copyright 2018-2023 by XGBoost Contributors + * Copyright 2018-2024, XGBoost Contributors */ #include #include @@ -18,7 +18,6 @@ #include "xgboost/data.h" namespace xgboost::tree { - namespace { template void TestPartitioner(bst_target_t n_targets) { @@ -253,5 +252,5 @@ void TestColumnSplit(bst_target_t n_targets) { TEST(QuantileHist, ColumnSplit) { TestColumnSplit(1); } -TEST(QuantileHist, DISABLED_ColumnSplitMultiTarget) { TestColumnSplit(3); } +TEST(QuantileHist, ColumnSplitMultiTarget) { TestColumnSplit(3); } } // namespace xgboost::tree