Skip to content

Commit

Permalink
refactor: change external optimus sensor task id similar with interna…
Browse files Browse the repository at this point in the history
…l sensor task id (#491)

* refactor: change external optimus sensor task name similar with internal sensor name

* fix: cross optimus task dependencies in airflow is not properly

* refactor: revert cross optimus sensor variable name

Co-authored-by: Anwar Hidayat <anwarhidayat14@gmail.com>
  • Loading branch information
arinda-arif and irainia authored Aug 2, 2022
1 parent fb02abd commit 5b3896d
Show file tree
Hide file tree
Showing 6 changed files with 35 additions and 5 deletions.
1 change: 1 addition & 0 deletions ext/resourcemgr/optimus.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,5 +113,6 @@ func (o *optimusResourceManager) toOptimusDependency(response jobSpecificationRe
ProjectName: response.ProjectName,
NamespaceName: response.NamespaceName,
JobName: response.Job.Name,
TaskName: response.Job.TaskName,
}
}
4 changes: 3 additions & 1 deletion ext/resourcemgr/optimus_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,8 @@ func (o *OptimusResourceManager) TestGetJobSpecifications() {
"name": "hook-1-config-1-key",
"value": "hook-1-config-1-value"
}]
}]
}],
"taskName": "task-1"
}
}
]
Expand All @@ -265,6 +266,7 @@ func (o *OptimusResourceManager) TestGetJobSpecifications() {
ProjectName: "project",
NamespaceName: "namespace",
JobName: "job",
TaskName: "task-1",
},
}

Expand Down
12 changes: 12 additions & 0 deletions ext/scheduler/airflow2/compiler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,18 @@ func TestCompilerIntegration(t *testing.T) {
"destination1": {Job: &depSpecIntra, Project: &projSpec, Type: models.JobSpecDependencyTypeIntra},
"destination2": {Job: &depSpecInter, Project: &externalProjSpec, Type: models.JobSpecDependencyTypeInter},
},
ExternalDependencies: models.ExternalDependency{
OptimusDependencies: []models.OptimusDependency{
{
Name: "external-optimus",
Host: "http://optimus.external.io",
ProjectName: "foo-external-optimus-project",
NamespaceName: "bar-external-optimus-namespace",
JobName: "foo-external-optimus-dep-job",
TaskName: "bq",
},
},
},
Assets: *models.JobAssets{}.New(
[]models.JobSpecAsset{
{
Expand Down
8 changes: 4 additions & 4 deletions ext/scheduler/airflow2/resources/base_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@
sla_miss_callback=optimus_sla_miss_notify,
catchup={{ if .Job.Behavior.CatchUp -}}True{{- else -}}False{{- end }},
dagrun_timeout=timedelta(seconds=DAGRUN_TIMEOUT_IN_SECS),
tags = [
tags = [
{{- range $key, $value := $.Job.Labels}}
"{{ $value }}",
{{- end}}
Expand Down Expand Up @@ -228,15 +228,15 @@

{{- range $_, $dependency := $.Job.ExternalDependencies.OptimusDependencies}}
{{ $identity := print $dependency.Name "-" $dependency.ProjectName "-" $dependency.JobName }}
wait_{{$identity | replace "-" "__dash__" | replace "." "__dot__"}} = SuperExternalTaskSensor(
wait_{{ $identity | replace "-" "__dash__" | replace "." "__dot__"}} = SuperExternalTaskSensor(
optimus_hostname="{{$dependency.Host}}",
upstream_optimus_project="{{$dependency.ProjectName}}",
upstream_optimus_namespace="{{$dependency.NamespaceName}}",
upstream_optimus_job="{{$dependency.JobName}}",
window_size="{{ $baseWindow.Size.String }}",
poke_interval=SENSOR_DEFAULT_POKE_INTERVAL_IN_SECS,
timeout=SENSOR_DEFAULT_TIMEOUT_IN_SECS,
task_id="wait_{{$identity | trunc 200}}",
task_id="wait_{{$dependency.JobName | trunc 200}}-{{$dependency.TaskName}}",
dag=dag
)
{{- end}}
Expand Down Expand Up @@ -268,7 +268,7 @@
{{- end}}
{{- range $_, $dependency := $.Job.ExternalDependencies.OptimusDependencies}}
{{ $identity := print $dependency.Name "-" $dependency.ProjectName "-" $dependency.JobName }}
publish_job_start_event >> wait_{{$identity | replace "-" "__dash__" | replace "." "__dot__"}} >> transformation_{{$baseTaskSchema.Name | replace "-" "__dash__" | replace "." "__dot__"}}
publish_job_start_event >> wait_{{ $identity | replace "-" "__dash__" | replace "." "__dot__" }} >> transformation_{{$baseTaskSchema.Name | replace "-" "__dash__" | replace "." "__dot__"}}
{{- end}}
{{if and (not $.Job.Dependencies) (not $.Job.ExternalDependencies.HTTPDependencies) (not $.Job.ExternalDependencies.OptimusDependencies)}}
# if no sensor and dependency is configured
Expand Down
14 changes: 14 additions & 0 deletions ext/scheduler/airflow2/resources/expected_compiled_template.py
Original file line number Diff line number Diff line change
Expand Up @@ -235,13 +235,27 @@
dag=dag
)

wait_external__dash__optimus__dash__foo__dash__external__dash__optimus__dash__project__dash__foo__dash__external__dash__optimus__dash__dep__dash__job = SuperExternalTaskSensor(
optimus_hostname="http://optimus.external.io",
upstream_optimus_project="foo-external-optimus-project",
upstream_optimus_namespace="bar-external-optimus-namespace",
upstream_optimus_job="foo-external-optimus-dep-job",
window_size="1h0m0s",
poke_interval=SENSOR_DEFAULT_POKE_INTERVAL_IN_SECS,
timeout=SENSOR_DEFAULT_TIMEOUT_IN_SECS,
task_id="wait_foo-external-optimus-dep-job-bq",
dag=dag
)

# arrange inter task dependencies
####################################

# upstream sensors -> base transformation task
publish_job_start_event >> wait_foo__dash__intra__dash__dep__dash__job >> transformation_bq
publish_job_start_event >> wait_foo__dash__inter__dash__dep__dash__job >> transformation_bq

publish_job_start_event >> wait_external__dash__optimus__dash__foo__dash__external__dash__optimus__dash__project__dash__foo__dash__external__dash__optimus__dash__dep__dash__job >> transformation_bq

# post completion hook
transformation_bq >> publish_job_end_event

Expand Down
1 change: 1 addition & 0 deletions models/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -404,6 +404,7 @@ type OptimusDependency struct {
ProjectName string
NamespaceName string
JobName string
TaskName string
}

type HTTPDependency struct {
Expand Down

0 comments on commit 5b3896d

Please sign in to comment.