refine tiflash shutdown logic #4291

Merged · 9 commits · Mar 16, 2022
3 changes: 3 additions & 0 deletions dbms/src/Common/TiFlashMetrics.h
@@ -169,6 +169,9 @@ namespace DB
F(type_decode, {{"type", "decode"}}, ExpBuckets{0.0005, 2, 20}), F(type_write, {{"type", "write"}}, ExpBuckets{0.0005, 2, 20})) \
M(tiflash_server_info, "Indicate the tiflash server info, and the value is the start timestamp (s).", Gauge, \
F(start_time, {"version", TiFlashBuildInfo::getReleaseVersion()}, {"hash", TiFlashBuildInfo::getGitHash()})) \
M(tiflash_object_count, "Number of objects", Gauge, \
F(type_count_of_establish_calldata, {"type", "count_of_establish_calldata"}), \
F(type_count_of_mpptunnel, {"type", "count_of_mpptunnel"})) \
M(tiflash_thread_count, "Number of threads", Gauge, \
F(type_max_threads_of_thdpool, {"type", "thread_pool_total_max"}), \
F(type_active_threads_of_thdpool, {"type", "thread_pool_active"}), \
25 changes: 22 additions & 3 deletions dbms/src/Flash/EstablishCall.cpp
@@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#include <Common/TiFlashMetrics.h>
#include <Flash/EstablishCall.h>
#include <Flash/FlashService.h>
#include <Flash/Mpp/Utils.h>
@@ -26,12 +27,18 @@ EstablishCallData::EstablishCallData(AsyncFlashService * service, grpc::ServerCo
, responder(&ctx)
, state(NEW_REQUEST)
{
GET_METRIC(tiflash_object_count, type_count_of_establish_calldata).Increment();
// As part of the initial CREATE state, we *request* that the system
// start processing requests. In this request, "this" acts as
// the tag uniquely identifying the request.
service->RequestEstablishMPPConnection(&ctx, &request, &responder, cq, notify_cq, this);
}

EstablishCallData::~EstablishCallData()
{
GET_METRIC(tiflash_object_count, type_count_of_establish_calldata).Decrement();
}

EstablishCallData * EstablishCallData::spawn(AsyncFlashService * service, grpc::ServerCompletionQueue * cq, grpc::ServerCompletionQueue * notify_cq, const std::shared_ptr<std::atomic<bool>> & is_shutdown)
{
return new EstablishCallData(service, cq, notify_cq, is_shutdown);
@@ -53,7 +60,9 @@ void EstablishCallData::tryFlushOne()

void EstablishCallData::responderFinish(const grpc::Status & status)
{
if (!(*is_shutdown))
if (*is_shutdown)
finishTunnelAndResponder();
else
responder.Finish(status, this);
}

@@ -79,7 +88,10 @@ void EstablishCallData::initRpc()
bool EstablishCallData::write(const mpp::MPPDataPacket & packet)
{
if (*is_shutdown)
return false;
{
finishTunnelAndResponder();
return true;
}
responder.Write(packet, this);
return true;
}
@@ -116,11 +128,18 @@ void EstablishCallData::cancel()
delete this;
return;
}
finishTunnelAndResponder();
}

void EstablishCallData::finishTunnelAndResponder()
{
state = FINISH;
if (mpp_tunnel)
{
mpp_tunnel->consumerFinish("grpc writes failed.", true); // trigger mpp tunnel finish work
}
grpc::Status status(static_cast<grpc::StatusCode>(GRPC_STATUS_UNKNOWN), "Consumer exits unexpected, grpc writes failed.");
responderFinish(status);
responder.Finish(status, this);
Contributor: is it safe to call responder.Finish multiple times?

Contributor Author: I think Finish will not be called multiple times.

}

void EstablishCallData::proceed()
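The review exchange above asks whether `responder.Finish` could end up being called twice. A common defensive pattern, if double invocation were ever possible, is to guard the call with an atomic flag so only the first caller performs the real work. This is a minimal sketch; the `FinishGuard` type and `finish_calls` counter are illustrative stand-ins, not TiFlash code:

```cpp
#include <atomic>

// Illustrative guard: std::atomic::exchange returns the previous value,
// so only the first caller observes `false` and runs the finish logic.
struct FinishGuard
{
    std::atomic<bool> finished{false};
    int finish_calls = 0; // stands in for responder.Finish(status, this)

    void finish()
    {
        if (finished.exchange(true))
            return; // already finished; repeated calls become no-ops
        ++finish_calls;
    }
};
```

With this shape, a second `finish()` from a racing code path is harmless rather than undefined behavior.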
4 changes: 4 additions & 0 deletions dbms/src/Flash/EstablishCall.h
@@ -51,6 +51,8 @@ class EstablishCallData : public PacketWriter
grpc::ServerCompletionQueue * notify_cq,
const std::shared_ptr<std::atomic<bool>> & is_shutdown);

~EstablishCallData();

bool write(const mpp::MPPDataPacket & packet) override;

void tryFlushOne() override;
@@ -79,6 +81,8 @@

void initRpc();

void finishTunnelAndResponder();

void responderFinish(const grpc::Status & status);

std::mutex mu;
7 changes: 7 additions & 0 deletions dbms/src/Flash/Mpp/MPPTunnel.cpp
@@ -49,11 +49,15 @@ MPPTunnelBase<Writer>::MPPTunnelBase(
, log(getMPPTaskLog(log_, tunnel_id))
{
assert(!(is_local && is_async));
GET_METRIC(tiflash_object_count, type_count_of_mpptunnel).Increment();
}

template <typename Writer>
MPPTunnelBase<Writer>::~MPPTunnelBase()
{
SCOPE_EXIT({
GET_METRIC(tiflash_object_count, type_count_of_mpptunnel).Decrement();
});
try
{
{
@@ -308,6 +312,9 @@ void MPPTunnelBase<Writer>::consumerFinish(const String & err_msg, bool need_loc
send_queue.finish();

auto rest_work = [this, &err_msg] {
// it's safe to call it multiple times
if (finished && consumer_state.errHasSet())
Contributor: in which case is finished true while consumer_state.errHasSet() is false?

Contributor Author (@bestwoody, Mar 16, 2022): When shutdown is set to true and consumerFinish is called in the send job, a subsequent writeDone will call consumerFinish again, so making consumerFinish idempotent is safer.

return;
finished = true;
// must call setError in the critical area to keep consistent with `finished` from outside.
consumer_state.setError(err_msg);
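The idempotence argument above (consumerFinish may run once from the send job and again from writeDone) boils down to a check-then-set under one lock. A simplified sketch, with class and member names that stand in for the real MPPTunnel state:

```cpp
#include <mutex>
#include <string>

// Simplified stand-in for MPPTunnel's finish state: the first caller
// records the error and flips `finished`; later callers return early.
class TunnelFinishState
{
public:
    void consumerFinish(const std::string & err_msg)
    {
        std::lock_guard<std::mutex> lock(mu);
        if (finished && err_has_set)
            return; // already finished with an error; safe to call again
        finished = true;
        err = err_msg;      // set inside the critical section
        err_has_set = true; // to stay consistent with `finished`
        ++finish_count;
    }

    int finishCount() const { return finish_count; }

private:
    std::mutex mu;
    bool finished = false;
    bool err_has_set = false;
    std::string err;
    int finish_count = 0;
};
```

Because both flags are read and written under the same mutex, the second caller always observes the first caller's completed state and returns without doing the finish work twice.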
7 changes: 7 additions & 0 deletions dbms/src/Flash/Mpp/MPPTunnel.h
@@ -173,11 +173,18 @@ class MPPTunnelBase : private boost::noncopyable
void setError(const String & err_msg)
{
promise.set_value(err_msg);
err_has_set = true;
}

bool errHasSet() const
{
return err_has_set.load();
}

private:
std::promise<String> promise;
std::shared_future<String> future;
std::atomic<bool> err_has_set{false};
Contributor: I think err_has_set is always accessed under the lock's protection; why does it still need to be an atomic variable?

Contributor Author (@bestwoody, Mar 16, 2022): For future usage, so that we won't make the mistake of forgetting to check the flag externally with a lock. Since it's an independent subclass, it needs to protect itself.

};
ConsumerState consumer_state;

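There is a concrete reason to track whether the error was already set: `std::promise::set_value` may only be called once, and a second call throws `std::future_error` (`promise_already_satisfied`). A minimal demonstration of the hazard the `err_has_set` flag helps avoid (the function here is illustrative, not TiFlash code):

```cpp
#include <future>
#include <string>

// Demonstrates why setError must not run twice on the same promise:
// the second set_value throws std::future_error.
bool secondSetThrows()
{
    std::promise<std::string> promise;
    promise.set_value("first error");
    try
    {
        promise.set_value("second error"); // throws promise_already_satisfied
    }
    catch (const std::future_error &)
    {
        return true;
    }
    return false;
}
```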
15 changes: 9 additions & 6 deletions dbms/src/Server/Server.cpp
@@ -642,14 +642,17 @@ class Server::FlashGrpcServerHolder

~FlashGrpcServerHolder()
{
*is_shutdown = true;
const int wait_calldata_after_shutdown_interval_ms = 500;
std::this_thread::sleep_for(std::chrono::milliseconds(wait_calldata_after_shutdown_interval_ms)); // sleep 500ms to let operations of calldata called by MPPTunnel done.
/// Shut down grpc server.
// wait 5 seconds for pending rpcs to gracefully stop
gpr_timespec deadline{5, 0, GPR_TIMESPAN};
LOG_FMT_INFO(log, "Begin to shut down flash grpc server");
flash_grpc_server->Shutdown(deadline);
flash_grpc_server->Shutdown();
*is_shutdown = true;
Contributor: Compared to the previous version, why is is_shutdown set after flash_grpc_server->Shutdown() now?

Contributor Author (@bestwoody, Mar 16, 2022): If flash_grpc_server->Shutdown() were called later, cancelled queries would cause clients (such as TiDB or TiFlash) to send a lot of retried queries, and those retried queries would be accepted as long as flash_grpc_server->Shutdown() had not yet been called.

// Wait all existed MPPTunnels done to prevent crash.
// If all existed MPPTunnels are done, almost in all cases it means all existed MPPTasks and ExchangeReceivers are also done.
const int max_wait_cnt = 300;
int wait_cnt = 0;
while (GET_METRIC(tiflash_object_count, type_count_of_mpptunnel).Value() >= 1 && (wait_cnt++ < max_wait_cnt))
std::this_thread::sleep_for(std::chrono::seconds(1));

for (auto & cq : cqs)
cq->Shutdown();
for (auto & cq : notify_cqs)
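The bounded wait on the tunnel-count gauge above can be factored into a small polling helper. The function name, and the use of a plain atomic counter in place of the Prometheus gauge, are assumptions for illustration:

```cpp
#include <atomic>
#include <chrono>
#include <thread>

// Illustrative helper: poll a live-object counter until it drops to zero
// or the attempt budget runs out, mirroring the shutdown wait above.
bool waitUntilZero(const std::atomic<int> & live_count,
                   int max_wait_cnt,
                   std::chrono::milliseconds interval)
{
    int wait_cnt = 0;
    while (live_count.load() >= 1 && wait_cnt++ < max_wait_cnt)
        std::this_thread::sleep_for(interval);
    return live_count.load() == 0; // false means we gave up after the budget
}
```

As in the destructor above, the loop deliberately gives up after a fixed budget (300 one-second waits in the PR) rather than blocking shutdown forever on a stuck tunnel.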