Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add timestamp annotation in tidbcluster statefulset #1875

Merged
merged 15 commits into from
Mar 9, 2020
26 changes: 16 additions & 10 deletions cmd/controller-manager/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package main
import (
"context"
"flag"
"github.com/pingcap/tidb-operator/pkg/controller/periodicity"
"net/http"
_ "net/http/pprof"
"os"
Expand Down Expand Up @@ -51,16 +52,17 @@ import (
)

var (
printVersion bool
workers int
autoFailover bool
pdFailoverPeriod time.Duration
tikvFailoverPeriod time.Duration
tidbFailoverPeriod time.Duration
leaseDuration = 15 * time.Second
renewDuration = 5 * time.Second
retryPeriod = 3 * time.Second
waitDuration = 5 * time.Second
printVersion bool
workers int
autoFailover bool
pdFailoverPeriod time.Duration
tikvFailoverPeriod time.Duration
tidbFailoverPeriod time.Duration
leaseDuration = 15 * time.Second
renewDuration = 5 * time.Second
retryPeriod = 3 * time.Second
waitDuration = 5 * time.Second
periodicityDuration = 1 * time.Minute
)

func init() {
Expand All @@ -78,6 +80,7 @@ func init() {
// TODO: actually we just want to use the same image with tidb-controller-manager, but DownwardAPI cannot get image ID, see if there is any better solution
flag.StringVar(&controller.TidbDiscoveryImage, "tidb-discovery-image", "pingcap/tidb-operator:latest", "The image of the tidb discovery service")
flag.BoolVar(&controller.PodWebhookEnabled, "pod-webhook-enabled", false, "Whether Pod admission webhook is enabled")
flag.DurationVar(&periodicityDuration, "periodicity-duration", 1*time.Minute, "the interval duration between each 2 syncing for periodicity controller")
Yisaer marked this conversation as resolved.
Show resolved Hide resolved
features.DefaultFeatureGate.AddFlag(flag.CommandLine)

flag.Parse()
Expand Down Expand Up @@ -190,6 +193,8 @@ func main() {
bsController := backupschedule.NewController(kubeCli, cli, informerFactory, kubeInformerFactory)
tidbInitController := tidbinitializer.NewController(kubeCli, cli, genericCli, informerFactory, kubeInformerFactory)
tidbMonitorController := tidbmonitor.NewController(kubeCli, genericCli, informerFactory, kubeInformerFactory)
periodicityController := periodicity.NewController(kubeCli, informerFactory, kubeInformerFactory)

var autoScalerController *autoscaler.Controller
if features.DefaultFeatureGate.Enabled(features.AutoScaling) {
autoScalerController = autoscaler.NewController(kubeCli, cli, informerFactory, kubeInformerFactory)
Expand All @@ -216,6 +221,7 @@ func main() {
go wait.Forever(func() { bsController.Run(workers, ctx.Done()) }, waitDuration)
go wait.Forever(func() { tidbInitController.Run(workers, ctx.Done()) }, waitDuration)
go wait.Forever(func() { tidbMonitorController.Run(workers, ctx.Done()) }, waitDuration)
go wait.Forever(func() { periodicityController.Run() }, periodicityDuration)
Yisaer marked this conversation as resolved.
Show resolved Hide resolved
if features.DefaultFeatureGate.Enabled(features.AutoScaling) {
go wait.Forever(func() { autoScalerController.Run(workers, ctx.Done()) }, waitDuration)
}
Expand Down
96 changes: 96 additions & 0 deletions pkg/controller/periodicity/periodicity_controller.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
package periodicity

import (
"time"

"github.com/pingcap/tidb-operator/pkg/apis/pingcap/v1alpha1"
informers "github.com/pingcap/tidb-operator/pkg/client/informers/externalversions"
v1alpha1listers "github.com/pingcap/tidb-operator/pkg/client/listers/pingcap/v1alpha1"
"github.com/pingcap/tidb-operator/pkg/controller"
"github.com/pingcap/tidb-operator/pkg/label"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/errors"
kubeinformers "k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
eventv1 "k8s.io/client-go/kubernetes/typed/core/v1"
appslisters "k8s.io/client-go/listers/apps/v1"
"k8s.io/client-go/tools/record"
"k8s.io/klog"
)

type Controller struct {
stsLister appslisters.StatefulSetLister
tcLister v1alpha1listers.TidbClusterLister
statefulSetControl controller.StatefulSetControlInterface
}

func NewController(
kubeCli kubernetes.Interface,
informerFactory informers.SharedInformerFactory,
kubeInformerFactory kubeinformers.SharedInformerFactory) *Controller {

eventBroadcaster := record.NewBroadcaster()
eventBroadcaster.StartLogging(klog.Infof)
eventBroadcaster.StartRecordingToSink(&eventv1.EventSinkImpl{
Interface: eventv1.New(kubeCli.CoreV1().RESTClient()).Events("")})
recorder := eventBroadcaster.NewRecorder(v1alpha1.Scheme, corev1.EventSource{Component: "periodiciy-controller"})
stsLister := kubeInformerFactory.Apps().V1().StatefulSets().Lister()

return &Controller{
tcLister: informerFactory.Pingcap().V1alpha1().TidbClusters().Lister(),
statefulSetControl: controller.NewRealStatefuSetControl(kubeCli, stsLister, recorder),
stsLister: stsLister,
}

}

func (c *Controller) Run() {
klog.Infof("Start to running periodicity job")
var errs []error
if controller.PodWebhookEnabled {
if err := c.syncStatefulSetTimeStamp(); err != nil {
errs = append(errs, err)
}
}
klog.Errorf("error happened in periodicity controller,err:%v", errors.NewAggregate(errs))
Yisaer marked this conversation as resolved.
Show resolved Hide resolved
}

// refer: https://github.com/pingcap/tidb-operator/pull/1875
func (c *Controller) syncStatefulSetTimeStamp() error {
Yisaer marked this conversation as resolved.
Show resolved Hide resolved
selector, err := label.New().Selector()
if err != nil {
return err
}
stsList, err := c.stsLister.List(selector)
if err != nil {
return err
}
var errs []error
for _, sts := range stsList {
// If there is any error during our sts annotation updating, we just collect the error
// and continue to next sts
if sts.Annotations == nil {
sts.Annotations = map[string]string{}
}
if sts.Labels == nil {
sts.Labels = map[string]string{}
}
tcName, ok := sts.Labels[label.InstanceLabelKey]
if !ok {
continue
}
tc, err := c.tcLister.TidbClusters(sts.Namespace).Get(tcName)
if err != nil {
errs = append(errs, err)
continue
}
sts.Annotations[label.AnnStsLastSyncTimestamp] = time.Now().Format(time.RFC3339)
newSts, err := c.statefulSetControl.UpdateStatefulSet(tc, sts)
if err != nil {
klog.Errorf("update error sts[%s],err:%v", sts.Name, err)
Yisaer marked this conversation as resolved.
Show resolved Hide resolved
errs = append(errs, err)
}
klog.Infof("newSts[%s], annotation value=%v", newSts.Name, newSts.Annotations)
Yisaer marked this conversation as resolved.
Show resolved Hide resolved
}
return errors.NewAggregate(errs)
}
2 changes: 2 additions & 0 deletions pkg/label/label.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,8 @@ const (
AnnEvictLeaderBeginTime = "tidb.pingcap.com/evictLeaderBeginTime"
// AnnPodDeferDeleting is pod annotation key to indicate the pod which need to be restarted
AnnPodDeferDeleting = "tidb.pingcap.com/pod-defer-deleting"
// AnnStsSyncTimestamp is sts annotation key to indicate the last timestamp the operator sync the sts
AnnStsLastSyncTimestamp = "tidb.pingcap.com/sync-timestamp"

// AnnForceUpgradeVal is tc annotation value to indicate whether force upgrade should be done
AnnForceUpgradeVal = "true"
Expand Down
2 changes: 2 additions & 0 deletions pkg/manager/member/tikv_member_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,7 @@ func (tkmm *tikvMemberManager) syncStatefulSetForTidbCluster(tc *v1alpha1.TidbCl
if err != nil {
return err
}

Yisaer marked this conversation as resolved.
Show resolved Hide resolved
if setNotExist {
err = SetStatefulSetLastAppliedConfigAnnotation(newSet)
if err != nil {
Expand Down Expand Up @@ -510,6 +511,7 @@ func getNewTiKVSetForTidbCluster(tc *v1alpha1.TidbCluster, cm *corev1.ConfigMap)
},
},
}

Yisaer marked this conversation as resolved.
Show resolved Hide resolved
return tikvset, nil
}

Expand Down
17 changes: 16 additions & 1 deletion pkg/manager/member/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
"crypto/sha256"
"encoding/json"
"fmt"

"github.com/BurntSushi/toml"
"github.com/pingcap/advanced-statefulset/pkg/apis/apps/v1/helper"
"github.com/pingcap/tidb-operator/pkg/apis/pingcap/v1alpha1"
Expand Down Expand Up @@ -291,9 +290,25 @@ func updateStatefulSet(setCtl controller.StatefulSetControlInterface, tc *v1alph
if err != nil {
return err
}
v, ok := oldSet.Annotations[label.AnnStsLastSyncTimestamp]
if ok {
newSet.Annotations[label.AnnStsLastSyncTimestamp] = v
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is not necessary

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we remove it, the AnnStsLastSyncTimestamp annotation would be cleaned after each tidbcluster syncing. Though it would not affect adding sts to the sts controller, it would be difficult for us to observe whether the dedicate controller is working when we enable the pod admission webhook.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok, can you move these lines after set.Annotations = newSet.Annotations, and set set.Annotations instead of newSet.Annotations?

set.Annotations and newSet.Annotations are the same map, but we are going to call UpdateStatefulSet on set. it's clear.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

updated.

_, err = setCtl.UpdateStatefulSet(tc, &set)
return err
}

return nil
}

func SetLastAppliedPodTemplateAnn(sts *apps.StatefulSet) error {
Yisaer marked this conversation as resolved.
Show resolved Hide resolved
if sts.Annotations == nil {
sts.Annotations = map[string]string{}
}
b, err := json.Marshal(sts.Spec.Template.Spec)
if err != nil {
return err
}
sts.Annotations[controller.LastAppliedPodTemplate] = string(b)
return nil
}