Skip to content

Commit

Permalink
Merge branch 'slack-15.0' into bp-pr16852_slack-15.0
Browse files Browse the repository at this point in the history
  • Loading branch information
timvaillancourt authored Oct 10, 2024
2 parents 5c24c2c + 0f201a6 commit ecb2946
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 27 deletions.
44 changes: 27 additions & 17 deletions go/vt/mysqlctl/xtrabackupengine.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ const (
streamModeTar = "tar"
xtrabackupBinaryName = "xtrabackup"
xtrabackupEngineName = "xtrabackup"
xtrabackupInfoFile = "xtrabackup_info"
xbstream = "xbstream"

// closeTimeout is the timeout for closing backup files after writing.
Expand Down Expand Up @@ -238,15 +239,22 @@ func (be *XtrabackupEngine) ExecuteBackup(ctx context.Context, params BackupPara
return true, nil
}

func (be *XtrabackupEngine) backupFiles(ctx context.Context, params BackupParams, bh backupstorage.BackupHandle, backupFileName string, numStripes int, flavor string) (replicationPosition mysql.Position, finalErr error) {

func (be *XtrabackupEngine) backupFiles(
ctx context.Context,
params BackupParams,
bh backupstorage.BackupHandle,
backupFileName string,
numStripes int,
flavor string,
) (replicationPosition mysql.Position, finalErr error) {
backupProgram := path.Join(xtrabackupEnginePath, xtrabackupBinaryName)
flagsToExec := []string{"--defaults-file=" + params.Cnf.Path,
"--backup",
"--socket=" + params.Cnf.SocketFile,
"--slave-info",
"--user=" + xtrabackupUser,
"--target-dir=" + params.Cnf.TmpDir,
"--extra-lsndir=" + params.Cnf.TmpDir,
}
if xtrabackupStreamMode != "" {
flagsToExec = append(flagsToExec, "--stream="+xtrabackupStreamMode)
Expand Down Expand Up @@ -345,27 +353,14 @@ func (be *XtrabackupEngine) backupFiles(ctx context.Context, params BackupParams
// the replication position. Note that if we don't read stderr as we go, the
// xtrabackup process gets blocked when the write buffer fills up.
stderrBuilder := &strings.Builder{}
posBuilder := &strings.Builder{}
stderrDone := make(chan struct{})
go func() {
defer close(stderrDone)

scanner := bufio.NewScanner(backupErr)
capture := false
for scanner.Scan() {
line := scanner.Text()
params.Logger.Infof("xtrabackup stderr: %s", line)

// Wait until we see the first line of the binlog position.
// Then capture all subsequent lines. We need multiple lines since
// the value we're looking for has newlines in it.
if !capture {
if !strings.Contains(line, "MySQL binlog position") {
continue
}
capture = true
}
fmt.Fprintln(posBuilder, line)
}
if err := scanner.Err(); err != nil {
params.Logger.Errorf("error reading from xtrabackup stderr: %v", err)
Expand Down Expand Up @@ -409,8 +404,7 @@ func (be *XtrabackupEngine) backupFiles(ctx context.Context, params BackupParams
return replicationPosition, vterrors.Wrap(err, fmt.Sprintf("xtrabackup failed with error. Output=%s", sterrOutput))
}

posOutput := posBuilder.String()
replicationPosition, rerr := findReplicationPosition(posOutput, flavor, params.Logger)
replicationPosition, rerr := findReplicationPositionFromXtrabackupInfo(params.Cnf.TmpDir, flavor, params.Logger)
if rerr != nil {
return replicationPosition, vterrors.Wrap(rerr, "backup failed trying to find replication position")
}
Expand Down Expand Up @@ -694,6 +688,22 @@ func (be *XtrabackupEngine) extractFiles(ctx context.Context, logger logutil.Log
return nil
}

func findReplicationPositionFromXtrabackupInfo(directory, flavor string, logger logutil.Logger) (mysql.Position, error) {
f, err := os.Open(path.Join(directory, xtrabackupInfoFile))
if err != nil {
return mysql.Position{}, vterrors.Errorf(vtrpc.Code_INVALID_ARGUMENT,
"couldn't open %q to read GTID position", path.Join(directory, xtrabackupInfoFile))
}
defer f.Close()

contents, err := io.ReadAll(f)
if err != nil {
return mysql.Position{}, vterrors.Errorf(vtrpc.Code_INVALID_ARGUMENT, "couldn't read GTID position from %q", f.Name())
}

return findReplicationPosition(string(contents), flavor, logger)
}

var xtrabackupReplicationPositionRegexp = regexp.MustCompile(`GTID of the last change '([^']*)'`)

func findReplicationPosition(input, flavor string, logger logutil.Logger) (mysql.Position, error) {
Expand Down
46 changes: 36 additions & 10 deletions go/vt/mysqlctl/xtrabackupengine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,12 @@ import (
"bytes"
"io"
"math/rand"
"os"
"path"
"testing"

"github.com/stretchr/testify/assert"

"vitess.io/vitess/go/vt/logutil"
)

Expand Down Expand Up @@ -51,26 +55,48 @@ func TestFindReplicationPosition(t *testing.T) {
}
}

func TestFindReplicationPositionNoMatch(t *testing.T) {
func TestFindReplicationPositionFromXtrabackupInfo(t *testing.T) {
input := `tool_version = 8.0.35-30
binlog_pos = filename 'vt-0476396352-bin.000005', position '310088991', GTID of the last change '145e508e-ae54-11e9-8ce6-46824dd1815e:1-3,
1e51f8be-ae54-11e9-a7c6-4280a041109b:1-3,
47b59de1-b368-11e9-b48b-624401d35560:1-152981,
557def0a-b368-11e9-84ed-f6fffd91cc57:1-3,
599ef589-ae55-11e9-9688-ca1f44501925:1-14857169,
b9ce485d-b36b-11e9-9b17-2a6e0a6011f4:1-371262'
format = xbstream
`
want := "145e508e-ae54-11e9-8ce6-46824dd1815e:1-3,1e51f8be-ae54-11e9-a7c6-4280a041109b:1-3,47b59de1-b368-11e9-b48b-624401d35560:1-152981,557def0a-b368-11e9-84ed-f6fffd91cc57:1-3,599ef589-ae55-11e9-9688-ca1f44501925:1-14857169,b9ce485d-b36b-11e9-9b17-2a6e0a6011f4:1-371262"

tmp, err := os.MkdirTemp(t.TempDir(), "test")
assert.NoError(t, err)

f, err := os.Create(path.Join(tmp, xtrabackupInfoFile))
assert.NoError(t, err)
_, err = f.WriteString(input)
assert.NoError(t, err)
assert.NoError(t, f.Close())

pos, err := findReplicationPositionFromXtrabackupInfo(tmp, "MySQL56", logutil.NewConsoleLogger())
assert.NoError(t, err)
assert.Equal(t, want, pos.String())
}

func TestFindReplicationPositionNoMatchFromXtrabackupInfo(t *testing.T) {
// Make sure failure to find a match triggers an error.
input := `nothing`

_, err := findReplicationPosition(input, "MySQL56", logutil.NewConsoleLogger())
if err == nil {
t.Fatalf("expected error from findReplicationPosition but got nil")
}
_, err := findReplicationPositionFromXtrabackupInfo(input, "MySQL56", logutil.NewConsoleLogger())
assert.Error(t, err)
}

func TestFindReplicationPositionEmptyMatch(t *testing.T) {
func TestFindReplicationPositionEmptyMatchFromXtrabackupInfo(t *testing.T) {
// Make sure failure to find a match triggers an error.
input := `GTID of the last change '
'`

_, err := findReplicationPosition(input, "MySQL56", logutil.NewConsoleLogger())
if err == nil {
t.Fatalf("expected error from findReplicationPosition but got nil")
}
_, err := findReplicationPositionFromXtrabackupInfo(input, "MySQL56", logutil.NewConsoleLogger())
assert.Error(t, err)
}

func TestStripeRoundTrip(t *testing.T) {
Expand Down

0 comments on commit ecb2946

Please sign in to comment.