Skip to content

Commit

Permalink
Query API methods to query jobs by external job URI and retrieve job …
Browse files Browse the repository at this point in the history
…errors
  • Loading branch information
masipauskas authored Oct 17, 2024
1 parent e78270e commit 6271a58
Show file tree
Hide file tree
Showing 18 changed files with 1,624 additions and 133 deletions.
7 changes: 5 additions & 2 deletions internal/lookoutingesterv2/instructions/instructions.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,9 @@ func (c *InstructionConverter) handleSubmitJob(
priorityClass = &truncatedPriorityClass
}

annotations := extractUserAnnotations(c.userAnnotationPrefix, event.GetObjectMeta().GetAnnotations())
annotations := event.GetObjectMeta().GetAnnotations()
userAnnotations := extractUserAnnotations(c.userAnnotationPrefix, annotations)
externalJobUri := util.Truncate(annotations["armadaproject.io/externalJobUri"], maxAnnotationValLen)

job := model.CreateJobInstruction{
JobId: event.JobId,
Expand All @@ -184,7 +186,8 @@ func (c *InstructionConverter) handleSubmitJob(
State: lookout.JobQueuedOrdinal,
JobProto: jobProto,
PriorityClass: priorityClass,
Annotations: annotations,
Annotations: userAnnotations,
ExternalJobUri: externalJobUri,
}
update.JobsToCreate = append(update.JobsToCreate, &job)

Expand Down
15 changes: 11 additions & 4 deletions internal/lookoutingesterv2/instructions/instructions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,8 +184,9 @@ func TestConvert(t *testing.T) {
}
submit.GetSubmitJob().GetMainObject().GetPodSpec().GetPodSpec().PriorityClassName = priorityClass
submit.GetSubmitJob().GetObjectMeta().Annotations = map[string]string{
userAnnotationPrefix + "a": "0",
"b": "1",
userAnnotationPrefix + "a": "0",
"b": "1",
"armadaproject.io/externalJobUri": "external-job-uri",
}
job, err := eventutil.ApiJobFromLogSubmitJob(testfixtures.UserId, []string{}, testfixtures.Queue, testfixtures.JobsetName, testfixtures.BaseTime, submit.GetSubmitJob())
assert.NoError(t, err)
Expand All @@ -209,9 +210,11 @@ func TestConvert(t *testing.T) {
JobProto: jobProto,
PriorityClass: pointer.String(priorityClass),
Annotations: map[string]string{
"a": "0",
"b": "1",
"a": "0",
"b": "1",
"armadaproject.io/externalJobUri": "external-job-uri",
},
ExternalJobUri: "external-job-uri",
}

cancelledWithReason, err := testfixtures.DeepCopy(testfixtures.JobCancelled)
Expand Down Expand Up @@ -481,6 +484,9 @@ func TestTruncatesStringsThatAreTooLong(t *testing.T) {
submit, err := testfixtures.DeepCopy(testfixtures.Submit)
assert.NoError(t, err)
submit.GetSubmitJob().GetMainObject().GetPodSpec().GetPodSpec().PriorityClassName = longString
submit.GetSubmitJob().GetObjectMeta().Annotations = map[string]string{
"armadaproject.io/externalJobUri": longString,
}

leased, err := testfixtures.DeepCopy(testfixtures.Leased)
assert.NoError(t, err)
Expand Down Expand Up @@ -517,6 +523,7 @@ func TestTruncatesStringsThatAreTooLong(t *testing.T) {
assert.Len(t, actual.JobsToCreate[0].Owner, 512)
assert.Len(t, actual.JobsToCreate[0].JobSet, 1024)
assert.Len(t, *actual.JobsToCreate[0].PriorityClass, 63)
assert.Len(t, actual.JobsToCreate[0].ExternalJobUri, 1024)
assert.Len(t, actual.JobRunsToCreate[0].Cluster, 512)
assert.Len(t, *actual.JobRunsToCreate[0].Node, 512)
assert.Len(t, *actual.JobRunsToUpdate[1].Node, 512)
Expand Down
14 changes: 10 additions & 4 deletions internal/lookoutingesterv2/lookoutdb/insertion.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,8 @@ func (l *LookoutDb) CreateJobsBatch(ctx *armadacontext.Context, instructions []*
last_transition_time timestamp,
last_transition_time_seconds bigint,
priority_class varchar(63),
annotations jsonb
annotations jsonb,
external_job_uri varchar(1024) NULL
) ON COMMIT DROP;`, tmpTable))
if err != nil {
l.metrics.RecordDBError(commonmetrics.DBOperationCreateTempTable)
Expand Down Expand Up @@ -241,6 +242,7 @@ func (l *LookoutDb) CreateJobsBatch(ctx *armadacontext.Context, instructions []*
"last_transition_time_seconds",
"priority_class",
"annotations",
"external_job_uri",
},
pgx.CopyFromSlice(len(instructions), func(i int) ([]interface{}, error) {
return []interface{}{
Expand All @@ -260,6 +262,7 @@ func (l *LookoutDb) CreateJobsBatch(ctx *armadacontext.Context, instructions []*
instructions[i].LastTransitionTimeSeconds,
instructions[i].PriorityClass,
instructions[i].Annotations,
instructions[i].ExternalJobUri,
}, nil
}),
)
Expand All @@ -286,7 +289,8 @@ func (l *LookoutDb) CreateJobsBatch(ctx *armadacontext.Context, instructions []*
last_transition_time,
last_transition_time_seconds,
priority_class,
annotations
annotations,
external_job_uri
) SELECT * from %s
ON CONFLICT DO NOTHING`, tmpTable),
)
Expand Down Expand Up @@ -318,9 +322,10 @@ func (l *LookoutDb) CreateJobsScalar(ctx *armadacontext.Context, instructions []
last_transition_time,
last_transition_time_seconds,
priority_class,
annotations
annotations,
external_job_uri
)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17)
ON CONFLICT DO NOTHING`
for _, i := range instructions {
err := l.withDatabaseRetryInsert(func() error {
Expand All @@ -341,6 +346,7 @@ func (l *LookoutDb) CreateJobsScalar(ctx *armadacontext.Context, instructions []
i.LastTransitionTimeSeconds,
i.PriorityClass,
i.Annotations,
i.ExternalJobUri,
)
if err != nil {
l.metrics.RecordDBError(commonmetrics.DBOperationInsert)
Expand Down
8 changes: 7 additions & 1 deletion internal/lookoutingesterv2/lookoutdb/insertion_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ type JobRow struct {
LatestRunId *string
CancelReason *string
Annotations map[string]string
ExternalJobUri string
}

type JobSpecRow struct {
Expand Down Expand Up @@ -156,6 +157,7 @@ var expectedJobAfterSubmit = JobRow{
Duplicate: false,
PriorityClass: priorityClass,
Annotations: annotations,
ExternalJobUri: "external-job-uri",
}

var expectedJobAfterUpdate = JobRow{
Expand All @@ -176,6 +178,7 @@ var expectedJobAfterUpdate = JobRow{
Duplicate: false,
PriorityClass: priorityClass,
Annotations: annotations,
ExternalJobUri: "external-job-uri",
}

var expectedJobRun = JobRunRow{
Expand Down Expand Up @@ -927,6 +930,7 @@ func makeCreateJobInstruction(jobId string) *model.CreateJobInstruction {
JobProto: []byte(jobProto),
PriorityClass: pointer.String(priorityClass),
Annotations: annotations,
ExternalJobUri: "external-job-uri",
}
}

Expand Down Expand Up @@ -964,7 +968,8 @@ func getJob(t *testing.T, db *pgxpool.Pool, jobId string) JobRow {
priority_class,
latest_run_id,
cancel_reason,
annotations
annotations,
external_job_uri
FROM job WHERE job_id = $1`,
jobId)
err := r.Scan(
Expand All @@ -988,6 +993,7 @@ func getJob(t *testing.T, db *pgxpool.Pool, jobId string) JobRow {
&job.LatestRunId,
&job.CancelReason,
&job.Annotations,
&job.ExternalJobUri,
)
assert.Nil(t, err)
return job
Expand Down
1 change: 1 addition & 0 deletions internal/lookoutingesterv2/model/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ type CreateJobInstruction struct {
JobProto []byte
PriorityClass *string
Annotations map[string]string
ExternalJobUri string
}

// UpdateJobInstruction is an instruction to update an existing row in the jobs table
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
-- Adds a nullable external_job_uri column (up to 1024 characters) to the job table.
ALTER TABLE job ADD COLUMN external_job_uri varchar(1024) NULL;
2 changes: 1 addition & 1 deletion internal/server/queryapi/database/db.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion internal/server/queryapi/database/models.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 12 additions & 0 deletions internal/server/queryapi/database/query.sql
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,15 @@ SELECT * FROM job_run WHERE run_id = ANY(sqlc.arg(run_ids)::text[]);
-- name: GetJobRunsByJobIds :many
SELECT * FROM job_run WHERE job_id = ANY(sqlc.arg(job_ids)::text[]) order by leased desc;

-- name: GetJobErrorsByJobIds :many
-- Returns job_id and error for each job whose id is in job_ids.
-- error is the first non-NULL of job_error.error and the error of the job's
-- latest run (job.latest_run_id); both are left joins, so error is NULL when
-- neither source has a value. Rows are ordered by job_id descending.
select j.job_id as job_id, coalesce(je.error, jr.error) as error from job j
left join job_error je on j.job_id = je.job_id
left join job_run jr on j.latest_run_id = jr.run_id
where j.job_id = ANY(sqlc.arg(job_ids)::text[])
order by j.job_id desc;

-- name: GetJobStatesUsingExternalSystemUri :many
-- Returns job_id and state for every job in the given queue and jobset whose
-- external_job_uri equals the supplied value. All three filters are exact matches.
SELECT job_id, state FROM job
WHERE queue=sqlc.arg(queue)::text
AND jobset = sqlc.arg(jobset)::text
AND external_job_uri = sqlc.arg(external_job_uri)::text;
73 changes: 72 additions & 1 deletion internal/server/queryapi/database/query.sql.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

64 changes: 59 additions & 5 deletions internal/server/queryapi/query_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,31 @@ func (q *QueryApi) GetJobDetails(ctx context.Context, req *api.JobDetailsRequest
}, err
}

// GetJobErrors looks up the stored error for each job id in the request and
// returns them, decompressed, keyed by job id.
//
// The request is rejected when it carries more than q.maxQueryItems job ids.
// Any database or decompression failure aborts the call and is returned as-is.
//
// NOTE(review): GetJobErrorsByJobIds left-joins its error sources, so a job
// without any recorded error presumably yields an empty Error value here —
// confirm that Decompress tolerates empty input.
func (q *QueryApi) GetJobErrors(ctx context.Context, req *api.JobErrorsRequest) (*api.JobErrorsResponse, error) {
	if limit := q.maxQueryItems; len(req.JobIds) > limit {
		return nil, fmt.Errorf("request contained more than %d JobIds", limit)
	}

	rows, err := database.New(q.db).GetJobErrorsByJobIds(ctx, req.JobIds)
	if err != nil {
		return nil, err
	}

	// One decompressor instance is shared across all rows of this request.
	decompressor := q.decompressorFactory()
	response := &api.JobErrorsResponse{
		JobErrors: make(map[string]string, len(rows)),
	}
	for i := range rows {
		raw, err := decompressor.Decompress(rows[i].Error)
		if err != nil {
			return nil, err
		}
		response.JobErrors[rows[i].JobID] = string(raw)
	}
	return response, nil
}

func (q *QueryApi) GetJobRunDetails(ctx context.Context, req *api.JobRunDetailsRequest) (*api.JobRunDetailsResponse, error) {
if len(req.RunIds) > q.maxQueryItems {
return nil, fmt.Errorf("request contained more than %d RunIds", q.maxQueryItems)
Expand Down Expand Up @@ -176,11 +201,7 @@ func (q *QueryApi) GetJobStatus(ctx context.Context, req *api.JobStatusRequest)
for _, jobId := range req.JobIds {
dbStatus, ok := dbStatusById[jobId]
if ok {
apiStatus, ok := JobStateMap[dbStatus]
if !ok {
apiStatus = api.JobState_UNKNOWN // We know about this job but we can't map its state
}
apiStatusById[jobId] = apiStatus
apiStatusById[jobId] = parseDbJobStateToApi(dbStatus)
} else {
apiStatusById[jobId] = api.JobState_UNKNOWN // We don't know about this job
}
Expand All @@ -191,6 +212,39 @@ func (q *QueryApi) GetJobStatus(ctx context.Context, req *api.JobStatusRequest)
}, nil
}

// GetJobStatusUsingExternalJobUri returns the API state of every job in the
// requested queue and jobset whose external job URI matches the request.
//
// An empty ExternalJobUri is rejected with an error. Database failures are
// returned unchanged. Database states are mapped to API states with
// parseDbJobStateToApi.
func (q *QueryApi) GetJobStatusUsingExternalJobUri(ctx context.Context, req *api.JobStatusUsingExternalJobUriRequest) (*api.JobStatusResponse, error) {
	if len(req.ExternalJobUri) == 0 {
		return nil, fmt.Errorf("request must contain external job uri")
	}

	params := database.GetJobStatesUsingExternalSystemUriParams{
		Queue:          req.Queue,
		Jobset:         req.Jobset,
		ExternalJobUri: req.ExternalJobUri,
	}
	rows, err := database.New(q.db).GetJobStatesUsingExternalSystemUri(ctx, params)
	if err != nil {
		return nil, err
	}

	states := make(map[string]api.JobState, len(rows))
	for i := range rows {
		states[rows[i].JobID] = parseDbJobStateToApi(rows[i].State)
	}
	return &api.JobStatusResponse{JobStates: states}, nil
}

// parseDbJobStateToApi translates a database job state ordinal into its API
// job state via JobStateMap. Ordinals missing from the map are reported as
// api.JobState_UNKNOWN: the job exists, but its state cannot be mapped.
func parseDbJobStateToApi(dbStatus int16) api.JobState {
	if mapped, found := JobStateMap[dbStatus]; found {
		return mapped
	}
	return api.JobState_UNKNOWN
}

func parseJobDetails(row database.JobRun) *api.JobRunDetails {
runState, ok := JobRunStateMap[row.JobRunState]
if !ok {
Expand Down
Loading

0 comments on commit 6271a58

Please sign in to comment.