Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[controller] Support multi instance placement add #275

Merged
merged 4 commits into from
Mar 2, 2021
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -34,3 +34,6 @@ linux-amd64/

# Helm packages
helm/**/*.tgz

# Vim swap files
*.swp
10 changes: 5 additions & 5 deletions pkg/apis/m3dboperator/v1alpha1/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,8 @@ const (
// been created for the cluster.
ClusterConditionPlacementInitialized ClusterConditionType = "PlacementInitialized"

// ClusterConditionPodBootstrapping indicates there is a pod bootstrapping.
ClusterConditionPodBootstrapping ClusterConditionType = "PodBootstrapping"
// ClusterConditionPodsBootstrapping indicates there are pods bootstrapping.
ClusterConditionPodsBootstrapping ClusterConditionType = "PodsBootstrapping"
)

// M3DBCluster defines the cluster
Expand Down Expand Up @@ -103,10 +103,10 @@ func (s *M3DBStatus) HasInitializedPlacement() bool {
return s.hasConditionTrue(ClusterConditionPlacementInitialized)
}

// HasPodBootstrapping returns true if conditions indicate a pod is currently
// HasPodsBootstrapping returns true if conditions indicate a pod is currently
// bootstrapping.
func (s *M3DBStatus) HasPodBootstrapping() bool {
return s.hasConditionTrue(ClusterConditionPodBootstrapping)
func (s *M3DBStatus) HasPodsBootstrapping() bool {
return s.hasConditionTrue(ClusterConditionPodsBootstrapping)
}

// GetCondition returns the specified cluster condition if it exists with a bool
Expand Down
4 changes: 2 additions & 2 deletions pkg/apis/m3dboperator/v1alpha1/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@ func TestStatus(t *testing.T) {
f: func(s *M3DBStatus) bool { return s.HasInitializedPlacement() },
},
{
cond: ClusterConditionPodBootstrapping,
f: func(s *M3DBStatus) bool { return s.HasPodBootstrapping() },
cond: ClusterConditionPodsBootstrapping,
f: func(s *M3DBStatus) bool { return s.HasPodsBootstrapping() },
},
} {
t.Run(string(test.cond), func(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion pkg/controller/m3admin_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,7 @@ func (c errorPlacementClient) Delete() error {
return c.err
}

func (c errorPlacementClient) Add(placementpb.Instance) error {
func (c errorPlacementClient) Add([]*placementpb.Instance) error {
return c.err
}

Expand Down
18 changes: 8 additions & 10 deletions pkg/controller/m3admin_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,20 +24,18 @@ import (
"errors"
"testing"

"github.com/golang/mock/gomock"
"github.com/m3db/m3/src/cluster/generated/proto/placementpb"
"github.com/m3db/m3/src/msg/generated/proto/topicpb"
"github.com/stretchr/testify/assert"
"go.uber.org/zap"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

myspec "github.com/m3db/m3db-operator/pkg/apis/m3dboperator/v1alpha1"
"github.com/m3db/m3db-operator/pkg/m3admin"
"github.com/m3db/m3db-operator/pkg/m3admin/namespace"
"github.com/m3db/m3db-operator/pkg/m3admin/placement"
"github.com/m3db/m3db-operator/pkg/m3admin/topic"

"github.com/m3db/m3/src/cluster/generated/proto/placementpb"
"github.com/m3db/m3/src/msg/generated/proto/topicpb"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"github.com/golang/mock/gomock"
"github.com/stretchr/testify/assert"
"go.uber.org/zap"
)

func newTestAdminClient(cl m3admin.Client, url string) *multiAdminClient {
Expand Down Expand Up @@ -237,7 +235,7 @@ func TestErrorPlacementClient(t *testing.T) {
err = cl.Delete()
assert.Equal(t, clErr, err)

err = cl.Add(placementpb.Instance{})
err = cl.Add([]*placementpb.Instance{{}})
assert.Equal(t, clErr, err)

err = cl.Remove("foo")
Expand Down
62 changes: 37 additions & 25 deletions pkg/controller/update_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
package controller

import (
"bytes"
"encoding/json"
"errors"
"fmt"
Expand All @@ -38,12 +39,14 @@ import (
"github.com/m3db/m3db-operator/pkg/m3admin/namespace"
"github.com/m3db/m3db-operator/pkg/util/eventer"

"github.com/m3db/m3/src/cluster/generated/proto/placementpb"
"github.com/m3db/m3/src/cluster/placement"
dbns "github.com/m3db/m3/src/dbnode/generated/proto/namespace"
"github.com/m3db/m3/src/query/generated/proto/admin"

appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: this ls already imported as corev1 on the line above.

klabels "k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/util/runtime"

Expand Down Expand Up @@ -306,16 +309,14 @@ func (c *M3DBController) setStatusPlacementCreated(cluster *myspec.M3DBCluster)
return cluster, nil
}

func (c *M3DBController) setStatusPodBootstrapping(cluster *myspec.M3DBCluster,
func (c *M3DBController) setStatusPodsBootstrapping(cluster *myspec.M3DBCluster,
status corev1.ConditionStatus,
reason, message string) (*myspec.M3DBCluster, error) {

return c.setStatus(cluster, myspec.ClusterConditionPodBootstrapping, status, reason, message)
return c.setStatus(cluster, myspec.ClusterConditionPodsBootstrapping, status, reason, message)
}

func (c *M3DBController) setStatus(cluster *myspec.M3DBCluster, condition myspec.ClusterConditionType,
status corev1.ConditionStatus, reason, message string) (*myspec.M3DBCluster, error) {

cond, ok := cluster.Status.GetCondition(condition)
if !ok {
cond = myspec.ClusterCondition{
Expand Down Expand Up @@ -355,43 +356,52 @@ func (c *M3DBController) reconcileBootstrappingStatus(cluster *myspec.M3DBCluste
}
}

return c.setStatus(cluster, myspec.ClusterConditionPodBootstrapping, corev1.ConditionFalse,
return c.setStatus(cluster, myspec.ClusterConditionPodsBootstrapping, corev1.ConditionFalse,
"BootstrapComplete", "no bootstraps in progress")
}

func (c *M3DBController) addPodToPlacement(cluster *myspec.M3DBCluster, pod *corev1.Pod) error {
c.logger.Info("found pod not in placement", zap.String("pod", pod.Name))
inst, err := m3db.PlacementInstanceFromPod(cluster, pod, c.podIDProvider)
if err != nil {
err := fmt.Errorf("error creating instance for pod %s", pod.Name)
c.logger.Error(err.Error())
return err
func (c *M3DBController) addPodsToPlacement(cluster *myspec.M3DBCluster, pods []*corev1.Pod) error {
var (
instances = make([]*placementpb.Instance, 0, len(pods))
reasonBuf = bytes.NewBufferString("adding pods to placement (")
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a pretty different convention than how build up logs, and this isn't very perf-sensitive code.

I would vote for just adding to an array of strings, which are the names of the instances / pods, and then calling zap.Strings.

loggerFields = make([]zap.Field, 0, len(pods))
)
for _, pod := range pods {
c.logger.Info("found pod not in placement", zap.String("pod", pod.Name))
inst, err := m3db.PlacementInstanceFromPod(cluster, pod, c.podIDProvider)
if err != nil {
err := fmt.Errorf("error creating instance for pod %s", pod.Name)
c.logger.Error(err.Error())
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: take advantage of structured logging here (i.e. logger.Error("error creating instance for pod", zap.String("pod", ...

return err
}
instances = append(instances, inst)
reasonBuf.WriteString(fmt.Sprintf("%s,", pod.Name))
loggerFields = append(loggerFields, zap.String("pod", pod.Name))
}

reason := fmt.Sprintf("adding pod %s to placement", pod.Name)
_, err = c.setStatusPodBootstrapping(cluster, corev1.ConditionTrue, "PodAdded", reason)
reasonBuf.WriteString(")")
reason := reasonBuf.String()
_, err := c.setStatusPodsBootstrapping(cluster, corev1.ConditionTrue, "PodAdded", reason)
if err != nil {
err := fmt.Errorf("error setting pod bootstrapping status: %v", err)
err := fmt.Errorf("error setting pods bootstrapping status: %w", err)
c.logger.Error(err.Error())
return err
}

err = c.adminClient.placementClientForCluster(cluster).Add(*inst)
err = c.adminClient.placementClientForCluster(cluster).Add(instances)
if err != nil {
err := fmt.Errorf("error adding pod to placement: %s", pod.Name)
err := fmt.Errorf("error: %s", reason)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same thing here re: taking the change to structure the below .Error log.

c.logger.Error(err.Error())
return err
}

c.logger.Info("added pod to placement", zap.String("pod", pod.Name))
c.logger.Info("added pods to placement", loggerFields...)
return nil
}

func (c *M3DBController) checkPodsForReplacement(
cluster *myspec.M3DBCluster,
pods []*corev1.Pod,
pl placement.Placement) (string, *corev1.Pod, error) {

insts := pl.Instances()
sort.Sort(placement.ByIDAscending(insts))

Expand Down Expand Up @@ -429,7 +439,6 @@ func (c *M3DBController) replacePodInPlacement(
pl placement.Placement,
leavingInstanceID string,
newPod *corev1.Pod) error {

c.logger.Info("replacing pod in placement", zap.String("pod", leavingInstanceID))

newInst, err := m3db.PlacementInstanceFromPod(cluster, newPod, c.podIDProvider)
Expand All @@ -440,7 +449,7 @@ func (c *M3DBController) replacePodInPlacement(
}

reason := fmt.Sprintf("replacing %s pod in placement", newPod.Name)
_, err = c.setStatusPodBootstrapping(cluster, corev1.ConditionTrue, "PodReplaced", reason)
_, err = c.setStatusPodsBootstrapping(cluster, corev1.ConditionTrue, "PodReplaced", reason)
if err != nil {
err := fmt.Errorf("error setting replacement pod bootstrapping status: %v", err)
c.logger.Error(err.Error())
Expand All @@ -461,7 +470,6 @@ func (c *M3DBController) replacePodInPlacement(
// be added to the placement and chooses a pod to expand to the placement.
func (c *M3DBController) expandPlacementForSet(cluster *myspec.M3DBCluster, set *appsv1.StatefulSet,
group myspec.IsolationGroup, placement placement.Placement) error {

existInsts := instancesInIsoGroup(placement, group.Name)
if len(existInsts) >= int(group.NumInstances) {
c.logger.Warn("not expanding set, already at desired capacity",
Expand All @@ -483,6 +491,7 @@ func (c *M3DBController) expandPlacementForSet(cluster *myspec.M3DBCluster, set
return err
}

podsToAdd := make([]*v1.Pod, 0, len(pods))
for _, pod := range pods {
id, err := c.podIDProvider.Identity(pod, cluster)
if err != nil {
Expand All @@ -494,11 +503,14 @@ func (c *M3DBController) expandPlacementForSet(cluster *myspec.M3DBCluster, set
}
_, ok := placement.Instance(idStr)
if !ok {
return c.addPodToPlacement(cluster, pod)
podsToAdd = append(podsToAdd, pod)
}
}
if len(podsToAdd) == 0 {
return errors.New("could not find pod absent from placement")
}

return errors.New("could not find pod absent from placement")
return c.addPodsToPlacement(cluster, podsToAdd)
}

// shrinkPlacementForSet takes a StatefulSet that needs to be shrunk and
Expand Down
Loading