From d428291f30a0b110cf9ef16373a6dfda447f137b Mon Sep 17 00:00:00 2001 From: Gary Snider <75227981+gsnider2195@users.noreply.github.com> Date: Wed, 9 Oct 2024 13:10:06 -0700 Subject: [PATCH] Fix migrations failing when no statuses exist and other various migrations fixes (#277) --- changes/272.fixed | 1 + nautobot_firewall_models/__init__.py | 7 +- .../migrations/0002_custom_status.py | 77 ++++++++++-------- .../migrations/0011_custom_status_nat.py | 36 ++++----- .../0012_remove_status_m2m_through_models.py | 16 ++-- .../0014_custom_status_application.py | 37 ++++----- nautobot_firewall_models/signals.py | 14 ++++ nautobot_firewall_models/utils/__init__.py | 79 +++++++++++++++++-- 8 files changed, 177 insertions(+), 90 deletions(-) create mode 100644 changes/272.fixed diff --git a/changes/272.fixed b/changes/272.fixed new file mode 100644 index 00000000..e93b540b --- /dev/null +++ b/changes/272.fixed @@ -0,0 +1 @@ +Fixed migrations failing when no statuses exist in the database and various other migration issues. diff --git a/nautobot_firewall_models/__init__.py b/nautobot_firewall_models/__init__.py index c8874e78..0fd3e1d1 100644 --- a/nautobot_firewall_models/__init__.py +++ b/nautobot_firewall_models/__init__.py @@ -4,6 +4,7 @@ from importlib import metadata from nautobot.apps import NautobotAppConfig +from nautobot.core.signals import nautobot_database_ready __version__ = metadata.version(__name__) @@ -24,13 +25,17 @@ class NautobotFirewallModelsConfig(NautobotAppConfig): "capirca_remark_pass": True, "capirca_os_map": {}, "allowed_status": ["Active"], + "default_status": "Active", "protect_on_delete": True, } docs_view_name = "plugins:nautobot_firewall_models:docs" def ready(self): """Register custom signals.""" - import nautobot_firewall_models.signals # noqa: F401, pylint: disable=import-outside-toplevel,unused-import + import nautobot_firewall_models.signals # pylint: disable=import-outside-toplevel + + nautobot_database_ready.connect(nautobot_firewall_models.signals.create_configured_statuses_signal, sender=self) + nautobot_database_ready.connect(nautobot_firewall_models.signals.associate_statuses_signal, sender=self) super().ready() diff --git a/nautobot_firewall_models/migrations/0002_custom_status.py b/nautobot_firewall_models/migrations/0002_custom_status.py index d9d473bd..89553588 100644 --- a/nautobot_firewall_models/migrations/0002_custom_status.py +++ b/nautobot_firewall_models/migrations/0002_custom_status.py @@ -2,69 +2,76 @@ import os import yaml -from django.core.exceptions import ObjectDoesNotExist from django.db import migrations +from nautobot_firewall_models.utils import ( + create_configured_statuses, + get_configured_status_names, + get_default_status_name, + get_firewall_models_with_status_field, +) + def create_status(apps, schema_editor): """Initial subset of statuses.""" - statuses = ["Active", "Staged", "Decommissioned"] - ContentType = apps.get_model("contenttypes.ContentType") - for i in statuses: - status = apps.get_model("extras.Status").objects.get(name=i) - for model in apps.app_configs["nautobot_firewall_models"].get_models(): - if hasattr(model, "status"): - ct = ContentType.objects.get_for_model(model) - status.content_types.add(ct) + create_configured_statuses(apps) + + content_types = get_firewall_models_with_status_field(apps) + Status = apps.get_model("extras.Status") + status_names = get_configured_status_names() + for status in Status.objects.filter(name__in=status_names).iterator(): + status.content_types.add(*content_types) def reverse_create_status(apps, schema_editor): - """Reverse adding firewall models to status content_types.""" + """Remove firewall models from status content_types.""" - statuses = ["Active", "Staged", "Decommissioned"] ContentType = apps.get_model("contenttypes.ContentType") - for i in statuses: - status = apps.get_model("extras.Status").objects.get(name=i) - for model in apps.app_configs["nautobot_firewall_models"].get_models(): - if hasattr(model, "status"): - ct = ContentType.objects.get_for_model(model) - status.content_types.remove(ct) + Status = apps.get_model("extras.Status") + firewall_models_content_types = ContentType.objects.filter(app_label="nautobot_firewall_models") + for status in Status.objects.filter(content_types__in=firewall_models_content_types).distinct().iterator(): + status.content_types.remove(*firewall_models_content_types) def create_default_objects(apps, schema_editor): """Initial subset of commonly used objects.""" - defaults = os.path.join(os.path.dirname(__file__), "services.yml") - with open(defaults, "r") as f: - services = yaml.safe_load(f) - status = apps.get_model("extras.Status").objects.get(name="Active") + default_services_file = os.path.join(os.path.dirname(__file__), "services.yml") + Status = apps.get_model("extras.Status") + ServiceObject = apps.get_model("nautobot_firewall_models.ServiceObject") + default_status = Status.objects.get(name=get_default_status_name()) - for i in services: - apps.get_model("nautobot_firewall_models.ServiceObject").objects.create(status=status, **i) + with open(default_services_file, "r") as f: + default_services = yaml.safe_load(f) + + for service in default_services: + ServiceObject.objects.create(status=default_status, **service) def reverse_create_default_objects(apps, schema_editor): - """Removes commonly used objects.""" - defaults = os.path.join(os.path.dirname(__file__), "services.yml") - with open(defaults, "r") as f: - services = yaml.safe_load(f) - status = apps.get_model("extras.Status").objects.get(name="Active") + """ + Removes commonly used objects. + + Currently skipped due to Django bug https://code.djangoproject.com/ticket/33586 + """ + default_services_file = os.path.join(os.path.dirname(__file__), "services.yml") + ServiceObject = apps.get_model("nautobot_firewall_models.ServiceObject") + + with open(default_services_file, "r") as f: + default_services = yaml.safe_load(f) - for i in services: - try: - service = apps.get_model("nautobot_firewall_models.ServiceObject").objects.get(status=status, **i) - service.delete() - except ObjectDoesNotExist: - continue + for service in default_services: + ServiceObject.objects.filter(**service).delete() class Migration(migrations.Migration): dependencies = [ + ("contenttypes", "0002_remove_content_type_name"), ("extras", "0033_add__optimized_indexing"), ("nautobot_firewall_models", "0001_initial"), ] operations = [ migrations.RunPython(code=create_status, reverse_code=reverse_create_status), - migrations.RunPython(code=create_default_objects, reverse_code=reverse_create_default_objects), + migrations.RunPython(code=create_default_objects, reverse_code=migrations.RunPython.noop), ] diff --git a/nautobot_firewall_models/migrations/0011_custom_status_nat.py b/nautobot_firewall_models/migrations/0011_custom_status_nat.py index b3aae18a..c157a82c 100644 --- a/nautobot_firewall_models/migrations/0011_custom_status_nat.py +++ b/nautobot_firewall_models/migrations/0011_custom_status_nat.py @@ -2,6 +2,8 @@ from django.db import migrations +from nautobot_firewall_models.utils import create_configured_statuses, get_configured_status_names + def create_nat_status(apps, schema_editor): """Initial subset of statuses for the NAT models. @@ -9,18 +11,15 @@ def create_nat_status(apps, schema_editor): This was added along with 0009_nat_policy in order to associate the same set of statuses with the new NAT models that are associated with the original set of security models. """ + create_configured_statuses(apps) - statuses = ["Active", "Staged", "Decommissioned"] ContentType = apps.get_model("contenttypes.ContentType") - relevant_models = [ - apps.get_model(model) - for model in ["nautobot_firewall_models.NATPolicy", "nautobot_firewall_models.NATPolicyRule"] - ] - for i in statuses: - status = apps.get_model("extras.Status").objects.get(name=i) - for model in relevant_models: - ct = ContentType.objects.get_for_model(model) - status.content_types.add(ct) + Status = apps.get_model("extras.Status") + relevant_models_ct = ContentType.objects.filter( + app_label="nautobot_firewall_models", model__in=["natpolicy", "natpolicyrule"] + ) + for status in Status.objects.filter(name__in=get_configured_status_names()).iterator(): + status.content_types.add(*relevant_models_ct) def remove_nat_status(apps, schema_editor): @@ -30,21 +29,18 @@ def remove_nat_status(apps, schema_editor): that are associated with the original set of security models. """ - statuses = ["Active", "Staged", "Decommissioned"] ContentType = apps.get_model("contenttypes.ContentType") - relevant_models = [ - apps.get_model(model) - for model in ["nautobot_firewall_models.NATPolicy", "nautobot_firewall_models.NATPolicyRule"] - ] - for i in statuses: - status = apps.get_model("extras.Status").objects.get(name=i) - for model in relevant_models: - ct = ContentType.objects.get_for_model(model) - status.content_types.remove(ct) + Status = apps.get_model("extras.Status") + relevant_models_ct = ContentType.objects.filter( + app_label="nautobot_firewall_models", model__in=["natpolicy", "natpolicyrule"] + ) + for status in Status.objects.filter(content_types__in=relevant_models_ct).distinct().iterator(): + status.content_types.remove(*relevant_models_ct) class Migration(migrations.Migration): dependencies = [ + ("contenttypes", "0002_remove_content_type_name"), ("extras", "0033_add__optimized_indexing"), ("nautobot_firewall_models", "0010_nat_policy"), ] diff --git a/nautobot_firewall_models/migrations/0012_remove_status_m2m_through_models.py b/nautobot_firewall_models/migrations/0012_remove_status_m2m_through_models.py index de1b2523..e0aa29f6 100644 --- a/nautobot_firewall_models/migrations/0012_remove_status_m2m_through_models.py +++ b/nautobot_firewall_models/migrations/0012_remove_status_m2m_through_models.py @@ -4,18 +4,20 @@ def remove_m2m_through_status_content_types(apps, schema_editor): """Remove the through model content types from the Status objects.""" - statuses = ["Active", "Staged", "Decommissioned"] ContentType = apps.get_model("contenttypes.ContentType") - for i in statuses: - status = apps.get_model("extras.Status").objects.get(name=i) - for model in apps.app_configs["nautobot_firewall_models"].get_models(): - if not hasattr(model, "status"): - ct = ContentType.objects.get_for_model(model) - status.content_types.remove(ct) + Status = apps.get_model("extras.Status") + firewall_models_without_status_field = [] + for model in apps.app_configs["nautobot_firewall_models"].get_models(): + if not hasattr(model, "status"): + ct = ContentType.objects.get_for_model(model) + firewall_models_without_status_field.append(ct) + for status in Status.objects.filter(content_types__in=firewall_models_without_status_field).distinct().iterator(): + status.content_types.remove(*firewall_models_without_status_field) class Migration(migrations.Migration): dependencies = [ + ("contenttypes", "0002_remove_content_type_name"), ("extras", "0033_add__optimized_indexing"), ("nautobot_firewall_models", "0011_custom_status_nat"), ] diff --git a/nautobot_firewall_models/migrations/0014_custom_status_application.py b/nautobot_firewall_models/migrations/0014_custom_status_application.py index 83ff5c0b..9fbad8db 100644 --- a/nautobot_firewall_models/migrations/0014_custom_status_application.py +++ b/nautobot_firewall_models/migrations/0014_custom_status_application.py @@ -2,6 +2,8 @@ from django.db import migrations +from nautobot_firewall_models.utils import create_configured_statuses, get_configured_status_names + def create_app_status(apps, schema_editor): """Initial subset of statuses for the Application models. @@ -9,18 +11,15 @@ def create_app_status(apps, schema_editor): This was added along with 0013_applications in order to associate the same set of statuses with the new Application models that are associated with the original set of security models. """ + create_configured_statuses(apps) - statuses = ["Active", "Staged", "Decommissioned"] ContentType = apps.get_model("contenttypes.ContentType") - relevant_models = [ - apps.get_model(model) - for model in ["nautobot_firewall_models.ApplicationObject", "nautobot_firewall_models.ApplicationObjectGroup"] - ] - for i in statuses: - status = apps.get_model("extras.Status").objects.get(name=i) - for model in relevant_models: - ct = ContentType.objects.get_for_model(model) - status.content_types.add(ct) + Status = apps.get_model("extras.Status") + relevant_models_ct = ContentType.objects.filter( + app_label="nautobot_firewall_models", model__in=["applicationobject", "applicationobjectgroup"] + ) + for status in Status.objects.filter(name__in=get_configured_status_names()).iterator(): + status.content_types.add(*relevant_models_ct) def remove_app_status(apps, schema_editor): @@ -29,22 +28,18 @@ def remove_app_status(apps, schema_editor): This was added along with 0013_applications in order to associate the same set of statuses with the new Application models that are associated with the original set of security models. """ - - statuses = ["Active", "Staged", "Decommissioned"] ContentType = apps.get_model("contenttypes.ContentType") - relevant_models = [ - apps.get_model(model) - for model in ["nautobot_firewall_models.ApplicationObject", "nautobot_firewall_models.ApplicationObjectGroup"] - ] - for i in statuses: - status = apps.get_model("extras.Status").objects.get(name=i) - for model in relevant_models: - ct = ContentType.objects.get_for_model(model) - status.content_types.remove(ct) + Status = apps.get_model("extras.Status") + relevant_models_ct = ContentType.objects.filter( + app_label="nautobot_firewall_models", model__in=["applicationobject", "applicationobjectgroup"] + ) + for status in Status.objects.filter(content_types__in=relevant_models_ct).distinct().iterator(): + status.content_types.remove(*relevant_models_ct) class Migration(migrations.Migration): dependencies = [ + ("contenttypes", "0002_remove_content_type_name"), ("nautobot_firewall_models", "0013_applications"), ] diff --git a/nautobot_firewall_models/signals.py b/nautobot_firewall_models/signals.py index 8540fb11..96df3cc6 100644 --- a/nautobot_firewall_models/signals.py +++ b/nautobot_firewall_models/signals.py @@ -4,10 +4,12 @@ from django.db.models.signals import pre_delete from django.dispatch import receiver from nautobot.dcim.models import Interface +from nautobot.extras.models import Status from nautobot.ipam.models import VRF, IPAddress, Prefix from nautobot_firewall_models import models from nautobot_firewall_models.constants import PLUGIN_CFG +from nautobot_firewall_models.utils import create_configured_statuses, get_firewall_models_with_status_field ON_DELETE = { IPAddress: ["fqdns", "address_objects"], @@ -102,3 +104,15 @@ def on_delete_handler(instance, **kwargs): for i in ON_DELETE[instance._meta.model]: if hasattr(instance, i) and getattr(instance, i).exists(): raise ValidationError(f"{instance} is assigned to an {i} & `protect_on_delete` is enabled.") + + +def create_configured_statuses_signal(sender, **kwargs): # pylint: disable=unused-argument + """Signal handler to create default_status and allowed_status configured in the app config.""" + create_configured_statuses() + + +def associate_statuses_signal(sender, **kwargs): # pylint: disable=unused-argument + """Signal handler to associate some common statuses with the firewall model content types.""" + for status in Status.objects.filter(name__in=["Active", "Staged", "Decommissioned"]): + content_types = get_firewall_models_with_status_field() + status.content_types.add(*content_types) diff --git a/nautobot_firewall_models/utils/__init__.py b/nautobot_firewall_models/utils/__init__.py index 937a53df..385d7b16 100644 --- a/nautobot_firewall_models/utils/__init__.py +++ b/nautobot_firewall_models/utils/__init__.py @@ -2,17 +2,84 @@ import json -from django.conf import settings +from django.apps import apps as django_apps from django.utils.module_loading import import_string from nautobot.core.models.utils import serialize_object_v2 -from nautobot.extras.models import Status + +try: + from nautobot.extras.management import STATUS_COLOR_MAP, STATUS_DESCRIPTION_MAP +except ImportError: # Nautobot version < v2.2.0 + from nautobot.extras.management import COLOR_MAP as STATUS_COLOR_MAP + from nautobot.extras.management import DESCRIPTION_MAP as STATUS_DESCRIPTION_MAP from rest_framework.renderers import JSONRenderer +from nautobot_firewall_models.constants import PLUGIN_CFG + + +def _create_status(status_name, apps=django_apps): + """Create a status with the given name, using nautobot default description and color if applicable.""" + Status = apps.get_model("extras.Status") # pylint: disable=invalid-name + defaults = {"description": STATUS_DESCRIPTION_MAP.get(status_name, "")} + if status_name in STATUS_COLOR_MAP: + defaults["color"] = STATUS_COLOR_MAP[status_name] + status, _ = Status.objects.get_or_create(name=status_name, defaults=defaults) + + # Add the status to all firewall models with a status field + content_types = get_firewall_models_with_status_field(apps=apps) + status.content_types.add(*content_types) + + +def create_configured_statuses(apps=django_apps): + """Create the configured statuses (default_status and allowed_status) for the firewall app if they don't already exist.""" + for status_name in get_configured_status_names(): + _create_status(status_name, apps=apps) + + +def create_default_status(apps=django_apps): + """Create the default_status defined in the app config if it doesn't already exist.""" + default_status_name = PLUGIN_CFG.get("default_status") + _create_status(default_status_name, apps=apps) + + +def get_configured_status_names(): + """Retrieve the names of the configured statuses (default_status and allowed_status) from the firewall app config.""" + configured_status_names = PLUGIN_CFG.get("allowed_status") + if isinstance(configured_status_names, str): + configured_status_names = [configured_status_names] + return configured_status_names + [get_default_status_name()] + + +def get_default_status(apps=django_apps): + """ + Return the primary key of the default status defined in the firewall app config. + + Creates the default status if it doesn't exist. Used by the firewall models for the status field default. + """ + Status = apps.get_model("extras.Status") # pylint: disable=invalid-name + default_status_name = PLUGIN_CFG.get("default_status") + default_status = Status.objects.filter(name=default_status_name) + if not default_status.exists(): + create_default_status() + + return default_status.all().first().pk + + +def get_default_status_name(): + """Return the name of the default status defined in the firewall app config.""" + default_status_name = PLUGIN_CFG.get("default_status") + return default_status_name + + +def get_firewall_models_with_status_field(apps=django_apps): + """Return a list of content types for all firewall models that have a status field. Usable in migrations.""" + model_content_types = [] + ContentType = apps.get_model("contenttypes.ContentType") # pylint: disable=invalid-name + for model in apps.get_app_config("nautobot_firewall_models").get_models(): + if hasattr(model, "status"): + ct = ContentType.objects.get_for_model(model) # pylint: disable=invalid-name + model_content_types.append(ct) -def get_default_status(): - """Returns a default status value based on plugin config.""" - status_name = settings.PLUGINS_CONFIG.get("nautobot_firewall_models", {}).get("default_status", "Active") - return Status.objects.get(name=status_name).pk + return model_content_types def model_to_json(obj, cls=None):