Skip to content
This repository has been archived by the owner on Dec 15, 2021. It is now read-only.

Minor refactor for cronjobtrigger. Add some tests #640

Merged
merged 5 commits into from
Mar 26, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion kubeless-rbac.jsonnet
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ local controller_roles = [
{
apiGroups: ["kubeless.io"],
resources: ["functions", "kafkatriggers", "httptriggers", "cronjobtriggers"],
verbs: ["get", "list", "watch", "update"],
verbs: ["get", "list", "watch", "update", "delete"],
},
{
apiGroups: ["batch"],
Expand Down
76 changes: 27 additions & 49 deletions pkg/controller/cronjob_trigger_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,13 +203,10 @@ func (c *CronJobTriggerController) syncCronJobTrigger(key string) error {
}

// CronJob Trigger object should be deleted, so remove associated cronjob and remove the finalizer
_, err := c.clientset.BatchV2alpha1().CronJobs(ns).Get(fmt.Sprintf("trigger-%s", name), metav1.GetOptions{})
if err == nil {
err = c.clientset.BatchV2alpha1().CronJobs(ns).Delete(fmt.Sprintf("trigger-%s", name), &metav1.DeleteOptions{})
if err != nil && !k8sErrors.IsNotFound(err) {
c.logger.Errorf("Failed to remove CronJob created for CronJobTrigger Obj: %s due to: %v: ", key, err)
return err
}
err = c.clientset.BatchV1beta1().CronJobs(ns).Delete(name, &metav1.DeleteOptions{})
if err != nil && !k8sErrors.IsNotFound(err) {
c.logger.Errorf("Failed to remove CronJob created for CronJobTrigger Obj: %s due to: %v: ", key, err)
return err
}

// remove finalizer from the cronjob trigger object, so that we dont have to process any further and object can be deleted
Expand Down Expand Up @@ -237,18 +234,12 @@ func (c *CronJobTriggerController) syncCronJobTrigger(key string) error {
return err
}

restIface := c.clientset.BatchV2alpha1().RESTClient()
groupVersion, err := c.getResouceGroupVersion("cronjobs")
if err != nil {
return err
}

functionObj, err := c.functionInformer.Lister().Functions(ns).Get(cronJobtriggerObj.Spec.FunctionName)
if err != nil {
c.logger.Errorf("Unable to find the function %s in the namespace %s. Received %s: ", cronJobtriggerObj.Spec.FunctionName, ns, err)
return err
}
err = utils.EnsureCronJob(restIface, functionObj, cronJobtriggerObj, or, groupVersion)
err = utils.EnsureCronJob(c.clientset, functionObj, cronJobtriggerObj.Spec.Schedule, or)
if err != nil {
return err
}
Expand All @@ -257,53 +248,41 @@ func (c *CronJobTriggerController) syncCronJobTrigger(key string) error {
return nil
}

func (c *CronJobTriggerController) getResouceGroupVersion(target string) (string, error) {
resources, err := c.clientset.Discovery().ServerResources()
if err != nil {
return "", err
}
groupVersion := ""
for _, resource := range resources {
for _, apiResource := range resource.APIResources {
if apiResource.Name == target {
groupVersion = resource.GroupVersion
break
}
}
}
if groupVersion == "" {
return "", fmt.Errorf("Resource %s not found in any group", target)
}
return groupVersion, nil
}

func (c *CronJobTriggerController) functionAddedDeletedUpdated(obj interface{}, deleted bool) {
func (c *CronJobTriggerController) functionAddedDeletedUpdated(obj interface{}, deleted bool) error {
functionObj, ok := obj.(*kubelessApi.Function)
if !ok {
tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
if !ok {
c.logger.Errorf("Couldn't get object from tombstone %#v", obj)
return
err := fmt.Errorf("Couldn't get object from tombstone %#v", obj)
c.logger.Errorf(err.Error())
return err
}
functionObj, ok = tombstone.Obj.(*kubelessApi.Function)
if !ok {
c.logger.Errorf("Tombstone contained object that is not a Pod %#v", obj)
return
err := fmt.Errorf("Tombstone contained object that is not a Pod %#v", obj)
c.logger.Errorf(err.Error())
return err
}
}

c.logger.Infof("Processing update to function object %s Namespace: %s", functionObj.Name, functionObj.Namespace)
if deleted {
//check if func is scheduled or not
cronJobName := fmt.Sprintf("trigger-%s", functionObj.ObjectMeta.Name)
_, err := c.clientset.BatchV2alpha1().CronJobs(functionObj.ObjectMeta.Namespace).Get(cronJobName, metav1.GetOptions{})
if err == nil {
err = c.clientset.BatchV2alpha1().CronJobs(functionObj.ObjectMeta.Namespace).Delete(cronJobName, &metav1.DeleteOptions{})
if err != nil && !k8sErrors.IsNotFound(err) {
c.logger.Errorf("Failed to delete cronjob %s created for the function %s in namespace %s, Error: %s", cronJobName, functionObj.ObjectMeta.Name, functionObj.ObjectMeta.Namespace, err)
c.logger.Infof("Function %s deleted. Removing associated cronjob trigger", functionObj.Name)
cjtList, err := c.kubelessclient.KubelessV1beta1().CronJobTriggers(functionObj.Namespace).List(metav1.ListOptions{})
if err != nil {
return err
}
for _, cjt := range cjtList.Items {
if cjt.Spec.FunctionName == functionObj.Name {
err = c.kubelessclient.KubelessV1beta1().CronJobTriggers(functionObj.Namespace).Delete(cjt.Name, &metav1.DeleteOptions{})
if err != nil && !k8sErrors.IsNotFound(err) {
c.logger.Errorf("Failed to delete cronjobtrigger created for the function %s in namespace %s, Error: %s", functionObj.ObjectMeta.Name, functionObj.ObjectMeta.Namespace, err)
return err
}
}
}
}
return nil
}

func (c *CronJobTriggerController) cronJobTriggerObjHasFinalizer(triggerObj *kubelessApi.CronJobTrigger) bool {
Expand Down Expand Up @@ -351,9 +330,8 @@ func cronJobTriggerObjChanged(oldObj, newObj *kubelessApi.CronJobTrigger) bool {
if oldObj.ResourceVersion != newObj.ResourceVersion {
return true
}
newSpec := &newObj.Spec
oldSpec := &oldObj.Spec

newSpec := newObj.Spec
oldSpec := oldObj.Spec
if newSpec.Schedule != oldSpec.Schedule {
return true
}
Expand Down
150 changes: 150 additions & 0 deletions pkg/controller/cronjob_trigger_controller_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
package controller

import (
"testing"
"time"

kubelessApi "github.com/kubeless/kubeless/pkg/apis/kubeless/v1beta1"
kubelessFake "github.com/kubeless/kubeless/pkg/client/clientset/versioned/fake"
"github.com/sirupsen/logrus"
batchv1beta1 "k8s.io/api/batch/v1beta1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes/fake"
)

func TestFunctionAddedUpdated(t *testing.T) {
myNsFoo := metav1.ObjectMeta{
Namespace: "myns",
Name: "foo",
}

f := kubelessApi.Function{
ObjectMeta: myNsFoo,
}

cjtrigger := kubelessApi.CronJobTrigger{
ObjectMeta: myNsFoo,
}

triggerClientset := kubelessFake.NewSimpleClientset(&f, &cjtrigger)

cronjob := batchv1beta1.CronJob{
ObjectMeta: myNsFoo,
}
clientset := fake.NewSimpleClientset(&cronjob)

controller := CronJobTriggerController{
clientset: clientset,
kubelessclient: triggerClientset,
logger: logrus.WithField("controller", "cronjob-trigger-controller"),
}

// no-op for when the function is not deleted
err := controller.functionAddedDeletedUpdated(&f, false)
if err != nil {
t.Errorf("Unexpected error: %v", err)
}

list, err := controller.kubelessclient.KubelessV1beta1().CronJobTriggers("myns").List(metav1.ListOptions{})
if err != nil {
t.Errorf("Unexpected error: %v", err)
}
if len(list.Items) != 1 || list.Items[0].ObjectMeta.Name != "foo" {
t.Errorf("Missing trigger in list: %v", list.Items)
}
}

func TestFunctionDeleted(t *testing.T) {
myNsFoo := metav1.ObjectMeta{
Namespace: "myns",
Name: "foo",
}

f := kubelessApi.Function{
ObjectMeta: myNsFoo,
}

cjtrigger := kubelessApi.CronJobTrigger{
ObjectMeta: metav1.ObjectMeta{
Namespace: "myns",
Name: "foo-trigger",
},
Spec: kubelessApi.CronJobTriggerSpec{
FunctionName: "foo",
},
}

triggerClientset := kubelessFake.NewSimpleClientset(&f, &cjtrigger)

cronjob := batchv1beta1.CronJob{
ObjectMeta: myNsFoo,
}
clientset := fake.NewSimpleClientset(&cronjob)

controller := CronJobTriggerController{
clientset: clientset,
kubelessclient: triggerClientset,
logger: logrus.WithField("controller", "cronjob-trigger-controller"),
}

// no-op for when the function is not deleted
err := controller.functionAddedDeletedUpdated(&f, true)
if err != nil {
t.Errorf("Unexpected error: %v", err)
}

list, err := controller.kubelessclient.KubelessV1beta1().CronJobTriggers("myns").List(metav1.ListOptions{})
if err != nil {
t.Errorf("Unexpected error: %v", err)
}
if len(list.Items) != 0 {
t.Errorf("Trigger should be deleted from list: %v", list.Items)
}
}

func TestCronJobTriggerObjChanged(t *testing.T) {
type testObj struct {
old *kubelessApi.CronJobTrigger
new *kubelessApi.CronJobTrigger
expectedChanged bool
}
t1 := metav1.Time{
Time: time.Now(),
}
t2 := metav1.Time{
Time: time.Now(),
}
testObjs := []testObj{
{
old: &kubelessApi.CronJobTrigger{ObjectMeta: metav1.ObjectMeta{Name: "foo"}},
new: &kubelessApi.CronJobTrigger{ObjectMeta: metav1.ObjectMeta{Name: "foo"}},
expectedChanged: false,
},
{
old: &kubelessApi.CronJobTrigger{ObjectMeta: metav1.ObjectMeta{DeletionTimestamp: &t1}},
new: &kubelessApi.CronJobTrigger{ObjectMeta: metav1.ObjectMeta{DeletionTimestamp: &t2}},
expectedChanged: true,
},
{
old: &kubelessApi.CronJobTrigger{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "1"}},
new: &kubelessApi.CronJobTrigger{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "2"}},
expectedChanged: true,
},
{
old: &kubelessApi.CronJobTrigger{Spec: kubelessApi.CronJobTriggerSpec{Schedule: "* * * * *"}},
new: &kubelessApi.CronJobTrigger{Spec: kubelessApi.CronJobTriggerSpec{Schedule: "* * * * *"}},
expectedChanged: false,
},
{
old: &kubelessApi.CronJobTrigger{Spec: kubelessApi.CronJobTriggerSpec{Schedule: "*/10 * * * *"}},
new: &kubelessApi.CronJobTrigger{Spec: kubelessApi.CronJobTriggerSpec{Schedule: "* * * * *"}},
expectedChanged: true,
},
}
for _, to := range testObjs {
changed := cronJobTriggerObjChanged(to.old, to.new)
if changed != to.expectedChanged {
t.Errorf("%v != %v expected to be %v", to.old, to.new, to.expectedChanged)
}
}
}
20 changes: 10 additions & 10 deletions pkg/utils/k8sutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ import (

"k8s.io/api/autoscaling/v2beta1"
batchv1 "k8s.io/api/batch/v1"
batchv2alpha1 "k8s.io/api/batch/v2alpha1"
batchv1beta1 "k8s.io/api/batch/v1beta1"
"k8s.io/api/core/v1"
"k8s.io/api/extensions/v1beta1"
clientsetAPIExtensions "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
Expand Down Expand Up @@ -960,7 +960,7 @@ func doRESTReq(restIface rest.Interface, groupVersion, verb, resource, elem, nam
}

// EnsureCronJob creates/updates a function cron job
func EnsureCronJob(client rest.Interface, funcObj *kubelessApi.Function, cronJobObj *kubelessApi.CronJobTrigger, or []metav1.OwnerReference, groupVersion string) error {
func EnsureCronJob(client kubernetes.Interface, funcObj *kubelessApi.Function, schedule string, or []metav1.OwnerReference) error {
var maxSucccessfulHist, maxFailedHist int32
maxSucccessfulHist = 3
maxFailedHist = 1
Expand All @@ -986,18 +986,18 @@ func EnsureCronJob(client rest.Interface, funcObj *kubelessApi.Function, cronJob
headersString = headersString + " -H \"event-time: " + timestamp.String() + "\""
headersString = headersString + " -H \"event-type: application/json\""
headersString = headersString + " -H \"event-namespace: cronjobtrigger.kubeless.io\""
job := &batchv2alpha1.CronJob{
job := &batchv1beta1.CronJob{
ObjectMeta: metav1.ObjectMeta{
Name: jobName,
Namespace: funcObj.ObjectMeta.Namespace,
Labels: funcObj.ObjectMeta.Labels,
OwnerReferences: or,
},
Spec: batchv2alpha1.CronJobSpec{
Schedule: cronJobObj.Spec.Schedule,
Spec: batchv1beta1.CronJobSpec{
Schedule: schedule,
SuccessfulJobsHistoryLimit: &maxSucccessfulHist,
FailedJobsHistoryLimit: &maxFailedHist,
JobTemplate: batchv2alpha1.JobTemplateSpec{
JobTemplate: batchv1beta1.JobTemplateSpec{
Spec: batchv1.JobSpec{
ActiveDeadlineSeconds: &activeDeadlineSeconds,
Template: v1.PodTemplateSpec{
Expand All @@ -1019,17 +1019,17 @@ func EnsureCronJob(client rest.Interface, funcObj *kubelessApi.Function, cronJob

// We need to use directly the REST API since the endpoint
// for CronJobs changes from Kubernetes 1.8
err = doRESTReq(client, groupVersion, "create", "cronjobs", jobName, funcObj.ObjectMeta.Namespace, job, nil)
_, err = client.BatchV1beta1().CronJobs(funcObj.ObjectMeta.Namespace).Create(job)
if err != nil && k8sErrors.IsAlreadyExists(err) {
newCronJob := batchv2alpha1.CronJob{}
err = doRESTReq(client, groupVersion, "get", "cronjobs", jobName, funcObj.ObjectMeta.Namespace, nil, &newCronJob)
newCronJob := &batchv1beta1.CronJob{}
newCronJob, err = client.BatchV1beta1().CronJobs(funcObj.ObjectMeta.Namespace).Get(jobName, metav1.GetOptions{})
if err != nil {
return err
}
newCronJob.ObjectMeta.Labels = funcObj.ObjectMeta.Labels
newCronJob.ObjectMeta.OwnerReferences = or
newCronJob.Spec = job.Spec
err = doRESTReq(client, groupVersion, "update", "cronjobs", jobName, funcObj.ObjectMeta.Namespace, &newCronJob, nil)
_, err = client.BatchV1beta1().CronJobs(funcObj.ObjectMeta.Namespace).Update(newCronJob)
}
return err
}
Expand Down
Loading