diff --git a/cmd/dbctl/dbctl.go b/cmd/dbctl/dbctl.go new file mode 100644 index 00000000..3c27fa1f --- /dev/null +++ b/cmd/dbctl/dbctl.go @@ -0,0 +1,34 @@ +package dbctl + +import ( + "os" + "path/filepath" + + "github.com/spf13/cobra" +) + +var ( + flagStorageDir string + + Command = &cobra.Command{ + Use: "dbctl", + Short: "Interact with the embedded datastore", + } +) + +func init() { + // convenient default + defaultStorageDir := os.Getenv("STORAGE_DIR") + if defaultStorageDir == "" { + snapCommon := os.Getenv("SNAP_COMMON") + if snapCommon == "" { + snapCommon = "/var/snap/k8s/common" + } + defaultStorageDir = filepath.Join(snapCommon, "var", "lib", "k8s-dqlite") + } + + Command.PersistentFlags().StringVar(&flagStorageDir, "storage-dir", defaultStorageDir, "k8s-dqlite state directory") + + Command.AddCommand(memberCmd) + Command.AddCommand(snapshotCmd) +} diff --git a/cmd/dbctl/member.go b/cmd/dbctl/member.go new file mode 100644 index 00000000..1e73c4a0 --- /dev/null +++ b/cmd/dbctl/member.go @@ -0,0 +1,84 @@ +package dbctl + +import ( + "context" + "fmt" + "strconv" + + "github.com/spf13/cobra" + clientv3 "go.etcd.io/etcd/client/v3" +) + +var ( + flagPeerURL string + + memberCmd = &cobra.Command{ + Use: "member", + Short: "Manage cluster members", + } + + memberListCmd = &cobra.Command{ + Use: "list", + Short: "List cluster members", + SilenceUsage: true, + RunE: command(func(ctx context.Context, client *clientv3.Client, args []string) (any, error) { + return client.MemberList(ctx) + }), + } + + memberAddCmd = &cobra.Command{ + Use: "add [peerURL ...]", + Short: "Add a new cluster member", + SilenceUsage: true, + Args: cobra.MinimumNArgs(1), + RunE: command(func(ctx context.Context, client *clientv3.Client, args []string) (any, error) { + return client.MemberAdd(ctx, args) + }), + } + + memberRemoveCmd = &cobra.Command{ + Use: "remove [memberID]", + Short: "Remove a cluster member", + SilenceUsage: true, + Args: cobra.MaximumNArgs(1), + RunE: 
command(func(ctx context.Context, client *clientv3.Client, args []string) (any, error) { + switch { + case flagPeerURL == "" && len(args) == 0: + return nil, fmt.Errorf("specify member ID as argument, or --peer-url flag") + case flagPeerURL != "" && len(args) == 1: + return nil, fmt.Errorf("--peer-url flag not allowed when a member ID is specified") + case len(args) == 1: + id, err := strconv.ParseUint(args[0], 10, 64) + if err != nil { + return nil, fmt.Errorf("cannot parse member ID %q: %w", args[0], err) + } + return client.MemberRemove(ctx, id) + default: + members, err := client.MemberList(ctx) + if err != nil { + return nil, fmt.Errorf("failed to retrieve list of cluster members: %w", err) + } + for _, member := range members.Members { + for _, url := range member.PeerURLs { + if url == flagPeerURL { + return client.MemberRemove(ctx, member.ID) + } + } + } + return nil, fmt.Errorf("cluster member not found") + } + }), + } +) + +func init() { + // member list + memberCmd.AddCommand(memberListCmd) + + // member add + memberCmd.AddCommand(memberAddCmd) + + // member remove + memberRemoveCmd.Flags().StringVar(&flagPeerURL, "peer-url", "", "remove cluster member with a matching peer URL") + memberCmd.AddCommand(memberRemoveCmd) +} diff --git a/cmd/dbctl/snapshot.go b/cmd/dbctl/snapshot.go new file mode 100644 index 00000000..a0edafff --- /dev/null +++ b/cmd/dbctl/snapshot.go @@ -0,0 +1,45 @@ +package dbctl + +import ( + "context" + "fmt" + "io" + "os" + + "github.com/spf13/cobra" + clientv3 "go.etcd.io/etcd/client/v3" +) + +var ( + snapshotCmd = &cobra.Command{ + Use: "snapshot", + Short: "Manage cluster snapshots", + } + + snapshotSaveCmd = &cobra.Command{ + Use: "save [backup.db]", + Short: "Save a snapshot of the cluster", + SilenceUsage: true, + Args: cobra.ExactArgs(1), + RunE: command(func(ctx context.Context, client *clientv3.Client, args []string) (any, error) { + reader, err := client.Snapshot(ctx) + if err != nil { + return nil, fmt.Errorf("failed to 
request snapshot: %w", err) + } + b, err := io.ReadAll(reader) + if err != nil { + return nil, fmt.Errorf("failed to retrieve snapshot: %w", err) + } + if err := os.WriteFile(args[0], b, 0600); err != nil { + return nil, fmt.Errorf("failed to write snapshot to %q: %w", args[0], err) + } + return map[string]any{"size": len(b), "file": args[0]}, nil + }), + } +) + +func init() { + // snapshot save + snapshotCmd.AddCommand(snapshotSaveCmd) + +} diff --git a/cmd/dbctl/util.go b/cmd/dbctl/util.go new file mode 100644 index 00000000..f11789de --- /dev/null +++ b/cmd/dbctl/util.go @@ -0,0 +1,42 @@ +package dbctl + +import ( + "context" + "encoding/json" + "fmt" + + "github.com/canonical/k8s-dqlite/pkg/etcd" + "github.com/spf13/cobra" + clientv3 "go.etcd.io/etcd/client/v3" +) + +func newEtcdClient(storageDir string) (*clientv3.Client, error) { + instance, err := etcd.New(storageDir) + if err != nil { + return nil, fmt.Errorf("failed to initialize instance: %w", err) + } + return instance.NewLocalClient() +} + +func jsonOutput(i any) error { + b, err := json.MarshalIndent(i, "", " ") + if err != nil { + return fmt.Errorf("failed to format JSON output: %w", err) + } + fmt.Println(string(b)) + return nil +} + +func command(f func(context.Context, *clientv3.Client, []string) (any, error)) func(cmd *cobra.Command, args []string) error { + return func(cmd *cobra.Command, args []string) error { + client, err := newEtcdClient(flagStorageDir) + if err != nil { + return fmt.Errorf("failed to initialize etcd client: %w", err) + } + resp, err := f(cmd.Context(), client, args) + if err != nil { + return fmt.Errorf("command failed: %w", err) + } + return jsonOutput(resp) + } +} diff --git a/cmd/root.go b/cmd/root.go index c72e7f64..652926a8 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -2,13 +2,14 @@ package cmd import ( "context" - "fmt" "net/http" _ "net/http/pprof" "os" "os/signal" "time" + "github.com/canonical/k8s-dqlite/cmd/dbctl" + "github.com/canonical/k8s-dqlite/pkg/etcd" 
"github.com/canonical/k8s-dqlite/pkg/server" "github.com/prometheus/client_golang/prometheus/promhttp" "github.com/sirupsen/logrus" @@ -37,6 +38,8 @@ var ( admissionControlPolicy string acpLimitMaxConcurrentTxn int64 acpOnlyWriteQueries bool + + etcdMode bool } rootCmd = &cobra.Command{ @@ -66,26 +69,35 @@ var ( }() } - server, err := server.New( - rootCmdOpts.dir, - rootCmdOpts.listen, - rootCmdOpts.tls, - rootCmdOpts.diskMode, - rootCmdOpts.clientSessionCacheSize, - rootCmdOpts.minTLSVersion, - rootCmdOpts.watchAvailableStorageInterval, - rootCmdOpts.watchAvailableStorageMinBytes, - rootCmdOpts.lowAvailableStorageAction, - rootCmdOpts.admissionControlPolicy, - rootCmdOpts.acpLimitMaxConcurrentTxn, - rootCmdOpts.acpOnlyWriteQueries, + var ( + instance server.Instance + err error ) - if err != nil { - logrus.WithError(err).Fatal("Failed to create server") + if rootCmdOpts.etcdMode { + if instance, err = etcd.New(rootCmdOpts.dir); err != nil { + logrus.WithError(err).Fatal("Failed to create etcd server") + } + } else { + if instance, err = server.New( + rootCmdOpts.dir, + rootCmdOpts.listen, + rootCmdOpts.tls, + rootCmdOpts.diskMode, + rootCmdOpts.clientSessionCacheSize, + rootCmdOpts.minTLSVersion, + rootCmdOpts.watchAvailableStorageInterval, + rootCmdOpts.watchAvailableStorageMinBytes, + rootCmdOpts.lowAvailableStorageAction, + rootCmdOpts.admissionControlPolicy, + rootCmdOpts.acpLimitMaxConcurrentTxn, + rootCmdOpts.acpOnlyWriteQueries, + ); err != nil { + logrus.WithError(err).Fatal("Failed to create server") + } } ctx, cancel := context.WithCancel(cmd.Context()) - if err := server.Start(ctx); err != nil { + if err := instance.Start(ctx); err != nil { logrus.WithError(err).Fatal("Server failed to start") } @@ -98,7 +110,7 @@ var ( select { case <-ch: - case <-server.MustStop(): + case <-instance.MustStop(): } cancel() @@ -106,7 +118,7 @@ var ( stopCtx, cancel := context.WithTimeout(context.Background(), 30*time.Second) defer cancel() - if err := 
server.Shutdown(stopCtx); err != nil { + if err := instance.Shutdown(stopCtx); err != nil { logrus.WithError(err).Fatal("Failed to shutdown server") } }, @@ -117,7 +129,6 @@ var ( // This is called by main.main(). It only needs to happen once to the liteCmd. func Execute() { if err := rootCmd.Execute(); err != nil { - fmt.Println(err) os.Exit(1) } } @@ -141,4 +152,7 @@ func init() { // TODO(MK-1408): This value is highly dependend on underlying hardware, thus making the default value a bit useless. The linked card will implement a dynamic way to set this value. rootCmd.Flags().Int64Var(&rootCmdOpts.acpLimitMaxConcurrentTxn, "admission-control-policy-limit-max-concurrent-transactions", 300, "Maximum number of transactions that are allowed to run concurrently. Transactions will not be admitted after the limit is reached.") rootCmd.Flags().BoolVar(&rootCmdOpts.acpOnlyWriteQueries, "admission-control-only-for-write-queries", false, "If set, admission control will only be applied to write queries.") + rootCmd.Flags().BoolVar(&rootCmdOpts.etcdMode, "etcd-mode", false, "Run in etcd mode") + + rootCmd.AddCommand(dbctl.Command) } diff --git a/go.mod b/go.mod index 7a640896..36fc4308 100644 --- a/go.mod +++ b/go.mod @@ -15,6 +15,7 @@ require ( go.etcd.io/etcd/client/pkg/v3 v3.5.12 go.etcd.io/etcd/client/v3 v3.5.12 go.etcd.io/etcd/server/v3 v3.5.12 + go.uber.org/zap v1.27.0 golang.org/x/sync v0.6.0 golang.org/x/sys v0.18.0 google.golang.org/grpc v1.62.1 @@ -66,7 +67,6 @@ require ( go.opentelemetry.io/otel/trace v1.24.0 // indirect go.opentelemetry.io/proto/otlp v1.1.0 // indirect go.uber.org/multierr v1.11.0 // indirect - go.uber.org/zap v1.27.0 // indirect golang.org/x/crypto v0.21.0 // indirect golang.org/x/net v0.22.0 // indirect golang.org/x/text v0.14.0 // indirect diff --git a/pkg/etcd/client.go b/pkg/etcd/client.go new file mode 100644 index 00000000..52a9f0b2 --- /dev/null +++ b/pkg/etcd/client.go @@ -0,0 +1,16 @@ +package etcd + +import ( + clientv3 
"go.etcd.io/etcd/client/v3" +) + +func (e *etcd) NewClient() (*clientv3.Client, error) { + return clientv3.New(e.clientConfig) +} + +func (e *etcd) NewLocalClient() (*clientv3.Client, error) { + return clientv3.New(clientv3.Config{ + Endpoints: []string{e.config.AdvertiseClientUrls[0].String()}, + TLS: e.clientConfig.TLS.Clone(), + }) +} diff --git a/pkg/etcd/config.go b/pkg/etcd/config.go new file mode 100644 index 00000000..8b87ab9b --- /dev/null +++ b/pkg/etcd/config.go @@ -0,0 +1,28 @@ +package etcd + +import ( + "fmt" + "os" + "path/filepath" + + "gopkg.in/yaml.v2" +) + +type registerConfig struct { + ClientURLs []string `yaml:"client-urls,omitempty"` + PeerURL string `yaml:"peer-url,omitempty"` + TrustedCAFile string `yaml:"trusted-ca-file,omitempty"` + CertFile string `yaml:"cert-file,omitempty"` + KeyFile string `yaml:"key-file,omitempty"` +} + +func fileUnmarshal(v interface{}, path ...string) error { + b, err := os.ReadFile(filepath.Join(path...)) + if err != nil { + return fmt.Errorf("failed to read file contents: %w", err) + } + if err := yaml.Unmarshal(b, v); err != nil { + return fmt.Errorf("failed to parse as yaml: %w", err) + } + return nil +} diff --git a/pkg/etcd/etcd.go b/pkg/etcd/etcd.go new file mode 100644 index 00000000..15acd518 --- /dev/null +++ b/pkg/etcd/etcd.go @@ -0,0 +1,65 @@ +package etcd + +import ( + "fmt" + "os" + "path/filepath" + + kine_tls "github.com/canonical/k8s-dqlite/pkg/kine/tls" + "github.com/canonical/k8s-dqlite/pkg/server" + clientv3 "go.etcd.io/etcd/client/v3" + "go.etcd.io/etcd/server/v3/embed" +) + +type etcd struct { + clientConfig clientv3.Config + + peerURL string + sentinelFile string + + config *embed.Config + instance *embed.Etcd + + mustStopCh chan struct{} +} + +func New(storageDir string) (*etcd, error) { + config, err := embed.ConfigFromFile(filepath.Join(storageDir, "etcd.yaml")) + if err != nil { + return nil, fmt.Errorf("failed to load etcd config: %w", err) + } + var registerConfig registerConfig + if 
err := fileUnmarshal(&registerConfig, storageDir, "register.yaml"); err != nil { + return nil, fmt.Errorf("failed to load register config: %w", err) + } + + tlsConfig, err := kine_tls.Config{ + CAFile: registerConfig.TrustedCAFile, + CertFile: registerConfig.CertFile, + KeyFile: registerConfig.KeyFile, + }.ClientConfig() + if err != nil { + return nil, fmt.Errorf("failed to initialize client TLS config: %w", err) + } + + if err := os.MkdirAll(config.Dir, 0700); err != nil { + return nil, fmt.Errorf("failed to ensure data directory: %w", err) + } + + return &etcd{ + config: config, + clientConfig: clientv3.Config{ + Endpoints: registerConfig.ClientURLs, + TLS: tlsConfig, + }, + peerURL: registerConfig.PeerURL, + sentinelFile: filepath.Join(config.Dir, "sentinel"), + mustStopCh: make(chan struct{}, 1), + }, nil +} + +func (e *etcd) MustStop() <-chan struct{} { + return e.mustStopCh +} + +var _ server.Instance = &etcd{} diff --git a/pkg/etcd/register.go b/pkg/etcd/register.go new file mode 100644 index 00000000..77b62004 --- /dev/null +++ b/pkg/etcd/register.go @@ -0,0 +1,81 @@ +package etcd + +import ( + "context" + "fmt" + "os" + "strings" + + clientv3 "go.etcd.io/etcd/client/v3" +) + +func (e *etcd) hasValidSentinelFile() (bool, error) { + b, err := os.ReadFile(e.sentinelFile) + if err != nil { + if os.IsNotExist(err) { + return false, nil + } + return false, err + } + return strings.TrimSpace(string(b)) == e.peerURL, nil +} + +func (e *etcd) createSentinel() error { + return os.WriteFile(e.sentinelFile, []byte(e.peerURL), 0600) +} + +func (e *etcd) ensurePeerInCluster(ctx context.Context) (string, error) { + if e.config.ClusterState == "new" { + return "", nil + } + + if hasSentinel, err := e.hasValidSentinelFile(); err != nil { + return "", fmt.Errorf("failed to check sentinel file: %w", err) + } else if hasSentinel { + return "", nil + } + + client, err := clientv3.New(e.clientConfig) + if err != nil { + return "", fmt.Errorf("failed to create etcd client: %w", 
err) + } + defer client.Close() + + resp, err := client.Cluster.MemberList(ctx) + if err != nil { + return "", fmt.Errorf("failed to list cluster members: %w", err) + } + + for _, member := range resp.Members { + for _, url := range member.GetPeerURLs() { + if e.peerURL == url { + return "", nil + } + } + } + + members, err := client.Cluster.MemberAdd(ctx, []string{e.peerURL}) + if err != nil { + return "", fmt.Errorf("failed to add cluster member with peerURL %q: %w", e.peerURL, err) + } + + defer func() { + if err := e.createSentinel(); err != nil { + panic(fmt.Sprintf("failed to create sentinel file: %v", err.Error())) + } + }() + + initialClusterParts := make([]string, 0, len(members.Members)) + for _, member := range members.Members { + name := member.GetName() + for _, url := range member.GetPeerURLs() { + if url == e.peerURL { + name = e.config.Name + } + initialClusterParts = append(initialClusterParts, fmt.Sprintf("%s=%s", name, url)) + break + } + } + + return strings.Join(initialClusterParts, ","), nil +} diff --git a/pkg/etcd/shutdown.go b/pkg/etcd/shutdown.go new file mode 100644 index 00000000..ca830350 --- /dev/null +++ b/pkg/etcd/shutdown.go @@ -0,0 +1,22 @@ +package etcd + +import ( + "context" + "fmt" +) + +func (e *etcd) Shutdown(ctx context.Context) error { + if e.instance == nil { + return nil + } + e.instance.Close() + + select { + case <-ctx.Done(): + return fmt.Errorf("timed out waiting for server to stop: %w", ctx.Err()) + case <-e.instance.Server.StopNotify(): + } + + close(e.mustStopCh) + return nil +} diff --git a/pkg/etcd/start.go b/pkg/etcd/start.go new file mode 100644 index 00000000..262d43e9 --- /dev/null +++ b/pkg/etcd/start.go @@ -0,0 +1,32 @@ +package etcd + +import ( + "context" + "fmt" + + "go.etcd.io/etcd/server/v3/embed" + "go.uber.org/zap" +) + +func (e *etcd) Start(ctx context.Context) error { + if initialCluster, err := e.ensurePeerInCluster(ctx); err != nil { + return fmt.Errorf("failed to initialize node: %w", err) + } 
else if initialCluster != "" { + e.config.GetLogger().Info("Set initial cluster", zap.String("initial-cluster", initialCluster)) + e.config.InitialCluster = initialCluster + e.config.ClusterState = "existing" + } + + if instance, err := embed.StartEtcd(e.config); err != nil { + return fmt.Errorf("failed to start node: %w", err) + } else { + e.instance = instance + } + + select { + case <-ctx.Done(): + return fmt.Errorf("etcd did not start in time: %w", ctx.Err()) + case <-e.instance.Server.ReadyNotify(): + return nil + } +} diff --git a/pkg/server/interface.go b/pkg/server/interface.go new file mode 100644 index 00000000..93e96da5 --- /dev/null +++ b/pkg/server/interface.go @@ -0,0 +1,9 @@ +package server + +import "context" + +type Instance interface { + Start(context.Context) error + MustStop() <-chan struct{} + Shutdown(context.Context) error +} diff --git a/pkg/server/server.go b/pkg/server/server.go index 99357db6..7c37e8c4 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -357,3 +357,5 @@ func (s *Server) Shutdown(ctx context.Context) error { close(s.mustStopCh) return nil } + +var _ Instance = &Server{}