Skip to content

Commit

Permalink
Thread-safe prediction by making the prediction cache thread-local.
Browse files Browse the repository at this point in the history
Also remove the prediction cache mutex, which did not help: a race
condition still occurred whenever the cache was cleaned while entries
pointing into the cache were still alive.
  • Loading branch information
boxdot committed Jul 3, 2020
1 parent 1a08012 commit 310ac93
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 12 deletions.
1 change: 0 additions & 1 deletion include/xgboost/predictor.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@ struct PredictionCacheEntry {
class PredictionContainer {
std::unordered_map<DMatrix *, PredictionCacheEntry> container_;
void ClearExpiredEntries();
std::mutex cache_lock_;

public:
PredictionContainer() = default;
Expand Down
26 changes: 16 additions & 10 deletions src/learner.cc
Original file line number Diff line number Diff line change
Expand Up @@ -205,13 +205,12 @@ void GenericParameter::ConfigureGpuId(bool require_gpu) {
using XGBAPIThreadLocalStore =
dmlc::ThreadLocalStore<std::map<Learner const *, XGBAPIThreadLocalEntry>>;

using ThreadLocalPredictionCache = dmlc::ThreadLocalStore<PredictionContainer>;

class LearnerConfiguration : public Learner {
protected:
static std::string const kEvalMetric; // NOLINT

protected:
PredictionContainer cache_;

protected:
std::atomic<bool> need_configuration_;
std::map<std::string, std::string> cfg_;
Expand All @@ -229,7 +228,8 @@ class LearnerConfiguration : public Learner {
: need_configuration_{true} {
monitor_.Init("Learner");
for (std::shared_ptr<DMatrix> const& d : cache) {
cache_.Cache(d, GenericParameter::kCpuId);
auto local_cache = ThreadLocalPredictionCache::Get();
local_cache->Cache(d, GenericParameter::kCpuId);
}
}
// Configuration before data is known.
Expand Down Expand Up @@ -491,7 +491,8 @@ class LearnerConfiguration : public Learner {
if (mparam_.num_feature == 0) {
// TODO(hcho3): Change num_feature to 64-bit integer
unsigned num_feature = 0;
for (auto& matrix : cache_.Container()) {
auto local_cache = ThreadLocalPredictionCache::Get();
for (auto& matrix : local_cache->Container()) {
CHECK(matrix.first);
CHECK(!matrix.second.ref.expired());
const uint64_t num_col = matrix.first->Info().num_col_;
Expand Down Expand Up @@ -928,7 +929,8 @@ class LearnerImpl : public LearnerIO {
this->CheckDataSplitMode();
this->ValidateDMatrix(train.get());

auto& predt = this->cache_.Cache(train, generic_parameters_.gpu_id);
auto local_cache = ThreadLocalPredictionCache::Get();
auto& predt = local_cache->Cache(train, generic_parameters_.gpu_id);

monitor_.Start("PredictRaw");
this->PredictRaw(train.get(), &predt, true);
Expand All @@ -953,9 +955,10 @@ class LearnerImpl : public LearnerIO {
}
this->CheckDataSplitMode();
this->ValidateDMatrix(train.get());
this->cache_.Cache(train, generic_parameters_.gpu_id);
auto local_cache = ThreadLocalPredictionCache::Get();
local_cache->Cache(train, generic_parameters_.gpu_id);

gbm_->DoBoost(train.get(), in_gpair, &cache_.Entry(train.get()));
gbm_->DoBoost(train.get(), in_gpair, &local_cache->Entry(train.get()));
monitor_.Stop("BoostOneIter");
}

Expand All @@ -971,9 +974,11 @@ class LearnerImpl : public LearnerIO {
metrics_.emplace_back(Metric::Create(obj_->DefaultEvalMetric(), &generic_parameters_));
metrics_.back()->Configure({cfg_.begin(), cfg_.end()});
}

auto local_cache = ThreadLocalPredictionCache::Get();
for (size_t i = 0; i < data_sets.size(); ++i) {
std::shared_ptr<DMatrix> m = data_sets[i];
auto &predt = this->cache_.Cache(m, generic_parameters_.gpu_id);
auto &predt = local_cache->Cache(m, generic_parameters_.gpu_id);
this->ValidateDMatrix(m.get());
this->PredictRaw(m.get(), &predt, false);

Expand Down Expand Up @@ -1010,7 +1015,8 @@ class LearnerImpl : public LearnerIO {
} else if (pred_leaf) {
gbm_->PredictLeaf(data.get(), &out_preds->HostVector(), ntree_limit);
} else {
auto& prediction = cache_.Cache(data, generic_parameters_.gpu_id);
auto local_cache = ThreadLocalPredictionCache::Get();
auto& prediction = local_cache->Cache(data, generic_parameters_.gpu_id);
this->PredictRaw(data.get(), &prediction, training, ntree_limit);
// Copy the prediction cache to output prediction. out_preds comes from C API
out_preds->SetDevice(generic_parameters_.gpu_id);
Expand Down
1 change: 0 additions & 1 deletion src/predictor/predictor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ void PredictionContainer::ClearExpiredEntries() {
}

PredictionCacheEntry &PredictionContainer::Cache(std::shared_ptr<DMatrix> m, int32_t device) {
std::lock_guard<std::mutex> guard { cache_lock_ };
this->ClearExpiredEntries();
container_[m.get()].ref = m;
if (device != GenericParameter::kCpuId) {
Expand Down

0 comments on commit 310ac93

Please sign in to comment.