Skip to content

Commit

Permalink
Merge pull request #224 from DevDataPlatform/223-move-deleteorg-funct…
Browse files Browse the repository at this point in the history
…ions-into-a-module

moved deletion functions into a module
  • Loading branch information
fatchat authored Jul 10, 2023
2 parents 9ba4782 + cda5333 commit b2e6a2b
Show file tree
Hide file tree
Showing 3 changed files with 296 additions and 110 deletions.
113 changes: 3 additions & 110 deletions ddpui/management/commands/deleteorg.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,7 @@
from django.core.management.base import BaseCommand

from ddpui.models.org_user import Org, OrgUser
from ddpui.models.org import OrgDataFlow, OrgPrefectBlock, OrgWarehouse
from ddpui.ddpairbyte import airbyte_service
from ddpui.ddpprefect import prefect_service
from ddpui.ddpprefect import AIRBYTESERVER, AIRBYTECONNECTION, SHELLOPERATION, DBTCORE
from ddpui.ddpdbt import dbt_service
from ddpui.utils import secretsmanager
from ddpui.models.org_user import Org
from ddpui.utils.deleteorg import delete_one_org


class Command(BaseCommand):
Expand All @@ -22,108 +17,6 @@ def add_arguments(self, parser): # skipcq: PYL-R0201
parser.add_argument("--org-name", required=True)
parser.add_argument("--yes-really", action="store_true")

def delete_prefect_deployments(self, org: Org): # skipcq: PYL-R0201
"""fetches and deletes every prefect deployment for this org"""
print("=========== OrgDataFlow ===========")
for dataflow in OrgDataFlow.objects.filter(org=org):
print(dataflow.deployment_id, dataflow.connection_id)
try:
prefect_service.delete_deployment_by_id(dataflow.deployment_id)
except Exception:
pass
dataflow.delete()

def delete_prefect_shell_blocks(self, org: Org): # skipcq: PYL-R0201
"""fetches and deletes all prefect blocks for this org"""
print("=========== OrgPrefectBlock: Shell Operations ===========")
for block in OrgPrefectBlock.objects.filter(org=org, block_type=SHELLOPERATION):
print(
block.block_name,
block.display_name,
block.command,
)
prefect_service.delete_shell_block(block.block_id)
block.delete()

def delete_dbt_workspace(self, org: Org): # skipcq: PYL-R0201
"""deletes the dbt workspace"""
dbt_service.delete_dbt_workspace(org)

def delete_airbyte_workspace(self, org: Org): # skipcq: PYL-R0201
"""
deletes airbyte sources, destinations, connections
deletes airbyte server and connection blocks in prefect
"""
print("=========== OrgPrefectBlock: Airbyte Connections ===========")
for block in OrgPrefectBlock.objects.filter(
org=org, block_type=AIRBYTECONNECTION
):
print(
block.block_name,
block.block_type,
block.display_name,
block.dbt_target_schema,
block.command,
)
prefect_service.delete_airbyte_connection_block(block.block_id)
block.delete()

print("=========== OrgPrefectBlock: Airbyte Server(s) ===========")
for block in OrgPrefectBlock.objects.filter(org=org, block_type=AIRBYTESERVER):
print(
block.block_name,
block.block_type,
block.display_name,
block.dbt_target_schema,
block.command,
)
prefect_service.delete_airbyte_server_block(block.block_id)
block.delete()

for connection in airbyte_service.get_connections(org.airbyte_workspace_id)[
"connections"
]:
print("deleting connection in Airbyte " + connection["connectionId"])
airbyte_service.delete_connection(
org.airbyte_workspace_id, connection["connectionId"]
)

for destination in airbyte_service.get_destinations(org.airbyte_workspace_id)[
"destinations"
]:
print("deleting destination in Airbyte " + destination["destinationId"])
airbyte_service.delete_destination(
org.airbyte_workspace_id, destination["destinationId"]
)

for source in airbyte_service.get_sources(org.airbyte_workspace_id)["sources"]:
print("deleting source in Airbyte " + source["sourceId"])
airbyte_service.delete_source(org.airbyte_workspace_id, source["sourceId"])

for warehouse in OrgWarehouse.objects.filter(org=org):
secretsmanager.delete_warehouse_credentials(warehouse)
warehouse.delete()

airbyte_service.delete_workspace(org.airbyte_workspace_id)

def delete_orgusers(self, org: Org): # skipcq: PYL-R0201
"""delete all login users"""
for orguser in OrgUser.objects.filter(org=org):
orguser.user.delete()
# this deletes the orguser as well via CASCADE

def delete_one_org(self, org: Org, yes_really: bool):
"""delete one org"""
print(f"OrgName: {org.name} Airbyte workspace ID: {org.airbyte_workspace_id}")
if yes_really:
self.delete_prefect_deployments(org)
self.delete_dbt_workspace(org)
if org.airbyte_workspace_id:
self.delete_airbyte_workspace(org)
self.delete_prefect_shell_blocks(org)
self.delete_orgusers(org)
org.delete()

def handle(self, *args, **options):
"""Docstring"""
if options["org_name"] == "ALL":
Expand All @@ -135,4 +28,4 @@ def handle(self, *args, **options):
print("no such org")
return

self.delete_one_org(org, options["yes_really"])
delete_one_org(org, options["yes_really"])
170 changes: 170 additions & 0 deletions ddpui/tests/unit_integration_tests/test_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,170 @@
import os
from unittest.mock import patch, Mock
import django
import pytest

os.environ.setdefault("DJANGO_SETTINGS_MODULE", "ddpui.settings")
os.environ["DJANGO_ALLOW_ASYNC_UNSAFE"] = "true"
django.setup()

from ddpui.models.org import Org, OrgDataFlow, OrgPrefectBlock, OrgWarehouse
from ddpui.models.org_user import OrgUser, User

from ddpui.utils.deleteorg import (
delete_prefect_deployments,
delete_prefect_shell_blocks,
delete_dbt_workspace,
delete_orgusers,
delete_airbyte_workspace,
)
from ddpui.ddpprefect import AIRBYTESERVER, AIRBYTECONNECTION, SHELLOPERATION

pytestmark = pytest.mark.django_db


@pytest.fixture
def org_with_workspace():
"""a pytest fixture which creates an Org having an airbyte workspace"""
print("creating org_with_workspace")
org = Org.objects.create(
name="org-name", airbyte_workspace_id="FAKE-WORKSPACE-ID", slug="test-org-slug"
)
yield org
print("deleting org_with_workspace")
org.delete()


@patch.multiple(
"ddpui.ddpprefect.prefect_service",
delete_deployment_by_id=Mock(),
)
def test_delete_prefect_deployments(org_with_workspace):
"""
deleting a prefect deployment should
- invoke delete_deployment_by_id
- delete the org-dataflow
"""
OrgDataFlow.objects.create(org=org_with_workspace, name="org-dataflow-name")
assert OrgDataFlow.objects.filter(org=org_with_workspace).count() == 1
delete_prefect_deployments(org_with_workspace)
assert OrgDataFlow.objects.filter(org=org_with_workspace).count() == 0


@patch.multiple(
"ddpui.ddpprefect.prefect_service",
delete_shell_block=Mock(),
)
def test_delete_prefect_shell_blocks(org_with_workspace):
"""
deleting a prefect deployment should
- invoke delete_deployment_by_id
- delete the org-dataflow
"""
OrgPrefectBlock.objects.create(org=org_with_workspace, block_type=SHELLOPERATION)
assert (
OrgPrefectBlock.objects.filter(
org=org_with_workspace, block_type=SHELLOPERATION
).count()
== 1
)
delete_prefect_shell_blocks(org_with_workspace)
assert (
OrgPrefectBlock.objects.filter(
org=org_with_workspace, block_type=SHELLOPERATION
).count()
== 0
)


@patch.multiple("ddpui.ddpdbt.dbt_service", delete_dbt_workspace=Mock())
def test_delete_dbt_workspace(org_with_workspace):
"""check that dbt_service.delete_dbt_workspace is called"""
delete_dbt_workspace(org_with_workspace)


@patch.multiple(
"ddpui.ddpprefect.prefect_service",
delete_airbyte_connection_block=Mock(),
delete_airbyte_server_block=Mock(),
)
@patch.multiple(
"ddpui.ddpairbyte.airbyte_service",
get_connections=Mock(
return_value={"connections": [{"connectionId": "fake-connection-id"}]}
),
delete_connection=Mock(),
get_destinations=Mock(
return_value={"destinations": [{"destinationId": "fake-destination-id"}]}
),
delete_destination=Mock(),
get_sources=Mock(return_value={"sources": [{"sourceId": "fake-source-id"}]}),
delete_source=Mock(),
delete_workspace=Mock(),
)
@patch.multiple("ddpui.utils.secretsmanager", delete_warehouse_credentials=Mock())
def test_delete_airbyte_workspace(org_with_workspace):
"""
delete connection blocks
delete server blocks
delete connections
delete destinations
delete sources
delete warehouse credentials
delete workspace
"""
OrgPrefectBlock.objects.create(
org=org_with_workspace,
block_type=AIRBYTECONNECTION,
block_id="fake-conn-block-id",
block_name="fake-conn-block-name",
)
OrgPrefectBlock.objects.create(
org=org_with_workspace,
block_type=AIRBYTESERVER,
block_id="fake-srvr-block-id",
block_name="fake-srvr-block-name",
)
OrgWarehouse.objects.create(org=org_with_workspace)
assert (
OrgPrefectBlock.objects.filter(
org=org_with_workspace, block_type=AIRBYTECONNECTION
).count()
== 1
)
assert (
OrgPrefectBlock.objects.filter(
org=org_with_workspace, block_type=AIRBYTESERVER
).count()
== 1
)
assert OrgWarehouse.objects.filter(org=org_with_workspace).count() == 1
delete_airbyte_workspace(org_with_workspace)
assert (
OrgPrefectBlock.objects.filter(
org=org_with_workspace, block_type=AIRBYTECONNECTION
).count()
== 0
)
assert (
OrgPrefectBlock.objects.filter(
org=org_with_workspace, block_type=AIRBYTESERVER
).count()
== 0
)
assert OrgWarehouse.objects.filter(org=org_with_workspace).count() == 0


def test_delete_orgusers(org_with_workspace):
"""ensure that orguser.user.delete is called"""
email = "fake-email"
tempuser = User.objects.create(email=email, username="fake-username")
OrgUser.objects.create(user=tempuser, org=org_with_workspace)
assert (
OrgUser.objects.filter(user__email=email, org=org_with_workspace).count() == 1
)
assert User.objects.filter(email=email).count() == 1
delete_orgusers(org_with_workspace)
assert (
OrgUser.objects.filter(user__email=email, org=org_with_workspace).count() == 0
)
assert User.objects.filter(email=email).count() == 0
Loading

0 comments on commit b2e6a2b

Please sign in to comment.