Add tests for creating missing stateful sets
robskillington committed Jun 19, 2019
1 parent d96104b commit 3fafc59
Showing 5 changed files with 294 additions and 73 deletions.
11 changes: 7 additions & 4 deletions pkg/controller/add_cluster_test.go
@@ -81,8 +81,10 @@ func TestEnsureConfigMap(t *testing.T) {

require.NoError(t, registerValidConfigMap("my_config_data"))

controller := deps.newController()
err := controller.ensureConfigMap(cluster)
controller, err := deps.newController()
require.NoError(t, err)

err = controller.ensureConfigMap(cluster)
assert.NoError(t, err)

cms, err := controller.kubeClient.CoreV1().ConfigMaps(cluster.Namespace).List(metav1.ListOptions{})
@@ -104,9 +106,10 @@ func TestEnsureConfigMap_Update(t *testing.T) {

require.NoError(t, registerValidConfigMap("my_config_data"))

controller := deps.newController()
controller, err := deps.newController()
require.NoError(t, err)

err := controller.ensureConfigMap(cluster)
err = controller.ensureConfigMap(cluster)
assert.NoError(t, err)

// Change configmap data, expect to see changes reflected.
17 changes: 14 additions & 3 deletions pkg/controller/common_test.go
@@ -25,6 +25,8 @@ import (
"testing"
"time"

"github.com/m3db/m3db-operator/pkg/k8sops"

crdfake "github.com/m3db/m3db-operator/pkg/client/clientset/versioned/fake"
crdinformers "github.com/m3db/m3db-operator/pkg/client/informers/externalversions"
crdlisters "github.com/m3db/m3db-operator/pkg/client/listers/m3dboperator/v1alpha1"
@@ -70,20 +72,29 @@ type testDeps struct {
closed int32
}

func (deps *testDeps) newController() *Controller {
func (deps *testDeps) newController() (*Controller, error) {
logger := zap.NewNop()
m := newMultiAdminClient(nil, zap.NewNop())
m.nsClientFn = func(...namespace.Option) (namespace.Client, error) {
return deps.namespaceClient, nil
}
m.plClientFn = func(...placement.Option) (placement.Client, error) {
return deps.placementClient, nil
}
k8sopsClient, err := k8sops.New(
k8sops.WithKClient(deps.kubeClient),
k8sops.WithCRDClient(deps.crdClient),
k8sops.WithLogger(logger))
if err != nil {
return nil, err
}
return &Controller{
logger: zap.NewNop(),
logger: logger,
scope: tally.NoopScope,
clock: deps.clock,
adminClient: m,

k8sclient: k8sopsClient,
kubeClient: deps.kubeClient,
crdClient: deps.crdClient,
podIDProvider: deps.idProvider,
@@ -95,7 +106,7 @@ func (deps *testDeps) newController() *Controller {
podLister: deps.podLister,

recorder: eventer.NewNopPoster(),
}
}, nil
}

func (deps *testDeps) cleanup() {
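Not part of this commit: because newController can now fail while constructing the k8sops client, every call site has to handle the returned error. A hypothetical one-line wrapper for tests could fold that check away; the helper name below is illustrative only and does not exist in the repository.

// Hypothetical helper (sketch only): fail the test immediately if controller
// construction errors, so call sites stay one line.
func mustNewController(t *testing.T, deps *testDeps) *Controller {
	t.Helper()
	controller, err := deps.newController()
	require.NoError(t, err)
	return controller
}

Call sites would then read controller := mustNewController(t, deps).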
80 changes: 49 additions & 31 deletions pkg/controller/controller.go
@@ -71,6 +71,14 @@ var (
errNonUniqueIsoGroups = errors.New("isolation group names are not unique")
)

type handleClusterUpdateResult uint

const (
noopClusterUpdate handleClusterUpdateResult = iota
waitClusterUpdate
actionClusterUpdate
)

// Controller object
type Controller struct {
lock *sync.Mutex
@@ -314,12 +322,15 @@ func (c *Controller) handleClusterEvent(key string) error {
return errors.New("got nil cluster for " + key)
}

return c.handleClusterUpdate(cluster)
_, err = c.handleClusterUpdate(cluster)
return err
}

// We are guaranteed by handleClusterEvent that we will never be passed a nil
// cluster here.
func (c *Controller) handleClusterUpdate(cluster *myspec.M3DBCluster) error {
func (c *Controller) handleClusterUpdate(cluster *myspec.M3DBCluster) (handleClusterUpdateResult, error) {
var result handleClusterUpdateResult

// MUST create a deep copy of the cluster or risk corrupting cache! Technically
// only need if we modify, but we frequently do that so let's deep copy to
// start and remove unnecessary calls later to optimize if we want.
@@ -330,24 +341,24 @@ func (c *Controller) handleClusterUpdate(cluster *myspec.M3DBCluster) error {
if err := validateIsolationGroups(cluster); err != nil {
clusterLogger.Error("failed validating isolationgroups", zap.Error(err))
c.recorder.WarningEvent(cluster, eventer.ReasonFailSync, err.Error())
return err
return result, err
}

if err := c.ensureConfigMap(cluster); err != nil {
clusterLogger.Error("failed to ensure configmap", zap.Error(err))
c.recorder.WarningEvent(cluster, eventer.ReasonFailSync, "failed to ensure configmap: %s", err.Error())
return err
return result, err
}

// Per https://v1-10.docs.kubernetes.io/docs/reference/generated/kubernetes-api/v1.10/#statefulsetspec-v1-apps,
// headless service MUST exist before statefulset.
if err := c.ensureServices(cluster); err != nil {
return err
return result, err
}

if len(cluster.Spec.IsolationGroups) == 0 {
// nothing to do, no groups to create in
return nil
return result, nil
}

// copy since we sort the array
@@ -357,7 +368,7 @@ func (c *Controller) handleClusterUpdate(cluster *myspec.M3DBCluster) error {

childrenSets, err := c.getChildStatefulSets(cluster)
if err != nil {
return err
return result, err
}

childrenSetsByName := make(map[string]*appsv1.StatefulSet)
@@ -368,35 +379,36 @@ func (c *Controller) handleClusterUpdate(cluster *myspec.M3DBCluster) error {
if sts.Spec.Replicas != nil && *sts.Spec.Replicas != sts.Status.ReadyReplicas {
// TODO(schallert): figure out what to do if replicas is not set
c.logger.Info("waiting for statefulset to be ready", zap.String("name", sts.Name), zap.Int32("ready", sts.Status.ReadyReplicas))
return nil
result = waitClusterUpdate
return result, nil
}
}

// Create any missing statefulsets, at this point all existing stateful sets are bootstrapped.
for i := 0; i < len(isoGroups); i++ {
name := fmt.Sprintf("%s-%d", cluster.Name, i)
name := k8sops.StatefulSetName(cluster.Name, i)
_, exists := childrenSetsByName[name]
if !exists {
sts, err := k8sops.GenerateStatefulSet(cluster, isoGroups[i].Name, isoGroups[i].NumInstances)
if err != nil {
return err
return result, err
}

_, err = c.kubeClient.AppsV1().StatefulSets(cluster.Namespace).Create(sts)
if err != nil {
c.logger.Error(err.Error())
return err
return result, err
}

c.logger.Info("created statefulset", zap.String("name", name))
return nil
return actionClusterUpdate, nil
}
}

if err := c.reconcileNamespaces(cluster); err != nil {
c.recorder.WarningEvent(cluster, eventer.ReasonFailedCreate, "failed to create namespace: %s", err)
c.logger.Error("error reconciling namespaces", zap.Error(err))
return err
return result, err
}

if len(cluster.Spec.Namespaces) == 0 {
@@ -407,7 +419,7 @@ func (c *Controller) handleClusterUpdate(cluster *myspec.M3DBCluster) error {
if !cluster.Status.HasInitializedPlacement() {
cluster, err = c.validatePlacementWithStatus(cluster)
if err != nil {
return err
return result, err
}
}

@@ -417,12 +429,12 @@ func (c *Controller) handleClusterUpdate(cluster *myspec.M3DBCluster) error {
selector := klabels.SelectorFromSet(labels.BaseLabels(cluster))
pods, err := c.podLister.Pods(cluster.Namespace).List(selector)
if err != nil {
return fmt.Errorf("error listing pods: %v", err)
return result, fmt.Errorf("error listing pods: %v", err)
}

placement, err := c.adminClient.placementClientForCluster(cluster).Get()
if err != nil {
return fmt.Errorf("error fetching active placement: %v", err)
return result, fmt.Errorf("error fetching active placement: %v", err)
}

c.logger.Info("found placement", zap.Int("currentPods", len(pods)), zap.Int("placementInsts", placement.NumInstances()))
@@ -432,50 +444,50 @@ func (c *Controller) handleClusterUpdate(cluster *myspec.M3DBCluster) error {
if !inst.IsAvailable() {
unavailInsts = append(unavailInsts, inst.ID())
}

}

if ln := len(unavailInsts); ln > 0 {
c.logger.Warn("waiting for instances to be available", zap.Strings("instances", unavailInsts))
c.recorder.WarningEvent(cluster, eventer.ReasonLongerThanUsual, "current unavailable instances: %d", ln)
return nil
result = waitClusterUpdate
return result, nil
}

// Determine if any sets aren't at their desired replica count. Maybe we can
// reuse the set objects from above but being paranoid for now.
childrenSets, err = c.getChildStatefulSets(cluster)
if err != nil {
return err
return result, err
}

// check if any pods inside the cluster need to be swapped in
leavingInstanceID, podToReplace, err := c.checkPodsForReplacement(cluster, pods, placement)
if err != nil {
return err
return result, err
}

if podToReplace != nil {
err = c.replacePodInPlacement(cluster, placement, leavingInstanceID, podToReplace)
if err != nil {
c.recorder.WarningEvent(cluster, eventer.ReasonFailedToUpdate, "could not replace instance: "+leavingInstanceID)
return err
return result, err
}
c.recorder.NormalEvent(cluster, eventer.ReasonSuccessfulUpdate, "successfully replaced instance: "+leavingInstanceID)
}

for _, set := range childrenSets {
zone, ok := set.Labels[labels.IsolationGroup]
if !ok {
return fmt.Errorf("statefulset %s has no isolation-group label", set.Name)
return result, fmt.Errorf("statefulset %s has no isolation-group label", set.Name)
}

group, ok := myspec.IsolationGroups(isoGroups).GetByName(zone)
if !ok {
return fmt.Errorf("zone %s not found in cluster isoGroups %v", zone, isoGroups)
return result, fmt.Errorf("zone %s not found in cluster isoGroups %v", zone, isoGroups)
}

if set.Spec.Replicas == nil {
return fmt.Errorf("set %s has unset spec replica", set.Name)
return result, fmt.Errorf("set %s has unset spec replica", set.Name)
}

// Number of pods we want in the group.
@@ -503,15 +515,21 @@ func (c *Controller) handleClusterUpdate(cluster *myspec.M3DBCluster) error {
// absent from the placement, add pods to placement.
if inPlacement < current {
setLogger.Info("expanding placement for set")
return c.expandPlacementForSet(cluster, set, group, placement)
if err := c.expandPlacementForSet(cluster, set, group, placement); err != nil {
return result, err
}
return actionClusterUpdate, nil
}
}

// If there are more pods in the placement than we want in the group,
// trigger a remove so that we can shrink the set.
if inPlacement > desired {
setLogger.Info("remove instance from placement for set")
return c.shrinkPlacementForSet(cluster, set, placement)
if err := c.shrinkPlacementForSet(cluster, set, placement); err != nil {
return result, err
}
return actionClusterUpdate, nil
}

var newCount int32
@@ -525,23 +543,23 @@ func (c *Controller) handleClusterUpdate(cluster *myspec.M3DBCluster) error {
set.Spec.Replicas = pointer.Int32Ptr(newCount)
set, err = c.kubeClient.AppsV1().StatefulSets(set.Namespace).Update(set)
if err != nil {
return fmt.Errorf("error updating statefulset %s: %v", set.Name, err)
return result, fmt.Errorf("error updating statefulset %s: %v", set.Name, err)
}

return nil
return actionClusterUpdate, nil
}

placement, err = c.adminClient.placementClientForCluster(cluster).Get()
if err != nil {
return fmt.Errorf("error fetching placement: %v", err)
return result, fmt.Errorf("error fetching placement: %v", err)
}

// TODO(celina): possibly do a replacement check here

// See if we need to clean up the pod bootstrapping status.
cluster, err = c.reconcileBootstrappingStatus(cluster, placement)
if err != nil {
return fmt.Errorf("error reconciling bootstrap status: %v", err)
return result, fmt.Errorf("error reconciling bootstrap status: %v", err)
}

c.logger.Info("nothing to do",
@@ -550,7 +568,7 @@ func (c *Controller) handleClusterUpdate(cluster *myspec.M3DBCluster) error {
zap.Int64("generation", cluster.ObjectMeta.Generation),
zap.String("rv", cluster.ObjectMeta.ResourceVersion))

return nil
return noopClusterUpdate, nil
}

func instancesInIsoGroup(pl m3placement.Placement, isoGroup string) []m3placement.Instance {
Diffs for the remaining two changed files are not shown.
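The new handleClusterUpdateResult return value is what the added tests hinge on: a test can now tell whether an update pass created something (actionClusterUpdate), is waiting on readiness (waitClusterUpdate), or had nothing left to do (noopClusterUpdate). The added tests live in the changed files whose diffs are not shown above; a minimal sketch of one, assuming hypothetical fixtures (newTestDeps, newTestCluster) plus the registerValidConfigMap helper and the testify/metav1 imports already used by this package's tests, might look like:

// Sketch only: newTestDeps and newTestCluster are hypothetical stand-ins for
// the package's real fixtures, not helpers from this repository.
func TestHandleClusterUpdate_CreatesMissingStatefulSet(t *testing.T) {
	deps := newTestDeps(t)
	defer deps.cleanup()

	require.NoError(t, registerValidConfigMap("my_config_data"))

	controller, err := deps.newController()
	require.NoError(t, err)

	// Cluster spec with one isolation group and no child statefulsets yet.
	cluster := newTestCluster("my-cluster", 1)

	// The first pass should create the missing statefulset and report that it
	// took an action rather than a no-op.
	result, err := controller.handleClusterUpdate(cluster)
	require.NoError(t, err)
	assert.Equal(t, actionClusterUpdate, result)

	sets, err := controller.kubeClient.AppsV1().StatefulSets(cluster.Namespace).List(metav1.ListOptions{})
	require.NoError(t, err)
	assert.Len(t, sets.Items, 1)
}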
