diff --git a/docs/tools/how-to-setup-report-script.md b/docs/tools/how-to-setup-report-script.md
new file mode 100644
index 0000000000..9fd55a6b7b
--- /dev/null
+++ b/docs/tools/how-to-setup-report-script.md
@@ -0,0 +1,169 @@

# How to set up the report script

Many OpenPai cluster admins are interested in how the cluster is used and how it performs: who used the most/least resources, and so on. Developers of the OpenPai system are interested in what causes job failures, and in how to design and implement a system that can prevent such failures and avoid wasting cluster resources.

But since not everyone is interested in this report, we do not maintain it as a service; we merely provide a script that interested admins can execute themselves. This document describes what the script reports, and how to maintain and query the results.

## What the script will report

The report consists of 4 parts: `job`, `alert`, `raw_job` and `gpu`.

### job

This report gives per-user job statistics, including the final status, job count and job resources. It has the following columns:

* user: username in the OpenPai cluster
* vc: VC name in the OpenPai cluster
* total job info: sum over all jobs
* successful job info: jobs that finished with exit code 0
* failed job info: jobs that finished with a non-zero exit code
* stopped job info: jobs stopped by the user
* running job info: jobs currently running
* waiting job info: jobs currently waiting

Each job info entry is a group of the following subcolumns:

* count: job count in this category
* elapsed time: total running time of jobs in this category
* cpu second: vcore-seconds used by jobs in this category
* memory second: memory(GB)-seconds used by jobs in this category
* gpu second: gpu-seconds used by jobs in this category

### alert

This report tells you which alerts were triggered in your cluster. The script can generate this report even if you did not set up an alert manager.
Because the Prometheus service deletes old data (in the default setup it only retains 15 days of data), you may want to extend the retention period if you want accurate numbers in a monthly report.

The report has the following columns:

* alert name: alert name defined in Prometheus
* host_ip: the node from which this alert was triggered
* source: the actual component in trouble (has different meanings for different alerts)
* start: start time of this alert
* duration: how long (seconds) this alert lasted
* labels: the original labels sent along with the alert

### raw_job

This report contains detailed per-job info; `job.csv` can be seen as an aggregated view of this report.

The report has the following columns:

* user: username in the OpenPai cluster
* vc: VC name in the OpenPai cluster
* job name: job name in the OpenPai cluster
* start time: when the job started
* finished time: when the job finished; if the job is still running, this will have the value `1970/01/01`
* waiting time: how long (seconds) this job was in waiting status before running, including the waiting time of retries; if the job is still running, this will have the value 0
* running time: how long this job was in running status, including retries
* retries: the retry count of this job
* status: the status of the job; one of `WAITING`, `RUNNING`, `SUCCEEDED`, `STOPPED`, `FAILED` and `UNKNOWN`
* exit code: the exit code of the job; if the job is still running, it will have the value `N/A`
* cpu allocated: how many vcores were allocated to the job, including the vcores allocated to the app master
* memory allocated: how much memory (GB) was allocated to the job, including the memory allocated to the app master
* max memory usage: maximum memory (GB) usage of this job; it will have the value `N/A` if Pai has no record of memory usage, perhaps because the job's running time was too short or due to a system error
* gpu allocated: how many gpu cards were allocated to the job

### gpu

This report contains gpu utilization info for the whole cluster.

The report has the following columns:

* host_ip: where this gpu is installed
* gpu_id: gpu minor number on the node
* avg: average utilization during the report time frame

## Prerequisite

You should prepare a node that has access to the OpenPai services; the script needs to access the hadoop-resource-manager, framework-launcher and Prometheus deployed by OpenPai. This node does not need much memory and does not need GPU cards. You only need to make sure it will not restart frequently. Usually the master node of the OpenPai cluster is a good choice.

After you choose a node, please make sure you have the following software installed:

* python3
* the requests library
* the flask library

If your node runs Ubuntu, you can install these with the following commands:

``` sh
sudo apt-get install -y python3 python3-pip
pip3 install -r $PAI_DIR/src/tools/reports_requirements.txt
```

## How to Setup

The [script](../../src/tools/reports.py) has three actions: `refresh`, `report` and `serve`.
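The three actions share the same connection arguments. The following is a sketch of the three invocations, where `$yarn_url`, `$prometheus_url` and `$launcher_url` are placeholders explained below, and `/home/core/report` is an example install directory:

``` sh
# collect data into the sqlite3 DB (run this periodically, e.g. from cron)
python3 /home/core/report/reports.py refresh -y $yarn_url -p $prometheus_url -l $launcher_url -d /home/core/report/cluster.sqlite

# generate the csv reports from the data collected so far
python3 /home/core/report/reports.py report -y $yarn_url -p $prometheus_url -l $launcher_url -d /home/core/report/cluster.sqlite

# serve the reports over http on the default port
python3 /home/core/report/reports.py serve -y $yarn_url -p $prometheus_url -l $launcher_url -d /home/core/report/cluster.sqlite
```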
The `refresh` action tries to collect data from the hadoop-resource-manager and framework-launcher, and saves it in a sqlite3 DB for later processing. The script needs to save the data because the hadoop-resource-manager does not retain job info for long; if we do not fetch it and store it somewhere, we will not be able to generate correct reports. We recommend admins run this action every 10 minutes using a CRON job.

The `report` action queries the sqlite3 DB for vc usage and job statistics and generates the vc/job/raw_job/gpu csv files; it also gets data from Prometheus to generate the alert report. You can execute this action whenever you want the reports.

The `serve` action starts an http server, so the outside world can query reports through the web server instead of reading files.

Both `serve` and `report` need `refresh` to be called periodically to fetch data from the underlying sources.

First, log into the node you chose and put the [script](../../src/tools/reports.py) somewhere, for example in the directory `/home/core/report`. Then edit the crontab using

``` sh
crontab -e
```

It will open an editor with some documentation; paste the following content at the end of the file:

``` crontab
*/10 * * * * python3 /home/core/report/reports.py refresh -y $yarn_url -p $prometheus_url -l $launcher_url -d /home/core/report/cluster.sqlite >> /home/core/report/cluster.log 2>&1
```

Please replace `$yarn_url`, `$prometheus_url` and `$launcher_url` with your cluster's values; they should look like `http://$master:8088`, `http://$master:9091` and `http://$master:9086` respectively, where `$master` is the IP/hostname of your OpenPai master. Please also make sure the entry is on one line. It is good practice to execute the command manually before putting it into the crontab.

After finishing, save and exit the editor. You can then execute

``` sh
crontab -l
```

to view your current crontab. It should show what you edited.
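To check that the cron entry is actually collecting data, you can count the rows in the sqlite3 DB written by the `refresh` action. `record_counts` below is a hypothetical helper, assuming the DB path used in the crontab line above; the counts should grow after each 10-minute refresh:

``` python
import sqlite3

def record_counts(db_path):
    """Count the rows the refresh action has collected so far."""
    conn = sqlite3.connect(db_path)
    counts = {}
    for table in ("apps", "frameworks"):
        try:
            counts[table] = conn.execute(
                "SELECT COUNT(*) FROM " + table).fetchone()[0]
        except sqlite3.OperationalError:
            # table not created yet: refresh has not run successfully
            counts[table] = 0
    conn.close()
    return counts

# e.g. record_counts("/home/core/report/cluster.sqlite")
```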
All available arguments and their meanings can be viewed by running the script with the `-h` argument.

The script automatically deletes old data; by default it retains 6 months of data. If this is too much for you, for example if you only want to retain 1 month of data, you can add `-r 31` to the above command to tell the script to delete data older than 31 days.

You have two options for getting reports: the `report` or the `serve` action.

### `report`

Whenever you want a report, log into the node again and execute the following command:

``` sh
python3 /home/core/report/reports.py report -y $yarn_url -p $prometheus_url -l $launcher_url -d /home/core/report/cluster.sqlite
```

By default, the script generates a monthly report, meaning it queries data from one month ago until now and uses it to generate the reports. You can change the time range using the `--since` and `--until` arguments; for example, if you want reports covering from one month ago until one week ago, you can add these arguments:

``` sh
--since `date --date='-1 month' +"%s"` --until `date --date='-1 week' +"%s"`
```

### `serve`

Some external tools can query an http server directly, so you can start a serve process and issue an http request whenever you want a report, without having to log into the node and execute a command.

To set up the serve process, execute the following command:

``` sh
nohup python3 /home/core/report/reports.py serve -y $yarn_url -p $prometheus_url -l $launcher_url -d /home/core/report/cluster.sqlite > serve.log 2> serve.err.log &
```

This will start a process in the background listening on the default port 10240; you can specify the `--port` argument to change the default port.
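With the serve process running, reports are fetched over plain http. `report_url` below is a hypothetical helper that builds the endpoint URLs (the endpoint names, default port 10240 and default `week` span are the ones the script serves):

``` python
import urllib.parse

def report_url(host, report="job", span="week", port=10240):
    """Hypothetical helper: build the URL of one report endpoint.

    `report` is one of job, raw_job, alert, gpu; `span` is day, week or month.
    """
    query = urllib.parse.urlencode({"span": span})
    return "http://%s:%d/%s?%s" % (host, port, report, query)

# fetch with the requests library the script already depends on, e.g.:
#   csv_text = requests.get(report_url("10.0.0.1", "gpu", span="day")).text
```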
With the http server set up, you can now get each report at the endpoint with the same name as its csv file:

```
http://$IP:10240/job
http://$IP:10240/raw_job
http://$IP:10240/alert
http://$IP:10240/gpu
```

These endpoints all accept a `span` argument with value `day`, `week` or `month`, which generates the report over that time span. The default span is `week`. The report covers jobs that finished during this time or are still running.

diff --git a/src/tools/reports.py b/src/tools/reports.py
new file mode 100644
index 0000000000..96df6e7866
--- /dev/null
+++ b/src/tools/reports.py
@@ -0,0 +1,961 @@
#!/usr/bin/env python3
# Copyright (c) Microsoft Corporation
# All rights reserved.
#
# MIT License
#
# Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated
# documentation files (the "Software"), to deal in the Software without restriction, including without limitation
# the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and
# to permit persons to whom the Software is furnished to do so, subject to the following conditions:
# The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED *AS IS*, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING
# BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
# DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
+ +import urllib.parse +import argparse +import logging +import datetime +import json +import collections +import re +import sys +import math + +import sqlite3 +import requests + +import flask +from flask import Flask +from flask import request +from flask import Response + +logger = logging.getLogger(__name__) + + +def walk_json_field_safe(obj, *fields): + """ for example a=[{"a": {"b": 2}}] + walk_json_field_safe(a, 0, "a", "b") will get 2 + walk_json_field_safe(a, 0, "not_exist") will get None + """ + try: + for f in fields: + obj = obj[f] + return obj + except: + return None + + +def request_with_error_handling(url): + try: + response = requests.get(url, allow_redirects=True, timeout=15) + response.raise_for_status() + return response.json() + except Exception as e: + logger.exception(e) + return None + + +def format_time(timestamp): + d = datetime.datetime.fromtimestamp(timestamp) + return d.strftime("%Y/%m/%d-%H:%M:%S") + + +def get_ip(ip_port): + """ return 1.2.3.4 on 1.2.3.4:123 """ + m = re.match("([0-9]+[.][0-9]+[.][0-9]+[.][0-9]+):?.*", ip_port) + if m: + return m.groups()[0] + return ip_port + + +class JobInfo(object): + def __init__(self, job_count=0, elapsed_time=0, cpu_sec=0, mem_sec=0, gpu_sec=0, + user="unknown", vc="unknown", start_time=0, finished_time=0, retries=0, + status="unknown", exit_code="N/A", max_mem_usage="N/A"): + """ elapsed_time is seconds, cpu_sec is vcore-seconds, mem_sec is + megabyte-seconds, gpu_sec is card-seconds """ + self.job_count = job_count + self.elapsed_time = elapsed_time + self.cpu_sec = cpu_sec + self.mem_sec = mem_sec + self.gpu_sec = gpu_sec + + self.user = user + self.vc = vc + self.start_time = start_time + self.finished_time = finished_time + self.retries = retries + self.status = status + self.exit_code = exit_code + self.max_mem_usage = max_mem_usage + + def __iadd__(self, o): + self.job_count += o.job_count + self.elapsed_time += o.elapsed_time + self.cpu_sec += o.cpu_sec + self.mem_sec += o.mem_sec + 
self.gpu_sec += o.gpu_sec + return self + + def __add__(self, o): + return JobInfo( + job_count=self.job_count + o.job_count, + elapsed_time=self.elapsed_time + o.elapsed_time, + cpu_sec=self.cpu_sec + o.cpu_sec, + mem_sec=self.mem_sec + o.mem_sec, + gpu_sec=self.gpu_sec + o.gpu_sec) + + def values(self): + return [self.job_count, self.elapsed_time, + self.cpu_sec, self.mem_sec, self.gpu_sec] + + def __repr__(self): + # NOTE this is used to generate final report + return ",".join(map(str, self.values())) + + +class JobReportEntries(object): + def __init__(self, username, vc, total_job_info, success_job_info, + failed_job_info, stopped_job_info, running_job_info, waiting_job_info): + self.username = username + self.vc = vc + self.total_job_info = total_job_info + self.success_job_info = success_job_info + self.failed_job_info = failed_job_info + self.stopped_job_info = stopped_job_info + self.running_job_info = running_job_info + self.waiting_job_info = waiting_job_info + + def values(self): + result = [self.username, self.vc] + result.extend(self.total_job_info.values()) + result.extend(self.success_job_info.values()) + result.extend(self.failed_job_info.values()) + result.extend(self.stopped_job_info.values()) + result.extend(self.running_job_info.values()) + result.extend(self.waiting_job_info.values()) + return result + + def __repr__(self): + # NOTE this is used to generate final report + return ",".join(map(str, self.values())) + + +class RawJob(object): + def __init__(self, user, vc, job, + start_time, finish_time, waiting_time, run_time, + retries, status, exit_code, cpu, mem, max_mem, gpu): + self.user = user + self.vc = vc + self.job = job + self.start_time = start_time + self.finish_time = finish_time + self.waiting_time = waiting_time + self.run_time = run_time + self.retries = retries + self.status = status + self.exit_code = exit_code + self.cpu = cpu + self.mem = mem + self.max_mem = max_mem + self.gpu = gpu + + def values(self): + return [self.user, 
self.vc, self.job,
                self.start_time, self.finish_time, self.waiting_time, self.run_time,
                self.retries, self.status, self.exit_code,
                self.cpu, self.mem, self.max_mem, self.gpu]

    def __repr__(self):
        # NOTE this is used to generate final report
        return ",".join(map(str, self.values()))


class Alert(object):
    default_get_ip = lambda a: get_ip(a["instance"])

    host_ip_mapping = {
        "NodeNotReady": lambda a: get_ip(a["name"]),
        "k8sApiServerNotOk": lambda a: get_ip(a["host_ip"]),
        "NodeDiskPressure": lambda a: get_ip(a["name"]),
        "PaiServicePodNotRunning": lambda a: get_ip(a["host_ip"]),
        "PaiServicePodNotReady": lambda a: get_ip(a["host_ip"]),
    }

    src_mapping = {
        "NvidiaSmiEccError": lambda a: a["minor_number"],
        "NvidiaMemoryLeak": lambda a: a["minor_number"],
        "GpuUsedByExternalProcess": lambda a: a["minor_number"],
        "GpuUsedByZombieContainer": lambda a: a["minor_number"],
        "PaiJobsZombie": lambda a: a["minor_number"],
        "k8sApiServerNotOk": lambda a: a["error"],
        "k8sDockerDaemonNotOk": lambda a: a["error"],
        "NodeFilesystemUsage": lambda a: a["device"],
        "NodeDiskPressure": lambda a: get_ip(a["name"]),
        "NodeNotReady": lambda a: get_ip(a["name"]),
        "AzureAgentConsumeTooMuchMem": lambda a: a["cmd"],
        "PaiServicePodNotRunning": lambda a: a["name"],
        "PaiServicePodNotReady": lambda a: a["name"],
        "PaiServiceNotUp": lambda a: a["pai_service_name"],
        "JobExporterHangs": lambda a: a["name"],
    }

    def __init__(self, alert_name, start, durtion, labels):
        """ alert_name is derived from labels, start/durtion are timestamp values """
        self.alert_name = alert_name
        self.start = start
        self.durtion = durtion
        self.labels = labels

        #f.write("alert_name,host_ip,source,start,durtion,labels\n")

    @staticmethod
    def get_info(alert_name, labels, mapping):
        return mapping.get(alert_name, Alert.default_get_ip)(labels)

    def labels_repr(self):
        r = []
        for k, v in
self.labels.items(): + if k in {"__name__", "alertname", "alertstate", "job", "type"}: + continue + r.append("%s:%s" % (k, v)) + return "|".join(r) + + def values(self): + return [self.alert_name, + Alert.get_info(self.alert_name, self.labels, Alert.host_ip_mapping), + Alert.get_info(self.alert_name, self.labels, Alert.src_mapping), + format_time(self.start), + self.durtion, + self.labels_repr()] + + def __repr__(self): + # NOTE this is used to generate final report + return ",".join(map(str, self.values())) + + +class GPUEntry(object): + def __init__(self, node_ip, gpu_id, avg_util): + self.node_ip = node_ip + self.gpu_id = gpu_id + self.avg_util = avg_util + + def values(self): + return [self.node_ip, self.gpu_id, self.avg_util] + + def __repr__(self): + # NOTE this is used to generate final report + return ",".join(map(str, self.values())) + + +class DB(object): + # If app is running, the finished_time is 0, should not delete it in delete_old_data + CREATE_APPS_TABLE = """CREATE TABLE IF NOT EXISTS apps ( + app_id text NOT NULL, + finished_time integer NOT NULL, + content text NOT NULL + )""" + CREATE_APP_ID_INDEX = "CREATE INDEX IF NOT EXISTS app_id_index ON apps (app_id);" + CREATE_APP_TIME_INDEX = "CREATE INDEX IF NOT EXISTS app_time_index ON apps (finished_time);" + + # If job is running, the finished_time is 0, should not delete it in delete_old_data + CREATE_FRAMEWORKS_TABLE = """CREATE TABLE IF NOT EXISTS frameworks ( + name text NOT NULL, + start_time integer NOT NULL, + finished_time integer NOT NULL, + content text NOT NULL + )""" + CREATE_FRAMEWORK_NAME_INDEX = "CREATE INDEX IF NOT EXISTS framework_name_index ON frameworks (name);" + CREATE_FRAMEWORK_TIME_INDEX = "CREATE INDEX IF NOT EXISTS framework_time_index ON frameworks (start_time, finished_time);" + + def __init__(self, db_path): + self.db_path = db_path + self.conn = sqlite3.connect(self.db_path) + cursor = self.conn.cursor() + cursor.execute(DB.CREATE_APPS_TABLE) + 
cursor.execute(DB.CREATE_APP_ID_INDEX) + cursor.execute(DB.CREATE_APP_TIME_INDEX) + cursor.execute(DB.CREATE_FRAMEWORKS_TABLE) + cursor.execute(DB.CREATE_FRAMEWORK_NAME_INDEX) + cursor.execute(DB.CREATE_FRAMEWORK_TIME_INDEX) + self.conn.commit() + + +def get_yarn_apps(yarn_url): + apps_url = urllib.parse.urljoin(yarn_url, "/ws/v1/cluster/apps") + result = [] + + obj = request_with_error_handling(apps_url) + + apps = walk_json_field_safe(obj, "apps", "app") + + if apps is None: + return result + + for app in apps: + app_id = walk_json_field_safe(app, "id") + if app_id is None: + continue + + finished_time = walk_json_field_safe(app, "finishedTime") or 0 + finished_time = int(finished_time / 1000) # yarn's time is in millisecond + content = json.dumps(app) + result.append({"app_id": app_id, + "finished_time": finished_time, "content": content}) + + return result + + +def get_frameworks(launcher_url): + launcher_url = urllib.parse.urljoin(launcher_url, "/v1/Frameworks") + result = [] + + obj = request_with_error_handling(launcher_url) + + frameworks = walk_json_field_safe(obj, "summarizedFrameworkInfos") + + if frameworks is None: + return result + + for framework in frameworks: + name = walk_json_field_safe(framework, "frameworkName") + if name is None: + continue + + finished_time = walk_json_field_safe(framework, "frameworkCompletedTimestamp") or 0 + finished_time = int(finished_time / 1000) # yarn's time is in millisecond + start_time = walk_json_field_safe(framework, "firstRequestTimestamp") or 0 + start_time = int(start_time / 1000) # yarn's time is in millisecond + content = json.dumps(framework) + result.append({"name": name, "start_time": start_time, + "finished_time": finished_time, "content": content}) + + return result + + +def refresh_cache(database, yarn_url, launcher_url): + db = DB(database) + + apps = get_yarn_apps(yarn_url) + logger.info("get %d of apps from yarn", len(apps)) + + with db.conn: + cursor = db.conn.cursor() + + for app in apps: + 
cursor.execute("""SELECT COUNT(*) FROM apps + WHERE app_id=?""", + (app["app_id"],)) + result = cursor.fetchone() + + if result[0] > 0: + cursor.execute("""UPDATE apps SET finished_time=?, content=? + WHERE app_id=?""", + (app["finished_time"], app["content"], app["app_id"])) + else: + cursor.execute("""INSERT INTO apps(app_id,finished_time,content) + VALUES(?,?,?)""", + (app["app_id"], app["finished_time"], app["content"])) + + db.conn.commit() + + frameworks = get_frameworks(launcher_url) + logger.info("get %d of frameworks from launcher", len(frameworks)) + + with db.conn: + cursor = db.conn.cursor() + + for framework in frameworks: + cursor.execute("""SELECT COUNT(*) FROM frameworks + WHERE name=?""", + (framework["name"],)) + result = cursor.fetchone() + + if result[0] > 0: + cursor.execute("""UPDATE frameworks SET finished_time=?, content=? + WHERE name=?""", + (framework["finished_time"], framework["content"], framework["name"])) + else: + cursor.execute("""INSERT INTO frameworks(name,start_time,finished_time,content) + VALUES(?,?,?,?)""", + (framework["name"], + framework["start_time"], + framework["finished_time"], + framework["content"])) + + db.conn.commit() + + +# https://github.com/Microsoft/pai/blob/pai-0.9.y/src/rest-server/src/models/job.js#L45 +# https://github.com/microsoft/pai/blob/v0.13.0/src/job-exit-spec/config/job-exit-spec.md +def convert_job_state(framework_state, exit_code): + if framework_state in { + "FRAMEWORK_WAITING", + "APPLICATION_CREATED", + "APPLICATION_LAUNCHED", + "APPLICATION_WAITING"}: + return "WAITING" + elif framework_state in { + "APPLICATION_RUNNING", + "APPLICATION_RETRIEVING_DIAGNOSTICS", + "APPLICATION_COMPLETED"}: + return "RUNNING" + elif framework_state == "FRAMEWORK_COMPLETED": + if exit_code is not None: + if exit_code == 0: + return "SUCCEEDED" + elif exit_code == -7351: + return "STOPPED" + else: + return "FAILED" + else: + return "FAILED" + + return "UNKNOWN" + + +def get_job_report(database, since, until, 
max_mem_usage): + """ return two values, one is aggregated job info, the other is raw job status """ + db = DB(database) + + with db.conn: + # Select more apps, since framework may retry, and previous retry + # may not finished in since~until range. + # Assume no retry will happen 1 month before framework finish. + app_since = datetime.datetime.fromtimestamp(since) - datetime.timedelta(days=31) + app_since = int(datetime.datetime.timestamp(app_since)) + cursor = db.conn.cursor() + cursor.execute("""SELECT content FROM apps + WHERE (finished_time>? AND finished_time? AND finished_time 0: + start = end = values[0][0] + events = [] + + for i, value in enumerate(values): + if i == len(values) - 1: + events.append({"start": start, "end": value[0]}) + break + + if value[0] - end <= gap: + end = value[0] + continue + else: + events.append({"start": start, "end": end}) + start = end = value[0] + + for event in events: + # because the end is the last time alert still happening, if we + # treat end - start equals to be the durtion of the alert, + # the alert with start == end will have durtion of 0, which is + # quite confusing, so we set durtion to be end - start + gap + result.append(Alert(alert_name, int(event["start"]), + int(event["end"] - event["start"] + gap), + labels)) + else: + logger.warning("unexpected zero values in alert %s", alert_name) + + logger.info("get %d alert entries", len(result)) + + return result + + +def get_gpu_util(prometheus_url, since, until): + args = urllib.parse.urlencode({ + "query": "nvidiasmi_utilization_gpu", + "start": str(since), + "end": str(until), + "step": "10m", + }) + + url = urllib.parse.urljoin(prometheus_url, + "/prometheus/api/v1/query_range") + "?" 
+ args + + logger.debug("requesting %s", url) + result = [] + + obj = request_with_error_handling(url) + + if walk_json_field_safe(obj, "status") != "success": + logger.warning("requesting %s failed, body is %s", url, obj) + return result + + metrics = walk_json_field_safe(obj, "data", "result") + + for metric in metrics: + node_ip = get_ip(walk_json_field_safe(metric, "metric", "instance")) + gpu_id = walk_json_field_safe(metric, "metric", "minor_number") + + values = walk_json_field_safe(metric, "values") + sum_ = count = avg = 0 + if values is not None and len(values) > 0: + for val in values: + sum_ += float(val[1]) + count += 1 + avg = sum_ / count + else: + logger.warning("unexpected no values in gpu utils %s, %s, default avg to 0", + node_ip, + gpu_id) + + result.append(GPUEntry(node_ip, gpu_id, avg)) + + logger.info("get %d gpu entries", len(result)) + + return result + + +def delete_old_data(database, days): + db = DB(database) + now = datetime.datetime.now() + delta = datetime.timedelta(days=days) + + ago = int(datetime.datetime.timestamp(now - delta)) + + with db.conn: + cursor = db.conn.cursor() + + # should not delete entries if finished_time is 0, they are running apps + cursor.execute("""DELETE FROM apps WHERE finished_time