-
Notifications
You must be signed in to change notification settings - Fork 2
/
dac2bids.py
executable file
·313 lines (256 loc) · 10.8 KB
/
dac2bids.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
#!/usr/bin/env python
# Daniel Gomez 29.08.2016
# generates configuration for dcm2niibatch
# from a nested folder with dicom files.
import random
import re
import os.path
import optparse
import yaml
import dicom
# will need to create a nifti file for each directory in folder.
# note the implicit assumption that all folders contain only one series,
# echo or whatever.
# to do that, check some information from the dicom.
# to build a BIDS file, we need the following info:
# 1. is the nifti going to anat, func or fmap?
# 2. does it belong to MB acquisition or to the mbme?
# 3. what echo is it?
# 4. is it a magnitude or phase image?!
# 5. if func, is it resting-state or task? For that we'll have to
# look at the protocol name. Isn't it a terrible design error that
# we cannot know what the data was used for from the DICOMs?
# get all directories in a given folder
def lsdirs(currfolder):
"""
List all directories in a folder.
Ignore files.
"""
return filter(lambda x:
os.path.isdir(os.path.join(currfolder, x)),
os.listdir(currfolder))
def mag_or_phase(dicomImgTypeTag):
"""
Tests if a DICOM image is magnitude or phase.
dicomImgTypeTag is the string from the DICOM ImgTypeTag.
"""
# Magnitude or phase?
if 'M' in dicomImgTypeTag:
return 'magnitude'
elif 'P' in dicomImgTypeTag:
return 'phase'
else:
return 'unknown'
RANDOM_FILE_MEMO = {}
def get_random_file(folder):
"""
Returns a random file from a folder.
It memorizes the file so that multiple calls to the function always return the same file.
(For Python 3 we could have used @functools.lru_cache(maxsize=None) as a decorator and avoid
having memo code in the implementation)
"""
if folder not in RANDOM_FILE_MEMO:
RANDOM_FILE_MEMO[folder] = random.choice(os.listdir(folder))
return os.path.join(folder, RANDOM_FILE_MEMO[folder])
def parse_from_x_protocol(pattern, dicomfile):
"""
Siemens writes a protocol structure as text into each DICOM file.
This structure is necessary to recreate a scanning protocol from a DICOM, since the
DICOM information alone wouldn't be sufficient.
This function extracts values from the dicomfile according to a given pattern.
"""
with open(dicomfile, 'rb') as openfile:
regex = '^' + pattern + '\t = \t(.*)\n'
rx = re.compile(regex.encode('utf-8'))
for line in openfile:
match = rx.match(line)
if match:
return int(match.group(1).decode('utf-8'))
def get_number_of_repetitions_from_x_protocol(dicomfile):
return parse_from_x_protocol('lRepetitions', dicomfile)
def get_number_of_echoes_from_x_protocol(dicomfile):
return parse_from_x_protocol('lContrasts', dicomfile)
def is_incomplete_acquisition(folder):
"""
If a scan was aborted in the middle of the experiment, it is likely that DICOMs will
land in the PACS anyway. We want to avoid converting these incomplete directories.
This function checks the number of measurements specified in the protocol against the
number of DICOMs.
"""
randfile = get_random_file(folder)
nrep = get_number_of_repetitions_from_x_protocol(randfile)
if nrep is None:
return False
return nrep > len(os.listdir(folder)) - 1
def is_multiecho(folder):
randfile = get_random_file(folder)
return get_number_of_echoes_from_x_protocol(randfile) > 1
# NEEDS refactoring.
def parse_protocols(currfolder, taskname="task-unknown"):
"""
Takes a random DICOM image from currfolder and extracts
relevant information for dcm2niix conversion (and for the BIDS format.)
"""
# Initialize empty currfolder.
dirs = dict.fromkeys(lsdirs(currfolder))
dirs = {k: v for k, v in dirs.items() if 'localizer' not in k}
for protocol in dirs.keys():
# Get a random dicom from the folder.
filepath = os.path.join(currfolder,
protocol,
random.choice(os.listdir(
os.path.join(currfolder, protocol))))
try:
print('reading ' + filepath)
dicomfile = dicom.read_file(filepath)
except IOError:
print('File in folder ' + protocol + ' is not a dicom.')
continue
# Attempt to grab enough information from dicoms.
try:
seq = dicomfile.ScanningSequence
echo = dicomfile.EchoNumbers
desc = dicomfile.SeriesDescription
imgtype = dicomfile.ImageType
seqname = dicomfile.SequenceName
# prot = dicomfile.ProtocolName
except AttributeError:
print('Found weird dicom in folder ' + protocol)
# Check for special physiological dicoms.
try:
# imgcomment = dicomfile.ImageComments
imgtype = dicomfile.ImageType
outfolder = 'physio'
seq = 'physio'
echo = 0
experiment = 'physio'
# If it is weird and isn't Physio, skip folder.
except AttributeError:
print('Found a really weird dicom... Check manually.')
# Skip folder.
continue
# Simple logic for the information to create output filename.
acq = ''
# Functional images
if seq == 'EP':
outfolder = 'func'
if 'Resting' in desc:
experiment = 'task-rest'
else:
experiment = taskname
if get_number_of_echoes_from_x_protocol(filepath) > 1:
acq = 'acq-mbme'
else:
acq = 'acq-mb'
elif 'GR' in seq and 'IR' not in seq:
# Field map
if seqname == '*fm2d2r':
outfolder = 'fmap'
experiment = ''
# T2 star mapping
elif seqname == '*fl3d11r':
outfolder = 'anat'
experiment = 'T2starw'
elif 'Scout' in seqname:
outfolder = 'unknown'
experiment = 'unknown'
else:
outfolder = 'unknown'
experiment = 'unknown'
elif 'IR' in seq:
outfolder = 'anat'
experiment = 'T1w'
elif outfolder not in {'anat', 'func', 'fmap'}:
outfolder = 'unknown'
experiment = 'unknown'
else:
outfolder = 'unknown'
experiment = 'unknown'
imgtype = mag_or_phase(imgtype)
# add to dict
dirs[protocol] = {'imgtype': imgtype,
'outfolder': outfolder,
'experiment': experiment,
'echo': echo,
'acq': acq}
return dirs
def bids_opts():
"""
OPTS is a structure accepted by dcm2niibatch converter.
This function returns a reasonable configuration for BIDS.
"""
opts = {'isGz': True,
'isFlipY': False,
'isVerbose': False,
'isCreateBIDS': True,
'isAnonymizeBIDS': True,
'isOnlyBIDS': False,
'isOnlySingleFile': False}
return opts
def create_yaml(inputfolder, outputfolder, subnum=0, sesnum=0, skipfmap=False,
taskname="task-unknown"):
"""
Generate a yaml file compatible with dcm2niibatch and the BIDS format.
Takes an inputfolder tree containing folders with dicom files.
Folders in inputfolder are assumed to contain a single dataset.
"""
inputdict = parse_protocols(inputfolder, taskname=taskname)
# formatted subject number and session number
sub = 'sub-' + '%02d' % (subnum,)
ses = 'ses-' + '%02d' % (sesnum,)
# The pydict will be converted to a yaml file.
pydict = {'Options': bids_opts(), 'Files': []}
for protocol, config in inputdict.items():
echonum = '%02d' % (config['echo'],)
inputdirectory = os.path.join(inputfolder, protocol)
if is_incomplete_acquisition(inputdirectory):
print("Warning: #scans specified in protocol not equal to data size in" + inputdirectory)
outputdirectory = os.path.join(outputfolder, sub, ses, config['outfolder'])
filename = sub + '_' + ses
if config['outfolder'] == 'func':
filename += '_' + config['experiment'] + '_' + config['acq'] + echonum
filename += '_bold'
if config['outfolder'] == 'anat':
filename += '_' + config['experiment']
if config['experiment'] == 'T2starw':
filename += '_' + config['imgtype'] + echonum
if config['outfolder'] == 'fmap' and not skipfmap:
filename += '_' + config['imgtype'] + echonum
if config['outfolder'] is not 'unknown':
filedict = {'in_dir': os.path.abspath(inputdirectory),
'out_dir': os.path.abspath(outputdirectory),
'filename': filename}
pydict['Files'].append(filedict)
return yaml.safe_dump(pydict, default_flow_style=False)
def try_to_get_subject(directory):
return int(re.search('sub-\d(\d\d)', directory)[1])
def try_to_get_session(directory):
return int(re.search('ses-(\d\d)', directory)[1])
# NEEDS refactoring. Change deprecated OptionParser to ArgParser.
def main():
p = optparse.OptionParser()
p.add_option('--inputfolder', '-i', default='.', help="Input folder of dicoms")
p.add_option('--outputfolder', '-o', default='./out/', help="Output folder for niftis")
#p.add_option('--yaml', '-y', default='./batch.yaml', help="Name of generated config file.")
p.add_option('--sub', '-s', default=None, type="int", help="The subject number. If None, try to grep")
p.add_option('--ses', '-e', default=None, type="int", help="The session number. if None, try to grep")
p.add_option('--skipfmap', '-f', action="store_true", help="Skips fieldmaps.")
p.add_option('--taskname', '-t', default="task-unknown", help="The task name")
options, arguments = p.parse_args()
if not 'task-' in options.taskname:
options.taskname = 'task-'+options.taskname
if options.sub is None:
options.sub = try_to_get_subject(options.inputfolder)
if options.ses is None:
options.ses = try_to_get_session(options.inputfolder)
yamlcontent = create_yaml(options.inputfolder,
options.outputfolder,
subnum=options.sub,
sesnum=options.ses,
skipfmap=options.skipfmap,
taskname=options.taskname)
yamlname = './sub-' + str(options.sub) + '_ses-' + str(options.ses) + '.yaml'
with open(yamlname, 'w') as yamlfile:
yamlfile.write(yamlcontent)
if __name__ == '__main__':
main()