Skip to content

Commit

Permalink
refactor: group functions for requirements parsing into a class (#7693)
Browse files Browse the repository at this point in the history
  • Loading branch information
radoering committed May 5, 2023
1 parent 6548666 commit 3248fe1
Show file tree
Hide file tree
Showing 3 changed files with 158 additions and 166 deletions.
14 changes: 5 additions & 9 deletions src/poetry/console/commands/init.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@

from poetry.console.commands.command import Command
from poetry.console.commands.env_command import EnvCommand
from poetry.utils.dependency_specification import parse_dependency_specification
from poetry.utils.dependency_specification import RequirementsParser


if TYPE_CHECKING:
Expand Down Expand Up @@ -437,14 +437,10 @@ def _parse_requirements(self, requirements: list[str]) -> list[dict[str, Any]]:
except (PyProjectException, RuntimeError):
cwd = Path.cwd()

return [
parse_dependency_specification(
requirement=requirement,
env=self.env if isinstance(self, EnvCommand) else None,
cwd=cwd,
)
for requirement in requirements
]
parser = RequirementsParser(
self.env if isinstance(self, EnvCommand) else None, cwd
)
return [parser.parse(requirement) for requirement in requirements]

def _format_requirements(self, requirements: list[dict[str, str]]) -> Requirements:
requires: Requirements = {}
Expand Down
306 changes: 151 additions & 155 deletions src/poetry/utils/dependency_specification.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,128 +26,6 @@


DependencySpec = Dict[str, Union[str, bool, Dict[str, Union[str, bool]], List[str]]]


def _parse_dependency_specification_git_url(
requirement: str, env: Env | None = None
) -> DependencySpec | None:
from poetry.core.vcs.git import Git
from poetry.core.vcs.git import ParsedUrl

parsed = ParsedUrl.parse(requirement)
url = Git.normalize_url(requirement)

pair = {"name": parsed.name, "git": url.url}

if parsed.rev:
pair["rev"] = url.revision

if parsed.subdirectory:
pair["subdirectory"] = parsed.subdirectory

source_root = env.path.joinpath("src") if env else None
package = Provider.get_package_from_vcs(
"git",
url=url.url,
rev=pair.get("rev"),
subdirectory=parsed.subdirectory,
source_root=source_root,
)
pair["name"] = package.name
return pair


def _parse_dependency_specification_url(
requirement: str, env: Env | None = None
) -> DependencySpec | None:
url_parsed = urllib.parse.urlparse(requirement)
if not (url_parsed.scheme and url_parsed.netloc):
return None

if url_parsed.scheme in ["git+https", "git+ssh"]:
return _parse_dependency_specification_git_url(requirement, env)

if url_parsed.scheme in ["http", "https"]:
package = Provider.get_package_from_url(requirement)
assert package.source_url is not None
return {"name": package.name, "url": package.source_url}

return None


def _parse_dependency_specification_path(
requirement: str, cwd: Path
) -> DependencySpec | None:
if (os.path.sep in requirement or "/" in requirement) and (
cwd.joinpath(requirement).exists()
or Path(requirement).expanduser().exists()
and Path(requirement).expanduser().is_absolute()
):
path = Path(requirement).expanduser()
is_absolute = path.is_absolute()

if not path.is_absolute():
path = cwd.joinpath(requirement)

if path.is_file():
package = Provider.get_package_from_file(path.resolve())
else:
package = Provider.get_package_from_directory(path.resolve())

return {
"name": package.name,
"path": (
path.relative_to(cwd).as_posix() if not is_absolute else path.as_posix()
),
}

return None


def _parse_dependency_specification_simple(
requirement: str,
) -> DependencySpec | None:
extras: list[str] = []
pair = re.sub("^([^@=: ]+)(?:@|==|(?<![<>~!])=|:| )(.*)$", "\\1 \\2", requirement)
pair = pair.strip()

require: DependencySpec = {}

if " " in pair:
name, version = pair.split(" ", 1)
extras_m = re.search(r"\[([\w\d,-_]+)\]$", name)
if extras_m:
extras = [e.strip() for e in extras_m.group(1).split(",")]
name, _ = name.split("[")

require["name"] = name
if version != "latest":
require["version"] = version
else:
m = re.match(r"^([^><=!: ]+)((?:>=|<=|>|<|!=|~=|~|\^).*)$", requirement.strip())
if m:
name, constraint = m.group(1), m.group(2)
extras_m = re.search(r"\[([\w\d,-_]+)\]$", name)
if extras_m:
extras = [e.strip() for e in extras_m.group(1).split(",")]
name, _ = name.split("[")

require["name"] = name
require["version"] = constraint
else:
extras_m = re.search(r"\[([\w\d,-_]+)\]$", pair)
if extras_m:
extras = [e.strip() for e in extras_m.group(1).split(",")]
pair, _ = pair.split("[")

require["name"] = pair

if extras:
require["extras"] = extras

return require


BaseSpec = TypeVar("BaseSpec", DependencySpec, InlineTable)


Expand Down Expand Up @@ -178,49 +56,167 @@ def dependency_to_specification(
return specification


def pep508_to_dependency_specification(requirement: str) -> DependencySpec | None:
if " ; " not in requirement and re.search(r"@[\^~!=<>\d]", requirement):
# this is of the form package@<semver>, do not attempt to parse it
return None
class RequirementsParser:
def __init__(self, env: Env | None = None, cwd: Path | None = None) -> None:
self._env = env
self._cwd = cwd or Path.cwd()

def parse(self, requirement: str) -> DependencySpec:
requirement = requirement.strip()

with contextlib.suppress(ValueError):
dependency = Dependency.create_from_pep_508(requirement)
specification: DependencySpec = {}
specification = dependency_to_specification(dependency, specification)
specification = self._parse_pep508(requirement)

if specification is not None:
return specification

extras = []
extras_m = re.search(r"\[([\w\d,-_ ]+)\]$", requirement)
if extras_m:
extras = [e.strip() for e in extras_m.group(1).split(",")]
requirement, _ = requirement.split("[")

specification = (
self._parse_url(requirement)
or self._parse_path(requirement)
or self._parse_simple(requirement)
)

if specification:
specification["name"] = dependency.name
if extras and "extras" not in specification:
specification["extras"] = extras
return specification

return None
raise ValueError(f"Invalid dependency specification: {requirement}")

def _parse_pep508(self, requirement: str) -> DependencySpec | None:
if " ; " not in requirement and re.search(r"@[\^~!=<>\d]", requirement):
# this is of the form package@<semver>, do not attempt to parse it
return None

with contextlib.suppress(ValueError):
dependency = Dependency.create_from_pep_508(requirement)
specification: DependencySpec = {}
specification = dependency_to_specification(dependency, specification)

if specification:
specification["name"] = dependency.name
return specification

return None

def _parse_git_url(self, requirement: str) -> DependencySpec | None:
from poetry.core.vcs.git import Git
from poetry.core.vcs.git import ParsedUrl

parsed = ParsedUrl.parse(requirement)
url = Git.normalize_url(requirement)

pair = {"name": parsed.name, "git": url.url}

if parsed.rev:
pair["rev"] = url.revision

if parsed.subdirectory:
pair["subdirectory"] = parsed.subdirectory

def parse_dependency_specification(
requirement: str, env: Env | None = None, cwd: Path | None = None
) -> DependencySpec:
requirement = requirement.strip()
cwd = cwd or Path.cwd()
source_root = self._env.path.joinpath("src") if self._env else None
package = Provider.get_package_from_vcs(
"git",
url=url.url,
rev=pair.get("rev"),
subdirectory=parsed.subdirectory,
source_root=source_root,
)
pair["name"] = package.name
return pair

specification = pep508_to_dependency_specification(requirement)
def _parse_url(self, requirement: str) -> DependencySpec | None:
url_parsed = urllib.parse.urlparse(requirement)
if not (url_parsed.scheme and url_parsed.netloc):
return None

if specification is not None:
return specification
if url_parsed.scheme in ["git+https", "git+ssh"]:
return self._parse_git_url(requirement)

extras = []
extras_m = re.search(r"\[([\w\d,-_ ]+)\]$", requirement)
if extras_m:
extras = [e.strip() for e in extras_m.group(1).split(",")]
requirement, _ = requirement.split("[")
if url_parsed.scheme in ["http", "https"]:
package = Provider.get_package_from_url(requirement)
assert package.source_url is not None
return {"name": package.name, "url": package.source_url}

specification = (
_parse_dependency_specification_url(requirement, env=env)
or _parse_dependency_specification_path(requirement, cwd=cwd)
or _parse_dependency_specification_simple(requirement)
)
return None

def _parse_path(self, requirement: str) -> DependencySpec | None:
if (os.path.sep in requirement or "/" in requirement) and (
self._cwd.joinpath(requirement).exists()
or Path(requirement).expanduser().exists()
and Path(requirement).expanduser().is_absolute()
):
path = Path(requirement).expanduser()
is_absolute = path.is_absolute()

if not path.is_absolute():
path = self._cwd.joinpath(requirement)

if path.is_file():
package = Provider.get_package_from_file(path.resolve())
else:
package = Provider.get_package_from_directory(path.resolve())

return {
"name": package.name,
"path": (
path.relative_to(self._cwd).as_posix()
if not is_absolute
else path.as_posix()
),
}

return None

if specification:
if extras and "extras" not in specification:
specification["extras"] = extras
return specification
def _parse_simple(
self,
requirement: str,
) -> DependencySpec | None:
extras: list[str] = []
pair = re.sub(
"^([^@=: ]+)(?:@|==|(?<![<>~!])=|:| )(.*)$", "\\1 \\2", requirement
)
pair = pair.strip()

raise ValueError(f"Invalid dependency specification: {requirement}")
require: DependencySpec = {}

if " " in pair:
name, version = pair.split(" ", 1)
extras_m = re.search(r"\[([\w\d,-_]+)\]$", name)
if extras_m:
extras = [e.strip() for e in extras_m.group(1).split(",")]
name, _ = name.split("[")

require["name"] = name
if version != "latest":
require["version"] = version
else:
m = re.match(
r"^([^><=!: ]+)((?:>=|<=|>|<|!=|~=|~|\^).*)$", requirement.strip()
)
if m:
name, constraint = m.group(1), m.group(2)
extras_m = re.search(r"\[([\w\d,-_]+)\]$", name)
if extras_m:
extras = [e.strip() for e in extras_m.group(1).split(",")]
name, _ = name.split("[")

require["name"] = name
require["version"] = constraint
else:
extras_m = re.search(r"\[([\w\d,-_]+)\]$", pair)
if extras_m:
extras = [e.strip() for e in extras_m.group(1).split(",")]
pair, _ = pair.split("[")

require["name"] = pair

if extras:
require["extras"] = extras

return require
4 changes: 2 additions & 2 deletions tests/utils/test_dependency_specification.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

from deepdiff import DeepDiff

from poetry.utils.dependency_specification import parse_dependency_specification
from poetry.utils.dependency_specification import RequirementsParser


if TYPE_CHECKING:
Expand Down Expand Up @@ -116,5 +116,5 @@ def _mock(self: Path) -> bool:
mocker.patch("pathlib.Path.exists", _mock)

assert not DeepDiff(
parse_dependency_specification(requirement), specification, ignore_order=True
RequirementsParser().parse(requirement), specification, ignore_order=True
)

0 comments on commit 3248fe1

Please sign in to comment.