From 1e8677c32d41e5da1e66c56228848ab21c633caf Mon Sep 17 00:00:00 2001 From: yibin Date: Thu, 26 May 2022 22:26:02 +0800 Subject: [PATCH] This is an automated cherry-pick of #5008 Signed-off-by: ti-chi-bot --- dbms/src/Flash/Mpp/ExchangeReceiver.cpp | 17 ++++++++++++++++- dbms/src/Functions/FunctionsDateTime.cpp | 9 ++++++++- dbms/src/Functions/FunctionsDateTime.h | 6 ++++++ 3 files changed, 30 insertions(+), 2 deletions(-) diff --git a/dbms/src/Flash/Mpp/ExchangeReceiver.cpp b/dbms/src/Flash/Mpp/ExchangeReceiver.cpp index 98e784affa0..ff0893738a1 100644 --- a/dbms/src/Flash/Mpp/ExchangeReceiver.cpp +++ b/dbms/src/Flash/Mpp/ExchangeReceiver.cpp @@ -92,10 +92,14 @@ class AsyncRequestHandler : public UnaryCallback switch (stage) { case AsyncRequestStage::WAIT_MAKE_READER: + { + // Use lock to ensure reader is created already in reactor thread + std::unique_lock lock(mu); if (!ok) reader.reset(); notifyReactor(); break; + } case AsyncRequestStage::WAIT_BATCH_READ: if (ok) ++read_packet_index; @@ -227,6 +231,8 @@ class AsyncRequestHandler : public UnaryCallback void start() { stage = AsyncRequestStage::WAIT_MAKE_READER; + // Use lock to ensure async reader is unreachable from grpc thread before this function returns + std::unique_lock lock(mu); rpc_context->makeAsyncReader(*request, reader, thisAsUnaryCallback()); } @@ -282,7 +288,12 @@ class AsyncRequestHandler : public UnaryCallback MPPDataPacketPtrs packets; size_t read_packet_index = 0; Status finish_status = RPCContext::getStatusOK(); +<<<<<<< HEAD LogWithPrefixPtr log; +======= + LoggerPtr log; + std::mutex mu; +>>>>>>> f10b6d27c6 (Add mutex to protect exchange receiver's async client (#5008)) }; } // namespace @@ -369,10 +380,14 @@ void ExchangeReceiverBase::reactor(const std::vector & asyn MPMCQueue ready_requests(alive_async_connections * 2); std::vector waiting_for_retry_requests; - std::vector> handlers; + std::vector> handlers; handlers.reserve(alive_async_connections); for (const auto & req : async_requests) +<<<<<<< HEAD handlers.emplace_back(&ready_requests, &msg_channel, rpc_context, req, exc_log); +======= + handlers.emplace_back(std::make_unique(&ready_requests, &msg_channel, rpc_context, req, exc_log->identifier())); +>>>>>>> f10b6d27c6 (Add mutex to protect exchange receiver's async client (#5008)) while (alive_async_connections > 0) { diff --git a/dbms/src/Functions/FunctionsDateTime.cpp b/dbms/src/Functions/FunctionsDateTime.cpp index ebfaf176945..16375092f17 100644 --- a/dbms/src/Functions/FunctionsDateTime.cpp +++ b/dbms/src/Functions/FunctionsDateTime.cpp @@ -136,7 +136,14 @@ void registerFunctionsDateTime(FunctionFactory & factory) factory.registerFunction(); factory.registerFunction(); factory.registerFunction(); - +<<<<<<< HEAD + +======= + factory.registerFunction(); + factory.registerFunction(); + factory.registerFunction(); + factory.registerFunction(); +>>>>>>> f10b6d27c6 (Add mutex to protect exchange receiver's async client (#5008)) factory.registerFunction(); factory.registerFunction(); } diff --git a/dbms/src/Functions/FunctionsDateTime.h b/dbms/src/Functions/FunctionsDateTime.h index 77e0ff6dca1..f522558e7d7 100644 --- a/dbms/src/Functions/FunctionsDateTime.h +++ b/dbms/src/Functions/FunctionsDateTime.h @@ -3336,7 +3336,13 @@ using FunctionToTime = FunctionDateOrDateTimeToSomething; using FunctionToTiDBDayOfWeek = FunctionMyDateOrMyDateTimeToSomething; using FunctionToTiDBDayOfYear = FunctionMyDateOrMyDateTimeToSomething; +<<<<<<< HEAD +======= +using FunctionToTiDBWeekOfYear = FunctionMyDateOrMyDateTimeToSomething; +using FunctionToTiDBToSeconds = FunctionMyDateOrMyDateTimeToSomething; +using FunctionToTiDBToDays = FunctionMyDateOrMyDateTimeToSomething; +>>>>>>> f10b6d27c6 (Add mutex to protect exchange receiver's async client (#5008)) using FunctionToRelativeYearNum = FunctionDateOrDateTimeToSomething; using FunctionToRelativeQuarterNum = FunctionDateOrDateTimeToSomething; using FunctionToRelativeMonthNum = FunctionDateOrDateTimeToSomething;