From 2f0baa01ca76ac8fbd1dac902d57f94ec5de7791 Mon Sep 17 00:00:00 2001 From: pgjones Date: Tue, 19 Jan 2021 16:26:16 +0000 Subject: [PATCH] Move sansio code to a sansio module This makes the naming a little easier (called request and response), and allows the sansio module to be considered private/development. The latter is desired as it isn't clear (yet) how to specify the IO interface - in an abstract manner so that both sync and async implementations exist. This also allows further sansio, rather than WSGI based code, to be added in a clear location - clear to future authors that the code must not be WSGI or ASGI or IO specific. --- src/werkzeug/sansio/__init__.py | 0 src/werkzeug/sansio/request.py | 418 ++++++++++++++++++++ src/werkzeug/sansio/response.py | 630 ++++++++++++++++++++++++++++++ src/werkzeug/wrappers/request.py | 412 +------------------ src/werkzeug/wrappers/response.py | 626 +---------------------------- 5 files changed, 1050 insertions(+), 1036 deletions(-) create mode 100644 src/werkzeug/sansio/__init__.py create mode 100644 src/werkzeug/sansio/request.py create mode 100644 src/werkzeug/sansio/response.py diff --git a/src/werkzeug/sansio/__init__.py b/src/werkzeug/sansio/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/src/werkzeug/sansio/request.py b/src/werkzeug/sansio/request.py new file mode 100644 index 0000000000..71c8918e19 --- /dev/null +++ b/src/werkzeug/sansio/request.py @@ -0,0 +1,418 @@ +import typing as t +from datetime import datetime + +from .._internal import _to_str +from ..datastructures import Accept +from ..datastructures import Authorization +from ..datastructures import CharsetAccept +from ..datastructures import ETags +from ..datastructures import Headers +from ..datastructures import HeaderSet +from ..datastructures import IfRange +from ..datastructures import ImmutableList +from ..datastructures import ImmutableMultiDict +from ..datastructures import LanguageAccept +from ..datastructures import MIMEAccept +from ..datastructures import MultiDict +from ..datastructures import Range +from ..datastructures import RequestCacheControl +from ..http import parse_accept_header +from ..http import parse_authorization_header +from ..http import parse_cache_control_header +from ..http import parse_cookie +from ..http import parse_date +from ..http import parse_etags +from ..http import parse_if_range_header +from ..http import parse_list_header +from ..http import parse_options_header +from ..http import parse_range_header +from ..http import parse_set_header +from ..urls import url_decode +from ..useragents import UserAgent +from ..utils import cached_property +from ..utils import header_property +from ..wsgi import get_content_length + + +class Request: + """Represents the non-IO parts of a HTTP request, specifically the + headers, and method. + + This is best used if you aren't using WSGI, if you are the + :cls:`Request` class is the better choice. + + .. versionadded:: 2.0 + """ + + #: the charset for the request, defaults to utf-8 + charset = "utf-8" + + #: the error handling procedure for errors, defaults to 'replace' + encoding_errors = "replace" + + #: the class to use for `args` and `form`. The default is an + #: :class:`~werkzeug.datastructures.ImmutableMultiDict` which supports + #: multiple values per key. alternatively it makes sense to use an + #: :class:`~werkzeug.datastructures.ImmutableOrderedMultiDict` which + #: preserves order or a :class:`~werkzeug.datastructures.ImmutableDict` + #: which is the fastest but only remembers the last key. It is also + #: possible to use mutable structures, but this is not recommended. + #: + #: .. versionadded:: 0.6 + parameter_storage_class: t.Type[MultiDict] = ImmutableMultiDict + + #: The type to be used for dict values from the incoming WSGI + #: environment. (For example for :attr:`cookies`.) By default an + #: :class:`~werkzeug.datastructures.ImmutableMultiDict` is used. + #: + #: .. versionchanged:: 1.0.0 + #: Changed to ``ImmutableMultiDict`` to support multiple values. + #: + #: .. versionadded:: 0.6 + dict_storage_class: t.Type[MultiDict] = ImmutableMultiDict + + #: the type to be used for list values from the incoming WSGI environment. + #: By default an :class:`~werkzeug.datastructures.ImmutableList` is used + #: (for example for :attr:`access_list`). + #: + #: .. versionadded:: 0.6 + list_storage_class: t.Type[t.List] = ImmutableList + + def __init__( + self, + method: str, + path: str, + query_string: bytes, + headers: Headers, + scheme: str, + remote_addr: t.Optional[str], + ) -> None: + self.method = method.upper() + self.query_string = query_string + self.headers = headers + self.scheme = scheme + self.path = "/" + path.lstrip("/") + self.remote_addr = remote_addr + + def __repr__(self) -> str: + return f"<{type(self).__name__} {self.path} [{self.method}]>" + + @property + def url_charset(self) -> str: + """The charset that is assumed for URLs. Defaults to the value + of :attr:`charset`. + + .. versionadded:: 0.6 + """ + return self.charset + + @cached_property + def args(self) -> "MultiDict[str, str]": + """The parsed URL parameters (the part in the URL after the question + mark). + + By default an + :class:`~werkzeug.datastructures.ImmutableMultiDict` + is returned from this function. This can be changed by setting + :attr:`parameter_storage_class` to a different type. This might + be necessary if the order of the form data is important. + """ + return url_decode( + self.query_string, + self.url_charset, + errors=self.encoding_errors, + cls=self.parameter_storage_class, + ) + + @cached_property + def access_route(self) -> t.List[str]: + """If a forwarded header exists this is a list of all ip addresses + from the client ip to the last proxy server. + """ + if "X-Forwarded-For" in self.headers: + return self.list_storage_class( + parse_list_header(self.headers["X-Forwarded-For"]) + ) + elif self.remote_addr is not None: + return self.list_storage_class([self.remote_addr]) + return self.list_storage_class() + + @cached_property + def full_path(self) -> str: + """Requested path, including the query string.""" + return f"{self.path}?{_to_str(self.query_string, self.url_charset)}" + + @property + def is_secure(self) -> bool: + "`True` if the request is secure." + return self.scheme in {"https", "wss"} + + @cached_property + def cookies(self) -> "ImmutableMultiDict[str, str]": + """A :class:`dict` with the contents of all cookies transmitted with + the request.""" + return parse_cookie( # type: ignore + self.headers.get("Cookie"), + self.charset, + self.encoding_errors, + cls=self.dict_storage_class, + ) + + # Common Descriptors + + content_type = header_property[str]( + "Content-Type", + doc="""The Content-Type entity-header field indicates the media + type of the entity-body sent to the recipient or, in the case of + the HEAD method, the media type that would have been sent had + the request been a GET.""", + read_only=True, + ) + + @cached_property + def content_length(self) -> t.Optional[int]: + """The Content-Length entity-header field indicates the size of the + entity-body in bytes or, in the case of the HEAD method, the size of + the entity-body that would have been sent had the request been a + GET. + """ + return get_content_length(self.headers) + + content_encoding = header_property[str]( + "Content-Encoding", + doc="""The Content-Encoding entity-header field is used as a + modifier to the media-type. When present, its value indicates + what additional content codings have been applied to the + entity-body, and thus what decoding mechanisms must be applied + in order to obtain the media-type referenced by the Content-Type + header field. + + .. versionadded:: 0.9""", + read_only=True, + ) + content_md5 = header_property[str]( + "Content-MD5", + doc="""The Content-MD5 entity-header field, as defined in + RFC 1864, is an MD5 digest of the entity-body for the purpose of + providing an end-to-end message integrity check (MIC) of the + entity-body. (Note: a MIC is good for detecting accidental + modification of the entity-body in transit, but is not proof + against malicious attacks.) + + .. versionadded:: 0.9""", + read_only=True, + ) + referrer = header_property[str]( + "Referer", + doc="""The Referer[sic] request-header field allows the client + to specify, for the server's benefit, the address (URI) of the + resource from which the Request-URI was obtained (the + "referrer", although the header field is misspelled).""", + read_only=True, + ) + date = header_property( + "Date", + None, + parse_date, + doc="""The Date general-header field represents the date and + time at which the message was originated, having the same + semantics as orig-date in RFC 822.""", + read_only=True, + ) + max_forwards = header_property( + "Max-Forwards", + None, + int, + doc="""The Max-Forwards request-header field provides a + mechanism with the TRACE and OPTIONS methods to limit the number + of proxies or gateways that can forward the request to the next + inbound server.""", + read_only=True, + ) + + def _parse_content_type(self) -> None: + if not hasattr(self, "_parsed_content_type"): + self._parsed_content_type = parse_options_header( + self.headers.get("Content-Type", "") + ) + + @property + def mimetype(self) -> str: + """Like :attr:`content_type`, but without parameters (eg, without + charset, type etc.) and always lowercase. For example if the content + type is ``text/HTML; charset=utf-8`` the mimetype would be + ``'text/html'``. + """ + self._parse_content_type() + return self._parsed_content_type[0].lower() + + @property + def mimetype_params(self) -> t.Dict[str, str]: + """The mimetype parameters as dict. For example if the content + type is ``text/html; charset=utf-8`` the params would be + ``{'charset': 'utf-8'}``. + """ + self._parse_content_type() + return self._parsed_content_type[1] + + @cached_property + def pragma(self) -> HeaderSet: + """The Pragma general-header field is used to include + implementation-specific directives that might apply to any recipient + along the request/response chain. All pragma directives specify + optional behavior from the viewpoint of the protocol; however, some + systems MAY require that behavior be consistent with the directives. + """ + return parse_set_header(self.headers.get("Pragma", "")) + + # Accept + + @cached_property + def accept_mimetypes(self) -> MIMEAccept: + """List of mimetypes this client supports as + :class:`~werkzeug.datastructures.MIMEAccept` object. + """ + return parse_accept_header(self.headers.get("Accept"), MIMEAccept) + + @cached_property + def accept_charsets(self) -> CharsetAccept: + """List of charsets this client supports as + :class:`~werkzeug.datastructures.CharsetAccept` object. + """ + return parse_accept_header(self.headers.get("Accept-Charset"), CharsetAccept) + + @cached_property + def accept_encodings(self) -> Accept: + """List of encodings this client accepts. Encodings in a HTTP term + are compression encodings such as gzip. For charsets have a look at + :attr:`accept_charset`. + """ + return parse_accept_header(self.headers.get("Accept-Encoding")) + + @cached_property + def accept_languages(self) -> LanguageAccept: + """List of languages this client accepts as + :class:`~werkzeug.datastructures.LanguageAccept` object. + + .. versionchanged 0.5 + In previous versions this was a regular + :class:`~werkzeug.datastructures.Accept` object. + """ + return parse_accept_header(self.headers.get("Accept-Language"), LanguageAccept) + + # ETag + + @cached_property + def cache_control(self) -> RequestCacheControl: + """A :class:`~werkzeug.datastructures.RequestCacheControl` object + for the incoming cache control headers. + """ + cache_control = self.headers.get("Cache-Control") + return parse_cache_control_header(cache_control, None, RequestCacheControl) + + @cached_property + def if_match(self) -> ETags: + """An object containing all the etags in the `If-Match` header. + + :rtype: :class:`~werkzeug.datastructures.ETags` + """ + return parse_etags(self.headers.get("If-Match")) + + @cached_property + def if_none_match(self) -> ETags: + """An object containing all the etags in the `If-None-Match` header. + + :rtype: :class:`~werkzeug.datastructures.ETags` + """ + return parse_etags(self.headers.get("If-None-Match")) + + @cached_property + def if_modified_since(self) -> t.Optional[datetime]: + """The parsed `If-Modified-Since` header as datetime object.""" + return parse_date(self.headers.get("If-Modified-Since")) + + @cached_property + def if_unmodified_since(self) -> t.Optional[datetime]: + """The parsed `If-Unmodified-Since` header as datetime object.""" + return parse_date(self.headers.get("If-Unmodified-Since")) + + @cached_property + def if_range(self) -> IfRange: + """The parsed `If-Range` header. + + .. versionadded:: 0.7 + + :rtype: :class:`~werkzeug.datastructures.IfRange` + """ + return parse_if_range_header(self.headers.get("If-Range")) + + @cached_property + def range(self) -> t.Optional[Range]: + """The parsed `Range` header. + + .. versionadded:: 0.7 + + :rtype: :class:`~werkzeug.datastructures.Range` + """ + return parse_range_header(self.headers.get("Range")) + + # User Agent + + @cached_property + def user_agent(self) -> UserAgent: + """The current user agent.""" + return UserAgent(self.headers.get("User-Agent", "")) # type: ignore + + # Authorization + + @cached_property + def authorization(self) -> t.Optional[Authorization]: + """The `Authorization` object in parsed form.""" + return parse_authorization_header(self.headers.get("Authorization")) + + # CORS + + origin = header_property[str]( + "Origin", + doc=( + "The host that the request originated from. Set" + " :attr:`~CORSResponseMixin.access_control_allow_origin` on" + " the response to indicate which origins are allowed." + ), + read_only=True, + ) + + access_control_request_headers = header_property( + "Access-Control-Request-Headers", + load_func=parse_set_header, + doc=( + "Sent with a preflight request to indicate which headers" + " will be sent with the cross origin request. Set" + " :attr:`~CORSResponseMixin.access_control_allow_headers`" + " on the response to indicate which headers are allowed." + ), + read_only=True, + ) + + access_control_request_method = header_property[str]( + "Access-Control-Request-Method", + doc=( + "Sent with a preflight request to indicate which method" + " will be used for the cross origin request. Set" + " :attr:`~CORSResponseMixin.access_control_allow_methods`" + " on the response to indicate which methods are allowed." + ), + read_only=True, + ) + + @property + def is_json(self) -> bool: + """Check if the mimetype indicates JSON data, either + :mimetype:`application/json` or :mimetype:`application/*+json`. + """ + mt = self.mimetype + return ( + mt == "application/json" + or mt.startswith("application/") + and mt.endswith("+json") + ) diff --git a/src/werkzeug/sansio/response.py b/src/werkzeug/sansio/response.py new file mode 100644 index 0000000000..186fcd91f4 --- /dev/null +++ b/src/werkzeug/sansio/response.py @@ -0,0 +1,630 @@ +import typing +import typing as t +from datetime import datetime +from datetime import timedelta + +from .._internal import _to_str +from ..datastructures import Headers +from ..http import dump_cookie +from ..http import HTTP_STATUS_CODES +from ..utils import get_content_type +from werkzeug.datastructures import CallbackDict +from werkzeug.datastructures import ContentRange +from werkzeug.datastructures import ResponseCacheControl +from werkzeug.datastructures import WWWAuthenticate +from werkzeug.http import COEP +from werkzeug.http import COOP +from werkzeug.http import dump_age +from werkzeug.http import dump_csp_header +from werkzeug.http import dump_header +from werkzeug.http import dump_options_header +from werkzeug.http import http_date +from werkzeug.http import parse_age +from werkzeug.http import parse_cache_control_header +from werkzeug.http import parse_content_range_header +from werkzeug.http import parse_csp_header +from werkzeug.http import parse_date +from werkzeug.http import parse_options_header +from werkzeug.http import parse_set_header +from werkzeug.http import parse_www_authenticate_header +from werkzeug.http import quote_etag +from werkzeug.http import unquote_etag +from werkzeug.utils import header_property + + +def _set_property(name: str, doc: t.Optional[str] = None) -> property: + def fget(self): + def on_update(header_set): + if not header_set and name in self.headers: + del self.headers[name] + elif header_set: + self.headers[name] = header_set.to_header() + + return parse_set_header(self.headers.get(name), on_update) + + def fset(self, value): + if not value: + del self.headers[name] + elif isinstance(value, str): + self.headers[name] = value + else: + self.headers[name] = dump_header(value) + + return property(fget, fset, doc=doc) + + +class Response: + """Represents the non-IO parts of an HTTP response, specifically the + status, and headers. + + This is best used if you aren't using WSGI, if you are the + :cls:`Response` class is the better choice. + + :param status: The status code for the response. Either an int, in + which case the default status message is added, or a string in + the form ``{code} {message}``, like ``404 Not Found``. Defaults + to 200. + :param headers: A :class:`~werkzeug.datastructures.Headers` object, + or a list of ``(key, value)`` tuples that will be converted to a + ``Headers`` object. + :param mimetype: The mime type (content type without charset or + other parameters) of the response. If the value starts with + ``text/`` (or matches some other special cases), the charset + will be added to create the ``content_type``. + :param content_type: The full content type of the response. + Overrides building the value from ``mimetype``. + + .. versionadded:: 2.0 + """ + + #: the charset of the response. + charset = "utf-8" + + #: the default status if none is provided. + default_status = 200 + + #: the default mimetype if none is provided. + default_mimetype = "text/plain" + + #: Warn if a cookie header exceeds this size. The default, 4093, should be + #: safely `supported by most browsers `_. A cookie larger than + #: this size will still be sent, but it may be ignored or handled + #: incorrectly by some browsers. Set to 0 to disable this check. + #: + #: .. versionadded:: 0.13 + #: + #: .. _`cookie`: http://browsercookielimits.squawky.net/ + max_cookie_size = 4093 + + # A :class:`Headers` object representing the response headers. + headers: Headers + + def __init__( + self, + status: t.Optional[t.Union[int, str]] = None, + headers: t.Optional[ + t.Union[ + t.Mapping[str, t.Union[str, int, t.Iterable[t.Union[str, int]]]], + t.Iterable[t.Tuple[str, t.Union[str, int]]], + ] + ] = None, + mimetype: t.Optional[str] = None, + content_type: t.Optional[str] = None, + ) -> None: + if isinstance(headers, Headers): + self.headers = headers + elif not headers: + self.headers = Headers() + else: + self.headers = Headers(headers) + + if content_type is None: + if mimetype is None and "content-type" not in self.headers: + mimetype = self.default_mimetype + if mimetype is not None: + mimetype = get_content_type(mimetype, self.charset) + content_type = mimetype + if content_type is not None: + self.headers["Content-Type"] = content_type + if status is None: + status = self.default_status + self.status = status # type: ignore + + def __repr__(self) -> str: + return f"<{type(self).__name__} [{self.status}]>" + + @property + def status_code(self) -> int: + """The HTTP status code as a number.""" + return self._status_code + + @status_code.setter + def status_code(self, code: int) -> None: + self.status = code # type: ignore + + @property + def status(self) -> str: + """The HTTP status code as a string.""" + return self._status + + @status.setter + def status(self, value: t.Union[str, int]) -> None: + if not isinstance(value, (str, bytes, int)): + raise TypeError("Invalid status argument") + + self._status, self._status_code = self._clean_status(value) + + def _clean_status(self, value: t.Union[str, int]) -> t.Tuple[str, int]: + status = _to_str(value, self.charset) + split_status = status.split(None, 1) + + if len(split_status) == 0: + raise ValueError("Empty status argument") + + if len(split_status) > 1: + if split_status[0].isdigit(): + # code and message + return status, int(split_status[0]) + + # multi-word message + return f"0 {status}", 0 + + if split_status[0].isdigit(): + # code only + status_code = int(split_status[0]) + + try: + status = f"{status_code} {HTTP_STATUS_CODES[status_code].upper()}" + except KeyError: + status = f"{status_code} UNKNOWN" + + return status, status_code + + # one-word message + return f"0 {status}", 0 + + def set_cookie( + self, + key: str, + value: str = "", + max_age: t.Optional[t.Union[timedelta, int]] = None, + expires: t.Optional[t.Union[str, datetime, int, float]] = None, + path: t.Optional[str] = "/", + domain: t.Optional[str] = None, + secure: bool = False, + httponly: bool = False, + samesite: t.Optional[str] = None, + ) -> None: + """Sets a cookie. + + A warning is raised if the size of the cookie header exceeds + :attr:`max_cookie_size`, but the header will still be set. + + :param key: the key (name) of the cookie to be set. + :param value: the value of the cookie. + :param max_age: should be a number of seconds, or `None` (default) if + the cookie should last only as long as the client's + browser session. + :param expires: should be a `datetime` object or UNIX timestamp. + :param path: limits the cookie to a given path, per default it will + span the whole domain. + :param domain: if you want to set a cross-domain cookie. For example, + ``domain=".example.com"`` will set a cookie that is + readable by the domain ``www.example.com``, + ``foo.example.com`` etc. Otherwise, a cookie will only + be readable by the domain that set it. + :param secure: If ``True``, the cookie will only be available + via HTTPS. + :param httponly: Disallow JavaScript access to the cookie. + :param samesite: Limit the scope of the cookie to only be + attached to requests that are "same-site". + """ + self.headers.add( + "Set-Cookie", + dump_cookie( + key, + value=value, + max_age=max_age, + expires=expires, + path=path, + domain=domain, + secure=secure, + httponly=httponly, + charset=self.charset, + max_size=self.max_cookie_size, + samesite=samesite, + ), + ) + + def delete_cookie( + self, + key: str, + path: str = "/", + domain: t.Optional[str] = None, + secure: bool = False, + httponly: bool = False, + samesite: t.Optional[str] = None, + ): + """Delete a cookie. Fails silently if key doesn't exist. + + :param key: the key (name) of the cookie to be deleted. + :param path: if the cookie that should be deleted was limited to a + path, the path has to be defined here. + :param domain: if the cookie that should be deleted was limited to a + domain, that domain has to be defined here. + :param secure: If ``True``, the cookie will only be available + via HTTPS. + :param httponly: Disallow JavaScript access to the cookie. + :param samesite: Limit the scope of the cookie to only be + attached to requests that are "same-site". + """ + self.set_cookie( + key, + expires=0, + max_age=0, + path=path, + domain=domain, + secure=secure, + httponly=httponly, + samesite=samesite, + ) + + @property + def is_json(self) -> bool: + """Check if the mimetype indicates JSON data, either + :mimetype:`application/json` or :mimetype:`application/*+json`. + """ + mt = self.mimetype + return mt is not None and ( + mt == "application/json" + or mt.startswith("application/") + and mt.endswith("+json") + ) + + # Common Descriptors + + @property + def mimetype(self) -> t.Optional[str]: + """The mimetype (content type without charset etc.)""" + ct = self.headers.get("content-type") + + if ct: + return ct.split(";")[0].strip() + else: + return None + + @mimetype.setter + def mimetype(self, value: str) -> None: + self.headers["Content-Type"] = get_content_type(value, self.charset) + + @property + def mimetype_params(self) -> t.Dict[str, str]: + """The mimetype parameters as dict. For example if the + content type is ``text/html; charset=utf-8`` the params would be + ``{'charset': 'utf-8'}``. + + .. versionadded:: 0.5 + """ + + def on_update(d): + self.headers["Content-Type"] = dump_options_header(self.mimetype, d) + + d = parse_options_header(self.headers.get("content-type", ""))[1] + return CallbackDict(d, on_update) + + location = header_property[str]( + "Location", + doc="""The Location response-header field is used to redirect + the recipient to a location other than the Request-URI for + completion of the request or identification of a new + resource.""", + ) + age = header_property( + "Age", + None, + parse_age, + dump_age, # type: ignore + doc="""The Age response-header field conveys the sender's + estimate of the amount of time since the response (or its + revalidation) was generated at the origin server. + + Age values are non-negative decimal integers, representing time + in seconds.""", + ) + content_type = header_property[str]( + "Content-Type", + doc="""The Content-Type entity-header field indicates the media + type of the entity-body sent to the recipient or, in the case of + the HEAD method, the media type that would have been sent had + the request been a GET.""", + ) + content_length = header_property( + "Content-Length", + None, + int, + str, + doc="""The Content-Length entity-header field indicates the size + of the entity-body, in decimal number of OCTETs, sent to the + recipient or, in the case of the HEAD method, the size of the + entity-body that would have been sent had the request been a + GET.""", + ) + content_location = header_property[str]( + "Content-Location", + doc="""The Content-Location entity-header field MAY be used to + supply the resource location for the entity enclosed in the + message when that entity is accessible from a location separate + from the requested resource's URI.""", + ) + content_encoding = header_property[str]( + "Content-Encoding", + doc="""The Content-Encoding entity-header field is used as a + modifier to the media-type. When present, its value indicates + what additional content codings have been applied to the + entity-body, and thus what decoding mechanisms must be applied + in order to obtain the media-type referenced by the Content-Type + header field.""", + ) + content_md5 = header_property[str]( + "Content-MD5", + doc="""The Content-MD5 entity-header field, as defined in + RFC 1864, is an MD5 digest of the entity-body for the purpose of + providing an end-to-end message integrity check (MIC) of the + entity-body. (Note: a MIC is good for detecting accidental + modification of the entity-body in transit, but is not proof + against malicious attacks.)""", + ) + date = header_property( + "Date", + None, + parse_date, + http_date, + doc="""The Date general-header field represents the date and + time at which the message was originated, having the same + semantics as orig-date in RFC 822.""", + ) + expires = header_property( + "Expires", + None, + parse_date, + http_date, + doc="""The Expires entity-header field gives the date/time after + which the response is considered stale. A stale cache entry may + not normally be returned by a cache.""", + ) + last_modified = header_property( + "Last-Modified", + None, + parse_date, + http_date, + doc="""The Last-Modified entity-header field indicates the date + and time at which the origin server believes the variant was + last modified.""", + ) + + @property + def retry_after(self) -> t.Optional[datetime]: + """The Retry-After response-header field can be used with a + 503 (Service Unavailable) response to indicate how long the + service is expected to be unavailable to the requesting client. + + Time in seconds until expiration or date. + """ + value = self.headers.get("retry-after") + if value is None: + return None + elif value.isdigit(): + return datetime.utcnow() + timedelta(seconds=int(value)) + return parse_date(value) + + @retry_after.setter + def retry_after(self, value: t.Optional[t.Union[datetime, int, str]]) -> None: + if value is None: + if "retry-after" in self.headers: + del self.headers["retry-after"] + return + elif isinstance(value, datetime): + value = http_date(value) + else: + value = str(value) + self.headers["Retry-After"] = value + + vary = _set_property( + "Vary", + doc="""The Vary field value indicates the set of request-header + fields that fully determines, while the response is fresh, + whether a cache is permitted to use the response to reply to a + subsequent request without revalidation.""", + ) + content_language = _set_property( + "Content-Language", + doc="""The Content-Language entity-header field describes the + natural language(s) of the intended audience for the enclosed + entity. Note that this might not be equivalent to all the + languages used within the entity-body.""", + ) + allow = _set_property( + "Allow", + doc="""The Allow entity-header field lists the set of methods + supported by the resource identified by the Request-URI. The + purpose of this field is strictly to inform the recipient of + valid methods associated with the resource. An Allow header + field MUST be present in a 405 (Method Not Allowed) + response.""", + ) + + # ETag + + @property + def cache_control(self) -> ResponseCacheControl: + """The Cache-Control general-header field is used to specify + directives that MUST be obeyed by all caching mechanisms along the + request/response chain. + """ + + def on_update(cache_control): + if not cache_control and "cache-control" in self.headers: + del self.headers["cache-control"] + elif cache_control: + self.headers["Cache-Control"] = cache_control.to_header() + + return parse_cache_control_header( + self.headers.get("cache-control"), on_update, ResponseCacheControl + ) + + def set_etag(self, etag: str, weak: bool = False) -> None: + """Set the etag, and override the old one if there was one.""" + self.headers["ETag"] = quote_etag(etag, weak) + + def get_etag(self) -> t.Union[t.Tuple[str, bool], t.Tuple[None, None]]: + """Return a tuple in the form ``(etag, is_weak)``. If there is no + ETag the return value is ``(None, None)``. + """ + return unquote_etag(self.headers.get("ETag")) + + accept_ranges = header_property[str]( + "Accept-Ranges", + doc="""The `Accept-Ranges` header. Even though the name would + indicate that multiple values are supported, it must be one + string token only. + + The values ``'bytes'`` and ``'none'`` are common. + + .. versionadded:: 0.7""", + ) + + @property + def content_range(self) -> ContentRange: + """The ``Content-Range`` header as a + :class:`~werkzeug.datastructures.ContentRange` object. Available + even if the header is not set. + + .. versionadded:: 0.7 + """ + + def on_update(rng: ContentRange) -> None: + if not rng: + del self.headers["content-range"] + else: + self.headers["Content-Range"] = rng.to_header() + + rv = parse_content_range_header(self.headers.get("content-range"), on_update) + # always provide a content range object to make the descriptor + # more user friendly. It provides an unset() method that can be + # used to remove the header quickly. + if rv is None: + rv = ContentRange(None, None, None, on_update=on_update) + return rv + + @content_range.setter + def content_range(self, value: t.Optional[t.Union[ContentRange, str]]) -> None: + if not value: + del self.headers["content-range"] + elif isinstance(value, str): + self.headers["Content-Range"] = value + else: + self.headers["Content-Range"] = value.to_header() + + # Authorization + + @property + def www_authenticate(self) -> WWWAuthenticate: + """The ``WWW-Authenticate`` header in a parsed form.""" + + def on_update(www_auth: WWWAuthenticate) -> None: + if not www_auth and "www-authenticate" in self.headers: + del self.headers["www-authenticate"] + elif www_auth: + self.headers["WWW-Authenticate"] = www_auth.to_header() + + header = self.headers.get("www-authenticate") + return parse_www_authenticate_header(header, on_update) + + # CSP + + content_security_policy = header_property( + "Content-Security-Policy", + None, + parse_csp_header, # type: ignore + dump_csp_header, + doc="""The Content-Security-Policy header adds an additional layer of + security to help detect and mitigate certain types of attacks.""", + ) + content_security_policy_report_only = header_property( + "Content-Security-Policy-Report-Only", + None, + parse_csp_header, # type: ignore + dump_csp_header, + doc="""The Content-Security-Policy-Report-Only header adds a csp policy + that is not enforced but is reported thereby helping detect + certain types of attacks.""", + ) + + # CORS + + @property + def access_control_allow_credentials(self) -> bool: + """Whether credentials can be shared by the browser to + JavaScript code. As part of the preflight request it indicates + whether credentials can be used on the cross origin request. + """ + return "Access-Control-Allow-Credentials" in self.headers + + @access_control_allow_credentials.setter + def access_control_allow_credentials(self, value: t.Optional[bool]) -> None: + if value is True: + self.headers["Access-Control-Allow-Credentials"] = "true" + else: + self.headers.pop("Access-Control-Allow-Credentials", None) + + access_control_allow_headers = header_property( + "Access-Control-Allow-Headers", + load_func=parse_set_header, + dump_func=dump_header, + doc="Which headers can be sent with the cross origin request.", + ) + + access_control_allow_methods = header_property( + "Access-Control-Allow-Methods", + load_func=parse_set_header, + dump_func=dump_header, + doc="Which methods can be used for the cross origin request.", + ) + + access_control_allow_origin = header_property[str]( + "Access-Control-Allow-Origin", + doc="The origin or '*' for any origin that may make cross origin requests.", + ) + + access_control_expose_headers = header_property( + "Access-Control-Expose-Headers", + load_func=parse_set_header, + dump_func=dump_header, + doc="Which headers can be shared by the browser to JavaScript code.", + ) + + access_control_max_age = header_property( + "Access-Control-Max-Age", + load_func=int, + dump_func=str, + doc="The maximum age in seconds the access control settings can be cached for.", + ) + + cross_origin_opener_policy = header_property[COOP]( + "Cross-Origin-Opener-Policy", + load_func=lambda value: COOP(value), + dump_func=lambda value: value.value, + default=COOP.UNSAFE_NONE, + doc="""Allows control over sharing of browsing context group with cross-origin + documents. Values must be a member of the :class:`werkzeug.http.COOP` enum.""", + ) + + cross_origin_embedder_policy = header_property[COEP]( + "Cross-Origin-Embedder-Policy", + load_func=lambda value: COEP(value), + dump_func=lambda value: value.value, + default=COEP.UNSAFE_NONE, + doc="""Prevents a document from loading any cross-origin resources that do not + explicitly grant the document permission. Values must be a member of the + :class:`werkzeug.http.COEP` enum.""", + ) diff --git a/src/werkzeug/wrappers/request.py b/src/werkzeug/wrappers/request.py index c2fafe2951..d56f8f5c73 100644 --- a/src/werkzeug/wrappers/request.py +++ b/src/werkzeug/wrappers/request.py @@ -2,47 +2,21 @@ import json import typing as t import warnings -from datetime import datetime from io import BytesIO -from .._internal import _to_str from .._internal import _wsgi_decoding_dance -from ..datastructures import Accept -from ..datastructures import Authorization -from ..datastructures import CharsetAccept from ..datastructures import CombinedMultiDict from ..datastructures import EnvironHeaders -from ..datastructures import ETags from ..datastructures import FileStorage -from ..datastructures import Headers -from ..datastructures import HeaderSet -from ..datastructures import IfRange -from ..datastructures import ImmutableList from ..datastructures import ImmutableMultiDict from ..datastructures import iter_multi_items -from ..datastructures import LanguageAccept -from ..datastructures import MIMEAccept from ..datastructures import MultiDict -from ..datastructures import Range -from ..datastructures import RequestCacheControl from ..formparser import default_stream_factory from ..formparser import FormDataParser -from ..http import parse_accept_header -from ..http import parse_authorization_header -from ..http import parse_cache_control_header -from ..http import parse_cookie -from ..http import parse_date -from ..http import parse_etags -from ..http import parse_if_range_header -from ..http import parse_list_header from ..http import parse_options_header -from ..http import parse_range_header -from ..http import parse_set_header -from ..urls import url_decode -from ..useragents import UserAgent +from ..sansio.request import Request as SansIORequest from ..utils import cached_property from ..utils import environ_property -from ..utils import header_property from ..wsgi import get_content_length from ..wsgi import get_current_url from ..wsgi import get_host @@ -54,390 +28,6 @@ from wsgiref.types import WSGIEnvironment -class SansIORequest: - """Represents the non-IO parts of a HTTP request, specifically the - headers, and method. - - This is best used if you aren't using WSGI, if you are the - :cls:`Request` class is the better choice. - - .. versionadded:: 2.0 - """ - - #: the charset for the request, defaults to utf-8 - charset = "utf-8" - - #: the error handling procedure for errors, defaults to 'replace' - encoding_errors = "replace" - - #: the class to use for `args` and `form`. The default is an - #: :class:`~werkzeug.datastructures.ImmutableMultiDict` which supports - #: multiple values per key. alternatively it makes sense to use an - #: :class:`~werkzeug.datastructures.ImmutableOrderedMultiDict` which - #: preserves order or a :class:`~werkzeug.datastructures.ImmutableDict` - #: which is the fastest but only remembers the last key. It is also - #: possible to use mutable structures, but this is not recommended. - #: - #: .. versionadded:: 0.6 - parameter_storage_class: t.Type[MultiDict] = ImmutableMultiDict - - #: The type to be used for dict values from the incoming WSGI - #: environment. (For example for :attr:`cookies`.) By default an - #: :class:`~werkzeug.datastructures.ImmutableMultiDict` is used. - #: - #: .. versionchanged:: 1.0.0 - #: Changed to ``ImmutableMultiDict`` to support multiple values. - #: - #: .. versionadded:: 0.6 - dict_storage_class: t.Type[MultiDict] = ImmutableMultiDict - - #: the type to be used for list values from the incoming WSGI environment. - #: By default an :class:`~werkzeug.datastructures.ImmutableList` is used - #: (for example for :attr:`access_list`). - #: - #: .. versionadded:: 0.6 - list_storage_class: t.Type[t.List] = ImmutableList - - def __init__( - self, - method: str, - path: str, - query_string: bytes, - headers: Headers, - scheme: str, - remote_addr: t.Optional[str], - ) -> None: - self.method = method.upper() - self.query_string = query_string - self.headers = headers - self.scheme = scheme - self.path = "/" + path.lstrip("/") - self.remote_addr = remote_addr - - def __repr__(self) -> str: - return f"<{type(self).__name__} {self.path} [{self.method}]>" - - @property - def url_charset(self) -> str: - """The charset that is assumed for URLs. Defaults to the value - of :attr:`charset`. - - .. versionadded:: 0.6 - """ - return self.charset - - @cached_property - def args(self) -> "MultiDict[str, str]": - """The parsed URL parameters (the part in the URL after the question - mark). - - By default an - :class:`~werkzeug.datastructures.ImmutableMultiDict` - is returned from this function. This can be changed by setting - :attr:`parameter_storage_class` to a different type. This might - be necessary if the order of the form data is important. - """ - return url_decode( - self.query_string, - self.url_charset, - errors=self.encoding_errors, - cls=self.parameter_storage_class, - ) - - @cached_property - def access_route(self) -> t.List[str]: - """If a forwarded header exists this is a list of all ip addresses - from the client ip to the last proxy server. - """ - if "X-Forwarded-For" in self.headers: - return self.list_storage_class( - parse_list_header(self.headers["X-Forwarded-For"]) - ) - elif self.remote_addr is not None: - return self.list_storage_class([self.remote_addr]) - return self.list_storage_class() - - @cached_property - def full_path(self) -> str: - """Requested path, including the query string.""" - return f"{self.path}?{_to_str(self.query_string, self.url_charset)}" - - @property - def is_secure(self) -> bool: - "`True` if the request is secure." - return self.scheme in {"https", "wss"} - - @cached_property - def cookies(self) -> "ImmutableMultiDict[str, str]": - """A :class:`dict` with the contents of all cookies transmitted with - the request.""" - return parse_cookie( # type: ignore - self.headers.get("Cookie"), - self.charset, - self.encoding_errors, - cls=self.dict_storage_class, - ) - - # Common Descriptors - - content_type = header_property[str]( - "Content-Type", - doc="""The Content-Type entity-header field indicates the media - type of the entity-body sent to the recipient or, in the case of - the HEAD method, the media type that would have been sent had - the request been a GET.""", - read_only=True, - ) - - @cached_property - def content_length(self) -> t.Optional[int]: - """The Content-Length entity-header field indicates the size of the - entity-body in bytes or, in the case of the HEAD method, the size of - the entity-body that would have been sent had the request been a - GET. - """ - return get_content_length(self.headers) - - content_encoding = header_property[str]( - "Content-Encoding", - doc="""The Content-Encoding entity-header field is used as a - modifier to the media-type. When present, its value indicates - what additional content codings have been applied to the - entity-body, and thus what decoding mechanisms must be applied - in order to obtain the media-type referenced by the Content-Type - header field. - - .. versionadded:: 0.9""", - read_only=True, - ) - content_md5 = header_property[str]( - "Content-MD5", - doc="""The Content-MD5 entity-header field, as defined in - RFC 1864, is an MD5 digest of the entity-body for the purpose of - providing an end-to-end message integrity check (MIC) of the - entity-body. (Note: a MIC is good for detecting accidental - modification of the entity-body in transit, but is not proof - against malicious attacks.) - - .. versionadded:: 0.9""", - read_only=True, - ) - referrer = header_property[str]( - "Referer", - doc="""The Referer[sic] request-header field allows the client - to specify, for the server's benefit, the address (URI) of the - resource from which the Request-URI was obtained (the - "referrer", although the header field is misspelled).""", - read_only=True, - ) - date = header_property( - "Date", - None, - parse_date, - doc="""The Date general-header field represents the date and - time at which the message was originated, having the same - semantics as orig-date in RFC 822.""", - read_only=True, - ) - max_forwards = header_property( - "Max-Forwards", - None, - int, - doc="""The Max-Forwards request-header field provides a - mechanism with the TRACE and OPTIONS methods to limit the number - of proxies or gateways that can forward the request to the next - inbound server.""", - read_only=True, - ) - - def _parse_content_type(self) -> None: - if not hasattr(self, "_parsed_content_type"): - self._parsed_content_type = parse_options_header( - self.headers.get("Content-Type", "") - ) - - @property - def mimetype(self) -> str: - """Like :attr:`content_type`, but without parameters (eg, without - charset, type etc.) and always lowercase. For example if the content - type is ``text/HTML; charset=utf-8`` the mimetype would be - ``'text/html'``. - """ - self._parse_content_type() - return self._parsed_content_type[0].lower() - - @property - def mimetype_params(self) -> t.Dict[str, str]: - """The mimetype parameters as dict. For example if the content - type is ``text/html; charset=utf-8`` the params would be - ``{'charset': 'utf-8'}``. - """ - self._parse_content_type() - return self._parsed_content_type[1] - - @cached_property - def pragma(self) -> HeaderSet: - """The Pragma general-header field is used to include - implementation-specific directives that might apply to any recipient - along the request/response chain. All pragma directives specify - optional behavior from the viewpoint of the protocol; however, some - systems MAY require that behavior be consistent with the directives. - """ - return parse_set_header(self.headers.get("Pragma", "")) - - # Accept - - @cached_property - def accept_mimetypes(self) -> MIMEAccept: - """List of mimetypes this client supports as - :class:`~werkzeug.datastructures.MIMEAccept` object. - """ - return parse_accept_header(self.headers.get("Accept"), MIMEAccept) - - @cached_property - def accept_charsets(self) -> CharsetAccept: - """List of charsets this client supports as - :class:`~werkzeug.datastructures.CharsetAccept` object. - """ - return parse_accept_header(self.headers.get("Accept-Charset"), CharsetAccept) - - @cached_property - def accept_encodings(self) -> Accept: - """List of encodings this client accepts. Encodings in a HTTP term - are compression encodings such as gzip. For charsets have a look at - :attr:`accept_charset`. - """ - return parse_accept_header(self.headers.get("Accept-Encoding")) - - @cached_property - def accept_languages(self) -> LanguageAccept: - """List of languages this client accepts as - :class:`~werkzeug.datastructures.LanguageAccept` object. - - .. versionchanged 0.5 - In previous versions this was a regular - :class:`~werkzeug.datastructures.Accept` object. - """ - return parse_accept_header(self.headers.get("Accept-Language"), LanguageAccept) - - # ETag - - @cached_property - def cache_control(self) -> RequestCacheControl: - """A :class:`~werkzeug.datastructures.RequestCacheControl` object - for the incoming cache control headers. - """ - cache_control = self.headers.get("Cache-Control") - return parse_cache_control_header(cache_control, None, RequestCacheControl) - - @cached_property - def if_match(self) -> ETags: - """An object containing all the etags in the `If-Match` header. - - :rtype: :class:`~werkzeug.datastructures.ETags` - """ - return parse_etags(self.headers.get("If-Match")) - - @cached_property - def if_none_match(self) -> ETags: - """An object containing all the etags in the `If-None-Match` header. - - :rtype: :class:`~werkzeug.datastructures.ETags` - """ - return parse_etags(self.headers.get("If-None-Match")) - - @cached_property - def if_modified_since(self) -> t.Optional[datetime]: - """The parsed `If-Modified-Since` header as datetime object.""" - return parse_date(self.headers.get("If-Modified-Since")) - - @cached_property - def if_unmodified_since(self) -> t.Optional[datetime]: - """The parsed `If-Unmodified-Since` header as datetime object.""" - return parse_date(self.headers.get("If-Unmodified-Since")) - - @cached_property - def if_range(self) -> IfRange: - """The parsed `If-Range` header. - - .. versionadded:: 0.7 - - :rtype: :class:`~werkzeug.datastructures.IfRange` - """ - return parse_if_range_header(self.headers.get("If-Range")) - - @cached_property - def range(self) -> t.Optional[Range]: - """The parsed `Range` header. - - .. versionadded:: 0.7 - - :rtype: :class:`~werkzeug.datastructures.Range` - """ - return parse_range_header(self.headers.get("Range")) - - # User Agent - - @cached_property - def user_agent(self) -> UserAgent: - """The current user agent.""" - return UserAgent(self.headers.get("User-Agent", "")) # type: ignore - - # Authorization - - @cached_property - def authorization(self) -> t.Optional[Authorization]: - """The `Authorization` object in parsed form.""" - return parse_authorization_header(self.headers.get("Authorization")) - - # CORS - - origin = header_property[str]( - "Origin", - doc=( - "The host that the request originated from. Set" - " :attr:`~CORSResponseMixin.access_control_allow_origin` on" - " the response to indicate which origins are allowed." - ), - read_only=True, - ) - - access_control_request_headers = header_property( - "Access-Control-Request-Headers", - load_func=parse_set_header, - doc=( - "Sent with a preflight request to indicate which headers" - " will be sent with the cross origin request. Set" - " :attr:`~CORSResponseMixin.access_control_allow_headers`" - " on the response to indicate which headers are allowed." - ), - read_only=True, - ) - - access_control_request_method = header_property[str]( - "Access-Control-Request-Method", - doc=( - "Sent with a preflight request to indicate which method" - " will be used for the cross origin request. Set" - " :attr:`~CORSResponseMixin.access_control_allow_methods`" - " on the response to indicate which methods are allowed." - ), - read_only=True, - ) - - @property - def is_json(self) -> bool: - """Check if the mimetype indicates JSON data, either - :mimetype:`application/json` or :mimetype:`application/*+json`. - """ - mt = self.mimetype - return ( - mt == "application/json" - or mt.startswith("application/") - and mt.endswith("+json") - ) - - class Request(SansIORequest): """Represents an incoming WSGI HTTP request, with headers and body taken from the WSGI environment. Has properties and methods for diff --git a/src/werkzeug/wrappers/response.py b/src/werkzeug/wrappers/response.py index 98d38582eb..8bf53bf1c1 100644 --- a/src/werkzeug/wrappers/response.py +++ b/src/werkzeug/wrappers/response.py @@ -2,48 +2,22 @@ import typing import typing as t import warnings -from datetime import datetime -from datetime import timedelta from .._internal import _to_bytes -from .._internal import _to_str from ..datastructures import Headers -from ..http import dump_cookie -from ..http import HTTP_STATUS_CODES from ..http import remove_entity_headers +from ..sansio.response import Response as SansIOResponse from ..urls import iri_to_uri from ..urls import url_join from ..utils import cached_property -from ..utils import get_content_type from ..wsgi import ClosingIterator from ..wsgi import get_current_url from werkzeug._internal import _get_environ -from werkzeug.datastructures import CallbackDict -from werkzeug.datastructures import ContentRange -from werkzeug.datastructures import ResponseCacheControl -from werkzeug.datastructures import WWWAuthenticate -from werkzeug.http import COEP -from werkzeug.http import COOP -from werkzeug.http import dump_age -from werkzeug.http import dump_csp_header -from werkzeug.http import dump_header -from werkzeug.http import dump_options_header from werkzeug.http import generate_etag from werkzeug.http import http_date from werkzeug.http import is_resource_modified -from werkzeug.http import parse_age -from werkzeug.http import parse_cache_control_header -from werkzeug.http import parse_content_range_header -from werkzeug.http import parse_csp_header -from werkzeug.http import parse_date from werkzeug.http import parse_etags -from werkzeug.http import parse_options_header from werkzeug.http import parse_range_header -from werkzeug.http import parse_set_header -from werkzeug.http import parse_www_authenticate_header -from werkzeug.http import quote_etag -from werkzeug.http import unquote_etag -from werkzeug.utils import header_property from werkzeug.wsgi import _RangeWrapper if t.TYPE_CHECKING: @@ -87,604 +61,6 @@ def _clean_accept_ranges(accept_ranges: t.Union[bool, str]) -> str: raise ValueError("Invalid accept_ranges value") -def _set_property(name: str, doc: t.Optional[str] = None) -> property: - def fget(self): - def on_update(header_set): - if not header_set and name in self.headers: - del self.headers[name] - elif header_set: - self.headers[name] = header_set.to_header() - - return parse_set_header(self.headers.get(name), on_update) - - def fset(self, value): - if not value: - del self.headers[name] - elif isinstance(value, str): - self.headers[name] = value - else: - self.headers[name] = dump_header(value) - - return property(fget, fset, doc=doc) - - -class SansIOResponse: - """Represents the non-IO parts of an HTTP response, specifically the - status, and headers. - - This is best used if you aren't using WSGI, if you are the - :cls:`Response` class is the better choice. - - :param status: The status code for the response. Either an int, in - which case the default status message is added, or a string in - the form ``{code} {message}``, like ``404 Not Found``. Defaults - to 200. - :param headers: A :class:`~werkzeug.datastructures.Headers` object, - or a list of ``(key, value)`` tuples that will be converted to a - ``Headers`` object. - :param mimetype: The mime type (content type without charset or - other parameters) of the response. If the value starts with - ``text/`` (or matches some other special cases), the charset - will be added to create the ``content_type``. - :param content_type: The full content type of the response. - Overrides building the value from ``mimetype``. - - .. versionadded:: 2.0 - """ - - #: the charset of the response. - charset = "utf-8" - - #: the default status if none is provided. - default_status = 200 - - #: the default mimetype if none is provided. - default_mimetype = "text/plain" - - #: Warn if a cookie header exceeds this size. The default, 4093, should be - #: safely `supported by most browsers `_. A cookie larger than - #: this size will still be sent, but it may be ignored or handled - #: incorrectly by some browsers. Set to 0 to disable this check. - #: - #: .. versionadded:: 0.13 - #: - #: .. _`cookie`: http://browsercookielimits.squawky.net/ - max_cookie_size = 4093 - - # A :class:`Headers` object representing the response headers. - headers: Headers - - def __init__( - self, - status: t.Optional[t.Union[int, str]] = None, - headers: t.Optional[ - t.Union[ - t.Mapping[str, t.Union[str, int, t.Iterable[t.Union[str, int]]]], - t.Iterable[t.Tuple[str, t.Union[str, int]]], - ] - ] = None, - mimetype: t.Optional[str] = None, - content_type: t.Optional[str] = None, - ) -> None: - if isinstance(headers, Headers): - self.headers = headers - elif not headers: - self.headers = Headers() - else: - self.headers = Headers(headers) - - if content_type is None: - if mimetype is None and "content-type" not in self.headers: - mimetype = self.default_mimetype - if mimetype is not None: - mimetype = get_content_type(mimetype, self.charset) - content_type = mimetype - if content_type is not None: - self.headers["Content-Type"] = content_type - if status is None: - status = self.default_status - self.status = status # type: ignore - - def __repr__(self) -> str: - return f"<{type(self).__name__} [{self.status}]>" - - @property - def status_code(self) -> int: - """The HTTP status code as a number.""" - return self._status_code - - @status_code.setter - def status_code(self, code: int) -> None: - self.status = code # type: ignore - - @property - def status(self) -> str: - """The HTTP status code as a string.""" - return self._status - - @status.setter - def status(self, value: t.Union[str, int]) -> None: - if not isinstance(value, (str, bytes, int)): - raise TypeError("Invalid status argument") - - self._status, self._status_code = self._clean_status(value) - - def _clean_status(self, value: t.Union[str, int]) -> t.Tuple[str, int]: - status = _to_str(value, self.charset) - split_status = status.split(None, 1) - - if len(split_status) == 0: - raise ValueError("Empty status argument") - - if len(split_status) > 1: - if split_status[0].isdigit(): - # code and message - return status, int(split_status[0]) - - # multi-word message - return f"0 {status}", 0 - - if split_status[0].isdigit(): - # code only - status_code = int(split_status[0]) - - try: - status = f"{status_code} {HTTP_STATUS_CODES[status_code].upper()}" - except KeyError: - status = f"{status_code} UNKNOWN" - - return status, status_code - - # one-word message - return f"0 {status}", 0 - - def set_cookie( - self, - key: str, - value: str = "", - max_age: t.Optional[t.Union[timedelta, int]] = None, - expires: t.Optional[t.Union[str, datetime, int, float]] = None, - path: t.Optional[str] = "/", - domain: t.Optional[str] = None, - secure: bool = False, - httponly: bool = False, - samesite: t.Optional[str] = None, - ) -> None: - """Sets a cookie. - - A warning is raised if the size of the cookie header exceeds - :attr:`max_cookie_size`, but the header will still be set. - - :param key: the key (name) of the cookie to be set. - :param value: the value of the cookie. - :param max_age: should be a number of seconds, or `None` (default) if - the cookie should last only as long as the client's - browser session. - :param expires: should be a `datetime` object or UNIX timestamp. - :param path: limits the cookie to a given path, per default it will - span the whole domain. - :param domain: if you want to set a cross-domain cookie. For example, - ``domain=".example.com"`` will set a cookie that is - readable by the domain ``www.example.com``, - ``foo.example.com`` etc. Otherwise, a cookie will only - be readable by the domain that set it. - :param secure: If ``True``, the cookie will only be available - via HTTPS. - :param httponly: Disallow JavaScript access to the cookie. - :param samesite: Limit the scope of the cookie to only be - attached to requests that are "same-site". - """ - self.headers.add( - "Set-Cookie", - dump_cookie( - key, - value=value, - max_age=max_age, - expires=expires, - path=path, - domain=domain, - secure=secure, - httponly=httponly, - charset=self.charset, - max_size=self.max_cookie_size, - samesite=samesite, - ), - ) - - def delete_cookie( - self, - key: str, - path: str = "/", - domain: t.Optional[str] = None, - secure: bool = False, - httponly: bool = False, - samesite: t.Optional[str] = None, - ): - """Delete a cookie. Fails silently if key doesn't exist. - - :param key: the key (name) of the cookie to be deleted. - :param path: if the cookie that should be deleted was limited to a - path, the path has to be defined here. - :param domain: if the cookie that should be deleted was limited to a - domain, that domain has to be defined here. - :param secure: If ``True``, the cookie will only be available - via HTTPS. - :param httponly: Disallow JavaScript access to the cookie. - :param samesite: Limit the scope of the cookie to only be - attached to requests that are "same-site". - """ - self.set_cookie( - key, - expires=0, - max_age=0, - path=path, - domain=domain, - secure=secure, - httponly=httponly, - samesite=samesite, - ) - - @property - def is_json(self) -> bool: - """Check if the mimetype indicates JSON data, either - :mimetype:`application/json` or :mimetype:`application/*+json`. - """ - mt = self.mimetype - return mt is not None and ( - mt == "application/json" - or mt.startswith("application/") - and mt.endswith("+json") - ) - - # Common Descriptors - - @property - def mimetype(self) -> t.Optional[str]: - """The mimetype (content type without charset etc.)""" - ct = self.headers.get("content-type") - - if ct: - return ct.split(";")[0].strip() - else: - return None - - @mimetype.setter - def mimetype(self, value: str) -> None: - self.headers["Content-Type"] = get_content_type(value, self.charset) - - @property - def mimetype_params(self) -> t.Dict[str, str]: - """The mimetype parameters as dict. For example if the - content type is ``text/html; charset=utf-8`` the params would be - ``{'charset': 'utf-8'}``. - - .. versionadded:: 0.5 - """ - - def on_update(d): - self.headers["Content-Type"] = dump_options_header(self.mimetype, d) - - d = parse_options_header(self.headers.get("content-type", ""))[1] - return CallbackDict(d, on_update) - - location = header_property[str]( - "Location", - doc="""The Location response-header field is used to redirect - the recipient to a location other than the Request-URI for - completion of the request or identification of a new - resource.""", - ) - age = header_property( - "Age", - None, - parse_age, - dump_age, # type: ignore - doc="""The Age response-header field conveys the sender's - estimate of the amount of time since the response (or its - revalidation) was generated at the origin server. - - Age values are non-negative decimal integers, representing time - in seconds.""", - ) - content_type = header_property[str]( - "Content-Type", - doc="""The Content-Type entity-header field indicates the media - type of the entity-body sent to the recipient or, in the case of - the HEAD method, the media type that would have been sent had - the request been a GET.""", - ) - content_length = header_property( - "Content-Length", - None, - int, - str, - doc="""The Content-Length entity-header field indicates the size - of the entity-body, in decimal number of OCTETs, sent to the - recipient or, in the case of the HEAD method, the size of the - entity-body that would have been sent had the request been a - GET.""", - ) - content_location = header_property[str]( - "Content-Location", - doc="""The Content-Location entity-header field MAY be used to - supply the resource location for the entity enclosed in the - message when that entity is accessible from a location separate - from the requested resource's URI.""", - ) - content_encoding = header_property[str]( - "Content-Encoding", - doc="""The Content-Encoding entity-header field is used as a - modifier to the media-type. When present, its value indicates - what additional content codings have been applied to the - entity-body, and thus what decoding mechanisms must be applied - in order to obtain the media-type referenced by the Content-Type - header field.""", - ) - content_md5 = header_property[str]( - "Content-MD5", - doc="""The Content-MD5 entity-header field, as defined in - RFC 1864, is an MD5 digest of the entity-body for the purpose of - providing an end-to-end message integrity check (MIC) of the - entity-body. (Note: a MIC is good for detecting accidental - modification of the entity-body in transit, but is not proof - against malicious attacks.)""", - ) - date = header_property( - "Date", - None, - parse_date, - http_date, - doc="""The Date general-header field represents the date and - time at which the message was originated, having the same - semantics as orig-date in RFC 822.""", - ) - expires = header_property( - "Expires", - None, - parse_date, - http_date, - doc="""The Expires entity-header field gives the date/time after - which the response is considered stale. A stale cache entry may - not normally be returned by a cache.""", - ) - last_modified = header_property( - "Last-Modified", - None, - parse_date, - http_date, - doc="""The Last-Modified entity-header field indicates the date - and time at which the origin server believes the variant was - last modified.""", - ) - - @property - def retry_after(self) -> t.Optional[datetime]: - """The Retry-After response-header field can be used with a - 503 (Service Unavailable) response to indicate how long the - service is expected to be unavailable to the requesting client. - - Time in seconds until expiration or date. - """ - value = self.headers.get("retry-after") - if value is None: - return None - elif value.isdigit(): - return datetime.utcnow() + timedelta(seconds=int(value)) - return parse_date(value) - - @retry_after.setter - def retry_after(self, value: t.Optional[t.Union[datetime, int, str]]) -> None: - if value is None: - if "retry-after" in self.headers: - del self.headers["retry-after"] - return - elif isinstance(value, datetime): - value = http_date(value) - else: - value = str(value) - self.headers["Retry-After"] = value - - vary = _set_property( - "Vary", - doc="""The Vary field value indicates the set of request-header - fields that fully determines, while the response is fresh, - whether a cache is permitted to use the response to reply to a - subsequent request without revalidation.""", - ) - content_language = _set_property( - "Content-Language", - doc="""The Content-Language entity-header field describes the - natural language(s) of the intended audience for the enclosed - entity. Note that this might not be equivalent to all the - languages used within the entity-body.""", - ) - allow = _set_property( - "Allow", - doc="""The Allow entity-header field lists the set of methods - supported by the resource identified by the Request-URI. The - purpose of this field is strictly to inform the recipient of - valid methods associated with the resource. An Allow header - field MUST be present in a 405 (Method Not Allowed) - response.""", - ) - - # ETag - - @property - def cache_control(self) -> ResponseCacheControl: - """The Cache-Control general-header field is used to specify - directives that MUST be obeyed by all caching mechanisms along the - request/response chain. - """ - - def on_update(cache_control): - if not cache_control and "cache-control" in self.headers: - del self.headers["cache-control"] - elif cache_control: - self.headers["Cache-Control"] = cache_control.to_header() - - return parse_cache_control_header( - self.headers.get("cache-control"), on_update, ResponseCacheControl - ) - - def set_etag(self, etag: str, weak: bool = False) -> None: - """Set the etag, and override the old one if there was one.""" - self.headers["ETag"] = quote_etag(etag, weak) - - def get_etag(self) -> t.Union[t.Tuple[str, bool], t.Tuple[None, None]]: - """Return a tuple in the form ``(etag, is_weak)``. If there is no - ETag the return value is ``(None, None)``. - """ - return unquote_etag(self.headers.get("ETag")) - - accept_ranges = header_property[str]( - "Accept-Ranges", - doc="""The `Accept-Ranges` header. Even though the name would - indicate that multiple values are supported, it must be one - string token only. - - The values ``'bytes'`` and ``'none'`` are common. - - .. versionadded:: 0.7""", - ) - - @property - def content_range(self) -> ContentRange: - """The ``Content-Range`` header as a - :class:`~werkzeug.datastructures.ContentRange` object. Available - even if the header is not set. - - .. versionadded:: 0.7 - """ - - def on_update(rng: ContentRange) -> None: - if not rng: - del self.headers["content-range"] - else: - self.headers["Content-Range"] = rng.to_header() - - rv = parse_content_range_header(self.headers.get("content-range"), on_update) - # always provide a content range object to make the descriptor - # more user friendly. It provides an unset() method that can be - # used to remove the header quickly. - if rv is None: - rv = ContentRange(None, None, None, on_update=on_update) - return rv - - @content_range.setter - def content_range(self, value: t.Optional[t.Union[ContentRange, str]]) -> None: - if not value: - del self.headers["content-range"] - elif isinstance(value, str): - self.headers["Content-Range"] = value - else: - self.headers["Content-Range"] = value.to_header() - - # Authorization - - @property - def www_authenticate(self) -> WWWAuthenticate: - """The ``WWW-Authenticate`` header in a parsed form.""" - - def on_update(www_auth: WWWAuthenticate) -> None: - if not www_auth and "www-authenticate" in self.headers: - del self.headers["www-authenticate"] - elif www_auth: - self.headers["WWW-Authenticate"] = www_auth.to_header() - - header = self.headers.get("www-authenticate") - return parse_www_authenticate_header(header, on_update) - - # CSP - - content_security_policy = header_property( - "Content-Security-Policy", - None, - parse_csp_header, # type: ignore - dump_csp_header, - doc="""The Content-Security-Policy header adds an additional layer of - security to help detect and mitigate certain types of attacks.""", - ) - content_security_policy_report_only = header_property( - "Content-Security-Policy-Report-Only", - None, - parse_csp_header, # type: ignore - dump_csp_header, - doc="""The Content-Security-Policy-Report-Only header adds a csp policy - that is not enforced but is reported thereby helping detect - certain types of attacks.""", - ) - - # CORS - - @property - def access_control_allow_credentials(self) -> bool: - """Whether credentials can be shared by the browser to - JavaScript code. As part of the preflight request it indicates - whether credentials can be used on the cross origin request. - """ - return "Access-Control-Allow-Credentials" in self.headers - - @access_control_allow_credentials.setter - def access_control_allow_credentials(self, value: t.Optional[bool]) -> None: - if value is True: - self.headers["Access-Control-Allow-Credentials"] = "true" - else: - self.headers.pop("Access-Control-Allow-Credentials", None) - - access_control_allow_headers = header_property( - "Access-Control-Allow-Headers", - load_func=parse_set_header, - dump_func=dump_header, - doc="Which headers can be sent with the cross origin request.", - ) - - access_control_allow_methods = header_property( - "Access-Control-Allow-Methods", - load_func=parse_set_header, - dump_func=dump_header, - doc="Which methods can be used for the cross origin request.", - ) - - access_control_allow_origin = header_property[str]( - "Access-Control-Allow-Origin", - doc="The origin or '*' for any origin that may make cross origin requests.", - ) - - access_control_expose_headers = header_property( - "Access-Control-Expose-Headers", - load_func=parse_set_header, - dump_func=dump_header, - doc="Which headers can be shared by the browser to JavaScript code.", - ) - - access_control_max_age = header_property( - "Access-Control-Max-Age", - load_func=int, - dump_func=str, - doc="The maximum age in seconds the access control settings can be cached for.", - ) - - cross_origin_opener_policy = header_property[COOP]( - "Cross-Origin-Opener-Policy", - load_func=lambda value: COOP(value), - dump_func=lambda value: value.value, - default=COOP.UNSAFE_NONE, - doc="""Allows control over sharing of browsing context group with cross-origin - documents. Values must be a member of the :class:`werkzeug.http.COOP` enum.""", - ) - - cross_origin_embedder_policy = header_property[COEP]( - "Cross-Origin-Embedder-Policy", - load_func=lambda value: COEP(value), - dump_func=lambda value: value.value, - default=COEP.UNSAFE_NONE, - doc="""Prevents a document from loading any cross-origin resources that do not - explicitly grant the document permission. Values must be a member of the - :class:`werkzeug.http.COEP` enum.""", - ) - - class Response(SansIOResponse): """Represents an outgoing WSGI HTTP response with body, status, and headers. Has properties and methods for using the functionality