diff --git a/pkg/apis/crds/karpenter.sh_machines.yaml b/pkg/apis/crds/karpenter.sh_machines.yaml
index b1011f7f22..7afa6848a3 100644
--- a/pkg/apis/crds/karpenter.sh_machines.yaml
+++ b/pkg/apis/crds/karpenter.sh_machines.yaml
@@ -3,7 +3,7 @@ apiVersion: apiextensions.k8s.io/v1
 kind: CustomResourceDefinition
 metadata:
   annotations:
-    controller-gen.kubebuilder.io/version: v0.11.2
+    controller-gen.kubebuilder.io/version: v0.11.3
   creationTimestamp: null
   name: machines.karpenter.sh
 spec:
diff --git a/pkg/apis/crds/karpenter.sh_provisioners.yaml b/pkg/apis/crds/karpenter.sh_provisioners.yaml
index c21677812c..112cd62d2c 100644
--- a/pkg/apis/crds/karpenter.sh_provisioners.yaml
+++ b/pkg/apis/crds/karpenter.sh_provisioners.yaml
@@ -3,7 +3,7 @@ apiVersion: apiextensions.k8s.io/v1
 kind: CustomResourceDefinition
 metadata:
   annotations:
-    controller-gen.kubebuilder.io/version: v0.11.2
+    controller-gen.kubebuilder.io/version: v0.11.3
   creationTimestamp: null
   name: provisioners.karpenter.sh
 spec:
diff --git a/pkg/apis/v1alpha5/labels.go b/pkg/apis/v1alpha5/labels.go
index 80e38fa8df..a65d6334ce 100644
--- a/pkg/apis/v1alpha5/labels.go
+++ b/pkg/apis/v1alpha5/labels.go
@@ -34,6 +34,7 @@ const (
 const (
 	ProvisionerNameLabelKey = Group + "/provisioner-name"
 	MachineNameLabelKey     = Group + "/machine-name"
+	ManagedByLabelKey       = Group + "/managed-by"
 	LabelNodeInitialized    = Group + "/initialized"
 	LabelCapacityType       = Group + "/capacity-type"
 )
@@ -44,6 +45,7 @@ const (
 	DoNotConsolidateNodeAnnotationKey  = Group + "/do-not-consolidate"
 	EmptinessTimestampAnnotationKey    = Group + "/emptiness-timestamp"
 	VoluntaryDisruptionAnnotationKey   = Group + "/voluntary-disruption"
+	MachineLinkedAnnotationKey         = Group + "/linked"
 
 	ProviderCompatabilityAnnotationKey = CompatabilityGroup + "/provider"
diff --git a/pkg/cloudprovider/fake/cloudprovider.go b/pkg/cloudprovider/fake/cloudprovider.go
index fdb96f9d22..432b1947cc 100644
--- a/pkg/cloudprovider/fake/cloudprovider.go
+++ b/pkg/cloudprovider/fake/cloudprovider.go
@@ -126,18 +126,18 @@ func (c *CloudProvider) Create(ctx context.Context, machine *v1alpha5.Machine) (
 			Allocatable: functional.FilterMap(instanceType.Allocatable(), func(_ v1.ResourceName, v resource.Quantity) bool { return !resources.IsZero(v) }),
 		},
 	}
-	c.CreatedMachines[machine.Name] = created
+	c.CreatedMachines[created.Status.ProviderID] = created
 	return created, nil
 }
 
-func (c *CloudProvider) Get(_ context.Context, machineName string, _ string) (*v1alpha5.Machine, error) {
+func (c *CloudProvider) Get(_ context.Context, id string) (*v1alpha5.Machine, error) {
 	c.mu.RLock()
 	defer c.mu.RUnlock()
-	if machine, ok := c.CreatedMachines[machineName]; ok {
+	if machine, ok := c.CreatedMachines[id]; ok {
 		return machine.DeepCopy(), nil
 	}
-	return nil, cloudprovider.NewMachineNotFoundError(fmt.Errorf("no machine exists with name '%s'", machineName))
+	return nil, cloudprovider.NewMachineNotFoundError(fmt.Errorf("no machine exists with id '%s'", id))
 }
 
 func (c *CloudProvider) GetInstanceTypes(_ context.Context, _ *v1alpha5.Provisioner) ([]*cloudprovider.InstanceType, error) {
@@ -188,8 +188,8 @@ func (c *CloudProvider) Delete(_ context.Context, m *v1alpha5.Machine) error {
 	c.mu.Lock()
 	defer c.mu.Unlock()
-	if _, ok := c.CreatedMachines[m.Name]; ok {
-		delete(c.CreatedMachines, m.Name)
+	if _, ok := c.CreatedMachines[m.Status.ProviderID]; ok {
+		delete(c.CreatedMachines, m.Status.ProviderID)
 		return nil
 	}
 	return cloudprovider.NewMachineNotFoundError(fmt.Errorf("no machine exists with name '%s'", m.Name))
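The fake cloud provider above now keys its `CreatedMachines` map by provider ID instead of machine name, matching the new `Get(ctx, id)` contract. A minimal standalone sketch of that keying pattern — `Store`, `Machine`, and the `fake:///` IDs are illustrative stand-ins, not code from this PR:

```go
package main

import (
	"fmt"
	"sync"
)

// Machine is a stand-in for v1alpha5.Machine; only the fields needed
// to show the keying change are included.
type Machine struct {
	Name       string
	ProviderID string
}

// Store keys machines by provider ID so that lookups work both for
// machines the controller created and for pre-existing instances it
// links to via annotation.
type Store struct {
	mu       sync.RWMutex
	machines map[string]*Machine
}

func (s *Store) Add(m *Machine) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.machines[m.ProviderID] = m
}

func (s *Store) Get(id string) (*Machine, error) {
	s.mu.RLock()
	defer s.mu.RUnlock()
	if m, ok := s.machines[id]; ok {
		return m, nil
	}
	return nil, fmt.Errorf("no machine exists with id '%s'", id)
}

func main() {
	s := &Store{machines: map[string]*Machine{}}
	s.Add(&Machine{Name: "default-abc", ProviderID: "fake:///default-abc"})
	m, _ := s.Get("fake:///default-abc")
	fmt.Println(m.Name)
}
```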
diff --git a/pkg/cloudprovider/metrics/cloudprovider.go b/pkg/cloudprovider/metrics/cloudprovider.go
index 855b8fe62b..857b939268 100644
--- a/pkg/cloudprovider/metrics/cloudprovider.go
+++ b/pkg/cloudprovider/metrics/cloudprovider.go
@@ -76,12 +76,17 @@ func (d *decorator) Delete(ctx context.Context, machine *v1alpha5.Machine) error
 	return d.CloudProvider.Delete(ctx, machine)
 }
 
-func (d *decorator) Get(ctx context.Context, machineName, provisionerName string) (*v1alpha5.Machine, error) {
+func (d *decorator) Get(ctx context.Context, id string) (*v1alpha5.Machine, error) {
 	defer metrics.Measure(methodDurationHistogramVec.WithLabelValues(injection.GetControllerName(ctx), "Get", d.Name()))()
-	return d.CloudProvider.Get(ctx, machineName, provisionerName)
+	return d.CloudProvider.Get(ctx, id)
 }
 
 func (d *decorator) GetInstanceTypes(ctx context.Context, provisioner *v1alpha5.Provisioner) ([]*cloudprovider.InstanceType, error) {
 	defer metrics.Measure(methodDurationHistogramVec.WithLabelValues(injection.GetControllerName(ctx), "GetInstanceTypes", d.Name()))()
 	return d.CloudProvider.GetInstanceTypes(ctx, provisioner)
 }
+
+func (d *decorator) IsMachineDrifted(ctx context.Context, machine *v1alpha5.Machine) (bool, error) {
+	defer metrics.Measure(methodDurationHistogramVec.WithLabelValues(injection.GetControllerName(ctx), "IsMachineDrifted", d.Name()))()
+	return d.CloudProvider.IsMachineDrifted(ctx, machine)
+}
diff --git a/pkg/cloudprovider/types.go b/pkg/cloudprovider/types.go
index 6508c1c6b9..23a8c324a5 100644
--- a/pkg/cloudprovider/types.go
+++ b/pkg/cloudprovider/types.go
@@ -54,7 +54,7 @@ type CloudProvider interface {
 	// Delete removes a machine from the cloudprovider by its machine name
 	Delete(context.Context, *v1alpha5.Machine) error
 	// Get retrieves a machine from the cloudprovider by its machine name
-	Get(context.Context, string, string) (*v1alpha5.Machine, error)
+	Get(context.Context, string) (*v1alpha5.Machine, error)
 	// GetInstanceTypes returns instance types supported by the cloudprovider.
 	// Availability of types or zone may vary by provisioner or over time.  Regardless of
 	// availability, the GetInstanceTypes method should always return all instance types,
diff --git a/pkg/controllers/deprovisioning/controller.go b/pkg/controllers/deprovisioning/controller.go
index 3e0dff2cf9..c4b987af01 100644
--- a/pkg/controllers/deprovisioning/controller.go
+++ b/pkg/controllers/deprovisioning/controller.go
@@ -154,7 +154,7 @@ func (c *Controller) executeCommand(ctx context.Context, d Deprovisioner, comman
 	for _, oldNode := range command.nodesToRemove {
 		c.recorder.Publish(deprovisioningevents.TerminatingNode(oldNode, command.String()))
-		if err := c.kubeClient.Delete(ctx, oldNode); err != nil {
+		if err := c.kubeClient.Delete(ctx, oldNode); client.IgnoreNotFound(err) != nil {
 			logging.FromContext(ctx).Errorf("Deleting node, %s", err)
 		} else {
 			metrics.NodesTerminatedCounter.WithLabelValues(fmt.Sprintf("%s/%s", d, command.action)).Inc()
diff --git a/pkg/controllers/machine/controller.go b/pkg/controllers/machine/controller.go
index 416da34914..b2ec2fd5e1 100644
--- a/pkg/controllers/machine/controller.go
+++ b/pkg/controllers/machine/controller.go
@@ -75,8 +75,8 @@ func NewController(clk clock.Clock, kubeClient client.Client, cloudProvider clou
 		recorder:   recorder,
 		terminator: terminator,
 
-		garbageCollect: &GarbageCollect{kubeClient: kubeClient, cloudProvider: cloudProvider, lastChecked: cache.New(time.Minute*10, 1*time.Minute)},
-		launch:         &Launch{kubeClient: kubeClient, cloudProvider: cloudProvider},
+		garbageCollect: &GarbageCollect{kubeClient: kubeClient, cloudProvider: cloudProvider, lastChecked: cache.New(time.Minute*10, time.Second*10)},
+		launch:         &Launch{kubeClient: kubeClient, cloudProvider: cloudProvider, cache: cache.New(time.Minute, time.Second*10)},
 		registration:   &Registration{kubeClient: kubeClient},
 		initialization: &Initialization{kubeClient: kubeClient},
 		liveness:       &Liveness{clock: clk, kubeClient: kubeClient},
@@ -136,8 +136,10 @@ func (c *Controller) Finalize(ctx context.Context, machine *v1alpha5.Machine) (r
 		return reconcile.Result{}, nil
 	}
 	ctx = logging.WithLogger(ctx, logging.FromContext(ctx).With("provider-id", machine.Status.ProviderID))
-	if err := c.cloudProvider.Delete(ctx, machine); cloudprovider.IgnoreMachineNotFoundError(err) != nil {
-		return reconcile.Result{}, fmt.Errorf("terminating cloudprovider instance, %w", err)
+	if machine.Status.ProviderID != "" {
+		if err := c.cloudProvider.Delete(ctx, machine); cloudprovider.IgnoreMachineNotFoundError(err) != nil {
+			return reconcile.Result{}, fmt.Errorf("terminating cloudprovider instance, %w", err)
+		}
 	}
 	controllerutil.RemoveFinalizer(machine, v1alpha5.TerminationFinalizer)
 	if !equality.Semantic.DeepEqual(stored, machine) {
diff --git a/pkg/controllers/machine/garbagecollect.go b/pkg/controllers/machine/garbagecollect.go
index eedbcf4f6f..128a4021ed 100644
--- a/pkg/controllers/machine/garbagecollect.go
+++ b/pkg/controllers/machine/garbagecollect.go
@@ -43,7 +43,7 @@ func (e *GarbageCollect) Reconcile(ctx context.Context, machine *v1alpha5.Machin
 	if _, expireTime, ok := e.lastChecked.GetWithExpiration(client.ObjectKeyFromObject(machine).String()); ok {
 		return reconcile.Result{RequeueAfter: time.Until(expireTime)}, nil
 	}
-	if _, err := e.cloudProvider.Get(ctx, machine.Name, machine.Labels[v1alpha5.ProvisionerNameLabelKey]); cloudprovider.IsMachineNotFoundError(err) {
+	if _, err := e.cloudProvider.Get(ctx, machine.Status.ProviderID); cloudprovider.IsMachineNotFoundError(err) {
 		return reconcile.Result{}, client.IgnoreNotFound(e.kubeClient.Delete(ctx, machine))
 	}
 	e.lastChecked.SetDefault(client.ObjectKeyFromObject(machine).String(), nil)
diff --git a/pkg/controllers/machine/launch.go b/pkg/controllers/machine/launch.go
index 07b9c9574b..7a7d046d9f 100644
--- a/pkg/controllers/machine/launch.go
+++ b/pkg/controllers/machine/launch.go
@@ -18,6 +18,7 @@ import (
 	"context"
 	"fmt"
 
+	"github.com/patrickmn/go-cache"
 	"github.com/samber/lo"
 	"knative.dev/pkg/logging"
 	"sigs.k8s.io/controller-runtime/pkg/client"
@@ -25,42 +26,61 @@ import (
 
 	"github.com/aws/karpenter-core/pkg/apis/v1alpha5"
 	"github.com/aws/karpenter-core/pkg/cloudprovider"
+	"github.com/aws/karpenter-core/pkg/scheduling"
 )
 
 type Launch struct {
 	kubeClient    client.Client
 	cloudProvider cloudprovider.CloudProvider
+	cache         *cache.Cache // exists due to eventual consistency on the cache
 }
 
 func (l *Launch) Reconcile(ctx context.Context, machine *v1alpha5.Machine) (reconcile.Result, error) {
 	if machine.Status.ProviderID != "" {
 		return reconcile.Result{}, nil
 	}
-	retrieved, err := l.cloudProvider.Get(ctx, machine.Name, machine.Labels[v1alpha5.ProvisionerNameLabelKey])
-	if err != nil {
-		if cloudprovider.IsMachineNotFoundError(err) {
-			logging.FromContext(ctx).Debugf("creating machine")
-			retrieved, err = l.cloudProvider.Create(ctx, machine)
-			if err != nil {
-				if cloudprovider.IsInsufficientCapacityError(err) {
-					logging.FromContext(ctx).Error(err)
-					return reconcile.Result{}, client.IgnoreNotFound(l.kubeClient.Delete(ctx, machine))
+	var err error
+	var created *v1alpha5.Machine
+	if ret, ok := l.cache.Get(client.ObjectKeyFromObject(machine).String()); ok {
+		created = ret.(*v1alpha5.Machine)
+	} else if id, ok := machine.Annotations[v1alpha5.MachineLinkedAnnotationKey]; ok {
+		logging.FromContext(ctx).Debugf("linking machine")
+		created, err = l.cloudProvider.Get(ctx, id)
+		if err != nil {
+			if cloudprovider.IsMachineNotFoundError(err) {
+				if err = l.kubeClient.Delete(ctx, machine); err != nil {
+					return reconcile.Result{}, client.IgnoreNotFound(err)
 				}
-				return reconcile.Result{}, fmt.Errorf("creating machine, %w", err)
+				return reconcile.Result{}, nil
 			}
-		} else {
-			return reconcile.Result{}, fmt.Errorf("getting machine, %w", err)
+			return reconcile.Result{}, fmt.Errorf("linking machine, %w", err)
+		}
+	} else {
+		logging.FromContext(ctx).Debugf("creating machine")
+		created, err = l.cloudProvider.Create(ctx, machine)
+		if err != nil {
+			if cloudprovider.IsInsufficientCapacityError(err) {
+				logging.FromContext(ctx).Error(err)
+				return reconcile.Result{}, client.IgnoreNotFound(l.kubeClient.Delete(ctx, machine))
+			}
+			return reconcile.Result{}, fmt.Errorf("creating machine, %w", err)
 		}
 	}
-	populateMachineDetails(machine, retrieved)
+	l.cache.SetDefault(client.ObjectKeyFromObject(machine).String(), created)
+	populateMachineDetails(machine, created)
 	machine.StatusConditions().MarkTrue(v1alpha5.MachineCreated)
 	return reconcile.Result{}, nil
 }
 
 func populateMachineDetails(machine, retrieved *v1alpha5.Machine) {
-	machine.Labels = lo.Assign(machine.Labels, retrieved.Labels, map[string]string{
-		v1alpha5.MachineNameLabelKey: machine.Name,
-	})
+	machine.Labels = lo.Assign(
+		scheduling.NewNodeSelectorRequirements(machine.Spec.Requirements...).Labels(),
+		machine.Labels,
+		retrieved.Labels,
+		map[string]string{
+			v1alpha5.MachineNameLabelKey: machine.Name,
+		},
+	)
 	machine.Annotations = lo.Assign(machine.Annotations, retrieved.Annotations)
 	machine.Status.ProviderID = retrieved.Status.ProviderID
 	machine.Status.Allocatable = retrieved.Status.Allocatable
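The reworked `Launch` reconciler above caches the machine it created or linked, keyed by the Machine's object key, so a requeue that races the status patch does not call `Create` a second time for the same Machine. A rough sketch of that guard using the same `github.com/patrickmn/go-cache` package; `launchOnce` and its arguments are hypothetical helpers, not code from this PR:

```go
package main

import (
	"fmt"
	"time"

	"github.com/patrickmn/go-cache"
)

// launchOnce returns the cached result for key if a recent launch
// already happened; otherwise it invokes create and caches the result.
// This papers over eventual consistency: the stored Machine may not yet
// show a provider ID even though the instance already exists.
func launchOnce(c *cache.Cache, key string, create func() (string, error)) (string, error) {
	if v, ok := c.Get(key); ok {
		return v.(string), nil
	}
	id, err := create()
	if err != nil {
		return "", err
	}
	c.SetDefault(key, id)
	return id, nil
}

func main() {
	// One-minute TTL with a ten-second sweep, mirroring the values in the diff.
	c := cache.New(time.Minute, 10*time.Second)
	calls := 0
	create := func() (string, error) { calls++; return "fake:///i-123", nil }

	launchOnce(c, "default/machine-a", create)
	launchOnce(c, "default/machine-a", create) // served from cache
	fmt.Println(calls)                         // prints 1
}
```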
diff --git a/pkg/controllers/machine/launch_test.go b/pkg/controllers/machine/launch_test.go
index 6c6119f50f..76ee6eef96 100644
--- a/pkg/controllers/machine/launch_test.go
+++ b/pkg/controllers/machine/launch_test.go
@@ -49,12 +49,14 @@ var _ = Describe("Launch", func() {
 		ExpectApplied(ctx, env.Client, provisioner, machine)
 		ExpectReconcileSucceeded(ctx, machineController, client.ObjectKeyFromObject(machine))
 
+		machine = ExpectExists(ctx, env.Client, machine)
+
 		Expect(cloudProvider.CreateCalls).To(HaveLen(1))
 		Expect(cloudProvider.CreatedMachines).To(HaveLen(1))
-		_, err := cloudProvider.Get(ctx, machine.Name, "")
+		_, err := cloudProvider.Get(ctx, machine.Status.ProviderID)
 		Expect(err).ToNot(HaveOccurred())
 	})
-	It("should get an instance and hydrate the Machine when the Machine is already created", func() {
+	It("should add the MachineCreated status condition after creating the Machine", func() {
 		machine := test.Machine(v1alpha5.Machine{
 			ObjectMeta: metav1.ObjectMeta{
 				Labels: map[string]string{
@@ -63,9 +65,14 @@ var _ = Describe("Launch", func() {
 			},
 		})
 		ExpectApplied(ctx, env.Client, provisioner, machine)
+		ExpectReconcileSucceeded(ctx, machineController, client.ObjectKeyFromObject(machine))
+
+		machine = ExpectExists(ctx, env.Client, machine)
+		Expect(ExpectStatusConditionExists(machine, v1alpha5.MachineCreated).Status).To(Equal(v1.ConditionTrue))
+	})
+	It("should link an instance with the karpenter.sh/linked annotation", func() {
 		cloudProviderMachine := &v1alpha5.Machine{
 			ObjectMeta: metav1.ObjectMeta{
-				Name: machine.Name,
 				Labels: map[string]string{
 					v1.LabelInstanceTypeStable: "small-instance-type",
 					v1.LabelTopologyZone:       "test-zone-1a",
@@ -87,7 +94,18 @@
 				},
 			},
 		}
-		cloudProvider.CreatedMachines[machine.Name] = cloudProviderMachine
+		cloudProvider.CreatedMachines[cloudProviderMachine.Status.ProviderID] = cloudProviderMachine
+		machine := test.Machine(v1alpha5.Machine{
+			ObjectMeta: metav1.ObjectMeta{
+				Annotations: map[string]string{
+					v1alpha5.MachineLinkedAnnotationKey: cloudProviderMachine.Status.ProviderID,
+				},
+				Labels: map[string]string{
+					v1alpha5.ProvisionerNameLabelKey: provisioner.Name,
+				},
+			},
+		})
+		ExpectApplied(ctx, env.Client, provisioner, machine)
 		ExpectReconcileSucceeded(ctx, machineController, client.ObjectKeyFromObject(machine))
 
 		machine = ExpectExists(ctx, env.Client, machine)
@@ -101,20 +119,6 @@ var _ = Describe("Launch", func() {
 		Expect(machine.Labels).To(HaveKeyWithValue(v1.LabelTopologyRegion, "test-zone"))
 		Expect(machine.Labels).To(HaveKeyWithValue(v1alpha5.LabelCapacityType, v1alpha5.CapacityTypeSpot))
 	})
-	It("should add the MachineCreated status condition after creating the Machine", func() {
-		machine := test.Machine(v1alpha5.Machine{
-			ObjectMeta: metav1.ObjectMeta{
-				Labels: map[string]string{
-					v1alpha5.ProvisionerNameLabelKey: provisioner.Name,
-				},
-			},
-		})
-		ExpectApplied(ctx, env.Client, provisioner, machine)
-		ExpectReconcileSucceeded(ctx, machineController, client.ObjectKeyFromObject(machine))
-
-		machine = ExpectExists(ctx, env.Client, machine)
-		Expect(ExpectStatusConditionExists(machine, v1alpha5.MachineCreated).Status).To(Equal(v1.ConditionTrue))
-	})
 	It("should delete the machine if InsufficientCapacity is returned from the cloudprovider", func() {
 		cloudProvider.NextCreateErr = cloudprovider.NewInsufficientCapacityError(fmt.Errorf("all instance types were unavailable"))
 		machine := test.Machine()
diff --git a/pkg/controllers/machine/registration.go b/pkg/controllers/machine/registration.go
index 55c78fed9f..db1ecbc0a0 100644
--- a/pkg/controllers/machine/registration.go
+++ b/pkg/controllers/machine/registration.go
@@ -21,6 +21,7 @@ import (
 	"github.com/samber/lo"
 	v1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/api/equality"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"knative.dev/pkg/logging"
 	"sigs.k8s.io/controller-runtime/pkg/client"
 	"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
@@ -42,6 +43,7 @@ func (r *Registration) Reconcile(ctx context.Context, machine *v1alpha5.Machine)
 	if machine.StatusConditions().GetCondition(v1alpha5.MachineRegistered).IsTrue() {
 		return reconcile.Result{}, nil
 	}
+	ctx = logging.WithLogger(ctx, logging.FromContext(ctx).With("provider-id", machine.Status.ProviderID))
 
 	node, err := nodeForMachine(ctx, r.kubeClient, machine)
 	if err != nil {
@@ -66,15 +68,24 @@ func (r *Registration) Reconcile(ctx context.Context, machine *v1alpha5.Machine)
 func (r *Registration) syncNode(ctx context.Context, machine *v1alpha5.Machine, node *v1.Node) error {
 	stored := node.DeepCopy()
 	controllerutil.AddFinalizer(node, v1alpha5.TerminationFinalizer)
-	node.Labels = lo.Assign(node.Labels, machine.Labels)
-	node.Annotations = lo.Assign(node.Annotations, machine.Annotations)
+	lo.Must0(controllerutil.SetOwnerReference(machine, node, scheme.Scheme))
+	// Remove any provisioner owner references since we own them
+	node.OwnerReferences = lo.Reject(node.OwnerReferences, func(o metav1.OwnerReference, _ int) bool {
+		return o.Kind == "Provisioner"
+	})
 
-	// Sync all taints inside of Machine into the Machine taints
-	node.Spec.Taints = scheduling.Taints(node.Spec.Taints).Merge(machine.Spec.Taints)
+	// If the machine isn't registered as linked, then sync it
+	// This prevents us from messing with nodes that already exist and are scheduled
+	if _, ok := machine.Annotations[v1alpha5.MachineLinkedAnnotationKey]; !ok {
+		node.Labels = lo.Assign(node.Labels, machine.Labels)
+		node.Annotations = lo.Assign(node.Annotations, machine.Annotations)
+		// Sync all taints inside of Machine into the Machine taints
+		node.Spec.Taints = scheduling.Taints(node.Spec.Taints).Merge(machine.Spec.Taints)
+	}
+	node.Labels[v1alpha5.MachineNameLabelKey] = machine.Labels[v1alpha5.MachineNameLabelKey]
 	if !machine.StatusConditions().GetCondition(v1alpha5.MachineRegistered).IsTrue() {
 		node.Spec.Taints = scheduling.Taints(node.Spec.Taints).Merge(machine.Spec.StartupTaints)
 	}
-	lo.Must0(controllerutil.SetOwnerReference(machine, node, scheme.Scheme))
 	if !equality.Semantic.DeepEqual(stored, node) {
 		if err := r.kubeClient.Patch(ctx, node, client.MergeFrom(stored)); err != nil {
 			return fmt.Errorf("syncing node labels, %w", err)
diff --git a/pkg/operator/controller/singleton.go b/pkg/operator/controller/singleton.go
index 3a5fc7ba83..cc3c5a1ec2 100644
--- a/pkg/operator/controller/singleton.go
+++ b/pkg/operator/controller/singleton.go
@@ -16,6 +16,7 @@ package controller
 
 import (
 	"context"
+	"strings"
 	"time"
 
 	"github.com/prometheus/client_golang/prometheus"
@@ -68,7 +69,7 @@ func newSingletonMetrics(name string) *singletonMetrics {
 		reconcileDuration: prometheus.NewHistogram(
 			prometheus.HistogramOpts{
 				Namespace: metrics.Namespace,
-				Subsystem: name,
+				Subsystem: strings.ReplaceAll(name, ".", "_"),
 				Name:      "reconcile_time_seconds",
 				Help:      "Length of time per reconcile.",
 				Buckets:   metrics.DurationBuckets(),
@@ -77,7 +78,7 @@ func newSingletonMetrics(name string) *singletonMetrics {
 		reconcileErrors: prometheus.NewCounter(
 			prometheus.CounterOpts{
 				Namespace: metrics.Namespace,
-				Subsystem: name,
+				Subsystem: strings.ReplaceAll(name, ".", "_"),
 				Name:      "reconcile_errors_total",
 				Help:      "Total number of reconcile errors.",
 			},