Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix minReadySeconds for DC #14954

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pkg/deploy/strategy/recreate/recreate.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ func NewRecreateDeploymentStrategy(client kclientset.Interface, tagClient client
eventClient: client.Core(),
podClient: client.Core(),
getUpdateAcceptor: func(timeout time.Duration, minReadySeconds int32) strat.UpdateAcceptor {
return stratsupport.NewAcceptAvailablePods(out, client.Core(), timeout, acceptorInterval, minReadySeconds)
return stratsupport.NewAcceptAvailablePods(out, client.Core(), timeout)
},
scaler: scaler,
decoder: decoder,
Expand Down
2 changes: 1 addition & 1 deletion pkg/deploy/strategy/rolling/rolling.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ func NewRollingDeploymentStrategy(namespace string, client kclientset.Interface,
},
hookExecutor: stratsupport.NewHookExecutor(client.Core(), tags, client.Core(), os.Stdout, decoder),
getUpdateAcceptor: func(timeout time.Duration, minReadySeconds int32) strat.UpdateAcceptor {
return stratsupport.NewAcceptAvailablePods(out, client.Core(), timeout, acceptorInterval, minReadySeconds)
return stratsupport.NewAcceptAvailablePods(out, client.Core(), timeout)
},
}
}
Expand Down
120 changes: 26 additions & 94 deletions pkg/deploy/strategy/support/lifecycle.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,20 +7,16 @@ import (
"sync"
"time"

"github.com/golang/glog"

kerrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
utilerrors "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/tools/cache"
kapi "k8s.io/kubernetes/pkg/api"
kapipod "k8s.io/kubernetes/pkg/api/pod"
kcoreclient "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/core/internalversion"

"github.com/openshift/origin/pkg/client"
Expand Down Expand Up @@ -481,117 +477,53 @@ func newPodWatch(client kcoreclient.PodInterface, namespace, name, resourceVersi
// NewAcceptAvailablePods makes a new acceptAvailablePods from a real client.
// The returned acceptor watches the ReplicationController itself (via its
// AvailableReplicas status) rather than listing and tracking individual pods,
// so minReadySeconds is honored by the RC controller instead of being
// re-implemented here.
func NewAcceptAvailablePods(
	out io.Writer,
	kclient kcoreclient.ReplicationControllersGetter,
	timeout time.Duration,
) *acceptAvailablePods {
	return &acceptAvailablePods{
		out:     out,
		kclient: kclient,
		timeout: timeout,
	}
}

// acceptAvailablePods will accept a replication controller if all the pods
// for the replication controller become available.
//
// acceptAvailablePods keeps track of the pods it has accepted for a
// replication controller so that the acceptor can be reused across multiple
// batches of updates to a single controller. For example, if during the first
// acceptance call the replication controller has 3 pods, the acceptor will
// validate those 3 pods. If the same acceptor instance is used again for the
// same replication controller which now has 6 pods, only the latest 3 pods
// will be considered for acceptance. The status of the original 3 pods becomes
// irrelevant.
//
// Note that this struct is stateful and intended for use with a single
// replication controller and should be discarded and recreated between
// rollouts.
type acceptAvailablePods struct {
out io.Writer
// getRcPodStore should return a Store containing all the pods for the
// replication controller, and a channel to stop whatever process is
// feeding the store.
getRcPodStore func(*kapi.ReplicationController) (cache.Store, chan struct{})
// timeout is how long to wait for pod readiness.
out io.Writer
kclient kcoreclient.ReplicationControllersGetter
// timeout is how long to wait for pods to become available from ready state.
timeout time.Duration
// interval is how often to check for pod readiness
interval time.Duration
// minReadySeconds is the minimum number of seconds for which a newly created
// pod should be ready without any of its container crashing, for it to be
// considered available.
minReadySeconds int32
// acceptedPods keeps track of pods which have been previously accepted for
// a replication controller.
acceptedPods sets.String
}

// Accept all pods for a replication controller once they are available.
func (c *acceptAvailablePods) Accept(rc *kapi.ReplicationController) error {
// Make a pod store to poll and ensure it gets cleaned up.
podStore, stopStore := c.getRcPodStore(rc)
defer close(stopStore)
allReplicasAvailable := func(r *kapi.ReplicationController) bool {
return r.Status.AvailableReplicas == r.Spec.Replicas
}

// Start checking for pod updates.
if c.acceptedPods.Len() > 0 {
fmt.Fprintf(c.out, "--> Waiting up to %s for pods in rc %s to become ready (%d pods previously accepted)\n", c.timeout, rc.Name, c.acceptedPods.Len())
} else {
fmt.Fprintf(c.out, "--> Waiting up to %s for pods in rc %s to become ready\n", c.timeout, rc.Name)
}
err := wait.Poll(c.interval, c.timeout, func() (done bool, err error) {
// Check for pod readiness.
unready := sets.NewString()
for _, obj := range podStore.List() {
pod := obj.(*kapi.Pod)
// Skip previously accepted pods; we only want to verify newly observed
// and unaccepted pods.
if c.acceptedPods.Has(pod.Name) {
continue
}
if kapipod.IsPodAvailable(pod, c.minReadySeconds, metav1.NewTime(time.Now())) {
// If the pod is ready, track it as accepted.
c.acceptedPods.Insert(pod.Name)
} else {
// Otherwise, track it as unready.
unready.Insert(pod.Name)
}
}
// Check to see if we're done.
if unready.Len() == 0 {
return true, nil
if allReplicasAvailable(rc) {
return nil
}

watcher, err := c.kclient.ReplicationControllers(rc.Namespace).Watch(metav1.SingleObject(metav1.ObjectMeta{Name: rc.Name, ResourceVersion: rc.ResourceVersion}))
if err != nil {
return fmt.Errorf("acceptAvailablePods failed to watch ReplicationController %s/%s: %v", rc.Namespace, rc.Name, err)
}

_, err = watch.Until(c.timeout, watcher, func(event watch.Event) (bool, error) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Kargakis I know what you are about to say :), PTO and certification got into way of fixing watch.Until I'll get back to working on it.
Well, it used WATCH even in previous implementation.

if t := event.Type; t != watch.Modified {
return false, fmt.Errorf("acceptAvailablePods failed watching for ReplicationController %s/%s: received event %v", rc.Namespace, rc.Name, t)
}
// Otherwise, try again later.
glog.V(4).Infof("Still waiting for %d pods to become ready for rc %s", unready.Len(), rc.Name)
return false, nil
newRc := event.Object.(*kapi.ReplicationController)
return allReplicasAvailable(newRc), nil
})

// Handle acceptance failure.
if err != nil {
if err == wait.ErrWaitTimeout {
return fmt.Errorf("pods for rc %q took longer than %.f seconds to become ready", rc.Name, c.timeout.Seconds())
return fmt.Errorf("pods for rc '%s/%s' took longer than %.f seconds to become available", rc.Namespace, rc.Name, c.timeout.Seconds())
}
return fmt.Errorf("pod readiness check failed for rc %q: %v", rc.Name, err)
return err
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why removing the context from the error?

Copy link
Contributor Author

@tnozicka tnozicka Jul 20, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because in current context it feels misleading in what it's saying. Other things might have failed here, not just "pod readiness check".

I am open to suggestions if we want to reformat the error here and not leave it on the caller.

}
return nil
}
112 changes: 0 additions & 112 deletions pkg/deploy/strategy/support/lifecycle_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,8 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/diff"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/watch"
clientgotesting "k8s.io/client-go/testing"
"k8s.io/client-go/tools/cache"
kapi "k8s.io/kubernetes/pkg/api"
kapihelper "k8s.io/kubernetes/pkg/api/helper"
"k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/fake"
Expand Down Expand Up @@ -542,116 +540,6 @@ func TestHookExecutor_makeHookPodRestart(t *testing.T) {
}
}

// TestAcceptAvailablePods_scenarios exercises the pod-store-based acceptor
// across combinations of previously-accepted and currently ready/unready
// pods, asserting whether Accept succeeds within its short timeout.
// NOTE(review): this test targets the acceptor fields getRcPodStore,
// interval, and acceptedPods — the pre-refactor, per-pod-readiness
// implementation — and was deleted in the same change that moved acceptance
// to RC AvailableReplicas.
func TestAcceptAvailablePods_scenarios(t *testing.T) {
	scenarios := []struct {
		name string
		// any pods which are previously accepted
		acceptedPods []string
		// the current pods which will be in the store; pod name -> ready
		currentPods map[string]bool
		// whether or not the scenario should result in acceptance
		accepted bool
	}{
		{
			name:         "all ready, none previously accepted",
			accepted:     true,
			acceptedPods: []string{},
			currentPods: map[string]bool{
				"pod-1": true,
				"pod-2": true,
			},
		},
		{
			name:         "some ready, none previously accepted",
			accepted:     false,
			acceptedPods: []string{},
			currentPods: map[string]bool{
				"pod-1": false,
				"pod-2": true,
			},
		},
		{
			name:         "previously accepted has become unready, new are ready",
			accepted:     true,
			acceptedPods: []string{"pod-1"},
			currentPods: map[string]bool{
				// this pod should be ignored because it was previously accepted
				"pod-1": false,
				"pod-2": true,
			},
		},
		{
			name:         "previously accepted all ready, new is unready",
			accepted:     false,
			acceptedPods: []string{"pod-1"},
			currentPods: map[string]bool{
				"pod-1": true,
				"pod-2": false,
			},
		},
	}
	for _, s := range scenarios {
		t.Logf("running scenario: %s", s.name)

		// Populate the store with real pods with the desired ready condition.
		store := cache.NewStore(cache.MetaNamespaceKeyFunc)
		for podName, ready := range s.currentPods {
			status := kapi.ConditionTrue
			if !ready {
				status = kapi.ConditionFalse
			}
			pod := &kapi.Pod{
				ObjectMeta: metav1.ObjectMeta{
					Name: podName,
				},
				Status: kapi.PodStatus{
					Conditions: []kapi.PodCondition{
						{
							Type:   kapi.PodReady,
							Status: status,
						},
					},
				},
			}
			store.Add(pod)
		}

		// Set up accepted pods for the scenario.
		acceptedPods := sets.NewString()
		for _, podName := range s.acceptedPods {
			acceptedPods.Insert(podName)
		}

		// Short timeout/interval keep the failure scenarios fast: an unready
		// pod simply causes the poll to time out.
		acceptorLogs := &bytes.Buffer{}
		acceptor := &acceptAvailablePods{
			out:      acceptorLogs,
			timeout:  10 * time.Millisecond,
			interval: 1 * time.Millisecond,
			// Return the pre-populated store; the stop channel is unused here.
			getRcPodStore: func(deployment *kapi.ReplicationController) (cache.Store, chan struct{}) {
				return store, make(chan struct{})
			},
			acceptedPods: acceptedPods,
		}

		deployment, _ := deployutil.MakeDeployment(deploytest.OkDeploymentConfig(1), kapi.Codecs.LegacyCodec(deployv1.SchemeGroupVersion))
		deployment.Spec.Replicas = 1

		acceptor.out = &bytes.Buffer{}
		err := acceptor.Accept(deployment)

		if s.accepted {
			if err != nil {
				t.Fatalf("unexpected error: %s", err)
			}
		} else {
			if err == nil {
				t.Fatalf("expected an error")
			}
			t.Logf("got expected error: %s", err)
		}
	}
}

func deployment(name, namespace string, strategyLabels, strategyAnnotations map[string]string) (*deployapi.DeploymentConfig, *kapi.ReplicationController) {
config := &deployapi.DeploymentConfig{
ObjectMeta: metav1.ObjectMeta{
Expand Down
33 changes: 12 additions & 21 deletions test/extended/deployments/deployments.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package deployments

import (
//"errors"
"errors"
"fmt"
"math/rand"
"strings"
Expand Down Expand Up @@ -934,31 +934,22 @@ var _ = g.Describe("deploymentconfigs", func() {
return true, nil
}

// FIXME: There is a race between deployer pod updating phase and RC updating AvailableReplicas
// FIXME: Enable this when we switch pod acceptors to use RC AvailableReplicas with MinReadySecondsSet
//if deployutil.DeploymentStatusFor(rc) == deployapi.DeploymentStatusComplete {
// e2e.Logf("Failed RC: %#v", rc)
// return false, errors.New("deployment shouldn't be completed before ReadyReplicas become AvailableReplicas")
//}
if deployutil.DeploymentStatusFor(rc) == deployapi.DeploymentStatusComplete {
e2e.Logf("Failed RC: %#v", rc)
return false, errors.New("deployment shouldn't be completed before ReadyReplicas become AvailableReplicas")
}
return false, nil
})
o.Expect(err).NotTo(o.HaveOccurred())
o.Expect(rc1.Status.AvailableReplicas).To(o.Equal(dc.Spec.Replicas))
// FIXME: There is a race between deployer pod updating phase and RC updating AvailableReplicas
// FIXME: Enable this when we switch pod acceptors to use RC AvailableReplicas with MinReadySecondsSet
//// Deployment status can't be updated yet but should be right after
//o.Expect(deployutil.DeploymentStatusFor(rc1)).To(o.Equal(deployapi.DeploymentStatusRunning))
// Deployment status can't be updated yet but should be right after
o.Expect(deployutil.DeploymentStatusFor(rc1)).To(o.Equal(deployapi.DeploymentStatusRunning))
// It should finish right after
// FIXME: remove this condition when the above is fixed
if deployutil.DeploymentStatusFor(rc1) != deployapi.DeploymentStatusComplete {
// FIXME: remove this assertion when the above is fixed
o.Expect(deployutil.DeploymentStatusFor(rc1)).To(o.Equal(deployapi.DeploymentStatusRunning))
rc1, err = waitForRCModification(oc, namespace, rc1.Name, deploymentChangeTimeout,
rc1.GetResourceVersion(), func(rc *kapiv1.ReplicationController) (bool, error) {
return deployutil.DeploymentStatusFor(rc) == deployapi.DeploymentStatusComplete, nil
})
o.Expect(err).NotTo(o.HaveOccurred())
}
rc1, err = waitForRCModification(oc, namespace, rc1.Name, deploymentChangeTimeout,
rc1.GetResourceVersion(), func(rc *kapiv1.ReplicationController) (bool, error) {
return deployutil.DeploymentStatusFor(rc) == deployapi.DeploymentStatusComplete, nil
})
o.Expect(err).NotTo(o.HaveOccurred())

// We might check that minReadySecond passed between pods becoming ready
// and available but I don't think there is a way to get a timestamp from events
Expand Down
26 changes: 0 additions & 26 deletions vendor/k8s.io/kubernetes/pkg/api/pod/util_patch.go

This file was deleted.