Skip to content

Commit

Permalink
feat(cli): add --follow flag to logs command (#731)
Browse files Browse the repository at this point in the history
  • Loading branch information
jlemesh committed Oct 10, 2024
1 parent 80d88dc commit ab17590
Show file tree
Hide file tree
Showing 4 changed files with 300 additions and 80 deletions.
1 change: 1 addition & 0 deletions AUTHORS.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ The list of contributors in alphabetical order:
- [Giuseppe Steduto](https://orcid.org/0009-0002-1258-8553)
- [Harri Hirvonsalo](https://orcid.org/0000-0002-5503-510X)
- [Jan Okraska](https://orcid.org/0000-0002-1416-3244)
- [Jelizaveta Lemeševa](https://orcid.org/0009-0003-6606-9270)
- [Leticia Wanderley](https://orcid.org/0000-0003-4649-6630)
- [Marco Donadoni](https://orcid.org/0000-0003-2922-5505)
- [Marco Vidal](https://orcid.org/0000-0002-9363-4971)
Expand Down
101 changes: 101 additions & 0 deletions reana_client/cli/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import os
import shlex
import sys
import time
from typing import Callable, NoReturn, Optional, List, Tuple, Union

import click
Expand Down Expand Up @@ -409,3 +410,103 @@ def output_user_friendly_logs(workflow_logs, steps):
f"Step {job_name_or_id} emitted no logs.",
msg_type="info",
)


def retrieve_workflow_logs(
workflow,
access_token,
json_format,
filters,
steps,
chosen_filters,
available_filters,
page=None,
size=None,
): # noqa: D301
"""Retrieve workflow logs."""
from reana_client.api.client import get_workflow_logs

response = get_workflow_logs(
workflow,
access_token,
steps=None if not steps else list(set(steps)),
page=page,
size=size,
)
workflow_logs = json.loads(response["logs"])
if filters:
for key, value in chosen_filters.items():
unwanted_steps = [
k
for k, v in workflow_logs["job_logs"].items()
if v[available_filters[key]] != value
]
for job_id in unwanted_steps:
del workflow_logs["job_logs"][job_id]

if json_format:
display_message(json.dumps(workflow_logs, indent=2))
sys.exit(0)
else:
from reana_client.cli.utils import output_user_friendly_logs

output_user_friendly_logs(workflow_logs, None if not steps else list(set(steps)))


def follow_workflow_logs(
workflow,
access_token,
interval,
steps,
): # noqa: D301
"""Continuously poll for workflow or job logs."""
from reana_client.api.client import get_workflow_logs, get_workflow_status

if len(steps) > 1:
display_message(
"Only one step can be followed at a time, ignoring additional steps.",
"warning",
)
step = steps[0] if steps else None

msg = f"Following logs for workflow: {workflow}"
if step:
msg += f", step: {step}"
display_message(msg, "info")

previous_logs = ""

while True:
response = get_workflow_logs(
workflow,
access_token,
steps=None if not step else [step],
).get("logs")
json_response = json.loads(response)

if step:
jobs = json_response["job_logs"]

if not jobs:
raise Exception(f"Step data not found: {step}")

job = next(iter(jobs.values())) # get values of the first job
logs = job["logs"]
status = job["status"]
else:
logs = json_response["workflow_logs"]
status = get_workflow_status(workflow, access_token).get("status")

previous_lines = previous_logs.splitlines()
new_lines = logs.splitlines()

diff = "\n".join([x for x in new_lines if x not in previous_lines])
if diff != "" and diff != "\n":
display_message(diff)

if status in ["finished", "failed", "stopped", "deleted"]:
display_message("")
display_message(f"Finished, status: {status}", "info")
return
previous_logs = logs
time.sleep(interval)
170 changes: 90 additions & 80 deletions reana_client/cli/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@
key_value_to_dict,
parse_filter_parameters,
requires_environments,
retrieve_workflow_logs,
follow_workflow_logs,
)
from reana_client.config import ERROR_MESSAGES, RUN_STATUSES, TIMECHECK
from reana_client.printer import display_message
Expand Down Expand Up @@ -886,6 +888,21 @@ def add_verbose_data_from_response(response, verbose_headers, headers, data):
multiple=True,
help="Filter job logs to include only those steps that match certain filtering criteria. Use --filter name=value pairs. Available filters are compute_backend, docker_img, status and step.",
)
@click.option(
"--follow",
"follow",
is_flag=True,
default=False,
help="Follow the logs of the of running workflow or job (similar to `tail -f`). "
"If workflow or job finishes running, the command exits.",
)
@click.option(
"-i",
"--interval",
"interval",
default=10,
help="Sleep time in seconds between log polling if log following is enabled. [default=10]",
)
@add_pagination_options
@check_connection
@click.pass_context
Expand All @@ -894,22 +911,32 @@ def workflow_logs(
workflow,
access_token,
json_format,
steps=None,
follow,
interval,
filters=None,
page=None,
size=None,
): # noqa: D301
"""Get workflow logs.
The ``logs`` command allows to retrieve logs of running workflow. Note that
only finished steps of the workflow are returned, the logs of the currently
processed step is not returned until it is finished.
The ``logs`` command allows to retrieve logs of a running workflow.
Either retrive logs and print the result or follow the logs of a running workflow/job.
Examples:\n
\t $ reana-client logs -w myanalysis.42
\t $ reana-client logs -w myanalysis.42 -s 1st_step
\t $ reana-client logs -w myanalysis.42\n
\t $ reana-client logs -w myanalysis.42 --json\n
\t $ reana-client logs -w myanalysis.42 --filter status=running\n
\t $ reana-client logs -w myanalysis.42 --filter step=1st_step --follow\n
"""
from reana_client.api.client import get_workflow_logs
logging.debug("command: {}".format(ctx.command_path.replace(" ", ".")))
for p in ctx.params:
logging.debug("{param}: {value}".format(param=p, value=ctx.params[p]))

if json_format and follow:
display_message(
"Ignoring --json as it cannot be used together with --follow.",
msg_type="warning",
)

available_filters = {
"step": "job_name",
Expand All @@ -920,90 +947,73 @@ def workflow_logs(
steps = []
chosen_filters = dict()

logging.debug("command: {}".format(ctx.command_path.replace(" ", ".")))
for p in ctx.params:
logging.debug("{param}: {value}".format(param=p, value=ctx.params[p]))
if workflow:
if filters:
try:
for f in filters:
key, value = f.split("=")
if key not in available_filters:
if filters:
try:
for f in filters:
key, value = f.split("=")
if key not in available_filters:
display_message(
"Filter '{}' is not valid.\n"
"Available filters are '{}'.".format(
key,
"' '".join(sorted(available_filters.keys())),
),
msg_type="error",
)
sys.exit(1)
elif key == "step":
steps.append(value)
else:
# Case insensitive for compute backends
if (
key == "compute_backend"
and value.lower() in REANA_COMPUTE_BACKENDS
):
value = REANA_COMPUTE_BACKENDS[value.lower()]
elif key == "status" and value not in RUN_STATUSES:
display_message(
"Filter '{}' is not valid.\n"
"Available filters are '{}'.".format(
key,
"' '".join(sorted(available_filters.keys())),
),
"Input status value {} is not valid. ".format(value),
msg_type="error",
)
),
sys.exit(1)
elif key == "step":
steps.append(value)
else:
# Case insensitive for compute backends
if (
key == "compute_backend"
and value.lower() in REANA_COMPUTE_BACKENDS
):
value = REANA_COMPUTE_BACKENDS[value.lower()]
elif key == "status" and value not in RUN_STATUSES:
display_message(
"Input status value {} is not valid. ".format(value),
msg_type="error",
),
sys.exit(1)
chosen_filters[key] = value
except Exception as e:
logging.debug(traceback.format_exc())
logging.debug(str(e))
display_message(
"Please provide complete --filter name=value pairs, "
"for example --filter status=running.\n"
"Available filters are '{}'.".format(
"' '".join(sorted(available_filters.keys()))
),
msg_type="error",
)
sys.exit(1)
try:
response = get_workflow_logs(
workflow,
access_token,
steps=None if not steps else list(set(steps)),
page=page,
size=size,
)
workflow_logs = json.loads(response["logs"])
if filters:
for key, value in chosen_filters.items():
unwanted_steps = [
k
for k, v in workflow_logs["job_logs"].items()
if v[available_filters[key]] != value
]
for job_id in unwanted_steps:
del workflow_logs["job_logs"][job_id]

if json_format:
display_message(json.dumps(workflow_logs, indent=2))
sys.exit(0)
else:
from reana_client.cli.utils import output_user_friendly_logs

output_user_friendly_logs(
workflow_logs, None if not steps else list(set(steps))
)
chosen_filters[key] = value
except Exception as e:
logging.debug(traceback.format_exc())
logging.debug(str(e))
display_message(
"Cannot retrieve the logs of a workflow {}: \n"
"{}".format(workflow, str(e)),
"Please provide complete --filter name=value pairs, "
"for example --filter status=running.\n"
"Available filters are '{}'.".format(
"' '".join(sorted(available_filters.keys()))
),
msg_type="error",
)
sys.exit(1)

try:
if follow:
follow_workflow_logs(workflow, access_token, interval, steps)
else:
retrieve_workflow_logs(
workflow,
access_token,
json_format,
filters,
steps,
chosen_filters,
available_filters,
page,
size,
)
except Exception as e:
logging.debug(traceback.format_exc())
logging.debug(str(e))
display_message(
"Cannot retrieve logs for workflow {}: \n{}".format(workflow, str(e)),
msg_type="error",
)
sys.exit(1)


@workflow_execution_group.command("validate")
@click.option(
Expand Down
Loading

0 comments on commit ab17590

Please sign in to comment.