Skip to content

Commit

Permalink
Merge pull request #134 from niaid/optimize_vol_to_pyr
Browse files Browse the repository at this point in the history
Optimize vol to pyr
  • Loading branch information
philipmac authored Dec 19, 2022
2 parents e6b84fd + 8f40be3 commit 3d6ea08
Show file tree
Hide file tree
Showing 10 changed files with 83 additions and 65 deletions.
10 changes: 7 additions & 3 deletions em_workflows/brt/flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,7 @@ def gen_ave_8_vol(file_path: FilePath) -> dict:
"""
ave_8_mrc = f"{file_path.working_dir}/avebin8_{file_path.base}.mrc"
ave_mrc = f"{file_path.working_dir}/ave_{file_path.base}.mrc"
cmd = [Config.binvol, "-binning", "8", ave_mrc, ave_8_mrc]
cmd = [Config.binvol, "-binning", "2", ave_mrc, ave_8_mrc]
log_file = f"{file_path.working_dir}/ave_8_mrc.log"
FilePath.run(cmd=cmd, log_file=log_file)
asset_fp = file_path.copy_to_assets_dir(fp_to_cp=Path(ave_8_mrc))
Expand Down Expand Up @@ -364,9 +364,13 @@ def list_paired_files(fnames: List[Path]) -> List[Path]:

adoc_template = Parameter("adoc_template", default="plastic_brt")
input_dir = Parameter("input_dir")
callback_url = Parameter("callback_url")()
token = Parameter("token")()
callback_url = Parameter("callback_url", default=None)()
token = Parameter("token", default=None)()
file_name = Parameter("file_name", default=None)

# run workflow without an api.
no_api = Parameter("no_api", default=False)()

# a single input_dir will have n tomograms
input_dir_fp = utils.get_input_dir(input_dir=input_dir)
# input_dir_fp = utils.get_input_dir(input_dir=input_dir)
Expand Down
9 changes: 6 additions & 3 deletions em_workflows/dm_conversion/flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -218,8 +218,9 @@ def get_environment() -> str:
"""
input_dir = Parameter("input_dir")
file_name = Parameter("file_name", default=None)
callback_url = Parameter("callback_url")()
token = Parameter("token")()
callback_url = Parameter("callback_url", default=None)()
token = Parameter("token", default=None)()
no_api = Parameter("no_api", default=None)()
input_dir_fp = utils.get_input_dir(input_dir=input_dir)

input_fps = utils.list_files(
Expand Down Expand Up @@ -251,12 +252,14 @@ def get_environment() -> str:
callback_with_keyimgs = utils.add_asset.map(
prim_fp=callback_with_thumbs, asset=keyimg_assets
)
# finally filter error states, and convert to JSON and send.
filtered_callback = utils.filter_results(callback_with_keyimgs)
cp_wd_to_assets = utils.copy_workdirs.map(
fps, upstream_tasks=[callback_with_keyimgs]
)

callback_sent = utils.send_callback_body(
token=token,
callback_url=callback_url,
files_elts=callback_with_keyimgs,
files_elts=filtered_callback,
)
12 changes: 6 additions & 6 deletions em_workflows/sem_tomo/flow.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,9 @@
from em_workflows.file_path import FilePath
import subprocess
import glob
import math
from typing import List, Dict
from pathlib import Path
from prefect import Flow, task, Parameter, unmapped
from prefect.run_configs import LocalRun
from prefect.tasks.control_flow import merge
from em_workflows.config import Config
from em_workflows.shell_task_echo import ShellTaskEcho
from em_workflows.utils import utils
Expand Down Expand Up @@ -250,9 +247,10 @@ def gen_keyimg_small(fp_in: FilePath) -> Dict:
) as flow:
input_dir = Parameter("input_dir")
file_name = Parameter("file_name", default=None)
callback_url = Parameter("callback_url")()
token = Parameter("token")()
callback_url = Parameter("callback_url", default=None)()
token = Parameter("token", default=None)()
tilt_angle = Parameter("tilt_angle", default=None)()
no_api = Parameter("no_api", default=False)()

# dir to read from.
input_dir_fp = utils.get_input_dir(input_dir=input_dir)
Expand Down Expand Up @@ -311,9 +309,11 @@ def gen_keyimg_small(fp_in: FilePath) -> Dict:
prim_fp=callback_with_pyramids, asset=corrected_mrc_assets
)

# finally filter error states, and convert to JSON and send.
filtered_callback = utils.filter_results(callback_with_corr_mrcs)
cp_wd_to_assets = utils.copy_workdirs.map(
fps, upstream_tasks=[callback_with_corr_mrcs]
)
cb = utils.send_callback_body(
token=token, callback_url=callback_url, files_elts=callback_with_corr_mrcs
token=token, callback_url=callback_url, files_elts=filtered_callback
)
2 changes: 1 addition & 1 deletion em_workflows/templates/cryo_brt.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ runtime.Positioning.any.thickness = {{ rpa_thickness }}
#Aligned Stack Choices
#Aligned Stack Parameters
runtime.AlignedStack.any.linearInterpolation = 0
runtime.AlignedStack.any.binByFactor = 1
runtime.AlignedStack.any.binByFactor = 4
comparam.golderaser.ccderaser.ExpandCircleIterations = 3

#CTF Correction Parameters
Expand Down
2 changes: 1 addition & 1 deletion em_workflows/templates/plastic_brt.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ runtime.Positioning.any.thickness = {{ rpa_thickness }}
#Aligned Stack Choices
#Aligned Stack Parameters
runtime.AlignedStack.any.linearInterpolation = 0
runtime.AlignedStack.any.binByFactor = 1
runtime.AlignedStack.any.binByFactor = 4
comparam.golderaser.ccderaser.ExpandCircleIterations = 3

#CTF Correction Parameters
Expand Down
1 change: 1 addition & 0 deletions em_workflows/utils/neuroglancer.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ def gen_pyramids(fp_in: FilePath) -> Dict:
"volume-to-precomputed-pyramid",
"--downscaling-method=average",
"--flat",
"--no-gzip",
nifti.as_posix(),
outdir.as_posix(),
]
Expand Down
103 changes: 58 additions & 45 deletions em_workflows/utils/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -480,16 +480,19 @@ def notify_api_running(flow: Flow, old_state, new_state) -> State:
tells API the workflow has started to run.
"""
if new_state.is_running():
callback_url = prefect.context.parameters.get("callback_url")
token = prefect.context.parameters.get("token")
headers = {
"Authorization": "Bearer " + token,
"Content-Type": "application/json",
}
response = requests.post(
callback_url, headers=headers, data=json.dumps({"status": "running"})
)
log(response.text)
if prefect.context.parameters.get("no_api"):
log("no_api flag used, not interacting with API")
else:
callback_url = prefect.context.parameters.get("callback_url")
token = prefect.context.parameters.get("token")
headers = {
"Authorization": "Bearer " + token,
"Content-Type": "application/json",
}
response = requests.post(
callback_url, headers=headers, data=json.dumps({"status": "running"})
)
log(response.text)
return new_state


Expand All @@ -506,12 +509,6 @@ def custom_terminal_state_handler(
for task_state in reference_task_states:
if task_state.is_successful():
success = True
callback_url = prefect.context.parameters.get("callback_url")
token = prefect.context.parameters.get("token")
headers = {
"Authorization": "Bearer " + token,
"Content-Type": "application/json",
}
if success:
message = "success"
ns = Success(
Expand All @@ -523,10 +520,19 @@ def custom_terminal_state_handler(
else:
message = "error"
ns = state
response = requests.post(
callback_url, headers=headers, data=json.dumps({"status": message})
)
log(f"Pipeline status is:{message}, {response.text}")
if prefect.context.parameters.get("no_api"):
log(f"no_api flag used, terminal: success is {message}")
else:
callback_url = prefect.context.parameters.get("callback_url")
token = prefect.context.parameters.get("token")
headers = {
"Authorization": "Bearer " + token,
"Content-Type": "application/json",
}
response = requests.post(
callback_url, headers=headers, data=json.dumps({"status": message})
)
log(f"Pipeline status is:{message}, {response.text}")
return ns


Expand All @@ -541,22 +547,24 @@ def notify_api_completion(flow: Flow, old_state, new_state) -> State:
"""
if new_state.is_finished():
status = ""
if new_state.is_successful():
status = "success"
else:
status = "error"
callback_url = prefect.context.parameters.get("callback_url")
token = prefect.context.parameters.get("token")
headers = {
"Authorization": "Bearer " + token,
"Content-Type": "application/json",
}
response = requests.post(
callback_url, headers=headers, data=json.dumps({"status": status})
)
log(f"Pipeline status is:{status}")
log(response.text)
if prefect.context.parameters.get("no_api"):
log(f"no_api flag used, completion: {status}")
else:
callback_url = prefect.context.parameters.get("callback_url")
token = prefect.context.parameters.get("token")
headers = {
"Authorization": "Bearer " + token,
"Content-Type": "application/json",
}
response = requests.post(
callback_url, headers=headers, data=json.dumps({"status": status})
)
log(f"Pipeline status is:{status}")
log(response.text)
return new_state


Expand Down Expand Up @@ -717,9 +725,9 @@ def copy_to_assets_dir(fp: Path, assets_dir: Path, prim_fp: Path = None) -> Path

@task(max_retries=3, retry_delay=datetime.timedelta(minutes=1), trigger=any_successful)
def send_callback_body(
token: str,
callback_url: str,
files_elts: List[Dict],
token: str = None,
callback_url: str = None,
) -> None:
"""
Upon completion of file conversion a callback is made to the calling
Expand Down Expand Up @@ -748,14 +756,19 @@ def send_callback_body(
}
"""
data = {"files": files_elts}
headers = {"Authorization": "Bearer " + token, "Content-Type": "application/json"}
response = requests.post(callback_url, headers=headers, data=json.dumps(data))
log(response.url)
log(response.status_code)
log(json.dumps(data))
log(response.text)
log(response.headers)
if response.status_code != 204:
msg = f"Bad response code on callback: {response}"
log(msg=msg)
raise ValueError(msg)
if prefect.context.parameters.get("no_api"):
log("no_api flag used, not interacting with API")
elif callback_url and token:
headers = {"Authorization": "Bearer " + token, "Content-Type": "application/json"}
response = requests.post(callback_url, headers=headers, data=json.dumps(data))
log(response.url)
log(response.status_code)
log(json.dumps(data))
log(response.text)
log(response.headers)
if response.status_code != 204:
msg = f"Bad response code on callback: {response}"
log(msg=msg)
raise ValueError(msg)
else:
raise signals.FAIL(f"Invalid state - need callback_url and token, OR set no_api to True.")
3 changes: 1 addition & 2 deletions test/test_brt.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,6 @@ def test_brt(hpc_env):
LocalAlignments=0,
THICKNESS=30,
input_dir="test/input_files/brt_inputs/Projects/",
token="the_token",
callback_url="https://ptsv2.com/t/",
no_api=True
)
assert result.is_successful()
3 changes: 1 addition & 2 deletions test/test_dm.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,7 @@ def test_input_fname(mock_nfs_mount):
state = flow.run(
input_dir="/test/input_files/dm_inputs/Projects/Lab/PI",
file_name="20210525_1416_A000_G000.dm4",
token="the_token",
callback_url="https://ptsv2.com/t/",
no_api=True,
)
assert state.is_successful()

Expand Down
3 changes: 1 addition & 2 deletions test/test_sem.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ def test_sem(mock_nfs_mount):
result = flow.run(
input_dir="/test/input_files/sem_inputs/Projects/",
tilt_angle="30.2",
token="the_token",
callback_url="https://ptsv2.com/t/",
no_api=True
)
assert result.is_successful()

0 comments on commit 3d6ea08

Please sign in to comment.