Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve list test and resize admission test #136

Merged
merged 5 commits into from
Aug 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 23 additions & 22 deletions test/admission_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ import (
func TestAdmissionControl(t *testing.T) {
for _, backendType := range []string{endpoint.SQLiteBackend, endpoint.DQLiteBackend} {
t.Run(backendType, func(t *testing.T) {
const writeLimit = 10

g := NewWithT(t)

ctx, cancel := context.WithCancel(context.Background())
Expand All @@ -29,11 +31,14 @@ func TestAdmissionControl(t *testing.T) {
backendType: backendType,
endpointParameters: []string{
"admission-control-policy=limit",
"admission-control-policy-limit-max-concurrent-txn=600",
fmt.Sprintf("admission-control-policy-limit-max-concurrent-txn=%d", writeLimit),
"admission-control-only-write-queries=true",
},
setup: func(db *sql.DB) error {
return setupScenario(ctx, db, "Key", 1000, 0, 0)
setup: func(ctx context.Context, tx *sql.Tx) error {
if err := insertMany(ctx, tx, "Key", 100, 1000); err != nil {
return err
}
return nil
},
})

Expand All @@ -42,9 +47,9 @@ func TestAdmissionControl(t *testing.T) {
var numSuccessfulWriterTxn = atomic.Uint64{}
var numSuccessfulReaderTxn = atomic.Uint64{}

reader := func(first int, last int) {
read := func(firstKeyNum, lastKeyNum int) {
defer wg.Done()
for i := first; i < last; i++ {
for i := firstKeyNum; i < lastKeyNum; i++ {
key := fmt.Sprintf("Key/%d", i+1)
_, err := kine.client.Get(ctx, key, clientv3.WithRange(""))
if err == nil {
Expand All @@ -53,9 +58,9 @@ func TestAdmissionControl(t *testing.T) {
}
}

writer := func(first int, last int) {
write := func(firstKeyNum, lastKeyNum int) {
defer wg.Done()
for i := first; i < last; i++ {
for i := firstKeyNum; i < lastKeyNum; i++ {
key := fmt.Sprintf("Key/%d", i+1)
new_value := fmt.Sprintf("New-Value-%d", i+1)
resp, err := kine.client.Get(ctx, key, clientv3.WithRange(""))
Expand All @@ -77,35 +82,31 @@ func TestAdmissionControl(t *testing.T) {
}
}

readers := 50
readers_replication := 3
read_entries := 1000 / readers
writers := 500
writers_replication := 10
write_entries := 1000 / writers
wg.Add(readers*readers_replication + writers*writers_replication)
readers := writeLimit * 4
read_entries := 20

writers := writeLimit * 4
write_entries := 20

wg.Add(readers + writers)
marco6 marked this conversation as resolved.
Show resolved Hide resolved

start := time.Now()
for i := 0; i < readers; i++ {
for j := 0; j < readers_replication; j++ {
go reader(i*read_entries, (i+1)*read_entries)
}
go read(i*read_entries, (i+1)*read_entries)
}
for i := 0; i < writers; i++ {
for j := 0; j < writers_replication; j++ {
go writer(i*write_entries, (i+1)*write_entries)
}
go write(i*write_entries, (i+1)*write_entries)
}

wg.Wait()
duration := time.Since(start)

t.Logf("Executed 1000 queries in %.2f seconds\n", duration.Seconds())
// It is expected that some queries are denied by the admission control due to the load.
g.Expect(numSuccessfulWriterTxn.Load()).To(BeNumerically("<", writers*writers_replication*write_entries))
g.Expect(numSuccessfulWriterTxn.Load()).To(BeNumerically("<", writers*write_entries))

// read queries should be ignored by the admission control
g.Expect(numSuccessfulReaderTxn.Load()).To(BeNumerically("==", readers*readers_replication*read_entries))
g.Expect(numSuccessfulReaderTxn.Load()).To(BeNumerically("==", readers*read_entries))
})
}
}
36 changes: 30 additions & 6 deletions test/compaction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,17 @@ func TestCompaction(t *testing.T) {

kine := newKineServer(ctx, t, &kineOptions{
backendType: backendType,
setup: func(db *sql.DB) error {
return setupScenario(ctx, db, "testkey", 2, 1, 1)
setup: func(ctx context.Context, tx *sql.Tx) error {
if err := insertMany(ctx, tx, "key", 100, 2); err != nil {
return err
}
if err := updateMany(ctx, tx, "key", 100, 1); err != nil {
return err
}
if err := deleteMany(ctx, tx, "key", 1); err != nil {
return err
}
return nil
},
})

Expand All @@ -47,8 +56,17 @@ func TestCompaction(t *testing.T) {

kine := newKineServer(ctx, t, &kineOptions{
backendType: backendType,
setup: func(db *sql.DB) error {
return setupScenario(ctx, db, "testkey", 10_000, 500, 500)
setup: func(ctx context.Context, tx *sql.Tx) error {
if err := insertMany(ctx, tx, "key", 100, 10_000); err != nil {
return err
}
if err := updateMany(ctx, tx, "key", 100, 500); err != nil {
return err
}
if err := deleteMany(ctx, tx, "key", 500); err != nil {
return err
}
return nil
},
})

Expand Down Expand Up @@ -77,7 +95,7 @@ func BenchmarkCompaction(b *testing.B) {

kine := newKineServer(ctx, b, &kineOptions{
backendType: backendType,
setup: func(db *sql.DB) error {
setup: func(ctx context.Context, tx *sql.Tx) error {
// Make sure there are enough rows deleted to have
// b.N rows to compact.
delCount := b.N + sqllog.SupersededCount
Expand All @@ -86,7 +104,13 @@ func BenchmarkCompaction(b *testing.B) {
// that the deleted rows are about 5% of the total.
addCount := delCount * 20

return setupScenario(ctx, db, "testkey", addCount, 0, delCount)
if err := insertMany(ctx, tx, "key", 100, addCount); err != nil {
return err
}
if err := deleteMany(ctx, tx, "key", delCount); err != nil {
return err
}
return nil
},
})

Expand Down
7 changes: 5 additions & 2 deletions test/delete_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,11 @@ func BenchmarkDelete(b *testing.B) {

kine := newKineServer(ctx, b, &kineOptions{
backendType: backendType,
setup: func(db *sql.DB) error {
return setupScenario(ctx, db, "key", b.N, 0, 0)
setup: func(ctx context.Context, tx *sql.Tx) error {
if err := insertMany(ctx, tx, "key", 100, b.N*2); err != nil {
return err
}
return nil
},
})

Expand Down
10 changes: 8 additions & 2 deletions test/get_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,8 +124,14 @@ func BenchmarkGet(b *testing.B) {

kine := newKineServer(ctx, b, &kineOptions{
backendType: backendType,
setup: func(db *sql.DB) error {
return setupScenario(ctx, db, "testKey", b.N, b.N, 0)
setup: func(ctx context.Context, tx *sql.Tx) error {
if err := insertMany(ctx, tx, "testKey", 100, b.N*2); err != nil {
return err
}
if err := updateMany(ctx, tx, "testKey", 100, b.N); err != nil {
return err
}
return nil
},
})

Expand Down
109 changes: 86 additions & 23 deletions test/list_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,32 +178,95 @@ func TestList(t *testing.T) {
}
}

// BenchmarkList is a benchmark for the Get operation.
func BenchmarkList(b *testing.B) {
for _, backendType := range []string{endpoint.SQLiteBackend, endpoint.DQLiteBackend} {
b.Run(backendType, func(b *testing.B) {
b.StopTimer()
g := NewWithT(b)
setup := func(ctx context.Context, tx *sql.Tx, payloadSize, n int) error {
if err := insertMany(ctx, tx, "key", payloadSize, n*2); err != nil {
return err
}
if err := updateMany(ctx, tx, "key", payloadSize, n); err != nil {
return err
}
if err := deleteMany(ctx, tx, "key", n); err != nil {
return err
}
return nil
}
backends := []string{endpoint.SQLiteBackend, endpoint.DQLiteBackend}
for _, backendType := range backends {
payloads := []struct {
name string
size int
}{{
name: "tiny",
size: 100,
}, {
name: "fits-in-page",
size: 1000,
}, {
name: "overflows-page",
size: 5000,
}}
for _, payload := range payloads {
b.Run(fmt.Sprintf("%s-%s", backendType, payload.name), func(b *testing.B) {
b.Run("all", func(b *testing.B) {
b.StopTimer()
g := NewWithT(b)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

kine := newKineServer(ctx, b, &kineOptions{
backendType: backendType,
setup: func(ctx context.Context, tx *sql.Tx) error {
return setup(ctx, tx, payload.size, b.N)
},
})

kine.ResetMetrics()
b.StartTimer()
resp, err := kine.client.Get(ctx, "key/", clientv3.WithPrefix())

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
g.Expect(err).To(BeNil())
g.Expect(resp.Kvs).To(HaveLen((b.N + 1) / 2))
kine.ReportMetrics(b)
})

kine := newKineServer(ctx, b, &kineOptions{
backendType: backendType,
setup: func(db *sql.DB) error {
return setupScenario(ctx, db, "key", b.N*2, b.N, b.N)
},
b.Run("pagination", func(b *testing.B) {
b.StopTimer()
g := NewWithT(b)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

kine := newKineServer(ctx, b, &kineOptions{
backendType: backendType,
setup: func(ctx context.Context, tx *sql.Tx) error {
marco6 marked this conversation as resolved.
Show resolved Hide resolved
return setup(ctx, tx, payload.size, b.N)
},
})

kine.ResetMetrics()
b.StartTimer()
nextKey := "key/"
marco6 marked this conversation as resolved.
Show resolved Hide resolved
endRange := clientv3.GetPrefixRangeEnd(nextKey)
count := 0
for more := true; more; {
resp, err := kine.client.Get(ctx,
nextKey,
clientv3.WithRange(endRange),
clientv3.WithLimit(int64(b.N)/10),
)
g.Expect(err).To(BeNil())

more = resp.More
count += len(resp.Kvs)
nextKey = string(resp.Kvs[len(resp.Kvs)-1].Key) + "\x01"
}

g.Expect(count).To(Equal((b.N + 1) / 2))
kine.ReportMetrics(b)
})
})

kine.ResetMetrics()
b.StartTimer()
for i := 0; i < b.N; i++ {
resp, err := kine.client.Get(ctx, "key/", clientv3.WithPrefix())

g.Expect(err).To(BeNil())
g.Expect(resp.Kvs).To(HaveLen(b.N))
}
kine.ReportMetrics(b)
})
}
}
}
Loading
Loading