diff --git a/leukeleu_django_gdpr/anonymize.py b/leukeleu_django_gdpr/anonymize.py
new file mode 100644
index 0000000..e7bd625
--- /dev/null
+++ b/leukeleu_django_gdpr/anonymize.py
@@ -0,0 +1,181 @@
+from functools import partial
+
+from faker import Faker
+
+from django.apps import apps
+from django.conf import settings
+from django.contrib.auth import get_user_model
+from django.core.exceptions import ImproperlyConfigured
+from django.db import transaction
+from django.db.models import Q
+
+from leukeleu_django_gdpr.gdpr import read_data
+
+
+def get_models_from_gdpr_yml():
+    """Return the ``models`` entry of the data loaded by ``read_data``."""
+    data = read_data()
+    return data["models"]
+
+
+class BaseAnonymizer:
+    """
+    Overwrites the fields classified as PII with generated fake values.
+
+    Customize by setting ``excluded_fields`` or the ``extra_*`` attributes
+    on a subclass, or by overriding the ``get_*_overrides`` methods.
+    """
+
+    # "<model name>.<field name>" paths that are never anonymized
+    excluded_fields = []
+    # Field class name (suffixed with ".unique" for unique fields) -> callable
+    extra_fieldtype_overrides = None
+    # Model name -> queryset selecting the objects to anonymize
+    extra_qs_overrides = None
+    # "<model name>.<field name>" -> callable
+    extra_field_overrides = None
+
+    def __init__(self):
+        self.fake = Faker(["nl-NL"])
+
+    def anonymize(self):
+        """
+        Anonymize the PII fields of all models listed in the GDPR data.
+
+        All changes are made inside a single transaction.
+
+        Raises ImproperlyConfigured when a PII field has no field override
+        and its field type is missing from the field type overrides.
+        """
+        fieldtype_overrides = self.get_fieldtype_overrides()
+        qs_overrides = self.get_qs_overrides()
+        field_overrides = self.get_field_overrides()
+
+        with transaction.atomic():
+            models = get_models_from_gdpr_yml()
+            for model_name, model_data in models.items():
+                Model = apps.get_model(model_name)  # noqa: N806
+
+                # Compare with None instead of using `or`: truth-testing a
+                # queryset executes it, and an override that matches no rows
+                # must not fall back to anonymizing every row.
+                qs = qs_overrides.get(model_name)
+                if qs is None:
+                    qs = Model._base_manager.all()
+                qs = qs.all()  # Makes sure we are always dealing with the latest data
+
+                anonymized_fields = []
+                for field_name, field_data in model_data["fields"].items():
+                    field_path = f"{model_name}.{field_name}"
+                    if not field_data["pii"] or field_path in self.excluded_fields:
+                        # Leave non PII and ignored fields alone
+                        continue
+
+                    field = Model._meta.get_field(field_name)
+
+                    # Only consult the field type mapping when there is no
+                    # field override, so a field of an unmapped type can
+                    # still be handled by an explicit field override.
+                    value_func = field_overrides.get(field_path)
+                    if value_func is None:
+                        field_type = type(field).__name__
+                        if field.unique:
+                            field_type += ".unique"
+
+                        try:
+                            value_func = fieldtype_overrides[field_type]
+                        except KeyError:
+                            raise ImproperlyConfigured(
+                                f"Field type '{field_type}' not defined "
+                                "inside FIELDTYPE_FAKER_MAPPING"
+                            ) from None
+
+                    for obj in qs:
+                        setattr(obj, field_name, value_func())
+                    anonymized_fields.append(field_name)
+
+                if anonymized_fields:
+                    # Write only the fields that were changed, through the
+                    # same unfiltered manager the objects were read with.
+                    Model._base_manager.bulk_update(
+                        qs,
+                        anonymized_fields,
+                        batch_size=500,
+                    )
+
+    def get_fieldtype_overrides(self):
+        """
+        Return the mapping of field type name to value generating callable.
+
+        Unique fields are looked up with a ".unique" suffix.
+        """
+        fieldtype_overrides = {
+            "BigIntegerField": self.fake.random_int,
+            "BigIntegerField.unique": self.fake.unique.random_int,
+            "BooleanField": self.fake.boolean,  # No unique variant
+            "CharField": self.fake.pystr,
+            "CharField.unique": self.fake.unique.pystr,
+            "DateField": self.fake.date_this_decade,
+            "DateField.unique": self.fake.unique.date_this_decade,
+            "DateTimeField": self.fake.date_time_this_decade,
+            "DateTimeField.unique": self.fake.unique.date_time_this_decade,
+            "DecimalField": self.fake.random_int,
+            "DecimalField.unique": self.fake.unique.random_int,
+            "EmailField": self.fake.safe_email,
+            "EmailField.unique": self.fake.unique.safe_email,
+            "FloatField": self.fake.random_int,
+            "FloatField.unique": self.fake.unique.random_int,
+            "GenericIPAddressField": self.fake.ipv4,
+            "GenericIPAddressField.unique": self.fake.unique.ipv4,
+            "IntegerField": self.fake.random_int,
+            "IntegerField.unique": self.fake.unique.random_int,
+            "JSONField": partial(
+                self.fake.pydict,
+                value_types=["str"],
+            ),  # No unique variant
+            "PositiveBigIntegerField": self.fake.random_int,
+            "PositiveBigIntegerField.unique": self.fake.unique.random_int,
+            "PositiveIntegerField": self.fake.random_int,
+            "PositiveIntegerField.unique": self.fake.unique.random_int,
+            "PositiveSmallIntegerField": self.fake.random_int,
+            "PositiveSmallIntegerField.unique": self.fake.unique.random_int,
+            "RichTextField": self.fake.paragraph,
+            "RichTextField.unique": self.fake.unique.paragraph,
+            "SlugField": self.fake.pystr,
+            "SlugField.unique": self.fake.unique.pystr,
+            "SmallIntegerField": self.fake.random_int,
+            "SmallIntegerField.unique": self.fake.unique.random_int,
+            "TextField": self.fake.paragraph,
+            "TextField.unique": self.fake.unique.paragraph,
+            "URLField": self.fake.url,
+            "URLField.unique": self.fake.unique.url,
+        }
+
+        if self.extra_fieldtype_overrides is not None:
+            fieldtype_overrides.update(self.extra_fieldtype_overrides)
+        return fieldtype_overrides
+
+    def get_qs_overrides(self):
+        """
+        Return the mapping of model name to the queryset to anonymize.
+
+        By default superusers and staff users are excluded.
+        """
+        qs_overrides = {
+            settings.AUTH_USER_MODEL: get_user_model()._base_manager.exclude(
+                Q(is_superuser=True) | Q(is_staff=True)
+            ),
+        }
+        if self.extra_qs_overrides is not None:
+            qs_overrides.update(self.extra_qs_overrides)
+        return qs_overrides
+
+    def get_field_overrides(self):
+        """Return the mapping of field path to value generating callable."""
+        field_overrides = {
+            f"{settings.AUTH_USER_MODEL}.first_name": self.fake.first_name,
+            f"{settings.AUTH_USER_MODEL}.last_name": self.fake.last_name,
+        }
+        if self.extra_field_overrides is not None:
+            field_overrides.update(self.extra_field_overrides)
+        return field_overrides
diff --git a/leukeleu_django_gdpr/management/commands/anonymize.py b/leukeleu_django_gdpr/management/commands/anonymize.py
new file mode 100644
index 0000000..fb554a5
--- /dev/null
+++ b/leukeleu_django_gdpr/management/commands/anonymize.py
@@ -0,0 +1,50 @@
+from django.conf import settings
+from django.core.management import BaseCommand, CommandError
+from django.utils.module_loading import import_string
+
+from leukeleu_django_gdpr.anonymize import BaseAnonymizer
+from leukeleu_django_gdpr.gdpr import get_pii_stats
+
+
+def get_anonymizer():
+    if hasattr(settings, "DJANGO_GDPR_ANONYMIZER_CLASS"):
+        return import_string(settings.DJANGO_GDPR_ANONYMIZER_CLASS)()
+    else:
+        return BaseAnonymizer()
+
+
+class Command(BaseCommand):
+    """
+    Goes through models and their fields and anonymizes the data if `pii: True`
+
+    Currently, fields that are *not* required will still be anonymized.
+    """
+
+    def handle(self, *args, **options):
+        if not settings.DEBUG:
+            raise CommandError("This command only runs in debug mode")
+
+        if (
+            input(
+                "Are you sure you want to anonymize data? "
+                "This changes the database. [y/N] "
+            ).lower()
+            != "y"
+        ):
+            raise CommandError("Aborted")
+
+        stats = get_pii_stats(save=False)
+        unclassified_fields = stats.get(None, 0)
+        if unclassified_fields:
+            raise CommandError(
+                f"There are still {unclassified_fields} unclassified PII fields. "
+                "Run `manage.py gdpr` first and classify all fields."
+            )
+
+        get_anonymizer().anonymize()
+
+        self.stdout.write(
+            self.style.SUCCESS(
+                "Successfully anonymized data. Make sure to check it.",
+            )
+        )
diff --git a/setup.cfg b/setup.cfg
index 8ae0a5f..b7ab271 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -32,6 +32,7 @@ test =
     # Linting
     black~=23.1
     check-manifest==0.49
+    factory_boy~=3.2
     flake8~=6.0
     flake8-assertive~=2.1
     flake8-black~=0.3.0
diff --git a/tests/test_anonymizer.py b/tests/test_anonymizer.py
new file mode 100644
index 0000000..c7e440b
--- /dev/null
+++ b/tests/test_anonymizer.py
@@ -0,0 +1,138 @@
+from unittest import mock
+
+from django.test import TestCase
+
+from leukeleu_django_gdpr.anonymize import BaseAnonymizer
+from tests.custom_users.models import CustomUser
+
+
+def _get_models():
+    return {
+        "custom_users.CustomUser": {
+            "fields": {
+                "username": {
+                    "pii": True,
+                },
+                "first_name": {
+                    "pii": True,
+                },
+            }
+        }
+    }
+
+
+patch_get_models = mock.patch(
+    "leukeleu_django_gdpr.anonymize.get_models_from_gdpr_yml",
+    return_value=_get_models(),
+)
+
+
+class AnonymizerTest(TestCase):
+    @classmethod
+    def setUpClass(cls):
+        super().setUpClass()
+        patch_get_models.start()
+
+    @classmethod
+    def tearDownClass(cls):
+        patch_get_models.stop()
+        super().tearDownClass()
+
+    def setUp(self):
+        self.user = CustomUser.objects.create(username="User", first_name="John")
+        self.superuser = CustomUser.objects.create(username="Super", is_superuser=True)
+        self.staffuser = CustomUser.objects.create(username="Staff", is_staff=True)
+
+    def test_username_anonymization(self):
+        self.assertEqual(self.user.username, "User")
+        self.assertEqual(self.superuser.username, "Super")
+        self.assertEqual(self.staffuser.username, "Staff")
+
+        BaseAnonymizer().anonymize()
+
+        self.user.refresh_from_db()
+        self.superuser.refresh_from_db()
+        self.staffuser.refresh_from_db()
+
+        # This should be different now
+        self.assertNotEqual(self.user.username, "User")
+
+        # These should still equal the original usernames
+        self.assertEqual(self.superuser.username, "Super")
+        self.assertEqual(self.staffuser.username, "Staff")
+
+    def test_excluded_fields(self):
+        class Anonymizer(BaseAnonymizer):
+            excluded_fields = [
+                "custom_users.CustomUser.username",
+            ]
+
+        self.assertEqual(self.user.username, "User")
+        Anonymizer().anonymize()
+        self.user.refresh_from_db()
+
+        # This should still equal the original username
+        self.assertEqual(self.user.username, "User")
+
+    def test_extra_fieldtypes(self):
+        class Anonymizer(BaseAnonymizer):
+            extra_fieldtype_overrides = {
+                "CharField": lambda: "Foo",
+            }
+
+            def get_field_overrides(self):
+                return {}
+
+        self.assertEqual(self.user.first_name, "John")
+        Anonymizer().anonymize()
+        self.user.refresh_from_db()
+        self.assertEqual(self.user.first_name, "Foo")
+
+    def test_extra_qs_overrides(self):
+        class Anonymizer(BaseAnonymizer):
+            extra_qs_overrides = {
+                # By default superusers would be skipped
+                "custom_users.CustomUser": CustomUser._base_manager.all(),
+            }
+
+        self.assertEqual(self.superuser.username, "Super")
+        Anonymizer().anonymize()
+        self.superuser.refresh_from_db()
+
+        # This should be different now
+        self.assertNotEqual(self.superuser.username, "Super")
+
+    def test_extra_field_overrides(self):
+        class Anonymizer(BaseAnonymizer):
+            extra_field_overrides = {
+                "custom_users.CustomUser.username": lambda: "Foo",
+            }
+
+        self.assertEqual(self.user.username, "User")
+        Anonymizer().anonymize()
+        self.user.refresh_from_db()
+        self.assertEqual(self.user.username, "Foo")
+
+    def test_multiple_runs_while_new_data_is_added(self):
+        class Anonymizer(BaseAnonymizer):
+            extra_qs_overrides = {
+                "custom_users.CustomUser": CustomUser._base_manager.all(),
+            }
+
+        Anonymizer().anonymize()
+        new_user = CustomUser.objects.create(username="NewUser")
+        Anonymizer().anonymize()
+        new_user.refresh_from_db()
+        self.assertNotEqual(new_user.username, "NewUser")
+
+    def test_empty_qs_override_is_respected(self):
+        class Anonymizer(BaseAnonymizer):
+            extra_qs_overrides = {
+                "custom_users.CustomUser": CustomUser._base_manager.none(),
+            }
+
+        Anonymizer().anonymize()
+        self.user.refresh_from_db()
+
+        # An override matching no rows must not fall back to all rows
+        self.assertEqual(self.user.username, "User")