Skip to content

Commit

Permalink
Revert to using the version with independent atomic variables.
Browse files Browse the repository at this point in the history
  • Loading branch information
dingxiaoshuai123 committed Apr 23, 2024
1 parent 0972a7b commit c1d6ea8
Show file tree
Hide file tree
Showing 2 changed files with 80 additions and 110 deletions.
71 changes: 27 additions & 44 deletions src/storage/src/log_index.cc
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,7 @@

#include <algorithm>
#include <cinttypes>
#include <mutex>
#include <set>
#include <shared_mutex>

#include "redis.h"

Expand All @@ -26,53 +24,42 @@ rocksdb::Status storage::LogIndexOfColumnFamilies::Init(Redis *db) {
}
auto res = LogIndexTablePropertiesCollector::GetLargestLogIndexFromTableCollection(collection);
if (res.has_value()) {
cf_[i].applied_log_index.log_index.store(res->GetAppliedLogIndex());
cf_[i].applied_log_index.seqno.store(res->GetSequenceNumber());
cf_[i].flushed_log_index.log_index.store(res->GetAppliedLogIndex());
cf_[i].flushed_log_index.seqno.store(res->GetSequenceNumber());
auto log_index = res->GetAppliedLogIndex();
auto sequence_number = res->GetSequenceNumber();
cf_[i].applied_index.SetLogIndexSeqnoPair(log_index, sequence_number);
cf_[i].flushed_index.SetLogIndexSeqnoPair(log_index, sequence_number);
}
}
return Status::OK();
}

std::tuple<int, LogIndex, SequenceNumber, int, LogIndex> LogIndexOfColumnFamilies::GetSmallestLogIndex() const {
auto smallest_applied_log_index = std::numeric_limits<LogIndex>::max();
auto smallest_flushed_log_index = std::numeric_limits<LogIndex>::max();
auto smallest_flushed_seqno = std::numeric_limits<SequenceNumber>::max();
auto smallest_applied_log_index_cf = -1;
auto smallest_flushed_log_index_cf = -1;
LogIndexOfColumnFamilies::SmallestIndexRes LogIndexOfColumnFamilies::GetSmallestLogIndex() const {
SmallestIndexRes res;
for (int i = 0; i < cf_.size(); i++) {
// 同一个 CF 以及不同的 CF 的 Flush 事件可能并发, 所有每一个 CF 的 Flushed LogIndex 和 Applied Flushed LogIndex
// 还可能向前推进.故最后找出的 min 值可能小于真正的 min 值, 但是不影响正确性. 考虑一种情况:某一个 cf
// 刚好把所有的数据 flush, 此时 Flushed LogIndex == Applied LogIndex, 但是不能将当前 cf 跳过. 所以还需要判断当前 cf
// 的 Flushed seq 与 last min flushed seq 的大小.
if (cf_[i].flushed_log_index.seqno <= last_min_flushed_seqno_.load() &&
cf_[i].flushed_log_index == cf_[i].flushed_log_index) {
if (cf_[i].flushed_index <= last_flush_index_ && cf_[i].flushed_index == cf_[i].applied_index) {
continue;
}
auto applied_log_index = cf_[i].applied_log_index.log_index.load();
auto flushed_log_index = cf_[i].flushed_log_index.log_index.load();
auto flushed_seqno = cf_[i].flushed_log_index.seqno.load();
// 此时会读到中间状态, 导致读到到 LogIndex 和 Seq 并不是真正的对应关系.
if (applied_log_index < smallest_applied_log_index) {
smallest_applied_log_index = applied_log_index;
smallest_applied_log_index_cf = i;
auto applied_log_index = cf_[i].applied_index.GetLogIndex();
auto flushed_log_index = cf_[i].flushed_index.GetLogIndex();
auto flushed_seqno = cf_[i].flushed_index.GetSequenceNumber();
if (applied_log_index < res.smallest_applied_log_index) {
res.smallest_applied_log_index = applied_log_index;
res.smallest_applied_log_index_cf = i;
}
if (flushed_log_index < smallest_flushed_log_index) {
smallest_flushed_log_index = flushed_log_index;
smallest_flushed_seqno = flushed_seqno;
smallest_flushed_log_index_cf = i;
if (flushed_log_index < res.smallest_flushed_log_index) {
res.smallest_flushed_log_index = flushed_log_index;
res.smallest_flushed_seqno = flushed_seqno;
res.smallest_flushed_log_index_cf = i;
}
}
return {smallest_flushed_log_index_cf, smallest_flushed_log_index, smallest_flushed_seqno,
smallest_applied_log_index_cf, smallest_applied_log_index};
return res;
}

bool LogIndexOfColumnFamilies::IsPendingFlush() const {
std::set<int> s;
for (int i = 0; i < kColumnFamilyNum; i++) {
s.insert(cf_[i].applied_log_index.log_index);
s.insert(cf_[i].flushed_log_index.log_index);
s.insert(cf_[i].applied_index.GetLogIndex());
s.insert(cf_[i].flushed_index.GetLogIndex());
}
assert(!s.empty());
if (s.size() == 1) {
Expand Down Expand Up @@ -122,10 +109,8 @@ LogIndex LogIndexAndSequenceCollector::FindAppliedLogIndex(SequenceNumber seqno)
}

void LogIndexAndSequenceCollector::Update(LogIndex smallest_applied_log_index, SequenceNumber smallest_flush_seqno) {
/*
If step length > 1, log index is sampled and sacrifice precision to save memory usage.
It means that extra applied log may be applied again on start stage.
*/
// If step length > 1, log index is sampled and sacrifice precision to save memory usage.
// It means that extra applied log may be applied again on start stage.
if ((smallest_applied_log_index & step_length_mask_) == 0) {
std::lock_guard gd(mutex_);
list_.emplace_back(smallest_applied_log_index, smallest_flush_seqno);
Expand All @@ -134,13 +119,11 @@ void LogIndexAndSequenceCollector::Update(LogIndex smallest_applied_log_index, S

// TODO(longfar): find the iterator which should be deleted and erase from begin to the iterator
void LogIndexAndSequenceCollector::Purge(LogIndex smallest_applied_log_index) {
/*
* The reason that we use smallest applied log index of all column families instead of smallest flushed log index is
* that the log index corresponding to the largest sequence number in the next flush must be greater than or equal to
* the smallest applied log index at this moment.
* So we just need to make sure that there is an element in the queue which is less than or equal to the smallest
* applied log index to ensure that we can find a correct log index while doing next flush.
*/
// The reason that we use smallest applied log index of all column families instead of smallest flushed log index is
// that the log index corresponding to the largest sequence number in the next flush must be greater than or equal to
// the smallest applied log index at this moment.
// So we just need to make sure that there is an element in the queue which is less than or equal to the smallest
// applied log index to ensure that we can find a correct log index while doing next flush.
std::lock_guard gd(mutex_);
if (list_.size() < 2) {
return;
Expand Down
119 changes: 53 additions & 66 deletions src/storage/src/log_index.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,80 +51,84 @@ struct LogIndexSeqnoPair {
std::atomic<LogIndex> log_index = 0;
std::atomic<SequenceNumber> seqno = 0;

LogIndexSeqnoPair() = default;

// Returns the currently stored log index.
LogIndex GetLogIndex() const { return log_index.load(); }

// Returns the currently stored sequence number.
SequenceNumber GetSequenceNumber() const { return seqno.load(); }

// Stores both fields. The two stores are separate atomic operations, so a
// concurrent reader may observe the new log index together with the old
// sequence number.
void SetLogIndexSeqnoPair(LogIndex l, SequenceNumber s) {
  log_index.store(l);
  seqno.store(s);
}

// Pairs are compared by sequence number only; the log index is not consulted.
bool operator==(const LogIndexSeqnoPair &other) const { return seqno.load() == other.seqno.load(); }

bool operator<=(const LogIndexSeqnoPair &other) const { return seqno.load() <= other.seqno.load(); }
};

class LogIndexOfColumnFamilies {
struct LogIndexPair {
LogIndexSeqnoPair applied_log_index; // newest record in memtable.
LogIndexSeqnoPair flushed_log_index; // newest record in sst file.
LogIndexSeqnoPair applied_index; // newest record in memtable.
LogIndexSeqnoPair flushed_index; // newest record in sst file.
};

struct SmallestIndexRes {
int smallest_applied_log_index_cf = -1;
LogIndex smallest_applied_log_index = std::numeric_limits<LogIndex>::max();

int smallest_flushed_log_index_cf = -1;
LogIndex smallest_flushed_log_index = std::numeric_limits<LogIndex>::max();
SequenceNumber smallest_flushed_seqno = std::numeric_limits<SequenceNumber>::max();
};

public:
// Read the largest log index of each column family from all sst files
rocksdb::Status Init(Redis *db);

std::tuple<int, LogIndex, SequenceNumber, int, LogIndex> GetSmallestLogIndex() const;
SmallestIndexRes GetSmallestLogIndex() const;

// LogIndex GetSmallestFlushedLogIndex() const {
// return GetSmallestLogIndex([](const LogIndexPair &p) { return p.flushed_log_index.load(); });
// }
void SetFlushedLogIndex(size_t cf_id, LogIndex log_index, SequenceNumber seqno) {
// 当同一个 cf 的两个 flush 同时发生的时候, 都会调用该函数, 非原子的更新两个值并的不会影响最终正确性.
// 但是会有中间状态. 但我们只需要保证 Flushed LogIndex 是正确的, Seq 只作为是否存在数据的判断
cf_[cf_id].flushed_log_index.log_index.store(std::max(cf_[cf_id].flushed_log_index.log_index.load(), log_index));
cf_[cf_id].flushed_log_index.seqno.store(std::max(cf_[cf_id].flushed_log_index.seqno.load(), seqno));
cf_[cf_id].applied_index.log_index.store(std::max(cf_[cf_id].applied_index.log_index.load(), log_index));
cf_[cf_id].flushed_index.seqno.store(std::max(cf_[cf_id].flushed_index.seqno.load(), seqno));
}

void SetFlushedLogIndexGlobal(LogIndex log_index, SequenceNumber seqno) {
SetLastMinFlushStatus(log_index, seqno);
// 对"不包含数据的 CF" 提升 Flushed 点位.
SetLastFlushIndex(log_index, seqno);
for (int i = 0; i < kColumnFamilyNum; i++) {
if (cf_[i].flushed_log_index == cf_[i].applied_log_index) {
// 只要该值能能够被提升, 那么当需要主动获取最小 Flushed 点位作为日志截断点时, 就不会收到一直不更新 CF 的影响.
cf_[i].flushed_log_index.log_index.store(std::max(cf_[i].flushed_log_index.log_index.load(), log_index));
cf_[i].flushed_log_index.seqno.store(std::max(cf_[i].flushed_log_index.seqno.load(), seqno));
if (cf_[i].flushed_index <= last_flush_index_) {
auto flush_log_index = std::max(cf_[i].flushed_index.GetLogIndex(), last_flush_index_.GetLogIndex());
auto flush_sequence_number =
std::max(cf_[i].flushed_index.GetSequenceNumber(), last_flush_index_.GetSequenceNumber());
cf_[i].flushed_index.SetLogIndexSeqnoPair(flush_log_index, flush_sequence_number);
}
}
}

bool IsApplied(size_t cf_id, LogIndex cur_log_index) const {
return cur_log_index < cf_[cf_id].applied_log_index.log_index.load();
return cur_log_index < cf_[cf_id].applied_index.GetLogIndex();
}
void Update(size_t cf_id, LogIndex cur_log_index, SequenceNumber seqno) {
if (cf_[cf_id].flushed_log_index == cf_[cf_id].applied_log_index) {
// 利用 max 保证最终的一致性. 但是会有中间状态.
cf_[cf_id].flushed_log_index.log_index.store(
std::max(cf_[cf_id].flushed_log_index.log_index.load(), last_min_flushed_logindex_.load()));
cf_[cf_id].flushed_log_index.seqno.store(
std::max(cf_[cf_id].flushed_log_index.seqno.load(), last_min_flushed_seqno_.load()));
void Update(size_t cf_id, LogIndex cur_log_index, SequenceNumber cur_seqno) {
if (cf_[cf_id].flushed_index <= last_flush_index_ && cf_[cf_id].flushed_index == cf_[cf_id].applied_index) {
auto flush_log_index = std::max(cf_[cf_id].flushed_index.GetLogIndex(), last_flush_index_.GetLogIndex());
auto flush_sequence_number =
std::max(cf_[cf_id].flushed_index.GetSequenceNumber(), last_flush_index_.GetSequenceNumber());
cf_[cf_id].flushed_index.SetLogIndexSeqnoPair(flush_log_index, flush_sequence_number);
}

cf_[cf_id].applied_log_index.log_index.store(
std::max(cf_[cf_id].applied_log_index.log_index.load(), cur_log_index));
cf_[cf_id].applied_log_index.seqno.store(std::max(cf_[cf_id].applied_log_index.seqno.load(), seqno));
cf_[cf_id].applied_index.SetLogIndexSeqnoPair(cur_log_index, cur_seqno);
}
bool IsPendingFlush() const;

void SetLastMinFlushStatus(LogIndex min_flushed_logindex, SequenceNumber min_flushed_seqno) {
last_min_flushed_logindex_.store(std::max(last_min_flushed_logindex_.load(), min_flushed_logindex));
last_min_flushed_seqno_.store(std::max(last_min_flushed_seqno_.load(), min_flushed_seqno));
void SetLastFlushIndex(LogIndex flushed_logindex, SequenceNumber flushed_seqno) {
auto lastest_flush_log_index = std::max(last_flush_index_.GetLogIndex(), flushed_logindex);
auto lastest_flush_sequence_number = std::max(last_flush_index_.GetSequenceNumber(), flushed_seqno);
last_flush_index_.SetLogIndexSeqnoPair(lastest_flush_log_index, lastest_flush_sequence_number);
}

private:
std::array<LogIndexPair, kColumnFamilyNum> cf_;
std::atomic<LogIndex> last_min_flushed_logindex_ = 0;
std::atomic<SequenceNumber> last_min_flushed_seqno_ = 0;
LogIndexSeqnoPair last_flush_index_;
};

class LogIndexAndSequenceCollector {
Expand Down Expand Up @@ -204,39 +208,25 @@ class LogIndexTablePropertiesCollectorFactory : public rocksdb::TablePropertiesC

class LogIndexAndSequenceCollectorPurger : public rocksdb::EventListener {
public:
// Stores the three pointers passed in; none of them is dereferenced here.
explicit LogIndexAndSequenceCollectorPurger(std::vector<rocksdb::ColumnFamilyHandle *> *column_families,
                                            LogIndexAndSequenceCollector *collector, LogIndexOfColumnFamilies *cf)
    : column_families_(column_families), collector_(collector), cf_(cf) {}

void OnFlushCompleted(rocksdb::DB *db, const rocksdb::FlushJobInfo &flush_job_info) override {
// 做三件事情.
// 1) 更新一些 CF 的 Flushed 点位.
// 首先对当前 Flush Completed CF 的 Flushed LogIndex 设置一个可能不准确的 LogIndex(当步长 > 1) 和一个准确的 SequenceNumber.
cf_->SetFlushedLogIndex(flush_job_info.cf_id, collector_->FindAppliedLogIndex(flush_job_info.largest_seqno),
flush_job_info.largest_seqno);
LogIndex smallest_applied_log_index, smallest_flushed_log_index;
SequenceNumber smallest_flushed_seqno;
int smallest_applied_log_index_cf, smallest_flushed_log_index_cf;
// 从所有 CF 中寻找 Min*数据. 在寻找的过程中跳过"没有数据的 CF", 只考虑有数据的 CF.
// 此时返回的 Seqno 可能不是跟 Flushed LogIndex 是不匹配的, 但是可以保证是所有 CF 中最小的,
// 但是我们对精确性要求不高, 可以忍受.
std::tie(smallest_flushed_log_index_cf, smallest_flushed_log_index, smallest_flushed_seqno,
smallest_applied_log_index_cf, smallest_applied_log_index) = cf_->GetSmallestLogIndex();

// 此时 LastMinFlushed 点位可能需要更新, 同时对 "不包含数据的 CF 提升 Flushed 点位"
cf_->SetFlushedLogIndexGlobal(smallest_flushed_log_index, smallest_flushed_seqno);

auto [smallest_applied_log_index_cf, smallest_applied_log_index, smallest_flushed_log_index_cf,
smallest_flushed_log_index, smallest_flushed_seqno] = cf_->GetSmallestLogIndex();

cf_->SetFlushedLogIndexGlobal(smallest_flushed_log_index, smallest_flushed_seqno);
auto count = count_.fetch_add(1);
// 2) 统计累计的数据量, 主动调用 snapshot, 保存快照.
std::unique_lock<std::mutex> Lock(mutex_);
total_size_ += (flush_job_info.table_properties.raw_value_size + flush_job_info.table_properties.raw_key_size);
if (total_size_ >= kMaxTotalSize) {
// TODO(dingxiaoshuai) 主动触发 snapshot
total_size_ = 0;

if (count % 10 == 0) {
// TODO(dingxiaoshuai) 主动触发 snapshot 截断日志
}
Lock.unlock();

// 3) 防止 collector 队列过长.
// 周期性的对队列长度进行检查, 如果存在长时间没有 flush 的 CF, 主动 Flush .
auto count = count_.fetch_add(1);
auto is_flushing = manul_flushing_.load();
if (is_flushing || count % kColumnFamilyNum != 0 || !cf_->IsPendingFlush()) {
return;
Expand All @@ -245,7 +235,6 @@ class LogIndexAndSequenceCollectorPurger : public rocksdb::EventListener {
return;
}
// default: wait = true, allow_write_stall = false.
// 同步做手动 Flush.
rocksdb::FlushOptions flush_option;
db->Flush(flush_option, column_families_->at(smallest_flushed_log_index_cf));
manul_flushing_.store(false);
Expand All @@ -257,8 +246,6 @@ class LogIndexAndSequenceCollectorPurger : public rocksdb::EventListener {
LogIndexOfColumnFamilies *cf_ = nullptr;
std::atomic<uint64_t> count_ = 0;
std::atomic<bool> manul_flushing_ = false;
std::mutex mutex_;
int64_t total_size_ = 0;
};

} // namespace storage

0 comments on commit c1d6ea8

Please sign in to comment.