Skip to content

Commit

Permalink
feat: Add machine controller for v1alpha5.Machine reconciliation (#167)
Browse files Browse the repository at this point in the history

* Add cluster name label key for machine tagging

* Initial commit for machine controller

* Add cleanup logic and label/annotation sync

* Add termination flow to machine

* Return new versions of machines from cloudprovider calls

* Break up machine into sub-reconcilers

* Add liveness check if node hasn't registered within the registration TTL

* Add syncing for startup taints

* Add testing for machine controller

* PR comments
  • Loading branch information
jonathan-innis authored Jan 26, 2023
1 parent 8e8c049 commit 3809636
Show file tree
Hide file tree
Showing 24 changed files with 1,614 additions and 151 deletions.
4 changes: 2 additions & 2 deletions charts/karpenter-core/templates/clusterrole.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ metadata:
rules:
# Read
- apiGroups: ["karpenter.sh"]
resources: ["provisioners", "provisioners/status"]
resources: ["provisioners", "provisioners/status", "machines", "machines/status"]
verbs: ["get", "list", "watch"]
- apiGroups: [""]
resources: ["pods", "nodes", "persistentvolumes", "persistentvolumeclaims", "replicationcontrollers", "namespaces"]
Expand All @@ -49,7 +49,7 @@ rules:
verbs: [ "get", "list", "watch" ]
# Write
- apiGroups: ["karpenter.sh"]
resources: ["provisioners/status"]
resources: ["provisioners/status", "machines", "machines/status"]
verbs: ["create", "delete", "patch"]
- apiGroups: [""]
resources: ["events"]
Expand Down
9 changes: 8 additions & 1 deletion pkg/apis/crds/karpenter.sh_machines.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,14 @@ spec:
singular: machine
scope: Cluster
versions:
- name: v1alpha5
- additionalPrinterColumns:
- jsonPath: .status.conditions[?(@.type=="Ready")].status
name: Ready
type: string
- jsonPath: .metadata.creationTimestamp
name: Age
type: date
name: v1alpha5
schema:
openAPIV3Schema:
description: Machine is the Schema for the Machines API
Expand Down
2 changes: 2 additions & 0 deletions pkg/apis/v1alpha5/machine.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,8 @@ type ResourceRequirements struct {
// +kubebuilder:object:root=true
// +kubebuilder:resource:path=machines,scope=Cluster,categories=karpenter
// +kubebuilder:subresource:status
// +kubebuilder:printcolumn:name="Ready",type="string",JSONPath=".status.conditions[?(@.type==\"Ready\")].status",description=""
// +kubebuilder:printcolumn:name="Age",type="date",JSONPath=".metadata.creationTimestamp",description=""
type Machine struct {
metav1.TypeMeta `json:",inline"`
metav1.ObjectMeta `json:"metadata,omitempty"`
Expand Down
52 changes: 35 additions & 17 deletions pkg/cloudprovider/fake/cloudprovider.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,17 +42,17 @@ type CloudProvider struct {
InstanceTypes []*cloudprovider.InstanceType

// CreateCalls contains the arguments for every create call that was made since it was cleared
mu sync.Mutex
mu sync.RWMutex
CreateCalls []*v1alpha5.Machine
AllowedCreateCalls int
CreatedMachines map[string]*v1alpha5.Machine
Drifted bool
}

var _ cloudprovider.CloudProvider = (*CloudProvider)(nil)

func NewCloudProvider() *CloudProvider {
return &CloudProvider{
AllowedCreateCalls: math.MaxInt,
CreatedMachines: map[string]*v1alpha5.Machine{},
}
}

Expand All @@ -61,26 +61,29 @@ func (c *CloudProvider) Reset() {
c.mu.Lock()
defer c.mu.Unlock()
c.CreateCalls = []*v1alpha5.Machine{}
c.CreatedMachines = map[string]*v1alpha5.Machine{}
c.AllowedCreateCalls = math.MaxInt
}

func (c *CloudProvider) Create(ctx context.Context, machine *v1alpha5.Machine) (*v1alpha5.Machine, error) {
c.mu.Lock()
defer c.mu.Unlock()

c.CreateCalls = append(c.CreateCalls, machine)
if len(c.CreateCalls) > c.AllowedCreateCalls {
c.mu.Unlock()
return &v1alpha5.Machine{}, fmt.Errorf("erroring as number of AllowedCreateCalls has been exceeded")
}
c.mu.Unlock()

requirements := scheduling.NewNodeSelectorRequirements(machine.Spec.Requirements...)
reqs := scheduling.NewNodeSelectorRequirements(machine.Spec.Requirements...)
instanceTypes := lo.Filter(lo.Must(c.GetInstanceTypes(ctx, &v1alpha5.Provisioner{})), func(i *cloudprovider.InstanceType, _ int) bool {
return requirements.Get(v1.LabelInstanceTypeStable).Has(i.Name)
return reqs.Compatible(i.Requirements) == nil &&
len(i.Offerings.Requirements(reqs).Available()) > 0 &&
resources.Fits(resources.Merge(machine.Spec.Resources.Requests, i.Overhead.Total()), i.Capacity)
})
// Order instance types so that we get the cheapest instance types of the available offerings
sort.Slice(instanceTypes, func(i, j int) bool {
iOfferings := instanceTypes[i].Offerings.Available().Requirements(requirements)
jOfferings := instanceTypes[j].Offerings.Available().Requirements(requirements)
iOfferings := instanceTypes[i].Offerings.Available().Requirements(reqs)
jOfferings := instanceTypes[j].Offerings.Available().Requirements(reqs)
return iOfferings.Cheapest().Price < jOfferings.Cheapest().Price
})
instanceType := instanceTypes[0]
Expand All @@ -93,7 +96,7 @@ func (c *CloudProvider) Create(ctx context.Context, machine *v1alpha5.Machine) (
}
// Find Offering
for _, o := range instanceType.Offerings.Available() {
if requirements.Compatible(scheduling.NewRequirements(
if reqs.Compatible(scheduling.NewRequirements(
scheduling.NewRequirement(v1.LabelTopologyZone, v1.NodeSelectorOpIn, o.Zone),
scheduling.NewRequirement(v1alpha5.LabelCapacityType, v1.NodeSelectorOpIn, o.CapacityType),
)) == nil {
Expand All @@ -103,22 +106,30 @@ func (c *CloudProvider) Create(ctx context.Context, machine *v1alpha5.Machine) (
}
}
name := test.RandomName()
return &v1alpha5.Machine{
created := &v1alpha5.Machine{
ObjectMeta: metav1.ObjectMeta{
Name: name,
Labels: labels,
},
Spec: *machine.Spec.DeepCopy(),
Status: v1alpha5.MachineStatus{
ProviderID: fmt.Sprintf("fake://%s", name),
ProviderID: test.ProviderID(name),
Capacity: functional.FilterMap(instanceType.Capacity, func(_ v1.ResourceName, v resource.Quantity) bool { return !resources.IsZero(v) }),
Allocatable: functional.FilterMap(instanceType.Allocatable(), func(_ v1.ResourceName, v resource.Quantity) bool { return !resources.IsZero(v) }),
},
}, nil
}
c.CreatedMachines[machine.Name] = created
return created, nil
}

func (c *CloudProvider) Get(context.Context, string, string) (*v1alpha5.Machine, error) {
return nil, nil
func (c *CloudProvider) Get(_ context.Context, machineName string, _ string) (*v1alpha5.Machine, error) {
c.mu.RLock()
defer c.mu.RUnlock()

if machine, ok := c.CreatedMachines[machineName]; ok {
return machine.DeepCopy(), nil
}
return nil, cloudprovider.NewMachineNotFoundError(fmt.Errorf("no machine exists with name '%s'", machineName))
}

func (c *CloudProvider) GetInstanceTypes(_ context.Context, _ *v1alpha5.Provisioner) ([]*cloudprovider.InstanceType, error) {
Expand Down Expand Up @@ -165,8 +176,15 @@ func (c *CloudProvider) GetInstanceTypes(_ context.Context, _ *v1alpha5.Provisio
}, nil
}

func (c *CloudProvider) Delete(context.Context, *v1alpha5.Machine) error {
return nil
func (c *CloudProvider) Delete(_ context.Context, m *v1alpha5.Machine) error {
c.mu.Lock()
defer c.mu.Unlock()

if _, ok := c.CreatedMachines[m.Name]; ok {
delete(c.CreatedMachines, m.Name)
return nil
}
return cloudprovider.NewMachineNotFoundError(fmt.Errorf("no machine exists with name '%s'", m.Name))
}

func (c *CloudProvider) IsMachineDrifted(context.Context, *v1alpha5.Machine) (bool, error) {
Expand Down
16 changes: 8 additions & 8 deletions pkg/controllers/controllers.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/aws/karpenter-core/pkg/controllers/counter"
"github.com/aws/karpenter-core/pkg/controllers/deprovisioning"
"github.com/aws/karpenter-core/pkg/controllers/inflightchecks"
"github.com/aws/karpenter-core/pkg/controllers/machine/terminator"
metricspod "github.com/aws/karpenter-core/pkg/controllers/metrics/pod"
metricsprovisioner "github.com/aws/karpenter-core/pkg/controllers/metrics/provisioner"
metricsstate "github.com/aws/karpenter-core/pkg/controllers/metrics/state"
Expand All @@ -36,7 +37,6 @@ import (
"github.com/aws/karpenter-core/pkg/events"
"github.com/aws/karpenter-core/pkg/metrics"
"github.com/aws/karpenter-core/pkg/operator/controller"
"github.com/aws/karpenter-core/pkg/operator/settingsstore"
)

func init() {
Expand All @@ -49,25 +49,25 @@ func NewControllers(
kubeClient client.Client,
kubernetesInterface kubernetes.Interface,
cluster *state.Cluster,
eventRecorder events.Recorder,
settingsStore settingsstore.Store,
recorder events.Recorder,
cloudProvider cloudprovider.CloudProvider,
) []controller.Controller {
provisioner := provisioning.NewProvisioner(ctx, kubeClient, kubernetesInterface.CoreV1(), eventRecorder, cloudProvider, cluster)

provisioner := provisioning.NewProvisioner(ctx, kubeClient, kubernetesInterface.CoreV1(), recorder, cloudProvider, cluster)
terminator := terminator.NewTerminator(clock, kubeClient, cloudProvider, terminator.NewEvictionQueue(ctx, kubernetesInterface.CoreV1(), recorder))
return []controller.Controller{
provisioner,
metricsstate.NewController(cluster),
deprovisioning.NewController(clock, kubeClient, provisioner, cloudProvider, eventRecorder, cluster),
provisioning.NewController(kubeClient, provisioner, eventRecorder),
deprovisioning.NewController(clock, kubeClient, provisioner, cloudProvider, recorder, cluster),
provisioning.NewController(kubeClient, provisioner, recorder),
informer.NewNodeController(kubeClient, cluster),
informer.NewPodController(kubeClient, cluster),
informer.NewProvisionerController(kubeClient, cluster),
node.NewController(clock, kubeClient, cloudProvider, cluster),
termination.NewController(clock, kubeClient, termination.NewEvictionQueue(ctx, kubernetesInterface.CoreV1(), eventRecorder), eventRecorder, cloudProvider),
termination.NewController(kubeClient, terminator, recorder),
metricspod.NewController(kubeClient),
metricsprovisioner.NewController(kubeClient),
counter.NewController(kubeClient, cluster),
inflightchecks.NewController(clock, kubeClient, eventRecorder, cloudProvider),
inflightchecks.NewController(clock, kubeClient, recorder, cloudProvider),
}
}
Loading

0 comments on commit 3809636

Please sign in to comment.