From 2a1951fea1ebee9d51fc6470a80ea9a962562ce0 Mon Sep 17 00:00:00 2001 From: Angelos Kolaitis Date: Thu, 13 Jun 2024 22:56:14 +0300 Subject: [PATCH 01/18] implement embedded mode --- cmd/root.go | 49 +++++++++++++++--------- pkg/embedded/config.go | 31 +++++++++++++++ pkg/embedded/embedded.go | 73 +++++++++++++++++++++++++++++++++++ pkg/embedded/register.go | 83 ++++++++++++++++++++++++++++++++++++++++ pkg/embedded/shutdown.go | 22 +++++++++++ pkg/embedded/start.go | 32 ++++++++++++++++ pkg/server/interface.go | 9 +++++ pkg/server/server.go | 2 + 8 files changed, 283 insertions(+), 18 deletions(-) create mode 100644 pkg/embedded/config.go create mode 100644 pkg/embedded/embedded.go create mode 100644 pkg/embedded/register.go create mode 100644 pkg/embedded/shutdown.go create mode 100644 pkg/embedded/start.go create mode 100644 pkg/server/interface.go diff --git a/cmd/root.go b/cmd/root.go index c72e7f64..27a39806 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -9,6 +9,7 @@ import ( "os/signal" "time" + "github.com/canonical/k8s-dqlite/pkg/embedded" "github.com/canonical/k8s-dqlite/pkg/server" "github.com/prometheus/client_golang/prometheus/promhttp" "github.com/sirupsen/logrus" @@ -37,6 +38,8 @@ var ( admissionControlPolicy string acpLimitMaxConcurrentTxn int64 acpOnlyWriteQueries bool + + embeddedMode bool } rootCmd = &cobra.Command{ @@ -66,26 +69,35 @@ var ( }() } - server, err := server.New( - rootCmdOpts.dir, - rootCmdOpts.listen, - rootCmdOpts.tls, - rootCmdOpts.diskMode, - rootCmdOpts.clientSessionCacheSize, - rootCmdOpts.minTLSVersion, - rootCmdOpts.watchAvailableStorageInterval, - rootCmdOpts.watchAvailableStorageMinBytes, - rootCmdOpts.lowAvailableStorageAction, - rootCmdOpts.admissionControlPolicy, - rootCmdOpts.acpLimitMaxConcurrentTxn, - rootCmdOpts.acpOnlyWriteQueries, + var ( + instance server.Instance + err error ) - if err != nil { - logrus.WithError(err).Fatal("Failed to create server") + if rootCmdOpts.embeddedMode { + if instance, err = embedded.New(rootCmdOpts.dir); err != nil { + logrus.WithError(err).Fatal("Failed to create embedded server") + } + } else { + if instance, err = server.New( + rootCmdOpts.dir, + rootCmdOpts.listen, + rootCmdOpts.tls, + rootCmdOpts.diskMode, + rootCmdOpts.clientSessionCacheSize, + rootCmdOpts.minTLSVersion, + rootCmdOpts.watchAvailableStorageInterval, + rootCmdOpts.watchAvailableStorageMinBytes, + rootCmdOpts.lowAvailableStorageAction, + rootCmdOpts.admissionControlPolicy, + rootCmdOpts.acpLimitMaxConcurrentTxn, + rootCmdOpts.acpOnlyWriteQueries, + ); err != nil { + logrus.WithError(err).Fatal("Failed to create server") + } } ctx, cancel := context.WithCancel(cmd.Context()) - if err := server.Start(ctx); err != nil { + if err := instance.Start(ctx); err != nil { logrus.WithError(err).Fatal("Server failed to start") } @@ -98,7 +110,7 @@ var ( select { case <-ch: - case <-server.MustStop(): + case <-instance.MustStop(): } cancel() @@ -106,7 +118,7 @@ var ( stopCtx, cancel := context.WithTimeout(context.Background(), 30*time.Second) defer cancel() - if err := server.Shutdown(stopCtx); err != nil { + if err := instance.Shutdown(stopCtx); err != nil { logrus.WithError(err).Fatal("Failed to shutdown server") } }, @@ -141,4 +153,5 @@ func init() { // TODO(MK-1408): This value is highly dependend on underlying hardware, thus making the default value a bit useless. The linked card will implement a dynamic way to set this value. rootCmd.Flags().Int64Var(&rootCmdOpts.acpLimitMaxConcurrentTxn, "admission-control-policy-limit-max-concurrent-transactions", 300, "Maximum number of transactions that are allowed to run concurrently. Transactions will not be admitted after the limit is reached.") rootCmd.Flags().BoolVar(&rootCmdOpts.acpOnlyWriteQueries, "admission-control-only-for-write-queries", false, "If set, admission control will only be applied to write queries.") + rootCmd.Flags().BoolVar(&rootCmdOpts.embeddedMode, "embedded", false, "Run in embedded mode") } diff --git a/pkg/embedded/config.go b/pkg/embedded/config.go new file mode 100644 index 00000000..ed046af1 --- /dev/null +++ b/pkg/embedded/config.go @@ -0,0 +1,31 @@ +package embedded + +import ( + "fmt" + "os" + "path/filepath" + + "gopkg.in/yaml.v2" +) + +type clusterConfig struct { + ClientURLs []string `yaml:"client-urls,omitempty"` + PeerURL string `yaml:"peer-url,omitempty"` + CAFile string `yaml:"ca-file,omitempty"` + CertFile string `yaml:"cert-file,omitempty"` + KeyFile string `yaml:"key-file,omitempty"` + PeerCAFile string `yaml:"peer-ca-file,omitempty"` + PeerCertFile string `yaml:"peer-cert-file,omitempty"` + PeerKeyFile string `yaml:"peer-key-file,omitempty"` +} + +func fileUnmarshal(v interface{}, path ...string) error { + b, err := os.ReadFile(filepath.Join(path...)) + if err != nil { + return fmt.Errorf("failed to read file contents: %w", err) + } + if err := yaml.Unmarshal(b, v); err != nil { + return fmt.Errorf("failed to parse as yaml: %w", err) + } + return nil +} diff --git a/pkg/embedded/embedded.go b/pkg/embedded/embedded.go new file mode 100644 index 00000000..d7f2f825 --- /dev/null +++ b/pkg/embedded/embedded.go @@ -0,0 +1,73 @@ +package embedded + +import ( + "fmt" + "os" + "path/filepath" + + kine_tls "github.com/canonical/k8s-dqlite/pkg/kine/tls" + "github.com/canonical/k8s-dqlite/pkg/server" + clientv3 "go.etcd.io/etcd/client/v3" + "go.etcd.io/etcd/server/v3/embed" +) + +type embedded struct { + clientConfig clientv3.Config + + peerURL string + sentinelFile string + + config *embed.Config + instance *embed.Etcd + + mustStopCh chan struct{} +} + +func New(storageDir string) (*embedded, error) { + config, err := embed.ConfigFromFile(filepath.Join(storageDir, "embedded.yaml")) + if err != nil { + return nil, fmt.Errorf("failed to load embedded config: %w", err) + } + var clusterConfig clusterConfig + if err := fileUnmarshal(&clusterConfig, storageDir, "config.yaml"); err != nil { + return nil, fmt.Errorf("failed to load cluster config: %w", err) + } + + config.PeerTLSInfo.TrustedCAFile = clusterConfig.PeerCAFile + config.PeerTLSInfo.CertFile = clusterConfig.PeerCertFile + config.PeerTLSInfo.KeyFile = clusterConfig.PeerKeyFile + + config.ClientTLSInfo.TrustedCAFile = clusterConfig.CAFile + config.ClientTLSInfo.CertFile = clusterConfig.CertFile + config.ClientTLSInfo.KeyFile = clusterConfig.KeyFile + + tlsConfig, err := kine_tls.Config{ + CAFile: clusterConfig.CAFile, + CertFile: clusterConfig.CertFile, + KeyFile: clusterConfig.KeyFile, + }.ClientConfig() + if err != nil { + return nil, fmt.Errorf("failed to initialize client TLS config: %w", err) + } + + if err := os.MkdirAll(config.Dir, 0700); err != nil { + return nil, fmt.Errorf("failed to ensure data directory: %w", err) + } + + return &embedded{ + config: config, + clientConfig: clientv3.Config{ + Endpoints: clusterConfig.ClientURLs, + TLS: tlsConfig, + }, + peerURL: clusterConfig.PeerURL, + sentinelFile: filepath.Join(config.Dir, "sentinel"), + mustStopCh: make(chan struct{}, 1), + }, nil +} + +func (e *embedded) MustStop() <-chan struct{} { + return e.mustStopCh +} + +var _ server.Instance = &embedded{} diff --git a/pkg/embedded/register.go b/pkg/embedded/register.go new file mode 100644 index 00000000..0e19ea24 --- /dev/null +++ b/pkg/embedded/register.go @@ -0,0 +1,83 @@ +package embedded + +import ( + "context" + "fmt" + "os" + "strings" + + clientv3 "go.etcd.io/etcd/client/v3" +) + +func (e *embedded) hasValidSentinelFile() (bool, error) { + b, err := os.ReadFile(e.sentinelFile) + if err != nil { + if os.IsNotExist(err) { + return false, nil + } + return false, err + } + return strings.TrimSpace(string(b)) == e.peerURL, nil +} + +func (e *embedded) createSentinel() error { + return os.WriteFile(e.sentinelFile, []byte(e.peerURL), 0600) +} + +func (e *embedded) ensurePeerInCluster(ctx context.Context) (string, error) { + if e.config.ClusterState == "new" { + return "", nil + } + + if hasSentinel, err := e.hasValidSentinelFile(); err != nil { + return "", fmt.Errorf("failed to check sentinel file: %w", err) + } else if hasSentinel { + return "", nil + } + + client, err := clientv3.New(e.clientConfig) + if err != nil { + return "", fmt.Errorf("failed to create etcd client: %w", err) + } + defer client.Close() + + resp, err := client.Cluster.MemberList(ctx) + if err != nil { + return "", fmt.Errorf("failed to list cluster members: %w", err) + } + + for _, member := range resp.Members { + for _, url := range member.GetPeerURLs() { + if e.peerURL == url { + return "", nil + } + } + } + + members, err := client.Cluster.MemberAdd(ctx, []string{e.peerURL}) + if err != nil { + return "", fmt.Errorf("failed to add cluster member with peerURL %q: %w", e.peerURL, err) + } + + defer func() { + if err := e.createSentinel(); err != nil { + panic(fmt.Sprintf("failed to create sentinel file: %w", err.Error())) + } + }() + + initialClusterParts := make([]string, 0, len(members.Members)) + +nextMember: + for _, member := range members.Members { + name := member.GetName() + for _, url := range member.GetPeerURLs() { + if url == e.peerURL { + name = e.config.Name + } + initialClusterParts = append(initialClusterParts, fmt.Sprintf("%s=%s", name, member.GetPeerURLs()[0])) + continue nextMember + } + } + + return strings.Join(initialClusterParts, ","), nil +} diff --git a/pkg/embedded/shutdown.go b/pkg/embedded/shutdown.go new file mode 100644 index 00000000..616e07e6 --- /dev/null +++ b/pkg/embedded/shutdown.go @@ -0,0 +1,22 @@ +package embedded + +import ( + "context" + "fmt" +) + +func (e *embedded) Shutdown(ctx context.Context) error { + if e.instance == nil { + return nil + } + e.instance.Close() + + select { + case <-ctx.Done(): + return fmt.Errorf("timed out waiting for server to stop: %w", ctx.Err()) + case <-e.instance.Server.StopNotify(): + } + + close(e.mustStopCh) + return nil +} diff --git a/pkg/embedded/start.go b/pkg/embedded/start.go new file mode 100644 index 00000000..99537ee6 --- /dev/null +++ b/pkg/embedded/start.go @@ -0,0 +1,32 @@ +package embedded + +import ( + "context" + "fmt" + + "go.etcd.io/etcd/server/v3/embed" + "go.uber.org/zap" +) + +func (e *embedded) Start(ctx context.Context) error { + if initialCluster, err := e.ensurePeerInCluster(ctx); err != nil { + return fmt.Errorf("failed to initialize node: %w", err) + } else if initialCluster != "" { + e.config.GetLogger().Info("Set initial cluster", zap.String("initial-cluster", initialCluster)) + e.config.InitialCluster = initialCluster + e.config.ClusterState = "existing" + } + + if instance, err := embed.StartEtcd(e.config); err != nil { + return fmt.Errorf("failed to start node: %w", err) + } else { + e.instance = instance + } + + select { + case <-ctx.Done(): + return fmt.Errorf("etcd did not start in time: %w", ctx.Err()) + case <-e.instance.Server.ReadyNotify(): + return nil + } +} diff --git a/pkg/server/interface.go b/pkg/server/interface.go new file mode 100644 index 00000000..93e96da5 --- /dev/null +++ b/pkg/server/interface.go @@ -0,0 +1,9 @@ +package server + +import "context" + +type Instance interface { + Start(context.Context) error + MustStop() <-chan struct{} + Shutdown(context.Context) error +} diff --git a/pkg/server/server.go b/pkg/server/server.go index 99357db6..7c37e8c4 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -357,3 +357,5 @@ func (s *Server) Shutdown(ctx context.Context) error { close(s.mustStopCh) return nil } + +var _ Instance = &Server{} From e02e30f22cc192629062cd17ee7d5b04e1b3172c Mon Sep 17 00:00:00 2001 From: Angelos Kolaitis Date: Sat, 15 Jun 2024 17:02:50 +0300 Subject: [PATCH 02/18] adjust URL usage in registration code --- pkg/embedded/register.go | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/pkg/embedded/register.go b/pkg/embedded/register.go index 0e19ea24..d61fba8f 100644 --- a/pkg/embedded/register.go +++ b/pkg/embedded/register.go @@ -66,16 +66,14 @@ func (e *embedded) ensurePeerInCluster(ctx context.Context) (string, error) { }() initialClusterParts := make([]string, 0, len(members.Members)) - -nextMember: for _, member := range members.Members { name := member.GetName() for _, url := range member.GetPeerURLs() { if url == e.peerURL { name = e.config.Name } - initialClusterParts = append(initialClusterParts, fmt.Sprintf("%s=%s", name, member.GetPeerURLs()[0])) - continue nextMember + initialClusterParts = append(initialClusterParts, fmt.Sprintf("%s=%s", name, url)) + break } } From 2888d8145d7b088d226c93955509f8986bc2d472 Mon Sep 17 00:00:00 2001 From: Angelos Kolaitis Date: Sat, 15 Jun 2024 21:47:28 +0300 Subject: [PATCH 03/18] start embeddedctl commands --- cmd/embeddedctl/embedded.go | 21 ++++++++++++++++++++ cmd/embeddedctl/member.go | 23 ++++++++++++++++++++++ cmd/embeddedctl/util.go | 39 +++++++++++++++++++++++++++++++++++++ cmd/root.go | 3 +++ pkg/embedded/client.go | 9 +++++++++ 5 files changed, 95 insertions(+) create mode 100644 cmd/embeddedctl/embedded.go create mode 100644 cmd/embeddedctl/member.go create mode 100644 cmd/embeddedctl/util.go create mode 100644 pkg/embedded/client.go diff --git a/cmd/embeddedctl/embedded.go b/cmd/embeddedctl/embedded.go new file mode 100644 index 00000000..633ad7d8 --- /dev/null +++ b/cmd/embeddedctl/embedded.go @@ -0,0 +1,21 @@ +package embeddedctl + +import ( + "os" + + "github.com/spf13/cobra" +) + +var ( + flagStorageDir string + + Command = &cobra.Command{ + Use: "embeddedctl", + } +) + +func init() { + Command.PersistentFlags().StringVar(&flagStorageDir, "storage-dir", os.Getenv("EMBEDDED_DIR"), "k8s-dqlite storage directory") + + Command.AddCommand(memberCmd) +} diff --git a/cmd/embeddedctl/member.go b/cmd/embeddedctl/member.go new file mode 100644 index 00000000..f88d5ed9 --- /dev/null +++ b/cmd/embeddedctl/member.go @@ -0,0 +1,23 @@ +package embeddedctl + +import ( + "context" + + "github.com/spf13/cobra" + clientv3 "go.etcd.io/etcd/client/v3" +) + +var memberCmd = &cobra.Command{ + Use: "member", +} + +var memberListCmd = &cobra.Command{ + Use: "list", + RunE: newCommand(func(ctx context.Context, client *clientv3.Client) (any, error) { + return client.MemberList(ctx) + }), +} + +func init() { + memberCmd.AddCommand(memberListCmd) +} diff --git a/cmd/embeddedctl/util.go b/cmd/embeddedctl/util.go new file mode 100644 index 00000000..ca74d6e6 --- /dev/null +++ b/cmd/embeddedctl/util.go @@ -0,0 +1,39 @@ +package embeddedctl + +import ( + "context" + "encoding/json" + "fmt" + + "github.com/canonical/k8s-dqlite/pkg/embedded" + "github.com/spf13/cobra" + clientv3 "go.etcd.io/etcd/client/v3" +) + +func newEmbeddedClient(storageDir string) (*clientv3.Client, error) { + instance, err := embedded.New(storageDir) + if err != nil { + return nil, fmt.Errorf("failed to initialize instance: %w", err) + } + return instance.NewClient() +} + +func jsonOutput(i any) error { + b, err := json.MarshalIndent(i, "", " ") + if err != nil { + return fmt.Errorf("failed to format JSON output: %w", err) + } + fmt.Println(string(b)) + return nil +} + +func newCommand(f func(context.Context, *clientv3.Client) (any, error)) func(cmd *cobra.Command, args []string) error { + return func(cmd *cobra.Command, args []string) error { + client, err := newEmbeddedClient(flagStorageDir) + if err != nil { + return fmt.Errorf("failed to initialize embedded client: %w", err) + } + resp, err := f(cmd.Context(), client) + return jsonOutput(resp) + } +} diff --git a/cmd/root.go b/cmd/root.go index 27a39806..beceb525 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -9,6 +9,7 @@ import ( "os/signal" "time" + "github.com/canonical/k8s-dqlite/cmd/embeddedctl" "github.com/canonical/k8s-dqlite/pkg/embedded" "github.com/canonical/k8s-dqlite/pkg/server" "github.com/prometheus/client_golang/prometheus/promhttp" @@ -154,4 +155,6 @@ func init() { rootCmd.Flags().Int64Var(&rootCmdOpts.acpLimitMaxConcurrentTxn, "admission-control-policy-limit-max-concurrent-transactions", 300, "Maximum number of transactions that are allowed to run concurrently. Transactions will not be admitted after the limit is reached.") rootCmd.Flags().BoolVar(&rootCmdOpts.acpOnlyWriteQueries, "admission-control-only-for-write-queries", false, "If set, admission control will only be applied to write queries.") rootCmd.Flags().BoolVar(&rootCmdOpts.embeddedMode, "embedded", false, "Run in embedded mode") + + rootCmd.AddCommand(embeddedctl.Command) } diff --git a/pkg/embedded/client.go b/pkg/embedded/client.go new file mode 100644 index 00000000..547d01f6 --- /dev/null +++ b/pkg/embedded/client.go @@ -0,0 +1,9 @@ +package embedded + +import ( + clientv3 "go.etcd.io/etcd/client/v3" +) + +func (e *embedded) NewClient() (*clientv3.Client, error) { + return clientv3.New(e.clientConfig) +} From f87efbe2694b3bf94819e12c83f405dcc3709d41 Mon Sep 17 00:00:00 2001 From: Angelos Kolaitis Date: Sat, 15 Jun 2024 21:55:25 +0300 Subject: [PATCH 04/18] use local client --- cmd/embeddedctl/util.go | 2 +- pkg/embedded/client.go | 7 +++++++ 2 files changed, 8 insertions(+), 1 deletion(-) diff --git a/cmd/embeddedctl/util.go b/cmd/embeddedctl/util.go index ca74d6e6..cdcd5928 100644 --- a/cmd/embeddedctl/util.go +++ b/cmd/embeddedctl/util.go @@ -15,7 +15,7 @@ func newEmbeddedClient(storageDir string) (*clientv3.Client, error) { if err != nil { return nil, fmt.Errorf("failed to initialize instance: %w", err) } - return instance.NewClient() + return instance.NewLocalClient() } func jsonOutput(i any) error { diff --git a/pkg/embedded/client.go b/pkg/embedded/client.go index 547d01f6..6112f858 100644 --- a/pkg/embedded/client.go +++ b/pkg/embedded/client.go @@ -7,3 +7,10 @@ import ( func (e *embedded) NewClient() (*clientv3.Client, error) { return clientv3.New(e.clientConfig) } + +func (e *embedded) NewLocalClient() (*clientv3.Client, error) { + return clientv3.New(clientv3.Config{ + Endpoints: []string{e.config.AdvertiseClientUrls[0].String()}, + TLS: e.clientConfig.TLS.Clone(), + }) +} From 19ab01fe6b13351dbc467a2bbc86a115d6ead50a Mon Sep 17 00:00:00 2001 From: Angelos Kolaitis Date: Sat, 15 Jun 2024 22:02:22 +0300 Subject: [PATCH 05/18] add and remove commands --- cmd/embeddedctl/member.go | 47 +++++++++++++++++++++++++++++++-------- cmd/embeddedctl/util.go | 4 ++-- 2 files changed, 40 insertions(+), 11 deletions(-) diff --git a/cmd/embeddedctl/member.go b/cmd/embeddedctl/member.go index f88d5ed9..6d0eb646 100644 --- a/cmd/embeddedctl/member.go +++ b/cmd/embeddedctl/member.go @@ -2,22 +2,51 @@ package embeddedctl import ( "context" + "fmt" + "strconv" "github.com/spf13/cobra" clientv3 "go.etcd.io/etcd/client/v3" ) -var memberCmd = &cobra.Command{ - Use: "member", -} +var ( + flagPeerURLs []string + flagMemberID uint64 -var memberListCmd = &cobra.Command{ - Use: "list", - RunE: newCommand(func(ctx context.Context, client *clientv3.Client) (any, error) { - return client.MemberList(ctx) - }), -} + memberCmd = &cobra.Command{ + Use: "member", + } + + memberListCmd = &cobra.Command{ + Use: "list", + RunE: newCommand(func(ctx context.Context, client *clientv3.Client, args []string) (any, error) { + return client.MemberList(ctx) + }), + } + + memberAddCmd = &cobra.Command{ + Use: "add", + Args: cobra.MinimumNArgs(1), + RunE: newCommand(func(ctx context.Context, client *clientv3.Client, args []string) (any, error) { + return client.MemberAdd(ctx, args) + }), + } + + memberRemoveCmd = &cobra.Command{ + Use: "add", + Args: cobra.ExactArgs(1), + RunE: newCommand(func(ctx context.Context, client *clientv3.Client, args []string) (any, error) { + id, err := strconv.ParseUint(args[0], 10, 64) + if err != nil { + return nil, fmt.Errorf("cannot parse member ID %q: %w", args[0], err) + } + return client.MemberRemove(ctx, id) + }), + } +) func init() { memberCmd.AddCommand(memberListCmd) + memberCmd.AddCommand(memberAddCmd) + memberCmd.AddCommand(memberRemoveCmd) } diff --git a/cmd/embeddedctl/util.go b/cmd/embeddedctl/util.go index cdcd5928..33f73e9d 100644 --- a/cmd/embeddedctl/util.go +++ b/cmd/embeddedctl/util.go @@ -27,13 +27,13 @@ func jsonOutput(i any) error { return nil } -func newCommand(f func(context.Context, *clientv3.Client) (any, error)) func(cmd *cobra.Command, args []string) error { +func newCommand(f func(context.Context, *clientv3.Client, []string) (any, error)) func(cmd *cobra.Command, args []string) error { return func(cmd *cobra.Command, args []string) error { client, err := newEmbeddedClient(flagStorageDir) if err != nil { return fmt.Errorf("failed to initialize embedded client: %w", err) } - resp, err := f(cmd.Context(), client) + resp, err := f(cmd.Context(), client, args) return jsonOutput(resp) } } From 8202c2e7d8d9a7a2da5c217ce8195856177bcf8f Mon Sep 17 00:00:00 2001 From: Angelos Kolaitis Date: Sat, 15 Jun 2024 22:03:24 +0300 Subject: [PATCH 06/18] fix remove command --- cmd/embeddedctl/member.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cmd/embeddedctl/member.go b/cmd/embeddedctl/member.go index 6d0eb646..7131dd0a 100644 --- a/cmd/embeddedctl/member.go +++ b/cmd/embeddedctl/member.go @@ -33,7 +33,7 @@ var ( } memberRemoveCmd = &cobra.Command{ - Use: "add", + Use: "remove", Args: cobra.ExactArgs(1), RunE: newCommand(func(ctx context.Context, client *clientv3.Client, args []string) (any, error) { id, err := strconv.ParseUint(args[0], 10, 64) From d21641ab552dc31b0fde7a979b227fb96ada0f0d Mon Sep 17 00:00:00 2001 From: Angelos Kolaitis Date: Sat, 15 Jun 2024 22:50:11 +0300 Subject: [PATCH 07/18] handle failures --- cmd/embeddedctl/util.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/cmd/embeddedctl/util.go b/cmd/embeddedctl/util.go index 33f73e9d..c911d3d0 100644 --- a/cmd/embeddedctl/util.go +++ b/cmd/embeddedctl/util.go @@ -34,6 +34,9 @@ func newCommand(f func(context.Context, *clientv3.Client, []string) (any, error) return fmt.Errorf("failed to initialize embedded client: %w", err) } resp, err := f(cmd.Context(), client, args) + if err != nil { + return fmt.Errorf("command failed: %w", err) + } return jsonOutput(resp) } } From 7538f66c4caa424efdc3d5bbb0dbea62702f655c Mon Sep 17 00:00:00 2001 From: Angelos Kolaitis Date: Sat, 15 Jun 2024 22:50:42 +0300 Subject: [PATCH 08/18] allow remove with --peer-url --- cmd/embeddedctl/member.go | 59 +++++++++++++++++++++++++++++---------- cmd/embeddedctl/util.go | 2 +- 2 files changed, 45 insertions(+), 16 deletions(-) diff --git a/cmd/embeddedctl/member.go b/cmd/embeddedctl/member.go index 7131dd0a..50916490 100644 --- a/cmd/embeddedctl/member.go +++ b/cmd/embeddedctl/member.go @@ -10,43 +10,72 @@ import ( ) var ( - flagPeerURLs []string - flagMemberID uint64 + flagPeerURL string memberCmd = &cobra.Command{ - Use: "member", + Use: "member", + Short: "Manage cluster members", } memberListCmd = &cobra.Command{ - Use: "list", - RunE: newCommand(func(ctx context.Context, client *clientv3.Client, args []string) (any, error) { + Use: "list", + Short: "List cluster members", + RunE: command(func(ctx context.Context, client *clientv3.Client, args []string) (any, error) { return client.MemberList(ctx) }), } memberAddCmd = &cobra.Command{ - Use: "add", - Args: cobra.MinimumNArgs(1), - RunE: newCommand(func(ctx context.Context, client *clientv3.Client, args []string) (any, error) { + Use: "add [peerURL ...]", + Short: "Add a new cluster member", + Args: cobra.MinimumNArgs(1), + RunE: command(func(ctx context.Context, client *clientv3.Client, args []string) (any, error) { return client.MemberAdd(ctx, args) }), } memberRemoveCmd = &cobra.Command{ - Use: "remove", - Args: cobra.ExactArgs(1), - RunE: newCommand(func(ctx context.Context, client *clientv3.Client, args []string) (any, error) { - id, err := strconv.ParseUint(args[0], 10, 64) - if err != nil { - return nil, fmt.Errorf("cannot parse member ID %q: %w", args[0], err) + Use: "remove [memberID]", + Short: "Remove a cluster member", + Args: cobra.MaximumNArgs(1), + RunE: command(func(ctx context.Context, client *clientv3.Client, args []string) (any, error) { + switch { + case flagPeerURL == "" && len(args) == 0: + return nil, fmt.Errorf("specify member ID as argument, or --peer-url flag") + case flagPeerURL != "" && len(args) == 1: + return nil, fmt.Errorf("--peer-url flag not allowed when a member ID is specified") + case len(args) == 1: + id, err := strconv.ParseUint(args[0], 10, 64) + if err != nil { + return nil, fmt.Errorf("cannot parse member ID %q: %w", args[0], err) + } + return client.MemberRemove(ctx, id) + default: + members, err := client.MemberList(ctx) + if err != nil { + return nil, fmt.Errorf("failed to retrieve list of cluster members: %w", err) + } + for _, member := range members.Members { + for _, url := range member.PeerURLs { + if url == flagPeerURL { + return client.MemberRemove(ctx, member.ID) + } + } + } + return nil, fmt.Errorf("cluster member not found") } - return client.MemberRemove(ctx, id) }), } ) func init() { + // member list memberCmd.AddCommand(memberListCmd) + + // member add memberCmd.AddCommand(memberAddCmd) + + // member remove + memberRemoveCmd.Flags().StringVar(&flagPeerURL, "peer-url", "", "remove cluster member with a matching peer URL") memberCmd.AddCommand(memberRemoveCmd) } diff --git a/cmd/embeddedctl/util.go b/cmd/embeddedctl/util.go index c911d3d0..37960e99 100644 --- a/cmd/embeddedctl/util.go +++ b/cmd/embeddedctl/util.go @@ -27,7 +27,7 @@ func jsonOutput(i any) error { return nil } -func newCommand(f func(context.Context, *clientv3.Client, []string) (any, error)) func(cmd *cobra.Command, args []string) error { +func command(f func(context.Context, *clientv3.Client, []string) (any, error)) func(cmd *cobra.Command, args []string) error { return func(cmd *cobra.Command, args []string) error { client, err := newEmbeddedClient(flagStorageDir) if err != nil { From 752918fa397a252ad79254cacf48b9ae016a509f Mon Sep 17 00:00:00 2001 From: Angelos Kolaitis Date: Sat, 15 Jun 2024 23:04:22 +0300 Subject: [PATCH 09/18] add snapshot commands --- cmd/embeddedctl/embedded.go | 1 + cmd/embeddedctl/snapshot.go | 44 +++++++++++++++++++++++++++++++++++++ 2 files changed, 45 insertions(+) create mode 100644 cmd/embeddedctl/snapshot.go diff --git a/cmd/embeddedctl/embedded.go b/cmd/embeddedctl/embedded.go index 633ad7d8..fd696d58 100644 --- a/cmd/embeddedctl/embedded.go +++ b/cmd/embeddedctl/embedded.go @@ -18,4 +18,5 @@ func init() { Command.PersistentFlags().StringVar(&flagStorageDir, "storage-dir", os.Getenv("EMBEDDED_DIR"), "k8s-dqlite storage directory") Command.AddCommand(memberCmd) + Command.AddCommand(snapshotCmd) } diff --git a/cmd/embeddedctl/snapshot.go b/cmd/embeddedctl/snapshot.go new file mode 100644 index 00000000..a3cb01bb --- /dev/null +++ b/cmd/embeddedctl/snapshot.go @@ -0,0 +1,44 @@ +package embeddedctl + +import ( + "context" + "fmt" + "io" + "os" + + "github.com/spf13/cobra" + clientv3 "go.etcd.io/etcd/client/v3" +) + +var ( + snapshotCmd = &cobra.Command{ + Use: "snapshot", + Short: "Manage cluster snapshots", + } + + snapshotSaveCmd = &cobra.Command{ + Use: "save [backup.db]", + Args: cobra.ExactArgs(1), + Short: "Save a snapshot of the cluster", + RunE: command(func(ctx context.Context, client *clientv3.Client, args []string) (any, error) { + reader, err := client.Snapshot(ctx) + if err != nil { + return nil, fmt.Errorf("failed to request snapshot: %w", err) + } + b, err := io.ReadAll(reader) + if err != nil { + return nil, fmt.Errorf("failed to retrieve snapshot: %w", err) + } + if err := os.WriteFile(args[0], b, 0600); err != nil { + return nil, fmt.Errorf("failed to write snapshot to %q: %w", err) + } + return map[string]any{"size": len(b), "file": args[0]}, nil + }), + } +) + +func init() { + // snapshot save + snapshotCmd.AddCommand(snapshotSaveCmd) + +} From 6109aaf2d3c9cd8866a4b932a159dba55c77f8ee Mon Sep 17 00:00:00 2001 From: Angelos Kolaitis Date: Sun, 16 Jun 2024 00:54:32 +0300 Subject: [PATCH 10/18] make useful default for --storage-dir --- cmd/embeddedctl/embedded.go | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/cmd/embeddedctl/embedded.go b/cmd/embeddedctl/embedded.go index fd696d58..a1fea665 100644 --- a/cmd/embeddedctl/embedded.go +++ b/cmd/embeddedctl/embedded.go @@ -2,6 +2,7 @@ package embeddedctl import ( "os" + "path/filepath" "github.com/spf13/cobra" ) @@ -15,7 +16,17 @@ var ( ) func init() { - Command.PersistentFlags().StringVar(&flagStorageDir, "storage-dir", os.Getenv("EMBEDDED_DIR"), "k8s-dqlite storage directory") + // convenient default + defaultStorageDir := os.Getenv("EMBEDDED_DIR") + if defaultStorageDir == "" { + snapCommon := os.Getenv("SNAP_COMMON") + if snapCommon == "" { + snapCommon = "/var/snap/k8s/common" + } + defaultStorageDir = filepath.Join(snapCommon, "var", "lib", "k8s-dqlite") + } + + Command.PersistentFlags().StringVar(&flagStorageDir, "storage-dir", os.Getenv("EMBEDDED_DIR"), "k8s-dqlite state directory") Command.AddCommand(memberCmd) Command.AddCommand(snapshotCmd) From 150ac5fd69e56bcd68084f00d7af99c4eacbe073 Mon Sep 17 00:00:00 2001 From: Angelos Kolaitis Date: Sun, 16 Jun 2024 00:58:17 +0300 Subject: [PATCH 11/18] fixup --- cmd/embeddedctl/embedded.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cmd/embeddedctl/embedded.go b/cmd/embeddedctl/embedded.go index a1fea665..bc2b10b7 100644 --- a/cmd/embeddedctl/embedded.go +++ b/cmd/embeddedctl/embedded.go @@ -26,7 +26,7 @@ func init() { defaultStorageDir = filepath.Join(snapCommon, "var", "lib", "k8s-dqlite") } - Command.PersistentFlags().StringVar(&flagStorageDir, "storage-dir", os.Getenv("EMBEDDED_DIR"), "k8s-dqlite state directory") + Command.PersistentFlags().StringVar(&flagStorageDir, "storage-dir", defaultStorageDir, "k8s-dqlite state directory") Command.AddCommand(memberCmd) Command.AddCommand(snapshotCmd) From 8bbb48879594214bbd164b63285673b2c7e3d67a Mon Sep 17 00:00:00 2001 From: Angelos Kolaitis Date: Mon, 17 Jun 2024 09:46:05 +0300 Subject: [PATCH 12/18] go mod tidy --- go.mod | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/go.mod b/go.mod index 7a640896..36fc4308 100644 --- a/go.mod +++ b/go.mod @@ -15,6 +15,7 @@ require ( go.etcd.io/etcd/client/pkg/v3 v3.5.12 go.etcd.io/etcd/client/v3 v3.5.12 go.etcd.io/etcd/server/v3 v3.5.12 + go.uber.org/zap v1.27.0 golang.org/x/sync v0.6.0 golang.org/x/sys v0.18.0 google.golang.org/grpc v1.62.1 @@ -66,7 +67,6 @@ require ( go.opentelemetry.io/otel/trace v1.24.0 // indirect go.opentelemetry.io/proto/otlp v1.1.0 // indirect go.uber.org/multierr v1.11.0 // indirect - go.uber.org/zap v1.27.0 // indirect golang.org/x/crypto v0.21.0 // indirect golang.org/x/net v0.22.0 // indirect golang.org/x/text v0.14.0 // indirect From c6ce1d9350d6a3555190c21c3fa789e908090c1c Mon Sep 17 00:00:00 2001 From: Angelos Kolaitis Date: Mon, 17 Jun 2024 09:56:36 +0300 Subject: [PATCH 13/18] go vet fixes --- cmd/embeddedctl/snapshot.go | 2 +- pkg/embedded/register.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/cmd/embeddedctl/snapshot.go b/cmd/embeddedctl/snapshot.go index a3cb01bb..9d85100c 100644 --- a/cmd/embeddedctl/snapshot.go +++ b/cmd/embeddedctl/snapshot.go @@ -30,7 +30,7 @@ var ( return nil, fmt.Errorf("failed to retrieve snapshot: %w", err) } if err := os.WriteFile(args[0], b, 0600); err != nil { - return nil, fmt.Errorf("failed to write snapshot to %q: %w", err) + return nil, fmt.Errorf("failed to write snapshot to %q: %w", args[0], err) } return map[string]any{"size": len(b), "file": args[0]}, nil }), diff --git a/pkg/embedded/register.go b/pkg/embedded/register.go index d61fba8f..15351875 100644 --- a/pkg/embedded/register.go +++ b/pkg/embedded/register.go @@ -61,7 +61,7 @@ func (e *embedded) ensurePeerInCluster(ctx context.Context) (string, error) { defer func() { if err := e.createSentinel(); err != nil { - panic(fmt.Sprintf("failed to create sentinel file: %w", err.Error())) + panic(fmt.Sprintf("failed to create sentinel file: %v", err.Error())) } }() From eb91d86fce4318a49b3de2b637e9d34401480bf9 Mon Sep 17 00:00:00 2001 From: Angelos Kolaitis Date: Mon, 17 Jun 2024 19:19:26 +0300 Subject: [PATCH 14/18] improve error messages for embeddedctl commands --- cmd/embeddedctl/embedded.go | 3 ++- cmd/embeddedctl/member.go | 19 +++++++++++-------- cmd/embeddedctl/snapshot.go | 7 ++++--- cmd/root.go | 2 -- 4 files changed, 17 insertions(+), 14 deletions(-) diff --git a/cmd/embeddedctl/embedded.go b/cmd/embeddedctl/embedded.go index bc2b10b7..b2a3407e 100644 --- a/cmd/embeddedctl/embedded.go +++ b/cmd/embeddedctl/embedded.go @@ -11,7 +11,8 @@ var ( flagStorageDir string Command = &cobra.Command{ - Use: "embeddedctl", + Use: "embeddedctl", + Short: "Interact with the embedded datastore", } ) diff --git a/cmd/embeddedctl/member.go b/cmd/embeddedctl/member.go index 50916490..8d24a1c3 100644 --- a/cmd/embeddedctl/member.go +++ b/cmd/embeddedctl/member.go @@ -18,26 +18,29 @@ var ( } memberListCmd = &cobra.Command{ - Use: "list", - Short: "List cluster members", + Use: "list", + Short: "List cluster members", + SilenceUsage: true, RunE: command(func(ctx context.Context, client *clientv3.Client, args []string) (any, error) { return client.MemberList(ctx) }), } memberAddCmd = &cobra.Command{ - Use: "add [peerURL ...]", - Short: "Add a new cluster member", - Args: cobra.MinimumNArgs(1), + Use: "add [peerURL ...]", + Short: "Add a new cluster member", + SilenceUsage: true, + Args: cobra.MinimumNArgs(1), RunE: command(func(ctx context.Context, client *clientv3.Client, args []string) (any, error) { return client.MemberAdd(ctx, args) }), } memberRemoveCmd = &cobra.Command{ - Use: "remove [memberID]", - Short: "Remove a cluster member", - Args: cobra.MaximumNArgs(1), + Use: "remove [memberID]", + Short: "Remove a cluster member", + SilenceUsage: true, + Args: cobra.MaximumNArgs(1), RunE: command(func(ctx context.Context, client *clientv3.Client, args []string) (any, error) { switch { case flagPeerURL == "" && len(args) == 0: diff --git a/cmd/embeddedctl/snapshot.go b/cmd/embeddedctl/snapshot.go index 9d85100c..64f58e4f 100644 --- a/cmd/embeddedctl/snapshot.go +++ b/cmd/embeddedctl/snapshot.go @@ -17,9 +17,10 @@ var ( } snapshotSaveCmd = &cobra.Command{ - Use: "save [backup.db]", - Args: cobra.ExactArgs(1), - Short: "Save a snapshot of the cluster", + Use: "save [backup.db]", + Short: "Save a snapshot of the cluster", + SilenceUsage: true, + Args: cobra.ExactArgs(1), RunE: command(func(ctx context.Context, client *clientv3.Client, args []string) (any, error) { reader, err := client.Snapshot(ctx) if err != nil { diff --git a/cmd/root.go b/cmd/root.go index beceb525..eac1d307 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -2,7 +2,6 @@ package cmd import ( "context" - "fmt" "net/http" _ "net/http/pprof" "os" @@ -130,7 +129,6 @@ var ( // This is called by main.main(). It only needs to happen once to the liteCmd. func Execute() { if err := rootCmd.Execute(); err != nil { - fmt.Println(err) os.Exit(1) } } From 8f93f917f2a391f67132e300ac07a8896971c180 Mon Sep 17 00:00:00 2001 From: Angelos Kolaitis Date: Sun, 23 Jun 2024 14:40:32 +0300 Subject: [PATCH 15/18] define etcd mode --- cmd/embeddedctl/util.go | 4 +- cmd/root.go | 10 ++-- pkg/embedded/config.go | 31 ------------- pkg/embedded/embedded.go | 73 ------------------------------ pkg/{embedded => etcd}/client.go | 6 +-- pkg/etcd/config.go | 28 ++++++++++++ pkg/etcd/etcd.go | 65 ++++++++++++++++++++++++++ pkg/{embedded => etcd}/register.go | 8 ++-- pkg/{embedded => etcd}/shutdown.go | 4 +- pkg/{embedded => etcd}/start.go | 4 +- 10 files changed, 111 insertions(+), 122 deletions(-) delete mode 100644 pkg/embedded/config.go delete mode 100644 pkg/embedded/embedded.go rename pkg/{embedded => etcd}/client.go (63%) create mode 100644 pkg/etcd/config.go create mode 100644 pkg/etcd/etcd.go rename pkg/{embedded => etcd}/register.go (89%) rename pkg/{embedded => etcd}/shutdown.go (78%) rename pkg/{embedded => etcd}/start.go (91%) diff --git a/cmd/embeddedctl/util.go b/cmd/embeddedctl/util.go index 37960e99..31ca1173 100644 --- a/cmd/embeddedctl/util.go +++ b/cmd/embeddedctl/util.go @@ -5,13 +5,13 @@ import ( "encoding/json" "fmt" - "github.com/canonical/k8s-dqlite/pkg/embedded" + "github.com/canonical/k8s-dqlite/pkg/etcd" "github.com/spf13/cobra" clientv3 "go.etcd.io/etcd/client/v3" ) func newEmbeddedClient(storageDir string) (*clientv3.Client, error) { - instance, err := embedded.New(storageDir) + instance, err := etcd.New(storageDir) if err != nil { return nil, fmt.Errorf("failed to initialize instance: %w", err) } diff --git a/cmd/root.go b/cmd/root.go index eac1d307..40ca332c 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -9,7 +9,7 @@ import ( "time" "github.com/canonical/k8s-dqlite/cmd/embeddedctl" - "github.com/canonical/k8s-dqlite/pkg/embedded" + "github.com/canonical/k8s-dqlite/pkg/etcd" "github.com/canonical/k8s-dqlite/pkg/server" "github.com/prometheus/client_golang/prometheus/promhttp" "github.com/sirupsen/logrus" @@ -39,7 +39,7 @@ var ( acpLimitMaxConcurrentTxn int64 acpOnlyWriteQueries bool - embeddedMode bool + etcdMode bool } rootCmd = &cobra.Command{ @@ -74,8 +74,8 @@ var ( err error ) if rootCmdOpts.embeddedMode { - if instance, err = embedded.New(rootCmdOpts.dir); err != nil { - logrus.WithError(err).Fatal("Failed to create embedded server") + if instance, err = etcd.New(rootCmdOpts.dir); err != nil { + logrus.WithError(err).Fatal("Failed to create etcd server") } } else { if instance, err = server.New( @@ -152,7 +152,7 @@ func init() { // TODO(MK-1408): This value is highly dependend on underlying hardware, thus making the default value a bit useless. The linked card will implement a dynamic way to set this value. rootCmd.Flags().Int64Var(&rootCmdOpts.acpLimitMaxConcurrentTxn, "admission-control-policy-limit-max-concurrent-transactions", 300, "Maximum number of transactions that are allowed to run concurrently. Transactions will not be admitted after the limit is reached.") rootCmd.Flags().BoolVar(&rootCmdOpts.acpOnlyWriteQueries, "admission-control-only-for-write-queries", false, "If set, admission control will only be applied to write queries.") - rootCmd.Flags().BoolVar(&rootCmdOpts.embeddedMode, "embedded", false, "Run in embedded mode") + rootCmd.Flags().BoolVar(&rootCmdOpts.etcdMode, "etcd-mode", false, "Run in etcd mode") rootCmd.AddCommand(embeddedctl.Command) } diff --git a/pkg/embedded/config.go b/pkg/embedded/config.go deleted file mode 100644 index ed046af1..00000000 --- a/pkg/embedded/config.go +++ /dev/null @@ -1,31 +0,0 @@ -package embedded - -import ( - "fmt" - "os" - "path/filepath" - - "gopkg.in/yaml.v2" -) - -type clusterConfig struct { - ClientURLs []string `yaml:"client-urls,omitempty"` - PeerURL string `yaml:"peer-url,omitempty"` - CAFile string `yaml:"ca-file,omitempty"` - CertFile string `yaml:"cert-file,omitempty"` - KeyFile string `yaml:"key-file,omitempty"` - PeerCAFile string `yaml:"peer-ca-file,omitempty"` - PeerCertFile string `yaml:"peer-cert-file,omitempty"` - PeerKeyFile string `yaml:"peer-key-file,omitempty"` -} - -func fileUnmarshal(v interface{}, path ...string) error { - b, err := os.ReadFile(filepath.Join(path...)) - if err != nil { - return fmt.Errorf("failed to read file contents: %w", err) - } - if err := yaml.Unmarshal(b, v); err != nil { - return fmt.Errorf("failed to parse as yaml: %w", err) - } - return nil -} diff --git a/pkg/embedded/embedded.go b/pkg/embedded/embedded.go deleted file mode 100644 index d7f2f825..00000000 --- a/pkg/embedded/embedded.go +++ /dev/null @@ -1,73 +0,0 @@ -package embedded - -import ( - "fmt" - "os" - "path/filepath" - - kine_tls "github.com/canonical/k8s-dqlite/pkg/kine/tls" - "github.com/canonical/k8s-dqlite/pkg/server" - clientv3 "go.etcd.io/etcd/client/v3" - "go.etcd.io/etcd/server/v3/embed" -) - -type embedded struct { - clientConfig clientv3.Config - - peerURL string - sentinelFile string - - config *embed.Config - instance *embed.Etcd - - mustStopCh chan struct{} -} - -func New(storageDir string) (*embedded, error) { - config, err := embed.ConfigFromFile(filepath.Join(storageDir, "embedded.yaml")) - if err != nil { - return nil, fmt.Errorf("failed to load embedded config: %w", err) - } - var clusterConfig clusterConfig - if err := fileUnmarshal(&clusterConfig, storageDir, "config.yaml"); err != nil { - return nil, fmt.Errorf("failed to load cluster config: %w", err) - } - - config.PeerTLSInfo.TrustedCAFile = clusterConfig.PeerCAFile - config.PeerTLSInfo.CertFile = clusterConfig.PeerCertFile - config.PeerTLSInfo.KeyFile = clusterConfig.PeerKeyFile - - config.ClientTLSInfo.TrustedCAFile = clusterConfig.CAFile - config.ClientTLSInfo.CertFile = clusterConfig.CertFile - config.ClientTLSInfo.KeyFile = clusterConfig.KeyFile - - tlsConfig, err := kine_tls.Config{ - CAFile: clusterConfig.CAFile, - CertFile: clusterConfig.CertFile, - KeyFile: clusterConfig.KeyFile, - }.ClientConfig() - if err != nil { - return nil, fmt.Errorf("failed to initialize client TLS config: %w", err) - } - - if err := os.MkdirAll(config.Dir, 0700); err != nil { - return nil, fmt.Errorf("failed to ensure data directory: %w", err) - } - - return &embedded{ - config: config, - clientConfig: clientv3.Config{ - Endpoints: clusterConfig.ClientURLs, - TLS: tlsConfig, - }, - peerURL: clusterConfig.PeerURL, - sentinelFile: filepath.Join(config.Dir, "sentinel"), - mustStopCh: make(chan struct{}, 1), - }, nil -} - -func (e *embedded) MustStop() <-chan struct{} { - return e.mustStopCh -} - -var _ server.Instance = &embedded{} diff --git a/pkg/embedded/client.go b/pkg/etcd/client.go similarity index 63% rename from pkg/embedded/client.go rename to pkg/etcd/client.go index 6112f858..52a9f0b2 100644 --- a/pkg/embedded/client.go +++ b/pkg/etcd/client.go @@ -1,14 +1,14 @@ -package embedded +package etcd import ( clientv3 "go.etcd.io/etcd/client/v3" ) -func (e *embedded) NewClient() (*clientv3.Client, error) { +func (e *etcd) NewClient() (*clientv3.Client, error) { return clientv3.New(e.clientConfig) } -func (e *embedded) NewLocalClient() (*clientv3.Client, error) { +func (e *etcd) NewLocalClient() (*clientv3.Client, error) { return clientv3.New(clientv3.Config{ Endpoints: []string{e.config.AdvertiseClientUrls[0].String()}, TLS: e.clientConfig.TLS.Clone(), diff --git a/pkg/etcd/config.go b/pkg/etcd/config.go new file mode 100644 index 00000000..8b87ab9b --- /dev/null +++ b/pkg/etcd/config.go @@ -0,0 +1,28 @@ +package etcd + +import ( + "fmt" + "os" + "path/filepath" + + "gopkg.in/yaml.v2" +) + +type registerConfig struct { + ClientURLs []string `yaml:"client-urls,omitempty"` + PeerURL string `yaml:"peer-url,omitempty"` + TrustedCAFile string `yaml:"trusted-ca-file,omitempty"` + CertFile string `yaml:"cert-file,omitempty"` + KeyFile string `yaml:"key-file,omitempty"` +} + +func fileUnmarshal(v interface{}, path ...string) error { + b, err := os.ReadFile(filepath.Join(path...)) + if err != nil { + return fmt.Errorf("failed to read file contents: %w", err) + } + if err := yaml.Unmarshal(b, v); err != nil { + return fmt.Errorf("failed to parse as yaml: %w", err) + } + return nil +} diff --git a/pkg/etcd/etcd.go b/pkg/etcd/etcd.go new file mode 100644 index 00000000..15acd518 --- /dev/null +++ b/pkg/etcd/etcd.go @@ -0,0 +1,65 @@ +package etcd + +import ( + "fmt" + "os" + "path/filepath" + + kine_tls "github.com/canonical/k8s-dqlite/pkg/kine/tls" + "github.com/canonical/k8s-dqlite/pkg/server" + clientv3 "go.etcd.io/etcd/client/v3" + "go.etcd.io/etcd/server/v3/embed" +) + +type etcd struct { + clientConfig clientv3.Config + + peerURL string + sentinelFile string + + config *embed.Config + instance *embed.Etcd + + mustStopCh chan struct{} +} + +func New(storageDir string) (*etcd, error) { + config, err := embed.ConfigFromFile(filepath.Join(storageDir, "etcd.yaml")) + if err != nil { + return nil, fmt.Errorf("failed to load etcd config: %w", err) + } + var registerConfig registerConfig + if err := fileUnmarshal(®isterConfig, storageDir, "register.yaml"); err != nil { + return nil, fmt.Errorf("failed to load register config: %w", err) + } + + tlsConfig, err := kine_tls.Config{ + CAFile: registerConfig.TrustedCAFile, + CertFile: registerConfig.CertFile, + KeyFile: registerConfig.KeyFile, + }.ClientConfig() + if err != nil { + return nil, fmt.Errorf("failed to initialize client TLS config: %w", err) + } + + if err := os.MkdirAll(config.Dir, 0700); err != nil { + return nil, fmt.Errorf("failed to ensure data directory: %w", err) + } + + return &etcd{ + config: config, + clientConfig: clientv3.Config{ + Endpoints: registerConfig.ClientURLs, + TLS: tlsConfig, + }, + peerURL: registerConfig.PeerURL, + sentinelFile: filepath.Join(config.Dir, "sentinel"), + mustStopCh: make(chan struct{}, 1), + }, nil +} + +func (e *etcd) MustStop() <-chan struct{} { + return e.mustStopCh +} + +var _ server.Instance = &etcd{} diff --git a/pkg/embedded/register.go b/pkg/etcd/register.go similarity index 89% rename from pkg/embedded/register.go rename to pkg/etcd/register.go index 15351875..77b62004 100644 --- a/pkg/embedded/register.go +++ b/pkg/etcd/register.go @@ -1,4 +1,4 @@ -package embedded +package etcd import ( "context" @@ -9,7 +9,7 @@ import ( clientv3 "go.etcd.io/etcd/client/v3" ) -func (e *embedded) hasValidSentinelFile() (bool, error) { +func (e *etcd) hasValidSentinelFile() (bool, error) { b, err := os.ReadFile(e.sentinelFile) if err != nil { if os.IsNotExist(err) { @@ -20,11 +20,11 @@ func (e *embedded) hasValidSentinelFile() (bool, error) { return strings.TrimSpace(string(b)) == e.peerURL, nil } -func (e *embedded) createSentinel() error { +func (e *etcd) createSentinel() error { return os.WriteFile(e.sentinelFile, []byte(e.peerURL), 0600) } -func (e *embedded) ensurePeerInCluster(ctx context.Context) (string, error) { +func (e *etcd) ensurePeerInCluster(ctx context.Context) (string, error) { if e.config.ClusterState == "new" { return "", nil } diff --git a/pkg/embedded/shutdown.go b/pkg/etcd/shutdown.go similarity index 78% rename from pkg/embedded/shutdown.go rename to pkg/etcd/shutdown.go index 616e07e6..ca830350 100644 --- a/pkg/embedded/shutdown.go +++ b/pkg/etcd/shutdown.go @@ -1,11 +1,11 @@ -package embedded +package etcd import ( "context" "fmt" ) -func (e *embedded) Shutdown(ctx context.Context) error { +func (e *etcd) Shutdown(ctx context.Context) error { if e.instance == nil { return nil } diff --git a/pkg/embedded/start.go b/pkg/etcd/start.go similarity index 91% rename from pkg/embedded/start.go rename to pkg/etcd/start.go index 99537ee6..262d43e9 100644 --- a/pkg/embedded/start.go +++ b/pkg/etcd/start.go @@ -1,4 +1,4 @@ -package embedded +package etcd import ( "context" @@ -8,7 +8,7 @@ import ( "go.uber.org/zap" ) -func (e *embedded) Start(ctx context.Context) error { +func (e *etcd) Start(ctx context.Context) error { if initialCluster, err := e.ensurePeerInCluster(ctx); err != nil { return fmt.Errorf("failed to initialize node: %w", err) } else if initialCluster != "" { From d0307099672718ef8603af893687fb00b6a51bb2 Mon Sep 17 00:00:00 2001 From: Angelos Kolaitis Date: Sun, 23 Jun 2024 14:45:01 +0300 Subject: [PATCH 16/18] fixup --- cmd/root.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cmd/root.go b/cmd/root.go index 40ca332c..54795a03 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -73,7 +73,7 @@ var ( instance server.Instance err error ) - if rootCmdOpts.embeddedMode { + if rootCmdOpts.etcdMode { if instance, err = etcd.New(rootCmdOpts.dir); err != nil { logrus.WithError(err).Fatal("Failed to create etcd server") } From 88bd3f62f1be8c8720c9a3f07afcd0bc3d16ea7b Mon Sep 17 00:00:00 2001 From: Angelos Kolaitis Date: Sun, 23 Jun 2024 20:54:18 +0300 Subject: [PATCH 17/18] rename command to dbctl --- cmd/{embeddedctl/embedded.go => dbctl/dbctl.go} | 6 +++--- cmd/{embeddedctl => dbctl}/member.go | 2 +- cmd/{embeddedctl => dbctl}/snapshot.go | 2 +- cmd/{embeddedctl => dbctl}/util.go | 2 +- cmd/root.go | 4 ++-- 5 files changed, 8 insertions(+), 8 deletions(-) rename cmd/{embeddedctl/embedded.go => dbctl/dbctl.go} (86%) rename cmd/{embeddedctl => dbctl}/member.go (99%) rename cmd/{embeddedctl => dbctl}/snapshot.go (98%) rename cmd/{embeddedctl => dbctl}/util.go (98%) diff --git a/cmd/embeddedctl/embedded.go b/cmd/dbctl/dbctl.go similarity index 86% rename from cmd/embeddedctl/embedded.go rename to cmd/dbctl/dbctl.go index b2a3407e..3c27fa1f 100644 --- a/cmd/embeddedctl/embedded.go +++ b/cmd/dbctl/dbctl.go @@ -1,4 +1,4 @@ -package embeddedctl +package dbctl import ( "os" @@ -11,14 +11,14 @@ var ( flagStorageDir string Command = &cobra.Command{ - Use: "embeddedctl", + Use: "dbctl", Short: "Interact with the embedded datastore", } ) func init() { // convenient default - defaultStorageDir := os.Getenv("EMBEDDED_DIR") + defaultStorageDir := os.Getenv("STORAGE_DIR") if defaultStorageDir == "" { snapCommon := os.Getenv("SNAP_COMMON") if snapCommon == "" { diff --git a/cmd/embeddedctl/member.go b/cmd/dbctl/member.go similarity index 99% rename from cmd/embeddedctl/member.go rename to cmd/dbctl/member.go index 8d24a1c3..1e73c4a0 100644 --- a/cmd/embeddedctl/member.go +++ b/cmd/dbctl/member.go @@ -1,4 +1,4 @@ -package embeddedctl +package dbctl import ( "context" diff --git a/cmd/embeddedctl/snapshot.go b/cmd/dbctl/snapshot.go similarity index 98% rename from cmd/embeddedctl/snapshot.go rename to cmd/dbctl/snapshot.go index 64f58e4f..a0edafff 100644 --- a/cmd/embeddedctl/snapshot.go +++ b/cmd/dbctl/snapshot.go @@ -1,4 +1,4 @@ -package embeddedctl +package dbctl import ( "context" diff --git a/cmd/embeddedctl/util.go b/cmd/dbctl/util.go similarity index 98% rename from cmd/embeddedctl/util.go rename to cmd/dbctl/util.go index 31ca1173..4731ae8b 100644 --- a/cmd/embeddedctl/util.go +++ b/cmd/dbctl/util.go @@ -1,4 +1,4 @@ -package embeddedctl +package dbctl import ( "context" diff --git a/cmd/root.go b/cmd/root.go index 54795a03..652926a8 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -8,7 +8,7 @@ import ( "os/signal" "time" - "github.com/canonical/k8s-dqlite/cmd/embeddedctl" + "github.com/canonical/k8s-dqlite/cmd/dbctl" "github.com/canonical/k8s-dqlite/pkg/etcd" "github.com/canonical/k8s-dqlite/pkg/server" "github.com/prometheus/client_golang/prometheus/promhttp" @@ -154,5 +154,5 @@ func init() { rootCmd.Flags().BoolVar(&rootCmdOpts.acpOnlyWriteQueries, "admission-control-only-for-write-queries", false, "If set, admission control will only be applied to write queries.") rootCmd.Flags().BoolVar(&rootCmdOpts.etcdMode, "etcd-mode", false, "Run in etcd mode") - rootCmd.AddCommand(embeddedctl.Command) + rootCmd.AddCommand(dbctl.Command) } From bd6307b6b9beb11eb3a6b0293eed2669c70afe06 Mon Sep 17 00:00:00 2001 From: Angelos Kolaitis Date: Sun, 23 Jun 2024 20:56:00 +0300 Subject: [PATCH 18/18] adjust error messages --- cmd/dbctl/util.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/cmd/dbctl/util.go b/cmd/dbctl/util.go index 4731ae8b..f11789de 100644 --- a/cmd/dbctl/util.go +++ b/cmd/dbctl/util.go @@ -10,7 +10,7 @@ import ( clientv3 "go.etcd.io/etcd/client/v3" ) -func newEmbeddedClient(storageDir string) (*clientv3.Client, error) { +func newEtcdClient(storageDir string) (*clientv3.Client, error) { instance, err := etcd.New(storageDir) if err != nil { return nil, fmt.Errorf("failed to initialize instance: %w", err) @@ -29,9 +29,9 @@ func jsonOutput(i any) error { func command(f func(context.Context, *clientv3.Client, []string) (any, error)) func(cmd *cobra.Command, args []string) error { return func(cmd *cobra.Command, args []string) error { - client, err := newEmbeddedClient(flagStorageDir) + client, err := newEtcdClient(flagStorageDir) if err != nil { - return fmt.Errorf("failed to initialize embedded client: %w", err) + return fmt.Errorf("failed to initialize etcd client: %w", err) } resp, err := f(cmd.Context(), client, args) if err != nil {