diff --git a/src/storage/src/log_index.cc b/src/storage/src/log_index.cc index 3bcadde22..28d980bc9 100644 --- a/src/storage/src/log_index.cc +++ b/src/storage/src/log_index.cc @@ -22,16 +22,6 @@ rocksdb::Status storage::LogIndexOfCF::Init(Redis *db) { if (!s.ok()) { return s; } - // LogIndex max_applied_log_index{}; - // LogIndex max_flushed_log_index{}; - // for (const auto &[_, props] : collection) { - // assert(props->column_family_id == i); - // auto res = LogIndexTablePropertiesCollector::ReadStatsFromTableProps(props); - // if (res.has_value()) { - // max_applied_log_index = std::max(max_applied_log_index, res->GetAppliedLogIndex()); - // max_flushed_log_index = std::max(max_flushed_log_index, res->GetAppliedLogIndex()); - // } - // } auto res = LogIndexTablePropertiesCollector::GetLargestLogIndexFromTableCollection(collection); if (res.has_value()) { cf_[i].applied_log_index.store(res->GetAppliedLogIndex()); diff --git a/src/storage/src/log_index.h b/src/storage/src/log_index.h index 49fc2b8b8..189f7f1ee 100644 --- a/src/storage/src/log_index.h +++ b/src/storage/src/log_index.h @@ -87,11 +87,6 @@ class LogIndexAndSequenceCollector { void Update(LogIndex smallest_applied_log_index, SequenceNumber smallest_flush_seqno); // purge out dated log index after memtable flushed. - // void Purge(LogIndex smallest_applied_log_index, LogIndex smallest_flushed_log_index) { - // std::lock_guard guard(mutex_); - // Purge(list_, smallest_applied_log_index, smallest_flushed_log_index); - // // Purge(skip_list_, smallest_applied_log_index, smallest_flushed_log_index); - // } void Purge(LogIndex smallest_flushed_log_index); private: @@ -139,7 +134,6 @@ class LogIndexTablePropertiesCollector : public rocksdb::TablePropertiesCollecto const LogIndexAndSequenceCollector &collector_; SequenceNumber largest_seqno_ = 0; mutable LogIndex cache_{-1}; - // SequenceNumber smallest_seqno_ = 0; }; class LogIndexTablePropertiesCollectorFactory : public rocksdb::TablePropertiesCollectorFactory { @@ -165,10 +159,6 @@ class LogIndexAndSequenceCollectorPurger : public rocksdb::EventListener { void OnFlushCompleted(rocksdb::DB *db, const rocksdb::FlushJobInfo &flush_job_info) override { cf_->SetFlushedLogIndex(flush_job_info.cf_id, collector_->FindAppliedLogIndex(flush_job_info.largest_seqno)); - // auto smallest_applied_log_index = cf_->GetSmallestAppliedLogIndex(); - // auto smallest_flushed_log_index = cf_->GetSmallestFlushedLogIndex(); - // collector_->Purge(smallest_applied_log_index, smallest_flushed_log_index); - auto smallest_flushed_log_index = cf_->GetSmallestFlushedLogIndex(); collector_->Purge(smallest_flushed_log_index); } diff --git a/src/storage/src/redis.cc b/src/storage/src/redis.cc index 46ad7c27f..001f49363 100644 --- a/src/storage/src/redis.cc +++ b/src/storage/src/redis.cc @@ -141,6 +141,7 @@ Status Redis::Open(const StorageOptions& storage_options, const std::string& db_ zset_data_cf_ops.table_factory.reset(rocksdb::NewBlockBasedTableFactory(zset_data_cf_table_ops)); zset_score_cf_ops.table_factory.reset(rocksdb::NewBlockBasedTableFactory(zset_score_cf_table_ops)); + // Add log index table property collector factory to each column family if (append_log_function_) { ADD_TABLE_PROPERTY_COLLECTOR_FACTORY(string); ADD_TABLE_PROPERTY_COLLECTOR_FACTORY(hash_meta); @@ -153,6 +154,9 @@ Status Redis::Open(const StorageOptions& storage_options, const std::string& db_ ADD_TABLE_PROPERTY_COLLECTOR_FACTORY(zset_data); ADD_TABLE_PROPERTY_COLLECTOR_FACTORY(zset_score); } + // Add a listener on flush to purge log index collector + db_ops.listeners.push_back( + std::make_shared(&log_index_collector_, &log_index_of_all_cfs_)); std::vector column_families; column_families.emplace_back(rocksdb::kDefaultColumnFamilyName, string_cf_ops);