Commit

Revert "Revert machine migration changes (kubernetes-sigs#176) (kuber…
Browse files Browse the repository at this point in the history
…netes-sigs#241)"

This reverts commit 9973eac.
jonathan-innis committed Apr 14, 2023
1 parent b915d90 commit 2a655b3
Showing 40 changed files with 492 additions and 796 deletions.
2 changes: 1 addition & 1 deletion go.mod
@@ -26,7 +26,7 @@ require (
k8s.io/csi-translation-lib v0.25.4
k8s.io/utils v0.0.0-20221108210102-8e77b1f39fe2
knative.dev/pkg v0.0.0-20221123154742-05b694ec4d3a
sigs.k8s.io/controller-runtime v0.13.1
sigs.k8s.io/controller-runtime v0.13.0
)

require (
4 changes: 2 additions & 2 deletions go.sum
@@ -850,8 +850,8 @@ knative.dev/pkg v0.0.0-20221123154742-05b694ec4d3a/go.mod h1:fckNBPf9bu5/p1RbnOh
rsc.io/binaryregexp v0.2.0/go.mod h1:qTv7/COck+e2FymRvadv62gMdZztPaShugOCi3I+8D8=
rsc.io/quote/v3 v3.1.0/go.mod h1:yEA65RcK8LyAZtP9Kv3t0HmxON59tX3rD+tICJqUlj0=
rsc.io/sampler v1.3.0/go.mod h1:T1hPZKmBbMNahiBKFy5HrXp6adAjACjK9JXDnKaTXpA=
sigs.k8s.io/controller-runtime v0.13.1 h1:tUsRCSJVM1QQOOeViGeX3GMT3dQF1eePPw6sEE3xSlg=
sigs.k8s.io/controller-runtime v0.13.1/go.mod h1:Zbz+el8Yg31jubvAEyglRZGdLAjplZl+PgtYNI6WNTI=
sigs.k8s.io/controller-runtime v0.13.0 h1:iqa5RNciy7ADWnIc8QxCbOX5FEKVR3uxVxKHRMc2WIQ=
sigs.k8s.io/controller-runtime v0.13.0/go.mod h1:Zbz+el8Yg31jubvAEyglRZGdLAjplZl+PgtYNI6WNTI=
sigs.k8s.io/json v0.0.0-20220713155537-f223a00ba0e2 h1:iXTIw73aPyC+oRdyqqvVJuloN1p0AC/kzH07hu3NE+k=
sigs.k8s.io/json v0.0.0-20220713155537-f223a00ba0e2/go.mod h1:B8JuhiUyNFVKdsE8h686QcCxMaH6HrOAZj4vswFpcB0=
sigs.k8s.io/structured-merge-diff/v4 v4.2.3 h1:PRbqxJClWWYMNV1dhaG4NsibJbArud9kFxnAMREiWFE=
3 changes: 1 addition & 2 deletions pkg/cloudprovider/fake/cloudprovider.go
@@ -115,10 +115,9 @@ func (c *CloudProvider) Create(ctx context.Context, machine *v1alpha5.Machine) (
break
}
}
name := test.RandomName()
created := &v1alpha5.Machine{
ObjectMeta: metav1.ObjectMeta{
Name: name,
Name: machine.Name,
Labels: lo.Assign(labels, machine.Labels),
Annotations: machine.Annotations,
},
38 changes: 24 additions & 14 deletions pkg/controllers/consistency/controller.go
@@ -30,14 +30,16 @@ import (
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
"sigs.k8s.io/controller-runtime/pkg/source"

"github.com/aws/karpenter-core/pkg/apis/v1alpha5"
"github.com/aws/karpenter-core/pkg/cloudprovider"
"github.com/aws/karpenter-core/pkg/events"
corecontroller "github.com/aws/karpenter-core/pkg/operator/controller"
machineutil "github.com/aws/karpenter-core/pkg/utils/machine"
)

var _ corecontroller.TypedController[*v1.Node] = (*Controller)(nil)
var _ corecontroller.TypedController[*v1alpha5.Machine] = (*Controller)(nil)

type Controller struct {
clock clock.Clock
@@ -52,7 +54,7 @@ type Issue string
type Check interface {
// Check performs the consistency check; this should return a list of issues discovered, or an empty
// slice if no issues were found
Check(context.Context, *v1.Node) ([]Issue, error)
Check(context.Context, *v1.Node, *v1alpha5.Machine) ([]Issue, error)
}

// scanPeriod is how often we inspect and report issues that are found.
@@ -61,15 +63,15 @@ const scanPeriod = 10 * time.Minute
func NewController(clk clock.Clock, kubeClient client.Client, recorder events.Recorder,
provider cloudprovider.CloudProvider) corecontroller.Controller {

return corecontroller.Typed[*v1.Node](kubeClient, &Controller{
return corecontroller.Typed[*v1alpha5.Machine](kubeClient, &Controller{
clock: clk,
kubeClient: kubeClient,
recorder: recorder,
lastScanned: cache.New(scanPeriod, 1*time.Minute),
checks: []Check{
NewFailedInit(clk, kubeClient, provider),
NewFailedInit(clk, provider),
NewTermination(kubeClient),
NewNodeShape(kubeClient, provider),
NewNodeShape(provider),
}},
)
}
@@ -78,39 +80,47 @@ func (c *Controller) Name() string {
return "consistency"
}

func (c *Controller) Reconcile(ctx context.Context, node *v1.Node) (reconcile.Result, error) {
if node.Labels[v1alpha5.ProvisionerNameLabelKey] == "" {
func (c *Controller) Reconcile(ctx context.Context, machine *v1alpha5.Machine) (reconcile.Result, error) {
if machine.Status.ProviderID == "" {
return reconcile.Result{}, nil
}
// If we get an event before we should check for consistency checks, we ignore and wait
if lastTime, ok := c.lastScanned.Get(client.ObjectKeyFromObject(node).String()); ok {
// If we get an event before we should check for inflight checks, we ignore and wait
if lastTime, ok := c.lastScanned.Get(client.ObjectKeyFromObject(machine).String()); ok {
if lastTime, ok := lastTime.(time.Time); ok {
remaining := scanPeriod - c.clock.Since(lastTime)
return reconcile.Result{RequeueAfter: remaining}, nil
}
// the above should always succeed
return reconcile.Result{RequeueAfter: scanPeriod}, nil
}
c.lastScanned.SetDefault(client.ObjectKeyFromObject(node).String(), c.clock.Now())
c.lastScanned.SetDefault(client.ObjectKeyFromObject(machine).String(), c.clock.Now())

node, err := machineutil.NodeForMachine(ctx, c.kubeClient, machine)
if err != nil {
return reconcile.Result{}, machineutil.IgnoreDuplicateNodeError(machineutil.IgnoreNodeNotFoundError(err))
}
for _, check := range c.checks {
issues, err := check.Check(ctx, node)
issues, err := check.Check(ctx, node, machine)
if err != nil {
return reconcile.Result{}, fmt.Errorf("checking node with %T, %w", check, err)
}
for _, issue := range issues {
logging.FromContext(ctx).Errorf("check failed, %s", issue)
consistencyErrors.With(prometheus.Labels{checkLabel: reflect.TypeOf(check).Elem().Name()}).Inc()
c.recorder.Publish(CheckEvent(node, string(issue))...)
c.recorder.Publish(CheckEvent(node, machine, string(issue))...)
}
}
return reconcile.Result{RequeueAfter: scanPeriod}, nil
}

func (c *Controller) Builder(_ context.Context, m manager.Manager) corecontroller.Builder {
func (c *Controller) Builder(ctx context.Context, m manager.Manager) corecontroller.Builder {
return corecontroller.Adapt(controllerruntime.
NewControllerManagedBy(m).
For(&v1.Node{}).
For(&v1alpha5.Machine{}).
Watches(
&source.Kind{Type: &v1.Node{}},
machineutil.NodeEventHandler(ctx, c.kubeClient),
).
WithOptions(controller.Options{MaxConcurrentReconciles: 10}),
)
}
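For orientation, here is a minimal sketch of what a consistency check looks like against the machine-aware `Check` interface this commit reinstates. The `ProviderIDCheck` type and its provider-ID comparison are illustrative assumptions, not code from this repository; only the method signature is taken from the diff above.

```go
package consistency

import (
	"context"
	"fmt"

	v1 "k8s.io/api/core/v1"

	"github.com/aws/karpenter-core/pkg/apis/v1alpha5"
)

// ProviderIDCheck is a hypothetical Check used only to illustrate the updated
// signature: every check now receives the Machine alongside its Node.
type ProviderIDCheck struct{}

func (p ProviderIDCheck) Check(_ context.Context, node *v1.Node, machine *v1alpha5.Machine) ([]Issue, error) {
	// Mirror the built-in checks by skipping machines that are being deleted.
	if !machine.DeletionTimestamp.IsZero() {
		return nil, nil
	}
	// Flag a mismatch between the node's provider ID and the machine's status.
	if node.Spec.ProviderID != machine.Status.ProviderID {
		return []Issue{Issue(fmt.Sprintf("node provider ID %q does not match machine provider ID %q",
			node.Spec.ProviderID, machine.Status.ProviderID))}, nil
	}
	return nil, nil
}
```

A check written this way would be registered in the `checks` slice passed to `corecontroller.Typed[*v1alpha5.Machine]` in `NewController`, as the built-in checks are above.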
10 changes: 9 additions & 1 deletion pkg/controllers/consistency/events.go
@@ -17,10 +17,11 @@ package consistency
import (
v1 "k8s.io/api/core/v1"

"github.com/aws/karpenter-core/pkg/apis/v1alpha5"
"github.com/aws/karpenter-core/pkg/events"
)

func CheckEvent(node *v1.Node, message string) []events.Event {
func CheckEvent(node *v1.Node, machine *v1alpha5.Machine, message string) []events.Event {
return []events.Event{
{
InvolvedObject: node,
@@ -29,5 +30,12 @@ func CheckEvent(node *v1.Node, message string) []events.Event {
Message: message,
DedupeValues: []string{node.Name, message},
},
{
InvolvedObject: machine,
Type: v1.EventTypeWarning,
Reason: "FailedConsistencyCheck",
Message: message,
DedupeValues: []string{machine.Name, message},
},
}
}
46 changes: 13 additions & 33 deletions pkg/controllers/consistency/failedinit.go
@@ -19,15 +19,12 @@ import (
"fmt"
"time"

"github.com/samber/lo"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/utils/clock"
"sigs.k8s.io/controller-runtime/pkg/client"

"github.com/aws/karpenter-core/pkg/apis/v1alpha5"
"github.com/aws/karpenter-core/pkg/cloudprovider"
"github.com/aws/karpenter-core/pkg/controllers/node"
"github.com/aws/karpenter-core/pkg/controllers/machine/lifecycle"
)

// initFailureTime is the time after which we start reporting a node as having failed to initialize. This is set
@@ -37,51 +34,34 @@ const initFailureTime = time.Hour
// FailedInit detects nodes that fail to initialize within an hour and reports the reason for the initialization
// failure
type FailedInit struct {
clock clock.Clock
kubeClient client.Client
provider cloudprovider.CloudProvider
clock clock.Clock
provider cloudprovider.CloudProvider
}

func NewFailedInit(clk clock.Clock, kubeClient client.Client, provider cloudprovider.CloudProvider) Check {
return &FailedInit{clock: clk, kubeClient: kubeClient, provider: provider}
func NewFailedInit(clk clock.Clock, provider cloudprovider.CloudProvider) Check {
return &FailedInit{clock: clk, provider: provider}
}

func (f FailedInit) Check(ctx context.Context, n *v1.Node) ([]Issue, error) {
// ignore nodes that are deleting
if !n.DeletionTimestamp.IsZero() {
func (f FailedInit) Check(_ context.Context, node *v1.Node, m *v1alpha5.Machine) ([]Issue, error) {
// ignore machines that are deleting
if !m.DeletionTimestamp.IsZero() {
return nil, nil
}

nodeAge := f.clock.Since(n.CreationTimestamp.Time)
// n is already initialized or not old enough
if n.Labels[v1alpha5.LabelNodeInitialized] == "true" || nodeAge < initFailureTime {
// machine is already initialized or isn't old enough
if m.StatusConditions().GetCondition(v1alpha5.MachineInitialized).IsTrue() ||
f.clock.Now().Before(m.CreationTimestamp.Time.Add(initFailureTime)) {
return nil, nil
}
provisioner := &v1alpha5.Provisioner{}
if err := f.kubeClient.Get(ctx, types.NamespacedName{Name: n.Labels[v1alpha5.ProvisionerNameLabelKey]}, provisioner); err != nil {
// provisioner is missing, node should be removed soon
return nil, client.IgnoreNotFound(err)
}
instanceTypes, err := f.provider.GetInstanceTypes(ctx, provisioner)
if err != nil {
return nil, err
}
instanceType, ok := lo.Find(instanceTypes, func(it *cloudprovider.InstanceType) bool { return it.Name == n.Labels[v1.LabelInstanceTypeStable] })
if !ok {
return []Issue{Issue(fmt.Sprintf("instance type %q not found", n.Labels[v1.LabelInstanceTypeStable]))}, nil
}

// detect startup taints which should be removed
var result []Issue
if taint, ok := node.IsStartupTaintRemoved(n, provisioner); !ok {
if taint, ok := lifecycle.IsStartupTaintRemoved(node, m); !ok {
result = append(result, Issue(fmt.Sprintf("startup taint %q is still on the node", formatTaint(taint))))
}

// and extended resources which never registered
if resource, ok := node.IsExtendedResourceRegistered(n, instanceType); !ok {
if resource, ok := lifecycle.RequestedResourcesRegistered(node, m); !ok {
result = append(result, Issue(fmt.Sprintf("expected resource %q didn't register on the node", resource)))
}

return result, nil
}

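As a hedged restatement of the gate `FailedInit` now applies, the sketch below pulls the condition into a standalone helper. The `shouldInspect` name is an assumption; the logic mirrors the diff above: only machines older than `initFailureTime` that still have not become `MachineInitialized` are inspected.

```go
package consistency

import (
	"k8s.io/utils/clock"

	"github.com/aws/karpenter-core/pkg/apis/v1alpha5"
)

// shouldInspect is an illustrative helper (not part of this commit) that
// restates FailedInit's gating: skip machines that already initialized or
// that are younger than initFailureTime.
func shouldInspect(clk clock.Clock, m *v1alpha5.Machine) bool {
	if m.StatusConditions().GetCondition(v1alpha5.MachineInitialized).IsTrue() {
		return false // already initialized, nothing to report
	}
	return !clk.Now().Before(m.CreationTimestamp.Time.Add(initFailureTime))
}
```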
39 changes: 11 additions & 28 deletions pkg/controllers/consistency/nodeshape.go
@@ -18,55 +18,37 @@ import (
"context"
"fmt"

"github.com/samber/lo"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/client"

"github.com/aws/karpenter-core/pkg/apis/v1alpha5"
"github.com/aws/karpenter-core/pkg/cloudprovider"
)

// NodeShape detects nodes that have launched with 10% or less of any resource than was expected.
type NodeShape struct {
kubeClient client.Client
provider cloudprovider.CloudProvider
provider cloudprovider.CloudProvider
}

func NewNodeShape(kubeClient client.Client, provider cloudprovider.CloudProvider) Check {
func NewNodeShape(provider cloudprovider.CloudProvider) Check {
return &NodeShape{
kubeClient: kubeClient,
provider: provider,
provider: provider,
}
}

func (n *NodeShape) Check(ctx context.Context, node *v1.Node) ([]Issue, error) {
// ignore nodes that are deleting
if !node.DeletionTimestamp.IsZero() {
func (n *NodeShape) Check(ctx context.Context, node *v1.Node, machine *v1alpha5.Machine) ([]Issue, error) {
// ignore machines that are deleting
if !machine.DeletionTimestamp.IsZero() {
return nil, nil
}
// and nodes that haven't initialized yet
if node.Labels[v1alpha5.LabelNodeInitialized] != "true" {
// and machines that haven't initialized yet
if machine.StatusConditions().GetCondition(v1alpha5.MachineInitialized).IsTrue() {
return nil, nil
}
provisioner := &v1alpha5.Provisioner{}
if err := n.kubeClient.Get(ctx, types.NamespacedName{Name: node.Labels[v1alpha5.ProvisionerNameLabelKey]}, provisioner); err != nil {
// provisioner is missing, node should be removed soon
return nil, client.IgnoreNotFound(err)
}
instanceTypes, err := n.provider.GetInstanceTypes(ctx, provisioner)
if err != nil {
return nil, err
}
instanceType, ok := lo.Find(instanceTypes, func(it *cloudprovider.InstanceType) bool { return it.Name == node.Labels[v1.LabelInstanceTypeStable] })
if !ok {
return []Issue{Issue(fmt.Sprintf("instance type %q not found", node.Labels[v1.LabelInstanceTypeStable]))}, nil
}
var issues []Issue
for resourceName, expectedQuantity := range instanceType.Capacity {
for resourceName, expectedQuantity := range machine.Status.Capacity {
nodeQuantity, ok := node.Status.Capacity[resourceName]
if !ok && !expectedQuantity.IsZero() {
issues = append(issues, Issue(fmt.Sprintf("expected resource %s not found", resourceName)))
issues = append(issues, Issue(fmt.Sprintf("expected resource \"%s\" not found", resourceName)))
continue
}

@@ -75,6 +57,7 @@ func (n *NodeShape) Check(ctx context.Context, node *v1.Node) ([]Issue, error) {
issues = append(issues, Issue(fmt.Sprintf("expected %s of resource %s, but found %s (%0.1f%% of expected)", expectedQuantity.String(),
resourceName, nodeQuantity.String(), pct*100)))
}

}
return issues, nil
}
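The comparison `NodeShape` performs reduces to a ratio between what the machine's status promised and what the node actually registered. The sketch below is an assumption-laden restatement: the `withinExpectedShape` helper and the 0.90 threshold are inferred from the "10% or less" comment above, not copied from the diff.

```go
package consistency

import "k8s.io/apimachinery/pkg/api/resource"

// withinExpectedShape is an illustrative helper (not from this commit). It
// returns the found/expected ratio and whether the node registered at least
// 90% of the capacity reported in the machine's status.
func withinExpectedShape(expected, found resource.Quantity) (pct float64, ok bool) {
	if expected.IsZero() {
		return 1, true // nothing was expected, so nothing can be short
	}
	pct = found.AsApproximateFloat64() / expected.AsApproximateFloat64()
	return pct, pct >= 0.90
}
```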
18 changes: 6 additions & 12 deletions pkg/controllers/consistency/suite_test.go
@@ -124,16 +124,10 @@ var _ = Describe("Controller", func() {
}
ExpectApplied(ctx, env.Client, provisioner, machine, node)
fakeClock.Step(2 * time.Hour)
ExpectReconcileSucceeded(ctx, consistencyController, client.ObjectKeyFromObject(node))
ExpectReconcileSucceeded(ctx, consistencyController, client.ObjectKeyFromObject(machine))
Expect(recorder.DetectedEvent("expected resource \"fake.com/vendor-a\" didn't register on the node")).To(BeTrue())
})
It("should detect issues with nodes that have a startup taint which isn't removed", func() {
provisioner.Spec.StartupTaints = []v1.Taint{
{
Key: "my.startup.taint",
Effect: v1.TaintEffectNoSchedule,
},
}
machine, node := test.MachineAndNode(v1alpha5.Machine{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{
Expand All @@ -160,7 +154,7 @@ var _ = Describe("Controller", func() {
})
ExpectApplied(ctx, env.Client, provisioner, machine, node)
fakeClock.Step(2 * time.Hour)
ExpectReconcileSucceeded(ctx, consistencyController, client.ObjectKeyFromObject(node))
ExpectReconcileSucceeded(ctx, consistencyController, client.ObjectKeyFromObject(machine))
Expect(recorder.DetectedEvent("startup taint \"my.startup.taint:NoSchedule\" is still on the node")).To(BeTrue())
})
})
@@ -188,12 +182,12 @@ var _ = Describe("Controller", func() {
Labels: podsLabels,
MaxUnavailable: &intstr.IntOrString{IntVal: 0, Type: intstr.Int},
})
node.Finalizers = []string{"prevent.deletion/now"}
machine.Finalizers = []string{"prevent.deletion/now"}
p := test.Pod(test.PodOptions{ObjectMeta: metav1.ObjectMeta{Labels: podsLabels}})
ExpectApplied(ctx, env.Client, provisioner, machine, node, p, pdb)
ExpectManualBinding(ctx, env.Client, p, node)
_ = env.Client.Delete(ctx, node)
ExpectReconcileSucceeded(ctx, consistencyController, client.ObjectKeyFromObject(node))
_ = env.Client.Delete(ctx, machine)
ExpectReconcileSucceeded(ctx, consistencyController, client.ObjectKeyFromObject(machine))
Expect(recorder.DetectedEvent(fmt.Sprintf("can't drain node, PDB %s/%s is blocking evictions", pdb.Namespace, pdb.Name))).To(BeTrue())
})
})
@@ -223,7 +217,7 @@ var _ = Describe("Controller", func() {
v1.ResourcePods: resource.MustParse("10"),
}
ExpectApplied(ctx, env.Client, provisioner, machine, node)
ExpectReconcileSucceeded(ctx, consistencyController, client.ObjectKeyFromObject(node))
ExpectReconcileSucceeded(ctx, consistencyController, client.ObjectKeyFromObject(machine))
Expect(recorder.DetectedEvent("expected 128Gi of resource memory, but found 64Gi (50.0% of expected)")).To(BeTrue())
})
})
5 changes: 3 additions & 2 deletions pkg/controllers/consistency/termination.go
@@ -21,6 +21,7 @@ import (
v1 "k8s.io/api/core/v1"
"sigs.k8s.io/controller-runtime/pkg/client"

"github.com/aws/karpenter-core/pkg/apis/v1alpha5"
"github.com/aws/karpenter-core/pkg/controllers/deprovisioning"
nodeutils "github.com/aws/karpenter-core/pkg/utils/node"
)
@@ -36,9 +37,9 @@ func NewTermination(kubeClient client.Client) Check {
}
}

func (t *Termination) Check(ctx context.Context, node *v1.Node) ([]Issue, error) {
func (t *Termination) Check(ctx context.Context, node *v1.Node, machine *v1alpha5.Machine) ([]Issue, error) {
// we are only looking at nodes that are hung deleting
if node.DeletionTimestamp.IsZero() {
if machine.DeletionTimestamp.IsZero() {
return nil, nil
}
pdbs, err := deprovisioning.NewPDBLimits(ctx, t.kubeClient)