Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add event in webhook #2305

Merged
merged 17 commits into from
Apr 27, 2020
14 changes: 13 additions & 1 deletion pkg/webhook/admission_hooks.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,19 @@ import (
"time"

"github.com/pingcap/advanced-statefulset/pkg/apis/apps/v1/helper"
"github.com/pingcap/tidb-operator/pkg/apis/pingcap/v1alpha1"
"github.com/pingcap/tidb-operator/pkg/features"
"github.com/pingcap/tidb-operator/pkg/pdapi"
"github.com/pingcap/tidb-operator/pkg/webhook/pod"
"github.com/pingcap/tidb-operator/pkg/webhook/strategy"
"github.com/pingcap/tidb-operator/pkg/webhook/util"
admission "k8s.io/api/admission/v1beta1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/kubernetes"
eventv1 "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/record"
"k8s.io/klog"

asappsv1 "github.com/pingcap/advanced-statefulset/pkg/apis/apps/v1"
Expand Down Expand Up @@ -148,7 +152,15 @@ func (a *AdmissionHook) Initialize(cfg *rest.Config, stopCh <-chan struct{}) err

// init pdControl
pdControl := pdapi.NewDefaultPDControl(kubeCli)
pc := pod.NewPodAdmissionControl(kubeCli, cli, pdControl, strings.Split(a.ExtraServiceAccounts, ","), a.EvictRegionLeaderTimeout)

//init recorder
eventBroadcaster := record.NewBroadcaster()
eventBroadcaster.StartLogging(klog.Infof)
eventBroadcaster.StartRecordingToSink(&eventv1.EventSinkImpl{
Interface: eventv1.New(kubeCli.CoreV1().RESTClient()).Events("")})
recorder := eventBroadcaster.NewRecorder(v1alpha1.Scheme, corev1.EventSource{Component: "tidb-admission-controller"})

pc := pod.NewPodAdmissionControl(kubeCli, cli, pdControl, strings.Split(a.ExtraServiceAccounts, ","), a.EvictRegionLeaderTimeout, recorder)
a.podAC = pc
klog.Info("pod admission webhook initialized successfully")
a.stsAC = statefulset.NewStatefulSetAdmissionControl(cli)
Expand Down
8 changes: 7 additions & 1 deletion pkg/webhook/pod/pd_deleter.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
operatorUtils "github.com/pingcap/tidb-operator/pkg/util"
"github.com/pingcap/tidb-operator/pkg/webhook/util"
admission "k8s.io/api/admission/v1beta1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
meta "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/klog"
Expand Down Expand Up @@ -93,6 +94,10 @@ func (pc *PodAdmissionControl) admitDeletePdPods(payload *admitPayload) *admissi
return pc.transferPDLeader(payload)
}

if isUpgrading {
pc.recorder.Event(payload.tc, corev1.EventTypeNormal, pdUpgradeReason, podDeleteEventMessage(name))
}

klog.Infof("pod[%s/%s] is not pd-leader,admit to delete", namespace, name)
return util.ARSuccess()
}
Expand Down Expand Up @@ -127,6 +132,7 @@ func (pc *PodAdmissionControl) admitDeleteNonPDMemberPod(payload *admitPayload)
pvc, err := pc.kubeCli.CoreV1().PersistentVolumeClaims(namespace).Get(pvcName, meta.GetOptions{})
if err != nil {
if errors.IsNotFound(err) {
pc.recorder.Event(payload.tc, corev1.EventTypeNormal, pdScaleInReason, podDeleteEventMessage(name))
return util.ARSuccess()
}
return util.ARFail(err)
Expand All @@ -136,10 +142,10 @@ func (pc *PodAdmissionControl) admitDeleteNonPDMemberPod(payload *admitPayload)
klog.Infof("tc[%s/%s]'s pod[%s/%s] failed to update pvc,%v", namespace, tcName, namespace, name, err)
return util.ARFail(err)
}
pc.recorder.Event(payload.tc, corev1.EventTypeNormal, pdScaleInReason, podDeleteEventMessage(name))
DanielZhangQD marked this conversation as resolved.
Show resolved Hide resolved
}
klog.Infof("pd pod[%s/%s] is not member of tc[%s/%s],admit to delete", namespace, name, namespace, tcName)
return util.ARSuccess()

}
err = payload.pdClient.DeleteMember(name)
if err != nil {
Expand Down
11 changes: 10 additions & 1 deletion pkg/webhook/pod/pods.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/record"
"k8s.io/klog"
)

Expand All @@ -44,17 +45,24 @@ type PodAdmissionControl struct {
pdControl pdapi.PDControlInterface
// the map of the service account from the request which should be checked by webhook
serviceAccounts sets.String
// recorder to send event
recorder record.EventRecorder
}

const (
stsControllerServiceAccounts = "system:serviceaccount:kube-system:statefulset-controller"
podDeleteMsgPattern = "pod [%s] deleted"
pdScaleInReason = "PDScaleIn"
pdUpgradeReason = "PDUpgrade"
tikvScaleInReason = "TiKVScaleIn"
tikvUpgradeReason = "TiKVUpgrade"
)

var (
AstsControllerServiceAccounts string
)

func NewPodAdmissionControl(kubeCli kubernetes.Interface, operatorCli versioned.Interface, PdControl pdapi.PDControlInterface, extraServiceAccounts []string, evictRegionLeaderTimeout time.Duration) *PodAdmissionControl {
func NewPodAdmissionControl(kubeCli kubernetes.Interface, operatorCli versioned.Interface, PdControl pdapi.PDControlInterface, extraServiceAccounts []string, evictRegionLeaderTimeout time.Duration, recorder record.EventRecorder) *PodAdmissionControl {

serviceAccounts := sets.NewString(stsControllerServiceAccounts)
for _, sa := range extraServiceAccounts {
Expand All @@ -69,6 +77,7 @@ func NewPodAdmissionControl(kubeCli kubernetes.Interface, operatorCli versioned.
operatorCli: operatorCli,
pdControl: PdControl,
serviceAccounts: serviceAccounts,
recorder: recorder,
}
}

Expand Down
6 changes: 4 additions & 2 deletions pkg/webhook/pod/pods_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@ package pod
import (
"testing"

"k8s.io/client-go/kubernetes"

. "github.com/onsi/gomega"
"github.com/pingcap/tidb-operator/pkg/apis/pingcap/v1alpha1"
operatorClifake "github.com/pingcap/tidb-operator/pkg/client/clientset/versioned/fake"
Expand All @@ -28,7 +26,9 @@ import (
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/kubernetes"
kubefake "k8s.io/client-go/kubernetes/fake"
"k8s.io/client-go/tools/record"
"k8s.io/utils/pointer"
)

Expand Down Expand Up @@ -130,10 +130,12 @@ func newAdmissionRequest() *admission.AdmissionRequest {
func newPodAdmissionControl(kubeCli kubernetes.Interface) *PodAdmissionControl {
operatorCli := operatorClifake.NewSimpleClientset()
pdControl := pdapi.NewFakePDControl(kubeCli)
recorder := record.NewFakeRecorder(10)
return &PodAdmissionControl{
kubeCli: kubeCli,
operatorCli: operatorCli,
pdControl: pdControl,
recorder: recorder,
}
}

Expand Down
9 changes: 9 additions & 0 deletions pkg/webhook/pod/tikv_deleter.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
operatorUtils "github.com/pingcap/tidb-operator/pkg/util"
"github.com/pingcap/tidb-operator/pkg/webhook/util"
admission "k8s.io/api/admission/v1beta1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
meta "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/klog"
Expand Down Expand Up @@ -133,6 +134,7 @@ func (pc *PodAdmissionControl) admitDeleteUselessTiKVPod(payload *admitPayload)
pvc, err := pc.kubeCli.CoreV1().PersistentVolumeClaims(namespace).Get(pvcName, meta.GetOptions{})
if err != nil {
if errors.IsNotFound(err) {
pc.recorder.Event(payload.tc, corev1.EventTypeNormal, tikvScaleInReason, podDeleteEventMessage(name))
return util.ARSuccess()
}
return util.ARFail(err)
Expand All @@ -142,6 +144,7 @@ func (pc *PodAdmissionControl) admitDeleteUselessTiKVPod(payload *admitPayload)
klog.Infof("tc[%s/%s]'s tikv pod[%s/%s] failed to delete,%v", namespace, tcName, namespace, name, err)
return util.ARFail(err)
}
pc.recorder.Event(payload.tc, corev1.EventTypeNormal, tikvScaleInReason, podDeleteEventMessage(name))
}

return util.ARSuccess()
Expand Down Expand Up @@ -215,6 +218,7 @@ func (pc *PodAdmissionControl) admitDeleteUpTiKVPodDuringUpgrading(payload *admi
}
}

pc.recorder.Event(payload.tc, corev1.EventTypeNormal, tikvUpgradeReason, podDeleteEventMessage(name))
Yisaer marked this conversation as resolved.
Show resolved Hide resolved
return util.ARSuccess()
}

Expand All @@ -231,5 +235,10 @@ func (pc *PodAdmissionControl) admitDeleteDownTikvPod(payload *admitPayload) *ad
if !isInOrdinal {
return pc.rejectDeleteTiKVPod()
}
name := payload.pod.Name
isUpgrading := operatorUtils.IsStatefulSetUpgrading(payload.ownerStatefulSet)
if isUpgrading {
pc.recorder.Event(payload.tc, corev1.EventTypeNormal, tikvUpgradeReason, podDeleteEventMessage(name))
}
return util.ARSuccess()
}
4 changes: 4 additions & 0 deletions pkg/webhook/pod/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,3 +203,7 @@ func appendExtraLabelsENVForTiKV(labels map[string]string, container *core.Conta
})
}
}

func podDeleteEventMessage(name string) string {
return fmt.Sprintf(podDeleteMsgPattern, name)
}