Skip to content

Commit

Permalink
Improve importing the module in Airflow utils package (#33803)
Browse files Browse the repository at this point in the history
  • Loading branch information
hussein-awala authored Aug 27, 2023
1 parent 97091b1 commit 58d8577
Show file tree
Hide file tree
Showing 16 changed files with 65 additions and 37 deletions.
7 changes: 4 additions & 3 deletions airflow/utils/airflow_flask_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,13 @@
# under the License.
from __future__ import annotations

from typing import Any, cast
from typing import TYPE_CHECKING, Any, cast

from flask import Flask

from airflow.models.dagbag import DagBag
from airflow.www.extensions.init_appbuilder import AirflowAppBuilder
if TYPE_CHECKING:
from airflow.models.dagbag import DagBag
from airflow.www.extensions.init_appbuilder import AirflowAppBuilder


class AirflowApp(Flask):
Expand Down
3 changes: 2 additions & 1 deletion airflow/utils/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@

import re2
from sqlalchemy import select
from sqlalchemy.orm import Session

from airflow import settings
from airflow.exceptions import AirflowException, RemovedInAirflow3Warning
Expand All @@ -45,6 +44,8 @@
T = TypeVar("T", bound=Callable)

if TYPE_CHECKING:
from sqlalchemy.orm import Session

from airflow.models.dag import DAG

logger = logging.getLogger(__name__)
Expand Down
8 changes: 6 additions & 2 deletions airflow/utils/dag_edges.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,13 @@
# under the License.
from __future__ import annotations

from airflow.models import Operator
from typing import TYPE_CHECKING

from airflow.models.abstractoperator import AbstractOperator
from airflow.models.dag import DAG

if TYPE_CHECKING:
from airflow.models import Operator
from airflow.models.dag import DAG


def dag_edges(dag: DAG):
Expand Down
12 changes: 8 additions & 4 deletions airflow/utils/db_cleanup.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,24 +27,28 @@
import os
from contextlib import contextmanager
from dataclasses import dataclass
from typing import Any
from typing import TYPE_CHECKING, Any

from pendulum import DateTime
from sqlalchemy import and_, column, false, func, inspect, select, table, text
from sqlalchemy.exc import OperationalError, ProgrammingError
from sqlalchemy.ext.compiler import compiles
from sqlalchemy.orm import Query, Session, aliased
from sqlalchemy.orm import aliased
from sqlalchemy.sql.expression import ClauseElement, Executable, tuple_

from airflow import AirflowException
from airflow.cli.simple_table import AirflowConsole
from airflow.configuration import conf
from airflow.models import Base
from airflow.utils import timezone
from airflow.utils.db import reflect_tables
from airflow.utils.helpers import ask_yesno
from airflow.utils.session import NEW_SESSION, provide_session

if TYPE_CHECKING:
from pendulum import DateTime
from sqlalchemy.orm import Query, Session

from airflow.models import Base

logger = logging.getLogger(__file__)

ARCHIVE_TABLE_PREFIX = "_airflow_deleted__"
Expand Down
12 changes: 7 additions & 5 deletions airflow/utils/dot_renderer.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,21 +19,23 @@
"""Renderer DAG (tasks and dependencies) to the graphviz object."""
from __future__ import annotations

from typing import Any
from typing import TYPE_CHECKING, Any

import graphviz

from airflow import AirflowException
from airflow.models import TaskInstance
from airflow.models.baseoperator import BaseOperator
from airflow.models.dag import DAG
from airflow.models.mappedoperator import MappedOperator
from airflow.models.taskmixin import DependencyMixin
from airflow.serialization.serialized_objects import DagDependency
from airflow.utils.dag_edges import dag_edges
from airflow.utils.state import State
from airflow.utils.task_group import TaskGroup

if TYPE_CHECKING:
from airflow.models import TaskInstance
from airflow.models.dag import DAG
from airflow.models.taskmixin import DependencyMixin
from airflow.serialization.serialized_objects import DagDependency


def _refine_color(color: str):
"""
Expand Down
2 changes: 1 addition & 1 deletion airflow/utils/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,14 @@

from airflow.configuration import conf
from airflow.exceptions import AirflowException, RemovedInAirflow3Warning
from airflow.utils.context import Context
from airflow.utils.module_loading import import_string
from airflow.utils.types import NOTSET

if TYPE_CHECKING:
import jinja2

from airflow.models.taskinstance import TaskInstance
from airflow.utils.context import Context

KEY_REGEX = re.compile(r"^[\w.-]+$")
GROUP_KEY_REGEX = re.compile(r"^[\w-]+$")
Expand Down
6 changes: 4 additions & 2 deletions airflow/utils/log/colored_log.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,17 @@
from __future__ import annotations

import sys
from logging import LogRecord
from typing import Any
from typing import TYPE_CHECKING, Any

import re2
from colorlog import TTYColoredFormatter
from colorlog.escape_codes import esc, escape_codes

from airflow.utils.log.timezone_aware import TimezoneAware

if TYPE_CHECKING:
from logging import LogRecord

DEFAULT_COLORS = {
"DEBUG": "green",
"INFO": "",
Expand Down
10 changes: 6 additions & 4 deletions airflow/utils/log/log_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,19 @@
import logging
import time
from functools import cached_property
from typing import Iterator

from sqlalchemy.orm.session import Session
from typing import TYPE_CHECKING, Iterator

from airflow.configuration import conf
from airflow.models.taskinstance import TaskInstance
from airflow.utils.helpers import render_log_filename
from airflow.utils.log.logging_mixin import ExternalLoggingMixin
from airflow.utils.session import NEW_SESSION, provide_session
from airflow.utils.state import TaskInstanceState

if TYPE_CHECKING:
from sqlalchemy.orm.session import Session

from airflow.models.taskinstance import TaskInstance


class TaskLogReader:
"""Task log reader."""
Expand Down
7 changes: 5 additions & 2 deletions airflow/utils/log/logging_mixin.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,14 @@
import logging
import sys
from io import IOBase
from logging import Handler, Logger, StreamHandler
from typing import IO, Any, TypeVar, cast
from logging import Handler, StreamHandler
from typing import IO, TYPE_CHECKING, Any, TypeVar, cast

import re2

if TYPE_CHECKING:
from logging import Logger

# 7-bit C1 ANSI escape sequences
ANSI_ESCAPE = re2.compile(r"\x1B[@-_][0-?]*[ -/]*[@-~]")

Expand Down
3 changes: 2 additions & 1 deletion airflow/utils/log/secrets_masker.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,12 @@

from airflow import settings
from airflow.compat.functools import cache
from airflow.typing_compat import TypeGuard

if TYPE_CHECKING:
from kubernetes.client import V1EnvVar

from airflow.typing_compat import TypeGuard

Redactable = TypeVar("Redactable", str, "V1EnvVar", Dict[Any, Any], Tuple[Any, ...], List[Any])
Redacted = Union[Redactable, str]

Expand Down
4 changes: 3 additions & 1 deletion airflow/utils/log/trigger_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,10 @@
from contextvars import ContextVar
from copy import copy
from logging.handlers import QueueHandler
from typing import TYPE_CHECKING

from airflow.utils.log.file_task_handler import FileTaskHandler
if TYPE_CHECKING:
from airflow.utils.log.file_task_handler import FileTaskHandler

ctx_task_instance: ContextVar = ContextVar("task_instance")
ctx_trigger_id: ContextVar = ContextVar("trigger_id")
Expand Down
5 changes: 2 additions & 3 deletions airflow/utils/mixins.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,14 @@
from __future__ import annotations

import multiprocessing
import multiprocessing.context
import typing

from airflow.configuration import conf
from airflow.utils.context import Context

if typing.TYPE_CHECKING:
import multiprocessing.context

from airflow.models.operator import Operator
from airflow.utils.context import Context


class MultiprocessingStartMethodMixin:
Expand Down
6 changes: 4 additions & 2 deletions airflow/utils/module_loading.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@

import pkgutil
from importlib import import_module
from types import ModuleType
from typing import Callable
from typing import TYPE_CHECKING, Callable

if TYPE_CHECKING:
from types import ModuleType


def import_string(dotted_path: str):
Expand Down
6 changes: 4 additions & 2 deletions airflow/utils/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,14 @@
from inspect import signature
from typing import Callable, Generator, TypeVar, cast

from sqlalchemy.orm import Session as SASession

from airflow import settings
from airflow.typing_compat import ParamSpec


@contextlib.contextmanager
def create_session() -> Generator[settings.SASession, None, None]:
def create_session() -> Generator[SASession, None, None]:
"""Contextmanager that will create and teardown a session."""
Session = getattr(settings, "Session", None)
if Session is None:
Expand Down Expand Up @@ -83,4 +85,4 @@ def wrapper(*args, **kwargs) -> RT:
# the 'session' argument to be of type Session instead of Session | None,
# making it easier to type hint the function body without dealing with the None
# case that can never happen at runtime.
NEW_SESSION: settings.SASession = cast(settings.SASession, None)
NEW_SESSION: SASession = cast(SASession, None)
8 changes: 5 additions & 3 deletions airflow/utils/sqlalchemy.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,8 @@
from dateutil import relativedelta
from sqlalchemy import TIMESTAMP, PickleType, and_, event, false, nullsfirst, or_, true, tuple_
from sqlalchemy.dialects import mssql, mysql
from sqlalchemy.exc import OperationalError
from sqlalchemy.sql import ColumnElement, Select
from sqlalchemy.types import JSON, Text, TypeDecorator, TypeEngine, UnicodeText
from sqlalchemy.sql import Select
from sqlalchemy.types import JSON, Text, TypeDecorator, UnicodeText

from airflow import settings
from airflow.configuration import conf
Expand All @@ -39,8 +38,11 @@

if TYPE_CHECKING:
from kubernetes.client.models.v1_pod import V1Pod
from sqlalchemy.exc import OperationalError
from sqlalchemy.orm import Query, Session
from sqlalchemy.sql import ColumnElement
from sqlalchemy.sql.expression import ColumnOperators
from sqlalchemy.types import TypeEngine

log = logging.getLogger(__name__)

Expand Down
3 changes: 2 additions & 1 deletion airflow/utils/task_group.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
DuplicateTaskIdFound,
TaskAlreadyInTaskGroup,
)
from airflow.models.taskmixin import DAGNode, DependencyMixin
from airflow.models.taskmixin import DAGNode
from airflow.serialization.enums import DagAttributeTypes
from airflow.utils.helpers import validate_group_key

Expand All @@ -45,6 +45,7 @@
from airflow.models.dag import DAG
from airflow.models.expandinput import ExpandInput
from airflow.models.operator import Operator
from airflow.models.taskmixin import DependencyMixin
from airflow.utils.edgemodifier import EdgeModifier


Expand Down

0 comments on commit 58d8577

Please sign in to comment.