Skip to content

Commit

Permalink
Convert DAB RBAC to using its own concrete permission model (#3)
Browse files Browse the repository at this point in the history
* Initial add of permission model

* Convert DAB RBAC to using its own concrete permission model

* Flake8 fixes

* black
  • Loading branch information
AlanCoding committed Mar 14, 2024
1 parent 8f5646a commit a71608f
Show file tree
Hide file tree
Showing 16 changed files with 162 additions and 158 deletions.
3 changes: 0 additions & 3 deletions ansible_base/lib/dynamic_config/dynamic_settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,4 @@
ANSIBLE_BASE_BYPASS_SUPERUSER_FLAGS = ['is_superuser']
ANSIBLE_BASE_BYPASS_ACTION_FLAGS = {}

# Specify a custom permission model
ANSIBLE_BASE_PERMISSION_MODEL = 'auth.Permission'

ANSIBLE_BASE_CACHE_PARENT_PERMISSIONS = False
2 changes: 1 addition & 1 deletion ansible_base/rbac/api/serializers.py
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ def to_representation(self, data):

class RoleDefinitionSerializer(CommonModelSerializer):
# Relational versions - we may switch to these if custom permission and type models are exposed but out of scope here
# permissions = serializers.SlugRelatedField(many=True, slug_field='codename', queryset=permission_registry.permission_model.objects.all())
# permissions = serializers.SlugRelatedField(many=True, slug_field='codename', queryset=DABPermission.objects.all())
# content_type = ContentTypeField(slug_field='model', queryset=permission_registry.content_type_model.objects.all(), allow_null=True, default=None)
permissions = ManyRelatedListField(child=PermissionField())
content_type = ContentTypeField(allow_null=True, default=None)
Expand Down
4 changes: 2 additions & 2 deletions ansible_base/rbac/evaluations.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from django.conf import settings

from ansible_base.rbac import permission_registry
from ansible_base.rbac.models import RoleDefinition, get_evaluation_model
from ansible_base.rbac.models import DABPermission, RoleDefinition, get_evaluation_model
from ansible_base.rbac.validators import validate_codename_for_model

"""
Expand Down Expand Up @@ -41,7 +41,7 @@ def bound_singleton_permissions(self):
"Method attached to User model as singleton_permissions"
if not hasattr(self, '_singleton_permissions') or bound_singleton_permissions._team_clear_signal:
# values_list will make the return type set[str]
permission_qs = permission_registry.permission_model.objects.values_list('codename', flat=True)
permission_qs = DABPermission.objects.values_list('codename', flat=True)
self._singleton_permissions = RoleDefinition.user_global_permissions(self, permission_qs=permission_qs)
bound_singleton_permissions._team_clear_signal = False
return self._singleton_permissions
Expand Down
87 changes: 87 additions & 0 deletions ansible_base/rbac/management/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
import logging

from django.apps import apps as global_apps
from django.contrib.contenttypes.management import create_contenttypes
from django.db import DEFAULT_DB_ALIAS, router

from ansible_base.rbac import permission_registry

logger = logging.getLogger(__name__)


def create_dab_permissions(app_config, verbosity=2, interactive=True, using=DEFAULT_DB_ALIAS, apps=global_apps, **kwargs):
"""
This is modified from the django auth.
This will create DABPermission entries
this will only create permissions for registered models
"""
if not app_config.models_module:
return

# Ensure that contenttypes are created for this app. Needed if
# 'ansible_base.rbac' is in INSTALLED_APPS before
# 'django.contrib.contenttypes'.
create_contenttypes(
app_config,
verbosity=verbosity,
interactive=interactive,
using=using,
apps=apps,
**kwargs,
)

app_label = app_config.label
try:
app_config = apps.get_app_config(app_label)
ContentType = apps.get_model("contenttypes", "ContentType")
Permission = apps.get_model("dab_rbac", "DABPermission")
except LookupError:
return

if not router.allow_migrate_model(using, Permission):
return

# This will hold the permissions we're looking for as (content_type, (codename, name))
searched_perms = []
# The codenames and ctypes that should exist.
ctypes = set()
for klass in app_config.get_models():
if not permission_registry.is_registered(klass):
continue
# Force looking up the content types in the current database
# before creating foreign keys to them.
ctype = ContentType.objects.db_manager(using).get_for_model(klass, for_concrete_model=False)

ctypes.add(ctype)

for action in klass._meta.default_permissions:
searched_perms.append(
(
ctype,
(
f"{action}_{klass._meta.model_name}",
f"Can {action} {klass._meta.verbose_name_raw}",
),
)
)
for codename, name in klass._meta.permissions:
searched_perms.append((ctype, (codename, name)))

# Find all the Permissions that have a content_type for a model we're
# looking for. We don't need to check for codenames since we already have
# a list of the ones we're going to create.
all_perms = set(Permission.objects.using(using).filter(content_type__in=ctypes).values_list("content_type", "codename"))

perms = []
for ct, (codename, name) in searched_perms:
if (ct.pk, codename) not in all_perms:
permission = Permission()
permission._state.db = using
permission.codename = codename
permission.name = name
permission.content_type = ct
perms.append(permission)

Permission.objects.using(using).bulk_create(perms)
for perm in perms:
logger.debug("Adding permission '%s'" % perm)
18 changes: 16 additions & 2 deletions ansible_base/rbac/migrations/0001_initial.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,19 +11,33 @@ class Migration(migrations.Migration):
('contenttypes', '0002_remove_content_type_name'),
migrations.swappable_dependency(settings.AUTH_USER_MODEL),
migrations.swappable_dependency(settings.ANSIBLE_BASE_TEAM_MODEL),
migrations.swappable_dependency(settings.ANSIBLE_BASE_PERMISSION_MODEL),
('auth', '0012_alter_user_first_name_max_length'),
]

operations = [
migrations.CreateModel(
name='DABPermission',
fields=[
('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
('name', models.CharField(max_length=255, verbose_name='name')),
('codename', models.CharField(max_length=100, verbose_name='codename')),
('content_type', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, to='contenttypes.contenttype', verbose_name='content type')),
],
options={
'verbose_name': 'permission',
'verbose_name_plural': 'permissions',
'ordering': ['content_type__model', 'codename'],
'unique_together': {('content_type', 'codename')},
},
),
migrations.CreateModel(
name='RoleDefinition',
fields=[
('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
('name', models.TextField(db_index=True, unique=True)),
('description', models.TextField(blank=True)),
('managed', models.BooleanField(default=False, editable=False)),
('permissions', models.ManyToManyField(related_name='role_definitions', to=settings.ANSIBLE_BASE_PERMISSION_MODEL)),
('permissions', models.ManyToManyField(related_name='role_definitions', to='dab_rbac.DABPermission')),
('content_type', models.ForeignKey(
default=None,
help_text='Type of resource this can apply to, only used for validation and user assistance',
Expand Down
2 changes: 1 addition & 1 deletion ansible_base/rbac/migrations/_managed_definitions.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ def setup_managed_role_definitions(apps, schema_editor):
to_create = settings.ANSIBLE_BASE_ROLE_PRECREATE

ContentType = apps.get_model('contenttypes', 'ContentType')
Permission = apps.get_model(settings.ANSIBLE_BASE_PERMISSION_MODEL)
Permission = apps.get_model('dab_rbac', 'DABPermission')
RoleDefinition = apps.get_model('dab_rbac', 'RoleDefinition')
Organization = apps.get_model(settings.ANSIBLE_BASE_ORGANIZATION_MODEL)
org_ct = ContentType.objects.get_for_model(Organization)
Expand Down
96 changes: 3 additions & 93 deletions ansible_base/rbac/migrations/_utils.py
Original file line number Diff line number Diff line change
@@ -1,97 +1,7 @@
import logging
from django.db import models

from django.apps import apps as global_apps
from django.conf import settings
from django.contrib.contenttypes.management import create_contenttypes
from django.db import DEFAULT_DB_ALIAS, models, router

from ansible_base.rbac import permission_registry

logger = logging.getLogger("ansible_base.rbac.migrations._utils")


def create_custom_permissions(app_config, verbosity=2, interactive=True, using=DEFAULT_DB_ALIAS, apps=global_apps, **kwargs):
"""
This is modified from the django auth.
Use this to create permissions in specified settings.ANSIBLE_BASE_PERMISSION_MODEL
this will only create permissions for registered models
"""
if not app_config.models_module:
return

# Ensure that contenttypes are created for this app. Needed if
# 'django.contrib.auth' is in INSTALLED_APPS before
# 'django.contrib.contenttypes'.
create_contenttypes(
app_config,
verbosity=verbosity,
interactive=interactive,
using=using,
apps=apps,
**kwargs,
)

app_label = app_config.label
try:
app_config = apps.get_app_config(app_label)
ContentType = apps.get_model("contenttypes", "ContentType")
Permission = apps.get_model(settings.ANSIBLE_BASE_PERMISSION_MODEL)
except LookupError:
return

if not router.allow_migrate_model(using, Permission):
return

# This will hold the permissions we're looking for as (content_type, (codename, name))
searched_perms = []
# The codenames and ctypes that should exist.
ctypes = set()
for klass in app_config.get_models():
if not permission_registry.is_registered(klass):
continue
# Force looking up the content types in the current database
# before creating foreign keys to them.
ctype = ContentType.objects.db_manager(using).get_for_model(
klass, for_concrete_model=False
)

ctypes.add(ctype)

for action in klass._meta.default_permissions:
searched_perms.append(
(
ctype,
(
f"{action}_{klass._meta.model_name}",
f"Can {action} {klass._meta.verbose_name_raw}",
),
)
)
for codename, name in klass._meta.permissions:
searched_perms.append((ctype, (codename, name)))

# Find all the Permissions that have a content_type for a model we're
# looking for. We don't need to check for codenames since we already have
# a list of the ones we're going to create.
all_perms = set(
Permission.objects.using(using)
.filter(content_type__in=ctypes)
.values_list("content_type", "codename")
)

perms = []
for ct, (codename, name) in searched_perms:
if (ct.pk, codename) not in all_perms:
permission = Permission()
permission._state.db = using
permission.codename = codename
permission.name = name
permission.content_type = ct
perms.append(permission)

Permission.objects.using(using).bulk_create(perms)
for perm in perms:
logger.debug("Adding permission '%s'" % perm)
# This method has moved, and this is put here temporarily to make branch management easier
from ansible_base.rbac.management import create_dab_permissions as create_custom_permissions # noqa


def give_permissions(apps, rd, users=(), teams=(), object_id=None, content_type_id=None):
Expand Down
23 changes: 20 additions & 3 deletions ansible_base/rbac/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,23 @@
logger = logging.getLogger('ansible_base.rbac.models')


class DABPermission(models.Model):
"This is a minimal copy of auth.Permission for internal use"

name = models.CharField("name", max_length=255)
content_type = models.ForeignKey(ContentType, models.CASCADE, verbose_name="content type")
codename = models.CharField("codename", max_length=100)

class Meta:
verbose_name = "permission"
verbose_name_plural = "permissions"
unique_together = [["content_type", "codename"]]
ordering = ["content_type__model", "codename"]

def __str__(self):
return f"<{self.__class__.__name__}: {self.codename}>"


class RoleDefinitionManager(models.Manager):
def give_creator_permissions(self, user, obj):
# If the user is a superuser, no need to bother giving the creator permissions
Expand All @@ -42,7 +59,7 @@ def give_creator_permissions(self, user, obj):
cts = ContentType.objects.get_for_models(*model_and_children).values()

needed_perms = set()
for perm in permission_registry.permission_model.objects.filter(content_type__in=cts).prefetch_related('content_type'):
for perm in DABPermission.objects.filter(content_type__in=cts).prefetch_related('content_type'):
action = perm.codename.split('_', 1)[0]
if action in needed_actions:
# do not save add permission on the object level, which does not make sense
Expand Down Expand Up @@ -103,7 +120,7 @@ class Meta:
name = models.TextField(db_index=True, unique=True)
description = models.TextField(blank=True)
managed = models.BooleanField(default=False, editable=False) # pulp definition of Role uses locked
permissions = models.ManyToManyField(settings.ANSIBLE_BASE_PERMISSION_MODEL, related_name='role_definitions')
permissions = models.ManyToManyField('dab_rbac.DABPermission', related_name='role_definitions')
content_type = models.ForeignKey(
ContentType,
help_text=_('Type of resource this can apply to, only used for validation and user assistance'),
Expand Down Expand Up @@ -228,7 +245,7 @@ def user_global_permissions(cls, user, permission_qs=None):
if permission_qs is None:
# Allowing caller to replace the base permission set allows changing the type of thing returned
# this is used in the assignment querysets, but these cases must call the method directly
permission_qs = permission_registry.permission_model.objects.all()
permission_qs = DABPermission.objects.all()

perm_set = set()
if settings.ANSIBLE_BASE_ALLOW_SINGLETON_USER_ROLES:
Expand Down
27 changes: 20 additions & 7 deletions ansible_base/rbac/permission_registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,14 +82,25 @@ def get_child_models(self, parent_model, seen=None):
def call_when_apps_ready(self, apps, app_config):
from ansible_base.rbac import triggers
from ansible_base.rbac.evaluations import bound_has_obj_perm, bound_singleton_permissions, connect_rbac_methods
from ansible_base.rbac.management import create_dab_permissions

self.apps = apps
self.apps_ready = True

if self.team_model not in self._registry:
self._registry.add(self.team_model)

post_migrate.connect(triggers.post_migration_rbac_setup, sender=app_config)
# Do no specify sender for create_dab_permissions, because that is passed as app_config
# and we want to create permissions for external apps, not the dab_rbac app
post_migrate.connect(
create_dab_permissions,
dispatch_uid="ansible_base.rbac.management.create_dab_permissions",
)
post_migrate.connect(
triggers.post_migration_rbac_setup,
sender=app_config,
dispatch_uid="ansible_base.rbac.triggers.post_migration_rbac_setup",
)

self.user_model.add_to_class('has_obj_perm', bound_has_obj_perm)
self.user_model.add_to_class('singleton_permissions', bound_singleton_permissions)
Expand Down Expand Up @@ -139,15 +150,17 @@ def org_ct_id(self):
team_parent_model = self.get_parent_model(self.team_model)
return self.content_type_model.objects.get_for_model(team_parent_model).id

@property
def permission_model(self):
return self.apps.get_model(settings.ANSIBLE_BASE_PERMISSION_MODEL)

@property
def permission_qs(self):
"Return a queryset of the permission model restricted to the RBAC-tracked models"
"""Return a queryset of the permission model restricted to the RBAC-tracked models
Note that this should not be necessary, since the post_migrate signal for DABPermission
will only create entries for registered models.
However, removing permission entries after a model definition changes is still unsolved
and this is already problematic for auth.Permission.
"""
all_cts = self.content_type_model.objects.get_for_models(*self.all_registered_models)
return self.permission_model.objects.filter(content_type__in=all_cts.values())
return self.apps.get_model('dab_rbac.DABPermission').objects.filter(content_type__in=all_cts.values())

@property
def team_permission(self):
Expand Down
11 changes: 3 additions & 8 deletions ansible_base/rbac/triggers.py
Original file line number Diff line number Diff line change
Expand Up @@ -241,19 +241,14 @@ def rbac_post_user_delete(instance, *args, **kwargs):


def post_migration_rbac_setup(*args, **kwargs):
"""
Return if running django or py.test unit tests.
Logic is taken from AWX is_testing, it could be cut down on
"""
if not settings.ANSIBLE_BASE_ROLE_PRECREATE:
return

try:
RoleDefinition.objects.first()
except ProgrammingError:
return # this happens when migrating backwards, tables do not exist at prior states

setup_managed_role_definitions(apps, None)
if settings.ANSIBLE_BASE_ROLE_PRECREATE:
setup_managed_role_definitions(apps, None)

compute_team_member_roles()
compute_object_role_permissions()

Expand Down
Loading

0 comments on commit a71608f

Please sign in to comment.