Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Tsan: fix data race on ComputeServerRunner.cancelJoinTasks #7434

Merged
merged 2 commits into from
May 8, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 21 additions & 9 deletions dbms/src/Flash/Mpp/MPPTask.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ bool MPPTask::isRootMPPTask() const
void MPPTask::abortTunnels(const String & message, bool wait_sender_finish)
{
{
std::unique_lock lock(tunnel_and_receiver_mu);
std::unique_lock lock(mtx);
if (unlikely(tunnel_set == nullptr))
return;
}
Expand All @@ -125,7 +125,7 @@ void MPPTask::abortTunnels(const String & message, bool wait_sender_finish)
void MPPTask::abortReceivers()
{
{
std::unique_lock lock(tunnel_and_receiver_mu);
std::unique_lock lock(mtx);
if unlikely (receiver_set == nullptr)
return;
}
Expand Down Expand Up @@ -180,7 +180,7 @@ void MPPTask::registerTunnels(const mpp::DispatchTaskRequest & task_request)
injectFailPointDuringRegisterTunnel(dag_context->isRootMPPTask());
}
{
std::unique_lock lock(tunnel_and_receiver_mu);
std::unique_lock lock(mtx);
if (status != INITIALIZING)
throw Exception(fmt::format("The tunnels can not be registered, because the task is not in initializing state"));
tunnel_set = std::move(tunnel_set_local);
Expand Down Expand Up @@ -222,7 +222,7 @@ void MPPTask::initExchangeReceivers()
return true;
});
{
std::unique_lock lock(tunnel_and_receiver_mu);
std::unique_lock lock(mtx);
if (status != RUNNING)
throw Exception("exchange receiver map can not be initialized, because the task is not in running state");
receiver_set = std::move(receiver_set_local);
Expand All @@ -238,7 +238,7 @@ std::pair<MPPTunnelPtr, String> MPPTask::getTunnel(const ::mpp::EstablishMPPConn
"can't find tunnel ({} + {}) because the task is aborted, error message = {}",
request->sender_meta().task_id(),
request->receiver_meta().task_id(),
err_string);
getErrString());
return {nullptr, err_msg};
}

Expand All @@ -256,6 +256,18 @@ std::pair<MPPTunnelPtr, String> MPPTask::getTunnel(const ::mpp::EstablishMPPConn
return {tunnel_ptr, ""};
}

/// Returns the task's current error message by value.
/// The member is read while `mtx` is held, so the read is serialized with
/// setErrString() and with every other section that locks `mtx`.
String MPPTask::getErrString() const
{
    std::lock_guard<std::mutex> guard(mtx);
    return err_string;
}

/// Overwrites the task's stored error message with `message`.
/// The assignment is performed while `mtx` is held, the same mutex that
/// getErrString() locks before reading the member.
void MPPTask::setErrString(const String & message)
{
    std::lock_guard<std::mutex> guard(mtx);
    err_string = message;
}

void MPPTask::unregisterTask()
{
auto [result, reason] = manager->unregisterTask(id);
Expand Down Expand Up @@ -349,7 +361,7 @@ void MPPTask::preprocess()
query_executor_holder.set(queryExecute(*context));
LOG_DEBUG(log, "init query executor done");
{
std::unique_lock lock(tunnel_and_receiver_mu);
std::unique_lock lock(mtx);
if (status != RUNNING)
throw Exception("task not in running state, may be cancelled");
for (auto & r : dag_context->getCoprocessorReaders())
Expand Down Expand Up @@ -481,7 +493,7 @@ void MPPTask::runImpl()
}
}
}
mpp_task_statistics.end(status.load(), err_string);
mpp_task_statistics.end(status.load(), getErrString());
mpp_task_statistics.logTracingJson();

LOG_DEBUG(log, "task ends, time cost is {} ms.", stopwatch.elapsedMilliseconds());
Expand Down Expand Up @@ -521,7 +533,7 @@ void MPPTask::abort(const String & message, AbortType abort_type)
}
else if (previous_status == INITIALIZING && switchStatus(INITIALIZING, next_task_status))
{
err_string = message;
setErrString(message);
/// if the task is in initializing state, mpp task can return error to TiDB directly,
/// so just close all tunnels here
abortTunnels("", false);
Expand All @@ -533,7 +545,7 @@ void MPPTask::abort(const String & message, AbortType abort_type)
/// abort the components from top to bottom because if bottom components are aborted
/// first, the top components may see an error caused by the abort, which is not
/// the original error
err_string = message;
setErrString(message);
abortTunnels(message, false);
abortQueryExecutor();
abortReceivers();
Expand Down
10 changes: 8 additions & 2 deletions dbms/src/Flash/Mpp/MPPTask.h
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,10 @@ class MPPTask : public std::enable_shared_from_this<MPPTask>

void initExchangeReceivers();

String getErrString() const;
void setErrString(const String & message);

private:
// To make sure dag_req is not destroyed before the mpp task ends.
tipb::DAGRequest dag_req;
mpp::TaskMeta meta;
Expand All @@ -130,9 +134,11 @@ class MPPTask : public std::enable_shared_from_this<MPPTask>
QueryExecutorHolder query_executor_holder;

std::atomic<TaskStatus> status{INITIALIZING};
String err_string;

std::mutex tunnel_and_receiver_mu;
/// Used to protect concurrent access to `err_string`, `tunnel_set`, and `receiver_set`.
mutable std::mutex mtx;

String err_string;

MPPTunnelSetPtr tunnel_set;

Expand Down