Skip to content

Commit

Permalink
fix: getjobruns on varying schedules
Browse files Browse the repository at this point in the history
  • Loading branch information
sravankorumilli committed Jun 7, 2022
1 parent bb61b05 commit 873165d
Show file tree
Hide file tree
Showing 5 changed files with 202 additions and 40 deletions.
29 changes: 28 additions & 1 deletion core/cron/cron.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,36 @@ func ParseCronSchedule(interval string) (*ScheduleSpec, error) {
}, nil
}

// Interval returns the duration between two consecutive schedule occurrences:
// the occurrence that Next reports for t, and the occurrence that Next reports
// for that first one. For schedules with uneven gaps the result therefore
// depends on t.
func (s *ScheduleSpec) Interval(t time.Time) time.Duration {
	first := s.Next(t)
	return s.Next(first).Sub(first)
}

// Prev returns the most recent schedule occurrence that lies strictly before
// currTime. When currTime is itself a schedule occurrence, the occurrence
// preceding it is returned.
//
// It first picks a point far enough in the past to have at least one
// occurrence between it and currTime, then walks forward from there.
func (s *ScheduleSpec) Prev(currTime time.Time) time.Time {
	return s.getPreviousSchedule(currTime, s.getEarliestTimeToStartCron(currTime))
}

// getPreviousSchedule walks forward from startTime, one occurrence at a time,
// and returns the last point visited whose following occurrence is at or after
// currTime. If no occurrence after startTime precedes currTime, startTime is
// returned unchanged.
func (s *ScheduleSpec) getPreviousSchedule(currTime time.Time, startTime time.Time) time.Time {
	candidate := startTime
	// Keep advancing while the upcoming occurrence is still before currTime.
	for upcoming := s.Next(candidate); upcoming.Before(currTime); upcoming = s.Next(candidate) {
		candidate = upcoming
	}
	return candidate
}

// getEarliestTimeToStartCron steps backwards from currTime one week at a time
// and returns the first instant whose next schedule occurrence falls strictly
// before currTime. At least one step back is always taken, so the result is
// never currTime itself.
//
// NOTE(review): this relies on s.Next eventually reporting a time before
// currTime; confirm how Next behaves for a schedule that never fires.
func (s *ScheduleSpec) getEarliestTimeToStartCron(currTime time.Time) time.Time {
	initialDelay := -time.Hour * 24 * 7 //nolint:gomnd
	startTime := currTime.Add(initialDelay)
	// Keep moving back until an occurrence exists between startTime and currTime.
	for !s.Next(startTime).Before(currTime) {
		startTime = startTime.Add(initialDelay)
	}
	return startTime
}
69 changes: 69 additions & 0 deletions core/cron/cron_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
package cron_test

import (
"github.com/odpf/optimus/core/cron"
"github.com/stretchr/testify/assert"
"testing"
"time"
)

func TestScheduleSpec(t *testing.T) {
t.Run("Prev", func(t *testing.T) {
t.Run("with constant interval", func(t *testing.T) {
scheduleSpec, err := cron.ParseCronSchedule("@midnight")
assert.Nil(t, err)
scheduleStartTime, _ := time.Parse(time.RFC3339, "2022-03-25T02:00:00+00:00")
prevScheduleTime := scheduleSpec.Prev(scheduleStartTime)
expectedTime, _ := time.Parse(time.RFC3339, "2022-03-25T00:00:00+00:00")
assert.Equal(t, prevScheduleTime, expectedTime)
})
t.Run("with varying interval", func(t *testing.T) {
// at 2 AM every month on 2,11,19,26
scheduleSpec, err := cron.ParseCronSchedule("0 2 2,11,19,26 * *")
assert.Nil(t, err)

scheduleStartTime, _ := time.Parse(time.RFC3339, "2022-03-19T01:59:59+00:00")
prevScheduleTime := scheduleSpec.Prev(scheduleStartTime)
expectedTime, _ := time.Parse(time.RFC3339, "2022-03-11T02:00:00+00:00")
assert.Equal(t, prevScheduleTime, expectedTime)
})
t.Run("with time falling on schedule time", func(t *testing.T) {
scheduleSpec, err := cron.ParseCronSchedule("@monthly")
assert.Nil(t, err)

scheduleStartTime, _ := time.Parse(time.RFC3339, "2022-03-01T00:00:00+00:00")
prevScheduleTime := scheduleSpec.Prev(scheduleStartTime)
expectedTime, _ := time.Parse(time.RFC3339, "2022-02-01T00:00:00+00:00")
assert.Equal(t, prevScheduleTime, expectedTime)
})
})
t.Run("Next", func(t *testing.T) {
t.Run("with constant interval", func(t *testing.T) {
scheduleSpec, err := cron.ParseCronSchedule("@midnight")
assert.Nil(t, err)
scheduleStartTime, _ := time.Parse(time.RFC3339, "2022-03-25T02:00:00+00:00")
prevScheduleTime := scheduleSpec.Next(scheduleStartTime)
expectedTime, _ := time.Parse(time.RFC3339, "2022-03-26T00:00:00+00:00")
assert.Equal(t, prevScheduleTime, expectedTime)
})
t.Run("with varying interval", func(t *testing.T) {
// at 2 AM every month on 2,11,19,26
scheduleSpec, err := cron.ParseCronSchedule("0 2 2,11,19,26 * *")
assert.Nil(t, err)

scheduleStartTime, _ := time.Parse(time.RFC3339, "2022-03-19T02:01:59+00:00")
prevScheduleTime := scheduleSpec.Next(scheduleStartTime)
expectedTime, _ := time.Parse(time.RFC3339, "2022-03-26T02:00:00+00:00")
assert.Equal(t, prevScheduleTime, expectedTime)
})
t.Run("with current time falling on schedule time", func(t *testing.T) {
scheduleSpec, err := cron.ParseCronSchedule("@monthly")
assert.Nil(t, err)

scheduleStartTime, _ := time.Parse(time.RFC3339, "2022-03-01T00:00:00+00:00")
prevScheduleTime := scheduleSpec.Next(scheduleStartTime)
expectedTime, _ := time.Parse(time.RFC3339, "2022-04-01T00:00:00+00:00")
assert.Equal(t, prevScheduleTime, expectedTime)
})
})
}
50 changes: 30 additions & 20 deletions ext/scheduler/airflow2/airflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -353,13 +353,7 @@ func (s *scheduler) GetJobRuns(ctx context.Context, projectSpec models.ProjectSp

var jobRuns []models.JobRun
var dagRunList DagRunListResponse
var dagRunRequest DagRunRequest
if jobQuery.OnlyLastRun {
dagRunRequest = getDagRunRequest(jobQuery)
} else {
jobQueryWithExecDate := covertToExecDate(jobQuery, jobCron)
dagRunRequest = getDagRunRequest(jobQueryWithExecDate)
}
dagRunRequest := s.GetDagRunRequest(jobQuery, jobCron)
reqBody, err := json.Marshal(dagRunRequest)
if err != nil {
return jobRuns, err
Expand All @@ -379,22 +373,38 @@ func (s *scheduler) GetJobRuns(ctx context.Context, projectSpec models.ProjectSp
return getJobRuns(dagRunList, jobCron)
}

func covertToExecDate(jobQuery *models.JobQuery, jobCron *cron.ScheduleSpec) *models.JobQuery {
givenStartDate := jobQuery.StartDate
givenEndDate := jobQuery.EndDate
// GetDagRunRequest builds the DAG-run listing request for jobQuery.
//
// When jobQuery.OnlyLastRun is set, the request asks for a single run ordered
// by descending execution date and carries no date bounds. Otherwise it asks
// for up to pageLimit runs in ascending execution-date order, bounded by the
// execution dates derived from jobQuery.StartDate and jobQuery.EndDate using
// jobCron.
func (s *scheduler) GetDagRunRequest(jobQuery *models.JobQuery, jobCron *cron.ScheduleSpec) DagRunRequest {
	request := DagRunRequest{
		PageOffset: 0,
		DagIds:     []string{jobQuery.Name},
	}

	if jobQuery.OnlyLastRun {
		request.OrderBy = "-execution_date"
		request.PageLimit = 1
		return request
	}

	// Translate the scheduled-time window into execution dates.
	executionStart := s.getExecutionStartDate(jobQuery.StartDate, jobCron)
	executionEnd := s.getExecutionEndDate(jobQuery.EndDate, jobCron)

	request.OrderBy = "execution_date"
	request.PageLimit = pageLimit
	request.ExecutionDateGte = executionStart.Format(airflowDateFormat)
	request.ExecutionDateLte = executionEnd.Format(airflowDateFormat)
	return request
}

duration := jobCron.Interval(givenStartDate)
jobQuery.StartDate = givenStartDate.Add(-duration)
jobQuery.EndDate = givenEndDate.Add(-duration)
// getExecutionStartDate returns the lower execution-date bound for a window
// that starts at scheduleStartTime: the schedule occurrence that jobCron.Prev
// reports for that time.
func (*scheduler) getExecutionStartDate(scheduleStartTime time.Time, jobCron *cron.ScheduleSpec) time.Time {
	executionStart := jobCron.Prev(scheduleStartTime)
	return executionStart
}

modifiedJobQuery := &models.JobQuery{
Name: jobQuery.Name,
StartDate: jobQuery.StartDate,
EndDate: jobQuery.EndDate,
Filter: jobQuery.Filter,
OnlyLastRun: false,
// getExecutionEndDate returns the upper execution-date bound for a window that
// ends at scheduleEndTime.
//
// If scheduleEndTime is itself a schedule occurrence, the bound is the
// occurrence immediately before it. Otherwise the bound is two occurrences
// back from scheduleEndTime.
func (*scheduler) getExecutionEndDate(scheduleEndTime time.Time, jobCron *cron.ScheduleSpec) time.Time {
	// scheduleEndTime is on the schedule when the occurrence following the
	// preceding second is exactly scheduleEndTime.
	justBefore := scheduleEndTime.Add(-time.Second)
	if jobCron.Next(justBefore).Equal(scheduleEndTime) {
		// The execution time of a run is its previous schedule occurrence.
		return jobCron.Prev(scheduleEndTime)
	}
	// Otherwise it is the occurrence before the previous one.
	return jobCron.Prev(jobCron.Prev(scheduleEndTime))
}

func (*scheduler) notifyProgress(po progress.Observer, event progress.Event) {
Expand Down
75 changes: 75 additions & 0 deletions ext/scheduler/airflow2/airflow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -925,3 +925,78 @@ func TestAirflow2(t *testing.T) {
})
})
}

func TestAirflow_GetDagRunsRequest(t *testing.T) {
t.Run("only last runs", func(t *testing.T) {
inputQuery := models.JobQuery{OnlyLastRun: true, Name: "dag1"}
scheduler := airflow2.NewScheduler(nil, nil, nil)

scheduleSpec, _ := cron.ParseCronSchedule("@midnight")
dagRunRequest := scheduler.GetDagRunRequest(&inputQuery, scheduleSpec)
expectedDagRunRequest := airflow2.DagRunRequest{OrderBy: "-execution_date",
PageOffset: 0,
PageLimit: 1,
DagIds: []string{"dag1"}}
assert.Equal(t, dagRunRequest, expectedDagRunRequest)
})
t.Run("when input times doesn't fall exactly on schedule times", func(t *testing.T) {
scheduleStartTime, _ := time.Parse(time.RFC3339, "2022-03-25T00:00:00+00:00")
scheduleEndTime, _ := time.Parse(time.RFC3339, "2022-03-29T00:00:00+00:00")
expectedExecutionStartDate, _ := time.Parse(time.RFC3339, "2022-03-24T00:00:00+00:00")
expectedExecutionEndDate, _ := time.Parse(time.RFC3339, "2022-03-28T00:00:00+00:00")
inputQuery := models.JobQuery{Name: "dag1", StartDate: scheduleStartTime, EndDate: scheduleEndTime}
scheduler := airflow2.NewScheduler(nil, nil, nil)

scheduleSpec, _ := cron.ParseCronSchedule("@midnight")
dagRunRequest := scheduler.GetDagRunRequest(&inputQuery, scheduleSpec)
expectedDagRunRequest := airflow2.DagRunRequest{
OrderBy: "execution_date",
PageOffset: 0,
PageLimit: 99999,
DagIds: []string{"dag1"},
ExecutionDateLte: expectedExecutionEndDate.Format("2006-01-02T15:04:05+00:00"),
ExecutionDateGte: expectedExecutionStartDate.Format("2006-01-02T15:04:05+00:00"),
}
assert.Equal(t, dagRunRequest, expectedDagRunRequest)
})
t.Run("when input times fall exactly on schedule times", func(t *testing.T) {
scheduleStartTime, _ := time.Parse(time.RFC3339, "2022-03-25T02:00:00+00:00")
scheduleEndTime, _ := time.Parse(time.RFC3339, "2022-03-29T02:00:00+00:00")
expectedExecutionStartDate, _ := time.Parse(time.RFC3339, "2022-03-25T00:00:00+00:00")
expectedExecutionEndDate, _ := time.Parse(time.RFC3339, "2022-03-28T00:00:00+00:00")
inputQuery := models.JobQuery{Name: "dag1", StartDate: scheduleStartTime, EndDate: scheduleEndTime}
scheduler := airflow2.NewScheduler(nil, nil, nil)

scheduleSpec, _ := cron.ParseCronSchedule("@midnight")
dagRunRequest := scheduler.GetDagRunRequest(&inputQuery, scheduleSpec)
expectedDagRunRequest := airflow2.DagRunRequest{
OrderBy: "execution_date",
PageOffset: 0,
PageLimit: 99999,
DagIds: []string{"dag1"},
ExecutionDateLte: expectedExecutionEndDate.Format("2006-01-02T15:04:05+00:00"),
ExecutionDateGte: expectedExecutionStartDate.Format("2006-01-02T15:04:05+00:00"),
}
assert.Equal(t, dagRunRequest, expectedDagRunRequest)
})
t.Run("with varying schedule intervals", func(t *testing.T) {
scheduleStartTime, _ := time.Parse(time.RFC3339, "2022-03-17T00:00:00+00:00")
scheduleEndTime, _ := time.Parse(time.RFC3339, "2022-03-27T00:00:00+00:00")
expectedExecutionStartDate, _ := time.Parse(time.RFC3339, "2022-03-11T02:00:00+00:00")
expectedExecutionEndDate, _ := time.Parse(time.RFC3339, "2022-03-25T02:00:00+00:00")
inputQuery := models.JobQuery{Name: "dag1", StartDate: scheduleStartTime, EndDate: scheduleEndTime}
scheduler := airflow2.NewScheduler(nil, nil, nil)

scheduleSpec, _ := cron.ParseCronSchedule("0 2 2,11,17,19,25,26,27 * *")
dagRunRequest := scheduler.GetDagRunRequest(&inputQuery, scheduleSpec)
expectedDagRunRequest := airflow2.DagRunRequest{
OrderBy: "execution_date",
PageOffset: 0,
PageLimit: 99999,
DagIds: []string{"dag1"},
ExecutionDateLte: expectedExecutionEndDate.Format("2006-01-02T15:04:05+00:00"),
ExecutionDateGte: expectedExecutionStartDate.Format("2006-01-02T15:04:05+00:00"),
}
assert.Equal(t, dagRunRequest, expectedDagRunRequest)
})
}
19 changes: 0 additions & 19 deletions ext/scheduler/airflow2/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,25 +138,6 @@ func toJobStatus(list DagRunListResponse) ([]models.JobStatus, error) {
return jobStatus, nil
}

// getDagRunRequest builds the DAG-run listing request for param.
//
// With param.OnlyLastRun set it requests a single run ordered by descending
// execution date. Otherwise it requests up to pageLimit runs in ascending
// execution-date order, bounded by param.StartDate and param.EndDate formatted
// with airflowDateFormat.
func getDagRunRequest(param *models.JobQuery) DagRunRequest {
	dagIDs := []string{param.Name}

	if !param.OnlyLastRun {
		lowerBound := param.StartDate.Format(airflowDateFormat)
		upperBound := param.EndDate.Format(airflowDateFormat)
		return DagRunRequest{
			OrderBy:          "execution_date",
			PageOffset:       0,
			PageLimit:        pageLimit,
			DagIds:           dagIDs,
			ExecutionDateGte: lowerBound,
			ExecutionDateLte: upperBound,
		}
	}

	return DagRunRequest{
		OrderBy:    "-execution_date",
		PageOffset: 0,
		PageLimit:  1,
		DagIds:     dagIDs,
	}
}

func getJobRuns(res DagRunListResponse, spec *cron.ScheduleSpec) ([]models.JobRun, error) {
var jobRunList []models.JobRun
if res.TotalEntries > pageLimit {
Expand Down

0 comments on commit 873165d

Please sign in to comment.