Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

bpo-39791 native hooks for importlib.resources.files #20576

Merged
merged 12 commits into from
Jun 8, 2020
Merged
26 changes: 2 additions & 24 deletions Lib/importlib/_bootstrap_external.py
Original file line number Diff line number Diff line change
Expand Up @@ -982,32 +982,10 @@ def get_data(self, path):
with _io.FileIO(path, 'r') as file:
return file.read()

# ResourceReader ABC API.

@_check_name
def get_resource_reader(self, module):
if self.is_package(module):
return self
return None

def open_resource(self, resource):
    """Open *resource* for raw binary reading.

    The resource is looked up next to the loaded file: its name is joined
    onto the first element of ``_path_split(self.path)``.
    """
    package_dir = _path_split(self.path)[0]
    resource_file = _path_join(package_dir, resource)
    return _io.FileIO(resource_file, 'r')

def resource_path(self, resource):
    """Return the file-system path of *resource*.

    Raises FileNotFoundError when ``self.is_resource(resource)`` is false.
    """
    if self.is_resource(resource):
        package_dir = _path_split(self.path)[0]
        return _path_join(package_dir, resource)
    raise FileNotFoundError

def is_resource(self, name):
    """Return whether *name* is a file directly beside ``self.path``.

    A name containing ``path_sep`` is rejected without touching the
    file system.
    """
    package_dir = _path_split(self.path)[0]
    # Short-circuit keeps the separator check ahead of any path work.
    return path_sep not in name and _path_isfile(_path_join(package_dir, name))

def contents(self):
    """Return an iterator over the entry names in the directory of ``self.path``."""
    package_dir = _path_split(self.path)[0]
    entries = _os.listdir(package_dir)
    return iter(entries)
from importlib.readers import FileReader
return FileReader(self)


class SourceFileLoader(FileLoader, SourceLoader):
Expand Down
82 changes: 63 additions & 19 deletions Lib/importlib/_common.py
Original file line number Diff line number Diff line change
@@ -1,38 +1,82 @@
import os
import pathlib
import zipfile
import tempfile
import functools
import contextlib
import types
import importlib

from typing import Union, Any, Optional
from .abc import ResourceReader

def from_package(package):
Package = Union[types.ModuleType, str]


def files(package):
"""
Return a Traversable object for the given package.
Get a Traversable resource from a package
"""
return from_package(get_package(package))


def normalize_path(path):
# type: (Any) -> str
"""Normalize a path by ensuring it is a string.

If the resulting string contains path separators, an exception is raised.
"""
spec = package.__spec__
return from_traversable_resources(spec) or fallback_resources(spec)
str_path = str(path)
parent, file_name = os.path.split(str_path)
if parent:
raise ValueError('{!r} must be only a file name'.format(path))
return file_name


def from_traversable_resources(spec):
def get_resource_reader(package):
# type: (types.ModuleType) -> Optional[ResourceReader]
"""
If the spec.loader implements TraversableResources,
directly or implicitly, it will have a ``files()`` method.
Return the package's loader if it's a ResourceReader.
"""
with contextlib.suppress(AttributeError):
return spec.loader.files()
# We can't use
# an issubclass() check here because apparently abc's __subclasscheck__()
# hook wants to create a weak reference to the object, but
# zipimport.zipimporter does not support weak references, resulting in a
# TypeError. That seems terrible.
spec = package.__spec__
reader = getattr(spec.loader, 'get_resource_reader', None)
if reader is None:
return None
return reader(spec.name)


def fallback_resources(spec):
package_directory = pathlib.Path(spec.origin).parent
try:
archive_path = spec.loader.archive
rel_path = package_directory.relative_to(archive_path)
return zipfile.Path(archive_path, str(rel_path) + '/')
except Exception:
pass
return package_directory
def resolve(cand):
    # type: (Package) -> types.ModuleType
    """Return *cand* as a module object.

    A module instance is returned unchanged; anything else is handed to
    ``importlib.import_module`` as a module name.
    """
    if isinstance(cand, types.ModuleType):
        return cand
    return importlib.import_module(cand)


def get_package(package):
    # type: (Package) -> types.ModuleType
    """Take a package name or module object and return the module.

    Raise TypeError if the resolved module is not a package. A module
    with no import spec (``__spec__`` missing or None) is also reported
    as "not a package".
    """
    resolved = resolve(package)
    # ``__spec__`` may legitimately be None (for example ``__main__`` run
    # as a script, or a module built by hand with types.ModuleType).
    # Checking for that here yields the documented TypeError instead of an
    # AttributeError on None.
    spec = getattr(resolved, '__spec__', None)
    if spec is None or spec.submodule_search_locations is None:
        raise TypeError('{!r} is not a package'.format(package))
    return resolved


def from_package(package):
    """
    Return a Traversable object for the given package.

    The package's loader is asked for a resource reader, keyed by the
    spec name, and that reader's ``files()`` result is returned.
    """
    package_spec = package.__spec__
    make_reader = package_spec.loader.get_resource_reader
    return make_reader(package_spec.name).files()


@contextlib.contextmanager
Expand Down
2 changes: 1 addition & 1 deletion Lib/importlib/abc.py
Original file line number Diff line number Diff line change
Expand Up @@ -468,7 +468,7 @@ def resource_path(self, resource):
raise FileNotFoundError(resource)

def is_resource(self, path):
return self.files().joinpath(path).isfile()
return self.files().joinpath(path).is_file()

def contents(self):
    """Return a generator of the names of the entries under ``self.files()``."""
    root = self.files()
    return (child.name for child in root.iterdir())
30 changes: 30 additions & 0 deletions Lib/importlib/readers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
import zipfile
import pathlib
from . import abc


class FileReader(abc.TraversableResources):
    """Traversable resource reader rooted at the parent of ``loader.path``."""

    def __init__(self, loader):
        # Resources are served from the directory containing loader.path.
        loaded_file = pathlib.Path(loader.path)
        self.path = loaded_file.parent

    def files(self):
        """Return the ``pathlib.Path`` computed in ``__init__``."""
        return self.path


class ZipReader(FileReader):
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm probably going to decouple FileReader from ZipReader. The only shared functionality is the one-line files method.

def __init__(self, loader, module):
    """Root the reader at the module's directory inside the loader's archive.

    Only the last dotted component of *module* is used; it is appended to
    ``loader.prefix`` (with backslashes turned into forward slashes) and
    terminated with a slash.
    """
    package_name = module.rpartition('.')[2]
    archive_prefix = loader.prefix.replace('\\', '/')
    self.path = zipfile.Path(
        loader.archive, archive_prefix + package_name + '/')

def open_resource(self, resource):
    """Open *resource* via the parent implementation.

    A KeyError raised by the parent is translated into FileNotFoundError
    carrying the KeyError's first argument.
    """
    try:
        stream = super().open_resource(resource)
    except KeyError as exc:
        raise FileNotFoundError(exc.args[0])
    return stream

def is_resource(self, path):
    """Return whether *path* names an existing file under ``self.files()``."""
    candidate = self.files().joinpath(path)
    # `zipfile.Path.is_file` can report true for entries that are not in
    # the archive at all, so a positive answer is confirmed with exists().
    file_like = candidate.is_file()
    return candidate.exists() if file_like else file_like
85 changes: 14 additions & 71 deletions Lib/importlib/resources.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,13 @@
import os

from . import abc as resources_abc
from . import _common
from ._common import as_file
from ._common import as_file, files
from contextlib import contextmanager, suppress
from importlib import import_module
from importlib.abc import ResourceLoader
from io import BytesIO, TextIOWrapper
from pathlib import Path
from types import ModuleType
from typing import ContextManager, Iterable, Optional, Union
from typing import ContextManager, Iterable, Union
from typing import cast
from typing.io import BinaryIO, TextIO

Expand All @@ -33,60 +31,11 @@
Resource = Union[str, os.PathLike]


def _resolve(name) -> ModuleType:
    """Return *name* itself if it has a ``__spec__``; otherwise import it by name."""
    return name if hasattr(name, '__spec__') else import_module(name)


def _get_package(package) -> ModuleType:
    """Resolve *package* (a name or a module object) to a package module.

    Resolution is delegated to ``_resolve``. TypeError is raised when the
    resolved module's spec has no ``submodule_search_locations``.
    """
    resolved = _resolve(package)
    search_locations = resolved.__spec__.submodule_search_locations
    if search_locations is not None:
        return resolved
    raise TypeError('{!r} is not a package'.format(package))


def _normalize_path(path) -> str:
    """Return the final component of *path* once it is known to be bare.

    ValueError is raised if ``os.path.split`` finds a directory part.
    """
    directory, base_name = os.path.split(path)
    if not directory:
        return base_name
    raise ValueError('{!r} must be only a file name'.format(path))


def _get_resource_reader(
        package: ModuleType) -> Optional[resources_abc.ResourceReader]:
    """Return the reader from the package loader's ``get_resource_reader``.

    None is returned when the loader has no such method.
    """
    # An issubclass() check is deliberately avoided: abc's
    # __subclasscheck__() hook wants to create a weak reference to the
    # object, and zipimport.zipimporter does not support weak references,
    # which results in a TypeError.
    spec = package.__spec__
    if not hasattr(spec.loader, 'get_resource_reader'):
        return None
    reader = spec.loader.get_resource_reader(spec.name)
    return cast(resources_abc.ResourceReader, reader)


def _check_location(package):
    """Raise FileNotFoundError unless the package spec has an origin and a location."""
    located = (
        package.__spec__.origin is not None
        and package.__spec__.has_location
    )
    if not located:
        raise FileNotFoundError(f'Package has no location {package!r}')


def open_binary(package: Package, resource: Resource) -> BinaryIO:
"""Return a file-like object opened for binary reading of the resource."""
resource = _normalize_path(resource)
package = _get_package(package)
reader = _get_resource_reader(package)
resource = _common.normalize_path(resource)
package = _common.get_package(package)
reader = _common.get_resource_reader(package)
if reader is not None:
return reader.open_resource(resource)
absolute_package_path = os.path.abspath(
Expand Down Expand Up @@ -140,13 +89,6 @@ def read_text(package: Package,
return fp.read()


def files(package: Package) -> resources_abc.Traversable:
"""
Get a Traversable resource from a package
"""
return _common.from_package(_get_package(package))


def path(
package: Package, resource: Resource,
) -> 'ContextManager[Path]':
Expand All @@ -158,17 +100,18 @@ def path(
raised if the file was deleted prior to the context manager
exiting).
"""
reader = _get_resource_reader(_get_package(package))
reader = _common.get_resource_reader(_common.get_package(package))
return (
_path_from_reader(reader, resource)
if reader else
_common.as_file(files(package).joinpath(_normalize_path(resource)))
_common.as_file(
_common.files(package).joinpath(_common.normalize_path(resource)))
)


@contextmanager
def _path_from_reader(reader, resource):
norm_resource = _normalize_path(resource)
norm_resource = _common.normalize_path(resource)
with suppress(FileNotFoundError):
yield Path(reader.resource_path(norm_resource))
return
Expand All @@ -182,9 +125,9 @@ def is_resource(package: Package, name: str) -> bool:

Directories are *not* resources.
"""
package = _get_package(package)
_normalize_path(name)
reader = _get_resource_reader(package)
package = _common.get_package(package)
_common.normalize_path(name)
reader = _common.get_resource_reader(package)
if reader is not None:
return reader.is_resource(name)
package_contents = set(contents(package))
Expand All @@ -200,8 +143,8 @@ def contents(package: Package) -> Iterable[str]:
not considered resources. Use `is_resource()` on each entry returned here
to check if it is a resource or not.
"""
package = _get_package(package)
reader = _get_resource_reader(package)
package = _common.get_package(package)
reader = _common.get_resource_reader(package)
if reader is not None:
return reader.contents()
# Is the package a namespace package? By definition, namespace packages
Expand Down
Loading