From de32b2ca301b0bd6bac7767f744045010f90ae1d Mon Sep 17 00:00:00 2001 From: Ti Chi Robot Date: Fri, 22 Apr 2022 13:50:04 +0800 Subject: [PATCH] fix bug that ExchangeReceiver is not cancelled if exception happens in union/agg block input stream (#4285) (#4290) ref pingcap/tiflash#4229 --- .../ParallelAggregatingBlockInputStream.cpp | 3 +- dbms/src/DataStreams/UnionBlockInputStream.h | 33 ++++++++++++------- 2 files changed, 24 insertions(+), 12 deletions(-) diff --git a/dbms/src/DataStreams/ParallelAggregatingBlockInputStream.cpp b/dbms/src/DataStreams/ParallelAggregatingBlockInputStream.cpp index 7705ef8847a..79511dd30c8 100644 --- a/dbms/src/DataStreams/ParallelAggregatingBlockInputStream.cpp +++ b/dbms/src/DataStreams/ParallelAggregatingBlockInputStream.cpp @@ -183,7 +183,8 @@ void ParallelAggregatingBlockInputStream::Handler::onException(std::exception_pt parent.exceptions[thread_num] = exception; /// can not cancel parent inputStream or the exception might be lost if (!parent.executed) - parent.processor.cancel(false); + /// kill the processor so ExchangeReceiver will be closed + parent.processor.cancel(true); } diff --git a/dbms/src/DataStreams/UnionBlockInputStream.h b/dbms/src/DataStreams/UnionBlockInputStream.h index 7677d9c2b62..af744d76e36 100644 --- a/dbms/src/DataStreams/UnionBlockInputStream.h +++ b/dbms/src/DataStreams/UnionBlockInputStream.h @@ -26,11 +26,11 @@ struct OutputData Block block; std::exception_ptr exception; - OutputData() {} + OutputData() = default; explicit OutputData(Block & block_) : block(block_) {} - explicit OutputData(std::exception_ptr & exception_) + explicit OutputData(const std::exception_ptr & exception_) : exception(exception_) {} }; @@ -43,12 +43,12 @@ struct OutputData BlockExtraInfo extra_info; std::exception_ptr exception; - OutputData() {} + OutputData() = default; OutputData(Block & block_, BlockExtraInfo & extra_info_) : block(block_) , extra_info(extra_info_) {} - explicit OutputData(std::exception_ptr & exception_) + explicit OutputData(const std::exception_ptr & exception_) : exception(exception_) {} }; @@ -260,6 +260,23 @@ class UnionBlockInputStream final : public IProfilingBlockInputStream * otherwise ParallelInputsProcessor can be blocked during insertion into the queue. */ OutputQueue output_queue; + std::mutex mu; + bool meet_exception = false; + + void handleException(const std::exception_ptr & exception) + { + std::unique_lock lock(mu); + if (meet_exception) + return; + meet_exception = true; + /// The order of the rows matters. If it is changed, then the situation is possible, + /// when before exception, an empty block (end of data) will be put into the queue, + /// and the exception is lost. + output_queue.emplace(exception); + /// can not cancel itself or the exception might be lost + /// kill the processor so ExchangeReceiver will be closed + processor.cancel(true); + } struct Handler { @@ -290,13 +307,7 @@ class UnionBlockInputStream final : public IProfilingBlockInputStream void onException(std::exception_ptr & exception, size_t /*thread_num*/) { - /// The order of the rows matters. If it is changed, then the situation is possible, - /// when before exception, an empty block (end of data) will be put into the queue, - /// and the exception is lost. - - parent.output_queue.emplace(exception); - /// can not cancel parent inputStream or the exception might be lost - parent.processor.cancel(false); /// Does not throw exceptions. + parent.handleException(exception); } String getName() const