# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations
import contextlib
import datetime
import functools
import itertools
import json
import logging
import multiprocessing
import os
import pathlib
import shlex
import stat
import subprocess
import sys
import warnings
from base64 import b64encode
from configparser import ConfigParser, NoOptionError, NoSectionError
from contextlib import contextmanager
from copy import deepcopy
from io import StringIO
from json.decoder import JSONDecodeError
from typing import IO, TYPE_CHECKING, Any, Dict, Generator, Iterable, Pattern, Set, Tuple, Union
from urllib.parse import urlsplit
import re2
from packaging.version import parse as parse_version
from typing_extensions import overload
from airflow.exceptions import AirflowConfigException
from airflow.secrets import DEFAULT_SECRETS_SEARCH_PATH
from airflow.utils import yaml
from airflow.utils.empty_set import _get_empty_set_for_configuration
from airflow.utils.module_loading import import_string
from airflow.utils.providers_configuration_loader import providers_configuration_loaded
from airflow.utils.weight_rule import WeightRule
if TYPE_CHECKING:
    from airflow.auth.managers.base_auth_manager import BaseAuthManager
    from airflow.secrets import BaseSecretsBackend
log = logging.getLogger(__name__)
# show Airflow's deprecation warnings
if not sys.warnoptions:
    warnings.filterwarnings(action="default", category=DeprecationWarning, module="airflow")
    warnings.filterwarnings(action="default", category=PendingDeprecationWarning, module="airflow")
_SQLITE3_VERSION_PATTERN = re2.compile(r"(?P<version>^\d+(?:\.\d+)*)\D?.*$")
ConfigType = Union[str, int, float, bool]
ConfigOptionsDictType = Dict[str, ConfigType]
ConfigSectionSourcesType = Dict[str, Union[str, Tuple[str, str]]]
ConfigSourcesType = Dict[str, ConfigSectionSourcesType]
ENV_VAR_PREFIX = "AIRFLOW__"
def _parse_sqlite_version(s: str) -> tuple[int, ...]:
    match = _SQLITE3_VERSION_PATTERN.match(s)
    if match is None:
        return ()
    return tuple(int(p) for p in match.group("version").split("."))
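

# Example (illustrative only, not executed as part of this module): the version
# string reported by the sqlite3 module is reduced to a comparable tuple, e.g.
#
#     _parse_sqlite_version("3.42.0")         # -> (3, 42, 0)
#     _parse_sqlite_version("not-a-version")  # -> ()
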
@overload
def expand_env_var(env_var: None) -> None:
    ...


@overload
def expand_env_var(env_var: str) -> str:
    ...


def expand_env_var(env_var: str | None) -> str | None:
    """
    Expand (potentially nested) env vars.

    Repeatedly apply `expandvars` and `expanduser` until the
    interpolation stops having any effect.
    """
    if not env_var:
        return env_var
    while True:
        interpolated = os.path.expanduser(os.path.expandvars(str(env_var)))
        if interpolated == env_var:
            return interpolated
        else:
            env_var = interpolated
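

# Example (illustrative only, not executed as part of this module): nested
# variables are resolved until the result stops changing. With HOME=/home/airflow
# and AIRFLOW_HOME=$HOME/airflow in the environment:
#
#     expand_env_var("$AIRFLOW_HOME/logs")  # -> "/home/airflow/airflow/logs"
#     expand_env_var("~/dags")              # -> "/home/airflow/dags"
#     expand_env_var(None)                  # -> None
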
def run_command(command: str) -> str:
    """Run a command and return its stdout."""
    process = subprocess.Popen(
        shlex.split(command), stdout=subprocess.PIPE, stderr=subprocess.PIPE, close_fds=True
    )
    output, stderr = (stream.decode(sys.getdefaultencoding(), "ignore") for stream in process.communicate())
    if process.returncode != 0:
        raise AirflowConfigException(
            f"Cannot execute {command}. Error code is: {process.returncode}. "
            f"Output: {output}, Stderr: {stderr}"
        )
    return output
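

# Example (illustrative only, not executed as part of this module): this helper
# backs the "_cmd" style options, so a config value can be produced by a command, e.g.
#
#     run_command("echo -n postgresql://user:pass@host/db")
#     # -> "postgresql://user:pass@host/db"
#
# A non-zero exit code raises AirflowConfigException with the captured output and stderr.
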
def _get_config_value_from_secret_backend(config_key: str) -> str | None:
    """Get Config option values from Secret Backend."""
    try:
        secrets_client = get_custom_secret_backend()
        if not secrets_client:
            return None
        return secrets_client.get_config(config_key)
    except Exception as e:
        raise AirflowConfigException(
            "Cannot retrieve config from alternative secrets backend. "
            "Make sure it is configured properly and that the Backend "
            "is accessible.\n"
            f"{e}"
        )


def _is_template(configuration_description: dict[str, dict[str, Any]], section: str, key: str) -> bool:
    """
    Check if the config is a template.

    :param configuration_description: description of configuration
    :param section: section
    :param key: key
    :return: True if the config is a template
    """
    return configuration_description.get(section, {}).get(key, {}).get("is_template", False)


def _default_config_file_path(file_name: str) -> str:
    templates_dir = os.path.join(os.path.dirname(__file__), "config_templates")
    return os.path.join(templates_dir, file_name)


def retrieve_configuration_description(
    include_airflow: bool = True,
    include_providers: bool = True,
    selected_provider: str | None = None,
) -> dict[str, dict[str, Any]]:
    """
    Read Airflow configuration description from YAML file.

    :param include_airflow: Include Airflow configs
    :param include_providers: Include provider configs
    :param selected_provider: If specified, include selected provider only
    :return: Python dictionary containing configs & their info
    """
    base_configuration_description: dict[str, dict[str, Any]] = {}
    if include_airflow:
        with open(_default_config_file_path("config.yml")) as config_file:
            base_configuration_description.update(yaml.safe_load(config_file))
    if include_providers:
        from airflow.providers_manager import ProvidersManager

        for provider, config in ProvidersManager().provider_configs:
            if not selected_provider or provider == selected_provider:
                base_configuration_description.update(config)
    return base_configuration_description
class AirflowConfigParser(ConfigParser):
    """
    Custom Airflow Configparser supporting defaults and deprecated options.

    This is a subclass of ConfigParser that supports defaults and deprecated options.
    The defaults are stored in the ``_default_values`` ConfigParser. The configuration description keeps
    the description of all the options available in Airflow (the descriptions follow config.yaml.schema).

    :param default_config: default configuration (in the form of an ini file).
    :param configuration_description: description of configuration to use
    """

    def __init__(
        self,
        default_config: str | None = None,
        *args,
        **kwargs,
    ):
        super().__init__(*args, **kwargs)
        self.configuration_description = retrieve_configuration_description(include_providers=False)
        self.upgraded_values = {}
        # For those who would like to use a different data structure to keep defaults:
        # We have to keep the default values in a ConfigParser rather than in any other
        # data structure, because the values we have might contain %% which are ConfigParser
        # interpolation placeholders. The _default_values config parser will interpolate them
        # properly when we call get() on it.
        self._default_values = create_default_config_parser(self.configuration_description)
        self._pre_2_7_default_values = create_pre_2_7_defaults()
        if default_config is not None:
            self._update_defaults_from_string(default_config)
        self._update_logging_deprecated_template_to_one_from_defaults()
        self.is_validated = False
        self._suppress_future_warnings = False
        self._providers_configuration_loaded = False
    def _update_logging_deprecated_template_to_one_from_defaults(self):
        default = self.get_default_value("logging", "log_filename_template")
        if default:
            # Tuple does not support item assignment, so we have to create a new tuple and replace it
            original_replacement = self.deprecated_values["logging"]["log_filename_template"]
            self.deprecated_values["logging"]["log_filename_template"] = (
                original_replacement[0],
                default,
                original_replacement[2],
            )

    def is_template(self, section: str, key) -> bool:
        """
        Return whether the value is templated.

        :param section: section of the config
        :param key: key in the section
        :return: True if the value is templated
        """
        if self.configuration_description is None:
            return False
        return _is_template(self.configuration_description, section, key)
    def _update_defaults_from_string(self, config_string: str):
        """
        Update the defaults in _default_values based on values in config_string ("ini" format).

        Note that those values are not validated and cannot contain variables because we are using
        a regular config parser to load them. This method is used to test the config parser in unit tests.

        :param config_string: ini-formatted config string
        """
        parser = ConfigParser()
        parser.read_string(config_string)
        for section in parser.sections():
            if section not in self._default_values.sections():
                self._default_values.add_section(section)
            errors = False
            for key, value in parser.items(section):
                if not self.is_template(section, key) and "{" in value:
                    errors = True
                    log.error(
                        f"The {section}.{key} value {value} read from string contains "
                        "a variable. This is not supported"
                    )
                self._default_values.set(section, key, value)
            if errors:
                raise Exception(
                    f"The string config passed as default contains variables. "
                    f"This is not supported. String config: {config_string}"
                )

    def get_default_value(self, section: str, key: str, fallback: Any = None, raw=False, **kwargs) -> Any:
        """
        Retrieve default value from default config parser.

        This will retrieve the default value from the default config parser. Optionally a raw, stored
        value can be retrieved by setting ``raw`` to True. This is useful for example when
        we want to write the default value to a file, and we don't want the interpolation to happen
        as it is going to be done later when the config is read.

        :param section: section of the config
        :param key: key to use
        :param fallback: fallback value to use
        :param raw: if raw, ``%`` characters are escaped as ``%%`` so that later interpolation is reversed
        :param kwargs: other args
        :return:
        """
        value = self._default_values.get(section, key, fallback=fallback, **kwargs)
        if raw and value is not None:
            return value.replace("%", "%%")
        return value

    def get_default_pre_2_7_value(self, section: str, key: str, **kwargs) -> Any:
        """Get pre 2.7 default config values."""
        return self._pre_2_7_default_values.get(section, key, fallback=None, **kwargs)
    # These configuration elements can be fetched as the stdout of commands
    # following the "{section}__{name}_cmd" pattern, the idea behind this
    # is to not store passwords on boxes in plain text files.
    # These configs can also be fetched from the Secrets backend
    # following the "{section}__{name}__secret" pattern
    @functools.cached_property
    def sensitive_config_values(self) -> Set[tuple[str, str]]:  # noqa: UP006
        if self.configuration_description is None:
            return (
                _get_empty_set_for_configuration()
            )  # we can't use set() here because set is defined below # ¯\_(ツ)_/¯
        flattened = {
            (s, k): item
            for s, s_c in self.configuration_description.items()
            for k, item in s_c.get("options").items()  # type: ignore[union-attr]
        }
        sensitive = {
            (section.lower(), key.lower())
            for (section, key), v in flattened.items()
            if v.get("sensitive") is True
        }
        depr_option = {self.deprecated_options[x][:-1] for x in sensitive if x in self.deprecated_options}
        depr_section = {
            (self.deprecated_sections[s][0], k) for s, k in sensitive if s in self.deprecated_sections
        }
        sensitive.update(depr_section, depr_option)
        return sensitive
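
    # Example (illustrative only, not executed as part of this module): for a
    # sensitive option such as [database] sql_alchemy_conn, the value may be
    # produced by a command or fetched from the secrets backend instead of being
    # stored as a plain value, e.g. in airflow.cfg:
    #
    #     [database]
    #     sql_alchemy_conn_cmd = cat /run/secrets/sql_alchemy_conn
    #
    # or, via environment variables:
    #
    #     AIRFLOW__DATABASE__SQL_ALCHEMY_CONN_CMD="cat /run/secrets/sql_alchemy_conn"
    #     AIRFLOW__DATABASE__SQL_ALCHEMY_CONN_SECRET="<key understood by the secrets backend>"
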
    # A mapping of (new section, new option) -> (old section, old option, since_version).
    # When reading the new option, the old option will be checked to see if it exists. If it does, a
    # DeprecationWarning will be issued and the old option will be used instead.
    deprecated_options: dict[tuple[str, str], tuple[str, str, str]] = {
        ("celery", "worker_precheck"): ("core", "worker_precheck", "2.0.0"),
        ("logging", "interleave_timestamp_parser"): ("core", "interleave_timestamp_parser", "2.6.1"),
        ("logging", "base_log_folder"): ("core", "base_log_folder", "2.0.0"),
        ("logging", "remote_logging"): ("core", "remote_logging", "2.0.0"),
        ("logging", "remote_log_conn_id"): ("core", "remote_log_conn_id", "2.0.0"),
        ("logging", "remote_base_log_folder"): ("core", "remote_base_log_folder", "2.0.0"),
        ("logging", "encrypt_s3_logs"): ("core", "encrypt_s3_logs", "2.0.0"),
        ("logging", "logging_level"): ("core", "logging_level", "2.0.0"),
        ("logging", "fab_logging_level"): ("core", "fab_logging_level", "2.0.0"),
        ("logging", "logging_config_class"): ("core", "logging_config_class", "2.0.0"),
        ("logging", "colored_console_log"): ("core", "colored_console_log", "2.0.0"),
        ("logging", "colored_log_format"): ("core", "colored_log_format", "2.0.0"),
        ("logging", "colored_formatter_class"): ("core", "colored_formatter_class", "2.0.0"),
        ("logging", "log_format"): ("core", "log_format", "2.0.0"),
        ("logging", "simple_log_format"): ("core", "simple_log_format", "2.0.0"),
        ("logging", "task_log_prefix_template"): ("core", "task_log_prefix_template", "2.0.0"),
        ("logging", "log_filename_template"): ("core", "log_filename_template", "2.0.0"),
        ("logging", "log_processor_filename_template"): ("core", "log_processor_filename_template", "2.0.0"),
        ("logging", "dag_processor_manager_log_location"): (
            "core",
            "dag_processor_manager_log_location",
            "2.0.0",
        ),
        ("logging", "task_log_reader"): ("core", "task_log_reader", "2.0.0"),
        ("metrics", "metrics_allow_list"): ("metrics", "statsd_allow_list", "2.6.0"),
        ("metrics", "metrics_block_list"): ("metrics", "statsd_block_list", "2.6.0"),
        ("metrics", "statsd_on"): ("scheduler", "statsd_on", "2.0.0"),
        ("metrics", "statsd_host"): ("scheduler", "statsd_host", "2.0.0"),
        ("metrics", "statsd_port"): ("scheduler", "statsd_port", "2.0.0"),
        ("metrics", "statsd_prefix"): ("scheduler", "statsd_prefix", "2.0.0"),
        ("metrics", "statsd_allow_list"): ("scheduler", "statsd_allow_list", "2.0.0"),
        ("metrics", "stat_name_handler"): ("scheduler", "stat_name_handler", "2.0.0"),
        ("metrics", "statsd_datadog_enabled"): ("scheduler", "statsd_datadog_enabled", "2.0.0"),
        ("metrics", "statsd_datadog_tags"): ("scheduler", "statsd_datadog_tags", "2.0.0"),
        ("metrics", "statsd_datadog_metrics_tags"): ("scheduler", "statsd_datadog_metrics_tags", "2.6.0"),
        ("metrics", "statsd_custom_client_path"): ("scheduler", "statsd_custom_client_path", "2.0.0"),
        ("scheduler", "parsing_processes"): ("scheduler", "max_threads", "1.10.14"),
        ("scheduler", "scheduler_idle_sleep_time"): ("scheduler", "processor_poll_interval", "2.2.0"),
        ("operators", "default_queue"): ("celery", "default_queue", "2.1.0"),
        ("core", "hide_sensitive_var_conn_fields"): ("admin", "hide_sensitive_variable_fields", "2.1.0"),
        ("core", "sensitive_var_conn_names"): ("admin", "sensitive_variable_fields", "2.1.0"),
        ("core", "default_pool_task_slot_count"): ("core", "non_pooled_task_slot_count", "1.10.4"),
        ("core", "max_active_tasks_per_dag"): ("core", "dag_concurrency", "2.2.0"),
        ("logging", "worker_log_server_port"): ("celery", "worker_log_server_port", "2.2.0"),
        ("api", "access_control_allow_origins"): ("api", "access_control_allow_origin", "2.2.0"),
        ("api", "auth_backends"): ("api", "auth_backend", "2.3.0"),
        ("database", "sql_alchemy_conn"): ("core", "sql_alchemy_conn", "2.3.0"),
        ("database", "sql_engine_encoding"): ("core", "sql_engine_encoding", "2.3.0"),
        ("database", "sql_engine_collation_for_ids"): ("core", "sql_engine_collation_for_ids", "2.3.0"),
        ("database", "sql_alchemy_pool_enabled"): ("core", "sql_alchemy_pool_enabled", "2.3.0"),
        ("database", "sql_alchemy_pool_size"): ("core", "sql_alchemy_pool_size", "2.3.0"),
        ("database", "sql_alchemy_max_overflow"): ("core", "sql_alchemy_max_overflow", "2.3.0"),
        ("database", "sql_alchemy_pool_recycle"): ("core", "sql_alchemy_pool_recycle", "2.3.0"),
        ("database", "sql_alchemy_pool_pre_ping"): ("core", "sql_alchemy_pool_pre_ping", "2.3.0"),
        ("database", "sql_alchemy_schema"): ("core", "sql_alchemy_schema", "2.3.0"),
        ("database", "sql_alchemy_connect_args"): ("core", "sql_alchemy_connect_args", "2.3.0"),
        ("database", "load_default_connections"): ("core", "load_default_connections", "2.3.0"),
        ("database", "max_db_retries"): ("core", "max_db_retries", "2.3.0"),
        ("scheduler", "parsing_cleanup_interval"): ("scheduler", "deactivate_stale_dags_interval", "2.5.0"),
        ("scheduler", "task_queued_timeout_check_interval"): (
            "kubernetes_executor",
            "worker_pods_pending_timeout_check_interval",
            "2.6.0",
        ),
    }
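
    # Example (illustrative only, not executed as part of this module): with the
    # mapping above, reading a renamed option transparently falls back to the old
    # location and emits a warning, e.g.
    #
    #     conf.get("logging", "base_log_folder")
    #     # also honours an existing [core] base_log_folder entry (deprecated since 2.0.0)
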
    # A mapping of new configurations to a list of old configurations for when one configuration
    # replaces more than one deprecated option. The deprecation logic for these configurations
    # is defined in SchedulerJobRunner.
    many_to_one_deprecated_options: dict[tuple[str, str], list[tuple[str, str, str]]] = {
        ("scheduler", "task_queued_timeout"): [
            ("celery", "stalled_task_timeout", "2.6.0"),
            ("celery", "task_adoption_timeout", "2.6.0"),
            ("kubernetes_executor", "worker_pods_pending_timeout", "2.6.0"),
        ]
    }

    # A mapping of new section -> (old section, since_version).
    deprecated_sections: dict[str, tuple[str, str]] = {"kubernetes_executor": ("kubernetes", "2.5.0")}

    # Now build the inverse so we can go from old_section/old_key to new_section/new_key
    # if someone tries to retrieve it based on old_section/old_key
    @functools.cached_property
    def inversed_deprecated_options(self):
        return {(sec, name): key for key, (sec, name, ver) in self.deprecated_options.items()}

    @functools.cached_property
    def inversed_deprecated_sections(self):
        return {
            old_section: new_section for new_section, (old_section, ver) in self.deprecated_sections.items()
        }
    # A mapping of old default values that we want to change and warn the user
    # about. Mapping of section -> setting -> { old, replace, by_version }
    deprecated_values: dict[str, dict[str, tuple[Pattern, str, str]]] = {
        "core": {
            "hostname_callable": (re2.compile(r":"), r".", "2.1"),
        },
        "webserver": {
            "navbar_color": (re2.compile(r"(?i)\A#007A87\z"), "#fff", "2.1"),
            "dag_default_view": (re2.compile(r"^tree$"), "grid", "3.0"),
        },
        "email": {
            "email_backend": (
                re2.compile(r"^airflow\.contrib\.utils\.sendgrid\.send_email$"),
                r"airflow.providers.sendgrid.utils.emailer.send_email",
                "2.1",
            ),
        },
        "logging": {
            "log_filename_template": (
                re2.compile(re2.escape("{{ ti.dag_id }}/{{ ti.task_id }}/{{ ts }}/{{ try_number }}.log")),
                # The actual replacement value will be updated after defaults are loaded from config.yml
                "XX-set-after-default-config-loaded-XX",
                "3.0",
            ),
        },
        "api": {
            "auth_backends": (
                re2.compile(r"^airflow\.api\.auth\.backend\.deny_all$|^$"),
                "airflow.api.auth.backend.session",
                "3.0",
            ),
        },
        "elasticsearch": {
            "log_id_template": (
                re2.compile("^" + re2.escape("{dag_id}-{task_id}-{execution_date}-{try_number}") + "$"),
                "{dag_id}-{task_id}-{run_id}-{map_index}-{try_number}",
                "3.0",
            )
        },
    }
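
    # Example (illustrative only, not executed as part of this module): during
    # validate(), a value still matching the old default pattern above is rewritten
    # in the running config and a FutureWarning is emitted, e.g. an old
    #
    #     [core]
    #     hostname_callable = socket:getfqdn
    #
    # is upgraded to "socket.getfqdn" (the ":" separator was replaced by "." in 2.1).
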
    _available_logging_levels = ["CRITICAL", "FATAL", "ERROR", "WARN", "WARNING", "INFO", "DEBUG"]
    enums_options = {
        ("core", "default_task_weight_rule"): sorted(WeightRule.all_weight_rules()),
        ("core", "dag_ignore_file_syntax"): ["regexp", "glob"],
        ("core", "mp_start_method"): multiprocessing.get_all_start_methods(),
        ("scheduler", "file_parsing_sort_mode"): ["modified_time", "random_seeded_by_host", "alphabetical"],
        ("logging", "logging_level"): _available_logging_levels,
        ("logging", "fab_logging_level"): _available_logging_levels,
        # celery_logging_level can be empty, which uses logging_level as fallback
        ("logging", "celery_logging_level"): [*_available_logging_levels, ""],
        ("webserver", "analytical_tool"): ["google_analytics", "metarouter", "segment", ""],
    }

    upgraded_values: dict[tuple[str, str], str]
    """Mapping of (section,option) to the old value that was upgraded"""
    def get_sections_including_defaults(self) -> list[str]:
        """
        Retrieve all sections from the configuration parser, including sections defined by built-in defaults.

        :return: list of section names
        """
        return list(dict.fromkeys(itertools.chain(self.configuration_description, self.sections())))

    def get_options_including_defaults(self, section: str) -> list[str]:
        """
        Retrieve all possible options from the configuration parser for the section given.

        Includes options defined by built-in defaults.

        :return: list of option names for the section given
        """
        my_own_options = self.options(section) if self.has_section(section) else []
        all_options_from_defaults = self.configuration_description.get(section, {}).get("options", {})
        return list(dict.fromkeys(itertools.chain(all_options_from_defaults, my_own_options)))

    def optionxform(self, optionstr: str) -> str:
        """
        Transform option names on every read, get, or set operation.

        This changes the default behaviour of ConfigParser from lower-casing
        option names to preserving their case.

        :param optionstr:
        :return:
        """
        return optionstr
    @contextmanager
    def make_sure_configuration_loaded(self, with_providers: bool) -> Generator[None, None, None]:
        """
        Make sure configuration is loaded with or without providers.

        This happens regardless of whether the provider configuration has been loaded before or not.
        Restores configuration to the state before entering the context.

        :param with_providers: whether providers should be loaded
        """
        reload_providers_when_leaving = False
        if with_providers and not self._providers_configuration_loaded:
            # make sure providers are initialized
            from airflow.providers_manager import ProvidersManager

            # run internal method to initialize providers configuration in order to not trigger the
            # initialize_providers_configuration cache (because we will be unloading it now)
            ProvidersManager()._initialize_providers_configuration()
        elif not with_providers and self._providers_configuration_loaded:
            reload_providers_when_leaving = True
            self.restore_core_default_configuration()
        yield
        if reload_providers_when_leaving:
            self.load_providers_configuration()
    @staticmethod
    def _write_section_header(
        file: IO[str],
        include_descriptions: bool,
        section_config_description: dict[str, str],
        section_to_write: str,
    ) -> None:
        """Write header for configuration section."""
        file.write(f"[{section_to_write}]\n")
        section_description = section_config_description.get("description")
        if section_description and include_descriptions:
            for line in section_description.splitlines():
                file.write(f"# {line}\n")
            file.write("\n")

    def _write_option_header(
        self,
        file: IO[str],
        option: str,
        extra_spacing: bool,
        include_descriptions: bool,
        include_env_vars: bool,
        include_examples: bool,
        include_sources: bool,
        section_config_description: dict[str, dict[str, Any]],
        section_to_write: str,
        sources_dict: ConfigSourcesType,
    ) -> tuple[bool, bool]:
        """
        Write header for configuration option.

        Returns tuple of (should_continue, needs_separation) where needs_separation should be
        set if the option needs additional separation to visually separate it from the next option.
        """
        from airflow import __version__ as airflow_version

        option_config_description = (
            section_config_description.get("options", {}).get(option, {})
            if section_config_description
            else {}
        )
        version_added = option_config_description.get("version_added")
        if version_added is not None and parse_version(version_added) > parse_version(
            parse_version(airflow_version).base_version
        ):
            # skip if option is going to be added in the future version
            return False, False
        description = option_config_description.get("description")
        needs_separation = False
        if description and include_descriptions:
            for line in description.splitlines():
                file.write(f"# {line}\n")
            needs_separation = True
        example = option_config_description.get("example")
        if example is not None and include_examples:
            if extra_spacing:
                file.write("#\n")
            file.write(f"# Example: {option} = {example}\n")
            needs_separation = True
        if include_sources and sources_dict:
            sources_section = sources_dict.get(section_to_write)
            value_with_source = sources_section.get(option) if sources_section else None
            if value_with_source is None:
                file.write("#\n# Source: not defined\n")
            else:
                file.write(f"#\n# Source: {value_with_source[1]}\n")
            needs_separation = True
        if include_env_vars:
            file.write(f"#\n# Variable: AIRFLOW__{section_to_write.upper()}__{option.upper()}\n")
            if extra_spacing:
                file.write("#\n")
            needs_separation = True
        return True, needs_separation
    def _write_value(
        self,
        file: IO[str],
        option: str,
        comment_out_everything: bool,
        needs_separation: bool,
        only_defaults: bool,
        section_to_write: str,
    ):
        if self._default_values is None:
            default_value = None
        else:
            default_value = self.get_default_value(section_to_write, option, raw=True)
        if only_defaults:
            value = default_value
        else:
            value = self.get(section_to_write, option, fallback=default_value, raw=True)
        if value is None:
            file.write(f"# {option} = \n")
        else:
            if comment_out_everything:
                file.write(f"# {option} = {value}\n")
            else:
                file.write(f"{option} = {value}\n")
        if needs_separation:
            file.write("\n")
    def write(  # type: ignore[override]
        self,
        file: IO[str],
        section: str | None = None,
        include_examples: bool = True,
        include_descriptions: bool = True,
        include_sources: bool = True,
        include_env_vars: bool = True,
        include_providers: bool = True,
        comment_out_everything: bool = False,
        hide_sensitive_values: bool = False,
        extra_spacing: bool = True,
        only_defaults: bool = False,
        **kwargs: Any,
    ) -> None:
        """
        Write configuration with comments and examples to a file.

        :param file: file to write to
        :param section: section of the config to write, defaults to all sections
        :param include_examples: Include examples in the output
        :param include_descriptions: Include descriptions in the output
        :param include_sources: Include the source of each config option
        :param include_env_vars: Include environment variables corresponding to each config option
        :param include_providers: Include providers configuration
        :param comment_out_everything: Comment out all values
        :param hide_sensitive_values: Hide sensitive values in the output
        :param extra_spacing: Add extra spacing before examples and after variables
        :param only_defaults: Only include default values when writing the config, not the actual values
        """
        sources_dict = {}
        if include_sources:
            sources_dict = self.as_dict(display_source=True)
        if self._default_values is None:
            raise RuntimeError("Cannot write default config, no default config set")
        if self.configuration_description is None:
            raise RuntimeError("Cannot write default config, no default configuration description set")
        with self.make_sure_configuration_loaded(with_providers=include_providers):
            for section_to_write in self.get_sections_including_defaults():
                section_config_description = self.configuration_description.get(section_to_write, {})
                if section_to_write != section and section is not None:
                    continue
                if self._default_values.has_section(section_to_write) or self.has_section(section_to_write):
                    self._write_section_header(
                        file, include_descriptions, section_config_description, section_to_write
                    )
                    for option in self.get_options_including_defaults(section_to_write):
                        should_continue, needs_separation = self._write_option_header(
                            file=file,
                            option=option,
                            extra_spacing=extra_spacing,
                            include_descriptions=include_descriptions,
                            include_env_vars=include_env_vars,
                            include_examples=include_examples,
                            include_sources=include_sources,
                            section_config_description=section_config_description,
                            section_to_write=section_to_write,
                            sources_dict=sources_dict,
                        )
                        if should_continue:
                            self._write_value(
                                file=file,
                                option=option,
                                comment_out_everything=comment_out_everything,
                                needs_separation=needs_separation,
                                only_defaults=only_defaults,
                                section_to_write=section_to_write,
                            )
                    if include_descriptions and not needs_separation:
                        # extra separation between sections in case last option did not need it
                        file.write("\n")
    def restore_core_default_configuration(self) -> None:
        """Restore default configuration for core Airflow.

        It does not restore configuration for providers. If you want to restore configuration for
        providers, you need to call the ``load_providers_configuration`` method.
        """
        self.configuration_description = retrieve_configuration_description(include_providers=False)
        self._default_values = create_default_config_parser(self.configuration_description)
        self._providers_configuration_loaded = False

    def validate(self):
        self._validate_sqlite3_version()
        self._validate_enums()
        for section, replacement in self.deprecated_values.items():
            for name, info in replacement.items():
                old, new, version = info
                current_value = self.get(section, name, fallback="")
                if self._using_old_value(old, current_value):
                    self.upgraded_values[(section, name)] = current_value
                    new_value = old.sub(new, current_value)
                    self._update_env_var(section=section, name=name, new_value=new_value)
                    self._create_future_warning(
                        name=name,
                        section=section,
                        current_value=current_value,
                        new_value=new_value,
                        version=version,
                    )
        self._upgrade_auth_backends()
        self._upgrade_postgres_metastore_conn()
        self.is_validated = True
    def _upgrade_auth_backends(self):
        """
        Ensure a custom auth_backends setting contains session.

        This is required by the UI for ajax queries.
        """
        old_value = self.get("api", "auth_backends", fallback="")
        if old_value in ("airflow.api.auth.backend.default", ""):
            # handled by deprecated_values
            pass
        elif old_value.find("airflow.api.auth.backend.session") == -1:
            new_value = old_value + ",airflow.api.auth.backend.session"
            self._update_env_var(section="api", name="auth_backends", new_value=new_value)
            self.upgraded_values[("api", "auth_backends")] = old_value

            # if the old value is set via env var, we need to wipe it
            # otherwise, it'll "win" over our adjusted value
            old_env_var = self._env_var_name("api", "auth_backend")
            os.environ.pop(old_env_var, None)

            warnings.warn(
                "The auth_backends setting in [api] has had airflow.api.auth.backend.session added "
                "in the running config, which is needed by the UI. Please update your config before "
                "Apache Airflow 3.0.",
                FutureWarning,
            )
    def _upgrade_postgres_metastore_conn(self):
        """
        Upgrade SQL connection schemes.

        As of SQLAlchemy 1.4, the schemes `postgres+psycopg2` and `postgres`
        must be replaced with `postgresql`.
        """
        section, key = "database", "sql_alchemy_conn"
        old_value = self.get(section, key, _extra_stacklevel=1)
        bad_schemes = ["postgres+psycopg2", "postgres"]
        good_scheme = "postgresql"
        parsed = urlsplit(old_value)
        if parsed.scheme in bad_schemes:
            warnings.warn(
                f"Bad scheme in Airflow configuration core > sql_alchemy_conn: `{parsed.scheme}`. "
                "As of SQLAlchemy 1.4 (adopted in Airflow 2.3) this is no longer supported. You must "
                f"change to `{good_scheme}` before the next Airflow release.",
                FutureWarning,
            )
            self.upgraded_values[(section, key)] = old_value
            new_value = re2.sub("^" + re2.escape(f"{parsed.scheme}://"), f"{good_scheme}://", old_value)
            self._update_env_var(section=section, name=key, new_value=new_value)

            # if the old value is set via env var, we need to wipe it
            # otherwise, it'll "win" over our adjusted value
            old_env_var = self._env_var_name("core", key)
            os.environ.pop(old_env_var, None)
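
    # Example (illustrative only, not executed as part of this module): a legacy
    # connection string such as
    #
    #     postgres+psycopg2://airflow:airflow@localhost/airflow
    #
    # is rewritten in the running config (and any AIRFLOW__CORE__SQL_ALCHEMY_CONN
    # override is dropped) to
    #
    #     postgresql://airflow:airflow@localhost/airflow
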
    def _validate_enums(self):
        """Validate that enum type config has an accepted value."""
        for (section_key, option_key), enum_options in self.enums_options.items():
            if self.has_option(section_key, option_key):
                value = self.get(section_key, option_key, fallback=None)
                if value and value not in enum_options:
                    raise AirflowConfigException(
                        f"`[{section_key}] {option_key}` should not be "
                        f"{value!r}. Possible values: {', '.join(enum_options)}."
                    )

    def _validate_sqlite3_version(self):
        """Validate SQLite version.

        Some features in storing rendered fields require SQLite >= 3.15.0.
        """
        if "sqlite" not in self.get("database", "sql_alchemy_conn"):
            return
        import sqlite3

        min_sqlite_version = (3, 15, 0)
        if _parse_sqlite_version(sqlite3.sqlite_version) >= min_sqlite_version:
            return
        from airflow.utils.docs import get_docs_url

        min_sqlite_version_str = ".".join(str(s) for s in min_sqlite_version)
        raise AirflowConfigException(
            f"error: SQLite C library too old (< {min_sqlite_version_str}). "
            f"See {get_docs_url('howto/set-up-database.html#setting-up-a-sqlite-database')}"
        )
    def _using_old_value(self, old: Pattern, current_value: str) -> bool:
        return old.search(current_value) is not None

    def _update_env_var(self, section: str, name: str, new_value: str):
        env_var = self._env_var_name(section, name)
        # Set it as an env var so that any subprocesses keep the same override!
        os.environ[env_var] = new_value

    @staticmethod
    def _create_future_warning(name: str, section: str, current_value: Any, new_value: Any, version: str):
        warnings.warn(
            f"The {name!r} setting in [{section}] has the old default value of {current_value!r}. "
            f"This value has been changed to {new_value!r} in the running config, but "
            f"please update your config before Apache Airflow {version}.",
            FutureWarning,
        )

    def _env_var_name(self, section: str, key: str) -> str:
        return f"{ENV_VAR_PREFIX}{section.replace('.', '_').upper()}__{key.upper()}"

    def _get_env_var_option(self, section: str, key: str):
        # must have format AIRFLOW__{SECTION}__{KEY} (note double underscore)
        env_var = self._env_var_name(section, key)
        if env_var in os.environ:
            return expand_env_var(os.environ[env_var])
        # alternatively AIRFLOW__{SECTION}__{KEY}_CMD (for a command)
        env_var_cmd = env_var + "_CMD"
        if env_var_cmd in os.environ:
            # if this is a valid command key...
            if (section, key) in self.sensitive_config_values:
                return run_command(os.environ[env_var_cmd])
        # alternatively AIRFLOW__{SECTION}__{KEY}_SECRET (to get from Secrets Backend)
        env_var_secret_path = env_var + "_SECRET"
        if env_var_secret_path in os.environ:
            # if this is a valid secret path...
            if (section, key) in self.sensitive_config_values:
                return _get_config_value_from_secret_backend(os.environ[env_var_secret_path])
        return None
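
    # Example (illustrative only, not executed as part of this module): the three
    # environment-variable forms checked above, in order of precedence:
    #
    #     AIRFLOW__WEBSERVER__BASE_URL="http://localhost:8080"           # plain value
    #     AIRFLOW__DATABASE__SQL_ALCHEMY_CONN_CMD="cat /run/secrets/db"  # stdout of a command (sensitive options only)
    #     AIRFLOW__DATABASE__SQL_ALCHEMY_CONN_SECRET="db-conn"           # key in the secrets backend (sensitive options only)
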
    def _get_cmd_option(self, section: str, key: str):
        fallback_key = key + "_cmd"
        if (section, key) in self.sensitive_config_values:
            if super().has_option(section, fallback_key):
                command = super().get(section, fallback_key)
                return run_command(command)
        return None

    def _get_cmd_option_from_config_sources(
        self, config_sources: ConfigSourcesType, section: str, key: str
    ) -> str | None:
        fallback_key = key + "_cmd"
        if (section, key) in self.sensitive_config_values:
            section_dict = config_sources.get(section)
            if section_dict is not None:
                command_value = section_dict.get(fallback_key)
                if command_value is not None:
                    if isinstance(command_value, str):
                        command = command_value
                    else:
                        command = command_value[0]
                    return run_command(command)
        return None

    def _get_secret_option(self, section: str, key: str) -> str | None:
        """Get Config option values from Secret Backend."""
        fallback_key = key + "_secret"
        if (section, key) in self.sensitive_config_values:
            if super().has_option(section, fallback_key):
                secrets_path = super().get(section, fallback_key)
                return _get_config_value_from_secret_backend(secrets_path)
        return None

    def _get_secret_option_from_config_sources(
        self, config_sources: ConfigSourcesType, section: str, key: str
    ) -> str | None:
        fallback_key = key + "_secret"
        if (section, key) in self.sensitive_config_values:
            section_dict = config_sources.get(section)
            if section_dict is not None:
                secrets_path_value = section_dict.get(fallback_key)
                if secrets_path_value is not None:
                    if isinstance(secrets_path_value, str):
                        secrets_path = secrets_path_value
                    else:
                        secrets_path = secrets_path_value[0]
                    return _get_config_value_from_secret_backend(secrets_path)
        return None

    def get_mandatory_value(self, section: str, key: str, **kwargs) -> str:
        value = self.get(section, key, _extra_stacklevel=1, **kwargs)
        if value is None:
            raise ValueError(f"The value {section}/{key} should be set!")
        return value
    @overload  # type: ignore[override]
    def get(self, section: str, key: str, fallback: str = ..., **kwargs) -> str:
        ...

    @overload  # type: ignore[override]
    def get(self, section: str, key: str, **kwargs) -> str | None:
        ...

    def get(  # type: ignore[override,misc]
        self,
        section: str,
        key: str,
        suppress_warnings: bool = False,
        _extra_stacklevel: int = 0,
        **kwargs,
    ) -> str | None:
        section = section.lower()
        key = key.lower()
        warning_emitted = False
        deprecated_section: str | None
        deprecated_key: str | None

        option_description = self.configuration_description.get(section, {}).get(key, {})
        if option_description.get("deprecated"):
            deprecation_reason = option_description.get("deprecation_reason", "")
            warnings.warn(
                f"The '{key}' option in section {section} is deprecated. {deprecation_reason}",
                DeprecationWarning,
                stacklevel=2 + _extra_stacklevel,
            )

        # For when we rename whole sections
        if section in self.inversed_deprecated_sections:
            deprecated_section, deprecated_key = (section, key)
            section = self.inversed_deprecated_sections[section]
            if not self._suppress_future_warnings:
                warnings.warn(
                    f"The config section [{deprecated_section}] has been renamed to "
                    f"[{section}]. Please update your `conf.get*` call to use the new name",
                    FutureWarning,
                    stacklevel=2 + _extra_stacklevel,
                )
            # Don't warn about individual rename if the whole section is renamed
            warning_emitted = True
        elif (section, key) in self.inversed_deprecated_options:
            # Handle using deprecated section/key instead of the new section/key
            new_section, new_key = self.inversed_deprecated_options[(section, key)]
            if not self._suppress_future_warnings and not warning_emitted:
                warnings.warn(
                    f"section/key [{section}/{key}] has been deprecated, you should use "
                    f"[{new_section}/{new_key}] instead. Please update your `conf.get*` call to use the "
                    "new name",
                    FutureWarning,
                    stacklevel=2 + _extra_stacklevel,
                )
                warning_emitted = True
            deprecated_section, deprecated_key = section, key
            section, key = (new_section, new_key)
        elif section in self.deprecated_sections:
            # When accessing the new section name, make sure we check under the old config name
            deprecated_key = key
            deprecated_section = self.deprecated_sections[section][0]
        else:
            deprecated_section, deprecated_key, _ = self.deprecated_options.get(
                (section, key), (None, None, None)
            )

        # first check environment variables
        option = self._get_environment_variables(
            deprecated_key,
            deprecated_section,
            key,
            section,
            issue_warning=not warning_emitted,
            extra_stacklevel=_extra_stacklevel,
        )
        if option is not None:
            return option

        # ...then the config file