From 0aec04afc4c51c1965277af7cac58ddf0c918a1d Mon Sep 17 00:00:00 2001 From: Calvin Neo Date: Thu, 7 Jul 2022 17:37:03 +0800 Subject: [PATCH 1/7] KVStore: decouple flush region and CompactLog with a new FFI fn_try_flush_data (#5283) ref pingcap/tiflash#5170 --- contrib/tiflash-proxy | 2 +- dbms/src/Storages/Transaction/KVStore.cpp | 88 +++++++++++++------ dbms/src/Storages/Transaction/KVStore.h | 8 ++ dbms/src/Storages/Transaction/ProxyFFI.cpp | 28 ++++++ dbms/src/Storages/Transaction/ProxyFFI.h | 4 + .../Transaction/tests/gtest_kvstore.cpp | 6 ++ 6 files changed, 107 insertions(+), 29 deletions(-) diff --git a/contrib/tiflash-proxy b/contrib/tiflash-proxy index 6ea4d608b1c..0080c6866d8 160000 --- a/contrib/tiflash-proxy +++ b/contrib/tiflash-proxy @@ -1 +1 @@ -Subproject commit 6ea4d608b1c03fab89d17f54a2e399602231e27c +Subproject commit 0080c6866d8bbba32bdf198437dbc98cc078ea7e diff --git a/dbms/src/Storages/Transaction/KVStore.cpp b/dbms/src/Storages/Transaction/KVStore.cpp index f9d6d01955e..fb31e4476bb 100644 --- a/dbms/src/Storages/Transaction/KVStore.cpp +++ b/dbms/src/Storages/Transaction/KVStore.cpp @@ -327,6 +327,64 @@ void KVStore::persistRegion(const Region & region, const RegionTaskLock & region LOG_FMT_DEBUG(log, "Persist {} done", region.toString(false)); } +bool KVStore::needFlushRegionData(UInt64 region_id, TMTContext & tmt) +{ + auto region_task_lock = region_manager.genRegionTaskLock(region_id); + const RegionPtr curr_region_ptr = getRegion(region_id); + return canFlushRegionDataImpl(curr_region_ptr, false, false, tmt, region_task_lock); +} + +bool KVStore::tryFlushRegionData(UInt64 region_id, bool try_until_succeed, TMTContext & tmt) +{ + auto region_task_lock = region_manager.genRegionTaskLock(region_id); + const RegionPtr curr_region_ptr = getRegion(region_id); + return canFlushRegionDataImpl(curr_region_ptr, true, try_until_succeed, tmt, region_task_lock); +} + +bool KVStore::canFlushRegionDataImpl(const RegionPtr & curr_region_ptr, UInt8 flush_if_possible, bool try_until_succeed, TMTContext & tmt, const RegionTaskLock & region_task_lock) +{ + if (curr_region_ptr == nullptr) + { + throw Exception(fmt::format("region not found when trying flush", ErrorCodes::LOGICAL_ERROR)); + } + auto & curr_region = *curr_region_ptr; + + auto [rows, size_bytes] = curr_region.getApproxMemCacheInfo(); + + LOG_FMT_DEBUG(log, "{} approx mem cache info: rows {}, bytes {}", curr_region.toString(false), rows, size_bytes); + + bool can_flush = false; + if (rows >= region_compact_log_min_rows.load(std::memory_order_relaxed) + || size_bytes >= region_compact_log_min_bytes.load(std::memory_order_relaxed)) + { + // if rows or bytes more than threshold, flush cache and persist mem data. + can_flush = true; + } + else + { + // if there is little data in mem, wait until time interval reached threshold. + // use random period so that lots of regions will not be persisted at same time. + auto compact_log_period = std::rand() % region_compact_log_period.load(std::memory_order_relaxed); // NOLINT + can_flush = !(curr_region.lastCompactLogTime() + Seconds{compact_log_period} > Clock::now()); + } + if (can_flush && flush_if_possible) + { + LOG_FMT_DEBUG(log, "{} flush region due to can_flush_data", curr_region.toString(false)); + if (tryFlushRegionCacheInStorage(tmt, curr_region, log, try_until_succeed)) + { + persistRegion(curr_region, region_task_lock, "compact raft log"); + curr_region.markCompactLog(); + curr_region.cleanApproxMemCacheInfo(); + return true; + } + else + { + return false; + } + } + return can_flush; +} + EngineStoreApplyRes KVStore::handleUselessAdminRaftCmd( raft_cmdpb::AdminCmdType cmd_type, UInt64 curr_region_id, @@ -360,39 +418,13 @@ EngineStoreApplyRes KVStore::handleUselessAdminRaftCmd( } else { - auto [rows, size_bytes] = curr_region.getApproxMemCacheInfo(); - - LOG_FMT_DEBUG(log, "{} approx mem cache info: rows {}, bytes {}", curr_region.toString(false), rows, size_bytes); - - if (rows >= region_compact_log_min_rows.load(std::memory_order_relaxed) - || size_bytes >= region_compact_log_min_bytes.load(std::memory_order_relaxed)) - { - // if rows or bytes more than threshold, try to flush cache and persist mem data. - return true; - } - else - { - // if there is little data in mem, wait until time interval reached threshold. - // use random period so that lots of regions will not be persisted at same time. - auto compact_log_period = std::rand() % region_compact_log_period.load(std::memory_order_relaxed); // NOLINT - return !(curr_region.lastCompactLogTime() + Seconds{compact_log_period} > Clock::now()); - } + return canFlushRegionDataImpl(curr_region_ptr, true, /* try_until_succeed */ false, tmt, region_task_lock); } }; if (check_sync_log()) { - if (tryFlushRegionCacheInStorage(tmt, curr_region, log, /* try_until_succeed */ false)) - { - persistRegion(curr_region, region_task_lock, "compact raft log"); - curr_region.markCompactLog(); - curr_region.cleanApproxMemCacheInfo(); - return EngineStoreApplyRes::Persist; - } - else - { - return EngineStoreApplyRes::None; - } + return EngineStoreApplyRes::Persist; } return EngineStoreApplyRes::None; } diff --git a/dbms/src/Storages/Transaction/KVStore.h b/dbms/src/Storages/Transaction/KVStore.h index 66e2fe32b75..b58083557a1 100644 --- a/dbms/src/Storages/Transaction/KVStore.h +++ b/dbms/src/Storages/Transaction/KVStore.h @@ -108,6 +108,9 @@ class KVStore final : private boost::noncopyable TMTContext & tmt); EngineStoreApplyRes handleWriteRaftCmd(const WriteCmdsView & cmds, UInt64 region_id, UInt64 index, UInt64 term, TMTContext & tmt); + bool needFlushRegionData(UInt64 region_id, TMTContext & tmt); + bool tryFlushRegionData(UInt64 region_id, bool try_until_succeed, TMTContext & tmt); + void handleApplySnapshot(metapb::Region && region, uint64_t peer_id, const SSTViewVec, uint64_t index, uint64_t term, TMTContext & tmt); std::vector /* */ preHandleSnapshotToFiles( @@ -219,6 +222,11 @@ class KVStore final : private boost::noncopyable UInt64 term, TMTContext & tmt); + /// Notice that if flush_if_possible is set to false, we only check if a flush is allowed by rowsize/size/interval. + /// It will not check if a flush will eventually succeed. + /// In other words, `canFlushRegionDataImpl(flush_if_possible=true)` can return false. + bool canFlushRegionDataImpl(const RegionPtr & curr_region_ptr, UInt8 flush_if_possible, bool try_until_succeed, TMTContext & tmt, const RegionTaskLock & region_task_lock); + void persistRegion(const Region & region, const RegionTaskLock & region_task_lock, const char * caller); void releaseReadIndexWorkers(); void handleDestroy(UInt64 region_id, TMTContext & tmt, const KVStoreTaskLock &); diff --git a/dbms/src/Storages/Transaction/ProxyFFI.cpp b/dbms/src/Storages/Transaction/ProxyFFI.cpp index 8a40ca9b15e..d4ba50d5714 100644 --- a/dbms/src/Storages/Transaction/ProxyFFI.cpp +++ b/dbms/src/Storages/Transaction/ProxyFFI.cpp @@ -128,6 +128,34 @@ EngineStoreApplyRes HandleAdminRaftCmd( } } +uint8_t NeedFlushData(EngineStoreServerWrap * server, uint64_t region_id) +{ + try + { + auto & kvstore = server->tmt->getKVStore(); + return kvstore->needFlushRegionData(region_id, *server->tmt); + } + catch (...) + { + tryLogCurrentException(__PRETTY_FUNCTION__); + exit(-1); + } +} + +uint8_t TryFlushData(EngineStoreServerWrap * server, uint64_t region_id, uint8_t until_succeed) +{ + try + { + auto & kvstore = server->tmt->getKVStore(); + return kvstore->tryFlushRegionData(region_id, until_succeed, *server->tmt); + } + catch (...) + { + tryLogCurrentException(__PRETTY_FUNCTION__); + exit(-1); + } +} + static_assert(sizeof(RaftStoreProxyFFIHelper) == sizeof(TiFlashRaftProxyHelper)); static_assert(alignof(RaftStoreProxyFFIHelper) == alignof(TiFlashRaftProxyHelper)); diff --git a/dbms/src/Storages/Transaction/ProxyFFI.h b/dbms/src/Storages/Transaction/ProxyFFI.h index e1c01599275..aafe4b375eb 100644 --- a/dbms/src/Storages/Transaction/ProxyFFI.h +++ b/dbms/src/Storages/Transaction/ProxyFFI.h @@ -125,6 +125,8 @@ EngineStoreApplyRes HandleAdminRaftCmd( EngineStoreApplyRes HandleWriteRaftCmd(const EngineStoreServerWrap * server, WriteCmdsView cmds, RaftCmdHeader header); +uint8_t NeedFlushData(EngineStoreServerWrap * server, uint64_t region_id); +uint8_t TryFlushData(EngineStoreServerWrap * server, uint64_t region_id, uint8_t until_succeed); void AtomicUpdateProxy(EngineStoreServerWrap * server, RaftStoreProxyFFIHelper * proxy); void HandleDestroy(EngineStoreServerWrap * server, uint64_t region_id); EngineStoreApplyRes HandleIngestSST(EngineStoreServerWrap * server, SSTViewVec snaps, RaftCmdHeader header); @@ -158,6 +160,8 @@ inline EngineStoreServerHelper GetEngineStoreServerHelper( .fn_gen_cpp_string = GenCppRawString, .fn_handle_write_raft_cmd = HandleWriteRaftCmd, .fn_handle_admin_raft_cmd = HandleAdminRaftCmd, + .fn_need_flush_data = NeedFlushData, + .fn_try_flush_data = TryFlushData, .fn_atomic_update_proxy = AtomicUpdateProxy, .fn_handle_destroy = HandleDestroy, .fn_handle_ingest_sst = HandleIngestSST, diff --git a/dbms/src/Storages/Transaction/tests/gtest_kvstore.cpp b/dbms/src/Storages/Transaction/tests/gtest_kvstore.cpp index 36a91522bb6..77aab06f6cf 100644 --- a/dbms/src/Storages/Transaction/tests/gtest_kvstore.cpp +++ b/dbms/src/Storages/Transaction/tests/gtest_kvstore.cpp @@ -1179,6 +1179,12 @@ void RegionKVStoreTest::testKVStore() ASSERT_EQ(e.message(), "unsupported admin command type InvalidAdmin"); } } + { + // There shall be data to flush. + ASSERT_EQ(kvs.needFlushRegionData(19, ctx.getTMTContext()), true); + // Force flush until succeed only for testing. + ASSERT_EQ(kvs.tryFlushRegionData(19, true, ctx.getTMTContext()), true); + } } void test_mergeresult() From 3e8df4bd1e569a0592bfdabd7e8fb0c8dd827a63 Mon Sep 17 00:00:00 2001 From: xufei Date: Thu, 7 Jul 2022 18:31:02 +0800 Subject: [PATCH 2/7] refine error message in mpptask (#5304) ref pingcap/tiflash#5095 --- dbms/src/Flash/EstablishCall.cpp | 2 +- dbms/src/Flash/Mpp/MPPTask.cpp | 4 +- dbms/src/Flash/Mpp/MPPTunnel.cpp | 4 ++ dbms/src/Flash/Mpp/Utils.cpp | 11 +++++ dbms/src/Flash/Mpp/Utils.h | 1 + dbms/src/Flash/Mpp/tests/gtest_mpptunnel.cpp | 4 +- tests/fullstack-test/mpp/issue_2471.test | 10 +--- tests/fullstack-test/mpp/mpp_fail.test | 50 ++------------------ 8 files changed, 28 insertions(+), 58 deletions(-) diff --git a/dbms/src/Flash/EstablishCall.cpp b/dbms/src/Flash/EstablishCall.cpp index 89857a2407e..2f8c7c15f56 100644 --- a/dbms/src/Flash/EstablishCall.cpp +++ b/dbms/src/Flash/EstablishCall.cpp @@ -143,7 +143,7 @@ void EstablishCallData::finishTunnelAndResponder() state = FINISH; if (mpp_tunnel) { - mpp_tunnel->consumerFinish("grpc writes failed.", true); //trigger mpp tunnel finish work + mpp_tunnel->consumerFinish(fmt::format("{}: finishTunnelAndResponder called.", mpp_tunnel->id()), true); //trigger mpp tunnel finish work } grpc::Status status(static_cast(GRPC_STATUS_UNKNOWN), "Consumer exits unexpected, grpc writes failed."); responder.Finish(status, this); diff --git a/dbms/src/Flash/Mpp/MPPTask.cpp b/dbms/src/Flash/Mpp/MPPTask.cpp index c2d5e6f49f8..da8f3034abc 100644 --- a/dbms/src/Flash/Mpp/MPPTask.cpp +++ b/dbms/src/Flash/Mpp/MPPTask.cpp @@ -379,7 +379,7 @@ void MPPTask::runImpl() } catch (...) { - err_msg = getCurrentExceptionMessage(true); + err_msg = getCurrentExceptionMessage(true, true); } if (err_msg.empty()) @@ -405,6 +405,8 @@ void MPPTask::runImpl() if (status == RUNNING) { LOG_FMT_ERROR(log, "task running meets error: {}", err_msg); + /// trim the stack trace to avoid too many useless information in log + trimStackTrace(err_msg); try { handleError(err_msg); diff --git a/dbms/src/Flash/Mpp/MPPTunnel.cpp b/dbms/src/Flash/Mpp/MPPTunnel.cpp index 13a7eaad95e..16fe4ae42cc 100644 --- a/dbms/src/Flash/Mpp/MPPTunnel.cpp +++ b/dbms/src/Flash/Mpp/MPPTunnel.cpp @@ -220,7 +220,11 @@ void MPPTunnelBase::sendJob(bool need_lock) err_msg = "fatal error in sendJob()"; } if (!err_msg.empty()) + { + /// append tunnel id to error message + err_msg = fmt::format("{} meet error: {}", tunnel_id, err_msg); LOG_ERROR(log, err_msg); + } consumerFinish(err_msg, need_lock); if (is_async) writer->writeDone(grpc::Status::OK); diff --git a/dbms/src/Flash/Mpp/Utils.cpp b/dbms/src/Flash/Mpp/Utils.cpp index 477c478eef7..21d89b3cd52 100644 --- a/dbms/src/Flash/Mpp/Utils.cpp +++ b/dbms/src/Flash/Mpp/Utils.cpp @@ -13,6 +13,7 @@ // limitations under the License. #include +#include #include @@ -27,4 +28,14 @@ mpp::MPPDataPacket getPacketWithError(String reason) return data; } +void trimStackTrace(String & message) +{ + auto stack_trace_pos = message.find("Stack trace"); + if (stack_trace_pos != String::npos) + { + message.resize(stack_trace_pos); + Poco::trimRightInPlace(message); + } +} + } // namespace DB diff --git a/dbms/src/Flash/Mpp/Utils.h b/dbms/src/Flash/Mpp/Utils.h index 67e2dc3f641..021dc4407d5 100644 --- a/dbms/src/Flash/Mpp/Utils.h +++ b/dbms/src/Flash/Mpp/Utils.h @@ -23,5 +23,6 @@ namespace DB { mpp::MPPDataPacket getPacketWithError(String reason); +void trimStackTrace(String & message); } // namespace DB diff --git a/dbms/src/Flash/Mpp/tests/gtest_mpptunnel.cpp b/dbms/src/Flash/Mpp/tests/gtest_mpptunnel.cpp index 47ce2ee6ee6..706c17ed036 100644 --- a/dbms/src/Flash/Mpp/tests/gtest_mpptunnel.cpp +++ b/dbms/src/Flash/Mpp/tests/gtest_mpptunnel.cpp @@ -382,7 +382,7 @@ TEST_F(TestMPPTunnelBase, WriteError) } catch (Exception & e) { - GTEST_ASSERT_EQ(e.message(), "Consumer exits unexpected, grpc writes failed."); + GTEST_ASSERT_EQ(e.message(), "Consumer exits unexpected, 0000_0001 meet error: grpc writes failed."); } } @@ -631,7 +631,7 @@ TEST_F(TestMPPTunnelBase, AsyncWriteError) } catch (Exception & e) { - GTEST_ASSERT_EQ(e.message(), "Consumer exits unexpected, grpc writes failed."); + GTEST_ASSERT_EQ(e.message(), "Consumer exits unexpected, 0000_0001 meet error: grpc writes failed."); } } diff --git a/tests/fullstack-test/mpp/issue_2471.test b/tests/fullstack-test/mpp/issue_2471.test index 497ce605893..9966eaadec3 100644 --- a/tests/fullstack-test/mpp/issue_2471.test +++ b/tests/fullstack-test/mpp/issue_2471.test @@ -35,15 +35,7 @@ mysql> use test; set @@tidb_isolation_read_engines='tiflash'; set @@tidb_opt_bro => DBGInvoke __enable_fail_point(exception_in_creating_set_input_stream) mysql> use test; set @@tidb_isolation_read_engines='tiflash'; set @@tidb_opt_broadcast_cartesian_join=2; select * from a as t1 left join a as t2 on t1.id = t2.id; -ERROR 1105 (HY000) at line 1: other error for mpp stream: Code: 10007, e.displayText() = DB::Exception: Fail point FailPoints::exception_in_creating_set_input_stream is triggered., e.what() = DB::Exception, Stack trace: -{#LINE} -{#LINE} -{#LINE} -{#LINE} -{#LINE} -{#LINE} -{#LINE} -{#LINE} +ERROR 1105 (HY000) at line 1: other error for mpp stream: Code: 10007, e.displayText() = DB::Exception: Fail point FailPoints::exception_in_creating_set_input_stream is triggered., e.what() = DB::Exception, => DBGInvoke __disable_fail_point(exception_in_creating_set_input_stream) diff --git a/tests/fullstack-test/mpp/mpp_fail.test b/tests/fullstack-test/mpp/mpp_fail.test index e03c6150be6..0e272c0b621 100644 --- a/tests/fullstack-test/mpp/mpp_fail.test +++ b/tests/fullstack-test/mpp/mpp_fail.test @@ -71,44 +71,20 @@ ERROR 1105 (HY000) at line 1: DB::Exception: Fail point FailPoints::exception_be ## exception during mpp run non root task => DBGInvoke __enable_fail_point(exception_during_mpp_non_root_task_run) mysql> use test; set @@tidb_isolation_read_engines='tiflash'; set @@tidb_allow_mpp=1; select count(value), id from t group by id; -ERROR 1105 (HY000) at line 1: other error for mpp stream: Code: 0, e.displayText() = DB::Exception: Exchange receiver meet error : Code: 10007, e.displayText() = DB::Exception: Fail point FailPoints::exception_during_mpp_non_root_task_run is triggered., e.what() = DB::Exception, Stack trace: -{#LINE} -{#LINE} -{#LINE} -{#LINE} -{#LINE} -{#LINE} -{#LINE} +ERROR 1105 (HY000) at line 1: other error for mpp stream: Code: 0, e.displayText() = DB::Exception: Exchange receiver meet error : Code: 10007, e.displayText() = DB::Exception: Fail point FailPoints::exception_during_mpp_non_root_task_run is triggered., e.what() = DB::Exception,, e.what() = DB::Exception, => DBGInvoke __disable_fail_point(exception_during_mpp_non_root_task_run) ## exception during mpp run root task => DBGInvoke __enable_fail_point(exception_during_mpp_root_task_run) mysql> use test; set @@tidb_isolation_read_engines='tiflash'; set @@tidb_allow_mpp=1; select count(value), id from t group by id; -ERROR 1105 (HY000) at line 1: other error for mpp stream: Code: 10007, e.displayText() = DB::Exception: Fail point FailPoints::exception_during_mpp_root_task_run is triggered., e.what() = DB::Exception, Stack trace: -{#LINE} -{#LINE} -{#LINE} -{#LINE} -{#LINE} -{#LINE} -{#LINE} -{#LINE} +ERROR 1105 (HY000) at line 1: other error for mpp stream: Code: 10007, e.displayText() = DB::Exception: Fail point FailPoints::exception_during_mpp_root_task_run is triggered., e.what() = DB::Exception, => DBGInvoke __disable_fail_point(exception_during_mpp_root_task_run) ## exception during mpp write err to tunnel => DBGInvoke __enable_fail_point(exception_during_mpp_non_root_task_run) => DBGInvoke __enable_fail_point(exception_during_mpp_write_err_to_tunnel) mysql> use test; set @@tidb_isolation_read_engines='tiflash'; set @@tidb_allow_mpp=1; select count(value), id from t group by id; -ERROR 1105 (HY000) at line 1: other error for mpp stream: Code: 0, e.displayText() = DB::Exception: Exchange receiver meet error : Failed to write error msg to tunnel, e.what() = DB::Exception, Stack trace: -{#LINE} -{#LINE} -{#LINE} -{#LINE} -{#LINE} -{#LINE} -{#LINE} -{#LINE} -{#LINE} +ERROR 1105 (HY000) at line 1: other error for mpp stream: Code: 0, e.displayText() = DB::Exception: Exchange receiver meet error : Failed to write error msg to tunnel, e.what() = DB::Exception, => DBGInvoke __disable_fail_point(exception_during_mpp_non_root_task_run) => DBGInvoke __disable_fail_point(exception_during_mpp_write_err_to_tunnel) @@ -116,14 +92,7 @@ ERROR 1105 (HY000) at line 1: other error for mpp stream: Code: 0, e.displayText => DBGInvoke __enable_fail_point(exception_during_mpp_non_root_task_run) => DBGInvoke __enable_fail_point(exception_during_mpp_close_tunnel) mysql> use test; set @@tidb_isolation_read_engines='tiflash'; set @@tidb_allow_mpp=1; select count(value), id from t group by id; -ERROR 1105 (HY000) at line 1: other error for mpp stream: Code: 0, e.displayText() = DB::Exception: Exchange receiver meet error : Code: 10007, e.displayText() = DB::Exception: Fail point FailPoints::exception_during_mpp_non_root_task_run is triggered., e.what() = DB::Exception, Stack trace: -{#LINE} -{#LINE} -{#LINE} -{#LINE} -{#LINE} -{#LINE} -{#LINE} +ERROR 1105 (HY000) at line 1: other error for mpp stream: Code: 0, e.displayText() = DB::Exception: Exchange receiver meet error : Code: 10007, e.displayText() = DB::Exception: Fail point FailPoints::exception_during_mpp_non_root_task_run is triggered., e.what() = DB::Exception,, e.what() = DB::Exception, => DBGInvoke __disable_fail_point(exception_during_mpp_non_root_task_run) => DBGInvoke __disable_fail_point(exception_during_mpp_close_tunnel) @@ -156,16 +125,7 @@ ERROR 1105 (HY000) at line 1: other error for mpp stream: Code: 0, e.displayText ## ensure build1, build2-probe1, probe2 in the CreatingSets, test the bug where build1 throw exception but not change the build state, thus block the build2-probe1, at last this query hangs. => DBGInvoke __enable_fail_point(exception_mpp_hash_build) mysql> use test; set @@tidb_isolation_read_engines='tiflash'; set @@tidb_allow_mpp=1; set @@tidb_broadcast_join_threshold_count=0; set @@tidb_broadcast_join_threshold_size=0; select t1.id from test.t t1 join test.t t2 on t1.id = t2.id and t1.id <2 join (select id from test.t group by id) t3 on t2.id=t3.id; -ERROR 1105 (HY000) at line 1: other error for mpp stream: Code: 10007, e.displayText() = DB::Exception: Fail point FailPoints::exception_mpp_hash_build is triggered., e.what() = DB::Exception, Stack trace: -{#LINE} -{#LINE} -{#LINE} -{#LINE} -{#LINE} -{#LINE} -{#LINE} -{#LINE} -{#LINE} +ERROR 1105 (HY000) at line 1: other error for mpp stream: Code: 10007, e.displayText() = DB::Exception: Fail point FailPoints::exception_mpp_hash_build is triggered., e.what() = DB::Exception, => DBGInvoke __disable_fail_point(exception_mpp_hash_build) # Clean up. From 5295223a3a65b3e5793d9744520ad1c46e4ce405 Mon Sep 17 00:00:00 2001 From: lizhenhuan <1916038084@qq.com> Date: Thu, 7 Jul 2022 19:05:02 +0800 Subject: [PATCH 3/7] Implement ReverseUTF8/Reverse function push down (#5233) close pingcap/tiflash#5111 --- dbms/src/Flash/Coprocessor/DAGUtils.cpp | 4 +- .../Functions/tests/gtest_strings_reverse.cpp | 120 ++++++++++++++++++ tests/fullstack-test/expr/reverse.test | 44 +++++++ 3 files changed, 166 insertions(+), 2 deletions(-) create mode 100644 dbms/src/Functions/tests/gtest_strings_reverse.cpp create mode 100644 tests/fullstack-test/expr/reverse.test diff --git a/dbms/src/Flash/Coprocessor/DAGUtils.cpp b/dbms/src/Flash/Coprocessor/DAGUtils.cpp index f0800eda4df..a4d491e5637 100644 --- a/dbms/src/Flash/Coprocessor/DAGUtils.cpp +++ b/dbms/src/Flash/Coprocessor/DAGUtils.cpp @@ -648,8 +648,8 @@ const std::unordered_map scalar_func_map({ //{tipb::ScalarFuncSig::Quote, "cast"}, //{tipb::ScalarFuncSig::Repeat, "cast"}, {tipb::ScalarFuncSig::Replace, "replaceAll"}, - //{tipb::ScalarFuncSig::ReverseUTF8, "cast"}, - //{tipb::ScalarFuncSig::Reverse, "cast"}, + {tipb::ScalarFuncSig::ReverseUTF8, "reverseUTF8"}, + {tipb::ScalarFuncSig::Reverse, "reverse"}, {tipb::ScalarFuncSig::RightUTF8, "rightUTF8"}, //{tipb::ScalarFuncSig::Right, "cast"}, {tipb::ScalarFuncSig::RpadUTF8, "rpadUTF8"}, diff --git a/dbms/src/Functions/tests/gtest_strings_reverse.cpp b/dbms/src/Functions/tests/gtest_strings_reverse.cpp new file mode 100644 index 00000000000..304a403db83 --- /dev/null +++ b/dbms/src/Functions/tests/gtest_strings_reverse.cpp @@ -0,0 +1,120 @@ +// Copyright 2022 PingCAP, Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include +#include +#include +#include +#include + +#include +#include + +#pragma GCC diagnostic pop + +namespace DB::tests +{ +class StringReverse : public DB::tests::FunctionTest +{ +protected: + static ColumnWithTypeAndName toVec(const std::vector & v) + { + return createColumn(v); + } + + static ColumnWithTypeAndName toNullableVec(const std::vector> & v) + { + return createColumn>(v); + } + + static ColumnWithTypeAndName toConst(const String & s) + { + return createConstColumn(1, s); + } +}; +// test reverse +TEST_F(StringReverse, stringReverseTest) +try +{ + std::vector candidate_strings = {"one week's time test", "abcdef", "abcabc", "moc.pacgnip"}; + std::vector reversed_strings = {"tset emit s'keew eno", "fedcba", "cbacba", "pingcap.com"}; + + // test vector + ASSERT_COLUMN_EQ( + toVec(reversed_strings), + executeFunction( + "reverse", + toVec(candidate_strings))); + + // test nullable + ASSERT_COLUMN_EQ( + toNullableVec({"", " ", {}, "pacgnip"}), + executeFunction( + "reverse", + toNullableVec({"", " ", {}, "pingcap"}))); + + // test const + ASSERT_COLUMN_EQ( + toConst("pacgnip"), + executeFunction( + "reverse", + toConst("pingcap"))); + + // test null + ASSERT_COLUMN_EQ( + toConst({}), + executeFunction( + "reverse", + toConst({}))); +} +CATCH + +// test reverseUTF8 +TEST_F(StringReverse, stringReverseUTF8Test) +try +{ + std::vector candidate_strings = {"one week's time test", "abc测试def", "abcテストabc", "ѐёђѓєѕіїјљњћќѝўџ", "+ѐ-ё*ђ/ѓ!є@ѕ#і$@ї%ј……љ&њ(ћ)ќ¥ѝ#ў@џ!^", "αβγδεζηθικλμνξοπρστυφχψωσ", "▲α▼βγ➨δε☎ζη✂θι€κλ♫μν✓ξο✚πρ℉στ♥υφ♖χψ♘ω★σ✕", "թփձջրչճժծքոեռտըւիօպասդֆգհյկլխզղցվբնմշ"}; + std::vector reversed_strings = {"tset emit s'keew eno", "fed试测cba", "cbaトステcba", "џўѝќћњљјїіѕєѓђёѐ", "^!џ@ў#ѝ¥ќ)ћ(њ&љ……ј%ї@$і#ѕ@є!ѓ/ђ*ё-ѐ+", "σωψχφυτσρποξνμλκιθηζεδγβα", "✕σ★ω♘ψχ♖φυ♥τσ℉ρπ✚οξ✓νμ♫λκ€ιθ✂ηζ☎εδ➨γβ▼α▲", "շմնբվցղզխլկյհգֆդսապօիւըտռեոքծժճչրջձփթ"}; + + // test vector + ASSERT_COLUMN_EQ( + toVec(reversed_strings), + executeFunction( + "reverseUTF8", + toVec(candidate_strings))); + + // test nullable + ASSERT_COLUMN_EQ( + toNullableVec({"", " ", {}, "pacgnip"}), + executeFunction( + "reverseUTF8", + toNullableVec({"", " ", {}, "pingcap"}))); + + // test const + ASSERT_COLUMN_EQ( + toConst("pacgnip"), + executeFunction( + "reverseUTF8", + toConst("pingcap"))); + + // test null + ASSERT_COLUMN_EQ( + toConst({}), + executeFunction( + "reverseUTF8", + toConst({}))); +} +CATCH + +} // namespace DB::tests \ No newline at end of file diff --git a/tests/fullstack-test/expr/reverse.test b/tests/fullstack-test/expr/reverse.test new file mode 100644 index 00000000000..9195adf2b7d --- /dev/null +++ b/tests/fullstack-test/expr/reverse.test @@ -0,0 +1,44 @@ +# Copyright 2022 PingCAP, Ltd. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +mysql> drop table if exists test.t; +mysql> create table if not exists test.t(a varchar(256)); + + +mysql> insert into test.t values('one week’s time test'); +mysql> insert into test.t values('abc测试def'); +mysql> insert into test.t values('abcテストabc'); +mysql> insert into test.t values('ѐёђѓєѕіїјљњћќѝўџ'); +mysql> insert into test.t values('+ѐ-ё*ђ/ѓ!є@ѕ#і@ї%ј……љ&њ(ћ)ќ¥ѝ#ў@џ!^'); +mysql> insert into test.t values('αβγδεζηθικλμνξοπρστυφχψωσ'); +mysql> insert into test.t values('▲α▼βγ➨δε☎ζη✂θι€κλ♫μν✓ξο✚πρ℉στ♥υφ♖χψ♘ω★σ✕'); +mysql> insert into test.t values('թփձջրչճժծքոեռտըւիօպասդֆգհյկլխզղցվբնմշ'); +mysql> insert into test.t values(NULL); +mysql> alter table test.t set tiflash replica 1; +func> wait_table test t + +mysql> set tidb_enforce_mpp=1; set tidb_isolation_read_engines='tiflash'; select reverse(a) from test.t; ++-------------------------------------------------------------------------------------------------+ +| reverse(a) | ++-------------------------------------------------------------------------------------------------+ +| tset emit s’keew eno | +| fed试测cba | +| cbaトステcba | +| џўѝќћњљјїіѕєѓђёѐ | +| ^!џ@ў#ѝ¥ќ)ћ(њ&љ……ј%ї@і#ѕ@є!ѓ/ђ*ё-ѐ+ | +| σωψχφυτσρποξνμλκιθηζεδγβα | +| ✕σ★ω♘ψχ♖φυ♥τσ℉ρπ✚οξ✓νμ♫λκ€ιθ✂ηζ☎εδ➨γβ▼α▲ | +| շմնբվցղզխլկյհգֆդսապօիւըտռեոքծժճչրջձփթ | +| NULL | ++-------------------------------------------------------------------------------------------------+ From 97342db2a1dd705b1bd98f3481d51f48a4f0300d Mon Sep 17 00:00:00 2001 From: Zhigao Tong Date: Thu, 7 Jul 2022 19:57:02 +0800 Subject: [PATCH 4/7] Optimize comparision for collation `UTF8_BIN` and `UTF8MB4_BIN` (#5299) ref pingcap/tiflash#5294 --- dbms/src/Columns/ColumnConst.h | 3 +- .../Functions/CollationOperatorOptimized.h | 210 ++++++++++++++++++ dbms/src/Functions/FunctionsComparison.h | 54 ++++- dbms/src/Storages/Transaction/Collator.cpp | 36 ++- .../tidb-ci/new_collation_fullstack/expr.test | 14 ++ 5 files changed, 287 insertions(+), 30 deletions(-) create mode 100644 dbms/src/Functions/CollationOperatorOptimized.h diff --git a/dbms/src/Columns/ColumnConst.h b/dbms/src/Columns/ColumnConst.h index 27283c0f24a..da071507a72 100644 --- a/dbms/src/Columns/ColumnConst.h +++ b/dbms/src/Columns/ColumnConst.h @@ -233,7 +233,8 @@ class ColumnConst final : public COWPtrHelper template T getValue() const { - return getField().safeGet::Type>(); + auto && tmp = getField(); + return std::move(tmp.safeGet::Type>()); } }; diff --git a/dbms/src/Functions/CollationOperatorOptimized.h b/dbms/src/Functions/CollationOperatorOptimized.h new file mode 100644 index 00000000000..395ecc5b9eb --- /dev/null +++ b/dbms/src/Functions/CollationOperatorOptimized.h @@ -0,0 +1,210 @@ +// Copyright 2022 PingCAP, Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#include +#include +#include +#include +#include + +#include +#include + + +namespace DB +{ + +template +ALWAYS_INLINE inline int signum(T val) +{ + return (0 < val) - (val < 0); +} + +// Check equality is much faster than other comparison. +// - check size first +// - return 0 if equal else 1 +__attribute__((flatten, always_inline, pure)) inline uint8_t RawStrEqualCompare(const std::string_view & lhs, const std::string_view & rhs) +{ + return StringRef(lhs) == StringRef(rhs) ? 0 : 1; +} + +// Compare str view by memcmp +__attribute__((flatten, always_inline, pure)) inline int RawStrCompare(const std::string_view & v1, const std::string_view & v2) +{ + return signum(v1.compare(v2)); +} + +constexpr char SPACE = ' '; + +// Remove tail space +__attribute__((flatten, always_inline, pure)) inline std::string_view RightTrim(const std::string_view & v) +{ + if (likely(v.empty() || v.back() != SPACE)) + return v; + size_t end = v.find_last_not_of(SPACE); + return end == std::string_view::npos ? std::string_view{} : std::string_view(v.data(), end + 1); +} + +__attribute__((flatten, always_inline, pure)) inline int RtrimStrCompare(const std::string_view & va, const std::string_view & vb) +{ + return RawStrCompare(RightTrim(va), RightTrim(vb)); +} + +// If true, only need to check equal or not. +template +struct IsEqualRelated +{ + static constexpr const bool value = false; +}; + +// For `EqualsOp` and `NotEqualsOp`, value is true. +template +struct IsEqualRelated> +{ + static constexpr const bool value = true; +}; +template +struct IsEqualRelated> +{ + static constexpr const bool value = true; +}; + +// Loop columns and invoke callback for each pair. +template +__attribute__((flatten, always_inline)) inline void LoopTwoColumns( + const ColumnString::Chars_t & a_data, + const ColumnString::Offsets & a_offsets, + const ColumnString::Chars_t & b_data, + const ColumnString::Offsets & b_offsets, + size_t size, + F && func) +{ + for (size_t i = 0; i < size; ++i) + { + size_t a_size = StringUtil::sizeAt(a_offsets, i) - 1; + size_t b_size = StringUtil::sizeAt(b_offsets, i) - 1; + const auto * a_ptr = reinterpret_cast(&a_data[StringUtil::offsetAt(a_offsets, i)]); + const auto * b_ptr = reinterpret_cast(&b_data[StringUtil::offsetAt(b_offsets, i)]); + + func({a_ptr, a_size}, {b_ptr, b_size}, i); + } +} + +// Loop one column and invoke callback for each pair. +template +__attribute__((flatten, always_inline)) inline void LoopOneColumn( + const ColumnString::Chars_t & a_data, + const ColumnString::Offsets & a_offsets, + size_t size, + F && func) +{ + for (size_t i = 0; i < size; ++i) + { + size_t a_size = StringUtil::sizeAt(a_offsets, i) - 1; + const auto * a_ptr = reinterpret_cast(&a_data[StringUtil::offsetAt(a_offsets, i)]); + + func({a_ptr, a_size}, i); + } +} + +// Handle str-column compare str-column. +// - Optimize UTF8_BIN and UTF8MB4_BIN +// - Check if columns do NOT contain tail space +// - If Op is `EqualsOp` or `NotEqualsOp`, optimize comparison by faster way +template +ALWAYS_INLINE inline bool StringVectorStringVector( + const ColumnString::Chars_t & a_data, + const ColumnString::Offsets & a_offsets, + const ColumnString::Chars_t & b_data, + const ColumnString::Offsets & b_offsets, + const TiDB::TiDBCollatorPtr & collator, + Result & c) +{ + bool use_optimized_path = false; + + switch (collator->getCollatorId()) + { + case TiDB::ITiDBCollator::UTF8MB4_BIN: + case TiDB::ITiDBCollator::UTF8_BIN: + { + size_t size = a_offsets.size(); + + LoopTwoColumns(a_data, a_offsets, b_data, b_offsets, size, [&c](const std::string_view & va, const std::string_view & vb, size_t i) { + if constexpr (IsEqualRelated::value) + { + c[i] = Op::apply(RawStrEqualCompare(RightTrim(va), RightTrim(vb)), 0); + } + else + { + c[i] = Op::apply(RtrimStrCompare(va, vb), 0); + } + }); + + use_optimized_path = true; + + break; + } + default: + break; + } + return use_optimized_path; +} + +// Handle str-column compare const-str. +// - Optimize UTF8_BIN and UTF8MB4_BIN +// - Right trim const-str first +// - Check if column does NOT contain tail space +// - If Op is `EqualsOp` or `NotEqualsOp`, optimize comparison by faster way +template +ALWAYS_INLINE inline bool StringVectorConstant( + const ColumnString::Chars_t & a_data, + const ColumnString::Offsets & a_offsets, + const std::string_view & b, + const TiDB::TiDBCollatorPtr & collator, + Result & c) +{ + bool use_optimized_path = false; + + switch (collator->getCollatorId()) + { + case TiDB::ITiDBCollator::UTF8MB4_BIN: + case TiDB::ITiDBCollator::UTF8_BIN: + { + size_t size = a_offsets.size(); + + std::string_view tar_str_view = RightTrim(b); // right trim const-str first + + LoopOneColumn(a_data, a_offsets, size, [&c, &tar_str_view](const std::string_view & view, size_t i) { + if constexpr (IsEqualRelated::value) + { + c[i] = Op::apply(RawStrEqualCompare(RightTrim(view), tar_str_view), 0); + } + else + { + c[i] = Op::apply(RawStrCompare(RightTrim(view), tar_str_view), 0); + } + }); + + use_optimized_path = true; + break; + } + default: + break; + } + return use_optimized_path; +} + +} // namespace DB diff --git a/dbms/src/Functions/FunctionsComparison.h b/dbms/src/Functions/FunctionsComparison.h index 1c63a286452..8f7502fba85 100644 --- a/dbms/src/Functions/FunctionsComparison.h +++ b/dbms/src/Functions/FunctionsComparison.h @@ -33,6 +33,7 @@ #include #include #include +#include #include #include #include @@ -301,6 +302,12 @@ struct StringComparisonWithCollatorImpl const TiDB::TiDBCollatorPtr & collator, PaddedPODArray & c) { + bool optimized_path = StringVectorStringVector(a_data, a_offsets, b_data, b_offsets, collator, c); + if (optimized_path) + { + return; + } + size_t size = a_offsets.size(); for (size_t i = 0; i < size; ++i) @@ -317,10 +324,17 @@ struct StringComparisonWithCollatorImpl static void NO_INLINE stringVectorConstant( const ColumnString::Chars_t & a_data, const ColumnString::Offsets & a_offsets, - const std::string & b, + const std::string_view & b, const TiDB::TiDBCollatorPtr & collator, PaddedPODArray & c) { + bool optimized_path = StringVectorConstant(a_data, a_offsets, b, collator, c); + + if (optimized_path) + { + return; + } + size_t size = a_offsets.size(); ColumnString::Offset b_size = b.size(); const char * b_data = reinterpret_cast(b.data()); @@ -332,7 +346,7 @@ struct StringComparisonWithCollatorImpl } static void constantStringVector( - const std::string & a, + const std::string_view & a, const ColumnString::Chars_t & b_data, const ColumnString::Offsets & b_offsets, const TiDB::TiDBCollatorPtr & collator, @@ -342,8 +356,8 @@ struct StringComparisonWithCollatorImpl } static void constantConstant( - const std::string & a, - const std::string & b, + const std::string_view & a, + const std::string_view & b, const TiDB::TiDBCollatorPtr & collator, ResultType & c) { @@ -706,6 +720,25 @@ class FunctionComparison : public IFunction } } + static inline std::string_view genConstStrRef(const ColumnConst * c0_const) + { + std::string_view c0_const_str_ref{}; + if (c0_const) + { + if (const auto * c0_const_string = checkAndGetColumn(&c0_const->getDataColumn()); c0_const_string) + { + c0_const_str_ref = std::string_view(c0_const_string->getDataAt(0)); + } + else if (const auto * c0_const_fixed_string = checkAndGetColumn(&c0_const->getDataColumn()); c0_const_fixed_string) + { + c0_const_str_ref = std::string_view(c0_const_fixed_string->getDataAt(0)); + } + else + throw Exception("Logical error: ColumnConst contains not String nor FixedString column", ErrorCodes::ILLEGAL_COLUMN); + } + return c0_const_str_ref; + } + template bool executeStringWithCollator( Block & block, @@ -720,10 +753,13 @@ class FunctionComparison : public IFunction using ResultType = typename ResultColumnType::value_type; using StringImpl = StringComparisonWithCollatorImpl, ResultType>; + std::string_view c0_const_str_ref = genConstStrRef(c0_const); + std::string_view c1_const_str_ref = genConstStrRef(c1_const); + if (c0_const && c1_const) { ResultType res = 0; - StringImpl::constantConstant(c0_const->getValue(), c1_const->getValue(), collator, res); + StringImpl::constantConstant(c0_const_str_ref, c1_const_str_ref, collator, res); block.getByPosition(result).column = block.getByPosition(result).type->createColumnConst(c0_const->size(), toField(res)); return true; } @@ -745,12 +781,12 @@ class FunctionComparison : public IFunction StringImpl::stringVectorConstant( c0_string->getChars(), c0_string->getOffsets(), - c1_const->getValue(), + c1_const_str_ref, collator, c_res->getData()); else if (c0_const && c1_string) StringImpl::constantStringVector( - c0_const->getValue(), + c0_const_str_ref, c1_string->getChars(), c1_string->getOffsets(), collator, @@ -770,8 +806,8 @@ class FunctionComparison : public IFunction template bool executeString(Block & block, size_t result, const IColumn * c0, const IColumn * c1) const { - const ColumnString * c0_string = checkAndGetColumn(c0); - const ColumnString * c1_string = checkAndGetColumn(c1); + const auto * c0_string = checkAndGetColumn(c0); + const auto * c1_string = checkAndGetColumn(c1); const ColumnConst * c0_const = checkAndGetColumnConstStringOrFixedString(c0); const ColumnConst * c1_const = checkAndGetColumnConstStringOrFixedString(c1); diff --git a/dbms/src/Storages/Transaction/Collator.cpp b/dbms/src/Storages/Transaction/Collator.cpp index a9b4d0784be..1b0221a6829 100644 --- a/dbms/src/Storages/Transaction/Collator.cpp +++ b/dbms/src/Storages/Transaction/Collator.cpp @@ -13,6 +13,7 @@ // limitations under the License. #include +#include #include #include @@ -29,17 +30,10 @@ TiDBCollators dummy_collators; std::vector dummy_sort_key_contaners; std::string dummy_sort_key_contaner; -std::string_view rtrim(const char * s, size_t length) +ALWAYS_INLINE std::string_view rtrim(const char * s, size_t length) { auto v = std::string_view(s, length); - size_t end = v.find_last_not_of(' '); - return end == std::string_view::npos ? "" : v.substr(0, end + 1); -} - -template -int signum(T val) -{ - return (0 < val) - (val < 0); + return DB::RightTrim(v); } using Rune = int32_t; @@ -183,26 +177,26 @@ class Pattern : public ITiDBCollator::IPattern }; template -class BinCollator : public ITiDBCollator +class BinCollator final : public ITiDBCollator { public: explicit BinCollator(int32_t id) : ITiDBCollator(id) {} + int compare(const char * s1, size_t length1, const char * s2, size_t length2) const override { if constexpr (padding) - return signum(rtrim(s1, length1).compare(rtrim(s2, length2))); + return DB::RtrimStrCompare({s1, length1}, {s2, length2}); else - return signum(std::string_view(s1, length1).compare(std::string_view(s2, length2))); + return DB::RawStrCompare({s1, length1}, {s2, length2}); } StringRef sortKey(const char * s, size_t length, std::string &) const override { if constexpr (padding) { - auto v = rtrim(s, length); - return StringRef(v.data(), v.length()); + return StringRef(rtrim(s, length)); } else { @@ -249,7 +243,7 @@ using WeightType = uint16_t; extern const std::array weight_lut; } // namespace GeneralCI -class GeneralCICollator : public ITiDBCollator +class GeneralCICollator final : public ITiDBCollator { public: explicit GeneralCICollator(int32_t id) @@ -270,7 +264,7 @@ class GeneralCICollator : public ITiDBCollator auto sk2 = weight(c2); auto cmp = sk1 - sk2; if (cmp != 0) - return signum(cmp); + return DB::signum(cmp); } return (offset1 < v1.length()) - (offset2 < v2.length()); @@ -365,7 +359,7 @@ const std::array weight_lut_long = { } // namespace UnicodeCI -class UnicodeCICollator : public ITiDBCollator +class UnicodeCICollator final : public ITiDBCollator { public: explicit UnicodeCICollator(int32_t id) @@ -420,7 +414,7 @@ class UnicodeCICollator : public ITiDBCollator } else { - return signum(static_cast(s1_first & 0xFFFF) - static_cast(s2_first & 0xFFFF)); + return DB::signum(static_cast(s1_first & 0xFFFF) - static_cast(s2_first & 0xFFFF)); } } } @@ -593,6 +587,8 @@ class UnicodeCICollator : public ITiDBCollator friend class Pattern; }; +using UTF8MB4_BIN_TYPE = BinCollator; + TiDBCollatorPtr ITiDBCollator::getCollator(int32_t id) { switch (id) @@ -607,10 +603,10 @@ TiDBCollatorPtr ITiDBCollator::getCollator(int32_t id) static const auto latin1_collator = BinCollator(LATIN1_BIN); return &latin1_collator; case ITiDBCollator::UTF8MB4_BIN: - static const auto utf8mb4_collator = BinCollator(UTF8MB4_BIN); + static const auto utf8mb4_collator = UTF8MB4_BIN_TYPE(UTF8MB4_BIN); return &utf8mb4_collator; case ITiDBCollator::UTF8_BIN: - static const auto utf8_collator = BinCollator(UTF8_BIN); + static const auto utf8_collator = UTF8MB4_BIN_TYPE(UTF8_BIN); return &utf8_collator; case ITiDBCollator::UTF8_GENERAL_CI: static const auto utf8_general_ci_collator = GeneralCICollator(UTF8_GENERAL_CI); diff --git a/tests/tidb-ci/new_collation_fullstack/expr.test b/tests/tidb-ci/new_collation_fullstack/expr.test index 15ada0f335c..1e2135c4f2d 100644 --- a/tests/tidb-ci/new_collation_fullstack/expr.test +++ b/tests/tidb-ci/new_collation_fullstack/expr.test @@ -35,6 +35,13 @@ mysql> set session tidb_isolation_read_engines='tiflash'; select /*+ read_from_s | 2 | abc | +------+-------+ +mysql> set session tidb_isolation_read_engines='tiflash'; select /*+ read_from_storage(tiflash[t]) */ id, value1 from test.t where value1 = 'abc '; ++------+-------+ +| id | value1| ++------+-------+ +| 1 | abc | +| 2 | abc | ++------+-------+ mysql> set session tidb_isolation_read_engines='tiflash'; select /*+ read_from_storage(tiflash[t]) */ id, value from test.t where value like 'aB%'; +------+-------+ @@ -62,6 +69,13 @@ mysql> set session tidb_isolation_read_engines='tiflash'; select /*+ read_from_s | 3 | def | +------+-------+ +mysql> set session tidb_isolation_read_engines='tiflash'; select /*+ read_from_storage(tiflash[t]) */ id, value1 from test.t where value1 = 'def '; ++------+-------+ +| id | value1| ++------+-------+ +| 3 | def | ++------+-------+ + mysql> set session tidb_isolation_read_engines='tiflash'; select /*+ read_from_storage(tiflash[t]) */ id, value1 from test.t where value1 in ('Abc','def'); +------+-------+ | id | value1| From 47657d34c75d8993c598217f7d96b190666e5f55 Mon Sep 17 00:00:00 2001 From: hongyunyan <649330952@qq.com> Date: Thu, 7 Jul 2022 20:37:02 +0800 Subject: [PATCH 5/7] feat : support set tiflash mode ddl action (#5256) ref pingcap/tiflash#5252 --- dbms/src/Debug/DBGInvoker.cpp | 2 + dbms/src/Debug/dbgFuncSchemaName.cpp | 52 +++++++++++++ dbms/src/Debug/dbgFuncSchemaName.h | 10 +++ dbms/src/Storages/Transaction/TiDB.cpp | 42 ++++++++++ dbms/src/Storages/Transaction/TiDB.h | 11 +++ .../Transaction/tests/gtest_table_info.cpp | 34 ++++---- dbms/src/TiDB/Schema/SchemaBuilder.cpp | 77 +++++++++++++++++++ dbms/src/TiDB/Schema/SchemaBuilder.h | 4 + dbms/src/TiDB/Schema/SchemaGetter.h | 5 +- ...alter_table_tiflash_replica_and_mode.test} | 34 ++++++++ .../ddl/alter_tiflash_mode.test | 48 ++++++++++++ 11 files changed, 301 insertions(+), 18 deletions(-) rename tests/fullstack-test2/ddl/{alter_table_tiflash_replica.test => alter_table_tiflash_replica_and_mode.test} (60%) create mode 100644 tests/fullstack-test2/ddl/alter_tiflash_mode.test diff --git a/dbms/src/Debug/DBGInvoker.cpp b/dbms/src/Debug/DBGInvoker.cpp index 3654a437bf7..df993d8e6e9 100644 --- a/dbms/src/Debug/DBGInvoker.cpp +++ b/dbms/src/Debug/DBGInvoker.cpp @@ -120,6 +120,8 @@ DBGInvoker::DBGInvoker() regSchemafulFunc("query_mapped", dbgFuncQueryMapped); regSchemalessFunc("get_tiflash_replica_count", dbgFuncGetTiflashReplicaCount); regSchemalessFunc("get_partition_tables_tiflash_replica_count", dbgFuncGetPartitionTablesTiflashReplicaCount); + regSchemalessFunc("get_tiflash_mode", dbgFuncGetTiflashMode); + regSchemalessFunc("get_partition_tables_tiflash_mode", dbgFuncGetPartitionTablesTiflashMode); regSchemalessFunc("search_log_for_key", dbgFuncSearchLogForKey); regSchemalessFunc("tidb_dag", dbgFuncTiDBQueryFromNaturalDag); diff --git a/dbms/src/Debug/dbgFuncSchemaName.cpp b/dbms/src/Debug/dbgFuncSchemaName.cpp index 5f10da6685d..3aa7b6e3af4 100644 --- a/dbms/src/Debug/dbgFuncSchemaName.cpp +++ b/dbms/src/Debug/dbgFuncSchemaName.cpp @@ -181,4 +181,56 @@ void dbgFuncGetPartitionTablesTiflashReplicaCount(Context & context, const ASTs output(fmt_buf.toString()); } +void dbgFuncGetTiflashMode(Context & context, const ASTs & args, DBGInvoker::Printer output) +{ + if (args.empty() || args.size() != 2) + throw Exception("Args not matched, should be: database-name[, table-name]", ErrorCodes::BAD_ARGUMENTS); + + const String & database_name = typeid_cast(*args[0]).name; + FmtBuffer fmt_buf; + + const String & table_name = typeid_cast(*args[1]).name; + auto mapped = mappedTable(context, database_name, table_name); + auto storage = context.getTable(mapped->first, mapped->second); + auto managed_storage = std::dynamic_pointer_cast(storage); + if (!managed_storage) + throw Exception(database_name + "." + table_name + " is not ManageableStorage", ErrorCodes::BAD_ARGUMENTS); + + fmt_buf.append((TiFlashModeToString(managed_storage->getTableInfo().tiflash_mode))); + + output(fmt_buf.toString()); +} + +void dbgFuncGetPartitionTablesTiflashMode(Context & context, const ASTs & args, DBGInvoker::Printer output) +{ + if (args.empty() || args.size() != 2) + throw Exception("Args not matched, should be: database-name[, table-name]", ErrorCodes::BAD_ARGUMENTS); + + const String & database_name = typeid_cast(*args[0]).name; + FmtBuffer fmt_buf; + + const String & table_name = typeid_cast(*args[1]).name; + auto mapped = mappedTable(context, database_name, table_name); + auto storage = context.getTable(mapped->first, mapped->second); + auto managed_storage = std::dynamic_pointer_cast(storage); + if (!managed_storage) + throw Exception(database_name + "." + table_name + " is not ManageableStorage", ErrorCodes::BAD_ARGUMENTS); + + auto table_info = managed_storage->getTableInfo(); + + if (!table_info.isLogicalPartitionTable()) + throw Exception(database_name + "." + table_name + " is not logical partition table", ErrorCodes::BAD_ARGUMENTS); + + SchemaNameMapper name_mapper; + for (const auto & part_def : table_info.partition.definitions) + { + auto paritition_table_info = table_info.producePartitionTableInfo(part_def.id, name_mapper); + auto partition_storage = context.getTMTContext().getStorages().get(paritition_table_info->id); + fmt_buf.append((TiFlashModeToString(partition_storage->getTableInfo().tiflash_mode))); + fmt_buf.append("/"); + } + + output(fmt_buf.toString()); +} + } // namespace DB diff --git a/dbms/src/Debug/dbgFuncSchemaName.h b/dbms/src/Debug/dbgFuncSchemaName.h index 2a31b3a7f6d..ec18f89e911 100644 --- a/dbms/src/Debug/dbgFuncSchemaName.h +++ b/dbms/src/Debug/dbgFuncSchemaName.h @@ -50,4 +50,14 @@ void dbgFuncGetTiflashReplicaCount(Context & context, const ASTs & args, DBGInvo // ./storage-client.sh "DBGInvoke get_partition_tables_tiflash_replica_count(db_name, table_name)" void dbgFuncGetPartitionTablesTiflashReplicaCount(Context & context, const ASTs & args, DBGInvoker::Printer output); +// Get table's tiflash mode with mapped table name +// Usage: +// ./storage-client.sh "DBGInvoke get_tiflash_mode(db_name, table_name)" +void dbgFuncGetTiflashMode(Context & context, const ASTs & args, DBGInvoker::Printer output); + +// Get the logical table's partition tables' tiflash replica counts with mapped table name +// Usage: +// ./storage-client.sh "DBGInvoke get_partition_tables_tiflash_mode(db_name, table_name)" +void dbgFuncGetPartitionTablesTiflashMode(Context & context, const ASTs & args, DBGInvoker::Printer output); + } // namespace DB diff --git a/dbms/src/Storages/Transaction/TiDB.cpp b/dbms/src/Storages/Transaction/TiDB.cpp index dc7f1f3e348..6d07c47f235 100644 --- a/dbms/src/Storages/Transaction/TiDB.cpp +++ b/dbms/src/Storages/Transaction/TiDB.cpp @@ -25,6 +25,7 @@ #include #include #include +#include #include @@ -772,6 +773,37 @@ catch (const Poco::Exception & e) DB::Exception(e)); } +String TiFlashModeToString(TiFlashMode tiflash_mode) +{ + switch (tiflash_mode) + { + case TiFlashMode::Normal: + return ""; + case TiFlashMode::Fast: + return "fast"; + default: + LOG_FMT_WARNING(&Poco::Logger::get("TiDB"), "TiFlashModeToString with invalid tiflash mode {}", tiflash_mode); + return ""; + } +} + +TiFlashMode parseTiFlashMode(String mode_str) +{ + if (mode_str.empty()) + { + return TiFlashMode::Normal; + } + else if (mode_str == "fast") + { + return TiFlashMode::Fast; + } + else + { + throw DB::Exception( + std::string(__PRETTY_FUNCTION__) + + " ParseTiFlashMode Failed. mode " + mode_str + " is unvalid, please set mode as fast/normal"); + } +} /////////////////////// ////// TableInfo ////// /////////////////////// @@ -840,6 +872,8 @@ try json->set("tiflash_replica", replica_info.getJSONObject()); + json->set("tiflash_mode", std::string(TiFlashModeToString(tiflash_mode))); + json->stringify(buf); return buf.str(); @@ -926,6 +960,14 @@ try replica_info.deserialize(replica_obj); } } + if (obj->has("tiflash_mode")) + { + auto mode = obj->getValue("tiflash_mode"); + if (!mode.empty()) + { + tiflash_mode = parseTiFlashMode(mode); + } + } if (is_common_handle && index_infos.size() != 1) { throw DB::Exception( diff --git a/dbms/src/Storages/Transaction/TiDB.h b/dbms/src/Storages/Transaction/TiDB.h index 4c28a614857..a9d46b60c13 100644 --- a/dbms/src/Storages/Transaction/TiDB.h +++ b/dbms/src/Storages/Transaction/TiDB.h @@ -333,6 +333,12 @@ struct IndexInfo bool is_global; }; +enum class TiFlashMode +{ + Normal, + Fast, +}; + struct TableInfo { TableInfo() = default; @@ -382,6 +388,8 @@ struct TableInfo // The TiFlash replica info persisted by TiDB TiFlashReplicaInfo replica_info; + TiFlashMode tiflash_mode = TiFlashMode::Normal; + ::TiDB::StorageEngine engine_type = ::TiDB::StorageEngine::UNSPECIFIED; ColumnID getColumnID(const String & name) const; @@ -413,4 +421,7 @@ String genJsonNull(); tipb::FieldType columnInfoToFieldType(const ColumnInfo & ci); ColumnInfo fieldTypeToColumnInfo(const tipb::FieldType & field_type); +String TiFlashModeToString(TiFlashMode tiflash_mode); +TiFlashMode parseTiFlashMode(String mode_str); + } // namespace TiDB diff --git a/dbms/src/Storages/Transaction/tests/gtest_table_info.cpp b/dbms/src/Storages/Transaction/tests/gtest_table_info.cpp index 516a173b151..871153cb0e9 100644 --- a/dbms/src/Storages/Transaction/tests/gtest_table_info.cpp +++ b/dbms/src/Storages/Transaction/tests/gtest_table_info.cpp @@ -42,7 +42,7 @@ struct ParseCase std::function check; }; -TEST(TiDBTableInfo_test, ParseFromJSON) +TEST(TiDBTableInfoTest, ParseFromJSON) try { auto cases = { @@ -136,54 +136,54 @@ struct StmtCase } }; -TEST(TiDBTableInfo_test, GenCreateTableStatement) +TEST(TiDBTableInfoTest, GenCreateTableStatement) try { auto cases = // {StmtCase{ 1145, // R"json({"id":1939,"db_name":{"O":"customer","L":"customer"},"charset":"utf8mb4","collate":"utf8mb4_bin","state":5})json", // - R"json({"id":1145,"name":{"O":"customerdebt","L":"customerdebt"},"cols":[{"id":1,"name":{"O":"id","L":"id"},"offset":0,"origin_default":null,"default":null,"default_bit":null,"type":{"Tp":8,"Flag":515,"Flen":20,"Decimal":0},"state":5,"comment":"i\"d"}],"state":5,"pk_is_handle":true,"schema_version":-1,"comment":"负债信息","partition":null})json", // - R"stmt(CREATE TABLE `customer`.`customerdebt`(`id` Int64) Engine = DeltaMerge((`id`), '{"cols":[{"comment":"i\\"d","default":null,"default_bit":null,"id":1,"name":{"L":"id","O":"id"},"offset":0,"origin_default":null,"state":5,"type":{"Charset":null,"Collate":null,"Decimal":0,"Elems":null,"Flag":515,"Flen":20,"Tp":8}}],"comment":"\\u8D1F\\u503A\\u4FE1\\u606F","id":1145,"index_info":[],"is_common_handle":false,"name":{"L":"customerdebt","O":"customerdebt"},"partition":null,"pk_is_handle":true,"schema_version":-1,"state":5,"tiflash_replica":{"Count":0},"update_timestamp":0}'))stmt", // + R"json({"id":1145,"name":{"O":"customerdebt","L":"customerdebt"},"cols":[{"id":1,"name":{"O":"id","L":"id"},"offset":0,"origin_default":null,"default":null,"default_bit":null,"type":{"Tp":8,"Flag":515,"Flen":20,"Decimal":0},"state":5,"comment":"i\"d"}],"state":5,"pk_is_handle":true,"schema_version":-1,"comment":"负债信息","partition":null,"tiflash_mode":"fast"})json", // + R"stmt(CREATE TABLE `customer`.`customerdebt`(`id` Int64) Engine = DeltaMerge((`id`), '{"cols":[{"comment":"i\\"d","default":null,"default_bit":null,"id":1,"name":{"L":"id","O":"id"},"offset":0,"origin_default":null,"state":5,"type":{"Charset":null,"Collate":null,"Decimal":0,"Elems":null,"Flag":515,"Flen":20,"Tp":8}}],"comment":"\\u8D1F\\u503A\\u4FE1\\u606F","id":1145,"index_info":[],"is_common_handle":false,"name":{"L":"customerdebt","O":"customerdebt"},"partition":null,"pk_is_handle":true,"schema_version":-1,"state":5,"tiflash_mode":"fast","tiflash_replica":{"Count":0},"update_timestamp":0}'))stmt", // }, StmtCase{ 2049, // R"json({"id":1939,"db_name":{"O":"customer","L":"customer"},"charset":"utf8mb4","collate":"utf8mb4_bin","state":5})json", // - R"json({"id":2049,"name":{"O":"customerdebt","L":"customerdebt"},"cols":[{"id":1,"name":{"O":"id","L":"id"},"offset":0,"origin_default":null,"default":null,"default_bit":null,"type":{"Tp":8,"Flag":515,"Flen":20,"Decimal":0},"state":5,"comment":"i\"d"}],"state":5,"pk_is_handle":true,"schema_version":-1,"comment":"负债信息","update_timestamp":404545295996944390,"partition":null})json", // - R"stmt(CREATE TABLE `customer`.`customerdebt`(`id` Int64) Engine = DeltaMerge((`id`), '{"cols":[{"comment":"i\\"d","default":null,"default_bit":null,"id":1,"name":{"L":"id","O":"id"},"offset":0,"origin_default":null,"state":5,"type":{"Charset":null,"Collate":null,"Decimal":0,"Elems":null,"Flag":515,"Flen":20,"Tp":8}}],"comment":"\\u8D1F\\u503A\\u4FE1\\u606F","id":2049,"index_info":[],"is_common_handle":false,"name":{"L":"customerdebt","O":"customerdebt"},"partition":null,"pk_is_handle":true,"schema_version":-1,"state":5,"tiflash_replica":{"Count":0},"update_timestamp":404545295996944390}'))stmt", // + R"json({"id":2049,"name":{"O":"customerdebt","L":"customerdebt"},"cols":[{"id":1,"name":{"O":"id","L":"id"},"offset":0,"origin_default":null,"default":null,"default_bit":null,"type":{"Tp":8,"Flag":515,"Flen":20,"Decimal":0},"state":5,"comment":"i\"d"}],"state":5,"pk_is_handle":true,"schema_version":-1,"comment":"负债信息","update_timestamp":404545295996944390,"partition":null,"tiflash_mode":""})json", // + R"stmt(CREATE TABLE `customer`.`customerdebt`(`id` Int64) Engine = DeltaMerge((`id`), '{"cols":[{"comment":"i\\"d","default":null,"default_bit":null,"id":1,"name":{"L":"id","O":"id"},"offset":0,"origin_default":null,"state":5,"type":{"Charset":null,"Collate":null,"Decimal":0,"Elems":null,"Flag":515,"Flen":20,"Tp":8}}],"comment":"\\u8D1F\\u503A\\u4FE1\\u606F","id":2049,"index_info":[],"is_common_handle":false,"name":{"L":"customerdebt","O":"customerdebt"},"partition":null,"pk_is_handle":true,"schema_version":-1,"state":5,"tiflash_mode":"","tiflash_replica":{"Count":0},"update_timestamp":404545295996944390}'))stmt", // }, StmtCase{ 31, // R"json({"id":1,"db_name":{"O":"db1","L":"db1"},"charset":"utf8mb4","collate":"utf8mb4_bin","state":5})json", // - R"json({"id":31,"name":{"O":"simple_t","L":"simple_t"},"charset":"","collate":"","cols":[{"id":1,"name":{"O":"i","L":"i"},"offset":0,"origin_default":null,"default":null,"default_bit":null,"generated_expr_string":"","generated_stored":false,"dependences":null,"type":{"Tp":3,"Flag":0,"Flen":11,"Decimal":0,"Charset":"binary","Collate":"binary","Elems":null},"state":5,"comment":""}],"index_info":null,"fk_info":null,"state":5,"pk_is_handle":false,"schema_version":-1,"comment":"","auto_inc_id":0,"max_col_id":1,"max_idx_id":0,"update_timestamp":404545295996944390,"ShardRowIDBits":0,"partition":null})json", // - R"stmt(CREATE TABLE `db1`.`simple_t`(`i` Nullable(Int32), `_tidb_rowid` Int64) Engine = DeltaMerge((`_tidb_rowid`), '{"cols":[{"comment":"","default":null,"default_bit":null,"id":1,"name":{"L":"i","O":"i"},"offset":0,"origin_default":null,"state":5,"type":{"Charset":"binary","Collate":"binary","Decimal":0,"Elems":null,"Flag":0,"Flen":11,"Tp":3}}],"comment":"","id":31,"index_info":[],"is_common_handle":false,"name":{"L":"simple_t","O":"simple_t"},"partition":null,"pk_is_handle":false,"schema_version":-1,"state":5,"tiflash_replica":{"Count":0},"update_timestamp":404545295996944390}'))stmt", // + R"json({"id":31,"name":{"O":"simple_t","L":"simple_t"},"charset":"","collate":"","cols":[{"id":1,"name":{"O":"i","L":"i"},"offset":0,"origin_default":null,"default":null,"default_bit":null,"generated_expr_string":"","generated_stored":false,"dependences":null,"type":{"Tp":3,"Flag":0,"Flen":11,"Decimal":0,"Charset":"binary","Collate":"binary","Elems":null},"state":5,"comment":""}],"index_info":null,"fk_info":null,"state":5,"pk_is_handle":false,"schema_version":-1,"comment":"","auto_inc_id":0,"max_col_id":1,"max_idx_id":0,"update_timestamp":404545295996944390,"ShardRowIDBits":0,"partition":null,"tiflash_mode":""})json", // + R"stmt(CREATE TABLE `db1`.`simple_t`(`i` Nullable(Int32), `_tidb_rowid` Int64) Engine = DeltaMerge((`_tidb_rowid`), '{"cols":[{"comment":"","default":null,"default_bit":null,"id":1,"name":{"L":"i","O":"i"},"offset":0,"origin_default":null,"state":5,"type":{"Charset":"binary","Collate":"binary","Decimal":0,"Elems":null,"Flag":0,"Flen":11,"Tp":3}}],"comment":"","id":31,"index_info":[],"is_common_handle":false,"name":{"L":"simple_t","O":"simple_t"},"partition":null,"pk_is_handle":false,"schema_version":-1,"state":5,"tiflash_mode":"","tiflash_replica":{"Count":0},"update_timestamp":404545295996944390}'))stmt", // }, StmtCase{ 33, // R"json({"id":2,"db_name":{"O":"db2","L":"db2"},"charset":"utf8mb4","collate":"utf8mb4_bin","state":5})json", // - R"json({"id":33,"name":{"O":"pk_t","L":"pk_t"},"charset":"","collate":"","cols":[{"id":1,"name":{"O":"i","L":"i"},"offset":0,"origin_default":null,"default":null,"default_bit":null,"generated_expr_string":"","generated_stored":false,"dependences":null,"type":{"Tp":3,"Flag":3,"Flen":11,"Decimal":0,"Charset":"binary","Collate":"binary","Elems":null},"state":5,"comment":""}],"index_info":null,"fk_info":null,"state":5,"pk_is_handle":true,"schema_version":-1,"comment":"","auto_inc_id":0,"max_col_id":1,"max_idx_id":0,"update_timestamp":404545312978108418,"ShardRowIDBits":0,"partition":null})json", // - R"stmt(CREATE TABLE `db2`.`pk_t`(`i` Int32) Engine = DeltaMerge((`i`), '{"cols":[{"comment":"","default":null,"default_bit":null,"id":1,"name":{"L":"i","O":"i"},"offset":0,"origin_default":null,"state":5,"type":{"Charset":"binary","Collate":"binary","Decimal":0,"Elems":null,"Flag":3,"Flen":11,"Tp":3}}],"comment":"","id":33,"index_info":[],"is_common_handle":false,"name":{"L":"pk_t","O":"pk_t"},"partition":null,"pk_is_handle":true,"schema_version":-1,"state":5,"tiflash_replica":{"Count":0},"update_timestamp":404545312978108418}'))stmt", // + R"json({"id":33,"name":{"O":"pk_t","L":"pk_t"},"charset":"","collate":"","cols":[{"id":1,"name":{"O":"i","L":"i"},"offset":0,"origin_default":null,"default":null,"default_bit":null,"generated_expr_string":"","generated_stored":false,"dependences":null,"type":{"Tp":3,"Flag":3,"Flen":11,"Decimal":0,"Charset":"binary","Collate":"binary","Elems":null},"state":5,"comment":""}],"index_info":null,"fk_info":null,"state":5,"pk_is_handle":true,"schema_version":-1,"comment":"","auto_inc_id":0,"max_col_id":1,"max_idx_id":0,"update_timestamp":404545312978108418,"ShardRowIDBits":0,"partition":null,"tiflash_mode":""})json", // + R"stmt(CREATE TABLE `db2`.`pk_t`(`i` Int32) Engine = DeltaMerge((`i`), '{"cols":[{"comment":"","default":null,"default_bit":null,"id":1,"name":{"L":"i","O":"i"},"offset":0,"origin_default":null,"state":5,"type":{"Charset":"binary","Collate":"binary","Decimal":0,"Elems":null,"Flag":3,"Flen":11,"Tp":3}}],"comment":"","id":33,"index_info":[],"is_common_handle":false,"name":{"L":"pk_t","O":"pk_t"},"partition":null,"pk_is_handle":true,"schema_version":-1,"state":5,"tiflash_mode":"","tiflash_replica":{"Count":0},"update_timestamp":404545312978108418}'))stmt", // }, StmtCase{ 35, // R"json({"id":1,"db_name":{"O":"db1","L":"db1"},"charset":"utf8mb4","collate":"utf8mb4_bin","state":5})json", // - R"json({"id":35,"name":{"O":"not_null_t","L":"not_null_t"},"charset":"","collate":"","cols":[{"id":1,"name":{"O":"i","L":"i"},"offset":0,"origin_default":null,"default":null,"default_bit":null,"generated_expr_string":"","generated_stored":false,"dependences":null,"type":{"Tp":3,"Flag":4097,"Flen":11,"Decimal":0,"Charset":"binary","Collate":"binary","Elems":null},"state":5,"comment":""}],"index_info":null,"fk_info":null,"state":5,"pk_is_handle":false,"schema_version":-1,"comment":"","auto_inc_id":0,"max_col_id":1,"max_idx_id":0,"update_timestamp":404545324922961926,"ShardRowIDBits":0,"partition":null})json", // - R"stmt(CREATE TABLE `db1`.`not_null_t`(`i` Int32, `_tidb_rowid` Int64) Engine = DeltaMerge((`_tidb_rowid`), '{"cols":[{"comment":"","default":null,"default_bit":null,"id":1,"name":{"L":"i","O":"i"},"offset":0,"origin_default":null,"state":5,"type":{"Charset":"binary","Collate":"binary","Decimal":0,"Elems":null,"Flag":4097,"Flen":11,"Tp":3}}],"comment":"","id":35,"index_info":[],"is_common_handle":false,"name":{"L":"not_null_t","O":"not_null_t"},"partition":null,"pk_is_handle":false,"schema_version":-1,"state":5,"tiflash_replica":{"Count":0},"update_timestamp":404545324922961926}'))stmt", // + R"json({"id":35,"name":{"O":"not_null_t","L":"not_null_t"},"charset":"","collate":"","cols":[{"id":1,"name":{"O":"i","L":"i"},"offset":0,"origin_default":null,"default":null,"default_bit":null,"generated_expr_string":"","generated_stored":false,"dependences":null,"type":{"Tp":3,"Flag":4097,"Flen":11,"Decimal":0,"Charset":"binary","Collate":"binary","Elems":null},"state":5,"comment":""}],"index_info":null,"fk_info":null,"state":5,"pk_is_handle":false,"schema_version":-1,"comment":"","auto_inc_id":0,"max_col_id":1,"max_idx_id":0,"update_timestamp":404545324922961926,"ShardRowIDBits":0,"partition":null,"tiflash_mode":""})json", // + R"stmt(CREATE TABLE `db1`.`not_null_t`(`i` Int32, `_tidb_rowid` Int64) Engine = DeltaMerge((`_tidb_rowid`), '{"cols":[{"comment":"","default":null,"default_bit":null,"id":1,"name":{"L":"i","O":"i"},"offset":0,"origin_default":null,"state":5,"type":{"Charset":"binary","Collate":"binary","Decimal":0,"Elems":null,"Flag":4097,"Flen":11,"Tp":3}}],"comment":"","id":35,"index_info":[],"is_common_handle":false,"name":{"L":"not_null_t","O":"not_null_t"},"partition":null,"pk_is_handle":false,"schema_version":-1,"state":5,"tiflash_mode":"","tiflash_replica":{"Count":0},"update_timestamp":404545324922961926}'))stmt", // }, StmtCase{ 37, // R"json({"id":2,"db_name":{"O":"db2","L":"db2"},"charset":"utf8mb4","collate":"utf8mb4_bin","state":5})json", - R"json({"id":37,"name":{"O":"mytable","L":"mytable"},"charset":"","collate":"","cols":[{"id":1,"name":{"O":"mycol","L":"mycol"},"offset":0,"origin_default":null,"default":null,"default_bit":null,"generated_expr_string":"","generated_stored":false,"dependences":null,"type":{"Tp":15,"Flag":4099,"Flen":256,"Decimal":0,"Charset":"utf8","Collate":"utf8_bin","Elems":null},"state":5,"comment":""}],"index_info":[{"id":1,"idx_name":{"O":"PRIMARY","L":"primary"},"tbl_name":{"O":"","L":""},"idx_cols":[{"name":{"O":"mycol","L":"mycol"},"offset":0,"length":-1}],"is_unique":true,"is_primary":true,"state":5,"comment":"","index_type":1}],"fk_info":null,"state":5,"pk_is_handle":true,"schema_version":-1,"comment":"","auto_inc_id":0,"max_col_id":1,"max_idx_id":1,"update_timestamp":404566455285710853,"ShardRowIDBits":0,"partition":null})json", // - R"stmt(CREATE TABLE `db2`.`mytable`(`mycol` String) Engine = DeltaMerge((`mycol`), '{"cols":[{"comment":"","default":null,"default_bit":null,"id":1,"name":{"L":"mycol","O":"mycol"},"offset":0,"origin_default":null,"state":5,"type":{"Charset":"utf8","Collate":"utf8_bin","Decimal":0,"Elems":null,"Flag":4099,"Flen":256,"Tp":15}}],"comment":"","id":37,"index_info":[],"is_common_handle":false,"name":{"L":"mytable","O":"mytable"},"partition":null,"pk_is_handle":true,"schema_version":-1,"state":5,"tiflash_replica":{"Count":0},"update_timestamp":404566455285710853}'))stmt", // + R"json({"id":37,"name":{"O":"mytable","L":"mytable"},"charset":"","collate":"","cols":[{"id":1,"name":{"O":"mycol","L":"mycol"},"offset":0,"origin_default":null,"default":null,"default_bit":null,"generated_expr_string":"","generated_stored":false,"dependences":null,"type":{"Tp":15,"Flag":4099,"Flen":256,"Decimal":0,"Charset":"utf8","Collate":"utf8_bin","Elems":null},"state":5,"comment":""}],"index_info":[{"id":1,"idx_name":{"O":"PRIMARY","L":"primary"},"tbl_name":{"O":"","L":""},"idx_cols":[{"name":{"O":"mycol","L":"mycol"},"offset":0,"length":-1}],"is_unique":true,"is_primary":true,"state":5,"comment":"","index_type":1}],"fk_info":null,"state":5,"pk_is_handle":true,"schema_version":-1,"comment":"","auto_inc_id":0,"max_col_id":1,"max_idx_id":1,"update_timestamp":404566455285710853,"ShardRowIDBits":0,"partition":null,"tiflash_mode":""})json", // + R"stmt(CREATE TABLE `db2`.`mytable`(`mycol` String) Engine = DeltaMerge((`mycol`), '{"cols":[{"comment":"","default":null,"default_bit":null,"id":1,"name":{"L":"mycol","O":"mycol"},"offset":0,"origin_default":null,"state":5,"type":{"Charset":"utf8","Collate":"utf8_bin","Decimal":0,"Elems":null,"Flag":4099,"Flen":256,"Tp":15}}],"comment":"","id":37,"index_info":[],"is_common_handle":false,"name":{"L":"mytable","O":"mytable"},"partition":null,"pk_is_handle":true,"schema_version":-1,"state":5,"tiflash_mode":"","tiflash_replica":{"Count":0},"update_timestamp":404566455285710853}'))stmt", // }, StmtCase{ 32, // R"json({"id":1,"db_name":{"O":"test","L":"test"},"charset":"utf8mb4","collate":"utf8mb4_bin","state":5})json", // - R"json({"id":31,"name":{"O":"range_part_t","L":"range_part_t"},"charset":"utf8mb4","collate":"utf8mb4_bin","cols":[{"id":1,"name":{"O":"i","L":"i"},"offset":0,"origin_default":null,"default":null,"default_bit":null,"generated_expr_string":"","generated_stored":false,"dependences":null,"type":{"Tp":3,"Flag":0,"Flen":11,"Decimal":0,"Charset":"binary","Collate":"binary","Elems":null},"state":5,"comment":"","version":0}],"index_info":null,"fk_info":null,"state":5,"pk_is_handle":false,"schema_version":-1,"comment":"","auto_inc_id":0,"max_col_id":1,"max_idx_id":0,"update_timestamp":407445773801488390,"ShardRowIDBits":0,"partition":{"type":1,"expr":"`i`","columns":null,"enable":true,"definitions":[{"id":32,"name":{"O":"p0","L":"p0"},"less_than":["0"]},{"id":33,"name":{"O":"p1","L":"p1"},"less_than":["100"]}],"num":0},"compression":"","version":1})json", // - R"stmt(CREATE TABLE `test`.`range_part_t_32`(`i` Nullable(Int32), `_tidb_rowid` Int64) Engine = DeltaMerge((`_tidb_rowid`), '{"belonging_table_id":31,"cols":[{"comment":"","default":null,"default_bit":null,"id":1,"name":{"L":"i","O":"i"},"offset":0,"origin_default":null,"state":5,"type":{"Charset":"binary","Collate":"binary","Decimal":0,"Elems":null,"Flag":0,"Flen":11,"Tp":3}}],"comment":"","id":32,"index_info":[],"is_common_handle":false,"is_partition_sub_table":true,"name":{"L":"range_part_t_32","O":"range_part_t_32"},"partition":null,"pk_is_handle":false,"schema_version":-1,"state":5,"tiflash_replica":{"Count":0},"update_timestamp":407445773801488390}'))stmt", // + R"json({"id":31,"name":{"O":"range_part_t","L":"range_part_t"},"charset":"utf8mb4","collate":"utf8mb4_bin","cols":[{"id":1,"name":{"O":"i","L":"i"},"offset":0,"origin_default":null,"default":null,"default_bit":null,"generated_expr_string":"","generated_stored":false,"dependences":null,"type":{"Tp":3,"Flag":0,"Flen":11,"Decimal":0,"Charset":"binary","Collate":"binary","Elems":null},"state":5,"comment":"","version":0}],"index_info":null,"fk_info":null,"state":5,"pk_is_handle":false,"schema_version":-1,"comment":"","auto_inc_id":0,"max_col_id":1,"max_idx_id":0,"update_timestamp":407445773801488390,"ShardRowIDBits":0,"partition":{"type":1,"expr":"`i`","columns":null,"enable":true,"definitions":[{"id":32,"name":{"O":"p0","L":"p0"},"less_than":["0"]},{"id":33,"name":{"O":"p1","L":"p1"},"less_than":["100"]}],"num":0},"compression":"","version":1,"tiflash_mode":""})json", // + R"stmt(CREATE TABLE `test`.`range_part_t_32`(`i` Nullable(Int32), `_tidb_rowid` Int64) Engine = DeltaMerge((`_tidb_rowid`), '{"belonging_table_id":31,"cols":[{"comment":"","default":null,"default_bit":null,"id":1,"name":{"L":"i","O":"i"},"offset":0,"origin_default":null,"state":5,"type":{"Charset":"binary","Collate":"binary","Decimal":0,"Elems":null,"Flag":0,"Flen":11,"Tp":3}}],"comment":"","id":32,"index_info":[],"is_common_handle":false,"is_partition_sub_table":true,"name":{"L":"range_part_t_32","O":"range_part_t_32"},"partition":null,"pk_is_handle":false,"schema_version":-1,"state":5,"tiflash_mode":"","tiflash_replica":{"Count":0},"update_timestamp":407445773801488390}'))stmt", // }}; - for (auto & c : cases) + for (const auto & c : cases) { c.verifyTableInfo(); } diff --git a/dbms/src/TiDB/Schema/SchemaBuilder.cpp b/dbms/src/TiDB/Schema/SchemaBuilder.cpp index abb575c75bc..6e4ad10e344 100644 --- a/dbms/src/TiDB/Schema/SchemaBuilder.cpp +++ b/dbms/src/TiDB/Schema/SchemaBuilder.cpp @@ -36,6 +36,7 @@ #include #include #include +#include #include #include #include @@ -542,6 +543,11 @@ void SchemaBuilder::applyDiff(const SchemaDiff & diff) applySetTiFlashReplica(db_info, diff.table_id); break; } + case SchemaActionType::SetTiFlashMode: + { + applySetTiFlashMode(db_info, diff.table_id); + break; + } default: { if (diff.type < SchemaActionType::MaxRecognizedType) @@ -1258,6 +1264,75 @@ void SchemaBuilder::applySetTiFlashReplicaOnPhysicalTable( LOG_FMT_INFO(log, "Updated replica info for {}", name_mapper.debugCanonicalName(*db_info, table_info)); } + +template +void SchemaBuilder::applySetTiFlashMode(const TiDB::DBInfoPtr & db_info, TableID table_id) +{ + auto latest_table_info = getter.getTableInfo(db_info->id, table_id); + + if (unlikely(latest_table_info == nullptr)) + { + throw TiFlashException(fmt::format("miss table in TiKV : {}", table_id), Errors::DDL::StaleSchema); + } + + auto & tmt_context = context.getTMTContext(); + auto storage = tmt_context.getStorages().get(latest_table_info->id); + if (unlikely(storage == nullptr)) + { + throw TiFlashException(fmt::format("miss table in TiFlash : {}", name_mapper.debugCanonicalName(*db_info, *latest_table_info)), + Errors::DDL::MissingTable); + } + + applySetTiFlashModeOnLogicalTable(db_info, latest_table_info, storage); +} + +template +void SchemaBuilder::applySetTiFlashModeOnLogicalTable( + const TiDB::DBInfoPtr & db_info, + const TiDB::TableInfoPtr & table_info, + const ManageableStoragePtr & storage) +{ + applySetTiFlashModeOnPhysicalTable(db_info, table_info, storage); + + if (table_info->isLogicalPartitionTable()) + { + auto & tmt_context = context.getTMTContext(); + for (const auto & part_def : table_info->partition.definitions) + { + auto new_part_table_info = table_info->producePartitionTableInfo(part_def.id, name_mapper); + auto part_storage = tmt_context.getStorages().get(table_info->id); + if (unlikely(part_storage == nullptr)) + { + throw TiFlashException(fmt::format("miss table in TiFlash : {}", name_mapper.debugCanonicalName(*db_info, *new_part_table_info)), + Errors::DDL::MissingTable); + } + applySetTiFlashModeOnPhysicalTable(db_info, new_part_table_info, part_storage); + } + } +} + + +template +void SchemaBuilder::applySetTiFlashModeOnPhysicalTable( + const TiDB::DBInfoPtr & db_info, + const TiDB::TableInfoPtr & latest_table_info, + const ManageableStoragePtr & storage) +{ + if (storage->getTableInfo().tiflash_mode == latest_table_info->tiflash_mode) + return; + + TiDB::TableInfo table_info = storage->getTableInfo(); + table_info.tiflash_mode = latest_table_info->tiflash_mode; + AlterCommands commands; + + LOG_FMT_INFO(log, "Updating tiflash mode for {} to {}", name_mapper.debugCanonicalName(*db_info, table_info), TiFlashModeToString(table_info.tiflash_mode)); + + auto alter_lock = storage->lockForAlter(getThreadName()); + storage->alterFromTiDB(alter_lock, commands, name_mapper.mapDatabaseName(*db_info), table_info, name_mapper, context); + LOG_FMT_INFO(log, "Updated tiflash mode for {} to {}", name_mapper.debugCanonicalName(*db_info, table_info), TiFlashModeToString(table_info.tiflash_mode)); +} + + template void SchemaBuilder::syncAllSchema() { @@ -1327,6 +1402,8 @@ void SchemaBuilder::syncAllSchema() applyRenameLogicalTable(db, table, storage); /// Update replica info if needed. applySetTiFlashReplicaOnLogicalTable(db, table, storage); + /// Update tiflash mode if needed. + applySetTiFlashModeOnLogicalTable(db, table, storage); /// Alter if needed. applyAlterLogicalTable(db, table, storage); LOG_FMT_DEBUG(log, "Table {} synced during sync all schemas", name_mapper.debugCanonicalName(*db, *table)); diff --git a/dbms/src/TiDB/Schema/SchemaBuilder.h b/dbms/src/TiDB/Schema/SchemaBuilder.h index 461d7ff9c12..827203a682f 100644 --- a/dbms/src/TiDB/Schema/SchemaBuilder.h +++ b/dbms/src/TiDB/Schema/SchemaBuilder.h @@ -89,6 +89,10 @@ struct SchemaBuilder void applySetTiFlashReplica(const TiDB::DBInfoPtr & db_info, TableID table_id); void applySetTiFlashReplicaOnLogicalTable(const TiDB::DBInfoPtr & db_info, const TiDB::TableInfoPtr & table_info, const ManageableStoragePtr & storage); void applySetTiFlashReplicaOnPhysicalTable(const TiDB::DBInfoPtr & db_info, const TiDB::TableInfoPtr & table_info, const ManageableStoragePtr & storage); + + void applySetTiFlashMode(const TiDB::DBInfoPtr & db_info, TableID table_id); + void applySetTiFlashModeOnLogicalTable(const TiDB::DBInfoPtr & db_info, const TiDB::TableInfoPtr & table_info, const ManageableStoragePtr & storage); + void applySetTiFlashModeOnPhysicalTable(const TiDB::DBInfoPtr & db_info, const TiDB::TableInfoPtr & table_info, const ManageableStoragePtr & storage); }; } // namespace DB diff --git a/dbms/src/TiDB/Schema/SchemaGetter.h b/dbms/src/TiDB/Schema/SchemaGetter.h index fe0ecd59af0..72fd00678f7 100644 --- a/dbms/src/TiDB/Schema/SchemaGetter.h +++ b/dbms/src/TiDB/Schema/SchemaGetter.h @@ -94,11 +94,14 @@ enum class SchemaActionType : Int8 AlterTableStatsOptions = 58, AlterNoCacheTable = 59, CreateTables = 60, + ActionMultiSchemaChange = 61, + SetTiFlashMode = 62, + // If we supporte new type from TiDB. // MaxRecognizedType also needs to be changed. // It should always be equal to the maximum supported type + 1 - MaxRecognizedType = 61, + MaxRecognizedType = 63, }; struct AffectedOption diff --git a/tests/fullstack-test2/ddl/alter_table_tiflash_replica.test b/tests/fullstack-test2/ddl/alter_table_tiflash_replica_and_mode.test similarity index 60% rename from tests/fullstack-test2/ddl/alter_table_tiflash_replica.test rename to tests/fullstack-test2/ddl/alter_table_tiflash_replica_and_mode.test index 1262cd60882..5e43936379b 100644 --- a/tests/fullstack-test2/ddl/alter_table_tiflash_replica.test +++ b/tests/fullstack-test2/ddl/alter_table_tiflash_replica_and_mode.test @@ -24,9 +24,26 @@ func> wait_table test t │ 1 │ └────────────────────────────────────┘ +# test tiflash mode in normal mode +>> DBGInvoke get_tiflash_mode("test", "t") +┌─get_tiflash_mode(test, t)─┐ +│ │ +└───────────────────────────┘ + +mysql> alter table test.t set tiflash mode fast + +>> DBGInvoke __refresh_schemas() + +# test tiflash mode in fast mode +>> DBGInvoke get_tiflash_mode("test", "t") +┌─get_tiflash_mode(test, t)───┐ +│ fast │ +└─────────────────────────────┘ + # test replica for partition tables mysql> drop table if exists test.t mysql> create table test.t (x int) partition by range (x) (partition p0 values less than (5), partition p1 values less than (10)); +mysql> alter table test.t set tiflash mode fast mysql> alter table test.t set tiflash replica 1 func> wait_table test t @@ -36,11 +53,22 @@ func> wait_table test t │ 1 │ └────────────────────────────────────┘ +>> DBGInvoke get_tiflash_mode("test", "t") +┌─get_tiflash_mode(test, t)──────────┐ +│ fast │ +└────────────────────────────────────┘ + >> DBGInvoke get_partition_tables_tiflash_replica_count("test", "t") ┌─get_partition_tables_tiflash_replica_count(test, t)─┐ │ 1/1/ │ └─────────────────────────────────────────────────────┘ +# test tiflash mode for partition tables +>> DBGInvoke get_partition_tables_tiflash_mode("test", "t") +┌─get_partition_tables_tiflash_mode(test, t)─┐ +│ fast/fast/ │ +└────────────────────────────────────────────┘ + # test replica for add partition tables after set replica mysql> alter table test.t add partition (partition p2 values less than (2010)); @@ -51,5 +79,11 @@ mysql> alter table test.t add partition (partition p2 values less than (2010)); │ 1/1/1/ │ └─────────────────────────────────────────────────────┘ +# test tiflash mode for add partition tables after set replica +>> DBGInvoke get_partition_tables_tiflash_mode("test", "t") +┌─get_partition_tables_tiflash_mode(test, t)─┐ +│ fast/fast/fast/ │ +└────────────────────────────────────────────┘ + diff --git a/tests/fullstack-test2/ddl/alter_tiflash_mode.test b/tests/fullstack-test2/ddl/alter_tiflash_mode.test new file mode 100644 index 00000000000..c9f3ef488c4 --- /dev/null +++ b/tests/fullstack-test2/ddl/alter_tiflash_mode.test @@ -0,0 +1,48 @@ +# Copyright 2022 PingCAP, Ltd. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +mysql> drop table if exists test.t +mysql> create table test.t(a int, b int) +mysql> alter table test.t set tiflash replica 1 + +func> wait_table test t + +# check default mode of tiflash table +mysql> select table_schema,table_name,replica_count,available,table_mode from information_schema.tiflash_replica where table_schema='test' and table_name='t'; ++--------------+------------+---------------+-----------+-----------+ +| table_schema | table_name | replica_count | available | table_mode| ++--------------+------------+---------------+-----------+-----------+ +| test | t | 1 | 1 | NORMAL | ++--------------+------------+---------------+-----------+-----------+ + +# check change mode + +mysql> alter table test.t set tiflash mode fast +mysql> select table_schema,table_name,replica_count,available,table_mode from information_schema.tiflash_replica where table_schema='test' and table_name='t'; ++--------------+------------+---------------+-----------+-----------+ +| table_schema | table_name | replica_count | available | table_mode| ++--------------+------------+---------------+-----------+-----------+ +| test | t | 1 | 1 | FAST | ++--------------+------------+---------------+-----------+-----------+ + +# check change mode +mysql> alter table test.t set tiflash mode normal +mysql> select table_schema,table_name,replica_count,available,table_mode from information_schema.tiflash_replica where table_schema='test' and table_name='t'; ++--------------+------------+---------------+-----------+-----------+ +| table_schema | table_name | replica_count | available | table_mode| ++--------------+------------+---------------+-----------+-----------+ +| test | t | 1 | 1 | NORMAL | ++--------------+------------+---------------+-----------+-----------+ + +mysql> drop table if exists test.t \ No newline at end of file From cbc6a95312903ca5b931221c129425c7cf4c76fc Mon Sep 17 00:00:00 2001 From: Liqi Geng Date: Thu, 7 Jul 2022 23:25:02 +0800 Subject: [PATCH 6/7] Add non-blocking functions for MPMCQueue (#5311) close pingcap/tiflash#5310 --- dbms/src/Common/MPMCQueue.h | 95 ++++++++++++------- dbms/src/Common/tests/gtest_mpmc_queue.cpp | 25 ++--- dbms/src/Common/tests/mpmc_queue_perftest.cpp | 2 +- dbms/src/Flash/Mpp/ExchangeReceiver.cpp | 2 +- 4 files changed, 78 insertions(+), 46 deletions(-) diff --git a/dbms/src/Common/MPMCQueue.h b/dbms/src/Common/MPMCQueue.h index f550ecc7ca2..31dfc65a174 100644 --- a/dbms/src/Common/MPMCQueue.h +++ b/dbms/src/Common/MPMCQueue.h @@ -74,56 +74,80 @@ class MPMCQueue destruct(getObj(read_pos)); } - /// Block util: + /// Block until: /// 1. Pop succeeds with a valid T: return true. /// 2. The queue is cancelled or finished: return false. - bool pop(T & obj) + ALWAYS_INLINE bool pop(T & obj) { - return popObj(obj); + return popObj(obj); } - /// Besides all conditions mentioned at `pop`, `tryPop` will return false if `timeout` is exceeded. + /// Besides all conditions mentioned at `pop`, `popTimeout` will return false if `timeout` is exceeded. template - bool tryPop(T & obj, const Duration & timeout) + ALWAYS_INLINE bool popTimeout(T & obj, const Duration & timeout) { /// std::condition_variable::wait_until will always use system_clock. auto deadline = std::chrono::system_clock::now() + timeout; - return popObj(obj, &deadline); + return popObj(obj, &deadline); } - /// Block util: + /// Non-blocking function. + /// Return true if pop succeed. + /// else return false. + ALWAYS_INLINE bool tryPop(T & obj) + { + return popObj(obj); + } + + /// Block until: /// 1. Push succeeds and return true. /// 2. The queue is cancelled and return false. /// 3. The queue has finished and return false. template ALWAYS_INLINE bool push(U && u) { - return pushObj(std::forward(u)); + return pushObj(std::forward(u)); } - /// Besides all conditions mentioned at `push`, `tryPush` will return false if `timeout` is exceeded. + /// Besides all conditions mentioned at `push`, `pushTimeout` will return false if `timeout` is exceeded. template - ALWAYS_INLINE bool tryPush(U && u, const Duration & timeout) + ALWAYS_INLINE bool pushTimeout(U && u, const Duration & timeout) { /// std::condition_variable::wait_until will always use system_clock. auto deadline = std::chrono::system_clock::now() + timeout; - return pushObj(std::forward(u), &deadline); + return pushObj(std::forward(u), &deadline); + } + + /// Non-blocking function. + /// Return true if push succeed. + /// else return false. + template + ALWAYS_INLINE bool tryPush(U && u) + { + return pushObj(std::forward(u)); } /// The same as `push` except it will construct the object in place. template ALWAYS_INLINE bool emplace(Args &&... args) { - return emplaceObj(nullptr, std::forward(args)...); + return emplaceObj(nullptr, std::forward(args)...); } - /// The same as `tryPush` except it will construct the object in place. + /// The same as `pushTimeout` except it will construct the object in place. template - ALWAYS_INLINE bool tryEmplace(Args &&... args, const Duration & timeout) + ALWAYS_INLINE bool emplaceTimeout(Args &&... args, const Duration & timeout) { /// std::condition_variable::wait_until will always use system_clock. auto deadline = std::chrono::system_clock::now() + timeout; - return emplaceObj(&deadline, std::forward(args)...); + return emplaceObj(&deadline, std::forward(args)...); + } + + /// The same as `tryPush` except it will construct the object in place. + template + ALWAYS_INLINE bool tryEmplace(Args &&... args) + { + return emplaceObj(nullptr, std::forward(args)...); } /// Cancel a NORMAL queue will wake up all blocking readers and writers. @@ -233,7 +257,8 @@ class MPMCQueue } } - bool popObj(T & res, const TimePoint * deadline = nullptr) + template + bool popObj(T & res, [[maybe_unused]] const TimePoint * deadline = nullptr) { #ifdef __APPLE__ WaitingNode node; @@ -241,14 +266,16 @@ class MPMCQueue thread_local WaitingNode node; #endif { - /// read_pos < write_pos means the queue isn't empty - auto pred = [&] { - return read_pos < write_pos || !isNormal(); - }; - std::unique_lock lock(mu); - wait(lock, reader_head, node, pred, deadline); + if constexpr (need_wait) + { + /// read_pos < write_pos means the queue isn't empty + auto pred = [&] { + return read_pos < write_pos || !isNormal(); + }; + wait(lock, reader_head, node, pred, deadline); + } if (!isCancelled() && read_pos < write_pos) { @@ -272,21 +299,23 @@ class MPMCQueue return false; } - template - bool assignObj(const TimePoint * deadline, F && assigner) + template + bool assignObj([[maybe_unused]] const TimePoint * deadline, F && assigner) { #ifdef __APPLE__ WaitingNode node; #else thread_local WaitingNode node; #endif - auto pred = [&] { - return write_pos - read_pos < capacity || !isNormal(); - }; - std::unique_lock lock(mu); - wait(lock, writer_head, node, pred, deadline); + if constexpr (need_wait) + { + auto pred = [&] { + return write_pos - read_pos < capacity || !isNormal(); + }; + wait(lock, writer_head, node, pred, deadline); + } /// double check status after potential wait /// check write_pos because timeouted will also reach here. @@ -305,16 +334,16 @@ class MPMCQueue return false; } - template + template ALWAYS_INLINE bool pushObj(U && u, const TimePoint * deadline = nullptr) { - return assignObj(deadline, [&](void * addr) { new (addr) T(std::forward(u)); }); + return assignObj(deadline, [&](void * addr) { new (addr) T(std::forward(u)); }); } - template + template ALWAYS_INLINE bool emplaceObj(const TimePoint * deadline, Args &&... args) { - return assignObj(deadline, [&](void * addr) { new (addr) T(std::forward(args)...); }); + return assignObj(deadline, [&](void * addr) { new (addr) T(std::forward(args)...); }); } ALWAYS_INLINE bool isNormal() const diff --git a/dbms/src/Common/tests/gtest_mpmc_queue.cpp b/dbms/src/Common/tests/gtest_mpmc_queue.cpp index 85ad1892067..3f2748b452b 100644 --- a/dbms/src/Common/tests/gtest_mpmc_queue.cpp +++ b/dbms/src/Common/tests/gtest_mpmc_queue.cpp @@ -98,12 +98,14 @@ class MPMCQueueTest : public ::testing::Test void testCannotTryPush(MPMCQueue & queue) { auto old_size = queue.size(); - auto res = queue.tryPush(ValueHelper::make(-1), std::chrono::microseconds(1)); - auto new_size = queue.size(); - if (res) + bool ok1 = queue.tryPush(ValueHelper::make(-1)); + auto new_size1 = queue.size(); + bool ok2 = queue.pushTimeout(ValueHelper::make(-1), std::chrono::microseconds(1)); + auto new_size2 = queue.size(); + if (ok1 || ok2) throw TiFlashTestException("Should push fail"); - if (old_size != new_size) - throw TiFlashTestException(fmt::format("Size changed from {} to {} without push", old_size, new_size)); + if (old_size != new_size1 || old_size != new_size2) + throw TiFlashTestException(fmt::format("Size changed from {} to {} and {} without push", old_size, new_size1, new_size2)); } template @@ -124,12 +126,14 @@ class MPMCQueueTest : public ::testing::Test { auto old_size = queue.size(); T res; - bool ok = queue.tryPop(res, std::chrono::microseconds(1)); - auto new_size = queue.size(); - if (ok) + bool ok1 = queue.tryPop(res); + auto new_size1 = queue.size(); + bool ok2 = queue.popTimeout(res, std::chrono::microseconds(1)); + auto new_size2 = queue.size(); + if (ok1 || ok2) throw TiFlashTestException("Should pop fail"); - if (old_size != new_size) - throw TiFlashTestException(fmt::format("Size changed from {} to {} without pop", old_size, new_size)); + if (old_size != new_size1 || old_size != new_size2) + throw TiFlashTestException(fmt::format("Size changed from {} to {} and {} without pop", old_size, new_size1, new_size2)); } template @@ -474,7 +478,6 @@ class MPMCQueueTest : public ::testing::Test throwOrMove(std::move(rhs)); } - ThrowInjectable & operator=(ThrowInjectable && rhs) { if (this != &rhs) diff --git a/dbms/src/Common/tests/mpmc_queue_perftest.cpp b/dbms/src/Common/tests/mpmc_queue_perftest.cpp index d047b5d498f..ba0d00001a3 100644 --- a/dbms/src/Common/tests/mpmc_queue_perftest.cpp +++ b/dbms/src/Common/tests/mpmc_queue_perftest.cpp @@ -87,7 +87,7 @@ struct Helper> template static void pushOneTo(MPMCQueue & queue, U && data) { - queue.tryPush(std::forward(data), std::chrono::milliseconds(1)); + queue.pushTimeout(std::forward(data), std::chrono::milliseconds(1)); } }; diff --git a/dbms/src/Flash/Mpp/ExchangeReceiver.cpp b/dbms/src/Flash/Mpp/ExchangeReceiver.cpp index 3b36adf2c40..966babb832f 100644 --- a/dbms/src/Flash/Mpp/ExchangeReceiver.cpp +++ b/dbms/src/Flash/Mpp/ExchangeReceiver.cpp @@ -424,7 +424,7 @@ void ExchangeReceiverBase::reactor(const std::vector & asyn for (Int32 i = 0; i < check_waiting_requests_freq; ++i) { AsyncHandler * handler = nullptr; - if (unlikely(!ready_requests.tryPop(handler, timeout))) + if (unlikely(!ready_requests.popTimeout(handler, timeout))) break; handler->handle(); From e58a007b48308369caa5fd458469c86fbd069602 Mon Sep 17 00:00:00 2001 From: hehechen Date: Fri, 8 Jul 2022 10:57:02 +0800 Subject: [PATCH 7/7] add random segment test for CI weekly (#5300) close pingcap/tiflash#5301 --- .../Storages/DeltaMerge/tests/gtest_segment.cpp | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_segment.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_segment.cpp index 1c68ba3bb2a..dc43ef3713b 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_segment.cpp +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_segment.cpp @@ -75,12 +75,26 @@ CATCH TEST_F(SegmentOperationTest, TestSegmentRandom) try { + srand(time(nullptr)); SegmentTestOptions options; options.is_common_handle = true; reloadWithOptions(options); randomSegmentTest(100); } CATCH + +// run in CI weekly +TEST_F(SegmentOperationTest, DISABLED_TestSegmentRandomForCI) +try +{ + srand(time(nullptr)); + SegmentTestOptions options; + options.is_common_handle = true; + reloadWithOptions(options); + randomSegmentTest(10000); +} +CATCH + } // namespace tests } // namespace DM } // namespace DB