Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add more validation when updating api_token #4918

Merged
merged 2 commits into from
Aug 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 22 additions & 1 deletion engine/apps/grafana_plugin/helpers/gcom.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,22 @@
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
GCOM_TOKEN_CHECK_PERIOD = timezone.timedelta(minutes=60)
MIN_GRAFANA_TOKEN_LENGTH = 16


class GcomToken:
def __init__(self, organization):
self.organization = organization


def _validate_grafana_token_format(grafana_token: str) -> bool:
if not grafana_token or not isinstance(grafana_token, str):
return False
if len(grafana_token) < MIN_GRAFANA_TOKEN_LENGTH:
return False
return True


def check_gcom_permission(token_string: str, context) -> GcomToken:
"""
Verify that request from plugin is valid. Check it and synchronize the organization details
Expand All @@ -45,13 +54,20 @@ def check_gcom_permission(token_string: str, context) -> GcomToken:
if not instance_info or str(instance_info["orgId"]) != org_id:
raise InvalidToken

grafana_token_format_is_valid = _validate_grafana_token_format(grafana_token)

if not organization:
from apps.base.models import DynamicSetting

allow_signup = DynamicSetting.objects.get_or_create(
name="allow_plugin_organization_signup", defaults={"boolean_value": True}
)[0].boolean_value
if allow_signup:
if not grafana_token_format_is_valid:
logger.debug(
f"grafana token sent when creating stack_id={stack_id} was invalid format. api_token will still be written to DB"
)

# Get org from db or create a new one
organization, _ = Organization.objects.get_or_create(
stack_id=instance_info["id"],
Expand All @@ -74,8 +90,13 @@ def check_gcom_permission(token_string: str, context) -> GcomToken:
organization.grafana_url = instance_info["url"]
organization.cluster_slug = instance_info["clusterSlug"]
organization.gcom_token = token_string
organization.api_token = grafana_token
organization.gcom_token_org_last_time_synced = timezone.now()
if not grafana_token_format_is_valid:
logger.debug(
f"grafana token sent when updating stack_id={stack_id} was invalid, api_token in DB will be unchanged"
)
else:
organization.api_token = grafana_token
organization.save(
update_fields=[
"stack_slug",
Expand Down
24 changes: 19 additions & 5 deletions engine/apps/grafana_plugin/tests/test_gcom.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,22 @@
from apps.grafana_plugin.helpers.gcom import check_gcom_permission


@pytest.mark.parametrize(
"api_token, api_token_updated",
[
("glsa_abcdefghijklmnopqrztuvwxyz", True),
("abcdefghijklmnopqrztuvwxyz", True),
("abc", False),
("", False),
("<no_value>", False),
(None, False),
(24, False),
],
)
@pytest.mark.django_db
def test_check_gcom_permission_updates_fields(make_organization):
def test_check_gcom_permission_updates_fields(make_organization, api_token, api_token_updated):
gcom_token = "gcom:test_token"
fixed_token = "fixed_token"
broken_token = "broken_token"
instance_info = {
"id": 324534,
"slug": "testinstance",
Expand All @@ -22,10 +34,11 @@ def test_check_gcom_permission_updates_fields(make_organization):
context = {
"stack_id": str(instance_info["id"]),
"org_id": str(instance_info["orgId"]),
"grafana_token": fixed_token,
"grafana_token": api_token,
}

org = make_organization(stack_id=instance_info["id"], org_id=instance_info["orgId"], api_token="broken_token")
org = make_organization(stack_id=instance_info["id"], org_id=instance_info["orgId"], api_token=broken_token)
last_time_gcom_synced = org.gcom_token_org_last_time_synced

with patch(
"apps.grafana_plugin.helpers.GcomAPIClient.get_instance_info",
Expand All @@ -43,5 +56,6 @@ def test_check_gcom_permission_updates_fields(make_organization):
assert org.org_title == instance_info["orgName"]
assert org.region_slug == instance_info["regionSlug"]
assert org.cluster_slug == instance_info["clusterSlug"]
assert org.api_token == fixed_token
assert org.api_token == api_token if api_token_updated else broken_token
assert org.gcom_token == gcom_token
assert org.gcom_token_org_last_time_synced != last_time_gcom_synced
Loading