
Commit

Fix/scalein pump monitor notreload (#958)
* style(cluster): use IterInstance instead of for-loop twice

* fix(cluster/update_meta): alter the metadata directly

* fix(dm/update_meta): alter the metadata directly

* feat(cluster/scale_in): no need to alter PD's topology independently

* tests(cluster,dm): add tests verifying Prometheus's config is regenerated after scale-in

* feat(cluster/prune): update meta and reload configs after destroying tombstone nodes

* tests(cluster): add pump scale testcase

* fix(tests/tiup): remove ipprefix
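
The first bullet replaces the nested "for each component, for each instance" loops with the topology's IterInstance visitor, which the ScaleIn and DestroyTombstone hunks below rely on. The following is a minimal, self-contained sketch of that pattern; the Instance, Component and Topology types here are simplified stand-ins for illustration, not the real tiup spec types.

    package main

    import "fmt"

    type Instance struct{ id string }

    func (i Instance) ID() string { return i.id }

    type Component struct{ instances []Instance }

    type Topology struct{ components []Component }

    // IterInstance visits every instance of every component through a single
    // callback, replacing the old nested "for component / for instance" loops.
    func (t Topology) IterInstance(fn func(inst Instance)) {
        for _, c := range t.components {
            for _, inst := range c.instances {
                fn(inst)
            }
        }
    }

    func main() {
        topo := Topology{components: []Component{
            {instances: []Instance{{"pd-127.0.0.1-2379"}, {"pump-127.0.0.1-8250"}}},
            {instances: []Instance{{"prometheus-127.0.0.1-9090"}}},
        }}
        deleted := map[string]struct{}{"pump-127.0.0.1-8250": {}}

        // Skip the nodes being scaled in and regenerate config for the rest,
        // mirroring the loop bodies in ScaleIn and DestroyTombstone below.
        topo.IterInstance(func(inst Instance) {
            if _, ok := deleted[inst.ID()]; ok {
                return // plays the role of `continue` in the old double loop
            }
            fmt.Println("regenerate config for", inst.ID())
        })
    }

Returning early from the closure is the visitor's equivalent of skipping a deleted node, and every surviving instance gets a config-regeneration task.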
jsvisa authored Dec 9, 2020
1 parent 7166440 commit a9370cd
Showing 7 changed files with 266 additions and 161 deletions.
70 changes: 1 addition & 69 deletions components/cluster/command/prune.go
@@ -14,15 +14,6 @@
package command

import (
"fmt"

"github.com/fatih/color"
perrs "github.com/pingcap/errors"
"github.com/pingcap/tiup/pkg/cliutil"
operator "github.com/pingcap/tiup/pkg/cluster/operation"
"github.com/pingcap/tiup/pkg/cluster/spec"
"github.com/pingcap/tiup/pkg/cluster/task"
"github.com/pingcap/tiup/pkg/logger/log"
"github.com/spf13/cobra"
)

@@ -37,68 +28,9 @@ func newPruneCmd() *cobra.Command {

clusterName := args[0]

metadata, err := spec.ClusterMetadata(clusterName)
if err != nil {
return err
}

return destroyTombstoneIfNeed(clusterName, metadata, gOpt, skipConfirm)
return manager.DestroyTombstone(clusterName, gOpt, skipConfirm)
},
}

return cmd
}

func destroyTombstoneIfNeed(clusterName string, metadata *spec.ClusterMeta, opt operator.Options, skipConfirm bool) error {
topo := metadata.Topology

if !operator.NeedCheckTombstone(topo) {
return nil
}

tlsCfg, err := topo.TLSConfig(tidbSpec.Path(clusterName, spec.TLSCertKeyDir))
if err != nil {
return perrs.AddStack(err)
}

ctx := task.NewContext()
err = ctx.SetSSHKeySet(spec.ClusterPath(clusterName, "ssh", "id_rsa"),
spec.ClusterPath(clusterName, "ssh", "id_rsa.pub"))
if err != nil {
return perrs.AddStack(err)
}

err = ctx.SetClusterSSH(topo, metadata.User, gOpt.SSHTimeout, gOpt.SSHType, topo.BaseTopo().GlobalOptions.SSHType)
if err != nil {
return perrs.AddStack(err)
}

nodes, err := operator.DestroyTombstone(ctx, topo, true /* returnNodesOnly */, opt, tlsCfg)
if err != nil {
return perrs.AddStack(err)
}

if len(nodes) == 0 {
return nil
}

if !skipConfirm {
err = cliutil.PromptForConfirmOrAbortError(
color.HiYellowString(fmt.Sprintf("Will destroy these nodes: %v\nDo you confirm this action? [y/N]:", nodes)),
)
if err != nil {
return err
}
}

log.Infof("Start destroy Tombstone nodes: %v ...", nodes)

_, err = operator.DestroyTombstone(ctx, topo, false /* returnNodesOnly */, opt, tlsCfg)
if err != nil {
return perrs.AddStack(err)
}

log.Infof("Destroy success")

return spec.SaveClusterMeta(clusterName, metadata)
}
38 changes: 23 additions & 15 deletions components/dm/task/update_dm_meta.go
@@ -41,50 +41,58 @@ func NewUpdateDMMeta(cluster string, metadata *dmspec.Metadata, deletedNodesID [
}

// Execute implements the Task interface
// the metadata, especially the topology, is in wide use:
// other callers hold a pointer to this field,
// so we should update the original topology directly instead of making a copy
func (u *UpdateDMMeta) Execute(ctx *task.Context) error {
// make a copy
newMeta := &dmspec.Metadata{}
*newMeta = *u.metadata
newMeta.Topology = &dmspec.Specification{
GlobalOptions: u.metadata.Topology.GlobalOptions,
// MonitoredOptions: u.metadata.Topology.MonitoredOptions,
ServerConfigs: u.metadata.Topology.ServerConfigs,
}

deleted := set.NewStringSet(u.deletedNodesID...)
topo := u.metadata.Topology
masters := make([]dmspec.MasterSpec, 0)
for i, instance := range (&dmspec.DMMasterComponent{Topology: topo}).Instances() {
if deleted.Exist(instance.ID()) {
continue
}
newMeta.Topology.Masters = append(newMeta.Topology.Masters, topo.Masters[i])
masters = append(masters, topo.Masters[i])
}
topo.Masters = masters

workers := make([]dmspec.WorkerSpec, 0)
for i, instance := range (&dmspec.DMWorkerComponent{Topology: topo}).Instances() {
if deleted.Exist(instance.ID()) {
continue
}
newMeta.Topology.Workers = append(newMeta.Topology.Workers, topo.Workers[i])
workers = append(workers, topo.Workers[i])
}
topo.Workers = workers

monitors := make([]spec.PrometheusSpec, 0)
for i, instance := range (&spec.MonitorComponent{Topology: topo}).Instances() {
if deleted.Exist(instance.ID()) {
continue
}
newMeta.Topology.Monitors = append(newMeta.Topology.Monitors, topo.Monitors[i])
monitors = append(monitors, topo.Monitors[i])
}
topo.Monitors = monitors

grafanas := make([]spec.GrafanaSpec, 0)
for i, instance := range (&spec.GrafanaComponent{Topology: topo}).Instances() {
if deleted.Exist(instance.ID()) {
continue
}
newMeta.Topology.Grafanas = append(newMeta.Topology.Grafanas, topo.Grafanas[i])
grafanas = append(grafanas, topo.Grafanas[i])
}
topo.Grafanas = grafanas

alertmanagers := make([]spec.AlertmanagerSpec, 0)
for i, instance := range (&spec.AlertManagerComponent{Topology: topo}).Instances() {
if deleted.Exist(instance.ID()) {
continue
}
newMeta.Topology.Alertmanagers = append(newMeta.Topology.Alertmanagers, topo.Alertmanagers[i])
alertmanagers = append(alertmanagers, topo.Alertmanagers[i])
}
topo.Alertmanagers = alertmanagers

return dmspec.GetSpecManager().SaveMeta(u.cluster, newMeta)
return dmspec.GetSpecManager().SaveMeta(u.cluster, u.metadata)
}

// Rollback implements the Task interface
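
The point of the two "alter the metadata directly" fixes is visible in the hunk above: instead of building a fresh Metadata copy and saving that, Execute now filters each slice and assigns the result back onto the shared topology, so every task holding a pointer to it, notably the Prometheus config regeneration that runs after scale-in, sees the pruned member list. Below is a small self-contained sketch of why the in-place update matters; the types are simplified stand-ins, not the real dmspec API.

    package main

    import "fmt"

    // Simplified stand-ins for the dmspec metadata types.
    type Topology struct{ Workers []string }

    type Metadata struct{ Topology *Topology }

    // filterInPlace drops deleted workers and writes the result back onto the
    // shared Topology, as the fixed UpdateDMMeta task now does for every role.
    func filterInPlace(topo *Topology, deleted map[string]bool) {
        kept := make([]string, 0, len(topo.Workers))
        for _, w := range topo.Workers {
            if deleted[w] {
                continue
            }
            kept = append(kept, w)
        }
        topo.Workers = kept
    }

    func main() {
        meta := &Metadata{Topology: &Topology{Workers: []string{"w1", "w2", "w3"}}}

        // Another task (think: Prometheus config generation) holds the same pointer.
        observer := meta.Topology

        filterInPlace(meta.Topology, map[string]bool{"w2": true})

        // The observer sees the updated member list, so a regenerated monitoring
        // config would no longer reference the removed node.
        fmt.Println(observer.Workers) // [w1 w3]
    }
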
207 changes: 162 additions & 45 deletions pkg/cluster/manager.go
@@ -1395,53 +1395,51 @@ func (m *Manager) ScaleIn(
var regenConfigTasks []task.Task
hasImported := false
deletedNodes := set.NewStringSet(nodes...)
for _, component := range topo.ComponentsByStartOrder() {
for _, instance := range component.Instances() {
if deletedNodes.Exist(instance.ID()) {
continue
}
deployDir := spec.Abs(base.User, instance.DeployDir())
// data dir would be empty for components which don't need it
dataDirs := spec.MultiDirAbs(base.User, instance.DataDir())
// log dir always has a value, but might not be used by the component
logDir := spec.Abs(base.User, instance.LogDir())
topo.IterInstance(func(instance spec.Instance) {
if deletedNodes.Exist(instance.ID()) {
return
}
deployDir := spec.Abs(base.User, instance.DeployDir())
// data dir would be empty for components which don't need it
dataDirs := spec.MultiDirAbs(base.User, instance.DataDir())
// log dir always has a value, but might not be used by the component
logDir := spec.Abs(base.User, instance.LogDir())

// Download and copy the latest component to remote if the cluster is imported from Ansible
tb := task.NewBuilder()
if instance.IsImported() {
switch compName := instance.ComponentName(); compName {
case spec.ComponentGrafana, spec.ComponentPrometheus, spec.ComponentAlertmanager:
version := m.bindVersion(compName, base.Version)
tb.Download(compName, instance.OS(), instance.Arch(), version).
CopyComponent(
compName,
instance.OS(),
instance.Arch(),
version,
"", // use default srcPath
instance.GetHost(),
deployDir,
)
}
hasImported = true
// Download and copy the latest component to remote if the cluster is imported from Ansible
tb := task.NewBuilder()
if instance.IsImported() {
switch compName := instance.ComponentName(); compName {
case spec.ComponentGrafana, spec.ComponentPrometheus, spec.ComponentAlertmanager:
version := m.bindVersion(compName, base.Version)
tb.Download(compName, instance.OS(), instance.Arch(), version).
CopyComponent(
compName,
instance.OS(),
instance.Arch(),
version,
"", // use default srcPath
instance.GetHost(),
deployDir,
)
}

t := tb.InitConfig(clusterName,
base.Version,
m.specManager,
instance,
base.User,
true, // always ignore config check result in scale in
meta.DirPaths{
Deploy: deployDir,
Data: dataDirs,
Log: logDir,
Cache: m.specManager.Path(clusterName, spec.TempConfigPath),
},
).Build()
regenConfigTasks = append(regenConfigTasks, t)
hasImported = true
}
}

t := tb.InitConfig(clusterName,
base.Version,
m.specManager,
instance,
base.User,
true, // always ignore config check result in scale in
meta.DirPaths{
Deploy: deployDir,
Data: dataDirs,
Log: logDir,
Cache: m.specManager.Path(clusterName, spec.TempConfigPath),
},
).Build()
regenConfigTasks = append(regenConfigTasks, t)
})

// handle dir scheme changes
if hasImported {
@@ -1461,7 +1459,6 @@
m.specManager.Path(clusterName, "ssh", "id_rsa.pub")).
ClusterSSH(topo, base.User, sshTimeout, sshType, metadata.GetTopology().BaseTopo().GlobalOptions.SSHType)

// TODO: support command scale in operation.
scale(b, metadata, tlsCfg)

t := b.Parallel(force, regenConfigTasks...).Parallel(force, buildDynReloadProm(metadata.GetTopology())...).Build()
@@ -1595,6 +1592,126 @@ func (m *Manager) ScaleOut(
return nil
}

// DestroyTombstone destroys and removes instances that are in tombstone state
func (m *Manager) DestroyTombstone(
clusterName string,
gOpt operator.Options,
skipConfirm bool,
) error {
var (
sshTimeout = gOpt.SSHTimeout
sshType = gOpt.SSHType
)

metadata, err := m.meta(clusterName)
// allow specific validation errors so that user can recover a broken
// cluster if it is somehow in a bad state.
if err != nil &&
!errors.Is(perrs.Cause(err), spec.ErrNoTiSparkMaster) {
return perrs.AddStack(err)
}

topo := metadata.GetTopology()
base := metadata.GetBaseMeta()

clusterMeta := metadata.(*spec.ClusterMeta)
cluster := clusterMeta.Topology

if !operator.NeedCheckTombstone(cluster) {
return nil
}

tlsCfg, err := topo.TLSConfig(m.specManager.Path(clusterName, spec.TLSCertKeyDir))
if err != nil {
return err
}

b := task.NewBuilder().
SSHKeySet(
m.specManager.Path(clusterName, "ssh", "id_rsa"),
m.specManager.Path(clusterName, "ssh", "id_rsa.pub")).
ClusterSSH(topo, base.User, sshTimeout, sshType, metadata.GetTopology().BaseTopo().GlobalOptions.SSHType)

var nodes []string
b.
Func("FindTomestoneNodes", func(ctx *task.Context) (err error) {
nodes, err = operator.DestroyTombstone(ctx, cluster, true /* returnNodesOnly */, gOpt, tlsCfg)
if !skipConfirm {
err = cliutil.PromptForConfirmOrAbortError(
color.HiYellowString(fmt.Sprintf("Will destroy these nodes: %v\nDo you confirm this action? [y/N]:", nodes)),
)
if err != nil {
return err
}
}
log.Infof("Start destroy Tombstone nodes: %v ...", nodes)
return err
}).
ClusterOperate(cluster, operator.DestroyTombstoneOperation, gOpt, tlsCfg).
UpdateMeta(clusterName, clusterMeta, nodes).
UpdateTopology(clusterName, m.specManager.Path(clusterName), clusterMeta, nodes)

var regenConfigTasks []task.Task
deletedNodes := set.NewStringSet(nodes...)
topo.IterInstance(func(instance spec.Instance) {
if deletedNodes.Exist(instance.ID()) {
return
}
deployDir := spec.Abs(base.User, instance.DeployDir())
// data dir would be empty for components which don't need it
dataDirs := spec.MultiDirAbs(base.User, instance.DataDir())
// log dir always has a value, but might not be used by the component
logDir := spec.Abs(base.User, instance.LogDir())

// Download and copy the latest component to remote if the cluster is imported from Ansible
tb := task.NewBuilder()
if instance.IsImported() {
switch compName := instance.ComponentName(); compName {
case spec.ComponentGrafana, spec.ComponentPrometheus, spec.ComponentAlertmanager:
version := m.bindVersion(compName, base.Version)
tb.Download(compName, instance.OS(), instance.Arch(), version).
CopyComponent(
compName,
instance.OS(),
instance.Arch(),
version,
"", // use default srcPath
instance.GetHost(),
deployDir,
)
}
}

t := tb.InitConfig(clusterName,
base.Version,
m.specManager,
instance,
base.User,
true, // always ignore config check result in scale in
meta.DirPaths{
Deploy: deployDir,
Data: dataDirs,
Log: logDir,
Cache: m.specManager.Path(clusterName, spec.TempConfigPath),
},
).Build()
regenConfigTasks = append(regenConfigTasks, t)
})

t := b.Parallel(true, regenConfigTasks...).Parallel(true, buildDynReloadProm(metadata.GetTopology())...).Build()
if err := t.Execute(task.NewContext()); err != nil {
if errorx.Cast(err) != nil {
// FIXME: Map possible task errors and give suggestions.
return err
}
return perrs.Trace(err)
}

log.Infof("Destroy success")

return nil
}

func (m *Manager) meta(name string) (metadata spec.Metadata, err error) {
exist, err := m.specManager.Exist(name)
if err != nil {
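
DestroyTombstone above chains its work through the task builder: find the tombstone nodes, destroy them, update the stored meta and topology, then run the per-instance config regeneration and the dynamic Prometheus reload in parallel. The toy builder below only sketches that serial-then-parallel ordering under simplified assumptions; it is a hypothetical stand-in, not the real task.Builder API.

    package main

    import (
        "fmt"
        "sync"
    )

    // Task is a unit of work; the real tiup tasks take a *task.Context instead.
    type Task func() error

    // Builder is a toy stand-in for task.Builder, used to illustrate ordering only.
    type Builder struct{ steps []Task }

    func (b *Builder) Serial(t Task) *Builder {
        b.steps = append(b.steps, t)
        return b
    }

    // Parallel appends one step that runs the given tasks concurrently and, like
    // the ignore-error flag passed above, can tolerate individual failures.
    func (b *Builder) Parallel(ignoreError bool, tasks ...Task) *Builder {
        b.steps = append(b.steps, func() error {
            var wg sync.WaitGroup
            errs := make(chan error, len(tasks))
            for _, t := range tasks {
                wg.Add(1)
                go func(t Task) {
                    defer wg.Done()
                    errs <- t()
                }(t)
            }
            wg.Wait()
            close(errs)
            for err := range errs {
                if err != nil && !ignoreError {
                    return err
                }
            }
            return nil
        })
        return b
    }

    func (b *Builder) Execute() error {
        for _, step := range b.steps {
            if err := step(); err != nil {
                return err
            }
        }
        return nil
    }

    func main() {
        regenConfig := []Task{
            func() error { fmt.Println("regenerate config: prometheus"); return nil },
            func() error { fmt.Println("regenerate config: grafana"); return nil },
        }
        b := (&Builder{}).
            Serial(func() error { fmt.Println("destroy tombstone nodes"); return nil }).
            Serial(func() error { fmt.Println("update cluster meta and topology"); return nil }).
            Parallel(true, regenConfig...)

        if err := b.Execute(); err != nil {
            fmt.Println("failed:", err)
        }
    }
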

