Fixes for large size clusters. (#10880)
- Increase listener backlog.
- Check for empty kernels.
trivialfis authored Oct 14, 2024
1 parent d9123a5 commit 347bb14
Showing 6 changed files with 69 additions and 32 deletions.
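The two fixes map onto the files below: the backlog change touches include/xgboost/collective/socket.h, src/collective/socket.cc, and src/collective/tracker.cc, while the empty-kernel check removes an unused kernel template from src/common/device_helpers.cuh, routes the row-partitioner launches through the guarded dh::LaunchKernel wrapper, and adds a multi-GPU test for workers that hold no rows.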
9 changes: 3 additions & 6 deletions include/xgboost/collective/socket.h
@@ -538,13 +538,10 @@ class TCPSocket {
[[nodiscard]] HandleT const &Handle() const { return handle_; }
/**
* @brief Listen to incoming requests. Should be called after bind.
*
* Both the default and the minimum backlog are set to 256.
*/
[[nodiscard]] Result Listen(std::int32_t backlog = 16) {
if (listen(handle_, backlog) != 0) {
return system::FailWithCode("Failed to listen.");
}
return Success();
}
[[nodiscard]] Result Listen(std::int32_t backlog = 256);
/**
* @brief Bind socket to INADDR_ANY, return the port selected by the OS.
*/
9 changes: 9 additions & 0 deletions src/collective/socket.cc
@@ -3,6 +3,7 @@
*/
#include "xgboost/collective/socket.h"

#include <algorithm> // for max
#include <array> // for array
#include <cstddef> // for size_t
#include <cstdint> // for int32_t
@@ -56,6 +57,14 @@ SockAddrV4 SockAddrV4::InaddrAny() { return MakeSockAddress("0.0.0.0", 0).V4();
SockAddrV6 SockAddrV6::Loopback() { return MakeSockAddress("::1", 0).V6(); }
SockAddrV6 SockAddrV6::InaddrAny() { return MakeSockAddress("::", 0).V6(); }

[[nodiscard]] Result TCPSocket::Listen(std::int32_t backlog) {
backlog = std::max(backlog, 256);
if (listen(this->handle_, backlog) != 0) {
return system::FailWithCode("Failed to listen.");
}
return Success();
}

std::size_t TCPSocket::Send(StringView str) {
CHECK(!this->IsClosed());
CHECK_LT(str.size(), std::numeric_limits<std::int32_t>::max());
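The new out-of-line definition enforces a floor on the requested backlog: whatever value the caller passes, the socket listens with a queue of at least 256 pending connections. A minimal sketch of that rule, assuming nothing beyond the std::max(backlog, 256) line above (the EffectiveBacklog helper and the sample values are illustrative, not part of XGBoost):

#include <algorithm>
#include <cstdint>
#include <iostream>

// Mirrors the clamp in TCPSocket::Listen: small requests are raised to 256,
// larger requests (for example a big cluster's worker count) pass through.
std::int32_t EffectiveBacklog(std::int32_t requested) {
  return std::max(requested, 256);
}

int main() {
  std::cout << EffectiveBacklog(16) << "\n";    // 256
  std::cout << EffectiveBacklog(2048) << "\n";  // 2048
  return 0;
}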
3 changes: 2 additions & 1 deletion src/collective/tracker.cc
@@ -123,7 +123,8 @@ RabitTracker::RabitTracker(Json const& config) : Tracker{config} {
listener_ = TCPSocket::Create(addr.IsV4() ? SockDomain::kV4 : SockDomain::kV6);
return listener_.Bind(host_, &this->port_);
} << [&] {
return listener_.Listen();
CHECK_GT(this->n_workers_, 0);
return listener_.Listen(this->n_workers_);
};
SafeColl(rc);
}
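With this change, the tracker sizes its listen backlog by the number of workers instead of relying on the old default of 16; combined with the 256 floor above, the effective backlog is max(n_workers, 256), so simultaneous connection attempts from a large cluster are less likely to be dropped while the tracker is still accepting earlier ones.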
7 changes: 0 additions & 7 deletions src/common/device_helpers.cuh
@@ -203,13 +203,6 @@ __global__ void LaunchNKernel(size_t begin, size_t end, L lambda) {
lambda(i);
}
}
template <typename L>
__global__ void LaunchNKernel(int device_idx, size_t begin, size_t end,
L lambda) {
for (auto i : GridStrideRange(begin, end)) {
lambda(i, device_idx);
}
}

/* \brief A wrapper around kernel launching syntax, used to guard against empty input.
*
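The unused two-parameter LaunchNKernel overload is deleted; the launches changed in this commit instead go through the dh::LaunchKernel wrapper referred to in the comment above, which guards against empty input. A rough sketch of that idea, under the assumption that the wrapper simply skips the launch when the grid is empty (the GuardedLaunch struct below is illustrative, not the actual dh::LaunchKernel implementation):

#include <cuda_runtime.h>

#include <cstddef>
#include <cstdint>

// Illustrative launch guard: configure grid/block/shared-memory/stream once,
// then call with a kernel and its arguments; an empty grid becomes a no-op
// instead of a cudaErrorInvalidConfiguration from a raw <<<0, ...>>> launch.
struct GuardedLaunch {
  std::uint32_t grid_size;
  std::uint32_t block_size;
  std::size_t shmem_bytes;
  cudaStream_t stream;

  template <typename Kernel, typename... Args>
  void operator()(Kernel kernel, Args... args) const {
    if (grid_size == 0 || block_size == 0) {
      return;  // nothing to launch on empty input
    }
    kernel<<<grid_size, block_size, shmem_bytes, stream>>>(args...);
  }
};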
28 changes: 15 additions & 13 deletions src/tree/gpu_hist/row_partitioner.cuh
@@ -177,11 +177,10 @@ void SortPositionBatch(Context const* ctx, common::Span<const PerNodeData<OpData

// Value found by experimentation
const int kItemsThread = 12;
const int grid_size = xgboost::common::DivRoundUp(total_rows, kBlockSize * kItemsThread);

SortPositionCopyKernel<kBlockSize, OpDataT>
<<<grid_size, kBlockSize, 0, ctx->CUDACtx()->Stream()>>>(batch_info_itr, ridx, ridx_tmp,
total_rows);
std::uint32_t const kGridSize =
xgboost::common::DivRoundUp(total_rows, kBlockSize * kItemsThread);
dh::LaunchKernel{kGridSize, kBlockSize, 0, ctx->CUDACtx()->Stream()}(
SortPositionCopyKernel<kBlockSize, OpDataT>, batch_info_itr, ridx, ridx_tmp, total_rows);
}

struct NodePositionInfo {
@@ -211,13 +210,14 @@ XGBOOST_DEV_INLINE int GetPositionFromSegments(std::size_t idx,
return position;
}

template <int kBlockSize, typename RowIndexT, typename OpT>
template <int kBlockSize, typename OpT>
__global__ __launch_bounds__(kBlockSize) void FinalisePositionKernel(
const common::Span<const NodePositionInfo> d_node_info, bst_idx_t base_ridx,
const common::Span<const RowIndexT> d_ridx, common::Span<bst_node_t> d_out_position, OpT op) {
common::Span<const NodePositionInfo> d_node_info, bst_idx_t base_ridx,
common::Span<const cuda_impl::RowIndexT> d_ridx, common::Span<bst_node_t> d_out_position,
OpT op) {
for (auto idx : dh::GridStrideRange<std::size_t>(0, d_ridx.size())) {
auto position = GetPositionFromSegments(idx, d_node_info.data());
RowIndexT ridx = d_ridx[idx] - base_ridx;
cuda_impl::RowIndexT ridx = d_ridx[idx] - base_ridx;
bst_node_t new_position = op(ridx, position);
d_out_position[ridx] = new_position;
}
@@ -377,12 +377,14 @@ class RowPartitioner {
sizeof(NodePositionInfo) * ridx_segments_.size(),
cudaMemcpyDefault, ctx->CUDACtx()->Stream()));

constexpr int kBlockSize = 512;
constexpr std::uint32_t kBlockSize = 512;
const int kItemsThread = 8;
const int grid_size = xgboost::common::DivRoundUp(ridx_.size(), kBlockSize * kItemsThread);
const std::uint32_t grid_size =
xgboost::common::DivRoundUp(ridx_.size(), kBlockSize * kItemsThread);
common::Span<RowIndexT const> d_ridx{ridx_.data(), ridx_.size()};
FinalisePositionKernel<kBlockSize><<<grid_size, kBlockSize, 0, ctx->CUDACtx()->Stream()>>>(
dh::ToSpan(d_node_info_storage), base_ridx, d_ridx, d_out_position, op);
dh::LaunchKernel{grid_size, kBlockSize, 0, ctx->CUDACtx()->Stream()}(
FinalisePositionKernel<kBlockSize, FinalisePositionOpT>, dh::ToSpan(d_node_info_storage),
base_ridx, d_ridx, d_out_position, op);
}
};
}; // namespace xgboost::tree
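Both launch sites above now go through dh::LaunchKernel rather than the raw <<<grid, block>>> syntax. The failure mode this avoids, sketched with a local stand-in for xgboost::common::DivRoundUp (values are illustrative): a worker that owns no rows yields total_rows == 0, the computed grid size is therefore 0, and launching a kernel with a zero-sized grid raw would fail with an invalid-configuration error.

#include <cstddef>
#include <cstdint>
#include <iostream>

// Local stand-in for xgboost::common::DivRoundUp, used only to show the arithmetic.
std::uint32_t DivRoundUp(std::size_t a, std::uint32_t b) {
  return static_cast<std::uint32_t>((a + b - 1) / b);
}

int main() {
  constexpr std::uint32_t kBlockSize = 512;
  constexpr std::uint32_t kItemsThread = 8;
  std::cout << DivRoundUp(0, kBlockSize * kItemsThread) << "\n";     // 0: skip the launch
  std::cout << DivRoundUp(1024, kBlockSize * kItemsThread) << "\n";  // 1
  return 0;
}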
45 changes: 40 additions & 5 deletions tests/cpp/tree/gpu_hist/test_row_partitioner.cu
@@ -16,8 +16,9 @@
#include "../../../../src/data/ellpack_page.cuh"
#include "../../../../src/tree/gpu_hist/expand_entry.cuh" // for GPUExpandEntry
#include "../../../../src/tree/gpu_hist/row_partitioner.cuh"
#include "../../../../src/tree/param.h" // for TrainParam
#include "../../helpers.h" // for RandomDataGenerator
#include "../../../../src/tree/param.h" // for TrainParam
#include "../../collective/test_worker.h" // for TestDistributedGlobal
#include "../../helpers.h" // for RandomDataGenerator

namespace xgboost::tree {
void TestUpdatePositionBatch() {
@@ -61,7 +62,9 @@ void TestSortPositionBatch(const std::vector<int>& ridx_in, const std::vector<Se
thrust::device_vector<cuda_impl::RowIndexT> ridx_tmp(ridx_in.size());
thrust::device_vector<cuda_impl::RowIndexT> counts(segments.size());

auto op = [=] __device__(auto ridx, int split_index, int data) { return ridx % 2 == 0; };
auto op = [=] __device__(auto ridx, int split_index, int data) {
return ridx % 2 == 0;
};
std::vector<int> op_data(segments.size());
std::vector<PerNodeData<int>> h_batch_info(segments.size());
dh::TemporaryArray<PerNodeData<int>> d_batch_info(segments.size());
@@ -79,7 +82,9 @@ void TestSortPositionBatch(const std::vector<int>& ridx_in, const std::vector<Se
dh::ToSpan(ridx_tmp), dh::ToSpan(counts), total_rows, op,
&tmp);

auto op_without_data = [=] __device__(auto ridx) { return ridx % 2 == 0; };
auto op_without_data = [=] __device__(auto ridx) {
return ridx % 2 == 0;
};
for (size_t i = 0; i < segments.size(); i++) {
auto begin = ridx.begin() + segments[i].begin;
auto end = ridx.begin() + segments[i].end;
@@ -93,7 +98,7 @@ void TestSortPositionBatch(const std::vector<int>& ridx_in, const std::vector<Se
}
}

TEST(GpuHist, SortPositionBatch) {
TEST(RowPartitioner, SortPositionBatch) {
TestSortPositionBatch({0, 1, 2, 3, 4, 5}, {{0, 3}, {3, 6}});
TestSortPositionBatch({0, 1, 2, 3, 4, 5}, {{0, 1}, {3, 6}});
TestSortPositionBatch({0, 1, 2, 3, 4, 5}, {{0, 6}});
@@ -178,4 +183,34 @@ void TestExternalMemory() {
} // anonymous namespace

TEST(RowPartitioner, LeafPartitionExternalMemory) { TestExternalMemory(); }

namespace {
void TestEmptyNode(std::int32_t n_workers) {
collective::TestDistributedGlobal(n_workers, [] {
auto ctx = MakeCUDACtx(DistGpuIdx());
RowPartitioner partitioner;
bst_idx_t n_samples = (collective::GetRank() == 0) ? 0 : 1024;
bst_idx_t base_rowid = 0;
partitioner.Reset(&ctx, n_samples, base_rowid);
std::vector<RegTree::Node> splits(1);
partitioner.UpdatePositionBatch(
&ctx, {0}, {1}, {2}, splits,
[] XGBOOST_DEVICE(bst_idx_t ridx, std::int32_t /*nidx_in_batch*/, RegTree::Node) {
return ridx < 3;
});
ASSERT_EQ(partitioner.GetNumNodes(), 3);
if (collective::GetRank() == 0) {
for (std::size_t i = 0; i < 3; ++i) {
ASSERT_TRUE(partitioner.GetRows(i).empty());
}
}
ctx.CUDACtx()->Stream().Sync();
});
}
} // anonymous namespace

TEST(RowPartitioner, MGPUEmpty) {
std::int32_t n_workers = curt::AllVisibleGPUs();
TestEmptyNode(n_workers);
}
} // namespace xgboost::tree
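The new MGPUEmpty test starts one worker per visible GPU and gives rank 0 an empty partition, so UpdatePositionBatch runs with zero rows on that rank; the assertions then check that every node reports an empty row set there, which is the path the empty-kernel guard is meant to cover.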
