Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add typings to commonly used APIs #1333

Merged
merged 11 commits into from
Oct 15, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion examples/simple/simple_ext1/handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ class ParameterHandler(ExtensionHandlerMixin, JupyterHandler):

def get(self, matched_part=None, *args, **kwargs):
"""Handle a get with parameters."""
var1 = self.get_argument("var1", default=None)
var1 = self.get_argument("var1", default="")
components = [x for x in self.request.path.split("/") if x]
self.write("<h1>Hello Simple App 1 from Handler.</h1>")
self.write(f"<p>matched_part: {url_escape(matched_part)}</p>")
Expand Down
2 changes: 1 addition & 1 deletion examples/simple/simple_ext2/handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ class ParameterHandler(ExtensionHandlerMixin, JupyterHandler):

def get(self, matched_part=None, *args, **kwargs):
"""Get a parameterized response."""
var1 = self.get_argument("var1", default=None)
var1 = self.get_argument("var1", default="")
components = [x for x in self.request.path.split("/") if x]
self.write("<h1>Hello Simple App 2 from Handler.</h1>")
self.write(f"<p>matched_part: {url_escape(matched_part)}</p>")
Expand Down
11 changes: 7 additions & 4 deletions jupyter_server/_tz.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,10 @@
"""
# Copyright (c) Jupyter Development Team.
# Distributed under the terms of the Modified BSD License.
from __future__ import annotations

from datetime import datetime, timedelta, tzinfo
from typing import Callable

# constant for zero offset
ZERO = timedelta(0)
Expand All @@ -14,19 +17,19 @@
class tzUTC(tzinfo): # noqa
"""tzinfo object for UTC (zero offset)"""

def utcoffset(self, d):
def utcoffset(self, d: datetime | None) -> timedelta:
"""Compute utcoffset."""
return ZERO

def dst(self, d):
def dst(self, d: datetime | None) -> timedelta:
"""Compute dst."""
return ZERO


UTC = tzUTC() # type:ignore[abstract]


def utc_aware(unaware):
def utc_aware(unaware: Callable[..., datetime]) -> Callable[..., datetime]:
"""decorator for adding UTC tzinfo to datetime's utcfoo methods"""

def utc_method(*args, **kwargs):
Expand All @@ -40,7 +43,7 @@ def utc_method(*args, **kwargs):
utcnow = utc_aware(datetime.utcnow)


def isoformat(dt):
def isoformat(dt: datetime) -> str:
"""Return iso-formatted timestamp

Like .isoformat(), but uses Z for UTC instead of +00:00
Expand Down
10 changes: 6 additions & 4 deletions jupyter_server/auth/decorator.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,21 @@
# Copyright (c) Jupyter Development Team.
# Distributed under the terms of the Modified BSD License.
from functools import wraps
from typing import Callable, Optional, Union
from typing import Any, Callable, Optional, TypeVar, Union, cast

from tornado.log import app_log
from tornado.web import HTTPError

from .utils import HTTP_METHOD_TO_AUTH_ACTION

FuncT = TypeVar("FuncT", bound=Callable[..., Any])


def authorized(
action: Optional[Union[str, Callable]] = None,
action: Optional[Union[str, FuncT]] = None,
resource: Optional[str] = None,
message: Optional[str] = None,
) -> Callable:
) -> FuncT:
"""A decorator for tornado.web.RequestHandler methods
that verifies whether the current user is authorized
to make the following request.
Expand Down Expand Up @@ -73,4 +75,4 @@ def inner(self, *args, **kwargs):
# no-arguments `@authorized` decorator called
return wrapper(method)

return wrapper
return cast(FuncT, wrapper)
18 changes: 9 additions & 9 deletions jupyter_server/auth/identity.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@

# circular imports for type checking
if TYPE_CHECKING:
from jupyter_server.base.handlers import JupyterHandler
from jupyter_server.base.handlers import AuthenticatedHandler, JupyterHandler
from jupyter_server.serverapp import ServerApp

_non_alphanum = re.compile(r"[^A-Za-z0-9]")
Expand Down Expand Up @@ -321,7 +321,7 @@ def user_from_cookie(self, cookie_value: str) -> User | None:
user["color"],
)

def get_cookie_name(self, handler: JupyterHandler) -> str:
def get_cookie_name(self, handler: AuthenticatedHandler) -> str:
"""Return the login cookie name

Uses IdentityProvider.cookie_name, if defined.
Expand All @@ -333,7 +333,7 @@ def get_cookie_name(self, handler: JupyterHandler) -> str:
else:
return _non_alphanum.sub("-", f"username-{handler.request.host}")

def set_login_cookie(self, handler: JupyterHandler, user: User) -> None:
def set_login_cookie(self, handler: AuthenticatedHandler, user: User) -> None:
"""Call this on handlers to set the login cookie for success"""
cookie_options = {}
cookie_options.update(self.cookie_options)
Expand All @@ -350,7 +350,7 @@ def set_login_cookie(self, handler: JupyterHandler, user: User) -> None:
handler.set_secure_cookie(cookie_name, self.user_to_cookie(user), **cookie_options)

def _force_clear_cookie(
self, handler: JupyterHandler, name: str, path: str = "/", domain: str | None = None
self, handler: AuthenticatedHandler, name: str, path: str = "/", domain: str | None = None
) -> None:
"""Deletes the cookie with the given name.

Expand All @@ -376,7 +376,7 @@ def _force_clear_cookie(
morsel["domain"] = domain
handler.add_header("Set-Cookie", morsel.OutputString())

def clear_login_cookie(self, handler: JupyterHandler) -> None:
def clear_login_cookie(self, handler: AuthenticatedHandler) -> None:
"""Clear the login cookie, effectively logging out the session."""
cookie_options = {}
cookie_options.update(self.cookie_options)
Expand Down Expand Up @@ -478,7 +478,7 @@ def generate_anonymous_user(self, handler: JupyterHandler) -> User:
handler.log.debug(f"Generating new user for token-authenticated request: {user_id}")
return User(user_id, name, display_name, initials, None, color)

def should_check_origin(self, handler: JupyterHandler) -> bool:
def should_check_origin(self, handler: AuthenticatedHandler) -> bool:
"""Should the Handler check for CORS origin validation?

Origin check should be skipped for token-authenticated requests.
Expand All @@ -489,7 +489,7 @@ def should_check_origin(self, handler: JupyterHandler) -> bool:
"""
return not self.is_token_authenticated(handler)

def is_token_authenticated(self, handler: JupyterHandler) -> bool:
def is_token_authenticated(self, handler: AuthenticatedHandler) -> bool:
"""Returns True if handler has been token authenticated. Otherwise, False.

Login with a token is used to signal certain things, such as:
Expand Down Expand Up @@ -713,11 +713,11 @@ def login_available(self):
self.settings
)

def should_check_origin(self, handler: JupyterHandler) -> bool:
def should_check_origin(self, handler: AuthenticatedHandler) -> bool:
"""Whether we should check origin."""
return self.login_handler_class.should_check_origin(handler) # type:ignore[attr-defined]

def is_token_authenticated(self, handler: JupyterHandler) -> bool:
def is_token_authenticated(self, handler: AuthenticatedHandler) -> bool:
"""Whether we are token authenticated."""
return self.login_handler_class.is_token_authenticated(handler) # type:ignore[attr-defined]

Expand Down
7 changes: 4 additions & 3 deletions jupyter_server/auth/login.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,9 +123,10 @@ def post(self):
if new_password and getattr(self.identity_provider, "allow_password_change", False):
config_dir = self.settings.get("config_dir", "")
config_file = os.path.join(config_dir, "jupyter_server_config.json")
self.identity_provider.hashed_password = self.settings[
"password"
] = set_password(new_password, config_file=config_file)
if hasattr(self.identity_provider, "hashed_password"):
self.identity_provider.hashed_password = self.settings[
"password"
] = set_password(new_password, config_file=config_file)
self.log.info("Wrote hashed password to %s" % config_file)
else:
self.set_status(401)
Expand Down
Loading
Loading