diff --git a/doc/api/prep_data/feature_store.rst b/doc/api/prep_data/feature_store.rst
index 50a10c5089..731b2e32d1 100644
--- a/doc/api/prep_data/feature_store.rst
+++ b/doc/api/prep_data/feature_store.rst
@@ -60,6 +60,7 @@ Feature Definition
    :members:
    :show-inheritance:
 
+
 Inputs
 ******
 
@@ -181,9 +182,13 @@ Feature Processor Data Source
    :members:
    :show-inheritance:
 
+.. autoclass:: sagemaker.feature_store.feature_processor.PySparkDataSource
+   :members:
+   :show-inheritance:
 
-Feature Processor Scheduler
-***************************
+
+Feature Processor Scheduler and Triggers
+****************************************
 
 .. automethod:: sagemaker.feature_store.feature_processor.to_pipeline
 
@@ -196,3 +201,12 @@ Feature Processor Scheduler
 .. automethod:: sagemaker.feature_store.feature_processor.describe
 
 .. automethod:: sagemaker.feature_store.feature_processor.list_pipelines
+
+.. automethod:: sagemaker.feature_store.feature_processor.put_trigger
+
+.. automethod:: sagemaker.feature_store.feature_processor.enable_trigger
+
+.. automethod:: sagemaker.feature_store.feature_processor.disable_trigger
+
+.. automethod:: sagemaker.feature_store.feature_processor.delete_trigger
+
diff --git a/requirements/extras/test_requirements.txt b/requirements/extras/test_requirements.txt
index 0ccce9cb7a..8bd665d1df 100644
--- a/requirements/extras/test_requirements.txt
+++ b/requirements/extras/test_requirements.txt
@@ -12,7 +12,7 @@ awslogs==0.14.0
 black==22.3.0
 stopit==1.1.2
 # Update tox.ini to have correct version of airflow constraints file
-apache-airflow==2.8.1
+apache-airflow==2.8.2
 apache-airflow-providers-amazon==7.2.1
 attrs>=23.1.0,<24
 fabric==2.6.0
diff --git a/src/sagemaker/config/config.py b/src/sagemaker/config/config.py
index fa30b05a0e..23b3957905 100644
--- a/src/sagemaker/config/config.py
+++ b/src/sagemaker/config/config.py
@@ -181,7 +181,9 @@ def _load_config_from_file(file_path: str) -> dict:
             f"Provide a valid file path"
         )
     logger.debug("Fetching defaults config from location: %s", file_path)
-    return yaml.safe_load(open(inferred_file_path, "r"))
+    with open(inferred_file_path, "r") as f:
+        content = yaml.safe_load(f)
+    return content
 
 
 def _load_config_from_s3(s3_uri, s3_resource_for_config) -> dict:
diff --git a/src/sagemaker/jumpstart/artifacts/resource_requirements.py b/src/sagemaker/jumpstart/artifacts/resource_requirements.py
index 8356d1efac..6ee4f31c56 100644
--- a/src/sagemaker/jumpstart/artifacts/resource_requirements.py
+++ b/src/sagemaker/jumpstart/artifacts/resource_requirements.py
@@ -13,7 +13,7 @@
 """This module contains functions for obtaining JumpStart resoure requirements."""
 from __future__ import absolute_import
 
-from typing import Optional
+from typing import Dict, Optional, Tuple
 
 from sagemaker.jumpstart.constants import (
     DEFAULT_JUMPSTART_SAGEMAKER_SESSION,
@@ -28,6 +28,20 @@
 from sagemaker.session import Session
 from sagemaker.compute_resource_requirements.resource_requirements import ResourceRequirements
 
+REQUIREMENT_TYPE_TO_SPEC_FIELD_NAME_TO_RESOURCE_REQUIREMENT_NAME_MAP: Dict[
+    str, Dict[str, Tuple[str, str]]
+] = {
+    "requests": {
+        "num_accelerators": ("num_accelerators", "num_accelerators"),
+        "num_cpus": ("num_cpus", "num_cpus"),
+        "copies": ("copies", "copy_count"),
+        "min_memory_mb": ("memory", "min_memory"),
+    },
+    "limits": {
+        "max_memory_mb": ("memory", "max_memory"),
+    },
+}
+
 
 def _retrieve_default_resources(
     model_id: str,
@@ -37,6 +51,7 @@ def _retrieve_default_resources(
     tolerate_vulnerable_model: bool = False,
     tolerate_deprecated_model: bool = False,
     sagemaker_session: Session = DEFAULT_JUMPSTART_SAGEMAKER_SESSION,
+    instance_type: Optional[str] = None,
 ) -> ResourceRequirements:
     """Retrieves the default resource requirements for the model.
 
@@ -60,6 +75,8 @@ def _retrieve_default_resources(
             object, used for SageMaker interactions. If not specified, one is created using
             the default AWS configuration chain.
             (Default: sagemaker.jumpstart.constants.DEFAULT_JUMPSTART_SAGEMAKER_SESSION).
+        instance_type (str): An instance type to optionally supply in order to get
+            host requirements specific to the instance type.
 
     Returns:
         str: The default resource requirements to use for the model or None.
@@ -87,23 +104,44 @@ def _retrieve_default_resources(
         is_dynamic_container_deployment_supported = (
             model_specs.dynamic_container_deployment_supported
         )
-        default_resource_requirements = model_specs.hosting_resource_requirements
+        default_resource_requirements: Dict[str, int] = (
+            model_specs.hosting_resource_requirements or {}
+        )
     else:
         raise NotImplementedError(
             f"Unsupported script scope for retrieving default resource requirements: '{scope}'"
         )
+    instance_specific_resource_requirements: Dict[str, int] = (
+        model_specs.hosting_instance_type_variants.get_instance_specific_resource_requirements(
+            instance_type
+        )
+        if instance_type
+        and getattr(model_specs, "hosting_instance_type_variants", None) is not None
+        else {}
+    )
+
+    default_resource_requirements = {
+        **default_resource_requirements,
+        **instance_specific_resource_requirements,
+    }
+
     if is_dynamic_container_deployment_supported:
-        requests = {}
-        if "num_accelerators" in default_resource_requirements:
-            requests["num_accelerators"] = default_resource_requirements["num_accelerators"]
-        if "min_memory_mb" in default_resource_requirements:
-            requests["memory"] = default_resource_requirements["min_memory_mb"]
-        if "num_cpus" in default_resource_requirements:
-            requests["num_cpus"] = default_resource_requirements["num_cpus"]
-
-        limits = {}
-        if "max_memory_mb" in default_resource_requirements:
-            limits["memory"] = default_resource_requirements["max_memory_mb"]
-        return ResourceRequirements(requests=requests, limits=limits)
+
+        all_resource_requirement_kwargs = {}
+
+        for (
+            requirement_type,
+            spec_field_to_resource_requirement_map,
+        ) in REQUIREMENT_TYPE_TO_SPEC_FIELD_NAME_TO_RESOURCE_REQUIREMENT_NAME_MAP.items():
+            requirement_kwargs = {}
+            for spec_field, resource_requirement in spec_field_to_resource_requirement_map.items():
+                if spec_field in default_resource_requirements:
+                    requirement_kwargs[resource_requirement[0]] = default_resource_requirements[
+                        spec_field
+                    ]
+
+            all_resource_requirement_kwargs[requirement_type] = requirement_kwargs
+
+        return ResourceRequirements(**all_resource_requirement_kwargs)
 
     return None
diff --git a/src/sagemaker/jumpstart/factory/model.py b/src/sagemaker/jumpstart/factory/model.py
index 64e4727116..1b41cad714 100644
--- a/src/sagemaker/jumpstart/factory/model.py
+++ b/src/sagemaker/jumpstart/factory/model.py
@@ -481,6 +481,7 @@ def _add_resources_to_kwargs(kwargs: JumpStartModelInitKwargs) -> JumpStartModel
             tolerate_deprecated_model=kwargs.tolerate_deprecated_model,
             tolerate_vulnerable_model=kwargs.tolerate_vulnerable_model,
             sagemaker_session=kwargs.sagemaker_session,
+            instance_type=kwargs.instance_type,
         )
 
     return kwargs
diff --git a/src/sagemaker/jumpstart/types.py b/src/sagemaker/jumpstart/types.py
index 49d3e295c5..810d1c4cd3 100644
--- a/src/sagemaker/jumpstart/types.py
+++ b/src/sagemaker/jumpstart/types.py
@@ -478,6 +478,29 @@ def get_instance_specific_artifact_key(self, instance_type: str) -> Optional[str
             instance_type=instance_type, property_name="artifact_key"
         )
 
+    def get_instance_specific_resource_requirements(self, instance_type: str) -> Dict[str, Any]:
+        """Returns instance specific resource requirements.
+
+        If a value exists for both the instance family and instance type, the instance type value
+        is chosen.
+        """
+
+        instance_specific_resource_requirements: dict = (
+            self.variants.get(instance_type, {})
+            .get("properties", {})
+            .get("resource_requirements", {})
+        )
+
+        instance_type_family = get_instance_type_family(instance_type)
+
+        instance_family_resource_requirements: dict = (
+            self.variants.get(instance_type_family, {})
+            .get("properties", {})
+            .get("resource_requirements", {})
+        )
+
+        return {**instance_family_resource_requirements, **instance_specific_resource_requirements}
+
     def _get_instance_specific_property(
         self, instance_type: str, property_name: str
     ) -> Optional[str]:
diff --git a/src/sagemaker/local/image.py b/src/sagemaker/local/image.py
index 7893ee9260..39c879ef6d 100644
--- a/src/sagemaker/local/image.py
+++ b/src/sagemaker/local/image.py
@@ -860,7 +860,9 @@ def _create_docker_host(
         # to setting --runtime=nvidia in the docker commandline.
         if self.instance_type == "local_gpu":
             host_config["deploy"] = {
-                "resources": {"reservations": {"devices": [{"capabilities": ["gpu"]}]}}
+                "resources": {
+                    "reservations": {"devices": [{"count": "all", "capabilities": ["gpu"]}]}
+                }
             }
 
         if not self.is_studio and command == "serve":
diff --git a/src/sagemaker/remote_function/client.py b/src/sagemaker/remote_function/client.py
index 49091fc60c..0dc69d8647 100644
--- a/src/sagemaker/remote_function/client.py
+++ b/src/sagemaker/remote_function/client.py
@@ -694,11 +694,6 @@ def __init__(
             encrypt_inter_container_traffic (bool): A flag that specifies whether traffic between
                 training containers is encrypted for the training job. Defaults to ``False``.
 
-            enable_network_isolation (bool): A flag that specifies whether container will run in
-                network isolation mode. Defaults to ``False``. Network isolation mode restricts the
-                container access to outside networks (such as the Internet). The container does not
-                make any inbound or outbound network calls. Also known as Internet-free mode.
-
             spark_config (SparkConfig): Configurations to the Spark application that runs on
                 Spark image. If ``spark_config`` is specified, a SageMaker Spark image uri
                 will be used for training. Note that ``image_uri`` can not be specified at the
diff --git a/src/sagemaker/resource_requirements.py b/src/sagemaker/resource_requirements.py
index 446d034bf3..93b2833a35 100644
--- a/src/sagemaker/resource_requirements.py
+++ b/src/sagemaker/resource_requirements.py
@@ -16,6 +16,7 @@
 import logging
 from typing import Optional
 
+from sagemaker.compute_resource_requirements.resource_requirements import ResourceRequirements
 from sagemaker.jumpstart import utils as jumpstart_utils
 from sagemaker.jumpstart import artifacts
 
@@ -33,7 +34,8 @@ def retrieve_default(
     tolerate_vulnerable_model: bool = False,
     tolerate_deprecated_model: bool = False,
     sagemaker_session: Session = DEFAULT_JUMPSTART_SAGEMAKER_SESSION,
-) -> str:
+    instance_type: Optional[str] = None,
+) -> ResourceRequirements:
     """Retrieves the default resource requirements for the model matching the given arguments.
 
     Args:
@@ -56,6 +58,8 @@ def retrieve_default(
             object, used for SageMaker interactions. If not specified, one is created using
             the default AWS configuration chain.
             (Default: sagemaker.jumpstart.constants.DEFAULT_JUMPSTART_SAGEMAKER_SESSION).
+        instance_type (str): An instance type to optionally supply in order to get
+            host requirements specific to the instance type.
 
     Returns:
         str: The default resource requirements to use for the model.
@@ -79,4 +83,5 @@ def retrieve_default(
         tolerate_vulnerable_model,
         tolerate_deprecated_model,
         sagemaker_session=sagemaker_session,
+        instance_type=instance_type,
     )
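
Note: a minimal usage sketch of the new instance_type argument on sagemaker.resource_requirements.retrieve_default, mirroring the call signature exercised by the unit tests below. The model id and instance type are placeholders for illustration only, not values taken from this change:

    from sagemaker import resource_requirements

    # Placeholder model id and instance type; retrieve_default now returns a
    # ResourceRequirements object rather than a string.
    requirements = resource_requirements.retrieve_default(
        model_id="example-jumpstart-model-id",  # hypothetical id, for illustration
        model_version="*",
        scope="inference",
        instance_type="ml.g5.xlarge",
    )
    print(requirements.requests, requirements.limits)

When the supplied instance type (or its instance family) defines resource_requirements overrides in the hosting instance type variants, those values take precedence over the model-level hosting_resource_requirements defaults.
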
diff --git a/tests/unit/sagemaker/config/test_config.py b/tests/unit/sagemaker/config/test_config.py
index 9e89a5890b..35135db81e 100644
--- a/tests/unit/sagemaker/config/test_config.py
+++ b/tests/unit/sagemaker/config/test_config.py
@@ -34,7 +34,9 @@
 @pytest.fixture()
 def config_file_as_yaml(get_data_dir):
     config_file_path = os.path.join(get_data_dir, "config.yaml")
-    return open(config_file_path, "r").read()
+    with open(config_file_path, "r") as f:
+        content = f.read()
+    return content
 
 
 @pytest.fixture()
@@ -42,7 +44,13 @@ def expected_merged_config(get_data_dir):
     expected_merged_config_file_path = os.path.join(
         get_data_dir, "expected_output_config_after_merge.yaml"
     )
-    return yaml.safe_load(open(expected_merged_config_file_path, "r").read())
+    with open(expected_merged_config_file_path, "r") as f:
+        content = yaml.safe_load(f.read())
+    return content
+
+
+def _raise_valueerror(*args):
+    raise ValueError(args)
 
 
 def test_config_when_default_config_file_and_user_config_file_is_not_found():
@@ -60,7 +68,8 @@ def test_config_when_overriden_default_config_file_is_not_found(get_data_dir):
 def test_invalid_config_file_which_has_python_code(get_data_dir):
     invalid_config_file_path = os.path.join(get_data_dir, "config_file_with_code.yaml")
     # no exceptions will be thrown with yaml.unsafe_load
-    yaml.unsafe_load(open(invalid_config_file_path, "r"))
+    with open(invalid_config_file_path, "r") as f:
+        yaml.unsafe_load(f)
     # PyYAML will throw exceptions for yaml.safe_load. SageMaker Config is using
     # yaml.safe_load internally
     with pytest.raises(ConstructorError) as exception_info:
@@ -228,7 +237,8 @@ def test_merge_of_s3_default_config_file_and_regular_config_file(
     get_data_dir, expected_merged_config, s3_resource_mock
 ):
     config_file_content_path = os.path.join(get_data_dir, "sample_config_for_merge.yaml")
-    config_file_as_yaml = open(config_file_content_path, "r").read()
+    with open(config_file_content_path, "r") as f:
+        config_file_as_yaml = f.read()
     config_file_bucket = "config-file-bucket"
     config_file_s3_prefix = "config/config.yaml"
     config_file_s3_uri = "s3://{}/{}".format(config_file_bucket, config_file_s3_prefix)
@@ -440,8 +450,11 @@ def test_load_local_mode_config(mock_load_config):
     mock_load_config.assert_called_with(_DEFAULT_LOCAL_MODE_CONFIG_FILE_PATH)
 
 
-def test_load_local_mode_config_when_config_file_is_not_found():
+@patch("sagemaker.config.config._load_config_from_file", side_effect=_raise_valueerror)
+def test_load_local_mode_config_when_config_file_is_not_found(mock_load_config):
+    # Patch is needed because one might actually have a local config file
     assert load_local_mode_config() is None
+    mock_load_config.assert_called_with(_DEFAULT_LOCAL_MODE_CONFIG_FILE_PATH)
 
 
 @pytest.mark.parametrize(
diff --git a/tests/unit/sagemaker/jumpstart/constants.py b/tests/unit/sagemaker/jumpstart/constants.py
index a3c4c747f7..605253466a 100644
--- a/tests/unit/sagemaker/jumpstart/constants.py
+++ b/tests/unit/sagemaker/jumpstart/constants.py
@@ -840,8 +840,22 @@
                     "model_package_arn": "$gpu_model_package_arn",
                 }
             },
+            "g5": {
+                "properties": {
+                    "resource_requirements": {
+                        "num_accelerators": 888810,
+                        "randon-field-2": 2222,
+                    }
+                }
+            },
             "m2": {"regional_properties": {"image_uri": "$cpu_image_uri"}},
             "c2": {"regional_properties": {"image_uri": "$cpu_image_uri"}},
+            "ml.g5.xlarge": {
+                "properties": {
+                    "environment_variables": {"TENSOR_PARALLEL_DEGREE": "8"},
+                    "resource_requirements": {"num_accelerators": 10},
+                }
+            },
             "ml.g5.48xlarge": {
                 "properties": {"environment_variables": {"TENSOR_PARALLEL_DEGREE": "8"}}
             },
@@ -857,6 +871,12 @@
             "framework_version": "1.5.0",
             "py_version": "py3",
         },
+        "dynamic_container_deployment_supported": True,
+        "hosting_resource_requirements": {
+            "min_memory_mb": 81999,
+            "num_accelerators": 1,
+            "random_field_1": 1,
+        },
         "hosting_artifact_key": "pytorch-infer/infer-pytorch-ic-mobilenet-v2.tar.gz",
         "training_artifact_key": "pytorch-training/train-pytorch-ic-mobilenet-v2.tar.gz",
         "hosting_script_key": "source-directory-tarballs/pytorch/inference/ic/v1.0.0/sourcedir.tar.gz",
diff --git a/tests/unit/sagemaker/jumpstart/test_types.py b/tests/unit/sagemaker/jumpstart/test_types.py
index 82e69e1d89..a9daad934d 100644
--- a/tests/unit/sagemaker/jumpstart/test_types.py
+++ b/tests/unit/sagemaker/jumpstart/test_types.py
@@ -34,6 +34,7 @@
     "variants": {
         "ml.p2.12xlarge": {
             "properties": {
+                "resource_requirements": {"req1": 1, "req2": {"1": 2, "2": 3}, "req3": 9},
                 "environment_variables": {"TENSOR_PARALLEL_DEGREE": "4"},
                 "supported_inference_instance_types": ["ml.p5.xlarge"],
                 "default_inference_instance_type": "ml.p5.xlarge",
@@ -60,6 +61,11 @@
         "p2": {
             "regional_properties": {"image_uri": "$gpu_image_uri"},
             "properties": {
+                "resource_requirements": {
+                    "req2": {"2": 5, "9": 999},
+                    "req3": 999,
+                    "req4": "blah",
+                },
                 "supported_inference_instance_types": ["ml.p2.xlarge", "ml.p3.xlarge"],
                 "default_inference_instance_type": "ml.p2.xlarge",
                 "metrics": [
@@ -879,3 +885,20 @@ def test_jumpstart_training_artifact_key_instance_variants():
         )
         is None
     )
+
+
+def test_jumpstart_resource_requirements_instance_variants():
+    assert INSTANCE_TYPE_VARIANT.get_instance_specific_resource_requirements(
+        instance_type="ml.p2.xlarge"
+    ) == {"req2": {"2": 5, "9": 999}, "req3": 999, "req4": "blah"}
+
+    assert INSTANCE_TYPE_VARIANT.get_instance_specific_resource_requirements(
+        instance_type="ml.p2.12xlarge"
+    ) == {"req1": 1, "req2": {"1": 2, "2": 3}, "req3": 9, "req4": "blah"}
+
+    assert (
+        INSTANCE_TYPE_VARIANT.get_instance_specific_resource_requirements(
+            instance_type="ml.p99.12xlarge"
+        )
+        == {}
+    )
diff --git a/tests/unit/sagemaker/local/test_local_image.py b/tests/unit/sagemaker/local/test_local_image.py
index ebca91a9f9..08c55fa0b4 100644
--- a/tests/unit/sagemaker/local/test_local_image.py
+++ b/tests/unit/sagemaker/local/test_local_image.py
@@ -871,7 +871,7 @@ def test_container_has_gpu_support(tmpdir, sagemaker_session):
     docker_host = sagemaker_container._create_docker_host("host-1", {}, set(), "train", [])
     assert "deploy" in docker_host
     assert docker_host["deploy"] == {
-        "resources": {"reservations": {"devices": [{"capabilities": ["gpu"]}]}}
+        "resources": {"reservations": {"devices": [{"count": "all", "capabilities": ["gpu"]}]}}
     }
 
 
diff --git a/tests/unit/sagemaker/resource_requirements/jumpstart/test_resource_requirements.py b/tests/unit/sagemaker/resource_requirements/jumpstart/test_resource_requirements.py
index 28b53270f8..b0cef0e3d4 100644
--- a/tests/unit/sagemaker/resource_requirements/jumpstart/test_resource_requirements.py
+++ b/tests/unit/sagemaker/resource_requirements/jumpstart/test_resource_requirements.py
@@ -18,6 +18,10 @@
 import pytest
 
 from sagemaker import resource_requirements
+from sagemaker.compute_resource_requirements.resource_requirements import ResourceRequirements
+from sagemaker.jumpstart.artifacts.resource_requirements import (
+    REQUIREMENT_TYPE_TO_SPEC_FIELD_NAME_TO_RESOURCE_REQUIREMENT_NAME_MAP,
+)
 from tests.unit.sagemaker.jumpstart.utils import get_spec_from_base_spec, get_special_model_spec
 
 
@@ -50,6 +54,55 @@ def test_jumpstart_resource_requirements(patched_get_model_specs):
     patched_get_model_specs.reset_mock()
 
 
+@patch("sagemaker.jumpstart.accessors.JumpStartModelsAccessor.get_model_specs")
+def test_jumpstart_resource_requirements_instance_type_variants(patched_get_model_specs):
+
+    patched_get_model_specs.side_effect = get_special_model_spec
+    region = "us-west-2"
+    mock_client = boto3.client("s3")
+    mock_session = Mock(s3_client=mock_client)
+
+    model_id, model_version = "variant-model", "*"
+    default_inference_resource_requirements = resource_requirements.retrieve_default(
+        region=region,
+        model_id=model_id,
+        model_version=model_version,
+        scope="inference",
+        sagemaker_session=mock_session,
+        instance_type="ml.g5.xlarge",
+    )
+    assert default_inference_resource_requirements.requests == {
+        "memory": 81999,
+        "num_accelerators": 10,
+    }
+
+    default_inference_resource_requirements = resource_requirements.retrieve_default(
+        region=region,
+        model_id=model_id,
+        model_version=model_version,
+        scope="inference",
+        sagemaker_session=mock_session,
+        instance_type="ml.g5.555xlarge",
+    )
+    assert default_inference_resource_requirements.requests == {
+        "memory": 81999,
+        "num_accelerators": 888810,
+    }
+
+    default_inference_resource_requirements = resource_requirements.retrieve_default(
+        region=region,
+        model_id=model_id,
+        model_version=model_version,
+        scope="inference",
+        sagemaker_session=mock_session,
+        instance_type="ml.f9.555xlarge",
+    )
+    assert default_inference_resource_requirements.requests == {
+        "memory": 81999,
+        "num_accelerators": 1,
+    }
+
+
 @patch("sagemaker.jumpstart.accessors.JumpStartModelsAccessor.get_model_specs")
 def test_jumpstart_no_supported_resource_requirements(patched_get_model_specs):
     patched_get_model_specs.side_effect = get_special_model_spec
@@ -80,3 +133,18 @@ def test_jumpstart_no_supported_resource_requirements(patched_get_model_specs):
         resource_requirements.retrieve_default(
             region=region, model_id=model_id, model_version=model_version, scope="training"
         )
+
+
+def test_jumpstart_supports_all_resource_requirement_fields():
+
+    all_tracked_resource_requirement_fields = {
+        field
+        for requirements in REQUIREMENT_TYPE_TO_SPEC_FIELD_NAME_TO_RESOURCE_REQUIREMENT_NAME_MAP.values()
+        for _, field in requirements.values()
+    }
+
+    excluded_resource_requirement_fields = {"requests", "limits"}
+    assert (
+        set(ResourceRequirements().__dict__.keys()) - excluded_resource_requirement_fields
+        == all_tracked_resource_requirement_fields
+    )
diff --git a/tox.ini b/tox.ini
index 66e546372b..d990467b3b 100644
--- a/tox.ini
+++ b/tox.ini
@@ -4,7 +4,7 @@
 # and then run "tox" from this directory.
 
 [tox]
-envlist = black-format,flake8,pylint,docstyle,sphinx,doc8,twine,py38,py39,py310
+envlist = black-format,flake8,pylint,docstyle,sphinx,doc8,twine,py38,py39,py310,py311
 skip_missing_interpreters = False
 
@@ -81,15 +81,16 @@ passenv =
 # Can be used to specify which tests to run, e.g.: tox -- -s
 commands =
     python -c "import os; os.system('install-custom-pkgs --install-boto-wheels')"
-    pip install 'apache-airflow==2.8.1' --constraint "https://raw.githubusercontent.com/apache/airflow/constraints-2.8.1/constraints-3.8.txt"
+    pip install 'apache-airflow==2.8.2' --constraint "https://raw.githubusercontent.com/apache/airflow/constraints-2.8.2/constraints-3.8.txt"
     pip install 'torch==2.0.1+cpu' -f 'https://download.pytorch.org/whl/torch_stable.html'
    pip install 'torchvision==0.15.2+cpu' -f 'https://download.pytorch.org/whl/torch_stable.html'
+    pip install 'dill>=0.3.8'
 
     pytest --cov=sagemaker --cov-append {posargs}
-    {env:IGNORE_COVERAGE:} coverage report -i --fail-under=86
+{env:IGNORE_COVERAGE:} coverage report -i --fail-under=86
 deps = .[test]
 depends =
-    {py38,py39,py310}: clean
+    {py38,py39,py310,py311}: clean
 
 [testenv:flake8]
 skipdist = true
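
Note: in REQUIREMENT_TYPE_TO_SPEC_FIELD_NAME_TO_RESOURCE_REQUIREMENT_NAME_MAP, each tuple pairs the key used when building the requests/limits dictionaries (first element) with the ResourceRequirements attribute name that test_jumpstart_supports_all_resource_requirement_fields compares against (second element). A standalone sketch of the translation performed by the new loop in _retrieve_default_resources; the spec values below are made up for illustration:

    from sagemaker.compute_resource_requirements.resource_requirements import ResourceRequirements

    # Made-up merged spec fields (base hosting requirements plus any instance-specific overrides).
    spec_fields = {"min_memory_mb": 81999, "num_accelerators": 1, "max_memory_mb": 163998}

    requirement_map = {
        "requests": {
            "num_accelerators": ("num_accelerators", "num_accelerators"),
            "num_cpus": ("num_cpus", "num_cpus"),
            "copies": ("copies", "copy_count"),
            "min_memory_mb": ("memory", "min_memory"),
        },
        "limits": {"max_memory_mb": ("memory", "max_memory")},
    }

    # Build the requests/limits keyword arguments exactly as the new loop does.
    kwargs = {
        requirement_type: {
            target_key: spec_fields[spec_field]
            for spec_field, (target_key, _) in field_map.items()
            if spec_field in spec_fields
        }
        for requirement_type, field_map in requirement_map.items()
    }
    # kwargs == {"requests": {"num_accelerators": 1, "memory": 81999}, "limits": {"memory": 163998}}
    resources = ResourceRequirements(**kwargs)
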