Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

remove records of deleted pods from failure stores automatically and add an e2e test #2560

Merged
merged 3 commits into from
May 26, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 27 additions & 2 deletions pkg/manager/member/tiflash_failover.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,14 @@ import (
"time"

"github.com/pingcap/tidb-operator/pkg/apis/pingcap/v1alpha1"
"github.com/pingcap/tidb-operator/pkg/util"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/tools/record"
"k8s.io/klog"
)

// TODO reuse tikvFailover since we share the same logic
type tiflashFailover struct {
tiflashFailoverPeriod time.Duration
recorder record.EventRecorder
Expand All @@ -34,6 +36,16 @@ func NewTiFlashFailover(tiflashFailoverPeriod time.Duration, recorder record.Eve
return &tiflashFailover{tiflashFailoverPeriod, recorder}
}

func (tff *tiflashFailover) isPodDesired(tc *v1alpha1.TidbCluster, podName string) bool {
ordinals := tc.TiFlashStsDesiredOrdinals(true)
ordinal, err := util.GetOrdinalFromPodName(podName)
if err != nil {
klog.Errorf("unexpected pod name %q: %v", podName, err)
return false
}
return ordinals.Has(ordinal)
}

func (tff *tiflashFailover) Failover(tc *v1alpha1.TidbCluster) error {
ns := tc.GetNamespace()
tcName := tc.GetName()
Expand All @@ -43,6 +55,12 @@ func (tff *tiflashFailover) Failover(tc *v1alpha1.TidbCluster) error {
if store.LastTransitionTime.IsZero() {
continue
}
if !tff.isPodDesired(tc, podName) {
// we should ignore the store record of deleted pod, otherwise the
// record of deleted pod may be added back to failure stores
// (before it enters into Offline/Tombstone state)
continue
}
deadline := store.LastTransitionTime.Add(tff.tiflashFailoverPeriod)
exist := false
for _, failureStore := range tc.Status.TiFlash.FailureStores {
Expand Down Expand Up @@ -74,8 +92,15 @@ func (tff *tiflashFailover) Failover(tc *v1alpha1.TidbCluster) error {
return nil
}

func (tff *tiflashFailover) Recover(_ *v1alpha1.TidbCluster) {
// Do nothing now
func (tff *tiflashFailover) Recover(tc *v1alpha1.TidbCluster) {
for key, failureStore := range tc.Status.TiFlash.FailureStores {
if !tff.isPodDesired(tc, failureStore.PodName) {
// If we delete the pods, e.g. by using advanced statefulset delete
// slots feature. We should remove the record of undesired pods,
// otherwise an extra replacement pod will be created.
delete(tc.Status.TiFlash.FailureStores, key)
}
}
}

type fakeTiFlashFailover struct{}
Expand Down
5 changes: 5 additions & 0 deletions pkg/manager/member/tiflash_member_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,11 @@ func (tfmm *tiflashMemberManager) syncStatefulSet(tc *v1alpha1.TidbCluster) erro
return err
}

// Recover failed stores if any before generating desired statefulset
if len(tc.Status.TiFlash.FailureStores) > 0 {
tfmm.tiflashFailover.Recover(tc)
}

newSet, err := getNewStatefulSet(tc, cm)
if err != nil {
return err
Expand Down
28 changes: 26 additions & 2 deletions pkg/manager/member/tikv_failover.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"time"

"github.com/pingcap/tidb-operator/pkg/apis/pingcap/v1alpha1"
"github.com/pingcap/tidb-operator/pkg/util"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/tools/record"
Expand All @@ -34,6 +35,16 @@ func NewTiKVFailover(tikvFailoverPeriod time.Duration, recorder record.EventReco
return &tikvFailover{tikvFailoverPeriod, recorder}
}

func (tf *tikvFailover) isPodDesired(tc *v1alpha1.TidbCluster, podName string) bool {
ordinals := tc.TiKVStsDesiredOrdinals(true)
ordinal, err := util.GetOrdinalFromPodName(podName)
if err != nil {
klog.Errorf("unexpected pod name %q: %v", podName, err)
return false
}
return ordinals.Has(ordinal)
}

func (tf *tikvFailover) Failover(tc *v1alpha1.TidbCluster) error {
ns := tc.GetNamespace()
tcName := tc.GetName()
Expand All @@ -43,6 +54,12 @@ func (tf *tikvFailover) Failover(tc *v1alpha1.TidbCluster) error {
if store.LastTransitionTime.IsZero() {
continue
}
if !tf.isPodDesired(tc, podName) {
// we should ignore the store record of deleted pod, otherwise the
// record of deleted pod may be added back to failure stores
// (before it enters into Offline/Tombstone state)
continue
}
deadline := store.LastTransitionTime.Add(tf.tikvFailoverPeriod)
exist := false
for _, failureStore := range tc.Status.TiKV.FailureStores {
Expand Down Expand Up @@ -75,8 +92,15 @@ func (tf *tikvFailover) Failover(tc *v1alpha1.TidbCluster) error {
return nil
}

func (tf *tikvFailover) Recover(_ *v1alpha1.TidbCluster) {
// Do nothing now
func (tf *tikvFailover) Recover(tc *v1alpha1.TidbCluster) {
for key, failureStore := range tc.Status.TiKV.FailureStores {
if !tf.isPodDesired(tc, failureStore.PodName) {
// If we delete the pods, e.g. by using advanced statefulset delete
// slots feature. We should remove the record of undesired pods,
// otherwise an extra replacement pod will be created.
delete(tc.Status.TiKV.FailureStores, key)
}
}
}

type fakeTiKVFailover struct{}
Expand Down
110 changes: 53 additions & 57 deletions pkg/manager/member/tikv_failover_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,31 +25,12 @@ import (
)

func TestTiKVFailoverFailover(t *testing.T) {
g := NewGomegaWithT(t)

type testcase struct {
tests := []struct {
name string
update func(*v1alpha1.TidbCluster)
err bool
expectFn func(*v1alpha1.TidbCluster)
}
testFn := func(test *testcase, t *testing.T) {
t.Log(test.name)
tc := newTidbClusterForPD()
tc.Spec.TiKV.MaxFailoverCount = pointer.Int32Ptr(3)
test.update(tc)
tikvFailover := newFakeTiKVFailover()

err := tikvFailover.Failover(tc)
if test.err {
g.Expect(err).To(HaveOccurred())
} else {
g.Expect(err).NotTo(HaveOccurred())
}
test.expectFn(tc)
}

tests := []testcase{
expectFn func(t *testing.T, tc *v1alpha1.TidbCluster)
}{
{
name: "normal",
update: func(tc *v1alpha1.TidbCluster) {
Expand All @@ -67,8 +48,8 @@ func TestTiKVFailoverFailover(t *testing.T) {
}
},
err: false,
expectFn: func(tc *v1alpha1.TidbCluster) {
g.Expect(int(tc.Spec.TiKV.Replicas)).To(Equal(3))
expectFn: func(t *testing.T, tc *v1alpha1.TidbCluster) {
g := NewGomegaWithT(t)
g.Expect(len(tc.Status.TiKV.FailureStores)).To(Equal(2))
},
},
Expand All @@ -80,8 +61,8 @@ func TestTiKVFailoverFailover(t *testing.T) {
}
},
err: false,
expectFn: func(tc *v1alpha1.TidbCluster) {
g.Expect(int(tc.Spec.TiKV.Replicas)).To(Equal(3))
expectFn: func(t *testing.T, tc *v1alpha1.TidbCluster) {
g := NewGomegaWithT(t)
g.Expect(len(tc.Status.TiKV.FailureStores)).To(Equal(0))
},
},
Expand All @@ -97,8 +78,8 @@ func TestTiKVFailoverFailover(t *testing.T) {
}
},
err: false,
expectFn: func(tc *v1alpha1.TidbCluster) {
g.Expect(int(tc.Spec.TiKV.Replicas)).To(Equal(3))
expectFn: func(t *testing.T, tc *v1alpha1.TidbCluster) {
g := NewGomegaWithT(t)
g.Expect(len(tc.Status.TiKV.FailureStores)).To(Equal(0))
},
},
Expand All @@ -113,8 +94,8 @@ func TestTiKVFailoverFailover(t *testing.T) {
}
},
err: false,
expectFn: func(tc *v1alpha1.TidbCluster) {
g.Expect(int(tc.Spec.TiKV.Replicas)).To(Equal(3))
expectFn: func(t *testing.T, tc *v1alpha1.TidbCluster) {
g := NewGomegaWithT(t)
g.Expect(len(tc.Status.TiKV.FailureStores)).To(Equal(0))
},
},
Expand All @@ -136,8 +117,8 @@ func TestTiKVFailoverFailover(t *testing.T) {
}
},
err: false,
expectFn: func(tc *v1alpha1.TidbCluster) {
g.Expect(int(tc.Spec.TiKV.Replicas)).To(Equal(3))
expectFn: func(t *testing.T, tc *v1alpha1.TidbCluster) {
g := NewGomegaWithT(t)
g.Expect(len(tc.Status.TiKV.FailureStores)).To(Equal(1))
},
},
Expand All @@ -147,17 +128,17 @@ func TestTiKVFailoverFailover(t *testing.T) {
tc.Status.TiKV.Stores = map[string]v1alpha1.TiKVStore{
"3": {
State: v1alpha1.TiKVStateDown,
PodName: "tikv-3",
PodName: "tikv-0",
LastTransitionTime: metav1.Time{Time: time.Now().Add(-70 * time.Minute)},
},
"10": {
"4": {
State: v1alpha1.TiKVStateUp,
PodName: "tikv-10",
PodName: "tikv-4",
LastTransitionTime: metav1.Time{Time: time.Now().Add(-70 * time.Minute)},
},
"11": {
"5": {
State: v1alpha1.TiKVStateUp,
PodName: "tikv-11",
PodName: "tikv-5",
LastTransitionTime: metav1.Time{Time: time.Now().Add(-61 * time.Minute)},
},
}
Expand All @@ -173,8 +154,8 @@ func TestTiKVFailoverFailover(t *testing.T) {
}
},
err: false,
expectFn: func(tc *v1alpha1.TidbCluster) {
g.Expect(int(tc.Spec.TiKV.Replicas)).To(Equal(3))
expectFn: func(t *testing.T, tc *v1alpha1.TidbCluster) {
g := NewGomegaWithT(t)
g.Expect(len(tc.Status.TiKV.FailureStores)).To(Equal(3))
},
},
Expand All @@ -187,14 +168,14 @@ func TestTiKVFailoverFailover(t *testing.T) {
PodName: "tikv-3",
LastTransitionTime: metav1.Time{Time: time.Now().Add(-70 * time.Minute)},
},
"10": {
"4": {
State: v1alpha1.TiKVStateDown,
PodName: "tikv-10",
PodName: "tikv-4",
LastTransitionTime: metav1.Time{Time: time.Now().Add(-70 * time.Minute)},
},
"11": {
"5": {
State: v1alpha1.TiKVStateUp,
PodName: "tikv-11",
PodName: "tikv-5",
LastTransitionTime: metav1.Time{Time: time.Now().Add(-61 * time.Minute)},
},
}
Expand All @@ -210,28 +191,28 @@ func TestTiKVFailoverFailover(t *testing.T) {
}
},
err: false,
expectFn: func(tc *v1alpha1.TidbCluster) {
g.Expect(int(tc.Spec.TiKV.Replicas)).To(Equal(3))
expectFn: func(t *testing.T, tc *v1alpha1.TidbCluster) {
g := NewGomegaWithT(t)
g.Expect(len(tc.Status.TiKV.FailureStores)).To(Equal(3))
},
},
{
name: "exceed max failover count2",
update: func(tc *v1alpha1.TidbCluster) {
tc.Status.TiKV.Stores = map[string]v1alpha1.TiKVStore{
"12": {
"0": {
State: v1alpha1.TiKVStateDown,
PodName: "tikv-12",
PodName: "tikv-0",
LastTransitionTime: metav1.Time{Time: time.Now().Add(-70 * time.Minute)},
},
"13": {
"4": {
State: v1alpha1.TiKVStateDown,
PodName: "tikv-13",
PodName: "tikv-4",
LastTransitionTime: metav1.Time{Time: time.Now().Add(-61 * time.Minute)},
},
"14": {
"5": {
State: v1alpha1.TiKVStateDown,
PodName: "tikv-14",
PodName: "tikv-5",
LastTransitionTime: metav1.Time{Time: time.Now().Add(-70 * time.Minute)},
},
}
Expand All @@ -251,8 +232,8 @@ func TestTiKVFailoverFailover(t *testing.T) {
}
},
err: false,
expectFn: func(tc *v1alpha1.TidbCluster) {
g.Expect(int(tc.Spec.TiKV.Replicas)).To(Equal(3))
expectFn: func(t *testing.T, tc *v1alpha1.TidbCluster) {
g := NewGomegaWithT(t)
g.Expect(len(tc.Status.TiKV.FailureStores)).To(Equal(3))
},
},
Expand Down Expand Up @@ -293,14 +274,29 @@ func TestTiKVFailoverFailover(t *testing.T) {
}
},
err: false,
expectFn: func(tc *v1alpha1.TidbCluster) {
g.Expect(int(tc.Spec.TiKV.Replicas)).To(Equal(3))
expectFn: func(t *testing.T, tc *v1alpha1.TidbCluster) {
g := NewGomegaWithT(t)
g.Expect(len(tc.Status.TiKV.FailureStores)).To(Equal(3))
},
},
}
for i := range tests {
testFn(&tests[i], t)
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
g := NewGomegaWithT(t)
tc := newTidbClusterForPD()
tc.Spec.TiKV.Replicas = 6
tc.Spec.TiKV.MaxFailoverCount = pointer.Int32Ptr(3)
tt.update(tc)
tikvFailover := newFakeTiKVFailover()

err := tikvFailover.Failover(tc)
if tt.err {
g.Expect(err).To(HaveOccurred())
} else {
g.Expect(err).NotTo(HaveOccurred())
}
tt.expectFn(t, tc)
})
}
}

Expand Down
5 changes: 5 additions & 0 deletions pkg/manager/member/tikv_member_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,11 @@ func (tkmm *tikvMemberManager) syncStatefulSetForTidbCluster(tc *v1alpha1.TidbCl
return err
}

// Recover failed stores if any before generating desired statefulset
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should also try to recover for TiFlash.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

if len(tc.Status.TiKV.FailureStores) > 0 {
tkmm.tikvFailover.Recover(tc)
}

newSet, err := getNewTiKVSetForTidbCluster(tc, cm)
if err != nil {
return err
Expand Down
6 changes: 6 additions & 0 deletions pkg/util/tidbcluster/tidbcluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,3 +85,9 @@ func filterOutCondition(conditions []v1alpha1.TidbClusterCondition, condType v1a
}
return newConditions
}

// GetTidbClusterReadyCondition extracts the tidbcluster ready condition from the given status and returns that.
// Returns nil if the condition is not present.
func GetTidbClusterReadyCondition(status v1alpha1.TidbClusterStatus) *v1alpha1.TidbClusterCondition {
return GetTidbClusterCondition(status, v1alpha1.TidbClusterReady)
}
Loading