-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
1 parent
52258c7
commit e003c46
Showing
4 changed files
with
313 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,138 @@ | ||
from functools import partial | ||
|
||
from faker import Faker | ||
|
||
from django.apps import apps | ||
from django.conf import settings | ||
from django.contrib.auth import get_user_model | ||
from django.core.exceptions import ImproperlyConfigured | ||
from django.db import transaction | ||
from django.db.models import Q | ||
|
||
from leukeleu_django_gdpr.gdpr import read_data | ||
|
||
|
||
def get_models_from_gdpr_yml():
    """Return the ``"models"`` entry of the data loaded by ``read_data()``."""
    return read_data()["models"]
|
||
|
||
class BaseAnonymizer:
    """
    Overwrite PII fields of the models listed in the GDPR data with fake values.

    Subclasses customise a run through these class attributes:

    * ``excluded_fields``: ``"app_label.Model.field"`` paths that are never
      touched, even when they are marked as PII.
    * ``extra_fieldtype_overrides``: maps a field class name (suffixed with
      ``".unique"`` for unique fields) to a zero-argument value factory.
    * ``extra_qs_overrides``: maps ``"app_label.Model"`` to the queryset of
      objects that should be anonymized for that model.
    * ``extra_field_overrides``: maps ``"app_label.Model.field"`` to a
      zero-argument value factory; wins over the field type mapping.
    """

    excluded_fields = []
    extra_fieldtype_overrides = None
    extra_qs_overrides = None
    extra_field_overrides = None

    def __init__(self):
        self.fake = Faker(["nl-NL"])

    def anonymize(self):
        """
        Replace the value of every non-excluded PII field with generated data.

        All models are processed inside a single ``transaction.atomic()``
        block, so an error rolls back every change made by this run.

        Raises:
            ImproperlyConfigured: a PII field has neither a field override nor
                an entry for its field type in the field type mapping.
        """
        fieldtype_overrides = self.get_fieldtype_overrides()
        qs_overrides = self.get_qs_overrides()
        field_overrides = self.get_field_overrides()

        with transaction.atomic():
            models = get_models_from_gdpr_yml()
            for model_name, model_data in models.items():
                Model = apps.get_model(model_name)  # noqa: N806

                # Resolve the value factories first, so configuration errors
                # surface before any object is loaded or modified.
                value_funcs = {}
                for field_name, field_data in model_data["fields"].items():
                    field_path = f"{model_name}.{field_name}"
                    if not field_data["pii"] or field_path in self.excluded_fields:
                        # Leave non PII and ignored fields alone
                        continue

                    field = Model._meta.get_field(field_name)
                    field_type = type(field).__name__
                    if field.unique:
                        field_type += ".unique"

                    value_funcs[field_name] = self._get_value_func(
                        field_path,
                        field_type,
                        field_overrides,
                        fieldtype_overrides,
                    )

                if not value_funcs:
                    # Nothing to anonymize; bulk_update() rejects an empty
                    # field list, so skip the model entirely.
                    continue

                objs = list(self._get_queryset(Model, model_name, qs_overrides))
                for obj in objs:
                    for field_name, value_func in value_funcs.items():
                        setattr(obj, field_name, value_func())

                # Write only the fields that were actually changed, and go
                # through the base manager so rows hidden by a filtering
                # default manager are still updated.
                Model._base_manager.bulk_update(
                    objs,
                    list(value_funcs),
                    batch_size=500,
                )

    def _get_queryset(self, model, model_name, qs_overrides):
        """Return a fresh queryset of the objects to anonymize for ``model``."""
        qs = qs_overrides.get(model_name)
        if qs is None:
            # Compare against None instead of truth-testing: an override that
            # matches no rows must not fall back to *all* rows.
            qs = model._base_manager.all()
        # Cloning drops any cached results, so we always see the latest data
        return qs.all()

    def _get_value_func(
        self, field_path, field_type, field_overrides, fieldtype_overrides
    ):
        """Return the value factory for a field; field overrides win."""
        if field_path in field_overrides:
            return field_overrides[field_path]

        try:
            return fieldtype_overrides[field_type]
        except KeyError as exc:
            raise ImproperlyConfigured(
                f"Field type '{field_type}' not defined "
                "inside FIELDTYPE_FAKER_MAPPING"
            ) from exc

    def get_fieldtype_overrides(self):
        """
        Return the mapping of field type name to value factory.

        Keys are field class names; the ``".unique"`` variants are used for
        fields declared ``unique`` and draw from ``self.fake.unique``.
        Entries from ``extra_fieldtype_overrides`` replace the defaults.
        """
        fieldtype_overrides = {
            "BigIntegerField": self.fake.random_int,
            "BigIntegerField.unique": self.fake.unique.random_int,
            "BooleanField": self.fake.boolean,  # No unique variant
            "CharField": self.fake.pystr,
            "CharField.unique": self.fake.unique.pystr,
            "DateField": self.fake.date_this_decade,
            "DateField.unique": self.fake.unique.date_this_decade,
            "DateTimeField": self.fake.date_time_this_decade,
            "DateTimeField.unique": self.fake.unique.date_time_this_decade,
            "DecimalField": self.fake.random_int,
            "DecimalField.unique": self.fake.unique.random_int,
            "EmailField": self.fake.safe_email,
            "EmailField.unique": self.fake.unique.safe_email,
            "FloatField": self.fake.random_int,
            "FloatField.unique": self.fake.unique.random_int,
            "GenericIPAddressField": self.fake.ipv4,
            "GenericIPAddressField.unique": self.fake.unique.ipv4,
            "IntegerField": self.fake.random_int,
            "IntegerField.unique": self.fake.unique.random_int,
            "JSONField": partial(
                self.fake.pydict,
                value_types=["str"],
            ),  # No unique variant
            "PositiveBigIntegerField": self.fake.random_int,
            "PositiveBigIntegerField.unique": self.fake.unique.random_int,
            "PositiveIntegerField": self.fake.random_int,
            "PositiveIntegerField.unique": self.fake.unique.random_int,
            "PositiveSmallIntegerField": self.fake.random_int,
            "PositiveSmallIntegerField.unique": self.fake.unique.random_int,
            "RichTextField": self.fake.paragraph,
            "RichTextField.unique": self.fake.unique.paragraph,
            "SlugField": self.fake.pystr,
            "SlugField.unique": self.fake.unique.pystr,
            "SmallIntegerField": self.fake.random_int,
            "SmallIntegerField.unique": self.fake.unique.random_int,
            "TextField": self.fake.paragraph,
            "TextField.unique": self.fake.unique.paragraph,
            "URLField": self.fake.url,
            "URLField.unique": self.fake.unique.url,
        }

        if self.extra_fieldtype_overrides is not None:
            fieldtype_overrides.update(self.extra_fieldtype_overrides)
        return fieldtype_overrides

    def get_qs_overrides(self):
        """
        Return the mapping of model name to the queryset to anonymize.

        By default only the user model is restricted: superusers and staff
        users are excluded. Entries from ``extra_qs_overrides`` replace the
        defaults.
        """
        qs_overrides = {
            settings.AUTH_USER_MODEL: get_user_model()._base_manager.exclude(
                Q(is_superuser=True) | Q(is_staff=True)
            ),
        }
        if self.extra_qs_overrides is not None:
            qs_overrides.update(self.extra_qs_overrides)
        return qs_overrides

    def get_field_overrides(self):
        """
        Return the mapping of field path to value factory.

        By default the user model's ``first_name`` and ``last_name`` get
        realistic fake names. Entries from ``extra_field_overrides`` replace
        the defaults.
        """
        field_overrides = {
            f"{settings.AUTH_USER_MODEL}.first_name": self.fake.first_name,
            f"{settings.AUTH_USER_MODEL}.last_name": self.fake.last_name,
        }
        if self.extra_field_overrides is not None:
            field_overrides.update(self.extra_field_overrides)
        return field_overrides
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,50 @@ | ||
from django.conf import settings | ||
from django.core.management import BaseCommand, CommandError | ||
from django.utils.module_loading import import_string | ||
|
||
from leukeleu_django_gdpr.anonymize import BaseAnonymizer | ||
from leukeleu_django_gdpr.gdpr import get_pii_stats | ||
|
||
|
||
def get_anonymizer():
    """
    Instantiate the anonymizer to use for this project.

    The class named by the ``DJANGO_GDPR_ANONYMIZER_CLASS`` setting (a dotted
    import path) is used when that setting exists; otherwise a plain
    ``BaseAnonymizer`` is returned.
    """
    try:
        dotted_path = settings.DJANGO_GDPR_ANONYMIZER_CLASS
    except AttributeError:
        # Setting not defined: use the stock anonymizer
        return BaseAnonymizer()

    anonymizer_class = import_string(dotted_path)
    return anonymizer_class()
|
||
|
||
class Command(BaseCommand):
    """
    Anonymize the data of every model field classified as `pii: True`.
    Note that PII fields which are *not* required are anonymized as well.
    """

    def handle(self, *args, **options):
        """
        Ask for confirmation and run the configured anonymizer.

        Raises ``CommandError`` when not in debug mode, when the prompt is not
        answered with "y", or when unclassified fields remain.
        """
        # Guard: refuse to touch the database outside of debug mode
        if not settings.DEBUG:
            raise CommandError("This command only runs in debug mode")

        prompt = (
            "Are you sure you want to anonymize data? "
            "This changes the database. [y/N] "
        )
        answer = input(prompt)
        if answer.lower() != "y":
            raise CommandError("Aborted")

        # Fields without a classification are counted under the `None` key
        unclassified_fields = get_pii_stats(save=False).get(None, 0)
        if unclassified_fields:
            message = (
                f"There are still {unclassified_fields} unclassified PII fields. "
                "Run `manage.py gdpr` first and classify all fields."
            )
            raise CommandError(message)

        anonymizer = get_anonymizer()
        anonymizer.anonymize()

        success_message = self.style.SUCCESS(
            "Successfully anonymized data. Make sure to check it.",
        )
        self.stdout.write(success_message)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,124 @@ | ||
from unittest import mock | ||
|
||
from django.test import TestCase | ||
|
||
from leukeleu_django_gdpr.anonymize import BaseAnonymizer | ||
from tests.custom_users.models import CustomUser | ||
|
||
|
||
def _get_models(): | ||
return { | ||
"custom_users.CustomUser": { | ||
"fields": { | ||
"username": { | ||
"pii": True, | ||
}, | ||
"first_name": { | ||
"pii": True, | ||
}, | ||
} | ||
} | ||
} | ||
|
||
|
||
# Patcher that swaps `get_models_from_gdpr_yml` in the anonymize module for a
# mock returning the dict built by `_get_models()`. It is only active between
# `patch_get_models.start()` and `patch_get_models.stop()`.
patch_get_models = mock.patch(
    target="leukeleu_django_gdpr.anonymize.get_models_from_gdpr_yml",
    return_value=_get_models(),
)
|
||
|
||
class AnonymizerTest(TestCase):
    """
    Exercise ``BaseAnonymizer`` against the ``CustomUser`` test model.

    ``get_models_from_gdpr_yml`` is patched for the whole class, so the
    anonymizer sees the model definitions from ``_get_models()``.
    """

    @classmethod
    def setUpClass(cls):
        # Django's TestCase does class-wide initialisation in setUpClass, so
        # the super implementation must run when overriding it.
        super().setUpClass()
        patch_get_models.start()

    @classmethod
    def tearDownClass(cls):
        patch_get_models.stop()
        super().tearDownClass()

    def setUp(self):
        self.user = CustomUser.objects.create(username="User", first_name="John")
        self.superuser = CustomUser.objects.create(username="Super", is_superuser=True)
        self.staffuser = CustomUser.objects.create(username="Staff", is_staff=True)

    def test_username_anonymization(self):
        """Regular users are anonymized; superusers and staff are skipped."""
        self.assertEqual(self.user.username, "User")
        self.assertEqual(self.superuser.username, "Super")
        self.assertEqual(self.staffuser.username, "Staff")

        BaseAnonymizer().anonymize()

        self.user.refresh_from_db()
        self.superuser.refresh_from_db()
        self.staffuser.refresh_from_db()

        # This should be different now
        self.assertNotEqual(self.user.username, "User")

        # These should still equal the original usernames
        self.assertEqual(self.superuser.username, "Super")
        self.assertEqual(self.staffuser.username, "Staff")

    def test_excluded_fields(self):
        """Fields listed in ``excluded_fields`` keep their value."""

        class Anonymizer(BaseAnonymizer):
            excluded_fields = [
                "custom_users.CustomUser.username",
            ]

        self.assertEqual(self.user.username, "User")
        Anonymizer().anonymize()
        self.user.refresh_from_db()

        # This should still equal the original username
        self.assertEqual(self.user.username, "User")

    def test_extra_fieldtypes(self):
        """``extra_fieldtype_overrides`` replaces the default field type factory."""

        class Anonymizer(BaseAnonymizer):
            extra_fieldtype_overrides = {
                "CharField": lambda: "Foo",
            }

            def get_field_overrides(self):
                # Drop the default first_name override so the field type
                # mapping is what gets used.
                return {}

        self.assertEqual(self.user.first_name, "John")
        Anonymizer().anonymize()
        self.user.refresh_from_db()
        self.assertEqual(self.user.first_name, "Foo")

    def test_extra_qs_overrides(self):
        """``extra_qs_overrides`` replaces the default user queryset."""

        class Anonymizer(BaseAnonymizer):
            extra_qs_overrides = {
                # By default superusers would be skipped
                "custom_users.CustomUser": CustomUser._base_manager.all(),
            }

        self.assertEqual(self.superuser.username, "Super")
        Anonymizer().anonymize()
        self.superuser.refresh_from_db()

        # This should be different now
        self.assertNotEqual(self.superuser.username, "Super")

    def test_extra_field_overrides(self):
        """``extra_field_overrides`` sets the factory for one specific field."""

        class Anonymizer(BaseAnonymizer):
            extra_field_overrides = {
                "custom_users.CustomUser.username": lambda: "Foo",
            }

        self.assertEqual(self.user.username, "User")
        Anonymizer().anonymize()
        self.user.refresh_from_db()
        self.assertEqual(self.user.username, "Foo")

    def test_multiple_runs_while_new_data_is_added(self):
        """A second run also anonymizes objects created after the first run."""

        class Anonymizer(BaseAnonymizer):
            extra_qs_overrides = {
                "custom_users.CustomUser": CustomUser._base_manager.all(),
            }

        Anonymizer().anonymize()
        new_user = CustomUser.objects.create(username="NewUser")
        Anonymizer().anonymize()
        new_user.refresh_from_db()
        self.assertNotEqual(new_user.username, "NewUser")