Skip to content

Commit

Permalink
Add concurrency to create/update/insert benchmarks (#171)
Browse files Browse the repository at this point in the history
  • Loading branch information
marco6 authored Sep 12, 2024
1 parent 7e926ad commit 5b20dbb
Show file tree
Hide file tree
Showing 3 changed files with 98 additions and 59 deletions.
42 changes: 27 additions & 15 deletions test/create_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package test
import (
"context"
"fmt"
"sync"
"testing"

"github.com/canonical/k8s-dqlite/pkg/kine/endpoint"
Expand Down Expand Up @@ -37,24 +38,35 @@ func TestCreate(t *testing.T) {
// BenchmarkCreate is a benchmark for the Create operation.
func BenchmarkCreate(b *testing.B) {
for _, backendType := range []string{endpoint.SQLiteBackend, endpoint.DQLiteBackend} {
b.Run(backendType, func(b *testing.B) {
b.StopTimer()
g := NewWithT(b)
for _, workers := range []int{1, 2, 4, 8, 16} {
b.Run(fmt.Sprintf("%d-workers/%s", workers, backendType), func(b *testing.B) {
b.StopTimer()
g := NewWithT(b)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

kine := newKineServer(ctx, b, &kineOptions{backendType: backendType})
kine := newKineServer(ctx, b, &kineOptions{backendType: backendType})
wg := &sync.WaitGroup{}
run := func(start int) {
defer wg.Done()
for i := start; i < b.N; i += workers {
key := fmt.Sprintf("key-%d", i)
value := fmt.Sprintf("value-%d", i)
createKey(ctx, g, kine.client, key, value)
}
}

kine.ResetMetrics()
b.StartTimer()
for i := 0; i < b.N; i++ {
key := fmt.Sprintf("key-%d", i)
value := fmt.Sprintf("value-%d", i)
createKey(ctx, g, kine.client, key, value)
}
kine.ReportMetrics(b)
})
kine.ResetMetrics()
b.StartTimer()
wg.Add(workers)
for worker := 0; worker < workers; worker++ {
go run(worker)
}
wg.Wait()
kine.ReportMetrics(b)
})
}
}
}

Expand Down
58 changes: 35 additions & 23 deletions test/delete_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"database/sql"
"fmt"
"sync"
"testing"

"github.com/canonical/k8s-dqlite/pkg/kine/endpoint"
Expand Down Expand Up @@ -57,31 +58,42 @@ func TestDelete(t *testing.T) {
// BenchmarkDelete is a benchmark for the delete operation.
func BenchmarkDelete(b *testing.B) {
for _, backendType := range []string{endpoint.SQLiteBackend, endpoint.DQLiteBackend} {
b.Run(backendType, func(b *testing.B) {
b.StopTimer()
g := NewWithT(b)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

kine := newKineServer(ctx, b, &kineOptions{
backendType: backendType,
setup: func(ctx context.Context, tx *sql.Tx) error {
if err := insertMany(ctx, tx, "key", 100, b.N*2); err != nil {
return err
for _, workers := range []int{1, 2, 4, 8, 16} {
b.Run(fmt.Sprintf("%d-workers/%s", workers, backendType), func(b *testing.B) {
b.StopTimer()
g := NewWithT(b)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

kine := newKineServer(ctx, b, &kineOptions{
backendType: backendType,
setup: func(ctx context.Context, tx *sql.Tx) error {
if err := insertMany(ctx, tx, "key", 100, b.N*2); err != nil {
return err
}
return nil
},
})
wg := &sync.WaitGroup{}
run := func(start int) {
defer wg.Done()
for i := start; i < b.N; i += workers {
key := fmt.Sprintf("key/%d", i)
deleteKey(ctx, g, kine.client, key)
}
return nil
},
}

kine.ResetMetrics()
b.StartTimer()
wg.Add(workers)
for worker := 0; worker < workers; worker++ {
go run(worker)
}
wg.Wait()
kine.ReportMetrics(b)
})

kine.ResetMetrics()
b.StartTimer()
for i := 0; i < b.N; i++ {
key := fmt.Sprintf("key/%d", i)
deleteKey(ctx, g, kine.client, key)
}
kine.ReportMetrics(b)
})
}
}
}

Expand Down
57 changes: 36 additions & 21 deletions test/update_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package test
import (
"context"
"fmt"
"sync"
"testing"

"github.com/canonical/k8s-dqlite/pkg/kine/endpoint"
Expand Down Expand Up @@ -72,32 +73,46 @@ func TestUpdate(t *testing.T) {
// BenchmarkUpdate is a benchmark for the Update operation.
func BenchmarkUpdate(b *testing.B) {
for _, backendType := range []string{endpoint.SQLiteBackend, endpoint.DQLiteBackend} {
b.Run(backendType, func(b *testing.B) {
b.StopTimer()
g := NewWithT(b)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

kine := newKineServer(ctx, b, &kineOptions{backendType: backendType})

kine.ResetMetrics()
b.StartTimer()
for i, lastModRev := 0, int64(0); i < b.N; i++ {
value := fmt.Sprintf("value-%d", i)
lastModRev = updateRev(ctx, g, kine.client, "benchKey", lastModRev, value)
}
kine.ReportMetrics(b)
})
for _, workers := range []int{1, 2, 4, 8, 16} {
b.Run(fmt.Sprintf("%d-workers/%s", workers, backendType), func(b *testing.B) {
b.StopTimer()
g := NewWithT(b)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

kine := newKineServer(ctx, b, &kineOptions{backendType: backendType})
wg := &sync.WaitGroup{}
run := func(start int) {
defer wg.Done()
benchKey := fmt.Sprintf("benchKey-%d", start)
for i, lastModRev := 0, int64(0); i < b.N; i += workers {
value := fmt.Sprintf("value-%d", i)
lastModRev = updateRev(ctx, g, kine.client, benchKey, lastModRev, value)
}
}

kine.ResetMetrics()
b.StartTimer()
wg.Add(workers)
for worker := 0; worker < workers; worker++ {
go run(worker)
}
wg.Wait()
kine.ReportMetrics(b)
})
}
}
}

func updateRev(ctx context.Context, g Gomega, client *clientv3.Client, key string, revision int64, value string) int64 {
resp, err := client.Txn(ctx).
txn := client.Txn(ctx).
If(clientv3.Compare(clientv3.ModRevision(key), "=", revision)).
Then(clientv3.OpPut(key, value)).
Else(clientv3.OpGet(key, clientv3.WithRange(""))).
Commit()
Then(clientv3.OpPut(key, value))
if revision != 0 {
txn = txn.Else(clientv3.OpGet(key, clientv3.WithRange("")))
}
resp, err := txn.Commit()

g.Expect(err).To(BeNil())
g.Expect(resp.Succeeded).To(BeTrue())
Expand Down

0 comments on commit 5b20dbb

Please sign in to comment.