From 88ef3d6c15c26741d8a7fb05c65b04e0feef3f6d Mon Sep 17 00:00:00 2001 From: Himanshu Pandey <24816726+hp77-creator@users.noreply.github.com> Date: Mon, 16 Sep 2024 11:34:30 +0530 Subject: [PATCH 1/7] Remove 3,5,10,11 endpoint from prefect-proxy endpoints excel --- ddpui/ddpprefect/prefect_service.py | 74 -------------- ddpui/tests/services/test_prefect_service.py | 100 ------------------- 2 files changed, 174 deletions(-) diff --git a/ddpui/ddpprefect/prefect_service.py b/ddpui/ddpprefect/prefect_service.py index 4f69c9c1..96a2404a 100644 --- a/ddpui/ddpprefect/prefect_service.py +++ b/ddpui/ddpprefect/prefect_service.py @@ -173,15 +173,6 @@ def delete_airbyte_server_block(block_id): # ================================================================================================ -def get_airbye_connection_blocks(block_names) -> dict: - """Filter out blocks by query params""" - response = prefect_post( - "blocks/airbyte/connection/filter", - {"block_names": block_names}, - ) - return response - - def update_airbyte_connection_block(blockname): """We don't update connection blocks""" raise Exception("not implemented") @@ -192,18 +183,6 @@ def delete_airbyte_connection_block(block_id) -> None: prefect_delete_a_block(block_id) -def post_prefect_blocks_bulk_delete(block_ids: list) -> dict: - """ - Delete airbyte connection blocks in prefect - corresponding the connection ids array passed - """ - response = prefect_post( - "blocks/bulk/delete/", - {"block_ids": block_ids}, - ) - return response - - # ================================================================================================ def get_shell_block_id(blockname) -> str | None: """get the block_id for the shell block having this name""" @@ -237,39 +216,6 @@ def get_dbtcore_block_id(blockname) -> str | None: return response["block_id"] -def create_dbt_core_block( - dbtcore: PrefectDbtCoreSetup, - profilename: str, - cli_profile_block_name: str, - target: str, - wtype: str, - credentials: dict, - bqlocation: str, -) -> dict: - """Create a dbt core block in prefect""" - response = prefect_post( - "blocks/dbtcore/", - { - "blockName": dbtcore.block_name, - "profile": { - "name": profilename, - "target": target, - "target_configs_schema": target, - }, - "cli_profile_block_name": cli_profile_block_name, - "wtype": wtype, - "credentials": credentials, - "bqlocation": bqlocation, - "commands": dbtcore.commands, - "env": dbtcore.env, - "working_dir": dbtcore.working_dir, - "profiles_dir": dbtcore.profiles_dir, - "project_dir": dbtcore.project_dir, - }, - ) - return response - - def delete_dbt_core_block(block_id): """Delete a dbt core block in prefect""" prefect_delete_a_block(block_id) @@ -387,26 +333,6 @@ def get_secret_block_by_name(blockname: str) -> dict: # ================================================================================================ -def run_airbyte_connection_sync( - run_flow: PrefectAirbyteSync, -) -> dict: # pragma: no cover - """initiates an airbyte connection sync""" - res = prefect_post( - "flows/airbyte/connection/sync/", - json=run_flow.to_json(), - ) - return res - - -def run_dbt_core_sync(run_flow: PrefectDbtCore) -> dict: # pragma: no cover - """initiates a dbt job sync""" - res = prefect_post( - "flows/dbtcore/run/", - json=run_flow.to_json(), - ) - return res - - def run_dbt_task_sync(task: PrefectDbtTaskSetup) -> dict: # pragma: no cover """initiates a dbt job sync""" res = prefect_post( diff --git a/ddpui/tests/services/test_prefect_service.py b/ddpui/tests/services/test_prefect_service.py index b3714c6a..7b6ba775 100644 --- a/ddpui/tests/services/test_prefect_service.py +++ b/ddpui/tests/services/test_prefect_service.py @@ -17,14 +17,12 @@ prefect_delete_a_block, HttpError, get_airbyte_server_block_id, - get_airbye_connection_blocks, update_airbyte_server_block, update_airbyte_connection_block, get_dbtcore_block_id, create_airbyte_server_block, delete_airbyte_server_block, delete_airbyte_connection_block, - post_prefect_blocks_bulk_delete, get_shell_block_id, PrefectDbtCoreSetup, create_dbt_core_block, @@ -273,17 +271,6 @@ def test_delete_airbyte_server_block(mock_delete: Mock): # ============================================================================= -@patch("ddpui.ddpprefect.prefect_service.prefect_post") -def test_get_airbye_connection_blocks(mock_post: Mock): - mock_post.return_value = "blocks-blocks-blocks" - response = get_airbye_connection_blocks(["blockname1", "blockname2"]) - assert response == "blocks-blocks-blocks" - mock_post.assert_called_once_with( - "blocks/airbyte/connection/filter", - {"block_names": ["blockname1", "blockname2"]}, - ) - - def test_update_airbyte_connection_block(): with pytest.raises(Exception) as excinfo: update_airbyte_connection_block("blockname") @@ -296,14 +283,6 @@ def test_delete_airbyte_connection_block(mock_delete: Mock): mock_delete.assert_called_once_with("blockid") -@patch("ddpui.ddpprefect.prefect_service.prefect_post") -def test_post_prefect_blocks_bulk_delete(mock_post: Mock): - mock_post.return_value = "retval" - response = post_prefect_blocks_bulk_delete([1, 2, 3]) - assert response == "retval" - mock_post.assert_called_once_with("blocks/bulk/delete/", {"block_ids": [1, 2, 3]}) - - # ============================================================================= @patch("ddpui.ddpprefect.prefect_service.prefect_get") def test_get_shell_block_id(mock_get: Mock): @@ -322,49 +301,6 @@ def test_get_dbtcore_block_id(mock_get: Mock): mock_get.assert_called_once_with("blocks/dbtcore/blockname") -@patch("ddpui.ddpprefect.prefect_service.prefect_post") -def test_create_dbt_core_block(mock_post: Mock): - mock_post.return_value = "retval" - dbtcore = PrefectDbtCoreSetup( - block_name="theblockname", - working_dir="/working/dir", - profiles_dir="/profiles/dir", - project_dir="/project/dir", - env={"ekey": "eval"}, - commands=["c1", "c2"], - ) - response = create_dbt_core_block( - dbtcore, - "profilename", - "cli_profile_block_name", - "target", - "wtype", - credentials={"c1": "c2"}, - bqlocation=None, - ) - assert response == "retval" - mock_post.assert_called_once_with( - "blocks/dbtcore/", - { - "blockName": dbtcore.block_name, - "profile": { - "name": "profilename", - "target": "target", - "target_configs_schema": "target", - }, - "wtype": "wtype", - "credentials": {"c1": "c2"}, - "cli_profile_block_name": "cli_profile_block_name", - "bqlocation": None, - "commands": dbtcore.commands, - "env": dbtcore.env, - "working_dir": dbtcore.working_dir, - "profiles_dir": dbtcore.profiles_dir, - "project_dir": dbtcore.project_dir, - }, - ) - - @patch("ddpui.ddpprefect.prefect_service.prefect_delete_a_block") def test_delete_dbt_core_block(mock_delete: Mock): delete_dbt_core_block("blockid") @@ -454,42 +390,6 @@ def test_delete_secret_block(mock_delete: Mock): # ============================================================================= -@patch("ddpui.ddpprefect.prefect_service.prefect_post") -def test_run_airbyte_connection_sync(mock_post: Mock): - run_flow = PrefectAirbyteSync( - blockName="block-name", flowName="flow-name", flowRunName="flow-run-name" - ) - mock_post.return_value = "retval" - response = run_airbyte_connection_sync(run_flow) - assert response == "retval" - mock_post.assert_called_once_with( - "flows/airbyte/connection/sync/", - json={ - "blockName": "block-name", - "flowName": "flow-name", - "flowRunName": "flow-run-name", - }, - ) - - -@patch("ddpui.ddpprefect.prefect_service.prefect_post") -def test_run_dbt_core_sync(mock_post: Mock): - run_flow = PrefectDbtCore( - blockName="block-name", flowName="flow-name", flowRunName="flow-run-name" - ) - mock_post.return_value = "retval" - response = run_dbt_core_sync(run_flow) - assert response == "retval" - mock_post.assert_called_once_with( - "flows/dbtcore/run/", - json={ - "blockName": "block-name", - "flowName": "flow-name", - "flowRunName": "flow-run-name", - }, - ) - - @patch("ddpui.ddpprefect.prefect_service.prefect_get") def test_get_flow_runs_by_deployment_id_limit(mock_get: Mock): mock_get.return_value = {"flow_runs": []} From cd3f04044f588fb1fc020bcaa2d35efc79913138 Mon Sep 17 00:00:00 2001 From: Himanshu Pandey <24816726+hp77-creator@users.noreply.github.com> Date: Mon, 16 Sep 2024 11:37:49 +0530 Subject: [PATCH 2/7] Remove update dbtcore_edit endpoint --- ddpui/ddpprefect/prefect_service.py | 12 ------------ ddpui/tests/services/test_prefect_service.py | 19 ------------------- 2 files changed, 31 deletions(-) diff --git a/ddpui/ddpprefect/prefect_service.py b/ddpui/ddpprefect/prefect_service.py index 96a2404a..59a053dc 100644 --- a/ddpui/ddpprefect/prefect_service.py +++ b/ddpui/ddpprefect/prefect_service.py @@ -221,18 +221,6 @@ def delete_dbt_core_block(block_id): prefect_delete_a_block(block_id) -def update_dbt_core_block_credentials(wtype: str, block_name: str, credentials: dict): - """Update the credentials of a dbt core block in prefect""" - response = prefect_put( - f"blocks/dbtcore_edit/{wtype}/", - { - "blockName": block_name, - "credentials": credentials, - }, - ) - return response - - def update_dbt_core_block_schema(block_name: str, target_configs_schema: str): """Update the schema inside a dbt core block in prefect""" response = prefect_put( diff --git a/ddpui/tests/services/test_prefect_service.py b/ddpui/tests/services/test_prefect_service.py index 7b6ba775..b1e09598 100644 --- a/ddpui/tests/services/test_prefect_service.py +++ b/ddpui/tests/services/test_prefect_service.py @@ -307,25 +307,6 @@ def test_delete_dbt_core_block(mock_delete: Mock): mock_delete.assert_called_once_with("blockid") -@patch("ddpui.ddpprefect.prefect_service.prefect_put") -def test_update_dbt_core_block_credentials(mock_put: Mock): - mock_put.return_value = "retval" - response = update_dbt_core_block_credentials( - "wtype", - "block_name", - {"c1": "c2"}, - ) - - assert response == "retval" - mock_put.assert_called_once_with( - "blocks/dbtcore_edit/wtype/", - { - "blockName": "block_name", - "credentials": {"c1": "c2"}, - }, - ) - - @patch("ddpui.ddpprefect.prefect_service.prefect_put") def test_update_dbt_core_block_schema(mock_put: Mock): mock_put.return_value = "retval" From 9d75f03b5a04203faeb31fb6524b80ad5c28a06a Mon Sep 17 00:00:00 2001 From: Himanshu Pandey <24816726+hp77-creator@users.noreply.github.com> Date: Mon, 16 Sep 2024 11:45:51 +0530 Subject: [PATCH 3/7] remove GET /blocks/shell/{blockname} --- ddpui/ddpprefect/prefect_service.py | 6 ------ ddpui/tests/services/test_prefect_service.py | 14 -------------- 2 files changed, 20 deletions(-) diff --git a/ddpui/ddpprefect/prefect_service.py b/ddpui/ddpprefect/prefect_service.py index 59a053dc..1ed91529 100644 --- a/ddpui/ddpprefect/prefect_service.py +++ b/ddpui/ddpprefect/prefect_service.py @@ -184,12 +184,6 @@ def delete_airbyte_connection_block(block_id) -> None: # ================================================================================================ -def get_shell_block_id(blockname) -> str | None: - """get the block_id for the shell block having this name""" - response = prefect_get(f"blocks/shell/{blockname}") - return response["block_id"] - - def create_shell_block(shell: PrefectShellSetup) -> str: """Create a prefect shell block""" response = prefect_post( diff --git a/ddpui/tests/services/test_prefect_service.py b/ddpui/tests/services/test_prefect_service.py index b1e09598..9d5663ca 100644 --- a/ddpui/tests/services/test_prefect_service.py +++ b/ddpui/tests/services/test_prefect_service.py @@ -23,18 +23,13 @@ create_airbyte_server_block, delete_airbyte_server_block, delete_airbyte_connection_block, - get_shell_block_id, PrefectDbtCoreSetup, - create_dbt_core_block, delete_dbt_core_block, PrefectSecretBlockCreate, create_secret_block, delete_secret_block, - update_dbt_core_block_credentials, update_dbt_core_block_schema, - run_airbyte_connection_sync, PrefectAirbyteSync, - run_dbt_core_sync, PrefectDbtCore, get_flow_runs_by_deployment_id, set_deployment_schedule, @@ -283,15 +278,6 @@ def test_delete_airbyte_connection_block(mock_delete: Mock): mock_delete.assert_called_once_with("blockid") -# ============================================================================= -@patch("ddpui.ddpprefect.prefect_service.prefect_get") -def test_get_shell_block_id(mock_get: Mock): - mock_get.return_value = {"block_id": "theblockid"} - response = get_shell_block_id("blockname") - assert response == "theblockid" - mock_get.assert_called_once_with("blocks/shell/blockname") - - # ============================================================================= @patch("ddpui.ddpprefect.prefect_service.prefect_get") def test_get_dbtcore_block_id(mock_get: Mock): From a91a6c6222cc5d8aec16733abd9cf158086a0ba4 Mon Sep 17 00:00:00 2001 From: Himanshu Pandey <24816726+hp77-creator@users.noreply.github.com> Date: Mon, 16 Sep 2024 11:46:56 +0530 Subject: [PATCH 4/7] remove GET /blocks/dbtcore/{blockname} --- ddpui/ddpprefect/prefect_service.py | 6 ------ ddpui/tests/services/test_prefect_service.py | 8 -------- 2 files changed, 14 deletions(-) diff --git a/ddpui/ddpprefect/prefect_service.py b/ddpui/ddpprefect/prefect_service.py index 1ed91529..ab0864e0 100644 --- a/ddpui/ddpprefect/prefect_service.py +++ b/ddpui/ddpprefect/prefect_service.py @@ -204,12 +204,6 @@ def delete_shell_block(block_id): # ================================================================================================ -def get_dbtcore_block_id(blockname) -> str | None: - """get the block_id for the dbtcore block having this name""" - response = prefect_get(f"blocks/dbtcore/{blockname}") - return response["block_id"] - - def delete_dbt_core_block(block_id): """Delete a dbt core block in prefect""" prefect_delete_a_block(block_id) diff --git a/ddpui/tests/services/test_prefect_service.py b/ddpui/tests/services/test_prefect_service.py index 9d5663ca..3e58d31a 100644 --- a/ddpui/tests/services/test_prefect_service.py +++ b/ddpui/tests/services/test_prefect_service.py @@ -279,14 +279,6 @@ def test_delete_airbyte_connection_block(mock_delete: Mock): # ============================================================================= -@patch("ddpui.ddpprefect.prefect_service.prefect_get") -def test_get_dbtcore_block_id(mock_get: Mock): - mock_get.return_value = {"block_id": "theblockid"} - response = get_dbtcore_block_id("blockname") - assert response == "theblockid" - mock_get.assert_called_once_with("blocks/dbtcore/blockname") - - @patch("ddpui.ddpprefect.prefect_service.prefect_delete_a_block") def test_delete_dbt_core_block(mock_delete: Mock): delete_dbt_core_block("blockid") From cc8c79c4e4e2f1079dd63776ada1cb34607b19de Mon Sep 17 00:00:00 2001 From: Himanshu Pandey <24816726+hp77-creator@users.noreply.github.com> Date: Mon, 16 Sep 2024 11:48:56 +0530 Subject: [PATCH 5/7] remove unused imports --- ddpui/ddpprefect/prefect_service.py | 5 +---- ddpui/tests/services/test_prefect_service.py | 4 ---- 2 files changed, 1 insertion(+), 8 deletions(-) diff --git a/ddpui/ddpprefect/prefect_service.py b/ddpui/ddpprefect/prefect_service.py index ab0864e0..954aea74 100644 --- a/ddpui/ddpprefect/prefect_service.py +++ b/ddpui/ddpprefect/prefect_service.py @@ -8,18 +8,15 @@ from django.db.models.functions import RowNumber from ddpui.ddpprefect.schema import ( - PrefectDbtCoreSetup, PrefectShellSetup, - PrefectAirbyteSync, PrefectDataFlowCreateSchema3, - PrefectDbtCore, PrefectSecretBlockCreate, PrefectShellTaskSetup, PrefectDbtTaskSetup, PrefectDataFlowUpdateSchema3, ) from ddpui.utils.custom_logger import CustomLogger -from ddpui.models.tasks import DataflowOrgTask, TaskLock, OrgTask, OrgDataFlowv1 +from ddpui.models.tasks import DataflowOrgTask, TaskLock from ddpui.models.org_user import OrgUser from ddpui.models.flow_runs import PrefectFlowRun from ddpui.ddpprefect import ( diff --git a/ddpui/tests/services/test_prefect_service.py b/ddpui/tests/services/test_prefect_service.py index 3e58d31a..680ef8d7 100644 --- a/ddpui/tests/services/test_prefect_service.py +++ b/ddpui/tests/services/test_prefect_service.py @@ -19,18 +19,14 @@ get_airbyte_server_block_id, update_airbyte_server_block, update_airbyte_connection_block, - get_dbtcore_block_id, create_airbyte_server_block, delete_airbyte_server_block, delete_airbyte_connection_block, - PrefectDbtCoreSetup, delete_dbt_core_block, PrefectSecretBlockCreate, create_secret_block, delete_secret_block, update_dbt_core_block_schema, - PrefectAirbyteSync, - PrefectDbtCore, get_flow_runs_by_deployment_id, set_deployment_schedule, get_filtered_deployments, From 64f090e72bf844c30f588cb01a174b02db90d4d2 Mon Sep 17 00:00:00 2001 From: Himanshu Pandey <24816726+hp77-creator@users.noreply.github.com> Date: Mon, 16 Sep 2024 11:55:08 +0530 Subject: [PATCH 6/7] remove POST /blocks/shell --- ddpui/ddpprefect/prefect_service.py | 14 -------------- 1 file changed, 14 deletions(-) diff --git a/ddpui/ddpprefect/prefect_service.py b/ddpui/ddpprefect/prefect_service.py index 954aea74..49567165 100644 --- a/ddpui/ddpprefect/prefect_service.py +++ b/ddpui/ddpprefect/prefect_service.py @@ -181,20 +181,6 @@ def delete_airbyte_connection_block(block_id) -> None: # ================================================================================================ -def create_shell_block(shell: PrefectShellSetup) -> str: - """Create a prefect shell block""" - response = prefect_post( - "blocks/shell/", - { - "blockName": shell.blockname, - "commands": shell.commands, - "env": shell.env, - "workingDir": shell.workingDir, - }, - ) - return response - - def delete_shell_block(block_id): """Delete a prefect shell block""" prefect_delete_a_block(block_id) From b9095da1216d813d0b705eff07065bac3a329584 Mon Sep 17 00:00:00 2001 From: Himanshu Pandey <24816726+hp77-creator@users.noreply.github.com> Date: Mon, 16 Sep 2024 12:08:48 +0530 Subject: [PATCH 7/7] fix deepsource errors --- ddpui/ddpprefect/prefect_service.py | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/ddpui/ddpprefect/prefect_service.py b/ddpui/ddpprefect/prefect_service.py index 49567165..bfe435d6 100644 --- a/ddpui/ddpprefect/prefect_service.py +++ b/ddpui/ddpprefect/prefect_service.py @@ -8,7 +8,6 @@ from django.db.models.functions import RowNumber from ddpui.ddpprefect.schema import ( - PrefectShellSetup, PrefectDataFlowCreateSchema3, PrefectSecretBlockCreate, PrefectShellTaskSetup, @@ -493,6 +492,7 @@ def get_flow_run_logs_v2(flow_run_id: str) -> dict: # pragma: no cover ) return res + def get_flow_run_graphs(flow_run_id: str) -> dict: """retreive the tasks from a flow-run from prefect""" res = prefect_get( @@ -510,9 +510,7 @@ def get_flow_run(flow_run_id: str) -> dict: def create_deployment_flow_run( deployment_id: str, flow_run_params: dict = None ) -> dict: # pragma: no cover - """ - Proxy call to create a flow run for deployment. - """ + """Proxy call to create a flow run for deployment.""" res = prefect_post( f"deployments/{deployment_id}/flow_run", flow_run_params if flow_run_params else {},