Skip to content

Commit

Permalink
Support multiple events in the lifecycle policy
Browse files Browse the repository at this point in the history
  • Loading branch information
Rajadeepan D Ramesh committed Jul 9, 2019
1 parent 7b1aefe commit 50b7595
Show file tree
Hide file tree
Showing 4 changed files with 86 additions and 20 deletions.
62 changes: 47 additions & 15 deletions pkg/admission/admission_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,32 +115,43 @@ func validatePolicies(policies []v1alpha1.LifecyclePolicy, fldPath *field.Path)
exitCodes := map[int32]struct{}{}

for _, policy := range policies {
if policy.Event != "" && policy.ExitCode != nil {
if (policy.Event != "" || len(policy.Events) != 0) && policy.ExitCode != nil {
err = multierror.Append(err, fmt.Errorf("must not specify event and exitCode simultaneously"))
break
}

if policy.Event == "" && policy.ExitCode == nil {
if policy.Event == "" && len(policy.Events) == 0 && policy.ExitCode == nil {
err = multierror.Append(err, fmt.Errorf("either event and exitCode should be specified"))
break
}

if policy.Event != "" {
if allow, ok := policyEventMap[policy.Event]; !ok || !allow {
err = multierror.Append(err, field.Invalid(fldPath, policy.Event, fmt.Sprintf("invalid policy event")))
break
if len(policy.Event) != 0 || len(policy.Events) != 0 {
bFlag := false
policyEventsList := getEventlist(policy)
for _, event := range policyEventsList {
if allow, ok := policyEventMap[event]; !ok || !allow {
err = multierror.Append(err, field.Invalid(fldPath, event, fmt.Sprintf("invalid policy event")))
bFlag = true
break
}

if allow, ok := policyActionMap[policy.Action]; !ok || !allow {
err = multierror.Append(err, field.Invalid(fldPath, policy.Action, fmt.Sprintf("invalid policy action")))
bFlag = true
break
}
if _, found := policyEvents[event]; found {
err = multierror.Append(err, fmt.Errorf("duplicate event %v across different policy", event))
bFlag = true
break
} else {
policyEvents[event] = struct{}{}
}
}

if allow, ok := policyActionMap[policy.Action]; !ok || !allow {
err = multierror.Append(err, field.Invalid(fldPath, policy.Action, fmt.Sprintf("invalid policy action")))
if bFlag == true {
break
}
if _, found := policyEvents[policy.Event]; found {
err = multierror.Append(err, fmt.Errorf("duplicate event %v", policy.Event))
break
} else {
policyEvents[policy.Event] = struct{}{}
}

} else {
if *policy.ExitCode == 0 {
err = multierror.Append(err, fmt.Errorf("0 is not a valid error code"))
Expand All @@ -162,6 +173,27 @@ func validatePolicies(policies []v1alpha1.LifecyclePolicy, fldPath *field.Path)
return err
}

func getEventlist(policy v1alpha1.LifecyclePolicy) []v1alpha1.Event {
policyEventsList := policy.Events
if len(policy.Event) > 0 {
policyEventsList = append(policyEventsList, policy.Event)
}
uniquePolicyEventlist := removeDuplicates(policyEventsList)
return uniquePolicyEventlist
}

func removeDuplicates(EventList []v1alpha1.Event) []v1alpha1.Event {
keys := make(map[v1alpha1.Event]bool)
list := []v1alpha1.Event{}
for _, val := range EventList {
if _, value := keys[val]; !value {
keys[val] = true
list = append(list, val)
}
}
return list
}

func getValidEvents() []v1alpha1.Event {
var events []v1alpha1.Event
for e, allow := range policyEventMap {
Expand Down
7 changes: 6 additions & 1 deletion pkg/apis/batch/v1alpha1/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,11 @@ type LifecyclePolicy struct {
// +optional
Event Event `json:"event,omitempty" protobuf:"bytes,2,opt,name=event"`

// The Events recorded by scheduler; the controller takes actions
// according to this Events.
// +optional
Events []Event `json:"events,omitempty" protobuf:"bytes,3,opt,name=events"`

// The exit code of the pod container, controller will take action
// according to this code.
// Note: only one of `Event` or `ExitCode` can be specified.
Expand All @@ -189,7 +194,7 @@ type LifecyclePolicy struct {
// Timeout is the grace period for controller to take actions.
// Default to nil (take action immediately).
// +optional
Timeout *metav1.Duration `json:"timeout,omitempty" protobuf:"bytes,3,opt,name=timeout"`
Timeout *metav1.Duration `json:"timeout,omitempty" protobuf:"bytes,4,opt,name=timeout"`
}

// TaskSpec specifies the task specification of Job
Expand Down
5 changes: 5 additions & 0 deletions pkg/apis/batch/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

32 changes: 28 additions & 4 deletions pkg/controllers/job/job_controller_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
kbapi "volcano.sh/volcano/pkg/apis/scheduling/v1alpha1"

"volcano.sh/volcano/pkg/apis/batch/v1alpha1"
vkv1 "volcano.sh/volcano/pkg/apis/batch/v1alpha1"
"volcano.sh/volcano/pkg/apis/helpers"
"volcano.sh/volcano/pkg/controllers/apis"
Expand Down Expand Up @@ -143,8 +144,10 @@ func applyPolicies(job *vkv1.Job, req *apis.Request) vkv1.Action {
for _, task := range job.Spec.Tasks {
if task.Name == req.TaskName {
for _, policy := range task.Policies {
if len(policy.Event) > 0 && len(req.Event) > 0 {
if policy.Event == req.Event || policy.Event == vkv1.AnyEvent {
policyEvents := getEventlist(policy)

if len(policyEvents) > 0 && len(req.Event) > 0 {
if checkEventExist(policyEvents, req.Event) || checkEventExist(policyEvents, vkv1.AnyEvent) {
return policy.Action
}
}
Expand All @@ -161,8 +164,10 @@ func applyPolicies(job *vkv1.Job, req *apis.Request) vkv1.Action {

// Parse Job level policies
for _, policy := range job.Spec.Policies {
if len(policy.Event) > 0 && len(req.Event) > 0 {
if policy.Event == req.Event || policy.Event == vkv1.AnyEvent {
policyEvents := getEventlist(policy)

if len(policyEvents) > 0 && len(req.Event) > 0 {
if checkEventExist(policyEvents, req.Event) || checkEventExist(policyEvents, vkv1.AnyEvent) {
return policy.Action
}
}
Expand All @@ -176,8 +181,27 @@ func applyPolicies(job *vkv1.Job, req *apis.Request) vkv1.Action {
return vkv1.SyncJobAction
}

func getEventlist(policy v1alpha1.LifecyclePolicy) []v1alpha1.Event {
policyEventsList := policy.Events
if len(policy.Event) > 0 {
policyEventsList = append(policyEventsList, policy.Event)
}
return policyEventsList
}

func checkEventExist(policyEvents []v1alpha1.Event, reqEvent v1alpha1.Event) bool {
for _, event := range policyEvents {
if event == reqEvent {
return true
}
}
return false

}

func addResourceList(list, req, limit v1.ResourceList) {
for name, quantity := range req {

if value, ok := list[name]; !ok {
list[name] = *quantity.Copy()
} else {
Expand Down

0 comments on commit 50b7595

Please sign in to comment.