Skip to content

Commit

Permalink
Wait for replica to update pods before processing next statefulset up…
Browse files Browse the repository at this point in the history
…date (#260)
  • Loading branch information
robskillington authored Jan 8, 2021
1 parent 4074a36 commit 2c89672
Show file tree
Hide file tree
Showing 2 changed files with 149 additions and 22 deletions.
36 changes: 33 additions & 3 deletions pkg/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -478,9 +478,39 @@ func (c *M3DBController) handleClusterUpdate(cluster *myspec.M3DBCluster) error
// If any of the statefulsets aren't ready, wait until they are as we'll get
// another event (ready == bootstrapped)
for _, sts := range childrenSets {
if sts.Spec.Replicas != nil && *sts.Spec.Replicas != sts.Status.ReadyReplicas {
// TODO(schallert): figure out what to do if replicas is not set
c.logger.Info("waiting for statefulset to be ready", zap.String("name", sts.Name), zap.Int32("ready", sts.Status.ReadyReplicas))
if sts.Spec.Replicas == nil {
c.logger.Warn("skip check for statefulset, replicas is nil",
zap.String("name", sts.Name),
zap.Int32("readyReplicas", sts.Status.ReadyReplicas),
zap.Int32("updatedReplicas", sts.Status.UpdatedReplicas),
zap.String("currentRevision", sts.Status.CurrentRevision),
zap.String("updateRevision", sts.Status.UpdateRevision))
continue
}

replicas := *sts.Spec.Replicas
ready := replicas == sts.Status.ReadyReplicas
if sts.Status.UpdateRevision != "" {
// If there is an update revision, ensure all pods are updated
// otherwise there is a rollout in progress.
// Note: This ensures there is no race condition between
// updating a stateful set and it seemingly being "ready" and
// all pods healthy immediately after the update, since the
// updated replicas will not match the desired replicas
// and avoid the race condition of proceeding to another update
// before a stateful set has had a chance to update the pods
// but otherwise seemingly is healthy.
ready = ready && replicas == sts.Status.UpdatedReplicas
}

if !ready {
c.logger.Info("waiting for statefulset to be ready",
zap.String("name", sts.Name),
zap.Int32("replicas", replicas),
zap.Int32("readyReplicas", sts.Status.ReadyReplicas),
zap.Int32("updatedReplicas", sts.Status.UpdatedReplicas),
zap.String("currentRevision", sts.Status.CurrentRevision),
zap.String("updateRevision", sts.Status.UpdateRevision))
return nil
}
}
Expand Down
135 changes: 116 additions & 19 deletions pkg/controller/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,16 +141,27 @@ func setupTestCluster(
return cluster, deps
}

type waitForStatefulSetsOptions struct {
setReadyReplicas bool
expectedStatefulSets []string
simulatePodsNotUpdated bool
}

type waitForStatefulSetsResult struct {
updatedStatefulSets []string
failedUpdateStatefulSets []string
}

func waitForStatefulSets(
t *testing.T,
controller *M3DBController,
cluster *myspec.M3DBCluster,
verb string,
setReadyReplicas bool,
expectedStatefulSets []string,
) bool {
opts waitForStatefulSetsOptions,
) (waitForStatefulSetsResult, bool) {
var (
mu sync.Mutex
updates int
actualSets = make(map[string]bool)
)

Expand All @@ -161,11 +172,21 @@ func waitForStatefulSets(
sts = action.(kubetesting.CreateActionImpl).GetObject().(*appsv1.StatefulSet)
case "update":
sts = action.(kubetesting.UpdateActionImpl).GetObject().(*appsv1.StatefulSet)
// Note: Simulate an update revision to make sure the updated
// replicas check is enforced.
updates++
sts.Status.UpdateRevision = fmt.Sprintf("updated-revision-%d", updates)
if !opts.simulatePodsNotUpdated {
// Simulate all update immediately, unless explicitly
// testing for when not updating them.
sts.Status.UpdatedReplicas = *sts.Spec.Replicas
}
default:
t.Errorf("verb %s is not supported", verb)
}

if setReadyReplicas && sts.Spec.Replicas != nil {
if opts.setReadyReplicas {
// Note: should always have replicas set.
sts.Status.ReadyReplicas = *sts.Spec.Replicas
}

Expand All @@ -183,8 +204,7 @@ func waitForStatefulSets(
// we see all stateful sets that we expect and also be able to catch any extra stateful
// sets that we don't.
var (
done bool
iters = math.Max(float64(2*len(expectedStatefulSets)), 5)
iters = math.Max(float64(2*len(opts.expectedStatefulSets)), 5)
)
for i := 0; i < int(iters); i++ {
err := controller.handleClusterUpdate(cluster)
Expand All @@ -199,13 +219,49 @@ func waitForStatefulSets(
}
mu.Unlock()

if seen != len(expectedStatefulSets) {
if seen != len(opts.expectedStatefulSets) {
time.Sleep(100 * time.Millisecond)
continue
}
done = true

break
}

mu.Lock()
defer mu.Unlock()

var (
updated []string
failed []string
seen int
)
// Start with all failed.
for _, name := range opts.expectedStatefulSets {
failed = append(failed, name)
}
for name, found := range actualSets {
if found {
seen++

// Updated.
updated = append(updated, name)

// Remove from failed.
filterFailed := failed[:]
failed = failed[:0]
for _, elem := range filterFailed {
if elem != name {
failed = append(failed, elem)
}
}
}
}
return done

done := seen == len(opts.expectedStatefulSets)
return waitForStatefulSetsResult{
updatedStatefulSets: updated,
failedUpdateStatefulSets: failed,
}, done
}

func TestNew(t *testing.T) {
Expand Down Expand Up @@ -623,21 +679,27 @@ func TestHandleUpdateClusterCreatesStatefulSets(t *testing.T) {
defer deps.cleanup()
c := deps.newController(t)

done := waitForStatefulSets(t, c, cluster, "create", false, test.expCreateStatefulSets)
_, done := waitForStatefulSets(t, c, cluster, "create", waitForStatefulSetsOptions{
setReadyReplicas: false,
expectedStatefulSets: test.expCreateStatefulSets,
})
assert.True(t, done, "expected all sets to be created")
})
}
}

func TestHandleUpdateClusterUpdatesStatefulSets(t *testing.T) {
tests := []struct {
name string
cluster *metav1.ObjectMeta
sets []*metav1.ObjectMeta
newImage string
newConfigMap string
increaseReplicas bool
expUpdateStatefulSets []string
name string
cluster *metav1.ObjectMeta
sets []*metav1.ObjectMeta
newImage string
newConfigMap string
increaseReplicas bool
simulatePodsNotUpdated bool
expUpdateStatefulSets []string
expFailedUpdateStatefulSets []string
expNotDone bool
}{
{
name: "updates stateful sets when image is out of date",
Expand Down Expand Up @@ -735,6 +797,30 @@ func TestHandleUpdateClusterUpdatesStatefulSets(t *testing.T) {
increaseReplicas: true,
expUpdateStatefulSets: []string{},
},
{
name: "updates stateful sets does not progress if pods not updated",
cluster: newMeta("cluster1", map[string]string{
"foo": "bar",
"operator.m3db.io/app": "m3db",
"operator.m3db.io/cluster": "cluster1",
}, nil),
sets: []*metav1.ObjectMeta{
newMeta("cluster1-rep0", nil, map[string]string{
annotations.Update: "enabled",
}),
newMeta("cluster1-rep1", nil, map[string]string{
annotations.Update: "enabled",
}),
newMeta("cluster1-rep2", nil, map[string]string{
annotations.Update: "enabled",
}),
},
newImage: "m3db:v2.0.0",
simulatePodsNotUpdated: true,
expUpdateStatefulSets: []string{"cluster1-rep0", "cluster1-rep1", "cluster1-rep2"},
expFailedUpdateStatefulSets: []string{"cluster1-rep1", "cluster1-rep2"},
expNotDone: true,
},
}

for _, test := range tests {
Expand Down Expand Up @@ -780,8 +866,19 @@ func TestHandleUpdateClusterUpdatesStatefulSets(t *testing.T) {
})
}

done := waitForStatefulSets(t, c, cluster, "update", true, test.expUpdateStatefulSets)
assert.True(t, done, "expected all sets to be updated")
result, done := waitForStatefulSets(t, c, cluster, "update", waitForStatefulSetsOptions{
setReadyReplicas: true,
expectedStatefulSets: test.expUpdateStatefulSets,
simulatePodsNotUpdated: test.simulatePodsNotUpdated,
})
if test.expNotDone {
assert.False(t, done, "expected not all sets to be updated")
} else {
assert.True(t, done, "expected all sets to be updated")
}
if len(test.expFailedUpdateStatefulSets) > 0 {
assert.Equal(t, test.expFailedUpdateStatefulSets, result.failedUpdateStatefulSets)
}
})
}
}
Expand Down

0 comments on commit 2c89672

Please sign in to comment.