Parallelize cluster snapshot creation
macsko committed Dec 20, 2024
Parent: da31dff · Commit: 60477f7
Showing 7 changed files with 97 additions and 21 deletions.
2 changes: 2 additions & 0 deletions cluster-autoscaler/config/autoscaling_options.go
@@ -311,6 +311,8 @@ type AutoscalingOptions struct {
 	ForceDeleteLongUnregisteredNodes bool
 	// DynamicResourceAllocationEnabled configures whether logic for handling DRA objects is enabled.
 	DynamicResourceAllocationEnabled bool
+	// ClusterSnapshotParallelization is the maximum parallelization of cluster snapshot creation.
+	ClusterSnapshotParallelization int
 }
 
 // KubeClientOptions specify options for kube client
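The new option is plumbed straight through to the delta snapshot store constructor (see the main.go hunk below). A minimal sketch of that wiring, with an illustrative value; this snippet is not part of the commit:

```go
package main

import (
	"k8s.io/autoscaler/cluster-autoscaler/config"
	"k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot/store"
)

func main() {
	// Illustrative: the option caps how many goroutines the delta store
	// may use when filling scheduled pods into nodes in SetClusterState.
	opts := config.AutoscalingOptions{ClusterSnapshotParallelization: 16}
	snapshotStore := store.NewDeltaSnapshotStore(opts.ClusterSnapshotParallelization)
	_ = snapshotStore
}
```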
@@ -254,7 +254,7 @@ func BenchmarkFilterOutSchedulable(b *testing.B) {
 			return testsnapshot.NewCustomTestSnapshotOrDie(b, store.NewBasicSnapshotStore())
 		},
 		"delta": func() clustersnapshot.ClusterSnapshot {
-			return testsnapshot.NewCustomTestSnapshotOrDie(b, store.NewDeltaSnapshotStore())
+			return testsnapshot.NewCustomTestSnapshotOrDie(b, store.NewDeltaSnapshotStore(16))
 		},
 	}
 	for snapshotName, snapshotFactory := range snapshots {
4 changes: 3 additions & 1 deletion cluster-autoscaler/main.go
@@ -281,6 +281,7 @@ var (
 	checkCapacityProvisioningRequestBatchTimebox = flag.Duration("check-capacity-provisioning-request-batch-timebox", 10*time.Second, "Maximum time to process a batch of provisioning requests.")
 	forceDeleteLongUnregisteredNodes = flag.Bool("force-delete-unregistered-nodes", false, "Whether to enable force deletion of long unregistered nodes, regardless of the min size of the node group they belong to.")
 	enableDynamicResourceAllocation = flag.Bool("enable-dynamic-resource-allocation", false, "Whether logic for handling DRA (Dynamic Resource Allocation) objects is enabled.")
+	clusterSnapshotParallelization = flag.Int("cluster-snapshot-parallelization", 16, "Maximum parallelization of cluster snapshot creation.")
 )
 
 func isFlagPassed(name string) bool {
@@ -461,6 +462,7 @@ func createAutoscalingOptions() config.AutoscalingOptions {
 		CheckCapacityProvisioningRequestBatchTimebox: *checkCapacityProvisioningRequestBatchTimebox,
 		ForceDeleteLongUnregisteredNodes: *forceDeleteLongUnregisteredNodes,
 		DynamicResourceAllocationEnabled: *enableDynamicResourceAllocation,
+		ClusterSnapshotParallelization: *clusterSnapshotParallelization,
 	}
}

@@ -506,7 +508,7 @@ func buildAutoscaler(context ctx.Context, debuggingSnapshotter debuggingsnapshot.DebuggingSnapshotter)
 	opts := core.AutoscalerOptions{
 		AutoscalingOptions: autoscalingOptions,
 		FrameworkHandle: fwHandle,
-		ClusterSnapshot: predicate.NewPredicateSnapshot(store.NewDeltaSnapshotStore(), fwHandle, autoscalingOptions.DynamicResourceAllocationEnabled),
+		ClusterSnapshot: predicate.NewPredicateSnapshot(store.NewDeltaSnapshotStore(autoscalingOptions.ClusterSnapshotParallelization), fwHandle, autoscalingOptions.DynamicResourceAllocationEnabled),
 		KubeClient: kubeClient,
 		InformerFactory: informerFactory,
 		DebuggingSnapshotter: debuggingSnapshotter,
@@ -114,7 +114,7 @@ func TestTargetCountInjectionPodListProcessor(t *testing.T) {
 	for _, tc := range testCases {
 		t.Run(tc.name, func(t *testing.T) {
 			p := NewPodInjectionPodListProcessor(podinjectionbackoff.NewFakePodControllerRegistry())
-			clusterSnapshot := testsnapshot.NewCustomTestSnapshotOrDie(t, store.NewDeltaSnapshotStore())
+			clusterSnapshot := testsnapshot.NewCustomTestSnapshotOrDie(t, store.NewDeltaSnapshotStore(16))
 			err := clusterSnapshot.AddNodeInfo(framework.NewTestNodeInfo(node, tc.scheduledPods...))
 			assert.NoError(t, err)
 			ctx := context.AutoscalingContext{
@@ -46,7 +46,7 @@ var snapshots = map[string]func() (clustersnapshot.ClusterSnapshot, error){
 		if err != nil {
 			return nil, err
 		}
-		return NewPredicateSnapshot(store.NewDeltaSnapshotStore(), fwHandle, true), nil
+		return NewPredicateSnapshot(store.NewDeltaSnapshotStore(16), fwHandle, true), nil
 	},
 }

102 changes: 87 additions & 15 deletions cluster-autoscaler/simulator/clustersnapshot/store/delta.go
@@ -17,11 +17,13 @@ limitations under the License.
 package store
 
 import (
+	"context"
 	"fmt"
 
 	apiv1 "k8s.io/api/core/v1"
 	"k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot"
 	drasnapshot "k8s.io/autoscaler/cluster-autoscaler/simulator/dynamicresources/snapshot"
+	"k8s.io/client-go/util/workqueue"
 	"k8s.io/klog/v2"
 	schedulerframework "k8s.io/kubernetes/pkg/scheduler/framework"
 )
@@ -44,7 +46,8 @@ import (
 // pod affinity - causes scheduler framework to list pods with non-empty selector,
 // so basic caching doesn't help.
 type DeltaSnapshotStore struct {
-	data *internalDeltaSnapshotData
+	data            *internalDeltaSnapshotData
+	parallelization int
 }
 
 type deltaSnapshotStoreNodeLister DeltaSnapshotStore
@@ -137,10 +140,14 @@ func (data *internalDeltaSnapshotData) buildNodeInfoList() []*schedulerframework.NodeInfo {
 	return nodeInfoList
 }
 
-func (data *internalDeltaSnapshotData) addNode(node *apiv1.Node) error {
+func (data *internalDeltaSnapshotData) addNode(node *apiv1.Node) (*schedulerframework.NodeInfo, error) {
 	nodeInfo := schedulerframework.NewNodeInfo()
 	nodeInfo.SetNode(node)
-	return data.addNodeInfo(nodeInfo)
+	err := data.addNodeInfo(nodeInfo)
+	if err != nil {
+		return nil, err
+	}
+	return nodeInfo, nil
 }
 
 func (data *internalDeltaSnapshotData) addNodeInfo(nodeInfo *schedulerframework.NodeInfo) error {
@@ -241,6 +248,24 @@ func (data *internalDeltaSnapshotData) addPod(pod *apiv1.Pod, nodeName string) error {
 	return nil
 }
 
+func (data *internalDeltaSnapshotData) addPodToNode(pod *apiv1.Pod, ni *schedulerframework.NodeInfo) error {
+	ni.AddPod(pod)
+
+	// Maybe consider deleting from the list in the future. Maybe not.
+	data.clearCaches()
+	return nil
+}
+
+func (data *internalDeltaSnapshotData) addPodsToNode(pods []*apiv1.Pod, ni *schedulerframework.NodeInfo) error {
+	for _, pod := range pods {
+		ni.AddPod(pod)
+	}
+
+	// Maybe consider deleting from the list in the future. Maybe not.
+	data.clearCaches()
+	return nil
+}
+
 func (data *internalDeltaSnapshotData) removePod(namespace, name, nodeName string) error {
 	// This always clones node info, even if the pod is actually missing.
 	// Not sure if we mind, since removing non-existent pod
@@ -403,8 +428,10 @@ func (snapshot *DeltaSnapshotStore) DeviceClasses() schedulerframework.DeviceClassLister {
 }
 
 // NewDeltaSnapshotStore creates instances of DeltaSnapshotStore.
-func NewDeltaSnapshotStore() *DeltaSnapshotStore {
-	snapshot := &DeltaSnapshotStore{}
+func NewDeltaSnapshotStore(parallelization int) *DeltaSnapshotStore {
+	snapshot := &DeltaSnapshotStore{
+		parallelization: parallelization,
+	}
 	snapshot.clear()
 	return snapshot
 }
@@ -417,7 +444,7 @@ func (snapshot *DeltaSnapshotStore) DraSnapshot() drasnapshot.Snapshot {

 // AddSchedulerNodeInfo adds a NodeInfo.
 func (snapshot *DeltaSnapshotStore) AddSchedulerNodeInfo(nodeInfo *schedulerframework.NodeInfo) error {
-	if err := snapshot.data.addNode(nodeInfo.Node()); err != nil {
+	if _, err := snapshot.data.addNode(nodeInfo.Node()); err != nil {
 		return err
 	}
 	for _, podInfo := range nodeInfo.Pods {
@@ -428,24 +455,69 @@ func (snapshot *DeltaSnapshotStore) AddSchedulerNodeInfo(nodeInfo *schedulerframework.NodeInfo) error {
 	return nil
 }
 
+// setClusterStatePodsSequential sets the pods in cluster state sequentially.
+func (snapshot *DeltaSnapshotStore) setClusterStatePodsSequential(nodeInfos []*schedulerframework.NodeInfo, nodeNameToIdx map[string]int, scheduledPods []*apiv1.Pod) error {
+	for _, pod := range scheduledPods {
+		if nodeIdx, ok := nodeNameToIdx[pod.Spec.NodeName]; ok {
+			if err := snapshot.data.addPodToNode(pod, nodeInfos[nodeIdx]); err != nil {
+				return err
+			}
+		}
+	}
+	return nil
+}
+
+// setClusterStatePodsParallelized sets the pods in cluster state in parallel, based on the snapshot.parallelization value.
+func (snapshot *DeltaSnapshotStore) setClusterStatePodsParallelized(nodeInfos []*schedulerframework.NodeInfo, nodeNameToIdx map[string]int, scheduledPods []*apiv1.Pod) error {
+	podsForNode := make([][]*apiv1.Pod, len(nodeInfos))
+	for _, pod := range scheduledPods {
+		nodeIdx, ok := nodeNameToIdx[pod.Spec.NodeName]
+		if !ok {
+			continue
+		}
+		podsForNode[nodeIdx] = append(podsForNode[nodeIdx], pod)
+	}
+
+	ctx := context.Background()
+	ctx, cancel := context.WithCancelCause(ctx)
+
+	workqueue.ParallelizeUntil(ctx, snapshot.parallelization, len(nodeInfos), func(nodeIdx int) {
+		err := snapshot.data.addPodsToNode(podsForNode[nodeIdx], nodeInfos[nodeIdx])
+		if err != nil {
+			cancel(err)
+		}
+	})
+
+	return context.Cause(ctx)
+}
+
 // SetClusterState sets the cluster state.
 func (snapshot *DeltaSnapshotStore) SetClusterState(nodes []*apiv1.Node, scheduledPods []*apiv1.Pod, draSnapshot drasnapshot.Snapshot) error {
 	snapshot.clear()
 
-	knownNodes := make(map[string]bool)
-	for _, node := range nodes {
-		if err := snapshot.data.addNode(node); err != nil {
+	nodeNameToIdx := make(map[string]int, len(nodes))
+	nodeInfos := make([]*schedulerframework.NodeInfo, len(nodes))
+	for i, node := range nodes {
+		nodeInfo, err := snapshot.data.addNode(node)
+		if err != nil {
 			return err
 		}
-		knownNodes[node.Name] = true
+		nodeNameToIdx[node.Name] = i
+		nodeInfos[i] = nodeInfo
 	}
-	for _, pod := range scheduledPods {
-		if knownNodes[pod.Spec.NodeName] {
-			if err := snapshot.data.addPod(pod, pod.Spec.NodeName); err != nil {
-				return err
-			}
-		}
-	}
+
+	if snapshot.parallelization > 1 {
+		err := snapshot.setClusterStatePodsParallelized(nodeInfos, nodeNameToIdx, scheduledPods)
+		if err != nil {
+			return err
+		}
+	} else {
+		err := snapshot.setClusterStatePodsSequential(nodeInfos, nodeNameToIdx, scheduledPods)
+		if err != nil {
+			return err
+		}
+	}
+
 	// TODO(DRA): Save DRA snapshot.
 	return nil
 }
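The parallel path above combines two standard pieces: client-go's workqueue.ParallelizeUntil to fan the per-node work out over a bounded number of goroutines, and Go's context.WithCancelCause/context.Cause to stop early and report the first error. A minimal standalone sketch of the same pattern (illustrative only, not part of the commit):

```go
package main

import (
	"context"
	"fmt"

	"k8s.io/client-go/util/workqueue"
)

func main() {
	items := []int{3, 1, 4, -1, 5, 9}

	ctx, cancel := context.WithCancelCause(context.Background())

	// Run at most 4 workers over len(items) independent pieces. Each index
	// is handled by exactly one goroutine, so no locking is needed as long
	// as pieces share no state; the commit gets the same property by
	// grouping pods per node before fanning out.
	workqueue.ParallelizeUntil(ctx, 4, len(items), func(i int) {
		if items[i] < 0 {
			// The first failure cancels the context with a cause;
			// no new pieces are started after that.
			cancel(fmt.Errorf("item %d is negative", i))
		}
	})

	// context.Cause returns nil if nothing cancelled the context,
	// otherwise the error passed to cancel.
	if err := context.Cause(ctx); err != nil {
		fmt.Println("failed:", err)
		return
	}
	fmt.Println("ok")
}
```

Note the design choice this enables in SetClusterState: because podsForNode hands each worker a disjoint slice of pods and a distinct NodeInfo, the workers mutate the snapshot without locks, and parallelization values of 1 or less fall back to the sequential path.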
@@ -48,7 +48,7 @@ func BenchmarkBuildNodeInfoList(b *testing.B) {
 	for _, tc := range testCases {
 		b.Run(fmt.Sprintf("fork add 1000 to %d", tc.nodeCount), func(b *testing.B) {
 			nodes := clustersnapshot.CreateTestNodes(tc.nodeCount + 1000)
-			deltaStore := NewDeltaSnapshotStore()
+			deltaStore := NewDeltaSnapshotStore(16)
 			if err := deltaStore.SetClusterState(nodes[:tc.nodeCount], nil, drasnapshot.Snapshot{}); err != nil {
 				assert.NoError(b, err)
 			}
@@ -70,7 +70,7 @@ func BenchmarkBuildNodeInfoList(b *testing.B) {
 	for _, tc := range testCases {
 		b.Run(fmt.Sprintf("base %d", tc.nodeCount), func(b *testing.B) {
 			nodes := clustersnapshot.CreateTestNodes(tc.nodeCount)
-			deltaStore := NewDeltaSnapshotStore()
+			deltaStore := NewDeltaSnapshotStore(16)
 			if err := deltaStore.SetClusterState(nodes, nil, drasnapshot.Snapshot{}); err != nil {
 				assert.NoError(b, err)
 			}
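The benchmarks above pass nil for scheduledPods, so they exercise node ingestion rather than the new parallel pod path. A rough sketch of a benchmark that would compare parallelization settings, assuming the clustersnapshot pod test helpers (CreateTestPods, AssignTestPodsToNodes) behave the way CreateTestNodes does here; the helper names and counts are illustrative, not part of the commit:

```go
func BenchmarkSetClusterStateParallelization(b *testing.B) {
	nodes := clustersnapshot.CreateTestNodes(5000)
	pods := clustersnapshot.CreateTestPods(5000 * 30)
	clustersnapshot.AssignTestPodsToNodes(pods, nodes)
	for _, parallelization := range []int{1, 2, 4, 16} {
		b.Run(fmt.Sprintf("parallelization %d", parallelization), func(b *testing.B) {
			for i := 0; i < b.N; i++ {
				deltaStore := NewDeltaSnapshotStore(parallelization)
				if err := deltaStore.SetClusterState(nodes, pods, drasnapshot.Snapshot{}); err != nil {
					assert.NoError(b, err)
				}
			}
		})
	}
}
```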
