This repository has been archived by the owner on Jun 6, 2024. It is now read-only.

add alert for jobs which are in pending phase for a long time #3761

Merged
merged 7 commits on Oct 24, 2019
6 changes: 6 additions & 0 deletions src/prometheus/deploy/alerting/jobs.rules
@@ -25,3 +25,9 @@ groups:
for: 1h # only when it exceeds 1 hour
annotations:
summary: "zombie container in {{$labels.instance}} detected"

- alert: PaiJobPending
expr: pai_job_pod_count{pod_bound="true", phase="pending"} > 0
for: 30m
annotations:
summary: "Job {{$labels.job_name}} in pending status detected"
2 changes: 1 addition & 1 deletion src/prometheus/deploy/alerting/k8s.rules
@@ -22,7 +22,7 @@ groups:
rules:
- alert: k8sApiServerNotOk
expr: k8s_api_server_count{error!="ok"} > 0
for: 5m
for: 10m
annotations:
summary: "api server in {{$labels.host_ip}} is {{$labels.error}}"

6 changes: 3 additions & 3 deletions src/prometheus/deploy/alerting/node.rules
@@ -40,19 +40,19 @@ groups:

- alert: NodeDiskPressure
expr: pai_node_count{disk_pressure="true"} > 0
for: 5m
for: 10m
annotations:
summary: "{{$labels.name}} is under disk pressure"

- alert: NodeOutOfDisk
expr: pai_node_count{out_of_disk="true"} > 0
for: 5m
for: 10m
annotations:
summary: "{{$labels.name}} is out of disk"

- alert: NodeNotReady
expr: pai_node_count{ready!="true"} > 0
for: 5m
for: 10m
annotations:
summary: "{{$labels.name}} is not ready"

4 changes: 2 additions & 2 deletions src/prometheus/deploy/alerting/pai-services.rules
@@ -22,13 +22,13 @@ groups:
rules:
- alert: PaiServicePodNotRunning
expr: pai_pod_count{phase!="running"} > 0
for: 5m
for: 10m
annotations:
summary: "{{$labels.name}} in {{$labels.host_ip}} not running detected"

- alert: PaiServicePodNotReady
expr: pai_pod_count{phase="running", ready="false"} > 0
for: 5m
for: 10m
labels:
type: pai_service
annotations:
2 changes: 1 addition & 1 deletion src/watchdog/deploy/watchdog.yaml.template
@@ -65,7 +65,7 @@ spec:
- "python"
- "/watchdog.py"
- "--interval"
- "30"
- "180"
- "--port"
- "9101"
- "--ca"
111 changes: 69 additions & 42 deletions src/watchdog/src/watchdog.py
@@ -71,6 +71,11 @@ def gen_pai_pod_gauge():
labels=["service_name", "name", "namespace", "phase", "host_ip",
"initialized", "pod_scheduled", "ready"])

def gen_pai_job_pod_gauge():
return GaugeMetricFamily("pai_job_pod_count", "count of pai job pod",
labels=["job_name", "name", "phase", "host_ip",
"initialized", "pod_bound", "pod_scheduled", "ready"])

def gen_pai_container_gauge():
return GaugeMetricFamily("pai_container_count", "count of container pod",
labels=["service_name", "pod_name", "name", "namespace", "state",
@@ -185,35 +190,38 @@ def __init__(self, name, gpu):
def __repr__(self):
return "%s: %s" % (self.name, self.gpu)

def parse_pod_item(pod, pai_pod_gauge, pai_container_gauge, pods_info):
""" add metrics to pai_pod_gauge or pai_container_gauge if successfully paesed pod.
def parse_pod_item(pod, pai_pod_gauge, pai_container_gauge, pai_job_pod_gauge, pods_info):
""" add metrics to pai_pod_gauge or pai_container_gauge if successfully parse pod.
Because we are parsing json outputed by k8s, its format is subjected to change,
we should test if field exists before accessing it to avoid KeyError """

pod_name = pod["metadata"]["name"]
namespace = walk_json_field_safe(pod, "metadata", "namespace") or "default"
host_ip = walk_json_field_safe(pod, "status", "hostIP") or "unscheduled"

used_gpu = 0
status = pod["status"]
containers = walk_json_field_safe(pod, "spec", "containers")
if containers is not None:
for container in containers:
req_gpu = int(walk_json_field_safe(container, "resources", "requests",
"nvidia.com/gpu") or 0)
limit_gpu = int(walk_json_field_safe(container, "resources", "limits",
"nvidia.com/gpu") or 0)
used_gpu += max(req_gpu, limit_gpu)
pods_info[host_ip].append(PodInfo(pod_name, used_gpu))

labels = pod["metadata"].get("labels")
if labels is None or "app" not in labels:
logger.info("unknown pod %s", pod["metadata"]["name"])
node_name = walk_json_field_safe(pod, "spec", "nodeName")

service_name = walk_json_field_safe(labels, "app")
job_name = walk_json_field_safe(labels, "jobName")
if service_name is None and job_name is None:
logger.info("unknown pod %s", pod_name)
return None

service_name = labels["app"] # get pai service name from label
generate_pods_info(pod_name, containers, host_ip, pods_info)
generate_pod_metrics(pai_pod_gauge, pai_job_pod_gauge, service_name,
job_name, pod_name, node_name, host_ip, status, namespace)

# generate pai_containers
if service_name is not None and status.get("containerStatuses") is not None:
container_statuses = status["containerStatuses"]
generate_container_metrics(
pai_container_gauge, service_name, pod_name, container_statuses, namespace, host_ip)

status = pod["status"]

def generate_pod_metrics(pai_pod_gauge, pai_job_pod_gauge, service_name, job_name, pod_name,
node_name, host_ip, status, namespace):
if status.get("phase") is not None:
phase = status["phase"].lower()
else:
@@ -237,44 +245,62 @@ def parse_pod_item(pod, pai_pod_gauge, pai_container_gauge, pods_info):
error_counter.labels(type="unknown_pod_cond").inc()
logger.warning("unexpected condition %s in pod %s", cond_t, pod_name)

pai_pod_gauge.add_metric([service_name, pod_name, namespace, phase, host_ip,
initialized, pod_scheduled, ready], 1)
# used to judge whether the scheduler has bound the pod to a node
pod_bound = "true" if node_name else "false"

# generate pai_containers
if status.get("containerStatuses") is not None:
container_statuses = status["containerStatuses"]
if service_name is not None:
pai_pod_gauge.add_metric([service_name, pod_name, namespace, phase, host_ip,
initialized, pod_scheduled, ready], 1)
if job_name is not None:
pai_job_pod_gauge.add_metric([job_name, pod_name, phase, host_ip,
initialized, pod_bound, pod_scheduled, ready], 1)

for container_status in container_statuses:
container_name = container_status["name"]

ready = False
def generate_pods_info(pod_name, containers, host_ip, pods_info):
used_gpu = 0
if containers is not None:
for container in containers:
req_gpu = int(walk_json_field_safe(container, "resources", "requests",
"nvidia.com/gpu") or 0)
limit_gpu = int(walk_json_field_safe(container, "resources", "limits",
"nvidia.com/gpu") or 0)
used_gpu += max(req_gpu, limit_gpu)
pods_info[host_ip].append(PodInfo(pod_name, used_gpu))

if container_status.get("ready") is not None:
ready = container_status["ready"]

container_state = None
if container_status.get("state") is not None:
state = container_status["state"]
if len(state) != 1:
error_counter.labels(type="unexpected_container_state").inc()
logger.error("unexpected state %s in container %s",
json.dumps(state), container_name)
else:
container_state = list(state.keys())[0].lower()
def generate_container_metrics(pai_container_gauge, service_name, pod_name, container_statuses,
namespace, host_ip):
for container_status in container_statuses:
container_name = container_status["name"]

ready = False

pai_container_gauge.add_metric([service_name, pod_name, container_name,
namespace, container_state, host_ip, str(ready).lower()], 1)
if container_status.get("ready") is not None:
ready = container_status["ready"]

container_state = None
if container_status.get("state") is not None:
state = container_status["state"]
if len(state) != 1:
error_counter.labels(type="unexpected_container_state").inc()
logger.error("unexpected state %s in container %s",
json.dumps(state), container_name)
else:
container_state = list(state.keys())[0].lower()

pai_container_gauge.add_metric([service_name, pod_name, container_name,
namespace, container_state, host_ip, str(ready).lower()], 1)

def process_pods_status(pods_object, pai_pod_gauge, pai_container_gauge,

def process_pods_status(pods_object, pai_pod_gauge, pai_container_gauge, pai_job_pod_gauge,
pods_info):
def _map_fn(item):
return catch_exception(parse_pod_item,
"catch exception when parsing pod item",
None,
item,
pai_pod_gauge, pai_container_gauge,
pods_info)
pai_job_pod_gauge, pods_info)

list(map(_map_fn, pods_object["items"]))
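
The `catch_exception` wrapper is not part of this diff; judging only from the call site above, it likely takes a function, a log message, a default return value, and the arguments to forward. A hedged reconstruction, not the actual helper from the watchdog sources:

```python
import logging

logger = logging.getLogger(__name__)

def catch_exception(fn, msg, default, *args):
    """Call fn(*args); on any exception, log msg with the traceback
    and return default instead of propagating."""
    try:
        return fn(*args)
    except Exception:
        logger.exception(msg)
        return default
```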

@@ -415,17 +441,18 @@ def process_pods(k8s_api_addr, ca_path, headers, pods_info):

pai_pod_gauge = gen_pai_pod_gauge()
pai_container_gauge = gen_pai_container_gauge()
pai_job_pod_gauge = gen_pai_job_pod_gauge()

try:
pods_object = request_with_histogram(list_pods_url, list_pods_histogram,
ca_path, headers)
process_pods_status(pods_object, pai_pod_gauge, pai_container_gauge,
process_pods_status(pods_object, pai_pod_gauge, pai_container_gauge, pai_job_pod_gauge,
pods_info)
except Exception as e:
error_counter.labels(type="parse").inc()
logger.exception("failed to process pods from namespace %s", ns)

return [pai_pod_gauge, pai_container_gauge]
return [pai_pod_gauge, pai_container_gauge, pai_job_pod_gauge]


def process_nodes(k8s_api_addr, ca_path, headers, pods_info):
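
Finally, the new job-pod path can be smoke-tested without a cluster by feeding `parse_pod_item` a hand-built pod object. A minimal sketch, assuming `watchdog.py` is importable as a module; the `pod` dict is fabricated to resemble a bound, pending job pod and is not real k8s output:

```python
# Hypothetical smoke test for the new job-pod metrics path.
from collections import defaultdict

import watchdog  # assumes src/watchdog/src is on sys.path

pod = {
    "metadata": {"name": "job1-pod", "labels": {"jobName": "job1"}},
    "spec": {"nodeName": "node1", "containers": []},
    "status": {
        "phase": "Pending",
        "conditions": [{"type": "PodScheduled", "status": "True"}],
    },
}

pods_info = defaultdict(list)
pai_pod_gauge = watchdog.gen_pai_pod_gauge()
pai_container_gauge = watchdog.gen_pai_container_gauge()
pai_job_pod_gauge = watchdog.gen_pai_job_pod_gauge()

watchdog.parse_pod_item(pod, pai_pod_gauge, pai_container_gauge,
                        pai_job_pod_gauge, pods_info)

# expect a single sample labeled pod_bound="true", phase="pending"
print(pai_job_pod_gauge.samples)
```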