Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: use right execution_dates to fetch airflow job runs for varying schedule intervals #396

Merged
merged 3 commits into from
Jun 8, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 26 additions & 5 deletions core/cron/cron.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,30 @@ func ParseCronSchedule(interval string) (*ScheduleSpec, error) {
}, nil
}

// Interval accepts the time and returns
func (s *ScheduleSpec) Interval(t time.Time) time.Duration {
start := s.Next(t)
next := s.Next(start)
return next.Sub(start)
func (s *ScheduleSpec) Prev(currTime time.Time) time.Time {
startTime := s.getEarliestTimeToStartCron(currTime)
return s.getPreviousSchedule(currTime, startTime)
}

func (s *ScheduleSpec) getPreviousSchedule(currTime time.Time, startTime time.Time) time.Time {
previousSchedule := startTime
for {
nextSchedule := s.Next(previousSchedule)
if nextSchedule.After(currTime) || nextSchedule.Equal(currTime) {
return previousSchedule
}
previousSchedule = nextSchedule
}
}

func (s *ScheduleSpec) getEarliestTimeToStartCron(currTime time.Time) time.Time {
initialDelay := -time.Hour * 24 * 7 //nolint:gomnd
startTime := currTime
for {
startTime = startTime.Add(initialDelay)
if s.Next(startTime).Before(currTime) {
break
}
}
return startTime
}
71 changes: 71 additions & 0 deletions core/cron/cron_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
package cron_test

import (
"testing"
"time"

"github.com/stretchr/testify/assert"

"github.com/odpf/optimus/core/cron"
)

func TestScheduleSpec(t *testing.T) {
t.Run("Prev", func(t *testing.T) {
t.Run("with constant interval", func(t *testing.T) {
scheduleSpec, err := cron.ParseCronSchedule("@midnight")
assert.Nil(t, err)
scheduleStartTime, _ := time.Parse(time.RFC3339, "2022-03-25T02:00:00+00:00")
prevScheduleTime := scheduleSpec.Prev(scheduleStartTime)
expectedTime, _ := time.Parse(time.RFC3339, "2022-03-25T00:00:00+00:00")
assert.Equal(t, prevScheduleTime, expectedTime)
})
t.Run("with varying interval", func(t *testing.T) {
// at 2 AM every month on 2,11,19,26
scheduleSpec, err := cron.ParseCronSchedule("0 2 2,11,19,26 * *")
assert.Nil(t, err)

scheduleStartTime, _ := time.Parse(time.RFC3339, "2022-03-19T01:59:59+00:00")
prevScheduleTime := scheduleSpec.Prev(scheduleStartTime)
expectedTime, _ := time.Parse(time.RFC3339, "2022-03-11T02:00:00+00:00")
assert.Equal(t, prevScheduleTime, expectedTime)
})
t.Run("with time falling on schedule time", func(t *testing.T) {
scheduleSpec, err := cron.ParseCronSchedule("@monthly")
assert.Nil(t, err)

scheduleStartTime, _ := time.Parse(time.RFC3339, "2022-03-01T00:00:00+00:00")
prevScheduleTime := scheduleSpec.Prev(scheduleStartTime)
expectedTime, _ := time.Parse(time.RFC3339, "2022-02-01T00:00:00+00:00")
assert.Equal(t, prevScheduleTime, expectedTime)
})
})
t.Run("Next", func(t *testing.T) {
t.Run("with constant interval", func(t *testing.T) {
scheduleSpec, err := cron.ParseCronSchedule("@midnight")
assert.Nil(t, err)
scheduleStartTime, _ := time.Parse(time.RFC3339, "2022-03-25T02:00:00+00:00")
prevScheduleTime := scheduleSpec.Next(scheduleStartTime)
expectedTime, _ := time.Parse(time.RFC3339, "2022-03-26T00:00:00+00:00")
assert.Equal(t, prevScheduleTime, expectedTime)
})
t.Run("with varying interval", func(t *testing.T) {
// at 2 AM every month on 2,11,19,26
scheduleSpec, err := cron.ParseCronSchedule("0 2 2,11,19,26 * *")
assert.Nil(t, err)

scheduleStartTime, _ := time.Parse(time.RFC3339, "2022-03-19T02:01:59+00:00")
prevScheduleTime := scheduleSpec.Next(scheduleStartTime)
expectedTime, _ := time.Parse(time.RFC3339, "2022-03-26T02:00:00+00:00")
assert.Equal(t, prevScheduleTime, expectedTime)
})
t.Run("with current time falling on schedule time", func(t *testing.T) {
scheduleSpec, err := cron.ParseCronSchedule("@monthly")
assert.Nil(t, err)

scheduleStartTime, _ := time.Parse(time.RFC3339, "2022-03-01T00:00:00+00:00")
prevScheduleTime := scheduleSpec.Next(scheduleStartTime)
expectedTime, _ := time.Parse(time.RFC3339, "2022-04-01T00:00:00+00:00")
assert.Equal(t, prevScheduleTime, expectedTime)
})
})
}
50 changes: 30 additions & 20 deletions ext/scheduler/airflow2/airflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -353,13 +353,7 @@ func (s *scheduler) GetJobRuns(ctx context.Context, projectSpec models.ProjectSp

var jobRuns []models.JobRun
var dagRunList DagRunListResponse
var dagRunRequest DagRunRequest
if jobQuery.OnlyLastRun {
dagRunRequest = getDagRunRequest(jobQuery)
} else {
jobQueryWithExecDate := covertToExecDate(jobQuery, jobCron)
dagRunRequest = getDagRunRequest(jobQueryWithExecDate)
}
dagRunRequest := s.GetDagRunRequest(jobQuery, jobCron)
reqBody, err := json.Marshal(dagRunRequest)
if err != nil {
return jobRuns, err
Expand All @@ -379,22 +373,38 @@ func (s *scheduler) GetJobRuns(ctx context.Context, projectSpec models.ProjectSp
return getJobRuns(dagRunList, jobCron)
}

func covertToExecDate(jobQuery *models.JobQuery, jobCron *cron.ScheduleSpec) *models.JobQuery {
givenStartDate := jobQuery.StartDate
givenEndDate := jobQuery.EndDate
func (s *scheduler) GetDagRunRequest(jobQuery *models.JobQuery, jobCron *cron.ScheduleSpec) DagRunRequest {
if jobQuery.OnlyLastRun {
return DagRunRequest{
OrderBy: "-execution_date",
PageOffset: 0,
PageLimit: 1,
DagIds: []string{jobQuery.Name},
}
}
startDate := s.getExecutionStartDate(jobQuery.StartDate, jobCron)
endDate := s.getExecutionEndDate(jobQuery.EndDate, jobCron)
return DagRunRequest{
OrderBy: "execution_date",
PageOffset: 0,
PageLimit: pageLimit,
DagIds: []string{jobQuery.Name},
ExecutionDateGte: startDate.Format(airflowDateFormat),
ExecutionDateLte: endDate.Format(airflowDateFormat),
}
}

duration := jobCron.Interval(givenStartDate)
jobQuery.StartDate = givenStartDate.Add(-duration)
jobQuery.EndDate = givenEndDate.Add(-duration)
func (*scheduler) getExecutionStartDate(scheduleStartTime time.Time, jobCron *cron.ScheduleSpec) time.Time {
return jobCron.Prev(scheduleStartTime)
}

modifiedJobQuery := &models.JobQuery{
Name: jobQuery.Name,
StartDate: jobQuery.StartDate,
EndDate: jobQuery.EndDate,
Filter: jobQuery.Filter,
OnlyLastRun: false,
func (*scheduler) getExecutionEndDate(scheduleEndTime time.Time, jobCron *cron.ScheduleSpec) time.Time {
// when the current time matches one of the schedule times execution time means previous schedule.
if jobCron.Next(scheduleEndTime.Add(-time.Second * 1)).Equal(scheduleEndTime) {
return jobCron.Prev(scheduleEndTime)
}
return modifiedJobQuery
// else it is previous to previous schedule.
return jobCron.Prev(jobCron.Prev(scheduleEndTime))
}

func (*scheduler) notifyProgress(po progress.Observer, event progress.Event) {
Expand Down
75 changes: 75 additions & 0 deletions ext/scheduler/airflow2/airflow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -925,3 +925,78 @@ func TestAirflow2(t *testing.T) {
})
})
}

func TestAirflow_GetDagRunsRequest(t *testing.T) {
t.Run("only last runs", func(t *testing.T) {
inputQuery := models.JobQuery{OnlyLastRun: true, Name: "dag1"}
scheduler := airflow2.NewScheduler(nil, nil, nil)

scheduleSpec, _ := cron.ParseCronSchedule("@midnight")
dagRunRequest := scheduler.GetDagRunRequest(&inputQuery, scheduleSpec)
expectedDagRunRequest := airflow2.DagRunRequest{OrderBy: "-execution_date",
PageOffset: 0,
PageLimit: 1,
DagIds: []string{"dag1"}}
assert.Equal(t, dagRunRequest, expectedDagRunRequest)
})
t.Run("when input times doesn't fall exactly on schedule times", func(t *testing.T) {
scheduleStartTime, _ := time.Parse(time.RFC3339, "2022-03-25T00:00:00+00:00")
scheduleEndTime, _ := time.Parse(time.RFC3339, "2022-03-29T00:00:00+00:00")
expectedExecutionStartDate, _ := time.Parse(time.RFC3339, "2022-03-24T00:00:00+00:00")
expectedExecutionEndDate, _ := time.Parse(time.RFC3339, "2022-03-28T00:00:00+00:00")
inputQuery := models.JobQuery{Name: "dag1", StartDate: scheduleStartTime, EndDate: scheduleEndTime}
scheduler := airflow2.NewScheduler(nil, nil, nil)

scheduleSpec, _ := cron.ParseCronSchedule("@midnight")
dagRunRequest := scheduler.GetDagRunRequest(&inputQuery, scheduleSpec)
expectedDagRunRequest := airflow2.DagRunRequest{
OrderBy: "execution_date",
PageOffset: 0,
PageLimit: 99999,
DagIds: []string{"dag1"},
ExecutionDateLte: expectedExecutionEndDate.Format("2006-01-02T15:04:05+00:00"),
ExecutionDateGte: expectedExecutionStartDate.Format("2006-01-02T15:04:05+00:00"),
}
assert.Equal(t, dagRunRequest, expectedDagRunRequest)
})
t.Run("when input times fall exactly on schedule times", func(t *testing.T) {
scheduleStartTime, _ := time.Parse(time.RFC3339, "2022-03-25T02:00:00+00:00")
scheduleEndTime, _ := time.Parse(time.RFC3339, "2022-03-29T02:00:00+00:00")
expectedExecutionStartDate, _ := time.Parse(time.RFC3339, "2022-03-25T00:00:00+00:00")
expectedExecutionEndDate, _ := time.Parse(time.RFC3339, "2022-03-28T00:00:00+00:00")
inputQuery := models.JobQuery{Name: "dag1", StartDate: scheduleStartTime, EndDate: scheduleEndTime}
scheduler := airflow2.NewScheduler(nil, nil, nil)

scheduleSpec, _ := cron.ParseCronSchedule("@midnight")
dagRunRequest := scheduler.GetDagRunRequest(&inputQuery, scheduleSpec)
expectedDagRunRequest := airflow2.DagRunRequest{
OrderBy: "execution_date",
PageOffset: 0,
PageLimit: 99999,
DagIds: []string{"dag1"},
ExecutionDateLte: expectedExecutionEndDate.Format("2006-01-02T15:04:05+00:00"),
ExecutionDateGte: expectedExecutionStartDate.Format("2006-01-02T15:04:05+00:00"),
}
assert.Equal(t, dagRunRequest, expectedDagRunRequest)
})
t.Run("with varying schedule intervals", func(t *testing.T) {
scheduleStartTime, _ := time.Parse(time.RFC3339, "2022-03-17T00:00:00+00:00")
scheduleEndTime, _ := time.Parse(time.RFC3339, "2022-03-27T00:00:00+00:00")
expectedExecutionStartDate, _ := time.Parse(time.RFC3339, "2022-03-11T02:00:00+00:00")
expectedExecutionEndDate, _ := time.Parse(time.RFC3339, "2022-03-25T02:00:00+00:00")
inputQuery := models.JobQuery{Name: "dag1", StartDate: scheduleStartTime, EndDate: scheduleEndTime}
scheduler := airflow2.NewScheduler(nil, nil, nil)

scheduleSpec, _ := cron.ParseCronSchedule("0 2 2,11,17,19,25,26,27 * *")
dagRunRequest := scheduler.GetDagRunRequest(&inputQuery, scheduleSpec)
expectedDagRunRequest := airflow2.DagRunRequest{
OrderBy: "execution_date",
PageOffset: 0,
PageLimit: 99999,
DagIds: []string{"dag1"},
ExecutionDateLte: expectedExecutionEndDate.Format("2006-01-02T15:04:05+00:00"),
ExecutionDateGte: expectedExecutionStartDate.Format("2006-01-02T15:04:05+00:00"),
}
assert.Equal(t, dagRunRequest, expectedDagRunRequest)
})
}
19 changes: 0 additions & 19 deletions ext/scheduler/airflow2/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,25 +138,6 @@ func toJobStatus(list DagRunListResponse) ([]models.JobStatus, error) {
return jobStatus, nil
}

func getDagRunRequest(param *models.JobQuery) DagRunRequest {
if param.OnlyLastRun {
return DagRunRequest{
OrderBy: "-execution_date",
PageOffset: 0,
PageLimit: 1,
DagIds: []string{param.Name},
}
}
return DagRunRequest{
OrderBy: "execution_date",
PageOffset: 0,
PageLimit: pageLimit,
DagIds: []string{param.Name},
ExecutionDateGte: param.StartDate.Format(airflowDateFormat),
ExecutionDateLte: param.EndDate.Format(airflowDateFormat),
}
}

func getJobRuns(res DagRunListResponse, spec *cron.ScheduleSpec) ([]models.JobRun, error) {
var jobRunList []models.JobRun
if res.TotalEntries > pageLimit {
Expand Down