From 314ebcf13923f98945595208d5099eca4a7184ea Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9s=20Taylor?= Date: Sat, 14 Oct 2023 10:17:41 +0200 Subject: [PATCH] bugfix: use the proper interface for comment directives (#14267) --- .../vtgate/queries/no_scatter/main_test.go | 78 +++++++++++++++++++ .../vtgate/queries/no_scatter/queries_test.go | 62 +++++++++++++++ .../vtgate/queries/no_scatter/schema.sql | 13 ++++ .../vtgate/queries/no_scatter/vschema.json | 26 +++++++ go/vt/sqlparser/analyzer.go | 16 +--- go/vt/sqlparser/ast.go | 8 ++ go/vt/sqlparser/comments.go | 71 +++-------------- go/vt/sqlparser/comments_test.go | 20 ++--- go/vt/vtgate/executor_ddl_test.go | 4 +- go/vt/vtgate/executor_test.go | 3 +- go/vt/vtgate/planbuilder/vexplain.go | 5 +- go/vt/vttablet/tabletserver/query_engine.go | 12 +-- test/config.json | 9 +++ 13 files changed, 227 insertions(+), 100 deletions(-) create mode 100644 go/test/endtoend/vtgate/queries/no_scatter/main_test.go create mode 100644 go/test/endtoend/vtgate/queries/no_scatter/queries_test.go create mode 100644 go/test/endtoend/vtgate/queries/no_scatter/schema.sql create mode 100644 go/test/endtoend/vtgate/queries/no_scatter/vschema.json diff --git a/go/test/endtoend/vtgate/queries/no_scatter/main_test.go b/go/test/endtoend/vtgate/queries/no_scatter/main_test.go new file mode 100644 index 00000000000..c4b0974c24b --- /dev/null +++ b/go/test/endtoend/vtgate/queries/no_scatter/main_test.go @@ -0,0 +1,78 @@ +/* +Copyright 2023 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package aggregation + +import ( + _ "embed" + "flag" + "os" + "testing" + + "vitess.io/vitess/go/mysql" + "vitess.io/vitess/go/test/endtoend/cluster" +) + +var ( + clusterInstance *cluster.LocalProcessCluster + vtParams mysql.ConnParams + keyspaceName = "ks" + cell = "test" + + //go:embed schema.sql + schemaSQL string + + //go:embed vschema.json + vschema string +) + +func TestMain(m *testing.M) { + defer cluster.PanicHandler(nil) + flag.Parse() + + exitCode := func() int { + clusterInstance = cluster.NewCluster(cell, "localhost") + defer clusterInstance.Teardown() + + // Start topo server + err := clusterInstance.StartTopo() + if err != nil { + return 1 + } + + // Start keyspace + keyspace := &cluster.Keyspace{ + Name: keyspaceName, + SchemaSQL: schemaSQL, + VSchema: vschema, + } + clusterInstance.VtGateExtraArgs = []string{"--no_scatter=true"} + err = clusterInstance.StartKeyspace(*keyspace, []string{"-80", "80-"}, 0, false) + if err != nil { + return 1 + } + // Start vtgate + err = clusterInstance.StartVtgate() + if err != nil { + return 1 + } + + vtParams = clusterInstance.GetVTParams(keyspaceName) + + return m.Run() + }() + os.Exit(exitCode) +} diff --git a/go/test/endtoend/vtgate/queries/no_scatter/queries_test.go b/go/test/endtoend/vtgate/queries/no_scatter/queries_test.go new file mode 100644 index 00000000000..7bf702afc15 --- /dev/null +++ b/go/test/endtoend/vtgate/queries/no_scatter/queries_test.go @@ -0,0 +1,62 @@ +/* +Copyright 2023 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package aggregation + +import ( + "context" + "testing" + + "github.com/stretchr/testify/require" + + "vitess.io/vitess/go/mysql" + "vitess.io/vitess/go/test/endtoend/cluster" + "vitess.io/vitess/go/test/endtoend/utils" +) + +func start(t *testing.T) (*mysql.Conn, func()) { + vtConn, err := mysql.Connect(context.Background(), &vtParams) + require.NoError(t, err) + + deleteAll := func() { + tables := []string{"music", "user"} + for _, table := range tables { + utils.Exec(t, vtConn, "delete from "+table) + } + } + + deleteAll() + + return vtConn, func() { + deleteAll() + vtConn.Close() + cluster.PanicHandler(t) + } +} + +func TestFailsWhenForcedToScatter(t *testing.T) { + vtconn, closer := start(t) + defer closer() + + utils.Exec(t, vtconn, "insert into music(id, user_id) values(1,1), (2,5), (3,1), (4,2), (5,3), (6,4), (7,5)") + utils.Exec(t, vtconn, "insert into user(id, name) values(1,'toto'), (2,'tata'), (3,'titi'), (4,'tete'), (5,'foo')") + + _, err := utils.ExecAllowError(t, vtconn, "select * from user") // fails since we have disallowed scatter + require.ErrorContains(t, err, "plan includes scatter, which is disallowed") + + _ = utils.Exec(t, vtconn, "select /*vt+ ALLOW_SCATTER */ * from user") // passes thanks to the comment directive + _ = utils.Exec(t, vtconn, "vexplain select /*vt+ ALLOW_SCATTER */ * from user") // passes thanks to the comment directive +} diff --git a/go/test/endtoend/vtgate/queries/no_scatter/schema.sql b/go/test/endtoend/vtgate/queries/no_scatter/schema.sql new file mode 100644 index 00000000000..cf608028ed5 --- /dev/null +++ b/go/test/endtoend/vtgate/queries/no_scatter/schema.sql @@ -0,0 +1,13 @@ +create table user +( + id bigint, + name varchar(255), + primary key (id) +) Engine = InnoDB; + +create table music +( + id bigint, + user_id bigint, + primary key (id) +) Engine = InnoDB; diff --git a/go/test/endtoend/vtgate/queries/no_scatter/vschema.json b/go/test/endtoend/vtgate/queries/no_scatter/vschema.json new file mode 100644 index 00000000000..82461e6e57a --- /dev/null +++ b/go/test/endtoend/vtgate/queries/no_scatter/vschema.json @@ -0,0 +1,26 @@ +{ + "sharded": true, + "vindexes": { + "user_index": { + "type": "hash" + } + }, + "tables": { + "user": { + "column_vindexes": [ + { + "column": "id", + "name": "user_index" + } + ] + }, + "music": { + "column_vindexes": [ + { + "column": "user_id", + "name": "user_index" + } + ] + } + } +} diff --git a/go/vt/sqlparser/analyzer.go b/go/vt/sqlparser/analyzer.go index f3b1fc10340..b4015f7937b 100644 --- a/go/vt/sqlparser/analyzer.go +++ b/go/vt/sqlparser/analyzer.go @@ -145,22 +145,12 @@ func CanNormalize(stmt Statement) bool { // CachePlan takes Statement and returns true if the query plan should be cached func CachePlan(stmt Statement) bool { - var comments *ParsedComments - switch stmt := stmt.(type) { - case *Select: - comments = stmt.Comments - case *Insert: - comments = stmt.Comments - case *Update: - comments = stmt.Comments - case *Delete: - comments = stmt.Comments - case *Union, *Stream: - return true + switch stmt.(type) { + case *Select, *Insert, *Update, *Delete, *Union, *Stream: + return !checkDirective(stmt, DirectiveSkipQueryPlanCache) default: return false } - return !comments.Directives().IsSet(DirectiveSkipQueryPlanCache) } // MustRewriteAST takes Statement and returns true if RewriteAST must run on it for correct execution irrespective of user flags. diff --git a/go/vt/sqlparser/ast.go b/go/vt/sqlparser/ast.go index b4dd7ea49d6..c167c9971c8 100644 --- a/go/vt/sqlparser/ast.go +++ b/go/vt/sqlparser/ast.go @@ -1388,6 +1388,14 @@ func (node *ExplainStmt) GetParsedComments() *ParsedComments { // GetParsedComments implements Commented interface. func (node *VExplainStmt) GetParsedComments() *ParsedComments { + if node.Comments == nil { + cmt, ok := node.Statement.(Commented) + if !ok { + return nil + } + return cmt.GetParsedComments() + } + return node.Comments } diff --git a/go/vt/sqlparser/comments.go b/go/vt/sqlparser/comments.go index 4ecf7b1b293..84b73f8e81c 100644 --- a/go/vt/sqlparser/comments.go +++ b/go/vt/sqlparser/comments.go @@ -322,86 +322,39 @@ func (d *CommentDirectives) GetString(key string, defaultVal string) (string, bo // MultiShardAutocommitDirective returns true if multishard autocommit directive is set to true in query. func MultiShardAutocommitDirective(stmt Statement) bool { - var comments *ParsedComments - switch stmt := stmt.(type) { - case *Insert: - comments = stmt.Comments - case *Update: - comments = stmt.Comments - case *Delete: - comments = stmt.Comments - } - return comments != nil && comments.Directives().IsSet(DirectiveMultiShardAutocommit) -} - -// SkipQueryPlanCacheDirective returns true if skip query plan cache directive is set to true in query. -func SkipQueryPlanCacheDirective(stmt Statement) bool { - var comments *ParsedComments - switch stmt := stmt.(type) { - case *Select: - comments = stmt.Comments - case *Insert: - comments = stmt.Comments - case *Update: - comments = stmt.Comments - case *Delete: - comments = stmt.Comments - } - return comments != nil && comments.Directives().IsSet(DirectiveSkipQueryPlanCache) + return checkDirective(stmt, DirectiveMultiShardAutocommit) } // IgnoreMaxPayloadSizeDirective returns true if the max payload size override // directive is set to true. func IgnoreMaxPayloadSizeDirective(stmt Statement) bool { - var comments *ParsedComments switch stmt := stmt.(type) { // For transactional statements, they should always be passed down and // should not come into max payload size requirement. case *Begin, *Commit, *Rollback, *Savepoint, *SRollback, *Release: return true - case *Select: - comments = stmt.Comments - case *Insert: - comments = stmt.Comments - case *Update: - comments = stmt.Comments - case *Delete: - comments = stmt.Comments + default: + return checkDirective(stmt, DirectiveIgnoreMaxPayloadSize) } - return comments != nil && comments.Directives().IsSet(DirectiveIgnoreMaxPayloadSize) } // IgnoreMaxMaxMemoryRowsDirective returns true if the max memory rows override // directive is set to true. func IgnoreMaxMaxMemoryRowsDirective(stmt Statement) bool { - var comments *ParsedComments - switch stmt := stmt.(type) { - case *Select: - comments = stmt.Comments - case *Insert: - comments = stmt.Comments - case *Update: - comments = stmt.Comments - case *Delete: - comments = stmt.Comments - } - return comments != nil && comments.Directives().IsSet(DirectiveIgnoreMaxMemoryRows) + return checkDirective(stmt, DirectiveIgnoreMaxMemoryRows) } // AllowScatterDirective returns true if the allow scatter override is set to true func AllowScatterDirective(stmt Statement) bool { - var comments *ParsedComments - switch stmt := stmt.(type) { - case *Select: - comments = stmt.Comments - case *Insert: - comments = stmt.Comments - case *Update: - comments = stmt.Comments - case *Delete: - comments = stmt.Comments + return checkDirective(stmt, DirectiveAllowScatter) +} + +func checkDirective(stmt Statement, key string) bool { + cmt, ok := stmt.(Commented) + if ok { + return cmt.GetParsedComments().Directives().IsSet(key) } - return comments != nil && comments.Directives().IsSet(DirectiveAllowScatter) + return false } // GetPriorityFromStatement gets the priority from the provided Statement, using DirectivePriority diff --git a/go/vt/sqlparser/comments_test.go b/go/vt/sqlparser/comments_test.go index a1530cc3812..b3c1bf9fec8 100644 --- a/go/vt/sqlparser/comments_test.go +++ b/go/vt/sqlparser/comments_test.go @@ -394,29 +394,19 @@ func TestExtractCommentDirectives(t *testing.T) { func TestSkipQueryPlanCacheDirective(t *testing.T) { stmt, _ := Parse("insert /*vt+ SKIP_QUERY_PLAN_CACHE=1 */ into user(id) values (1), (2)") - if !SkipQueryPlanCacheDirective(stmt) { - t.Errorf("d.SkipQueryPlanCacheDirective(stmt) should be true") - } + assert.False(t, CachePlan(stmt)) stmt, _ = Parse("insert into user(id) values (1), (2)") - if SkipQueryPlanCacheDirective(stmt) { - t.Errorf("d.SkipQueryPlanCacheDirective(stmt) should be false") - } + assert.True(t, CachePlan(stmt)) stmt, _ = Parse("update /*vt+ SKIP_QUERY_PLAN_CACHE=1 */ users set name=1") - if !SkipQueryPlanCacheDirective(stmt) { - t.Errorf("d.SkipQueryPlanCacheDirective(stmt) should be true") - } + assert.False(t, CachePlan(stmt)) stmt, _ = Parse("select /*vt+ SKIP_QUERY_PLAN_CACHE=1 */ * from users") - if !SkipQueryPlanCacheDirective(stmt) { - t.Errorf("d.SkipQueryPlanCacheDirective(stmt) should be true") - } + assert.False(t, CachePlan(stmt)) stmt, _ = Parse("delete /*vt+ SKIP_QUERY_PLAN_CACHE=1 */ from users") - if !SkipQueryPlanCacheDirective(stmt) { - t.Errorf("d.SkipQueryPlanCacheDirective(stmt) should be true") - } + assert.False(t, CachePlan(stmt)) } func TestIgnoreMaxPayloadSizeDirective(t *testing.T) { diff --git a/go/vt/vtgate/executor_ddl_test.go b/go/vt/vtgate/executor_ddl_test.go index b2502ab247a..3274fd94475 100644 --- a/go/vt/vtgate/executor_ddl_test.go +++ b/go/vt/vtgate/executor_ddl_test.go @@ -26,8 +26,6 @@ import ( ) func TestDDLFlags(t *testing.T) { - executor, _, _, _, ctx := createExecutorEnv(t) - session := NewSafeSession(&vtgatepb.Session{TargetString: KsTestUnsharded}) defer func() { enableOnlineDDL = true enableDirectDDL = true @@ -57,6 +55,8 @@ func TestDDLFlags(t *testing.T) { } for _, testcase := range testcases { t.Run(fmt.Sprintf("%s-%v-%v", testcase.sql, testcase.enableDirectDDL, testcase.enableOnlineDDL), func(t *testing.T) { + executor, _, _, _, ctx := createExecutorEnv(t) + session := NewSafeSession(&vtgatepb.Session{TargetString: KsTestUnsharded}) enableDirectDDL = testcase.enableDirectDDL enableOnlineDDL = testcase.enableOnlineDDL _, err := executor.Execute(ctx, nil, "TestDDLFlags", session, testcase.sql, nil) diff --git a/go/vt/vtgate/executor_test.go b/go/vt/vtgate/executor_test.go index c0c52fa7377..59c045529a7 100644 --- a/go/vt/vtgate/executor_test.go +++ b/go/vt/vtgate/executor_test.go @@ -1371,8 +1371,6 @@ func TestExecutorDDL(t *testing.T) { } func TestExecutorDDLFk(t *testing.T) { - executor, _, _, sbc, ctx := createExecutorEnv(t) - mName := "TestExecutorDDLFk" stmts := []string{ "create table t1(id bigint primary key, foreign key (id) references t2(id))", @@ -1382,6 +1380,7 @@ func TestExecutorDDLFk(t *testing.T) { for _, stmt := range stmts { for _, fkMode := range []string{"allow", "disallow"} { t.Run(stmt+fkMode, func(t *testing.T) { + executor, _, _, sbc, ctx := createExecutorEnv(t) sbc.ExecCount.Store(0) foreignKeyMode = fkMode _, err := executor.Execute(ctx, nil, mName, NewSafeSession(&vtgatepb.Session{TargetString: KsTestUnsharded}), stmt, nil) diff --git a/go/vt/vtgate/planbuilder/vexplain.go b/go/vt/vtgate/planbuilder/vexplain.go index 1e01576b25d..5c99ab87a95 100644 --- a/go/vt/vtgate/planbuilder/vexplain.go +++ b/go/vt/vtgate/planbuilder/vexplain.go @@ -118,10 +118,9 @@ func buildVExplainLoggingPlan(ctx context.Context, explain *sqlparser.VExplainSt switch input.primitive.(type) { case *engine.Insert, *engine.Delete, *engine.Update: directives := explain.GetParsedComments().Directives() - if directives.IsSet(sqlparser.DirectiveVExplainRunDMLQueries) { - break + if !directives.IsSet(sqlparser.DirectiveVExplainRunDMLQueries) { + return nil, vterrors.VT09008() } - return nil, vterrors.VT09008() } return &planResult{primitive: &engine.VExplain{Input: input.primitive, Type: explain.Type}, tables: input.tables}, nil diff --git a/go/vt/vttablet/tabletserver/query_engine.go b/go/vt/vttablet/tabletserver/query_engine.go index 82fa03924c7..1881dd4091a 100644 --- a/go/vt/vttablet/tabletserver/query_engine.go +++ b/go/vt/vttablet/tabletserver/query_engine.go @@ -370,11 +370,11 @@ func (qe *QueryEngine) getPlan(curSchema *currentSchema, sql string) (*TabletPla plan := &TabletPlan{Plan: splan, Original: sql} plan.Rules = qe.queryRuleSources.FilterByPlan(sql, plan.PlanID, plan.TableNames()...) plan.buildAuthorized() - if plan.PlanID == planbuilder.PlanDDL || plan.PlanID == planbuilder.PlanSet || sqlparser.SkipQueryPlanCacheDirective(statement) { - return plan, errNoCache + if sqlparser.CachePlan(statement) { + return plan, nil } - return plan, nil + return plan, errNoCache } // GetPlan returns the TabletPlan that for the query. Plans are cached in a theine LRU cache. @@ -417,11 +417,11 @@ func (qe *QueryEngine) getStreamPlan(curSchema *currentSchema, sql string) (*Tab plan.Rules = qe.queryRuleSources.FilterByPlan(sql, plan.PlanID, plan.TableName().String()) plan.buildAuthorized() - if sqlparser.SkipQueryPlanCacheDirective(statement) { - return plan, errNoCache + if sqlparser.CachePlan(statement) { + return plan, nil } - return plan, nil + return plan, errNoCache } // GetStreamPlan returns the TabletPlan that for the query. Plans are cached in a theine LRU cache. diff --git a/test/config.json b/test/config.json index 2654169d2d0..d38b3603a81 100644 --- a/test/config.json +++ b/test/config.json @@ -563,6 +563,15 @@ "RetryMax": 2, "Tags": [] }, + "vtgate_queries_no_scatter": { + "File": "unused.go", + "Args": ["vitess.io/vitess/go/test/endtoend/vtgate/queries/no_scatter"], + "Command": [], + "Manual": false, + "Shard": "vtgate_queries", + "RetryMax": 1, + "Tags": [] + }, "vtgate_queries_orderby": { "File": "unused.go", "Args": ["vitess.io/vitess/go/test/endtoend/vtgate/queries/orderby", "-timeout", "20m"],