Support telemetry using Scarf (#250)
Export telemetry related to DAG Factory usage to
[Scarf](https://about.scarf.sh/).

This data assists the project maintainers in better understanding how
DAG Factory is used. Insights from this telemetry are critical for
prioritizing patches, minor releases, and security fixes. Additionally,
this information supports key decisions related to the development
roadmap.

Deployments and individual users can opt out of analytics by setting the
configuration:

```
[dag_factory]
enable_telemetry = False
```

As described in the [official
documentation](https://docs.scarf.sh/gateway/#do-not-track), it is also
possible to opt out by setting one of the following environment
variables:

```commandline
AIRFLOW__DAG_FACTORY__ENABLE_TELEMETRY=False
DO_NOT_TRACK=True
SCARF_NO_ANALYTICS=True
```

In addition to Scarf's default data collection, DAG Factory collects the
following information:

- DAG Factory version
- Airflow version
- Python version
- Operating system & machine architecture
- Event type
- Number of DAGs
- Number of TaskGroups
- Number of Tasks
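
Per the `TELEMETRY_URL` template added in `dagfactory/constants.py`, these fields are encoded into a single GET request to the Scarf gateway. A minimal sketch of how that reporting URL is assembled (the metric values below are illustrative, not real collected data):

```python
from urllib.parse import urlencode

# Template copied from dagfactory/constants.py in this commit.
TELEMETRY_URL = (
    "https://astronomer.gateway.scarf.sh/dag-factory/"
    "{telemetry_version}/{dagfactory_version}/{airflow_version}/"
    "{python_version}/{platform_system}/{platform_machine}?{query_string}"
)

# Illustrative values; at runtime these come from platform, Airflow,
# and the DagFactory counters.
metrics = {
    "dagfactory_version": "0.20.0a1",
    "airflow_version": "2.10.2",
    "python_version": "3.11.9",
    "platform_system": "Linux",
    "platform_machine": "x86_64",
    "type": "generate_dags",
    "variables": {"dags_count": 2, "tasks_count": 4, "taskgroups_count": 0},
}

url = TELEMETRY_URL.format(
    **metrics, telemetry_version="v1", query_string=urlencode(metrics)
)
print(url)
```

As implemented in `dagfactory/telemetry.py`, the request is a plain `httpx.get` against this URL with a 5-second timeout; nothing beyond the URL itself is sent.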

No user-identifiable information (including the IP address) is stored in
Scarf, although Scarf does infer and store information derived from the
IP, such as location. The data collection is GDPR compliant.

Data is currently not emitted for pre-releases, except from integration
tests.

The Apache Software Foundation follows the same strategy in many of its
open-source projects, including Airflow
([#39510](apache/airflow#39510)).

Examples of visualising the data via the Scarf UI:

<img width="1624" alt="Screenshot 2024-10-17 at 01 56 09"
src="https://github.com/user-attachments/assets/d4191834-1e02-4192-811b-125d3fa735fe">

<img width="1624" alt="Screenshot 2024-10-17 at 01 55 59"
src="https://github.com/user-attachments/assets/cd814e11-7f77-45c8-95a0-56e29d9f9f12">

<img width="1624" alt="Screenshot 2024-10-17 at 01 55 47"
src="https://github.com/user-attachments/assets/2950ddbb-ea25-415f-b61c-3fbdcf4fc739">

<img width="1624" alt="Screenshot 2024-10-17 at 01 55 42"
src="https://github.com/user-attachments/assets/a56ecefd-0cd0-486c-9faf-026b1e9a4ceb">

Closes: #214
tatiana authored Oct 17, 2024
1 parent 6f31846 commit 01e0871
Showing 13 changed files with 301 additions and 8 deletions.
35 changes: 34 additions & 1 deletion PRIVACY_NOTICE.md
@@ -1,3 +1,36 @@
# Privacy Notice

-This project follows the [Privacy Policy of Astronomer](https://www.astronomer.io/privacy/)
+This project follows the [Privacy Policy of Astronomer](https://www.astronomer.io/privacy/).

## Collection of Data

DAG Factory integrates [Scarf](https://about.scarf.sh/) to collect basic telemetry data during operation.
This data assists the project maintainers in better understanding how DAG Factory is used.
Insights gained from this telemetry are critical for prioritizing patches, minor releases, and
security fixes. Additionally, this information supports key decisions related to the development road map.

Deployments and individual users can opt out of analytics by setting the configuration:

```
[dag_factory]
enable_telemetry = False
```

As described in the [official documentation](https://docs.scarf.sh/gateway/#do-not-track), it is also possible to opt out by setting one of the following environment variables:

```commandline
DO_NOT_TRACK=True
SCARF_NO_ANALYTICS=True
```

In addition to Scarf's default data collection, DAG Factory collects the following information:

- DAG Factory version
- Airflow version
- Python version
- Operating system & machine architecture
- Event type
- Number of DAGs
- Number of TaskGroups
- Number of Tasks

No user-identifiable information (IP included) is stored in Scarf.
1 change: 1 addition & 0 deletions dagfactory/__init__.py
@@ -2,6 +2,7 @@

from .dagfactory import DagFactory, load_yaml_dags

__version__ = "0.20.0a1"
__all__ = [
"DagFactory",
"load_yaml_dags",
3 changes: 0 additions & 3 deletions dagfactory/__version__.py

This file was deleted.

3 changes: 3 additions & 0 deletions dagfactory/constants.py
@@ -0,0 +1,3 @@
TELEMETRY_URL = "https://astronomer.gateway.scarf.sh/dag-factory/{telemetry_version}/{dagfactory_version}/{airflow_version}/{python_version}/{platform_system}/{platform_machine}?{query_string}"
TELEMETRY_VERSION = "v1"
TELEMETRY_TIMEOUT = 5.0
4 changes: 4 additions & 0 deletions dagfactory/dagbuilder.py
@@ -135,6 +135,8 @@ def __init__(self, dag_name: str, dag_config: Dict[str, Any], default_config: Di
self.dag_name: str = dag_name
self.dag_config: Dict[str, Any] = deepcopy(dag_config)
self.default_config: Dict[str, Any] = deepcopy(default_config)
self.tasks_count: int = 0
self.taskgroups_count: int = 0

# pylint: disable=too-many-branches,too-many-statements
def get_dag_params(self) -> Dict[str, Any]:
@@ -793,12 +795,14 @@ def build(self) -> Dict[str, Union[str, DAG]]:
dag.tags = dag_params.get("tags", None)

tasks: Dict[str, Dict[str, Any]] = dag_params["tasks"]
self.tasks_count = len(tasks)

# add a property to mark this dag as an auto-generated one
dag.is_dagfactory_auto_generated = True

# create dictionary of task groups
task_groups_dict: Dict[str, "TaskGroup"] = self.make_task_groups(dag_params.get("task_groups", {}), dag)
self.taskgroups_count = len(task_groups_dict)

# create dictionary to track tasks and set dependencies
tasks_dict: Dict[str, BaseOperator] = {}
26 changes: 24 additions & 2 deletions dagfactory/dagfactory.py
@@ -10,6 +10,7 @@
from airflow.configuration import conf as airflow_conf
from airflow.models import DAG

from dagfactory import telemetry
from dagfactory.dagbuilder import DagBuilder
from dagfactory.exceptions import DagFactoryConfigException, DagFactoryException

@@ -29,6 +30,9 @@ class DagFactory:
"""

def __init__(self, config_filepath: Optional[str] = None, config: Optional[dict] = None) -> None:
self.dags_count: int = 0
self.tasks_count: int = 0
self.taskgroups_count: int = 0
assert bool(config_filepath) ^ bool(config), "Either `config_filepath` or `config` should be provided"
if config_filepath:
DagFactory._validate_config_filepath(config_filepath=config_filepath)
@@ -106,9 +110,21 @@ def build_dags(self) -> Dict[str, DAG]:
dags[dag["dag_id"]]: DAG = dag["dag"]
except Exception as err:
raise DagFactoryException(f"Failed to generate dag {dag_name}. verify config is correct") from err
else:
self.dags_count += 1
self.taskgroups_count += dag_builder.taskgroups_count
self.tasks_count += dag_builder.tasks_count

return dags

def emit_telemetry(self, event_type: str) -> None:
additional_telemetry_metrics = {
"dags_count": self.dags_count,
"tasks_count": self.tasks_count,
"taskgroups_count": self.taskgroups_count,
}
telemetry.emit_usage_metrics_if_enabled(event_type, additional_telemetry_metrics)

# pylint: disable=redefined-builtin
@staticmethod
def register_dags(dags: Dict[str, DAG], globals: Dict[str, Any]) -> None:
@@ -130,6 +146,7 @@ def generate_dags(self, globals: Dict[str, Any]) -> None:
"""
dags: Dict[str, Any] = self.build_dags()
self.register_dags(dags, globals)
self.emit_telemetry("generate_dags")

def clean_dags(self, globals: Dict[str, Any]) -> None:
"""
@@ -153,6 +170,8 @@ def clean_dags(self, globals: Dict[str, Any]) -> None:
for dag_to_remove in dags_to_remove:
del globals[dag_to_remove]

self.emit_telemetry("clean_dags")

# pylint: enable=redefined-builtin


@@ -183,7 +202,10 @@ def load_yaml_dags(
config_file_abs_path = str(config_file_path.absolute())
logging.info("Loading %s", config_file_abs_path)
        try:
-            DagFactory(config_file_abs_path).generate_dags(globals_dict)
-            logging.info("DAG loaded: %s", config_file_path)
+            factory = DagFactory(config_file_abs_path)
+            factory.generate_dags(globals_dict)
        except Exception:  # pylint: disable=broad-except
            logging.exception("Failed to load dag from %s", config_file_path)
+        else:
+            factory.emit_telemetry("load_yaml_dags")
+            logging.info("DAG loaded: %s", config_file_path)
20 changes: 20 additions & 0 deletions dagfactory/settings.py
@@ -0,0 +1,20 @@
from __future__ import annotations

import os

from airflow.configuration import conf


def convert_to_boolean(value: str | None) -> bool:
"""
Convert a string that represents a boolean to a Python boolean.
"""
value = str(value).lower().strip()
if value in ("f", "false", "0", "", "none"):
return False
return True


enable_telemetry = conf.getboolean("dag_factory", "enable_telemetry", fallback=True)
do_not_track = convert_to_boolean(os.getenv("DO_NOT_TRACK"))
no_analytics = convert_to_boolean(os.getenv("SCARF_NO_ANALYTICS"))
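
A quirk worth noting: `convert_to_boolean` treats only a small set of strings as false, and an unset environment variable (`str(None)` → `"none"`) also counts as false, so telemetry is not disabled by accident. A standalone copy of the logic, runnable without an Airflow install:

```python
def convert_to_boolean(value):
    """Mirror of dagfactory.settings.convert_to_boolean (copied here for illustration)."""
    value = str(value).lower().strip()
    return value not in ("f", "false", "0", "", "none")

# Unset environment variables arrive as None -> "none" -> False.
print(convert_to_boolean(None))   # False
# Any other string -- even "off" or "no" -- counts as True.
print(convert_to_boolean("off"))  # True
```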
73 changes: 73 additions & 0 deletions dagfactory/telemetry.py
@@ -0,0 +1,73 @@
from __future__ import annotations

import logging
import platform
from urllib.parse import urlencode

import httpx
from airflow import __version__ as airflow_version

import dagfactory
from dagfactory import constants, settings


def should_emit() -> bool:
"""
Identify if telemetry metrics should be emitted or not.
"""
return settings.enable_telemetry and not settings.do_not_track and not settings.no_analytics


def collect_standard_usage_metrics() -> dict[str, object]:
"""
Return standard telemetry metrics.
"""
metrics = {
"dagfactory_version": dagfactory.__version__,
"airflow_version": airflow_version,
"python_version": platform.python_version(),
"platform_system": platform.system(),
"platform_machine": platform.machine(),
"variables": {},
}
return metrics


def emit_usage_metrics(metrics: dict[str, object]) -> bool:
"""
Emit desired telemetry metrics to remote telemetry endpoint.
The metrics must contain the necessary fields to build the TELEMETRY_URL.
"""
query_string = urlencode(metrics)
telemetry_url = constants.TELEMETRY_URL.format(
**metrics, telemetry_version=constants.TELEMETRY_VERSION, query_string=query_string
)
logging.debug("Telemetry is enabled. Emitting the following usage metrics to %s: %s", telemetry_url, metrics)
response = httpx.get(telemetry_url, timeout=constants.TELEMETRY_TIMEOUT)
if not response.is_success:
logging.warning(
"Unable to emit usage metrics to %s. Status code: %s. Message: %s",
telemetry_url,
response.status_code,
response.text,
)
return response.is_success


def emit_usage_metrics_if_enabled(event_type: str, additional_metrics: dict[str, object]) -> bool:
"""
Check whether telemetry should be emitted; if so, fetch the standard metrics, complement
them with custom metrics, and emit them to the remote telemetry endpoint.
:returns: Whether the event was successfully sent to the telemetry backend.
"""
if should_emit():
metrics = collect_standard_usage_metrics()
metrics["type"] = event_type
metrics["variables"].update(additional_metrics)
is_success = emit_usage_metrics(metrics)
return is_success
else:
logging.debug("Telemetry is disabled. To enable it, export AIRFLOW__DAG_FACTORY__ENABLE_TELEMETRY=True.")
return False
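
Putting `settings` and `should_emit` together, the opt-out precedence can be sketched standalone. Here the `enable_telemetry` parameter stands in for the `[dag_factory] enable_telemetry` Airflow setting (which this commit defaults to `True`); no Airflow import is needed for the sketch:

```python
import os

_FALSY = ("f", "false", "0", "", "none")


def _flag(name: str) -> bool:
    # Unset variables stringify to "none" and therefore count as False.
    return str(os.getenv(name)).lower().strip() not in _FALSY


def should_emit(enable_telemetry: bool = True) -> bool:
    """Telemetry is sent only when enabled AND neither opt-out variable is set."""
    return (
        enable_telemetry
        and not _flag("DO_NOT_TRACK")
        and not _flag("SCARF_NO_ANALYTICS")
    )


os.environ.pop("DO_NOT_TRACK", None)
os.environ.pop("SCARF_NO_ANALYTICS", None)
print(should_emit())  # True: enabled and nothing opted out

os.environ["DO_NOT_TRACK"] = "True"
print(should_emit())  # False: a single opt-out variable wins
```

Any one of the three opt-out mechanisms is sufficient to suppress the `httpx.get` call entirely.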
4 changes: 3 additions & 1 deletion pyproject.toml
@@ -57,6 +57,7 @@ tests = [
dependencies = [
"dag-factory[tests]",
"apache-airflow~={matrix:airflow}.0,!=2.9.0,!=2.9.1", # https://github.com/apache/airflow/pull/39670
"httpx>=0.25.0"
]
pre-install-commands = ["sh scripts/test/pre-install-airflow.sh {matrix:airflow} {matrix:python}"]

@@ -79,7 +80,7 @@ test-cov = 'sh scripts/test/unit-cov.sh'
Source = "https://github.com/astronomer/dag-factory"

[tool.hatch.version]
-path = "dagfactory/__version__.py"
+path = "dagfactory/__init__.py"

[tool.hatch.build.targets.sdist]
include = ["dagfactory"]
@@ -90,6 +91,7 @@ packages = ["dagfactory"]
[tool.pytest.ini_options]
filterwarnings = ["ignore::DeprecationWarning"]
minversion = "6.0"
markers = ["integration"]

######################################
# THIRD PARTY TOOLS
11 changes: 11 additions & 0 deletions tests/fixtures/dag_factory_variables_as_arguments.yml
@@ -33,3 +33,14 @@ example_dag:
variables_as_arguments : [
{"variable":"var1","attribute":"bash_command"}
]

second_example_dag:
default_args:
owner: 'custom_owner'
start_date: 3 days
description: 'this is a second example dag'
schedule_interval: '0 6 * * *'
tasks:
task_0:
operator: airflow.operators.bash_operator.BashOperator
bash_command: 'echo 1'
8 changes: 7 additions & 1 deletion tests/test_dagfactory.py
@@ -1,6 +1,7 @@
import datetime
import logging
import os
from unittest.mock import patch

import pytest
from airflow import __version__ as AIRFLOW_VERSION
@@ -433,9 +434,14 @@ def test_load_invalid_yaml_logs_error(caplog):
assert caplog.messages == ["Failed to load dag from tests/fixtures/invalid_yaml.yml"]


-def test_load_yaml_dags_succeed():
+@patch("dagfactory.telemetry.emit_usage_metrics_if_enabled")
+def test_load_yaml_dags_succeed(mock_emit_usage_metrics_if_enabled):
load_yaml_dags(
globals_dict=globals(),
dags_folder="tests/fixtures",
suffix=["dag_factory_variables_as_arguments.yml"],
)
# Confirm the representative telemetry for all the DAGs defined in the desired YAML is being sent
args = mock_emit_usage_metrics_if_enabled.call_args.args
assert args[0] == "load_yaml_dags"
assert args[1] == {"dags_count": 2, "tasks_count": 4, "taskgroups_count": 0}
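
The counts asserted above come from the aggregation added to `DagFactory.build_dags`, which increments `dags_count` once per successfully built DAG and sums each `DagBuilder`'s task and task-group totals. A toy model of that bookkeeping (the per-DAG numbers are illustrative, matching the fixture's two example DAGs):

```python
# Hypothetical per-DAG builder results, shaped like DagBuilder's
# tasks_count / taskgroups_count attributes introduced in this commit.
builders = [
    {"tasks_count": 3, "taskgroups_count": 0},  # example_dag
    {"tasks_count": 1, "taskgroups_count": 0},  # second_example_dag
]

totals = {"dags_count": 0, "tasks_count": 0, "taskgroups_count": 0}
for builder in builders:
    totals["dags_count"] += 1
    totals["tasks_count"] += builder["tasks_count"]
    totals["taskgroups_count"] += builder["taskgroups_count"]

print(totals)  # {'dags_count': 2, 'tasks_count': 4, 'taskgroups_count': 0}
```

These totals are exactly what `emit_telemetry` passes along as the `variables` portion of the Scarf payload.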
20 changes: 20 additions & 0 deletions tests/test_settings.py
@@ -0,0 +1,20 @@
import pytest

from dagfactory import settings


@pytest.mark.parametrize(
"value,expected_response",
[
("f", False),
("false", False),
("0", False),
("", False),
("none", False),
("True", True),
("true", True),
("1", True),
],
)
def test_convert_to_boolean(value, expected_response):
assert settings.convert_to_boolean(value) == expected_response