Skip to content

Commit

Permalink
binlog: fix show pump/drainer status (#44764) (#44994)
Browse files Browse the repository at this point in the history
ref #42643
  • Loading branch information
ti-chi-bot authored Jul 7, 2023
1 parent 1ec0d02 commit c9fc45d
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 11 deletions.
1 change: 1 addition & 0 deletions executor/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@ go_library(
"//privilege/privileges",
"//session/txninfo",
"//sessionctx",
"//sessionctx/binloginfo",
"//sessionctx/sessionstates",
"//sessionctx/stmtctx",
"//sessionctx/variable",
Expand Down
7 changes: 6 additions & 1 deletion executor/change.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,15 @@ type ChangeExec struct {
func (e *ChangeExec) Next(ctx context.Context, req *chunk.Chunk) error {
kind := strings.ToLower(e.NodeType)
urls := config.GetGlobalConfig().Path
registry, err := createRegistry(urls)
registry, needToClose, err := getOrCreateBinlogRegistry(urls)
if err != nil {
return err
}
if needToClose {
defer func() {
_ = registry.Close()
}()
}
nodes, _, err := registry.Nodes(ctx, node.NodePrefix[kind])
if err != nil {
return err
Expand Down
26 changes: 16 additions & 10 deletions executor/show.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ import (
"github.com/pingcap/tidb/privilege"
"github.com/pingcap/tidb/privilege/privileges"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/binloginfo"
"github.com/pingcap/tidb/sessionctx/sessionstates"
"github.com/pingcap/tidb/sessionctx/stmtctx"
"github.com/pingcap/tidb/sessionctx/variable"
Expand Down Expand Up @@ -1833,16 +1834,18 @@ func (e *ShowExec) fetchShowWarnings(errOnly bool) error {

// fetchShowPumpOrDrainerStatus gets status of all pumps or drainers and fill them into e.rows.
func (e *ShowExec) fetchShowPumpOrDrainerStatus(kind string) error {
registry, err := createRegistry(config.GetGlobalConfig().Path)
registry, needToClose, err := getOrCreateBinlogRegistry(config.GetGlobalConfig().Path)
if err != nil {
return errors.Trace(err)
}

nodes, _, err := registry.Nodes(context.Background(), node.NodePrefix[kind])
if err != nil {
return errors.Trace(err)
if needToClose {
defer func() {
_ = registry.Close()
}()
}
err = registry.Close()

nodes, _, err := registry.Nodes(context.Background(), node.NodePrefix[kind])
if err != nil {
return errors.Trace(err)
}
Expand All @@ -1857,18 +1860,21 @@ func (e *ShowExec) fetchShowPumpOrDrainerStatus(kind string) error {
return nil
}

// createRegistry returns an ectd registry
func createRegistry(urls string) (*node.EtcdRegistry, error) {
// getOrCreateBinlogRegistry returns an etcd registry for binlog, need to close, and error
func getOrCreateBinlogRegistry(urls string) (*node.EtcdRegistry, bool, error) {
if pumpClient := binloginfo.GetPumpsClient(); pumpClient != nil && pumpClient.EtcdRegistry != nil {
return pumpClient.EtcdRegistry, false, nil
}
ectdEndpoints, err := util.ParseHostPortAddr(urls)
if err != nil {
return nil, errors.Trace(err)
return nil, false, errors.Trace(err)
}
cli, err := etcd.NewClientFromCfg(ectdEndpoints, etcdDialTimeout, node.DefaultRootPath, nil)
if err != nil {
return nil, errors.Trace(err)
return nil, false, errors.Trace(err)
}

return node.NewEtcdRegistry(cli, etcdDialTimeout), nil
return node.NewEtcdRegistry(cli, etcdDialTimeout), true, nil
}

func (e *ShowExec) getTable() (table.Table, error) {
Expand Down

0 comments on commit c9fc45d

Please sign in to comment.