scheduler: support reservation ignored and nodenumaresource preemption (#2163)

Signed-off-by: saintube <saintube@foxmail.com>
saintube authored Sep 2, 2024
1 parent 2df0762 commit 37c8ede
Showing 25 changed files with 1,464 additions and 192 deletions.
11 changes: 11 additions & 0 deletions apis/extension/reservation.go
@@ -28,6 +28,13 @@ import (
)

const (
// LabelReservationIgnored indicates whether the pod should be scheduled ignoring the resource reservations on the nodes.
// If a scheduling pod sets this label, it can allocate both the unallocated unreserved resources of the node and the
// unallocated reserved resources of all reservations on the node. If a pod with this label is scheduled on a node,
// the reservations on that node will not count the pod as one of their owners. To avoid pods with this label
// blocking other pods from allocating reserved resources, it should be used together with reservation preemption.
LabelReservationIgnored = SchedulingDomainPrefix + "/reservation-ignored"

// LabelReservationOrder controls the preference logic for Reservation.
// Reservation with lower order is preferred to be selected before Reservation with higher order.
// But if it is 0, Reservation will be selected according to the capacity score.
@@ -78,6 +85,10 @@ type ReservationRestrictedOptions struct {
Resources []corev1.ResourceName `json:"resources,omitempty"`
}

func IsReservationIgnored(pod *corev1.Pod) bool {
return pod != nil && pod.Labels != nil && pod.Labels[LabelReservationIgnored] == "true"
}

func GetReservationAllocated(pod *corev1.Pod) (*ReservationAllocated, error) {
if pod.Annotations == nil {
return nil, nil
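For reference, a minimal sketch of how a workload opts out of reservations with the new label and how the helper above reads it; the pod name and the program scaffold are illustrative only:

package main

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

	apiext "github.com/koordinator-sh/koordinator/apis/extension"
)

func main() {
	// A pod that asks the scheduler to ignore reservations on the candidate nodes.
	pod := &corev1.Pod{
		ObjectMeta: metav1.ObjectMeta{
			Name: "reservation-ignored-pod",
			Labels: map[string]string{
				apiext.LabelReservationIgnored: "true",
			},
		},
	}
	fmt.Println(apiext.IsReservationIgnored(pod)) // prints: true
}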
51 changes: 51 additions & 0 deletions apis/extension/reservation_test.go
@@ -28,6 +28,57 @@ import (
schedulingv1alpha1 "github.com/koordinator-sh/koordinator/apis/scheduling/v1alpha1"
)

func TestIsReservationIgnored(t *testing.T) {
tests := []struct {
name string
arg *corev1.Pod
want bool
}{
{
name: "pod is nil",
arg: nil,
want: false,
},
{
name: "pod labels is missing",
arg: &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "test-pod",
},
},
want: false,
},
{
name: "pod label is empty",
arg: &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "test-pod",
Labels: map[string]string{},
},
},
want: false,
},
{
name: "pod set label to true",
arg: &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "test-pod",
Labels: map[string]string{
LabelReservationIgnored: "true",
},
},
},
want: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got := IsReservationIgnored(tt.arg)
assert.Equal(t, tt.want, got)
})
}
}

func TestSetReservationAllocated(t *testing.T) {
reservation := &schedulingv1alpha1.Reservation{
ObjectMeta: metav1.ObjectMeta{
2 changes: 2 additions & 0 deletions apis/scheduling/v1alpha1/reservation_types.go
@@ -76,6 +76,8 @@ type ReservationAllocatePolicy string
const (
// ReservationAllocatePolicyDefault means that there is no restriction on the policy of reserved resources,
// and allocated from the Reservation first, and if it is insufficient, it is allocated from the node.
// DEPRECATED: ReservationAllocatePolicyDefault is deprecated; it is treated as Aligned if specified.
// Please use one of the other policies or set LabelReservationIgnored instead.
ReservationAllocatePolicyDefault ReservationAllocatePolicy = ""
// ReservationAllocatePolicyAligned indicates that the Pod allocates resources from the Reservation first.
// If the remaining resources of the Reservation are insufficient, it can be allocated from the node,
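A hedged sketch of a Reservation that sets an explicit policy instead of the deprecated Default; the ReservationSpec field name AllocatePolicy is an assumption and is not shown in this diff:

package main

import (
	"fmt"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

	schedulingv1alpha1 "github.com/koordinator-sh/koordinator/apis/scheduling/v1alpha1"
)

func main() {
	reservation := &schedulingv1alpha1.Reservation{
		ObjectMeta: metav1.ObjectMeta{Name: "aligned-reservation"},
		Spec: schedulingv1alpha1.ReservationSpec{
			// AllocatePolicy is the assumed spec field carrying ReservationAllocatePolicy;
			// prefer Aligned (or Restricted) over the deprecated empty Default value.
			AllocatePolicy: schedulingv1alpha1.ReservationAllocatePolicyAligned,
		},
	}
	fmt.Println(reservation.Spec.AllocatePolicy)
}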
5 changes: 3 additions & 2 deletions pkg/scheduler/plugins/deviceshare/plugin.go
@@ -317,7 +317,7 @@ func (p *Plugin) Filter(ctx context.Context, cycleState *framework.CycleState, p
nodeDeviceInfo.lock.RLock()
defer nodeDeviceInfo.lock.RUnlock()
// TODO: when the NUMA policy is not empty, whether the devices under the NUMA node can be allocated has already been checked by NodeNUMAResource's FilterByNUMANode, so there is a duplicated call here; to be optimized
allocateResult, status := p.tryAllocateFromReservation(allocator, state, restoreState, restoreState.matched, node, preemptible, state.hasReservationAffinity)
allocateResult, status := p.tryAllocateFromReservation(allocator, state, restoreState, restoreState.matched, pod, node, preemptible, state.hasReservationAffinity)
if !status.IsSuccess() {
return status
}
@@ -385,7 +385,7 @@ func (p *Plugin) FilterReservation(ctx context.Context, cycleState *framework.Cy
nodeDeviceInfo.lock.RLock()
defer nodeDeviceInfo.lock.RUnlock()

_, status = p.tryAllocateFromReservation(allocator, state, restoreState, restoreState.matched[allocIndex:allocIndex+1], nodeInfo.Node(), preemptible, true)
_, status = p.tryAllocateFromReservation(allocator, state, restoreState, restoreState.matched[allocIndex:allocIndex+1], pod, nodeInfo.Node(), preemptible, true)
return status
}

@@ -429,6 +429,7 @@ func (p *Plugin) Reserve(ctx context.Context, cycleState *framework.CycleState,
numaNodes: affinity.NUMANodeAffinity,
}

// TODO: de-duplicate the logic done by the Filter phase and move the pre-processing of the resource options ahead
reservationRestoreState := getReservationRestoreState(cycleState)
restoreState := reservationRestoreState.getNodeState(nodeName)
preemptible := appendAllocated(nil, restoreState.mergedUnmatchedUsed, state.preemptibleDevices[nodeName])
81 changes: 63 additions & 18 deletions pkg/scheduler/plugins/deviceshare/reservation.go
@@ -188,6 +188,7 @@ func (p *Plugin) tryAllocateFromReservation(
state *preFilterState,
restoreState *nodeReservationRestoreStateData,
matchedReservations []reservationAlloc,
pod *corev1.Pod,
node *corev1.Node,
basicPreemptible map[schedulingv1alpha1.DeviceType]deviceResources,
requiredFromReservation bool,
@@ -196,6 +197,10 @@
return nil, nil
}

if apiext.IsReservationIgnored(pod) {
return p.tryAllocateIgnoreReservation(allocator, state, restoreState, restoreState.matched, node, basicPreemptible, false)
}

var hasSatisfiedReservation bool
var result apiext.DeviceAllocations
var status *framework.Status
@@ -220,30 +225,40 @@
preemptible := appendAllocated(nil, basicPreemptible, alloc.remained, preemptibleInRR)

allocatePolicy := rInfo.GetAllocatePolicy()
// TODO: Currently the ReservationAllocatePolicyDefault is actually implemented as
// ReservationAllocatePolicyAligned. Need to re-visit the policies.
if allocatePolicy == schedulingv1alpha1.ReservationAllocatePolicyDefault ||
allocatePolicy == schedulingv1alpha1.ReservationAllocatePolicyAligned {
result, status = allocator.Allocate(nil, preferred, nil, preemptible)
if status.IsSuccess() {
hasSatisfiedReservation = true
break
if !status.IsSuccess() {
reservationReasons = append(reservationReasons, status)
continue
}
reservationReasons = append(reservationReasons, status)

hasSatisfiedReservation = true
break

} else if allocatePolicy == schedulingv1alpha1.ReservationAllocatePolicyRestricted {
_, status := allocator.Allocate(preferred, preferred, nil, preemptible)
if status.IsSuccess() {
//
// It is necessary to check separately whether the remaining resources of the device instance
// reserved by the Restricted Reservation meet the requirements of the Pod, to ensure that
// the intersecting resources do not exceed the reserved range of the Restricted Reservation.
//
requiredDeviceResources := calcRequiredDeviceResources(&alloc, preemptibleInRR)
result, status = allocator.Allocate(preferred, preferred, requiredDeviceResources, preemptible)
if status.IsSuccess() {
hasSatisfiedReservation = true
break
}
_, status = allocator.Allocate(preferred, preferred, nil, preemptible)
if !status.IsSuccess() {
reservationReasons = append(reservationReasons, status)
continue
}

//
// It is necessary to check separately whether the remaining resources of the device instance
// reserved by the Restricted Reservation meet the requirements of the Pod, to ensure that
// the intersecting resources do not exceed the reserved range of the Restricted Reservation.
//
requiredDeviceResources := calcRequiredDeviceResources(&alloc, preemptibleInRR)
result, status = allocator.Allocate(preferred, preferred, requiredDeviceResources, preemptible)
if !status.IsSuccess() {
reservationReasons = append(reservationReasons, status)
continue
}
reservationReasons = append(reservationReasons, status)

hasSatisfiedReservation = true
break
}
}
if !hasSatisfiedReservation && requiredFromReservation {
@@ -252,6 +267,29 @@
return result, nil
}

// tryAllocateIgnoreReservation tries to allocate with the reserved resources of the node ignored.
func (p *Plugin) tryAllocateIgnoreReservation(
allocator *AutopilotAllocator,
state *preFilterState,
restoreState *nodeReservationRestoreStateData,
ignoredReservations []reservationAlloc,
node *corev1.Node,
basicPreemptible map[schedulingv1alpha1.DeviceType]deviceResources,
requiredFromReservation bool,
) (apiext.DeviceAllocations, *framework.Status) {
preemptibleFromIgnored := map[schedulingv1alpha1.DeviceType]deviceResources{}

// accumulate all ignored reserved resources which are not allocated to any owner pods
for _, alloc := range ignoredReservations {
preemptibleFromIgnored = appendAllocated(preemptibleFromIgnored,
state.preemptibleInRRs[node.Name][alloc.rInfo.UID()], alloc.remained)
}

preemptibleFromIgnored = appendAllocated(preemptibleFromIgnored, basicPreemptible, restoreState.mergedMatchedAllocated)

return allocator.Allocate(nil, nil, nil, preemptibleFromIgnored)
}

func (p *Plugin) makeReasonsByReservation(reservationReasons []*framework.Status) []string {
var reasons []string
for _, status := range reservationReasons {
@@ -321,6 +359,12 @@ func (p *Plugin) allocateWithNominatedReservation(
return nil, nil
}

// if the pod is reservation-ignored, it allocates from the node's unallocated resources and all the unallocated
// reserved resources.
if apiext.IsReservationIgnored(pod) {
return p.tryAllocateIgnoreReservation(allocator, state, restoreState, restoreState.matched, node, basicPreemptible, false)
}

reservation := p.handle.GetReservationNominator().GetNominatedReservation(pod, node.Name)
if reservation == nil {
return nil, nil
@@ -342,6 +386,7 @@
state,
restoreState,
restoreState.matched[allocIndex:allocIndex+1],
pod,
node,
basicPreemptible,
false,
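As a rough illustration of the branching implemented by tryAllocateFromReservation, tryAllocateIgnoreReservation, and allocateWithNominatedReservation above, here is a greatly simplified sketch with scalar resources; every type and number below is hypothetical and not part of the plugin's API:

package main

import "fmt"

type node struct {
	freeUnreserved int // capacity outside any reservation
}

type reservation struct {
	remaining int    // reserved but not yet allocated to owner pods
	policy    string // "Aligned" or "Restricted"
}

// allocate mimics the decision flow: reservation-ignored pods may use both pools,
// Aligned pods spill over from the reservation to the node, and Restricted pods
// must fit inside the reservation's remaining resources.
func allocate(request int, n node, r reservation, reservationIgnored bool) (bool, string) {
	if reservationIgnored {
		// Node unreserved capacity plus the unallocated remainder of the reservation.
		return request <= n.freeUnreserved+r.remaining, "ignore-reservation path"
	}
	switch r.policy {
	case "Aligned", "": // the deprecated Default policy is treated as Aligned
		return request <= r.remaining+n.freeUnreserved, "aligned path"
	case "Restricted":
		return request <= r.remaining, "restricted path"
	}
	return false, "unknown policy"
}

func main() {
	n := node{freeUnreserved: 2}
	r := reservation{remaining: 3, policy: "Restricted"}
	fmt.Println(allocate(4, n, r, false)) // false: 4 exceeds the Restricted reservation's remaining 3
	fmt.Println(allocate(4, n, r, true))  // true: the ignore-reservation path can use 2+3
}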
1 change: 1 addition & 0 deletions pkg/scheduler/plugins/deviceshare/reservation_test.go
@@ -682,6 +682,7 @@ func Test_tryAllocateFromReservation(t *testing.T) {
tt.state,
tt.restoreState,
tt.restoreState.matched,
nil,
node,
basicPreemptible,
tt.requiredFromReservation,
4 changes: 2 additions & 2 deletions pkg/scheduler/plugins/deviceshare/topology_hint.go
@@ -95,7 +95,7 @@ func (p *Plugin) Allocate(ctx context.Context, cycleState *framework.CycleState,

nodeDeviceInfo.lock.RLock()
defer nodeDeviceInfo.lock.RUnlock()
allocateResult, status := p.tryAllocateFromReservation(allocator, state, restoreState, restoreState.matched, node, preemptible, state.hasReservationAffinity)
allocateResult, status := p.tryAllocateFromReservation(allocator, state, restoreState, restoreState.matched, pod, node, preemptible, state.hasReservationAffinity)
if !status.IsSuccess() {
return status
}
@@ -163,7 +163,7 @@ func (p *Plugin) generateTopologyHints(cycleState *framework.CycleState, state *
}
}

allocateResult, status := p.tryAllocateFromReservation(allocator, state, restoreState, restoreState.matched, node, preemptible, state.hasReservationAffinity)
allocateResult, status := p.tryAllocateFromReservation(allocator, state, restoreState, restoreState.matched, pod, node, preemptible, state.hasReservationAffinity)
if !status.IsSuccess() {
return
}
13 changes: 10 additions & 3 deletions pkg/scheduler/plugins/nodenumaresource/node_allocation.go
@@ -189,10 +189,17 @@ func (n *NodeAllocation) release(podUID types.UID) {
}
}

func (n *NodeAllocation) getAvailableCPUs(cpuTopology *CPUTopology, maxRefCount int, reservedCPUs, preferredCPUs cpuset.CPUSet) (availableCPUs cpuset.CPUSet, allocateInfo CPUDetails) {
func (n *NodeAllocation) getAvailableCPUs(cpuTopology *CPUTopology, maxRefCount int, reservedCPUs cpuset.CPUSet, preferredCPUs ...cpuset.CPUSet) (availableCPUs cpuset.CPUSet, allocateInfo CPUDetails) {
allocateInfo = n.allocatedCPUs.Clone()
if !preferredCPUs.IsEmpty() {
for _, cpuID := range preferredCPUs.ToSliceNoSort() {
// NOTE: preferredCPUs is variadic since we may need to restore a CPU multiple times when it is referenced more than once.
// e.g. When a pod A tries to preempt a pod B owned by the reservation R, the CPU C is allocated by both B and R,
// so the RefCount of CPU C is 2. Pod A can only allocate C after it is returned twice: once by the
// reservation restore and once by the preemption restore.
for _, preferred := range preferredCPUs {
if preferred.IsEmpty() {
continue
}
for _, cpuID := range preferred.ToSliceNoSort() {
cpuInfo, ok := allocateInfo[cpuID]
if ok {
cpuInfo.RefCount--
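A toy illustration of the RefCount bookkeeping described in the NOTE above: a CPU held by both a reservation and a preempted pod only becomes available after it is restored once per reference. The map-based model is a stand-in, not the scheduler's CPUDetails type:

package main

import "fmt"

func main() {
	// RefCount per CPU ID: CPU 3 is referenced by both the reservation R and the preempted pod B.
	refCount := map[int]int{3: 2}

	// Each restore source (reservation restoring, preemption restoring) returns CPU 3 once,
	// analogous to passing several preferredCPUs sets to getAvailableCPUs.
	restoredSets := [][]int{{3}, {3}}

	for _, set := range restoredSets {
		for _, cpu := range set {
			if refCount[cpu] > 0 {
				refCount[cpu]--
			}
		}
	}

	// Only after both references are released does CPU 3 become available again.
	fmt.Println(refCount[3] == 0) // prints: true
}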