Skip to content

Commit

Permalink
This is an automated cherry-pick of pingcap#7434
Browse files Browse the repository at this point in the history
Signed-off-by: ti-chi-bot <ti-community-prow-bot@tidb.io>
  • Loading branch information
SeaRise authored and ti-chi-bot committed Jan 24, 2024
1 parent 09c8da9 commit 2d46e03
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 11 deletions.
30 changes: 21 additions & 9 deletions dbms/src/Flash/Mpp/MPPTask.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ MPPTask::~MPPTask()
void MPPTask::abortTunnels(const String & message, bool wait_sender_finish)
{
{
std::unique_lock lock(tunnel_and_receiver_mu);
std::unique_lock lock(mtx);
if (unlikely(tunnel_set == nullptr))
return;
}
Expand All @@ -87,7 +87,7 @@ void MPPTask::abortTunnels(const String & message, bool wait_sender_finish)
void MPPTask::abortReceivers()
{
{
std::unique_lock lock(tunnel_and_receiver_mu);
std::unique_lock lock(mtx);
if unlikely (receiver_set == nullptr)
return;
}
Expand Down Expand Up @@ -146,7 +146,7 @@ void MPPTask::registerTunnels(const mpp::DispatchTaskRequest & task_request)
}
}
{
std::unique_lock lock(tunnel_and_receiver_mu);
std::unique_lock lock(mtx);
if (status != INITIALIZING)
throw Exception(fmt::format("The tunnels can not be registered, because the task is not in initializing state"));
tunnel_set = std::move(tunnel_set_local);
Expand Down Expand Up @@ -184,7 +184,7 @@ void MPPTask::initExchangeReceivers()
return true;
});
{
std::unique_lock lock(tunnel_and_receiver_mu);
std::unique_lock lock(mtx);
if (status != RUNNING)
throw Exception("exchange receiver map can not be initialized, because the task is not in running state");
receiver_set = std::move(receiver_set_local);
Expand All @@ -200,7 +200,7 @@ std::pair<MPPTunnelPtr, String> MPPTask::getTunnel(const ::mpp::EstablishMPPConn
"can't find tunnel ({} + {}) because the task is aborted, error message = {}",
request->sender_meta().task_id(),
request->receiver_meta().task_id(),
err_string);
getErrString());
return {nullptr, err_msg};
}

Expand All @@ -218,6 +218,18 @@ std::pair<MPPTunnelPtr, String> MPPTask::getTunnel(const ::mpp::EstablishMPPConn
return {tunnel_ptr, ""};
}

String MPPTask::getErrString() const
{
std::lock_guard lock(mtx);
return err_string;
}

void MPPTask::setErrString(const String & message)
{
std::lock_guard lock(mtx);
err_string = message;
}

void MPPTask::unregisterTask()
{
auto [result, reason] = manager->unregisterTask(id);
Expand Down Expand Up @@ -328,7 +340,7 @@ void MPPTask::preprocess()
query_executor_holder.set(queryExecute(*context));
LOG_DEBUG(log, "init query executor done");
{
std::unique_lock lock(tunnel_and_receiver_mu);
std::unique_lock lock(mtx);
if (status != RUNNING)
throw Exception("task not in running state, may be cancelled");
for (auto & r : dag_context->getCoprocessorReaders())
Expand Down Expand Up @@ -446,7 +458,7 @@ void MPPTask::runImpl()
}
}
}
mpp_task_statistics.end(status.load(), err_string);
mpp_task_statistics.end(status.load(), getErrString());
mpp_task_statistics.logTracingJson();

LOG_DEBUG(log, "task ends, time cost is {} ms.", stopwatch.elapsedMilliseconds());
Expand Down Expand Up @@ -486,7 +498,7 @@ void MPPTask::abort(const String & message, AbortType abort_type)
}
else if (previous_status == INITIALIZING && switchStatus(INITIALIZING, next_task_status))
{
err_string = message;
setErrString(message);
/// if the task is in initializing state, mpp task can return error to TiDB directly,
/// so just close all tunnels here
abortTunnels("", false);
Expand All @@ -498,7 +510,7 @@ void MPPTask::abort(const String & message, AbortType abort_type)
/// abort the components from top to bottom because if bottom components are aborted
/// first, the top components may see an error caused by the abort, which is not
/// the original error
err_string = message;
setErrString(message);
abortTunnels(message, false);
abortDataStreams(abort_type);
abortReceivers();
Expand Down
14 changes: 12 additions & 2 deletions dbms/src/Flash/Mpp/MPPTask.h
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,14 @@ class MPPTask : public std::enable_shared_from_this<MPPTask>

void initExchangeReceivers();

<<<<<<< HEAD
=======
String getErrString() const;
void setErrString(const String & message);

private:
// To make sure dag_req is not destroyed before the mpp task ends.
>>>>>>> 12bda10fd1 (Tsan: fix data race on `ComputeServerRunner.cancelJoinTasks` (#7434))
tipb::DAGRequest dag_req;
mpp::TaskMeta meta;
MPPTaskId id;
Expand All @@ -128,9 +136,11 @@ class MPPTask : public std::enable_shared_from_this<MPPTask>
QueryExecutorHolder query_executor_holder;

std::atomic<TaskStatus> status{INITIALIZING};
String err_string;

std::mutex tunnel_and_receiver_mu;
/// Used to protect concurrent access to `err_string`, `tunnel_set`, and `receiver_set`.
mutable std::mutex mtx;

String err_string;

MPPTunnelSetPtr tunnel_set;

Expand Down

0 comments on commit 2d46e03

Please sign in to comment.