Skip to content

Commit

Permalink
Add support for gzip content-encoding (#776)
Browse files Browse the repository at this point in the history
* Add support for gzip Content-Encoding

Signed-off-by: ivan-valkov <iv.v.valkov@gmail.com>

* remove the case insensitive check for accept header

Signed-off-by: ivan-valkov <iv.v.valkov@gmail.com>

* Add option to disable compression

Signed-off-by: ivan-valkov <iv.v.valkov@gmail.com>

* sMake disable_compression an argument instead of env var

Signed-off-by: ivan-valkov <iv.v.valkov@gmail.com>

* Update readme

Signed-off-by: ivan-valkov <iv.v.valkov@gmail.com>

* Fix linters

Signed-off-by: ivan-valkov <iv.v.valkov@gmail.com>
  • Loading branch information
ivan-valkov authored Mar 9, 2022
1 parent c044b88 commit 3c91b3f
Show file tree
Hide file tree
Showing 5 changed files with 215 additions and 90 deletions.
16 changes: 16 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -338,6 +338,14 @@ from prometheus_client import start_wsgi_server
start_wsgi_server(8000)
```

By default, the WSGI application will respect `Accept-Encoding:gzip` headers used by Prometheus
and compress the response if such a header is present. This behaviour can be disabled by passing
`disable_compression=True` when creating the app, like this:

```python
app = make_wsgi_app(disable_compression=True)
```

#### ASGI

To use Prometheus with [ASGI](http://asgi.readthedocs.org/en/latest/), there is
Expand All @@ -351,6 +359,14 @@ app = make_asgi_app()
Such an application can be useful when integrating Prometheus metrics with ASGI
apps.

By default, the WSGI application will respect `Accept-Encoding:gzip` headers used by Prometheus
and compress the response if such a header is present. This behaviour can be disabled by passing
`disable_compression=True` when creating the app, like this:

```python
app = make_asgi_app(disable_compression=True)
```

#### Flask

To use Prometheus with [Flask](http://flask.pocoo.org/) we need to serve metrics through a Prometheus WSGI application. This can be achieved using [Flask's application dispatching](http://flask.pocoo.org/docs/latest/patterns/appdispatch/). Below is a working example.
Expand Down
17 changes: 11 additions & 6 deletions prometheus_client/asgi.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from .registry import CollectorRegistry, REGISTRY


def make_asgi_app(registry: CollectorRegistry = REGISTRY) -> Callable:
def make_asgi_app(registry: CollectorRegistry = REGISTRY, disable_compression: bool = False) -> Callable:
"""Create a ASGI app which serves the metrics from a registry."""

async def prometheus_app(scope, receive, send):
Expand All @@ -14,20 +14,25 @@ async def prometheus_app(scope, receive, send):
params = parse_qs(scope.get('query_string', b''))
accept_header = "Accept: " + ",".join([
value.decode("utf8") for (name, value) in scope.get('headers')
if name.decode("utf8") == 'accept'
if name.decode("utf8").lower() == 'accept'
])
accept_encoding_header = ",".join([
value.decode("utf8") for (name, value) in scope.get('headers')
if name.decode("utf8").lower() == 'accept-encoding'
])
# Bake output
status, header, output = _bake_output(registry, accept_header, params)
status, headers, output = _bake_output(registry, accept_header, accept_encoding_header, params, disable_compression)
formatted_headers = []
for header in headers:
formatted_headers.append(tuple(x.encode('utf8') for x in header))
# Return output
payload = await receive()
if payload.get("type") == "http.request":
await send(
{
"type": "http.response.start",
"status": int(status.split(' ')[0]),
"headers": [
tuple(x.encode('utf8') for x in header)
]
"headers": formatted_headers,
}
)
await send({"type": "http.response.body", "body": output})
Expand Down
137 changes: 78 additions & 59 deletions prometheus_client/exposition.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import base64
from contextlib import closing
import gzip
from http.server import BaseHTTPRequestHandler
import os
import socket
Expand Down Expand Up @@ -93,32 +94,39 @@ def redirect_request(self, req, fp, code, msg, headers, newurl):
return new_request


def _bake_output(registry, accept_header, params):
def _bake_output(registry, accept_header, accept_encoding_header, params, disable_compression):
"""Bake output for metrics output."""
encoder, content_type = choose_encoder(accept_header)
# Choose the correct plain text format of the output.
formatter, content_type = choose_formatter(accept_header)
if 'name[]' in params:
registry = registry.restricted_registry(params['name[]'])
output = encoder(registry)
return '200 OK', ('Content-Type', content_type), output
output = formatter(registry)
headers = [('Content-Type', content_type)]
# If gzip encoding required, gzip the output.
if not disable_compression and gzip_accepted(accept_encoding_header):
output = gzip.compress(output)
headers.append(('Content-Encoding', 'gzip'))
return '200 OK', headers, output


def make_wsgi_app(registry: CollectorRegistry = REGISTRY) -> Callable:
def make_wsgi_app(registry: CollectorRegistry = REGISTRY, disable_compression: bool = False) -> Callable:
"""Create a WSGI app which serves the metrics from a registry."""

def prometheus_app(environ, start_response):
# Prepare parameters
accept_header = environ.get('HTTP_ACCEPT')
accept_encoding_header = environ.get('HTTP_ACCEPT_ENCODING')
params = parse_qs(environ.get('QUERY_STRING', ''))
if environ['PATH_INFO'] == '/favicon.ico':
# Serve empty response for browsers
status = '200 OK'
header = ('', '')
headers = [('', '')]
output = b''
else:
# Bake output
status, header, output = _bake_output(registry, accept_header, params)
status, headers, output = _bake_output(registry, accept_header, accept_encoding_header, params, disable_compression)
# Return output
start_response(status, [header])
start_response(status, headers)
return [output]

return prometheus_app
Expand Down Expand Up @@ -152,8 +160,10 @@ def _get_best_family(address, port):

def start_wsgi_server(port: int, addr: str = '0.0.0.0', registry: CollectorRegistry = REGISTRY) -> None:
"""Starts a WSGI server for prometheus metrics as a daemon thread."""

class TmpServer(ThreadingWSGIServer):
"""Copy of ThreadingWSGIServer to update address_family locally"""

TmpServer.address_family, addr = _get_best_family(addr, port)
app = make_wsgi_app(registry)
httpd = make_server(addr, port, app, TmpServer, handler_class=_SilentHandler)
Expand Down Expand Up @@ -227,7 +237,7 @@ def sample_line(line):
return ''.join(output).encode('utf-8')


def choose_encoder(accept_header: str) -> Tuple[Callable[[CollectorRegistry], bytes], str]:
def choose_formatter(accept_header: str) -> Tuple[Callable[[CollectorRegistry], bytes], str]:
accept_header = accept_header or ''
for accepted in accept_header.split(','):
if accepted.split(';')[0].strip() == 'application/openmetrics-text':
Expand All @@ -236,6 +246,14 @@ def choose_encoder(accept_header: str) -> Tuple[Callable[[CollectorRegistry], by
return generate_latest, CONTENT_TYPE_LATEST


def gzip_accepted(accept_encoding_header: str) -> bool:
accept_encoding_header = accept_encoding_header or ''
for accepted in accept_encoding_header.split(','):
if accepted.split(';')[0].strip().lower() == 'gzip':
return True
return False


class MetricsHandler(BaseHTTPRequestHandler):
"""HTTP handler that gives metrics from ``REGISTRY``."""
registry: CollectorRegistry = REGISTRY
Expand All @@ -244,12 +262,14 @@ def do_GET(self) -> None:
# Prepare parameters
registry = self.registry
accept_header = self.headers.get('Accept')
accept_encoding_header = self.headers.get('Accept-Encoding')
params = parse_qs(urlparse(self.path).query)
# Bake output
status, header, output = _bake_output(registry, accept_header, params)
status, headers, output = _bake_output(registry, accept_header, accept_encoding_header, params, False)
# Return output
self.send_response(int(status.split(' ')[0]))
self.send_header(*header)
for header in headers:
self.send_header(*header)
self.end_headers()
self.wfile.write(output)

Expand Down Expand Up @@ -289,14 +309,13 @@ def write_to_textfile(path: str, registry: CollectorRegistry) -> None:


def _make_handler(
url: str,
method: str,
timeout: Optional[float],
headers: Sequence[Tuple[str, str]],
data: bytes,
base_handler: type,
url: str,
method: str,
timeout: Optional[float],
headers: Sequence[Tuple[str, str]],
data: bytes,
base_handler: type,
) -> Callable[[], None]:

def handle() -> None:
request = Request(url, data=data)
request.get_method = lambda: method # type: ignore
Expand All @@ -310,11 +329,11 @@ def handle() -> None:


def default_handler(
url: str,
method: str,
timeout: Optional[float],
headers: List[Tuple[str, str]],
data: bytes,
url: str,
method: str,
timeout: Optional[float],
headers: List[Tuple[str, str]],
data: bytes,
) -> Callable[[], None]:
"""Default handler that implements HTTP/HTTPS connections.
Expand All @@ -324,11 +343,11 @@ def default_handler(


def passthrough_redirect_handler(
url: str,
method: str,
timeout: Optional[float],
headers: List[Tuple[str, str]],
data: bytes,
url: str,
method: str,
timeout: Optional[float],
headers: List[Tuple[str, str]],
data: bytes,
) -> Callable[[], None]:
"""
Handler that automatically trusts redirect responses for all HTTP methods.
Expand All @@ -344,13 +363,13 @@ def passthrough_redirect_handler(


def basic_auth_handler(
url: str,
method: str,
timeout: Optional[float],
headers: List[Tuple[str, str]],
data: bytes,
username: str = None,
password: str = None,
url: str,
method: str,
timeout: Optional[float],
headers: List[Tuple[str, str]],
data: bytes,
username: str = None,
password: str = None,
) -> Callable[[], None]:
"""Handler that implements HTTP/HTTPS connections with Basic Auth.
Expand All @@ -371,12 +390,12 @@ def handle():


def push_to_gateway(
gateway: str,
job: str,
registry: CollectorRegistry,
grouping_key: Optional[Dict[str, Any]] = None,
timeout: Optional[float] = 30,
handler: Callable = default_handler,
gateway: str,
job: str,
registry: CollectorRegistry,
grouping_key: Optional[Dict[str, Any]] = None,
timeout: Optional[float] = 30,
handler: Callable = default_handler,
) -> None:
"""Push metrics to the given pushgateway.
Expand Down Expand Up @@ -420,12 +439,12 @@ def push_to_gateway(


def pushadd_to_gateway(
gateway: str,
job: str,
registry: Optional[CollectorRegistry],
grouping_key: Optional[Dict[str, Any]] = None,
timeout: Optional[float] = 30,
handler: Callable = default_handler,
gateway: str,
job: str,
registry: Optional[CollectorRegistry],
grouping_key: Optional[Dict[str, Any]] = None,
timeout: Optional[float] = 30,
handler: Callable = default_handler,
) -> None:
"""PushAdd metrics to the given pushgateway.
Expand All @@ -451,11 +470,11 @@ def pushadd_to_gateway(


def delete_from_gateway(
gateway: str,
job: str,
grouping_key: Optional[Dict[str, Any]] = None,
timeout: Optional[float] = 30,
handler: Callable = default_handler,
gateway: str,
job: str,
grouping_key: Optional[Dict[str, Any]] = None,
timeout: Optional[float] = 30,
handler: Callable = default_handler,
) -> None:
"""Delete metrics from the given pushgateway.
Expand All @@ -480,13 +499,13 @@ def delete_from_gateway(


def _use_gateway(
method: str,
gateway: str,
job: str,
registry: Optional[CollectorRegistry],
grouping_key: Optional[Dict[str, Any]],
timeout: Optional[float],
handler: Callable,
method: str,
gateway: str,
job: str,
registry: Optional[CollectorRegistry],
grouping_key: Optional[Dict[str, Any]],
timeout: Optional[float],
handler: Callable,
) -> None:
gateway_url = urlparse(gateway)
# See https://bugs.python.org/issue27657 for details on urlparse in py>=3.7.6.
Expand Down
Loading

0 comments on commit 3c91b3f

Please sign in to comment.