Skip to content

Commit

Permalink
[YUNIKORN-2982] Send event when preemption occurs (#1000)
Browse files Browse the repository at this point in the history
Closes: #1000

Signed-off-by: Peter Bacsko <bacskop@gmail.com>
  • Loading branch information
rhh777 authored and pbacsko committed Dec 3, 2024
1 parent 0356a3a commit f7d0e10
Show file tree
Hide file tree
Showing 5 changed files with 89 additions and 0 deletions.
5 changes: 5 additions & 0 deletions pkg/scheduler/objects/allocation.go
Original file line number Diff line number Diff line change
Expand Up @@ -480,6 +480,11 @@ func (a *Allocation) SendRequiredNodePreemptionFailedEvent(node string) {
a.askEvents.SendRequiredNodePreemptionFailed(a.allocationKey, a.applicationID, node, a.GetAllocatedResource())
}

// SendPreemptedBySchedulerEvent forwards a "preempted by scheduler" notification for this allocation
// to its event sender. The allocation's own key, application ID and allocated resource identify the
// victim; the preemptor is described by its allocation key, application ID and queue path.
func (a *Allocation) SendPreemptedBySchedulerEvent(preemptorAllocKey, preemptorAppId, preemptorQueuePath string) {
	victimResource := a.GetAllocatedResource()
	a.askEvents.SendPreemptedByScheduler(
		a.allocationKey,
		a.applicationID,
		preemptorAllocKey,
		preemptorAppId,
		preemptorQueuePath,
		victimResource,
	)
}

// GetAllocationLog returns a list of log entries corresponding to allocation preconditions not being met.
func (a *Allocation) GetAllocationLog() []*AllocationLogEntry {
a.RLock()
Expand Down
9 changes: 9 additions & 0 deletions pkg/scheduler/objects/events/ask_events.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,15 @@ func (ae *AskEvents) SendRequiredNodePreemptionFailed(allocKey, appID, node stri
ae.eventSystem.AddEvent(event)
}

// SendPreemptedByScheduler adds a request event for the preempted allocation (allocKey, belonging to
// appID) whose message names the preemptor's allocation key, application ID and queue path.
// allocatedResource is passed through to the created event record.
// The call does nothing when the event system reports that event tracking is disabled.
func (ae *AskEvents) SendPreemptedByScheduler(allocKey, appID, preemptorAllocKey, preemptorAppId, preemptorQueuePath string, allocatedResource *resources.Resource) {
	if !ae.eventSystem.IsEventTrackingEnabled() {
		return
	}
	text := fmt.Sprintf("Preempted by %s from application %s in %s", preemptorAllocKey, preemptorAppId, preemptorQueuePath)
	ae.eventSystem.AddEvent(events.CreateRequestEventRecord(allocKey, appID, text, allocatedResource))
}

// NewAskEvents returns an AskEvents bound to the given event system, created through
// newAskEventsWithRate with the default settings of 15 seconds and 1.
// NOTE(review): the two defaults look like a rate-limit interval and burst size - confirm
// against newAskEventsWithRate.
func NewAskEvents(evt events.EventSystem) *AskEvents {
	const (
		defaultInterval = 15 * time.Second
		defaultBurst    = 1
	)
	return newAskEventsWithRate(evt, defaultInterval, defaultBurst)
}
Expand Down
20 changes: 20 additions & 0 deletions pkg/scheduler/objects/events/ask_events_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,3 +178,23 @@ func TestRequiredNodePreemptionFailedEvents(t *testing.T) {
event = eventSystem.Events[0]
assert.Equal(t, "Unschedulable request 'alloc-1' with required node 'node-1', no preemption victim found", event.Message)
}

// TestPreemptedBySchedulerEvents checks SendPreemptedByScheduler against two mock event systems:
// the disabled one must stay empty, the regular one must receive exactly one request event
// carrying the expected IDs, change type, change detail and message.
func TestPreemptedBySchedulerEvents(t *testing.T) {
	victimRes := resources.NewResourceFromMap(map[string]resources.Quantity{"first": 1})

	// disabled event system: nothing may be recorded
	disabledSystem := mock.NewEventSystemDisabled()
	NewAskEvents(disabledSystem).SendPreemptedByScheduler("alloc-0", appID, "preemptor-0", "preemptor-app-0", "root.parent.child1", victimRes)
	assert.Equal(t, 0, len(disabledSystem.Events))

	// regular event system: a single event is expected
	enabledSystem := mock.NewEventSystem()
	NewAskEvents(enabledSystem).SendPreemptedByScheduler("alloc-0", appID, "preemptor-0", "preemptor-app-0", "root.parent.child1", victimRes)
	assert.Equal(t, 1, len(enabledSystem.Events))

	got := enabledSystem.Events[0]
	assert.Equal(t, "alloc-0", got.ObjectID)
	assert.Equal(t, appID, got.ReferenceID)
	assert.Equal(t, si.EventRecord_REQUEST, got.Type)
	assert.Equal(t, si.EventRecord_NONE, got.EventChangeType)
	assert.Equal(t, si.EventRecord_DETAILS_NONE, got.EventChangeDetail)
	assert.Equal(t, "Preempted by preemptor-0 from application preemptor-app-0 in root.parent.child1", got.Message)
}
1 change: 1 addition & 0 deletions pkg/scheduler/objects/preemption.go
Original file line number Diff line number Diff line change
Expand Up @@ -635,6 +635,7 @@ func (p *Preemptor) TryPreemption() (*AllocationResult, bool) {
zap.String("victimNodeID", victim.GetNodeID()),
zap.String("victimQueue", victimQueue.Name),
)
victim.SendPreemptedBySchedulerEvent(p.ask.allocationKey, p.ask.applicationID, p.application.queuePath)
} else {
log.Log(log.SchedPreemption).Warn("BUG: Queue not found for preemption victim",
zap.String("queue", p.queue.Name),
Expand Down
54 changes: 54 additions & 0 deletions pkg/scheduler/objects/preemption_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,10 @@ import (

"github.com/apache/yunikorn-core/pkg/common"
"github.com/apache/yunikorn-core/pkg/common/resources"
evtMock "github.com/apache/yunikorn-core/pkg/events/mock"
"github.com/apache/yunikorn-core/pkg/mock"
"github.com/apache/yunikorn-core/pkg/plugins"
schedEvt "github.com/apache/yunikorn-core/pkg/scheduler/objects/events"
"github.com/apache/yunikorn-scheduler-interface/lib/go/si"
)

Expand Down Expand Up @@ -293,6 +295,58 @@ func TestTryPreemption(t *testing.T) {
assert.Equal(t, len(ask3.GetAllocationLog()), 0)
}

// TestTryPreemption_SendEvent runs TryPreemption for an ask in queue child2 against allocations
// created in the sibling queue child1, and verifies that the preempted allocation emits exactly one
// request event whose message names the preemptor's allocation key, application ID and queue path.
func TestTryPreemption_SendEvent(t *testing.T) {
	// single node plus a root -> parent -> {child1, child2} queue hierarchy
	testNode := newNode(nodeID1, map[string]resources.Quantity{"first": 10, "pods": 5})
	nodeIterFn := getNodeIteratorFn(testNode)
	rootQueue, err := createRootQueue(map[string]string{"first": "20", "pods": "5"})
	assert.NilError(t, err)
	parentQueue, err := createManagedQueueGuaranteed(rootQueue, "parent", true, map[string]string{"first": "20"}, map[string]string{"first": "10"})
	assert.NilError(t, err)
	victimQueue, err := createManagedQueueGuaranteed(parentQueue, "child1", false, map[string]string{"first": "10"}, map[string]string{"first": "5"})
	assert.NilError(t, err)
	preemptorQueue, err := createManagedQueueGuaranteed(parentQueue, "child2", false, map[string]string{"first": "10"}, map[string]string{"first": "5"})
	assert.NilError(t, err)

	victimAlloc, otherAlloc, err := creatApp1(victimQueue, testNode, nil, map[string]resources.Quantity{"first": 5, "pods": 1})
	assert.NilError(t, err)

	// route the first allocation's events into a mock event system so they can be inspected
	evtSys := evtMock.NewEventSystem()
	victimAlloc.askEvents = schedEvt.NewAskEvents(evtSys)

	preemptorApp, preemptorAsk, err := creatApp2(preemptorQueue, map[string]resources.Quantity{"first": 5, "pods": 1}, "alloc3")
	assert.NilError(t, err)
	preemptorQueue.incPendingResource(preemptorAsk.GetAllocatedResource())

	available := resources.NewResourceFromMap(map[string]resources.Quantity{"first": 10, "pods": 3})
	underTest := NewPreemptor(preemptorApp, available, 30*time.Second, preemptorAsk, nodeIterFn(), false)

	// register predicate handler
	predicatePlugin := mock.NewPreemptionPredicatePlugin(nil, nil, []mock.Preemption{
		mock.NewPreemption(true, "alloc3", nodeID1, []string{"alloc1"}, 0, 0),
	})
	plugins.RegisterSchedulerPlugin(predicatePlugin)
	defer plugins.UnregisterSchedulerPlugins()

	res, found := underTest.TryPreemption()
	assert.Assert(t, res != nil, "no result")
	assert.NilError(t, predicatePlugin.GetPredicateError())
	assert.Assert(t, found, "no victims found")
	assert.Equal(t, "alloc3", res.Request.GetAllocationKey(), "wrong alloc")
	assert.Check(t, victimAlloc.IsPreempted(), "alloc1 not preempted")
	assert.Check(t, !otherAlloc.IsPreempted(), "alloc2 preempted")

	// exactly one event, raised on the preempted allocation and describing the preemptor
	assert.Equal(t, 1, len(evtSys.Events))
	got := evtSys.Events[0]
	assert.Equal(t, victimAlloc.applicationID, got.ReferenceID)
	assert.Equal(t, victimAlloc.allocationKey, got.ObjectID)
	assert.Equal(t, si.EventRecord_NONE, got.EventChangeType)
	assert.Equal(t, si.EventRecord_DETAILS_NONE, got.EventChangeDetail)
	assert.Equal(t, si.EventRecord_REQUEST, got.Type)
	assert.Equal(t, fmt.Sprintf("Preempted by %s from application %s in %s", "alloc3", appID2, "root.parent.child2"), got.Message)
	assert.Equal(t, len(preemptorAsk.GetAllocationLog()), 0)
}

// TestTryPreemptionOnNode Test try preemption on node with simple queue hierarchy. Since Node doesn't have enough resources to accommodate the request, preemption happens because of node resource constraint.
// Guaranteed and Max resource set on both victim queue path and preemptor queue path in 2 levels. victim and preemptor queue are siblings.
// Request (Preemptor) resource type matches with all resource types of the victim. But Guaranteed set only on specific resource type. 2 Victims are available, but 1 should be preempted because further preemption would make usage go below the guaranteed quota
Expand Down

0 comments on commit f7d0e10

Please sign in to comment.