From 14924add94e874e22e6a3cadbbfd0cc4efff0c2e Mon Sep 17 00:00:00 2001 From: Chunzhu Li Date: Mon, 19 Jun 2023 11:13:15 +0800 Subject: [PATCH 1/4] fix --- executor/show.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/executor/show.go b/executor/show.go index ce0fc40cd9059..bf219ca3a66f0 100644 --- a/executor/show.go +++ b/executor/show.go @@ -55,6 +55,7 @@ import ( "github.com/pingcap/tidb/privilege" "github.com/pingcap/tidb/privilege/privileges" "github.com/pingcap/tidb/sessionctx" + "github.com/pingcap/tidb/sessionctx/binloginfo" "github.com/pingcap/tidb/sessionctx/sessionstates" "github.com/pingcap/tidb/sessionctx/stmtctx" "github.com/pingcap/tidb/sessionctx/variable" @@ -1902,6 +1903,9 @@ func (e *ShowExec) fetchShowPumpOrDrainerStatus(kind string) error { // createRegistry returns an ectd registry func createRegistry(urls string) (*node.EtcdRegistry, error) { + if pumpClient := binloginfo.GetPumpsClient(); pumpClient != nil { + return pumpClient.EtcdRegistry, nil + } ectdEndpoints, err := util.ParseHostPortAddr(urls) if err != nil { return nil, errors.Trace(err) From e8e75c66a39f19d0f83ebb17841003fe9a3d5c4e Mon Sep 17 00:00:00 2001 From: Chunzhu Li Date: Tue, 20 Jun 2023 14:51:08 +0800 Subject: [PATCH 2/4] fix close problem --- executor/change.go | 7 ++++++- executor/show.go | 26 ++++++++++++++------------ 2 files changed, 20 insertions(+), 13 deletions(-) diff --git a/executor/change.go b/executor/change.go index a261cbf9c14c9..03d4cd6ef4a12 100644 --- a/executor/change.go +++ b/executor/change.go @@ -35,10 +35,15 @@ type ChangeExec struct { func (e *ChangeExec) Next(ctx context.Context, req *chunk.Chunk) error { kind := strings.ToLower(e.NodeType) urls := config.GetGlobalConfig().Path - registry, err := createRegistry(urls) + registry, needToClose, err := createRegistry(urls) if err != nil { return err } + if needToClose { + defer func() { + _ = registry.Close() + }() + } nodes, _, err := registry.Nodes(ctx, node.NodePrefix[kind]) if err != nil { return err diff --git a/executor/show.go b/executor/show.go index bf219ca3a66f0..e0ebd791b5e7d 100644 --- a/executor/show.go +++ b/executor/show.go @@ -1877,16 +1877,18 @@ func (e *ShowExec) fetchShowWarnings(errOnly bool) error { // fetchShowPumpOrDrainerStatus gets status of all pumps or drainers and fill them into e.rows. func (e *ShowExec) fetchShowPumpOrDrainerStatus(kind string) error { - registry, err := createRegistry(config.GetGlobalConfig().Path) + registry, needToClose, err := createRegistry(config.GetGlobalConfig().Path) if err != nil { return errors.Trace(err) } - nodes, _, err := registry.Nodes(context.Background(), node.NodePrefix[kind]) - if err != nil { - return errors.Trace(err) + if needToClose { + defer func() { + _ = registry.Close() + }() } - err = registry.Close() + + nodes, _, err := registry.Nodes(context.Background(), node.NodePrefix[kind]) if err != nil { return errors.Trace(err) } @@ -1901,21 +1903,21 @@ func (e *ShowExec) fetchShowPumpOrDrainerStatus(kind string) error { return nil } -// createRegistry returns an ectd registry -func createRegistry(urls string) (*node.EtcdRegistry, error) { - if pumpClient := binloginfo.GetPumpsClient(); pumpClient != nil { - return pumpClient.EtcdRegistry, nil +// createRegistry returns an ectd registry, need to close, and error +func createRegistry(urls string) (*node.EtcdRegistry, bool, error) { + if pumpClient := binloginfo.GetPumpsClient(); pumpClient != nil && pumpClient.EtcdRegistry != nil { + return pumpClient.EtcdRegistry, false, nil } ectdEndpoints, err := util.ParseHostPortAddr(urls) if err != nil { - return nil, errors.Trace(err) + return nil, false, errors.Trace(err) } cli, err := etcd.NewClientFromCfg(ectdEndpoints, etcdDialTimeout, node.DefaultRootPath, nil) if err != nil { - return nil, errors.Trace(err) + return nil, false, errors.Trace(err) } - return node.NewEtcdRegistry(cli, etcdDialTimeout), nil + return node.NewEtcdRegistry(cli, etcdDialTimeout), true, nil } func (e *ShowExec) getTable() (table.Table, error) { From 43cf74fef76f2ee01514075f344136199400102a Mon Sep 17 00:00:00 2001 From: Chunzhu Li Date: Tue, 20 Jun 2023 15:17:06 +0800 Subject: [PATCH 3/4] fix bazel --- executor/BUILD.bazel | 1 + 1 file changed, 1 insertion(+) diff --git a/executor/BUILD.bazel b/executor/BUILD.bazel index 385bb2b490414..29000af261b98 100644 --- a/executor/BUILD.bazel +++ b/executor/BUILD.bazel @@ -151,6 +151,7 @@ go_library( "//privilege/privileges", "//session/txninfo", "//sessionctx", + "//sessionctx/binloginfo", "//sessionctx/sessionstates", "//sessionctx/stmtctx", "//sessionctx/variable", From fccba30b34da168899bb434971eb73b9e85ca5bc Mon Sep 17 00:00:00 2001 From: Chunzhu Li Date: Tue, 27 Jun 2023 17:06:54 +0800 Subject: [PATCH 4/4] address comment --- executor/change.go | 2 +- executor/show.go | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/executor/change.go b/executor/change.go index 03d4cd6ef4a12..df2a211d141fd 100644 --- a/executor/change.go +++ b/executor/change.go @@ -35,7 +35,7 @@ type ChangeExec struct { func (e *ChangeExec) Next(ctx context.Context, req *chunk.Chunk) error { kind := strings.ToLower(e.NodeType) urls := config.GetGlobalConfig().Path - registry, needToClose, err := createRegistry(urls) + registry, needToClose, err := getOrCreateBinlogRegistry(urls) if err != nil { return err } diff --git a/executor/show.go b/executor/show.go index e0ebd791b5e7d..29549b9f4aed6 100644 --- a/executor/show.go +++ b/executor/show.go @@ -1877,7 +1877,7 @@ func (e *ShowExec) fetchShowWarnings(errOnly bool) error { // fetchShowPumpOrDrainerStatus gets status of all pumps or drainers and fill them into e.rows. func (e *ShowExec) fetchShowPumpOrDrainerStatus(kind string) error { - registry, needToClose, err := createRegistry(config.GetGlobalConfig().Path) + registry, needToClose, err := getOrCreateBinlogRegistry(config.GetGlobalConfig().Path) if err != nil { return errors.Trace(err) } @@ -1903,8 +1903,8 @@ func (e *ShowExec) fetchShowPumpOrDrainerStatus(kind string) error { return nil } -// createRegistry returns an ectd registry, need to close, and error -func createRegistry(urls string) (*node.EtcdRegistry, bool, error) { +// getOrCreateBinlogRegistry returns an etcd registry for binlog, need to close, and error +func getOrCreateBinlogRegistry(urls string) (*node.EtcdRegistry, bool, error) { if pumpClient := binloginfo.GetPumpsClient(); pumpClient != nil && pumpClient.EtcdRegistry != nil { return pumpClient.EtcdRegistry, false, nil }