Skip to content

Commit

Permalink
feat: add log index purger event listener to rocksdb
Browse files Browse the repository at this point in the history
  • Loading branch information
longfar-ncy committed Apr 10, 2024
1 parent 5b622b3 commit 834534d
Show file tree
Hide file tree
Showing 3 changed files with 4 additions and 20 deletions.
10 changes: 0 additions & 10 deletions src/storage/src/log_index.cc
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,6 @@ rocksdb::Status storage::LogIndexOfCF::Init(Redis *db) {
if (!s.ok()) {
return s;
}
// LogIndex max_applied_log_index{};
// LogIndex max_flushed_log_index{};
// for (const auto &[_, props] : collection) {
// assert(props->column_family_id == i);
// auto res = LogIndexTablePropertiesCollector::ReadStatsFromTableProps(props);
// if (res.has_value()) {
// max_applied_log_index = std::max(max_applied_log_index, res->GetAppliedLogIndex());
// max_flushed_log_index = std::max(max_flushed_log_index, res->GetAppliedLogIndex());
// }
// }
auto res = LogIndexTablePropertiesCollector::GetLargestLogIndexFromTableCollection(collection);
if (res.has_value()) {
cf_[i].applied_log_index.store(res->GetAppliedLogIndex());
Expand Down
10 changes: 0 additions & 10 deletions src/storage/src/log_index.h
Original file line number Diff line number Diff line change
Expand Up @@ -87,11 +87,6 @@ class LogIndexAndSequenceCollector {
void Update(LogIndex smallest_applied_log_index, SequenceNumber smallest_flush_seqno);

// purge out dated log index after memtable flushed.
// void Purge(LogIndex smallest_applied_log_index, LogIndex smallest_flushed_log_index) {
// std::lock_guard<std::mutex> guard(mutex_);
// Purge(list_, smallest_applied_log_index, smallest_flushed_log_index);
// // Purge(skip_list_, smallest_applied_log_index, smallest_flushed_log_index);
// }
void Purge(LogIndex smallest_flushed_log_index);

private:
Expand Down Expand Up @@ -139,7 +134,6 @@ class LogIndexTablePropertiesCollector : public rocksdb::TablePropertiesCollecto
const LogIndexAndSequenceCollector &collector_;
SequenceNumber largest_seqno_ = 0;
mutable LogIndex cache_{-1};
// SequenceNumber smallest_seqno_ = 0;
};

class LogIndexTablePropertiesCollectorFactory : public rocksdb::TablePropertiesCollectorFactory {
Expand All @@ -165,10 +159,6 @@ class LogIndexAndSequenceCollectorPurger : public rocksdb::EventListener {

void OnFlushCompleted(rocksdb::DB *db, const rocksdb::FlushJobInfo &flush_job_info) override {
cf_->SetFlushedLogIndex(flush_job_info.cf_id, collector_->FindAppliedLogIndex(flush_job_info.largest_seqno));
// auto smallest_applied_log_index = cf_->GetSmallestAppliedLogIndex();
// auto smallest_flushed_log_index = cf_->GetSmallestFlushedLogIndex();
// collector_->Purge(smallest_applied_log_index, smallest_flushed_log_index);

auto smallest_flushed_log_index = cf_->GetSmallestFlushedLogIndex();
collector_->Purge(smallest_flushed_log_index);
}
Expand Down
4 changes: 4 additions & 0 deletions src/storage/src/redis.cc
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,7 @@ Status Redis::Open(const StorageOptions& storage_options, const std::string& db_
zset_data_cf_ops.table_factory.reset(rocksdb::NewBlockBasedTableFactory(zset_data_cf_table_ops));
zset_score_cf_ops.table_factory.reset(rocksdb::NewBlockBasedTableFactory(zset_score_cf_table_ops));

// Add log index table property collector factory to each column family
if (append_log_function_) {
ADD_TABLE_PROPERTY_COLLECTOR_FACTORY(string);
ADD_TABLE_PROPERTY_COLLECTOR_FACTORY(hash_meta);
Expand All @@ -153,6 +154,9 @@ Status Redis::Open(const StorageOptions& storage_options, const std::string& db_
ADD_TABLE_PROPERTY_COLLECTOR_FACTORY(zset_data);
ADD_TABLE_PROPERTY_COLLECTOR_FACTORY(zset_score);
}
// Add a listener on flush to purge log index collector
db_ops.listeners.push_back(
std::make_shared<LogIndexAndSequenceCollectorPurger>(&log_index_collector_, &log_index_of_all_cfs_));

std::vector<rocksdb::ColumnFamilyDescriptor> column_families;
column_families.emplace_back(rocksdb::kDefaultColumnFamilyName, string_cf_ops);
Expand Down

0 comments on commit 834534d

Please sign in to comment.