Skip to content

Commit

Permalink
MPP: update the state of building a hash table when createOnce throw …
Browse files Browse the repository at this point in the history
…exceptions (#4202) (#4267)

close #4195
  • Loading branch information
ti-chi-bot authored Jun 17, 2022
1 parent fccc6c8 commit c09e036
Show file tree
Hide file tree
Showing 5 changed files with 80 additions and 20 deletions.
4 changes: 3 additions & 1 deletion dbms/src/Common/FailPoint.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,9 @@ std::unordered_map<String, std::shared_ptr<FailPointChannel>> FailPointHelper::f
M(segment_merge_after_ingest_packs) \
M(force_formal_page_file_not_exists) \
M(force_legacy_or_checkpoint_page_file_exists) \
M(exception_in_creating_set_input_stream)
M(exception_in_creating_set_input_stream) \
M(exception_when_read_from_log) \
M(exception_mpp_hash_build)

#define APPLY_FOR_FAILPOINTS(M) \
M(force_set_page_file_write_errno) \
Expand Down
44 changes: 31 additions & 13 deletions dbms/src/DataStreams/CreatingSetsBlockInputStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,18 @@ namespace DB
namespace FailPoints
{
extern const char exception_in_creating_set_input_stream[];
}
extern const char exception_mpp_hash_build[];
} // namespace FailPoints
namespace ErrorCodes
{
extern const int SET_SIZE_LIMIT_EXCEEDED;
}

CreatingSetsBlockInputStream::CreatingSetsBlockInputStream(const BlockInputStreamPtr & input,
std::vector<SubqueriesForSets> && subqueries_for_sets_list_,
const SizeLimits & network_transfer_limits, Int64 mpp_task_id_)
: subqueries_for_sets_list(std::move(subqueries_for_sets_list_)), network_transfer_limits(network_transfer_limits), mpp_task_id(mpp_task_id_)
std::vector<SubqueriesForSets> && subqueries_for_sets_list_, const SizeLimits & network_transfer_limits, Int64 mpp_task_id_)
: subqueries_for_sets_list(std::move(subqueries_for_sets_list_)),
network_transfer_limits(network_transfer_limits),
mpp_task_id(mpp_task_id_)
{
init(input);
}
Expand Down Expand Up @@ -90,12 +92,14 @@ void CreatingSetsBlockInputStream::createAll()
{
for (auto & subqueries_for_sets : subqueries_for_sets_list)
{
for (auto &elem : subqueries_for_sets)
for (auto & elem : subqueries_for_sets)
{
if (elem.second.join)
elem.second.join->setFinishBuildTable(false);
elem.second.join->setBuildTableState(Join::BuildTableState::WAITING);
}
}

Stopwatch watch;
for (auto & subqueries_for_sets : subqueries_for_sets_list)
{
for (auto & elem : subqueries_for_sets)
Expand All @@ -104,7 +108,8 @@ void CreatingSetsBlockInputStream::createAll()
{
if (isCancelledOrThrowIfKilled())
return;
workers.push_back(std::thread(&CreatingSetsBlockInputStream::createOne, this, std::ref(elem.second), current_memory_tracker));
workers.push_back(
std::thread(&CreatingSetsBlockInputStream::createOne, this, std::ref(elem.second), current_memory_tracker));
FAIL_POINT_TRIGGER_EXCEPTION(FailPoints::exception_in_creating_set_input_stream);
}
}
Expand All @@ -115,29 +120,34 @@ void CreatingSetsBlockInputStream::createAll()
}

if (!exception_from_workers.empty())
{
LOG_ERROR(log,
"Creating all tasks of " << std::to_string(mpp_task_id) << " takes " << std::to_string(watch.elapsedSeconds())
<< " sec with exception and rethrow the first of total " << exception_from_workers.size()
<< " exceptions.");
std::rethrow_exception(exception_from_workers.front());
}
LOG_DEBUG(
log, "Creating all tasks of " << std::to_string(mpp_task_id) << " takes " << std::to_string(watch.elapsedSeconds()) << "sec. ");

created = true;
}
}

void CreatingSetsBlockInputStream::createOne(SubqueryForSet & subquery, MemoryTracker * memory_tracker)
{
Stopwatch watch;
try
{

current_memory_tracker = memory_tracker;
LOG_DEBUG(log,
(subquery.set ? "Creating set. " : "")
<< (subquery.join ? "Creating join. " : "") << (subquery.table ? "Filling temporary table. " : "") << " for task "
<< std::to_string(mpp_task_id));
Stopwatch watch;

BlockOutputStreamPtr table_out;
if (subquery.table)
table_out = subquery.table->write({}, {});


bool done_with_set = !subquery.set;
bool done_with_join = !subquery.join;
bool done_with_table = !subquery.table;
Expand Down Expand Up @@ -194,7 +204,10 @@ void CreatingSetsBlockInputStream::createOne(SubqueryForSet & subquery, MemoryTr


if (subquery.join)
subquery.join->setFinishBuildTable(true);
{
FAIL_POINT_TRIGGER_EXCEPTION(FailPoints::exception_mpp_hash_build);
subquery.join->setBuildTableState(Join::BuildTableState::SUCCEED);
}

if (table_out)
table_out->writeSuffix();
Expand Down Expand Up @@ -235,10 +248,15 @@ void CreatingSetsBlockInputStream::createOne(SubqueryForSet & subquery, MemoryTr
LOG_DEBUG(log, "Subquery has empty result for task " << std::to_string(mpp_task_id) << ".");
}
}
catch (std::exception & e)
catch (...)
{
std::unique_lock<std::mutex> lock(exception_mutex);
exception_from_workers.push_back(std::current_exception());
if (subquery.join)
subquery.join->setBuildTableState(Join::BuildTableState::FAILED);
LOG_ERROR(log,
"task" << std::to_string(mpp_task_id) << " throw exception: " << getCurrentExceptionMessage(false, true) << " In "
<< std::to_string(watch.elapsedSeconds()) << " sec. ");
}
}

Expand Down
10 changes: 6 additions & 4 deletions dbms/src/Interpreters/Join.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ Join::Join(const Names & key_names_left_, const Names & key_names_right_, bool u
other_condition_ptr(other_condition_ptr_),
original_strictness(strictness),
max_block_size_for_cross_join(max_block_size_),
have_finish_build(true),
build_table_state(BuildTableState::SUCCEED),
log(&Logger::get("Join")),
limits(limits)
{
Expand All @@ -98,10 +98,10 @@ Join::Join(const Names & key_names_left_, const Names & key_names_right_, bool u
throw Exception("Not supported: non right join with right conditions");
}

void Join::setFinishBuildTable(bool finish_)
void Join::setBuildTableState(BuildTableState state_)
{
std::lock_guard<std::mutex> lk(build_table_mutex);
have_finish_build = finish_;
build_table_state = state_;
build_table_cv.notify_all();
}

Expand Down Expand Up @@ -1517,7 +1517,9 @@ void Join::joinBlock(Block & block) const
{
std::unique_lock lk(build_table_mutex);

build_table_cv.wait(lk, [&]() { return have_finish_build; });
build_table_cv.wait(lk, [&]() { return build_table_state != BuildTableState::WAITING; });
if (build_table_state == BuildTableState::FAILED) /// throw this exception once failed to build the hash table
throw Exception("Build failed before join probe!");
}

std::shared_lock lock(rwlock);
Expand Down
10 changes: 8 additions & 2 deletions dbms/src/Interpreters/Join.h
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,13 @@ class Join
bool isBuildSetExceeded() const { return build_set_exceeded.load(); }
size_t getNotJoinedStreamConcurrency() const { return build_concurrency; };

void setFinishBuildTable(bool);
enum BuildTableState
{
WAITING,
FAILED,
SUCCEED
};
void setBuildTableState(BuildTableState state_);

/// Reference to the row in block.
struct RowRef
Expand Down Expand Up @@ -446,7 +452,7 @@ class Join

mutable std::mutex build_table_mutex;
mutable std::condition_variable build_table_cv;
bool have_finish_build;
BuildTableState build_table_state;

Poco::Logger * log;

Expand Down
32 changes: 32 additions & 0 deletions tests/fullstack-test/mpp/mpp_fail.test
Original file line number Diff line number Diff line change
Expand Up @@ -81,5 +81,37 @@ ERROR 1105 (HY000) at line 1: other error for mpp stream: DB::Exception: Exchang
=> DBGInvoke __disable_fail_point(exception_during_mpp_non_root_task_run)
=> DBGInvoke __disable_fail_point(exception_during_mpp_close_tunnel)

## exception during mpp hash build
## desc format='brief' select t1.id from test.t t1 join test.t t2 on t1.id = t2.id and t1.id <2 join (select id from test.t group by id) t3 on t2.id=t3.id;
## +-----------------------------------------+---------+-------------------+---------------+-------------------------------------------------------------------------+
## | id | estRows | task | access object | operator info |
## +-----------------------------------------+---------+-------------------+---------------+-------------------------------------------------------------------------+
## | Projection | 0.99 | root | | test.t.id |
## | └─TableReader | 0.99 | root | | data:ExchangeSender |
## | └─ExchangeSender | 0.99 | batchCop[tiflash] | | ExchangeType: PassThrough |
## | └─HashJoin | 0.99 | batchCop[tiflash] | | inner join, equal:[eq(test.t.id, test.t.id)] |
## | ├─HashJoin(Build) | 0.99 | batchCop[tiflash] | | inner join, equal:[eq(test.t.id, test.t.id)] |
## | │ ├─ExchangeReceiver(Build) | 1.00 | batchCop[tiflash] | | |
## | │ │ └─ExchangeSender | 1.00 | batchCop[tiflash] | | ExchangeType: HashPartition, Hash Cols: [name: test.t.id, collate: N/A] |
## | │ │ └─Selection | 1.00 | batchCop[tiflash] | | lt(test.t.id, 2), not(isnull(test.t.id)) |
## | │ │ └─TableFullScan | 3.00 | batchCop[tiflash] | table:t1 | keep order:false, stats:pseudo |
## | │ └─ExchangeReceiver(Probe) | 1.00 | batchCop[tiflash] | | |
## | │ └─ExchangeSender | 1.00 | batchCop[tiflash] | | ExchangeType: HashPartition, Hash Cols: [name: test.t.id, collate: N/A] |
## | │ └─Selection | 1.00 | batchCop[tiflash] | | lt(test.t.id, 2), not(isnull(test.t.id)) |
## | │ └─TableFullScan | 3.00 | batchCop[tiflash] | table:t2 | keep order:false, stats:pseudo |
## | └─Projection(Probe) | 2.40 | batchCop[tiflash] | | test.t.id |
## | └─HashAgg | 2.40 | batchCop[tiflash] | | group by:test.t.id, funcs:firstrow(test.t.id)->test.t.id |
## | └─ExchangeReceiver | 2.40 | batchCop[tiflash] | | |
## | └─ExchangeSender | 2.40 | batchCop[tiflash] | | ExchangeType: HashPartition, Hash Cols: [name: test.t.id, collate: N/A] |
## | └─HashAgg | 2.40 | batchCop[tiflash] | | group by:test.t.id, |
## | └─Selection | 3.00 | batchCop[tiflash] | | not(isnull(test.t.id)) |
## | └─TableFullScan | 3.00 | batchCop[tiflash] | table:t | keep order:false, stats:pseudo |
## +-----------------------------------------+---------+-------------------+---------------+-------------------------------------------------------------------------+
## ensure build1, build2-probe1, probe2 in the CreatingSets, test the bug where build1 throw exception but not change the build state, thus block the build2-probe1, at last this query hangs.
=> DBGInvoke __enable_fail_point(exception_mpp_hash_build)
mysql> use test; set @@tidb_isolation_read_engines='tiflash'; set @@tidb_allow_mpp=1; set @@tidb_broadcast_join_threshold_count=0; set @@tidb_broadcast_join_threshold_size=0; select t1.id from test.t t1 join test.t t2 on t1.id = t2.id and t1.id <2 join (select id from test.t group by id) t3 on t2.id=t3.id;
ERROR 1105 (HY000) at line 1: other error for mpp stream: DB::Exception: Fail point FailPoints::exception_mpp_hash_build is triggered.
=> DBGInvoke __disable_fail_point(exception_mpp_hash_build)

# Clean up.
mysql> drop table if exists test.t

0 comments on commit c09e036

Please sign in to comment.