Skip to content

Commit

Permalink
cron compensator judge cron start time before execute
Browse files Browse the repository at this point in the history
  • Loading branch information
chengjoey committed Nov 22, 2021
1 parent e9ffe30 commit bb43b5e
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 5 deletions.
4 changes: 2 additions & 2 deletions modules/pipeline/services/pipelinesvc/cron.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,8 @@ func (s *PipelineSvc) RunCronPipelineFunc(id uint64) {
return
}

// 如果当前触发时间小于定时开始时间,return
if pc.Extra.CronStartFrom != nil && cronTriggerTime.Before(*pc.Extra.CronStartFrom) {
// if trigger time less than cronStartFrom, return directly
if s.isCronShouldBeIgnored(pc) {
logrus.Warnf("crond: pipelineCronID: %d, triggered but ignored, triggerTime: %s, cronStartFrom: %s",
pc.ID, cronTriggerTime, *pc.Extra.CronStartFrom)
return
Expand Down
21 changes: 18 additions & 3 deletions modules/pipeline/services/pipelinesvc/cron_compensate.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,15 +154,21 @@ func (s *PipelineSvc) traverseDoCompensate(doCompensate func(cron spec.PipelineC
}

group := limit_sync_group.NewSemaphore(int(conf.CronCompensateConcurrentNumber()))
for i := range enabledCrons {
for _, pc := range enabledCrons {
if s.isCronShouldBeIgnored(pc) {
triggerTime := time.Now()
logrus.Warnf("crond compensator: pipelineCronID: %d, triggered compensate but ignored, triggerTime: %s, cronStartFrom: %s",
pc.ID, triggerTime, *pc.Extra.CronStartFrom)
continue
}
if sync {
doCompensate(enabledCrons[i])
doCompensate(pc)
} else {
group.Add(1)
go func(pc spec.PipelineCron) {
defer group.Done()
doCompensate(pc)
}(enabledCrons[i])
}(pc)
}
}
group.Wait()
Expand Down Expand Up @@ -464,3 +470,12 @@ func (s *PipelineSvc) createCronCompensatePipeline(pc spec.PipelineCron, trigger
},
})
}

// isCronShouldIgnore if trigger time before cron start from time, should ignore cron at this trigger time
func (s *PipelineSvc) isCronShouldBeIgnored(pc spec.PipelineCron) bool {
if pc.Extra.CronStartFrom == nil {
return false
}
triggerTime := time.Now()
return triggerTime.Before(*pc.Extra.CronStartFrom)
}
11 changes: 11 additions & 0 deletions modules/pipeline/services/pipelinesvc/cron_compensate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,3 +221,14 @@ func TestGetTriggeredTime(t *testing.T) {
triggerTime := getTriggeredTime(p)
assert.Equal(t, now.Unix(), triggerTime.Unix())
}

func Test_isCronShouldBeIgnored(t *testing.T) {
emptyStartTimeCron := spec.PipelineCron{}
startTime := time.Date(2099, 1, 1, 1, 1, 1, 1, time.Local)
startTimeCron := spec.PipelineCron{Extra: spec.PipelineCronExtra{
CronStartFrom: &startTime,
}}
svc := PipelineSvc{}
assert.Equal(t, false, svc.isCronShouldBeIgnored(emptyStartTimeCron))
assert.Equal(t, true, svc.isCronShouldBeIgnored(startTimeCron))
}

0 comments on commit bb43b5e

Please sign in to comment.