Skip to content

Commit

Permalink
Microcluster timeouts during k8s bootstrap and k8s join-cluster (#520)
Browse files Browse the repository at this point in the history

* timeout handling in microcluster bootstrap and join hooks

* pass timeout to join and bootstrap

* fix client-side 30s timeouts
  • Loading branch information
neoaggelos committed Jul 1, 2024
1 parent 9c26047 commit b19d036
Show file tree
Hide file tree
Showing 11 changed files with 145 additions and 42 deletions.
3 changes: 3 additions & 0 deletions src/k8s/api/v1/cluster.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package apiv1

import "time"

// GetClusterStatusRequest is used to request the current status of the cluster.
// It is an empty struct: the request carries no fields.
type GetClusterStatusRequest struct{}

Expand All @@ -13,6 +15,7 @@ type PostClusterBootstrapRequest struct {
Name string `json:"name"`
Address string `json:"address"`
Config BootstrapConfig `json:"config"`
Timeout time.Duration `json:"timeout"`
}

// GetKubeConfigRequest is used to ask for the admin kubeconfig
Expand Down
11 changes: 7 additions & 4 deletions src/k8s/api/v1/cluster_node.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
package apiv1

import "time"

// JoinClusterRequest is used to request to add a node to the cluster.
//
// Each field is declared exactly once; the struct previously listed Name,
// Address, Token and Config twice (stale pre-change lines next to the
// re-aligned ones), which does not compile.
type JoinClusterRequest struct {
	// Name is the name of the joining node, sent as the "name" JSON field.
	Name string `json:"name"`
	// Address is the address of the joining node, sent as the "address" JSON field.
	Address string `json:"address"`
	// Token is the join token, sent as the "token" JSON field.
	Token string `json:"token"`
	// Config is the join configuration, passed through as an opaque string
	// in the "config" JSON field.
	Config string `json:"config"`
	// Timeout is the time budget requested for the join operation. As a
	// time.Duration it is JSON-encoded as an integer number of nanoseconds.
	// NOTE(review): a zero value appears to mean "no explicit timeout" on
	// the server side — confirm against the join hooks.
	Timeout time.Duration `json:"timeout"`
}

// RemoveNodeRequest is used to request to remove a node from the cluster.
Expand Down
11 changes: 3 additions & 8 deletions src/k8s/cmd/k8s/k8s_bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package k8s
import (
"bufio"
"bytes"
"context"
"fmt"
"io"
"os"
Expand Down Expand Up @@ -125,16 +124,12 @@ func newBootstrapCmd(env cmdutil.ExecutionEnvironment) *cobra.Command {

cmd.PrintErrln("Bootstrapping the cluster. This may take a few seconds, please wait.")

request := apiv1.PostClusterBootstrapRequest{
node, err := client.BootstrapCluster(cmd.Context(), apiv1.PostClusterBootstrapRequest{
Name: opts.name,
Address: address,
Config: bootstrapConfig,
}

ctx, cancel := context.WithTimeout(cmd.Context(), opts.timeout)
cobra.OnFinalize(cancel)

node, err := client.BootstrapCluster(ctx, request)
Timeout: opts.timeout,
})
if err != nil {
cmd.PrintErrf("Error: Failed to bootstrap the cluster.\n\nThe error was: %v\n", err)
env.Exit(1)
Expand Down
12 changes: 7 additions & 5 deletions src/k8s/cmd/k8s/k8s_join_cluster.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package k8s

import (
"context"
"fmt"
"io"
"os"
Expand Down Expand Up @@ -98,11 +97,14 @@ func newJoinClusterCmd(env cmdutil.ExecutionEnvironment) *cobra.Command {
joinClusterConfig = string(b)
}

ctx, cancel := context.WithTimeout(cmd.Context(), opts.timeout)
cobra.OnFinalize(cancel)

cmd.PrintErrln("Joining the cluster. This may take a few seconds, please wait.")
if err := client.JoinCluster(ctx, apiv1.JoinClusterRequest{Name: opts.name, Address: address, Token: token, Config: joinClusterConfig}); err != nil {
if err := client.JoinCluster(cmd.Context(), apiv1.JoinClusterRequest{
Name: opts.name,
Address: address,
Token: token,
Config: joinClusterConfig,
Timeout: opts.timeout,
}); err != nil {
cmd.PrintErrf("Error: Failed to join the cluster using the provided token.\n\nThe error was: %v\n", err)
env.Exit(1)
return
Expand Down
11 changes: 11 additions & 0 deletions src/k8s/pkg/client/k8sd/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package k8sd
import (
"context"
"fmt"
"time"

apiv1 "github.com/canonical/k8s/api/v1"
"github.com/canonical/k8s/pkg/utils/control"
Expand All @@ -14,6 +15,11 @@ func (c *k8sd) BootstrapCluster(ctx context.Context, request apiv1.PostClusterBo
return apiv1.NodeStatus{}, fmt.Errorf("k8sd is not ready: %w", err)
}

// NOTE(neoaggelos): microcluster adds an arbitrary 30 second timeout in case no context deadline is set.
// Set the client deadline to the requested timeout plus 30 seconds of slack, so that the timeout is enforced by the server rather than by the client.
ctx, cancel := context.WithTimeout(ctx, request.Timeout+30*time.Second)
defer cancel()

var response apiv1.NodeStatus
if err := c.client.Query(ctx, "POST", api.NewURL().Path("k8sd", "cluster"), request, &response); err != nil {
return apiv1.NodeStatus{}, fmt.Errorf("failed to POST /k8sd/cluster: %w", err)
Expand All @@ -27,6 +33,11 @@ func (c *k8sd) JoinCluster(ctx context.Context, request apiv1.JoinClusterRequest
return fmt.Errorf("k8sd is not ready: %w", err)
}

// NOTE(neoaggelos): microcluster adds an arbitrary 30 second timeout in case no context deadline is set.
// Set the client deadline to the requested timeout plus 30 seconds of slack, so that the timeout is enforced by the server rather than by the client.
ctx, cancel := context.WithTimeout(ctx, request.Timeout+30*time.Second)
defer cancel()

if err := c.client.Query(ctx, "POST", api.NewURL().Path("k8sd", "cluster", "join"), request, nil); err != nil {
return fmt.Errorf("failed to POST /k8sd/cluster/join: %w", err)
}
Expand Down
12 changes: 10 additions & 2 deletions src/k8s/pkg/k8sd/api/cluster_bootstrap.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
package api

import (
"context"
"fmt"
"net/http"
"time"

apiv1 "github.com/canonical/k8s/api/v1"
"github.com/canonical/k8s/pkg/utils"
Expand Down Expand Up @@ -34,9 +36,15 @@ func (e *Endpoints) postClusterBootstrap(s *state.State, r *http.Request) respon
return response.BadRequest(fmt.Errorf("cluster is already bootstrapped"))
}

// NOTE(neoaggelos): microcluster adds an implicit 30 second timeout if no context deadline is set.
ctx, cancel := context.WithTimeout(r.Context(), time.Hour)
defer cancel()

// NOTE(neoaggelos): pass the timeout as a config option, so that the bootstrap hook applies it itself and hook failures are propagated to the client.
config = utils.MicroclusterConfigWithTimeout(config, req.Timeout)

// Bootstrap the cluster
if err := e.provider.MicroCluster().NewCluster(r.Context(), hostname, req.Address, config); err != nil {
// TODO move node cleanup here
if err := e.provider.MicroCluster().NewCluster(ctx, hostname, req.Address, config); err != nil {
return response.BadRequest(fmt.Errorf("failed to bootstrap new cluster: %w", err))
}

Expand Down
14 changes: 12 additions & 2 deletions src/k8s/pkg/k8sd/api/cluster_join.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
package api

import (
"context"
"fmt"
"net/http"
"time"

apiv1 "github.com/canonical/k8s/api/v1"
"github.com/canonical/k8s/pkg/k8sd/types"
Expand All @@ -27,20 +29,28 @@ func (e *Endpoints) postClusterJoin(s *state.State, r *http.Request) response.Re
}

config := map[string]string{}

// NOTE(neoaggelos): microcluster adds an implicit 30 second timeout if no context deadline is set.
ctx, cancel := context.WithTimeout(r.Context(), time.Hour)
defer cancel()

// NOTE(neoaggelos): pass the timeout as a config option, so that the join hooks apply it themselves and hook failures are propagated to the client.
config = utils.MicroclusterConfigWithTimeout(config, req.Timeout)

internalToken := types.InternalWorkerNodeToken{}
// Check if token is worker token
if internalToken.Decode(req.Token) == nil {
// valid worker node token - let's join the cluster
// The validation of the token is done when fetching the cluster information.
config["workerToken"] = req.Token
config["workerJoinConfig"] = req.Config
if err := e.provider.MicroCluster().NewCluster(r.Context(), hostname, req.Address, config); err != nil {
if err := e.provider.MicroCluster().NewCluster(ctx, hostname, req.Address, config); err != nil {
return response.InternalError(fmt.Errorf("failed to join k8sd cluster as worker: %w", err))
}
} else {
// Is not a worker token. let microcluster check if it is a valid control-plane token.
config["controlPlaneJoinConfig"] = req.Config
if err := e.provider.MicroCluster().JoinCluster(r.Context(), hostname, req.Address, req.Token, config); err != nil {
if err := e.provider.MicroCluster().JoinCluster(ctx, hostname, req.Address, req.Token, config); err != nil {
return response.InternalError(fmt.Errorf("failed to join k8sd cluster as control plane: %w", err))
}
}
Expand Down
37 changes: 23 additions & 14 deletions src/k8s/pkg/k8sd/app/hooks_bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,23 +25,32 @@ import (
// onBootstrap is called after we bootstrap the first cluster node.
// onBootstrap configures local services then writes the cluster config on the database.
//
// It dispatches on initConfig: when a "workerToken" entry is present the node
// is set up through onBootstrapWorkerNode, otherwise it is set up through
// onBootstrapControlPlane.
//
// The context handed to either path is derived from s.Context. When
// utils.MicroclusterTimeoutFromConfig returns a non-zero duration for
// initConfig, that duration is applied as a timeout on the derived context.
//
// It returns an error when the relevant configuration cannot be unmarshalled
// from initConfig, or whatever error the selected bootstrap path returns.
func (a *App) onBootstrap(s *state.State, initConfig map[string]string) error {
	// NOTE(neoaggelos): context timeout is passed over configuration, so that hook failures are propagated to the client
	ctx, cancel := context.WithCancel(s.Context)
	defer cancel()
	if t := utils.MicroclusterTimeoutFromConfig(initConfig); t != 0 {
		// Each defer captured the cancel func that was current when it was
		// registered, so both the plain cancel and the timeout cancel run.
		ctx, cancel = context.WithTimeout(ctx, t)
		defer cancel()
	}

	if workerToken, ok := initConfig["workerToken"]; ok {
		workerConfig, err := apiv1.WorkerJoinConfigFromMicrocluster(initConfig)
		if err != nil {
			return fmt.Errorf("failed to unmarshal worker join config: %w", err)
		}
		// Pass the derived ctx so the timeout covers the worker setup.
		return a.onBootstrapWorkerNode(ctx, s, workerToken, workerConfig)
	}

	bootstrapConfig, err := apiv1.BootstrapConfigFromMicrocluster(initConfig)
	if err != nil {
		return fmt.Errorf("failed to unmarshal bootstrap config: %w", err)
	}

	// Pass the derived ctx so the timeout covers the control plane setup.
	return a.onBootstrapControlPlane(ctx, s, bootstrapConfig)
}

func (a *App) onBootstrapWorkerNode(s *state.State, encodedToken string, joinConfig apiv1.WorkerNodeJoinConfig) error {
func (a *App) onBootstrapWorkerNode(ctx context.Context, s *state.State, encodedToken string, joinConfig apiv1.WorkerNodeJoinConfig) error {
snap := a.Snap()

token := &types.InternalWorkerNodeToken{}
Expand Down Expand Up @@ -181,11 +190,11 @@ func (a *App) onBootstrapWorkerNode(s *state.State, encodedToken string, joinCon
}

// Pre-init checks
if err := snap.PreInitChecks(s.Context, cfg); err != nil {
if err := snap.PreInitChecks(ctx, cfg); err != nil {
return fmt.Errorf("pre-init checks failed for worker node: %w", err)
}

if err := s.Database.Transaction(s.Context, func(ctx context.Context, tx *sql.Tx) error {
if err := s.Database.Transaction(ctx, func(ctx context.Context, tx *sql.Tx) error {
if _, err := database.SetClusterConfig(ctx, tx, cfg); err != nil {
return fmt.Errorf("failed to write cluster configuration: %w", err)
}
Expand All @@ -201,7 +210,7 @@ func (a *App) onBootstrapWorkerNode(s *state.State, encodedToken string, joinCon
if err := setup.KubeletWorker(snap, s.Name(), nodeIP, response.ClusterDNS, response.ClusterDomain, response.CloudProvider, joinConfig.ExtraNodeKubeletArgs); err != nil {
return fmt.Errorf("failed to configure kubelet: %w", err)
}
if err := setup.KubeProxy(s.Context, snap, s.Name(), response.PodCIDR, joinConfig.ExtraNodeKubeProxyArgs); err != nil {
if err := setup.KubeProxy(ctx, snap, s.Name(), response.PodCIDR, joinConfig.ExtraNodeKubeProxyArgs); err != nil {
return fmt.Errorf("failed to configure kube-proxy: %w", err)
}
if err := setup.K8sAPIServerProxy(snap, response.APIServers, joinConfig.ExtraNodeK8sAPIServerProxyArgs); err != nil {
Expand All @@ -217,14 +226,14 @@ func (a *App) onBootstrapWorkerNode(s *state.State, encodedToken string, joinCon
}

// Start services
if err := snaputil.StartWorkerServices(s.Context, snap); err != nil {
if err := snaputil.StartWorkerServices(ctx, snap); err != nil {
return fmt.Errorf("failed to start worker services: %w", err)
}

return nil
}

func (a *App) onBootstrapControlPlane(s *state.State, bootstrapConfig apiv1.BootstrapConfig) error {
func (a *App) onBootstrapControlPlane(ctx context.Context, s *state.State, bootstrapConfig apiv1.BootstrapConfig) error {
snap := a.Snap()

cfg, err := types.ClusterConfigFromBootstrapConfig(bootstrapConfig)
Expand Down Expand Up @@ -346,7 +355,7 @@ func (a *App) onBootstrapControlPlane(s *state.State, bootstrapConfig apiv1.Boot
cfg.Certificates.K8sdPrivateKey = utils.Pointer(certificates.K8sdPrivateKey)

// Pre-init checks
if err := snap.PreInitChecks(s.Context, cfg); err != nil {
if err := snap.PreInitChecks(ctx, cfg); err != nil {
return fmt.Errorf("pre-init checks failed for bootstrap node: %w", err)
}

Expand All @@ -373,7 +382,7 @@ func (a *App) onBootstrapControlPlane(s *state.State, bootstrapConfig apiv1.Boot
if err := setup.KubeletControlPlane(snap, s.Name(), nodeIP, cfg.Kubelet.GetClusterDNS(), cfg.Kubelet.GetClusterDomain(), cfg.Kubelet.GetCloudProvider(), cfg.Kubelet.GetControlPlaneTaints(), bootstrapConfig.ExtraNodeKubeletArgs); err != nil {
return fmt.Errorf("failed to configure kubelet: %w", err)
}
if err := setup.KubeProxy(s.Context, snap, s.Name(), cfg.Network.GetPodCIDR(), bootstrapConfig.ExtraNodeKubeProxyArgs); err != nil {
if err := setup.KubeProxy(ctx, snap, s.Name(), cfg.Network.GetPodCIDR(), bootstrapConfig.ExtraNodeKubeProxyArgs); err != nil {
return fmt.Errorf("failed to configure kube-proxy: %w", err)
}
if err := setup.KubeControllerManager(snap, bootstrapConfig.ExtraNodeKubeControllerManagerArgs); err != nil {
Expand All @@ -391,7 +400,7 @@ func (a *App) onBootstrapControlPlane(s *state.State, bootstrapConfig apiv1.Boot
}

// Write cluster configuration to dqlite
if err := s.Database.Transaction(s.Context, func(ctx context.Context, tx *sql.Tx) error {
if err := s.Database.Transaction(ctx, func(ctx context.Context, tx *sql.Tx) error {
if _, err := database.SetClusterConfig(ctx, tx, cfg); err != nil {
return fmt.Errorf("failed to write cluster configuration: %w", err)
}
Expand All @@ -400,17 +409,17 @@ func (a *App) onBootstrapControlPlane(s *state.State, bootstrapConfig apiv1.Boot
return fmt.Errorf("database transaction to update cluster configuration failed: %w", err)
}

if err := snapdconfig.SetSnapdFromK8sd(s.Context, cfg.ToUserFacing(), snap); err != nil {
if err := snapdconfig.SetSnapdFromK8sd(ctx, cfg.ToUserFacing(), snap); err != nil {
return fmt.Errorf("failed to set snapd configuration from k8sd: %w", err)
}

// Start services
if err := startControlPlaneServices(s.Context, snap, cfg.Datastore.GetType()); err != nil {
if err := startControlPlaneServices(ctx, snap, cfg.Datastore.GetType()); err != nil {
return fmt.Errorf("failed to start services: %w", err)
}

// Wait until Kube-API server is ready
if err := waitApiServerReady(s.Context, snap); err != nil {
if err := waitApiServerReady(ctx, snap); err != nil {
return fmt.Errorf("kube-apiserver did not become ready in time: %w", err)
}

Expand Down
22 changes: 15 additions & 7 deletions src/k8s/pkg/k8sd/app/hooks_join.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package app

import (
"context"
"fmt"
"net"

Expand All @@ -18,12 +19,19 @@ import (
func (a *App) onPostJoin(s *state.State, initConfig map[string]string) error {
snap := a.Snap()

ctx, cancel := context.WithCancel(s.Context)
defer cancel()
if t := utils.MicroclusterTimeoutFromConfig(initConfig); t != 0 {
ctx, cancel = context.WithTimeout(ctx, t)
defer cancel()
}

joinConfig, err := apiv1.ControlPlaneJoinConfigFromMicrocluster(initConfig)
if err != nil {
return fmt.Errorf("failed to unmarshal control plane join config: %w", err)
}

cfg, err := databaseutil.GetClusterConfig(s.Context, s)
cfg, err := databaseutil.GetClusterConfig(ctx, s)
if err != nil {
return fmt.Errorf("failed to get cluster config: %w", err)
}
Expand Down Expand Up @@ -111,7 +119,7 @@ func (a *App) onPostJoin(s *state.State, initConfig map[string]string) error {
}

// Pre-init checks
if err := snap.PreInitChecks(s.Context, cfg); err != nil {
if err := snap.PreInitChecks(ctx, cfg); err != nil {
return fmt.Errorf("pre-init checks failed for joining node: %w", err)
}

Expand All @@ -131,7 +139,7 @@ func (a *App) onPostJoin(s *state.State, initConfig map[string]string) error {
if err != nil {
return fmt.Errorf("failed to get dqlite leader: %w", err)
}
members, err := leader.GetClusterMembers(s.Context)
members, err := leader.GetClusterMembers(ctx)
if err != nil {
return fmt.Errorf("failed to get microcluster members: %w", err)
}
Expand All @@ -156,7 +164,7 @@ func (a *App) onPostJoin(s *state.State, initConfig map[string]string) error {
if err := setup.KubeletControlPlane(snap, s.Name(), nodeIP, cfg.Kubelet.GetClusterDNS(), cfg.Kubelet.GetClusterDomain(), cfg.Kubelet.GetCloudProvider(), cfg.Kubelet.GetControlPlaneTaints(), joinConfig.ExtraNodeKubeletArgs); err != nil {
return fmt.Errorf("failed to configure kubelet: %w", err)
}
if err := setup.KubeProxy(s.Context, snap, s.Name(), cfg.Network.GetPodCIDR(), joinConfig.ExtraNodeKubeProxyArgs); err != nil {
if err := setup.KubeProxy(ctx, snap, s.Name(), cfg.Network.GetPodCIDR(), joinConfig.ExtraNodeKubeProxyArgs); err != nil {
return fmt.Errorf("failed to configure kube-proxy: %w", err)
}
if err := setup.KubeControllerManager(snap, joinConfig.ExtraNodeKubeControllerManagerArgs); err != nil {
Expand All @@ -173,17 +181,17 @@ func (a *App) onPostJoin(s *state.State, initConfig map[string]string) error {
return fmt.Errorf("failed to write extra node config files: %w", err)
}

if err := snapdconfig.SetSnapdFromK8sd(s.Context, cfg.ToUserFacing(), snap); err != nil {
if err := snapdconfig.SetSnapdFromK8sd(ctx, cfg.ToUserFacing(), snap); err != nil {
return fmt.Errorf("failed to set snapd configuration from k8sd: %w", err)
}

// Start services
if err := startControlPlaneServices(s.Context, snap, cfg.Datastore.GetType()); err != nil {
if err := startControlPlaneServices(ctx, snap, cfg.Datastore.GetType()); err != nil {
return fmt.Errorf("failed to start services: %w", err)
}

// Wait until Kube-API server is ready
if err := waitApiServerReady(s.Context, snap); err != nil {
if err := waitApiServerReady(ctx, snap); err != nil {
return fmt.Errorf("failed to wait for kube-apiserver to become ready: %w", err)
}

Expand Down
Loading

0 comments on commit b19d036

Please sign in to comment.