Skip to content
This repository has been archived by the owner on Mar 28, 2019. It is now read-only.

Add a method to remove every principals #645

Merged
merged 3 commits into from
Feb 10, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.rst
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ This document describes changes between each past release.
- Resource events are now merged in batch requests. One event per resource and
per action is emitted when a transaction is committed (#634)
- Monitor time of events listeners execution (fixes #503)
- Add method to remove a principal from every user

**Bug fixes**

Expand Down
7 changes: 7 additions & 0 deletions cliquet/permission/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,13 @@ def remove_user_principal(self, user_id, principal):
"""
raise NotImplementedError

def remove_principal(self, principal):
"""Remove a principal from every user.

:param str principal: The principal to remove.
"""
raise NotImplementedError

def user_principals(self, user_id):
"""Return the set of additionnal principals given to a user.

Expand Down
7 changes: 7 additions & 0 deletions cliquet/permission/memory.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,13 @@ def remove_user_principal(self, user_id, principal):
else:
self._store[user_key] = user_principals

def remove_principal(self, principal):
for user_principals in self._store.values():
try:
user_principals.remove(principal)
except KeyError:
pass

def user_principals(self, user_id):
user_key = 'user:%s' % user_id
members = self._store.get(user_key, set())
Expand Down
7 changes: 7 additions & 0 deletions cliquet/permission/postgresql/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,13 @@ def remove_user_principal(self, user_id, principal):
with self.client.connect() as conn:
conn.execute(query, dict(user_id=user_id, principal=principal))

def remove_principal(self, principal):
query = """
DELETE FROM user_principals
WHERE principal = :principal;"""
with self.client.connect() as conn:
conn.execute(query, dict(principal=principal))

def user_principals(self, user_id):
query = """
SELECT principal
Expand Down
7 changes: 7 additions & 0 deletions cliquet/permission/redis.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,13 @@ def remove_user_principal(self, user_id, principal):
if self._client.scard(user_key) == 0:
self._client.delete(user_key)

def remove_principal(self, principal):
with self._client.pipeline() as pipe:
user_keys = self._client.scan_iter(match='user:*')
for user_key in user_keys:
pipe.srem(user_key, principal)
pipe.execute()

@wrap_redis_error
def user_principals(self, user_id):
user_key = 'user:%s' % user_id
Expand Down
17 changes: 17 additions & 0 deletions cliquet/tests/test_permission.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ def test_mandatory_overrides(self):
(self.permission.flush,),
(self.permission.add_user_principal, '', ''),
(self.permission.remove_user_principal, '', ''),
(self.permission.remove_principal, ''),
(self.permission.user_principals, ''),
(self.permission.add_principal_to_ace, '', '', ''),
(self.permission.remove_principal_from_ace, '', '', ''),
Expand Down Expand Up @@ -148,6 +149,22 @@ def test_can_remove_a_unexisting_principal_to_a_user(self):
retrieved = self.permission.user_principals(user_id)
self.assertEquals(retrieved, set())

def test_can_remove_principal_from_every_users(self):
user_id1 = 'foo1'
user_id2 = 'foo2'
principal1 = 'bar'
principal2 = 'foobar'
self.permission.add_user_principal(user_id1, principal1)
self.permission.add_user_principal(user_id2, principal1)
self.permission.add_user_principal(user_id2, principal2)
self.permission.remove_principal(principal1)
self.permission.remove_principal('unknown')

retrieved = self.permission.user_principals(user_id1)
self.assertEquals(retrieved, set())
retrieved = self.permission.user_principals(user_id2)
self.assertEquals(retrieved, {principal2})

def test_can_add_a_principal_to_an_object_permission(self):
object_id = 'foo'
permission = 'write'
Expand Down