diff --git a/.gitignore b/.gitignore index 24687f1c..99e4a61b 100644 --- a/.gitignore +++ b/.gitignore @@ -34,3 +34,6 @@ linux-amd64/ # Helm packages helm/**/*.tgz + +# Vim swap files +*.swp diff --git a/pkg/apis/m3dboperator/v1alpha1/cluster.go b/pkg/apis/m3dboperator/v1alpha1/cluster.go index 9b4478f7..6f129096 100644 --- a/pkg/apis/m3dboperator/v1alpha1/cluster.go +++ b/pkg/apis/m3dboperator/v1alpha1/cluster.go @@ -41,8 +41,8 @@ const ( // been created for the cluster. ClusterConditionPlacementInitialized ClusterConditionType = "PlacementInitialized" - // ClusterConditionPodBootstrapping indicates there is a pod bootstrapping. - ClusterConditionPodBootstrapping ClusterConditionType = "PodBootstrapping" + // ClusterConditionPodsBootstrapping indicates there are pods bootstrapping. + ClusterConditionPodsBootstrapping ClusterConditionType = "PodsBootstrapping" ) // M3DBCluster defines the cluster @@ -103,10 +103,10 @@ func (s *M3DBStatus) HasInitializedPlacement() bool { return s.hasConditionTrue(ClusterConditionPlacementInitialized) } -// HasPodBootstrapping returns true if conditions indicate a pod is currently +// HasPodsBootstrapping returns true if conditions indicate a pod is currently // bootstrapping. -func (s *M3DBStatus) HasPodBootstrapping() bool { - return s.hasConditionTrue(ClusterConditionPodBootstrapping) +func (s *M3DBStatus) HasPodsBootstrapping() bool { + return s.hasConditionTrue(ClusterConditionPodsBootstrapping) } // GetCondition returns the specified cluster condition if it exists with a bool diff --git a/pkg/apis/m3dboperator/v1alpha1/cluster_test.go b/pkg/apis/m3dboperator/v1alpha1/cluster_test.go index cdbe7d93..3d5e98f7 100644 --- a/pkg/apis/m3dboperator/v1alpha1/cluster_test.go +++ b/pkg/apis/m3dboperator/v1alpha1/cluster_test.go @@ -39,8 +39,8 @@ func TestStatus(t *testing.T) { f: func(s *M3DBStatus) bool { return s.HasInitializedPlacement() }, }, { - cond: ClusterConditionPodBootstrapping, - f: func(s *M3DBStatus) bool { return s.HasPodBootstrapping() }, + cond: ClusterConditionPodsBootstrapping, + f: func(s *M3DBStatus) bool { return s.HasPodsBootstrapping() }, }, } { t.Run(string(test.cond), func(t *testing.T) { diff --git a/pkg/controller/m3admin_client.go b/pkg/controller/m3admin_client.go index 4bbed62e..80a9fcc7 100644 --- a/pkg/controller/m3admin_client.go +++ b/pkg/controller/m3admin_client.go @@ -267,7 +267,7 @@ func (c errorPlacementClient) Delete() error { return c.err } -func (c errorPlacementClient) Add(placementpb.Instance) error { +func (c errorPlacementClient) Add([]*placementpb.Instance) error { return c.err } diff --git a/pkg/controller/m3admin_client_test.go b/pkg/controller/m3admin_client_test.go index f1b19a7d..fb95aac6 100644 --- a/pkg/controller/m3admin_client_test.go +++ b/pkg/controller/m3admin_client_test.go @@ -24,20 +24,18 @@ import ( "errors" "testing" + "github.com/golang/mock/gomock" + "github.com/m3db/m3/src/cluster/generated/proto/placementpb" + "github.com/m3db/m3/src/msg/generated/proto/topicpb" + "github.com/stretchr/testify/assert" + "go.uber.org/zap" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + myspec "github.com/m3db/m3db-operator/pkg/apis/m3dboperator/v1alpha1" "github.com/m3db/m3db-operator/pkg/m3admin" "github.com/m3db/m3db-operator/pkg/m3admin/namespace" "github.com/m3db/m3db-operator/pkg/m3admin/placement" "github.com/m3db/m3db-operator/pkg/m3admin/topic" - - "github.com/m3db/m3/src/cluster/generated/proto/placementpb" - "github.com/m3db/m3/src/msg/generated/proto/topicpb" - - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - - "github.com/golang/mock/gomock" - "github.com/stretchr/testify/assert" - "go.uber.org/zap" ) func newTestAdminClient(cl m3admin.Client, url string) *multiAdminClient { @@ -237,7 +235,7 @@ func TestErrorPlacementClient(t *testing.T) { err = cl.Delete() assert.Equal(t, clErr, err) - err = cl.Add(placementpb.Instance{}) + err = cl.Add([]*placementpb.Instance{{}}) assert.Equal(t, clErr, err) err = cl.Remove("foo") diff --git a/pkg/controller/update_cluster.go b/pkg/controller/update_cluster.go index 730b38e9..3f69edfb 100644 --- a/pkg/controller/update_cluster.go +++ b/pkg/controller/update_cluster.go @@ -38,6 +38,7 @@ import ( "github.com/m3db/m3db-operator/pkg/m3admin/namespace" "github.com/m3db/m3db-operator/pkg/util/eventer" + "github.com/m3db/m3/src/cluster/generated/proto/placementpb" "github.com/m3db/m3/src/cluster/placement" dbns "github.com/m3db/m3/src/dbnode/generated/proto/namespace" "github.com/m3db/m3/src/query/generated/proto/admin" @@ -100,7 +101,7 @@ func (c *M3DBController) createNamespaces(cluster *myspec.M3DBCluster, registry zap.String("namespace", ns.Name), zap.Error(err)) - return fmt.Errorf("error creating namespace '%s': %v", ns.Name, err) + return fmt.Errorf("error creating namespace '%s': %w", ns.Name, err) } c.recorder.NormalEvent(cluster, eventer.ReasonCreating, "created namespace "+ns.Name) @@ -295,8 +296,8 @@ func (c *M3DBController) setStatusPlacementCreated(cluster *myspec.M3DBCluster) var err error cluster, err = c.crdClient.OperatorV1alpha1().M3DBClusters(cluster.Namespace).UpdateStatus(cluster) if err != nil { - err := fmt.Errorf("error updating cluster placement init status: %v", err) - c.logger.Error(err.Error()) + c.logger.Error("error updating cluster placement init status", + zap.Error(err)) c.recorder.WarningEvent(cluster, eventer.ReasonFailSync, "failed to update placement status: %v", err) return nil, err } @@ -306,16 +307,14 @@ func (c *M3DBController) setStatusPlacementCreated(cluster *myspec.M3DBCluster) return cluster, nil } -func (c *M3DBController) setStatusPodBootstrapping(cluster *myspec.M3DBCluster, +func (c *M3DBController) setStatusPodsBootstrapping(cluster *myspec.M3DBCluster, status corev1.ConditionStatus, reason, message string) (*myspec.M3DBCluster, error) { - - return c.setStatus(cluster, myspec.ClusterConditionPodBootstrapping, status, reason, message) + return c.setStatus(cluster, myspec.ClusterConditionPodsBootstrapping, status, reason, message) } func (c *M3DBController) setStatus(cluster *myspec.M3DBCluster, condition myspec.ClusterConditionType, status corev1.ConditionStatus, reason, message string) (*myspec.M3DBCluster, error) { - cond, ok := cluster.Status.GetCondition(condition) if !ok { cond = myspec.ClusterCondition{ @@ -355,35 +354,44 @@ func (c *M3DBController) reconcileBootstrappingStatus(cluster *myspec.M3DBCluste } } - return c.setStatus(cluster, myspec.ClusterConditionPodBootstrapping, corev1.ConditionFalse, + return c.setStatus(cluster, myspec.ClusterConditionPodsBootstrapping, corev1.ConditionFalse, "BootstrapComplete", "no bootstraps in progress") } -func (c *M3DBController) addPodToPlacement(cluster *myspec.M3DBCluster, pod *corev1.Pod) error { - c.logger.Info("found pod not in placement", zap.String("pod", pod.Name)) - inst, err := m3db.PlacementInstanceFromPod(cluster, pod, c.podIDProvider) - if err != nil { - err := fmt.Errorf("error creating instance for pod %s", pod.Name) - c.logger.Error(err.Error()) - return err +func (c *M3DBController) addPodsToPlacement(cluster *myspec.M3DBCluster, pods []*corev1.Pod) error { + var ( + instances = make([]*placementpb.Instance, 0, len(pods)) + podNames = make([]string, 0, len(pods)) + ) + for _, pod := range pods { + c.logger.Info("found pod not in placement", zap.String("pod", pod.Name)) + inst, err := m3db.PlacementInstanceFromPod(cluster, pod, c.podIDProvider) + if err != nil { + c.logger.Error("error creating instance for pod", + zap.String("pod", pod.Name), + zap.Error(err)) + return err + } + instances = append(instances, inst) + podNames = append(podNames, pod.Name) } - - reason := fmt.Sprintf("adding pod %s to placement", pod.Name) - _, err = c.setStatusPodBootstrapping(cluster, corev1.ConditionTrue, "PodAdded", reason) + reason := fmt.Sprintf("adding pods to placement (%s)", strings.Join(podNames, ", ")) + _, err := c.setStatusPodsBootstrapping(cluster, corev1.ConditionTrue, "PodAdded", reason) if err != nil { - err := fmt.Errorf("error setting pod bootstrapping status: %v", err) - c.logger.Error(err.Error()) + c.logger.Error("error setting pods bootstrapping status", + zap.Error(err)) return err } - err = c.adminClient.placementClientForCluster(cluster).Add(*inst) + err = c.adminClient.placementClientForCluster(cluster).Add(instances) if err != nil { - err := fmt.Errorf("error adding pod to placement: %s", pod.Name) - c.logger.Error(err.Error()) + c.logger.Error("error adding instances to placement", + zap.String("reason", reason), + zap.Error(err)) return err } - c.logger.Info("added pod to placement", zap.String("pod", pod.Name)) + c.logger.Info("added pods to placement", zap.Strings("pods", podNames)) return nil } @@ -391,7 +399,6 @@ func (c *M3DBController) checkPodsForReplacement( cluster *myspec.M3DBCluster, pods []*corev1.Pod, pl placement.Placement) (string, *corev1.Pod, error) { - insts := pl.Instances() sort.Sort(placement.ByIDAscending(insts)) @@ -429,28 +436,28 @@ func (c *M3DBController) replacePodInPlacement( pl placement.Placement, leavingInstanceID string, newPod *corev1.Pod) error { - c.logger.Info("replacing pod in placement", zap.String("pod", leavingInstanceID)) newInst, err := m3db.PlacementInstanceFromPod(cluster, newPod, c.podIDProvider) if err != nil { - err := fmt.Errorf("error creating instance from replacement pod %s: %v", newPod.Name, err) - c.logger.Error(err.Error()) + c.logger.Error("error creating instance from replacement pod", + zap.String("pod", newPod.Name), + zap.Error(err)) return err } reason := fmt.Sprintf("replacing %s pod in placement", newPod.Name) - _, err = c.setStatusPodBootstrapping(cluster, corev1.ConditionTrue, "PodReplaced", reason) + _, err = c.setStatusPodsBootstrapping(cluster, corev1.ConditionTrue, "PodReplaced", reason) if err != nil { - err := fmt.Errorf("error setting replacement pod bootstrapping status: %v", err) - c.logger.Error(err.Error()) + c.logger.Error("error setting replacement pod bootstrapping status", + zap.Error(err)) return err } err = c.adminClient.placementClientForCluster(cluster).Replace(leavingInstanceID, *newInst) if err != nil { - err := fmt.Errorf("error replacing pod in placement: %s", leavingInstanceID) - c.logger.Error(err.Error()) + c.logger.Error("error replacing pod in placement", + zap.Error(err)) return err } @@ -461,7 +468,6 @@ func (c *M3DBController) replacePodInPlacement( // be added to the placement and chooses a pod to expand to the placement. func (c *M3DBController) expandPlacementForSet(cluster *myspec.M3DBCluster, set *appsv1.StatefulSet, group myspec.IsolationGroup, placement placement.Placement) error { - existInsts := instancesInIsoGroup(placement, group.Name) if len(existInsts) >= int(group.NumInstances) { c.logger.Warn("not expanding set, already at desired capacity", @@ -483,6 +489,7 @@ func (c *M3DBController) expandPlacementForSet(cluster *myspec.M3DBCluster, set return err } + podsToAdd := make([]*corev1.Pod, 0, len(pods)) for _, pod := range pods { id, err := c.podIDProvider.Identity(pod, cluster) if err != nil { @@ -494,11 +501,14 @@ func (c *M3DBController) expandPlacementForSet(cluster *myspec.M3DBCluster, set } _, ok := placement.Instance(idStr) if !ok { - return c.addPodToPlacement(cluster, pod) + podsToAdd = append(podsToAdd, pod) } } + if len(podsToAdd) == 0 { + return errors.New("could not find pod absent from placement") + } - return errors.New("could not find pod absent from placement") + return c.addPodsToPlacement(cluster, podsToAdd) } // shrinkPlacementForSet takes a StatefulSet that needs to be shrunk and diff --git a/pkg/controller/update_cluster_test.go b/pkg/controller/update_cluster_test.go index ced9ed4e..f36951df 100644 --- a/pkg/controller/update_cluster_test.go +++ b/pkg/controller/update_cluster_test.go @@ -365,7 +365,7 @@ func TestNamespacesToReady(t *testing.T) { func TestSetPodBootstrappingStatus(t *testing.T) { cluster := getFixture("cluster-simple.yaml", t) - assert.False(t, cluster.Status.HasPodBootstrapping()) + assert.False(t, cluster.Status.HasPodsBootstrapping()) deps := newTestDeps(t, &testOpts{ crdObjects: []runtime.Object{cluster}, @@ -373,10 +373,10 @@ func TestSetPodBootstrappingStatus(t *testing.T) { controller := deps.newController(t) defer deps.cleanup() - cluster, err := controller.setStatusPodBootstrapping(cluster, corev1.ConditionTrue, "foo", "bar") + cluster, err := controller.setStatusPodsBootstrapping(cluster, corev1.ConditionTrue, "foo", "bar") assert.NoError(t, err) - assert.True(t, cluster.Status.HasPodBootstrapping()) + assert.True(t, cluster.Status.HasPodsBootstrapping()) } func TestSetStatus(t *testing.T) { @@ -426,7 +426,7 @@ func TestReconcileBootstrappingStatus(t *testing.T) { controller := deps.newController(t) defer deps.cleanup() - const cond = myspec.ClusterConditionPodBootstrapping + const cond = myspec.ClusterConditionPodsBootstrapping newPl := func(state shard.State) placement.Placement { return placement.NewPlacement().SetInstances([]placement.Instance{ @@ -452,7 +452,8 @@ func TestReconcileBootstrappingStatus(t *testing.T) { assert.Equal(t, string(corev1.ConditionFalse), string(c.Status)) } -func TestAddPodToPlacement(t *testing.T) { +// nolint: paralleltest +func TestAddPodsToPlacement(t *testing.T) { cluster := getFixture("cluster-simple.yaml", t) deps := newTestDeps(t, &testOpts{ @@ -461,36 +462,41 @@ func TestAddPodToPlacement(t *testing.T) { controller := deps.newController(t) defer deps.cleanup() - pod := &corev1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Name: "pod-a", - Labels: map[string]string{ - "operator.m3db.io/isolation-group": "zone-a", + var ( + podNames = []string{"pod-a1", "pod-a2"} + pods = make([]*corev1.Pod, 0, len(podNames)) + placements = make([]*placementpb.Instance, 0, len(podNames)) + ) + for _, name := range podNames { + pod := &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Labels: map[string]string{ + "operator.m3db.io/isolation-group": "zone-a", + }, }, - }, - } - - deps.idProvider.EXPECT().Identity(pod, cluster).Return(&myspec.PodIdentity{}, nil) - - expInstance := placementpb.Instance{ - Id: "{}", - IsolationGroup: "zone-a", - Zone: "embedded", - Endpoint: "pod-a.m3dbnode-cluster-simple:9000", - Hostname: "pod-a.m3dbnode-cluster-simple", - Port: 9000, - Weight: 100, + } + deps.idProvider.EXPECT().Identity(pod, cluster).Return(&myspec.PodIdentity{}, nil) + pods = append(pods, pod) + placements = append(placements, &placementpb.Instance{ + Id: "{}", + IsolationGroup: "zone-a", + Zone: "embedded", + Endpoint: fmt.Sprintf("%s.m3dbnode-cluster-simple:9000", name), + Hostname: fmt.Sprintf("%s.m3dbnode-cluster-simple", name), + Port: 9000, + Weight: 100, + }) } + deps.placementClient.EXPECT().Add(placements) - deps.placementClient.EXPECT().Add(expInstance) - - err := controller.addPodToPlacement(cluster, pod) + err := controller.addPodsToPlacement(cluster, pods) assert.NoError(t, err) cluster, err = controller.crdClient.OperatorV1alpha1().M3DBClusters(cluster.Namespace).Get(cluster.Name, metav1.GetOptions{}) assert.NoError(t, err) - assert.True(t, cluster.Status.HasPodBootstrapping()) + assert.True(t, cluster.Status.HasPodsBootstrapping()) } func podsForClusterSet(cluster *myspec.M3DBCluster, set *appsv1.StatefulSet, numPods int) []*corev1.Pod { @@ -599,19 +605,23 @@ func TestExpandPlacementForSet(t *testing.T) { identifyPods(idProvider, pods, nil) - pl := placementFromPods(t, cluster, pods[:2], idProvider) + pl := placementFromPods(t, cluster, pods[:1], idProvider) group := cluster.Spec.IsolationGroups[0] - instPb, err := m3db.PlacementInstanceFromPod(cluster, pods[2], idProvider) - require.NoError(t, err) + placements := make([]*placementpb.Instance, 0, len(pods[1:])) + for _, pod := range pods[1:] { + instPb, err := m3db.PlacementInstanceFromPod(cluster, pod, idProvider) + require.NoError(t, err) + placements = append(placements, instPb) + } - placementMock.EXPECT().Add(*instPb) + placementMock.EXPECT().Add(placements) err = controller.expandPlacementForSet(cluster, set, group, pl) assert.NoError(t, err) cluster, err = deps.crdClient.OperatorV1alpha1().M3DBClusters(cluster.Namespace).Get(cluster.Name, metav1.GetOptions{}) require.NoError(t, err) - assert.True(t, cluster.Status.HasPodBootstrapping()) + assert.True(t, cluster.Status.HasPodsBootstrapping()) } func TestExpandPlacementForSet_Nop(t *testing.T) { @@ -853,7 +863,6 @@ func TestSortPods(t *testing.T) { } func TestCheckPodsForReplacement(t *testing.T) { - cluster := getFixture("cluster-3-zones.yaml", t) deps := newTestDeps(t, &testOpts{ crdObjects: []runtime.Object{cluster}, @@ -888,7 +897,8 @@ func TestCheckPodsForReplacement(t *testing.T) { Labels: map[string]string{ "different": "label", }, - }}) + }, + }) identifyPods(idProvider, replacePods, nil) @@ -950,7 +960,6 @@ func TestReplacePodInPlacement(t *testing.T) { err = controller.replacePodInPlacement(cluster, pl, testLeavingInstanceID, testNewPod) require.NoError(t, err) - } func TestReplacePodInPlacementWithError(t *testing.T) { @@ -972,7 +981,7 @@ func TestReplacePodInPlacementWithError(t *testing.T) { pl := placementFromPods(t, cluster, podsForPlacement, idProvider) // error creating instance - var badPod = &corev1.Pod{ + badPod := &corev1.Pod{ ObjectMeta: metav1.ObjectMeta{ Name: podsForPlacement[0].Name, UID: types.UID("ABC"), diff --git a/pkg/m3admin/placement/client.go b/pkg/m3admin/placement/client.go index c1d494bc..ea70bff5 100644 --- a/pkg/m3admin/placement/client.go +++ b/pkg/m3admin/placement/client.go @@ -114,11 +114,11 @@ func (p *placementClient) Get() (m3placement.Placement, error) { return m3placement.NewPlacementFromProto(resp.Placement) } -// Add will add an instance to the current placement -func (p *placementClient) Add(instance placementpb.Instance) error { +// Add multiple instances to the current placement. +func (p *placementClient) Add(instances []*placementpb.Instance) error { url := p.url + placementBaseURL req := &admin.PlacementAddRequest{ - Instances: []*placementpb.Instance{&instance}, + Instances: instances, } err := p.client.DoHTTPJSONPBRequest(http.MethodPost, url, req, nil) diff --git a/pkg/m3admin/placement/client_mock.go b/pkg/m3admin/placement/client_mock.go index 0817780a..794c6172 100644 --- a/pkg/m3admin/placement/client_mock.go +++ b/pkg/m3admin/placement/client_mock.go @@ -115,17 +115,17 @@ func (mr *MockClientMockRecorder) Delete() *gomock.Call { } // Add mocks base method -func (m *MockClient) Add(instance placementpb.Instance) error { +func (m *MockClient) Add(instances []*placementpb.Instance) error { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "Add", instance) + ret := m.ctrl.Call(m, "Add", instances) ret0, _ := ret[0].(error) return ret0 } // Add indicates an expected call of Add -func (mr *MockClientMockRecorder) Add(instance interface{}) *gomock.Call { +func (mr *MockClientMockRecorder) Add(instances interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Add", reflect.TypeOf((*MockClient)(nil).Add), instance) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Add", reflect.TypeOf((*MockClient)(nil).Add), instances) } // Remove mocks base method diff --git a/pkg/m3admin/placement/client_test.go b/pkg/m3admin/placement/client_test.go index a485a84d..016eac00 100644 --- a/pkg/m3admin/placement/client_test.go +++ b/pkg/m3admin/placement/client_test.go @@ -90,7 +90,7 @@ func TestAdd(t *testing.T) { return } - const expected = `{"instances":[{"id":"a"}]}` + const expected = `{"instances":[{"id":"a"},{"id":"b"}]}` assert.Equal(t, expected, string(bytes)) w.WriteHeader(200) @@ -100,7 +100,10 @@ func TestAdd(t *testing.T) { defer s.Close() client := newPlacementClient(t, s.URL) - err := client.Add(placementpb.Instance{Id: "a"}) + err := client.Add([]*placementpb.Instance{ + {Id: "a"}, + {Id: "b"}, + }) require.Nil(t, err) } @@ -113,7 +116,7 @@ func TestAddErr(t *testing.T) { defer s.Close() client := newPlacementClient(t, s.URL) - err := client.Add(placementpb.Instance{}) + err := client.Add([]*placementpb.Instance{}) require.NotNil(t, err) } diff --git a/pkg/m3admin/placement/types.go b/pkg/m3admin/placement/types.go index 10b50e50..113168b7 100644 --- a/pkg/m3admin/placement/types.go +++ b/pkg/m3admin/placement/types.go @@ -37,7 +37,7 @@ type Client interface { // Delete will delete the current placment Delete() error // Add will add an instance to the placement - Add(instance placementpb.Instance) error + Add(instances []*placementpb.Instance) error // Remove removes a given instance with the given ID from the placement. Remove(id string) error // Replace replaces one instance with another.