diff --git a/em_workflows/czi/flow.py b/em_workflows/czi/flow.py index 4c0da0df..082dde20 100644 --- a/em_workflows/czi/flow.py +++ b/em_workflows/czi/flow.py @@ -21,7 +21,7 @@ import SimpleITK as sitk from prefect import flow, task -from pytools.HedwigZarrImages import HedwigZarrImage, HedwigZarrImages +from pytools import HedwigZarrImage, HedwigZarrImages from em_workflows.file_path import FilePath from em_workflows.utils import utils @@ -78,8 +78,11 @@ def copy_zarr_to_assets_dir(file_path: FilePath) -> None: @task -def generate_imageset(file_path: FilePath) -> List[Dict]: +def generate_imageset(file_path: FilePath, + use_default_dask=False) -> List[Dict]: """ + :param: use_default_dask: If True, reuses the Prefect Dask Scheduler for the ZARR and Dask array operations. + | ImageSet consists of all the assets for a particular zarr sub-image and label images | Macro image is ignored | Label image is added as an thumbnail asset @@ -91,16 +94,17 @@ def generate_imageset(file_path: FilePath) -> List[Dict]: """ zarr_fp = f"{file_path.assets_dir}/{file_path.base}.zarr" image_set = list() - zarr_images = HedwigZarrImages(Path(zarr_fp)) + + if use_default_dask: + compute_args = {} + else: + # This task is used in a sub-flow where it's the only task running. + # use all the cores in a thread pool. + compute_args = {"scheduler": "threads"} + zarr_images = HedwigZarrImages(Path(zarr_fp), compute_args=compute_args) # for image_name, image in zarr_images.series(): for k_idx, image_name in enumerate(zarr_images.get_series_keys()): - # The relative path of the zarr group from the root zarr - # this assumes a valid zarr group with OME directory inside - ome_index_to_zarr_group = zarr_images.zarr_root["OME"].attrs["series"] - zarr_idx = ome_index_to_zarr_group[k_idx] - image = HedwigZarrImage( - zarr_images.zarr_root[zarr_idx], zarr_images.ome_info, k_idx - ) + image = zarr_images[k_idx] # single image element image_elt = dict() image_elt["imageMetadata"] = None @@ -119,7 +123,7 @@ def generate_imageset(file_path: FilePath) -> List[Dict]: else: ng_asset = file_path.gen_asset( - asset_type="neuroglancerZarr", asset_fp=Path(zarr_fp) / zarr_idx + asset_type="neuroglancerZarr", asset_fp=image.path ) # note - dims should be image.dims, but GUI does not want XYC # hardcoding in XY for now. @@ -156,7 +160,9 @@ async def generate_czi_imageset(file_path: FilePath) -> List[Dict]: copy_to_assets = copy_zarr_to_assets_dir.submit( file_path, wait_for=[rechunk_result] ) - return generate_imageset.submit(file_path, wait_for=[copy_to_assets]) + return generate_imageset.submit(file_path, + use_default_dask=True, + wait_for=[copy_to_assets]) @task