Skip to content

Commit

Permalink
feat(cluster): extend ExecutorGetter to get sshkey
Browse files Browse the repository at this point in the history
  • Loading branch information
jsvisa committed Nov 17, 2020
1 parent 40e1d9a commit bf61123
Show file tree
Hide file tree
Showing 7 changed files with 23 additions and 14 deletions.
4 changes: 2 additions & 2 deletions components/dm/command/scale_in.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ func ScaleInDMCluster(
continue
}
instCount[instance.GetHost()]--
if err := operator.StopAndDestroyInstance(getter, topo, instance, options, instCount[instance.GetHost()] == 0, publicKeyPath); err != nil {
if err := operator.StopAndDestroyInstance(getter, topo, instance, options, instCount[instance.GetHost()] == 0); err != nil {
log.Warnf("failed to stop/destroy %s: %v", component.Name(), err)
}
}
Expand Down Expand Up @@ -169,7 +169,7 @@ func ScaleInDMCluster(

instCount[instance.GetHost()]--
if instCount[instance.GetHost()] == 0 {
if err := operator.DeletePublicKey(getter, instance.GetHost(), publicKeyPath); err != nil {
if err := operator.DeletePublicKey(getter, instance.GetHost()); err != nil {
return errors.Annotatef(err, "failed to delete public key")
}
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/cluster/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -381,7 +381,7 @@ func (m *Manager) DestroyCluster(clusterName string, gOpt operator.Options, dest
}, tlsCfg)
}).
Func("DestroyCluster", func(ctx *task.Context) error {
return operator.Destroy(ctx, topo, ctx.PublicKeyPath, destroyOpt)
return operator.Destroy(ctx, topo, destroyOpt)
}).
Build()

Expand Down
14 changes: 8 additions & 6 deletions pkg/cluster/operation/destroy.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@ func Cleanup(
func Destroy(
getter ExecutorGetter,
cluster spec.Topology,
publicKeyPath string,
options Options,
) error {
coms := cluster.ComponentsByStopOrder()
Expand Down Expand Up @@ -96,7 +95,7 @@ func Destroy(

// after all things done, try to remove SSH public key
for host := range instCount {
if err := DeletePublicKey(getter, host, publicKeyPath); err != nil {
if err := DeletePublicKey(getter, host); err != nil {
return nil
}
}
Expand All @@ -107,7 +106,7 @@ func Destroy(
// StopAndDestroyInstance stop and destroy the instance,
// if this instance is the host's last one, and the host has monitor deployed,
// we need to destroy the monitor, either
func StopAndDestroyInstance(getter ExecutorGetter, cluster spec.Topology, instance spec.Instance, options Options, destroyNode bool, publicKeyPath string) error {
func StopAndDestroyInstance(getter ExecutorGetter, cluster spec.Topology, instance spec.Instance, options Options, destroyNode bool) error {
ignoreErr := options.Force
compName := instance.ComponentName()

Expand Down Expand Up @@ -144,7 +143,7 @@ func StopAndDestroyInstance(getter ExecutorGetter, cluster spec.Topology, instan
}
}

if err := DeletePublicKey(getter, instance.GetHost(), publicKeyPath); err != nil {
if err := DeletePublicKey(getter, instance.GetHost()); err != nil {
if !ignoreErr {
return errors.Annotatef(err, "failed to delete public key")
}
Expand Down Expand Up @@ -196,17 +195,20 @@ func DeleteGlobalDirs(getter ExecutorGetter, host string, options *spec.GlobalOp
}

// DeletePublicKey deletes the SSH public key from host
func DeletePublicKey(getter ExecutorGetter, host, pubKeyPath string) error {
func DeletePublicKey(getter ExecutorGetter, host string) error {
e := getter.Get(host)
log.Infof("Delete public key %s", host)
_, pubKeyPath := getter.GetSSHKeySet()
publicKey, err := ioutil.ReadFile(pubKeyPath)
if err != nil {
return perrs.Trace(err)
}

pubKey := string(bytes.TrimSpace(publicKey))
pubKey = strings.ReplaceAll(pubKey, "/", "\\/")
pubKeysFile := executor.FindSSHAuthorizedKeysFile(e)

// delete the public key with Linux `sed` toolkit
c := module.ShellModuleConfig{
Command: fmt.Sprintf("sed -i '/%s/d' %s", pubKey, pubKeysFile),
UseShell: false,
Expand Down Expand Up @@ -518,7 +520,7 @@ func DestroyClusterTombstone(

for _, instance := range instances {
instCount[instance.GetHost()]--
err := StopAndDestroyInstance(getter, cluster, instance, options, instCount[instance.GetHost()] == 0, publicKey)
err := StopAndDestroyInstance(getter, cluster, instance, options, instCount[instance.GetHost()] == 0)
if err != nil {
return errors.AddStack(err)
}
Expand Down
2 changes: 2 additions & 0 deletions pkg/cluster/operation/operation.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,4 +120,6 @@ func FilterInstance(instances []spec.Instance, nodes set.StringSet) (res []spec.
// ExecutorGetter get the executor by host.
type ExecutorGetter interface {
Get(host string) (e executor.Executor)
// GetSSHKeySet gets the SSH private and public key path
GetSSHKeySet() (privateKeyPath, publicKeyPath string)
}
4 changes: 2 additions & 2 deletions pkg/cluster/operation/scale_in.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ func ScaleInCluster(
}

instCount[instance.GetHost()]--
if err := StopAndDestroyInstance(getter, cluster, instance, options, instCount[instance.GetHost()] == 0, publicKeyPath); err != nil {
if err := StopAndDestroyInstance(getter, cluster, instance, options, instCount[instance.GetHost()] == 0); err != nil {
log.Warnf("failed to stop/destroy %s: %v", compName, err)
}

Expand Down Expand Up @@ -226,7 +226,7 @@ func ScaleInCluster(

if !asyncOfflineComps.Exist(instance.ComponentName()) {
instCount[instance.GetHost()]--
if err := StopAndDestroyInstance(getter, cluster, instance, options, instCount[instance.GetHost()] == 0, publicKeyPath); err != nil {
if err := StopAndDestroyInstance(getter, cluster, instance, options, instCount[instance.GetHost()] == 0); err != nil {
return err
}
} else {
Expand Down
2 changes: 1 addition & 1 deletion pkg/cluster/task/action.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func (c *ClusterOperate) Execute(ctx *Context) error {
}
operator.PrintClusterStatus(ctx, c.spec)
case operator.DestroyOperation:
err := operator.Destroy(ctx, c.spec, ctx.PublicKeyPath, c.options)
err := operator.Destroy(ctx, c.spec, c.options)
if err != nil {
return errors.Annotate(err, "failed to destroy")
}
Expand Down
9 changes: 7 additions & 2 deletions pkg/cluster/task/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ type (
checkResults map[string][]*operator.CheckResult
}

// The public/private key is used to access remote server via the user `tidb`
// The private/public key is used to access remote server via the user `tidb`
PrivateKeyPath string
PublicKeyPath string
}
Expand Down Expand Up @@ -95,7 +95,7 @@ func NewContext() *Context {
}
}

// Get implements operation ExecutorGetter interface.
// Get implements the operation.ExecutorGetter interface.
func (ctx *Context) Get(host string) (e executor.Executor) {
ctx.exec.Lock()
e, ok := ctx.exec.executors[host]
Expand All @@ -107,6 +107,11 @@ func (ctx *Context) Get(host string) (e executor.Executor) {
return
}

// GetSSHKeySet implements the operation.ExecutorGetter interface.
func (ctx *Context) GetSSHKeySet() (privateKeyPath, publicKeyPath string) {
return ctx.PrivateKeyPath, ctx.PublicKeyPath
}

// GetExecutor get the executor.
func (ctx *Context) GetExecutor(host string) (e executor.Executor, ok bool) {
// Mock point for unit test
Expand Down

0 comments on commit bf61123

Please sign in to comment.