From a27cd7c59b39f0032366f0c7dcfa04eeab472efe Mon Sep 17 00:00:00 2001 From: Jiaming Yuan Date: Mon, 23 Sep 2024 21:53:29 +0800 Subject: [PATCH 1/7] [coll] Increase listener backlog. Debug GCP. log. don't clean. Disable log. Logs. More. Less. check grid size. work on test. finalise. Log early. Type. build. Log. Remove. Cleanup. --- include/xgboost/collective/socket.h | 2 +- src/collective/comm.cc | 3 +- src/common/device_helpers.cuh | 7 --- src/data/iterative_dmatrix.cc | 9 ++++ src/tree/gpu_hist/row_partitioner.cuh | 28 ++++++------ .../cpp/tree/gpu_hist/test_row_partitioner.cu | 45 ++++++++++++++++--- 6 files changed, 67 insertions(+), 27 deletions(-) diff --git a/include/xgboost/collective/socket.h b/include/xgboost/collective/socket.h index a025edddd409..0b2d3daae060 100644 --- a/include/xgboost/collective/socket.h +++ b/include/xgboost/collective/socket.h @@ -539,7 +539,7 @@ class TCPSocket { /** * @brief Listen to incoming requests. Should be called after bind. */ - [[nodiscard]] Result Listen(std::int32_t backlog = 16) { + [[nodiscard]] Result Listen(std::int32_t backlog = 256) { if (listen(handle_, backlog) != 0) { return system::FailWithCode("Failed to listen."); } diff --git a/src/collective/comm.cc b/src/collective/comm.cc index 32631442b88f..2c7d507c9f8a 100644 --- a/src/collective/comm.cc +++ b/src/collective/comm.cc @@ -197,7 +197,8 @@ std::string InitLog(std::string task_id, std::int32_t rank) { if (task_id.empty()) { return "Rank " + std::to_string(rank); } - return "Task " + task_id + " got rank " + std::to_string(rank); + return "Task " + task_id + " pid:" + std::to_string(getpid()) + " got rank " + + std::to_string(rank); } } // namespace diff --git a/src/common/device_helpers.cuh b/src/common/device_helpers.cuh index 1678c8786010..c774df32fada 100644 --- a/src/common/device_helpers.cuh +++ b/src/common/device_helpers.cuh @@ -203,13 +203,6 @@ __global__ void LaunchNKernel(size_t begin, size_t end, L lambda) { lambda(i); } } -template -__global__ void LaunchNKernel(int device_idx, size_t begin, size_t end, - L lambda) { - for (auto i : GridStrideRange(begin, end)) { - lambda(i, device_idx); - } -} /* \brief A wrapper around kernel launching syntax, used to guard against empty input. * diff --git a/src/data/iterative_dmatrix.cc b/src/data/iterative_dmatrix.cc index c9830fa093e8..33826fdab693 100644 --- a/src/data/iterative_dmatrix.cc +++ b/src/data/iterative_dmatrix.cc @@ -27,6 +27,10 @@ IterativeDMatrix::IterativeDMatrix(DataIterHandle iter_handle, DMatrixHandle pro XGDMatrixCallbackNext* next, float missing, int nthread, bst_bin_t max_bin) : proxy_{proxy}, reset_{reset}, next_{next} { + common::Monitor monitor; + monitor.Init("Iterator-Ctor"); + + monitor.Start(__func__); // fetch the first batch auto iter = DataIterProxy{iter_handle, reset_, next_}; @@ -42,9 +46,13 @@ IterativeDMatrix::IterativeDMatrix(DataIterHandle iter_handle, DMatrixHandle pro BatchParam p{max_bin, tree::TrainParam::DftSparseThreshold()}; if (ctx.IsCUDA()) { + monitor.Start("InitCUDA"); this->InitFromCUDA(&ctx, p, iter_handle, missing, ref); + monitor.Stop("InitCUDA"); } else { + monitor.Start("InitCPU"); this->InitFromCPU(&ctx, p, iter_handle, missing, ref); + monitor.Stop("InitCPU"); } this->fmat_ctx_ = ctx; @@ -52,6 +60,7 @@ IterativeDMatrix::IterativeDMatrix(DataIterHandle iter_handle, DMatrixHandle pro LOG(INFO) << "Finished constructing the `IterativeDMatrix`: (" << this->Info().num_row_ << ", " << this->Info().num_col_ << ", " << this->Info().num_nonzero_ << ")."; + monitor.Stop(__func__); } void IterativeDMatrix::InitFromCPU(Context const* ctx, BatchParam const& p, diff --git a/src/tree/gpu_hist/row_partitioner.cuh b/src/tree/gpu_hist/row_partitioner.cuh index 8eb5fb7f7de5..a23a443ed92e 100644 --- a/src/tree/gpu_hist/row_partitioner.cuh +++ b/src/tree/gpu_hist/row_partitioner.cuh @@ -177,11 +177,10 @@ void SortPositionBatch(Context const* ctx, common::Span - <<CUDACtx()->Stream()>>>(batch_info_itr, ridx, ridx_tmp, - total_rows); + std::uint32_t const kGridSize = + xgboost::common::DivRoundUp(total_rows, kBlockSize * kItemsThread); + dh::LaunchKernel{kGridSize, kBlockSize, 0, ctx->CUDACtx()->Stream()}( + SortPositionCopyKernel, batch_info_itr, ridx, ridx_tmp, total_rows); } struct NodePositionInfo { @@ -211,13 +210,14 @@ XGBOOST_DEV_INLINE int GetPositionFromSegments(std::size_t idx, return position; } -template +template __global__ __launch_bounds__(kBlockSize) void FinalisePositionKernel( - const common::Span d_node_info, bst_idx_t base_ridx, - const common::Span d_ridx, common::Span d_out_position, OpT op) { + common::Span d_node_info, bst_idx_t base_ridx, + common::Span d_ridx, common::Span d_out_position, + OpT op) { for (auto idx : dh::GridStrideRange(0, d_ridx.size())) { auto position = GetPositionFromSegments(idx, d_node_info.data()); - RowIndexT ridx = d_ridx[idx] - base_ridx; + cuda_impl::RowIndexT ridx = d_ridx[idx] - base_ridx; bst_node_t new_position = op(ridx, position); d_out_position[ridx] = new_position; } @@ -377,12 +377,14 @@ class RowPartitioner { sizeof(NodePositionInfo) * ridx_segments_.size(), cudaMemcpyDefault, ctx->CUDACtx()->Stream())); - constexpr int kBlockSize = 512; + constexpr std::uint32_t kBlockSize = 512; const int kItemsThread = 8; - const int grid_size = xgboost::common::DivRoundUp(ridx_.size(), kBlockSize * kItemsThread); + const std::uint32_t grid_size = + xgboost::common::DivRoundUp(ridx_.size(), kBlockSize * kItemsThread); common::Span d_ridx{ridx_.data(), ridx_.size()}; - FinalisePositionKernel<<CUDACtx()->Stream()>>>( - dh::ToSpan(d_node_info_storage), base_ridx, d_ridx, d_out_position, op); + dh::LaunchKernel{grid_size, kBlockSize, 0, ctx->CUDACtx()->Stream()}( + FinalisePositionKernel, dh::ToSpan(d_node_info_storage), + base_ridx, d_ridx, d_out_position, op); } }; }; // namespace xgboost::tree diff --git a/tests/cpp/tree/gpu_hist/test_row_partitioner.cu b/tests/cpp/tree/gpu_hist/test_row_partitioner.cu index 76d3c7d07692..ce677be7d0a2 100644 --- a/tests/cpp/tree/gpu_hist/test_row_partitioner.cu +++ b/tests/cpp/tree/gpu_hist/test_row_partitioner.cu @@ -16,8 +16,9 @@ #include "../../../../src/data/ellpack_page.cuh" #include "../../../../src/tree/gpu_hist/expand_entry.cuh" // for GPUExpandEntry #include "../../../../src/tree/gpu_hist/row_partitioner.cuh" -#include "../../../../src/tree/param.h" // for TrainParam -#include "../../helpers.h" // for RandomDataGenerator +#include "../../../../src/tree/param.h" // for TrainParam +#include "../../collective/test_worker.h" // for TestDistributedGlobal +#include "../../helpers.h" // for RandomDataGenerator namespace xgboost::tree { void TestUpdatePositionBatch() { @@ -61,7 +62,9 @@ void TestSortPositionBatch(const std::vector& ridx_in, const std::vector ridx_tmp(ridx_in.size()); thrust::device_vector counts(segments.size()); - auto op = [=] __device__(auto ridx, int split_index, int data) { return ridx % 2 == 0; }; + auto op = [=] __device__(auto ridx, int split_index, int data) { + return ridx % 2 == 0; + }; std::vector op_data(segments.size()); std::vector> h_batch_info(segments.size()); dh::TemporaryArray> d_batch_info(segments.size()); @@ -79,7 +82,9 @@ void TestSortPositionBatch(const std::vector& ridx_in, const std::vector& ridx_in, const std::vector splits(1); + partitioner.UpdatePositionBatch( + &ctx, {0}, {1}, {2}, splits, + [] XGBOOST_DEVICE(bst_idx_t ridx, std::int32_t /*nidx_in_batch*/, RegTree::Node) { + return ridx < 3; + }); + ASSERT_EQ(partitioner.GetNumNodes(), 3); + if (collective::GetRank() == 0) { + for (std::size_t i = 0; i < 3; ++i) { + ASSERT_TRUE(partitioner.GetRows(i).empty()); + } + } + ctx.CUDACtx()->Stream().Sync(); + }); +} +} // anonymous namespace + +TEST(RowPartitioner, MGPUEmpty) { + std::int32_t n_workers = curt::AllVisibleGPUs(); + TestEmptyNode(n_workers); +} } // namespace xgboost::tree From b516ad00249cf534255b8ee5bb61840f2b666c40 Mon Sep 17 00:00:00 2001 From: Jiaming Yuan Date: Fri, 11 Oct 2024 17:02:19 +0800 Subject: [PATCH 2/7] Cleanup. --- src/collective/comm.cc | 3 +-- src/data/iterative_dmatrix.cc | 9 --------- tests/cpp/tree/gpu_hist/test_row_partitioner.cu | 2 +- 3 files changed, 2 insertions(+), 12 deletions(-) diff --git a/src/collective/comm.cc b/src/collective/comm.cc index 2c7d507c9f8a..32631442b88f 100644 --- a/src/collective/comm.cc +++ b/src/collective/comm.cc @@ -197,8 +197,7 @@ std::string InitLog(std::string task_id, std::int32_t rank) { if (task_id.empty()) { return "Rank " + std::to_string(rank); } - return "Task " + task_id + " pid:" + std::to_string(getpid()) + " got rank " + - std::to_string(rank); + return "Task " + task_id + " got rank " + std::to_string(rank); } } // namespace diff --git a/src/data/iterative_dmatrix.cc b/src/data/iterative_dmatrix.cc index 33826fdab693..c9830fa093e8 100644 --- a/src/data/iterative_dmatrix.cc +++ b/src/data/iterative_dmatrix.cc @@ -27,10 +27,6 @@ IterativeDMatrix::IterativeDMatrix(DataIterHandle iter_handle, DMatrixHandle pro XGDMatrixCallbackNext* next, float missing, int nthread, bst_bin_t max_bin) : proxy_{proxy}, reset_{reset}, next_{next} { - common::Monitor monitor; - monitor.Init("Iterator-Ctor"); - - monitor.Start(__func__); // fetch the first batch auto iter = DataIterProxy{iter_handle, reset_, next_}; @@ -46,13 +42,9 @@ IterativeDMatrix::IterativeDMatrix(DataIterHandle iter_handle, DMatrixHandle pro BatchParam p{max_bin, tree::TrainParam::DftSparseThreshold()}; if (ctx.IsCUDA()) { - monitor.Start("InitCUDA"); this->InitFromCUDA(&ctx, p, iter_handle, missing, ref); - monitor.Stop("InitCUDA"); } else { - monitor.Start("InitCPU"); this->InitFromCPU(&ctx, p, iter_handle, missing, ref); - monitor.Stop("InitCPU"); } this->fmat_ctx_ = ctx; @@ -60,7 +52,6 @@ IterativeDMatrix::IterativeDMatrix(DataIterHandle iter_handle, DMatrixHandle pro LOG(INFO) << "Finished constructing the `IterativeDMatrix`: (" << this->Info().num_row_ << ", " << this->Info().num_col_ << ", " << this->Info().num_nonzero_ << ")."; - monitor.Stop(__func__); } void IterativeDMatrix::InitFromCPU(Context const* ctx, BatchParam const& p, diff --git a/tests/cpp/tree/gpu_hist/test_row_partitioner.cu b/tests/cpp/tree/gpu_hist/test_row_partitioner.cu index ce677be7d0a2..04d26cda8e26 100644 --- a/tests/cpp/tree/gpu_hist/test_row_partitioner.cu +++ b/tests/cpp/tree/gpu_hist/test_row_partitioner.cu @@ -191,7 +191,7 @@ void TestEmptyNode(std::int32_t n_workers) { RowPartitioner partitioner; bst_idx_t n_samples = (collective::GetRank() == 0) ? 0 : 1024; bst_idx_t base_rowid = 0; - partitioner.Reset(&ctx, /*n_samples=*/0, base_rowid); + partitioner.Reset(&ctx, n_samples, base_rowid); std::vector splits(1); partitioner.UpdatePositionBatch( &ctx, {0}, {1}, {2}, splits, From d673a5121cfdd6cae9b53b8058819acd6e78e3dc Mon Sep 17 00:00:00 2001 From: Jiaming Yuan Date: Fri, 11 Oct 2024 17:06:56 +0800 Subject: [PATCH 3/7] add backlog to tracker. --- src/collective/tracker.cc | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/collective/tracker.cc b/src/collective/tracker.cc index b1081fe8e789..0b0431e4b860 100644 --- a/src/collective/tracker.cc +++ b/src/collective/tracker.cc @@ -123,7 +123,8 @@ RabitTracker::RabitTracker(Json const& config) : Tracker{config} { listener_ = TCPSocket::Create(addr.IsV4() ? SockDomain::kV4 : SockDomain::kV6); return listener_.Bind(host_, &this->port_); } << [&] { - return listener_.Listen(); + CHECK_GT(this->n_workers_, 0); + return listener_.Listen(this->n_workers_); }; SafeColl(rc); } From 6edd4f12905cf384237ce46c79829ba2c2e84cbd Mon Sep 17 00:00:00 2001 From: Jiaming Yuan Date: Fri, 11 Oct 2024 17:07:42 +0800 Subject: [PATCH 4/7] minimum. --- include/xgboost/collective/socket.h | 1 + 1 file changed, 1 insertion(+) diff --git a/include/xgboost/collective/socket.h b/include/xgboost/collective/socket.h index 0b2d3daae060..88dead8f212f 100644 --- a/include/xgboost/collective/socket.h +++ b/include/xgboost/collective/socket.h @@ -540,6 +540,7 @@ class TCPSocket { * @brief Listen to incoming requests. Should be called after bind. */ [[nodiscard]] Result Listen(std::int32_t backlog = 256) { + backlog = std::max(backlog, 256); if (listen(handle_, backlog) != 0) { return system::FailWithCode("Failed to listen."); } From f067799741327a69f472f99e1aef9e56fb49fee9 Mon Sep 17 00:00:00 2001 From: Jiaming Yuan Date: Mon, 14 Oct 2024 00:24:43 +0800 Subject: [PATCH 5/7] Move impl. --- include/xgboost/collective/socket.h | 8 +------- src/collective/socket.cc | 8 ++++++++ 2 files changed, 9 insertions(+), 7 deletions(-) diff --git a/include/xgboost/collective/socket.h b/include/xgboost/collective/socket.h index 88dead8f212f..7d32e13286df 100644 --- a/include/xgboost/collective/socket.h +++ b/include/xgboost/collective/socket.h @@ -539,13 +539,7 @@ class TCPSocket { /** * @brief Listen to incoming requests. Should be called after bind. */ - [[nodiscard]] Result Listen(std::int32_t backlog = 256) { - backlog = std::max(backlog, 256); - if (listen(handle_, backlog) != 0) { - return system::FailWithCode("Failed to listen."); - } - return Success(); - } + [[nodiscard]] Result Listen(std::int32_t backlog = 256); /** * @brief Bind socket to INADDR_ANY, return the port selected by the OS. */ diff --git a/src/collective/socket.cc b/src/collective/socket.cc index aedddbcfb670..e0431fa5cc70 100644 --- a/src/collective/socket.cc +++ b/src/collective/socket.cc @@ -56,6 +56,14 @@ SockAddrV4 SockAddrV4::InaddrAny() { return MakeSockAddress("0.0.0.0", 0).V4(); SockAddrV6 SockAddrV6::Loopback() { return MakeSockAddress("::1", 0).V6(); } SockAddrV6 SockAddrV6::InaddrAny() { return MakeSockAddress("::", 0).V6(); } +[[nodiscard]] Result TCPSocket::Listen(std::int32_t backlog) { + backlog = std::max(backlog, 256); + if (listen(this->handle_, backlog) != 0) { + return system::FailWithCode("Failed to listen."); + } + return Success(); +} + std::size_t TCPSocket::Send(StringView str) { CHECK(!this->IsClosed()); CHECK_LT(str.size(), std::numeric_limits::max()); From 195483021fc7e3bb4453592b20a72948ceb21e0a Mon Sep 17 00:00:00 2001 From: Jiaming Yuan Date: Mon, 14 Oct 2024 00:25:25 +0800 Subject: [PATCH 6/7] lint. --- src/collective/socket.cc | 1 + 1 file changed, 1 insertion(+) diff --git a/src/collective/socket.cc b/src/collective/socket.cc index e0431fa5cc70..e37648c8edd3 100644 --- a/src/collective/socket.cc +++ b/src/collective/socket.cc @@ -3,6 +3,7 @@ */ #include "xgboost/collective/socket.h" +#include // for max #include // for array #include // for size_t #include // for int32_t From 9e4c76f2db9375924c72c3f9261174f1cd9cb403 Mon Sep 17 00:00:00 2001 From: Jiaming Yuan Date: Mon, 14 Oct 2024 22:40:20 +0800 Subject: [PATCH 7/7] Doc. --- include/xgboost/collective/socket.h | 2 ++ 1 file changed, 2 insertions(+) diff --git a/include/xgboost/collective/socket.h b/include/xgboost/collective/socket.h index 7d32e13286df..4bc285a515c5 100644 --- a/include/xgboost/collective/socket.h +++ b/include/xgboost/collective/socket.h @@ -538,6 +538,8 @@ class TCPSocket { [[nodiscard]] HandleT const &Handle() const { return handle_; } /** * @brief Listen to incoming requests. Should be called after bind. + * + * Both the default and minimum backlog is set to 256. */ [[nodiscard]] Result Listen(std::int32_t backlog = 256); /**