diff --git a/clipper_admin/clipper_manager.py b/clipper_admin/clipper_manager.py index d4cfc2386..cf0afefae 100644 --- a/clipper_admin/clipper_manager.py +++ b/clipper_admin/clipper_manager.py @@ -44,6 +44,9 @@ CLIPPER_DOCKER_LABEL = "ai.clipper.container.label" CLIPPER_MODEL_CONTAINER_LABEL = "ai.clipper.model_container.model_version" +DEFAULT_MODEL_VERSION = 1 +DEFAULT_DEFAULT_OUTPUT = "None" +DEFAULT_SLO_MICROS = 100000 DEFAULT_LABEL = ["DEFAULT"] aws_cli_config = """ @@ -321,7 +324,7 @@ def start(self): def register_application(self, name, model, input_type, default_output, slo_micros): - """Register a new Clipper application. + """Register a new Clipper application and returns the response object. Parameters ---------- @@ -345,6 +348,12 @@ def register_application(self, name, model, input_type, default_output, the objective not be set aggressively low unless absolutely necessary. 40000 (40ms) is a good starting value, but the optimal latency objective will vary depending on the application. + + Returns + ------- + bool + Returns true iff the app registration request was successful + """ url = "http://%s:%d/admin/add_app" % (self.host, CLIPPER_MANAGEMENT_PORT) @@ -358,6 +367,7 @@ def register_application(self, name, model, input_type, default_output, headers = {'Content-type': 'application/json'} r = requests.post(url, headers=headers, data=req_json) print(r.text) + return r.status_code == requests.codes.ok def get_all_apps(self, verbose=False): """Gets information about all applications registered with Clipper. @@ -689,7 +699,7 @@ def deploy_predict_function(self, input_type : str One of "integers", "floats", "doubles", "bytes", or "strings". labels : list of str, optional - A list of strings annotating the model + A list of strings annotating the model. num_containers : int, optional The number of replicas of the model to create. More replicas can be created later as well. Defaults to 1. @@ -735,6 +745,121 @@ def centered_predict(inputs): return deploy_result + def _register_app_and_check_success(self, name, input_type, default_output, + slo_micros): + if self.register_application(name, name, input_type, default_output, + slo_micros): + print("Application registration sucessful! Deploying model.") + return True + print("Application registration unsuccessful. Will not deploy model.") + return False + + def register_app_and_deploy_predict_function( + self, + name, + predict_function, + input_type, + default_output=DEFAULT_DEFAULT_OUTPUT, + model_version=DEFAULT_MODEL_VERSION, + slo_micros=DEFAULT_SLO_MICROS, + labels=DEFAULT_LABEL, + num_containers=1): + """Registers an app and deploys provided predict function as a model. + + Parameters + ---------- + name : str + The to be assigned to the registered app and deployed model. + predict_function : function + The prediction function. Any state associated with the function should be + captured via closure capture. + input_type : str + The input_type to be associated with the registered app and deployed model. + One of "integers", "floats", "doubles", "bytes", or "strings". + default_output : string, optional + The default prediction to use if the model does not return a prediction + by the end of the latency objective. + model_version : Any object with a string representation (with __str__ implementation), optional + The version to assign the deployed model. + slo_micros : int + The query latency objective for the application in microseconds. + This is the processing latency between Clipper receiving a request + and sending a response. It does not account for network latencies + before a request is received or after a response is sent. + labels : list of str, optional + A list of strings annotating the model. + num_containers : int, optional + The number of replicas of the model to create. More replicas can be + created later as well. + """ + if not self._register_app_and_check_success( + name, input_type, default_output, slo_micros): + return False + + return self.deploy_predict_function(name, model_version, + predict_function, input_type, + labels, num_containers) + + def register_app_and_deploy_pyspark_model( + self, + name, + predict_function, + pyspark_model, + sc, + input_type, + default_output=DEFAULT_DEFAULT_OUTPUT, + model_version=DEFAULT_MODEL_VERSION, + slo_micros=DEFAULT_SLO_MICROS, + labels=DEFAULT_LABEL, + num_containers=1): + """Registers an app and deploys provided spark model. + + Parameters + ---------- + name : str + The to be assigned to the registered app and deployed model. + predict_function : function + A function that takes three arguments, a SparkContext, the ``model`` parameter and + a list of inputs of the type specified by the ``input_type`` argument. + Any state associated with the function other than the Spark model should + be captured via closure capture. Note that the function must not capture + the SparkContext or the model implicitly, as these objects are not pickleable + and therefore will prevent the ``predict_function`` from being serialized. + pyspark_model : pyspark.mllib.util.Saveable + An object that mixes in the pyspark Saveable mixin. Generally this + is either an mllib model or transformer. This model will be loaded + into the Clipper model container and provided as an argument to the + predict function each time it is called. + sc : SparkContext + The SparkContext associated with the model. This is needed + to save the model for pyspark.mllib models. + input_type : str + The input_type to be associated with the registered app and deployed model. + One of "integers", "floats", "doubles", "bytes", or "strings". + default_output : string, optional + The default prediction to use if the model does not return a prediction + by the end of the latency objective. + model_version : Any object with a string representation (with __str__ implementation), optional + The version to assign the deployed model. + slo_micros : int, optional + The query latency objective for the application in microseconds. + This is the processing latency between Clipper receiving a request + and sending a response. It does not account for network latencies + before a request is received or after a response is sent. + labels : list of str, optional + A list of strings annotating the model. + num_containers : int, optional + The number of replicas of the model to create. More replicas can be + created later as well. + """ + if not self._register_app_and_check_success( + name, input_type, default_output, slo_micros): + return False + + return self.deploy_pyspark_model(name, model_version, predict_function, + pyspark_model, sc, input_type, labels, + num_containers) + def get_all_models(self, verbose=False): """Gets information about all models registered with Clipper. diff --git a/integration-tests/clipper_manager_tests.py b/integration-tests/clipper_manager_tests.py index c74984455..2680dfc12 100644 --- a/integration-tests/clipper_manager_tests.py +++ b/integration-tests/clipper_manager_tests.py @@ -8,7 +8,8 @@ from argparse import ArgumentParser cur_dir = os.path.dirname(os.path.abspath(__file__)) sys.path.insert(0, os.path.abspath('%s/../' % cur_dir)) -from clipper_admin.clipper_manager import Clipper +import clipper_admin.clipper_manager +Clipper = clipper_admin.clipper_manager.Clipper import random import socket """ @@ -214,6 +215,25 @@ def test_predict_function_deploys_successfully(self): self.assertIsNotNone(running_containers_output) self.assertGreaterEqual(len(running_containers_output), 1) + def test_register_app_and_deploy_predict_function_is_successful(self): + model_version = 1 + app_and_model_name = "easy_register_app_model" + predict_func = lambda inputs: ["0" for x in inputs] + input_type = "doubles" + + result = self.clipper_inst.register_app_and_deploy_predict_function( + app_and_model_name, predict_func, input_type) + + self.assertTrue(result) + model_info = self.clipper_inst.get_model_info( + app_and_model_name, + clipper_admin.clipper_manager.DEFAULT_MODEL_VERSION) + self.assertIsNotNone(model_info) + running_containers_output = self.clipper_inst._execute_standard( + "docker ps -q --filter \"ancestor=clipper/python-container\"") + self.assertIsNotNone(running_containers_output) + self.assertGreaterEqual(len(running_containers_output), 2) + class ClipperManagerTestCaseLong(unittest.TestCase): @classmethod @@ -299,12 +319,14 @@ def test_deployed_predict_function_queried_successfully(self): 'get_app_info_for_registered_app_returns_info_dictionary', 'get_app_info_for_nonexistent_app_returns_none', 'test_add_container_for_external_model_fails', - 'test_model_version_sets_correctly', 'test_get_logs_creates_log_files', + 'test_model_version_sets_correctly', + 'test_get_logs_creates_log_files', 'test_inspect_instance_returns_json_dict', 'test_model_deploys_successfully', 'test_add_container_for_deployed_model_succeeds', 'test_remove_inactive_containers_succeeds', - 'test_predict_function_deploys_successfully' + 'test_predict_function_deploys_successfully', + 'test_register_app_and_deploy_predict_function_is_successful', ] LONG_TEST_ORDERING = [ diff --git a/integration-tests/deploy_pyspark_models.py b/integration-tests/deploy_pyspark_models.py index bd62dd784..d627993af 100644 --- a/integration-tests/deploy_pyspark_models.py +++ b/integration-tests/deploy_pyspark_models.py @@ -87,12 +87,16 @@ def predict(spark, model, xs): def deploy_and_test_model(sc, clipper, model, version): clipper.deploy_pyspark_model(model_name, version, predict, model, sc, "ints") + _test_deployed_model(app_name, version) + + +def _test_deployed_model(app, version): time.sleep(25) num_preds = 25 num_defaults = 0 for i in range(num_preds): response = requests.post( - "http://localhost:1337/%s/predict" % app_name, + "http://localhost:1337/%s/predict" % app, headers=headers, data=json.dumps({ 'input': get_test_point() @@ -109,7 +113,7 @@ def deploy_and_test_model(sc, clipper, model, version): num_preds)) if num_defaults > num_preds / 2: raise BenchmarkException("Error querying APP %s, MODEL %s:%d" % - (app_name, model_name, version)) + (app, model_name, version)) def train_logistic_regression(trainRDD): @@ -144,7 +148,8 @@ def get_test_point(): lambda line: parseData(line, objective, pos_label)).cache() try: - clipper.register_application(app_name, model_name, "ints", + input_type = "ints" + clipper.register_application(app_name, model_name, input_type, "default_pred", 100000) time.sleep(1) response = requests.post( @@ -169,6 +174,13 @@ def get_test_point(): version += 1 rf_model = train_random_forest(trainRDD, 20, 16) deploy_and_test_model(sc, clipper, svm_model, version) + + version += 1 + app_and_model_name = "easy_register_app_model" + clipper.register_app_and_deploy_pyspark_model( + app_and_model_name, predict, lr_model, sc, input_type) + _test_deployed_model(app_and_model_name, version) + except BenchmarkException as e: print(e) clipper.stop_all()