Skip to content

Commit

Permalink
[#Fixes #9944] Unable to clone a resource (#9948) (#9968)
Browse files Browse the repository at this point in the history
* [#Fixes #9944] Unable to clone a resource

* [#Fixes #9944] variable rename

Co-authored-by: mattiagiupponi <51856725+mattiagiupponi@users.noreply.github.com>
  • Loading branch information
github-actions[bot] and mattiagiupponi authored Sep 6, 2022
1 parent ec77b38 commit bcc8e8a
Show file tree
Hide file tree
Showing 3 changed files with 107 additions and 8 deletions.
26 changes: 19 additions & 7 deletions geonode/base/api/permissions.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import logging
from django.conf import settings
from django.contrib.auth import get_user_model
from django.shortcuts import get_object_or_404

from rest_framework import permissions
from rest_framework.filters import BaseFilterBackend
Expand All @@ -30,7 +31,6 @@
get_visible_resources)
from geonode.groups.models import GroupProfile
from rest_framework.permissions import DjangoModelPermissions
from rest_framework.exceptions import NotFound
from guardian.shortcuts import get_objects_for_user
from itertools import chain
from guardian.shortcuts import get_groups_with_perms
Expand Down Expand Up @@ -227,20 +227,28 @@ class UserHasPerms(DjangoModelPermissions):
'DELETE': [f'base.{x}' for x in BASIC_MANAGE_PERMISSIONS],
}

def __init__(self, perms_dict={}):
self.perms_dict = perms_dict

def __call__(self):
return self

def has_permission(self, request, view):
from geonode.base.models import ResourceBase

queryset = self._queryset(view)
perms = self.get_required_permissions(request.method, queryset.model)
perms = self.perms_dict.get(request.method, None) or self.get_required_permissions(request.method, queryset.model)

if request.user.is_superuser:
return True

if view.kwargs.get('pk'):
# if a single resource is called, we check the perms for that resource
res = ResourceBase.objects.filter(pk=view.kwargs.get('pk')).first()
if not res:
raise NotFound
res = get_object_or_404(ResourceBase, pk=view.kwargs.get('pk'))
# if the request is for a single resource, we take the specific or the default. If none is defined we keep the original one defined above
resource_type_specific_perms = self.perms_dict.get(res.get_real_instance().resource_type, self.perms_dict.get('default', {}))
perms = resource_type_specific_perms.get(request.method, []) or perms

# getting the user permission for that resource
resource_perms = list(res.get_user_perms(request.user))
groups = get_groups_with_perms(res, attach_perms=True)
Expand All @@ -249,19 +257,23 @@ def has_permission(self, request, view):
# checking if the user is in that group
if group.user_set.filter(username=request.user).exists():
resource_perms = list(chain(resource_perms, perm))

if request.user.has_perm('base.add_resourcebase'):
resource_perms.append('add_resourcebase')
# merging all available permissions into a single list
available_perms = list(set(resource_perms))
# fixup the permissions name
perms_without_base = [x.replace('base.', '') for x in perms]
# if at least one of the permissions is available the request is True
return any([_perm in available_perms for _perm in perms_without_base])
rule = resource_type_specific_perms.get("rule", any)
return rule([_perm in available_perms for _perm in perms_without_base])

if not get_visible_resources(
queryset,
request.user if request else None,
admin_approval_required=settings.ADMIN_MODERATE_UPLOADS,
unpublished_not_visible=settings.RESOURCE_PUBLISHING,
private_groups_not_visibile=settings.GROUP_PRIVATE_RESOURCES):
private_groups_not_visibile=settings.GROUP_PRIVATE_RESOURCES).exists():
# there are not resource in the db, needed usually for fresh installations
return request.method in permissions.SAFE_METHODS
# check if the user have one of the perms in all the resource available
Expand Down
75 changes: 75 additions & 0 deletions geonode/base/api/tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -2396,6 +2396,81 @@ def test_resource_service_copy(self):
# clean
resource.delete()

def test_resource_service_copy_with_perms_dataset(self):
files = os.path.join(gisdata.GOOD_DATA, "vector/san_andres_y_providencia_water.shp")
files_as_dict, _ = get_files(files)
resource = Dataset.objects.create(
owner=get_user_model().objects.get(username='admin'),
name='test_copy',
store='geonode_data',
subtype="vector",
alternate="geonode:test_copy",
resource_type="dataset",
uuid=str(uuid4()),
files=list(files_as_dict.values())
)
self._assertCloningWithPerms(resource)

def test_resource_service_copy_with_perms_doc(self):
files = os.path.join(gisdata.GOOD_DATA, "vector/san_andres_y_providencia_water.shp")
files_as_dict, _ = get_files(files)
resource = Document.objects.create(
owner=get_user_model().objects.get(username='admin'),
subtype="vector",
alternate="geonode:test_copy",
resource_type="document",
uuid=str(uuid4()),
files=list(files_as_dict.values())
)

self._assertCloningWithPerms(resource)

def test_resource_service_copy_with_perms_map(self):
files = os.path.join(gisdata.GOOD_DATA, "vector/san_andres_y_providencia_water.shp")
files_as_dict, _ = get_files(files)
resource = Document.objects.create(
owner=get_user_model().objects.get(username='admin'),
alternate="geonode:test_copy",
resource_type="map",
uuid=str(uuid4()),
files=list(files_as_dict.values())
)

self._assertCloningWithPerms(resource)

def _assertCloningWithPerms(self, resource):
# login as bobby
self.assertTrue(self.client.login(username="bobby", password="bob"))

# bobby cannot copy the resource since he doesnt have all the perms needed
_perms = {
'users': {
"bobby": ['base.add_resourcebase']
},
"groups": {
"anonymous": []
}
}
resource.set_permissions(_perms)
copy_url = reverse('base-resources-resource-service-copy', kwargs={'pk': resource.pk})
response = self.client.put(copy_url, data={'title': 'cloned_resource'})
self.assertEqual(response.status_code, 403)
# set perms to enable user clone resource
# bobby can copy the resource since he has all the perms needed
_perms = {
'users': {
"bobby": ['base.add_resourcebase', 'base.download_resourcebase']
},
"groups": {
"anonymous": ["base.view_resourcebase", "base.download_resourcebae"]
}
}
resource.set_permissions(_perms)
copy_url = reverse('base-resources-resource-service-copy', kwargs={'pk': resource.pk})
response = self.client.put(copy_url, data={'title': 'cloned_resource'})
self.assertEqual(response.status_code, 200)
resource.delete()

def test_base_resources_return_download_link_if_document(self):
"""
Ensure we can access the Resource Base list.
Expand Down
14 changes: 13 additions & 1 deletion geonode/base/api/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -1076,7 +1076,19 @@ def resource_service_update(self, request, pk):
url_name="resource-service-copy",
methods=["put"],
permission_classes=[
IsAuthenticated, UserHasPerms
IsAuthenticated, UserHasPerms(
perms_dict={
"dataset": {
"PUT": ['add_resourcebase', 'download_resourcebase'], "rule": all
},
"document": {
"PUT": ['add_resourcebase', 'download_resourcebase'], "rule": all
},
"default": {
"PUT": ['add_resourcebase']
}
}
)
])
def resource_service_copy(self, request, pk):
"""Instructs the Async dispatcher to execute a 'COPY' operation over a valid 'pk'
Expand Down

0 comments on commit bcc8e8a

Please sign in to comment.