Skip to content

Commit

Permalink
Refactor scaler to update only the DC
Browse files Browse the repository at this point in the history
  • Loading branch information
ironcladlou committed Nov 16, 2015
1 parent 224d98d commit 79988c9
Show file tree
Hide file tree
Showing 2 changed files with 84 additions and 132 deletions.
70 changes: 36 additions & 34 deletions pkg/deploy/scaler/scale.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,7 @@ package scaler
import (
"time"

"github.com/golang/glog"
kerrors "k8s.io/kubernetes/pkg/api/errors"
kapi "k8s.io/kubernetes/pkg/api"
kclient "k8s.io/kubernetes/pkg/client/unversioned"
"k8s.io/kubernetes/pkg/kubectl"
"k8s.io/kubernetes/pkg/util/wait"
Expand All @@ -26,9 +25,10 @@ type DeploymentConfigScaler struct {
clientInterface kclient.Interface
}

// Scale updates a replication controller created by the DeploymentConfig with the provided namespace/name,
// to a new size, with optional precondition check (if preconditions is not nil),optional retries (if retry
// is not nil), and then optionally waits for its replica count to reach the new value (if wait is not nil).
// Scale updates the DeploymentConfig with the provided namespace/name, to a
// new size, with optional precondition check (if preconditions is not nil),
// optional retries (if retry is not nil), and then optionally waits for its
// deployment replica count to reach the new value (if wait is not nil).
func (scaler *DeploymentConfigScaler) Scale(namespace, name string, newSize uint, preconditions *kubectl.ScalePrecondition, retry, waitForReplicas *kubectl.RetryParams) error {
if preconditions == nil {
preconditions = &kubectl.ScalePrecondition{Size: -1, ResourceVersion: ""}
Expand All @@ -39,19 +39,6 @@ func (scaler *DeploymentConfigScaler) Scale(namespace, name string, newSize uint
}
cond := kubectl.ScaleCondition(scaler, preconditions, namespace, name, newSize)
if err := wait.Poll(retry.Interval, retry.Timeout, cond); err != nil {
if scaleErr, ok := err.(kubectl.ControllerScaleError); ok && kerrors.IsNotFound(scaleErr.ActualError) {
glog.Infof("No deployment found for dc/%s. Scaling the deployment configuration template...", name)
dc, err := scaler.dcClient.DeploymentConfigs(namespace).Get(name)
if err != nil {
return err
}
dc.Template.ControllerTemplate.Replicas = int(newSize)

if _, err := scaler.dcClient.DeploymentConfigs(namespace).Update(dc); err != nil {
return err
}
return nil
}
return err
}
if waitForReplicas != nil {
Expand All @@ -63,33 +50,48 @@ func (scaler *DeploymentConfigScaler) Scale(namespace, name string, newSize uint
if err != nil {
return err
}
return wait.Poll(waitForReplicas.Interval, waitForReplicas.Timeout, kclient.ControllerHasDesiredReplicas(scaler.clientInterface, rc))
return wait.Poll(waitForReplicas.Interval, waitForReplicas.Timeout, controllerHasSpecifiedReplicas(scaler.clientInterface, rc, dc.Template.ControllerTemplate.Replicas))
}
return nil
}

// ScaleSimple does a simple one-shot attempt at scaling - not useful on it's own, but
// a necessary building block for Scale
// ScaleSimple does a simple one-shot attempt at scaling - not useful on it's
// own, but a necessary building block for Scale
func (scaler *DeploymentConfigScaler) ScaleSimple(namespace, name string, preconditions *kubectl.ScalePrecondition, newSize uint) error {
dc, err := scaler.dcClient.DeploymentConfigs(namespace).Get(name)
if err != nil {
return err
}
controller, err := scaler.rcClient.ReplicationControllers(namespace).Get(util.LatestDeploymentNameForConfig(dc))
if err != nil {
return kubectl.ControllerScaleError{FailureType: kubectl.ControllerScaleGetFailure, ResourceVersion: "Unknown", ActualError: err}
dc.Template.ControllerTemplate.Replicas = int(newSize)
if _, err := scaler.dcClient.DeploymentConfigs(namespace).Update(dc); err != nil {
return kubectl.ControllerScaleError{FailureType: kubectl.ControllerScaleUpdateFailure, ResourceVersion: dc.ResourceVersion, ActualError: err}
}
// TODO: do a better job of printing objects here.
return nil
}

if preconditions != nil {
if err := preconditions.ValidateReplicationController(controller); err != nil {
return err
// controllerHasSpecifiedReplicas returns a condition that will be true if and
// only if the specified replica count for a controller's ReplicaSelector
// equals the Replicas count.
//
// This is a slightly modified version of
// unversioned.ControllerHasDesiredReplicas. This is necessary because when
// scaling an RC via a DC, the RC spec replica count is not immediately
// updated to match the owning DC.
func controllerHasSpecifiedReplicas(c kclient.Interface, controller *kapi.ReplicationController, specifiedReplicas int) wait.ConditionFunc {
// If we're given a controller where the status lags the spec, it either means that the controller is stale,
// or that the rc manager hasn't noticed the update yet. Polling status.Replicas is not safe in the latter case.
desiredGeneration := controller.Generation

return func() (bool, error) {
ctrl, err := c.ReplicationControllers(controller.Namespace).Get(controller.Name)
if err != nil {
return false, err
}
// There's a chance a concurrent update modifies the Spec.Replicas causing this check to pass,
// or, after this check has passed, a modification causes the rc manager to create more pods.
// This will not be an issue once we've implemented graceful delete for rcs, but till then
// concurrent stop operations on the same rc might have unintended side effects.
return ctrl.Status.ObservedGeneration >= desiredGeneration && ctrl.Status.Replicas == specifiedReplicas, nil
}
controller.Spec.Replicas = int(newSize)
// TODO: do retry on 409 errors here?
if _, err := scaler.rcClient.ReplicationControllers(namespace).Update(controller); err != nil {
return kubectl.ControllerScaleError{FailureType: kubectl.ControllerScaleUpdateFailure, ResourceVersion: controller.ResourceVersion, ActualError: err}
}
// TODO: do a better job of printing objects here.
return nil
}
146 changes: 48 additions & 98 deletions pkg/deploy/scaler/scale_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,125 +7,75 @@ import (
kapi "k8s.io/kubernetes/pkg/api"
ktestclient "k8s.io/kubernetes/pkg/client/unversioned/testclient"
"k8s.io/kubernetes/pkg/kubectl"
"k8s.io/kubernetes/pkg/runtime"

"github.com/openshift/origin/pkg/client/testclient"
deployapi "github.com/openshift/origin/pkg/deploy/api"
deploytest "github.com/openshift/origin/pkg/deploy/api/test"
deployutil "github.com/openshift/origin/pkg/deploy/util"
)

func mkdeployment(version int) kapi.ReplicationController {
deployment, _ := deployutil.MakeDeployment(deploytest.OkDeploymentConfig(version), kapi.Codec)
return *deployment
}

func mkDeploymentList(versions ...int) *kapi.ReplicationControllerList {
list := &kapi.ReplicationControllerList{}
for _, v := range versions {
list.Items = append(list.Items, mkdeployment(v))
}
return list
}

func TestScale(t *testing.T) {
tests := []struct {
testName string
namespace string
name string
count uint
preconditions *kubectl.ScalePrecondition
retry, waitForReplicas *kubectl.RetryParams
oc *testclient.Fake
kc *ktestclient.Fake
expected []ktestclient.Action
kexpected []ktestclient.Action
expectedErr error
name string
size uint
wait bool
errExpected bool
}{
{
testName: "simple scale",
namespace: "default",
name: "foo",
count: uint(3),
oc: testclient.NewSimpleFake(deploytest.OkDeploymentConfig(1)),
kc: ktestclient.NewSimpleFake(mkDeploymentList(1)),
expected: []ktestclient.Action{
ktestclient.NewGetAction("deploymentconfigs", "default", "foo"),
},
kexpected: []ktestclient.Action{
ktestclient.NewGetAction("replicationcontrollers", "default", "config-1"),
ktestclient.NewUpdateAction("replicationcontrollers", "default", nil),
},
expectedErr: nil,
},
{
testName: "wait for replicas",
namespace: "default",
name: "foo",
count: uint(3),
waitForReplicas: &kubectl.RetryParams{Interval: time.Millisecond, Timeout: time.Millisecond},
oc: testclient.NewSimpleFake(deploytest.OkDeploymentConfig(1)),
kc: ktestclient.NewSimpleFake(mkDeploymentList(1)),
expected: []ktestclient.Action{
ktestclient.NewGetAction("deploymentconfigs", "default", "foo"),
ktestclient.NewGetAction("deploymentconfigs", "default", "foo"),
},
kexpected: []ktestclient.Action{
ktestclient.NewGetAction("replicationcontrollers", "default", "config-1"),
ktestclient.NewUpdateAction("replicationcontrollers", "default", nil),
ktestclient.NewGetAction("replicationcontrollers", "default", "config-1"),
ktestclient.NewGetAction("replicationcontrollers", "", "config-1"),
},
expectedErr: nil,
name: "simple scale",
size: 2,
wait: false,
errExpected: false,
},
{
testName: "no deployment - dc scale",
namespace: "default",
name: "foo",
count: uint(3),
oc: testclient.NewSimpleFake(deploytest.OkDeploymentConfig(1)),
kc: ktestclient.NewSimpleFake(),
expected: []ktestclient.Action{
ktestclient.NewGetAction("deploymentconfigs", "default", "foo"),
ktestclient.NewGetAction("deploymentconfigs", "default", "foo"),
ktestclient.NewUpdateAction("deploymentconfigs", "default", nil),
},
kexpected: []ktestclient.Action{
ktestclient.NewGetAction("replicationcontrollers", "default", "config-1"),
},
expectedErr: nil,
name: "scale with wait",
size: 2,
wait: true,
errExpected: false,
},
}

for _, test := range tests {
scaler := NewDeploymentConfigScaler(test.oc, test.kc)
got := scaler.Scale(test.namespace, test.name, test.count, test.preconditions, test.retry, test.waitForReplicas)
if got != test.expectedErr {
t.Errorf("%s: error mismatch: expected %v, got %v", test.testName, test.expectedErr, got)
}
t.Logf("evaluating test %q", test.name)
oc := &testclient.Fake{}
kc := &ktestclient.Fake{}
scaler := NewDeploymentConfigScaler(oc, kc)

config := deploytest.OkDeploymentConfig(1)
config.Template.ControllerTemplate.Replicas = 1
deployment, _ := deployutil.MakeDeployment(config, kapi.Codec)

if len(test.oc.Actions()) != len(test.expected) {
t.Fatalf("%s: unexpected OpenShift actions amount: %d, expected %d", test.testName, len(test.oc.Actions()), len(test.expected))
var wait *kubectl.RetryParams
if test.wait {
wait = &kubectl.RetryParams{Interval: time.Millisecond, Timeout: time.Second}
}
for j, actualAction := range test.oc.Actions() {
e, a := test.expected[j], actualAction
if e.GetVerb() != a.GetVerb() ||
e.GetNamespace() != a.GetNamespace() ||
e.GetResource() != a.GetResource() ||
e.GetSubresource() != a.GetSubresource() {
t.Errorf("%s: unexpected OpenShift action[%d]: %s, expected %s", test.testName, j, a, e)

oc.AddReactor("get", "deploymentconfigs", func(action ktestclient.Action) (handled bool, ret runtime.Object, err error) {
return true, config, nil
})
oc.AddReactor("update", "deploymentconfigs", func(action ktestclient.Action) (handled bool, ret runtime.Object, err error) {
// Simulate the asynchronous update of the RC replicas based on the DC
// replica count.
dc := action.(ktestclient.UpdateAction).GetObject().(*deployapi.DeploymentConfig)
deployment.Spec.Replicas = dc.Template.ControllerTemplate.Replicas
deployment.Status.Replicas = deployment.Spec.Replicas
return true, dc, nil
})
kc.AddReactor("get", "replicationcontrollers", func(action ktestclient.Action) (handled bool, ret runtime.Object, err error) {
return true, deployment, nil
})

err := scaler.Scale("default", config.Name, test.size, nil, nil, wait)
if err != nil {
if !test.errExpected {
t.Errorf("unexpected error: %s", err)
continue
}
}

if len(test.kc.Actions()) != len(test.kexpected) {
t.Fatalf("%s: unexpected Kubernetes actions amount: %d, expected %d", test.testName, len(test.kc.Actions()), len(test.kexpected))
}
for j, actualAction := range test.kc.Actions() {
e, a := test.kexpected[j], actualAction
if e.GetVerb() != a.GetVerb() ||
e.GetNamespace() != a.GetNamespace() ||
e.GetResource() != a.GetResource() ||
e.GetSubresource() != a.GetSubresource() {
t.Errorf("%s: unexpected Kubernetes action[%d]: %s, expected %s", test.testName, j, a, e)
}
if e, a := config.Template.ControllerTemplate.Replicas, deployment.Spec.Replicas; e != a {
t.Errorf("expected rc/%s replicas %d, got %s", deployment.Name, e, a)
}
}
}

0 comments on commit 79988c9

Please sign in to comment.