Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve importing the module in Airflow utils package #33803

Merged
merged 1 commit into from
Aug 27, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions airflow/utils/airflow_flask_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,13 @@
# under the License.
from __future__ import annotations

from typing import Any, cast
from typing import TYPE_CHECKING, Any, cast

from flask import Flask

from airflow.models.dagbag import DagBag
from airflow.www.extensions.init_appbuilder import AirflowAppBuilder
if TYPE_CHECKING:
from airflow.models.dagbag import DagBag
from airflow.www.extensions.init_appbuilder import AirflowAppBuilder


class AirflowApp(Flask):
Expand Down
3 changes: 2 additions & 1 deletion airflow/utils/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@

import re2
from sqlalchemy import select
from sqlalchemy.orm import Session

from airflow import settings
from airflow.exceptions import AirflowException, RemovedInAirflow3Warning
Expand All @@ -45,6 +44,8 @@
T = TypeVar("T", bound=Callable)

if TYPE_CHECKING:
from sqlalchemy.orm import Session

from airflow.models.dag import DAG

logger = logging.getLogger(__name__)
Expand Down
8 changes: 6 additions & 2 deletions airflow/utils/dag_edges.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,13 @@
# under the License.
from __future__ import annotations

from airflow.models import Operator
from typing import TYPE_CHECKING

from airflow.models.abstractoperator import AbstractOperator
from airflow.models.dag import DAG

if TYPE_CHECKING:
from airflow.models import Operator
from airflow.models.dag import DAG


def dag_edges(dag: DAG):
Expand Down
12 changes: 8 additions & 4 deletions airflow/utils/db_cleanup.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,24 +27,28 @@
import os
from contextlib import contextmanager
from dataclasses import dataclass
from typing import Any
from typing import TYPE_CHECKING, Any

from pendulum import DateTime
from sqlalchemy import and_, column, false, func, inspect, select, table, text
from sqlalchemy.exc import OperationalError, ProgrammingError
from sqlalchemy.ext.compiler import compiles
from sqlalchemy.orm import Query, Session, aliased
from sqlalchemy.orm import aliased
from sqlalchemy.sql.expression import ClauseElement, Executable, tuple_

from airflow import AirflowException
from airflow.cli.simple_table import AirflowConsole
from airflow.configuration import conf
from airflow.models import Base
from airflow.utils import timezone
from airflow.utils.db import reflect_tables
from airflow.utils.helpers import ask_yesno
from airflow.utils.session import NEW_SESSION, provide_session

if TYPE_CHECKING:
from pendulum import DateTime
from sqlalchemy.orm import Query, Session

from airflow.models import Base

logger = logging.getLogger(__file__)

ARCHIVE_TABLE_PREFIX = "_airflow_deleted__"
Expand Down
12 changes: 7 additions & 5 deletions airflow/utils/dot_renderer.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,21 +19,23 @@
"""Renderer DAG (tasks and dependencies) to the graphviz object."""
from __future__ import annotations

from typing import Any
from typing import TYPE_CHECKING, Any

import graphviz

from airflow import AirflowException
from airflow.models import TaskInstance
from airflow.models.baseoperator import BaseOperator
from airflow.models.dag import DAG
from airflow.models.mappedoperator import MappedOperator
from airflow.models.taskmixin import DependencyMixin
from airflow.serialization.serialized_objects import DagDependency
from airflow.utils.dag_edges import dag_edges
from airflow.utils.state import State
from airflow.utils.task_group import TaskGroup

if TYPE_CHECKING:
from airflow.models import TaskInstance
from airflow.models.dag import DAG
from airflow.models.taskmixin import DependencyMixin
from airflow.serialization.serialized_objects import DagDependency


def _refine_color(color: str):
"""
Expand Down
2 changes: 1 addition & 1 deletion airflow/utils/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,14 @@

from airflow.configuration import conf
from airflow.exceptions import AirflowException, RemovedInAirflow3Warning
from airflow.utils.context import Context
from airflow.utils.module_loading import import_string
from airflow.utils.types import NOTSET

if TYPE_CHECKING:
import jinja2

from airflow.models.taskinstance import TaskInstance
from airflow.utils.context import Context

KEY_REGEX = re.compile(r"^[\w.-]+$")
GROUP_KEY_REGEX = re.compile(r"^[\w-]+$")
Expand Down
6 changes: 4 additions & 2 deletions airflow/utils/log/colored_log.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,17 @@
from __future__ import annotations

import sys
from logging import LogRecord
from typing import Any
from typing import TYPE_CHECKING, Any

import re2
from colorlog import TTYColoredFormatter
from colorlog.escape_codes import esc, escape_codes

from airflow.utils.log.timezone_aware import TimezoneAware

if TYPE_CHECKING:
from logging import LogRecord

DEFAULT_COLORS = {
"DEBUG": "green",
"INFO": "",
Expand Down
10 changes: 6 additions & 4 deletions airflow/utils/log/log_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,19 @@
import logging
import time
from functools import cached_property
from typing import Iterator

from sqlalchemy.orm.session import Session
from typing import TYPE_CHECKING, Iterator

from airflow.configuration import conf
from airflow.models.taskinstance import TaskInstance
from airflow.utils.helpers import render_log_filename
from airflow.utils.log.logging_mixin import ExternalLoggingMixin
from airflow.utils.session import NEW_SESSION, provide_session
from airflow.utils.state import TaskInstanceState

if TYPE_CHECKING:
from sqlalchemy.orm.session import Session

from airflow.models.taskinstance import TaskInstance


class TaskLogReader:
"""Task log reader."""
Expand Down
7 changes: 5 additions & 2 deletions airflow/utils/log/logging_mixin.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,14 @@
import logging
import sys
from io import IOBase
from logging import Handler, Logger, StreamHandler
from typing import IO, Any, TypeVar, cast
from logging import Handler, StreamHandler
from typing import IO, TYPE_CHECKING, Any, TypeVar, cast

import re2

if TYPE_CHECKING:
from logging import Logger

# 7-bit C1 ANSI escape sequences
ANSI_ESCAPE = re2.compile(r"\x1B[@-_][0-?]*[ -/]*[@-~]")

Expand Down
3 changes: 2 additions & 1 deletion airflow/utils/log/secrets_masker.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,12 @@

from airflow import settings
from airflow.compat.functools import cache
from airflow.typing_compat import TypeGuard

if TYPE_CHECKING:
from kubernetes.client import V1EnvVar

from airflow.typing_compat import TypeGuard

Redactable = TypeVar("Redactable", str, "V1EnvVar", Dict[Any, Any], Tuple[Any, ...], List[Any])
Redacted = Union[Redactable, str]

Expand Down
4 changes: 3 additions & 1 deletion airflow/utils/log/trigger_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,10 @@
from contextvars import ContextVar
from copy import copy
from logging.handlers import QueueHandler
from typing import TYPE_CHECKING

from airflow.utils.log.file_task_handler import FileTaskHandler
if TYPE_CHECKING:
from airflow.utils.log.file_task_handler import FileTaskHandler

ctx_task_instance: ContextVar = ContextVar("task_instance")
ctx_trigger_id: ContextVar = ContextVar("trigger_id")
Expand Down
5 changes: 2 additions & 3 deletions airflow/utils/mixins.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,14 @@
from __future__ import annotations

import multiprocessing
import multiprocessing.context
import typing

from airflow.configuration import conf
from airflow.utils.context import Context

if typing.TYPE_CHECKING:
import multiprocessing.context

from airflow.models.operator import Operator
from airflow.utils.context import Context


class MultiprocessingStartMethodMixin:
Expand Down
6 changes: 4 additions & 2 deletions airflow/utils/module_loading.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@

import pkgutil
from importlib import import_module
from types import ModuleType
from typing import Callable
from typing import TYPE_CHECKING, Callable

if TYPE_CHECKING:
from types import ModuleType


def import_string(dotted_path: str):
Expand Down
6 changes: 4 additions & 2 deletions airflow/utils/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,14 @@
from inspect import signature
from typing import Callable, Generator, TypeVar, cast

from sqlalchemy.orm import Session as SASession

from airflow import settings
from airflow.typing_compat import ParamSpec


@contextlib.contextmanager
def create_session() -> Generator[settings.SASession, None, None]:
def create_session() -> Generator[SASession, None, None]:
"""Contextmanager that will create and teardown a session."""
Session = getattr(settings, "Session", None)
if Session is None:
Expand Down Expand Up @@ -83,4 +85,4 @@ def wrapper(*args, **kwargs) -> RT:
# the 'session' argument to be of type Session instead of Session | None,
# making it easier to type hint the function body without dealing with the None
# case that can never happen at runtime.
NEW_SESSION: settings.SASession = cast(settings.SASession, None)
NEW_SESSION: SASession = cast(SASession, None)
8 changes: 5 additions & 3 deletions airflow/utils/sqlalchemy.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,8 @@
from dateutil import relativedelta
from sqlalchemy import TIMESTAMP, PickleType, and_, event, false, nullsfirst, or_, true, tuple_
from sqlalchemy.dialects import mssql, mysql
from sqlalchemy.exc import OperationalError
from sqlalchemy.sql import ColumnElement, Select
from sqlalchemy.types import JSON, Text, TypeDecorator, TypeEngine, UnicodeText
from sqlalchemy.sql import Select
from sqlalchemy.types import JSON, Text, TypeDecorator, UnicodeText

from airflow import settings
from airflow.configuration import conf
Expand All @@ -39,8 +38,11 @@

if TYPE_CHECKING:
from kubernetes.client.models.v1_pod import V1Pod
from sqlalchemy.exc import OperationalError
from sqlalchemy.orm import Query, Session
from sqlalchemy.sql import ColumnElement
from sqlalchemy.sql.expression import ColumnOperators
from sqlalchemy.types import TypeEngine

log = logging.getLogger(__name__)

Expand Down
3 changes: 2 additions & 1 deletion airflow/utils/task_group.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
DuplicateTaskIdFound,
TaskAlreadyInTaskGroup,
)
from airflow.models.taskmixin import DAGNode, DependencyMixin
from airflow.models.taskmixin import DAGNode
from airflow.serialization.enums import DagAttributeTypes
from airflow.utils.helpers import validate_group_key

Expand All @@ -45,6 +45,7 @@
from airflow.models.dag import DAG
from airflow.models.expandinput import ExpandInput
from airflow.models.operator import Operator
from airflow.models.taskmixin import DependencyMixin
from airflow.utils.edgemodifier import EdgeModifier


Expand Down