diff --git a/airflow/cli/cli_parser.py b/airflow/cli/cli_parser.py index e4be3e978d9f..4828a37bfc5b 100644 --- a/airflow/cli/cli_parser.py +++ b/airflow/cli/cli_parser.py @@ -671,7 +671,6 @@ def string_lower_type(val): ARG_UMASK = Arg( ("-u", "--umask"), help="Set the umask of celery worker in daemon mode", - default=conf.get('celery', 'worker_umask'), ) ARG_WITHOUT_MINGLE = Arg( ("--without-mingle",), diff --git a/airflow/cli/commands/celery_command.py b/airflow/cli/commands/celery_command.py index d19490caa76c..c2deb16ff4c2 100644 --- a/airflow/cli/commands/celery_command.py +++ b/airflow/cli/commands/celery_command.py @@ -75,6 +75,7 @@ def flower(args): pidfile=TimeoutPIDLockFile(pidfile, -1), stdout=stdout, stderr=stderr, + umask=int(settings.DAEMON_UMASK, 8), ) with ctx: celery_app.start(options) @@ -183,6 +184,8 @@ def worker(args): with open(stdout, 'a') as stdout_handle, open(stderr, 'a') as stderr_handle: if args.umask: umask = args.umask + else: + umask = conf.get('celery', 'worker_umask', fallback=settings.DAEMON_UMASK) stdout_handle.truncate(0) stderr_handle.truncate(0) diff --git a/airflow/cli/commands/dag_processor_command.py b/airflow/cli/commands/dag_processor_command.py index 4c4e5b46eb7d..92f7f37ee16d 100644 --- a/airflow/cli/commands/dag_processor_command.py +++ b/airflow/cli/commands/dag_processor_command.py @@ -22,6 +22,7 @@ import daemon from daemon.pidfile import TimeoutPIDLockFile +from airflow import settings from airflow.configuration import conf from airflow.dag_processing.manager import DagFileProcessorManager from airflow.utils import cli as cli_utils @@ -69,6 +70,7 @@ def dag_processor(args): files_preserve=[handle], stdout=stdout_handle, stderr=stderr_handle, + umask=int(settings.DAEMON_UMASK, 8), ) with ctx: try: diff --git a/airflow/cli/commands/kerberos_command.py b/airflow/cli/commands/kerberos_command.py index 3a33363df996..4ccd07092f47 100644 --- a/airflow/cli/commands/kerberos_command.py +++ b/airflow/cli/commands/kerberos_command.py @@ -42,6 +42,7 @@ def kerberos(args): pidfile=TimeoutPIDLockFile(pid, -1), stdout=stdout_handle, stderr=stderr_handle, + umask=int(settings.DAEMON_UMASK, 8), ) with ctx: diff --git a/airflow/cli/commands/scheduler_command.py b/airflow/cli/commands/scheduler_command.py index b6f35cff9e73..22a185794fb5 100644 --- a/airflow/cli/commands/scheduler_command.py +++ b/airflow/cli/commands/scheduler_command.py @@ -74,6 +74,7 @@ def scheduler(args): files_preserve=[handle], stdout=stdout_handle, stderr=stderr_handle, + umask=int(settings.DAEMON_UMASK, 8), ) with ctx: _run_scheduler_job(args=args) diff --git a/airflow/cli/commands/triggerer_command.py b/airflow/cli/commands/triggerer_command.py index defe476a8fcc..70fbbda2fcbb 100644 --- a/airflow/cli/commands/triggerer_command.py +++ b/airflow/cli/commands/triggerer_command.py @@ -48,6 +48,7 @@ def triggerer(args): files_preserve=[handle], stdout=stdout_handle, stderr=stderr_handle, + umask=int(settings.DAEMON_UMASK, 8), ) with ctx: job.run() diff --git a/airflow/cli/commands/webserver_command.py b/airflow/cli/commands/webserver_command.py index da2ed32fc221..37145c0c816c 100644 --- a/airflow/cli/commands/webserver_command.py +++ b/airflow/cli/commands/webserver_command.py @@ -458,6 +458,7 @@ def monitor_gunicorn(gunicorn_master_pid: int): files_preserve=[handle], stdout=stdout, stderr=stderr, + umask=int(settings.DAEMON_UMASK, 8), ) with ctx: subprocess.Popen(run_args, close_fds=True) diff --git a/airflow/config_templates/config.yml b/airflow/config_templates/config.yml index cbb808137196..c3970dc01810 100644 --- a/airflow/config_templates/config.yml +++ b/airflow/config_templates/config.yml @@ -374,6 +374,19 @@ example: ~ default: "1024" + - name: daemon_umask + description: | + The default umask to use for process when run in daemon mode (scheduler, worker, etc.) + + This controls the file-creation mode mask which determines the initial value of file permission bits + for newly created files. + + This value is treated as an octal-integer. + version_added: 2.3.4 + type: string + default: "0o077" + example: ~ + - name: database description: ~ options: @@ -1652,15 +1665,6 @@ type: boolean example: ~ default: "true" - - name: worker_umask - description: | - Umask that will be used when starting workers with the ``airflow celery worker`` - in daemon mode. This control the file-creation mode mask which determines the initial - value of file permission bits for newly created files. - version_added: 2.0.0 - type: string - example: ~ - default: "0o077" - name: broker_url description: | The Celery broker URL. Celery supports RabbitMQ, Redis and experimentally diff --git a/airflow/config_templates/default_airflow.cfg b/airflow/config_templates/default_airflow.cfg index 34046f4cb49c..90fa6e0df32c 100644 --- a/airflow/config_templates/default_airflow.cfg +++ b/airflow/config_templates/default_airflow.cfg @@ -212,6 +212,14 @@ default_pool_task_slot_count = 128 # mapped tasks from clogging the scheduler. max_map_length = 1024 +# The default umask to use for process when run in daemon mode (scheduler, worker, etc.) +# +# This controls the file-creation mode mask which determines the initial value of file permission bits +# for newly created files. +# +# This value is treated as an octal-integer. +daemon_umask = 0o077 + [database] # The SqlAlchemy connection string to the metadata database. # SqlAlchemy supports many different database engines. @@ -834,11 +842,6 @@ worker_prefetch_multiplier = 1 # prevent this by setting this to false. However, with this disabled Flower won't work. worker_enable_remote_control = true -# Umask that will be used when starting workers with the ``airflow celery worker`` -# in daemon mode. This control the file-creation mode mask which determines the initial -# value of file permission bits for newly created files. -worker_umask = 0o077 - # The Celery broker URL. Celery supports RabbitMQ, Redis and experimentally # a sqlalchemy database. Refer to the Celery documentation for more information. broker_url = redis://redis:6379/0 diff --git a/airflow/configuration.py b/airflow/configuration.py index 5e84de6ea4b4..61d2260e5c05 100644 --- a/airflow/configuration.py +++ b/airflow/configuration.py @@ -522,7 +522,17 @@ def get_mandatory_value(self, section: str, key: str, **kwargs) -> str: raise ValueError(f"The value {section}/{key} should be set!") return value + @overload # type: ignore[override] + def get(self, section: str, key: str, fallback: str = ..., **kwargs) -> str: # type: ignore[override] + + ... + + @overload # type: ignore[override] def get(self, section: str, key: str, **kwargs) -> Optional[str]: # type: ignore[override] + + ... + + def get(self, section: str, key: str, **kwargs) -> Optional[str]: # type: ignore[override, misc] section = str(section).lower() key = str(key).lower() diff --git a/airflow/settings.py b/airflow/settings.py index bea68ec8cfc1..beaed3b4599b 100644 --- a/airflow/settings.py +++ b/airflow/settings.py @@ -640,3 +640,5 @@ def initialize(): # Prefix used to identify tables holding data moved during migration. AIRFLOW_MOVED_TABLE_PREFIX = "_airflow_moved" + +DAEMON_UMASK: str = conf.get('core', 'daemon_umask', fallback='0o077') diff --git a/tests/cli/commands/test_celery_command.py b/tests/cli/commands/test_celery_command.py index 304d7a7c70a1..cf430903e3ec 100644 --- a/tests/cli/commands/test_celery_command.py +++ b/tests/cli/commands/test_celery_command.py @@ -354,6 +354,7 @@ def test_run_command_daemon(self, mock_celery_app, mock_daemon, mock_setup_locat pidfile=mock_pid_file.return_value, stderr=mock_open.return_value, stdout=mock_open.return_value, + umask=0o077, ), mock.call.DaemonContext().__enter__(), mock.call.DaemonContext().__exit__(None, None, None), diff --git a/tests/cli/commands/test_kerberos_command.py b/tests/cli/commands/test_kerberos_command.py index 63b14cd9a8b1..b03cdfd20c36 100644 --- a/tests/cli/commands/test_kerberos_command.py +++ b/tests/cli/commands/test_kerberos_command.py @@ -75,6 +75,7 @@ def test_run_command_daemon(self, mock_krb, mock_daemon, mock_setup_locations, m pidfile=mock_pid_file.return_value, stderr=mock_open.return_value, stdout=mock_open.return_value, + umask=0o077, ), mock.call.DaemonContext().__enter__(), mock.call.DaemonContext().__exit__(None, None, None),