import os
import time
import six
import re
import csv
import shutil
import pickle
import copy
import json
import pandas as pd
import pkg_resources as p
import networkx as nx
import logging as cb_logging
from time import strftime
import nipype
import nipype.pipeline.engine as pe
import nipype.interfaces.fsl as fsl
import nipype.interfaces.io as nio
import nipype.interfaces.utility as util
from nipype.interfaces.afni import preprocess
import nipype.interfaces.ants as ants
import nipype.interfaces.c3 as c3
from nipype.interfaces.utility import Merge
from nipype.pipeline.engine.utils import format_dot
from nipype import config
from nipype import logging
from indi_aws import aws_utils, fetch_creds
import CPAC
from CPAC.network_centrality.pipeline import (
create_network_centrality_workflow
)
from CPAC.anat_preproc.anat_preproc import (
create_anat_preproc
)
from CPAC.anat_preproc.lesion_preproc import create_lesion_preproc
from CPAC.func_preproc.func_ingress import (
connect_func_ingress
)
from CPAC.func_preproc.func_preproc import (
connect_func_init,
connect_func_preproc
)
from CPAC.distortion_correction.distortion_correction import (
connect_distortion_correction
)
from CPAC.seg_preproc.seg_preproc import (
connect_anat_segmentation,
create_seg_preproc_template_based
)
from CPAC.seg_preproc.utils import mask_erosion
from CPAC.image_utils import (
spatial_smooth_outputs,
z_score_standardize,
fisher_z_score_standardize,
calc_avg
)
from CPAC.registration import (
create_fsl_flirt_linear_reg,
create_fsl_fnirt_nonlinear_reg,
create_wf_calculate_ants_warp,
connect_func_to_anat_init_reg,
connect_func_to_anat_bbreg,
connect_func_to_template_reg,
output_func_to_standard
)
from CPAC.nuisance import create_regressor_workflow, \
create_nuisance_regression_workflow, \
filtering_bold_and_regressors, \
bandpass_voxels, \
NuisanceRegressor
from CPAC.aroma import create_aroma
from CPAC.median_angle import create_median_angle_correction
from CPAC.generate_motion_statistics import motion_power_statistics
from CPAC.scrubbing import create_scrubbing_preproc
from CPAC.timeseries import (
get_roi_timeseries,
get_voxel_timeseries,
get_vertices_timeseries,
get_spatial_map_timeseries
)
from CPAC.vmhc.vmhc import create_vmhc
from CPAC.reho.reho import create_reho
from CPAC.alff.alff import create_alff
from CPAC.sca.sca import create_sca, create_temporal_reg
from CPAC.connectome.pipeline import create_connectome
from CPAC.utils.datasource import (
create_anat_datasource,
create_roi_mask_dataflow,
create_spatial_map_dataflow,
create_check_for_s3_node,
resolve_resolution,
resample_func_roi
)
from CPAC.utils.trimmer import the_trimmer
from CPAC.utils import Configuration, Strategy, Outputs, find_files
from CPAC.utils.interfaces.function import Function
from CPAC.utils.interfaces.datasink import DataSink
from CPAC.qc.pipeline import create_qc_workflow
from CPAC.qc.utils import generate_qc_pages
from CPAC.utils.utils import (
extract_one_d,
get_tr,
extract_txt,
extract_output_mean,
create_output_mean_csv,
get_zscore,
get_fisher_zscore,
concat_list,
check_config_resources,
check_system_deps,
ordereddict_to_dict
)
from CPAC.utils.monitoring import log_nodes_initial, log_nodes_cb
logger = logging.getLogger('nipype.workflow')
# config.enable_debug_mode()
def run_workflow(sub_dict, c, run, pipeline_timing_info=None, p_name=None,
plugin='MultiProc', plugin_args=None, test_config=False):
'''
Function to prepare and, optionally, run the C-PAC workflow
Parameters
----------
sub_dict : dictionary
subject dictionary with anatomical and functional image paths
c : Configuration object
CPAC pipeline configuration dictionary object
run : boolean
flag to indicate whether to run the prepared workflow
pipeline_timing_info : list (optional); default=None
list of pipeline info for reporting timing information
p_name : string (optional); default=None
name of pipeline
plugin : string (optional); default='MultiProc'
nipype plugin to utilize when the workflow is run
plugin_args : dictionary (optional); default=None
plugin-specific arguments for the workflow plugin
test_config : boolean (optional); default=False
if True, build the workflow to test the pipeline configuration but do not run it
Returns
-------
workflow : nipype workflow
the prepared nipype workflow object containing the parameters
specified in the config
'''
# Assure that changes on config will not affect other parts
c = copy.copy(c)
subject_id = sub_dict['subject_id']
if sub_dict['unique_id']:
subject_id += "_" + sub_dict['unique_id']
log_dir = os.path.join(c.logDirectory, 'pipeline_%s' % c.pipelineName,
subject_id)
if not os.path.exists(log_dir):
os.makedirs(log_dir)
# TODO ASH Enforce c.run_logging to be boolean
# TODO ASH Schema validation
config.update_config({
'logging': {
'log_directory': log_dir,
'log_to_file': bool(getattr(c, 'run_logging', True))
}
})
config.enable_resource_monitor()
logging.update_logging(config)
# Start timing here
pipeline_start_time = time.time()
# at end of workflow, take timestamp again, take time elapsed and check
# tempfile add time to time data structure inside tempfile, and increment
# number of subjects
# Check pipeline config resources
sub_mem_gb, num_cores_per_sub, num_ants_cores = check_config_resources(c)
if not plugin:
plugin = 'MultiProc'
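# pass the per-participant memory and core limits computed above through to the Nipype execution plugin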
if plugin_args:
plugin_args['memory_gb'] = sub_mem_gb
plugin_args['n_procs'] = num_cores_per_sub
else:
plugin_args = {'memory_gb': sub_mem_gb, 'n_procs': num_cores_per_sub}
# perhaps in future allow user to set threads maximum
# this is for centrality mostly
# import mkl
numThreads = '1'
os.environ['OMP_NUM_THREADS'] = '1' # str(num_cores_per_sub)
os.environ['MKL_NUM_THREADS'] = '1' # str(num_cores_per_sub)
os.environ['ITK_GLOBAL_DEFAULT_NUMBER_OF_THREADS'] = str(num_ants_cores)
# TODO: TEMPORARY
# TODO: solve the UNet model hanging issue during MultiProc
if "unet" in c.skullstrip_option:
c.maxCoresPerParticipant = 1
logger.info("\n\n[!] LOCKING CPUs PER PARTICIPANT TO 1 FOR U-NET "
"MODEL.\n\nThis is a temporary measure due to a known "
"issue preventing Nipype's parallelization from running "
"U-Net properly.\n\n")
# calculate maximum potential use of cores according to current pipeline
# configuration
max_core_usage = int(c.maxCoresPerParticipant) * \
int(c.numParticipantsAtOnce)
ndmg_out = False
try:
if "ndmg" in c.output_tree:
ndmg_out = True
except:
pass
try:
creds_path = sub_dict['creds_path']
if creds_path and 'none' not in creds_path.lower():
if os.path.exists(creds_path):
input_creds_path = os.path.abspath(creds_path)
else:
err_msg = 'Credentials path: "%s" for subject "%s" was not ' \
'found. Check this path and try again.' % (
creds_path, subject_id)
raise Exception(err_msg)
else:
input_creds_path = None
except KeyError:
input_creds_path = None
# TODO enforce value with schema validation
try:
encrypt_data = bool(c.s3Encryption[0])
except:
encrypt_data = False
information = """
C-PAC version: {cpac_version}
Setting maximum number of cores per participant to {cores}
Setting number of participants at once to {participants}
Setting OMP_NUM_THREADS to {threads}
Setting MKL_NUM_THREADS to {threads}
Setting ANTS/ITK thread usage to {ants_threads}
Maximum potential number of cores that might be used during this run: {max_cores}
"""
execution_info = """
End of subject workflow {workflow}
CPAC run complete:
Pipeline configuration: {pipeline}
Subject workflow: {workflow}
Elapsed run time (minutes): {elapsed}
Timing information saved in {log_dir}/cpac_individual_timing_{pipeline}.csv
System time of start: {run_start}
System time of completion: {run_finish}
"""
logger.info(information.format(
cpac_version=CPAC.__version__,
cores=c.maxCoresPerParticipant,
participants=c.numParticipantsAtOnce,
threads=numThreads,
ants_threads=c.num_ants_threads,
max_cores=max_core_usage
))
subject_info = {}
subject_info['subject_id'] = subject_id
subject_info['start_time'] = pipeline_start_time
check_centrality_degree = 1 in c.runNetworkCentrality and \
(True in c.degWeightOptions or \
True in c.eigWeightOptions)
check_centrality_lfcd = 1 in c.runNetworkCentrality and \
True in c.lfcdWeightOptions
# Check system dependencies
check_system_deps(check_ants='ANTS' in c.regOption,
check_ica_aroma='1' in str(c.runICA[0]),
check_centrality_degree=check_centrality_degree,
check_centrality_lfcd=check_centrality_lfcd)
# absolute paths of the dirs
c.workingDirectory = os.path.abspath(c.workingDirectory)
if 's3://' not in c.outputDirectory:
c.outputDirectory = os.path.abspath(c.outputDirectory)
workflow, strat_list, pipeline_ids = build_workflow(
subject_id, sub_dict, c, p_name, num_ants_cores
)
forks = "\n\nStrategy forks:\n" + \
"\n".join(["- " + pipe for pipe in sorted(set(pipeline_ids))]) + \
"\n\n"
logger.info(forks)
if test_config:
logger.info('This has been a test of the pipeline configuration '
'file, the pipeline was built successfully, but was '
'not run')
else:
working_dir = os.path.join(c.workingDirectory, workflow.name)
#if c.write_debugging_outputs:
# with open(os.path.join(working_dir, 'resource_pool.pkl'), 'wb') as f:
# pickle.dump(strat_list, f)
if c.reGenerateOutputs is True:
erasable = list(find_files(working_dir, '*sink*')) + \
list(find_files(working_dir, '*link*')) + \
list(find_files(working_dir, '*log*'))
for f in erasable:
if os.path.isfile(f):
os.remove(f)
else:
shutil.rmtree(f)
if hasattr(c, 'trim') and c.trim:
logger.warn("""
Trimming is an experimental feature, and if used wrongly, it can lead to unreproducible results.
It is useful for performance optimization, but only if used correctly.
Please, make yourself aware of how it works and its assumptions:
- The pipeline configuration has not changed;
- The data configuration / BIDS directory has not changed;
- The files in the output directory have not changed;
- Your software versions have not changed;
- Your C-PAC version has not changed;
- You do not have access to the working directory.
""")
workflow, _ = the_trimmer(
workflow,
output_dir=c.outputDirectory,
s3_creds_path=input_creds_path,
)
pipeline_start_datetime = strftime("%Y-%m-%d %H:%M:%S")
try:
subject_info['resource_pool'] = []
for strat_no, strat in enumerate(strat_list):
strat_label = 'strat_%d' % strat_no
subject_info[strat_label] = strat.get_name()
subject_info['resource_pool'].append(strat.get_resource_pool())
subject_info['status'] = 'Running'
# Create callback logger
cb_log_filename = os.path.join(log_dir,
'callback.log')
try:
if not os.path.exists(os.path.dirname(cb_log_filename)):
os.makedirs(os.path.dirname(cb_log_filename))
except IOError:
pass
# Add handler to callback log file
cb_logger = cb_logging.getLogger('callback')
cb_logger.setLevel(cb_logging.DEBUG)
handler = cb_logging.FileHandler(cb_log_filename)
cb_logger.addHandler(handler)
# Log initial information from all the nodes
log_nodes_initial(workflow)
# Add status callback function that writes in callback log
if nipype.__version__ not in ('1.1.2',):
err_msg = "This version of Nipype may not be compatible with " \
"CPAC v%s, please install Nipype version 1.1.2\n" \
% (CPAC.__version__)
logger.error(err_msg)
else:
plugin_args['status_callback'] = log_nodes_cb
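# with only a single process allotted there is nothing to run in parallel, so use the serial 'Linear' plugin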
if plugin_args['n_procs'] == 1:
plugin = 'Linear'
try:
# Actually run the pipeline now, for the current subject
workflow.run(plugin=plugin, plugin_args=plugin_args)
except UnicodeDecodeError:
raise EnvironmentError(
"C-PAC migrated from Python 2 to Python 3 in v1.6.2 (see "
"release notes). Your working directory contains Python 2 "
"pickles, probably from an older version of C-PAC. If you "
"want to continue to use this working directory, run\n\n"
"docker run -i --rm --user $(id -u):$(id -g) "
"-v /path/to/working_dir:/working "
"fcpindi/c-pac:latest /bids_dir /outputs cli -- "
"utils repickle /working\n"
"\nor\n\n"
"singularity run "
"C-PAC_latest.sif /bids_dir /outputs cli -- "
"utils repickle /path/to/working_dir\n\n"
"before running C-PAC >=v1.6.2"
)
# PyPEER kick-off
if 1 in c.run_pypeer:
from CPAC.pypeer.peer import prep_for_pypeer
prep_for_pypeer(c.peer_eye_scan_names, c.peer_data_scan_names,
c.eye_mask_path, c.outputDirectory, subject_id,
pipeline_ids, c.peer_stimulus_path, c.peer_gsr,
c.peer_scrub, c.peer_scrub_thresh)
# Dump subject info pickle file to subject log dir
subject_info['status'] = 'Completed'
subject_info_file = os.path.join(
log_dir, 'subject_info_%s.pkl' % subject_id
)
with open(subject_info_file, 'wb') as info:
pickle.dump(list(subject_info), info)
# have this check in case the user runs cpac_runner from terminal and
# the timing parameter list is not supplied as usual by the GUI
if pipeline_timing_info is not None:
# pipeline_timing_info list:
# [0] - unique pipeline ID
# [1] - pipeline start time stamp (first click of 'run' from GUI)
# [2] - number of subjects in subject list
unique_pipeline_id = pipeline_timing_info[0]
pipeline_start_stamp = pipeline_timing_info[1]
num_subjects = pipeline_timing_info[2]
# elapsed time data list:
# [0] - elapsed time in minutes
elapsed_time_data = []
elapsed_time_data.append(
int(((time.time() - pipeline_start_time) / 60)))
# elapsedTimeBin list:
# [0] - cumulative elapsed time (minutes) across all subjects
# [1] - number of times the elapsed time has been appended
# (effectively a measure of how many subjects have run)
# TODO
# write more doc for all this
# warning in .csv that some runs may be partial
# code to delete .tmp file
timing_temp_file_path = os.path.join(c.logDirectory,
'%s_pipeline_timing.tmp' % unique_pipeline_id)
if not os.path.isfile(timing_temp_file_path):
elapsedTimeBin = []
elapsedTimeBin.append(0)
elapsedTimeBin.append(0)
with open(timing_temp_file_path, 'wb') as handle:
pickle.dump(elapsedTimeBin, handle)
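# read the shared timing bin, add this run's elapsed minutes, increment the completed-run count, and write it back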
with open(timing_temp_file_path, 'rb') as handle:
elapsedTimeBin = pickle.loads(handle.read())
elapsedTimeBin[0] = elapsedTimeBin[0] + elapsed_time_data[0]
elapsedTimeBin[1] = elapsedTimeBin[1] + 1
with open(timing_temp_file_path, 'wb') as handle:
pickle.dump(elapsedTimeBin, handle)
# this happens once the last subject has finished running!
if elapsedTimeBin[1] == num_subjects:
pipelineTimeDict = {}
pipelineTimeDict['Pipeline'] = c.pipelineName
pipelineTimeDict['Cores_Per_Subject'] = c.maxCoresPerParticipant
pipelineTimeDict['Simultaneous_Subjects'] = c.numParticipantsAtOnce
pipelineTimeDict['Number_of_Subjects'] = num_subjects
pipelineTimeDict['Start_Time'] = pipeline_start_stamp
pipelineTimeDict['End_Time'] = strftime("%Y-%m-%d_%H:%M:%S")
pipelineTimeDict['Elapsed_Time_(minutes)'] = elapsedTimeBin[0]
pipelineTimeDict['Status'] = 'Complete'
gpaTimeFields = [
'Pipeline', 'Cores_Per_Subject',
'Simultaneous_Subjects',
'Number_of_Subjects', 'Start_Time',
'End_Time', 'Elapsed_Time_(minutes)',
'Status'
]
timeHeader = dict(zip(gpaTimeFields, gpaTimeFields))
with open(os.path.join(
c.logDirectory,
'cpac_individual_timing_%s.csv' % c.pipelineName
), 'a') as timeCSV, open(os.path.join(
c.logDirectory,
'cpac_individual_timing_%s.csv' % c.pipelineName
), 'r') as readTimeCSV:
timeWriter = csv.DictWriter(timeCSV, fieldnames=gpaTimeFields)
timeReader = csv.DictReader(readTimeCSV)
headerExists = False
for line in timeReader:
if 'Start_Time' in line:
headerExists = True
if not headerExists:
timeWriter.writerow(timeHeader)
timeWriter.writerow(pipelineTimeDict)
# remove the temp timing file now that it is no longer needed
os.remove(timing_temp_file_path)
# Upload logs to s3 if s3_str in output directory
if c.outputDirectory.lower().startswith('s3://'):
try:
# Store logs in s3 output directory/logs/...
s3_log_dir = os.path.join(
c.outputDirectory,
'logs',
os.path.basename(log_dir)
)
bucket_name = c.outputDirectory.split('/')[2]
bucket = fetch_creds.return_bucket(creds_path, bucket_name)
# Collect local log files
local_log_files = []
for root, _, files in os.walk(log_dir):
local_log_files.extend([os.path.join(root, fil)
for fil in files])
# Form destination keys
s3_log_files = [loc.replace(log_dir, s3_log_dir)
for loc in local_log_files]
# Upload logs
aws_utils.s3_upload(bucket,
(local_log_files, s3_log_files),
encrypt=encrypt_data)
# Delete local log files
for log_f in local_log_files:
os.remove(log_f)
except Exception as exc:
err_msg = 'Unable to upload CPAC log files in: %s.\nError: %s'
logger.error(err_msg, log_dir, exc)
except Exception as e:
import traceback; traceback.print_exc()
execution_info = """
Error of subject workflow {workflow}
CPAC run error:
Pipeline configuration: {pipeline}
Subject workflow: {workflow}
Elapsed run time (minutes): {elapsed}
Timing information saved in {log_dir}/cpac_individual_timing_{pipeline}.csv
System time of start: {run_start}
"""
finally:
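# generate the QC HTML pages for each pipeline's output directory, even if the run above raised an error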
if 1 in c.generateQualityControlImages and not ndmg_out:
for pip_id in pipeline_ids:
pipeline_base = os.path.join(c.outputDirectory,
'pipeline_{0}'.format(pip_id))
sub_output_dir = os.path.join(pipeline_base, subject_id)
qc_dir = os.path.join(sub_output_dir, 'qc')
generate_qc_pages(qc_dir)
if workflow:
logger.info(execution_info.format(
workflow=workflow.name,
pipeline=c.pipelineName,
log_dir=c.logDirectory,
elapsed=(time.time() - pipeline_start_time) / 60,
run_start=pipeline_start_datetime,
run_finish=strftime("%Y-%m-%d %H:%M:%S")
))
# Remove working directory when done
if c.removeWorkingDir:
try:
if os.path.exists(working_dir):
logger.info("Removing working dir: %s", working_dir)
shutil.rmtree(working_dir)
except:
logger.warn('Could not remove working directory %s',
working_dir)
def build_workflow(subject_id, sub_dict, c, pipeline_name=None, num_ants_cores=1):
# TODO ASH temporary code, remove
# TODO ASH maybe scheme validation/normalization
already_skullstripped = c.already_skullstripped[0]
if already_skullstripped == 2:
already_skullstripped = 0
elif already_skullstripped == 3:
already_skullstripped = 1
if 'ANTS' in c.regOption:
# if someone doesn't have anatRegANTSinterpolation in their pipe config,
# it will default to LanczosWindowedSinc
if not hasattr(c, 'anatRegANTSinterpolation'):
setattr(c, 'anatRegANTSinterpolation', 'LanczosWindowedSinc')
if c.anatRegANTSinterpolation not in ['Linear', 'BSpline', 'LanczosWindowedSinc']:
err_msg = 'The selected ANTS interpolation method must be one of: "Linear", "BSpline", "LanczosWindowedSinc"'
raise Exception(err_msg)
# if someone doesn't have funcRegANTSinterpolation in their pipe config,
# it will default to LanczosWindowedSinc
if not hasattr(c, 'funcRegANTSinterpolation'):
setattr(c, 'funcRegANTSinterpolation', 'LanczosWindowedSinc')
if c.funcRegANTSinterpolation not in ['Linear', 'BSpline', 'LanczosWindowedSinc']:
err_msg = 'The selected ANTS interpolation method must be one of: "Linear", "BSpline", "LanczosWindowedSinc"'
raise Exception(err_msg)
if 'FSL' in c.regOption:
# if someone doesn't have anatRegFSLinterpolation in their pipe config,
# it will default to sinc
if not hasattr(c, 'anatRegFSLinterpolation'):
setattr(c, 'anatRegFSLinterpolation', 'sinc')
if c.anatRegFSLinterpolation not in ["trilinear", "sinc", "spline"]:
err_msg = 'The selected FSL interpolation method must be one of: "trilinear", "sinc", "spline"'
raise Exception(err_msg)
# Workflow setup
workflow_name = 'resting_preproc_' + str(subject_id)
workflow = pe.Workflow(name=workflow_name)
workflow.base_dir = c.workingDirectory
workflow.config['execution'] = {
'hash_method': 'timestamp',
'crashdump_dir': os.path.abspath(c.crashLogDirectory)
}
# Extract credentials path if it exists
try:
creds_path = sub_dict['creds_path']
if creds_path and 'none' not in creds_path.lower():
if os.path.exists(creds_path):
input_creds_path = os.path.abspath(creds_path)
else:
err_msg = 'Credentials path: "%s" for subject "%s" was not ' \
'found. Check this path and try again.' % (
creds_path, subject_id)
raise Exception(err_msg)
else:
input_creds_path = None
except KeyError:
input_creds_path = None
# check if lateral_ventricles_mask exist
if str(c.lateral_ventricles_mask).lower() in ['none', 'false']:
ventricle_mask_exist = False
else:
ventricle_mask_exist = True
# check acpc alignment target
if c.acpc_align and str(c.acpc_template_skull).lower() in ['none', 'false']:
err = "\n\n[!] C-PAC says: You have chosen ACPC alignment, " \
"but you did not provide an ACPC alignment template. " \
"Options you provided:\nacpc_template_skull: {0}" \
'\n\n'.format(str(c.acpc_template_skull))
raise Exception(err)
elif c.acpc_align and str(c.acpc_template_skull).lower() not in ['none', 'false', ''] and str(c.acpc_template_brain).lower() in ['none', 'false', '']:
acpc_target = 'whole-head'
elif c.acpc_align and str(c.acpc_template_skull).lower() not in ['none', 'false', ''] and str(c.acpc_template_brain).lower() not in ['none', 'false', '']:
acpc_target = 'brain'
else:
acpc_target = None
# TODO ASH normalize file paths with schema validator
template_keys = [
("anat", "templateSpecificationFile"),
("anat", "lateral_ventricles_mask"),
("anat", "PRIORS_CSF"),
("anat", "PRIORS_GRAY"),
("anat", "PRIORS_WHITE"),
("other", "configFileTwomm"),
("anat", "template_based_segmentation_CSF"),
("anat", "template_based_segmentation_GRAY"),
("anat", "template_based_segmentation_WHITE"),
("anat", "template_based_segmentation_WHITE"),
("anat", "acpc_template_skull"),
("anat", "acpc_template_brain"),
]
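# swap each configured template path for a datasource node that resolves the file (fetching it from S3 if needed)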
for key_type, key in template_keys:
if isinstance(getattr(c, key), str) or getattr(c, key) is None:
node = create_check_for_s3_node(
key,
getattr(c, key), key_type,
input_creds_path, c.workingDirectory, map_node=False
)
setattr(c, key, node)
template_keys_in_list = [
("anat", "ANTs_prior_seg_template_brain_list"),
("anat", "ANTs_prior_seg_template_segmentation_list"),
]
for key_type, key in template_keys_in_list:
node = create_check_for_s3_node(
key,
getattr(c, key), key_type,
input_creds_path, c.workingDirectory, map_node=True
)
setattr(c, key, node)
"""""""""""""""""""""""""""""""""""""""""""""""""""
PREPROCESSING
"""""""""""""""""""""""""""""""""""""""""""""""""""
strat_initial = Strategy()
# The list of strategies that will be shared all along the pipeline creation
strat_list = []
num_strat = 0
anat_flow = create_anat_datasource('anat_gather_%d' % num_strat)
anat_flow.inputs.inputnode.set(
subject = subject_id,
anat = sub_dict['anat'],
creds_path = input_creds_path,
dl_dir = c.workingDirectory,
img_type = 'anat'
)
strat_initial.update_resource_pool({
'anatomical': (anat_flow, 'outputspec.anat')
})
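# pre-computed anatomical inputs that may be supplied directly in the data config; ingest any that are present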
anat_ingress = [
'brain_mask',
'lesion_mask',
'anatomical_csf_mask',
'anatomical_gm_mask',
'anatomical_wm_mask'
]
for key in anat_ingress:
if key in sub_dict.keys():
if sub_dict[key] and sub_dict[key].lower() != 'none':
anat_ingress_flow = create_anat_datasource(
f'anat_ingress_gather_{key}_{num_strat}')
anat_ingress_flow.inputs.inputnode.subject = subject_id
anat_ingress_flow.inputs.inputnode.anat = sub_dict[key]
anat_ingress_flow.inputs.inputnode.creds_path = input_creds_path
anat_ingress_flow.inputs.inputnode.dl_dir = c.workingDirectory
if key == 'brain_mask':
key = 'anatomical_brain_mask'
strat_initial.update_resource_pool({
key: (anat_ingress_flow, 'outputspec.anat')
})
templates_for_resampling = [
(c.resolution_for_anat, c.template_brain_only_for_anat, 'template_brain_for_anat', 'resolution_for_anat'),
(c.resolution_for_anat, c.template_skull_for_anat, 'template_skull_for_anat', 'resolution_for_anat'),
(c.resolution_for_anat, c.template_symmetric_brain_only, 'template_symmetric_brain', 'resolution_for_anat'),
(c.resolution_for_anat, c.template_symmetric_skull, 'template_symmetric_skull', 'resolution_for_anat'),
(c.resolution_for_anat, c.dilated_symmetric_brain_mask, 'template_dilated_symmetric_brain_mask', 'resolution_for_anat'),
(c.resolution_for_anat, c.ref_mask, 'template_ref_mask', 'resolution_for_anat'),
(c.resolution_for_func_preproc, c.template_brain_only_for_func, 'template_brain_for_func_preproc', 'resolution_for_func_preproc'),
(c.resolution_for_func_preproc, c.template_skull_for_func, 'template_skull_for_func_preproc', 'resolution_for_func_preproc'),
(c.resolution_for_func_preproc, c.template_epi, 'template_epi', 'resolution_for_func_preproc'), # no difference of skull and only brain
(c.resolution_for_func_derivative, c.template_epi, 'template_epi_derivative', 'resolution_for_func_derivative'), # no difference of skull and only brain
(c.resolution_for_func_derivative, c.template_brain_only_for_func, 'template_brain_for_func_derivative', 'resolution_for_func_preproc'),
(c.resolution_for_func_derivative, c.template_skull_for_func, 'template_skull_for_func_derivative', 'resolution_for_func_preproc'),
]
if 1 in c.run_pypeer:
templates_for_resampling.append((c.resolution_for_func_preproc, c.eye_mask_path, 'template_eye_mask', 'resolution_for_func_preproc'))
Outputs.any.append("template_eye_mask")
# update resampled template to resource pool
for resolution, template, template_name, tag in templates_for_resampling:
resampled_template = pe.Node(Function(input_names=['resolution', 'template', 'template_name', 'tag'],
output_names=['resampled_template'],
function=resolve_resolution,
as_module=True),
name='resampled_' + template_name)
resampled_template.inputs.resolution = resolution
resampled_template.inputs.template = template
resampled_template.inputs.template_name = template_name
resampled_template.inputs.tag = tag
strat_initial.update_resource_pool({
template_name: (resampled_template, 'resampled_template')
})
# update resource pool from data config
# TODO fork resource pool
if 'resource_pool' in sub_dict.keys():
resource_pool_list = sub_dict['resource_pool']
for num_strat, strat in enumerate(resource_pool_list.keys()):
new_strat = strat_initial.fork()
resource_pool_dict = sub_dict['resource_pool'][strat]
for key in resource_pool_dict.keys():
# handle anatomical_to_mni_nonlinear_xfm, mni_to_anatomical_nonlinear_xfm, ants xfms
if ('nonlinear_xfm' in key or 'ants' in key) and key not in strat:
longitudinal_flow = create_anat_datasource(f'{key}_gather_{num_strat}')
longitudinal_flow.inputs.inputnode.set(
subject = subject_id,
anat = resource_pool_dict[key],
creds_path = input_creds_path,
dl_dir = c.workingDirectory,
img_type = 'other'
)
new_strat.update_resource_pool({
key: (longitudinal_flow, 'outputspec.anat')
})
elif ('anatomical' in key or 'seg' in key) and key not in strat:
if 'seg_probability_maps' in key or 'seg_partial_volume_files' in key:
for num_key, file_path in enumerate(resource_pool_dict[key]):
longitudinal_flow = create_anat_datasource(f'{key}_{num_key}_gather_{num_strat}')
longitudinal_flow.inputs.inputnode.set(
subject = subject_id,
anat = file_path,
creds_path = input_creds_path,
dl_dir = c.workingDirectory,
img_type = 'anat'
)
concat_seg_map = pe.Node(Function(input_names=['in_list1', 'in_list2'],
output_names=['out_list'],
function=concat_list),
name=f'concat_{key}_{num_key}_{num_strat}')
workflow.connect(longitudinal_flow, 'outputspec.anat',
concat_seg_map, 'in_list1')
if num_key == 0:
new_strat.update_resource_pool({
f'temporary_{key}_list':(concat_seg_map, 'out_list')
})
else:
node, out_file = new_strat[f'temporary_{key}_list']
workflow.connect(node, out_file,
concat_seg_map, 'in_list2')
new_strat.update_resource_pool({
f'temporary_{key}_list':(concat_seg_map, 'out_list')
}, override=True)
new_strat.update_resource_pool({
key: (concat_seg_map, 'out_list')
})
else:
longitudinal_flow = create_anat_datasource(f'{key}_gather_{num_strat}')
longitudinal_flow.inputs.inputnode.set(
subject = subject_id,
anat = resource_pool_dict[key],
creds_path = input_creds_path,
dl_dir = c.workingDirectory,
img_type = 'anat'
)
new_strat.update_resource_pool({
key: (longitudinal_flow, 'outputspec.anat')
})
elif 'functional' in key:
# TODO
pass
else:
new_strat.update_resource_pool({
key: resource_pool_dict[key]
})
strat_list += [new_strat]
else:
strat_list += [strat_initial]
new_strat_list = []
if 'anatomical_to_standard' not in strat_list[0]:
for num_strat, strat in enumerate(strat_list):
if 'anatomical_brain_mask' in strat:
anat_preproc = create_anat_preproc(method='mask',
config=c,
acpc_target=acpc_target,
wf_name='anat_preproc_mask_%d' % num_strat)
new_strat = strat.fork()
node, out_file = new_strat['anatomical']
workflow.connect(node, out_file,
anat_preproc, 'inputspec.anat')
node, out_file = new_strat['anatomical_brain_mask']
workflow.connect(node, out_file,
anat_preproc, 'inputspec.brain_mask')
workflow.connect(c.acpc_template_skull, 'local_path',
anat_preproc, 'inputspec.template_skull_for_acpc')
workflow.connect(c.acpc_template_brain, 'local_path',
anat_preproc, 'inputspec.template_brain_only_for_acpc')
new_strat.append_name(anat_preproc.name)
new_strat.set_leaf_properties(anat_preproc, 'outputspec.brain')
new_strat.update_resource_pool({
'anatomical_brain': (anat_preproc, 'outputspec.brain'),
'anatomical_skull_leaf': (anat_preproc, 'outputspec.anat_skull_leaf'),
})
new_strat.update_resource_pool({
'anatomical_brain_mask': (anat_preproc, 'outputspec.brain_mask')
}, override=True)
new_strat_list += [new_strat]
continue
if already_skullstripped:
anat_preproc = create_anat_preproc(method=None,
already_skullstripped=True,
config=c,
acpc_target=acpc_target,
wf_name='anat_preproc_already_%d' % num_strat)
new_strat = strat.fork()
node, out_file = new_strat['anatomical']
workflow.connect(node, out_file,
anat_preproc, 'inputspec.anat')
workflow.connect(c.acpc_template_skull, 'local_path',
anat_preproc, 'inputspec.template_skull_for_acpc')
workflow.connect(c.acpc_template_brain, 'local_path',
anat_preproc, 'inputspec.template_brain_only_for_acpc')
new_strat.append_name(anat_preproc.name)
new_strat.set_leaf_properties(anat_preproc, 'outputspec.brain')