Skip to content

Commit

Permalink
check data version when accept from executor done chan
Browse files Browse the repository at this point in the history
  • Loading branch information
chengjoey committed Dec 17, 2021
1 parent da58d7f commit 16f30ef
Show file tree
Hide file tree
Showing 5 changed files with 44 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -67,11 +67,12 @@ func (d *define) Create(ctx context.Context, task *spec.PipelineTask) (interface
func (d *define) Start(ctx context.Context, task *spec.PipelineTask) (interface{}, error) {

go func(ctx context.Context, task *spec.PipelineTask) {
executorVersion := task.GenerateExecutorVersion()
if _, alreadyProcessing := d.runningAPIs.LoadOrStore(d.makeRunningApiKey(task), task); alreadyProcessing {
logrus.Warnf("apitest: task: %d already processing", task.ID)
return
}
executorDoneCh, ok := ctx.Value(spec.MakeTaskExecutorCtxKey(task)).(chan interface{})
executorDoneCh, ok := ctx.Value(spec.MakeTaskExecutorCtxKey(task)).(chan spec.ExecutorChanData)
if !ok {
logrus.Warnf("apitest: failed to get executor channel, pipelineID: %d, taskID: %d", task.PipelineID, task.ID)
}
Expand All @@ -83,7 +84,10 @@ func (d *define) Start(ctx context.Context, task *spec.PipelineTask) (interface{
}
// if executor chan is nil, task framework can loop query meta get status
if executorDoneCh != nil {
executorDoneCh <- apistructs.PipelineStatusDesc{Status: status}
executorDoneCh <- spec.ExecutorChanData{
Data: apistructs.PipelineStatusDesc{Status: status},
Version: executorVersion,
}
}
d.runningAPIs.Delete(d.makeRunningApiKey(task))
}()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ func (w *Wait) Start(ctx context.Context, task *spec.PipelineTask) (interface{},
return nil, nil
}

executorDoneCh := ctx.Value(spec.MakeTaskExecutorCtxKey(task)).(chan interface{})
executorDoneCh := ctx.Value(spec.MakeTaskExecutorCtxKey(task)).(chan spec.ExecutorChanData)
if executorDoneCh == nil {
return nil, errors.Errorf("wait: failed to get exector channel, pipelineID: %d, taskID: %d", task.PipelineID, task.ID)
}
Expand All @@ -104,12 +104,16 @@ func (w *Wait) Start(ctx context.Context, task *spec.PipelineTask) (interface{},

timer := time.NewTimer(time.Duration(waitSec) * time.Second)
go func() {
executorVersion := task.GenerateExecutorVersion()
select {
case <-ctx.Done():
logrus.Warnf("wait received stop timer signal, canceled, reason: %s", ctx.Err())
return
case <-timer.C:
executorDoneCh <- apistructs.PipelineStatusDesc{Status: apistructs.PipelineStatusSuccess}
executorDoneCh <- spec.ExecutorChanData{
Data: apistructs.PipelineStatusDesc{Status: apistructs.PipelineStatusSuccess},
Version: executorVersion,
}
return
}
}()
Expand Down
11 changes: 8 additions & 3 deletions modules/pipeline/pipengine/reconciler/taskrun/taskop/wait.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,14 @@ func (w *wait) Processing() (interface{}, error) {
defer timer.Stop()
for {
select {
case doneData := <-w.ExecutorDoneCh:
logrus.Infof("%s: accept signal from executor %s, data: %v", w.Op(), w.Executor.Name(), doneData)
return doneData, nil
case executorData := <-w.ExecutorDoneCh:
version := executorData.Version
if err := w.Task.CheckExecutorVersion(version); err != nil {
logrus.Warnf("%s: executor chan accept invalid signal, data: %v, err: %v", w.Op(), executorData, err)
continue
}
logrus.Infof("%s: accept signal from executor %s, data: %v", w.Op(), w.Executor.Name(), executorData)
return executorData.Data, nil
case <-w.Ctx.Done():
return data, nil
case <-w.PExitCh:
Expand Down
4 changes: 2 additions & 2 deletions modules/pipeline/pipengine/reconciler/taskrun/taskrun.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ type TaskRun struct {
PExitCh <-chan struct{}
PExitChCancel context.CancelFunc
PExit bool
ExecutorDoneCh chan interface{}
ExecutorDoneCh chan spec.ExecutorChanData

// 轮训状态间隔期间可能任务已经是终态,FakeTimeout = true
FakeTimeout bool
Expand All @@ -72,7 +72,7 @@ func New(ctx context.Context, task *spec.PipelineTask,
extMarketSvc *extmarketsvc.ExtMarketSvc,
) *TaskRun {
// make executor has buffer, don't block task framework
executorCh := make(chan interface{}, 1)
executorCh := make(chan spec.ExecutorChanData, 1)
return &TaskRun{
Ctx: context.WithValue(ctx, spec.MakeTaskExecutorCtxKey(task), executorCh),
Task: task,
Expand Down
23 changes: 22 additions & 1 deletion modules/pipeline/spec/pipeline_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ import (
)

const (
CtxExecutorChKeyPrefix = "executor-done-chan"
CtxExecutorChKeyPrefix = "executor-done-chan"
CtxExecutorChVersionPrefix = "executor-done-chan-version"
)

type PipelineTask struct {
Expand Down Expand Up @@ -278,6 +279,26 @@ func (pt *PipelineTask) GetMetadata() apistructs.Metadata {
return pt.Result.Metadata
}

type ExecutorChanData struct {
Data interface{}
Version string
}

func (pt *PipelineTask) GenerateExecutorVersion() string {
if pt.Extra.LoopOptions == nil {
return fmt.Sprintf("%s-%d", CtxExecutorChVersionPrefix, pt.ID)
}
return fmt.Sprintf("%s-%d-loop-%d", CtxExecutorChVersionPrefix, pt.ID, pt.Extra.LoopOptions.LoopedTimes)
}

func (pt *PipelineTask) CheckExecutorVersion(actualVersion string) error {
expectedVersion := pt.GenerateExecutorVersion()
if expectedVersion != actualVersion {
return fmt.Errorf("executor data expected version: %s, actual version: %s", expectedVersion, actualVersion)
}
return nil
}

func GenDefaultTaskResource() RuntimeResource {
return RuntimeResource{
CPU: conf.TaskDefaultCPU(),
Expand Down

0 comments on commit 16f30ef

Please sign in to comment.