Skip to content

Commit

Permalink
Change Autocommit Query Plan (#983)
Browse files Browse the repository at this point in the history
  • Loading branch information
VinaiRachakonda authored Apr 30, 2022
1 parent e5d4752 commit 7028950
Show file tree
Hide file tree
Showing 14 changed files with 332 additions and 151 deletions.
146 changes: 5 additions & 141 deletions engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ func (e *Engine) QueryNodeWithBindings(
return nil, nil, err
}

transactionDatabase, err := e.beginTransaction(ctx, analyzed)
_, err = e.beginTransaction(ctx, analyzed)
if err != nil {
return nil, nil, err
}
Expand All @@ -203,19 +203,6 @@ func (e *Engine) QueryNodeWithBindings(
return nil, nil, err
}

autoCommit, err := isSessionAutocommit(ctx)
if err != nil {
return nil, nil, err
}

if autoCommit {
iter = transactionCommittingIter{
childIter: iter,
childIter2: iter2,
transactionDatabase: transactionDatabase,
}
}

if useIter2 {
iter = rowFormatSelectorIter{
iter: iter,
Expand Down Expand Up @@ -408,19 +395,13 @@ func (t rowFormatSelectorIter) IsNode2() bool {
}

const (
fakeReadCommittedEnvVar = "READ_COMMITTED_HACK"
enableIter2EnvVar = "ENABLE_ROW_ITER_2"
enableIter2EnvVar = "ENABLE_ROW_ITER_2"
)

var fakeReadCommitted bool
var enableRowIter2 bool

func init() {
_, ok := os.LookupEnv(fakeReadCommittedEnvVar)
if ok {
fakeReadCommitted = true
}
_, ok = os.LookupEnv(enableIter2EnvVar)
_, ok := os.LookupEnv(enableIter2EnvVar)
if ok {
enableRowIter2 = true
}
Expand All @@ -429,10 +410,10 @@ func init() {
func (e *Engine) beginTransaction(ctx *sql.Context, parsed sql.Node) (string, error) {
// Before we begin a transaction, we need to know if the database being operated on is not the one
// currently selected
transactionDatabase := getTransactionDatabase(ctx, parsed)
transactionDatabase := analyzer.GetTransactionDatabase(ctx, parsed)

// TODO: this won't work with transactions that cross database boundaries, we need to detect that and error out
beginNewTransaction := ctx.GetTransaction() == nil || readCommitted(ctx)
beginNewTransaction := ctx.GetTransaction() == nil || plan.ReadCommitted(ctx)
if beginNewTransaction {
ctx.GetLogger().Tracef("beginning new transaction")
if len(transactionDatabase) > 0 {
Expand Down Expand Up @@ -474,123 +455,6 @@ func (e *Engine) WithBackgroundThreads(b *sql.BackgroundThreads) *Engine {
return e
}

// Returns whether this session has a transaction isolation level of READ COMMITTED.
// If so, we always begin a new transaction for every statement, and commit after every statement as well.
// This is not what the READ COMMITTED isolation level is supposed to do.
func readCommitted(ctx *sql.Context) bool {
if !fakeReadCommitted {
return false
}

val, err := ctx.GetSessionVariable(ctx, "transaction_isolation")
if err != nil {
return false
}

valStr, ok := val.(string)
if !ok {
return false
}

return valStr == "READ-COMMITTED"
}

// transactionCommittingIter is a simple RowIter wrapper to allow the engine to conditionally commit a transaction
// during the Close() operation
type transactionCommittingIter struct {
childIter sql.RowIter
childIter2 sql.RowIter2
transactionDatabase string
}

func (t transactionCommittingIter) Next(ctx *sql.Context) (sql.Row, error) {
return t.childIter.Next(ctx)
}

func (t transactionCommittingIter) Next2(ctx *sql.Context, frame *sql.RowFrame) error {
return t.childIter2.Next2(ctx, frame)
}

func (t transactionCommittingIter) Close(ctx *sql.Context) error {
err := t.childIter.Close(ctx)
if err != nil {
return err
}

tx := ctx.GetTransaction()
commitTransaction := (tx != nil) && !ctx.GetIgnoreAutoCommit()
if commitTransaction {
ctx.GetLogger().Tracef("committing transaction %s", tx)
if err := ctx.Session.CommitTransaction(ctx, t.transactionDatabase, tx); err != nil {
return err
}

// Clearing out the current transaction will tell us to start a new one the next time this session queries
ctx.SetTransaction(nil)
}

return nil
}

func isSessionAutocommit(ctx *sql.Context) (bool, error) {
if readCommitted(ctx) {
return true, nil
}

autoCommitSessionVar, err := ctx.GetSessionVariable(ctx, sql.AutoCommitSessionVar)
if err != nil {
return false, err
}
return sql.ConvertToBool(autoCommitSessionVar)
}

// getDbHelper returns the first database name from a table-like node
func getDbHelper(tables ...sql.Node) string {
if len(tables) == 0 {
return ""
}
switch t := tables[0].(type) {
case *plan.UnresolvedTable:
return t.Database()
case *plan.ResolvedTable:
return t.Database.Name()
case *plan.IndexedTableAccess:
return t.Database().Name()
default:
}
return ""
}

// getTransactionDatabase returns the name of the database that should be considered current for the transaction about
// to begin. The database is not guaranteed to exist.
// For USE DATABASE statements, we consider the transaction database to be the one being USEd
func getTransactionDatabase(ctx *sql.Context, parsed sql.Node) string {
var dbName string
switch n := parsed.(type) {
case *plan.QueryProcess, *plan.RowUpdateAccumulator:
return getTransactionDatabase(ctx, n.(sql.UnaryNode).Child())
case *plan.Use, *plan.CreateProcedure, *plan.DropProcedure, *plan.CreateTrigger, *plan.DropTrigger,
*plan.CreateTable, *plan.InsertInto, *plan.AlterIndex, *plan.AlterAutoIncrement, *plan.AlterPK,
*plan.DropColumn, *plan.RenameColumn, *plan.ModifyColumn:
database := n.(sql.Databaser).Database()
if database != nil {
dbName = database.Name()
}
case *plan.DropForeignKey, *plan.DropIndex, *plan.CreateIndex, *plan.Update, *plan.DeleteFrom,
*plan.CreateForeignKey:
dbName = n.(sql.Databaseable).Database()
case *plan.DropTable:
dbName = getDbHelper(n.Tables...)
case *plan.Truncate:
dbName = getDbHelper(n.Child)
default:
}
if dbName != "" {
return dbName
}
return ctx.GetCurrentDatabase()
}

// readOnlyCheck checks to see if the query is valid with the modification setting of the engine.
func (e *Engine) readOnlyCheck(node sql.Node) error {
if plan.IsDDLNode(node) && e.IsReadOnly {
Expand Down
1 change: 1 addition & 0 deletions enginetest/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -434,6 +434,7 @@ func TestAnalyzer(t *testing.T) {
require.NoError(t, err)

analyzed, err := e.Analyzer.Analyze(ctx, parsed, nil)
analyzed = analyzer.StripPassthroughNodes(analyzed)
if tt.err != nil {
require.Error(t, err)
assert.True(t, tt.err.Is(err))
Expand Down
36 changes: 36 additions & 0 deletions enginetest/enginetests.go
Original file line number Diff line number Diff line change
Expand Up @@ -5086,6 +5086,42 @@ func TestUse(t *testing.T, harness Harness) {
require.Equal("foo", ctx.GetCurrentDatabase())
}

// TestConcurrentTransactions tests that two concurrent processes/transactions can successfully execute without early
// cancellation.
func TestConcurrentTransactions(t *testing.T, harness Harness) {
require := require.New(t)
e := NewEngine(t, harness)
defer e.Close()

RunQuery(t, e, harness, `CREATE TABLE a (x int primary key, y int)`)

clientSessionA := NewSession(harness)
clientSessionA.ProcessList = sqle.NewProcessList()

clientSessionB := NewSession(harness)
clientSessionB.ProcessList = sqle.NewProcessList()

var err error
// We want to add the query to the process list to represent the full workflow.
clientSessionA, err = clientSessionA.ProcessList.AddProcess(clientSessionA, "INSERT INTO a VALUES (1,1)")
require.NoError(err)
sch, iter, err := e.Query(clientSessionA, "INSERT INTO a VALUES (1,1)")
require.NoError(err)

clientSessionB, err = clientSessionB.ProcessList.AddProcess(clientSessionB, "INSERT INTO a VALUES (2,2)")
require.NoError(err)
sch2, iter2, err := e.Query(clientSessionB, "INSERT INTO a VALUES (2,2)")
require.NoError(err)

rows, err := sql.RowIterToRows(clientSessionA, sch, iter)
require.NoError(err)
require.Len(rows, 1)

rows, err = sql.RowIterToRows(clientSessionB, sch2, iter2)
require.NoError(err)
require.Len(rows, 1)
}

func TestNoDatabaseSelected(t *testing.T, harness Harness) {
e := NewEngine(t, harness)
defer e.Close()
Expand Down
4 changes: 0 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -55,10 +55,6 @@ github.com/dolthub/jsonpath v0.0.0-20210609232853-d49537a30474 h1:xTrR+l5l+1Lfq0
github.com/dolthub/jsonpath v0.0.0-20210609232853-d49537a30474/go.mod h1:kMz7uXOXq4qRriCEyZ/LUeTqraLJCjf0WVZcUi6TxUY=
github.com/dolthub/sqllogictest/go v0.0.0-20201107003712-816f3ae12d81 h1:7/v8q9XGFa6q5Ap4Z/OhNkAMBaK5YeuEzwJt+NZdhiE=
github.com/dolthub/sqllogictest/go v0.0.0-20201107003712-816f3ae12d81/go.mod h1:siLfyv2c92W1eN/R4QqG/+RjjX5W2+gCTRjZxBjI3TY=
github.com/dolthub/vitess v0.0.0-20220404164553-6944b106018c h1:D0U6mDvtWx4NCikNZVMHVywQbMhGEiZMw3SQnuhEYPI=
github.com/dolthub/vitess v0.0.0-20220404164553-6944b106018c/go.mod h1:jxgvpEvrTNw2i4BKlwT75E775eUXBeMv5MPeQkIb9zI=
github.com/dolthub/vitess v0.0.0-20220411205612-30b631cfdaac h1:I/1wR3fAHtLbLuIbn/QW/2nIT9xhiNoMCErfKvn4JvQ=
github.com/dolthub/vitess v0.0.0-20220411205612-30b631cfdaac/go.mod h1:jxgvpEvrTNw2i4BKlwT75E775eUXBeMv5MPeQkIb9zI=
github.com/dolthub/vitess v0.0.0-20220420162151-fd557659d30b h1:XuO7XrCELtcgeyoLWmpKt2BNp/K2Eyk9MNjx2FOYH0s=
github.com/dolthub/vitess v0.0.0-20220420162151-fd557659d30b/go.mod h1:jxgvpEvrTNw2i4BKlwT75E775eUXBeMv5MPeQkIb9zI=
github.com/dolthub/vitess v0.0.0-20220428202705-021fca64b77f h1:l5d/qhXXz/s/Oun9Ex4Xd7xFU9DCGZ8H9QlI/3Z2Qys=
Expand Down
4 changes: 3 additions & 1 deletion sql/analyzer/analyzer.go
Original file line number Diff line number Diff line change
Expand Up @@ -382,6 +382,7 @@ func prePrepareRuleSelector(id RuleId) bool {
case resolvePreparedInsertId,
insertTopNId,
inSubqueryIndexesId,
AutocommitId,
TrackProcessId,
parallelizeId,
clearWarningsId,
Expand Down Expand Up @@ -434,7 +435,7 @@ func postPrepareRuleSelector(id RuleId) bool {
subqueryIndexesId,
inSubqueryIndexesId,
resolvePreparedInsertId,

AutocommitId,
TrackProcessId,
parallelizeId,
clearWarningsId:
Expand Down Expand Up @@ -467,6 +468,7 @@ func postPrepareInsertSourceRuleSelector(id RuleId) bool {
inSubqueryIndexesId,
resolveInsertRowsId,

AutocommitId,
TrackProcessId,
parallelizeId,
clearWarningsId:
Expand Down
98 changes: 98 additions & 0 deletions sql/analyzer/autocommit.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
// Copyright 2022 Dolthub, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package analyzer

import (
"github.com/dolthub/go-mysql-server/sql"
"github.com/dolthub/go-mysql-server/sql/plan"
"github.com/dolthub/go-mysql-server/sql/transform"
)

// addAutocommitNode wraps each query with a TransactionCommittingNode.
func addAutocommitNode(ctx *sql.Context, a *Analyzer, n sql.Node, scope *Scope, sel RuleSelector) (sql.Node, transform.TreeIdentity, error) {
if !n.Resolved() {
return n, transform.SameTree, nil
}

transactionDatabase := GetTransactionDatabase(ctx, n)

// TODO: This is a bit of a hack. Need to figure out better relationship between new transaction node and warnings.
if hasShowWarningsNode(n) {
return n, transform.SameTree, nil
}

return plan.NewTransactionCommittingNode(n, transactionDatabase), transform.NewTree, nil
}

func hasShowWarningsNode(n sql.Node) bool {
var ret bool
transform.Inspect(n, func(n sql.Node) bool {
if _, ok := n.(plan.ShowWarnings); ok {
ret = true
return false
}

return true
})

return ret
}

// GetTransactionDatabase returns the name of the database that should be considered current for the transaction about
// to begin. The database is not guaranteed to exist.
// For USE DATABASE statements, we consider the transaction database to be the one being USEd
func GetTransactionDatabase(ctx *sql.Context, parsed sql.Node) string {
var dbName string
switch n := parsed.(type) {
case *plan.QueryProcess, *plan.TransactionCommittingNode, *plan.RowUpdateAccumulator:
return GetTransactionDatabase(ctx, n.(sql.UnaryNode).Child())
case *plan.Use, *plan.CreateProcedure, *plan.DropProcedure, *plan.CreateTrigger, *plan.DropTrigger,
*plan.CreateTable, *plan.InsertInto, *plan.AlterIndex, *plan.AlterAutoIncrement, *plan.AlterPK,
*plan.DropColumn, *plan.RenameColumn, *plan.ModifyColumn:
database := n.(sql.Databaser).Database()
if database != nil {
dbName = database.Name()
}
case *plan.DropForeignKey, *plan.DropIndex, *plan.CreateIndex, *plan.Update, *plan.DeleteFrom,
*plan.CreateForeignKey:
dbName = n.(sql.Databaseable).Database()
case *plan.DropTable:
dbName = getDbHelper(n.Tables...)
case *plan.Truncate:
dbName = getDbHelper(n.Child)
default:
}
if dbName != "" {
return dbName
}
return ctx.GetCurrentDatabase()
}

// getDbHelper returns the first database name from a table-like node
func getDbHelper(tables ...sql.Node) string {
if len(tables) == 0 {
return ""
}
switch t := tables[0].(type) {
case *plan.UnresolvedTable:
return t.Database()
case *plan.ResolvedTable:
return t.Database.Name()
case *plan.IndexedTableAccess:
return t.Database().Name()
default:
}
return ""
}
4 changes: 4 additions & 0 deletions sql/analyzer/parallelize.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,10 @@ func shouldParallelize(node sql.Node, scope *Scope) bool {
return false
}

if tc, ok := node.(*plan.TransactionCommittingNode); ok {
return shouldParallelize(tc.Child(), scope)
}

// Do not try to parallelize DDL or descriptive operations
return !plan.IsNoRowNode(node)
}
Expand Down
2 changes: 1 addition & 1 deletion sql/analyzer/process_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func TestTrackProcessSubquery(t *testing.T) {

func withoutProcessTracking(a *Analyzer) *Analyzer {
afterAll := a.Batches[len(a.Batches)-1]
afterAll.Rules = afterAll.Rules[1:]
afterAll.Rules = afterAll.Rules[2:]
return a
}

Expand Down
Loading

0 comments on commit 7028950

Please sign in to comment.