diff --git a/pkg/scheduler/objects/application.go b/pkg/scheduler/objects/application.go index 4de35b3e6..98bbcdb75 100644 --- a/pkg/scheduler/objects/application.go +++ b/pkg/scheduler/objects/application.go @@ -409,9 +409,8 @@ func (sa *Application) clearPlaceholderTimer() { func (sa *Application) timeoutPlaceholderProcessing() { sa.Lock() defer sa.Unlock() - switch { - // Case 1: if all app's placeholders are allocated, only part of them gets replaced, just delete the remaining placeholders - case (sa.IsRunning() || sa.IsCompleting()) && !resources.IsZero(sa.allocatedPlaceholder): + if (sa.IsRunning() || sa.IsCompleting()) && !resources.IsZero(sa.allocatedPlaceholder) { + // Case 1: if all app's placeholders are allocated, only part of them gets replaced, just delete the remaining placeholders var toRelease []*Allocation replacing := 0 for _, alloc := range sa.getPlaceholderAllocations() { @@ -423,19 +422,14 @@ func (sa *Application) timeoutPlaceholderProcessing() { alloc.SetReleased(true) toRelease = append(toRelease, alloc) } - log.Log(log.SchedApplication).Info("Placeholder timeout, releasing placeholders", + log.Log(log.SchedApplication).Info("Placeholder timeout, releasing allocated placeholders", zap.String("AppID", sa.ApplicationID), zap.Int("placeholders being replaced", replacing), zap.Int("releasing placeholders", len(toRelease))) // trigger the release of the placeholders: accounting updates when the release is done sa.notifyRMAllocationReleased(toRelease, si.TerminationType_TIMEOUT, "releasing allocated placeholders on placeholder timeout") - // Case 2: in every other case progress the application, and notify the context about the expired placeholder asks - default: - log.Log(log.SchedApplication).Info("Placeholder timeout, releasing asks and placeholders", - zap.String("AppID", sa.ApplicationID), - zap.Int("releasing placeholders", len(sa.allocations)), - zap.Int("releasing asks", len(sa.requests)), - zap.String("gang scheduling style", sa.gangSchedulingStyle)) + } else { + // Case 2: in every other case progress the application, and notify the context about the expired placeholders // change the status of the app based on gang style: soft resume normal allocations, hard fail the app event := ResumeApplication if sa.gangSchedulingStyle == Hard { @@ -447,14 +441,30 @@ func (sa *Application) timeoutPlaceholderProcessing() { zap.String("currentState", sa.CurrentState()), zap.Error(err)) } - sa.notifyRMAllocationReleased(sa.getAllRequestsInternal(), si.TerminationType_TIMEOUT, "releasing placeholders asks on placeholder timeout") - sa.removeAsksInternal("", si.EventRecord_REQUEST_TIMEOUT) - // all allocations are placeholders but GetAllAllocations is locked and cannot be used - sa.notifyRMAllocationReleased(sa.getPlaceholderAllocations(), si.TerminationType_TIMEOUT, "releasing allocated placeholders on placeholder timeout") - // we are in an accepted or new state so nothing can be replaced yet: mark everything as timedout - for _, phData := range sa.placeholderData { - phData.TimedOut = phData.Count + // all allocations are placeholders release them all + var toRelease, pendingRelease []*Allocation + for _, alloc := range sa.allocations { + alloc.SetReleased(true) + toRelease = append(toRelease, alloc) + } + // get all open requests and remove them all filter out already allocated as they are already released + for _, alloc := range sa.requests { + if !alloc.IsAllocated() { + alloc.SetReleased(true) + pendingRelease = append(pendingRelease, alloc) + sa.placeholderData[alloc.taskGroupName].TimedOut++ + } } + log.Log(log.SchedApplication).Info("Placeholder timeout, releasing allocated and pending placeholders", + zap.String("AppID", sa.ApplicationID), + zap.Int("releasing placeholders", len(toRelease)), + zap.Int("pending placeholders", len(pendingRelease)), + zap.String("gang scheduling style", sa.gangSchedulingStyle)) + sa.removeAsksInternal("", si.EventRecord_REQUEST_TIMEOUT) + // trigger the release of the allocated placeholders: accounting updates when the release is done + sa.notifyRMAllocationReleased(toRelease, si.TerminationType_TIMEOUT, "releasing allocated placeholders on placeholder timeout") + // trigger the release of the pending placeholders: accounting has been done + sa.notifyRMAllocationReleased(pendingRelease, si.TerminationType_TIMEOUT, "releasing pending placeholders on placeholder timeout") } sa.clearPlaceholderTimer() } @@ -1906,7 +1916,7 @@ func (sa *Application) removeAllocationInternal(allocationKey string, releaseTyp if sa.hasZeroAllocations() { removeApp = true event = CompleteApplication - eventWarning = "Application state not changed to Waiting while removing an allocation" + eventWarning = "Application state not changed to Completing while removing an allocation" } sa.decUserResourceUsage(alloc.GetAllocatedResource(), removeApp) } diff --git a/pkg/scheduler/objects/application_test.go b/pkg/scheduler/objects/application_test.go index 91db298fe..0b77db566 100644 --- a/pkg/scheduler/objects/application_test.go +++ b/pkg/scheduler/objects/application_test.go @@ -1516,14 +1516,14 @@ func TestReplaceAllocationTracking(t *testing.T) { } func TestTimeoutPlaceholderSoft(t *testing.T) { - runTimeoutPlaceholderTest(t, Resuming.String(), Soft, []int{1, 2}) + runTimeoutPlaceholderTest(t, Resuming.String(), Soft) } func TestTimeoutPlaceholderHard(t *testing.T) { - runTimeoutPlaceholderTest(t, Failing.String(), Hard, []int{1, 2}) + runTimeoutPlaceholderTest(t, Failing.String(), Hard) } -func runTimeoutPlaceholderTest(t *testing.T, expectedState string, gangSchedulingStyle string, expectedReleases []int) { +func runTimeoutPlaceholderTest(t *testing.T, expectedState string, gangSchedulingStyle string) { const ( tg1 = "tg-1" tg2 = "tg-2" @@ -1555,16 +1555,16 @@ func runTimeoutPlaceholderTest(t *testing.T, expectedState string, gangSchedulin assert.Assert(t, app.IsAccepted(), "Application should be in accepted state") // add the placeholder to the app - ph := newPlaceholderAlloc(appID1, nodeID1, res, tg2) - app.AddAllocation(ph) - app.addPlaceholderDataWithLocking(ph) + ph1 := newPlaceholderAlloc(appID1, nodeID1, res, tg2) + app.AddAllocation(ph1) + app.addPlaceholderDataWithLocking(ph1) assertPlaceholderData(t, app, tg2, 1, 0, 0, res) assertUserGroupResource(t, getTestUserGroup(), res) assert.Assert(t, app.getPlaceholderTimer() != nil, "Placeholder timer should be initiated after the first placeholder allocation") // add a second one to check the filter - ph = newPlaceholderAlloc(appID1, nodeID1, res, tg2) - app.AddAllocation(ph) - app.addPlaceholderDataWithLocking(ph) + ph2 := newPlaceholderAlloc(appID1, nodeID1, res, tg2) + app.AddAllocation(ph2) + app.addPlaceholderDataWithLocking(ph2) assertPlaceholderData(t, app, tg2, 2, 0, 0, res) assertUserGroupResource(t, getTestUserGroup(), resources.Multiply(res, 2)) assert.Assert(t, app.IsAccepted(), "Application should be in accepted state") @@ -1574,11 +1574,22 @@ func runTimeoutPlaceholderTest(t *testing.T, expectedState string, gangSchedulin return app.placeholderTimer == nil }) assert.NilError(t, err, "Placeholder timeout cleanup did not trigger unexpectedly") - // When the app was in the accepted state, timeout should equal to count + // pending updates immediately assertPlaceholderData(t, app, tg1, 1, 1, 0, res) - assertPlaceholderData(t, app, tg2, 2, 2, 0, res) + // No changes until the removals are confirmed + assertPlaceholderData(t, app, tg2, 2, 0, 0, res) + assert.Equal(t, app.stateMachine.Current(), expectedState, "Application did not progress into expected state") + log := app.GetStateLog() + assert.Equal(t, len(log), 2, "wrong number of app events") + assert.Equal(t, log[0].ApplicationState, Accepted.String()) + assert.Equal(t, log[1].ApplicationState, expectedState) + assertUserGroupResource(t, getTestUserGroup(), resources.Multiply(res, 2)) + // ordering of events is based on the order in which we call the release in the application + // first are the allocated placeholders then the pending placeholders, always same values + // See the timeoutPlaceholderProcessing() function + expectedReleases := []int{2, 1} events := testHandler.GetEvents() var found int idx := 0 @@ -1597,10 +1608,15 @@ func runTimeoutPlaceholderTest(t *testing.T, expectedState string, gangSchedulin assert.Assert(t, resources.Equals(app.GetPlaceholderResource(), resources.Multiply(res, 2)), "Unexpected placeholder resources for the app") assertUserGroupResource(t, getTestUserGroup(), resources.Multiply(res, 2)) - log := app.GetStateLog() - assert.Equal(t, len(log), 2, "wrong number of app events") - assert.Equal(t, log[0].ApplicationState, Accepted.String()) - assert.Equal(t, log[1].ApplicationState, expectedState) + removed := app.RemoveAllocation(ph1.allocationKey, si.TerminationType_TIMEOUT) + assert.Assert(t, removed != nil, "expected allocation got nil") + assert.Equal(t, ph1.allocationKey, removed.allocationKey, "expected placeholder to be returned") + removed = app.RemoveAllocation(ph2.allocationKey, si.TerminationType_TIMEOUT) + assert.Assert(t, removed != nil, "expected allocation got nil") + assert.Equal(t, ph2.allocationKey, removed.allocationKey, "expected placeholder to be returned") + + // Removals are confirmed: timeout should equal to count + assertPlaceholderData(t, app, tg2, 2, 2, 0, res) } func TestTimeoutPlaceholderAllocReleased(t *testing.T) {