[ML][Pipelines] avoid changing base_path in validation #26851

Merged
@@ -12,23 +12,38 @@
from azure.ai.ml.entities._validation import MutableValidationResult, _ValidationResultBuilder

ADDITIONAL_INCLUDES_SUFFIX = "additional_includes"
PLACEHOLDER_FILE_NAME = "_placeholder_spec.yaml"


class _AdditionalIncludes:
def __init__(self, code_path: Union[None, str], yaml_path: str):
self.__yaml_path = Path(yaml_path)
self.__yaml_path = yaml_path
self.__code_path = code_path

self._tmp_code_path = None
self._includes = None
if self._additional_includes_file_path.is_file():
self.__includes = None

@property
def _includes(self):
if not self._additional_includes_file_path.is_file():
return []
if self.__includes is None:
with open(self._additional_includes_file_path, "r") as f:
lines = f.readlines()
self._includes = [line.strip() for line in lines if len(line.strip()) > 0]
self.__includes = [line.strip() for line in lines if len(line.strip()) > 0]
return self.__includes

@property
def with_includes(self):
return len(self._includes) != 0

@property
def _yaml_path(self) -> Path:
return self.__yaml_path
if self.__yaml_path is None:
# if yaml path is not specified, use a placeholder
# temp file name that is never actually created
return Path.cwd() / PLACEHOLDER_FILE_NAME
return Path(self.__yaml_path)

@property
def _code_path(self) -> Path:
@@ -56,25 +71,28 @@ def _copy(src: Path, dst: Path) -> None:
shutil.copytree(src, dst)

def _validate(self) -> MutableValidationResult:
# pylint: disable=too-many-return-statements
if self._includes is None:
return _ValidationResultBuilder.success()
validation_result = _ValidationResultBuilder.success()
if not self.with_includes:
return validation_result
for additional_include in self._includes:
include_path = self._additional_includes_file_path.parent / additional_include
# if the additional include contains unsupported characters, resolve will fail and raise OSError
try:
src_path = include_path.resolve()
except OSError:
error_msg = f"Failed to resolve additional include {additional_include} for {self._yaml_name}."
return _ValidationResultBuilder.from_single_message(error_msg)
validation_result.append_error(message=error_msg)
continue

if not src_path.exists():
error_msg = f"Unable to find additional include {additional_include} for {self._yaml_name}."
return _ValidationResultBuilder.from_single_message(error_msg)
validation_result.append_error(message=error_msg)
continue

if len(src_path.parents) == 0:
error_msg = f"Root directory is not supported for additional includes for {self._yaml_name}."
return _ValidationResultBuilder.from_single_message(error_msg)
validation_result.append_error(message=error_msg)
continue

dst_path = Path(self._code_path) / src_path.name
if dst_path.is_symlink():
@@ -84,11 +102,12 @@ def _validate(self) -> MutableValidationResult:
f"A symbolic link already exists for additional include {additional_include} "
f"for {self._yaml_name}."
)
return _ValidationResultBuilder.from_single_message(error_msg)
validation_result.append_error(message=error_msg)
continue
elif dst_path.exists():
error_msg = f"A file already exists for additional include {additional_include} for {self._yaml_name}."
return _ValidationResultBuilder.from_single_message(error_msg)
return _ValidationResultBuilder.success()
validation_result.append_error(message=error_msg)
return validation_result

def resolve(self) -> None:
"""Resolve code and potential additional includes.
@@ -97,7 +116,7 @@ def resolve(self) -> None:
original real code path; otherwise, create a tmp folder and copy
all files under real code path and additional includes to it.
"""
if self._includes is None:
if not self.with_includes:
return
tmp_folder_path = Path(tempfile.mkdtemp())
# code can be either a file or a folder; since additional includes exist, we need to copy to a temporary folder
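The `_validate` changes in `_AdditionalIncludes` above replace the early `return _ValidationResultBuilder.from_single_message(...)` calls with `append_error(...)` plus `continue`, so a single validation pass now reports every broken additional include instead of only the first. A minimal stdlib-only sketch of that aggregation pattern (the helper and file names below are made up for illustration, not SDK APIs):

```python
from pathlib import Path
from typing import List

def validate_additional_includes(includes: List[str], base_dir: str) -> List[str]:
    # Collect every problem instead of returning on the first one, mirroring
    # the append_error/continue pattern used by _AdditionalIncludes._validate.
    errors = []
    for item in includes:
        candidate = Path(base_dir) / item
        try:
            resolved = candidate.resolve()
        except OSError:
            errors.append(f"Failed to resolve additional include {item}.")
            continue
        if not resolved.exists():
            errors.append(f"Unable to find additional include {item}.")
            continue
        if len(resolved.parents) == 0:
            errors.append(f"Root directory is not supported for additional includes ({item}).")
    return errors

# Both problems are reported in one pass instead of only the first.
print(validate_additional_includes(["missing_a.txt", "missing_b.txt"], "."))
```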
53 changes: 23 additions & 30 deletions sdk/ml/azure-ai-ml/azure/ai/ml/_internal/entities/component.py
@@ -5,7 +5,6 @@
# disable redefined-builtin to use id/type as argument name
from contextlib import contextmanager
from typing import Dict, Union
import os

from marshmallow import INCLUDE, Schema

@@ -177,30 +176,24 @@ def _additional_includes(self):
def _create_schema_for_validation(cls, context) -> Union[PathAwareSchema, Schema]:
return InternalBaseComponentSchema(context=context)

def _validate(self, raise_error=False) -> MutableValidationResult:
if self._additional_includes is not None and self._additional_includes._validate().passed:
# update source path in case dependency file is in additional_includes
with self._resolve_local_code() as tmp_base_path:
origin_base_path, origin_source_path = self._base_path, self._source_path

try:
self._base_path, self._source_path = \
tmp_base_path, tmp_base_path / os.path.basename(self._source_path)
return super()._validate(raise_error=raise_error)
finally:
self._base_path, self._source_path = origin_base_path, origin_source_path

return super()._validate(raise_error=raise_error)

def _customized_validate(self) -> MutableValidationResult:
validation_result = super(InternalComponent, self)._customized_validate()
if self._additional_includes.with_includes:
validation_result.merge_with(self._additional_includes._validate())
# resolving additional includes & updating self._base_path can be dangerous,
# so we just skip path validation if additional_includes is used
# note that there will still be a runtime error on submission or execution
skip_path_validation = True
else:
skip_path_validation = False
if isinstance(self.environment, InternalEnvironment):
validation_result.merge_with(
self.environment._validate(self._source_path),
self.environment._validate(
self._base_path,
skip_path_validation=skip_path_validation
),
field_name="environment",
)
if self._additional_includes is not None:
validation_result.merge_with(self._additional_includes._validate())
return validation_result

@classmethod
@@ -215,8 +208,6 @@ def _load_from_rest(cls, obj: ComponentVersionData) -> "InternalComponent":
)

def _to_rest_object(self) -> ComponentVersionData:
if isinstance(self.environment, InternalEnvironment):
self.environment.resolve(self._source_path)
component = convert_ordered_dict_to_dict(self._to_dict())

properties = ComponentVersionDetails(
@@ -232,15 +223,17 @@

@contextmanager
def _resolve_local_code(self):
# if `self._source_path` is None, component is not loaded from local yaml and
# no need to resolve
if self._source_path is None:
yield self.code
else:
self._additional_includes.resolve()
# use absolute path in case temp folder & work dir are in different drive
yield self._additional_includes.code.absolute()
self._additional_includes.cleanup()
self._additional_includes.resolve()

# file dependencies in code will be read during internal environment resolution;
# for example, the environment's Dockerfile may be in additional includes
# and will be read and then inserted into the environment object during resolution,
# so we need to resolve the environment based on the temporary code path
if isinstance(self.environment, InternalEnvironment):
self.environment.resolve(self._additional_includes.code)
# use absolute path in case temp folder & work dir are in different drive
yield self._additional_includes.code.absolute()
self._additional_includes.cleanup()

def __call__(self, *args, **kwargs) -> InternalBaseNode: # pylint: disable=useless-super-delegation
return super(InternalComponent, self).__call__(*args, **kwargs)
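The reordered `_resolve_local_code` above resolves the internal environment while the temporary code folder still exists, because the Dockerfile or conda file it references may only be present after additional includes are copied in. A rough sketch of that ordering contract; `resolve_environment` and the copy step are stand-ins, not SDK APIs:

```python
import shutil
import tempfile
from contextlib import contextmanager
from pathlib import Path

@contextmanager
def resolve_code_then_environment(source_dir: str, resolve_environment):
    # 1. Copy code plus additional includes into a temporary folder.
    tmp_dir = Path(tempfile.mkdtemp())
    shutil.copytree(source_dir, tmp_dir, dirs_exist_ok=True)
    try:
        # 2. Resolve the environment while the temporary folder still exists,
        #    so file references (Dockerfile, conda.yaml) can be read from it.
        resolve_environment(tmp_dir)
        # 3. Yield an absolute path in case the temporary folder and the
        #    working directory live on different drives.
        yield tmp_dir.absolute()
    finally:
        # 4. Clean up only after the caller is done with the resolved code.
        shutil.rmtree(tmp_dir, ignore_errors=True)
```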
34 changes: 17 additions & 17 deletions sdk/ml/azure-ai-ml/azure/ai/ml/_internal/entities/environment.py
@@ -3,7 +3,7 @@
# ---------------------------------------------------------

from pathlib import Path
from typing import Dict
from typing import Dict, Union

from azure.ai.ml._utils.utils import load_yaml
from azure.ai.ml.constants._common import FILE_PREFIX
@@ -40,7 +40,7 @@ def __init__(
def _parse_file_path(value: str) -> str:
return value[len(FILE_PREFIX) :] if value.startswith(FILE_PREFIX) else value

def _validate_conda_section(self, source_path: str) -> MutableValidationResult:
def _validate_conda_section(self, base_path: str, skip_path_validation: bool) -> MutableValidationResult:
validation_result = _ValidationResultBuilder.success()
if not self.conda:
return validation_result
@@ -53,56 +53,56 @@ def _validate_conda_section(self, source_path: str) -> MutableValidationResult:
)
if self.conda.get(self.CONDA_DEPENDENCIES_FILE):
conda_dependencies_file = self.conda[self.CONDA_DEPENDENCIES_FILE]
if not (Path(source_path).parent / conda_dependencies_file).is_file():
if not skip_path_validation and not (Path(base_path) / conda_dependencies_file).is_file():
validation_result.append_error(
yaml_path=f"conda.{self.CONDA_DEPENDENCIES_FILE}",
message=f"Cannot find conda dependencies file: {conda_dependencies_file!r}",
)
if self.conda.get(self.PIP_REQUIREMENTS_FILE):
pip_requirements_file = self.conda[self.PIP_REQUIREMENTS_FILE]
if not (Path(source_path).parent / pip_requirements_file).is_file():
if not skip_path_validation and not (Path(base_path) / pip_requirements_file).is_file():
validation_result.append_error(
yaml_path=f"conda.{self.PIP_REQUIREMENTS_FILE}",
message=f"Cannot find pip requirements file: {pip_requirements_file!r}",
)
return validation_result

def _validate_docker_section(self, source_path: str) -> MutableValidationResult:
def _validate_docker_section(self, base_path: str, skip_path_validation: bool) -> MutableValidationResult:
validation_result = _ValidationResultBuilder.success()
if not self.docker:
return validation_result
if not self.docker.get(self.BUILD) or not self.docker[self.BUILD].get(self.DOCKERFILE):
return validation_result
dockerfile_file = self.docker[self.BUILD][self.DOCKERFILE]
dockerfile_file = self._parse_file_path(dockerfile_file)
if not (Path(source_path).parent / dockerfile_file).is_file():
if not skip_path_validation and not (Path(base_path) / dockerfile_file).is_file():
validation_result.append_error(
yaml_path=f"docker.{self.BUILD}.{self.DOCKERFILE}",
message=f"Dockerfile not exists: {dockerfile_file}",
)
return validation_result

def _validate(self, source_path: str) -> MutableValidationResult:
def _validate(self, base_path: str, skip_path_validation: bool = False) -> MutableValidationResult:
validation_result = _ValidationResultBuilder.success()
if self.os is not None and self.os not in {"Linux", "Windows"}:
validation_result.append_error(
yaml_path="os",
message=f"Only support 'Linux' and 'Windows', but got {self.os!r}",
)
validation_result.merge_with(self._validate_conda_section(source_path))
validation_result.merge_with(self._validate_docker_section(source_path))
validation_result.merge_with(self._validate_conda_section(base_path, skip_path_validation))
validation_result.merge_with(self._validate_docker_section(base_path, skip_path_validation))
return validation_result

def _resolve_conda_section(self, source_path: str) -> None:
def _resolve_conda_section(self, base_path: Union[Path, str]) -> None:
if not self.conda:
return
if self.conda.get(self.CONDA_DEPENDENCIES_FILE):
conda_dependencies_file = self.conda.pop(self.CONDA_DEPENDENCIES_FILE)
self.conda[self.CONDA_DEPENDENCIES] = load_yaml(Path(source_path).parent / conda_dependencies_file)
self.conda[self.CONDA_DEPENDENCIES] = load_yaml(Path(base_path) / conda_dependencies_file)
return
if self.conda.get(self.PIP_REQUIREMENTS_FILE):
pip_requirements_file = self.conda.pop(self.PIP_REQUIREMENTS_FILE)
with open(Path(source_path).parent / pip_requirements_file) as f:
with open(Path(base_path) / pip_requirements_file) as f:
pip_requirements = f.read().splitlines()
self.conda = {
self.CONDA_DEPENDENCIES: {
@@ -117,7 +117,7 @@
}
return

def _resolve_docker_section(self, source_path: str) -> None:
def _resolve_docker_section(self, base_path: Union[Path, str]) -> None:
if not self.docker:
return
if not self.docker.get(self.BUILD) or not self.docker[self.BUILD].get(self.DOCKERFILE):
@@ -126,10 +126,10 @@
if not dockerfile_file.startswith(FILE_PREFIX):
return
dockerfile_file = self._parse_file_path(dockerfile_file)
with open(Path(source_path).parent / dockerfile_file, "r") as f:
with open(Path(base_path) / dockerfile_file, "r") as f:
self.docker[self.BUILD][self.DOCKERFILE] = f.read()
return

def resolve(self, source_path: str) -> None:
self._resolve_conda_section(source_path)
self._resolve_docker_section(source_path)
def resolve(self, base_path: Union[Path, str]) -> None:
self._resolve_conda_section(base_path)
self._resolve_docker_section(base_path)
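The environment helpers now take the component's `base_path` and resolve file references directly against it, instead of taking `source_path` and using its parent directory. For a component whose YAML sits in its base directory the two resolutions are equivalent; a quick illustration with made-up paths:

```python
from pathlib import Path

source_path = Path("/work/my_component/spec.yaml")  # hypothetical YAML location
base_path = source_path.parent                      # /work/my_component
conda_file = "environment/conda.yaml"

old_resolution = source_path.parent / conda_file  # previous behaviour
new_resolution = Path(base_path) / conda_file     # behaviour after this change

assert old_resolution == new_resolution == Path("/work/my_component/environment/conda.yaml")
```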
13 changes: 9 additions & 4 deletions sdk/ml/azure-ai-ml/azure/ai/ml/_schema/core/fields.py
@@ -112,21 +112,26 @@ def _serialize(self, value, attr, obj, **kwargs) -> typing.Optional[str]:
return super(LocalPathField, self)._serialize(value, attr, obj, **kwargs)

def _validate(self, value):
base_path_err_msg = ""
try:
path = Path(value)
base_path = Path(self.context[BASE_PATH_CONTEXT_KEY])
if not path.is_absolute():
path = base_path / path
path.resolve()
base_path_err_msg = f" Resolved absolute path: {path.absolute()}"
if (self._allow_dir and path.is_dir()) or (self._allow_file and path.is_file()):
return super(LocalPathField, self)._validate(value)
except OSError:
pass
if self._allow_dir and self._allow_file:
raise ValidationError(f"{value} is not a valid path")
if self._allow_dir:
raise ValidationError(f"{value} is not a valid directory")
raise ValidationError(f"{value} is not a valid file")
allow_type = "directory or file"
elif self._allow_dir:
allow_type = "directory"
else:
allow_type = "file"
raise ValidationError(f"Value {value!r} passed is not a valid "
f"{allow_type} path.{base_path_err_msg}")


class SerializeValidatedUrl(fields.Url):
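The `LocalPathField._validate` rewrite above folds the three separate error branches into one message that also reports the resolved absolute path when it could be computed. A standalone sketch of that message construction, without marshmallow and with illustrative names:

```python
from pathlib import Path

def describe_invalid_path(value, base_path, allow_dir=True, allow_file=True) -> str:
    # Mirror the field's message: state which kinds of path were acceptable
    # and include the resolved absolute path when it can be computed.
    base_path_err_msg = ""
    try:
        path = Path(value)
        if not path.is_absolute():
            path = Path(base_path) / path
        path.resolve()
        base_path_err_msg = f" Resolved absolute path: {path.absolute()}"
    except OSError:
        pass
    if allow_dir and allow_file:
        allow_type = "directory or file"
    elif allow_dir:
        allow_type = "directory"
    else:
        allow_type = "file"
    return f"Value {value!r} passed is not a valid {allow_type} path.{base_path_err_msg}"

print(describe_invalid_path("conda.yaml", "/tmp/does-not-exist"))
```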
27 changes: 15 additions & 12 deletions sdk/ml/azure-ai-ml/azure/ai/ml/entities/_component/component.py
@@ -449,20 +449,23 @@ def __call__(self, *args, **kwargs) -> [..., Union["Command", "Parallel"]]:
@contextmanager
def _resolve_local_code(self):
"""Resolve working directory path for the component."""
with tempfile.TemporaryDirectory() as tmp_dir:
if hasattr(self, "code"):
code = getattr(self, "code")
# Hack: when code not specified, we generated a file which contains
# COMPONENT_PLACEHOLDER as code
# This hack was introduced because job does not allow running component without a
# code, and we need to make sure when component updated some field(eg: description),
# the code remains the same. Benefit of using a constant code for all components
# without code is this will generate same code for anonymous components which
# enables component reuse
if code is None:
if hasattr(self, "code"):
code = getattr(self, "code")
# Hack: when code is not specified, we generate a file which contains
# COMPONENT_PLACEHOLDER as code
# This hack was introduced because a job does not allow running a component without
# code, and we need to make sure that when a component updates some field (e.g. description),
# the code remains the same. The benefit of using a constant code for all components
# without code is that this will generate the same code for anonymous components, which
# enables component reuse
if code is None:
with tempfile.TemporaryDirectory() as tmp_dir:
code = Path(tmp_dir) / COMPONENT_PLACEHOLDER
with open(code, "w") as f:
f.write(COMPONENT_CODE_PLACEHOLDER)
yield code
yield code
else:
yield code
else:
with tempfile.TemporaryDirectory() as tmp_dir:
yield tmp_dir
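The restructured `_resolve_local_code` above only creates a temporary directory when the placeholder file is actually needed, so components that already have code no longer yield paths tied to a throwaway folder. A minimal sketch of that lazy pattern; the placeholder file name is made up:

```python
import tempfile
from contextlib import contextmanager
from pathlib import Path

@contextmanager
def resolve_code(code=None):
    # Fast path: user-provided code needs no temporary directory at all.
    if code is not None:
        yield Path(code)
        return
    # Slow path: create the temp dir only when a placeholder must be written.
    with tempfile.TemporaryDirectory() as tmp_dir:
        placeholder = Path(tmp_dir) / "_placeholder_code.txt"  # hypothetical name
        placeholder.write_text("placeholder")
        yield placeholder

with resolve_code("./src") as resolved_code:
    print(resolved_code)  # ./src, no temp dir created
```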
@@ -194,7 +194,7 @@ def test_serialize_deserialize_default_code(self, mock_machinelearning_client: M
component_entity = load_component_entity_from_yaml(test_path, mock_machinelearning_client)
# make sure the default code has been generated with name and version as content
assert component_entity.code
assert COMPONENT_CODE_PLACEHOLDER == component_entity.code
assert component_entity.code == COMPONENT_CODE_PLACEHOLDER

def test_serialize_deserialize_input_output_path(self, mock_machinelearning_client: MLClient):
expected_value_dict = {