Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

cluster/executor: implement native SCP download instead of cat #1382

Merged
merged 3 commits into from
May 25, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions components/cluster/command/transfer.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ func newPullCmd() *cobra.Command {

cmd.Flags().StringSliceVarP(&gOpt.Roles, "role", "R", nil, "Only exec on host with specified roles")
cmd.Flags().StringSliceVarP(&gOpt.Nodes, "node", "N", nil, "Only exec on host with specified nodes")
cmd.Flags().IntVarP(&opt.Limit, "limit", "l", 0, "Limits the used bandwidth, specified in Kbit/s")

return cmd
}
Expand Down
4 changes: 2 additions & 2 deletions components/dm/ansible/import.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ func (im *Importer) fetchFile(ctx context.Context, host string, port int, fname

tmp = filepath.Join(tmp, filepath.Base(fname))

err = e.Transfer(ctx, fname, tmp, true /*download*/)
err = e.Transfer(ctx, fname, tmp, true /*download*/, 0)
if err != nil {
return nil, errors.Annotatef(err, "transfer %s from %s:%d", fname, host, port)
}
Expand Down Expand Up @@ -254,7 +254,7 @@ func (im *Importer) ScpSourceToMaster(ctx context.Context, topo *spec.Specificat
return errors.AddStack(err)
}

err = e.Transfer(ctx, f.Name(), filepath.Join(target, addr+".yml"), false)
err = e.Transfer(ctx, f.Name(), filepath.Join(target, addr+".yml"), false, 0)
if err != nil {
return err
}
Expand Down
4 changes: 2 additions & 2 deletions components/dm/ansible/import_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,13 +48,13 @@ func (g *executorGetter) Get(host string) ctxt.Executor {

// Transfer implements executor interface.
// Replace the deploy directory as the local one in testdata, so we can fetch it.
func (l *localExecutor) Transfer(ctx context.Context, src string, target string, download bool) error {
func (l *localExecutor) Transfer(ctx context.Context, src, target string, download bool, limit int) error {
mydeploy, err := filepath.Abs("./testdata/deploy_dir/" + l.host)
if err != nil {
return errors.AddStack(err)
}
src = strings.Replace(src, "/home/tidb/deploy", mydeploy, 1)
return l.Local.Transfer(ctx, src, target, download)
return l.Local.Transfer(ctx, src, target, download, 0)
}

func TestParseRunScript(t *testing.T) {
Expand Down
6 changes: 3 additions & 3 deletions components/dm/spec/logic.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ func (i *MasterInstance) InitConfig(
return err
}
dst := filepath.Join(paths.Deploy, "scripts", "run_dm-master.sh")
if err := e.Transfer(ctx, fp, dst, false); err != nil {
if err := e.Transfer(ctx, fp, dst, false, 0); err != nil {
return err
}
if _, _, err := e.Execute(ctx, "chmod +x "+dst, false); err != nil {
Expand Down Expand Up @@ -176,7 +176,7 @@ func (i *MasterInstance) ScaleConfig(
}

dst := filepath.Join(paths.Deploy, "scripts", "run_dm-master.sh")
if err := e.Transfer(ctx, fp, dst, false); err != nil {
if err := e.Transfer(ctx, fp, dst, false, 0); err != nil {
return err
}
if _, _, err := e.Execute(ctx, "chmod +x "+dst, false); err != nil {
Expand Down Expand Up @@ -265,7 +265,7 @@ func (i *WorkerInstance) InitConfig(
}
dst := filepath.Join(paths.Deploy, "scripts", "run_dm-worker.sh")

if err := e.Transfer(ctx, fp, dst, false); err != nil {
if err := e.Transfer(ctx, fp, dst, false, 0); err != nil {
return err
}

Expand Down
9 changes: 6 additions & 3 deletions pkg/cluster/ansible/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,8 @@ func ImportConfig(name string, clsMeta *spec.ClusterMeta, sshTimeout uint64, ssh
inst.GetHost(),
inst.GetPort())),
inst.GetHost(),
true).
true,
0).
Build()
copyFileTasks = append(copyFileTasks, t)
case spec.ComponentTiFlash:
Expand All @@ -71,7 +72,8 @@ func ImportConfig(name string, clsMeta *spec.ClusterMeta, sshTimeout uint64, ssh
inst.GetHost(),
inst.GetPort())),
inst.GetHost(),
true).
true,
0).
CopyFile(filepath.Join(inst.DeployDir(), "conf", inst.ComponentName()+"-learner.toml"),
spec.ClusterPath(name,
spec.AnsibleImportedConfigPath,
Expand All @@ -80,7 +82,8 @@ func ImportConfig(name string, clsMeta *spec.ClusterMeta, sshTimeout uint64, ssh
inst.GetHost(),
inst.GetPort())),
inst.GetHost(),
true).
true,
0).
Build()
copyFileTasks = append(copyFileTasks, t)
default:
Expand Down
2 changes: 1 addition & 1 deletion pkg/cluster/ctxt/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ type (
Execute(ctx context.Context, cmd string, sudo bool, timeout ...time.Duration) (stdout []byte, stderr []byte, err error)

// Transfer copies files from or to a target
Transfer(ctx context.Context, src string, dst string, download bool) error
Transfer(ctx context.Context, src, dst string, download bool, limit int) error
}

// ExecutorGetter get the executor by host.
Expand Down
5 changes: 3 additions & 2 deletions pkg/cluster/executor/checkpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,14 +86,15 @@ func (c *CheckPointExecutor) Execute(ctx context.Context, cmd string, sudo bool,
}

// Transfer implements Executer interface.
func (c *CheckPointExecutor) Transfer(ctx context.Context, src string, dst string, download bool) (err error) {
func (c *CheckPointExecutor) Transfer(ctx context.Context, src, dst string, download bool, limit int) (err error) {
point := checkpoint.Acquire(ctx, scpPoint, map[string]interface{}{
"host": c.config.Host,
"port": c.config.Port,
"user": c.config.User,
"src": src,
"dst": dst,
"download": download,
"limit": limit,
})
defer func() {
point.Release(err,
Expand All @@ -108,5 +109,5 @@ func (c *CheckPointExecutor) Transfer(ctx context.Context, src string, dst strin
return nil
}

return c.Executor.Transfer(ctx, src, dst, download)
return c.Executor.Transfer(ctx, src, dst, download, limit)
}
2 changes: 1 addition & 1 deletion pkg/cluster/executor/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ func (l *Local) Execute(ctx context.Context, cmd string, sudo bool, timeout ...t
}

// Transfer implements Executer interface.
func (l *Local) Transfer(ctx context.Context, src string, dst string, download bool) error {
func (l *Local) Transfer(ctx context.Context, src, dst string, download bool, limit int) error {
targetPath := filepath.Dir(dst)
if err := utils.CreateDir(targetPath); err != nil {
return err
Expand Down
2 changes: 1 addition & 1 deletion pkg/cluster/executor/local_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func TestLocal(t *testing.T) {
defer os.Remove(dst.Name())

// Transfer src to dst and check it.
err = local.Transfer(ctx, src.Name(), dst.Name(), false)
err = local.Transfer(ctx, src.Name(), dst.Name(), false, 0)
assert.Nil(err)

data, err := os.ReadFile(dst.Name())
Expand Down
129 changes: 129 additions & 0 deletions pkg/cluster/executor/scp.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
// Copyright 2021 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package executor

import (
"bufio"
"fmt"
"io"
"io/fs"
"os"
"path/filepath"
"strconv"
"strings"

"github.com/pingcap/tiup/pkg/utils"
"golang.org/x/crypto/ssh"
)

// ScpDownload downloads a file from remote with SCP
// The implementation is partially inspired by github.com/dtylman/scp
func ScpDownload(session *ssh.Session, client *ssh.Client, src, dst string, limit int) error {
r, err := session.StdoutPipe()
if err != nil {
return err
}
bufr := bufio.NewReader(r)

w, err := session.StdinPipe()
if err != nil {
return err
}

copyF := func() error {
// parse SCP command
line, _, err := bufr.ReadLine()
if err != nil {
return err
}
if line[0] != byte('C') {
return fmt.Errorf("incorrect scp command '%b', should be 'C'", line[0])
}

mode, err := strconv.ParseUint(string(line[1:5]), 0, 32)
AstroProfundis marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return fmt.Errorf("error parsing file mode; %s", err)
}

// prepare dst file
targetPath := filepath.Dir(dst)
if err := utils.CreateDir(targetPath); err != nil {
return err
}
targetFile, err := os.OpenFile(dst, os.O_RDWR|os.O_CREATE|os.O_TRUNC, fs.FileMode(mode))
if err != nil {
return err
}
defer targetFile.Close()

size, err := strconv.Atoi(strings.Fields(string(line))[1])
if err != nil {
return err
}

if err := ack(w); err != nil {
return err
}

// transferring data
n, err := io.CopyN(targetFile, bufr, int64(size))
if err != nil {
return err
}
if n < int64(size) {
return fmt.Errorf("error downloading via scp, file size mismatch")
}
if err := targetFile.Sync(); err != nil {
return err
}

return ack(w)
}

copyErrC := make(chan error, 1)
go func() {
defer w.Close()
copyErrC <- copyF()
}()

remoteCmd := fmt.Sprintf("scp -f %s", src)
if limit > 0 {
remoteCmd = fmt.Sprintf("scp -l %d -f %s", limit, src)
}
err = session.Start(remoteCmd)
if err != nil {
return err
}
if err := ack(w); err != nil { // send an empty byte to start transfer
return err
}

err = <-copyErrC
if err != nil {
return err
}
return session.Wait()
}

func ack(w io.Writer) error {
msg := []byte("\x00")
n, err := w.Write(msg)
if err != nil {
return fmt.Errorf("fail to send response to remote: %s", err)
}
if n < len(msg) {
return fmt.Errorf("fail to send response to remote, size mismatch")
}
return nil
}
20 changes: 6 additions & 14 deletions pkg/cluster/executor/ssh.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ func (e *EasySSHExecutor) Execute(ctx context.Context, cmd string, sudo bool, ti
// This function depends on `scp` (a tool from OpenSSH or other SSH implementation)
// This function is based on easyssh.MakeConfig.Scp() but with support of copying
// file from remote to local.
func (e *EasySSHExecutor) Transfer(ctx context.Context, src string, dst string, download bool) error {
func (e *EasySSHExecutor) Transfer(ctx context.Context, src, dst string, download bool, limit int) error {
if !download {
err := e.Config.Scp(src, dst)
if err != nil {
Expand All @@ -197,18 +197,7 @@ func (e *EasySSHExecutor) Transfer(ctx context.Context, src string, dst string,
defer client.Close()
defer session.Close()

targetPath := filepath.Dir(dst)
if err = utils.CreateDir(targetPath); err != nil {
return err
}
targetFile, err := os.Create(dst)
if err != nil {
return err
}

session.Stdout = targetFile

return session.Run(fmt.Sprintf("cat %s", src))
return ScpDownload(session, client, src, dst, limit)
}

func (e *NativeSSHExecutor) prompt(def string) string {
Expand Down Expand Up @@ -319,7 +308,7 @@ func (e *NativeSSHExecutor) Execute(ctx context.Context, cmd string, sudo bool,

// Transfer copies files via SCP
// This function depends on `scp` (a tool from OpenSSH or other SSH implementation)
func (e *NativeSSHExecutor) Transfer(ctx context.Context, src string, dst string, download bool) error {
func (e *NativeSSHExecutor) Transfer(ctx context.Context, src, dst string, download bool, limit int) error {
if e.ConnectionTestResult != nil {
return e.ConnectionTestResult
}
Expand All @@ -334,6 +323,9 @@ func (e *NativeSSHExecutor) Transfer(ctx context.Context, src string, dst string
}

args := []string{scp, "-r", "-o", "StrictHostKeyChecking=no"}
if limit > 0 {
args = append(args, "-l", fmt.Sprint(limit))
}
args = e.configArgs(args) // prefix and postfix args

if download {
Expand Down
5 changes: 3 additions & 2 deletions pkg/cluster/manager/transfer.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ type TransferOptions struct {
Local string
Remote string
Pull bool // default to push
Limit int // rate limit in Kbit/s
}

// Transfer copies files from or to host in the tidb cluster.
Expand Down Expand Up @@ -93,9 +94,9 @@ func (m *Manager) Transfer(name string, opt TransferOptions, gOpt operator.Optio
for _, p := range i.Slice() {
t := task.NewBuilder()
if opt.Pull {
t.CopyFile(p, srcPath, host, opt.Pull)
t.CopyFile(p, srcPath, host, opt.Pull, opt.Limit)
} else {
t.CopyFile(srcPath, p, host, opt.Pull)
t.CopyFile(srcPath, p, host, opt.Pull, opt.Limit)
}
shellTasks = append(shellTasks, t.Build())
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/cluster/operation/telemetry.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ func GetNodeInfo(

dstDir := filepath.Join(dir, "bin")
dstPath := filepath.Join(dstDir, path.Base(srcPath))
err = exec.Transfer(nctx, srcPath, dstPath, false)
err = exec.Transfer(nctx, srcPath, dstPath, false, 0)
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/cluster/spec/alertmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ func (i *AlertManagerInstance) InitConfig(
}

dst := filepath.Join(paths.Deploy, "scripts", "run_alertmanager.sh")
if err := e.Transfer(ctx, fp, dst, false); err != nil {
if err := e.Transfer(ctx, fp, dst, false, 0); err != nil {
return err
}
if _, _, err := e.Execute(ctx, "chmod +x "+dst, false); err != nil {
Expand Down
2 changes: 1 addition & 1 deletion pkg/cluster/spec/cdc.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ func (i *CDCInstance) InitConfig(
return err
}
dst := filepath.Join(paths.Deploy, "scripts", "run_cdc.sh")
if err := e.Transfer(ctx, fp, dst, false); err != nil {
if err := e.Transfer(ctx, fp, dst, false, 0); err != nil {
return err
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/cluster/spec/drainer.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ func (i *DrainerInstance) InitConfig(
return err
}
dst := filepath.Join(paths.Deploy, "scripts", "run_drainer.sh")
if err := e.Transfer(ctx, fp, dst, false); err != nil {
if err := e.Transfer(ctx, fp, dst, false, 0); err != nil {
return err
}

Expand Down
Loading