From 5c44771b75cca8d83c827b53e764b28af5e5236c Mon Sep 17 00:00:00 2001
From: TommyLike
Date: Fri, 26 Jul 2019 11:45:00 +0800
Subject: [PATCH] Fix race condition issue

---
 Makefile                                      |  2 +-
 pkg/controllers/job/job_controller_actions.go | 26 ++++++++++++-------
 2 files changed, 18 insertions(+), 10 deletions(-)

diff --git a/Makefile b/Makefile
index 4cf0fc8e91..1ac867a33e 100644
--- a/Makefile
+++ b/Makefile
@@ -68,7 +68,7 @@ generate-code:
 	./hack/update-gencode.sh
 
 unit-test:
-	go list ./... | grep -v e2e | xargs go test -v -cover -covermode atomic -coverprofile coverage.txt
+	go list ./... | grep -v e2e | xargs go test -v -cover -covermode atomic -coverprofile coverage.txt -race
 
 e2e-test-kind:
 	./hack/run-e2e-kind.sh
diff --git a/pkg/controllers/job/job_controller_actions.go b/pkg/controllers/job/job_controller_actions.go
index 8d4b156958..f528765e4c 100644
--- a/pkg/controllers/job/job_controller_actions.go
+++ b/pkg/controllers/job/job_controller_actions.go
@@ -20,6 +20,7 @@ import (
 	"fmt"
 	"sort"
 	"sync"
+	"sync/atomic"
 
 	"github.com/golang/glog"
 
@@ -215,6 +216,13 @@ func (cc *Controller) syncJob(jobInfo *apis.JobInfo, updateStatus state.UpdateSt
 	var podToDelete []*v1.Pod
 	var creationErrs []error
 	var deletionErrs []error
+	appendMutex := sync.Mutex{}
+
+	appendError := func(container *[]error, err error) {
+		appendMutex.Lock()
+		defer appendMutex.Unlock()
+		*container = append(*container, err)
+	}
 
 	for _, ts := range job.Spec.Tasks {
 		ts.Template.Name = ts.Name
@@ -238,7 +246,7 @@ func (cc *Controller) syncJob(jobInfo *apis.JobInfo, updateStatus state.UpdateSt
 				delete(pods, podName)
 				if pod.DeletionTimestamp != nil {
 					glog.Infof("Pod <%s/%s> is terminating", pod.Namespace, pod.Name)
-					terminating++
+					atomic.AddInt32(&terminating, 1)
 					continue
 				}
 
@@ -263,7 +271,7 @@ func (cc *Controller) syncJob(jobInfo *apis.JobInfo, updateStatus state.UpdateSt
 				// So gang-scheduling could schedule the Job successfully
 				glog.Errorf("Failed to create pod %s for Job %s, err %#v",
 					pod.Name, job.Name, err)
-				creationErrs = append(creationErrs, fmt.Errorf("failed to create pod %s, err: %#v", pod.Name, err))
+				appendError(&creationErrs, fmt.Errorf("failed to create pod %s, err: %#v", pod.Name, err))
 			} else {
 				if err != nil && apierrors.IsAlreadyExists(err) {
 					cc.resyncTask(pod)
@@ -297,12 +305,12 @@ func (cc *Controller) syncJob(jobInfo *apis.JobInfo, updateStatus state.UpdateSt
 				// So gang-scheduling could schedule the Job successfully
 				glog.Errorf("Failed to delete pod %s for Job %s, err %#v",
 					pod.Name, job.Name, err)
-				deletionErrs = append(deletionErrs, err)
+				appendError(&deletionErrs, err)
 				cc.resyncTask(pod)
 			} else {
 				glog.V(3).Infof("Deleted Task <%s> of Job <%s/%s>",
 					pod.Name, job.Namespace, job.Name)
-				terminating++
+				atomic.AddInt32(&terminating, 1)
 			}
 		}(pod)
 	}
@@ -546,15 +554,15 @@ func (cc *Controller) initJobStatus(job *vkv1.Job) (*vkv1.Job, error) {
 func classifyAndAddUpPodBaseOnPhase(pod *v1.Pod, pending, running, succeeded, failed, unknown *int32) {
 	switch pod.Status.Phase {
 	case v1.PodPending:
-		*pending++
+		atomic.AddInt32(pending, 1)
 	case v1.PodRunning:
-		*running++
+		atomic.AddInt32(running, 1)
 	case v1.PodSucceeded:
-		*succeeded++
+		atomic.AddInt32(succeeded, 1)
 	case v1.PodFailed:
-		*failed++
+		atomic.AddInt32(failed, 1)
 	default:
-		*unknown++
+		atomic.AddInt32(unknown, 1)
 	}
 	return
 }
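
Note: the sketch below is not part of the patch or the repository; it is a minimal standalone illustration of the synchronization pattern the diff applies, assuming nothing beyond the Go standard library. Counters shared across goroutines are incremented with sync/atomic, and appends to a shared error slice are serialized through a mutex-guarded helper (mirroring the appendError closure above), so the race detector enabled by the new -race flag in the unit-test target stays quiet.

// Standalone sketch (not from the repository): concurrent goroutines share
// an int32 counter and an error slice. The counter is updated via
// sync/atomic; slice appends go through a mutex-guarded helper.
// Run with `go run -race` to confirm no data races are reported.
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

func main() {
	var terminating int32
	var errs []error
	var mu sync.Mutex

	// Same shape as the appendError helper introduced in the patch.
	appendError := func(container *[]error, err error) {
		mu.Lock()
		defer mu.Unlock()
		*container = append(*container, err)
	}

	var wg sync.WaitGroup
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			atomic.AddInt32(&terminating, 1)                    // safe concurrent increment
			appendError(&errs, fmt.Errorf("pod %d failed", i))  // safe concurrent append
		}(i)
	}
	wg.Wait()

	fmt.Printf("terminating=%d, errors=%d\n", atomic.LoadInt32(&terminating), len(errs))
}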