Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix mpp hang error if some error happens during compile of mpp plan in TiFlash #1533

Merged
merged 12 commits into from
Mar 11, 2021
Merged
49 changes: 29 additions & 20 deletions dbms/src/Common/FailPoint.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,26 +8,35 @@ namespace DB
{
// Out-of-class definition of FailPointHelper's static map from a String key to a shared FailPointChannel.
// NOTE(review): the key is presumably the fail point name — confirm against FailPointHelper.
std::unordered_map<String, std::shared_ptr<FailPointChannel>> FailPointHelper::fail_point_wait_channels;

// X-macro list of fail point names: APPLY_FOR_FAILPOINTS(M) expands to M(name) once for
// every entry below. The caller supplies M; what M generates is decided at the use site.
#define APPLY_FOR_FAILPOINTS(M) \
M(exception_between_drop_meta_and_data) \
M(exception_between_alter_data_and_meta) \
M(exception_drop_table_during_remove_meta) \
M(exception_between_rename_table_data_and_metadata) \
M(exception_between_create_database_meta_and_directory) \
M(exception_before_rename_table_old_meta_removed) \
M(exception_after_step_1_in_exchange_partition) \
M(exception_before_step_2_rename_in_exchange_partition) \
M(exception_after_step_2_in_exchange_partition) \
M(exception_before_step_3_rename_in_exchange_partition) \
M(exception_after_step_3_in_exchange_partition) \
M(region_exception_after_read_from_storage_some_error) \
M(region_exception_after_read_from_storage_all_error) \
M(exception_before_dmfile_remove_encryption) \
M(exception_before_dmfile_remove_from_disk) \
M(force_enable_region_persister_compatible_mode) \
M(force_disable_region_persister_compatible_mode) \
M(force_triggle_background_merge_delta) \
M(force_triggle_foreground_flush)
// X-macro list of fail point names: APPLY_FOR_FAILPOINTS(M) expands to M(name) once for
// every entry below. The caller supplies M; what M generates is decided at the use site.
// The trailing exception_*_mpp_* entries name the fail points triggered from the MPP task
// code paths (task registration, tunnel registration and task run, root and non-root).
#define APPLY_FOR_FAILPOINTS(M) \
M(exception_between_drop_meta_and_data) \
M(exception_between_alter_data_and_meta) \
M(exception_drop_table_during_remove_meta) \
M(exception_between_rename_table_data_and_metadata) \
M(exception_between_create_database_meta_and_directory) \
M(exception_before_rename_table_old_meta_removed) \
M(exception_after_step_1_in_exchange_partition) \
M(exception_before_step_2_rename_in_exchange_partition) \
M(exception_after_step_2_in_exchange_partition) \
M(exception_before_step_3_rename_in_exchange_partition) \
M(exception_after_step_3_in_exchange_partition) \
M(region_exception_after_read_from_storage_some_error) \
M(region_exception_after_read_from_storage_all_error) \
M(exception_before_dmfile_remove_encryption) \
M(exception_before_dmfile_remove_from_disk) \
M(force_enable_region_persister_compatible_mode) \
M(force_disable_region_persister_compatible_mode) \
M(force_triggle_background_merge_delta) \
M(force_triggle_foreground_flush) \
M(exception_before_mpp_register_non_root_mpp_task) \
M(exception_before_mpp_register_tunnel_for_non_root_mpp_task) \
M(exception_during_mpp_register_tunnel_for_non_root_mpp_task) \
M(exception_before_mpp_non_root_task_run) \
M(exception_during_mpp_non_root_task_run) \
M(exception_before_mpp_register_root_mpp_task) \
M(exception_before_mpp_register_tunnel_for_root_mpp_task) \
M(exception_before_mpp_root_task_run) \
M(exception_during_mpp_root_task_run)

#define APPLY_FOR_FAILPOINTS_WITH_CHANNEL(M) \
M(pause_after_learner_read) \
Expand Down
81 changes: 76 additions & 5 deletions dbms/src/Flash/Mpp/MPPHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,15 @@ namespace DB
// Fail point name symbols referenced by the MPP code in this file. They are only declared
// here (extern), so their definitions live in another translation unit.
// NOTE(review): presumably generated in FailPoint.cpp from the APPLY_FOR_FAILPOINTS lists — confirm.
namespace FailPoints
{
// Used with FAIL_POINT_PAUSE while a running task writes blocks.
extern const char hang_in_execution[];
// Used with FAIL_POINT_TRIGGER_EXCEPTION; each name encodes where in the MPP task
// life cycle it fires (register task / register tunnel / run) and for which kind
// of task (root / non-root).
extern const char exception_before_mpp_register_non_root_mpp_task[];
extern const char exception_before_mpp_register_root_mpp_task[];
extern const char exception_before_mpp_register_tunnel_for_non_root_mpp_task[];
extern const char exception_before_mpp_register_tunnel_for_root_mpp_task[];
extern const char exception_during_mpp_register_tunnel_for_non_root_mpp_task[];
extern const char exception_before_mpp_non_root_task_run[];
extern const char exception_before_mpp_root_task_run[];
extern const char exception_during_mpp_non_root_task_run[];
extern const char exception_during_mpp_root_task_run[];
} // namespace FailPoints

bool MPPTaskProgress::isTaskHanging(const Context & context)
Expand Down Expand Up @@ -108,21 +117,37 @@ void MPPTask::prepare(const mpp::DispatchTaskRequest & task_request)
context.getTimezoneInfo().resetByDAGRequest(*dag_req);
context.setProgressCallback([this](const Progress & progress) { this->updateProgress(progress); });

dag_context = std::make_unique<DAGContext>(*dag_req, task_request.meta());
context.setDAGContext(dag_context.get());

// register task.
TMTContext & tmt_context = context.getTMTContext();
auto task_manager = tmt_context.getMPPTaskManager();
LOG_DEBUG(log, "begin to register the task " << id.toString());

if (dag_context->isRootMPPTask())
{
FAIL_POINT_TRIGGER_EXCEPTION(FailPoints::exception_before_mpp_register_root_mpp_task);
}
else
{
FAIL_POINT_TRIGGER_EXCEPTION(FailPoints::exception_before_mpp_register_non_root_mpp_task);
}
if (!task_manager->registerTask(shared_from_this()))
{
throw TiFlashException(std::string(__PRETTY_FUNCTION__) + ": Failed to register MPP Task", Errors::Coprocessor::BadRequest);
}


dag_context = std::make_unique<DAGContext>(*dag_req, task_request.meta());
context.setDAGContext(dag_context.get());

DAGQuerySource dag(context, regions, *dag_req, true);

if (dag_context->isRootMPPTask())
{
FAIL_POINT_TRIGGER_EXCEPTION(FailPoints::exception_before_mpp_register_tunnel_for_root_mpp_task);
}
else
{
FAIL_POINT_TRIGGER_EXCEPTION(FailPoints::exception_before_mpp_register_tunnel_for_non_root_mpp_task);
}
// register tunnels
MPPTunnelSetPtr tunnel_set = std::make_shared<MPPTunnelSet>();
const auto & exchangeSender = dag_req->root_executor().exchange_sender();
Expand All @@ -136,6 +161,10 @@ void MPPTask::prepare(const mpp::DispatchTaskRequest & task_request)
LOG_DEBUG(log, "begin to register the tunnel " << tunnel->tunnel_id);
registerTunnel(MPPTaskId{task_meta.start_ts(), task_meta.task_id()}, tunnel);
tunnel_set->tunnels.emplace_back(tunnel);
if (!dag_context->isRootMPPTask())
{
FAIL_POINT_TRIGGER_EXCEPTION(FailPoints::exception_during_mpp_register_tunnel_for_non_root_mpp_task);
}
}
// read index , this may take a long time.
io = executeQuery(dag, context, false, QueryProcessingStage::Complete);
Expand Down Expand Up @@ -202,6 +231,14 @@ void MPPTask::runImpl()
count += block.rows();
to->write(block);
FAIL_POINT_PAUSE(FailPoints::hang_in_execution);
if (dag_context->isRootMPPTask())
{
FAIL_POINT_TRIGGER_EXCEPTION(FailPoints::exception_during_mpp_root_task_run);
}
else
{
FAIL_POINT_TRIGGER_EXCEPTION(FailPoints::exception_during_mpp_non_root_task_run);
}
}

/// For outputting additional information in some formats.
Expand Down Expand Up @@ -278,14 +315,45 @@ void MPPTask::cancel()
LOG_WARNING(log, "Finish cancel task: " + id.toString());
}

/// Best-effort cleanup after dispatching `task` failed. Any exception raised while
/// cleaning up is swallowed and reported through the log only.
///
/// @param task   the task being dispatched; nullptr when the failure happened before
///               the task object was created, in which case nothing is done.
/// @param error  error message written to the tunnels of a non-root task.
void MPPHandler::handleError(MPPTaskPtr task, String error)
{
    try
    {
        if (task != nullptr)
        {
            /// For a root task the tunnel is only connected after DispatchMPPTask
            /// finishes without error; for a non-root task the tunnel can be connected
            /// even if DispatchMPPTask fails. So for a non-root task we write the
            /// error to all tunnels, while for a root task we just close the tunnels.
            ///
            /// dag_context is only created part-way through MPPTask::prepare(), so it
            /// is still null when prepare() throws before that point. It must not be
            /// dereferenced then; tunnels are registered after dag_context is created,
            /// so falling back to finishAllTunnel() has nothing to close in that case.
            const bool is_non_root_task = task->dag_context != nullptr && !task->dag_context->isRootMPPTask();
            if (is_non_root_task)
                task->writeErrToAllTunnel(error);
            else
                task->finishAllTunnel();
            task->unregisterTask();
        }
    }
    catch (...)
    {
        LOG_ERROR(log, "Failed to handle error and clean up the task");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto

    }
}
// execute is responsible for making the plan, registering the task and its tunnels, and starting the running thread.
grpc::Status MPPHandler::execute(Context & context, mpp::DispatchTaskResponse * response)
{
MPPTaskPtr task = nullptr;
try
{
Stopwatch stopwatch;
MPPTaskPtr task = std::make_shared<MPPTask>(task_request.meta(), context);
task = std::make_shared<MPPTask>(task_request.meta(), context);
task->prepare(task_request);
if (task->dag_context->isRootMPPTask())
{
FAIL_POINT_TRIGGER_EXCEPTION(FailPoints::exception_before_mpp_root_task_run);
}
else
{
FAIL_POINT_TRIGGER_EXCEPTION(FailPoints::exception_before_mpp_non_root_task_run);
}
task->memory_tracker = current_memory_tracker;
task->run();
LOG_INFO(log, "processing dispatch is over; the time cost is " << std::to_string(stopwatch.elapsedMilliseconds()) << " ms");
Expand All @@ -295,18 +363,21 @@ grpc::Status MPPHandler::execute(Context & context, mpp::DispatchTaskResponse *
LOG_ERROR(log, "dispatch task meet error : " << e.displayText());
auto * err = response->mutable_error();
err->set_msg(e.displayText());
handleError(task, e.displayText());
}
catch (std::exception & e)
{
LOG_ERROR(log, "dispatch task meet error : " << e.what());
auto * err = response->mutable_error();
err->set_msg(e.what());
handleError(task, e.what());
}
catch (...)
{
LOG_ERROR(log, "dispatch task meet fatal error");
auto * err = response->mutable_error();
err->set_msg("fatal error");
handleError(task, "fatal error");
}
return grpc::Status::OK;
}
Expand Down
29 changes: 28 additions & 1 deletion dbms/src/Flash/Mpp/MPPHandler.h
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,18 @@ struct MPPTunnel
cv_for_finished.notify_all();
}

/// Finish the tunnel without checking the connection status. This function
/// should only be used when handling the error of a failed DispatchMPPTask for
/// a root task: in that case TiDB does not send the establish-MPP-connection
/// request at all, so checking the connection status is meaningless and the
/// tunnel is simply marked as finished.
void finish()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What about use a more critical word like destroy?

Copy link
Contributor Author

@windtalker windtalker Mar 11, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK, I will use close

{
// Set the flag while holding mu, then notify everything waiting on cv_for_finished.
std::unique_lock<std::mutex> lk(mu);
finished = true;
cv_for_finished.notify_all();
}

// a MPPConn request has arrived. it will build connection by this tunnel;
void connect(::grpc::ServerWriter<::mpp::MPPDataPacket> * writer_)
{
Expand Down Expand Up @@ -256,7 +268,7 @@ struct MPPTask : std::enable_shared_from_this<MPPTask>, private boost::noncopyab
// which targeted task we should send data by which tunnel.
std::map<MPPTaskId, MPPTunnelPtr> tunnel_map;

MPPTaskManager * manager;
MPPTaskManager * manager = nullptr;

Logger * log;

Expand All @@ -279,6 +291,20 @@ struct MPPTask : std::enable_shared_from_this<MPPTask>, private boost::noncopyab

void cancel();

/// Calls finish() on every tunnel stored in tunnel_map, i.e. closes them without
/// checking whether they were ever connected. Exceptions are swallowed: only a
/// warning is logged, and the warning does not include the cause of the failure.
void finishAllTunnel()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto.

{
try
{
// tunnel_map maps the target task id to the tunnel; only the tunnel is needed here.
for (auto & it : tunnel_map)
{
it.second->finish();
}
}
catch (...)
{
// NOTE(review): the caught exception is dropped here; the review thread below suggests logging it.
LOG_WARNING(log, "Failed to finish all tunnels");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could leverage the follow method for logging the exception either. Otherwise we know only it failed but lost the reason.

https://github.com/pingcap/tics/blob/6e89bad624e4a9bd4255ee86684dceea6eeed153/dbms/src/Common/Exception.cpp#L59

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good idea!

}
}
void writeErrToAllTunnel(const String & e)
{
try
Expand Down Expand Up @@ -516,6 +542,7 @@ class MPPHandler
public:
/// Initializes task_request from the given dispatch request and obtains the logger named "MPPHandler".
MPPHandler(const mpp::DispatchTaskRequest & task_request_) : task_request(task_request_), log(&Logger::get("MPPHandler")) {}
/// Creates the MPP task for task_request, prepares it and starts it running. When an
/// exception is caught, its message is stored in `response`'s error field and
/// handleError() is invoked for the task.
grpc::Status execute(Context & context, mpp::DispatchTaskResponse * response);
/// Cleans up after a failed dispatch: for a non-root task `error` is written to all of
/// its tunnels, for a root task the tunnels are finished, and then the task is
/// unregistered. Does nothing when `task` is nullptr.
void handleError(MPPTaskPtr task, String error);
};

} // namespace DB
69 changes: 69 additions & 0 deletions tests/fullstack-test/mpp/mpp_fail.test
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
# Checks that an MPP query returns an error when a fail point throws while a root or
# non-root MPP task is being registered, is registering its tunnels, or is running.

# Preparation.
=> DBGInvoke __init_fail_point()

mysql> drop table if exists test.t
mysql> create table test.t (id int, value varchar(64))
mysql> insert into test.t values(1,'a'),(2,'b'),(3,'c')
mysql> alter table test.t set tiflash replica 1

func> wait_table test t


# Test cases. Each case enables one fail point, runs the same MPP aggregation query,
# expects it to fail with that fail point's error message, then disables the fail point.

## exception before mpp register non root mpp task
=> DBGInvoke __enable_fail_point(exception_before_mpp_register_non_root_mpp_task)
mysql> use test; set @@tidb_isolation_read_engines='tiflash'; set @@tidb_allow_mpp=1; select count(value), id from t group by id;
ERROR 1105 (HY000) at line 1: DB::Exception: Fail point FailPoints::exception_before_mpp_register_non_root_mpp_task is triggered.
=> DBGInvoke __disable_fail_point(exception_before_mpp_register_non_root_mpp_task)

## exception before mpp register root mpp task
=> DBGInvoke __enable_fail_point(exception_before_mpp_register_root_mpp_task)
mysql> use test; set @@tidb_isolation_read_engines='tiflash'; set @@tidb_allow_mpp=1; select count(value), id from t group by id;
ERROR 1105 (HY000) at line 1: DB::Exception: Fail point FailPoints::exception_before_mpp_register_root_mpp_task is triggered.
=> DBGInvoke __disable_fail_point(exception_before_mpp_register_root_mpp_task)

## exception before mpp register tunnel for non root mpp task
=> DBGInvoke __enable_fail_point(exception_before_mpp_register_tunnel_for_non_root_mpp_task)
mysql> use test; set @@tidb_isolation_read_engines='tiflash'; set @@tidb_allow_mpp=1; select count(value), id from t group by id;
ERROR 1105 (HY000) at line 1: DB::Exception: Fail point FailPoints::exception_before_mpp_register_tunnel_for_non_root_mpp_task is triggered.
=> DBGInvoke __disable_fail_point(exception_before_mpp_register_tunnel_for_non_root_mpp_task)

## exception before mpp register tunnel for root mpp task
=> DBGInvoke __enable_fail_point(exception_before_mpp_register_tunnel_for_root_mpp_task)
mysql> use test; set @@tidb_isolation_read_engines='tiflash'; set @@tidb_allow_mpp=1; select count(value), id from t group by id;
ERROR 1105 (HY000) at line 1: DB::Exception: Fail point FailPoints::exception_before_mpp_register_tunnel_for_root_mpp_task is triggered.
=> DBGInvoke __disable_fail_point(exception_before_mpp_register_tunnel_for_root_mpp_task)

## exception during mpp register tunnel for non root mpp task
=> DBGInvoke __enable_fail_point(exception_during_mpp_register_tunnel_for_non_root_mpp_task)
mysql> use test; set @@tidb_isolation_read_engines='tiflash'; set @@tidb_allow_mpp=1; select count(value), id from t group by id;
ERROR 1105 (HY000) at line 1: DB::Exception: Fail point FailPoints::exception_during_mpp_register_tunnel_for_non_root_mpp_task is triggered.
=> DBGInvoke __disable_fail_point(exception_during_mpp_register_tunnel_for_non_root_mpp_task)

## exception before mpp run non root task
=> DBGInvoke __enable_fail_point(exception_before_mpp_non_root_task_run)
mysql> use test; set @@tidb_isolation_read_engines='tiflash'; set @@tidb_allow_mpp=1; select count(value), id from t group by id;
ERROR 1105 (HY000) at line 1: DB::Exception: Fail point FailPoints::exception_before_mpp_non_root_task_run is triggered.
=> DBGInvoke __disable_fail_point(exception_before_mpp_non_root_task_run)

## exception before mpp run root task
=> DBGInvoke __enable_fail_point(exception_before_mpp_root_task_run)
mysql> use test; set @@tidb_isolation_read_engines='tiflash'; set @@tidb_allow_mpp=1; select count(value), id from t group by id;
ERROR 1105 (HY000) at line 1: DB::Exception: Fail point FailPoints::exception_before_mpp_root_task_run is triggered.
=> DBGInvoke __disable_fail_point(exception_before_mpp_root_task_run)

## exception during mpp run non root task
=> DBGInvoke __enable_fail_point(exception_during_mpp_non_root_task_run)
mysql> use test; set @@tidb_isolation_read_engines='tiflash'; set @@tidb_allow_mpp=1; select count(value), id from t group by id;
ERROR 1105 (HY000) at line 1: other error for mpp stream: DB::Exception: exchange receiver meet error : DB::Exception: Fail point FailPoints::exception_during_mpp_non_root_task_run is triggered.
=> DBGInvoke __disable_fail_point(exception_during_mpp_non_root_task_run)

## exception during mpp run root task
=> DBGInvoke __enable_fail_point(exception_during_mpp_root_task_run)
mysql> use test; set @@tidb_isolation_read_engines='tiflash'; set @@tidb_allow_mpp=1; select count(value), id from t group by id;
ERROR 1105 (HY000) at line 1: other error for mpp stream: DB::Exception: Fail point FailPoints::exception_during_mpp_root_task_run is triggered.
=> DBGInvoke __disable_fail_point(exception_during_mpp_root_task_run)

# Clean up.
mysql> drop table if exists test.t