Skip to content

Commit

Permalink
Refactor events/actions.
Browse files Browse the repository at this point in the history
Signed-off-by: Klaus Ma <klaus1982.cn@gmail.com>
  • Loading branch information
k82cn committed Jan 28, 2020
1 parent a0125e9 commit bf57556
Show file tree
Hide file tree
Showing 13 changed files with 158 additions and 137 deletions.
61 changes: 5 additions & 56 deletions pkg/apis/batch/v1alpha1/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ package v1alpha1
import (
"k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"volcano.sh/volcano/pkg/apis/common"
)

// +genclient
Expand Down Expand Up @@ -118,76 +120,23 @@ const (
JobStatusError JobEvent = "JobStatusError"
)

// Event represent the phase of Job, e.g. pod-failed.
type Event string

const (
// AnyEvent means all event
AnyEvent Event = "*"
// PodFailedEvent is triggered if Pod was failed
PodFailedEvent Event = "PodFailed"
// PodEvictedEvent is triggered if Pod was deleted
PodEvictedEvent Event = "PodEvicted"
// JobUnknownEvent These below are several events can lead to job 'Unknown'
// 1. Task Unschedulable, this is triggered when part of
// pods can't be scheduled while some are already running in gang-scheduling case.
JobUnknownEvent Event = "Unknown"
// TaskCompletedEvent is triggered if the 'Replicas' amount of pods in one task are succeed
TaskCompletedEvent Event = "TaskCompleted"

// Note: events below are used internally, should not be used by users.

// OutOfSyncEvent is triggered if Pod/Job were updated
OutOfSyncEvent Event = "OutOfSync"
// CommandIssuedEvent is triggered if a command is raised by user
CommandIssuedEvent Event = "CommandIssued"
)

// Action is the action that Job controller will take according to the event.
type Action string

const (
// AbortJobAction if this action is set, the whole job will be aborted:
// all Pod of Job will be evicted, and no Pod will be recreated
AbortJobAction Action = "AbortJob"
// RestartJobAction if this action is set, the whole job will be restarted
RestartJobAction Action = "RestartJob"
// RestartTaskAction if this action is set, only the task will be restarted; default action.
// This action can not work together with job level events, e.g. JobUnschedulable
RestartTaskAction Action = "RestartTask"
// TerminateJobAction if this action is set, the whole job wil be terminated
// and can not be resumed: all Pod of Job will be evicted, and no Pod will be recreated.
TerminateJobAction Action = "TerminateJob"
// CompleteJobAction if this action is set, the unfinished pods will be killed, job completed.
CompleteJobAction Action = "CompleteJob"
// ResumeJobAction is the action to resume an aborted job.
ResumeJobAction Action = "ResumeJob"

// Note: actions below are only used internally, should not be used by users.

// SyncJobAction is the action to sync Job/Pod status.
SyncJobAction Action = "SyncJob"
// EnqueueAction is the action to sync Job inqueue status.
EnqueueAction Action = "EnqueueJob"
)

// LifecyclePolicy specifies the lifecycle and error handling of task and job.
type LifecyclePolicy struct {
// The action that will be taken to the PodGroup according to Event.
// One of "Restart", "None".
// Default to None.
// +optional
Action Action `json:"action,omitempty" protobuf:"bytes,1,opt,name=action"`
Action common.Action `json:"action,omitempty" protobuf:"bytes,1,opt,name=action"`

// The Event recorded by scheduler; the controller takes actions
// according to this Event.
// +optional
Event Event `json:"event,omitempty" protobuf:"bytes,2,opt,name=event"`
Event common.Event `json:"event,omitempty" protobuf:"bytes,2,opt,name=event"`

// The Events recorded by scheduler; the controller takes actions
// according to this Events.
// +optional
Events []Event `json:"events,omitempty" protobuf:"bytes,3,opt,name=events"`
Events []common.Event `json:"events,omitempty" protobuf:"bytes,3,opt,name=events"`

// The exit code of the pod container, controller will take action
// according to this code.
Expand Down
53 changes: 53 additions & 0 deletions pkg/apis/common/actions.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
Copyright 2020 The Volcano Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/


package common

// Action is the action that Job controller will take according to the event.
type Action string

const (
// AbortJobAction if this action is set, the whole job will be aborted:
// all Pod of Job will be evicted, and no Pod will be recreated
AbortJobAction Action = "AbortJob"
// RestartJobAction if this action is set, the whole job will be restarted
RestartJobAction Action = "RestartJob"
// RestartTaskAction if this action is set, only the task will be restarted; default action.
// This action can not work together with job level events, e.g. JobUnschedulable
RestartTaskAction Action = "RestartTask"
// TerminateJobAction if this action is set, the whole job wil be terminated
// and can not be resumed: all Pod of Job will be evicted, and no Pod will be recreated.
TerminateJobAction Action = "TerminateJob"
// CompleteJobAction if this action is set, the unfinished pods will be killed, job completed.
CompleteJobAction Action = "CompleteJob"
// ResumeJobAction is the action to resume an aborted job.
ResumeJobAction Action = "ResumeJob"

// Note: actions below are only used internally, should not be used by users.

// SyncJobAction is the action to sync Job/Pod status.
SyncJobAction Action = "SyncJob"
// EnqueueAction is the action to sync Job inqueue status.
EnqueueAction Action = "EnqueueJob"

// SyncQueueAction is the action to sync queue status.
SyncQueueAction Action = "SyncQueue"
// OpenQueueAction is the action to open queue
OpenQueueAction Action = "OpenQueue"
// CloseQueueAction is the action to close queue
CloseQueueAction Action = "CloseQueue"
)
42 changes: 42 additions & 0 deletions pkg/apis/common/events.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
Copyright 2020 The Volcano Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package common

// Event represent the phase of Job, e.g. pod-failed.
type Event string

const (
// AnyEvent means all event
AnyEvent Event = "*"
// PodFailedEvent is triggered if Pod was failed
PodFailedEvent Event = "PodFailed"
// PodEvictedEvent is triggered if Pod was deleted
PodEvictedEvent Event = "PodEvicted"
// JobUnknownEvent These below are several events can lead to job 'Unknown'
// 1. Task Unschedulable, this is triggered when part of
// pods can't be scheduled while some are already running in gang-scheduling case.
JobUnknownEvent Event = "Unknown"
// TaskCompletedEvent is triggered if the 'Replicas' amount of pods in one task are succeed
TaskCompletedEvent Event = "TaskCompleted"

// Note: events below are used internally, should not be used by users.

// OutOfSyncEvent is triggered if Pod/Job were updated
OutOfSyncEvent Event = "OutOfSync"
// CommandIssuedEvent is triggered if a command is raised by user
CommandIssuedEvent Event = "CommandIssued"
)
32 changes: 0 additions & 32 deletions pkg/apis/scheduling/v1alpha2/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,28 +113,6 @@ const (
NotEnoughPodsReason string = "NotEnoughTasks"
)

// QueueEvent represent the phase of queue
type QueueEvent string

const (
// QueueOutOfSyncEvent is triggered if PodGroup/Queue were updated
QueueOutOfSyncEvent QueueEvent = "OutOfSync"
// QueueCommandIssuedEvent is triggered if a command is raised by user
QueueCommandIssuedEvent QueueEvent = "CommandIssued"
)

// QueueAction is the action that queue controller will take according to the event.
type QueueAction string

const (
// SyncQueueAction is the action to sync queue status.
SyncQueueAction QueueAction = "SyncQueue"
// OpenQueueAction is the action to open queue
OpenQueueAction QueueAction = "OpenQueue"
// CloseQueueAction is the action to close queue
CloseQueueAction QueueAction = "CloseQueue"
)

// +genclient
// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object

Expand Down Expand Up @@ -278,13 +256,3 @@ type QueueList struct {
Items []Queue `json:"items" protobuf:"bytes,2,rep,name=items"`
}

// QueueRequest struct
type QueueRequest struct {
// Name is queue name
Name string

// Event is event of queue
Event QueueEvent
// Action is action to be performed
Action QueueAction
}
10 changes: 5 additions & 5 deletions pkg/controllers/apis/request.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ package apis
import (
"fmt"

batch "volcano.sh/volcano/pkg/apis/batch/v1alpha1"
"volcano.sh/volcano/pkg/apis/common"
)

//Request struct
Expand All @@ -29,15 +29,15 @@ type Request struct {
TaskName string
QueueName string

Event batch.Event
Event common.Event
ExitCode int32
Action batch.Action
Action common.Action
JobVersion int32
}

//String function returns the request in string format
func (r Request) String() string {
return fmt.Sprintf(
"Job: %s/%s, Task:%s, Event:%s, ExitCode:%d, Action:%s, JobVersion: %d",
r.Namespace, r.JobName, r.TaskName, r.Event, r.ExitCode, r.Action, r.JobVersion)
"Queue: %s, Job: %s/%s, Task:%s, Event:%s, ExitCode:%d, Action:%s, JobVersion: %d",
r.QueueName, r.Namespace, r.JobName, r.TaskName, r.Event, r.ExitCode, r.Action, r.JobVersion)
}
19 changes: 10 additions & 9 deletions pkg/controllers/queue/queue_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"fmt"
"sync"
"time"
"volcano.sh/volcano/pkg/controllers/apis"

v1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
Expand Down Expand Up @@ -81,10 +82,10 @@ type Controller struct {
// queue name -> podgroup namespace/name
podGroups map[string]map[string]struct{}

syncHandler func(req *schedulingv1alpha2.QueueRequest) error
syncHandler func(req *apis.Request) error
syncCommandHandler func(cmd *busv1alpha1.Command) error

enqueueQueue func(req *schedulingv1alpha2.QueueRequest)
enqueueQueue func(req *apis.Request)

recorder record.EventRecorder
}
Expand Down Expand Up @@ -205,7 +206,7 @@ func (c *Controller) processNextWorkItem() bool {
}
defer c.queue.Done(obj)

req, ok := obj.(*schedulingv1alpha2.QueueRequest)
req, ok := obj.(*apis.Request)
if !ok {
klog.Errorf("%v is not a valid queue request struct.", obj)
return true
Expand All @@ -217,20 +218,20 @@ func (c *Controller) processNextWorkItem() bool {
return true
}

func (c *Controller) handleQueue(req *schedulingv1alpha2.QueueRequest) error {
func (c *Controller) handleQueue(req *apis.Request) error {
startTime := time.Now()
defer func() {
klog.V(4).Infof("Finished syncing queue %s (%v).", req.Name, time.Since(startTime))
klog.V(4).Infof("Finished syncing queue %s (%v).", req.QueueName, time.Since(startTime))
}()

queue, err := c.queueLister.Get(req.Name)
queue, err := c.queueLister.Get(req.QueueName)
if err != nil {
if apierrors.IsNotFound(err) {
klog.V(4).Infof("Queue %s has been deleted.", req.Name)
klog.V(4).Infof("Queue %s has been deleted.", req.QueueName)
return nil
}

return fmt.Errorf("get queue %s failed for %v", req.Name, err)
return fmt.Errorf("get queue %s failed for %v", req.QueueName, err)
}

queueState := queuestate.NewState(queue)
Expand All @@ -240,7 +241,7 @@ func (c *Controller) handleQueue(req *schedulingv1alpha2.QueueRequest) error {

if err := queueState.Execute(req.Action); err != nil {
return fmt.Errorf("sync queue %s failed for %v, event is %v, action is %s",
req.Name, err, req.Event, req.Action)
req.QueueName, err, req.Event, req.Action)
}

return nil
Expand Down
13 changes: 7 additions & 6 deletions pkg/controllers/queue/queue_controller_action.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package queue
import (
"fmt"
"reflect"
"volcano.sh/volcano/pkg/apis/common"

schedulingv1alpha2 "volcano.sh/volcano/pkg/apis/scheduling/v1alpha2"
"volcano.sh/volcano/pkg/controllers/queue/state"
Expand Down Expand Up @@ -87,12 +88,12 @@ func (c *Controller) openQueue(queue *schedulingv1alpha2.Queue, updateStateFn st

if queue.Spec.State != newQueue.Spec.State {
if _, err := c.vcClient.SchedulingV1alpha2().Queues().Update(newQueue); err != nil {
c.recorder.Event(newQueue, v1.EventTypeWarning, string(schedulingv1alpha2.OpenQueueAction),
c.recorder.Event(newQueue, v1.EventTypeWarning, string(common.OpenQueueAction),
fmt.Sprintf("Open queue failed for %v", err))
return err
}

c.recorder.Event(newQueue, v1.EventTypeNormal, string(schedulingv1alpha2.OpenQueueAction),
c.recorder.Event(newQueue, v1.EventTypeNormal, string(common.OpenQueueAction),
fmt.Sprintf("Open queue succeed"))
} else {
return nil
Expand All @@ -112,7 +113,7 @@ func (c *Controller) openQueue(queue *schedulingv1alpha2.Queue, updateStateFn st

if queue.Status.State != newQueue.Status.State {
if _, err := c.vcClient.SchedulingV1alpha2().Queues().UpdateStatus(newQueue); err != nil {
c.recorder.Event(newQueue, v1.EventTypeWarning, string(schedulingv1alpha2.OpenQueueAction),
c.recorder.Event(newQueue, v1.EventTypeWarning, string(common.OpenQueueAction),
fmt.Sprintf("Update queue status from %s to %s failed for %v",
queue.Status.State, newQueue.Status.State, err))
return err
Expand All @@ -130,12 +131,12 @@ func (c *Controller) closeQueue(queue *schedulingv1alpha2.Queue, updateStateFn s

if queue.Spec.State != newQueue.Spec.State {
if _, err := c.vcClient.SchedulingV1alpha2().Queues().Update(newQueue); err != nil {
c.recorder.Event(newQueue, v1.EventTypeWarning, string(schedulingv1alpha2.CloseQueueAction),
c.recorder.Event(newQueue, v1.EventTypeWarning, string(common.CloseQueueAction),
fmt.Sprintf("Close queue failed for %v", err))
return err
}

c.recorder.Event(newQueue, v1.EventTypeNormal, string(schedulingv1alpha2.CloseQueueAction),
c.recorder.Event(newQueue, v1.EventTypeNormal, string(common.CloseQueueAction),
fmt.Sprintf("Close queue succeed"))
} else {
return nil
Expand All @@ -156,7 +157,7 @@ func (c *Controller) closeQueue(queue *schedulingv1alpha2.Queue, updateStateFn s

if queue.Status.State != newQueue.Status.State {
if _, err := c.vcClient.SchedulingV1alpha2().Queues().UpdateStatus(newQueue); err != nil {
c.recorder.Event(newQueue, v1.EventTypeWarning, string(schedulingv1alpha2.CloseQueueAction),
c.recorder.Event(newQueue, v1.EventTypeWarning, string(common.CloseQueueAction),
fmt.Sprintf("Update queue status from %s to %s failed for %v",
queue.Status.State, newQueue.Status.State, err))
return err
Expand Down
Loading

0 comments on commit bf57556

Please sign in to comment.