diff --git a/core/cron/cron.go b/core/cron/cron.go index d03a7b263d..22e784e975 100644 --- a/core/cron/cron.go +++ b/core/cron/cron.go @@ -36,9 +36,30 @@ func ParseCronSchedule(interval string) (*ScheduleSpec, error) { }, nil } -// Interval accepts the time and returns -func (s *ScheduleSpec) Interval(t time.Time) time.Duration { - start := s.Next(t) - next := s.Next(start) - return next.Sub(start) +func (s *ScheduleSpec) Prev(currTime time.Time) time.Time { + startTime := s.getEarliestTimeToStartCron(currTime) + return s.getPreviousSchedule(currTime, startTime) +} + +func (s *ScheduleSpec) getPreviousSchedule(currTime time.Time, startTime time.Time) time.Time { + previousSchedule := startTime + for { + nextSchedule := s.Next(previousSchedule) + if nextSchedule.After(currTime) || nextSchedule.Equal(currTime) { + return previousSchedule + } + previousSchedule = nextSchedule + } +} + +func (s *ScheduleSpec) getEarliestTimeToStartCron(currTime time.Time) time.Time { + initialDelay := -time.Hour * 24 * 7 //nolint:gomnd + startTime := currTime + for { + startTime = startTime.Add(initialDelay) + if s.Next(startTime).Before(currTime) { + break + } + } + return startTime } diff --git a/core/cron/cron_test.go b/core/cron/cron_test.go new file mode 100644 index 0000000000..9d56a7e498 --- /dev/null +++ b/core/cron/cron_test.go @@ -0,0 +1,71 @@ +package cron_test + +import ( + "testing" + "time" + + "github.com/stretchr/testify/assert" + + "github.com/odpf/optimus/core/cron" +) + +func TestScheduleSpec(t *testing.T) { + t.Run("Prev", func(t *testing.T) { + t.Run("with constant interval", func(t *testing.T) { + scheduleSpec, err := cron.ParseCronSchedule("@midnight") + assert.Nil(t, err) + scheduleStartTime, _ := time.Parse(time.RFC3339, "2022-03-25T02:00:00+00:00") + prevScheduleTime := scheduleSpec.Prev(scheduleStartTime) + expectedTime, _ := time.Parse(time.RFC3339, "2022-03-25T00:00:00+00:00") + assert.Equal(t, prevScheduleTime, expectedTime) + }) + t.Run("with varying interval", func(t *testing.T) { + // at 2 AM every month on 2,11,19,26 + scheduleSpec, err := cron.ParseCronSchedule("0 2 2,11,19,26 * *") + assert.Nil(t, err) + + scheduleStartTime, _ := time.Parse(time.RFC3339, "2022-03-19T01:59:59+00:00") + prevScheduleTime := scheduleSpec.Prev(scheduleStartTime) + expectedTime, _ := time.Parse(time.RFC3339, "2022-03-11T02:00:00+00:00") + assert.Equal(t, prevScheduleTime, expectedTime) + }) + t.Run("with time falling on schedule time", func(t *testing.T) { + scheduleSpec, err := cron.ParseCronSchedule("@monthly") + assert.Nil(t, err) + + scheduleStartTime, _ := time.Parse(time.RFC3339, "2022-03-01T00:00:00+00:00") + prevScheduleTime := scheduleSpec.Prev(scheduleStartTime) + expectedTime, _ := time.Parse(time.RFC3339, "2022-02-01T00:00:00+00:00") + assert.Equal(t, prevScheduleTime, expectedTime) + }) + }) + t.Run("Next", func(t *testing.T) { + t.Run("with constant interval", func(t *testing.T) { + scheduleSpec, err := cron.ParseCronSchedule("@midnight") + assert.Nil(t, err) + scheduleStartTime, _ := time.Parse(time.RFC3339, "2022-03-25T02:00:00+00:00") + prevScheduleTime := scheduleSpec.Next(scheduleStartTime) + expectedTime, _ := time.Parse(time.RFC3339, "2022-03-26T00:00:00+00:00") + assert.Equal(t, prevScheduleTime, expectedTime) + }) + t.Run("with varying interval", func(t *testing.T) { + // at 2 AM every month on 2,11,19,26 + scheduleSpec, err := cron.ParseCronSchedule("0 2 2,11,19,26 * *") + assert.Nil(t, err) + + scheduleStartTime, _ := time.Parse(time.RFC3339, "2022-03-19T02:01:59+00:00") + prevScheduleTime := scheduleSpec.Next(scheduleStartTime) + expectedTime, _ := time.Parse(time.RFC3339, "2022-03-26T02:00:00+00:00") + assert.Equal(t, prevScheduleTime, expectedTime) + }) + t.Run("with current time falling on schedule time", func(t *testing.T) { + scheduleSpec, err := cron.ParseCronSchedule("@monthly") + assert.Nil(t, err) + + scheduleStartTime, _ := time.Parse(time.RFC3339, "2022-03-01T00:00:00+00:00") + prevScheduleTime := scheduleSpec.Next(scheduleStartTime) + expectedTime, _ := time.Parse(time.RFC3339, "2022-04-01T00:00:00+00:00") + assert.Equal(t, prevScheduleTime, expectedTime) + }) + }) +} diff --git a/ext/scheduler/airflow2/airflow.go b/ext/scheduler/airflow2/airflow.go index 629cb841f2..98ef2edcd5 100644 --- a/ext/scheduler/airflow2/airflow.go +++ b/ext/scheduler/airflow2/airflow.go @@ -353,13 +353,7 @@ func (s *scheduler) GetJobRuns(ctx context.Context, projectSpec models.ProjectSp var jobRuns []models.JobRun var dagRunList DagRunListResponse - var dagRunRequest DagRunRequest - if jobQuery.OnlyLastRun { - dagRunRequest = getDagRunRequest(jobQuery) - } else { - jobQueryWithExecDate := covertToExecDate(jobQuery, jobCron) - dagRunRequest = getDagRunRequest(jobQueryWithExecDate) - } + dagRunRequest := s.GetDagRunRequest(jobQuery, jobCron) reqBody, err := json.Marshal(dagRunRequest) if err != nil { return jobRuns, err @@ -379,22 +373,38 @@ func (s *scheduler) GetJobRuns(ctx context.Context, projectSpec models.ProjectSp return getJobRuns(dagRunList, jobCron) } -func covertToExecDate(jobQuery *models.JobQuery, jobCron *cron.ScheduleSpec) *models.JobQuery { - givenStartDate := jobQuery.StartDate - givenEndDate := jobQuery.EndDate +func (s *scheduler) GetDagRunRequest(jobQuery *models.JobQuery, jobCron *cron.ScheduleSpec) DagRunRequest { + if jobQuery.OnlyLastRun { + return DagRunRequest{ + OrderBy: "-execution_date", + PageOffset: 0, + PageLimit: 1, + DagIds: []string{jobQuery.Name}, + } + } + startDate := s.getExecutionStartDate(jobQuery.StartDate, jobCron) + endDate := s.getExecutionEndDate(jobQuery.EndDate, jobCron) + return DagRunRequest{ + OrderBy: "execution_date", + PageOffset: 0, + PageLimit: pageLimit, + DagIds: []string{jobQuery.Name}, + ExecutionDateGte: startDate.Format(airflowDateFormat), + ExecutionDateLte: endDate.Format(airflowDateFormat), + } +} - duration := jobCron.Interval(givenStartDate) - jobQuery.StartDate = givenStartDate.Add(-duration) - jobQuery.EndDate = givenEndDate.Add(-duration) +func (*scheduler) getExecutionStartDate(scheduleStartTime time.Time, jobCron *cron.ScheduleSpec) time.Time { + return jobCron.Prev(scheduleStartTime) +} - modifiedJobQuery := &models.JobQuery{ - Name: jobQuery.Name, - StartDate: jobQuery.StartDate, - EndDate: jobQuery.EndDate, - Filter: jobQuery.Filter, - OnlyLastRun: false, +func (*scheduler) getExecutionEndDate(scheduleEndTime time.Time, jobCron *cron.ScheduleSpec) time.Time { + // when the current time matches one of the schedule times execution time means previous schedule. + if jobCron.Next(scheduleEndTime.Add(-time.Second * 1)).Equal(scheduleEndTime) { + return jobCron.Prev(scheduleEndTime) } - return modifiedJobQuery + // else it is previous to previous schedule. + return jobCron.Prev(jobCron.Prev(scheduleEndTime)) } func (*scheduler) notifyProgress(po progress.Observer, event progress.Event) { diff --git a/ext/scheduler/airflow2/airflow_test.go b/ext/scheduler/airflow2/airflow_test.go index 4f22aadfc6..e6f78ccc74 100644 --- a/ext/scheduler/airflow2/airflow_test.go +++ b/ext/scheduler/airflow2/airflow_test.go @@ -925,3 +925,78 @@ func TestAirflow2(t *testing.T) { }) }) } + +func TestAirflow_GetDagRunsRequest(t *testing.T) { + t.Run("only last runs", func(t *testing.T) { + inputQuery := models.JobQuery{OnlyLastRun: true, Name: "dag1"} + scheduler := airflow2.NewScheduler(nil, nil, nil) + + scheduleSpec, _ := cron.ParseCronSchedule("@midnight") + dagRunRequest := scheduler.GetDagRunRequest(&inputQuery, scheduleSpec) + expectedDagRunRequest := airflow2.DagRunRequest{OrderBy: "-execution_date", + PageOffset: 0, + PageLimit: 1, + DagIds: []string{"dag1"}} + assert.Equal(t, dagRunRequest, expectedDagRunRequest) + }) + t.Run("when input times doesn't fall exactly on schedule times", func(t *testing.T) { + scheduleStartTime, _ := time.Parse(time.RFC3339, "2022-03-25T00:00:00+00:00") + scheduleEndTime, _ := time.Parse(time.RFC3339, "2022-03-29T00:00:00+00:00") + expectedExecutionStartDate, _ := time.Parse(time.RFC3339, "2022-03-24T00:00:00+00:00") + expectedExecutionEndDate, _ := time.Parse(time.RFC3339, "2022-03-28T00:00:00+00:00") + inputQuery := models.JobQuery{Name: "dag1", StartDate: scheduleStartTime, EndDate: scheduleEndTime} + scheduler := airflow2.NewScheduler(nil, nil, nil) + + scheduleSpec, _ := cron.ParseCronSchedule("@midnight") + dagRunRequest := scheduler.GetDagRunRequest(&inputQuery, scheduleSpec) + expectedDagRunRequest := airflow2.DagRunRequest{ + OrderBy: "execution_date", + PageOffset: 0, + PageLimit: 99999, + DagIds: []string{"dag1"}, + ExecutionDateLte: expectedExecutionEndDate.Format("2006-01-02T15:04:05+00:00"), + ExecutionDateGte: expectedExecutionStartDate.Format("2006-01-02T15:04:05+00:00"), + } + assert.Equal(t, dagRunRequest, expectedDagRunRequest) + }) + t.Run("when input times fall exactly on schedule times", func(t *testing.T) { + scheduleStartTime, _ := time.Parse(time.RFC3339, "2022-03-25T02:00:00+00:00") + scheduleEndTime, _ := time.Parse(time.RFC3339, "2022-03-29T02:00:00+00:00") + expectedExecutionStartDate, _ := time.Parse(time.RFC3339, "2022-03-25T00:00:00+00:00") + expectedExecutionEndDate, _ := time.Parse(time.RFC3339, "2022-03-28T00:00:00+00:00") + inputQuery := models.JobQuery{Name: "dag1", StartDate: scheduleStartTime, EndDate: scheduleEndTime} + scheduler := airflow2.NewScheduler(nil, nil, nil) + + scheduleSpec, _ := cron.ParseCronSchedule("@midnight") + dagRunRequest := scheduler.GetDagRunRequest(&inputQuery, scheduleSpec) + expectedDagRunRequest := airflow2.DagRunRequest{ + OrderBy: "execution_date", + PageOffset: 0, + PageLimit: 99999, + DagIds: []string{"dag1"}, + ExecutionDateLte: expectedExecutionEndDate.Format("2006-01-02T15:04:05+00:00"), + ExecutionDateGte: expectedExecutionStartDate.Format("2006-01-02T15:04:05+00:00"), + } + assert.Equal(t, dagRunRequest, expectedDagRunRequest) + }) + t.Run("with varying schedule intervals", func(t *testing.T) { + scheduleStartTime, _ := time.Parse(time.RFC3339, "2022-03-17T00:00:00+00:00") + scheduleEndTime, _ := time.Parse(time.RFC3339, "2022-03-27T00:00:00+00:00") + expectedExecutionStartDate, _ := time.Parse(time.RFC3339, "2022-03-11T02:00:00+00:00") + expectedExecutionEndDate, _ := time.Parse(time.RFC3339, "2022-03-25T02:00:00+00:00") + inputQuery := models.JobQuery{Name: "dag1", StartDate: scheduleStartTime, EndDate: scheduleEndTime} + scheduler := airflow2.NewScheduler(nil, nil, nil) + + scheduleSpec, _ := cron.ParseCronSchedule("0 2 2,11,17,19,25,26,27 * *") + dagRunRequest := scheduler.GetDagRunRequest(&inputQuery, scheduleSpec) + expectedDagRunRequest := airflow2.DagRunRequest{ + OrderBy: "execution_date", + PageOffset: 0, + PageLimit: 99999, + DagIds: []string{"dag1"}, + ExecutionDateLte: expectedExecutionEndDate.Format("2006-01-02T15:04:05+00:00"), + ExecutionDateGte: expectedExecutionStartDate.Format("2006-01-02T15:04:05+00:00"), + } + assert.Equal(t, dagRunRequest, expectedDagRunRequest) + }) +} diff --git a/ext/scheduler/airflow2/client.go b/ext/scheduler/airflow2/client.go index 359151361c..becc1e6707 100644 --- a/ext/scheduler/airflow2/client.go +++ b/ext/scheduler/airflow2/client.go @@ -138,25 +138,6 @@ func toJobStatus(list DagRunListResponse) ([]models.JobStatus, error) { return jobStatus, nil } -func getDagRunRequest(param *models.JobQuery) DagRunRequest { - if param.OnlyLastRun { - return DagRunRequest{ - OrderBy: "-execution_date", - PageOffset: 0, - PageLimit: 1, - DagIds: []string{param.Name}, - } - } - return DagRunRequest{ - OrderBy: "execution_date", - PageOffset: 0, - PageLimit: pageLimit, - DagIds: []string{param.Name}, - ExecutionDateGte: param.StartDate.Format(airflowDateFormat), - ExecutionDateLte: param.EndDate.Format(airflowDateFormat), - } -} - func getJobRuns(res DagRunListResponse, spec *cron.ScheduleSpec) ([]models.JobRun, error) { var jobRunList []models.JobRun if res.TotalEntries > pageLimit {