Finalise rerun and simple messages
Formatting and linting commit

Use main branch for db

Add simple examples for generic runs

Add utils tests

Formatting and linting commit

Fix some issues with typing

Formatting and linting commit

Import correct state for job watcher tests

Adjust start and end inputs

Further correct issues
Pasarus committed Aug 8, 2024
1 parent 59aa9a8 commit dd5b462
Showing 18 changed files with 391 additions and 336 deletions.
8 changes: 8 additions & 0 deletions container/example_simple_runner.Dockerfile
@@ -0,0 +1,8 @@
FROM python:3.12.3-slim

RUN pip install pandas uniplot

# The secret sauce here is to have this at the end of a runner's dockerfile
RUN echo '#!/bin/bash\npython -c "$@"' > /entrypoint.sh
RUN chmod +x /entrypoint.sh
ENTRYPOINT ["/entrypoint.sh"]
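
A hedged sketch, not part of this commit, of how a runner built from this Dockerfile might be exercised: the single container argument becomes "$@", which the entrypoint hands to python -c. The image tag is an assumption.

import subprocess

# Run an inline script inside a hypothetical build of the image above;
# the script string is passed straight through to `python -c` by the entrypoint.
script = "import pandas as pd; print(pd.__version__)"
subprocess.run(
    ["docker", "run", "--rm", "example-simple-runner:latest", script],
    check=True,
)
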
2 changes: 1 addition & 1 deletion container/job_creator.Dockerfile
@@ -1,4 +1,4 @@
FROM python:3.12.3-slim
FROM python:3.12.3

WORKDIR /jobcreator

2 changes: 1 addition & 1 deletion container/job_watcher.Dockerfile
@@ -1,4 +1,4 @@
FROM python:3.12.3-slim
FROM python:3.12.3

WORKDIR /jobwatcher

118 changes: 118 additions & 0 deletions job_creator/README.md

Large diffs are not rendered by default.

6 changes: 3 additions & 3 deletions job_creator/jobcreator/job_creator.py
@@ -216,7 +216,7 @@ def spawn_job( # noqa: PLR0913
cluster_id: str,
fs_name: str,
ceph_mount_path: str,
reduction_id: int,
job_id: int,
max_time_to_complete_job: int,
db_ip: str,
db_username: str,
@@ -235,7 +235,7 @@ def spawn_job( # noqa: PLR0913
:param cluster_id: The cluster id for the ceph cluster to connect to
:param fs_name: The file system name for the ceph cluster
:param ceph_mount_path: the path on the ceph cluster to mount
:param reduction_id: The id used in the DB for the reduction
:param job_id: The id used in the DB for the reduction
:param max_time_to_complete_job: The maximum time to allow for completion of a job in seconds
:param db_ip: The database ip to connect to
:param db_username: the database username to use to connect
@@ -376,7 +376,7 @@ def spawn_job( # noqa: PLR0913
job_metadata = client.V1ObjectMeta(
name=job_name,
annotations={
"reduction-id": str(reduction_id),
"job-id": str(job_id),
"pvs": str(pv_names),
"pvcs": str(pvc_names),
"kubectl.kubernetes.io/default-container": main_container.name,
167 changes: 139 additions & 28 deletions job_creator/jobcreator/main.py
@@ -9,13 +9,18 @@
from pathlib import Path
from typing import Any

from jobcreator.database.db_updater import DBUpdater, Run
from db.data_models import Job, JobType, Run, State
from db.utils.db_updater import DBUpdater

from jobcreator.job_creator import JobCreator
from jobcreator.queue_consumer import QueueConsumer
from jobcreator.script_aquisition import acquire_script
from jobcreator.utils import create_ceph_mount_path, find_sha256_of_image, logger

from db.data_models import Job, State
from jobcreator.utils import (
create_ceph_mount_path_autoreduction,
create_ceph_mount_path_simple,
find_sha256_of_image,
logger,
)

# Set up the jobcreator environment
DB_IP = os.environ.get("DB_IP", "")
@@ -59,25 +64,117 @@
MAX_TIME_TO_COMPLETE = int(os.environ.get("MAX_TIME_TO_COMPLETE", 60 * 60 * 6))


def process_simple_message(message: dict[str, Any]):
runner_image = find_sha256_of_image(message["runner_image"])
script = message["script"]
owner = DB_UPDATER.find_owner_db_entry_or_create(experiment_number=message.get("experiment_number"),
user_number=message.get("user_number"))
script = DB_UPDATER.update_script()
reduction = Job(
start=None,
end=None,
state=State.NOT_STARTED,
inputs={},
script=script,
outputs=None,
runner_image=runner_image,
owner_relationship=owner,
)
def process_simple_message(message: dict[str, Any]) -> None:
"""
A simple message expects the following entries in the dictionary: (experiment_number or user_number, runner_image,
and script).
:param message: The message to be processed; it is assumed to be the simple variety of message
    that the creator can process.
:return: None
"""
try:
runner_image = find_sha256_of_image(message["runner_image"])
script = message["script"]
owner = DB_UPDATER.find_owner_db_entry_or_create(
experiment_number=message.get("experiment_number"), user_number=message.get("user_number")
)
if message.get("user_number"):
# Add UUID which will avoid collisions
job_name = f"run-owner{str(owner.user_number).lower()}-requested-{uuid.uuid4().hex!s}"
else:
# Add UUID which will avoid collisions
job_name = f"run-owner{str(owner.experiment_number).lower()}-requested-{uuid.uuid4().hex!s}"
# Job name can be no longer than 50 characters, because more will be appended to the end of the name
# (such as -extras-pvc) when defining the PVs and PVCs
if len(job_name) > 50: # noqa: PLR2004
job_name = job_name[:50]
job = Job(
start=None,
end=None,
state=State.NOT_STARTED,
inputs={},
outputs=None,
runner_image=runner_image,
owner=owner,
job_type=JobType.SIMPLE,
)
DB_UPDATER.add_simple_job(job=job)
DB_UPDATER.update_script(job=job, job_script=script, script_sha="")
ceph_mount_path_kwargs = (
{"user_number": str(owner.user_number)}
if owner.user_number is not None
else {"experiment_number": str(owner.experiment_number)}
)
ceph_mount_path = create_ceph_mount_path_simple(**ceph_mount_path_kwargs)
JOB_CREATOR.spawn_job(
job_name=job_name,
script=script,
job_namespace=JOB_NAMESPACE,
ceph_creds_k8s_secret_name=CEPH_CREDS_SECRET_NAME,
ceph_creds_k8s_namespace=CEPH_CREDS_SECRET_NAMESPACE,
cluster_id=CLUSTER_ID,
fs_name=FS_NAME,
ceph_mount_path=str(ceph_mount_path),
job_id=job.id,
db_ip=DB_IP,
db_username=DB_USERNAME,
db_password=DB_PASSWORD,
max_time_to_complete_job=MAX_TIME_TO_COMPLETE,
runner_image=runner_image,
manila_share_id=MANILA_SHARE_ID,
manila_share_access_id=MANILA_SHARE_ACCESS_ID,
)
except Exception as exception: # pylint: disable=broad-exception-caught
logger.exception(exception)
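
# A hedged illustration, not part of this commit: the "simple" message shape this
# handler expects. Field names come from the code above; the image, script, and
# experiment number are invented for the example.
example_simple_message = {
    "runner_image": "ghcr.io/fiaisis/simple-runner:latest",  # hypothetical image
    "script": "print('hello from a generic run')",
    "experiment_number": 12345,  # exactly one of experiment_number / user_number
}
# process_simple_message(example_simple_message)  # would create the Job and spawn the pod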


def process_rerun_message(message: dict[str, Any]) -> None:
"""
Rerun a reduction based on the contents of the message
:param message: dict, the message is a dictionary containing the needed information for spawning a pod
:return: None
"""
try:
runner_image = find_sha256_of_image(message["runner_image"])
script = message["script"]
# Add UUID which will avoid collisions for reruns
owner = DB_UPDATER.find_owner_db_entry_or_create(
experiment_number=message.get("experiment_number"), user_number=message.get("user_number")
)
run, new_job = DB_UPDATER.add_rerun_job(
original_job_id=int(message.get("job_id")), # type: ignore
new_script=script,
new_owner_id=owner.id,
new_runner_image=runner_image,
)
ceph_mount_path = create_ceph_mount_path_autoreduction(
instrument_name=run.instrument.instrument_name,
rb_number=str(run.owner.experiment_number) if run.owner is not None else "0",
)
job_name = f"run-{run.filename.lower()}-{uuid.uuid4().hex!s}"
JOB_CREATOR.spawn_job(
job_name=job_name,
script=script,
job_namespace=JOB_NAMESPACE,
ceph_creds_k8s_secret_name=CEPH_CREDS_SECRET_NAME,
ceph_creds_k8s_namespace=CEPH_CREDS_SECRET_NAMESPACE,
cluster_id=CLUSTER_ID,
fs_name=FS_NAME,
ceph_mount_path=str(ceph_mount_path),
job_id=new_job.id,
db_ip=DB_IP,
db_username=DB_USERNAME,
db_password=DB_PASSWORD,
max_time_to_complete_job=MAX_TIME_TO_COMPLETE,
runner_image=runner_image,
manila_share_id=MANILA_SHARE_ID,
manila_share_access_id=MANILA_SHARE_ACCESS_ID,
)
except Exception as exception: # pylint: disable=broad-exception-caught
logger.exception(exception)
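
# A hedged illustration, not part of this commit: a rerun message additionally
# carries the id of the original job to rerun. All values are invented.
example_rerun_message = {
    "job_id": 101,  # id of the original job to rerun (hypothetical)
    "runner_image": "ghcr.io/fiaisis/mantid-runner:latest",  # hypothetical image
    "script": "print('rerun with an updated script')",
    "user_number": 67890,  # or "experiment_number" for the new owner
}
# process_rerun_message(example_rerun_message)  # would add a rerun Job and spawn the pod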


def process_autoreduction_message(message: dict[str, Any]): # pylint: disable=too-many-locals
def process_autoreduction_message(message: dict[str, Any]) -> None: # pylint: disable=too-many-locals
"""
Request that the k8s api spawns a job
:param message: dict, the message is a dictionary containing the needed information for spawning a pod
@@ -99,28 +196,29 @@ def process_autoreduction_message(message: dict[str, Any]): # pylint: disable
runner_image = find_sha256_of_image(runner_image)
# Add UUID which will avoid collisions for reruns
job_name = f"run-{filename.lower()}-{uuid.uuid4().hex!s}"
reduction = DB_UPDATER.add_detected_run(
job = DB_UPDATER.add_detected_run(
instrument_name,
Run(
filename=filename,
title=title,
users=users,
experiment_number=experiment_number,
run_start=run_start,
run_end=run_end,
good_frames=good_frames,
raw_frames=raw_frames,
),
additional_values,
runner_image,
experiment_number,
)
job_id = job.id # Needed due to ORM weirdness with session availability.
script, script_sha = acquire_script(
fia_api_host=FIA_API_HOST,
reduction_id=reduction.id,
job_id=job_id,
instrument=instrument_name,
)
DB_UPDATER.update_script(reduction, script, script_sha)
ceph_mount_path = create_ceph_mount_path(instrument_name, rb_number)
DB_UPDATER.update_script(job, script, script_sha)
ceph_mount_path = create_ceph_mount_path_autoreduction(instrument_name, rb_number)
JOB_CREATOR.spawn_job(
job_name=job_name,
script=script,
@@ -130,7 +228,7 @@ def process_autoreduction_message(message: dict[str, Any]): # pylint: disable
cluster_id=CLUSTER_ID,
fs_name=FS_NAME,
ceph_mount_path=str(ceph_mount_path),
reduction_id=reduction.id,
job_id=job_id,
db_ip=DB_IP,
db_username=DB_USERNAME,
db_password=DB_PASSWORD,
@@ -151,9 +249,22 @@ def process_message(message: dict[str, Any]) -> None:
:param message: the message is a dictionary containing the needed information for spawning a pod
:return: None
"""
if message.get("script") and message.get("runner_image") and (message.get("user_number") or message.get("experiment_number")):
if (
not message.get("job_id")
and message.get("script")
and message.get("runner_image")
and (message.get("user_number") or message.get("experiment_number"))
):
logger.info("Processing simple message...")
process_simple_message(message)
elif (
message.get("job_id")
and message.get("runner_image")
and message.get("script")
and (message.get("user_number") or message.get("experiment_number"))
):
logger.info("Processing rerun message...")
process_rerun_message(message)
else:
logger.info("Processing autoreduction message...")
process_autoreduction_message(message)
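
A hedged, standalone restatement of the routing rule above, not part of this commit: the presence of job_id is what separates a rerun from a simple run, and anything without the simple/rerun fields falls through to autoreduction.

def classify(message: dict) -> str:
    # Mirror the key-presence checks used by process_message
    has_owner = bool(message.get("user_number") or message.get("experiment_number"))
    has_script_and_runner = bool(message.get("script") and message.get("runner_image"))
    if not message.get("job_id") and has_script_and_runner and has_owner:
        return "simple"
    if message.get("job_id") and has_script_and_runner and has_owner:
        return "rerun"
    return "autoreduction"

assert classify({"script": "s", "runner_image": "i", "user_number": 1}) == "simple"
assert classify({"job_id": 7, "script": "s", "runner_image": "i", "experiment_number": 2}) == "rerun"
assert classify({"filename": "MAR12345.nxs"}) == "autoreduction"
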
8 changes: 4 additions & 4 deletions job_creator/jobcreator/script_aquisition.py
@@ -1,19 +1,19 @@
"""
Contains the functions for acquiring a script for the reduction workflow
Contains the functions for acquiring a script for the job workflow
"""

from http import HTTPStatus

import requests


def acquire_script(fia_api_host: str, reduction_id: int, instrument: str) -> tuple[str, str]:
def acquire_script(fia_api_host: str, job_id: int, instrument: str) -> tuple[str, str]:
"""
Given the FIA-API host, reduction_id, and instrument, return the script object required for reduction
Given the FIA-API host, job_id, and instrument, return the script object required for reduction
:return: Script, the script for the reduction
"""
response = requests.get(
f"http://{fia_api_host}/instrument/{instrument}/script?reduction_id={reduction_id}",
f"http://{fia_api_host}/instrument/{instrument}/script?job_id={job_id}",
timeout=30,
)
if response.status_code != HTTPStatus.OK:
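
A hedged usage sketch, not part of this commit: acquire_script now keys the request on job_id rather than reduction_id. The host, job id, and instrument below are invented for illustration.

from jobcreator.script_aquisition import acquire_script

# Fetch the script text and its sha for a job from a hypothetical FIA-API host
script_text, script_sha = acquire_script(
    fia_api_host="localhost:8000",  # hypothetical host
    job_id=42,                      # hypothetical job id
    instrument="MARI",
)
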
43 changes: 38 additions & 5 deletions job_creator/jobcreator/utils.py
@@ -21,7 +21,7 @@
logger = logging.getLogger("jobcreator")


def create_ceph_path(instrument_name: str, rb_number: str) -> Path:
def create_ceph_path_autoreduction(instrument_name: str, rb_number: str) -> Path:
"""
Create the path that the files should store outputs in on CEPH
:param instrument_name: The name of the instrument that the file is from
@@ -59,7 +59,38 @@ def load_kubernetes_config() -> None:
config.load_kube_config()


def ensure_ceph_path_exists(ceph_path: Path) -> Path:
def create_ceph_mount_path_simple(
user_number: str | None = None,
experiment_number: str | None = None,
mount_path: str = "/isis/instrument",
local_ceph_path: str = "/ceph",
) -> Path:
"""
Creates the ceph mount for the job to output to
:param user_number: str, The user number that owns the job
:param experiment_number: str, The experiment number that owns the job
:param mount_path: str, the path that should be pointed to by default, before RBNumber, and Instrument specific
directories.
:param local_ceph_path: str, the path that we expect Ceph to be present locally, by default it's /ceph, mostly for
testing.
:return: str, the path that was created for the mount
"""
initial_path = Path(local_ceph_path) / "GENERIC" / "autoreduce"
if user_number is not None and experiment_number is None:
ceph_path = initial_path / "UserNumbers" / user_number
elif experiment_number is not None and user_number is None:
ceph_path = initial_path / "ExperimentNumbers" / experiment_number
else:
raise ValueError("Both user_number and experiment_number cannot be defined, but one must be.")
if not ceph_path.exists():
logger.info("Attempting to create ceph path: %s", str(ceph_path))
ceph_path.mkdir(parents=True, exist_ok=True)
# There is an assumption that the ceph_path will have /ceph at the start that needs to be removed
ceph_path = ceph_path.relative_to(local_ceph_path)
return Path(mount_path) / ceph_path
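
# A hedged worked example, not part of this commit, of the mapping this helper
# performs (assumes a writable /ceph locally; the numbers are invented):
example_mount = create_ceph_mount_path_simple(user_number="1234")
assert str(example_mount) == "/isis/instrument/GENERIC/autoreduce/UserNumbers/1234"
# create_ceph_mount_path_simple(experiment_number="5678") maps to
# /isis/instrument/GENERIC/autoreduce/ExperimentNumbers/5678; passing both
# owner kwargs, or neither, raises ValueError.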


def ensure_ceph_path_exists_autoreduction(ceph_path: Path) -> Path:
"""
Takes a path that is intended to be on ceph and ensures that it will be correct for what we should mount and
apply output to.
@@ -81,7 +112,9 @@ def ensure_ceph_path_exists(ceph_path: Path) -> Path:
return ceph_path


def create_ceph_mount_path(instrument_name: str, rb_number: str, mount_path: str = "/isis/instrument") -> Path:
def create_ceph_mount_path_autoreduction(
instrument_name: str, rb_number: str, mount_path: str = "/isis/instrument"
) -> Path:
"""
Creates the ceph mount for the job to output to
:param instrument_name: str, name of the instrument
@@ -90,8 +123,8 @@ def create_ceph_mount_path(instrument_name: str, rb_number: str, mount_path: str
directories.
:return: str, the path that was created for the mount
"""
ceph_path = create_ceph_path(instrument_name, rb_number)
ceph_path = ensure_ceph_path_exists(ceph_path)
ceph_path = create_ceph_path_autoreduction(instrument_name, rb_number)
ceph_path = ensure_ceph_path_exists_autoreduction(ceph_path)

[Codecov / codecov/patch: added lines job_creator/jobcreator/utils.py#L126-L127 were not covered by tests]
# There is an assumption that the ceph_path will have /ceph at the start that needs to be removed
ceph_path = ceph_path.relative_to("/ceph")
return Path(mount_path) / ceph_path
5 changes: 1 addition & 4 deletions job_creator/pyproject.toml
@@ -5,8 +5,6 @@ version = "0.0.1"
requires-python = ">= 3.11"
dependencies = [
"kubernetes==29.0.0",
"psycopg2-binary==2.9.9",
"SQLAlchemy==2.0.30",
"pika==1.3.2",
"db@git+https://github.com/fiaisis/db",
]
@@ -23,7 +21,6 @@ formatting = [
"ruff==0.4.8",
"mypy==1.10.0",
"pytest==8.2.2",
"sqlalchemy-stubs==0.4",
"types-requests==2.32.0.20240602"
]

@@ -34,7 +31,7 @@ test = [
]

[tool.setuptools]
packages = ["jobcreator", "jobcreator.database"]
packages = ["jobcreator"]

[tool.ruff]
line-length = 120