From 01e0871ec7bc3a58d94b2552caeeab6a855d8338 Mon Sep 17 00:00:00 2001 From: Tatiana Al-Chueyr Date: Thu, 17 Oct 2024 09:25:13 +0100 Subject: [PATCH] Support telemetry using Scarf (#250) Export telemetry related to DAG Factory usage to [Scarf](https://about.scarf.sh/). This data assists the project maintainers in better understanding how DAG Factory is used. Insights from this telemetry are critical for prioritizing patches, minor releases, and security fixes. Additionally, this information supports critical decisions related to the development road map. Deployments and individual users can opt out of analytics by setting the configuration: ``` [dag_factory] enable_telemetry False ``` As described in the [official documentation](https://docs.scarf.sh/gateway/#do-not-track), it is also possible to opt-out by setting one of the following environment variables: ```commandline AIRFLOW__DAG_FACTORY__ENABLE_TELEMETRY=False DO_NOT_TRACK=True SCARF_NO_ANALYTICS=True ``` In addition to Scarf's default data collection, DAG Factory collects the following information: - DAG Factory version - Airflow version - Python version - Operating system & machine architecture - Event type - Number of DAGs - Number of TaskGroups - Number of Tasks No user-identifiable information (IP included) is stored in Scarf, even though Scarf infers information from the IP, such as location, and stores that. The data collection is GDPR compliant. The data is not currently being emitted for pre-releases except from integration tests. The Apache Foundation supports this same strategy in many of its OpenSource projects, including Airflow ([#39510](https://github.com/apache/airflow/pull/39510)). 
Example of visualisation of the data via the Scarf UI: Screenshot 2024-10-17 at 01 56 09 Screenshot 2024-10-17 at 01 55 59 Screenshot 2024-10-17 at 01 55 47 Screenshot 2024-10-17 at 01 55 42 Closes: #214 --- PRIVACY_NOTICE.md | 35 +++++- dagfactory/__init__.py | 1 + dagfactory/__version__.py | 3 - dagfactory/constants.py | 3 + dagfactory/dagbuilder.py | 4 + dagfactory/dagfactory.py | 26 ++++- dagfactory/settings.py | 20 ++++ dagfactory/telemetry.py | 73 +++++++++++++ pyproject.toml | 4 +- .../dag_factory_variables_as_arguments.yml | 11 ++ tests/test_dagfactory.py | 8 +- tests/test_settings.py | 20 ++++ tests/test_telemetry.py | 101 ++++++++++++++++++ 13 files changed, 301 insertions(+), 8 deletions(-) delete mode 100644 dagfactory/__version__.py create mode 100644 dagfactory/constants.py create mode 100644 dagfactory/settings.py create mode 100644 dagfactory/telemetry.py create mode 100644 tests/test_settings.py create mode 100644 tests/test_telemetry.py diff --git a/PRIVACY_NOTICE.md b/PRIVACY_NOTICE.md index f40eb5cf..d70458a6 100644 --- a/PRIVACY_NOTICE.md +++ b/PRIVACY_NOTICE.md @@ -1,3 +1,36 @@ # Privacy Notice -This project follows the [Privacy Policy of Astronomer](https://www.astronomer.io/privacy/) +This project follows the [Privacy Policy of Astronomer](https://www.astronomer.io/privacy/). + +## Collection of Data + +DAG Factory integrates [Scarf](https://about.scarf.sh/) to collect basic telemetry data during operation. +This data assists the project maintainers in better understanding how DAG Factory is used. +Insights gained from this telemetry are critical for prioritizing patches, minor releases, and +security fixes. Additionally, this information supports key decisions related to the development road map. 
+ +Deployments and individual users can opt-out of analytics by setting the configuration: + +``` +[dag_factory] enable_telemetry False +``` + +As described in the [official documentation](https://docs.scarf.sh/gateway/#do-not-track), it is also possible to opt out by setting one of the following environment variables: + +```commandline +DO_NOT_TRACK=True +SCARF_NO_ANALYTICS=True +``` + +In addition to Scarf's default data collection, DAG Factory collects the following information: + +- DAG Factory version +- Airflow version +- Python version +- Operating system & machine architecture +- Event type +- Number of DAGs +- Number of TaskGroups +- Number of Tasks + +No user-identifiable information (IP included) is stored in Scarf. diff --git a/dagfactory/__init__.py b/dagfactory/__init__.py index 4f14b38e..e1bfffd3 100644 --- a/dagfactory/__init__.py +++ b/dagfactory/__init__.py @@ -2,6 +2,7 @@ from .dagfactory import DagFactory, load_yaml_dags +__version__ = "0.20.0a1" __all__ = [ "DagFactory", "load_yaml_dags", diff --git a/dagfactory/__version__.py b/dagfactory/__version__.py deleted file mode 100644 index 9af95997..00000000 --- a/dagfactory/__version__.py +++ /dev/null @@ -1,3 +0,0 @@ -"""Module contains the version of dag-factory""" - -__version__ = "0.19.0" diff --git a/dagfactory/constants.py b/dagfactory/constants.py new file mode 100644 index 00000000..ac7bb6d7 --- /dev/null +++ b/dagfactory/constants.py @@ -0,0 +1,3 @@ +TELEMETRY_URL = "https://astronomer.gateway.scarf.sh/dag-factory/{telemetry_version}/{dagfactory_version}/{airflow_version}/{python_version}/{platform_system}/{platform_machine}?{query_string}" +TELEMETRY_VERSION = "v1" +TELEMETRY_TIMEOUT = 5.0 diff --git a/dagfactory/dagbuilder.py b/dagfactory/dagbuilder.py index a94eb5a1..cdb90053 100644 --- a/dagfactory/dagbuilder.py +++ b/dagfactory/dagbuilder.py @@ -135,6 +135,8 @@ def __init__(self, dag_name: str, dag_config: Dict[str, Any], default_config: Di self.dag_name: str = dag_name 
self.dag_config: Dict[str, Any] = deepcopy(dag_config) self.default_config: Dict[str, Any] = deepcopy(default_config) + self.tasks_count: int = 0 + self.taskgroups_count: int = 0 # pylint: disable=too-many-branches,too-many-statements def get_dag_params(self) -> Dict[str, Any]: @@ -793,12 +795,14 @@ def build(self) -> Dict[str, Union[str, DAG]]: dag.tags = dag_params.get("tags", None) tasks: Dict[str, Dict[str, Any]] = dag_params["tasks"] + self.tasks_count = len(tasks) # add a property to mark this dag as an auto-generated on dag.is_dagfactory_auto_generated = True # create dictionary of task groups task_groups_dict: Dict[str, "TaskGroup"] = self.make_task_groups(dag_params.get("task_groups", {}), dag) + self.taskgroups_count = len(task_groups_dict) # create dictionary to track tasks and set dependencies tasks_dict: Dict[str, BaseOperator] = {} diff --git a/dagfactory/dagfactory.py b/dagfactory/dagfactory.py index 0864c1ca..fdb7c111 100644 --- a/dagfactory/dagfactory.py +++ b/dagfactory/dagfactory.py @@ -10,6 +10,7 @@ from airflow.configuration import conf as airflow_conf from airflow.models import DAG +from dagfactory import telemetry from dagfactory.dagbuilder import DagBuilder from dagfactory.exceptions import DagFactoryConfigException, DagFactoryException @@ -29,6 +30,9 @@ class DagFactory: """ def __init__(self, config_filepath: Optional[str] = None, config: Optional[dict] = None) -> None: + self.dags_count: int = 0 + self.tasks_count: int = 0 + self.taskgroups_count: int = 0 assert bool(config_filepath) ^ bool(config), "Either `config_filepath` or `config` should be provided" if config_filepath: DagFactory._validate_config_filepath(config_filepath=config_filepath) @@ -106,9 +110,21 @@ def build_dags(self) -> Dict[str, DAG]: dags[dag["dag_id"]]: DAG = dag["dag"] except Exception as err: raise DagFactoryException(f"Failed to generate dag {dag_name}. 
verify config is correct") from err + else: + self.dags_count += 1 + self.taskgroups_count += dag_builder.taskgroups_count + self.tasks_count += dag_builder.tasks_count return dags + def emit_telemetry(self, event_type: str) -> None: + additional_telemetry_metrics = { + "dags_count": self.dags_count, + "tasks_count": self.tasks_count, + "taskgroups_count": self.taskgroups_count, + } + telemetry.emit_usage_metrics_if_enabled(event_type, additional_telemetry_metrics) + # pylint: disable=redefined-builtin @staticmethod def register_dags(dags: Dict[str, DAG], globals: Dict[str, Any]) -> None: @@ -130,6 +146,7 @@ def generate_dags(self, globals: Dict[str, Any]) -> None: """ dags: Dict[str, Any] = self.build_dags() self.register_dags(dags, globals) + self.emit_telemetry("generate_dags") def clean_dags(self, globals: Dict[str, Any]) -> None: """ @@ -153,6 +170,8 @@ def clean_dags(self, globals: Dict[str, Any]) -> None: for dag_to_remove in dags_to_remove: del globals[dag_to_remove] + self.emit_telemetry("clean_dags") + # pylint: enable=redefined-builtin @@ -183,7 +202,10 @@ def load_yaml_dags( config_file_abs_path = str(config_file_path.absolute()) logging.info("Loading %s", config_file_abs_path) try: - DagFactory(config_file_abs_path).generate_dags(globals_dict) - logging.info("DAG loaded: %s", config_file_path) + factory = DagFactory(config_file_abs_path) + factory.generate_dags(globals_dict) except Exception: # pylint: disable=broad-except logging.exception("Failed to load dag from %s", config_file_path) + else: + factory.emit_telemetry("load_yaml_dags") + logging.info("DAG loaded: %s", config_file_path) diff --git a/dagfactory/settings.py b/dagfactory/settings.py new file mode 100644 index 00000000..12252263 --- /dev/null +++ b/dagfactory/settings.py @@ -0,0 +1,20 @@ +from __future__ import annotations + +import os + +from airflow.configuration import conf + + +def convert_to_boolean(value: str | None) -> bool: + """ + Convert a string that represents a boolean to a 
Python boolean. + """ + value = str(value).lower().strip() + if value in ("f", "false", "0", "", "none"): + return False + return True + + +enable_telemetry = conf.getboolean("dag_factory", "enable_telemetry", fallback=True) +do_not_track = convert_to_boolean(os.getenv("DO_NOT_TRACK")) +no_analytics = convert_to_boolean(os.getenv("SCARF_NO_ANALYTICS")) diff --git a/dagfactory/telemetry.py b/dagfactory/telemetry.py new file mode 100644 index 00000000..2fe3051f --- /dev/null +++ b/dagfactory/telemetry.py @@ -0,0 +1,73 @@ +from __future__ import annotations + +import logging +import platform +from urllib.parse import urlencode + +import httpx +from airflow import __version__ as airflow_version + +import dagfactory +from dagfactory import constants, settings + + +def should_emit() -> bool: + """ + Identify if telemetry metrics should be emitted or not. + """ + return settings.enable_telemetry and not settings.do_not_track and not settings.no_analytics + + +def collect_standard_usage_metrics() -> dict[str, object]: + """ + Return standard telemetry metrics. + """ + metrics = { + "dagfactory_version": dagfactory.__version__, + "airflow_version": airflow_version, + "python_version": platform.python_version(), + "platform_system": platform.system(), + "platform_machine": platform.machine(), + "variables": {}, + } + return metrics + + +def emit_usage_metrics(metrics: dict[str, object]) -> bool: + """ + Emit desired telemetry metrics to remote telemetry endpoint. + + The metrics must contain the necessary fields to build the TELEMETRY_URL. + """ + query_string = urlencode(metrics) + telemetry_url = constants.TELEMETRY_URL.format( + **metrics, telemetry_version=constants.TELEMETRY_VERSION, query_string=query_string + ) + logging.debug("Telemetry is enabled. 
Emitting the following usage metrics to %s: %s", telemetry_url, metrics) + response = httpx.get(telemetry_url, timeout=constants.TELEMETRY_TIMEOUT) + if not response.is_success: + logging.warning( + "Unable to emit usage metrics to %s. Status code: %s. Message: %s", + telemetry_url, + response.status_code, + response.text, + ) + return response.is_success + + +def emit_usage_metrics_if_enabled(event_type: str, additional_metrics: dict[str, object]) -> bool: + """ + Checks if telemetry should be emitted, fetch standard metrics, complement with custom metrics + and emit them to remote telemetry endpoint. + + :returns: If the event was successfully sent to the telemetry backend or not. + """ + if should_emit(): + metrics = collect_standard_usage_metrics() + metrics["type"] = event_type + metrics["variables"].update(additional_metrics) + is_success = emit_usage_metrics(metrics) + return is_success + else: + logging.debug("Telemetry is disabled. To enable it, export AIRFLOW__DAG_FACTORY__ENABLE_TELEMETRY=True.") + return False diff --git a/pyproject.toml b/pyproject.toml index 5daab940..0cc61a2b 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -57,6 +57,7 @@ tests = [ dependencies = [ "dag-factory[tests]", "apache-airflow~={matrix:airflow}.0,!=2.9.0,!=2.9.1", # https://github.com/apache/airflow/pull/39670 + "httpx>=0.25.0" ] pre-install-commands = ["sh scripts/test/pre-install-airflow.sh {matrix:airflow} {matrix:python}"] @@ -79,7 +80,7 @@ test-cov = 'sh scripts/test/unit-cov.sh' Source = "https://github.com/astronomer/dag-factory" [tool.hatch.version] -path = "dagfactory/__version__.py" +path = "dagfactory/__init__.py" [tool.hatch.build.targets.sdist] include = ["dagfactory"] @@ -90,6 +91,7 @@ packages = ["dagfactory"] [tool.pytest.ini_options] filterwarnings = ["ignore::DeprecationWarning"] minversion = "6.0" +markers = ["integration"] ###################################### # THIRD PARTY TOOLS diff --git a/tests/fixtures/dag_factory_variables_as_arguments.yml 
b/tests/fixtures/dag_factory_variables_as_arguments.yml index ac1730ba..f94a4566 100644 --- a/tests/fixtures/dag_factory_variables_as_arguments.yml +++ b/tests/fixtures/dag_factory_variables_as_arguments.yml @@ -33,3 +33,14 @@ example_dag: variables_as_arguments : [ {"variable":"var1","attribute":"bash_command"} ] + +second_example_dag: + default_args: + owner: 'custom_owner' + start_date: 3 days + description: 'this is a second example dag' + schedule_interval: '0 6 * * *' + tasks: + task_0: + operator: airflow.operators.bash_operator.BashOperator + bash_command: 'echo 1' diff --git a/tests/test_dagfactory.py b/tests/test_dagfactory.py index 9b8036dd..c195fa00 100644 --- a/tests/test_dagfactory.py +++ b/tests/test_dagfactory.py @@ -1,6 +1,7 @@ import datetime import logging import os +from unittest.mock import patch import pytest from airflow import __version__ as AIRFLOW_VERSION @@ -433,9 +434,14 @@ def test_load_invalid_yaml_logs_error(caplog): assert caplog.messages == ["Failed to load dag from tests/fixtures/invalid_yaml.yml"] -def test_load_yaml_dags_succeed(): +@patch("dagfactory.telemetry.emit_usage_metrics_if_enabled") +def test_load_yaml_dags_succeed(mock_emit_usage_metrics_if_enabled): load_yaml_dags( globals_dict=globals(), dags_folder="tests/fixtures", suffix=["dag_factory_variables_as_arguments.yml"], ) + # Confirm the representative telemetry for all the DAGs defined in the desired YAML is being sent + args = mock_emit_usage_metrics_if_enabled.call_args.args + assert args[0] == "load_yaml_dags" + assert args[1] == {"dags_count": 2, "tasks_count": 4, "taskgroups_count": 0} diff --git a/tests/test_settings.py b/tests/test_settings.py new file mode 100644 index 00000000..480e3388 --- /dev/null +++ b/tests/test_settings.py @@ -0,0 +1,20 @@ +import pytest + +from dagfactory import settings + + +@pytest.mark.parametrize( + "value,expected_response", + [ + ("f", False), + ("false", False), + ("0", False), + ("", False), + ("none", False), + ("True", True), 
+ ("true", True), + ("1", True), + ], +) +def test_convert_to_boolean(value, expected_response): + assert settings.convert_to_boolean(value) == expected_response diff --git a/tests/test_telemetry.py b/tests/test_telemetry.py new file mode 100644 index 00000000..6e8f65ef --- /dev/null +++ b/tests/test_telemetry.py @@ -0,0 +1,101 @@ +import logging +from unittest.mock import patch + +import pytest + +from dagfactory import telemetry + + +def test_should_emit_is_true_by_default(): + assert telemetry.should_emit() + + +@patch("dagfactory.settings.enable_telemetry", True) +def test_should_emit_is_true_when_only_enable_telemetry_is_true(): + assert telemetry.should_emit() + + +@patch("dagfactory.settings.do_not_track", True) +def test_should_emit_is_false_when_do_not_track(): + assert not telemetry.should_emit() + + +@patch("dagfactory.settings.no_analytics", True) +def test_should_emit_is_false_when_no_analytics(): + assert not telemetry.should_emit() + + +def test_collect_standard_usage_metrics(): + metrics = telemetry.collect_standard_usage_metrics() + expected_keys = [ + "airflow_version", + "dagfactory_version", + "platform_machine", + "platform_system", + "python_version", + "variables", + ] + assert sorted(metrics.keys()) == expected_keys + + +class MockFailedResponse: + is_success = False + status_code = "404" + text = "Non existent URL" + + +@patch("dagfactory.telemetry.httpx.get", return_value=MockFailedResponse()) +def test_emit_usage_metrics_fails(mock_httpx_get, caplog): + sample_metrics = { + "dagfactory_version": "0.2.0a1", + "airflow_version": "2.10.1", + "python_version": "3.11", + "platform_system": "darwin", + "platform_machine": "amd64", + "variables": {"a": 1, "b": 2}, + } + is_success = telemetry.emit_usage_metrics(sample_metrics) + mock_httpx_get.assert_called_once_with( 
"https://astronomer.gateway.scarf.sh/dag-factory/v1/0.2.0a1/2.10.1/3.11/darwin/amd64?dagfactory_version=0.2.0a1&airflow_version=2.10.1&python_version=3.11&platform_system=darwin&platform_machine=amd64&variables=%7B%27a%27%3A+1%2C+%27b%27%3A+2%7D", + timeout=5.0, + ) + assert not is_success + log_msg = "Unable to emit usage metrics to https://astronomer.gateway.scarf.sh/dag-factory/v1/0.2.0a1/2.10.1/3.11/darwin/amd64?dagfactory_version=0.2.0a1&airflow_version=2.10.1&python_version=3.11&platform_system=darwin&platform_machine=amd64&variables=%7B%27a%27%3A+1%2C+%27b%27%3A+2%7D. Status code: 404. Message: Non existent URL" + assert caplog.text.startswith("WARNING") + assert log_msg in caplog.text + + +@pytest.mark.integration +def test_emit_usage_metrics_succeeds(caplog): + caplog.set_level(logging.DEBUG) + sample_metrics = { + "dagfactory_version": "0.2.0a1", + "airflow_version": "2.10.1", + "python_version": "3.11", + "platform_system": "darwin", + "platform_machine": "amd64", + "variables": {"a": 1, "b": 2}, + } + is_success = telemetry.emit_usage_metrics(sample_metrics) + assert is_success + assert caplog.text.startswith("DEBUG") + assert "Telemetry is enabled. Emitting the following usage metrics to" in caplog.text + + +@patch("dagfactory.telemetry.should_emit", return_value=False) +def test_emit_usage_metrics_if_enabled_fails(mock_should_emit, caplog): + caplog.set_level(logging.DEBUG) + assert not telemetry.emit_usage_metrics_if_enabled("any", {}) + assert caplog.text.startswith("DEBUG") + assert "Telemetry is disabled. To enable it, export AIRFLOW__DAG_FACTORY__ENABLE_TELEMETRY=True." 
in caplog.text + + +@patch("dagfactory.telemetry.should_emit", return_value=True) +@patch("dagfactory.telemetry.collect_standard_usage_metrics", return_value={"k1": "v1", "variables": {}}) +@patch("dagfactory.telemetry.emit_usage_metrics") +def test_emit_usage_metrics_if_enabled_succeeds( + mock_emit_usage_metrics, mock_collect_standard_usage_metrics, mock_should_emit +): + assert telemetry.emit_usage_metrics_if_enabled("any", {"k2": "v2"}) + mock_emit_usage_metrics.assert_called_once() + assert mock_emit_usage_metrics.call_args.args[0] == {"k1": "v1", "variables": {"k2": "v2"}, "type": "any"}