diff --git a/airflow/config_templates/airflow_local_settings.py b/airflow/config_templates/airflow_local_settings.py index b2752c2be7c2..6684fd18e51a 100644 --- a/airflow/config_templates/airflow_local_settings.py +++ b/airflow/config_templates/airflow_local_settings.py @@ -82,7 +82,6 @@ 'class': 'airflow.utils.log.file_task_handler.FileTaskHandler', 'formatter': 'airflow', 'base_log_folder': os.path.expanduser(BASE_LOG_FOLDER), - 'filename_template': FILENAME_TEMPLATE, 'filters': ['mask_secrets'], }, 'processor': { diff --git a/airflow/config_templates/default_test.cfg b/airflow/config_templates/default_test.cfg index 2f9b6fa264b1..83260d0d5250 100644 --- a/airflow/config_templates/default_test.cfg +++ b/airflow/config_templates/default_test.cfg @@ -54,7 +54,6 @@ base_log_folder = {AIRFLOW_HOME}/logs logging_level = INFO celery_logging_level = WARN fab_logging_level = WARN -log_filename_template = {{{{ ti.dag_id }}}}/{{{{ ti.task_id }}}}/{{{{ ts }}}}/{{{{ try_number }}}}.log log_processor_filename_template = {{{{ filename }}}}.log dag_processor_manager_log_location = {AIRFLOW_HOME}/logs/dag_processor_manager/dag_processor_manager.log worker_log_server_port = 8793 diff --git a/airflow/models/dagrun.py b/airflow/models/dagrun.py index d9c4eeb72637..ad0dcdfebdbb 100644 --- a/airflow/models/dagrun.py +++ b/airflow/models/dagrun.py @@ -1209,14 +1209,23 @@ def schedule_tis(self, schedulable_tis: Iterable[TI], session: Session = NEW_SES return count @provide_session - def get_log_filename_template(self, *, session: Session = NEW_SESSION) -> str: + def get_log_template(self, *, session: Session = NEW_SESSION) -> LogTemplate: if self.log_template_id is None: # DagRun created before LogTemplate introduction. - template = session.query(LogTemplate.filename).order_by(LogTemplate.id).limit(1).scalar() + template = session.query(LogTemplate).order_by(LogTemplate.id).first() else: - template = session.query(LogTemplate.filename).filter_by(id=self.log_template_id).scalar() + template = session.query(LogTemplate).get(self.log_template_id) if template is None: raise AirflowException( f"No log_template entry found for ID {self.log_template_id!r}. " f"Please make sure you set up the metadatabase correctly." ) return template + + @provide_session + def get_log_filename_template(self, *, session: Session = NEW_SESSION) -> str: + warnings.warn( + "This method is deprecated. Please use get_log_template instead.", + DeprecationWarning, + stacklevel=2, + ) + return self.get_log_template(session=session).filename diff --git a/airflow/providers/alibaba/cloud/log/oss_task_handler.py b/airflow/providers/alibaba/cloud/log/oss_task_handler.py index d26bfbfd048d..ec61972ffcd4 100644 --- a/airflow/providers/alibaba/cloud/log/oss_task_handler.py +++ b/airflow/providers/alibaba/cloud/log/oss_task_handler.py @@ -36,7 +36,7 @@ class OSSTaskHandler(FileTaskHandler, LoggingMixin): uploads to and reads from OSS remote storage. """ - def __init__(self, base_log_folder, oss_log_folder, filename_template): + def __init__(self, base_log_folder, oss_log_folder, filename_template=None): self.log.info("Using oss_task_handler for remote logging...") super().__init__(base_log_folder, filename_template) (self.bucket_name, self.base_folder) = OSSHook.parse_oss_url(oss_log_folder) diff --git a/airflow/providers/amazon/aws/log/cloudwatch_task_handler.py b/airflow/providers/amazon/aws/log/cloudwatch_task_handler.py index c975a2cb83fc..7d4f81006b38 100644 --- a/airflow/providers/amazon/aws/log/cloudwatch_task_handler.py +++ b/airflow/providers/amazon/aws/log/cloudwatch_task_handler.py @@ -17,6 +17,7 @@ # under the License. import sys from datetime import datetime +from typing import Optional import watchtower @@ -42,7 +43,7 @@ class CloudwatchTaskHandler(FileTaskHandler, LoggingMixin): :param filename_template: template for file name (local storage) or log stream name (remote) """ - def __init__(self, base_log_folder: str, log_group_arn: str, filename_template: str): + def __init__(self, base_log_folder: str, log_group_arn: str, filename_template: Optional[str] = None): super().__init__(base_log_folder, filename_template) split_arn = log_group_arn.split(':') diff --git a/airflow/providers/amazon/aws/log/s3_task_handler.py b/airflow/providers/amazon/aws/log/s3_task_handler.py index 695c4623d97b..0abea94c665c 100644 --- a/airflow/providers/amazon/aws/log/s3_task_handler.py +++ b/airflow/providers/amazon/aws/log/s3_task_handler.py @@ -17,6 +17,7 @@ # under the License. import os import sys +from typing import Optional if sys.version_info >= (3, 8): from functools import cached_property @@ -35,7 +36,7 @@ class S3TaskHandler(FileTaskHandler, LoggingMixin): uploads to and reads from S3 remote storage. """ - def __init__(self, base_log_folder: str, s3_log_folder: str, filename_template: str): + def __init__(self, base_log_folder: str, s3_log_folder: str, filename_template: Optional[str] = None): super().__init__(base_log_folder, filename_template) self.remote_base = s3_log_folder self.log_relative_path = '' diff --git a/airflow/providers/elasticsearch/log/es_task_handler.py b/airflow/providers/elasticsearch/log/es_task_handler.py index 83c1163d80c8..64fce0df53c1 100644 --- a/airflow/providers/elasticsearch/log/es_task_handler.py +++ b/airflow/providers/elasticsearch/log/es_task_handler.py @@ -18,6 +18,7 @@ import logging import sys +import warnings from collections import defaultdict from datetime import datetime from operator import attrgetter @@ -31,15 +32,22 @@ from elasticsearch_dsl import Search from airflow.configuration import conf -from airflow.models import TaskInstance +from airflow.models.dagrun import DagRun +from airflow.models.taskinstance import TaskInstance from airflow.utils import timezone from airflow.utils.log.file_task_handler import FileTaskHandler from airflow.utils.log.json_formatter import JSONFormatter from airflow.utils.log.logging_mixin import ExternalLoggingMixin, LoggingMixin +from airflow.utils.session import create_session # Elasticsearch hosted log type EsLogMsgType = List[Tuple[str, str]] +# Compatibility: Airflow 2.3.3 and up uses this method, which accesses the +# LogTemplate model to record the log ID template used. If this function does +# not exist, the task handler should use the log_id_template attribute instead. +USE_PER_RUN_LOG_ID = hasattr(DagRun, "get_log_template") + class ElasticsearchTaskHandler(FileTaskHandler, ExternalLoggingMixin, LoggingMixin): """ @@ -65,8 +73,6 @@ class ElasticsearchTaskHandler(FileTaskHandler, ExternalLoggingMixin, LoggingMix def __init__( self, base_log_folder: str, - filename_template: str, - log_id_template: str, end_of_log_mark: str, write_stdout: bool, json_format: bool, @@ -76,6 +82,9 @@ def __init__( host: str = "localhost:9200", frontend: str = "localhost:5601", es_kwargs: Optional[dict] = conf.getsection("elasticsearch_configs"), + *, + filename_template: Optional[str] = None, + log_id_template: Optional[str] = None, ): """ :param base_log_folder: base folder to store logs locally @@ -88,7 +97,13 @@ def __init__( self.client = elasticsearch.Elasticsearch([host], **es_kwargs) # type: ignore[attr-defined] - self.log_id_template = log_id_template + if USE_PER_RUN_LOG_ID and log_id_template is not None: + warnings.warn( + "Passing log_id_template to ElasticsearchTaskHandler is deprecated and has no effect", + DeprecationWarning, + ) + + self.log_id_template = log_id_template # Only used on Airflow < 2.3.2. self.frontend = frontend self.mark_end_on_close = True self.end_of_log_mark = end_of_log_mark @@ -103,7 +118,13 @@ def __init__( self.handler: Union[logging.FileHandler, logging.StreamHandler] # type: ignore[assignment] def _render_log_id(self, ti: TaskInstance, try_number: int) -> str: - dag_run = ti.get_dagrun() + with create_session() as session: + dag_run = ti.get_dagrun(session=session) + if USE_PER_RUN_LOG_ID: + log_id_template = dag_run.get_log_template(session=session).elasticsearch_id + else: + log_id_template = self.log_id_template + dag = ti.task.dag assert dag is not None # For Mypy. try: @@ -126,7 +147,7 @@ def _render_log_id(self, ti: TaskInstance, try_number: int) -> str: data_interval_end = "" execution_date = dag_run.execution_date.isoformat() - return self.log_id_template.format( + return log_id_template.format( dag_id=ti.dag_id, task_id=ti.task_id, run_id=getattr(ti, "run_id", ""), diff --git a/airflow/providers/google/cloud/log/gcs_task_handler.py b/airflow/providers/google/cloud/log/gcs_task_handler.py index 92d133d109af..81f1426d7515 100644 --- a/airflow/providers/google/cloud/log/gcs_task_handler.py +++ b/airflow/providers/google/cloud/log/gcs_task_handler.py @@ -67,7 +67,7 @@ def __init__( *, base_log_folder: str, gcs_log_folder: str, - filename_template: str, + filename_template: Optional[str] = None, gcp_key_path: Optional[str] = None, gcp_keyfile_dict: Optional[dict] = None, gcp_scopes: Optional[Collection[str]] = _DEFAULT_SCOPESS, diff --git a/airflow/providers/microsoft/azure/log/wasb_task_handler.py b/airflow/providers/microsoft/azure/log/wasb_task_handler.py index 9ec0cdf646fc..f5e89c2c21a6 100644 --- a/airflow/providers/microsoft/azure/log/wasb_task_handler.py +++ b/airflow/providers/microsoft/azure/log/wasb_task_handler.py @@ -44,8 +44,9 @@ def __init__( base_log_folder: str, wasb_log_folder: str, wasb_container: str, - filename_template: str, delete_local_copy: str, + *, + filename_template: Optional[str] = None, ) -> None: super().__init__(base_log_folder, filename_template) self.wasb_container = wasb_container diff --git a/airflow/utils/log/file_task_handler.py b/airflow/utils/log/file_task_handler.py index 321e12528876..2c53529a72dc 100644 --- a/airflow/utils/log/file_task_handler.py +++ b/airflow/utils/log/file_task_handler.py @@ -18,6 +18,7 @@ """File logging handler for tasks.""" import logging import os +import warnings from datetime import datetime from pathlib import Path from typing import TYPE_CHECKING, Optional, Tuple @@ -27,6 +28,7 @@ from airflow.utils.helpers import parse_template_string, render_template_to_string from airflow.utils.jwt_signer import JWTSigner from airflow.utils.log.non_caching_file_handler import NonCachingFileHandler +from airflow.utils.session import create_session if TYPE_CHECKING: from airflow.models import TaskInstance @@ -43,11 +45,15 @@ class FileTaskHandler(logging.Handler): :param filename_template: template filename string """ - def __init__(self, base_log_folder: str, filename_template: str): + def __init__(self, base_log_folder: str, filename_template: Optional[str] = None): super().__init__() self.handler: Optional[logging.FileHandler] = None self.local_base = base_log_folder - self.filename_template, self.filename_jinja_template = parse_template_string(filename_template) + if filename_template is not None: + warnings.warn( + "Passing filename_template to FileTaskHandler is deprecated and has no effect", + DeprecationWarning, + ) def set_context(self, ti: "TaskInstance"): """ @@ -74,15 +80,19 @@ def close(self): self.handler.close() def _render_filename(self, ti: "TaskInstance", try_number: int) -> str: - if self.filename_jinja_template: + with create_session() as session: + dag_run = ti.get_dagrun(session=session) + template = dag_run.get_log_template(session=session).filename + str_tpl, jinja_tpl = parse_template_string(template) + + if jinja_tpl: if hasattr(ti, "task"): context = ti.get_template_context() else: - context = Context(ti=ti, ts=ti.get_dagrun().logical_date.isoformat()) + context = Context(ti=ti, ts=dag_run.logical_date.isoformat()) context["try_number"] = try_number - return render_template_to_string(self.filename_jinja_template, context) - elif self.filename_template: - dag_run = ti.get_dagrun() + return render_template_to_string(jinja_tpl, context) + elif str_tpl: dag = ti.task.dag assert dag is not None # For Mypy. try: @@ -97,7 +107,7 @@ def _render_filename(self, ti: "TaskInstance", try_number: int) -> str: data_interval_end = data_interval[1].isoformat() else: data_interval_end = "" - return self.filename_template.format( + return str_tpl.format( dag_id=ti.dag_id, task_id=ti.task_id, run_id=ti.run_id, diff --git a/airflow/utils/log/log_reader.py b/airflow/utils/log/log_reader.py index 396ab90a324f..f241c22df188 100644 --- a/airflow/utils/log/log_reader.py +++ b/airflow/utils/log/log_reader.py @@ -121,6 +121,6 @@ def render_log_filename( attachment_filename = render_log_filename( ti=ti, try_number="all" if try_number is None else try_number, - filename_template=dagrun.get_log_filename_template(session=session), + filename_template=dagrun.get_log_template(session=session).filename, ) return attachment_filename diff --git a/tests/api_connexion/endpoints/test_log_endpoint.py b/tests/api_connexion/endpoints/test_log_endpoint.py index 614e1fa3a19d..1b226be96f98 100644 --- a/tests/api_connexion/endpoints/test_log_endpoint.py +++ b/tests/api_connexion/endpoints/test_log_endpoint.py @@ -99,7 +99,7 @@ def setup_attrs(self, configured_app, configure_loggers, dag_maker, session) -> self.ti.hostname = 'localhost' @pytest.fixture - def configure_loggers(self, tmp_path): + def configure_loggers(self, tmp_path, create_log_template): self.log_dir = tmp_path dir_path = tmp_path / self.DAG_ID / self.TASK_ID / self.default_time.replace(':', '.') @@ -112,9 +112,9 @@ def configure_loggers(self, tmp_path): logging_config = copy.deepcopy(DEFAULT_LOGGING_CONFIG) logging_config['handlers']['task']['base_log_folder'] = self.log_dir - logging_config['handlers']['task'][ - 'filename_template' - ] = '{{ ti.dag_id }}/{{ ti.task_id }}/{{ ts | replace(":", ".") }}/{{ try_number }}.log' + create_log_template( + '{{ ti.dag_id }}/{{ ti.task_id }}/{{ ts | replace(":", ".") }}/{{ try_number }}.log' + ) logging.config.dictConfig(logging_config) diff --git a/tests/conftest.py b/tests/conftest.py index 68d318e13c50..b153c213d5f9 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -36,6 +36,9 @@ os.environ["AWS_DEFAULT_REGION"] = os.environ.get("AWS_DEFAULT_REGION") or "us-east-1" os.environ["CREDENTIALS_DIR"] = os.environ.get('CREDENTIALS_DIR') or "/files/airflow-breeze-config/keys" +from airflow import settings # noqa: E402 +from airflow.models.tasklog import LogTemplate # noqa: E402 + from tests.test_utils.perf.perf_kit.sqlalchemy import ( # noqa isort:skip count_queries, trace_queries, @@ -775,3 +778,39 @@ def session(): with create_session() as session: yield session session.rollback() + + +@pytest.fixture() +def get_test_dag(): + def _get(dag_id): + from airflow.models.dagbag import DagBag + from airflow.models.serialized_dag import SerializedDagModel + + dag_file = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'dags', f'{dag_id}.py') + dagbag = DagBag(dag_folder=dag_file, include_examples=False) + + dag = dagbag.get_dag(dag_id) + dag.sync_to_db() + SerializedDagModel.write_dag(dag) + + return dag + + return _get + + +@pytest.fixture() +def create_log_template(request): + session = settings.Session() + + def _create_log_template(filename_template, elasticsearch_id=""): + log_template = LogTemplate(filename=filename_template, elasticsearch_id=elasticsearch_id) + session.add(log_template) + session.commit() + + def _delete_log_template(): + session.delete(log_template) + session.commit() + + request.addfinalizer(_delete_log_template) + + return _create_log_template diff --git a/tests/providers/alibaba/cloud/log/test_oss_task_handler.py b/tests/providers/alibaba/cloud/log/test_oss_task_handler.py index 24eb73b92e92..30e8cc32b9b2 100644 --- a/tests/providers/alibaba/cloud/log/test_oss_task_handler.py +++ b/tests/providers/alibaba/cloud/log/test_oss_task_handler.py @@ -35,10 +35,7 @@ class TestOSSTaskHandler(unittest.TestCase): def setUp(self): self.base_log_folder = 'local/airflow/logs/1.log' self.oss_log_folder = f'oss://{MOCK_BUCKET_NAME}/airflow/logs' - self.filename_template = '{try_number}.log' - self.oss_task_handler = OSSTaskHandler( - self.base_log_folder, self.oss_log_folder, self.filename_template - ) + self.oss_task_handler = OSSTaskHandler(self.base_log_folder, self.oss_log_folder) @mock.patch(OSS_TASK_HANDLER_STRING.format('conf.get')) @mock.patch(OSS_TASK_HANDLER_STRING.format('OSSHook')) diff --git a/tests/providers/amazon/aws/log/test_cloudwatch_task_handler.py b/tests/providers/amazon/aws/log/test_cloudwatch_task_handler.py index dbd2ae28d5ad..8b23218c8cc3 100644 --- a/tests/providers/amazon/aws/log/test_cloudwatch_task_handler.py +++ b/tests/providers/amazon/aws/log/test_cloudwatch_task_handler.py @@ -16,17 +16,18 @@ # specific language governing permissions and limitations # under the License. import time -import unittest from datetime import datetime as dt from unittest import mock from unittest.mock import ANY, call +import pytest from watchtower import CloudWatchLogHandler from airflow.models import DAG, DagRun, TaskInstance from airflow.operators.empty import EmptyOperator from airflow.providers.amazon.aws.hooks.logs import AwsLogsHook from airflow.providers.amazon.aws.log.cloudwatch_task_handler import CloudwatchTaskHandler +from airflow.utils.session import create_session from airflow.utils.state import State from airflow.utils.timezone import datetime from tests.test_utils.config import conf_vars @@ -44,19 +45,24 @@ def get_time_str(time_in_milliseconds): return dt_time.strftime("%Y-%m-%d %H:%M:%S,000") -@unittest.skipIf(mock_logs is None, "Skipping test because moto.mock_logs is not available") -@mock_logs -class TestCloudwatchTaskHandler(unittest.TestCase): +@pytest.fixture(autouse=True, scope="module") +def logmock(): + with mock_logs(): + yield + + +@pytest.mark.skipif(mock_logs is None, reason="Skipping test because moto.mock_logs is not available") +class TestCloudwatchTaskHandler: @conf_vars({('logging', 'remote_log_conn_id'): 'aws_default'}) - def setUp(self): + @pytest.fixture(autouse=True) + def setup(self, create_log_template): self.remote_log_group = 'log_group_name' self.region_name = 'us-west-2' self.local_log_location = 'local/log/location' - self.filename_template = '{dag_id}/{task_id}/{execution_date}/{try_number}.log' + create_log_template('{dag_id}/{task_id}/{execution_date}/{try_number}.log') self.cloudwatch_task_handler = CloudwatchTaskHandler( self.local_log_location, f"arn:aws:logs:{self.region_name}:11111111:log-group:{self.remote_log_group}", - self.filename_template, ) self.cloudwatch_task_handler.hook @@ -65,21 +71,29 @@ def setUp(self): task_id = 'task_for_testing_cloudwatch_log_handler' self.dag = DAG(dag_id=dag_id, start_date=date) task = EmptyOperator(task_id=task_id, dag=self.dag) - dag_run = DagRun(dag_id=self.dag.dag_id, execution_date=date, run_id="test") - self.ti = TaskInstance(task=task) + dag_run = DagRun(dag_id=self.dag.dag_id, execution_date=date, run_id="test", run_type="scheduled") + with create_session() as session: + session.add(dag_run) + session.commit() + session.refresh(dag_run) + + self.ti = TaskInstance(task=task, run_id=dag_run.run_id) self.ti.dag_run = dag_run self.ti.try_number = 1 self.ti.state = State.RUNNING - self.remote_log_stream = f'{dag_id}/{task_id}/{date.isoformat()}/{self.ti.try_number}.log'.replace( + self.remote_log_stream = (f'{dag_id}/{task_id}/{date.isoformat()}/{self.ti.try_number}.log').replace( ':', '_' ) moto.moto_api._internal.models.moto_api_backend.reset() self.conn = boto3.client('logs', region_name=self.region_name) - def tearDown(self): + yield + self.cloudwatch_task_handler.handler = None + with create_session() as session: + session.query(DagRun).delete() def test_hook(self): assert isinstance(self.cloudwatch_task_handler.hook, AwsLogsHook) @@ -89,7 +103,6 @@ def test_hook_raises(self): handler = CloudwatchTaskHandler( self.local_log_location, f"arn:aws:logs:{self.region_name}:11111111:log-group:{self.remote_log_group}", - self.filename_template, ) with mock.patch.object(handler.log, 'error') as mock_error: diff --git a/tests/providers/amazon/aws/log/test_s3_task_handler.py b/tests/providers/amazon/aws/log/test_s3_task_handler.py index a322f167eccc..d5a5185f758c 100644 --- a/tests/providers/amazon/aws/log/test_s3_task_handler.py +++ b/tests/providers/amazon/aws/log/test_s3_task_handler.py @@ -17,7 +17,6 @@ # under the License. import os -import unittest from unittest import mock from unittest.mock import ANY @@ -28,6 +27,7 @@ from airflow.operators.empty import EmptyOperator from airflow.providers.amazon.aws.hooks.s3 import S3Hook from airflow.providers.amazon.aws.log.s3_task_handler import S3TaskHandler +from airflow.utils.session import create_session from airflow.utils.state import State from airflow.utils.timezone import datetime from tests.test_utils.config import conf_vars @@ -40,32 +40,39 @@ mock_s3 = None -@unittest.skipIf(mock_s3 is None, "Skipping test because moto.mock_s3 is not available") -@mock_s3 -class TestS3TaskHandler(unittest.TestCase): +@pytest.fixture(autouse=True, scope="module") +def s3mock(): + with mock_s3(): + yield + + +@pytest.mark.skipif(mock_s3 is None, reason="Skipping test because moto.mock_s3 is not available") +class TestS3TaskHandler: @conf_vars({('logging', 'remote_log_conn_id'): 'aws_default'}) - def setUp(self): - super().setUp() + @pytest.fixture(autouse=True) + def setup(self, create_log_template): self.remote_log_base = 's3://bucket/remote/log/location' self.remote_log_location = 's3://bucket/remote/log/location/1.log' self.remote_log_key = 'remote/log/location/1.log' self.local_log_location = 'local/log/location' - self.filename_template = '{try_number}.log' - self.s3_task_handler = S3TaskHandler( - self.local_log_location, self.remote_log_base, self.filename_template - ) + create_log_template('{try_number}.log') + self.s3_task_handler = S3TaskHandler(self.local_log_location, self.remote_log_base) # Vivfy the hook now with the config override assert self.s3_task_handler.hook is not None date = datetime(2016, 1, 1) self.dag = DAG('dag_for_testing_s3_task_handler', start_date=date) task = EmptyOperator(task_id='task_for_testing_s3_log_handler', dag=self.dag) - dag_run = DagRun(dag_id=self.dag.dag_id, execution_date=date, run_id="test") - self.ti = TaskInstance(task=task) + dag_run = DagRun(dag_id=self.dag.dag_id, execution_date=date, run_id="test", run_type="manual") + with create_session() as session: + session.add(dag_run) + session.commit() + session.refresh(dag_run) + + self.ti = TaskInstance(task=task, run_id=dag_run.run_id) self.ti.dag_run = dag_run self.ti.try_number = 1 self.ti.state = State.RUNNING - self.addCleanup(self.dag.clear) self.conn = boto3.client('s3') # We need to create the bucket since this is all in Moto's 'virtual' @@ -73,7 +80,13 @@ def setUp(self): moto.moto_api._internal.models.moto_api_backend.reset() self.conn.create_bucket(Bucket="bucket") - def tearDown(self): + yield + + self.dag.clear() + + with create_session() as session: + session.query(DagRun).delete() + if self.s3_task_handler.handler: try: os.remove(self.s3_task_handler.handler.baseFilename) @@ -86,7 +99,7 @@ def test_hook(self): @conf_vars({('logging', 'remote_log_conn_id'): 'aws_default'}) def test_hook_raises(self): - handler = S3TaskHandler(self.local_log_location, self.remote_log_base, self.filename_template) + handler = S3TaskHandler(self.local_log_location, self.remote_log_base) with mock.patch.object(handler.log, 'error') as mock_error: with mock.patch("airflow.providers.amazon.aws.hooks.s3.S3Hook") as mock_hook: mock_hook.side_effect = Exception('Failed to connect') diff --git a/tests/providers/elasticsearch/log/test_es_task_handler.py b/tests/providers/elasticsearch/log/test_es_task_handler.py index 03eab3dbb78a..e26a78fe77e8 100644 --- a/tests/providers/elasticsearch/log/test_es_task_handler.py +++ b/tests/providers/elasticsearch/log/test_es_task_handler.py @@ -58,9 +58,11 @@ class TestElasticsearchTaskHandler: EXECUTION_DATE = datetime(2016, 1, 1) LOG_ID = f'{DAG_ID}-{TASK_ID}-2016-01-01T00:00:00+00:00-1' JSON_LOG_ID = f'{DAG_ID}-{TASK_ID}-{ElasticsearchTaskHandler._clean_date(EXECUTION_DATE)}-1' + FILENAME_TEMPLATE = '{try_number}.log' @pytest.fixture() - def ti(self, create_task_instance): + def ti(self, create_task_instance, create_log_template): + create_log_template(self.FILENAME_TEMPLATE, '{dag_id}-{task_id}-{execution_date}-{try_number}') yield get_ti( dag_id=self.DAG_ID, task_id=self.TASK_ID, @@ -73,8 +75,6 @@ def ti(self, create_task_instance): @elasticmock def setup(self): self.local_log_location = 'local/log/location' - self.filename_template = '{try_number}.log' - self.log_id_template = '{dag_id}-{task_id}-{execution_date}-{try_number}' self.end_of_log_mark = 'end_of_log\n' self.write_stdout = False self.json_format = False @@ -82,15 +82,13 @@ def setup(self): self.host_field = 'host' self.offset_field = 'offset' self.es_task_handler = ElasticsearchTaskHandler( - self.local_log_location, - self.filename_template, - self.log_id_template, - self.end_of_log_mark, - self.write_stdout, - self.json_format, - self.json_fields, - self.host_field, - self.offset_field, + base_log_folder=self.local_log_location, + end_of_log_mark=self.end_of_log_mark, + write_stdout=self.write_stdout, + json_format=self.json_format, + json_fields=self.json_fields, + host_field=self.host_field, + offset_field=self.offset_field, ) self.es = elasticsearch.Elasticsearch(hosts=[{'host': 'localhost', 'port': 9200}]) @@ -115,15 +113,13 @@ def test_client_with_config(self): assert es_conf == expected_dict # ensure creating with configs does not fail ElasticsearchTaskHandler( - self.local_log_location, - self.filename_template, - self.log_id_template, - self.end_of_log_mark, - self.write_stdout, - self.json_format, - self.json_fields, - self.host_field, - self.offset_field, + base_log_folder=self.local_log_location, + end_of_log_mark=self.end_of_log_mark, + write_stdout=self.write_stdout, + json_format=self.json_format, + json_fields=self.json_fields, + host_field=self.host_field, + offset_field=self.offset_field, es_kwargs=es_conf, ) @@ -395,7 +391,7 @@ def test_close(self, ti): self.es_task_handler.set_context(ti) self.es_task_handler.close() with open( - os.path.join(self.local_log_location, self.filename_template.format(try_number=1)) + os.path.join(self.local_log_location, self.FILENAME_TEMPLATE.format(try_number=1)) ) as log_file: # end_of_log_mark may contain characters like '\n' which is needed to # have the log uploaded but will not be stored in elasticsearch. @@ -409,7 +405,7 @@ def test_close_no_mark_end(self, ti): self.es_task_handler.set_context(ti) self.es_task_handler.close() with open( - os.path.join(self.local_log_location, self.filename_template.format(try_number=1)) + os.path.join(self.local_log_location, self.FILENAME_TEMPLATE.format(try_number=1)) ) as log_file: assert self.end_of_log_mark not in log_file.read() assert self.es_task_handler.closed @@ -419,7 +415,7 @@ def test_close_closed(self, ti): self.es_task_handler.set_context(ti) self.es_task_handler.close() with open( - os.path.join(self.local_log_location, self.filename_template.format(try_number=1)) + os.path.join(self.local_log_location, self.FILENAME_TEMPLATE.format(try_number=1)) ) as log_file: assert 0 == len(log_file.read()) @@ -428,7 +424,7 @@ def test_close_with_no_handler(self, ti): self.es_task_handler.handler = None self.es_task_handler.close() with open( - os.path.join(self.local_log_location, self.filename_template.format(try_number=1)) + os.path.join(self.local_log_location, self.FILENAME_TEMPLATE.format(try_number=1)) ) as log_file: assert 0 == len(log_file.read()) assert self.es_task_handler.closed @@ -438,7 +434,7 @@ def test_close_with_no_stream(self, ti): self.es_task_handler.handler.stream = None self.es_task_handler.close() with open( - os.path.join(self.local_log_location, self.filename_template.format(try_number=1)) + os.path.join(self.local_log_location, self.FILENAME_TEMPLATE.format(try_number=1)) ) as log_file: assert self.end_of_log_mark in log_file.read() assert self.es_task_handler.closed @@ -447,7 +443,7 @@ def test_close_with_no_stream(self, ti): self.es_task_handler.handler.stream.close() self.es_task_handler.close() with open( - os.path.join(self.local_log_location, self.filename_template.format(try_number=1)) + os.path.join(self.local_log_location, self.FILENAME_TEMPLATE.format(try_number=1)) ) as log_file: assert self.end_of_log_mark in log_file.read() assert self.es_task_handler.closed @@ -478,15 +474,13 @@ def test_clean_date(self): ) def test_get_external_log_url(self, ti, json_format, es_frontend, expected_url): es_task_handler = ElasticsearchTaskHandler( - self.local_log_location, - self.filename_template, - self.log_id_template, - self.end_of_log_mark, - self.write_stdout, - json_format, - self.json_fields, - self.host_field, - self.offset_field, + base_log_folder=self.local_log_location, + end_of_log_mark=self.end_of_log_mark, + write_stdout=self.write_stdout, + json_format=json_format, + json_fields=self.json_fields, + host_field=self.host_field, + offset_field=self.offset_field, frontend=es_frontend, ) url = es_task_handler.get_external_log_url(ti, ti.try_number) @@ -508,8 +502,6 @@ def test_dynamic_offset(self, stdout_mock, ti): # arrange handler = ElasticsearchTaskHandler( base_log_folder=self.local_log_location, - filename_template=self.filename_template, - log_id_template=self.log_id_template, end_of_log_mark=self.end_of_log_mark, write_stdout=True, json_format=True, diff --git a/tests/providers/google/cloud/log/test_gcs_task_handler.py b/tests/providers/google/cloud/log/test_gcs_task_handler.py index 6517be8f3124..b443a9f8ec8e 100644 --- a/tests/providers/google/cloud/log/test_gcs_task_handler.py +++ b/tests/providers/google/cloud/log/test_gcs_task_handler.py @@ -49,13 +49,11 @@ def local_log_location(self): yield td @pytest.fixture(autouse=True) - def gcs_task_handler(self, local_log_location): - self.remote_log_base = "gs://bucket/remote/log/location" - self.filename_template = "{try_number}.log" + def gcs_task_handler(self, create_log_template, local_log_location): + create_log_template("{try_number}.log") self.gcs_task_handler = GCSTaskHandler( base_log_folder=local_log_location, - gcs_log_folder=self.remote_log_base, - filename_template=self.filename_template, + gcs_log_folder="gs://bucket/remote/log/location", ) yield self.gcs_task_handler diff --git a/tests/providers/microsoft/azure/log/test_wasb_task_handler.py b/tests/providers/microsoft/azure/log/test_wasb_task_handler.py index 4fe967671729..3c92aa78aaa2 100644 --- a/tests/providers/microsoft/azure/log/test_wasb_task_handler.py +++ b/tests/providers/microsoft/azure/log/test_wasb_task_handler.py @@ -22,23 +22,25 @@ from airflow.providers.microsoft.azure.hooks.wasb import WasbHook from airflow.providers.microsoft.azure.log.wasb_task_handler import WasbTaskHandler -from airflow.utils.state import State +from airflow.utils.state import TaskInstanceState from airflow.utils.timezone import datetime from tests.test_utils.config import conf_vars from tests.test_utils.db import clear_db_dags, clear_db_runs +DEFAULT_DATE = datetime(2020, 8, 10) + class TestWasbTaskHandler: @pytest.fixture(autouse=True) - def ti(self, create_task_instance): - date = datetime(2020, 8, 10) + def ti(self, create_task_instance, create_log_template): + create_log_template("{try_number}.log") ti = create_task_instance( dag_id='dag_for_testing_wasb_task_handler', task_id='task_for_testing_wasb_log_handler', - execution_date=date, - start_date=date, - dagrun_state=State.RUNNING, - state=State.RUNNING, + execution_date=DEFAULT_DATE, + start_date=DEFAULT_DATE, + dagrun_state=TaskInstanceState.RUNNING, + state=TaskInstanceState.RUNNING, ) ti.try_number = 1 ti.hostname = 'localhost' @@ -52,12 +54,10 @@ def setup_method(self): self.remote_log_location = 'remote/log/location/1.log' self.local_log_location = 'local/log/location' self.container_name = "wasb-container" - self.filename_template = '{try_number}.log' self.wasb_task_handler = WasbTaskHandler( base_log_folder=self.local_log_location, wasb_log_folder=self.wasb_log_folder, wasb_container=self.container_name, - filename_template=self.filename_template, delete_local_copy=True, ) @@ -68,9 +68,7 @@ def test_hook(self, mock_service): @conf_vars({('logging', 'remote_log_conn_id'): 'wasb_default'}) def test_hook_raises(self): - handler = WasbTaskHandler( - self.local_log_location, self.wasb_log_folder, self.container_name, self.filename_template, True - ) + handler = self.wasb_task_handler with mock.patch.object(handler.log, 'error') as mock_error: with mock.patch("airflow.providers.microsoft.azure.hooks.wasb.WasbHook") as mock_hook: mock_hook.side_effect = AzureHttpError("failed to connect", 404) @@ -120,15 +118,14 @@ def test_wasb_read(self, mock_hook, ti): [{'end_of_log': True}], ) - def test_wasb_read_raises(self): - handler = WasbTaskHandler( - self.local_log_location, self.wasb_log_folder, self.container_name, self.filename_template, True - ) + @mock.patch( + "airflow.providers.microsoft.azure.hooks.wasb.WasbHook", + **{"return_value.read_file.side_effect": AzureHttpError("failed to connect", 404)}, + ) + def test_wasb_read_raises(self, mock_hook): + handler = self.wasb_task_handler with mock.patch.object(handler.log, 'error') as mock_error: - with mock.patch("airflow.providers.microsoft.azure.hooks.wasb.WasbHook") as mock_hook: - mock_hook.return_value.read_file.side_effect = AzureHttpError("failed to connect", 404) - - handler.wasb_read(self.remote_log_location, return_error=True) + handler.wasb_read(self.remote_log_location, return_error=True) mock_error.assert_called_once_with( 'Could not read logs from remote/log/location/1.log', exc_info=True, @@ -164,9 +161,7 @@ def test_write_when_append_is_false(self, mock_hook): ) def test_write_raises(self): - handler = WasbTaskHandler( - self.local_log_location, self.wasb_log_folder, self.container_name, self.filename_template, True - ) + handler = self.wasb_task_handler with mock.patch.object(handler.log, 'error') as mock_error: with mock.patch("airflow.providers.microsoft.azure.hooks.wasb.WasbHook") as mock_hook: mock_hook.return_value.load_string.side_effect = AzureHttpError("failed to connect", 404) diff --git a/tests/task/task_runner/test_task_runner.py b/tests/task/task_runner/test_task_runner.py index ab140e05f598..fc5f3cc89465 100644 --- a/tests/task/task_runner/test_task_runner.py +++ b/tests/task/task_runner/test_task_runner.py @@ -36,6 +36,7 @@ def test_should_have_valid_imports(self, import_path): def test_should_support_core_task_runner(self, mock_subprocess): ti = mock.MagicMock(map_index=-1, run_as_user=None) ti.get_template_context.return_value = {"ti": ti} + ti.get_dagrun.return_value.get_log_template.return_value.filename = "blah" local_task_job = mock.MagicMock(task_instance=ti) task_runner = get_task_runner(local_task_job) diff --git a/tests/utils/log/test_log_reader.py b/tests/utils/log/test_log_reader.py index 4e6e942741df..9a76ada725d3 100644 --- a/tests/utils/log/test_log_reader.py +++ b/tests/utils/log/test_log_reader.py @@ -29,6 +29,7 @@ from airflow import settings from airflow.config_templates.airflow_local_settings import DEFAULT_LOGGING_CONFIG from airflow.models import DagRun +from airflow.models.tasklog import LogTemplate from airflow.operators.python import PythonOperator from airflow.timetables.base import DataInterval from airflow.utils import timezone @@ -44,6 +45,7 @@ class TestLogView: DAG_ID = "dag_log_reader" TASK_ID = "task_log_reader" DEFAULT_DATE = timezone.datetime(2017, 9, 1) + FILENAME_TEMPLATE = "{{ ti.dag_id }}/{{ ti.task_id }}/{{ ts | replace(':', '.') }}/{{ try_number }}.log" @pytest.fixture(autouse=True) def log_dir(self): @@ -70,9 +72,7 @@ def settings_folder(self): def configure_loggers(self, log_dir, settings_folder): logging_config = copy.deepcopy(DEFAULT_LOGGING_CONFIG) logging_config["handlers"]["task"]["base_log_folder"] = log_dir - logging_config["handlers"]["task"][ - "filename_template" - ] = "{{ ti.dag_id }}/{{ ti.task_id }}/{{ ts | replace(':', '.') }}/{{ try_number }}.log" + logging_config["handlers"]["task"]["filename_template"] = self.FILENAME_TEMPLATE settings_file = os.path.join(settings_folder, "airflow_local_settings.py") with open(settings_file, "w") as handle: new_logging_file = f"LOGGING_CONFIG = {logging_config}" @@ -93,6 +93,10 @@ def prepare_log_files(self, log_dir): @pytest.fixture(autouse=True) def prepare_db(self, create_task_instance): + session = settings.Session() + log_template = LogTemplate(filename=self.FILENAME_TEMPLATE, elasticsearch_id="") + session.add(log_template) + session.commit() ti = create_task_instance( dag_id=self.DAG_ID, task_id=self.TASK_ID, @@ -107,6 +111,8 @@ def prepare_db(self, create_task_instance): yield clear_db_runs() clear_db_dags() + session.delete(log_template) + session.commit() def test_test_read_log_chunks_should_read_one_try(self): task_log_reader = TaskLogReader() diff --git a/tests/utils/test_log_handlers.py b/tests/utils/test_log_handlers.py index f4b4f7b2e31d..28b9c7cf1aa2 100644 --- a/tests/utils/test_log_handlers.py +++ b/tests/utils/test_log_handlers.py @@ -21,8 +21,6 @@ import os import re -import pytest - from airflow.config_templates.airflow_local_settings import DEFAULT_LOGGING_CONFIG from airflow.models import DAG, DagRun, TaskInstance from airflow.operators.python import PythonOperator @@ -218,34 +216,37 @@ def task_callable(ti): os.remove(log_filename) -@pytest.fixture() -def filename_rendering_ti(session, create_task_instance): - return create_task_instance( - dag_id='dag_for_testing_filename_rendering', - task_id='task_for_testing_filename_rendering', - run_type=DagRunType.SCHEDULED, - execution_date=DEFAULT_DATE, - session=session, - ) - - class TestFilenameRendering: - def test_python_formatting(self, filename_rendering_ti): - expected_filename = ( - f'dag_for_testing_filename_rendering/task_for_testing_filename_rendering/' - f'{DEFAULT_DATE.isoformat()}/42.log' + def test_python_formatting(self, create_log_template, create_task_instance): + create_log_template("{dag_id}/{task_id}/{execution_date}/{try_number}.log") + filename_rendering_ti = create_task_instance( + dag_id="dag_for_testing_filename_rendering", + task_id="task_for_testing_filename_rendering", + run_type=DagRunType.SCHEDULED, + execution_date=DEFAULT_DATE, ) - fth = FileTaskHandler('', '{dag_id}/{task_id}/{execution_date}/{try_number}.log') + expected_filename = ( + f"dag_for_testing_filename_rendering/task_for_testing_filename_rendering/" + f"{DEFAULT_DATE.isoformat()}/42.log" + ) + fth = FileTaskHandler("") rendered_filename = fth._render_filename(filename_rendering_ti, 42) assert expected_filename == rendered_filename - def test_jinja_rendering(self, filename_rendering_ti): - expected_filename = ( - f'dag_for_testing_filename_rendering/task_for_testing_filename_rendering/' - f'{DEFAULT_DATE.isoformat()}/42.log' + def test_jinja_rendering(self, create_log_template, create_task_instance): + create_log_template("{{ ti.dag_id }}/{{ ti.task_id }}/{{ ts }}/{{ try_number }}.log") + filename_rendering_ti = create_task_instance( + dag_id="dag_for_testing_filename_rendering", + task_id="task_for_testing_filename_rendering", + run_type=DagRunType.SCHEDULED, + execution_date=DEFAULT_DATE, ) - fth = FileTaskHandler('', '{{ ti.dag_id }}/{{ ti.task_id }}/{{ ts }}/{{ try_number }}.log') + expected_filename = ( + f"dag_for_testing_filename_rendering/task_for_testing_filename_rendering/" + f"{DEFAULT_DATE.isoformat()}/42.log" + ) + fth = FileTaskHandler("") rendered_filename = fth._render_filename(filename_rendering_ti, 42) assert expected_filename == rendered_filename diff --git a/tests/www/test_logs/dag_for_testing_log_view/scheduled__2017-09-01T00:00:00+00:00/task_for_testing_log_view/1.log b/tests/www/test_logs/dag_for_testing_log_view/scheduled__2017-09-01T00:00:00+00:00/task_for_testing_log_view/1.log new file mode 100644 index 000000000000..bc10ef788029 --- /dev/null +++ b/tests/www/test_logs/dag_for_testing_log_view/scheduled__2017-09-01T00:00:00+00:00/task_for_testing_log_view/1.log @@ -0,0 +1 @@ +Log for testing. diff --git a/tests/www/test_logs/dag_id=dag_for_testing_log_view/run_id=scheduled__2017-09-01T00:00:00+00:00/task_id=task_for_testing_log_view/attempt=1.log b/tests/www/test_logs/dag_id=dag_for_testing_log_view/run_id=scheduled__2017-09-01T00:00:00+00:00/task_id=task_for_testing_log_view/attempt=1.log new file mode 100644 index 000000000000..bc10ef788029 --- /dev/null +++ b/tests/www/test_logs/dag_id=dag_for_testing_log_view/run_id=scheduled__2017-09-01T00:00:00+00:00/task_id=task_for_testing_log_view/attempt=1.log @@ -0,0 +1 @@ +Log for testing. diff --git a/tests/www/views/test_views_log.py b/tests/www/views/test_views_log.py index 988d28593649..fd136351cf1a 100644 --- a/tests/www/views/test_views_log.py +++ b/tests/www/views/test_views_log.py @@ -85,9 +85,6 @@ def factory(): logging_config['handlers']['task']['base_log_folder'] = str( pathlib.Path(__file__, "..", "..", "test_logs").resolve(), ) - logging_config['handlers']['task'][ - 'filename_template' - ] = '{{ ti.dag_id }}/{{ ti.task_id }}/{{ ts | replace(":", ".") }}/{{ try_number }}.log' with tempfile.TemporaryDirectory() as settings_dir: local_settings = pathlib.Path(settings_dir, "airflow_local_settings.py")