From 9828bd5c4d2b0b2df0caa6787ebe5e46bf5d424e Mon Sep 17 00:00:00 2001 From: Klesh Wong Date: Thu, 1 Feb 2024 12:18:53 +0800 Subject: [PATCH] fix: triggering blueprint concurrently might lead to deadlock --- backend/server/services/pipeline.go | 10 ++++++---- backend/server/services/pipeline_helper.go | 3 ++- backend/server/services/task.go | 5 ++--- 3 files changed, 10 insertions(+), 8 deletions(-) diff --git a/backend/server/services/pipeline.go b/backend/server/services/pipeline.go index 9592cb14abc..f9edaff97f3 100644 --- a/backend/server/services/pipeline.go +++ b/backend/server/services/pipeline.go @@ -340,7 +340,10 @@ func RerunPipeline(pipelineId uint64, task *models.Task) (tasks []*models.Task, txHelper := dbhelper.NewTxHelper(basicRes, &err) tx := txHelper.Begin() defer txHelper.End() - err = txHelper.LockTablesTimeout(2*time.Second, dal.LockTables{{Table: "_devlake_pipelines", Exclusive: true}}) + err = txHelper.LockTablesTimeout(2*time.Second, dal.LockTables{ + {Table: "_devlake_pipelines", Exclusive: true}, + {Table: "_devlake_tasks", Exclusive: true}, + }) if err != nil { err = errors.BadInput.Wrap(err, "failed to lock pipeline table, is there any pending pipeline or deletion?") return @@ -385,7 +388,6 @@ func RerunPipeline(pipelineId uint64, task *models.Task) (tasks []*models.Task, } // create new tasks - // TODO: this is better to be wrapped inside a transaction rerunTasks := []*models.Task{} for _, t := range failedTasks { // mark previous task failed @@ -395,7 +397,7 @@ func RerunPipeline(pipelineId uint64, task *models.Task) (tasks []*models.Task, return nil, err } // create new task - rerunTask, err := CreateTask(&models.NewTask{ + rerunTask, err := createTask(&models.NewTask{ PipelineTask: &models.PipelineTask{ Plugin: t.Plugin, Subtasks: t.Subtasks, @@ -405,7 +407,7 @@ func RerunPipeline(pipelineId uint64, task *models.Task) (tasks []*models.Task, PipelineRow: t.PipelineRow, PipelineCol: t.PipelineCol, IsRerun: true, - }) + }, tx) if err != nil { return nil, err } diff --git a/backend/server/services/pipeline_helper.go b/backend/server/services/pipeline_helper.go index 11c00e94d55..eec6a8c3498 100644 --- a/backend/server/services/pipeline_helper.go +++ b/backend/server/services/pipeline_helper.go @@ -37,6 +37,7 @@ func CreateDbPipeline(newPipeline *models.NewPipeline) (pipeline *models.Pipelin dal.LockTables{ {Table: "_devlake_pipelines", Exclusive: true}, {Table: "_devlake_pipeline_labels", Exclusive: true}, + {Table: "_devlake_tasks", Exclusive: true}, }, )) if err != nil { @@ -91,7 +92,7 @@ func CreateDbPipeline(newPipeline *models.NewPipeline) (pipeline *models.Pipelin PipelineRow: i + 1, PipelineCol: j + 1, } - _ = errors.Must1(CreateTask(newTask)) + _ = errors.Must1(createTask(newTask, tx)) // sync task state back to pipeline dbPipeline.TotalTasks += 1 } diff --git a/backend/server/services/task.go b/backend/server/services/task.go index 6c19abd7bf0..eaf10120d42 100644 --- a/backend/server/services/task.go +++ b/backend/server/services/task.go @@ -40,8 +40,7 @@ type TaskQuery struct { Pending int `form:"pending"` } -// CreateTask creates a new task -func CreateTask(newTask *models.NewTask) (*models.Task, errors.Error) { +func createTask(newTask *models.NewTask, tx dal.Transaction) (*models.Task, errors.Error) { task := &models.Task{ Plugin: newTask.Plugin, Subtasks: newTask.Subtasks, @@ -55,7 +54,7 @@ func CreateTask(newTask *models.NewTask) (*models.Task, errors.Error) { if newTask.IsRerun { task.Status = models.TASK_RERUN } - err := db.Create(task) + err := tx.Create(task) if err != nil { taskLog.Error(err, "save task failed") return nil, errors.Internal.Wrap(err, "save task failed")