diff --git a/modules/pipeline/pipengine/actionexecutor/plugins/apitest/apitest.go b/modules/pipeline/pipengine/actionexecutor/plugins/apitest/apitest.go index d69cb97839b..e3b14d7983f 100644 --- a/modules/pipeline/pipengine/actionexecutor/plugins/apitest/apitest.go +++ b/modules/pipeline/pipengine/actionexecutor/plugins/apitest/apitest.go @@ -17,6 +17,9 @@ package apitest import ( "context" "fmt" + "sync" + + "github.com/sirupsen/logrus" "github.com/erda-project/erda/apistructs" "github.com/erda-project/erda/modules/pipeline/dbclient" @@ -28,9 +31,10 @@ import ( var Kind = types.Kind(spec.PipelineTaskExecutorKindAPITest) type define struct { - name types.Name - options map[string]string - dbClient *dbclient.Client + name types.Name + options map[string]string + dbClient *dbclient.Client + runningAPIs sync.Map } func (d *define) Kind() types.Kind { return Kind } @@ -44,7 +48,11 @@ func (d *define) Exist(ctx context.Context, task *spec.PipelineTask) (created bo case status == apistructs.PipelineStatusCreated: return true, false, nil case status == apistructs.PipelineStatusQueue, status == apistructs.PipelineStatusRunning: - return true, true, nil + // if apitest task is not procesing, should make status-started false + if _, alreadyProcessing := d.runningAPIs.Load(d.makeRunningApiKey(task)); alreadyProcessing { + return true, true, nil + } + return true, false, nil case status.IsEndStatus(): return true, true, nil default: @@ -57,7 +65,46 @@ func (d *define) Create(ctx context.Context, task *spec.PipelineTask) (interface } func (d *define) Start(ctx context.Context, task *spec.PipelineTask) (interface{}, error) { - logic.Do(ctx, task) + + go func(ctx context.Context, task *spec.PipelineTask) { + if _, alreadyProcessing := d.runningAPIs.LoadOrStore(d.makeRunningApiKey(task), task); alreadyProcessing { + logrus.Warnf("apitest: task: %d already processing", task.ID) + return + } + executorDoneCh, ok := ctx.Value(spec.MakeTaskExecutorCtxKey(task)).(chan interface{}) + if !ok { + logrus.Warnf("apitest: failed to get executor channel, pipelineID: %d, taskID: %d", task.PipelineID, task.ID) + } + + var status = apistructs.PipelineStatusFailed + defer func() { + if r := recover(); r != nil { + logrus.Errorf("api-test logic do panic recover:%s", r) + } + // if executor chan is nil, task framework can loop query meta get status + if executorDoneCh != nil { + executorDoneCh <- apistructs.PipelineStatusDesc{Status: status} + } + d.runningAPIs.Delete(d.makeRunningApiKey(task)) + }() + + logic.Do(ctx, task) + + latestTask, err := d.dbClient.GetPipelineTask(task.ID) + if err != nil { + logrus.Errorf("failed to query latest task, err: %v \n", err) + return + } + + meta := latestTask.Result.Metadata + for _, metaField := range meta { + if metaField.Name == logic.MetaKeyResult { + if metaField.Value == logic.ResultSuccess { + status = apistructs.PipelineStatusSuccess + } + } + } + }(ctx, task) return nil, nil } @@ -70,13 +117,13 @@ func (d *define) Status(ctx context.Context, task *spec.PipelineTask) (apistruct if err != nil { return apistructs.PipelineStatusDesc{}, fmt.Errorf("failed to query latest task, err: %v", err) } - *task = latestTask + //*task = latestTask if task.Status.IsEndStatus() { return apistructs.PipelineStatusDesc{Status: task.Status}, nil } - created, _, err := d.Exist(ctx, task) + created, started, err := d.Exist(ctx, task) if err != nil { return apistructs.PipelineStatusDesc{}, err } @@ -85,19 +132,27 @@ func (d *define) Status(ctx context.Context, task *spec.PipelineTask) (apistruct return apistructs.PipelineStatusDesc{Status: apistructs.PipelineStatusAnalyzed}, nil } + if !started && len(latestTask.Result.Metadata) == 0 { + return apistructs.PipelineStatusDesc{Status: apistructs.PipelineStatusBorn}, nil + } + // status according to api success or not meta := latestTask.Result.Metadata + var status = apistructs.PipelineStatusFailed for _, metaField := range meta { if metaField.Name == logic.MetaKeyResult { if metaField.Value == logic.ResultSuccess { - return apistructs.PipelineStatusDesc{Status: apistructs.PipelineStatusSuccess}, nil + status = apistructs.PipelineStatusSuccess + } + if metaField.Value == logic.ResultFailed { + status = apistructs.PipelineStatusFailed } - return apistructs.PipelineStatusDesc{Status: apistructs.PipelineStatusFailed}, nil + return apistructs.PipelineStatusDesc{Status: status}, nil } } // return created status to do start step - return apistructs.PipelineStatusDesc{Status: apistructs.PipelineStatusCreated}, nil + return apistructs.PipelineStatusDesc{Status: apistructs.PipelineStatusRunning}, nil } func (d *define) Inspect(ctx context.Context, task *spec.PipelineTask) (apistructs.TaskInspect, error) { @@ -116,6 +171,10 @@ func (d *define) BatchDelete(ctx context.Context, actions []*spec.PipelineTask) return nil, nil } +func (d *define) makeRunningApiKey(task *spec.PipelineTask) string { + return fmt.Sprintf("%d-%d", task.PipelineID, task.ID) +} + func init() { types.MustRegister(Kind, func(name types.Name, options map[string]string) (types.ActionExecutor, error) { dbClient, err := dbclient.New() @@ -123,9 +182,10 @@ func init() { return nil, fmt.Errorf("failed to init dbclient, err: %v", err) } return &define{ - name: name, - options: options, - dbClient: dbClient, + name: name, + options: options, + dbClient: dbClient, + runningAPIs: sync.Map{}, }, nil }) } diff --git a/modules/pipeline/pipengine/actionexecutor/plugins/apitest/logic/meta.go b/modules/pipeline/pipengine/actionexecutor/plugins/apitest/logic/meta.go index 16bdd5b0db2..489795099f4 100644 --- a/modules/pipeline/pipengine/actionexecutor/plugins/apitest/logic/meta.go +++ b/modules/pipeline/pipengine/actionexecutor/plugins/apitest/logic/meta.go @@ -18,6 +18,7 @@ import ( "context" "encoding/json" "strconv" + "time" "github.com/erda-project/erda/apistructs" "github.com/erda-project/erda/modules/actionagent" @@ -26,6 +27,7 @@ import ( "github.com/erda-project/erda/pkg/apitestsv2" "github.com/erda-project/erda/pkg/apitestsv2/cookiejar" "github.com/erda-project/erda/pkg/encoding/jsonparse" + "github.com/erda-project/erda/pkg/loop" ) const ( @@ -112,11 +114,15 @@ func writeMetaFile(ctx context.Context, task *spec.PipelineTask, meta *Meta) { cb.PipelineTaskID = task.ID cbData, _ := json.Marshal(&cb) - err := pipelinefunc.CallbackActionFunc(cbData) - if err != nil { - log.Errorf("failed to callback, err: %v", err) - return - } - + // apitest should ensure that callback to pipeline after doing request + _ = loop.New(loop.WithDeclineRatio(2), loop.WithDeclineLimit(10*time.Second)). + Do(func() (bool, error) { + err := pipelinefunc.CallbackActionFunc(cbData) + if err != nil { + log.Errorf("failed to callback, err: %v", err) + return false, err + } + return true, nil + }) return } diff --git a/modules/pipeline/pipengine/reconciler/reconcile.go b/modules/pipeline/pipengine/reconciler/reconcile.go index c96997bee98..fe7971148ce 100644 --- a/modules/pipeline/pipengine/reconciler/reconcile.go +++ b/modules/pipeline/pipengine/reconciler/reconcile.go @@ -104,7 +104,6 @@ func (r *Reconciler) reconcile(ctx context.Context, pipelineID uint64) error { if err != nil { return } - tr := taskrun.New(ctx, task, ctx.Value(ctxKeyPipelineExitCh).(chan struct{}), ctx.Value(ctxKeyPipelineExitChCancelFunc).(context.CancelFunc), r.TaskThrottler, executor, &p, r.bdl, r.dbClient, r.js, diff --git a/modules/pipeline/pipengine/reconciler/taskrun/framework.go b/modules/pipeline/pipengine/reconciler/taskrun/framework.go index 0db3fba6db4..12b0dcfd4f3 100644 --- a/modules/pipeline/pipengine/reconciler/taskrun/framework.go +++ b/modules/pipeline/pipengine/reconciler/taskrun/framework.go @@ -27,7 +27,6 @@ import ( "github.com/erda-project/erda/modules/pipeline/conf" "github.com/erda-project/erda/modules/pipeline/pipengine/reconciler/rlog" "github.com/erda-project/erda/modules/pipeline/pkg/errorsx" - "github.com/erda-project/erda/modules/pipeline/spec" "github.com/erda-project/erda/pkg/loop" "github.com/erda-project/erda/pkg/strutil" ) @@ -35,8 +34,7 @@ import ( func (tr *TaskRun) Do(itr TaskOp) error { logrus.Infof("reconciler: pipelineID: %d, task %q begin %s", tr.P.ID, tr.Task.Name, itr.Op()) - executorDoneCh := tr.Ctx.Value(spec.MakeTaskExecutorCtxKey(tr.Task)).(chan interface{}) - o := &Elem{ErrCh: make(chan error), DoneCh: make(chan interface{}), ExitCh: make(chan struct{}), ExecutorDoneCh: executorDoneCh} + o := &Elem{ErrCh: make(chan error), DoneCh: make(chan interface{}), ExitCh: make(chan struct{})} o.TimeoutCh, o.Cancel, o.Timeout = itr.TimeoutConfig() // define op handle func @@ -161,14 +159,6 @@ func (tr *TaskRun) waitOp(itr TaskOp, o *Elem) (result error) { // aop _ = aop.Handle(aop.NewContextForTask(*tr.Task, *tr.P, itr.TuneTriggers().AfterProcessing)) - case data := <-o.ExecutorDoneCh: - tr.LogStep(itr.Op(), fmt.Sprintf("framework accept signal from executor %s, begin do WhenDone", tr.Executor.Name())) - defer tr.LogStep(itr.Op(), fmt.Sprintf("framework accept signal from executor %s, end do WhenDone", tr.Executor.Name())) - if err := itr.WhenDone(data); err != nil { - errs = append(errs, err.Error()) - } - _ = aop.Handle(aop.NewContextForTask(*tr.Task, *tr.P, itr.TuneTriggers().AfterProcessing)) - case err := <-o.ErrCh: logrus.Errorf("reconciler: pipelineID: %d, task %q %s received error (%v)", tr.P.ID, tr.Task.Name, itr.Op(), err) if errorsx.IsNetworkError(err) { diff --git a/modules/pipeline/pipengine/reconciler/taskrun/taskop/wait.go b/modules/pipeline/pipengine/reconciler/taskrun/taskop/wait.go index c3183c20de5..af391770850 100644 --- a/modules/pipeline/pipengine/reconciler/taskrun/taskop/wait.go +++ b/modules/pipeline/pipengine/reconciler/taskrun/taskop/wait.go @@ -17,6 +17,7 @@ package taskop import ( "context" "errors" + "math" "time" "github.com/sirupsen/logrus" @@ -25,13 +26,16 @@ import ( "github.com/erda-project/erda/modules/pipeline/aop/aoptypes" "github.com/erda-project/erda/modules/pipeline/commonutil/costtimeutil" "github.com/erda-project/erda/modules/pipeline/conf" - "github.com/erda-project/erda/modules/pipeline/pipengine/reconciler/rlog" "github.com/erda-project/erda/modules/pipeline/pipengine/reconciler/taskrun" - "github.com/erda-project/erda/pkg/loop" ) var err4EnableDeclineRatio = errors.New("enable decline ratio") +var ( + declineRatio float64 = 1.5 + declineLimit time.Duration = 10 * time.Second +) + type wait taskrun.TaskRun func NewWait(tr *taskrun.TaskRun) *wait { @@ -47,55 +51,42 @@ func (w *wait) TaskRun() *taskrun.TaskRun { } func (w *wait) Processing() (interface{}, error) { - stopWaitCh := make(chan struct{}) - defer func() { - stopWaitCh <- struct{}{} - }() - go func() { + var ( + data interface{} + loopedTimes uint64 + ) + + timer := time.NewTimer(w.calculateNextLoopTimeDuration(loopedTimes)) + defer timer.Stop() + for { select { + case doneData := <-w.ExecutorDoneCh: + logrus.Infof("%s: accept signal from executor %s, data: %v", w.Op(), w.Executor.Name(), doneData) + return doneData, nil case <-w.Ctx.Done(): - w.StopWaitLoop = true - return + return data, nil case <-w.PExitCh: logrus.Warnf("reconciler: pipeline exit, stop wait, pipelineID: %d, taskID: %d", w.P.ID, w.Task.ID) - return - case <-stopWaitCh: - rlog.TDebugf(w.P.ID, w.Task.ID, "stop wait") - close(stopWaitCh) - return - } - }() - - var ( - data interface{} - ) - - err := loop.New(loop.WithDeclineRatio(1.5), loop.WithDeclineLimit(time.Second*10)).Do(func() (abort bool, err error) { - if w.QuitWaitTimeout { - return true, nil - } - - statusDesc, err := w.Executor.Status(w.Ctx, w.Task) - if err != nil { - logrus.Errorf("[alert] reconciler: pipelineID: %d, task %q wait get status failed, err: %v", - w.P.ID, w.Task.Name, err) - return true, err - } - - if statusDesc.Status == apistructs.PipelineStatusUnknown { - logrus.Errorf("[alert] reconciler: pipelineID: %d, task %q wait get status %q, retry", w.P.ID, w.Task.Name, apistructs.PipelineStatusUnknown) - return false, err4EnableDeclineRatio - } + return data, nil + case <-timer.C: + statusDesc, err := w.Executor.Status(w.Ctx, w.Task) + if err != nil { + logrus.Errorf("[alert] reconciler: pipelineID: %d, task %q wait get status failed, err: %v", + w.P.ID, w.Task.Name, err) + return nil, err + } + if statusDesc.Status.IsEndStatus() { + data = statusDesc + return data, nil + } + if statusDesc.Status == apistructs.PipelineStatusUnknown { + logrus.Errorf("[alert] reconciler: pipelineID: %d, task %q wait get status %q, retry", w.P.ID, w.Task.Name, apistructs.PipelineStatusUnknown) + } - if statusDesc.Status.IsEndStatus() { - data = statusDesc - return true, nil + loopedTimes++ + timer.Reset(w.calculateNextLoopTimeDuration(loopedTimes)) } - - return w.StopWaitLoop, err4EnableDeclineRatio - }) - - return data, err + } } func (w *wait) WhenDone(data interface{}) error { @@ -186,3 +177,12 @@ func (w *wait) TuneTriggers() taskrun.TaskOpTuneTriggers { AfterProcessing: aoptypes.TuneTriggerTaskAfterWait, } } + +func (w *wait) calculateNextLoopTimeDuration(loopedTimes uint64) time.Duration { + lastSleepTime := time.Second + lastSleepTime = time.Duration(float64(lastSleepTime) * math.Pow(declineRatio, float64(loopedTimes))) + if lastSleepTime > declineLimit { + return declineLimit + } + return lastSleepTime +} diff --git a/modules/pipeline/pipengine/reconciler/taskrun/taskop/wait_test.go b/modules/pipeline/pipengine/reconciler/taskrun/taskop/wait_test.go new file mode 100644 index 00000000000..069eaf47088 --- /dev/null +++ b/modules/pipeline/pipengine/reconciler/taskrun/taskop/wait_test.go @@ -0,0 +1,76 @@ +// Copyright (c) 2021 Terminus, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package taskop + +import ( + "testing" + + "github.com/stretchr/testify/assert" + + "github.com/erda-project/erda/modules/pipeline/pipengine/reconciler/taskrun" +) + +func TestCalculateNextLoopTimeDuration(t *testing.T) { + tt := []struct { + loopedTimes uint64 + want string + }{ + { + loopedTimes: 0, + want: "1s", + }, + { + loopedTimes: 1, + want: "1.5s", + }, + { + loopedTimes: 2, + want: "2.25s", + }, + { + loopedTimes: 3, + want: "3.375s", + }, + { + loopedTimes: 4, + want: "5.0625s", + }, + { + loopedTimes: 5, + want: "7.59375s", + }, + { + loopedTimes: 6, + want: "10s", + }, + { + loopedTimes: 7, + want: "10s", + }, + { + loopedTimes: 8, + want: "10s", + }, + { + loopedTimes: 9, + want: "10s", + }, + } + + w := NewWait(&taskrun.TaskRun{}) + for i := range tt { + assert.Equal(t, tt[i].want, w.calculateNextLoopTimeDuration(tt[i].loopedTimes).String()) + } +} diff --git a/modules/pipeline/pipengine/reconciler/taskrun/taskrun.go b/modules/pipeline/pipengine/reconciler/taskrun/taskrun.go index 6f947ae411c..55b8475b4b4 100644 --- a/modules/pipeline/pipengine/reconciler/taskrun/taskrun.go +++ b/modules/pipeline/pipengine/reconciler/taskrun/taskrun.go @@ -50,9 +50,10 @@ type TaskRun struct { StopQueueLoop bool StopWaitLoop bool - PExitCh <-chan struct{} - PExitChCancel context.CancelFunc - PExit bool + PExitCh <-chan struct{} + PExitChCancel context.CancelFunc + PExit bool + ExecutorDoneCh chan interface{} // 轮训状态间隔期间可能任务已经是终态,FakeTimeout = true FakeTimeout bool @@ -70,8 +71,10 @@ func New(ctx context.Context, task *spec.PipelineTask, actionAgentSvc *actionagentsvc.ActionAgentSvc, extMarketSvc *extmarketsvc.ExtMarketSvc, ) *TaskRun { + // make executor has buffer, don't block task framework + executorCh := make(chan interface{}, 1) return &TaskRun{ - Ctx: context.WithValue(ctx, spec.MakeTaskExecutorCtxKey(task), make(chan interface{})), + Ctx: context.WithValue(ctx, spec.MakeTaskExecutorCtxKey(task), executorCh), Task: task, Executor: executor, Throttler: throttler, @@ -87,8 +90,9 @@ func New(ctx context.Context, task *spec.PipelineTask, StopQueueLoop: false, StopWaitLoop: false, - PExitCh: pExitCh, - PExitChCancel: pExitChCancel, + PExitCh: pExitCh, + PExitChCancel: pExitChCancel, + ExecutorDoneCh: executorCh, ActionAgentSvc: actionAgentSvc, ExtMarketSvc: extMarketSvc, @@ -135,9 +139,8 @@ type Elem struct { Cancel context.CancelFunc Timeout time.Duration - ErrCh chan error - DoneCh chan interface{} - ExecutorDoneCh chan interface{} // executorDoneCh allow action executor return directly + ErrCh chan error + DoneCh chan interface{} ExitCh chan struct{} }