From dee83742f6d3c6321ca8c70bc67fb83575c8a3ec Mon Sep 17 00:00:00 2001 From: Zhuhe Fang Date: Fri, 12 Mar 2021 13:55:06 +0800 Subject: [PATCH] MPP: add CancelMPPTask for FlashService (#1505) * add CancelMPPTask for FlashService * updated Co-authored-by: ti-srebot <66930949+ti-srebot@users.noreply.github.com> --- dbms/src/Common/TiFlashMetrics.h | 12 ++++++++---- dbms/src/Flash/FlashService.cpp | 31 +++++++++++++++++++++++++++++++ dbms/src/Flash/FlashService.h | 2 ++ 3 files changed, 41 insertions(+), 4 deletions(-) diff --git a/dbms/src/Common/TiFlashMetrics.h b/dbms/src/Common/TiFlashMetrics.h index 2917c338c99..445643680f7 100644 --- a/dbms/src/Common/TiFlashMetrics.h +++ b/dbms/src/Common/TiFlashMetrics.h @@ -23,11 +23,13 @@ namespace DB M(tiflash_coprocessor_request_count, "Total number of request", Counter, F(type_batch, {"type", "batch"}), \ F(type_batch_cop, {"type", "batch_cop"}), F(type_cop, {"type", "cop"}), F(type_cop_dag, {"type", "cop_dag"}), \ F(type_super_batch, {"type", "super_batch"}), F(type_super_batch_cop_dag, {"type", "super_batch_cop_dag"}), \ - F(type_dispatch_mpp_task, {"type", "dispatch_mpp_task"}), F(type_mpp_establish_conn, {"type", "mpp_establish_conn"})) \ + F(type_dispatch_mpp_task, {"type", "dispatch_mpp_task"}), F(type_mpp_establish_conn, {"type", "mpp_establish_conn"}), \ + F(type_cancel_mpp_task, {"type", "mpp_establish_conn"})) \ M(tiflash_coprocessor_handling_request_count, "Number of handling request", Gauge, F(type_batch, {"type", "batch"}), \ F(type_batch_cop, {"type", "batch_cop"}), F(type_cop, {"type", "cop"}), F(type_cop_dag, {"type", "cop_dag"}), \ F(type_super_batch, {"type", "super_batch"}), F(type_super_batch_cop_dag, {"type", "super_batch_cop_dag"}), \ - F(type_dispatch_mpp_task, {"type", "dispatch_mpp_task"}), F(type_mpp_establish_conn, {"type", "mpp_establish_conn"})) \ + F(type_dispatch_mpp_task, {"type", "dispatch_mpp_task"}), F(type_mpp_establish_conn, {"type", "mpp_establish_conn"}), \ + F(type_cancel_mpp_task, {"type", "mpp_establish_conn"})) \ M(tiflash_coprocessor_executor_count, "Total number of each executor", Counter, F(type_ts, {"type", "table_scan"}), \ F(type_sel, {"type", "selection"}), F(type_agg, {"type", "aggregation"}), F(type_topn, {"type", "top_n"}), \ F(type_limit, {"type", "limit"}), F(type_join, {"type", "join"}), F(type_exchange_sender, {"type", "exchange_sender"}), \ @@ -36,7 +38,8 @@ namespace DB F(type_batch, {{"type", "batch"}}, ExpBuckets{0.0005, 2, 20}), F(type_cop, {{"type", "cop"}}, ExpBuckets{0.0005, 2, 20}), \ F(type_super_batch, {{"type", "super_batch"}}, ExpBuckets{0.0005, 2, 20}), \ F(type_dispatch_mpp_task, {{"type", "dispatch_mpp_task"}}, ExpBuckets{0.0005, 2, 20}), \ - F(type_mpp_establish_conn, {{"type", "mpp_establish_conn"}}, ExpBuckets{0.0005, 2, 30})) \ + F(type_mpp_establish_conn, {{"type", "mpp_establish_conn"}}, ExpBuckets{0.0005, 2, 30}), \ + F(type_cancel_mpp_task, {{"type", "cancel_mpp_task"}}, ExpBuckets{0.0005, 2, 30})) \ M(tiflash_coprocessor_request_memory_usage, "Bucketed histogram of request memory usage", Histogram, \ F(type_cop, {{"type", "cop"}}, ExpBuckets{1024 * 1024, 2, 16}), \ F(type_super_batch, {{"type", "super_batch"}}, ExpBuckets{1024 * 1024, 2, 16}), \ @@ -49,7 +52,8 @@ namespace DB F(type_batch, {{"type", "batch"}}, ExpBuckets{0.0005, 2, 20}), F(type_cop, {{"type", "cop"}}, ExpBuckets{0.0005, 2, 20}), \ F(type_super_batch, {{"type", "super_batch"}}, ExpBuckets{0.0005, 2, 20}), \ F(type_dispatch_mpp_task, {{"type", "dispatch_mpp_task"}}, ExpBuckets{0.0005, 2, 20}), \ - F(type_mpp_establish_conn, {{"type", "mpp_establish_conn"}}, ExpBuckets{0.0005, 2, 30})) \ + F(type_mpp_establish_conn, {{"type", "mpp_establish_conn"}}, ExpBuckets{0.0005, 2, 30}), \ + F(type_cancel_mpp_task, {{"type", "cancel_mpp_task"}}, ExpBuckets{0.0005, 2, 30})) \ M(tiflash_coprocessor_response_bytes, "Total bytes of response body", Counter) \ M(tiflash_schema_version, "Current version of tiflash cached schema", Gauge) \ M(tiflash_schema_apply_count, "Total number of each kinds of apply", Counter, F(type_diff, {"type", "diff"}), \ diff --git a/dbms/src/Flash/FlashService.cpp b/dbms/src/Flash/FlashService.cpp index 98bfae1a995..9b2d05885b0 100644 --- a/dbms/src/Flash/FlashService.cpp +++ b/dbms/src/Flash/FlashService.cpp @@ -205,6 +205,37 @@ ::grpc::Status FlashService::EstablishMPPConnection(::grpc::ServerContext * grpc return grpc::Status::OK; } +::grpc::Status FlashService::CancelMPPTask(::grpc::ServerContext* grpc_context, const ::mpp::CancelTaskRequest* request, ::mpp::CancelTaskResponse* response) { + // CancelMPPTask cancels the query of the task. + LOG_DEBUG(log, __PRETTY_FUNCTION__ << ": cancel mpp task request: " << request->DebugString()); + + if (!security_config.checkGrpcContext(grpc_context)) + { + return grpc::Status(grpc::PERMISSION_DENIED, tls_err_msg); + } + GET_METRIC(metrics, tiflash_coprocessor_request_count, type_cancel_mpp_task).Increment(); + GET_METRIC(metrics, tiflash_coprocessor_handling_request_count, type_cancel_mpp_task).Increment(); + Stopwatch watch; + SCOPE_EXIT({ + GET_METRIC(metrics, tiflash_coprocessor_handling_request_count, type_cancel_mpp_task).Decrement(); + GET_METRIC(metrics, tiflash_coprocessor_request_duration_seconds, type_cancel_mpp_task).Observe(watch.elapsedSeconds()); + GET_METRIC(metrics, tiflash_coprocessor_response_bytes).Increment(response->ByteSizeLong()); + }); + + auto [context, status] = createDBContext(grpc_context); + auto err = new mpp::Error(); + if (!status.ok()) + { + err->set_msg("error status"); + response->set_allocated_error(err); + return status; + } + auto & tmt_context = context.getTMTContext(); + auto task_manager = tmt_context.getMPPTaskManager(); + task_manager->cancelMPPQuery(request->meta().start_ts()); + return grpc::Status::OK; +} + // This function is deprecated. grpc::Status FlashService::BatchCommands( grpc::ServerContext * grpc_context, grpc::ServerReaderWriter<::tikvpb::BatchCommandsResponse, tikvpb::BatchCommandsRequest> * stream) diff --git a/dbms/src/Flash/FlashService.h b/dbms/src/Flash/FlashService.h index c389cd923d4..9bf7b6f7da5 100644 --- a/dbms/src/Flash/FlashService.h +++ b/dbms/src/Flash/FlashService.h @@ -40,6 +40,8 @@ class FlashService final : public tikvpb::Tikv::Service, public std::enable_shar const ::mpp::EstablishMPPConnectionRequest * request, ::grpc::ServerWriter<::mpp::MPPDataPacket> * writer) override; + ::grpc::Status CancelMPPTask(::grpc::ServerContext* context, const ::mpp::CancelTaskRequest* request, ::mpp::CancelTaskResponse* response) override; + private: std::tuple createDBContext(const grpc::ServerContext * grpc_context) const;