Skip to content

Commit

Permalink
Merge pull request #832 from DalgoT4D/dev_logs_pagination
Browse files Browse the repository at this point in the history
[442] - Pagination added for logs
  • Loading branch information
Ishankoradia authored Sep 3, 2024
2 parents 44114f8 + dc951e7 commit 8cddd37
Show file tree
Hide file tree
Showing 5 changed files with 42 additions and 13 deletions.
13 changes: 6 additions & 7 deletions ddpui/api/pipeline_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -649,11 +649,11 @@ def post_run_prefect_org_deployment_task(
@pipelineapi.get("flow_runs/{flow_run_id}/logs", auth=auth.CustomAuthMiddleware())
@has_permission(["can_view_pipeline"])
def get_flow_runs_logs(
request, flow_run_id, offset: int = 0
request, flow_run_id, task_run_id = '', limit: int = 0, offset: int = 0
): # pylint: disable=unused-argument
"""return the logs from a flow-run"""
try:
result = prefect_service.get_flow_run_logs(flow_run_id, offset)
result = prefect_service.get_flow_run_logs(flow_run_id, task_run_id,limit,offset)
except Exception as error:
logger.exception(error)
raise HttpError(400, "failed to retrieve logs") from error
Expand Down Expand Up @@ -724,18 +724,17 @@ def get_prefect_flow_runs_log_history(
)
@has_permission(["can_view_pipeline"])
def get_prefect_flow_runs_log_history_v1(
    request, deployment_id, limit: int = 0, offset: int = 0
):
    # pylint: disable=unused-argument
    """Fetch a page of flow runs for a deployment and attach, to each flow
    run, the graph of its task runs.

    ``limit`` and ``offset`` are forwarded unchanged to the prefect proxy
    for pagination (presumably limit=0 means "no limit" — confirm against
    the proxy's handler).
    """
    flow_runs = prefect_service.get_flow_runs_by_deployment_id_v1(
        deployment_id=deployment_id, limit=limit, offset=offset
    )

    # One extra proxy round-trip per flow run to fetch its task-run graph;
    # the frontend reads these under the "runs" key.
    for flow_run in flow_runs:
        flow_run["runs"] = prefect_service.get_flow_run_graphs(flow_run["id"])

    return flow_runs

Expand Down
22 changes: 20 additions & 2 deletions ddpui/celeryworkers/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@
from ddpui.utils.singletaskprogress import SingleTaskProgress
from ddpui.ddpairbyte import airbyte_service, airbytehelpers
from ddpui.ddpprefect.prefect_service import (
get_flow_run_graphs,
get_flow_run_logs,
update_dbt_core_block_schema,
get_dbt_cli_profile_block,
prefect_get,
Expand All @@ -56,6 +58,7 @@
TASK_DBTCLEAN,
TASK_DBTDEPS,
TASK_AIRBYTESYNC,
FLOW_RUN_LOGS_OFFSET_LIMIT,
)
from ddpui.ddpprefect import DBTCLIPROFILE
from ddpui.core import llm_service
Expand Down Expand Up @@ -750,8 +753,8 @@ def summarize_logs(
log_file_name = ""
try:
if type == LogsSummarizationType.DEPLOYMENT:
all_task_logs = get_flow_run_logs_v2(flow_run_id)
dbt_tasks = [task for task in all_task_logs if task["id"] == task_id]
all_task = get_flow_run_graphs(flow_run_id)
dbt_tasks = [task for task in all_task if task["id"] == task_id]
if len(dbt_tasks) == 0:
taskprogress.add(
{
Expand All @@ -762,6 +765,21 @@ def summarize_logs(
)
return
task = dbt_tasks[0]
task["logs"] = []
offset = 0
while True:
new_logs_set = get_flow_run_logs(
flow_run_id, task_id, FLOW_RUN_LOGS_OFFSET_LIMIT, offset
)
task["logs"] += new_logs_set["logs"]["logs"]
if len(new_logs_set["logs"]) == FLOW_RUN_LOGS_OFFSET_LIMIT:
offset += FLOW_RUN_LOGS_OFFSET_LIMIT
elif len(new_logs_set["logs"]) < FLOW_RUN_LOGS_OFFSET_LIMIT:
break
else:
logger.info("Something weird happening in fetching logs")
break

logs_text = "\n".join([log["message"] for log in task["logs"]])
log_file_name = f"{flow_run_id}_{task_id}"
elif type == LogsSummarizationType.AIRBYTE_SYNC:
Expand Down
13 changes: 11 additions & 2 deletions ddpui/ddpprefect/prefect_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -592,11 +592,13 @@ def get_deployment(deployment_id) -> dict:
return res


def get_flow_run_logs(
    flow_run_id: str, task_run_id: str, limit: int, offset: int
) -> dict:  # pragma: no cover
    """Retrieve one page of logs for a flow-run from the prefect proxy.

    ``task_run_id`` scopes the logs to a single task run (an empty string
    presumably returns logs for the whole flow run — confirm against the
    proxy); ``limit``/``offset`` paginate the result.

    Returns the proxy response wrapped as {"logs": <response>}.
    """
    res = prefect_get(
        f"flow_runs/logs/{flow_run_id}",
        params={"offset": offset, "limit": limit, "task_run_id": task_run_id},
    )
    return {"logs": res}

Expand All @@ -608,6 +610,13 @@ def get_flow_run_logs_v2(flow_run_id: str) -> dict: # pragma: no cover
)
return res

def get_flow_run_graphs(flow_run_id: str) -> dict:  # pragma: no cover
    """Retrieve the task-run graph of a flow-run from the prefect proxy.

    Callers (e.g. log summarization) treat the result as an iterable of
    task dicts each carrying an "id" key.
    """
    # pragma added for consistency with the other thin proxy wrappers in
    # this module, which are excluded from coverage.
    res = prefect_get(
        f"flow_runs/graph/{flow_run_id}",
    )
    return res


def get_flow_run(flow_run_id: str) -> dict:
"""retreive the logs from a flow-run from prefect"""
Expand Down
4 changes: 2 additions & 2 deletions ddpui/tests/services/test_prefect_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -590,9 +590,9 @@ def test_get_deployment(mock_get: Mock):
@patch("ddpui.ddpprefect.prefect_service.prefect_get")
def test_get_flow_run_logs(mock_get: Mock):
    """get_flow_run_logs forwards task_run_id/limit/offset to the proxy and
    wraps the raw response under a "logs" key."""
    mock_get.return_value = "the-logs"
    response = get_flow_run_logs("flowrunid", "taskrunid", 10, 3)
    assert response == {"logs": "the-logs"}
    mock_get.assert_called_once_with(
        "flow_runs/logs/flowrunid",
        params={"offset": 3, "limit": 10, "task_run_id": "taskrunid"},
    )


@patch("ddpui.ddpprefect.prefect_service.prefect_get")
Expand Down
3 changes: 3 additions & 0 deletions ddpui/utils/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,3 +44,6 @@
SYSTEM_USER_EMAIL = "System User"

# prefect flow run states

# Page size used when fetching flow-run logs in batches from the prefect
# proxy (see celeryworkers/tasks.py: summarize_logs pagination loop).
FLOW_RUN_LOGS_OFFSET_LIMIT = 200

0 comments on commit 8cddd37

Please sign in to comment.