Fixes for large size clusters. #10880

Merged: 7 commits, Oct 14, 2024

9 changes: 3 additions & 6 deletions include/xgboost/collective/socket.h
@@ -538,13 +538,10 @@ class TCPSocket {
[[nodiscard]] HandleT const &Handle() const { return handle_; }
/**
* @brief Listen to incoming requests. Should be called after bind.
*
* Both the default and minimum backlog is set to 256.
*/
[[nodiscard]] Result Listen(std::int32_t backlog = 16) {
if (listen(handle_, backlog) != 0) {
return system::FailWithCode("Failed to listen.");
}
return Success();
}
[[nodiscard]] Result Listen(std::int32_t backlog = 256);
/**
* @brief Bind socket to INADDR_ANY, return the port selected by the OS.
*/
9 changes: 9 additions & 0 deletions src/collective/socket.cc
@@ -3,6 +3,7 @@
*/
#include "xgboost/collective/socket.h"

#include <algorithm> // for max
#include <array> // for array
#include <cstddef> // for size_t
#include <cstdint> // for int32_t
@@ -56,6 +57,14 @@ SockAddrV4 SockAddrV4::InaddrAny() { return MakeSockAddress("0.0.0.0", 0).V4();
SockAddrV6 SockAddrV6::Loopback() { return MakeSockAddress("::1", 0).V6(); }
SockAddrV6 SockAddrV6::InaddrAny() { return MakeSockAddress("::", 0).V6(); }

[[nodiscard]] Result TCPSocket::Listen(std::int32_t backlog) {
backlog = std::max(backlog, 256);

Review comment: Since you're imposing a limit of 256, should that be added to the method docs as well?

@trivialfis (Member, Author), Oct 14, 2024: Will add a brief mention, thank you for pointing this out.

@trivialfis: Done.

if (listen(this->handle_, backlog) != 0) {
return system::FailWithCode("Failed to listen.");
}
return Success();
}

std::size_t TCPSocket::Send(StringView str) {
CHECK(!this->IsClosed());
CHECK_LT(str.size(), std::numeric_limits<std::int32_t>::max());
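
To make the new clamping behavior in TCPSocket::Listen concrete (backlog = std::max(backlog, 256)), here is a minimal sketch; EffectiveBacklog is a hypothetical helper written only for illustration, not part of XGBoost.

#include <algorithm>
#include <cstdint>
#include <iostream>

// Illustrative helper mirroring `backlog = std::max(backlog, 256)` above.
std::int32_t EffectiveBacklog(std::int32_t requested) {
  return std::max(requested, 256);
}

int main() {
  std::cout << EffectiveBacklog(8) << "\n";     // small cluster: clamped up to 256
  std::cout << EffectiveBacklog(1024) << "\n";  // large cluster: stays at 1024
  return 0;
}
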
3 changes: 2 additions & 1 deletion src/collective/tracker.cc
@@ -123,7 +123,8 @@ RabitTracker::RabitTracker(Json const& config) : Tracker{config} {
listener_ = TCPSocket::Create(addr.IsV4() ? SockDomain::kV4 : SockDomain::kV6);
return listener_.Bind(host_, &this->port_);
} << [&] {
return listener_.Listen();
CHECK_GT(this->n_workers_, 0);
return listener_.Listen(this->n_workers_);

Review comment: What happens now if this->n_workers_ > 256? Will those connections sit in the backlog and be processed only after some of the first 256 are handled?

@trivialfis (Member, Author), Oct 14, 2024: Yes, pending connections stay in the listen backlog until the tracker accepts earlier ones and the queue drains. The tracker handles workers whenever they connect, first come, first served.

};
SafeColl(rc);
}
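
As a hedged illustration of the behavior discussed in the review thread above, the following minimal POSIX sketch shows how workers that have connected but not yet been accepted wait in the kernel's listen backlog until the server calls accept(). It is not XGBoost's TCPSocket; the address choices and backlog value are only for demonstration.

#include <arpa/inet.h>
#include <netinet/in.h>
#include <sys/socket.h>
#include <unistd.h>
#include <cstdio>

int main() {
  int fd = socket(AF_INET, SOCK_STREAM, 0);
  sockaddr_in addr{};
  addr.sin_family = AF_INET;
  addr.sin_addr.s_addr = htonl(INADDR_ANY);
  addr.sin_port = 0;  // let the OS pick a free port
  if (bind(fd, reinterpret_cast<sockaddr*>(&addr), sizeof(addr)) != 0) {
    perror("bind");
    return 1;
  }
  // With a backlog of 256, roughly 256 clients can finish the TCP handshake
  // and wait in the kernel's accept queue before any accept() call; further
  // connection attempts are queued or retried by the clients until the
  // server drains the queue.
  if (listen(fd, 256) != 0) {
    perror("listen");
    return 1;
  }
  sockaddr_in peer{};
  socklen_t len = sizeof(peer);
  // Blocks until a client connects; each accept() pops one pending
  // connection off the backlog, first come, first served.
  int conn = accept(fd, reinterpret_cast<sockaddr*>(&peer), &len);
  if (conn >= 0) { close(conn); }
  close(fd);
  return 0;
}
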
7 changes: 0 additions & 7 deletions src/common/device_helpers.cuh
@@ -203,13 +203,6 @@ __global__ void LaunchNKernel(size_t begin, size_t end, L lambda) {
lambda(i);
}
}
template <typename L>
__global__ void LaunchNKernel(int device_idx, size_t begin, size_t end,
L lambda) {
for (auto i : GridStrideRange(begin, end)) {
lambda(i, device_idx);
}
}
Review comment on lines -206 to -212: From what I can see this is being removed because it's not used anywhere, right?

@trivialfis (Member, Author): It's not used; small cleanup.


/* \brief A wrapper around kernel launching syntax, used to guard against empty input.
*
28 changes: 15 additions & 13 deletions src/tree/gpu_hist/row_partitioner.cuh
@@ -177,11 +177,10 @@ void SortPositionBatch(Context const* ctx, common::Span<const PerNodeData<OpData

// Value found by experimentation
const int kItemsThread = 12;
const int grid_size = xgboost::common::DivRoundUp(total_rows, kBlockSize * kItemsThread);

SortPositionCopyKernel<kBlockSize, OpDataT>
<<<grid_size, kBlockSize, 0, ctx->CUDACtx()->Stream()>>>(batch_info_itr, ridx, ridx_tmp,
total_rows);
std::uint32_t const kGridSize =
xgboost::common::DivRoundUp(total_rows, kBlockSize * kItemsThread);
dh::LaunchKernel{kGridSize, kBlockSize, 0, ctx->CUDACtx()->Stream()}(
SortPositionCopyKernel<kBlockSize, OpDataT>, batch_info_itr, ridx, ridx_tmp, total_rows);
}

struct NodePositionInfo {
@@ -211,13 +210,14 @@ XGBOOST_DEV_INLINE int GetPositionFromSegments(std::size_t idx,
return position;
}

template <int kBlockSize, typename RowIndexT, typename OpT>
template <int kBlockSize, typename OpT>
__global__ __launch_bounds__(kBlockSize) void FinalisePositionKernel(
const common::Span<const NodePositionInfo> d_node_info, bst_idx_t base_ridx,
const common::Span<const RowIndexT> d_ridx, common::Span<bst_node_t> d_out_position, OpT op) {
common::Span<const NodePositionInfo> d_node_info, bst_idx_t base_ridx,
common::Span<const cuda_impl::RowIndexT> d_ridx, common::Span<bst_node_t> d_out_position,
OpT op) {
for (auto idx : dh::GridStrideRange<std::size_t>(0, d_ridx.size())) {
auto position = GetPositionFromSegments(idx, d_node_info.data());
RowIndexT ridx = d_ridx[idx] - base_ridx;
cuda_impl::RowIndexT ridx = d_ridx[idx] - base_ridx;
bst_node_t new_position = op(ridx, position);
d_out_position[ridx] = new_position;
}
@@ -377,12 +377,14 @@ class RowPartitioner {
sizeof(NodePositionInfo) * ridx_segments_.size(),
cudaMemcpyDefault, ctx->CUDACtx()->Stream()));

constexpr int kBlockSize = 512;
constexpr std::uint32_t kBlockSize = 512;
const int kItemsThread = 8;
const int grid_size = xgboost::common::DivRoundUp(ridx_.size(), kBlockSize * kItemsThread);
const std::uint32_t grid_size =
xgboost::common::DivRoundUp(ridx_.size(), kBlockSize * kItemsThread);
common::Span<RowIndexT const> d_ridx{ridx_.data(), ridx_.size()};
FinalisePositionKernel<kBlockSize><<<grid_size, kBlockSize, 0, ctx->CUDACtx()->Stream()>>>(
dh::ToSpan(d_node_info_storage), base_ridx, d_ridx, d_out_position, op);
dh::LaunchKernel{grid_size, kBlockSize, 0, ctx->CUDACtx()->Stream()}(
FinalisePositionKernel<kBlockSize, FinalisePositionOpT>, dh::ToSpan(d_node_info_storage),
base_ridx, d_ridx, d_out_position, op);
}
};
}; // namespace xgboost::tree
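
The hunks above replace raw <<<grid, block, shmem, stream>>> launches with dh::LaunchKernel{...}(kernel, args...). The sketch below shows one way such a wrapper can be written; it is an illustrative stand-in, not XGBoost's actual dh::LaunchKernel, and LaunchConfig/FillKernel are hypothetical names.

#include <cstddef>
#include <cstdint>
#include <cuda_runtime.h>

// Illustrative launch wrapper: bundles the launch configuration and guards
// against an empty grid instead of issuing a zero-sized launch.
struct LaunchConfig {
  std::uint32_t grid;
  std::uint32_t block;
  std::size_t shmem{0};
  cudaStream_t stream{nullptr};

  template <typename Kernel, typename... Args>
  void operator()(Kernel kernel, Args... args) const {
    if (grid == 0 || block == 0) {
      return;  // nothing to launch for empty input
    }
    kernel<<<grid, block, shmem, stream>>>(args...);
  }
};

// A simple grid-stride kernel used only for the usage example below.
__global__ void FillKernel(float* out, std::size_t n, float value) {
  for (std::size_t i = blockIdx.x * blockDim.x + threadIdx.x; i < n;
       i += static_cast<std::size_t>(gridDim.x) * blockDim.x) {
    out[i] = value;
  }
}

// Usage mirroring the call sites in this PR (names hypothetical):
//   LaunchConfig{grid_size, 256, 0, stream}(FillKernel, d_out, n, 1.0f);
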
45 changes: 40 additions & 5 deletions tests/cpp/tree/gpu_hist/test_row_partitioner.cu
@@ -16,8 +16,9 @@
#include "../../../../src/data/ellpack_page.cuh"
#include "../../../../src/tree/gpu_hist/expand_entry.cuh" // for GPUExpandEntry
#include "../../../../src/tree/gpu_hist/row_partitioner.cuh"
#include "../../../../src/tree/param.h" // for TrainParam
#include "../../helpers.h" // for RandomDataGenerator
#include "../../../../src/tree/param.h" // for TrainParam
#include "../../collective/test_worker.h" // for TestDistributedGlobal
#include "../../helpers.h" // for RandomDataGenerator

namespace xgboost::tree {
void TestUpdatePositionBatch() {
@@ -61,7 +62,9 @@ void TestSortPositionBatch(const std::vector<int>& ridx_in, const std::vector<Se
thrust::device_vector<cuda_impl::RowIndexT> ridx_tmp(ridx_in.size());
thrust::device_vector<cuda_impl::RowIndexT> counts(segments.size());

auto op = [=] __device__(auto ridx, int split_index, int data) { return ridx % 2 == 0; };
auto op = [=] __device__(auto ridx, int split_index, int data) {
return ridx % 2 == 0;
};
std::vector<int> op_data(segments.size());
std::vector<PerNodeData<int>> h_batch_info(segments.size());
dh::TemporaryArray<PerNodeData<int>> d_batch_info(segments.size());
@@ -79,7 +82,9 @@ void TestSortPositionBatch(const std::vector<int>& ridx_in, const std::vector<Se
dh::ToSpan(ridx_tmp), dh::ToSpan(counts), total_rows, op,
&tmp);

auto op_without_data = [=] __device__(auto ridx) { return ridx % 2 == 0; };
auto op_without_data = [=] __device__(auto ridx) {
return ridx % 2 == 0;
};
for (size_t i = 0; i < segments.size(); i++) {
auto begin = ridx.begin() + segments[i].begin;
auto end = ridx.begin() + segments[i].end;
@@ -93,7 +98,7 @@ void TestSortPositionBatch(const std::vector<int>& ridx_in, const std::vector<Se
}
}

TEST(GpuHist, SortPositionBatch) {
TEST(RowPartitioner, SortPositionBatch) {
TestSortPositionBatch({0, 1, 2, 3, 4, 5}, {{0, 3}, {3, 6}});
TestSortPositionBatch({0, 1, 2, 3, 4, 5}, {{0, 1}, {3, 6}});
TestSortPositionBatch({0, 1, 2, 3, 4, 5}, {{0, 6}});
@@ -178,4 +183,34 @@ void TestExternalMemory() {
} // anonymous namespace

TEST(RowPartitioner, LeafPartitionExternalMemory) { TestExternalMemory(); }

namespace {
void TestEmptyNode(std::int32_t n_workers) {
collective::TestDistributedGlobal(n_workers, [] {
auto ctx = MakeCUDACtx(DistGpuIdx());
RowPartitioner partitioner;
bst_idx_t n_samples = (collective::GetRank() == 0) ? 0 : 1024;
bst_idx_t base_rowid = 0;
partitioner.Reset(&ctx, n_samples, base_rowid);
std::vector<RegTree::Node> splits(1);
partitioner.UpdatePositionBatch(
&ctx, {0}, {1}, {2}, splits,
[] XGBOOST_DEVICE(bst_idx_t ridx, std::int32_t /*nidx_in_batch*/, RegTree::Node) {
return ridx < 3;
});
ASSERT_EQ(partitioner.GetNumNodes(), 3);

Review comment: Cumulative sum of left and right indices, in line 197 above: 1 + 2 = 3.

@trivialfis (Member, Author): The n_nodes_ internal variable is used for sanity checks; it is initialized to 1, with the root node counted. The reasoning is that we don't update the partitioner if there's no split for the root node, but n_nodes >= 1 is an invariant in all cases since there must always be a root node.

if (collective::GetRank() == 0) {
for (std::size_t i = 0; i < 3; ++i) {
ASSERT_TRUE(partitioner.GetRows(i).empty());
}
}
ctx.CUDACtx()->Stream().Sync();
});
}
} // anonymous namespace

TEST(RowPartitioner, MGPUEmpty) {
std::int32_t n_workers = curt::AllVisibleGPUs();
TestEmptyNode(n_workers);
}
} // namespace xgboost::tree
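
To spell out the arithmetic from the review thread above (one split applied to the root yields two children, so GetNumNodes() reports 3), here is a tiny sketch of the invariant the author describes; the variable names are illustrative, not XGBoost internals.

#include <cassert>
#include <cstdint>

int main() {
  std::int32_t n_nodes = 1;   // invariant: the root node always exists
  std::int32_t n_splits = 1;  // the test above applies a single split to node 0
  n_nodes += 2 * n_splits;    // each split adds a left and a right child
  assert(n_nodes == 3);       // matches ASSERT_EQ(partitioner.GetNumNodes(), 3)
  return 0;
}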