fix(kwok): prevent quitting when scaling down node group #6336
@@ -25,19 +25,19 @@ import (
	"log"
	"strconv"
	"strings"
	"time"

	kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes"

	apiv1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/runtime/serializer"
	"k8s.io/apimachinery/pkg/util/yaml"
	kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes"
	"k8s.io/client-go/kubernetes"
	clientscheme "k8s.io/client-go/kubernetes/scheme"
	v1lister "k8s.io/client-go/listers/core/v1"
	klog "k8s.io/klog/v2"
	"k8s.io/klog/v2"
)

const (
@@ -155,13 +155,19 @@ func createNodegroups(nodes []*apiv1.Node, kubeClient kubernetes.Interface, kc *
		}

		ngName := getNGName(nodes[i], kc)
		if ngName == "" {
			klog.Fatalf("%s '%s' for node '%s' not present in the manifest",
				kc.status.groupNodesBy, kc.status.key,
				nodes[i].GetName())
		}

		if ngs[ngName] != nil {
			ngs[ngName].targetSize += 1
			continue
		}

		ng := parseAnnotations(nodes[i], kc)
		ng.name = getNGName(nodes[i], kc)
		ng.name = ngName
		sanitizeNode(nodes[i])
		prepareNode(nodes[i], ng.name)
		ng.nodeTemplate = nodes[i]
@@ -250,6 +256,8 @@ func parseAnnotations(no *apiv1.Node, kc *KwokProviderConfig) *NodeGroup {
	}
}

// getNGName returns the node group name of the given k8s node object.
// Return empty string if no node group is found.
func getNGName(no *apiv1.Node, kc *KwokProviderConfig) string {

	if no.GetAnnotations()[NGNameAnnotation] != "" {

@@ -263,16 +271,8 @@ func getNGName(no *apiv1.Node, kc *KwokProviderConfig) string {
	case "label":
		ngName = no.GetLabels()[kc.status.key]
	default:
		klog.Fatal("grouping criteria for nodes is not set (expected: 'annotation' or 'label')")
	}

	if ngName == "" {
		klog.Fatalf("%s '%s' for node '%s' not present in the manifest",
			kc.status.groupNodesBy, kc.status.key,
			no.GetName())
		klog.Warning("grouping criteria for nodes is not set (expected: 'annotation' or 'label')")
	}

	ngName = fmt.Sprintf("%s-%v", ngName, time.Now().Unix())
Review discussion on this change:

I think it might be better to keep the node group name unchanged, especially in cases where nodes still remain in the cluster and CA is restarted.

Line 275 is clearly a bug. The original intention was to ensure … If we already have nodes in the cluster with matching nodegroup annotations/labels and we remove … One solution can be to implement …

Let me know what you think.

The Kwok provider will calculate the target sizes during startup, and the Cluster Autoscaler will scale these nodegroups to the appropriate size. I think it's not necessary to calculate the target size in the …

The kwok provider will clean up all the fake nodes when CA quits:

// Cleanup cleans up all resources before the cloud provider is removed
func (kwok *KwokCloudProvider) Cleanup() error {
	for _, ng := range kwok.nodeGroups {
		nodeNames, err := ng.getNodeNamesForNodeGroup()
		if err != nil {
			return fmt.Errorf("error cleaning up: %v", err)
		}
		for _, node := range nodeNames {
			err := kwok.kubeClient.CoreV1().Nodes().Delete(context.Background(), node, v1.DeleteOptions{})
			if err != nil {
				klog.Errorf("error cleaning up kwok provider nodes '%v'", node)
			}
		}
	}
	return nil
}

True. I was considering a case when … I think we might have to try this out to confirm if CA can handle such a situation. If you can try it out as a part of this PR, great. If not, we can take care of it in another issue.

Hi @vadasambar, I have implemented the Refresh() function. Please take a look.
	return ngName
}
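To make the "line 275" point above concrete: suffixing the group name with `time.Now().Unix()` produces a different name every time it is computed, so a node group rebuilt after a CA restart can no longer be matched against nodes created under the old name. A minimal, self-contained sketch of that instability (illustrative only, not code from this PR):

```go
package main

import (
	"fmt"
	"time"
)

// timestampedName mimics the removed line: it derives a nodegroup name by
// appending the current Unix time to the base name.
func timestampedName(base string) string {
	return fmt.Sprintf("%s-%v", base, time.Now().Unix())
}

func main() {
	first := timestampedName("kind-worker")
	time.Sleep(1100 * time.Millisecond) // stand-in for a CA restart a moment later
	second := timestampedName("kind-worker")

	// The two names differ, so nodes labeled for `first` would not be matched
	// to the group that is rebuilt as `second`.
	fmt.Println(first, second, first == second)
}
```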
@@ -81,10 +81,9 @@ func (nodeGroup *NodeGroup) IncreaseSize(delta int) error {
		if err != nil {
			return fmt.Errorf("couldn't create new node '%s': %v", node.Name, err)
		}
		nodeGroup.targetSize += 1
	}

	nodeGroup.targetSize = newSize

Comment on lines +84 to -87:

Review comment: For a case in which some nodes are created successfully and some fail.

Reply: Makes sense. Should we add a test case around this (for both …)?

Reply: OK.

	return nil
}

@@ -111,6 +110,7 @@ func (nodeGroup *NodeGroup) DeleteNodes(nodes []*apiv1.Node) error {
		if err != nil {
			return err
		}
		nodeGroup.targetSize -= 1
	}
	return nil
}
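The change above moves the targetSize bookkeeping into the creation and deletion loops: the counter only moves for nodes that were actually created or deleted, so a partial failure no longer leaves targetSize claiming nodes that do not exist. A rough sketch of the idea, using a hypothetical `createNode` and a stripped-down group type rather than the provider's real types:

```go
package main

import (
	"errors"
	"fmt"
)

// group is a stand-in for the provider's NodeGroup; only targetSize matters here.
type group struct {
	targetSize int
}

// createNode pretends that the third creation attempt fails.
func createNode(i int) error {
	if i == 2 {
		return errors.New("node creation failed")
	}
	return nil
}

// increaseSize increments targetSize once per successfully created node, so the
// counter stays consistent if creation aborts partway through.
func (g *group) increaseSize(delta int) error {
	for i := 0; i < delta; i++ {
		if err := createNode(i); err != nil {
			return err
		}
		g.targetSize++
	}
	return nil
}

func main() {
	g := &group{}
	err := g.increaseSize(5)
	// Prints "2" plus the error: only the two nodes that were created are counted,
	// whereas assigning targetSize = newSize up front would have recorded 5.
	fmt.Println(g.targetSize, err)
}
```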
@@ -22,19 +22,21 @@ import (
	"os"
	"strings"

	apiv1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/resource"
	v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/autoscaler/cluster-autoscaler/cloudprovider"
	"k8s.io/autoscaler/cluster-autoscaler/config"
	"k8s.io/autoscaler/cluster-autoscaler/utils/errors"
	"k8s.io/autoscaler/cluster-autoscaler/utils/gpu"
	kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes"

	apiv1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/resource"
	v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/labels"
	"k8s.io/client-go/informers"
	kubeclient "k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"
	"k8s.io/client-go/tools/clientcmd"
	klog "k8s.io/klog/v2"
	"k8s.io/klog/v2"
)

// Name returns name of the cloud provider.
@@ -123,24 +125,28 @@ func (kwok *KwokCloudProvider) GetNodeGpuConfig(node *apiv1.Node) *cloudprovider

// Refresh is called before every main loop and can be used to dynamically update cloud provider state.
// In particular the list of node groups returned by NodeGroups can change as a result of CloudProvider.Refresh().
// TODO(vadasambar): implement this
func (kwok *KwokCloudProvider) Refresh() error {

	// TODO(vadasambar): causes CA to not recognize kwok nodegroups
	// needs better implementation
	// nodeList, err := kwok.lister.List()
	// if err != nil {
	// 	return err
	// }
	allNodes, err := kwok.allNodesLister.List(labels.Everything())
	if err != nil {
		klog.ErrorS(err, "failed to list all nodes from lister")
		return err
	}

	targetSizeInCluster := make(map[string]int)

	// ngs := []*NodeGroup{}
	// for _, no := range nodeList {
	// 	ng := parseAnnotationsToNodegroup(no)
	// 	ng.kubeClient = kwok.kubeClient
	// 	ngs = append(ngs, ng)
	// }
	for _, node := range allNodes {
		ngName := getNGName(node, kwok.config)

Review comment: This will lead to … You might have to use a filter function to filter only nodes which have the nodegroup label/annotation, OR convert …

Reply: OK, I converted …

		if ngName == "" {
			continue
		}

		// kwok.nodeGroups = ngs
		targetSizeInCluster[ngName] += 1
	}

	for _, ng := range kwok.nodeGroups {
		ng.targetSize = targetSizeInCluster[ng.Id()]

Review comment: Maybe something for the future: I wonder if we should delete nodes which have …

	}

	return nil
}
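On the review note about filtering: an alternative to listing every node and skipping empty names would be to hand the lister a label selector that only matches nodes carrying the nodegroup label. A sketch under the assumption that the label key is `kwok-nodegroup` (the key used in the tests below); this is not what the PR implements:

```go
package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/labels"
	"k8s.io/apimachinery/pkg/selection"
)

// nodegroupSelector builds a selector that matches any node carrying the
// "kwok-nodegroup" label, whatever its value.
func nodegroupSelector() (labels.Selector, error) {
	req, err := labels.NewRequirement("kwok-nodegroup", selection.Exists, nil)
	if err != nil {
		return nil, err
	}
	return labels.NewSelector().Add(*req), nil
}

func main() {
	sel, err := nodegroupSelector()
	if err != nil {
		panic(err)
	}
	// Such a selector could be passed to the node lister in place of
	// labels.Everything(), e.g. allNodesLister.List(sel).
	fmt.Println(sel.String()) // kwok-nodegroup
}
```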
@@ -245,6 +251,7 @@ func BuildKwokProvider(ko *kwokOptions) (*KwokCloudProvider, error) {
		kubeClient:      ko.kubeClient,
		resourceLimiter: ko.resourceLimiter,
		config:          kwokConfig,
		allNodesLister:  ko.allNodesLister,
	}, nil
}
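BuildKwokProvider now receives an allNodesLister through kwokOptions. For context, here is a rough sketch of how such a lister is typically backed by a shared informer cache in client-go; the helper name and resync period are illustrative assumptions, not the wiring cluster-autoscaler actually uses:

```go
package sketch

import (
	"time"

	"k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes"
	v1lister "k8s.io/client-go/listers/core/v1"
)

// newAllNodesLister returns a node lister backed by a shared informer cache.
// stopCh bounds the informer's lifetime; the caller keeps it open while the
// lister is in use.
func newAllNodesLister(client kubernetes.Interface, stopCh <-chan struct{}) v1lister.NodeLister {
	factory := informers.NewSharedInformerFactory(client, 10*time.Minute)
	lister := factory.Core().V1().Nodes().Lister()

	factory.Start(stopCh)
	// Block until the node informer has synced so List() sees a complete view.
	factory.WaitForCacheSync(stopCh)

	return lister
}
```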
@@ -20,17 +20,18 @@ import (
	"os"
	"testing"

	"k8s.io/autoscaler/cluster-autoscaler/cloudprovider"
	"k8s.io/autoscaler/cluster-autoscaler/config"
	"k8s.io/autoscaler/cluster-autoscaler/utils/gpu"
	kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes"

	"github.com/stretchr/testify/assert"
	apiv1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/errors"
	"k8s.io/apimachinery/pkg/api/resource"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/labels"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/autoscaler/cluster-autoscaler/cloudprovider"
	"k8s.io/autoscaler/cluster-autoscaler/config"
	"k8s.io/autoscaler/cluster-autoscaler/utils/gpu"
	kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes"
	"k8s.io/client-go/kubernetes/fake"
	v1lister "k8s.io/client-go/listers/core/v1"
	core "k8s.io/client-go/testing"
@@ -156,6 +157,110 @@ func TestNodeGroups(t *testing.T) {
	})
}

func TestRefresh(t *testing.T) {
	fakeClient := &fake.Clientset{}
	var nodesFrom string
	fakeClient.Fake.AddReactor("get", "configmaps", func(action core.Action) (bool, runtime.Object, error) {
		getAction := action.(core.GetAction)

		if getAction == nil {
			return false, nil, nil
		}

		if getAction.GetName() == defaultConfigName {
			if nodesFrom == "configmap" {
				return true, &apiv1.ConfigMap{
					Data: map[string]string{
						configKey: testConfig,
					},
				}, nil
			}

			return true, &apiv1.ConfigMap{
				Data: map[string]string{
					configKey: testConfigDynamicTemplates,
				},
			}, nil

		}

		if getAction.GetName() == defaultTemplatesConfigName {
			if nodesFrom == "configmap" {
				return true, &apiv1.ConfigMap{
					Data: map[string]string{
						templatesKey: testTemplates,
					},
				}, nil
			}
		}

		return true, nil, errors.NewNotFound(apiv1.Resource("configmaps"), "whatever")
	})

	os.Setenv("POD_NAMESPACE", "kube-system")

	t.Run("refresh nodegroup target size", func(t *testing.T) {
		nodesFrom = "configmap"
		ngName := "kind-worker"
		fakeNodeLister := newTestAllNodeLister(map[string]*apiv1.Node{
			"node1": {
				ObjectMeta: metav1.ObjectMeta{
					Name: "node1",
					Labels: map[string]string{
						"kwok-nodegroup": ngName,
					},
				},
			},
			"node2": {
				ObjectMeta: metav1.ObjectMeta{
					Name: "node2",
					Labels: map[string]string{
						"kwok-nodegroup": ngName,
					},
				},
			},
			"node3": {
				ObjectMeta: metav1.ObjectMeta{
					Name: "node3",
					Labels: map[string]string{
						"kwok-nodegroup": ngName,
					},
				},
			},
			"node4": {
				ObjectMeta: metav1.ObjectMeta{
					Name: "node4",
				},
			},
		})

		ko := &kwokOptions{
			kubeClient:      fakeClient,
			autoscalingOpts: &config.AutoscalingOptions{},
			discoveryOpts:   &cloudprovider.NodeGroupDiscoveryOptions{},
			resourceLimiter: cloudprovider.NewResourceLimiter(
				map[string]int64{cloudprovider.ResourceNameCores: 1, cloudprovider.ResourceNameMemory: 10000000},
				map[string]int64{cloudprovider.ResourceNameCores: 10, cloudprovider.ResourceNameMemory: 100000000}),
			allNodesLister: fakeNodeLister,
			ngNodeListerFn: testNodeLister,
		}

		p, err := BuildKwokProvider(ko)
		assert.NoError(t, err)
		assert.NotNil(t, p)

		err = p.Refresh()

Review comment: Sorry, it was a misunderstanding on my end. I ran the test again. Looks good to me.

		assert.Nil(t, err)
		for _, ng := range p.NodeGroups() {
			if ng.Id() == ngName {
				targetSize, err := ng.TargetSize()
				assert.Nil(t, err)
				assert.Equal(t, 3, targetSize)
			}
		}
	})
}

func TestGetResourceLimiter(t *testing.T) {
	fakeClient := &fake.Clientset{}
	fakeClient.Fake.AddReactor("get", "configmaps", func(action core.Action) (bool, runtime.Object, error) {
@@ -639,6 +744,40 @@ func TestNodeGroupForNode(t *testing.T) {
		assert.Contains(t, ng.Id(), "kind-worker")
	})

	t.Run("empty nodegroup name for node", func(t *testing.T) {
		nodesFrom = "configmap"
		fakeNodeLister := newTestAllNodeLister(map[string]*apiv1.Node{})

		ko := &kwokOptions{
			kubeClient:      fakeClient,
			autoscalingOpts: &config.AutoscalingOptions{},
			discoveryOpts:   &cloudprovider.NodeGroupDiscoveryOptions{},
			resourceLimiter: cloudprovider.NewResourceLimiter(
				map[string]int64{cloudprovider.ResourceNameCores: 1, cloudprovider.ResourceNameMemory: 10000000},
				map[string]int64{cloudprovider.ResourceNameCores: 10, cloudprovider.ResourceNameMemory: 100000000}),
			allNodesLister: fakeNodeLister,
			ngNodeListerFn: testNodeLister,
		}

		p, err := BuildKwokProvider(ko)
		assert.NoError(t, err)
		assert.NotNil(t, p)
		assert.Len(t, p.nodeGroups, 1)
		assert.Contains(t, p.nodeGroups[0].Id(), "kind-worker")

		testNode := &apiv1.Node{
			ObjectMeta: metav1.ObjectMeta{
				Name: "node-without-labels",
			},
			Spec: apiv1.NodeSpec{
				ProviderID: "kwok:random-instance-id",
			},
		}
		ng, err := p.NodeGroupForNode(testNode)
		assert.NoError(t, err)
		assert.Nil(t, ng)
	})

}
func TestBuildKwokProvider(t *testing.T) {

Review comment: 👍