Skip to content

Commit

Permalink
Add dqlite compaction test
Browse files Browse the repository at this point in the history
  • Loading branch information
marco6 committed Jun 19, 2024
1 parent cf58d8d commit 3612039
Show file tree
Hide file tree
Showing 5 changed files with 214 additions and 136 deletions.
12 changes: 9 additions & 3 deletions pkg/kine/drivers/dqlite/dqlite.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (

"github.com/canonical/go-dqlite"
"github.com/canonical/go-dqlite/driver"
"github.com/canonical/k8s-dqlite/pkg/kine/drivers/generic"
"github.com/canonical/k8s-dqlite/pkg/kine/drivers/sqlite"
"github.com/canonical/k8s-dqlite/pkg/kine/server"
"github.com/canonical/k8s-dqlite/pkg/kine/tls"
Expand All @@ -26,15 +27,20 @@ func init() {
}

func New(ctx context.Context, datasourceName string, tlsInfo tls.Config) (server.Backend, error) {
backend, _, err := NewVariant(ctx, datasourceName)
return backend, err
}

func NewVariant(ctx context.Context, datasourceName string) (server.Backend, *generic.Generic, error) {
logrus.Printf("New kine for dqlite")

// Driver name will be extracted from query parameters
backend, generic, err := sqlite.NewVariant(ctx, "", datasourceName)
if err != nil {
return nil, errors.Wrap(err, "sqlite client")
return nil, nil, errors.Wrap(err, "sqlite client")
}
if err := migrate(ctx, generic.DB); err != nil {
return nil, errors.Wrap(err, "failed to migrate DB from sqlite")
return nil, nil, errors.Wrap(err, "failed to migrate DB from sqlite")
}
generic.LockWrites = true
generic.Retry = func(err error) bool {
Expand Down Expand Up @@ -74,7 +80,7 @@ func New(ctx context.Context, datasourceName string, tlsInfo tls.Config) (server
return err
}

return backend, nil
return backend, generic, nil
}

func migrate(ctx context.Context, newDB *sql.DB) (exitErr error) {
Expand Down
44 changes: 44 additions & 0 deletions pkg/kine/drivers/dqlite_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
//go:build dqlite

package drivers_test

import (
"context"
"fmt"
"testing"

"github.com/canonical/go-dqlite/app"
"github.com/canonical/k8s-dqlite/pkg/kine/drivers/dqlite"
"github.com/canonical/k8s-dqlite/pkg/kine/drivers/generic"
"github.com/canonical/k8s-dqlite/pkg/kine/server"
)

func TestDqliteCompaction(t *testing.T) {
testCompaction(t, newSqliteBackend)
}

func BenchmarkDqliteCompaction(b *testing.B) {
benchmarkCompaction(b, newSqliteBackend)
}

var (
nextIdx int
)

func newDqliteBackend(ctx context.Context, tb testing.TB) (server.Backend, *generic.Generic, error) {
nextIdx++

dir := tb.TempDir()
app, err := app.New(dir, app.WithAddress(fmt.Sprintf("127.0.0.1:%d", 59090+nextIdx)))
if err != nil {
panic(fmt.Errorf("failed to create dqlite app: %w", err))
}
if err := app.Ready(ctx); err != nil {
panic(fmt.Errorf("failed to initialize dqlite: %w", err))
}
tb.Cleanup(func() {
app.Close()
})

return dqlite.NewVariant(ctx, fmt.Sprintf("dqlite://k8s-%d?driver-name=%s", nextIdx, app.Driver()))
}
135 changes: 135 additions & 0 deletions pkg/kine/drivers/generic_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
package drivers_test

import (
"context"
"fmt"
"testing"

"github.com/canonical/k8s-dqlite/pkg/kine/drivers/generic"
"github.com/canonical/k8s-dqlite/pkg/kine/logstructured/sqllog"
"github.com/canonical/k8s-dqlite/pkg/kine/server"
. "github.com/onsi/gomega"
)

type makeBackendFunc func(ctx context.Context, tb testing.TB) (server.Backend, *generic.Generic, error)

func testCompaction(t *testing.T, makeBackend makeBackendFunc) {
ctx := context.Background()

t.Run("SmallDatabaseDeleteEntry", func(t *testing.T) {
g := NewWithT(t)
backend, dialect, err := makeBackend(ctx, t)
if err != nil {
t.Fatal(err)
}
defer dialect.DB.Close()

addEntries(ctx, dialect, 2)
deleteEntries(ctx, dialect, 1)

initialSize, err := backend.DbSize(ctx)
g.Expect(err).To(BeNil())

err = backend.DoCompact(ctx)
g.Expect(err).To(BeNil())

finalSize, err := backend.DbSize(ctx)
g.Expect(err).To(BeNil())

// Expecting no compaction
g.Expect(finalSize).To(BeNumerically("==", initialSize))
})

t.Run("LargeDatabaseDeleteFivePercent", func(t *testing.T) {
g := NewWithT(t)
backend, dialect, err := makeBackend(ctx, t)
if err != nil {
t.Fatal(err)
}
defer dialect.DB.Close()

addEntries(ctx, dialect, 10_000)
deleteEntries(ctx, dialect, 500)

initialSize, err := backend.DbSize(ctx)
g.Expect(err).To(BeNil())

err = backend.DoCompact(ctx)
g.Expect(err).To(BeNil())

finalSize, err := backend.DbSize(ctx)
g.Expect(err).To(BeNil())

// Expecting compaction
g.Expect(finalSize).To(BeNumerically("<", initialSize))
})
}

func benchmarkCompaction(b *testing.B, makeBackend makeBackendFunc) {
b.StopTimer()
ctx := context.Background()

backend, dialect, err := makeBackend(ctx, b)
if err != nil {
b.Fatal(err)
}
defer dialect.DB.Close()

// Make sure there's enough rows deleted to have
// b.N rows to compact.
delCount := b.N + sqllog.SupersededCount

// Also, make sure there's some uncollectable data, so
// that the deleted rows are about 5% of the total.
addCount := delCount * 20

if err := addEntries(ctx, dialect, addCount); err != nil {
b.Fatal(err)
}
if err := deleteEntries(ctx, dialect, delCount); err != nil {
b.Fatal(err)
}

b.StartTimer()
err = backend.DoCompact(ctx)
if err != nil {
b.Fatal(err)
}
b.StopTimer()
}

func addEntries(ctx context.Context, dialect *generic.Generic, count int) error {
_, err := dialect.DB.ExecContext(ctx, `
WITH RECURSIVE gen_id AS(
SELECT COALESCE(MAX(id), 0)+1 AS id FROM kine
UNION ALL
SELECT id + 1
FROM gen_id
WHERE id + 1 < ?
)
INSERT INTO kine
SELECT id, 'testkey-'||id, 1, 0, id, 0, 0, 'value-'||id, NULL FROM gen_id;
`, count)
return err
}

func deleteEntries(ctx context.Context, dialect *generic.Generic, count int) error {
_, err := dialect.DB.ExecContext(ctx, fmt.Sprintf(`
INSERT INTO kine(
name, created, deleted, create_revision, prev_revision, lease, value, old_value
)
SELECT kv.name, 0, 1, kv.create_revision, kv.id, 0, kv.value, kv.value
FROM kine AS kv
JOIN (
SELECT MAX(mkv.id) as id
FROM kine mkv
WHERE 'testkey-' <= mkv.name AND mkv.name < 'testkey.'
GROUP BY mkv.name
) maxkv ON maxkv.id = kv.id
WHERE kv.deleted = 0
ORDER BY kv.name
LIMIT %d`, count))
return err
}
134 changes: 1 addition & 133 deletions pkg/kine/drivers/sqlite/sqlite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,10 @@ package sqlite_test
import (
"context"
"database/sql"
"fmt"
"path"
"testing"

"github.com/canonical/k8s-dqlite/pkg/kine/drivers/generic"
"github.com/canonical/k8s-dqlite/pkg/kine/drivers/sqlite"
"github.com/canonical/k8s-dqlite/pkg/kine/logstructured/sqllog"
"github.com/canonical/k8s-dqlite/pkg/kine/server"
. "github.com/onsi/gomega"
"github.com/sirupsen/logrus"
)

Expand All @@ -23,7 +18,7 @@ func setupV0(db *sql.DB) error {
// Create the very old key_value table
if _, err := db.Exec(`
CREATE TABLE IF NOT EXISTS kine
(
(c
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL,
created INTEGER,
Expand Down Expand Up @@ -74,130 +69,3 @@ WHERE type = 'index'
t.Errorf("Expected 2 indexes, got %d", indexes)
}
}

func TestCompaction(t *testing.T) {
ctx := context.Background()

t.Run("SmallDatabaseDeleteEntry", func(t *testing.T) {
g := NewWithT(t)
backend, dialect, err := newBackend(ctx, t)
if err != nil {
t.Fatal(err)
}
defer dialect.DB.Close()

addEntries(ctx, dialect, 2)
deleteEntries(ctx, dialect, 1)

initialSize, err := backend.DbSize(ctx)
g.Expect(err).To(BeNil())

err = backend.DoCompact(ctx)
g.Expect(err).To(BeNil())

finalSize, err := backend.DbSize(ctx)
g.Expect(err).To(BeNil())

// Expecting no compaction
g.Expect(finalSize).To(BeNumerically("==", initialSize))
})

t.Run("LargeDatabaseDeleteFivePercent", func(t *testing.T) {
g := NewWithT(t)
backend, dialect, err := newBackend(ctx, t)
if err != nil {
t.Fatal(err)
}
defer dialect.DB.Close()

addEntries(ctx, dialect, 10_000)
deleteEntries(ctx, dialect, 500)

initialSize, err := backend.DbSize(ctx)
g.Expect(err).To(BeNil())

err = backend.DoCompact(ctx)
g.Expect(err).To(BeNil())

finalSize, err := backend.DbSize(ctx)
g.Expect(err).To(BeNil())

// Expecting compaction
g.Expect(finalSize).To(BeNumerically("<", initialSize))
})
}

func BenchmarkCompaction(b *testing.B) {
b.StopTimer()
ctx := context.Background()

backend, dialect, err := newBackend(ctx, b)
if err != nil {
b.Fatal(err)
}
defer dialect.DB.Close()

// Make sure there's enough rows deleted to have
// b.N rows to compact.
delCount := b.N + sqllog.SupersededCount

// Also, make sure there's some uncollectable data, so
// that the deleted rows are about 5% of the total.
addCount := delCount * 20

if err := addEntries(ctx, dialect, addCount); err != nil {
b.Fatal(err)
}
if err := deleteEntries(ctx, dialect, delCount); err != nil {
b.Fatal(err)
}

b.StartTimer()
err = backend.DoCompact(ctx)
if err != nil {
b.Fatal(err)
}
b.StopTimer()
}

func newBackend(ctx context.Context, tb testing.TB) (server.Backend, *generic.Generic, error) {
dir := tb.TempDir()
dataSource := path.Join(dir, "k8s.sqlite")
return sqlite.NewVariant(ctx, "sqlite3", dataSource)
}

func addEntries(ctx context.Context, dialect *generic.Generic, count int) error {
_, err := dialect.DB.ExecContext(ctx, `
WITH RECURSIVE gen_id AS(
SELECT COALESCE(MAX(id), 0)+1 AS id FROM kine
UNION ALL
SELECT id + 1
FROM gen_id
WHERE id + 1 < ?
)
INSERT INTO kine
SELECT id, 'testkey-'||id, 1, 0, id, 0, 0, 'value-'||id, NULL FROM gen_id;
`, count)
return err
}

func deleteEntries(ctx context.Context, dialect *generic.Generic, count int) error {
_, err := dialect.DB.ExecContext(ctx, fmt.Sprintf(`
INSERT INTO kine(
name, created, deleted, create_revision, prev_revision, lease, value, old_value
)
SELECT kv.name, 0, 1, kv.create_revision, kv.id, 0, kv.value, kv.value
FROM kine AS kv
JOIN (
SELECT MAX(mkv.id) as id
FROM kine mkv
WHERE 'testkey-' <= mkv.name AND mkv.name < 'testkey.'
GROUP BY mkv.name
) maxkv ON maxkv.id = kv.id
WHERE kv.deleted = 0
ORDER BY kv.name
LIMIT %d`, count))
return err
}
Loading

0 comments on commit 3612039

Please sign in to comment.