Skip to content

Commit

Permalink
Expose http configurations to clipper_admin (ucbrise#524)
Browse files Browse the repository at this point in the history
* Expose http configurations to clipper_admin

* solved: ucbrise#523
* exposed configurations
  * qf_http_thread_pool_size
    * int(optional)
    * The size of thread pool created in query frontend for http serving.
  * qf_http_timeout_request
    * int(optional)
    * The seconds of timeout on request handling in query frontend for http serving.
  * qf_http_timeout_content
    * int(optional)
    * The seconds of timeout on content handling in query frontend for http serving.

* Fix test cases

* Fix DockerContainerManager
  • Loading branch information
Sungjun.Kim authored and rkooo567 committed Apr 27, 2019
1 parent beda867 commit deaf9a6
Show file tree
Hide file tree
Showing 13 changed files with 98 additions and 31 deletions.
16 changes: 13 additions & 3 deletions clipper_admin/clipper_admin/clipper_admin.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,9 @@ def start_clipper(self,
frontend_exporter_image='{}/frontend-exporter:{}'.format(
__registry__, __version__),
cache_size=DEFAULT_PREDICTION_CACHE_SIZE_BYTES,
qf_http_thread_pool_size=1,
qf_http_timeout_request=5,
qf_http_timeout_content=300,
num_frontend_replicas=1):
"""Start a new Clipper cluster and connect to it.
Expand All @@ -114,9 +117,15 @@ def start_clipper(self,
The frontend exporter docker image to use. You can set this argument to specify
a custom build of the management frontend, but any customization should maintain API
compability and preserve the expected behavior of the system.
cache_size : int, optional
cache_size : int(optional)
The size of Clipper's prediction cache in bytes. Default cache size is 32 MiB.
num_frontend_replicas : int, option
qf_http_thread_pool_size : int(optional)
The size of thread pool created in query frontend for http serving.
qf_http_timeout_request : int(optional)
The seconds of timeout on request handling in query frontend for http serving..
qf_http_timeout_content : int(optional)
The seconds of timeout on content handling in query frontend for http serving..
num_frontend_replicas : int(optional)
The number of query frontend to deploy for fault tolerance and high availability.
Raises
Expand All @@ -126,7 +135,8 @@ def start_clipper(self,
try:
self.cm.start_clipper(query_frontend_image, mgmt_frontend_image,
frontend_exporter_image, cache_size,
num_frontend_replicas)
qf_http_thread_pool_size, qf_http_timeout_request,
qf_http_timeout_content, num_frontend_replicas)
except ClipperException as e:
self.logger.warning("Error starting Clipper: {}".format(e.msg))
raise e
Expand Down
3 changes: 2 additions & 1 deletion clipper_admin/clipper_admin/container_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,8 @@ class ContainerManager(object):
@abc.abstractmethod
def start_clipper(self, query_frontend_image, mgmt_frontend_image,
frontend_exporter_image, cache_size,
num_frontend_replicas):
qf_http_thread_pool_size, qf_http_timeout_request,
qf_http_timeout_content, num_frontend_replicas):
# NOTE: An implementation of this interface should be connected to a running
# Clipper instance when this method returns. ClipperConnection will not
# call ContainerManager.connect() separately after calling start_clipper(), so
Expand Down
15 changes: 11 additions & 4 deletions clipper_admin/clipper_admin/docker/docker_container_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,9 @@ def start_clipper(self,
mgmt_frontend_image,
frontend_exporter_image,
cache_size,
prometheus_version,
qf_http_thread_pool_size,
qf_http_timeout_request,
qf_http_timeout_content,
num_frontend_replicas=1):
if num_frontend_replicas != 1:
msg = "Docker container manager's query frontend scale-out " \
Expand Down Expand Up @@ -223,11 +225,16 @@ def start_clipper(self,

# query frontend
query_cmd = ("--redis_ip={redis_ip} --redis_port={redis_port} "
"--prediction_cache_size={cache_size}").format(
"--prediction_cache_size={cache_size} "
"--thread_pool_size={thread_pool_size} "
"--timeout_request={timeout_request} "
"--timeout_content={timeout_content}").format(
redis_ip=self.redis_ip,
redis_port=CLIPPER_INTERNAL_REDIS_PORT,
cache_size=cache_size)

cache_size=cache_size,
thread_pool_size=qf_http_thread_pool_size,
timeout_request=qf_http_timeout_request,
timeout_content=qf_http_timeout_content)
query_container_id = random.randint(0, 100000)
query_name = "query_frontend-{}".format(query_container_id)
self.clipper_query_port = find_unbound_port(self.clipper_query_port)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -177,12 +177,17 @@ def start_clipper(self,
mgmt_frontend_image,
frontend_exporter_image,
cache_size,
qf_http_thread_pool_size,
qf_http_timeout_request,
qf_http_timeout_content,
num_frontend_replicas=1):
self._start_redis()
self._start_mgmt(mgmt_frontend_image)
self.num_frontend_replicas = num_frontend_replicas
self._start_query(query_frontend_image, frontend_exporter_image,
cache_size, num_frontend_replicas)
cache_size, qf_http_thread_pool_size,
qf_http_timeout_request, qf_http_timeout_content,
num_frontend_replicas)
self._start_prometheus()
self.connect()

Expand Down Expand Up @@ -242,7 +247,8 @@ def _start_mgmt(self, mgmt_image):
body=mgmt_service_data, namespace=self.k8s_namespace)

def _start_query(self, query_image, frontend_exporter_image, cache_size,
num_replicas):
qf_http_thread_pool_size, qf_http_timeout_request,
qf_http_timeout_content, num_replicas):
for query_frontend_id in range(num_replicas):
with _pass_conflicts():
query_deployment_data = self._generate_config(
Expand All @@ -252,6 +258,9 @@ def _start_query(self, query_image, frontend_exporter_image, cache_size,
redis_service_host=self.redis_ip,
redis_service_port=self.redis_port,
cache_size=cache_size,
thread_pool_size=qf_http_thread_pool_size,
timeout_request=qf_http_timeout_request,
timeout_content=qf_http_timeout_content,
name='query-frontend-{}'.format(query_frontend_id),
id_label=str(query_frontend_id),
cluster_name=self.cluster_name)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@ spec:
- "--redis_ip={{ redis_service_host | default('$(REDIS_SERVICE_HOST)', true) }}"
- "--redis_port={{ redis_service_port | default('$(REDIS_SERVICE_PORT)', true) }}"
- "--prediction_cache_size={{ cache_size }}"
- "--thread_pool_size={{ thread_pool_size }}"
- "--timeout_request={{ timeout_request }}"
- "--timeout_content={{ timeout_content }}"
image: {{ image }}
imagePullPolicy: Always
name: query-frontend
Expand Down
9 changes: 7 additions & 2 deletions src/frontends/src/query_frontend.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -133,8 +133,13 @@ class AppMetrics {
template <class QP>
class RequestHandler {
public:
RequestHandler(std::string address, int portno)
: server_(address, portno), query_processor_() {
RequestHandler(std::string address, int portno,
int thread_pool_size = clipper::DEFAULT_THREAD_POOL_SIZE,
int timeout_request = clipper::DEFAULT_TIMEOUT_REQUEST,
int timeout_content = clipper::DEFAULT_TIMEOUT_CONTENT)
: server_(address, portno, thread_pool_size, timeout_request,
timeout_content),
query_processor_() {
clipper::Config& conf = clipper::get_config();
while (!redis_connection_.connect(conf.get_redis_address(),
conf.get_redis_port())) {
Expand Down
13 changes: 11 additions & 2 deletions src/frontends/src/query_frontend_main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,13 @@ int main(int argc, char* argv[]) {
("rpc_service_port", "RPCService's port",
cxxopts::value<int>()->default_value(std::to_string(clipper::DEFAULT_RPC_SERVICE_PORT)))
("prediction_cache_size", "Size of the prediction cache in bytes, excluding cache metadata",
cxxopts::value<long>()->default_value(std::to_string(clipper::DEFAULT_PREDICTION_CACHE_SIZE_BYTES)));
cxxopts::value<long>()->default_value(std::to_string(clipper::DEFAULT_PREDICTION_CACHE_SIZE_BYTES)))
("thread_pool_size", "thread pool size of server_http",
cxxopts::value<int>()->default_value(std::to_string(clipper::DEFAULT_THREAD_POOL_SIZE)))
("timeout_request", "timeout_request of server_http",
cxxopts::value<int>()->default_value(std::to_string(clipper::DEFAULT_TIMEOUT_REQUEST)))
("timeout_content", "timeout_content of server_http",
cxxopts::value<int>()->default_value(std::to_string(clipper::DEFAULT_TIMEOUT_CONTENT)));
// clang-format on
options.parse(argc, argv);

Expand All @@ -29,6 +35,9 @@ int main(int argc, char* argv[]) {
conf.ready();

query_frontend::RequestHandler<clipper::QueryProcessor> rh(
"0.0.0.0", clipper::QUERY_FRONTEND_PORT);
"0.0.0.0", clipper::QUERY_FRONTEND_PORT,
options["thread_pool_size"].as<int>(),
options["timeout_request"].as<int>(),
options["timeout_content"].as<int>());
rh.start_listening();
}
30 changes: 25 additions & 5 deletions src/frontends/src/query_frontend_tests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,11 @@ class QueryFrontendTest : public ::testing::Test {
std::shared_ptr<redox::Subscriber> subscriber_;

QueryFrontendTest()
: rh_("0.0.0.0", 1337),
: rh_("0.0.0.0",
QUERY_FRONTEND_PORT,
DEFAULT_THREAD_POOL_SIZE,
DEFAULT_TIMEOUT_REQUEST,
DEFAULT_TIMEOUT_CONTENT),
redis_(std::make_shared<redox::Redox>()),
subscriber_(std::make_shared<redox::Subscriber>()) {
Config& conf = get_config();
Expand Down Expand Up @@ -406,7 +410,11 @@ TEST_F(QueryFrontendTest, TestReadApplicationsAtStartup) {
ASSERT_TRUE(add_application(*redis_, name2, input_type2, policy2,
default_output2, latency_slo_micros2));

RequestHandler<MockQueryProcessor> rh2_("127.0.0.1", 1337);
RequestHandler<MockQueryProcessor> rh2_("127.0.0.1",
QUERY_FRONTEND_PORT,
DEFAULT_THREAD_POOL_SIZE,
DEFAULT_TIMEOUT_REQUEST,
DEFAULT_TIMEOUT_CONTENT);
size_t two_apps = rh2_.num_applications();
EXPECT_EQ(two_apps, (size_t)2);
}
Expand Down Expand Up @@ -435,7 +443,11 @@ TEST_F(QueryFrontendTest, TestReadModelsAtStartup) {
std::unordered_map<std::string, std::string> expected_models = {{"m", "2"},
{"n", "3"}};

RequestHandler<MockQueryProcessor> rh2_("127.0.0.1", 1337);
RequestHandler<MockQueryProcessor> rh2_("127.0.0.1",
QUERY_FRONTEND_PORT,
DEFAULT_THREAD_POOL_SIZE,
DEFAULT_TIMEOUT_REQUEST,
DEFAULT_TIMEOUT_CONTENT);
EXPECT_EQ(rh2_.get_current_model_versions(), expected_models);
}

Expand All @@ -458,7 +470,11 @@ TEST_F(QueryFrontendTest, TestReadModelLinksAtStartup) {

std::vector<std::string> expected_app1_linked_models = {"m1", "m2", "m3"};

RequestHandler<MockQueryProcessor> rh2_("127.0.0.1", 1337);
RequestHandler<MockQueryProcessor> rh2_("127.0.0.1",
QUERY_FRONTEND_PORT,
DEFAULT_THREAD_POOL_SIZE,
DEFAULT_TIMEOUT_REQUEST,
DEFAULT_TIMEOUT_CONTENT);

std::vector<std::string> app1_linked_models =
rh2_.get_linked_models_for_app(app_name_1);
Expand All @@ -482,7 +498,11 @@ TEST_F(QueryFrontendTest, TestReadInvalidModelVersionAtStartup) {
container_name, model_path, DEFAULT_BATCH_SIZE));
// Not setting the version number will cause get_current_model_version()
// to return -1, and the RequestHandler should then throw a runtime_error.
ASSERT_THROW(RequestHandler<QueryProcessor>("127.0.0.1", 1337),
ASSERT_THROW(RequestHandler<QueryProcessor>("127.0.0.1",
QUERY_FRONTEND_PORT,
DEFAULT_THREAD_POOL_SIZE,
DEFAULT_TIMEOUT_REQUEST,
DEFAULT_TIMEOUT_CONTENT),
std::runtime_error);
}

Expand Down
3 changes: 3 additions & 0 deletions src/libclipper/include/clipper/config.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ const std::string DEFAULT_REDIS_ADDRESS("localhost");
constexpr int DEFAULT_REDIS_PORT = 6379;
constexpr int DEFAULT_RPC_SERVICE_PORT = 7000;
constexpr long DEFAULT_PREDICTION_CACHE_SIZE_BYTES = 33554432; // 32 MiB
constexpr int DEFAULT_THREAD_POOL_SIZE = 1;
constexpr int DEFAULT_TIMEOUT_REQUEST = 5;
constexpr int DEFAULT_TIMEOUT_CONTENT = 300;

/**
* Globally readable constant configuration.
Expand Down
17 changes: 8 additions & 9 deletions src/libs/httpserver/server_http.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -183,10 +183,9 @@ class ServerBase {
class Config {
friend class ServerBase<socket_type>;

Config(unsigned short port): port(port) {}
Config(std::string address, unsigned short port)
: port(port),
address(address) {}
Config(std::string address, unsigned short port): address(address), port(port) {}
Config(std::string address, unsigned short port, size_t thread_pool_size, size_t timeout_request, size_t timeout_content):
address(address), port(port), thread_pool_size(thread_pool_size), timeout_request(timeout_request), timeout_content(timeout_content) {}

public:
/// Port number to use. Defaults to 80 for HTTP and 443 for HTTPS.
Expand Down Expand Up @@ -406,10 +405,9 @@ class ServerBase {
std::vector<std::thread> threads;
std::mutex mu;

ServerBase(unsigned short port)
: config(port){}
ServerBase(std::string address, unsigned short port)
: config(address, port) {}
ServerBase(std::string address, unsigned short port) : config(address, port){}
ServerBase(std::string address, unsigned short port, size_t thread_pool_size, size_t timeout_request, size_t timeout_content)
: config(address, port, thread_pool_size, timeout_request, timeout_content) {}

virtual void accept() = 0;

Expand Down Expand Up @@ -624,8 +622,9 @@ typedef boost::asio::ip::tcp::socket HTTP;
template <>
class Server<HTTP> : public ServerBase<HTTP> {
public:
Server(unsigned short port) : ServerBase<HTTP>::ServerBase(port) {}
Server(std::string address, unsigned short port) : ServerBase<HTTP>::ServerBase(address, port) {}
Server(std::string address, unsigned short port, size_t thread_pool_size, size_t timeout_request, size_t timeout_content) :
ServerBase<HTTP>::ServerBase(address, port, thread_pool_size, timeout_request, timeout_content) {}

protected:
void accept() {
Expand Down
3 changes: 2 additions & 1 deletion src/management/src/management_frontend.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,8 @@ std::string json_error_msg(const std::string& exception_msg,

class RequestHandler {
public:
RequestHandler(int portno) : server_(portno), state_db_{} {
RequestHandler(std::string address, int portno)
: server_(address, portno), state_db_{} {
clipper::Config& conf = clipper::get_config();
while (!redis_connection_.connect(conf.get_redis_address(),
conf.get_redis_port())) {
Expand Down
2 changes: 1 addition & 1 deletion src/management/src/management_frontend_main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,6 @@ int main(int argc, char* argv[]) {
conf.set_redis_address(options["redis_ip"].as<std::string>());
conf.set_redis_port(options["redis_port"].as<int>());
conf.ready();
management::RequestHandler rh(clipper::MANAGEMENT_FRONTEND_PORT);
management::RequestHandler rh("0.0.0.0", clipper::MANAGEMENT_FRONTEND_PORT);
rh.start_listening();
}
2 changes: 1 addition & 1 deletion src/management/src/management_frontend_tests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ namespace {
class ManagementFrontendTest : public ::testing::Test {
public:
ManagementFrontendTest()
: rh_(MANAGEMENT_FRONTEND_PORT),
: rh_("0.0.0.0", MANAGEMENT_FRONTEND_PORT),
redis_(std::make_shared<redox::Redox>()),
subscriber_(std::make_shared<redox::Subscriber>()) {
Config& conf = get_config();
Expand Down

0 comments on commit deaf9a6

Please sign in to comment.