Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Tsan: fix data race on ComputeServerRunner.cancelJoinTasks (#7434) #8728

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
141 changes: 141 additions & 0 deletions dbms/src/Flash/Mpp/MPPTask.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -69,10 +69,16 @@ MPPTask::~MPPTask()
closeAllTunnels("");
if (schedule_state == ScheduleState::SCHEDULED)
{
<<<<<<< HEAD
/// the threads of this task are not fully freed now, since the BlockIO and DAGContext are not destructed
/// TODO: finish all threads before here, except the current one.
manager->releaseThreadsFromScheduler(needed_threads);
schedule_state = ScheduleState::COMPLETED;
=======
std::unique_lock lock(mtx);
if (unlikely(tunnel_set == nullptr))
return;
>>>>>>> 12bda10fd1 (Tsan: fix data race on `ComputeServerRunner.cancelJoinTasks` (#7434))
}
LOG_FMT_DEBUG(log, "finish MPPTask: {}", id.toString());
}
Expand All @@ -81,7 +87,23 @@ void MPPTask::closeAllTunnels(const String & reason)
{
for (auto & it : tunnel_map)
{
<<<<<<< HEAD
it.second->close(reason);
=======
std::unique_lock lock(mtx);
if unlikely (receiver_set == nullptr)
return;
}
receiver_set->cancel();
}

void MPPTask::abortQueryExecutor()
{
if (auto query_executor = query_executor_holder.tryGet(); query_executor)
{
assert(query_executor.value());
(*query_executor)->cancel();
>>>>>>> 12bda10fd1 (Tsan: fix data race on `ComputeServerRunner.cancelJoinTasks` (#7434))
}
}

Expand All @@ -106,7 +128,71 @@ void MPPTask::registerTunnel(const MPPTaskId & id, MPPTunnelPtr tunnel)
if (tunnel_map.find(id) != tunnel_map.end())
throw Exception("the tunnel " + tunnel->id() + " has been registered");

<<<<<<< HEAD
tunnel_map[id] = tunnel;
=======
bool is_local = context->getSettingsRef().enable_local_tunnel && meta.address() == task_meta.address();
bool is_async = !is_local && context->getSettingsRef().enable_async_server;
MPPTunnelPtr tunnel = std::make_shared<MPPTunnel>(task_meta, task_request.meta(), timeout, context->getSettingsRef().max_threads, is_local, is_async, log->identifier());

LOG_DEBUG(log, "begin to register the tunnel {}, is_local: {}, is_async: {}", tunnel->id(), is_local, is_async);

if (status != INITIALIZING)
throw Exception(fmt::format("The tunnel {} can not be registered, because the task is not in initializing state", tunnel->id()));

tunnel_set_local->registerTunnel(MPPTaskId(task_meta), tunnel);
injectFailPointDuringRegisterTunnel(dag_context->isRootMPPTask());
}
{
std::unique_lock lock(mtx);
if (status != INITIALIZING)
throw Exception(fmt::format("The tunnels can not be registered, because the task is not in initializing state"));
tunnel_set = std::move(tunnel_set_local);
}
dag_context->tunnel_set = tunnel_set;
}

void MPPTask::initExchangeReceivers()
{
auto receiver_set_local = std::make_shared<MPPReceiverSet>(log->identifier());
dag_context->dag_request.traverse([&](const tipb::Executor & executor) {
if (executor.tp() == tipb::ExecType::TypeExchangeReceiver)
{
assert(executor.has_executor_id());
const auto & executor_id = executor.executor_id();
// In order to distinguish different exchange receivers.
auto exchange_receiver = std::make_shared<ExchangeReceiver>(
std::make_shared<GRPCReceiverContext>(
executor.exchange_receiver(),
dag_context->getMPPTaskMeta(),
context->getTMTContext().getKVCluster(),
context->getTMTContext().getMPPTaskManager(),
context->getSettingsRef().enable_local_tunnel,
context->getSettingsRef().enable_async_grpc_client),
executor.exchange_receiver().encoded_task_meta_size(),
context->getMaxStreams(),
log->identifier(),
executor_id,
executor.fine_grained_shuffle_stream_count(),
context->getSettings().local_tunnel_version,
context->getSettings().async_recv_version,
context->getSettings().recv_queue_size);

if (status != RUNNING)
throw Exception("exchange receiver map can not be initialized, because the task is not in running state");

receiver_set_local->addExchangeReceiver(executor_id, exchange_receiver);
}
return true;
});
{
std::unique_lock lock(mtx);
if (status != RUNNING)
throw Exception("exchange receiver map can not be initialized, because the task is not in running state");
receiver_set = std::move(receiver_set_local);
}
dag_context->setMPPReceiverSet(receiver_set);
>>>>>>> 12bda10fd1 (Tsan: fix data race on `ComputeServerRunner.cancelJoinTasks` (#7434))
}

std::pair<MPPTunnelPtr, String> MPPTask::getTunnel(const ::mpp::EstablishMPPConnectionRequest * request)
Expand All @@ -116,7 +202,12 @@ std::pair<MPPTunnelPtr, String> MPPTask::getTunnel(const ::mpp::EstablishMPPConn
auto err_msg = fmt::format(
"can't find tunnel ({} + {}) because the task is cancelled",
request->sender_meta().task_id(),
<<<<<<< HEAD
request->receiver_meta().task_id());
=======
request->receiver_meta().task_id(),
getErrString());
>>>>>>> 12bda10fd1 (Tsan: fix data race on `ComputeServerRunner.cancelJoinTasks` (#7434))
return {nullptr, err_msg};
}

Expand All @@ -133,6 +224,18 @@ std::pair<MPPTunnelPtr, String> MPPTask::getTunnel(const ::mpp::EstablishMPPConn
return {it->second, ""};
}

/// Returns a copy of the task's current error message.
///
/// `mtx` is held while the copy is made, so the read is serialized with
/// `setErrString()`. A copy (rather than a reference) is returned so the
/// caller never touches `err_string` after the lock has been released.
String MPPTask::getErrString() const
{
    std::lock_guard<std::mutex> guard{mtx};
    return err_string;
}

void MPPTask::setErrString(const String & message)
{
std::lock_guard lock(mtx);
err_string = message;
}

void MPPTask::unregisterTask()
{
if (manager != nullptr)
Expand Down Expand Up @@ -260,8 +363,20 @@ void MPPTask::preprocess()
DAGQuerySource dag(*context);
auto block_io = executeQuery(dag, *context, false, QueryProcessingStage::Complete);
{
<<<<<<< HEAD
std::lock_guard lock(stream_mu);
data_stream = block_io.in;
=======
std::unique_lock lock(mtx);
if (status != RUNNING)
throw Exception("task not in running state, may be cancelled");
for (auto & r : dag_context->getCoprocessorReaders())
receiver_set->addCoprocessorReader(r);
const auto & receiver_opt = dag_context->getDisaggregatedComputeExchangeReceiver();
if (receiver_opt.has_value())
receiver_set->addExchangeReceiver(receiver_opt->first, receiver_opt->second);
new_thread_count_of_mpp_receiver += receiver_set->getExternalThreadCnt();
>>>>>>> 12bda10fd1 (Tsan: fix data race on `ComputeServerRunner.cancelJoinTasks` (#7434))
}
auto end_time = Clock::now();
dag_context->compile_time_ns = std::chrono::duration_cast<std::chrono::nanoseconds>(end_time - start_time).count();
Expand Down Expand Up @@ -402,6 +517,14 @@ void MPPTask::writeErrToAllTunnels(const String & e)
tryLogCurrentException(log, "Failed to write error " + e + " to tunnel: " + it.second->id());
}
}
<<<<<<< HEAD
=======
mpp_task_statistics.end(status.load(), getErrString());
mpp_task_statistics.logTracingJson();

LOG_DEBUG(log, "task ends, time cost is {} ms.", stopwatch.elapsedMilliseconds());
unregisterTask();
>>>>>>> 12bda10fd1 (Tsan: fix data race on `ComputeServerRunner.cancelJoinTasks` (#7434))
}

void MPPTask::cancel(const String & reason)
Expand All @@ -418,13 +541,31 @@ void MPPTask::cancel(const String & reason)
}
else if (previous_status == INITIALIZING && switchStatus(INITIALIZING, CANCELLED))
{
<<<<<<< HEAD
closeAllTunnels(reason);
unregisterTask();
LOG_WARNING(log, "Finish cancel task from uninitialized");
=======
setErrString(message);
/// if the task is in initializing state, mpp task can return error to TiDB directly,
/// so just close all tunnels here
abortTunnels("", false);
LOG_WARNING(log, "Finish abort task from uninitialized");
>>>>>>> 12bda10fd1 (Tsan: fix data race on `ComputeServerRunner.cancelJoinTasks` (#7434))
return;
}
else if (previous_status == RUNNING && switchStatus(RUNNING, CANCELLED))
{
<<<<<<< HEAD
=======
/// abort the components from top to bottom because if bottom components are aborted
/// first, the top components may see an error caused by the abort, which is not
/// the original error
setErrString(message);
abortTunnels(message, false);
abortQueryExecutor();
abortReceivers();
>>>>>>> 12bda10fd1 (Tsan: fix data race on `ComputeServerRunner.cancelJoinTasks` (#7434))
scheduleThisTask(ScheduleState::FAILED);
sendCancelToQuery(true);
closeAllTunnels(reason);
Expand Down
15 changes: 15 additions & 0 deletions dbms/src/Flash/Mpp/MPPTask.h
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,14 @@ class MPPTask : public std::enable_shared_from_this<MPPTask>

BlockInputStreamPtr getDataStream();

<<<<<<< HEAD
=======
String getErrString() const;
void setErrString(const String & message);

private:
// To make sure dag_req is not destroyed before the mpp task ends.
>>>>>>> 12bda10fd1 (Tsan: fix data race on `ComputeServerRunner.cancelJoinTasks` (#7434))
tipb::DAGRequest dag_req;

ContextPtr context;
Expand All @@ -121,9 +129,16 @@ class MPPTask : public std::enable_shared_from_this<MPPTask>

std::atomic<TaskStatus> status{INITIALIZING};

<<<<<<< HEAD
mpp::TaskMeta meta;

MPPTaskId id;
=======
/// Used to protect concurrent access to `err_string`, `tunnel_set`, and `receiver_set`.
mutable std::mutex mtx;

String err_string;
>>>>>>> 12bda10fd1 (Tsan: fix data race on `ComputeServerRunner.cancelJoinTasks` (#7434))

MPPTunnelSetPtr tunnel_set;

Expand Down