Skip to content
This repository has been archived by the owner on Jun 6, 2024. It is now read-only.

use twisted in yarn-exporter #2273

Merged
merged 1 commit into from
Mar 7, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -99,10 +99,11 @@ spec:
- "-p {{ cluster_cfg[ "hadoop-resource-manager" ]["yarn_exporter_port"] }}"
livenessProbe:
httpGet:
path: /
path: '/healthz'
port: {{ cluster_cfg[ "hadoop-resource-manager" ]["yarn_exporter_port"] }}
initialDelaySeconds: 5
timeoutSeconds: 1
initialDelaySeconds: 30
periodSeconds: 30
timeoutSeconds: 10
imagePullSecrets:
- name: {{ cluster_cfg["cluster"]["docker-registry"]["secret-name"] }}
volumes:
Expand Down
3 changes: 2 additions & 1 deletion src/yarn-exporter/build/yarn-exporter.dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ FROM python:3.7

WORKDIR /root/

COPY src/yarn_exporter.py /usr/local/
COPY src/requirements.txt /usr/local/

RUN pip3 install -r /usr/local/requirements.txt

COPY src/yarn_exporter.py /usr/local/
10 changes: 1 addition & 9 deletions src/yarn-exporter/src/requirements.txt
Original file line number Diff line number Diff line change
@@ -1,11 +1,3 @@
altgraph==0.15
attrs==18.1.0
certifi==2018.4.16
chardet==3.0.4
future==0.16.0
idna==2.7
macholib==1.9
pefile==2017.11.5
prometheus-client==0.2.0
requests==2.20.0
urllib3==1.23
twisted==18.9.0
162 changes: 75 additions & 87 deletions src/yarn-exporter/src/yarn_exporter.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,128 +16,116 @@
# DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

from wsgiref.simple_server import make_server
import urllib.parse
import argparse
import signal
import faulthandler
import gc

from prometheus_client.core import GaugeMetricFamily, CounterMetricFamily, REGISTRY
from prometheus_client import make_wsgi_app
import attr
import requests


class YarnCollector(object):
api_path = '/'

def __init__(self, endpoint, cluster_name='yarn'):
self.endpoint = endpoint
self.cluster_name = cluster_name
from prometheus_client.core import GaugeMetricFamily, REGISTRY
from prometheus_client import Histogram
from prometheus_client.twisted import MetricsResource

@property
def metric_url(self):
return urllib.parse.urljoin(self.endpoint, self.api_path)
import requests

def collect(self):
raise NotImplemented
from twisted.web.server import Site
from twisted.web.resource import Resource
from twisted.internet import reactor

##### yarn-exporter will generate following metrics

@attr.s
class YarnMetric(object):
GAUGE = 'gauge'
COUNTER = 'counter'
supported_type = [GAUGE, COUNTER]
cluster_metrics_histogram = Histogram("yarn_api_cluster_metrics_latency_seconds",
"Resource latency for requesting yarn api /ws/v1/cluster/metrics")

namespace = "yarn"
def gen_total_gpu_count():
    """Create an empty gauge family named ``yarn_total_gpu_num``.

    The caller is expected to attach samples with ``add_metric`` before
    yielding the family to the Prometheus registry.
    """
    family = GaugeMetricFamily("yarn_total_gpu_num", "count of total gpu number")
    return family

name = attr.ib()
metric_type = attr.ib()
def gen_used_gpu_count():
    """Create an empty gauge family named ``yarn_gpus_used``.

    The caller is expected to attach samples with ``add_metric`` before
    yielding the family to the Prometheus registry.
    """
    family = GaugeMetricFamily("yarn_gpus_used", "count of allocated gpu by yarn")
    return family

@metric_type.validator
def check(self, _, value):
if value not in self.supported_type:
raise ValueError('Parameter metric_type value must in {0}, can not be {1}'.format(self.supported_type, value))
def gen_total_node_count():
    """Create an empty gauge family named ``yarn_nodes_all``.

    The caller is expected to attach samples with ``add_metric`` before
    yielding the family to the Prometheus registry.
    """
    family = GaugeMetricFamily("yarn_nodes_all", "total node count in yarn")
    return family

description = attr.ib()
labels = attr.ib(default=attr.Factory(list))
def gen_active_node_count():
    """Create an empty gauge family named ``yarn_nodes_active``.

    The caller is expected to attach samples with ``add_metric`` before
    yielding the family to the Prometheus registry.
    """
    family = GaugeMetricFamily("yarn_nodes_active", "active node count in yarn")
    return family

@property
def metric_name(self):
return '{0}_{1}'.format(self.namespace, self.name)
##### yarn-exporter will generate above metrics

def create_metric(self):
if self.metric_type == self.GAUGE:
return GaugeMetricFamily(self.metric_name, self.description, labels=self.labels)
elif self.metric_type == self.COUNTER:
return CounterMetricFamily(self.metric_name, self.description, labels=self.labels)
else:
raise ValueError('property metric_type value must in {0}, can not be {1}'.format(self.supported_type, self.metric_type))
def request_with_histogram(url, histogram, *args, **kwargs):
    """Issue ``requests.get(url, ...)`` while timing the call with *histogram*.

    Args:
        url: Address to fetch.
        histogram: Object whose ``time()`` method returns a context manager;
            the request runs inside that context.
        *args: Extra positional arguments forwarded to ``requests.get``.
        **kwargs: Extra keyword arguments forwarded to ``requests.get``.

    Returns:
        Whatever ``requests.get`` returns for the call.
    """
    timer = histogram.time()
    with timer:
        response = requests.get(url, *args, **kwargs)
        return response


class YarnMetricCollector(YarnCollector):
api_path = '/ws/v1/cluster/metrics'
class YarnCollector(object):
    """Custom Prometheus collector for YARN cluster-level metrics.

    Every call to ``collect`` performs one GET against the YARN REST path
    ``/ws/v1/cluster/metrics`` and converts selected fields of the returned
    ``clusterMetrics`` object into gauge metric families.
    """

    def __init__(self, yarn_url):
        """Derive and store the cluster-metrics URL.

        Args:
            yarn_url: Base address of the YARN REST API, e.g.
                ``http://127.0.0.1:8088``. The metrics path is absolute, so
                any path component already present in *yarn_url* is replaced.
        """
        self.metric_url = urllib.parse.urljoin(yarn_url, "/ws/v1/cluster/metrics")

    def collect(self):
        """Fetch cluster metrics and yield one gauge family per exported value.

        Yields:
            ``yarn_total_gpu_num``, ``yarn_gpus_used``, ``yarn_nodes_all`` and
            ``yarn_nodes_active``, in that order, each carrying a single
            unlabelled sample.

        Raises:
            Whatever ``response.raise_for_status()`` raises on a non-success
            HTTP status; ``KeyError`` if an expected field is missing from
            the JSON payload.
        """
        response = request_with_histogram(self.metric_url, cluster_metrics_histogram,
                                          allow_redirects=True)
        response.raise_for_status()

        metric = response.json()["clusterMetrics"]

        total_gpu_num = gen_total_gpu_count()
        total_gpu_num.add_metric([], metric["totalGPUs"])
        yield total_gpu_num

        gpus_used = gen_used_gpu_count()
        gpus_used.add_metric([], metric["allocatedGPUs"])
        yield gpus_used

        # The node total must go into its own family (yarn_nodes_all). It was
        # previously built with gen_total_gpu_count(), which exported the node
        # count under the GPU metric name and never emitted yarn_nodes_all.
        nodes_all = gen_total_node_count()
        nodes_all.add_metric([], metric["totalNodes"])
        yield nodes_all

        nodes_active = gen_active_node_count()
        nodes_active.add_metric([], metric["activeNodes"])
        yield nodes_active

# nodes_lost = YarnMetric('nodes_lost', YarnMetric.GAUGE,
# 'The number of lost nodes', ['cluster']).create_metric()
# nodes_lost.add_metric([self.cluster_name], metric['lostNodes'])
# yield nodes_lost

# nodes_unhealthy = YarnMetric('nodes_unhealthy', YarnMetric.GAUGE,
# 'The number of unhealthy nodes', ['cluster']).create_metric()
# nodes_unhealthy.add_metric([self.cluster_name], metric['unhealthyNodes'])
# yield nodes_unhealthy
class HealthResource(Resource):
    """Web resource that answers GET requests with a fixed HTML "Ok" page."""

    def render_GET(self, request):
        """Set the HTML content type on *request* and return the page bytes."""
        request.setHeader("Content-Type", "text/html; charset=utf-8")
        page = "<html>Ok</html>"
        return page.encode("utf-8")
xudifsd marked this conversation as resolved.
Show resolved Hide resolved

# nodes_decommissioned = YarnMetric('nodes_decommissioned', YarnMetric.COUNTER,
# 'The number of nodes decommissioned', ['cluster']).create_metric()
# nodes_decommissioned.add_metric([self.cluster_name], metric['decommissionedNodes'])
# yield nodes_decommissioned
def register_stack_trace_dump():
    """Install a faulthandler hook that dumps tracebacks on SIGTRAP.

    Tracebacks of all threads are dumped, and any previously installed
    handler for the signal is not chained.
    """
    dump_signal = signal.SIGTRAP
    faulthandler.register(dump_signal, all_threads=True, chain=False)

# nodes_rebooted = YarnMetric('nodes_rebooted', YarnMetric.COUNTER,
# 'The number of nodes rebooted', ['cluster']).create_metric()
# nodes_rebooted.add_metric([self.cluster_name], metric['rebootedNodes'])
# yield nodes_rebooted
# https://github.com/prometheus/client_python/issues/322#issuecomment-428189291
def burninate_gc_collector():
    """Detach the Prometheus client's GC collector and its metrics.

    First drops every entry of ``gc.callbacks`` whose qualified name starts
    with ``GCCollector.``. Then unregisters every collector that the default
    registry knows under a name starting with ``python_gc_``.
    """
    # Iterate over a copy because matching callbacks are removed in place.
    for callback in gc.callbacks[:]:
        # gc.callbacks may hold arbitrary callables (e.g. functools.partial
        # objects) that have no __qualname__; treat those as non-matching
        # instead of failing with AttributeError.
        qualified_name = getattr(callback, "__qualname__", "")
        if qualified_name.startswith("GCCollector."):
            gc.callbacks.remove(callback)

    # Snapshot the mapping: unregistering mutates it during the loop.
    for name, collector in list(REGISTRY._names_to_collectors.items()):
        if name.startswith("python_gc_"):
            try:
                REGISTRY.unregister(collector)
            except KeyError:  # probably gone already
                pass

def main(args):
    """Register the YARN collector and serve HTTP until the reactor stops.

    Two paths are served: ``/metrics`` (Prometheus exposition) and
    ``/healthz`` (static health page).

    Args:
        args: Parsed command-line namespace. ``yarn_url`` and ``port`` are
            required. ``host`` is used as the bind address when present;
            if it is absent or empty the listener binds to all interfaces.
    """
    register_stack_trace_dump()
    burninate_gc_collector()

    REGISTRY.register(YarnCollector(args.yarn_url))

    root = Resource()
    root.putChild(b"metrics", MetricsResource())
    root.putChild(b"healthz", HealthResource())

    factory = Site(root)
    # Honour the --host option, which was previously parsed but ignored by
    # the listener. An empty interface is Twisted's "all interfaces" default,
    # so callers that pass a namespace without ``host`` behave as before.
    bind_address = getattr(args, "host", None) or ""
    reactor.listenTCP(int(args.port), factory, interface=bind_address)
    reactor.run()


if __name__ == "__main__":
    # Command-line entry point: build the argument parser, parse argv and
    # hand the resulting namespace to main().
    # NOTE(review): --cluster-name and --collected-apps do not appear to be
    # read by main() -- confirm whether they are kept only for CLI
    # compatibility.
    parser = argparse.ArgumentParser()
    parser.add_argument("yarn_url", help="Yarn rest api address, eg: http://127.0.0.1:8088")
    parser.add_argument(
        "--cluster-name", "-n",
        help="Yarn cluster name",
        default="cluster_0",
    )
    parser.add_argument("--port", "-p", help="Exporter listen port", default="9459")
    parser.add_argument("--host", "-H", help="Exporter host address", default="0.0.0.0")
    parser.add_argument(
        "--collected-apps", "-c",
        nargs="*",
        help="Name of applications need to collect running status",
    )

    main(parser.parse_args())