Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fluentd Logging System Part 1 #652

Merged
merged 32 commits into from
Apr 30, 2019
Merged
Show file tree
Hide file tree
Changes from 23 commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
aa6e1db
Update Basic example README.md
rkooo567 Nov 24, 2018
f43fa27
Merge branch 'develop' into develop
simon-mo Jan 1, 2019
c43e313
Merge branch 'develop' into develop
simon-mo Jan 2, 2019
a05443a
Added a basic fluentd support
rkooo567 Feb 5, 2019
4525f96
Merge remote-tracking branch 'clipper/develop' into fluentd
rkooo567 Feb 10, 2019
d7bd73a
log_config option added
rkooo567 Feb 10, 2019
f0dae98
fluentd instance is now running
rkooo567 Feb 12, 2019
00c2d91
Now we can mount a fluentd conf file to a docker.
rkooo567 Mar 10, 2019
3b5b1ca
Changed pydocstring style
rkooo567 Mar 11, 2019
b255c4b
Cleaned up some styles
rkooo567 Mar 19, 2019
90d480a
Created an integration test
rkooo567 Mar 21, 2019
bb6a990
Cleaned up some part of code
rkooo567 Mar 21, 2019
09fc089
Refactor done
rkooo567 Mar 21, 2019
0418be7
Cleaning up. Refactoring tests to unittest style
rkooo567 Mar 22, 2019
dff44da
Merge branch 'develop' into fluentd
simon-mo Mar 22, 2019
7bf1f1b
Fixed errors found in CI. 1. Added __init__.py in fluentd folder. 2. …
rkooo567 Mar 23, 2019
132baa7
Merge branch 'develop' into fluentd
simon-mo Mar 28, 2019
a0c8501
Fixed port duplicated error. Internal and external port setting was w…
rkooo567 Mar 28, 2019
0f5af00
Merge branch 'fluentd' of https://github.com/rkooo567/clipper into fl…
rkooo567 Mar 28, 2019
43cfb4b
Merge branch 'develop' into fluentd
simon-mo Mar 30, 2019
80db695
Merge branch 'develop' into fluentd
simon-mo Mar 31, 2019
3500eff
Merge branch 'develop' into fluentd
rkooo567 Apr 1, 2019
b546b16
Merge branch 'develop' into fluentd
rkooo567 Apr 4, 2019
18bef18
Changed is_valid_state logic
rkooo567 Apr 5, 2019
2d3657a
Merge branch 'fluentd' of https://github.com/rkooo567/clipper into fl…
rkooo567 Apr 5, 2019
d9cd323
Remove fluentd test to see if it happens due to the test
rkooo567 Apr 5, 2019
008d0ac
Clean some part of code and add some strings for prettifying test logs
rkooo567 Apr 9, 2019
9aec964
Test with only one broken test to see if parallelization is an issue
rkooo567 Apr 9, 2019
5c9f25f
It will probably fix the bug
rkooo567 Apr 9, 2019
2ea53ce
Added a submodule logging in setup.py to resolve import error
rkooo567 Apr 10, 2019
6f5673a
changed some code based on code review
rkooo567 Apr 27, 2019
0977ad5
Merge branch 'develop' into fluentd
rkooo567 Apr 29, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions bin/shipyard/clipper_test.cfg.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

DOCKER_INTEGRATION_TESTS = {
"admin_unit_test": "python /clipper/integration-tests/clipper_admin_tests.py",
"fluentd": "python /clipper/integration-tests/clipper_fluentd_logging_docker.py",
"many_apps_many_models": "python /clipper/integration-tests/many_apps_many_models.py",
"pyspark": "python /clipper/integration-tests/deploy_pyspark_models.py",
"pyspark_pipeline": "python /clipper/integration-tests/deploy_pyspark_pipeline_models.py",
Expand Down
5 changes: 4 additions & 1 deletion clipper_admin/clipper_admin/container_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
CLIPPER_INTERNAL_RPC_PORT = 7000
CLIPPER_INTERNAL_METRIC_PORT = 1390
CLIPPER_INTERNAL_REDIS_PORT = 6379
CLIPPER_INTERNAL_FLUENTD_PORT = 24224

CLIPPER_DOCKER_LABEL = "ai.clipper.container.label"
CLIPPER_NAME_LABEL = "ai.clipper.name"
Expand All @@ -22,9 +23,11 @@
'query_rest': 'ai.clipper.query_frontend.query.port',
'query_rpc': 'ai.clipper.query_frontend.rpc.port',
'management': 'ai.clipper.management.port',
'metric': 'ai.clipper.metric.port'
'metric': 'ai.clipper.metric.port',
'fluentd': 'ai.clipper.fluentd.port'
}
CLIPPER_METRIC_CONFIG_LABEL = 'ai.clipper.metric.config'
CLIPPER_FLUENTD_CONFIG_LABEL = 'ai.clipper.fluentd.config'

# NOTE: we use '_' as the delimiter because kubernetes allows the use
# '_' in labels but not in deployment names. We force model names and
Expand Down
116 changes: 84 additions & 32 deletions clipper_admin/clipper_admin/docker/docker_container_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,30 +5,35 @@
import docker
import logging
import os
import sys
import random
import time
import json
import tempfile
from ..container_manager import (
create_model_container_label, parse_model_container_label,
ContainerManager, CLIPPER_DOCKER_LABEL, CLIPPER_MODEL_CONTAINER_LABEL,
CLIPPER_QUERY_FRONTEND_CONTAINER_LABEL,
CLIPPER_MGMT_FRONTEND_CONTAINER_LABEL, CLIPPER_INTERNAL_RPC_PORT,
CLIPPER_INTERNAL_QUERY_PORT, CLIPPER_INTERNAL_MANAGEMENT_PORT,
CLIPPER_INTERNAL_MANAGEMENT_PORT,
CLIPPER_INTERNAL_METRIC_PORT, CLIPPER_INTERNAL_REDIS_PORT,
CLIPPER_DOCKER_PORT_LABELS, CLIPPER_METRIC_CONFIG_LABEL, ClusterAdapter)
from ..exceptions import ClipperException
CLIPPER_DOCKER_PORT_LABELS, CLIPPER_METRIC_CONFIG_LABEL, ClusterAdapter,
CLIPPER_FLUENTD_CONFIG_LABEL)
from requests.exceptions import ConnectionError
from .docker_metric_utils import *
from clipper_admin.docker.logging.docker_logging_utils import (
get_logs_from_containers,
get_default_log_config
)
from clipper_admin.docker.logging.fluentd.fluentd import Fluentd

logger = logging.getLogger(__name__)


class DockerContainerManager(ContainerManager):
# Logging-TODO Add SQLITE support
rkooo567 marked this conversation as resolved.
Show resolved Hide resolved
def __init__(self,
cluster_name="default-cluster",
docker_ip_address="localhost",
use_centralized_log=False,
fluentd_port=24224,
rkooo567 marked this conversation as resolved.
Show resolved Hide resolved
clipper_query_port=1337,
clipper_management_port=1338,
clipper_rpc_port=7000,
Expand All @@ -47,6 +52,10 @@ def __init__(self,
The public hostname or IP address at which the Clipper Docker
containers can be accessed via their exposed ports. This should almost always
be "localhost". Only change if you know what you're doing!
use_centralized_log: bool, optional
If it is True, Clipper sets up Fluentd and DB (Currently SQlite) to centralize logs
fluentd_port : int, optional
The port on which the fluentd logging driver should listen to centralize logs.
clipper_query_port : int, optional
The port on which the query frontend should listen for incoming prediction requests.
clipper_management_port : int, optional
Expand Down Expand Up @@ -80,6 +89,7 @@ def __init__(self,
self.external_redis = True
self.redis_port = redis_port
self.prometheus_port = prometheus_port
self.centralize_log = use_centralized_log
if docker_network is "host":
raise ClipperException(
"DockerContainerManager does not support running Clipper on the "
Expand Down Expand Up @@ -113,6 +123,21 @@ def __init__(self,
'cluster_name': self.cluster_identifier
})

# Setting Docker cluster logging.
# Logging-TODO Add SQLITE support
rkooo567 marked this conversation as resolved.
Show resolved Hide resolved
self.logging_system = Fluentd
self.log_config = get_default_log_config()
self.logging_system_instance = None

if self.centralize_log:
self.logging_system_instance = self.logging_system(
self.logger,
self.cluster_name,
self.docker_client,
port=find_unbound_port(fluentd_port)
)
self.log_config = self.logging_system_instance.get_log_config()

def start_clipper(self,
query_frontend_image,
mgmt_frontend_image,
Expand Down Expand Up @@ -152,6 +177,11 @@ def start_clipper(self,
"Please use ClipperConnection.connect() to connect to it.".
format(self.cluster_name))

if self.centralize_log:
# Logging-TODO Initialize SQLite Logging DB
self.logging_system_instance.start(self.common_labels, self.extra_container_kwargs)

# Redis for cluster configuration
if not self.external_redis:
self.logger.info("Starting managed Redis instance in Docker")
self.redis_port = find_unbound_port(self.redis_port)
Expand All @@ -161,6 +191,7 @@ def start_clipper(self,
redis_container = self.docker_client.containers.run(
'redis:alpine',
"redis-server --port %s" % CLIPPER_INTERNAL_REDIS_PORT,
log_config=self.log_config,
name="redis-{}".format(random.randint(
0, 100000)), # generate a random name
ports={
Expand All @@ -170,6 +201,7 @@ def start_clipper(self,
**self.extra_container_kwargs)
self.redis_ip = redis_container.name

# frontend management
mgmt_cmd = "--redis_ip={redis_ip} --redis_port={redis_port}".format(
redis_ip=self.redis_ip, redis_port=CLIPPER_INTERNAL_REDIS_PORT)
self.clipper_management_port = find_unbound_port(
Expand All @@ -181,6 +213,7 @@ def start_clipper(self,
self.docker_client.containers.run(
mgmt_frontend_image,
mgmt_cmd,
log_config=self.log_config,
name="mgmt_frontend-{}".format(random.randint(
0, 100000)), # generate a random name
ports={
Expand All @@ -190,6 +223,7 @@ def start_clipper(self,
labels=mgmt_labels,
**self.extra_container_kwargs)

# query frontend
query_cmd = ("--redis_ip={redis_ip} --redis_port={redis_port} "
"--prediction_cache_size={cache_size} "
"--thread_pool_size={thread_pool_size} "
Expand All @@ -214,6 +248,7 @@ def start_clipper(self,
self.docker_client.containers.run(
query_frontend_image,
query_cmd,
log_config=self.log_config,
name=query_name,
ports={
'%s/tcp' % CLIPPER_INTERNAL_QUERY_PORT:
Expand All @@ -229,7 +264,7 @@ def start_clipper(self,
run_query_frontend_metric_image(
query_frontend_metric_name, self.docker_client, query_name,
frontend_exporter_image, self.common_labels,
self.extra_container_kwargs)
self.log_config, self.extra_container_kwargs)

self.prom_config_path = tempfile.NamedTemporaryFile(
'w', suffix='.yml', delete=False).name
Expand All @@ -247,7 +282,7 @@ def start_clipper(self,
metric_labels[CLIPPER_METRIC_CONFIG_LABEL] = self.prom_config_path
run_metric_image(self.docker_client, metric_labels,
self.prometheus_port, self.prom_config_path,
self.extra_container_kwargs)
self.log_config, self.extra_container_kwargs)

self.connect()

Expand Down Expand Up @@ -278,6 +313,19 @@ def connect(self):
self.prometheus_port = all_labels[CLIPPER_DOCKER_PORT_LABELS['metric']]
self.prom_config_path = all_labels[CLIPPER_METRIC_CONFIG_LABEL]

if self._is_valid_logging_state_to_connect(all_labels):
self.centralize_log= True
self.logging_system_instance = \
self.logging_system(
self.logger,
self.cluster_name,
self.docker_client,
port=all_labels[CLIPPER_DOCKER_PORT_LABELS['fluentd']],
conf_path=all_labels[CLIPPER_FLUENTD_CONFIG_LABEL]
)
self.log_config = self.logging_system_instance.get_log_config()
# Logging-TODO Add a Sqlite support

def deploy_model(self, name, version, input_type, image, num_replicas=1):
# Parameters
# ----------
Expand Down Expand Up @@ -335,11 +383,13 @@ def _add_replica(self, name, version, input_type, image):

model_container_name = model_container_label + '-{}'.format(
random.randint(0, 100000))

self.docker_client.containers.run(
image,
name=model_container_name,
environment=env_vars,
labels=labels,
log_config=self.log_config,
**self.extra_container_kwargs)

# Metric Section
Expand Down Expand Up @@ -396,30 +446,10 @@ def set_num_replicas(self, name, version, input_type, image, num_replicas):
self.prometheus_port)

def get_logs(self, logging_dir):
containers = self.docker_client.containers.list(
filters={
"label":
"{key}={val}".format(
key=CLIPPER_DOCKER_LABEL, val=self.cluster_name)
})
logging_dir = os.path.abspath(os.path.expanduser(logging_dir))

log_files = []
if not os.path.exists(logging_dir):
os.makedirs(logging_dir)
self.logger.info("Created logging directory: %s" % logging_dir)
for c in containers:
log_file_name = "image_{image}:container_{id}.log".format(
image=c.image.short_id, id=c.short_id)
log_file = os.path.join(logging_dir, log_file_name)
if sys.version_info < (3, 0):
with open(log_file, "w") as lf:
lf.write(c.logs(stdout=True, stderr=True))
else:
with open(log_file, "wb") as lf:
lf.write(c.logs(stdout=True, stderr=True))
log_files.append(log_file)
return log_files
if self.centralize_log:
return self.logging_system_instance.get_logs(logging_dir)
else:
return get_logs_from_containers(self, logging_dir)

def stop_models(self, models):
containers = self.docker_client.containers.list(
Expand Down Expand Up @@ -459,6 +489,26 @@ def stop_all(self, graceful=True):
else:
c.kill()

def _is_valid_logging_state_to_connect(self, all_labels):
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will change the logic of this part. I will make new clipper connection to turn on use_log_centralization flag if there is a fluentd instance running in a cluster regardless of use_log_centralization flag of the current DockerContainerManager instance. As you can see the current logic is that if the flag is different from the cluster context (meaning if use_centralization is on but there's no fluentd instance), it will cause an error. I will change this to

  1. If Fluentd instance is within a cluster and the current flag is off -> current flag is on.
  2. If current flag is on but there's no fluentd instance running -> ClipperException
  3. Otherwise same

if self.centralize_log and not self.logging_system.container_is_running(all_labels):
raise ConnectionError(
"Invalid state detected. "
"log centralization is {log_centralization_state}, "
"but cannot find fluentd instance running. "
"Please change your use_centralized_log parameter of DockerContainermanager"
.format(log_centralization_state=self.centralize_log)
)
elif self.logging_system.container_is_running(all_labels) and not self.centralize_log:
raise ConnectionError(
"Invalid state detected. "
"Fluentd instance is running, "
"but log centralization state is {log_centralization_state}. "
"Please change your use_centralized_log parameter of DockerContainerManager to True"
.format(log_centralization_state=self.centralize_log)
)
else:
return self.logging_system.container_is_running(all_labels)

def get_admin_addr(self):
return "{host}:{port}".format(
host=self.public_hostname, port=self.clipper_management_port)
Expand Down Expand Up @@ -514,3 +564,5 @@ def find_unbound_port(start=None,
start += 1
else:
start = random.randint(*port_range)


6 changes: 4 additions & 2 deletions clipper_admin/clipper_admin/docker/docker_metric_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ def get_prometheus_base_config():

def run_query_frontend_metric_image(name, docker_client, query_name,
frontend_exporter_image, common_labels,
extra_container_kwargs):
log_config, extra_container_kwargs):
"""
Use docker_client to run a frontend-exporter image.
:param name: Name to pass in, need to be unique.
Expand All @@ -38,6 +38,7 @@ def run_query_frontend_metric_image(name, docker_client, query_name,
docker_client.containers.run(
frontend_exporter_image,
query_frontend_metric_cmd,
log_config=log_config,
name=name,
labels=query_frontend_metric_labels,
**extra_container_kwargs)
Expand Down Expand Up @@ -72,7 +73,7 @@ def setup_metric_config(query_frontend_metric_name, prom_config_path,


def run_metric_image(docker_client, common_labels, prometheus_port,
prom_config_path, extra_container_kwargs):
prom_config_path, log_config, extra_container_kwargs):
"""
Run the prometheus image.
:param docker_client: The docker client object
Expand All @@ -96,6 +97,7 @@ def run_metric_image(docker_client, common_labels, prometheus_port,
metric_cmd,
name="metric_frontend-{}".format(random.randint(0, 100000)),
ports={'9090/tcp': prometheus_port},
log_config=log_config,
volumes={
prom_config_path: {
'bind': '/etc/prometheus/prometheus.yml',
Expand Down
52 changes: 52 additions & 0 deletions clipper_admin/clipper_admin/docker/logging/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
# Clipper Logging with Fluentd

## Log Centralization (Beta)
Clipper uses Fluentd (https://www.fluentd.org/) for centralizing logs from Docker containers within Clipper cluster.
It is currently a beta version. It only supports centralizing logs into Fluentd instance for now, but we will add various functinoalities
like monitoring and debugging on the top of it. Please create an issue if you want any functionality.
Also, please don't hesistate to contribute if you add any features.

## How to guide
Firstly, when you define `DockerContainerManager`, you should set `use_centralized_log` parameter to be `True`

```python
clipper_conn = ClipperConnection(DockerContainerManager(use_centralized_log=True))
clipper_conn.start_clipper()
```

Once you start up the clipper cluster, you can check fluentd Docker container running.

```bash
$docker ps
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
170000ec75d7 default-cluster-simple-example:1 "/container/containe…" 11 seconds ago Up 10 seconds (healthy) simple-example_1-71538
5b533ff2fd3a prom/prometheus:v2.1.0 "/bin/prometheus --c…" 13 seconds ago Up 12 seconds 0.0.0.0:9090->9090/tcp metric_frontend-7206
b71b557a0001 clipper/frontend-exporter:develop "python /usr/src/app…" 14 seconds ago Up 13 seconds query_frontend_exporter-55488
bc8a7cc31754 clipper/query_frontend:develop "/clipper/release/sr…" 15 seconds ago Up 14 seconds 0.0.0.0:1337->1337/tcp, 0.0.0.0:7000->7000/tcp query_frontend-55488
d04f33c654fd clipper/management_frontend:develop "/clipper/release/sr…" 15 seconds ago Up 15 seconds 0.0.0.0:1338->1338/tcp mgmt_frontend-60461
30103e84e2a1 redis:alpine "docker-entrypoint.s…" 16 seconds ago Up 15 seconds 0.0.0.0:30356->6379/tcp redis-82152
b78c3242c3e7 fluent/fluentd:v1.3-debian-1 "tini -- /bin/entryp…" 17 seconds ago Up 16 seconds 5140/tcp, 0.0.0.0:24224->24224/tcp, 0.0.0.0:24224->24224/udp fluentd-51374
```

You can see centralized logs from fluentd container's stdout. Type

```bash
$docker logs <fluentd_container_id>
```

Currently, it just prints out huge amount of logs centralized. It is because this feature is in the beggining phase.
We will add persistent storage for logs and query feature in the upcoming version.

## How to customize
Currently, we don't recommend customizing a logging feature or using it for production. It is immature and unstable. Some APIs can be drastically changed.
If you still want to use it, you can directly modify fluentd conf file. It is mounted in a temp folder which you can easily find through python interactive shell.

```python
>>> # Make sure you already ran clipper_conn.clipper_conn.start_clipper() with DockerContainerManager(use_centralized_log=True). Also, it is the python shell.
>>> clipper_conn = ClipperConnection(DockerContainerManager(use_centralized_log=True))
>>> clipper_conn.connect()
19-03-21:10:36:58 INFO [clipper_admin.py:157] [default-cluster] Successfully connected to Clipper cluster at localhost:1337
>>> cm = clipper_conn.cm
>>> cm.logging_system_instance.conf_path
# It will show you conf file path mounted on your local machine.
```
Empty file.
Loading