Skip to content

Commit

Permalink
Fix compatibility of updated airflow.io with released providers (#3…
Browse files Browse the repository at this point in the history
…6186)

The released providers added support to previous version of the
`airflow.io` - where options were not passed to `get_fs` method
that provides Fsspec compatible FileSystem. However #35820 added
positional "options" parameter when the method is called and it
broke already released providers.

This PR dynamically inspects signature of the get_fs method
and when one parameter is detected, it will skip passing options
to get_fs method call.
  • Loading branch information
potiuk authored Dec 12, 2023
1 parent f8b322d commit 7799c51
Showing 1 changed file with 15 additions and 2 deletions.
17 changes: 15 additions & 2 deletions airflow/io/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,12 @@
# under the License.
from __future__ import annotations

import inspect
import logging
from typing import (
TYPE_CHECKING,
Callable,
Mapping,
)

from fsspec.implementations.local import LocalFileSystem
Expand Down Expand Up @@ -49,7 +51,12 @@ def _file(_: str | None, storage_options: Properties) -> LocalFileSystem:


@cache
def _register_filesystems() -> dict[str, Callable[[str | None, Properties], AbstractFileSystem]]:
def _register_filesystems() -> (
Mapping[
str,
Callable[[str | None, Properties], AbstractFileSystem] | Callable[[str | None], AbstractFileSystem],
]
):
scheme_to_fs = _BUILTIN_SCHEME_TO_FS.copy()
with Stats.timer("airflow.io.load_filesystems") as timer:
manager = ProvidersManager()
Expand Down Expand Up @@ -86,7 +93,13 @@ def get_fs(
raise ValueError(f"No filesystem registered for scheme {scheme}") from None

options = storage_options or {}
return fs(conn_id, options)
# MyPy does not recognize dynamic parameters inspection when we call the method, and we have to do
# it for compatibility reasons with already released providers, that's why we need to ignore
# mypy errors here
parameters = inspect.signature(fs).parameters
if len(parameters) == 1:
return fs(conn_id) # type: ignore[call-arg]
return fs(conn_id, options) # type: ignore[call-arg]


def has_fs(scheme: str) -> bool:
Expand Down

0 comments on commit 7799c51

Please sign in to comment.