Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
Signed-off-by: Harshit Gangal <harshit@planetscale.com>
  • Loading branch information
harshit-gangal committed Dec 18, 2024
1 parent 6c1a3d2 commit 2fbe771
Show file tree
Hide file tree
Showing 6 changed files with 49 additions and 24 deletions.
3 changes: 3 additions & 0 deletions go/sqltypes/proto3.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package sqltypes

import (
"google.golang.org/protobuf/proto"
"vitess.io/vitess/go/vt/log"

"vitess.io/vitess/go/vt/vterrors"

Expand Down Expand Up @@ -131,9 +132,11 @@ func Proto3ToResult(qr *querypb.QueryResult) *Result {
// takes a separate fields input because not all QueryResults contain the field info.
// In particular, only the first packet of streaming queries contain the field info.
func CustomProto3ToResult(fields []*querypb.Field, qr *querypb.QueryResult) *Result {
log.Info("Building Proto to Result")
if qr == nil {
return nil
}
log.Info("Result: %v, %v", qr.InsertId, qr.InsertIdChanged)
return &Result{
Fields: qr.Fields,
RowsAffected: qr.RowsAffected,
Expand Down
20 changes: 10 additions & 10 deletions go/test/endtoend/vtgate/queries/misc/misc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ func TestCast(t *testing.T) {
func TestSetAndGetLastInsertID(t *testing.T) {
notZero := 1
checkQuery := func(i string, workload string, tx bool, mcmp utils.MySQLCompare) {
for _, val := range []int{0, notZero} {
for _, val := range []int{notZero, 0, notZero + 99} {
query := fmt.Sprintf(i, val)
name := fmt.Sprintf("%s - %s", workload, query)
if tx {
Expand All @@ -154,18 +154,18 @@ func TestSetAndGetLastInsertID(t *testing.T) {
}

queries := []string{
"select last_insert_id(%d)",
// "select last_insert_id(%d)",
"select last_insert_id(%d), id1, id2 from t1 limit 1",
"select last_insert_id(%d), id1, id2 from t1 where 1 = 2",
"select 12 from t1 where last_insert_id(%d)",
"update t1 set id2 = last_insert_id(%d) where id1 = 1",
"update t1 set id2 = last_insert_id(%d) where id1 = 2",
"update t1 set id2 = 88 where id1 = last_insert_id(%d)",
"delete from t1 where id1 = last_insert_id(%d)",
// "select last_insert_id(%d), id1, id2 from t1 where 1 = 2",
// "select 12 from t1 where last_insert_id(%d)",
// "update t1 set id2 = last_insert_id(%d) where id1 = 1",
// "update t1 set id2 = last_insert_id(%d) where id1 = 2",
// "update t1 set id2 = 88 where id1 = last_insert_id(%d)",
// "delete from t1 where id1 = last_insert_id(%d)",
}

for _, workload := range []string{"olap", "oltp"} {
for _, tx := range []bool{true, false} {
for _, workload := range []string{"olap"} {
for _, tx := range []bool{false} {
mcmp, closer := start(t)
_, err := mcmp.VtConn.ExecuteFetch(fmt.Sprintf("set workload = %s", workload), 1000, false)
require.NoError(t, err)
Expand Down
1 change: 1 addition & 0 deletions go/vt/vtgate/scatter_conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -387,6 +387,7 @@ func (stc *ScatterConn) StreamExecuteMulti(
if reply != nil {
resultsObserver.Observe(reply)
}
log.Infof("result received: %v, %v", reply.InsertID, reply.InsertIDChanged)
return callback(reply)
}
allErrors := stc.multiGoTransaction(
Expand Down
2 changes: 2 additions & 0 deletions go/vt/vttablet/grpcqueryservice/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"context"

"google.golang.org/grpc"
"vitess.io/vitess/go/vt/log"

"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/vt/callerid"
Expand Down Expand Up @@ -65,6 +66,7 @@ func (q *query) StreamExecute(request *querypb.StreamExecuteRequest, stream quer
request.ImmediateCallerId,
)
err = q.server.StreamExecute(ctx, request.Target, request.Query.Sql, request.Query.BindVariables, request.TransactionId, request.ReservedId, request.Options, func(reply *sqltypes.Result) error {
log.Infof("StreamExecute: (%v, %v)", reply.InsertID, reply.InsertIDChanged)
return stream.Send(&querypb.StreamExecuteResponse{
Result: sqltypes.ResultToProto3(reply),
})
Expand Down
2 changes: 2 additions & 0 deletions go/vt/vttablet/grpctabletconn/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (

"github.com/spf13/pflag"
"google.golang.org/grpc"
"vitess.io/vitess/go/vt/log"

"vitess.io/vitess/go/netutil"
"vitess.io/vitess/go/sqltypes"
Expand Down Expand Up @@ -187,6 +188,7 @@ func (conn *gRPCQueryClient) StreamExecute(ctx context.Context, target *querypb.
if fields == nil {
fields = ser.Result.Fields
}
log.Infof("StreamExecute result: %v, %v, %v", query, ser.Result.InsertId, ser.Result.InsertIdChanged)
if err := callback(sqltypes.CustomProto3ToResult(fields, ser.Result)); err != nil {
if err == io.EOF {
return nil
Expand Down
45 changes: 31 additions & 14 deletions go/vt/vttablet/tabletserver/query_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -1202,16 +1202,8 @@ func (qre *QueryExecutor) fetchLastInsertID(ctx context.Context, conn *connpool.

func (qre *QueryExecutor) execStreamSQL(conn *connpool.PooledConn, isTransaction bool, sql string, callback func(*sqltypes.Result) error) error {
span, ctx := trace.NewSpan(qre.ctx, "QueryExecutor.execStreamSQL")
defer span.Finish()
trace.AnnotateSQL(span, sqlparser.Preview(sql))
callBackClosingSpan := func(result *sqltypes.Result) error {
defer span.Finish()

// if err := qre.fetchLastInsertID(ctx, conn.Conn, result); err != nil {
// return err
// }

return callback(result)
}

start := time.Now()
defer qre.logStats.AddRewrittenSQL(sql, start)
Expand All @@ -1222,17 +1214,27 @@ func (qre *QueryExecutor) execStreamSQL(conn *connpool.PooledConn, isTransaction
// This change will ensure that long-running streaming stateful queries get gracefully shutdown during ServingTypeChange
// once their grace period is over.
qd := NewQueryDetail(qre.logStats.Ctx, conn.Conn)
// if err := qre.resetLastInsertIDIfNeeded(ctx, conn.Conn); err != nil {
// return err
// }
log.Infof("Fetch Last Insert ID: %v, sql: %v", qre.options.GetFetchLastInsertId(), sql)

if err := qre.resetLastInsertIDIfNeeded(ctx, conn.Conn); err != nil {
return err
}

lastInsertIDSet := false
cb := func(result *sqltypes.Result) error {
if result != nil && result.InsertID != 0 {
lastInsertIDSet = true
}
return callback(result)
}

if isTransaction {
err := qre.tsv.statefulql.Add(qd)
if err != nil {
return err
}
defer qre.tsv.statefulql.Remove(qd)
err = conn.Conn.StreamOnce(ctx, sql, callBackClosingSpan, allocStreamResult, int(qre.tsv.qe.streamBufferSize.Load()), sqltypes.IncludeFieldsOrDefault(qre.options))
err = conn.Conn.StreamOnce(ctx, sql, cb, allocStreamResult, int(qre.tsv.qe.streamBufferSize.Load()), sqltypes.IncludeFieldsOrDefault(qre.options))
if err != nil {
return err
}
Expand All @@ -1243,7 +1245,22 @@ func (qre *QueryExecutor) execStreamSQL(conn *connpool.PooledConn, isTransaction
return err
}
defer qre.tsv.olapql.Remove(qd)
return conn.Conn.Stream(ctx, sql, callBackClosingSpan, allocStreamResult, int(qre.tsv.qe.streamBufferSize.Load()), sqltypes.IncludeFieldsOrDefault(qre.options))
err = conn.Conn.Stream(ctx, sql, cb, allocStreamResult, int(qre.tsv.qe.streamBufferSize.Load()), sqltypes.IncludeFieldsOrDefault(qre.options))
log.Infof("streaming complete")
if err != nil || lastInsertIDSet || !qre.options.GetFetchLastInsertId() {
return err
}
log.Infof("checking for last insert id change")
res := &sqltypes.Result{}
if err = qre.fetchLastInsertID(ctx, conn.Conn, res); err != nil {
return err
}
if res.InsertIDChanged {
log.Infof("done callback with values: (%v, %v)", res.InsertID, res.InsertIDChanged)
return callback(res)
}
log.Info("no changes for last insert ID")
return nil
}

func (qre *QueryExecutor) recordUserQuery(queryType string, duration int64) {
Expand Down

0 comments on commit 2fbe771

Please sign in to comment.