Skip to content

Commit

Permalink
Merge pull request #3134 from kingeasternsun/improve/jobflow-requeue-…
Browse files Browse the repository at this point in the history
…struct

⚡  jobflow-controller jobtemplate-controller enqueue should use struct not pointer
  • Loading branch information
volcano-sh-bot authored Sep 25, 2023
2 parents 015f86b + 41273f7 commit 00c5e7b
Show file tree
Hide file tree
Showing 6 changed files with 125 additions and 16 deletions.
8 changes: 4 additions & 4 deletions pkg/controllers/jobflow/jobflow_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ type jobflowcontroller struct {
recorder record.EventRecorder

queue workqueue.RateLimitingInterface
enqueueJobFlow func(req *apis.FlowRequest)
enqueueJobFlow func(req apis.FlowRequest)

syncHandler func(req *apis.FlowRequest) error

Expand Down Expand Up @@ -164,13 +164,13 @@ func (jf *jobflowcontroller) processNextWorkItem() bool {
// period.
defer jf.queue.Done(obj)

req, ok := obj.(*apis.FlowRequest)
req, ok := obj.(apis.FlowRequest)
if !ok {
klog.Errorf("%v is not a valid queue request struct.", obj)
return true
}

err := jf.syncHandler(req)
err := jf.syncHandler(&req)
jf.handleJobFlowErr(err, obj)

return true
Expand Down Expand Up @@ -218,7 +218,7 @@ func (jf *jobflowcontroller) handleJobFlowErr(err error, obj interface{}) {
return
}

req, _ := obj.(*apis.FlowRequest)
req, _ := obj.(apis.FlowRequest)
jf.recordEventsForJobFlow(req.Namespace, req.JobFlowName, v1.EventTypeWarning, string(req.Action),
fmt.Sprintf("%v JobFlow failed for %v", req.Action, err))
klog.V(4).Infof("Dropping JobFlow request %v out of the queue for %v.", obj, err)
Expand Down
11 changes: 6 additions & 5 deletions pkg/controllers/jobflow/jobflow_controller_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import (
"volcano.sh/volcano/pkg/controllers/apis"
)

func (jf *jobflowcontroller) enqueue(req *apis.FlowRequest) {
func (jf *jobflowcontroller) enqueue(req apis.FlowRequest) {
jf.queue.Add(req)
}

Expand All @@ -36,7 +36,8 @@ func (jf *jobflowcontroller) addJobFlow(obj interface{}) {
return
}

req := &apis.FlowRequest{
// use struct instead of pointer
req := apis.FlowRequest{
Namespace: jobFlow.Namespace,
JobFlowName: jobFlow.Name,

Expand Down Expand Up @@ -69,7 +70,7 @@ func (jf *jobflowcontroller) updateJobFlow(oldObj, newObj interface{}) {
return
}

req := &apis.FlowRequest{
req := apis.FlowRequest{
Namespace: newJobFlow.Namespace,
JobFlowName: newJobFlow.Name,

Expand Down Expand Up @@ -107,12 +108,12 @@ func (jf *jobflowcontroller) updateJob(oldObj, newObj interface{}) {
return
}

req := &apis.FlowRequest{
req := apis.FlowRequest{
Namespace: newJob.Namespace,
JobFlowName: jobFlowName,
Action: jobflowv1alpha1.SyncJobFlowAction,
Event: jobflowv1alpha1.OutOfSyncEvent,
}

jf.queue.Add(req)
jf.enqueueJobFlow(req)
}
55 changes: 55 additions & 0 deletions pkg/controllers/jobflow/jobflow_controller_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
batch "volcano.sh/apis/pkg/apis/batch/v1alpha1"
jobflowv1alpha1 "volcano.sh/apis/pkg/apis/flow/v1alpha1"
"volcano.sh/apis/pkg/client/clientset/versioned/scheme"
"volcano.sh/volcano/pkg/controllers/apis"
)

func TestAddJobFlowFunc(t *testing.T) {
Expand Down Expand Up @@ -170,3 +171,57 @@ func TestUpdateJobFunc(t *testing.T) {
})
}
}

func TestEnqueueJobFlow(t *testing.T) {

namespace := "test"

req1 := apis.FlowRequest{
Namespace: namespace,
JobFlowName: "name1",

Action: jobflowv1alpha1.SyncJobFlowAction,
Event: jobflowv1alpha1.OutOfSyncEvent,
}
req2 := apis.FlowRequest{
Namespace: namespace,
JobFlowName: "name2",

Action: jobflowv1alpha1.SyncJobFlowAction,
Event: jobflowv1alpha1.OutOfSyncEvent,
}

testCases := []struct {
Name string
newReq apis.FlowRequest
oldReq apis.FlowRequest
ExpectValue int
}{
{
Name: "de-duplicate",
newReq: req1,
oldReq: req1,
ExpectValue: 1,
},
{
Name: "no-deduplicate",
newReq: req1,
oldReq: req2,
ExpectValue: 2,
},
}

for i, testcase := range testCases {
t.Run(testcase.Name, func(t *testing.T) {
fakeController := newFakeController()

fakeController.enqueueJobFlow(testcase.oldReq)
fakeController.enqueueJobFlow(testcase.newReq)
queueLen := fakeController.queue.Len()
if testcase.ExpectValue != queueLen {
t.Errorf("case %d (%s): expected: %v, got %v ", i, testcase.Name, testcase.ExpectValue, queueLen)
}
})
}

}
6 changes: 3 additions & 3 deletions pkg/controllers/jobtemplate/jobtemplate_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ type jobtemplatecontroller struct {
recorder record.EventRecorder

queue workqueue.RateLimitingInterface
enqueueJobTemplate func(req *apis.FlowRequest)
enqueueJobTemplate func(req apis.FlowRequest)

syncHandler func(req *apis.FlowRequest) error

Expand Down Expand Up @@ -149,13 +149,13 @@ func (jt *jobtemplatecontroller) processNextWorkItem() bool {
// period.
defer jt.queue.Done(obj)

req, ok := obj.(*apis.FlowRequest)
req, ok := obj.(apis.FlowRequest)
if !ok {
klog.Errorf("%v is not a valid queue request struct.", obj)
return true
}

err := jt.syncHandler(req)
err := jt.syncHandler(&req)
jt.handleJobTemplateErr(err, obj)

return true
Expand Down
8 changes: 4 additions & 4 deletions pkg/controllers/jobtemplate/jobtemplate_controller_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import (
"volcano.sh/volcano/pkg/controllers/apis"
)

func (jt *jobtemplatecontroller) enqueue(req *apis.FlowRequest) {
func (jt *jobtemplatecontroller) enqueue(req apis.FlowRequest) {
jt.queue.Add(req)
}

Expand All @@ -37,7 +37,7 @@ func (jt *jobtemplatecontroller) addJobTemplate(obj interface{}) {
return
}

req := &apis.FlowRequest{
req := apis.FlowRequest{
Namespace: jobTemplate.Namespace,
JobTemplateName: jobTemplate.Name,
}
Expand All @@ -63,9 +63,9 @@ func (jt *jobtemplatecontroller) addJob(obj interface{}) {
}
namespace, name := namespaceName[0], namespaceName[1]

req := &apis.FlowRequest{
req := apis.FlowRequest{
Namespace: namespace,
JobTemplateName: name,
}
jt.queue.Add(req)
jt.enqueueJobTemplate(req)
}
53 changes: 53 additions & 0 deletions pkg/controllers/jobtemplate/jobtemplate_controller_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (

batch "volcano.sh/apis/pkg/apis/batch/v1alpha1"
jobflowv1alpha1 "volcano.sh/apis/pkg/apis/flow/v1alpha1"
"volcano.sh/volcano/pkg/controllers/apis"
)

func TestAddJobTemplateFunc(t *testing.T) {
Expand Down Expand Up @@ -88,3 +89,55 @@ func TestAddJob(t *testing.T) {
})
}
}

func TestEnqueueJobTemplate(t *testing.T) {

namespace := "test"

req1 := apis.FlowRequest{
Namespace: namespace,
JobTemplateName: "name1",

Action: jobflowv1alpha1.SyncJobTemplateAction,
}
req2 := apis.FlowRequest{
Namespace: namespace,
JobTemplateName: "name2",

Action: jobflowv1alpha1.SyncJobTemplateAction,
}

testCases := []struct {
Name string
newReq apis.FlowRequest
oldReq apis.FlowRequest
ExpectValue int
}{
{
Name: "de-duplicate",
newReq: req1,
oldReq: req1,
ExpectValue: 1,
},
{
Name: "no-deduplicate",
newReq: req1,
oldReq: req2,
ExpectValue: 2,
},
}

for i, testcase := range testCases {
t.Run(testcase.Name, func(t *testing.T) {
fakeController := newFakeController()

fakeController.enqueueJobTemplate(testcase.oldReq)
fakeController.enqueueJobTemplate(testcase.newReq)
queueLen := fakeController.queue.Len()
if testcase.ExpectValue != queueLen {
t.Errorf("case %d (%s): expected: %v, got %v ", i, testcase.Name, testcase.ExpectValue, queueLen)
}
})
}

}

0 comments on commit 00c5e7b

Please sign in to comment.