Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixing backup_pitr flaky tests via wait-for loop on topo reads #13781

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 40 additions & 5 deletions go/test/endtoend/backup/vtctlbackup/backup_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package vtctlbackup

import (
"bufio"
"context"
"encoding/json"
"fmt"
"os"
Expand Down Expand Up @@ -52,7 +53,8 @@ const (
XtraBackup = iota
BuiltinBackup
Mysqlctld
timeout = time.Duration(60 * time.Second)
timeout = time.Duration(60 * time.Second)
topoConsistencyTimeout = 20 * time.Second
)

var (
Expand Down Expand Up @@ -1077,12 +1079,13 @@ func terminateRestore(t *testing.T) {

func vtctlBackupReplicaNoDestroyNoWrites(t *testing.T, replicaIndex int) (backups []string) {
replica := getReplica(t, replicaIndex)
numBackups := len(waitForNumBackups(t, -1))

err := localCluster.VtctldClientProcess.ExecuteCommand("Backup", replica.Alias)
require.Nil(t, err)

backups, err = localCluster.ListBackups(shardKsName)
require.Nil(t, err)
backups = waitForNumBackups(t, numBackups+1)
require.NotEmpty(t, backups)

verifyTabletBackupStats(t, replica.VttabletProcess.GetVars())

Expand Down Expand Up @@ -1203,7 +1206,38 @@ func TestReplicaFullBackup(t *testing.T, replicaIndex int) (manifest *mysqlctl.B
return readManifestFile(t, backupLocation)
}

// waitForNumBackups waits for GetBackups to list exactly the given expected number.
// If expectNumBackups < 0 then any response is considered valid
func waitForNumBackups(t *testing.T, expectNumBackups int) []string {
ctx, cancel := context.WithTimeout(context.Background(), topoConsistencyTimeout)
defer cancel()

ticker := time.NewTicker(time.Second)
defer ticker.Stop()

for {
backups, err := localCluster.ListBackups(shardKsName)
require.NoError(t, err)
if expectNumBackups < 0 {
// any result is valid
return backups
}
if len(backups) == expectNumBackups {
// what we waited for
return backups
}
assert.Less(t, len(backups), expectNumBackups)
select {
case <-ctx.Done():
assert.Failf(t, ctx.Err().Error(), "expected %d backups, got %d", expectNumBackups, len(backups))
return nil
case <-ticker.C:
}
}
}

func testReplicaIncrementalBackup(t *testing.T, replica *cluster.Vttablet, incrementalFromPos replication.Position, expectError string) (manifest *mysqlctl.BackupManifest, backupName string) {
numBackups := len(waitForNumBackups(t, -1))
incrementalFromPosArg := "auto"
if !incrementalFromPos.IsZero() {
incrementalFromPosArg = replication.EncodePosition(incrementalFromPos)
Expand All @@ -1216,8 +1250,9 @@ func testReplicaIncrementalBackup(t *testing.T, replica *cluster.Vttablet, incre
}
require.NoErrorf(t, err, "output: %v", output)

backups, err := localCluster.ListBackups(shardKsName)
require.NoError(t, err)
backups := waitForNumBackups(t, numBackups+1)
require.NotEmptyf(t, backups, "output: %v", output)

verifyTabletBackupStats(t, replica.VttabletProcess.GetVars())
backupName = backups[len(backups)-1]
backupLocation := localCluster.CurrentVTDATAROOT + "/backups/" + shardKsName + "/" + backupName
Expand Down
24 changes: 20 additions & 4 deletions go/test/endtoend/backup/vtctlbackup/pitr_test_framework.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,14 @@ func ExecTestIncrementalBackupAndRestoreToPos(t *testing.T, tcase *PITRTestCase)
{
name: "first incremental backup",
},
{
name: "fail1",
expectError: "no binary logs to backup",
},
{
name: "fail2",
expectError: "no binary logs to backup",
},
{
name: "make writes, succeed",
writeBeforeBackup: true,
Expand Down Expand Up @@ -170,10 +178,10 @@ func ExecTestIncrementalBackupAndRestoreToPos(t *testing.T, tcase *PITRTestCase)
if tc.writeBeforeBackup {
InsertRowOnPrimary(t, "")
}
// we wait for 1 second because backups are written to a directory named after the current timestamp,
// we wait for >1 second because backups are written to a directory named after the current timestamp,
// in 1 second resolution. We want to avoid two backups that have the same pathname. Realistically this
// is only ever a problem in this end-to-end test, not in production.
// Also, we gie the replica a chance to catch up.
// Also, we give the replica a chance to catch up.
time.Sleep(postWriteSleepDuration)
// randomly flush binary logs 0, 1 or 2 times
FlushBinaryLogsOnReplica(t, 0, rand.Intn(3))
Expand Down Expand Up @@ -295,6 +303,14 @@ func ExecTestIncrementalBackupAndRestoreToTimestamp(t *testing.T, tcase *PITRTes
{
name: "first incremental backup",
},
{
name: "fail1",
expectError: "no binary logs to backup",
},
{
name: "fail2",
expectError: "no binary logs to backup",
},
{
name: "make writes, succeed",
writeBeforeBackup: true,
Expand Down Expand Up @@ -333,10 +349,10 @@ func ExecTestIncrementalBackupAndRestoreToTimestamp(t *testing.T, tcase *PITRTes
if tc.writeBeforeBackup {
insertRowOnPrimary(t, "")
}
// we wait for 1 second because backups are written to a directory named after the current timestamp,
// we wait for >1 second because backups are written to a directory named after the current timestamp,
// in 1 second resolution. We want to avoid two backups that have the same pathname. Realistically this
// is only ever a problem in this end-to-end test, not in production.
// Also, we gie the replica a chance to catch up.
// Also, we give the replica a chance to catch up.
time.Sleep(postWriteSleepDuration)
waitForReplica(t, 0)
rowsBeforeBackup := ReadRowsFromReplica(t, 0)
Expand Down
1 change: 1 addition & 0 deletions go/vt/mysqlctl/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,7 @@ func Backup(ctx context.Context, params BackupParams) error {
if err != nil {
return vterrors.Wrap(err, "StartBackup failed")
}
params.Logger.Infof("Starting backup %v", bh.Name())
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We really want the backup process to print out the name of the backup. It's so useful. I'd also look into formalizing it into something that is machine readable rather than human readable.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@shlomi-noach Would this have potential to create (a lot) of logging noise?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@dbussink it's a single additional line per backup. A full backup usually has hundreds of log lines. An incremental backup usually has a 5-6 log lines. One extra line is nothing.


// Scope stats to selected backup engine.
beParams := params.Copy()
Expand Down
4 changes: 2 additions & 2 deletions go/vt/mysqlctl/builtinbackupengine.go
Original file line number Diff line number Diff line change
Expand Up @@ -328,7 +328,7 @@ func (be *BuiltinBackupEngine) executeIncrementalBackup(ctx context.Context, par
// everything that's ever been applied, and a subset of that is gtid_purged, which are the event no longer available in binary logs.
// When we consider Vitess incremental backups, what's important for us is "what's the GTIDSet that's true when this backup was taken,
// and which will be true when we restore this backup". The answer to this is the GTIDSet that includes the purged GTIDs.
// It's also nice for icnremental backups that are taken on _other_ tablets, so that they don't need to understand what exactly was purged
// It's also nice for incremental backups that are taken on _other_ tablets, so that they don't need to understand what exactly was purged
// on _this_ tablet. They don't care, all they want to know is "what GTIDSet can we get from this".
incrementalBackupToPosition.GTIDSet = incrementalBackupToPosition.GTIDSet.Union(gtidPurged.GTIDSet)
req := &mysqlctl.ReadBinlogFilesTimestampsRequest{}
Expand All @@ -345,7 +345,7 @@ func (be *BuiltinBackupEngine) executeIncrementalBackup(ctx context.Context, par
return false, vterrors.Wrapf(err, "reading timestamps from binlog files %v", binaryLogsToBackup)
}
if resp.FirstTimestampBinlog == "" || resp.LastTimestampBinlog == "" {
return false, vterrors.Wrapf(err, "empty binlog name in response. Request=%v, Response=%v", req, resp)
return false, vterrors.Errorf(vtrpc.Code_ABORTED, "empty binlog name in response. Request=%v, Response=%v", req, resp)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was a logical error. The previous vterrors.Wrapf used to return nil, because err was nil. This change ensures to return a non-nil error.

}
incrDetails := &IncrementalBackupDetails{
FirstTimestamp: FormatRFC3339(logutil.ProtoToTime(resp.FirstTimestamp)),
Expand Down
109 changes: 56 additions & 53 deletions go/vt/mysqlctl/mysqld.go
Original file line number Diff line number Diff line change
Expand Up @@ -1321,6 +1321,60 @@ func parseBinlogEntryTimestamp(logEntry string) (found bool, t time.Time, err er
return false, t, nil
}

// scanBinlogTimestamp invokes a `mysqlbinlog` binary to look for a timestamp in the given binary. The function
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

mostly just refactored this function outside of ReadBinlogFilesTimestamps, but also changed a bit to ensure the termination of the mysqlbinlog process as I've found that in GitHub CI it might be slow to terminate and still holds on to the binlog file, which then conflicts with the next access to the file.

// either looks for the first such timestamp or the last.
func (mysqld *Mysqld) scanBinlogTimestamp(mysqlbinlogDir string, mysqlbinlogEnv []string, mysqlbinlogName string, binlogFile string, stopAtFirst bool) (matchedTime time.Time, matchFound bool, err error) {
args := []string{binlogFile}
mysqlbinlogCmd := exec.Command(mysqlbinlogName, args...)
mysqlbinlogCmd.Dir = mysqlbinlogDir
mysqlbinlogCmd.Env = mysqlbinlogEnv
log.Infof("ApplyBinlogFile: running mysqlbinlog command: %#v", mysqlbinlogCmd)
pipe, err := mysqlbinlogCmd.StdoutPipe() // to be piped into mysql
if err != nil {
return matchedTime, false, err
}
scanComplete := make(chan error)
intentionalKill := false
scan := func() {
defer close(scanComplete)
defer func() {
intentionalKill = true
mysqlbinlogCmd.Process.Kill() // ensures the binlog file is released
}()
// Read line by line and process it
scanner := bufio.NewScanner(pipe)
for scanner.Scan() {
logEntry := scanner.Text()

found, t, err := parseBinlogEntryTimestamp(logEntry)
if err != nil {
scanComplete <- err
return
}
if found {
matchedTime = t
matchFound = true
}
if found && stopAtFirst {
// Found the first timestamp and it's all we need. We won't scan any further and so we should also
// kill mysqlbinlog (otherwise it keeps waiting until we've read the entire pipe).
return
}
}
}
if err := mysqlbinlogCmd.Start(); err != nil {
return matchedTime, false, err
}
go scan()
if err := mysqlbinlogCmd.Wait(); err != nil && !intentionalKill {
return matchedTime, false, vterrors.Wrapf(err, "waiting on mysqlbinlog command in ReadBinlogFilesTimestamps")
}
if err := <-scanComplete; err != nil {
return matchedTime, false, vterrors.Wrapf(err, "scanning mysqlbinlog output in ReadBinlogFilesTimestamps ")
}
return matchedTime, matchFound, nil
}

// ReadBinlogFilesTimestamps reads all given binlog files via `mysqlbinlog` command and returns the first and last found transaction timestamps
func (mysqld *Mysqld) ReadBinlogFilesTimestamps(ctx context.Context, req *mysqlctlpb.ReadBinlogFilesTimestampsRequest) (*mysqlctlpb.ReadBinlogFilesTimestampsResponse, error) {
if len(req.BinlogFileNames) == 0 {
Expand All @@ -1335,8 +1389,6 @@ func (mysqld *Mysqld) ReadBinlogFilesTimestamps(ctx context.Context, req *mysqlc
defer client.Close()
return client.ReadBinlogFilesTimestamps(ctx, req)
}
var mysqlbinlogCmd *exec.Cmd

dir, err := vtenv.VtMysqlRoot()
if err != nil {
return nil, err
Expand All @@ -1350,59 +1402,10 @@ func (mysqld *Mysqld) ReadBinlogFilesTimestamps(ctx context.Context, req *mysqlc
return nil, err
}

scanTimestamp := func(binlogFile string, stopAtFirst bool) (matchedTime time.Time, matchFound bool, err error) {
args := []string{binlogFile}
mysqlbinlogCmd = exec.Command(mysqlbinlogName, args...)
mysqlbinlogCmd.Dir = dir
mysqlbinlogCmd.Env = env
log.Infof("ApplyBinlogFile: running mysqlbinlog command: %#v", mysqlbinlogCmd)
pipe, err := mysqlbinlogCmd.StdoutPipe() // to be piped into mysql
if err != nil {
return matchedTime, false, err
}
scanner := bufio.NewScanner(pipe)
scanComplete := make(chan error)
intentionalKill := false
scan := func() {
defer close(scanComplete)
// Read line by line and process it
for scanner.Scan() {
logEntry := scanner.Text()

found, t, err := parseBinlogEntryTimestamp(logEntry)
if err != nil {
scanComplete <- err
return
}
if found {
matchedTime = t
matchFound = true
}
if found && stopAtFirst {
// Found the first timestamp and it's all we need. We won't scan any further and so we should also
// kill mysqlbinlog (otherwise it keeps waiting until we've read the entire pipe).
intentionalKill = true
mysqlbinlogCmd.Process.Kill()
return
}
}
}
if err := mysqlbinlogCmd.Start(); err != nil {
return matchedTime, false, err
}
go scan()
if err := mysqlbinlogCmd.Wait(); err != nil && !intentionalKill {
return matchedTime, false, vterrors.Wrapf(err, "waiting on mysqlbinlog command in ReadBinlogFilesTimestamps")
}
if err := <-scanComplete; err != nil {
return matchedTime, false, vterrors.Wrapf(err, "scanning mysqlbinlog output in ReadBinlogFilesTimestamps ")
}
return matchedTime, matchFound, nil
}
resp := &mysqlctlpb.ReadBinlogFilesTimestampsResponse{}
// Find first timestamp
for _, binlogFile := range req.BinlogFileNames {
t, found, err := scanTimestamp(binlogFile, true)
t, found, err := mysqld.scanBinlogTimestamp(dir, env, mysqlbinlogName, binlogFile, true)
if err != nil {
return nil, err
}
Expand All @@ -1415,7 +1418,7 @@ func (mysqld *Mysqld) ReadBinlogFilesTimestamps(ctx context.Context, req *mysqlc
// Find last timestamp
for i := len(req.BinlogFileNames) - 1; i >= 0; i-- {
binlogFile := req.BinlogFileNames[i]
t, found, err := scanTimestamp(binlogFile, false)
t, found, err := mysqld.scanBinlogTimestamp(dir, env, mysqlbinlogName, binlogFile, false)
if err != nil {
return nil, err
}
Expand Down