Skip to content

Commit

Permalink
binlog: fix show pump/drainer status (#44764) (#44992)
Browse files Browse the repository at this point in the history
ref #42643
  • Loading branch information
ti-chi-bot authored Jul 2, 2023
1 parent b5e89cb commit 267ae7d
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 11 deletions.
7 changes: 6 additions & 1 deletion executor/change.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,15 @@ type ChangeExec struct {
func (e *ChangeExec) Next(ctx context.Context, req *chunk.Chunk) error {
kind := strings.ToLower(e.NodeType)
urls := config.GetGlobalConfig().Path
registry, err := createRegistry(urls)
registry, needToClose, err := getOrCreateBinlogRegistry(urls)
if err != nil {
return err
}
if needToClose {
defer func() {
_ = registry.Close()
}()
}
nodes, _, err := registry.Nodes(ctx, node.NodePrefix[kind])
if err != nil {
return err
Expand Down
26 changes: 16 additions & 10 deletions executor/show.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ import (
"github.com/pingcap/tidb/privilege"
"github.com/pingcap/tidb/privilege/privileges"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/binloginfo"
"github.com/pingcap/tidb/sessionctx/stmtctx"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/store/helper"
Expand Down Expand Up @@ -1681,16 +1682,18 @@ func (e *ShowExec) fetchShowWarnings(errOnly bool) error {

// fetchShowPumpOrDrainerStatus gets status of all pumps or drainers and fill them into e.rows.
func (e *ShowExec) fetchShowPumpOrDrainerStatus(kind string) error {
registry, err := createRegistry(config.GetGlobalConfig().Path)
registry, needToClose, err := getOrCreateBinlogRegistry(config.GetGlobalConfig().Path)
if err != nil {
return errors.Trace(err)
}

nodes, _, err := registry.Nodes(context.Background(), node.NodePrefix[kind])
if err != nil {
return errors.Trace(err)
if needToClose {
defer func() {
_ = registry.Close()
}()
}
err = registry.Close()

nodes, _, err := registry.Nodes(context.Background(), node.NodePrefix[kind])
if err != nil {
return errors.Trace(err)
}
Expand All @@ -1705,18 +1708,21 @@ func (e *ShowExec) fetchShowPumpOrDrainerStatus(kind string) error {
return nil
}

// createRegistry returns an ectd registry
func createRegistry(urls string) (*node.EtcdRegistry, error) {
// getOrCreateBinlogRegistry returns an etcd registry for binlog, need to close, and error
func getOrCreateBinlogRegistry(urls string) (*node.EtcdRegistry, bool, error) {
if pumpClient := binloginfo.GetPumpsClient(); pumpClient != nil && pumpClient.EtcdRegistry != nil {
return pumpClient.EtcdRegistry, false, nil
}
ectdEndpoints, err := util.ParseHostPortAddr(urls)
if err != nil {
return nil, errors.Trace(err)
return nil, false, errors.Trace(err)
}
cli, err := etcd.NewClientFromCfg(ectdEndpoints, etcdDialTimeout, node.DefaultRootPath, nil)
if err != nil {
return nil, errors.Trace(err)
return nil, false, errors.Trace(err)
}

return node.NewEtcdRegistry(cli, etcdDialTimeout), nil
return node.NewEtcdRegistry(cli, etcdDialTimeout), true, nil
}

func (e *ShowExec) getTable() (table.Table, error) {
Expand Down

0 comments on commit 267ae7d

Please sign in to comment.