Skip to content

Commit

Permalink
fix: retry Kubernetes API errors on cordon/uncordon/etc
Browse files Browse the repository at this point in the history
This extracts the function that was used in the upgrade/convert flows to
retry transient errors into the main `kubernetes` package, expands it to
also treat timeout errors as retryable, and uses it to retry errors where
applicable in `pkg/kubernetes`.

Fixes #3403

Signed-off-by: Andrey Smirnov <smirnov.andrey@gmail.com>
  • Loading branch information
smira authored and talos-bot committed Apr 2, 2021
1 parent 063d1ab commit a1e6415
Show file tree
Hide file tree
Showing 6 changed files with 53 additions and 33 deletions.
8 changes: 4 additions & 4 deletions pkg/cluster/kubernetes/convert.go
Original file line number Diff line number Diff line change
Expand Up @@ -467,7 +467,7 @@ func waitForStaticPods(ctx context.Context, cluster ConvertProvider, options *Co
LabelSelector: fmt.Sprintf("k8s-app = %s", k8sApp),
})
if err != nil {
if retryableError(err) {
if kubernetes.IsRetryableError(err) {
return retry.ExpectedError(err)
}

Expand Down Expand Up @@ -547,7 +547,7 @@ func disablePodCheckpointer(ctx context.Context, cluster ConvertProvider) error

checkpoints, err = getActiveCheckpoints(ctx, k8sClient)
if err != nil {
if retryableError(err) {
if kubernetes.IsRetryableError(err) {
return retry.ExpectedError(err)
}

Expand Down Expand Up @@ -610,7 +610,7 @@ func deleteDaemonset(ctx context.Context, cluster ConvertProvider, k8sApp string
if err = retry.Constant(time.Minute, retry.WithUnits(100*time.Millisecond)).Retry(func() error {
err = k8sClient.AppsV1().DaemonSets(namespace).Delete(ctx, k8sApp, v1.DeleteOptions{})
if err != nil {
if retryableError(err) {
if kubernetes.IsRetryableError(err) {
return retry.ExpectedError(err)
}

Expand All @@ -631,7 +631,7 @@ func deleteDaemonset(ctx context.Context, cluster ConvertProvider, k8sApp string
LabelSelector: fmt.Sprintf("k8s-app = %s", k8sApp),
})
if err != nil {
if retryableError(err) {
if kubernetes.IsRetryableError(err) {
return retry.ExpectedError(err)
}

Expand Down
27 changes: 0 additions & 27 deletions pkg/cluster/kubernetes/kubernetes.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,30 +4,3 @@

// Package kubernetes provides cluster-wide kubernetes utilities.
package kubernetes

import (
"errors"
"io"
"net"
"syscall"

apierrors "k8s.io/apimachinery/pkg/api/errors"
)

// retryableError reports whether err should be treated as transient.
//
// An error is retryable when the Kubernetes API error helpers classify it
// as a timeout, server timeout or internal error, when it wraps io.EOF or
// io.ErrUnexpectedEOF, or when it wraps a *net.OpError that is either
// temporary or caused by ECONNREFUSED.
func retryableError(err error) bool {
	switch {
	case apierrors.IsTimeout(err), apierrors.IsServerTimeout(err), apierrors.IsInternalError(err):
		return true
	case errors.Is(err, io.EOF), errors.Is(err, io.ErrUnexpectedEOF):
		return true
	}

	// Anything that is not a network operation error is not retried.
	var opErr *net.OpError
	if !errors.As(err, &opErr) {
		return false
	}

	return opErr.Temporary() || errors.Is(opErr.Err, syscall.ECONNREFUSED)
}
3 changes: 2 additions & 1 deletion pkg/cluster/kubernetes/self_hosted.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"k8s.io/client-go/kubernetes"

"github.com/talos-systems/talos/pkg/cluster"
k8s "github.com/talos-systems/talos/pkg/kubernetes"
"github.com/talos-systems/talos/pkg/machinery/constants"
)

Expand Down Expand Up @@ -130,7 +131,7 @@ func updateDaemonset(ctx context.Context, clientset *kubernetes.Clientset, ds st
return retry.Constant(5*time.Minute, retry.WithUnits(10*time.Second)).Retry(func() error {
daemonset, err = clientset.AppsV1().DaemonSets(namespace).Get(ctx, ds, metav1.GetOptions{})
if err != nil {
if retryableError(err) {
if k8s.IsRetryableError(err) {
return retry.ExpectedError(err)
}

Expand Down
3 changes: 2 additions & 1 deletion pkg/cluster/kubernetes/talos_managed.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"github.com/talos-systems/talos/pkg/cluster"
"github.com/talos-systems/talos/pkg/kubernetes"
"github.com/talos-systems/talos/pkg/machinery/client"
v1alpha1config "github.com/talos-systems/talos/pkg/machinery/config/types/v1alpha1"
machinetype "github.com/talos-systems/talos/pkg/machinery/config/types/v1alpha1/machine"
Expand Down Expand Up @@ -226,7 +227,7 @@ func checkPodStatus(ctx context.Context, cluster UpgradeProvider, service, node,
LabelSelector: fmt.Sprintf("k8s-app = %s", service),
})
if err != nil {
if retryableError(err) {
if kubernetes.IsRetryableError(err) {
return retry.ExpectedError(err)
}

Expand Down
33 changes: 33 additions & 0 deletions pkg/kubernetes/errors.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.

package kubernetes

import (
"errors"
"io"
"net"
"syscall"

apierrors "k8s.io/apimachinery/pkg/api/errors"
)

// IsRetryableError reports whether err, as returned by a Kubernetes API
// call, is transient so that the call should be retried.
//
// The following errors are considered retryable:
//   - API errors classified as timeout, server timeout or internal error;
//   - errors wrapping io.EOF or io.ErrUnexpectedEOF;
//   - errors wrapping a *net.OpError that is temporary, is a timeout, or
//     whose underlying cause is ECONNREFUSED.
func IsRetryableError(err error) bool {
	switch {
	case apierrors.IsTimeout(err), apierrors.IsServerTimeout(err), apierrors.IsInternalError(err):
		// Status-based classification of the API error.
		return true
	case errors.Is(err, io.EOF), errors.Is(err, io.ErrUnexpectedEOF):
		// End-of-stream errors, expected or not.
		return true
	}

	var opErr *net.OpError
	if errors.As(err, &opErr) {
		switch {
		case opErr.Temporary(), opErr.Timeout():
			return true
		default:
			return errors.Is(opErr.Err, syscall.ECONNREFUSED)
		}
	}

	return false
}
12 changes: 12 additions & 0 deletions pkg/kubernetes/kubernetes.go
Original file line number Diff line number Diff line change
Expand Up @@ -299,6 +299,10 @@ func (h *Client) Cordon(ctx context.Context, name string) error {
err := retry.Exponential(30*time.Second, retry.WithUnits(250*time.Millisecond), retry.WithJitter(50*time.Millisecond)).RetryWithContext(ctx, func(ctx context.Context) error {
node, err := h.CoreV1().Nodes().Get(ctx, name, metav1.GetOptions{})
if err != nil {
if IsRetryableError(err) {
return retry.ExpectedError(err)
}

return retry.UnexpectedError(err)
}

Expand Down Expand Up @@ -332,6 +336,10 @@ func (h *Client) Uncordon(ctx context.Context, name string, force bool) error {

node, err := h.CoreV1().Nodes().Get(attemptCtx, name, metav1.GetOptions{})
if err != nil {
if IsRetryableError(err) {
return retry.ExpectedError(err)
}

return retry.UnexpectedError(err)
}

Expand Down Expand Up @@ -447,6 +455,10 @@ func (h *Client) waitForPodDeleted(ctx context.Context, p *corev1.Pod) error {
case apierrors.IsNotFound(err):
return nil
case err != nil:
if IsRetryableError(err) {
return retry.ExpectedError(err)
}

return retry.UnexpectedError(fmt.Errorf("failed to get pod %s/%s: %w", p.GetNamespace(), p.GetName(), err))
}

Expand Down

0 comments on commit a1e6415

Please sign in to comment.