Skip to content

Commit

Permalink
Cleanup on failed k8s bootstrap and k8s join-cluster attempts (#521)
Browse files Browse the repository at this point in the history
* cleanup on failed bootstrap

* cleanup on failed worker join

* create a local microcluster client on App

* start cleanup on control plane join

* do not block remove hook
  • Loading branch information
neoaggelos authored Jul 1, 2024
1 parent b19d036 commit b07b07a
Show file tree
Hide file tree
Showing 13 changed files with 375 additions and 45 deletions.
5 changes: 3 additions & 2 deletions src/k8s/api/v1/cluster_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ type JoinClusterRequest struct {

// RemoveNodeRequest is used to request to remove a node from the cluster.
type RemoveNodeRequest struct {
Name string `json:"name"`
Force bool `json:"force"`
Name string `json:"name"`
Force bool `json:"force"`
Timeout time.Duration `json:"timeout"`
}
6 changes: 1 addition & 5 deletions src/k8s/cmd/k8s/k8s_remove_node.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package k8s

import (
"context"
"fmt"
"time"

Expand Down Expand Up @@ -44,11 +43,8 @@ func newRemoveNodeCmd(env cmdutil.ExecutionEnvironment) *cobra.Command {

name := args[0]

ctx, cancel := context.WithTimeout(cmd.Context(), opts.timeout)
cobra.OnFinalize(cancel)

cmd.PrintErrf("Removing %q from the Kubernetes cluster. This may take a few seconds, please wait.\n", name)
if err := client.RemoveNode(ctx, apiv1.RemoveNodeRequest{Name: name, Force: opts.force}); err != nil {
if err := client.RemoveNode(cmd.Context(), apiv1.RemoveNodeRequest{Name: name, Force: opts.force, Timeout: opts.timeout}); err != nil {
cmd.PrintErrf("Error: Failed to remove node %q from the cluster.\n\nThe error was: %v\n", name, err)
env.Exit(1)
return
Expand Down
19 changes: 5 additions & 14 deletions src/k8s/pkg/client/k8sd/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"time"

apiv1 "github.com/canonical/k8s/api/v1"
"github.com/canonical/k8s/pkg/utils/control"
"github.com/canonical/lxd/shared/api"
)

Expand Down Expand Up @@ -42,23 +41,15 @@ func (c *k8sd) JoinCluster(ctx context.Context, request apiv1.JoinClusterRequest
return fmt.Errorf("failed to POST /k8sd/cluster/join: %w", err)
}

// NOTE(neoaggelos): we should not ignore this error
_ = control.WaitUntilReady(ctx, func() (bool, error) {
nodeStatus, err := c.NodeStatus(ctx)
switch {
case err != nil:
return false, fmt.Errorf("failed to get node status: %w", err)
case nodeStatus.DatastoreRole == apiv1.DatastoreRolePending:
// still waiting for node to join
return false, nil
}
return true, nil
})

return nil
}

func (c *k8sd) RemoveNode(ctx context.Context, request apiv1.RemoveNodeRequest) error {
// NOTE(neoaggelos): microcluster adds an arbitrary 30 second timeout in case no context deadline is set.
// Configure a client deadline for timeout + 30 seconds (the timeout will come from the server)
ctx, cancel := context.WithTimeout(ctx, request.Timeout+30*time.Second)
defer cancel()

if err := c.client.Query(ctx, "POST", api.NewURL().Path("k8sd", "cluster", "remove"), request, nil); err != nil {
return fmt.Errorf("failed to POST /k8sd/cluster/remove: %w", err)
}
Expand Down
5 changes: 3 additions & 2 deletions src/k8s/pkg/client/kubernetes/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,17 @@ import (
"context"
"fmt"

apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/util/retry"
)

// DeleteNode will remove a node from the kubernetes cluster.
// DeleteNode will retry if there is a conflict on the resource.
// DeleteNode will not fail if the node does not exist.
func (c *Client) DeleteNode(ctx context.Context, nodeName string) error {
return retry.RetryOnConflict(retry.DefaultBackoff, func() error {
err := c.CoreV1().Nodes().Delete(ctx, nodeName, metav1.DeleteOptions{})
if err != nil {
if err := c.CoreV1().Nodes().Delete(ctx, nodeName, metav1.DeleteOptions{}); err != nil && !apierrors.IsNotFound(err) {
return fmt.Errorf("failed to delete node: %w", err)
}
return nil
Expand Down
17 changes: 13 additions & 4 deletions src/k8s/pkg/client/kubernetes/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (
"fmt"
"testing"

"github.com/onsi/gomega"
. "github.com/onsi/gomega"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
Expand All @@ -15,7 +15,7 @@ import (
)

func TestDeleteNode(t *testing.T) {
g := gomega.NewWithT(t)
g := NewWithT(t)

t.Run("node deletion is successful", func(t *testing.T) {
clientset := fake.NewSimpleClientset()
Expand All @@ -28,7 +28,16 @@ func TestDeleteNode(t *testing.T) {
}, metav1.CreateOptions{})

err := client.DeleteNode(context.Background(), nodeName)
g.Expect(err).To(gomega.BeNil())
g.Expect(err).To(BeNil())
})

t.Run("node does not exist is successful", func(t *testing.T) {
clientset := fake.NewSimpleClientset()
client := &Client{Interface: clientset}
nodeName := "test-node"

err := client.DeleteNode(context.Background(), nodeName)
g.Expect(err).To(BeNil())
})

t.Run("node deletion fails", func(t *testing.T) {
Expand All @@ -41,6 +50,6 @@ func TestDeleteNode(t *testing.T) {
})

err := client.DeleteNode(context.Background(), nodeName)
g.Expect(err).To(gomega.MatchError(fmt.Errorf("failed to delete node: %w", expectedErr)))
g.Expect(err).To(MatchError(fmt.Errorf("failed to delete node: %w", expectedErr)))
})
}
45 changes: 38 additions & 7 deletions src/k8s/pkg/k8sd/api/cluster_remove.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,19 @@
package api

import (
"context"
"database/sql"
"fmt"
"log"
"net/http"

apiv1 "github.com/canonical/k8s/api/v1"
databaseutil "github.com/canonical/k8s/pkg/k8sd/database/util"
"github.com/canonical/k8s/pkg/utils"
"github.com/canonical/k8s/pkg/utils/control"
nodeutil "github.com/canonical/k8s/pkg/utils/node"
"github.com/canonical/lxd/lxd/response"
"github.com/canonical/microcluster/cluster"
"github.com/canonical/microcluster/state"
)

Expand All @@ -20,23 +25,49 @@ func (e *Endpoints) postClusterRemove(s *state.State, r *http.Request) response.
return response.BadRequest(fmt.Errorf("failed to parse request: %w", err))
}

isControlPlane, err := nodeutil.IsControlPlaneNode(r.Context(), s, req.Name)
ctx, cancel := context.WithCancel(r.Context())
defer cancel()
if req.Timeout > 0 {
ctx, cancel = context.WithTimeout(ctx, req.Timeout)
defer cancel()
}

isControlPlane, err := nodeutil.IsControlPlaneNode(ctx, s, req.Name)
if err != nil {
return response.InternalError(fmt.Errorf("failed to check if node is control-plane: %w", err))
}
if isControlPlane {
log.Printf("Waiting for node to not be pending")
control.WaitUntilReady(ctx, func() (bool, error) {
var notPending bool
if err := s.Database.Transaction(ctx, func(ctx context.Context, tx *sql.Tx) error {
member, err := cluster.GetInternalClusterMember(ctx, tx, req.Name)
if err != nil {
log.Printf("Failed to get member: %v", err)
return nil
}
log.Printf("Node %s is %s", member.Name, member.Role)
notPending = member.Role != cluster.Pending
return nil
}); err != nil {
log.Printf("Transaction to check cluster member role failed: %v", err)
}
return notPending, nil
})
log.Printf("Starting node deletion")

// Remove control plane via microcluster API.
// The postRemove hook will take care of cleaning up kubernetes.
c, err := e.provider.MicroCluster().LocalClient()
c, err := s.Leader()
if err != nil {
return response.InternalError(fmt.Errorf("failed to create local client: %w", err))
return response.InternalError(fmt.Errorf("failed to create client to cluster leader: %w", err))
}
if err := c.DeleteClusterMember(r.Context(), req.Name, req.Force); err != nil {
if err := c.DeleteClusterMember(ctx, req.Name, req.Force); err != nil {
return response.InternalError(fmt.Errorf("failed to delete cluster member %s: %w", req.Name, err))
}
}

isWorker, err := databaseutil.IsWorkerNode(r.Context(), s, req.Name)
isWorker, err := databaseutil.IsWorkerNode(ctx, s, req.Name)
if err != nil {
return response.InternalError(fmt.Errorf("failed to check if node is worker: %w", err))
}
Expand All @@ -47,11 +78,11 @@ func (e *Endpoints) postClusterRemove(s *state.State, r *http.Request) response.
return response.InternalError(fmt.Errorf("failed to create k8s client: %w", err))
}

if err := c.DeleteNode(s.Context, req.Name); err != nil {
if err := c.DeleteNode(ctx, req.Name); err != nil {
return response.InternalError(fmt.Errorf("failed to remove k8s node %q: %w", req.Name, err))
}

if err := databaseutil.DeleteWorkerNodeEntry(r.Context(), s, req.Name); err != nil {
if err := databaseutil.DeleteWorkerNodeEntry(ctx, s, req.Name); err != nil {
return response.InternalError(fmt.Errorf("failed to remove worker entry %q: %w", req.Name, err))
}
}
Expand Down
16 changes: 11 additions & 5 deletions src/k8s/pkg/k8sd/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/canonical/k8s/pkg/k8sd/controllers"
"github.com/canonical/k8s/pkg/k8sd/database"
"github.com/canonical/k8s/pkg/snap"
"github.com/canonical/microcluster/client"
"github.com/canonical/microcluster/config"
"github.com/canonical/microcluster/microcluster"
"github.com/canonical/microcluster/state"
Expand Down Expand Up @@ -42,8 +43,9 @@ type Config struct {

// App is the k8sd microcluster instance.
type App struct {
microCluster *microcluster.MicroCluster
snap snap.Snap
cluster *microcluster.MicroCluster
client *client.Client
snap snap.Snap

// profilingAddress
profilingAddress string
Expand Down Expand Up @@ -79,13 +81,17 @@ func New(cfg Config) (*App, error) {
Debug: cfg.Debug,
StateDir: cfg.StateDir,
})

if err != nil {
return nil, fmt.Errorf("failed to create microcluster app: %w", err)
}
client, err := cluster.LocalClient()
if err != nil {
return nil, fmt.Errorf("failed to create microcluster local client: %w", err)
}

app := &App{
microCluster: cluster,
cluster: cluster,
client: client,
snap: cfg.Snap,
profilingAddress: cfg.PprofAddress,
}
Expand Down Expand Up @@ -192,7 +198,7 @@ func (a *App) Run(ctx context.Context, customHooks *config.Hooks) error {
}()
}

err := a.microCluster.Start(ctx, api.New(a).Endpoints(), database.SchemaExtensions, hooks)
err := a.cluster.Start(ctx, api.New(a).Endpoints(), database.SchemaExtensions, hooks)
if err != nil {
return fmt.Errorf("failed to run microcluster: %w", err)
}
Expand Down
18 changes: 18 additions & 0 deletions src/k8s/pkg/k8sd/app/cluster_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,24 @@ func startControlPlaneServices(ctx context.Context, snap snap.Snap, datastore st
return nil
}

// stopControlPlaneServices stops the services of a control plane node.
//
// For the "k8s-dqlite" datastore it first calls snaputil.StopK8sDqliteServices;
// for "external" no datastore-specific call is made. Any other datastore value
// is rejected with an error before anything is stopped. Afterwards it calls
// snaputil.StopControlPlaneServices. The first failure is wrapped and returned.
func stopControlPlaneServices(ctx context.Context, snap snap.Snap, datastore string) error {
	managedDqlite := datastore == "k8s-dqlite"

	// Reject anything that is neither "k8s-dqlite" nor "external".
	if !managedDqlite && datastore != "external" {
		return fmt.Errorf("unsupported datastore %s, must be one of %v", datastore, setup.SupportedDatastores)
	}

	// Only the k8s-dqlite datastore has a dedicated stop call here.
	if managedDqlite {
		dqliteErr := snaputil.StopK8sDqliteServices(ctx, snap)
		if dqliteErr != nil {
			return fmt.Errorf("failed to stop k8s-dqlite service: %w", dqliteErr)
		}
	}

	stopErr := snaputil.StopControlPlaneServices(ctx, snap)
	if stopErr != nil {
		return fmt.Errorf("failed to stop control plane services: %w", stopErr)
	}
	return nil
}

func waitApiServerReady(ctx context.Context, snap snap.Snap) error {
// Wait for API server to come up
client, err := snap.KubernetesClient("")
Expand Down
Loading

0 comments on commit b07b07a

Please sign in to comment.