diff --git a/include/xgboost/collective/socket.h b/include/xgboost/collective/socket.h index a025edddd409..4bc285a515c5 100644 --- a/include/xgboost/collective/socket.h +++ b/include/xgboost/collective/socket.h @@ -538,13 +538,10 @@ class TCPSocket { [[nodiscard]] HandleT const &Handle() const { return handle_; } /** * @brief Listen to incoming requests. Should be called after bind. + * + * Both the default and minimum backlog is set to 256. */ - [[nodiscard]] Result Listen(std::int32_t backlog = 16) { - if (listen(handle_, backlog) != 0) { - return system::FailWithCode("Failed to listen."); - } - return Success(); - } + [[nodiscard]] Result Listen(std::int32_t backlog = 256); /** * @brief Bind socket to INADDR_ANY, return the port selected by the OS. */ diff --git a/src/collective/socket.cc b/src/collective/socket.cc index aedddbcfb670..e37648c8edd3 100644 --- a/src/collective/socket.cc +++ b/src/collective/socket.cc @@ -3,6 +3,7 @@ */ #include "xgboost/collective/socket.h" +#include // for max #include // for array #include // for size_t #include // for int32_t @@ -56,6 +57,14 @@ SockAddrV4 SockAddrV4::InaddrAny() { return MakeSockAddress("0.0.0.0", 0).V4(); SockAddrV6 SockAddrV6::Loopback() { return MakeSockAddress("::1", 0).V6(); } SockAddrV6 SockAddrV6::InaddrAny() { return MakeSockAddress("::", 0).V6(); } +[[nodiscard]] Result TCPSocket::Listen(std::int32_t backlog) { + backlog = std::max(backlog, 256); + if (listen(this->handle_, backlog) != 0) { + return system::FailWithCode("Failed to listen."); + } + return Success(); +} + std::size_t TCPSocket::Send(StringView str) { CHECK(!this->IsClosed()); CHECK_LT(str.size(), std::numeric_limits::max()); diff --git a/src/collective/tracker.cc b/src/collective/tracker.cc index b1081fe8e789..0b0431e4b860 100644 --- a/src/collective/tracker.cc +++ b/src/collective/tracker.cc @@ -123,7 +123,8 @@ RabitTracker::RabitTracker(Json const& config) : Tracker{config} { listener_ = TCPSocket::Create(addr.IsV4() ? SockDomain::kV4 : SockDomain::kV6); return listener_.Bind(host_, &this->port_); } << [&] { - return listener_.Listen(); + CHECK_GT(this->n_workers_, 0); + return listener_.Listen(this->n_workers_); }; SafeColl(rc); } diff --git a/src/common/device_helpers.cuh b/src/common/device_helpers.cuh index 1678c8786010..c774df32fada 100644 --- a/src/common/device_helpers.cuh +++ b/src/common/device_helpers.cuh @@ -203,13 +203,6 @@ __global__ void LaunchNKernel(size_t begin, size_t end, L lambda) { lambda(i); } } -template -__global__ void LaunchNKernel(int device_idx, size_t begin, size_t end, - L lambda) { - for (auto i : GridStrideRange(begin, end)) { - lambda(i, device_idx); - } -} /* \brief A wrapper around kernel launching syntax, used to guard against empty input. * diff --git a/src/tree/gpu_hist/row_partitioner.cuh b/src/tree/gpu_hist/row_partitioner.cuh index 8eb5fb7f7de5..a23a443ed92e 100644 --- a/src/tree/gpu_hist/row_partitioner.cuh +++ b/src/tree/gpu_hist/row_partitioner.cuh @@ -177,11 +177,10 @@ void SortPositionBatch(Context const* ctx, common::Span - <<CUDACtx()->Stream()>>>(batch_info_itr, ridx, ridx_tmp, - total_rows); + std::uint32_t const kGridSize = + xgboost::common::DivRoundUp(total_rows, kBlockSize * kItemsThread); + dh::LaunchKernel{kGridSize, kBlockSize, 0, ctx->CUDACtx()->Stream()}( + SortPositionCopyKernel, batch_info_itr, ridx, ridx_tmp, total_rows); } struct NodePositionInfo { @@ -211,13 +210,14 @@ XGBOOST_DEV_INLINE int GetPositionFromSegments(std::size_t idx, return position; } -template +template __global__ __launch_bounds__(kBlockSize) void FinalisePositionKernel( - const common::Span d_node_info, bst_idx_t base_ridx, - const common::Span d_ridx, common::Span d_out_position, OpT op) { + common::Span d_node_info, bst_idx_t base_ridx, + common::Span d_ridx, common::Span d_out_position, + OpT op) { for (auto idx : dh::GridStrideRange(0, d_ridx.size())) { auto position = GetPositionFromSegments(idx, d_node_info.data()); - RowIndexT ridx = d_ridx[idx] - base_ridx; + cuda_impl::RowIndexT ridx = d_ridx[idx] - base_ridx; bst_node_t new_position = op(ridx, position); d_out_position[ridx] = new_position; } @@ -377,12 +377,14 @@ class RowPartitioner { sizeof(NodePositionInfo) * ridx_segments_.size(), cudaMemcpyDefault, ctx->CUDACtx()->Stream())); - constexpr int kBlockSize = 512; + constexpr std::uint32_t kBlockSize = 512; const int kItemsThread = 8; - const int grid_size = xgboost::common::DivRoundUp(ridx_.size(), kBlockSize * kItemsThread); + const std::uint32_t grid_size = + xgboost::common::DivRoundUp(ridx_.size(), kBlockSize * kItemsThread); common::Span d_ridx{ridx_.data(), ridx_.size()}; - FinalisePositionKernel<<CUDACtx()->Stream()>>>( - dh::ToSpan(d_node_info_storage), base_ridx, d_ridx, d_out_position, op); + dh::LaunchKernel{grid_size, kBlockSize, 0, ctx->CUDACtx()->Stream()}( + FinalisePositionKernel, dh::ToSpan(d_node_info_storage), + base_ridx, d_ridx, d_out_position, op); } }; }; // namespace xgboost::tree diff --git a/tests/cpp/tree/gpu_hist/test_row_partitioner.cu b/tests/cpp/tree/gpu_hist/test_row_partitioner.cu index 76d3c7d07692..04d26cda8e26 100644 --- a/tests/cpp/tree/gpu_hist/test_row_partitioner.cu +++ b/tests/cpp/tree/gpu_hist/test_row_partitioner.cu @@ -16,8 +16,9 @@ #include "../../../../src/data/ellpack_page.cuh" #include "../../../../src/tree/gpu_hist/expand_entry.cuh" // for GPUExpandEntry #include "../../../../src/tree/gpu_hist/row_partitioner.cuh" -#include "../../../../src/tree/param.h" // for TrainParam -#include "../../helpers.h" // for RandomDataGenerator +#include "../../../../src/tree/param.h" // for TrainParam +#include "../../collective/test_worker.h" // for TestDistributedGlobal +#include "../../helpers.h" // for RandomDataGenerator namespace xgboost::tree { void TestUpdatePositionBatch() { @@ -61,7 +62,9 @@ void TestSortPositionBatch(const std::vector& ridx_in, const std::vector ridx_tmp(ridx_in.size()); thrust::device_vector counts(segments.size()); - auto op = [=] __device__(auto ridx, int split_index, int data) { return ridx % 2 == 0; }; + auto op = [=] __device__(auto ridx, int split_index, int data) { + return ridx % 2 == 0; + }; std::vector op_data(segments.size()); std::vector> h_batch_info(segments.size()); dh::TemporaryArray> d_batch_info(segments.size()); @@ -79,7 +82,9 @@ void TestSortPositionBatch(const std::vector& ridx_in, const std::vector& ridx_in, const std::vector splits(1); + partitioner.UpdatePositionBatch( + &ctx, {0}, {1}, {2}, splits, + [] XGBOOST_DEVICE(bst_idx_t ridx, std::int32_t /*nidx_in_batch*/, RegTree::Node) { + return ridx < 3; + }); + ASSERT_EQ(partitioner.GetNumNodes(), 3); + if (collective::GetRank() == 0) { + for (std::size_t i = 0; i < 3; ++i) { + ASSERT_TRUE(partitioner.GetRows(i).empty()); + } + } + ctx.CUDACtx()->Stream().Sync(); + }); +} +} // anonymous namespace + +TEST(RowPartitioner, MGPUEmpty) { + std::int32_t n_workers = curt::AllVisibleGPUs(); + TestEmptyNode(n_workers); +} } // namespace xgboost::tree