Skip to content

Commit

Permalink
Fix imported pump & drainer start script (#903)
Browse files Browse the repository at this point in the history
  • Loading branch information
lucklove committed Nov 19, 2020
1 parent 5aedcfa commit d0e1436
Show file tree
Hide file tree
Showing 9 changed files with 71 additions and 18 deletions.
8 changes: 4 additions & 4 deletions components/playground/playground.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ func (p *Playground) removePumpWhenTombstone(c *api.BinlogClient, inst *instance
defer logIfErr(p.renderSDFile())

for {
tombstone, err := c.IsPumpTombstone(inst.NodeID())
tombstone, err := c.IsPumpTombstone(inst.Addr())
if err != nil {
fmt.Println(err)
}
Expand All @@ -199,7 +199,7 @@ func (p *Playground) removeDrainerWhenTombstone(c *api.BinlogClient, inst *insta
defer logIfErr(p.renderSDFile())

for {
tombstone, err := c.IsDrainerTombstone(inst.NodeID())
tombstone, err := c.IsDrainerTombstone(inst.Addr())
if err != nil {
fmt.Println(err)
}
Expand Down Expand Up @@ -325,7 +325,7 @@ func (p *Playground) handleScaleIn(w io.Writer, pid int) error {
if err != nil {
return errors.AddStack(err)
}
err = c.OfflinePump(inst.Addr(), inst.NodeID())
err = c.OfflinePump(inst.Addr())
if err != nil {
return errors.AddStack(err)
}
Expand All @@ -344,7 +344,7 @@ func (p *Playground) handleScaleIn(w io.Writer, pid int) error {
if err != nil {
return errors.AddStack(err)
}
err = c.OfflineDrainer(inst.Addr(), inst.NodeID())
err = c.OfflineDrainer(inst.Addr())
if err != nil {
return errors.AddStack(err)
}
Expand Down
51 changes: 45 additions & 6 deletions pkg/cluster/api/binlog.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,12 +84,20 @@ type NodeStatus struct {
}

// IsPumpTombstone check if drainer is tombstone.
func (c *BinlogClient) IsPumpTombstone(nodeID string) (bool, error) {
func (c *BinlogClient) IsPumpTombstone(addr string) (bool, error) {
nodeID, err := c.nodeID(addr, "pumps")
if err != nil {
return false, err
}
return c.isTombstone("pumps", nodeID)
}

// IsDrainerTombstone check if drainer is tombstone.
func (c *BinlogClient) IsDrainerTombstone(nodeID string) (bool, error) {
func (c *BinlogClient) IsDrainerTombstone(addr string) (bool, error) {
nodeID, err := c.nodeID(addr, "drainers")
if err != nil {
return false, err
}
return c.isTombstone("drainer", nodeID)
}

Expand Down Expand Up @@ -121,13 +129,36 @@ func (c *BinlogClient) drainerNodeStatus() (status []*NodeStatus, err error) {
return c.nodeStatus("drainers")
}

func (c *BinlogClient) nodeID(addr, ty string) (string, error) {
nodes, err := c.nodeStatus(ty)
if err != nil {
return "", err
}

for _, node := range nodes {
if addr == node.Addr {
return node.NodeID, nil
}
}

return "", errors.Errorf("pump node id for address %s not found", addr)
}

// UpdateDrainerState update the specify state as the specified state.
func (c *BinlogClient) UpdateDrainerState(nodeID string, state string) error {
func (c *BinlogClient) UpdateDrainerState(addr string, state string) error {
nodeID, err := c.nodeID(addr, "drainers")
if err != nil {
return err
}
return c.updateStatus("drainers", nodeID, state)
}

// UpdatePumpState update the specify state as the specified state.
func (c *BinlogClient) UpdatePumpState(nodeID string, state string) error {
func (c *BinlogClient) UpdatePumpState(addr string, state string) error {
nodeID, err := c.nodeID(addr, "pumps")
if err != nil {
return err
}
return c.updateStatus("pumps", nodeID, state)
}

Expand Down Expand Up @@ -228,11 +259,19 @@ func (c *BinlogClient) offline(addr string, nodeID string) error {
}

// OfflinePump offline a pump.
func (c *BinlogClient) OfflinePump(addr string, nodeID string) error {
func (c *BinlogClient) OfflinePump(addr string) error {
nodeID, err := c.nodeID(addr, "pumps")
if err != nil {
return err
}
return c.offline(addr, nodeID)
}

// OfflineDrainer offline a drainer.
func (c *BinlogClient) OfflineDrainer(addr string, nodeID string) error {
func (c *BinlogClient) OfflineDrainer(addr string) error {
nodeID, err := c.nodeID(addr, "drainers")
if err != nil {
return err
}
return c.offline(addr, nodeID)
}
4 changes: 2 additions & 2 deletions pkg/cluster/embed/autogen_pkger.go

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion pkg/cluster/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -611,7 +611,7 @@ func (m *Manager) Display(clusterName string, opt operator.Options) error {
// Check if there is some instance in tombstone state
nodes, _ := operator.DestroyTombstone(ctx, t, true /* returnNodesOnly */, opt, tlsCfg)
if len(nodes) != 0 {
color.Green("There are some nodes in state: `Tombstone`\n\tNodes: %+v\n\tYou can destroy them with the command: `tiup cluster prune %s`", nodes, clusterName)
color.Green("There are some nodes can be pruned: \n\tNodes: %+v\n\tYou can destroy them with the command: `tiup cluster prune %s`", nodes, clusterName)
}
}

Expand Down
6 changes: 3 additions & 3 deletions pkg/cluster/operation/scale_in.go
Original file line number Diff line number Diff line change
Expand Up @@ -315,13 +315,13 @@ func deleteMember(
}
case spec.ComponentDrainer:
addr := instance.GetHost() + ":" + strconv.Itoa(instance.GetPort())
err := binlogClient.OfflineDrainer(addr, addr)
err := binlogClient.OfflineDrainer(addr)
if err != nil {
return errors.AddStack(err)
return err
}
case spec.ComponentPump:
addr := instance.GetHost() + ":" + strconv.Itoa(instance.GetPort())
err := binlogClient.OfflinePump(addr, addr)
err := binlogClient.OfflinePump(addr)
if err != nil {
return errors.AddStack(err)
}
Expand Down
7 changes: 6 additions & 1 deletion pkg/cluster/spec/drainer.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,8 +146,13 @@ func (i *DrainerInstance) InitConfig(

enableTLS := i.topo.GlobalOptions.TLSEnabled
spec := i.InstanceSpec.(DrainerSpec)
nodeID := i.GetHost() + ":" + strconv.Itoa(i.GetPort())
// keep origin node id if is imported
if i.IsImported() {
nodeID = ""
}
cfg := scripts.NewDrainerScript(
i.GetHost()+":"+strconv.Itoa(i.GetPort()),
nodeID,
i.GetHost(),
paths.Deploy,
paths.Data[0],
Expand Down
7 changes: 6 additions & 1 deletion pkg/cluster/spec/pump.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,8 +145,13 @@ func (i *PumpInstance) InitConfig(

enableTLS := i.topo.GlobalOptions.TLSEnabled
spec := i.InstanceSpec.(PumpSpec)
nodeID := i.GetHost() + ":" + strconv.Itoa(i.GetPort())
// keep origin node id if is imported
if i.IsImported() {
nodeID = ""
}
cfg := scripts.NewPumpScript(
i.GetHost()+":"+strconv.Itoa(i.GetPort()),
nodeID,
i.GetHost(),
paths.Deploy,
paths.Data[0],
Expand Down
2 changes: 2 additions & 0 deletions templates/scripts/run_drainer.sh.tpl
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@ exec numactl --cpunodebind={{.NumaNode}} --membind={{.NumaNode}} bin/drainer \
{{- else}}
exec bin/drainer \
{{- end}}
{{- if .NodeID}}
--node-id="{{.NodeID}}" \
{{- end}}
--addr="{{.IP}}:{{.Port}}" \
--pd-urls="{{template "PDList" .Endpoints}}" \
--data-dir="{{.DataDir}}" \
Expand Down
2 changes: 2 additions & 0 deletions templates/scripts/run_pump.sh.tpl
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@ exec numactl --cpunodebind={{.NumaNode}} --membind={{.NumaNode}} bin/pump \
{{- else}}
exec bin/pump \
{{- end}}
{{- if .NodeID}}
--node-id="{{.NodeID}}" \
{{- end}}
--addr="0.0.0.0:{{.Port}}" \
--advertise-addr="{{.Host}}:{{.Port}}" \
--pd-urls="{{template "PDList" .Endpoints}}" \
Expand Down

0 comments on commit d0e1436

Please sign in to comment.