Skip to content

Commit

Permalink
Delete config 'disable_bg_flush ' (#4869)
Browse files Browse the repository at this point in the history
close #4641
  • Loading branch information
hzh0425 authored May 16, 2022
1 parent 07f39f9 commit e03dcc1
Show file tree
Hide file tree
Showing 11 changed files with 41 additions and 168 deletions.
1 change: 0 additions & 1 deletion dbms/src/Server/DTTool/DTTool.h
Original file line number Diff line number Diff line change
Expand Up @@ -308,7 +308,6 @@ class ImitativeEnv

raft_config.ignore_databases = {"default", "system"};
raft_config.engine = TiDB::StorageEngine::DT;
raft_config.disable_bg_flush = true;
global_context->createTMTContext(raft_config, pingcap::ClusterConfig());

global_context->setDeltaIndexManager(1024 * 1024 * 100 /*100MB*/);
Expand Down
23 changes: 0 additions & 23 deletions dbms/src/Server/RaftConfigParser.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -82,29 +82,6 @@ TiFlashRaftConfig TiFlashRaftConfig::parseSettings(Poco::Util::LayeredConfigurat
res.engine = DEFAULT_ENGINE;
}

/// "tmt" engine ONLY support disable_bg_flush = false.
/// "dt" engine ONLY support disable_bg_flush = true.

String disable_bg_flush_conf = "raft.disable_bg_flush";
if (res.engine == ::TiDB::StorageEngine::TMT)
{
if (config.has(disable_bg_flush_conf) && config.getBool(disable_bg_flush_conf))
throw Exception(
fmt::format("Illegal arguments: disable background flush while using engine {}", MutableSupport::txn_storage_name),
ErrorCodes::INVALID_CONFIG_PARAMETER);
res.disable_bg_flush = false;
}
else if (res.engine == ::TiDB::StorageEngine::DT)
{
/// If background flush is enabled, read will not triggle schema sync.
/// Which means that we may get the wrong result with outdated schema.
if (config.has(disable_bg_flush_conf) && !config.getBool(disable_bg_flush_conf))
throw Exception(
fmt::format("Illegal arguments: enable background flush while using engine {}", MutableSupport::delta_tree_storage_name),
ErrorCodes::INVALID_CONFIG_PARAMETER);
res.disable_bg_flush = true;
}

// just for test
if (config.has("raft.enable_compatible_mode"))
{
Expand Down
1 change: 0 additions & 1 deletion dbms/src/Server/RaftConfigParser.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ struct TiFlashRaftConfig
bool enable_compatible_mode = true;

static constexpr TiDB::StorageEngine DEFAULT_ENGINE = TiDB::StorageEngine::DT;
bool disable_bg_flush = false;
TiDB::StorageEngine engine = DEFAULT_ENGINE;
TiDB::SnapshotApplyMethod snapshot_apply_method = TiDB::SnapshotApplyMethod::DTFile_Directory;

Expand Down
75 changes: 6 additions & 69 deletions dbms/src/Storages/Transaction/BackgroundService.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,65 +36,12 @@ BackgroundService::BackgroundService(TMTContext & tmt_)
},
false);

if (!tmt.isBgFlushDisabled())
{
table_flush_handle = background_pool.addTask([this] {
RegionTable & region_table = tmt.getRegionTable();

return region_table.tryFlushRegions();
});

region_handle = background_pool.addTask([this] {
bool ok = false;

{
RegionPtr region = nullptr;
{
std::lock_guard lock(region_mutex);
if (!regions_to_flush.empty())
{
auto it = regions_to_flush.begin();
region = it->second;
regions_to_flush.erase(it);
ok = true;
}
}
if (region)
tmt.getRegionTable().tryFlushRegion(region, true);
}
return ok;
});

{
std::vector<RegionPtr> regions;
tmt.getKVStore()->traverseRegions([&regions](RegionID, const RegionPtr & region) {
if (region->dataSize())
regions.emplace_back(region);
});
}
}
else
{
LOG_FMT_INFO(log, "Configuration raft.disable_bg_flush is set to true, background flush tasks are disabled.");
auto & global_settings = tmt.getContext().getSettingsRef();
storage_gc_handle = background_pool.addTask(
[this] { return tmt.getGCManager().work(); },
false,
/*interval_ms=*/global_settings.dt_bg_gc_check_interval * 1000);
LOG_FMT_INFO(log, "Start background storage gc worker with interval {} seconds.", global_settings.dt_bg_gc_check_interval);
}
}

void BackgroundService::addRegionToFlush(const DB::RegionPtr & region)
{
if (tmt.isBgFlushDisabled())
throw Exception("Try to addRegionToFlush while background flush is disabled.", ErrorCodes::LOGICAL_ERROR);

{
std::lock_guard lock(region_mutex);
regions_to_flush.emplace(region->id(), region);
}
region_handle->wake();
auto & global_settings = tmt.getContext().getSettingsRef();
storage_gc_handle = background_pool.addTask(
[this] { return tmt.getGCManager().work(); },
false,
/*interval_ms=*/global_settings.dt_bg_gc_check_interval * 1000);
LOG_FMT_INFO(log, "Start background storage gc worker with interval {} seconds.", global_settings.dt_bg_gc_check_interval);
}

BackgroundService::~BackgroundService()
Expand All @@ -104,17 +51,7 @@ BackgroundService::~BackgroundService()
background_pool.removeTask(single_thread_task_handle);
single_thread_task_handle = nullptr;
}
if (table_flush_handle)
{
background_pool.removeTask(table_flush_handle);
table_flush_handle = nullptr;
}

if (region_handle)
{
background_pool.removeTask(region_handle);
region_handle = nullptr;
}
if (storage_gc_handle)
{
background_pool.removeTask(storage_gc_handle);
Expand Down
7 changes: 0 additions & 7 deletions dbms/src/Storages/Transaction/BackgroundService.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,20 +39,13 @@ class BackgroundService : boost::noncopyable

~BackgroundService();

void addRegionToFlush(const RegionPtr & region);

private:
TMTContext & tmt;
BackgroundProcessingPool & background_pool;

Poco::Logger * log;

std::mutex region_mutex;
RegionMap regions_to_flush;

BackgroundProcessingPool::TaskHandle single_thread_task_handle;
BackgroundProcessingPool::TaskHandle table_flush_handle;
BackgroundProcessingPool::TaskHandle region_handle;
BackgroundProcessingPool::TaskHandle storage_gc_handle;
};

Expand Down
69 changes: 30 additions & 39 deletions dbms/src/Storages/Transaction/KVStore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
#include <Storages/Transaction/RegionExecutionResult.h>
#include <Storages/Transaction/RegionTable.h>
#include <Storages/Transaction/TMTContext.h>
#include <common/likely.h>

namespace DB
{
Expand Down Expand Up @@ -129,35 +130,33 @@ void KVStore::traverseRegions(std::function<void(RegionID, const RegionPtr &)> &

void KVStore::tryFlushRegionCacheInStorage(TMTContext & tmt, const Region & region, Poco::Logger * log)
{
if (tmt.isBgFlushDisabled())
auto table_id = region.getMappedTableID();
auto storage = tmt.getStorages().get(table_id);
if (unlikely(storage == nullptr))
{
auto table_id = region.getMappedTableID();
auto storage = tmt.getStorages().get(table_id);
if (storage == nullptr)
{
LOG_WARNING(log,
"tryFlushRegionCacheInStorage can not get table for region:" + region.toString()
+ " with table id: " + DB::toString(table_id) + ", ignored");
return;
}
LOG_FMT_WARNING(log,
"tryFlushRegionCacheInStorage can not get table for region {} with table id {}, ignored",
region.toString(),
table_id);
return;
}

try
{
// Acquire `drop_lock` so that no other threads can drop the storage during `flushCache`. `alter_lock` is not required.
auto storage_lock = storage->lockForShare(getThreadName());
auto rowkey_range = DM::RowKeyRange::fromRegionRange(
region.getRange(),
region.getRange()->getMappedTableID(),
storage->isCommonHandle(),
storage->getRowKeyColumnSize());
storage->flushCache(tmt.getContext(), rowkey_range);
}
catch (DB::Exception & e)
{
// We can ignore if storage is already dropped.
if (e.code() != ErrorCodes::TABLE_IS_DROPPED)
throw;
}
try
{
// Acquire `drop_lock` so that no other threads can drop the storage during `flushCache`. `alter_lock` is not required.
auto storage_lock = storage->lockForShare(getThreadName());
auto rowkey_range = DM::RowKeyRange::fromRegionRange(
region.getRange(),
region.getRange()->getMappedTableID(),
storage->isCommonHandle(),
storage->getRowKeyColumnSize());
storage->flushCache(tmt.getContext(), rowkey_range);
}
catch (DB::Exception & e)
{
// We can ignore if storage is already dropped.
if (e.code() != ErrorCodes::TABLE_IS_DROPPED)
throw;
}
}

Expand Down Expand Up @@ -444,21 +443,13 @@ EngineStoreApplyRes KVStore::handleAdminRaftCmd(raft_cmdpb::AdminRequest && requ

// After region split / merge, try to flush it
const auto try_to_flush_region = [&tmt](const RegionPtr & region) {
if (tmt.isBgFlushDisabled())
try
{
try
{
tmt.getRegionTable().tryFlushRegion(region, false);
}
catch (const Exception & e)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
tmt.getRegionTable().tryFlushRegion(region, false);
}
else
catch (...)
{
if (region->writeCFCount() >= 8192)
tmt.getBackgroundService().addRegionToFlush(region);
tryLogCurrentException(__PRETTY_FUNCTION__);
}
};

Expand Down
9 changes: 3 additions & 6 deletions dbms/src/Storages/Transaction/Region.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -707,12 +707,9 @@ EngineStoreApplyRes Region::handleWriteRaftCmd(const WriteCmdsView & cmds, UInt6
// If transfer-leader happened during ingest-sst, there might be illegal data.
if (0 != cmds.len)
{
if (tmt.isBgFlushDisabled())
{
/// Flush data right after they are committed.
RegionDataReadInfoList data_list_to_remove;
RegionTable::writeBlockByRegion(context, shared_from_this(), data_list_to_remove, log, false);
}
/// Flush data right after they are committed.
RegionDataReadInfoList data_list_to_remove;
RegionTable::writeBlockByRegion(context, shared_from_this(), data_list_to_remove, log, false);
}

meta.setApplied(index, term);
Expand Down
18 changes: 2 additions & 16 deletions dbms/src/Storages/Transaction/RegionTable.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -117,14 +117,7 @@ RegionDataReadInfoList RegionTable::flushRegion(const RegionPtrWithBlock & regio
{
auto & tmt = context->getTMTContext();

if (tmt.isBgFlushDisabled())
{
LOG_FMT_TRACE(log, "table {}, {} original {} bytes", region->getMappedTableID(), region->toString(false), region->dataSize());
}
else
{
LOG_FMT_INFO(log, "table {}, {} original {} bytes", region->getMappedTableID(), region->toString(false), region->dataSize());
}
LOG_FMT_TRACE(log, "table {}, {} original {} bytes", region->getMappedTableID(), region->toString(false), region->dataSize());

/// Write region data into corresponding storage.
RegionDataReadInfoList data_list_to_remove;
Expand All @@ -144,14 +137,7 @@ RegionDataReadInfoList RegionTable::flushRegion(const RegionPtrWithBlock & regio
}
}

if (tmt.isBgFlushDisabled())
{
LOG_FMT_TRACE(log, "table {}, {} after flush {} bytes", region->getMappedTableID(), region->toString(false), cache_size);
}
else
{
LOG_FMT_INFO(log, "table {}, {} after flush {} bytes", region->getMappedTableID(), region->toString(false), cache_size);
}
LOG_FMT_TRACE(log, "table {}, {} after flush {} bytes", region->getMappedTableID(), region->toString(false), cache_size);
}

return data_list_to_remove;
Expand Down
1 change: 0 additions & 1 deletion dbms/src/Storages/Transaction/TMTContext.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@ TMTContext::TMTContext(Context & context_, const TiFlashRaftConfig & raft_config
context.getSettingsRef().task_scheduler_thread_soft_limit,
context.getSettingsRef().task_scheduler_thread_hard_limit)))
, engine(raft_config.engine)
, disable_bg_flush(raft_config.disable_bg_flush)
, replica_read_max_thread(1)
, batch_read_index_timeout_ms(DEFAULT_BATCH_READ_INDEX_TIMEOUT_MS)
, wait_region_ready_timeout_sec(DEFAULT_WAIT_REGION_READY_TIMEOUT_SEC)
Expand Down
4 changes: 0 additions & 4 deletions dbms/src/Storages/Transaction/TMTContext.h
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,6 @@ class TMTContext : private boost::noncopyable

const Context & getContext() const;

bool isBgFlushDisabled() const { return disable_bg_flush; }

explicit TMTContext(Context & context_, const TiFlashRaftConfig & raft_config, const pingcap::ClusterConfig & cluster_config_);

SchemaSyncerPtr getSchemaSyncer() const;
Expand Down Expand Up @@ -140,8 +138,6 @@ class TMTContext : private boost::noncopyable

::TiDB::StorageEngine engine;

bool disable_bg_flush;

std::atomic_uint64_t replica_read_max_thread;
std::atomic_uint64_t batch_read_index_timeout_ms;
std::atomic_uint64_t wait_index_timeout_ms;
Expand Down
1 change: 0 additions & 1 deletion dbms/src/TestUtils/TiFlashTestEnv.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,6 @@ void TiFlashTestEnv::initializeGlobalContext(Strings testdata_path, bool enable_

raft_config.ignore_databases = {"default", "system"};
raft_config.engine = TiDB::StorageEngine::DT;
raft_config.disable_bg_flush = true;
global_context->createTMTContext(raft_config, pingcap::ClusterConfig());

global_context->setDeltaIndexManager(1024 * 1024 * 100 /*100MB*/);
Expand Down

0 comments on commit e03dcc1

Please sign in to comment.