fix: Handle providerID changing in cluster state #166

Merged
6 changes: 0 additions & 6 deletions pkg/apis/v1alpha5/labels.go
@@ -56,12 +56,6 @@ const (
TerminationFinalizer = Group + "/termination"
)

// Tags for infrastructure resources deployed into Cloud Provider's accounts
const (
DiscoveryTagKey = Group + "/discovery"
ManagedByTagKey = Group + "/managed-by"
)

var (
// RestrictedLabelDomains are either prohibited by the kubelet or reserved by karpenter
RestrictedLabelDomains = sets.NewString(
41 changes: 28 additions & 13 deletions pkg/controllers/state/cluster.go
@@ -165,19 +165,18 @@ func (c *Cluster) UpdateNode(ctx context.Context, node *v1.Node) error {
return nil
}

func (c *Cluster) DeleteNode(nodeName string) {
func (c *Cluster) DeleteNode(name string) {
c.mu.Lock()
defer c.mu.Unlock()

if id := c.nameToProviderID[nodeName]; id != "" {
delete(c.nodes, id)
delete(c.nameToProviderID, nodeName)
c.RecordConsolidationChange()
}
c.cleanupNode(name)
}

// UpdatePod is called every time the pod is reconciled
func (c *Cluster) UpdatePod(ctx context.Context, pod *v1.Pod) error {
c.mu.Lock()
defer c.mu.Unlock()

var err error
if podutils.IsTerminal(pod) {
c.updateNodeUsageFromPodCompletion(client.ObjectKeyFromObject(pod))
@@ -190,6 +189,9 @@ func (c *Cluster) UpdatePod(ctx context.Context, pod *v1.Pod) error {

// DeletePod is called when the pod has been deleted
func (c *Cluster) DeletePod(podKey types.NamespacedName) {
c.mu.Lock()
defer c.mu.Unlock()

c.antiAffinityPods.Delete(podKey)
c.updateNodeUsageFromPodCompletion(podKey)
c.RecordConsolidationChange()
@@ -224,6 +226,11 @@ func (c *Cluster) Reset() {
c.antiAffinityPods = sync.Map{}
}

// WARNING
// Everything below this point assumes that the cluster state lock is already held when calling into these functions
// to modify the cluster state. If you do not hold the cluster state lock before calling any of these helpers,
// you will hit race conditions and data corruption.

func (c *Cluster) newStateFromNode(ctx context.Context, node *v1.Node, oldNode *Node) (*Node, error) {
if oldNode == nil {
oldNode = &Node{
@@ -250,10 +257,24 @@ func (c *Cluster) newStateFromNode(ctx context.Context, node *v1.Node, oldNode *
); err != nil {
return nil, err
}
// Clean up the old node entry keyed by its old providerID if the providerID changes.
// This can happen since nodes aren't created with providerIDs; rather, the CCM picks up the
// created node and injects the providerID into spec.providerID.
if id, ok := c.nameToProviderID[node.Name]; ok && id != node.Spec.ProviderID {
c.cleanupNode(node.Name)
}
c.triggerConsolidationOnChange(oldNode, n)
return n, nil
}

func (c *Cluster) cleanupNode(name string) {
if id, ok := c.nameToProviderID[name]; ok {
delete(c.nodes, id)
delete(c.nameToProviderID, name)
c.RecordConsolidationChange()
}
}

func (c *Cluster) populateStartupTaints(ctx context.Context, n *Node) error {
if !n.Owned() {
return nil
@@ -315,7 +336,7 @@ func (c *Cluster) populateResourceRequests(ctx context.Context, n *Node) error {
}
c.cleanupOldBindings(pod)
n.updateForPod(ctx, pod)
c.bindings[client.ObjectKeyFromObject(pod)] = pod.Spec.NodeName // TODO @joinnis: Potentially change this later
c.bindings[client.ObjectKeyFromObject(pod)] = pod.Spec.NodeName
}
return nil
}
@@ -328,9 +349,6 @@ func (c *Cluster) updateNodeUsageFromPod(ctx context.Context, pod *v1.Pod) error
return nil
}

c.mu.Lock()
defer c.mu.Unlock()

n, ok := c.nodes[c.nameToProviderID[pod.Spec.NodeName]]
if !ok {
// the node must exist for us to update the resource requests on the node
@@ -343,9 +361,6 @@ func (c *Cluster) updateNodeUsageFromPodCompletion(podKey types.NamespacedName) {
}

func (c *Cluster) updateNodeUsageFromPodCompletion(podKey types.NamespacedName) {
c.mu.Lock()
defer c.mu.Unlock()

nodeName, bindingKnown := c.bindings[podKey]
if !bindingKnown {
// we didn't think the pod was bound, so we weren't tracking it and don't need to do anything
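For context, here is a minimal, self-contained Go sketch (not part of this PR) of the bookkeeping problem the change above addresses. The clusterState type, its updateNode method, and the string-valued maps are illustrative stand-ins for state.Cluster and its nodes/nameToProviderID fields; only the map names mirror the real code.

package main

import "fmt"

// Illustrative stand-in for the two maps the cluster state keeps in cluster.go.
// The field names mirror the diff above; everything else here is hypothetical.
type clusterState struct {
	nodes            map[string]string // providerID -> node name (stand-in for *state.Node)
	nameToProviderID map[string]string // node name -> providerID
}

// updateNode mirrors the shape of the fix: if the providerID recorded for this
// name differs from the incoming one, drop the stale entries before re-registering.
func (c *clusterState) updateNode(name, providerID string) {
	if old, ok := c.nameToProviderID[name]; ok && old != providerID {
		delete(c.nodes, old)
		delete(c.nameToProviderID, name)
	}
	c.nameToProviderID[name] = providerID
	c.nodes[providerID] = name
}

func main() {
	c := &clusterState{nodes: map[string]string{}, nameToProviderID: map[string]string{}}
	c.updateNode("node-a", "")              // node created before the CCM assigns a providerID
	c.updateNode("node-a", "fake://node-a") // CCM injects spec.providerID
	fmt.Println(len(c.nodes))               // prints 1; without the cleanup it would be 2
}

Without the stale-entry cleanup, the second call would leave two entries for the same node: one keyed by the empty providerID it registered with, and one keyed by the providerID injected by the CCM.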
26 changes: 26 additions & 0 deletions pkg/controllers/state/suite_test.go
@@ -612,6 +612,22 @@ var _ = Describe("Node Resource Level", func() {
time.Sleep(time.Second * 6) // past 10s, node should no longer be nominated
Expect(ExpectStateNodeExists(node).Nominated()).To(BeFalse())
})
It("should handle a node changing from no providerID to registering a providerID", func() {
node := test.Node()
ExpectApplied(ctx, env.Client, node)
ExpectReconcileSucceeded(ctx, nodeController, client.ObjectKeyFromObject(node))

ExpectStateNodeCount("==", 1)
ExpectStateNodeExists(node)

// Change the providerID; this mocks CCM adding the providerID onto the node after registration
node.Spec.ProviderID = fmt.Sprintf("fake://%s", node.Name)
ExpectApplied(ctx, env.Client, node)
ExpectReconcileSucceeded(ctx, nodeController, client.ObjectKeyFromObject(node))

ExpectStateNodeCount("==", 1)
ExpectStateNodeExists(node)
})
})

var _ = Describe("Pod Anti-Affinity", func() {
@@ -802,6 +818,16 @@ var _ = Describe("Provisioner Spec Updates", func() {
})
})

func ExpectStateNodeCount(comparator string, count int) int {
c := 0
cluster.ForEachNode(func(n *state.Node) bool {
c++
return true
})
ExpectWithOffset(1, c).To(BeNumerically(comparator, count))
return c
}

func ExpectStateNodeExistsWithOffset(offset int, node *v1.Node) *state.Node {
var ret *state.Node
cluster.ForEachNode(func(n *state.Node) bool {