diff --git a/bin/build_docker_images.sh b/bin/build_docker_images.sh index aeae1ada5..492034530 100755 --- a/bin/build_docker_images.sh +++ b/bin/build_docker_images.sh @@ -259,9 +259,10 @@ build_images () { create_image tf_cifar_container TensorFlowCifarDockerfile $public create_image tf-container TensorFlowDockerfile $public create_image pytorch-container PyTorchContainerDockerfile $public -} - + # Build Metric Monitor image - no dependency + create_image frontend-exporter FrontendExporterDockerfile $public +} usage () { diff --git a/bin/run_unittests.sh b/bin/run_unittests.sh index f8a66e371..2148c23aa 100755 --- a/bin/run_unittests.sh +++ b/bin/run_unittests.sh @@ -143,6 +143,7 @@ function run_integration_tests { python ../integration-tests/kubernetes_integration_test.py python ../integration-tests/deploy_tensorflow_models.py ../integration-tests/r_integration_test/rclipper_test.sh + python ../integration-tests/clipper_metric.py } function run_all_tests { diff --git a/clipper_admin/clipper_admin/container_manager.py b/clipper_admin/clipper_admin/container_manager.py index 04d6ab020..7eba7f452 100644 --- a/clipper_admin/clipper_admin/container_manager.py +++ b/clipper_admin/clipper_admin/container_manager.py @@ -5,6 +5,7 @@ CLIPPER_INTERNAL_QUERY_PORT = 1337 CLIPPER_INTERNAL_MANAGEMENT_PORT = 1338 CLIPPER_INTERNAL_RPC_PORT = 7000 +CLIPPER_INTERNAL_METRIC_PORT = 1390 CLIPPER_DOCKER_LABEL = "ai.clipper.container.label" CLIPPER_MODEL_CONTAINER_LABEL = "ai.clipper.model_container.label" diff --git a/clipper_admin/clipper_admin/docker/docker_container_manager.py b/clipper_admin/clipper_admin/docker/docker_container_manager.py index aed3bccbe..cce7396bb 100644 --- a/clipper_admin/clipper_admin/docker/docker_container_manager.py +++ b/clipper_admin/clipper_admin/docker/docker_container_manager.py @@ -8,9 +8,11 @@ ContainerManager, CLIPPER_DOCKER_LABEL, CLIPPER_MODEL_CONTAINER_LABEL, CLIPPER_QUERY_FRONTEND_CONTAINER_LABEL, CLIPPER_MGMT_FRONTEND_CONTAINER_LABEL, 
CLIPPER_INTERNAL_RPC_PORT, - CLIPPER_INTERNAL_QUERY_PORT, CLIPPER_INTERNAL_MANAGEMENT_PORT) + CLIPPER_INTERNAL_QUERY_PORT, CLIPPER_INTERNAL_MANAGEMENT_PORT, + CLIPPER_INTERNAL_METRIC_PORT) from ..exceptions import ClipperException from requests.exceptions import ConnectionError +from .docker_metric_utils import * logger = logging.getLogger(__name__) @@ -93,7 +95,7 @@ def start_clipper(self, query_frontend_image, mgmt_frontend_image, logger.debug( "{nw} network already exists".format(nw=self.docker_network)) except ConnectionError: - msg = "Unable to connect to Docker. Please Check if Docker is running." + msg = "Unable to Connect to Docker. Please Check if Docker is running." raise ClipperException(msg) if not self.external_redis: @@ -123,17 +125,19 @@ def start_clipper(self, query_frontend_image, mgmt_frontend_image, }, labels=mgmt_labels, **self.extra_container_kwargs) + query_cmd = "--redis_ip={redis_ip} --redis_port={redis_port} --prediction_cache_size={cache_size}".format( redis_ip=self.redis_ip, redis_port=self.redis_port, cache_size=cache_size) query_labels = self.common_labels.copy() query_labels[CLIPPER_QUERY_FRONTEND_CONTAINER_LABEL] = "" + query_container_id = random.randint(0, 100000) + query_name = "query_frontend-{}".format(query_container_id) self.docker_client.containers.run( query_frontend_image, query_cmd, - name="query_frontend-{}".format( - random.randint(0, 100000)), # generate a random name + name=query_name, ports={ '%s/tcp' % CLIPPER_INTERNAL_QUERY_PORT: self.clipper_query_port, @@ -141,6 +145,18 @@ def start_clipper(self, query_frontend_image, mgmt_frontend_image, }, labels=query_labels, **self.extra_container_kwargs) + + # Metric Section + query_frontend_metric_name = "query_frontend_exporter-{}".format( + query_container_id) + run_query_frontend_metric_image( + query_frontend_metric_name, self.docker_client, query_name, + self.common_labels, self.extra_container_kwargs) + setup_metric_config(query_frontend_metric_name, + 
CLIPPER_INTERNAL_METRIC_PORT) + run_metric_image(self.docker_client, self.common_labels, + self.extra_container_kwargs) + self.connect() def connect(self): @@ -187,15 +203,24 @@ def _add_replica(self, name, version, input_type, image): "CLIPPER_IP": query_frontend_hostname, "CLIPPER_INPUT_TYPE": input_type, } + + model_container_label = create_model_container_label(name, version) labels = self.common_labels.copy() - labels[CLIPPER_MODEL_CONTAINER_LABEL] = create_model_container_label( - name, version) + labels[CLIPPER_MODEL_CONTAINER_LABEL] = model_container_label + + # Metric Section + model_container_name = model_container_label + '-{}'.format( + random.randint(0, 100000)) self.docker_client.containers.run( image, + name=model_container_name, environment=env_vars, labels=labels, **self.extra_container_kwargs) + update_metric_config(model_container_name, + CLIPPER_INTERNAL_METRIC_PORT) + def set_num_replicas(self, name, version, input_type, image, num_replicas): current_replicas = self._get_replicas(name, version) if len(current_replicas) < num_replicas: diff --git a/clipper_admin/clipper_admin/docker/docker_metric_utils.py b/clipper_admin/clipper_admin/docker/docker_metric_utils.py new file mode 100644 index 000000000..a0adf6101 --- /dev/null +++ b/clipper_admin/clipper_admin/docker/docker_metric_utils.py @@ -0,0 +1,141 @@ +import yaml +import requests +import random +import os +from ..version import __version__ + + +def ensure_clipper_tmp(): + """ + Make sure /tmp/clipper directory exist. If not, make one. 
+ :return: None + """ + try: + os.makedirs('/tmp/clipper') + except OSError as e: + # Equivalent to os.makedirs(., exist_ok=True) in py3 + pass + + +def get_prometheus_base_config(): + """ + Generate a basic configuration dictionary for prometheus + :return: dictionary + """ + conf = dict() + conf['global'] = {'evaluation_interval': '5s', 'scrape_interval': '5s'} + conf['scrape_configs'] = [] + return conf + + +def run_query_frontend_metric_image(name, docker_client, query_name, + common_labels, extra_container_kwargs): + """ + Use docker_client to run a frontend-exporter image. + :param name: Name to pass in, need to be unique. + :param docker_client: The docker_client object. + :param query_name: The corresponding frontend name + :param common_labels: Labels to pass in. + :param extra_container_kwargs: Kwargs to pass in. + :return: None + """ + + query_frontend_metric_cmd = "--query_frontend_name {}".format(query_name) + query_frontend_metric_labels = common_labels.copy() + + docker_client.containers.run( + "clipper/frontend-exporter:{}".format(__version__), + query_frontend_metric_cmd, + name=name, + labels=query_frontend_metric_labels, + **extra_container_kwargs) + + +def setup_metric_config(query_frontend_metric_name, + CLIPPER_INTERNAL_METRIC_PORT): + """ + Write to file prometheus.yml after frontend-metric is setup. + :param query_frontend_metric_name: Corresponding image name + :param CLIPPER_INTERNAL_METRIC_PORT: Default port. 
+ :return: None + """ + + ensure_clipper_tmp() + + with open('/tmp/clipper/prometheus.yml', 'w') as f: + prom_config = get_prometheus_base_config() + prom_config_query_frontend = { + 'job_name': + 'query', + 'static_configs': [{ + 'targets': [ + '{name}:{port}'.format( + name=query_frontend_metric_name, + port=CLIPPER_INTERNAL_METRIC_PORT) + ] + }] + } + prom_config['scrape_configs'].append(prom_config_query_frontend) + + yaml.dump(prom_config, f) + + +def run_metric_image(docker_client, common_labels, extra_container_kwargs): + """ + Run the prometheus image. + :param docker_client: The docker client object + :param common_labels: Labels to pass in + :param extra_container_kwargs: Kwargs to pass in. + :return: None + """ + + metric_cmd = [ + "--config.file=/etc/prometheus/prometheus.yml", + "--storage.tsdb.path=/prometheus", + "--web.console.libraries=/etc/prometheus/console_libraries", + "--web.console.templates=/etc/prometheus/consoles", + "--web.enable-lifecycle" + ] + metric_labels = common_labels.copy() + docker_client.containers.run( + "prom/prometheus", + metric_cmd, + name="metric_frontend-{}".format(random.randint(0, 100000)), + ports={'9090/tcp': 9090}, + volumes={ + '/tmp/clipper/prometheus.yml': { + 'bind': '/etc/prometheus/prometheus.yml', + 'mode': 'ro' + } + }, + labels=metric_labels, + **extra_container_kwargs) + + +def update_metric_config(model_container_name, CLIPPER_INTERNAL_METRIC_PORT): + """ + Update the prometheus.yml configuration file. + :param model_container_name: New model container_name, need to be unique. 
+    :param CLIPPER_INTERNAL_METRIC_PORT: Default port
+    :return: None
+    """
+    with open('/tmp/clipper/prometheus.yml', 'r') as f:
+        conf = yaml.safe_load(f)
+
+    new_job_dict = {
+        'job_name':
+        '{}'.format(model_container_name),
+        'static_configs': [{
+            'targets': [
+                '{name}:{port}'.format(
+                    name=model_container_name,
+                    port=CLIPPER_INTERNAL_METRIC_PORT)
+            ]
+        }]
+    }
+    conf['scrape_configs'].append(new_job_dict)
+
+    with open('/tmp/clipper/prometheus.yml', 'w') as f:
+        yaml.dump(conf, f)
+
+    requests.post('http://localhost:9090/-/reload')
diff --git a/clipper_admin/requirements.txt b/clipper_admin/requirements.txt
index 61013786c..2e4c3934d 100644
--- a/clipper_admin/requirements.txt
+++ b/clipper_admin/requirements.txt
@@ -5,3 +5,4 @@ docker==2.5.1
 kubernetes==3.0.0
 six==1.10.0
 mock
+prometheus_client
\ No newline at end of file
diff --git a/clipper_admin/setup.py b/clipper_admin/setup.py
index 428bfde7e..fdf55bfe1 100644
--- a/clipper_admin/setup.py
+++ b/clipper_admin/setup.py
@@ -26,7 +26,13 @@
     package_data={'clipper_admin': ['*.txt', '*/*.yaml']},
     keywords=['clipper', 'prediction', 'model', 'management'],
     install_requires=[
-        'requests', 'subprocess32', 'pyyaml', 'docker', 'kubernetes', 'six',
+        'requests',
+        'subprocess32',
+        'pyyaml',
+        'docker',
+        'kubernetes',
+        'prometheus_client',
+        'six',
         'cloudpickle>=0.5.2'
     ],
     extras_require={
diff --git a/containers/python/rpc.py b/containers/python/rpc.py
index 80dd235f0..8f5613c96 100644
--- a/containers/python/rpc.py
+++ b/containers/python/rpc.py
@@ -8,6 +8,9 @@
 import socket
 import sys
 from collections import deque
+from multiprocessing import Pipe, Process
+from prometheus_client import start_http_server
+from prometheus_client.core import GaugeMetricFamily, REGISTRY
 INPUT_TYPE_BYTES = 0
 INPUT_TYPE_INTS = 1
@@ -180,7 +183,7 @@ def get_prediction_function(self):
     def get_event_history(self):
         return self.event_history.get_events()
-    def run(self):
+    def run(self, metric_conn):
         print("Serving predictions for {0} input
type.".format( input_type_to_string(self.model_input_type))) connected = False @@ -189,6 +192,9 @@ def run(self): poller = zmq.Poller() sys.stdout.flush() sys.stderr.flush() + + pred_metric = dict(model_pred_count=0) + while True: socket = self.context.socket(zmq.DEALER) poller.register(socket, zmq.POLLIN) @@ -306,6 +312,10 @@ def run(self): response.send(socket, self.event_history) + pred_metric['model_pred_count'] += 1 + + metric_conn.send(pred_metric) + print("recv: %f us, parse: %f us, handle: %f us" % ((t2 - t1).microseconds, (t3 - t2).microseconds, (t4 - t3).microseconds)) @@ -488,4 +498,40 @@ def start(self, model, host, port, model_name, model_version, input_type): self.server.model_version = model_version self.server.model_input_type = model_input_type self.server.model = model - self.server.run() + + parent_conn, child_conn = Pipe(duplex=True) + metrics_proc = Process(target=run_metric, args=(child_conn, )) + metrics_proc.start() + self.server.run(parent_conn) + + +class MetricCollector: + def __init__(self, pipe_child_conn): + self.pipe_conn = pipe_child_conn + + def collect(self): + latest_metric_dict = None + while self.pipe_conn.poll(): + latest_metric_dict = self.pipe_conn.recv() + if latest_metric_dict: + for name, val in latest_metric_dict.items(): + try: + yield GaugeMetricFamily( + name=name, + documentation=name, # Required Argument + value=val) + except ValueError: + pass + + +def run_metric(child_conn): + """ + This function takes a child_conn at the end of the pipe and + receive object to update prometheus metric. + + It is recommended to be ran in a separate process. 
+ """ + REGISTRY.register(MetricCollector(child_conn)) + start_http_server(1390) + while True: + time.sleep(1) diff --git a/dockerfiles/FrontendExporterDockerfile b/dockerfiles/FrontendExporterDockerfile new file mode 100644 index 000000000..ada207d25 --- /dev/null +++ b/dockerfiles/FrontendExporterDockerfile @@ -0,0 +1,10 @@ +# This ARG isn't used but prevents warnings in the build script +ARG CODE_VERSION +FROM python:3 + +WORKDIR /usr/src/app + +RUN pip install requests prometheus_client flatten_json + +COPY monitoring/front_end_exporter.py . +ENTRYPOINT ["python", "./front_end_exporter.py"] \ No newline at end of file diff --git a/dockerfiles/RPCDockerfile b/dockerfiles/RPCDockerfile index bf47dc974..02ba7f3f4 100644 --- a/dockerfiles/RPCDockerfile +++ b/dockerfiles/RPCDockerfile @@ -8,7 +8,8 @@ MAINTAINER Dan Crankshaw RUN mkdir -p /model \ && apt-get update \ && apt-get install -y libzmq3 libzmq3-dev \ - && conda install -y pyzmq + && conda install -y pyzmq \ + && pip install prometheus_client WORKDIR /container diff --git a/integration-tests/clipper_metric.py b/integration-tests/clipper_metric.py new file mode 100644 index 000000000..c678af7ab --- /dev/null +++ b/integration-tests/clipper_metric.py @@ -0,0 +1,60 @@ +from __future__ import print_function +import json +import requests +from datetime import datetime +import time +import numpy as np +import signal +import sys +import os +import logging +from test_utils import (create_docker_connection, BenchmarkException, headers, + log_clipper_state) + +cur_dir = os.path.dirname(os.path.abspath(__file__)) +sys.path.insert(0, os.path.abspath("%s/../clipper_admin" % cur_dir)) +from clipper_admin import ClipperConnection, DockerContainerManager +from clipper_admin.deployers import python as python_deployer + + +def predict(addr, x): + url = "http://%s/simple-example/predict" % addr + req_json = json.dumps({'input': list(x)}) + headers = {'Content-type': 'application/json'} + r = requests.post(url, headers=headers, 
data=req_json) + + +def feature_sum(xs): + return [str(sum(x)) for x in xs] + + +logging.basicConfig( + format='%(asctime)s %(levelname)-8s [%(filename)s:%(lineno)d] %(message)s', + datefmt='%y-%m-%d:%H:%M:%S', + level=logging.INFO) + +logger = logging.getLogger(__name__) + +if __name__ == '__main__': + logger.info("Start Metric Test (0/1): Running 2 Replicas") + clipper_conn = ClipperConnection(DockerContainerManager(redis_port=6380)) + clipper_conn.start_clipper() + python_deployer.create_endpoint( + clipper_conn, "simple-example", "doubles", feature_sum, num_replicas=2) + time.sleep(2) + try: + for _ in range(100): + predict(clipper_conn.get_query_addr(), np.random.random(200)) + time.sleep(0.2) + up_response = requests.get( + "http://localhost:9090/api/v1/series?match[]=up").json() + logger.debug(up_response) + assert up_response['status'] == 'success' + assert len(up_response['data']) == 3 + logger.info("Metric Test Done, Cleaning up...") + clipper_conn.stop_all() + except Exception as e: + logger.error(e) + log_clipper_state(clipper_conn) + clipper_conn.stop_all() + sys.exit(1) \ No newline at end of file diff --git a/integration-tests/deploy_pyspark_pipeline_models.py b/integration-tests/deploy_pyspark_pipeline_models.py index 546efe4b9..d54816171 100644 --- a/integration-tests/deploy_pyspark_pipeline_models.py +++ b/integration-tests/deploy_pyspark_pipeline_models.py @@ -85,8 +85,9 @@ def run_test(): prediction)) # test predict function - print(predict(spark, model, - [json.dumps((np.random.randint(1000), "spark abcd"))])) + print( + predict(spark, model, + [json.dumps((np.random.randint(1000), "spark abcd"))])) try: clipper_conn = create_docker_connection( diff --git a/integration-tests/deploy_tensorflow_models.py b/integration-tests/deploy_tensorflow_models.py index 368bb5642..5d9cd07a7 100644 --- a/integration-tests/deploy_tensorflow_models.py +++ b/integration-tests/deploy_tensorflow_models.py @@ -139,9 +139,11 @@ def train_logistic_regression(sess, 
X_train, y_train): sess.run(train, feed_dict={x: X_train, y_labels: y_train}) if i % 1000 == 0: print('Cost , Accuracy') - print(sess.run( - [loss, accuracy], feed_dict={x: X_train, - y_labels: y_train})) + print( + sess.run( + [loss, accuracy], + feed_dict={x: X_train, + y_labels: y_train})) return sess diff --git a/monitoring/README.md b/monitoring/README.md new file mode 100644 index 000000000..c409eb75c --- /dev/null +++ b/monitoring/README.md @@ -0,0 +1,10 @@ +This module is related to Clipper's metric monitoring function. For full design, see [Design Doc.](https://docs.google.com/document/d/10whRxCc97gOJl4j2lY6R-v7cI_ZoAMVcNGPG_9oj6iY/edit?usp=sharing) + +## Prometheus Server +We use prometheus as the metric tracking system. Once you spin up a clipper query frontend and a model containers. You can view prometheus UI at: [`http://localhost:9090`](http://localhost:9090). + +Please note that Prometheus UI is for debug purpose only. You can view certain metric and graph the timeseries. But for better visualization, we recommend [Grafana](https://grafana.com/). Grafana has default support for Prometheus Client. 
+
+## Available Metrics
+For now, the following metrics are available, in the corresponding Prometheus data type:
+- [x] model_pred_count (Gauge)
diff --git a/monitoring/front_end_exporter.py b/monitoring/front_end_exporter.py
new file mode 100644
index 000000000..5cf4ead87
--- /dev/null
+++ b/monitoring/front_end_exporter.py
@@ -0,0 +1,69 @@
+import requests
+from flatten_json import flatten
+import itertools
+import time
+from prometheus_client import start_http_server
+from prometheus_client.core import GaugeMetricFamily, REGISTRY
+import argparse
+
+parser = argparse.ArgumentParser(
+    description='Spin up a node exporter for query_frontend.')
+parser.add_argument(
+    '--query_frontend_name',
+    metavar='str',
+    type=str,
+    required=True,
+    help='The name of docker container in clipper_network')
+args = parser.parse_args()
+
+query_frontend_id = args.query_frontend_name
+
+ADDRESS = 'http://{}:1337/metrics'.format(query_frontend_id) #Sub with name
+
+
+def load_metric():
+    res = requests.get(ADDRESS)
+    return res.json()
+
+
+def multi_dict_unpacking(lst):
+    """
+    Receive a list of dictionaries, join them into one big dictionary
+    """
+    result = {}
+    for d in lst:
+        result = {**result, **d}
+    return result
+
+
+def parse_metric(metrics):
+    wo_type = list(itertools.chain.from_iterable(metrics.values()))
+    wo_type_flattened = list(itertools.chain([flatten(d) for d in wo_type]))
+    wo_type_joined = multi_dict_unpacking(wo_type_flattened)
+    return wo_type_joined
+
+
+class ClipperCollector(object):
+    def __init__(self):
+        pass
+
+    def collect(self):
+        metrics = parse_metric(load_metric())
+
+        for name, val in metrics.items():
+            try:
+                if '.' in val or 'e' in val:
+                    val = float(val)
+                else:
+                    val = int(val)
+                name = name.replace(':', '_').replace('-', '_')
+                yield GaugeMetricFamily(name, 'help', value=val)
+            except ValueError:
+                pass
+
+
+if __name__ == '__main__':
+    REGISTRY.register(ClipperCollector())
+    start_http_server(1390)
+    while True:
+        time.sleep(1)