Skip to content

Commit

Permalink
Add file_share param into workflow
Browse files Browse the repository at this point in the history
  • Loading branch information
annshress committed Sep 5, 2023
1 parent 6a41fbb commit 0d06b59
Show file tree
Hide file tree
Showing 16 changed files with 77 additions and 55 deletions.
6 changes: 3 additions & 3 deletions em_workflows/brt/flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -471,7 +471,6 @@ def cleanup_files(file_path: FilePath, pattern=str):
terminal_state_handler=utils.custom_terminal_state_handler,
run_config=LocalRun(labels=[utils.get_environment()]),
) as flow:

# This block of params map are for adoc file specfication.
# Note the ugly names, these parameters are lifted verbatim from
# https://bio3d.colorado.edu/imod/doc/directives.html where possible.
Expand All @@ -489,6 +488,7 @@ def cleanup_files(file_path: FilePath, pattern=str):

adoc_template = Parameter("adoc_template", default="plastic_brt")
input_dir = Parameter("input_dir")
file_share = Parameter("file_share")
callback_url = Parameter("callback_url", default=None)()
token = Parameter("token", default=None)()
file_name = Parameter("file_name", default=None)
Expand All @@ -499,13 +499,13 @@ def cleanup_files(file_path: FilePath, pattern=str):
keep_workdir = Parameter("keep_workdir", default=False)()

# a single input_dir will have n tomograms
input_dir_fp = utils.get_input_dir(input_dir=input_dir)
input_dir_fp = utils.get_input_dir(share_name=file_share, input_dir=input_dir)
# input_dir_fp = utils.get_input_dir(input_dir=input_dir)
input_fps = utils.list_files(
input_dir=input_dir_fp, exts=["MRC", "ST", "mrc", "st"], single_file=file_name
)

fps = utils.gen_fps(input_dir=input_dir_fp, fps_in=input_fps)
fps = utils.gen_fps(share_name=file_share, input_dir=input_dir_fp, fps_in=input_fps)
brts = utils.run_brt.map(
file_path=fps,
adoc_template=unmapped(adoc_template),
Expand Down
46 changes: 21 additions & 25 deletions em_workflows/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
import prefect
import shutil

from em_workflows.enums import FileShareEnum

# loads .env file into os.environ
load_dotenv()

Expand Down Expand Up @@ -62,43 +64,37 @@ class Config:
mrc2tif_loc = os.environ.get("MRC2TIF_LOC", "/opt/rml/imod/bin/mrc2tif")
newstack_loc = os.environ.get("NEWSTACK_LOC", "/opt/rml/imod/bin/newstack")

# environment where the app gets run - used for share selection
env_to_share = {
"dev": "RMLEMHedwigDev",
"qa": "RMLEMHedwigQA",
"prod": "RMLEMHedwigProd",
}

# All settings moved to respective constants file
# fibsem_input_exts = ["TIFF", "tiff", "TIF", "tif"]

SLURM_EXECUTOR = DaskExecutor(cluster_class=SLURM_exec)
user = os.environ["USER"]
tmp_dir = f"/gs1/Scratch/{user}_scratch/"
mount_point = "/mnt/ai-fas12/"

@staticmethod
def _share_name(env: str) -> str:
"""
gets the path of the location of input based on environment
"""
val = Config.env_to_share.get(env)
if not val:
raise ValueError(
f"Environment {env} not in valid environments: \
{Config.env_to_share.keys()}"
@classmethod
def mount_point(cls, share_name: str) -> str:
try:
share_enum = FileShareEnum[share_name]
except KeyError as e:
prefect.context.logger.info(
f"{share_name} is a bad Share name which is not under consideration yet."
)
return val
raise e
# TODO change the return values as necessary
mapper = {
FileShareEnum.RMLSOHedwigDev: f"/mnt/ai-fas12-so/{share_name}",
FileShareEnum.RMLSOHedwigQA: f"/mnt/ai-fas12-so/{share_name}",
FileShareEnum.RMLSOHedwigProd: f"/mnt/ai-fas12-so/{share_name}",
}
return mapper.get(share_enum, f"/mnt/ai-fas12/{share_name}")

@staticmethod
def proj_dir(env: str) -> str:
share = Config._share_name(env=env)
return f"{Config.mount_point}{share}/Projects/"
def proj_dir(share_name: str) -> str:
return f"{Config.mount_point(share_name)}/Projects/"

@staticmethod
def assets_dir(env: str) -> str:
share = Config._share_name(env=env)
return f"{Config.mount_point}{share}/Assets/"
def assets_dir(share_name: str) -> str:
return f"{Config.mount_point(share_name)}/Assets/"

repo_dir = Path(os.path.dirname(__file__))
template_dir = Path(f"{repo_dir.as_posix()}/templates")
1 change: 0 additions & 1 deletion em_workflows/czi/flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@ def bioformats_gen_zarr(file_path: FilePath):
executor=Config.SLURM_EXECUTOR,
run_config=LocalRun(labels=[utils.get_environment()]),
) as flow:

input_dir = Parameter("input_dir")
file_name = Parameter("file_name", default=None)
callback_url = Parameter("callback_url", default=None)()
Expand Down
5 changes: 3 additions & 2 deletions em_workflows/dm_conversion/flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -227,21 +227,22 @@ def scale_jpegs(file_path: FilePath, size: str) -> Optional[dict]:
-convert all mrcs in Projects dir to jpegs.
-convert all tiffs/pngs/jpegs to correct size for thumbs, "sm" and "lg"
"""
file_share = Parameter("file_share")
input_dir = Parameter("input_dir")
file_name = Parameter("file_name", default=None)
callback_url = Parameter("callback_url", default=None)()
token = Parameter("token", default=None)()
no_api = Parameter("no_api", default=None)()
# keep workdir if set true, useful to look at outputs
keep_workdir = Parameter("keep_workdir", default=False)()
input_dir_fp = utils.get_input_dir(input_dir=input_dir)
input_dir_fp = utils.get_input_dir(share_name=file_share, input_dir=input_dir)

input_fps = utils.list_files(
input_dir_fp,
VALID_2D_INPUT_EXTS,
single_file=file_name,
)
fps = utils.gen_fps(input_dir=input_dir_fp, fps_in=input_fps)
fps = utils.gen_fps(share_name=file_share, input_dir=input_dir_fp, fps_in=input_fps)
# logs = utils.init_log.map(file_path=fps)

tiffs_converted = convert_if_int16_tiff.map(file_path=fps)
Expand Down
17 changes: 17 additions & 0 deletions em_workflows/enums.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
from enum import Enum


class FileShareEnum(Enum):
"""
Scientific data can reside in any mounted points depending on the lab or the project.
This enum is used to direct where the data is mounted.
Source: https://github.com/niaid/hedwig-comm-specs/issues/4#issuecomment-1701512859
"""

RMLEMHedwigDev = 1
RMLEMHedwigQA = 2
RMLEMHedwigProd = 3
# SO refers to spatial omics
RMLSOHedwigDev = 4
RMLSOHedwigQA = 5
RMLSOHedwigProd = 6
7 changes: 3 additions & 4 deletions em_workflows/file_path.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ class FilePath:
:todo: Consider making entire class immutable
"""

def __init__(self, input_dir: Path, fp_in: Path) -> None:
def __init__(self, share_name: str, input_dir: Path, fp_in: Path) -> None:
"""
sets up:
Expand All @@ -51,10 +51,9 @@ def __init__(self, input_dir: Path, fp_in: Path) -> None:
self._working_dir = self.make_work_dir()
self._assets_dir = self.make_assets_dir()
self.environment = self.get_environment()
self.proj_root = Path(Config.proj_dir(env=self.environment))
self.asset_root = Path(Config.assets_dir(env=self.environment))
self.proj_root = Path(Config.proj_dir(share_name=share_name))
self.asset_root = Path(Config.assets_dir(share_name=share_name))
self.prim_fp_elt = self.gen_prim_fp_elt()
# log(self.__repr__())

def __str__(self) -> str:
return f"FilePath: proj_root:{self.proj_root}\n\
Expand Down
5 changes: 3 additions & 2 deletions em_workflows/lrg_2d_rgb/flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -127,20 +127,21 @@ def gen_thumb(file_path: FilePath):
-create tmp dir for each.
-convert to tiff -> zarr -> jpegs (thumb)
"""
file_share = Parameter("file_share")
input_dir = Parameter("input_dir")
file_name = Parameter("file_name", default=None)
callback_url = Parameter("callback_url", default=None)()
token = Parameter("token", default=None)()
no_api = Parameter("no_api", default=None)()
keep_workdir = Parameter("keep_workdir", default=False)()
input_dir_fp = utils.get_input_dir(input_dir=input_dir)
input_dir_fp = utils.get_input_dir(share_name=file_share, input_dir=input_dir)

input_fps = utils.list_files(
input_dir_fp,
VALID_LRG_2D_RGB_INPUTS,
single_file=file_name,
)
fps = utils.gen_fps(input_dir=input_dir_fp, fps_in=input_fps)
fps = utils.gen_fps(share_name=file_share, input_dir=input_dir_fp, fps_in=input_fps)
tiffs = convert_png_to_tiff.map(file_path=fps)
zarr_assets = bioformats_gen_zarr.map(file_path=fps, upstream_tasks=[tiffs])
thumb_assets = gen_thumb.map(file_path=fps, upstream_tasks=[zarr_assets])
Expand Down
7 changes: 5 additions & 2 deletions em_workflows/sem_tomo/flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,7 @@ def gen_keyimg_small(fp_in: FilePath) -> Dict:
- Firstly create nifti file using the base mrc, then convert this to ng format.
- To conclude, send callback stating the location of the various outputs.
"""
file_share = Parameter("file_share")
input_dir = Parameter("input_dir")
file_name = Parameter("file_name", default=None)
callback_url = Parameter("callback_url", default=None)()
Expand All @@ -237,13 +238,15 @@ def gen_keyimg_small(fp_in: FilePath) -> Dict:
keep_workdir = Parameter("keep_workdir", default=False)()

# dir to read from.
input_dir_fp = utils.get_input_dir(input_dir=input_dir)
input_dir_fp = utils.get_input_dir(share_name=file_share, input_dir=input_dir)
# note FIBSEM is different to other flows in that it uses *directories*
# to define stacks. Therefore, will have to list dirs to discover stacks
# (rather than eg mrc files)
input_dir_fps = utils.list_dirs(input_dir_fp=input_dir_fp)

fps = utils.gen_fps(input_dir=input_dir_fp, fps_in=input_dir_fps)
fps = utils.gen_fps(
share_name=file_share, input_dir=input_dir_fp, fps_in=input_dir_fps
)
tif_to_mrc = convert_tif_to_mrc.map(fps)

# using source.mrc gen align.xf
Expand Down
1 change: 0 additions & 1 deletion em_workflows/utils/neuroglancer.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@

@task
def gen_zarr(fp_in: FilePath, width: int, height: int, depth: int = None) -> Dict:

"""
.. code-block::
Expand Down
8 changes: 4 additions & 4 deletions em_workflows/utils/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -774,7 +774,7 @@ def get_environment() -> str:


@task
def get_input_dir(input_dir: str) -> Path:
def get_input_dir(share_name: str, input_dir: str) -> Path:
"""
:param input_dir:
:return:
Expand All @@ -789,20 +789,20 @@ def get_input_dir(input_dir: str) -> Path:
input_dir = input_dir + "/"
if not input_dir.startswith("/"):
input_dir = "/" + input_dir
input_path_str = Config.proj_dir(env=get_environment()) + input_dir
input_path_str = Config.proj_dir(share_name=share_name) + input_dir
return Path(input_path_str)


@task
def gen_fps(input_dir: Path, fps_in: List[Path]) -> List[FilePath]:
def gen_fps(share_name: str, input_dir: Path, fps_in: List[Path]) -> List[FilePath]:
"""
Given in input directory (Path) and a list of input files (Path), return
a list of FilePaths for the input files. This includes a temporary working
directory for each file to keep the files separate on the HPC.
"""
fps = list()
for fp in fps_in:
file_path = FilePath(input_dir=input_dir, fp_in=fp)
file_path = FilePath(share_name=share_name, input_dir=input_dir, fp_in=fp)
msg = f"created working_dir {file_path.working_dir} for {fp.as_posix()}"
log(msg)
fps.append(file_path)
Expand Down
4 changes: 2 additions & 2 deletions test/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,10 @@ def mock_nfs_mount(monkeypatch):
from em_workflows.dm_conversion.config import DMConfig
from em_workflows.sem_tomo.config import SEMConfig

def _mock_proj_dir(env: str) -> str:
def _mock_proj_dir(share_name: str) -> str:
return os.getcwd()

def _mock_assets_dir(env: str) -> str:
def _mock_assets_dir(share_name: str) -> str:
return os.getcwd()

monkeypatch.setattr(Config, "proj_dir", _mock_proj_dir)
Expand Down
1 change: 1 addition & 0 deletions test/test_brt.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ def test_brt(mock_nfs_mount):
TargetNumberOfBeads=20,
LocalAlignments=0,
THICKNESS=30,
file_share="test",
input_dir="test/input_files/brt_inputs/Projects/",
no_api=True,
keep_workdir=True,
Expand Down
5 changes: 5 additions & 0 deletions test/test_dm.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ def test_dm4_conv(mock_nfs_mount):
from em_workflows.dm_conversion.flow import flow

state = flow.run(
file_share="test",
input_dir="/test/input_files/dm_inputs/Projects/Lab/PI",
no_api=True,
)
Expand All @@ -21,6 +22,7 @@ def test_dm4_conv_clean_workdir(mock_nfs_mount):
from em_workflows.dm_conversion.flow import flow

state = flow.run(
file_share="test",
input_dir="/test/input_files/dm_inputs/Projects/Lab/PI",
no_api=True,
# keep_workdir=True
Expand All @@ -32,6 +34,7 @@ def test_input_fname(mock_nfs_mount):
from em_workflows.dm_conversion.flow import flow

state = flow.run(
file_share="test",
input_dir="/test/input_files/dm_inputs/Projects/Lab/PI",
file_name="20210525_1416_A000_G000.dm4",
no_api=True,
Expand All @@ -44,6 +47,7 @@ def test_single_file_no_ext_not_found_gens_exception(mock_nfs_mount):
from em_workflows.dm_conversion.flow import flow

state = flow.run(
file_share="test",
input_dir="/test/input_files/dm_inputs/Projects/Lab/PI",
file_name="file_with_no_ext",
no_api=True,
Expand All @@ -56,6 +60,7 @@ def test_single_file_not_found_gens_exception(mock_nfs_mount):
from em_workflows.dm_conversion.flow import flow

state = flow.run(
file_share="test",
input_dir="/test/input_files/dm_inputs/Projects/Lab/PI",
file_name="does_not_exist.test",
no_api=True,
Expand Down
1 change: 1 addition & 0 deletions test/test_lrg_2d.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ def test_input_fname(mock_nfs_mount, caplog):
# monkeypatch.setattr(Config, "convert_loc", command_loc("convert"))

state = flow.run(
file_share="test",
input_dir="/test/input_files/lrg_ROI_pngs/Projects",
no_api=True,
)
Expand Down
1 change: 1 addition & 0 deletions test/test_sem.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ def test_sem(mock_nfs_mount):

result = flow.run(
# FIXME `sem_inputs` directory is missing
file_share="test",
input_dir="/test/input_files/sem_inputs/Projects/",
tilt_angle="30.2",
no_api=True,
Expand Down
Loading

0 comments on commit 0d06b59

Please sign in to comment.