Skip to content

Commit

Permalink
Merge pull request #858 from hp77-creator/feature_remove_unused_prefect
Browse files Browse the repository at this point in the history
Feature remove unused prefect
  • Loading branch information
Ishankoradia authored Sep 25, 2024
2 parents 18ac822 + b9095da commit 20776a0
Show file tree
Hide file tree
Showing 2 changed files with 2 additions and 265 deletions.
122 changes: 2 additions & 120 deletions ddpui/ddpprefect/prefect_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,18 +8,14 @@
from django.db.models.functions import RowNumber

from ddpui.ddpprefect.schema import (
PrefectDbtCoreSetup,
PrefectShellSetup,
PrefectAirbyteSync,
PrefectDataFlowCreateSchema3,
PrefectDbtCore,
PrefectSecretBlockCreate,
PrefectShellTaskSetup,
PrefectDbtTaskSetup,
PrefectDataFlowUpdateSchema3,
)
from ddpui.utils.custom_logger import CustomLogger
from ddpui.models.tasks import DataflowOrgTask, TaskLock, OrgTask, OrgDataFlowv1
from ddpui.models.tasks import DataflowOrgTask, TaskLock
from ddpui.models.org_user import OrgUser
from ddpui.models.flow_runs import PrefectFlowRun
from ddpui.ddpprefect import (
Expand Down Expand Up @@ -176,15 +172,6 @@ def delete_airbyte_server_block(block_id):


# ================================================================================================
def get_airbye_connection_blocks(block_names) -> dict:
"""Filter out blocks by query params"""
response = prefect_post(
"blocks/airbyte/connection/filter",
{"block_names": block_names},
)
return response


def update_airbyte_connection_block(blockname):
"""We don't update connection blocks"""
raise Exception("not implemented")
Expand All @@ -195,101 +182,18 @@ def delete_airbyte_connection_block(block_id) -> None:
prefect_delete_a_block(block_id)


def post_prefect_blocks_bulk_delete(block_ids: list) -> dict:
"""
Delete airbyte connection blocks in prefect
corresponding the connection ids array passed
"""
response = prefect_post(
"blocks/bulk/delete/",
{"block_ids": block_ids},
)
return response


# ================================================================================================
def get_shell_block_id(blockname) -> str | None:
"""get the block_id for the shell block having this name"""
response = prefect_get(f"blocks/shell/{blockname}")
return response["block_id"]


def create_shell_block(shell: PrefectShellSetup) -> str:
"""Create a prefect shell block"""
response = prefect_post(
"blocks/shell/",
{
"blockName": shell.blockname,
"commands": shell.commands,
"env": shell.env,
"workingDir": shell.workingDir,
},
)
return response


def delete_shell_block(block_id):
"""Delete a prefect shell block"""
prefect_delete_a_block(block_id)


# ================================================================================================
def get_dbtcore_block_id(blockname) -> str | None:
"""get the block_id for the dbtcore block having this name"""
response = prefect_get(f"blocks/dbtcore/{blockname}")
return response["block_id"]


def create_dbt_core_block(
dbtcore: PrefectDbtCoreSetup,
profilename: str,
cli_profile_block_name: str,
target: str,
wtype: str,
credentials: dict,
bqlocation: str,
) -> dict:
"""Create a dbt core block in prefect"""
response = prefect_post(
"blocks/dbtcore/",
{
"blockName": dbtcore.block_name,
"profile": {
"name": profilename,
"target": target,
"target_configs_schema": target,
},
"cli_profile_block_name": cli_profile_block_name,
"wtype": wtype,
"credentials": credentials,
"bqlocation": bqlocation,
"commands": dbtcore.commands,
"env": dbtcore.env,
"working_dir": dbtcore.working_dir,
"profiles_dir": dbtcore.profiles_dir,
"project_dir": dbtcore.project_dir,
},
)
return response


def delete_dbt_core_block(block_id):
"""Delete a dbt core block in prefect"""
prefect_delete_a_block(block_id)


def update_dbt_core_block_credentials(wtype: str, block_name: str, credentials: dict):
"""Update the credentials of a dbt core block in prefect"""
response = prefect_put(
f"blocks/dbtcore_edit/{wtype}/",
{
"blockName": block_name,
"credentials": credentials,
},
)
return response


def update_dbt_core_block_schema(block_name: str, target_configs_schema: str):
"""Update the schema inside a dbt core block in prefect"""
response = prefect_put(
Expand Down Expand Up @@ -390,26 +294,6 @@ def get_secret_block_by_name(blockname: str) -> dict:


# ================================================================================================
def run_airbyte_connection_sync(
run_flow: PrefectAirbyteSync,
) -> dict: # pragma: no cover
"""initiates an airbyte connection sync"""
res = prefect_post(
"flows/airbyte/connection/sync/",
json=run_flow.to_json(),
)
return res


def run_dbt_core_sync(run_flow: PrefectDbtCore) -> dict: # pragma: no cover
"""initiates a dbt job sync"""
res = prefect_post(
"flows/dbtcore/run/",
json=run_flow.to_json(),
)
return res


def run_dbt_task_sync(task: PrefectDbtTaskSetup) -> dict: # pragma: no cover
"""initiates a dbt job sync"""
res = prefect_post(
Expand Down Expand Up @@ -629,9 +513,7 @@ def get_flow_run(flow_run_id: str) -> dict:
def create_deployment_flow_run(
deployment_id: str, flow_run_params: dict = None
) -> dict: # pragma: no cover
"""
Proxy call to create a flow run for deployment.
"""
"""Proxy call to create a flow run for deployment."""
res = prefect_post(
f"deployments/{deployment_id}/flow_run",
flow_run_params if flow_run_params else {},
Expand Down
145 changes: 0 additions & 145 deletions ddpui/tests/services/test_prefect_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,27 +17,16 @@
prefect_delete_a_block,
HttpError,
get_airbyte_server_block_id,
get_airbye_connection_blocks,
update_airbyte_server_block,
update_airbyte_connection_block,
get_dbtcore_block_id,
create_airbyte_server_block,
delete_airbyte_server_block,
delete_airbyte_connection_block,
post_prefect_blocks_bulk_delete,
get_shell_block_id,
PrefectDbtCoreSetup,
create_dbt_core_block,
delete_dbt_core_block,
PrefectSecretBlockCreate,
create_secret_block,
delete_secret_block,
update_dbt_core_block_credentials,
update_dbt_core_block_schema,
run_airbyte_connection_sync,
PrefectAirbyteSync,
run_dbt_core_sync,
PrefectDbtCore,
get_flow_runs_by_deployment_id,
set_deployment_schedule,
get_filtered_deployments,
Expand Down Expand Up @@ -274,17 +263,6 @@ def test_delete_airbyte_server_block(mock_delete: Mock):


# =============================================================================
@patch("ddpui.ddpprefect.prefect_service.prefect_post")
def test_get_airbye_connection_blocks(mock_post: Mock):
mock_post.return_value = "blocks-blocks-blocks"
response = get_airbye_connection_blocks(["blockname1", "blockname2"])
assert response == "blocks-blocks-blocks"
mock_post.assert_called_once_with(
"blocks/airbyte/connection/filter",
{"block_names": ["blockname1", "blockname2"]},
)


def test_update_airbyte_connection_block():
with pytest.raises(Exception) as excinfo:
update_airbyte_connection_block("blockname")
Expand All @@ -297,100 +275,13 @@ def test_delete_airbyte_connection_block(mock_delete: Mock):
mock_delete.assert_called_once_with("blockid")


@patch("ddpui.ddpprefect.prefect_service.prefect_post")
def test_post_prefect_blocks_bulk_delete(mock_post: Mock):
mock_post.return_value = "retval"
response = post_prefect_blocks_bulk_delete([1, 2, 3])
assert response == "retval"
mock_post.assert_called_once_with("blocks/bulk/delete/", {"block_ids": [1, 2, 3]})


# =============================================================================
@patch("ddpui.ddpprefect.prefect_service.prefect_get")
def test_get_shell_block_id(mock_get: Mock):
mock_get.return_value = {"block_id": "theblockid"}
response = get_shell_block_id("blockname")
assert response == "theblockid"
mock_get.assert_called_once_with("blocks/shell/blockname")


# =============================================================================
@patch("ddpui.ddpprefect.prefect_service.prefect_get")
def test_get_dbtcore_block_id(mock_get: Mock):
mock_get.return_value = {"block_id": "theblockid"}
response = get_dbtcore_block_id("blockname")
assert response == "theblockid"
mock_get.assert_called_once_with("blocks/dbtcore/blockname")


@patch("ddpui.ddpprefect.prefect_service.prefect_post")
def test_create_dbt_core_block(mock_post: Mock):
mock_post.return_value = "retval"
dbtcore = PrefectDbtCoreSetup(
block_name="theblockname",
working_dir="/working/dir",
profiles_dir="/profiles/dir",
project_dir="/project/dir",
env={"ekey": "eval"},
commands=["c1", "c2"],
)
response = create_dbt_core_block(
dbtcore,
"profilename",
"cli_profile_block_name",
"target",
"wtype",
credentials={"c1": "c2"},
bqlocation=None,
)
assert response == "retval"
mock_post.assert_called_once_with(
"blocks/dbtcore/",
{
"blockName": dbtcore.block_name,
"profile": {
"name": "profilename",
"target": "target",
"target_configs_schema": "target",
},
"wtype": "wtype",
"credentials": {"c1": "c2"},
"cli_profile_block_name": "cli_profile_block_name",
"bqlocation": None,
"commands": dbtcore.commands,
"env": dbtcore.env,
"working_dir": dbtcore.working_dir,
"profiles_dir": dbtcore.profiles_dir,
"project_dir": dbtcore.project_dir,
},
)


@patch("ddpui.ddpprefect.prefect_service.prefect_delete_a_block")
def test_delete_dbt_core_block(mock_delete: Mock):
delete_dbt_core_block("blockid")
mock_delete.assert_called_once_with("blockid")


@patch("ddpui.ddpprefect.prefect_service.prefect_put")
def test_update_dbt_core_block_credentials(mock_put: Mock):
mock_put.return_value = "retval"
response = update_dbt_core_block_credentials(
"wtype",
"block_name",
{"c1": "c2"},
)

assert response == "retval"
mock_put.assert_called_once_with(
"blocks/dbtcore_edit/wtype/",
{
"blockName": "block_name",
"credentials": {"c1": "c2"},
},
)


@patch("ddpui.ddpprefect.prefect_service.prefect_put")
def test_update_dbt_core_block_schema(mock_put: Mock):
mock_put.return_value = "retval"
Expand Down Expand Up @@ -455,42 +346,6 @@ def test_delete_secret_block(mock_delete: Mock):


# =============================================================================
@patch("ddpui.ddpprefect.prefect_service.prefect_post")
def test_run_airbyte_connection_sync(mock_post: Mock):
run_flow = PrefectAirbyteSync(
blockName="block-name", flowName="flow-name", flowRunName="flow-run-name"
)
mock_post.return_value = "retval"
response = run_airbyte_connection_sync(run_flow)
assert response == "retval"
mock_post.assert_called_once_with(
"flows/airbyte/connection/sync/",
json={
"blockName": "block-name",
"flowName": "flow-name",
"flowRunName": "flow-run-name",
},
)


@patch("ddpui.ddpprefect.prefect_service.prefect_post")
def test_run_dbt_core_sync(mock_post: Mock):
run_flow = PrefectDbtCore(
blockName="block-name", flowName="flow-name", flowRunName="flow-run-name"
)
mock_post.return_value = "retval"
response = run_dbt_core_sync(run_flow)
assert response == "retval"
mock_post.assert_called_once_with(
"flows/dbtcore/run/",
json={
"blockName": "block-name",
"flowName": "flow-name",
"flowRunName": "flow-run-name",
},
)


@patch("ddpui.ddpprefect.prefect_service.prefect_get")
def test_get_flow_runs_by_deployment_id_limit(mock_get: Mock):
mock_get.return_value = {"flow_runs": []}
Expand Down

0 comments on commit 20776a0

Please sign in to comment.