diff --git a/src/oic/oauth2/__init__.py b/src/oic/oauth2/__init__.py index d102ddbb..ee2279d6 100644 --- a/src/oic/oauth2/__init__.py +++ b/src/oic/oauth2/__init__.py @@ -747,7 +747,10 @@ def parse_request_response(self, reqresp, response, body_type, state="", **kwarg ) if response: - if body_type == "txt": + if body_type is None: + # There is no content-type for zero content length. Return the status code. + return reqresp.status_code + elif body_type == "txt": # no meaning trying to parse unstructured text return reqresp.text return self.parse_response( diff --git a/src/oic/oauth2/util.py b/src/oic/oauth2/util.py index 1562e1de..efb4b895 100644 --- a/src/oic/oauth2/util.py +++ b/src/oic/oauth2/util.py @@ -185,6 +185,8 @@ def verify_header(reqresp, body_type): logger.debug("resp.txt: %s" % (sanitize(reqresp.text),)) if body_type == "": + if int(reqresp.headers["content-length"]) == 0: + return None _ctype = reqresp.headers["content-type"] if match_to_("application/json", _ctype): body_type = "json" diff --git a/tests/test_oauth2.py b/tests/test_oauth2.py index b5f2e36e..4247e9ff 100644 --- a/tests/test_oauth2.py +++ b/tests/test_oauth2.py @@ -5,6 +5,7 @@ from urllib.parse import urlparse import pytest +import requests import responses from oic.oauth2 import Client @@ -25,6 +26,7 @@ from oic.oauth2.message import ExtensionTokenRequest from oic.oauth2.message import FormatError from oic.oauth2.message import GrantExpired +from oic.oauth2.message import Message from oic.oauth2.message import MessageTuple from oic.oauth2.message import MissingRequiredAttribute from oic.oauth2.message import OauthMessageFactory @@ -626,6 +628,19 @@ class ExtensionMessageFactory(OauthMessageFactory): assert isinstance(resp, AccessTokenResponse) assert resp["access_token"] == "Token" + def test_parse_request_response_should_return_status_code_if_content_length_zero( + self, + ): + + resp = requests.Response() + resp.headers = requests.models.CaseInsensitiveDict(data={"content-length": "0"}) + resp.status_code = 200 + parsed_response = self.client.parse_request_response( + reqresp=resp, response=Message, body_type="" + ) + + assert parsed_response == 200 + class TestServer(object): @pytest.fixture(autouse=True) diff --git a/tests/test_util.py b/tests/test_util.py index f74f783e..54d72fea 100644 --- a/tests/test_util.py +++ b/tests/test_util.py @@ -204,14 +204,15 @@ def test_match_to(): def test_verify_header(): class FakeResponse: def __init__(self, header): - self.headers = {"content-type": header} + self.headers = header self.text = "TEST_RESPONSE" - json_header = "application/json" - jwt_header = "application/jwt" - default_header = util.DEFAULT_POST_CONTENT_TYPE - plain_text_header = "text/plain" - undefined_header = "undefined" + json_header = {"content-type": "application/json"} + jwt_header = {"content-type": "application/jwt"} + default_header = {"content-type": util.DEFAULT_POST_CONTENT_TYPE} + plain_text_header = {"content-type": "text/plain"} + undefined_header = {"content-type": "undefined"} + zero_content_length_header = {"content-length": "0"} assert util.verify_header(FakeResponse(json_header), "json") == "json" assert util.verify_header(FakeResponse(jwt_header), "json") == "jwt" @@ -223,6 +224,7 @@ def __init__(self, header): util.verify_header(FakeResponse(plain_text_header), "urlencoded") == "urlencoded" ) + assert util.verify_header(FakeResponse(zero_content_length_header), "") is None with pytest.raises(ValueError): util.verify_header(FakeResponse(json_header), "urlencoded") diff --git a/tests/test_x_client.py b/tests/test_x_client.py index 6437079c..35afd715 100644 --- a/tests/test_x_client.py +++ b/tests/test_x_client.py @@ -1,3 +1,7 @@ +from urllib.parse import parse_qs + +import responses + from oic import rndstr from oic.extension.client import Client from oic.extension.provider import Provider @@ -170,3 +174,29 @@ def test_pkce_token(): _info = constructor.get_info(access_grant) assert _info["code_challenge_method"] == args["code_challenge_method"] assert _info["code_challenge"] == args["code_challenge"] + + +@responses.activate +def test_do_token_revocation(): + request_args = { + "token": "access_token", + "token_type_hint": "access_token", + "client_id": "client_id", + "client_secret": "client_secret", + } + token_revocation_endpoint = "https://example.com/revoke" + # Mock zero content length body. + responses.add( + responses.POST, + token_revocation_endpoint, + body="", + status=200, + headers={"content-length": "0"}, + ) + resp = Client().do_token_revocation( + request_args=request_args, endpoint=token_revocation_endpoint + ) + parsed_request: dict = parse_qs(responses.calls[0].request.body) + assert resp == 200 + assert parsed_request["token"] == ["access_token"] + assert parsed_request["token_type_hint"] == ["access_token"]