Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

test: Add a cacheSyncingClient for testing that allows for custom FieldIndexers #145

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pkg/controllers/deprovisioning/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func TestAPIs(t *testing.T) {
}

var _ = BeforeSuite(func() {
env = test.NewEnvironment(scheme.Scheme, apis.CRDs...)
env = test.NewEnvironment(scheme.Scheme, test.WithCRDs(apis.CRDs...))
ctx = settings.ToContext(ctx, test.Settings(test.SettingsOptions{DriftEnabled: true}))
cloudProvider = fake.NewCloudProvider()
fakeClock = clock.NewFakeClock(time.Now())
Expand Down
2 changes: 1 addition & 1 deletion pkg/controllers/inflightchecks/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func TestAPIs(t *testing.T) {

var _ = BeforeSuite(func() {
fakeClock = clock.NewFakeClock(time.Now())
env = test.NewEnvironment(scheme.Scheme, apis.CRDs...)
env = test.NewEnvironment(scheme.Scheme, test.WithCRDs(apis.CRDs...))
ctx = settings.ToContext(ctx, test.Settings())
cp = &fake.CloudProvider{}
recorder = test.NewEventRecorder()
Expand Down
2 changes: 1 addition & 1 deletion pkg/controllers/metrics/provisioner/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func TestAPIs(t *testing.T) {
}

var _ = BeforeSuite(func() {
env = test.NewEnvironment(scheme.Scheme, apis.CRDs...)
env = test.NewEnvironment(scheme.Scheme, test.WithCRDs(apis.CRDs...))
provisionerController = provisioner.NewController(env.Client)
})

Expand Down
2 changes: 1 addition & 1 deletion pkg/controllers/metrics/state/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func TestAPIs(t *testing.T) {
}

var _ = BeforeSuite(func() {
env = test.NewEnvironment(scheme.Scheme, apis.CRDs...)
env = test.NewEnvironment(scheme.Scheme, test.WithCRDs(apis.CRDs...))

ctx = settings.ToContext(ctx, test.Settings())
cloudProvider = fake.NewCloudProvider()
Expand Down
2 changes: 1 addition & 1 deletion pkg/controllers/node/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func TestAPIs(t *testing.T) {

var _ = BeforeSuite(func() {
fakeClock = clock.NewFakeClock(time.Now())
env = test.NewEnvironment(scheme.Scheme, apis.CRDs...)
env = test.NewEnvironment(scheme.Scheme, test.WithCRDs(apis.CRDs...))
ctx = settings.ToContext(ctx, test.Settings())
cp := fake.NewCloudProvider()
cluster := state.NewCluster(ctx, fakeClock, env.Client, cp)
Expand Down
2 changes: 1 addition & 1 deletion pkg/controllers/provisioning/scheduling/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func TestScheduling(t *testing.T) {
}

var _ = BeforeSuite(func() {
env = test.NewEnvironment(scheme.Scheme, apis.CRDs...)
env = test.NewEnvironment(scheme.Scheme, test.WithCRDs(apis.CRDs...))
ctx = settings.ToContext(ctx, test.Settings())
cloudProv = fake.NewCloudProvider()
instanceTypes, _ := cloudProv.GetInstanceTypes(ctx, nil)
Expand Down
2 changes: 1 addition & 1 deletion pkg/controllers/provisioning/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func TestAPIs(t *testing.T) {
}

var _ = BeforeSuite(func() {
env = test.NewEnvironment(scheme.Scheme, apis.CRDs...)
env = test.NewEnvironment(scheme.Scheme, test.WithCRDs(apis.CRDs...))
ctx = settings.ToContext(ctx, test.Settings())
cloudProvider = fake.NewCloudProvider()
recorder = test.NewEventRecorder()
Expand Down
2 changes: 1 addition & 1 deletion pkg/controllers/state/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func TestAPIs(t *testing.T) {
}

var _ = BeforeSuite(func() {
env = test.NewEnvironment(scheme.Scheme, apis.CRDs...)
env = test.NewEnvironment(scheme.Scheme, test.WithCRDs(apis.CRDs...))
})

var _ = AfterSuite(func() {
Expand Down
2 changes: 1 addition & 1 deletion pkg/controllers/termination/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func TestAPIs(t *testing.T) {

var _ = BeforeSuite(func() {
fakeClock = clock.NewFakeClock(time.Now())
env = test.NewEnvironment(scheme.Scheme, apis.CRDs...)
env = test.NewEnvironment(scheme.Scheme, test.WithCRDs(apis.CRDs...))

cloudProvider := fake.NewCloudProvider()
eventRecorder := test.NewEventRecorder()
Expand Down
2 changes: 1 addition & 1 deletion pkg/operator/controller/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func TestAPIs(t *testing.T) {
}

var _ = BeforeSuite(func() {
env = test.NewEnvironment(scheme.Scheme, apis.CRDs...)
env = test.NewEnvironment(scheme.Scheme, test.WithCRDs(apis.CRDs...))
cmw = informer.NewInformedWatcher(env.KubernetesInterface, system.Namespace())
defaultConfigMap = &v1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Expand Down
160 changes: 160 additions & 0 deletions pkg/test/cachesyncingclient.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
/*
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package test

import (
"context"
"fmt"
"time"

"github.com/avast/retry-go"
"github.com/samber/lo"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/client/apiutil"
)

// CacheSyncingClient exists for tests that need to use custom fieldSelectors (thus, they need a client cache)
// and also need consistency in their testing by waiting for caches to sync after performing WRITE operations
// NOTE: This cache sync doesn't sync with third-party operations on the api-server
type CacheSyncingClient struct {
client.Client
}

// If we timeout on polling, the assumption is that the cache updated to a newer version
// and we missed the current WRITE operation that we just performed
var pollingOptions = []retry.Option{
retry.Attempts(100), // This whole poll should take ~1s
retry.Delay(time.Millisecond * 10),
retry.DelayType(retry.FixedDelay),
}

func (c *CacheSyncingClient) Create(ctx context.Context, obj client.Object, opts ...client.CreateOption) error {
if err := c.Client.Create(ctx, obj, opts...); err != nil {
return err
}
_ = retry.Do(func() error {
if err := c.Client.Get(ctx, client.ObjectKeyFromObject(obj), obj); err != nil {
return fmt.Errorf("getting object, %w", err)
}
return nil
}, pollingOptions...)
return nil
}

func (c *CacheSyncingClient) Delete(ctx context.Context, obj client.Object, opts ...client.DeleteOption) error {
if err := c.Client.Delete(ctx, obj, opts...); err != nil {
return err
}
_ = retry.Do(func() error {
if err := c.Client.Get(ctx, client.ObjectKeyFromObject(obj), obj); err != nil {
if errors.IsNotFound(err) {
return nil
}
return fmt.Errorf("getting object, %w", err)
}
return fmt.Errorf("object still exists")
}, pollingOptions...)
return nil
}

func (c *CacheSyncingClient) Update(ctx context.Context, obj client.Object, opts ...client.UpdateOption) error {
if err := c.Client.Update(ctx, obj, opts...); err != nil {
return err
}
_ = retry.Do(func() error {
return objectSynced(ctx, c.Client, obj)
}, pollingOptions...)
return nil
}

func (c *CacheSyncingClient) Patch(ctx context.Context, obj client.Object, patch client.Patch, opts ...client.PatchOption) error {
if err := c.Client.Patch(ctx, obj, patch, opts...); err != nil {
return err
}
_ = retry.Do(func() error {
return objectSynced(ctx, c.Client, obj)
}, pollingOptions...)
return nil
}

func (c *CacheSyncingClient) DeleteAllOf(ctx context.Context, obj client.Object, opts ...client.DeleteAllOfOption) error {
options := &client.DeleteAllOfOptions{}
for _, o := range opts {
o.ApplyToDeleteAllOf(options)
}
if err := c.Client.DeleteAllOf(ctx, obj, opts...); err != nil {
return err
}
metaList := &metav1.PartialObjectMetadataList{}
metaList.SetGroupVersionKind(lo.Must(apiutil.GVKForObject(obj, c.Scheme())))

_ = retry.Do(func() error {
listOptions := []client.ListOption{client.Limit(1)}
if options.ListOptions.Namespace != "" {
listOptions = append(listOptions, client.InNamespace(options.ListOptions.Namespace))
}
if err := c.Client.List(ctx, metaList, listOptions...); err != nil {
return fmt.Errorf("listing objects, %w", err)
}
if len(metaList.Items) != 0 {
return fmt.Errorf("objects still exist")
}
return nil
}, pollingOptions...)
return nil
}

func (c *CacheSyncingClient) Status() client.StatusWriter {
return &cacheSyncingStatusWriter{
client: c.Client,
}
}

type cacheSyncingStatusWriter struct {
client client.Client
}

func (c *cacheSyncingStatusWriter) Update(ctx context.Context, obj client.Object, opts ...client.UpdateOption) error {
if err := c.client.Status().Update(ctx, obj, opts...); err != nil {
return err
}
_ = retry.Do(func() error {
return objectSynced(ctx, c.client, obj)
}, pollingOptions...)
return nil
}

func (c *cacheSyncingStatusWriter) Patch(ctx context.Context, obj client.Object, patch client.Patch, opts ...client.PatchOption) error {
if err := c.client.Status().Patch(ctx, obj, patch, opts...); err != nil {
return err
}
_ = retry.Do(func() error {
return objectSynced(ctx, c.client, obj)
}, pollingOptions...)
return nil
}

func objectSynced(ctx context.Context, c client.Client, obj client.Object) error {
stored := obj.DeepCopyObject().(client.Object)
if err := c.Get(ctx, client.ObjectKeyFromObject(obj), obj); err != nil {
return fmt.Errorf("getting object, %w", err)
}
if obj.GetResourceVersion() != stored.GetResourceVersion() {
return fmt.Errorf("object hasn't updated")
}
return nil
}
61 changes: 58 additions & 3 deletions pkg/test/environment.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,51 +15,106 @@ limitations under the License.
package test

import (
"context"
"os"
"strings"

"github.com/samber/lo"
corev1 "k8s.io/api/core/v1"
v1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/version"
"k8s.io/client-go/kubernetes"
"knative.dev/pkg/system"
"sigs.k8s.io/controller-runtime/pkg/cache"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/envtest"

"github.com/aws/karpenter-core/pkg/utils/env"
"github.com/aws/karpenter-core/pkg/utils/functional"
)

type Environment struct {
envtest.Environment

Client client.Client
KubernetesInterface kubernetes.Interface
Version *version.Version
Done chan struct{}
Cancel context.CancelFunc
}

type EnvironmentOptions struct {
crds []*v1.CustomResourceDefinition
fieldIndexers []func(cache.Cache) error
}

func NewEnvironment(scheme *runtime.Scheme, crds ...*v1.CustomResourceDefinition) *Environment {
// WithCRDs registers the specified CRDs to the apiserver for use in testing
func WithCRDs(crds ...*v1.CustomResourceDefinition) functional.Option[EnvironmentOptions] {
return func(o EnvironmentOptions) EnvironmentOptions {
o.crds = append(o.crds, crds...)
return o
}
}

// WithFieldIndexers expects a function that indexes fields against the cache such as cache.IndexField(...)
func WithFieldIndexers(fieldIndexers ...func(cache.Cache) error) functional.Option[EnvironmentOptions] {
return func(o EnvironmentOptions) EnvironmentOptions {
o.fieldIndexers = append(o.fieldIndexers, fieldIndexers...)
return o
}
}

func NewEnvironment(scheme *runtime.Scheme, options ...functional.Option[EnvironmentOptions]) *Environment {
opts := functional.ResolveOptions(options...)
ctx, cancel := context.WithCancel(context.Background())

os.Setenv(system.NamespaceEnvKey, "default")
version := version.MustParseSemantic(strings.Replace(env.WithDefaultString("K8S_VERSION", "1.21.x"), ".x", ".0", -1))
environment := envtest.Environment{Scheme: scheme, CRDs: crds}
environment := envtest.Environment{Scheme: scheme, CRDs: opts.crds}
if version.Minor() >= 21 {
// PodAffinityNamespaceSelector is used for label selectors in pod affinities. If the feature-gate is turned off,
// the api-server just clears out the label selector so we never see it. If we turn it on, the label selectors
// are passed to us and we handle them. This feature is alpha in v1.21, beta in v1.22 and will be GA in 1.24. See
// https://github.com/kubernetes/enhancements/issues/2249 for more info.
environment.ControlPlane.GetAPIServer().Configure().Set("feature-gates", "PodAffinityNamespaceSelector=true")
}

_ = lo.Must(environment.Start())
c := lo.Must(client.New(environment.Config, client.Options{Scheme: scheme}))

// We use a modified client if we need field indexers
if len(opts.fieldIndexers) > 0 {
cache := lo.Must(cache.New(environment.Config, cache.Options{Scheme: scheme}))
for _, index := range opts.fieldIndexers {
lo.Must0(index(cache))
}
lo.Must0(cache.IndexField(ctx, &corev1.Pod{}, "spec.nodeName", func(o client.Object) []string {
pod := o.(*corev1.Pod)
return []string{pod.Spec.NodeName}
}))
c = &CacheSyncingClient{
Client: lo.Must(client.NewDelegatingClient(client.NewDelegatingClientInput{
CacheReader: cache,
Client: c,
})),
}
go func() {
lo.Must0(cache.Start(ctx))
}()
}
return &Environment{
Environment: environment,
Client: lo.Must(client.New(environment.Config, client.Options{Scheme: environment.Scheme})),
Client: c,
KubernetesInterface: kubernetes.NewForConfigOrDie(environment.Config),
Version: version,
Done: make(chan struct{}),
Cancel: cancel,
}
}

func (e *Environment) Stop() error {
close(e.Done)
e.Cancel()
return e.Environment.Stop()
}
Loading