Skip to content

Commit

Permalink
feat: support control plane upgrades with Talos managed control plane
Browse files Browse the repository at this point in the history
Upgrade is performed by updating node configuration (node by node, service
by service), watching internal resource state to get new configuration
version and verifying that pod with matching version successfully
propagated to the API server state and pod is ready.

Process is similar to the rolling update of the DaemonSet.

Signed-off-by: Andrey Smirnov <smirnov.andrey@gmail.com>
  • Loading branch information
smira authored and talos-bot committed Feb 20, 2021
1 parent 8789849 commit e2f1fbc
Show file tree
Hide file tree
Showing 4 changed files with 251 additions and 6 deletions.
8 changes: 4 additions & 4 deletions internal/integration/provision/upgrade.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ const (

previousK8sVersion = "1.19.4"
stableK8sVersion = "1.20.1"
currentK8sVersion = "1.20.4" //nolint: deadcode,varcheck
currentK8sVersion = "1.20.4"
)

var (
Expand Down Expand Up @@ -128,7 +128,7 @@ func upgradeStableReleaseToCurrent() upgradeSpec {

TargetInstallerImage: fmt.Sprintf("%s/%s:%s", DefaultSettings.TargetInstallImageRegistry, images.DefaultInstallerImageName, DefaultSettings.CurrentVersion),
TargetVersion: DefaultSettings.CurrentVersion,
TargetK8sVersion: stableK8sVersion, // TODO: skip k8s upgrade: currentK8sVersion,
TargetK8sVersion: currentK8sVersion,
TargetSelfHosted: false,

MasterNodes: DefaultSettings.MasterNodes,
Expand All @@ -150,7 +150,7 @@ func upgradeSingeNodePreserve() upgradeSpec {

TargetInstallerImage: fmt.Sprintf("%s/%s:%s", DefaultSettings.TargetInstallImageRegistry, images.DefaultInstallerImageName, DefaultSettings.CurrentVersion),
TargetVersion: DefaultSettings.CurrentVersion,
TargetK8sVersion: stableK8sVersion, // TODO: skip k8s upgrade: currentK8sVersion
TargetK8sVersion: currentK8sVersion,
TargetSelfHosted: false,

MasterNodes: 1,
Expand All @@ -173,7 +173,7 @@ func upgradeSingeNodeStage() upgradeSpec {

TargetInstallerImage: fmt.Sprintf("%s/%s:%s", DefaultSettings.TargetInstallImageRegistry, images.DefaultInstallerImageName, DefaultSettings.CurrentVersion),
TargetVersion: DefaultSettings.CurrentVersion,
TargetK8sVersion: stableK8sVersion, // TODO: skip k8s upgrade: currentK8sVersion
TargetK8sVersion: currentK8sVersion,
TargetSelfHosted: false,

MasterNodes: 1,
Expand Down
3 changes: 2 additions & 1 deletion pkg/cluster/kubernetes/convert.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/talos-systems/talos/pkg/machinery/config/configloader"
v1alpha1config "github.com/talos-systems/talos/pkg/machinery/config/types/v1alpha1"
"github.com/talos-systems/talos/pkg/machinery/config/types/v1alpha1/generate"
machinetype "github.com/talos-systems/talos/pkg/machinery/config/types/v1alpha1/machine"
"github.com/talos-systems/talos/pkg/machinery/constants"
"github.com/talos-systems/talos/pkg/resources/config"
"github.com/talos-systems/talos/pkg/resources/k8s"
Expand Down Expand Up @@ -63,7 +64,7 @@ func ConvertToStaticPods(ctx context.Context, cluster ConvertProvider, options C
return fmt.Errorf("error building kubernetes client: %w", err)
}

options.masterNodes, err = k8sClient.MasterIPs(ctx)
options.masterNodes, err = k8sClient.NodeIPs(ctx, machinetype.TypeControlPlane)
if err != nil {
return fmt.Errorf("error fetching master nodes: %w", err)
}
Expand Down
245 changes: 244 additions & 1 deletion pkg/cluster/kubernetes/talos_managed.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,22 @@ package kubernetes

import (
"context"
"errors"
"fmt"
"strings"
"time"

"github.com/talos-systems/go-retry/retry"
"github.com/talos-systems/os-runtime/pkg/state"
apierrors "k8s.io/apimachinery/pkg/api/errors"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"github.com/talos-systems/talos/pkg/cluster"
"github.com/talos-systems/talos/pkg/machinery/client"
v1alpha1config "github.com/talos-systems/talos/pkg/machinery/config/types/v1alpha1"
machinetype "github.com/talos-systems/talos/pkg/machinery/config/types/v1alpha1/machine"
"github.com/talos-systems/talos/pkg/machinery/constants"
"github.com/talos-systems/talos/pkg/resources/config"
)

// UpgradeProvider are the cluster interfaces required by upgrade process.
Expand All @@ -18,6 +31,236 @@ type UpgradeProvider interface {
}

// UpgradeTalosManaged the Kubernetes control plane.
//
//nolint: gocyclo
func UpgradeTalosManaged(ctx context.Context, cluster UpgradeProvider, options UpgradeOptions) error {
return fmt.Errorf("not implemented yet")
switch {
case strings.HasPrefix(options.FromVersion, "1.19.") && strings.HasPrefix(options.ToVersion, "1.19."):
case strings.HasPrefix(options.FromVersion, "1.19.") && strings.HasPrefix(options.ToVersion, "1.20."):
options.extraUpdaters = append(options.extraUpdaters, addControlPlaneToleration())
case strings.HasPrefix(options.FromVersion, "1.20.") && strings.HasPrefix(options.ToVersion, "1.20."):
default:
return fmt.Errorf("unsupported upgrade from %q to %q", options.FromVersion, options.ToVersion)
}

k8sClient, err := cluster.K8sHelper(ctx)
if err != nil {
return fmt.Errorf("error building kubernetes client: %w", err)
}

options.masterNodes, err = k8sClient.NodeIPs(ctx, machinetype.TypeControlPlane)
if err != nil {
return fmt.Errorf("error fetching master nodes: %w", err)
}

if len(options.masterNodes) == 0 {
return fmt.Errorf("no master nodes discovered")
}

fmt.Printf("discovered master nodes %q\n", options.masterNodes)

for _, service := range []string{kubeAPIServer, kubeControllerManager, kubeScheduler} {
if err = upgradeConfigPatch(ctx, cluster, options, service); err != nil {
return fmt.Errorf("failed updating service %q: %w", service, err)
}
}

if err = hyperkubeUpgradeDs(ctx, k8sClient.Clientset, kubeProxy, options); err != nil {
return fmt.Errorf("error updating kube-proxy: %w", err)
}

return nil
}

func upgradeConfigPatch(ctx context.Context, cluster UpgradeProvider, options UpgradeOptions, service string) error {
fmt.Printf("updating %q to version %q\n", service, options.ToVersion)

for _, node := range options.masterNodes {
if err := upgradeNodeConfigPatch(ctx, cluster, options, service, node); err != nil {
return fmt.Errorf("error updating node %q: %w", node, err)
}
}

return nil
}

//nolint: gocyclo
func upgradeNodeConfigPatch(ctx context.Context, cluster UpgradeProvider, options UpgradeOptions, service, node string) error {
ctx, cancel := context.WithCancel(ctx)
defer cancel()

c, err := cluster.Client()
if err != nil {
return fmt.Errorf("error building Talos API client: %w", err)
}

ctx = client.WithNodes(ctx, node)

fmt.Printf(" > updating node %q\n", node)

watchClient, err := c.Resources.Watch(ctx, config.NamespaceName, config.K8sControlPlaneType, service)
if err != nil {
return fmt.Errorf("error watching service configuration: %w", err)
}

// first response is resource definition
_, err = watchClient.Recv()
if err != nil {
return fmt.Errorf("error watching config: %w", err)
}

// second is the initial state
watchInitial, err := watchClient.Recv()
if err != nil {
return fmt.Errorf("error watching config: %w", err)
}

if watchInitial.EventType != state.Created {
return fmt.Errorf("unexpected event type: %d", watchInitial.EventType)
}

skipConfigWait := false

err = patchNodeConfig(ctx, cluster, node, upgradeConfigPatcher(options, service))
if err != nil {
if errors.Is(err, errUpdateSkipped) {
skipConfigWait = true
} else {
return fmt.Errorf("error patching node config: %w", err)
}
}

var expectedConfigVersion string

if !skipConfigWait {
watchUpdated, err := watchClient.Recv()
if err != nil {
return fmt.Errorf("error watching config: %w", err)
}

if watchUpdated.EventType != state.Updated {
return fmt.Errorf("unexpected event type: %d", watchInitial.EventType)
}

expectedConfigVersion = watchUpdated.Resource.Metadata().Version().String()
} else {
expectedConfigVersion = watchInitial.Resource.Metadata().Version().String()
}

return retry.Constant(3*time.Minute, retry.WithUnits(10*time.Second), retry.WithErrorLogging(true)).Retry(func() error {
return checkPodStatus(ctx, cluster, service, node, expectedConfigVersion)
})
}

var errUpdateSkipped = fmt.Errorf("update skipped")

//nolint: gocyclo
func upgradeConfigPatcher(options UpgradeOptions, service string) func(config *v1alpha1config.Config) error {
return func(config *v1alpha1config.Config) error {
if config.ClusterConfig == nil {
config.ClusterConfig = &v1alpha1config.ClusterConfig{}
}

switch service {
case kubeAPIServer:
if config.ClusterConfig.APIServerConfig == nil {
config.ClusterConfig.APIServerConfig = &v1alpha1config.APIServerConfig{}
}

image := fmt.Sprintf("%s:v%s", constants.KubernetesAPIServerImage, options.ToVersion)

if config.ClusterConfig.APIServerConfig.ContainerImage == image {
return errUpdateSkipped
}

config.ClusterConfig.APIServerConfig.ContainerImage = image
case kubeControllerManager:
if config.ClusterConfig.ControllerManagerConfig == nil {
config.ClusterConfig.ControllerManagerConfig = &v1alpha1config.ControllerManagerConfig{}
}

image := fmt.Sprintf("%s:v%s", constants.KubernetesControllerManagerImage, options.ToVersion)

if config.ClusterConfig.ControllerManagerConfig.ContainerImage == image {
return errUpdateSkipped
}

config.ClusterConfig.ControllerManagerConfig.ContainerImage = image
case kubeScheduler:
if config.ClusterConfig.SchedulerConfig == nil {
config.ClusterConfig.SchedulerConfig = &v1alpha1config.SchedulerConfig{}
}

image := fmt.Sprintf("%s:v%s", constants.KubernetesSchedulerImage, options.ToVersion)

if config.ClusterConfig.SchedulerConfig.ContainerImage == image {
return errUpdateSkipped
}

config.ClusterConfig.SchedulerConfig.ContainerImage = image
default:
return fmt.Errorf("unsupported service %q", service)
}

return nil
}
}

//nolint: gocyclo
func checkPodStatus(ctx context.Context, cluster UpgradeProvider, service, node, configVersion string) error {
k8sClient, err := cluster.K8sHelper(ctx)
if err != nil {
return retry.UnexpectedError(fmt.Errorf("error building kubernetes client: %w", err))
}

pods, err := k8sClient.CoreV1().Pods(namespace).List(ctx, v1.ListOptions{
LabelSelector: fmt.Sprintf("k8s-app = %s", service),
})
if err != nil {
if apierrors.IsTimeout(err) || apierrors.IsServerTimeout(err) || apierrors.IsInternalError(err) {
return retry.ExpectedError(err)
}

return retry.UnexpectedError(err)
}

podFound := false

for _, pod := range pods.Items {
if pod.Status.HostIP != node {
continue
}

podFound = true

if pod.Annotations[constants.AnnotationStaticPodConfigVersion] != configVersion {
return retry.ExpectedError(fmt.Errorf("config version mismatch: got %q, expected %q", pod.Annotations[constants.AnnotationStaticPodConfigVersion], configVersion))
}

ready := false

for _, condition := range pod.Status.Conditions {
if condition.Type != "Ready" {
continue
}

if condition.Status == "True" {
ready = true

break
}
}

if !ready {
return retry.ExpectedError(fmt.Errorf("pod is not ready"))
}

break
}

if !podFound {
return retry.ExpectedError(fmt.Errorf("pod not found in the API server state"))
}

return nil
}
1 change: 1 addition & 0 deletions pkg/cluster/kubernetes/upgrade.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ type UpgradeOptions struct {

extraUpdaters []daemonsetUpdater
podCheckpointerExtraUpdaters []daemonsetUpdater
masterNodes []string
}

type daemonsetUpdater func(ds string, daemonset *appsv1.DaemonSet) error

0 comments on commit e2f1fbc

Please sign in to comment.