Fix/pipeline apitest cost time (#2650)
* modify task wait op to listen on the executor channel

* apitest action: don't overwrite the task pointer's value

* api-test executor define: add sync.Map to store running tasks (see the sketch below)
chengjoey authored Nov 2, 2021
1 parent 2c564ab commit a7e0350
Showing 7 changed files with 219 additions and 85 deletions.
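The snippet below is a minimal, self-contained sketch of the two ideas this commit combines in the api-test executor: `sync.Map.LoadOrStore` keyed by `pipelineID-taskID` so the same task is never started twice, and a per-task done channel that carries the final status back to the framework once the test finishes (or panics). The `taskRunner`, `start`, and string statuses here are simplified stand-ins, not the executor's real types.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// taskRunner mirrors the pattern added in this commit: a sync.Map keyed by
// "pipelineID-taskID" guards against starting the same api-test twice, and a
// per-task done channel lets the worker push the final status back to the
// framework instead of the framework polling for it.
type taskRunner struct {
	runningAPIs sync.Map
}

func (r *taskRunner) key(pipelineID, taskID uint64) string {
	return fmt.Sprintf("%d-%d", pipelineID, taskID)
}

// start launches the api-test in a goroutine; doneCh receives the final status.
func (r *taskRunner) start(pipelineID, taskID uint64, doneCh chan<- string) {
	go func() {
		// LoadOrStore makes the "already processing" check and the insert atomic.
		if _, already := r.runningAPIs.LoadOrStore(r.key(pipelineID, taskID), struct{}{}); already {
			fmt.Printf("task %d already processing, skip\n", taskID)
			return
		}
		status := "Failed"
		defer func() {
			if rec := recover(); rec != nil {
				fmt.Printf("recovered from panic: %v\n", rec)
			}
			// always report a terminal status and release the running mark
			doneCh <- status
			r.runningAPIs.Delete(r.key(pipelineID, taskID))
		}()

		// placeholder for the real api-test logic (logic.Do in the commit)
		time.Sleep(50 * time.Millisecond)
		status = "Success"
	}()
}

func main() {
	r := &taskRunner{}
	done := make(chan string, 1)
	r.start(1, 42, done)
	r.start(1, 42, done) // a second start for the same key is skipped while the first is still running
	fmt.Println("final status:", <-done)
}
```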
@@ -17,6 +17,9 @@ package apitest
import (
"context"
"fmt"
"sync"

"github.com/sirupsen/logrus"

"github.com/erda-project/erda/apistructs"
"github.com/erda-project/erda/modules/pipeline/dbclient"
@@ -28,9 +31,10 @@ import (
var Kind = types.Kind(spec.PipelineTaskExecutorKindAPITest)

type define struct {
name types.Name
options map[string]string
dbClient *dbclient.Client
name types.Name
options map[string]string
dbClient *dbclient.Client
runningAPIs sync.Map
}

func (d *define) Kind() types.Kind { return Kind }
@@ -44,7 +48,11 @@ func (d *define) Exist(ctx context.Context, task *spec.PipelineTask) (created bo
case status == apistructs.PipelineStatusCreated:
return true, false, nil
case status == apistructs.PipelineStatusQueue, status == apistructs.PipelineStatusRunning:
return true, true, nil
// if the apitest task is not being processed, report started as false
if _, alreadyProcessing := d.runningAPIs.Load(d.makeRunningApiKey(task)); alreadyProcessing {
return true, true, nil
}
return true, false, nil
case status.IsEndStatus():
return true, true, nil
default:
@@ -57,7 +65,46 @@ func (d *define) Create(ctx context.Context, task *spec.PipelineTask) (interface
}

func (d *define) Start(ctx context.Context, task *spec.PipelineTask) (interface{}, error) {
logic.Do(ctx, task)

go func(ctx context.Context, task *spec.PipelineTask) {
if _, alreadyProcessing := d.runningAPIs.LoadOrStore(d.makeRunningApiKey(task), task); alreadyProcessing {
logrus.Warnf("apitest: task: %d already processing", task.ID)
return
}
executorDoneCh, ok := ctx.Value(spec.MakeTaskExecutorCtxKey(task)).(chan interface{})
if !ok {
logrus.Warnf("apitest: failed to get executor channel, pipelineID: %d, taskID: %d", task.PipelineID, task.ID)
}

var status = apistructs.PipelineStatusFailed
defer func() {
if r := recover(); r != nil {
logrus.Errorf("api-test logic do panic recover:%s", r)
}
// if the executor channel is nil, the task framework can still poll the meta to get the status
if executorDoneCh != nil {
executorDoneCh <- apistructs.PipelineStatusDesc{Status: status}
}
d.runningAPIs.Delete(d.makeRunningApiKey(task))
}()

logic.Do(ctx, task)

latestTask, err := d.dbClient.GetPipelineTask(task.ID)
if err != nil {
logrus.Errorf("failed to query latest task, err: %v \n", err)
return
}

meta := latestTask.Result.Metadata
for _, metaField := range meta {
if metaField.Name == logic.MetaKeyResult {
if metaField.Value == logic.ResultSuccess {
status = apistructs.PipelineStatusSuccess
}
}
}
}(ctx, task)
return nil, nil
}

@@ -70,13 +117,13 @@ func (d *define) Status(ctx context.Context, task *spec.PipelineTask) (apistruct
if err != nil {
return apistructs.PipelineStatusDesc{}, fmt.Errorf("failed to query latest task, err: %v", err)
}
*task = latestTask
//*task = latestTask

if task.Status.IsEndStatus() {
return apistructs.PipelineStatusDesc{Status: task.Status}, nil
}

created, _, err := d.Exist(ctx, task)
created, started, err := d.Exist(ctx, task)
if err != nil {
return apistructs.PipelineStatusDesc{}, err
}
@@ -85,19 +132,27 @@
return apistructs.PipelineStatusDesc{Status: apistructs.PipelineStatusAnalyzed}, nil
}

if !started && len(latestTask.Result.Metadata) == 0 {
return apistructs.PipelineStatusDesc{Status: apistructs.PipelineStatusBorn}, nil
}

// determine status according to whether the api test succeeded
meta := latestTask.Result.Metadata
var status = apistructs.PipelineStatusFailed
for _, metaField := range meta {
if metaField.Name == logic.MetaKeyResult {
if metaField.Value == logic.ResultSuccess {
return apistructs.PipelineStatusDesc{Status: apistructs.PipelineStatusSuccess}, nil
status = apistructs.PipelineStatusSuccess
}
if metaField.Value == logic.ResultFailed {
status = apistructs.PipelineStatusFailed
}
return apistructs.PipelineStatusDesc{Status: apistructs.PipelineStatusFailed}, nil
return apistructs.PipelineStatusDesc{Status: status}, nil
}
}

// return created status to do start step
return apistructs.PipelineStatusDesc{Status: apistructs.PipelineStatusCreated}, nil
return apistructs.PipelineStatusDesc{Status: apistructs.PipelineStatusRunning}, nil
}

func (d *define) Inspect(ctx context.Context, task *spec.PipelineTask) (apistructs.TaskInspect, error) {
@@ -116,16 +171,21 @@ func (d *define) BatchDelete(ctx context.Context, actions []*spec.PipelineTask)
return nil, nil
}

func (d *define) makeRunningApiKey(task *spec.PipelineTask) string {
return fmt.Sprintf("%d-%d", task.PipelineID, task.ID)
}

func init() {
types.MustRegister(Kind, func(name types.Name, options map[string]string) (types.ActionExecutor, error) {
dbClient, err := dbclient.New()
if err != nil {
return nil, fmt.Errorf("failed to init dbclient, err: %v", err)
}
return &define{
name: name,
options: options,
dbClient: dbClient,
name: name,
options: options,
dbClient: dbClient,
runningAPIs: sync.Map{},
}, nil
})
}
@@ -18,6 +18,7 @@ import (
"context"
"encoding/json"
"strconv"
"time"

"github.com/erda-project/erda/apistructs"
"github.com/erda-project/erda/modules/actionagent"
@@ -26,6 +27,7 @@ import (
"github.com/erda-project/erda/pkg/apitestsv2"
"github.com/erda-project/erda/pkg/apitestsv2/cookiejar"
"github.com/erda-project/erda/pkg/encoding/jsonparse"
"github.com/erda-project/erda/pkg/loop"
)

const (
@@ -112,11 +114,15 @@ func writeMetaFile(ctx context.Context, task *spec.PipelineTask, meta *Meta) {
cb.PipelineTaskID = task.ID

cbData, _ := json.Marshal(&cb)
err := pipelinefunc.CallbackActionFunc(cbData)
if err != nil {
log.Errorf("failed to callback, err: %v", err)
return
}

// apitest should ensure the callback to pipeline is delivered after the request is done
_ = loop.New(loop.WithDeclineRatio(2), loop.WithDeclineLimit(10*time.Second)).
Do(func() (bool, error) {
err := pipelinefunc.CallbackActionFunc(cbData)
if err != nil {
log.Errorf("failed to callback, err: %v", err)
return false, err
}
return true, nil
})
return
}
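The `loop.New(loop.WithDeclineRatio(2), loop.WithDeclineLimit(10*time.Second)).Do(...)` call above keeps retrying the callback until it is delivered, backing off between attempts. A plain-Go approximation of that behaviour, with a hypothetical flaky callback, might look like this (the `retryWithDecline` helper is illustrative, not the erda `loop` package):

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// retryWithDecline approximates loop.New(loop.WithDeclineRatio(2),
// loop.WithDeclineLimit(10*time.Second)).Do(fn): retry fn until it reports
// done, doubling the sleep each round but never sleeping longer than limit.
func retryWithDecline(ratio float64, limit time.Duration, fn func() (done bool, err error)) error {
	sleep := time.Second
	for {
		done, err := fn()
		if done {
			return err
		}
		time.Sleep(sleep)
		sleep = time.Duration(float64(sleep) * ratio)
		if sleep > limit {
			sleep = limit
		}
	}
}

func main() {
	attempts := 0
	_ = retryWithDecline(2, 10*time.Second, func() (bool, error) {
		attempts++
		if attempts < 3 {
			// stand-in for pipelinefunc.CallbackActionFunc failing transiently
			return false, errors.New("callback not reachable yet")
		}
		fmt.Println("callback delivered after", attempts, "attempts")
		return true, nil
	})
}
```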
1 change: 0 additions & 1 deletion modules/pipeline/pipengine/reconciler/reconcile.go
@@ -104,7 +104,6 @@ func (r *Reconciler) reconcile(ctx context.Context, pipelineID uint64) error {
if err != nil {
return
}

tr := taskrun.New(ctx, task,
ctx.Value(ctxKeyPipelineExitCh).(chan struct{}), ctx.Value(ctxKeyPipelineExitChCancelFunc).(context.CancelFunc),
r.TaskThrottler, executor, &p, r.bdl, r.dbClient, r.js,
12 changes: 1 addition & 11 deletions modules/pipeline/pipengine/reconciler/taskrun/framework.go
@@ -27,16 +27,14 @@ import (
"github.com/erda-project/erda/modules/pipeline/conf"
"github.com/erda-project/erda/modules/pipeline/pipengine/reconciler/rlog"
"github.com/erda-project/erda/modules/pipeline/pkg/errorsx"
"github.com/erda-project/erda/modules/pipeline/spec"
"github.com/erda-project/erda/pkg/loop"
"github.com/erda-project/erda/pkg/strutil"
)

func (tr *TaskRun) Do(itr TaskOp) error {
logrus.Infof("reconciler: pipelineID: %d, task %q begin %s", tr.P.ID, tr.Task.Name, itr.Op())

executorDoneCh := tr.Ctx.Value(spec.MakeTaskExecutorCtxKey(tr.Task)).(chan interface{})
o := &Elem{ErrCh: make(chan error), DoneCh: make(chan interface{}), ExitCh: make(chan struct{}), ExecutorDoneCh: executorDoneCh}
o := &Elem{ErrCh: make(chan error), DoneCh: make(chan interface{}), ExitCh: make(chan struct{})}
o.TimeoutCh, o.Cancel, o.Timeout = itr.TimeoutConfig()

// define op handle func
@@ -161,14 +159,6 @@ func (tr *TaskRun) waitOp(itr TaskOp, o *Elem) (result error) {
// aop
_ = aop.Handle(aop.NewContextForTask(*tr.Task, *tr.P, itr.TuneTriggers().AfterProcessing))

case data := <-o.ExecutorDoneCh:
tr.LogStep(itr.Op(), fmt.Sprintf("framework accept signal from executor %s, begin do WhenDone", tr.Executor.Name()))
defer tr.LogStep(itr.Op(), fmt.Sprintf("framework accept signal from executor %s, end do WhenDone", tr.Executor.Name()))
if err := itr.WhenDone(data); err != nil {
errs = append(errs, err.Error())
}
_ = aop.Handle(aop.NewContextForTask(*tr.Task, *tr.P, itr.TuneTriggers().AfterProcessing))

case err := <-o.ErrCh:
logrus.Errorf("reconciler: pipelineID: %d, task %q %s received error (%v)", tr.P.ID, tr.Task.Name, itr.Op(), err)
if errorsx.IsNetworkError(err) {
90 changes: 45 additions & 45 deletions modules/pipeline/pipengine/reconciler/taskrun/taskop/wait.go
@@ -17,6 +17,7 @@ package taskop
import (
"context"
"errors"
"math"
"time"

"github.com/sirupsen/logrus"
@@ -25,13 +26,16 @@ import (
"github.com/erda-project/erda/modules/pipeline/aop/aoptypes"
"github.com/erda-project/erda/modules/pipeline/commonutil/costtimeutil"
"github.com/erda-project/erda/modules/pipeline/conf"
"github.com/erda-project/erda/modules/pipeline/pipengine/reconciler/rlog"
"github.com/erda-project/erda/modules/pipeline/pipengine/reconciler/taskrun"
"github.com/erda-project/erda/pkg/loop"
)

var err4EnableDeclineRatio = errors.New("enable decline ratio")

var (
declineRatio float64 = 1.5
declineLimit time.Duration = 10 * time.Second
)

type wait taskrun.TaskRun

func NewWait(tr *taskrun.TaskRun) *wait {
@@ -47,55 +51,42 @@ func (w *wait) TaskRun() *taskrun.TaskRun {
}

func (w *wait) Processing() (interface{}, error) {
stopWaitCh := make(chan struct{})
defer func() {
stopWaitCh <- struct{}{}
}()
go func() {
var (
data interface{}
loopedTimes uint64
)

timer := time.NewTimer(w.calculateNextLoopTimeDuration(loopedTimes))
defer timer.Stop()
for {
select {
case doneData := <-w.ExecutorDoneCh:
logrus.Infof("%s: accept signal from executor %s, data: %v", w.Op(), w.Executor.Name(), doneData)
return doneData, nil
case <-w.Ctx.Done():
w.StopWaitLoop = true
return
return data, nil
case <-w.PExitCh:
logrus.Warnf("reconciler: pipeline exit, stop wait, pipelineID: %d, taskID: %d", w.P.ID, w.Task.ID)
return
case <-stopWaitCh:
rlog.TDebugf(w.P.ID, w.Task.ID, "stop wait")
close(stopWaitCh)
return
}
}()

var (
data interface{}
)

err := loop.New(loop.WithDeclineRatio(1.5), loop.WithDeclineLimit(time.Second*10)).Do(func() (abort bool, err error) {
if w.QuitWaitTimeout {
return true, nil
}

statusDesc, err := w.Executor.Status(w.Ctx, w.Task)
if err != nil {
logrus.Errorf("[alert] reconciler: pipelineID: %d, task %q wait get status failed, err: %v",
w.P.ID, w.Task.Name, err)
return true, err
}

if statusDesc.Status == apistructs.PipelineStatusUnknown {
logrus.Errorf("[alert] reconciler: pipelineID: %d, task %q wait get status %q, retry", w.P.ID, w.Task.Name, apistructs.PipelineStatusUnknown)
return false, err4EnableDeclineRatio
}
return data, nil
case <-timer.C:
statusDesc, err := w.Executor.Status(w.Ctx, w.Task)
if err != nil {
logrus.Errorf("[alert] reconciler: pipelineID: %d, task %q wait get status failed, err: %v",
w.P.ID, w.Task.Name, err)
return nil, err
}
if statusDesc.Status.IsEndStatus() {
data = statusDesc
return data, nil
}
if statusDesc.Status == apistructs.PipelineStatusUnknown {
logrus.Errorf("[alert] reconciler: pipelineID: %d, task %q wait get status %q, retry", w.P.ID, w.Task.Name, apistructs.PipelineStatusUnknown)
}

if statusDesc.Status.IsEndStatus() {
data = statusDesc
return true, nil
loopedTimes++
timer.Reset(w.calculateNextLoopTimeDuration(loopedTimes))
}

return w.StopWaitLoop, err4EnableDeclineRatio
})

return data, err
}
}

func (w *wait) WhenDone(data interface{}) error {
@@ -186,3 +177,12 @@ func (w *wait) TuneTriggers() taskrun.TaskOpTuneTriggers {
AfterProcessing: aoptypes.TuneTriggerTaskAfterWait,
}
}

func (w *wait) calculateNextLoopTimeDuration(loopedTimes uint64) time.Duration {
lastSleepTime := time.Second
lastSleepTime = time.Duration(float64(lastSleepTime) * math.Pow(declineRatio, float64(loopedTimes)))
if lastSleepTime > declineLimit {
return declineLimit
}
return lastSleepTime
}
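For reference, a tiny standalone snippet using the same formula and constants as `calculateNextLoopTimeDuration` above shows the poll cadence this produces: the wait between status checks starts at 1s, grows by a factor of 1.5 per loop, and is capped at 10s.

```go
package main

import (
	"fmt"
	"math"
	"time"
)

// Reproduces calculateNextLoopTimeDuration with the values from this commit
// (declineRatio = 1.5, declineLimit = 10s) to show how the poll interval grows.
func nextLoopDuration(loopedTimes uint64) time.Duration {
	const declineRatio = 1.5
	const declineLimit = 10 * time.Second
	d := time.Duration(float64(time.Second) * math.Pow(declineRatio, float64(loopedTimes)))
	if d > declineLimit {
		return declineLimit
	}
	return d
}

func main() {
	for i := uint64(0); i <= 7; i++ {
		fmt.Printf("loop %d -> wait %v\n", i, nextLoopDuration(i))
	}
	// loop 0 -> 1s, loop 1 -> 1.5s, loop 2 -> 2.25s, ...,
	// loop 6 -> ~11.39s, capped at the 10s decline limit
}
```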