Skip to content

Commit

Permalink
Staging changes for linking machines through discovery
Browse files Browse the repository at this point in the history
  • Loading branch information
jonathan-innis committed Feb 14, 2023
1 parent 874ad56 commit eba929c
Show file tree
Hide file tree
Showing 13 changed files with 103 additions and 58 deletions.
2 changes: 1 addition & 1 deletion pkg/apis/crds/karpenter.sh_machines.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
annotations:
controller-gen.kubebuilder.io/version: v0.11.2
controller-gen.kubebuilder.io/version: v0.11.3
creationTimestamp: null
name: machines.karpenter.sh
spec:
Expand Down
2 changes: 1 addition & 1 deletion pkg/apis/crds/karpenter.sh_provisioners.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
annotations:
controller-gen.kubebuilder.io/version: v0.11.2
controller-gen.kubebuilder.io/version: v0.11.3
creationTimestamp: null
name: provisioners.karpenter.sh
spec:
Expand Down
2 changes: 2 additions & 0 deletions pkg/apis/v1alpha5/labels.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ const (
// Label keys in the karpenter.sh API group (Group), applied by Karpenter's
// controllers. ManagedByLabelKey is new in this change and supports linking
// machines discovered at the cloudprovider back to Karpenter.
const (
	ProvisionerNameLabelKey = Group + "/provisioner-name"
	MachineNameLabelKey     = Group + "/machine-name"
	ManagedByLabelKey       = Group + "/managed-by"
	LabelNodeInitialized    = Group + "/initialized"
	LabelCapacityType       = Group + "/capacity-type"
)
Expand All @@ -44,6 +45,7 @@ const (
DoNotConsolidateNodeAnnotationKey = Group + "/do-not-consolidate"
EmptinessTimestampAnnotationKey = Group + "/emptiness-timestamp"
VoluntaryDisruptionAnnotationKey = Group + "/voluntary-disruption"
MachineLinkedAnnotationKey = Group + "/linked"

ProviderCompatabilityAnnotationKey = CompatabilityGroup + "/provider"

Expand Down
12 changes: 6 additions & 6 deletions pkg/cloudprovider/fake/cloudprovider.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,18 +126,18 @@ func (c *CloudProvider) Create(ctx context.Context, machine *v1alpha5.Machine) (
Allocatable: functional.FilterMap(instanceType.Allocatable(), func(_ v1.ResourceName, v resource.Quantity) bool { return !resources.IsZero(v) }),
},
}
c.CreatedMachines[machine.Name] = created
c.CreatedMachines[created.Status.ProviderID] = created
return created, nil
}

func (c *CloudProvider) Get(_ context.Context, machineName string, _ string) (*v1alpha5.Machine, error) {
func (c *CloudProvider) Get(_ context.Context, id string) (*v1alpha5.Machine, error) {
c.mu.RLock()
defer c.mu.RUnlock()

if machine, ok := c.CreatedMachines[machineName]; ok {
if machine, ok := c.CreatedMachines[id]; ok {
return machine.DeepCopy(), nil
}
return nil, cloudprovider.NewMachineNotFoundError(fmt.Errorf("no machine exists with name '%s'", machineName))
return nil, cloudprovider.NewMachineNotFoundError(fmt.Errorf("no machine exists with id '%s'", id))
}

func (c *CloudProvider) GetInstanceTypes(_ context.Context, _ *v1alpha5.Provisioner) ([]*cloudprovider.InstanceType, error) {
Expand Down Expand Up @@ -188,8 +188,8 @@ func (c *CloudProvider) Delete(_ context.Context, m *v1alpha5.Machine) error {
c.mu.Lock()
defer c.mu.Unlock()

if _, ok := c.CreatedMachines[m.Name]; ok {
delete(c.CreatedMachines, m.Name)
if _, ok := c.CreatedMachines[m.Status.ProviderID]; ok {
delete(c.CreatedMachines, m.Status.ProviderID)
return nil
}
return cloudprovider.NewMachineNotFoundError(fmt.Errorf("no machine exists with name '%s'", m.Name))
Expand Down
9 changes: 7 additions & 2 deletions pkg/cloudprovider/metrics/cloudprovider.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,12 +76,17 @@ func (d *decorator) Delete(ctx context.Context, machine *v1alpha5.Machine) error
return d.CloudProvider.Delete(ctx, machine)
}

func (d *decorator) Get(ctx context.Context, machineName, provisionerName string) (*v1alpha5.Machine, error) {
func (d *decorator) Get(ctx context.Context, id string) (*v1alpha5.Machine, error) {
defer metrics.Measure(methodDurationHistogramVec.WithLabelValues(injection.GetControllerName(ctx), "Get", d.Name()))()
return d.CloudProvider.Get(ctx, machineName, provisionerName)
return d.CloudProvider.Get(ctx, id)
}

// GetInstanceTypes observes the call's duration in the method-duration
// histogram (labeled with the controller name and cloudprovider name),
// then delegates to the wrapped CloudProvider.
func (d *decorator) GetInstanceTypes(ctx context.Context, provisioner *v1alpha5.Provisioner) ([]*cloudprovider.InstanceType, error) {
	observe := metrics.Measure(methodDurationHistogramVec.WithLabelValues(injection.GetControllerName(ctx), "GetInstanceTypes", d.Name()))
	defer observe()
	return d.CloudProvider.GetInstanceTypes(ctx, provisioner)
}

// IsMachineDrifted observes the call's duration in the method-duration
// histogram (labeled with the controller name and cloudprovider name),
// then delegates drift detection to the wrapped CloudProvider.
func (d *decorator) IsMachineDrifted(ctx context.Context, machine *v1alpha5.Machine) (bool, error) {
	observe := metrics.Measure(methodDurationHistogramVec.WithLabelValues(injection.GetControllerName(ctx), "IsMachineDrifted", d.Name()))
	defer observe()
	return d.CloudProvider.IsMachineDrifted(ctx, machine)
}
2 changes: 1 addition & 1 deletion pkg/cloudprovider/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ type CloudProvider interface {
// Delete removes a machine from the cloudprovider by its machine name
Delete(context.Context, *v1alpha5.Machine) error
// Get retrieves a machine from the cloudprovider by its provider id
Get(context.Context, string, string) (*v1alpha5.Machine, error)
Get(context.Context, string) (*v1alpha5.Machine, error)
// GetInstanceTypes returns instance types supported by the cloudprovider.
// Availability of types or zone may vary by provisioner or over time. Regardless of
// availability, the GetInstanceTypes method should always return all instance types,
Expand Down
2 changes: 1 addition & 1 deletion pkg/controllers/deprovisioning/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ func (c *Controller) executeCommand(ctx context.Context, d Deprovisioner, comman

for _, oldNode := range command.nodesToRemove {
c.recorder.Publish(deprovisioningevents.TerminatingNode(oldNode, command.String()))
if err := c.kubeClient.Delete(ctx, oldNode); err != nil {
if err := c.kubeClient.Delete(ctx, oldNode); client.IgnoreNotFound(err) != nil {
logging.FromContext(ctx).Errorf("Deleting node, %s", err)
} else {
metrics.NodesTerminatedCounter.WithLabelValues(fmt.Sprintf("%s/%s", d, command.action)).Inc()
Expand Down
10 changes: 6 additions & 4 deletions pkg/controllers/machine/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,8 @@ func NewController(clk clock.Clock, kubeClient client.Client, cloudProvider clou
recorder: recorder,
terminator: terminator,

garbageCollect: &GarbageCollect{kubeClient: kubeClient, cloudProvider: cloudProvider, lastChecked: cache.New(time.Minute*10, 1*time.Minute)},
launch: &Launch{kubeClient: kubeClient, cloudProvider: cloudProvider},
garbageCollect: &GarbageCollect{kubeClient: kubeClient, cloudProvider: cloudProvider, lastChecked: cache.New(time.Minute*10, time.Second*10)},
launch: &Launch{kubeClient: kubeClient, cloudProvider: cloudProvider, cache: cache.New(time.Minute, time.Second*10)},
registration: &Registration{kubeClient: kubeClient},
initialization: &Initialization{kubeClient: kubeClient},
liveness: &Liveness{clock: clk, kubeClient: kubeClient},
Expand Down Expand Up @@ -136,8 +136,10 @@ func (c *Controller) Finalize(ctx context.Context, machine *v1alpha5.Machine) (r
return reconcile.Result{}, nil
}
ctx = logging.WithLogger(ctx, logging.FromContext(ctx).With("provider-id", machine.Status.ProviderID))
if err := c.cloudProvider.Delete(ctx, machine); cloudprovider.IgnoreMachineNotFoundError(err) != nil {
return reconcile.Result{}, fmt.Errorf("terminating cloudprovider instance, %w", err)
if machine.Status.ProviderID != "" {
if err := c.cloudProvider.Delete(ctx, machine); cloudprovider.IgnoreMachineNotFoundError(err) != nil {
return reconcile.Result{}, fmt.Errorf("terminating cloudprovider instance, %w", err)
}
}
controllerutil.RemoveFinalizer(machine, v1alpha5.TerminationFinalizer)
if !equality.Semantic.DeepEqual(stored, machine) {
Expand Down
2 changes: 1 addition & 1 deletion pkg/controllers/machine/garbagecollect.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func (e *GarbageCollect) Reconcile(ctx context.Context, machine *v1alpha5.Machin
if _, expireTime, ok := e.lastChecked.GetWithExpiration(client.ObjectKeyFromObject(machine).String()); ok {
return reconcile.Result{RequeueAfter: time.Until(expireTime)}, nil
}
if _, err := e.cloudProvider.Get(ctx, machine.Name, machine.Labels[v1alpha5.ProvisionerNameLabelKey]); cloudprovider.IsMachineNotFoundError(err) {
if _, err := e.cloudProvider.Get(ctx, machine.Status.ProviderID); cloudprovider.IsMachineNotFoundError(err) {
return reconcile.Result{}, client.IgnoreNotFound(e.kubeClient.Delete(ctx, machine))
}
e.lastChecked.SetDefault(client.ObjectKeyFromObject(machine).String(), nil)
Expand Down
52 changes: 36 additions & 16 deletions pkg/controllers/machine/launch.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,49 +18,69 @@ import (
"context"
"fmt"

"github.com/patrickmn/go-cache"
"github.com/samber/lo"
"knative.dev/pkg/logging"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/reconcile"

"github.com/aws/karpenter-core/pkg/apis/v1alpha5"
"github.com/aws/karpenter-core/pkg/cloudprovider"
"github.com/aws/karpenter-core/pkg/scheduling"
)

// Launch reconciles Machines that do not yet have a provider ID: it links the
// Machine to an existing cloudprovider instance (via the linked annotation) or
// creates a new instance for it.
type Launch struct {
	kubeClient    client.Client
	cloudProvider cloudprovider.CloudProvider
	cache         *cache.Cache // memoizes the created/linked machine per reconcile key, since the cloudprovider is eventually consistent
}

func (l *Launch) Reconcile(ctx context.Context, machine *v1alpha5.Machine) (reconcile.Result, error) {
if machine.Status.ProviderID != "" {
return reconcile.Result{}, nil
}
retrieved, err := l.cloudProvider.Get(ctx, machine.Name, machine.Labels[v1alpha5.ProvisionerNameLabelKey])
if err != nil {
if cloudprovider.IsMachineNotFoundError(err) {
logging.FromContext(ctx).Debugf("creating machine")
retrieved, err = l.cloudProvider.Create(ctx, machine)
if err != nil {
if cloudprovider.IsInsufficientCapacityError(err) {
logging.FromContext(ctx).Error(err)
return reconcile.Result{}, client.IgnoreNotFound(l.kubeClient.Delete(ctx, machine))
var err error
var created *v1alpha5.Machine
if ret, ok := l.cache.Get(client.ObjectKeyFromObject(machine).String()); ok {
created = ret.(*v1alpha5.Machine)
} else if id, ok := machine.Annotations[v1alpha5.MachineLinkedAnnotationKey]; ok {
logging.FromContext(ctx).Debugf("linking machine")
created, err = l.cloudProvider.Get(ctx, id)
if err != nil {
if cloudprovider.IsMachineNotFoundError(err) {
if err = l.kubeClient.Delete(ctx, machine); err != nil {
return reconcile.Result{}, client.IgnoreNotFound(err)
}
return reconcile.Result{}, fmt.Errorf("creating machine, %w", err)
return reconcile.Result{}, nil
}
} else {
return reconcile.Result{}, fmt.Errorf("getting machine, %w", err)
return reconcile.Result{}, fmt.Errorf("linking machine, %w", err)
}
} else {
logging.FromContext(ctx).Debugf("creating machine")
created, err = l.cloudProvider.Create(ctx, machine)
if err != nil {
if cloudprovider.IsInsufficientCapacityError(err) {
logging.FromContext(ctx).Error(err)
return reconcile.Result{}, client.IgnoreNotFound(l.kubeClient.Delete(ctx, machine))
}
return reconcile.Result{}, fmt.Errorf("creating machine, %w", err)
}
}
populateMachineDetails(machine, retrieved)
l.cache.SetDefault(client.ObjectKeyFromObject(machine).String(), created)
populateMachineDetails(machine, created)
machine.StatusConditions().MarkTrue(v1alpha5.MachineCreated)
return reconcile.Result{}, nil
}

func populateMachineDetails(machine, retrieved *v1alpha5.Machine) {
machine.Labels = lo.Assign(machine.Labels, retrieved.Labels, map[string]string{
v1alpha5.MachineNameLabelKey: machine.Name,
})
machine.Labels = lo.Assign(
scheduling.NewNodeSelectorRequirements(machine.Spec.Requirements...).Labels(),
machine.Labels,
retrieved.Labels,
map[string]string{
v1alpha5.MachineNameLabelKey: machine.Name,
},
)
machine.Annotations = lo.Assign(machine.Annotations, retrieved.Annotations)
machine.Status.ProviderID = retrieved.Status.ProviderID
machine.Status.Allocatable = retrieved.Status.Allocatable
Expand Down
40 changes: 22 additions & 18 deletions pkg/controllers/machine/launch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,12 +49,14 @@ var _ = Describe("Launch", func() {
ExpectApplied(ctx, env.Client, provisioner, machine)
ExpectReconcileSucceeded(ctx, machineController, client.ObjectKeyFromObject(machine))

machine = ExpectExists(ctx, env.Client, machine)

Expect(cloudProvider.CreateCalls).To(HaveLen(1))
Expect(cloudProvider.CreatedMachines).To(HaveLen(1))
_, err := cloudProvider.Get(ctx, machine.Name, "")
_, err := cloudProvider.Get(ctx, machine.Status.ProviderID)
Expect(err).ToNot(HaveOccurred())
})
It("should get an instance and hydrate the Machine when the Machine is already created", func() {
It("should add the MachineCreated status condition after creating the Machine", func() {
machine := test.Machine(v1alpha5.Machine{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{
Expand All @@ -63,9 +65,14 @@ var _ = Describe("Launch", func() {
},
})
ExpectApplied(ctx, env.Client, provisioner, machine)
ExpectReconcileSucceeded(ctx, machineController, client.ObjectKeyFromObject(machine))

machine = ExpectExists(ctx, env.Client, machine)
Expect(ExpectStatusConditionExists(machine, v1alpha5.MachineCreated).Status).To(Equal(v1.ConditionTrue))
})
It("should link an instance with the karpenter.sh/linked annotation", func() {
cloudProviderMachine := &v1alpha5.Machine{
ObjectMeta: metav1.ObjectMeta{
Name: machine.Name,
Labels: map[string]string{
v1.LabelInstanceTypeStable: "small-instance-type",
v1.LabelTopologyZone: "test-zone-1a",
Expand All @@ -87,7 +94,18 @@ var _ = Describe("Launch", func() {
},
},
}
cloudProvider.CreatedMachines[machine.Name] = cloudProviderMachine
cloudProvider.CreatedMachines[cloudProviderMachine.Status.ProviderID] = cloudProviderMachine
machine := test.Machine(v1alpha5.Machine{
ObjectMeta: metav1.ObjectMeta{
Annotations: map[string]string{
v1alpha5.MachineLinkedAnnotationKey: cloudProviderMachine.Status.ProviderID,
},
Labels: map[string]string{
v1alpha5.ProvisionerNameLabelKey: provisioner.Name,
},
},
})
ExpectApplied(ctx, env.Client, provisioner, machine)
ExpectReconcileSucceeded(ctx, machineController, client.ObjectKeyFromObject(machine))

machine = ExpectExists(ctx, env.Client, machine)
Expand All @@ -101,20 +119,6 @@ var _ = Describe("Launch", func() {
Expect(machine.Labels).To(HaveKeyWithValue(v1.LabelTopologyRegion, "test-zone"))
Expect(machine.Labels).To(HaveKeyWithValue(v1alpha5.LabelCapacityType, v1alpha5.CapacityTypeSpot))
})
It("should add the MachineCreated status condition after creating the Machine", func() {
machine := test.Machine(v1alpha5.Machine{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{
v1alpha5.ProvisionerNameLabelKey: provisioner.Name,
},
},
})
ExpectApplied(ctx, env.Client, provisioner, machine)
ExpectReconcileSucceeded(ctx, machineController, client.ObjectKeyFromObject(machine))

machine = ExpectExists(ctx, env.Client, machine)
Expect(ExpectStatusConditionExists(machine, v1alpha5.MachineCreated).Status).To(Equal(v1.ConditionTrue))
})
It("should delete the machine if InsufficientCapacity is returned from the cloudprovider", func() {
cloudProvider.NextCreateErr = cloudprovider.NewInsufficientCapacityError(fmt.Errorf("all instance types were unavailable"))
machine := test.Machine()
Expand Down
21 changes: 16 additions & 5 deletions pkg/controllers/machine/registration.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/samber/lo"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/equality"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"knative.dev/pkg/logging"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
Expand All @@ -42,6 +43,7 @@ func (r *Registration) Reconcile(ctx context.Context, machine *v1alpha5.Machine)
if machine.StatusConditions().GetCondition(v1alpha5.MachineRegistered).IsTrue() {
return reconcile.Result{}, nil
}

ctx = logging.WithLogger(ctx, logging.FromContext(ctx).With("provider-id", machine.Status.ProviderID))
node, err := nodeForMachine(ctx, r.kubeClient, machine)
if err != nil {
Expand All @@ -66,15 +68,24 @@ func (r *Registration) Reconcile(ctx context.Context, machine *v1alpha5.Machine)
func (r *Registration) syncNode(ctx context.Context, machine *v1alpha5.Machine, node *v1.Node) error {
stored := node.DeepCopy()
controllerutil.AddFinalizer(node, v1alpha5.TerminationFinalizer)
node.Labels = lo.Assign(node.Labels, machine.Labels)
node.Annotations = lo.Assign(node.Annotations, machine.Annotations)
lo.Must0(controllerutil.SetOwnerReference(machine, node, scheme.Scheme))
// Remove any provisioner owner references since we own them
node.OwnerReferences = lo.Reject(node.OwnerReferences, func(o metav1.OwnerReference, _ int) bool {
return o.Kind == "Provisioner"
})

// Sync all taints inside of Machine into the Machine taints
node.Spec.Taints = scheduling.Taints(node.Spec.Taints).Merge(machine.Spec.Taints)
// If the machine isn't registered as linked, then sync it
// This prevents us from messing with nodes that already exist and are scheduled
if _, ok := machine.Annotations[v1alpha5.MachineLinkedAnnotationKey]; !ok {
node.Labels = lo.Assign(node.Labels, machine.Labels)
node.Annotations = lo.Assign(node.Annotations, machine.Annotations)
// Sync all taints from the Machine into the Node's taints
node.Spec.Taints = scheduling.Taints(node.Spec.Taints).Merge(machine.Spec.Taints)
}
node.Labels[v1alpha5.MachineNameLabelKey] = machine.Labels[v1alpha5.MachineNameLabelKey]
if !machine.StatusConditions().GetCondition(v1alpha5.MachineRegistered).IsTrue() {
node.Spec.Taints = scheduling.Taints(node.Spec.Taints).Merge(machine.Spec.StartupTaints)
}
lo.Must0(controllerutil.SetOwnerReference(machine, node, scheme.Scheme))
if !equality.Semantic.DeepEqual(stored, node) {
if err := r.kubeClient.Patch(ctx, node, client.MergeFrom(stored)); err != nil {
return fmt.Errorf("syncing node labels, %w", err)
Expand Down
5 changes: 3 additions & 2 deletions pkg/operator/controller/singleton.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package controller

import (
"context"
"strings"
"time"

"github.com/prometheus/client_golang/prometheus"
Expand Down Expand Up @@ -68,7 +69,7 @@ func newSingletonMetrics(name string) *singletonMetrics {
reconcileDuration: prometheus.NewHistogram(
prometheus.HistogramOpts{
Namespace: metrics.Namespace,
Subsystem: name,
Subsystem: strings.ReplaceAll(name, ".", "_"),
Name: "reconcile_time_seconds",
Help: "Length of time per reconcile.",
Buckets: metrics.DurationBuckets(),
Expand All @@ -77,7 +78,7 @@ func newSingletonMetrics(name string) *singletonMetrics {
reconcileErrors: prometheus.NewCounter(
prometheus.CounterOpts{
Namespace: metrics.Namespace,
Subsystem: name,
Subsystem: strings.ReplaceAll(name, ".", "_"),
Name: "reconcile_errors_total",
Help: "Total number of reconcile errors.",
},
Expand Down

0 comments on commit eba929c

Please sign in to comment.