diff --git a/cmd/controller-manager/main.go b/cmd/controller-manager/main.go index 746026eea4..8144f2d73a 100644 --- a/cmd/controller-manager/main.go +++ b/cmd/controller-manager/main.go @@ -29,6 +29,7 @@ import ( "github.com/pingcap/tidb-operator/pkg/controller/autoscaler" "github.com/pingcap/tidb-operator/pkg/controller/backup" "github.com/pingcap/tidb-operator/pkg/controller/backupschedule" + "github.com/pingcap/tidb-operator/pkg/controller/periodicity" "github.com/pingcap/tidb-operator/pkg/controller/restore" "github.com/pingcap/tidb-operator/pkg/controller/tidbcluster" "github.com/pingcap/tidb-operator/pkg/controller/tidbinitializer" @@ -190,6 +191,12 @@ func main() { bsController := backupschedule.NewController(kubeCli, cli, informerFactory, kubeInformerFactory) tidbInitController := tidbinitializer.NewController(kubeCli, cli, genericCli, informerFactory, kubeInformerFactory) tidbMonitorController := tidbmonitor.NewController(kubeCli, genericCli, informerFactory, kubeInformerFactory) + + var periodicityController *periodicity.Controller + if controller.PodWebhookEnabled { + periodicityController = periodicity.NewController(kubeCli, informerFactory, kubeInformerFactory) + } + var autoScalerController *autoscaler.Controller if features.DefaultFeatureGate.Enabled(features.AutoScaling) { autoScalerController = autoscaler.NewController(kubeCli, cli, informerFactory, kubeInformerFactory) @@ -216,6 +223,9 @@ func main() { go wait.Forever(func() { bsController.Run(workers, ctx.Done()) }, waitDuration) go wait.Forever(func() { tidbInitController.Run(workers, ctx.Done()) }, waitDuration) go wait.Forever(func() { tidbMonitorController.Run(workers, ctx.Done()) }, waitDuration) + if controller.PodWebhookEnabled { + go wait.Forever(func() { periodicityController.Run(ctx.Done()) }, waitDuration) + } if features.DefaultFeatureGate.Enabled(features.AutoScaling) { go wait.Forever(func() { autoScalerController.Run(workers, ctx.Done()) }, waitDuration) } diff --git 
a/pkg/controller/periodicity/periodicity_controller.go b/pkg/controller/periodicity/periodicity_controller.go new file mode 100644 index 0000000000..a42a29383e --- /dev/null +++ b/pkg/controller/periodicity/periodicity_controller.go @@ -0,0 +1,124 @@ +// Copyright 2018 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package periodicity implements the periodicity controller. +// This controller updates StatefulSets managed by our operator periodically. +// This is necessary when the pod admission webhook is used, because we +// deny pod deletion requests if the pod is not ready for deletion. However, +// retry duration on StatefulSet in its controller grows exponentially on +// failures. So we need to update StatefulSets to trigger events, then they +// will be put into the process queue of StatefulSet controller constantly. +// Refer to https://github.com/pingcap/tidb-operator/pull/1875 and +// https://github.com/pingcap/tidb-operator/issues/1846 for more details. 
+package periodicity
+
+import (
+	"time"
+
+	"github.com/pingcap/tidb-operator/pkg/apis/pingcap/v1alpha1"
+	informers "github.com/pingcap/tidb-operator/pkg/client/informers/externalversions"
+	v1alpha1listers "github.com/pingcap/tidb-operator/pkg/client/listers/pingcap/v1alpha1"
+	"github.com/pingcap/tidb-operator/pkg/controller"
+	"github.com/pingcap/tidb-operator/pkg/label"
+	corev1 "k8s.io/api/core/v1"
+	"k8s.io/apimachinery/pkg/util/errors"
+	"k8s.io/apimachinery/pkg/util/wait"
+	kubeinformers "k8s.io/client-go/informers"
+	"k8s.io/client-go/kubernetes"
+	eventv1 "k8s.io/client-go/kubernetes/typed/core/v1"
+	appslisters "k8s.io/client-go/listers/apps/v1"
+	"k8s.io/client-go/tools/record"
+	"k8s.io/klog"
+)
+
+// syncInterval is the pause between two consecutive sync rounds.
+const syncInterval = time.Minute
+
+// Controller periodically stamps StatefulSets with a sync-timestamp
+// annotation. It reads StatefulSets and TidbClusters through listers and
+// writes the updated StatefulSets through statefulSetControl.
+type Controller struct {
+	stsLister          appslisters.StatefulSetLister
+	tcLister           v1alpha1listers.TidbClusterLister
+	statefulSetControl controller.StatefulSetControlInterface
+}
+
+// NewController wires a Controller from the given client and informer
+// factories. It also sets up an event broadcaster that logs through klog and
+// records events through kubeCli; the resulting recorder is handed to the
+// StatefulSet control.
+func NewController(
+	kubeCli kubernetes.Interface,
+	informerFactory informers.SharedInformerFactory,
+	kubeInformerFactory kubeinformers.SharedInformerFactory) *Controller {
+
+	eventBroadcaster := record.NewBroadcaster()
+	eventBroadcaster.StartLogging(klog.Infof)
+	eventBroadcaster.StartRecordingToSink(&eventv1.EventSinkImpl{
+		Interface: eventv1.New(kubeCli.CoreV1().RESTClient()).Events("")})
+	recorder := eventBroadcaster.NewRecorder(v1alpha1.Scheme, corev1.EventSource{Component: "periodicity-controller"})
+	stsLister := kubeInformerFactory.Apps().V1().StatefulSets().Lister()
+
+	return &Controller{
+		tcLister:           informerFactory.Pingcap().V1alpha1().TidbClusters().Lister(),
+		statefulSetControl: controller.NewRealStatefuSetControl(kubeCli, stsLister, recorder),
+		stsLister:          stsLister,
+	}
+}
+
+// Run executes one sync round every syncInterval until stopCh is closed.
+func (c *Controller) Run(stopCh <-chan struct{}) {
+	klog.Infof("Starting periodicity controller")
+	defer klog.Infof("Shutting down periodicity controller")
+	wait.Until(c.run, syncInterval, stopCh)
+}
+
+// run performs a single sync round and logs the aggregated errors, if any.
+// Errors are only logged, never returned, so one failed round does not stop
+// the periodic loop.
+func (c *Controller) run() {
+	var errs []error
+	if err := c.syncStatefulSetTimeStamp(); err != nil {
+		errs = append(errs, err)
+	}
+	if len(errs) > 0 {
+		klog.Errorf("error happened in periodicity controller,err:%v", errors.NewAggregate(errs))
+	}
+}
+
+// syncStatefulSetTimeStamp writes the current time (RFC3339) into the
+// label.AnnStsLastSyncTimestamp annotation of every StatefulSet that matches
+// the selector built by label.New() and carries label.InstanceLabelKey.
+// A failure on one StatefulSet is collected and the loop moves on to the next
+// one; all collected errors are returned as a single aggregate (nil if none).
+func (c *Controller) syncStatefulSetTimeStamp() error {
+	selector, err := label.New().Selector()
+	if err != nil {
+		return err
+	}
+	stsList, err := c.stsLister.List(selector)
+	if err != nil {
+		return err
+	}
+	var errs []error
+	for _, cached := range stsList {
+		// Reading from a nil label map is safe, so no initialisation is needed.
+		tcName, ok := cached.Labels[label.InstanceLabelKey]
+		if !ok {
+			continue
+		}
+		tc, err := c.tcLister.TidbClusters(cached.Namespace).Get(tcName)
+		if err != nil {
+			errs = append(errs, err)
+			continue
+		}
+		// Objects returned by a lister belong to the shared informer cache
+		// and must not be modified in place; work on a private copy.
+		sts := cached.DeepCopy()
+		if sts.Annotations == nil {
+			sts.Annotations = map[string]string{}
+		}
+		sts.Annotations[label.AnnStsLastSyncTimestamp] = time.Now().Format(time.RFC3339)
+		if _, err := c.statefulSetControl.UpdateStatefulSet(tc, sts); err != nil {
+			klog.Errorf("failed to update statefulset %q, error: %v", sts.Name, err)
+			errs = append(errs, err)
+			// Skip the success log; the returned StatefulSet is not usable
+			// when the update failed.
+			continue
+		}
+		klog.Infof("successfully updated statefulset %q", sts.Name)
+	}
+	return errors.NewAggregate(errs)
+}
diff --git a/pkg/label/label.go b/pkg/label/label.go
index b67fa2339f..4917eb3658 100644
--- a/pkg/label/label.go
+++ b/pkg/label/label.go
@@ -84,6 +84,8 @@ const (
 	AnnEvictLeaderBeginTime = "tidb.pingcap.com/evictLeaderBeginTime"
 	// AnnPodDeferDeleting is pod annotation key to indicate the pod which need to be restarted
 	AnnPodDeferDeleting = "tidb.pingcap.com/pod-defer-deleting"
+	// AnnStsLastSyncTimestamp is sts annotation key to indicate the last timestamp the operator sync the sts
+	AnnStsLastSyncTimestamp = "tidb.pingcap.com/sync-timestamp"
 	// AnnForceUpgradeVal is tc annotation value to indicate
whether force upgrade should be done AnnForceUpgradeVal = "true" diff --git a/pkg/manager/member/utils.go b/pkg/manager/member/utils.go index 2c83627d0e..26d23490f8 100644 --- a/pkg/manager/member/utils.go +++ b/pkg/manager/member/utils.go @@ -18,7 +18,6 @@ import ( "crypto/sha256" "encoding/json" "fmt" - "github.com/BurntSushi/toml" "github.com/pingcap/advanced-statefulset/pkg/apis/apps/v1/helper" "github.com/pingcap/tidb-operator/pkg/apis/pingcap/v1alpha1" @@ -281,6 +280,10 @@ func updateStatefulSet(setCtl controller.StatefulSetControlInterface, tc *v1alph set.Spec.Template.Annotations[LastAppliedConfigAnnotation] = podConfig } set.Annotations = newSet.Annotations + v, ok := oldSet.Annotations[label.AnnStsLastSyncTimestamp] + if ok { + set.Annotations[label.AnnStsLastSyncTimestamp] = v + } *set.Spec.Replicas = *newSet.Spec.Replicas set.Spec.UpdateStrategy = newSet.Spec.UpdateStrategy if isOrphan {