Skip to content

Commit

Permalink
Merge pull request #7704 from dolthub/nicktobey/autoinclock
Browse files Browse the repository at this point in the history
Implement `traditional` auto-increment lock mode hold the lock for the duration of the insert iter.
  • Loading branch information
nicktobey authored Apr 11, 2024
2 parents 8f7caa2 + 19c89b3 commit 5d5847f
Show file tree
Hide file tree
Showing 12 changed files with 270 additions and 33 deletions.
2 changes: 1 addition & 1 deletion go/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ require (
github.com/cespare/xxhash v1.1.0
github.com/creasty/defaults v1.6.0
github.com/dolthub/flatbuffers/v23 v23.3.3-dh.2
github.com/dolthub/go-mysql-server v0.18.2-0.20240410074202-09d57d9841d3
github.com/dolthub/go-mysql-server v0.18.2-0.20240410155534-b53360bab9bb
github.com/dolthub/swiss v0.1.0
github.com/goccy/go-json v0.10.2
github.com/google/go-github/v57 v57.0.0
Expand Down
4 changes: 2 additions & 2 deletions go/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -183,8 +183,8 @@ github.com/dolthub/fslock v0.0.3 h1:iLMpUIvJKMKm92+N1fmHVdxJP5NdyDK5bK7z7Ba2s2U=
github.com/dolthub/fslock v0.0.3/go.mod h1:QWql+P17oAAMLnL4HGB5tiovtDuAjdDTPbuqx7bYfa0=
github.com/dolthub/go-icu-regex v0.0.0-20230524105445-af7e7991c97e h1:kPsT4a47cw1+y/N5SSCkma7FhAPw7KeGmD6c9PBZW9Y=
github.com/dolthub/go-icu-regex v0.0.0-20230524105445-af7e7991c97e/go.mod h1:KPUcpx070QOfJK1gNe0zx4pA5sicIK1GMikIGLKC168=
github.com/dolthub/go-mysql-server v0.18.2-0.20240410074202-09d57d9841d3 h1:i48WY1tLEbvbN4PWiozfC7CzuuOw9or4tZedIWwxWE0=
github.com/dolthub/go-mysql-server v0.18.2-0.20240410074202-09d57d9841d3/go.mod h1:uTqIti1oPqKEUS9N1zcT43a/QQ8n0PuBdZUMZziBaGU=
github.com/dolthub/go-mysql-server v0.18.2-0.20240410155534-b53360bab9bb h1:AN5zMcStorSl9r87Wkvz1aFN57VBobiofbRzxnAqh5o=
github.com/dolthub/go-mysql-server v0.18.2-0.20240410155534-b53360bab9bb/go.mod h1:uTqIti1oPqKEUS9N1zcT43a/QQ8n0PuBdZUMZziBaGU=
github.com/dolthub/ishell v0.0.0-20221214210346-d7db0b066488 h1:0HHu0GWJH0N6a6keStrHhUAK5/o9LVfkh44pvsV4514=
github.com/dolthub/ishell v0.0.0-20221214210346-d7db0b066488/go.mod h1:ehexgi1mPxRTk0Mok/pADALuHbvATulTh6gzr7NzZto=
github.com/dolthub/jsonpath v0.0.2-0.20240227200619-19675ab05c71 h1:bMGS25NWAGTEtT5tOBsCuCrlYnLRKpbJVJkDbrTRhwQ=
Expand Down
36 changes: 29 additions & 7 deletions go/libraries/doltcore/sqle/dsess/autoincrement_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,19 +34,28 @@ import (
"github.com/dolthub/dolt/go/store/types"
)

type LockMode int64

var (
LockMode_Traditional LockMode = 0
LockMode_Concurret LockMode = 1
LockMode_Interleaved LockMode = 2
)

type AutoIncrementTracker struct {
dbName string
sequences *sync.Map // map[string]uint64
mm *mutexmap.MutexMap
lockMode LockMode
}

var _ globalstate.AutoIncrementTracker = AutoIncrementTracker{}
var _ globalstate.AutoIncrementTracker = &AutoIncrementTracker{}

// NewAutoIncrementTracker returns a new autoincrement tracker for the roots given. All roots sets must be
// considered because the auto increment value for a table is tracked globally, across all branches.
// Roots provided should be the working sets when available, or the branches when they are not (e.g. for remote
// branches that don't have a local working set)
func NewAutoIncrementTracker(ctx context.Context, dbName string, roots ...doltdb.Rootish) (AutoIncrementTracker, error) {
func NewAutoIncrementTracker(ctx context.Context, dbName string, roots ...doltdb.Rootish) (*AutoIncrementTracker, error) {
ait := AutoIncrementTracker{
dbName: dbName,
sequences: &sync.Map{},
Expand All @@ -56,7 +65,7 @@ func NewAutoIncrementTracker(ctx context.Context, dbName string, roots ...doltdb
for _, root := range roots {
root, err := root.ResolveRootValue(ctx)
if err != nil {
return AutoIncrementTracker{}, err
return &AutoIncrementTracker{}, err
}

err = root.IterTables(ctx, func(tableName string, table *doltdb.Table, sch schema.Schema) (bool, error) {
Expand All @@ -81,10 +90,11 @@ func NewAutoIncrementTracker(ctx context.Context, dbName string, roots ...doltdb
})

if err != nil {
return AutoIncrementTracker{}, err
return &AutoIncrementTracker{}, err
}
}
return ait, nil

return &ait, nil
}

func loadAutoIncValue(sequences *sync.Map, tableName string) uint64 {
Expand All @@ -111,8 +121,10 @@ func (a AutoIncrementTracker) Next(tbl string, insertVal interface{}) (uint64, e
return 0, err
}

release := a.mm.Lock(tbl)
defer release()
if a.lockMode == LockMode_Interleaved {
release := a.mm.Lock(tbl)
defer release()
}

curr := loadAutoIncValue(a.sequences, tbl)

Expand Down Expand Up @@ -406,3 +418,13 @@ func (a AutoIncrementTracker) DropTable(ctx *sql.Context, tableName string, wses

return nil
}

func (a *AutoIncrementTracker) AcquireTableLock(ctx *sql.Context, tableName string) (func(), error) {
_, i, _ := sql.SystemVariables.GetGlobal("innodb_autoinc_lock_mode")
lockMode := LockMode(i.(int64))
if lockMode == LockMode_Interleaved {
panic("Attempted to acquire AutoInc lock for entire insert operation, but lock mode was set to Interleaved")
}
a.lockMode = lockMode
return a.mm.Lock(tableName), nil
}
2 changes: 1 addition & 1 deletion go/libraries/doltcore/sqle/dsess/globalstate.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func NewGlobalStateStoreForDb(ctx context.Context, dbName string, db *doltdb.Dol
}

type GlobalStateImpl struct {
aiTracker AutoIncrementTracker
aiTracker globalstate.AutoIncrementTracker
mu *sync.Mutex
}

Expand Down
24 changes: 16 additions & 8 deletions go/libraries/doltcore/sqle/dsess/mutexmap/mutexmap.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,18 +38,26 @@ func NewMutexMap() *MutexMap {

func (mm *MutexMap) Lock(key interface{}) func() {
mm.mu.Lock()
defer mm.mu.Unlock()

keyedMutex, hasKey := mm.keyedMutexes[key]
if !hasKey {
keyedMutex = &mapMutex{parent: mm, key: key}
mm.keyedMutexes[key] = keyedMutex
}
keyedMutex.refcount++
var keyedMutex *mapMutex
func() {
// We must release the parent lock before attempting to acquire the child lock, otherwise if the child lock
// is currently held it will never be released.
defer mm.mu.Unlock()
var hasKey bool
keyedMutex, hasKey = mm.keyedMutexes[key]
if !hasKey {
keyedMutex = &mapMutex{parent: mm, key: key}
mm.keyedMutexes[key] = keyedMutex
}
keyedMutex.refcount++
}()

keyedMutex.mu.Lock()

return func() { keyedMutex.Unlock() }
return func() {
keyedMutex.Unlock()
}
}

func (mm *mapMutex) Unlock() {
Expand Down
121 changes: 121 additions & 0 deletions go/libraries/doltcore/sqle/enginetest/dolt_engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package enginetest
import (
"context"
"fmt"
"io"
"os"
"runtime"
"sync"
Expand All @@ -31,6 +32,7 @@ import (
"github.com/dolthub/go-mysql-server/sql/memo"
"github.com/dolthub/go-mysql-server/sql/mysql_db"
"github.com/dolthub/go-mysql-server/sql/plan"
"github.com/dolthub/go-mysql-server/sql/transform"
gmstypes "github.com/dolthub/go-mysql-server/sql/types"
"github.com/dolthub/vitess/go/mysql"
"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -195,6 +197,125 @@ func newUpdateResult(matched, updated int) gmstypes.OkResult {
}
}

func TestAutoIncrementTrackerLockMode(t *testing.T) {
for _, lockMode := range []int64{0, 1, 2} {
t.Run(fmt.Sprintf("lock mode %d", lockMode), func(t *testing.T) {
testAutoIncrementTrackerWithLockMode(t, lockMode)
})
}
}

// testAutoIncrementTrackerWithLockMode tests that interleaved inserts don't cause deadlocks, regardless of the value of innodb_autoinc_lock_mode.
// In a real use case, these interleaved operations would be happening in different sessions on different threads.
// In order to make the test behave predictably, we manually interleave the two iterators.
func testAutoIncrementTrackerWithLockMode(t *testing.T, lockMode int64) {

err := sql.SystemVariables.AssignValues(map[string]interface{}{"innodb_autoinc_lock_mode": lockMode})
require.NoError(t, err)

setupScripts := []setup.SetupScript{[]string{
"CREATE TABLE test1 (pk int NOT NULL PRIMARY KEY AUTO_INCREMENT,c0 int,index t1_c_index (c0));",
"CREATE TABLE test2 (pk int NOT NULL PRIMARY KEY AUTO_INCREMENT,c0 int,index t2_c_index (c0));",
"CREATE TABLE timestamps (pk int NOT NULL PRIMARY KEY AUTO_INCREMENT, t int);",
"CREATE TRIGGER t1 AFTER INSERT ON test1 FOR EACH ROW INSERT INTO timestamps VALUES (0, 1);",
"CREATE TRIGGER t2 AFTER INSERT ON test2 FOR EACH ROW INSERT INTO timestamps VALUES (0, 2);",
"CREATE VIEW bin AS SELECT 0 AS v UNION ALL SELECT 1;",
"CREATE VIEW sequence5bit AS SELECT b1.v + 2*b2.v + 4*b3.v + 8*b4.v + 16*b5.v AS v from bin b1, bin b2, bin b3, bin b4, bin b5;",
}}

harness := newDoltHarness(t)
defer harness.Close()
harness.Setup(setup.MydbData, setupScripts)
e := mustNewEngine(t, harness)

defer e.Close()
ctx := enginetest.NewContext(harness)

// Confirm that the system variable was correctly set.
_, iter, err := e.Query(ctx, "select @@innodb_autoinc_lock_mode")
require.NoError(t, err)
rows, err := sql.RowIterToRows(ctx, iter)
require.NoError(t, err)
assert.Equal(t, rows, []sql.Row{{lockMode}})

// Ordinarily QueryEngine.query manages transactions.
// Since we can't use that for this test, we manually start a new transaction.
ts := ctx.Session.(sql.TransactionSession)
tx, err := ts.StartTransaction(ctx, sql.ReadWrite)
require.NoError(t, err)
ctx.SetTransaction(tx)

getTriggerIter := func(query string) sql.RowIter {
root, err := e.AnalyzeQuery(ctx, query)
require.NoError(t, err)

var triggerNode *plan.TriggerExecutor
transform.Node(root, func(n sql.Node) (sql.Node, transform.TreeIdentity, error) {
if triggerNode != nil {
return n, transform.SameTree, nil
}
if t, ok := n.(*plan.TriggerExecutor); ok {
triggerNode = t
}
return n, transform.NewTree, nil
})
iter, err := e.EngineAnalyzer().ExecBuilder.Build(ctx, triggerNode, nil)
require.NoError(t, err)
return iter
}

iter1 := getTriggerIter("INSERT INTO test1 (c0) select v from sequence5bit;")
iter2 := getTriggerIter("INSERT INTO test2 (c0) select v from sequence5bit;")

// Alternate the iterators until they're exhausted.
var err1 error
var err2 error
for err1 != io.EOF || err2 != io.EOF {
if err1 != io.EOF {
var row1 sql.Row
require.NoError(t, err1)
row1, err1 = iter1.Next(ctx)
_ = row1
}
if err2 != io.EOF {
require.NoError(t, err2)
_, err2 = iter2.Next(ctx)
}
}
err = iter1.Close(ctx)
require.NoError(t, err)
err = iter2.Close(ctx)
require.NoError(t, err)

dsess.DSessFromSess(ctx.Session).CommitTransaction(ctx, ctx.GetTransaction())

// Verify that the inserts are seen by the engine.
{
_, iter, err := e.Query(ctx, "select count(*) from timestamps")
require.NoError(t, err)
rows, err := sql.RowIterToRows(ctx, iter)
require.NoError(t, err)
assert.Equal(t, rows, []sql.Row{{int64(64)}})
}

// Verify that the insert operations are actually interleaved by inspecting the order that values were added to `timestamps`
{
_, iter, err := e.Query(ctx, "select (select min(pk) from timestamps where t = 1) < (select max(pk) from timestamps where t = 2)")
require.NoError(t, err)
rows, err := sql.RowIterToRows(ctx, iter)
require.NoError(t, err)
assert.Equal(t, rows, []sql.Row{{true}})
}

{
_, iter, err := e.Query(ctx, "select (select min(pk) from timestamps where t = 2) < (select max(pk) from timestamps where t = 1)")
require.NoError(t, err)
rows, err := sql.RowIterToRows(ctx, iter)
require.NoError(t, err)
assert.Equal(t, rows, []sql.Row{{true}})
}
}

// Convenience test for debugging a single query. Unskip and set to the desired query.
func TestSingleMergeScript(t *testing.T) {
t.Skip()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,4 +38,9 @@ type AutoIncrementTracker interface {
// below the current value for this table. The table in the provided working set is assumed to already have the value
// given, so the new global maximum is computed without regard for its value in that working set.
Set(ctx *sql.Context, tableName string, table *doltdb.Table, ws ref.WorkingSetRef, newAutoIncVal uint64) (*doltdb.Table, error)

// AcquireTableLock acquires the auto increment lock on a table, and reutrns a callback function to release the lock.
// Depending on the value of the `innodb_autoinc_lock_mode` system variable, the engine may need to acquire and hold
// the lock for the duration of an insert statement.
AcquireTableLock(ctx *sql.Context, tableName string) (func(), error)
}
4 changes: 4 additions & 0 deletions go/libraries/doltcore/sqle/sqlutil/static_errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,10 @@ func (e *StaticErrorEditor) SetAutoIncrementValue(*sql.Context, uint64) error {
return e.err
}

func (e *StaticErrorEditor) AcquireAutoIncrementLock(ctx *sql.Context) (func(), error) {
return func() {}, nil
}

func (e *StaticErrorEditor) StatementBegin(ctx *sql.Context) {}

func (e *StaticErrorEditor) DiscardChanges(ctx *sql.Context, errorEncountered error) error {
Expand Down
4 changes: 4 additions & 0 deletions go/libraries/doltcore/sqle/writer/noms_table_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,10 @@ func (te *nomsTableWriter) SetAutoIncrementValue(ctx *sql.Context, val uint64) e
return te.flush(ctx)
}

func (te *nomsTableWriter) AcquireAutoIncrementLock(ctx *sql.Context) (func(), error) {
return te.autoInc.AcquireTableLock(ctx, te.tableName)
}

func (te *nomsTableWriter) IndexedAccess(i sql.IndexLookup) sql.IndexedTable {
idx := index.DoltIndexFromSqlIndex(i.Index)
return &nomsFkIndexer{
Expand Down
6 changes: 5 additions & 1 deletion go/libraries/doltcore/sqle/writer/prolly_table_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ func (w *prollyTableWriter) GetNextAutoIncrementValue(ctx *sql.Context, insertVa
return w.aiTracker.Next(w.tableName, insertVal)
}

// SetAutoIncrementValue implements TableWriter.
// SetAutoIncrementValue implements AutoIncrementSetter.
func (w *prollyTableWriter) SetAutoIncrementValue(ctx *sql.Context, val uint64) error {
seq, err := w.aiTracker.CoerceAutoIncrementValue(val)
if err != nil {
Expand All @@ -220,6 +220,10 @@ func (w *prollyTableWriter) SetAutoIncrementValue(ctx *sql.Context, val uint64)
return w.flush(ctx)
}

func (w *prollyTableWriter) AcquireAutoIncrementLock(ctx *sql.Context) (func(), error) {
return w.aiTracker.AcquireTableLock(ctx, w.tableName)
}

// Close implements Closer
func (w *prollyTableWriter) Close(ctx *sql.Context) error {
// We discard data changes in DiscardChanges, but this doesn't include schema changes, which we don't want to flush
Expand Down
Loading

0 comments on commit 5d5847f

Please sign in to comment.