Skip to content

Commit

Permalink
MPP: add CancelMPPTask for FlashService (#1505)
Browse files Browse the repository at this point in the history
* add CancelMPPTask for FlashService

* updated

Co-authored-by: ti-srebot <66930949+ti-srebot@users.noreply.github.com>
  • Loading branch information
fzhedu and ti-srebot authored Mar 12, 2021
1 parent 4538877 commit dee8374
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 4 deletions.
12 changes: 8 additions & 4 deletions dbms/src/Common/TiFlashMetrics.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,13 @@ namespace DB
M(tiflash_coprocessor_request_count, "Total number of request", Counter, F(type_batch, {"type", "batch"}), \
F(type_batch_cop, {"type", "batch_cop"}), F(type_cop, {"type", "cop"}), F(type_cop_dag, {"type", "cop_dag"}), \
F(type_super_batch, {"type", "super_batch"}), F(type_super_batch_cop_dag, {"type", "super_batch_cop_dag"}), \
F(type_dispatch_mpp_task, {"type", "dispatch_mpp_task"}), F(type_mpp_establish_conn, {"type", "mpp_establish_conn"})) \
F(type_dispatch_mpp_task, {"type", "dispatch_mpp_task"}), F(type_mpp_establish_conn, {"type", "mpp_establish_conn"}), \
F(type_cancel_mpp_task, {"type", "mpp_establish_conn"})) \
M(tiflash_coprocessor_handling_request_count, "Number of handling request", Gauge, F(type_batch, {"type", "batch"}), \
F(type_batch_cop, {"type", "batch_cop"}), F(type_cop, {"type", "cop"}), F(type_cop_dag, {"type", "cop_dag"}), \
F(type_super_batch, {"type", "super_batch"}), F(type_super_batch_cop_dag, {"type", "super_batch_cop_dag"}), \
F(type_dispatch_mpp_task, {"type", "dispatch_mpp_task"}), F(type_mpp_establish_conn, {"type", "mpp_establish_conn"})) \
F(type_dispatch_mpp_task, {"type", "dispatch_mpp_task"}), F(type_mpp_establish_conn, {"type", "mpp_establish_conn"}), \
F(type_cancel_mpp_task, {"type", "mpp_establish_conn"})) \
M(tiflash_coprocessor_executor_count, "Total number of each executor", Counter, F(type_ts, {"type", "table_scan"}), \
F(type_sel, {"type", "selection"}), F(type_agg, {"type", "aggregation"}), F(type_topn, {"type", "top_n"}), \
F(type_limit, {"type", "limit"}), F(type_join, {"type", "join"}), F(type_exchange_sender, {"type", "exchange_sender"}), \
Expand All @@ -36,7 +38,8 @@ namespace DB
F(type_batch, {{"type", "batch"}}, ExpBuckets{0.0005, 2, 20}), F(type_cop, {{"type", "cop"}}, ExpBuckets{0.0005, 2, 20}), \
F(type_super_batch, {{"type", "super_batch"}}, ExpBuckets{0.0005, 2, 20}), \
F(type_dispatch_mpp_task, {{"type", "dispatch_mpp_task"}}, ExpBuckets{0.0005, 2, 20}), \
F(type_mpp_establish_conn, {{"type", "mpp_establish_conn"}}, ExpBuckets{0.0005, 2, 30})) \
F(type_mpp_establish_conn, {{"type", "mpp_establish_conn"}}, ExpBuckets{0.0005, 2, 30}), \
F(type_cancel_mpp_task, {{"type", "cancel_mpp_task"}}, ExpBuckets{0.0005, 2, 30})) \
M(tiflash_coprocessor_request_memory_usage, "Bucketed histogram of request memory usage", Histogram, \
F(type_cop, {{"type", "cop"}}, ExpBuckets{1024 * 1024, 2, 16}), \
F(type_super_batch, {{"type", "super_batch"}}, ExpBuckets{1024 * 1024, 2, 16}), \
Expand All @@ -49,7 +52,8 @@ namespace DB
F(type_batch, {{"type", "batch"}}, ExpBuckets{0.0005, 2, 20}), F(type_cop, {{"type", "cop"}}, ExpBuckets{0.0005, 2, 20}), \
F(type_super_batch, {{"type", "super_batch"}}, ExpBuckets{0.0005, 2, 20}), \
F(type_dispatch_mpp_task, {{"type", "dispatch_mpp_task"}}, ExpBuckets{0.0005, 2, 20}), \
F(type_mpp_establish_conn, {{"type", "mpp_establish_conn"}}, ExpBuckets{0.0005, 2, 30})) \
F(type_mpp_establish_conn, {{"type", "mpp_establish_conn"}}, ExpBuckets{0.0005, 2, 30}), \
F(type_cancel_mpp_task, {{"type", "cancel_mpp_task"}}, ExpBuckets{0.0005, 2, 30})) \
M(tiflash_coprocessor_response_bytes, "Total bytes of response body", Counter) \
M(tiflash_schema_version, "Current version of tiflash cached schema", Gauge) \
M(tiflash_schema_apply_count, "Total number of each kinds of apply", Counter, F(type_diff, {"type", "diff"}), \
Expand Down
31 changes: 31 additions & 0 deletions dbms/src/Flash/FlashService.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,37 @@ ::grpc::Status FlashService::EstablishMPPConnection(::grpc::ServerContext * grpc
return grpc::Status::OK;
}

::grpc::Status FlashService::CancelMPPTask(::grpc::ServerContext* grpc_context, const ::mpp::CancelTaskRequest* request, ::mpp::CancelTaskResponse* response) {
// CancelMPPTask cancels the query of the task.
LOG_DEBUG(log, __PRETTY_FUNCTION__ << ": cancel mpp task request: " << request->DebugString());

if (!security_config.checkGrpcContext(grpc_context))
{
return grpc::Status(grpc::PERMISSION_DENIED, tls_err_msg);
}
GET_METRIC(metrics, tiflash_coprocessor_request_count, type_cancel_mpp_task).Increment();
GET_METRIC(metrics, tiflash_coprocessor_handling_request_count, type_cancel_mpp_task).Increment();
Stopwatch watch;
SCOPE_EXIT({
GET_METRIC(metrics, tiflash_coprocessor_handling_request_count, type_cancel_mpp_task).Decrement();
GET_METRIC(metrics, tiflash_coprocessor_request_duration_seconds, type_cancel_mpp_task).Observe(watch.elapsedSeconds());
GET_METRIC(metrics, tiflash_coprocessor_response_bytes).Increment(response->ByteSizeLong());
});

auto [context, status] = createDBContext(grpc_context);
auto err = new mpp::Error();
if (!status.ok())
{
err->set_msg("error status");
response->set_allocated_error(err);
return status;
}
auto & tmt_context = context.getTMTContext();
auto task_manager = tmt_context.getMPPTaskManager();
task_manager->cancelMPPQuery(request->meta().start_ts());
return grpc::Status::OK;
}

// This function is deprecated.
grpc::Status FlashService::BatchCommands(
grpc::ServerContext * grpc_context, grpc::ServerReaderWriter<::tikvpb::BatchCommandsResponse, tikvpb::BatchCommandsRequest> * stream)
Expand Down
2 changes: 2 additions & 0 deletions dbms/src/Flash/FlashService.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ class FlashService final : public tikvpb::Tikv::Service, public std::enable_shar
const ::mpp::EstablishMPPConnectionRequest * request,
::grpc::ServerWriter<::mpp::MPPDataPacket> * writer) override;

::grpc::Status CancelMPPTask(::grpc::ServerContext* context, const ::mpp::CancelTaskRequest* request, ::mpp::CancelTaskResponse* response) override;

private:
std::tuple<Context, ::grpc::Status> createDBContext(const grpc::ServerContext * grpc_context) const;

Expand Down

0 comments on commit dee8374

Please sign in to comment.