Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added ASGI application #512

Merged
merged 10 commits into from
Feb 19, 2020
26 changes: 26 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -306,6 +306,32 @@ from prometheus_client import start_wsgi_server
start_wsgi_server(8000)
```

#### ASGI

To use Prometheus with [ASGI](http://asgi.readthedocs.org/en/latest/), there is
`make_asgi_app` which creates an ASGI application.

Save the snippet below in a `myapp.py` file

```python
from prometheus_client import make_asgi_app

app = make_asgi_app()
```
Such an application can be useful when integrating Prometheus metrics with ASGI
apps.

The app can be used to serve the metrics through an ASGI implementation, such
Skeen marked this conversation as resolved.
Show resolved Hide resolved
as [daphne](https://github.com/django/daphne) or
[uvicorn](https://www.uvicorn.org/).
```bash
# Install daphne if you do not have it
pip install daphne
daphne myapp:app
```

Visit http://localhost:8000/ to see the metrics

#### Flask

To use Prometheus with [Flask](http://flask.pocoo.org/) we need to serve metrics through a Prometheus WSGI application. This can be achieved using [Flask's application dispatching](http://flask.pocoo.org/docs/latest/patterns/appdispatch/). Below is a working example.
Expand Down
5 changes: 5 additions & 0 deletions prometheus_client/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,11 @@
generate_latest = exposition.generate_latest
MetricsHandler = exposition.MetricsHandler
make_wsgi_app = exposition.make_wsgi_app
try:
# Python >3.5 only
make_asgi_app = exposition.make_asgi_app
except:
pass
start_http_server = exposition.start_http_server
start_wsgi_server = exposition.start_wsgi_server
write_to_textfile = exposition.write_to_textfile
Expand Down
34 changes: 34 additions & 0 deletions prometheus_client/asgi.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
from urllib.parse import parse_qs

from .exposition import _bake_output
from .registry import REGISTRY


def make_asgi_app(registry=REGISTRY):
"""Create a ASGI app which serves the metrics from a registry."""

async def prometheus_app(scope, receive, send):
Skeen marked this conversation as resolved.
Show resolved Hide resolved
assert scope.get("type") == "http"
# Prepare parameters
params = parse_qs(scope.get('query_string', b''))
accept_header = "Accept: " + ",".join([
value.decode("utf8") for (name, value) in scope.get('headers')
if name.decode("utf8") == 'accept'
])
# Bake output
status, header, output = _bake_output(registry, accept_header, params)
# Return output
payload = await receive()
if payload.get("type") == "http.request":
await send(
{
"type": "http.response.start",
"status": int(status.split(' ')[0]),
"headers": [
tuple(x.encode('utf8') for x in header)
]
}
)
await send({"type": "http.response.body", "body": output})

return prometheus_app
83 changes: 43 additions & 40 deletions prometheus_client/exposition.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import socket
import sys
import threading
from wsgiref.simple_server import make_server, WSGIRequestHandler
from wsgiref.simple_server import make_server, WSGIServer, WSGIRequestHandler

from .openmetrics import exposition as openmetrics
from .registry import REGISTRY
Expand All @@ -31,20 +31,27 @@
PYTHON26_OR_OLDER = sys.version_info < (2, 7)
PYTHON376_OR_NEWER = sys.version_info > (3, 7, 5)


def _bake_output(registry, accept_header, params):
"""Bake output for metrics output."""
encoder, content_type = choose_encoder(accept_header)
if 'name[]' in params:
registry = registry.restricted_registry(params['name[]'])
output = encoder(registry)
return str('200 OK'), (str('Content-Type'), content_type), output


def make_wsgi_app(registry=REGISTRY):
"""Create a WSGI app which serves the metrics from a registry."""

def prometheus_app(environ, start_response):
# Prepare parameters
accept_header = environ.get('HTTP_ACCEPT')
params = parse_qs(environ.get('QUERY_STRING', ''))
r = registry
encoder, content_type = choose_encoder(environ.get('HTTP_ACCEPT'))
if 'name[]' in params:
r = r.restricted_registry(params['name[]'])
output = encoder(r)

status = str('200 OK')
headers = [(str('Content-type'), content_type)]
start_response(status, headers)
# Bake output
status, header, output = _bake_output(registry, accept_header, params)
# Return output
start_response(status, [header])
return [output]

return prometheus_app
Expand All @@ -57,15 +64,26 @@ def log_message(self, format, *args):
"""Log nothing."""


class ThreadingWSGIServer(ThreadingMixIn, WSGIServer):
"""Thread per request HTTP server."""
# Make worker threads "fire and forget". Beginning with Python 3.7 this
# prevents a memory leak because ``ThreadingMixIn`` starts to gather all
# non-daemon threads in a list in order to join on them at server close.
daemon_threads = True


def start_wsgi_server(port, addr='', registry=REGISTRY):
"""Starts a WSGI server for prometheus metrics as a daemon thread."""
app = make_wsgi_app(registry)
httpd = make_server(addr, port, app, handler_class=_SilentHandler)
httpd = make_server(addr, port, app, ThreadingWSGIServer, handler_class=_SilentHandler)
t = threading.Thread(target=httpd.serve_forever)
t.daemon = True
t.start()


start_http_server = start_wsgi_server


def generate_latest(registry=REGISTRY):
"""Returns the metrics from the registry in latest text format as a string."""

Expand Down Expand Up @@ -143,18 +161,15 @@ class MetricsHandler(BaseHTTPRequestHandler):
registry = REGISTRY

def do_GET(self):
# Prepare parameters
registry = self.registry
accept_header = self.headers.get('Accept')
params = parse_qs(urlparse(self.path).query)
encoder, content_type = choose_encoder(self.headers.get('Accept'))
if 'name[]' in params:
registry = registry.restricted_registry(params['name[]'])
try:
output = encoder(registry)
except:
self.send_error(500, 'error generating metric output')
raise
self.send_response(200)
self.send_header('Content-Type', content_type)
# Bake output
status, header, output = _bake_output(registry, accept_header, params)
# Return output
self.send_response(int(status.split(' ')[0]))
self.send_header('Content-Type', header[1])
Skeen marked this conversation as resolved.
Show resolved Hide resolved
self.end_headers()
self.wfile.write(output)

Expand All @@ -177,25 +192,6 @@ def factory(cls, registry):
return MyMetricsHandler


class _ThreadingSimpleServer(ThreadingMixIn, HTTPServer):
"""Thread per request HTTP server."""
# Make worker threads "fire and forget". Beginning with Python 3.7 this
# prevents a memory leak because ``ThreadingMixIn`` starts to gather all
# non-daemon threads in a list in order to join on them at server close.
# Enabling daemon threads virtually makes ``_ThreadingSimpleServer`` the
# same as Python 3.7's ``ThreadingHTTPServer``.
daemon_threads = True


def start_http_server(port, addr='', registry=REGISTRY):
"""Starts an HTTP server for prometheus metrics as a daemon thread"""
CustomMetricsHandler = MetricsHandler.factory(registry)
httpd = _ThreadingSimpleServer((addr, port), CustomMetricsHandler)
t = threading.Thread(target=httpd.serve_forever)
t.daemon = True
t.start()


def write_to_textfile(path, registry):
"""Write metrics to the given path.

Expand Down Expand Up @@ -378,3 +374,10 @@ def instance_ip_grouping_key():
with closing(socket.socket(socket.AF_INET, socket.SOCK_DGRAM)) as s:
s.connect(('localhost', 0))
return {'instance': s.getsockname()[0]}


try:
# Python >3.5 only
from .asgi import make_asgi_app
except:
pass
20 changes: 5 additions & 15 deletions prometheus_client/twisted/_exposition.py
Original file line number Diff line number Diff line change
@@ -1,20 +1,10 @@
from __future__ import absolute_import, unicode_literals

from twisted.web.resource import Resource
from twisted.web.wsgi import WSGIResource
from twisted.internet import reactor

from .. import exposition, REGISTRY


class MetricsResource(Resource):
"""
Twisted ``Resource`` that serves prometheus metrics.
"""
isLeaf = True

def __init__(self, registry=REGISTRY):
self.registry = registry

def render_GET(self, request):
encoder, content_type = exposition.choose_encoder(request.getHeader('Accept'))
request.setHeader(b'Content-Type', content_type.encode('ascii'))
return encoder(self.registry)
MetricsResource = lambda registry=REGISTRY: WSGIResource(
reactor, reactor.getThreadPool(), exposition.make_wsgi_app(registry)
)
118 changes: 118 additions & 0 deletions tests/test_asgi.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
from __future__ import absolute_import, unicode_literals

import sys
from unittest import TestCase
from parameterized import parameterized

from prometheus_client import CollectorRegistry, Counter, generate_latest
from prometheus_client.exposition import CONTENT_TYPE_LATEST

if sys.version_info < (2, 7):
from unittest2 import skipUnless
else:
from unittest import skipUnless

try:
# Python >3.5 only
from prometheus_client import make_asgi_app
import asyncio
from asgiref.testing import ApplicationCommunicator
HAVE_ASYNCIO_AND_ASGI = True
except ImportError:
HAVE_ASYNCIO_AND_ASGI = False


def setup_testing_defaults(scope):
scope.update(
{
"client": ("127.0.0.1", 32767),
"headers": [],
"http_version": "1.0",
"method": "GET",
"path": "/",
"query_string": b"",
"scheme": "http",
"server": ("127.0.0.1", 80),
"type": "http",
}
)


class ASGITest(TestCase):
@skipUnless(HAVE_ASYNCIO_AND_ASGI, "Don't have asyncio/asgi installed.")
def setUp(self):
self.registry = CollectorRegistry()
self.captured_status = None
self.captured_headers = None
# Setup ASGI scope
self.scope = {}
setup_testing_defaults(self.scope)
self.communicator = None

def tearDown(self):
if self.communicator:
asyncio.get_event_loop().run_until_complete(
self.communicator.wait()
)

def seed_app(self, app):
self.communicator = ApplicationCommunicator(app, self.scope)

def send_input(self, payload):
asyncio.get_event_loop().run_until_complete(
self.communicator.send_input(payload)
)

def send_default_request(self):
self.send_input({"type": "http.request", "body": b""})

def get_output(self):
output = asyncio.get_event_loop().run_until_complete(
self.communicator.receive_output(0)
)
return output

def get_all_output(self):
outputs = []
while True:
try:
outputs.append(self.get_output())
except asyncio.TimeoutError:
break
return outputs

@parameterized.expand([
["counter", "A counter", 2],
["counter", "Another counter", 3],
["requests", "Number of requests", 5],
["failed_requests", "Number of failed requests", 7],
])
def test_reports_metrics(self, metric_name, help_text, increments):
"""
ASGI app serves the metrics from the provided registry.
"""
c = Counter(metric_name, help_text, registry=self.registry)
for _ in range(increments):
c.inc()
# Create and run ASGI app
app = make_asgi_app(self.registry)
self.seed_app(app)
self.send_default_request()
# Assert outputs
outputs = self.get_all_output()
# Assert outputs
self.assertEqual(len(outputs), 2)
response_start = outputs[0]
self.assertEqual(response_start['type'], 'http.response.start')
response_body = outputs[1]
self.assertEqual(response_body['type'], 'http.response.body')
# Status code
self.assertEqual(response_start['status'], 200)
# Headers
self.assertEqual(len(response_start['headers']), 1)
self.assertEqual(response_start['headers'][0], (b"Content-Type", CONTENT_TYPE_LATEST.encode('utf8')))
# Body
output = response_body['body'].decode('utf8')
self.assertIn("# HELP " + metric_name + "_total " + help_text + "\n", output)
self.assertIn("# TYPE " + metric_name + "_total counter\n", output)
self.assertIn(metric_name + "_total " + str(increments) + ".0\n", output)
Loading