diff --git a/ddpui/management/commands/deleteorg.py b/ddpui/management/commands/deleteorg.py index b1d23d54..b6c1f0af 100644 --- a/ddpui/management/commands/deleteorg.py +++ b/ddpui/management/commands/deleteorg.py @@ -1,12 +1,7 @@ from django.core.management.base import BaseCommand -from ddpui.models.org_user import Org, OrgUser -from ddpui.models.org import OrgDataFlow, OrgPrefectBlock, OrgWarehouse -from ddpui.ddpairbyte import airbyte_service -from ddpui.ddpprefect import prefect_service -from ddpui.ddpprefect import AIRBYTESERVER, AIRBYTECONNECTION, SHELLOPERATION, DBTCORE -from ddpui.ddpdbt import dbt_service -from ddpui.utils import secretsmanager +from ddpui.models.org_user import Org +from ddpui.utils.deleteorg import delete_one_org class Command(BaseCommand): @@ -22,108 +17,6 @@ def add_arguments(self, parser): # skipcq: PYL-R0201 parser.add_argument("--org-name", required=True) parser.add_argument("--yes-really", action="store_true") - def delete_prefect_deployments(self, org: Org): # skipcq: PYL-R0201 - """fetches and deletes every prefect deployment for this org""" - print("=========== OrgDataFlow ===========") - for dataflow in OrgDataFlow.objects.filter(org=org): - print(dataflow.deployment_id, dataflow.connection_id) - try: - prefect_service.delete_deployment_by_id(dataflow.deployment_id) - except Exception: - pass - dataflow.delete() - - def delete_prefect_shell_blocks(self, org: Org): # skipcq: PYL-R0201 - """fetches and deletes all prefect blocks for this org""" - print("=========== OrgPrefectBlock: Shell Operations ===========") - for block in OrgPrefectBlock.objects.filter(org=org, block_type=SHELLOPERATION): - print( - block.block_name, - block.display_name, - block.command, - ) - prefect_service.delete_shell_block(block.block_id) - block.delete() - - def delete_dbt_workspace(self, org: Org): # skipcq: PYL-R0201 - """deletes the dbt workspace""" - dbt_service.delete_dbt_workspace(org) - - def delete_airbyte_workspace(self, org: Org): # skipcq: PYL-R0201 - """ - deletes airbyte sources, destinations, connections - deletes airbyte server and connection blocks in prefect - """ - print("=========== OrgPrefectBlock: Airbyte Connections ===========") - for block in OrgPrefectBlock.objects.filter( - org=org, block_type=AIRBYTECONNECTION - ): - print( - block.block_name, - block.block_type, - block.display_name, - block.dbt_target_schema, - block.command, - ) - prefect_service.delete_airbyte_connection_block(block.block_id) - block.delete() - - print("=========== OrgPrefectBlock: Airbyte Server(s) ===========") - for block in OrgPrefectBlock.objects.filter(org=org, block_type=AIRBYTESERVER): - print( - block.block_name, - block.block_type, - block.display_name, - block.dbt_target_schema, - block.command, - ) - prefect_service.delete_airbyte_server_block(block.block_id) - block.delete() - - for connection in airbyte_service.get_connections(org.airbyte_workspace_id)[ - "connections" - ]: - print("deleting connection in Airbyte " + connection["connectionId"]) - airbyte_service.delete_connection( - org.airbyte_workspace_id, connection["connectionId"] - ) - - for destination in airbyte_service.get_destinations(org.airbyte_workspace_id)[ - "destinations" - ]: - print("deleting destination in Airbyte " + destination["destinationId"]) - airbyte_service.delete_destination( - org.airbyte_workspace_id, destination["destinationId"] - ) - - for source in airbyte_service.get_sources(org.airbyte_workspace_id)["sources"]: - print("deleting source in Airbyte " + source["sourceId"]) - airbyte_service.delete_source(org.airbyte_workspace_id, source["sourceId"]) - - for warehouse in OrgWarehouse.objects.filter(org=org): - secretsmanager.delete_warehouse_credentials(warehouse) - warehouse.delete() - - airbyte_service.delete_workspace(org.airbyte_workspace_id) - - def delete_orgusers(self, org: Org): # skipcq: PYL-R0201 - """delete all login users""" - for orguser in OrgUser.objects.filter(org=org): - orguser.user.delete() - # this deletes the orguser as well via CASCADE - - def delete_one_org(self, org: Org, yes_really: bool): - """delete one org""" - print(f"OrgName: {org.name} Airbyte workspace ID: {org.airbyte_workspace_id}") - if yes_really: - self.delete_prefect_deployments(org) - self.delete_dbt_workspace(org) - if org.airbyte_workspace_id: - self.delete_airbyte_workspace(org) - self.delete_prefect_shell_blocks(org) - self.delete_orgusers(org) - org.delete() - def handle(self, *args, **options): """Docstring""" if options["org_name"] == "ALL": @@ -135,4 +28,4 @@ def handle(self, *args, **options): print("no such org") return - self.delete_one_org(org, options["yes_really"]) + delete_one_org(org, options["yes_really"]) diff --git a/ddpui/tests/unit_integration_tests/test_utils.py b/ddpui/tests/unit_integration_tests/test_utils.py new file mode 100644 index 00000000..48280f04 --- /dev/null +++ b/ddpui/tests/unit_integration_tests/test_utils.py @@ -0,0 +1,170 @@ +import os +from unittest.mock import patch, Mock +import django +import pytest + +os.environ.setdefault("DJANGO_SETTINGS_MODULE", "ddpui.settings") +os.environ["DJANGO_ALLOW_ASYNC_UNSAFE"] = "true" +django.setup() + +from ddpui.models.org import Org, OrgDataFlow, OrgPrefectBlock, OrgWarehouse +from ddpui.models.org_user import OrgUser, User + +from ddpui.utils.deleteorg import ( + delete_prefect_deployments, + delete_prefect_shell_blocks, + delete_dbt_workspace, + delete_orgusers, + delete_airbyte_workspace, +) +from ddpui.ddpprefect import AIRBYTESERVER, AIRBYTECONNECTION, SHELLOPERATION + +pytestmark = pytest.mark.django_db + + +@pytest.fixture +def org_with_workspace(): + """a pytest fixture which creates an Org having an airbyte workspace""" + print("creating org_with_workspace") + org = Org.objects.create( + name="org-name", airbyte_workspace_id="FAKE-WORKSPACE-ID", slug="test-org-slug" + ) + yield org + print("deleting org_with_workspace") + org.delete() + + +@patch.multiple( + "ddpui.ddpprefect.prefect_service", + delete_deployment_by_id=Mock(), +) +def test_delete_prefect_deployments(org_with_workspace): + """ + deleting a prefect deployment should + - invoke delete_deployment_by_id + - delete the org-dataflow + """ + OrgDataFlow.objects.create(org=org_with_workspace, name="org-dataflow-name") + assert OrgDataFlow.objects.filter(org=org_with_workspace).count() == 1 + delete_prefect_deployments(org_with_workspace) + assert OrgDataFlow.objects.filter(org=org_with_workspace).count() == 0 + + +@patch.multiple( + "ddpui.ddpprefect.prefect_service", + delete_shell_block=Mock(), +) +def test_delete_prefect_shell_blocks(org_with_workspace): + """ + deleting a prefect deployment should + - invoke delete_deployment_by_id + - delete the org-dataflow + """ + OrgPrefectBlock.objects.create(org=org_with_workspace, block_type=SHELLOPERATION) + assert ( + OrgPrefectBlock.objects.filter( + org=org_with_workspace, block_type=SHELLOPERATION + ).count() + == 1 + ) + delete_prefect_shell_blocks(org_with_workspace) + assert ( + OrgPrefectBlock.objects.filter( + org=org_with_workspace, block_type=SHELLOPERATION + ).count() + == 0 + ) + + +@patch.multiple("ddpui.ddpdbt.dbt_service", delete_dbt_workspace=Mock()) +def test_delete_dbt_workspace(org_with_workspace): + """check that dbt_service.delete_dbt_workspace is called""" + delete_dbt_workspace(org_with_workspace) + + +@patch.multiple( + "ddpui.ddpprefect.prefect_service", + delete_airbyte_connection_block=Mock(), + delete_airbyte_server_block=Mock(), +) +@patch.multiple( + "ddpui.ddpairbyte.airbyte_service", + get_connections=Mock( + return_value={"connections": [{"connectionId": "fake-connection-id"}]} + ), + delete_connection=Mock(), + get_destinations=Mock( + return_value={"destinations": [{"destinationId": "fake-destination-id"}]} + ), + delete_destination=Mock(), + get_sources=Mock(return_value={"sources": [{"sourceId": "fake-source-id"}]}), + delete_source=Mock(), + delete_workspace=Mock(), +) +@patch.multiple("ddpui.utils.secretsmanager", delete_warehouse_credentials=Mock()) +def test_delete_airbyte_workspace(org_with_workspace): + """ + delete connection blocks + delete server blocks + delete connections + delete destinations + delete sources + delete warehouse credentials + delete workspace + """ + OrgPrefectBlock.objects.create( + org=org_with_workspace, + block_type=AIRBYTECONNECTION, + block_id="fake-conn-block-id", + block_name="fake-conn-block-name", + ) + OrgPrefectBlock.objects.create( + org=org_with_workspace, + block_type=AIRBYTESERVER, + block_id="fake-srvr-block-id", + block_name="fake-srvr-block-name", + ) + OrgWarehouse.objects.create(org=org_with_workspace) + assert ( + OrgPrefectBlock.objects.filter( + org=org_with_workspace, block_type=AIRBYTECONNECTION + ).count() + == 1 + ) + assert ( + OrgPrefectBlock.objects.filter( + org=org_with_workspace, block_type=AIRBYTESERVER + ).count() + == 1 + ) + assert OrgWarehouse.objects.filter(org=org_with_workspace).count() == 1 + delete_airbyte_workspace(org_with_workspace) + assert ( + OrgPrefectBlock.objects.filter( + org=org_with_workspace, block_type=AIRBYTECONNECTION + ).count() + == 0 + ) + assert ( + OrgPrefectBlock.objects.filter( + org=org_with_workspace, block_type=AIRBYTESERVER + ).count() + == 0 + ) + assert OrgWarehouse.objects.filter(org=org_with_workspace).count() == 0 + + +def test_delete_orgusers(org_with_workspace): + """ensure that orguser.user.delete is called""" + email = "fake-email" + tempuser = User.objects.create(email=email, username="fake-username") + OrgUser.objects.create(user=tempuser, org=org_with_workspace) + assert ( + OrgUser.objects.filter(user__email=email, org=org_with_workspace).count() == 1 + ) + assert User.objects.filter(email=email).count() == 1 + delete_orgusers(org_with_workspace) + assert ( + OrgUser.objects.filter(user__email=email, org=org_with_workspace).count() == 0 + ) + assert User.objects.filter(email=email).count() == 0 diff --git a/ddpui/utils/deleteorg.py b/ddpui/utils/deleteorg.py new file mode 100644 index 00000000..7323de06 --- /dev/null +++ b/ddpui/utils/deleteorg.py @@ -0,0 +1,123 @@ +from ninja.errors import HttpError +from ddpui.models.org_user import Org, OrgUser +from ddpui.models.org import OrgDataFlow, OrgPrefectBlock, OrgWarehouse +from ddpui.ddpairbyte import airbyte_service +from ddpui.ddpprefect import prefect_service +from ddpui.ddpprefect import AIRBYTESERVER, AIRBYTECONNECTION, SHELLOPERATION +from ddpui.ddpdbt import dbt_service +from ddpui.utils import secretsmanager + +from ddpui.utils.ddp_logger import logger + + +def delete_prefect_deployments(org: Org): # skipcq: PYL-R0201 + """fetches and deletes every prefect deployment for this org""" + logger.info("=========== OrgDataFlow ===========") + for dataflow in OrgDataFlow.objects.filter(org=org): + logger.info("%s %s", dataflow.deployment_id, dataflow.connection_id) + try: + prefect_service.delete_deployment_by_id(dataflow.deployment_id) + except HttpError: + pass + dataflow.delete() + + +def delete_prefect_shell_blocks(org: Org): # skipcq: PYL-R0201 + """fetches and deletes all prefect blocks for this org""" + logger.info("=========== OrgPrefectBlock: Shell Operations ===========") + for block in OrgPrefectBlock.objects.filter(org=org, block_type=SHELLOPERATION): + logger.info( + "%s %s %s", + block.block_name, + block.display_name, + block.command, + ) + prefect_service.delete_shell_block(block.block_id) + block.delete() + + +def delete_dbt_workspace(org: Org): # skipcq: PYL-R0201 + """deletes the dbt workspace""" + dbt_service.delete_dbt_workspace(org) + + +def delete_airbyte_workspace(org: Org): # skipcq: PYL-R0201 + """ + deletes airbyte sources, destinations, connections + deletes airbyte server and connection blocks in prefect + """ + logger.info("=========== OrgPrefectBlock: Airbyte Connections ===========") + for block in OrgPrefectBlock.objects.filter(org=org, block_type=AIRBYTECONNECTION): + logger.info( + "%s %s %s %s %s", + block.block_name, + block.block_type, + block.display_name, + block.dbt_target_schema, + block.command, + ) + prefect_service.delete_airbyte_connection_block(block.block_id) + block.delete() + + logger.info("=========== OrgPrefectBlock: Airbyte Server(s) ===========") + for block in OrgPrefectBlock.objects.filter(org=org, block_type=AIRBYTESERVER): + logger.info( + "%s %s %s %s %s", + block.block_name, + block.block_type, + block.display_name, + block.dbt_target_schema, + block.command, + ) + prefect_service.delete_airbyte_server_block(block.block_id) + block.delete() + + for connection in airbyte_service.get_connections(org.airbyte_workspace_id)[ + "connections" + ]: + logger.info("deleting connection in Airbyte " + connection["connectionId"]) + airbyte_service.delete_connection( + org.airbyte_workspace_id, connection["connectionId"] + ) + + for destination in airbyte_service.get_destinations(org.airbyte_workspace_id)[ + "destinations" + ]: + logger.info("deleting destination in Airbyte " + destination["destinationId"]) + airbyte_service.delete_destination( + org.airbyte_workspace_id, destination["destinationId"] + ) + + for source in airbyte_service.get_sources(org.airbyte_workspace_id)["sources"]: + logger.info("deleting source in Airbyte " + source["sourceId"]) + airbyte_service.delete_source(org.airbyte_workspace_id, source["sourceId"]) + + for warehouse in OrgWarehouse.objects.filter(org=org): + secretsmanager.delete_warehouse_credentials(warehouse) + warehouse.delete() + + airbyte_service.delete_workspace(org.airbyte_workspace_id) + + +def delete_orgusers(org: Org): # skipcq: PYL-R0201 + """delete all login users""" + for orguser in OrgUser.objects.filter(org=org): + orguser.user.delete() + # this deletes the orguser as well via CASCADE + + +def delete_one_org(org: Org, yes_really: bool): + """delete one org""" + logger.info( + "OrgName: %s Airbyte workspace ID: %s", + org.name, + org.airbyte_workspace_id, + ) + if yes_really: + delete_prefect_deployments(org) + delete_dbt_workspace(org) + if org.airbyte_workspace_id: + delete_airbyte_workspace(org) + delete_prefect_shell_blocks(org) + delete_orgusers(org) + org.delete()