diff --git a/airflow/config_templates/config.yml b/airflow/config_templates/config.yml index 61491d860922f..e32b8ccf0eb5d 100644 --- a/airflow/config_templates/config.yml +++ b/airflow/config_templates/config.yml @@ -445,8 +445,8 @@ ``store_serialized_dags`` setting. version_added: 1.10.10 type: string - example: ~ - default: "%(store_serialized_dags)s" + example: "False" + default: ~ - name: max_num_rendered_ti_fields_per_task description: | Maximum number of Rendered Task Instance Fields (Template Fields) per task to store diff --git a/airflow/config_templates/default_airflow.cfg b/airflow/config_templates/default_airflow.cfg index 2cc97e2f96193..c75d3ae32611e 100644 --- a/airflow/config_templates/default_airflow.cfg +++ b/airflow/config_templates/default_airflow.cfg @@ -232,7 +232,8 @@ min_serialized_dag_update_interval = 30 # If set to True, Webserver reads file contents from DB instead of # trying to access files in a DAG folder. Defaults to same as the # ``store_serialized_dags`` setting. -store_dag_code = %(store_serialized_dags)s +# Example: store_dag_code = False +# store_dag_code = # Maximum number of Rendered Task Instance Fields (Template Fields) per task to store # in the Database. 
diff --git a/airflow/models/dag.py b/airflow/models/dag.py index 7759cb332edc5..933dc1026b63b 100644 --- a/airflow/models/dag.py +++ b/airflow/models/dag.py @@ -1529,7 +1529,7 @@ def sync_to_db(self, owner=None, sync_time=None, session=None): orm_dag.schedule_interval = self.schedule_interval orm_dag.tags = self.get_dagtags(session=session) - if conf.getboolean('core', 'store_dag_code', fallback=False): + if settings.STORE_DAG_CODE: DagCode.bulk_sync_to_db([orm_dag.fileloc]) session.commit() diff --git a/airflow/models/dagcode.py b/airflow/models/dagcode.py index 513ec185be6e5..6aa7b6a87e465 100644 --- a/airflow/models/dagcode.py +++ b/airflow/models/dagcode.py @@ -21,9 +21,9 @@ from sqlalchemy import BigInteger, Column, String, UnicodeText, and_, exists -from airflow.configuration import conf from airflow.exceptions import AirflowException, DagCodeNotFound from airflow.models import Base +from airflow.settings import STORE_DAG_CODE from airflow.utils import timezone from airflow.utils.file import correct_maybe_zipped, open_maybe_zipped from airflow.utils.db import provide_session @@ -178,7 +178,7 @@ def code(cls, fileloc): :return: source code as string """ - if conf.getboolean('core', 'store_dag_code', fallback=False): + if STORE_DAG_CODE: return cls._get_code_from_db(fileloc) else: return cls._get_code_from_file(fileloc) diff --git a/airflow/settings.py b/airflow/settings.py index 513f19274f0c0..c86d4b29ce7b9 100644 --- a/airflow/settings.py +++ b/airflow/settings.py @@ -401,6 +401,11 @@ def initialize(): MIN_SERIALIZED_DAG_UPDATE_INTERVAL = conf.getint( 'core', 'min_serialized_dag_update_interval', fallback=30) +# Whether to persist the source code of DAG files in the DB. If set to True, the Webserver reads +# file contents from the DB instead of trying to access files in the DAG folder. +# Defaults to the value of the store_serialized_dags setting. 
+STORE_DAG_CODE = conf.getboolean("core", "store_dag_code", fallback=STORE_SERIALIZED_DAGS) + # If donot_modify_handlers=True, we do not modify logging handlers in task_run command # If the flag is set to False, we remove all handlers from the root logger # and add all handlers from 'airflow.task' logger to the root Logger. This is done diff --git a/airflow/utils/dag_processing.py b/airflow/utils/dag_processing.py index 6e4e045957417..3aac8fd315989 100644 --- a/airflow/utils/dag_processing.py +++ b/airflow/utils/dag_processing.py @@ -50,7 +50,7 @@ from airflow.exceptions import AirflowException from airflow.settings import Stats from airflow.models import errors -from airflow.settings import STORE_SERIALIZED_DAGS +from airflow.settings import STORE_DAG_CODE, STORE_SERIALIZED_DAGS from airflow.utils import timezone from airflow.utils.helpers import reap_process_group from airflow.utils.db import provide_session @@ -914,7 +914,7 @@ def _refresh_dag_dir(self): SerializedDagModel.remove_deleted_dags(self._file_paths) DagModel.deactivate_deleted_dags(self._file_paths) - if conf.getboolean('core', 'store_dag_code', fallback=False): + if STORE_DAG_CODE: from airflow.models.dagcode import DagCode DagCode.remove_deleted_code(self._file_paths) diff --git a/tests/test_configuration.py b/tests/test_configuration.py index af1df69a2d14b..5c40cad11955c 100644 --- a/tests/test_configuration.py +++ b/tests/test_configuration.py @@ -32,6 +32,7 @@ from airflow import configuration from airflow.configuration import conf, AirflowConfigParser, parameterized_config from tests.compat import mock +from tests.test_utils.config import conf_vars from tests.test_utils.reset_warning_registry import reset_warning_registry if six.PY2: @@ -494,3 +495,22 @@ def test_write_should_respect_env_variable(self): conf.write(string_file) content = string_file.getvalue() self.assertIn("dags_folder = /tmp/test_folder", content) + + @conf_vars({("core", "store_serialized_dags"): "True"}) + def 
test_store_dag_code_default_config(self): + store_serialized_dags = conf.getboolean('core', 'store_serialized_dags', fallback=False) + store_dag_code = conf.getboolean("core", "store_dag_code", fallback=store_serialized_dags) + self.assertFalse(conf.has_option("core", "store_dag_code")) + self.assertTrue(store_serialized_dags) + self.assertTrue(store_dag_code) + + @conf_vars({ + ("core", "store_serialized_dags"): "True", + ("core", "store_dag_code"): "False" + }) + def test_store_dag_code_config_when_set(self): + store_serialized_dags = conf.getboolean('core', 'store_serialized_dags', fallback=False) + store_dag_code = conf.getboolean("core", "store_dag_code", fallback=store_serialized_dags) + self.assertTrue(conf.has_option("core", "store_dag_code")) + self.assertTrue(store_serialized_dags) + self.assertFalse(store_dag_code)