From cb81012280e258f9f78aaabc5a7b844894297d7f Mon Sep 17 00:00:00 2001 From: Bo Du Date: Thu, 25 Feb 2021 10:51:28 -0500 Subject: [PATCH 1/4] [controller] add support for adding multiple instances to a placement at once --- .gitignore | 3 + pkg/apis/m3dboperator/v1alpha1/cluster.go | 10 +-- pkg/controller/m3admin_client.go | 2 +- pkg/controller/m3admin_client_test.go | 2 +- pkg/controller/update_cluster.go | 62 +++++++++++------- pkg/controller/update_cluster_test.go | 79 ++++++++++++----------- pkg/m3admin/placement/client.go | 6 +- pkg/m3admin/placement/client_mock.go | 8 +-- pkg/m3admin/placement/types.go | 2 +- 9 files changed, 98 insertions(+), 76 deletions(-) diff --git a/.gitignore b/.gitignore index 24687f1c..99e4a61b 100644 --- a/.gitignore +++ b/.gitignore @@ -34,3 +34,6 @@ linux-amd64/ # Helm packages helm/**/*.tgz + +# Vim swap files +*.swp diff --git a/pkg/apis/m3dboperator/v1alpha1/cluster.go b/pkg/apis/m3dboperator/v1alpha1/cluster.go index 9b4478f7..6f129096 100644 --- a/pkg/apis/m3dboperator/v1alpha1/cluster.go +++ b/pkg/apis/m3dboperator/v1alpha1/cluster.go @@ -41,8 +41,8 @@ const ( // been created for the cluster. ClusterConditionPlacementInitialized ClusterConditionType = "PlacementInitialized" - // ClusterConditionPodBootstrapping indicates there is a pod bootstrapping. - ClusterConditionPodBootstrapping ClusterConditionType = "PodBootstrapping" + // ClusterConditionPodsBootstrapping indicates there are pods bootstrapping. + ClusterConditionPodsBootstrapping ClusterConditionType = "PodsBootstrapping" ) // M3DBCluster defines the cluster @@ -103,10 +103,10 @@ func (s *M3DBStatus) HasInitializedPlacement() bool { return s.hasConditionTrue(ClusterConditionPlacementInitialized) } -// HasPodBootstrapping returns true if conditions indicate a pod is currently +// HasPodsBootstrapping returns true if conditions indicate a pod is currently // bootstrapping. -func (s *M3DBStatus) HasPodBootstrapping() bool { - return s.hasConditionTrue(ClusterConditionPodBootstrapping) +func (s *M3DBStatus) HasPodsBootstrapping() bool { + return s.hasConditionTrue(ClusterConditionPodsBootstrapping) } // GetCondition returns the specified cluster condition if it exists with a bool diff --git a/pkg/controller/m3admin_client.go b/pkg/controller/m3admin_client.go index 4bbed62e..80a9fcc7 100644 --- a/pkg/controller/m3admin_client.go +++ b/pkg/controller/m3admin_client.go @@ -267,7 +267,7 @@ func (c errorPlacementClient) Delete() error { return c.err } -func (c errorPlacementClient) Add(placementpb.Instance) error { +func (c errorPlacementClient) Add([]*placementpb.Instance) error { return c.err } diff --git a/pkg/controller/m3admin_client_test.go b/pkg/controller/m3admin_client_test.go index f1b19a7d..b7c5e1ed 100644 --- a/pkg/controller/m3admin_client_test.go +++ b/pkg/controller/m3admin_client_test.go @@ -237,7 +237,7 @@ func TestErrorPlacementClient(t *testing.T) { err = cl.Delete() assert.Equal(t, clErr, err) - err = cl.Add(placementpb.Instance{}) + err = cl.Add([]*placementpb.Instance{&placementpb.Instance{}}) assert.Equal(t, clErr, err) err = cl.Remove("foo") diff --git a/pkg/controller/update_cluster.go b/pkg/controller/update_cluster.go index 730b38e9..b2f7a45f 100644 --- a/pkg/controller/update_cluster.go +++ b/pkg/controller/update_cluster.go @@ -21,6 +21,7 @@ package controller import ( + "bytes" "encoding/json" "errors" "fmt" @@ -38,12 +39,14 @@ import ( "github.com/m3db/m3db-operator/pkg/m3admin/namespace" "github.com/m3db/m3db-operator/pkg/util/eventer" + "github.com/m3db/m3/src/cluster/generated/proto/placementpb" "github.com/m3db/m3/src/cluster/placement" dbns "github.com/m3db/m3/src/dbnode/generated/proto/namespace" "github.com/m3db/m3/src/query/generated/proto/admin" appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" klabels "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/util/runtime" @@ -306,16 +309,14 @@ func (c *M3DBController) setStatusPlacementCreated(cluster *myspec.M3DBCluster) return cluster, nil } -func (c *M3DBController) setStatusPodBootstrapping(cluster *myspec.M3DBCluster, +func (c *M3DBController) setStatusPodsBootstrapping(cluster *myspec.M3DBCluster, status corev1.ConditionStatus, reason, message string) (*myspec.M3DBCluster, error) { - - return c.setStatus(cluster, myspec.ClusterConditionPodBootstrapping, status, reason, message) + return c.setStatus(cluster, myspec.ClusterConditionPodsBootstrapping, status, reason, message) } func (c *M3DBController) setStatus(cluster *myspec.M3DBCluster, condition myspec.ClusterConditionType, status corev1.ConditionStatus, reason, message string) (*myspec.M3DBCluster, error) { - cond, ok := cluster.Status.GetCondition(condition) if !ok { cond = myspec.ClusterCondition{ @@ -355,35 +356,45 @@ func (c *M3DBController) reconcileBootstrappingStatus(cluster *myspec.M3DBCluste } } - return c.setStatus(cluster, myspec.ClusterConditionPodBootstrapping, corev1.ConditionFalse, + return c.setStatus(cluster, myspec.ClusterConditionPodsBootstrapping, corev1.ConditionFalse, "BootstrapComplete", "no bootstraps in progress") } -func (c *M3DBController) addPodToPlacement(cluster *myspec.M3DBCluster, pod *corev1.Pod) error { - c.logger.Info("found pod not in placement", zap.String("pod", pod.Name)) - inst, err := m3db.PlacementInstanceFromPod(cluster, pod, c.podIDProvider) - if err != nil { - err := fmt.Errorf("error creating instance for pod %s", pod.Name) - c.logger.Error(err.Error()) - return err +func (c *M3DBController) addPodsToPlacement(cluster *myspec.M3DBCluster, pods []*corev1.Pod) error { + var ( + instances = make([]*placementpb.Instance, 0, len(pods)) + reasonBuf = bytes.NewBufferString("adding pods to placement (") + loggerFields = make([]zap.Field, 0, len(pods)) + ) + for _, pod := range pods { + c.logger.Info("found pod not in placement", zap.String("pod", pod.Name)) + inst, err := m3db.PlacementInstanceFromPod(cluster, pod, c.podIDProvider) + if err != nil { + err := fmt.Errorf("error creating instance for pod %s", pod.Name) + c.logger.Error(err.Error()) + return err + } + instances = append(instances, inst) + reasonBuf.WriteString(fmt.Sprintf("%s,", pod.Name)) + loggerFields = append(loggerFields, zap.String("pod", pod.Name)) } - - reason := fmt.Sprintf("adding pod %s to placement", pod.Name) - _, err = c.setStatusPodBootstrapping(cluster, corev1.ConditionTrue, "PodAdded", reason) + reasonBuf.WriteString(")") + reason := reasonBuf.String() + _, err := c.setStatusPodsBootstrapping(cluster, corev1.ConditionTrue, "PodAdded", reason) if err != nil { - err := fmt.Errorf("error setting pod bootstrapping status: %v", err) + err := fmt.Errorf("error setting pods bootstrapping status: %v", err) c.logger.Error(err.Error()) return err } - err = c.adminClient.placementClientForCluster(cluster).Add(*inst) + err = c.adminClient.placementClientForCluster(cluster).Add(instances) if err != nil { - err := fmt.Errorf("error adding pod to placement: %s", pod.Name) + err := fmt.Errorf("error: %s", reason) c.logger.Error(err.Error()) return err } - c.logger.Info("added pod to placement", zap.String("pod", pod.Name)) + c.logger.Info("added pods to placement", loggerFields...) return nil } @@ -391,7 +402,6 @@ func (c *M3DBController) checkPodsForReplacement( cluster *myspec.M3DBCluster, pods []*corev1.Pod, pl placement.Placement) (string, *corev1.Pod, error) { - insts := pl.Instances() sort.Sort(placement.ByIDAscending(insts)) @@ -429,7 +439,6 @@ func (c *M3DBController) replacePodInPlacement( pl placement.Placement, leavingInstanceID string, newPod *corev1.Pod) error { - c.logger.Info("replacing pod in placement", zap.String("pod", leavingInstanceID)) newInst, err := m3db.PlacementInstanceFromPod(cluster, newPod, c.podIDProvider) @@ -440,7 +449,7 @@ func (c *M3DBController) replacePodInPlacement( } reason := fmt.Sprintf("replacing %s pod in placement", newPod.Name) - _, err = c.setStatusPodBootstrapping(cluster, corev1.ConditionTrue, "PodReplaced", reason) + _, err = c.setStatusPodsBootstrapping(cluster, corev1.ConditionTrue, "PodReplaced", reason) if err != nil { err := fmt.Errorf("error setting replacement pod bootstrapping status: %v", err) c.logger.Error(err.Error()) @@ -461,7 +470,6 @@ func (c *M3DBController) replacePodInPlacement( // be added to the placement and chooses a pod to expand to the placement. func (c *M3DBController) expandPlacementForSet(cluster *myspec.M3DBCluster, set *appsv1.StatefulSet, group myspec.IsolationGroup, placement placement.Placement) error { - existInsts := instancesInIsoGroup(placement, group.Name) if len(existInsts) >= int(group.NumInstances) { c.logger.Warn("not expanding set, already at desired capacity", @@ -483,6 +491,7 @@ func (c *M3DBController) expandPlacementForSet(cluster *myspec.M3DBCluster, set return err } + podsToAdd := make([]*v1.Pod, 0, len(pods)) for _, pod := range pods { id, err := c.podIDProvider.Identity(pod, cluster) if err != nil { @@ -494,11 +503,14 @@ func (c *M3DBController) expandPlacementForSet(cluster *myspec.M3DBCluster, set } _, ok := placement.Instance(idStr) if !ok { - return c.addPodToPlacement(cluster, pod) + podsToAdd = append(podsToAdd, pod) } } + if len(podsToAdd) == 0 { + return errors.New("could not find pod absent from placement") + } - return errors.New("could not find pod absent from placement") + return c.addPodsToPlacement(cluster, podsToAdd) } // shrinkPlacementForSet takes a StatefulSet that needs to be shrunk and diff --git a/pkg/controller/update_cluster_test.go b/pkg/controller/update_cluster_test.go index ced9ed4e..a7435828 100644 --- a/pkg/controller/update_cluster_test.go +++ b/pkg/controller/update_cluster_test.go @@ -365,7 +365,7 @@ func TestNamespacesToReady(t *testing.T) { func TestSetPodBootstrappingStatus(t *testing.T) { cluster := getFixture("cluster-simple.yaml", t) - assert.False(t, cluster.Status.HasPodBootstrapping()) + assert.False(t, cluster.Status.HasPodsBootstrapping()) deps := newTestDeps(t, &testOpts{ crdObjects: []runtime.Object{cluster}, @@ -373,10 +373,10 @@ func TestSetPodBootstrappingStatus(t *testing.T) { controller := deps.newController(t) defer deps.cleanup() - cluster, err := controller.setStatusPodBootstrapping(cluster, corev1.ConditionTrue, "foo", "bar") + cluster, err := controller.setStatusPodsBootstrapping(cluster, corev1.ConditionTrue, "foo", "bar") assert.NoError(t, err) - assert.True(t, cluster.Status.HasPodBootstrapping()) + assert.True(t, cluster.Status.HasPodsBootstrapping()) } func TestSetStatus(t *testing.T) { @@ -426,7 +426,7 @@ func TestReconcileBootstrappingStatus(t *testing.T) { controller := deps.newController(t) defer deps.cleanup() - const cond = myspec.ClusterConditionPodBootstrapping + const cond = myspec.ClusterConditionPodsBootstrapping newPl := func(state shard.State) placement.Placement { return placement.NewPlacement().SetInstances([]placement.Instance{ @@ -452,7 +452,7 @@ func TestReconcileBootstrappingStatus(t *testing.T) { assert.Equal(t, string(corev1.ConditionFalse), string(c.Status)) } -func TestAddPodToPlacement(t *testing.T) { +func TestAddPodsToPlacement(t *testing.T) { cluster := getFixture("cluster-simple.yaml", t) deps := newTestDeps(t, &testOpts{ @@ -461,36 +461,40 @@ func TestAddPodToPlacement(t *testing.T) { controller := deps.newController(t) defer deps.cleanup() - pod := &corev1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Name: "pod-a", - Labels: map[string]string{ - "operator.m3db.io/isolation-group": "zone-a", + var ( + pods []*corev1.Pod + placements []*placementpb.Instance + ) + for _, name := range []string{"pod-a1", "pod-a2"} { + pod := &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Labels: map[string]string{ + "operator.m3db.io/isolation-group": "zone-a", + }, }, - }, - } - - deps.idProvider.EXPECT().Identity(pod, cluster).Return(&myspec.PodIdentity{}, nil) - - expInstance := placementpb.Instance{ - Id: "{}", - IsolationGroup: "zone-a", - Zone: "embedded", - Endpoint: "pod-a.m3dbnode-cluster-simple:9000", - Hostname: "pod-a.m3dbnode-cluster-simple", - Port: 9000, - Weight: 100, + } + deps.idProvider.EXPECT().Identity(pod, cluster).Return(&myspec.PodIdentity{}, nil) + pods = append(pods, pod) + placements = append(placements, &placementpb.Instance{ + Id: "{}", + IsolationGroup: "zone-a", + Zone: "embedded", + Endpoint: fmt.Sprintf("%s.m3dbnode-cluster-simple:9000", name), + Hostname: fmt.Sprintf("%s.m3dbnode-cluster-simple", name), + Port: 9000, + Weight: 100, + }) } + deps.placementClient.EXPECT().Add(placements) - deps.placementClient.EXPECT().Add(expInstance) - - err := controller.addPodToPlacement(cluster, pod) + err := controller.addPodsToPlacement(cluster, pods) assert.NoError(t, err) cluster, err = controller.crdClient.OperatorV1alpha1().M3DBClusters(cluster.Namespace).Get(cluster.Name, metav1.GetOptions{}) assert.NoError(t, err) - assert.True(t, cluster.Status.HasPodBootstrapping()) + assert.True(t, cluster.Status.HasPodsBootstrapping()) } func podsForClusterSet(cluster *myspec.M3DBCluster, set *appsv1.StatefulSet, numPods int) []*corev1.Pod { @@ -599,19 +603,23 @@ func TestExpandPlacementForSet(t *testing.T) { identifyPods(idProvider, pods, nil) - pl := placementFromPods(t, cluster, pods[:2], idProvider) + pl := placementFromPods(t, cluster, pods[:1], idProvider) group := cluster.Spec.IsolationGroups[0] - instPb, err := m3db.PlacementInstanceFromPod(cluster, pods[2], idProvider) - require.NoError(t, err) + var placements []*placementpb.Instance + for _, pod := range pods[1:] { + instPb, err := m3db.PlacementInstanceFromPod(cluster, pod, idProvider) + require.NoError(t, err) + placements = append(placements, instPb) + } - placementMock.EXPECT().Add(*instPb) + placementMock.EXPECT().Add(placements) err = controller.expandPlacementForSet(cluster, set, group, pl) assert.NoError(t, err) cluster, err = deps.crdClient.OperatorV1alpha1().M3DBClusters(cluster.Namespace).Get(cluster.Name, metav1.GetOptions{}) require.NoError(t, err) - assert.True(t, cluster.Status.HasPodBootstrapping()) + assert.True(t, cluster.Status.HasPodsBootstrapping()) } func TestExpandPlacementForSet_Nop(t *testing.T) { @@ -853,7 +861,6 @@ func TestSortPods(t *testing.T) { } func TestCheckPodsForReplacement(t *testing.T) { - cluster := getFixture("cluster-3-zones.yaml", t) deps := newTestDeps(t, &testOpts{ crdObjects: []runtime.Object{cluster}, @@ -888,7 +895,8 @@ func TestCheckPodsForReplacement(t *testing.T) { Labels: map[string]string{ "different": "label", }, - }}) + }, + }) identifyPods(idProvider, replacePods, nil) @@ -950,7 +958,6 @@ func TestReplacePodInPlacement(t *testing.T) { err = controller.replacePodInPlacement(cluster, pl, testLeavingInstanceID, testNewPod) require.NoError(t, err) - } func TestReplacePodInPlacementWithError(t *testing.T) { @@ -972,7 +979,7 @@ func TestReplacePodInPlacementWithError(t *testing.T) { pl := placementFromPods(t, cluster, podsForPlacement, idProvider) // error creating instance - var badPod = &corev1.Pod{ + badPod := &corev1.Pod{ ObjectMeta: metav1.ObjectMeta{ Name: podsForPlacement[0].Name, UID: types.UID("ABC"), diff --git a/pkg/m3admin/placement/client.go b/pkg/m3admin/placement/client.go index c1d494bc..ea70bff5 100644 --- a/pkg/m3admin/placement/client.go +++ b/pkg/m3admin/placement/client.go @@ -114,11 +114,11 @@ func (p *placementClient) Get() (m3placement.Placement, error) { return m3placement.NewPlacementFromProto(resp.Placement) } -// Add will add an instance to the current placement -func (p *placementClient) Add(instance placementpb.Instance) error { +// Add multiple instances to the current placement. +func (p *placementClient) Add(instances []*placementpb.Instance) error { url := p.url + placementBaseURL req := &admin.PlacementAddRequest{ - Instances: []*placementpb.Instance{&instance}, + Instances: instances, } err := p.client.DoHTTPJSONPBRequest(http.MethodPost, url, req, nil) diff --git a/pkg/m3admin/placement/client_mock.go b/pkg/m3admin/placement/client_mock.go index 0817780a..794c6172 100644 --- a/pkg/m3admin/placement/client_mock.go +++ b/pkg/m3admin/placement/client_mock.go @@ -115,17 +115,17 @@ func (mr *MockClientMockRecorder) Delete() *gomock.Call { } // Add mocks base method -func (m *MockClient) Add(instance placementpb.Instance) error { +func (m *MockClient) Add(instances []*placementpb.Instance) error { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "Add", instance) + ret := m.ctrl.Call(m, "Add", instances) ret0, _ := ret[0].(error) return ret0 } // Add indicates an expected call of Add -func (mr *MockClientMockRecorder) Add(instance interface{}) *gomock.Call { +func (mr *MockClientMockRecorder) Add(instances interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Add", reflect.TypeOf((*MockClient)(nil).Add), instance) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Add", reflect.TypeOf((*MockClient)(nil).Add), instances) } // Remove mocks base method diff --git a/pkg/m3admin/placement/types.go b/pkg/m3admin/placement/types.go index 10b50e50..113168b7 100644 --- a/pkg/m3admin/placement/types.go +++ b/pkg/m3admin/placement/types.go @@ -37,7 +37,7 @@ type Client interface { // Delete will delete the current placment Delete() error // Add will add an instance to the placement - Add(instance placementpb.Instance) error + Add(instances []*placementpb.Instance) error // Remove removes a given instance with the given ID from the placement. Remove(id string) error // Replace replaces one instance with another. From 75ae345bf04588c232d17b17ce85b00d3a609432 Mon Sep 17 00:00:00 2001 From: Bo Du Date: Thu, 25 Feb 2021 11:01:53 -0500 Subject: [PATCH 2/4] [tests] fix tests --- pkg/apis/m3dboperator/v1alpha1/cluster_test.go | 4 ++-- pkg/m3admin/placement/client_test.go | 9 ++++++--- 2 files changed, 8 insertions(+), 5 deletions(-) diff --git a/pkg/apis/m3dboperator/v1alpha1/cluster_test.go b/pkg/apis/m3dboperator/v1alpha1/cluster_test.go index cdbe7d93..3d5e98f7 100644 --- a/pkg/apis/m3dboperator/v1alpha1/cluster_test.go +++ b/pkg/apis/m3dboperator/v1alpha1/cluster_test.go @@ -39,8 +39,8 @@ func TestStatus(t *testing.T) { f: func(s *M3DBStatus) bool { return s.HasInitializedPlacement() }, }, { - cond: ClusterConditionPodBootstrapping, - f: func(s *M3DBStatus) bool { return s.HasPodBootstrapping() }, + cond: ClusterConditionPodsBootstrapping, + f: func(s *M3DBStatus) bool { return s.HasPodsBootstrapping() }, }, } { t.Run(string(test.cond), func(t *testing.T) { diff --git a/pkg/m3admin/placement/client_test.go b/pkg/m3admin/placement/client_test.go index a485a84d..016eac00 100644 --- a/pkg/m3admin/placement/client_test.go +++ b/pkg/m3admin/placement/client_test.go @@ -90,7 +90,7 @@ func TestAdd(t *testing.T) { return } - const expected = `{"instances":[{"id":"a"}]}` + const expected = `{"instances":[{"id":"a"},{"id":"b"}]}` assert.Equal(t, expected, string(bytes)) w.WriteHeader(200) @@ -100,7 +100,10 @@ func TestAdd(t *testing.T) { defer s.Close() client := newPlacementClient(t, s.URL) - err := client.Add(placementpb.Instance{Id: "a"}) + err := client.Add([]*placementpb.Instance{ + {Id: "a"}, + {Id: "b"}, + }) require.Nil(t, err) } @@ -113,7 +116,7 @@ func TestAddErr(t *testing.T) { defer s.Close() client := newPlacementClient(t, s.URL) - err := client.Add(placementpb.Instance{}) + err := client.Add([]*placementpb.Instance{}) require.NotNil(t, err) } From 589eca437caf3700ee527cad710332f329152776 Mon Sep 17 00:00:00 2001 From: Bo Du Date: Tue, 2 Mar 2021 12:09:53 -0500 Subject: [PATCH 3/4] fix lint --- pkg/controller/m3admin_client_test.go | 18 ++++++++---------- pkg/controller/update_cluster.go | 2 +- pkg/controller/update_cluster_test.go | 10 ++++++---- 3 files changed, 15 insertions(+), 15 deletions(-) diff --git a/pkg/controller/m3admin_client_test.go b/pkg/controller/m3admin_client_test.go index b7c5e1ed..fb95aac6 100644 --- a/pkg/controller/m3admin_client_test.go +++ b/pkg/controller/m3admin_client_test.go @@ -24,20 +24,18 @@ import ( "errors" "testing" + "github.com/golang/mock/gomock" + "github.com/m3db/m3/src/cluster/generated/proto/placementpb" + "github.com/m3db/m3/src/msg/generated/proto/topicpb" + "github.com/stretchr/testify/assert" + "go.uber.org/zap" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + myspec "github.com/m3db/m3db-operator/pkg/apis/m3dboperator/v1alpha1" "github.com/m3db/m3db-operator/pkg/m3admin" "github.com/m3db/m3db-operator/pkg/m3admin/namespace" "github.com/m3db/m3db-operator/pkg/m3admin/placement" "github.com/m3db/m3db-operator/pkg/m3admin/topic" - - "github.com/m3db/m3/src/cluster/generated/proto/placementpb" - "github.com/m3db/m3/src/msg/generated/proto/topicpb" - - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - - "github.com/golang/mock/gomock" - "github.com/stretchr/testify/assert" - "go.uber.org/zap" ) func newTestAdminClient(cl m3admin.Client, url string) *multiAdminClient { @@ -237,7 +235,7 @@ func TestErrorPlacementClient(t *testing.T) { err = cl.Delete() assert.Equal(t, clErr, err) - err = cl.Add([]*placementpb.Instance{&placementpb.Instance{}}) + err = cl.Add([]*placementpb.Instance{{}}) assert.Equal(t, clErr, err) err = cl.Remove("foo") diff --git a/pkg/controller/update_cluster.go b/pkg/controller/update_cluster.go index b2f7a45f..288bca04 100644 --- a/pkg/controller/update_cluster.go +++ b/pkg/controller/update_cluster.go @@ -382,7 +382,7 @@ func (c *M3DBController) addPodsToPlacement(cluster *myspec.M3DBCluster, pods [] reason := reasonBuf.String() _, err := c.setStatusPodsBootstrapping(cluster, corev1.ConditionTrue, "PodAdded", reason) if err != nil { - err := fmt.Errorf("error setting pods bootstrapping status: %v", err) + err := fmt.Errorf("error setting pods bootstrapping status: %w", err) c.logger.Error(err.Error()) return err } diff --git a/pkg/controller/update_cluster_test.go b/pkg/controller/update_cluster_test.go index a7435828..f36951df 100644 --- a/pkg/controller/update_cluster_test.go +++ b/pkg/controller/update_cluster_test.go @@ -452,6 +452,7 @@ func TestReconcileBootstrappingStatus(t *testing.T) { assert.Equal(t, string(corev1.ConditionFalse), string(c.Status)) } +// nolint: paralleltest func TestAddPodsToPlacement(t *testing.T) { cluster := getFixture("cluster-simple.yaml", t) @@ -462,10 +463,11 @@ func TestAddPodsToPlacement(t *testing.T) { defer deps.cleanup() var ( - pods []*corev1.Pod - placements []*placementpb.Instance + podNames = []string{"pod-a1", "pod-a2"} + pods = make([]*corev1.Pod, 0, len(podNames)) + placements = make([]*placementpb.Instance, 0, len(podNames)) ) - for _, name := range []string{"pod-a1", "pod-a2"} { + for _, name := range podNames { pod := &corev1.Pod{ ObjectMeta: metav1.ObjectMeta{ Name: name, @@ -606,7 +608,7 @@ func TestExpandPlacementForSet(t *testing.T) { pl := placementFromPods(t, cluster, pods[:1], idProvider) group := cluster.Spec.IsolationGroups[0] - var placements []*placementpb.Instance + placements := make([]*placementpb.Instance, 0, len(pods[1:])) for _, pod := range pods[1:] { instPb, err := m3db.PlacementInstanceFromPod(cluster, pod, idProvider) require.NoError(t, err) From 9447ac043b1efc9d929c55c3e1223c9126da090d Mon Sep 17 00:00:00 2001 From: Bo Du Date: Tue, 2 Mar 2021 18:02:28 -0500 Subject: [PATCH 4/4] Address PR feedback. --- pkg/controller/update_cluster.go | 50 +++++++++++++++----------------- 1 file changed, 24 insertions(+), 26 deletions(-) diff --git a/pkg/controller/update_cluster.go b/pkg/controller/update_cluster.go index 288bca04..3f69edfb 100644 --- a/pkg/controller/update_cluster.go +++ b/pkg/controller/update_cluster.go @@ -21,7 +21,6 @@ package controller import ( - "bytes" "encoding/json" "errors" "fmt" @@ -46,7 +45,6 @@ import ( appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" - v1 "k8s.io/api/core/v1" klabels "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/util/runtime" @@ -103,7 +101,7 @@ func (c *M3DBController) createNamespaces(cluster *myspec.M3DBCluster, registry zap.String("namespace", ns.Name), zap.Error(err)) - return fmt.Errorf("error creating namespace '%s': %v", ns.Name, err) + return fmt.Errorf("error creating namespace '%s': %w", ns.Name, err) } c.recorder.NormalEvent(cluster, eventer.ReasonCreating, "created namespace "+ns.Name) @@ -298,8 +296,8 @@ func (c *M3DBController) setStatusPlacementCreated(cluster *myspec.M3DBCluster) var err error cluster, err = c.crdClient.OperatorV1alpha1().M3DBClusters(cluster.Namespace).UpdateStatus(cluster) if err != nil { - err := fmt.Errorf("error updating cluster placement init status: %v", err) - c.logger.Error(err.Error()) + c.logger.Error("error updating cluster placement init status", + zap.Error(err)) c.recorder.WarningEvent(cluster, eventer.ReasonFailSync, "failed to update placement status: %v", err) return nil, err } @@ -362,39 +360,38 @@ func (c *M3DBController) reconcileBootstrappingStatus(cluster *myspec.M3DBCluste func (c *M3DBController) addPodsToPlacement(cluster *myspec.M3DBCluster, pods []*corev1.Pod) error { var ( - instances = make([]*placementpb.Instance, 0, len(pods)) - reasonBuf = bytes.NewBufferString("adding pods to placement (") - loggerFields = make([]zap.Field, 0, len(pods)) + instances = make([]*placementpb.Instance, 0, len(pods)) + podNames = make([]string, 0, len(pods)) ) for _, pod := range pods { c.logger.Info("found pod not in placement", zap.String("pod", pod.Name)) inst, err := m3db.PlacementInstanceFromPod(cluster, pod, c.podIDProvider) if err != nil { - err := fmt.Errorf("error creating instance for pod %s", pod.Name) - c.logger.Error(err.Error()) + c.logger.Error("error creating instance for pod", + zap.String("pod", pod.Name), + zap.Error(err)) return err } instances = append(instances, inst) - reasonBuf.WriteString(fmt.Sprintf("%s,", pod.Name)) - loggerFields = append(loggerFields, zap.String("pod", pod.Name)) + podNames = append(podNames, pod.Name) } - reasonBuf.WriteString(")") - reason := reasonBuf.String() + reason := fmt.Sprintf("adding pods to placement (%s)", strings.Join(podNames, ", ")) _, err := c.setStatusPodsBootstrapping(cluster, corev1.ConditionTrue, "PodAdded", reason) if err != nil { - err := fmt.Errorf("error setting pods bootstrapping status: %w", err) - c.logger.Error(err.Error()) + c.logger.Error("error setting pods bootstrapping status", + zap.Error(err)) return err } err = c.adminClient.placementClientForCluster(cluster).Add(instances) if err != nil { - err := fmt.Errorf("error: %s", reason) - c.logger.Error(err.Error()) + c.logger.Error("error adding instances to placement", + zap.String("reason", reason), + zap.Error(err)) return err } - c.logger.Info("added pods to placement", loggerFields...) + c.logger.Info("added pods to placement", zap.Strings("pods", podNames)) return nil } @@ -443,23 +440,24 @@ func (c *M3DBController) replacePodInPlacement( newInst, err := m3db.PlacementInstanceFromPod(cluster, newPod, c.podIDProvider) if err != nil { - err := fmt.Errorf("error creating instance from replacement pod %s: %v", newPod.Name, err) - c.logger.Error(err.Error()) + c.logger.Error("error creating instance from replacement pod", + zap.String("pod", newPod.Name), + zap.Error(err)) return err } reason := fmt.Sprintf("replacing %s pod in placement", newPod.Name) _, err = c.setStatusPodsBootstrapping(cluster, corev1.ConditionTrue, "PodReplaced", reason) if err != nil { - err := fmt.Errorf("error setting replacement pod bootstrapping status: %v", err) - c.logger.Error(err.Error()) + c.logger.Error("error setting replacement pod bootstrapping status", + zap.Error(err)) return err } err = c.adminClient.placementClientForCluster(cluster).Replace(leavingInstanceID, *newInst) if err != nil { - err := fmt.Errorf("error replacing pod in placement: %s", leavingInstanceID) - c.logger.Error(err.Error()) + c.logger.Error("error replacing pod in placement", + zap.Error(err)) return err } @@ -491,7 +489,7 @@ func (c *M3DBController) expandPlacementForSet(cluster *myspec.M3DBCluster, set return err } - podsToAdd := make([]*v1.Pod, 0, len(pods)) + podsToAdd := make([]*corev1.Pod, 0, len(pods)) for _, pod := range pods { id, err := c.podIDProvider.Identity(pod, cluster) if err != nil {