Skip to content

Commit

Permalink
Make list test linear
Browse files Browse the repository at this point in the history
  • Loading branch information
marco6 committed Aug 1, 2024
1 parent b3721ee commit bf7c308
Show file tree
Hide file tree
Showing 6 changed files with 165 additions and 74 deletions.
36 changes: 18 additions & 18 deletions test/admission_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ import (
func TestAdmissionControl(t *testing.T) {
for _, backendType := range []string{endpoint.SQLiteBackend, endpoint.DQLiteBackend} {
t.Run(backendType, func(t *testing.T) {
const writeLimit = 10

g := NewWithT(t)

ctx, cancel := context.WithCancel(context.Background())
Expand All @@ -29,11 +31,14 @@ func TestAdmissionControl(t *testing.T) {
backendType: backendType,
endpointParameters: []string{
"admission-control-policy=limit",
"admission-control-policy-limit-max-concurrent-txn=600",
fmt.Sprintf("admission-control-policy-limit-max-concurrent-txn=%d", writeLimit),
"admission-control-only-write-queries=true",
},
setup: func(db *sql.DB) error {
return setupScenario(ctx, db, "Key", 1000, 0, 0)
setup: func(ctx context.Context, tx *sql.Tx) error {
if err := insertMany(ctx, tx, "Key", 100, 1000); err != nil {
return err
}
return nil
},
})

Expand Down Expand Up @@ -77,35 +82,30 @@ func TestAdmissionControl(t *testing.T) {
}
}

readers := 50
readers_replication := 3
read_entries := 1000 / readers
writers := 500
writers_replication := 10
write_entries := 1000 / writers
wg.Add(readers*readers_replication + writers*writers_replication)
readers := writeLimit * 4
read_entries := 20

writers := writeLimit * 4
write_entries := 20
wg.Add(readers + writers)

start := time.Now()
for i := 0; i < readers; i++ {
for j := 0; j < readers_replication; j++ {
go reader(i*read_entries, (i+1)*read_entries)
}
go reader(i*read_entries, (i+1)*read_entries)
}
for i := 0; i < writers; i++ {
for j := 0; j < writers_replication; j++ {
go writer(i*write_entries, (i+1)*write_entries)
}
go writer(i*write_entries, (i+1)*write_entries)
}

wg.Wait()
duration := time.Since(start)

t.Logf("Executed 1000 queries in %.2f seconds\n", duration.Seconds())
// It is expected that some queries are denied by the admission control due to the load.
g.Expect(numSuccessfulWriterTxn.Load()).To(BeNumerically("<", writers*writers_replication*write_entries))
g.Expect(numSuccessfulWriterTxn.Load()).To(BeNumerically("<", writers*write_entries))

// read queries should be ignored by the admission control
g.Expect(numSuccessfulReaderTxn.Load()).To(BeNumerically("==", readers*readers_replication*read_entries))
g.Expect(numSuccessfulReaderTxn.Load()).To(BeNumerically("==", readers*read_entries))
})
}
}
36 changes: 30 additions & 6 deletions test/compaction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,17 @@ func TestCompaction(t *testing.T) {

kine := newKineServer(ctx, t, &kineOptions{
backendType: backendType,
setup: func(db *sql.DB) error {
return setupScenario(ctx, db, "testkey", 2, 1, 1)
setup: func(ctx context.Context, tx *sql.Tx) error {
if err := insertMany(ctx, tx, "key", 100, 2); err != nil {
return err
}
if err := updateMany(ctx, tx, "key", 100, 1); err != nil {
return err
}
if err := deleteMany(ctx, tx, "key", 1); err != nil {
return err
}
return nil
},
})

Expand All @@ -47,8 +56,17 @@ func TestCompaction(t *testing.T) {

kine := newKineServer(ctx, t, &kineOptions{
backendType: backendType,
setup: func(db *sql.DB) error {
return setupScenario(ctx, db, "testkey", 10_000, 500, 500)
setup: func(ctx context.Context, tx *sql.Tx) error {
if err := insertMany(ctx, tx, "key", 100, 10_000); err != nil {
return err
}
if err := updateMany(ctx, tx, "key", 100, 500); err != nil {
return err
}
if err := deleteMany(ctx, tx, "key", 500); err != nil {
return err
}
return nil
},
})

Expand Down Expand Up @@ -77,7 +95,7 @@ func BenchmarkCompaction(b *testing.B) {

kine := newKineServer(ctx, b, &kineOptions{
backendType: backendType,
setup: func(db *sql.DB) error {
setup: func(ctx context.Context, tx *sql.Tx) error {
// Make sure there are enough rows deleted to have
// b.N rows to compact.
delCount := b.N + sqllog.SupersededCount
Expand All @@ -86,7 +104,13 @@ func BenchmarkCompaction(b *testing.B) {
// that the deleted rows are about 5% of the total.
addCount := delCount * 20

return setupScenario(ctx, db, "testkey", addCount, 0, delCount)
if err := insertMany(ctx, tx, "key", 100, addCount); err != nil {
return err
}
if err := deleteMany(ctx, tx, "key", delCount); err != nil {
return err
}
return nil
},
})

Expand Down
7 changes: 5 additions & 2 deletions test/delete_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,11 @@ func BenchmarkDelete(b *testing.B) {

kine := newKineServer(ctx, b, &kineOptions{
backendType: backendType,
setup: func(db *sql.DB) error {
return setupScenario(ctx, db, "key", b.N, 0, 0)
setup: func(ctx context.Context, tx *sql.Tx) error {
if err := insertMany(ctx, tx, "key", 100, b.N*2); err != nil {
return err
}
return nil
},
})

Expand Down
10 changes: 8 additions & 2 deletions test/get_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,8 +124,14 @@ func BenchmarkGet(b *testing.B) {

kine := newKineServer(ctx, b, &kineOptions{
backendType: backendType,
setup: func(db *sql.DB) error {
return setupScenario(ctx, db, "testKey", b.N, b.N, 0)
setup: func(ctx context.Context, tx *sql.Tx) error {
if err := insertMany(ctx, tx, "testKey", 100, b.N*2); err != nil {
return err
}
if err := updateMany(ctx, tx, "testKey", 100, b.N); err != nil {
return err
}
return nil
},
})

Expand Down
102 changes: 79 additions & 23 deletions test/list_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,32 +178,88 @@ func TestList(t *testing.T) {
}
}

// BenchmarkList is a benchmark for the Get operation.
func BenchmarkList(b *testing.B) {
for _, backendType := range []string{endpoint.SQLiteBackend, endpoint.DQLiteBackend} {
b.Run(backendType, func(b *testing.B) {
b.StopTimer()
g := NewWithT(b)
setup := func(ctx context.Context, tx *sql.Tx, payloadSize, n int) error {
if err := insertMany(ctx, tx, "key", payloadSize, n); err != nil {
return err
}
b.Log("insert", n)

if err := updateMany(ctx, tx, "key", payloadSize, n/2); err != nil {
return err
}
b.Log("update", n)

if err := deleteMany(ctx, tx, "key", n/2); err != nil {
return err
}
b.Log("delete", n)
return nil
}
backends := []string{endpoint.SQLiteBackend, endpoint.DQLiteBackend}
payloadSizes := []int{100, 1000, 5000}
for _, backendType := range backends {
for _, payloadSize := range payloadSizes {
b.Run(fmt.Sprintf("%s-%d", backendType, payloadSize), func(b *testing.B) {
b.Run("all", func(b *testing.B) {
b.StopTimer()
g := NewWithT(b)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

kine := newKineServer(ctx, b, &kineOptions{
backendType: backendType,
setup: func(ctx context.Context, tx *sql.Tx) error {
return setup(ctx, tx, payloadSize, b.N)
},
})

kine.ResetMetrics()
b.StartTimer()
resp, err := kine.client.Get(ctx, "key/", clientv3.WithPrefix())

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
g.Expect(err).To(BeNil())
g.Expect(resp.Kvs).To(HaveLen((b.N + 1) / 2))
kine.ReportMetrics(b)
})

kine := newKineServer(ctx, b, &kineOptions{
backendType: backendType,
setup: func(db *sql.DB) error {
return setupScenario(ctx, db, "key", b.N*2, b.N, b.N)
},
b.Run("pagination", func(b *testing.B) {
b.StopTimer()
g := NewWithT(b)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

kine := newKineServer(ctx, b, &kineOptions{
backendType: backendType,
setup: func(ctx context.Context, tx *sql.Tx) error {
return setup(ctx, tx, payloadSize, b.N)
},
})

kine.ResetMetrics()
b.StartTimer()
nextKey := "key/"
endRange := clientv3.GetPrefixRangeEnd(nextKey)
count := 0
for more := true; more; {
resp, err := kine.client.Get(ctx,
nextKey,
clientv3.WithRange(endRange),
clientv3.WithLimit(int64(b.N)/10),
)
g.Expect(err).To(BeNil())

more = resp.More
count += len(resp.Kvs)
nextKey = string(resp.Kvs[len(resp.Kvs)-1].Key) + "\x01"
}

g.Expect(count).To(Equal((b.N + 1) / 2))
kine.ReportMetrics(b)
})
})

kine.ResetMetrics()
b.StartTimer()
for i := 0; i < b.N; i++ {
resp, err := kine.client.Get(ctx, "key/", clientv3.WithPrefix())

g.Expect(err).To(BeNil())
g.Expect(resp.Kvs).To(HaveLen(b.N))
}
kine.ReportMetrics(b)
})
}
}
}
48 changes: 25 additions & 23 deletions test/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package test
import (
"context"
"database/sql"
"errors"
"fmt"
"io"
"path"
Expand Down Expand Up @@ -41,7 +42,7 @@ type kineOptions struct {
// setup is a function to setup the database before a test or
// benchmark starts. It is called after the endpoint started,
// so that migration and database schema setup is already done.
setup func(*sql.DB) error
setup func(context.Context, *sql.Tx) error
}

// newKineServer spins up a new instance of kine. In case of an error, tb.Fatal is called.
Expand Down Expand Up @@ -91,7 +92,15 @@ func newKineServer(ctx context.Context, tb testing.TB, options *kineOptions) *ki
})

if options.setup != nil {
if err := options.setup(db); err != nil {
tx, err := db.BeginTx(ctx, nil)
if err != nil {
tb.Fatal(err)
}
if err := options.setup(ctx, tx); err != nil {
rollbackErr := tx.Rollback()
tb.Fatal(errors.Join(err, rollbackErr))
}
if err := tx.Commit(); err != nil {
tb.Fatal(err)
}
}
Expand Down Expand Up @@ -178,13 +187,7 @@ func (ks *kineServer) ResetMetrics() {
}
}

func setupScenario(ctx context.Context, db *sql.DB, prefix string, numInsert, numUpdates, numDeletes int) error {
t, err := db.BeginTx(ctx, nil)
if err != nil {
return err
}
defer t.Rollback()

func insertMany(ctx context.Context, tx *sql.Tx, prefix string, valueSize, n int) error {
insertManyQuery := `
WITH RECURSIVE gen_id AS(
SELECT 1 AS id
Expand All @@ -201,17 +204,18 @@ WITH RECURSIVE gen_id AS(
INSERT INTO kine(
id, name, created, deleted, create_revision, prev_revision, lease, value, old_value
)
SELECT id + revision.base, ?||'/'||id, 1, 0, id + revision.base, 0, 0, 'value-'||id, NULL
SELECT id + revision.base, ?||'/'||id, 1, 0, id + revision.base, 0, 0, randomblob(?), NULL
FROM gen_id, revision`
if _, err := t.ExecContext(ctx, insertManyQuery, numInsert, prefix); err != nil {
return err
}
_, err := tx.ExecContext(ctx, insertManyQuery, n, prefix, valueSize)
return err
}

func updateMany(ctx context.Context, tx *sql.Tx, prefix string, valueSize, n int) error {
updateManyQuery := `
INSERT INTO kine(
name, created, deleted, create_revision, prev_revision, lease, value, old_value
)
SELECT kv.name, 0, 0, kv.create_revision, kv.id, 0, 'new-'||kv.value, kv.value
SELECT kv.name, 0, 0, kv.create_revision, kv.id, 0, randomblob(?), kv.value
FROM kine AS kv
JOIN (
SELECT MAX(mkv.id) as id
Expand All @@ -222,11 +226,12 @@ JOIN (
WHERE kv.deleted = 0
ORDER BY kv.name
LIMIT ?`
if _, err := t.ExecContext(ctx, updateManyQuery, prefix, prefix, numUpdates); err != nil {
return err
}
_, err := tx.ExecContext(ctx, updateManyQuery, valueSize, prefix, prefix, n)
return err
}

deleteManyQuery := `
func deleteMany(ctx context.Context, tx *sql.Tx, prefix string, n int) error {
const deleteManyQuery = `
INSERT INTO kine(
name, created, deleted, create_revision, prev_revision, lease, value, old_value
)
Expand All @@ -241,9 +246,6 @@ JOIN (
WHERE kv.deleted = 0
ORDER BY kv.name
LIMIT ?`
if _, err := t.ExecContext(ctx, deleteManyQuery, prefix, prefix, numDeletes); err != nil {
return err
}

return t.Commit()
_, err := tx.ExecContext(ctx, deleteManyQuery, prefix, prefix, n)
return err
}

0 comments on commit bf7c308

Please sign in to comment.