-
Notifications
You must be signed in to change notification settings - Fork 50
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Key features are - support of resource tree permission inheritance - support for grouping users into teams - efficient generation of querysets Added feature for tracking roles with relationships Added feature to manage singleton permissions
- Loading branch information
1 parent
7341654
commit 4909e55
Showing
28 changed files
with
2,657 additions
and
1 deletion.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,8 +1,12 @@ | ||
from django.apps import AppConfig | ||
|
||
import ansible_base.checks # noqa: F401 - register checks | ||
from ansible_base.rbac.permission_registry import permission_registry | ||
|
||
|
||
class AnsibleAuthConfig(AppConfig): | ||
default_auto_field = 'django.db.models.BigAutoField' | ||
name = 'ansible_base' | ||
|
||
def ready(self): | ||
permission_registry.call_when_apps_ready(self.apps) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,49 @@ | ||
from django.core.management.base import BaseCommand | ||
|
||
from ansible_base.models.rbac import ObjectRole, RoleDefinition | ||
from ansible_base.rbac import permission_registry | ||
|
||
|
||
class Command(BaseCommand): | ||
help = "Runs bug checking sanity checks, gets scale metrics, and reccomendations for Role Based Access Control" | ||
|
||
def handle(self, *args, **options): | ||
rd_ct = RoleDefinition.objects.count() | ||
self.stdout.write(f'Inspecting {rd_ct} role definitions') | ||
self.stdout.write(' checking for minimum of view permission') | ||
indexed_rds = {} | ||
for rd in RoleDefinition.objects.prefetch_related('permissions'): | ||
perm_list = list(rd.permissions.values_list('codename', flat=True)) | ||
if 'view' not in ' '.join(perm_list): | ||
self.stdout.write(f'Role definition {rd.name} does not list any view permissions and this is considered invalid') | ||
perm_set = frozenset(perm_list) | ||
indexed_rds.setdefault(perm_set, []) | ||
indexed_rds[perm_set].append(rd) | ||
|
||
self.stdout.write(' checking for duplicate role definitions') | ||
for perm_set, rd_list in indexed_rds.items(): | ||
if len(rd_list) > 1: | ||
self.stdout.write('Found duplicate role definitions with same permissions list:') | ||
for rd in rd_list: | ||
self.stdout.write(f' {rd}') | ||
|
||
self.stdout.write(f'Inspecting {ObjectRole.objects.count()} object roles') | ||
self.stdout.write(' checking for invalid permissions for model type') | ||
for role in ObjectRole.objects.prefetch_related('role_definition__permissions', 'content_type'): | ||
for permission in role.role_definition.permissions.all(): | ||
if permission.content_type_id != role.content_type_id: | ||
if permission.content_type.model_class() not in set( | ||
cls for filter_path, cls in permission_registry.get_child_models(role.content_type.model) | ||
): | ||
self.stdout.write(f'Object role {role} has permission {permission.codename} for an unlike content type {permission.content_type}') | ||
|
||
self.stdout.write(' checking for up-to-date role evaluations') | ||
for role in ObjectRole.objects.all(): | ||
to_delete, to_add = role.needed_cache_updates() | ||
if to_delete or to_add: | ||
self.stdout.write(f'Object role {role} does not have up-to-date role evaluations cached, this can happen if someone bypasses signals') | ||
|
||
self.stdout.write(' checking for missing content object') | ||
for role in ObjectRole.objects.all(): | ||
if not role.content_object: | ||
self.stdout.write(f'Object role {role} has been orphaned, indicating that post_delete signals are broken') |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,86 @@ | ||
# Generated by Django 4.2.6 on 2023-11-20 20:48 | ||
|
||
from django.conf import settings | ||
from django.db import migrations, models | ||
import django.db.models.deletion | ||
|
||
|
||
class Migration(migrations.Migration): | ||
|
||
dependencies = [ | ||
('ansible_base', '0009_authenticator_users'), | ||
('contenttypes', '0002_remove_content_type_name'), | ||
migrations.swappable_dependency(settings.AUTH_USER_MODEL), | ||
migrations.swappable_dependency(settings.ROLE_TEAM_MODEL), | ||
migrations.swappable_dependency(settings.ROLE_PERMISSION_MODEL), | ||
('auth', '0012_alter_user_first_name_max_length'), | ||
] | ||
|
||
operations = [ | ||
migrations.CreateModel( | ||
name='RoleDefinition', | ||
fields=[ | ||
('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')), | ||
('name', models.TextField(db_index=True, unique=True)), | ||
('description', models.TextField(null=True)), | ||
('managed', models.BooleanField(default=False)), | ||
('permissions', models.ManyToManyField(to=settings.ROLE_PERMISSION_MODEL)), | ||
], | ||
options={ | ||
'verbose_name_plural': 'role_definition', | ||
}, | ||
), | ||
migrations.CreateModel( | ||
name='ObjectRole', | ||
fields=[ | ||
('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')), | ||
('object_id', models.PositiveIntegerField()), | ||
('content_type', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, to='contenttypes.contenttype')), | ||
('provides_teams', models.ManyToManyField(help_text='Users who have this role obtain member access to these teams, and inherit all their permissions', related_name='member_roles', to=settings.ROLE_TEAM_MODEL)), | ||
('role_definition', models.ForeignKey(help_text='The role definition which defines what permissions this object role grants', on_delete=django.db.models.deletion.CASCADE, to='ansible_base.roledefinition')), | ||
('teams', models.ManyToManyField(help_text='Teams or groups who have access to the permissions defined by this object role', related_name='has_roles', to=settings.ROLE_TEAM_MODEL)), | ||
('users', models.ManyToManyField(help_text='Users who have access to the permissions defined by this object role', related_name='has_roles', to=settings.AUTH_USER_MODEL)), | ||
], | ||
options={ | ||
'verbose_name_plural': 'object_roles', | ||
'ordering': ('content_type', 'object_id'), | ||
}, | ||
), | ||
migrations.CreateModel( | ||
name='RoleEvaluation', | ||
fields=[ | ||
('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')), | ||
('codename', models.TextField(help_text='The name of the permission, giving the action and the model, from the Django Permission model')), | ||
('content_type_id', models.PositiveIntegerField()), | ||
('object_id', models.PositiveIntegerField()), | ||
( | ||
'role', | ||
models.ForeignKey( | ||
help_text='The object role that grants this form of permission', | ||
on_delete=django.db.models.deletion.CASCADE, | ||
related_name='permission_partials', | ||
to='ansible_base.objectrole', | ||
), | ||
), | ||
], | ||
options={ | ||
'verbose_name_plural': 'role_object_permissions', | ||
'indexes': [ | ||
models.Index(fields=['role', 'content_type_id', 'object_id'], name='ansible_bas_role_id_a7c9c2_idx'), | ||
models.Index(fields=['role', 'content_type_id', 'codename'], name='ansible_bas_role_id_e7da52_idx'), | ||
], | ||
}, | ||
), | ||
migrations.AddConstraint( | ||
model_name='roleevaluation', | ||
constraint=models.UniqueConstraint(fields=('object_id', 'content_type_id', 'codename', 'role'), name='one_entry_per_object_permission_and_role'), | ||
), | ||
migrations.AddIndex( | ||
model_name='objectrole', | ||
index=models.Index(fields=['content_type', 'object_id'], name='ansible_bas_content_0088d6_idx'), | ||
), | ||
migrations.AddConstraint( | ||
model_name='objectrole', | ||
constraint=models.UniqueConstraint(fields=('object_id', 'content_type', 'role_definition'), name='one_object_role_per_object_and_role'), | ||
), | ||
] |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,100 @@ | ||
from django.conf import settings | ||
|
||
from ansible_base.rbac.permission_registry import permission_registry | ||
|
||
import logging | ||
|
||
|
||
logger = logging.getLogger('ansible_base.migrations._managed_definitions') | ||
|
||
|
||
def get_or_create_managed(name, description, permissions, RoleDefinition): | ||
role_definition, created = RoleDefinition.objects.get_or_create(name=name, defaults={'managed': True, 'description': description}) | ||
role_definition.permissions.set(list(permissions)) | ||
|
||
if not role_definition.managed: | ||
role_definition.managed = True | ||
role_definition.save(update_fields=['managed']) | ||
|
||
if created: | ||
logger.info(f'Created RoleDefinition {role_definition.name} pk={role_definition} with {len(permissions)} permissions') | ||
|
||
return role_definition | ||
|
||
|
||
def setup_managed_role_definitions(apps, schema_editor): | ||
""" | ||
Idepotent method to create or sync the managed role definitions | ||
""" | ||
to_create = settings.GATEWAY_ROLE_PRECREATE | ||
|
||
ContentType = apps.get_model('contenttypes', 'ContentType') | ||
Permission = apps.get_model(settings.ROLE_PERMISSION_MODEL) | ||
RoleDefinition = apps.get_model('ansible_base', 'RoleDefinition') | ||
Organization = apps.get_model(settings.ROLE_ORGANIZATION_MODEL) | ||
managed_role_definitions = [] | ||
|
||
org_perms = set() | ||
for cls in permission_registry._registry: | ||
object_perms = set(Permission.objects.filter(content_type=ContentType.objects.get_for_model(cls))) | ||
# Special case for InstanceGroup which has an organiation field, but is not an organization child object | ||
if cls._meta.model_name != 'instancegroup': | ||
org_perms.update(object_perms) | ||
|
||
if 'object_admin' in to_create and cls != Organization: | ||
indiv_perms = object_perms.copy() | ||
add_perms = [perm for perm in indiv_perms if perm.codename.startswith('add_')] | ||
if add_perms: | ||
for perm in add_perms: | ||
indiv_perms.remove(perm) | ||
|
||
managed_role_definitions.append( | ||
get_or_create_managed( | ||
to_create['object_admin'].format(cls=cls), f'Has all permissions to a single {cls._meta.verbose_name}', indiv_perms, RoleDefinition | ||
) | ||
) | ||
|
||
if 'org_children' in to_create and cls != Organization: | ||
org_child_perms = object_perms.copy() | ||
org_child_perms.add(Permission.objects.get(codename='view_organization')) | ||
|
||
managed_role_definitions.append( | ||
get_or_create_managed( | ||
to_create['org_children'].format(cls=cls), | ||
f'Has all permissions to {cls._meta.verbose_name_plural} within an organization', | ||
org_child_perms, | ||
RoleDefinition, | ||
) | ||
) | ||
|
||
if 'special' in to_create: | ||
special_perms = [] | ||
for perm in object_perms: | ||
if perm.codename.split('_')[0] not in ('add', 'change', 'update', 'delete', 'view'): | ||
special_perms.append(perm) | ||
for perm in special_perms: | ||
action = perm.codename.split('_')[0] | ||
view_perm = Permission.objects.get(content_type=ContentType.objects.get_for_model(cls), codename__startswith='view_') | ||
managed_role_definitions.append( | ||
get_or_create_managed( | ||
to_create['special'].format(cls=cls, action=action), | ||
f'Has {action} permissions to a single {cls._meta.verbose_name}', | ||
[perm, view_perm], | ||
RoleDefinition, | ||
) | ||
) | ||
|
||
if 'org_admin' in to_create: | ||
managed_role_definitions.append( | ||
get_or_create_managed( | ||
to_create['org_admin'].format(cls=Organization), | ||
'Has all permissions to a single organization and all objects inside of it', | ||
org_perms, | ||
RoleDefinition, | ||
) | ||
) | ||
|
||
unexpected_role_definitions = RoleDefinition.objects.filter(managed=True).exclude(pk__in=[rd.pk for rd in managed_role_definitions]) | ||
for role_definition in unexpected_role_definitions: | ||
logger.info(f'Deleting old managed role definition {role_definition.name}, pk={role_definition.pk}') | ||
role_definition.delete() |
Oops, something went wrong.