From 774d2757b44a126ed86a5bea32e516766309c348 Mon Sep 17 00:00:00 2001 From: Jose Castillo Date: Mon, 9 Oct 2023 16:28:15 +0100 Subject: [PATCH] [redhat] Change authentication method for RHEL The authentication method for RHEL uploads to the customer portal is changing in 2024 to Device Auth tokens, from user/password basic authorization. To accomplish this, one new class is created: DeviceAuth (deviceauth.py), that takes care of managing OID token authentication. Closes: RH: SUPDEV-63 Signed-off-by: Jose Castillo --- sos/policies/auth/deviceauth.py | 216 ++++++++++++++++++++++++++++++++ sos/policies/distros/redhat.py | 121 +++++++++++++----- 2 files changed, 306 insertions(+), 31 deletions(-) create mode 100644 sos/policies/auth/deviceauth.py diff --git a/sos/policies/auth/deviceauth.py b/sos/policies/auth/deviceauth.py new file mode 100644 index 0000000000..b1c95d1be4 --- /dev/null +++ b/sos/policies/auth/deviceauth.py @@ -0,0 +1,216 @@ +# Copyright (C) 2023 Red Hat, Inc., Jose Castillo + +# This file is part of the sos project: https://github.com/sosreport/sos +# +# This copyrighted material is made available to anyone wishing to use, +# modify, copy, or redistribute it subject to the terms and conditions of +# version 2 of the GNU General Public License. +# +# See the LICENSE file in the source distribution for further information. + +import logging +try: + import requests + REQUESTS_LOADED = True +except ImportError: + REQUESTS_LOADED = False +import time +from datetime import datetime, timedelta + +DEVICE_AUTH_CLIENT_ID = "sos-tools" +GRANT_TYPE_DEVICE_CODE = "urn:ietf:params:oauth:grant-type:device_code" + +logger = logging.getLogger("sos") + + +class DeviceAuthorizationClass: + """ + Device Authorization Class + """ + + def __init__(self, client_identifier_url, token_endpoint): + + self.client_identifier_url = None + self.token_endpoint = None + self._access_token = None + self._access_expires_at = None + self._refresh_token = None + self._refresh_expires_at = None + self._refresh_expires_in = None + self._user_verification_url = None + self.__device_code = None + + self.client_identifier_url = client_identifier_url + self.token_endpoint = token_endpoint + self._use_device_code_grant() + + def _use_device_code_grant(self): + """ + Start the device auth flow. In the future we will + store the tokens in an in-memory keyring. + + """ + + self._request_device_code() + print( + "Please visit the following URL to authenticate this" + f" device: {self._verification_uri_complete}" + ) + self.poll_for_auth_completion() + + def _request_device_code(self): + """ + Initialize new Device Authorization Grant attempt by + requesting a new device code. + + """ + data = "client_id={}".format(DEVICE_AUTH_CLIENT_ID) + headers = {'content-type': 'application/x-www-form-urlencoded'} + if not REQUESTS_LOADED: + raise Exception("python3-requests is not installed and is required" + " for obtaining device auth token.") + try: + res = requests.post( + self.client_identifier_url, + data=data, + headers=headers) + res.raise_for_status() + response = res.json() + self._user_code = response.get("user_code") + self._verification_uri = response.get("verification_uri") + self._interval = response.get("interval") + self.__device_code = response.get("device_code") + self._verification_uri_complete = response.get( + "verification_uri_complete") + except requests.HTTPError as e: + raise requests.HTTPError("HTTP request failed " + "while attempting to acquire the tokens." + f"Error returned was {res.status_code} " + f"{e}") + + def poll_for_auth_completion(self): + """ + Continuously poll OIDC token endpoint until the user is successfully + authenticated or an error occurs. + + """ + token_data = {'grant_type': GRANT_TYPE_DEVICE_CODE, + 'client_id': DEVICE_AUTH_CLIENT_ID, + 'device_code': self.__device_code} + + if not REQUESTS_LOADED: + raise Exception("python3-requests is not installed and is required" + " for obtaining device auth token.") + while self._access_token is None: + time.sleep(self._interval) + try: + check_auth_completion = requests.post(self.token_endpoint, + data=token_data) + + status_code = check_auth_completion.status_code + + if status_code == 200: + logger.info("The SSO authentication is successful") + self._set_token_data(check_auth_completion.json()) + if status_code not in [200, 400]: + raise Exception(status_code, check_auth_completion.text) + if status_code == 400 and \ + check_auth_completion.json()['error'] not in \ + ("authorization_pending", "slow_down"): + raise Exception(status_code, check_auth_completion.text) + except requests.exceptions.RequestException as e: + logger.error(f"Error was found while posting a request: {e}") + + def _set_token_data(self, token_data): + """ + Set the class attributes as per the input token_data received. + In the future we will persist the token data in a local, + in-memory keyring, to avoid visting the browser frequently. + :param token_data: Token data containing access_token, refresh_token + and their expiry etc. + """ + self._access_token = token_data.get("access_token") + self._access_expires_at = datetime.utcnow() + \ + timedelta(seconds=token_data.get("expires_in")) + self._refresh_token = token_data.get("refresh_token") + self._refresh_expires_in = token_data.get("refresh_expires_in") + if self._refresh_expires_in == 0: + self._refresh_expires_at = datetime.max + else: + self._refresh_expires_at = datetime.utcnow() + \ + timedelta(seconds=self._refresh_expires_in) + + def get_access_token(self): + """ + Get the valid access_token at any given time. + :return: Access_token + :rtype: string + """ + if self.is_access_token_valid(): + return self._access_token + else: + if self.is_refresh_token_valid(): + self._use_refresh_token_grant() + return self._access_token + else: + self._use_device_code_grant() + return self._access_token + + def is_access_token_valid(self): + """ + Check the validity of access_token. We are considering it invalid 180 + sec. prior to it's exact expiry time. + :return: True/False + + """ + return self._access_token and self._access_expires_at and \ + self._access_expires_at - timedelta(seconds=180) > \ + datetime.utcnow() + + def is_refresh_token_valid(self): + """ + Check the validity of refresh_token. We are considering it invalid + 180 sec. prior to it's exact expiry time. + + :return: True/False + + """ + return self._refresh_token and self._refresh_expires_at and \ + self._refresh_expires_at - timedelta(seconds=180) > \ + datetime.utcnow() + + def _use_refresh_token_grant(self, refresh_token=None): + """ + Fetch the new access_token and refresh_token using the existing + refresh_token and persist it. + :param refresh_token: optional param for refresh_token + + """ + if not REQUESTS_LOADED: + raise Exception("python3-requests is not installed and is required" + " for obtaining device auth token.") + refresh_token_data = {'client_id': DEVICE_AUTH_CLIENT_ID, + 'grant_type': 'refresh_token', + 'refresh_token': self._refresh_token if not + refresh_token else refresh_token} + + refresh_token_res = requests.post(self.token_endpoint, + data=refresh_token_data) + + if refresh_token_res.status_code == 200: + self._set_token_data(refresh_token_res.json()) + + elif refresh_token_res.status_code == 400 and 'invalid' in\ + refresh_token_res.json()['error']: + logger.warning("Problem while fetching the new tokens from refresh" + " token grant - {} {}." + " New Device code will be requested !".format + (refresh_token_res.status_code, + refresh_token_res.json()['error'])) + self._use_device_code_grant() + else: + raise Exception( + "Something went wrong while using the " + "Refresh token grant for fetching tokens:" + f" Returned status code {refresh_token_res.status_code}" + f" and error {refresh_token_res.json()['error']}") diff --git a/sos/policies/distros/redhat.py b/sos/policies/distros/redhat.py index bdbe8f9529..46c0357391 100644 --- a/sos/policies/distros/redhat.py +++ b/sos/policies/distros/redhat.py @@ -12,6 +12,7 @@ import os import sys import re +from sos.policies.auth.deviceauth import DeviceAuthorizationClass from sos.report.plugins import RedHatPlugin from sos.presets.redhat import (RHEL_PRESETS, ATOMIC_PRESETS, RHV, RHEL, @@ -51,6 +52,10 @@ class RedHatPolicy(LinuxPolicy): default_container_runtime = 'podman' sos_pkg_name = 'sos' sos_bin_path = '/usr/sbin' + client_identifier_url = "https://sso.redhat.com/auth/"\ + "realms/redhat-external/protocol/openid-connect/auth/device" + token_endpoint = "https://sso.redhat.com/auth/realms/"\ + "redhat-external/protocol/openid-connect/token" def __init__(self, sysroot=None, init=None, probe_runtime=True, remote_exec=None): @@ -228,6 +233,7 @@ class RHELPolicy(RedHatPolicy): """ + disclaimer_text + "%(vendor_text)s\n") _upload_url = RH_SFTP_HOST _upload_method = 'post' + _device_token = None def __init__(self, sysroot=None, init=None, probe_runtime=True, remote_exec=None): @@ -266,24 +272,23 @@ def check(cls, remote=''): def prompt_for_upload_user(self): if self.commons['cmdlineopts'].upload_user: - return - # Not using the default, so don't call this prompt for RHCP - if self.commons['cmdlineopts'].upload_url: - super(RHELPolicy, self).prompt_for_upload_user() - return - if not self.get_upload_user(): - if self.case_id: - self.upload_user = input(_( - "Enter your Red Hat Customer Portal username for " - "uploading [empty for anonymous SFTP]: ") - ) - else: # no case id provided => failover to SFTP - self.upload_url = RH_SFTP_HOST - self.ui_log.info("No case id provided, uploading to SFTP") - self.upload_user = input(_( - "Enter your Red Hat Customer Portal username for " - "uploading to SFTP [empty for anonymous]: ") - ) + self.ui_log.info( + _("The option --upload-user has been deprecated in favour" + " of device authorization in RHEL") + ) + if not self.case_id: + # no case id provided => failover to SFTP + self.upload_url = RH_SFTP_HOST + self.ui_log.info("No case id provided, uploading to SFTP") + + def prompt_for_upload_password(self): + # With OIDC we don't ask for user/pass anymore + if self.commons['cmdlineopts'].upload_pass: + self.ui_log.info( + _("The option --upload-pass has been deprecated in favour" + " of device authorization in RHEL") + ) + return def get_upload_url(self): if self.upload_url: @@ -292,10 +297,42 @@ def get_upload_url(self): return self.commons['cmdlineopts'].upload_url elif self.commons['cmdlineopts'].upload_protocol == 'sftp': return RH_SFTP_HOST + elif not self.commons['cmdlineopts'].case_id: + self.ui_log.info("No case id provided, uploading to SFTP") + return RH_SFTP_HOST else: rh_case_api = "/support/v1/cases/%s/attachments" return RH_API_HOST + rh_case_api % self.case_id + def _get_upload_https_auth(self): + str_auth = "Bearer {}".format(self._device_token) + return {'Authorization': str_auth} + + def _upload_https_post(self, archive, verify=True): + """If upload_https() needs to use requests.post(), use this method. + + Policies should override this method instead of the base upload_https() + + :param archive: The open archive file object + """ + files = { + 'file': (archive.name.split('/')[-1], archive, + self._get_upload_headers()) + } + # Get the access token at this point. With this, + # we cover the cases where report generation takes + # longer than the token timeout + RHELAuth = DeviceAuthorizationClass( + self.client_identifier_url, + self.token_endpoint + ) + self._device_token = RHELAuth.get_access_token() + self.ui_log.info("Device authorized correctly. Uploading file to " + f"{self.get_upload_url_string()}") + return requests.post(self.get_upload_url(), files=files, + headers=self._get_upload_https_auth(), + verify=verify) + def _get_upload_headers(self): if self.get_upload_url().startswith(RH_API_HOST): return {'isPrivate': 'false', 'cache-control': 'no-cache'} @@ -332,15 +369,38 @@ def upload_sftp(self): " for obtaining SFTP auth token.") _token = None _user = None + + # We may have a device token already if we attempted + # to upload via http but the upload failed. So + # lets check first if there isn't one. + if not self._device_token: + try: + RHELAuth = DeviceAuthorizationClass( + self.client_identifier_url, + self.token_endpoint + ) + except Exception as e: + # We end up here if the user cancels the device + # authentication in the web interface + if "end user denied" in str(e): + self.ui_log.info( + "Device token authorization " + "has been cancelled by the user." + ) + else: + self._device_token = RHELAuth.get_access_token() + if self._device_token: + self.ui_log.info("Device authorized correctly. Uploading file to" + f" {self.get_upload_url_string()}") + url = RH_API_HOST + '/support/v2/sftp/token' - # we have a username and password, but we need to reset the password - # to be the token returned from the auth endpoint - if self.get_upload_user() and self.get_upload_password(): - auth = self.get_upload_https_auth() - ret = requests.post(url, auth=auth, timeout=10) + ret = None + if self._device_token: + headers = self._get_upload_https_auth() + ret = requests.post(url, headers=headers, timeout=10) if ret.status_code == 200: # credentials are valid - _user = self.get_upload_user() + _user = json.loads(ret.text)['username'] _token = json.loads(ret.text)['token'] else: self.ui_log.debug( @@ -351,8 +411,7 @@ def upload_sftp(self): "Unable to retrieve Red Hat auth token using provided " "credentials. Will try anonymous." ) - # we either do not have a username or password/token, or both - if not _token: + else: adata = {"isAnonymous": True} anon = requests.post(url, data=json.dumps(adata), timeout=10) if anon.status_code == 200: @@ -368,7 +427,6 @@ def upload_sftp(self): f"DEBUG: anonymous request failed (status: " f"{anon.status_code}): {anon.json()}" ) - if _user and _token: return super(RHELPolicy, self).upload_sftp(user=_user, password=_token) @@ -380,17 +438,18 @@ def upload_archive(self, archive): """ try: if self.upload_url and self.upload_url.startswith(RH_API_HOST) and\ - (not self.get_upload_user() or not self.get_upload_password()): + (not self.get_upload_user() or + not self.get_upload_password()): self.upload_url = RH_SFTP_HOST uploaded = super(RHELPolicy, self).upload_archive(archive) - except Exception: + except Exception as e: uploaded = False if not self.upload_url.startswith(RH_API_HOST): raise else: self.ui_log.error( - _(f"Upload to Red Hat Customer Portal failed. Trying " - f"{RH_SFTP_HOST}") + _(f"Upload to Red Hat Customer Portal failed due to " + f"{e}. Trying {RH_SFTP_HOST}") ) self.upload_url = RH_SFTP_HOST uploaded = super(RHELPolicy, self).upload_archive(archive)