Skip to content

Commit

Permalink
add unique and conflicting keys randomly to txs
Browse files Browse the repository at this point in the history
  • Loading branch information
wlawt committed Apr 16, 2024
1 parent b1bbf9e commit 09a732b
Showing 1 changed file with 170 additions and 29 deletions.
199 changes: 170 additions & 29 deletions executor/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ import (
// Run several times to catch non-determinism
const numIterations = 10

func generateNumbers(start int, size int) []int {
array := make([]int, size)
func generateNumbers(start int) []int {
array := make([]int, 9999)
for i := 0; i < 9999; i++ {
array[i] = i + start
}
Expand Down Expand Up @@ -530,12 +530,12 @@ func TestTwoConflictKeys(t *testing.T) {
}
}

func TestLargeNumConcurrentReadTxs(t *testing.T) {
func TestLargeConcurrentRead(t *testing.T) {
rand.Seed(1)

var (
require = require.New(t)
numKeys = 5
numKeys = 10
numTxs = 100_000
conflictKeys = make([]string, 0, numKeys)
blocking = set.Set[int]{}
Expand All @@ -560,11 +560,29 @@ func TestLargeNumConcurrentReadTxs(t *testing.T) {

// Generate txs
for i := 0; i < numTxs; i++ {
// mix of unique and conflict keys in each tx
s := make(state.Keys, (numKeys + 1))

// all txs are concurrent Reads
for j := 0; j < numKeys; j++ {
s.Add(conflictKeys[j], state.Read)
// random size of conflict keys to add
setSize := rand.Intn(numKeys + 1) //nolint:gosec
randomConflictingKeys := set.NewSet[int](setSize) // indices of [conflictKeys]
for {
if randomConflictingKeys.Len() == setSize {
break
}
randomConflictingKeys.Add(rand.Intn(numKeys)) //nolint:gosec
}

// add the random keys to tx
for k := range randomConflictingKeys {
s.Add(conflictKeys[k], state.Read)
}

// fill in rest with unique keys that are Reads
// [remaining] can be 0 here
remaining := numKeys - setSize
for j := 0; j < remaining; j++ {
s.Add(ids.GenerateTestID().String(), state.Read)
}

// pass into executor
Expand Down Expand Up @@ -596,7 +614,7 @@ func TestLargeNumConcurrentReadTxs(t *testing.T) {
require.Len(completed, numTxs)
}

func TestLargeNumSequentialWritesTxs(t *testing.T) {
func TestLargeSequentialWrites(t *testing.T) {
rand.Seed(2)

var (
Expand All @@ -607,7 +625,6 @@ func TestLargeNumSequentialWritesTxs(t *testing.T) {
blocking = set.Set[int]{}
l sync.Mutex
completed = make([]int, 0, numTxs)
answer = make([]int, 0, numTxs)
e = New(numTxs, 4, nil)
slow = make(chan struct{})
)
Expand All @@ -627,12 +644,29 @@ func TestLargeNumSequentialWritesTxs(t *testing.T) {

// Generate txs
for i := 0; i < numTxs; i++ {
answer = append(answer, i)
// mix of unique and conflict keys in each tx
s := make(state.Keys, (numKeys + 1))

// all txs are sequential Writes
for j := 0; j < numKeys; j++ {
s.Add(conflictKeys[j], state.Write)
// random size of conflict keys to add
setSize := rand.Intn(numKeys + 1) //nolint:gosec
randomConflictingKeys := set.NewSet[int](setSize) // indices of [conflictKeys]
for {
if randomConflictingKeys.Len() == setSize {
break
}
randomConflictingKeys.Add(rand.Intn(numKeys)) //nolint:gosec
}

// add the random keys to tx
for k := range randomConflictingKeys {
s.Add(conflictKeys[k], state.Write)
}

// fill in rest with unique keys that are Writes
// [remaining] can be 0 here
remaining := numKeys - setSize
for j := 0; j < remaining; j++ {
s.Add(ids.GenerateTestID().String(), state.Write)
}

// pass into executor
Expand Down Expand Up @@ -661,12 +695,12 @@ func TestLargeNumSequentialWritesTxs(t *testing.T) {
close(slow)
require.NoError(e.Wait())
require.Len(completed, numTxs)
// Txs should be executed sequentially
require.Equal(answer, completed)
}

// This runs a little longer
func TestLargeNumReadsThenWritesTxs(t *testing.T) {
// This test adds all conflicting keys but tests over
// a large tx count, which is important to see if
// things break or not.
func TestLargeReadsThenWrites(t *testing.T) {
rand.Seed(3)

var (
Expand All @@ -683,11 +717,11 @@ func TestLargeNumReadsThenWritesTxs(t *testing.T) {
)

// make [answers] matrix
answers[0] = generateNumbers(10000, 9999)
answers[1] = generateNumbers(30000, 9999)
answers[2] = generateNumbers(50000, 9999)
answers[3] = generateNumbers(70000, 9999)
answers[4] = generateNumbers(90000, 9999)
answers[0] = generateNumbers(10000)
answers[1] = generateNumbers(30000)
answers[2] = generateNumbers(50000)
answers[3] = generateNumbers(70000)
answers[4] = generateNumbers(90000)

// randomly wait on certain txs
for {
Expand Down Expand Up @@ -748,8 +782,10 @@ func TestLargeNumReadsThenWritesTxs(t *testing.T) {
require.Equal(answers[4], completed[90000:99999])
}

// This runs a little longer
func TestLargeNumWritesThenReadsTxs(t *testing.T) {
// This test adds all conflicting keys but tests over
// a large tx count, which is important to see if
// things break or not.
func TestLargeWritesThenReads(t *testing.T) {
rand.Seed(4)

var (
Expand All @@ -766,11 +802,11 @@ func TestLargeNumWritesThenReadsTxs(t *testing.T) {
)

// make [answers] matrix
answers[0] = generateNumbers(0, 9999)
answers[1] = generateNumbers(20000, 9999)
answers[2] = generateNumbers(40000, 9999)
answers[3] = generateNumbers(60000, 9999)
answers[4] = generateNumbers(80000, 9999)
answers[0] = generateNumbers(0)
answers[1] = generateNumbers(20000)
answers[2] = generateNumbers(40000)
answers[3] = generateNumbers(60000)
answers[4] = generateNumbers(80000)

// randomly wait on certain txs
for {
Expand Down Expand Up @@ -830,3 +866,108 @@ func TestLargeNumWritesThenReadsTxs(t *testing.T) {
require.Equal(answers[3], completed[60000:69999])
require.Equal(answers[4], completed[80000:89999])
}

// Random mix of unique and conflicting keys. We just
// need to see if we don't halt in this test
func TestLargeRandomReadsAndWrites(t *testing.T) {
rand.Seed(5)

var (
require = require.New(t)
numKeys = 10
numTxs = 100_000
conflictKeys = make([]string, 0, numKeys)
blocking = set.Set[int]{}
l sync.Mutex
completed = make([]int, 0, numTxs)
e = New(numTxs, 4, nil)
slow = make(chan struct{})
)

// randomly wait on certain txs
for {
if blocking.Len() == 10000 {
break
}
blocking.Add(rand.Intn(100_000)) //nolint:gosec
}

// make the conflict keys
for k := 0; k < numKeys; k++ {
conflictKeys = append(conflictKeys, ids.GenerateTestID().String())
}

// Generate txs
for i := 0; i < numTxs; i++ {
// mix of unique and conflict keys in each tx
s := make(state.Keys, (numKeys + 1))

// random size of conflict keys to add
min := 1
max := 6
setSize := rand.Intn(max-min+1) + min //nolint:gosec
randomConflictingKeys := set.NewSet[int](setSize) // indices of [conflictKeys]
for {
if randomConflictingKeys.Len() == setSize {
break
}
randomConflictingKeys.Add(rand.Intn(numKeys)) //nolint:gosec
}

// randomly pick if these conflict keys are
// going to be Read/Write
conflictMode := rand.Intn(2) //nolint:gosec

// add the random keys to tx
for k := range randomConflictingKeys {
switch conflictMode {
case 0:
s.Add(conflictKeys[k], state.Read)
case 1:
s.Add(conflictKeys[k], state.Write)
}
}

// randomly pick what the key access should be
// for unique keys
uniqueMode := rand.Intn(2) //nolint:gosec

// fill in rest with unique keys
// invariant: [remaining] > 0
remaining := numKeys - setSize
for j := 0; j < remaining; j++ {
switch uniqueMode {
case 0:
s.Add(ids.GenerateTestID().String(), state.Read)
case 1:
s.Add(ids.GenerateTestID().String(), state.Write)
}
}

// pass into executor
ti := i
e.Run(s, func() error {
shouldWait := false
for num := range blocking {
if num == ti {
shouldWait = true
break
}
}
if shouldWait {
<-slow
}

l.Lock()
completed = append(completed, ti)
l.Unlock()
return nil
})
}
for i := 0; i < 1000; i++ {
slow <- struct{}{}
}
close(slow)
require.NoError(e.Wait())
require.Len(completed, numTxs)
}

0 comments on commit 09a732b

Please sign in to comment.