diff --git a/src/sagemaker/local/image.py b/src/sagemaker/local/image.py index 60f28d3b0c..f38bc1fbe5 100644 --- a/src/sagemaker/local/image.py +++ b/src/sagemaker/local/image.py @@ -42,6 +42,7 @@ import sagemaker.utils CONTAINER_PREFIX = "algo" +STUDIO_HOST_NAME = "sagemaker-local" DOCKER_COMPOSE_FILENAME = "docker-compose.yaml" DOCKER_COMPOSE_HTTP_TIMEOUT_ENV = "COMPOSE_HTTP_TIMEOUT" DOCKER_COMPOSE_HTTP_TIMEOUT = "120" @@ -50,6 +51,7 @@ REGION_ENV_NAME = "AWS_REGION" TRAINING_JOB_NAME_ENV_NAME = "TRAINING_JOB_NAME" S3_ENDPOINT_URL_ENV_NAME = "S3_ENDPOINT_URL" +SM_STUDIO_LOCAL_MODE = "SM_STUDIO_LOCAL_MODE" # SELinux Enabled SELINUX_ENABLED = os.environ.get("SAGEMAKER_LOCAL_SELINUX_ENABLED", "False").lower() in [ @@ -107,10 +109,30 @@ def __init__( # Since we are using a single docker network, Generate a random suffix to attach to the # container names. This way multiple jobs can run in parallel. suffix = "".join(random.choice(string.ascii_lowercase + string.digits) for _ in range(5)) - self.hosts = [ - "{}-{}-{}".format(CONTAINER_PREFIX, i, suffix) - for i in range(1, self.instance_count + 1) - ] + self.is_studio = sagemaker.local.utils.check_for_studio() + if self.is_studio: + if self.instance_count > 1: + raise NotImplementedError( + "Multi instance Local Mode execution is " + "currently not supported in SageMaker Studio." 
+ ) + # For studio use-case, directories need to be created in `~/tmp`, rather than /tmp + home = os.path.expanduser("~") + root_dir = os.path.join(home, "tmp") + if not os.path.isdir(root_dir): + os.mkdir(root_dir) + if self.sagemaker_session.config: + self.sagemaker_session.config["local"]["container_root"] = root_dir + else: + self.sagemaker_session.config = {"local": {"container_root": root_dir}} + # Studio only supports single instance run + self.hosts = [STUDIO_HOST_NAME] + else: + self.hosts = [ + "{}-{}-{}".format(CONTAINER_PREFIX, i, suffix) + for i in range(1, self.instance_count + 1) + ] + self.container_root = None self.container = None @@ -201,22 +223,17 @@ def process( self._generate_compose_file( "process", additional_volumes=volumes, additional_env_vars=environment ) - compose_command = self._compose() if _ecr_login_if_needed(self.sagemaker_session.boto_session, self.image): _pull_image(self.image) + compose_command = self._compose() process = subprocess.Popen( compose_command, stdout=subprocess.PIPE, stderr=subprocess.STDOUT ) try: _stream_output(process) - except RuntimeError as e: - # _stream_output() doesn't have the command line. We will handle the exception - # which contains the exit code and append the command line to it. - msg = f"Failed to run: {compose_command}" - raise RuntimeError(msg) from e finally: # Uploading processing outputs back to Amazon S3. 
self._upload_processing_outputs(data_dir, processing_output_config) @@ -283,22 +300,17 @@ def train(self, input_data_config, output_data_config, hyperparameters, environm compose_data = self._generate_compose_file( "train", additional_volumes=volumes, additional_env_vars=training_env_vars ) - compose_command = self._compose() if _ecr_login_if_needed(self.sagemaker_session.boto_session, self.image): _pull_image(self.image) + compose_command = self._compose() process = subprocess.Popen( compose_command, stdout=subprocess.PIPE, stderr=subprocess.STDOUT ) try: _stream_output(process) - except RuntimeError as e: - # _stream_output() doesn't have the command line. We will handle the exception - # which contains the exit code and append the command line to it. - msg = "Failed to run: %s, %s" % (compose_command, str(e)) - raise RuntimeError(msg) finally: artifacts = self.retrieve_artifacts(compose_data, output_data_config, job_name) @@ -347,6 +359,7 @@ def serve(self, model_dir, environment): self._generate_compose_file( "serve", additional_env_vars=environment, additional_volumes=volumes ) + compose_command = self._compose() self.container = _HostingContainer(compose_command) @@ -710,6 +723,9 @@ def _generate_compose_file(self, command, additional_volumes=None, additional_en additional_env_var_list = ["{}={}".format(k, v) for k, v in additional_env_vars.items()] environment.extend(additional_env_var_list) + if self.is_studio: + environment.extend([f"{SM_STUDIO_LOCAL_MODE}=True"]) + if os.environ.get(DOCKER_COMPOSE_HTTP_TIMEOUT_ENV) is None: os.environ[DOCKER_COMPOSE_HTTP_TIMEOUT_ENV] = DOCKER_COMPOSE_HTTP_TIMEOUT @@ -723,12 +739,19 @@ def _generate_compose_file(self, command, additional_volumes=None, additional_en for h in self.hosts } - content = { - # Use version 2.3 as a minimum so that we can specify the runtime - "version": "2.3", - "services": services, - "networks": {"sagemaker-local": {"name": "sagemaker-local"}}, - } + if self.is_studio: + content = { + # Use 
version 2.3 as a minimum so that we can specify the runtime + "version": "2.3", + "services": services, + } + else: + content = { + # Use version 2.3 as a minimum so that we can specify the runtime + "version": "2.3", + "services": services, + "networks": {"sagemaker-local": {"name": "sagemaker-local"}}, + } docker_compose_path = os.path.join(self.container_root, DOCKER_COMPOSE_FILENAME) @@ -810,7 +833,6 @@ def _create_docker_host( "tty": True, "volumes": [v.map for v in optml_volumes], "environment": environment, - "networks": {"sagemaker-local": {"aliases": [host]}}, } is_train_with_entrypoint = False @@ -827,6 +849,11 @@ def _create_docker_host( if self.container_arguments: host_config["entrypoint"] = host_config["entrypoint"] + self.container_arguments + if self.is_studio: + host_config["network_mode"] = "sagemaker" + else: + host_config["networks"] = {"sagemaker-local": {"aliases": [host]}} + # for GPU support pass in nvidia as the runtime, this is equivalent # to setting --runtime=nvidia in the docker commandline. if self.instance_type == "local_gpu": @@ -834,7 +861,7 @@ def _create_docker_host( "resources": {"reservations": {"devices": [{"capabilities": ["gpu"]}]}} } - if command == "serve": + if not self.is_studio and command == "serve": serving_port = ( sagemaker.utils.get_config_value( "local.serving_port", self.sagemaker_session.config ) @@ -910,7 +937,7 @@ def __init__(self, command): """Creates a new threaded hosting container. Args: - command: + command (list): docker compose command """ Thread.__init__(self) self.command = command @@ -987,8 +1014,8 @@ def _stream_output(process): sys.stdout.write(stdout) exit_code = process.poll() - if exit_code != 0: - raise RuntimeError("Process exited with code: %s" % exit_code) + if exit_code not in [0, 130]: + raise RuntimeError(f"Failed to run: {process.args}. 
Process exited with code: {exit_code}") return exit_code diff --git a/src/sagemaker/local/utils.py b/src/sagemaker/local/utils.py index 16375de7d4..950d0974db 100644 --- a/src/sagemaker/local/utils.py +++ b/src/sagemaker/local/utils.py @@ -28,6 +28,8 @@ logger = logging.getLogger(__name__) +STUDIO_APP_TYPES = ["KernelGateway", "CodeEditor", "JupyterLab"] + def copy_directory_structure(destination_directory, relative_path): """Creates intermediate directory structure for relative_path. @@ -216,3 +218,29 @@ def get_using_dot_notation(dictionary, keys): return get_using_dot_notation(inner_dict, rest) except (KeyError, IndexError, TypeError): raise ValueError(f"{keys} does not exist in input dictionary.") + + +def check_for_studio(): + """Helper function to determine if the run environment is studio. + + Returns (bool): Returns True if valid Studio request. + + Raises: + NotImplementedError: + if run environment = Studio and AppType not in STUDIO_APP_TYPES + """ + is_studio = False + if os.path.exists("/opt/ml/metadata/resource-metadata.json"): + with open("/opt/ml/metadata/resource-metadata.json", "r") as handle: + metadata = json.load(handle) + app_type = metadata.get("AppType") + if app_type: + # check if the execution is triggered from a supported Studio app + if app_type in STUDIO_APP_TYPES: + is_studio = True + else: + raise NotImplementedError( + f"AppType {app_type} in Studio does not support Local Mode." 
+ ) + # if no apptype, case of classic notebooks + return is_studio diff --git a/tests/unit/sagemaker/local/test_local_image.py b/tests/unit/sagemaker/local/test_local_image.py index 22a565b306..ebca91a9f9 100644 --- a/tests/unit/sagemaker/local/test_local_image.py +++ b/tests/unit/sagemaker/local/test_local_image.py @@ -27,9 +27,10 @@ import pytest import yaml -from mock import patch, Mock, MagicMock +from mock import patch, Mock, MagicMock, mock_open, call import sagemaker from sagemaker.local.image import _SageMakerContainer, _Volume, _aws_credentials +from sagemaker.local.utils import STUDIO_APP_TYPES REGION = "us-west-2" BUCKET_NAME = "mybucket" @@ -75,6 +76,22 @@ ENVIRONMENT = {"MYVAR": "HELLO_WORLD"} +LOCAL_STUDIO_METADATA_BASE = '{{"AppType":"{app_type}","DomainId":"d-1234567890","UserProfileName": \ + "dummy-profile","ResourceArn":"arn:aws:sagemaker:us-west-2:123456789012:app/arn", \ + "ResourceName":"datascience-1-0-ml-t3-medium-1234567890"}}' + +LOCAL_STUDIO_METADATA_WITH_SPACE = '{"AppType":"KernelGateway","DomainId":"d-1234567890","SpaceName": \ + "dummy-space","ResourceArn":"arn:aws:sagemaker:us-west-2:123456789012:app/arn", \ + "ResourceName":"datascience-1-0-ml-t3-medium-1234567890"}' + +DUMMY_APPTYPE_METADATA = '{"AppType":"DUMMY"}' + +LOCAL_STUDIO_INCOMPLETE_METADATA = '{"AppType":"KernelGateway"}' + +CLASSIC_METADATA = '{"ResourceArn": \ + "arn:aws:sagemaker:us-west-2:616250812882:notebook-instance/test", \ + "ResourceName": "test"}' + @pytest.fixture() def sagemaker_session(): @@ -90,6 +107,49 @@ def sagemaker_session(): return sms +@patch("os.path.exists", return_value=True) +def test_check_for_studio(patch_os_exists, sagemaker_session): + for app_type in STUDIO_APP_TYPES: + metadata = LOCAL_STUDIO_METADATA_BASE.format(app_type=app_type) + print(metadata) + with patch("sagemaker.local.utils.open", mock_open(read_data=metadata)): + with pytest.raises( + NotImplementedError, + match="Multi instance Local Mode execution is currently not supported 
in SageMaker Studio.", + ): + _SageMakerContainer("local", 2, "my-image", sagemaker_session=sagemaker_session) + + sagemaker_container = _SageMakerContainer( + "local", 1, "my-image", sagemaker_session=sagemaker_session + ) + assert sagemaker_container.is_studio + + with patch("sagemaker.local.utils.open", mock_open(read_data=LOCAL_STUDIO_METADATA_WITH_SPACE)): + with pytest.raises( + NotImplementedError, + match="Multi instance Local Mode execution is currently not supported in SageMaker Studio.", + ): + _SageMakerContainer("local", 2, "my-image", sagemaker_session=sagemaker_session) + + sagemaker_container = _SageMakerContainer( + "local", 1, "my-image", sagemaker_session=sagemaker_session + ) + assert sagemaker_container.is_studio + + with patch("sagemaker.local.utils.open", mock_open(read_data=CLASSIC_METADATA)): + sagemaker_container = _SageMakerContainer( + "local", 1, "my-image", sagemaker_session=sagemaker_session + ) + assert not sagemaker_container.is_studio + + with patch("sagemaker.local.utils.open", mock_open(read_data=DUMMY_APPTYPE_METADATA)): + with pytest.raises( + NotImplementedError, + match="AppType DUMMY in Studio does not support Local Mode.", + ): + _SageMakerContainer("local", 2, "my-image", sagemaker_session=sagemaker_session) + + @patch("subprocess.check_output", Mock(return_value="Docker Compose version v2.0.0-rc.3")) def test_get_compose_cmd_prefix_with_docker_cli(): compose_cmd_prefix = _SageMakerContainer._get_compose_cmd_prefix() @@ -432,6 +492,87 @@ def test_train( assert "[Masked]" in caplog.text +@patch("sagemaker.local.local_session.LocalSession", Mock()) +@patch("sagemaker.local.image._stream_output", Mock()) +@patch("sagemaker.local.image._SageMakerContainer._cleanup") +@patch("sagemaker.local.image._SageMakerContainer.retrieve_artifacts") +@patch( + "sagemaker.local.image._SageMakerContainer._get_compose_cmd_prefix", + Mock(return_value=["docker-compose"]), +) +@patch("sagemaker.local.data.get_data_source_instance") 
+@patch("subprocess.Popen") +def test_train_for_studio( + popen, get_data_source_instance, retrieve_artifacts, cleanup, tmpdir, sagemaker_session, caplog +): + data_source = Mock() + data_source.get_root_dir.return_value = "foo" + get_data_source_instance.return_value = data_source + + caplog.set_level(logging.INFO) + + directories = [str(tmpdir.mkdir("container-root")), str(tmpdir.mkdir("data"))] + with patch( + "sagemaker.local.image._SageMakerContainer._create_tmp_folder", side_effect=directories + ): + instance_count = 1 + image = "my-image" + metadata = LOCAL_STUDIO_METADATA_BASE.format(app_type="KernelGateway") + with patch("sagemaker.local.utils.open", mock_open(read_data=metadata)): + with patch("os.path.exists", return_value=True): + sagemaker_container = _SageMakerContainer( + "local", instance_count, image, sagemaker_session=sagemaker_session + ) + + sagemaker_container.train( + INPUT_DATA_CONFIG, + OUTPUT_DATA_CONFIG, + HYPERPARAMETERS, + ENVIRONMENT, + TRAINING_JOB_NAME, + ) + + docker_compose_file = os.path.join( + sagemaker_container.container_root, "docker-compose.yaml" + ) + + expected_up_cmd = [ + "docker-compose", + "-f", + docker_compose_file, + "up", + "--build", + "--abort-on-container-exit", + ] + + popen.assert_has_calls( + [ + call(expected_up_cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT), + ] + ) + + with open(docker_compose_file, "r") as f: + config = yaml.load(f, Loader=yaml.SafeLoader) + assert len(config["services"]) == instance_count + for h in sagemaker_container.hosts: + assert config["services"][h]["image"] == image + assert config["services"][h]["command"] == "train" + assert ( + "TRAINING_JOB_NAME={}".format(TRAINING_JOB_NAME) + in config["services"][h]["environment"] + ) + assert "SM_STUDIO_LOCAL_MODE=True" in config["services"][h]["environment"] + assert config["services"][h]["network_mode"] == "sagemaker" + + # assert that expected by sagemaker container output directories exist + assert 
os.path.exists(os.path.join(sagemaker_container.container_root, "output")) + assert os.path.exists(os.path.join(sagemaker_container.container_root, "output/data")) + + retrieve_artifacts.assert_called_once() + cleanup.assert_called_once() + assert "[Masked]" in caplog.text + + @patch("sagemaker.local.local_session.LocalSession", Mock()) @patch("sagemaker.local.image._stream_output", Mock()) @patch("sagemaker.local.image._SageMakerContainer._cleanup") @@ -563,7 +704,12 @@ def test_train_with_hyperparameters_without_job_name( @patch("sagemaker.local.data.get_data_source_instance") @patch("subprocess.Popen", Mock()) def test_train_error( - get_data_source_instance, retrieve_artifacts, cleanup, _stream_output, tmpdir, sagemaker_session + get_data_source_instance, + retrieve_artifacts, + cleanup, + _stream_output, + tmpdir, + sagemaker_session, ): data_source = Mock() data_source.get_root_dir.return_value = "foo" @@ -787,6 +933,63 @@ def test_serve(tmpdir, sagemaker_session, caplog): assert "[Masked]" in caplog.text +@patch("sagemaker.local.image._stream_output", Mock()) +@patch("sagemaker.local.image._SageMakerContainer._prepare_serving_volumes", Mock(return_value=[])) +@patch("shutil.copy", Mock()) +@patch("shutil.copytree", Mock()) +@patch( + "sagemaker.local.image._SageMakerContainer._get_compose_cmd_prefix", + Mock(return_value=["docker-compose"]), +) +@patch("subprocess.Popen") +def test_serve_for_studio(popen, tmpdir, sagemaker_session, caplog): + caplog.set_level(logging.INFO) + with patch( + "sagemaker.local.image._SageMakerContainer._create_tmp_folder", + return_value=str(tmpdir.mkdir("container-root")), + ): + instance_count = 1 + image = "my-image" + metadata = LOCAL_STUDIO_METADATA_BASE.format(app_type="KernelGateway") + with patch("sagemaker.local.utils.open", mock_open(read_data=metadata)): + with patch("os.path.exists", return_value=True): + sagemaker_container = _SageMakerContainer( + "local", instance_count, image, sagemaker_session=sagemaker_session + 
) + + environment = {"env1": 1, "env2": "b", "SAGEMAKER_SUBMIT_DIRECTORY": "s3://some/path"} + + sagemaker_container.serve("/some/model/path", environment) + docker_compose_file = os.path.join( + sagemaker_container.container_root, "docker-compose.yaml" + ) + + expected_up_cmd = [ + "docker-compose", + "-f", + docker_compose_file, + "up", + "--build", + "--abort-on-container-exit", + ] + + popen.assert_has_calls( + [ + call(expected_up_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE), + ] + ) + + with open(docker_compose_file, "r") as f: + config = yaml.load(f, Loader=yaml.SafeLoader) + + for h in sagemaker_container.hosts: + assert config["services"][h]["image"] == image + assert config["services"][h]["command"] == "serve" + assert "SM_STUDIO_LOCAL_MODE=True" in config["services"][h]["environment"] + assert config["services"][h]["network_mode"] == "sagemaker" + assert "[Masked]" in caplog.text + + @patch("sagemaker.local.image._HostingContainer.run", Mock()) @patch("sagemaker.local.image._SageMakerContainer._prepare_serving_volumes", Mock(return_value=[])) @patch("shutil.copy", Mock()) diff --git a/tests/unit/sagemaker/local/test_local_utils.py b/tests/unit/sagemaker/local/test_local_utils.py index 39b9e2b392..42710b2495 100644 --- a/tests/unit/sagemaker/local/test_local_utils.py +++ b/tests/unit/sagemaker/local/test_local_utils.py @@ -15,7 +15,8 @@ import os import errno import pytest -from mock import patch, Mock +import json +from mock import patch, Mock, mock_open import sagemaker.local.utils from sagemaker.session_settings import SessionSettings @@ -198,3 +199,27 @@ def test_move_to_destination_local_root_failure(recursive_copy, mock_rmtree): recursive_copy.assert_called_with( "/tmp/data", os.path.abspath(os.path.join(os.sep, "target", "dir")) ) + + +def test_check_for_studio_with_valid_request(): + metadata = {"AppType": "KernelGateway"} + with patch("builtins.open", mock_open(read_data=json.dumps(metadata))): + with patch("os.path.exists", 
return_value=True): + is_studio = sagemaker.local.utils.check_for_studio() + assert is_studio is True + + +def test_check_for_studio_with_invalid_request(): + metadata = {"AppType": "DUMMY"} + with patch("builtins.open", mock_open(read_data=json.dumps(metadata))): + with patch("os.path.exists", return_value=True): + with pytest.raises(NotImplementedError): + sagemaker.local.utils.check_for_studio() + + +def test_check_for_studio_without_app_type(): + metadata = {} + with patch("builtins.open", mock_open(read_data=json.dumps(metadata))): + with patch("os.path.exists", return_value=True): + is_studio = sagemaker.local.utils.check_for_studio() + assert is_studio is False