Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make connection pool configurable #158

Merged
merged 6 commits into from
Sep 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ Steps:
make static
sudo ./bin/static/k8s-dqlite \
--storage-dir /var/snap/microk8s/current/var/kubernetes/backend \
--listen unix:///var/snap/microk8s/current/var/kubernetes/backend/kine.sock
--listen unix:///var/snap/microk8s/current/var/kubernetes/backend/kine.sock:12379
```

7. While developing and making changes to `k8s-dqlite`, just restart k8s-dqlite
Expand Down
8 changes: 8 additions & 0 deletions cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"os/signal"
"time"

"github.com/canonical/k8s-dqlite/pkg/kine/drivers/generic"
"github.com/canonical/k8s-dqlite/pkg/server"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/sirupsen/logrus"
Expand All @@ -32,6 +33,8 @@ var (
otel bool
otelAddress string

connectionPoolConfig generic.ConnectionPoolConfig

watchAvailableStorageInterval time.Duration
watchAvailableStorageMinBytes uint64
lowAvailableStorageAction string
Expand Down Expand Up @@ -108,6 +111,7 @@ var (
rootCmdOpts.admissionControlPolicy,
rootCmdOpts.acpLimitMaxConcurrentTxn,
rootCmdOpts.acpOnlyWriteQueries,
rootCmdOpts.connectionPoolConfig,
rootCmdOpts.watchQueryTimeout,
)
if err != nil {
Expand Down Expand Up @@ -176,6 +180,10 @@ func init() {
rootCmd.Flags().BoolVar(&rootCmdOpts.otel, "otel", false, "enable traces endpoint")
rootCmd.Flags().StringVar(&rootCmdOpts.otelAddress, "otel-listen", "127.0.0.1:4317", "listen address for OpenTelemetry endpoint")
rootCmd.Flags().StringVar(&rootCmdOpts.metricsAddress, "metrics-listen", "127.0.0.1:9042", "listen address for metrics endpoint")
rootCmd.Flags().IntVar(&rootCmdOpts.connectionPoolConfig.MaxIdle, "datastore-max-idle-connections", 5, "Maximum number of idle connections retained by datastore. If value = 0, the system default will be used. If value < 0, idle connections will not be reused.")
rootCmd.Flags().IntVar(&rootCmdOpts.connectionPoolConfig.MaxOpen, "datastore-max-open-connections", 5, "Maximum number of open connections used by datastore. If value <= 0, then there is no limit")
rootCmd.Flags().DurationVar(&rootCmdOpts.connectionPoolConfig.MaxLifetime, "datastore-connection-max-lifetime", 60*time.Second, "Maximum amount of time a connection may be reused. If value <= 0, then there is no limit.")
rootCmd.Flags().DurationVar(&rootCmdOpts.connectionPoolConfig.MaxIdleTime, "datastore-connection-max-idle-time", 0*time.Second, "Maximum amount of time a connection may be idle before being closed. If value <= 0, then there is no limit.")
rootCmd.Flags().DurationVar(&rootCmdOpts.watchAvailableStorageInterval, "watch-storage-available-size-interval", 5*time.Second, "Interval to check if the disk is running low on space. Set to 0 to disable the periodic disk size check")
rootCmd.Flags().Uint64Var(&rootCmdOpts.watchAvailableStorageMinBytes, "watch-storage-available-size-min-bytes", 10*1024*1024, "Minimum required available disk size (in bytes) to continue operation. If available disk space gets below this threshold, then the --low-available-storage-action is performed")
rootCmd.Flags().StringVar(&rootCmdOpts.lowAvailableStorageAction, "low-available-storage-action", "none", "Action to perform in case the available storage is low. One of (none|handover|terminate). none means no action is performed. handover means the dqlite node will handover its leadership role, if any. terminate means this dqlite node will shutdown")
Expand Down
8 changes: 4 additions & 4 deletions pkg/kine/drivers/dqlite/dqlite.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,16 +24,16 @@ func init() {
}
}

func New(ctx context.Context, datasourceName string, tlsInfo tls.Config) (server.Backend, error) {
backend, _, err := NewVariant(ctx, datasourceName)
func New(ctx context.Context, datasourceName string, tlsInfo tls.Config, connectionPoolConfig *generic.ConnectionPoolConfig) (server.Backend, error) {
backend, _, err := NewVariant(ctx, datasourceName, connectionPoolConfig)
return backend, err
}

func NewVariant(ctx context.Context, datasourceName string) (server.Backend, *generic.Generic, error) {
func NewVariant(ctx context.Context, datasourceName string, connectionPoolConfig *generic.ConnectionPoolConfig) (server.Backend, *generic.Generic, error) {
logrus.Printf("New kine for dqlite")

// Driver name will be extracted from query parameters
backend, generic, err := sqlite.NewVariant(ctx, "", datasourceName)
backend, generic, err := sqlite.NewVariant(ctx, "", datasourceName, connectionPoolConfig)
if err != nil {
return nil, nil, errors.Wrap(err, "sqlite client")
}
Expand Down
39 changes: 32 additions & 7 deletions pkg/kine/drivers/generic/generic.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,10 @@ import (
"go.opentelemetry.io/otel/trace"
)

const otelName = "generic"
const (
otelName = "generic"
defaultMaxIdleConns = 2 // default from database/sql
louiseschmidtgen marked this conversation as resolved.
Show resolved Hide resolved
)

var (
otelTracer trace.Tracer
Expand Down Expand Up @@ -162,10 +165,32 @@ type Generic struct {
WatchQueryTimeout time.Duration
}

func configureConnectionPooling(db *sql.DB) {
db.SetMaxIdleConns(5)
db.SetMaxOpenConns(5)
db.SetConnMaxLifetime(60 * time.Second)
type ConnectionPoolConfig struct {
MaxIdle int
MaxOpen int
MaxLifetime time.Duration
MaxIdleTime time.Duration
}

func configureConnectionPooling(connPoolConfig *ConnectionPoolConfig, db *sql.DB) {
// behavior of database/sql - zero means defaultMaxIdleConns; negative means 0
if connPoolConfig.MaxIdle < 0 {
connPoolConfig.MaxIdle = 0
} else if connPoolConfig.MaxIdle == 0 {
connPoolConfig.MaxIdle = defaultMaxIdleConns
}

logrus.Infof(
"Configuring database connection pooling: maxIdleConns=%d, maxOpenConns=%d, connMaxLifetime=%v, connMaxIdleTime=%v ",
connPoolConfig.MaxIdle,
connPoolConfig.MaxOpen,
connPoolConfig.MaxLifetime,
connPoolConfig.MaxIdleTime,
)
db.SetMaxIdleConns(connPoolConfig.MaxIdle)
db.SetMaxOpenConns(connPoolConfig.MaxOpen)
db.SetConnMaxLifetime(connPoolConfig.MaxLifetime)
db.SetConnMaxIdleTime(connPoolConfig.MaxIdleTime)
}

func q(sql, param string, numbered bool) string {
Expand Down Expand Up @@ -200,7 +225,7 @@ func openAndTest(driverName, dataSourceName string) (*sql.DB, error) {
return db, nil
}

func Open(ctx context.Context, driverName, dataSourceName string, paramCharacter string, numbered bool) (*Generic, error) {
func Open(ctx context.Context, driverName, dataSourceName string, connPoolConfig *ConnectionPoolConfig, paramCharacter string, numbered bool) (*Generic, error) {
var (
db *sql.DB
err error
Expand All @@ -219,7 +244,7 @@ func Open(ctx context.Context, driverName, dataSourceName string, paramCharacter
}
}

configureConnectionPooling(db)
configureConnectionPooling(connPoolConfig, db)

return &Generic{
DB: prepared.New(db),
Expand Down
8 changes: 4 additions & 4 deletions pkg/kine/drivers/sqlite/sqlite.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,16 +32,16 @@ type opts struct {
admissionControlOnlyWriteQueries bool
}

func New(ctx context.Context, dataSourceName string) (server.Backend, error) {
backend, _, err := NewVariant(ctx, "sqlite3", dataSourceName)
func New(ctx context.Context, dataSourceName string, connectionPoolConfig *generic.ConnectionPoolConfig) (server.Backend, error) {
backend, _, err := NewVariant(ctx, "sqlite3", dataSourceName, connectionPoolConfig)
if err != nil {
return nil, err
}

return backend, err
}

func NewVariant(ctx context.Context, driverName, dataSourceName string) (server.Backend, *generic.Generic, error) {
func NewVariant(ctx context.Context, driverName, dataSourceName string, connectionPoolConfig *generic.ConnectionPoolConfig) (server.Backend, *generic.Generic, error) {
const retryAttempts = 300

opts, err := parseOpts(dataSourceName)
Expand All @@ -65,7 +65,7 @@ func NewVariant(ctx context.Context, driverName, dataSourceName string) (server.
dataSourceName = "./db/state.db?_journal=WAL&cache=shared"
}

dialect, err := generic.Open(ctx, driverName, opts.dsn, "?", false)
dialect, err := generic.Open(ctx, driverName, opts.dsn, connectionPoolConfig, "?", false)
if err != nil {
return nil, nil, err
}
Expand Down
10 changes: 9 additions & 1 deletion pkg/kine/drivers/sqlite/sqlite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@ import (
"database/sql"
"path"
"testing"
"time"

"github.com/canonical/k8s-dqlite/pkg/kine/drivers/generic"
"github.com/canonical/k8s-dqlite/pkg/kine/drivers/sqlite"
"github.com/sirupsen/logrus"
)
Expand Down Expand Up @@ -51,7 +53,13 @@ func TestMigration(t *testing.T) {
}

ctx := context.Background()
if _, err := sqlite.New(ctx, dbPath); err != nil {
connPoolConfig := generic.ConnectionPoolConfig{
MaxIdle: 5,
MaxOpen: 5,
MaxLifetime: 60 * time.Second,
MaxIdleTime: 0 * time.Second,
}
if _, err := sqlite.New(ctx, dbPath, &connPoolConfig); err != nil {
t.Fatal(err)
}

Expand Down
12 changes: 7 additions & 5 deletions pkg/kine/endpoint/endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"strings"

"github.com/canonical/k8s-dqlite/pkg/kine/drivers/dqlite"
"github.com/canonical/k8s-dqlite/pkg/kine/drivers/generic"
"github.com/canonical/k8s-dqlite/pkg/kine/drivers/sqlite"
"github.com/canonical/k8s-dqlite/pkg/kine/server"
"github.com/canonical/k8s-dqlite/pkg/kine/tls"
Expand All @@ -26,9 +27,10 @@ const (
)

type Config struct {
GRPCServer *grpc.Server
Listener string
Endpoint string
GRPCServer *grpc.Server
Listener string
Endpoint string
ConnectionPoolConfig generic.ConnectionPoolConfig

tls.Config
}
Expand Down Expand Up @@ -180,9 +182,9 @@ func getKineStorageBackend(ctx context.Context, driver, dsn string, cfg Config)
switch driver {
case SQLiteBackend:
leaderElect = false
backend, err = sqlite.New(ctx, dsn)
backend, err = sqlite.New(ctx, dsn, &cfg.ConnectionPoolConfig)
case DQLiteBackend:
backend, err = dqlite.New(ctx, dsn, cfg.Config)
backend, err = dqlite.New(ctx, dsn, cfg.Config, &cfg.ConnectionPoolConfig)
default:
return false, nil, fmt.Errorf("storage backend is not defined")
}
Expand Down
5 changes: 4 additions & 1 deletion pkg/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/canonical/go-dqlite"
"github.com/canonical/go-dqlite/app"
"github.com/canonical/go-dqlite/client"
"github.com/canonical/k8s-dqlite/pkg/kine/drivers/generic"
"github.com/canonical/k8s-dqlite/pkg/kine/endpoint"
"github.com/canonical/k8s-dqlite/pkg/kine/server"
kine_tls "github.com/canonical/k8s-dqlite/pkg/kine/tls"
Expand Down Expand Up @@ -69,6 +70,7 @@ func New(
admissionControlPolicy string,
admissionControlPolicyLimitMaxConcurrentTxn int64,
admissionControlOnlyWriteQueries bool,
connectionPoolConfig generic.ConnectionPoolConfig,
watchQueryTimeout time.Duration,
) (*Server, error) {
var (
Expand Down Expand Up @@ -221,7 +223,8 @@ func New(
}
options = append(options, app.WithTLS(listen, dial))
}

// set datastore connection pool options
kineConfig.ConnectionPoolConfig = connectionPoolConfig
// handle tuning parameters
if exists, err := fileExists(dir, "tuning.yaml"); err != nil {
return nil, fmt.Errorf("failed to check for tuning.yaml: %w", err)
Expand Down
Loading