Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Improve UsernamePickerTestCase (#9112)
Browse files Browse the repository at this point in the history
* make the OIDC bits of the test work at a higher level - via the REST api instead of poking the OIDCHandler directly.
* Move it to test_login.py, where I think it fits better.
  • Loading branch information
richvdh authored Jan 15, 2021
1 parent 4575ad0 commit 0dd2649
Show file tree
Hide file tree
Showing 5 changed files with 114 additions and 125 deletions.
1 change: 1 addition & 0 deletions changelog.d/9112.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Improve `UsernamePickerTestCase`.
120 changes: 2 additions & 118 deletions tests/handlers/test_oidc.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,20 +13,14 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import json
import re
from typing import Dict, Optional
from urllib.parse import parse_qs, urlencode, urlparse
from typing import Optional
from urllib.parse import parse_qs, urlparse

from mock import ANY, Mock, patch

import pymacaroons

from twisted.web.resource import Resource

from synapse.api.errors import RedirectException
from synapse.handlers.sso import MappingException
from synapse.rest.client.v1 import login
from synapse.rest.synapse.client.pick_username import pick_username_resource
from synapse.server import HomeServer
from synapse.types import UserID

Expand Down Expand Up @@ -856,116 +850,6 @@ def _generate_oidc_session_token(
)


class UsernamePickerTestCase(HomeserverTestCase):
if not HAS_OIDC:
skip = "requires OIDC"

servlets = [login.register_servlets]

def default_config(self):
config = super().default_config()
config["public_baseurl"] = BASE_URL
oidc_config = {
"enabled": True,
"client_id": CLIENT_ID,
"client_secret": CLIENT_SECRET,
"issuer": ISSUER,
"scopes": SCOPES,
"user_mapping_provider": {
"config": {"display_name_template": "{{ user.displayname }}"}
},
}

# Update this config with what's in the default config so that
# override_config works as expected.
oidc_config.update(config.get("oidc_config", {}))
config["oidc_config"] = oidc_config

# whitelist this client URI so we redirect straight to it rather than
# serving a confirmation page
config["sso"] = {"client_whitelist": ["https://whitelisted.client"]}
return config

def create_resource_dict(self) -> Dict[str, Resource]:
d = super().create_resource_dict()
d["/_synapse/client/pick_username"] = pick_username_resource(self.hs)
return d

def test_username_picker(self):
"""Test the happy path of a username picker flow."""
client_redirect_url = "https://whitelisted.client"

# first of all, mock up an OIDC callback to the OidcHandler, which should
# raise a RedirectException
userinfo = {"sub": "tester", "displayname": "Jonny"}
f = self.get_failure(
_make_callback_with_userinfo(
self.hs, userinfo, client_redirect_url=client_redirect_url
),
RedirectException,
)

# check the Location and cookies returned by the RedirectException
self.assertEqual(f.value.location, b"/_synapse/client/pick_username")
cookieheader = f.value.cookies[0]
regex = re.compile(b"^username_mapping_session=([a-zA-Z]+);")
m = regex.search(cookieheader)
if not m:
self.fail("cookie header %s does not match %s" % (cookieheader, regex))

# introspect the sso handler a bit to check that the username mapping session
# looks ok.
session_id = m.group(1).decode("ascii")
username_mapping_sessions = self.hs.get_sso_handler()._username_mapping_sessions
self.assertIn(
session_id, username_mapping_sessions, "session id not found in map"
)
session = username_mapping_sessions[session_id]
self.assertEqual(session.remote_user_id, "tester")
self.assertEqual(session.display_name, "Jonny")
self.assertEqual(session.client_redirect_url, client_redirect_url)

# the expiry time should be about 15 minutes away
expected_expiry = self.clock.time_msec() + (15 * 60 * 1000)
self.assertApproximates(session.expiry_time_ms, expected_expiry, tolerance=1000)

# Now, submit a username to the username picker, which should serve a redirect
# back to the client
submit_path = f.value.location + b"/submit"
content = urlencode({b"username": b"bobby"}).encode("utf8")
chan = self.make_request(
"POST",
path=submit_path,
content=content,
content_is_form=True,
custom_headers=[
("Cookie", cookieheader),
# old versions of twisted don't do form-parsing without a valid
# content-length header.
("Content-Length", str(len(content))),
],
)
self.assertEqual(chan.code, 302, chan.result)
location_headers = chan.headers.getRawHeaders("Location")
# ensure that the returned location starts with the requested redirect URL
self.assertEqual(
location_headers[0][: len(client_redirect_url)], client_redirect_url
)

# fish the login token out of the returned redirect uri
parts = urlparse(location_headers[0])
query = parse_qs(parts.query)
login_token = query["loginToken"][0]

# finally, submit the matrix login token to the login API, which gives us our
# matrix access token, mxid, and device id.
chan = self.make_request(
"POST", "/login", content={"type": "m.login.token", "token": login_token},
)
self.assertEqual(chan.code, 200, chan.result)
self.assertEqual(chan.json_body["user_id"], "@bobby:test")


async def _make_callback_with_userinfo(
hs: HomeServer, userinfo: dict, client_redirect_url: str = "http://client/redirect"
) -> None:
Expand Down
105 changes: 104 additions & 1 deletion tests/rest/client/v1/test_login.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import urllib.parse
from html.parser import HTMLParser
from typing import Any, Dict, Iterable, List, Optional, Tuple, Union
from urllib.parse import parse_qs, urlencode, urlparse

from mock import Mock

Expand All @@ -30,13 +31,14 @@
from synapse.rest.client.v2_alpha import devices, register
from synapse.rest.client.v2_alpha.account import WhoamiRestServlet
from synapse.rest.synapse.client.pick_idp import PickIdpResource
from synapse.rest.synapse.client.pick_username import pick_username_resource
from synapse.types import create_requester

from tests import unittest
from tests.handlers.test_oidc import HAS_OIDC
from tests.handlers.test_saml import has_saml2
from tests.rest.client.v1.utils import TEST_OIDC_AUTH_ENDPOINT, TEST_OIDC_CONFIG
from tests.unittest import override_config, skip_unless
from tests.unittest import HomeserverTestCase, override_config, skip_unless

try:
import jwt
Expand Down Expand Up @@ -1060,3 +1062,104 @@ def test_login_appservice_no_token(self):
channel = self.make_request(b"POST", LOGIN_URL, params)

self.assertEquals(channel.result["code"], b"401", channel.result)


@skip_unless(HAS_OIDC, "requires OIDC")
class UsernamePickerTestCase(HomeserverTestCase):
"""Tests for the username picker flow of SSO login"""

servlets = [login.register_servlets]

def default_config(self):
config = super().default_config()
config["public_baseurl"] = BASE_URL

config["oidc_config"] = {}
config["oidc_config"].update(TEST_OIDC_CONFIG)
config["oidc_config"]["user_mapping_provider"] = {
"config": {"display_name_template": "{{ user.displayname }}"}
}

# whitelist this client URI so we redirect straight to it rather than
# serving a confirmation page
config["sso"] = {"client_whitelist": ["https://whitelisted.client"]}
return config

def create_resource_dict(self) -> Dict[str, Resource]:
from synapse.rest.oidc import OIDCResource

d = super().create_resource_dict()
d["/_synapse/client/pick_username"] = pick_username_resource(self.hs)
d["/_synapse/oidc"] = OIDCResource(self.hs)
return d

def test_username_picker(self):
"""Test the happy path of a username picker flow."""
client_redirect_url = "https://whitelisted.client"

# do the start of the login flow
channel = self.helper.auth_via_oidc(
{"sub": "tester", "displayname": "Jonny"}, client_redirect_url
)

# that should redirect to the username picker
self.assertEqual(channel.code, 302, channel.result)
picker_url = channel.headers.getRawHeaders("Location")[0]
self.assertEqual(picker_url, "/_synapse/client/pick_username")

# ... with a username_mapping_session cookie
cookies = {} # type: Dict[str,str]
channel.extract_cookies(cookies)
self.assertIn("username_mapping_session", cookies)
session_id = cookies["username_mapping_session"]

# introspect the sso handler a bit to check that the username mapping session
# looks ok.
username_mapping_sessions = self.hs.get_sso_handler()._username_mapping_sessions
self.assertIn(
session_id, username_mapping_sessions, "session id not found in map",
)
session = username_mapping_sessions[session_id]
self.assertEqual(session.remote_user_id, "tester")
self.assertEqual(session.display_name, "Jonny")
self.assertEqual(session.client_redirect_url, client_redirect_url)

# the expiry time should be about 15 minutes away
expected_expiry = self.clock.time_msec() + (15 * 60 * 1000)
self.assertApproximates(session.expiry_time_ms, expected_expiry, tolerance=1000)

# Now, submit a username to the username picker, which should serve a redirect
# back to the client
submit_path = picker_url + "/submit"
content = urlencode({b"username": b"bobby"}).encode("utf8")
chan = self.make_request(
"POST",
path=submit_path,
content=content,
content_is_form=True,
custom_headers=[
("Cookie", "username_mapping_session=" + session_id),
# old versions of twisted don't do form-parsing without a valid
# content-length header.
("Content-Length", str(len(content))),
],
)
self.assertEqual(chan.code, 302, chan.result)
location_headers = chan.headers.getRawHeaders("Location")
# ensure that the returned location starts with the requested redirect URL
self.assertEqual(
location_headers[0][: len(client_redirect_url)], client_redirect_url
)

# fish the login token out of the returned redirect uri
parts = urlparse(location_headers[0])
query = parse_qs(parts.query)
login_token = query["loginToken"][0]

# finally, submit the matrix login token to the login API, which gives us our
# matrix access token, mxid, and device id.
chan = self.make_request(
"POST", "/login", content={"type": "m.login.token", "token": login_token},
)
self.assertEqual(chan.code, 200, chan.result)
self.assertEqual(chan.json_body["user_id"], "@bobby:test")
11 changes: 6 additions & 5 deletions tests/rest/client/v1/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -363,10 +363,10 @@ def login_via_oidc(self, remote_user_id: str) -> JsonDict:
the normal places.
"""
client_redirect_url = "https://x"
channel = self.auth_via_oidc(remote_user_id, client_redirect_url)
channel = self.auth_via_oidc({"sub": remote_user_id}, client_redirect_url)

# expect a confirmation page
assert channel.code == 200
assert channel.code == 200, channel.result

# fish the matrix login token out of the body of the confirmation page
m = re.search(
Expand All @@ -390,7 +390,7 @@ def login_via_oidc(self, remote_user_id: str) -> JsonDict:

def auth_via_oidc(
self,
remote_user_id: str,
user_info_dict: JsonDict,
client_redirect_url: Optional[str] = None,
ui_auth_session_id: Optional[str] = None,
) -> FakeChannel:
Expand All @@ -411,7 +411,8 @@ def auth_via_oidc(
the normal places.
Args:
remote_user_id: the remote id that the OIDC provider should present
user_info_dict: the remote userinfo that the OIDC provider should present.
Typically this should be '{"sub": "<remote user id>"}'.
client_redirect_url: for a login flow, the client redirect URL to pass to
the login redirect endpoint
ui_auth_session_id: if set, we will perform a UI Auth flow. The session id
Expand Down Expand Up @@ -457,7 +458,7 @@ def auth_via_oidc(
# a dummy OIDC access token
("https://issuer.test/token", {"access_token": "TEST"}),
# and then one to the user_info endpoint, which returns our remote user id.
("https://issuer.test/userinfo", {"sub": remote_user_id}),
("https://issuer.test/userinfo", user_info_dict),
]

async def mock_req(method: str, uri: str, data=None, headers=None):
Expand Down
2 changes: 1 addition & 1 deletion tests/rest/client/v2_alpha/test_auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -411,7 +411,7 @@ def test_ui_auth_via_sso(self):
# run the UIA-via-SSO flow
session_id = channel.json_body["session"]
channel = self.helper.auth_via_oidc(
remote_user_id=remote_user_id, ui_auth_session_id=session_id
{"sub": remote_user_id}, ui_auth_session_id=session_id
)

# that should serve a confirmation page
Expand Down

0 comments on commit 0dd2649

Please sign in to comment.