Skip to content

Commit

Permalink
feat: add gtest for flush oldest cf
Browse files Browse the repository at this point in the history
  • Loading branch information
dingxiaoshuai123 committed Apr 26, 2024
1 parent c675ba9 commit c7b38d4
Show file tree
Hide file tree
Showing 7 changed files with 494 additions and 22 deletions.
4 changes: 2 additions & 2 deletions cmake/zlib.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ INCLUDE(ExternalProject)

SET(ZLIB_SOURCES_DIR ${THIRD_PARTY_PATH}/zlib)
SET(ZLIB_INSTALL_DIR ${THIRD_PARTY_PATH}/install/zlib)
SET(ZLIB_ROOT ${ZLIB_INSTALL_DIR} CACHE FILEPATH "zlib root directory." FORCE)
SET(ZLIB_INCLUDE_DIR "${ZLIB_INSTALL_DIR}/include" CACHE PATH "zlib include directory." FORCE)
# SET(ZLIB_ROOT ${ZLIB_INSTALL_DIR} CACHE FILEPATH "zlib root directory." FORCE)
# SET(ZLIB_INCLUDE_DIR "${ZLIB_INSTALL_DIR}/include" CACHE PATH "zlib include directory." FORCE)

ExternalProject_Add(
extern_zlib
Expand Down
2 changes: 2 additions & 0 deletions src/storage/include/storage/storage.h
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,8 @@ struct StorageOptions {
int db_id = 0;
AppendLogFunction append_log_function = nullptr;
uint32_t raft_timeout_s = std::numeric_limits<uint32_t>::max();
int64_t max_gap = 1000;
uint64_t mem_manager_size = 100000000;
Status ResetOptions(const OptionType& option_type, const std::unordered_map<std::string, std::string>& options_map);
};

Expand Down
25 changes: 20 additions & 5 deletions src/storage/src/log_index.cc
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@

namespace storage {

static constexpr int64_t kGapMax = 1000;

rocksdb::Status storage::LogIndexOfColumnFamilies::Init(Redis *db) {
for (int i = 0; i < cf_.size(); i++) {
rocksdb::TablePropertiesCollection collection;
Expand All @@ -35,10 +33,10 @@ rocksdb::Status storage::LogIndexOfColumnFamilies::Init(Redis *db) {
return Status::OK();
}

LogIndexOfColumnFamilies::SmallestIndexRes LogIndexOfColumnFamilies::GetSmallestLogIndex() const {
LogIndexOfColumnFamilies::SmallestIndexRes LogIndexOfColumnFamilies::GetSmallestLogIndex(int flush_cf) const {
SmallestIndexRes res;
for (int i = 0; i < cf_.size(); i++) {
if (cf_[i].flushed_index <= last_flush_index_ && cf_[i].flushed_index == cf_[i].applied_index) {
if (i != flush_cf && cf_[i].flushed_index == cf_[i].applied_index) {
continue;
}
auto applied_log_index = cf_[i].applied_index.GetLogIndex();
Expand Down Expand Up @@ -69,9 +67,26 @@ bool LogIndexOfColumnFamilies::IsPendingFlush() const {
}
auto iter_first = s.begin();
auto iter_last = s.end();
return *std::prev(iter_last) - *iter_first >= kGapMax;
return *std::prev(iter_last) - *iter_first >= max_gap_.load();
};

size_t LogIndexOfColumnFamilies::GetPendingFlushGap() const {
std::set<int> s;
for (int i = 0; i < kColumnFamilyNum; i++) {
s.insert(cf_[i].applied_index.GetLogIndex());
s.insert(cf_[i].flushed_index.GetLogIndex());
}
assert(!s.empty());
if (s.size() == 1) {
return false;
}
auto iter_first = s.begin();
auto iter_last = s.end();
return *std::prev(iter_last) - *iter_first;
};

// Out-of-class definition of the static gap threshold that IsPendingFlush()
// compares the log-index spread against. Starts at 1000; Storage::Open
// overwrites it with StorageOptions::max_gap. Being static, the value is
// shared by every LogIndexOfColumnFamilies instance.
std::atomic_int64_t LogIndexOfColumnFamilies::max_gap_ = 1000;

std::optional<LogIndexAndSequencePair> storage::LogIndexTablePropertiesCollector::ReadStatsFromTableProps(
const std::shared_ptr<const rocksdb::TableProperties> &table_props) {
const auto &user_properties = table_props->user_collected_properties;
Expand Down
53 changes: 42 additions & 11 deletions src/storage/src/log_index.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
#include <atomic>
#include <deque>
#include <functional>
#include <future>
#include <optional>
#include <shared_mutex>
#include <string_view>
Expand Down Expand Up @@ -62,6 +63,8 @@ struct LogIndexSeqnoPair {
// Equality is decided by the sequence number alone.
bool operator==(const LogIndexSeqnoPair &other) const {
  const auto lhs = seqno.load();
  const auto rhs = other.seqno.load();
  return lhs == rhs;
}

// Less-or-equal ordering, decided by the sequence number alone.
bool operator<=(const LogIndexSeqnoPair &other) const {
  const auto lhs = seqno.load();
  const auto rhs = other.seqno.load();
  return lhs <= rhs;
}

// Strict ordering, decided by the sequence number alone.
bool operator<(const LogIndexSeqnoPair &other) const {
  const auto lhs = seqno.load();
  const auto rhs = other.seqno.load();
  return lhs < rhs;
}
};

class LogIndexOfColumnFamilies {
Expand All @@ -83,10 +86,10 @@ class LogIndexOfColumnFamilies {
// Read the largest log index of each column family from all sst files
rocksdb::Status Init(Redis *db);

SmallestIndexRes GetSmallestLogIndex() const;
SmallestIndexRes GetSmallestLogIndex(int flush_cf) const;

// Records a flush result for column family `cf_id`.
//
// Every field touched here is only ever raised (std::max against the current
// value), never lowered. `cf_id` indexes cf_ directly with no bounds check.
//
// NOTE(review): each update is a separate load / max / store, not one atomic
// read-modify-write; presumably callers are serialized — TODO confirm.
void SetFlushedLogIndex(size_t cf_id, LogIndex log_index, SequenceNumber seqno) {
// NOTE(review): this line raises the *applied* log index from a flush
// notification — confirm it is intended alongside the flushed_index update.
cf_[cf_id].applied_index.log_index.store(std::max(cf_[cf_id].applied_index.log_index.load(), log_index));
// Raise the flushed log index and the flushed sequence number.
cf_[cf_id].flushed_index.log_index.store(std::max(cf_[cf_id].flushed_index.log_index.load(), log_index));
cf_[cf_id].flushed_index.seqno.store(std::max(cf_[cf_id].flushed_index.seqno.load(), seqno));
}

Expand Down Expand Up @@ -117,12 +120,20 @@ class LogIndexOfColumnFamilies {
}
bool IsPendingFlush() const;

size_t GetPendingFlushGap() const;

// Advances last_flush_index_ to the given flush position.
//
// Both components are taken as the maximum of the stored value and the
// argument, so the recorded position never moves backwards.
void SetLastFlushIndex(LogIndex flushed_logindex, SequenceNumber flushed_seqno) {
  const auto newest_log_index = std::max(last_flush_index_.GetLogIndex(), flushed_logindex);
  const auto newest_seqno = std::max(last_flush_index_.GetSequenceNumber(), flushed_seqno);
  last_flush_index_.SetLogIndexSeqnoPair(newest_log_index, newest_seqno);
}

// Returns a mutable reference to the stored last-flush (log index, seqno) pair.
LogIndexSeqnoPair &GetLastFlushIndex() { return last_flush_index_; }

// Returns a mutable reference to the applied/flushed index pair of column
// family `cf`. The index is not bounds-checked; `cf` must be < kColumnFamilyNum.
LogIndexPair &GetCFStatus(size_t cf) { return cf_[cf]; }

// Gap threshold read by IsPendingFlush(). Static, so it is shared by all
// instances of this class.
static std::atomic_int64_t max_gap_;

private:
std::array<LogIndexPair, kColumnFamilyNum> cf_;
LogIndexSeqnoPair last_flush_index_;
Expand All @@ -141,6 +152,16 @@ class LogIndexAndSequenceCollector {
// purge out dated log index after memtable flushed.
void Purge(LogIndex smallest_applied_log_index);

uint64_t GetSize() const {
std::shared_lock<std::shared_mutex> share_lock;
return list_.size();
}

std::deque<LogIndexAndSequencePair> &GetList() {
std::shared_lock<std::shared_mutex> share_lock;
return list_;
}

private:
uint64_t step_length_mask_ = 0;
mutable std::shared_mutex mutex_;
Expand Down Expand Up @@ -214,34 +235,44 @@ class LogIndexAndSequenceCollectorPurger : public rocksdb::EventListener {
flush_job_info.largest_seqno);

auto [smallest_applied_log_index_cf, smallest_applied_log_index, smallest_flushed_log_index_cf,
smallest_flushed_log_index, smallest_flushed_seqno] = cf_->GetSmallestLogIndex();
smallest_flushed_log_index, smallest_flushed_seqno] = cf_->GetSmallestLogIndex(flush_job_info.cf_id);
collector_->Purge(smallest_applied_log_index);

cf_->SetFlushedLogIndexGlobal(smallest_flushed_log_index, smallest_flushed_seqno);
if (smallest_flushed_log_index_cf != -1) {
cf_->SetFlushedLogIndexGlobal(smallest_flushed_log_index, smallest_flushed_seqno);
}
auto count = count_.fetch_add(1);

if (count % 10 == 0) {
// TODO(dingxiaoshuai): proactively trigger a snapshot to truncate the log.
}
if (flush_job_info.cf_id == manul_flushing_cf_.load()) {
manul_flushing_cf_.store(-1);
}

auto is_flushing = manul_flushing_.load();
if (is_flushing || count % kColumnFamilyNum != 0 || !cf_->IsPendingFlush()) {
auto flushing_cf = manul_flushing_cf_.load();
if (flushing_cf != -1 || !cf_->IsPendingFlush()) {
return;
}
if (!manul_flushing_.compare_exchange_strong(is_flushing, true)) {

assert(flushing_cf == -1);

if (!manul_flushing_cf_.compare_exchange_strong(flushing_cf, smallest_flushed_log_index_cf)) {
return;
}
// default: wait = true, allow_write_stall = false.

assert(manul_flushing_cf_.load() == smallest_flushed_log_index_cf);
rocksdb::FlushOptions flush_option;
flush_option.wait = false;
db->Flush(flush_option, column_families_->at(smallest_flushed_log_index_cf));
manul_flushing_.store(false);
}

private:
std::vector<rocksdb::ColumnFamilyHandle *> *column_families_;
LogIndexAndSequenceCollector *collector_ = nullptr;
LogIndexOfColumnFamilies *cf_ = nullptr;
std::atomic<uint64_t> count_ = 0;
std::atomic<bool> manul_flushing_ = false;
std::atomic_uint64_t count_ = 0;
std::atomic<size_t> manul_flushing_cf_ = -1;
};

} // namespace storage
4 changes: 4 additions & 0 deletions src/storage/src/redis.h
Original file line number Diff line number Diff line change
Expand Up @@ -344,6 +344,10 @@ class Redis {
return nullptr;
}

// Returns a mutable reference to this instance's log_index_of_all_cfs_ member.
LogIndexOfColumnFamilies& GetLogIndexOfColumnFamilies() { return log_index_of_all_cfs_; }

// Returns a mutable reference to this instance's log_index_collector_ member.
LogIndexAndSequenceCollector& GetCollector() { return log_index_collector_; }

private:
int32_t index_ = 0;
Storage* const storage_;
Expand Down
10 changes: 6 additions & 4 deletions src/storage/src/storage.cc
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,9 @@ Status Storage::Open(const StorageOptions& storage_options, const std::string& d
mkpath(db_path.c_str(), 0755);
db_instance_num_ = storage_options.db_instance_num;
// Temporarily set to 100000
storage_options.options.write_buffer_manager = std::make_shared<rocksdb::WriteBufferManager>(100000);
LogIndexOfColumnFamilies::max_gap_.store(storage_options.max_gap);
storage_options.options.write_buffer_manager =
std::make_shared<rocksdb::WriteBufferManager>(storage_options.mem_manager_size);
for (size_t index = 0; index < db_instance_num_; index++) {
insts_.emplace_back(std::make_unique<Redis>(this, index));
Status s = insts_.back()->Open(storage_options, AppendSubDirectory(db_path, index));
Expand Down Expand Up @@ -2335,6 +2337,8 @@ Status Storage::OnBinlogWrite(const pikiwidb::Binlog& log, LogIndex log_idx) {

rocksdb::WriteBatch batch;
bool is_finished_start = true;
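// Fetch the latest sequence number up front and increment it for each entry; does this operation need to be guaranteed to run serially?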
auto seqno = inst->GetDB()->GetLatestSequenceNumber();
for (const auto& entry : log.entries()) {
if (inst->IsRestarting() && inst->IsApplied(entry.cf_idx(), log_idx)) [[unlikely]] {
// If the starting phase is over, the log must not have been applied
Expand All @@ -2358,9 +2362,7 @@ Status Storage::OnBinlogWrite(const pikiwidb::Binlog& log, LogIndex log_idx) {
ERROR(msg);
return Status::Incomplete(msg);
}

auto seqno = inst->GetDB()->GetLatestSequenceNumber() + 1;
inst->UpdateAppliedLogIndexOfColumnFamily(entry.cf_idx(), log_idx, seqno);
inst->UpdateAppliedLogIndexOfColumnFamily(entry.cf_idx(), log_idx, ++seqno);
}
if (inst->IsRestarting() && is_finished_start) [[unlikely]] {
INFO("Redis {} finished start phase", inst->GetIndex());
Expand Down
Loading

0 comments on commit c7b38d4

Please sign in to comment.