Avoid omp reduction in coordinate descent and aft metrics. #7316

Merged · 9 commits · Oct 17, 2021
src/linear/coordinate_common.h (+21 -16)
@@ -108,27 +108,32 @@ inline std::pair<double, double> GetGradient(int group_idx, int num_group, int f
  *
  * \return The gradient and diagonal Hessian entry for a given feature.
  */
-inline std::pair<double, double> GetGradientParallel(int group_idx, int num_group, int fidx,
-                                                     const std::vector<GradientPair> &gpair,
-                                                     DMatrix *p_fmat) {
-  double sum_grad = 0.0, sum_hess = 0.0;
+inline std::pair<double, double>
+GetGradientParallel(GenericParameter const *ctx, int group_idx, int num_group,
+                    int fidx, const std::vector<GradientPair> &gpair,
+                    DMatrix *p_fmat) {
+  std::vector<double> sum_grad_tloc(ctx->Threads(), 0.0);
+  std::vector<double> sum_hess_tloc(ctx->Threads(), 0.0);
+
   for (const auto &batch : p_fmat->GetBatches<CSCPage>()) {
     auto page = batch.GetView();
     auto col = page[fidx];
     const auto ndata = static_cast<bst_omp_uint>(col.size());
-    dmlc::OMPException exc;
-#pragma omp parallel for schedule(static) reduction(+ : sum_grad, sum_hess)
-    for (bst_omp_uint j = 0; j < ndata; ++j) {
-      exc.Run([&]() {
-        const bst_float v = col[j].fvalue;
-        auto &p = gpair[col[j].index * num_group + group_idx];
-        if (p.GetHess() < 0.0f) return;
-        sum_grad += p.GetGrad() * v;
-        sum_hess += p.GetHess() * v * v;
-      });
-    }
-    exc.Rethrow();
+    common::ParallelFor(ndata, ctx->Threads(), [&](size_t j) {
+      const bst_float v = col[j].fvalue;
+      auto &p = gpair[col[j].index * num_group + group_idx];
+      if (p.GetHess() < 0.0f) {
+        return;
+      }
+      auto t_idx = omp_get_thread_num();
+      sum_grad_tloc[t_idx] += p.GetGrad() * v;
+      sum_hess_tloc[t_idx] += p.GetHess() * v * v;
+    });
   }
+  double sum_grad =
+      std::accumulate(sum_grad_tloc.cbegin(), sum_grad_tloc.cend(), 0.0);
+  double sum_hess =
+      std::accumulate(sum_hess_tloc.cbegin(), sum_hess_tloc.cend(), 0.0);
   return std::make_pair(sum_grad, sum_hess);
 }

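This hunk is the core of the PR: the `#pragma omp ... reduction(+:...)` clause is replaced by a vector of per-thread partial sums that is folded sequentially with std::accumulate, so the final summation order no longer depends on how OpenMP chooses to combine its per-thread partials. Below is a minimal standalone sketch of the same pattern; it uses raw OpenMP in place of XGBoost's internal common::ParallelFor helper, and DeterministicSum is a made-up name for illustration only.

    #include <omp.h>

    #include <cstddef>
    #include <numeric>
    #include <vector>

    // Each thread owns one slot of `partial`, so there is no data race and no
    // reduction clause; the final fold over the slots runs sequentially, left
    // to right, so the result is reproducible for a fixed thread count.
    double DeterministicSum(const std::vector<double> &values, int n_threads) {
      std::vector<double> partial(n_threads, 0.0);
      const std::ptrdiff_t n = static_cast<std::ptrdiff_t>(values.size());
    #pragma omp parallel for schedule(static) num_threads(n_threads)
      for (std::ptrdiff_t i = 0; i < n; ++i) {
        partial[omp_get_thread_num()] += values[i];
      }
      return std::accumulate(partial.cbegin(), partial.cend(), 0.0);
    }

schedule(static) matters here: it fixes the iteration-to-thread assignment, so each slot's partial sum is itself reproducible, and the sequential std::accumulate then removes the remaining order dependence.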
src/linear/updater_coordinate.cc (+2 -2)
@@ -80,8 +80,8 @@ class CoordinateUpdater : public LinearUpdater {
                 DMatrix *p_fmat, gbm::GBLinearModel *model) {
     const int ngroup = model->learner_model_param->num_output_group;
     bst_float &w = (*model)[fidx][group_idx];
-    auto gradient =
-        GetGradientParallel(group_idx, ngroup, fidx, *in_gpair, p_fmat);
+    auto gradient = GetGradientParallel(learner_param_, group_idx, ngroup, fidx,
+                                        *in_gpair, p_fmat);
     auto dw = static_cast<float>(
         tparam_.learning_rate *
         CoordinateDelta(gradient.first, gradient.second, w, tparam_.reg_alpha_denorm,
src/metric/survival_metric.cu (+51 -53)
@@ -18,6 +18,7 @@
#include "metric_common.h"
#include "../common/math.h"
#include "../common/survival_util.h"
#include "../common/threading_utils.h"

#if defined(XGBOOST_USE_CUDA)
#include <thrust/execution_policy.h> // thrust::cuda::par
@@ -42,11 +43,12 @@ class ElementWiseSurvivalMetricsReduction {
     policy_ = policy;
   }
 
-  PackedReduceResult CpuReduceMetrics(
-      const HostDeviceVector<bst_float>& weights,
-      const HostDeviceVector<bst_float>& labels_lower_bound,
-      const HostDeviceVector<bst_float>& labels_upper_bound,
-      const HostDeviceVector<bst_float>& preds) const {
+  PackedReduceResult
+  CpuReduceMetrics(const HostDeviceVector<bst_float> &weights,
+                   const HostDeviceVector<bst_float> &labels_lower_bound,
+                   const HostDeviceVector<bst_float> &labels_upper_bound,
+                   const HostDeviceVector<bst_float> &preds,
+                   int32_t n_threads) const {
     size_t ndata = labels_lower_bound.Size();
     CHECK_EQ(ndata, labels_upper_bound.Size());
 
@@ -55,22 +57,24 @@
     const auto& h_weights = weights.HostVector();
     const auto& h_preds = preds.HostVector();
 
-    double residue_sum = 0;
-    double weights_sum = 0;
-
-    dmlc::OMPException exc;
-#pragma omp parallel for reduction(+: residue_sum, weights_sum) schedule(static)
-    for (omp_ulong i = 0; i < ndata; ++i) {
-      exc.Run([&]() {
-        const double wt = h_weights.empty() ? 1.0 : static_cast<double>(h_weights[i]);
-        residue_sum += policy_.EvalRow(
-            static_cast<double>(h_labels_lower_bound[i]),
-            static_cast<double>(h_labels_upper_bound[i]),
-            static_cast<double>(h_preds[i])) * wt;
-        weights_sum += wt;
-      });
-    }
-    exc.Rethrow();
+    std::vector<double> score_tloc(n_threads, 0.0);
+    std::vector<double> weight_tloc(n_threads, 0.0);
+
+    common::ParallelFor(ndata, n_threads, [&](size_t i) {
+      const double wt =
+          h_weights.empty() ? 1.0 : static_cast<double>(h_weights[i]);
+      auto t_idx = omp_get_thread_num();
+      score_tloc[t_idx] +=
+          policy_.EvalRow(static_cast<double>(h_labels_lower_bound[i]),
+                          static_cast<double>(h_labels_upper_bound[i]),
+                          static_cast<double>(h_preds[i])) *
+          wt;
+      weight_tloc[t_idx] += wt;
+    });
+
+    double residue_sum = std::accumulate(score_tloc.cbegin(), score_tloc.cend(), 0.0);
+    double weights_sum = std::accumulate(weight_tloc.cbegin(), weight_tloc.cend(), 0.0);
+
     PackedReduceResult res{residue_sum, weights_sum};
     return res;
   }
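For context on why the clause is worth avoiding: floating-point addition is not associative, and OpenMP does not specify the order in which a reduction(+:...) clause combines per-thread partials, so the same inputs can legitimately produce different last-bit results from run to run. A tiny self-contained demonstration of the underlying effect, with values chosen to make the rounding visible:

    #include <cstdio>

    int main() {
      double a = 1e16, b = -1e16, c = 1.0;
      // (a + b) + c == 1.0, but a + (b + c) == 0.0: c is absorbed when it is
      // added to the huge value first. A reduction that regroups its partial
      // sums can therefore change the final answer.
      std::printf("%.1f vs %.1f\n", (a + b) + c, a + (b + c));
      return 0;
    }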
@@ -119,25 +123,25 @@
 #endif  // XGBOOST_USE_CUDA
 
   PackedReduceResult Reduce(
-      int device,
+      const GenericParameter &ctx,
       const HostDeviceVector<bst_float>& weights,
       const HostDeviceVector<bst_float>& labels_lower_bound,
       const HostDeviceVector<bst_float>& labels_upper_bound,
       const HostDeviceVector<bst_float>& preds) {
     PackedReduceResult result;
 
-    if (device < 0) {
-      result = CpuReduceMetrics(weights, labels_lower_bound, labels_upper_bound, preds);
+    if (ctx.gpu_id < 0) {
+      result = CpuReduceMetrics(weights, labels_lower_bound, labels_upper_bound,
+                                preds, ctx.Threads());
     }
 #if defined(XGBOOST_USE_CUDA)
     else {  // NOLINT
-      device_ = device;
-      preds.SetDevice(device_);
-      labels_lower_bound.SetDevice(device_);
-      labels_upper_bound.SetDevice(device_);
-      weights.SetDevice(device_);
+      preds.SetDevice(ctx.gpu_id);
+      labels_lower_bound.SetDevice(ctx.gpu_id);
+      labels_upper_bound.SetDevice(ctx.gpu_id);
+      weights.SetDevice(ctx.gpu_id);
 
-      dh::safe_cuda(cudaSetDevice(device_));
+      dh::safe_cuda(cudaSetDevice(ctx.gpu_id));
       result = DeviceReduceMetrics(weights, labels_lower_bound, labels_upper_bound, preds);
     }
 #endif  // defined(XGBOOST_USE_CUDA)
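A side effect of passing the full context into Reduce is that the reducer becomes stateless with respect to the device: the cached device_ member deleted in the next hunk only ever duplicated information the caller already holds in ctx.gpu_id.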
@@ -146,9 +150,6 @@

  private:
   EvalRow policy_;
-#if defined(XGBOOST_USE_CUDA)
-  int device_{-1};
-#endif  // defined(XGBOOST_USE_CUDA)
 };
 
 struct EvalIntervalRegressionAccuracy {
@@ -193,28 +194,27 @@ struct EvalAFTNLogLik {
   AFTParam param_;
 };
 
-template<typename Policy>
-struct EvalEWiseSurvivalBase : public Metric {
+template <typename Policy> struct EvalEWiseSurvivalBase : public Metric {
+  explicit EvalEWiseSurvivalBase(GenericParameter const *ctx) {
+    tparam_ = ctx;
+  }
+  EvalEWiseSurvivalBase() = default;
 
   void Configure(const Args& args) override {
     policy_.Configure(args);
-    for (const auto& e : args) {
-      if (e.first == "gpu_id") {
-        device_ = dmlc::ParseSignedInt<int>(e.second.c_str(), nullptr, 10);
-      }
-    }
     reducer_.Configure(policy_);
+    CHECK(tparam_);
   }
 
   bst_float Eval(const HostDeviceVector<bst_float>& preds,
                  const MetaInfo& info,
                  bool distributed) override {
     CHECK_EQ(preds.Size(), info.labels_lower_bound_.Size());
     CHECK_EQ(preds.Size(), info.labels_upper_bound_.Size());
 
-    auto result = reducer_.Reduce(
-        device_, info.weights_, info.labels_lower_bound_, info.labels_upper_bound_, preds);
+    CHECK(tparam_);
+    auto result =
+        reducer_.Reduce(*tparam_, info.weights_, info.labels_lower_bound_,
+                        info.labels_upper_bound_, preds);
 
     double dat[2] {result.Residue(), result.Weights()};
 
@@ -252,24 +252,22 @@ struct AFTNLogLikDispatcher : public Metric {
     param_.UpdateAllowUnknown(args);
     switch (param_.aft_loss_distribution) {
     case common::ProbabilityDistributionType::kNormal:
-      metric_.reset(new EvalEWiseSurvivalBase<EvalAFTNLogLik<common::NormalDistribution>>());
+      metric_.reset(
+          new EvalEWiseSurvivalBase<EvalAFTNLogLik<common::NormalDistribution>>(
+              tparam_));
       break;
     case common::ProbabilityDistributionType::kLogistic:
-      metric_.reset(new EvalEWiseSurvivalBase<EvalAFTNLogLik<common::LogisticDistribution>>());
+      metric_.reset(new EvalEWiseSurvivalBase<
+                    EvalAFTNLogLik<common::LogisticDistribution>>(tparam_));
       break;
     case common::ProbabilityDistributionType::kExtreme:
-      metric_.reset(new EvalEWiseSurvivalBase<EvalAFTNLogLik<common::ExtremeDistribution>>());
+      metric_.reset(new EvalEWiseSurvivalBase<
+                    EvalAFTNLogLik<common::ExtremeDistribution>>(tparam_));
       break;
     default:
       LOG(FATAL) << "Unknown probability distribution";
     }
-    Args new_args{args};
-    // tparam_ doesn't get propagated to the inner metric object because we didn't use
-    // Metric::Create(). I don't think it's a good idea to pollute the metric registry with
-    // specialized versions of the AFT metric, so as a work-around, manually pass the GPU ID
-    // into the inner metric via configuration.
-    new_args.emplace_back("gpu_id", std::to_string(tparam_->gpu_id));
-    metric_->Configure(new_args);
+    metric_->Configure(args);
   }
 
   void SaveConfig(Json* p_out) const override {
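The deleted comment documented the old workaround: because the inner metric is not built through Metric::Create(), tparam_ was never propagated to it, and the GPU ID had to be smuggled in as a fake "gpu_id" configuration argument. The new code injects the context through the constructor instead. A minimal sketch of that pattern, with hypothetical names (Context, Inner, Outer):

    #include <memory>

    // Hypothetical illustration of the change: the outer dispatcher hands its
    // context pointer to the inner object at construction time instead of
    // serializing it into configuration key/value pairs.
    struct Context { int gpu_id{-1}; };

    struct Inner {
      explicit Inner(Context const *ctx) : ctx_{ctx} {}
      Context const *ctx_{nullptr};
    };

    struct Outer {
      Context const *ctx_{nullptr};  // set by the framework
      std::unique_ptr<Inner> inner_;
      void Configure() { inner_ = std::make_unique<Inner>(ctx_); }
    };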
tests/cpp/metric/test_survival_metric.cu (+39 -0)
@@ -11,6 +11,41 @@

 namespace xgboost {
 namespace common {
+namespace {
+inline void CheckDeterministicMetricElementWise(StringView name, int32_t device) {
+  auto lparam = CreateEmptyGenericParam(device);
+  std::unique_ptr<Metric> metric{Metric::Create(name.c_str(), &lparam)};
+  metric->Configure(Args{});
+
+  HostDeviceVector<float> predts;
+  MetaInfo info;
+  auto &h_predts = predts.HostVector();
+
+  SimpleLCG lcg;
+  SimpleRealUniformDistribution<float> dist{0.0f, 1.0f};
+
+  size_t n_samples = 2048;
+  h_predts.resize(n_samples);
+
+  for (size_t i = 0; i < n_samples; ++i) {
+    h_predts[i] = dist(&lcg);
+  }
+
+  auto &h_upper = info.labels_upper_bound_.HostVector();
+  auto &h_lower = info.labels_lower_bound_.HostVector();
+  h_lower.resize(n_samples);
+  h_upper.resize(n_samples);
+  for (size_t i = 0; i < n_samples; ++i) {
+    h_lower[i] = 1;
+    h_upper[i] = 10;
+  }
+
+  auto result = metric->Eval(predts, info, false);
+  for (size_t i = 0; i < 8; ++i) {
+    ASSERT_EQ(metric->Eval(predts, info, false), result);
+  }
+}
+}  // anonymous namespace
 
 TEST(Metric, DeclareUnifiedTest(AFTNegLogLik)) {
   auto lparam = xgboost::CreateEmptyGenericParam(GPUIDX);
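The helper above is the acceptance test for the whole change: it evaluates the metric eight times on the same randomly generated inputs and asserts bit-identical results, a property the previous OpenMP reductions did not guarantee.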
@@ -61,6 +96,8 @@ TEST(Metric, DeclareUnifiedTest(IntervalRegressionAccuracy)) {
   EXPECT_FLOAT_EQ(metric->Eval(preds, info, false), 0.50f);
   info.labels_lower_bound_.HostVector()[0] = 70.0f;
   EXPECT_FLOAT_EQ(metric->Eval(preds, info, false), 0.25f);
+
+  CheckDeterministicMetricElementWise(StringView{"interval-regression-accuracy"}, GPUIDX);
 }
 
 // Test configuration of AFT metric
@@ -75,6 +112,8 @@ TEST(AFTNegLogLikMetric, DeclareUnifiedTest(Configuration)) {
   auto aft_param_json = j_obj["aft_loss_param"];
   EXPECT_EQ(get<String>(aft_param_json["aft_loss_distribution"]), "normal");
   EXPECT_EQ(get<String>(aft_param_json["aft_loss_distribution_scale"]), "10");
+
+  CheckDeterministicMetricElementWise(StringView{"aft-nloglik"}, GPUIDX);
 }
 
 }  // namespace common