Skip to content

Commit

Permalink
Merge pull request #10 from laipaang/auc-runner-pr
Browse files Browse the repository at this point in the history
perf: auc runner
  • Loading branch information
qingshui authored Jan 6, 2021
2 parents a9b21c6 + 89805f7 commit ec1db44
Show file tree
Hide file tree
Showing 7 changed files with 449 additions and 149 deletions.
213 changes: 211 additions & 2 deletions paddle/fluid/framework/data_feed.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,13 @@ limitations under the License. */
#include <semaphore.h>

#include <algorithm>
#include <deque>
#include <fstream>
#include <future> // NOLINT
#include <memory>
#include <mutex> // NOLINT
#include <queue>
#include <set>
#include <sstream>
#include <string>
#include <thread> // NOLINT
Expand All @@ -50,7 +53,7 @@ USE_INT_STAT(STAT_slot_pool_size);
DECLARE_int32(padbox_record_pool_max_size);
DECLARE_int32(padbox_slotpool_thread_num);
DECLARE_int32(padbox_slotrecord_extend_dim);
DECLARE_int32(padbox_slotrecord_extptr_num);
DECLARE_bool(padbox_auc_runner_mode);

namespace paddle {
namespace framework {
Expand Down Expand Up @@ -817,6 +820,13 @@ struct SlotValues {
slot_values.clear();
}
};

// Per-record bookkeeping for AUC-runner mode. The struct is not a member of
// SlotRecordObject: get_auc_runner_info() reinterprets the bytes that follow
// the record object and its float extension area as an AucRunnerInfo, so the
// layout of this struct must stay plain data.
// NOTE(review): the field meanings below are inferred from their names only;
// confirm against the code that fills them in.
struct AucRunnerInfo {
// presumably the id of the record this info belongs to
size_t record_id_;
// presumably the id of the candidate pool used for this record
size_t pool_id_;
// presumably the candidate id returned by FeasignValuesCandidateList::AddAndGet
size_t replaced_id_;
};

// sizeof Record is much less than std::vector<MultiSlotType>
struct SlotRecordObject {
uint64_t search_id;
Expand All @@ -842,12 +852,19 @@ inline SlotRecord make_slotrecord() {
static const size_t slot_record_byte_size =
sizeof(SlotRecordObject) +
sizeof(float) * FLAGS_padbox_slotrecord_extend_dim +
sizeof(void*) * FLAGS_padbox_slotrecord_extptr_num;
sizeof(AucRunnerInfo) * static_cast<int>(FLAGS_padbox_auc_runner_mode);
void* p = malloc(slot_record_byte_size);
new (p) SlotRecordObject;
return reinterpret_cast<SlotRecordObject*>(p);
}

// Returns the AucRunnerInfo that lives in the trailing bytes of `record`,
// directly after the SlotRecordObject itself and the
// FLAGS_padbox_slotrecord_extend_dim floats that follow it.
// No check is made that the record was allocated with room for that area;
// the caller is responsible for only using this on records that have it.
inline AucRunnerInfo* get_auc_runner_info(SlotRecord record) {
  const auto extend_bytes = sizeof(float) * FLAGS_padbox_slotrecord_extend_dim;
  const auto info_offset = sizeof(SlotRecordObject) + extend_bytes;
  char* const raw = reinterpret_cast<char*>(record);
  return reinterpret_cast<AucRunnerInfo*>(raw + info_offset);
}

inline void free_slotrecord(SlotRecordObject* p) {
p->~SlotRecordObject();
free(p);
Expand Down Expand Up @@ -1019,6 +1036,198 @@ inline SlotObjPool& SlotRecordPool() {
return pool;
}

// SlotValues specialised for uint64_t values; this is the container the
// candidate and replacer helpers below read slot values from and write them to.
using FeasignValues = SlotValues<uint64_t>;

// A copy of the values of a chosen set of slots, taken from one
// FeasignValues, plus the link fields FeasignValuesCandidateList uses to
// redirect a pool entry to a pending replacement.
struct FeasignValuesCandidate {
  // slot index -> copy of that slot's values
  std::unordered_map<uint16_t, std::vector<uint64_t>> feasign_values_;
  // When non-null, the vector holding the pending replacement of this entry.
  std::vector<FeasignValuesCandidate>* candidates_v_ = nullptr;
  // Position of the pending replacement inside *candidates_v_.
  size_t offset_ = 0;
  // For a pending replacement: index of the pool entry it will overwrite.
  size_t replaced_idx_ = 0;

  FeasignValuesCandidate() {}

  // Copies, for every slot index in `slots_idx`, the half-open value range
  // [slot_offsets[slot], slot_offsets[slot + 1]) out of `values`.
  FeasignValuesCandidate(const FeasignValues& values,
                         const std::set<uint16_t>& slots_idx) {
    const auto first = values.slot_values.begin();
    for (const auto slot : slots_idx) {
      const auto lo = values.slot_offsets[slot];
      const auto hi = values.slot_offsets[slot + 1];
      feasign_values_.emplace(slot,
                              std::vector<uint64_t>(first + lo, first + hi));
    }
  }
};

class FeasignValuesCandidateList {
public:
FeasignValuesCandidateList() = default;
FeasignValuesCandidateList(const FeasignValuesCandidateList&) {}

size_t Size() { return size_; }
bool Full() { return size_ == capacity_; }

void Resize(size_t len) {
capacity_ = len;
id_ = 0;
size_ = 0;

candidates_.resize(capacity_);

new_candidates_q_.clear();
candidates_cache_.clear();
}

void SetReplacedSlots(const std::set<uint16_t>& replaced_slots) {
replaced_slots_ = replaced_slots;
}

size_t AddAndGet(const FeasignValues& values) {
size_t replaced_id = 0;
auto& random = FleetWrapper::GetInstance()->LocalRandomEngine();

++id_;
if (!Full()) {
candidates_[size_] = FeasignValuesCandidate(values, replaced_slots_);
++size_;
} else {
replaced_id = random() % id_;
if (replaced_id < capacity_) {
size_t cur_offset = new_candidates_q_.back().size();
new_candidates_q_.back().emplace_back(values, replaced_slots_);
new_candidates_q_.back()[cur_offset].replaced_idx_ = replaced_id;

candidates_[replaced_id].candidates_v_ = &new_candidates_q_.back();
candidates_[replaced_id].offset_ = cur_offset;
}
}

replaced_id = random() % size_;
if (candidates_[replaced_id].candidates_v_ == &new_candidates_q_.back()) {
replaced_id = size_ + candidates_[replaced_id].offset_;
}

return replaced_id;
}

FeasignValuesCandidate& GetUseId(size_t id) {
CHECK(id < candidates_.size());
if (candidates_[id].candidates_v_ == nullptr) {
return candidates_[id];
}
return candidates_[id].candidates_v_->at(candidates_[id].offset_);
}

FeasignValuesCandidate& GetUseReplaceId(size_t replaced_id) {
if (replaced_id >= capacity_) {
CHECK(replaced_id - capacity_ < new_candidates_q_.front().size());
return new_candidates_q_.front()[replaced_id - capacity_];
}

return candidates_[replaced_id];
}

void Push() {
std::lock_guard<std::mutex> lock(mutex4cache_);

if (!candidates_cache_.empty()) {
candidates_cache_.front().clear();
new_candidates_q_.push_back(std::move(candidates_cache_.front()));
candidates_cache_.pop_front();
} else {
new_candidates_q_.push_back({});
}
}

void Pop() {
for (auto& c : new_candidates_q_.front()) {
if (candidates_[c.replaced_idx_].candidates_v_ ==
&new_candidates_q_.front()) {
candidates_[c.replaced_idx_].candidates_v_ = nullptr;
}
candidates_[c.replaced_idx_].feasign_values_ =
std::move(c.feasign_values_);
}

std::lock_guard<std::mutex> lock(mutex4cache_);
candidates_cache_.push_back(std::move(new_candidates_q_.front()));

new_candidates_q_.pop_front();
}

private:
size_t capacity_ = 0;
size_t id_ = 0;
size_t size_ = 0;

std::set<uint16_t> replaced_slots_;

std::vector<FeasignValuesCandidate> candidates_;
std::deque<std::vector<FeasignValuesCandidate>> new_candidates_q_;
std::deque<std::vector<FeasignValuesCandidate>> candidates_cache_;

std::mutex mutex4cache_;
};

// Swaps the values of selected slots inside a FeasignValues for replacement
// values and can later restore the originals.
//
// replace() saves the current values of each selected slot and writes the
// replacement in their place; replace_back() writes the saved values back and
// discards the backup. Both keep slot_offsets consistent with the new slot
// lengths and accumulate the number of removed/inserted values into the
// caller's counters.
struct FeasignValuesReplacer {
  // Backup of the original values: one vector per replaced slot, in the
  // iteration order of the slot set given to replace().
  std::vector<std::vector<uint64_t>> feasign_values_;

  // Replaces, for every slot index in `slot_idxs`, the slot's values in `fea`
  // with `vals.at(idx)`.
  //   fea       - container modified in place.
  //   vals      - slot index -> replacement values; must contain every index
  //               in `slot_idxs` (std::unordered_map::at throws otherwise).
  //   slot_idxs - slot indices to replace.
  //   del_num   - incremented by the number of values removed.
  //   add_num   - incremented by the number of values inserted.
  void replace(FeasignValues* fea,
               const std::unordered_map<uint16_t, std::vector<uint64_t>>& vals,
               const std::set<uint16_t>& slot_idxs, int* del_num,
               int* add_num) {
    size_t i = 0;
    feasign_values_.resize(slot_idxs.size());
    for (auto idx : slot_idxs) {
      // Look the replacement up once, and before anything is modified, so a
      // missing slot cannot leave `fea` half-edited.
      const auto& new_vals = vals.at(idx);
      auto& backup = feasign_values_[i];

      const auto begin = fea->slot_values.begin() + fea->slot_offsets[idx];
      const auto end = fea->slot_values.begin() + fea->slot_offsets[idx + 1];

      // assign() rather than insert(): the backup must hold exactly this
      // slot's values even if an earlier backup was never restored.
      backup.assign(begin, end);
      // erase() invalidates `begin`; continue from the iterator it returns.
      const auto pos = fea->slot_values.erase(begin, end);
      fea->slot_values.insert(pos, new_vals.begin(), new_vals.end());

      // Shift the offsets of all following slots by the change in length.
      int diff = static_cast<int>(new_vals.size()) -
                 static_cast<int>(backup.size());
      for (size_t j = idx + 1; j < fea->slot_offsets.size(); ++j) {
        fea->slot_offsets[j] += diff;
      }

      *del_num += backup.size();
      *add_num += new_vals.size();

      ++i;
    }
  }

  // Restores the values saved by the preceding replace() call and clears the
  // backup. `slot_idxs` must be the same set that was passed to replace().
  //   del_num - incremented by the number of values removed.
  //   add_num - incremented by the number of values restored.
  void replace_back(FeasignValues* fea, const std::set<uint16_t>& slot_idxs,
                    int* del_num, int* add_num) {
    size_t i = 0;
    for (auto idx : slot_idxs) {
      const auto& backup = feasign_values_[i];

      const auto begin = fea->slot_values.begin() + fea->slot_offsets[idx];
      const auto end = fea->slot_values.begin() + fea->slot_offsets[idx + 1];
      const size_t dis = std::distance(begin, end);

      // erase() invalidates `begin`; continue from the iterator it returns.
      const auto pos = fea->slot_values.erase(begin, end);
      fea->slot_values.insert(pos, backup.begin(), backup.end());

      // Shift the offsets of all following slots by the change in length.
      int diff = static_cast<int>(backup.size()) - static_cast<int>(dis);
      for (size_t j = idx + 1; j < fea->slot_offsets.size(); ++j) {
        fea->slot_offsets[j] += diff;
      }

      *del_num += dis;
      *add_num += backup.size();

      ++i;
    }

    feasign_values_.clear();
  }
};

struct AllSlotInfo {
std::string slot;
std::string type;
Expand Down
7 changes: 7 additions & 0 deletions paddle/fluid/framework/data_set.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1836,6 +1836,13 @@ void PadBoxSlotDataset::PreprocessInstance() {
return;
}

if (!input_pv_ins_.empty()) { // for auc runner
for (auto pv : input_pv_ins_) {
delete pv;
}
input_pv_ins_.clear();
}

size_t all_records_num = input_records_.size();
std::sort(input_records_.data(), input_records_.data() + all_records_num,
[](const SlotRecord& lhs, const SlotRecord& rhs) {
Expand Down
16 changes: 16 additions & 0 deletions paddle/fluid/framework/data_set.h
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,22 @@ class DatasetImpl : public Dataset {
bool discard_remaining_ins = false);
virtual void DynamicAdjustReadersNum(int thread_num);
virtual void SetFleetSendSleepSeconds(int seconds);
// Gives callers direct, mutable access to the dataset's input record buffer.
virtual std::vector<T>& GetInputRecord() {
  return input_records_;
}

// Translates slot names into slot indices.
// Returns the positions, within data_feed_desc_.multi_slot_desc().slots(),
// of every slot whose name is contained in `str_slots`. Names that match no
// slot are ignored.
virtual std::set<uint16_t> GetSlotsIdx(
    const std::set<std::string>& str_slots) {
  std::set<uint16_t> slots_idx;

  // Bind by reference: taking the descriptor (or each name) by value would
  // copy the whole message just to compare strings.
  const auto& multi_slot_desc = data_feed_desc_.multi_slot_desc();
  const int slot_num = multi_slot_desc.slots_size();
  for (int i = 0; i < slot_num; ++i) {
    const auto& cur_slot = multi_slot_desc.slots(i).name();
    if (str_slots.count(cur_slot) != 0) {
      // Slot indices are stored as uint16_t; make the narrowing explicit.
      slots_idx.insert(static_cast<uint16_t>(i));
    }
  }

  return slots_idx;
}

protected:
virtual int ReceiveFromClient(int msg_type, int client_id,
Expand Down
Loading

0 comments on commit ec1db44

Please sign in to comment.