diff --git a/internal/integration/provision/upgrade.go b/internal/integration/provision/upgrade.go index 32e272aaeb..e29af0ccc4 100644 --- a/internal/integration/provision/upgrade.go +++ b/internal/integration/provision/upgrade.go @@ -74,7 +74,7 @@ const ( previousK8sVersion = "1.19.4" stableK8sVersion = "1.20.1" - currentK8sVersion = "1.20.4" //nolint: deadcode,varcheck + currentK8sVersion = "1.20.4" ) var ( @@ -128,7 +128,7 @@ func upgradeStableReleaseToCurrent() upgradeSpec { TargetInstallerImage: fmt.Sprintf("%s/%s:%s", DefaultSettings.TargetInstallImageRegistry, images.DefaultInstallerImageName, DefaultSettings.CurrentVersion), TargetVersion: DefaultSettings.CurrentVersion, - TargetK8sVersion: stableK8sVersion, // TODO: skip k8s upgrade: currentK8sVersion, + TargetK8sVersion: currentK8sVersion, TargetSelfHosted: false, MasterNodes: DefaultSettings.MasterNodes, @@ -150,7 +150,7 @@ func upgradeSingeNodePreserve() upgradeSpec { TargetInstallerImage: fmt.Sprintf("%s/%s:%s", DefaultSettings.TargetInstallImageRegistry, images.DefaultInstallerImageName, DefaultSettings.CurrentVersion), TargetVersion: DefaultSettings.CurrentVersion, - TargetK8sVersion: stableK8sVersion, // TODO: skip k8s upgrade: currentK8sVersion + TargetK8sVersion: currentK8sVersion, TargetSelfHosted: false, MasterNodes: 1, @@ -173,7 +173,7 @@ func upgradeSingeNodeStage() upgradeSpec { TargetInstallerImage: fmt.Sprintf("%s/%s:%s", DefaultSettings.TargetInstallImageRegistry, images.DefaultInstallerImageName, DefaultSettings.CurrentVersion), TargetVersion: DefaultSettings.CurrentVersion, - TargetK8sVersion: stableK8sVersion, // TODO: skip k8s upgrade: currentK8sVersion + TargetK8sVersion: currentK8sVersion, TargetSelfHosted: false, MasterNodes: 1, diff --git a/pkg/cluster/kubernetes/convert.go b/pkg/cluster/kubernetes/convert.go index 07d00fa580..2b144c94af 100644 --- a/pkg/cluster/kubernetes/convert.go +++ b/pkg/cluster/kubernetes/convert.go @@ -32,6 +32,7 @@ import ( 
"github.com/talos-systems/talos/pkg/machinery/config/configloader" v1alpha1config "github.com/talos-systems/talos/pkg/machinery/config/types/v1alpha1" "github.com/talos-systems/talos/pkg/machinery/config/types/v1alpha1/generate" + machinetype "github.com/talos-systems/talos/pkg/machinery/config/types/v1alpha1/machine" "github.com/talos-systems/talos/pkg/machinery/constants" "github.com/talos-systems/talos/pkg/resources/config" "github.com/talos-systems/talos/pkg/resources/k8s" @@ -63,7 +64,7 @@ func ConvertToStaticPods(ctx context.Context, cluster ConvertProvider, options C return fmt.Errorf("error building kubernetes client: %w", err) } - options.masterNodes, err = k8sClient.MasterIPs(ctx) + options.masterNodes, err = k8sClient.NodeIPs(ctx, machinetype.TypeControlPlane) if err != nil { return fmt.Errorf("error fetching master nodes: %w", err) } diff --git a/pkg/cluster/kubernetes/talos_managed.go b/pkg/cluster/kubernetes/talos_managed.go index 279695b480..1577fe8ff4 100644 --- a/pkg/cluster/kubernetes/talos_managed.go +++ b/pkg/cluster/kubernetes/talos_managed.go @@ -6,9 +6,22 @@ package kubernetes import ( "context" + "errors" "fmt" + "strings" + "time" + + "github.com/talos-systems/go-retry/retry" + "github.com/talos-systems/os-runtime/pkg/state" + apierrors "k8s.io/apimachinery/pkg/api/errors" + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" "github.com/talos-systems/talos/pkg/cluster" + "github.com/talos-systems/talos/pkg/machinery/client" + v1alpha1config "github.com/talos-systems/talos/pkg/machinery/config/types/v1alpha1" + machinetype "github.com/talos-systems/talos/pkg/machinery/config/types/v1alpha1/machine" + "github.com/talos-systems/talos/pkg/machinery/constants" + "github.com/talos-systems/talos/pkg/resources/config" ) // UpgradeProvider are the cluster interfaces required by upgrade process. @@ -18,6 +31,236 @@ type UpgradeProvider interface { } // UpgradeTalosManaged the Kubernetes control plane. 
+// +//nolint: gocyclo func UpgradeTalosManaged(ctx context.Context, cluster UpgradeProvider, options UpgradeOptions) error { - return fmt.Errorf("not implemented yet") + switch { + case strings.HasPrefix(options.FromVersion, "1.19.") && strings.HasPrefix(options.ToVersion, "1.19."): + case strings.HasPrefix(options.FromVersion, "1.19.") && strings.HasPrefix(options.ToVersion, "1.20."): + options.extraUpdaters = append(options.extraUpdaters, addControlPlaneToleration()) + case strings.HasPrefix(options.FromVersion, "1.20.") && strings.HasPrefix(options.ToVersion, "1.20."): + default: + return fmt.Errorf("unsupported upgrade from %q to %q", options.FromVersion, options.ToVersion) + } + + k8sClient, err := cluster.K8sHelper(ctx) + if err != nil { + return fmt.Errorf("error building kubernetes client: %w", err) + } + + options.masterNodes, err = k8sClient.NodeIPs(ctx, machinetype.TypeControlPlane) + if err != nil { + return fmt.Errorf("error fetching master nodes: %w", err) + } + + if len(options.masterNodes) == 0 { + return fmt.Errorf("no master nodes discovered") + } + + fmt.Printf("discovered master nodes %q\n", options.masterNodes) + + for _, service := range []string{kubeAPIServer, kubeControllerManager, kubeScheduler} { + if err = upgradeConfigPatch(ctx, cluster, options, service); err != nil { + return fmt.Errorf("failed updating service %q: %w", service, err) + } + } + + if err = hyperkubeUpgradeDs(ctx, k8sClient.Clientset, kubeProxy, options); err != nil { + return fmt.Errorf("error updating kube-proxy: %w", err) + } + + return nil +} + +func upgradeConfigPatch(ctx context.Context, cluster UpgradeProvider, options UpgradeOptions, service string) error { + fmt.Printf("updating %q to version %q\n", service, options.ToVersion) + + for _, node := range options.masterNodes { + if err := upgradeNodeConfigPatch(ctx, cluster, options, service, node); err != nil { + return fmt.Errorf("error updating node %q: %w", node, err) + } + } + + return nil +} + +//nolint: 
gocyclo
+func upgradeNodeConfigPatch(ctx context.Context, cluster UpgradeProvider, options UpgradeOptions, service, node string) error {
+	ctx, cancel := context.WithCancel(ctx)
+	defer cancel()
+
+	c, err := cluster.Client()
+	if err != nil {
+		return fmt.Errorf("error building Talos API client: %w", err)
+	}
+
+	ctx = client.WithNodes(ctx, node)
+
+	fmt.Printf(" > updating node %q\n", node)
+
+	watchClient, err := c.Resources.Watch(ctx, config.NamespaceName, config.K8sControlPlaneType, service)
+	if err != nil {
+		return fmt.Errorf("error watching service configuration: %w", err)
+	}
+
+	// first response is resource definition
+	_, err = watchClient.Recv()
+	if err != nil {
+		return fmt.Errorf("error watching config: %w", err)
+	}
+
+	// second is the initial state
+	watchInitial, err := watchClient.Recv()
+	if err != nil {
+		return fmt.Errorf("error watching config: %w", err)
+	}
+
+	if watchInitial.EventType != state.Created {
+		return fmt.Errorf("unexpected event type: %d", watchInitial.EventType)
+	}
+
+	skipConfigWait := false
+
+	err = patchNodeConfig(ctx, cluster, node, upgradeConfigPatcher(options, service))
+	if err != nil {
+		if errors.Is(err, errUpdateSkipped) {
+			skipConfigWait = true
+		} else {
+			return fmt.Errorf("error patching node config: %w", err)
+		}
+	}
+
+	var expectedConfigVersion string
+
+	if !skipConfigWait {
+		watchUpdated, err := watchClient.Recv()
+		if err != nil {
+			return fmt.Errorf("error watching config: %w", err)
+		}
+
+		if watchUpdated.EventType != state.Updated {
+			// report the event that actually violated the expectation, not the
+			// already-validated initial event (was a copy-paste of watchInitial)
+			return fmt.Errorf("unexpected event type: %d", watchUpdated.EventType)
+		}
+
+		expectedConfigVersion = watchUpdated.Resource.Metadata().Version().String()
+	} else {
+		expectedConfigVersion = watchInitial.Resource.Metadata().Version().String()
+	}
+
+	return retry.Constant(3*time.Minute, retry.WithUnits(10*time.Second), retry.WithErrorLogging(true)).Retry(func() error {
+		return checkPodStatus(ctx, cluster, service, node, expectedConfigVersion)
+	})
+}
+
+var
errUpdateSkipped = fmt.Errorf("update skipped") + +//nolint: gocyclo +func upgradeConfigPatcher(options UpgradeOptions, service string) func(config *v1alpha1config.Config) error { + return func(config *v1alpha1config.Config) error { + if config.ClusterConfig == nil { + config.ClusterConfig = &v1alpha1config.ClusterConfig{} + } + + switch service { + case kubeAPIServer: + if config.ClusterConfig.APIServerConfig == nil { + config.ClusterConfig.APIServerConfig = &v1alpha1config.APIServerConfig{} + } + + image := fmt.Sprintf("%s:v%s", constants.KubernetesAPIServerImage, options.ToVersion) + + if config.ClusterConfig.APIServerConfig.ContainerImage == image { + return errUpdateSkipped + } + + config.ClusterConfig.APIServerConfig.ContainerImage = image + case kubeControllerManager: + if config.ClusterConfig.ControllerManagerConfig == nil { + config.ClusterConfig.ControllerManagerConfig = &v1alpha1config.ControllerManagerConfig{} + } + + image := fmt.Sprintf("%s:v%s", constants.KubernetesControllerManagerImage, options.ToVersion) + + if config.ClusterConfig.ControllerManagerConfig.ContainerImage == image { + return errUpdateSkipped + } + + config.ClusterConfig.ControllerManagerConfig.ContainerImage = image + case kubeScheduler: + if config.ClusterConfig.SchedulerConfig == nil { + config.ClusterConfig.SchedulerConfig = &v1alpha1config.SchedulerConfig{} + } + + image := fmt.Sprintf("%s:v%s", constants.KubernetesSchedulerImage, options.ToVersion) + + if config.ClusterConfig.SchedulerConfig.ContainerImage == image { + return errUpdateSkipped + } + + config.ClusterConfig.SchedulerConfig.ContainerImage = image + default: + return fmt.Errorf("unsupported service %q", service) + } + + return nil + } +} + +//nolint: gocyclo +func checkPodStatus(ctx context.Context, cluster UpgradeProvider, service, node, configVersion string) error { + k8sClient, err := cluster.K8sHelper(ctx) + if err != nil { + return retry.UnexpectedError(fmt.Errorf("error building kubernetes client: %w", err)) + 
} + + pods, err := k8sClient.CoreV1().Pods(namespace).List(ctx, v1.ListOptions{ + LabelSelector: fmt.Sprintf("k8s-app = %s", service), + }) + if err != nil { + if apierrors.IsTimeout(err) || apierrors.IsServerTimeout(err) || apierrors.IsInternalError(err) { + return retry.ExpectedError(err) + } + + return retry.UnexpectedError(err) + } + + podFound := false + + for _, pod := range pods.Items { + if pod.Status.HostIP != node { + continue + } + + podFound = true + + if pod.Annotations[constants.AnnotationStaticPodConfigVersion] != configVersion { + return retry.ExpectedError(fmt.Errorf("config version mismatch: got %q, expected %q", pod.Annotations[constants.AnnotationStaticPodConfigVersion], configVersion)) + } + + ready := false + + for _, condition := range pod.Status.Conditions { + if condition.Type != "Ready" { + continue + } + + if condition.Status == "True" { + ready = true + + break + } + } + + if !ready { + return retry.ExpectedError(fmt.Errorf("pod is not ready")) + } + + break + } + + if !podFound { + return retry.ExpectedError(fmt.Errorf("pod not found in the API server state")) + } + + return nil } diff --git a/pkg/cluster/kubernetes/upgrade.go b/pkg/cluster/kubernetes/upgrade.go index 04eee63619..b64cac0924 100644 --- a/pkg/cluster/kubernetes/upgrade.go +++ b/pkg/cluster/kubernetes/upgrade.go @@ -28,6 +28,7 @@ type UpgradeOptions struct { extraUpdaters []daemonsetUpdater podCheckpointerExtraUpdaters []daemonsetUpdater + masterNodes []string } type daemonsetUpdater func(ds string, daemonset *appsv1.DaemonSet) error