Skip to content
This repository has been archived by the owner on Jun 6, 2024. It is now read-only.

Commit

Permalink
use twisted in yarn-exporter (#2273)
Browse files Browse the repository at this point in the history
  • Loading branch information
xudifsd authored Mar 7, 2019
1 parent 83b9114 commit 647a87d
Show file tree
Hide file tree
Showing 4 changed files with 82 additions and 100 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -106,10 +106,11 @@ spec:
- "-p {{ cluster_cfg[ "hadoop-resource-manager" ]["yarn_exporter_port"] }}"
livenessProbe:
httpGet:
path: /
path: '/healthz'
port: {{ cluster_cfg[ "hadoop-resource-manager" ]["yarn_exporter_port"] }}
initialDelaySeconds: 5
timeoutSeconds: 1
initialDelaySeconds: 30
periodSeconds: 30
timeoutSeconds: 10
imagePullSecrets:
- name: {{ cluster_cfg["cluster"]["docker-registry"]["secret-name"] }}
volumes:
Expand Down
3 changes: 2 additions & 1 deletion src/yarn-exporter/build/yarn-exporter.dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ FROM python:3.7

WORKDIR /root/

COPY src/yarn_exporter.py /usr/local/
COPY src/requirements.txt /usr/local/

RUN pip3 install -r /usr/local/requirements.txt

COPY src/yarn_exporter.py /usr/local/
10 changes: 1 addition & 9 deletions src/yarn-exporter/src/requirements.txt
Original file line number Diff line number Diff line change
@@ -1,11 +1,3 @@
altgraph==0.15
attrs==18.1.0
certifi==2018.4.16
chardet==3.0.4
future==0.16.0
idna==2.7
macholib==1.9
pefile==2017.11.5
prometheus-client==0.2.0
requests==2.20.0
urllib3==1.23
twisted==18.9.0
162 changes: 75 additions & 87 deletions src/yarn-exporter/src/yarn_exporter.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,128 +16,116 @@
# DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

from wsgiref.simple_server import make_server
import urllib.parse
import argparse
import signal
import faulthandler
import gc

from prometheus_client.core import GaugeMetricFamily, CounterMetricFamily, REGISTRY
from prometheus_client import make_wsgi_app
import attr
import requests


class YarnCollector(object):
api_path = '/'

def __init__(self, endpoint, cluster_name='yarn'):
self.endpoint = endpoint
self.cluster_name = cluster_name
from prometheus_client.core import GaugeMetricFamily, REGISTRY
from prometheus_client import Histogram
from prometheus_client.twisted import MetricsResource

@property
def metric_url(self):
return urllib.parse.urljoin(self.endpoint, self.api_path)
import requests

def collect(self):
raise NotImplemented
from twisted.web.server import Site
from twisted.web.resource import Resource
from twisted.internet import reactor

##### yarn-exporter will generate following metrics

@attr.s
class YarnMetric(object):
GAUGE = 'gauge'
COUNTER = 'counter'
supported_type = [GAUGE, COUNTER]
cluster_metrics_histogram = Histogram("yarn_api_cluster_metrics_latency_seconds",
"Resource latency for requesting yarn api /ws/v1/cluster/metrics")

namespace = "yarn"
def gen_total_gpu_count():
return GaugeMetricFamily("yarn_total_gpu_num", "count of total gpu number")

name = attr.ib()
metric_type = attr.ib()
def gen_used_gpu_count():
return GaugeMetricFamily("yarn_gpus_used", "count of allocated gpu by yarn")

@metric_type.validator
def check(self, _, value):
if value not in self.supported_type:
raise ValueError('Parameter metric_type value must in {0}, can not be {1}'.format(self.supported_type, value))
def gen_total_node_count():
return GaugeMetricFamily("yarn_nodes_all", "total node count in yarn")

description = attr.ib()
labels = attr.ib(default=attr.Factory(list))
def gen_active_node_count():
return GaugeMetricFamily("yarn_nodes_active", "active node count in yarn")

@property
def metric_name(self):
return '{0}_{1}'.format(self.namespace, self.name)
##### yarn-exporter will generate above metrics

def create_metric(self):
if self.metric_type == self.GAUGE:
return GaugeMetricFamily(self.metric_name, self.description, labels=self.labels)
elif self.metric_type == self.COUNTER:
return CounterMetricFamily(self.metric_name, self.description, labels=self.labels)
else:
raise ValueError('property metric_type value must in {0}, can not be {1}'.format(self.supported_type, self.metric_type))
def request_with_histogram(url, histogram, *args, **kwargs):
with histogram.time():
return requests.get(url, *args, **kwargs)


class YarnMetricCollector(YarnCollector):
api_path = '/ws/v1/cluster/metrics'
class YarnCollector(object):
def __init__(self, yarn_url):
self.metric_url = urllib.parse.urljoin(yarn_url, "/ws/v1/cluster/metrics")

def collect(self):
response = requests.get(self.metric_url, allow_redirects=True)
response = request_with_histogram(self.metric_url, cluster_metrics_histogram,
allow_redirects=True)
response.raise_for_status()
metric = response.json()['clusterMetrics']

metric = response.json()["clusterMetrics"]

total_gpu_num = YarnMetric('total_gpu_num', YarnMetric.COUNTER,
'The total number of GPUs of cluster',['cluster']).create_metric()
total_gpu_num.add_metric([self.cluster_name], metric['totalGPUs'])
total_gpu_num = gen_total_gpu_count()
total_gpu_num.add_metric([], metric["totalGPUs"])
yield total_gpu_num

gpus_used = YarnMetric('gpus_used', YarnMetric.COUNTER,
'The number of allocated GPUs',['cluster']).create_metric()
gpus_used.add_metric([self.cluster_name], metric['allocatedGPUs'])
gpus_used = gen_used_gpu_count()
gpus_used.add_metric([], metric["allocatedGPUs"])
yield gpus_used

nodes_all = YarnMetric('nodes_all', YarnMetric.GAUGE,
'The total number of nodes', ['cluster']).create_metric()
nodes_all.add_metric([self.cluster_name], metric['totalNodes'])
nodes_all = gen_total_gpu_count()
nodes_all.add_metric([], metric["totalNodes"])
yield nodes_all

nodes_active = YarnMetric('nodes_active', YarnMetric.GAUGE,
'The number of active nodes', ['cluster']).create_metric()
nodes_active.add_metric([self.cluster_name], metric['activeNodes'])
nodes_active = gen_active_node_count()
nodes_active.add_metric([], metric["activeNodes"])
yield nodes_active

# nodes_lost = YarnMetric('nodes_lost', YarnMetric.GAUGE,
# 'The number of lost nodes', ['cluster']).create_metric()
# nodes_lost.add_metric([self.cluster_name], metric['lostNodes'])
# yield nodes_lost

# nodes_unhealthy = YarnMetric('nodes_unhealthy', YarnMetric.GAUGE,
# 'The number of unhealthy nodes', ['cluster']).create_metric()
# nodes_unhealthy.add_metric([self.cluster_name], metric['unhealthyNodes'])
# yield nodes_unhealthy
class HealthResource(Resource):
def render_GET(self, request):
request.setHeader("Content-Type", "text/html; charset=utf-8")
return "<html>Ok</html>".encode("utf-8")

# nodes_decommissioned = YarnMetric('nodes_decommissioned', YarnMetric.COUNTER,
# 'The number of nodes decommissioned', ['cluster']).create_metric()
# nodes_decommissioned.add_metric([self.cluster_name], metric['decommissionedNodes'])
# yield nodes_decommissioned
def register_stack_trace_dump():
faulthandler.register(signal.SIGTRAP, all_threads=True, chain=False)

# nodes_rebooted = YarnMetric('nodes_rebooted', YarnMetric.COUNTER,
# 'The number of nodes rebooted', ['cluster']).create_metric()
# nodes_rebooted.add_metric([self.cluster_name], metric['rebootedNodes'])
# yield nodes_rebooted
# https://github.com/prometheus/client_python/issues/322#issuecomment-428189291
def burninate_gc_collector():
for callback in gc.callbacks[:]:
if callback.__qualname__.startswith("GCCollector."):
gc.callbacks.remove(callback)

def get_parser():
for name, collector in list(REGISTRY._names_to_collectors.items()):
if name.startswith("python_gc_"):
try:
REGISTRY.unregister(collector)
except KeyError: # probably gone already
pass

def main(args):
register_stack_trace_dump()
burninate_gc_collector()

REGISTRY.register(YarnCollector(args.yarn_url))

root = Resource()
root.putChild(b"metrics", MetricsResource())
root.putChild(b"healthz", HealthResource())

factory = Site(root)
reactor.listenTCP(int(args.port), factory)
reactor.run()


if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument("yarn_url", help="Yarn rest api address, eg: http://127.0.0.1:8088")
parser.add_argument("--cluster-name", "-n", help="Yarn cluster name",
default="cluster_0")
parser.add_argument("--port", "-p", help="Exporter listen port",default="9459")
parser.add_argument("--host", "-H", help="Exporter host address", default="0.0.0.0")
parser.add_argument("--collected-apps", "-c", nargs="*",
help="Name of applications need to collect running status")

return parser

if __name__ == "__main__":
args = get_parser().parse_args()
args = parser.parse_args()

REGISTRY.register(YarnMetricCollector(args.yarn_url + '/metrics', args.cluster_name))
app = make_wsgi_app(REGISTRY)
httpd = make_server(args.host, int(args.port), app)
httpd.serve_forever()
main(args)

0 comments on commit 647a87d

Please sign in to comment.