🐡 USF IMaRS Airflow DAGs
Don't hesitate to open an issue if you are confused or something isn't working for you. This project is under heavy development, so documentation and code likely contain errors.
Please see details in the DAG Development Workflows document (private to IMaRS users).
The short version of the simplest workflow:
- Edit DAGs on GitHub.
- Wait for the test server to pull your changes (via puppet). Pulls occur approximately once every half hour.
- Run the DAG tasks by one of:
    - waiting for a new DagRun to trigger
    - triggering a new DagRun manually
    - clearing an existing DagRun so it re-runs
- View results in the web GUI.
Additional documentation is in the `./docs` directory.
- A DAG (Directed Acyclic Graph) defines a processing pipeline.
- Each DAG is in its own python file.
- The DAG file name matches the name string passed to the DAG constructor.
- The main DAG variable in the file is named `this_dag`.
- Operator `task_id` strings match the operator variable names.
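Putting these conventions together, a DAG file might look roughly like the sketch below (a hypothetical `my_example_dag.py` with made-up task names; Airflow 1.x import paths assumed):

```python
# my_example_dag.py -- hypothetical file illustrating the naming conventions above
from datetime import datetime

from airflow import DAG
from airflow.operators.bash_operator import BashOperator  # Airflow 1.x import path

# the dag_id string matches this file's name (minus .py)
this_dag = DAG(
    dag_id="my_example_dag",
    start_date=datetime(2018, 1, 1),
    schedule_interval="@daily",
)

# the operator's variable name matches its task_id string
print_hello = BashOperator(
    task_id="print_hello",
    bash_command="echo hello",
    dag=this_dag,
)
```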
Data processing DAGs generally follow the ETL pattern by using the helpers in `dags/util/etl_tools/`.
A typical automated processing pipeline should look something like:
```
[external data src]--(ingest DAG)-->(imars-etl.load)<------------------------------\
                                           |                                       |
   [imars_product_metadata db]<------------+------------>[IMaRS object storage]    |
                |                                                  |               |
    (product FileTriggerDAG)                              (imars-etl.extract)      |
                |                                                  |               |
                |--->(product processing DAG)<--\                  |               |
                |--->(product processing DAG)<---[local copy of input data]        |
                |--->(product processing DAG)<--/                                  |
                |---> ...              \\\                                         |
                                          \--->[local copy of output data]---------/
```
Within this pipeline there are three types of DAGs to be defined:
- product processing DAGs: create a new data product by transforming data from existing products, e.g. an L1 file into an L2 file.
- ingest DAGs: fetch data from external sources and load the object & metadata into IMaRS' system.
- FileTriggerDAGs: start up a list of processing DAGs when a new product file is added. There must be only one of these per product type.
A MySQL database of IMaRS data/image products is maintained independently of airflow. This database (`imars_product_metadata`) contains information about the data products, such as the datetime of each granule, the product "type", and coverage areas. This information can be searched by connecting to the database directly or through the `imars-etl` package. This database serves two functions:
- enable airflow scripts to trigger based on newly added files, via `FileTriggerDAG`
- allow IMaRS users to search for data products by metadata.
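As an illustration of searching the database directly from airflow code, the sketch below pulls the newest granules for one product type. It is only a sketch: the connection id, table, and column names are guesses, not the documented `imars_product_metadata` schema.

```python
# Sketch only: the connection id, table, and column names here are illustrative
# guesses, not the documented imars_product_metadata schema.
from airflow.hooks.mysql_hook import MySqlHook  # Airflow 1.x import path


def recent_granules(product_id, limit=10):
    """Return the newest metadata rows for one product type."""
    hook = MySqlHook(mysql_conn_id="imars_metadata")  # hypothetical connection id
    return hook.get_records(
        """
        SELECT date_time, filepath      -- hypothetical column names
        FROM file                       -- hypothetical table name
        WHERE product_id = %s
        ORDER BY date_time DESC
        LIMIT %s
        """,
        parameters=(product_id, limit),
    )
```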
Using `imars-etl` is critical for fetching and uploading IMaRS data products.
"ETL" is short for Extract-Transform-Load and this describes data processing in
general:
- "extract" products we want to use as inputs from IMaRS' data server(s).
- "transform" the data via some processing method(s)
- "load" the result as a new data product into IMaRS' data system.
The imars-etl package aims to simplify the "extract" and "load" steps by hiding the complexity of IMaRS' data systems behind a nice CLI.
To make things even simpler for airflow DAGs, `./dags/util/etl_tools` includes some helper functions to set up imars-etl operators automatically.
The helper will add extract, load, and cleanup operators to your DAG to wrap
around your processing operators like so:
```
(imars-etl extract)-->(your processing operators)-->(imars-etl load)
         \                                                 \
          \-------------------------------------------------\-->(clean up local files)
```
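The sketch below writes that pattern out by hand to show roughly what the helpers wire up for you. The processing script and the `imars-etl` command-line arguments are placeholders, not real flags; the real helper interface lives in `dags/util/etl_tools/`.

```python
# Hand-written version of the pattern the etl_tools helpers automate.
# The processing script and the imars-etl arguments are placeholders, not real flags.
from datetime import datetime

from airflow import DAG
from airflow.operators.bash_operator import BashOperator  # Airflow 1.x import path

this_dag = DAG(dag_id="my_proc_dag", start_date=datetime(2018, 1, 1))

extract = BashOperator(  # fetch the input granule to a local temp file
    task_id="extract",
    bash_command="imars-etl extract ...  # placeholder args selecting the input granule",
    dag=this_dag,
)
process = BashOperator(  # your actual processing step(s)
    task_id="process",
    bash_command="my_processing_script /tmp/input.nc /tmp/output.nc",
    dag=this_dag,
)
load = BashOperator(  # upload the result + its metadata back into IMaRS' system
    task_id="load",
    bash_command="imars-etl load ...  # placeholder args describing the output file",
    dag=this_dag,
)
cleanup = BashOperator(  # remove local temp files even if upstream tasks failed
    task_id="cleanup",
    bash_command="rm -f /tmp/input.nc /tmp/output.nc",
    trigger_rule="all_done",
    dag=this_dag,
)

extract >> process >> load >> cleanup
```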
A `FileTriggerDAG` is a DAG which checks the IMaRS product metadata database for new files and starts up processing DAGs.
Installation should be handled by the imars_airflow puppet module, but here are the general steps:
- clone the repo
- copy `settings/secrets.py` into place manually
- install the repo & its dependencies with `pip install -e .` (using setup.py)
All tests included here can be run using pytest.
IMPORTANT: you must run pytest from the parent directory of this repo, and you must use the `python -m pytest ...` syntax.
For example, if `imars_dags` is in `/home/dags/imars_dags`:
```
cd /home/dags
python3 -m pytest ./imars_dags/
```
If you do not run pytest in this way your tests will throw ImportErrors because of the unusual layout of this repo.
Include this repo from a pip requirements.txt like:
```
-e git://github.com/USF-IMARS/imars_dags.git@master#egg=imars_airflow_config
```