Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[controller] Support multi instance placement add #275

Merged
merged 4 commits into from
Mar 2, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -34,3 +34,6 @@ linux-amd64/

# Helm packages
helm/**/*.tgz

# Vim swap files
*.swp
10 changes: 5 additions & 5 deletions pkg/apis/m3dboperator/v1alpha1/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,8 @@ const (
// been created for the cluster.
ClusterConditionPlacementInitialized ClusterConditionType = "PlacementInitialized"

// ClusterConditionPodBootstrapping indicates there is a pod bootstrapping.
ClusterConditionPodBootstrapping ClusterConditionType = "PodBootstrapping"
// ClusterConditionPodsBootstrapping indicates there are pods bootstrapping.
ClusterConditionPodsBootstrapping ClusterConditionType = "PodsBootstrapping"
)

// M3DBCluster defines the cluster
Expand Down Expand Up @@ -103,10 +103,10 @@ func (s *M3DBStatus) HasInitializedPlacement() bool {
return s.hasConditionTrue(ClusterConditionPlacementInitialized)
}

// HasPodBootstrapping returns true if conditions indicate a pod is currently
// HasPodsBootstrapping returns true if conditions indicate pods are currently
// bootstrapping.
func (s *M3DBStatus) HasPodBootstrapping() bool {
return s.hasConditionTrue(ClusterConditionPodBootstrapping)
func (s *M3DBStatus) HasPodsBootstrapping() bool {
return s.hasConditionTrue(ClusterConditionPodsBootstrapping)
}

// GetCondition returns the specified cluster condition if it exists with a bool
Expand Down
4 changes: 2 additions & 2 deletions pkg/apis/m3dboperator/v1alpha1/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@ func TestStatus(t *testing.T) {
f: func(s *M3DBStatus) bool { return s.HasInitializedPlacement() },
},
{
cond: ClusterConditionPodBootstrapping,
f: func(s *M3DBStatus) bool { return s.HasPodBootstrapping() },
cond: ClusterConditionPodsBootstrapping,
f: func(s *M3DBStatus) bool { return s.HasPodsBootstrapping() },
},
} {
t.Run(string(test.cond), func(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion pkg/controller/m3admin_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,7 @@ func (c errorPlacementClient) Delete() error {
return c.err
}

func (c errorPlacementClient) Add(placementpb.Instance) error {
func (c errorPlacementClient) Add([]*placementpb.Instance) error {
return c.err
}

Expand Down
18 changes: 8 additions & 10 deletions pkg/controller/m3admin_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,20 +24,18 @@ import (
"errors"
"testing"

"github.com/golang/mock/gomock"
"github.com/m3db/m3/src/cluster/generated/proto/placementpb"
"github.com/m3db/m3/src/msg/generated/proto/topicpb"
"github.com/stretchr/testify/assert"
"go.uber.org/zap"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

myspec "github.com/m3db/m3db-operator/pkg/apis/m3dboperator/v1alpha1"
"github.com/m3db/m3db-operator/pkg/m3admin"
"github.com/m3db/m3db-operator/pkg/m3admin/namespace"
"github.com/m3db/m3db-operator/pkg/m3admin/placement"
"github.com/m3db/m3db-operator/pkg/m3admin/topic"

"github.com/m3db/m3/src/cluster/generated/proto/placementpb"
"github.com/m3db/m3/src/msg/generated/proto/topicpb"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"github.com/golang/mock/gomock"
"github.com/stretchr/testify/assert"
"go.uber.org/zap"
)

func newTestAdminClient(cl m3admin.Client, url string) *multiAdminClient {
Expand Down Expand Up @@ -237,7 +235,7 @@ func TestErrorPlacementClient(t *testing.T) {
err = cl.Delete()
assert.Equal(t, clErr, err)

err = cl.Add(placementpb.Instance{})
err = cl.Add([]*placementpb.Instance{{}})
assert.Equal(t, clErr, err)

err = cl.Remove("foo")
Expand Down
82 changes: 46 additions & 36 deletions pkg/controller/update_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (
"github.com/m3db/m3db-operator/pkg/m3admin/namespace"
"github.com/m3db/m3db-operator/pkg/util/eventer"

"github.com/m3db/m3/src/cluster/generated/proto/placementpb"
"github.com/m3db/m3/src/cluster/placement"
dbns "github.com/m3db/m3/src/dbnode/generated/proto/namespace"
"github.com/m3db/m3/src/query/generated/proto/admin"
Expand Down Expand Up @@ -100,7 +101,7 @@ func (c *M3DBController) createNamespaces(cluster *myspec.M3DBCluster, registry
zap.String("namespace", ns.Name),
zap.Error(err))

return fmt.Errorf("error creating namespace '%s': %v", ns.Name, err)
return fmt.Errorf("error creating namespace '%s': %w", ns.Name, err)
}

c.recorder.NormalEvent(cluster, eventer.ReasonCreating, "created namespace "+ns.Name)
Expand Down Expand Up @@ -295,8 +296,8 @@ func (c *M3DBController) setStatusPlacementCreated(cluster *myspec.M3DBCluster)
var err error
cluster, err = c.crdClient.OperatorV1alpha1().M3DBClusters(cluster.Namespace).UpdateStatus(cluster)
if err != nil {
err := fmt.Errorf("error updating cluster placement init status: %v", err)
c.logger.Error(err.Error())
c.logger.Error("error updating cluster placement init status",
zap.Error(err))
c.recorder.WarningEvent(cluster, eventer.ReasonFailSync, "failed to update placement status: %v", err)
return nil, err
}
Expand All @@ -306,16 +307,14 @@ func (c *M3DBController) setStatusPlacementCreated(cluster *myspec.M3DBCluster)
return cluster, nil
}

func (c *M3DBController) setStatusPodBootstrapping(cluster *myspec.M3DBCluster,
func (c *M3DBController) setStatusPodsBootstrapping(cluster *myspec.M3DBCluster,
status corev1.ConditionStatus,
reason, message string) (*myspec.M3DBCluster, error) {

return c.setStatus(cluster, myspec.ClusterConditionPodBootstrapping, status, reason, message)
return c.setStatus(cluster, myspec.ClusterConditionPodsBootstrapping, status, reason, message)
}

func (c *M3DBController) setStatus(cluster *myspec.M3DBCluster, condition myspec.ClusterConditionType,
status corev1.ConditionStatus, reason, message string) (*myspec.M3DBCluster, error) {

cond, ok := cluster.Status.GetCondition(condition)
if !ok {
cond = myspec.ClusterCondition{
Expand Down Expand Up @@ -355,43 +354,51 @@ func (c *M3DBController) reconcileBootstrappingStatus(cluster *myspec.M3DBCluste
}
}

return c.setStatus(cluster, myspec.ClusterConditionPodBootstrapping, corev1.ConditionFalse,
return c.setStatus(cluster, myspec.ClusterConditionPodsBootstrapping, corev1.ConditionFalse,
"BootstrapComplete", "no bootstraps in progress")
}

func (c *M3DBController) addPodToPlacement(cluster *myspec.M3DBCluster, pod *corev1.Pod) error {
c.logger.Info("found pod not in placement", zap.String("pod", pod.Name))
inst, err := m3db.PlacementInstanceFromPod(cluster, pod, c.podIDProvider)
if err != nil {
err := fmt.Errorf("error creating instance for pod %s", pod.Name)
c.logger.Error(err.Error())
return err
func (c *M3DBController) addPodsToPlacement(cluster *myspec.M3DBCluster, pods []*corev1.Pod) error {
var (
instances = make([]*placementpb.Instance, 0, len(pods))
podNames = make([]string, 0, len(pods))
)
for _, pod := range pods {
c.logger.Info("found pod not in placement", zap.String("pod", pod.Name))
inst, err := m3db.PlacementInstanceFromPod(cluster, pod, c.podIDProvider)
if err != nil {
c.logger.Error("error creating instance for pod",
zap.String("pod", pod.Name),
zap.Error(err))
return err
}
instances = append(instances, inst)
podNames = append(podNames, pod.Name)
}

reason := fmt.Sprintf("adding pod %s to placement", pod.Name)
_, err = c.setStatusPodBootstrapping(cluster, corev1.ConditionTrue, "PodAdded", reason)
reason := fmt.Sprintf("adding pods to placement (%s)", strings.Join(podNames, ", "))
_, err := c.setStatusPodsBootstrapping(cluster, corev1.ConditionTrue, "PodAdded", reason)
if err != nil {
err := fmt.Errorf("error setting pod bootstrapping status: %v", err)
c.logger.Error(err.Error())
c.logger.Error("error setting pods bootstrapping status",
zap.Error(err))
return err
}

err = c.adminClient.placementClientForCluster(cluster).Add(*inst)
err = c.adminClient.placementClientForCluster(cluster).Add(instances)
if err != nil {
err := fmt.Errorf("error adding pod to placement: %s", pod.Name)
c.logger.Error(err.Error())
c.logger.Error("error adding instances to placement",
zap.String("reason", reason),
zap.Error(err))
return err
}

c.logger.Info("added pod to placement", zap.String("pod", pod.Name))
c.logger.Info("added pods to placement", zap.Strings("pods", podNames))
return nil
}

func (c *M3DBController) checkPodsForReplacement(
cluster *myspec.M3DBCluster,
pods []*corev1.Pod,
pl placement.Placement) (string, *corev1.Pod, error) {

insts := pl.Instances()
sort.Sort(placement.ByIDAscending(insts))

Expand Down Expand Up @@ -429,28 +436,28 @@ func (c *M3DBController) replacePodInPlacement(
pl placement.Placement,
leavingInstanceID string,
newPod *corev1.Pod) error {

c.logger.Info("replacing pod in placement", zap.String("pod", leavingInstanceID))

newInst, err := m3db.PlacementInstanceFromPod(cluster, newPod, c.podIDProvider)
if err != nil {
err := fmt.Errorf("error creating instance from replacement pod %s: %v", newPod.Name, err)
c.logger.Error(err.Error())
c.logger.Error("error creating instance from replacement pod",
zap.String("pod", newPod.Name),
zap.Error(err))
return err
}

reason := fmt.Sprintf("replacing %s pod in placement", newPod.Name)
_, err = c.setStatusPodBootstrapping(cluster, corev1.ConditionTrue, "PodReplaced", reason)
_, err = c.setStatusPodsBootstrapping(cluster, corev1.ConditionTrue, "PodReplaced", reason)
if err != nil {
err := fmt.Errorf("error setting replacement pod bootstrapping status: %v", err)
c.logger.Error(err.Error())
c.logger.Error("error setting replacement pod bootstrapping status",
zap.Error(err))
return err
}

err = c.adminClient.placementClientForCluster(cluster).Replace(leavingInstanceID, *newInst)
if err != nil {
err := fmt.Errorf("error replacing pod in placement: %s", leavingInstanceID)
c.logger.Error(err.Error())
c.logger.Error("error replacing pod in placement",
zap.Error(err))
return err
}

Expand All @@ -461,7 +468,6 @@ func (c *M3DBController) replacePodInPlacement(
// be added to the placement and adds every pod absent from the placement.
func (c *M3DBController) expandPlacementForSet(cluster *myspec.M3DBCluster, set *appsv1.StatefulSet,
group myspec.IsolationGroup, placement placement.Placement) error {

existInsts := instancesInIsoGroup(placement, group.Name)
if len(existInsts) >= int(group.NumInstances) {
c.logger.Warn("not expanding set, already at desired capacity",
Expand All @@ -483,6 +489,7 @@ func (c *M3DBController) expandPlacementForSet(cluster *myspec.M3DBCluster, set
return err
}

podsToAdd := make([]*corev1.Pod, 0, len(pods))
for _, pod := range pods {
id, err := c.podIDProvider.Identity(pod, cluster)
if err != nil {
Expand All @@ -494,11 +501,14 @@ func (c *M3DBController) expandPlacementForSet(cluster *myspec.M3DBCluster, set
}
_, ok := placement.Instance(idStr)
if !ok {
return c.addPodToPlacement(cluster, pod)
podsToAdd = append(podsToAdd, pod)
}
}
if len(podsToAdd) == 0 {
return errors.New("could not find pod absent from placement")
}

return errors.New("could not find pod absent from placement")
return c.addPodsToPlacement(cluster, podsToAdd)
}

// shrinkPlacementForSet takes a StatefulSet that needs to be shrunk and
Expand Down
Loading