Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Expose http configurations to clipper_admin #524

Merged
merged 7 commits into from
Apr 1, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 13 additions & 3 deletions clipper_admin/clipper_admin/clipper_admin.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,9 @@ def start_clipper(self,
frontend_exporter_image='{}/frontend-exporter:{}'.format(
__registry__, __version__),
cache_size=DEFAULT_PREDICTION_CACHE_SIZE_BYTES,
qf_http_thread_pool_size=1,
qf_http_timeout_request=5,
qf_http_timeout_content=300,
num_frontend_replicas=1):
"""Start a new Clipper cluster and connect to it.
Expand All @@ -114,9 +117,15 @@ def start_clipper(self,
The frontend exporter docker image to use. You can set this argument to specify
a custom build of the management frontend, but any customization should maintain API
compatibility and preserve the expected behavior of the system.
cache_size : int, optional
cache_size : int(optional)
The size of Clipper's prediction cache in bytes. Default cache size is 32 MiB.
num_frontend_replicas : int, option
qf_http_thread_pool_size : int(optional)
The size of thread pool created in query frontend for http serving.
qf_http_timeout_request : int(optional)
The timeout, in seconds, on request handling in the query frontend for http serving.
qf_http_timeout_content : int(optional)
The timeout, in seconds, on content handling in the query frontend for http serving.
num_frontend_replicas : int(optional)
The number of query frontends to deploy for fault tolerance and high availability.
Raises
Expand All @@ -126,7 +135,8 @@ def start_clipper(self,
try:
self.cm.start_clipper(query_frontend_image, mgmt_frontend_image,
frontend_exporter_image, cache_size,
num_frontend_replicas)
qf_http_thread_pool_size, qf_http_timeout_request,
qf_http_timeout_content, num_frontend_replicas)
except ClipperException as e:
self.logger.warning("Error starting Clipper: {}".format(e.msg))
raise e
Expand Down
3 changes: 2 additions & 1 deletion clipper_admin/clipper_admin/container_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,8 @@ class ContainerManager(object):
@abc.abstractmethod
def start_clipper(self, query_frontend_image, mgmt_frontend_image,
frontend_exporter_image, cache_size,
num_frontend_replicas):
qf_http_thread_pool_size, qf_http_timeout_request,
qf_http_timeout_content, num_frontend_replicas):
# NOTE: An implementation of this interface should be connected to a running
# Clipper instance when this method returns. ClipperConnection will not
# call ContainerManager.connect() separately after calling start_clipper(), so
Expand Down
15 changes: 11 additions & 4 deletions clipper_admin/clipper_admin/docker/docker_container_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,9 @@ def start_clipper(self,
mgmt_frontend_image,
frontend_exporter_image,
cache_size,
prometheus_version,
qf_http_thread_pool_size,
qf_http_timeout_request,
qf_http_timeout_content,
num_frontend_replicas=1):
if num_frontend_replicas != 1:
msg = "Docker container manager's query frontend scale-out " \
Expand Down Expand Up @@ -189,11 +191,16 @@ def start_clipper(self,
**self.extra_container_kwargs)

query_cmd = ("--redis_ip={redis_ip} --redis_port={redis_port} "
"--prediction_cache_size={cache_size}").format(
"--prediction_cache_size={cache_size} "
"--thread_pool_size={thread_pool_size} "
"--timeout_request={timeout_request} "
"--timeout_content={timeout_content}").format(
redis_ip=self.redis_ip,
redis_port=CLIPPER_INTERNAL_REDIS_PORT,
cache_size=cache_size)

cache_size=cache_size,
thread_pool_size=qf_http_thread_pool_size,
timeout_request=qf_http_timeout_request,
timeout_content=qf_http_timeout_content)
query_container_id = random.randint(0, 100000)
query_name = "query_frontend-{}".format(query_container_id)
self.clipper_query_port = find_unbound_port(self.clipper_query_port)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -177,12 +177,17 @@ def start_clipper(self,
mgmt_frontend_image,
frontend_exporter_image,
cache_size,
qf_http_thread_pool_size,
qf_http_timeout_request,
qf_http_timeout_content,
num_frontend_replicas=1):
self._start_redis()
self._start_mgmt(mgmt_frontend_image)
self.num_frontend_replicas = num_frontend_replicas
self._start_query(query_frontend_image, frontend_exporter_image,
cache_size, num_frontend_replicas)
cache_size, qf_http_thread_pool_size,
qf_http_timeout_request, qf_http_timeout_content,
num_frontend_replicas)
self._start_prometheus()
self.connect()

Expand Down Expand Up @@ -242,7 +247,8 @@ def _start_mgmt(self, mgmt_image):
body=mgmt_service_data, namespace=self.k8s_namespace)

def _start_query(self, query_image, frontend_exporter_image, cache_size,
num_replicas):
qf_http_thread_pool_size, qf_http_timeout_request,
qf_http_timeout_content, num_replicas):
for query_frontend_id in range(num_replicas):
with _pass_conflicts():
query_deployment_data = self._generate_config(
Expand All @@ -252,6 +258,9 @@ def _start_query(self, query_image, frontend_exporter_image, cache_size,
redis_service_host=self.redis_ip,
redis_service_port=self.redis_port,
cache_size=cache_size,
thread_pool_size=qf_http_thread_pool_size,
timeout_request=qf_http_timeout_request,
timeout_content=qf_http_timeout_content,
name='query-frontend-{}'.format(query_frontend_id),
id_label=str(query_frontend_id),
cluster_name=self.cluster_name)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@ spec:
- "--redis_ip={{ redis_service_host | default('$(REDIS_SERVICE_HOST)', true) }}"
- "--redis_port={{ redis_service_port | default('$(REDIS_SERVICE_PORT)', true) }}"
- "--prediction_cache_size={{ cache_size }}"
- "--thread_pool_size={{ thread_pool_size }}"
- "--timeout_request={{ timeout_request }}"
- "--timeout_content={{ timeout_content }}"
image: {{ image }}
imagePullPolicy: Always
name: query-frontend
Expand Down
9 changes: 7 additions & 2 deletions src/frontends/src/query_frontend.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -133,8 +133,13 @@ class AppMetrics {
template <class QP>
class RequestHandler {
public:
RequestHandler(std::string address, int portno)
: server_(address, portno), query_processor_() {
RequestHandler(std::string address, int portno,
int thread_pool_size = clipper::DEFAULT_THREAD_POOL_SIZE,
int timeout_request = clipper::DEFAULT_TIMEOUT_REQUEST,
int timeout_content = clipper::DEFAULT_TIMEOUT_CONTENT)
: server_(address, portno, thread_pool_size, timeout_request,
timeout_content),
query_processor_() {
clipper::Config& conf = clipper::get_config();
while (!redis_connection_.connect(conf.get_redis_address(),
conf.get_redis_port())) {
Expand Down
13 changes: 11 additions & 2 deletions src/frontends/src/query_frontend_main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,13 @@ int main(int argc, char* argv[]) {
("rpc_service_port", "RPCService's port",
cxxopts::value<int>()->default_value(std::to_string(clipper::DEFAULT_RPC_SERVICE_PORT)))
("prediction_cache_size", "Size of the prediction cache in bytes, excluding cache metadata",
cxxopts::value<long>()->default_value(std::to_string(clipper::DEFAULT_PREDICTION_CACHE_SIZE_BYTES)));
cxxopts::value<long>()->default_value(std::to_string(clipper::DEFAULT_PREDICTION_CACHE_SIZE_BYTES)))
("thread_pool_size", "thread pool size of server_http",
cxxopts::value<int>()->default_value(std::to_string(clipper::DEFAULT_THREAD_POOL_SIZE)))
("timeout_request", "timeout_request of server_http",
cxxopts::value<int>()->default_value(std::to_string(clipper::DEFAULT_TIMEOUT_REQUEST)))
("timeout_content", "timeout_content of server_http",
cxxopts::value<int>()->default_value(std::to_string(clipper::DEFAULT_TIMEOUT_CONTENT)));
// clang-format on
options.parse(argc, argv);

Expand All @@ -29,6 +35,9 @@ int main(int argc, char* argv[]) {
conf.ready();

query_frontend::RequestHandler<clipper::QueryProcessor> rh(
"0.0.0.0", clipper::QUERY_FRONTEND_PORT);
"0.0.0.0", clipper::QUERY_FRONTEND_PORT,
options["thread_pool_size"].as<int>(),
options["timeout_request"].as<int>(),
options["timeout_content"].as<int>());
rh.start_listening();
}
30 changes: 25 additions & 5 deletions src/frontends/src/query_frontend_tests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,11 @@ class QueryFrontendTest : public ::testing::Test {
std::shared_ptr<redox::Subscriber> subscriber_;

QueryFrontendTest()
: rh_("0.0.0.0", 1337),
: rh_("0.0.0.0",
QUERY_FRONTEND_PORT,
DEFAULT_THREAD_POOL_SIZE,
DEFAULT_TIMEOUT_REQUEST,
DEFAULT_TIMEOUT_CONTENT),
redis_(std::make_shared<redox::Redox>()),
subscriber_(std::make_shared<redox::Subscriber>()) {
Config& conf = get_config();
Expand Down Expand Up @@ -406,7 +410,11 @@ TEST_F(QueryFrontendTest, TestReadApplicationsAtStartup) {
ASSERT_TRUE(add_application(*redis_, name2, input_type2, policy2,
default_output2, latency_slo_micros2));

RequestHandler<MockQueryProcessor> rh2_("127.0.0.1", 1337);
RequestHandler<MockQueryProcessor> rh2_("127.0.0.1",
QUERY_FRONTEND_PORT,
DEFAULT_THREAD_POOL_SIZE,
DEFAULT_TIMEOUT_REQUEST,
DEFAULT_TIMEOUT_CONTENT);
size_t two_apps = rh2_.num_applications();
EXPECT_EQ(two_apps, (size_t)2);
}
Expand Down Expand Up @@ -435,7 +443,11 @@ TEST_F(QueryFrontendTest, TestReadModelsAtStartup) {
std::unordered_map<std::string, std::string> expected_models = {{"m", "2"},
{"n", "3"}};

RequestHandler<MockQueryProcessor> rh2_("127.0.0.1", 1337);
RequestHandler<MockQueryProcessor> rh2_("127.0.0.1",
QUERY_FRONTEND_PORT,
DEFAULT_THREAD_POOL_SIZE,
DEFAULT_TIMEOUT_REQUEST,
DEFAULT_TIMEOUT_CONTENT);
EXPECT_EQ(rh2_.get_current_model_versions(), expected_models);
}

Expand All @@ -458,7 +470,11 @@ TEST_F(QueryFrontendTest, TestReadModelLinksAtStartup) {

std::vector<std::string> expected_app1_linked_models = {"m1", "m2", "m3"};

RequestHandler<MockQueryProcessor> rh2_("127.0.0.1", 1337);
RequestHandler<MockQueryProcessor> rh2_("127.0.0.1",
QUERY_FRONTEND_PORT,
DEFAULT_THREAD_POOL_SIZE,
DEFAULT_TIMEOUT_REQUEST,
DEFAULT_TIMEOUT_CONTENT);

std::vector<std::string> app1_linked_models =
rh2_.get_linked_models_for_app(app_name_1);
Expand All @@ -482,7 +498,11 @@ TEST_F(QueryFrontendTest, TestReadInvalidModelVersionAtStartup) {
container_name, model_path, DEFAULT_BATCH_SIZE));
// Not setting the version number will cause get_current_model_version()
// to return -1, and the RequestHandler should then throw a runtime_error.
ASSERT_THROW(RequestHandler<QueryProcessor>("127.0.0.1", 1337),
ASSERT_THROW(RequestHandler<QueryProcessor>("127.0.0.1",
QUERY_FRONTEND_PORT,
DEFAULT_THREAD_POOL_SIZE,
DEFAULT_TIMEOUT_REQUEST,
DEFAULT_TIMEOUT_CONTENT),
std::runtime_error);
}

Expand Down
3 changes: 3 additions & 0 deletions src/libclipper/include/clipper/config.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ const std::string DEFAULT_REDIS_ADDRESS("localhost");
constexpr int DEFAULT_REDIS_PORT = 6379;
constexpr int DEFAULT_RPC_SERVICE_PORT = 7000;
constexpr long DEFAULT_PREDICTION_CACHE_SIZE_BYTES = 33554432; // 32 MiB
constexpr int DEFAULT_THREAD_POOL_SIZE = 1;
constexpr int DEFAULT_TIMEOUT_REQUEST = 5;
constexpr int DEFAULT_TIMEOUT_CONTENT = 300;

/**
* Globally readable constant configuration.
Expand Down
17 changes: 8 additions & 9 deletions src/libs/httpserver/server_http.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -183,10 +183,9 @@ class ServerBase {
class Config {
friend class ServerBase<socket_type>;

Config(unsigned short port): port(port) {}
Config(std::string address, unsigned short port)
: port(port),
address(address) {}
Config(std::string address, unsigned short port): address(address), port(port) {}
Config(std::string address, unsigned short port, size_t thread_pool_size, size_t timeout_request, size_t timeout_content):
address(address), port(port), thread_pool_size(thread_pool_size), timeout_request(timeout_request), timeout_content(timeout_content) {}

public:
/// Port number to use. Defaults to 80 for HTTP and 443 for HTTPS.
Expand Down Expand Up @@ -406,10 +405,9 @@ class ServerBase {
std::vector<std::thread> threads;
std::mutex mu;

ServerBase(unsigned short port)
: config(port){}
ServerBase(std::string address, unsigned short port)
: config(address, port) {}
ServerBase(std::string address, unsigned short port) : config(address, port){}
ServerBase(std::string address, unsigned short port, size_t thread_pool_size, size_t timeout_request, size_t timeout_content)
: config(address, port, thread_pool_size, timeout_request, timeout_content) {}

virtual void accept() = 0;

Expand Down Expand Up @@ -624,8 +622,9 @@ typedef boost::asio::ip::tcp::socket HTTP;
template <>
class Server<HTTP> : public ServerBase<HTTP> {
public:
Server(unsigned short port) : ServerBase<HTTP>::ServerBase(port) {}
Server(std::string address, unsigned short port) : ServerBase<HTTP>::ServerBase(address, port) {}
Server(std::string address, unsigned short port, size_t thread_pool_size, size_t timeout_request, size_t timeout_content) :
ServerBase<HTTP>::ServerBase(address, port, thread_pool_size, timeout_request, timeout_content) {}

protected:
void accept() {
Expand Down
3 changes: 2 additions & 1 deletion src/management/src/management_frontend.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,8 @@ std::string json_error_msg(const std::string& exception_msg,

class RequestHandler {
public:
RequestHandler(int portno) : server_(portno), state_db_{} {
RequestHandler(std::string address, int portno)
: server_(address, portno), state_db_{} {
clipper::Config& conf = clipper::get_config();
while (!redis_connection_.connect(conf.get_redis_address(),
conf.get_redis_port())) {
Expand Down
2 changes: 1 addition & 1 deletion src/management/src/management_frontend_main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,6 @@ int main(int argc, char* argv[]) {
conf.set_redis_address(options["redis_ip"].as<std::string>());
conf.set_redis_port(options["redis_port"].as<int>());
conf.ready();
management::RequestHandler rh(clipper::MANAGEMENT_FRONTEND_PORT);
management::RequestHandler rh("0.0.0.0", clipper::MANAGEMENT_FRONTEND_PORT);
rh.start_listening();
}
2 changes: 1 addition & 1 deletion src/management/src/management_frontend_tests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ namespace {
class ManagementFrontendTest : public ::testing::Test {
public:
ManagementFrontendTest()
: rh_(MANAGEMENT_FRONTEND_PORT),
: rh_("0.0.0.0", MANAGEMENT_FRONTEND_PORT),
redis_(std::make_shared<redox::Redox>()),
subscriber_(std::make_shared<redox::Subscriber>()) {
Config& conf = get_config();
Expand Down