From 00c45fa17061a7997c1adfc2c9b689233cc89cf9 Mon Sep 17 00:00:00 2001
From: Rohit Nayak
Date: Wed, 19 Jul 2023 11:30:51 +0200
Subject: [PATCH 1/9] Combined squashed commit

Signed-off-by: Rohit Nayak

Don't process a SrvVSchema in the sandbox topo watcher if it hasn't changed. Add a wait while creating the test
executor env, which seems to fix some tests that started failing with this PR

Signed-off-by: Rohit Nayak

Hack it so that the additional vschema watcher is not run during vtgate tests, to see if the unit tests work with
that hack

Signed-off-by: Rohit Nayak

Make hack mode non-racy :-(

Signed-off-by: Rohit Nayak

Test buffering for 2->4 resharding

Signed-off-by: Rohit Nayak

Use kew's ctx instead of the background context. Use the vschema passed in from the watcher instead of querying the
topo again for routing rules

Signed-off-by: Rohit Nayak

Fix a bug in accessing vschema routing rules. Bypass the hack so the SrvVSchema watcher runs in tests again

Signed-off-by: Rohit Nayak

Don't create the keyspace event watcher when buffering is disabled, honoring the enable_buffer flag. Unskip the
previously skipped test.

Signed-off-by: Rohit Nayak

Remove references to useCache: we don't need to evict from the cache, since we are waiting for the new vschema and
that will have cleared the cache already

Signed-off-by: Rohit Nayak

Use basic vs simple everywhere for the new workflow

Signed-off-by: Rohit Nayak

Add buffering test CI workflow

Signed-off-by: Rohit Nayak

Remove unused code

Signed-off-by: Rohit Nayak

Address review comments

Signed-off-by: Rohit Nayak

Don't retry if already in a transaction.

Signed-off-by: Rohit Nayak

Remove embedded ctx

Signed-off-by: Rohit Nayak
---
 ...oend_vreplication_movetables_buffering.yml | 161 ++++++++++++++++++
 doc/design-docs/VTGateBuffering.md            |  60 +++++++
 go/test/endtoend/vreplication/cluster_test.go |   5 +-
 go/test/endtoend/vreplication/config_test.go  |  20 ++-
 .../vreplication/movetables_buffering_test.go | 123 +++++++++++++
 .../vreplication/partial_movetables_test.go   |  31 +++-
 .../resharding_workflows_v2_test.go           |  53 +++++-
 .../vreplication/vreplication_test.go         |   5 +-
 .../vreplication/vreplication_test_env.go     |  10 +-
 go/vt/discovery/keyspace_events.go            | 151 +++++++++++++++-
 go/vt/topotools/routing_rules.go              |  50 ++++--
 go/vt/vtgate/buffer/buffer.go                 |  55 +++++-
 go/vt/vtgate/buffer/flags.go                  |   8 +
 go/vt/vtgate/buffer/shard_buffer.go           |  33 +++-
 go/vt/vtgate/buffer/variables.go              |   1 +
 go/vt/vtgate/executor_framework_test.go       |   6 +
 go/vt/vtgate/executor_select_test.go          |   4 +-
 go/vt/vtgate/executor_test.go                 |   5 +
 go/vt/vtgate/plan_execute.go                  | 136 ++++++++++-----
 go/vt/vtgate/planbuilder/bypass.go            |   1 -
 go/vt/vtgate/sandbox_test.go                  |  31 +++-
 go/vt/vtgate/tabletgateway.go                 |  18 +-
 go/vt/vtgate/vindexes/vschema.go              |  16 ++
 go/vt/vtgate/vindexes/vschema_test.go         |  20 ++-
 go/vt/vtgate/vschema_manager_test.go          |   3 +
 go/vt/wrangler/traffic_switcher.go            |  11 +-
 test/ci_workflow_gen.go                       |   1 +
 test/config.json                              |   9 +
 28 files changed, 921 insertions(+), 106 deletions(-)
 create mode 100644 .github/workflows/cluster_endtoend_vreplication_movetables_buffering.yml
 create mode 100644 doc/design-docs/VTGateBuffering.md
 create mode 100644 go/test/endtoend/vreplication/movetables_buffering_test.go

diff --git a/.github/workflows/cluster_endtoend_vreplication_movetables_buffering.yml b/.github/workflows/cluster_endtoend_vreplication_movetables_buffering.yml
new file mode 100644
index 00000000000..ce8cc7edfae
--- /dev/null
+++ b/.github/workflows/cluster_endtoend_vreplication_movetables_buffering.yml
@@ -0,0 +1,161 @@
+# DO NOT MODIFY: THIS FILE IS GENERATED USING "make 
generate_ci_workflows" + +name: Cluster (vreplication_movetables_buffering) +on: [push, pull_request] +concurrency: + group: format('{0}-{1}', ${{ github.ref }}, 'Cluster (vreplication_movetables_buffering)') + cancel-in-progress: true + +permissions: read-all + +env: + LAUNCHABLE_ORGANIZATION: "vitess" + LAUNCHABLE_WORKSPACE: "vitess-app" + GITHUB_PR_HEAD_SHA: "${{ github.event.pull_request.head.sha }}" + +jobs: + build: + name: Run endtoend tests on Cluster (vreplication_movetables_buffering) + runs-on: ubuntu-22.04 + + steps: + - name: Skip CI + run: | + if [[ "${{contains( github.event.pull_request.labels.*.name, 'Skip CI')}}" == "true" ]]; then + echo "skipping CI due to the 'Skip CI' label" + exit 1 + fi + + - name: Check if workflow needs to be skipped + id: skip-workflow + run: | + skip='false' + if [[ "${{github.event.pull_request}}" == "" ]] && [[ "${{github.ref}}" != "refs/heads/main" ]] && [[ ! "${{github.ref}}" =~ ^refs/heads/release-[0-9]+\.[0-9]$ ]] && [[ ! "${{github.ref}}" =~ "refs/tags/.*" ]]; then + skip='true' + fi + echo Skip ${skip} + echo "skip-workflow=${skip}" >> $GITHUB_OUTPUT + + - name: Check out code + if: steps.skip-workflow.outputs.skip-workflow == 'false' + uses: actions/checkout@v3 + + - name: Check for changes in relevant files + if: steps.skip-workflow.outputs.skip-workflow == 'false' + uses: frouioui/paths-filter@main + id: changes + with: + token: '' + filters: | + end_to_end: + - 'go/**/*.go' + - 'test.go' + - 'Makefile' + - 'build.env' + - 'go.sum' + - 'go.mod' + - 'proto/*.proto' + - 'tools/**' + - 'config/**' + - 'bootstrap.sh' + - '.github/workflows/cluster_endtoend_vreplication_movetables_buffering.yml' + + - name: Set up Go + if: steps.skip-workflow.outputs.skip-workflow == 'false' && steps.changes.outputs.end_to_end == 'true' + uses: actions/setup-go@v4 + with: + go-version: 1.20.5 + + - name: Set up python + if: steps.skip-workflow.outputs.skip-workflow == 'false' && steps.changes.outputs.end_to_end == 'true' + uses: actions/setup-python@v4 + + - name: Tune the OS + if: steps.skip-workflow.outputs.skip-workflow == 'false' && steps.changes.outputs.end_to_end == 'true' + run: | + # Limit local port range to not use ports that overlap with server side + # ports that we listen on. + sudo sysctl -w net.ipv4.ip_local_port_range="22768 65535" + # Increase the asynchronous non-blocking I/O. 
More information at https://dev.mysql.com/doc/refman/5.7/en/innodb-parameters.html#sysvar_innodb_use_native_aio + echo "fs.aio-max-nr = 1048576" | sudo tee -a /etc/sysctl.conf + sudo sysctl -p /etc/sysctl.conf + + - name: Get dependencies + if: steps.skip-workflow.outputs.skip-workflow == 'false' && steps.changes.outputs.end_to_end == 'true' + run: | + + # Get key to latest MySQL repo + sudo apt-key adv --keyserver keyserver.ubuntu.com --recv-keys 467B942D3A79BD29 + # Setup MySQL 8.0 + wget -c https://dev.mysql.com/get/mysql-apt-config_0.8.24-1_all.deb + echo mysql-apt-config mysql-apt-config/select-server select mysql-8.0 | sudo debconf-set-selections + sudo DEBIAN_FRONTEND="noninteractive" dpkg -i mysql-apt-config* + sudo apt-get update + # Install everything else we need, and configure + sudo apt-get install -y mysql-server mysql-client make unzip g++ etcd curl git wget eatmydata xz-utils libncurses5 + + sudo service mysql stop + sudo service etcd stop + sudo ln -s /etc/apparmor.d/usr.sbin.mysqld /etc/apparmor.d/disable/ + sudo apparmor_parser -R /etc/apparmor.d/usr.sbin.mysqld + go mod download + + # install JUnit report formatter + go install github.com/vitessio/go-junit-report@HEAD + + - name: Setup launchable dependencies + if: steps.skip-workflow.outputs.skip-workflow == 'false' && steps.changes.outputs.end_to_end == 'true' && github.base_ref == 'main' + run: | + # Get Launchable CLI installed. If you can, make it a part of the builder image to speed things up + pip3 install --user launchable~=1.0 > /dev/null + + # verify that launchable setup is all correct. + launchable verify || true + + # Tell Launchable about the build you are producing and testing + launchable record build --name "$GITHUB_RUN_ID" --no-commit-collection --source . + + - name: Run cluster endtoend test + if: steps.skip-workflow.outputs.skip-workflow == 'false' && steps.changes.outputs.end_to_end == 'true' + timeout-minutes: 45 + run: | + # We set the VTDATAROOT to the /tmp folder to reduce the file path of mysql.sock file + # which musn't be more than 107 characters long. + export VTDATAROOT="/tmp/" + source build.env + + set -exo pipefail + + # Increase our open file descriptor limit as we could hit this + ulimit -n 65536 + cat <<-EOF>>./config/mycnf/mysql80.cnf + innodb_buffer_pool_dump_at_shutdown=OFF + innodb_buffer_pool_in_core_file=OFF + innodb_buffer_pool_load_at_startup=OFF + innodb_buffer_pool_size=64M + innodb_doublewrite=OFF + innodb_flush_log_at_trx_commit=0 + innodb_flush_method=O_DIRECT + innodb_numa_interleave=ON + innodb_adaptive_hash_index=OFF + sync_binlog=0 + sync_relay_log=0 + performance_schema=OFF + slow-query-log=OFF + EOF + + cat <<-EOF>>./config/mycnf/mysql80.cnf + binlog-transaction-compression=ON + EOF + + # run the tests however you normally do, then produce a JUnit XML file + eatmydata -- go run test.go -docker=false -follow -shard vreplication_movetables_buffering | tee -a output.txt | go-junit-report -set-exit-code > report.xml + + - name: Print test output and Record test result in launchable + if: steps.skip-workflow.outputs.skip-workflow == 'false' && steps.changes.outputs.end_to_end == 'true' && always() + run: | + # send recorded tests to launchable + launchable record tests --build "$GITHUB_RUN_ID" go-test . 
|| true

          # print test output
          cat output.txt
diff --git a/doc/design-docs/VTGateBuffering.md b/doc/design-docs/VTGateBuffering.md
new file mode 100644
index 00000000000..a8adc88f5c8
--- /dev/null
+++ b/doc/design-docs/VTGateBuffering.md
@@ -0,0 +1,60 @@
+# Adding buffering to VTGate while switching traffic during a movetables operation
+
+## Current buffering support in VTGate
+
+VTGate currently supports buffering of queries during reparent and resharding operations. This is done by buffering the
+failing queries in the tablet gateway layer in vtgate. When a query fails, the reason for the failure is checked.
+
+To assist in diagnosing the root cause, a _KeyspaceEventWatcher_ (aka KEW) was introduced. This runs in a goroutine and
+watches the SrvKeyspace: if there is a change to the keyspace partitions in the topo, it is assumed that a resharding
+operation is in progress.
+
+The buffering logic subscribes to the keyspace event watcher.
+
+Otherwise, if there are no tablets to serve from, based on the health check results, it is assumed that there is a
+cluster event where either the primary is being reparented (for example, during a PlannedReparentShard) or the cluster
+is being restarted and all tablets are in the process of starting up.
+
+If either of these occurs, the _consistent_ flag is set to false for that keyspace. When that happens, the keyspace
+watcher checks, on every SrvKeyspace update, whether the event has been resolved. This can happen when tablets become
+available again (in the case of a cluster event) or when the partition information indicates that resharding is
+complete.
+
+When that happens, the keyspace event watcher publishes an event that the keyspace is now consistent. The buffers are
+then drained and the queries retried (at the tablet gateway).
+
+## Additional support for MoveTables
+
+### Background
+
+MoveTables does not affect the entire keyspace, just the tables being moved. So the KEW doesn't detect a cluster event,
+since the tablets are still available and the shard partitions are unchanged.
+
+MoveTables moves tables from one keyspace to another. There are two flavors of MoveTables: a regular one, where the
+tables are moved into all shards of the target keyspace, and Shard-By-Shard Migration, where the user can specify a
+subset of target shards to move the tables into.
+
+These are the topo attributes that are affected during a MoveTables (regular or shard-by-shard):
+
+* DeniedTables in a shard's TabletControls. These are used to stop writes to the source keyspace for these tables.
+  While switching writes we first create these entries, wait for the target to catch up to the source (using GTID
+  positions), and then update the routing rules to point these tables to the target.
+* RoutingRules (for regular MoveTables) and ShardRoutingRules (for Shard-By-Shard Migration). Simplified, a routing
+  rule points each table being moved at a keyspace. When a MoveTables is initiated, that is the source keyspace. After
+  traffic is switched, the rule is updated to point to the target keyspace. If routing rules are specified, VTGate
+  uses them to decide which keyspace to route each table in a query to.
+
+### Changes
+
+There are three main changes:
+
+* The keyspace event watcher is enhanced to look at the topo attributes mentioned above. We need to watch the
+  SrvVSchema for changes to the routing rules. DeniedTables, however, live only in the Shard records in the topo. To
+  get around that, we change the traffic switcher to also rebuild the SrvVSchema when DeniedTables are modified, so
+  that we get notified of those changes as well.
+* The logic to start buffering needs to look for the "enforce denied tables" error that is thrown by the vttablets
+  when they try to execute a query on a table being switched.
+* We cannot reuse the current buffering logic, which lives in the tablet gateway: at that layer the target keyspace
+  has already been fixed by the planner and cannot be changed. We need to add new buffering logic at a higher level
+  (the _newExecute_ method) and always replan before retrying a query. This also means that we need to bypass the
+  plan cache while retrying.
+
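+### Illustrative sketches
+
+The two sketches below are illustrative only; they are simplified stand-ins for the behavior described above, not the
+actual implementation. The first mirrors the check performed in _getMoveTablesStatus_: given the routing rules from
+the SrvVSchema (as a fromTable => toTables map, the layout produced by _topotools.GetRoutingRulesMap_) and one of the
+DeniedTables found in the shard records, it decides whether writes have already been switched. The function name
+_writesSwitched_ is a placeholder, not a symbol in the codebase.
+
+```go
+package main
+
+import "fmt"
+
+// writesSwitched reports whether the routing rule for a denied table no longer
+// points back at the watched keyspace, i.e. whether SwitchTraffic has already
+// moved writes to the target keyspace.
+func writesSwitched(routingRules map[string][]string, keyspace, deniedTable string) bool {
+	targets, ok := routingRules[deniedTable]
+	if !ok || len(targets) == 0 {
+		return false // no rule for the table: it still resolves to the source keyspace
+	}
+	return targets[0] != fmt.Sprintf("%s.%s", keyspace, deniedTable)
+}
+
+func main() {
+	rules := map[string][]string{"customer": {"customer2.customer"}}
+	fmt.Println(writesSwitched(rules, "customer", "customer")) // true: the rule points at customer2
+}
+```
+
+The second sketch shows the shape of the retry loop added to _newExecute_. The helpers (_run_, the _execute_ callback)
+and the plain string match on the error are assumptions made for brevity; the real code replans via the planner on
+every attempt, waits for a newer vschema, and inspects the error's root cause for the "enforce denied tables" message.
+
+```go
+package main
+
+import (
+	"errors"
+	"fmt"
+	"strings"
+)
+
+const maxBufferingRetries = 3
+
+// run executes a query, retrying when the failure looks like a MoveTables
+// write cutover ("enforce denied tables").
+func run(query string, execute func(attempt int) error) error {
+	var err error
+	for try := 0; try < maxBufferingRetries; try++ {
+		// Replan on every attempt: after SwitchTraffic the routing rules, and
+		// therefore the target keyspace of the plan, may have changed.
+		if err = execute(try); err == nil {
+			return nil
+		}
+		if strings.Contains(err.Error(), "enforce denied tables") {
+			continue // a newer vschema will carry the updated routing rules; retry
+		}
+		return err // not a cutover error: fail as before
+	}
+	return fmt.Errorf("query %s failed after retries: %w", query, err)
+}
+
+func main() {
+	err := run("insert into loadtest(id, name) values (1, 'name-1')", func(attempt int) error {
+		if attempt == 0 {
+			return errors.New("enforce denied tables") // first attempt hits the cutover
+		}
+		return nil // the replanned query routes to the target keyspace
+	})
+	fmt.Println(err) // <nil>
+}
+```
+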

diff --git a/go/test/endtoend/vreplication/cluster_test.go b/go/test/endtoend/vreplication/cluster_test.go
index 12539b778de..63526aa450c 100644
--- a/go/test/endtoend/vreplication/cluster_test.go
+++ b/go/test/endtoend/vreplication/cluster_test.go
@@ -52,8 +52,9 @@ var (
 	sidecarDBIdentifier    = sqlparser.String(sqlparser.NewIdentifierCS(sidecarDBName))
 	mainClusterConfig      *ClusterConfig
 	externalClusterConfig  *ClusterConfig
-	extraVTGateArgs        = []string{"--tablet_refresh_interval", "10ms"}
-	extraVtctldArgs        = []string{"--remote_operation_timeout", "600s", "--topo_etcd_lease_ttl", "120"}
+	extraVTGateArgs        = []string{"--tablet_refresh_interval", "10ms", "--enable_buffer", "--buffer_window", "10s",
+		"--buffer_size", "100000", "--buffer_min_time_between_failovers", "2m", "--buffer_max_failover_duration", "10s"}
+	extraVtctldArgs        = []string{"--remote_operation_timeout", "600s", "--topo_etcd_lease_ttl", "120"}
 	// This variable can be used within specific tests to alter vttablet behavior
 	extraVTTabletArgs = []string{}
diff --git a/go/test/endtoend/vreplication/config_test.go b/go/test/endtoend/vreplication/config_test.go
index 213ad0bcc75..dcae0f6a5bf 100644
--- a/go/test/endtoend/vreplication/config_test.go
+++ b/go/test/endtoend/vreplication/config_test.go
@@ -59,6 +59,7 @@ create table json_tbl (id int, j1 json, j2 json, primary key(id));
 create table geom_tbl (id int, g geometry, p point, ls linestring, pg polygon, mp multipoint, mls multilinestring, mpg multipolygon, gc geometrycollection, primary key(id));
 create table ` + "`blüb_tbl`" + ` (id int, val1 varchar(20), ` + "`blöb1`" + ` blob, val2 varbinary(20), ` + "`bl@b2`" + ` longblob, txt1 text, blb3 tinyblob, txt2 longtext, blb4 mediumblob, primary key(id));
 create table reftable (id int, val1 varchar(20), primary key(id), key(val1));
+create table loadtest (id int, name varchar(256), primary key(id), key(name));
 `
 	// These should always be ignored in vreplication
 	internalSchema = `
@@ -76,6 +77,7 @@ create table reftable (id int, val1 varchar(20), primary key(id), key(val1));
     "product": {},
     "merchant": {},
     "orders": {},
+    "loadtest": {},
     "customer": {},
     "customer_seq": {
       "type": "sequence"
@@ -117,6 +119,14 @@ create table reftable (id int, val1 varchar(20), primary key(id), key(val1));
     }
   },
   "tables": {
+    "loadtest": {
+      "column_vindexes": [
+        {
+          "column": "id",
+          "name": "reverse_bits"
+        }
+      ]
+    },
     "customer": {
       "column_vindexes": [
         {
@@ -283,7 +293,15 @@ create table reftable (id int, val1 varchar(20), primary key(id), key(val1));
     }
   },
   "tables": {
-    "customer": {
+    "loadtest": {
+      "column_vindexes": [
+        {
+          "column": "id",
+          "name": "reverse_bits"
+        }
+      ]
+    },
+    "customer": {
       "column_vindexes": [
         {
           "column": "cid",
diff --git a/go/test/endtoend/vreplication/movetables_buffering_test.go b/go/test/endtoend/vreplication/movetables_buffering_test.go
new file mode 100644
index 00000000000..5a54f63b1b5
--- /dev/null
+++ 
b/go/test/endtoend/vreplication/movetables_buffering_test.go @@ -0,0 +1,123 @@ +package vreplication + +import ( + "context" + "fmt" + "strings" + "sync/atomic" + "testing" + "time" + + "github.com/stretchr/testify/require" + + "vitess.io/vitess/go/mysql" + "vitess.io/vitess/go/vt/log" + "vitess.io/vitess/go/vt/wrangler" +) + +func TestMoveTablesBuffering(t *testing.T) { + defaultRdonly = 1 + vc = setupMinimalCluster(t) + defer vtgateConn.Close() + defer vc.TearDown(t) + + currentWorkflowType = wrangler.MoveTablesWorkflow + setupMinimalCustomerKeyspace(t) + tables := "loadtest" + err := tstWorkflowExec(t, defaultCellName, workflowName, sourceKs, targetKs, + tables, workflowActionCreate, "", "", "") + require.NoError(t, err) + waitForWorkflowState(t, vc, ksWorkflow, workflowStateRunning) + + loadCtx, cancelLoad := context.WithCancel(context.Background()) + go func() { + startLoad(t, loadCtx) + }() + time.Sleep(2 * time.Second) // wait for enough records to be inserted by startLoad + + catchup(t, targetTab1, workflowName, "MoveTables") + catchup(t, targetTab2, workflowName, "MoveTables") + vdiff1(t, ksWorkflow, "") + waitForLowLag(t, "customer", workflowName) + tstWorkflowSwitchReads(t, "", "") + tstWorkflowSwitchWrites(t) + log.Infof("SwitchWrites done") + stopLoad(t, cancelLoad) + + log.Infof("TestMoveTablesBuffering: done") + log.Flush() +} + +func stopLoad(t *testing.T, cancel context.CancelFunc) { + time.Sleep(11 * time.Second) // wait for buffering to stop and additional records to be inserted by startLoad after traffic is switched + log.Infof("Canceling load") + cancel() + time.Sleep(2 * time.Second) // wait for cancel to take effect + log.Flush() + +} +func startLoad(t *testing.T, ctx context.Context) { + var id int64 + log.Infof("startLoad: starting") + queryTemplate := "insert into loadtest(id, name) values (%d, 'name-%d')" + var totalQueries, successfulQueries int64 + var deniedErrors, ambiguousErrors, reshardedErrors, tableNotFoundErrors, otherErrors int64 + defer func() { + + log.Infof("startLoad: totalQueries: %d, successfulQueries: %d, deniedErrors: %d, ambiguousErrors: %d, reshardedErrors: %d, tableNotFoundErrors: %d, otherErrors: %d", + totalQueries, successfulQueries, deniedErrors, ambiguousErrors, reshardedErrors, tableNotFoundErrors, otherErrors) + }() + logOnce := true + for { + select { + case <-ctx.Done(): + log.Infof("startLoad: context cancelled") + log.Infof("startLoad: deniedErrors: %d, ambiguousErrors: %d, reshardedErrors: %d, tableNotFoundErrors: %d, otherErrors: %d", + deniedErrors, ambiguousErrors, reshardedErrors, tableNotFoundErrors, otherErrors) + require.Less(t, deniedErrors, int64(1)) + require.Less(t, otherErrors, int64(1)) + require.Equal(t, totalQueries, successfulQueries) + return + default: + go func() { + conn := getConnection(t, vc.ClusterConfig.hostname, vc.ClusterConfig.vtgateMySQLPort) + defer conn.Close() + atomic.AddInt64(&id, 1) + query := fmt.Sprintf(queryTemplate, id, id) + _, err := conn.ExecuteFetch(query, 1, false) + atomic.AddInt64(&totalQueries, 1) + if err != nil { + sqlErr := err.(*mysql.SQLError) + if strings.Contains(strings.ToLower(err.Error()), "denied tables") { + log.Infof("startLoad: denied tables error executing query: %d:%v", sqlErr.Number(), err) + atomic.AddInt64(&deniedErrors, 1) + } else if strings.Contains(strings.ToLower(err.Error()), "ambiguous") { + // this can happen when a second keyspace is setup with the same tables, but there are no routing rules + // set yet by MoveTables. So we ignore these errors. 
+ atomic.AddInt64(&ambiguousErrors, 1) + log.Flush() + //panic(err) + } else if strings.Contains(strings.ToLower(err.Error()), "current keyspace is being resharded") { + atomic.AddInt64(&reshardedErrors, 1) + } else if strings.Contains(strings.ToLower(err.Error()), "not found") { + atomic.AddInt64(&tableNotFoundErrors, 1) + } else { + if logOnce { + log.Infof("startLoad: error executing query: %d:%v", sqlErr.Number(), err) + logOnce = false + } + atomic.AddInt64(&otherErrors, 1) + } + //log.Infof("startLoad: totalQueries: %d, successfulQueries: %d, deniedErrors: %d, ambiguousErrors: %d, reshardedErrors: %d, tableNotFoundErrors: %d, otherErrors: %d", + // totalQueries, successfulQueries, deniedErrors, ambiguousErrors, reshardedErrors, tableNotFoundErrors, otherErrors) + //log.Flush() + //panic(err) + time.Sleep(2 * time.Millisecond) + } else { + atomic.AddInt64(&successfulQueries, 1) + } + }() + time.Sleep(2 * time.Millisecond) + } + } +} diff --git a/go/test/endtoend/vreplication/partial_movetables_test.go b/go/test/endtoend/vreplication/partial_movetables_test.go index 1cba9a6b4f1..a76ffac740a 100644 --- a/go/test/endtoend/vreplication/partial_movetables_test.go +++ b/go/test/endtoend/vreplication/partial_movetables_test.go @@ -17,9 +17,11 @@ limitations under the License. package vreplication import ( + "context" "fmt" "strings" "testing" + "time" "github.com/stretchr/testify/require" "github.com/tidwall/gjson" @@ -51,14 +53,14 @@ func TestPartialMoveTablesBasic(t *testing.T) { defer func() { extraVTGateArgs = origExtraVTGateArgs }() - vc = setupCluster(t) + vc = setupMinimalCluster(t) defer vtgateConn.Close() defer vc.TearDown(t) - setupCustomerKeyspace(t) + setupMinimalCustomerKeyspace(t) // Move customer table from unsharded product keyspace to // sharded customer keyspace. - createMoveTablesWorkflow(t, "customer") + createMoveTablesWorkflow(t, "customer,loadtest") tstWorkflowSwitchReadsAndWrites(t) tstWorkflowComplete(t) @@ -75,6 +77,10 @@ func TestPartialMoveTablesBasic(t *testing.T) { applyShardRoutingRules(t, emptyShardRoutingRules) require.Equal(t, emptyShardRoutingRules, getShardRoutingRules(t)) + runWithLoad := true + var cancelLoad context.CancelFunc + var loadCtx context.Context + // Now setup the customer2 keyspace so we can do a partial // move tables for one of the two shards: 80-. 
defaultRdonly = 0 @@ -88,8 +94,18 @@ func TestPartialMoveTablesBasic(t *testing.T) { // start the partial movetables for 80- err := tstWorkflowExec(t, defaultCellName, wfName, sourceKs, targetKs, - "customer", workflowActionCreate, "", shard, "") + "customer,loadtest", workflowActionCreate, "", shard, "") require.NoError(t, err) + + if runWithLoad { // start load after routing rules are set, otherwise we end up with ambiguous tables + loadCtx, cancelLoad = context.WithCancel(context.Background()) + //defer func() { stopLoad(t, cancelLoad) }() + go func() { + startLoad(t, loadCtx) + }() + time.Sleep(2 * time.Second) // wait for enough records to be inserted by startLoad + } + targetTab1 = vc.getPrimaryTablet(t, targetKs, shard) catchup(t, targetTab1, wfName, "Partial MoveTables Customer to Customer2") vdiff1(t, ksWf, "") @@ -221,14 +237,14 @@ func TestPartialMoveTablesBasic(t *testing.T) { wfName = "partialDash80" shard = "-80" ksWf = fmt.Sprintf("%s.%s", targetKs, wfName) - // Start the partial movetables for -80, 80- has already been switched err = tstWorkflowExec(t, defaultCellName, wfName, sourceKs, targetKs, - "customer", workflowActionCreate, "", shard, "") + "customer,loadtest", workflowActionCreate, "", shard, "") require.NoError(t, err) targetTab2 := vc.getPrimaryTablet(t, targetKs, shard) catchup(t, targetTab2, wfName, "Partial MoveTables Customer to Customer2: -80") vdiff1(t, ksWf, "") + // Switch all traffic for the shard require.NoError(t, tstWorkflowExec(t, "", wfName, "", targetKs, "", workflowActionSwitchTraffic, "", "", "")) expectedSwitchOutput = fmt.Sprintf("SwitchTraffic was successful for workflow %s.%s\nStart State: Reads partially switched, for shards: 80-. Writes partially switched, for shards: 80-\nCurrent State: All Reads Switched. All Writes Switched\n\n", @@ -243,6 +259,8 @@ func TestPartialMoveTablesBasic(t *testing.T) { // target side (customer2). require.Equal(t, postCutoverShardRoutingRules, getShardRoutingRules(t)) + stopLoad(t, cancelLoad) + // Cancel both reverse workflows (as we've done the cutover), which should // clean up both the global routing rules and the shard routing rules. for _, wf := range []string{"partialDash80", "partial80Dash"} { @@ -272,4 +290,5 @@ func TestPartialMoveTablesBasic(t *testing.T) { // Confirm that the shard routing rules are now gone. 
require.Equal(t, emptyShardRoutingRules, getShardRoutingRules(t)) + } diff --git a/go/test/endtoend/vreplication/resharding_workflows_v2_test.go b/go/test/endtoend/vreplication/resharding_workflows_v2_test.go index a55ad3047e1..b5a8b0a980f 100644 --- a/go/test/endtoend/vreplication/resharding_workflows_v2_test.go +++ b/go/test/endtoend/vreplication/resharding_workflows_v2_test.go @@ -429,7 +429,7 @@ func testMoveTablesV2Workflow(t *testing.T) { setupCustomerKeyspace(t) // The purge table should get skipped/ignored // If it's not then we'll get an error as the table doesn't exist in the vschema - createMoveTablesWorkflow(t, "customer,vdiff_order,reftable,_vt_PURGE_4f9194b43b2011eb8a0104ed332e05c2_20221210194431") + createMoveTablesWorkflow(t, "customer,loadtest,vdiff_order,reftable,_vt_PURGE_4f9194b43b2011eb8a0104ed332e05c2_20221210194431") if !strings.Contains(lastOutput, "Workflow started successfully") { t.Fail() } @@ -639,21 +639,58 @@ func setupCustomer2Keyspace(t *testing.T) { c2shards := []string{"-80", "80-"} c2keyspace := "customer2" if _, err := vc.AddKeyspace(t, []*Cell{vc.Cells["zone1"]}, c2keyspace, strings.Join(c2shards, ","), - customerVSchema, customerSchema, defaultReplicas, defaultRdonly, 1200, nil); err != nil { + customerVSchema, customerSchema, 0, 0, 1200, nil); err != nil { t.Fatal(err) } for _, c2shard := range c2shards { err := cluster.WaitForHealthyShard(vc.VtctldClient, c2keyspace, c2shard) require.NoError(t, err) - if defaultReplicas > 0 { - require.NoError(t, vtgate.WaitForStatusOfTabletInShard(fmt.Sprintf("%s.%s.replica", c2keyspace, c2shard), defaultReplicas, 30*time.Second)) - } - if defaultRdonly > 0 { - require.NoError(t, vtgate.WaitForStatusOfTabletInShard(fmt.Sprintf("%s.%s.rdonly", c2keyspace, c2shard), defaultRdonly, 30*time.Second)) - } + require.NoError(t, vtgate.WaitForStatusOfTabletInShard(fmt.Sprintf("%s.%s.primary", c2keyspace, c2shard), 1, 30*time.Second)) } } +func setupMinimalCluster(t *testing.T) *VitessCluster { + cells := []string{"zone1"} + + vc = NewVitessCluster(t, "TestBasicVreplicationWorkflow", cells, mainClusterConfig) + require.NotNil(t, vc) + defaultCellName := "zone1" + allCellNames = defaultCellName + defaultCell = vc.Cells[defaultCellName] + + zone1 := vc.Cells["zone1"] + + vc.AddKeyspace(t, []*Cell{zone1}, "product", "0", initialProductVSchema, initialProductSchema, 0, 0, 100, nil) + + vtgate = zone1.Vtgates[0] + require.NotNil(t, vtgate) + err := cluster.WaitForHealthyShard(vc.VtctldClient, "product", "0") + require.NoError(t, err) + require.NoError(t, vtgate.WaitForStatusOfTabletInShard(fmt.Sprintf("%s.%s.primary", "product", "0"), 1, 30*time.Second)) + + vtgateConn = getConnection(t, vc.ClusterConfig.hostname, vc.ClusterConfig.vtgateMySQLPort) + verifyClusterHealth(t, vc) + insertInitialData(t) + + sourceTab = vc.Cells[defaultCell.Name].Keyspaces["product"].Shards["0"].Tablets["zone1-100"].Vttablet + + return vc +} + +func setupMinimalCustomerKeyspace(t *testing.T) { + if _, err := vc.AddKeyspace(t, []*Cell{vc.Cells["zone1"]}, "customer", "-80,80-", + customerVSchema, customerSchema, 0, 0, 200, nil); err != nil { + t.Fatal(err) + } + require.NoError(t, cluster.WaitForHealthyShard(vc.VtctldClient, "customer", "-80")) + require.NoError(t, cluster.WaitForHealthyShard(vc.VtctldClient, "customer", "80-")) + require.NoError(t, vtgate.WaitForStatusOfTabletInShard(fmt.Sprintf("%s.%s.primary", "customer", "-80"), 1, 30*time.Second)) + require.NoError(t, vtgate.WaitForStatusOfTabletInShard(fmt.Sprintf("%s.%s.primary", "customer", 
"80-"), 1, 30*time.Second)) + custKs := vc.Cells[defaultCell.Name].Keyspaces["customer"] + targetTab1 = custKs.Shards["-80"].Tablets["zone1-200"].Vttablet + targetTab2 = custKs.Shards["80-"].Tablets["zone1-300"].Vttablet +} + func TestSwitchReadsWritesInAnyOrder(t *testing.T) { vc = setupCluster(t) defer vc.TearDown(t) diff --git a/go/test/endtoend/vreplication/vreplication_test.go b/go/test/endtoend/vreplication/vreplication_test.go index 35f7062d27d..f908d5c5150 100644 --- a/go/test/endtoend/vreplication/vreplication_test.go +++ b/go/test/endtoend/vreplication/vreplication_test.go @@ -328,6 +328,7 @@ func testVreplicationWorkflows(t *testing.T, limited bool, binlogRowImage string verifyClusterHealth(t, vc) insertInitialData(t) materializeRollup(t) + shardCustomer(t, true, []*Cell{defaultCell}, defaultCellName, false) // the Lead and Lead-1 tables tested a specific case with binary sharding keys. Drop it now so that we don't @@ -623,6 +624,8 @@ func TestCellAliasVreplicationWorkflow(t *testing.T) { shardCustomer(t, true, []*Cell{cell1, cell2}, "alias", false) } +var queryErrorCount int64 + // testVStreamFrom confirms that the "vstream * from" endpoint is serving data func testVStreamFrom(t *testing.T, table string, expectedRowCount int) { ctx := context.Background() @@ -705,7 +708,7 @@ func shardCustomer(t *testing.T, testReverse bool, cells []*Cell, sourceCellOrAl defaultCell := cells[0] custKs := vc.Cells[defaultCell.Name].Keyspaces["customer"] - tables := "customer,Lead,Lead-1,db_order_test,geom_tbl,json_tbl,blüb_tbl,vdiff_order,reftable" + tables := "customer,loadtest,Lead,Lead-1,db_order_test,geom_tbl,json_tbl,blüb_tbl,vdiff_order,reftable" moveTablesAction(t, "Create", sourceCellOrAlias, workflow, sourceKs, targetKs, tables) customerTab1 := custKs.Shards["-80"].Tablets["zone1-200"].Vttablet diff --git a/go/test/endtoend/vreplication/vreplication_test_env.go b/go/test/endtoend/vreplication/vreplication_test_env.go index f3109b2123b..4500a98868c 100644 --- a/go/test/endtoend/vreplication/vreplication_test_env.go +++ b/go/test/endtoend/vreplication/vreplication_test_env.go @@ -19,14 +19,14 @@ package vreplication var dryRunResultsSwitchWritesCustomerShard = []string{ "Lock keyspace product", "Lock keyspace customer", - "Stop writes on keyspace product, tables [Lead,Lead-1,blüb_tbl,customer,db_order_test,geom_tbl,json_tbl,reftable,vdiff_order]:", + "Stop writes on keyspace product, tables [Lead,Lead-1,blüb_tbl,customer,db_order_test,geom_tbl,json_tbl,loadtest,reftable,vdiff_order]:", "/ Keyspace product, Shard 0 at Position", "Wait for VReplication on stopped streams to catchup for up to 30s", "Create reverse replication workflow p2c_reverse", "Create journal entries on source databases", - "Enable writes on keyspace customer tables [Lead,Lead-1,blüb_tbl,customer,db_order_test,geom_tbl,json_tbl,reftable,vdiff_order]", + "Enable writes on keyspace customer tables [Lead,Lead-1,blüb_tbl,customer,db_order_test,geom_tbl,json_tbl,loadtest,reftable,vdiff_order]", "Switch routing from keyspace product to keyspace customer", - "Routing rules for tables [Lead,Lead-1,blüb_tbl,customer,db_order_test,geom_tbl,json_tbl,reftable,vdiff_order] will be updated", + "Routing rules for tables [Lead,Lead-1,blüb_tbl,customer,db_order_test,geom_tbl,json_tbl,loadtest,reftable,vdiff_order] will be updated", "Switch writes completed, freeze and delete vreplication streams on:", " tablet 200 ", " tablet 300 ", @@ -41,8 +41,8 @@ var dryRunResultsSwitchWritesCustomerShard = []string{ var 
dryRunResultsReadCustomerShard = []string{
 	"Lock keyspace product",
-	"Switch reads for tables [Lead,Lead-1,blüb_tbl,customer,db_order_test,geom_tbl,json_tbl,reftable,vdiff_order] to keyspace customer for tablet types [RDONLY,REPLICA]",
-	"Routing rules for tables [Lead,Lead-1,blüb_tbl,customer,db_order_test,geom_tbl,json_tbl,reftable,vdiff_order] will be updated",
+	"Switch reads for tables [Lead,Lead-1,blüb_tbl,customer,db_order_test,geom_tbl,json_tbl,loadtest,reftable,vdiff_order] to keyspace customer for tablet types [RDONLY,REPLICA]",
+	"Routing rules for tables [Lead,Lead-1,blüb_tbl,customer,db_order_test,geom_tbl,json_tbl,loadtest,reftable,vdiff_order] will be updated",
 	"Unlock keyspace product",
 }
diff --git a/go/vt/discovery/keyspace_events.go b/go/vt/discovery/keyspace_events.go
index 89a0fb23eac..bf5cfcdf1df 100644
--- a/go/vt/discovery/keyspace_events.go
+++ b/go/vt/discovery/keyspace_events.go
@@ -26,10 +26,12 @@ import (
 	"vitess.io/vitess/go/vt/log"
 	"vitess.io/vitess/go/vt/proto/query"
 	topodatapb "vitess.io/vitess/go/vt/proto/topodata"
+	vschemapb "vitess.io/vitess/go/vt/proto/vschema"
 	"vitess.io/vitess/go/vt/sidecardb"
 	"vitess.io/vitess/go/vt/srvtopo"
 	"vitess.io/vitess/go/vt/topo"
 	"vitess.io/vitess/go/vt/topo/topoproto"
+	"vitess.io/vitess/go/vt/topotools"
 )

 // KeyspaceEventWatcher is an auxiliary watcher that watches all availability incidents
@@ -62,6 +64,9 @@ type KeyspaceEvent struct {

 	// Shards is a list of all the shards in the keyspace, including their state after the event is resolved
 	Shards []ShardEvent
+
+	// MoveTablesState records the current state of an ongoing MoveTables workflow
+	MoveTablesState MoveTablesState
 }

 type ShardEvent struct {
@@ -86,6 +91,16 @@ func NewKeyspaceEventWatcher(ctx context.Context, topoServer srvtopo.Server, hc
 	return kew
 }

+type MoveTablesStatus int
+
+const (
+	MoveTablesUnknown MoveTablesStatus = iota
+	// MoveTablesSwitching is set when the write traffic is in the middle of being switched from the source to the target
+	MoveTablesSwitching
+	// MoveTablesSwitched is set when write traffic has been completely switched to the target
+	MoveTablesSwitched
+)
+
 // keyspaceState is the internal state for all the keyspaces that the KEW is
 // currently watching
 type keyspaceState struct {
@@ -99,6 +114,8 @@ type keyspaceState struct {
 	lastError    error
 	lastKeyspace *topodatapb.SrvKeyspace
 	shards       map[string]*shardState
+
+	moveTablesState *MoveTablesState
 }

 // Format prints the internal state for this keyspace for debug purposes
@@ -134,6 +151,7 @@ func (kss *keyspaceState) beingResharded(currentShard string) bool {
 	// for all the known shards, try to find a primary shard besides the one we're trying to access
 	// and which is currently healthy. 
if there are other healthy primaries in the keyspace, it means // we're in the middle of a resharding operation + // FIXME: probably doesn't work for anything other than 1->2 resharding for shard, sstate := range kss.shards { if shard != currentShard && sstate.serving { return true @@ -218,6 +236,10 @@ func (kss *keyspaceState) ensureConsistentLocked() { return } + if kss.moveTablesState != nil && kss.moveTablesState.Typ != MoveTablesNone && kss.moveTablesState.State != MoveTablesSwitched { + return + } + // get the topology metadata for our primary from `lastKeyspace`; this value is refreshed // from our topology watcher whenever a change is detected, so it should always be up to date primary := topoproto.SrvKeyspaceGetPartition(kss.lastKeyspace, topodatapb.TabletType_PRIMARY) @@ -252,16 +274,25 @@ func (kss *keyspaceState) ensureConsistentLocked() { } } + // clone the current moveTablesState, if any, to handle race conditions where it can get updated while we're broadcasting + var moveTablesState MoveTablesState + if kss.moveTablesState != nil { + moveTablesState = *kss.moveTablesState + } + + ksevent := &KeyspaceEvent{ + Cell: kss.kew.localCell, + Keyspace: kss.keyspace, + Shards: make([]ShardEvent, 0, len(kss.shards)), + MoveTablesState: moveTablesState, + } + // we haven't found any inconsistencies between the HealthCheck stream and the topology // watcher. this means the ongoing availability event has been resolved, so we can broadcast // a resolution event to all listeners kss.consistent = true - ksevent := &KeyspaceEvent{ - Cell: kss.kew.localCell, - Keyspace: kss.keyspace, - Shards: make([]ShardEvent, 0, len(kss.shards)), - } + kss.moveTablesState = nil for shard, sstate := range kss.shards { ksevent.Shards = append(ksevent.Shards, ShardEvent{ @@ -329,6 +360,97 @@ func (kss *keyspaceState) onHealthCheck(th *TabletHealth) { kss.ensureConsistentLocked() } +type MoveTablesType int + +const ( + MoveTablesNone MoveTablesType = iota + MoveTablesRegular + MoveTablesShardByShard +) + +type MoveTablesState struct { + Typ MoveTablesType + State MoveTablesStatus +} + +func (kss *keyspaceState) getMoveTablesStatus(vs *vschemapb.SrvVSchema) (*MoveTablesState, error) { + mtState := &MoveTablesState{ + Typ: MoveTablesNone, + State: MoveTablesUnknown, + } + + // if there are no routing rules defined, then movetables is not in progress, exit early + if (vs.RoutingRules != nil && len(vs.RoutingRules.Rules) == 0) && + (vs.ShardRoutingRules != nil && len(vs.ShardRoutingRules.Rules) == 0) { + return mtState, nil + } + + shortCtx, cancel := context.WithTimeout(context.Background(), topo.RemoteOperationTimeout) + defer cancel() + ts, _ := kss.kew.ts.GetTopoServer() + + // collect all current shard information from the topo + var shardInfos []*topo.ShardInfo + for _, sstate := range kss.shards { + si, err := ts.GetShard(shortCtx, kss.keyspace, sstate.target.Shard) + if err != nil { + return nil, err + } + shardInfos = append(shardInfos, si) + } + + // check if any shard has denied tables and if so, record one of these to check where it currently points to + // using the (shard) routing rules + var shardsWithDeniedTables []string + var oneDeniedTable string + for _, si := range shardInfos { + for _, tc := range si.TabletControls { + if len(tc.DeniedTables) > 0 { + oneDeniedTable = tc.DeniedTables[0] + shardsWithDeniedTables = append(shardsWithDeniedTables, si.ShardName()) + } + } + } + if len(shardsWithDeniedTables) == 0 { + return mtState, nil + } + + // check if a shard by shard migration is in progress 
and if so detect if it has been switched + isPartialTables := vs.ShardRoutingRules != nil && len(vs.ShardRoutingRules.Rules) > 0 + + if isPartialTables { + srr := topotools.GetShardRoutingRulesMap(vs.ShardRoutingRules) + mtState.Typ = MoveTablesShardByShard + mtState.State = MoveTablesSwitched + for _, shard := range shardsWithDeniedTables { + ruleKey := topotools.GetShardRoutingRuleKey(kss.keyspace, shard) + if _, ok := srr[ruleKey]; ok { + // still pointing to the source shard + mtState.State = MoveTablesSwitching + break + } + } + log.Infof("getMoveTablesStatus: keyspace %s declaring partial move tables %v", kss.keyspace, mtState) + return mtState, nil + } + + // it wasn't a shard by shard migration, but since we have denied tables it must be a regular MoveTables + mtState.Typ = MoveTablesRegular + mtState.State = MoveTablesSwitching + rr := topotools.GetRoutingRulesMap(vs.RoutingRules) + if rr != nil { + r, ok := rr[oneDeniedTable] + // if a rule exists for the table and points to the target keyspace, writes have been switched + if ok && len(r) > 0 && r[0] != fmt.Sprintf("%s.%s", kss.keyspace, oneDeniedTable) { + mtState.State = MoveTablesSwitched + log.Infof("onSrvKeyspace:: keyspace %s writes have been switched for table %s, rule %v", kss.keyspace, oneDeniedTable, r[0]) + } + } + log.Infof("getMoveTablesStatus: keyspace %s declaring regular move tables %v", kss.keyspace, mtState) + + return mtState, nil +} + // onSrvKeyspace is the callback that updates this keyspace with fresh topology data from our topology server. // this callback is called from a Watcher in the topo server whenever a change to the topology for this keyspace // occurs. this watcher is dedicated to this keyspace, and will only yield topology metadata changes for as @@ -391,6 +513,23 @@ func (kss *keyspaceState) isServing() bool { return false } +// onSrvVSchema is called from a Watcher in the topo server whenever the SrvVSchema is updated by Vitess. +// For the purposes here, we are interested in updates to the RoutingRules or ShardRoutingRules. +// In addition, the traffic switcher updates SrvVSchema when the DeniedTables attributes in a Shard record is +// modified. +func (kss *keyspaceState) onSrvVSchema(vs *vschemapb.SrvVSchema, err error) bool { + kss.mu.Lock() + defer kss.mu.Unlock() + kss.moveTablesState, _ = kss.getMoveTablesStatus(vs) + if kss.moveTablesState != nil && kss.moveTablesState.Typ != MoveTablesNone { + // mark the keyspace as inconsistent. ensureConsistentLocked() checks if the workflow is switched, + // and if so, it will send an event to the buffering subscribers to indicate that buffering can be stopped. + kss.consistent = false + kss.ensureConsistentLocked() + } + return true +} + // newKeyspaceState allocates the internal state required to keep track of availability incidents // in this keyspace, and starts up a SrvKeyspace watcher on our topology server which will update // our keyspaceState with any topology changes in real time. 
@@ -402,6 +541,7 @@ func newKeyspaceState(kew *KeyspaceEventWatcher, cell, keyspace string) *keyspac shards: make(map[string]*shardState), } kew.ts.WatchSrvKeyspace(context.Background(), cell, keyspace, kss.onSrvKeyspace) + kew.ts.WatchSrvVSchema(context.Background(), cell, kss.onSrvVSchema) return kss } @@ -421,7 +561,6 @@ func (kew *KeyspaceEventWatcher) processHealthCheck(th *TabletHealth) { func (kew *KeyspaceEventWatcher) getKeyspaceStatus(keyspace string) *keyspaceState { kew.mu.Lock() defer kew.mu.Unlock() - kss := kew.keyspaces[keyspace] if kss == nil { kss = newKeyspaceState(kew, kew.localCell, keyspace) diff --git a/go/vt/topotools/routing_rules.go b/go/vt/topotools/routing_rules.go index 6dfa8b655ca..9eb64c936d7 100644 --- a/go/vt/topotools/routing_rules.go +++ b/go/vt/topotools/routing_rules.go @@ -27,6 +27,19 @@ import ( vschemapb "vitess.io/vitess/go/vt/proto/vschema" ) +//region routing rules + +func GetRoutingRulesMap(rules *vschemapb.RoutingRules) map[string][]string { + if rules == nil { + return nil + } + rulesMap := make(map[string][]string, len(rules.Rules)) + for _, rr := range rules.Rules { + rulesMap[rr.FromTable] = rr.ToTables + } + return rulesMap +} + // GetRoutingRules fetches routing rules from the topology server and returns a // mapping of fromTable=>[]toTables. func GetRoutingRules(ctx context.Context, ts *topo.Server) (map[string][]string, error) { @@ -35,10 +48,7 @@ func GetRoutingRules(ctx context.Context, ts *topo.Server) (map[string][]string, return nil, err } - rules := make(map[string][]string, len(rrs.Rules)) - for _, rr := range rrs.Rules { - rules[rr.FromTable] = rr.ToTables - } + rules := GetRoutingRulesMap(rrs) return rules, nil } @@ -59,6 +69,29 @@ func SaveRoutingRules(ctx context.Context, ts *topo.Server, rules map[string][]s return ts.SaveRoutingRules(ctx, rrs) } +//endregion + +//region shard routing rules + +func GetShardRoutingRuleKey(fromKeyspace, shard string) string { + return fmt.Sprintf("%s.%s", fromKeyspace, shard) +} +func ParseShardRoutingRuleKey(key string) (string, string) { + arr := strings.Split(key, ".") + return arr[0], arr[1] +} + +func GetShardRoutingRulesMap(rules *vschemapb.ShardRoutingRules) map[string]string { + if rules == nil { + return nil + } + rulesMap := make(map[string]string, len(rules.Rules)) + for _, rr := range rules.Rules { + rulesMap[GetShardRoutingRuleKey(rr.FromKeyspace, rr.Shard)] = rr.ToKeyspace + } + return rulesMap +} + // GetShardRoutingRules fetches shard routing rules from the topology server and returns a // mapping of fromKeyspace.Shard=>toKeyspace. 
func GetShardRoutingRules(ctx context.Context, ts *topo.Server) (map[string]string, error) { @@ -67,10 +100,7 @@ func GetShardRoutingRules(ctx context.Context, ts *topo.Server) (map[string]stri return nil, err } - rules := make(map[string]string, len(rrs.Rules)) - for _, rr := range rrs.Rules { - rules[fmt.Sprintf("%s.%s", rr.FromKeyspace, rr.Shard)] = rr.ToKeyspace - } + rules := GetShardRoutingRulesMap(rrs) return rules, nil } @@ -82,9 +112,7 @@ func SaveShardRoutingRules(ctx context.Context, ts *topo.Server, srr map[string] srs := &vschemapb.ShardRoutingRules{Rules: make([]*vschemapb.ShardRoutingRule, 0, len(srr))} for from, to := range srr { - arr := strings.Split(from, ".") - fromKeyspace := arr[0] - shard := arr[1] + fromKeyspace, shard := ParseShardRoutingRuleKey(from) srs.Rules = append(srs.Rules, &vschemapb.ShardRoutingRule{ FromKeyspace: fromKeyspace, ToKeyspace: to, diff --git a/go/vt/vtgate/buffer/buffer.go b/go/vt/vtgate/buffer/buffer.go index 3db19f68b98..96630aca13e 100644 --- a/go/vt/vtgate/buffer/buffer.go +++ b/go/vt/vtgate/buffer/buffer.go @@ -28,6 +28,7 @@ package buffer import ( "context" + "strings" "sync" "golang.org/x/sync/semaphore" @@ -65,12 +66,61 @@ const ( // currently retried. type RetryDoneFunc context.CancelFunc +const ( + ClusterEventReshardingInProgress = "current keyspace is being resharded" + ClusterEventReparentInProgress = "primary is not serving, there may be a reparent operation in progress" + ClusterEventMoveTables = "disallowed due to rule" +) + +var ClusterEvents []string + +func init() { + ClusterEvents = []string{ + ClusterEventReshardingInProgress, + ClusterEventReparentInProgress, + ClusterEventMoveTables, + } +} + // CausedByFailover returns true if "err" was supposedly caused by a failover. // To simplify things, we've merged the detection for different MySQL flavors // in one function. Supported flavors: MariaDB, MySQL func CausedByFailover(err error) bool { log.V(2).Infof("Checking error (type: %T) if it is caused by a failover. 
err: %v", err, err) - return vterrors.Code(err) == vtrpcpb.Code_CLUSTER_EVENT + reason, isFailover := isFailoverError(err) + if isFailover { + log.Infof("CausedByFailover signalling failover for reason: %s", reason) + } + return isFailover +} + +// for debugging purposes +func getReason(err error) string { + for _, ce := range ClusterEvents { + if strings.Contains(err.Error(), ce) { + return ce + } + } + return "" +} + +// isFailoverError looks at the error returned by the sql query execution to check if there is a cluster event +// (caused by resharding or reparenting) or a denied tables error seen during switch writes in MoveTables +func isFailoverError(err error) (string, bool) { + var reason string + var isFailover bool + switch vterrors.Code(err) { + case vtrpcpb.Code_CLUSTER_EVENT: + isFailover = true + case vtrpcpb.Code_FAILED_PRECONDITION: // previous attempt, not used, to be removed + if strings.Contains(err.Error(), ClusterEventMoveTables) { + isFailover = true + } + } + if isFailover { + reason = getReason(err) + } + return reason, isFailover } // Buffer is used to track ongoing PRIMARY tablet failovers and buffer @@ -138,7 +188,6 @@ func (b *Buffer) WaitForFailoverEnd(ctx context.Context, keyspace, shard string, requestsSkipped.Add([]string{keyspace, shard, skippedDisabled}, 1) return nil, nil } - return sb.waitForFailoverEnd(ctx, keyspace, shard, err) } @@ -146,7 +195,7 @@ func (b *Buffer) HandleKeyspaceEvent(ksevent *discovery.KeyspaceEvent) { for _, shard := range ksevent.Shards { sb := b.getOrCreateBuffer(shard.Target.Keyspace, shard.Target.Shard) if sb != nil { - sb.recordKeyspaceEvent(shard.Tablet, shard.Serving) + sb.recordKeyspaceEvent(shard.Tablet, shard.Serving, ksevent) } } } diff --git a/go/vt/vtgate/buffer/flags.go b/go/vt/vtgate/buffer/flags.go index 742a5d5d412..7fd99ee6b76 100644 --- a/go/vt/vtgate/buffer/flags.go +++ b/go/vt/vtgate/buffer/flags.go @@ -165,6 +165,14 @@ func NewDefaultConfig() *Config { } } +func EnableBuffering() { + bufferEnabled = true +} + +func DisableBuffering() { + bufferEnabled = false +} + func NewConfigFromFlags() *Config { if err := verifyFlags(); err != nil { log.Fatalf("Invalid buffer configuration: %v", err) diff --git a/go/vt/vtgate/buffer/shard_buffer.go b/go/vt/vtgate/buffer/shard_buffer.go index 1b829cb3ddd..06dc8db4954 100644 --- a/go/vt/vtgate/buffer/shard_buffer.go +++ b/go/vt/vtgate/buffer/shard_buffer.go @@ -23,6 +23,8 @@ import ( "sync" "time" + "vitess.io/vitess/go/vt/discovery" + "vitess.io/vitess/go/vt/vtgate/errorsanitizer" "vitess.io/vitess/go/vt/log" @@ -476,11 +478,12 @@ func (sb *shardBuffer) remove(toRemove *entry) { // Entry was already removed. Keep the queue as it is. 
} -func (sb *shardBuffer) recordKeyspaceEvent(alias *topodatapb.TabletAlias, stillServing bool) { +func (sb *shardBuffer) recordKeyspaceEvent(alias *topodatapb.TabletAlias, stillServing bool, keyspaceEvent *discovery.KeyspaceEvent) { sb.mu.Lock() defer sb.mu.Unlock() - log.Infof("disruption in shard %s/%s resolved (serving: %v)", sb.keyspace, sb.shard, stillServing) + log.Infof("disruption in shard %s/%s resolved (serving: %v), movetable state %#v", + sb.keyspace, sb.shard, stillServing, keyspaceEvent.MoveTablesState) if !topoproto.TabletAliasEqual(alias, sb.currentPrimary) { if sb.currentPrimary != nil { @@ -488,11 +491,26 @@ func (sb *shardBuffer) recordKeyspaceEvent(alias *topodatapb.TabletAlias, stillS } sb.currentPrimary = alias } - if stillServing { - sb.stopBufferingLocked(stopFailoverEndDetected, "a primary promotion has been detected") - } else { - sb.stopBufferingLocked(stopShardMissing, "the keyspace has been resharded") + var reason stopReason + var msg string + + // heuristically determine the reason why vtgate is currently buffering + moveTablesSwitched := false + if keyspaceEvent.MoveTablesState.State == discovery.MoveTablesSwitched { + moveTablesSwitched = true + } + switch { + case moveTablesSwitched: + reason = stopMoveTablesSwitchingTraffic + msg = "MoveTables has switched writes" + case stillServing: + reason = stopFailoverEndDetected + msg = "a primary promotion has been detected" + default: + reason = stopShardMissing + msg = "the keyspace has been resharded" } + sb.stopBufferingLocked(reason, msg) } func (sb *shardBuffer) recordExternallyReparentedTimestamp(timestamp int64, alias *topodatapb.TabletAlias) { @@ -569,7 +587,8 @@ func (sb *shardBuffer) stopBufferingLocked(reason stopReason, details string) { if sb.mode == bufferModeDryRun { msg = "Dry-run: Would have stopped buffering" } - log.Infof("%v for shard: %s after: %.1f seconds due to: %v. Draining %d buffered requests now.", msg, topoproto.KeyspaceShardString(sb.keyspace, sb.shard), d.Seconds(), details, len(q)) + log.Infof("%v for shard: %s after: %.1f seconds due to: %v. Draining %d buffered requests now.", + msg, topoproto.KeyspaceShardString(sb.keyspace, sb.shard), d.Seconds(), details, len(q)) var clientEntryError error if reason == stopShardMissing { diff --git a/go/vt/vtgate/buffer/variables.go b/go/vt/vtgate/buffer/variables.go index b4b036b0775..73e982c343d 100644 --- a/go/vt/vtgate/buffer/variables.go +++ b/go/vt/vtgate/buffer/variables.go @@ -112,6 +112,7 @@ const ( stopFailoverEndDetected stopReason = "NewPrimarySeen" stopMaxFailoverDurationExceeded stopReason = "MaxDurationExceeded" stopShutdown stopReason = "Shutdown" + stopMoveTablesSwitchingTraffic stopReason = "MoveTablesSwitchedTraffic" ) // evictedReason is used in "requestsEvicted" as "Reason" label. diff --git a/go/vt/vtgate/executor_framework_test.go b/go/vt/vtgate/executor_framework_test.go index 25b5e4e4183..14614c75b91 100644 --- a/go/vt/vtgate/executor_framework_test.go +++ b/go/vt/vtgate/executor_framework_test.go @@ -176,6 +176,9 @@ func createExecutorEnv() (executor *Executor, sbc1, sbc2, sbclookup *sandboxconn primarySession = &vtgatepb.Session{ TargetString: "@primary", } + // FIXME: This sleep seems to fix a lot of tests that are failing due to the change in this PR. + // For now keeping the sleep to confirm in CI. We need to replace this by waiting for whatever race this fixes. 
+ // time.Sleep(1 * time.Second) return executor, sbc1, sbc2, sbclookup } @@ -295,6 +298,9 @@ func assertQueries(t *testing.T, sbc *sandboxconn.SandboxConn, wantQueries []*qu got := query.Sql expected := wantQueries[idx].Sql assert.Equal(t, expected, got) + // FIXME: Bizarre behavior. The following log statement causes the test to pass. + // commenting the hack to surface the errors in CI, leaving the comment in, in case it can help debug the issue. + //log.Infof("\n%v\n%v", wantQueries[idx].BindVariables, query.BindVariables) assert.Equal(t, wantQueries[idx].BindVariables, query.BindVariables) idx++ } diff --git a/go/vt/vtgate/executor_select_test.go b/go/vt/vtgate/executor_select_test.go index 91e102b6a49..39dc1d5105c 100644 --- a/go/vt/vtgate/executor_select_test.go +++ b/go/vt/vtgate/executor_select_test.go @@ -3011,6 +3011,8 @@ func TestStreamOrderByLimitWithMultipleResults(t *testing.T) { } executor := NewExecutor(context.Background(), serv, cell, resolver, true, false, testBufferSize, cache.DefaultConfig, nil, false, querypb.ExecuteOptions_Gen4) + // some sleep for all goroutines to start + time.Sleep(100 * time.Millisecond) before := runtime.NumGoroutine() query := "select id, col from user order by id limit 2" @@ -3867,7 +3869,7 @@ func TestSelectCFC(t *testing.T) { _, err := executor.Execute(context.Background(), nil, "TestSelectCFC", session, "select /*vt+ PLANNER=gen4 */ c2 from tbl_cfc where c1 like 'A%'", nil) require.NoError(t, err) - timeout := time.After(10 * time.Second) + timeout := time.After(30 * time.Second) for { select { case <-timeout: diff --git a/go/vt/vtgate/executor_test.go b/go/vt/vtgate/executor_test.go index bf7f1eac211..4710cb8f53c 100644 --- a/go/vt/vtgate/executor_test.go +++ b/go/vt/vtgate/executor_test.go @@ -35,6 +35,8 @@ import ( "github.com/stretchr/testify/require" "google.golang.org/protobuf/proto" + "vitess.io/vitess/go/vt/vtgate/buffer" + "vitess.io/vitess/go/mysql/collations" "vitess.io/vitess/go/cache" @@ -2074,6 +2076,9 @@ func TestExecutorClearsWarnings(t *testing.T) { // TestServingKeyspaces tests that the dual queries are routed to the correct keyspaces from the list of serving keyspaces. 
func TestServingKeyspaces(t *testing.T) { + buffer.EnableBuffering() + defer buffer.DisableBuffering() + executor, sbc1, _, sbclookup := createExecutorEnv() executor.pv = querypb.ExecuteOptions_Gen4 gw, ok := executor.resolver.resolver.GetGateway().(*TabletGateway) diff --git a/go/vt/vtgate/plan_execute.go b/go/vt/vtgate/plan_execute.go index 6ec72d919cb..79ec01576db 100644 --- a/go/vt/vtgate/plan_execute.go +++ b/go/vt/vtgate/plan_execute.go @@ -18,10 +18,12 @@ package vtgate import ( "context" + "fmt" "strings" "time" "vitess.io/vitess/go/sqltypes" + "vitess.io/vitess/go/vt/log" querypb "vitess.io/vitess/go/vt/proto/query" vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc" "vitess.io/vitess/go/vt/sqlparser" @@ -34,6 +36,25 @@ import ( type planExec func(ctx context.Context, plan *engine.Plan, vc *vcursorImpl, bindVars map[string]*querypb.BindVariable, startTime time.Time) error type txResult func(sqlparser.StatementType, *sqltypes.Result) error +func waitForNewerVSchema(ctx context.Context, e *Executor, lastVSchemaCreated time.Time) bool { + timeout := 5 * time.Second + pollingInterval := 10 * time.Millisecond + waitCtx, cancel := context.WithTimeout(ctx, timeout) + ticker := time.NewTicker(pollingInterval) + defer ticker.Stop() + defer cancel() + for { + select { + case <-waitCtx.Done(): + return false + case <-ticker.C: + if e.VSchema().GetCreated().After(lastVSchemaCreated) { + return true + } + } + } +} + func (e *Executor) newExecute( ctx context.Context, mysqlCtx vtgateservice.MySQLConnection, @@ -57,10 +78,6 @@ func (e *Executor) newExecute( } query, comments := sqlparser.SplitMarginComments(sql) - vcursor, err := newVCursorImpl(safeSession, comments, e, logStats, e.vm, e.VSchema(), e.resolver.resolver, e.serv, e.warnShardedOnly, e.pv) - if err != nil { - return err - } // 2: Parse and Validate query stmt, reservedVars, err := parseAndValidateQuery(query) @@ -68,47 +85,90 @@ func (e *Executor) newExecute( return err } - // 3: Create a plan for the query - plan, err := e.getPlan(ctx, vcursor, query, stmt, comments, bindVars, reservedVars, e.normalize, logStats) - execStart := e.logPlanningFinished(logStats, plan) + // During MoveTables we need to replan since the routing rules change and hence the target keyspace will be different. + retrying := false + var lastVSchemaCreated time.Time + vs := e.VSchema() + lastVSchemaCreated = vs.GetCreated() + for try := 0; try < MaxBufferingRetries; try++ { + if try > 0 && !vs.GetCreated().After(lastVSchemaCreated) { + // There is a race due to which the executor's vschema may not have been updated yet. + // Without a wait we fail non-deterministically since the previous vschema will not have the updated routing rules + if waitForNewerVSchema(ctx, e, lastVSchemaCreated) { + vs = e.VSchema() + } + } - if err != nil { - safeSession.ClearWarnings() - return err - } + vcursor, err := newVCursorImpl(safeSession, comments, e, logStats, e.vm, vs, e.resolver.resolver, e.serv, e.warnShardedOnly, e.pv) + if err != nil { + return err + } - if plan.Type != sqlparser.StmtShow { - safeSession.ClearWarnings() - } + // 3: Create a plan for the query + var plan *engine.Plan + // If we are retrying, it is likely that the routing rules have changed and hence we need to replan the query since the target + // keyspace of the resolved shards may have changed. 
+ plan, err = e.getPlan(ctx, vcursor, query, stmt, comments, bindVars, reservedVars, e.normalize, logStats) + execStart := e.logPlanningFinished(logStats, plan) - // add any warnings that the planner wants to add - for _, warning := range plan.Warnings { - safeSession.RecordWarning(warning) - } + if err != nil { + safeSession.ClearWarnings() + return err + } - result, err := e.handleTransactions(ctx, mysqlCtx, safeSession, plan, logStats, vcursor, stmt) - if err != nil { - return err - } - if result != nil { - return recResult(plan.Type, result) - } + if plan.Type != sqlparser.StmtShow { + safeSession.ClearWarnings() + } - // 3: Prepare for execution - err = e.addNeededBindVars(vcursor, plan.BindVarNeeds, bindVars, safeSession) - if err != nil { - logStats.Error = err - return err - } + // add any warnings that the planner wants to add + for _, warning := range plan.Warnings { + safeSession.RecordWarning(warning) + } - if plan.Instructions.NeedsTransaction() { - return e.insideTransaction(ctx, safeSession, logStats, - func() error { - return execPlan(ctx, plan, vcursor, bindVars, execStart) - }) - } + result, err := e.handleTransactions(ctx, mysqlCtx, safeSession, plan, logStats, vcursor, stmt) + if err != nil { + return err + } + if result != nil { + return recResult(plan.Type, result) + } + + // 4: Prepare for execution + err = e.addNeededBindVars(vcursor, plan.BindVarNeeds, bindVars, safeSession) + if err != nil { + logStats.Error = err + return err + } + + // 5: Execute the plan and retry if needed + if plan.Instructions.NeedsTransaction() { + err = e.insideTransaction(ctx, safeSession, logStats, + func() error { + return execPlan(ctx, plan, vcursor, bindVars, execStart) + }) + } else { + err = execPlan(ctx, plan, vcursor, bindVars, execStart) + } - return execPlan(ctx, plan, vcursor, bindVars, execStart) + if !safeSession.InTransaction() && err != nil { + rootCause := vterrors.RootCause(err) + if rootCause != nil && strings.Contains(rootCause.Error(), "enforce denied tables") { + log.Infof("%d:%t will retry query %s due to %v", try, retrying, query, err) + retrying = true + lastVSchemaCreated = vs.GetCreated() + continue + } else { + retrying = false + } + } else { + retrying = false + } + if err == nil && try > 0 { + log.Infof("query %d:%t succeeded on retry: %s", try, retrying, query) + } + return err + } + return vterrors.New(vtrpcpb.Code_INTERNAL, fmt.Sprintf("query %s failed after retries: %v ", query, err)) } // handleTransactions deals with transactional queries: begin, commit, rollback and savepoint management diff --git a/go/vt/vtgate/planbuilder/bypass.go b/go/vt/vtgate/planbuilder/bypass.go index a5490e2231e..52286816a11 100644 --- a/go/vt/vtgate/planbuilder/bypass.go +++ b/go/vt/vtgate/planbuilder/bypass.go @@ -30,7 +30,6 @@ func buildPlanForBypass(stmt sqlparser.Statement, _ *sqlparser.ReservedVars, vsc if err != nil { return nil, err } - switch dest := vschema.Destination().(type) { case key.DestinationExactKeyRange: if _, ok := stmt.(*sqlparser.Insert); ok { diff --git a/go/vt/vtgate/sandbox_test.go b/go/vt/vtgate/sandbox_test.go index 4197e6ef231..91ae9589591 100644 --- a/go/vt/vtgate/sandbox_test.go +++ b/go/vt/vtgate/sandbox_test.go @@ -19,6 +19,8 @@ package vtgate import ( "context" "fmt" + "hash/fnv" + "strconv" "sync" "vitess.io/vitess/go/json2" @@ -284,6 +286,16 @@ func (sct *sandboxTopo) WatchSrvKeyspace(ctx context.Context, cell, keyspace str // panic("not supported: WatchSrvKeyspace") } +func hash(s string) uint32 { + h := fnv.New32a() + h.Write([]byte(s)) + 
return h.Sum32() +} + +func GetSrvVSchemaHash(vs *vschemapb.SrvVSchema) string { + return strconv.Itoa(int(hash(vs.String()))) +} + // WatchSrvVSchema is part of the srvtopo.Server interface. // // If the sandbox was created with a backing topo service, piggy back on it @@ -302,11 +314,24 @@ func (sct *sandboxTopo) WatchSrvVSchema(ctx context.Context, cell string, callba if !callback(current.Value, nil) { panic("sandboxTopo callback returned false") } + currentHash := GetSrvVSchemaHash(current.Value) go func() { for { - update := <-updateChan - if !callback(update.Value, update.Err) { - panic("sandboxTopo callback returned false") + select { + case <-ctx.Done(): + return + case update := <-updateChan: + newHash := GetSrvVSchemaHash(update.Value) + if newHash == currentHash { + // sometimes we get the same update multiple times. This results in the plan cache to be cleared + // causing tests to fail. So we just ignore the duplicate updates. + continue + } + currentHash = newHash + if !callback(update.Value, update.Err) { + panic("sandboxTopo callback returned false") + } + } } }() diff --git a/go/vt/vtgate/tabletgateway.go b/go/vt/vtgate/tabletgateway.go index 1bba8a6a2f1..dfeb99fa25f 100644 --- a/go/vt/vtgate/tabletgateway.go +++ b/go/vt/vtgate/tabletgateway.go @@ -20,6 +20,7 @@ import ( "context" "fmt" "math/rand" + "runtime/debug" "sort" "sync" "sync/atomic" @@ -117,6 +118,10 @@ func NewTabletGateway(ctx context.Context, hc discovery.HealthCheck, serv srvtop func (gw *TabletGateway) setupBuffering(ctx context.Context) { cfg := buffer.NewConfigFromFlags() + if !cfg.Enabled { + log.Info("Query buffering is disabled") + return + } gw.buffer = buffer.New(cfg) gw.kev = discovery.NewKeyspaceEventWatcher(ctx, gw.srvTopoServer, gw.hc, gw.localCell) @@ -223,6 +228,7 @@ func (gw *TabletGateway) CacheStatus() TabletCacheStatusList { // withShardError should not be combined with withRetry. func (gw *TabletGateway) withRetry(ctx context.Context, target *querypb.Target, _ queryservice.QueryService, _ string, inTransaction bool, inner func(ctx context.Context, target *querypb.Target, conn queryservice.QueryService) (bool, error)) error { + // for transactions, we connect to a specific tablet instead of letting gateway choose one if inTransaction && target.TabletType != topodatapb.TabletType_PRIMARY { return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "tabletGateway's query service can only be used for non-transactional queries on replicas") @@ -246,12 +252,11 @@ func (gw *TabletGateway) withRetry(ctx context.Context, target *querypb.Target, bufferedOnce := false for i := 0; i < gw.retryCount+1; i++ { - // Check if we should buffer PRIMARY queries which failed due to an ongoing - // failover. + // Check if we should buffer PRIMARY queries which failed due to an ongoing failover. // Note: We only buffer once and only "!inTransaction" queries i.e. // a) no transaction is necessary (e.g. critical reads) or // b) no transaction was created yet. - if !bufferedOnce && !inTransaction && target.TabletType == topodatapb.TabletType_PRIMARY { + if gw.buffer != nil && !bufferedOnce && !inTransaction && target.TabletType == topodatapb.TabletType_PRIMARY { // The next call blocks if we should buffer during a failover. retryDone, bufferErr := gw.buffer.WaitForFailoverEnd(ctx, target.Keyspace, target.Shard, err) @@ -267,6 +272,7 @@ func (gw *TabletGateway) withRetry(ctx context.Context, target *querypb.Target, err = vterrors.Wrapf(bufferErr, "failed to automatically buffer and retry failed request during failover. 
original err (type=%T): %v", err, err) + log.Infof("%v", err) break } } @@ -277,12 +283,13 @@ func (gw *TabletGateway) withRetry(ctx context.Context, target *querypb.Target, // or if a reparent operation is in progress. if kev := gw.kev; kev != nil { if kev.TargetIsBeingResharded(target) { - err = vterrors.Errorf(vtrpcpb.Code_CLUSTER_EVENT, "current keyspace is being resharded") + log.Infof("current keyspace is being resharded, retrying: %s: %s", target.Keyspace, debug.Stack()) + err = vterrors.Errorf(vtrpcpb.Code_CLUSTER_EVENT, buffer.ClusterEventReshardingInProgress) continue } primary, notServing := kev.PrimaryIsNotServing(target) if notServing { - err = vterrors.Errorf(vtrpcpb.Code_CLUSTER_EVENT, "primary is not serving, there may be a reparent operation in progress") + err = vterrors.Errorf(vtrpcpb.Code_CLUSTER_EVENT, buffer.ClusterEventReparentInProgress) continue } // if primary is serving, but we initially found no tablet, we're in an inconsistent state @@ -297,6 +304,7 @@ func (gw *TabletGateway) withRetry(ctx context.Context, target *querypb.Target, err = vterrors.Errorf(vtrpcpb.Code_UNAVAILABLE, "no healthy tablet available for '%s'", target.String()) break } + gw.shuffleTablets(gw.localCell, tablets) var th *discovery.TabletHealth diff --git a/go/vt/vtgate/vindexes/vschema.go b/go/vt/vtgate/vindexes/vschema.go index 90becdf275f..c044c6a3151 100644 --- a/go/vt/vtgate/vindexes/vschema.go +++ b/go/vt/vtgate/vindexes/vschema.go @@ -23,6 +23,7 @@ import ( "os" "sort" "strings" + "time" "vitess.io/vitess/go/sqlescape" vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc" @@ -69,6 +70,9 @@ type VSchema struct { uniqueVindexes map[string]Vindex Keyspaces map[string]*KeyspaceSchema `json:"keyspaces"` ShardRoutingRules map[string]string `json:"shard_routing_rules"` + // created is the time when the VSchema object was created. Used to detect if a cached + // copy of the vschema is stale. + created time.Time } // RoutingRule represents one routing rule. @@ -259,6 +263,7 @@ func BuildVSchema(source *vschemapb.SrvVSchema) (vschema *VSchema) { globalTables: make(map[string]*Table), uniqueVindexes: make(map[string]Vindex), Keyspaces: make(map[string]*KeyspaceSchema), + created: time.Now(), } buildKeyspaces(source, vschema) // buildGlobalTables before buildReferences so that buildReferences can @@ -1147,6 +1152,17 @@ func (vschema *VSchema) FindRoutedShard(keyspace, shard string) (string, error) return keyspace, nil } +// GetCreated returns the time when the VSchema was created. +func (vschema *VSchema) GetCreated() time.Time { + return vschema.created +} + +// ResetCreated resets the created time to zero value. +// Used only in tests where vschema protos are compared. +func (vschema *VSchema) ResetCreated() { + vschema.created = time.Time{} +} + // ByCost provides the interface needed for ColumnVindexes to // be sorted by cost order. 
type ByCost []*ColumnVindex diff --git a/go/vt/vtgate/vindexes/vschema_test.go b/go/vt/vtgate/vindexes/vschema_test.go index c7257e0a633..6a58810b064 100644 --- a/go/vt/vtgate/vindexes/vschema_test.go +++ b/go/vt/vtgate/vindexes/vschema_test.go @@ -234,6 +234,14 @@ func init() { Register("mcfu", newMCFU) } +func buildVSchema(source *vschemapb.SrvVSchema) (vschema *VSchema) { + vs := BuildVSchema(source) + if vs != nil { + vs.ResetCreated() + } + return vs +} + func TestUnshardedVSchemaValid(t *testing.T) { _, err := BuildKeyspace(&vschemapb.Keyspace{ Sharded: false, @@ -1037,7 +1045,7 @@ func TestShardedVSchemaMultiColumnVindex(t *testing.T) { Columns: []string{"c1", "c2"}, Name: "stfu1"}}}}}}} - got := BuildVSchema(&good) + got := buildVSchema(&good) err := got.Keyspaces["sharded"].Error require.NoError(t, err) ks := &Keyspace{ @@ -1106,7 +1114,7 @@ func TestShardedVSchemaNotOwned(t *testing.T) { Name: "stlu1"}, { Column: "c2", Name: "stfu1"}}}}}}} - got := BuildVSchema(&good) + got := buildVSchema(&good) err := got.Keyspaces["sharded"].Error require.NoError(t, err) ks := &Keyspace{ @@ -1236,7 +1244,7 @@ func TestBuildVSchemaDupSeq(t *testing.T) { Name: "ksa"} ksb := &Keyspace{ Name: "ksb"} - got := BuildVSchema(&good) + got := buildVSchema(&good) t1a := &Table{ Name: sqlparser.NewIdentifierCS("t1"), Keyspace: ksa, @@ -1291,7 +1299,7 @@ func TestBuildVSchemaDupTable(t *testing.T) { }, }, } - got := BuildVSchema(&good) + got := buildVSchema(&good) ksa := &Keyspace{ Name: "ksa", } @@ -1383,7 +1391,7 @@ func TestBuildVSchemaDupVindex(t *testing.T) { }, }, } - got := BuildVSchema(&good) + got := buildVSchema(&good) err := got.Keyspaces["ksa"].Error err1 := got.Keyspaces["ksb"].Error require.NoError(t, err) @@ -1959,7 +1967,7 @@ func TestSequence(t *testing.T) { }, }, } - got := BuildVSchema(&good) + got := buildVSchema(&good) err := got.Keyspaces["sharded"].Error require.NoError(t, err) err1 := got.Keyspaces["unsharded"].Error diff --git a/go/vt/vtgate/vschema_manager_test.go b/go/vt/vtgate/vschema_manager_test.go index 4aa81442f38..a52338b0fff 100644 --- a/go/vt/vtgate/vschema_manager_test.go +++ b/go/vt/vtgate/vschema_manager_test.go @@ -100,6 +100,7 @@ func TestVSchemaUpdate(t *testing.T) { var vs *vindexes.VSchema vm.subscriber = func(vschema *vindexes.VSchema, _ *VSchemaStats) { vs = vschema + vs.ResetCreated() } for _, tcase := range tcases { t.Run(tcase.name, func(t *testing.T) { @@ -199,6 +200,7 @@ func TestRebuildVSchema(t *testing.T) { var vs *vindexes.VSchema vm.subscriber = func(vschema *vindexes.VSchema, _ *VSchemaStats) { vs = vschema + vs.ResetCreated() } for _, tcase := range tcases { t.Run(tcase.name, func(t *testing.T) { @@ -229,6 +231,7 @@ func makeTestVSchema(ks string, sharded bool, tbls map[string]*vindexes.Table) * } vs := makeTestEmptyVSchema() vs.Keyspaces[ks] = keyspaceSchema + vs.ResetCreated() return vs } diff --git a/go/vt/wrangler/traffic_switcher.go b/go/vt/wrangler/traffic_switcher.go index e3f45283ee2..6be857ae620 100644 --- a/go/vt/wrangler/traffic_switcher.go +++ b/go/vt/wrangler/traffic_switcher.go @@ -1159,7 +1159,7 @@ func (ts *trafficSwitcher) stopSourceWrites(ctx context.Context) error { } func (ts *trafficSwitcher) changeTableSourceWrites(ctx context.Context, access accessType) error { - return ts.ForAllSources(func(source *workflow.MigrationSource) error { + err := ts.ForAllSources(func(source *workflow.MigrationSource) error { if _, err := ts.TopoServer().UpdateShardFields(ctx, ts.SourceKeyspaceName(), source.GetShard().ShardName(), func(si 
*topo.ShardInfo) error { return si.UpdateSourceDeniedTables(ctx, topodatapb.TabletType_PRIMARY, nil, access == allowWrites /* remove */, ts.Tables()) }); err != nil { @@ -1174,6 +1174,14 @@ func (ts *trafficSwitcher) changeTableSourceWrites(ctx context.Context, access a } return err }) + if err != nil { + log.Warningf("Error in changeTableSourceWrites: %s", err) + return err + } + // Note that the denied tables, which are being updated in this method, are not part of the SrvVSchema in the topo. + // However, we are using the notification of a SrvVSchema change in VTGate to recompute the state of a + // MoveTables workflow (which also looks up denied tables from the topo). So we need to trigger a SrvVSchema change here. + return ts.TopoServer().RebuildSrvVSchema(ctx, nil) } // executeLockTablesOnSource executes a LOCK TABLES tb1 READ, tbl2 READ,... statement on each @@ -1509,7 +1517,6 @@ func (ts *trafficSwitcher) changeWriteRoute(ctx context.Context) error { return err } } - return ts.TopoServer().RebuildSrvVSchema(ctx, nil) } diff --git a/test/ci_workflow_gen.go b/test/ci_workflow_gen.go index f61c9af472f..0d012895175 100644 --- a/test/ci_workflow_gen.go +++ b/test/ci_workflow_gen.go @@ -121,6 +121,7 @@ var ( "vreplication_v2", "vreplication_partial_movetables_basic", "vreplication_partial_movetables_sequences", + "vreplication_movetables_buffering", "schemadiff_vrepl", "topo_connection_cache", "vtgate_partial_keyspace", diff --git a/test/config.json b/test/config.json index fe18772714f..dd4d76b3d8c 100644 --- a/test/config.json +++ b/test/config.json @@ -1004,6 +1004,15 @@ "RetryMax": 0, "Tags": [] }, + "vreplication_movetables_buffering": { + "File": "unused.go", + "Args": ["vitess.io/vitess/go/test/endtoend/vreplication", "-run", "TestMoveTablesBuffering"], + "Command": [], + "Manual": false, + "Shard": "vreplication_movetables_buffering", + "RetryMax": 0, + "Tags": [] + }, "vreplication_vschema_load": { "File": "unused.go", "Args": ["vitess.io/vitess/go/test/endtoend/vreplication", "-run", "TestVSchemaChangesUnderLoad"], From d3727cdb181dbd46b0aa740d1a794a69c41808b2 Mon Sep 17 00:00:00 2001 From: Rohit Nayak Date: Wed, 26 Jul 2023 11:07:04 +0200 Subject: [PATCH 2/9] Update workflow/traffic_switcher.go with changes in wrangler/traffic_swicher.go Signed-off-by: Rohit Nayak --- go/vt/vtctl/workflow/traffic_switcher.go | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/go/vt/vtctl/workflow/traffic_switcher.go b/go/vt/vtctl/workflow/traffic_switcher.go index e0482dedeb0..c73f0a31d6c 100644 --- a/go/vt/vtctl/workflow/traffic_switcher.go +++ b/go/vt/vtctl/workflow/traffic_switcher.go @@ -931,7 +931,7 @@ func (ts *trafficSwitcher) stopSourceWrites(ctx context.Context) error { } func (ts *trafficSwitcher) changeTableSourceWrites(ctx context.Context, access accessType) error { - return ts.ForAllSources(func(source *MigrationSource) error { + err := ts.ForAllSources(func(source *MigrationSource) error { if _, err := ts.TopoServer().UpdateShardFields(ctx, ts.SourceKeyspaceName(), source.GetShard().ShardName(), func(si *topo.ShardInfo) error { return si.UpdateSourceDeniedTables(ctx, topodatapb.TabletType_PRIMARY, nil, access == allowWrites /* remove */, ts.Tables()) }); err != nil { @@ -946,6 +946,14 @@ func (ts *trafficSwitcher) changeTableSourceWrites(ctx context.Context, access a } return err }) + if err != nil { + log.Warningf("Error in changeTableSourceWrites: %s", err) + return err + } + // Note that the denied tables, which are being updated in this method, are not 
part of the SrvVSchema in the topo. + // However, we are using the notification of a SrvVSchema change in VTGate to recompute the state of a + // MoveTables workflow (which also looks up denied tables from the topo). So we need to trigger a SrvVSchema change here. + return ts.TopoServer().RebuildSrvVSchema(ctx, nil) } func (ts *trafficSwitcher) cancelMigration(ctx context.Context, sm *StreamMigrator) { From 78ffcf4f1f0bef4a0e6871612aaabf504df21514 Mon Sep 17 00:00:00 2001 From: Rohit Nayak Date: Wed, 26 Jul 2023 11:24:30 +0200 Subject: [PATCH 3/9] Remove check for MinTimeBetweenFailovers Signed-off-by: Rohit Nayak --- go/vt/vtgate/buffer/flags.go | 3 --- 1 file changed, 3 deletions(-) diff --git a/go/vt/vtgate/buffer/flags.go b/go/vt/vtgate/buffer/flags.go index 7fd99ee6b76..11848372d25 100644 --- a/go/vt/vtgate/buffer/flags.go +++ b/go/vt/vtgate/buffer/flags.go @@ -70,9 +70,6 @@ func verifyFlags() error { if bufferSize < 1 { return fmt.Errorf("--buffer_size must be >= 1 (specified value: %d)", bufferSize) } - if bufferMinTimeBetweenFailovers < bufferMaxFailoverDuration*time.Duration(2) { - return fmt.Errorf("--buffer_min_time_between_failovers should be at least twice the length of --buffer_max_failover_duration: %v vs. %v", bufferMinTimeBetweenFailovers, bufferMaxFailoverDuration) - } if bufferDrainConcurrency < 1 { return fmt.Errorf("--buffer_drain_concurrency must be >= 1 (specified value: %d)", bufferDrainConcurrency) From cf7be29e281a759f131e9eceb54b203c18134c35 Mon Sep 17 00:00:00 2001 From: Rohit Nayak Date: Wed, 26 Jul 2023 12:32:53 +0200 Subject: [PATCH 4/9] Address review comments Signed-off-by: Rohit Nayak --- doc/design-docs/VTGateBuffering.md | 2 +- go/test/endtoend/vreplication/cluster_test.go | 4 ++-- go/test/endtoend/vreplication/movetables_buffering_test.go | 4 +++- go/vt/vtgate/buffer/shard_buffer.go | 6 +++--- go/vt/vtgate/buffer/variables.go | 4 ++++ go/vt/vtgate/executor_framework_test.go | 3 --- go/vt/vtgate/plan_execute.go | 2 +- 7 files changed, 14 insertions(+), 11 deletions(-) diff --git a/doc/design-docs/VTGateBuffering.md b/doc/design-docs/VTGateBuffering.md index a8adc88f5c8..e7e9987022d 100644 --- a/doc/design-docs/VTGateBuffering.md +++ b/doc/design-docs/VTGateBuffering.md @@ -7,7 +7,7 @@ failing queries in the tablet gateway layer in vtgate. When a query fails, the r To assist in diagnosing the root cause a _KeyspaceEventWatcher_ (aka KEW) was introduced. This runs in a goroutine and watches the SrvKeyspace: if there is a change to the keyspace partitions in the topo it is considered that there is a -resharding in progress. +resharding operation in progress. The buffering logic subscribes to the keyspace event watcher. 
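The design-doc hunk above describes the buffering logic subscribing to the keyspace event watcher. As an illustration of that subscription pattern, here is a minimal, self-contained Go sketch; the types and names below (KeyspaceEvent, Watcher, Subscribe, broadcast) are simplified stand-ins and not the actual API in go/vt/discovery/keyspace_events.go:

```go
// Minimal sketch of the watcher/subscriber relationship described in the
// design doc. Hypothetical, simplified types; not the actual vitess API.
package main

import "fmt"

// KeyspaceEvent signals that a keyspace has become consistent again, for
// example after a resharding or reparent operation has resolved.
type KeyspaceEvent struct {
	Keyspace   string
	Consistent bool
}

// Watcher fans keyspace events out to its subscribers (the shard buffers).
type Watcher struct {
	subscribers []chan KeyspaceEvent
}

// Subscribe returns a channel on which the caller receives future events.
func (w *Watcher) Subscribe() chan KeyspaceEvent {
	ch := make(chan KeyspaceEvent, 2)
	w.subscribers = append(w.subscribers, ch)
	return ch
}

// broadcast is what the topo-watching goroutine would call whenever the
// SrvKeyspace (or, with this PR, the SrvVSchema) changes.
func (w *Watcher) broadcast(ev KeyspaceEvent) {
	for _, ch := range w.subscribers {
		ch <- ev
	}
}

func main() {
	w := &Watcher{}
	sub := w.Subscribe()
	// A buffer would hold queries until it sees a consistent event.
	w.broadcast(KeyspaceEvent{Keyspace: "customer", Consistent: true})
	ev := <-sub
	fmt.Printf("keyspace %s consistent=%v: drain buffered queries\n", ev.Keyspace, ev.Consistent)
}
```

The real watcher also tracks per-shard serving state; the sketch only shows the fan-out shape that lets buffers block until the keyspace is consistent again.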
diff --git a/go/test/endtoend/vreplication/cluster_test.go b/go/test/endtoend/vreplication/cluster_test.go index 63526aa450c..688161cc5ad 100644 --- a/go/test/endtoend/vreplication/cluster_test.go +++ b/go/test/endtoend/vreplication/cluster_test.go @@ -52,8 +52,8 @@ var ( sidecarDBIdentifier = sqlparser.String(sqlparser.NewIdentifierCS(sidecarDBName)) mainClusterConfig *ClusterConfig externalClusterConfig *ClusterConfig - extraVTGateArgs = []string{"--tablet_refresh_interval", "10ms", "--enable_buffer", "--buffer_window", "10s", - "--buffer_size", "100000", "--buffer_min_time_between_failovers", "2m", "--buffer_max_failover_duration", "10s"} + extraVTGateArgs = []string{"--tablet_refresh_interval", "10ms", "--enable_buffer", "--buffer_window", "1m", + "--buffer_size", "100000", "--buffer_min_time_between_failovers", "0s", "--buffer_max_failover_duration", "1m"} extraVtctldArgs = []string{"--remote_operation_timeout", "600s", "--topo_etcd_lease_ttl", "120"} // This variable can be used within specific tests to alter vttablet behavior extraVTTabletArgs = []string{} diff --git a/go/test/endtoend/vreplication/movetables_buffering_test.go b/go/test/endtoend/vreplication/movetables_buffering_test.go index 5a54f63b1b5..6a275fc4e9b 100644 --- a/go/test/endtoend/vreplication/movetables_buffering_test.go +++ b/go/test/endtoend/vreplication/movetables_buffering_test.go @@ -8,6 +8,8 @@ import ( "testing" "time" + binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" + "github.com/stretchr/testify/require" "vitess.io/vitess/go/mysql" @@ -27,7 +29,7 @@ func TestMoveTablesBuffering(t *testing.T) { err := tstWorkflowExec(t, defaultCellName, workflowName, sourceKs, targetKs, tables, workflowActionCreate, "", "", "") require.NoError(t, err) - waitForWorkflowState(t, vc, ksWorkflow, workflowStateRunning) + waitForWorkflowState(t, vc, ksWorkflow, binlogdatapb.VReplicationWorkflowState_Running.String()) loadCtx, cancelLoad := context.WithCancel(context.Background()) go func() { diff --git a/go/vt/vtgate/buffer/shard_buffer.go b/go/vt/vtgate/buffer/shard_buffer.go index 06dc8db4954..321e9b90f73 100644 --- a/go/vt/vtgate/buffer/shard_buffer.go +++ b/go/vt/vtgate/buffer/shard_buffer.go @@ -502,13 +502,13 @@ func (sb *shardBuffer) recordKeyspaceEvent(alias *topodatapb.TabletAlias, stillS switch { case moveTablesSwitched: reason = stopMoveTablesSwitchingTraffic - msg = "MoveTables has switched writes" + msg = stopMoveTablesSwitchingTrafficMessage case stillServing: reason = stopFailoverEndDetected - msg = "a primary promotion has been detected" + msg = stopFailoverEndDetectedMessage default: reason = stopShardMissing - msg = "the keyspace has been resharded" + msg = stopShardMissingMessage } sb.stopBufferingLocked(reason, msg) } diff --git a/go/vt/vtgate/buffer/variables.go b/go/vt/vtgate/buffer/variables.go index 73e982c343d..af99cb52220 100644 --- a/go/vt/vtgate/buffer/variables.go +++ b/go/vt/vtgate/buffer/variables.go @@ -113,6 +113,10 @@ const ( stopMaxFailoverDurationExceeded stopReason = "MaxDurationExceeded" stopShutdown stopReason = "Shutdown" stopMoveTablesSwitchingTraffic stopReason = "MoveTablesSwitchedTraffic" + + stopMoveTablesSwitchingTrafficMessage = "MoveTables has switched writes" + stopFailoverEndDetectedMessage = "a primary promotion has been detected" + stopShardMissingMessage = "the keyspace has been resharded" ) // evictedReason is used in "requestsEvicted" as "Reason" label. 
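The variables.go hunk above lifts the inline message strings out of shard_buffer.go into named constants. A condensed paraphrase of how the shard buffer now picks a stop reason and message follows; the value of the first stopReason constant comes from the diff, while the other two stopReason values are illustrative assumptions:

```go
// Condensed paraphrase of the reason/message selection in
// shardBuffer.recordKeyspaceEvent after this change.
package main

import "fmt"

type stopReason string

const (
	stopMoveTablesSwitchingTraffic stopReason = "MoveTablesSwitchedTraffic"
	stopFailoverEndDetected        stopReason = "FailoverEndDetected" // assumed value
	stopShardMissing               stopReason = "ShardMissing"        // assumed value

	// Message constants mirroring the ones added in variables.go above.
	stopMoveTablesSwitchingTrafficMessage = "MoveTables has switched writes"
	stopFailoverEndDetectedMessage        = "a primary promotion has been detected"
	stopShardMissingMessage               = "the keyspace has been resharded"
)

// stopInfo mirrors the switch statement shown in the shard_buffer.go hunk.
func stopInfo(moveTablesSwitched, stillServing bool) (stopReason, string) {
	switch {
	case moveTablesSwitched:
		return stopMoveTablesSwitchingTraffic, stopMoveTablesSwitchingTrafficMessage
	case stillServing:
		return stopFailoverEndDetected, stopFailoverEndDetectedMessage
	default:
		return stopShardMissing, stopShardMissingMessage
	}
}

func main() {
	reason, msg := stopInfo(true, false)
	fmt.Printf("stop buffering: reason=%s msg=%q\n", reason, msg)
}
```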
diff --git a/go/vt/vtgate/executor_framework_test.go b/go/vt/vtgate/executor_framework_test.go index 14614c75b91..09968a7f76a 100644 --- a/go/vt/vtgate/executor_framework_test.go +++ b/go/vt/vtgate/executor_framework_test.go @@ -176,9 +176,6 @@ func createExecutorEnv() (executor *Executor, sbc1, sbc2, sbclookup *sandboxconn primarySession = &vtgatepb.Session{ TargetString: "@primary", } - // FIXME: This sleep seems to fix a lot of tests that are failing due to the change in this PR. - // For now keeping the sleep to confirm in CI. We need to replace this by waiting for whatever race this fixes. - // time.Sleep(1 * time.Second) return executor, sbc1, sbc2, sbclookup } diff --git a/go/vt/vtgate/plan_execute.go b/go/vt/vtgate/plan_execute.go index 79ec01576db..8c28148057c 100644 --- a/go/vt/vtgate/plan_execute.go +++ b/go/vt/vtgate/plan_execute.go @@ -37,7 +37,7 @@ type planExec func(ctx context.Context, plan *engine.Plan, vc *vcursorImpl, bind type txResult func(sqlparser.StatementType, *sqltypes.Result) error func waitForNewerVSchema(ctx context.Context, e *Executor, lastVSchemaCreated time.Time) bool { - timeout := 5 * time.Second + timeout := 30 * time.Second pollingInterval := 10 * time.Millisecond waitCtx, cancel := context.WithTimeout(ctx, timeout) ticker := time.NewTicker(pollingInterval) From e1d053fc066cfd4faa41bde7ce7aa27f87a97805 Mon Sep 17 00:00:00 2001 From: Rohit Nayak Date: Wed, 26 Jul 2023 14:55:31 +0200 Subject: [PATCH 5/9] Address review comments. Move new buffering test to existing test shard, will keep it there if it is not flaky Signed-off-by: Rohit Nayak --- ...oend_vreplication_movetables_buffering.yml | 161 ------------------ go/test/endtoend/vreplication/cluster_test.go | 10 +- go/test/endtoend/vreplication/helper_test.go | 121 +++++++++++++ .../vreplication/movetables_buffering_test.go | 88 +--------- .../vreplication/partial_movetables_test.go | 15 +- test/config.json | 2 +- 6 files changed, 139 insertions(+), 258 deletions(-) delete mode 100644 .github/workflows/cluster_endtoend_vreplication_movetables_buffering.yml diff --git a/.github/workflows/cluster_endtoend_vreplication_movetables_buffering.yml b/.github/workflows/cluster_endtoend_vreplication_movetables_buffering.yml deleted file mode 100644 index ce8cc7edfae..00000000000 --- a/.github/workflows/cluster_endtoend_vreplication_movetables_buffering.yml +++ /dev/null @@ -1,161 +0,0 @@ -# DO NOT MODIFY: THIS FILE IS GENERATED USING "make generate_ci_workflows" - -name: Cluster (vreplication_movetables_buffering) -on: [push, pull_request] -concurrency: - group: format('{0}-{1}', ${{ github.ref }}, 'Cluster (vreplication_movetables_buffering)') - cancel-in-progress: true - -permissions: read-all - -env: - LAUNCHABLE_ORGANIZATION: "vitess" - LAUNCHABLE_WORKSPACE: "vitess-app" - GITHUB_PR_HEAD_SHA: "${{ github.event.pull_request.head.sha }}" - -jobs: - build: - name: Run endtoend tests on Cluster (vreplication_movetables_buffering) - runs-on: ubuntu-22.04 - - steps: - - name: Skip CI - run: | - if [[ "${{contains( github.event.pull_request.labels.*.name, 'Skip CI')}}" == "true" ]]; then - echo "skipping CI due to the 'Skip CI' label" - exit 1 - fi - - - name: Check if workflow needs to be skipped - id: skip-workflow - run: | - skip='false' - if [[ "${{github.event.pull_request}}" == "" ]] && [[ "${{github.ref}}" != "refs/heads/main" ]] && [[ ! "${{github.ref}}" =~ ^refs/heads/release-[0-9]+\.[0-9]$ ]] && [[ ! 
"${{github.ref}}" =~ "refs/tags/.*" ]]; then - skip='true' - fi - echo Skip ${skip} - echo "skip-workflow=${skip}" >> $GITHUB_OUTPUT - - - name: Check out code - if: steps.skip-workflow.outputs.skip-workflow == 'false' - uses: actions/checkout@v3 - - - name: Check for changes in relevant files - if: steps.skip-workflow.outputs.skip-workflow == 'false' - uses: frouioui/paths-filter@main - id: changes - with: - token: '' - filters: | - end_to_end: - - 'go/**/*.go' - - 'test.go' - - 'Makefile' - - 'build.env' - - 'go.sum' - - 'go.mod' - - 'proto/*.proto' - - 'tools/**' - - 'config/**' - - 'bootstrap.sh' - - '.github/workflows/cluster_endtoend_vreplication_movetables_buffering.yml' - - - name: Set up Go - if: steps.skip-workflow.outputs.skip-workflow == 'false' && steps.changes.outputs.end_to_end == 'true' - uses: actions/setup-go@v4 - with: - go-version: 1.20.5 - - - name: Set up python - if: steps.skip-workflow.outputs.skip-workflow == 'false' && steps.changes.outputs.end_to_end == 'true' - uses: actions/setup-python@v4 - - - name: Tune the OS - if: steps.skip-workflow.outputs.skip-workflow == 'false' && steps.changes.outputs.end_to_end == 'true' - run: | - # Limit local port range to not use ports that overlap with server side - # ports that we listen on. - sudo sysctl -w net.ipv4.ip_local_port_range="22768 65535" - # Increase the asynchronous non-blocking I/O. More information at https://dev.mysql.com/doc/refman/5.7/en/innodb-parameters.html#sysvar_innodb_use_native_aio - echo "fs.aio-max-nr = 1048576" | sudo tee -a /etc/sysctl.conf - sudo sysctl -p /etc/sysctl.conf - - - name: Get dependencies - if: steps.skip-workflow.outputs.skip-workflow == 'false' && steps.changes.outputs.end_to_end == 'true' - run: | - - # Get key to latest MySQL repo - sudo apt-key adv --keyserver keyserver.ubuntu.com --recv-keys 467B942D3A79BD29 - # Setup MySQL 8.0 - wget -c https://dev.mysql.com/get/mysql-apt-config_0.8.24-1_all.deb - echo mysql-apt-config mysql-apt-config/select-server select mysql-8.0 | sudo debconf-set-selections - sudo DEBIAN_FRONTEND="noninteractive" dpkg -i mysql-apt-config* - sudo apt-get update - # Install everything else we need, and configure - sudo apt-get install -y mysql-server mysql-client make unzip g++ etcd curl git wget eatmydata xz-utils libncurses5 - - sudo service mysql stop - sudo service etcd stop - sudo ln -s /etc/apparmor.d/usr.sbin.mysqld /etc/apparmor.d/disable/ - sudo apparmor_parser -R /etc/apparmor.d/usr.sbin.mysqld - go mod download - - # install JUnit report formatter - go install github.com/vitessio/go-junit-report@HEAD - - - name: Setup launchable dependencies - if: steps.skip-workflow.outputs.skip-workflow == 'false' && steps.changes.outputs.end_to_end == 'true' && github.base_ref == 'main' - run: | - # Get Launchable CLI installed. If you can, make it a part of the builder image to speed things up - pip3 install --user launchable~=1.0 > /dev/null - - # verify that launchable setup is all correct. - launchable verify || true - - # Tell Launchable about the build you are producing and testing - launchable record build --name "$GITHUB_RUN_ID" --no-commit-collection --source . - - - name: Run cluster endtoend test - if: steps.skip-workflow.outputs.skip-workflow == 'false' && steps.changes.outputs.end_to_end == 'true' - timeout-minutes: 45 - run: | - # We set the VTDATAROOT to the /tmp folder to reduce the file path of mysql.sock file - # which musn't be more than 107 characters long. 
- export VTDATAROOT="/tmp/" - source build.env - - set -exo pipefail - - # Increase our open file descriptor limit as we could hit this - ulimit -n 65536 - cat <<-EOF>>./config/mycnf/mysql80.cnf - innodb_buffer_pool_dump_at_shutdown=OFF - innodb_buffer_pool_in_core_file=OFF - innodb_buffer_pool_load_at_startup=OFF - innodb_buffer_pool_size=64M - innodb_doublewrite=OFF - innodb_flush_log_at_trx_commit=0 - innodb_flush_method=O_DIRECT - innodb_numa_interleave=ON - innodb_adaptive_hash_index=OFF - sync_binlog=0 - sync_relay_log=0 - performance_schema=OFF - slow-query-log=OFF - EOF - - cat <<-EOF>>./config/mycnf/mysql80.cnf - binlog-transaction-compression=ON - EOF - - # run the tests however you normally do, then produce a JUnit XML file - eatmydata -- go run test.go -docker=false -follow -shard vreplication_movetables_buffering | tee -a output.txt | go-junit-report -set-exit-code > report.xml - - - name: Print test output and Record test result in launchable - if: steps.skip-workflow.outputs.skip-workflow == 'false' && steps.changes.outputs.end_to_end == 'true' && always() - run: | - # send recorded tests to launchable - launchable record tests --build "$GITHUB_RUN_ID" go-test . || true - - # print test output - cat output.txt diff --git a/go/test/endtoend/vreplication/cluster_test.go b/go/test/endtoend/vreplication/cluster_test.go index 688161cc5ad..54ba2d3d0b0 100644 --- a/go/test/endtoend/vreplication/cluster_test.go +++ b/go/test/endtoend/vreplication/cluster_test.go @@ -30,6 +30,8 @@ import ( "testing" "time" + "vitess.io/vitess/go/mysql" + "vitess.io/vitess/go/vt/mysqlctl" "vitess.io/vitess/go/vt/sqlparser" "vitess.io/vitess/go/vt/vtgate/planbuilder/plancontext" @@ -52,8 +54,8 @@ var ( sidecarDBIdentifier = sqlparser.String(sqlparser.NewIdentifierCS(sidecarDBName)) mainClusterConfig *ClusterConfig externalClusterConfig *ClusterConfig - extraVTGateArgs = []string{"--tablet_refresh_interval", "10ms", "--enable_buffer", "--buffer_window", "1m", - "--buffer_size", "100000", "--buffer_min_time_between_failovers", "0s", "--buffer_max_failover_duration", "1m"} + extraVTGateArgs = []string{"--tablet_refresh_interval", "10ms", "--enable_buffer", "--buffer_window", loadTestBufferingWindowDurationStr, + "--buffer_size", "100000", "--buffer_min_time_between_failovers", "0s", "--buffer_max_failover_duration", loadTestBufferingWindowDurationStr} extraVtctldArgs = []string{"--remote_operation_timeout", "600s", "--topo_etcd_lease_ttl", "120"} // This variable can be used within specific tests to alter vttablet behavior extraVTTabletArgs = []string{} @@ -731,6 +733,10 @@ func (vc *VitessCluster) getPrimaryTablet(t *testing.T, ksName, shardName string return nil } +func (vc *VitessCluster) GetVTGateConn(t *testing.T) *mysql.Conn { + return getConnection(t, vc.ClusterConfig.hostname, vc.ClusterConfig.vtgateMySQLPort) +} + func (vc *VitessCluster) startQuery(t *testing.T, query string) (func(t *testing.T), func(t *testing.T)) { conn := getConnection(t, vc.ClusterConfig.hostname, vc.ClusterConfig.vtgateMySQLPort) _, err := conn.ExecuteFetch("begin", 1000, false) diff --git a/go/test/endtoend/vreplication/helper_test.go b/go/test/endtoend/vreplication/helper_test.go index 79052307c7a..f237cd58754 100644 --- a/go/test/endtoend/vreplication/helper_test.go +++ b/go/test/endtoend/vreplication/helper_test.go @@ -29,6 +29,7 @@ import ( "sort" "strconv" "strings" + "sync/atomic" "testing" "time" @@ -696,3 +697,123 @@ func isBinlogRowImageNoBlob(t *testing.T, tablet *cluster.VttabletProcess) bool mode := 
strings.ToLower(rs.Rows[0][0].ToString()) return mode == "noblob" } + +const ( + loadTestBufferingWindowDurationStr = "30s" + loadTestPostBufferingInsertWindow = 60 * time.Second // should be greater than loadTestBufferingWindowDurationStr + loadTestWaitForCancel = 30 * time.Second + loadTestWaitBetweenQueries = 2 * time.Millisecond +) + +type loadGenerator struct { + t *testing.T + vc *VitessCluster + ctx context.Context + cancel context.CancelFunc +} + +func newLoadGenerator(t *testing.T, vc *VitessCluster) *loadGenerator { + return &loadGenerator{ + t: t, + vc: vc, + } +} + +func (lg *loadGenerator) stop() { + time.Sleep(loadTestPostBufferingInsertWindow) // wait for buffering to stop and additional records to be inserted by startLoad after traffic is switched + log.Infof("Canceling load") + lg.cancel() + time.Sleep(loadTestWaitForCancel) // wait for cancel to take effect + log.Flush() + +} + +func (lg *loadGenerator) start() { + t := lg.t + lg.ctx, lg.cancel = context.WithCancel(context.Background()) + + var id int64 + log.Infof("startLoad: starting") + queryTemplate := "insert into loadtest(id, name) values (%d, 'name-%d')" + var totalQueries, successfulQueries int64 + var deniedErrors, ambiguousErrors, reshardedErrors, tableNotFoundErrors, otherErrors int64 + defer func() { + + log.Infof("startLoad: totalQueries: %d, successfulQueries: %d, deniedErrors: %d, ambiguousErrors: %d, reshardedErrors: %d, tableNotFoundErrors: %d, otherErrors: %d", + totalQueries, successfulQueries, deniedErrors, ambiguousErrors, reshardedErrors, tableNotFoundErrors, otherErrors) + }() + logOnce := true + for { + select { + case <-lg.ctx.Done(): + log.Infof("startLoad: context cancelled") + log.Infof("startLoad: deniedErrors: %d, ambiguousErrors: %d, reshardedErrors: %d, tableNotFoundErrors: %d, otherErrors: %d", + deniedErrors, ambiguousErrors, reshardedErrors, tableNotFoundErrors, otherErrors) + require.Equal(t, int64(0), deniedErrors) + require.Equal(t, int64(0), otherErrors) + require.Equal(t, totalQueries, successfulQueries) + return + default: + go func() { + conn := vc.GetVTGateConn(t) + defer conn.Close() + atomic.AddInt64(&id, 1) + query := fmt.Sprintf(queryTemplate, id, id) + _, err := conn.ExecuteFetch(query, 1, false) + atomic.AddInt64(&totalQueries, 1) + if err != nil { + sqlErr := err.(*mysql.SQLError) + if strings.Contains(strings.ToLower(err.Error()), "denied tables") { + log.Infof("startLoad: denied tables error executing query: %d:%v", sqlErr.Number(), err) + atomic.AddInt64(&deniedErrors, 1) + } else if strings.Contains(strings.ToLower(err.Error()), "ambiguous") { + // this can happen when a second keyspace is setup with the same tables, but there are no routing rules + // set yet by MoveTables. So we ignore these errors. 
+ atomic.AddInt64(&ambiguousErrors, 1) + log.Flush() + //panic(err) + } else if strings.Contains(strings.ToLower(err.Error()), "current keyspace is being resharded") { + atomic.AddInt64(&reshardedErrors, 1) + } else if strings.Contains(strings.ToLower(err.Error()), "not found") { + atomic.AddInt64(&tableNotFoundErrors, 1) + } else { + if logOnce { + log.Infof("startLoad: error executing query: %d:%v", sqlErr.Number(), err) + logOnce = false + } + atomic.AddInt64(&otherErrors, 1) + } + time.Sleep(loadTestWaitBetweenQueries) + } else { + atomic.AddInt64(&successfulQueries, 1) + } + }() + time.Sleep(loadTestWaitBetweenQueries) + } + } +} + +func (lg *loadGenerator) waitForCount(want int64) { + t := lg.t + conn := vc.GetVTGateConn(t) + defer conn.Close() + timer := time.NewTimer(defaultTimeout) + defer timer.Stop() + for { + qr, err := conn.ExecuteFetch("select count(*) from loadtest", 1, false) + require.NoError(t, err) + require.NotNil(t, qr) + got, _ := qr.Rows[0][0].ToInt64() + + if int64(got) >= want { + return + } + select { + case <-timer.C: + require.FailNow(t, fmt.Sprintf("table %q did not reach the expected number of rows (%d) before the timeout of %s; last seen count: %v", + "loadtest", want, defaultTimeout, got)) + default: + time.Sleep(defaultTick) + } + } +} diff --git a/go/test/endtoend/vreplication/movetables_buffering_test.go b/go/test/endtoend/vreplication/movetables_buffering_test.go index 6a275fc4e9b..2dc2db7b909 100644 --- a/go/test/endtoend/vreplication/movetables_buffering_test.go +++ b/go/test/endtoend/vreplication/movetables_buffering_test.go @@ -1,18 +1,12 @@ package vreplication import ( - "context" - "fmt" - "strings" - "sync/atomic" "testing" - "time" binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" "github.com/stretchr/testify/require" - "vitess.io/vitess/go/mysql" "vitess.io/vitess/go/vt/log" "vitess.io/vitess/go/vt/wrangler" ) @@ -31,11 +25,11 @@ func TestMoveTablesBuffering(t *testing.T) { require.NoError(t, err) waitForWorkflowState(t, vc, ksWorkflow, binlogdatapb.VReplicationWorkflowState_Running.String()) - loadCtx, cancelLoad := context.WithCancel(context.Background()) + lg := newLoadGenerator(t, vc) go func() { - startLoad(t, loadCtx) + lg.start() }() - time.Sleep(2 * time.Second) // wait for enough records to be inserted by startLoad + lg.waitForCount(1000) catchup(t, targetTab1, workflowName, "MoveTables") catchup(t, targetTab2, workflowName, "MoveTables") @@ -44,82 +38,8 @@ func TestMoveTablesBuffering(t *testing.T) { tstWorkflowSwitchReads(t, "", "") tstWorkflowSwitchWrites(t) log.Infof("SwitchWrites done") - stopLoad(t, cancelLoad) + lg.stop() log.Infof("TestMoveTablesBuffering: done") log.Flush() } - -func stopLoad(t *testing.T, cancel context.CancelFunc) { - time.Sleep(11 * time.Second) // wait for buffering to stop and additional records to be inserted by startLoad after traffic is switched - log.Infof("Canceling load") - cancel() - time.Sleep(2 * time.Second) // wait for cancel to take effect - log.Flush() - -} -func startLoad(t *testing.T, ctx context.Context) { - var id int64 - log.Infof("startLoad: starting") - queryTemplate := "insert into loadtest(id, name) values (%d, 'name-%d')" - var totalQueries, successfulQueries int64 - var deniedErrors, ambiguousErrors, reshardedErrors, tableNotFoundErrors, otherErrors int64 - defer func() { - - log.Infof("startLoad: totalQueries: %d, successfulQueries: %d, deniedErrors: %d, ambiguousErrors: %d, reshardedErrors: %d, tableNotFoundErrors: %d, otherErrors: %d", - totalQueries, successfulQueries, 
deniedErrors, ambiguousErrors, reshardedErrors, tableNotFoundErrors, otherErrors) - }() - logOnce := true - for { - select { - case <-ctx.Done(): - log.Infof("startLoad: context cancelled") - log.Infof("startLoad: deniedErrors: %d, ambiguousErrors: %d, reshardedErrors: %d, tableNotFoundErrors: %d, otherErrors: %d", - deniedErrors, ambiguousErrors, reshardedErrors, tableNotFoundErrors, otherErrors) - require.Less(t, deniedErrors, int64(1)) - require.Less(t, otherErrors, int64(1)) - require.Equal(t, totalQueries, successfulQueries) - return - default: - go func() { - conn := getConnection(t, vc.ClusterConfig.hostname, vc.ClusterConfig.vtgateMySQLPort) - defer conn.Close() - atomic.AddInt64(&id, 1) - query := fmt.Sprintf(queryTemplate, id, id) - _, err := conn.ExecuteFetch(query, 1, false) - atomic.AddInt64(&totalQueries, 1) - if err != nil { - sqlErr := err.(*mysql.SQLError) - if strings.Contains(strings.ToLower(err.Error()), "denied tables") { - log.Infof("startLoad: denied tables error executing query: %d:%v", sqlErr.Number(), err) - atomic.AddInt64(&deniedErrors, 1) - } else if strings.Contains(strings.ToLower(err.Error()), "ambiguous") { - // this can happen when a second keyspace is setup with the same tables, but there are no routing rules - // set yet by MoveTables. So we ignore these errors. - atomic.AddInt64(&ambiguousErrors, 1) - log.Flush() - //panic(err) - } else if strings.Contains(strings.ToLower(err.Error()), "current keyspace is being resharded") { - atomic.AddInt64(&reshardedErrors, 1) - } else if strings.Contains(strings.ToLower(err.Error()), "not found") { - atomic.AddInt64(&tableNotFoundErrors, 1) - } else { - if logOnce { - log.Infof("startLoad: error executing query: %d:%v", sqlErr.Number(), err) - logOnce = false - } - atomic.AddInt64(&otherErrors, 1) - } - //log.Infof("startLoad: totalQueries: %d, successfulQueries: %d, deniedErrors: %d, ambiguousErrors: %d, reshardedErrors: %d, tableNotFoundErrors: %d, otherErrors: %d", - // totalQueries, successfulQueries, deniedErrors, ambiguousErrors, reshardedErrors, tableNotFoundErrors, otherErrors) - //log.Flush() - //panic(err) - time.Sleep(2 * time.Millisecond) - } else { - atomic.AddInt64(&successfulQueries, 1) - } - }() - time.Sleep(2 * time.Millisecond) - } - } -} diff --git a/go/test/endtoend/vreplication/partial_movetables_test.go b/go/test/endtoend/vreplication/partial_movetables_test.go index a76ffac740a..321d6afc6c1 100644 --- a/go/test/endtoend/vreplication/partial_movetables_test.go +++ b/go/test/endtoend/vreplication/partial_movetables_test.go @@ -17,11 +17,9 @@ limitations under the License. package vreplication import ( - "context" "fmt" "strings" "testing" - "time" "github.com/stretchr/testify/require" "github.com/tidwall/gjson" @@ -78,8 +76,6 @@ func TestPartialMoveTablesBasic(t *testing.T) { require.Equal(t, emptyShardRoutingRules, getShardRoutingRules(t)) runWithLoad := true - var cancelLoad context.CancelFunc - var loadCtx context.Context // Now setup the customer2 keyspace so we can do a partial // move tables for one of the two shards: 80-. 
@@ -96,14 +92,13 @@ func TestPartialMoveTablesBasic(t *testing.T) { err := tstWorkflowExec(t, defaultCellName, wfName, sourceKs, targetKs, "customer,loadtest", workflowActionCreate, "", shard, "") require.NoError(t, err) - + var lg *loadGenerator if runWithLoad { // start load after routing rules are set, otherwise we end up with ambiguous tables - loadCtx, cancelLoad = context.WithCancel(context.Background()) - //defer func() { stopLoad(t, cancelLoad) }() + lg = newLoadGenerator(t, vc) go func() { - startLoad(t, loadCtx) + lg.start() }() - time.Sleep(2 * time.Second) // wait for enough records to be inserted by startLoad + lg.waitForCount(1000) } targetTab1 = vc.getPrimaryTablet(t, targetKs, shard) @@ -259,7 +254,7 @@ func TestPartialMoveTablesBasic(t *testing.T) { // target side (customer2). require.Equal(t, postCutoverShardRoutingRules, getShardRoutingRules(t)) - stopLoad(t, cancelLoad) + lg.stop() // Cancel both reverse workflows (as we've done the cutover), which should // clean up both the global routing rules and the shard routing rules. diff --git a/test/config.json b/test/config.json index dd4d76b3d8c..c0f0b0825f8 100644 --- a/test/config.json +++ b/test/config.json @@ -1009,7 +1009,7 @@ "Args": ["vitess.io/vitess/go/test/endtoend/vreplication", "-run", "TestMoveTablesBuffering"], "Command": [], "Manual": false, - "Shard": "vreplication_movetables_buffering", + "Shard": "vreplication_cellalias", "RetryMax": 0, "Tags": [] }, From 7768d6149bd3773e5180bcb421ab2e9575deaa9c Mon Sep 17 00:00:00 2001 From: Rohit Nayak Date: Wed, 26 Jul 2023 18:05:33 +0200 Subject: [PATCH 6/9] Correct design doc Signed-off-by: Rohit Nayak --- doc/design-docs/VTGateBuffering.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/doc/design-docs/VTGateBuffering.md b/doc/design-docs/VTGateBuffering.md index e7e9987022d..25e9758cc3d 100644 --- a/doc/design-docs/VTGateBuffering.md +++ b/doc/design-docs/VTGateBuffering.md @@ -52,8 +52,8 @@ There are two main changes: notified when that changes as well. * The logic to start buffering needs to look for the "enforce denied tables" error that is thrown by the vttablets when it tries to execute a query on a table being switched. -* We cannot use the current buffering logic which is at the tablet gateway: meaning the keyspace is - already fixed by the planner and cannot be changed in that layer. We need to add a new buffering logic at a higher +* We cannot use the current query retry logic which is at the tablet gateway: meaning the keyspace is + already fixed by the planner and cannot be changed in that layer. We need to add a new retry logic at a higher level (the _newExecute_ method) and always replan before retrying a query. This also means that we need to bypass the plan cache while retrying. 
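The wording corrected above ("query retry logic" rather than "buffering logic") refers to the replan-and-retry loop added to _newExecute_ earlier in this series. A stripped-down sketch of that loop's shape is below; planAndExecute and vschemaNewerThan are hypothetical stand-ins for the real planning and vschema plumbing in plan_execute.go:

```go
// Stripped-down sketch of the replan-and-retry loop described above.
// planAndExecute and vschemaNewerThan are hypothetical helpers, not the
// actual Executor methods.
package main

import (
	"context"
	"errors"
	"fmt"
	"strings"
	"time"
)

const maxBufferingRetries = 3

func execWithRetry(ctx context.Context, planAndExecute func() error, vschemaNewerThan func(time.Time) bool) error {
	var err error
	lastVSchemaCreated := time.Now()
	for try := 0; try < maxBufferingRetries; try++ {
		if try > 0 {
			// Wait (bounded) for a fresher vschema so the replan sees the
			// updated routing rules rather than the stale ones.
			deadline := time.Now().Add(30 * time.Second)
			for !vschemaNewerThan(lastVSchemaCreated) && time.Now().Before(deadline) {
				select {
				case <-ctx.Done():
					return ctx.Err()
				case <-time.After(10 * time.Millisecond):
				}
			}
		}
		// Always replan: the target keyspace of the resolved shards may have
		// changed while the query was buffered, so a cached plan is unusable.
		if err = planAndExecute(); err == nil {
			return nil
		}
		if strings.Contains(err.Error(), "enforce denied tables") {
			lastVSchemaCreated = time.Now() // wait for a vschema newer than this attempt
			continue
		}
		return err // all other errors are not retryable at this level
	}
	return fmt.Errorf("query failed after retries: %w", err)
}

func main() {
	calls := 0
	err := execWithRetry(context.Background(),
		func() error {
			calls++
			if calls == 1 {
				return errors.New("enforce denied tables")
			}
			return nil
		},
		func(time.Time) bool { return true },
	)
	fmt.Println("err:", err, "calls:", calls)
}
```

Bypassing the plan cache on each retry is what makes the loop correct during MoveTables: the routing rules, and hence the resolved target keyspace, can change between attempts.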
From 70a12c995a0da7c2f8a6aed91d51d9cb9de810c5 Mon Sep 17 00:00:00 2001 From: Rohit Nayak Date: Wed, 26 Jul 2023 18:12:37 +0200 Subject: [PATCH 7/9] Update workflow generator script Signed-off-by: Rohit Nayak --- test/ci_workflow_gen.go | 1 - 1 file changed, 1 deletion(-) diff --git a/test/ci_workflow_gen.go b/test/ci_workflow_gen.go index 0d012895175..f61c9af472f 100644 --- a/test/ci_workflow_gen.go +++ b/test/ci_workflow_gen.go @@ -121,7 +121,6 @@ var ( "vreplication_v2", "vreplication_partial_movetables_basic", "vreplication_partial_movetables_sequences", - "vreplication_movetables_buffering", "schemadiff_vrepl", "topo_connection_cache", "vtgate_partial_keyspace", From 3047ed3de0961c94447c8172358ef248928ac5cc Mon Sep 17 00:00:00 2001 From: Rohit Nayak Date: Sat, 29 Jul 2023 23:11:33 +0200 Subject: [PATCH 8/9] Address review comments Signed-off-by: Rohit Nayak --- go/vt/vtgate/executor_framework_test.go | 3 --- go/vt/vtgate/plan_execute.go | 24 +++++++++--------------- 2 files changed, 9 insertions(+), 18 deletions(-) diff --git a/go/vt/vtgate/executor_framework_test.go b/go/vt/vtgate/executor_framework_test.go index 09968a7f76a..25b5e4e4183 100644 --- a/go/vt/vtgate/executor_framework_test.go +++ b/go/vt/vtgate/executor_framework_test.go @@ -295,9 +295,6 @@ func assertQueries(t *testing.T, sbc *sandboxconn.SandboxConn, wantQueries []*qu got := query.Sql expected := wantQueries[idx].Sql assert.Equal(t, expected, got) - // FIXME: Bizarre behavior. The following log statement causes the test to pass. - // commenting the hack to surface the errors in CI, leaving the comment in, in case it can help debug the issue. - //log.Infof("\n%v\n%v", wantQueries[idx].BindVariables, query.BindVariables) assert.Equal(t, wantQueries[idx].BindVariables, query.BindVariables) idx++ } diff --git a/go/vt/vtgate/plan_execute.go b/go/vt/vtgate/plan_execute.go index 8c28148057c..6523b25bd4e 100644 --- a/go/vt/vtgate/plan_execute.go +++ b/go/vt/vtgate/plan_execute.go @@ -86,7 +86,6 @@ func (e *Executor) newExecute( } // During MoveTables we need to replan since the routing rules change and hence the target keyspace will be different. 
- retrying := false
 var lastVSchemaCreated time.Time
 vs := e.VSchema()
 lastVSchemaCreated = vs.GetCreated()
@@ -150,22 +149,17 @@
 err = execPlan(ctx, plan, vcursor, bindVars, execStart)
 }
- if !safeSession.InTransaction() && err != nil {
- rootCause := vterrors.RootCause(err)
- if rootCause != nil && strings.Contains(rootCause.Error(), "enforce denied tables") {
- log.Infof("%d:%t will retry query %s due to %v", try, retrying, query, err)
- retrying = true
- lastVSchemaCreated = vs.GetCreated()
- continue
- } else {
- retrying = false
- }
- } else {
- retrying = false
+ if err == nil || safeSession.InTransaction() {
+ return err
 }
- if err == nil && try > 0 {
- log.Infof("query %d:%t succeeded on retry: %s", try, retrying, query)
+
+ rootCause := vterrors.RootCause(err)
+ if rootCause != nil && strings.Contains(rootCause.Error(), "enforce denied tables") {
+ log.V(2).Infof("Retry: %d, will retry query %s due to %v", try, query, err)
+ lastVSchemaCreated = vs.GetCreated()
+ continue
 }
+
+ return err
 }
 return vterrors.New(vtrpcpb.Code_INTERNAL, fmt.Sprintf("query %s failed after retries: %v ", query, err))

From c6e2b8391913bb852725dcc0af3ce5b9f8e1ef7a Mon Sep 17 00:00:00 2001
From: Rohit Nayak
Date: Sun, 30 Jul 2023 16:29:19 +0200
Subject: [PATCH 9/9] Self-review

Signed-off-by: Rohit Nayak

---
 doc/design-docs/VTGateBuffering.md | 65 ++++++++++----------
 go/test/endtoend/vreplication/helper_test.go | 2 -
 go/vt/vtgate/buffer/buffer.go | 2 +-
 go/vt/vtgate/buffer/flags.go | 2 +
 go/vt/vtgate/plan_execute.go | 11 +++-
 go/vt/vtgate/tabletgateway.go | 3 +-
 6 files changed, 46 insertions(+), 39 deletions(-)

diff --git a/doc/design-docs/VTGateBuffering.md b/doc/design-docs/VTGateBuffering.md
index 25e9758cc3d..9155929ea49 100644
--- a/doc/design-docs/VTGateBuffering.md
+++ b/doc/design-docs/VTGateBuffering.md
@@ -2,32 +2,32 @@

 ## Current buffering support in VTGate

-VTGate currently supports buffering of queries during reparent and resharding operations. This is done by buffering the
-failing queries in the tablet gateway layer in vtgate. When a query fails, the reason for the failure is checked.
+VTGate currently supports buffering of queries during reparenting and resharding operations. This is done by buffering
+the failing queries in the tablet gateway layer in vtgate. When a query fails, the reason for the failure is checked, to
+see if it is due to one of these.

-To assist in diagnosing the root cause a _KeyspaceEventWatcher_ (aka KEW) was introduced. This runs in a goroutine and
-watches the SrvKeyspace: if there is a change to the keyspace partitions in the topo it is considered that there is a
-resharding operation in progress. The buffering logic subscribes to the keyspace event watcher.
+To assist in diagnosing the root cause a _KeyspaceEventWatcher_ (aka *KEW*) was introduced. This watches the
+SrvKeyspace (in a goroutine): if there is a change to the keyspace partitions in the topo it is considered that there is
+a resharding operation in progress. The buffering logic subscribes to the keyspace event watcher.

-Otherwise if there are no tables to serve from, based on the health check results, it is assumed that there is a cluster
-event where either the primary is being reparented (for example, during a DML) or if the cluster is being restarted and
-all tables are in the process of starting up.
+Otherwise, if there are no tables to serve from, based on the health check results, it is assumed that there is a
+cluster event where either the primary is being reparented or if the cluster is being restarted and all tablets are in
+the process of starting up.

 If either of these occurs, the _consistent_ flag is set to false for that keyspace. When that happens the keyspace
 watcher checks, on every SrvKeyspace update, if the event has been resolved. This can happen when tablets are now
 available (in case of a cluster event) or if the partition information indicates that resharding is complete. When
 that happens, the keyspace event watcher publishes an event that the keyspace is now consistent. The buffers are
-then drained and the queries retried (at the tablet gateway).
+then drained and the queries retried by the tablet gateway.

-## Additional support for MoveTables
+## Adding buffering support for MoveTables

 ### Background

-MoveTables does not affect the entire keyspace, just the tables being moved. So the KEW doesn't detect a cluster event
-since the tablets are still available and shard partitions are unchanged.
+MoveTables does not affect the entire keyspace, just the tables being moved. Even if all tables are being moved there is
+no change in existing keyspace or shard configurations. So the KEW doesn't detect a cluster event since the tablets are
+still available and shard partitions are unchanged.

 MoveTables moves tables from one keyspace to another. There are two flavors of MoveTables: one where the tables are
 moved into all shards in the target keyspace, and Shard-By-Shard Migration, where the user can specify a subset of shards to move
@@ -35,26 +35,29 @@ the tables into.

 These are the topo attributes that are affected during a MoveTables (regular or shard-by-shard):

-* DeniedTables in a shard's TabletControls. These are used to stop writes to the source keyspace for these tables. While
-  switching writes we first create these entries, wait for the target to catchup to the source (using gtid positions),
-  and then update the routing rules to point these tables to the target.
-* RoutingRules (for regular movetables) and ShardRoutingRules (for shard by shard migration). Simplified, routing rules
-  are pointers for each table being moved to a keyspace. When a MoveTables is initiated, that keyspace is the source
-  keyspace. After traffic is switched the pointer is changed to point to the target keyspace. If routing rules
-  are specified, VTGate uses them to decide which keyspace to route each table in a query to.
+* *DeniedTables* in a shard's TabletControls. These are used to stop writes to the source keyspace for these tables.
+  While switching writes we first create these entries, wait for the target to catch up to the source (using gtid
+  positions), and then update the routing rules to point these tables to the target. When a primary sees a DeniedTables
+  entry during a DML it will fail with an "enforce denied tables" error.
+* *RoutingRules* (for regular movetables) and *ShardRoutingRules* (for shard by shard migration). Routing rules are
+  pointers for each table being moved to a keyspace. When a MoveTables is initiated, that keyspace is the source
+  keyspace. After traffic is switched the pointer is changed to point to the target keyspace. If routing rules are
+  specified, VTGate uses them to decide which keyspace to route each table to.

 ### Changes

 There are two main changes:
-* The keyspace event watcher is enhanced to look at the topo attributes mentioned above.
We need to watch the
-  SrvVSchema for changes to the Routing Rules. DeniedTables are only in the Shard records in the topo. To get around
-  that we change the traffic switcher to also rebuild SrvVSchema when DeniedTables are modified. This way we get
-  notified when that changes as well.
-* The logic to start buffering needs to look for the "enforce denied tables" error that is thrown by the
-  vttablets when it tries to execute a query on a table being switched.
-* We cannot use the current query retry logic which is at the tablet gateway: meaning the keyspace is
-  already fixed by the planner and cannot be changed in that layer. We need to add a new retry logic at a higher
-  level (the _newExecute_ method) and always replan before retrying a query. This also means that we need to bypass
-  the plan cache while retrying.
+
+* The keyspace event watcher is enhanced to look at the topo attributes mentioned above. An SrvVSchema watcher looks for
+  changes in the Routing Rules. DeniedTables are only in the Shard records in the topo, so any changes to the
+  DeniedTables would not result in a notification. To get around that we change the traffic switcher to also rebuild
+  SrvVSchema when DeniedTables are modified.
+* The logic to start buffering needs to look for the "enforce denied tables" error that is thrown by the vttablets when
+  they try to execute a query on a table being switched.
+* We cannot use the current query retry logic which is at the tablet gateway level: meaning the keyspace is already
+  fixed by the planner and cannot be changed in that layer. We need to add new retry logic at a higher level (the
+  _newExecute_ method) and always replan before retrying a query. This also means that we need to bypass the plan cache
+  while retrying.
+
diff --git a/go/test/endtoend/vreplication/helper_test.go b/go/test/endtoend/vreplication/helper_test.go
index f237cd58754..20d31948e26 100644
--- a/go/test/endtoend/vreplication/helper_test.go
+++ b/go/test/endtoend/vreplication/helper_test.go
@@ -770,8 +770,6 @@ func (lg *loadGenerator) start() {
 // this can happen when a second keyspace is setup with the same tables, but there are no routing rules
 // set yet by MoveTables. So we ignore these errors.
					atomic.AddInt64(&ambiguousErrors, 1)
-					log.Flush()
-					//panic(err)
 				} else if strings.Contains(strings.ToLower(err.Error()), "current keyspace is being resharded") {
 					atomic.AddInt64(&reshardedErrors, 1)
 				} else if strings.Contains(strings.ToLower(err.Error()), "not found") {
diff --git a/go/vt/vtgate/buffer/buffer.go b/go/vt/vtgate/buffer/buffer.go
index 96630aca13e..622bb03b082 100644
--- a/go/vt/vtgate/buffer/buffer.go
+++ b/go/vt/vtgate/buffer/buffer.go
@@ -112,7 +112,7 @@ func isFailoverError(err error) (string, bool) {
 	switch vterrors.Code(err) {
 	case vtrpcpb.Code_CLUSTER_EVENT:
 		isFailover = true
-	case vtrpcpb.Code_FAILED_PRECONDITION: // previous attempt, not used, to be removed
+	case vtrpcpb.Code_FAILED_PRECONDITION:
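+		// vttablet raises this error with the ClusterEventMoveTables marker in
+		// its message while DeniedTables is set during a MoveTables traffic
+		// switch; classify it as a failover so the query gets buffered.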
 		if strings.Contains(err.Error(), ClusterEventMoveTables) {
 			isFailover = true
 		}
diff --git a/go/vt/vtgate/buffer/flags.go b/go/vt/vtgate/buffer/flags.go
index 11848372d25..a17cc09ccc3 100644
--- a/go/vt/vtgate/buffer/flags.go
+++ b/go/vt/vtgate/buffer/flags.go
@@ -162,10 +162,12 @@ func NewDefaultConfig() *Config {
 	}
 }
 
+// EnableBuffering is used in tests that require the keyspace event watcher to be created.
 func EnableBuffering() {
 	bufferEnabled = true
 }
 
+// DisableBuffering is the counterpart of EnableBuffering.
 func DisableBuffering() {
 	bufferEnabled = false
 }
diff --git a/go/vt/vtgate/plan_execute.go b/go/vt/vtgate/plan_execute.go
index 6523b25bd4e..5d2414ac275 100644
--- a/go/vt/vtgate/plan_execute.go
+++ b/go/vt/vtgate/plan_execute.go
@@ -85,7 +85,6 @@ func (e *Executor) newExecute(
 		return err
 	}
 
-	// During MoveTables we need to replan since the routing rules change and hence the target keyspace will be different.
 	var lastVSchemaCreated time.Time
 	vs := e.VSchema()
 	lastVSchemaCreated = vs.GetCreated()
@@ -104,9 +103,15 @@ func (e *Executor) newExecute(
 	}
 
 	// 3: Create a plan for the query
+	// If we are retrying, it is likely that the routing rules have changed, so we need to
+	// replan the query: the target keyspace of the resolved shards may have changed as a
+	// result of MoveTables. Hence we cannot reuse the plan from the first try.
+	// When buffering ends, many queries might be getting planned at the same time. Ideally we
+	// should be able to reuse plans once the first drained query has been planned. For now, we
+	// punt on this and choose not to prematurely optimize, since it is not clear how much caching
+	// would help and whether it would result in hard-to-track edge cases.
+
 	var plan *engine.Plan
-	// If we are retrying, it is likely that the routing rules have changed and hence we need to replan the query since the target
-	// keyspace of the resolved shards may have changed.
 	plan, err = e.getPlan(ctx, vcursor, query, stmt, comments, bindVars, reservedVars, e.normalize, logStats)
 	execStart := e.logPlanningFinished(logStats, plan)
diff --git a/go/vt/vtgate/tabletgateway.go b/go/vt/vtgate/tabletgateway.go
index dfeb99fa25f..b468543e24d 100644
--- a/go/vt/vtgate/tabletgateway.go
+++ b/go/vt/vtgate/tabletgateway.go
@@ -272,7 +272,6 @@ func (gw *TabletGateway) withRetry(ctx context.Context, target *querypb.Target,
 			err = vterrors.Wrapf(bufferErr,
 				"failed to automatically buffer and retry failed request during failover. original err (type=%T): %v",
 				err, err)
-			log.Infof("%v", err)
 			break
 		}
 	}
@@ -283,7 +282,7 @@ func (gw *TabletGateway) withRetry(ctx context.Context, target *querypb.Target,
 	// or if a reparent operation is in progress.
 	if kev := gw.kev; kev != nil {
 		if kev.TargetIsBeingResharded(target) {
-			log.Infof("current keyspace is being resharded, retrying: %s: %s", target.Keyspace, debug.Stack())
+			log.V(2).Infof("current keyspace is being resharded, retrying: %s: %s", target.Keyspace, debug.Stack())
 			err = vterrors.Errorf(vtrpcpb.Code_CLUSTER_EVENT, buffer.ClusterEventReshardingInProgress)
 			continue
 		}