diff --git a/changelog/20.0/20.0.0/summary.md b/changelog/20.0/20.0.0/summary.md
index 9421018bc9c..7d62c9c8b52 100644
--- a/changelog/20.0/20.0.0/summary.md
+++ b/changelog/20.0/20.0.0/summary.md
@@ -26,6 +26,7 @@
- **[Flag changes](#flag-changes)**
- [`pprof-http` default change](#pprof-http-default)
- [New `healthcheck-dial-concurrency` flag](#healthcheck-dial-concurrency-flag)
+ - [New minimum for `--buffer_min_time_between_failovers`](#buffer_min_time_between_failovers-flag)
- [New `track-udfs` vtgate flag](#vtgate-track-udfs-flag)
- **[Minor Changes](#minor-changes)**
- **[New Stats](#new-stats)**
@@ -214,6 +215,10 @@ To continue enabling these endpoints, explicitly set `--pprof-http` when startin
The new `--healthcheck-dial-concurrency` flag defines the maximum number of healthcheck connections that can open concurrently. This limit is to avoid hitting Go runtime panics on deployments watching enough tablets [to hit the runtime's maximum thread limit of `10000`](https://pkg.go.dev/runtime/debug#SetMaxThreads) due to blocking network syscalls. This flag applies to `vtcombo`, `vtctld` and `vtgate` only and a value less than the runtime max thread limit _(`10000`)_ is recommended.
+#### New minimum for `--buffer_min_time_between_failovers`
+
+The `--buffer_min_time_between_failovers` `vtgate` flag now has a minimum value of `1s`. This is because a value of 0 can cause issues with the buffering mechanics, resulting in unexpected and unnecessary query errors, in particular during `MoveTables SwitchTraffic` operations. If you are currently specifying a value of 0 for this flag, you will need to update the config value to 1s *prior to upgrading to v20 or later*, as `vtgate` will report an error and terminate if you attempt to start it with a value of 0.
+
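+As a reference, the check that enforces the new minimum (added to `go/vt/vtgate/buffer/flags.go` in this change) is:
+
+```go
+if bufferMinTimeBetweenFailovers < 1*time.Second {
+	return fmt.Errorf("--buffer_min_time_between_failovers must be >= 1s (specified value: %v)", bufferMinTimeBetweenFailovers)
+}
+```
+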
#### New `--track-udfs` vtgate flag
The new `--track-udfs` flag enables VTGate to track user defined functions for better planning.
diff --git a/go/test/endtoend/vreplication/cluster_test.go b/go/test/endtoend/vreplication/cluster_test.go
index b88e919e5ca..ce8c0bc916c 100644
--- a/go/test/endtoend/vreplication/cluster_test.go
+++ b/go/test/endtoend/vreplication/cluster_test.go
@@ -56,8 +56,9 @@ var (
sidecarDBIdentifier = sqlparser.String(sqlparser.NewIdentifierCS(sidecarDBName))
mainClusterConfig *ClusterConfig
externalClusterConfig *ClusterConfig
- extraVTGateArgs = []string{"--tablet_refresh_interval", "10ms", "--enable_buffer", "--buffer_window", loadTestBufferingWindowDurationStr,
- "--buffer_size", "100000", "--buffer_min_time_between_failovers", "0s", "--buffer_max_failover_duration", loadTestBufferingWindowDurationStr}
+ extraVTGateArgs = []string{"--tablet_refresh_interval", "10ms", "--enable_buffer", "--buffer_window", loadTestBufferingWindowDuration.String(),
+ "--buffer_size", "250000", "--buffer_min_time_between_failovers", "1s", "--buffer_max_failover_duration", loadTestBufferingWindowDuration.String(),
+ "--buffer_drain_concurrency", "10"}
extraVtctldArgs = []string{"--remote_operation_timeout", "600s", "--topo_etcd_lease_ttl", "120"}
// This variable can be used within specific tests to alter vttablet behavior
extraVTTabletArgs = []string{}
diff --git a/go/test/endtoend/vreplication/helper_test.go b/go/test/endtoend/vreplication/helper_test.go
index b5310822d0c..63cd4934273 100644
--- a/go/test/endtoend/vreplication/helper_test.go
+++ b/go/test/endtoend/vreplication/helper_test.go
@@ -18,17 +18,18 @@ package vreplication
import (
"context"
- "crypto/rand"
"encoding/hex"
"encoding/json"
"fmt"
"io"
+ "math/rand"
"net/http"
"os"
"os/exec"
"regexp"
"sort"
"strings"
+ "sync"
"sync/atomic"
"testing"
"time"
@@ -121,9 +122,10 @@ func getConnectionNoError(t *testing.T, hostname string, port int) *mysql.Conn {
func getConnection(t *testing.T, hostname string, port int) *mysql.Conn {
vtParams := mysql.ConnParams{
- Host: hostname,
- Port: port,
- Uname: "vt_dba",
+ Host: hostname,
+ Port: port,
+ Uname: "vt_dba",
+ ConnectTimeoutMs: 1000,
}
ctx := context.Background()
conn, err := mysql.Connect(ctx, &vtParams)
@@ -803,92 +805,111 @@ func getRowCount(t *testing.T, vtgateConn *mysql.Conn, table string) int {
}
const (
- loadTestBufferingWindowDurationStr = "30s"
- loadTestPostBufferingInsertWindow = 60 * time.Second // should be greater than loadTestBufferingWindowDurationStr
- loadTestWaitForCancel = 30 * time.Second
- loadTestWaitBetweenQueries = 2 * time.Millisecond
+ loadTestBufferingWindowDuration = 10 * time.Second
+ loadTestAvgWaitBetweenQueries = 500 * time.Microsecond
+ loadTestDefaultConnections = 100
)
type loadGenerator struct {
- t *testing.T
- vc *VitessCluster
- ctx context.Context
- cancel context.CancelFunc
+ t *testing.T
+ vc *VitessCluster
+ ctx context.Context
+ cancel context.CancelFunc
+ connections int
+ wg sync.WaitGroup
}
func newLoadGenerator(t *testing.T, vc *VitessCluster) *loadGenerator {
return &loadGenerator{
- t: t,
- vc: vc,
+ t: t,
+ vc: vc,
+ connections: loadTestDefaultConnections,
}
}
func (lg *loadGenerator) stop() {
- time.Sleep(loadTestPostBufferingInsertWindow) // wait for buffering to stop and additional records to be inserted by startLoad after traffic is switched
+ // Wait for buffering to stop and additional records to be inserted by start()
+ // after traffic is switched.
+ time.Sleep(loadTestBufferingWindowDuration * 2)
log.Infof("Canceling load")
lg.cancel()
- time.Sleep(loadTestWaitForCancel) // wait for cancel to take effect
+ lg.wg.Wait()
}
func (lg *loadGenerator) start() {
t := lg.t
lg.ctx, lg.cancel = context.WithCancel(context.Background())
+ var connectionCount atomic.Int64
var id int64
- log.Infof("startLoad: starting")
+ log.Infof("loadGenerator: starting")
queryTemplate := "insert into loadtest(id, name) values (%d, 'name-%d')"
var totalQueries, successfulQueries int64
var deniedErrors, ambiguousErrors, reshardedErrors, tableNotFoundErrors, otherErrors int64
+ lg.wg.Add(1)
defer func() {
-
- log.Infof("startLoad: totalQueries: %d, successfulQueries: %d, deniedErrors: %d, ambiguousErrors: %d, reshardedErrors: %d, tableNotFoundErrors: %d, otherErrors: %d",
+ defer lg.wg.Done()
+ log.Infof("loadGenerator: totalQueries: %d, successfulQueries: %d, deniedErrors: %d, ambiguousErrors: %d, reshardedErrors: %d, tableNotFoundErrors: %d, otherErrors: %d",
totalQueries, successfulQueries, deniedErrors, ambiguousErrors, reshardedErrors, tableNotFoundErrors, otherErrors)
}()
- logOnce := true
for {
select {
case <-lg.ctx.Done():
- log.Infof("startLoad: context cancelled")
- log.Infof("startLoad: deniedErrors: %d, ambiguousErrors: %d, reshardedErrors: %d, tableNotFoundErrors: %d, otherErrors: %d",
+ log.Infof("loadGenerator: context cancelled")
+ log.Infof("loadGenerator: deniedErrors: %d, ambiguousErrors: %d, reshardedErrors: %d, tableNotFoundErrors: %d, otherErrors: %d",
deniedErrors, ambiguousErrors, reshardedErrors, tableNotFoundErrors, otherErrors)
require.Equal(t, int64(0), deniedErrors)
require.Equal(t, int64(0), otherErrors)
+ require.Equal(t, int64(0), reshardedErrors)
require.Equal(t, totalQueries, successfulQueries)
return
default:
- go func() {
- conn := vc.GetVTGateConn(t)
- defer conn.Close()
- atomic.AddInt64(&id, 1)
- query := fmt.Sprintf(queryTemplate, id, id)
- _, err := conn.ExecuteFetch(query, 1, false)
- atomic.AddInt64(&totalQueries, 1)
- if err != nil {
- sqlErr := err.(*sqlerror.SQLError)
- if strings.Contains(strings.ToLower(err.Error()), "denied tables") {
- log.Infof("startLoad: denied tables error executing query: %d:%v", sqlErr.Number(), err)
- atomic.AddInt64(&deniedErrors, 1)
- } else if strings.Contains(strings.ToLower(err.Error()), "ambiguous") {
- // this can happen when a second keyspace is setup with the same tables, but there are no routing rules
- // set yet by MoveTables. So we ignore these errors.
- atomic.AddInt64(&ambiguousErrors, 1)
- } else if strings.Contains(strings.ToLower(err.Error()), "current keyspace is being resharded") {
- atomic.AddInt64(&reshardedErrors, 1)
- } else if strings.Contains(strings.ToLower(err.Error()), "not found") {
- atomic.AddInt64(&tableNotFoundErrors, 1)
- } else {
- if logOnce {
- log.Infof("startLoad: error executing query: %d:%v", sqlErr.Number(), err)
- logOnce = false
+ if int(connectionCount.Load()) < lg.connections {
+ connectionCount.Add(1)
+ lg.wg.Add(1)
+ go func() {
+ defer lg.wg.Done()
+ defer connectionCount.Add(-1)
+ conn := vc.GetVTGateConn(t)
+ defer conn.Close()
+ for {
+ select {
+ case <-lg.ctx.Done():
+ return
+ default:
}
- atomic.AddInt64(&otherErrors, 1)
+ newID := atomic.AddInt64(&id, 1)
+ query := fmt.Sprintf(queryTemplate, newID, newID)
+ _, err := conn.ExecuteFetch(query, 1, false)
+ atomic.AddInt64(&totalQueries, 1)
+ if err != nil {
+ sqlErr := err.(*sqlerror.SQLError)
+ if strings.Contains(strings.ToLower(err.Error()), "denied tables") {
+ if debugMode {
+ t.Logf("loadGenerator: denied tables error executing query: %d:%v", sqlErr.Number(), err)
+ }
+ atomic.AddInt64(&deniedErrors, 1)
+ } else if strings.Contains(strings.ToLower(err.Error()), "ambiguous") {
+ // This can happen when a second keyspace is set up with the same tables, but
+ // there are no routing rules set yet by MoveTables. So we ignore these errors.
+ atomic.AddInt64(&ambiguousErrors, 1)
+ } else if strings.Contains(strings.ToLower(err.Error()), "current keyspace is being resharded") {
+ atomic.AddInt64(&reshardedErrors, 1)
+ } else if strings.Contains(strings.ToLower(err.Error()), "not found") {
+ atomic.AddInt64(&tableNotFoundErrors, 1)
+ } else {
+ if debugMode {
+ t.Logf("loadGenerator: error executing query: %d:%v", sqlErr.Number(), err)
+ }
+ atomic.AddInt64(&otherErrors, 1)
+ }
+ } else {
+ atomic.AddInt64(&successfulQueries, 1)
+ }
+ time.Sleep(time.Duration(float64(loadTestAvgWaitBetweenQueries) * rand.Float64()))
}
- time.Sleep(loadTestWaitBetweenQueries)
- } else {
- atomic.AddInt64(&successfulQueries, 1)
- }
- }()
- time.Sleep(loadTestWaitBetweenQueries)
+ }()
+ }
}
}
}
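The rewritten load generator above replaces the old goroutine-per-query model with a bounded worker pool: an atomic counter caps the number of in-flight connections, a `sync.WaitGroup` lets `stop()` wait for every worker to exit, and each worker loops until the shared context is canceled. A minimal, self-contained sketch of that pattern, with the per-query work elided behind a hypothetical `doWork` callback (the small sleep when the pool is full is an addition to avoid busy-waiting):

```go
package loadgen

import (
	"context"
	"sync"
	"sync/atomic"
	"time"
)

// runLoad keeps at most maxWorkers workers in flight until ctx is
// canceled. doWork is a hypothetical stand-in for executing a single
// query against vtgate.
func runLoad(ctx context.Context, maxWorkers int64, doWork func(context.Context)) {
	var (
		wg      sync.WaitGroup
		current atomic.Int64
	)
	for {
		select {
		case <-ctx.Done():
			wg.Wait() // let every in-flight worker observe the cancellation
			return
		default:
			if current.Load() < maxWorkers {
				current.Add(1)
				wg.Add(1)
				go func() {
					defer wg.Done()
					defer current.Add(-1)
					for ctx.Err() == nil {
						doWork(ctx)
					}
				}()
			} else {
				time.Sleep(time.Millisecond) // avoid a hot spin once the pool is full
			}
		}
	}
}
```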
diff --git a/go/test/endtoend/vreplication/movetables_buffering_test.go b/go/test/endtoend/vreplication/movetables_buffering_test.go
index a977320ec4a..f456c32bfd5 100644
--- a/go/test/endtoend/vreplication/movetables_buffering_test.go
+++ b/go/test/endtoend/vreplication/movetables_buffering_test.go
@@ -2,6 +2,7 @@ package vreplication
import (
"testing"
+ "time"
"github.com/stretchr/testify/require"
@@ -33,8 +34,12 @@ func TestMoveTablesBuffering(t *testing.T) {
catchup(t, targetTab2, workflowName, "MoveTables")
vdiffSideBySide(t, ksWorkflow, "")
waitForLowLag(t, "customer", workflowName)
- tstWorkflowSwitchReads(t, "", "")
- tstWorkflowSwitchWrites(t)
+ for i := 0; i < 10; i++ {
+ tstWorkflowSwitchReadsAndWrites(t)
+ time.Sleep(loadTestBufferingWindowDuration + 1*time.Second)
+ tstWorkflowReverseReadsAndWrites(t)
+ time.Sleep(loadTestBufferingWindowDuration + 1*time.Second)
+ }
log.Infof("SwitchWrites done")
lg.stop()
diff --git a/go/test/endtoend/vreplication/partial_movetables_test.go b/go/test/endtoend/vreplication/partial_movetables_test.go
index 2f0c7c71d29..4236bff95a3 100644
--- a/go/test/endtoend/vreplication/partial_movetables_test.go
+++ b/go/test/endtoend/vreplication/partial_movetables_test.go
@@ -20,6 +20,7 @@ import (
"fmt"
"strings"
"testing"
+ "time"
binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
@@ -67,10 +68,12 @@ func testCancel(t *testing.T) {
mt.SwitchReadsAndWrites()
checkDenyList(targetKeyspace, false)
checkDenyList(sourceKeyspace, true)
+ time.Sleep(loadTestBufferingWindowDuration + 1*time.Second)
mt.ReverseReadsAndWrites()
checkDenyList(targetKeyspace, true)
checkDenyList(sourceKeyspace, false)
+ time.Sleep(loadTestBufferingWindowDuration + 1*time.Second)
mt.Cancel()
checkDenyList(targetKeyspace, false)
@@ -123,6 +126,7 @@ func testPartialMoveTablesBasic(t *testing.T, flavor workflowFlavor) {
catchup(t, targetTab80Dash, workflowName, "MoveTables")
vdiff(t, targetKeyspace, workflowName, defaultCellName, false, true, nil)
mt.SwitchReadsAndWrites()
+ time.Sleep(loadTestBufferingWindowDuration + 1*time.Second)
mt.Complete()
emptyGlobalRoutingRules := "{}\n"
@@ -246,6 +250,7 @@ func testPartialMoveTablesBasic(t *testing.T, flavor workflowFlavor) {
// Switch all traffic for the shard
mt80Dash.SwitchReadsAndWrites()
+ time.Sleep(loadTestBufferingWindowDuration + 1*time.Second)
// Confirm global routing rules -- everything should still be routed
// to the source side, customer, globally.
@@ -331,6 +336,7 @@ func testPartialMoveTablesBasic(t *testing.T, flavor workflowFlavor) {
catchup(t, targetTabDash80, workflowName, "MoveTables")
vdiff(t, targetKeyspace, workflowName, defaultCellName, false, true, nil)
mtDash80.SwitchReadsAndWrites()
+ time.Sleep(loadTestBufferingWindowDuration + 1*time.Second)
// Confirm global routing rules: everything should still be routed
// to the source side, customer, globally.
diff --git a/go/vt/discovery/keyspace_events.go b/go/vt/discovery/keyspace_events.go
index 014284ed5ee..9fa457c1589 100644
--- a/go/vt/discovery/keyspace_events.go
+++ b/go/vt/discovery/keyspace_events.go
@@ -21,6 +21,7 @@ import (
"fmt"
"sync"
+ "golang.org/x/sync/errgroup"
"google.golang.org/protobuf/proto"
"vitess.io/vitess/go/vt/key"
@@ -93,18 +94,8 @@ func NewKeyspaceEventWatcher(ctx context.Context, topoServer srvtopo.Server, hc
return kew
}
-type MoveTablesStatus int
-
-const (
- MoveTablesUnknown MoveTablesStatus = iota
- // MoveTablesSwitching is set when the write traffic is the middle of being switched from the source to the target
- MoveTablesSwitching
- // MoveTablesSwitched is set when write traffic has been completely switched to the target
- MoveTablesSwitched
-)
-
// keyspaceState is the internal state for all the keyspaces that the KEW is
-// currently watching
+// currently watching.
type keyspaceState struct {
kew *KeyspaceEventWatcher
keyspace string
@@ -120,7 +111,7 @@ type keyspaceState struct {
moveTablesState *MoveTablesState
}
-// Format prints the internal state for this keyspace for debug purposes
+// Format prints the internal state for this keyspace for debug purposes.
func (kss *keyspaceState) Format(f fmt.State, verb rune) {
kss.mu.Lock()
defer kss.mu.Unlock()
@@ -137,9 +128,9 @@ func (kss *keyspaceState) Format(f fmt.State, verb rune) {
fmt.Fprintf(f, "]\n")
}
-// beingResharded returns whether this keyspace is thought to be in the middle of a resharding
-// operation. currentShard is the name of the shard that belongs to this keyspace and which
-// we are trying to access. currentShard can _only_ be a primary shard.
+// beingResharded returns whether this keyspace is thought to be in the middle of a
+// resharding operation. currentShard is the name of the shard that belongs to this
+// keyspace and which we are trying to access. currentShard can _only_ be a primary shard.
func (kss *keyspaceState) beingResharded(currentShard string) bool {
kss.mu.Lock()
defer kss.mu.Unlock()
@@ -179,11 +170,19 @@ type shardState struct {
currentPrimary *topodatapb.TabletAlias
}
-// Subscribe returns a channel that will receive any KeyspaceEvents for all keyspaces in the current cell
+// Subscribe returns a channel that will receive any KeyspaceEvents for all keyspaces in the
+// current cell.
func (kew *KeyspaceEventWatcher) Subscribe() chan *KeyspaceEvent {
kew.subsMu.Lock()
defer kew.subsMu.Unlock()
- c := make(chan *KeyspaceEvent, 2)
+ // Use a decent-sized buffer to:
+ // 1. Avoid blocking the KEW
+ // 2. Avoid losing/missing any events
+ // 3. Process events in the order received
+ // TODO: do we care about intermediate events?
+ // If not, then we could instead e.g. pull the first/oldest event
+ // from the channel, discard it, and add the current/latest.
+ c := make(chan *KeyspaceEvent, 10)
kew.subs[c] = struct{}{}
return c
}
@@ -195,14 +194,11 @@ func (kew *KeyspaceEventWatcher) Unsubscribe(c chan *KeyspaceEvent) {
delete(kew.subs, c)
}
-func (kew *KeyspaceEventWatcher) broadcast(th *KeyspaceEvent) {
+func (kew *KeyspaceEventWatcher) broadcast(ev *KeyspaceEvent) {
kew.subsMu.Lock()
defer kew.subsMu.Unlock()
for c := range kew.subs {
- select {
- case c <- th:
- default:
- }
+ c <- ev
}
}
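The two changes above work together: `Subscribe` now hands each subscriber a buffered channel (size 10 instead of 2), and `broadcast` blocks instead of silently dropping an event when a subscriber's buffer is full. The trade-off is that no keyspace event is ever lost and ordering is preserved, but a stalled subscriber can now stall the watcher. A condensed sketch of the resulting semantics, with `Event` as a hypothetical stand-in for `KeyspaceEvent`:

```go
package keyspacewatch

import "sync"

// Event is a hypothetical stand-in for discovery.KeyspaceEvent.
type Event struct{}

type watcher struct {
	mu   sync.Mutex
	subs map[chan *Event]struct{}
}

// Subscribe registers a buffered channel; the buffer absorbs bursts
// while keeping events ordered.
func (w *watcher) Subscribe() chan *Event {
	w.mu.Lock()
	defer w.mu.Unlock()
	c := make(chan *Event, 10)
	w.subs[c] = struct{}{}
	return c
}

// broadcast delivers ev to every subscriber. The send is blocking, so
// events are never dropped, at the cost of backpressure on the watcher
// if a subscriber stops draining its channel.
func (w *watcher) broadcast(ev *Event) {
	w.mu.Lock()
	defer w.mu.Unlock()
	for c := range w.subs {
		c <- ev
	}
}
```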
@@ -240,7 +236,8 @@ func (kew *KeyspaceEventWatcher) run(ctx context.Context) {
}
// ensureConsistentLocked checks if the current keyspace has recovered from an availability
-// event, and if so, returns information about the availability event to all subscribers
+// event, and if so, returns information about the availability event to all subscribers.
+// Note: you MUST be holding the ks.mu when calling this function.
func (kss *keyspaceState) ensureConsistentLocked() {
// if this keyspace is consistent, there's no ongoing availability event
if kss.consistent {
@@ -285,7 +282,8 @@ func (kss *keyspaceState) ensureConsistentLocked() {
}
}
- // clone the current moveTablesState, if any, to handle race conditions where it can get updated while we're broadcasting
+ // Clone the current moveTablesState, if any, to handle race conditions where it can get
+ // updated while we're broadcasting.
var moveTablesState MoveTablesState
if kss.moveTablesState != nil {
moveTablesState = *kss.moveTablesState
@@ -312,8 +310,8 @@ func (kss *keyspaceState) ensureConsistentLocked() {
Serving: sstate.serving,
})
- log.Infof("keyspace event resolved: %s/%s is now consistent (serving: %v)",
- sstate.target.Keyspace, sstate.target.Keyspace,
+ log.Infof("keyspace event resolved: %s is now consistent (serving: %t)",
+ topoproto.KeyspaceShardString(sstate.target.Keyspace, sstate.target.Shard),
sstate.serving,
)
@@ -325,9 +323,10 @@ func (kss *keyspaceState) ensureConsistentLocked() {
kss.kew.broadcast(ksevent)
}
-// onHealthCheck is the callback that updates this keyspace with event data from the HealthCheck stream.
-// the HealthCheck stream applies to all the keyspaces in the cluster and emits TabletHealth events to our
-// parent KeyspaceWatcher, which will mux them into their corresponding keyspaceState
+// onHealthCheck is the callback that updates this keyspace with event data from the HealthCheck
+// stream. The HealthCheck stream applies to all the keyspaces in the cluster and emits
+// TabletHealth events to our parent KeyspaceWatcher, which will mux them into their
+// corresponding keyspaceState.
func (kss *keyspaceState) onHealthCheck(th *TabletHealth) {
// we only care about health events on the primary
if th.Target.TabletType != topodatapb.TabletType_PRIMARY {
@@ -371,6 +370,17 @@ func (kss *keyspaceState) onHealthCheck(th *TabletHealth) {
kss.ensureConsistentLocked()
}
+type MoveTablesStatus int
+
+const (
+ MoveTablesUnknown MoveTablesStatus = iota
+ // MoveTablesSwitching is set when the write traffic is in the middle of being switched from
+ // the source to the target.
+ MoveTablesSwitching
+ // MoveTablesSwitched is set when write traffic has been completely switched to the target.
+ MoveTablesSwitched
+)
+
type MoveTablesType int
const (
@@ -384,33 +394,66 @@ type MoveTablesState struct {
State MoveTablesStatus
}
+func (mts MoveTablesState) String() string {
+ var typ, state string
+ switch mts.Typ {
+ case MoveTablesRegular:
+ typ = "Regular"
+ case MoveTablesShardByShard:
+ typ = "ShardByShard"
+ default:
+ typ = "None"
+ }
+ switch mts.State {
+ case MoveTablesSwitching:
+ state = "Switching"
+ case MoveTablesSwitched:
+ state = "Switched"
+ default:
+ state = "Unknown"
+ }
+ return fmt.Sprintf("{Type: %s, State: %s}", typ, state)
+}
+
func (kss *keyspaceState) getMoveTablesStatus(vs *vschemapb.SrvVSchema) (*MoveTablesState, error) {
mtState := &MoveTablesState{
Typ: MoveTablesNone,
State: MoveTablesUnknown,
}
- // if there are no routing rules defined, then movetables is not in progress, exit early
+ // If there are no routing rules defined, then movetables is not in progress, exit early.
if len(vs.GetRoutingRules().GetRules()) == 0 && len(vs.GetShardRoutingRules().GetRules()) == 0 {
return mtState, nil
}
shortCtx, cancel := context.WithTimeout(context.Background(), topo.RemoteOperationTimeout)
defer cancel()
- ts, _ := kss.kew.ts.GetTopoServer()
-
- // collect all current shard information from the topo
+ ts, err := kss.kew.ts.GetTopoServer()
+ if err != nil {
+ return mtState, err
+ }
+ // Collect all current shard information from the topo.
var shardInfos []*topo.ShardInfo
+ mu := sync.Mutex{}
+ eg, ectx := errgroup.WithContext(shortCtx)
for _, sstate := range kss.shards {
- si, err := ts.GetShard(shortCtx, kss.keyspace, sstate.target.Shard)
- if err != nil {
- return nil, err
- }
- shardInfos = append(shardInfos, si)
+ eg.Go(func() error {
+ si, err := ts.GetShard(ectx, kss.keyspace, sstate.target.Shard)
+ if err != nil {
+ return err
+ }
+ mu.Lock()
+ defer mu.Unlock()
+ shardInfos = append(shardInfos, si)
+ return nil
+ })
+ }
+ if err := eg.Wait(); err != nil {
+ return mtState, err
}
- // check if any shard has denied tables and if so, record one of these to check where it currently points to
- // using the (shard) routing rules
+ // Check if any shard has denied tables and if so, record one of these to check where it
+ // currently points to using the (shard) routing rules.
var shardsWithDeniedTables []string
var oneDeniedTable string
for _, si := range shardInfos {
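The shard fetch above was previously serialized (one `GetShard` call per loop iteration) and is now fanned out with `errgroup`: each shard is fetched concurrently under a shared context that is canceled on the first error, with results appended under a mutex. A standalone sketch of that pattern, with `fetchShard` as a hypothetical stand-in for `ts.GetShard`:

```go
package fanout

import (
	"context"
	"sync"

	"golang.org/x/sync/errgroup"
)

// collectShards fetches all shards concurrently, failing fast on the
// first error. fetchShard is a hypothetical stand-in for ts.GetShard.
func collectShards(ctx context.Context, shards []string,
	fetchShard func(context.Context, string) (string, error)) ([]string, error) {
	var (
		mu      sync.Mutex
		results []string
	)
	eg, ectx := errgroup.WithContext(ctx)
	for _, shard := range shards {
		shard := shard // redundant on Go 1.22+, needed on older toolchains
		eg.Go(func() error {
			si, err := fetchShard(ectx, shard)
			if err != nil {
				return err // cancels ectx for the remaining goroutines
			}
			mu.Lock()
			defer mu.Unlock()
			results = append(results, si)
			return nil
		})
	}
	if err := eg.Wait(); err != nil {
		return nil, err
	}
	return results, nil
}
```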
@@ -425,11 +468,11 @@ func (kss *keyspaceState) getMoveTablesStatus(vs *vschemapb.SrvVSchema) (*MoveTa
return mtState, nil
}
- // check if a shard by shard migration is in progress and if so detect if it has been switched
- isPartialTables := vs.ShardRoutingRules != nil && len(vs.ShardRoutingRules.Rules) > 0
+ // Check if a shard by shard migration is in progress and if so detect if it has been switched.
+ isPartialTables := vs.GetShardRoutingRules() != nil && len(vs.GetShardRoutingRules().GetRules()) > 0
if isPartialTables {
- srr := topotools.GetShardRoutingRulesMap(vs.ShardRoutingRules)
+ srr := topotools.GetShardRoutingRulesMap(vs.GetShardRoutingRules())
mtState.Typ = MoveTablesShardByShard
mtState.State = MoveTablesSwitched
for _, shard := range shardsWithDeniedTables {
@@ -440,31 +483,32 @@ func (kss *keyspaceState) getMoveTablesStatus(vs *vschemapb.SrvVSchema) (*MoveTa
break
}
}
- log.Infof("getMoveTablesStatus: keyspace %s declaring partial move tables %v", kss.keyspace, mtState)
+ log.Infof("getMoveTablesStatus: keyspace %s declaring partial move tables %s", kss.keyspace, mtState.String())
return mtState, nil
}
- // it wasn't a shard by shard migration, but since we have denied tables it must be a regular MoveTables
+ // It wasn't a shard by shard migration, but since we have denied tables it must be a
+ // regular MoveTables.
mtState.Typ = MoveTablesRegular
mtState.State = MoveTablesSwitching
- rr := topotools.GetRoutingRulesMap(vs.RoutingRules)
+ rr := topotools.GetRoutingRulesMap(vs.GetRoutingRules())
if rr != nil {
r, ok := rr[oneDeniedTable]
- // if a rule exists for the table and points to the target keyspace, writes have been switched
+ // If a rule exists for the table and points to the target keyspace, writes have been switched.
if ok && len(r) > 0 && r[0] != fmt.Sprintf("%s.%s", kss.keyspace, oneDeniedTable) {
mtState.State = MoveTablesSwitched
log.Infof("onSrvKeyspace:: keyspace %s writes have been switched for table %s, rule %v", kss.keyspace, oneDeniedTable, r[0])
}
}
- log.Infof("getMoveTablesStatus: keyspace %s declaring regular move tables %v", kss.keyspace, mtState)
+ log.Infof("getMoveTablesStatus: keyspace %s declaring regular move tables %s", kss.keyspace, mtState.String())
return mtState, nil
}
-// onSrvKeyspace is the callback that updates this keyspace with fresh topology data from our topology server.
-// this callback is called from a Watcher in the topo server whenever a change to the topology for this keyspace
-// occurs. this watcher is dedicated to this keyspace, and will only yield topology metadata changes for as
-// long as we're interested on this keyspace.
+// onSrvKeyspace is the callback that updates this keyspace with fresh topology data from our
+// topology server. This callback is called from a Watcher in the topo server whenever a change to
+// the topology for this keyspace occurs. This watcher is dedicated to this keyspace, and will
+// only yield topology metadata changes for as long as we're interested in this keyspace.
func (kss *keyspaceState) onSrvKeyspace(newKeyspace *topodatapb.SrvKeyspace, newError error) bool {
kss.mu.Lock()
defer kss.mu.Unlock()
@@ -478,23 +522,25 @@ func (kss *keyspaceState) onSrvKeyspace(newKeyspace *topodatapb.SrvKeyspace, new
return false
}
- // if there's another kind of error while watching this keyspace, we assume it's temporary and related
- // to the topology server, not to the keyspace itself. we'll keep waiting for more topology events.
+ // If there's another kind of error while watching this keyspace, we assume it's temporary and
+ // related to the topology server, not to the keyspace itself. We'll keep waiting for more
+ // topology events.
if newError != nil {
kss.lastError = newError
log.Errorf("error while watching keyspace %q: %v", kss.keyspace, newError)
return true
}
- // if the topology metadata for our keyspace is identical to the last one we saw there's nothing to do
- // here. this is a side-effect of the way ETCD watchers work.
+ // If the topology metadata for our keyspace is identical to the last one we saw there's nothing to
+ // do here. This is a side-effect of the way etcd watchers work.
if proto.Equal(kss.lastKeyspace, newKeyspace) {
// no changes
return true
}
- // we only mark this keyspace as inconsistent if there has been a topology change in the PRIMARY for
- // this keyspace, but we store the topology metadata for both primary and replicas for future-proofing.
+ // We only mark this keyspace as inconsistent if there has been a topology change in the PRIMARY
+ // for this keyspace, but we store the topology metadata for both primary and replicas for
+ // future-proofing.
var oldPrimary, newPrimary *topodatapb.SrvKeyspace_KeyspacePartition
if kss.lastKeyspace != nil {
oldPrimary = topoproto.SrvKeyspaceGetPartition(kss.lastKeyspace, topodatapb.TabletType_PRIMARY)
@@ -525,20 +571,24 @@ func (kss *keyspaceState) isServing() bool {
// onSrvVSchema is called from a Watcher in the topo server whenever the SrvVSchema is updated by Vitess.
// For the purposes here, we are interested in updates to the RoutingRules or ShardRoutingRules.
-// In addition, the traffic switcher updates SrvVSchema when the DeniedTables attributes in a Shard record is
-// modified.
+// In addition, the traffic switcher updates SrvVSchema when the DeniedTables attributes in a Shard
+// record is modified.
func (kss *keyspaceState) onSrvVSchema(vs *vschemapb.SrvVSchema, err error) bool {
- // the vschema can be nil if the server is currently shutting down
+ // The vschema can be nil if the server is currently shutting down.
if vs == nil {
return true
}
kss.mu.Lock()
defer kss.mu.Unlock()
- kss.moveTablesState, _ = kss.getMoveTablesStatus(vs)
+ var kerr error
+ if kss.moveTablesState, kerr = kss.getMoveTablesStatus(vs); kerr != nil {
+ log.Errorf("onSrvVSchema: keyspace %s failed to get move tables status: %v", kss.keyspace, kerr)
+ }
if kss.moveTablesState != nil && kss.moveTablesState.Typ != MoveTablesNone {
- // mark the keyspace as inconsistent. ensureConsistentLocked() checks if the workflow is switched,
- // and if so, it will send an event to the buffering subscribers to indicate that buffering can be stopped.
+ // Mark the keyspace as inconsistent. ensureConsistentLocked() checks if the workflow is
+ // switched, and if so, it will send an event to the buffering subscribers to indicate that
+ // buffering can be stopped.
kss.consistent = false
kss.ensureConsistentLocked()
}
@@ -560,8 +610,9 @@ func newKeyspaceState(ctx context.Context, kew *KeyspaceEventWatcher, cell, keys
return kss
}
-// processHealthCheck is the callback that is called by the global HealthCheck stream that was initiated
-// by this KeyspaceEventWatcher. it redirects the TabletHealth event to the corresponding keyspaceState
+// processHealthCheck is the callback that is called by the global HealthCheck stream that was
+// initiated by this KeyspaceEventWatcher. It redirects the TabletHealth event to the
+// corresponding keyspaceState.
func (kew *KeyspaceEventWatcher) processHealthCheck(ctx context.Context, th *TabletHealth) {
kss := kew.getKeyspaceStatus(ctx, th.Target.Keyspace)
if kss == nil {
@@ -571,8 +622,8 @@ func (kew *KeyspaceEventWatcher) processHealthCheck(ctx context.Context, th *Tab
kss.onHealthCheck(th)
}
-// getKeyspaceStatus returns the keyspaceState object for the corresponding keyspace, allocating it
-// if we've never seen the keyspace before.
+// getKeyspaceStatus returns the keyspaceState object for the corresponding keyspace, allocating
+// it if we've never seen the keyspace before.
func (kew *KeyspaceEventWatcher) getKeyspaceStatus(ctx context.Context, keyspace string) *keyspaceState {
kew.mu.Lock()
defer kew.mu.Unlock()
@@ -612,15 +663,15 @@ func (kew *KeyspaceEventWatcher) TargetIsBeingResharded(ctx context.Context, tar
}
// PrimaryIsNotServing checks if the reason why the given target is not accessible right now is
-// that the primary tablet for that shard is not serving. This is possible during a Planned Reparent Shard
-// operation. Just as the operation completes, a new primary will be elected, and it will send its own healthcheck
-// stating that it is serving. We should buffer requests until that point.
-// There are use cases where people do not run with a Primary server at all, so we must verify that
-// we only start buffering when a primary was present, and it went not serving.
-// The shard state keeps track of the current primary and the last externally reparented time, which we can use
-// to determine that there was a serving primary which now became non serving. This is only possible in a DemotePrimary
-// RPC which are only called from ERS and PRS. So buffering will stop when these operations succeed.
-// We return the tablet alias of the primary if it is serving.
+// that the primary tablet for that shard is not serving. This is possible during a Planned
+// Reparent Shard operation. Just as the operation completes, a new primary will be elected, and
+// it will send its own healthcheck stating that it is serving. We should buffer requests until
+// that point. There are use cases where people do not run with a Primary server at all, so we must
+// verify that we only start buffering when a primary was present, and it went not serving.
+// The shard state keeps track of the current primary and the last externally reparented time, which
+// we can use to determine that there was a serving primary which now became non serving. This is
+// only possible in a DemotePrimary RPC, which is only called from ERS and PRS. So buffering will
+// stop when these operations succeed. We return the tablet alias of the primary if it is serving.
func (kew *KeyspaceEventWatcher) PrimaryIsNotServing(ctx context.Context, target *querypb.Target) (*topodatapb.TabletAlias, bool) {
if target.TabletType != topodatapb.TabletType_PRIMARY {
return nil, false
@@ -632,7 +683,8 @@ func (kew *KeyspaceEventWatcher) PrimaryIsNotServing(ctx context.Context, target
ks.mu.Lock()
defer ks.mu.Unlock()
if state, ok := ks.shards[target.Shard]; ok {
- // If the primary tablet was present then externallyReparented will be non-zero and currentPrimary will be not nil
+ // If the primary tablet was present then externallyReparented will be non-zero and
+ // currentPrimary will be not nil.
return state.currentPrimary, !state.serving && !ks.consistent && state.externallyReparented != 0 && state.currentPrimary != nil
}
return nil, false
diff --git a/go/vt/discovery/keyspace_events_test.go b/go/vt/discovery/keyspace_events_test.go
index af60479a42b..bcaf48b62a8 100644
--- a/go/vt/discovery/keyspace_events_test.go
+++ b/go/vt/discovery/keyspace_events_test.go
@@ -19,6 +19,8 @@ package discovery
import (
"context"
"encoding/hex"
+ "sync"
+ "sync/atomic"
"testing"
"time"
@@ -53,6 +55,67 @@ func TestSrvKeyspaceWithNilNewKeyspace(t *testing.T) {
require.True(t, kss.onSrvKeyspace(nil, nil))
}
+// TestKeyspaceEventConcurrency confirms that the keyspace event watcher
+// does not fail to broadcast received keyspace events to subscribers.
+// This verifies that no events are lost when there's a high number of
+// concurrent keyspace events.
+func TestKeyspaceEventConcurrency(t *testing.T) {
+ cell := "cell1"
+ factory := faketopo.NewFakeTopoFactory()
+ factory.AddCell(cell)
+ sts := &fakeTopoServer{}
+ hc := NewFakeHealthCheck(make(chan *TabletHealth))
+ defer hc.Close()
+ kew := &KeyspaceEventWatcher{
+ hc: hc,
+ ts: sts,
+ localCell: cell,
+ keyspaces: make(map[string]*keyspaceState),
+ subs: make(map[chan *KeyspaceEvent]struct{}),
+ }
+
+ // Subscribe to the watcher's broadcasted keyspace events.
+ receiver := kew.Subscribe()
+
+ updates := atomic.Uint32{}
+ updates.Store(0)
+ wg := sync.WaitGroup{}
+ concurrency := 100
+ ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
+ defer cancel()
+ go func() {
+ for {
+ select {
+ case <-ctx.Done():
+ return
+ case <-receiver:
+ updates.Add(1)
+ }
+ }
+ }()
+ // Start up concurrent goroutines that will broadcast keyspace events.
+ for i := 1; i <= concurrency; i++ {
+ wg.Add(1)
+ go func() {
+ defer wg.Done()
+ kew.broadcast(&KeyspaceEvent{})
+ }()
+ }
+ wg.Wait()
+ for {
+ select {
+ case <-ctx.Done():
+ require.Equal(t, concurrency, int(updates.Load()), "expected %d updates, got %d", concurrency, updates.Load())
+ return
+ default:
+ if int(updates.Load()) == concurrency { // Pass
+ cancel()
+ return
+ }
+ }
+ }
+}
+
// TestKeyspaceEventTypes confirms that the keyspace event watcher determines
// that the unavailability event is caused by the correct scenario. We should
// consider it to be caused by a resharding operation when the following
@@ -302,6 +365,26 @@ func (f *fakeTopoServer) GetSrvKeyspace(ctx context.Context, cell, keyspace stri
return ks, nil
}
+// GetSrvVSchema returns the SrvVSchema for a cell.
+func (f *fakeTopoServer) GetSrvVSchema(ctx context.Context, cell string) (*vschemapb.SrvVSchema, error) {
+ vs := &vschemapb.SrvVSchema{
+ Keyspaces: map[string]*vschemapb.Keyspace{
+ "ks1": {
+ Sharded: true,
+ },
+ },
+ RoutingRules: &vschemapb.RoutingRules{
+ Rules: []*vschemapb.RoutingRule{
+ {
+ FromTable: "db1.t1",
+ ToTables: []string{"db1.t1"},
+ },
+ },
+ },
+ }
+ return vs, nil
+}
+
func (f *fakeTopoServer) WatchSrvKeyspace(ctx context.Context, cell, keyspace string, callback func(*topodatapb.SrvKeyspace, error) bool) {
ks, err := f.GetSrvKeyspace(ctx, cell, keyspace)
callback(ks, err)
@@ -311,5 +394,6 @@ func (f *fakeTopoServer) WatchSrvKeyspace(ctx context.Context, cell, keyspace st
// the provided cell. It will call the callback when
// a new value or an error occurs.
func (f *fakeTopoServer) WatchSrvVSchema(ctx context.Context, cell string, callback func(*vschemapb.SrvVSchema, error) bool) {
-
+ sv, err := f.GetSrvVSchema(ctx, cell)
+ callback(sv, err)
}
diff --git a/go/vt/srvtopo/watch_srvvschema.go b/go/vt/srvtopo/watch_srvvschema.go
index 1b5536e623d..c758211375d 100644
--- a/go/vt/srvtopo/watch_srvvschema.go
+++ b/go/vt/srvtopo/watch_srvvschema.go
@@ -21,8 +21,9 @@ import (
"time"
"vitess.io/vitess/go/stats"
- vschemapb "vitess.io/vitess/go/vt/proto/vschema"
"vitess.io/vitess/go/vt/topo"
+
+ vschemapb "vitess.io/vitess/go/vt/proto/vschema"
)
type SrvVSchemaWatcher struct {
diff --git a/go/vt/topo/etcd2topo/watch.go b/go/vt/topo/etcd2topo/watch.go
index cdc9be44b21..2fc58d437ff 100644
--- a/go/vt/topo/etcd2topo/watch.go
+++ b/go/vt/topo/etcd2topo/watch.go
@@ -51,7 +51,7 @@ func (s *Server) Watch(ctx context.Context, filePath string) (*topo.WatchData, <
}
wd := &topo.WatchData{
Contents: initial.Kvs[0].Value,
- Version: EtcdVersion(initial.Kvs[0].ModRevision),
+ Version: EtcdVersion(initial.Kvs[0].Version),
}
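A note on the `ModRevision` to `Version` change above (and the matching change in `WatchRecursive` below): in etcd, `ModRevision` is the cluster-wide revision at which the key was last modified, while `Version` counts modifications of that single key and resets when the key is recreated, so the initial `EtcdVersion` reported for the watched file is now the key-local counter rather than the global revision.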
// Create an outer context that will be canceled on return and will cancel all inner watches.
@@ -76,7 +76,7 @@ func (s *Server) Watch(ctx context.Context, filePath string) (*topo.WatchData, <
defer close(notifications)
defer outerCancel()
- var currVersion = initial.Header.Revision
+ var rev = initial.Header.Revision
var watchRetries int
for {
select {
@@ -107,9 +107,9 @@ func (s *Server) Watch(ctx context.Context, filePath string) (*topo.WatchData, <
// Cancel inner context on retry and create new one.
watchCancel()
watchCtx, watchCancel = context.WithCancel(ctx)
- newWatcher := s.cli.Watch(watchCtx, nodePath, clientv3.WithRev(currVersion))
+ newWatcher := s.cli.Watch(watchCtx, nodePath, clientv3.WithRev(rev))
if newWatcher == nil {
- log.Warningf("watch %v failed and get a nil channel returned, currVersion: %v", nodePath, currVersion)
+ log.Warningf("watch %v failed and get a nil channel returned, rev: %v", nodePath, rev)
} else {
watcher = newWatcher
}
@@ -126,7 +126,7 @@ func (s *Server) Watch(ctx context.Context, filePath string) (*topo.WatchData, <
return
}
- currVersion = wresp.Header.GetRevision()
+ rev = wresp.Header.GetRevision()
for _, ev := range wresp.Events {
switch ev.Type {
@@ -174,7 +174,7 @@ func (s *Server) WatchRecursive(ctx context.Context, dirpath string) ([]*topo.Wa
var wd topo.WatchDataRecursive
wd.Path = string(kv.Key)
wd.Contents = kv.Value
- wd.Version = EtcdVersion(initial.Kvs[0].ModRevision)
+ wd.Version = EtcdVersion(initial.Kvs[0].Version)
initialwd = append(initialwd, &wd)
}
@@ -200,7 +200,7 @@ func (s *Server) WatchRecursive(ctx context.Context, dirpath string) ([]*topo.Wa
defer close(notifications)
defer outerCancel()
- var currVersion = initial.Header.Revision
+ var rev = initial.Header.Revision
var watchRetries int
for {
select {
@@ -228,9 +228,9 @@ func (s *Server) WatchRecursive(ctx context.Context, dirpath string) ([]*topo.Wa
watchCancel()
watchCtx, watchCancel = context.WithCancel(ctx)
- newWatcher := s.cli.Watch(watchCtx, nodePath, clientv3.WithRev(currVersion), clientv3.WithPrefix())
+ newWatcher := s.cli.Watch(watchCtx, nodePath, clientv3.WithRev(rev), clientv3.WithPrefix())
if newWatcher == nil {
- log.Warningf("watch %v failed and get a nil channel returned, currVersion: %v", nodePath, currVersion)
+ log.Warningf("watch %v failed and get a nil channel returned, rev: %v", nodePath, rev)
} else {
watcher = newWatcher
}
@@ -247,7 +247,7 @@ func (s *Server) WatchRecursive(ctx context.Context, dirpath string) ([]*topo.Wa
return
}
- currVersion = wresp.Header.GetRevision()
+ rev = wresp.Header.GetRevision()
for _, ev := range wresp.Events {
switch ev.Type {
diff --git a/go/vt/vtctl/workflow/server.go b/go/vt/vtctl/workflow/server.go
index c46d220a942..43890f1b4a7 100644
--- a/go/vt/vtctl/workflow/server.go
+++ b/go/vt/vtctl/workflow/server.go
@@ -3039,7 +3039,9 @@ func (s *Server) WorkflowSwitchTraffic(ctx context.Context, req *vtctldatapb.Wor
return nil, err
}
if hasReplica || hasRdonly {
- if rdDryRunResults, err = s.switchReads(ctx, req, ts, startState, timeout, false, direction); err != nil {
+ // If we're going to switch writes immediately after, then we don't need to
+ // rebuild the SrvVSchema here as we will do it after switching writes.
+ if rdDryRunResults, err = s.switchReads(ctx, req, ts, startState, !hasPrimary /* rebuildSrvVSchema */, direction); err != nil {
return nil, err
}
log.Infof("Switch Reads done for workflow %s.%s", req.Keyspace, req.Workflow)
@@ -3094,7 +3096,7 @@ func (s *Server) WorkflowSwitchTraffic(ctx context.Context, req *vtctldatapb.Wor
}
// switchReads is a generic way of switching read traffic for a workflow.
-func (s *Server) switchReads(ctx context.Context, req *vtctldatapb.WorkflowSwitchTrafficRequest, ts *trafficSwitcher, state *State, timeout time.Duration, cancel bool, direction TrafficSwitchDirection) (*[]string, error) {
+func (s *Server) switchReads(ctx context.Context, req *vtctldatapb.WorkflowSwitchTrafficRequest, ts *trafficSwitcher, state *State, rebuildSrvVSchema bool, direction TrafficSwitchDirection) (*[]string, error) {
var roTabletTypes []topodatapb.TabletType
// When we are switching all traffic we also get the primary tablet type, which we need to
// filter out for switching reads.
@@ -3198,7 +3200,7 @@ func (s *Server) switchReads(ctx context.Context, req *vtctldatapb.WorkflowSwitc
case ts.isPartialMigration:
ts.Logger().Infof("Partial migration, skipping switchTableReads as traffic is all or nothing per shard and overridden for reads AND writes in the ShardRoutingRule created when switching writes.")
default:
- err := sw.switchTableReads(ctx, req.Cells, roTabletTypes, direction)
+ err := sw.switchTableReads(ctx, req.Cells, roTabletTypes, rebuildSrvVSchema, direction)
if err != nil {
return handleError("failed to switch read traffic for the tables", err)
}
diff --git a/go/vt/vtctl/workflow/switcher.go b/go/vt/vtctl/workflow/switcher.go
index 59bb387cca5..fface2cdcc6 100644
--- a/go/vt/vtctl/workflow/switcher.go
+++ b/go/vt/vtctl/workflow/switcher.go
@@ -70,8 +70,8 @@ func (r *switcher) switchShardReads(ctx context.Context, cells []string, servedT
return r.ts.switchShardReads(ctx, cells, servedTypes, direction)
}
-func (r *switcher) switchTableReads(ctx context.Context, cells []string, servedTypes []topodatapb.TabletType, direction TrafficSwitchDirection) error {
- return r.ts.switchTableReads(ctx, cells, servedTypes, direction)
+func (r *switcher) switchTableReads(ctx context.Context, cells []string, servedTypes []topodatapb.TabletType, rebuildSrvVSchema bool, direction TrafficSwitchDirection) error {
+ return r.ts.switchTableReads(ctx, cells, servedTypes, rebuildSrvVSchema, direction)
}
func (r *switcher) startReverseVReplication(ctx context.Context) error {
diff --git a/go/vt/vtctl/workflow/switcher_dry_run.go b/go/vt/vtctl/workflow/switcher_dry_run.go
index eb6ab80d351..c9c49729c32 100644
--- a/go/vt/vtctl/workflow/switcher_dry_run.go
+++ b/go/vt/vtctl/workflow/switcher_dry_run.go
@@ -84,7 +84,7 @@ func (dr *switcherDryRun) switchShardReads(ctx context.Context, cells []string,
return nil
}
-func (dr *switcherDryRun) switchTableReads(ctx context.Context, cells []string, servedTypes []topodatapb.TabletType, direction TrafficSwitchDirection) error {
+func (dr *switcherDryRun) switchTableReads(ctx context.Context, cells []string, servedTypes []topodatapb.TabletType, rebuildSrvVSchema bool, direction TrafficSwitchDirection) error {
ks := dr.ts.TargetKeyspaceName()
if direction == DirectionBackward {
ks = dr.ts.SourceKeyspaceName()
@@ -96,6 +96,9 @@ func (dr *switcherDryRun) switchTableReads(ctx context.Context, cells []string,
tables := strings.Join(dr.ts.Tables(), ",")
dr.drLog.Logf("Switch reads for tables [%s] to keyspace %s for tablet types [%s]", tables, ks, strings.Join(tabletTypes, ","))
dr.drLog.Logf("Routing rules for tables [%s] will be updated", tables)
+ if rebuildSrvVSchema {
+ dr.drLog.Logf("Serving VSchema will be rebuilt for the %s keyspace", ks)
+ }
return nil
}
diff --git a/go/vt/vtctl/workflow/switcher_interface.go b/go/vt/vtctl/workflow/switcher_interface.go
index 82e3a2be357..fce3743984a 100644
--- a/go/vt/vtctl/workflow/switcher_interface.go
+++ b/go/vt/vtctl/workflow/switcher_interface.go
@@ -36,7 +36,7 @@ type iswitcher interface {
changeRouting(ctx context.Context) error
streamMigraterfinalize(ctx context.Context, ts *trafficSwitcher, workflows []string) error
startReverseVReplication(ctx context.Context) error
- switchTableReads(ctx context.Context, cells []string, servedType []topodatapb.TabletType, direction TrafficSwitchDirection) error
+ switchTableReads(ctx context.Context, cells []string, servedType []topodatapb.TabletType, rebuildSrvVSchema bool, direction TrafficSwitchDirection) error
switchShardReads(ctx context.Context, cells []string, servedType []topodatapb.TabletType, direction TrafficSwitchDirection) error
validateWorkflowHasCompleted(ctx context.Context) error
removeSourceTables(ctx context.Context, removalType TableRemovalType) error
diff --git a/go/vt/vtctl/workflow/traffic_switcher.go b/go/vt/vtctl/workflow/traffic_switcher.go
index 31b8609232e..8cfc4b14fee 100644
--- a/go/vt/vtctl/workflow/traffic_switcher.go
+++ b/go/vt/vtctl/workflow/traffic_switcher.go
@@ -600,7 +600,7 @@ func (ts *trafficSwitcher) switchShardReads(ctx context.Context, cells []string,
return nil
}
-func (ts *trafficSwitcher) switchTableReads(ctx context.Context, cells []string, servedTypes []topodatapb.TabletType, direction TrafficSwitchDirection) error {
+func (ts *trafficSwitcher) switchTableReads(ctx context.Context, cells []string, servedTypes []topodatapb.TabletType, rebuildSrvVSchema bool, direction TrafficSwitchDirection) error {
log.Infof("switchTableReads: cells: %s, tablet types: %+v, direction %d", strings.Join(cells, ","), servedTypes, direction)
rules, err := topotools.GetRoutingRules(ctx, ts.TopoServer())
if err != nil {
@@ -632,7 +632,10 @@ func (ts *trafficSwitcher) switchTableReads(ctx context.Context, cells []string,
if err := topotools.SaveRoutingRules(ctx, ts.TopoServer(), rules); err != nil {
return err
}
- return ts.TopoServer().RebuildSrvVSchema(ctx, cells)
+ if rebuildSrvVSchema {
+ return ts.TopoServer().RebuildSrvVSchema(ctx, cells)
+ }
+ return nil
}
func (ts *trafficSwitcher) startReverseVReplication(ctx context.Context) error {
diff --git a/go/vt/vtgate/buffer/buffer.go b/go/vt/vtgate/buffer/buffer.go
index 622bb03b082..260fb272544 100644
--- a/go/vt/vtgate/buffer/buffer.go
+++ b/go/vt/vtgate/buffer/buffer.go
@@ -164,6 +164,10 @@ func New(cfg *Config) *Buffer {
}
}
+func (b *Buffer) GetConfig() *Config {
+ return b.config
+}
+
// WaitForFailoverEnd blocks until a pending buffering due to a failover for
// keyspace/shard is over.
// If there is no ongoing failover, "err" is checked. If it's caused by a
diff --git a/go/vt/vtgate/buffer/flags.go b/go/vt/vtgate/buffer/flags.go
index b45f10a6e38..01a3c33e869 100644
--- a/go/vt/vtgate/buffer/flags.go
+++ b/go/vt/vtgate/buffer/flags.go
@@ -70,6 +70,9 @@ func verifyFlags() error {
if bufferSize < 1 {
return fmt.Errorf("--buffer_size must be >= 1 (specified value: %d)", bufferSize)
}
+ if bufferMinTimeBetweenFailovers < 1*time.Second {
+ return fmt.Errorf("--buffer_min_time_between_failovers must be >= 1s (specified value: %v)", bufferMinTimeBetweenFailovers)
+ }
if bufferDrainConcurrency < 1 {
return fmt.Errorf("--buffer_drain_concurrency must be >= 1 (specified value: %d)", bufferDrainConcurrency)
diff --git a/go/vt/vtgate/buffer/shard_buffer.go b/go/vt/vtgate/buffer/shard_buffer.go
index bb38dcd2caa..b0764c2ad91 100644
--- a/go/vt/vtgate/buffer/shard_buffer.go
+++ b/go/vt/vtgate/buffer/shard_buffer.go
@@ -25,14 +25,13 @@ import (
"time"
"vitess.io/vitess/go/vt/discovery"
-
- "vitess.io/vitess/go/vt/vtgate/errorsanitizer"
-
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/logutil"
- topodatapb "vitess.io/vitess/go/vt/proto/topodata"
"vitess.io/vitess/go/vt/topo/topoproto"
"vitess.io/vitess/go/vt/vterrors"
+ "vitess.io/vitess/go/vt/vtgate/errorsanitizer"
+
+ topodatapb "vitess.io/vitess/go/vt/proto/topodata"
)
// bufferState represents the different states a shardBuffer object can be in.
diff --git a/go/vt/vtgate/executor_test.go b/go/vt/vtgate/executor_test.go
index 0154da92657..cce717674d6 100644
--- a/go/vt/vtgate/executor_test.go
+++ b/go/vt/vtgate/executor_test.go
@@ -1317,7 +1317,7 @@ func TestExecutorAlterVSchemaKeyspace(t *testing.T) {
session := NewSafeSession(&vtgatepb.Session{TargetString: "@primary", Autocommit: true})
vschemaUpdates := make(chan *vschemapb.SrvVSchema, 2)
- executor.serv.WatchSrvVSchema(ctx, "aa", func(vschema *vschemapb.SrvVSchema, err error) bool {
+ executor.serv.WatchSrvVSchema(ctx, executor.cell, func(vschema *vschemapb.SrvVSchema, err error) bool {
vschemaUpdates <- vschema
return true
})
diff --git a/go/vt/vtgate/executor_vschema_ddl_test.go b/go/vt/vtgate/executor_vschema_ddl_test.go
index 1c2813a33c4..1c912ed0d62 100644
--- a/go/vt/vtgate/executor_vschema_ddl_test.go
+++ b/go/vt/vtgate/executor_vschema_ddl_test.go
@@ -17,26 +17,23 @@ limitations under the License.
package vtgate
import (
- "context"
"reflect"
"slices"
"testing"
"time"
- "vitess.io/vitess/go/test/utils"
-
- "vitess.io/vitess/go/vt/callerid"
- querypb "vitess.io/vitess/go/vt/proto/query"
- vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
+ "github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
"vitess.io/vitess/go/sqltypes"
+ "vitess.io/vitess/go/test/utils"
+ "vitess.io/vitess/go/vt/callerid"
"vitess.io/vitess/go/vt/vtgate/vschemaacl"
- "github.com/stretchr/testify/assert"
- "github.com/stretchr/testify/require"
-
+ querypb "vitess.io/vitess/go/vt/proto/query"
vschemapb "vitess.io/vitess/go/vt/proto/vschema"
vtgatepb "vitess.io/vitess/go/vt/proto/vtgate"
+ vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
)
func waitForVindex(t *testing.T, ks, name string, watch chan *vschemapb.SrvVSchema, executor *Executor) (*vschemapb.SrvVSchema, *vschemapb.Vindex) {
@@ -426,9 +423,7 @@ func TestExecutorDropSequenceDDL(t *testing.T) {
_, err = executor.Execute(ctx, nil, "TestExecute", session, stmt, nil)
require.NoError(t, err)
- ctxWithTimeout, cancel := context.WithTimeout(ctx, 5*time.Second)
- defer cancel()
- if !waitForNewerVSchema(ctxWithTimeout, executor, ts) {
+ if !waitForNewerVSchema(ctx, executor, ts, 5*time.Second) {
t.Fatalf("vschema did not drop the sequene 'test_seq'")
}
@@ -464,9 +459,7 @@ func TestExecutorDropAutoIncDDL(t *testing.T) {
stmt = "alter vschema on test_table add auto_increment id using `db-name`.`test_seq`"
_, err = executor.Execute(ctx, nil, "TestExecute", session, stmt, nil)
require.NoError(t, err)
- ctxWithTimeout, cancel := context.WithTimeout(ctx, 5*time.Second)
- defer cancel()
- if !waitForNewerVSchema(ctxWithTimeout, executor, ts) {
+ if !waitForNewerVSchema(ctx, executor, ts, 5*time.Second) {
t.Fatalf("vschema did not update with auto_increment for 'test_table'")
}
ts = executor.VSchema().GetCreated()
@@ -480,9 +473,7 @@ func TestExecutorDropAutoIncDDL(t *testing.T) {
_, err = executor.Execute(ctx, nil, "TestExecute", session, stmt, nil)
require.NoError(t, err)
- ctxWithTimeout, cancel2 := context.WithTimeout(ctx, 5*time.Second)
- defer cancel2()
- if !waitForNewerVSchema(ctxWithTimeout, executor, ts) {
+ if !waitForNewerVSchema(ctx, executor, ts, 5*time.Second) {
t.Fatalf("vschema did not drop the auto_increment for 'test_table'")
}
if executor.vm.GetCurrentSrvVschema().Keyspaces[ks].Tables["test_table"].AutoIncrement != nil {
diff --git a/go/vt/vtgate/plan_execute.go b/go/vt/vtgate/plan_execute.go
index 4e2c3bfea4c..199892842ee 100644
--- a/go/vt/vtgate/plan_execute.go
+++ b/go/vt/vtgate/plan_execute.go
@@ -24,20 +24,20 @@ import (
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/vt/log"
- querypb "vitess.io/vitess/go/vt/proto/query"
- vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
"vitess.io/vitess/go/vt/sqlparser"
"vitess.io/vitess/go/vt/vterrors"
"vitess.io/vitess/go/vt/vtgate/engine"
"vitess.io/vitess/go/vt/vtgate/logstats"
"vitess.io/vitess/go/vt/vtgate/vtgateservice"
+
+ querypb "vitess.io/vitess/go/vt/proto/query"
+ vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
)
type planExec func(ctx context.Context, plan *engine.Plan, vc *vcursorImpl, bindVars map[string]*querypb.BindVariable, startTime time.Time) error
type txResult func(sqlparser.StatementType, *sqltypes.Result) error
-func waitForNewerVSchema(ctx context.Context, e *Executor, lastVSchemaCreated time.Time) bool {
- timeout := 30 * time.Second
+func waitForNewerVSchema(ctx context.Context, e *Executor, lastVSchemaCreated time.Time, timeout time.Duration) bool {
pollingInterval := 10 * time.Millisecond
waitCtx, cancel := context.WithTimeout(ctx, timeout)
ticker := time.NewTicker(pollingInterval)
@@ -48,7 +48,7 @@ func waitForNewerVSchema(ctx context.Context, e *Executor, lastVSchemaCreated ti
case <-waitCtx.Done():
return false
case <-ticker.C:
- if e.VSchema().GetCreated().After(lastVSchemaCreated) {
+ if e.VSchema() != nil && e.VSchema().GetCreated().After(lastVSchemaCreated) {
return true
}
}
@@ -64,11 +64,11 @@ func (e *Executor) newExecute(
logStats *logstats.LogStats,
execPlan planExec, // used when there is a plan to execute
recResult txResult, // used when it's something simple like begin/commit/rollback/savepoint
-) error {
- // 1: Prepare before planning and execution
+) (err error) {
+ // 1: Prepare before planning and execution.
// Start an implicit transaction if necessary.
- err := e.startTxIfNecessary(ctx, safeSession)
+ err = e.startTxIfNecessary(ctx, safeSession)
if err != nil {
return err
}
@@ -79,21 +79,35 @@ func (e *Executor) newExecute(
query, comments := sqlparser.SplitMarginComments(sql)
- // 2: Parse and Validate query
+ // 2: Parse and Validate query.
stmt, reservedVars, err := parseAndValidateQuery(query, e.env.Parser())
if err != nil {
return err
}
- var lastVSchemaCreated time.Time
- vs := e.VSchema()
- lastVSchemaCreated = vs.GetCreated()
+ var (
+ vs = e.VSchema()
+ lastVSchemaCreated = vs.GetCreated()
+ result *sqltypes.Result
+ plan *engine.Plan
+ )
+
for try := 0; try < MaxBufferingRetries; try++ {
- if try > 0 && !vs.GetCreated().After(lastVSchemaCreated) {
- // There is a race due to which the executor's vschema may not have been updated yet.
- // Without a wait we fail non-deterministically since the previous vschema will not have the updated routing rules
- if waitForNewerVSchema(ctx, e, lastVSchemaCreated) {
+ if try > 0 && !vs.GetCreated().After(lastVSchemaCreated) { // We need to wait for a vschema update
+ // Without a wait we fail non-deterministically since the previous vschema will not have
+ // the updated routing rules.
+ // We retry MaxBufferingRetries-1 (2) times before giving up. How long we wait before each retry
+ // -- IF we don't see a newer vschema come in -- affects how long we retry in total and how quickly
+ // we retry the query and (should) succeed when the traffic switch fails or we otherwise hit the
+ // max buffer failover time without resolving the keyspace event and marking it as consistent.
+ // This calculation attemps to ensure that we retry at a sensible interval and number of times
+ // based on the buffering configuration. This way we should be able to perform the max retries
+ // within the given window of time for most queries and we should not end up waiting too long
+ // after the traffic switch fails or the buffer window has ended, retrying old queries.
+ timeout := e.resolver.scatterConn.gateway.buffer.GetConfig().MaxFailoverDuration / (MaxBufferingRetries - 1)
+ if waitForNewerVSchema(ctx, e, lastVSchemaCreated, timeout) {
vs = e.VSchema()
+ lastVSchemaCreated = vs.GetCreated()
}
}
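To make the retry budget concrete: assuming the default `--buffer_max_failover_duration` of `20s` and `MaxBufferingRetries` of 3, each retry waits up to `20s / (3 - 1) = 10s` for a newer vschema, so the two possible retries together roughly cover the full buffering window.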
@@ -102,16 +116,13 @@ func (e *Executor) newExecute(
return err
}
- // 3: Create a plan for the query
+ // 3: Create a plan for the query.
// If we are retrying, it is likely that the routing rules have changed and hence we need to
// replan the query since the target keyspace of the resolved shards may have changed as a
- // result of MoveTables. So we cannot reuse the plan from the first try.
- // When buffering ends, many queries might be getting planned at the same time. Ideally we
- // should be able to reuse plans once the first drained query has been planned. For now, we
- // punt on this and choose not to prematurely optimize since it is not clear how much caching
- // will help and if it will result in hard-to-track edge cases.
-
- var plan *engine.Plan
+ // result of MoveTables SwitchTraffic which does a RebuildSrvVSchema which in turn causes
+ // the vtgate to clear the cached plans when processing the new serving vschema.
+ // When buffering ends, many queries might be getting planned at the same time and we then
+ // take full advantage of the cached plan.
plan, err = e.getPlan(ctx, vcursor, query, stmt, comments, bindVars, reservedVars, e.normalize, logStats)
execStart := e.logPlanningFinished(logStats, plan)
@@ -124,12 +135,12 @@ func (e *Executor) newExecute(
safeSession.ClearWarnings()
}
- // add any warnings that the planner wants to add
+ // Add any warnings that the planner wants to add.
for _, warning := range plan.Warnings {
safeSession.RecordWarning(warning)
}
- result, err := e.handleTransactions(ctx, mysqlCtx, safeSession, plan, logStats, vcursor, stmt)
+ result, err = e.handleTransactions(ctx, mysqlCtx, safeSession, plan, logStats, vcursor, stmt)
if err != nil {
return err
}
@@ -137,14 +148,14 @@ func (e *Executor) newExecute(
return recResult(plan.Type, result)
}
- // 4: Prepare for execution
+ // 4: Prepare for execution.
err = e.addNeededBindVars(vcursor, plan.BindVarNeeds, bindVars, safeSession)
if err != nil {
logStats.Error = err
return err
}
- // 5: Execute the plan and retry if needed
+ // 5: Execute the plan.
if plan.Instructions.NeedsTransaction() {
err = e.insideTransaction(ctx, safeSession, logStats,
func() error {
@@ -158,10 +169,39 @@ func (e *Executor) newExecute(
return err
}
+ // 6: Retry if needed.
rootCause := vterrors.RootCause(err)
if rootCause != nil && strings.Contains(rootCause.Error(), "enforce denied tables") {
log.V(2).Infof("Retry: %d, will retry query %s due to %v", try, query, err)
- lastVSchemaCreated = vs.GetCreated()
+ if try == 0 { // We are going to retry at least once
+ defer func() {
+ // Prevent any plan cache pollution from queries planned against the wrong keyspace during a MoveTables
+ // traffic switching operation.
+ if err != nil { // The error we're checking here is the return value from the newExecute function
+ cause := vterrors.RootCause(err)
+ if cause != nil && strings.Contains(cause.Error(), "enforce denied tables") {
+ // The executor's VSchemaManager clears the plan cache when it receives a new vschema via its
+ // SrvVSchema watcher (it calls executor.SaveVSchema() in its watch's subscriber callback). This
+ // happens concurrently with the KeyspaceEventWatcher also receiving the new vschema in its
+ // SrvVSchema watcher and in its subscriber callback processing it (which includes getting info
+ // on all shards from the topo), and eventually determining that the keyspace is consistent and
+ // ending the buffering window. So there's race with query retries such that a query could be
+ // planned against the wrong side just as the keyspace event is getting resolved and the buffers
+ // drained. Then that bad plan is the cached plan for the query until you do another
+ // topo.RebuildSrvVSchema/vtctldclient RebuildVSchemaGraph which then causes the VSchemaManager
+ // to clear the plan cache. It's essentially a race between the two SrvVSchema watchers and the
+ // work they do when a new one is received. If we DID a retry AND the last time we retried
+ // still encountered the error, we know that the plan used was 1) not valid/correct and going to
+ // the wrong side of the traffic switch as it failed with the denied tables error and 2) it will
+ // remain the plan in the cache if we do not clear the plans after it was added to the cache.
+ // So here we clear the plan cache in order to prevent this scenario where the bad plan is
+ // cached indefinitely and re-used after the buffering window ends and the keyspace event is
+ // resolved.
+ e.ClearPlans()
+ }
+ }
+ }()
+ }
continue
}