From bf7c30831162fed67748339a5b9482a8989e7e0d Mon Sep 17 00:00:00 2001 From: Marco Manino Date: Mon, 22 Jul 2024 13:56:47 +0200 Subject: [PATCH] Make list test linear --- test/admission_test.go | 36 +++++++------- test/compaction_test.go | 36 +++++++++++--- test/delete_test.go | 7 ++- test/get_test.go | 10 +++- test/list_test.go | 102 +++++++++++++++++++++++++++++++--------- test/util_test.go | 48 ++++++++++--------- 6 files changed, 165 insertions(+), 74 deletions(-) diff --git a/test/admission_test.go b/test/admission_test.go index ff314011..1e3241d6 100644 --- a/test/admission_test.go +++ b/test/admission_test.go @@ -20,6 +20,8 @@ import ( func TestAdmissionControl(t *testing.T) { for _, backendType := range []string{endpoint.SQLiteBackend, endpoint.DQLiteBackend} { t.Run(backendType, func(t *testing.T) { + const writeLimit = 10 + g := NewWithT(t) ctx, cancel := context.WithCancel(context.Background()) @@ -29,11 +31,14 @@ func TestAdmissionControl(t *testing.T) { backendType: backendType, endpointParameters: []string{ "admission-control-policy=limit", - "admission-control-policy-limit-max-concurrent-txn=600", + fmt.Sprintf("admission-control-policy-limit-max-concurrent-txn=%d", writeLimit), "admission-control-only-write-queries=true", }, - setup: func(db *sql.DB) error { - return setupScenario(ctx, db, "Key", 1000, 0, 0) + setup: func(ctx context.Context, tx *sql.Tx) error { + if err := insertMany(ctx, tx, "Key", 100, 1000); err != nil { + return err + } + return nil }, }) @@ -77,24 +82,19 @@ func TestAdmissionControl(t *testing.T) { } } - readers := 50 - readers_replication := 3 - read_entries := 1000 / readers - writers := 500 - writers_replication := 10 - write_entries := 1000 / writers - wg.Add(readers*readers_replication + writers*writers_replication) + readers := writeLimit * 4 + read_entries := 20 + + writers := writeLimit * 4 + write_entries := 20 + wg.Add(readers + writers) start := time.Now() for i := 0; i < readers; i++ { - for j := 0; j < readers_replication; j++ { - go reader(i*read_entries, (i+1)*read_entries) - } + go reader(i*read_entries, (i+1)*read_entries) } for i := 0; i < writers; i++ { - for j := 0; j < writers_replication; j++ { - go writer(i*write_entries, (i+1)*write_entries) - } + go writer(i*write_entries, (i+1)*write_entries) } wg.Wait() @@ -102,10 +102,10 @@ func TestAdmissionControl(t *testing.T) { t.Logf("Executed 1000 queries in %.2f seconds\n", duration.Seconds()) // It is expected that some queries are denied by the admission control due to the load. - g.Expect(numSuccessfulWriterTxn.Load()).To(BeNumerically("<", writers*writers_replication*write_entries)) + g.Expect(numSuccessfulWriterTxn.Load()).To(BeNumerically("<", writers*write_entries)) // read queries should be ignored by the admission control - g.Expect(numSuccessfulReaderTxn.Load()).To(BeNumerically("==", readers*readers_replication*read_entries)) + g.Expect(numSuccessfulReaderTxn.Load()).To(BeNumerically("==", readers*read_entries)) }) } } diff --git a/test/compaction_test.go b/test/compaction_test.go index fa225f28..5554dd73 100644 --- a/test/compaction_test.go +++ b/test/compaction_test.go @@ -21,8 +21,17 @@ func TestCompaction(t *testing.T) { kine := newKineServer(ctx, t, &kineOptions{ backendType: backendType, - setup: func(db *sql.DB) error { - return setupScenario(ctx, db, "testkey", 2, 1, 1) + setup: func(ctx context.Context, tx *sql.Tx) error { + if err := insertMany(ctx, tx, "key", 100, 2); err != nil { + return err + } + if err := updateMany(ctx, tx, "key", 100, 1); err != nil { + return err + } + if err := deleteMany(ctx, tx, "key", 1); err != nil { + return err + } + return nil }, }) @@ -47,8 +56,17 @@ func TestCompaction(t *testing.T) { kine := newKineServer(ctx, t, &kineOptions{ backendType: backendType, - setup: func(db *sql.DB) error { - return setupScenario(ctx, db, "testkey", 10_000, 500, 500) + setup: func(ctx context.Context, tx *sql.Tx) error { + if err := insertMany(ctx, tx, "key", 100, 10_000); err != nil { + return err + } + if err := updateMany(ctx, tx, "key", 100, 500); err != nil { + return err + } + if err := deleteMany(ctx, tx, "key", 500); err != nil { + return err + } + return nil }, }) @@ -77,7 +95,7 @@ func BenchmarkCompaction(b *testing.B) { kine := newKineServer(ctx, b, &kineOptions{ backendType: backendType, - setup: func(db *sql.DB) error { + setup: func(ctx context.Context, tx *sql.Tx) error { // Make sure there are enough rows deleted to have // b.N rows to compact. delCount := b.N + sqllog.SupersededCount @@ -86,7 +104,13 @@ func BenchmarkCompaction(b *testing.B) { // that the deleted rows are about 5% of the total. addCount := delCount * 20 - return setupScenario(ctx, db, "testkey", addCount, 0, delCount) + if err := insertMany(ctx, tx, "key", 100, addCount); err != nil { + return err + } + if err := deleteMany(ctx, tx, "key", delCount); err != nil { + return err + } + return nil }, }) diff --git a/test/delete_test.go b/test/delete_test.go index 548dac67..af9a79d2 100644 --- a/test/delete_test.go +++ b/test/delete_test.go @@ -66,8 +66,11 @@ func BenchmarkDelete(b *testing.B) { kine := newKineServer(ctx, b, &kineOptions{ backendType: backendType, - setup: func(db *sql.DB) error { - return setupScenario(ctx, db, "key", b.N, 0, 0) + setup: func(ctx context.Context, tx *sql.Tx) error { + if err := insertMany(ctx, tx, "key", 100, b.N*2); err != nil { + return err + } + return nil }, }) diff --git a/test/get_test.go b/test/get_test.go index 984f4a5d..8270bd3c 100644 --- a/test/get_test.go +++ b/test/get_test.go @@ -124,8 +124,14 @@ func BenchmarkGet(b *testing.B) { kine := newKineServer(ctx, b, &kineOptions{ backendType: backendType, - setup: func(db *sql.DB) error { - return setupScenario(ctx, db, "testKey", b.N, b.N, 0) + setup: func(ctx context.Context, tx *sql.Tx) error { + if err := insertMany(ctx, tx, "testKey", 100, b.N*2); err != nil { + return err + } + if err := updateMany(ctx, tx, "testKey", 100, b.N); err != nil { + return err + } + return nil }, }) diff --git a/test/list_test.go b/test/list_test.go index e5ac70cd..d127bb24 100644 --- a/test/list_test.go +++ b/test/list_test.go @@ -178,32 +178,88 @@ func TestList(t *testing.T) { } } -// BenchmarkList is a benchmark for the Get operation. func BenchmarkList(b *testing.B) { - for _, backendType := range []string{endpoint.SQLiteBackend, endpoint.DQLiteBackend} { - b.Run(backendType, func(b *testing.B) { - b.StopTimer() - g := NewWithT(b) + setup := func(ctx context.Context, tx *sql.Tx, payloadSize, n int) error { + if err := insertMany(ctx, tx, "key", payloadSize, n); err != nil { + return err + } + b.Log("insert", n) + + if err := updateMany(ctx, tx, "key", payloadSize, n/2); err != nil { + return err + } + b.Log("update", n) + + if err := deleteMany(ctx, tx, "key", n/2); err != nil { + return err + } + b.Log("delete", n) + return nil + } + backends := []string{endpoint.SQLiteBackend, endpoint.DQLiteBackend} + payloadSizes := []int{100, 1000, 5000} + for _, backendType := range backends { + for _, payloadSize := range payloadSizes { + b.Run(fmt.Sprintf("%s-%d", backendType, payloadSize), func(b *testing.B) { + b.Run("all", func(b *testing.B) { + b.StopTimer() + g := NewWithT(b) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + kine := newKineServer(ctx, b, &kineOptions{ + backendType: backendType, + setup: func(ctx context.Context, tx *sql.Tx) error { + return setup(ctx, tx, payloadSize, b.N) + }, + }) + + kine.ResetMetrics() + b.StartTimer() + resp, err := kine.client.Get(ctx, "key/", clientv3.WithPrefix()) - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() + g.Expect(err).To(BeNil()) + g.Expect(resp.Kvs).To(HaveLen((b.N + 1) / 2)) + kine.ReportMetrics(b) + }) - kine := newKineServer(ctx, b, &kineOptions{ - backendType: backendType, - setup: func(db *sql.DB) error { - return setupScenario(ctx, db, "key", b.N*2, b.N, b.N) - }, + b.Run("pagination", func(b *testing.B) { + b.StopTimer() + g := NewWithT(b) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + kine := newKineServer(ctx, b, &kineOptions{ + backendType: backendType, + setup: func(ctx context.Context, tx *sql.Tx) error { + return setup(ctx, tx, payloadSize, b.N) + }, + }) + + kine.ResetMetrics() + b.StartTimer() + nextKey := "key/" + endRange := clientv3.GetPrefixRangeEnd(nextKey) + count := 0 + for more := true; more; { + resp, err := kine.client.Get(ctx, + nextKey, + clientv3.WithRange(endRange), + clientv3.WithLimit(int64(b.N)/10), + ) + g.Expect(err).To(BeNil()) + + more = resp.More + count += len(resp.Kvs) + nextKey = string(resp.Kvs[len(resp.Kvs)-1].Key) + "\x01" + } + + g.Expect(count).To(Equal((b.N + 1) / 2)) + kine.ReportMetrics(b) + }) }) - - kine.ResetMetrics() - b.StartTimer() - for i := 0; i < b.N; i++ { - resp, err := kine.client.Get(ctx, "key/", clientv3.WithPrefix()) - - g.Expect(err).To(BeNil()) - g.Expect(resp.Kvs).To(HaveLen(b.N)) - } - kine.ReportMetrics(b) - }) + } } } diff --git a/test/util_test.go b/test/util_test.go index 508a59f6..e5b33f8e 100644 --- a/test/util_test.go +++ b/test/util_test.go @@ -3,6 +3,7 @@ package test import ( "context" "database/sql" + "errors" "fmt" "io" "path" @@ -41,7 +42,7 @@ type kineOptions struct { // setup is a function to setup the database before a test or // benchmark starts. It is called after the endpoint started, // so that migration and database schema setup is already done. - setup func(*sql.DB) error + setup func(context.Context, *sql.Tx) error } // newKineServer spins up a new instance of kine. In case of an error, tb.Fatal is called. @@ -91,7 +92,15 @@ func newKineServer(ctx context.Context, tb testing.TB, options *kineOptions) *ki }) if options.setup != nil { - if err := options.setup(db); err != nil { + tx, err := db.BeginTx(ctx, nil) + if err != nil { + tb.Fatal(err) + } + if err := options.setup(ctx, tx); err != nil { + rollbackErr := tx.Rollback() + tb.Fatal(errors.Join(err, rollbackErr)) + } + if err := tx.Commit(); err != nil { tb.Fatal(err) } } @@ -178,13 +187,7 @@ func (ks *kineServer) ResetMetrics() { } } -func setupScenario(ctx context.Context, db *sql.DB, prefix string, numInsert, numUpdates, numDeletes int) error { - t, err := db.BeginTx(ctx, nil) - if err != nil { - return err - } - defer t.Rollback() - +func insertMany(ctx context.Context, tx *sql.Tx, prefix string, valueSize, n int) error { insertManyQuery := ` WITH RECURSIVE gen_id AS( SELECT 1 AS id @@ -201,17 +204,18 @@ WITH RECURSIVE gen_id AS( INSERT INTO kine( id, name, created, deleted, create_revision, prev_revision, lease, value, old_value ) -SELECT id + revision.base, ?||'/'||id, 1, 0, id + revision.base, 0, 0, 'value-'||id, NULL +SELECT id + revision.base, ?||'/'||id, 1, 0, id + revision.base, 0, 0, randomblob(?), NULL FROM gen_id, revision` - if _, err := t.ExecContext(ctx, insertManyQuery, numInsert, prefix); err != nil { - return err - } + _, err := tx.ExecContext(ctx, insertManyQuery, n, prefix, valueSize) + return err +} +func updateMany(ctx context.Context, tx *sql.Tx, prefix string, valueSize, n int) error { updateManyQuery := ` INSERT INTO kine( name, created, deleted, create_revision, prev_revision, lease, value, old_value ) -SELECT kv.name, 0, 0, kv.create_revision, kv.id, 0, 'new-'||kv.value, kv.value +SELECT kv.name, 0, 0, kv.create_revision, kv.id, 0, randomblob(?), kv.value FROM kine AS kv JOIN ( SELECT MAX(mkv.id) as id @@ -222,11 +226,12 @@ JOIN ( WHERE kv.deleted = 0 ORDER BY kv.name LIMIT ?` - if _, err := t.ExecContext(ctx, updateManyQuery, prefix, prefix, numUpdates); err != nil { - return err - } + _, err := tx.ExecContext(ctx, updateManyQuery, valueSize, prefix, prefix, n) + return err +} - deleteManyQuery := ` +func deleteMany(ctx context.Context, tx *sql.Tx, prefix string, n int) error { + const deleteManyQuery = ` INSERT INTO kine( name, created, deleted, create_revision, prev_revision, lease, value, old_value ) @@ -241,9 +246,6 @@ JOIN ( WHERE kv.deleted = 0 ORDER BY kv.name LIMIT ?` - if _, err := t.ExecContext(ctx, deleteManyQuery, prefix, prefix, numDeletes); err != nil { - return err - } - - return t.Commit() + _, err := tx.ExecContext(ctx, deleteManyQuery, prefix, prefix, n) + return err }