From 12778fa80ca739bfd248891452ae80b8c5cf52b5 Mon Sep 17 00:00:00 2001 From: Chunzhu Li Date: Tue, 27 Jun 2023 18:28:06 +0800 Subject: [PATCH] This is an automated cherry-pick of #44764 Signed-off-by: ti-chi-bot --- executor/BUILD.bazel | 487 +++++++++++++++++++++++++++++++++++++++++++ executor/change.go | 7 +- executor/show.go | 32 ++- 3 files changed, 517 insertions(+), 9 deletions(-) create mode 100644 executor/BUILD.bazel diff --git a/executor/BUILD.bazel b/executor/BUILD.bazel new file mode 100644 index 0000000000000..83391f81e975e --- /dev/null +++ b/executor/BUILD.bazel @@ -0,0 +1,487 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") + +go_library( + name = "executor", + srcs = [ + "adapter.go", + "admin.go", + "admin_plugins.go", + "admin_telemetry.go", + "aggregate.go", + "analyze.go", + "analyze_col.go", + "analyze_col_v2.go", + "analyze_fast.go", + "analyze_global_stats.go", + "analyze_idx.go", + "analyze_incremental.go", + "analyze_utils.go", + "analyze_worker.go", + "apply_cache.go", + "batch_checker.go", + "batch_point_get.go", + "bind.go", + "brie.go", + "builder.go", + "calibrate_resource.go", + "change.go", + "checksum.go", + "compact_table.go", + "compiler.go", + "concurrent_map.go", + "coprocessor.go", + "cte.go", + "cte_table_reader.go", + "ddl.go", + "delete.go", + "distsql.go", + "executor.go", + "explain.go", + "foreign_key.go", + "grant.go", + "hash_table.go", + "import_into.go", + "index_advise.go", + "index_lookup_hash_join.go", + "index_lookup_join.go", + "index_lookup_merge_join.go", + "index_merge_reader.go", + "infoschema_reader.go", + "insert.go", + "insert_common.go", + "inspection_common.go", + "inspection_profile.go", + "inspection_result.go", + "inspection_summary.go", + "join.go", + "joiner.go", + "load_data.go", + "load_stats.go", + "lock_stats.go", + "mem_reader.go", + "memtable_reader.go", + "merge_join.go", + "metrics_reader.go", + "mpp_gather.go", + "opt_rule_blacklist.go", + "parallel_apply.go", + "pipelined_window.go", + "plan_replayer.go", + "point_get.go", + "prepared.go", + "projection.go", + "reload_expr_pushdown_blacklist.go", + "replace.go", + "revoke.go", + "sample.go", + "select_into.go", + "set.go", + "set_config.go", + "show.go", + "show_placement.go", + "show_stats.go", + "shuffle.go", + "simple.go", + "slow_query.go", + "sort.go", + "split.go", + "stmtsummary.go", + "table_reader.go", + "trace.go", + "union_scan.go", + "update.go", + "utils.go", + "window.go", + "write.go", + ], + importpath = "github.com/pingcap/tidb/executor", + visibility = ["//visibility:public"], + deps = [ + "//bindinfo", + "//br/pkg/glue", + "//br/pkg/lightning/mydump", + "//br/pkg/storage", + "//br/pkg/task", + "//br/pkg/task/show", + "//br/pkg/utils", + "//config", + "//ddl", + "//ddl/label", + "//ddl/placement", + "//ddl/schematracker", + "//distsql", + "//disttask/framework/proto", + "//disttask/framework/storage", + "//disttask/importinto", + "//domain", + "//domain/infosync", + "//domain/resourcegroup", + "//errno", + "//executor/aggfuncs", + "//executor/asyncloaddata", + "//executor/importer", + "//executor/internal/builder", + "//executor/internal/mpp", + "//executor/internal/util", + "//executor/metrics", + "//expression", + "//expression/aggregation", + "//infoschema", + "//keyspace", + "//kv", + "//meta", + "//meta/autoid", + "//metrics", + "//parser", + "//parser/ast", + "//parser/auth", + "//parser/charset", + "//parser/duration", + "//parser/format", + "//parser/model", + "//parser/mysql", + "//parser/terror", + "//parser/tidb", + "//parser/types", + "//planner", + "//planner/core", + "//planner/util", + "//plugin", + "//privilege", + "//privilege/privileges", + "//resourcemanager/pool/workerpool", + "//resourcemanager/util", + "//session/txninfo", + "//sessionctx", + "//sessionctx/binloginfo", + "//sessionctx/sessionstates", + "//sessionctx/stmtctx", + "//sessionctx/variable", + "//sessiontxn", + "//sessiontxn/staleread", + "//statistics", + "//statistics/handle", + "//store/driver/backoff", + "//store/driver/error", + "//store/driver/txn", + "//store/helper", + "//table", + "//table/tables", + "//table/temptable", + "//tablecodec", + "//telemetry", + "//tidb-binlog/node", + "//types", + "//types/parser_driver", + "//util", + "//util/admin", + "//util/bitmap", + "//util/breakpoint", + "//util/channel", + "//util/chunk", + "//util/codec", + "//util/collate", + "//util/cteutil", + "//util/dbterror", + "//util/dbterror/exeerrors", + "//util/deadlockhistory", + "//util/disk", + "//util/etcd", + "//util/execdetails", + "//util/format", + "//util/gcutil", + "//util/globalconn", + "//util/hack", + "//util/hint", + "//util/intest", + "//util/keydecoder", + "//util/kvcache", + "//util/logutil", + "//util/logutil/consistency", + "//util/mathutil", + "//util/memory", + "//util/mvmap", + "//util/password-validation", + "//util/pdapi", + "//util/plancodec", + "//util/printer", + "//util/ranger", + "//util/replayer", + "//util/resourcegrouptag", + "//util/rowDecoder", + "//util/rowcodec", + "//util/sem", + "//util/servermemorylimit", + "//util/set", + "//util/size", + "//util/sqlexec", + "//util/stmtsummary", + "//util/stmtsummary/v2:stmtsummary", + "//util/stringutil", + "//util/syncutil", + "//util/table-filter", + "//util/timeutil", + "//util/tls", + "//util/topsql", + "//util/topsql/state", + "//util/tracing", + "@com_github_burntsushi_toml//:toml", + "@com_github_docker_go_units//:go-units", + "@com_github_gogo_protobuf//proto", + "@com_github_ngaut_pools//:pools", + "@com_github_opentracing_basictracer_go//:basictracer-go", + "@com_github_opentracing_opentracing_go//:opentracing-go", + "@com_github_pingcap_errors//:errors", + "@com_github_pingcap_failpoint//:failpoint", + "@com_github_pingcap_kvproto//pkg/brpb", + "@com_github_pingcap_kvproto//pkg/coprocessor", + "@com_github_pingcap_kvproto//pkg/deadlock", + "@com_github_pingcap_kvproto//pkg/diagnosticspb", + "@com_github_pingcap_kvproto//pkg/encryptionpb", + "@com_github_pingcap_kvproto//pkg/kvrpcpb", + "@com_github_pingcap_kvproto//pkg/metapb", + "@com_github_pingcap_kvproto//pkg/resource_manager", + "@com_github_pingcap_kvproto//pkg/tikvpb", + "@com_github_pingcap_log//:log", + "@com_github_pingcap_sysutil//:sysutil", + "@com_github_pingcap_tipb//go-tipb", + "@com_github_prometheus_client_golang//api", + "@com_github_prometheus_client_golang//api/prometheus/v1:prometheus", + "@com_github_prometheus_client_golang//prometheus", + "@com_github_prometheus_common//model", + "@com_github_tikv_client_go_v2//error", + "@com_github_tikv_client_go_v2//kv", + "@com_github_tikv_client_go_v2//oracle", + "@com_github_tikv_client_go_v2//tikv", + "@com_github_tikv_client_go_v2//tikvrpc", + "@com_github_tikv_client_go_v2//txnkv", + "@com_github_tikv_client_go_v2//txnkv/txnlock", + "@com_github_tikv_client_go_v2//txnkv/txnsnapshot", + "@com_github_tikv_client_go_v2//util", + "@com_github_tikv_pd_client//:client", + "@com_github_twmb_murmur3//:murmur3", + "@com_sourcegraph_sourcegraph_appdash//:appdash", + "@com_sourcegraph_sourcegraph_appdash//opentracing", + "@org_golang_google_grpc//:grpc", + "@org_golang_google_grpc//codes", + "@org_golang_google_grpc//credentials", + "@org_golang_google_grpc//credentials/insecure", + "@org_golang_google_grpc//status", + "@org_golang_x_exp//slices", + "@org_golang_x_sync//errgroup", + "@org_uber_go_atomic//:atomic", + "@org_uber_go_zap//:zap", + "@org_uber_go_zap//zapcore", + ], +) + +go_test( + name = "executor_test", + timeout = "moderate", + srcs = [ + "adapter_test.go", + "aggregate_test.go", + "analyze_test.go", + "apply_cache_test.go", + "batch_point_get_test.go", + "benchmark_test.go", + "brie_test.go", + "calibrate_resource_test.go", + "charset_test.go", + "chunk_size_control_test.go", + "cluster_table_test.go", + "collation_test.go", + "compact_table_test.go", + "concurrent_map_test.go", + "copr_cache_test.go", + "cte_test.go", + "ddl_test.go", + "delete_test.go", + "distsql_test.go", + "executor_failpoint_test.go", + "executor_pkg_test.go", + "executor_required_rows_test.go", + "executor_test.go", + "executor_txn_test.go", + "explain_test.go", + "explain_unit_test.go", + "explainfor_test.go", + "grant_test.go", + "hash_table_test.go", + "historical_stats_test.go", + "hot_regions_history_table_test.go", + "import_into_test.go", + "index_advise_test.go", + "index_lookup_join_test.go", + "index_lookup_merge_join_test.go", + "infoschema_cluster_table_test.go", + "infoschema_reader_test.go", + "insert_test.go", + "inspection_common_test.go", + "inspection_result_test.go", + "inspection_summary_test.go", + "join_pkg_test.go", + "join_test.go", + "joiner_test.go", + "main_test.go", + "memtable_reader_test.go", + "merge_join_test.go", + "metrics_reader_test.go", + "parallel_apply_test.go", + "partition_table_test.go", + "pkg_test.go", + "point_get_test.go", + "prepared_test.go", + "recover_test.go", + "resource_tag_test.go", + "revoke_test.go", + "rowid_test.go", + "sample_test.go", + "select_into_test.go", + "set_test.go", + "show_placement_labels_test.go", + "show_placement_test.go", + "show_stats_test.go", + "show_test.go", + "shuffle_test.go", + "simple_test.go", + "slow_query_sql_test.go", + "slow_query_test.go", + "sort_test.go", + "split_test.go", + "stale_txn_test.go", + "statement_context_test.go", + "stmtsummary_test.go", + "table_readers_required_rows_test.go", + "temporary_table_test.go", + "tikv_regions_peers_table_test.go", + "trace_test.go", + "union_scan_test.go", + "update_test.go", + "utils_test.go", + "window_test.go", + "write_concurrent_test.go", + ], + data = glob(["testdata/**"]), + embed = [":executor"], + flaky = True, + shard_count = 50, + deps = [ + "//config", + "//ddl", + "//ddl/placement", + "//ddl/schematracker", + "//ddl/testutil", + "//ddl/util", + "//distsql", + "//domain", + "//domain/infosync", + "//errno", + "//executor/aggfuncs", + "//executor/importer", + "//executor/internal/builder", + "//expression", + "//expression/aggregation", + "//infoschema", + "//kv", + "//meta", + "//meta/autoid", + "//metrics", + "//parser", + "//parser/ast", + "//parser/auth", + "//parser/model", + "//parser/mysql", + "//parser/terror", + "//planner", + "//planner/core", + "//planner/property", + "//planner/util", + "//server", + "//session", + "//sessionctx", + "//sessionctx/binloginfo", + "//sessionctx/stmtctx", + "//sessionctx/variable", + "//sessionctx/variable/featuretag/disttask", + "//sessiontxn", + "//sessiontxn/staleread", + "//statistics", + "//statistics/handle", + "//store/copr", + "//store/driver/error", + "//store/helper", + "//store/mockstore", + "//store/mockstore/unistore", + "//table", + "//table/tables", + "//tablecodec", + "//testkit", + "//testkit/external", + "//testkit/testdata", + "//testkit/testmain", + "//testkit/testsetup", + "//testkit/testutil", + "//types", + "//util", + "//util/benchdaily", + "//util/chunk", + "//util/codec", + "//util/collate", + "//util/dbterror", + "//util/dbterror/exeerrors", + "//util/deadlockhistory", + "//util/disk", + "//util/execdetails", + "//util/gcutil", + "//util/globalconn", + "//util/hack", + "//util/logutil", + "//util/mathutil", + "//util/memory", + "//util/mock", + "//util/paging", + "//util/pdapi", + "//util/plancodec", + "//util/ranger", + "//util/replayer", + "//util/rowcodec", + "//util/sem", + "//util/set", + "//util/sqlexec", + "//util/stmtsummary/v2:stmtsummary", + "//util/stringutil", + "//util/syncutil", + "//util/tableutil", + "//util/timeutil", + "//util/topsql/state", + "@com_github_golang_protobuf//proto", + "@com_github_gorilla_mux//:mux", + "@com_github_pingcap_errors//:errors", + "@com_github_pingcap_failpoint//:failpoint", + "@com_github_pingcap_fn//:fn", + "@com_github_pingcap_kvproto//pkg/diagnosticspb", + "@com_github_pingcap_kvproto//pkg/kvrpcpb", + "@com_github_pingcap_kvproto//pkg/metapb", + "@com_github_pingcap_log//:log", + "@com_github_pingcap_sysutil//:sysutil", + "@com_github_pingcap_tipb//go-binlog", + "@com_github_pingcap_tipb//go-tipb", + "@com_github_prometheus_client_golang//prometheus", + "@com_github_prometheus_client_model//go", + "@com_github_prometheus_common//model", + "@com_github_stretchr_testify//require", + "@com_github_tikv_client_go_v2//oracle", + "@com_github_tikv_client_go_v2//testutils", + "@com_github_tikv_client_go_v2//tikv", + "@com_github_tikv_client_go_v2//tikvrpc", + "@com_github_tikv_client_go_v2//util", + "@com_github_tikv_pd_client//:client", + "@com_github_tikv_pd_client//resource_group/controller", + "@org_golang_google_grpc//:grpc", + "@org_golang_x_exp//slices", + "@org_uber_go_atomic//:atomic", + "@org_uber_go_goleak//:goleak", + "@org_uber_go_zap//zapcore", + ], +) diff --git a/executor/change.go b/executor/change.go index 7233863501660..354d5e7e86679 100644 --- a/executor/change.go +++ b/executor/change.go @@ -35,10 +35,15 @@ type ChangeExec struct { func (e *ChangeExec) Next(ctx context.Context, req *chunk.Chunk) error { kind := strings.ToLower(e.NodeType) urls := config.GetGlobalConfig().Path - registry, err := createRegistry(urls) + registry, needToClose, err := getOrCreateBinlogRegistry(urls) if err != nil { return err } + if needToClose { + defer func() { + _ = registry.Close() + }() + } nodes, _, err := registry.Nodes(ctx, node.NodePrefix[kind]) if err != nil { return err diff --git a/executor/show.go b/executor/show.go index 07bbe50bb75d6..32e6cfb2e5bbe 100644 --- a/executor/show.go +++ b/executor/show.go @@ -50,6 +50,11 @@ import ( "github.com/pingcap/tidb/privilege" "github.com/pingcap/tidb/privilege/privileges" "github.com/pingcap/tidb/sessionctx" +<<<<<<< HEAD +======= + "github.com/pingcap/tidb/sessionctx/binloginfo" + "github.com/pingcap/tidb/sessionctx/sessionstates" +>>>>>>> ffb0654ae85 (binlog: fix show pump/drainer status (#44764)) "github.com/pingcap/tidb/sessionctx/stmtctx" "github.com/pingcap/tidb/sessionctx/variable" "github.com/pingcap/tidb/store/helper" @@ -1606,16 +1611,18 @@ func (e *ShowExec) fetchShowWarnings(errOnly bool) error { // fetchShowPumpOrDrainerStatus gets status of all pumps or drainers and fill them into e.rows. func (e *ShowExec) fetchShowPumpOrDrainerStatus(kind string) error { - registry, err := createRegistry(config.GetGlobalConfig().Path) + registry, needToClose, err := getOrCreateBinlogRegistry(config.GetGlobalConfig().Path) if err != nil { return errors.Trace(err) } - nodes, _, err := registry.Nodes(context.Background(), node.NodePrefix[kind]) - if err != nil { - return errors.Trace(err) + if needToClose { + defer func() { + _ = registry.Close() + }() } - err = registry.Close() + + nodes, _, err := registry.Nodes(context.Background(), node.NodePrefix[kind]) if err != nil { return errors.Trace(err) } @@ -1630,18 +1637,27 @@ func (e *ShowExec) fetchShowPumpOrDrainerStatus(kind string) error { return nil } +<<<<<<< HEAD // createRegistry returns an ectd registry func createRegistry(urls string) (*node.EtcdRegistry, error) { ectdEndpoints, err := utils.ParseHostPortAddr(urls) +======= +// getOrCreateBinlogRegistry returns an etcd registry for binlog, need to close, and error +func getOrCreateBinlogRegistry(urls string) (*node.EtcdRegistry, bool, error) { + if pumpClient := binloginfo.GetPumpsClient(); pumpClient != nil && pumpClient.EtcdRegistry != nil { + return pumpClient.EtcdRegistry, false, nil + } + ectdEndpoints, err := util.ParseHostPortAddr(urls) +>>>>>>> ffb0654ae85 (binlog: fix show pump/drainer status (#44764)) if err != nil { - return nil, errors.Trace(err) + return nil, false, errors.Trace(err) } cli, err := etcd.NewClientFromCfg(ectdEndpoints, etcdDialTimeout, node.DefaultRootPath, nil) if err != nil { - return nil, errors.Trace(err) + return nil, false, errors.Trace(err) } - return node.NewEtcdRegistry(cli, etcdDialTimeout), nil + return node.NewEtcdRegistry(cli, etcdDialTimeout), true, nil } func (e *ShowExec) getTable() (table.Table, error) {