Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Scheduler: LastOrdinal based on replicas instead of FreeCap #8388

Merged
merged 1 commit into from
Dec 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions pkg/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ type VPodLister func() ([]VPod, error)
// Evictor allows for vreplicas to be evicted.
// For instance, the evictor is used by the statefulset scheduler to
// move vreplicas to pod with a lower ordinal.
//
// pod might be `nil`.
type Evictor func(pod *corev1.Pod, vpod VPod, from *duckv1alpha1.Placement) error

// Scheduler is responsible for placing VPods into real Kubernetes pods
Expand Down
56 changes: 23 additions & 33 deletions pkg/scheduler/state/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,15 +44,14 @@ type StateAccessor interface {
// It is used by for the scheduler and the autoscaler
type State struct {
// free tracks the free capacity of each pod.
//
// Including pods that might not exist anymore, it reflects the free capacity determined by
// placements in the vpod status.
FreeCap []int32

// schedulable pods tracks the pods that aren't being evicted.
SchedulablePods []int32

// LastOrdinal is the ordinal index corresponding to the last statefulset replica
// with placed vpods.
LastOrdinal int32

// Pod capacity.
Capacity int32

Expand Down Expand Up @@ -143,14 +142,10 @@ func (s *stateBuilder) State(ctx context.Context) (*State, error) {
return nil, err
}

free := make([]int32, 0)
freeCap := make([]int32, 0)
pending := make(map[types.NamespacedName]int32, 4)
expectedVReplicasByVPod := make(map[types.NamespacedName]int32, len(vpods))
schedulablePods := sets.NewInt32()
last := int32(-1)

// keep track of (vpod key, podname) pairs with existing placements
withPlacement := make(map[types.NamespacedName]map[string]bool)

podSpread := make(map[types.NamespacedName]map[string]int32)

Expand All @@ -172,7 +167,7 @@ func (s *stateBuilder) State(ctx context.Context) (*State, error) {
}

for _, p := range schedulablePods.List() {
free, last = s.updateFreeCapacity(logger, free, last, PodNameFromOrdinal(s.statefulSetName, p), 0)
freeCap = s.updateFreeCapacity(logger, freeCap, PodNameFromOrdinal(s.statefulSetName, p), 0)
}

// Getting current state from existing placements for all vpods
Expand All @@ -182,16 +177,13 @@ func (s *stateBuilder) State(ctx context.Context) (*State, error) {
pending[vpod.GetKey()] = pendingFromVPod(vpod)
expectedVReplicasByVPod[vpod.GetKey()] = vpod.GetVReplicas()

withPlacement[vpod.GetKey()] = make(map[string]bool)
podSpread[vpod.GetKey()] = make(map[string]int32)

for i := 0; i < len(ps); i++ {
podName := ps[i].PodName
vreplicas := ps[i].VReplicas

free, last = s.updateFreeCapacity(logger, free, last, podName, vreplicas)

withPlacement[vpod.GetKey()][podName] = true
freeCap = s.updateFreeCapacity(logger, freeCap, podName, vreplicas)

pod, err := s.podLister.Get(podName)
if err != nil {
Expand All @@ -204,8 +196,17 @@ func (s *stateBuilder) State(ctx context.Context) (*State, error) {
}
}

state := &State{FreeCap: free, SchedulablePods: schedulablePods.List(), LastOrdinal: last, Capacity: s.capacity, Replicas: scale.Spec.Replicas, StatefulSetName: s.statefulSetName, PodLister: s.podLister,
PodSpread: podSpread, Pending: pending, ExpectedVReplicaByVPod: expectedVReplicasByVPod}
state := &State{
FreeCap: freeCap,
SchedulablePods: schedulablePods.List(),
Capacity: s.capacity,
Replicas: scale.Spec.Replicas,
StatefulSetName: s.statefulSetName,
PodLister: s.podLister,
PodSpread: podSpread,
Pending: pending,
ExpectedVReplicaByVPod: expectedVReplicasByVPod,
}

logger.Infow("cluster state info", zap.Any("state", state))

Expand All @@ -219,23 +220,19 @@ func pendingFromVPod(vpod scheduler.VPod) int32 {
return int32(math.Max(float64(0), float64(expected-scheduled)))
}

func (s *stateBuilder) updateFreeCapacity(logger *zap.SugaredLogger, free []int32, last int32, podName string, vreplicas int32) ([]int32, int32) {
func (s *stateBuilder) updateFreeCapacity(logger *zap.SugaredLogger, freeCap []int32, podName string, vreplicas int32) []int32 {
ordinal := OrdinalFromPodName(podName)
free = grow(free, ordinal, s.capacity)
freeCap = grow(freeCap, ordinal, s.capacity)

free[ordinal] -= vreplicas
freeCap[ordinal] -= vreplicas

// Assert the pod is not overcommitted
if free[ordinal] < 0 {
if overcommit := freeCap[ordinal]; overcommit < 0 {
// This should not happen anymore. Log as an error but do not interrupt the current scheduling.
logger.Warnw("pod is overcommitted", zap.String("podName", podName), zap.Int32("free", free[ordinal]))
}

if ordinal > last {
last = ordinal
logger.Warnw("pod is overcommitted", zap.String("podName", podName), zap.Int32("overcommit", overcommit))
}

return free, last
return freeCap
}

func (s *State) TotalPending() int32 {
Expand Down Expand Up @@ -283,23 +280,16 @@ func (s *State) MarshalJSON() ([]byte, error) {
type S struct {
FreeCap []int32 `json:"freeCap"`
SchedulablePods []int32 `json:"schedulablePods"`
LastOrdinal int32 `json:"lastOrdinal"`
Capacity int32 `json:"capacity"`
Replicas int32 `json:"replicas"`
NumZones int32 `json:"numZones"`
NumNodes int32 `json:"numNodes"`
NodeToZoneMap map[string]string `json:"nodeToZoneMap"`
StatefulSetName string `json:"statefulSetName"`
PodSpread map[string]map[string]int32 `json:"podSpread"`
NodeSpread map[string]map[string]int32 `json:"nodeSpread"`
ZoneSpread map[string]map[string]int32 `json:"zoneSpread"`
Pending map[string]int32 `json:"pending"`
}

sj := S{
FreeCap: s.FreeCap,
SchedulablePods: s.SchedulablePods,
LastOrdinal: s.LastOrdinal,
Capacity: s.Capacity,
Replicas: s.Replicas,
StatefulSetName: s.StatefulSetName,
Expand Down
14 changes: 7 additions & 7 deletions pkg/scheduler/state/state_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,14 +58,14 @@ func TestStateBuilder(t *testing.T) {
name: "no vpods",
replicas: int32(0),
vpods: [][]duckv1alpha1.Placement{},
expected: State{Capacity: 10, FreeCap: []int32{}, SchedulablePods: []int32{}, LastOrdinal: -1, StatefulSetName: sfsName, Pending: map[types.NamespacedName]int32{}, ExpectedVReplicaByVPod: map[types.NamespacedName]int32{}},
expected: State{Capacity: 10, FreeCap: []int32{}, SchedulablePods: []int32{}, StatefulSetName: sfsName, Pending: map[types.NamespacedName]int32{}, ExpectedVReplicaByVPod: map[types.NamespacedName]int32{}},
freec: int32(0),
},
{
name: "one vpods",
replicas: int32(1),
vpods: [][]duckv1alpha1.Placement{{{PodName: "statefulset-name-0", VReplicas: 1}}},
expected: State{Capacity: 10, FreeCap: []int32{int32(9)}, SchedulablePods: []int32{int32(0)}, LastOrdinal: 0, Replicas: 1, StatefulSetName: sfsName,
expected: State{Capacity: 10, FreeCap: []int32{int32(9)}, SchedulablePods: []int32{int32(0)}, Replicas: 1, StatefulSetName: sfsName,
PodSpread: map[types.NamespacedName]map[string]int32{
{Name: vpodName + "-0", Namespace: vpodNs + "-0"}: {
"statefulset-name-0": 1,
Expand All @@ -88,7 +88,7 @@ func TestStateBuilder(t *testing.T) {
{{PodName: "statefulset-name-1", VReplicas: 2}},
{{PodName: "statefulset-name-1", VReplicas: 3}, {PodName: "statefulset-name-0", VReplicas: 1}},
},
expected: State{Capacity: 10, FreeCap: []int32{int32(8), int32(5), int32(5)}, SchedulablePods: []int32{int32(0), int32(1), int32(2)}, LastOrdinal: 2, Replicas: 3, StatefulSetName: sfsName,
expected: State{Capacity: 10, FreeCap: []int32{int32(8), int32(5), int32(5)}, SchedulablePods: []int32{int32(0), int32(1), int32(2)}, Replicas: 3, StatefulSetName: sfsName,
PodSpread: map[types.NamespacedName]map[string]int32{
{Name: vpodName + "-0", Namespace: vpodNs + "-0"}: {
"statefulset-name-0": 1,
Expand Down Expand Up @@ -124,7 +124,7 @@ func TestStateBuilder(t *testing.T) {
{{PodName: "statefulset-name-1", VReplicas: 2}},
{{PodName: "statefulset-name-1", VReplicas: 3}, {PodName: "statefulset-name-0", VReplicas: 1}},
},
expected: State{Capacity: 10, FreeCap: []int32{int32(8), int32(5), int32(5)}, SchedulablePods: []int32{int32(1), int32(2)}, LastOrdinal: 2, Replicas: 3, StatefulSetName: sfsName,
expected: State{Capacity: 10, FreeCap: []int32{int32(8), int32(5), int32(5)}, SchedulablePods: []int32{int32(1), int32(2)}, Replicas: 3, StatefulSetName: sfsName,
PodSpread: map[types.NamespacedName]map[string]int32{
{Name: vpodName + "-0", Namespace: vpodNs + "-0"}: {
"statefulset-name-2": 5,
Expand Down Expand Up @@ -157,7 +157,7 @@ func TestStateBuilder(t *testing.T) {
{{PodName: "statefulset-name-1", VReplicas: 0}},
{{PodName: "statefulset-name-1", VReplicas: 0}, {PodName: "statefulset-name-3", VReplicas: 0}},
},
expected: State{Capacity: 10, FreeCap: []int32{int32(9), int32(10), int32(5), int32(10)}, SchedulablePods: []int32{int32(0), int32(1), int32(2), int32(3)}, LastOrdinal: 3, Replicas: 4, StatefulSetName: sfsName,
expected: State{Capacity: 10, FreeCap: []int32{int32(9), int32(10), int32(5), int32(10)}, SchedulablePods: []int32{int32(0), int32(1), int32(2), int32(3)}, Replicas: 4, StatefulSetName: sfsName,
PodSpread: map[types.NamespacedName]map[string]int32{
{Name: vpodName + "-0", Namespace: vpodNs + "-0"}: {
"statefulset-name-0": 1,
Expand Down Expand Up @@ -188,7 +188,7 @@ func TestStateBuilder(t *testing.T) {
name: "three vpods but one tainted and one with no zone label",
replicas: int32(1),
vpods: [][]duckv1alpha1.Placement{{{PodName: "statefulset-name-0", VReplicas: 1}}},
expected: State{Capacity: 10, FreeCap: []int32{int32(9)}, SchedulablePods: []int32{int32(0)}, LastOrdinal: 0, Replicas: 1, StatefulSetName: sfsName,
expected: State{Capacity: 10, FreeCap: []int32{int32(9)}, SchedulablePods: []int32{int32(0)}, Replicas: 1, StatefulSetName: sfsName,
PodSpread: map[types.NamespacedName]map[string]int32{
{Name: vpodName + "-0", Namespace: vpodNs + "-0"}: {
"statefulset-name-0": 1,
Expand All @@ -207,7 +207,7 @@ func TestStateBuilder(t *testing.T) {
name: "one vpod (HA)",
replicas: int32(1),
vpods: [][]duckv1alpha1.Placement{{{PodName: "statefulset-name-0", VReplicas: 1}}},
expected: State{Capacity: 10, FreeCap: []int32{int32(9)}, SchedulablePods: []int32{int32(0)}, LastOrdinal: 0, Replicas: 1, StatefulSetName: sfsName,
expected: State{Capacity: 10, FreeCap: []int32{int32(9)}, SchedulablePods: []int32{int32(0)}, Replicas: 1, StatefulSetName: sfsName,
PodSpread: map[types.NamespacedName]map[string]int32{
{Name: vpodName + "-0", Namespace: vpodNs + "-0"}: {
"statefulset-name-0": 1,
Expand Down
18 changes: 5 additions & 13 deletions pkg/scheduler/statefulset/autoscaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@

"go.uber.org/zap"
v1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/utils/integer"
Expand Down Expand Up @@ -250,17 +251,8 @@
zap.Any("state", s),
)

// when there is only one pod there is nothing to move or number of pods is just enough!
if s.LastOrdinal < 1 || len(s.SchedulablePods) <= 1 {
return nil
}

// Determine if there is enough free capacity to
// move all vreplicas placed in the last pod to pods with a lower ordinal
freeCapacity := s.FreeCapacity() - s.Free(s.LastOrdinal)
usedInLastPod := s.Capacity - s.Free(s.LastOrdinal)

if freeCapacity >= usedInLastPod {
// Determine if there are vpods that need compaction
if s.Replicas != int32(len(s.FreeCap)) {

Check failure on line 255 in pkg/scheduler/statefulset/autoscaler.go

View workflow job for this annotation

GitHub Actions / style / Golang / Lint

G115: integer overflow conversion int -> int32 (gosec)
a.lastCompactAttempt = time.Now()
err := a.compact(s)
if err != nil {
Expand All @@ -285,9 +277,9 @@
for i := len(placements) - 1; i >= 0; i-- { //start from the last placement
ordinal := st.OrdinalFromPodName(placements[i].PodName)

if ordinal == s.LastOrdinal {
if ordinal >= s.Replicas {
pod, err = s.PodLister.Get(placements[i].PodName)
if err != nil {
if err != nil && !apierrors.IsNotFound(err) {
return fmt.Errorf("failed to get pod %s: %w", placements[i].PodName, err)
}

Expand Down
47 changes: 43 additions & 4 deletions pkg/scheduler/statefulset/autoscaler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -470,9 +470,7 @@ func TestCompactor(t *testing.T) {
{PodName: "statefulset-name-0", VReplicas: int32(8)},
{PodName: "statefulset-name-1", VReplicas: int32(2)}}),
},
wantEvictions: map[types.NamespacedName][]duckv1alpha1.Placement{
{Name: "vpod-1", Namespace: testNs}: {{PodName: "statefulset-name-1", VReplicas: int32(2)}},
},
wantEvictions: nil,
},
{
name: "multiple vpods, with placements in multiple pods, compacted",
Expand Down Expand Up @@ -500,8 +498,49 @@ func TestCompactor(t *testing.T) {
{PodName: "statefulset-name-0", VReplicas: int32(2)},
{PodName: "statefulset-name-2", VReplicas: int32(7)}}),
},
wantEvictions: nil,
},
{
name: "multiple vpods, scale down, with placements in multiple pods, compacted",
replicas: int32(2),
vpods: []scheduler.VPod{
tscheduler.NewVPod(testNs, "vpod-1", 10, []duckv1alpha1.Placement{
{PodName: "statefulset-name-0", VReplicas: int32(3)},
{PodName: "statefulset-name-1", VReplicas: int32(7)}}),
tscheduler.NewVPod(testNs, "vpod-2", 10, []duckv1alpha1.Placement{
{PodName: "statefulset-name-0", VReplicas: int32(7)},
{PodName: "statefulset-name-2", VReplicas: int32(3)}}),
},
wantEvictions: map[types.NamespacedName][]duckv1alpha1.Placement{
{Name: "vpod-2", Namespace: testNs}: {{PodName: "statefulset-name-2", VReplicas: int32(7)}},
{Name: "vpod-2", Namespace: testNs}: {{PodName: "statefulset-name-2", VReplicas: int32(3)}},
},
},
{
name: "multiple vpods, scale down multiple, with placements in multiple pods, compacted",
replicas: int32(1),
vpods: []scheduler.VPod{
tscheduler.NewVPod(testNs, "vpod-1", 6, []duckv1alpha1.Placement{
{PodName: "statefulset-name-0", VReplicas: int32(3)},
{PodName: "statefulset-name-1", VReplicas: int32(7)},
{PodName: "statefulset-name-2", VReplicas: int32(6)},
}),
tscheduler.NewVPod(testNs, "vpod-2", 3, []duckv1alpha1.Placement{
{PodName: "statefulset-name-0", VReplicas: int32(7)},
{PodName: "statefulset-name-2", VReplicas: int32(3)},
{PodName: "statefulset-name-3", VReplicas: int32(2)},
{PodName: "statefulset-name-10", VReplicas: int32(1)},
}),
},
wantEvictions: map[types.NamespacedName][]duckv1alpha1.Placement{
{Name: "vpod-1", Namespace: testNs}: {
{PodName: "statefulset-name-2", VReplicas: int32(6)},
{PodName: "statefulset-name-1", VReplicas: int32(7)},
},
{Name: "vpod-2", Namespace: testNs}: {
{PodName: "statefulset-name-10", VReplicas: int32(1)},
{PodName: "statefulset-name-3", VReplicas: int32(2)},
{PodName: "statefulset-name-2", VReplicas: int32(3)},
},
},
},
}
Expand Down
Loading