Skip to content

Commit

Permalink
Added ASGI application (#512)
Browse files Browse the repository at this point in the history
* Added ASGI application
* Factor out common-functionality for asgi/wsgi
* Convert twisted to use WSGIResource
* Change default HTTP Server to WSGI Server

Signed-off-by: Emil Madsen <sovende@gmail.com>
  • Loading branch information
Skeen authored Feb 19, 2020
1 parent ce7063f commit 56a8a53
Show file tree
Hide file tree
Showing 8 changed files with 293 additions and 55 deletions.
13 changes: 13 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -306,6 +306,19 @@ from prometheus_client import start_wsgi_server
start_wsgi_server(8000)
```

#### ASGI

To use Prometheus with [ASGI](http://asgi.readthedocs.org/en/latest/), there is
`make_asgi_app` which creates an ASGI application.

```python
from prometheus_client import make_asgi_app

app = make_asgi_app()
```
Such an application can be useful when integrating Prometheus metrics with ASGI
apps.

#### Flask

To use Prometheus with [Flask](http://flask.pocoo.org/) we need to serve metrics through a Prometheus WSGI application. This can be achieved using [Flask's application dispatching](http://flask.pocoo.org/docs/latest/patterns/appdispatch/). Below is a working example.
Expand Down
5 changes: 5 additions & 0 deletions prometheus_client/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,11 @@
generate_latest = exposition.generate_latest
MetricsHandler = exposition.MetricsHandler
make_wsgi_app = exposition.make_wsgi_app
try:
# Python >3.5 only
make_asgi_app = exposition.make_asgi_app
except:
pass
start_http_server = exposition.start_http_server
start_wsgi_server = exposition.start_wsgi_server
write_to_textfile = exposition.write_to_textfile
Expand Down
34 changes: 34 additions & 0 deletions prometheus_client/asgi.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
from urllib.parse import parse_qs

from .exposition import _bake_output
from .registry import REGISTRY


def make_asgi_app(registry=REGISTRY):
"""Create a ASGI app which serves the metrics from a registry."""

async def prometheus_app(scope, receive, send):
assert scope.get("type") == "http"
# Prepare parameters
params = parse_qs(scope.get('query_string', b''))
accept_header = "Accept: " + ",".join([
value.decode("utf8") for (name, value) in scope.get('headers')
if name.decode("utf8") == 'accept'
])
# Bake output
status, header, output = _bake_output(registry, accept_header, params)
# Return output
payload = await receive()
if payload.get("type") == "http.request":
await send(
{
"type": "http.response.start",
"status": int(status.split(' ')[0]),
"headers": [
tuple(x.encode('utf8') for x in header)
]
}
)
await send({"type": "http.response.body", "body": output})

return prometheus_app
83 changes: 43 additions & 40 deletions prometheus_client/exposition.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import socket
import sys
import threading
from wsgiref.simple_server import make_server, WSGIRequestHandler
from wsgiref.simple_server import make_server, WSGIServer, WSGIRequestHandler

from .openmetrics import exposition as openmetrics
from .registry import REGISTRY
Expand All @@ -31,20 +31,27 @@
PYTHON26_OR_OLDER = sys.version_info < (2, 7)
PYTHON376_OR_NEWER = sys.version_info > (3, 7, 5)


def _bake_output(registry, accept_header, params):
"""Bake output for metrics output."""
encoder, content_type = choose_encoder(accept_header)
if 'name[]' in params:
registry = registry.restricted_registry(params['name[]'])
output = encoder(registry)
return str('200 OK'), (str('Content-Type'), content_type), output


def make_wsgi_app(registry=REGISTRY):
"""Create a WSGI app which serves the metrics from a registry."""

def prometheus_app(environ, start_response):
# Prepare parameters
accept_header = environ.get('HTTP_ACCEPT')
params = parse_qs(environ.get('QUERY_STRING', ''))
r = registry
encoder, content_type = choose_encoder(environ.get('HTTP_ACCEPT'))
if 'name[]' in params:
r = r.restricted_registry(params['name[]'])
output = encoder(r)

status = str('200 OK')
headers = [(str('Content-type'), content_type)]
start_response(status, headers)
# Bake output
status, header, output = _bake_output(registry, accept_header, params)
# Return output
start_response(status, [header])
return [output]

return prometheus_app
Expand All @@ -57,15 +64,26 @@ def log_message(self, format, *args):
"""Log nothing."""


class ThreadingWSGIServer(ThreadingMixIn, WSGIServer):
"""Thread per request HTTP server."""
# Make worker threads "fire and forget". Beginning with Python 3.7 this
# prevents a memory leak because ``ThreadingMixIn`` starts to gather all
# non-daemon threads in a list in order to join on them at server close.
daemon_threads = True


def start_wsgi_server(port, addr='', registry=REGISTRY):
"""Starts a WSGI server for prometheus metrics as a daemon thread."""
app = make_wsgi_app(registry)
httpd = make_server(addr, port, app, handler_class=_SilentHandler)
httpd = make_server(addr, port, app, ThreadingWSGIServer, handler_class=_SilentHandler)
t = threading.Thread(target=httpd.serve_forever)
t.daemon = True
t.start()


start_http_server = start_wsgi_server


def generate_latest(registry=REGISTRY):
"""Returns the metrics from the registry in latest text format as a string."""

Expand Down Expand Up @@ -143,18 +161,15 @@ class MetricsHandler(BaseHTTPRequestHandler):
registry = REGISTRY

def do_GET(self):
# Prepare parameters
registry = self.registry
accept_header = self.headers.get('Accept')
params = parse_qs(urlparse(self.path).query)
encoder, content_type = choose_encoder(self.headers.get('Accept'))
if 'name[]' in params:
registry = registry.restricted_registry(params['name[]'])
try:
output = encoder(registry)
except:
self.send_error(500, 'error generating metric output')
raise
self.send_response(200)
self.send_header('Content-Type', content_type)
# Bake output
status, header, output = _bake_output(registry, accept_header, params)
# Return output
self.send_response(int(status.split(' ')[0]))
self.send_header(*header)
self.end_headers()
self.wfile.write(output)

Expand All @@ -177,25 +192,6 @@ def factory(cls, registry):
return MyMetricsHandler


class _ThreadingSimpleServer(ThreadingMixIn, HTTPServer):
"""Thread per request HTTP server."""
# Make worker threads "fire and forget". Beginning with Python 3.7 this
# prevents a memory leak because ``ThreadingMixIn`` starts to gather all
# non-daemon threads in a list in order to join on them at server close.
# Enabling daemon threads virtually makes ``_ThreadingSimpleServer`` the
# same as Python 3.7's ``ThreadingHTTPServer``.
daemon_threads = True


def start_http_server(port, addr='', registry=REGISTRY):
"""Starts an HTTP server for prometheus metrics as a daemon thread"""
CustomMetricsHandler = MetricsHandler.factory(registry)
httpd = _ThreadingSimpleServer((addr, port), CustomMetricsHandler)
t = threading.Thread(target=httpd.serve_forever)
t.daemon = True
t.start()


def write_to_textfile(path, registry):
"""Write metrics to the given path.
Expand Down Expand Up @@ -378,3 +374,10 @@ def instance_ip_grouping_key():
with closing(socket.socket(socket.AF_INET, socket.SOCK_DGRAM)) as s:
s.connect(('localhost', 0))
return {'instance': s.getsockname()[0]}


try:
# Python >3.5 only
from .asgi import make_asgi_app
except:
pass
20 changes: 5 additions & 15 deletions prometheus_client/twisted/_exposition.py
Original file line number Diff line number Diff line change
@@ -1,20 +1,10 @@
from __future__ import absolute_import, unicode_literals

from twisted.web.resource import Resource
from twisted.web.wsgi import WSGIResource
from twisted.internet import reactor

from .. import exposition, REGISTRY


class MetricsResource(Resource):
"""
Twisted ``Resource`` that serves prometheus metrics.
"""
isLeaf = True

def __init__(self, registry=REGISTRY):
self.registry = registry

def render_GET(self, request):
encoder, content_type = exposition.choose_encoder(request.getHeader('Accept'))
request.setHeader(b'Content-Type', content_type.encode('ascii'))
return encoder(self.registry)
MetricsResource = lambda registry=REGISTRY: WSGIResource(
reactor, reactor.getThreadPool(), exposition.make_wsgi_app(registry)
)
123 changes: 123 additions & 0 deletions tests/test_asgi.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
from __future__ import absolute_import, unicode_literals

import sys
from unittest import TestCase

from prometheus_client import CollectorRegistry, Counter, generate_latest
from prometheus_client.exposition import CONTENT_TYPE_LATEST

if sys.version_info < (2, 7):
from unittest2 import skipUnless
else:
from unittest import skipUnless

try:
# Python >3.5 only
from prometheus_client import make_asgi_app
import asyncio
from asgiref.testing import ApplicationCommunicator
HAVE_ASYNCIO_AND_ASGI = True
except ImportError:
HAVE_ASYNCIO_AND_ASGI = False


def setup_testing_defaults(scope):
scope.update(
{
"client": ("127.0.0.1", 32767),
"headers": [],
"http_version": "1.0",
"method": "GET",
"path": "/",
"query_string": b"",
"scheme": "http",
"server": ("127.0.0.1", 80),
"type": "http",
}
)


class ASGITest(TestCase):
@skipUnless(HAVE_ASYNCIO_AND_ASGI, "Don't have asyncio/asgi installed.")
def setUp(self):
self.registry = CollectorRegistry()
self.captured_status = None
self.captured_headers = None
# Setup ASGI scope
self.scope = {}
setup_testing_defaults(self.scope)
self.communicator = None

def tearDown(self):
if self.communicator:
asyncio.get_event_loop().run_until_complete(
self.communicator.wait()
)

def seed_app(self, app):
self.communicator = ApplicationCommunicator(app, self.scope)

def send_input(self, payload):
asyncio.get_event_loop().run_until_complete(
self.communicator.send_input(payload)
)

def send_default_request(self):
self.send_input({"type": "http.request", "body": b""})

def get_output(self):
output = asyncio.get_event_loop().run_until_complete(
self.communicator.receive_output(0)
)
return output

def get_all_output(self):
outputs = []
while True:
try:
outputs.append(self.get_output())
except asyncio.TimeoutError:
break
return outputs

def validate_metrics(self, metric_name, help_text, increments):
"""
ASGI app serves the metrics from the provided registry.
"""
c = Counter(metric_name, help_text, registry=self.registry)
for _ in range(increments):
c.inc()
# Create and run ASGI app
app = make_asgi_app(self.registry)
self.seed_app(app)
self.send_default_request()
# Assert outputs
outputs = self.get_all_output()
# Assert outputs
self.assertEqual(len(outputs), 2)
response_start = outputs[0]
self.assertEqual(response_start['type'], 'http.response.start')
response_body = outputs[1]
self.assertEqual(response_body['type'], 'http.response.body')
# Status code
self.assertEqual(response_start['status'], 200)
# Headers
self.assertEqual(len(response_start['headers']), 1)
self.assertEqual(response_start['headers'][0], (b"Content-Type", CONTENT_TYPE_LATEST.encode('utf8')))
# Body
output = response_body['body'].decode('utf8')
self.assertIn("# HELP " + metric_name + "_total " + help_text + "\n", output)
self.assertIn("# TYPE " + metric_name + "_total counter\n", output)
self.assertIn(metric_name + "_total " + str(increments) + ".0\n", output)

def test_report_metrics_1(self):
self.validate_metrics("counter", "A counter", 2)

def test_report_metrics_2(self):
self.validate_metrics("counter", "Another counter", 3)

def test_report_metrics_3(self):
self.validate_metrics("requests", "Number of requests", 5)

def test_report_metrics_4(self):
self.validate_metrics("failed_requests", "Number of failed requests", 7)
Loading

0 comments on commit 56a8a53

Please sign in to comment.