verify sane log times in logging stack #4682

Merged
@@ -12,20 +12,21 @@ class LoggingCheck(OpenShiftCheck):
"""Base class for logging component checks"""

name = "logging"
logging_namespace = "logging"

@classmethod
def is_active(cls, task_vars):
return super(LoggingCheck, cls).is_active(task_vars) and cls.is_first_master(task_vars)
logging_deployed = get_var(task_vars, "openshift_hosted_logging_deploy", default=False)
return super(LoggingCheck, cls).is_active(task_vars) and cls.is_first_master(task_vars) and logging_deployed

@staticmethod
def is_first_master(task_vars):
Contributor

This is a bad name; it makes the is_active expression look like we only care about being the first master, when in fact it is a bit more than that.

Contributor Author

Thanks, updated the method to only check whether this is the first master, and moved the logging_deployed check to is_active.

"""Run only on first master and only when logging is configured. Returns: bool"""
logging_deployed = get_var(task_vars, "openshift_hosted_logging_deploy", default=True)
"""Run only on first master. Returns: bool"""
# Note: It would be nice to use membership in oo_first_master group, however for now it
# seems best to avoid requiring that setup and just check this is the first master.
hostname = get_var(task_vars, "ansible_ssh_host") or [None]
masters = get_var(task_vars, "groups", "masters", default=None) or [None]
return logging_deployed and masters[0] == hostname
return masters and masters[0] == hostname

    def run(self, tmp, task_vars):
        pass
@@ -45,7 +46,7 @@ def get_pods_for_component(self, execute_module, namespace, logging_component, t
                raise ValueError()
        except ValueError:
            # successful run but non-parsing data generally means there were no pods in the namespace
-            return None, 'There are no pods in the {} namespace. Is logging deployed?'.format(namespace)
+            return None, 'No pods were found for the "{}" logging component.'.format(logging_component)

        return pods['items'], None

@@ -0,0 +1,132 @@
"""
Check for ensuring logs from pods can be queried in a reasonable amount of time.
"""

import json
import time

from uuid import uuid4

from openshift_checks import get_var, OpenShiftCheckException
from openshift_checks.logging.logging import LoggingCheck


ES_CMD_TIMEOUT_SECONDS = 30


class LoggingIndexTime(LoggingCheck):
    """Check that pod logs are aggregated and indexed in ElasticSearch within a reasonable amount of time."""
    name = "logging_index_time"
    tags = ["health", "logging"]

    logging_namespace = "logging"

    def run(self, tmp, task_vars):
        """Add log entry by making unique request to Kibana. Check for unique entry in the ElasticSearch pod logs."""
        try:
            log_index_timeout = int(
                get_var(task_vars, "openshift_check_logging_index_timeout_seconds", default=ES_CMD_TIMEOUT_SECONDS)
            )
        except ValueError:
            return {
                "failed": True,
                "msg": ('Invalid value provided for "openshift_check_logging_index_timeout_seconds". '
                        'Value must be an integer representing an amount in seconds.'),
            }

        running_component_pods = dict()

        # get all component pods
        self.logging_namespace = get_var(task_vars, "openshift_logging_namespace", default=self.logging_namespace)
        for component, name in (['kibana', 'Kibana'], ['es', 'Elasticsearch']):
            pods, error = self.get_pods_for_component(
                self.execute_module, self.logging_namespace, component, task_vars,
            )

            if error:
                msg = 'Unable to retrieve pods for the {} logging component: {}'
                return {"failed": True, "changed": False, "msg": msg.format(name, error)}

            running_pods = self.running_pods(pods)

            if not running_pods:
                msg = ('No {} pods in the "Running" state were found. '
                       'At least one pod is required in order to perform this check.')
                return {"failed": True, "changed": False, "msg": msg.format(name)}

            running_component_pods[component] = running_pods

        uuid = self.curl_kibana_with_uuid(running_component_pods["kibana"][0], task_vars)
        self.wait_until_cmd_or_err(running_component_pods["es"][0], uuid, log_index_timeout, task_vars)
        return {}

    def wait_until_cmd_or_err(self, es_pod, uuid, timeout_secs, task_vars):
        """Retry an Elasticsearch query every second until query success, or a defined
        length of time has passed."""
Contributor

Hmm, now this is out of sync with the implementation?

Re-reading the previous conversation in #4682 (comment) and reconsidering, I get that the intention of this check is to look at the elapsed time between writing something to Kibana and being able to read it from Elasticsearch.

Sorry for the back-and-forth, but I'll go with what @sosiouxme said and what you had originally, that we want the whole check to run within a certain timeout, and we try to query ES as many times as possible within that time, always spaced by some interval.

        deadline = time.time() + timeout_secs
        interval = 1
        while not self.query_es_from_es(es_pod, uuid, task_vars):
            if time.time() + interval > deadline:
                msg = "expecting match in Elasticsearch for message with uuid {}, but no matches were found after {}s."
                raise OpenShiftCheckException(msg.format(uuid, timeout_secs))
            time.sleep(interval)

    def curl_kibana_with_uuid(self, kibana_pod, task_vars):
        """curl Kibana with a unique uuid."""
        uuid = self.generate_uuid()
        pod_name = kibana_pod["metadata"]["name"]
        exec_cmd = "exec {pod_name} -c kibana -- curl --max-time 30 -s http://localhost:5601/{uuid}"
        exec_cmd = exec_cmd.format(pod_name=pod_name, uuid=uuid)

        error_str = self.exec_oc(self.execute_module, self.logging_namespace, exec_cmd, [], task_vars)

        try:
            error_code = json.loads(error_str)["statusCode"]
        except KeyError:
            msg = ('invalid response returned from Kibana request (Missing "statusCode" key):\n'
                   'Command: {}\nResponse: {}').format(exec_cmd, error_str)
            raise OpenShiftCheckException(msg)
        except ValueError:
            msg = ('invalid response returned from Kibana request (Non-JSON output):\n'
                   'Command: {}\nResponse: {}').format(exec_cmd, error_str)
            raise OpenShiftCheckException(msg)

        if error_code != 404:
            msg = 'invalid error code returned from Kibana request. Expecting error code "404", but got "{}" instead.'
            raise OpenShiftCheckException(msg.format(error_code))

        return uuid

    def query_es_from_es(self, es_pod, uuid, task_vars):
        """curl the Elasticsearch pod and look for a unique uuid in its logs."""
        pod_name = es_pod["metadata"]["name"]
        exec_cmd = (
            "exec {pod_name} -- curl --max-time 30 -s -f "
            "--cacert /etc/elasticsearch/secret/admin-ca "
            "--cert /etc/elasticsearch/secret/admin-cert "
            "--key /etc/elasticsearch/secret/admin-key "
            "https://logging-es:9200/project.{namespace}*/_count?q=message:{uuid}"
Member

Technically it should probably retrieve the project's UUID and be more specific about the ES index it's querying, which would reduce query time and load on ES. However, I think it's a small enough concern that we can save this for when it becomes a problem or we're revisiting the code for some reason.

        )
        exec_cmd = exec_cmd.format(pod_name=pod_name, namespace=self.logging_namespace, uuid=uuid)
        result = self.exec_oc(self.execute_module, self.logging_namespace, exec_cmd, [], task_vars)

        try:
            count = json.loads(result)["count"]
        except KeyError:
            msg = 'invalid response from Elasticsearch query:\n"{}"\nMissing "count" key:\n{}'
            raise OpenShiftCheckException(msg.format(exec_cmd, result))
        except ValueError:
            msg = 'invalid response from Elasticsearch query:\n"{}"\nNon-JSON output:\n{}'
            raise OpenShiftCheckException(msg.format(exec_cmd, result))

        return count

    @staticmethod
    def running_pods(pods):
        """Filter pods that are running."""
        return [pod for pod in pods if pod['status']['phase'] == 'Running']

    @staticmethod
    def generate_uuid():
        """Wrap uuid generator. Allows for testing with expected values."""
        return str(uuid4())
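
Regarding the Member comment above about narrowing the Elasticsearch query: a minimal sketch of what that follow-up might look like, assuming the project UID can be read with an `oc get project -o jsonpath` lookup and that indices follow a `project.<name>.<uid>.<date>` naming convention. The `get_project_uid` and `es_index_pattern` helpers are hypothetical and not part of this PR.

    def get_project_uid(self, task_vars):
        """Hypothetical helper (not in this PR): look up the UID of the logging project."""
        exec_cmd = "get project {namespace} -o jsonpath={{.metadata.uid}}".format(namespace=self.logging_namespace)
        return self.exec_oc(self.execute_module, self.logging_namespace, exec_cmd, [], task_vars).strip()

    def es_index_pattern(self, task_vars):
        """Hypothetical helper (not in this PR): build a narrower index pattern such as 'project.logging.<uid>.*'."""
        uid = self.get_project_uid(task_vars)
        return "project.{namespace}.{uid}.*".format(namespace=self.logging_namespace, uid=uid)

query_es_from_es could then substitute this narrower pattern for "project.{namespace}*" in the URL it curls, along the lines the reviewer suggests.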
2 changes: 1 addition & 1 deletion roles/openshift_health_checker/test/logging_check_test.py
@@ -118,7 +118,7 @@ def test_is_active(groups, logging_deployed, is_active):
    (
        'No resources found.',
        None,
-        'There are no pods in the logging namespace',
+        'No pods were found for the "es"',
    ),
    (
        json.dumps({'items': [plain_kibana_pod, plain_es_pod, plain_curator_pod, fluentd_pod_node1]}),
182 changes: 182 additions & 0 deletions roles/openshift_health_checker/test/logging_index_time_test.py
@@ -0,0 +1,182 @@
import json

import pytest

from openshift_checks.logging.logging_index_time import LoggingIndexTime, OpenShiftCheckException


SAMPLE_UUID = "unique-test-uuid"


def canned_loggingindextime(exec_oc=None):
    """Create a check object with a canned exec_oc method"""
    check = LoggingIndexTime("dummy")  # fails if a module is actually invoked
    if exec_oc:
        check.exec_oc = exec_oc
    return check


plain_running_elasticsearch_pod = {
    "metadata": {
        "labels": {"component": "es", "deploymentconfig": "logging-es-data-master"},
        "name": "logging-es-data-master-1",
    },
    "status": {
        "containerStatuses": [{"ready": True}, {"ready": True}],
        "phase": "Running",
    }
}
plain_running_kibana_pod = {
    "metadata": {
        "labels": {"component": "kibana", "deploymentconfig": "logging-kibana"},
        "name": "logging-kibana-1",
    },
    "status": {
        "containerStatuses": [{"ready": True}, {"ready": True}],
        "phase": "Running",
    }
}
not_running_kibana_pod = {
    "metadata": {
        "labels": {"component": "kibana", "deploymentconfig": "logging-kibana"},
        "name": "logging-kibana-2",
    },
    "status": {
        "containerStatuses": [{"ready": True}, {"ready": False}],
        "conditions": [{"status": "True", "type": "Ready"}],
        "phase": "pending",
    }
}


@pytest.mark.parametrize('pods, expect_pods', [
    (
        [not_running_kibana_pod],
        [],
    ),
    (
        [plain_running_kibana_pod],
        [plain_running_kibana_pod],
    ),
    (
        [],
        [],
    )
])
def test_check_running_pods(pods, expect_pods):
    check = canned_loggingindextime(None)
    pods = check.running_pods(pods)
    assert pods == expect_pods


@pytest.mark.parametrize('name, json_response, uuid, timeout, extra_words', [
    (
        'valid count in response',
        {
            "count": 1,
        },
        SAMPLE_UUID,
        0.001,
        [],
    ),
], ids=lambda argval: argval[0])
def test_wait_until_cmd_or_err_succeeds(name, json_response, uuid, timeout, extra_words):
    def exec_oc(execute_module, ns, exec_cmd, args, task_vars):
        return json.dumps(json_response)

    check = canned_loggingindextime(exec_oc)
    check.wait_until_cmd_or_err(plain_running_elasticsearch_pod, uuid, timeout, None)


@pytest.mark.parametrize('name, json_response, uuid, timeout, extra_words', [
    (
        'invalid json response',
        {
            "invalid_field": 1,
        },
        SAMPLE_UUID,
        0.001,
        ["invalid response", "Elasticsearch"],
    ),
    (
        'empty response',
        {},
        SAMPLE_UUID,
        0.001,
        ["invalid response", "Elasticsearch"],
    ),
    (
        'valid response but invalid match count',
        {
            "count": 0,
        },
        SAMPLE_UUID,
        0.005,
        ["expecting match", SAMPLE_UUID, "0.005s"],
    )
], ids=lambda argval: argval[0])
def test_wait_until_cmd_or_err(name, json_response, uuid, timeout, extra_words):
    def exec_oc(execute_module, ns, exec_cmd, args, task_vars):
        return json.dumps(json_response)

    check = canned_loggingindextime(exec_oc)
    with pytest.raises(OpenShiftCheckException) as error:
        check.wait_until_cmd_or_err(plain_running_elasticsearch_pod, uuid, timeout, None)

    for word in extra_words:
        assert word in str(error)


@pytest.mark.parametrize('name, json_response, uuid, extra_words', [
    (
        'correct response code, found unique id is returned',
        {
            "statusCode": 404,
        },
        "sample unique id",
        ["sample unique id"],
    ),
], ids=lambda argval: argval[0])
def test_curl_kibana_with_uuid(name, json_response, uuid, extra_words):
    def exec_oc(execute_module, ns, exec_cmd, args, task_vars):
        return json.dumps(json_response)

    check = canned_loggingindextime(exec_oc)
    check.generate_uuid = lambda: uuid

    result = check.curl_kibana_with_uuid(plain_running_kibana_pod, None)

    for word in extra_words:
        assert word in result


@pytest.mark.parametrize('name, json_response, uuid, extra_words', [
    (
        'invalid json response',
        {
            "invalid_field": "invalid",
        },
        SAMPLE_UUID,
        ["invalid response returned", 'Missing "statusCode" key'],
    ),
    (
        'wrong error code in response',
        {
            "statusCode": 500,
        },
        SAMPLE_UUID,
        ["Expecting error code", "500"],
    ),
], ids=lambda argval: argval[0])
def test_failed_curl_kibana_with_uuid(name, json_response, uuid, extra_words):
    def exec_oc(execute_module, ns, exec_cmd, args, task_vars):
        return json.dumps(json_response)

    check = canned_loggingindextime(exec_oc)
    check.generate_uuid = lambda: uuid

    with pytest.raises(OpenShiftCheckException) as error:
        check.curl_kibana_with_uuid(plain_running_kibana_pod, None)

    for word in extra_words:
        assert word in str(error)
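
The canned exec_oc pattern above could also exercise query_es_from_es directly. A minimal sketch using the same fixtures, not part of this PR; the parametrized counts are purely illustrative:

@pytest.mark.parametrize('json_response, expect_count', [
    ({"count": 1}, 1),
    ({"count": 0}, 0),
])
def test_query_es_from_es(json_response, expect_count):
    def exec_oc(execute_module, ns, exec_cmd, args, task_vars):
        return json.dumps(json_response)

    check = canned_loggingindextime(exec_oc)
    assert check.query_es_from_es(plain_running_elasticsearch_pod, SAMPLE_UUID, None) == expect_count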