Skip to content

Commit

Permalink
[YUNIKORN-2926] Placeholder counters incorrect (#986)
Browse files Browse the repository at this point in the history
Placeholder tracking data is maintained inside the application for
scheduling. If the placeholder is released we update the counters in the
tracking data. We have cases in which we do not do that correctly:
* placeholders are smaller than the real allocation
* placeholder does not have an allocation to replace
* all allocations are removed from an application

Closes: #986

Signed-off-by: Craig Condit <ccondit@apache.org>
  • Loading branch information
wilfred-s authored and craigcondit committed Oct 24, 2024
1 parent 6f54992 commit 9869540
Show file tree
Hide file tree
Showing 3 changed files with 193 additions and 145 deletions.
58 changes: 35 additions & 23 deletions pkg/scheduler/objects/application.go
Original file line number Diff line number Diff line change
Expand Up @@ -402,7 +402,7 @@ func (sa *Application) clearPlaceholderTimer() {
}

// timeoutPlaceholderProcessing cleans up all placeholder asks and allocations that are not used after the timeout.
// If the application has started processing, Starting state or further, the application keeps on processing without
// If the application has started processing, Running state or further, the application keeps on processing without
// being able to use the placeholders.
// If the application is in New or Accepted state we clean up and take followup action based on the gang scheduling
// style.
Expand All @@ -422,17 +422,14 @@ func (sa *Application) timeoutPlaceholderProcessing() {
}
alloc.SetReleased(true)
toRelease = append(toRelease, alloc)
// mark as timeout out in the tracking data
if _, ok := sa.placeholderData[alloc.GetTaskGroup()]; ok {
sa.placeholderData[alloc.GetTaskGroup()].TimedOut++
}
}
log.Log(log.SchedApplication).Info("Placeholder timeout, releasing placeholders",
zap.String("AppID", sa.ApplicationID),
zap.Int("placeholders being replaced", replacing),
zap.Int("releasing placeholders", len(toRelease)))
// trigger the release of the placeholders: accounting updates when the release is done
sa.notifyRMAllocationReleased(toRelease, si.TerminationType_TIMEOUT, "releasing allocated placeholders on placeholder timeout")
// Case 2: in every other case fail the application, and notify the context about the expired placeholder asks
// Case 2: in every other case progress the application, and notify the context about the expired placeholder asks
default:
log.Log(log.SchedApplication).Info("Placeholder timeout, releasing asks and placeholders",
zap.String("AppID", sa.ApplicationID),
Expand Down Expand Up @@ -1805,45 +1802,49 @@ func (sa *Application) updatePreemptedResource(info *Allocation) {
info.GetAllocatedResource(), info.GetBindTime())
}

// ReplaceAllocation removes the placeholder from the allocation list and replaces it with the real allocation.
// If no replacing allocation is linked to the placeholder it will still be removed from the application.
// Queue and Node objects are updated by the caller.
func (sa *Application) ReplaceAllocation(allocationKey string) *Allocation {
sa.Lock()
defer sa.Unlock()
// remove the placeholder that was just confirmed by the shim
ph := sa.removeAllocationInternal(allocationKey, si.TerminationType_PLACEHOLDER_REPLACED)
// this has already been replaced or it is a duplicate message from the shim
if ph == nil || !ph.HasRelease() {
log.Log(log.SchedApplication).Debug("Unexpected placeholder released",
// this has already been replaced, or it is a duplicate message from the shim just ignore
if ph == nil {
return nil
}
// ph is the placeholder, the releases entry points to the real allocation we need to swap in
alloc := ph.GetRelease()
if alloc == nil {
log.Log(log.SchedApplication).Warn("Placeholder replaced without replacement allocation",
zap.String("applicationID", sa.ApplicationID),
zap.Stringer("placeholder", ph))
return nil
return ph
}
// update the replacing allocation
// we double linked the real and placeholder allocation
// ph is the placeholder, the releases entry points to the real one
alloc := ph.GetRelease()
alloc.SetPlaceholderUsed(true)
alloc.SetPlaceholderCreateTime(ph.GetCreateTime())
alloc.SetBindTime(time.Now())
sa.addAllocationInternal(Replaced, alloc)
// order is important: clean up the allocation after adding it to the app
// we need the original Replaced allocation resultType.
alloc.ClearRelease()
if sa.placeholderData != nil {
sa.placeholderData[ph.GetTaskGroup()].Replaced++
}
return ph
}

// Remove the Allocation from the application.
// RemoveAllocation removes the Allocation from the application.
// Return the allocation that was removed or nil if not found.
func (sa *Application) RemoveAllocation(allocationKey string, releaseType si.TerminationType) *Allocation {
sa.Lock()
defer sa.Unlock()
return sa.removeAllocationInternal(allocationKey, releaseType)
}

// Remove the Allocation from the application
// No locking must be called while holding the lock
// removeAllocationInternal removes the Allocation from the application.
// Returns the allocation that was removed or nil if not found.
// No locking must be called while holding the application lock.
func (sa *Application) removeAllocationInternal(allocationKey string, releaseType si.TerminationType) *Allocation {
alloc := sa.allocations[allocationKey]

Expand All @@ -1858,9 +1859,14 @@ func (sa *Application) removeAllocationInternal(allocationKey string, releaseTyp
// update correct allocation tracker
if alloc.IsPlaceholder() {
// make sure we account for the placeholders being removed in the tracking data
if releaseType == si.TerminationType_STOPPED_BY_RM || releaseType == si.TerminationType_PREEMPTED_BY_SCHEDULER || releaseType == si.TerminationType_UNKNOWN_TERMINATION_TYPE {
if _, ok := sa.placeholderData[alloc.taskGroupName]; ok {
sa.placeholderData[alloc.taskGroupName].TimedOut++
// update based on termination type: everything is counted as a timeout except for a real replace
if sa.placeholderData != nil {
if phData, ok := sa.placeholderData[alloc.taskGroupName]; ok {
if releaseType == si.TerminationType_PLACEHOLDER_REPLACED {
phData.Replaced++
} else {
phData.TimedOut++
}
}
}
// as and when every ph gets removed (for replacement), resource usage would be reduced.
Expand Down Expand Up @@ -1933,14 +1939,20 @@ func (sa *Application) hasZeroAllocations() bool {
return resources.IsZero(sa.pending) && resources.IsZero(sa.allocatedResource)
}

// Remove all allocations from the application.
// RemoveAllAllocations removes all allocations from the application.
// All allocations that have been removed are returned.
func (sa *Application) RemoveAllAllocations() []*Allocation {
sa.Lock()
defer sa.Unlock()

allocationsToRelease := make([]*Allocation, 0)
for _, alloc := range sa.allocations {
// update placeholder tracking data
if alloc.IsPlaceholder() && sa.placeholderData != nil {
if phData, ok := sa.placeholderData[alloc.taskGroupName]; ok {
phData.TimedOut++
}
}
allocationsToRelease = append(allocationsToRelease, alloc)
// Aggregate the resources used by this alloc to the application's user resource tracker
sa.trackCompletedResource(alloc)
Expand All @@ -1959,7 +1971,7 @@ func (sa *Application) RemoveAllAllocations() []*Allocation {
// When the resource trackers are zero we should not expect anything to come in later.
if resources.IsZero(sa.pending) {
if err := sa.HandleApplicationEvent(CompleteApplication); err != nil {
log.Log(log.SchedApplication).Warn("Application state not changed to Waiting while removing all allocations",
log.Log(log.SchedApplication).Warn("Application state not changed to Completing while removing all allocations",
zap.String("currentState", sa.CurrentState()),
zap.Error(err))
}
Expand Down
Loading

0 comments on commit 9869540

Please sign in to comment.