Fix/scalein pump monitor not reloaded #958

Merged
components/cluster/command/prune.go (1 addition, 69 deletions)
@@ -14,15 +14,6 @@
package command

import (
"fmt"

"github.com/fatih/color"
perrs "github.com/pingcap/errors"
"github.com/pingcap/tiup/pkg/cliutil"
operator "github.com/pingcap/tiup/pkg/cluster/operation"
"github.com/pingcap/tiup/pkg/cluster/spec"
"github.com/pingcap/tiup/pkg/cluster/task"
"github.com/pingcap/tiup/pkg/logger/log"
"github.com/spf13/cobra"
)

@@ -37,68 +28,9 @@ func newPruneCmd() *cobra.Command {

clusterName := args[0]

metadata, err := spec.ClusterMetadata(clusterName)
if err != nil {
return err
}

return destroyTombstoneIfNeed(clusterName, metadata, gOpt, skipConfirm)
return manager.DestroyTombstone(clusterName, gOpt, skipConfirm)
},
}

return cmd
}

func destroyTombstoneIfNeed(clusterName string, metadata *spec.ClusterMeta, opt operator.Options, skipConfirm bool) error {
topo := metadata.Topology

if !operator.NeedCheckTombstone(topo) {
return nil
}

tlsCfg, err := topo.TLSConfig(tidbSpec.Path(clusterName, spec.TLSCertKeyDir))
if err != nil {
return perrs.AddStack(err)
}

ctx := task.NewContext()
err = ctx.SetSSHKeySet(spec.ClusterPath(clusterName, "ssh", "id_rsa"),
spec.ClusterPath(clusterName, "ssh", "id_rsa.pub"))
if err != nil {
return perrs.AddStack(err)
}

err = ctx.SetClusterSSH(topo, metadata.User, gOpt.SSHTimeout, gOpt.SSHType, topo.BaseTopo().GlobalOptions.SSHType)
if err != nil {
return perrs.AddStack(err)
}

nodes, err := operator.DestroyTombstone(ctx, topo, true /* returnNodesOnly */, opt, tlsCfg)
if err != nil {
return perrs.AddStack(err)
}

if len(nodes) == 0 {
return nil
}

if !skipConfirm {
err = cliutil.PromptForConfirmOrAbortError(
color.HiYellowString(fmt.Sprintf("Will destroy these nodes: %v\nDo you confirm this action? [y/N]:", nodes)),
)
if err != nil {
return err
}
}

log.Infof("Start destroy Tombstone nodes: %v ...", nodes)

_, err = operator.DestroyTombstone(ctx, topo, false /* returnNodesOnly */, opt, tlsCfg)
if err != nil {
return perrs.AddStack(err)
}

log.Infof("Destroy success")

return spec.SaveClusterMeta(clusterName, metadata)
}
components/dm/task/update_dm_meta.go (23 additions, 15 deletions)
@@ -41,50 +41,58 @@ func NewUpdateDMMeta(cluster string, metadata *dmspec.Metadata, deletedNodesID [
}

// Execute implements the Task interface
// The metadata, especially the topology, is in wide use: other callers hold
// a pointer to this field, so we must update the original topology in place
// instead of making a copy.
func (u *UpdateDMMeta) Execute(ctx *task.Context) error {
// make a copy
newMeta := &dmspec.Metadata{}
*newMeta = *u.metadata
newMeta.Topology = &dmspec.Specification{
GlobalOptions: u.metadata.Topology.GlobalOptions,
// MonitoredOptions: u.metadata.Topology.MonitoredOptions,
ServerConfigs: u.metadata.Topology.ServerConfigs,
}

deleted := set.NewStringSet(u.deletedNodesID...)
topo := u.metadata.Topology
masters := make([]dmspec.MasterSpec, 0)
for i, instance := range (&dmspec.DMMasterComponent{Topology: topo}).Instances() {
if deleted.Exist(instance.ID()) {
continue
}
newMeta.Topology.Masters = append(newMeta.Topology.Masters, topo.Masters[i])
masters = append(masters, topo.Masters[i])
}
topo.Masters = masters

workers := make([]dmspec.WorkerSpec, 0)
for i, instance := range (&dmspec.DMWorkerComponent{Topology: topo}).Instances() {
if deleted.Exist(instance.ID()) {
continue
}
newMeta.Topology.Workers = append(newMeta.Topology.Workers, topo.Workers[i])
workers = append(workers, topo.Workers[i])
}
topo.Workers = workers

monitors := make([]spec.PrometheusSpec, 0)
for i, instance := range (&spec.MonitorComponent{Topology: topo}).Instances() {
if deleted.Exist(instance.ID()) {
continue
}
newMeta.Topology.Monitors = append(newMeta.Topology.Monitors, topo.Monitors[i])
monitors = append(monitors, topo.Monitors[i])
}
topo.Monitors = monitors

grafanas := make([]spec.GrafanaSpec, 0)
for i, instance := range (&spec.GrafanaComponent{Topology: topo}).Instances() {
if deleted.Exist(instance.ID()) {
continue
}
newMeta.Topology.Grafanas = append(newMeta.Topology.Grafanas, topo.Grafanas[i])
grafanas = append(grafanas, topo.Grafanas[i])
}
topo.Grafanas = grafanas

alertmanagers := make([]spec.AlertmanagerSpec, 0)
for i, instance := range (&spec.AlertManagerComponent{Topology: topo}).Instances() {
if deleted.Exist(instance.ID()) {
continue
}
newMeta.Topology.Alertmanagers = append(newMeta.Topology.Alertmanagers, topo.Alertmanagers[i])
alertmanagers = append(alertmanagers, topo.Alertmanagers[i])
}
topo.Alertmanagers = alertmanagers

return dmspec.GetSpecManager().SaveMeta(u.cluster, newMeta)
return dmspec.GetSpecManager().SaveMeta(u.cluster, u.metadata)
}

// Rollback implements the Task interface
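The comment above is the crux of this file's change: other tasks keep a pointer to metadata.Topology, so saving a freshly built copy (the old newMeta approach) leaves the shared in-memory topology unchanged, which appears to be why the monitor targets were not reloaded after scale-in (per the PR title). The following is a toy, self-contained sketch of that aliasing behaviour; the type and field names are made up for illustration and are not tiup code.

package main

import "fmt"

// Toy stand-ins for the real dmspec types; names here are illustrative only.
type Spec struct{ Workers []string }
type Meta struct{ Topology *Spec }

// A "task" that captured the topology pointer earlier, the way other tiup
// tasks (e.g. config regeneration) hold the same *Specification.
type renderTask struct{ topo *Spec }

func (t renderTask) targets() []string { return t.topo.Workers }

func main() {
	meta := &Meta{Topology: &Spec{Workers: []string{"w1", "w2", "w3"}}}
	task := renderTask{topo: meta.Topology} // shares the same *Spec

	// In-place update (what this PR does): the task observes the change.
	meta.Topology.Workers = []string{"w1", "w3"}
	fmt.Println(task.targets()) // [w1 w3]

	// Copy-based update (the old newMeta approach): the task keeps the stale list.
	newTopo := *meta.Topology
	newTopo.Workers = []string{"w1"}
	fmt.Println(task.targets()) // still [w1 w3]
}

This is why Execute now filters topo.Masters, topo.Workers, and the monitoring specs in place and passes u.metadata itself to SaveMeta instead of a copy.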
pkg/cluster/manager.go (162 additions, 45 deletions)
@@ -1395,53 +1395,51 @@ func (m *Manager) ScaleIn(
var regenConfigTasks []task.Task
hasImported := false
deletedNodes := set.NewStringSet(nodes...)
for _, component := range topo.ComponentsByStartOrder() {
for _, instance := range component.Instances() {
if deletedNodes.Exist(instance.ID()) {
continue
}
deployDir := spec.Abs(base.User, instance.DeployDir())
// data dir would be empty for components which don't need it
dataDirs := spec.MultiDirAbs(base.User, instance.DataDir())
// log dir will always be with values, but might not used by the component
logDir := spec.Abs(base.User, instance.LogDir())
topo.IterInstance(func(instance spec.Instance) {
if deletedNodes.Exist(instance.ID()) {
return
}
deployDir := spec.Abs(base.User, instance.DeployDir())
// data dir would be empty for components which don't need it
dataDirs := spec.MultiDirAbs(base.User, instance.DataDir())
// log dir always has a value, but it might not be used by the component
logDir := spec.Abs(base.User, instance.LogDir())

// Download and copy the latest component to remote if the cluster is imported from Ansible
tb := task.NewBuilder()
if instance.IsImported() {
switch compName := instance.ComponentName(); compName {
case spec.ComponentGrafana, spec.ComponentPrometheus, spec.ComponentAlertmanager:
version := m.bindVersion(compName, base.Version)
tb.Download(compName, instance.OS(), instance.Arch(), version).
CopyComponent(
compName,
instance.OS(),
instance.Arch(),
version,
"", // use default srcPath
instance.GetHost(),
deployDir,
)
}
hasImported = true
// Download and copy the latest component to remote if the cluster is imported from Ansible
tb := task.NewBuilder()
if instance.IsImported() {
switch compName := instance.ComponentName(); compName {
case spec.ComponentGrafana, spec.ComponentPrometheus, spec.ComponentAlertmanager:
version := m.bindVersion(compName, base.Version)
tb.Download(compName, instance.OS(), instance.Arch(), version).
CopyComponent(
compName,
instance.OS(),
instance.Arch(),
version,
"", // use default srcPath
instance.GetHost(),
deployDir,
)
}

t := tb.InitConfig(clusterName,
base.Version,
m.specManager,
instance,
base.User,
true, // always ignore config check result in scale in
meta.DirPaths{
Deploy: deployDir,
Data: dataDirs,
Log: logDir,
Cache: m.specManager.Path(clusterName, spec.TempConfigPath),
},
).Build()
regenConfigTasks = append(regenConfigTasks, t)
hasImported = true
}
}

t := tb.InitConfig(clusterName,
base.Version,
m.specManager,
instance,
base.User,
true, // always ignore config check result in scale in
meta.DirPaths{
Deploy: deployDir,
Data: dataDirs,
Log: logDir,
Cache: m.specManager.Path(clusterName, spec.TempConfigPath),
},
).Build()
regenConfigTasks = append(regenConfigTasks, t)
})

// handle dir scheme changes
if hasImported {
@@ -1461,7 +1459,6 @@
m.specManager.Path(clusterName, "ssh", "id_rsa.pub")).
ClusterSSH(topo, base.User, sshTimeout, sshType, metadata.GetTopology().BaseTopo().GlobalOptions.SSHType)

// TODO: support command scale in operation.
scale(b, metadata, tlsCfg)

t := b.Parallel(force, regenConfigTasks...).Parallel(force, buildDynReloadProm(metadata.GetTopology())...).Build()
@@ -1595,6 +1592,126 @@ func (m *Manager) ScaleOut(
return nil
}

// DestroyTombstone destroys and removes instances that are in the tombstone state
func (m *Manager) DestroyTombstone(
clusterName string,
gOpt operator.Options,
skipConfirm bool,
) error {
var (
sshTimeout = gOpt.SSHTimeout
sshType = gOpt.SSHType
)

metadata, err := m.meta(clusterName)
// allow specific validation errors so that user can recover a broken
// cluster if it is somehow in a bad state.
if err != nil &&
!errors.Is(perrs.Cause(err), spec.ErrNoTiSparkMaster) {
return perrs.AddStack(err)
}

topo := metadata.GetTopology()
base := metadata.GetBaseMeta()

clusterMeta := metadata.(*spec.ClusterMeta)
cluster := clusterMeta.Topology

if !operator.NeedCheckTombstone(cluster) {
return nil
}

tlsCfg, err := topo.TLSConfig(m.specManager.Path(clusterName, spec.TLSCertKeyDir))
if err != nil {
return err
}

b := task.NewBuilder().
SSHKeySet(
m.specManager.Path(clusterName, "ssh", "id_rsa"),
m.specManager.Path(clusterName, "ssh", "id_rsa.pub")).
ClusterSSH(topo, base.User, sshTimeout, sshType, metadata.GetTopology().BaseTopo().GlobalOptions.SSHType)

var nodes []string
b.
Func("FindTomestoneNodes", func(ctx *task.Context) (err error) {
nodes, err = operator.DestroyTombstone(ctx, cluster, true /* returnNodesOnly */, gOpt, tlsCfg)
if !skipConfirm {
err = cliutil.PromptForConfirmOrAbortError(
color.HiYellowString(fmt.Sprintf("Will destroy these nodes: %v\nDo you confirm this action? [y/N]:", nodes)),
)
if err != nil {
return err
}
}
log.Infof("Start destroy Tombstone nodes: %v ...", nodes)
return err
}).
ClusterOperate(cluster, operator.DestroyTombstoneOperation, gOpt, tlsCfg).
UpdateMeta(clusterName, clusterMeta, nodes).
UpdateTopology(clusterName, m.specManager.Path(clusterName), clusterMeta, nodes)

var regenConfigTasks []task.Task
deletedNodes := set.NewStringSet(nodes...)
topo.IterInstance(func(instance spec.Instance) {
Review thread on this block:

Member: Can we add a function such as RefreshConfig to build the regenConfigTasks? I found that there is a lot of redundant code: https://github.com/pingcap/tiup/pull/958/files#diff-5609f7b865f14ce911938ab0a8f21caa07446244d9c106a5df27ba9c08cf80e6R1386
Note that many other places use the same logic; we could replace it with a single function call.

Contributor (author): Good idea.

Contributor (author): As this PR already modifies 200+ lines, IMO I'll open another PR for the refactoring. BTW, manager.go is too large (~3000 LOC); I want to split it into several smaller files if you agree. :)

Member: It would be very nice if you could split manager.go; I have wanted to do that for days. I think we can add a package (e.g. tiup/pkg/cluster/manager) containing several .go files, each with its own responsibility (e.g. split by Deploy, Start, ScaleOut, etc.).
if deletedNodes.Exist(instance.ID()) {
return
}
deployDir := spec.Abs(base.User, instance.DeployDir())
// data dir would be empty for components which don't need it
dataDirs := spec.MultiDirAbs(base.User, instance.DataDir())
// log dir always has a value, but it might not be used by the component
logDir := spec.Abs(base.User, instance.LogDir())

// Download and copy the latest component to remote if the cluster is imported from Ansible
tb := task.NewBuilder()
if instance.IsImported() {
switch compName := instance.ComponentName(); compName {
case spec.ComponentGrafana, spec.ComponentPrometheus, spec.ComponentAlertmanager:
version := m.bindVersion(compName, base.Version)
tb.Download(compName, instance.OS(), instance.Arch(), version).
CopyComponent(
compName,
instance.OS(),
instance.Arch(),
version,
"", // use default srcPath
instance.GetHost(),
deployDir,
)
}
}

t := tb.InitConfig(clusterName,
base.Version,
m.specManager,
instance,
base.User,
true, // always ignore config check result in scale in
meta.DirPaths{
Deploy: deployDir,
Data: dataDirs,
Log: logDir,
Cache: m.specManager.Path(clusterName, spec.TempConfigPath),
},
).Build()
regenConfigTasks = append(regenConfigTasks, t)
})

t := b.Parallel(true, regenConfigTasks...).Parallel(true, buildDynReloadProm(metadata.GetTopology())...).Build()
if err := t.Execute(task.NewContext()); err != nil {
if errorx.Cast(err) != nil {
// FIXME: Map possible task errors and give suggestions.
return err
}
return perrs.Trace(err)
}

log.Infof("Destroy success")

return nil
}
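As a rough illustration of the RefreshConfig-style helper proposed in the review thread above, the near-identical regen-config loops in ScaleIn and DestroyTombstone could be pulled into one method. The sketch below reuses only calls that already appear in this diff; the idea and the spirit of the name come from the review comment, while the helper's name, signature, and parameter types (spec.Topology, *spec.BaseMeta, set.StringSet) are assumptions that would need to be checked against the real code.

// Hypothetical helper inside pkg/cluster/manager.go, sharing its existing imports.
func (m *Manager) buildRegenConfigTasks(
	clusterName string,
	topo spec.Topology,
	base *spec.BaseMeta,
	deletedNodes set.StringSet,
) (regenConfigTasks []task.Task, hasImported bool) {
	topo.IterInstance(func(instance spec.Instance) {
		if deletedNodes.Exist(instance.ID()) {
			return
		}
		deployDir := spec.Abs(base.User, instance.DeployDir())
		// data dir would be empty for components which don't need it
		dataDirs := spec.MultiDirAbs(base.User, instance.DataDir())
		// log dir always has a value, but it might not be used by the component
		logDir := spec.Abs(base.User, instance.LogDir())

		// Download and copy the latest component to remote if the cluster is imported from Ansible
		tb := task.NewBuilder()
		if instance.IsImported() {
			switch compName := instance.ComponentName(); compName {
			case spec.ComponentGrafana, spec.ComponentPrometheus, spec.ComponentAlertmanager:
				version := m.bindVersion(compName, base.Version)
				tb.Download(compName, instance.OS(), instance.Arch(), version).
					CopyComponent(compName, instance.OS(), instance.Arch(), version,
						"", // use default srcPath
						instance.GetHost(), deployDir)
			}
			hasImported = true
		}

		t := tb.InitConfig(clusterName, base.Version, m.specManager, instance, base.User,
			true, // always ignore config check result when regenerating configs
			meta.DirPaths{
				Deploy: deployDir,
				Data:   dataDirs,
				Log:    logDir,
				Cache:  m.specManager.Path(clusterName, spec.TempConfigPath),
			},
		).Build()
		regenConfigTasks = append(regenConfigTasks, t)
	})
	return
}

With such a helper, both ScaleIn and DestroyTombstone would reduce to a single call like regenConfigTasks, hasImported := m.buildRegenConfigTasks(clusterName, topo, base, deletedNodes), which would also leave one less duplicated block to carry into the proposed pkg/cluster/manager package split.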

func (m *Manager) meta(name string) (metadata spec.Metadata, err error) {
exist, err := m.specManager.Exist(name)
if err != nil {