From 04a5e5fda3b53d793b3eb361c80c2074673455c9 Mon Sep 17 00:00:00 2001 From: Uditi Mehta Date: Wed, 9 Oct 2024 09:22:14 -0400 Subject: [PATCH] Refactor confirmation link logic and add unit tests --- admin/users/views.py | 12 ++++--- admin_tests/users/test_views.py | 55 ++++++++++++++++++++++++++++----- osf/models/user.py | 17 ++++++++++ osf/utils/users.py | 28 +++-------------- 4 files changed, 76 insertions(+), 36 deletions(-) diff --git a/admin/users/views.py b/admin/users/views.py index 7d21cde86fd..1ed6e566680 100644 --- a/admin/users/views.py +++ b/admin/users/views.py @@ -16,12 +16,12 @@ from django.core.mail import send_mail from django.shortcuts import redirect from django.core.paginator import Paginator +from django.core.exceptions import ValidationError from osf.exceptions import UserStateError from osf.models.base import Guid from osf.models.user import OSFUser from osf.models.spam import SpamStatus -from osf.utils.users import get_or_refresh_confirmation_link from framework.auth import get_user from framework.auth.core import generate_verification_key @@ -457,10 +457,12 @@ def get_context_data(self, **kwargs): class GetUserConfirmationLink(GetUserLink): def get_link(self, user): - result = get_or_refresh_confirmation_link(user._id) - if 'confirmation_link' in result: - return result['confirmation_link'] - return result['error'] + try: + return user.get_or_create_confirmation_url(user.username, force=True, renew=True) + except ValidationError as e: + return str(e) + except KeyError as e: + return str(e) def get_link_type(self): return 'User Confirmation' diff --git a/admin_tests/users/test_views.py b/admin_tests/users/test_views.py index b0104fb8616..cd51459e134 100644 --- a/admin_tests/users/test_views.py +++ b/admin_tests/users/test_views.py @@ -488,11 +488,13 @@ def test_get_user_confirmation_link(self): link = view.get_link(user) - link_furl = furl(link) - generated_token = link_furl.path.segments[-1] + user.refresh_from_db() - ideal_link_path = f'/confirm/{user._id}/{generated_token}' - link_path = str(furl(link).path) + user_token = list(user.email_verifications.keys())[0] + + ideal_link_path = f'/confirm/{user._id}/{user_token}/' + + link_path = str(furl(link).path).rstrip('/') + '/' assert link_path == ideal_link_path @@ -507,15 +509,52 @@ def test_get_user_confirmation_link_with_expired_token(self): user.save() link = view.get_link(user) + new_user_token = list(user.email_verifications.keys())[0] - link_furl = furl(link) - generated_token = link_furl.path.segments[-1] - - ideal_link_path = f'/confirm/{user._id}/{generated_token}' link_path = str(furl(link).path) + ideal_link_path = f'/confirm/{user._id}/{new_user_token}/' assert link_path == ideal_link_path + def test_get_user_confirmation_link_generates_new_token_if_expired(self): + user = UnconfirmedUserFactory() + request = RequestFactory().get('/fake_path') + view = views.GetUserConfirmationLink() + view = setup_view(view, request, guid=user._id) + + old_user_token = list(user.email_verifications.keys())[0] + user.email_verifications[old_user_token]['expiration'] = datetime.utcnow().replace(tzinfo=pytz.utc) - timedelta(hours=24) + user.save() + + link = view.get_link(user) + user.refresh_from_db() + + new_user_token = list(user.email_verifications.keys())[0] + + assert new_user_token != old_user_token + + link_path = str(furl(link).path) + ideal_link_path = f'/confirm/{user._id}/{new_user_token}/' + assert link_path == ideal_link_path + + def test_get_user_confirmation_link_does_not_change_unexpired_token(self): + user = UnconfirmedUserFactory() + request = RequestFactory().get('/fake_path') + view = views.GetUserConfirmationLink() + view = setup_view(view, request, guid=user._id) + + user_token_before = list(user.email_verifications.keys())[0] + + user.email_verifications[user_token_before]['expiration'] = datetime.utcnow().replace(tzinfo=pytz.utc) + timedelta(hours=24) + user.save() + + with mock.patch('osf.models.user.OSFUser.get_or_create_confirmation_url') as mock_method: + mock_method.return_value = user.get_confirmation_url(user.username, force=False, renew=False) + + user_token_after = list(user.email_verifications.keys())[0] + + assert user_token_before == user_token_after + def test_get_password_reset_link(self): user = UnconfirmedUserFactory() request = RequestFactory().get('/fake_path') diff --git a/osf/models/user.py b/osf/models/user.py index 29e10efa991..d0783c208aa 100644 --- a/osf/models/user.py +++ b/osf/models/user.py @@ -1342,6 +1342,23 @@ def get_confirmation_url(self, email, destination = '?{}'.format(urlencode({'destination': destination})) if destination else '' return f'{base}confirm/{external}{self._primary_key}/{token}/{destination}' + def get_or_create_confirmation_url(self, email, force=False, renew=False): + """ + Get or create a confirmation URL for the given email. + + :param email: The email to generate a confirmation URL for. + :param force: Force generating a new confirmation link. + :param renew: Renew an expired token. + :raises ValidationError: If email is invalid or domain is banned. + :return: Confirmation URL for the email. + """ + try: + self.get_confirmation_token(email, force=force, renew=renew) + except KeyError: + self.add_unconfirmed_email(email) + self.save() + return self.get_confirmation_url(email) + def register(self, username, password=None, accepted_terms_of_service=None): """Registers the user. """ diff --git a/osf/utils/users.py b/osf/utils/users.py index 79d2edc8334..1427c62fe19 100644 --- a/osf/utils/users.py +++ b/osf/utils/users.py @@ -1,7 +1,5 @@ -from django.utils import timezone from osf.models import OSFUser from django.core.exceptions import ValidationError -from website.settings import DOMAIN def get_or_refresh_confirmation_link(uid): u = OSFUser.load(uid) @@ -11,26 +9,10 @@ def get_or_refresh_confirmation_link(uid): if u.deleted or u.is_merged: return {'error': f'User {uid} is deleted or merged'} - confirmation_link = None - if u.emails.filter(address=u.username).exists(): - for token, data in u.email_verifications.items(): - if data['email'] == u.username and data['expiration'] < timezone.now(): - u.email_verifications[token]['expiration'] = timezone.now() + timezone.timedelta(days=30) - u.save() - confirmation_link = f'{DOMAIN.rstrip("/")}/confirm/{u._id}/{token}' - return {'confirmation_link': confirmation_link} - else: - try: - u.add_unconfirmed_email(u.username) - u.save() - for token, data in u.email_verifications.items(): - if data['email'] == u.username: - confirmation_link = f'{DOMAIN.rstrip("/")}/confirm/{u._id}/{token}' - return {'confirmation_link': confirmation_link} - except ValidationError: - return {'error': f'Invalid email for user {uid}'} - - if confirmation_link: + try: + confirmation_link = u.get_or_create_confirmation_url(u.username, force=True, renew=True) return {'confirmation_link': confirmation_link} - else: + except ValidationError: + return {'error': f'Invalid email for user {uid}'} + except KeyError: return {'error': 'Could not generate or refresh confirmation link'}