Skip to content

Commit

Permalink
[controller] Support multi instance placement add (#275)
Browse files Browse the repository at this point in the history
  • Loading branch information
notbdu authored Mar 2, 2021
1 parent 7a6466e commit 66671e8
Show file tree
Hide file tree
Showing 11 changed files with 124 additions and 101 deletions.
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -34,3 +34,6 @@ linux-amd64/

# Helm packages
helm/**/*.tgz

# Vim swap files
*.swp
10 changes: 5 additions & 5 deletions pkg/apis/m3dboperator/v1alpha1/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,8 @@ const (
// been created for the cluster.
ClusterConditionPlacementInitialized ClusterConditionType = "PlacementInitialized"

// ClusterConditionPodBootstrapping indicates there is a pod bootstrapping.
ClusterConditionPodBootstrapping ClusterConditionType = "PodBootstrapping"
// ClusterConditionPodsBootstrapping indicates there are pods bootstrapping.
ClusterConditionPodsBootstrapping ClusterConditionType = "PodsBootstrapping"
)

// M3DBCluster defines the cluster
Expand Down Expand Up @@ -103,10 +103,10 @@ func (s *M3DBStatus) HasInitializedPlacement() bool {
return s.hasConditionTrue(ClusterConditionPlacementInitialized)
}

// HasPodBootstrapping returns true if conditions indicate a pod is currently
// HasPodsBootstrapping returns true if conditions indicate a pod is currently
// bootstrapping.
func (s *M3DBStatus) HasPodBootstrapping() bool {
return s.hasConditionTrue(ClusterConditionPodBootstrapping)
func (s *M3DBStatus) HasPodsBootstrapping() bool {
return s.hasConditionTrue(ClusterConditionPodsBootstrapping)
}

// GetCondition returns the specified cluster condition if it exists with a bool
Expand Down
4 changes: 2 additions & 2 deletions pkg/apis/m3dboperator/v1alpha1/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@ func TestStatus(t *testing.T) {
f: func(s *M3DBStatus) bool { return s.HasInitializedPlacement() },
},
{
cond: ClusterConditionPodBootstrapping,
f: func(s *M3DBStatus) bool { return s.HasPodBootstrapping() },
cond: ClusterConditionPodsBootstrapping,
f: func(s *M3DBStatus) bool { return s.HasPodsBootstrapping() },
},
} {
t.Run(string(test.cond), func(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion pkg/controller/m3admin_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,7 @@ func (c errorPlacementClient) Delete() error {
return c.err
}

func (c errorPlacementClient) Add(placementpb.Instance) error {
func (c errorPlacementClient) Add([]*placementpb.Instance) error {
return c.err
}

Expand Down
18 changes: 8 additions & 10 deletions pkg/controller/m3admin_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,20 +24,18 @@ import (
"errors"
"testing"

"github.com/golang/mock/gomock"
"github.com/m3db/m3/src/cluster/generated/proto/placementpb"
"github.com/m3db/m3/src/msg/generated/proto/topicpb"
"github.com/stretchr/testify/assert"
"go.uber.org/zap"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

myspec "github.com/m3db/m3db-operator/pkg/apis/m3dboperator/v1alpha1"
"github.com/m3db/m3db-operator/pkg/m3admin"
"github.com/m3db/m3db-operator/pkg/m3admin/namespace"
"github.com/m3db/m3db-operator/pkg/m3admin/placement"
"github.com/m3db/m3db-operator/pkg/m3admin/topic"

"github.com/m3db/m3/src/cluster/generated/proto/placementpb"
"github.com/m3db/m3/src/msg/generated/proto/topicpb"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"github.com/golang/mock/gomock"
"github.com/stretchr/testify/assert"
"go.uber.org/zap"
)

func newTestAdminClient(cl m3admin.Client, url string) *multiAdminClient {
Expand Down Expand Up @@ -237,7 +235,7 @@ func TestErrorPlacementClient(t *testing.T) {
err = cl.Delete()
assert.Equal(t, clErr, err)

err = cl.Add(placementpb.Instance{})
err = cl.Add([]*placementpb.Instance{{}})
assert.Equal(t, clErr, err)

err = cl.Remove("foo")
Expand Down
82 changes: 46 additions & 36 deletions pkg/controller/update_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (
"github.com/m3db/m3db-operator/pkg/m3admin/namespace"
"github.com/m3db/m3db-operator/pkg/util/eventer"

"github.com/m3db/m3/src/cluster/generated/proto/placementpb"
"github.com/m3db/m3/src/cluster/placement"
dbns "github.com/m3db/m3/src/dbnode/generated/proto/namespace"
"github.com/m3db/m3/src/query/generated/proto/admin"
Expand Down Expand Up @@ -100,7 +101,7 @@ func (c *M3DBController) createNamespaces(cluster *myspec.M3DBCluster, registry
zap.String("namespace", ns.Name),
zap.Error(err))

return fmt.Errorf("error creating namespace '%s': %v", ns.Name, err)
return fmt.Errorf("error creating namespace '%s': %w", ns.Name, err)
}

c.recorder.NormalEvent(cluster, eventer.ReasonCreating, "created namespace "+ns.Name)
Expand Down Expand Up @@ -295,8 +296,8 @@ func (c *M3DBController) setStatusPlacementCreated(cluster *myspec.M3DBCluster)
var err error
cluster, err = c.crdClient.OperatorV1alpha1().M3DBClusters(cluster.Namespace).UpdateStatus(cluster)
if err != nil {
err := fmt.Errorf("error updating cluster placement init status: %v", err)
c.logger.Error(err.Error())
c.logger.Error("error updating cluster placement init status",
zap.Error(err))
c.recorder.WarningEvent(cluster, eventer.ReasonFailSync, "failed to update placement status: %v", err)
return nil, err
}
Expand All @@ -306,16 +307,14 @@ func (c *M3DBController) setStatusPlacementCreated(cluster *myspec.M3DBCluster)
return cluster, nil
}

func (c *M3DBController) setStatusPodBootstrapping(cluster *myspec.M3DBCluster,
func (c *M3DBController) setStatusPodsBootstrapping(cluster *myspec.M3DBCluster,
status corev1.ConditionStatus,
reason, message string) (*myspec.M3DBCluster, error) {

return c.setStatus(cluster, myspec.ClusterConditionPodBootstrapping, status, reason, message)
return c.setStatus(cluster, myspec.ClusterConditionPodsBootstrapping, status, reason, message)
}

func (c *M3DBController) setStatus(cluster *myspec.M3DBCluster, condition myspec.ClusterConditionType,
status corev1.ConditionStatus, reason, message string) (*myspec.M3DBCluster, error) {

cond, ok := cluster.Status.GetCondition(condition)
if !ok {
cond = myspec.ClusterCondition{
Expand Down Expand Up @@ -355,43 +354,51 @@ func (c *M3DBController) reconcileBootstrappingStatus(cluster *myspec.M3DBCluste
}
}

return c.setStatus(cluster, myspec.ClusterConditionPodBootstrapping, corev1.ConditionFalse,
return c.setStatus(cluster, myspec.ClusterConditionPodsBootstrapping, corev1.ConditionFalse,
"BootstrapComplete", "no bootstraps in progress")
}

func (c *M3DBController) addPodToPlacement(cluster *myspec.M3DBCluster, pod *corev1.Pod) error {
c.logger.Info("found pod not in placement", zap.String("pod", pod.Name))
inst, err := m3db.PlacementInstanceFromPod(cluster, pod, c.podIDProvider)
if err != nil {
err := fmt.Errorf("error creating instance for pod %s", pod.Name)
c.logger.Error(err.Error())
return err
func (c *M3DBController) addPodsToPlacement(cluster *myspec.M3DBCluster, pods []*corev1.Pod) error {
var (
instances = make([]*placementpb.Instance, 0, len(pods))
podNames = make([]string, 0, len(pods))
)
for _, pod := range pods {
c.logger.Info("found pod not in placement", zap.String("pod", pod.Name))
inst, err := m3db.PlacementInstanceFromPod(cluster, pod, c.podIDProvider)
if err != nil {
c.logger.Error("error creating instance for pod",
zap.String("pod", pod.Name),
zap.Error(err))
return err
}
instances = append(instances, inst)
podNames = append(podNames, pod.Name)
}

reason := fmt.Sprintf("adding pod %s to placement", pod.Name)
_, err = c.setStatusPodBootstrapping(cluster, corev1.ConditionTrue, "PodAdded", reason)
reason := fmt.Sprintf("adding pods to placement (%s)", strings.Join(podNames, ", "))
_, err := c.setStatusPodsBootstrapping(cluster, corev1.ConditionTrue, "PodAdded", reason)
if err != nil {
err := fmt.Errorf("error setting pod bootstrapping status: %v", err)
c.logger.Error(err.Error())
c.logger.Error("error setting pods bootstrapping status",
zap.Error(err))
return err
}

err = c.adminClient.placementClientForCluster(cluster).Add(*inst)
err = c.adminClient.placementClientForCluster(cluster).Add(instances)
if err != nil {
err := fmt.Errorf("error adding pod to placement: %s", pod.Name)
c.logger.Error(err.Error())
c.logger.Error("error adding instances to placement",
zap.String("reason", reason),
zap.Error(err))
return err
}

c.logger.Info("added pod to placement", zap.String("pod", pod.Name))
c.logger.Info("added pods to placement", zap.Strings("pods", podNames))
return nil
}

func (c *M3DBController) checkPodsForReplacement(
cluster *myspec.M3DBCluster,
pods []*corev1.Pod,
pl placement.Placement) (string, *corev1.Pod, error) {

insts := pl.Instances()
sort.Sort(placement.ByIDAscending(insts))

Expand Down Expand Up @@ -429,28 +436,28 @@ func (c *M3DBController) replacePodInPlacement(
pl placement.Placement,
leavingInstanceID string,
newPod *corev1.Pod) error {

c.logger.Info("replacing pod in placement", zap.String("pod", leavingInstanceID))

newInst, err := m3db.PlacementInstanceFromPod(cluster, newPod, c.podIDProvider)
if err != nil {
err := fmt.Errorf("error creating instance from replacement pod %s: %v", newPod.Name, err)
c.logger.Error(err.Error())
c.logger.Error("error creating instance from replacement pod",
zap.String("pod", newPod.Name),
zap.Error(err))
return err
}

reason := fmt.Sprintf("replacing %s pod in placement", newPod.Name)
_, err = c.setStatusPodBootstrapping(cluster, corev1.ConditionTrue, "PodReplaced", reason)
_, err = c.setStatusPodsBootstrapping(cluster, corev1.ConditionTrue, "PodReplaced", reason)
if err != nil {
err := fmt.Errorf("error setting replacement pod bootstrapping status: %v", err)
c.logger.Error(err.Error())
c.logger.Error("error setting replacement pod bootstrapping status",
zap.Error(err))
return err
}

err = c.adminClient.placementClientForCluster(cluster).Replace(leavingInstanceID, *newInst)
if err != nil {
err := fmt.Errorf("error replacing pod in placement: %s", leavingInstanceID)
c.logger.Error(err.Error())
c.logger.Error("error replacing pod in placement",
zap.Error(err))
return err
}

Expand All @@ -461,7 +468,6 @@ func (c *M3DBController) replacePodInPlacement(
// be added to the placement and chooses a pod to expand to the placement.
func (c *M3DBController) expandPlacementForSet(cluster *myspec.M3DBCluster, set *appsv1.StatefulSet,
group myspec.IsolationGroup, placement placement.Placement) error {

existInsts := instancesInIsoGroup(placement, group.Name)
if len(existInsts) >= int(group.NumInstances) {
c.logger.Warn("not expanding set, already at desired capacity",
Expand All @@ -483,6 +489,7 @@ func (c *M3DBController) expandPlacementForSet(cluster *myspec.M3DBCluster, set
return err
}

podsToAdd := make([]*corev1.Pod, 0, len(pods))
for _, pod := range pods {
id, err := c.podIDProvider.Identity(pod, cluster)
if err != nil {
Expand All @@ -494,11 +501,14 @@ func (c *M3DBController) expandPlacementForSet(cluster *myspec.M3DBCluster, set
}
_, ok := placement.Instance(idStr)
if !ok {
return c.addPodToPlacement(cluster, pod)
podsToAdd = append(podsToAdd, pod)
}
}
if len(podsToAdd) == 0 {
return errors.New("could not find pod absent from placement")
}

return errors.New("could not find pod absent from placement")
return c.addPodsToPlacement(cluster, podsToAdd)
}

// shrinkPlacementForSet takes a StatefulSet that needs to be shrunk and
Expand Down
Loading

0 comments on commit 66671e8

Please sign in to comment.