Skip to content

Commit

Permalink
Fix: scheduler BC testing issues (#722)
Browse files Browse the repository at this point in the history
* fix: handel parallel log relay

* fix: upload all paralel return error

* fix: do not fail if dependent hooks not declared in job

* fix: add error logs

* fix: test cases

* fix: error reporitngs, and migration to fix old job run states

* fix: tests

* fix: move run-input cmd client to job folder
  • Loading branch information
Mryashbhardwaj authored Jan 18, 2023
1 parent cdfecee commit 5f2c317
Show file tree
Hide file tree
Showing 12 changed files with 14 additions and 12 deletions.
1 change: 1 addition & 0 deletions client/cmd/job/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ func NewJobCommand() *cobra.Command {
NewInspectCommand(),
NewReplaceAllCommand(),
NewExportCommand(),
NewJobRunInputCommand(),
)
return cmd
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package scheduler
package job

import (
"fmt"
Expand Down
1 change: 0 additions & 1 deletion client/cmd/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ func NewSchedulerCommand() *cobra.Command {

cmd.AddCommand(
UploadCommand(),
NewJobRunInputCommand(),
)
return cmd
}
2 changes: 1 addition & 1 deletion core/scheduler/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func StateFromString(state string) (State, error) {
case string(StateFailed):
return StateFailed, nil
default:
return "", errors.InvalidArgument(EntityJobRun, "invalid state for job run "+state)
return "", errors.InvalidArgument(EntityJobRun, "invalid state for run "+state)
}
}

Expand Down
4 changes: 2 additions & 2 deletions core/scheduler/status_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ func TestStatus(t *testing.T) {

expectedJobRunStatus, err = scheduler.JobRunStatusFrom(currentTime, "unregisteredState")
assert.NotNil(t, err)
assert.EqualError(t, err, "invalid argument for entity jobRun: invalid state for job run unregisteredState")
assert.EqualError(t, err, "invalid argument for entity jobRun: invalid state for run unregisteredState")
assert.Equal(t, scheduler.JobRunStatus{}, expectedJobRunStatus)
})
t.Run("ExecutionStart", func(t *testing.T) {
Expand Down Expand Up @@ -106,7 +106,7 @@ func TestStatus(t *testing.T) {

respState, err := scheduler.StateFromString("unregisteredState")
assert.NotNil(t, err)
assert.EqualError(t, err, "invalid argument for entity jobRun: invalid state for job run unregisteredState")
assert.EqualError(t, err, "invalid argument for entity jobRun: invalid state for run unregisteredState")
assert.Equal(t, scheduler.State(""), respState)
})
}
2 changes: 1 addition & 1 deletion entrypoint_init_container.sh
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ echo "OPTIMUS_HOST:$OPTIMUS_HOST"
echo ""

echo "-- initializing optimus assets"
optimus scheduler run-input "$JOB_NAME" --project-name \
optimus job run-input "$JOB_NAME" --project-name \
"$PROJECT" --output-dir "$JOB_DIR" \
--type "$INSTANCE_TYPE" --name "$INSTANCE_NAME" \
--scheduled-at "$SCHEDULED_AT" --host "$OPTIMUS_HOST"
4 changes: 2 additions & 2 deletions internal/compiler/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,13 +39,13 @@ func (e *Engine) Compile(templateMap map[string]string, context map[string]any)
for name, content := range templateMap {
tmpl, err := e.baseTemplate.New(name).Parse(content)
if err != nil {
return nil, errors.InvalidArgument(EntityCompiler, "unable to parse content for "+name)
return nil, errors.AddErrContext(err, EntityCompiler, "unable to parse content for "+name)
}

var buf bytes.Buffer
err = tmpl.Execute(&buf, context)
if err != nil {
return nil, errors.InvalidArgument(EntityCompiler, "unable to render content for "+name)
return nil, errors.AddErrContext(err, EntityCompiler, "unable to render content for "+name)
}
rendered[name] = strings.TrimSpace(buf.String())
}
Expand Down
4 changes: 2 additions & 2 deletions internal/compiler/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ func TestEngine(t *testing.T) {
_, err := comp.Compile(map[string]string{"query": input}, context)

assert.NotNil(t, err)
assert.EqualError(t, err, "invalid argument for entity compiler: unable to parse content for query")
assert.EqualError(t, err, "internal error for entity compiler: unable to parse content for query")
})
t.Run("returns error when rendering fails", func(t *testing.T) {
input := `event_timestamp > "{{.DSTART | Date }}"`
Expand All @@ -103,7 +103,7 @@ func TestEngine(t *testing.T) {
_, err := comp.Compile(map[string]string{"query": input}, context)

assert.NotNil(t, err)
assert.EqualError(t, err, "invalid argument for entity compiler: unable to render content for query")
assert.EqualError(t, err, "internal error for entity compiler: unable to render content for query")
})
t.Run("returns rendered string with values of macros for template map", func(t *testing.T) {
testCases := []struct {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
update job_run set status = 'STARTED' where status = 'running';
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
update job_run set status = 'running' where status = 'STARTED';
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func operatorTypeToTableName(operatorType scheduler.OperatorType) (string, error
func (o operatorRun) toOperatorRun() (*scheduler.OperatorRun, error) {
status, err := scheduler.StateFromString(o.Status)
if err != nil {
return nil, errors.NewError(scheduler.EntityJobRun, "invalid job run state in database", err.Error())
return nil, errors.NewError(scheduler.EntityJobRun, "invalid operator run state in database", err.Error())
}
return &scheduler.OperatorRun{
ID: o.ID,
Expand Down
2 changes: 1 addition & 1 deletion internal/store/postgres/scheduler/job_run_repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func (j jobRun) toJobRun() (*scheduler.JobRun, error) {
}
state, err := scheduler.StateFromString(j.Status)
if err != nil {
return nil, err
return nil, errors.AddErrContext(err, scheduler.EntityJobRun, "invalid job run state in database")
}
return &scheduler.JobRun{
ID: j.ID,
Expand Down

0 comments on commit 5f2c317

Please sign in to comment.