Add astro cli project to run airflow+dagfactory
remove scripts/entrypoint.sh

Replace copy to whl build

Apply feedback

Apply feedback
pankajastro committed Oct 21, 2024
1 parent 997614a commit b009780
Showing 41 changed files with 538 additions and 305 deletions.
4 changes: 3 additions & 1 deletion .gitignore
@@ -10,7 +10,6 @@ __pycache__/
.Python
build/
bin/
include/
develop-eggs/
dist/
downloads/
@@ -127,3 +126,6 @@ logs/
examples/.airflowignore
airflow.cfg
webserver_config.py

# Astro
dev/include/dag_factory-*
2 changes: 1 addition & 1 deletion .pre-commit-config.yaml
@@ -17,7 +17,7 @@ repos:
      - id: check-yaml
        args:
          - --unsafe
        exclude: 'tests/fixtures/dag_factory.yml|examples/invalid.yaml|tests/fixtures/invalid_yaml.yml'
        exclude: 'tests/fixtures/dag_factory.yml|dev/dags/invalid.yaml|tests/fixtures/invalid_yaml.yml'
      - id: debug-statements
      - id: end-of-file-fixer
      - id: mixed-line-ending
51 changes: 0 additions & 51 deletions Dockerfile

This file was deleted.

20 changes: 12 additions & 8 deletions Makefile
@@ -27,16 +27,20 @@ clean: ## Removes build and test artifacts
	@find . -name '*~' -exec rm -f {} +
	@find . -name '__pycache__' -exec rm -rf {} +

.PHONY: docker-build
docker-build:
	@echo "==> Building docker image for local testing"
	@docker build -t dag_factory:latest .

.PHONY: build-whl
build-whl: ## Build installable whl file
	python3 -m build --outdir dev/include/
	cd examples && ln -sf ../dev/dags/* .

.PHONY: docker-run
docker-run: docker-build ## Runs local Airflow for testing
	@docker run -d -e AIRFLOW__CORE__DAGS_FOLDER=/usr/local/airflow/dags -v $(PWD)/examples:/usr/local/airflow/dags -p 127.0.0.1:8080:8080 --name=dag_factory dag_factory:latest
	@echo "==> Airflow is running at http://localhost:8080"
docker-run: build-whl ## Runs local Airflow for testing
	@if ! lsof -i :8080 | grep LISTEN > /dev/null; then \
		cd dev && astro dev start; \
	else \
		cd dev && astro dev restart; \
	fi

.PHONY: docker-stop
docker-stop: ## Stop Docker container
	@docker stop dag_factory; docker rm dag_factory
	cd dev && astro dev stop
2 changes: 2 additions & 0 deletions dev/.astro/config.yaml
@@ -0,0 +1,2 @@
project:
  name: dev
1 change: 1 addition & 0 deletions dev/.astro/dag_integrity_exceptions.txt
@@ -0,0 +1 @@
# Add dag files to exempt from parse test below. ex: dags/<test-file>
130 changes: 130 additions & 0 deletions dev/.astro/test_dag_integrity_default.py
@@ -0,0 +1,130 @@
"""Test the validity of all DAGs. **USED BY DEV PARSE COMMAND DO NOT EDIT**"""

import logging
import os
from contextlib import contextmanager

import pytest
from airflow.hooks.base import BaseHook
from airflow.models import Connection, DagBag, Variable
from airflow.utils.db import initdb

# init airflow database
initdb()

# The following code patches errors caused by missing OS Variables, Airflow Connections, and Airflow Variables


# =========== MONKEYPATCH BaseHook.get_connection() ===========
def basehook_get_connection_monkeypatch(key: str, *args, **kwargs):
print(f"Attempted to fetch connection during parse returning an empty Connection object for {key}")
return Connection(key)


BaseHook.get_connection = basehook_get_connection_monkeypatch
# # =========== /MONKEYPATCH BASEHOOK.GET_CONNECTION() ===========


# =========== MONKEYPATCH OS.GETENV() ===========
def os_getenv_monkeypatch(key: str, *args, **kwargs):
    default = None
    if args:
        default = args[0]  # os.getenv should get at most 1 arg after the key
    if kwargs:
        default = kwargs.get("default", None)  # and sometimes kwarg if people are using the sig

    env_value = os.environ.get(key, None)

    if env_value:
        return env_value  # if the env_value is set, return it
    if key == "JENKINS_HOME" and default is None:  # fix https://github.com/astronomer/astro-cli/issues/601
        return None
    if default:
        return default  # otherwise return whatever default has been passed
    return f"MOCKED_{key.upper()}_VALUE"  # if absolutely nothing has been passed - return the mocked value


os.getenv = os_getenv_monkeypatch
# # =========== /MONKEYPATCH OS.GETENV() ===========

# =========== MONKEYPATCH VARIABLE.GET() ===========


class magic_dict(dict):
    def __init__(self, *args, **kwargs):
        self.update(*args, **kwargs)

    def __getitem__(self, key):
        return {}.get(key, "MOCKED_KEY_VALUE")


_no_default = object() # allow falsey defaults


def variable_get_monkeypatch(key: str, default_var=_no_default, deserialize_json=False):
print(f"Attempted to get Variable value during parse, returning a mocked value for {key}")

if default_var is not _no_default:
return default_var
if deserialize_json:
return magic_dict()
return "NON_DEFAULT_MOCKED_VARIABLE_VALUE"


Variable.get = variable_get_monkeypatch
# # =========== /MONKEYPATCH VARIABLE.GET() ===========


@contextmanager
def suppress_logging(namespace):
"""
Suppress logging within a specific namespace to keep tests "clean" during build
"""
logger = logging.getLogger(namespace)
old_value = logger.disabled
logger.disabled = True
try:
yield
finally:
logger.disabled = old_value


def get_import_errors():
"""
Generate a tuple for import errors in the dag bag, and include DAGs without errors.
"""
with suppress_logging("airflow"):
dag_bag = DagBag(include_examples=False)

def strip_path_prefix(path):
return os.path.relpath(path, os.environ.get("AIRFLOW_HOME"))

# Initialize an empty list to store the tuples
result = []

# Iterate over the items in import_errors
for k, v in dag_bag.import_errors.items():
result.append((strip_path_prefix(k), v.strip()))

# Check if there are DAGs without errors
for file_path in dag_bag.dags:
# Check if the file_path is not in import_errors, meaning no errors
if file_path not in dag_bag.import_errors:
result.append((strip_path_prefix(file_path), "No import errors"))

return result


@pytest.mark.parametrize("rel_path, rv", get_import_errors(), ids=[x[0] for x in get_import_errors()])
def test_file_imports(rel_path, rv):
"""Test for import errors on a file"""
if os.path.exists(".astro/dag_integrity_exceptions.txt"):
with open(".astro/dag_integrity_exceptions.txt", "r") as f:
exceptions = f.readlines()
print(f"Exceptions: {exceptions}")
if (rv != "No import errors") and rel_path not in exceptions:
# If rv is not "No import errors," consider it a failed test
raise Exception(f"{rel_path} failed to import with message \n {rv}")
else:
# If rv is "No import errors," consider it a passed test
print(f"{rel_path} passed the import test")
8 changes: 8 additions & 0 deletions dev/.dockerignore
@@ -0,0 +1,8 @@
astro
.git
.env
airflow_settings.yaml
logs/
.venv
airflow.db
airflow.cfg
11 changes: 11 additions & 0 deletions dev/.gitignore
@@ -0,0 +1,11 @@
.git
.env
.DS_Store
airflow_settings.yaml
__pycache__/
astro
.venv
airflow-webserver.pid
webserver_config.py
airflow.cfg
airflow.db
5 changes: 5 additions & 0 deletions dev/Dockerfile
@@ -0,0 +1,5 @@
FROM quay.io/astronomer/astro-runtime:12.2.0

ENV CONFIG_ROOT_DIR=/usr/local/airflow/dags/

RUN pip install /usr/local/airflow/include/*.whl
48 changes: 48 additions & 0 deletions dev/README.md
@@ -0,0 +1,48 @@
Overview
========

Welcome to Astronomer! This project was generated after you ran 'astro dev init' using the Astronomer CLI. This readme describes the contents of the project, as well as how to run Apache Airflow on your local machine.

Project Contents
================

Your Astro project contains the following files and folders:

- dags: This folder contains the Python files for your Airflow DAGs. By default, this directory includes one example DAG:
    - `example_astronauts`: This DAG shows a simple ETL pipeline example that queries the list of astronauts currently in space from the Open Notify API and prints a statement for each astronaut. The DAG uses the TaskFlow API to define tasks in Python, and dynamic task mapping to dynamically print a statement for each astronaut. For more on how this DAG works, see our [Getting started tutorial](https://www.astronomer.io/docs/learn/get-started-with-airflow).
- Dockerfile: This file contains a versioned Astro Runtime Docker image that provides a differentiated Airflow experience. If you want to execute other commands or overrides at runtime, specify them here.
- include: This folder contains any additional files that you want to include as part of your project. It is empty by default.
- packages.txt: Install OS-level packages needed for your project by adding them to this file. It is empty by default.
- requirements.txt: Install Python packages needed for your project by adding them to this file. It is empty by default.
- plugins: Add custom or community plugins for your project to this file. It is empty by default.
- airflow_settings.yaml: Use this local-only file to specify Airflow Connections, Variables, and Pools instead of entering them in the Airflow UI as you develop DAGs in this project.

Deploy Your Project Locally
===========================

1. Start Airflow on your local machine by running 'astro dev start'.

This command will spin up 4 Docker containers on your machine, each for a different Airflow component:

- Postgres: Airflow's Metadata Database
- Webserver: The Airflow component responsible for rendering the Airflow UI
- Scheduler: The Airflow component responsible for monitoring and triggering tasks
- Triggerer: The Airflow component responsible for triggering deferred tasks

2. Verify that all 4 Docker containers were created by running 'docker ps'.

Note: Running 'astro dev start' will start your project with the Airflow Webserver exposed at port 8080 and Postgres exposed at port 5432. If you already have either of those ports allocated, you can either [stop your existing Docker containers or change the port](https://www.astronomer.io/docs/astro/cli/troubleshoot-locally#ports-are-not-available-for-my-local-airflow-webserver).

3. Access the Airflow UI for your local Airflow project. To do so, go to http://localhost:8080/ and log in with 'admin' for both your Username and Password.

You should also be able to access your Postgres Database at 'localhost:5432/postgres'.

Deploy Your Project to Astronomer
=================================

If you have an Astronomer account, pushing code to a Deployment on Astronomer is simple. For deploying instructions, refer to Astronomer documentation: https://www.astronomer.io/docs/astro/deploy-code/

Contact
=======

The Astronomer CLI is maintained with love by the Astronomer team. To report a bug or suggest a change, reach out to our support.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
17 changes: 17 additions & 0 deletions dev/dags/example_customize_operator.py
@@ -0,0 +1,17 @@
import os
from pathlib import Path

# The following import is here so Airflow parses this file
# from airflow import DAG
import dagfactory

DEFAULT_CONFIG_ROOT_DIR = "/usr/local/airflow/dags/"
CONFIG_ROOT_DIR = Path(os.getenv("CONFIG_ROOT_DIR", DEFAULT_CONFIG_ROOT_DIR))

config_file = str(CONFIG_ROOT_DIR / "example_customize_operator.yml")

example_dag_factory = dagfactory.DagFactory(config_file)

# Creating task dependencies
example_dag_factory.clean_dags(globals())
example_dag_factory.generate_dags(globals())
46 changes: 46 additions & 0 deletions dev/dags/example_customize_operator.yml
@@ -0,0 +1,46 @@
default:
  default_args:
    owner: "default_owner"
    start_date: 2020-01-01
    retries: 1
    retry_delay_sec: 300
  concurrency: 1
  max_active_runs: 1
  dagrun_timeout_sec: 600
  default_view: "tree"
  orientation: "LR"
  schedule_interval: "0 1 * * *"

example_breadfast:
  default_args:
    owner: "custom_owner"
    start_date: 2 days
  description: "this is an customized operator dag"
  schedule_interval: "0 3 * * *"
  tasks:
    begin:
      operator: airflow.operators.dummy_operator.DummyOperator
    make_bread_1:
      operator: customized.operators.breakfast_operators.MakeBreadOperator
      bread_type: 'Sourdough'
      dependencies:
        - begin
    make_bread_2:
      operator: customized.operators.breakfast_operators.MakeBreadOperator
      bread_type: 'Multigrain'
      dependencies:
        - begin
    make_coffee_1:
      operator: customized.operators.breakfast_operators.MakeCoffeeOperator
      coffee_type: 'Black'
      dependencies:
        - begin
        - make_bread_1
        - make_bread_2
    end:
      operator: airflow.operators.dummy_operator.DummyOperator
      dependencies:
        - begin
        - make_bread_1
        - make_bread_2
        - make_coffee_1
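
Note: the `customized.operators.breakfast_operators` classes referenced in the YAML above ship alongside the example DAGs (they are among the files moved under `dev/dags/` in this commit, shown as renames below). For illustration only, here is a minimal sketch of what such custom operators could look like; the class names and config fields come from the YAML, but the bodies are assumptions, not the repository's actual implementation.

# Illustrative sketch only -- not part of this commit; the real code lives in
# customized/operators/breakfast_operators.py, and these bodies are assumptions.
from airflow.models.baseoperator import BaseOperator


class MakeBreadOperator(BaseOperator):
    """Pretend to bake a loaf of bread of the given type."""

    template_fields = ("bread_type",)

    def __init__(self, bread_type: str, **kwargs):
        super().__init__(**kwargs)
        self.bread_type = bread_type

    def execute(self, context):
        # Log which bread is being "made"; a real operator would do the work here.
        self.log.info("Making %s bread", self.bread_type)


class MakeCoffeeOperator(BaseOperator):
    """Pretend to brew a cup of coffee of the given type."""

    template_fields = ("coffee_type",)

    def __init__(self, coffee_type: str, **kwargs):
        super().__init__(**kwargs)
        self.coffee_type = coffee_type

    def execute(self, context):
        # Log which coffee is being "brewed".
        self.log.info("Brewing %s coffee", self.coffee_type)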
17 changes: 17 additions & 0 deletions dev/dags/example_dag_factory.py
@@ -0,0 +1,17 @@
import os
from pathlib import Path

# The following import is here so Airflow parses this file
# from airflow import DAG
import dagfactory

DEFAULT_CONFIG_ROOT_DIR = "/usr/local/airflow/dags/"
CONFIG_ROOT_DIR = Path(os.getenv("CONFIG_ROOT_DIR", DEFAULT_CONFIG_ROOT_DIR))

config_file = str(CONFIG_ROOT_DIR / "example_dag_factory.yml")

example_dag_factory = dagfactory.DagFactory(config_file)

# Creating task dependencies
example_dag_factory.clean_dags(globals())
example_dag_factory.generate_dags(globals())
