Skip to content

Commit

Permalink
One-line app registration and model deployment (#223)
Browse files Browse the repository at this point in the history
* Easy registration + model deployment management library support

* Reformatted

* Fixup

* Addressed comments and added function documentation

* Addressed comments
  • Loading branch information
nishadsingh1 authored and dcrankshaw committed Jun 23, 2017
1 parent 7811207 commit 06e2a7c
Show file tree
Hide file tree
Showing 3 changed files with 167 additions and 8 deletions.
129 changes: 127 additions & 2 deletions clipper_admin/clipper_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@
CLIPPER_DOCKER_LABEL = "ai.clipper.container.label"
CLIPPER_MODEL_CONTAINER_LABEL = "ai.clipper.model_container.model_version"

DEFAULT_MODEL_VERSION = 1
DEFAULT_DEFAULT_OUTPUT = "None"
DEFAULT_SLO_MICROS = 100000
DEFAULT_LABEL = ["DEFAULT"]

aws_cli_config = """
Expand Down Expand Up @@ -321,7 +324,7 @@ def start(self):

def register_application(self, name, model, input_type, default_output,
slo_micros):
"""Register a new Clipper application.
"""Register a new Clipper application and returns the response object.
Parameters
----------
Expand All @@ -345,6 +348,12 @@ def register_application(self, name, model, input_type, default_output,
the objective not be set aggressively low unless absolutely necessary.
40000 (40ms) is a good starting value, but the optimal latency objective
will vary depending on the application.
Returns
-------
bool
Returns true iff the app registration request was successful
"""
url = "http://%s:%d/admin/add_app" % (self.host,
CLIPPER_MANAGEMENT_PORT)
Expand All @@ -358,6 +367,7 @@ def register_application(self, name, model, input_type, default_output,
headers = {'Content-type': 'application/json'}
r = requests.post(url, headers=headers, data=req_json)
print(r.text)
return r.status_code == requests.codes.ok

def get_all_apps(self, verbose=False):
"""Gets information about all applications registered with Clipper.
Expand Down Expand Up @@ -689,7 +699,7 @@ def deploy_predict_function(self,
input_type : str
One of "integers", "floats", "doubles", "bytes", or "strings".
labels : list of str, optional
A list of strings annotating the model
A list of strings annotating the model.
num_containers : int, optional
The number of replicas of the model to create. More replicas can be
created later as well. Defaults to 1.
Expand Down Expand Up @@ -735,6 +745,121 @@ def centered_predict(inputs):

return deploy_result

def _register_app_and_check_success(self, name, input_type, default_output,
                                    slo_micros):
    """Register an app named `name`, backed by a model of the same name.

    Helper shared by the register-and-deploy convenience methods; prints
    the outcome so the caller's console shows why deployment was skipped.

    Parameters
    ----------
    name : str
        The name used for both the application and its candidate model.
    input_type : str
        One of "integers", "floats", "doubles", "bytes", or "strings".
    default_output : string
        The default prediction to return when the latency objective is missed.
    slo_micros : int
        The query latency objective for the application in microseconds.

    Returns
    -------
    bool
        True iff the app registration request was successful.
    """
    if self.register_application(name, name, input_type, default_output,
                                 slo_micros):
        # Typo fix: was "sucessful".
        print("Application registration successful! Deploying model.")
        return True
    print("Application registration unsuccessful. Will not deploy model.")
    return False

def register_app_and_deploy_predict_function(
        self,
        name,
        predict_function,
        input_type,
        default_output=DEFAULT_DEFAULT_OUTPUT,
        model_version=DEFAULT_MODEL_VERSION,
        slo_micros=DEFAULT_SLO_MICROS,
        labels=DEFAULT_LABEL,
        num_containers=1):
    """Registers an app and deploys the provided predict function as a model.

    Parameters
    ----------
    name : str
        The name to be assigned to the registered app and deployed model.
    predict_function : function
        The prediction function. Any state associated with the function should be
        captured via closure capture.
    input_type : str
        The input_type to be associated with the registered app and deployed model.
        One of "integers", "floats", "doubles", "bytes", or "strings".
    default_output : string, optional
        The default prediction to use if the model does not return a prediction
        by the end of the latency objective.
    model_version : Any object with a string representation (with __str__ implementation), optional
        The version to assign the deployed model.
    slo_micros : int, optional
        The query latency objective for the application in microseconds.
        This is the processing latency between Clipper receiving a request
        and sending a response. It does not account for network latencies
        before a request is received or after a response is sent.
    labels : list of str, optional
        A list of strings annotating the model.
        NOTE(review): the default is a shared module-level list; callers
        should not mutate it.
    num_containers : int, optional
        The number of replicas of the model to create. More replicas can be
        created later as well.

    Returns
    -------
    bool
        False if the app registration fails; otherwise the result of
        deploying the predict function.
    """
    # Do not attempt deployment if the app could not be registered.
    if not self._register_app_and_check_success(
            name, input_type, default_output, slo_micros):
        return False

    return self.deploy_predict_function(name, model_version,
                                        predict_function, input_type,
                                        labels, num_containers)

def register_app_and_deploy_pyspark_model(
        self,
        name,
        predict_function,
        pyspark_model,
        sc,
        input_type,
        default_output=DEFAULT_DEFAULT_OUTPUT,
        model_version=DEFAULT_MODEL_VERSION,
        slo_micros=DEFAULT_SLO_MICROS,
        labels=DEFAULT_LABEL,
        num_containers=1):
    """Registers an app and deploys the provided spark model.

    Parameters
    ----------
    name : str
        The name to be assigned to the registered app and deployed model.
    predict_function : function
        A function that takes three arguments, a SparkContext, the ``model`` parameter and
        a list of inputs of the type specified by the ``input_type`` argument.
        Any state associated with the function other than the Spark model should
        be captured via closure capture. Note that the function must not capture
        the SparkContext or the model implicitly, as these objects are not pickleable
        and therefore will prevent the ``predict_function`` from being serialized.
    pyspark_model : pyspark.mllib.util.Saveable
        An object that mixes in the pyspark Saveable mixin. Generally this
        is either an mllib model or transformer. This model will be loaded
        into the Clipper model container and provided as an argument to the
        predict function each time it is called.
    sc : SparkContext
        The SparkContext associated with the model. This is needed
        to save the model for pyspark.mllib models.
    input_type : str
        The input_type to be associated with the registered app and deployed model.
        One of "integers", "floats", "doubles", "bytes", or "strings".
    default_output : string, optional
        The default prediction to use if the model does not return a prediction
        by the end of the latency objective.
    model_version : Any object with a string representation (with __str__ implementation), optional
        The version to assign the deployed model.
    slo_micros : int, optional
        The query latency objective for the application in microseconds.
        This is the processing latency between Clipper receiving a request
        and sending a response. It does not account for network latencies
        before a request is received or after a response is sent.
    labels : list of str, optional
        A list of strings annotating the model.
        NOTE(review): the default is a shared module-level list; callers
        should not mutate it.
    num_containers : int, optional
        The number of replicas of the model to create. More replicas can be
        created later as well.

    Returns
    -------
    bool
        False if the app registration fails; otherwise the result of
        deploying the pyspark model.
    """
    # Do not attempt deployment if the app could not be registered.
    if not self._register_app_and_check_success(
            name, input_type, default_output, slo_micros):
        return False

    return self.deploy_pyspark_model(name, model_version, predict_function,
                                     pyspark_model, sc, input_type, labels,
                                     num_containers)

def get_all_models(self, verbose=False):
"""Gets information about all models registered with Clipper.
Expand Down
28 changes: 25 additions & 3 deletions integration-tests/clipper_manager_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@
from argparse import ArgumentParser
cur_dir = os.path.dirname(os.path.abspath(__file__))
sys.path.insert(0, os.path.abspath('%s/../' % cur_dir))
from clipper_admin.clipper_manager import Clipper
import clipper_admin.clipper_manager
Clipper = clipper_admin.clipper_manager.Clipper
import random
import socket
"""
Expand Down Expand Up @@ -214,6 +215,25 @@ def test_predict_function_deploys_successfully(self):
self.assertIsNotNone(running_containers_output)
self.assertGreaterEqual(len(running_containers_output), 1)

def test_register_app_and_deploy_predict_function_is_successful(self):
    """One-call registration + deployment succeeds end-to-end.

    Verifies the combined helper reports success, that the model is
    queryable by the default version, and that at least one model
    container is running.
    """
    # Removed unused local `model_version`: the lookup below uses the
    # module-level DEFAULT_MODEL_VERSION, which is what the helper
    # actually deploys with.
    app_and_model_name = "easy_register_app_model"
    predict_func = lambda inputs: ["0" for x in inputs]
    input_type = "doubles"

    result = self.clipper_inst.register_app_and_deploy_predict_function(
        app_and_model_name, predict_func, input_type)

    self.assertTrue(result)
    model_info = self.clipper_inst.get_model_info(
        app_and_model_name,
        clipper_admin.clipper_manager.DEFAULT_MODEL_VERSION)
    self.assertIsNotNone(model_info)
    running_containers_output = self.clipper_inst._execute_standard(
        "docker ps -q --filter \"ancestor=clipper/python-container\"")
    self.assertIsNotNone(running_containers_output)
    self.assertGreaterEqual(len(running_containers_output), 2)


class ClipperManagerTestCaseLong(unittest.TestCase):
@classmethod
Expand Down Expand Up @@ -299,12 +319,14 @@ def test_deployed_predict_function_queried_successfully(self):
'get_app_info_for_registered_app_returns_info_dictionary',
'get_app_info_for_nonexistent_app_returns_none',
'test_add_container_for_external_model_fails',
'test_model_version_sets_correctly', 'test_get_logs_creates_log_files',
'test_model_version_sets_correctly',
'test_get_logs_creates_log_files',
'test_inspect_instance_returns_json_dict',
'test_model_deploys_successfully',
'test_add_container_for_deployed_model_succeeds',
'test_remove_inactive_containers_succeeds',
'test_predict_function_deploys_successfully'
'test_predict_function_deploys_successfully',
'test_register_app_and_deploy_predict_function_is_successful',
]

LONG_TEST_ORDERING = [
Expand Down
18 changes: 15 additions & 3 deletions integration-tests/deploy_pyspark_models.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,12 +87,16 @@ def predict(spark, model, xs):
def deploy_and_test_model(sc, clipper, model, version):
    # Deploy `model` under the module-level model_name at the given version,
    # then query the app it serves via _test_deployed_model.
    # NOTE(review): assumes module-level model_name/app_name/predict are
    # defined earlier in this script — confirm against the full file.
    clipper.deploy_pyspark_model(model_name, version, predict, model, sc,
                                 "ints")
    _test_deployed_model(app_name, version)


def _test_deployed_model(app, version):
time.sleep(25)
num_preds = 25
num_defaults = 0
for i in range(num_preds):
response = requests.post(
"http://localhost:1337/%s/predict" % app_name,
"http://localhost:1337/%s/predict" % app,
headers=headers,
data=json.dumps({
'input': get_test_point()
Expand All @@ -109,7 +113,7 @@ def deploy_and_test_model(sc, clipper, model, version):
num_preds))
if num_defaults > num_preds / 2:
raise BenchmarkException("Error querying APP %s, MODEL %s:%d" %
(app_name, model_name, version))
(app, model_name, version))


def train_logistic_regression(trainRDD):
Expand Down Expand Up @@ -144,7 +148,8 @@ def get_test_point():
lambda line: parseData(line, objective, pos_label)).cache()

try:
clipper.register_application(app_name, model_name, "ints",
input_type = "ints"
clipper.register_application(app_name, model_name, input_type,
"default_pred", 100000)
time.sleep(1)
response = requests.post(
Expand All @@ -169,6 +174,13 @@ def get_test_point():
version += 1
rf_model = train_random_forest(trainRDD, 20, 16)
deploy_and_test_model(sc, clipper, svm_model, version)

version += 1
app_and_model_name = "easy_register_app_model"
clipper.register_app_and_deploy_pyspark_model(
app_and_model_name, predict, lr_model, sc, input_type)
_test_deployed_model(app_and_model_name, version)

except BenchmarkException as e:
print(e)
clipper.stop_all()
Expand Down

0 comments on commit 06e2a7c

Please sign in to comment.