Skip to content

Commit

Permalink
feat: resume pipelines on restart (apache#7229)
Browse files Browse the repository at this point in the history
* feat: resume pipelines on restart

* feat: gitextractor clone phase must be executed on resume

* fix: subtasks always get skipped
  • Loading branch information
klesh authored Apr 1, 2024
1 parent ef714f9 commit e5ee65e
Show file tree
Hide file tree
Showing 7 changed files with 69 additions and 45 deletions.
1 change: 1 addition & 0 deletions backend/core/config/config_viper.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ func setDefaultValue(v *viper.Viper) {
v.SetDefault("PLUGIN_DIR", "bin/plugins")
v.SetDefault("REMOTE_PLUGIN_DIR", "python/plugins")
v.SetDefault("SWAGGER_DOCS_DIR", "resources/swagger")
v.SetDefault("RESUME_PIPELINES", true)
}

func init() {
Expand Down
1 change: 1 addition & 0 deletions backend/core/models/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
const (
TASK_CREATED = "TASK_CREATED"
TASK_RERUN = "TASK_RERUN"
TASK_RESUME = "TASK_RESUME"
TASK_RUNNING = "TASK_RUNNING"
TASK_COMPLETED = "TASK_COMPLETED"
TASK_FAILED = "TASK_FAILED"
Expand Down
1 change: 1 addition & 0 deletions backend/core/plugin/plugin_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ type SubTaskMeta struct {
Dependencies []*SubTaskMeta
DependencyTables []string
ProductTables []string
ForceRunOnResume bool // Should a subtask be ran dispite it was finished before
}

// PluginTask Implement this interface to let framework run tasks for you
Expand Down
5 changes: 3 additions & 2 deletions backend/core/runner/run_pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,12 @@ package runner

import (
gocontext "context"
"time"

"github.com/apache/incubator-devlake/core/context"
"github.com/apache/incubator-devlake/core/dal"
"github.com/apache/incubator-devlake/core/errors"
"github.com/apache/incubator-devlake/core/models"
"time"
)

// RunPipeline FIXME ...
Expand All @@ -37,7 +38,7 @@ func RunPipeline(
var tasks []models.Task
err := db.All(
&tasks,
dal.Where("pipeline_id = ? AND status in ?", pipelineId, []string{models.TASK_CREATED, models.TASK_RERUN}),
dal.Where("pipeline_id = ? AND status in ?", pipelineId, []string{models.TASK_CREATED, models.TASK_RERUN, models.TASK_RESUME}),
dal.Orderby("pipeline_row, pipeline_col"),
)
if err != nil {
Expand Down
47 changes: 32 additions & 15 deletions backend/core/runner/run_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,6 @@ func RunTask(
if err := db.First(task, dal.Where("id = ?", taskId)); err != nil {
return err
}
if task.Status == models.TASK_COMPLETED {
return errors.Default.New("invalid task status")
}
dbPipeline := &models.Pipeline{}
if err := db.First(dbPipeline, dal.Where("id = ? ", task.PipelineId)); err != nil {
return err
Expand All @@ -60,6 +57,9 @@ func RunTask(
return err
}
beganAt := time.Now()
if dbPipeline.BeganAt != nil {
beganAt = *dbPipeline.BeganAt
}
// make sure task status always correct even if it panicked
defer func() {
if r := recover(); r != nil {
Expand Down Expand Up @@ -119,6 +119,10 @@ func RunTask(
}
}()

if task.Status == models.TASK_COMPLETED {
return nil
}

// start execution
logger.Info("start executing task: %d", task.ID)
dbe := db.UpdateColumns(task, []dal.DalSet{
Expand Down Expand Up @@ -298,7 +302,6 @@ func RunPluginSubTasks(
continue
}
// run subtask
logger.Info("executing subtask %s", subtaskMeta.Name)
subtaskNumber++
if progress != nil {
progress <- plugin.RunningProgress{
Expand All @@ -307,18 +310,32 @@ func RunPluginSubTasks(
SubTaskNumber: subtaskNumber,
}
}
err = runSubtask(basicRes, subtaskCtx, task.ID, subtaskNumber, subtaskMeta.EntryPoint)
if err != nil {
err = errors.SubtaskErr.Wrap(err, fmt.Sprintf("subtask %s ended unexpectedly", subtaskMeta.Name), errors.WithData(&subtaskMeta))
logger.Error(err, "")
where := dal.Where("task_id = ? and name = ?", task.ID, subtaskCtx.GetName())
if err := basicRes.GetDal().UpdateColumns(subtask, []dal.DalSet{
{ColumnName: "is_failed", Value: 1},
{ColumnName: "message", Value: err.Error()},
}, where); err != nil {
basicRes.GetLogger().Error(err, "error writing subtask %v status to DB", subtaskCtx.GetName())
subtaskFinsied := false
if !subtaskMeta.ForceRunOnResume {
sfc := errors.Must1(
basicRes.GetDal().Count(
dal.From(&models.Subtask{}), dal.Where("task_id = ? AND name = ? AND finished_at IS NOT NULL", task.ID, subtaskMeta.Name),
),
)
subtaskFinsied = sfc > 0
}
if subtaskFinsied {
logger.Info("subtask %s already finished previously", subtaskMeta.Name)
} else {
logger.Info("executing subtask %s", subtaskMeta.Name)
err = runSubtask(basicRes, subtaskCtx, task.ID, subtaskNumber, subtaskMeta.EntryPoint)
if err != nil {
err = errors.SubtaskErr.Wrap(err, fmt.Sprintf("subtask %s ended unexpectedly", subtaskMeta.Name), errors.WithData(&subtaskMeta))
logger.Error(err, "")
where := dal.Where("task_id = ? and name = ?", task.ID, subtaskCtx.GetName())
if err := basicRes.GetDal().UpdateColumns(subtask, []dal.DalSet{
{ColumnName: "is_failed", Value: 1},
{ColumnName: "message", Value: err.Error()},
}, where); err != nil {
basicRes.GetLogger().Error(err, "error writing subtask %v status to DB", subtaskCtx.GetName())
}
return err
}
return err
}
taskCtx.IncProgress(1)
}
Expand Down
1 change: 1 addition & 0 deletions backend/plugins/gitextractor/tasks/clone.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ var CloneGitRepoMeta = plugin.SubTaskMeta{
Required: true,
Description: "clone a git repo, make it available to later tasks",
DomainTypes: []string{plugin.DOMAIN_TYPE_CODE},
ForceRunOnResume: true,
}

func useGoGit(subTaskCtx plugin.SubTaskContext, taskData *GitExtractorTaskData) bool {
Expand Down
58 changes: 30 additions & 28 deletions backend/server/services/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,34 +79,14 @@ func pipelineServiceInit() {
}

// standalone mode: reset pipeline status
errMsg := "The process was terminated unexpectedly"
err := db.UpdateColumns(
&models.Pipeline{},
[]dal.DalSet{
{ColumnName: "status", Value: models.TASK_FAILED},
{ColumnName: "message", Value: errMsg},
},
dal.Where("status = ?", models.TASK_RUNNING),
)
if err != nil {
panic(err)
}
err = db.UpdateColumns(
&models.Task{},
[]dal.DalSet{
{ColumnName: "status", Value: models.TASK_FAILED},
{ColumnName: "message", Value: errMsg},
},
dal.Where("status = ?", models.TASK_RUNNING),
)
if err != nil {
panic(err)
if cfg.GetBool("RESUME_PIPELINES") {
markInterruptedPipelineAs(models.TASK_RESUME)
} else {
markInterruptedPipelineAs(models.TASK_FAILED)
}

err = ReloadBlueprints()
if err != nil {
panic(err)
}
// load cronjobs for blueprints
errors.Must(ReloadBlueprints())

var pipelineMaxParallel = cfg.GetInt64("PIPELINE_MAX_PARALLEL")
if pipelineMaxParallel < 0 {
Expand All @@ -120,6 +100,23 @@ func pipelineServiceInit() {
go RunPipelineInQueue(pipelineMaxParallel)
}

func markInterruptedPipelineAs(status string) {
errors.Must(db.UpdateColumns(
&models.Pipeline{},
[]dal.DalSet{
{ColumnName: "status", Value: status},
},
dal.Where("status = ?", models.TASK_RUNNING),
))
errors.Must(db.UpdateColumns(
&models.Task{},
[]dal.DalSet{
{ColumnName: "status", Value: status},
},
dal.Where("status = ?", models.TASK_RUNNING),
))
}

// CreatePipeline and return the model
func CreatePipeline(newPipeline *models.NewPipeline, shouldSanitize bool) (*models.Pipeline, errors.Error) {
pipeline, err := CreateDbPipeline(newPipeline)
Expand Down Expand Up @@ -238,7 +235,7 @@ func dequeuePipeline(runningParallelLabels []string) (pipeline *models.Pipeline,
// prepare query to find an appropriate pipeline to execute
pipeline = &models.Pipeline{}
err = tx.First(pipeline,
dal.Where("status IN ?", []string{models.TASK_CREATED, models.TASK_RERUN}),
dal.Where("status IN ?", []string{models.TASK_CREATED, models.TASK_RERUN, models.TASK_RESUME}),
dal.Join(
`left join _devlake_pipeline_labels ON
_devlake_pipeline_labels.pipeline_id = _devlake_pipelines.id AND
Expand All @@ -254,11 +251,16 @@ func dequeuePipeline(runningParallelLabels []string) (pipeline *models.Pipeline,
)
if err == nil {
// mark the pipeline running, now we want a write lock
if pipeline.BeganAt == nil {
now := time.Now()
pipeline.BeganAt = &now
globalPipelineLog.Info("resumed pipeline #%d", pipeline.ID)
}
errors.Must(tx.LockTables(dal.LockTables{{Table: "_devlake_pipelines", Exclusive: true}}))
err = tx.UpdateColumns(&models.Pipeline{}, []dal.DalSet{
{ColumnName: "status", Value: models.TASK_RUNNING},
{ColumnName: "message", Value: ""},
{ColumnName: "began_at", Value: time.Now()},
{ColumnName: "began_at", Value: pipeline.BeganAt},
}, dal.Where("id = ?", pipeline.ID))
if err != nil {
panic(err)
Expand Down

0 comments on commit e5ee65e

Please sign in to comment.