Skip to content

Commit

Permalink
[YUNIKORN-2953] Placeholder release count incorrect (#991)
Browse files Browse the repository at this point in the history
While releasing placeholders, the allocated placeholders are counted
twice in the tracking information. With YUNIKORN-2926 in place, this
only happens if some, but not all, placeholders are allocated.

Mark unallocated placeholders that time out as released to prevent
scheduling issues.

Closes: #991

Signed-off-by: Craig Condit <ccondit@apache.org>
  • Loading branch information
wilfred-s authored and craigcondit committed Nov 4, 2024
1 parent 7057201 commit 6ef347b
Show file tree
Hide file tree
Showing 2 changed files with 60 additions and 34 deletions.
48 changes: 29 additions & 19 deletions pkg/scheduler/objects/application.go
Original file line number Diff line number Diff line change
Expand Up @@ -409,9 +409,8 @@ func (sa *Application) clearPlaceholderTimer() {
func (sa *Application) timeoutPlaceholderProcessing() {
sa.Lock()
defer sa.Unlock()
switch {
// Case 1: if all app's placeholders are allocated, only part of them gets replaced, just delete the remaining placeholders
case (sa.IsRunning() || sa.IsCompleting()) && !resources.IsZero(sa.allocatedPlaceholder):
if (sa.IsRunning() || sa.IsCompleting()) && !resources.IsZero(sa.allocatedPlaceholder) {
// Case 1: if all app's placeholders are allocated, only part of them gets replaced, just delete the remaining placeholders
var toRelease []*Allocation
replacing := 0
for _, alloc := range sa.getPlaceholderAllocations() {
Expand All @@ -423,19 +422,14 @@ func (sa *Application) timeoutPlaceholderProcessing() {
alloc.SetReleased(true)
toRelease = append(toRelease, alloc)
}
log.Log(log.SchedApplication).Info("Placeholder timeout, releasing placeholders",
log.Log(log.SchedApplication).Info("Placeholder timeout, releasing allocated placeholders",
zap.String("AppID", sa.ApplicationID),
zap.Int("placeholders being replaced", replacing),
zap.Int("releasing placeholders", len(toRelease)))
// trigger the release of the placeholders: accounting updates when the release is done
sa.notifyRMAllocationReleased(toRelease, si.TerminationType_TIMEOUT, "releasing allocated placeholders on placeholder timeout")
// Case 2: in every other case progress the application, and notify the context about the expired placeholder asks
default:
log.Log(log.SchedApplication).Info("Placeholder timeout, releasing asks and placeholders",
zap.String("AppID", sa.ApplicationID),
zap.Int("releasing placeholders", len(sa.allocations)),
zap.Int("releasing asks", len(sa.requests)),
zap.String("gang scheduling style", sa.gangSchedulingStyle))
} else {
// Case 2: in every other case progress the application, and notify the context about the expired placeholders
// change the status of the app based on gang style: soft resume normal allocations, hard fail the app
event := ResumeApplication
if sa.gangSchedulingStyle == Hard {
Expand All @@ -447,14 +441,30 @@ func (sa *Application) timeoutPlaceholderProcessing() {
zap.String("currentState", sa.CurrentState()),
zap.Error(err))
}
sa.notifyRMAllocationReleased(sa.getAllRequestsInternal(), si.TerminationType_TIMEOUT, "releasing placeholders asks on placeholder timeout")
sa.removeAsksInternal("", si.EventRecord_REQUEST_TIMEOUT)
// all allocations are placeholders but GetAllAllocations is locked and cannot be used
sa.notifyRMAllocationReleased(sa.getPlaceholderAllocations(), si.TerminationType_TIMEOUT, "releasing allocated placeholders on placeholder timeout")
// we are in an accepted or new state so nothing can be replaced yet: mark everything as timedout
for _, phData := range sa.placeholderData {
phData.TimedOut = phData.Count
// all allocations are placeholders: release them all
var toRelease, pendingRelease []*Allocation
for _, alloc := range sa.allocations {
alloc.SetReleased(true)
toRelease = append(toRelease, alloc)
}
// get all open requests and remove them all; filter out the already allocated ones as they are already released
for _, alloc := range sa.requests {
if !alloc.IsAllocated() {
alloc.SetReleased(true)
pendingRelease = append(pendingRelease, alloc)
sa.placeholderData[alloc.taskGroupName].TimedOut++
}
}
log.Log(log.SchedApplication).Info("Placeholder timeout, releasing allocated and pending placeholders",
zap.String("AppID", sa.ApplicationID),
zap.Int("releasing placeholders", len(toRelease)),
zap.Int("pending placeholders", len(pendingRelease)),
zap.String("gang scheduling style", sa.gangSchedulingStyle))
sa.removeAsksInternal("", si.EventRecord_REQUEST_TIMEOUT)
// trigger the release of the allocated placeholders: accounting updates when the release is done
sa.notifyRMAllocationReleased(toRelease, si.TerminationType_TIMEOUT, "releasing allocated placeholders on placeholder timeout")
// trigger the release of the pending placeholders: accounting has been done
sa.notifyRMAllocationReleased(pendingRelease, si.TerminationType_TIMEOUT, "releasing pending placeholders on placeholder timeout")
}
sa.clearPlaceholderTimer()
}
Expand Down Expand Up @@ -1906,7 +1916,7 @@ func (sa *Application) removeAllocationInternal(allocationKey string, releaseTyp
if sa.hasZeroAllocations() {
removeApp = true
event = CompleteApplication
eventWarning = "Application state not changed to Waiting while removing an allocation"
eventWarning = "Application state not changed to Completing while removing an allocation"
}
sa.decUserResourceUsage(alloc.GetAllocatedResource(), removeApp)
}
Expand Down
46 changes: 31 additions & 15 deletions pkg/scheduler/objects/application_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1516,14 +1516,14 @@ func TestReplaceAllocationTracking(t *testing.T) {
}

func TestTimeoutPlaceholderSoft(t *testing.T) {
runTimeoutPlaceholderTest(t, Resuming.String(), Soft, []int{1, 2})
runTimeoutPlaceholderTest(t, Resuming.String(), Soft)
}

func TestTimeoutPlaceholderHard(t *testing.T) {
runTimeoutPlaceholderTest(t, Failing.String(), Hard, []int{1, 2})
runTimeoutPlaceholderTest(t, Failing.String(), Hard)
}

func runTimeoutPlaceholderTest(t *testing.T, expectedState string, gangSchedulingStyle string, expectedReleases []int) {
func runTimeoutPlaceholderTest(t *testing.T, expectedState string, gangSchedulingStyle string) {
const (
tg1 = "tg-1"
tg2 = "tg-2"
Expand Down Expand Up @@ -1555,16 +1555,16 @@ func runTimeoutPlaceholderTest(t *testing.T, expectedState string, gangSchedulin
assert.Assert(t, app.IsAccepted(), "Application should be in accepted state")

// add the placeholder to the app
ph := newPlaceholderAlloc(appID1, nodeID1, res, tg2)
app.AddAllocation(ph)
app.addPlaceholderDataWithLocking(ph)
ph1 := newPlaceholderAlloc(appID1, nodeID1, res, tg2)
app.AddAllocation(ph1)
app.addPlaceholderDataWithLocking(ph1)
assertPlaceholderData(t, app, tg2, 1, 0, 0, res)
assertUserGroupResource(t, getTestUserGroup(), res)
assert.Assert(t, app.getPlaceholderTimer() != nil, "Placeholder timer should be initiated after the first placeholder allocation")
// add a second one to check the filter
ph = newPlaceholderAlloc(appID1, nodeID1, res, tg2)
app.AddAllocation(ph)
app.addPlaceholderDataWithLocking(ph)
ph2 := newPlaceholderAlloc(appID1, nodeID1, res, tg2)
app.AddAllocation(ph2)
app.addPlaceholderDataWithLocking(ph2)
assertPlaceholderData(t, app, tg2, 2, 0, 0, res)
assertUserGroupResource(t, getTestUserGroup(), resources.Multiply(res, 2))
assert.Assert(t, app.IsAccepted(), "Application should be in accepted state")
Expand All @@ -1574,11 +1574,22 @@ func runTimeoutPlaceholderTest(t *testing.T, expectedState string, gangSchedulin
return app.placeholderTimer == nil
})
assert.NilError(t, err, "Placeholder timeout cleanup did not trigger unexpectedly")
// When the app was in the accepted state, timeout should equal to count
// pending updates immediately
assertPlaceholderData(t, app, tg1, 1, 1, 0, res)
assertPlaceholderData(t, app, tg2, 2, 2, 0, res)
// No changes until the removals are confirmed
assertPlaceholderData(t, app, tg2, 2, 0, 0, res)

assert.Equal(t, app.stateMachine.Current(), expectedState, "Application did not progress into expected state")
log := app.GetStateLog()
assert.Equal(t, len(log), 2, "wrong number of app events")
assert.Equal(t, log[0].ApplicationState, Accepted.String())
assert.Equal(t, log[1].ApplicationState, expectedState)

assertUserGroupResource(t, getTestUserGroup(), resources.Multiply(res, 2))
// ordering of events is based on the order in which we call the release in the application
// first are the allocated placeholders then the pending placeholders, always same values
// See the timeoutPlaceholderProcessing() function
expectedReleases := []int{2, 1}
events := testHandler.GetEvents()
var found int
idx := 0
Expand All @@ -1597,10 +1608,15 @@ func runTimeoutPlaceholderTest(t *testing.T, expectedState string, gangSchedulin
assert.Assert(t, resources.Equals(app.GetPlaceholderResource(), resources.Multiply(res, 2)), "Unexpected placeholder resources for the app")
assertUserGroupResource(t, getTestUserGroup(), resources.Multiply(res, 2))

log := app.GetStateLog()
assert.Equal(t, len(log), 2, "wrong number of app events")
assert.Equal(t, log[0].ApplicationState, Accepted.String())
assert.Equal(t, log[1].ApplicationState, expectedState)
removed := app.RemoveAllocation(ph1.allocationKey, si.TerminationType_TIMEOUT)
assert.Assert(t, removed != nil, "expected allocation got nil")
assert.Equal(t, ph1.allocationKey, removed.allocationKey, "expected placeholder to be returned")
removed = app.RemoveAllocation(ph2.allocationKey, si.TerminationType_TIMEOUT)
assert.Assert(t, removed != nil, "expected allocation got nil")
assert.Equal(t, ph2.allocationKey, removed.allocationKey, "expected placeholder to be returned")

// Removals are confirmed: timeout should equal to count
assertPlaceholderData(t, app, tg2, 2, 2, 0, res)
}

func TestTimeoutPlaceholderAllocReleased(t *testing.T) {
Expand Down

0 comments on commit 6ef347b

Please sign in to comment.