ZMQ Frontend + Caching Improvements (#7)
* rpc test infra change

* rpcbench more

* bench, metrics fix

* concurrent queue for rpc

* Add send/recv balance to rpc bench

* multi container bench

* add concurrent queue

* fix end_to_end bench

* [CLIPPER-224] Replace boost futures with folly futures (ucbrise#266)

* replace boost futures with folly

* Remove unused futures code

* some code formatting

* Update query frontend dockerfile to support Folly

* fix cache move issue

* remove unused import

* format code

* remove version check

* remove boost thread dependency

* remove boost thread as required component

* format code

* Create base dockerfile for installing clipper deps, query and mgmt inherit from this base

* rename lib-base >> lib_base

* makes FindFolly.cmake search in FOLLY_ROOT

* format code

* Pin library versions in base dockerfile

* Fix FindFolly cmake

* Add additional link flags for folly cmake

* Add missing threadpool imports

* fix boost thread import

* Add more cmake link flags

* Revert "Add more cmake link flags"

This reverts commit 5604672.

* Update QueryFrontendDockerfile

* Update ManagementFrontendDockerfile

* query proc return default future

* grpc benchmark script

* Add request throughput meter to grpc frontend

* change meter name

* Add metrics reporting

* update grpc bench

* fixes

* frontend fixes

* update grpc benchmark

* multiproc changes

* more bench fixes

* allow query proc to flourish

* logging perf

* batch size log

* Management frontend: Improve clarity of response messages (ucbrise#271)

* format code

* Update clipper mgr register external output

* logs

* queue size

* Fix histogram overflow errors (ucbrise#272)

* Recalculate mean on insertion, improve histogram data precision and value capacity

* Format code

* Implement CLOCK policy

* add cache tests

* revert clipper mgr changes

* remove quotes

* add test assertion

* format code

* Refactor PredictionCache to evict based on byte size rather than num pages

* Add config flag for cache size

* Format code

* Update config tests

* format code

* Update config.hpp

* Remove unused task executor threadpool size config

* Add cache size option to clipper mgr

* format code

* remove task execution variable

* when evicting, stop if we're out of pages

* remove task execution test statement

* Fix rebase issue

* updates

* fix

* qp limit

* fix

* fix test

* qp lat

* fix

* fix

* test

* test2

* test3

* test4

* test5

* Wangle

* more wangle

* threadpool

* ..

* link wangle

* bench log

* update

* ..

* test

* test

* testfix

* test2

* fixes

* var naming

* more metrics

* time diff

* ..

* one more metric change

* queue insert metric

* more seg monitor

* measure something that probably isn't the issue

* measure something that probably is the problem

* fixes

* hopeful move fix

* measure other part of cache

* measure cache put

* measure with some wangle

* wangle with the frontend

* more measure

* hone measurement

* try fix

* var issues

* add move

* make copy

* Add a log

* expect tid match

* remove log

* another continuation change

* bump hist size

* change cache seg to fetch

* urgh

* hm

* fix segfault

* var naming

* fix merge issues

* measure cache insertion latency

* cache eviction

* cache unique ptr

* fix method prototype

* fixes

* remove hit ratio

* remove lookups counter

* bench other seg

* metric move

* change qp seg hist

* move hist

* ..

* hist size

* ...

* send/recv balance

* meterfix

* remove metrics

* experiment

* varfix

* ..

* ...

* ..

* logging

* varfix

* why

* measure elsewhere

* measure thread task lat

* compfixes

* compfix2

* compfix3

* fail to fix seg fault

* fix segfault

* get num steps

* time first pass

* track queue insertion

* log output

* Add moar ctx

* Remove log

* ..

* Add a logging tag

* varfix

* zmq frontend

* cmake fix

* import fixes

* remove grpc stuff

* fixes

* more fixes

* fixes again

* fix

* zmq client

* varfix

* add socket bind

* string conversion

* update

* ..

* ....

* test

* ...

* protocol fix

* fix?

* fail

* fix polling

* client fix

* api fix

* fixes

* actually add the application to rpc

* debug

* bench fix

* client logs

* client log

* new log

* Add hash checks

* remove a log

* log

* test

* routing id log

* cmake fix

* import fix

* client fix

* routing id fix

* ..

* constness

* try again

* prototype changes

* constness

* constness

* client test

* client test

* more routing id logs

* remove ref

* remove move

* routing id rpc log

* ...

* req id int

* ordering

* fix really dumb error

* prototype fixes

* client throughput logging

* client fix

* client fix2

* units

* ..

* ..

* clean up task executor

* cleanup qp

* cleanup

* bench update

* remove unnecessary file handles from grpc bench

* remove unused chrono statements

* re-add deadline comparator
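
One part of the "Fix histogram overflow errors" / "Recalculate mean on insertion" work above is easier to see in code than in a commit title: rather than keeping a raw running sum that can overflow, the mean is updated incrementally on every insertion. The actual change lives in Clipper's C++ metrics code, which is not part of the diffs below; the following Python sketch only illustrates the idea, and all names in it are made up.

class RunningMean(object):
    """Incrementally updated mean (illustrative sketch, not Clipper's code)."""

    def __init__(self):
        self.count = 0
        self.mean = 0.0

    def insert(self, value):
        # Recalculate the mean on each insertion instead of accumulating a
        # raw sum that can overflow a fixed-width counter.
        self.count += 1
        self.mean += (value - self.mean) / self.count


stat = RunningMean()
for latency_micros in [120, 95, 310, 87]:
    stat.insert(latency_micros)
print(stat.mean)  # 153.0
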
Corey-Zumar authored Sep 12, 2017
1 parent 78f6783 commit d8b03ae
Showing 37 changed files with 5,445 additions and 250 deletions.
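
The caching half of the change ("Implement CLOCK policy", "Refactor PredictionCache to evict based on byte size rather than num pages", and the new --prediction_cache_size flag in the diffs below) amounts to a CLOCK-style cache whose budget is total bytes rather than a fixed number of entries. The real implementation is the C++ PredictionCache, which is not shown in this excerpt; the sketch below is only meant to illustrate the eviction scheme: every name in it is invented, values are assumed to be byte strings, and it ignores the concurrency the real cache has to handle.

class ClockByteCache(object):
    """CLOCK eviction bounded by total byte size (illustrative sketch only)."""

    def __init__(self, max_size_bytes=32 * 1024 * 1024):
        self.max_size_bytes = max_size_bytes
        self.size_bytes = 0
        self.keys = []     # circular list the clock hand sweeps over
        self.hand = 0
        self.entries = {}  # key -> [value, size_bytes, referenced_bit]

    def put(self, key, value):
        size = len(value)
        if key in self.entries:
            self.size_bytes -= self.entries[key][1]
        else:
            self.keys.append(key)
        self.entries[key] = [value, size, True]
        self.size_bytes += size
        self._evict()

    def get(self, key):
        entry = self.entries.get(key)
        if entry is None:
            return None
        entry[2] = True    # second chance: mark as recently referenced
        return entry[0]

    def _evict(self):
        # Sweep the clock hand, clearing reference bits as it goes, and evict
        # the first entry whose bit is already clear. Stop once we are back
        # under the byte budget or there is nothing left to evict.
        while self.size_bytes > self.max_size_bytes and self.keys:
            key = self.keys[self.hand]
            entry = self.entries[key]
            if entry[2]:
                entry[2] = False
                self.hand = (self.hand + 1) % len(self.keys)
            else:
                self.size_bytes -= entry[1]
                del self.entries[key]
                del self.keys[self.hand]
                if self.hand >= len(self.keys):
                    self.hand = 0


cache = ClockByteCache(max_size_bytes=8)
cache.put("q1", b"abcd")
cache.put("q2", b"efgh")
cache.put("q3", b"ij")               # evicts q1, the oldest unreferenced entry
print(sorted(cache.entries.keys()))  # ['q2', 'q3']

Sizing the cache in bytes rather than pages is what lets a single flag like --prediction_cache_size cap memory use regardless of how large individual predictions are.
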
10 changes: 9 additions & 1 deletion CMakeLists.txt
@@ -25,10 +25,18 @@ find_package(Folly REQUIRED)
add_library(folly INTERFACE IMPORTED)
set_property(TARGET folly PROPERTY
INTERFACE_INCLUDE_DIRECTORIES ${FOLLY_INCLUDE_DIR})
set(FOLLY_LINK_FLAGS "-lglog")
set(FOLLY_LINK_FLAGS "-lglog -lcrypto -lssl")
set_property(TARGET folly PROPERTY
INTERFACE_LINK_LIBRARIES ${FOLLY_LINK_FLAGS} ${FOLLY_LIBRARY})

# Include Wangle as an imported target
find_package(Wangle REQUIRED)
add_library(wangle INTERFACE IMPORTED)
set_property(TARGET wangle PROPERTY
INTERFACE_INCLUDE_DIRECTORIES ${WANGLE_INCLUDE_DIR})
set_property(TARGET wangle PROPERTY
INTERFACE_LINK_LIBRARIES ${WANGLE_LIBRARY})

# set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wall -pedantic -Wextra -Werror -Wno-deprecated-declarations -Wno-unsupported-friend")
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wall -Wextra -Wno-deprecated-declarations -Wno-unused-parameter -Wno-sign-compare")
if (CMAKE_CXX_COMPILER_ID MATCHES "Clang")
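The extra -lcrypto and -lssl flags are presumably needed because Folly itself links against OpenSSL, and the new imported wangle target supplies the executor/thread-pool support that the futures code now uses (see the "Wangle" and "threadpool" commits above).
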
12 changes: 9 additions & 3 deletions clipper_admin/clipper_manager.py
@@ -37,6 +37,7 @@

DEFAULT_REDIS_IP = "redis"
DEFAULT_REDIS_PORT = 6379
DEFAULT_PREDICTION_CACHE_SIZE_BYTES = 33554432L
CLIPPER_QUERY_PORT = 1337
CLIPPER_MANAGEMENT_PORT = 1338
CLIPPER_RPC_PORT = 7000
@@ -104,7 +105,9 @@ class Clipper:
    restart_containers : bool, optional
        If true, containers will restart on failure. If false, containers
        will not restart automatically.
    cache_size : int, optional
        The size of Clipper's prediction cache in bytes. Default cache size is
        32 MiB.
    """

def __init__(self,
@@ -118,7 +121,8 @@ def __init__(self,
                 redis_port=DEFAULT_REDIS_PORT,
                 redis_persistence_path=None,
                 restart_containers=False,
                 rpc_frontend=False):
                 rpc_frontend=False,
                 cache_size=DEFAULT_PREDICTION_CACHE_SIZE_BYTES):
        self.redis_ip = redis_ip
        self.redis_port = redis_port
        if rpc_frontend:
@@ -152,7 +156,8 @@ def __init__(self,
            'query_frontend': {
                'command': [
                    '--redis_ip=%s' % self.redis_ip,
                    '--redis_port=%d' % self.redis_port
                    '--redis_port=%d' % self.redis_port,
                    '--prediction_cache_size=%d' % cache_size
                ],
                'depends_on': ['mgmt_frontend'],
                'image':
@@ -1520,6 +1525,7 @@ def _publish_new_model(self, name, version, labels, input_type,
        headers = {'Content-type': 'application/json'}
        r = requests.post(url, headers=headers, data=req_json)
        if r.status_code == requests.codes.ok:
            print(r.text)
            return True
        else:
            print("Error publishing model: %s" % r.text)
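The new cache_size keyword ends up as the --prediction_cache_size flag passed to the query frontend above. Below is a minimal, hypothetical usage sketch: the import path is inferred from the file location, and the first constructor argument (the Docker host) plus the start() call follow the existing clipper_manager API and are assumptions here; only cache_size comes from this diff.

from clipper_admin.clipper_manager import Clipper

# Run the query frontend with a 64 MiB prediction cache instead of the
# 32 MiB default (cache_size is given in bytes).
clipper = Clipper("localhost", cache_size=64 * 1024 * 1024)
clipper.start()
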
4 changes: 2 additions & 2 deletions cmake/FindFolly.cmake
@@ -15,8 +15,8 @@ CMAKE_MINIMUM_REQUIRED(VERSION 2.8.7 FATAL_ERROR)

INCLUDE(FindPackageHandleStandardArgs)

FIND_LIBRARY(FOLLY_LIBRARY folly)
FIND_PATH(FOLLY_INCLUDE_DIR "folly/String.h")
FIND_LIBRARY(FOLLY_LIBRARY folly PATHS $ENV{FOLLY_ROOT}/lib $ENV{PATH})
FIND_PATH(FOLLY_INCLUDE_DIR "folly/String.h" HINTS $ENV{FOLLY_ROOT}/include)

SET(FOLLY_LIBRARIES ${FOLLY_LIBRARY})

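With this change, building against a Folly installed outside the default search paths is a matter of exporting FOLLY_ROOT as the install prefix before running cmake, so that $FOLLY_ROOT/lib and $FOLLY_ROOT/include are searched for the library and headers; the exact prefix is installation-specific.
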
35 changes: 35 additions & 0 deletions cmake/FindWangle.cmake
@@ -0,0 +1,35 @@
#
# - Try to find Facebook wangle library
# This will define
# WANGLE_FOUND
# WANGLE_INCLUDE_DIR
# WANGLE_LIBRARIES
#

find_package(Folly REQUIRED)

find_path(
WANGLE_INCLUDE_DIR
NAMES "wangle/channel/Pipeline.h"
HINTS
"/usr/local/facebook/include"
)

find_library(
WANGLE_LIBRARY
NAMES wangle
HINTS
"/usr/local/facebook/lib"
)

set(WANGLE_LIBRARIES ${WANGLE_LIBRARY} ${FOLLY_LIBRARIES})

include(FindPackageHandleStandardArgs)
find_package_handle_standard_args(
WANGLE DEFAULT_MSG WANGLE_INCLUDE_DIR WANGLE_LIBRARIES)

mark_as_advanced(WANGLE_INCLUDE_DIR WANGLE_LIBRARIES WANGLE_FOUND)

if(WANGLE_FOUND AND NOT WANGLE_FIND_QUIETLY)
message(STATUS "WANGLE: ${WANGLE_INCLUDE_DIR}")
endif()
64 changes: 64 additions & 0 deletions grpc_client_py/grpc_benchmark.py
@@ -0,0 +1,64 @@
from __future__ import print_function

import sys
import grpc
import logging
import numpy as np
import time

import clipper_frontend_pb2
import clipper_frontend_pb2_grpc

from datetime import datetime
from multiprocessing import Process

DATA_TYPE_BYTES = 0
DATA_TYPE_INTS = 1
DATA_TYPE_FLOATS = 2
DATA_TYPE_DOUBLES = 3
DATA_TYPE_STRINGS = 4

CIFAR_SIZE_DOUBLES = 384

input_type = "doubles"
app_name = "app1"
model_name = "m1"

def run(proc_num):
    channel = grpc.insecure_channel('localhost:1337')
    stub = clipper_frontend_pb2_grpc.PredictStub(channel)
    i = 0
    latency = 0
    file_name = "/tmp/bench_{}".format(proc_num)
    while True:
        begin = datetime.now()
        x = clipper_frontend_pb2.DoubleData(
            data=list(np.random.random(CIFAR_SIZE_DOUBLES)))
        req = clipper_frontend_pb2.PredictRequest(
            application=app_name, data_type=DATA_TYPE_DOUBLES, double_data=x)
        response = stub.Predict(req)
        print("Received response!")
        end = datetime.now()

        latency += (end - begin).total_seconds()

        if i > 0 and i % 100 == 0:
            # latency holds the total seconds spent on the current window of
            # requests, so queries per second is i / latency.
            print("Throughput: {} qps\n".format(float(i) / latency))
            i = 0
            latency = 0

        i += 1


if __name__ == "__main__":
    if len(sys.argv) < 2:
        raise RuntimeError("usage: grpc_benchmark.py <num_procs>")

    num_procs = int(sys.argv[1])

    processes = []

    for i in range(num_procs):
        # Each worker process gets its integer index as its argument.
        p = Process(target=run, args=(i,))
        p.start()
        processes.append(p)

    for p in processes:
        p.join()
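
As a usage note, the script is driven entirely by its single argument: a hypothetical invocation such as python grpc_benchmark.py 4 starts four client processes, each sending random 384-double requests to the frontend at localhost:1337 and printing a throughput line every 100 requests.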
1 change: 1 addition & 0 deletions src/CMakeLists.txt
@@ -6,6 +6,7 @@ add_subdirectory(libs)
add_subdirectory(libclipper)
add_subdirectory(rest_frontend)
add_subdirectory(rpc_frontend)
add_subdirectory(zmq_frontend)
add_subdirectory(management)
add_subdirectory(benchmarks)
add_subdirectory(container)
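
The zmq_frontend sources registered here are not included in this excerpt, so none of the wire details are visible. Purely to illustrate the kind of client the "zmq client" commits describe, here is a hypothetical pyzmq sketch; the port, frame layout, and reply format are all made up.

import struct

import numpy as np
import zmq

context = zmq.Context()
socket = context.socket(zmq.DEALER)
socket.connect("tcp://localhost:4455")  # hypothetical frontend address

app_name = b"app1"
request_id = 0
data = np.random.random(384)  # doubles, mirroring the gRPC benchmark above

# Hypothetical framing: [application name][request id][raw doubles]
socket.send_multipart([
    app_name,
    struct.pack("<I", request_id),
    data.astype(np.float64).tobytes(),
])

# Reply layout is likewise assumed; just count the frames that come back.
reply = socket.recv_multipart()
print("received {} frames".format(len(reply)))
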
3 changes: 2 additions & 1 deletion src/benchmarks/src/end_to_end_bench.cpp
@@ -188,8 +188,9 @@ void run_benchmark(std::unordered_map<std::string, std::string> &config) {
static_cast<char *>(malloc(DEFAULT_OUTPUT.size())), free);
memcpy(default_output_content.get(), DEFAULT_OUTPUT.data(),
DEFAULT_OUTPUT.size());

clipper::Output parsed_default_output(
std::make_tuple(default_output_content, 0, DEFAULT_OUTPUT.size()), {});
std::make_shared<StringOutput>(default_output_content, 0, DEFAULT_OUTPUT.size()), {});
auto init_state = p.init_state(parsed_default_output);
clipper::StateKey state_key{TEST_APPLICATION_LABEL, clipper::DEFAULT_USER_ID,
0};