From c09e0363f53aa06ff4a2754dca93c80ae419d5ec Mon Sep 17 00:00:00 2001 From: Ti Chi Robot Date: Fri, 17 Jun 2022 10:22:35 +0800 Subject: [PATCH] MPP: update the state of building a hash table when createOnce throw exceptions (#4202) (#4267) close pingcap/tiflash#4195 --- dbms/src/Common/FailPoint.cpp | 4 +- .../CreatingSetsBlockInputStream.cpp | 44 +++++++++++++------ dbms/src/Interpreters/Join.cpp | 10 +++-- dbms/src/Interpreters/Join.h | 10 ++++- tests/fullstack-test/mpp/mpp_fail.test | 32 ++++++++++++++ 5 files changed, 80 insertions(+), 20 deletions(-) diff --git a/dbms/src/Common/FailPoint.cpp b/dbms/src/Common/FailPoint.cpp index 0f17111cd03..2dec612cb17 100644 --- a/dbms/src/Common/FailPoint.cpp +++ b/dbms/src/Common/FailPoint.cpp @@ -47,7 +47,9 @@ std::unordered_map> FailPointHelper::f M(segment_merge_after_ingest_packs) \ M(force_formal_page_file_not_exists) \ M(force_legacy_or_checkpoint_page_file_exists) \ - M(exception_in_creating_set_input_stream) + M(exception_in_creating_set_input_stream) \ + M(exception_when_read_from_log) \ + M(exception_mpp_hash_build) #define APPLY_FOR_FAILPOINTS(M) \ M(force_set_page_file_write_errno) \ diff --git a/dbms/src/DataStreams/CreatingSetsBlockInputStream.cpp b/dbms/src/DataStreams/CreatingSetsBlockInputStream.cpp index 3fcef57544d..f7ec1185735 100644 --- a/dbms/src/DataStreams/CreatingSetsBlockInputStream.cpp +++ b/dbms/src/DataStreams/CreatingSetsBlockInputStream.cpp @@ -15,16 +15,18 @@ namespace DB namespace FailPoints { extern const char exception_in_creating_set_input_stream[]; -} +extern const char exception_mpp_hash_build[]; +} // namespace FailPoints namespace ErrorCodes { extern const int SET_SIZE_LIMIT_EXCEEDED; } CreatingSetsBlockInputStream::CreatingSetsBlockInputStream(const BlockInputStreamPtr & input, - std::vector && subqueries_for_sets_list_, - const SizeLimits & network_transfer_limits, Int64 mpp_task_id_) - : subqueries_for_sets_list(std::move(subqueries_for_sets_list_)), 
network_transfer_limits(network_transfer_limits), mpp_task_id(mpp_task_id_) + std::vector && subqueries_for_sets_list_, const SizeLimits & network_transfer_limits, Int64 mpp_task_id_) + : subqueries_for_sets_list(std::move(subqueries_for_sets_list_)), + network_transfer_limits(network_transfer_limits), + mpp_task_id(mpp_task_id_) { init(input); } @@ -90,12 +92,14 @@ void CreatingSetsBlockInputStream::createAll() { for (auto & subqueries_for_sets : subqueries_for_sets_list) { - for (auto &elem : subqueries_for_sets) + for (auto & elem : subqueries_for_sets) { if (elem.second.join) - elem.second.join->setFinishBuildTable(false); + elem.second.join->setBuildTableState(Join::BuildTableState::WAITING); } } + + Stopwatch watch; for (auto & subqueries_for_sets : subqueries_for_sets_list) { for (auto & elem : subqueries_for_sets) @@ -104,7 +108,8 @@ void CreatingSetsBlockInputStream::createAll() { if (isCancelledOrThrowIfKilled()) return; - workers.push_back(std::thread(&CreatingSetsBlockInputStream::createOne, this, std::ref(elem.second), current_memory_tracker)); + workers.push_back( + std::thread(&CreatingSetsBlockInputStream::createOne, this, std::ref(elem.second), current_memory_tracker)); FAIL_POINT_TRIGGER_EXCEPTION(FailPoints::exception_in_creating_set_input_stream); } } @@ -115,7 +120,15 @@ void CreatingSetsBlockInputStream::createAll() } if (!exception_from_workers.empty()) + { + LOG_ERROR(log, + "Creating all tasks of " << std::to_string(mpp_task_id) << " takes " << std::to_string(watch.elapsedSeconds()) + << " sec with exception and rethrow the first of total " << exception_from_workers.size() + << " exceptions."); std::rethrow_exception(exception_from_workers.front()); + } + LOG_DEBUG( + log, "Creating all tasks of " << std::to_string(mpp_task_id) << " takes " << std::to_string(watch.elapsedSeconds()) << "sec. 
"); created = true; } @@ -123,21 +136,18 @@ void CreatingSetsBlockInputStream::createAll() void CreatingSetsBlockInputStream::createOne(SubqueryForSet & subquery, MemoryTracker * memory_tracker) { + Stopwatch watch; try { - current_memory_tracker = memory_tracker; LOG_DEBUG(log, (subquery.set ? "Creating set. " : "") << (subquery.join ? "Creating join. " : "") << (subquery.table ? "Filling temporary table. " : "") << " for task " << std::to_string(mpp_task_id)); - Stopwatch watch; - BlockOutputStreamPtr table_out; if (subquery.table) table_out = subquery.table->write({}, {}); - bool done_with_set = !subquery.set; bool done_with_join = !subquery.join; bool done_with_table = !subquery.table; @@ -194,7 +204,10 @@ void CreatingSetsBlockInputStream::createOne(SubqueryForSet & subquery, MemoryTr if (subquery.join) - subquery.join->setFinishBuildTable(true); + { + FAIL_POINT_TRIGGER_EXCEPTION(FailPoints::exception_mpp_hash_build); + subquery.join->setBuildTableState(Join::BuildTableState::SUCCEED); + } if (table_out) table_out->writeSuffix(); @@ -235,10 +248,15 @@ void CreatingSetsBlockInputStream::createOne(SubqueryForSet & subquery, MemoryTr LOG_DEBUG(log, "Subquery has empty result for task " << std::to_string(mpp_task_id) << "."); } } - catch (std::exception & e) + catch (...) { std::unique_lock lock(exception_mutex); exception_from_workers.push_back(std::current_exception()); + if (subquery.join) + subquery.join->setBuildTableState(Join::BuildTableState::FAILED); + LOG_ERROR(log, + "task" << std::to_string(mpp_task_id) << " throw exception: " << getCurrentExceptionMessage(false, true) << " In " + << std::to_string(watch.elapsedSeconds()) << " sec. 
"); } } diff --git a/dbms/src/Interpreters/Join.cpp b/dbms/src/Interpreters/Join.cpp index ace1b61ba7a..3d24ce38aae 100644 --- a/dbms/src/Interpreters/Join.cpp +++ b/dbms/src/Interpreters/Join.cpp @@ -72,7 +72,7 @@ Join::Join(const Names & key_names_left_, const Names & key_names_right_, bool u other_condition_ptr(other_condition_ptr_), original_strictness(strictness), max_block_size_for_cross_join(max_block_size_), - have_finish_build(true), + build_table_state(BuildTableState::SUCCEED), log(&Logger::get("Join")), limits(limits) { @@ -98,10 +98,10 @@ Join::Join(const Names & key_names_left_, const Names & key_names_right_, bool u throw Exception("Not supported: non right join with right conditions"); } -void Join::setFinishBuildTable(bool finish_) +void Join::setBuildTableState(BuildTableState state_) { std::lock_guard lk(build_table_mutex); - have_finish_build = finish_; + build_table_state = state_; build_table_cv.notify_all(); } @@ -1517,7 +1517,9 @@ void Join::joinBlock(Block & block) const { std::unique_lock lk(build_table_mutex); - build_table_cv.wait(lk, [&]() { return have_finish_build; }); + build_table_cv.wait(lk, [&]() { return build_table_state != BuildTableState::WAITING; }); + if (build_table_state == BuildTableState::FAILED) /// throw this exception once failed to build the hash table + throw Exception("Build failed before join probe!"); } std::shared_lock lock(rwlock); diff --git a/dbms/src/Interpreters/Join.h b/dbms/src/Interpreters/Join.h index 74fbfca4ee0..457c9bdc184 100644 --- a/dbms/src/Interpreters/Join.h +++ b/dbms/src/Interpreters/Join.h @@ -292,7 +292,13 @@ class Join bool isBuildSetExceeded() const { return build_set_exceeded.load(); } size_t getNotJoinedStreamConcurrency() const { return build_concurrency; }; - void setFinishBuildTable(bool); + enum BuildTableState + { + WAITING, + FAILED, + SUCCEED + }; + void setBuildTableState(BuildTableState state_); /// Reference to the row in block. 
struct RowRef @@ -446,7 +452,7 @@ class Join mutable std::mutex build_table_mutex; mutable std::condition_variable build_table_cv; - bool have_finish_build; + BuildTableState build_table_state; Poco::Logger * log; diff --git a/tests/fullstack-test/mpp/mpp_fail.test b/tests/fullstack-test/mpp/mpp_fail.test index 006d2abf77d..ebf09408e06 100644 --- a/tests/fullstack-test/mpp/mpp_fail.test +++ b/tests/fullstack-test/mpp/mpp_fail.test @@ -81,5 +81,37 @@ ERROR 1105 (HY000) at line 1: other error for mpp stream: DB::Exception: Exchang => DBGInvoke __disable_fail_point(exception_during_mpp_non_root_task_run) => DBGInvoke __disable_fail_point(exception_during_mpp_close_tunnel) +## exception during mpp hash build +## desc format='brief' select t1.id from test.t t1 join test.t t2 on t1.id = t2.id and t1.id <2 join (select id from test.t group by id) t3 on t2.id=t3.id; +## +-----------------------------------------+---------+-------------------+---------------+-------------------------------------------------------------------------+ +## | id | estRows | task | access object | operator info | +## +-----------------------------------------+---------+-------------------+---------------+-------------------------------------------------------------------------+ +## | Projection | 0.99 | root | | test.t.id | +## | └─TableReader | 0.99 | root | | data:ExchangeSender | +## | └─ExchangeSender | 0.99 | batchCop[tiflash] | | ExchangeType: PassThrough | +## | └─HashJoin | 0.99 | batchCop[tiflash] | | inner join, equal:[eq(test.t.id, test.t.id)] | +## | ├─HashJoin(Build) | 0.99 | batchCop[tiflash] | | inner join, equal:[eq(test.t.id, test.t.id)] | +## | │ ├─ExchangeReceiver(Build) | 1.00 | batchCop[tiflash] | | | +## | │ │ └─ExchangeSender | 1.00 | batchCop[tiflash] | | ExchangeType: HashPartition, Hash Cols: [name: test.t.id, collate: N/A] | +## | │ │ └─Selection | 1.00 | batchCop[tiflash] | | lt(test.t.id, 2), not(isnull(test.t.id)) | +## | │ │ └─TableFullScan | 3.00 | 
batchCop[tiflash] | table:t1 | keep order:false, stats:pseudo | +## | │ └─ExchangeReceiver(Probe) | 1.00 | batchCop[tiflash] | | | +## | │ └─ExchangeSender | 1.00 | batchCop[tiflash] | | ExchangeType: HashPartition, Hash Cols: [name: test.t.id, collate: N/A] | +## | │ └─Selection | 1.00 | batchCop[tiflash] | | lt(test.t.id, 2), not(isnull(test.t.id)) | +## | │ └─TableFullScan | 3.00 | batchCop[tiflash] | table:t2 | keep order:false, stats:pseudo | +## | └─Projection(Probe) | 2.40 | batchCop[tiflash] | | test.t.id | +## | └─HashAgg | 2.40 | batchCop[tiflash] | | group by:test.t.id, funcs:firstrow(test.t.id)->test.t.id | +## | └─ExchangeReceiver | 2.40 | batchCop[tiflash] | | | +## | └─ExchangeSender | 2.40 | batchCop[tiflash] | | ExchangeType: HashPartition, Hash Cols: [name: test.t.id, collate: N/A] | +## | └─HashAgg | 2.40 | batchCop[tiflash] | | group by:test.t.id, | +## | └─Selection | 3.00 | batchCop[tiflash] | | not(isnull(test.t.id)) | +## | └─TableFullScan | 3.00 | batchCop[tiflash] | table:t | keep order:false, stats:pseudo | +## +-----------------------------------------+---------+-------------------+---------------+-------------------------------------------------------------------------+ +## Ensure build1, build2-probe1 and probe2 are all in the CreatingSets; this tests the bug where build1 throws an exception but does not update the build state, which blocks build2-probe1 and makes the query hang. +=> DBGInvoke __enable_fail_point(exception_mpp_hash_build) +mysql> use test; set @@tidb_isolation_read_engines='tiflash'; set @@tidb_allow_mpp=1; set @@tidb_broadcast_join_threshold_count=0; set @@tidb_broadcast_join_threshold_size=0; select t1.id from test.t t1 join test.t t2 on t1.id = t2.id and t1.id <2 join (select id from test.t group by id) t3 on t2.id=t3.id; +ERROR 1105 (HY000) at line 1: other error for mpp stream: DB::Exception: Fail point FailPoints::exception_mpp_hash_build is triggered. +=> DBGInvoke __disable_fail_point(exception_mpp_hash_build) + # Clean up. 
mysql> drop table if exists test.t