diff --git a/.vscode/settings.json b/.vscode/settings.json index 4687599b9..f4efc816a 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -8,7 +8,8 @@ // "-v", // "--cov", // "--cov-report xml", - "app" + "-s", + "app", ], "python.testing.unittestEnabled": false, "python.testing.pytestEnabled": true, diff --git a/app/access/mixins/organization.py b/app/access/mixins/organization.py new file mode 100644 index 000000000..a0c23f96d --- /dev/null +++ b/app/access/mixins/organization.py @@ -0,0 +1,370 @@ +from django.contrib.auth.models import User, Group +from access.models import Organization, Team + + +class OrganizationMixin: + """Organization Tenancy Mixin + + This class is intended to be included in **ALL** View / Viewset classes as + it contains the functions/methods required to conduct the permission + checking. + """ + + + _obj_organization: int = None + """Cached Object Organization""" + + def get_obj_organization(self, obj = None, request = None) -> Organization: + """Fetch the objects Organization + + Args: + obj (Model): Model of object + + Raises: + ValueError: When `obj` and `request` are both missing + + Returns: + Organization: Organization the object is from + None: No Organization was found + """ + + if obj is None and request is None: + + raise ValueError('Missing Parameter. obj or request must be supplied') + + + if self._obj_organization: + + return self._obj_organization + + + _obj_organization: Organization = None + + + if obj: + + _obj_organization = getattr(obj, 'organization', None) + + + if not _obj_organization: + + _obj_organization = getattr(obj, 'get_organization', lambda: None)() + + elif request: + + if getattr(request.stream, 'method', '') != 'DELETE': + + data = getattr(request, 'data', None) + + if data: + + data_organization = self.kwargs.get('organization_id', None) + + if not data_organization: + + data_organization = request.data.get('organization_id', None) + + + if not data_organization: + + data_organization = request.data.get('organization', None) + + + if data_organization: + + _obj_organization = Organization.objects.get( + pk = int( data_organization ) + ) + + + if self.get_parent_model(): # if defined is to overwrite object organization + + parent_obj = self.get_parent_obj() + + _obj_organization = parent_obj.get_organization() + + + + if _obj_organization: + + self._obj_organization = _obj_organization + + return self._obj_organization + + + + def get_parent_model(self): + """Get the Parent Model + + This function exists so that dynamic parent models can be defined. + They are defined by overriding this method. + + Returns: + Model: Parent Model + """ + + return self.parent_model + + + + def get_parent_obj(self): + """ Get the Parent Model Object + + Use in views where the the model has no organization and the organization should be fetched from the parent model. + + Requires attribute `parent_model` within the view with the value of the parent's model class + + Returns: + parent_model (Model): with PK from kwargs['pk'] + """ + + return self.parent_model.objects.get(pk=self.kwargs[self.parent_model_pk_kwarg]) + + + + def get_permission_organizations(self, permission: str ) -> list([ int ]): + """Return Organization(s) the permission belongs to + + Searches the users organizations for the required permission, if found + the organization is added to the list to return. + + Args: + permission (str): Permission to search users organizations for + + Returns: + Organizations (list): All Organizations where the permission was found. + """ + + _permission_organizations: list = [] + + for team in self.get_user_teams( self.request.user ): + + for team_permission in team.permissions.all(): + + permission_value = str( team_permission.content_type.app_label + '.' + team_permission.codename ) + + if permission_value == permission: + + _permission_organizations += [ team.organization.id ] + + + return _permission_organizations + + + + def get_permission_required(self) -> str: + """ Get / Generate Permission Required + + If there is a requirement that there be custom/dynamic permissions, + this function can be safely overridden. + + Raises: + ValueError: Unable to determin the view action + + Returns: + str: Permission in format `._` + """ + + + view_action: str = None + + if( + self.action == 'create' + or getattr(self.request._stream, 'method', '') == 'POST' + ): + + view_action = 'add' + + elif ( + self.action == 'partial_update' + or self.action == 'update' + or getattr(self.request._stream, 'method', '') == 'PATCH' + or getattr(self.request._stream, 'method', '') == 'PUT' + ): + + view_action = 'change' + + elif( + self.action == 'destroy' + or getattr(self.request._stream, 'method', '') == 'DELETE' + ): + + view_action = 'delete' + + elif ( + self.action == 'list' + ): + + view_action = 'view' + + elif self.action == 'retrieve': + + view_action = 'view' + + + + if view_action is None: + + raise ValueError('view_action could not be defined.') + + + permission = self.model._meta.app_label + '.' + view_action + '_' + self.model._meta.model_name + + permission_required = permission + + + self.permission_required = permission_required + + return self.permission_required + + + + parent_model: str = None + """ Parent Model + + This attribute defines the parent model for the model in question. The parent model when defined + will be used as the object to obtain the permissions from. + """ + + + parent_model_pk_kwarg: str = 'pk' + """Parent Model kwarg + + This value is used to define the kwarg that is used as the parent objects primary key (pk). + """ + + + _user_organizations: list = None + """Cached User Organizations""" + + + _user_teams: list = None + """Cached User Teams""" + + + _user_permissions: list = None + """Cached User User Permissions""" + + + def get_user_organizations(self, user: User) -> list([int]): + """Get the Organization the user is a part of + + Args: + user (User): User Making the request + + Returns: + list(int()): List containing the organizations the user is a part of. + """ + + if self._user_organizations and self._user_teams and self._user_permissions: + + return self._user_organizations + + + teams = Team.objects.all() + + _user_organizations: list([ int ]) = [] + + _user_teams: list([ Team ]) = [] + + _user_permissions: list([ str ]) = [] + + for group in user.groups.all(): + + team = teams.get(pk=group.id) + + if team not in _user_teams: + + _user_teams += [ team ] + + for permission in team.permissions.all(): + + permission_value = str( permission.content_type.app_label + '.' + permission.codename ) + + if permission_value not in _user_permissions: + + _user_permissions += [ permission_value ] + + + if team.organization.id not in _user_organizations: + + _user_organizations += [ team.organization.id ] + + + if len(_user_organizations) > 0: + + self._user_organizations = _user_organizations + + if len(_user_teams) > 0: + + self._user_teams = _user_teams + + if len(_user_permissions) > 0: + + self._user_permissions = _user_permissions + + + return self._user_organizations + + + + def get_user_teams(self, user: User) -> list([ Team ]): + + if not self._user_teams: + + self.get_user_organizations( user = user ) + + return self._user_teams + + + + def has_organization_permission(self, organization: int, permissions_required: list) -> bool: + """ Check if user has permission within organization. + + Args: + organization (int): Organization to check. + permissions_required (list): if doing object level permissions, pass in required permission. + + Returns: + bool: True for yes. + """ + + has_permission: bool = False + + for team in self.get_user_teams( user = self.request.user ): + + if team.organization.id == int(organization): + + for permission in team.permissions.all(): + + assembled_permission = str(permission.content_type.app_label) + '.' + str( permission.codename ) + + if assembled_permission in permissions_required: + + has_permission = True + + + return has_permission + + + + def is_member(self, organization: int) -> bool: + """Returns true if the current user is a member of the organization + + iterates over the user_organizations list and returns true if the user is a member + + Returns: + bool: _description_ + """ + + is_member: bool = False + + if organization is None: + + return False + + if int(organization) in self.get_user_organizations(self.request.user): + + is_member = True + + return is_member diff --git a/app/access/mixins/permissions.py b/app/access/mixins/permissions.py new file mode 100644 index 000000000..b6d7630b5 --- /dev/null +++ b/app/access/mixins/permissions.py @@ -0,0 +1,226 @@ +import traceback + +from django.core.exceptions import ObjectDoesNotExist + +from rest_framework import exceptions +from rest_framework.permissions import DjangoObjectPermissions + +from access.models import TenancyObject + +from core import exceptions as centurion_exceptions + + + +class OrganizationPermissionMixin( + DjangoObjectPermissions, +): + """Organization Permission Mixin + + This class is to be used as the permission class for API `Views`/`ViewSets`. + In combination with the `OrganizationPermissionsMixin`, permission checking + will be done to ensure the user has the correct permissions to perform the + CRUD operation. + + **Note:** If the user is not authenticated, they will be denied access + globally. + + Permissions are broken down into two areas: + + - `Tenancy` Objects + + This object requires that the user have the correct permission and that + permission be assigned within the organiztion the object belongs to. + + - `Non-Tenancy` Objects. + + This object requires the the use have the correct permission assigned, + regardless of the organization the object is from. This includes objects + that have no organization. + + """ + + _is_tenancy_model: bool = None + + def is_tenancy_model(self, view) -> bool: + """Determin if the Model is a `Tenancy` Model + + Will look at the model defined within the view unless a parent + model is found. If the latter is true, the parent_model will be used to + determin if the model is a `Tenancy` model + + Args: + view (object): The View the HTTP request was mad to + + Returns: + True (bool): Model is a Tenancy Model. + False (bool): Model is not a Tenancy model. + """ + + if not self._is_tenancy_model: + + if hasattr(view, 'model'): + + self._is_tenancy_model = issubclass(view.model, TenancyObject) + + if view.get_parent_model(): + + self._is_tenancy_model = issubclass(view.get_parent_model(), TenancyObject) + + return self._is_tenancy_model + + + + def has_permission(self, request, view): + """ Check if user has the required permission + + Args: + request (object): The HTTP Request Object + view (_type_): The View/Viewset Object the request was made to + + Raises: + ValueError: Could not determin the view action. + + Returns: + True (bool): User has the required permission. + False (bool): User does not have the required permission + """ + + if request.user.is_anonymous: + + return False + + try: + + + view.get_user_organizations( request.user ) + + obj_organization: Organization = view.get_obj_organization( + request = request + ) + + view_action: str = None + + if( + view.action == 'create' + or getattr(view.request._stream, 'method', '') == 'POST' + ): + + view_action = 'add' + + elif ( + view.action == 'partial_update' + or view.action == 'update' + or getattr(view.request._stream, 'method', '') == 'PATCH' + or getattr(view.request._stream, 'method', '') == 'PUT' + ): + + view_action = 'change' + + obj_organization: Organization = view.get_obj_organization( + obj = view.get_object() + ) + + elif( + view.action == 'destroy' + or getattr(view.request._stream, 'method', '') == 'DELETE' + ): + + view_action = 'delete' + + obj_organization: Organization = view.get_obj_organization( + obj = view.get_object() + ) + + elif ( + view.action == 'list' + ): + + view_action = 'view' + + elif view.action == 'retrieve': + + view_action = 'view' + + obj_organization: Organization = view.get_obj_organization( + obj = view.get_object() + ) + + elif view.action == 'metadata': + + return True + + + if view_action is None: + + raise ValueError('view_action could not be defined.') + + + has_permission_required: bool = False + + if getattr(view, '_user_permissions', []): + + has_permission_required = view.get_permission_required() in getattr(view, '_user_permissions', []) + + + + if has_permission_required is True: + + if obj_organization is None: + + return True + + elif obj_organization is not None: + + if view.has_organization_permission( + organization = obj_organization.id, + permissions_required = [ view.get_permission_required() ] + ): + + return True + + + except ValueError: + + pass + + except Exception as e: + + print(traceback.format_exc()) + + + return False + + + + def has_object_permission(self, request, view, obj): + + try: + + if request.user.is_anonymous: + + return False + + + object_organization: int = getattr(view.get_obj_organization( obj = obj ), 'id', None) + + + if object_organization: + + if( + object_organization + in view.get_permission_organizations( view.get_permission_required() ) + ): + + return True + + + elif not self.is_tenancy_model( view ): + + return True + + + except Exception as e: + + print(traceback.format_exc()) + + return False diff --git a/app/access/tests/functional/organization/test_organization_viewset.py b/app/access/tests/functional/organization/test_organization_viewset.py index e0d433f51..f60f5bb56 100644 --- a/app/access/tests/functional/organization/test_organization_viewset.py +++ b/app/access/tests/functional/organization/test_organization_viewset.py @@ -217,7 +217,7 @@ def test_add_has_permission(self): url = reverse( self.app_namespace + ':' + self.url_name + '-list' ) - client.force_login( self.super_add_user ) + client.force_login( self.add_user ) response = client.post( url, data = self.add_data ) @@ -271,6 +271,17 @@ def test_returned_results_only_user_orgs(self): assert len(response.data['results']) == 2 + def test_add_different_organization_denied(self): + """ Check correct permission for add + + This test is a duplicate of a test case with the same name. + Organizations are not tenancy models so this test does nothing of value + + attempt to add as user from different organization + """ + + pass + class OrganizationViewSet( ViewSetBase, diff --git a/app/access/viewsets/team_user.py b/app/access/viewsets/team_user.py index 7cc9c6a52..f9a8dbc2e 100644 --- a/app/access/viewsets/team_user.py +++ b/app/access/viewsets/team_user.py @@ -145,6 +145,10 @@ class ViewSet( ModelViewSet ): model = TeamUsers + parent_model = Team + + parent_model_pk_kwarg = 'team_id' + documentation: str = '' view_description = 'Users belonging to a single team' diff --git a/app/api/tests/abstract/viewsets.py b/app/api/tests/abstract/viewsets.py index 48d4bf9ef..905c9d12a 100644 --- a/app/api/tests/abstract/viewsets.py +++ b/app/api/tests/abstract/viewsets.py @@ -1,5 +1,6 @@ +from access.mixins.permissions import OrganizationPermissionMixin + from api.react_ui_metadata import ReactUIMetadata -from api.views.mixin import OrganizationPermissionAPI @@ -145,7 +146,7 @@ def test_view_attr_permission_classes_value(self): view_set = self.viewset() - assert view_set.permission_classes[0] is OrganizationPermissionAPI + assert view_set.permission_classes[0] is OrganizationPermissionMixin assert len(view_set.permission_classes) == 1 diff --git a/app/api/viewsets/common.py b/app/api/viewsets/common.py index 793bc08b6..a8ace3a69 100644 --- a/app/api/viewsets/common.py +++ b/app/api/viewsets/common.py @@ -5,11 +5,279 @@ from rest_framework.permissions import IsAuthenticated from rest_framework.response import Response -from access.mixin import OrganizationMixin +from access.mixins.organization import OrganizationMixin +from access.mixins.permissions import OrganizationPermissionMixin from api.auth import TokenScheme from api.react_ui_metadata import ReactUIMetadata -from api.views.mixin import OrganizationPermissionAPI + + + +class Create( + viewsets.mixins.CreateModelMixin +): + + + def create(self, request, *args, **kwargs): + """Sainty override + + This function overrides the function of the same name + in the parent class for the purpose of ensuring a + non-api exception will not have the API return a HTTP + 500 error. + + This function is a sanity check that if it triggers, + (an exception occured), the user will be presented with + a stack trace that they will hopefully report as a bug. + + HTTP status set to HTTP/501 so it's distinguishable from + a HTTP/500 which is generally a random error that has not + been planned for. i.e. uncaught exception + """ + + response = None + + try: + + response = super().create(request = request, *args, **kwargs) + + except Exception as e: + + if not isinstance(e, APIException): + + response = Response( + data = { + 'server_error': str(e) + }, + status = 501 + ) + + return response + + + +class Destroy( + viewsets.mixins.DestroyModelMixin +): + + + def destroy(self, request, *args, **kwargs): + """Sainty override + + This function overrides the function of the same name + in the parent class for the purpose of ensuring a + non-api exception will not have the API return a HTTP + 500 error. + + This function is a sanity check that if it triggers, + (an exception occured), the user will be presented with + a stack trace that they will hopefully report as a bug. + + HTTP status set to HTTP/501 so it's distinguishable from + a HTTP/500 which is generally a random error that has not + been planned for. i.e. uncaught exception + """ + + response = None + + try: + + response = super().destroy(request = request, *args, **kwargs) + + except Exception as e: + + if not isinstance(e, APIException): + + response = Response( + data = { + 'server_error': str(e) + }, + status = 501 + ) + + return response + + + + +class List( + viewsets.mixins.ListModelMixin +): + + + def list(self, request, *args, **kwargs): + """Sainty override + + This function overrides the function of the same name + in the parent class for the purpose of ensuring a + non-api exception will not have the API return a HTTP + 500 error. + + This function is a sanity check that if it triggers, + (an exception occured), the user will be presented with + a stack trace that they will hopefully report as a bug. + + HTTP status set to HTTP/501 so it's distinguishable from + a HTTP/500 which is generally a random error that has not + been planned for. i.e. uncaught exception + """ + + response = None + + try: + + response = super().list(request = request, *args, **kwargs) + + except Exception as e: + + if not isinstance(e, APIException): + + response = Response( + data = { + 'server_error': str(e) + }, + status = 501 + ) + + return response + + +# class PartialUpdate: + + + + +class Retrieve( + viewsets.mixins.RetrieveModelMixin +): + + + def retrieve(self, request, *args, **kwargs): + """Sainty override + + This function overrides the function of the same name + in the parent class for the purpose of ensuring a + non-api exception will not have the API return a HTTP + 500 error. + + This function is a sanity check that if it triggers, + (an exception occured), the user will be presented with + a stack trace that they will hopefully report as a bug. + + HTTP status set to HTTP/501 so it's distinguishable from + a HTTP/500 which is generally a random error that has not + been planned for. i.e. uncaught exception + """ + + response = None + + try: + + response = super().retrieve(request = request, *args, **kwargs) + + except Exception as e: + + if not isinstance(e, APIException): + + response = Response( + data = { + 'server_error': str(e) + }, + status = 501 + ) + + else: + + ex = e.get_full_details() + + response = Response( + data = { + 'message': ex['message'] + }, + status = e.status_code + ) + + return response + + + +class Update( + viewsets.mixins.UpdateModelMixin +): + + + def partial_update(self, request, *args, **kwargs): + """Sainty override + + This function overrides the function of the same name + in the parent class for the purpose of ensuring a + non-api exception will not have the API return a HTTP + 500 error. + + This function is a sanity check that if it triggers, + (an exception occured), the user will be presented with + a stack trace that they will hopefully report as a bug. + + HTTP status set to HTTP/501 so it's distinguishable from + a HTTP/500 which is generally a random error that has not + been planned for. i.e. uncaught exception + """ + + response = None + + try: + + response = super().partial_update(request = request, *args, **kwargs) + + except Exception as e: + + if not isinstance(e, APIException): + + response = Response( + data = { + 'server_error': str(e) + }, + status = 501 + ) + + return response + + + def update(self, request, *args, **kwargs): + """Sainty override + + This function overrides the function of the same name + in the parent class for the purpose of ensuring a + non-api exception will not have the API return a HTTP + 500 error. + + This function is a sanity check that if it triggers, + (an exception occured), the user will be presented with + a stack trace that they will hopefully report as a bug. + + HTTP status set to HTTP/501 so it's distinguishable from + a HTTP/500 which is generally a random error that has not + been planned for. i.e. uncaught exception + """ + + response = None + + try: + + response = super().update(request = request, *args, **kwargs) + + except Exception as e: + + if not isinstance(e, APIException): + + response = Response( + data = { + 'server_error': str(e) + }, + status = 501 + ) + + return response @@ -65,7 +333,7 @@ def allowed_methods(self): for detail view, Enables the UI can setup the page layout. """ - permission_classes = [ OrganizationPermissionAPI ] + permission_classes = [ OrganizationPermissionMixin ] """Permission Class _Mandatory_, Permission check class @@ -268,372 +536,58 @@ def get_serializer_class(self): class ModelViewSet( ModelViewSetBase, + Retrieve, viewsets.ModelViewSet, ): - def retrieve(self, request, *args, **kwargs): - """Sainty override - - This function overrides the function of the same name - in the parent class for the purpose of ensuring a - non-api exception will not have the API return a HTTP - 500 error. - - This function is a sanity check that if it triggers, - (an exception occured), the user will be presented with - a stack trace that they will hopefully report as a bug. - - HTTP status set to HTTP/501 so it's distinguishable from - a HTTP/500 which is generally a random error that has not - been planned for. i.e. uncaught exception - """ - - response = None - - try: - - response = super().retrieve(request = request, *args, **kwargs) - - except Exception as e: - - if not isinstance(e, APIException): - - response = Response( - data = { - 'server_error': str(e) - }, - status = 501 - ) - - return response + pass class ModelCreateViewSet( ModelViewSetBase, - viewsets.mixins.CreateModelMixin, + Create, + viewsets.GenericViewSet, ): - def create(self, request, *args, **kwargs): - """Sainty override - - This function overrides the function of the same name - in the parent class for the purpose of ensuring a - non-api exception will not have the API return a HTTP - 500 error. - - This function is a sanity check that if it triggers, - (an exception occured), the user will be presented with - a stack trace that they will hopefully report as a bug. - - HTTP status set to HTTP/501 so it's distinguishable from - a HTTP/500 which is generally a random error that has not - been planned for. i.e. uncaught exception - """ - - response = None - - try: - - response = super().create(request = request, *args, **kwargs) - - except Exception as e: - - if not isinstance(e, APIException): - - response = Response( - data = { - 'server_error': str(e) - }, - status = 501 - ) - - return response + pass class ModelListRetrieveDeleteViewSet( - viewsets.mixins.ListModelMixin, - viewsets.mixins.RetrieveModelMixin, - viewsets.mixins.DestroyModelMixin, + ModelViewSetBase, + List, + Retrieve, + Destroy, viewsets.GenericViewSet, - ModelViewSetBase ): """ Use for models that you wish to delete and view ONLY!""" - def list(self, request, *args, **kwargs): - """Sainty override - - This function overrides the function of the same name - in the parent class for the purpose of ensuring a - non-api exception will not have the API return a HTTP - 500 error. - - This function is a sanity check that if it triggers, - (an exception occured), the user will be presented with - a stack trace that they will hopefully report as a bug. - - HTTP status set to HTTP/501 so it's distinguishable from - a HTTP/500 which is generally a random error that has not - been planned for. i.e. uncaught exception - """ - - response = None - - try: - - response = super().list(request = request, *args, **kwargs) - - except Exception as e: - - if not isinstance(e, APIException): - - response = Response( - data = { - 'server_error': str(e) - }, - status = 501 - ) - - return response - - - def retrieve(self, request, *args, **kwargs): - """Sainty override - - This function overrides the function of the same name - in the parent class for the purpose of ensuring a - non-api exception will not have the API return a HTTP - 500 error. - - This function is a sanity check that if it triggers, - (an exception occured), the user will be presented with - a stack trace that they will hopefully report as a bug. - - HTTP status set to HTTP/501 so it's distinguishable from - a HTTP/500 which is generally a random error that has not - been planned for. i.e. uncaught exception - """ - - response = None - - try: - - response = super().retrieve(request = request, *args, **kwargs) - - except Exception as e: - - if not isinstance(e, APIException): - - response = Response( - data = { - 'server_error': str(e) - }, - status = 501 - ) - - return response - - - def destroy(self, request, *args, **kwargs): - """Sainty override - - This function overrides the function of the same name - in the parent class for the purpose of ensuring a - non-api exception will not have the API return a HTTP - 500 error. - - This function is a sanity check that if it triggers, - (an exception occured), the user will be presented with - a stack trace that they will hopefully report as a bug. - - HTTP status set to HTTP/501 so it's distinguishable from - a HTTP/500 which is generally a random error that has not - been planned for. i.e. uncaught exception - """ - - response = None - - try: - - response = super().destroy(request = request, *args, **kwargs) - - except Exception as e: - - if not isinstance(e, APIException): - - response = Response( - data = { - 'server_error': str(e) - }, - status = 501 - ) - - return response + pass class ModelRetrieveUpdateViewSet( - viewsets.mixins.RetrieveModelMixin, - viewsets.mixins.UpdateModelMixin, + ModelViewSetBase, + Retrieve, + Update, viewsets.GenericViewSet, - ModelViewSetBase ): """ Use for models that you wish to update and view ONLY!""" - - def partial_update(self, request, *args, **kwargs): - """Sainty override - - This function overrides the function of the same name - in the parent class for the purpose of ensuring a - non-api exception will not have the API return a HTTP - 500 error. - - This function is a sanity check that if it triggers, - (an exception occured), the user will be presented with - a stack trace that they will hopefully report as a bug. - - HTTP status set to HTTP/501 so it's distinguishable from - a HTTP/500 which is generally a random error that has not - been planned for. i.e. uncaught exception - """ - - response = None - - try: - - response = super().partial_update(request = request, *args, **kwargs) - - except Exception as e: - - if not isinstance(e, APIException): - - response = Response( - data = { - 'server_error': str(e) - }, - status = 501 - ) - - return response - - - def update(self, request, *args, **kwargs): - """Sainty override - - This function overrides the function of the same name - in the parent class for the purpose of ensuring a - non-api exception will not have the API return a HTTP - 500 error. - - This function is a sanity check that if it triggers, - (an exception occured), the user will be presented with - a stack trace that they will hopefully report as a bug. - - HTTP status set to HTTP/501 so it's distinguishable from - a HTTP/500 which is generally a random error that has not - been planned for. i.e. uncaught exception - """ - - response = None - - try: - - response = super().update(request = request, *args, **kwargs) - - except Exception as e: - - if not isinstance(e, APIException): - - response = Response( - data = { - 'server_error': str(e) - }, - status = 501 - ) - - return response + pass class ReadOnlyModelViewSet( - viewsets.ReadOnlyModelViewSet, - ModelViewSetBase + ModelViewSetBase, + Retrieve, + List, + viewsets.GenericViewSet, ): - def retrieve(self, request, *args, **kwargs): - """Sainty override - - This function overrides the function of the same name - in the parent class for the purpose of ensuring a - non-api exception will not have the API return a HTTP - 500 error. - - This function is a sanity check that if it triggers, - (an exception occured), the user will be presented with - a stack trace that they will hopefully report as a bug. - - HTTP status set to HTTP/501 so it's distinguishable from - a HTTP/500 which is generally a random error that has not - been planned for. i.e. uncaught exception - """ - - response = None - - try: - - response = super().retrieve(request = request, *args, **kwargs) - - except Exception as e: - - if not isinstance(e, APIException): - - response = Response( - data = { - 'server_error': str(e) - }, - status = 501 - ) - - return response - - def list(self, request, *args, **kwargs): - """Sainty override - - This function overrides the function of the same name - in the parent class for the purpose of ensuring a - non-api exception will not have the API return a HTTP - 500 error. - - This function is a sanity check that if it triggers, - (an exception occured), the user will be presented with - a stack trace that they will hopefully report as a bug. - - HTTP status set to HTTP/501 so it's distinguishable from - a HTTP/500 which is generally a random error that has not - been planned for. i.e. uncaught exception - """ - - response = None - - try: - - response = super().list(request = request, *args, **kwargs) - - except Exception as e: - - if not isinstance(e, APIException): - - response = Response( - data = { - 'server_error': str(e) - }, - status = 501 - ) - - return response + pass diff --git a/app/core/tests/functional/test_history/test_history_viewset.py b/app/core/tests/functional/test_history/test_history_viewset.py index fce8c102f..4522f54d9 100644 --- a/app/core/tests/functional/test_history/test_history_viewset.py +++ b/app/core/tests/functional/test_history/test_history_viewset.py @@ -263,7 +263,7 @@ def test_view_has_permission(self): client.force_login(self.view_user) response = client.get(url) - assert response.status_code == 403 + assert response.status_code == 200 def test_add_has_permission_method_not_allowed(self): @@ -340,6 +340,22 @@ def test_returned_results_only_user_orgs(self): pass + # item is not tenancy object + def test_view_different_organizaiton_denied(self): + """ Check correct permission for view + + This test case is a duplicate of a test case with the same name. This + test is not required as currently the history model is not a tenancy + model. + + see https://github.com/nofusscomputing/centurion_erp/issues/455 for + more details. + + Attempt to view with user from different organization + """ + + pass + class HistoryMetadata( ViewSetBase, diff --git a/app/core/viewsets/history.py b/app/core/viewsets/history.py index 27fabeda1..e99616709 100644 --- a/app/core/viewsets/history.py +++ b/app/core/viewsets/history.py @@ -2,7 +2,7 @@ from drf_spectacular.utils import extend_schema, extend_schema_view, OpenApiResponse -from api.viewsets.common import ModelViewSet +from api.viewsets.common import ReadOnlyModelViewSet from core.serializers.history import ( History, @@ -27,7 +27,7 @@ update = extend_schema( exclude = True ), partial_update = extend_schema( exclude = True ) ) -class ViewSet(ModelViewSet): +class ViewSet(ReadOnlyModelViewSet): allowed_methods = [ 'GET', diff --git a/app/core/viewsets/related_ticket.py b/app/core/viewsets/related_ticket.py index f4149b28b..3e683bdef 100644 --- a/app/core/viewsets/related_ticket.py +++ b/app/core/viewsets/related_ticket.py @@ -7,6 +7,7 @@ from api.viewsets.common import ModelListRetrieveDeleteViewSet from core.serializers.ticket_related import ( + Ticket, RelatedTickets, RelatedTicketModelSerializer, RelatedTicketViewSerializer, @@ -79,6 +80,10 @@ class ViewSet(ModelListRetrieveDeleteViewSet): model = RelatedTickets + parent_model = Ticket + + parent_model_pk_kwarg = 'ticket_id' + def get_serializer_class(self): diff --git a/app/core/viewsets/ticket.py b/app/core/viewsets/ticket.py index 99ad18ad5..f8469c1b8 100644 --- a/app/core/viewsets/ticket.py +++ b/app/core/viewsets/ticket.py @@ -86,7 +86,7 @@ class TicketViewSet(ModelViewSet): """ - def get_dynamic_permissions(self): + def get_permission_required(self): organization = None @@ -178,11 +178,12 @@ def get_dynamic_permissions(self): raise ValueError('unable to determin the action_keyword') - self.permission_required = [ - str('core.' + action_keyword + '_ticket_' + self._ticket_type).lower().replace(' ', '_'), - ] + self.permission_required = str( + 'core.' + action_keyword + '_ticket_' + self._ticket_type).lower().replace(' ', '_' + ) + + return self.permission_required - return super().get_permission_required() def get_queryset(self): diff --git a/app/core/viewsets/ticket_linked_item.py b/app/core/viewsets/ticket_linked_item.py index 1bac747f6..9183bb46e 100644 --- a/app/core/viewsets/ticket_linked_item.py +++ b/app/core/viewsets/ticket_linked_item.py @@ -127,86 +127,103 @@ class ViewSet(ModelViewSet): model = TicketLinkedItem - def get_serializer_class(self): - if ( - self.action == 'list' - or self.action == 'retrieve' - ): + def get_parent_model(self): - return globals()[str( self.model._meta.verbose_name).replace(' ', '') + 'ViewSerializer'] + if not self.parent_model: + if 'ticket_id' in self.kwargs: - return globals()[str( self.model._meta.verbose_name).replace(' ', '') + 'ModelSerializer'] + self.parent_model = Ticket + self.parent_model_pk_kwarg = 'ticket_id' - def get_queryset(self): + elif 'item_id' in self.kwargs: - if 'ticket_id' in self.kwargs: + item_type: int = None - self.queryset = TicketLinkedItem.objects.filter(ticket=self.kwargs['ticket_id']).order_by('id') + self.parent_model_pk_kwarg = 'item_id' - self.parent_model = Ticket + for choice in list(map(lambda c: c.name, TicketLinkedItem.Modules)): - self.parent_model_pk_kwarg = 'ticket_id' + if str(getattr(TicketLinkedItem.Modules, 'CLUSTER').label).lower() == self.kwargs['item_class']: - elif 'item_id' in self.kwargs: + item_type = getattr(TicketLinkedItem.Modules, 'CLUSTER').value + + self.parent_model = Cluster + + elif str(getattr(TicketLinkedItem.Modules, 'CONFIG_GROUP').label).lower().replace(' ', '_') == self.kwargs['item_class']: + + item_type = getattr(TicketLinkedItem.Modules, 'CONFIG_GROUP').value + + self.parent_model = ConfigGroups + + elif str(getattr(TicketLinkedItem.Modules, 'DEVICE').label).lower() == self.kwargs['item_class']: + + item_type = getattr(TicketLinkedItem.Modules, 'DEVICE').value + + self.parent_model = Device - item_type: int = None + elif str(getattr(TicketLinkedItem.Modules, 'KB').label).lower().replace(' ', '_') == self.kwargs['item_class']: - self.parent_model_pk_kwarg = 'item_id' + item_type = getattr(TicketLinkedItem.Modules, 'KB').value - for choice in list(map(lambda c: c.name, TicketLinkedItem.Modules)): + self.parent_model = KnowledgeBase - if str(getattr(TicketLinkedItem.Modules, 'CLUSTER').label).lower() == self.kwargs['item_class']: + elif str(getattr(TicketLinkedItem.Modules, 'OPERATING_SYSTEM').label).lower().replace(' ', '_') == self.kwargs['item_class']: - item_type = getattr(TicketLinkedItem.Modules, 'CLUSTER').value + item_type = getattr(TicketLinkedItem.Modules, 'OPERATING_SYSTEM').value - self.parent_model = Cluster + self.parent_model = OperatingSystem - elif str(getattr(TicketLinkedItem.Modules, 'CONFIG_GROUP').label).lower().replace(' ', '_') == self.kwargs['item_class']: + elif str(getattr(TicketLinkedItem.Modules, 'SERVICE').label).lower() == self.kwargs['item_class']: - item_type = getattr(TicketLinkedItem.Modules, 'CONFIG_GROUP').value + item_type = getattr(TicketLinkedItem.Modules, 'SERVICE').value - self.parent_model = ConfigGroups + self.parent_model = Service - elif str(getattr(TicketLinkedItem.Modules, 'DEVICE').label).lower() == self.kwargs['item_class']: + elif str(getattr(TicketLinkedItem.Modules, 'SOFTWARE').label).lower() == self.kwargs['item_class']: - item_type = getattr(TicketLinkedItem.Modules, 'DEVICE').value + item_type = getattr(TicketLinkedItem.Modules, 'SOFTWARE').value - self.parent_model = Device + self.parent_model = Software - elif str(getattr(TicketLinkedItem.Modules, 'KB').label).lower().replace(' ', '_') == self.kwargs['item_class']: - item_type = getattr(TicketLinkedItem.Modules, 'KB').value + self.item_type = item_type - self.parent_model = KnowledgeBase - elif str(getattr(TicketLinkedItem.Modules, 'OPERATING_SYSTEM').label).lower().replace(' ', '_') == self.kwargs['item_class']: + return self.parent_model - item_type = getattr(TicketLinkedItem.Modules, 'OPERATING_SYSTEM').value - self.parent_model = OperatingSystem - elif str(getattr(TicketLinkedItem.Modules, 'SERVICE').label).lower() == self.kwargs['item_class']: + def get_serializer_class(self): + + if ( + self.action == 'list' + or self.action == 'retrieve' + ): + + return globals()[str( self.model._meta.verbose_name).replace(' ', '') + 'ViewSerializer'] + - item_type = getattr(TicketLinkedItem.Modules, 'SERVICE').value + return globals()[str( self.model._meta.verbose_name).replace(' ', '') + 'ModelSerializer'] - self.parent_model = Service - elif str(getattr(TicketLinkedItem.Modules, 'SOFTWARE').label).lower() == self.kwargs['item_class']: - item_type = getattr(TicketLinkedItem.Modules, 'SOFTWARE').value + def get_queryset(self): + + if 'ticket_id' in self.kwargs: + + self.queryset = TicketLinkedItem.objects.filter(ticket=self.kwargs['ticket_id']).order_by('id') + + elif 'item_id' in self.kwargs: - self.parent_model = Software self.queryset = TicketLinkedItem.objects.filter( item=int(self.kwargs['item_id']), - item_type = item_type + item_type = self.item_type ) - self.item_type = item_type - if 'pk' in self.kwargs: self.queryset = self.queryset.filter(pk = self.kwargs['pk'])