From 5f40f70b53b3fd54abe13645776749dc99a07122 Mon Sep 17 00:00:00 2001 From: Teresa Gomez <46339554+teresamg@users.noreply.github.com> Date: Wed, 10 Apr 2024 10:35:59 -0700 Subject: [PATCH 01/17] Adds ParallelGroupAFQ --- AFQ/api/group.py | 216 ++++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 215 insertions(+), 1 deletion(-) diff --git a/AFQ/api/group.py b/AFQ/api/group.py index 292b67bd8..b2dcdf5ad 100644 --- a/AFQ/api/group.py +++ b/AFQ/api/group.py @@ -147,7 +147,9 @@ def __init__(self, raise TypeError("bids_layout_kwargs must be a dict") self.logger = logger - + self.bids_path = bids_path + self.output_dir = output_dir + self.preproc_pipeline = preproc_pipeline self.parallel_params = parallel_params self.wf_dict = {} @@ -916,6 +918,218 @@ def assemble_AFQ_browser(self, output_path=None, metadata=None, sublink=page_subtitle_link) +class ParallelGroupAFQ(GroupAFQ): + + def __init__(self, + bids_path, + bids_filters={"suffix": "dwi"}, + preproc_pipeline="all", + participant_labels=None, + output_dir=None, + parallel_params={"engine": "serial"}, + bids_layout_kwargs={}, + plugin="cf", + sbatch_args=None, + cache_dir=None, + **kwargs): + + super().__init__(bids_path, + bids_filters, + preproc_pipeline, + participant_labels, + output_dir, + parallel_params, + bids_layout_kwargs, + **kwargs) + + self.plugin = plugin + self.sbatch_args = sbatch_args + self.cache_dir = cache_dir + + def export_all(self, viz=True, afqbrowser=True, xforms=True, indiv=True): + """ Exports all the possible outputs + + Parameters + ---------- + viz : bool + Whether to output visualizations. This includes tract profile + plots, a figure containing all bundles, and, if using the AFQ + segmentation algorithm, individual bundle figures. + Default: True + afqbrowser : bool + Whether to output an AFQ-Browser from this AFQ instance. + Default: True + xforms : bool + Whether to output the reg_template image in subject space and, + depending on if it is possible based on the mapping used, to + output the b0 in template space. + Default: True + indiv : bool + Whether to output individual bundles in their own files, in + addition to the one file containing all bundles. If using + the AFQ segmentation algorithm, individual ROIs are also + output. + Default: True + """ + import pydra + + bids_contents = os.listdir(self.bids_path) + bids_contents.remove("derivatives") + + for subject_id in bids_contents: + if not op.isdir(op.join(self.bids_path, subject_id)): + continue + + export_sub_task = pydra.mark.task( + self.export_sub( + subject_id, + viz, + afqbrowser, + xforms, + indiv + ), + cache_dir=self.cache_dir + ) + + # Replace with Ray + with pydra.Submitter( + plugin=self.plugin, sbatch_args=self.sbatch_args + ) as sub: + try: + sub(runnable=export_sub_task) + except: + self.logger.info(f"Error submitting {subject_id}") + + def export_sub(self, subject_id, viz=True, afqbrowser=True, xforms=True, + indiv=True): + """ Exports all the possible outputs + + Parameters + ---------- + viz : bool + Whether to output visualizations. This includes tract profile + plots, a figure containing all bundles, and, if using the AFQ + segmentation algorithm, individual bundle figures. + Default: True + afqbrowser : bool + Whether to output an AFQ-Browser from this AFQ instance. + Default: True + xforms : bool + Whether to output the reg_template image in subject space and, + depending on if it is possible based on the mapping used, to + output the b0 in template space. 
+ Default: True + indiv : bool + Whether to output individual bundles in their own files, in + addition to the one file containing all bundles. If using + the AFQ segmentation algorithm, individual ROIs are also + output. + Default: True + """ + import shutil + import pandas as pd + + bids_path = self.bids_path + output_dir = self.output_dir + + bids_location = op.dirname(op.abspath(bids_path)) + bids_name = op.basename(op.abspath(bids_path)) + output_dir_name = op.basename(op.abspath(output_dir)) + + self.bids_path = op.join( + bids_location, + bids_name + f"_{subject_id}" + ) + + self.output_dir = op.join( + self.bids_path, + "derivatives", + output_dir_name + ) + + if op.exists(self.bids_path): + shutil.rmtree(self.bids_path) + + os.makedirs( + op.join( + self.bids_path, + "derivatives", + self.preproc_pipeline + ) + ) + + shutil.copyfile( + op.join(bids_path, "dataset_description.json"), + op.join(self.bids_path, "dataset_description.json") + ) + + shutil.copytree( + op.join(bids_path, subject_id), + op.join(self.bids_path, subject_id) + ) + + shutil.copyfile( + op.join( + bids_path, + "derivatives", + self.preproc_pipeline, + "dataset_description.json" + ), + op.join( + self.bids_path, + "derivatives", + self.preproc_pipeline, + "dataset_description.json" + ) + ) + + shutil.copytree( + op.join( + bids_path, + "derivatives", + self.preproc_pipeline, + subject_id + ), + op.join( + self.bids_path, + "derivatives", + self.preproc_pipeline, + subject_id + ) + ) + + super(self.export_all(viz, afqbrowser, xforms, indiv)) + + if not op.exists(op.join(output_dir, "dataset_description.json")): + shutil.copyfile( + op.join(self.output_dir, "dataset_description.json"), + op.join(output_dir, "dataset_description.json") + ) + + if not op.exists(op.join(output_dir, "tract_profiles.csv")): + shutil.copyfile(op.join(self.output_dir, "tract_profiles.csv"), + op.join(output_dir, "tract_profiles.csv") + ) + else: + df = pd.read_csv( + op.join(output_dir, "tract_profiles.csv"), index_col=0 + ) + df_sub = pd.read_csv( + op.join(self.output_dir, "tract_profiles.csv") + ) + df = df.append(df_sub, ignore_index=True) + df.to_csv(op.join(output_dir, "tract_profiles.csv")) + + shutil.copytree( + op.join(self.output_dir, subject_id), + op.join(output_dir, subject_id) + ) + + shutil.rmtree(self.bids_path) + + self.logger.info(f"{subject_id} finished") + + def download_and_combine_afq_profiles(bucket, study_s3_prefix="", deriv_name=None, out_file=None, From 59ad602cabe7cba4e14915b7b1a50a24585bbcf4 Mon Sep 17 00:00:00 2001 From: Teresa Gomez <46339554+teresamg@users.noreply.github.com> Date: Thu, 11 Apr 2024 19:43:37 -0700 Subject: [PATCH 02/17] Recursion fix and other updates --- AFQ/api/group.py | 68 +++++++++++++++++++----------------------------- 1 file changed, 27 insertions(+), 41 deletions(-) diff --git a/AFQ/api/group.py b/AFQ/api/group.py index b2dcdf5ad..341d29f83 100644 --- a/AFQ/api/group.py +++ b/AFQ/api/group.py @@ -40,6 +40,8 @@ except ImportError: using_afqb = False +import shutil + __all__ = ["GroupAFQ", "get_afq_bids_entities_fname"] @@ -919,33 +921,6 @@ def assemble_AFQ_browser(self, output_path=None, metadata=None, class ParallelGroupAFQ(GroupAFQ): - - def __init__(self, - bids_path, - bids_filters={"suffix": "dwi"}, - preproc_pipeline="all", - participant_labels=None, - output_dir=None, - parallel_params={"engine": "serial"}, - bids_layout_kwargs={}, - plugin="cf", - sbatch_args=None, - cache_dir=None, - **kwargs): - - super().__init__(bids_path, - bids_filters, - preproc_pipeline, - 
participant_labels, - output_dir, - parallel_params, - bids_layout_kwargs, - **kwargs) - - self.plugin = plugin - self.sbatch_args = sbatch_args - self.cache_dir = cache_dir - def export_all(self, viz=True, afqbrowser=True, xforms=True, indiv=True): """ Exports all the possible outputs @@ -973,27 +948,35 @@ def export_all(self, viz=True, afqbrowser=True, xforms=True, indiv=True): """ import pydra - bids_contents = os.listdir(self.bids_path) - bids_contents.remove("derivatives") + if "plugin" not in self.parallel_params: + self.parallel_params["plugin"] = "cf" - for subject_id in bids_contents: - if not op.isdir(op.join(self.bids_path, subject_id)): - continue + # Assume slurm if sbatch_args are specified + if "sbatch_args" in self.parallel_params: + if self.parallel_params["sbatch_args"] is not None: + self.parallel_params["plugin"] = "slurm" + if "sbatch_args" not in self.parallel_params: + self.parallel_params["sbatch_args"] = None + + if "cache_dir" not in self.parallel_params: + self.parallel_params["cache_dir"] = None + + for subject_id in self.subjects: export_sub_task = pydra.mark.task( self.export_sub( - subject_id, + f"sub-{subject_id}", viz, afqbrowser, xforms, indiv ), - cache_dir=self.cache_dir + cache_dir=self.parallel_params["cache_dir"] ) - # Replace with Ray with pydra.Submitter( - plugin=self.plugin, sbatch_args=self.sbatch_args + plugin=self.parallel_params["plugin"], + sbatch_args=self.parallel_params["sbatch_args"] ) as sub: try: sub(runnable=export_sub_task) @@ -1006,6 +989,8 @@ def export_sub(self, subject_id, viz=True, afqbrowser=True, xforms=True, Parameters ---------- + subject_id : String + BIDS subject to process viz : bool Whether to output visualizations. This includes tract profile plots, a figure containing all bundles, and, if using the AFQ @@ -1026,9 +1011,7 @@ def export_sub(self, subject_id, viz=True, afqbrowser=True, xforms=True, output. 
Default: True """ - import shutil - import pandas as pd - + # Build temporary directory names bids_path = self.bids_path output_dir = self.output_dir @@ -1038,7 +1021,7 @@ def export_sub(self, subject_id, viz=True, afqbrowser=True, xforms=True, self.bids_path = op.join( bids_location, - bids_name + f"_{subject_id}" + "tmp_" + bids_name + f"_{subject_id}" ) self.output_dir = op.join( @@ -1047,6 +1030,7 @@ def export_sub(self, subject_id, viz=True, afqbrowser=True, xforms=True, output_dir_name ) + # Copy data to temporary directory if op.exists(self.bids_path): shutil.rmtree(self.bids_path) @@ -1098,8 +1082,10 @@ def export_sub(self, subject_id, viz=True, afqbrowser=True, xforms=True, ) ) - super(self.export_all(viz, afqbrowser, xforms, indiv)) + # Process + super().export_all(viz, afqbrowser, xforms, indiv) + # Copy results back to BIDS directory if not op.exists(op.join(output_dir, "dataset_description.json")): shutil.copyfile( op.join(self.output_dir, "dataset_description.json"), From 9761722ee39589b139cb4b1b40dbd79d0d8c4792 Mon Sep 17 00:00:00 2001 From: Teresa Gomez <46339554+teresamg@users.noreply.github.com> Date: Fri, 12 Apr 2024 18:22:51 -0700 Subject: [PATCH 03/17] Changes to in-place method --- AFQ/api/group.py | 209 ++++++++++------------------------------------- 1 file changed, 45 insertions(+), 164 deletions(-) diff --git a/AFQ/api/group.py b/AFQ/api/group.py index 341d29f83..83de04ea6 100644 --- a/AFQ/api/group.py +++ b/AFQ/api/group.py @@ -40,8 +40,6 @@ except ImportError: using_afqb = False -import shutil - __all__ = ["GroupAFQ", "get_afq_bids_entities_fname"] @@ -149,9 +147,7 @@ def __init__(self, raise TypeError("bids_layout_kwargs must be a dict") self.logger = logger - self.bids_path = bids_path - self.output_dir = output_dir - self.preproc_pipeline = preproc_pipeline + self.parallel_params = parallel_params self.wf_dict = {} @@ -328,6 +324,12 @@ def combine_profiles(self): self.afq_path, "tract_profiles.csv")) os.makedirs(op.dirname(out_file), exist_ok=True) _df = clean_pandas_df(_df) + + if "submitter_params" in self.parallel_params: + if op.exists(out_file): + df_prev = pd.read_csv(out_file) + _df = _df.append(df_prev) + _df.to_csv(out_file, index=False) return _df @@ -921,76 +923,26 @@ def assemble_AFQ_browser(self, output_path=None, metadata=None, class ParallelGroupAFQ(GroupAFQ): - def export_all(self, viz=True, afqbrowser=True, xforms=True, indiv=True): - """ Exports all the possible outputs + def __init__(self, *args, **kwargs): + orig = GroupAFQ(*args, **kwargs) - Parameters - ---------- - viz : bool - Whether to output visualizations. This includes tract profile - plots, a figure containing all bundles, and, if using the AFQ - segmentation algorithm, individual bundle figures. - Default: True - afqbrowser : bool - Whether to output an AFQ-Browser from this AFQ instance. - Default: True - xforms : bool - Whether to output the reg_template image in subject space and, - depending on if it is possible based on the mapping used, to - output the b0 in template space. - Default: True - indiv : bool - Whether to output individual bundles in their own files, in - addition to the one file containing all bundles. If using - the AFQ segmentation algorithm, individual ROIs are also - output. 
- Default: True - """ - import pydra + if "submitter_params" not in orig.parallel_params: + orig.parallel_params["submitter_params"] = {"plugin": "cf"} - if "plugin" not in self.parallel_params: - self.parallel_params["plugin"] = "cf" - - # Assume slurm if sbatch_args are specified - if "sbatch_args" in self.parallel_params: - if self.parallel_params["sbatch_args"] is not None: - self.parallel_params["plugin"] = "slurm" - - if "sbatch_args" not in self.parallel_params: - self.parallel_params["sbatch_args"] = None - - if "cache_dir" not in self.parallel_params: - self.parallel_params["cache_dir"] = None - - for subject_id in self.subjects: - export_sub_task = pydra.mark.task( - self.export_sub( - f"sub-{subject_id}", - viz, - afqbrowser, - xforms, - indiv - ), - cache_dir=self.parallel_params["cache_dir"] - ) - - with pydra.Submitter( - plugin=self.parallel_params["plugin"], - sbatch_args=self.parallel_params["sbatch_args"] - ) as sub: - try: - sub(runnable=export_sub_task) - except: - self.logger.info(f"Error submitting {subject_id}") - - def export_sub(self, subject_id, viz=True, afqbrowser=True, xforms=True, - indiv=True): + if "cache_dir" not in orig.parallel_params: + orig.parallel_params["cache_dir"] = None + + self.parallel_params = orig.parallel_params + self.subjects = orig.subjects + + self.args = args + self.kwargs = kwargs + + def export_all(self, viz=True, afqbrowser=True, xforms=True, indiv=True): """ Exports all the possible outputs Parameters ---------- - subject_id : String - BIDS subject to process viz : bool Whether to output visualizations. This includes tract profile plots, a figure containing all bundles, and, if using the AFQ @@ -1011,109 +963,38 @@ def export_sub(self, subject_id, viz=True, afqbrowser=True, xforms=True, output. 
Default: True """ - # Build temporary directory names - bids_path = self.bids_path - output_dir = self.output_dir - - bids_location = op.dirname(op.abspath(bids_path)) - bids_name = op.basename(op.abspath(bids_path)) - output_dir_name = op.basename(op.abspath(output_dir)) - - self.bids_path = op.join( - bids_location, - "tmp_" + bids_name + f"_{subject_id}" - ) - - self.output_dir = op.join( - self.bids_path, - "derivatives", - output_dir_name - ) - - # Copy data to temporary directory - if op.exists(self.bids_path): - shutil.rmtree(self.bids_path) - - os.makedirs( - op.join( - self.bids_path, - "derivatives", - self.preproc_pipeline - ) - ) - - shutil.copyfile( - op.join(bids_path, "dataset_description.json"), - op.join(self.bids_path, "dataset_description.json") - ) + import pydra - shutil.copytree( - op.join(bids_path, subject_id), - op.join(self.bids_path, subject_id) - ) + @pydra.mark.task + def export_sub(obj, subject_id, viz, afqbrowser, xforms, indiv): + afq = GroupAFQ(*obj.args, **obj.kwargs) - shutil.copyfile( - op.join( - bids_path, - "derivatives", - self.preproc_pipeline, - "dataset_description.json" - ), - op.join( - self.bids_path, - "derivatives", - self.preproc_pipeline, - "dataset_description.json" - ) - ) + sub_arr = np.array(afq.valid_sub_list) + ses_arr = np.array(afq.valid_ses_list) - shutil.copytree( - op.join( - bids_path, - "derivatives", - self.preproc_pipeline, - subject_id - ), - op.join( - self.bids_path, - "derivatives", - self.preproc_pipeline, - subject_id - ) - ) + afq.valid_sub_list = list(sub_arr[sub_arr == subject_id]) + afq.valid_ses_list = list(ses_arr[sub_arr == subject_id]) - # Process - super().export_all(viz, afqbrowser, xforms, indiv) + afq.parallel_params = obj.parallel_params - # Copy results back to BIDS directory - if not op.exists(op.join(output_dir, "dataset_description.json")): - shutil.copyfile( - op.join(self.output_dir, "dataset_description.json"), - op.join(output_dir, "dataset_description.json") - ) + afq.export_all(viz, afqbrowser, xforms, indiv) - if not op.exists(op.join(output_dir, "tract_profiles.csv")): - shutil.copyfile(op.join(self.output_dir, "tract_profiles.csv"), - op.join(output_dir, "tract_profiles.csv") - ) - else: - df = pd.read_csv( - op.join(output_dir, "tract_profiles.csv"), index_col=0 - ) - df_sub = pd.read_csv( - op.join(self.output_dir, "tract_profiles.csv") - ) - df = df.append(df_sub, ignore_index=True) - df.to_csv(op.join(output_dir, "tract_profiles.csv")) - - shutil.copytree( - op.join(self.output_dir, subject_id), - op.join(output_dir, subject_id) - ) + return True - shutil.rmtree(self.bids_path) + export_sub_task = export_sub( + obj=self, + subject_id=self.subjects, + viz=viz, + afqbrowser=afqbrowser, + xforms=xforms, + indiv=indiv, + cache_dir=self.parallel_params["cache_dir"] + ).split("subject_id") - self.logger.info(f"{subject_id} finished") + with pydra.Submitter( + **self.parallel_params["submitter_params"], + ) as sub: + sub(runnable=export_sub_task) def download_and_combine_afq_profiles(bucket, From f6794f54601b6da2eed72130c8adf1f4f0cf1450 Mon Sep 17 00:00:00 2001 From: Teresa Gomez <46339554+teresamg@users.noreply.github.com> Date: Fri, 12 Apr 2024 19:28:37 -0700 Subject: [PATCH 04/17] Adds comments --- AFQ/api/group.py | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) diff --git a/AFQ/api/group.py b/AFQ/api/group.py index 83de04ea6..d85923357 100644 --- a/AFQ/api/group.py +++ b/AFQ/api/group.py @@ -325,6 +325,7 @@ def combine_profiles(self): os.makedirs(op.dirname(out_file), 
exist_ok=True)
         _df = clean_pandas_df(_df)
 
+        # Append to existing csv if this is a parallelized run
         if "submitter_params" in self.parallel_params:
             if op.exists(out_file):
                 df_prev = pd.read_csv(out_file)
                 _df = _df.append(df_prev)
 
@@ -924,20 +925,24 @@ def assemble_AFQ_browser(self, output_path=None, metadata=None,
 class ParallelGroupAFQ(GroupAFQ):
 
     def __init__(self, *args, **kwargs):
+        # Initialize throwaway to populate subjects and parallel_params
         orig = GroupAFQ(*args, **kwargs)
 
+        # Implement defaults
         if "submitter_params" not in orig.parallel_params:
             orig.parallel_params["submitter_params"] = {"plugin": "cf"}
 
         if "cache_dir" not in orig.parallel_params:
             orig.parallel_params["cache_dir"] = None
 
+        # Pass through for convenience
         self.parallel_params = orig.parallel_params
         self.subjects = orig.subjects
 
+        # Pass through args to initialize new GroupAFQ instances later
         self.args = args
         self.kwargs = kwargs
-
+
     def export_all(self, viz=True, afqbrowser=True, xforms=True, indiv=True):
         """ Exports all the possible outputs
 
@@ -967,20 +972,28 @@ def export_all(self, viz=True, afqbrowser=True, xforms=True, indiv=True):
 
         @pydra.mark.task
         def export_sub(obj, subject_id, viz, afqbrowser, xforms, indiv):
+            # Initialize new instance
+            # It seems this must be done inside the pydra task function
+            # or pydra starts breaking
            afq = GroupAFQ(*obj.args, **obj.kwargs)
 
+            # Trim valid_sub_list and valid_ses_list to just this subject
             sub_arr = np.array(afq.valid_sub_list)
             ses_arr = np.array(afq.valid_ses_list)
 
             afq.valid_sub_list = list(sub_arr[sub_arr == subject_id])
             afq.valid_ses_list = list(ses_arr[sub_arr == subject_id])
 
+            # Pass through augmented parallel_params
+            # This flags that this is a parallelized run in combine_profiles()
             afq.parallel_params = obj.parallel_params
 
+            # Export trimmed instance
             afq.export_all(viz, afqbrowser, xforms, indiv)
 
             return True
 
+        # Submit to pydra
         export_sub_task = export_sub(
             obj=self,
             subject_id=self.subjects,

From d24edd6995071afb755daff9204edef5f95a3001 Mon Sep 17 00:00:00 2001
From: Teresa Gomez <46339554+teresamg@users.noreply.github.com>
Date: Mon, 15 Apr 2024 16:59:28 -0700
Subject: [PATCH 05/17] ParticipantAFQ method

---
 AFQ/api/group.py | 67 ++++++++++++++++++++----------------------------
 1 file changed, 28 insertions(+), 39 deletions(-)

diff --git a/AFQ/api/group.py b/AFQ/api/group.py
index d85923357..2edec54e7 100644
--- a/AFQ/api/group.py
+++ b/AFQ/api/group.py
@@ -324,13 +324,6 @@ def combine_profiles(self):
             self.afq_path, "tract_profiles.csv"))
         os.makedirs(op.dirname(out_file), exist_ok=True)
         _df = clean_pandas_df(_df)
-
-        # Append to existing csv if this is a parallelized run
-        if "submitter_params" in self.parallel_params:
-            if op.exists(out_file):
-                df_prev = pd.read_csv(out_file)
-                _df = _df.append(df_prev)
-
         _df.to_csv(out_file, index=False)
         return _df
 
@@ -925,23 +918,39 @@ def assemble_AFQ_browser(self, output_path=None, metadata=None,
 class ParallelGroupAFQ(GroupAFQ):
 
     def __init__(self, *args, **kwargs):
-        # Initialize throwaway to populate subjects and parallel_params
         orig = GroupAFQ(*args, **kwargs)
 
-        # Implement defaults
         if "submitter_params" not in orig.parallel_params:
             orig.parallel_params["submitter_params"] = {"plugin": "cf"}
 
         if "cache_dir" not in orig.parallel_params:
             orig.parallel_params["cache_dir"] = None
 
-        # Pass through for convenience
         self.parallel_params = orig.parallel_params
-        self.subjects = orig.subjects
+        self.pAFQ_kwargs = [pAFQ.kwargs for pAFQ in orig.pAFQ_list]
 
-        # Pass through args to initialize new GroupAFQ instances later
-        self.args = args
-        self.kwargs = kwargs
+        # Rename kwargs and clear "bids_info" and "base_fname"
+        # ParticipantAFQ takes in these parameters
under one name but stores + # and uses them under another + for ii in range(len(self.pAFQ_kwargs)): + self.pAFQ_kwargs[ii]["dwi_data_file"] = \ + self.pAFQ_kwargs[ii]["dwi_path"] + + self.pAFQ_kwargs[ii]["bval_file"] = \ + self.pAFQ_kwargs[ii]["bval"] + + self.pAFQ_kwargs[ii]["bvec_file"] = \ + self.pAFQ_kwargs[ii]["bvec"] + + self.pAFQ_kwargs[ii]["output_dir"] = \ + self.pAFQ_kwargs[ii]["results_dir"] - # Pass through args to initialize new GroupAFQ instances later - self.args = args - self.kwargs = kwargs + del self.pAFQ_kwargs[ii]["dwi_path"] + del self.pAFQ_kwargs[ii]["bval"] + del self.pAFQ_kwargs[ii]["bvec"] + del self.pAFQ_kwargs[ii]["results_dir"] + del self.pAFQ_kwargs[ii]["bids_info"] + del self.pAFQ_kwargs[ii]["base_fname"] def export_all(self, viz=True, afqbrowser=True, xforms=True, indiv=True): """ Exports all the possible outputs @@ -971,38 +980,18 @@ def export_all(self, viz=True, afqbrowser=True, xforms=True, indiv=True): import pydra @pydra.mark.task - def export_sub(obj, subject_id, viz, afqbrowser, xforms, indiv): - # Initialize new instance - # It seems this must be done inside the pydra task function - # or pydra starts breaking - afq = GroupAFQ(*obj.args, **obj.kwargs) - - # Trim valid_sub_list and valid_ses_list to just this subject - sub_arr = np.array(afq.valid_sub_list) - ses_arr = np.array(afq.valid_ses_list) - - afq.valid_sub_list = list(sub_arr[sub_arr == subject_id]) - afq.valid_ses_list = list(ses_arr[sub_arr == subject_id]) - - # Pass through augmented parallel_params - # This flags that this is a parallelized run in combine_profiles() - afq.parallel_params = obj.parallel_params - - # Export trimmed instance - afq.export_all(viz, afqbrowser, xforms, indiv) - - return True + def export_sub(pAFQ_kwargs, viz, xforms, indiv): + pAFQ = ParticipantAFQ(**pAFQ_kwargs) + pAFQ.export_all(viz, xforms, indiv) # Submit to pydra export_sub_task = export_sub( - obj=self, - subject_id=self.subjects, + pAFQ_kwargs=self.pAFQ_kwargs, viz=viz, - afqbrowser=afqbrowser, xforms=xforms, indiv=indiv, cache_dir=self.parallel_params["cache_dir"] - ).split("subject_id") + ).split("pAFQ_kwargs") with pydra.Submitter( **self.parallel_params["submitter_params"], From 5fc2a13028025bf4341e16ded777c61a5d6e8489 Mon Sep 17 00:00:00 2001 From: Teresa Gomez <46339554+teresamg@users.noreply.github.com> Date: Thu, 18 Apr 2024 11:28:38 -0700 Subject: [PATCH 06/17] Updates ParticipantAFQ kwargs --- AFQ/api/group.py | 25 +------------------------ AFQ/api/participant.py | 38 ++++++++++++++++++++++++-------------- AFQ/tasks/data.py | 31 ++++++++++++++++--------------- AFQ/tasks/decorators.py | 8 ++++---- AFQ/tasks/mapping.py | 21 ++++++++++----------- AFQ/tasks/segmentation.py | 4 ++-- AFQ/tasks/tractography.py | 7 +++---- AFQ/tasks/viz.py | 8 ++++---- 8 files changed, 64 insertions(+), 78 deletions(-) diff --git a/AFQ/api/group.py b/AFQ/api/group.py index 2edec54e7..282bc04d4 100644 --- a/AFQ/api/group.py +++ b/AFQ/api/group.py @@ -916,7 +916,7 @@ def assemble_AFQ_browser(self, output_path=None, metadata=None, sublink=page_subtitle_link) -class ParallelGroupAFQ(GroupAFQ): +class ParallelGroupAFQ(): def __init__(self, *args, **kwargs): orig = GroupAFQ(*args, **kwargs) @@ -929,29 +929,6 @@ def __init__(self, *args, **kwargs): self.parallel_params = orig.parallel_params self.pAFQ_kwargs = [pAFQ.kwargs for pAFQ in orig.pAFQ_list] - # Rename kwargs and clear "bids_info" and "base_fname" - # ParticipantAFQ takes in these parameters under one name but stores - # and uses them under another - for ii in 
range(len(self.pAFQ_kwargs)): - self.pAFQ_kwargs[ii]["dwi_data_file"] = \ - self.pAFQ_kwargs[ii]["dwi_path"] - - self.pAFQ_kwargs[ii]["bval_file"] = \ - self.pAFQ_kwargs[ii]["bval"] - - self.pAFQ_kwargs[ii]["bvec_file"] = \ - self.pAFQ_kwargs[ii]["bvec"] - - self.pAFQ_kwargs[ii]["output_dir"] = \ - self.pAFQ_kwargs[ii]["results_dir"] - - del self.pAFQ_kwargs[ii]["dwi_path"] - del self.pAFQ_kwargs[ii]["bval"] - del self.pAFQ_kwargs[ii]["bvec"] - del self.pAFQ_kwargs[ii]["results_dir"] - del self.pAFQ_kwargs[ii]["bids_info"] - del self.pAFQ_kwargs[ii]["base_fname"] - def export_all(self, viz=True, afqbrowser=True, xforms=True, indiv=True): """ Exports all the possible outputs diff --git a/AFQ/api/participant.py b/AFQ/api/participant.py index 4c9fc78d1..9ab2df832 100644 --- a/AFQ/api/participant.py +++ b/AFQ/api/participant.py @@ -96,14 +96,13 @@ def __init__(self, "did you mean tracking_params ?")) self.logger = logging.getLogger('AFQ') - self.output_dir = output_dir + self.bids_info = _bids_info self.kwargs = dict( - dwi_path=dwi_data_file, - bval=bval_file, - bvec=bvec_file, - results_dir=output_dir, - bids_info=_bids_info, + dwi_data_file=dwi_data_file, + bval_file=bval_file, + bvec_file=bvec_file, + output_dir=output_dir, base_fname=get_base_fname(output_dir, dwi_data_file), **kwargs) self.make_workflow() @@ -113,16 +112,26 @@ def make_workflow(self): if "mapping_definition" in self.kwargs and isinstance( self.kwargs["mapping_definition"], SlrMap): plans = { # if using SLR map, do tractography first - "data": get_data_plan(self.kwargs), - "tractography": get_tractography_plan(self.kwargs), - "mapping": get_mapping_plan(self.kwargs, use_sls=True), + "data": get_data_plan(self.kwargs, self.bids_info), + "tractography": get_tractography_plan( + self.kwargs, + self.bids_info + ), + "mapping": get_mapping_plan( + self.kwargs, + self.bids_info, + use_sls=True + ), "segmentation": get_segmentation_plan(self.kwargs), "viz": get_viz_plan(self.kwargs)} else: plans = { # Otherwise, do mapping first - "data": get_data_plan(self.kwargs), - "mapping": get_mapping_plan(self.kwargs), - "tractography": get_tractography_plan(self.kwargs), + "data": get_data_plan(self.kwargs, self.bids_info), + "mapping": get_mapping_plan(self.kwargs, self.bids_info), + "tractography": get_tractography_plan( + self.kwargs, + self.bids_info + ), "segmentation": get_segmentation_plan(self.kwargs), "viz": get_viz_plan(self.kwargs)} @@ -130,6 +139,7 @@ def make_workflow(self): previous_data = {} for name, plan in plans.items(): previous_data[f"{name}_imap"] = plan( + bids_info=self.bids_info, **self.kwargs, **previous_data) @@ -280,7 +290,7 @@ def participant_montage(self, images_per_row=2): def _save_file(curr_img): save_path = op.abspath(op.join( - self.output_dir, + self.kwargs["output_dir"], "bundle_montage.png")) curr_img.save(save_path) all_fnames.append(save_path) @@ -374,7 +384,7 @@ def cmd_outputs(self, cmd="rm", dependent_on=None, exceptions=[], " to a filename and will be ignored.")) apply_cmd_to_afq_derivs( - self.output_dir, + self.kwargs["output_dir"], self.export("base_fname"), cmd=cmd, exception_file_names=exception_file_names, diff --git a/AFQ/tasks/data.py b/AFQ/tasks/data.py index 3afccb973..e3a3a0d3c 100644 --- a/AFQ/tasks/data.py +++ b/AFQ/tasks/data.py @@ -42,7 +42,7 @@ @pimms.calc("data", "gtab", "dwi", "dwi_affine") -def get_data_gtab(dwi_path, bval, bvec, min_bval=None, +def get_data_gtab(dwi_data_file, bval_file, bvec_file, min_bval=None, max_bval=None, filter_b=True, b0_threshold=50): """ DWI data as 
an ndarray for selected b values, @@ -67,8 +67,8 @@ def get_data_gtab(dwi_path, bval, bvec, min_bval=None, The value of b under which it is considered to be b0. Default: 50. """ - img = nib.load(dwi_path) - bvals, bvecs = read_bvals_bvecs(bval, bvec) + img = nib.load(dwi_data_file) + bvals, bvecs = read_bvals_bvecs(bval_file, bvec_file) data = img.get_fdata() if filter_b and (min_bval is not None): @@ -92,15 +92,15 @@ def get_data_gtab(dwi_path, bval, bvec, min_bval=None, @pimms.calc("b0") @as_file('_desc-b0_dwi.nii.gz') -def b0(dwi_path, gtab): +def b0(dwi_data_file, gtab): """ full path to a nifti file containing the mean b0 """ - data = nib.load(dwi_path) + data = nib.load(dwi_data_file) mean_b0 = np.mean(data.get_fdata()[..., gtab.b0s_mask], -1) mean_b0_img = nib.Nifti1Image(mean_b0, data.affine) meta = dict(b0_threshold=gtab.b0_threshold, - source=dwi_path) + source=dwi_data_file) return mean_b0_img, meta @@ -136,7 +136,8 @@ def dti_fit(dti_params, gtab): @as_file(suffix='_odfmodel-DTI_desc-diffmodel_dwi.nii.gz') @as_img def dti_params(brain_mask, data, gtab, - bval, bvec, b0_threshold=50, robust_tensor_fitting=False): + bval_file, bvec_file, b0_threshold=50, + robust_tensor_fitting=False): """ full path to a nifti file containing parameters for the DTI fit @@ -155,7 +156,7 @@ def dti_params(brain_mask, data, gtab, nib.load(brain_mask).get_fdata() if robust_tensor_fitting: bvals, _ = read_bvals_bvecs( - bval, bvec) + bval_file, bvec_file) sigma = noise_from_b0( data, gtab, bvals, mask=mask, b0_threshold=b0_threshold) @@ -1004,7 +1005,7 @@ def get_bundle_dict(segmentation_params, return bundle_dict, reg_template -def get_data_plan(kwargs): +def get_data_plan(kwargs, bids_info): if "scalars" in kwargs and not ( isinstance(kwargs["scalars"], list) and isinstance( kwargs["scalars"][0], (str, Definition))): @@ -1027,7 +1028,7 @@ def get_data_plan(kwargs): csd_params, get_bundle_dict]) if "scalars" not in kwargs: - bvals, _ = read_bvals_bvecs(kwargs["bval"], kwargs["bvec"]) + bvals, _ = read_bvals_bvecs(kwargs["bval_file"], kwargs["bvec_file"]) if len(dpg.unique_bvals_magnitude(bvals)) > 2: kwargs["scalars"] = [ "dki_fa", "dki_md", @@ -1050,12 +1051,12 @@ def get_data_plan(kwargs): if not isinstance(bm_def, Definition): raise TypeError( "brain_mask_definition must be a Definition") - if kwargs["bids_info"] is not None: + if bids_info is not None: bm_def.find_path( - kwargs["bids_info"]["bids_layout"], - kwargs["dwi_path"], - kwargs["bids_info"]["subject"], - kwargs["bids_info"]["session"]) + bids_info["bids_layout"], + kwargs["dwi_data_file"], + bids_info["subject"], + bids_info["session"]) data_tasks["brain_mask_res"] = pimms.calc("brain_mask")( as_file(( f'_desc-{str_to_desc(bm_def.get_name())}' diff --git a/AFQ/tasks/decorators.py b/AFQ/tasks/decorators.py index c1bb6a6f6..528c3b805 100644 --- a/AFQ/tasks/decorators.py +++ b/AFQ/tasks/decorators.py @@ -116,7 +116,7 @@ def as_file(suffix, include_track=False, include_seg=False): and only run if not already found """ def _as_file(func): - needed_args = ["base_fname", "results_dir"] + needed_args = ["base_fname", "output_dir"] if include_track: needed_args.append("tracking_params") if include_seg: @@ -125,11 +125,11 @@ def _as_file(func): @functools.wraps(func) @has_args(func, needed_args) def wrapper_as_file(*args, **kwargs): - og_arg_count, base_fname, results_dir, \ + og_arg_count, base_fname, output_dir, \ tracking_params, segmentation_params =\ extract_added_args( func, - ["base_fname", "results_dir", + ["base_fname", "output_dir", 
"tracking_params", "segmentation_params"], args, includes=[True, True, include_track, include_seg]) @@ -174,7 +174,7 @@ def wrapper_as_file(*args, **kwargs): # modify meta source to be relative if "source" in meta: - meta["source"] = op.relpath(meta["source"], results_dir) + meta["source"] = op.relpath(meta["source"], output_dir) meta_fname = get_fname( base_fname, f"{drop_extension(suffix)}.json", diff --git a/AFQ/tasks/mapping.py b/AFQ/tasks/mapping.py index c8807d8cd..cc0753a52 100644 --- a/AFQ/tasks/mapping.py +++ b/AFQ/tasks/mapping.py @@ -47,13 +47,13 @@ def template_xform(mapping, data_imap): @pimms.calc("rois") -def export_rois(base_fname, results_dir, data_imap, mapping): +def export_rois(base_fname, output_dir, data_imap, mapping): """ dictionary of full paths to Nifti1Image files of ROIs transformed to subject space """ bundle_dict = data_imap["bundle_dict"] - rois_dir = op.join(results_dir, 'ROIs') + rois_dir = op.join(output_dir, 'ROIs') os.makedirs(rois_dir, exist_ok=True) roi_files = {} base_roi_fname = op.join(rois_dir, op.split(base_fname)[1]) @@ -71,7 +71,7 @@ def export_rois(base_fname, results_dir, data_imap, mapping): @pimms.calc("mapping") -def mapping(base_fname, dwi_path, reg_subject, data_imap, bids_info, +def mapping(base_fname, dwi_data_file, reg_subject, data_imap, bids_info, mapping_definition=None): """ mapping from subject to template space. @@ -96,7 +96,7 @@ def mapping(base_fname, dwi_path, reg_subject, data_imap, bids_info, if bids_info is not None: mapping_definition.find_path( bids_info["bids_layout"], - dwi_path, + dwi_data_file, bids_info["subject"], bids_info["session"]) return mapping_definition.get_for_subses( @@ -105,7 +105,7 @@ def mapping(base_fname, dwi_path, reg_subject, data_imap, bids_info, @pimms.calc("mapping") -def sls_mapping(base_fname, dwi_path, reg_subject, data_imap, bids_info, +def sls_mapping(base_fname, dwi_data_file, reg_subject, data_imap, bids_info, tractography_imap, mapping_definition=None): """ mapping from subject to template space. 
@@ -130,7 +130,7 @@ def sls_mapping(base_fname, dwi_path, reg_subject, data_imap, bids_info, if bids_info is not None: mapping_definition.find_path( bids_info["bids_layout"], - dwi_path, + dwi_data_file, bids_info["subject"], bids_info["session"]) streamlines_file = tractography_imap["streamlines"] @@ -200,26 +200,25 @@ def get_reg_subject(data_imap, return img -def get_mapping_plan(kwargs, use_sls=False): +def get_mapping_plan(kwargs, bids_info, use_sls=False): mapping_tasks = with_name([ export_registered_b0, template_xform, export_rois, mapping, get_reg_subject]) - bids_info = kwargs.get("bids_info", None) # add custom scalars for scalar in kwargs["scalars"]: if isinstance(scalar, Definition): if bids_info is None: scalar.find_path( None, - kwargs["dwi_path"], + kwargs["dwi_data_file"], None, None) scalar_found = True else: scalar_found = scalar.find_path( bids_info["bids_layout"], - kwargs["dwi_path"], + kwargs["dwi_data_file"], bids_info["subject"], bids_info["session"], required=False) @@ -238,7 +237,7 @@ def get_mapping_plan(kwargs, use_sls=False): if isinstance(reg_ss, ImageDefinition): reg_ss.find_path( bids_info["bids_layout"], - kwargs["dwi_path"], + kwargs["dwi_data_file"], bids_info["subject"], bids_info["session"]) del kwargs["reg_subject_spec"] diff --git a/AFQ/tasks/segmentation.py b/AFQ/tasks/segmentation.py index 22122bdee..d20050f1e 100644 --- a/AFQ/tasks/segmentation.py +++ b/AFQ/tasks/segmentation.py @@ -109,7 +109,7 @@ def segment(data_imap, mapping_imap, @pimms.calc("indiv_bundles") -def export_bundles(base_fname, results_dir, +def export_bundles(base_fname, output_dir, bundles, tracking_params, segmentation_params): @@ -123,7 +123,7 @@ def export_bundles(base_fname, results_dir, else: extension = ".trk" - bundles_dir = op.join(results_dir, "bundles") + bundles_dir = op.join(output_dir, "bundles") os.makedirs(bundles_dir, exist_ok=True) seg_sft = aus.SegmentedSFT.fromfile(bundles) for bundle in seg_sft.bundle_names: diff --git a/AFQ/tasks/tractography.py b/AFQ/tasks/tractography.py index 2c7a04d68..a06868a32 100644 --- a/AFQ/tasks/tractography.py +++ b/AFQ/tasks/tractography.py @@ -243,7 +243,7 @@ def gpu_tractography(data_imap, tracking_params, seed, stop, sft, seed, stop) -def get_tractography_plan(kwargs): +def get_tractography_plan(kwargs, bids_info): if "tracking_params" in kwargs\ and not isinstance(kwargs["tracking_params"], dict): raise TypeError( @@ -312,19 +312,18 @@ def get_tractography_plan(kwargs): stop_mask = kwargs["tracking_params"]['stop_mask'] seed_mask = kwargs["tracking_params"]['seed_mask'] - bids_info = kwargs["bids_info"] if bids_info is not None: if isinstance(stop_mask, Definition): stop_mask.find_path( bids_info["bids_layout"], - kwargs["dwi_path"], + kwargs["dwi_data_file"], bids_info["subject"], bids_info["session"]) if isinstance(seed_mask, Definition): seed_mask.find_path( bids_info["bids_layout"], - kwargs["dwi_path"], + kwargs["dwi_data_file"], bids_info["subject"], bids_info["session"]) diff --git a/AFQ/tasks/viz.py b/AFQ/tasks/viz.py index 0aa25ef29..a4ed491ac 100644 --- a/AFQ/tasks/viz.py +++ b/AFQ/tasks/viz.py @@ -144,7 +144,7 @@ def viz_bundles(base_fname, @pimms.calc("indiv_bundles_figures") def viz_indivBundle(base_fname, - results_dir, + output_dir, viz_backend, data_imap, mapping_imap, @@ -273,7 +273,7 @@ def viz_indivBundle(base_fname, interact=False, figure=figure) - roi_dir = op.join(results_dir, 'viz_bundles') + roi_dir = op.join(output_dir, 'viz_bundles') os.makedirs(roi_dir, exist_ok=True) figures[bundle_name] = figure 
if "no_gif" not in viz_backend.backend: @@ -288,7 +288,7 @@ def viz_indivBundle(base_fname, fname = op.join(roi_dir, fname[1]) viz_backend.create_gif(figure, fname) if "plotly" in viz_backend.backend: - roi_dir = op.join(results_dir, 'viz_bundles') + roi_dir = op.join(output_dir, 'viz_bundles') os.makedirs(roi_dir, exist_ok=True) fname = op.split( get_fname( @@ -302,7 +302,7 @@ def viz_indivBundle(base_fname, figure.write_html(fname) # also do the core visualizations when using the plotly backend - core_dir = op.join(results_dir, 'viz_core_bundles') + core_dir = op.join(output_dir, 'viz_core_bundles') os.makedirs(core_dir, exist_ok=True) indiv_profile = profiles[ profiles.tractID == bundle_name][best_scalar].to_numpy() From 1e67bd7ff66fac94528065b56dfa926cc3223bc9 Mon Sep 17 00:00:00 2001 From: Teresa Gomez <46339554+teresamg@users.noreply.github.com> Date: Thu, 18 Apr 2024 11:33:49 -0700 Subject: [PATCH 07/17] Updates pydra import --- AFQ/api/group.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/AFQ/api/group.py b/AFQ/api/group.py index 282bc04d4..9831b4c9b 100644 --- a/AFQ/api/group.py +++ b/AFQ/api/group.py @@ -40,6 +40,9 @@ except ImportError: using_afqb = False +from dipy.utils.optpkg import optional_package +pydra, has_pydra, _ = optional_package('pydra') + __all__ = ["GroupAFQ", "get_afq_bids_entities_fname"] @@ -954,8 +957,6 @@ def export_all(self, viz=True, afqbrowser=True, xforms=True, indiv=True): output. Default: True """ - import pydra - @pydra.mark.task def export_sub(pAFQ_kwargs, viz, xforms, indiv): pAFQ = ParticipantAFQ(**pAFQ_kwargs) From 4ba73074c9814ad7e6a5c41e1acb39c4f1133589 Mon Sep 17 00:00:00 2001 From: Teresa Gomez <46339554+teresamg@users.noreply.github.com> Date: Thu, 18 Apr 2024 14:08:20 -0700 Subject: [PATCH 08/17] Updates attribute name --- AFQ/tests/test_api.py | 24 ++++++++++++------------ 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/AFQ/tests/test_api.py b/AFQ/tests/test_api.py index ec230625c..7f5a05e96 100644 --- a/AFQ/tests/test_api.py +++ b/AFQ/tests/test_api.py @@ -420,7 +420,7 @@ def test_AFQ_anisotropic(): # check that the apm map was made myafq.export("mapping") assert op.exists(op.join( - myafq.export("results_dir")["01"], + myafq.export("output_dir")["01"], 'sub-01_ses-01_odfmodel-CSD_desc-APM_dwi.nii.gz')) @@ -838,18 +838,18 @@ def test_AFQ_data_waypoint(): np.linalg.inv(dwi_affine))) mapping_file = op.join( - myafq.export("results_dir"), + myafq.export("output_dir"), 'sub-01_ses-01_desc-mapping_from-DWI_to-MNI_xform.nii.gz') nib.save(mapping, mapping_file) reg_prealign_file = op.join( - myafq.export("results_dir"), + myafq.export("output_dir"), 'sub-01_ses-01_desc-prealign_from-DWI_to-MNI_xform.npy') np.save(reg_prealign_file, np.eye(4)) # Test ROI exporting: myafq.export("rois") assert op.exists(op.join( - myafq.export("results_dir"), + myafq.export("output_dir"), 'ROIs', 'sub-01_ses-01_space-subject_desc-RightCorticospinalinclude1_mask.json')) # noqa @@ -861,7 +861,7 @@ def test_AFQ_data_waypoint(): # Test bundles exporting: myafq.export("indiv_bundles") assert op.exists(op.join( - myafq.export("results_dir"), + myafq.export("output_dir"), 'bundles', 'sub-01_ses-01_coordsys-RASMM_trkmethod-probCSD_recogmethod-AFQ_desc-LeftCorticospinal_tractography.trk')) # noqa @@ -874,26 +874,26 @@ def test_AFQ_data_waypoint(): myafq.export("indiv_bundles_figures") assert op.exists(op.join( - myafq.export("results_dir"), + myafq.export("output_dir"), "viz_bundles", 
'sub-01_ses-01_coordsys-RASMM_trkmethod-probCSD_recogmethod-AFQ_desc-LeftSuperiorLongitudinalviz_dwi.html')) # noqa assert op.exists(op.join( - myafq.export("results_dir"), + myafq.export("output_dir"), "viz_bundles", 'sub-01_ses-01_coordsys-RASMM_trkmethod-probCSD_recogmethod-AFQ_desc-LeftSuperiorLongitudinalviz_dwi.html')) # noqa # Before we run the CLI, we'll remove the bundles and ROI folders, to see # that the CLI generates them - shutil.rmtree(op.join(myafq.export("results_dir"), + shutil.rmtree(op.join(myafq.export("output_dir"), 'bundles')) - shutil.rmtree(op.join(myafq.export("results_dir"), + shutil.rmtree(op.join(myafq.export("output_dir"), 'ROIs')) os.remove(tract_profile_fname) # save memory - results_dir = myafq.export("results_dir") + output_dir = myafq.export("output_dir") del myafq gc.collect() @@ -957,11 +957,11 @@ def test_AFQ_data_waypoint(): # Make sure the CLI did indeed generate these: assert op.exists(op.join( - results_dir, + output_dir, 'ROIs', 'sub-01_ses-01_space-subject_desc-RightSuperiorLongitudinalinclude1_mask.json')) # noqa assert op.exists(op.join( - results_dir, + output_dir, 'bundles', 'sub-01_ses-01_coordsys-RASMM_trkmethod-probCSD_recogmethod-AFQ_desc-RightSuperiorLongitudinal_tractography.trk')) # noqa From 1a23004e9c9b924725076f445dfaab6aed2b2d8c Mon Sep 17 00:00:00 2001 From: 36000 Date: Mon, 22 Apr 2024 13:46:22 -0700 Subject: [PATCH 09/17] add export function, make pydra required, catch error --- AFQ/api/group.py | 56 ++++++++++++++++++++++++++++++++++++++++++------ setup.cfg | 1 + 2 files changed, 51 insertions(+), 6 deletions(-) diff --git a/AFQ/api/group.py b/AFQ/api/group.py index 9831b4c9b..d9884bd92 100644 --- a/AFQ/api/group.py +++ b/AFQ/api/group.py @@ -22,6 +22,7 @@ from AFQ.version import version as pyafq_version from AFQ.viz.utils import trim import pandas as pd +import pydra import numpy as np import os import os.path as op @@ -41,7 +42,6 @@ using_afqb = False from dipy.utils.optpkg import optional_package -pydra, has_pydra, _ = optional_package('pydra') __all__ = ["GroupAFQ", "get_afq_bids_entities_fname"] @@ -932,6 +932,54 @@ def __init__(self, *args, **kwargs): self.parallel_params = orig.parallel_params self.pAFQ_kwargs = [pAFQ.kwargs for pAFQ in orig.pAFQ_list] + def _submit_pydra(self, runnable): + try: + with pydra.Submitter( + **self.parallel_params["submitter_params"], + ) as sub: + sub(runnable=runnable) + except AttributeError as e: + if "'NoneType' object has no attribute 'replace'" not in str(e): + raise + + def export(self, attr_name="help", collapse=True): + f""" + Export a specific output. To print a list of available outputs, + call export without arguments. + {valid_exports_string} + + Parameters + ---------- + attr_name : str + Name of the output to export. Default: "help" + collapse : bool + Whether to collapse session dimension if there is only 1 session. + Default: True + + Returns + ------- + output : dict + The specific output as a dictionary. Keys are subjects. + Values are dictionaries with keys of sessions + if multiple sessions are used. Otherwise, values are + the output. + None if called without arguments. 
+ """ + + @pydra.mark.task + def export_sub(pAFQ_kwargs, attr_name, collapse): + pAFQ = ParticipantAFQ(**pAFQ_kwargs) + pAFQ.export(attr_name, collapse) + + # Submit to pydra + export_sub_task = export_sub( + pAFQ_kwargs=self.pAFQ_kwargs, + attr_name=attr_name, + collapse=collapse, + cache_dir=self.parallel_params["cache_dir"] + ).split("pAFQ_kwargs") + self._submit_pydra(export_sub_task) + def export_all(self, viz=True, afqbrowser=True, xforms=True, indiv=True): """ Exports all the possible outputs @@ -970,11 +1018,7 @@ def export_sub(pAFQ_kwargs, viz, xforms, indiv): indiv=indiv, cache_dir=self.parallel_params["cache_dir"] ).split("pAFQ_kwargs") - - with pydra.Submitter( - **self.parallel_params["submitter_params"], - ) as sub: - sub(runnable=export_sub_task) + self._submit_pydra(export_sub_task) def download_and_combine_afq_profiles(bucket, diff --git a/setup.cfg b/setup.cfg index 94188e416..3aea1e1e4 100644 --- a/setup.cfg +++ b/setup.cfg @@ -33,6 +33,7 @@ install_requires = pybids>=0.16.2 templateflow>=0.8 pimms + pydra joblib>=1.3.2 dask>=1.1 s3bids>=0.1.7 From 16e169bb3925ca7efe94978bd37fcfbacba7b9db Mon Sep 17 00:00:00 2001 From: 36000 Date: Mon, 22 Apr 2024 15:10:18 -0700 Subject: [PATCH 10/17] putting together a pydra test --- AFQ/api/group.py | 35 +++++++++++++++++++++++++++-------- AFQ/tests/test_api.py | 9 ++++++++- 2 files changed, 35 insertions(+), 9 deletions(-) diff --git a/AFQ/api/group.py b/AFQ/api/group.py index d9884bd92..81970e8b3 100644 --- a/AFQ/api/group.py +++ b/AFQ/api/group.py @@ -63,6 +63,18 @@ def get_afq_bids_entities_fname(): aus.__file__))) + "/afq_bids_entities.json" +class _ParticipantAFQInputs: + def __init__( + self, dwi_data_file, bval_file, bvec_file, results_dir, + _bids_info, kwargs): + self.dwi_data_file = dwi_data_file + self.bval_file = bval_file + self.bvec_file = bvec_file + self.results_dir = results_dir + self._bids_info = _bids_info + self.kwargs = kwargs + + class GroupAFQ(object): f"""{AFQclass_doc}""" @@ -255,6 +267,7 @@ def __init__(self, self.valid_sub_list = [] self.valid_ses_list = [] self.pAFQ_list = [] + self.pAFQ_inputs_list = [] for subject in self.subjects: self.wf_dict[subject] = {} for session in self.sessions: @@ -302,17 +315,25 @@ def __init__(self, self.valid_sub_list.append(subject) self.valid_ses_list.append(str(session)) - this_pAFQ = ParticipantAFQ( + this_pAFQ_inputs = _ParticipantAFQInputs( dwi_data_file, bval_file, bvec_file, results_dir, - _bids_info={ + { "bids_layout": bids_layout, "subject": subject, "session": session}, - **this_kwargs) + this_kwargs) + this_pAFQ = ParticipantAFQ( + this_pAFQ_inputs.dwi_data_file, + this_pAFQ_inputs.bval_file, + this_pAFQ_inputs.bvec_file, + this_pAFQ_inputs.results_dir, + this_pAFQ_inputs._bids_info, + **this_pAFQ_inputs.kwargs) self.wf_dict[subject][str(session)] = this_pAFQ.wf_dict self.pAFQ_list.append(this_pAFQ) + self.pAFQ_inputs_list.append(this_pAFQ_inputs) def combine_profiles(self): tract_profiles_dict = self.export("profiles") @@ -930,7 +951,7 @@ def __init__(self, *args, **kwargs): orig.parallel_params["cache_dir"] = None self.parallel_params = orig.parallel_params - self.pAFQ_kwargs = [pAFQ.kwargs for pAFQ in orig.pAFQ_list] + self.pAFQ_kwargs = orig.pAFQ_inputs_list def _submit_pydra(self, runnable): try: @@ -973,11 +994,10 @@ def export_sub(pAFQ_kwargs, attr_name, collapse): # Submit to pydra export_sub_task = export_sub( - pAFQ_kwargs=self.pAFQ_kwargs, attr_name=attr_name, collapse=collapse, cache_dir=self.parallel_params["cache_dir"] - ).split("pAFQ_kwargs") + 
).split("pAFQ_kwargs", pAFQ_kwargs=self.pAFQ_kwargs) self._submit_pydra(export_sub_task) def export_all(self, viz=True, afqbrowser=True, xforms=True, indiv=True): @@ -1012,12 +1032,11 @@ def export_sub(pAFQ_kwargs, viz, xforms, indiv): # Submit to pydra export_sub_task = export_sub( - pAFQ_kwargs=self.pAFQ_kwargs, viz=viz, xforms=xforms, indiv=indiv, cache_dir=self.parallel_params["cache_dir"] - ).split("pAFQ_kwargs") + ).split("pAFQ_kwargs", pAFQ_kwargs=self.pAFQ_kwargs) self._submit_pydra(export_sub_task) diff --git a/AFQ/tests/test_api.py b/AFQ/tests/test_api.py index 7f5a05e96..0157b5da4 100644 --- a/AFQ/tests/test_api.py +++ b/AFQ/tests/test_api.py @@ -5,6 +5,7 @@ import subprocess import gc import random +import concurrent.futures import toml @@ -25,7 +26,7 @@ from dipy.io.streamline import load_tractogram import AFQ.api.bundle_dict as abd -from AFQ.api.group import GroupAFQ +from AFQ.api.group import GroupAFQ, ParallelGroupAFQ from AFQ.api.participant import ParticipantAFQ import AFQ.data.fetch as afd import AFQ.utils.streamlines as aus @@ -597,6 +598,12 @@ def test_AFQ_reco80(): npt.assert_(len(seg_sft.get_bundle('CCMid').streamlines) > 0) +def test_AFQ_pydra(): + _, bids_path = afd.fetch_hbn_preproc(["NDARAA948VFH", "NDARAV554TP2"]) + pga = ParallelGroupAFQ(bids_path, preproc_pipeline="qsiprep") + pga.export_all() + + @pytest.mark.nightly_pft def test_AFQ_pft(): """ From 42e4b49afa0a59ef2da63217bb881d99e8f7b9e0 Mon Sep 17 00:00:00 2001 From: Teresa Gomez <46339554+teresamg@users.noreply.github.com> Date: Thu, 2 May 2024 11:53:21 -0700 Subject: [PATCH 11/17] Adds final GroupAFQ step --- AFQ/api/group.py | 25 ++++++++++++++++++++++++- 1 file changed, 24 insertions(+), 1 deletion(-) diff --git a/AFQ/api/group.py b/AFQ/api/group.py index 9831b4c9b..f446e4deb 100644 --- a/AFQ/api/group.py +++ b/AFQ/api/group.py @@ -32,6 +32,7 @@ import nibabel as nib from PIL import Image from s3bids.utils import S3BIDSStudy +import glob from bids.layout import BIDSLayout, BIDSLayoutIndexer try: @@ -932,6 +933,12 @@ def __init__(self, *args, **kwargs): self.parallel_params = orig.parallel_params self.pAFQ_kwargs = [pAFQ.kwargs for pAFQ in orig.pAFQ_list] + self.finishing_params = dict() + self.finishing_params["args"] = args + self.finishing_params["kwargs"] = kwargs + self.finishing_params["output_dirs"] = \ + [pAFQ.kwargs["output_dir"] for pAFQ in orig.pAFQ_list] + def export_all(self, viz=True, afqbrowser=True, xforms=True, indiv=True): """ Exports all the possible outputs @@ -958,14 +965,30 @@ def export_all(self, viz=True, afqbrowser=True, xforms=True, indiv=True): Default: True """ @pydra.mark.task - def export_sub(pAFQ_kwargs, viz, xforms, indiv): + def export_sub( + pAFQ_kwargs, + finishing_params, + viz, afqbrowser, + xforms, + indiv + ): pAFQ = ParticipantAFQ(**pAFQ_kwargs) pAFQ.export_all(viz, xforms, indiv) + for dir in finishing_params["output_dirs"]: + if not glob.glob(op.join(dir, "*_desc-profiles_dwi.csv")): + return + + gAFQ = GroupAFQ(*finishing_params["args"], + **finishing_params["kwargs"]) + gAFQ.export_all(viz, afqbrowser, xforms, indiv) + # Submit to pydra export_sub_task = export_sub( pAFQ_kwargs=self.pAFQ_kwargs, + finishing_params=self.finishing_params, viz=viz, + afqbrowser=afqbrowser, xforms=xforms, indiv=indiv, cache_dir=self.parallel_params["cache_dir"] From a1ab9ec75ad49214b8f0d994b59d6604980bce35 Mon Sep 17 00:00:00 2001 From: Teresa Gomez <46339554+teresamg@users.noreply.github.com> Date: Thu, 2 May 2024 12:26:22 -0700 Subject: [PATCH 12/17] Corrects accidental 
overwrites

---
 AFQ/api/group.py | 26 +++++++++++++++++++++++++-
 1 file changed, 25 insertions(+), 1 deletion(-)

diff --git a/AFQ/api/group.py b/AFQ/api/group.py
index 81970e8b3..f8d3ecc22 100644
--- a/AFQ/api/group.py
+++ b/AFQ/api/group.py
@@ -33,6 +33,7 @@
 import nibabel as nib
 from PIL import Image
 from s3bids.utils import S3BIDSStudy
+import glob
 
 from bids.layout import BIDSLayout, BIDSLayoutIndexer
 
 try:
@@ -953,6 +954,12 @@ def __init__(self, *args, **kwargs):
         self.parallel_params = orig.parallel_params
         self.pAFQ_kwargs = orig.pAFQ_inputs_list
 
+        self.finishing_params = dict()
+        self.finishing_params["args"] = args
+        self.finishing_params["kwargs"] = kwargs
+        self.finishing_params["output_dirs"] = [pAFQ.kwargs["output_dir"]
+                                                for pAFQ in orig.pAFQ_list]
+
     def _submit_pydra(self, runnable):
         try:
             with pydra.Submitter(
@@ -1026,13 +1033,30 @@ def export_all(self, viz=True, afqbrowser=True, xforms=True, indiv=True):
             Default: True
         """
         @pydra.mark.task
-        def export_sub(pAFQ_kwargs, viz, xforms, indiv):
+        def export_sub(
+            pAFQ_kwargs,
+            finishing_params,
+            viz,
+            afqbrowser,
+            xforms,
+            indiv
+        ):
             pAFQ = ParticipantAFQ(**pAFQ_kwargs)
             pAFQ.export_all(viz, xforms, indiv)
 
+            for dir in finishing_params["output_dirs"]:
+                if not glob.glob(op.join(dir, "*_desc-profiles_dwi.csv")):
+                    return
+
+            gAFQ = GroupAFQ(*finishing_params["args"],
+                            **finishing_params["kwargs"])
+            gAFQ.export_all(viz, afqbrowser, xforms, indiv)
+
         # Submit to pydra
         export_sub_task = export_sub(
+            finishing_params=self.finishing_params,
             viz=viz,
+            afqbrowser=afqbrowser,
             xforms=xforms,
             indiv=indiv,
             cache_dir=self.parallel_params["cache_dir"]

From 0157e6a2c758074607e20085c7de052562129567 Mon Sep 17 00:00:00 2001
From: 36000
Date: Mon, 6 May 2024 12:08:29 -0700
Subject: [PATCH 13/17] factor out bids from tasks, have groupAFQ handle bidslayout upfront

---
 AFQ/api/bundle_dict.py     | 35 +++++---------
 AFQ/api/group.py           | 96 +++++++++++++++++++++++++++++++++-----
 AFQ/api/participant.py     | 25 +++-------
 AFQ/definitions/image.py   | 21 ++++-----
 AFQ/definitions/mapping.py | 11 ++---
 AFQ/tasks/data.py          | 16 +------
 AFQ/tasks/mapping.py       | 55 +++++-----------------
 AFQ/tasks/tractography.py  | 52 ++-------------------
 AFQ/tests/test_api.py      |  2 +-
 9 files changed, 134 insertions(+), 179 deletions(-)

diff --git a/AFQ/api/bundle_dict.py b/AFQ/api/bundle_dict.py
index dcd2185ca..3195c9b35 100644
--- a/AFQ/api/bundle_dict.py
+++ b/AFQ/api/bundle_dict.py
@@ -775,7 +775,6 @@ def __init__(self,
         self.resample_to = resample_to
         self.resample_subject_to = resample_subject_to
         self.keep_in_memory = keep_in_memory
-        self.has_bids_info = False
 
         self.max_includes = 3
         self._dict = {}
@@ -814,33 +813,23 @@ def update_max_includes(self, new_max):
         if new_max > self.max_includes:
             self.max_includes = new_max
 
-    def set_bids_info(self, bids_layout, bids_path, subject, session):
-        """
-        Provide the bids_layout, a nearest path,
-        and the subject and session information
-        to load ROIS from BIDS
-        """
-        self.has_bids_info = True
-        self._bids_info = bids_layout
-        self._bids_path = bids_path
-        self._subject = subject
-        self._session = session
-
-    def _cond_load(self, roi_or_sl, resample_to):
-        """
-        Load ROI or streamline if not already loaded
-        """
+    def _use_bids_info(self, roi_or_sl, bids_layout, bids_path,
+                       subject, session):
         if isinstance(roi_or_sl, dict):
-            if not self.has_bids_info:
-                raise ValueError((
-                    "Attempted to load an ROI using BIDS description without "
-                    "First providing BIDS information."))
             suffix = roi_or_sl.get("suffix", "dwi")
             roi_or_sl = find_file(
self._bids_info, self._bids_path, + bids_layout, bids_path, roi_or_sl, suffix, - self._session, self._subject) + session, subject) + return roi_or_sl + else: + return roi_or_sl + + def _cond_load(self, roi_or_sl, resample_to): + """ + Load ROI or streamline if not already loaded + """ if isinstance(roi_or_sl, str): if self.seg_algo == "afq": return afd.read_resample_roi( diff --git a/AFQ/api/group.py b/AFQ/api/group.py index d3322b317..621fc3774 100644 --- a/AFQ/api/group.py +++ b/AFQ/api/group.py @@ -3,6 +3,8 @@ import tempfile from AFQ.definitions.mapping import SynMap +from AFQ.definitions.utils import Definition +import AFQ.api.bundle_dict as abd warnings.simplefilter(action='ignore', category=FutureWarning) # noqa import logging @@ -65,12 +67,11 @@ def get_afq_bids_entities_fname(): class _ParticipantAFQInputs: def __init__( self, dwi_data_file, bval_file, bvec_file, results_dir, - _bids_info, kwargs): + kwargs): self.dwi_data_file = dwi_data_file self.bval_file = bval_file self.bvec_file = bvec_file self.results_dir = results_dir - self._bids_info = _bids_info self.kwargs = kwargs @@ -311,6 +312,73 @@ def __init__(self, if suffix is not None: bids_filters["suffix"] = suffix + # Call find path for all definitions + for key, value in this_kwargs.items(): + if key == "scalars": + for scalar in this_kwargs["scalars"]: + if isinstance(scalar, Definition): + scalar_found = scalar.find_path( + bids_layout, + dwi_data_file, + subject, + session, + required=False) + if scalar_found is False: + this_kwargs["scalars"].remove(scalar) + elif key == "import_tract": + if isinstance(this_kwargs["import_tract"], dict): + it_res = \ + bids_layout.get( + subject=subject, + session=session, + extension=[ + '.trk', + '.tck', + '.vtk', + '.fib', + '.dpy'], + return_type='filename', + **this_kwargs["import_tract"]) + if len(it_res) < 1: + raise ValueError(( + "No custom tractography found for" + f" subject {subject}" + " and session " + f"{session}.")) + elif len(it_res) > 1: + this_kwargs["import_tract"] = it_res[0] + logger.warning(( + f"Multiple viable custom tractographies found for" + f" subject " + f"{subject} and session " + f"{session}. 
Will use: {it_res[0]}")) + else: + this_kwargs["import_tract"] = it_res[0] + elif isinstance(value, dict): + for _, subvalue in value.items(): + if isinstance(subvalue, Definition): + subvalue.find_path( + bids_layout, + dwi_data_file, + subject, + session) + elif isinstance(value, Definition): + value.find_path( + bids_layout, + dwi_data_file, + subject, + session) + + # call find path for all ROIs + if "bundle_info" in this_kwargs and isinstance( + this_kwargs["bundle_info"], abd.BundleDict): + for b_name in this_kwargs["bundle_info"].bundle_names: + this_kwargs["bundle_info"].apply_to_rois( + b_name, + this_kwargs["bundle_info"]._use_bids_info, + bids_layout, bids_path, subject, session, + dry_run=False) + self.valid_sub_list.append(subject) self.valid_ses_list.append(str(session)) @@ -318,17 +386,12 @@ def __init__(self, dwi_data_file, bval_file, bvec_file, results_dir, - { - "bids_layout": bids_layout, - "subject": subject, - "session": session}, this_kwargs) this_pAFQ = ParticipantAFQ( this_pAFQ_inputs.dwi_data_file, this_pAFQ_inputs.bval_file, this_pAFQ_inputs.bvec_file, this_pAFQ_inputs.results_dir, - this_pAFQ_inputs._bids_info, **this_pAFQ_inputs.kwargs) self.wf_dict[subject][str(session)] = this_pAFQ.wf_dict self.pAFQ_list.append(this_pAFQ) @@ -993,14 +1056,18 @@ def export(self, attr_name="help", collapse=True): """ @pydra.mark.task - def export_sub(pAFQ_kwargs, attr_name, collapse): - pAFQ = ParticipantAFQ(**pAFQ_kwargs) - pAFQ.export(attr_name, collapse) + def export_sub(pAFQ_kwargs, attr_name): + pAFQ = ParticipantAFQ( + pAFQ_kwargs.dwi_data_file, + pAFQ_kwargs.bval_file, + pAFQ_kwargs.bvec_file, + pAFQ_kwargs.results_dir, + **pAFQ_kwargs.kwargs) + pAFQ.export(attr_name) # Submit to pydra export_sub_task = export_sub( attr_name=attr_name, - collapse=collapse, cache_dir=self.parallel_params["cache_dir"] ).split("pAFQ_kwargs", pAFQ_kwargs=self.pAFQ_kwargs) self._submit_pydra(export_sub_task) @@ -1039,7 +1106,12 @@ def export_sub( xforms, indiv ): - pAFQ = ParticipantAFQ(**pAFQ_kwargs) + pAFQ = ParticipantAFQ( + pAFQ_kwargs.dwi_data_file, + pAFQ_kwargs.bval_file, + pAFQ_kwargs.bvec_file, + pAFQ_kwargs.results_dir, + **pAFQ_kwargs.kwargs) pAFQ.export_all(viz, xforms, indiv) for dir in finishing_params["output_dirs"]: diff --git a/AFQ/api/participant.py b/AFQ/api/participant.py index 9ab2df832..da543a3a3 100644 --- a/AFQ/api/participant.py +++ b/AFQ/api/participant.py @@ -34,10 +34,9 @@ def __init__(self, dwi_data_file, bval_file, bvec_file, output_dir, - _bids_info=None, **kwargs): """ - Initialize a ParticipantAFQ object from a BIDS dataset. + Initialize a ParticipantAFQ object. Parameters ---------- @@ -49,10 +48,6 @@ def __init__(self, Path to bvec file. output_dir : str Path to output directory. - _bids_info : dict or None, optional - This should be left as None in most cases. It - is used by GroupAFQ to provide information about - the BIDS layout to each participant. kwargs : additional optional parameters You can set additional parameters for any step of the process. See :ref:`usage/kwargs` for more details. @@ -71,9 +66,6 @@ def __init__(self, In tracking_params, parameters with the suffix mask which are also an image from AFQ.definitions.image will be handled automatically by the api. - - It is recommended that you leave the bids_info parameter as None, - and instead pass in the paths to the files you want to use directly. 
""" if not isinstance(output_dir, str): raise TypeError( @@ -96,7 +88,6 @@ def __init__(self, "did you mean tracking_params ?")) self.logger = logging.getLogger('AFQ') - self.bids_info = _bids_info self.kwargs = dict( dwi_data_file=dwi_data_file, @@ -112,25 +103,22 @@ def make_workflow(self): if "mapping_definition" in self.kwargs and isinstance( self.kwargs["mapping_definition"], SlrMap): plans = { # if using SLR map, do tractography first - "data": get_data_plan(self.kwargs, self.bids_info), + "data": get_data_plan(self.kwargs), "tractography": get_tractography_plan( - self.kwargs, - self.bids_info + self.kwargs ), "mapping": get_mapping_plan( self.kwargs, - self.bids_info, use_sls=True ), "segmentation": get_segmentation_plan(self.kwargs), "viz": get_viz_plan(self.kwargs)} else: plans = { # Otherwise, do mapping first - "data": get_data_plan(self.kwargs, self.bids_info), - "mapping": get_mapping_plan(self.kwargs, self.bids_info), + "data": get_data_plan(self.kwargs), + "mapping": get_mapping_plan(self.kwargs), "tractography": get_tractography_plan( - self.kwargs, - self.bids_info + self.kwargs ), "segmentation": get_segmentation_plan(self.kwargs), "viz": get_viz_plan(self.kwargs)} @@ -139,7 +127,6 @@ def make_workflow(self): previous_data = {} for name, plan in plans.items(): previous_data[f"{name}_imap"] = plan( - bids_info=self.bids_info, **self.kwargs, **previous_data) diff --git a/AFQ/definitions/image.py b/AFQ/definitions/image.py index 9b5086244..c0181af4e 100644 --- a/AFQ/definitions/image.py +++ b/AFQ/definitions/image.py @@ -138,16 +138,13 @@ def find_path(self, bids_layout, from_path, if nearest_image is None: return False - if session not in self.fnames: - self.fnames[session] = {} - self.fnames[session][subject] = nearest_image + self.fnames[from_path] = nearest_image - def get_path_data_affine(self, bids_info): + def get_path_data_affine(self, dwi_path): if self._from_path: image_file = self.fname else: - image_file = self.fnames[ - bids_info['session']][bids_info['subject']] + image_file = self.fnames[dwi_path] image_img = nib.load(image_file) return image_file, image_img.get_fdata(), image_img.affine @@ -159,10 +156,10 @@ def get_name(self): return name_from_path(self.fname) if self._from_path else self.suffix def get_image_getter(self, task_name): - def _image_getter_helper(dwi, bids_info): + def _image_getter_helper(dwi, dwi_data_file): # Load data image_file, image_data_orig, image_affine = \ - self.get_path_data_affine(bids_info) + self.get_path_data_affine(dwi_data_file) # Apply any conditions on the data image_data, meta = self.apply_conditions( @@ -178,11 +175,11 @@ def _image_getter_helper(dwi, bids_info): image_data.astype(np.float32), dwi.affine), meta if task_name == "data": - def image_getter(dwi, bids_info): - return _image_getter_helper(dwi, bids_info) + def image_getter(dwi, dwi_data_file): + return _image_getter_helper(dwi, dwi_data_file) else: - def image_getter(data_imap, bids_info): - return _image_getter_helper(data_imap["dwi"], bids_info) + def image_getter(data_imap, dwi_data_file): + return _image_getter_helper(data_imap["dwi"], dwi_data_file) return image_getter diff --git a/AFQ/definitions/mapping.py b/AFQ/definitions/mapping.py index f889f2527..64061a99a 100644 --- a/AFQ/definitions/mapping.py +++ b/AFQ/definitions/mapping.py @@ -120,15 +120,14 @@ def find_path(self, bids_layout, from_path, bids_layout, from_path, self.space_filters, self.space_suffix, session, subject, required=required) - self.fnames[session][subject] = (nearest_warp, 
nearest_space) + self.fnames[from_path] = (nearest_warp, nearest_space) - def get_for_subses(self, base_fname, dwi, bids_info, reg_subject, + def get_for_subses(self, base_fname, dwi, dwi_data_file, reg_subject, reg_template): if self._from_path: nearest_warp, nearest_space = self.fnames else: - nearest_warp, nearest_space = self.fnames[ - bids_info['session']][bids_info['subject']] + nearest_warp, nearest_space = self.fnames[dwi_data_file] our_templ = reg_template subj = Image(dwi) @@ -182,7 +181,7 @@ class IdentityMap(Definition): def __init__(self): pass - def get_for_subses(self, base_fname, dwi, bids_info, reg_subject, + def get_for_subses(self, base_fname, dwi, dwi_data_file, reg_subject, reg_template): return ConformedAffineMapping( np.identity(4), @@ -231,7 +230,7 @@ def prealign(self, base_fname, reg_subject, reg_template, save=True): write_json(meta_fname, meta) return prealign_file if save else np.load(prealign_file) - def get_for_subses(self, base_fname, dwi, bids_info, reg_subject, + def get_for_subses(self, base_fname, dwi, dwi_data_file, reg_subject, reg_template, subject_sls=None, template_sls=None): mapping_file, meta_fname = self.get_fnames( self.extension, base_fname) diff --git a/AFQ/tasks/data.py b/AFQ/tasks/data.py index e3a3a0d3c..cb782124d 100644 --- a/AFQ/tasks/data.py +++ b/AFQ/tasks/data.py @@ -919,7 +919,7 @@ def brain_mask(b0, brain_mask_definition=None): @pimms.calc("bundle_dict", "reg_template") def get_bundle_dict(segmentation_params, - brain_mask, bids_info, b0, + brain_mask, b0, bundle_info=None, reg_template_spec="mni_T1"): """ Dictionary defining the different bundles to be segmented, @@ -996,16 +996,10 @@ def get_bundle_dict(segmentation_params, seg_algo=segmentation_params["seg_algo"], resample_to=reg_template) - if bids_info is not None: - bundle_dict.set_bids_info( - bids_info["bids_layout"], - b0, - bids_info["subject"], - bids_info["session"]) return bundle_dict, reg_template -def get_data_plan(kwargs, bids_info): +def get_data_plan(kwargs): if "scalars" in kwargs and not ( isinstance(kwargs["scalars"], list) and isinstance( kwargs["scalars"][0], (str, Definition))): @@ -1051,12 +1045,6 @@ def get_data_plan(kwargs, bids_info): if not isinstance(bm_def, Definition): raise TypeError( "brain_mask_definition must be a Definition") - if bids_info is not None: - bm_def.find_path( - bids_info["bids_layout"], - kwargs["dwi_data_file"], - bids_info["subject"], - bids_info["session"]) data_tasks["brain_mask_res"] = pimms.calc("brain_mask")( as_file(( f'_desc-{str_to_desc(bm_def.get_name())}' diff --git a/AFQ/tasks/mapping.py b/AFQ/tasks/mapping.py index cc0753a52..5971fa994 100644 --- a/AFQ/tasks/mapping.py +++ b/AFQ/tasks/mapping.py @@ -71,7 +71,7 @@ def export_rois(base_fname, output_dir, data_imap, mapping): @pimms.calc("mapping") -def mapping(base_fname, dwi_data_file, reg_subject, data_imap, bids_info, +def mapping(base_fname, dwi_data_file, reg_subject, data_imap, mapping_definition=None): """ mapping from subject to template space. 
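The pattern running through these hunks is worth spelling out: find_path results used to be cached in nested fnames[session][subject] dicts, so every downstream consumer had to carry a bids_info dict just to look them up again. Keying the cache on the path that find_path was called with lets get_for_subses and the image getters retrieve files using only dwi_data_file, which the pimms plans already pass around. Below is a minimal sketch of the new scheme, with a toy stand-in for a Definition (the class name, filters, and paths are illustrative, not pyAFQ API):

class ToyDefinition:
    """Illustrative stand-in for an AFQ.definitions Definition."""

    def __init__(self):
        self.fnames = {}

    def find_path(self, bids_layout, from_path, subject, session):
        # Before this change: self.fnames[session][subject] = nearest,
        # which required subject/session (i.e. bids_info) at lookup time.
        # Now the file the search started from is itself the cache key.
        nearest = f"/derivatives/seg/sub-{subject}_ses-{session}_seg.nii.gz"
        self.fnames[from_path] = nearest

    def get_for_subses(self, dwi_data_file):
        # Retrieval needs only the DWI path, no BIDS layout or entities.
        return self.fnames[dwi_data_file]


toy = ToyDefinition()
dwi = "/bids/sub-01/ses-01/dwi/sub-01_ses-01_dwi.nii.gz"
toy.find_path(None, dwi, "01", "01")
assert toy.get_for_subses(dwi) == \
    "/derivatives/seg/sub-01_ses-01_seg.nii.gz"

The trade-off is that find_path must now run once per DWI file up front, which is exactly what the new loop in GroupAFQ.__init__ does before the pAFQ objects are built.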
@@ -93,19 +93,13 @@ def mapping(base_fname, dwi_data_file, reg_subject, data_imap, bids_info, raise TypeError( "mapping must be a mapping defined" + " in `AFQ.definitions.mapping`") - if bids_info is not None: - mapping_definition.find_path( - bids_info["bids_layout"], - dwi_data_file, - bids_info["subject"], - bids_info["session"]) return mapping_definition.get_for_subses( - base_fname, data_imap["dwi"], bids_info, + base_fname, data_imap["dwi"], dwi_data_file, reg_subject, reg_template) @pimms.calc("mapping") -def sls_mapping(base_fname, dwi_data_file, reg_subject, data_imap, bids_info, +def sls_mapping(base_fname, dwi_data_file, reg_subject, data_imap, tractography_imap, mapping_definition=None): """ mapping from subject to template space. @@ -127,12 +121,6 @@ def sls_mapping(base_fname, dwi_data_file, reg_subject, data_imap, bids_info, raise TypeError( "mapping must be a mapping defined" + " in `AFQ.definitions.mapping`") - if bids_info is not None: - mapping_definition.find_path( - bids_info["bids_layout"], - dwi_data_file, - bids_info["subject"], - bids_info["session"]) streamlines_file = tractography_imap["streamlines"] tg = load_tractogram( streamlines_file, reg_subject, @@ -151,7 +139,8 @@ def sls_mapping(base_fname, dwi_data_file, reg_subject, data_imap, bids_info, atlas_fname, 'same', bbox_valid_check=False) return mapping_definition.get_for_subses( - base_fname, data_imap["dwi"], bids_info, + base_fname, data_imap["dwi"], + dwi_data_file, reg_subject, reg_template, subject_sls=tg.streamlines, template_sls=hcp_atlas.streamlines) @@ -200,7 +189,7 @@ def get_reg_subject(data_imap, return img -def get_mapping_plan(kwargs, bids_info, use_sls=False): +def get_mapping_plan(kwargs, use_sls=False): mapping_tasks = with_name([ export_registered_b0, template_xform, export_rois, mapping, get_reg_subject]) @@ -208,38 +197,18 @@ def get_mapping_plan(kwargs, bids_info, use_sls=False): # add custom scalars for scalar in kwargs["scalars"]: if isinstance(scalar, Definition): - if bids_info is None: - scalar.find_path( - None, - kwargs["dwi_data_file"], - None, - None) - scalar_found = True - else: - scalar_found = scalar.find_path( - bids_info["bids_layout"], - kwargs["dwi_data_file"], - bids_info["subject"], - bids_info["session"], - required=False) - if scalar_found != False: - mapping_tasks[f"{scalar.get_name()}_res"] =\ - pimms.calc(f"{scalar.get_name()}")( - as_file(( - f'_desc-{str_to_desc(scalar.get_name())}' - '_dwi.nii.gz'))( - scalar.get_image_getter("mapping"))) + mapping_tasks[f"{scalar.get_name()}_res"] =\ + pimms.calc(f"{scalar.get_name()}")( + as_file(( + f'_desc-{str_to_desc(scalar.get_name())}' + '_dwi.nii.gz'))( + scalar.get_image_getter("mapping"))) if use_sls: mapping_tasks["mapping_res"] = sls_mapping reg_ss = kwargs.get("reg_subject_spec", None) if isinstance(reg_ss, ImageDefinition): - reg_ss.find_path( - bids_info["bids_layout"], - kwargs["dwi_data_file"], - bids_info["subject"], - bids_info["session"]) del kwargs["reg_subject_spec"] mapping_tasks["reg_subject_spec_res"] = pimms.calc("reg_subject_spec")( as_file(( diff --git a/AFQ/tasks/tractography.py b/AFQ/tasks/tractography.py index a06868a32..248696db6 100644 --- a/AFQ/tasks/tractography.py +++ b/AFQ/tasks/tractography.py @@ -154,7 +154,7 @@ def streamlines(data_imap, seed, stop, @pimms.calc("streamlines") -def custom_tractography(bids_info, import_tract=None): +def custom_tractography(import_tract=None): """ full path to the complete, unsegmented tractography file @@ -166,42 +166,10 @@ def 
custom_tractography(bids_info, import_tract=None): to generate the tractography. Default: None """ - if not isinstance(import_tract, dict) and\ - not isinstance(import_tract, str): + if not isinstance(import_tract, str): raise TypeError( "import_tract must be" + " either a dict or a str") - if isinstance(import_tract, dict): - if bids_info is None: - raise ValueError(( - "bids_info must be provided if using" - " bids filters to find imported tracts")) - import_tract = \ - bids_info["bids_layout"].get( - subject=bids_info["subject"], - session=bids_info["session"], - extension=[ - '.trk', - '.tck', - '.vtk', - '.fib', - '.dpy'], - return_type='filename', - **import_tract) - if len(import_tract) < 1: - raise ValueError(( - "No custom tractography found for subject " - f"{bids_info['subject']} and session " - f"{bids_info['session']}.")) - elif len(import_tract) > 1: - import_tract = import_tract[0] - logger.warning(( - f"Multiple viable custom tractographies found for" - f" subject " - f"{bids_info['subject']} and session " - f"{bids_info['session']}. Will use: {import_tract}")) - else: - import_tract = import_tract[0] return import_tract @@ -243,7 +211,7 @@ def gpu_tractography(data_imap, tracking_params, seed, stop, sft, seed, stop) -def get_tractography_plan(kwargs, bids_info): +def get_tractography_plan(kwargs): if "tracking_params" in kwargs\ and not isinstance(kwargs["tracking_params"], dict): raise TypeError( @@ -313,20 +281,6 @@ def get_tractography_plan(kwargs, bids_info): stop_mask = kwargs["tracking_params"]['stop_mask'] seed_mask = kwargs["tracking_params"]['seed_mask'] - if bids_info is not None: - if isinstance(stop_mask, Definition): - stop_mask.find_path( - bids_info["bids_layout"], - kwargs["dwi_data_file"], - bids_info["subject"], - bids_info["session"]) - if isinstance(seed_mask, Definition): - seed_mask.find_path( - bids_info["bids_layout"], - kwargs["dwi_data_file"], - bids_info["subject"], - bids_info["session"]) - if kwargs["tracking_params"]["tracker"] == "pft": probseg_funcs = stop_mask.get_image_getter("tractography") tractography_tasks["wm_res"] = pimms.calc("pve_wm")(probseg_funcs[0]) diff --git a/AFQ/tests/test_api.py b/AFQ/tests/test_api.py index 0157b5da4..016ea48c9 100644 --- a/AFQ/tests/test_api.py +++ b/AFQ/tests/test_api.py @@ -601,7 +601,7 @@ def test_AFQ_reco80(): def test_AFQ_pydra(): _, bids_path = afd.fetch_hbn_preproc(["NDARAA948VFH", "NDARAV554TP2"]) pga = ParallelGroupAFQ(bids_path, preproc_pipeline="qsiprep") - pga.export_all() + pga.export("dti_fa") @pytest.mark.nightly_pft From f58bb9528758c907b20619ee427abafeca3b0212 Mon Sep 17 00:00:00 2001 From: 36000 Date: Mon, 6 May 2024 12:40:09 -0700 Subject: [PATCH 14/17] update this test --- AFQ/tests/test_definitions.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/AFQ/tests/test_definitions.py b/AFQ/tests/test_definitions.py index 7e70179f5..de1f774b1 100644 --- a/AFQ/tests/test_definitions.py +++ b/AFQ/tests/test_definitions.py @@ -62,7 +62,7 @@ def test_find_path(subject, session): image_file = ImageFile(suffix="seg", filters={'scope': 'synthetic'}) image_file.find_path(bids_layout, test_dwi_path, subject, session) - assert image_file.fnames[session][subject] == op.join( + assert image_file.fnames[test_dwi_path] == op.join( bids_dir, "derivatives", "dmriprep", "sub-" + subject, "ses-" + session, "anat", "seg.nii.gz" ) From e7a899a07a8644b138d90d7ad7dc6fd8f4431b1c Mon Sep 17 00:00:00 2001 From: Teresa Gomez <46339554+teresamg@users.noreply.github.com> Date: Mon, 13 May 2024 
16:25:39 -0700 Subject: [PATCH 15/17] Updates syntax for setting defaults --- AFQ/api/group.py | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/AFQ/api/group.py b/AFQ/api/group.py index 621fc3774..da8ea6ef3 100644 --- a/AFQ/api/group.py +++ b/AFQ/api/group.py @@ -161,6 +161,8 @@ def __init__(self, if not isinstance(bids_layout_kwargs, dict): raise TypeError("bids_layout_kwargs must be a dict") + parallel_params["engine"] = parallel_params.get("engine", "serial") + self.logger = logger self.parallel_params = parallel_params @@ -1006,11 +1008,11 @@ class ParallelGroupAFQ(): def __init__(self, *args, **kwargs): orig = GroupAFQ(*args, **kwargs) - if "submitter_params" not in orig.parallel_params: - orig.parallel_params["submitter_params"] = {"plugin": "cf"} - - if "cache_dir" not in orig.parallel_params: - orig.parallel_params["cache_dir"] = None + orig.parallel_params["submitter_params"] = \ + orig.parallel_params.get("submitter_params", {"plugin": "cf"}) + + orig.parallel_params["cache_dir"] = \ + orig.parallel_params.get("cache_dir", None) self.parallel_params = orig.parallel_params self.pAFQ_kwargs = orig.pAFQ_inputs_list From 5109ffce7bd89a53cb2f77bca7e94da3fef243e8 Mon Sep 17 00:00:00 2001 From: Teresa Gomez <46339554+teresamg@users.noreply.github.com> Date: Mon, 13 May 2024 16:37:13 -0700 Subject: [PATCH 16/17] Adds comment about AttributeError --- AFQ/api/group.py | 1 + 1 file changed, 1 insertion(+) diff --git a/AFQ/api/group.py b/AFQ/api/group.py index da8ea6ef3..f748bac8b 100644 --- a/AFQ/api/group.py +++ b/AFQ/api/group.py @@ -1029,6 +1029,7 @@ def _submit_pydra(self, runnable): **self.parallel_params["submitter_params"], ) as sub: sub(runnable=runnable) + # Addresses https://github.com/nipype/pydra/issues/630 except AttributeError as e: if "'NoneType' object has no attribute 'replace'" not in str(e): raise From 4a3465b07fa478c8ba9af6754a7a118576a9b484 Mon Sep 17 00:00:00 2001 From: Ariel Rokem Date: Wed, 15 May 2024 16:45:42 -0700 Subject: [PATCH 17/17] Update AFQ/api/group.py --- AFQ/api/group.py | 1 - 1 file changed, 1 deletion(-) diff --git a/AFQ/api/group.py b/AFQ/api/group.py index f748bac8b..892ddefc1 100644 --- a/AFQ/api/group.py +++ b/AFQ/api/group.py @@ -161,7 +161,6 @@ def __init__(self, if not isinstance(bids_layout_kwargs, dict): raise TypeError("bids_layout_kwargs must be a dict") - parallel_params["engine"] = parallel_params.get("engine", "serial") self.logger = logger
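With patches 15 through 17 applied, the optional parallelization settings are normalized in one place with dict.get, and the pydra teardown quirk is documented where it is handled. A self-contained sketch of the resulting submission pattern follows, assuming pydra is installed; export_one is a toy payload standing in for the export_sub task defined above, and the guarded AttributeError check mirrors _submit_pydra:

import pydra

parallel_params = {}  # as passed to GroupAFQ; empty means all defaults
parallel_params["submitter_params"] = \
    parallel_params.get("submitter_params", {"plugin": "cf"})
parallel_params["cache_dir"] = parallel_params.get("cache_dir", None)


@pydra.mark.task
def export_one(x):
    # Toy stand-in for the per-subject export work.
    return x * 2


task = export_one(
    cache_dir=parallel_params["cache_dir"]
).split("x", x=[1, 2, 3])

try:
    with pydra.Submitter(**parallel_params["submitter_params"]) as sub:
        sub(runnable=task)
except AttributeError as e:
    # Addresses https://github.com/nipype/pydra/issues/630: pydra can
    # raise a spurious AttributeError on teardown; re-raise anything else.
    if "'NoneType' object has no attribute 'replace'" not in str(e):
        raise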