Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Initial implementation for embedded datastore #105

Merged
merged 18 commits into from
Jun 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 34 additions & 0 deletions cmd/dbctl/dbctl.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package dbctl

import (
"os"
"path/filepath"

"github.com/spf13/cobra"
)

var (
flagStorageDir string

Command = &cobra.Command{
Use: "dbctl",
Short: "Interact with the embedded datastore",
}
)

func init() {
// convenient default
defaultStorageDir := os.Getenv("STORAGE_DIR")
if defaultStorageDir == "" {
snapCommon := os.Getenv("SNAP_COMMON")
if snapCommon == "" {
snapCommon = "/var/snap/k8s/common"
}
defaultStorageDir = filepath.Join(snapCommon, "var", "lib", "k8s-dqlite")
}

Command.PersistentFlags().StringVar(&flagStorageDir, "storage-dir", defaultStorageDir, "k8s-dqlite state directory")

Command.AddCommand(memberCmd)
Command.AddCommand(snapshotCmd)
}
84 changes: 84 additions & 0 deletions cmd/dbctl/member.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
package dbctl

import (
"context"
"fmt"
"strconv"

"github.com/spf13/cobra"
clientv3 "go.etcd.io/etcd/client/v3"
)

var (
flagPeerURL string

memberCmd = &cobra.Command{
Use: "member",
Short: "Manage cluster members",
}

memberListCmd = &cobra.Command{
Use: "list",
Short: "List cluster members",
SilenceUsage: true,
RunE: command(func(ctx context.Context, client *clientv3.Client, args []string) (any, error) {
return client.MemberList(ctx)
}),
}

memberAddCmd = &cobra.Command{
Use: "add [peerURL ...]",
Short: "Add a new cluster member",
SilenceUsage: true,
Args: cobra.MinimumNArgs(1),
RunE: command(func(ctx context.Context, client *clientv3.Client, args []string) (any, error) {
return client.MemberAdd(ctx, args)
}),
}

memberRemoveCmd = &cobra.Command{
Use: "remove [memberID]",
Short: "Remove a cluster member",
SilenceUsage: true,
Args: cobra.MaximumNArgs(1),
RunE: command(func(ctx context.Context, client *clientv3.Client, args []string) (any, error) {
switch {
case flagPeerURL == "" && len(args) == 0:
return nil, fmt.Errorf("specify member ID as argument, or --peer-url flag")
case flagPeerURL != "" && len(args) == 1:
return nil, fmt.Errorf("--peer-url flag not allowed when a member ID is specified")
case len(args) == 1:
id, err := strconv.ParseUint(args[0], 10, 64)
if err != nil {
return nil, fmt.Errorf("cannot parse member ID %q: %w", args[0], err)
}
return client.MemberRemove(ctx, id)
default:
members, err := client.MemberList(ctx)
if err != nil {
return nil, fmt.Errorf("failed to retrieve list of cluster members: %w", err)
}
for _, member := range members.Members {
for _, url := range member.PeerURLs {
if url == flagPeerURL {
return client.MemberRemove(ctx, member.ID)
}
}
}
return nil, fmt.Errorf("cluster member not found")
}
}),
}
)

func init() {
// member list
memberCmd.AddCommand(memberListCmd)

// member add
memberCmd.AddCommand(memberAddCmd)

// member remove
memberRemoveCmd.Flags().StringVar(&flagPeerURL, "peer-url", "", "remove cluster member with a matching peer URL")
memberCmd.AddCommand(memberRemoveCmd)
}
45 changes: 45 additions & 0 deletions cmd/dbctl/snapshot.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package dbctl

import (
"context"
"fmt"
"io"
"os"

"github.com/spf13/cobra"
clientv3 "go.etcd.io/etcd/client/v3"
)

var (
snapshotCmd = &cobra.Command{
Use: "snapshot",
Short: "Manage cluster snapshots",
}

snapshotSaveCmd = &cobra.Command{
Use: "save [backup.db]",
Short: "Save a snapshot of the cluster",
SilenceUsage: true,
Args: cobra.ExactArgs(1),
RunE: command(func(ctx context.Context, client *clientv3.Client, args []string) (any, error) {
reader, err := client.Snapshot(ctx)
if err != nil {
return nil, fmt.Errorf("failed to request snapshot: %w", err)
}
b, err := io.ReadAll(reader)
if err != nil {
return nil, fmt.Errorf("failed to retrieve snapshot: %w", err)
}
if err := os.WriteFile(args[0], b, 0600); err != nil {
return nil, fmt.Errorf("failed to write snapshot to %q: %w", args[0], err)
}
return map[string]any{"size": len(b), "file": args[0]}, nil
}),
}
)

func init() {
// snapshot save
snapshotCmd.AddCommand(snapshotSaveCmd)

}
42 changes: 42 additions & 0 deletions cmd/dbctl/util.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package dbctl

import (
"context"
"encoding/json"
"fmt"

"github.com/canonical/k8s-dqlite/pkg/etcd"
"github.com/spf13/cobra"
clientv3 "go.etcd.io/etcd/client/v3"
)

func newEtcdClient(storageDir string) (*clientv3.Client, error) {
instance, err := etcd.New(storageDir)
if err != nil {
return nil, fmt.Errorf("failed to initialize instance: %w", err)
}
return instance.NewLocalClient()
}

func jsonOutput(i any) error {
b, err := json.MarshalIndent(i, "", " ")
if err != nil {
return fmt.Errorf("failed to format JSON output: %w", err)
}
fmt.Println(string(b))
return nil
}

func command(f func(context.Context, *clientv3.Client, []string) (any, error)) func(cmd *cobra.Command, args []string) error {
return func(cmd *cobra.Command, args []string) error {
client, err := newEtcdClient(flagStorageDir)
if err != nil {
return fmt.Errorf("failed to initialize etcd client: %w", err)
}
resp, err := f(cmd.Context(), client, args)
if err != nil {
return fmt.Errorf("command failed: %w", err)
}
return jsonOutput(resp)
}
}
54 changes: 34 additions & 20 deletions cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,14 @@ package cmd

import (
"context"
"fmt"
"net/http"
_ "net/http/pprof"
"os"
"os/signal"
"time"

"github.com/canonical/k8s-dqlite/cmd/dbctl"
"github.com/canonical/k8s-dqlite/pkg/etcd"
"github.com/canonical/k8s-dqlite/pkg/server"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/sirupsen/logrus"
Expand Down Expand Up @@ -37,6 +38,8 @@ var (
admissionControlPolicy string
acpLimitMaxConcurrentTxn int64
acpOnlyWriteQueries bool

etcdMode bool
}

rootCmd = &cobra.Command{
Expand Down Expand Up @@ -66,26 +69,35 @@ var (
}()
}

server, err := server.New(
rootCmdOpts.dir,
rootCmdOpts.listen,
rootCmdOpts.tls,
rootCmdOpts.diskMode,
rootCmdOpts.clientSessionCacheSize,
rootCmdOpts.minTLSVersion,
rootCmdOpts.watchAvailableStorageInterval,
rootCmdOpts.watchAvailableStorageMinBytes,
rootCmdOpts.lowAvailableStorageAction,
rootCmdOpts.admissionControlPolicy,
rootCmdOpts.acpLimitMaxConcurrentTxn,
rootCmdOpts.acpOnlyWriteQueries,
var (
instance server.Instance
err error
)
if err != nil {
logrus.WithError(err).Fatal("Failed to create server")
if rootCmdOpts.etcdMode {
if instance, err = etcd.New(rootCmdOpts.dir); err != nil {
logrus.WithError(err).Fatal("Failed to create etcd server")
}
} else {
if instance, err = server.New(
rootCmdOpts.dir,
rootCmdOpts.listen,
rootCmdOpts.tls,
rootCmdOpts.diskMode,
rootCmdOpts.clientSessionCacheSize,
rootCmdOpts.minTLSVersion,
rootCmdOpts.watchAvailableStorageInterval,
rootCmdOpts.watchAvailableStorageMinBytes,
rootCmdOpts.lowAvailableStorageAction,
rootCmdOpts.admissionControlPolicy,
rootCmdOpts.acpLimitMaxConcurrentTxn,
rootCmdOpts.acpOnlyWriteQueries,
); err != nil {
logrus.WithError(err).Fatal("Failed to create server")
}
}

ctx, cancel := context.WithCancel(cmd.Context())
if err := server.Start(ctx); err != nil {
if err := instance.Start(ctx); err != nil {
logrus.WithError(err).Fatal("Server failed to start")
}

Expand All @@ -98,15 +110,15 @@ var (

select {
case <-ch:
case <-server.MustStop():
case <-instance.MustStop():
}
cancel()

// Create a separate context with 30 seconds to cleanup
stopCtx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()

if err := server.Shutdown(stopCtx); err != nil {
if err := instance.Shutdown(stopCtx); err != nil {
logrus.WithError(err).Fatal("Failed to shutdown server")
}
},
Expand All @@ -117,7 +129,6 @@ var (
// This is called by main.main(). It only needs to happen once to the liteCmd.
func Execute() {
if err := rootCmd.Execute(); err != nil {
fmt.Println(err)
os.Exit(1)
}
}
Expand All @@ -141,4 +152,7 @@ func init() {
// TODO(MK-1408): This value is highly dependend on underlying hardware, thus making the default value a bit useless. The linked card will implement a dynamic way to set this value.
rootCmd.Flags().Int64Var(&rootCmdOpts.acpLimitMaxConcurrentTxn, "admission-control-policy-limit-max-concurrent-transactions", 300, "Maximum number of transactions that are allowed to run concurrently. Transactions will not be admitted after the limit is reached.")
rootCmd.Flags().BoolVar(&rootCmdOpts.acpOnlyWriteQueries, "admission-control-only-for-write-queries", false, "If set, admission control will only be applied to write queries.")
rootCmd.Flags().BoolVar(&rootCmdOpts.etcdMode, "etcd-mode", false, "Run in etcd mode")

rootCmd.AddCommand(dbctl.Command)
}
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ require (
go.etcd.io/etcd/client/pkg/v3 v3.5.12
go.etcd.io/etcd/client/v3 v3.5.12
go.etcd.io/etcd/server/v3 v3.5.12
go.uber.org/zap v1.27.0
golang.org/x/sync v0.6.0
golang.org/x/sys v0.18.0
google.golang.org/grpc v1.62.1
Expand Down Expand Up @@ -66,7 +67,6 @@ require (
go.opentelemetry.io/otel/trace v1.24.0 // indirect
go.opentelemetry.io/proto/otlp v1.1.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
go.uber.org/zap v1.27.0 // indirect
golang.org/x/crypto v0.21.0 // indirect
golang.org/x/net v0.22.0 // indirect
golang.org/x/text v0.14.0 // indirect
Expand Down
16 changes: 16 additions & 0 deletions pkg/etcd/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package etcd

import (
clientv3 "go.etcd.io/etcd/client/v3"
)

func (e *etcd) NewClient() (*clientv3.Client, error) {
return clientv3.New(e.clientConfig)
}

func (e *etcd) NewLocalClient() (*clientv3.Client, error) {
return clientv3.New(clientv3.Config{
Endpoints: []string{e.config.AdvertiseClientUrls[0].String()},
TLS: e.clientConfig.TLS.Clone(),
})
}
28 changes: 28 additions & 0 deletions pkg/etcd/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package etcd

import (
"fmt"
"os"
"path/filepath"

"gopkg.in/yaml.v2"
)

type registerConfig struct {
ClientURLs []string `yaml:"client-urls,omitempty"`
PeerURL string `yaml:"peer-url,omitempty"`
TrustedCAFile string `yaml:"trusted-ca-file,omitempty"`
CertFile string `yaml:"cert-file,omitempty"`
KeyFile string `yaml:"key-file,omitempty"`
}

func fileUnmarshal(v interface{}, path ...string) error {
b, err := os.ReadFile(filepath.Join(path...))
if err != nil {
return fmt.Errorf("failed to read file contents: %w", err)
}
if err := yaml.Unmarshal(b, v); err != nil {
return fmt.Errorf("failed to parse as yaml: %w", err)
}
return nil
}
Loading
Loading