From a5770309d9bbd91d2f5182cc5dbf4f4eee68057f Mon Sep 17 00:00:00 2001 From: Michael Derynck Date: Fri, 23 Aug 2024 12:21:57 -0600 Subject: [PATCH] Add more validation when updating api_token (#4918) # What this PR does Skip updating stored api_token for Grafana if it does not look like one. Note: Exact format is not checked (prefix) since there are some differences between versions for what API tokens might look like and this should tolerate those differences. ## Which issue(s) this PR closes Related to [issue link here] ## Checklist - [x] Unit, integration, and e2e (if applicable) tests updated - [x] Documentation added (or `pr:no public docs` PR label added if not required) - [x] Added the relevant release notes label (see labels prefixed w/ `release:`). These labels dictate how your PR will show up in the autogenerated release notes. --- engine/apps/grafana_plugin/helpers/gcom.py | 23 +++++++++++++++++- engine/apps/grafana_plugin/tests/test_gcom.py | 24 +++++++++++++++---- 2 files changed, 41 insertions(+), 6 deletions(-) diff --git a/engine/apps/grafana_plugin/helpers/gcom.py b/engine/apps/grafana_plugin/helpers/gcom.py index 8d8afe8dd4..e327c7f7b1 100644 --- a/engine/apps/grafana_plugin/helpers/gcom.py +++ b/engine/apps/grafana_plugin/helpers/gcom.py @@ -12,6 +12,7 @@ logger = logging.getLogger(__name__) logger.setLevel(logging.DEBUG) GCOM_TOKEN_CHECK_PERIOD = timezone.timedelta(minutes=60) +MIN_GRAFANA_TOKEN_LENGTH = 16 class GcomToken: @@ -19,6 +20,14 @@ def __init__(self, organization): self.organization = organization +def _validate_grafana_token_format(grafana_token: str) -> bool: + if not grafana_token or not isinstance(grafana_token, str): + return False + if len(grafana_token) < MIN_GRAFANA_TOKEN_LENGTH: + return False + return True + + def check_gcom_permission(token_string: str, context) -> GcomToken: """ Verify that request from plugin is valid. Check it and synchronize the organization details @@ -45,6 +54,8 @@ def check_gcom_permission(token_string: str, context) -> GcomToken: if not instance_info or str(instance_info["orgId"]) != org_id: raise InvalidToken + grafana_token_format_is_valid = _validate_grafana_token_format(grafana_token) + if not organization: from apps.base.models import DynamicSetting @@ -52,6 +63,11 @@ def check_gcom_permission(token_string: str, context) -> GcomToken: name="allow_plugin_organization_signup", defaults={"boolean_value": True} )[0].boolean_value if allow_signup: + if not grafana_token_format_is_valid: + logger.debug( + f"grafana token sent when creating stack_id={stack_id} was invalid format. api_token will still be written to DB" + ) + # Get org from db or create a new one organization, _ = Organization.objects.get_or_create( stack_id=instance_info["id"], @@ -74,8 +90,13 @@ def check_gcom_permission(token_string: str, context) -> GcomToken: organization.grafana_url = instance_info["url"] organization.cluster_slug = instance_info["clusterSlug"] organization.gcom_token = token_string - organization.api_token = grafana_token organization.gcom_token_org_last_time_synced = timezone.now() + if not grafana_token_format_is_valid: + logger.debug( + f"grafana token sent when updating stack_id={stack_id} was invalid, api_token in DB will be unchanged" + ) + else: + organization.api_token = grafana_token organization.save( update_fields=[ "stack_slug", diff --git a/engine/apps/grafana_plugin/tests/test_gcom.py b/engine/apps/grafana_plugin/tests/test_gcom.py index e5524e7a03..c70f0719e4 100644 --- a/engine/apps/grafana_plugin/tests/test_gcom.py +++ b/engine/apps/grafana_plugin/tests/test_gcom.py @@ -5,10 +5,22 @@ from apps.grafana_plugin.helpers.gcom import check_gcom_permission +@pytest.mark.parametrize( + "api_token, api_token_updated", + [ + ("glsa_abcdefghijklmnopqrztuvwxyz", True), + ("abcdefghijklmnopqrztuvwxyz", True), + ("abc", False), + ("", False), + ("", False), + (None, False), + (24, False), + ], +) @pytest.mark.django_db -def test_check_gcom_permission_updates_fields(make_organization): +def test_check_gcom_permission_updates_fields(make_organization, api_token, api_token_updated): gcom_token = "gcom:test_token" - fixed_token = "fixed_token" + broken_token = "broken_token" instance_info = { "id": 324534, "slug": "testinstance", @@ -22,10 +34,11 @@ def test_check_gcom_permission_updates_fields(make_organization): context = { "stack_id": str(instance_info["id"]), "org_id": str(instance_info["orgId"]), - "grafana_token": fixed_token, + "grafana_token": api_token, } - org = make_organization(stack_id=instance_info["id"], org_id=instance_info["orgId"], api_token="broken_token") + org = make_organization(stack_id=instance_info["id"], org_id=instance_info["orgId"], api_token=broken_token) + last_time_gcom_synced = org.gcom_token_org_last_time_synced with patch( "apps.grafana_plugin.helpers.GcomAPIClient.get_instance_info", @@ -43,5 +56,6 @@ def test_check_gcom_permission_updates_fields(make_organization): assert org.org_title == instance_info["orgName"] assert org.region_slug == instance_info["regionSlug"] assert org.cluster_slug == instance_info["clusterSlug"] - assert org.api_token == fixed_token + assert org.api_token == api_token if api_token_updated else broken_token assert org.gcom_token == gcom_token + assert org.gcom_token_org_last_time_synced != last_time_gcom_synced