This plugin includes a single operator that makes it easier to pass intermediate files between tasks via S3.
The LuigiOperator cuts out the boilerplate needed to save intermediate files between tasks and share their references with the downstream tasks that load them. The operator saves these files on S3 rather than the worker's filesystem, in keeping with Airflow best practices.
LuigiOperators "talk to each other" to pass an arbitrary python callable a dictionary of upstream_input_paths
and an output_path
to temporary files to be loaded and saved to respectively.
For a given task, the flow of information is:
input file on S3 -> temp file -> Python transformation -> temp file -> output file on S3
The LuigiOperator accomplishes this flow by handling the I/O on S3 and sharing the S3 key with downstream tasks over XComs.
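To make the mechanics concrete, here is a rough sketch of how such an operator could be built on Airflow's S3Hook and XComs. This illustrates the flow above under stated assumptions; the class name and internals are hypothetical, not the plugin's actual source.

import os
import tempfile

from airflow.models import BaseOperator


class SketchLuigiOperator(BaseOperator):
    """Illustrative only: mirrors the S3 -> temp file -> callable -> S3 flow."""

    # Let output_file_name use Jinja templates, e.g. {{ run_id }}
    template_fields = ("output_file_name",)

    def __init__(self, transform_callable, bucket_name, output_file_name, **kwargs):
        super().__init__(**kwargs)
        self.transform_callable = transform_callable
        self.bucket_name = bucket_name
        self.output_file_name = output_file_name

    def execute(self, context):
        from airflow.hooks.S3_hook import S3Hook  # Airflow 1.x import path

        hook = S3Hook()
        ti = context["ti"]

        # Pull the S3 key each upstream task returned (pushed as an XCom)
        # and download it to a local temp file, keyed by upstream task id.
        upstream_input_paths = {}
        for task_id in self.upstream_task_ids:
            key = ti.xcom_pull(task_ids=task_id)
            fd, local_path = tempfile.mkstemp()
            os.close(fd)
            hook.get_key(key, bucket_name=self.bucket_name).download_file(local_path)
            upstream_input_paths[task_id] = local_path

        # Run the user's transformation against a fresh temp output file.
        fd, output_path = tempfile.mkstemp()
        os.close(fd)
        self.transform_callable(upstream_input_paths=upstream_input_paths,
                                output_path=output_path,
                                **context)

        # Upload the result; returning the key pushes it as an XCom
        # for downstream LuigiOperators to pull.
        hook.load_file(output_path, key=self.output_file_name,
                       bucket_name=self.bucket_name, replace=True)
        return self.output_file_name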
An example:
from airflow import DAG
from airflow.operators.luigi_plugin import LuigiOperator

dag = DAG("my_dag")
def upstream_transform(upstream_input_paths: dict = None,
                       output_path: str = None,
                       **context):
    """
    No upstream_input_paths to load because this task has no upstream tasks.
    Dumps a text file to output_path, a temp file generated by the LuigiOperator.
    """
    with open(output_path, "w") as f:  # open in write mode
        f.write("Initial output")
upstream = LuigiOperator(task_id="upstream",
                         transform_callable=upstream_transform,
                         bucket_name="dag_interim_files",
                         output_file_name="{{ ti.task_id }}-{{ run_id }}.txt",
                         dag=dag)
def downstream_transform(upstream_input_paths=None,
                         output_path=None,
                         **context):
    """
    Prepend 'My input was: ' to the output from the upstream task.
    """
    # Reference the input path by its upstream task id
    with open(upstream_input_paths["upstream"]) as f:
        input_value = f.read()
    with open(output_path, "w") as f:  # write mode, not "r"
        f.write("My input was: " + input_value)
downstream = LuigiOperator(task_id="downstream",
                           transform_callable=downstream_transform,
                           bucket_name="dag_interim_files",
                           output_file_name="{{ ti.task_id }}-{{ run_id }}.txt",
                           dag=dag)
# Set dependency (dependencies control which input paths are wired into a task)
upstream >> downstream
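Because upstream_input_paths is keyed by upstream task id, fan-in should work the same way: give a task several upstreams and its callable receives one entry per upstream. A sketch extrapolating from the single-upstream example above (the task names here are hypothetical):

# Hypothetical fan-in: two upstream LuigiOperators feeding one downstream
[upstream_a, upstream_b] >> downstream_join
# downstream_join's callable would then receive something like:
# upstream_input_paths == {"upstream_a": "/tmp/tmpabc", "upstream_b": "/tmp/tmpdef"}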
After running this DAG, you would find the interim files for the upstream and downstream tasks in the S3 bucket.
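One way to check is with boto3, sketched below (this assumes AWS credentials are configured; the bucket name comes from the example above):

import boto3

s3 = boto3.client("s3")
response = s3.list_objects_v2(Bucket="dag_interim_files")
for obj in response.get("Contents", []):
    print(obj["Key"])  # e.g. "upstream-<run_id>.txt" and "downstream-<run_id>.txt"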
Install with pip:

pip install git+https://github.com/gusostow/airflow-luigi-plugin
After installation, the operator can be imported like this:
from airflow.operators.luigi_plugin import LuigiOperator