-
-
Notifications
You must be signed in to change notification settings - Fork 833
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Add support for gzip, deflate, and brotli decoding
- Loading branch information
1 parent
9d59a5a
commit 39b57c9
Showing
5 changed files
with
213 additions
and
34 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,4 @@ | ||
try: | ||
import brotli | ||
except ImportError: | ||
brotli = None |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,41 +1,118 @@ | ||
""" | ||
Handlers for Content-Encoding. | ||
See: https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Content-Encoding | ||
""" | ||
import typing | ||
import zlib | ||
|
||
from .compat import brotli | ||
|
||
|
||
class Decoder: | ||
def decode(self, data: bytes) -> bytes: | ||
raise NotImplementedError() # pragma: nocover | ||
|
||
def flush(self) -> bytes: | ||
raise NotImplementedError() # pragma: nocover | ||
|
||
|
||
class IdentityDecoder(Decoder): | ||
def decode(self, data: bytes) -> bytes: | ||
return data | ||
|
||
def flush(self) -> bytes: | ||
return b"" | ||
|
||
|
||
class DeflateDecoder(Decoder): | ||
""" | ||
Handle 'deflate' decoding. | ||
See: https://stackoverflow.com/questions/1838699 | ||
""" | ||
|
||
def __init__(self) -> None: | ||
self.decompressor = zlib.decompressobj(-zlib.MAX_WBITS) | ||
|
||
def decode(self, data: bytes) -> bytes: | ||
return self.decompressor.decompress(data) | ||
|
||
def flush(self) -> bytes: | ||
return self.decompressor.flush() | ||
|
||
|
||
class IdentityDecoder: | ||
def decode(self, chunk: bytes) -> bytes: | ||
return chunk | ||
class GZipDecoder(Decoder): | ||
""" | ||
Handle 'gzip' decoding. | ||
See: https://stackoverflow.com/questions/1838699 | ||
""" | ||
|
||
def __init__(self) -> None: | ||
self.decompressor = zlib.decompressobj(zlib.MAX_WBITS | 16) | ||
|
||
def decode(self, data: bytes) -> bytes: | ||
return self.decompressor.decompress(data) | ||
|
||
def flush(self) -> bytes: | ||
return self.decompressor.flush() | ||
|
||
|
||
class BrotliDecoder(Decoder): | ||
""" | ||
Handle 'brotli' decoding. | ||
Requires `pip install brotlipy`. | ||
See: https://brotlipy.readthedocs.io/ | ||
""" | ||
|
||
def __init__(self) -> None: | ||
assert ( | ||
brotli is not None | ||
), "The 'brotlipy' library must be installed to use 'BrotliDecoder'" | ||
self.decompressor = brotli.Decompressor() | ||
|
||
def decode(self, data: bytes) -> bytes: | ||
return self.decompressor.decompress(data) | ||
|
||
def flush(self) -> bytes: | ||
self.decompressor.finish() | ||
return b"" | ||
|
||
|
||
# class DeflateDecoder: | ||
# pass | ||
# | ||
# | ||
# class GZipDecoder: | ||
# pass | ||
# | ||
# | ||
# class BrotliDecoder: | ||
# pass | ||
# | ||
# | ||
# class MultiDecoder: | ||
# def __init__(self, children): | ||
# self.children = children | ||
# | ||
# def decode(self, chunk: bytes) -> bytes: | ||
# data = chunk | ||
# for child in children: | ||
# data = child.decode(data) | ||
# return data | ||
# | ||
# def flush(self) -> bytes: | ||
# data = b'' | ||
# for child in children: | ||
# data = child.decode(data) | ||
# data = child.flush() | ||
# return data | ||
class MultiDecoder(Decoder): | ||
""" | ||
Handle the case where mutliple encodings have been applied. | ||
""" | ||
|
||
def __init__(self, children: typing.Sequence[Decoder]) -> None: | ||
""" | ||
children should be a sequence of decoders in the order in which | ||
each was applied. | ||
""" | ||
# Note that we reverse the order for decoding. | ||
self.children = list(reversed(children)) | ||
|
||
def decode(self, data: bytes) -> bytes: | ||
for child in self.children: | ||
data = child.decode(data) | ||
return data | ||
|
||
def flush(self) -> bytes: | ||
data = b"" | ||
for child in self.children: | ||
data = child.decode(data) + child.flush() | ||
return data | ||
|
||
|
||
SUPPORTED_DECODERS = { | ||
b"gzip": GZipDecoder, | ||
b"deflate": DeflateDecoder, | ||
b"identity": IdentityDecoder, | ||
b"br": BrotliDecoder, | ||
} | ||
|
||
|
||
if brotli is None: | ||
SUPPORTED_DECODERS.pop(b"br") # pragma: nocover |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,6 +1,10 @@ | ||
certifi | ||
h11 | ||
|
||
# Optional | ||
brotlipy | ||
|
||
|
||
# Testing | ||
autoflake | ||
black | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,79 @@ | ||
import zlib | ||
|
||
import brotli | ||
import pytest | ||
|
||
import httpcore | ||
|
||
|
||
def test_deflate(): | ||
body = b"test 123" | ||
compressor = zlib.compressobj(9, zlib.DEFLATED, -zlib.MAX_WBITS) | ||
compressed_body = compressor.compress(body) + compressor.flush() | ||
|
||
headers = [(b"Content-Encoding", b"deflate")] | ||
response = httpcore.Response(200, headers=headers, body=compressed_body) | ||
assert response.body == body | ||
|
||
|
||
def test_gzip(): | ||
body = b"test 123" | ||
compressor = zlib.compressobj(9, zlib.DEFLATED, zlib.MAX_WBITS | 16) | ||
compressed_body = compressor.compress(body) + compressor.flush() | ||
|
||
headers = [(b"Content-Encoding", b"gzip")] | ||
response = httpcore.Response(200, headers=headers, body=compressed_body) | ||
assert response.body == body | ||
|
||
|
||
def test_brotli(): | ||
body = b"test 123" | ||
compressed_body = brotli.compress(body) | ||
|
||
headers = [(b"Content-Encoding", b"br")] | ||
response = httpcore.Response(200, headers=headers, body=compressed_body) | ||
assert response.body == body | ||
|
||
|
||
def test_multi(): | ||
body = b"test 123" | ||
|
||
deflate_compressor = zlib.compressobj(9, zlib.DEFLATED, -zlib.MAX_WBITS) | ||
compressed_body = deflate_compressor.compress(body) + deflate_compressor.flush() | ||
|
||
gzip_compressor = zlib.compressobj(9, zlib.DEFLATED, zlib.MAX_WBITS | 16) | ||
compressed_body = ( | ||
gzip_compressor.compress(compressed_body) + gzip_compressor.flush() | ||
) | ||
|
||
headers = [(b"Content-Encoding", b"deflate, gzip")] | ||
response = httpcore.Response(200, headers=headers, body=compressed_body) | ||
assert response.body == body | ||
|
||
|
||
def test_multi_with_identity(): | ||
body = b"test 123" | ||
compressed_body = brotli.compress(body) | ||
|
||
headers = [(b"Content-Encoding", b"br, identity")] | ||
response = httpcore.Response(200, headers=headers, body=compressed_body) | ||
assert response.body == body | ||
|
||
headers = [(b"Content-Encoding", b"identity, br")] | ||
response = httpcore.Response(200, headers=headers, body=compressed_body) | ||
assert response.body == body | ||
|
||
|
||
@pytest.mark.asyncio | ||
async def test_streaming(): | ||
body = b"test 123" | ||
compressor = zlib.compressobj(9, zlib.DEFLATED, zlib.MAX_WBITS | 16) | ||
|
||
async def compress(body): | ||
yield compressor.compress(body) | ||
yield compressor.flush() | ||
|
||
headers = [(b"Content-Encoding", b"gzip")] | ||
response = httpcore.Response(200, headers=headers, body=compress(body)) | ||
assert not hasattr(response, "body") | ||
assert await response.read() == body |