Skip to content

Commit

Permalink
Merge pull request #105 from canonical/KU-961/embedded
Browse files Browse the repository at this point in the history
Initial implementation for embedded datastore
  • Loading branch information
neoaggelos committed Jun 24, 2024
2 parents 126d8fd + bd6307b commit 6f4a785
Show file tree
Hide file tree
Showing 14 changed files with 495 additions and 21 deletions.
34 changes: 34 additions & 0 deletions cmd/dbctl/dbctl.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package dbctl

import (
"os"
"path/filepath"

"github.com/spf13/cobra"
)

var (
flagStorageDir string

Command = &cobra.Command{
Use: "dbctl",
Short: "Interact with the embedded datastore",
}
)

func init() {
// convenient default
defaultStorageDir := os.Getenv("STORAGE_DIR")
if defaultStorageDir == "" {
snapCommon := os.Getenv("SNAP_COMMON")
if snapCommon == "" {
snapCommon = "/var/snap/k8s/common"
}
defaultStorageDir = filepath.Join(snapCommon, "var", "lib", "k8s-dqlite")
}

Command.PersistentFlags().StringVar(&flagStorageDir, "storage-dir", defaultStorageDir, "k8s-dqlite state directory")

Command.AddCommand(memberCmd)
Command.AddCommand(snapshotCmd)
}
84 changes: 84 additions & 0 deletions cmd/dbctl/member.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
package dbctl

import (
"context"
"fmt"
"strconv"

"github.com/spf13/cobra"
clientv3 "go.etcd.io/etcd/client/v3"
)

var (
flagPeerURL string

memberCmd = &cobra.Command{
Use: "member",
Short: "Manage cluster members",
}

memberListCmd = &cobra.Command{
Use: "list",
Short: "List cluster members",
SilenceUsage: true,
RunE: command(func(ctx context.Context, client *clientv3.Client, args []string) (any, error) {
return client.MemberList(ctx)
}),
}

memberAddCmd = &cobra.Command{
Use: "add [peerURL ...]",
Short: "Add a new cluster member",
SilenceUsage: true,
Args: cobra.MinimumNArgs(1),
RunE: command(func(ctx context.Context, client *clientv3.Client, args []string) (any, error) {
return client.MemberAdd(ctx, args)
}),
}

memberRemoveCmd = &cobra.Command{
Use: "remove [memberID]",
Short: "Remove a cluster member",
SilenceUsage: true,
Args: cobra.MaximumNArgs(1),
RunE: command(func(ctx context.Context, client *clientv3.Client, args []string) (any, error) {
switch {
case flagPeerURL == "" && len(args) == 0:
return nil, fmt.Errorf("specify member ID as argument, or --peer-url flag")
case flagPeerURL != "" && len(args) == 1:
return nil, fmt.Errorf("--peer-url flag not allowed when a member ID is specified")
case len(args) == 1:
id, err := strconv.ParseUint(args[0], 10, 64)
if err != nil {
return nil, fmt.Errorf("cannot parse member ID %q: %w", args[0], err)
}
return client.MemberRemove(ctx, id)
default:
members, err := client.MemberList(ctx)
if err != nil {
return nil, fmt.Errorf("failed to retrieve list of cluster members: %w", err)
}
for _, member := range members.Members {
for _, url := range member.PeerURLs {
if url == flagPeerURL {
return client.MemberRemove(ctx, member.ID)
}
}
}
return nil, fmt.Errorf("cluster member not found")
}
}),
}
)

func init() {
// member list
memberCmd.AddCommand(memberListCmd)

// member add
memberCmd.AddCommand(memberAddCmd)

// member remove
memberRemoveCmd.Flags().StringVar(&flagPeerURL, "peer-url", "", "remove cluster member with a matching peer URL")
memberCmd.AddCommand(memberRemoveCmd)
}
45 changes: 45 additions & 0 deletions cmd/dbctl/snapshot.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package dbctl

import (
"context"
"fmt"
"io"
"os"

"github.com/spf13/cobra"
clientv3 "go.etcd.io/etcd/client/v3"
)

var (
snapshotCmd = &cobra.Command{
Use: "snapshot",
Short: "Manage cluster snapshots",
}

snapshotSaveCmd = &cobra.Command{
Use: "save [backup.db]",
Short: "Save a snapshot of the cluster",
SilenceUsage: true,
Args: cobra.ExactArgs(1),
RunE: command(func(ctx context.Context, client *clientv3.Client, args []string) (any, error) {
reader, err := client.Snapshot(ctx)
if err != nil {
return nil, fmt.Errorf("failed to request snapshot: %w", err)
}
b, err := io.ReadAll(reader)
if err != nil {
return nil, fmt.Errorf("failed to retrieve snapshot: %w", err)
}
if err := os.WriteFile(args[0], b, 0600); err != nil {
return nil, fmt.Errorf("failed to write snapshot to %q: %w", args[0], err)
}
return map[string]any{"size": len(b), "file": args[0]}, nil
}),
}
)

func init() {
// snapshot save
snapshotCmd.AddCommand(snapshotSaveCmd)

}
42 changes: 42 additions & 0 deletions cmd/dbctl/util.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package dbctl

import (
"context"
"encoding/json"
"fmt"

"github.com/canonical/k8s-dqlite/pkg/etcd"
"github.com/spf13/cobra"
clientv3 "go.etcd.io/etcd/client/v3"
)

func newEtcdClient(storageDir string) (*clientv3.Client, error) {
instance, err := etcd.New(storageDir)
if err != nil {
return nil, fmt.Errorf("failed to initialize instance: %w", err)
}
return instance.NewLocalClient()
}

func jsonOutput(i any) error {
b, err := json.MarshalIndent(i, "", " ")
if err != nil {
return fmt.Errorf("failed to format JSON output: %w", err)
}
fmt.Println(string(b))
return nil
}

func command(f func(context.Context, *clientv3.Client, []string) (any, error)) func(cmd *cobra.Command, args []string) error {
return func(cmd *cobra.Command, args []string) error {
client, err := newEtcdClient(flagStorageDir)
if err != nil {
return fmt.Errorf("failed to initialize etcd client: %w", err)
}
resp, err := f(cmd.Context(), client, args)
if err != nil {
return fmt.Errorf("command failed: %w", err)
}
return jsonOutput(resp)
}
}
54 changes: 34 additions & 20 deletions cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,14 @@ package cmd

import (
"context"
"fmt"
"net/http"
_ "net/http/pprof"
"os"
"os/signal"
"time"

"github.com/canonical/k8s-dqlite/cmd/dbctl"
"github.com/canonical/k8s-dqlite/pkg/etcd"
"github.com/canonical/k8s-dqlite/pkg/server"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/sirupsen/logrus"
Expand Down Expand Up @@ -37,6 +38,8 @@ var (
admissionControlPolicy string
acpLimitMaxConcurrentTxn int64
acpOnlyWriteQueries bool

etcdMode bool
}

rootCmd = &cobra.Command{
Expand Down Expand Up @@ -66,26 +69,35 @@ var (
}()
}

server, err := server.New(
rootCmdOpts.dir,
rootCmdOpts.listen,
rootCmdOpts.tls,
rootCmdOpts.diskMode,
rootCmdOpts.clientSessionCacheSize,
rootCmdOpts.minTLSVersion,
rootCmdOpts.watchAvailableStorageInterval,
rootCmdOpts.watchAvailableStorageMinBytes,
rootCmdOpts.lowAvailableStorageAction,
rootCmdOpts.admissionControlPolicy,
rootCmdOpts.acpLimitMaxConcurrentTxn,
rootCmdOpts.acpOnlyWriteQueries,
var (
instance server.Instance
err error
)
if err != nil {
logrus.WithError(err).Fatal("Failed to create server")
if rootCmdOpts.etcdMode {
if instance, err = etcd.New(rootCmdOpts.dir); err != nil {
logrus.WithError(err).Fatal("Failed to create etcd server")
}
} else {
if instance, err = server.New(
rootCmdOpts.dir,
rootCmdOpts.listen,
rootCmdOpts.tls,
rootCmdOpts.diskMode,
rootCmdOpts.clientSessionCacheSize,
rootCmdOpts.minTLSVersion,
rootCmdOpts.watchAvailableStorageInterval,
rootCmdOpts.watchAvailableStorageMinBytes,
rootCmdOpts.lowAvailableStorageAction,
rootCmdOpts.admissionControlPolicy,
rootCmdOpts.acpLimitMaxConcurrentTxn,
rootCmdOpts.acpOnlyWriteQueries,
); err != nil {
logrus.WithError(err).Fatal("Failed to create server")
}
}

ctx, cancel := context.WithCancel(cmd.Context())
if err := server.Start(ctx); err != nil {
if err := instance.Start(ctx); err != nil {
logrus.WithError(err).Fatal("Server failed to start")
}

Expand All @@ -98,15 +110,15 @@ var (

select {
case <-ch:
case <-server.MustStop():
case <-instance.MustStop():
}
cancel()

// Create a separate context with 30 seconds to cleanup
stopCtx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()

if err := server.Shutdown(stopCtx); err != nil {
if err := instance.Shutdown(stopCtx); err != nil {
logrus.WithError(err).Fatal("Failed to shutdown server")
}
},
Expand All @@ -117,7 +129,6 @@ var (
// This is called by main.main(). It only needs to happen once to the liteCmd.
func Execute() {
if err := rootCmd.Execute(); err != nil {
fmt.Println(err)
os.Exit(1)
}
}
Expand All @@ -141,4 +152,7 @@ func init() {
// TODO(MK-1408): This value is highly dependend on underlying hardware, thus making the default value a bit useless. The linked card will implement a dynamic way to set this value.
rootCmd.Flags().Int64Var(&rootCmdOpts.acpLimitMaxConcurrentTxn, "admission-control-policy-limit-max-concurrent-transactions", 300, "Maximum number of transactions that are allowed to run concurrently. Transactions will not be admitted after the limit is reached.")
rootCmd.Flags().BoolVar(&rootCmdOpts.acpOnlyWriteQueries, "admission-control-only-for-write-queries", false, "If set, admission control will only be applied to write queries.")
rootCmd.Flags().BoolVar(&rootCmdOpts.etcdMode, "etcd-mode", false, "Run in etcd mode")

rootCmd.AddCommand(dbctl.Command)
}
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ require (
go.etcd.io/etcd/client/pkg/v3 v3.5.12
go.etcd.io/etcd/client/v3 v3.5.12
go.etcd.io/etcd/server/v3 v3.5.12
go.uber.org/zap v1.27.0
golang.org/x/sync v0.6.0
golang.org/x/sys v0.18.0
google.golang.org/grpc v1.62.1
Expand Down Expand Up @@ -66,7 +67,6 @@ require (
go.opentelemetry.io/otel/trace v1.24.0 // indirect
go.opentelemetry.io/proto/otlp v1.1.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
go.uber.org/zap v1.27.0 // indirect
golang.org/x/crypto v0.21.0 // indirect
golang.org/x/net v0.22.0 // indirect
golang.org/x/text v0.14.0 // indirect
Expand Down
16 changes: 16 additions & 0 deletions pkg/etcd/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package etcd

import (
clientv3 "go.etcd.io/etcd/client/v3"
)

func (e *etcd) NewClient() (*clientv3.Client, error) {
return clientv3.New(e.clientConfig)
}

func (e *etcd) NewLocalClient() (*clientv3.Client, error) {
return clientv3.New(clientv3.Config{
Endpoints: []string{e.config.AdvertiseClientUrls[0].String()},
TLS: e.clientConfig.TLS.Clone(),
})
}
28 changes: 28 additions & 0 deletions pkg/etcd/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package etcd

import (
"fmt"
"os"
"path/filepath"

"gopkg.in/yaml.v2"
)

type registerConfig struct {
ClientURLs []string `yaml:"client-urls,omitempty"`
PeerURL string `yaml:"peer-url,omitempty"`
TrustedCAFile string `yaml:"trusted-ca-file,omitempty"`
CertFile string `yaml:"cert-file,omitempty"`
KeyFile string `yaml:"key-file,omitempty"`
}

func fileUnmarshal(v interface{}, path ...string) error {
b, err := os.ReadFile(filepath.Join(path...))
if err != nil {
return fmt.Errorf("failed to read file contents: %w", err)
}
if err := yaml.Unmarshal(b, v); err != nil {
return fmt.Errorf("failed to parse as yaml: %w", err)
}
return nil
}
Loading

0 comments on commit 6f4a785

Please sign in to comment.