Skip to content

Commit

Permalink
fead: Truncate log.
Browse files Browse the repository at this point in the history
  • Loading branch information
dingxiaoshuai123 committed Apr 28, 2024
1 parent 32a227e commit 1f56258
Show file tree
Hide file tree
Showing 5 changed files with 56 additions and 46 deletions.
4 changes: 2 additions & 2 deletions cmake/braft.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ ExternalProject_Add(
extern_braft
${EXTERNAL_PROJECT_LOG_ARGS}
DEPENDS brpc
URL "https://github.com/pikiwidb/braft/archive/refs/heads/stable.zip"
URL_HASH SHA256=e73831f9768ac57d07f01ed81a11c8368e259c25315a960c29a6422f31f42fd1
GIT_REPOSITORY "https://github.com/pikiwidb/braft.git"
GIT_TAG master
PREFIX ${BRAFT_SOURCES_DIR}
UPDATE_COMMAND ""
CMAKE_ARGS -DCMAKE_CXX_COMPILER=${CMAKE_CXX_COMPILER}
Expand Down
2 changes: 1 addition & 1 deletion src/praft/praft.cc
Original file line number Diff line number Diff line change
Expand Up @@ -525,7 +525,7 @@ butil::Status PRaft::DoSnapshot(int64_t self_snapshot_index, bool is_sync) {
// TODO(panlei) Increase the self_log_index parameter
// TODO(panlei) Use the is_sync parameter to determine whether
// to use synchronous waiting.
node_->snapshot(&done);
node_->snapshot(&done, self_snapshot_index);
done.wait();
return done.status();
}
Expand Down
14 changes: 7 additions & 7 deletions src/praft/praft.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,13 @@ namespace pikiwidb {

#define RAFT_GROUPID_LEN 32

#define OK "+OK"
#define DATABASES_NUM "databases_num"
#define ROCKSDB_NUM "rocksdb_num"
#define ROCKSDB_VERSION "rocksdb_version"
#define WRONG_LEADER "-ERR wrong leader"
#define RAFT_GROUP_ID "raft_group_id:"
#define NOT_LEADER "Not leader"
constexpr const char* OK = "+OK";
constexpr const char* DATABASES_NUM = "databases_num";
constexpr const char* ROCKSDB_NUM = "rocksdb_num";
constexpr const char* ROCKSDB_VERSION = "rocksdb_version";
constexpr const char* WRONG_LEADER = "-ERR wrong leader";
constexpr const char* RAFT_GROUP_ID = "raft_group_id:";
constexpr const char* NOT_LEADER = "Not leader";

#define PRAFT PRaft::Instance()

Expand Down
42 changes: 42 additions & 0 deletions src/storage/src/log_index.cc
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,13 @@
#include <cinttypes>
#include <set>

#include "praft/praft.h"
#include "redis.h"

namespace storage {

using pikiwidb::PRaft;

rocksdb::Status storage::LogIndexOfColumnFamilies::Init(Redis *db) {
for (int i = 0; i < cf_.size(); i++) {
rocksdb::TablePropertiesCollection collection;
Expand Down Expand Up @@ -152,4 +155,43 @@ auto LogIndexTablePropertiesCollector::GetLargestLogIndexFromTableCollection(
: std::make_optional<LogIndexAndSequencePair>(max_flushed_log_index, seqno);
}

void LogIndexAndSequenceCollectorPurger::OnFlushCompleted(rocksdb::DB *db,
const rocksdb::FlushJobInfo &flush_job_info) {
cf_->SetFlushedLogIndex(flush_job_info.cf_id, collector_->FindAppliedLogIndex(flush_job_info.largest_seqno),
flush_job_info.largest_seqno);

auto [smallest_applied_log_index_cf, smallest_applied_log_index, smallest_flushed_log_index_cf,
smallest_flushed_log_index, smallest_flushed_seqno] = cf_->GetSmallestLogIndex(flush_job_info.cf_id);
collector_->Purge(smallest_applied_log_index);

if (smallest_flushed_log_index_cf != -1) {
cf_->SetFlushedLogIndexGlobal(smallest_flushed_log_index, smallest_flushed_seqno);
}
auto count = count_.fetch_add(1);

if (count % 10 == 0) {
PRAFT.DoSnapshot(smallest_flushed_log_index, false);
}

if (flush_job_info.cf_id == manul_flushing_cf_.load()) {
manul_flushing_cf_.store(-1);
}

auto flushing_cf = manul_flushing_cf_.load();
if (flushing_cf != -1 || !collector_->IsFlushPending()) {
return;
}

assert(flushing_cf == -1);

if (!manul_flushing_cf_.compare_exchange_strong(flushing_cf, smallest_flushed_log_index_cf)) {
return;
}

assert(manul_flushing_cf_.load() == smallest_flushed_log_index_cf);
rocksdb::FlushOptions flush_option;
flush_option.wait = false;
db->Flush(flush_option, column_families_->at(smallest_flushed_log_index_cf));
}

} // namespace storage
40 changes: 4 additions & 36 deletions src/storage/src/log_index.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include "rocksdb/listener.h"
#include "rocksdb/table_properties.h"
#include "rocksdb/types.h"

#include "storage/storage_define.h"

namespace storage {
Expand Down Expand Up @@ -110,6 +111,7 @@ class LogIndexOfColumnFamilies {
bool IsApplied(size_t cf_id, LogIndex cur_log_index) const {
return cur_log_index < cf_[cf_id].applied_index.GetLogIndex();
}

void Update(size_t cf_id, LogIndex cur_log_index, SequenceNumber cur_seqno) {
if (cf_[cf_id].flushed_index <= last_flush_index_ && cf_[cf_id].flushed_index == cf_[cf_id].applied_index) {
auto flush_log_index = std::max(cf_[cf_id].flushed_index.GetLogIndex(), last_flush_index_.GetLogIndex());
Expand All @@ -120,6 +122,7 @@ class LogIndexOfColumnFamilies {

cf_[cf_id].applied_index.SetLogIndexSeqnoPair(cur_log_index, cur_seqno);
}

bool IsPendingFlush() const;

size_t GetPendingFlushGap() const;
Expand Down Expand Up @@ -238,42 +241,7 @@ class LogIndexAndSequenceCollectorPurger : public rocksdb::EventListener {
LogIndexAndSequenceCollector *collector, LogIndexOfColumnFamilies *cf)
: column_families_(column_families), collector_(collector), cf_(cf) {}

void OnFlushCompleted(rocksdb::DB *db, const rocksdb::FlushJobInfo &flush_job_info) override {
cf_->SetFlushedLogIndex(flush_job_info.cf_id, collector_->FindAppliedLogIndex(flush_job_info.largest_seqno),
flush_job_info.largest_seqno);

auto [smallest_applied_log_index_cf, smallest_applied_log_index, smallest_flushed_log_index_cf,
smallest_flushed_log_index, smallest_flushed_seqno] = cf_->GetSmallestLogIndex(flush_job_info.cf_id);
collector_->Purge(smallest_applied_log_index);

if (smallest_flushed_log_index_cf != -1) {
cf_->SetFlushedLogIndexGlobal(smallest_flushed_log_index, smallest_flushed_seqno);
}
auto count = count_.fetch_add(1);

if (count % 10 == 0) {
// TODO(dingxiaoshuai) 主动触发 snapshot 截断日志
}
if (flush_job_info.cf_id == manul_flushing_cf_.load()) {
manul_flushing_cf_.store(-1);
}

auto flushing_cf = manul_flushing_cf_.load();
if (flushing_cf != -1 || !collector_->IsFlushPending()) {
return;
}

assert(flushing_cf == -1);

if (!manul_flushing_cf_.compare_exchange_strong(flushing_cf, smallest_flushed_log_index_cf)) {
return;
}

assert(manul_flushing_cf_.load() == smallest_flushed_log_index_cf);
rocksdb::FlushOptions flush_option;
flush_option.wait = false;
db->Flush(flush_option, column_families_->at(smallest_flushed_log_index_cf));
}
void OnFlushCompleted(rocksdb::DB *db, const rocksdb::FlushJobInfo &flush_job_info) override;

private:
std::vector<rocksdb::ColumnFamilyHandle *> *column_families_;
Expand Down

0 comments on commit 1f56258

Please sign in to comment.