Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KVStore: decouple flush region and CompactLog with a new FFI fn_try_flush_data #5283

Merged
merged 22 commits into from
Jul 7, 2022
Merged
88 changes: 60 additions & 28 deletions dbms/src/Storages/Transaction/KVStore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -327,6 +327,64 @@ void KVStore::persistRegion(const Region & region, const RegionTaskLock & region
LOG_FMT_DEBUG(log, "Persist {} done", region.toString(false));
}

/// Reports whether the region identified by `region_id` currently qualifies for a flush.
/// This is a check only: `flush_if_possible` is passed as false, so nothing is flushed
/// or persisted by this call.
bool KVStore::needFlushRegionData(UInt64 region_id, TMTContext & tmt)
{
    // The task lock is generated before the region is looked up and is held for the whole check.
    auto task_lock = region_manager.genRegionTaskLock(region_id);
    return canFlushRegionDataImpl(
        getRegion(region_id),
        /* flush_if_possible */ false,
        /* try_until_succeed */ false,
        tmt,
        task_lock);
}

/// Checks whether the region identified by `region_id` qualifies for a flush and, if so,
/// attempts the flush (`flush_if_possible` is passed as true).
/// `try_until_succeed` is forwarded unchanged to the flush implementation.
/// Returns the result of `canFlushRegionDataImpl`.
bool KVStore::tryFlushRegionData(UInt64 region_id, bool try_until_succeed, TMTContext & tmt)
{
    // The task lock is generated before the region is looked up and is held for the whole attempt.
    auto task_lock = region_manager.genRegionTaskLock(region_id);
    return canFlushRegionDataImpl(
        getRegion(region_id),
        /* flush_if_possible */ true,
        try_until_succeed,
        tmt,
        task_lock);
}

bool KVStore::canFlushRegionDataImpl(const RegionPtr & curr_region_ptr, UInt8 flush_if_possible, bool try_until_succeed, TMTContext & tmt, const RegionTaskLock & region_task_lock)
{
if (curr_region_ptr == nullptr)
{
throw Exception(fmt::format("region not found when trying flush", ErrorCodes::LOGICAL_ERROR));
}
auto & curr_region = *curr_region_ptr;

auto [rows, size_bytes] = curr_region.getApproxMemCacheInfo();

LOG_FMT_DEBUG(log, "{} approx mem cache info: rows {}, bytes {}", curr_region.toString(false), rows, size_bytes);

bool can_flush = false;
if (rows >= region_compact_log_min_rows.load(std::memory_order_relaxed)
|| size_bytes >= region_compact_log_min_bytes.load(std::memory_order_relaxed))
{
// if rows or bytes more than threshold, flush cache and persist mem data.
can_flush = true;
}
else
{
// if there is little data in mem, wait until time interval reached threshold.
// use random period so that lots of regions will not be persisted at same time.
auto compact_log_period = std::rand() % region_compact_log_period.load(std::memory_order_relaxed); // NOLINT
can_flush = !(curr_region.lastCompactLogTime() + Seconds{compact_log_period} > Clock::now());
}
if (can_flush && flush_if_possible)
{
LOG_FMT_DEBUG(log, "{} flush region due to can_flush_data", curr_region.toString(false));
if (tryFlushRegionCacheInStorage(tmt, curr_region, log, try_until_succeed))
{
persistRegion(curr_region, region_task_lock, "compact raft log");
curr_region.markCompactLog();
curr_region.cleanApproxMemCacheInfo();
return true;
}
else
{
return false;
}
}
return can_flush;
}

EngineStoreApplyRes KVStore::handleUselessAdminRaftCmd(
raft_cmdpb::AdminCmdType cmd_type,
UInt64 curr_region_id,
Expand Down Expand Up @@ -360,39 +418,13 @@ EngineStoreApplyRes KVStore::handleUselessAdminRaftCmd(
}
else
{
auto [rows, size_bytes] = curr_region.getApproxMemCacheInfo();

LOG_FMT_DEBUG(log, "{} approx mem cache info: rows {}, bytes {}", curr_region.toString(false), rows, size_bytes);

if (rows >= region_compact_log_min_rows.load(std::memory_order_relaxed)
|| size_bytes >= region_compact_log_min_bytes.load(std::memory_order_relaxed))
{
// if rows or bytes more than threshold, try to flush cache and persist mem data.
return true;
}
else
{
// if there is little data in mem, wait until time interval reached threshold.
// use random period so that lots of regions will not be persisted at same time.
auto compact_log_period = std::rand() % region_compact_log_period.load(std::memory_order_relaxed); // NOLINT
return !(curr_region.lastCompactLogTime() + Seconds{compact_log_period} > Clock::now());
}
return canFlushRegionDataImpl(curr_region_ptr, true, /* try_until_succeed */ false, tmt, region_task_lock);
}
};

if (check_sync_log())
{
if (tryFlushRegionCacheInStorage(tmt, curr_region, log, /* try_until_succeed */ false))
{
persistRegion(curr_region, region_task_lock, "compact raft log");
curr_region.markCompactLog();
curr_region.cleanApproxMemCacheInfo();
return EngineStoreApplyRes::Persist;
}
else
{
return EngineStoreApplyRes::None;
}
return EngineStoreApplyRes::Persist;
}
return EngineStoreApplyRes::None;
}
Expand Down
8 changes: 8 additions & 0 deletions dbms/src/Storages/Transaction/KVStore.h
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,9 @@ class KVStore final : private boost::noncopyable
TMTContext & tmt);
EngineStoreApplyRes handleWriteRaftCmd(const WriteCmdsView & cmds, UInt64 region_id, UInt64 index, UInt64 term, TMTContext & tmt);

bool needFlushRegionData(UInt64 region_id, TMTContext & tmt);
bool tryFlushRegionData(UInt64 region_id, bool try_until_succeed, TMTContext & tmt);

void handleApplySnapshot(metapb::Region && region, uint64_t peer_id, const SSTViewVec, uint64_t index, uint64_t term, TMTContext & tmt);

std::vector<UInt64> /* */ preHandleSnapshotToFiles(
Expand Down Expand Up @@ -219,6 +222,11 @@ class KVStore final : private boost::noncopyable
UInt64 term,
TMTContext & tmt);

/// Note that if flush_if_possible is set to false, we only check whether a flush is allowed by the rows/bytes/interval thresholds.
/// It does not check whether a flush would eventually succeed.
/// In other words, even after this check returns true, `canFlushRegionDataImpl(flush_if_possible=true)` can still return false.
bool canFlushRegionDataImpl(const RegionPtr & curr_region_ptr, UInt8 flush_if_possible, bool try_until_succeed, TMTContext & tmt, const RegionTaskLock & region_task_lock);

void persistRegion(const Region & region, const RegionTaskLock & region_task_lock, const char * caller);
void releaseReadIndexWorkers();
void handleDestroy(UInt64 region_id, TMTContext & tmt, const KVStoreTaskLock &);
Expand Down
28 changes: 28 additions & 0 deletions dbms/src/Storages/Transaction/ProxyFFI.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,34 @@ EngineStoreApplyRes HandleAdminRaftCmd(
}
}

/// FFI entry point: asks the KVStore whether the given region needs its data flushed.
/// Returns the boolean result of `KVStore::needFlushRegionData` as a `uint8_t`.
/// Any exception is logged and the process is terminated via `exit(-1)`.
uint8_t NeedFlushData(EngineStoreServerWrap * server, uint64_t region_id)
{
    try
    {
        auto & tmt_context = *server->tmt;
        return tmt_context.getKVStore()->needFlushRegionData(region_id, tmt_context);
    }
    catch (...)
    {
        // Exceptions are not propagated to the caller: log and terminate.
        tryLogCurrentException(__PRETTY_FUNCTION__);
        exit(-1);
    }
}

/// FFI entry point: asks the KVStore to try flushing the data of the given region.
/// `until_succeed` is forwarded as the `try_until_succeed` flag of `KVStore::tryFlushRegionData`.
/// Returns the boolean result of that call as a `uint8_t`.
/// Any exception is logged and the process is terminated via `exit(-1)`.
uint8_t TryFlushData(EngineStoreServerWrap * server, uint64_t region_id, uint8_t until_succeed)
{
    try
    {
        auto & tmt_context = *server->tmt;
        return tmt_context.getKVStore()->tryFlushRegionData(region_id, until_succeed, tmt_context);
    }
    catch (...)
    {
        // Exceptions are not propagated to the caller: log and terminate.
        tryLogCurrentException(__PRETTY_FUNCTION__);
        exit(-1);
    }
}

static_assert(sizeof(RaftStoreProxyFFIHelper) == sizeof(TiFlashRaftProxyHelper));
static_assert(alignof(RaftStoreProxyFFIHelper) == alignof(TiFlashRaftProxyHelper));

Expand Down
4 changes: 4 additions & 0 deletions dbms/src/Storages/Transaction/ProxyFFI.h
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,8 @@ EngineStoreApplyRes HandleAdminRaftCmd(
EngineStoreApplyRes HandleWriteRaftCmd(const EngineStoreServerWrap * server,
WriteCmdsView cmds,
RaftCmdHeader header);
uint8_t NeedFlushData(EngineStoreServerWrap * server, uint64_t region_id);
uint8_t TryFlushData(EngineStoreServerWrap * server, uint64_t region_id, uint8_t until_succeed);
void AtomicUpdateProxy(EngineStoreServerWrap * server, RaftStoreProxyFFIHelper * proxy);
void HandleDestroy(EngineStoreServerWrap * server, uint64_t region_id);
EngineStoreApplyRes HandleIngestSST(EngineStoreServerWrap * server, SSTViewVec snaps, RaftCmdHeader header);
Expand Down Expand Up @@ -158,6 +160,8 @@ inline EngineStoreServerHelper GetEngineStoreServerHelper(
.fn_gen_cpp_string = GenCppRawString,
.fn_handle_write_raft_cmd = HandleWriteRaftCmd,
.fn_handle_admin_raft_cmd = HandleAdminRaftCmd,
.fn_need_flush_data = NeedFlushData,
.fn_try_flush_data = TryFlushData,
.fn_atomic_update_proxy = AtomicUpdateProxy,
.fn_handle_destroy = HandleDestroy,
.fn_handle_ingest_sst = HandleIngestSST,
Expand Down
6 changes: 6 additions & 0 deletions dbms/src/Storages/Transaction/tests/gtest_kvstore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1179,6 +1179,12 @@ void RegionKVStoreTest::testKVStore()
ASSERT_EQ(e.message(), "unsupported admin command type InvalidAdmin");
}
}
{
// There shall be data to flush.
ASSERT_EQ(kvs.needFlushRegionData(19, ctx.getTMTContext()), true);
// Force flush until succeed only for testing.
ASSERT_EQ(kvs.tryFlushRegionData(19, true, ctx.getTMTContext()), true);
}
}

void test_mergeresult()
Expand Down