-
Notifications
You must be signed in to change notification settings - Fork 801
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Signed-off-by: Emil Madsen <sovende@gmail.com>
- Loading branch information
Showing
4 changed files
with
149 additions
and
22 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,118 @@ | ||
from __future__ import absolute_import, unicode_literals | ||
|
||
import sys | ||
from unittest import TestCase | ||
from parameterized import parameterized | ||
|
||
from prometheus_client import CollectorRegistry, Counter, generate_latest | ||
from prometheus_client.exposition import CONTENT_TYPE_LATEST | ||
|
||
if sys.version_info < (2, 7): | ||
from unittest2 import skipUnless | ||
else: | ||
from unittest import skipUnless | ||
|
||
try: | ||
# Python >3.5 only | ||
from prometheus_client import make_asgi_app | ||
import asyncio | ||
from asgiref.testing import ApplicationCommunicator | ||
HAVE_ASYNCIO_AND_ASGI = True | ||
except ImportError: | ||
HAVE_ASYNCIO_AND_ASGI = False | ||
|
||
|
||
def setup_testing_defaults(scope): | ||
scope.update( | ||
{ | ||
"client": ("127.0.0.1", 32767), | ||
"headers": [], | ||
"http_version": "1.0", | ||
"method": "GET", | ||
"path": "/", | ||
"query_string": b"", | ||
"scheme": "http", | ||
"server": ("127.0.0.1", 80), | ||
"type": "http", | ||
} | ||
) | ||
|
||
|
||
class ASGITest(TestCase): | ||
@skipUnless(HAVE_ASYNCIO_AND_ASGI, "Don't have asyncio/asgi installed.") | ||
def setUp(self): | ||
self.registry = CollectorRegistry() | ||
self.captured_status = None | ||
self.captured_headers = None | ||
# Setup ASGI scope | ||
self.scope = {} | ||
setup_testing_defaults(self.scope) | ||
self.communicator = None | ||
|
||
def tearDown(self): | ||
if self.communicator: | ||
asyncio.get_event_loop().run_until_complete( | ||
self.communicator.wait() | ||
) | ||
|
||
def seed_app(self, app): | ||
self.communicator = ApplicationCommunicator(app, self.scope) | ||
|
||
def send_input(self, payload): | ||
asyncio.get_event_loop().run_until_complete( | ||
self.communicator.send_input(payload) | ||
) | ||
|
||
def send_default_request(self): | ||
self.send_input({"type": "http.request", "body": b""}) | ||
|
||
def get_output(self): | ||
output = asyncio.get_event_loop().run_until_complete( | ||
self.communicator.receive_output(0) | ||
) | ||
return output | ||
|
||
def get_all_output(self): | ||
outputs = [] | ||
while True: | ||
try: | ||
outputs.append(self.get_output()) | ||
except asyncio.TimeoutError: | ||
break | ||
return outputs | ||
|
||
@parameterized.expand([ | ||
["counter", "A counter", 2], | ||
["counter", "Another counter", 3], | ||
["requests", "Number of requests", 5], | ||
["failed_requests", "Number of failed requests", 7], | ||
]) | ||
def test_reports_metrics(self, metric_name, help_text, increments): | ||
""" | ||
ASGI app serves the metrics from the provided registry. | ||
""" | ||
c = Counter(metric_name, help_text, registry=self.registry) | ||
for _ in range(increments): | ||
c.inc() | ||
# Create and run ASGI app | ||
app = make_asgi_app(self.registry) | ||
self.seed_app(app) | ||
self.send_default_request() | ||
# Assert outputs | ||
outputs = self.get_all_output() | ||
# Assert outputs | ||
self.assertEqual(len(outputs), 2) | ||
response_start = outputs[0] | ||
self.assertEqual(response_start['type'], 'http.response.start') | ||
response_body = outputs[1] | ||
self.assertEqual(response_body['type'], 'http.response.body') | ||
# Status code | ||
self.assertEqual(response_start['status'], 200) | ||
# Headers | ||
self.assertEqual(len(response_start['headers']), 1) | ||
self.assertEqual(response_start['headers'][0], (b"Content-Type", CONTENT_TYPE_LATEST.encode('utf8'))) | ||
# Body | ||
output = response_body['body'].decode('utf8') | ||
self.assertIn("# HELP " + metric_name + "_total " + help_text + "\n", output) | ||
self.assertIn("# TYPE " + metric_name + "_total counter\n", output) | ||
self.assertIn(metric_name + "_total " + str(increments) + ".0\n", output) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters