diff --git a/go/test/endtoend/vtgate/lookup_test.go b/go/test/endtoend/vtgate/lookup_test.go index c984459ab97..3294c1898d6 100644 --- a/go/test/endtoend/vtgate/lookup_test.go +++ b/go/test/endtoend/vtgate/lookup_test.go @@ -42,6 +42,45 @@ func TestUnownedLookupInsertNull(t *testing.T) { utils.Exec(t, conn, "insert into t8(id, parent_id, t9_id) VALUES (3, 2, 2)") } +func TestLookupUniqueWithAutocommit(t *testing.T) { + conn, closer := start(t) + defer closer() + + // conn2 is to check entries in the lookup table + conn2, err := mysql.Connect(context.Background(), &vtParams) + require.Nil(t, err) + defer conn2.Close() + + // Test that all vindex writes are autocommitted outside of any ongoing transactions. + // + // Also test that autocommited vindex entries are visible inside transactions, as lookups + // should also use the autocommit connection. + + utils.Exec(t, conn, "insert into t10(id, sharding_key) VALUES (1, 1)") + + utils.AssertMatches(t, conn2, "select id from t10_id_to_keyspace_id_idx order by id asc", "[[INT64(1)]]") + utils.AssertMatches(t, conn, "select id from t10 where id = 1", "[[INT64(1)]]") + + utils.Exec(t, conn, "begin") + + utils.Exec(t, conn, "insert into t10(id, sharding_key) VALUES (2, 1)") + + utils.AssertMatches(t, conn2, "select id from t10_id_to_keyspace_id_idx order by id asc", "[[INT64(1)] [INT64(2)]]") + utils.AssertMatches(t, conn, "select id from t10 where id = 2", "[[INT64(2)]]") + + utils.Exec(t, conn, "insert into t10(id, sharding_key) VALUES (3, 1)") + + utils.AssertMatches(t, conn2, "select id from t10_id_to_keyspace_id_idx order by id asc", "[[INT64(1)] [INT64(2)] [INT64(3)]]") + utils.AssertMatches(t, conn, "select id from t10 where id = 3", "[[INT64(3)]]") + + utils.Exec(t, conn, "savepoint sp_foobar") + + utils.Exec(t, conn, "insert into t10(id, sharding_key) VALUES (4, 1)") + + utils.AssertMatches(t, conn2, "select id from t10_id_to_keyspace_id_idx order by id asc", "[[INT64(1)] [INT64(2)] [INT64(3)] [INT64(4)]]") + utils.AssertMatches(t, conn, "select id from t10 where id = 4", "[[INT64(4)]]") +} + func TestUnownedLookupInsertChecksKeyspaceIdsAreMatching(t *testing.T) { conn, closer := start(t) defer closer() diff --git a/go/test/endtoend/vtgate/main_test.go b/go/test/endtoend/vtgate/main_test.go index f683be5cef1..1d2bc59b50a 100644 --- a/go/test/endtoend/vtgate/main_test.go +++ b/go/test/endtoend/vtgate/main_test.go @@ -110,7 +110,7 @@ func start(t *testing.T) (*mysql.Conn, func()) { deleteAll := func() { utils.Exec(t, conn, "use ks") - tables := []string{"t1", "t2", "vstream_test", "t3", "t4", "t6", "t7_xxhash", "t7_xxhash_idx", "t7_fk", "t8", "t9", "t9_id_to_keyspace_id_idx", "t1_id2_idx", "t2_id4_idx", "t3_id7_idx", "t4_id2_idx", "t5_null_vindex", "t6_id2_idx"} + tables := []string{"t1", "t2", "vstream_test", "t3", "t4", "t6", "t7_xxhash", "t7_xxhash_idx", "t7_fk", "t8", "t9", "t9_id_to_keyspace_id_idx", "t10", "t10_id_to_keyspace_id_idx", "t1_id2_idx", "t2_id4_idx", "t3_id7_idx", "t4_id2_idx", "t5_null_vindex", "t6_id2_idx"} for _, table := range tables { _, _ = utils.ExecAllowError(t, conn, "delete from "+table) } diff --git a/go/test/endtoend/vtgate/schema.sql b/go/test/endtoend/vtgate/schema.sql index c597bd7e53e..536bec397ec 100644 --- a/go/test/endtoend/vtgate/schema.sql +++ b/go/test/endtoend/vtgate/schema.sql @@ -138,3 +138,17 @@ create table t9_id_to_keyspace_id_idx keyspace_id varbinary(10), primary key (id) ) Engine = InnoDB; + +create table t10 +( + id bigint, + sharding_key bigint, + primary key (id) +) Engine = InnoDB; + +create table t10_id_to_keyspace_id_idx +( + id bigint, + keyspace_id varbinary(10), + primary key (id) +) Engine = InnoDB; diff --git a/go/test/endtoend/vtgate/vschema.json b/go/test/endtoend/vtgate/vschema.json index 3aafd1106b5..8d16beec2a6 100644 --- a/go/test/endtoend/vtgate/vschema.json +++ b/go/test/endtoend/vtgate/vschema.json @@ -79,6 +79,16 @@ "to": "keyspace_id" }, "owner": "t9" + }, + "t10_id_to_keyspace_id_idx": { + "type": "lookup_unique", + "params": { + "autocommit": "true", + "table": "t10_id_to_keyspace_id_idx", + "from": "id", + "to": "keyspace_id" + }, + "owner": "t10" } }, "tables": { @@ -271,6 +281,26 @@ "name": "hash" } ] + }, + "t10": { + "column_vindexes": [ + { + "column": "sharding_key", + "name": "hash" + }, + { + "column": "id", + "name": "t10_id_to_keyspace_id_idx" + } + ] + }, + "t10_id_to_keyspace_id_idx": { + "column_vindexes": [ + { + "column": "id", + "name": "hash" + } + ] } } } diff --git a/go/vt/vtgate/engine/fake_vcursor_test.go b/go/vt/vtgate/engine/fake_vcursor_test.go index 7d3b6a9079c..702a3a47361 100644 --- a/go/vt/vtgate/engine/fake_vcursor_test.go +++ b/go/vt/vtgate/engine/fake_vcursor_test.go @@ -105,6 +105,10 @@ func (t *noopVCursor) ExecutePrimitive(ctx context.Context, primitive Primitive, return primitive.TryExecute(ctx, t, bindVars, wantfields) } +func (t *noopVCursor) ExecutePrimitiveStandalone(ctx context.Context, primitive Primitive, bindVars map[string]*querypb.BindVariable, wantfields bool) (*sqltypes.Result, error) { + return primitive.TryExecute(ctx, t, bindVars, wantfields) +} + func (t *noopVCursor) StreamExecutePrimitive(ctx context.Context, primitive Primitive, bindVars map[string]*querypb.BindVariable, wantfields bool, callback func(*sqltypes.Result) error) error { return primitive.TryStreamExecute(ctx, t, bindVars, wantfields, callback) } @@ -361,6 +365,10 @@ func (f *loggingVCursor) ExecutePrimitive(ctx context.Context, primitive Primiti return primitive.TryExecute(ctx, f, bindVars, wantfields) } +func (f *loggingVCursor) ExecutePrimitiveStandalone(ctx context.Context, primitive Primitive, bindVars map[string]*querypb.BindVariable, wantfields bool) (*sqltypes.Result, error) { + return primitive.TryExecute(ctx, f, bindVars, wantfields) +} + func (f *loggingVCursor) StreamExecutePrimitive(ctx context.Context, primitive Primitive, bindVars map[string]*querypb.BindVariable, wantfields bool, callback func(*sqltypes.Result) error) error { return primitive.TryStreamExecute(ctx, f, bindVars, wantfields, callback) } diff --git a/go/vt/vtgate/engine/primitive.go b/go/vt/vtgate/engine/primitive.go index c1fcca4d0b1..36d0719796b 100644 --- a/go/vt/vtgate/engine/primitive.go +++ b/go/vt/vtgate/engine/primitive.go @@ -57,9 +57,15 @@ type ( Execute(ctx context.Context, method string, query string, bindVars map[string]*querypb.BindVariable, rollbackOnError bool, co vtgatepb.CommitOrder) (*sqltypes.Result, error) AutocommitApproval() bool - // Primitive functions + // Execute the given primitive ExecutePrimitive(ctx context.Context, primitive Primitive, bindVars map[string]*querypb.BindVariable, wantfields bool) (*sqltypes.Result, error) + // Execute the given primitive in a new autocommit session + ExecutePrimitiveStandalone(ctx context.Context, primitive Primitive, bindVars map[string]*querypb.BindVariable, wantfields bool) (*sqltypes.Result, error) + + // Execute the given primitive StreamExecutePrimitive(ctx context.Context, primitive Primitive, bindVars map[string]*querypb.BindVariable, wantfields bool, callback func(*sqltypes.Result) error) error + // Execute the given primitive in a new autocommit session + StreamExecutePrimitiveStandalone(ctx context.Context, primitive Primitive, bindVars map[string]*querypb.BindVariable, wantfields bool, callback func(result *sqltypes.Result) error) error // Shard-level functions. ExecuteMultiShard(ctx context.Context, primitive Primitive, rss []*srvtopo.ResolvedShard, queries []*querypb.BoundQuery, rollbackOnError, canAutocommit bool) (*sqltypes.Result, []error) @@ -109,9 +115,6 @@ type ( // ReleaseLock releases all the held advisory locks. ReleaseLock(ctx context.Context) error - - // StreamExecutePrimitiveStandalone executes the primitive in its own new autocommit session. - StreamExecutePrimitiveStandalone(ctx context.Context, primitive Primitive, bindVars map[string]*querypb.BindVariable, wantfields bool, callback func(result *sqltypes.Result) error) error } // SessionActions gives primitives ability to interact with the session state diff --git a/go/vt/vtgate/engine/vindex_lookup.go b/go/vt/vtgate/engine/vindex_lookup.go index 8883f138dcd..816507ae086 100644 --- a/go/vt/vtgate/engine/vindex_lookup.go +++ b/go/vt/vtgate/engine/vindex_lookup.go @@ -187,10 +187,17 @@ func (vr *VindexLookup) executeNonBatch(ctx context.Context, vcursor VCursor, id bindVars := map[string]*querypb.BindVariable{ vr.Arguments[0]: vars, } - result, err := vcursor.ExecutePrimitive(ctx, vr.Lookup, bindVars, false) + + var result *sqltypes.Result + if vr.Vindex.AutoCommitEnabled() { + result, err = vcursor.ExecutePrimitiveStandalone(ctx, vr.Lookup, bindVars, false) + } else { + result, err = vcursor.ExecutePrimitive(ctx, vr.Lookup, bindVars, false) + } if err != nil { return nil, err } + rows := make([][]sqltypes.Value, 0, len(result.Rows)) for _, row := range result.Rows { rows = append(rows, []sqltypes.Value{row[1]}) @@ -212,7 +219,17 @@ func (vr *VindexLookup) executeBatch(ctx context.Context, vcursor VCursor, ids [ bindVars := map[string]*querypb.BindVariable{ vr.Arguments[0]: vars, } - result, err := vcursor.ExecutePrimitive(ctx, vr.Lookup, bindVars, false) + + var result *sqltypes.Result + if vr.Vindex.AutoCommitEnabled() { + result, err = vcursor.ExecutePrimitiveStandalone(ctx, vr.Lookup, bindVars, false) + } else { + result, err = vcursor.ExecutePrimitive(ctx, vr.Lookup, bindVars, false) + } + if err != nil { + return nil, err + } + if err != nil { return nil, vterrors.Wrapf(err, "failed while running the lookup query") } diff --git a/go/vt/vtgate/planbuilder/plan_test.go b/go/vt/vtgate/planbuilder/plan_test.go index ed84e11429d..63498a9f184 100644 --- a/go/vt/vtgate/planbuilder/plan_test.go +++ b/go/vt/vtgate/planbuilder/plan_test.go @@ -109,6 +109,7 @@ func (*nameLkpIndex) Cost() int { return 3 } func (*nameLkpIndex) IsUnique() bool { return false } func (*nameLkpIndex) NeedsVCursor() bool { return false } func (*nameLkpIndex) AllowBatch() bool { return true } +func (*nameLkpIndex) AutoCommitEnabled() bool { return false } func (*nameLkpIndex) GetCommitOrder() vtgatepb.CommitOrder { return vtgatepb.CommitOrder_NORMAL } func (*nameLkpIndex) Verify(context.Context, vindexes.VCursor, []sqltypes.Value, [][]byte) ([]bool, error) { return []bool{}, nil diff --git a/go/vt/vtgate/vcursor_impl.go b/go/vt/vtgate/vcursor_impl.go index f48c35be17c..7bd5a8c78c2 100644 --- a/go/vt/vtgate/vcursor_impl.go +++ b/go/vt/vtgate/vcursor_impl.go @@ -413,6 +413,19 @@ func (vc *vcursorImpl) ExecutePrimitive(ctx context.Context, primitive engine.Pr return nil, vterrors.New(vtrpcpb.Code_UNAVAILABLE, "upstream shards are not available") } +func (vc *vcursorImpl) ExecutePrimitiveStandalone(ctx context.Context, primitive engine.Primitive, bindVars map[string]*querypb.BindVariable, wantfields bool) (*sqltypes.Result, error) { + // clone the vcursorImpl with a new session. + newVC := vc.cloneWithAutocommitSession() + for try := 0; try < MaxBufferingRetries; try++ { + res, err := primitive.TryExecute(ctx, newVC, bindVars, wantfields) + if err != nil && vterrors.RootCause(err) == buffer.ShardMissingError { + continue + } + return res, err + } + return nil, vterrors.New(vtrpcpb.Code_UNAVAILABLE, "upstream shards are not available") +} + func (vc *vcursorImpl) StreamExecutePrimitive(ctx context.Context, primitive engine.Primitive, bindVars map[string]*querypb.BindVariable, wantfields bool, callback func(*sqltypes.Result) error) error { for try := 0; try < MaxBufferingRetries; try++ { err := primitive.TryStreamExecute(ctx, vc, bindVars, wantfields, callback) @@ -1063,6 +1076,7 @@ func (vc *vcursorImpl) ReleaseLock(ctx context.Context) error { func (vc *vcursorImpl) cloneWithAutocommitSession() *vcursorImpl { safeSession := NewAutocommitSession(vc.safeSession.Session) + safeSession.logging = vc.safeSession.logging return &vcursorImpl{ safeSession: safeSession, keyspace: vc.keyspace, diff --git a/go/vt/vtgate/vindexes/consistent_lookup.go b/go/vt/vtgate/vindexes/consistent_lookup.go index 1cd372906f4..3c2166c0aaf 100644 --- a/go/vt/vtgate/vindexes/consistent_lookup.go +++ b/go/vt/vtgate/vindexes/consistent_lookup.go @@ -148,6 +148,10 @@ func (lu *ConsistentLookup) AllowBatch() bool { return lu.lkp.BatchLookup } +func (lu *ConsistentLookup) AutoCommitEnabled() bool { + return lu.lkp.Autocommit +} + //==================================================================== // ConsistentLookupUnique defines a vindex that uses a lookup table. @@ -240,6 +244,10 @@ func (lu *ConsistentLookupUnique) AllowBatch() bool { return lu.lkp.BatchLookup } +func (lu *ConsistentLookupUnique) AutoCommitEnabled() bool { + return lu.lkp.Autocommit +} + //==================================================================== // clCommon defines a vindex that uses a lookup table. diff --git a/go/vt/vtgate/vindexes/lookup.go b/go/vt/vtgate/vindexes/lookup.go index 6884177b074..9ac514175df 100644 --- a/go/vt/vtgate/vindexes/lookup.go +++ b/go/vt/vtgate/vindexes/lookup.go @@ -58,6 +58,10 @@ func (ln *LookupNonUnique) AllowBatch() bool { return ln.lkp.BatchLookup } +func (ln *LookupNonUnique) AutoCommitEnabled() bool { + return ln.lkp.Autocommit +} + // String returns the name of the vindex. func (ln *LookupNonUnique) String() string { return ln.name @@ -233,6 +237,10 @@ func (lu *LookupUnique) AllowBatch() bool { return lu.lkp.BatchLookup } +func (lu *LookupUnique) AutoCommitEnabled() bool { + return lu.lkp.Autocommit +} + // NewLookupUnique creates a LookupUnique vindex. // The supplied map has the following required fields: // diff --git a/go/vt/vtgate/vindexes/lookup_hash.go b/go/vt/vtgate/vindexes/lookup_hash.go index d6721cf44b0..993b9655660 100644 --- a/go/vt/vtgate/vindexes/lookup_hash.go +++ b/go/vt/vtgate/vindexes/lookup_hash.go @@ -171,6 +171,10 @@ func (lh *LookupHash) AllowBatch() bool { return lh.lkp.BatchLookup } +func (lh *LookupHash) AutoCommitEnabled() bool { + return lh.lkp.Autocommit +} + // GetCommitOrder implements the LookupPlanable interface func (lh *LookupHash) GetCommitOrder() vtgatepb.CommitOrder { return vtgatepb.CommitOrder_NORMAL @@ -403,6 +407,10 @@ func (lhu *LookupHashUnique) AllowBatch() bool { return lhu.lkp.BatchLookup } +func (lhu *LookupHashUnique) AutoCommitEnabled() bool { + return lhu.lkp.Autocommit +} + func (lhu *LookupHashUnique) Query() (selQuery string, arguments []string) { return lhu.lkp.query() } diff --git a/go/vt/vtgate/vindexes/lookup_unicodeloosemd5_hash.go b/go/vt/vtgate/vindexes/lookup_unicodeloosemd5_hash.go index 884fd7c99b9..433234b82cb 100644 --- a/go/vt/vtgate/vindexes/lookup_unicodeloosemd5_hash.go +++ b/go/vt/vtgate/vindexes/lookup_unicodeloosemd5_hash.go @@ -192,6 +192,10 @@ func (lh *LookupUnicodeLooseMD5Hash) AllowBatch() bool { return lh.lkp.BatchLookup } +func (lh *LookupUnicodeLooseMD5Hash) AutoCommitEnabled() bool { + return lh.lkp.Autocommit +} + // GetCommitOrder implements the LookupPlanable interface func (lh *LookupUnicodeLooseMD5Hash) GetCommitOrder() vtgatepb.CommitOrder { return vtgatepb.CommitOrder_NORMAL @@ -402,6 +406,10 @@ func (lhu *LookupUnicodeLooseMD5HashUnique) AllowBatch() bool { return lhu.lkp.BatchLookup } +func (lhu *LookupUnicodeLooseMD5HashUnique) AutoCommitEnabled() bool { + return lhu.lkp.Autocommit +} + // GetCommitOrder implements the LookupPlanable interface func (lhu *LookupUnicodeLooseMD5HashUnique) GetCommitOrder() vtgatepb.CommitOrder { return vtgatepb.CommitOrder_NORMAL diff --git a/go/vt/vtgate/vindexes/vindex.go b/go/vt/vtgate/vindexes/vindex.go index dcb96e73a28..700b8e6175c 100644 --- a/go/vt/vtgate/vindexes/vindex.go +++ b/go/vt/vtgate/vindexes/vindex.go @@ -143,6 +143,7 @@ type ( MapResult(ids []sqltypes.Value, results []*sqltypes.Result) ([]key.Destination, error) AllowBatch() bool GetCommitOrder() vtgatepb.CommitOrder + AutoCommitEnabled() bool } // LookupBackfill interfaces all lookup vindexes that can backfill rows, such as LookupUnique.