Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bug fix in process termination check #318

Merged
merged 1 commit into from
Sep 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
144 changes: 71 additions & 73 deletions backend/core/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,111 +11,109 @@
from django.conf import settings
from django.utils import dateparse, timezone

logger = logging.getLogger("beat")
logger = logging.getLogger()


@shared_task()
def check_stopping():
logger.info("Checking processes stopping...")
def check_processes():
""" Checks the processing status in Orchestration and update the PZ Server
database with processing information.

procs_updated = []
monitoring_statuses = ["Stopping"]
procs_stopping = Process.objects.filter(status__in=monitoring_statuses)

for proc in procs_stopping:
logger.info(f"Consulting the {str(proc)} process status.")
proc_orches_id = proc.orchestration_process_id # type: ignore

if not proc_orches_id:
message = f"Process {str(proc.pk)} without Orchestration ID."
Returns:
bool: True, if an update was made. False, if no update was made.
"""

monitoring_statuses = ["Stopping", "Pending", "Running"]
logger.info(f"Monitoring the following statuses: {monitoring_statuses}")

processes = Process.objects.filter(status__in=monitoring_statuses)

if not processes:
return False

maestro = Maestro(settings.ORCHEST_URL)

for process in processes:
logger.info(f"Consulting the {process} process status.")
process_orch_id = process.orchestration_process_id # type: ignore

if not process_orch_id:
message = f"Process {str(process.pk)} without Orchestration ID."
logger.error(message)
proc.status = "Failed"
proc = update_dates(proc, {})
proc.save()
process = update_process_info(process, "Failed", {})
continue

maestro = Maestro(settings.ORCHEST_URL)
proc_orchest = maestro.status(proc_orches_id)
proc_orchest_status = proc_orchest.get("status") # type: ignore

logger.info(f"-> Process orchestration ID: {proc_orches_id}")
logger.info(f"-> Status: {proc_orchest_status}")

if not proc_orchest_status in monitoring_statuses:
proc.status = proc_orchest_status
proc.save()
logger.info(f"-> Process {str(proc)} updated.")
procs_updated.append(proc_orches_id)
process_orch = maestro.status(process_orch_id)
process_orch_status = process_orch.get("status") # type: ignore

return procs_updated
logger.debug(f"-> Process orchestration ID: {process_orch_id}")
logger.debug(f"-> Status: {process_orch_status}")

if process_orch_status == "Running" and process.status == "Pending":
started_at = process_orch.get("started_at", str(process.created_at))
process.started_at = dateparse.parse_datetime(started_at)
process.status = process_orch_status
process.save()

@shared_task()
def check_processes_finish():
logger.info("Checking running processes...")

procs_updated = []
active_statuses = ["Pending", "Running"]
procs_running = Process.objects.filter(status__in=active_statuses)

for proc in procs_running:
logger.info(f"Consulting the {str(proc)} process status.")
proc_orches_id = proc.orchestration_process_id # type: ignore
if process_orch_status == "Successful":
register_outputs(process.pk)

if process_orch_status != process.status:
process = update_process_info(
process=process,
process_orch_status=process_orch_status,
data=process_orch
)
logger.info(
f"{process} has been updated (new status: {process.status})"
)

if not proc_orches_id:
message = f"Process {str(proc.pk)} without Orchestration ID."
logger.error(message)
proc.status = "Failed"
proc = update_dates(proc, {})
proc.save()
continue

return True

maestro = Maestro(settings.ORCHEST_URL)
proc_orchest = maestro.status(proc_orches_id)
proc_orchest_status = proc_orchest.get("status") # type: ignore

logger.info(f"-> Process orchestration ID: {proc_orches_id}")
logger.info(f"-> Status: {proc_orchest_status}")

if proc_orchest_status == "Running" and proc.status == "Pending":
started_at = proc_orchest.get("started_at", str(proc.created_at))
proc.started_at = dateparse.parse_datetime(started_at)
proc.status = proc_orchest_status
proc.save()

if not proc_orchest_status in active_statuses:
proc.status = proc_orchest_status
proc = update_dates(proc, proc_orchest)
proc.save()
register_outputs(proc.pk)
logger.info(f"-> Process {str(proc)} updated.")
procs_updated.append(proc_orches_id)

return procs_updated
def update_process_info(process, process_orch_status, data):
""" Updates basic process information

Args:
process (Process): process object
process_orch_status (str): process orchestration status
data (dict): process info

def update_dates(process, data):
Returns:
Process: process object
"""
started_at = data.get("started_at", str(process.created_at))
ended_at = data.get("ended_at", str(timezone.now()))

if not ended_at:
ended_at = str(timezone.now())

process.started_at = dateparse.parse_datetime(started_at)
process.ended_at = dateparse.parse_datetime(ended_at)
process.status = process_orch_status
process.save()
return process


def register_outputs(process_id):
"""_summary_
""" Records the outputs in the database

Args:
process_id (_type_): _description_
process_id (int): process ID
"""

logger.info(f"[process {process_id}]: starting upload registration...")

file_roles = dict(FileRoles.choices)
file_roles = {str(v).lower(): k for k, v in file_roles.items()}

process = Process.objects.get(pk=process_id)
process_dir = pathlib.Path(settings.PROCESSING_DIR, process.path)
process_file = process_dir.joinpath("process.yml")

logger.debug(f"[process {process_id}]: info filepath {process_file}")

reg_product = RegistryProduct(process.upload.pk)

process_file_dict = load_yaml(process_file)
Expand All @@ -135,17 +133,17 @@ def register_outputs(process_id):
process.upload.status = 1 # Published status
process.upload.save()
process.save()
logger.info(f"[process {process_id}]: registration completed!")
except Exception as _:
process.upload.status = 9 # Failed status
process.upload.save()
process.save()
logger.exception("Failed to upload register!")
logger.exception(f"[process {process_id}]: Failed to upload register!")


def copy_upload(filepath, upload_dir):
    """ Copies a file into an upload directory under settings.MEDIA_ROOT

    Args:
        filepath (str | pathlib.Path): path of the file to be copied
        upload_dir (str | pathlib.Path): directory, joined onto
            settings.MEDIA_ROOT, that receives the copy

    Returns:
        str: path of the copied file (same file name as the source)
    """
    source = pathlib.Path(filepath)
    # Build the destination once and reuse its string form for logging,
    # copying and the return value.
    destination = str(
        pathlib.Path(settings.MEDIA_ROOT, upload_dir, source.name)
    )
    logger.debug("new_filepath -> %s", destination)
    shutil.copyfile(str(source), destination)
    return destination

Expand Down
8 changes: 2 additions & 6 deletions backend/pzserver/celery.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,10 @@

# https://docs.celeryq.dev/en/stable/userguide/periodic-tasks.html
app.conf.beat_schedule = {
"check-finish": {
"task": "core.tasks.check_processes_finish",
"check-processes": {
"task": "core.tasks.check_processes",
"schedule": 60.0,
},
"check-stopping": {
"task": "core.tasks.check_stopping",
"schedule": 120.0,
},
}
app.conf.timezone = "UTC"

Expand Down
8 changes: 0 additions & 8 deletions backend/pzserver/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -258,11 +258,6 @@
"backupCount": 5,
"formatter": "standard",
},
"beat": {
"level": LOGGING_LEVEL,
"class": "logging.handlers.RotatingFileHandler",
"filename": os.path.join(LOG_DIR, "celerybeat.log"),
},
"saml": {
"level": LOGGING_LEVEL,
"class": "logging.handlers.RotatingFileHandler",
Expand Down Expand Up @@ -296,9 +291,6 @@
"level": LOGGING_LEVEL,
"propagate": True,
},
"beat": {
"handlers": ["beat"],
},
"saml": {
"handlers": ["saml"],
"level": LOGGING_LEVEL,
Expand Down
Loading