From e5ee65ee0ea8b39e4c1edaa54851d4869c1f8a88 Mon Sep 17 00:00:00 2001 From: Klesh Wong Date: Mon, 1 Apr 2024 15:34:27 +0800 Subject: [PATCH] feat: resume pipelines on restart (#7229) * feat: resume pipelines on restart * feat: gitextractor clone phase must be executed on resume * fix: subtasks always get skipped --- backend/core/config/config_viper.go | 1 + backend/core/models/task.go | 1 + backend/core/plugin/plugin_task.go | 1 + backend/core/runner/run_pipeline.go | 5 +- backend/core/runner/run_task.go | 47 +++++++++++------ backend/plugins/gitextractor/tasks/clone.go | 1 + backend/server/services/pipeline.go | 58 +++++++++++---------- 7 files changed, 69 insertions(+), 45 deletions(-) diff --git a/backend/core/config/config_viper.go b/backend/core/config/config_viper.go index 3e4e4b0c876..ea5ed6da942 100644 --- a/backend/core/config/config_viper.go +++ b/backend/core/config/config_viper.go @@ -104,6 +104,7 @@ func setDefaultValue(v *viper.Viper) { v.SetDefault("PLUGIN_DIR", "bin/plugins") v.SetDefault("REMOTE_PLUGIN_DIR", "python/plugins") v.SetDefault("SWAGGER_DOCS_DIR", "resources/swagger") + v.SetDefault("RESUME_PIPELINES", true) } func init() { diff --git a/backend/core/models/task.go b/backend/core/models/task.go index fd1b7ffbfa3..661ba091ade 100644 --- a/backend/core/models/task.go +++ b/backend/core/models/task.go @@ -26,6 +26,7 @@ import ( const ( TASK_CREATED = "TASK_CREATED" TASK_RERUN = "TASK_RERUN" + TASK_RESUME = "TASK_RESUME" TASK_RUNNING = "TASK_RUNNING" TASK_COMPLETED = "TASK_COMPLETED" TASK_FAILED = "TASK_FAILED" diff --git a/backend/core/plugin/plugin_task.go b/backend/core/plugin/plugin_task.go index 357fa2dc41e..136f4f17eb8 100644 --- a/backend/core/plugin/plugin_task.go +++ b/backend/core/plugin/plugin_task.go @@ -106,6 +106,7 @@ type SubTaskMeta struct { Dependencies []*SubTaskMeta DependencyTables []string ProductTables []string + ForceRunOnResume bool // Should a subtask be ran dispite it was finished before } // PluginTask Implement this interface to let framework run tasks for you diff --git a/backend/core/runner/run_pipeline.go b/backend/core/runner/run_pipeline.go index 24a07453b66..7df1d994dcd 100644 --- a/backend/core/runner/run_pipeline.go +++ b/backend/core/runner/run_pipeline.go @@ -19,11 +19,12 @@ package runner import ( gocontext "context" + "time" + "github.com/apache/incubator-devlake/core/context" "github.com/apache/incubator-devlake/core/dal" "github.com/apache/incubator-devlake/core/errors" "github.com/apache/incubator-devlake/core/models" - "time" ) // RunPipeline FIXME ... @@ -37,7 +38,7 @@ func RunPipeline( var tasks []models.Task err := db.All( &tasks, - dal.Where("pipeline_id = ? AND status in ?", pipelineId, []string{models.TASK_CREATED, models.TASK_RERUN}), + dal.Where("pipeline_id = ? AND status in ?", pipelineId, []string{models.TASK_CREATED, models.TASK_RERUN, models.TASK_RESUME}), dal.Orderby("pipeline_row, pipeline_col"), ) if err != nil { diff --git a/backend/core/runner/run_task.go b/backend/core/runner/run_task.go index 7dbbe5549f2..0b4e0604cd5 100644 --- a/backend/core/runner/run_task.go +++ b/backend/core/runner/run_task.go @@ -47,9 +47,6 @@ func RunTask( if err := db.First(task, dal.Where("id = ?", taskId)); err != nil { return err } - if task.Status == models.TASK_COMPLETED { - return errors.Default.New("invalid task status") - } dbPipeline := &models.Pipeline{} if err := db.First(dbPipeline, dal.Where("id = ? ", task.PipelineId)); err != nil { return err @@ -60,6 +57,9 @@ func RunTask( return err } beganAt := time.Now() + if dbPipeline.BeganAt != nil { + beganAt = *dbPipeline.BeganAt + } // make sure task status always correct even if it panicked defer func() { if r := recover(); r != nil { @@ -119,6 +119,10 @@ func RunTask( } }() + if task.Status == models.TASK_COMPLETED { + return nil + } + // start execution logger.Info("start executing task: %d", task.ID) dbe := db.UpdateColumns(task, []dal.DalSet{ @@ -298,7 +302,6 @@ func RunPluginSubTasks( continue } // run subtask - logger.Info("executing subtask %s", subtaskMeta.Name) subtaskNumber++ if progress != nil { progress <- plugin.RunningProgress{ @@ -307,18 +310,32 @@ func RunPluginSubTasks( SubTaskNumber: subtaskNumber, } } - err = runSubtask(basicRes, subtaskCtx, task.ID, subtaskNumber, subtaskMeta.EntryPoint) - if err != nil { - err = errors.SubtaskErr.Wrap(err, fmt.Sprintf("subtask %s ended unexpectedly", subtaskMeta.Name), errors.WithData(&subtaskMeta)) - logger.Error(err, "") - where := dal.Where("task_id = ? and name = ?", task.ID, subtaskCtx.GetName()) - if err := basicRes.GetDal().UpdateColumns(subtask, []dal.DalSet{ - {ColumnName: "is_failed", Value: 1}, - {ColumnName: "message", Value: err.Error()}, - }, where); err != nil { - basicRes.GetLogger().Error(err, "error writing subtask %v status to DB", subtaskCtx.GetName()) + subtaskFinsied := false + if !subtaskMeta.ForceRunOnResume { + sfc := errors.Must1( + basicRes.GetDal().Count( + dal.From(&models.Subtask{}), dal.Where("task_id = ? AND name = ? AND finished_at IS NOT NULL", task.ID, subtaskMeta.Name), + ), + ) + subtaskFinsied = sfc > 0 + } + if subtaskFinsied { + logger.Info("subtask %s already finished previously", subtaskMeta.Name) + } else { + logger.Info("executing subtask %s", subtaskMeta.Name) + err = runSubtask(basicRes, subtaskCtx, task.ID, subtaskNumber, subtaskMeta.EntryPoint) + if err != nil { + err = errors.SubtaskErr.Wrap(err, fmt.Sprintf("subtask %s ended unexpectedly", subtaskMeta.Name), errors.WithData(&subtaskMeta)) + logger.Error(err, "") + where := dal.Where("task_id = ? and name = ?", task.ID, subtaskCtx.GetName()) + if err := basicRes.GetDal().UpdateColumns(subtask, []dal.DalSet{ + {ColumnName: "is_failed", Value: 1}, + {ColumnName: "message", Value: err.Error()}, + }, where); err != nil { + basicRes.GetLogger().Error(err, "error writing subtask %v status to DB", subtaskCtx.GetName()) + } + return err } - return err } taskCtx.IncProgress(1) } diff --git a/backend/plugins/gitextractor/tasks/clone.go b/backend/plugins/gitextractor/tasks/clone.go index c27921a6397..599d4adb6b1 100644 --- a/backend/plugins/gitextractor/tasks/clone.go +++ b/backend/plugins/gitextractor/tasks/clone.go @@ -38,6 +38,7 @@ var CloneGitRepoMeta = plugin.SubTaskMeta{ Required: true, Description: "clone a git repo, make it available to later tasks", DomainTypes: []string{plugin.DOMAIN_TYPE_CODE}, + ForceRunOnResume: true, } func useGoGit(subTaskCtx plugin.SubTaskContext, taskData *GitExtractorTaskData) bool { diff --git a/backend/server/services/pipeline.go b/backend/server/services/pipeline.go index 2a1ac8ccb11..80ce47e4e9e 100644 --- a/backend/server/services/pipeline.go +++ b/backend/server/services/pipeline.go @@ -79,34 +79,14 @@ func pipelineServiceInit() { } // standalone mode: reset pipeline status - errMsg := "The process was terminated unexpectedly" - err := db.UpdateColumns( - &models.Pipeline{}, - []dal.DalSet{ - {ColumnName: "status", Value: models.TASK_FAILED}, - {ColumnName: "message", Value: errMsg}, - }, - dal.Where("status = ?", models.TASK_RUNNING), - ) - if err != nil { - panic(err) - } - err = db.UpdateColumns( - &models.Task{}, - []dal.DalSet{ - {ColumnName: "status", Value: models.TASK_FAILED}, - {ColumnName: "message", Value: errMsg}, - }, - dal.Where("status = ?", models.TASK_RUNNING), - ) - if err != nil { - panic(err) + if cfg.GetBool("RESUME_PIPELINES") { + markInterruptedPipelineAs(models.TASK_RESUME) + } else { + markInterruptedPipelineAs(models.TASK_FAILED) } - err = ReloadBlueprints() - if err != nil { - panic(err) - } + // load cronjobs for blueprints + errors.Must(ReloadBlueprints()) var pipelineMaxParallel = cfg.GetInt64("PIPELINE_MAX_PARALLEL") if pipelineMaxParallel < 0 { @@ -120,6 +100,23 @@ func pipelineServiceInit() { go RunPipelineInQueue(pipelineMaxParallel) } +func markInterruptedPipelineAs(status string) { + errors.Must(db.UpdateColumns( + &models.Pipeline{}, + []dal.DalSet{ + {ColumnName: "status", Value: status}, + }, + dal.Where("status = ?", models.TASK_RUNNING), + )) + errors.Must(db.UpdateColumns( + &models.Task{}, + []dal.DalSet{ + {ColumnName: "status", Value: status}, + }, + dal.Where("status = ?", models.TASK_RUNNING), + )) +} + // CreatePipeline and return the model func CreatePipeline(newPipeline *models.NewPipeline, shouldSanitize bool) (*models.Pipeline, errors.Error) { pipeline, err := CreateDbPipeline(newPipeline) @@ -238,7 +235,7 @@ func dequeuePipeline(runningParallelLabels []string) (pipeline *models.Pipeline, // prepare query to find an appropriate pipeline to execute pipeline = &models.Pipeline{} err = tx.First(pipeline, - dal.Where("status IN ?", []string{models.TASK_CREATED, models.TASK_RERUN}), + dal.Where("status IN ?", []string{models.TASK_CREATED, models.TASK_RERUN, models.TASK_RESUME}), dal.Join( `left join _devlake_pipeline_labels ON _devlake_pipeline_labels.pipeline_id = _devlake_pipelines.id AND @@ -254,11 +251,16 @@ func dequeuePipeline(runningParallelLabels []string) (pipeline *models.Pipeline, ) if err == nil { // mark the pipeline running, now we want a write lock + if pipeline.BeganAt == nil { + now := time.Now() + pipeline.BeganAt = &now + globalPipelineLog.Info("resumed pipeline #%d", pipeline.ID) + } errors.Must(tx.LockTables(dal.LockTables{{Table: "_devlake_pipelines", Exclusive: true}})) err = tx.UpdateColumns(&models.Pipeline{}, []dal.DalSet{ {ColumnName: "status", Value: models.TASK_RUNNING}, {ColumnName: "message", Value: ""}, - {ColumnName: "began_at", Value: time.Now()}, + {ColumnName: "began_at", Value: pipeline.BeganAt}, }, dal.Where("id = ?", pipeline.ID)) if err != nil { panic(err)