feat(slog): apply and remove shared logs
empiredan authored and wangdan committed Sep 11, 2023
1 parent 3930997 commit 8718310
Showing 7 changed files with 112 additions and 108 deletions.
52 changes: 27 additions & 25 deletions src/replica/mutation_log.cpp
@@ -1403,36 +1403,37 @@ int mutation_log::garbage_collection(gpid gpid,

namespace {

struct stop_replica_gc_info
struct gc_summary_info
{
dsn::gpid pid;
int file_index = 0;
dsn::replication::decree decree_gap = 0;
int smallest_file_index = 0;
dsn::replication::decree max_decree_gap = 0;
dsn::replication::decree garbage_max_decree = 0;
dsn::replication::decree log_max_decree = 0;
dsn::replication::decree slog_max_decree = 0;

std::string to_string() const
{
return fmt::format("stop_replica_gc_info = [pid = {}, file_index = {}, decree_gap = {}, "
"garbage_max_decree = {}, log_max_decree = {}]",
pid,
file_index,
decree_gap,
garbage_max_decree,
log_max_decree);
return fmt::format(
"gc_summary_info = [pid = {}, smallest_file_index = {}, max_decree_gap = {}, "
"garbage_max_decree = {}, slog_max_decree = {}]",
pid,
smallest_file_index,
max_decree_gap,
garbage_max_decree,
slog_max_decree);
}

friend std::ostream &operator<<(std::ostream &os, const stop_replica_gc_info &stop_replica_gc)
friend std::ostream &operator<<(std::ostream &os, const gc_summary_info &gc_summary)
{
return os << stop_replica_gc.to_string();
return os << gc_summary.to_string();
}
};

bool can_gc_replica_slog(const dsn::replication::replica_log_info_map &slog_max_decrees,
const dsn::replication::log_file_ptr &file,
const dsn::gpid &pid,
const dsn::replication::replica_log_info &replica_durable_info,
stop_replica_gc_info &stop_replica_gc)
gc_summary_info &gc_summary)
{
const auto &garbage_max_decree = replica_durable_info.max_decree;
const auto &valid_start_offset = replica_durable_info.valid_start_offset;
@@ -1489,13 +1490,14 @@ bool can_gc_replica_slog(const dsn::replication::replica_log_info_map &slog_max_
garbage_max_decree);

auto gap = it->second.max_decree - garbage_max_decree;
if (file->index() < stop_replica_gc.file_index || gap > stop_replica_gc.decree_gap) {
// Record the max decree gap between the garbage max decree and the oldest log file.
stop_replica_gc.pid = pid;
stop_replica_gc.file_index = file->index();
stop_replica_gc.decree_gap = gap;
stop_replica_gc.garbage_max_decree = garbage_max_decree;
stop_replica_gc.log_max_decree = it->second.max_decree;
if (file->index() < gc_summary.smallest_file_index || gap > gc_summary.max_decree_gap) {
// Record the oldest slog file in this round of gc iteration, together with the max
// decree gap between the garbage max decree and the max decree of that slog file.
gc_summary.pid = pid;
gc_summary.smallest_file_index = file->index();
gc_summary.max_decree_gap = gap;
gc_summary.garbage_max_decree = garbage_max_decree;
gc_summary.slog_max_decree = it->second.max_decree;
}

return false;
@@ -1536,7 +1538,7 @@ void mutation_log::garbage_collection(const replica_log_info_map &replica_durabl
// iterating, `mark_it` would point to the newest file that could be deleted).
log_file_map::reverse_iterator mark_it;
std::set<gpid> kickout_replicas;
stop_replica_gc_info stop_replica_gc;
gc_summary_info gc_summary;
for (mark_it = files.rbegin(); mark_it != files.rend(); ++mark_it) {
const auto &file = mark_it->second;
CHECK_EQ(mark_it->first, file->index());
@@ -1553,7 +1555,7 @@ void mutation_log::garbage_collection(const replica_log_info_map &replica_durabl
file,
replica_durable_info.first,
replica_durable_info.second,
stop_replica_gc)) {
gc_summary)) {
// Log files before this file are useless for this replica,
// so from now on, this replica will not be considered any more.
kickout_replicas.insert(replica_durable_info.first);
@@ -1578,7 +1580,7 @@ void mutation_log::garbage_collection(const replica_log_info_map &replica_durabl
// There's no file that could be deleted.
LOG_INFO("gc_shared: no file can be deleted: {}, {}, prevent_gc_replicas = {}",
reserved_slog,
stop_replica_gc,
gc_summary,
prevent_gc_replicas.size());
return;
}
@@ -1591,7 +1593,7 @@ void mutation_log::garbage_collection(const replica_log_info_map &replica_durabl
LOG_INFO("gc_shared: deleted some files: {}, {}, {}, prevent_gc_replicas = {}",
reserved_slog,
slog_deletion,
stop_replica_gc,
gc_summary,
prevent_gc_replicas.size());
}

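For readers following the diff above, here is a minimal, self-contained C++ sketch of the decision implemented by can_gc_replica_slog() and the bookkeeping kept in gc_summary_info. The types gpid_t and replica_log_info and the helper can_gc_for_replica() are simplified stand-ins invented for illustration; they are not the real dsn/replication types, and the valid_start_offset check and logging are omitted.

#include <cstdint>
#include <iostream>
#include <map>
#include <tuple>

using decree = int64_t;

// Simplified stand-in for dsn::gpid.
struct gpid_t
{
    int app_id = 0;
    int partition_index = 0;
    bool operator<(const gpid_t &rhs) const
    {
        return std::tie(app_id, partition_index) < std::tie(rhs.app_id, rhs.partition_index);
    }
};

// Simplified stand-in for dsn::replication::replica_log_info.
struct replica_log_info
{
    decree max_decree = 0;
};

// Mirrors gc_summary_info in the diff above.
struct gc_summary_info
{
    gpid_t pid;
    int smallest_file_index = 0;
    decree max_decree_gap = 0;
    decree garbage_max_decree = 0;
    decree slog_max_decree = 0;
};

// Decide whether one slog file (identified by its index) is useless for one replica.
// When it is not, update `gc_summary` so that the oldest blocking slog file and the
// max decree gap can be reported, and the blocking replica can later be flushed.
bool can_gc_for_replica(int file_index,
                        const std::map<gpid_t, replica_log_info> &slog_max_decrees,
                        const gpid_t &pid,
                        decree garbage_max_decree,
                        gc_summary_info &gc_summary)
{
    const auto it = slog_max_decrees.find(pid);
    if (it == slog_max_decrees.end()) {
        // This slog file covers nothing of the replica: safe to gc for it.
        return true;
    }
    if (it->second.max_decree <= garbage_max_decree) {
        // Everything covered by this file for the replica is already durable: safe to gc.
        return true;
    }

    const decree gap = it->second.max_decree - garbage_max_decree;
    if (file_index < gc_summary.smallest_file_index || gap > gc_summary.max_decree_gap) {
        gc_summary.pid = pid;
        gc_summary.smallest_file_index = file_index;
        gc_summary.max_decree_gap = gap;
        gc_summary.garbage_max_decree = garbage_max_decree;
        gc_summary.slog_max_decree = it->second.max_decree;
    }
    return false;
}

int main()
{
    const std::map<gpid_t, replica_log_info> decrees_in_file = {{{1, 0}, {100}}, {{1, 1}, {80}}};
    gc_summary_info summary;

    // Replica (1, 0) has only made decree 60 durable, so slog file 2 cannot be gc-ed yet.
    std::cout << can_gc_for_replica(2, decrees_in_file, gpid_t{1, 0}, 60, summary) << std::endl; // 0
    std::cout << summary.smallest_file_index << " " << summary.max_decree_gap << std::endl;      // 2 40
    return 0;
}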
24 changes: 12 additions & 12 deletions src/replica/mutation_log.h
@@ -223,18 +223,15 @@ class mutation_log : public ref_counter
int64_t reserve_max_size,
int64_t reserve_max_time);

// TODO(wangdan): fix comments
// Garbage collection for shared log.
// `prevent_gc_replicas' will store replicas which prevent log files from being deleted
// for gc.
//
// garbage collection for shared log, returns reserved file count.
// `prevent_gc_replicas' will store replicas which prevent log files out of `file_count_limit'
// to be deleted.
// remove log files if satisfy:
// - for each replica "r":
// r is not in file.max_decree
// || file.max_decree[r] <= replica_durable_decrees[r].max_decree
// || file.end_offset[r] <= replica_durable_decrees[r].valid_start_offset
// - the current log file should not be removed
// thread safe
// Since slog has been deprecated, no new slog files will be created. Therefore, the
// target is to remove all of the existing slog files according to the advancing durable
// decree of each replica.
//
// Thread safe.
void garbage_collection(const replica_log_info_map &replica_durable_decrees,
std::set<gpid> &prevent_gc_replicas);

@@ -401,7 +398,10 @@ class mutation_log : public ref_counter
};

using log_file_map = std::map<int, log_file_ptr>;
void remove_obsolete_slog_files(const int largest_log_to_delete,

// Close and remove all of the slog files that are smaller (i.e. older) than the largest
// file index to be deleted.
void remove_obsolete_slog_files(const int largest_file_index_to_delete,
log_file_map &files,
reserved_slog_info &reserved_log,
slog_deletion_info &log_deletion);
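A hypothetical sketch of what the newly declared remove_obsolete_slog_files() might do, assuming the given index is inclusive; the real implementation (not shown in this diff) also fills the reserved_slog_info and slog_deletion_info bookkeeping and deletes the files from disk. The log_file type and the return value below are illustrative assumptions only.

#include <map>
#include <memory>
#include <vector>

// Minimal stand-ins; the real log_file_ptr is a ref-counted dsn type.
struct log_file
{
    explicit log_file(int idx) : index(idx) {}
    int index;
    void close() {}
};
using log_file_ptr = std::shared_ptr<log_file>;
using log_file_map = std::map<int, log_file_ptr>;

// Close and drop every slog file whose index does not exceed the given limit,
// returning the indexes that were removed. Bookkeeping and on-disk deletion are
// omitted in this sketch.
std::vector<int> remove_obsolete_slog_files(const int largest_file_index_to_delete,
                                            log_file_map &files)
{
    std::vector<int> deleted;
    auto it = files.begin();
    while (it != files.end() && it->first <= largest_file_index_to_delete) {
        it->second->close();
        deleted.push_back(it->first);
        it = files.erase(it);
    }
    return deleted;
}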
126 changes: 64 additions & 62 deletions src/replica/replica_stub.cpp
@@ -1778,69 +1778,52 @@ void replica_stub::on_gc_replica(replica_stub_ptr this_, gpid id)
}
}

void replica_stub::gc_slog(const replica_gc_map &rs)
void replica_stub::gc_slog(const replica_gc_info_map &replica_gc_map)
{
if (_log == nullptr) {
return;
}

// TODO(wangdan): fix comments
//
// gc shared prepare log
//
// Now that checkpoint is very important for gc, we must be able to trigger checkpoint when
// necessary.
// that is, we should be able to trigger memtable flush when necessary.
//
// How to trigger memtable flush?
// we add a parameter `is_emergency' in dsn_app_async_checkpoint() function, when set true,
// the undering storage system should flush memtable as soon as possiable.
//
// When to trigger memtable flush?
// 1. Using `[replication].checkpoint_max_interval_hours' option, we can set max interval time
// of two adjacent checkpoints; If the time interval is arrived, then emergency checkpoint
// will be triggered.
// 2. Using `[replication].log_shared_file_count_limit' option, we can set max file count of
// shared log; If the limit is exceeded, then emergency checkpoint will be triggered; Instead
// of triggering all replicas to do checkpoint, we will only trigger a few of necessary
// replicas which block garbage collection of the oldest log file.
//

replica_log_info_map replica_durable_decrees;
for (auto &kv : rs) {
replica_log_info ri;
auto &rep = kv.second.rep;
auto &plog = kv.second.plog;
for (auto &replica_gc : replica_gc_map) {
replica_log_info replica_log;
auto &rep = replica_gc.second.rep;
auto &plog = replica_gc.second.plog;
if (plog) {
// flush private log to update plog_max_commit_on_disk,
// and just flush once to avoid flushing infinitely
// Flush private log to update `plog_max_commit_on_disk`, and just flush once
// to avoid flushing infinitely.
plog->flush_once();

auto plog_max_commit_on_disk = plog->max_commit_on_disk();
ri.max_decree = std::min(kv.second.last_durable_decree, plog_max_commit_on_disk);

replica_log.max_decree =
std::min(replica_gc.second.last_durable_decree, plog_max_commit_on_disk);
LOG_INFO("gc_shared: gc condition for {}, status = {}, garbage_max_decree = {}, "
"last_durable_decree= {}, plog_max_commit_on_disk = {}",
rep->name(),
enum_to_string(kv.second.status),
ri.max_decree,
kv.second.last_durable_decree,
enum_to_string(replica_gc.second.status),
replica_log.max_decree,
replica_gc.second.last_durable_decree,
plog_max_commit_on_disk);
} else {
ri.max_decree = kv.second.last_durable_decree;
replica_log.max_decree = replica_gc.second.last_durable_decree;
LOG_INFO("gc_shared: gc condition for {}, status = {}, garbage_max_decree = {}, "
"last_durable_decree = {}",
rep->name(),
enum_to_string(kv.second.status),
ri.max_decree,
kv.second.last_durable_decree);
enum_to_string(replica_gc.second.status),
replica_log.max_decree,
replica_gc.second.last_durable_decree);
}
ri.valid_start_offset = kv.second.init_offset_in_shared_log;
replica_durable_decrees[kv.first] = ri;
replica_log.valid_start_offset = replica_gc.second.init_offset_in_shared_log;
replica_durable_decrees[replica_gc.first] = replica_log;
}

// Garbage collection for shared log files.
std::set<gpid> prevent_gc_replicas;
_log->garbage_collection(replica_durable_decrees, prevent_gc_replicas);
flush_replicas_for_slog_gc(rs, prevent_gc_replicas);

// Trigger checkpoints to flush memtables once some replicas are found to prevent slog
// files from being removed for gc.
flush_replicas_for_slog_gc(replica_gc_map, prevent_gc_replicas);

auto total_size = _log->total_size();
_counter_shared_log_size->set(total_size / (1024 * 1024));
@@ -1903,9 +1886,24 @@ void replica_stub::limit_flush_replicas_for_slog_gc(size_t prevent_gc_replica_co
std::min(log_shared_gc_flush_replicas_limit, _real_log_shared_gc_flush_replicas_limit << 1);
}

void replica_stub::flush_replicas_for_slog_gc(const replica_gc_map &rs,
void replica_stub::flush_replicas_for_slog_gc(const replica_gc_info_map &replica_gc_map,
const std::set<gpid> &prevent_gc_replicas)
{
// Trigger checkpoints to flush memtables once some replicas are found to prevent slog files
// from being removed for gc.
//
// How to trigger memtable flush?
// A parameter `is_emergency' was added to the `replica::background_async_checkpoint()`
// function; once it is set to true, the underlying storage engine would flush memtables as
// soon as possible.
//
// When is memtable flush triggered?
// 1. After a fixed interval (specified by the `[replication].gc_interval_ms` option), try to
// find the replicas that prevent slog files from being removed for gc; if there are any, those
// slog files would be removed "gradually" ("gradually" means that the number of the replicas
// whose memtables are submitted to the storage engine to be flushed in one round is limited).
// 2. The `[replication].checkpoint_max_interval_hours' option specifies the max interval
// between two adjacent checkpoints.

if (prevent_gc_replicas.empty()) {
return;
}
@@ -1931,8 +1929,8 @@ void replica_stub::flush_replicas_for_slog_gc(const replica_gc_map &rs,

i = 0;
for (const auto &pid : prevent_gc_replicas) {
const auto &r = rs.find(pid);
if (r == rs.end()) {
const auto &replica_gc = replica_gc_map.find(pid);
if (replica_gc == replica_gc_map.end()) {
continue;
}

@@ -1952,35 +1950,39 @@ void replica_stub::flush_replicas_for_slog_gc(const replica_gc_map &rs,
continue;
}

tasking::enqueue(LPC_PER_REPLICA_CHECKPOINT_TIMER,
r->second.rep->tracker(),
std::bind(&replica_stub::trigger_checkpoint, this, r->second.rep, true),
pid.thread_hash(),
std::chrono::milliseconds(rand::next_u32(0, FLAGS_gc_interval_ms / 2)));
tasking::enqueue(
LPC_PER_REPLICA_CHECKPOINT_TIMER,
replica_gc->second.rep->tracker(),
std::bind(&replica_stub::trigger_checkpoint, this, replica_gc->second.rep, true),
pid.thread_hash(),
std::chrono::milliseconds(rand::next_u32(0, FLAGS_gc_interval_ms / 2)));
}
}

void replica_stub::on_gc()
{
uint64_t start = dsn_now_ns();

replica_gc_map rs;
replica_gc_info_map replica_gc_map;
{
zauto_read_lock l(_replicas_lock);
// collect info in lock to prevent the case that the replica is closed in replica::close()
for (const auto &kv : _replicas) {
const replica_ptr &rep = kv.second;
auto &info = rs[kv.first];
info.rep = rep;
info.status = rep->status();
info.plog = rep->private_log();
info.last_durable_decree = rep->last_durable_decree();
info.init_offset_in_shared_log = rep->get_app()->init_info().init_offset_in_shared_log;
// A replica is removed from _replicas before it is closed by replica::close().
// Thus it's safe to use the replica after fetching its ref pointer from _replicas.
for (const auto &rep_pair : _replicas) {
const replica_ptr &rep = rep_pair.second;

auto &replica_gc = replica_gc_map[rep_pair.first];
replica_gc.rep = rep;
replica_gc.status = rep->status();
replica_gc.plog = rep->private_log();
replica_gc.last_durable_decree = rep->last_durable_decree();
replica_gc.init_offset_in_shared_log =
rep->get_app()->init_info().init_offset_in_shared_log;
}
}

LOG_INFO("start to garbage collection, replica_count = {}", rs.size());
gc_slog(rs);
LOG_INFO("start to garbage collection, replica_count = {}", replica_gc_map.size());
gc_slog(replica_gc_map);

// statistic learning info
uint64_t learning_count = 0;
@@ -1996,7 +1998,7 @@ void replica_stub::on_gc()
uint64_t splitting_max_duration_time_ms = 0;
uint64_t splitting_max_async_learn_time_ms = 0;
uint64_t splitting_max_copy_file_size = 0;
for (auto &kv : rs) {
for (auto &kv : replica_gc_map) {
replica_ptr &rep = kv.second.rep;
if (rep->status() == partition_status::PS_POTENTIAL_SECONDARY) {
learning_count++;
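The "gradual" flushing described in the comments above is governed by limit_flush_replicas_for_slog_gc(), only the tail of which is visible in this diff. The following C++ sketch shows one plausible throttling policy consistent with that fragment: the per-round flush cap doubles (up to the configured [replication].log_shared_gc_flush_replicas_limit) whenever the previous round reduced the number of blocking replicas. The struct and its field names are illustrative assumptions, not the actual implementation.

#include <algorithm>
#include <cstddef>
#include <iostream>

// Illustrative throttle: caps how many replicas get their checkpoints triggered per gc
// round, and doubles the cap (bounded by the configured limit) after a round that made
// progress, matching the `std::min(limit, real_limit << 1)` fragment shown above.
struct slog_gc_flush_throttle
{
    size_t configured_limit;                    // e.g. [replication].log_shared_gc_flush_replicas_limit
    size_t real_limit;                          // current effective per-round limit
    size_t last_prevent_gc_replica_count = 0;   // blocking replicas seen in the previous round

    size_t next_limit(size_t prevent_gc_replica_count)
    {
        if (prevent_gc_replica_count < last_prevent_gc_replica_count) {
            // Progress was made in the last round: allow more flushes, capped by the config.
            real_limit = std::min(configured_limit, real_limit << 1);
        }
        last_prevent_gc_replica_count = prevent_gc_replica_count;
        return std::min(real_limit, prevent_gc_replica_count);
    }
};

int main()
{
    slog_gc_flush_throttle throttle{/*configured_limit=*/64, /*real_limit=*/8};
    std::cout << throttle.next_limit(100) << std::endl; // 8: start slowly
    std::cout << throttle.next_limit(60) << std::endl;  // 16: progress made, limit doubled
    std::cout << throttle.next_limit(60) << std::endl;  // 16: no progress, limit kept
    return 0;
}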
8 changes: 4 additions & 4 deletions src/replica/replica_stub.h
@@ -360,18 +360,18 @@ class replica_stub : public serverlet<replica_stub>, public ref_counter
replica_life_cycle get_replica_life_cycle(gpid id);
void on_gc_replica(replica_stub_ptr this_, gpid id);

struct gc_info
struct replica_gc_info
{
replica_ptr rep;
partition_status::type status;
mutation_log_ptr plog;
decree last_durable_decree;
int64_t init_offset_in_shared_log;
};
using replica_gc_map = std::unordered_map<gpid, gc_info>;
void gc_slog(const replica_gc_map &rs);
using replica_gc_info_map = std::unordered_map<gpid, replica_gc_info>;
void gc_slog(const replica_gc_info_map &replica_gc_map);
void limit_flush_replicas_for_slog_gc(size_t prevent_gc_replica_count);
void flush_replicas_for_slog_gc(const replica_gc_map &rs,
void flush_replicas_for_slog_gc(const replica_gc_info_map &replica_gc_map,
const std::set<gpid> &prevent_gc_replicas);

void response_client(gpid id,
6 changes: 3 additions & 3 deletions src/replica/test/mutation_log_test.cpp
@@ -809,9 +809,9 @@ TEST_P(GcSlogFlushFeplicasTest, FlushReplicas)
last_limit,
expected_flush_replicas) = GetParam();

replica_stub::replica_gc_map rs;
replica_stub::replica_gc_info_map replica_gc_map;
for (const auto &r : prevent_gc_replicas) {
rs.emplace(r, replica_stub::gc_info());
replica_gc_map.emplace(r, replica_stub::replica_gc_info());
}

const auto reserved_log_shared_gc_flush_replicas_limit =
@@ -825,7 +825,7 @@ TEST_P(GcSlogFlushFeplicasTest, FlushReplicas)
stub._last_prevent_gc_replica_count = last_prevent_gc_replica_count;
stub._real_log_shared_gc_flush_replicas_limit = last_limit;

stub.flush_replicas_for_slog_gc(rs, prevent_gc_replicas);
stub.flush_replicas_for_slog_gc(replica_gc_map, prevent_gc_replicas);
EXPECT_EQ(expected_flush_replicas, stub._mock_flush_replicas_for_test);

dsn::fail::teardown();
2 changes: 1 addition & 1 deletion src/server/config.ini
@@ -278,7 +278,7 @@ stateful = true
plog_force_flush = false

log_shared_file_size_mb = 128
log_shared_file_count_limit = 100
log_shared_gc_flush_replicas_limit = 64
log_shared_batch_buffer_kb = 0
log_shared_force_flush = false
log_shared_pending_size_throttling_threshold_kb = 0