fix: reset job status ids during internal migration (#2684)
atzoum committed Nov 11, 2022
1 parent 7bba131 commit f2f589c
Showing 2 changed files with 31 additions and 26 deletions.
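What the patch appears to fix (an inference from the diff below, hedged accordingly): migrateJobsInTx used to copy job status rows into the destination dataset with insert into %[4]q (select * from last_status ...), carrying the source rows' id values along while the destination job status table's id sequence stayed untouched, so a later sequence-generated id could collide with a migrated row. The rewritten query names the destination columns explicitly and leaves id out, letting the destination's own sequence assign fresh ids. A minimal Go sketch of that failure mode against a hypothetical temp table (not rudder-server code, and not part of this commit):

package sketch // illustrative only

import (
	"context"
	"database/sql"
)

// demonstrateIDCollision shows why copying rows verbatim, ids included,
// breaks later inserts: the BIGSERIAL sequence backing the id column is
// never advanced, so the next generated id duplicates a migrated one.
func demonstrateIDCollision(ctx context.Context, db *sql.DB) error {
	stmts := []string{
		`CREATE TEMP TABLE dest (id BIGSERIAL PRIMARY KEY, payload TEXT)`,
		// old migration behaviour: the row arrives together with its id
		`INSERT INTO dest (id, payload) VALUES (1, 'migrated status')`,
		// a regular insert now draws id 1 from the untouched sequence
		// and fails with a unique-constraint violation
		`INSERT INTO dest (payload) VALUES ('new status')`,
	}
	for _, stmt := range stmts {
		if _, err := db.ExecContext(ctx, stmt); err != nil {
			return err // expected on the last statement
		}
	}
	return nil
}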
43 changes: 22 additions & 21 deletions jobsdb/integration_test.go
@@ -713,6 +713,7 @@ func TestJobsDB(t *testing.T) {
 // #numFailedJobs+#numSucceededJobs - #numTotalJobs jobs are unprocessed - should be migrated
 )
 require.NoError(t, jobDB.Store(context.Background(), jobs))
+
 require.NoError(
 t,
 jobDB.UpdateJobStatus(
@@ -723,6 +724,11 @@ func TestJobsDB(t *testing.T) {
 ),
 )
 
+unprocessedBeforeMigration, err := jobDB.GetUnprocessed(context.Background(), GetQueryParamsT{JobsLimit: 100})
+require.NoError(t, err)
+failedBeforeMigration, err := jobDB.GetToRetry(context.Background(), GetQueryParamsT{JobsLimit: 100})
+require.NoError(t, err)
+
 require.EqualValues(t, 1, jobDB.GetMaxDSIndex())
 time.Sleep(time.Second * 2) // wait for some time to pass
 triggerAddNewDS <- time.Now() // trigger addNewDSLoop to run
@@ -748,28 +754,31 @@ func TestJobsDB(t *testing.T) {
 require.NoError(
 t,
 jobDB.dbHandle.QueryRow(
-fmt.Sprintf(`SELECT COUNT(*) FROM %s`, tablePrefix+`_jobs_`+dsIndicesList[0]),
+fmt.Sprintf(`SELECT COUNT(*) FROM %s`, tablePrefix+`_jobs_1_1`),
 ).Scan(&numJobs),
 )
 require.Equal(t, numFailedJobs+numUnprocessedJobs, int(numJobs))
 
+// verify job statuses
+var numJobstatuses, maxJobStatusID, nextSeqVal int64
+require.NoError(
+t,
+jobDB.dbHandle.QueryRow(
+fmt.Sprintf(`SELECT COUNT(*), MAX(id), nextval('%[1]s_id_seq') FROM %[1]s`, tablePrefix+`_job_status_1_1`),
+).Scan(&numJobstatuses, &maxJobStatusID, &nextSeqVal),
+)
+require.Equal(t, numFailedJobs, int(numJobstatuses))
+require.Greater(t, nextSeqVal, maxJobStatusID)
+
 // verify that unprocessed jobs are migrated to new DS
 unprocessedResult, err := jobDB.GetUnprocessed(context.Background(), GetQueryParamsT{
 CustomValFilters: []string{customVal},
 JobsLimit: 100,
 ParameterFilters: []ParameterFilterT{},
 })
 require.NoError(t, err, "GetUnprocessed failed")
-require.Equal(t, numUnprocessedJobs, len(unprocessedResult.Jobs))
-expectedUnprocessedJobIDs := make([]int64, 0)
-for _, job := range jobs[numFailedJobs+numSucceededJobs:] {
-expectedUnprocessedJobIDs = append(expectedUnprocessedJobIDs, job.JobID)
-}
-actualUnprocessedJobIDs := make([]int64, 0)
-for _, job := range unprocessedResult.Jobs {
-actualUnprocessedJobIDs = append(actualUnprocessedJobIDs, job.JobID)
-}
-require.Equal(t, expectedUnprocessedJobIDs, actualUnprocessedJobIDs)
+require.Len(t, unprocessedResult.Jobs, numUnprocessedJobs)
+require.EqualValues(t, unprocessedBeforeMigration.Jobs, unprocessedResult.Jobs)
 
 // verifying that failed jobs are migrated to new DS
 failedResult, err := jobDB.GetToRetry(context.Background(), GetQueryParamsT{
@@ -778,16 +787,8 @@ func TestJobsDB(t *testing.T) {
 ParameterFilters: []ParameterFilterT{},
 })
 require.NoError(t, err, "GetToRetry failed")
-expectedFailedJobIDs := make([]int64, 0)
-for _, job := range jobs[:numFailedJobs] {
-expectedFailedJobIDs = append(expectedFailedJobIDs, job.JobID)
-}
-actualFailedJobIDs := make([]int64, 0)
-for _, job := range failedResult.Jobs {
-actualFailedJobIDs = append(actualFailedJobIDs, job.JobID)
-}
-require.Equal(t, numFailedJobs, len(failedResult.Jobs))
-require.Equal(t, expectedFailedJobIDs, actualFailedJobIDs)
+require.Len(t, failedResult.Jobs, numFailedJobs)
+require.EqualValues(t, failedBeforeMigration.Jobs, failedResult.Jobs)
 })
 }
 
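The new assertions above pin down the intended invariant: after the internal migration the destination status table holds exactly numFailedJobs rows, and nextval of its id sequence is already greater than MAX(id), i.e. the migrated statuses received sequence-generated ids rather than copied ones; the unprocessed and failed job sets are also snapshotted before the migration and required to come back unchanged (EqualValues) afterwards. A generalized form of the sequence check, as a hedged sketch — the table name is a parameter here, whereas the test hard-codes tablePrefix+`_job_status_1_1` and its `_id_seq`:

package sketch // illustrative helper, not part of the repository

import (
	"database/sql"
	"fmt"
)

// sequenceIsAhead reports whether table's id sequence has moved past the
// largest id currently stored, i.e. whether future inserts cannot collide
// with existing rows. Calling nextval consumes one sequence value, which is
// fine in a test but not something to do in production code.
func sequenceIsAhead(db *sql.DB, table string) (bool, error) {
	var maxID sql.NullInt64
	var next int64
	query := fmt.Sprintf(`SELECT MAX(id), nextval('%[1]s_id_seq') FROM %[1]s`, table)
	if err := db.QueryRow(query).Scan(&maxID, &next); err != nil {
		return false, err
	}
	// an empty table yields NULL for MAX(id); any sequence value is fine then
	return !maxID.Valid || next > maxID.Int64, nil
}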
14 changes: 9 additions & 5 deletions jobsdb/jobsdb.go
@@ -1775,12 +1775,14 @@ func (jd *HandleT) migrateJobsInTx(ctx context.Context, tx *Tx, srcDS, destDS da
 ),
 inserted_jobs as
 (
-insert into %[3]q (select j.* from %[2]q j left join last_status js on js.job_id = j.job_id
+insert into %[3]q (job_id, workspace_id, uuid, user_id, custom_val, parameters, event_payload, event_count, created_at, expire_at)
+(select j.job_id, j.workspace_id, j.uuid, j.user_id, j.custom_val, j.parameters, j.event_payload, j.event_count, j.created_at, j.expire_at from %[2]q j left join last_status js on js.job_id = j.job_id
 where js.job_id is null or js.job_state = ANY('{%[5]s}') order by j.job_id) returning job_id
 ),
 insertedStatuses as
 (
-insert into %[4]q (select * from last_status where job_state = ANY('{%[5]s}'))
+insert into %[4]q (job_id, job_state, attempt, exec_time, retry_time, error_code, error_response, parameters)
+(select job_id, job_state, attempt, exec_time, retry_time, error_code, error_response, parameters from last_status where job_state = ANY('{%[5]s}'))
 )
 select count(*) from inserted_jobs;`,
 srcDS.JobStatusTable,
@@ -1791,11 +1793,13 @@ func (jd *HandleT) migrateJobsInTx(ctx context.Context, tx *Tx, srcDS, destDS da
 )
 
 var numJobsMigrated int64
-err := tx.QueryRowContext(
+if err := tx.QueryRowContext(
 ctx,
 compactDSQuery,
-).Scan(&numJobsMigrated)
-if err != nil {
+).Scan(&numJobsMigrated); err != nil {
 return 0, err
 }
+if _, err := tx.Exec(fmt.Sprintf(`ANALYZE %q, %q`, destDS.JobTable, destDS.JobStatusTable)); err != nil {
+return 0, err
+}
 return int(numJobsMigrated), nil
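Two things change in migrateJobsInTx: the copy statement now names the destination columns for both inserts — crucially omitting the job status table's id column so its sequence assigns new values — and, once the migration has been counted, ANALYZE runs on the destination jobs and job status tables inside the same transaction so the planner sees fresh statistics for the newly populated dataset (PostgreSQL permits ANALYZE, unlike VACUUM, inside a transaction block). The shape of that pattern, reduced to a toy source/destination pair (table and column names are illustrative, not the jobsdb schema):

package sketch // illustrative only

import (
	"context"
	"database/sql"
	"fmt"
)

// copyWithoutIDs moves rows from src to dst while deliberately leaving the
// id column out of the insert, so dst's own sequence numbers the copies,
// then refreshes planner statistics on the freshly filled table.
func copyWithoutIDs(ctx context.Context, tx *sql.Tx, src, dst string) (int64, error) {
	res, err := tx.ExecContext(ctx, fmt.Sprintf(
		`INSERT INTO %[2]q (payload) SELECT payload FROM %[1]q ORDER BY id`,
		src, dst,
	))
	if err != nil {
		return 0, err
	}
	copied, err := res.RowsAffected()
	if err != nil {
		return 0, err
	}
	// ANALYZE is transaction-safe in PostgreSQL and keeps the planner from
	// working with stale statistics after a bulk insert.
	if _, err := tx.ExecContext(ctx, fmt.Sprintf(`ANALYZE %q`, dst)); err != nil {
		return 0, err
	}
	return copied, nil
}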
