From b19d0360467ac2b71665f8a7d9e9b50f16803cc2 Mon Sep 17 00:00:00 2001
From: Angelos Kolaitis
Date: Mon, 1 Jul 2024 18:24:12 +0300
Subject: [PATCH] Microcluster timeouts during `k8s bootstrap` and `k8s join-cluster` (#520)

* timeout handling in microcluster bootstrap and join hooks

* pass timeout to join and bootstrap

* fix client-side 30s timeouts
---
 src/k8s/api/v1/cluster.go                 |  3 ++
 src/k8s/api/v1/cluster_node.go            | 11 ++++---
 src/k8s/cmd/k8s/k8s_bootstrap.go          | 11 ++-----
 src/k8s/cmd/k8s/k8s_join_cluster.go       | 12 +++++---
 src/k8s/pkg/client/k8sd/cluster.go        | 11 +++++++
 src/k8s/pkg/k8sd/api/cluster_bootstrap.go | 12 ++++++--
 src/k8s/pkg/k8sd/api/cluster_join.go      | 14 +++++++--
 src/k8s/pkg/k8sd/app/hooks_bootstrap.go   | 37 ++++++++++++++---------
 src/k8s/pkg/k8sd/app/hooks_join.go        | 22 +++++++++-----
 src/k8s/pkg/utils/microcluster.go         | 26 ++++++++++++++++
 src/k8s/pkg/utils/microcluster_test.go    | 28 +++++++++++++++++
 11 files changed, 145 insertions(+), 42 deletions(-)
 create mode 100644 src/k8s/pkg/utils/microcluster.go
 create mode 100644 src/k8s/pkg/utils/microcluster_test.go

diff --git a/src/k8s/api/v1/cluster.go b/src/k8s/api/v1/cluster.go
index 4b61e9a3f..84d00dddd 100644
--- a/src/k8s/api/v1/cluster.go
+++ b/src/k8s/api/v1/cluster.go
@@ -1,5 +1,7 @@
 package apiv1
 
+import "time"
+
 // GetClusterStatusRequest is used to request the current status of the cluster.
 type GetClusterStatusRequest struct{}
 
@@ -13,6 +15,7 @@ type PostClusterBootstrapRequest struct {
     Name    string          `json:"name"`
     Address string          `json:"address"`
     Config  BootstrapConfig `json:"config"`
+    Timeout time.Duration   `json:"timeout"`
 }
 
 // GetKubeConfigRequest is used to ask for the admin kubeconfig
diff --git a/src/k8s/api/v1/cluster_node.go b/src/k8s/api/v1/cluster_node.go
index 10e384ab8..5ad6eea8d 100644
--- a/src/k8s/api/v1/cluster_node.go
+++ b/src/k8s/api/v1/cluster_node.go
@@ -1,11 +1,14 @@
 package apiv1
 
+import "time"
+
 // JoinClusterRequest is used to request to add a node to the cluster.
 type JoinClusterRequest struct {
-    Name    string `json:"name"`
-    Address string `json:"address"`
-    Token   string `json:"token"`
-    Config  string `json:"config"`
+    Name    string        `json:"name"`
+    Address string        `json:"address"`
+    Token   string        `json:"token"`
+    Config  string        `json:"config"`
+    Timeout time.Duration `json:"timeout"`
 }
 
 // RemoveNodeRequest is used to request to remove a node from the cluster.
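Note that time.Duration has no custom JSON (un)marshaller, so the new Timeout fields travel over the wire as plain int64 nanosecond counts rather than strings such as "3m". A minimal, self-contained sketch of the resulting wire format (the request type below is an illustrative stand-in, not the real apiv1 struct):

package main

import (
    "encoding/json"
    "fmt"
    "time"
)

// encoding/json treats time.Duration as its underlying int64 nanosecond
// count, so a 3-minute timeout is serialized as 180000000000.
type request struct {
    Name    string        `json:"name"`
    Timeout time.Duration `json:"timeout"`
}

func main() {
    b, _ := json.Marshal(request{Name: "node-1", Timeout: 3 * time.Minute})
    fmt.Println(string(b)) // {"name":"node-1","timeout":180000000000}

    var r request
    _ = json.Unmarshal(b, &r)
    fmt.Println(r.Timeout) // 3m0s
}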
diff --git a/src/k8s/cmd/k8s/k8s_bootstrap.go b/src/k8s/cmd/k8s/k8s_bootstrap.go
index c6f321da0..0b02bce4a 100644
--- a/src/k8s/cmd/k8s/k8s_bootstrap.go
+++ b/src/k8s/cmd/k8s/k8s_bootstrap.go
@@ -3,7 +3,6 @@ package k8s
 import (
     "bufio"
     "bytes"
-    "context"
     "fmt"
     "io"
     "os"
@@ -125,16 +124,12 @@ func newBootstrapCmd(env cmdutil.ExecutionEnvironment) *cobra.Command {
             cmd.PrintErrln("Bootstrapping the cluster. This may take a few seconds, please wait.")
-            request := apiv1.PostClusterBootstrapRequest{
+            node, err := client.BootstrapCluster(cmd.Context(), apiv1.PostClusterBootstrapRequest{
                 Name:    opts.name,
                 Address: address,
                 Config:  bootstrapConfig,
-            }
-
-            ctx, cancel := context.WithTimeout(cmd.Context(), opts.timeout)
-            cobra.OnFinalize(cancel)
-
-            node, err := client.BootstrapCluster(ctx, request)
+                Timeout: opts.timeout,
+            })
             if err != nil {
                 cmd.PrintErrf("Error: Failed to bootstrap the cluster.\n\nThe error was: %v\n", err)
                 env.Exit(1)
diff --git a/src/k8s/cmd/k8s/k8s_join_cluster.go b/src/k8s/cmd/k8s/k8s_join_cluster.go
index 45b855ab7..ad7ebc52a 100644
--- a/src/k8s/cmd/k8s/k8s_join_cluster.go
+++ b/src/k8s/cmd/k8s/k8s_join_cluster.go
@@ -1,7 +1,6 @@
 package k8s
 
 import (
-    "context"
     "fmt"
     "io"
     "os"
@@ -98,11 +97,14 @@ func newJoinClusterCmd(env cmdutil.ExecutionEnvironment) *cobra.Command {
                 joinClusterConfig = string(b)
             }
 
-            ctx, cancel := context.WithTimeout(cmd.Context(), opts.timeout)
-            cobra.OnFinalize(cancel)
-
             cmd.PrintErrln("Joining the cluster. This may take a few seconds, please wait.")
-            if err := client.JoinCluster(ctx, apiv1.JoinClusterRequest{Name: opts.name, Address: address, Token: token, Config: joinClusterConfig}); err != nil {
+            if err := client.JoinCluster(cmd.Context(), apiv1.JoinClusterRequest{
+                Name:    opts.name,
+                Address: address,
+                Token:   token,
+                Config:  joinClusterConfig,
+                Timeout: opts.timeout,
+            }); err != nil {
                 cmd.PrintErrf("Error: Failed to join the cluster using the provided token.\n\nThe error was: %v\n", err)
                 env.Exit(1)
                 return
diff --git a/src/k8s/pkg/client/k8sd/cluster.go b/src/k8s/pkg/client/k8sd/cluster.go
index 49793ec22..211283100 100644
--- a/src/k8s/pkg/client/k8sd/cluster.go
+++ b/src/k8s/pkg/client/k8sd/cluster.go
@@ -3,6 +3,7 @@ package k8sd
 import (
     "context"
     "fmt"
+    "time"
 
     apiv1 "github.com/canonical/k8s/api/v1"
     "github.com/canonical/k8s/pkg/utils/control"
@@ -14,6 +15,11 @@ func (c *k8sd) BootstrapCluster(ctx context.Context, request apiv1.PostClusterBo
         return apiv1.NodeStatus{}, fmt.Errorf("k8sd is not ready: %w", err)
     }
 
+    // NOTE(neoaggelos): microcluster adds an arbitrary 30 second timeout in case no context deadline is set.
+    // Configure a client deadline for timeout + 30 seconds (the timeout will come from the server)
+    ctx, cancel := context.WithTimeout(ctx, request.Timeout+30*time.Second)
+    defer cancel()
+
     var response apiv1.NodeStatus
     if err := c.client.Query(ctx, "POST", api.NewURL().Path("k8sd", "cluster"), request, &response); err != nil {
         return apiv1.NodeStatus{}, fmt.Errorf("failed to POST /k8sd/cluster: %w", err)
@@ -27,6 +33,11 @@ func (c *k8sd) JoinCluster(ctx context.Context, request apiv1.JoinClusterRequest
         return fmt.Errorf("k8sd is not ready: %w", err)
     }
 
+    // NOTE(neoaggelos): microcluster adds an arbitrary 30 second timeout in case no context deadline is set.
+    // Configure a client deadline for timeout + 30 seconds (the timeout will come from the server)
+    ctx, cancel := context.WithTimeout(ctx, request.Timeout+30*time.Second)
+    defer cancel()
+
     if err := c.client.Query(ctx, "POST", api.NewURL().Path("k8sd", "cluster", "join"), request, nil); err != nil {
         return fmt.Errorf("failed to POST /k8sd/cluster/join: %w", err)
     }
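The NOTE comments above state the client-side rule this patch relies on: microcluster falls back to a 30-second timeout whenever the request context has no deadline, so the client now always sets one, sized as the server-side timeout plus a 30-second grace period so that the server's own error (rather than a generic client deadline) is what reaches the user. A self-contained sketch of the same pattern, with illustrative names that are not part of the k8sd client:

package main

import (
    "context"
    "errors"
    "fmt"
    "time"
)

// callWithServerTimeout asks the server to give up after serverTimeout, while
// the local context gets an extra grace period so the server's error wins the
// race against the client-side deadline.
func callWithServerTimeout(ctx context.Context, serverTimeout time.Duration, call func(context.Context) error) error {
    ctx, cancel := context.WithTimeout(ctx, serverTimeout+30*time.Second)
    defer cancel()
    return call(ctx)
}

func main() {
    err := callWithServerTimeout(context.Background(), 2*time.Second, func(ctx context.Context) error {
        select {
        case <-time.After(1 * time.Second): // pretend the RPC finished in time
            return nil
        case <-ctx.Done():
            return errors.New("client gave up before the server answered")
        }
    })
    fmt.Println(err) // <nil>
}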
diff --git a/src/k8s/pkg/k8sd/api/cluster_bootstrap.go b/src/k8s/pkg/k8sd/api/cluster_bootstrap.go
index 82dd8aa83..d7455156a 100644
--- a/src/k8s/pkg/k8sd/api/cluster_bootstrap.go
+++ b/src/k8s/pkg/k8sd/api/cluster_bootstrap.go
@@ -1,8 +1,10 @@
 package api
 
 import (
+    "context"
     "fmt"
     "net/http"
+    "time"
 
     apiv1 "github.com/canonical/k8s/api/v1"
     "github.com/canonical/k8s/pkg/utils"
@@ -34,9 +36,15 @@ func (e *Endpoints) postClusterBootstrap(s *state.State, r *http.Request) respon
         return response.BadRequest(fmt.Errorf("cluster is already bootstrapped"))
     }
 
+    // NOTE(neoaggelos): microcluster adds an implicit 30 second timeout if no context deadline is set.
+    ctx, cancel := context.WithTimeout(r.Context(), time.Hour)
+    defer cancel()
+
+    // NOTE(neoaggelos): pass the timeout as a config option, so that the context cancel will propagate errors.
+    config = utils.MicroclusterConfigWithTimeout(config, req.Timeout)
+
     // Bootstrap the cluster
-    if err := e.provider.MicroCluster().NewCluster(r.Context(), hostname, req.Address, config); err != nil {
-        // TODO move node cleanup here
+    if err := e.provider.MicroCluster().NewCluster(ctx, hostname, req.Address, config); err != nil {
         return response.BadRequest(fmt.Errorf("failed to bootstrap new cluster: %w", err))
     }
diff --git a/src/k8s/pkg/k8sd/api/cluster_join.go b/src/k8s/pkg/k8sd/api/cluster_join.go
index 9c71b9c21..b762e528d 100644
--- a/src/k8s/pkg/k8sd/api/cluster_join.go
+++ b/src/k8s/pkg/k8sd/api/cluster_join.go
@@ -1,8 +1,10 @@
 package api
 
 import (
+    "context"
     "fmt"
     "net/http"
+    "time"
 
     apiv1 "github.com/canonical/k8s/api/v1"
     "github.com/canonical/k8s/pkg/k8sd/types"
@@ -27,6 +29,14 @@ func (e *Endpoints) postClusterJoin(s *state.State, r *http.Request) response.Re
     }
 
     config := map[string]string{}
+
+    // NOTE(neoaggelos): microcluster adds an implicit 30 second timeout if no context deadline is set.
+    ctx, cancel := context.WithTimeout(r.Context(), time.Hour)
+    defer cancel()
+
+    // NOTE(neoaggelos): pass the timeout as a config option, so that the context cancel will propagate errors.
+    config = utils.MicroclusterConfigWithTimeout(config, req.Timeout)
+
     internalToken := types.InternalWorkerNodeToken{}
     // Check if token is worker token
     if internalToken.Decode(req.Token) == nil {
@@ -34,13 +44,13 @@ func (e *Endpoints) postClusterJoin(s *state.State, r *http.Request) response.Re
         // The validation of the token is done when fetching the cluster information.
         config["workerToken"] = req.Token
         config["workerJoinConfig"] = req.Config
-        if err := e.provider.MicroCluster().NewCluster(r.Context(), hostname, req.Address, config); err != nil {
+        if err := e.provider.MicroCluster().NewCluster(ctx, hostname, req.Address, config); err != nil {
             return response.InternalError(fmt.Errorf("failed to join k8sd cluster as worker: %w", err))
         }
     } else {
         // Is not a worker token. let microcluster check if it is a valid control-plane token.
         config["controlPlaneJoinConfig"] = req.Config
-        if err := e.provider.MicroCluster().JoinCluster(r.Context(), hostname, req.Address, req.Token, config); err != nil {
+        if err := e.provider.MicroCluster().JoinCluster(ctx, hostname, req.Address, req.Token, config); err != nil {
             return response.InternalError(fmt.Errorf("failed to join k8sd cluster as control plane: %w", err))
         }
     }
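Both handlers follow the same two-step shape: the handler's own context gets a generous one-hour ceiling (so microcluster never falls back to its implicit 30 seconds), while the user-requested timeout is handed to the bootstrap/join hook through the microcluster config map via utils.MicroclusterConfigWithTimeout, defined later in this patch. A compressed, self-contained sketch of that hand-off (handleRequest, runHook, and the durations are illustrative only):

package main

import (
    "context"
    "fmt"
    "time"
)

// handleRequest mimics the handler side: keep a wide ceiling on the handler
// context and pass the requested timeout along as a config entry.
func handleRequest(ctx context.Context, requested time.Duration) error {
    ctx, cancel := context.WithTimeout(ctx, time.Hour) // wide ceiling, never the implicit 30s
    defer cancel()

    config := map[string]string{}
    if requested != 0 {
        config["_timeout"] = requested.String() // same key the utils helper uses
    }
    return runHook(ctx, config)
}

// runHook mimics the hook side: tighten the deadline only if one was configured.
func runHook(ctx context.Context, config map[string]string) error {
    if t, err := time.ParseDuration(config["_timeout"]); err == nil && t != 0 {
        var cancel context.CancelFunc
        ctx, cancel = context.WithTimeout(ctx, t)
        defer cancel()
    }

    select {
    case <-time.After(50 * time.Millisecond): // pretend the real work finished here
        return nil
    case <-ctx.Done():
        return fmt.Errorf("hook timed out: %w", ctx.Err())
    }
}

func main() {
    fmt.Println(handleRequest(context.Background(), 10*time.Millisecond)) // hook timed out: context deadline exceeded
    fmt.Println(handleRequest(context.Background(), time.Minute))         // <nil>
}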
diff --git a/src/k8s/pkg/k8sd/app/hooks_bootstrap.go b/src/k8s/pkg/k8sd/app/hooks_bootstrap.go
index ac73cb4dd..6952ef3b1 100644
--- a/src/k8s/pkg/k8sd/app/hooks_bootstrap.go
+++ b/src/k8s/pkg/k8sd/app/hooks_bootstrap.go
@@ -25,12 +25,21 @@ import (
 // onBootstrap is called after we bootstrap the first cluster node.
 // onBootstrap configures local services then writes the cluster config on the database.
 func (a *App) onBootstrap(s *state.State, initConfig map[string]string) error {
+
+    // NOTE(neoaggelos): context timeout is passed over configuration, so that hook failures are propagated to the client
+    ctx, cancel := context.WithCancel(s.Context)
+    defer cancel()
+    if t := utils.MicroclusterTimeoutFromConfig(initConfig); t != 0 {
+        ctx, cancel = context.WithTimeout(ctx, t)
+        defer cancel()
+    }
+
     if workerToken, ok := initConfig["workerToken"]; ok {
         workerConfig, err := apiv1.WorkerJoinConfigFromMicrocluster(initConfig)
         if err != nil {
             return fmt.Errorf("failed to unmarshal worker join config: %w", err)
         }
-        return a.onBootstrapWorkerNode(s, workerToken, workerConfig)
+        return a.onBootstrapWorkerNode(ctx, s, workerToken, workerConfig)
     }
 
     bootstrapConfig, err := apiv1.BootstrapConfigFromMicrocluster(initConfig)
@@ -38,10 +47,10 @@ func (a *App) onBootstrap(s *state.State, initConfig map[string]string) error {
         return fmt.Errorf("failed to unmarshal bootstrap config: %w", err)
     }
 
-    return a.onBootstrapControlPlane(s, bootstrapConfig)
+    return a.onBootstrapControlPlane(ctx, s, bootstrapConfig)
 }
 
-func (a *App) onBootstrapWorkerNode(s *state.State, encodedToken string, joinConfig apiv1.WorkerNodeJoinConfig) error {
+func (a *App) onBootstrapWorkerNode(ctx context.Context, s *state.State, encodedToken string, joinConfig apiv1.WorkerNodeJoinConfig) error {
     snap := a.Snap()
 
     token := &types.InternalWorkerNodeToken{}
@@ -181,11 +190,11 @@ func (a *App) onBootstrapWorkerNode(s *state.State, encodedToken string, joinCon
     }
 
     // Pre-init checks
-    if err := snap.PreInitChecks(s.Context, cfg); err != nil {
+    if err := snap.PreInitChecks(ctx, cfg); err != nil {
         return fmt.Errorf("pre-init checks failed for worker node: %w", err)
     }
 
-    if err := s.Database.Transaction(s.Context, func(ctx context.Context, tx *sql.Tx) error {
+    if err := s.Database.Transaction(ctx, func(ctx context.Context, tx *sql.Tx) error {
         if _, err := database.SetClusterConfig(ctx, tx, cfg); err != nil {
             return fmt.Errorf("failed to write cluster configuration: %w", err)
         }
@@ -201,7 +210,7 @@ func (a *App) onBootstrapWorkerNode(s *state.State, encodedToken string, joinCon
     if err := setup.KubeletWorker(snap, s.Name(), nodeIP, response.ClusterDNS, response.ClusterDomain, response.CloudProvider, joinConfig.ExtraNodeKubeletArgs); err != nil {
         return fmt.Errorf("failed to configure kubelet: %w", err)
     }
-    if err := setup.KubeProxy(s.Context, snap, s.Name(), response.PodCIDR, joinConfig.ExtraNodeKubeProxyArgs); err != nil {
+    if err := setup.KubeProxy(ctx, snap, s.Name(), response.PodCIDR, joinConfig.ExtraNodeKubeProxyArgs); err != nil {
         return fmt.Errorf("failed to configure kube-proxy: %w", err)
     }
     if err := setup.K8sAPIServerProxy(snap, response.APIServers, joinConfig.ExtraNodeK8sAPIServerProxyArgs); err != nil {
@@ -217,14 +226,14 @@ func (a *App) onBootstrapWorkerNode(s *state.State, encodedToken string, joinCon
     }
 
     // Start services
-    if err := snaputil.StartWorkerServices(s.Context, snap); err != nil {
+    if err := snaputil.StartWorkerServices(ctx, snap); err != nil {
         return fmt.Errorf("failed to start worker services: %w", err)
     }
 
     return nil
 }
 
-func (a *App) onBootstrapControlPlane(s *state.State, bootstrapConfig apiv1.BootstrapConfig) error {
+func (a *App) onBootstrapControlPlane(ctx context.Context, s *state.State, bootstrapConfig apiv1.BootstrapConfig) error {
     snap := a.Snap()
 
     cfg, err := types.ClusterConfigFromBootstrapConfig(bootstrapConfig)
@@ -346,7 +355,7 @@ func (a *App) onBootstrapControlPlane(s *state.State, bootstrapConfig apiv1.Boot
     cfg.Certificates.K8sdPrivateKey = utils.Pointer(certificates.K8sdPrivateKey)
 
     // Pre-init checks
-    if err := snap.PreInitChecks(s.Context, cfg); err != nil {
+    if err := snap.PreInitChecks(ctx, cfg); err != nil {
         return fmt.Errorf("pre-init checks failed for bootstrap node: %w", err)
     }
@@ -373,7 +382,7 @@ func (a *App) onBootstrapControlPlane(s *state.State, bootstrapConfig apiv1.Boot
     if err := setup.KubeletControlPlane(snap, s.Name(), nodeIP, cfg.Kubelet.GetClusterDNS(), cfg.Kubelet.GetClusterDomain(), cfg.Kubelet.GetCloudProvider(), cfg.Kubelet.GetControlPlaneTaints(), bootstrapConfig.ExtraNodeKubeletArgs); err != nil {
         return fmt.Errorf("failed to configure kubelet: %w", err)
     }
-    if err := setup.KubeProxy(s.Context, snap, s.Name(), cfg.Network.GetPodCIDR(), bootstrapConfig.ExtraNodeKubeProxyArgs); err != nil {
+    if err := setup.KubeProxy(ctx, snap, s.Name(), cfg.Network.GetPodCIDR(), bootstrapConfig.ExtraNodeKubeProxyArgs); err != nil {
         return fmt.Errorf("failed to configure kube-proxy: %w", err)
     }
     if err := setup.KubeControllerManager(snap, bootstrapConfig.ExtraNodeKubeControllerManagerArgs); err != nil {
@@ -391,7 +400,7 @@ func (a *App) onBootstrapControlPlane(s *state.State, bootstrapConfig apiv1.Boot
     }
 
     // Write cluster configuration to dqlite
-    if err := s.Database.Transaction(s.Context, func(ctx context.Context, tx *sql.Tx) error {
+    if err := s.Database.Transaction(ctx, func(ctx context.Context, tx *sql.Tx) error {
         if _, err := database.SetClusterConfig(ctx, tx, cfg); err != nil {
             return fmt.Errorf("failed to write cluster configuration: %w", err)
         }
@@ -400,17 +409,17 @@
         return fmt.Errorf("database transaction to update cluster configuration failed: %w", err)
     }
 
-    if err := snapdconfig.SetSnapdFromK8sd(s.Context, cfg.ToUserFacing(), snap); err != nil {
+    if err := snapdconfig.SetSnapdFromK8sd(ctx, cfg.ToUserFacing(), snap); err != nil {
         return fmt.Errorf("failed to set snapd configuration from k8sd: %w", err)
     }
 
     // Start services
-    if err := startControlPlaneServices(s.Context, snap, cfg.Datastore.GetType()); err != nil {
+    if err := startControlPlaneServices(ctx, snap, cfg.Datastore.GetType()); err != nil {
         return fmt.Errorf("failed to start services: %w", err)
     }
 
     // Wait until Kube-API server is ready
-    if err := waitApiServerReady(s.Context, snap); err != nil {
+    if err := waitApiServerReady(ctx, snap); err != nil {
         return fmt.Errorf("kube-apiserver did not become ready in time: %w", err)
     }
diff --git a/src/k8s/pkg/k8sd/app/hooks_join.go b/src/k8s/pkg/k8sd/app/hooks_join.go
index 3a696c191..88af29d46 100644
--- a/src/k8s/pkg/k8sd/app/hooks_join.go
+++ b/src/k8s/pkg/k8sd/app/hooks_join.go
@@ -1,6 +1,7 @@
 package app
 
 import (
+    "context"
     "fmt"
     "net"
 
@@ -18,12 +19,19 @@ import (
 func (a *App) onPostJoin(s *state.State, initConfig map[string]string) error {
     snap := a.Snap()
 
+    ctx, cancel := context.WithCancel(s.Context)
+    defer cancel()
+    if t := utils.MicroclusterTimeoutFromConfig(initConfig); t != 0 {
+        ctx, cancel = context.WithTimeout(ctx, t)
+        defer cancel()
+    }
+
     joinConfig, err := apiv1.ControlPlaneJoinConfigFromMicrocluster(initConfig)
     if err != nil {
         return fmt.Errorf("failed to unmarshal control plane join config: %w", err)
     }
 
-    cfg, err := databaseutil.GetClusterConfig(s.Context, s)
+    cfg, err := databaseutil.GetClusterConfig(ctx, s)
     if err != nil {
         return fmt.Errorf("failed to get cluster config: %w", err)
     }
@@ -111,7 +119,7 @@ func (a *App) onPostJoin(s *state.State, initConfig map[string]string) error {
     }
 
     // Pre-init checks
-    if err := snap.PreInitChecks(s.Context, cfg); err != nil {
+    if err := snap.PreInitChecks(ctx, cfg); err != nil {
         return fmt.Errorf("pre-init checks failed for joining node: %w", err)
     }
@@ -131,7 +139,7 @@ func (a *App) onPostJoin(s *state.State, initConfig map[string]string) error {
     if err != nil {
         return fmt.Errorf("failed to get dqlite leader: %w", err)
     }
-    members, err := leader.GetClusterMembers(s.Context)
+    members, err := leader.GetClusterMembers(ctx)
     if err != nil {
         return fmt.Errorf("failed to get microcluster members: %w", err)
     }
@@ -156,7 +164,7 @@ func (a *App) onPostJoin(s *state.State, initConfig map[string]string) error {
     if err := setup.KubeletControlPlane(snap, s.Name(), nodeIP, cfg.Kubelet.GetClusterDNS(), cfg.Kubelet.GetClusterDomain(), cfg.Kubelet.GetCloudProvider(), cfg.Kubelet.GetControlPlaneTaints(), joinConfig.ExtraNodeKubeletArgs); err != nil {
         return fmt.Errorf("failed to configure kubelet: %w", err)
     }
-    if err := setup.KubeProxy(s.Context, snap, s.Name(), cfg.Network.GetPodCIDR(), joinConfig.ExtraNodeKubeProxyArgs); err != nil {
+    if err := setup.KubeProxy(ctx, snap, s.Name(), cfg.Network.GetPodCIDR(), joinConfig.ExtraNodeKubeProxyArgs); err != nil {
         return fmt.Errorf("failed to configure kube-proxy: %w", err)
     }
     if err := setup.KubeControllerManager(snap, joinConfig.ExtraNodeKubeControllerManagerArgs); err != nil {
@@ -173,17 +181,17 @@
         return fmt.Errorf("failed to write extra node config files: %w", err)
     }
 
-    if err := snapdconfig.SetSnapdFromK8sd(s.Context, cfg.ToUserFacing(), snap); err != nil {
+    if err := snapdconfig.SetSnapdFromK8sd(ctx, cfg.ToUserFacing(), snap); err != nil {
         return fmt.Errorf("failed to set snapd configuration from k8sd: %w", err)
     }
 
     // Start services
-    if err := startControlPlaneServices(s.Context, snap, cfg.Datastore.GetType()); err != nil {
+    if err := startControlPlaneServices(ctx, snap, cfg.Datastore.GetType()); err != nil {
         return fmt.Errorf("failed to start services: %w", err)
     }
 
     // Wait until Kube-API server is ready
-    if err := waitApiServerReady(s.Context, snap); err != nil {
+    if err := waitApiServerReady(ctx, snap); err != nil {
         return fmt.Errorf("failed to wait for kube-apiserver to become ready: %w", err)
     }
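One detail in both hooks is easy to misread: inside the if block the code assigns with = rather than :=. Using := would shadow ctx and cancel, so the tightened deadline would silently vanish once the block ends; and because defer cancel() captures the function value at the point of the defer statement, both cancel functions (from WithCancel and from WithTimeout) are still released. A small, self-contained sketch of the pitfall, with illustrative names:

package main

import (
    "context"
    "fmt"
    "time"
)

func main() {
    // Correct: reassign the outer variables, as the hooks above do.
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()
    if t := 10 * time.Millisecond; t != 0 {
        ctx, cancel = context.WithTimeout(ctx, t)
        defer cancel()
    }
    _, ok := ctx.Deadline()
    fmt.Println("deadline kept after the if block:", ok) // true

    // Buggy variant: ':=' declares new variables inside the block, so the
    // deadline is lost as soon as the block ends.
    shadowed, cancel2 := context.WithCancel(context.Background())
    defer cancel2()
    if t := 10 * time.Millisecond; t != 0 {
        shadowed, cancel3 := context.WithTimeout(shadowed, t)
        defer cancel3()
        _ = shadowed
    }
    _, ok = shadowed.Deadline()
    fmt.Println("deadline kept with ':=' shadowing:", ok) // false
}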
diff --git a/src/k8s/pkg/utils/microcluster.go b/src/k8s/pkg/utils/microcluster.go
new file mode 100644
index 000000000..7eadcfb70
--- /dev/null
+++ b/src/k8s/pkg/utils/microcluster.go
@@ -0,0 +1,26 @@
+package utils
+
+import "time"
+
+// MicroclusterConfigWithTimeout adds a "timeout" configuration value to the config struct.
+// If timeout is zero, the configuration is not affected.
+func MicroclusterConfigWithTimeout(config map[string]string, timeout time.Duration) map[string]string {
+    if timeout == 0 {
+        return config
+    }
+
+    config["_timeout"] = timeout.String()
+    return config
+}
+
+// MicroclusterTimeoutFromConfig returns the configured timeout option from the config struct.
+// In case of an invalid or empty value, 0 is returned.
+func MicroclusterTimeoutFromConfig(config map[string]string) time.Duration {
+    if v, ok := config["_timeout"]; !ok {
+        return 0
+    } else if d, err := time.ParseDuration(v); err != nil {
+        return 0
+    } else {
+        return d
+    }
+}
diff --git a/src/k8s/pkg/utils/microcluster_test.go b/src/k8s/pkg/utils/microcluster_test.go
new file mode 100644
index 000000000..5d819d709
--- /dev/null
+++ b/src/k8s/pkg/utils/microcluster_test.go
@@ -0,0 +1,28 @@
+package utils_test
+
+import (
+    "testing"
+    "time"
+
+    "github.com/canonical/k8s/pkg/utils"
+    . "github.com/onsi/gomega"
+)
+
+func TestMicroclusterTimeout(t *testing.T) {
+    t.Run("Empty", func(t *testing.T) {
+        g := NewWithT(t)
+
+        m := map[string]string{}
+        g.Expect(utils.MicroclusterTimeoutFromConfig(m)).To(BeZero())
+    })
+
+    t.Run("Normal", func(t *testing.T) {
+        g := NewWithT(t)
+
+        timeout := 5 * time.Second
+        m := map[string]string{}
+
+        mWithTimeout := utils.MicroclusterConfigWithTimeout(m, timeout)
+        g.Expect(utils.MicroclusterTimeoutFromConfig(mWithTimeout)).To(Equal(timeout))
+    })
+}
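Two small design points in the new helpers are worth spelling out: the leading underscore in "_timeout" keeps the synthetic entry from colliding with the real join-config keys the handlers already set ("workerToken", "workerJoinConfig", "controlPlaneJoinConfig"), and the helper mutates and returns the same map, which is what makes the config = utils.MicroclusterConfigWithTimeout(config, req.Timeout) assignment style in the handlers work. A sketch of what a worker join's initConfig ends up carrying (the helper is mirrored locally for illustration and the token/config values are placeholders):

package main

import (
    "fmt"
    "time"
)

// configWithTimeout mirrors utils.MicroclusterConfigWithTimeout for illustration.
func configWithTimeout(config map[string]string, timeout time.Duration) map[string]string {
    if timeout == 0 {
        return config
    }
    config["_timeout"] = timeout.String()
    return config
}

func main() {
    // Roughly what postClusterJoin assembles for a worker join with a 10-minute timeout.
    config := map[string]string{
        "workerToken":      "<encoded worker token>",
        "workerJoinConfig": "<worker join config>",
    }
    config = configWithTimeout(config, 10*time.Minute)

    fmt.Println(config["_timeout"])                    // 10m0s
    fmt.Println(len(config), "entries reach the hook") // 3 entries reach the hook
}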