Skip to content

Commit

Permalink
sagemaker endpoint: enhance integ test coverage for failure conditions
Browse files Browse the repository at this point in the history
  • Loading branch information
surajkota committed Mar 25, 2021
1 parent 9d9f86b commit d3d9ed8
Show file tree
Hide file tree
Showing 14 changed files with 621 additions and 318 deletions.
29 changes: 28 additions & 1 deletion test/e2e/common/aws.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,4 +29,31 @@ def duplicate_s3_contents(source_bucket: object, destination_bucket: object):
destination_bucket.copy({
"Bucket": source_object.bucket_name,
"Key": source_object.key,
}, source_object.key)
}, source_object.key)

def copy_s3_object(bucket_name: str, copy_source: object, key: str):
"""
Copy an S3 object. Check the following API documentation for input format of the arguments
https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3.html#S3.Bucket.copy
"""
region = get_aws_region()
bucket = boto3.resource("s3", region_name=region).Bucket(bucket_name)
bucket.copy(copy_source, key)

def delete_s3_object(bucket_name: str, key: str):
"""
Delete an S3 object. Check the following API documentation for input format of the arguments
https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3.html#S3.Client.delete_objects
"""
region = get_aws_region()
bucket = boto3.resource("s3", region_name=region).Bucket(bucket_name)
bucket.delete_objects(
Delete={
"Objects": [
{
"Key": key,
},
],
"Quiet": False,
},
)
177 changes: 126 additions & 51 deletions test/e2e/common/k8s.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
from kubernetes.client.api_client import ApiClient
from kubernetes.client.rest import ApiException

from common.resources import load_resource_file

_k8s_api_client = None


Expand Down Expand Up @@ -53,6 +55,78 @@ def to_short_resource_string(self):
def to_long_resource_string(self):
return f"{self.plural}.{self.version}.{self.group}/{self._printable_namespace}:{self.name}"

def load_resource(service_name: str,
spec_file: str,
replacements: object):
"""
Load a yaml spec to memory from root_test_path/{service}/resources and replace the values in replacement dict
:param service_name: name of service
:param spec_file: Name of the spec file under resources directory of the service
:param replacements: A dictionary of values to be replaced
:return: spec as json object
"""
spec = load_resource_file(
service_name, spec_file, additional_replacements=replacements
)
logging.debug(f"loaded spec: {spec}")
return spec

def create_reference(crd_group: str,
crd_version: str,
resource_plural: str,
resource_name: str,
namespace: str):
"""
Create an instance of CustomResourceReference based on the parameters
:param crd_group: CRD Group
:param crd_version: CRD version
:param resource_plural: resource plural
:param resource_name: name of resource to be created in cluster
:param namespace: namespace in which resource should be created
:return: an instance of CustomResourceReference
"""
reference = CustomResourceReference(
crd_group, crd_version, resource_plural, resource_name, namespace=namespace
)
return reference

def create_resource(reference: CustomResourceReference,
spec: object):
"""
Create a resource from the reference and wait to be consumed by controller
:param reference: instance of CustomResourceReference which needs to be created
:param spec: spec of the resource corresponding to the reference
:return: resource if it was created successfully, otherwise None
"""
resource = create_custom_resource(reference, spec)
resource = wait_resource_consumed_by_controller(reference)
return resource

def load_and_create_resource(service_name: str,
crd_group: str,
crd_version: str,
resource_plural: str,
resource_name: str,
spec_file_name: str,
replacements: object,
namespace: str = "default"):
"""
Helper method to encapsulate the common methods used to create a resource.
Load a spec file from disk, create an instance of CustomResourceReference and resource in K8s cluster.
See respective methods for paramater definitions and return types
:returns: an instance of CustomResourceReference, spec loaded from disk, resource created from the reference
"""
spec = load_resource(service_name, spec_file_name, replacements)
reference = create_reference(crd_group, crd_version, resource_plural, resource_name, namespace)
resource = create_resource(reference, spec)
return reference, spec, resource

def _get_k8s_api_client() -> ApiClient:
global _k8s_api_client
Expand Down Expand Up @@ -167,22 +241,6 @@ def wait_resource_consumed_by_controller(
f"Wait for resource {reference} to be consumed by controller timed out")
return None

def _get_terminal_condition(resource: object) -> Union[None, bool]:
"""Get the .status.ACK.Terminal boolean from a given resource.
Returns:
None or bool: None if the status field doesn't exist, otherwise the
field value cast to a boolean (default False).
"""
if 'status' not in resource or 'conditions' not in resource['status']:
return None

conditions: Dict = resource['status']['conditions']
if 'ACK' not in conditions or 'Terminal' not in conditions['ACK']:
return None

terminal: Dict = conditions['ACK']['Terminal']
return bool(terminal.get('status', False))

def get_resource_arn(resource: object) -> Union[None, str]:
"""Get the .status.ackResourceMetadata.arn value from a given resource.
Expand Down Expand Up @@ -253,54 +311,71 @@ def wait_on_condition(reference: CustomResourceReference,
logging.error(f"Resource {reference} does not exist")
return False

desired_condition = None
for i in range(wait_periods):
logging.debug(f"Waiting on condition {condition_name} to reach {desired_condition_status} for resource {reference} ({i})")
resource = get_resource(reference)

if 'conditions' not in resource['status']:
logging.error(f"Resource {reference} does not have a .status.conditions field.")
return False

desired_condition = None
for condition in resource['status']['conditions']:
if condition['type'] == condition_name:
desired_condition = condition

if not desired_condition:
logging.error(f"Resource {reference} does not have a condition of type {condition_name}.")
return False
else:
if desired_condition['status'] == desired_condition_status:
logging.info(f"Condition {condition_name} has status {desired_condition_status}, continuing...")
return True
desired_condition = get_resource_condition(reference, condition_name)
if desired_condition is not None and desired_condition['status'] == desired_condition_status:
logging.info(f"Condition {condition_name} has status {desired_condition_status}, continuing...")
return True

sleep(period_length)

logging.error(f"Wait for condition {condition_name} to reach status {desired_condition_status} timed out")
if not desired_condition:
logging.error(f"Resource {reference} does not have a condition of type {condition_name}.")
else:
logging.error(f"Wait for condition {condition_name} to reach status {desired_condition_status} timed out")
return False

def is_resource_in_terminal_condition(
reference: CustomResourceReference, expected_substring: str):
def get_resource_condition(reference: CustomResourceReference, condition_name: str):
"""
Returns the required condition from .status.conditions
Precondition:
resource must exist in the cluster
Returns:
condition json if it exists. None otherwise
"""
if not get_resource_exists(reference):
logging.error(f"Resource {reference} does not exist")
return False
return None

resource = get_resource(reference)
terminal_status = _get_terminal_condition(resource)
# Ensure the status existed
if terminal_status is None:
logging.error(f"Expected .ACK.Terminal to exist in {reference}")
return False
if 'status' not in resource or 'conditions' not in resource['status']:
logging.error(f"Resource {reference} does not have a .status.conditions field.")
return None

if not terminal_status:
logging.error(
f"Expected terminal condition for resource {reference} to be true")
return False
for condition in resource['status']['conditions']:
if condition['type'] == condition_name:
return condition

return None

def assert_condition_state_message(reference: CustomResourceReference,
condition_name: str,
desired_condition_status: str,
desired_condition_message: Union[None, str]):
"""
Helper method to check the state and message of a condition on resource.
Caller can pass None for desired_condition_message if expected message is nil
terminal_message = terminal.get('message', None)
if terminal_message != expected_substring:
logging.error(f"Resource {reference} has terminal condition set True, but with a different message than expected."
f" Expected '{expected_substring}', found '{terminal_message}'")
Returns:
bool: True if condition exists and both the status and message match the desired values
"""
condition = get_resource_condition(reference, condition_name)
# Ensure the status existed
if condition is None:
logging.error(f"Resource {reference} does not have a condition of type {condition_name}")
return False

return True
current_condition_status = condition.get('status', None)
current_condition_message = condition.get('message', None)
if current_condition_status == desired_condition_status and current_condition_message == desired_condition_message:
logging.info(f"Condition {condition_name} has status {desired_condition_status} and message {desired_condition_message}, continuing...")
return True

logging.error(f"Resource {reference} has {condition_name} set {current_condition_status}, expected {desired_condition_status}; with message"
f" {current_condition_message}, expected {desired_condition_message}")
return False
31 changes: 27 additions & 4 deletions test/e2e/sagemaker/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,22 +4,45 @@
# not use this file except in compliance with the License. A copy of the
# License is located at
#
# http://aws.amazon.com/apache2.0/
# http://aws.amazon.com/apache2.0/
#
# or in the "license" file accompanying this file. This file is distributed
# on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
# express or implied. See the License for the specific language governing
# permissions and limitations under the License.

import pytest
import logging
from common import k8s

SERVICE_NAME = "sagemaker"
CRD_GROUP = "sagemaker.services.k8s.aws"
CRD_VERSION = "v1alpha1"

CONFIG_RESOURCE_PLURAL = 'endpointconfigs'
MODEL_RESOURCE_PLURAL = 'models'
ENDPOINT_RESOURCE_PLURAL = 'endpoints'
CONFIG_RESOURCE_PLURAL = "endpointconfigs"
MODEL_RESOURCE_PLURAL = "models"
ENDPOINT_RESOURCE_PLURAL = "endpoints"

# PyTest marker for the current service
service_marker = pytest.mark.service(arg=SERVICE_NAME)


def create_sagemaker_resource(
resource_plural, resource_name, spec_file, replacements, namespace="default"
):
"""
Wrapper around k8s.load_and_create_resource to create a SageMaker resource
"""

reference, spec, resource = k8s.load_and_create_resource(
SERVICE_NAME,
CRD_GROUP,
CRD_VERSION,
resource_plural,
resource_name,
spec_file,
replacements,
namespace,
)

return reference, spec, resource
18 changes: 18 additions & 0 deletions test/e2e/sagemaker/resources/endpoint_config_multi_variant.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
apiVersion: sagemaker.services.k8s.aws/v1alpha1
kind: EndpointConfig
metadata:
name: $CONFIG_NAME
spec:
endpointConfigName: $CONFIG_NAME
productionVariants:
- variantName: variant-1
modelName: $MODEL_NAME
initialInstanceCount: 1
# This is the smallest instance type which will support scaling
instanceType: ml.c5.large
initialVariantWeight: 1
- variantName: variant-2
modelName: $MODEL_NAME
initialInstanceCount: 1
instanceType: ml.c5.large
initialVariantWeight: 1
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@ spec:
productionVariants:
- variantName: variant-1
modelName: $MODEL_NAME
initialInstanceCount: 1
# instanceCount is 2 to test retainAllVariantProperties
initialInstanceCount: 2
# This is the smallest instance type which will support scaling
instanceType: ml.c5.large
initialVariantWeight: 1
Expand Down
8 changes: 4 additions & 4 deletions test/e2e/sagemaker/resources/xgboost_model.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ spec:
containerHostname: xgboost
modelDataURL: s3://$SAGEMAKER_DATA_BUCKET/sagemaker/model/xgboost-mnist-model.tar.gz
image: $XGBOOST_IMAGE_URI
executionRoleARN: $SAGEMAKER_EXECUTION_ROLE_ARN
tags:
- key: key
value: value
environment:
my_var: my_value
my_var2: my_value2
executionRoleARN: $SAGEMAKER_EXECUTION_ROLE_ARN
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
apiVersion: sagemaker.services.k8s.aws/v1alpha1
kind: Model
metadata:
name: $MODEL_NAME
spec:
modelName: $MODEL_NAME
containers:
- containerHostname: xgboost
modelDataURL: $MODEL_LOCATION
image: $XGBOOST_IMAGE_URI
imageConfig:
repositoryAccessMode: Platform
environment:
my_var: my_value
my_var2: my_value2
executionRoleARN: $SAGEMAKER_EXECUTION_ROLE_ARN
5 changes: 1 addition & 4 deletions test/e2e/sagemaker/resources/xgboost_trainingjob.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,4 @@ spec:
s3URI: s3://$SAGEMAKER_DATA_BUCKET/sagemaker/training/validation
s3DataDistributionType: FullyReplicated
contentType: text/csv
compressionType: None
tags:
- key: key
value: value
compressionType: None
Loading

0 comments on commit d3d9ed8

Please sign in to comment.