Skip to content

Commit

Permalink
[redhat] Change authentication method for RHEL
Browse files Browse the repository at this point in the history
The authentication method for RHEL uploads to the
customer portal is changing at the beginning of December to
Device Auth tokens, from user/password basic authorization.
The new method  requires storing authentication tokens
and we'll be doing that using the took 'keyctl' to access
the Linux Kernel Key Retention Service.

Two new classes are created:
- Key (keyring.py), that takes care of access to the keyctl tool, and
- DeviceAuth (deviceauth.py), that takes care of managing OID token
authentication.

Closes: RH: SUPDEV-63

Signed-off-by: Jose Castillo <jcastillo@redhat.com>
  • Loading branch information
jcastill committed Oct 10, 2023
1 parent c9de414 commit c3f8413
Show file tree
Hide file tree
Showing 3 changed files with 789 additions and 25 deletions.
291 changes: 291 additions & 0 deletions sos/deviceauth.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,291 @@
# Copyright (C) 2023 Red Hat, Inc., Jose Castillo <jcastillo@redhat.com>

# This file is part of the sos project: https://github.com/sosreport/sos
#
# This copyrighted material is made available to anyone wishing to use,
# modify, copy, or redistribute it subject to the terms and conditions of
# version 2 of the GNU General Public License.
#
# See the LICENSE file in the source distribution for further information.

import logging
import requests
import time
from datetime import datetime, timedelta
import os
from sos.keyring import Key, KeyNotFoundError
import shutil

DEVICE_AUTH_CLIENT_ID = "sos-tools"
GRANT_TYPE_DEVICE_CODE = "urn:ietf:params:oauth:grant-type:device_code"
DATETIME_FORMAT = "%Y-%m-%dT%H:%M:%SZ"

logger = logging.getLogger("sos")


def try_read_refresh_token():
"""Try to read locally stored refresh token
Returns:
str: Returns ODIC refresh token if found otherwise None
"""
try:
RHELKey = Key()
key_refresh_token = RHELKey.search('sos-tools_refresh_token')
key_username = RHELKey.search('sos-tools-user')
except KeyNotFoundError:
logger.info("Refresh token does not exist in keyring or is expired.")
return None
except Exception as e:
logger.error("Error encoutered while accessing keyring. {}".format(e))
return None
if key_username.data.decode() != os.getlogin():
return None

return key_refresh_token.data.decode()


class AuthClass:
"""
Device Authorization Class
"""

def __init__(self):

self.client_identifier_url = "https://sso.redhat.com/auth/"\
"realms/redhat-external/protocol/openid-connect/auth/device"
self.token_endpoint = "https://sso.redhat.com/auth/realms/"\
"redhat-external/protocol/openid-connect/token"
self._access_token = None
self._access_expires_at = None
self._refresh_token = None
self._refresh_expires_at = None
self._refresh_expires_in = None
self._user_verification_url = None
self.__device_code = None
self.RHELKey = Key()

# Lets check first if we have keyctl installed so we can
# store the token in the keyring
if not shutil.which("keyctl"):
raise Exception("keyctl tool is not installed"
" and is required to store auth tokens.")

self._use_device_code_grant()

def _use_device_code_grant(self):
"""
Start the device auth flow. First check for the refresh token stored in
the session keyring. If they are not stored
or are invalid, request new device code. If the stored refresh token is
valid, use it to get new access_token
"""
stored_refresh_token = try_read_refresh_token()

if not stored_refresh_token:
self._request_device_code()
print(
"Please visit the following URL to authenticate this"
f" device {self._verification_uri_complete}")
self.poll_for_auth_completion()
else:
self._use_refresh_token_grant(stored_refresh_token)

def _request_device_code(self):
"""
Initialize new Device Authorization Grant attempt by
requesting a new device code.
"""
data = "client_id={}".format(DEVICE_AUTH_CLIENT_ID)
headers = {'content-type': 'application/x-www-form-urlencoded'}
try:
res = requests.post(
self.client_identifier_url,
data=data,
headers=headers)
res.raise_for_status()
response = res.json()
self._user_code = response.get("user_code")
self._verification_uri = response.get("verification_uri")
self._interval = response.get("interval")
self.__device_code = response.get("device_code")
self._verification_uri_complete = response.get(
"verification_uri_complete")
except requests.HTTPError as e:
raise e
except Exception as e:
raise e

def poll_for_auth_completion(self):
"""
Continuously poll OIDC token endpoint until the user is successfully
authenticated or an error occurs.
"""
token_data = {'grant_type': GRANT_TYPE_DEVICE_CODE,
'client_id': DEVICE_AUTH_CLIENT_ID,
'device_code': self.__device_code}

while self._access_token is None:
time.sleep(self._interval)
try:
check_auth_completion = requests.post(self.token_endpoint,
data=token_data)

status_code = check_auth_completion.status_code

if status_code == 200:
logger.info("The SSO authentication is successful")
self._set_token_data(check_auth_completion.json())
if status_code not in [200, 400]:
raise Exception(status_code, check_auth_completion.text)
if status_code == 400 and \
check_auth_completion.json()['error'] not in \
("authorization_pending", "slow_down"):
raise Exception(status_code, check_auth_completion.text)
except Exception as e:
raise e

def _set_token_data(self, token_data):
"""
Set the class attributes as per the input token_data received and
persist it in the local keyring to avoid
visting the browser frequently.
:param token_data: Token data containing access_token, refresh_token
and their expiry etc.
"""
self._access_token = token_data.get("access_token")
self._access_expires_at = datetime.utcnow(
) + timedelta(seconds=token_data.get("expires_in"))
self._refresh_token = token_data.get("refresh_token")
self._refresh_expires_in = token_data.get("refresh_expires_in")
if self._refresh_expires_in == 0:
self._refresh_expires_at = datetime.max
else:
self._refresh_expires_at = datetime.utcnow(
) + timedelta(seconds=self._refresh_expires_in)
self.persist_refresh_token()

def get_access_token(self):

Check notice

Code scanning / CodeQL

Explicit returns mixed with implicit (fall through) returns Note

Mixing implicit and explicit returns may indicate an error as implicit returns always return None.
"""
Get the valid access_token at any given time.
:return: Access_token
:rtype: string
"""
if self.is_access_token_valid():
return self._access_token

if self.grant_type == "client_credentials":
self._use_client_credentials_grant()
return self._access_token

elif self.grant_type == "device_auth":
if self.is_refresh_token_valid():
self._use_refresh_token_grant()
return self._access_token
else:
return self.request_new_device_code()

def is_access_token_valid(self):
"""
Check the validity of access_token. We are considering it invalid 180
sec. prior to it's exact expiry time.
:return: True/False
"""
return self._access_token and self._access_expires_at and \
self._access_expires_at - timedelta(seconds=180) > \
datetime.utcnow()

def is_refresh_token_valid(self):
"""
Check the validity of refresh_token. We are considering it invalid
180 sec. prior to it's exact expiry time.
:return: True/False
"""
return self._refresh_token and self._refresh_expires_at and \
self._refresh_expires_at - timedelta(seconds=180) > \
datetime.utcnow()

def _use_refresh_token_grant(self, refresh_token=None):
"""
Fetch the new access_token and refresh_token using the existing
refresh_token and persist it.
:param refresh_token: optional param for refresh_token
"""
refresh_token_data = {'client_id': DEVICE_AUTH_CLIENT_ID,
'grant_type': 'refresh_token',
'refresh_token': self._refresh_token if not
refresh_token else refresh_token}

refresh_token_res = requests.post(self.token_endpoint,
data=refresh_token_data)

if refresh_token_res.status_code == 200:
self._set_token_data(refresh_token_res.json())

elif refresh_token_res.status_code == 400 and 'invalid' in\
refresh_token_res.json()['error']:
logger.warning("Problem while fetching the new tokens from refresh"
" token grant - {} {}."
" New Device code will be requested !".format
(refresh_token_res.status_code,
refresh_token_res.json()['error']))
self.request_new_device_code()
else:
raise Exception(
"Something went wrong while using the "
"Refresh token grant for fetching tokens.")

def request_new_device_code(self):
"""Initialize new Device Authorization Grant
attempt by requesting a new device code.
"""
self._use_device_code_grant()

def persist_refresh_token(self):
"""Persist current refresh token in keyring using keyctl
Returns:
bool: True if refresh token was successfully
persisted, otherwise False
"""
if self.is_refresh_token_valid():
try:
key_refresh_token = self.RHELKey.search(
'sos-tools_refresh_token')
key_refresh_token.update(self._refresh_token)
except KeyNotFoundError:
key_refresh_token = self.RHELKey.add(
'sos-tools_refresh_token', self._refresh_token)
except Exception as e:
logger.error(
"Keyctl error encountered while reading "
"keyring {}".format(e))
return False

try:
key_username = self.RHELKey.search('sos-tools-user')
key_username.update(os.getlogin())
except KeyNotFoundError:
key_username = self.RHELKey.add(
'sos-tools-user', os.getlogin())
except Exception as e:
logger.error(
"Keyctl error encountered while reading "
"keyring {}".format(e))
return False

key_refresh_token.set_timeout(self._refresh_expires_in - 300)
key_username.set_timeout(self._refresh_expires_in - 300)

return True
else:
logger.info("Cannot save invalid refresh token in keyring")
return False
Loading

0 comments on commit c3f8413

Please sign in to comment.