diff --git a/AFQ/api/bundle_dict.py b/AFQ/api/bundle_dict.py index dcd2185ca..3195c9b35 100644 --- a/AFQ/api/bundle_dict.py +++ b/AFQ/api/bundle_dict.py @@ -775,7 +775,6 @@ def __init__(self, self.resample_to = resample_to self.resample_subject_to = resample_subject_to self.keep_in_memory = keep_in_memory - self.has_bids_info = False self.max_includes = 3 self._dict = {} @@ -814,33 +813,23 @@ def update_max_includes(self, new_max): if new_max > self.max_includes: self.max_includes = new_max - def set_bids_info(self, bids_layout, bids_path, subject, session): - """ - Provide the bids_layout, a nearest path, - and the subject and session information - to load ROIS from BIDS - """ - self.has_bids_info = True - self._bids_info = bids_layout - self._bids_path = bids_path - self._subject = subject - self._session = session - - def _cond_load(self, roi_or_sl, resample_to): - """ - Load ROI or streamline if not already loaded - """ + def _use_bids_info(self, roi_or_sl, bids_layout, bids_path, + subject, session): if isinstance(roi_or_sl, dict): - if not self.has_bids_info: - raise ValueError(( - "Attempted to load an ROI using BIDS description without " - "First providing BIDS information.")) suffix = roi_or_sl.get("suffix", "dwi") roi_or_sl = find_file( - self._bids_info, self._bids_path, + bids_layout, bids_path, roi_or_sl, suffix, - self._session, self._subject) + session, subject) + return roi_or_sl + else: + return roi_or_sl + + def _cond_load(self, roi_or_sl, resample_to): + """ + Load ROI or streamline if not already loaded + """ if isinstance(roi_or_sl, str): if self.seg_algo == "afq": return afd.read_resample_roi( diff --git a/AFQ/api/group.py b/AFQ/api/group.py index 292b67bd8..892ddefc1 100644 --- a/AFQ/api/group.py +++ b/AFQ/api/group.py @@ -3,6 +3,8 @@ import tempfile from AFQ.definitions.mapping import SynMap +from AFQ.definitions.utils import Definition +import AFQ.api.bundle_dict as abd warnings.simplefilter(action='ignore', category=FutureWarning) # noqa import logging @@ -22,6 +24,7 @@ from AFQ.version import version as pyafq_version from AFQ.viz.utils import trim import pandas as pd +import pydra import numpy as np import os import os.path as op @@ -32,6 +35,7 @@ import nibabel as nib from PIL import Image from s3bids.utils import S3BIDSStudy +import glob from bids.layout import BIDSLayout, BIDSLayoutIndexer try: @@ -60,6 +64,17 @@ def get_afq_bids_entities_fname(): aus.__file__))) + "/afq_bids_entities.json" +class _ParticipantAFQInputs: + def __init__( + self, dwi_data_file, bval_file, bvec_file, results_dir, + kwargs): + self.dwi_data_file = dwi_data_file + self.bval_file = bval_file + self.bvec_file = bvec_file + self.results_dir = results_dir + self.kwargs = kwargs + + class GroupAFQ(object): f"""{AFQclass_doc}""" @@ -146,6 +161,7 @@ def __init__(self, if not isinstance(bids_layout_kwargs, dict): raise TypeError("bids_layout_kwargs must be a dict") + self.logger = logger self.parallel_params = parallel_params @@ -252,6 +268,7 @@ def __init__(self, self.valid_sub_list = [] self.valid_ses_list = [] self.pAFQ_list = [] + self.pAFQ_inputs_list = [] for subject in self.subjects: self.wf_dict[subject] = {} for session in self.sessions: @@ -296,20 +313,90 @@ def __init__(self, if suffix is not None: bids_filters["suffix"] = suffix + # Call find path for all definitions + for key, value in this_kwargs.items(): + if key == "scalars": + for scalar in this_kwargs["scalars"]: + if isinstance(scalar, Definition): + scalar_found = scalar.find_path( + bids_layout, + dwi_data_file, + subject, + session, + required=False) + if scalar_found is False: + this_kwargs["scalars"].remove(scalar) + elif key == "import_tract": + if isinstance(this_kwargs["import_tract"], dict): + it_res = \ + bids_layout.get( + subject=subject, + session=session, + extension=[ + '.trk', + '.tck', + '.vtk', + '.fib', + '.dpy'], + return_type='filename', + **this_kwargs["import_tract"]) + if len(it_res) < 1: + raise ValueError(( + "No custom tractography found for" + f" subject {subject}" + " and session " + f"{session}.")) + elif len(it_res) > 1: + this_kwargs["import_tract"] = it_res[0] + logger.warning(( + f"Multiple viable custom tractographies found for" + f" subject " + f"{subject} and session " + f"{session}. Will use: {it_res[0]}")) + else: + this_kwargs["import_tract"] = it_res[0] + elif isinstance(value, dict): + for _, subvalue in value.items(): + if isinstance(subvalue, Definition): + subvalue.find_path( + bids_layout, + dwi_data_file, + subject, + session) + elif isinstance(value, Definition): + value.find_path( + bids_layout, + dwi_data_file, + subject, + session) + + # call find path for all ROIs + if "bundle_info" in this_kwargs and isinstance( + this_kwargs["bundle_info"], abd.BundleDict): + for b_name in this_kwargs["bundle_info"].bundle_names: + this_kwargs["bundle_info"].apply_to_rois( + b_name, + this_kwargs["bundle_info"]._use_bids_info, + bids_layout, bids_path, subject, session, + dry_run=False) + self.valid_sub_list.append(subject) self.valid_ses_list.append(str(session)) - this_pAFQ = ParticipantAFQ( + this_pAFQ_inputs = _ParticipantAFQInputs( dwi_data_file, bval_file, bvec_file, results_dir, - _bids_info={ - "bids_layout": bids_layout, - "subject": subject, - "session": session}, - **this_kwargs) + this_kwargs) + this_pAFQ = ParticipantAFQ( + this_pAFQ_inputs.dwi_data_file, + this_pAFQ_inputs.bval_file, + this_pAFQ_inputs.bvec_file, + this_pAFQ_inputs.results_dir, + **this_pAFQ_inputs.kwargs) self.wf_dict[subject][str(session)] = this_pAFQ.wf_dict self.pAFQ_list.append(this_pAFQ) + self.pAFQ_inputs_list.append(this_pAFQ_inputs) def combine_profiles(self): tract_profiles_dict = self.export("profiles") @@ -916,6 +1003,139 @@ def assemble_AFQ_browser(self, output_path=None, metadata=None, sublink=page_subtitle_link) +class ParallelGroupAFQ(): + def __init__(self, *args, **kwargs): + orig = GroupAFQ(*args, **kwargs) + + orig.parallel_params["submitter_params"] = \ + orig.parallel_params.get("submitter_params", {"plugin": "cf"}) + + orig.parallel_params["cache_dir"] = \ + orig.parallel_params.get("cache_dir", None) + + self.parallel_params = orig.parallel_params + self.pAFQ_kwargs = orig.pAFQ_inputs_list + + self.finishing_params = dict() + self.finishing_params["args"] = args + self.finishing_params["kwargs"] = kwargs + self.finishing_params["output_dirs"] = [pAFQ.kwargs["output_dir"] + for pAFQ in orig.pAFQ_list] + + def _submit_pydra(self, runnable): + try: + with pydra.Submitter( + **self.parallel_params["submitter_params"], + ) as sub: + sub(runnable=runnable) + # Addresses https://github.com/nipype/pydra/issues/630 + except AttributeError as e: + if "'NoneType' object has no attribute 'replace'" not in str(e): + raise + + def export(self, attr_name="help", collapse=True): + f""" + Export a specific output. To print a list of available outputs, + call export without arguments. + {valid_exports_string} + + Parameters + ---------- + attr_name : str + Name of the output to export. Default: "help" + collapse : bool + Whether to collapse session dimension if there is only 1 session. + Default: True + + Returns + ------- + output : dict + The specific output as a dictionary. Keys are subjects. + Values are dictionaries with keys of sessions + if multiple sessions are used. Otherwise, values are + the output. + None if called without arguments. + """ + + @pydra.mark.task + def export_sub(pAFQ_kwargs, attr_name): + pAFQ = ParticipantAFQ( + pAFQ_kwargs.dwi_data_file, + pAFQ_kwargs.bval_file, + pAFQ_kwargs.bvec_file, + pAFQ_kwargs.results_dir, + **pAFQ_kwargs.kwargs) + pAFQ.export(attr_name) + + # Submit to pydra + export_sub_task = export_sub( + attr_name=attr_name, + cache_dir=self.parallel_params["cache_dir"] + ).split("pAFQ_kwargs", pAFQ_kwargs=self.pAFQ_kwargs) + self._submit_pydra(export_sub_task) + + def export_all(self, viz=True, afqbrowser=True, xforms=True, indiv=True): + """ Exports all the possible outputs + + Parameters + ---------- + viz : bool + Whether to output visualizations. This includes tract profile + plots, a figure containing all bundles, and, if using the AFQ + segmentation algorithm, individual bundle figures. + Default: True + afqbrowser : bool + Whether to output an AFQ-Browser from this AFQ instance. + Default: True + xforms : bool + Whether to output the reg_template image in subject space and, + depending on if it is possible based on the mapping used, to + output the b0 in template space. + Default: True + indiv : bool + Whether to output individual bundles in their own files, in + addition to the one file containing all bundles. If using + the AFQ segmentation algorithm, individual ROIs are also + output. + Default: True + """ + @pydra.mark.task + def export_sub( + pAFQ_kwargs, + finishing_params, + viz, + afqbrowser, + xforms, + indiv + ): + pAFQ = ParticipantAFQ( + pAFQ_kwargs.dwi_data_file, + pAFQ_kwargs.bval_file, + pAFQ_kwargs.bvec_file, + pAFQ_kwargs.results_dir, + **pAFQ_kwargs.kwargs) + pAFQ.export_all(viz, xforms, indiv) + + for dir in finishing_params["output_dirs"]: + if not glob.glob(op.join(dir, "*_desc-profiles_dwi.csv")): + return + + gAFQ = GroupAFQ(*finishing_params["args"], + **finishing_params["kwargs"]) + gAFQ.export_all(viz, afqbrowser, xforms, indiv) + + # Submit to pydra + export_sub_task = export_sub( + finishing_params=self.finishing_params, + viz=viz, + afqbrowser=afqbrowser, + xforms=xforms, + indiv=indiv, + cache_dir=self.parallel_params["cache_dir"] + ).split("pAFQ_kwargs", pAFQ_kwargs=self.pAFQ_kwargs) + self._submit_pydra(export_sub_task) + + def download_and_combine_afq_profiles(bucket, study_s3_prefix="", deriv_name=None, out_file=None, diff --git a/AFQ/api/participant.py b/AFQ/api/participant.py index 4c9fc78d1..da543a3a3 100644 --- a/AFQ/api/participant.py +++ b/AFQ/api/participant.py @@ -34,10 +34,9 @@ def __init__(self, dwi_data_file, bval_file, bvec_file, output_dir, - _bids_info=None, **kwargs): """ - Initialize a ParticipantAFQ object from a BIDS dataset. + Initialize a ParticipantAFQ object. Parameters ---------- @@ -49,10 +48,6 @@ def __init__(self, Path to bvec file. output_dir : str Path to output directory. - _bids_info : dict or None, optional - This should be left as None in most cases. It - is used by GroupAFQ to provide information about - the BIDS layout to each participant. kwargs : additional optional parameters You can set additional parameters for any step of the process. See :ref:`usage/kwargs` for more details. @@ -71,9 +66,6 @@ def __init__(self, In tracking_params, parameters with the suffix mask which are also an image from AFQ.definitions.image will be handled automatically by the api. - - It is recommended that you leave the bids_info parameter as None, - and instead pass in the paths to the files you want to use directly. """ if not isinstance(output_dir, str): raise TypeError( @@ -96,14 +88,12 @@ def __init__(self, "did you mean tracking_params ?")) self.logger = logging.getLogger('AFQ') - self.output_dir = output_dir self.kwargs = dict( - dwi_path=dwi_data_file, - bval=bval_file, - bvec=bvec_file, - results_dir=output_dir, - bids_info=_bids_info, + dwi_data_file=dwi_data_file, + bval_file=bval_file, + bvec_file=bvec_file, + output_dir=output_dir, base_fname=get_base_fname(output_dir, dwi_data_file), **kwargs) self.make_workflow() @@ -114,15 +104,22 @@ def make_workflow(self): self.kwargs["mapping_definition"], SlrMap): plans = { # if using SLR map, do tractography first "data": get_data_plan(self.kwargs), - "tractography": get_tractography_plan(self.kwargs), - "mapping": get_mapping_plan(self.kwargs, use_sls=True), + "tractography": get_tractography_plan( + self.kwargs + ), + "mapping": get_mapping_plan( + self.kwargs, + use_sls=True + ), "segmentation": get_segmentation_plan(self.kwargs), "viz": get_viz_plan(self.kwargs)} else: plans = { # Otherwise, do mapping first "data": get_data_plan(self.kwargs), "mapping": get_mapping_plan(self.kwargs), - "tractography": get_tractography_plan(self.kwargs), + "tractography": get_tractography_plan( + self.kwargs + ), "segmentation": get_segmentation_plan(self.kwargs), "viz": get_viz_plan(self.kwargs)} @@ -280,7 +277,7 @@ def participant_montage(self, images_per_row=2): def _save_file(curr_img): save_path = op.abspath(op.join( - self.output_dir, + self.kwargs["output_dir"], "bundle_montage.png")) curr_img.save(save_path) all_fnames.append(save_path) @@ -374,7 +371,7 @@ def cmd_outputs(self, cmd="rm", dependent_on=None, exceptions=[], " to a filename and will be ignored.")) apply_cmd_to_afq_derivs( - self.output_dir, + self.kwargs["output_dir"], self.export("base_fname"), cmd=cmd, exception_file_names=exception_file_names, diff --git a/AFQ/definitions/image.py b/AFQ/definitions/image.py index 9b5086244..c0181af4e 100644 --- a/AFQ/definitions/image.py +++ b/AFQ/definitions/image.py @@ -138,16 +138,13 @@ def find_path(self, bids_layout, from_path, if nearest_image is None: return False - if session not in self.fnames: - self.fnames[session] = {} - self.fnames[session][subject] = nearest_image + self.fnames[from_path] = nearest_image - def get_path_data_affine(self, bids_info): + def get_path_data_affine(self, dwi_path): if self._from_path: image_file = self.fname else: - image_file = self.fnames[ - bids_info['session']][bids_info['subject']] + image_file = self.fnames[dwi_path] image_img = nib.load(image_file) return image_file, image_img.get_fdata(), image_img.affine @@ -159,10 +156,10 @@ def get_name(self): return name_from_path(self.fname) if self._from_path else self.suffix def get_image_getter(self, task_name): - def _image_getter_helper(dwi, bids_info): + def _image_getter_helper(dwi, dwi_data_file): # Load data image_file, image_data_orig, image_affine = \ - self.get_path_data_affine(bids_info) + self.get_path_data_affine(dwi_data_file) # Apply any conditions on the data image_data, meta = self.apply_conditions( @@ -178,11 +175,11 @@ def _image_getter_helper(dwi, bids_info): image_data.astype(np.float32), dwi.affine), meta if task_name == "data": - def image_getter(dwi, bids_info): - return _image_getter_helper(dwi, bids_info) + def image_getter(dwi, dwi_data_file): + return _image_getter_helper(dwi, dwi_data_file) else: - def image_getter(data_imap, bids_info): - return _image_getter_helper(data_imap["dwi"], bids_info) + def image_getter(data_imap, dwi_data_file): + return _image_getter_helper(data_imap["dwi"], dwi_data_file) return image_getter diff --git a/AFQ/definitions/mapping.py b/AFQ/definitions/mapping.py index f889f2527..64061a99a 100644 --- a/AFQ/definitions/mapping.py +++ b/AFQ/definitions/mapping.py @@ -120,15 +120,14 @@ def find_path(self, bids_layout, from_path, bids_layout, from_path, self.space_filters, self.space_suffix, session, subject, required=required) - self.fnames[session][subject] = (nearest_warp, nearest_space) + self.fnames[from_path] = (nearest_warp, nearest_space) - def get_for_subses(self, base_fname, dwi, bids_info, reg_subject, + def get_for_subses(self, base_fname, dwi, dwi_data_file, reg_subject, reg_template): if self._from_path: nearest_warp, nearest_space = self.fnames else: - nearest_warp, nearest_space = self.fnames[ - bids_info['session']][bids_info['subject']] + nearest_warp, nearest_space = self.fnames[dwi_data_file] our_templ = reg_template subj = Image(dwi) @@ -182,7 +181,7 @@ class IdentityMap(Definition): def __init__(self): pass - def get_for_subses(self, base_fname, dwi, bids_info, reg_subject, + def get_for_subses(self, base_fname, dwi, dwi_data_file, reg_subject, reg_template): return ConformedAffineMapping( np.identity(4), @@ -231,7 +230,7 @@ def prealign(self, base_fname, reg_subject, reg_template, save=True): write_json(meta_fname, meta) return prealign_file if save else np.load(prealign_file) - def get_for_subses(self, base_fname, dwi, bids_info, reg_subject, + def get_for_subses(self, base_fname, dwi, dwi_data_file, reg_subject, reg_template, subject_sls=None, template_sls=None): mapping_file, meta_fname = self.get_fnames( self.extension, base_fname) diff --git a/AFQ/tasks/data.py b/AFQ/tasks/data.py index 3afccb973..cb782124d 100644 --- a/AFQ/tasks/data.py +++ b/AFQ/tasks/data.py @@ -42,7 +42,7 @@ @pimms.calc("data", "gtab", "dwi", "dwi_affine") -def get_data_gtab(dwi_path, bval, bvec, min_bval=None, +def get_data_gtab(dwi_data_file, bval_file, bvec_file, min_bval=None, max_bval=None, filter_b=True, b0_threshold=50): """ DWI data as an ndarray for selected b values, @@ -67,8 +67,8 @@ def get_data_gtab(dwi_path, bval, bvec, min_bval=None, The value of b under which it is considered to be b0. Default: 50. """ - img = nib.load(dwi_path) - bvals, bvecs = read_bvals_bvecs(bval, bvec) + img = nib.load(dwi_data_file) + bvals, bvecs = read_bvals_bvecs(bval_file, bvec_file) data = img.get_fdata() if filter_b and (min_bval is not None): @@ -92,15 +92,15 @@ def get_data_gtab(dwi_path, bval, bvec, min_bval=None, @pimms.calc("b0") @as_file('_desc-b0_dwi.nii.gz') -def b0(dwi_path, gtab): +def b0(dwi_data_file, gtab): """ full path to a nifti file containing the mean b0 """ - data = nib.load(dwi_path) + data = nib.load(dwi_data_file) mean_b0 = np.mean(data.get_fdata()[..., gtab.b0s_mask], -1) mean_b0_img = nib.Nifti1Image(mean_b0, data.affine) meta = dict(b0_threshold=gtab.b0_threshold, - source=dwi_path) + source=dwi_data_file) return mean_b0_img, meta @@ -136,7 +136,8 @@ def dti_fit(dti_params, gtab): @as_file(suffix='_odfmodel-DTI_desc-diffmodel_dwi.nii.gz') @as_img def dti_params(brain_mask, data, gtab, - bval, bvec, b0_threshold=50, robust_tensor_fitting=False): + bval_file, bvec_file, b0_threshold=50, + robust_tensor_fitting=False): """ full path to a nifti file containing parameters for the DTI fit @@ -155,7 +156,7 @@ def dti_params(brain_mask, data, gtab, nib.load(brain_mask).get_fdata() if robust_tensor_fitting: bvals, _ = read_bvals_bvecs( - bval, bvec) + bval_file, bvec_file) sigma = noise_from_b0( data, gtab, bvals, mask=mask, b0_threshold=b0_threshold) @@ -918,7 +919,7 @@ def brain_mask(b0, brain_mask_definition=None): @pimms.calc("bundle_dict", "reg_template") def get_bundle_dict(segmentation_params, - brain_mask, bids_info, b0, + brain_mask, b0, bundle_info=None, reg_template_spec="mni_T1"): """ Dictionary defining the different bundles to be segmented, @@ -995,12 +996,6 @@ def get_bundle_dict(segmentation_params, seg_algo=segmentation_params["seg_algo"], resample_to=reg_template) - if bids_info is not None: - bundle_dict.set_bids_info( - bids_info["bids_layout"], - b0, - bids_info["subject"], - bids_info["session"]) return bundle_dict, reg_template @@ -1027,7 +1022,7 @@ def get_data_plan(kwargs): csd_params, get_bundle_dict]) if "scalars" not in kwargs: - bvals, _ = read_bvals_bvecs(kwargs["bval"], kwargs["bvec"]) + bvals, _ = read_bvals_bvecs(kwargs["bval_file"], kwargs["bvec_file"]) if len(dpg.unique_bvals_magnitude(bvals)) > 2: kwargs["scalars"] = [ "dki_fa", "dki_md", @@ -1050,12 +1045,6 @@ def get_data_plan(kwargs): if not isinstance(bm_def, Definition): raise TypeError( "brain_mask_definition must be a Definition") - if kwargs["bids_info"] is not None: - bm_def.find_path( - kwargs["bids_info"]["bids_layout"], - kwargs["dwi_path"], - kwargs["bids_info"]["subject"], - kwargs["bids_info"]["session"]) data_tasks["brain_mask_res"] = pimms.calc("brain_mask")( as_file(( f'_desc-{str_to_desc(bm_def.get_name())}' diff --git a/AFQ/tasks/decorators.py b/AFQ/tasks/decorators.py index c1bb6a6f6..528c3b805 100644 --- a/AFQ/tasks/decorators.py +++ b/AFQ/tasks/decorators.py @@ -116,7 +116,7 @@ def as_file(suffix, include_track=False, include_seg=False): and only run if not already found """ def _as_file(func): - needed_args = ["base_fname", "results_dir"] + needed_args = ["base_fname", "output_dir"] if include_track: needed_args.append("tracking_params") if include_seg: @@ -125,11 +125,11 @@ def _as_file(func): @functools.wraps(func) @has_args(func, needed_args) def wrapper_as_file(*args, **kwargs): - og_arg_count, base_fname, results_dir, \ + og_arg_count, base_fname, output_dir, \ tracking_params, segmentation_params =\ extract_added_args( func, - ["base_fname", "results_dir", + ["base_fname", "output_dir", "tracking_params", "segmentation_params"], args, includes=[True, True, include_track, include_seg]) @@ -174,7 +174,7 @@ def wrapper_as_file(*args, **kwargs): # modify meta source to be relative if "source" in meta: - meta["source"] = op.relpath(meta["source"], results_dir) + meta["source"] = op.relpath(meta["source"], output_dir) meta_fname = get_fname( base_fname, f"{drop_extension(suffix)}.json", diff --git a/AFQ/tasks/mapping.py b/AFQ/tasks/mapping.py index c8807d8cd..5971fa994 100644 --- a/AFQ/tasks/mapping.py +++ b/AFQ/tasks/mapping.py @@ -47,13 +47,13 @@ def template_xform(mapping, data_imap): @pimms.calc("rois") -def export_rois(base_fname, results_dir, data_imap, mapping): +def export_rois(base_fname, output_dir, data_imap, mapping): """ dictionary of full paths to Nifti1Image files of ROIs transformed to subject space """ bundle_dict = data_imap["bundle_dict"] - rois_dir = op.join(results_dir, 'ROIs') + rois_dir = op.join(output_dir, 'ROIs') os.makedirs(rois_dir, exist_ok=True) roi_files = {} base_roi_fname = op.join(rois_dir, op.split(base_fname)[1]) @@ -71,7 +71,7 @@ def export_rois(base_fname, results_dir, data_imap, mapping): @pimms.calc("mapping") -def mapping(base_fname, dwi_path, reg_subject, data_imap, bids_info, +def mapping(base_fname, dwi_data_file, reg_subject, data_imap, mapping_definition=None): """ mapping from subject to template space. @@ -93,19 +93,13 @@ def mapping(base_fname, dwi_path, reg_subject, data_imap, bids_info, raise TypeError( "mapping must be a mapping defined" + " in `AFQ.definitions.mapping`") - if bids_info is not None: - mapping_definition.find_path( - bids_info["bids_layout"], - dwi_path, - bids_info["subject"], - bids_info["session"]) return mapping_definition.get_for_subses( - base_fname, data_imap["dwi"], bids_info, + base_fname, data_imap["dwi"], dwi_data_file, reg_subject, reg_template) @pimms.calc("mapping") -def sls_mapping(base_fname, dwi_path, reg_subject, data_imap, bids_info, +def sls_mapping(base_fname, dwi_data_file, reg_subject, data_imap, tractography_imap, mapping_definition=None): """ mapping from subject to template space. @@ -127,12 +121,6 @@ def sls_mapping(base_fname, dwi_path, reg_subject, data_imap, bids_info, raise TypeError( "mapping must be a mapping defined" + " in `AFQ.definitions.mapping`") - if bids_info is not None: - mapping_definition.find_path( - bids_info["bids_layout"], - dwi_path, - bids_info["subject"], - bids_info["session"]) streamlines_file = tractography_imap["streamlines"] tg = load_tractogram( streamlines_file, reg_subject, @@ -151,7 +139,8 @@ def sls_mapping(base_fname, dwi_path, reg_subject, data_imap, bids_info, atlas_fname, 'same', bbox_valid_check=False) return mapping_definition.get_for_subses( - base_fname, data_imap["dwi"], bids_info, + base_fname, data_imap["dwi"], + dwi_data_file, reg_subject, reg_template, subject_sls=tg.streamlines, template_sls=hcp_atlas.streamlines) @@ -205,42 +194,21 @@ def get_mapping_plan(kwargs, use_sls=False): export_registered_b0, template_xform, export_rois, mapping, get_reg_subject]) - bids_info = kwargs.get("bids_info", None) # add custom scalars for scalar in kwargs["scalars"]: if isinstance(scalar, Definition): - if bids_info is None: - scalar.find_path( - None, - kwargs["dwi_path"], - None, - None) - scalar_found = True - else: - scalar_found = scalar.find_path( - bids_info["bids_layout"], - kwargs["dwi_path"], - bids_info["subject"], - bids_info["session"], - required=False) - if scalar_found != False: - mapping_tasks[f"{scalar.get_name()}_res"] =\ - pimms.calc(f"{scalar.get_name()}")( - as_file(( - f'_desc-{str_to_desc(scalar.get_name())}' - '_dwi.nii.gz'))( - scalar.get_image_getter("mapping"))) + mapping_tasks[f"{scalar.get_name()}_res"] =\ + pimms.calc(f"{scalar.get_name()}")( + as_file(( + f'_desc-{str_to_desc(scalar.get_name())}' + '_dwi.nii.gz'))( + scalar.get_image_getter("mapping"))) if use_sls: mapping_tasks["mapping_res"] = sls_mapping reg_ss = kwargs.get("reg_subject_spec", None) if isinstance(reg_ss, ImageDefinition): - reg_ss.find_path( - bids_info["bids_layout"], - kwargs["dwi_path"], - bids_info["subject"], - bids_info["session"]) del kwargs["reg_subject_spec"] mapping_tasks["reg_subject_spec_res"] = pimms.calc("reg_subject_spec")( as_file(( diff --git a/AFQ/tasks/segmentation.py b/AFQ/tasks/segmentation.py index 22122bdee..d20050f1e 100644 --- a/AFQ/tasks/segmentation.py +++ b/AFQ/tasks/segmentation.py @@ -109,7 +109,7 @@ def segment(data_imap, mapping_imap, @pimms.calc("indiv_bundles") -def export_bundles(base_fname, results_dir, +def export_bundles(base_fname, output_dir, bundles, tracking_params, segmentation_params): @@ -123,7 +123,7 @@ def export_bundles(base_fname, results_dir, else: extension = ".trk" - bundles_dir = op.join(results_dir, "bundles") + bundles_dir = op.join(output_dir, "bundles") os.makedirs(bundles_dir, exist_ok=True) seg_sft = aus.SegmentedSFT.fromfile(bundles) for bundle in seg_sft.bundle_names: diff --git a/AFQ/tasks/tractography.py b/AFQ/tasks/tractography.py index 2c7a04d68..248696db6 100644 --- a/AFQ/tasks/tractography.py +++ b/AFQ/tasks/tractography.py @@ -154,7 +154,7 @@ def streamlines(data_imap, seed, stop, @pimms.calc("streamlines") -def custom_tractography(bids_info, import_tract=None): +def custom_tractography(import_tract=None): """ full path to the complete, unsegmented tractography file @@ -166,42 +166,10 @@ def custom_tractography(bids_info, import_tract=None): to generate the tractography. Default: None """ - if not isinstance(import_tract, dict) and\ - not isinstance(import_tract, str): + if not isinstance(import_tract, str): raise TypeError( "import_tract must be" + " either a dict or a str") - if isinstance(import_tract, dict): - if bids_info is None: - raise ValueError(( - "bids_info must be provided if using" - " bids filters to find imported tracts")) - import_tract = \ - bids_info["bids_layout"].get( - subject=bids_info["subject"], - session=bids_info["session"], - extension=[ - '.trk', - '.tck', - '.vtk', - '.fib', - '.dpy'], - return_type='filename', - **import_tract) - if len(import_tract) < 1: - raise ValueError(( - "No custom tractography found for subject " - f"{bids_info['subject']} and session " - f"{bids_info['session']}.")) - elif len(import_tract) > 1: - import_tract = import_tract[0] - logger.warning(( - f"Multiple viable custom tractographies found for" - f" subject " - f"{bids_info['subject']} and session " - f"{bids_info['session']}. Will use: {import_tract}")) - else: - import_tract = import_tract[0] return import_tract @@ -312,21 +280,6 @@ def get_tractography_plan(kwargs): stop_mask = kwargs["tracking_params"]['stop_mask'] seed_mask = kwargs["tracking_params"]['seed_mask'] - bids_info = kwargs["bids_info"] - - if bids_info is not None: - if isinstance(stop_mask, Definition): - stop_mask.find_path( - bids_info["bids_layout"], - kwargs["dwi_path"], - bids_info["subject"], - bids_info["session"]) - if isinstance(seed_mask, Definition): - seed_mask.find_path( - bids_info["bids_layout"], - kwargs["dwi_path"], - bids_info["subject"], - bids_info["session"]) if kwargs["tracking_params"]["tracker"] == "pft": probseg_funcs = stop_mask.get_image_getter("tractography") diff --git a/AFQ/tasks/viz.py b/AFQ/tasks/viz.py index 0aa25ef29..a4ed491ac 100644 --- a/AFQ/tasks/viz.py +++ b/AFQ/tasks/viz.py @@ -144,7 +144,7 @@ def viz_bundles(base_fname, @pimms.calc("indiv_bundles_figures") def viz_indivBundle(base_fname, - results_dir, + output_dir, viz_backend, data_imap, mapping_imap, @@ -273,7 +273,7 @@ def viz_indivBundle(base_fname, interact=False, figure=figure) - roi_dir = op.join(results_dir, 'viz_bundles') + roi_dir = op.join(output_dir, 'viz_bundles') os.makedirs(roi_dir, exist_ok=True) figures[bundle_name] = figure if "no_gif" not in viz_backend.backend: @@ -288,7 +288,7 @@ def viz_indivBundle(base_fname, fname = op.join(roi_dir, fname[1]) viz_backend.create_gif(figure, fname) if "plotly" in viz_backend.backend: - roi_dir = op.join(results_dir, 'viz_bundles') + roi_dir = op.join(output_dir, 'viz_bundles') os.makedirs(roi_dir, exist_ok=True) fname = op.split( get_fname( @@ -302,7 +302,7 @@ def viz_indivBundle(base_fname, figure.write_html(fname) # also do the core visualizations when using the plotly backend - core_dir = op.join(results_dir, 'viz_core_bundles') + core_dir = op.join(output_dir, 'viz_core_bundles') os.makedirs(core_dir, exist_ok=True) indiv_profile = profiles[ profiles.tractID == bundle_name][best_scalar].to_numpy() diff --git a/AFQ/tests/test_api.py b/AFQ/tests/test_api.py index ec230625c..016ea48c9 100644 --- a/AFQ/tests/test_api.py +++ b/AFQ/tests/test_api.py @@ -5,6 +5,7 @@ import subprocess import gc import random +import concurrent.futures import toml @@ -25,7 +26,7 @@ from dipy.io.streamline import load_tractogram import AFQ.api.bundle_dict as abd -from AFQ.api.group import GroupAFQ +from AFQ.api.group import GroupAFQ, ParallelGroupAFQ from AFQ.api.participant import ParticipantAFQ import AFQ.data.fetch as afd import AFQ.utils.streamlines as aus @@ -420,7 +421,7 @@ def test_AFQ_anisotropic(): # check that the apm map was made myafq.export("mapping") assert op.exists(op.join( - myafq.export("results_dir")["01"], + myafq.export("output_dir")["01"], 'sub-01_ses-01_odfmodel-CSD_desc-APM_dwi.nii.gz')) @@ -597,6 +598,12 @@ def test_AFQ_reco80(): npt.assert_(len(seg_sft.get_bundle('CCMid').streamlines) > 0) +def test_AFQ_pydra(): + _, bids_path = afd.fetch_hbn_preproc(["NDARAA948VFH", "NDARAV554TP2"]) + pga = ParallelGroupAFQ(bids_path, preproc_pipeline="qsiprep") + pga.export("dti_fa") + + @pytest.mark.nightly_pft def test_AFQ_pft(): """ @@ -838,18 +845,18 @@ def test_AFQ_data_waypoint(): np.linalg.inv(dwi_affine))) mapping_file = op.join( - myafq.export("results_dir"), + myafq.export("output_dir"), 'sub-01_ses-01_desc-mapping_from-DWI_to-MNI_xform.nii.gz') nib.save(mapping, mapping_file) reg_prealign_file = op.join( - myafq.export("results_dir"), + myafq.export("output_dir"), 'sub-01_ses-01_desc-prealign_from-DWI_to-MNI_xform.npy') np.save(reg_prealign_file, np.eye(4)) # Test ROI exporting: myafq.export("rois") assert op.exists(op.join( - myafq.export("results_dir"), + myafq.export("output_dir"), 'ROIs', 'sub-01_ses-01_space-subject_desc-RightCorticospinalinclude1_mask.json')) # noqa @@ -861,7 +868,7 @@ def test_AFQ_data_waypoint(): # Test bundles exporting: myafq.export("indiv_bundles") assert op.exists(op.join( - myafq.export("results_dir"), + myafq.export("output_dir"), 'bundles', 'sub-01_ses-01_coordsys-RASMM_trkmethod-probCSD_recogmethod-AFQ_desc-LeftCorticospinal_tractography.trk')) # noqa @@ -874,26 +881,26 @@ def test_AFQ_data_waypoint(): myafq.export("indiv_bundles_figures") assert op.exists(op.join( - myafq.export("results_dir"), + myafq.export("output_dir"), "viz_bundles", 'sub-01_ses-01_coordsys-RASMM_trkmethod-probCSD_recogmethod-AFQ_desc-LeftSuperiorLongitudinalviz_dwi.html')) # noqa assert op.exists(op.join( - myafq.export("results_dir"), + myafq.export("output_dir"), "viz_bundles", 'sub-01_ses-01_coordsys-RASMM_trkmethod-probCSD_recogmethod-AFQ_desc-LeftSuperiorLongitudinalviz_dwi.html')) # noqa # Before we run the CLI, we'll remove the bundles and ROI folders, to see # that the CLI generates them - shutil.rmtree(op.join(myafq.export("results_dir"), + shutil.rmtree(op.join(myafq.export("output_dir"), 'bundles')) - shutil.rmtree(op.join(myafq.export("results_dir"), + shutil.rmtree(op.join(myafq.export("output_dir"), 'ROIs')) os.remove(tract_profile_fname) # save memory - results_dir = myafq.export("results_dir") + output_dir = myafq.export("output_dir") del myafq gc.collect() @@ -957,11 +964,11 @@ def test_AFQ_data_waypoint(): # Make sure the CLI did indeed generate these: assert op.exists(op.join( - results_dir, + output_dir, 'ROIs', 'sub-01_ses-01_space-subject_desc-RightSuperiorLongitudinalinclude1_mask.json')) # noqa assert op.exists(op.join( - results_dir, + output_dir, 'bundles', 'sub-01_ses-01_coordsys-RASMM_trkmethod-probCSD_recogmethod-AFQ_desc-RightSuperiorLongitudinal_tractography.trk')) # noqa diff --git a/AFQ/tests/test_definitions.py b/AFQ/tests/test_definitions.py index 7e70179f5..de1f774b1 100644 --- a/AFQ/tests/test_definitions.py +++ b/AFQ/tests/test_definitions.py @@ -62,7 +62,7 @@ def test_find_path(subject, session): image_file = ImageFile(suffix="seg", filters={'scope': 'synthetic'}) image_file.find_path(bids_layout, test_dwi_path, subject, session) - assert image_file.fnames[session][subject] == op.join( + assert image_file.fnames[test_dwi_path] == op.join( bids_dir, "derivatives", "dmriprep", "sub-" + subject, "ses-" + session, "anat", "seg.nii.gz" ) diff --git a/setup.cfg b/setup.cfg index 94188e416..3aea1e1e4 100644 --- a/setup.cfg +++ b/setup.cfg @@ -33,6 +33,7 @@ install_requires = pybids>=0.16.2 templateflow>=0.8 pimms + pydra joblib>=1.3.2 dask>=1.1 s3bids>=0.1.7