diff --git a/PRIVACY_NOTICE.md b/PRIVACY_NOTICE.md index f40eb5cf..d70458a6 100644 --- a/PRIVACY_NOTICE.md +++ b/PRIVACY_NOTICE.md @@ -1,3 +1,36 @@ # Privacy Notice -This project follows the [Privacy Policy of Astronomer](https://www.astronomer.io/privacy/) +This project follows the [Privacy Policy of Astronomer](https://www.astronomer.io/privacy/). + +## Collection of Data + +DAG Factory integrates [Scarf](https://about.scarf.sh/) to collect basic telemetry data during operation. +This data assists the project maintainers in better understanding how DAG Factory is used. +Insights gained from this telemetry are critical for prioritizing patches, minor releases, and +security fixes. Additionally, this information supports key decisions related to the development roadmap. + +Deployments and individual users can opt out of analytics by setting the configuration: + +``` +[dag_factory] enable_telemetry = False +``` + +As described in the [official documentation](https://docs.scarf.sh/gateway/#do-not-track), it is also possible to opt out by setting one of the following environment variables: + +```commandline +DO_NOT_TRACK=True +SCARF_NO_ANALYTICS=True +``` + +In addition to Scarf's default data collection, DAG Factory collects the following information: + +- DAG Factory version +- Airflow version +- Python version +- Operating system & machine architecture +- Event type +- Number of DAGs +- Number of TaskGroups +- Number of Tasks + +No user-identifiable information (IP included) is stored in Scarf. 
diff --git a/dagfactory/__init__.py b/dagfactory/__init__.py index 4f14b38e..e1bfffd3 100644 --- a/dagfactory/__init__.py +++ b/dagfactory/__init__.py @@ -2,6 +2,7 @@ from .dagfactory import DagFactory, load_yaml_dags +__version__ = "0.20.0a1" __all__ = [ "DagFactory", "load_yaml_dags", diff --git a/dagfactory/__version__.py b/dagfactory/__version__.py deleted file mode 100644 index 9af95997..00000000 --- a/dagfactory/__version__.py +++ /dev/null @@ -1,3 +0,0 @@ -"""Module contains the version of dag-factory""" - -__version__ = "0.19.0" diff --git a/dagfactory/constants.py b/dagfactory/constants.py new file mode 100644 index 00000000..ac7bb6d7 --- /dev/null +++ b/dagfactory/constants.py @@ -0,0 +1,3 @@ +TELEMETRY_URL = "https://astronomer.gateway.scarf.sh/dag-factory/{telemetry_version}/{dagfactory_version}/{airflow_version}/{python_version}/{platform_system}/{platform_machine}?{query_string}" +TELEMETRY_VERSION = "v1" +TELEMETRY_TIMEOUT = 5.0 diff --git a/dagfactory/dagbuilder.py b/dagfactory/dagbuilder.py index a94eb5a1..cdb90053 100644 --- a/dagfactory/dagbuilder.py +++ b/dagfactory/dagbuilder.py @@ -135,6 +135,8 @@ def __init__(self, dag_name: str, dag_config: Dict[str, Any], default_config: Di self.dag_name: str = dag_name self.dag_config: Dict[str, Any] = deepcopy(dag_config) self.default_config: Dict[str, Any] = deepcopy(default_config) + self.tasks_count: int = 0 + self.taskgroups_count: int = 0 # pylint: disable=too-many-branches,too-many-statements def get_dag_params(self) -> Dict[str, Any]: @@ -793,12 +795,14 @@ def build(self) -> Dict[str, Union[str, DAG]]: dag.tags = dag_params.get("tags", None) tasks: Dict[str, Dict[str, Any]] = dag_params["tasks"] + self.tasks_count = len(tasks) # add a property to mark this dag as an auto-generated on dag.is_dagfactory_auto_generated = True # create dictionary of task groups task_groups_dict: Dict[str, "TaskGroup"] = self.make_task_groups(dag_params.get("task_groups", {}), dag) + self.taskgroups_count = 
len(task_groups_dict) # create dictionary to track tasks and set dependencies tasks_dict: Dict[str, BaseOperator] = {} diff --git a/dagfactory/dagfactory.py b/dagfactory/dagfactory.py index 0864c1ca..fdb7c111 100644 --- a/dagfactory/dagfactory.py +++ b/dagfactory/dagfactory.py @@ -10,6 +10,7 @@ from airflow.configuration import conf as airflow_conf from airflow.models import DAG +from dagfactory import telemetry from dagfactory.dagbuilder import DagBuilder from dagfactory.exceptions import DagFactoryConfigException, DagFactoryException @@ -29,6 +30,9 @@ class DagFactory: """ def __init__(self, config_filepath: Optional[str] = None, config: Optional[dict] = None) -> None: + self.dags_count: int = 0 + self.tasks_count: int = 0 + self.taskgroups_count: int = 0 assert bool(config_filepath) ^ bool(config), "Either `config_filepath` or `config` should be provided" if config_filepath: DagFactory._validate_config_filepath(config_filepath=config_filepath) @@ -106,9 +110,21 @@ def build_dags(self) -> Dict[str, DAG]: dags[dag["dag_id"]]: DAG = dag["dag"] except Exception as err: raise DagFactoryException(f"Failed to generate dag {dag_name}. 
verify config is correct") from err + else: + self.dags_count += 1 + self.taskgroups_count += dag_builder.taskgroups_count + self.tasks_count += dag_builder.tasks_count return dags + def emit_telemetry(self, event_type: str) -> None: + additional_telemetry_metrics = { + "dags_count": self.dags_count, + "tasks_count": self.tasks_count, + "taskgroups_count": self.taskgroups_count, + } + telemetry.emit_usage_metrics_if_enabled(event_type, additional_telemetry_metrics) + # pylint: disable=redefined-builtin @staticmethod def register_dags(dags: Dict[str, DAG], globals: Dict[str, Any]) -> None: @@ -130,6 +146,7 @@ def generate_dags(self, globals: Dict[str, Any]) -> None: """ dags: Dict[str, Any] = self.build_dags() self.register_dags(dags, globals) + self.emit_telemetry("generate_dags") def clean_dags(self, globals: Dict[str, Any]) -> None: """ @@ -153,6 +170,8 @@ def clean_dags(self, globals: Dict[str, Any]) -> None: for dag_to_remove in dags_to_remove: del globals[dag_to_remove] + self.emit_telemetry("clean_dags") + # pylint: enable=redefined-builtin @@ -183,7 +202,10 @@ def load_yaml_dags( config_file_abs_path = str(config_file_path.absolute()) logging.info("Loading %s", config_file_abs_path) try: - DagFactory(config_file_abs_path).generate_dags(globals_dict) - logging.info("DAG loaded: %s", config_file_path) + factory = DagFactory(config_file_abs_path) + factory.generate_dags(globals_dict) except Exception: # pylint: disable=broad-except logging.exception("Failed to load dag from %s", config_file_path) + else: + factory.emit_telemetry("load_yaml_dags") + logging.info("DAG loaded: %s", config_file_path) diff --git a/dagfactory/settings.py b/dagfactory/settings.py new file mode 100644 index 00000000..12252263 --- /dev/null +++ b/dagfactory/settings.py @@ -0,0 +1,20 @@ +from __future__ import annotations + +import os + +from airflow.configuration import conf + + +def convert_to_boolean(value: str | None) -> bool: + """ + Convert a string that represents a boolean to a 
Python boolean. + """ + value = str(value).lower().strip() + if value in ("f", "false", "0", "", "none"): + return False + return True + + +enable_telemetry = conf.getboolean("dag_factory", "enable_telemetry", fallback=True) +do_not_track = convert_to_boolean(os.getenv("DO_NOT_TRACK")) +no_analytics = convert_to_boolean(os.getenv("SCARF_NO_ANALYTICS")) diff --git a/dagfactory/telemetry.py b/dagfactory/telemetry.py new file mode 100644 index 00000000..2fe3051f --- /dev/null +++ b/dagfactory/telemetry.py @@ -0,0 +1,73 @@ +from __future__ import annotations + +import logging +import platform +from urllib.parse import urlencode + +import httpx +from airflow import __version__ as airflow_version + +import dagfactory +from dagfactory import constants, settings + + +def should_emit() -> bool: + """ + Identify if telemetry metrics should be emitted or not. + """ + return settings.enable_telemetry and not settings.do_not_track and not settings.no_analytics + + +def collect_standard_usage_metrics() -> dict[str, object]: + """ + Return standard telemetry metrics. + """ + metrics = { + "dagfactory_version": dagfactory.__version__, + "airflow_version": airflow_version, + "python_version": platform.python_version(), + "platform_system": platform.system(), + "platform_machine": platform.machine(), + "variables": {}, + } + return metrics + + +def emit_usage_metrics(metrics: dict[str, object]) -> bool: + """ + Emit desired telemetry metrics to remote telemetry endpoint. + + The metrics must contain the necessary fields to build the TELEMETRY_URL. + """ + query_string = urlencode(metrics) + telemetry_url = constants.TELEMETRY_URL.format( + **metrics, telemetry_version=constants.TELEMETRY_VERSION, query_string=query_string + ) + logging.debug("Telemetry is enabled. 
Emitting the following usage metrics to %s: %s", telemetry_url, metrics) + response = httpx.get(telemetry_url, timeout=constants.TELEMETRY_TIMEOUT) + if not response.is_success: + logging.warning( + "Unable to emit usage metrics to %s. Status code: %s. Message: %s", + telemetry_url, + response.status_code, + response.text, + ) + return response.is_success + + +def emit_usage_metrics_if_enabled(event_type: str, additional_metrics: dict[str, object]) -> bool: + """ + Checks if telemetry should be emitted, fetch standard metrics, complement with custom metrics + and emit them to remote telemetry endpoint. + + :returns: If the event was successfully sent to the telemetry backend or not. + """ + if should_emit(): + metrics = collect_standard_usage_metrics() + metrics["type"] = event_type + metrics["variables"].update(additional_metrics) + is_success = emit_usage_metrics(metrics) + return is_success + else: + logging.debug("Telemetry is disabled. To enable it, export AIRFLOW__DAG_FACTORY__ENABLE_TELEMETRY=True.") + return False diff --git a/pyproject.toml b/pyproject.toml index 5daab940..0cc61a2b 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -57,6 +57,7 @@ tests = [ dependencies = [ "dag-factory[tests]", "apache-airflow~={matrix:airflow}.0,!=2.9.0,!=2.9.1", # https://github.com/apache/airflow/pull/39670 + "httpx>=0.25.0" ] pre-install-commands = ["sh scripts/test/pre-install-airflow.sh {matrix:airflow} {matrix:python}"] @@ -79,7 +80,7 @@ test-cov = 'sh scripts/test/unit-cov.sh' Source = "https://github.com/astronomer/dag-factory" [tool.hatch.version] -path = "dagfactory/__version__.py" +path = "dagfactory/__init__.py" [tool.hatch.build.targets.sdist] include = ["dagfactory"] @@ -90,6 +91,7 @@ packages = ["dagfactory"] [tool.pytest.ini_options] filterwarnings = ["ignore::DeprecationWarning"] minversion = "6.0" +markers = ["integration"] ###################################### # THIRD PARTY TOOLS diff --git a/tests/fixtures/dag_factory_variables_as_arguments.yml 
b/tests/fixtures/dag_factory_variables_as_arguments.yml index ac1730ba..f94a4566 100644 --- a/tests/fixtures/dag_factory_variables_as_arguments.yml +++ b/tests/fixtures/dag_factory_variables_as_arguments.yml @@ -33,3 +33,14 @@ example_dag: variables_as_arguments : [ {"variable":"var1","attribute":"bash_command"} ] + +second_example_dag: + default_args: + owner: 'custom_owner' + start_date: 3 days + description: 'this is a second example dag' + schedule_interval: '0 6 * * *' + tasks: + task_0: + operator: airflow.operators.bash_operator.BashOperator + bash_command: 'echo 1' diff --git a/tests/test_dagfactory.py b/tests/test_dagfactory.py index 9b8036dd..c195fa00 100644 --- a/tests/test_dagfactory.py +++ b/tests/test_dagfactory.py @@ -1,6 +1,7 @@ import datetime import logging import os +from unittest.mock import patch import pytest from airflow import __version__ as AIRFLOW_VERSION @@ -433,9 +434,14 @@ def test_load_invalid_yaml_logs_error(caplog): assert caplog.messages == ["Failed to load dag from tests/fixtures/invalid_yaml.yml"] -def test_load_yaml_dags_succeed(): +@patch("dagfactory.telemetry.emit_usage_metrics_if_enabled") +def test_load_yaml_dags_succeed(mock_emit_usage_metrics_if_enabled): load_yaml_dags( globals_dict=globals(), dags_folder="tests/fixtures", suffix=["dag_factory_variables_as_arguments.yml"], ) + # Confirm the representative telemetry for all the DAGs defined in the desired YAML is being sent + args = mock_emit_usage_metrics_if_enabled.call_args.args + assert args[0] == "load_yaml_dags" + assert args[1] == {"dags_count": 2, "tasks_count": 4, "taskgroups_count": 0} diff --git a/tests/test_settings.py b/tests/test_settings.py new file mode 100644 index 00000000..480e3388 --- /dev/null +++ b/tests/test_settings.py @@ -0,0 +1,20 @@ +import pytest + +from dagfactory import settings + + +@pytest.mark.parametrize( + "value,expected_response", + [ + ("f", False), + ("false", False), + ("0", False), + ("", False), + ("none", False), + ("True", True), 
+ ("true", True), + ("1", True), + ], +) +def test_convert_to_boolean(value, expected_response): + assert settings.convert_to_boolean(value) == expected_response diff --git a/tests/test_telemetry.py b/tests/test_telemetry.py new file mode 100644 index 00000000..6e8f65ef --- /dev/null +++ b/tests/test_telemetry.py @@ -0,0 +1,101 @@ +import logging +from unittest.mock import patch + +import pytest + +from dagfactory import telemetry + + +def test_should_emit_is_true_by_default(): + assert telemetry.should_emit() + + +@patch("dagfactory.settings.enable_telemetry", True) +def test_should_emit_is_true_when_only_enable_telemetry_is_true(): + assert telemetry.should_emit() + + +@patch("dagfactory.settings.do_not_track", True) +def test_should_emit_is_false_when_do_not_track(): + assert not telemetry.should_emit() + + +@patch("dagfactory.settings.no_analytics", True) +def test_should_emit_is_false_when_no_analytics(): + assert not telemetry.should_emit() + + +def test_collect_standard_usage_metrics(): + metrics = telemetry.collect_standard_usage_metrics() + expected_keys = [ + "airflow_version", + "dagfactory_version", + "platform_machine", + "platform_system", + "python_version", + "variables", + ] + assert sorted(metrics.keys()) == expected_keys + + +class MockFailedResponse: + is_success = False + status_code = "404" + text = "Non existent URL" + + +@patch("dagfactory.telemetry.httpx.get", return_value=MockFailedResponse()) +def test_emit_usage_metrics_fails(mock_httpx_get, caplog): + sample_metrics = { + "dagfactory_version": "0.2.0a1", + "airflow_version": "2.10.1", + "python_version": "3.11", + "platform_system": "darwin", + "platform_machine": "amd64", + "variables": {"a": 1, "b": 2}, + } + is_success = telemetry.emit_usage_metrics(sample_metrics) + mock_httpx_get.assert_called_once_with( 
"https://astronomer.gateway.scarf.sh/dag-factory/v1/0.2.0a1/2.10.1/3.11/darwin/amd64?dagfactory_version=0.2.0a1&airflow_version=2.10.1&python_version=3.11&platform_system=darwin&platform_machine=amd64&variables=%7B%27a%27%3A+1%2C+%27b%27%3A+2%7D", + timeout=5.0, + ) + assert not is_success + log_msg = "Unable to emit usage metrics to https://astronomer.gateway.scarf.sh/dag-factory/v1/0.2.0a1/2.10.1/3.11/darwin/amd64?dagfactory_version=0.2.0a1&airflow_version=2.10.1&python_version=3.11&platform_system=darwin&platform_machine=amd64&variables=%7B%27a%27%3A+1%2C+%27b%27%3A+2%7D. Status code: 404. Message: Non existent URL" + assert caplog.text.startswith("WARNING") + assert log_msg in caplog.text + + +@pytest.mark.integration +def test_emit_usage_metrics_succeeds(caplog): + caplog.set_level(logging.DEBUG) + sample_metrics = { + "dagfactory_version": "0.2.0a1", + "airflow_version": "2.10.1", + "python_version": "3.11", + "platform_system": "darwin", + "platform_machine": "amd64", + "variables": {"a": 1, "b": 2}, + } + is_success = telemetry.emit_usage_metrics(sample_metrics) + assert is_success + assert caplog.text.startswith("DEBUG") + assert "Telemetry is enabled. Emitting the following usage metrics to" in caplog.text + + +@patch("dagfactory.telemetry.should_emit", return_value=False) +def test_emit_usage_metrics_if_enabled_fails(mock_should_emit, caplog): + caplog.set_level(logging.DEBUG) + assert not telemetry.emit_usage_metrics_if_enabled("any", {}) + assert caplog.text.startswith("DEBUG") + assert "Telemetry is disabled. To enable it, export AIRFLOW__DAG_FACTORY__ENABLE_TELEMETRY=True." 
in caplog.text + + +@patch("dagfactory.telemetry.should_emit", return_value=True) +@patch("dagfactory.telemetry.collect_standard_usage_metrics", return_value={"k1": "v1", "variables": {}}) +@patch("dagfactory.telemetry.emit_usage_metrics") +def test_emit_usage_metrics_if_enabled_succeeds( + mock_emit_usage_metrics, mock_collect_standard_usage_metrics, mock_should_emit +): + assert telemetry.emit_usage_metrics_if_enabled("any", {"k2": "v2"}) + mock_emit_usage_metrics.assert_called_once() + assert mock_emit_usage_metrics.call_args.args[0] == {"k1": "v1", "variables": {"k2": "v2"}, "type": "any"}