Skip to content

Commit

Permalink
[controller] add support for adding multiple instances to a placement…
Browse files Browse the repository at this point in the history
… at once
  • Loading branch information
notbdu committed Feb 25, 2021
1 parent 06ebd7b commit 05f3c48
Show file tree
Hide file tree
Showing 9 changed files with 98 additions and 76 deletions.
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -34,3 +34,6 @@ linux-amd64/

# Helm packages
helm/**/*.tgz

# Vim swap files
*.swp
10 changes: 5 additions & 5 deletions pkg/apis/m3dboperator/v1alpha1/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,8 @@ const (
// been created for the cluster.
ClusterConditionPlacementInitialized ClusterConditionType = "PlacementInitialized"

// ClusterConditionPodBootstrapping indicates there is a pod bootstrapping.
ClusterConditionPodBootstrapping ClusterConditionType = "PodBootstrapping"
// ClusterConditionPodsBootstrapping indicates there are pods bootstrapping.
ClusterConditionPodsBootstrapping ClusterConditionType = "PodsBootstrapping"
)

// M3DBCluster defines the cluster
Expand Down Expand Up @@ -103,10 +103,10 @@ func (s *M3DBStatus) HasInitializedPlacement() bool {
return s.hasConditionTrue(ClusterConditionPlacementInitialized)
}

// HasPodBootstrapping returns true if conditions indicate a pod is currently
// HasPodsBootstrapping returns true if conditions indicate a pod is currently
// bootstrapping.
func (s *M3DBStatus) HasPodBootstrapping() bool {
return s.hasConditionTrue(ClusterConditionPodBootstrapping)
func (s *M3DBStatus) HasPodsBootstrapping() bool {
return s.hasConditionTrue(ClusterConditionPodsBootstrapping)
}

// GetCondition returns the specified cluster condition if it exists with a bool
Expand Down
2 changes: 1 addition & 1 deletion pkg/controller/m3admin_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,7 @@ func (c errorPlacementClient) Delete() error {
return c.err
}

func (c errorPlacementClient) Add(placementpb.Instance) error {
func (c errorPlacementClient) Add([]*placementpb.Instance) error {
return c.err
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/controller/m3admin_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,7 @@ func TestErrorPlacementClient(t *testing.T) {
err = cl.Delete()
assert.Equal(t, clErr, err)

err = cl.Add(placementpb.Instance{})
err = cl.Add([]*placementpb.Instance{&placementpb.Instance{}})
assert.Equal(t, clErr, err)

err = cl.Remove("foo")
Expand Down
62 changes: 37 additions & 25 deletions pkg/controller/update_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
package controller

import (
"bytes"
"encoding/json"
"errors"
"fmt"
Expand All @@ -38,12 +39,14 @@ import (
"github.com/m3db/m3db-operator/pkg/m3admin/namespace"
"github.com/m3db/m3db-operator/pkg/util/eventer"

"github.com/m3db/m3/src/cluster/generated/proto/placementpb"
"github.com/m3db/m3/src/cluster/placement"
dbns "github.com/m3db/m3/src/dbnode/generated/proto/namespace"
"github.com/m3db/m3/src/query/generated/proto/admin"

appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
klabels "k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/util/runtime"

Expand Down Expand Up @@ -306,16 +309,14 @@ func (c *M3DBController) setStatusPlacementCreated(cluster *myspec.M3DBCluster)
return cluster, nil
}

func (c *M3DBController) setStatusPodBootstrapping(cluster *myspec.M3DBCluster,
func (c *M3DBController) setStatusPodsBootstrapping(cluster *myspec.M3DBCluster,
status corev1.ConditionStatus,
reason, message string) (*myspec.M3DBCluster, error) {

return c.setStatus(cluster, myspec.ClusterConditionPodBootstrapping, status, reason, message)
return c.setStatus(cluster, myspec.ClusterConditionPodsBootstrapping, status, reason, message)
}

func (c *M3DBController) setStatus(cluster *myspec.M3DBCluster, condition myspec.ClusterConditionType,
status corev1.ConditionStatus, reason, message string) (*myspec.M3DBCluster, error) {

cond, ok := cluster.Status.GetCondition(condition)
if !ok {
cond = myspec.ClusterCondition{
Expand Down Expand Up @@ -355,43 +356,52 @@ func (c *M3DBController) reconcileBootstrappingStatus(cluster *myspec.M3DBCluste
}
}

return c.setStatus(cluster, myspec.ClusterConditionPodBootstrapping, corev1.ConditionFalse,
return c.setStatus(cluster, myspec.ClusterConditionPodsBootstrapping, corev1.ConditionFalse,
"BootstrapComplete", "no bootstraps in progress")
}

func (c *M3DBController) addPodToPlacement(cluster *myspec.M3DBCluster, pod *corev1.Pod) error {
c.logger.Info("found pod not in placement", zap.String("pod", pod.Name))
inst, err := m3db.PlacementInstanceFromPod(cluster, pod, c.podIDProvider)
if err != nil {
err := fmt.Errorf("error creating instance for pod %s", pod.Name)
c.logger.Error(err.Error())
return err
func (c *M3DBController) addPodsToPlacement(cluster *myspec.M3DBCluster, pods []*corev1.Pod) error {
var (
instances = make([]*placementpb.Instance, 0, len(pods))
reasonBuf = bytes.NewBufferString("adding pods to placement (")
loggerFields = make([]zap.Field, 0, len(pods))
)
for _, pod := range pods {
c.logger.Info("found pod not in placement", zap.String("pod", pod.Name))
inst, err := m3db.PlacementInstanceFromPod(cluster, pod, c.podIDProvider)
if err != nil {
err := fmt.Errorf("error creating instance for pod %s", pod.Name)
c.logger.Error(err.Error())
return err
}
instances = append(instances, inst)
reasonBuf.WriteString(fmt.Sprintf("%s,", pod.Name))
loggerFields = append(loggerFields, zap.String("pod", pod.Name))
}

reason := fmt.Sprintf("adding pod %s to placement", pod.Name)
_, err = c.setStatusPodBootstrapping(cluster, corev1.ConditionTrue, "PodAdded", reason)
reasonBuf.WriteString(")")
reason := reasonBuf.String()
_, err := c.setStatusPodsBootstrapping(cluster, corev1.ConditionTrue, "PodAdded", reason)
if err != nil {
err := fmt.Errorf("error setting pod bootstrapping status: %v", err)
err := fmt.Errorf("error setting pods bootstrapping status: %v", err)
c.logger.Error(err.Error())
return err
}

err = c.adminClient.placementClientForCluster(cluster).Add(*inst)
err = c.adminClient.placementClientForCluster(cluster).Add(instances)
if err != nil {
err := fmt.Errorf("error adding pod to placement: %s", pod.Name)
err := fmt.Errorf("error: %s", reason)
c.logger.Error(err.Error())
return err
}

c.logger.Info("added pod to placement", zap.String("pod", pod.Name))
c.logger.Info("added pods to placement", loggerFields...)
return nil
}

func (c *M3DBController) checkPodsForReplacement(
cluster *myspec.M3DBCluster,
pods []*corev1.Pod,
pl placement.Placement) (string, *corev1.Pod, error) {

insts := pl.Instances()
sort.Sort(placement.ByIDAscending(insts))

Expand Down Expand Up @@ -429,7 +439,6 @@ func (c *M3DBController) replacePodInPlacement(
pl placement.Placement,
leavingInstanceID string,
newPod *corev1.Pod) error {

c.logger.Info("replacing pod in placement", zap.String("pod", leavingInstanceID))

newInst, err := m3db.PlacementInstanceFromPod(cluster, newPod, c.podIDProvider)
Expand All @@ -440,7 +449,7 @@ func (c *M3DBController) replacePodInPlacement(
}

reason := fmt.Sprintf("replacing %s pod in placement", newPod.Name)
_, err = c.setStatusPodBootstrapping(cluster, corev1.ConditionTrue, "PodReplaced", reason)
_, err = c.setStatusPodsBootstrapping(cluster, corev1.ConditionTrue, "PodReplaced", reason)
if err != nil {
err := fmt.Errorf("error setting replacement pod bootstrapping status: %v", err)
c.logger.Error(err.Error())
Expand All @@ -461,7 +470,6 @@ func (c *M3DBController) replacePodInPlacement(
// be added to the placement and chooses a pod to expand to the placement.
func (c *M3DBController) expandPlacementForSet(cluster *myspec.M3DBCluster, set *appsv1.StatefulSet,
group myspec.IsolationGroup, placement placement.Placement) error {

existInsts := instancesInIsoGroup(placement, group.Name)
if len(existInsts) >= int(group.NumInstances) {
c.logger.Warn("not expanding set, already at desired capacity",
Expand All @@ -483,6 +491,7 @@ func (c *M3DBController) expandPlacementForSet(cluster *myspec.M3DBCluster, set
return err
}

podsToAdd := make([]*v1.Pod, 0, len(pods))
for _, pod := range pods {
id, err := c.podIDProvider.Identity(pod, cluster)
if err != nil {
Expand All @@ -494,11 +503,14 @@ func (c *M3DBController) expandPlacementForSet(cluster *myspec.M3DBCluster, set
}
_, ok := placement.Instance(idStr)
if !ok {
return c.addPodToPlacement(cluster, pod)
podsToAdd = append(podsToAdd, pod)
}
}
if len(podsToAdd) == 0 {
return errors.New("could not find pod absent from placement")
}

return errors.New("could not find pod absent from placement")
return c.addPodsToPlacement(cluster, podsToAdd)
}

// shrinkPlacementForSet takes a StatefulSet that needs to be shrunk and
Expand Down
79 changes: 43 additions & 36 deletions pkg/controller/update_cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -365,18 +365,18 @@ func TestNamespacesToReady(t *testing.T) {

func TestSetPodBootstrappingStatus(t *testing.T) {
cluster := getFixture("cluster-simple.yaml", t)
assert.False(t, cluster.Status.HasPodBootstrapping())
assert.False(t, cluster.Status.HasPodsBootstrapping())

deps := newTestDeps(t, &testOpts{
crdObjects: []runtime.Object{cluster},
})
controller := deps.newController(t)
defer deps.cleanup()

cluster, err := controller.setStatusPodBootstrapping(cluster, corev1.ConditionTrue, "foo", "bar")
cluster, err := controller.setStatusPodsBootstrapping(cluster, corev1.ConditionTrue, "foo", "bar")
assert.NoError(t, err)

assert.True(t, cluster.Status.HasPodBootstrapping())
assert.True(t, cluster.Status.HasPodsBootstrapping())
}

func TestSetStatus(t *testing.T) {
Expand Down Expand Up @@ -426,7 +426,7 @@ func TestReconcileBootstrappingStatus(t *testing.T) {
controller := deps.newController(t)
defer deps.cleanup()

const cond = myspec.ClusterConditionPodBootstrapping
const cond = myspec.ClusterConditionPodsBootstrapping

newPl := func(state shard.State) placement.Placement {
return placement.NewPlacement().SetInstances([]placement.Instance{
Expand All @@ -452,7 +452,7 @@ func TestReconcileBootstrappingStatus(t *testing.T) {
assert.Equal(t, string(corev1.ConditionFalse), string(c.Status))
}

func TestAddPodToPlacement(t *testing.T) {
func TestAddPodsToPlacement(t *testing.T) {
cluster := getFixture("cluster-simple.yaml", t)

deps := newTestDeps(t, &testOpts{
Expand All @@ -461,36 +461,40 @@ func TestAddPodToPlacement(t *testing.T) {
controller := deps.newController(t)
defer deps.cleanup()

pod := &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "pod-a",
Labels: map[string]string{
"operator.m3db.io/isolation-group": "zone-a",
var (
pods []*corev1.Pod
placements []*placementpb.Instance
)
for _, name := range []string{"pod-a1", "pod-a2"} {
pod := &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: name,
Labels: map[string]string{
"operator.m3db.io/isolation-group": "zone-a",
},
},
},
}

deps.idProvider.EXPECT().Identity(pod, cluster).Return(&myspec.PodIdentity{}, nil)

expInstance := placementpb.Instance{
Id: "{}",
IsolationGroup: "zone-a",
Zone: "embedded",
Endpoint: "pod-a.m3dbnode-cluster-simple:9000",
Hostname: "pod-a.m3dbnode-cluster-simple",
Port: 9000,
Weight: 100,
}
deps.idProvider.EXPECT().Identity(pod, cluster).Return(&myspec.PodIdentity{}, nil)
pods = append(pods, pod)
placements = append(placements, &placementpb.Instance{
Id: "{}",
IsolationGroup: "zone-a",
Zone: "embedded",
Endpoint: fmt.Sprintf("%s.m3dbnode-cluster-simple:9000", name),
Hostname: fmt.Sprintf("%s.m3dbnode-cluster-simple", name),
Port: 9000,
Weight: 100,
})
}
deps.placementClient.EXPECT().Add(placements)

deps.placementClient.EXPECT().Add(expInstance)

err := controller.addPodToPlacement(cluster, pod)
err := controller.addPodsToPlacement(cluster, pods)
assert.NoError(t, err)

cluster, err = controller.crdClient.OperatorV1alpha1().M3DBClusters(cluster.Namespace).Get(cluster.Name, metav1.GetOptions{})
assert.NoError(t, err)

assert.True(t, cluster.Status.HasPodBootstrapping())
assert.True(t, cluster.Status.HasPodsBootstrapping())
}

func podsForClusterSet(cluster *myspec.M3DBCluster, set *appsv1.StatefulSet, numPods int) []*corev1.Pod {
Expand Down Expand Up @@ -599,19 +603,23 @@ func TestExpandPlacementForSet(t *testing.T) {

identifyPods(idProvider, pods, nil)

pl := placementFromPods(t, cluster, pods[:2], idProvider)
pl := placementFromPods(t, cluster, pods[:1], idProvider)
group := cluster.Spec.IsolationGroups[0]

instPb, err := m3db.PlacementInstanceFromPod(cluster, pods[2], idProvider)
require.NoError(t, err)
var placements []*placementpb.Instance
for _, pod := range pods[1:] {
instPb, err := m3db.PlacementInstanceFromPod(cluster, pod, idProvider)
require.NoError(t, err)
placements = append(placements, instPb)
}

placementMock.EXPECT().Add(*instPb)
placementMock.EXPECT().Add(placements)
err = controller.expandPlacementForSet(cluster, set, group, pl)
assert.NoError(t, err)

cluster, err = deps.crdClient.OperatorV1alpha1().M3DBClusters(cluster.Namespace).Get(cluster.Name, metav1.GetOptions{})
require.NoError(t, err)
assert.True(t, cluster.Status.HasPodBootstrapping())
assert.True(t, cluster.Status.HasPodsBootstrapping())
}

func TestExpandPlacementForSet_Nop(t *testing.T) {
Expand Down Expand Up @@ -853,7 +861,6 @@ func TestSortPods(t *testing.T) {
}

func TestCheckPodsForReplacement(t *testing.T) {

cluster := getFixture("cluster-3-zones.yaml", t)
deps := newTestDeps(t, &testOpts{
crdObjects: []runtime.Object{cluster},
Expand Down Expand Up @@ -888,7 +895,8 @@ func TestCheckPodsForReplacement(t *testing.T) {
Labels: map[string]string{
"different": "label",
},
}})
},
})

identifyPods(idProvider, replacePods, nil)

Expand Down Expand Up @@ -950,7 +958,6 @@ func TestReplacePodInPlacement(t *testing.T) {

err = controller.replacePodInPlacement(cluster, pl, testLeavingInstanceID, testNewPod)
require.NoError(t, err)

}

func TestReplacePodInPlacementWithError(t *testing.T) {
Expand All @@ -972,7 +979,7 @@ func TestReplacePodInPlacementWithError(t *testing.T) {
pl := placementFromPods(t, cluster, podsForPlacement, idProvider)

// error creating instance
var badPod = &corev1.Pod{
badPod := &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: podsForPlacement[0].Name,
UID: types.UID("ABC"),
Expand Down
6 changes: 3 additions & 3 deletions pkg/m3admin/placement/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,11 +114,11 @@ func (p *placementClient) Get() (m3placement.Placement, error) {
return m3placement.NewPlacementFromProto(resp.Placement)
}

// Add will add an instance to the current placement
func (p *placementClient) Add(instance placementpb.Instance) error {
// Add multiple instances to the current placement.
func (p *placementClient) Add(instances []*placementpb.Instance) error {
url := p.url + placementBaseURL
req := &admin.PlacementAddRequest{
Instances: []*placementpb.Instance{&instance},
Instances: instances,
}

err := p.client.DoHTTPJSONPBRequest(http.MethodPost, url, req, nil)
Expand Down
Loading

0 comments on commit 05f3c48

Please sign in to comment.