Skip to content

Commit

Permalink
Watch Program and Flux sources and run a Stack when they change (#348)
Browse files Browse the repository at this point in the history
* Ensure Program .configuration[*].default optional

The default for a configuration item is optional; but the type is (via
an alias) apiextensionsv1.JSON, which is a struct. The zero value
serialises to `null`, which makes the Pulumi YAML engine balk.

Making this pointer typed means if it's absent, it'll be left out of the
serialisation, and Pulumi YAML won't disagree with it.

* React to Program objects changing

At present the controller will notice changes to a Program object when
it reruns a Stack that refers to that object, either because it failed
the previous time, or because it requeued it on a schedule.

This adds an index keeping track of which Stacks reference which
Programs, and a watch that will requeue all the Stacks referring to a
Program when that program changes.

* Watch Flux source kinds when they are seen in sources

A Stack can refer to a Flux source by (arbitrary) API version, kind, and
name. This gives forward-compatibility with new and third-party sources
-- but it makes it a bit trickier to detect when they have changed,
because the kinds to watch aren't known until they're seen by the
controller.

The scheme to do so here is:

 1. index the Stack against the type {group, version, kind} and name of
 the source they reference. This lets us look up the referencing stacks,
 when we see a source has been updated.

 2. whenever a source is successfully retrieved (during Stack
 reconciliation), ensure that its kind is being watched;

 3. the watchers installed in 2.) use the index in 1.) to look up the
 stacks using a source by type and name; then enqueue any such stacks to
 be reconciled.

* Let the watches deal with missing sources

When a flux Source is missing, we can now wait for the watch to fire
when it appears, rather than retrying. The same is true when the source
is marked as unready.

Signed-off-by: Michael Bridgen <mbridgen@pulumi.com>
  • Loading branch information
squaremo authored Oct 25, 2022
1 parent daeeee7 commit 3974c96
Show file tree
Hide file tree
Showing 8 changed files with 271 additions and 19 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ CHANGELOG
[#339](https://github.com/pulumi/pulumi-kubernetes-operator/pull/339)
- De-escalate a log message about a harmless error from ERROR to DEBUG
[#352](https://github.com/pulumi/pulumi-kubernetes-operator/pull/352)
- Watch source kinds and Programs to react to changes
[#348](https://github.com/pulumi/pulumi-kubernetes-operator/pull/348)

## 1.10.0 (2022-10-21)

Expand Down
2 changes: 1 addition & 1 deletion pkg/apis/pulumi/v1/program_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ type Configuration struct {

// default is a value of the appropriate type for the template to use if no value is specified.
// +optional
Default Any `json:"default,omitempty"`
Default *Any `json:"default,omitempty"`
}

type Resource struct {
Expand Down
6 changes: 5 additions & 1 deletion pkg/apis/pulumi/v1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

161 changes: 153 additions & 8 deletions pkg/controller/stack/stack_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"path/filepath"
"strconv"
"strings"
"sync"
"time"

"github.com/operator-framework/operator-lib/handler"
Expand All @@ -37,13 +38,15 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/retry"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
ctrlhandler "sigs.k8s.io/controller-runtime/pkg/handler"
logf "sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/predicate"
Expand All @@ -60,6 +63,8 @@ var (
const (
pulumiFinalizer = "finalizer.stack.pulumi.com"
defaultMaxConcurrentReconciles = 10
programRefIndexFieldName = ".spec.programRef.name" // this is an arbitrary string, named for the field it indexes
fluxSourceIndexFieldName = ".spec.fluxSource.sourceRef" // an arbitrary name, named for the field it indexes
)

const (
Expand All @@ -78,6 +83,15 @@ func IsNamespaceIsolationWaived() bool {
}
}

func getSourceGVK(src shared.FluxSourceReference) (schema.GroupVersionKind, error) {
gv, err := schema.ParseGroupVersion(src.APIVersion)
return gv.WithKind(src.Kind), err
}

func fluxSourceKey(gvk schema.GroupVersionKind, name string) string {
return fmt.Sprintf("%s:%s", gvk, name)
}

// Add creates a new Stack Controller and adds it to the Manager. The Manager will set fields on the Controller
// and Start it when the Manager is Started.
func Add(mgr manager.Manager) error {
Expand All @@ -91,7 +105,7 @@ func Add(mgr manager.Manager) error {
}

// newReconciler returns a new reconcile.Reconciler
func newReconciler(mgr manager.Manager) reconcile.Reconciler {
func newReconciler(mgr manager.Manager) *ReconcileStack {
return &ReconcileStack{
client: mgr.GetClient(),
scheme: mgr.GetScheme(),
Expand All @@ -100,7 +114,7 @@ func newReconciler(mgr manager.Manager) reconcile.Reconciler {
}

// add adds a new Controller to mgr with r as the reconcile.Reconciler
func add(mgr manager.Manager, r reconcile.Reconciler) error {
func add(mgr manager.Manager, r *ReconcileStack) error {
var err error
maxConcurrentReconciles := defaultMaxConcurrentReconciles
maxConcurrentReconcilesStr, set := os.LookupEnv("MAX_CONCURRENT_RECONCILES")
Expand Down Expand Up @@ -148,6 +162,122 @@ func add(mgr manager.Manager, r reconcile.Reconciler) error {
return err
}

indexer := mgr.GetFieldIndexer()

// Watch Programs, and look up which (if any) Stack refers to them when they change

// Index stacks against the names of programs they reference
if err = indexer.IndexField(context.Background(), &pulumiv1.Stack{}, programRefIndexFieldName, func(o client.Object) []string {
stack := o.(*pulumiv1.Stack)
if stack.Spec.ProgramRef != nil {
return []string{stack.Spec.ProgramRef.Name}
}
return nil
}); err != nil {
return err
}

// this encodes the "use an index to look up the stacks used by a source" pattern which both
// ProgramRef and FluxSource need.
enqueueStacksForSourceFunc := func(indexName string, getFieldKey func(client.Object) string) func(client.Object) []reconcile.Request {
return func(src client.Object) []reconcile.Request {
var stacks pulumiv1.StackList
err := mgr.GetClient().List(context.TODO(), &stacks,
client.InNamespace(src.GetNamespace()),
client.MatchingFields{indexName: getFieldKey(src)})
if err == nil {
reqs := make([]reconcile.Request, len(stacks.Items), len(stacks.Items))
for i := range stacks.Items {
reqs[i].NamespacedName = client.ObjectKeyFromObject(&stacks.Items[i])
}
return reqs
}
// we don't get to return an error; only to fail quietly
mgr.GetLogger().Error(err, "failed to fetch stack referring to source",
"gvk", src.GetObjectKind().GroupVersionKind(),
"name", src.GetName(),
"namespace", src.GetNamespace())
return nil
}
}

err = c.Watch(&source.Kind{Type: &pulumiv1.Program{}}, ctrlhandler.EnqueueRequestsFromMapFunc(
enqueueStacksForSourceFunc(programRefIndexFieldName,
func(obj client.Object) string {
return obj.GetName()
})))
if err != nil {
return err
}

// Watch Flux sources we get told about, and look up the Stack(s) using them when they change

// Index the stacks against the type and name of sources they reference.
if err = indexer.IndexField(context.Background(), &pulumiv1.Stack{}, fluxSourceIndexFieldName, func(o client.Object) []string {
stack := o.(*pulumiv1.Stack)
if source := stack.Spec.FluxSource; source != nil {
gvk, err := getSourceGVK(source.SourceRef)
if err != nil {
mgr.GetLogger().Error(err, "unable to parse .sourceRef.apiVersion in Flux source")
return nil
}
// the keys include the type, because the references are not of a fixed type of object
return []string{fluxSourceKey(gvk, source.SourceRef.Name)}
}
return nil
}); err != nil {
return err
}

// We can't watch a specific type (i.e., using source.Kind) here; what we have to do is wait
// until we see stacks that refer to particular kinds, then watch those. Technically this can
// "leak" watches -- we may end up watching kinds that are no longer mentioned in stacks. My
// assumption is that the number of distinct types that might be mentioned (including typos) is
// low enough that this remains acceptably cheap.

// Keep track of types we've already watched, so we don't install more than one handler for a
// type.
watched := make(map[schema.GroupVersionKind]struct{})
watchedMu := sync.Mutex{}

// Calling this will attempt to install a watch for the kind given in the source reference. It
// will return an error if there's something wrong with the source reference or if the watch
// could not be attempted otherwise. If the kind cannot be found then this will keep trying in
// the background until the context given to controller.Start is cancelled, rather than return
// an error.
r.maybeWatchFluxSourceKind = func(src shared.FluxSourceReference) error {
gvk, err := getSourceGVK(src)
if err != nil {
return err
}
watchedMu.Lock()
_, ok := watched[gvk]
if !ok {
watched[gvk] = struct{}{}
}
watchedMu.Unlock()
if !ok {
// Using PartialObjectMetadata means we don't need the actual types registered in the
// schema.
var sourceKind metav1.PartialObjectMetadata
sourceKind.SetGroupVersionKind(gvk)
mgr.GetLogger().Info("installing watcher for newly seen source kind", "GroupVersionKind", gvk)
if err := c.Watch(&source.Kind{Type: &sourceKind},
ctrlhandler.EnqueueRequestsFromMapFunc(
enqueueStacksForSourceFunc(fluxSourceIndexFieldName, func(obj client.Object) string {
gvk := obj.GetObjectKind().GroupVersionKind()
return fluxSourceKey(gvk, obj.GetName())
}))); err != nil {
watchedMu.Lock()
delete(watched, gvk)
watchedMu.Unlock()
mgr.GetLogger().Error(err, "failed to watch source kind", "GroupVersionKind", gvk)
return err
}
}
return nil
}

return nil
}

Expand All @@ -161,6 +291,9 @@ type ReconcileStack struct {
client client.Client
scheme *runtime.Scheme
recorder record.EventRecorder

// this is initialised by add(), to be available to Reconcile
maybeWatchFluxSourceKind func(shared.FluxSourceReference) error
}

// StallError represents a problem that makes a Stack spec unprocessable, while otherwise being
Expand Down Expand Up @@ -342,19 +475,31 @@ func (r *ReconcileStack) Reconcile(ctx context.Context, request reconcile.Reques
Name: fluxSource.SourceRef.Name,
Namespace: request.Namespace,
}, &sourceObject); err != nil {
r.markStackFailed(sess, instance, err, "", "")
reterr := fmt.Errorf("could not resolve sourceRef: %w", err)
r.markStackFailed(sess, instance, reterr, "", "")
if client.IgnoreNotFound(err) != nil {
return reconcile.Result{}, fmt.Errorf("could not resolve sourceRef: %w", err)
return reconcile.Result{}, err
}
// TODO: revisit this, if sources are watched; perhaps it should be stalled?
instance.Status.MarkReconcilingCondition(pulumiv1.ReconcilingRetryReason, err.Error())
return reconcile.Result{Requeue: true}, nil
// this is marked as stalled and not requeued; the watch mechanism will requeue it if
// the source it points to appears.
instance.Status.MarkStalledCondition(pulumiv1.StalledSourceUnavailableReason, reterr.Error())
return reconcile.Result{}, nil
}

// Watch this kind of source, if we haven't already.
if err := r.maybeWatchFluxSourceKind(fluxSource.SourceRef); err != nil {
reterr := fmt.Errorf("cannot process source reference: %w", err)
r.markStackFailed(sess, instance, reterr, "", "")
instance.Status.MarkStalledCondition(pulumiv1.StalledSpecInvalidReason, reterr.Error())
return reconcile.Result{}, nil
}

if err := checkFluxSourceReady(sourceObject); err != nil {
r.markStackFailed(sess, instance, err, "", "")
// This is marked as retrying, but we're really waiting until the source is ready, at
// which time the watch mechanism will requeue it.
instance.Status.MarkReconcilingCondition(pulumiv1.ReconcilingRetryReason, err.Error())
return reconcile.Result{Requeue: true}, nil
return reconcile.Result{}, nil
}

currentCommit, err = sess.SetupWorkdirFromFluxSource(ctx, sourceObject, fluxSource)
Expand Down
58 changes: 51 additions & 7 deletions test/flux_source_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,25 +156,42 @@ var _ = Describe("Flux source integration", func() {
},
},
}
stack.Name = "missing-source-" + randString()
stack.Name = "missing-source"
stack.Namespace = "default"
})

JustBeforeEach(func() {
stack.Name += ("-" + randString())
Expect(k8sClient.Create(context.TODO(), stack)).To(Succeed())
})

AfterEach(func() {
deleteAndWaitForFinalization(stack)
})

It("is marked as failed and to be retried", func() {
It("is marked as failed and stalled", func() {
waitForStackFailure(stack)
// When this is present it could say that it's retrying, or that it's in progress; since
// it's run through at least once for us to see a failed state above, either indicates a
// retry.
Expect(apimeta.IsStatusConditionTrue(stack.Status.Conditions, pulumiv1.ReconcilingCondition)).To(BeTrue())
Expect(apimeta.IsStatusConditionTrue(stack.Status.Conditions, pulumiv1.StalledCondition)).To(BeTrue())
Expect(apimeta.IsStatusConditionTrue(stack.Status.Conditions, pulumiv1.ReadyCondition)).To(BeFalse())
Expect(apimeta.FindStatusCondition(stack.Status.Conditions, pulumiv1.StalledCondition)).To(BeNil())
Expect(apimeta.FindStatusCondition(stack.Status.Conditions, pulumiv1.ReconcilingCondition)).To(BeNil())
})

When("the source is an unknown group/kind", func() {
BeforeEach(func() {
stack.Name = "unknown-source-kind"
stack.Spec.FluxSource.SourceRef.APIVersion = "doesnotexist/v1"
})

It("is marked as failed and to be retried", func() {
waitForStackFailure(stack)
// When this is present it could say that it's retrying, or that it's in progress; since
// it's run through at least once for us to see a failed state above, either indicates a
// retry.
Expect(apimeta.IsStatusConditionTrue(stack.Status.Conditions, pulumiv1.ReconcilingCondition)).To(BeTrue())
Expect(apimeta.IsStatusConditionTrue(stack.Status.Conditions, pulumiv1.ReadyCondition)).To(BeFalse())
Expect(apimeta.FindStatusCondition(stack.Status.Conditions, pulumiv1.StalledCondition)).To(BeNil())
})
})
})

When("a Stack refers to a Flux source with a latest artifact", func() {
Expand Down Expand Up @@ -263,6 +280,33 @@ var _ = Describe("Flux source integration", func() {
})
})

When("the source is updated after a run", func() {
var newArtifactRevision string

JustBeforeEach(func() {
// wait for one go around
waitForStackSuccess(stack)

newArtifactRevision = randString()
sourceStatus := map[string]interface{}{
"artifact": map[string]interface{}{
"path": "irrelevant",
"url": artifactURL,
"revision": newArtifactRevision,
"checksum": artifactChecksum,
},
}
unstructured.SetNestedMap(source.Object, sourceStatus, "status")
resetWaitForStack()
Expect(k8sClient.Status().Update(context.TODO(), source)).To(Succeed())
})

It("runs the stack again at the new revision", func() {
waitForStackSuccess(stack)
Expect(stack.Status.LastUpdate.LastSuccessfulCommit).To(Equal(newArtifactRevision))
})
})

When("the source object is explicitly marked as not ready", func() {
BeforeEach(func() {
notready := map[string]interface{}{
Expand Down
Loading

0 comments on commit 3974c96

Please sign in to comment.