
Commit 3881b3c: solve conflict
ydye committed Mar 5, 2019
2 parents df287ae + 2429b23
Showing 20 changed files with 587 additions and 84 deletions.
5 changes: 4 additions & 1 deletion docs/webportal/PLUGINS.md
@@ -17,10 +17,13 @@ webportal:
  plugins:
  - title: Marketplace
    uri: /scripts/plugins/marketplace.bundle.js
    config:
      repo: Microsoft/pai
```
- The `title` field is the title of the web portal plugin listed in the menu; administrators may customize it to distinguish different configurations of the same plugin.
- The `uri` field is the entry file of the web portal plugin, usually provided by the plugin developer. It may be an absolute URL or a root-relative URL, depending on how the plugin is deployed.
- The `config` field is a key-value dictionary that configures the web portal plugin; the available configs are listed in each plugin's own documentation.

In addition, you can lock the plugin version if the URI refers to an Internet resource; follow the [Publish](#publish) section to move the online web portal plugin offline.

@@ -50,7 +53,7 @@ If any other PAI configuration is needed, please open an issue, PAI developers w

### Provide Plugin Configurations

Ask the system administrator to set the query string of the entry file, like `http://example.com/github-plugin.js?repo=Microsoft%2Fpai`; `document.currentScript.src` would help you get the full URI of the script, including the query string.
The config of the plugin is passed as the query string of the entry file, like `http://example.com/github-plugin.js?repo=Microsoft%2Fpai`; `document.currentScript.src` helps you get the full URI of the script, including the query string.
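
For illustration, here is how such a `config` dictionary maps onto the entry file's query string, assuming standard URL encoding (a Python sketch; the web portal itself is JavaScript):

```python
from urllib.parse import urlencode

# the plugin config as declared in the YAML above (illustrative values)
config = {"repo": "Microsoft/pai"}
entry_uri = "http://example.com/github-plugin.js"

full_uri = entry_uri + "?" + urlencode(config)
print(full_uri)  # http://example.com/github-plugin.js?repo=Microsoft%2Fpai
```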

### Migrate Current AI Web Tools to PAI Web Portal Plugin

8 changes: 2 additions & 6 deletions src/drivers/build/install-ib-drivers
@@ -27,11 +27,6 @@ lspci | grep -qE '(Network|Infiniband) controller.*Mellanox.*ConnectX' ||
# This script is used for installation of InfiniBand drivers
KERNEL_FULL_VERSION=`uname -r`

[[ -f /lib/modules/$KERNEL_FULL_VERSION/.modules-prepared ]] || {
    echo Modules not yet prepared
    exit 1
}

HOSTNAME=`hostname`
# HACK: using last octet of the host's IP
LAST_OCTET=`host $HOSTNAME | head -n1 | sed 's/^.*\.//'`
@@ -43,7 +38,7 @@ CURRENT_DRIVER=/var/drivers/mellanox/current
if [[ ! -f /var/drivers/mellanox/$MLNX_OFED_STRING/mlnxofedinstall ]]; then
    [[ -f /tmp/$MLNX_OFED_STRING-ext.tgz ]] ||
    {
        ./ /mlnx_add_kernel_support.sh -y -m ./$MLNX_OFED_STRING --make-tgz || exit $?
        ./mlnx_add_kernel_support.sh -y -m ./$MLNX_OFED_STRING --make-tgz || exit $?
    }
    mkdir -p /var/drivers/mellanox/$MLNX_OFED_STRING || exit $?
    tar -xvf /tmp/$MLNX_OFED_STRING-ext.tgz -C /var/drivers/mellanox/$MLNX_OFED_STRING --strip 1 || exit $?
@@ -166,3 +161,4 @@ ibdev2netdev || exit $?
# Final check
ibPresent
echo ibPresent exit value: $?
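
As an aside, the LAST_OCTET hack earlier in this script derives a per-host number from the final octet of the host's IP by shelling out to `host`. A rough Python sketch of the same idea, assuming the hostname resolves to a single IPv4 address:

```python
import socket

# mirror of: host $HOSTNAME | head -n1 | sed 's/^.*\.//'
ip = socket.gethostbyname(socket.gethostname())
last_octet = ip.rsplit(".", 1)[-1]
print(last_octet)
```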

22 changes: 12 additions & 10 deletions src/hadoop-ai/build/build-pre.sh
@@ -19,31 +19,33 @@

pushd $(dirname "$0") > /dev/null

hadoopBinaryDir="/hadoop-binary/"
hadoopBinaryDir="/hadoop-binary"

hadoopBinaryPath="${hadoopBinaryDir}hadoop-2.9.0.tar.gz"
cacheVersion="${hadoopBinaryDir}12940533-12933562-docker_executor-done"
# When changing the patch id, please update it here.
patchId="12940533-12933562-docker_executor"

hadoopBinaryPath="${hadoopBinaryDir}/hadoop-2.9.0.tar.gz"
cacheVersion="${hadoopBinaryDir}/${patchId}-done"

echo "hadoopbinarypath:${hadoopBinaryDir}"

[[ -f $cacheVersion ]] && [[ -f $hadoopBinaryPath ]] && [[ $cacheVersion -ot $hadoopBinaryPath ]] &&
echo "Hadoop binary path: ${hadoopBinaryDir}"

[[ -f ${cacheVersion} ]] && [[ -f ${hadoopBinaryPath} ]] && [[ ${cacheVersion} -ot ${hadoopBinaryPath} ]] &&
{
    echo "Hadoop ai with patch 12940533-12933562-docker_executor has been built"
    echo "Hadoop ai with patch ${patchId} has been built"
    echo "Skip this build process"
    exit 0
}

[[ ! -f "$hadoopBinaryPath" ]] ||
[[ ! -f "${hadoopBinaryPath}" ]] ||
{
    rm -rf $hadoopBinaryPath
    rm -rf ${hadoopBinaryPath}
}

# When changing the patch id, please update the filename here.
rm ${hadoopBinaryDir}/*-done
touch $cacheVersion
touch ${cacheVersion}

docker build -t hadoop-build -f hadoop-ai .
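
The skip logic above is a cache-marker pattern: the `*-done` marker is touched before the Docker build, so after a successful build it is older than the produced tarball, and bash's `-ot` (older-than) test then short-circuits subsequent builds. A minimal Python sketch of the same check, with illustrative paths:

```python
import os

def build_is_cached(marker: str, artifact: str) -> bool:
    """Mirror of: [[ -f marker ]] && [[ -f artifact ]] && [[ marker -ot artifact ]]"""
    return (os.path.isfile(marker)
            and os.path.isfile(artifact)
            and os.path.getmtime(marker) < os.path.getmtime(artifact))

# illustrative paths, not the real build environment
if build_is_cached("/hadoop-binary/12940533-12933562-docker_executor-done",
                   "/hadoop-binary/hadoop-2.9.0.tar.gz"):
    print("Skip this build process")
```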

179 changes: 151 additions & 28 deletions src/job-exporter/src/collector.py
@@ -23,6 +23,7 @@
import subprocess
import time
import copy
import os

from prometheus_client import make_wsgi_app, Counter, Gauge, Histogram
from prometheus_client.core import GaugeMetricFamily
@@ -32,6 +33,7 @@
import docker_inspect
import docker_stats
import nvidia
import ps

logger = logging.getLogger(__name__)

@@ -57,6 +59,31 @@ def gen_gpu_mem_util_gauge():
            "gpu memory utilization of card",
            labels=["minor_number"])

def gen_gpu_ecc_counter():
    return GaugeMetricFamily("nvidiasmi_ecc_error_count",
            "count of nvidia ecc error",
            labels=["minor_number", "type"])

def gen_gpu_memory_leak_counter():
    return GaugeMetricFamily("nvidiasmi_memory_leak_count",
            "count of nvidia memory leak",
            labels=["minor_number"])

def gen_zombie_process_counter():
    return GaugeMetricFamily("zombie_process_count",
            "count of zombie process",
            labels=["command"])

def gen_gpu_used_by_external_process_counter():
    return GaugeMetricFamily("gpu_used_by_external_process_count",
            "count of gpu used by external process",
            labels=["minor_number", "pid"])

def gen_gpu_used_by_zombie_container_counter():
    return GaugeMetricFamily("gpu_used_by_zombie_container_count",
            "count of gpu used by zombie container",
            labels=["minor_number", "container_id"])


class ResourceGauges(object):
    def __init__(self):
@@ -267,13 +294,78 @@ class GpuCollector(Collector):
    cmd_histogram = Histogram("cmd_nvidia_smi_latency_seconds",
            "Command call latency for nvidia-smi (seconds)")

    cmd_timeout = 3 # 99th latency is 0.97s
    cmd_timeout = 60 # 99th latency is 0.97s

    def __init__(self, name, sleep_time, atomic_ref, iteration_counter, gpu_info_ref):
    def __init__(self, name, sleep_time, atomic_ref, iteration_counter,
            gpu_info_ref, zombie_info_ref, mem_leak_threshold):
        Collector.__init__(self, name, sleep_time, atomic_ref, iteration_counter)
        self.gpu_info_ref = gpu_info_ref
        self.zombie_info_ref = zombie_info_ref
        self.mem_leak_threshold = mem_leak_threshold

    @staticmethod
    def get_container_id(pid):
        """ return two values: whether we found the corresponding
        container_id, and the container_id if found """
        path = "/proc/%d/cgroup" % (pid)
        if not os.path.isfile(path):
            return False, ""

        with open(path) as f:
            content = f.read()
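        # a matching cgroup line looks roughly like this (illustrative):
        #   11:pids:/docker/<64-hex-char-container-id>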

        for line in content.split("\n"):
            line = line.strip()
            if "pids" in line and "/docker/" in line:
                parts = line.split("/docker/")
                if len(parts) == 2 and re.match(u"[0-9a-f]+", parts[1]):
                    return True, parts[1]

        return False, ""

    @staticmethod
    def convert_to_metrics(gpu_info, zombie_info, pid_to_cid_fn, mem_leak_threshold):
        """ This fn is used to convert gpu_info & zombie_info into metrics; it is
        kept separate to make unit testing easier """
        core_utils = gen_gpu_util_gauge()
        mem_utils = gen_gpu_mem_util_gauge()
        ecc_errors = gen_gpu_ecc_counter()
        mem_leak = gen_gpu_memory_leak_counter()
        external_process = gen_gpu_used_by_external_process_counter()
        zombie_container = gen_gpu_used_by_zombie_container_counter()

        pids_use_gpu = {} # key is gpu minor, value is an array of pid

        for minor, info in gpu_info.items():
            core_utils.add_metric([minor], info.gpu_util)
            mem_utils.add_metric([minor], info.gpu_mem_util)
            ecc_errors.add_metric([minor, "single"], info.ecc_errors.single)
            ecc_errors.add_metric([minor, "double"], info.ecc_errors.double)
            if info.gpu_mem_util > mem_leak_threshold and len(info.pids) == 0:
                # we found that memory leaks of less than 20M can be mitigated automatically
                mem_leak.add_metric([minor], 1)

            if len(info.pids) > 0:
                pids_use_gpu[minor] = info.pids

        if zombie_info is not None and len(zombie_info) > 0 and len(pids_use_gpu) > 0:
            for minor, pids in pids_use_gpu.items():
                for pid in pids:
                    found, z_id = pid_to_cid_fn(pid)
                    if found and z_id in zombie_info:
                        # found corresponding container
                        zombie_container.add_metric([minor, z_id], 1)
                    else:
                        external_process.add_metric([minor, pid], 1)
            logger.warning("found gpu used by external %s, zombie container %s",
                    external_process, zombie_container)

        return [core_utils, mem_utils, ecc_errors, mem_leak,
                external_process, zombie_container]

    def collect_impl(self):
        zombie_info = self.zombie_info_ref.get_and_set(None)

        gpu_info = nvidia.nvidia_smi(GpuCollector.cmd_histogram,
                GpuCollector.cmd_timeout)

@@ -282,15 +374,8 @@ def collect_impl(self):
        self.gpu_info_ref.get_and_set(gpu_info)

        if gpu_info is not None:
            core_utils = gen_gpu_util_gauge()
            mem_utils = gen_gpu_mem_util_gauge()

            for minor, info in gpu_info.items():
                core_utils.add_metric([minor], info["gpu_util"])
                mem_utils.add_metric([minor], info["gpu_mem_util"])

            return [core_utils, mem_utils]

            return GpuCollector.convert_to_metrics(gpu_info, zombie_info,
                    GpuCollector.get_container_id, self.mem_leak_threshold)
        return None
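
Because `convert_to_metrics` takes the pid-to-container mapping as an injected function, it can be unit tested without touching `/proc` or `nvidia-smi`. A minimal sketch of such a test, using illustrative stand-ins for the per-GPU info objects that the `nvidia` module normally provides:

```python
from collections import namedtuple

from collector import GpuCollector  # assuming collector.py is importable

# illustrative stubs mirroring the fields accessed above
EccErrors = namedtuple("EccErrors", ["single", "double"])
GpuInfo = namedtuple("GpuInfo", ["gpu_util", "gpu_mem_util", "ecc_errors", "pids"])

def test_convert_to_metrics():
    gpu_info = {"0": GpuInfo(90, 50, EccErrors(0, 0), [123])}
    zombie_info = {"abc123"}  # container ids already deemed zombie

    def pid_to_cid(pid):
        return True, "abc123"  # pretend pid 123 runs inside the zombie container

    metrics = GpuCollector.convert_to_metrics(gpu_info, zombie_info,
            pid_to_cid, 20)
    assert len(metrics) == 6  # core util, mem util, ecc, leak, external, zombie
```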


@@ -443,13 +528,17 @@ def process_one_container(self, container_id, stats, gpu_infos, all_conns, gauge

        if gpu_infos:
            for id in gpu_ids:
                if gpu_infos.get(id) is None:
                    continue

                nvidia_gpu_status = gpu_infos[id]
                labels = copy.deepcopy(container_labels)
                labels["minor_number"] = id

                gauges.add_value("task_gpu_percent",
                        labels, gpu_infos[id]["gpu_util"])
                        labels, nvidia_gpu_status.gpu_util)
                gauges.add_value("task_gpu_mem_percent",
                        labels, gpu_infos[id]["gpu_mem_util"])
                        labels, nvidia_gpu_status.gpu_mem_util)

        gauges.add_value("task_cpu_percent", container_labels, stats["CPUPerc"])
        gauges.add_value("task_mem_usage_byte", container_labels, stats["MemUsage_Limit"]["usage"])
@@ -508,32 +597,33 @@ def __init__(self, type):
            self.decay_time = datetime.timedelta(minutes=5)

        def update(self, zombie_ids, now):
            """ feed in new zombie ids and get count of decayed zombie """
            """ feed in new zombie ids and get ids of decayed zombies """
            # remove all records that no longer exist
            for z_id in list(self.zombies.keys()):
                if z_id not in zombie_ids:
                    logger.debug("pop zombie %s that no longer exists", z_id)
                    self.zombies.pop(z_id)

            count = 0
            result = set()
            for current in zombie_ids:
                if current in self.zombies:
                    enter_zombie_time = self.zombies[current]
                    if now - enter_zombie_time > self.decay_time:
                        count += 1
                        result.add(current)
                else:
                    logger.debug("new zombie %s", current)
                    self.zombies[current] = now

            ZombieCollector.zombie_container_count.labels(self.type).set(count)
            return count # for test
            ZombieCollector.zombie_container_count.labels(self.type).set(len(result))
            return result

        def __len__(self):
            return len(self.zombies)

    def __init__(self, name, sleep_time, atomic_ref, iteration_counter, stats_info_ref):
    def __init__(self, name, sleep_time, atomic_ref, iteration_counter, stats_info_ref, zombie_ids_ref):
        Collector.__init__(self, name, sleep_time, atomic_ref, iteration_counter)
        self.stats_info_ref = stats_info_ref
        self.zombie_ids_ref = zombie_ids_ref

        self.type1_zombies = ZombieCollector.ZombieRecorder("job_exit_hangs")
        self.type2_zombies = ZombieCollector.ZombieRecorder("residual_job")
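
For reference, a sketch of `ZombieRecorder`'s decay semantics: an id is reported only after it has stayed zombie longer than `decay_time` (5 minutes), and ids that disappear are forgotten. Illustrative only, assuming collector.py is importable:

```python
import datetime

from collector import ZombieCollector  # assuming collector.py is importable

recorder = ZombieCollector.ZombieRecorder("job_exit_hangs")
t0 = datetime.datetime.now()

assert recorder.update({"abc"}, t0) == set()  # first sighting: recorded only
six_min = datetime.timedelta(minutes=6)
assert recorder.update({"abc"}, t0 + six_min) == {"abc"}  # stayed past decay_time
assert recorder.update(set(), t0) == set()  # disappeared ids are dropped
```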
@@ -549,26 +639,32 @@ def update_zombie_count_type1(self, exited_containers, now):

    def update_zombie_count_type2(self, stats, now):
        """ this fn will generate zombie container count for the second type """
        names = set([info["name"] for info in stats.values()])
        name_to_id = {}
        for info in stats.values():
            name_to_id[info["name"]] = info["id"]

        # key is job name, value is tuple of corresponding
        # yarn_container name and job container id
        job_containers = {}

        job_containers = {} # key is original name, value is corresponding yarn_container name
        yarn_containers = set()

        zombie_ids = set()

        for name in names:
        for name, id in name_to_id.items():
            if re.match(self.yarn_container_reg, name) is not None:
                yarn_containers.add(name)
            elif re.match(self.job_container_reg, name) is not None:
                match = re.match(self.job_container_reg, name)
                value = match.groups()[0]
                job_containers[name] = value
                job_containers[name] = (value, id)
            else:
                pass # ignore

        for job_name, yarn_name in job_containers.items():
        for job_name, val in job_containers.items():
            yarn_name, job_id = val
            if yarn_name not in yarn_containers:
                zombie_ids.add(job_name)
                zombie_ids.add(job_id)

        return self.type2_zombies.update(zombie_ids, now)

@@ -599,6 +695,7 @@ def update_zombie_count(self, stats):
        There are two types of zombie:
        1. container which output "USER COMMAND END" but did not exit for a long period of time
        2. yarn container exited but job container didn't
        returns the set of container ids deemed zombies
        """
        if stats is None:
            logger.warning("docker stats is None")
@@ -607,11 +704,37 @@
        exited_containers = set(filter(self.is_container_exited, stats.keys()))

        now = datetime.datetime.now()
        self.update_zombie_count_type1(exited_containers, now)
        self.update_zombie_count_type2(stats, now)
        type1_zombies = self.update_zombie_count_type1(exited_containers, now)
        type2_zombies = self.update_zombie_count_type2(stats, now)
        return type1_zombies.union(type2_zombies)

    def collect_impl(self):
        # set it to None so that, if docker stats hangs until the next collection,
        # we will get None
        stats_info = self.stats_info_ref.get_and_set(None)
        self.update_zombie_count(stats_info)
        all_zombies = self.update_zombie_count(stats_info)
        self.zombie_ids_ref.get_and_set(all_zombies)


class ProcessCollector(Collector):
    cmd_histogram = Histogram("cmd_ps_latency_seconds",
            "Command call latency for ps (seconds)")

    cmd_timeout = 10 # TODO 99th latency is xxx

    def __init__(self, name, sleep_time, atomic_ref, iteration_counter):
        Collector.__init__(self, name, sleep_time, atomic_ref, iteration_counter)

    def collect_impl(self):
        process = ps.get_zombie_process(ProcessCollector.cmd_histogram,
                ProcessCollector.cmd_timeout)

        if len(process) > 0:
            zombie_metrics = gen_zombie_process_counter()

            for cmd, count in process.items():
                zombie_metrics.add_metric([cmd], count)

            return [zombie_metrics]

        return None
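
`ps.get_zombie_process` is PAI's own helper; a rough standalone sketch of the underlying idea, counting state-`Z` processes per command via a plain `ps` call:

```python
import subprocess
from collections import Counter

def get_zombie_processes(timeout=10):
    """ count zombie (state Z) processes grouped by command name """
    out = subprocess.check_output(["ps", "-eo", "stat=,comm="],
            timeout=timeout).decode()
    zombies = Counter()
    for line in out.splitlines():
        parts = line.split(None, 1)
        if len(parts) == 2 and parts[0].startswith("Z"):
            zombies[parts[1].strip()] += 1
    return zombies
```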
