From bb43b5e91c02bfd1d51ba765b6d762a36f8add57 Mon Sep 17 00:00:00 2001 From: joey Date: Fri, 19 Nov 2021 16:25:59 +0800 Subject: [PATCH] cron compensator judge cron start time before execute --- modules/pipeline/services/pipelinesvc/cron.go | 4 ++-- .../services/pipelinesvc/cron_compensate.go | 21 ++++++++++++++++--- .../pipelinesvc/cron_compensate_test.go | 11 ++++++++++ 3 files changed, 31 insertions(+), 5 deletions(-) diff --git a/modules/pipeline/services/pipelinesvc/cron.go b/modules/pipeline/services/pipelinesvc/cron.go index 2c14562b5f1..b4bc597533b 100644 --- a/modules/pipeline/services/pipelinesvc/cron.go +++ b/modules/pipeline/services/pipelinesvc/cron.go @@ -56,8 +56,8 @@ func (s *PipelineSvc) RunCronPipelineFunc(id uint64) { return } - // 如果当前触发时间小于定时开始时间,return - if pc.Extra.CronStartFrom != nil && cronTriggerTime.Before(*pc.Extra.CronStartFrom) { + // if trigger time less than cronStartFrom, return directly + if s.isCronShouldBeIgnored(pc) { logrus.Warnf("crond: pipelineCronID: %d, triggered but ignored, triggerTime: %s, cronStartFrom: %s", pc.ID, cronTriggerTime, *pc.Extra.CronStartFrom) return diff --git a/modules/pipeline/services/pipelinesvc/cron_compensate.go b/modules/pipeline/services/pipelinesvc/cron_compensate.go index 9369e49b8cb..33755668714 100644 --- a/modules/pipeline/services/pipelinesvc/cron_compensate.go +++ b/modules/pipeline/services/pipelinesvc/cron_compensate.go @@ -154,15 +154,21 @@ func (s *PipelineSvc) traverseDoCompensate(doCompensate func(cron spec.PipelineC } group := limit_sync_group.NewSemaphore(int(conf.CronCompensateConcurrentNumber())) - for i := range enabledCrons { + for _, pc := range enabledCrons { + if s.isCronShouldBeIgnored(pc) { + triggerTime := time.Now() + logrus.Warnf("crond compensator: pipelineCronID: %d, triggered compensate but ignored, triggerTime: %s, cronStartFrom: %s", + pc.ID, triggerTime, *pc.Extra.CronStartFrom) + continue + } if sync { - doCompensate(enabledCrons[i]) + doCompensate(pc) } else { group.Add(1) go func(pc spec.PipelineCron) { defer group.Done() doCompensate(pc) - }(enabledCrons[i]) + }(pc) } } group.Wait() @@ -464,3 +470,12 @@ func (s *PipelineSvc) createCronCompensatePipeline(pc spec.PipelineCron, trigger }, }) } + +// isCronShouldIgnore if trigger time before cron start from time, should ignore cron at this trigger time +func (s *PipelineSvc) isCronShouldBeIgnored(pc spec.PipelineCron) bool { + if pc.Extra.CronStartFrom == nil { + return false + } + triggerTime := time.Now() + return triggerTime.Before(*pc.Extra.CronStartFrom) +} diff --git a/modules/pipeline/services/pipelinesvc/cron_compensate_test.go b/modules/pipeline/services/pipelinesvc/cron_compensate_test.go index f785dd0547d..eee935aafc6 100644 --- a/modules/pipeline/services/pipelinesvc/cron_compensate_test.go +++ b/modules/pipeline/services/pipelinesvc/cron_compensate_test.go @@ -221,3 +221,14 @@ func TestGetTriggeredTime(t *testing.T) { triggerTime := getTriggeredTime(p) assert.Equal(t, now.Unix(), triggerTime.Unix()) } + +func Test_isCronShouldBeIgnored(t *testing.T) { + emptyStartTimeCron := spec.PipelineCron{} + startTime := time.Date(2099, 1, 1, 1, 1, 1, 1, time.Local) + startTimeCron := spec.PipelineCron{Extra: spec.PipelineCronExtra{ + CronStartFrom: &startTime, + }} + svc := PipelineSvc{} + assert.Equal(t, false, svc.isCronShouldBeIgnored(emptyStartTimeCron)) + assert.Equal(t, true, svc.isCronShouldBeIgnored(startTimeCron)) +}