-
Notifications
You must be signed in to change notification settings - Fork 6
/
airflow_template.py
71 lines (65 loc) · 2.3 KB
/
airflow_template.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
from datetime import timedelta
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.operators.bash_operator import BashOperator
from airflow.utils.dates import days_ago
import yaml
import importlib
import os
# Load the per-job DAG configuration that lives next to this file.
# A parse failure is fatal: without a valid config there is nothing to build,
# so log and re-raise instead of continuing. (The original swallowed the
# YAMLError, leaving `config` undefined and crashing later with a NameError
# at the `config.items()` loop — far from the actual cause.)
with open(os.path.join(os.path.dirname(__file__), "config.yaml"), 'r') as yaml_stream:
    try:
        config = yaml.safe_load(yaml_stream)
    except yaml.YAMLError as exc:
        print(exc)
        raise
# Baseline operator arguments shared by every generated DAG. Individual jobs
# can still override any of these at the operator level if needed.
default_args = dict(
    owner='airflow',
    depends_on_past=False,
    email=['airflow@example.com'],
    # notifications are configured but disabled by default
    email_on_failure=False,
    email_on_retry=False,
    # one automatic retry, five minutes after the failure
    retries=1,
    retry_delay=timedelta(minutes=5),
)
def _safe_task_id(raw):
    """Map an arbitrary config string to a valid Airflow task_id.

    Airflow task_ids only allow alphanumerics plus '-', '_' and '.'; any
    other character (e.g. '/' in a file path) is replaced with '_'.
    """
    return "".join(ch if ch.isalnum() or ch in "-_." else "_" for ch in raw)


def _make_python_callable(module_name, function_name):
    """Build a callable that imports `module_name` and runs `function_name`.

    Binding the names as function arguments here — instead of closing over
    the loop variables — avoids Python's late-binding closure pitfall, where
    every generated task would otherwise execute the *last* file listed in
    the config.
    """
    def _callable():
        imported_module = importlib.import_module(module_name)
        function_object = getattr(imported_module, function_name)
        function_object()
    return _callable


# Build one DAG per job entry in config.yaml. Each job has a "type"
# (spark | python | bash), a "schedule" and a list of "files".
for job, configs in config.items():
    dag = DAG(
        job,
        default_args=default_args,
        description='A standard description',
        schedule_interval=configs["schedule"],
        start_date=days_ago(100),
    )
    operators = []
    if configs["type"] == "spark":  # add spark job: one spark-submit per file
        for file in configs["files"]:
            operator = BashOperator(
                # task_id must be unique per file — a fixed id raises
                # DuplicateTaskIdFound as soon as a job lists two files
                task_id=f"spark_submit_{_safe_task_id(file)}",
                bash_command=f"spark-submit {file}",
                dag=dag,
            )
            operators.append(operator)
    elif configs["type"] == "python":  # add python job: "<module>.<function>"
        for file in configs["files"]:
            module = file.split(".")[0]
            function = file.split(".")[1]
            operator = PythonOperator(
                task_id=f"python_{_safe_task_id(file)}",
                # factory binds module/function now, not at task run time
                python_callable=_make_python_callable(module, function),
                dag=dag,
            )
            operators.append(operator)
    elif configs["type"] == "bash":  # add bash job: run the file directly
        for file in configs["files"]:
            operator = BashOperator(
                task_id=f"bash_{_safe_task_id(file)}",
                bash_command=f"{file} ",
                dag=dag,
            )
            operators.append(operator)
    else:
        raise Exception("Need to configure this new operator")
    # important: add your DAG to globals — Airflow's DagBag only discovers
    # DAG objects that are reachable at module level
    globals()[job] = dag