Skip to content

Commit

Permalink
This is an automated cherry-pick of pingcap#5008
Browse files Browse the repository at this point in the history
Signed-off-by: ti-chi-bot <ti-community-prow-bot@tidb.io>
  • Loading branch information
yibin87 authored and ti-chi-bot committed May 26, 2022
1 parent 25545c4 commit 1e8677c
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 2 deletions.
17 changes: 16 additions & 1 deletion dbms/src/Flash/Mpp/ExchangeReceiver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -92,10 +92,14 @@ class AsyncRequestHandler : public UnaryCallback<bool>
switch (stage)
{
case AsyncRequestStage::WAIT_MAKE_READER:
{
// Use lock to ensure reader is created already in reactor thread
std::unique_lock lock(mu);
if (!ok)
reader.reset();
notifyReactor();
break;
}
case AsyncRequestStage::WAIT_BATCH_READ:
if (ok)
++read_packet_index;
Expand Down Expand Up @@ -227,6 +231,8 @@ class AsyncRequestHandler : public UnaryCallback<bool>
void start()
{
stage = AsyncRequestStage::WAIT_MAKE_READER;
// Use lock to ensure async reader is unreachable from grpc thread before this function returns
std::unique_lock lock(mu);
rpc_context->makeAsyncReader(*request, reader, thisAsUnaryCallback());
}

Expand Down Expand Up @@ -282,7 +288,12 @@ class AsyncRequestHandler : public UnaryCallback<bool>
MPPDataPacketPtrs packets;
size_t read_packet_index = 0;
Status finish_status = RPCContext::getStatusOK();
<<<<<<< HEAD
LogWithPrefixPtr log;
=======
LoggerPtr log;
std::mutex mu;
>>>>>>> f10b6d27c6 (Add mutex to protect exchange receiver's async client (#5008))
};
} // namespace
Expand Down Expand Up @@ -369,10 +380,14 @@ void ExchangeReceiverBase<RPCContext>::reactor(const std::vector<Request> & asyn
MPMCQueue<AsyncHandler *> ready_requests(alive_async_connections * 2);
std::vector<AsyncHandler *> waiting_for_retry_requests;
std::vector<AsyncRequestHandler<RPCContext>> handlers;
std::vector<std::unique_ptr<AsyncHandler>> handlers;
handlers.reserve(alive_async_connections);
for (const auto & req : async_requests)
<<<<<<< HEAD
handlers.emplace_back(&ready_requests, &msg_channel, rpc_context, req, exc_log);
=======
handlers.emplace_back(std::make_unique<AsyncHandler>(&ready_requests, &msg_channel, rpc_context, req, exc_log->identifier()));
>>>>>>> f10b6d27c6 (Add mutex to protect exchange receiver's async client (#5008))

while (alive_async_connections > 0)
{
Expand Down
9 changes: 8 additions & 1 deletion dbms/src/Functions/FunctionsDateTime.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,14 @@ void registerFunctionsDateTime(FunctionFactory & factory)
factory.registerFunction<FunctionTiDBDateDiff>();
factory.registerFunction<FunctionToTiDBDayOfWeek>();
factory.registerFunction<FunctionToTiDBDayOfYear>();

<<<<<<< HEAD

=======
factory.registerFunction<FunctionToTiDBWeekOfYear>();
factory.registerFunction<FunctionToTiDBToSeconds>();
factory.registerFunction<FunctionToTiDBToDays>();
factory.registerFunction<FunctionTiDBFromDays>();
>>>>>>> f10b6d27c6 (Add mutex to protect exchange receiver's async client (#5008))
factory.registerFunction<FunctionToTimeZone>();
factory.registerFunction<FunctionToLastDay>();
}
Expand Down
6 changes: 6 additions & 0 deletions dbms/src/Functions/FunctionsDateTime.h
Original file line number Diff line number Diff line change
Expand Up @@ -3336,7 +3336,13 @@ using FunctionToTime = FunctionDateOrDateTimeToSomething<DataTypeDateTime, ToTim
using FunctionToLastDay = FunctionMyDateOrMyDateTimeToSomething<DataTypeMyDate, TiDBLastDayTransformerImpl, return_nullable>;
using FunctionToTiDBDayOfWeek = FunctionMyDateOrMyDateTimeToSomething<DataTypeUInt16, TiDBDayOfWeekTransformerImpl, return_nullable>;
using FunctionToTiDBDayOfYear = FunctionMyDateOrMyDateTimeToSomething<DataTypeUInt16, TiDBDayOfYearTransformerImpl, return_nullable>;
<<<<<<< HEAD

=======
using FunctionToTiDBWeekOfYear = FunctionMyDateOrMyDateTimeToSomething<DataTypeUInt16, TiDBWeekOfYearTransformerImpl, return_nullable>;
using FunctionToTiDBToSeconds = FunctionMyDateOrMyDateTimeToSomething<DataTypeUInt64, TiDBToSecondsTransformerImpl, return_nullable>;
using FunctionToTiDBToDays = FunctionMyDateOrMyDateTimeToSomething<DataTypeUInt32, TiDBToDaysTransformerImpl, return_nullable>;
>>>>>>> f10b6d27c6 (Add mutex to protect exchange receiver's async client (#5008))
using FunctionToRelativeYearNum = FunctionDateOrDateTimeToSomething<DataTypeUInt16, ToRelativeYearNumImpl>;
using FunctionToRelativeQuarterNum = FunctionDateOrDateTimeToSomething<DataTypeUInt32, ToRelativeQuarterNumImpl>;
using FunctionToRelativeMonthNum = FunctionDateOrDateTimeToSomething<DataTypeUInt32, ToRelativeMonthNumImpl>;
Expand Down

0 comments on commit 1e8677c

Please sign in to comment.