Skip to content

Commit

Permalink
[Gen4] Fix lookup vindexes with autocommit enabled (#12172)
Browse files Browse the repository at this point in the history
* test: show that `autocommit` on lookup vindexes is broken when used after a savepoint is created

Signed-off-by: Arthur Schreiber <arthurschreiber@github.com>

* test: rework the test case to show that the problem lies with transactions

Signed-off-by: Arthur Schreiber <arthurschreiber@github.com>

* Correctly handle `autocommit` in the `VIndexLookup` primitive.

Signed-off-by: Arthur Schreiber <arthurschreiber@github.com>

* Copy the `logging` struct so autocommit queries show up correctly in `vexplain` output.

Signed-off-by: Arthur Schreiber <arthurschreiber@github.com>

---------

Signed-off-by: Arthur Schreiber <arthurschreiber@github.com>
  • Loading branch information
arthurschreiber authored Feb 3, 2023
1 parent b73da20 commit ee0670f
Show file tree
Hide file tree
Showing 14 changed files with 166 additions and 7 deletions.
39 changes: 39 additions & 0 deletions go/test/endtoend/vtgate/lookup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,45 @@ func TestUnownedLookupInsertNull(t *testing.T) {
utils.Exec(t, conn, "insert into t8(id, parent_id, t9_id) VALUES (3, 2, 2)")
}

func TestLookupUniqueWithAutocommit(t *testing.T) {
conn, closer := start(t)
defer closer()

// conn2 is to check entries in the lookup table
conn2, err := mysql.Connect(context.Background(), &vtParams)
require.Nil(t, err)
defer conn2.Close()

// Test that all vindex writes are autocommitted outside of any ongoing transactions.
//
// Also test that autocommited vindex entries are visible inside transactions, as lookups
// should also use the autocommit connection.

utils.Exec(t, conn, "insert into t10(id, sharding_key) VALUES (1, 1)")

utils.AssertMatches(t, conn2, "select id from t10_id_to_keyspace_id_idx order by id asc", "[[INT64(1)]]")
utils.AssertMatches(t, conn, "select id from t10 where id = 1", "[[INT64(1)]]")

utils.Exec(t, conn, "begin")

utils.Exec(t, conn, "insert into t10(id, sharding_key) VALUES (2, 1)")

utils.AssertMatches(t, conn2, "select id from t10_id_to_keyspace_id_idx order by id asc", "[[INT64(1)] [INT64(2)]]")
utils.AssertMatches(t, conn, "select id from t10 where id = 2", "[[INT64(2)]]")

utils.Exec(t, conn, "insert into t10(id, sharding_key) VALUES (3, 1)")

utils.AssertMatches(t, conn2, "select id from t10_id_to_keyspace_id_idx order by id asc", "[[INT64(1)] [INT64(2)] [INT64(3)]]")
utils.AssertMatches(t, conn, "select id from t10 where id = 3", "[[INT64(3)]]")

utils.Exec(t, conn, "savepoint sp_foobar")

utils.Exec(t, conn, "insert into t10(id, sharding_key) VALUES (4, 1)")

utils.AssertMatches(t, conn2, "select id from t10_id_to_keyspace_id_idx order by id asc", "[[INT64(1)] [INT64(2)] [INT64(3)] [INT64(4)]]")
utils.AssertMatches(t, conn, "select id from t10 where id = 4", "[[INT64(4)]]")
}

func TestUnownedLookupInsertChecksKeyspaceIdsAreMatching(t *testing.T) {
conn, closer := start(t)
defer closer()
Expand Down
2 changes: 1 addition & 1 deletion go/test/endtoend/vtgate/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ func start(t *testing.T) (*mysql.Conn, func()) {

deleteAll := func() {
utils.Exec(t, conn, "use ks")
tables := []string{"t1", "t2", "vstream_test", "t3", "t4", "t6", "t7_xxhash", "t7_xxhash_idx", "t7_fk", "t8", "t9", "t9_id_to_keyspace_id_idx", "t1_id2_idx", "t2_id4_idx", "t3_id7_idx", "t4_id2_idx", "t5_null_vindex", "t6_id2_idx"}
tables := []string{"t1", "t2", "vstream_test", "t3", "t4", "t6", "t7_xxhash", "t7_xxhash_idx", "t7_fk", "t8", "t9", "t9_id_to_keyspace_id_idx", "t10", "t10_id_to_keyspace_id_idx", "t1_id2_idx", "t2_id4_idx", "t3_id7_idx", "t4_id2_idx", "t5_null_vindex", "t6_id2_idx"}
for _, table := range tables {
_, _ = utils.ExecAllowError(t, conn, "delete from "+table)
}
Expand Down
14 changes: 14 additions & 0 deletions go/test/endtoend/vtgate/schema.sql
Original file line number Diff line number Diff line change
Expand Up @@ -138,3 +138,17 @@ create table t9_id_to_keyspace_id_idx
keyspace_id varbinary(10),
primary key (id)
) Engine = InnoDB;

create table t10
(
id bigint,
sharding_key bigint,
primary key (id)
) Engine = InnoDB;

create table t10_id_to_keyspace_id_idx
(
id bigint,
keyspace_id varbinary(10),
primary key (id)
) Engine = InnoDB;
30 changes: 30 additions & 0 deletions go/test/endtoend/vtgate/vschema.json
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,16 @@
"to": "keyspace_id"
},
"owner": "t9"
},
"t10_id_to_keyspace_id_idx": {
"type": "lookup_unique",
"params": {
"autocommit": "true",
"table": "t10_id_to_keyspace_id_idx",
"from": "id",
"to": "keyspace_id"
},
"owner": "t10"
}
},
"tables": {
Expand Down Expand Up @@ -271,6 +281,26 @@
"name": "hash"
}
]
},
"t10": {
"column_vindexes": [
{
"column": "sharding_key",
"name": "hash"
},
{
"column": "id",
"name": "t10_id_to_keyspace_id_idx"
}
]
},
"t10_id_to_keyspace_id_idx": {
"column_vindexes": [
{
"column": "id",
"name": "hash"
}
]
}
}
}
8 changes: 8 additions & 0 deletions go/vt/vtgate/engine/fake_vcursor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,10 @@ func (t *noopVCursor) ExecutePrimitive(ctx context.Context, primitive Primitive,
return primitive.TryExecute(ctx, t, bindVars, wantfields)
}

func (t *noopVCursor) ExecutePrimitiveStandalone(ctx context.Context, primitive Primitive, bindVars map[string]*querypb.BindVariable, wantfields bool) (*sqltypes.Result, error) {
return primitive.TryExecute(ctx, t, bindVars, wantfields)
}

func (t *noopVCursor) StreamExecutePrimitive(ctx context.Context, primitive Primitive, bindVars map[string]*querypb.BindVariable, wantfields bool, callback func(*sqltypes.Result) error) error {
return primitive.TryStreamExecute(ctx, t, bindVars, wantfields, callback)
}
Expand Down Expand Up @@ -361,6 +365,10 @@ func (f *loggingVCursor) ExecutePrimitive(ctx context.Context, primitive Primiti
return primitive.TryExecute(ctx, f, bindVars, wantfields)
}

func (f *loggingVCursor) ExecutePrimitiveStandalone(ctx context.Context, primitive Primitive, bindVars map[string]*querypb.BindVariable, wantfields bool) (*sqltypes.Result, error) {
return primitive.TryExecute(ctx, f, bindVars, wantfields)
}

func (f *loggingVCursor) StreamExecutePrimitive(ctx context.Context, primitive Primitive, bindVars map[string]*querypb.BindVariable, wantfields bool, callback func(*sqltypes.Result) error) error {
return primitive.TryStreamExecute(ctx, f, bindVars, wantfields, callback)
}
Expand Down
11 changes: 7 additions & 4 deletions go/vt/vtgate/engine/primitive.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,15 @@ type (
Execute(ctx context.Context, method string, query string, bindVars map[string]*querypb.BindVariable, rollbackOnError bool, co vtgatepb.CommitOrder) (*sqltypes.Result, error)
AutocommitApproval() bool

// Primitive functions
// Execute the given primitive
ExecutePrimitive(ctx context.Context, primitive Primitive, bindVars map[string]*querypb.BindVariable, wantfields bool) (*sqltypes.Result, error)
// Execute the given primitive in a new autocommit session
ExecutePrimitiveStandalone(ctx context.Context, primitive Primitive, bindVars map[string]*querypb.BindVariable, wantfields bool) (*sqltypes.Result, error)

// Execute the given primitive
StreamExecutePrimitive(ctx context.Context, primitive Primitive, bindVars map[string]*querypb.BindVariable, wantfields bool, callback func(*sqltypes.Result) error) error
// Execute the given primitive in a new autocommit session
StreamExecutePrimitiveStandalone(ctx context.Context, primitive Primitive, bindVars map[string]*querypb.BindVariable, wantfields bool, callback func(result *sqltypes.Result) error) error

// Shard-level functions.
ExecuteMultiShard(ctx context.Context, primitive Primitive, rss []*srvtopo.ResolvedShard, queries []*querypb.BoundQuery, rollbackOnError, canAutocommit bool) (*sqltypes.Result, []error)
Expand Down Expand Up @@ -109,9 +115,6 @@ type (

// ReleaseLock releases all the held advisory locks.
ReleaseLock(ctx context.Context) error

// StreamExecutePrimitiveStandalone executes the primitive in its own new autocommit session.
StreamExecutePrimitiveStandalone(ctx context.Context, primitive Primitive, bindVars map[string]*querypb.BindVariable, wantfields bool, callback func(result *sqltypes.Result) error) error
}

// SessionActions gives primitives ability to interact with the session state
Expand Down
21 changes: 19 additions & 2 deletions go/vt/vtgate/engine/vindex_lookup.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,10 +187,17 @@ func (vr *VindexLookup) executeNonBatch(ctx context.Context, vcursor VCursor, id
bindVars := map[string]*querypb.BindVariable{
vr.Arguments[0]: vars,
}
result, err := vcursor.ExecutePrimitive(ctx, vr.Lookup, bindVars, false)

var result *sqltypes.Result
if vr.Vindex.AutoCommitEnabled() {
result, err = vcursor.ExecutePrimitiveStandalone(ctx, vr.Lookup, bindVars, false)
} else {
result, err = vcursor.ExecutePrimitive(ctx, vr.Lookup, bindVars, false)
}
if err != nil {
return nil, err
}

rows := make([][]sqltypes.Value, 0, len(result.Rows))
for _, row := range result.Rows {
rows = append(rows, []sqltypes.Value{row[1]})
Expand All @@ -212,7 +219,17 @@ func (vr *VindexLookup) executeBatch(ctx context.Context, vcursor VCursor, ids [
bindVars := map[string]*querypb.BindVariable{
vr.Arguments[0]: vars,
}
result, err := vcursor.ExecutePrimitive(ctx, vr.Lookup, bindVars, false)

var result *sqltypes.Result
if vr.Vindex.AutoCommitEnabled() {
result, err = vcursor.ExecutePrimitiveStandalone(ctx, vr.Lookup, bindVars, false)
} else {
result, err = vcursor.ExecutePrimitive(ctx, vr.Lookup, bindVars, false)
}
if err != nil {
return nil, err
}

if err != nil {
return nil, vterrors.Wrapf(err, "failed while running the lookup query")
}
Expand Down
1 change: 1 addition & 0 deletions go/vt/vtgate/planbuilder/plan_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ func (*nameLkpIndex) Cost() int { return 3 }
func (*nameLkpIndex) IsUnique() bool { return false }
func (*nameLkpIndex) NeedsVCursor() bool { return false }
func (*nameLkpIndex) AllowBatch() bool { return true }
func (*nameLkpIndex) AutoCommitEnabled() bool { return false }
func (*nameLkpIndex) GetCommitOrder() vtgatepb.CommitOrder { return vtgatepb.CommitOrder_NORMAL }
func (*nameLkpIndex) Verify(context.Context, vindexes.VCursor, []sqltypes.Value, [][]byte) ([]bool, error) {
return []bool{}, nil
Expand Down
14 changes: 14 additions & 0 deletions go/vt/vtgate/vcursor_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -413,6 +413,19 @@ func (vc *vcursorImpl) ExecutePrimitive(ctx context.Context, primitive engine.Pr
return nil, vterrors.New(vtrpcpb.Code_UNAVAILABLE, "upstream shards are not available")
}

func (vc *vcursorImpl) ExecutePrimitiveStandalone(ctx context.Context, primitive engine.Primitive, bindVars map[string]*querypb.BindVariable, wantfields bool) (*sqltypes.Result, error) {
// clone the vcursorImpl with a new session.
newVC := vc.cloneWithAutocommitSession()
for try := 0; try < MaxBufferingRetries; try++ {
res, err := primitive.TryExecute(ctx, newVC, bindVars, wantfields)
if err != nil && vterrors.RootCause(err) == buffer.ShardMissingError {
continue
}
return res, err
}
return nil, vterrors.New(vtrpcpb.Code_UNAVAILABLE, "upstream shards are not available")
}

func (vc *vcursorImpl) StreamExecutePrimitive(ctx context.Context, primitive engine.Primitive, bindVars map[string]*querypb.BindVariable, wantfields bool, callback func(*sqltypes.Result) error) error {
for try := 0; try < MaxBufferingRetries; try++ {
err := primitive.TryStreamExecute(ctx, vc, bindVars, wantfields, callback)
Expand Down Expand Up @@ -1063,6 +1076,7 @@ func (vc *vcursorImpl) ReleaseLock(ctx context.Context) error {

func (vc *vcursorImpl) cloneWithAutocommitSession() *vcursorImpl {
safeSession := NewAutocommitSession(vc.safeSession.Session)
safeSession.logging = vc.safeSession.logging
return &vcursorImpl{
safeSession: safeSession,
keyspace: vc.keyspace,
Expand Down
8 changes: 8 additions & 0 deletions go/vt/vtgate/vindexes/consistent_lookup.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,10 @@ func (lu *ConsistentLookup) AllowBatch() bool {
return lu.lkp.BatchLookup
}

func (lu *ConsistentLookup) AutoCommitEnabled() bool {
return lu.lkp.Autocommit
}

//====================================================================

// ConsistentLookupUnique defines a vindex that uses a lookup table.
Expand Down Expand Up @@ -240,6 +244,10 @@ func (lu *ConsistentLookupUnique) AllowBatch() bool {
return lu.lkp.BatchLookup
}

func (lu *ConsistentLookupUnique) AutoCommitEnabled() bool {
return lu.lkp.Autocommit
}

//====================================================================

// clCommon defines a vindex that uses a lookup table.
Expand Down
8 changes: 8 additions & 0 deletions go/vt/vtgate/vindexes/lookup.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,10 @@ func (ln *LookupNonUnique) AllowBatch() bool {
return ln.lkp.BatchLookup
}

func (ln *LookupNonUnique) AutoCommitEnabled() bool {
return ln.lkp.Autocommit
}

// String returns the name of the vindex.
func (ln *LookupNonUnique) String() string {
return ln.name
Expand Down Expand Up @@ -233,6 +237,10 @@ func (lu *LookupUnique) AllowBatch() bool {
return lu.lkp.BatchLookup
}

func (lu *LookupUnique) AutoCommitEnabled() bool {
return lu.lkp.Autocommit
}

// NewLookupUnique creates a LookupUnique vindex.
// The supplied map has the following required fields:
//
Expand Down
8 changes: 8 additions & 0 deletions go/vt/vtgate/vindexes/lookup_hash.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,10 @@ func (lh *LookupHash) AllowBatch() bool {
return lh.lkp.BatchLookup
}

func (lh *LookupHash) AutoCommitEnabled() bool {
return lh.lkp.Autocommit
}

// GetCommitOrder implements the LookupPlanable interface
func (lh *LookupHash) GetCommitOrder() vtgatepb.CommitOrder {
return vtgatepb.CommitOrder_NORMAL
Expand Down Expand Up @@ -403,6 +407,10 @@ func (lhu *LookupHashUnique) AllowBatch() bool {
return lhu.lkp.BatchLookup
}

func (lhu *LookupHashUnique) AutoCommitEnabled() bool {
return lhu.lkp.Autocommit
}

func (lhu *LookupHashUnique) Query() (selQuery string, arguments []string) {
return lhu.lkp.query()
}
Expand Down
8 changes: 8 additions & 0 deletions go/vt/vtgate/vindexes/lookup_unicodeloosemd5_hash.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,10 @@ func (lh *LookupUnicodeLooseMD5Hash) AllowBatch() bool {
return lh.lkp.BatchLookup
}

func (lh *LookupUnicodeLooseMD5Hash) AutoCommitEnabled() bool {
return lh.lkp.Autocommit
}

// GetCommitOrder implements the LookupPlanable interface
func (lh *LookupUnicodeLooseMD5Hash) GetCommitOrder() vtgatepb.CommitOrder {
return vtgatepb.CommitOrder_NORMAL
Expand Down Expand Up @@ -402,6 +406,10 @@ func (lhu *LookupUnicodeLooseMD5HashUnique) AllowBatch() bool {
return lhu.lkp.BatchLookup
}

func (lhu *LookupUnicodeLooseMD5HashUnique) AutoCommitEnabled() bool {
return lhu.lkp.Autocommit
}

// GetCommitOrder implements the LookupPlanable interface
func (lhu *LookupUnicodeLooseMD5HashUnique) GetCommitOrder() vtgatepb.CommitOrder {
return vtgatepb.CommitOrder_NORMAL
Expand Down
1 change: 1 addition & 0 deletions go/vt/vtgate/vindexes/vindex.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ type (
MapResult(ids []sqltypes.Value, results []*sqltypes.Result) ([]key.Destination, error)
AllowBatch() bool
GetCommitOrder() vtgatepb.CommitOrder
AutoCommitEnabled() bool
}

// LookupBackfill interfaces all lookup vindexes that can backfill rows, such as LookupUnique.
Expand Down

0 comments on commit ee0670f

Please sign in to comment.