diff --git a/etna/auth0/views.py b/etna/auth0/views.py index 8d211e9936..6ee4657985 100644 --- a/etna/auth0/views.py +++ b/etna/auth0/views.py @@ -7,14 +7,19 @@ from django.http import HttpResponseRedirect from django.shortcuts import redirect from django.urls import reverse +from django.utils import timezone from authlib.integrations.django_client import OAuth +from etna.users.models import IDPProfile + +PROVIDER_NAME = "auth0" + User = get_user_model() oauth = OAuth() oauth.register( - "auth0", + PROVIDER_NAME, client_id=settings.AUTH0_CLIENT_ID, client_secret=settings.AUTH0_CLIENT_SECRET, client_kwargs={ @@ -36,17 +41,61 @@ def login(request): def authorize(request): token = oauth.auth0.authorize_access_token(request) user_info = token["userinfo"] - user, created = User.objects.update_or_create( - username=user_info["email"], - defaults={ - "email": user_info["email"], - "first_name": user_info.get("given_name"), - "last_name": user_info.get("family_name"), - }, - ) - if created: - user.set_unusable_password() - user.save(update_fields=["password"]) + user_id = user_info.get("user_id") or user_info.get("sub") + now = timezone.now() + + user_created = False + idp_profile_exists = True + try: + # First, try to find a user with a matching profile + profile = IDPProfile.objects.select_related("user").get( + provider_name=PROVIDER_NAME, provider_user_id=user_id + ) + user = profile.user + profile.last_login = now + profile.save(update_fields=["last_login"]) + except IDPProfile.DoesNotExist: + try: + # Next, look for a regular Django user without an IDP profile + user = User.objects.get(email=user_info["email"], idp_profiles__isnull=True) + user_created = False + idp_profile_exists = False + except (User.DoesNotExist, User.MultipleObjectsReturned): + # If no Django user was found, create a new one with a unique username + candidate_username = user_info["nickname"][:150] + username = candidate_username + i = 1 + while User.objects.filter(username=username).exists(): + username = f"{candidate_username[:148]}{i}" + i += 1 + + user = User( + username=username, + email=user_info["email"], + first_name=user_info.get("given_name", ""), + last_name=user_info.get("family_name", ""), + ) + user.set_unusable_password() + user.save() + user_created = True + idp_profile_exists = False + + if not user_created: + user.__dict__.update( + email=user_info["email"], + first_name=user_info.get("given_name", ""), + last_name=user_info.get("family_name", ""), + ) + user.save(update_fields=["email", "first_name", "last_name"]) + + if not idp_profile_exists: + IDPProfile.objects.create( + user=user, + provider_name=PROVIDER_NAME, + provider_user_id=user_id, + last_login=now, + ) + auth_login(request, user, backend="etna.auth0.auth_backend.Auth0Backend") return HttpResponseRedirect(request.GET.get("next") or "/")