Skip to content

Commit

Permalink
Automated cherry pick of #2013: add spec.paused field to pause the ti…
Browse files Browse the repository at this point in the history
…db cluster syncing (#2064)

* add spec.paused field to pause the tidb cluster syncing

* support auto-failover when paused is enabled

* Revert "support auto-failover when paused is enabled"

This reverts commit 346f94e.

* fix
  • Loading branch information
cofyc authored Mar 30, 2020
1 parent 83965a5 commit f6937cb
Show file tree
Hide file tree
Showing 9 changed files with 192 additions and 17 deletions.
26 changes: 26 additions & 0 deletions docs/api-references/docs.html
Original file line number Diff line number Diff line change
Expand Up @@ -739,6 +739,19 @@ <h3 id="pingcap.com/v1alpha1.TidbCluster">TidbCluster
</tr>
<tr>
<td>
<code>paused</code></br>
<em>
bool
</em>
</td>
<td>
<em>(Optional)</em>
<p>Indicates that the tidb cluster is paused and will not be processed by
the controller.</p>
</td>
</tr>
<tr>
<td>
<code>version</code></br>
<em>
string
Expand Down Expand Up @@ -12114,6 +12127,19 @@ <h3 id="pingcap.com/v1alpha1.TidbClusterSpec">TidbClusterSpec
</tr>
<tr>
<td>
<code>paused</code></br>
<em>
bool
</em>
</td>
<td>
<em>(Optional)</em>
<p>Indicates that the tidb cluster is paused and will not be processed by
the controller.</p>
</td>
</tr>
<tr>
<td>
<code>version</code></br>
<em>
string
Expand Down
4 changes: 4 additions & 0 deletions manifests/crd.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -700,6 +700,10 @@ spec:
description: Base node selectors of TiDB cluster Pods, components may
add or override selectors upon this respectively
type: object
paused:
description: Indicates that the tidb cluster is paused and will not
be processed by the controller.
type: boolean
pd:
description: PDSpec contains details of PD members
properties:
Expand Down
7 changes: 7 additions & 0 deletions pkg/apis/pingcap/v1alpha1/openapi_generated.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions pkg/apis/pingcap/v1alpha1/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,11 @@ type TidbClusterSpec struct {
// +optional
Helper *HelperSpec `json:"helper,omitempty"`

// Indicates that the tidb cluster is paused and will not be processed by
// the controller.
// +optional
Paused bool `json:"paused,omitempty"`

// TODO: remove optional after defaulting logic introduced
// TiDB cluster version
// +optional
Expand Down
29 changes: 25 additions & 4 deletions pkg/manager/member/pd_member_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,11 @@ func (pmm *pdMemberManager) Sync(tc *v1alpha1.TidbCluster) error {
}

func (pmm *pdMemberManager) syncPDServiceForTidbCluster(tc *v1alpha1.TidbCluster) error {
if tc.Spec.Paused {
klog.V(4).Infof("tidb cluster %s/%s is paused, skip syncing for pd service", tc.GetNamespace(), tc.GetName())
return nil
}

ns := tc.GetNamespace()
tcName := tc.GetName()

Expand Down Expand Up @@ -148,6 +153,11 @@ func (pmm *pdMemberManager) syncPDServiceForTidbCluster(tc *v1alpha1.TidbCluster
}

func (pmm *pdMemberManager) syncPDHeadlessServiceForTidbCluster(tc *v1alpha1.TidbCluster) error {
if tc.Spec.Paused {
klog.V(4).Infof("tidb cluster %s/%s is paused, skip syncing for pd headless service", tc.GetNamespace(), tc.GetName())
return nil
}

ns := tc.GetNamespace()
tcName := tc.GetName()

Expand Down Expand Up @@ -193,6 +203,16 @@ func (pmm *pdMemberManager) syncPDStatefulSetForTidbCluster(tc *v1alpha1.TidbClu
setNotExist := errors.IsNotFound(err)

oldPDSet := oldPDSetTmp.DeepCopy()

if err := pmm.syncTidbClusterStatus(tc, oldPDSet); err != nil {
klog.Errorf("failed to sync TidbCluster: [%s/%s]'s status, error: %v", ns, tcName, err)
}

if tc.Spec.Paused {
klog.V(4).Infof("tidb cluster %s/%s is paused, skip syncing for pd statefulset", tc.GetNamespace(), tc.GetName())
return nil
}

cm, err := pmm.syncPDConfigMap(tc, oldPDSet)
if err != nil {
return err
Expand All @@ -213,10 +233,6 @@ func (pmm *pdMemberManager) syncPDStatefulSetForTidbCluster(tc *v1alpha1.TidbClu
return controller.RequeueErrorf("TidbCluster: [%s/%s], waiting for PD cluster running", ns, tcName)
}

if err := pmm.syncTidbClusterStatus(tc, oldPDSet); err != nil {
klog.Errorf("failed to sync TidbCluster: [%s/%s]'s status, error: %v", ns, tcName, err)
}

if !tc.Status.PD.Synced {
force := NeedForceUpgrade(tc)
if force {
Expand Down Expand Up @@ -251,6 +267,11 @@ func (pmm *pdMemberManager) syncPDStatefulSetForTidbCluster(tc *v1alpha1.TidbClu
}

func (pmm *pdMemberManager) syncTidbClusterStatus(tc *v1alpha1.TidbCluster, set *apps.StatefulSet) error {
if set == nil {
// skip if not created yet
return nil
}

ns := tc.GetNamespace()
tcName := tc.GetName()

Expand Down
23 changes: 18 additions & 5 deletions pkg/manager/member/pump_member_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,16 @@ func (pmm *pumpMemberManager) syncPumpStatefulSetForTidbCluster(tc *v1alpha1.Tid
notFound := errors.IsNotFound(err)
oldPumpSet := oldPumpSetTemp.DeepCopy()

if err := pmm.syncTiDBClusterStatus(tc, oldPumpSet); err != nil {
klog.Errorf("failed to sync TidbCluster: [%s/%s]'s status, error: %v", tc.Namespace, tc.Name, err)
return err
}

if tc.Spec.Paused {
klog.V(4).Infof("tikv cluster %s/%s is paused, skip syncing for pump statefulset", tc.GetNamespace(), tc.GetName())
return nil
}

cm, err := pmm.syncConfigMap(tc, oldPumpSet)
if err != nil {
return err
Expand All @@ -112,15 +122,14 @@ func (pmm *pumpMemberManager) syncPumpStatefulSetForTidbCluster(tc *v1alpha1.Tid
return pmm.setControl.CreateStatefulSet(tc, newPumpSet)
}

if err := pmm.syncTiDBClusterStatus(tc, oldPumpSet); err != nil {
klog.Errorf("failed to sync TidbCluster: [%s/%s]'s status, error: %v", tc.Namespace, tc.Name, err)
return err
}

return updateStatefulSet(pmm.setControl, tc, newPumpSet, oldPumpSet)
}

func (pmm *pumpMemberManager) syncTiDBClusterStatus(tc *v1alpha1.TidbCluster, set *apps.StatefulSet) error {
if set == nil {
// skip if not created yet
return nil
}

tc.Status.Pump.StatefulSet = &set.Status

Expand All @@ -142,6 +151,10 @@ func (pmm *pumpMemberManager) syncTiDBClusterStatus(tc *v1alpha1.TidbCluster, se
}

func (pmm *pumpMemberManager) syncHeadlessService(tc *v1alpha1.TidbCluster) error {
if tc.Spec.Paused {
klog.V(4).Infof("tikv cluster %s/%s is paused, skip syncing for pump headless service", tc.GetNamespace(), tc.GetName())
return nil
}

newSvc := getNewPumpHeadlessService(tc)
oldSvc, err := pmm.svcLister.Services(newSvc.Namespace).Get(newSvc.Name)
Expand Down
28 changes: 24 additions & 4 deletions pkg/manager/member/tidb_member_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"k8s.io/apimachinery/pkg/util/uuid"
v1 "k8s.io/client-go/listers/apps/v1"
corelisters "k8s.io/client-go/listers/core/v1"
"k8s.io/klog"
"k8s.io/utils/pointer"
)

Expand Down Expand Up @@ -122,6 +123,11 @@ func (tmm *tidbMemberManager) Sync(tc *v1alpha1.TidbCluster) error {
}

func (tmm *tidbMemberManager) syncTiDBHeadlessServiceForTidbCluster(tc *v1alpha1.TidbCluster) error {
if tc.Spec.Paused {
klog.V(4).Infof("tidb cluster %s/%s is paused, skip syncing for tidb headless service", tc.GetNamespace(), tc.GetName())
return nil
}

ns := tc.GetNamespace()
tcName := tc.GetName()

Expand Down Expand Up @@ -169,6 +175,15 @@ func (tmm *tidbMemberManager) syncTiDBStatefulSetForTidbCluster(tc *v1alpha1.Tid
setNotExist := errors.IsNotFound(err)

oldTiDBSet := oldTiDBSetTemp.DeepCopy()
if err = tmm.syncTidbClusterStatus(tc, oldTiDBSet); err != nil {
return err
}

if tc.Spec.Paused {
klog.V(4).Infof("tidb cluster %s/%s is paused, skip syncing for tidb statefulset", tc.GetNamespace(), tc.GetName())
return nil
}

cm, err := tmm.syncTiDBConfigMap(tc, oldTiDBSet)
if err != nil {
return err
Expand All @@ -188,10 +203,6 @@ func (tmm *tidbMemberManager) syncTiDBStatefulSetForTidbCluster(tc *v1alpha1.Tid
return nil
}

if err = tmm.syncTidbClusterStatus(tc, oldTiDBSet); err != nil {
return err
}

if !templateEqual(newTiDBSet, oldTiDBSet) || tc.Status.TiDB.Phase == v1alpha1.UpgradePhase {
if err := tmm.tidbUpgrader.Upgrade(tc, oldTiDBSet, newTiDBSet); err != nil {
return err
Expand All @@ -215,6 +226,10 @@ func (tmm *tidbMemberManager) syncTiDBStatefulSetForTidbCluster(tc *v1alpha1.Tid
}

func (tmm *tidbMemberManager) syncTiDBService(tc *v1alpha1.TidbCluster) error {
if tc.Spec.Paused {
klog.V(4).Infof("tidb cluster %s/%s is paused, skip syncing for tidb service", tc.GetNamespace(), tc.GetName())
return nil
}

newSvc := getNewTiDBServiceOrNil(tc)
// TODO: delete tidb service if user remove the service spec deliberately
Expand Down Expand Up @@ -701,6 +716,11 @@ func getNewTiDBSetForTidbCluster(tc *v1alpha1.TidbCluster, cm *corev1.ConfigMap)
}

func (tmm *tidbMemberManager) syncTidbClusterStatus(tc *v1alpha1.TidbCluster, set *apps.StatefulSet) error {
if set == nil {
// skip if not created yet
return nil
}

tc.Status.TiDB.StatefulSet = &set.Status

upgrading, err := tmm.tidbStatefulSetIsUpgradingFn(tmm.podLister, set, tc)
Expand Down
23 changes: 19 additions & 4 deletions pkg/manager/member/tikv_member_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,11 @@ func (tkmm *tikvMemberManager) Sync(tc *v1alpha1.TidbCluster) error {
}

func (tkmm *tikvMemberManager) syncServiceForTidbCluster(tc *v1alpha1.TidbCluster, svcConfig SvcConfig) error {
if tc.Spec.Paused {
klog.V(4).Infof("tikv cluster %s/%s is paused, skip syncing for tikv service", tc.GetNamespace(), tc.GetName())
return nil
}

ns := tc.GetNamespace()
tcName := tc.GetName()

Expand Down Expand Up @@ -183,6 +188,16 @@ func (tkmm *tikvMemberManager) syncStatefulSetForTidbCluster(tc *v1alpha1.TidbCl
setNotExist := errors.IsNotFound(err)

oldSet := oldSetTmp.DeepCopy()

if err := tkmm.syncTidbClusterStatus(tc, oldSet); err != nil {
return err
}

if tc.Spec.Paused {
klog.V(4).Infof("tikv cluster %s/%s is paused, skip syncing for tikv statefulset", tc.GetNamespace(), tc.GetName())
return nil
}

cm, err := tkmm.syncTiKVConfigMap(tc, oldSet)
if err != nil {
return err
Expand All @@ -205,10 +220,6 @@ func (tkmm *tikvMemberManager) syncStatefulSetForTidbCluster(tc *v1alpha1.TidbCl
return nil
}

if err := tkmm.syncTidbClusterStatus(tc, oldSet); err != nil {
return err
}

if _, err := tkmm.setStoreLabelsForTiKV(tc); err != nil {
return err
}
Expand Down Expand Up @@ -554,6 +565,10 @@ func labelTiKV(tc *v1alpha1.TidbCluster) label.Label {
}

func (tkmm *tikvMemberManager) syncTidbClusterStatus(tc *v1alpha1.TidbCluster, set *apps.StatefulSet) error {
if set == nil {
// skip if not created yet
return nil
}
tc.Status.TiKV.StatefulSet = &set.Status
upgrading, err := tkmm.tikvStatefulSetIsUpgradingFn(tkmm.podLister, tkmm.pdControl, set, tc)
if err != nil {
Expand Down
64 changes: 64 additions & 0 deletions tests/e2e/tidbcluster/tidbcluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (
"github.com/pingcap/tidb-operator/tests/apiserver"
e2econfig "github.com/pingcap/tidb-operator/tests/e2e/config"
utilimage "github.com/pingcap/tidb-operator/tests/e2e/util/image"
utilpod "github.com/pingcap/tidb-operator/tests/e2e/util/pod"
"github.com/pingcap/tidb-operator/tests/e2e/util/portforward"
"github.com/pingcap/tidb-operator/tests/pkg/apimachinery"
"github.com/pingcap/tidb-operator/tests/pkg/blockwriter"
Expand All @@ -46,8 +47,10 @@ import (
v1 "k8s.io/api/core/v1"
apiextensionsclientset "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
"k8s.io/apimachinery/pkg/api/errors"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
utilversion "k8s.io/apimachinery/pkg/util/version"
"k8s.io/apimachinery/pkg/util/wait"
Expand Down Expand Up @@ -863,6 +866,67 @@ var _ = ginkgo.Describe("[tidb-operator] TiDBCluster", func() {
framework.ExpectNoError(err)
})

ginkgo.It("TiDB cluster can be paused and unpaused", func() {
tcName := "paused"
tc := fixture.GetTidbCluster(ns, tcName, utilimage.TiDBV3Version)
tc.Spec.PD.Replicas = 1
tc.Spec.TiKV.Replicas = 1
tc.Spec.TiDB.Replicas = 1
err := genericCli.Create(context.TODO(), tc)
framework.ExpectNoError(err)
err = oa.WaitForTidbClusterReady(tc, 30*time.Minute, 15*time.Second)
framework.ExpectNoError(err)

podListBeforePaused, err := c.CoreV1().Pods(ns).List(metav1.ListOptions{})
framework.ExpectNoError(err)

ginkgo.By("Pause the tidb cluster")
err = controller.GuaranteedUpdate(genericCli, tc, func() error {
tc.Spec.Paused = true
return nil
})
framework.ExpectNoError(err)
ginkgo.By("Make a change")
err = controller.GuaranteedUpdate(genericCli, tc, func() error {
tc.Spec.Version = utilimage.TiDBV3UpgradeVersion
return nil
})
framework.ExpectNoError(err)

ginkgo.By("Check pods are not changed when the tidb cluster is paused")
err = utilpod.WaitForPodsAreChanged(c, podListBeforePaused.Items, time.Minute*5)
framework.ExpectEqual(err, wait.ErrWaitTimeout, "Pods are changed when the tidb cluster is paused")

ginkgo.By("Unpause the tidb cluster")
err = controller.GuaranteedUpdate(genericCli, tc, func() error {
tc.Spec.Paused = false
return nil
})
framework.ExpectNoError(err)

ginkgo.By("Check the tidb cluster will be upgraded now")
listOptions := metav1.ListOptions{
LabelSelector: labels.SelectorFromSet(label.New().Instance(tcName).Component(label.TiKVLabelVal).Labels()).String(),
}
err = wait.PollImmediate(5*time.Second, 15*time.Minute, func() (bool, error) {
podList, err := c.CoreV1().Pods(ns).List(listOptions)
if err != nil && !apierrors.IsNotFound(err) {
return false, err
}
for _, pod := range podList.Items {
for _, c := range pod.Spec.Containers {
if c.Name == v1alpha1.TiKVMemberType.String() {
if c.Image == tc.TiKVImage() {
return true, nil
}
}
}
}
return false, nil
})
framework.ExpectNoError(err)
})

ginkgo.It("tidb-scale: clear TiDB failureMembers when scale TiDB to zero", func() {
cluster := newTidbClusterConfig(e2econfig.TestConfig, ns, "tidb-scale", "admin", "")
cluster.Resources["pd.replicas"] = "3"
Expand Down

0 comments on commit f6937cb

Please sign in to comment.