Skip to content

Commit

Permalink
*: use --wait-timeout for SSH command opeartions with easyssh (#1445)
Browse files Browse the repository at this point in the history
  • Loading branch information
AstroProfundis authored Jun 23, 2021
1 parent 857162f commit 5d1942a
Show file tree
Hide file tree
Showing 10 changed files with 114 additions and 20 deletions.
2 changes: 1 addition & 1 deletion components/cluster/command/import.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ func newImportCmd() *cobra.Command {
}

// copy config files form deployment servers
if err = ansible.ImportConfig(clsName, clsMeta, gOpt.SSHTimeout, gOpt.SSHType); err != nil {
if err = ansible.ImportConfig(clsName, clsMeta, gOpt.SSHTimeout, gOpt.OptTimeout, gOpt.SSHType); err != nil {
return err
}

Expand Down
18 changes: 15 additions & 3 deletions pkg/cluster/ansible/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import (
)

// ImportConfig copies config files from cluster which deployed through tidb-ansible
func ImportConfig(name string, clsMeta *spec.ClusterMeta, sshTimeout uint64, sshType executor.SSHType) error {
func ImportConfig(name string, clsMeta *spec.ClusterMeta, sshTimeout, exeTimeout uint64, sshType executor.SSHType) error {
// there may be already cluster dir, skip create
// if err := os.MkdirAll(meta.ClusterPath(name), 0755); err != nil {
// return err
Expand All @@ -45,7 +45,13 @@ func ImportConfig(name string, clsMeta *spec.ClusterMeta, sshTimeout uint64, ssh
SSHKeySet(
spec.ClusterPath(name, "ssh", "id_rsa"),
spec.ClusterPath(name, "ssh", "id_rsa.pub")).
UserSSH(inst.GetHost(), inst.GetSSHPort(), clsMeta.User, sshTimeout, sshType, "").
UserSSH(
inst.GetHost(),
inst.GetSSHPort(),
clsMeta.User,
sshTimeout,
exeTimeout,
sshType, "").
CopyFile(filepath.Join(inst.DeployDir(), "conf", inst.ComponentName()+".toml"),
spec.ClusterPath(name,
spec.AnsibleImportedConfigPath,
Expand All @@ -63,7 +69,13 @@ func ImportConfig(name string, clsMeta *spec.ClusterMeta, sshTimeout uint64, ssh
SSHKeySet(
spec.ClusterPath(name, "ssh", "id_rsa"),
spec.ClusterPath(name, "ssh", "id_rsa.pub")).
UserSSH(inst.GetHost(), inst.GetSSHPort(), clsMeta.User, sshTimeout, sshType, "").
UserSSH(
inst.GetHost(),
inst.GetSSHPort(),
clsMeta.User,
sshTimeout,
exeTimeout,
sshType, "").
CopyFile(filepath.Join(inst.DeployDir(), "conf", inst.ComponentName()+".toml"),
spec.ClusterPath(name,
spec.AnsibleImportedConfigPath,
Expand Down
6 changes: 6 additions & 0 deletions pkg/cluster/executor/ssh.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,8 @@ type (
Passphrase string // passphrase of the private key file
// Timeout is the maximum amount of time for the TCP connection to establish.
Timeout time.Duration
// ExeTimeout is the maximum abount of time for the command to finish
ExeTimeout time.Duration
}
)

Expand All @@ -106,6 +108,10 @@ func (e *EasySSHExecutor) initialize(config SSHConfig) {
Timeout: config.Timeout, // timeout when connecting to remote
}

if config.ExeTimeout > 0 {
executeDefaultTimeout = config.ExeTimeout
}

// prefer private key authentication
if len(config.KeyFile) > 0 {
e.Config.KeyPath = config.KeyFile
Expand Down
51 changes: 45 additions & 6 deletions pkg/cluster/manager/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ func buildScaleOutTask(
sshConnProps.IdentityFile,
sshConnProps.IdentityFilePassphrase,
gOpt.SSHTimeout,
gOpt.OptTimeout,
gOpt.SSHType,
globalOptions.SSHType,
).
Expand Down Expand Up @@ -158,7 +159,15 @@ func buildScaleOutTask(
}
// Deploy component
tb := task.NewBuilder().
UserSSH(inst.GetHost(), inst.GetSSHPort(), base.User, gOpt.SSHTimeout, gOpt.SSHType, topo.BaseTopo().GlobalOptions.SSHType).
UserSSH(
inst.GetHost(),
inst.GetSSHPort(),
base.User,
gOpt.SSHTimeout,
gOpt.OptTimeout,
gOpt.SSHType,
topo.BaseTopo().GlobalOptions.SSHType,
).
Mkdir(base.User, inst.GetHost(), deployDirs...).
Mkdir(base.User, inst.GetHost(), dataDirs...).
Mkdir(base.User, inst.GetHost(), logDir)
Expand Down Expand Up @@ -292,15 +301,29 @@ func buildScaleOutTask(
specManager.Path(name, "ssh", "id_rsa.pub")).
Parallel(false, downloadCompTasks...).
Parallel(false, envInitTasks...).
ClusterSSH(topo, base.User, gOpt.SSHTimeout, gOpt.SSHType, topo.BaseTopo().GlobalOptions.SSHType).
ClusterSSH(
topo,
base.User,
gOpt.SSHTimeout,
gOpt.OptTimeout,
gOpt.SSHType,
topo.BaseTopo().GlobalOptions.SSHType,
).
Parallel(false, deployCompTasks...)

if afterDeploy != nil {
afterDeploy(builder, newPart)
}

builder.
ClusterSSH(newPart, base.User, gOpt.SSHTimeout, gOpt.SSHType, topo.BaseTopo().GlobalOptions.SSHType).
ClusterSSH(
newPart,
base.User,
gOpt.SSHTimeout,
gOpt.OptTimeout,
gOpt.SSHType,
topo.BaseTopo().GlobalOptions.SSHType,
).
Func("Save meta", func(_ context.Context) error {
metadata.SetTopology(mergedTopo)
return m.specManager.SaveMeta(name, metadata)
Expand Down Expand Up @@ -374,7 +397,15 @@ func buildMonitoredDeployTask(
logDir := spec.Abs(globalOptions.User, monitoredOptions.LogDir)
// Deploy component
t := task.NewBuilder().
UserSSH(host, info.ssh, globalOptions.User, gOpt.SSHTimeout, gOpt.SSHType, globalOptions.SSHType).
UserSSH(
host,
info.ssh,
globalOptions.User,
gOpt.SSHTimeout,
gOpt.OptTimeout,
gOpt.SSHType,
globalOptions.SSHType,
).
Mkdir(globalOptions.User, host,
deployDir, dataDir, logDir,
filepath.Join(deployDir, "bin"),
Expand Down Expand Up @@ -416,7 +447,7 @@ func buildRefreshMonitoredConfigTasks(
uniqueHosts map[string]hostInfo, // host -> ssh-port, os, arch
globalOptions spec.GlobalOptions,
monitoredOptions *spec.MonitoredOptions,
sshTimeout uint64,
sshTimeout, exeTimeout uint64,
sshType executor.SSHType,
) []*task.StepDisplay {
if monitoredOptions == nil {
Expand All @@ -438,7 +469,15 @@ func buildRefreshMonitoredConfigTasks(
logDir := spec.Abs(globalOptions.User, monitoredOptions.LogDir)
// Generate configs
t := task.NewBuilder().
UserSSH(host, info.ssh, globalOptions.User, sshTimeout, sshType, globalOptions.SSHType).
UserSSH(
host,
info.ssh,
globalOptions.User,
sshTimeout,
exeTimeout,
sshType,
globalOptions.SSHType,
).
MonitoredConfig(
name,
comp,
Expand Down
3 changes: 3 additions & 0 deletions pkg/cluster/manager/check.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,7 @@ func checkSystemInfo(s *tui.SSHConnectionProps, topo *spec.Specification, gOpt *
s.IdentityFile,
s.IdentityFilePassphrase,
gOpt.SSHTimeout,
gOpt.OptTimeout,
gOpt.SSHType,
topo.GlobalOptions.SSHType,
).
Expand Down Expand Up @@ -315,6 +316,7 @@ func checkSystemInfo(s *tui.SSHConnectionProps, topo *spec.Specification, gOpt *
s.IdentityFile,
s.IdentityFilePassphrase,
gOpt.SSHTimeout,
gOpt.OptTimeout,
gOpt.SSHType,
topo.GlobalOptions.SSHType,
).
Expand Down Expand Up @@ -356,6 +358,7 @@ func checkSystemInfo(s *tui.SSHConnectionProps, topo *spec.Specification, gOpt *
s.IdentityFile,
s.IdentityFilePassphrase,
gOpt.SSHTimeout,
gOpt.OptTimeout,
gOpt.SSHType,
topo.GlobalOptions.SSHType,
)
Expand Down
11 changes: 10 additions & 1 deletion pkg/cluster/manager/deploy.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,7 @@ func (m *Manager) Deploy(
sshConnProps.IdentityFile,
sshConnProps.IdentityFilePassphrase,
gOpt.SSHTimeout,
gOpt.OptTimeout,
gOpt.SSHType,
globalOptions.SSHType,
).
Expand Down Expand Up @@ -266,7 +267,15 @@ func (m *Manager) Deploy(
deployDirs = append(deployDirs, filepath.Join(deployDir, "tls"))
}
t := task.NewBuilder().
UserSSH(inst.GetHost(), inst.GetSSHPort(), globalOptions.User, gOpt.SSHTimeout, gOpt.SSHType, globalOptions.SSHType).
UserSSH(
inst.GetHost(),
inst.GetSSHPort(),
globalOptions.User,
gOpt.SSHTimeout,
gOpt.OptTimeout,
gOpt.SSHType,
globalOptions.SSHType,
).
Mkdir(globalOptions.User, inst.GetHost(), deployDirs...).
Mkdir(globalOptions.User, inst.GetHost(), dataDirs...)

Expand Down
10 changes: 9 additions & 1 deletion pkg/cluster/manager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,14 @@ func (m *Manager) sshTaskBuilder(name string, topo spec.Topology, user string, o
m.specManager.Path(name, "ssh", "id_rsa"),
m.specManager.Path(name, "ssh", "id_rsa.pub"),
).
ClusterSSH(topo, user, opts.SSHTimeout, opts.SSHType, topo.BaseTopo().GlobalOptions.SSHType)
ClusterSSH(
topo,
user,
opts.SSHTimeout,
opts.OptTimeout,
opts.SSHType,
topo.BaseTopo().GlobalOptions.SSHType,
)
}

func (m *Manager) fillHostArch(s *tui.SSHConnectionProps, topo spec.Topology, gOpt *operator.Options, user string) error {
Expand All @@ -157,6 +164,7 @@ func (m *Manager) fillHostArch(s *tui.SSHConnectionProps, topo spec.Topology, gO
s.IdentityFile,
s.IdentityFilePassphrase,
gOpt.SSHTimeout,
gOpt.OptTimeout,
gOpt.SSHType,
topo.BaseTopo().GlobalOptions.SSHType,
).
Expand Down
2 changes: 2 additions & 0 deletions pkg/cluster/manager/reload.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ func (m *Manager) Reload(name string, opt operator.Options, skipRestart, skipCon
}

sshTimeout := opt.SSHTimeout
exeTimeout := opt.OptTimeout

metadata, err := m.meta(name)
if err != nil {
Expand Down Expand Up @@ -77,6 +78,7 @@ func (m *Manager) Reload(name string, opt operator.Options, skipRestart, skipCon
*topo.BaseTopo().GlobalOptions,
topo.GetMonitoredOptions(),
sshTimeout,
exeTimeout,
opt.SSHType)

// handle dir scheme changes
Expand Down
15 changes: 13 additions & 2 deletions pkg/cluster/task/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ func (b *Builder) RootSSH(
port int,
user, password, keyFile, passphrase string,
sshTimeout uint64,
exeTimeout uint64,
sshType executor.SSHType,
defaultSSHType executor.SSHType,
) *Builder {
Expand All @@ -56,13 +57,21 @@ func (b *Builder) RootSSH(
keyFile: keyFile,
passphrase: passphrase,
timeout: sshTimeout,
exeTimeout: exeTimeout,
sshType: sshType,
})
return b
}

// UserSSH append a UserSSH task to the current task collection
func (b *Builder) UserSSH(host string, port int, deployUser string, sshTimeout uint64, sshType, defaultSSHType executor.SSHType) *Builder {
func (b *Builder) UserSSH(
host string,
port int,
deployUser string,
sshTimeout uint64,
exeTimeout uint64,
sshType, defaultSSHType executor.SSHType,
) *Builder {
if sshType == "" {
sshType = defaultSSHType
}
Expand All @@ -71,6 +80,7 @@ func (b *Builder) UserSSH(host string, port int, deployUser string, sshTimeout u
port: port,
deployUser: deployUser,
timeout: sshTimeout,
exeTimeout: exeTimeout,
sshType: sshType,
})
return b
Expand All @@ -86,7 +96,7 @@ func (b *Builder) Func(name string, fn func(ctx context.Context) error) *Builder
}

// ClusterSSH init all UserSSH need for the cluster.
func (b *Builder) ClusterSSH(spec spec.Topology, deployUser string, sshTimeout uint64, sshType, defaultSSHType executor.SSHType) *Builder {
func (b *Builder) ClusterSSH(spec spec.Topology, deployUser string, sshTimeout, exeTimeout uint64, sshType, defaultSSHType executor.SSHType) *Builder {
if sshType == "" {
sshType = defaultSSHType
}
Expand All @@ -98,6 +108,7 @@ func (b *Builder) ClusterSSH(spec spec.Topology, deployUser string, sshTimeout u
port: in.GetSSHPort(),
deployUser: deployUser,
timeout: sshTimeout,
exeTimeout: exeTimeout,
sshType: sshType,
})
}
Expand Down
16 changes: 10 additions & 6 deletions pkg/cluster/task/ssh.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ type RootSSH struct {
keyFile string // path to the private key file
passphrase string // passphrase of the private key file
timeout uint64 // timeout in seconds when connecting via SSH
exeTimeout uint64 // timeout in seconds waiting command to finish
sshType executor.SSHType // the type of SSH chanel
}

Expand All @@ -49,6 +50,7 @@ func (s *RootSSH) Execute(ctx context.Context) error {
KeyFile: s.keyFile,
Passphrase: s.passphrase,
Timeout: time.Second * time.Duration(s.timeout),
ExeTimeout: time.Second * time.Duration(s.exeTimeout),
})
if err != nil {
return err
Expand Down Expand Up @@ -77,18 +79,20 @@ type UserSSH struct {
host string
port int
deployUser string
timeout uint64
timeout uint64 // timeout in seconds when connecting via SSH
exeTimeout uint64 // timeout in seconds waiting command to finish
sshType executor.SSHType
}

// Execute implements the Task interface
func (s *UserSSH) Execute(ctx context.Context) error {
e, err := executor.New(s.sshType, false, executor.SSHConfig{
Host: s.host,
Port: s.port,
KeyFile: ctxt.GetInner(ctx).PrivateKeyPath,
User: s.deployUser,
Timeout: time.Second * time.Duration(s.timeout),
Host: s.host,
Port: s.port,
KeyFile: ctxt.GetInner(ctx).PrivateKeyPath,
User: s.deployUser,
Timeout: time.Second * time.Duration(s.timeout),
ExeTimeout: time.Second * time.Duration(s.exeTimeout),
})
if err != nil {
return err
Expand Down

0 comments on commit 5d1942a

Please sign in to comment.