feat: Machine Migration (#273)
* Revert "Revert machine migration changes (#176) (#241)"

This reverts commit 9973eac.

* Change owner reference to set blockOwnerDeletion (see the sketch after this list)

* Remove string logging for machine launch

* Remove the FailedInit check since the machine status condition captures it

* Update eventing on machines
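The blockOwnerDeletion change above refers to the OwnerReference that ties a Node back to its Machine. As a rough sketch only (the field values below are illustrative and assume the metav1 and samber/lo imports, not code taken from this commit), such a reference looks like:

    // With BlockOwnerDeletion set, foreground deletion of the owning Machine
    // is blocked until this dependent Node has been garbage collected.
    ownerRef := metav1.OwnerReference{
        APIVersion:         "karpenter.sh/v1alpha5",
        Kind:               "Machine",
        Name:               machine.Name,
        UID:                machine.UID,
        BlockOwnerDeletion: lo.ToPtr(true),
    }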
jonathan-innis authored Apr 21, 2023
1 parent 5944d87 commit 62fe4ac
Showing 55 changed files with 673 additions and 1,062 deletions.
19 changes: 17 additions & 2 deletions pkg/apis/crds/karpenter.sh_machines.yaml
@@ -24,15 +24,27 @@ spec:
- jsonPath: .metadata.labels.topology\.kubernetes\.io/zone
name: Zone
type: string
- jsonPath: .metadata.labels.karpenter\.sh/capacity-type
name: Capacity
- jsonPath: .status.nodeName
name: Node
type: string
- jsonPath: .status.conditions[?(@.type=="Ready")].status
name: Ready
type: string
- jsonPath: .metadata.creationTimestamp
name: Age
type: date
- jsonPath: .metadata.labels.karpenter\.sh/capacity-type
name: Capacity
priority: 1
type: string
- jsonPath: .metadata.labels.karpenter\.sh/provisioner-name
name: Provisioner
priority: 1
type: string
- jsonPath: .spec.machineTemplateRef.name
name: Template
priority: 1
type: string
name: v1alpha5
schema:
openAPIV3Schema:
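A note on the new columns (standard kubectl behavior rather than anything specific to this change): printer columns declared with priority: 1, here Capacity, Provisioner, and Template, are hidden from the default kubectl get machines output and only appear with -o wide, while Type, Zone, Node, Ready, and Age stay in the default view.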
@@ -329,6 +341,9 @@ spec:
- type
type: object
type: array
nodeName:
description: NodeName is the name of the corresponding node object
type: string
providerID:
description: ProviderID of the corresponding node object
type: string
1 change: 0 additions & 1 deletion pkg/apis/v1alpha5/labels.go
@@ -33,7 +33,6 @@ const (
// Karpenter specific domains and labels
const (
ProvisionerNameLabelKey = Group + "/provisioner-name"
MachineNameLabelKey = Group + "/machine-name"
ManagedByLabelKey = Group + "/managed-by"
LabelNodeInitialized = Group + "/initialized"
LabelCapacityType = Group + "/capacity-type"
5 changes: 4 additions & 1 deletion pkg/apis/v1alpha5/machine.go
@@ -131,9 +131,12 @@ type ResourceRequirements struct {
// +kubebuilder:subresource:status
// +kubebuilder:printcolumn:name="Type",type="string",JSONPath=".metadata.labels.node\\.kubernetes\\.io/instance-type",description=""
// +kubebuilder:printcolumn:name="Zone",type="string",JSONPath=".metadata.labels.topology\\.kubernetes\\.io/zone",description=""
// +kubebuilder:printcolumn:name="Capacity",type="string",JSONPath=".metadata.labels.karpenter\\.sh/capacity-type",description=""
// +kubebuilder:printcolumn:name="Node",type="string",JSONPath=".status.nodeName",description=""
// +kubebuilder:printcolumn:name="Ready",type="string",JSONPath=".status.conditions[?(@.type==\"Ready\")].status",description=""
// +kubebuilder:printcolumn:name="Age",type="date",JSONPath=".metadata.creationTimestamp",description=""
// +kubebuilder:printcolumn:name="Capacity",type="string",JSONPath=".metadata.labels.karpenter\\.sh/capacity-type",priority=1,description=""
// +kubebuilder:printcolumn:name="Provisioner",type="string",JSONPath=".metadata.labels.karpenter\\.sh/provisioner-name",priority=1,description=""
// +kubebuilder:printcolumn:name="Template",type="string",JSONPath=".spec.machineTemplateRef.name",priority=1,description=""
type Machine struct {
metav1.TypeMeta `json:",inline"`
metav1.ObjectMeta `json:"metadata,omitempty"`
3 changes: 3 additions & 0 deletions pkg/apis/v1alpha5/machine_status.go
@@ -21,6 +21,9 @@ import (

// MachineStatus defines the observed state of Machine
type MachineStatus struct {
// NodeName is the name of the corresponding node object
// +optional
NodeName string `json:"nodeName,omitempty"`
// ProviderID of the corresponding node object
// +optional
ProviderID string `json:"providerID,omitempty"`
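A minimal usage sketch for the new field (assuming a controller-runtime client.Client named kubeClient and the core v1 and apimachinery types packages; not code from this commit): once Status.NodeName is populated, callers can fetch the backing Node directly instead of resolving it by provider ID.

    // Resolve the Node that backs this Machine via the recorded node name.
    node := &v1.Node{}
    if machine.Status.NodeName != "" {
        if err := kubeClient.Get(ctx, types.NamespacedName{Name: machine.Status.NodeName}, node); err != nil {
            return err
        }
    }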
3 changes: 1 addition & 2 deletions pkg/cloudprovider/fake/cloudprovider.go
@@ -115,10 +115,9 @@ func (c *CloudProvider) Create(ctx context.Context, machine *v1alpha5.Machine) (
break
}
}
name := test.RandomName()
created := &v1alpha5.Machine{
ObjectMeta: metav1.ObjectMeta{
Name: name,
Name: machine.Name,
Labels: lo.Assign(labels, machine.Labels),
Annotations: machine.Annotations,
},
37 changes: 24 additions & 13 deletions pkg/controllers/consistency/controller.go
@@ -30,14 +30,16 @@ import (
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
"sigs.k8s.io/controller-runtime/pkg/source"

"github.com/aws/karpenter-core/pkg/apis/v1alpha5"
"github.com/aws/karpenter-core/pkg/cloudprovider"
"github.com/aws/karpenter-core/pkg/events"
corecontroller "github.com/aws/karpenter-core/pkg/operator/controller"
machineutil "github.com/aws/karpenter-core/pkg/utils/machine"
)

var _ corecontroller.TypedController[*v1.Node] = (*Controller)(nil)
var _ corecontroller.TypedController[*v1alpha5.Machine] = (*Controller)(nil)

type Controller struct {
clock clock.Clock
@@ -52,7 +54,7 @@ type Issue string
type Check interface {
// Check performs the consistency check, this should return a list of issues discovered, or an empty
// slice if no issues were found
Check(context.Context, *v1.Node) ([]Issue, error)
Check(context.Context, *v1.Node, *v1alpha5.Machine) ([]Issue, error)
}
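As a rough illustration of the new three-argument signature, here is a hypothetical check that does not ship in this commit, reusing the imports already present in this file:

    // exampleCheck flags machines whose provisioner label disagrees with the
    // label on the node they launched.
    type exampleCheck struct{}

    func (exampleCheck) Check(_ context.Context, node *v1.Node, machine *v1alpha5.Machine) ([]Issue, error) {
        if node.Labels[v1alpha5.ProvisionerNameLabelKey] != machine.Labels[v1alpha5.ProvisionerNameLabelKey] {
            return []Issue{Issue("node and machine provisioner labels do not match")}, nil
        }
        return nil, nil
    }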

// scanPeriod is how often we inspect and report issues that are found.
@@ -61,15 +63,14 @@ const scanPeriod = 10 * time.Minute
func NewController(clk clock.Clock, kubeClient client.Client, recorder events.Recorder,
provider cloudprovider.CloudProvider) corecontroller.Controller {

return corecontroller.Typed[*v1.Node](kubeClient, &Controller{
return corecontroller.Typed[*v1alpha5.Machine](kubeClient, &Controller{
clock: clk,
kubeClient: kubeClient,
recorder: recorder,
lastScanned: cache.New(scanPeriod, 1*time.Minute),
checks: []Check{
NewFailedInit(clk, kubeClient, provider),
NewTermination(kubeClient),
NewNodeShape(kubeClient, provider),
NewNodeShape(provider),
}},
)
}
@@ -78,39 +79,49 @@ func (c *Controller) Name() string {
return "consistency"
}

func (c *Controller) Reconcile(ctx context.Context, node *v1.Node) (reconcile.Result, error) {
if node.Labels[v1alpha5.ProvisionerNameLabelKey] == "" {
func (c *Controller) Reconcile(ctx context.Context, machine *v1alpha5.Machine) (reconcile.Result, error) {
if machine.Status.ProviderID == "" {
return reconcile.Result{}, nil
}
// If we get an event before we should check for consistency checks, we ignore and wait
if lastTime, ok := c.lastScanned.Get(client.ObjectKeyFromObject(node).String()); ok {
if lastTime, ok := c.lastScanned.Get(client.ObjectKeyFromObject(machine).String()); ok {
if lastTime, ok := lastTime.(time.Time); ok {
remaining := scanPeriod - c.clock.Since(lastTime)
return reconcile.Result{RequeueAfter: remaining}, nil
}
// the above should always succeed
return reconcile.Result{RequeueAfter: scanPeriod}, nil
}
c.lastScanned.SetDefault(client.ObjectKeyFromObject(node).String(), c.clock.Now())
c.lastScanned.SetDefault(client.ObjectKeyFromObject(machine).String(), c.clock.Now())

// We assume the invariant that there is a single node for a single machine. If this invariant is violated,
// then we assume this is bubbled up through the machine lifecycle controller and don't perform consistency checks
node, err := machineutil.NodeForMachine(ctx, c.kubeClient, machine)
if err != nil {
return reconcile.Result{}, machineutil.IgnoreDuplicateNodeError(machineutil.IgnoreNodeNotFoundError(err))
}
for _, check := range c.checks {
issues, err := check.Check(ctx, node)
issues, err := check.Check(ctx, node, machine)
if err != nil {
return reconcile.Result{}, fmt.Errorf("checking node with %T, %w", check, err)
}
for _, issue := range issues {
logging.FromContext(ctx).Errorf("check failed, %s", issue)
consistencyErrors.With(prometheus.Labels{checkLabel: reflect.TypeOf(check).Elem().Name()}).Inc()
c.recorder.Publish(CheckEvent(node, string(issue))...)
c.recorder.Publish(CheckEvent(machine, string(issue)))
}
}
return reconcile.Result{RequeueAfter: scanPeriod}, nil
}

func (c *Controller) Builder(_ context.Context, m manager.Manager) corecontroller.Builder {
func (c *Controller) Builder(ctx context.Context, m manager.Manager) corecontroller.Builder {
return corecontroller.Adapt(controllerruntime.
NewControllerManagedBy(m).
For(&v1.Node{}).
For(&v1alpha5.Machine{}).
Watches(
&source.Kind{Type: &v1.Node{}},
machineutil.NodeEventHandler(ctx, c.kubeClient),
).
WithOptions(controller.Options{MaxConcurrentReconciles: 10}),
)
}
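The Watches clause above relies on machineutil.NodeEventHandler to map Node events back onto the owning Machine. Below is a rough sketch of such a mapper, assuming it matches machines by provider ID and that the controller-runtime handler package is imported; the real helper in pkg/utils/machine may differ:

    // Hypothetical mapper: enqueue a reconcile for every Machine whose
    // provider ID matches the Node that triggered the event.
    func nodeEventHandler(ctx context.Context, kubeClient client.Client) handler.EventHandler {
        return handler.EnqueueRequestsFromMapFunc(func(o client.Object) []reconcile.Request {
            node := o.(*v1.Node)
            machineList := &v1alpha5.MachineList{}
            if err := kubeClient.List(ctx, machineList); err != nil {
                return nil
            }
            var requests []reconcile.Request
            for i := range machineList.Items {
                if machineList.Items[i].Status.ProviderID == node.Spec.ProviderID {
                    requests = append(requests, reconcile.Request{
                        NamespacedName: client.ObjectKeyFromObject(&machineList.Items[i]),
                    })
                }
            }
            return requests
        })
    }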
17 changes: 8 additions & 9 deletions pkg/controllers/consistency/events.go
@@ -17,17 +17,16 @@ package consistency
import (
v1 "k8s.io/api/core/v1"

"github.com/aws/karpenter-core/pkg/apis/v1alpha5"
"github.com/aws/karpenter-core/pkg/events"
)

func CheckEvent(node *v1.Node, message string) []events.Event {
return []events.Event{
{
InvolvedObject: node,
Type: v1.EventTypeWarning,
Reason: "FailedConsistencyCheck",
Message: message,
DedupeValues: []string{node.Name, message},
},
func CheckEvent(machine *v1alpha5.Machine, message string) events.Event {
return events.Event{
InvolvedObject: machine,
Type: v1.EventTypeWarning,
Reason: "FailedConsistencyCheck",
Message: message,
DedupeValues: []string{machine.Name, message},
}
}
96 changes: 0 additions & 96 deletions pkg/controllers/consistency/failedinit.go

This file was deleted.

39 changes: 11 additions & 28 deletions pkg/controllers/consistency/nodeshape.go
@@ -18,55 +18,37 @@ import (
"context"
"fmt"

"github.com/samber/lo"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/client"

"github.com/aws/karpenter-core/pkg/apis/v1alpha5"
"github.com/aws/karpenter-core/pkg/cloudprovider"
)

// NodeShape detects nodes that registered with less than 90% of the capacity that was expected for any resource.
type NodeShape struct {
kubeClient client.Client
provider cloudprovider.CloudProvider
provider cloudprovider.CloudProvider
}

func NewNodeShape(kubeClient client.Client, provider cloudprovider.CloudProvider) Check {
func NewNodeShape(provider cloudprovider.CloudProvider) Check {
return &NodeShape{
kubeClient: kubeClient,
provider: provider,
provider: provider,
}
}

func (n *NodeShape) Check(ctx context.Context, node *v1.Node) ([]Issue, error) {
// ignore nodes that are deleting
if !node.DeletionTimestamp.IsZero() {
func (n *NodeShape) Check(ctx context.Context, node *v1.Node, machine *v1alpha5.Machine) ([]Issue, error) {
// ignore machines that are deleting
if !machine.DeletionTimestamp.IsZero() {
return nil, nil
}
// and nodes that haven't initialized yet
if node.Labels[v1alpha5.LabelNodeInitialized] != "true" {
// and machines that haven't initialized yet
if !machine.StatusConditions().GetCondition(v1alpha5.MachineInitialized).IsTrue() {
return nil, nil
}
provisioner := &v1alpha5.Provisioner{}
if err := n.kubeClient.Get(ctx, types.NamespacedName{Name: node.Labels[v1alpha5.ProvisionerNameLabelKey]}, provisioner); err != nil {
// provisioner is missing, node should be removed soon
return nil, client.IgnoreNotFound(err)
}
instanceTypes, err := n.provider.GetInstanceTypes(ctx, provisioner)
if err != nil {
return nil, err
}
instanceType, ok := lo.Find(instanceTypes, func(it *cloudprovider.InstanceType) bool { return it.Name == node.Labels[v1.LabelInstanceTypeStable] })
if !ok {
return []Issue{Issue(fmt.Sprintf("instance type %q not found", node.Labels[v1.LabelInstanceTypeStable]))}, nil
}
var issues []Issue
for resourceName, expectedQuantity := range instanceType.Capacity {
for resourceName, expectedQuantity := range machine.Status.Capacity {
nodeQuantity, ok := node.Status.Capacity[resourceName]
if !ok && !expectedQuantity.IsZero() {
issues = append(issues, Issue(fmt.Sprintf("expected resource %s not found", resourceName)))
issues = append(issues, Issue(fmt.Sprintf("expected resource %q not found", resourceName)))
continue
}

Expand All @@ -75,6 +57,7 @@ func (n *NodeShape) Check(ctx context.Context, node *v1.Node) ([]Issue, error) {
issues = append(issues, Issue(fmt.Sprintf("expected %s of resource %s, but found %s (%0.1f%% of expected)", expectedQuantity.String(),
resourceName, nodeQuantity.String(), pct*100)))
}

}
return issues, nil
}
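A worked example of this check with illustrative numbers, assuming the unchanged 90% threshold elided from this hunk: if machine.Status.Capacity reports 8Gi of memory but the node registers only 7Gi, the ratio is 0.875, so the check emits an issue along the lines of "expected 8Gi of resource memory, but found 7Gi (87.5% of expected)".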