Skip to content

Commit

Permalink
Add optional flag to use v1beta1 CronJob (pre 1.21) (#6030)
Browse files Browse the repository at this point in the history
<!--
Before you open the request please review the following guidelines and
tips to help it be more easily integrated:

 - Describe the scope of your change - i.e. what the change does.
 - Describe any known limitations with your change.
- Please run any tests or examples that can exercise your modified code.

 Thank you for contributing!
 -->

### Description of the change

This PR adds an optional boolean flag that for the apprepository
controller, `--v1-beta1-cron-jobs`, which can be set to ensure that the
app repository controller creates and watches the older v1beta1
cronjobs.

Note that, although enabling this functionality to help with #6021 , we
won't be removing the restriction in the Chart.yaml for > k8s 1.21, so
to install the chart on a cluster prior to 1.21 you'll need to manually
comment out that line in the Chart.yaml (this just ensures we don't go
back and start supporting older k8s versions officially).

To use the flag, you can add the following to your values.yaml:

```yaml
apprepository:
  extraFlags:
    - "--v1-beta1-cron-jobs"
```
<!-- Describe the scope of your change - i.e. what the change does. -->

### Benefits

<!-- What benefits will be realized by the code change? -->
People who need to can choose to install Kubeapps explicitly on 1.20 .

### Possible drawbacks

<!-- Describe any known limitations with your change -->

### Applicable issues

<!-- Enter any applicable Issues here (You can reference an issue using
#) -->

- fixes #6021 

### Additional information

<!-- If there's anything else that's important and relevant to your pull
request, mention that information here.-->

@mecampbellsoup Can you please test the resulting image on your 1.20 dev
cluster and works as you expect (I tested locally with a 1.20 kind
cluster and see app repos being synced etc.)

---------

Signed-off-by: Michael Nelson <minelson@vmware.com>
  • Loading branch information
absoludity authored Mar 1, 2023
1 parent 0652e62 commit 07d413d
Show file tree
Hide file tree
Showing 4 changed files with 128 additions and 40 deletions.
1 change: 1 addition & 0 deletions cmd/apprepository-controller/cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ func setFlags(c *cobra.Command) {
c.Flags().StringVar(&serveOpts.TTLSecondsAfterFinished, "ttl-lifetime-afterfinished-job", "3600", "Lifetime limit after which the resource Jobs are deleted expressed in seconds by default is 3600 (1h) ")
c.Flags().StringSliceVar(&serveOpts.CustomAnnotations, "custom-annotations", []string{""}, "optional annotations to be passed to the generated CronJobs, Jobs and Pods objects. For example: my/annotation=foo")
c.Flags().StringSliceVar(&serveOpts.CustomLabels, "custom-labels", []string{""}, "optional labels to be passed to the generated CronJobs, Jobs and Pods objects. For example: my/label=foo")
c.Flags().BoolVar(&serveOpts.V1Beta1CronJobs, "v1-beta1-cron-jobs", false, "Defaults to false and so using the v1 cronjobs.")
}

// initConfig reads in config file and ENV variables if set.
Expand Down
151 changes: 111 additions & 40 deletions cmd/apprepository-controller/server/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
listers "github.com/vmware-tanzu/kubeapps/cmd/apprepository-controller/pkg/client/listers/apprepository/v1alpha1"
"github.com/vmware-tanzu/kubeapps/pkg/helm"
batchv1 "k8s.io/api/batch/v1"
batchv1beta1 "k8s.io/api/batch/v1beta1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand All @@ -33,6 +34,7 @@ import (
"k8s.io/client-go/kubernetes/scheme"
typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
batchlisters "k8s.io/client-go/listers/batch/v1"
batchlistersv1beta1 "k8s.io/client-go/listers/batch/v1beta1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/workqueue"
Expand Down Expand Up @@ -75,10 +77,11 @@ type Controller struct {
// apprepoclientset is the clientset for AppRepository resources
apprepoclientset clientset.Interface

cronjobsLister batchlisters.CronJobLister
cronjobsSynced cache.InformerSynced
appreposLister listers.AppRepositoryLister
appreposSynced cache.InformerSynced
cronjobsLister batchlisters.CronJobLister
cronjobsListerv1beta1 batchlistersv1beta1.CronJobLister
cronjobsSynced cache.InformerSynced
appreposLister listers.AppRepositoryLister
appreposSynced cache.InformerSynced

// workqueue is a rate limited work queue. This is used to queue work to be
// processed instead of performing it as soon as a change happens. This
Expand All @@ -101,16 +104,15 @@ func NewController(
apprepoInformerFactory informers.SharedInformerFactory,
conf *Config) *Controller {

// obtain references to shared index informers for the CronJob and
// AppRepository types.
cronjobInformer := kubeInformerFactory.Batch().V1().CronJobs()
// obtain references to shared index informers for the AppRepository type.
apprepoInformer := apprepoInformerFactory.Kubeapps().V1alpha1().AppRepositories()

// Create event broadcaster
// Add apprepository-controller types to the default Kubernetes Scheme so
// Events can be logged for apprepository-controller types.
err := appreposcheme.AddToScheme(scheme.Scheme)
if err != nil {
log.Errorf("Unable to create new controller: %v", err)
return nil
}

Expand All @@ -123,8 +125,6 @@ func NewController(
controller := &Controller{
kubeclientset: kubeclientset,
apprepoclientset: apprepoclientset,
cronjobsLister: cronjobInformer.Lister(),
cronjobsSynced: cronjobInformer.Informer().HasSynced,
appreposLister: apprepoInformer.Lister(),
appreposSynced: apprepoInformer.Informer().HasSynced,
workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "AppRepositories"),
Expand Down Expand Up @@ -154,21 +154,43 @@ func NewController(
log.Warningf("Error adding AppRepository event handler: %v", err)
}

controller.setBatchLister(conf.V1Beta1CronJobs, kubeInformerFactory)

return controller
}

// setBatchListener sets the specific batch listener based on the config, ie. either
// a v1 batch listener or a v1beta1 batch listener.
//
// This allows users on 1.20 to continue to use the latest release.
func (c *Controller) setBatchLister(useV1Beta1 bool, kubeInformerFactory kubeinformers.SharedInformerFactory) {
// Set up an event handler for when CronJob resources get deleted. This
// handler will lookup the owner of the given CronJob, and if it is owned by a
// AppRepository resource will enqueue that AppRepository resource for
// processing so the CronJob gets correctly recreated. This way, we don't need
// to implement custom logic for handling CronJob resources. More info on this
// pattern:
// https://github.com/kubernetes/community/blob/8cafef897a22026d42f5e5bb3f104febe7e29830/contributors/devel/controllers.md
_, err = cronjobInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
DeleteFunc: controller.handleObject,
})
if err != nil {
log.Warningf("Error adding CronJob event handler: %v", err)
var err error
if useV1Beta1 {
cronjobInformer := kubeInformerFactory.Batch().V1beta1().CronJobs()
c.cronjobsListerv1beta1 = cronjobInformer.Lister()
c.cronjobsSynced = cronjobInformer.Informer().HasSynced
_, err = cronjobInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
DeleteFunc: c.handleObject,
})
} else {
cronjobInformer := kubeInformerFactory.Batch().V1().CronJobs()
c.cronjobsLister = cronjobInformer.Lister()
c.cronjobsSynced = cronjobInformer.Informer().HasSynced
_, err = cronjobInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
DeleteFunc: c.handleObject,
})
}

return controller
if err != nil {
log.Fatalf("Error adding CronJob event handler: %v", err)
}
}

// Run will set up the event handlers for types we are interested in, as well
Expand Down Expand Up @@ -299,30 +321,7 @@ func (c *Controller) syncHandler(key string) error {
return fmt.Errorf("Error fetching object with key %s from store: %v", key, err)
}

// Get the cronjob with the same name as AppRepository
cronjobName := cronJobName(namespace, name, false)
cronjob, err := c.cronjobsLister.CronJobs(c.conf.KubeappsNamespace).Get(cronjobName)
// If the resource doesn't exist, we'll create it
if errors.IsNotFound(err) {
log.Infof("Creating CronJob %q for AppRepository %q", cronjobName, apprepo.GetName())
cronjob, err = c.kubeclientset.BatchV1().CronJobs(c.conf.KubeappsNamespace).Create(context.TODO(), newCronJob(apprepo, c.conf), metav1.CreateOptions{})
if err != nil {
return err
}

// Trigger a manual Job for the initial sync
_, err = c.kubeclientset.BatchV1().Jobs(c.conf.KubeappsNamespace).Create(context.TODO(), newSyncJob(apprepo, c.conf), metav1.CreateOptions{})
} else if err == nil {
// If the resource already exists, we'll update it
log.Infof("Updating CronJob %q in namespace %q for AppRepository %q in namespace %q", cronjobName, c.conf.KubeappsNamespace, apprepo.GetName(), apprepo.GetNamespace())
cronjob, err = c.kubeclientset.BatchV1().CronJobs(c.conf.KubeappsNamespace).Update(context.TODO(), newCronJob(apprepo, c.conf), metav1.UpdateOptions{})
if err != nil {
return err
}

// The AppRepository has changed, launch a manual Job
_, err = c.kubeclientset.BatchV1().Jobs(c.conf.KubeappsNamespace).Create(context.TODO(), newSyncJob(apprepo, c.conf), metav1.CreateOptions{})
}
cronjob, err := c.ensureCronJob(apprepo)

// If an error occurs during Get/Create, we'll requeue the item so we can
// attempt processing again later. This could have been caused by a
Expand All @@ -335,7 +334,7 @@ func (c *Controller) syncHandler(key string) error {
// cronjob for an app repo in another namespace, then we should
// log a warning to the event recorder and return it.
if !metav1.IsControlledBy(cronjob, apprepo) && !objectBelongsTo(cronjob, apprepo) {
msg := fmt.Sprintf(MessageResourceExists, cronjob.Name)
msg := fmt.Sprintf(MessageResourceExists, cronjob.GetName())
c.recorder.Event(apprepo, corev1.EventTypeWarning, ErrResourceExists, msg)
return fmt.Errorf(msg)
}
Expand All @@ -346,6 +345,64 @@ func (c *Controller) syncHandler(key string) error {
return nil
}

// ensureCronJob ensures that the cronjob exists and is up-to-date.
//
// It looks after creating either a v1 cronjob or v1beta1 cronjob depending on
// configuration.
func (c *Controller) ensureCronJob(apprepo *apprepov1alpha1.AppRepository) (metav1.Object, error) {
// Get the cronjob with the same name as AppRepository
cronjobName := cronJobName(apprepo.GetObjectMeta().GetNamespace(), apprepo.GetObjectMeta().GetName(), false)

var cronjob metav1.Object
var err error

if c.conf.V1Beta1CronJobs {
cronjob, err = c.cronjobsListerv1beta1.CronJobs(c.conf.KubeappsNamespace).Get(cronjobName)
} else {
cronjob, err = c.cronjobsLister.CronJobs(c.conf.KubeappsNamespace).Get(cronjobName)
}

// If the resource doesn't exist, we'll create it
if errors.IsNotFound(err) {
log.Infof("Creating CronJob %q for AppRepository %q", cronjobName, apprepo.GetName())
if c.conf.V1Beta1CronJobs {
cronjob, err = c.kubeclientset.BatchV1beta1().CronJobs(c.conf.KubeappsNamespace).Create(context.TODO(), v1CronJobToV1Beta1CronJob(newCronJob(apprepo, c.conf)), metav1.CreateOptions{})
if err != nil {
return nil, err
}
} else {
cronjob, err = c.kubeclientset.BatchV1().CronJobs(c.conf.KubeappsNamespace).Create(context.TODO(), newCronJob(apprepo, c.conf), metav1.CreateOptions{})
}
if err != nil {
return nil, err
}

// Trigger a manual Job for the initial sync
_, err = c.kubeclientset.BatchV1().Jobs(c.conf.KubeappsNamespace).Create(context.TODO(), newSyncJob(apprepo, c.conf), metav1.CreateOptions{})
if err != nil {
return nil, err
}
} else if err == nil {
// If the resource already exists, we'll update it
log.Infof("Updating CronJob %q in namespace %q for AppRepository %q in namespace %q", cronjobName, c.conf.KubeappsNamespace, apprepo.GetName(), apprepo.GetNamespace())
if c.conf.V1Beta1CronJobs {
cronjob, err = c.kubeclientset.BatchV1beta1().CronJobs(c.conf.KubeappsNamespace).Update(context.TODO(), v1CronJobToV1Beta1CronJob(newCronJob(apprepo, c.conf)), metav1.UpdateOptions{})
} else {
cronjob, err = c.kubeclientset.BatchV1().CronJobs(c.conf.KubeappsNamespace).Update(context.TODO(), newCronJob(apprepo, c.conf), metav1.UpdateOptions{})
}
if err != nil {
return nil, err
}

// The AppRepository has changed, launch a manual Job
_, err = c.kubeclientset.BatchV1().Jobs(c.conf.KubeappsNamespace).Create(context.TODO(), newSyncJob(apprepo, c.conf), metav1.CreateOptions{})
if err != nil {
return nil, err
}
}
return cronjob, nil
}

// belongsTo is similar to IsControlledBy, but enables us to establish a relationship
// between cronjobs and app repositories in different namespaces.
func objectBelongsTo(object, parent metav1.Object) bool {
Expand Down Expand Up @@ -517,6 +574,20 @@ func newCronJob(apprepo *apprepov1alpha1.AppRepository, config Config) *batchv1.
}
}

// v1CronJobToV1Beta1CronJob does exactly what it says: converts a v1 cronjob to a v1beta1 cronjob.
func v1CronJobToV1Beta1CronJob(cj *batchv1.CronJob) *batchv1beta1.CronJob {
return &batchv1beta1.CronJob{
ObjectMeta: cj.ObjectMeta,
Spec: batchv1beta1.CronJobSpec{
Schedule: cj.Spec.Schedule,
ConcurrencyPolicy: batchv1beta1.ConcurrencyPolicy(cj.Spec.ConcurrencyPolicy),
JobTemplate: batchv1beta1.JobTemplateSpec{
Spec: cj.Spec.JobTemplate.Spec,
},
},
}
}

// newSyncJob triggers a job for the AppRepository resource. It also sets the
// appropriate OwnerReferences on the resource
func newSyncJob(apprepo *apprepov1alpha1.AppRepository, config Config) *batchv1.Job {
Expand Down
15 changes: 15 additions & 0 deletions cmd/apprepository-controller/server/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -591,6 +591,21 @@ func Test_newCronJob(t *testing.T) {
}
})
}

// Test or v1beta1 cronjobs too.
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
config := makeDefaultConfig()
config.V1Beta1CronJobs = true
config.Crontab = tt.crontab
config.UserAgentComment = tt.userAgentComment

result := newCronJob(tt.apprepo, config)
if got, want := tt.expected, *result; !cmp.Equal(want, got) {
t.Errorf("mismatch (-want +got):\n%s", cmp.Diff(want, got))
}
})
}
}

func Test_newSyncJob(t *testing.T) {
Expand Down
1 change: 1 addition & 0 deletions cmd/apprepository-controller/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ type Config struct {
CustomLabels []string
ParsedCustomAnnotations map[string]string
ParsedCustomLabels map[string]string
V1Beta1CronJobs bool
}

func Serve(serveOpts Config) error {
Expand Down

0 comments on commit 07d413d

Please sign in to comment.