Skip to content

Commit

Permalink
[YUNIKORN-2802] Prune zero values from computed resource types (#950)
Browse files Browse the repository at this point in the history
Closes: #950

Signed-off-by: Craig Condit <ccondit@apache.org>
  • Loading branch information
zhuqi-lucas authored and craigcondit committed Aug 21, 2024
1 parent 4e05576 commit 8a4acda
Show file tree
Hide file tree
Showing 6 changed files with 49 additions and 15 deletions.
11 changes: 8 additions & 3 deletions pkg/scheduler/objects/application.go
Original file line number Diff line number Diff line change
Expand Up @@ -581,6 +581,7 @@ func (sa *Application) removeAsksInternal(allocKey string, detail si.EventRecord
if !ask.IsAllocated() {
deltaPendingResource = ask.GetAllocatedResource()
sa.pending = resources.Sub(sa.pending, deltaPendingResource)
sa.pending.Prune()
}
delete(sa.requests, allocKey)
sa.sortedRequests.remove(ask)
Expand Down Expand Up @@ -648,6 +649,7 @@ func (sa *Application) AddAllocationAsk(ask *Allocation) error {
// Update total pending resource
delta.SubFrom(oldAskResource)
sa.pending = resources.Add(sa.pending, delta)
sa.pending.Prune()
sa.queue.incPendingResource(delta)

log.Log(log.SchedApplication).Info("ask added successfully to application",
Expand Down Expand Up @@ -726,10 +728,11 @@ func (sa *Application) allocateAsk(ask *Allocation) (*resources.Resource, error)
sa.updateAskMaxPriority()
}

delta := resources.Multiply(ask.GetAllocatedResource(), -1)
sa.pending = resources.Add(sa.pending, delta)
delta := ask.GetAllocatedResource()
sa.pending = resources.Sub(sa.pending, delta)
sa.pending.Prune()
// update the pending of the queue with the same delta
sa.queue.incPendingResource(delta)
sa.queue.decPendingResource(delta)

return delta, nil
}
Expand Down Expand Up @@ -1797,6 +1800,7 @@ func (sa *Application) removeAllocationInternal(allocationKey string, releaseTyp
// as and when every ph gets removed (for replacement), resource usage would be reduced.
// When real allocation happens as part of replacement, usage would be increased again with real alloc resource
sa.allocatedPlaceholder = resources.Sub(sa.allocatedPlaceholder, alloc.GetAllocatedResource())
sa.allocatedPlaceholder.Prune()

// if all the placeholders are replaced, clear the placeholder timer
if resources.IsZero(sa.allocatedPlaceholder) {
Expand All @@ -1821,6 +1825,7 @@ func (sa *Application) removeAllocationInternal(allocationKey string, releaseTyp
sa.decUserResourceUsage(alloc.GetAllocatedResource(), removeApp)
} else {
sa.allocatedResource = resources.Sub(sa.allocatedResource, alloc.GetAllocatedResource())
sa.allocatedResource.Prune()

// Aggregate the resources used by this alloc to the application's resource tracker
sa.trackCompletedResource(alloc)
Expand Down
13 changes: 8 additions & 5 deletions pkg/scheduler/objects/application_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -432,8 +432,8 @@ func TestAllocateDeallocate(t *testing.T) {
err = app.AddAllocationAsk(ask)
assert.NilError(t, err, "ask should have been added to app")
// allocate
if delta, err := app.AllocateAsk(aKey); err != nil || !resources.Equals(resources.Multiply(res, -1), delta) {
t.Errorf("AllocateAsk() did not return correct delta, err %v, expected %v got %v", err, resources.Multiply(res, -1), delta)
if delta, err := app.AllocateAsk(aKey); err != nil || !resources.Equals(res, delta) {
t.Errorf("AllocateAsk() did not return correct delta, err %v, expected %v got %v", err, res, delta)
}
// allocate again should fail
if delta, err := app.AllocateAsk(aKey); err == nil || delta != nil {
Expand Down Expand Up @@ -943,7 +943,8 @@ func TestStateChangeOnPlaceholderAdd(t *testing.T) {
released = app.RemoveAllocationAsk(askID)
assert.Equal(t, released, 0, "allocation ask should not have been reserved")
assert.Assert(t, app.IsCompleting(), "Application should have stayed same, changed unexpectedly: %s", app.CurrentState())
assertUserGroupResource(t, getTestUserGroup(), resources.NewResourceFromMap(map[string]resources.Quantity{"first": 0}))
// zero resource should be pruned
assertUserGroupResource(t, getTestUserGroup(), nil)

log := app.GetStateLog()
assert.Equal(t, len(log), 2, "wrong number of app events")
Expand Down Expand Up @@ -1674,7 +1675,8 @@ func TestIncAndDecUserResourceUsage(t *testing.T) {
app.decUserResourceUsage(res, false)
assertUserGroupResource(t, getTestUserGroup(), res)
app.decUserResourceUsage(res, false)
assertUserGroupResource(t, getTestUserGroup(), resources.NewResourceFromMap(map[string]resources.Quantity{"first": 0}))
// zero resource should be pruned
assertUserGroupResource(t, getTestUserGroup(), nil)
}

func TestIncAndDecUserResourceUsageInSameGroup(t *testing.T) {
Expand Down Expand Up @@ -1722,7 +1724,8 @@ func TestIncAndDecUserResourceUsageInSameGroup(t *testing.T) {
// Decrease testuser and testuser2 to 0
app.decUserResourceUsage(res, false)
app2.decUserResourceUsage(res, false)
assertUserResourcesAndGroupResources(t, getUserGroup("testuser", testgroups), resources.NewResourceFromMap(map[string]resources.Quantity{"first": 0}), nil, 0)
// zero resource should be pruned
assertUserResourcesAndGroupResources(t, getUserGroup("testuser", testgroups), nil, nil, 0)
}

func TestGetAllRequests(t *testing.T) {
Expand Down
4 changes: 4 additions & 0 deletions pkg/scheduler/objects/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,7 @@ func (sn *Node) refreshAvailableResource() {
sn.availableResource = sn.totalResource.Clone()
sn.availableResource.SubFrom(sn.allocatedResource)
sn.availableResource.SubFrom(sn.occupiedResource)
sn.availableResource.Prune()
// check if any quantity is negative: a nil resource is all 0's
if !resources.StrictlyGreaterThanOrEquals(sn.availableResource, nil) {
log.Log(log.SchedNode).Warn("Node update triggered over allocated node",
Expand Down Expand Up @@ -311,6 +312,7 @@ func (sn *Node) RemoveAllocation(allocationKey string) *Allocation {
if alloc != nil {
delete(sn.allocations, allocationKey)
sn.allocatedResource.SubFrom(alloc.GetAllocatedResource())
sn.allocatedResource.Prune()
sn.availableResource.AddTo(alloc.GetAllocatedResource())
sn.nodeEvents.SendAllocationRemovedEvent(sn.NodeID, alloc.allocationKey, alloc.allocatedResource)
return alloc
Expand Down Expand Up @@ -353,6 +355,7 @@ func (sn *Node) addAllocationInternal(alloc *Allocation, force bool) bool {
sn.allocations[alloc.GetAllocationKey()] = alloc
sn.allocatedResource.AddTo(res)
sn.availableResource.SubFrom(res)
sn.availableResource.Prune()
sn.nodeEvents.SendAllocationAddedEvent(sn.NodeID, alloc.allocationKey, res)
result = true
return result
Expand All @@ -377,6 +380,7 @@ func (sn *Node) ReplaceAllocation(allocationKey string, replace *Allocation, del
// The allocatedResource and availableResource should be updated in the same way
sn.allocatedResource.AddTo(delta)
sn.availableResource.SubFrom(delta)
sn.availableResource.Prune()
if !before.FitIn(sn.allocatedResource) {
log.Log(log.SchedNode).Warn("unexpected increase in node usage after placeholder replacement",
zap.String("placeholder allocationKey", allocationKey),
Expand Down
15 changes: 15 additions & 0 deletions pkg/scheduler/objects/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -755,6 +755,11 @@ func (sq *Queue) decPendingResource(delta *resources.Resource) {
zap.Error(err))
} else {
sq.updatePendingResourceMetrics()
// The metrics must be updated before the resource is pruned.
// For example:
// if we prune the resource first and it becomes nil after pruning,
// the metrics will not be updated for the nil resource, which is not expected.
sq.pending.Prune()
}
}

Expand Down Expand Up @@ -1085,7 +1090,12 @@ func (sq *Queue) DecAllocatedResource(alloc *resources.Resource) error {
defer sq.Unlock()
// all OK update the queue
sq.allocatedResource = resources.Sub(sq.allocatedResource, alloc)
// The metrics must be updated before the resource is pruned.
// For example:
// if we prune the resource first and it becomes nil after pruning,
// the metrics will not be updated for the nil resource, which is not expected.
sq.updateAllocatedResourceMetrics()
sq.allocatedResource.Prune()
return nil
}

Expand Down Expand Up @@ -1118,6 +1128,11 @@ func (sq *Queue) DecPreemptingResource(alloc *resources.Resource) {
sq.parent.DecPreemptingResource(alloc)
sq.preemptingResource = resources.Sub(sq.preemptingResource, alloc)
sq.updatePreemptingResourceMetrics()
// The metrics must be updated before the resource is pruned.
// For example:
// if we prune the resource first and it becomes nil after pruning,
// the metrics will not be updated for the nil resource, which is not expected.
sq.preemptingResource.Prune()
}

func (sq *Queue) IsPrioritySortEnabled() bool {
Expand Down
20 changes: 13 additions & 7 deletions pkg/scheduler/partition_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,8 @@ func TestRemoveNodeWithAllocations(t *testing.T) {
assert.Equal(t, 1, len(released), "node did not release correct allocation")
assert.Equal(t, 0, len(confirmed), "node did not confirm correct allocation")
assert.Equal(t, released[0].GetAllocationKey(), allocAllocationKey, "allocationKey returned by release not the same as on allocation")
assertLimits(t, getTestUserGroup(), resources.Zero)
// zero resource should be pruned
assertLimits(t, getTestUserGroup(), nil)

assert.NilError(t, err, "the event should have been processed")
}
Expand Down Expand Up @@ -346,7 +347,8 @@ func TestRemoveNodeWithPlaceholders(t *testing.T) {
assert.Assert(t, resources.Equals(app.GetPendingResource(), appRes), "app should have updated pending resources")
// check the interim state of the placeholder involved
assert.Check(t, !ph.HasRelease(), "placeholder should not have release linked anymore")
assertLimits(t, getTestUserGroup(), resources.NewResourceFromMap(map[string]resources.Quantity{"first": 0}))
// zero resource should be pruned
assertLimits(t, getTestUserGroup(), nil)
}

func TestCalculateNodesResourceUsage(t *testing.T) {
Expand Down Expand Up @@ -1077,7 +1079,8 @@ func TestRemoveApp(t *testing.T) {

allocs = partition.removeApplication("will_not_remove")
assert.Equal(t, 1, len(allocs), "existing application with allocations returned unexpected allocations %v", allocs)
assertLimits(t, getTestUserGroup(), resources.Zero)
// zero resource should be pruned
assertLimits(t, getTestUserGroup(), nil)
}

func TestRemoveAppAllocs(t *testing.T) {
Expand Down Expand Up @@ -1139,7 +1142,8 @@ func TestRemoveAppAllocs(t *testing.T) {
allocs, _ = partition.removeAllocation(release)
assert.Equal(t, 1, len(allocs), "removal request for existing allocation returned wrong allocations: %v", allocs)
assert.Equal(t, 0, partition.GetTotalAllocationCount(), "removal requests did not remove all allocations: %v", partition.allocations)
assertLimits(t, getTestUserGroup(), resources.Zero)
// zero resource should be pruned
assertLimits(t, getTestUserGroup(), nil)
}

func TestRemoveAllPlaceholderAllocs(t *testing.T) {
Expand Down Expand Up @@ -2218,7 +2222,8 @@ func setupPreemptionForRequiredNode(t *testing.T) (*PartitionContext, *objects.A
}
releases, _ := partition.removeAllocation(release)
assert.Equal(t, 0, len(releases), "not expecting any released allocations")
assertUserGroupResourceMaxLimits(t, getTestUserGroup(), resources.NewResourceFromMap(map[string]resources.Quantity{"vcore": 0}), getExpectedQueuesLimitsForPreemptionWithRequiredNode())
// zero resource should be pruned
assertUserGroupResourceMaxLimits(t, getTestUserGroup(), nil, getExpectedQueuesLimitsForPreemptionWithRequiredNode())
return partition, app
}

Expand Down Expand Up @@ -2974,7 +2979,8 @@ func TestPlaceholderSmallerThanReal(t *testing.T) {
assert.Assert(t, resources.IsZero(node.GetAllocatedResource()), "nothing should be allocated on node")
assert.Assert(t, resources.IsZero(app.GetQueue().GetAllocatedResource()), "nothing should be allocated on queue")
assert.Equal(t, 0, partition.GetTotalAllocationCount(), "no allocation should be registered on the partition")
assertLimits(t, getTestUserGroup(), resources.NewResourceFromMap(map[string]resources.Quantity{"first": 0, "second": 0}))
// zero resource should be pruned
assertLimits(t, getTestUserGroup(), nil)
}

// one real allocation should trigger cleanup of all placeholders
Expand Down Expand Up @@ -3044,7 +3050,7 @@ func TestPlaceholderSmallerMulti(t *testing.T) {
assert.Assert(t, resources.IsZero(app.GetQueue().GetAllocatedResource()), "nothing should be allocated on queue")
assert.Equal(t, 0, partition.GetTotalAllocationCount(), "no allocation should be registered on the partition")
assert.Equal(t, 0, partition.getPhAllocationCount(), "no placeholder allocation should be on the partition")
assertLimits(t, getTestUserGroup(), resources.NewResourceFromMap(map[string]resources.Quantity{"first": 0, "second": 0}))
assertLimits(t, getTestUserGroup(), nil)
}

func TestPlaceholderBiggerThanReal(t *testing.T) {
Expand Down
1 change: 1 addition & 0 deletions pkg/scheduler/ugm/queue_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,7 @@ func (qt *QueueTracker) decreaseTrackedResource(hierarchy []string, applicationI
}
}
qt.resourceUsage.SubFrom(usage)
qt.resourceUsage.Prune()
if removeApp {
log.Log(log.SchedUGM).Debug("Removed application from running applications",
zap.String("application", applicationID),
Expand Down

0 comments on commit 8a4acda

Please sign in to comment.