Skip to content

Commit

Permalink
fix: Remove job_start_event and job_end_event from airflow dag templa…
Browse files Browse the repository at this point in the history
…tes (#768)

Co-authored-by: sambhav13 <sambhav.gupta@gojek.com>
  • Loading branch information
sambhav13 and sambhav13 authored Apr 4, 2023
1 parent 3ec5e08 commit 04eb548
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 65 deletions.
45 changes: 12 additions & 33 deletions ext/scheduler/airflow2/resources/base_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,10 @@
from __lib import optimus_sla_miss_notify, SuperKubernetesPodOperator, \
SuperExternalTaskSensor, ExternalHttpSensor

from __lib import JOB_START_EVENT_NAME, \
JOB_END_EVENT_NAME, \
log_start_event, \
log_success_event, \
from __lib import log_success_event, \
log_retry_event, \
log_failure_event, \
EVENT_NAMES, \
log_job_end, log_job_start
EVENT_NAMES

SENSOR_DEFAULT_POKE_INTERVAL_IN_SECS = int(Variable.get("sensor_poke_interval_in_secs", default_var=15 * 60))
SENSOR_DEFAULT_TIMEOUT_IN_SECS = int(Variable.get("sensor_timeout_in_secs", default_var=15 * 60 * 60))
Expand Down Expand Up @@ -69,23 +65,6 @@
]
)

publish_job_start_event = PythonOperator(
task_id = JOB_START_EVENT_NAME,
python_callable = log_job_start,
provide_context=True,
depends_on_past=False,
dag=dag
)

publish_job_end_event = PythonOperator(
task_id = JOB_END_EVENT_NAME,
python_callable = log_job_end,
provide_context=True,
trigger_rule= 'all_success',
depends_on_past=False,
dag=dag
)

{{$baseTaskSchema := .Job.Task.Unit.Info -}}
{{- $setCPURequest := not (empty .Metadata.Resource.Request.CPU) -}}
{{- $setMemoryRequest := not (empty .Metadata.Resource.Request.Memory) -}}
Expand Down Expand Up @@ -299,33 +278,33 @@

# upstream sensors -> base transformation task
{{- range $i, $t := $.Job.Dependencies }}
publish_job_start_event >> wait_{{ $t.Job.Name | replace "-" "__dash__" | replace "." "__dot__" }} >> transformation_{{$baseTaskSchema.Name | replace "-" "__dash__" | replace "." "__dot__"}}
wait_{{ $t.Job.Name | replace "-" "__dash__" | replace "." "__dot__" }} >> transformation_{{$baseTaskSchema.Name | replace "-" "__dash__" | replace "." "__dot__"}}
{{- end}}
{{- range $_, $t := $.Job.ExternalDependencies.HTTPDependencies }}
publish_job_start_event >> wait_{{ $t.Name }} >> transformation_{{$baseTaskSchema.Name | replace "-" "__dash__" | replace "." "__dot__"}}
wait_{{ $t.Name }} >> transformation_{{$baseTaskSchema.Name | replace "-" "__dash__" | replace "." "__dot__"}}
{{- end}}
{{- range $_, $dependency := $.Job.ExternalDependencies.OptimusDependencies}}
{{ $identity := print $dependency.Name "-" $dependency.ProjectName "-" $dependency.JobName }}
publish_job_start_event >> wait_{{ $identity | replace "-" "__dash__" | replace "." "__dot__" }} >> transformation_{{$baseTaskSchema.Name | replace "-" "__dash__" | replace "." "__dot__"}}
wait_{{ $identity | replace "-" "__dash__" | replace "." "__dot__" }} >> transformation_{{$baseTaskSchema.Name | replace "-" "__dash__" | replace "." "__dot__"}}
{{- end}}
{{if and (not $.Job.Dependencies) (not $.Job.ExternalDependencies.HTTPDependencies) (not $.Job.ExternalDependencies.OptimusDependencies)}}
# if no sensor and dependency is configured
publish_job_start_event >> transformation_{{$baseTaskSchema.Name | replace "-" "__dash__" | replace "." "__dot__"}}
transformation_{{$baseTaskSchema.Name | replace "-" "__dash__" | replace "." "__dot__"}}
{{end}}
# post completion hook
transformation_{{$baseTaskSchema.Name | replace "-" "__dash__" | replace "." "__dot__"}} >> publish_job_end_event
transformation_{{$baseTaskSchema.Name | replace "-" "__dash__" | replace "." "__dot__"}}

# set inter-dependencies between task and hooks
{{- range $_, $task := .Job.Hooks }}
{{- $hookSchema := $task.Unit.Info }}
{{- if eq $hookSchema.HookType $.HookTypePre }}
publish_job_start_event >> hook_{{$hookSchema.Name | replace "-" "__dash__"}} >> transformation_{{$baseTaskSchema.Name | replace "-" "__dash__" | replace "." "__dot__"}}
hook_{{$hookSchema.Name | replace "-" "__dash__"}} >> transformation_{{$baseTaskSchema.Name | replace "-" "__dash__" | replace "." "__dot__"}}
{{- end -}}
{{- if eq $hookSchema.HookType $.HookTypePost }}
transformation_{{$baseTaskSchema.Name | replace "-" "__dash__" | replace "." "__dot__"}} >> hook_{{$hookSchema.Name | replace "-" "__dash__"}} >> publish_job_end_event
transformation_{{$baseTaskSchema.Name | replace "-" "__dash__" | replace "." "__dot__"}} >> hook_{{$hookSchema.Name | replace "-" "__dash__"}}
{{- end -}}
{{- if eq $hookSchema.HookType $.HookTypeFail }}
transformation_{{$baseTaskSchema.Name | replace "-" "__dash__" | replace "." "__dot__"}} >> hook_{{$hookSchema.Name | replace "-" "__dash__"}} >> publish_job_end_event
transformation_{{$baseTaskSchema.Name | replace "-" "__dash__" | replace "." "__dot__"}} >> hook_{{$hookSchema.Name | replace "-" "__dash__"}}
{{- end -}}
{{- end }}

Expand All @@ -334,7 +313,7 @@
{{- $hookSchema := $t.Unit.Info }}
{{- range $_, $depend := $t.DependsOn }}
{{- $dependHookSchema := $depend.Unit.Info }}
hook_{{$dependHookSchema.Name | replace "-" "__dash__"}} >> hook_{{$hookSchema.Name | replace "-" "__dash__"}} >> publish_job_end_event
hook_{{$dependHookSchema.Name | replace "-" "__dash__"}} >> hook_{{$hookSchema.Name | replace "-" "__dash__"}}
{{- end }}
{{- end }}

Expand All @@ -349,7 +328,7 @@
{{- $fhookSchema := $ftask.Unit.Info }}
{{- if eq $fhookSchema.HookType $.HookTypeFail }} hook_{{$fhookSchema.Name | replace "-" "__dash__"}}, {{- end -}}
{{- end -}}
] >> publish_job_end_event
]

{{- end -}}

Expand Down
43 changes: 11 additions & 32 deletions ext/scheduler/airflow2/resources/expected_compiled_template.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,10 @@
from __lib import optimus_sla_miss_notify, SuperKubernetesPodOperator, \
SuperExternalTaskSensor, ExternalHttpSensor

from __lib import JOB_START_EVENT_NAME, \
JOB_END_EVENT_NAME, \
log_start_event, \
log_success_event, \
from __lib import log_success_event, \
log_retry_event, \
log_failure_event, \
EVENT_NAMES, \
log_job_end, log_job_start
EVENT_NAMES

SENSOR_DEFAULT_POKE_INTERVAL_IN_SECS = int(Variable.get("sensor_poke_interval_in_secs", default_var=15 * 60))
SENSOR_DEFAULT_TIMEOUT_IN_SECS = int(Variable.get("sensor_timeout_in_secs", default_var=15 * 60 * 60))
Expand Down Expand Up @@ -61,23 +57,6 @@
]
)

publish_job_start_event = PythonOperator(
task_id = JOB_START_EVENT_NAME,
python_callable = log_job_start,
provide_context=True,
depends_on_past=False,
dag=dag
)

publish_job_end_event = PythonOperator(
task_id = JOB_END_EVENT_NAME,
python_callable = log_job_end,
provide_context=True,
trigger_rule= 'all_success',
depends_on_past=False,
dag=dag
)


JOB_DIR = "/data"
IMAGE_PULL_POLICY="IfNotPresent"
Expand Down Expand Up @@ -315,22 +294,22 @@
####################################

# upstream sensors -> base transformation task
publish_job_start_event >> wait_foo__dash__intra__dash__dep__dash__job >> transformation_bq
publish_job_start_event >> wait_foo__dash__inter__dash__dep__dash__job >> transformation_bq
wait_foo__dash__intra__dash__dep__dash__job >> transformation_bq
wait_foo__dash__inter__dash__dep__dash__job >> transformation_bq

publish_job_start_event >> wait_external__dash__optimus__dash__foo__dash__external__dash__optimus__dash__project__dash__foo__dash__external__dash__optimus__dash__dep__dash__job >> transformation_bq
wait_external__dash__optimus__dash__foo__dash__external__dash__optimus__dash__project__dash__foo__dash__external__dash__optimus__dash__dep__dash__job >> transformation_bq

# post completion hook
transformation_bq >> publish_job_end_event
transformation_bq

# set inter-dependencies between task and hooks
publish_job_start_event >> hook_transporter >> transformation_bq
transformation_bq >> hook_predator >> publish_job_end_event
transformation_bq >> hook_hook__dash__for__dash__fail >> publish_job_end_event
hook_transporter >> transformation_bq
transformation_bq >> hook_predator
transformation_bq >> hook_hook__dash__for__dash__fail

# set inter-dependencies between hooks and hooks
hook_transporter >> hook_predator >> publish_job_end_event
hook_transporter >> hook_predator

# arrange failure hook after post hooks

hook_predator >> [ hook_hook__dash__for__dash__fail,] >> publish_job_end_event
hook_predator >> [ hook_hook__dash__for__dash__fail,]

0 comments on commit 04eb548

Please sign in to comment.