''' mpi implements common util functions based on mpi4py.
'''
import cPickle as pickle
import glob
import logging
import numpy as np
import os
import random
import socket
import sys
import time
# MPI
try:
from mpi4py import MPI
COMM = MPI.COMM_WORLD
except Exception, e:
    sys.stderr.write(
        "Warning: I cannot import mpi4py. Using a dummy single-node "
        "implementation instead. The program will run in single-node mode "
        "even if you executed me with mpirun or mpiexec.\n")
    sys.stderr.write("We STRONGLY recommend that you install MPI and "
                     "mpi4py.\n")
    sys.stderr.write("mpi4py exception message is: %s\n" % repr(e))
    from _mpi_dummy import COMM
RANK = COMM.Get_rank()
SIZE = COMM.Get_size()
_HOST_RAW = socket.gethostname()
# strip the domain suffix (e.g. ".icsi.berkeley.edu") so HOST is the short
# host name
HOST = _HOST_RAW.split('.')[0]
_MPI_PRINT_MESSAGE_TAG = 560710
_MPI_BUFFER_LIMIT = 1073741824
# we need to set the random seed differently for each mpi instance
# (adding RANK rather than multiplying, so rank 0 does not always get seed 0)
random.seed(time.time() + RANK)
def mkdir(dirname):
    '''Make a directory, ignoring the OSError raised when it already exists
    (e.g. when multiple MPI nodes race to create the same directory).
    '''
try:
os.makedirs(dirname)
except OSError:
pass
def agree(decision):
"""agree() makes the decision consistent by propagating the decision of the
root to everyone
"""
return COMM.bcast(decision)
def elect():
'''elect() randomly chooses a node from all the nodes as the president.
Input:
None
Output:
the rank of the president
'''
president = COMM.bcast(np.random.randint(SIZE))
return president
def is_president():
    '''Returns True if the current node is the president, and False otherwise.
    '''
return (RANK == elect())
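# A minimal usage sketch (not part of the original module): the function name
# _demo_consistent_decision is hypothetical. elect() and agree() are
# collective broadcasts, so every rank must call them together.
def _demo_consistent_decision():
    # each rank draws its own value, but agree() keeps only the root's draw,
    # so every rank ends up holding the same decision
    decision = agree(random.random() > 0.5)
    if RANK == elect():
        logging.debug("president rank %d sees the shared decision %s",
                      RANK, decision)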
def is_root():
'''returns if the current node is root
'''
return RANK == 0
def barrier(tag=0, sleep=0.01):
    '''A friendlier MPI barrier.
    The native MPI barrier may busy-wait and keep idle processes occupying the
    CPU, while this barrier sleeps between polls.
    '''
if SIZE == 1:
return
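    # Dissemination-style barrier: in round k every rank sends a token to
    # (RANK + 2**k) % SIZE and waits for one from (RANK - 2**k) % SIZE,
    # polling with Iprobe and a short sleep instead of spinning on the CPU.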
mask = 1
while mask < SIZE:
dst = (RANK + mask) % SIZE
src = (RANK - mask + SIZE) % SIZE
req = COMM.isend(None, dst, tag)
while not COMM.Iprobe(src, tag):
time.sleep(sleep)
COMM.recv(None, src, tag)
req.Wait()
mask <<= 1
def root_log_level(level, name = None):
"""set the log level on root.
Input:
level: the logging level, such as logging.DEBUG
name: (optional) the logger name
"""
if is_root():
logging.getLogger(name).setLevel(level)
def log_level(level, name = None):
"""set the log level on all nodes.
Input:
level: the logging level, such as logging.DEBUG
name: (optional) the logger name
"""
logging.getLogger(name).setLevel(level)
def safe_send_matrix(mat, dest=0, tag=0):
"""A safe send that deals with the mpi4py 2GB limit. should be paired with
safe_recv_matrix. The input mat should be C_CONTIGUOUS. To be safe, we send
the matrix in 1GB chunks.
"""
num_batches = int((mat.nbytes - 1) / _MPI_BUFFER_LIMIT + 1)
if num_batches == 1:
COMM.Send(mat, dest, tag)
else:
logging.debug("The buffer is larger than 1GB, sending in chunks...")
batch_size = int(mat.shape[0] / num_batches)
for i in range(num_batches):
COMM.Send(mat[batch_size*i:batch_size*(i+1)], dest, tag)
# send the remaining part
if mat.shape[0] > batch_size * num_batches:
COMM.Send(mat[batch_size * num_batches:], dest, tag)
def safe_recv_matrix(mat, source=0, tag=0, status=None):
"""A safe recv that deals with the mpi4py 2GB limit. should be paired with
safe_send_matrix. The input mat should be C_CONTIGUOUS. To be safe, we recv
the matrix in 1GB chunks.
"""
num_batches = int((mat.nbytes - 1) / _MPI_BUFFER_LIMIT + 1)
if num_batches == 1:
COMM.Recv(mat, source, tag, status)
else:
logging.debug("The buffer is larger than 1GB, sending in chunks...")
batch_size = int(mat.shape[0] / num_batches)
for i in range(num_batches):
COMM.Recv(mat[batch_size*i:batch_size*(i+1)], source, tag, status)
# send the remaining part
if mat.shape[0] > batch_size * num_batches:
COMM.Recv(mat[batch_size * num_batches:], source, tag, status)
def get_segments(total, inverse = False):
"""Get the segments for each local node.
Input:
inverse: (optional) if set True, also return the inverse index of each
element in 0:total.
Output:
segments: a list of size SIZE+1, where segments[i]:segments[i+1]
specifies the range that the local node is responsible for.
inv: (only if inverse=True) a list of size total, where inv[i] is the
rank of the node that is responsible for element i.
"""
if total < SIZE:
        raise ValueError, \
            "The total number %d should be no smaller than the mpi size %d." % \
            (total, SIZE)
segments = [int(total * i / float(SIZE)) for i in range(SIZE+1)]
if inverse:
inv = sum([[i] * (segments[i+1] - segments[i])
for i in range(SIZE)], [])
return segments, inv
else:
return segments
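# For example, with SIZE == 4, get_segments(10) returns [0, 2, 5, 7, 10], so
# rank 0 handles elements 0:2, rank 1 handles 2:5, and so on, while
# get_segments(10, inverse=True) additionally returns
# [0, 0, 1, 1, 1, 2, 2, 3, 3, 3] as the element-to-rank map.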
def distribute(mat):
"""Distributes the mat from root to individual nodes
The data will be distributed along the first axis, as even as possible.
You should make sure that the matrix is in C-contiguous format.
"""
# quick check
if SIZE == 1:
return mat
if is_root():
shape = mat.shape[1:]
segments = get_segments(mat.shape[0])
dtype = mat.dtype
else:
shape = None
segments = None
dtype = None
shape = COMM.bcast(shape)
dtype = COMM.bcast(dtype)
segments = COMM.bcast(segments)
if is_root():
        if not mat.flags['C_CONTIGUOUS']:
            logging.warning('mat is not C-contiguous; making a contiguous copy.')
            mat = np.ascontiguousarray(mat)
for i in range(1,SIZE):
safe_send_matrix(mat[segments[i]:segments[i+1]], dest=i)
data = mat[:segments[1]].copy()
else:
data = np.empty((segments[RANK+1] - segments[RANK],) + shape,
dtype = dtype)
safe_recv_matrix(data)
return data
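# A minimal usage sketch (not part of the original module): the name
# _demo_distribute is hypothetical, and it assumes at most 10 MPI ranks so
# that get_segments() does not complain. Root builds the full matrix, the
# other ranks pass None, and every rank gets back its own block of rows.
def _demo_distribute():
    if is_root():
        full = np.arange(20, dtype=np.float64).reshape(10, 2)
    else:
        full = None
    local = distribute(full)
    logging.debug("rank %d received %d rows", RANK, local.shape[0])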
def distribute_list(source):
"""Distributes the list from root to individual nodes
"""
# quick check
if SIZE == 1:
return source
if is_root():
length = len(source)
if length == 0:
logging.warning("Warning: List has length 0")
else:
length = 0
length = COMM.bcast(length)
if length == 0:
return []
segments = get_segments(length)
if is_root():
for i in range(1,SIZE):
send_list = source[segments[i]:segments[i+1]]
COMM.send(send_list, dest=i)
data = source[:segments[1]]
del source
else:
data = COMM.recv()
return data
def dump_matrix(mat, filename):
"""Dumps the matrix distributed over machines to one single file.
We do NOT recommend using this - it causes a lot of communications since
all data need to be transferred to root before writing to disk. Instead,
use dump_matrix_multi which stores the matrix to multiple chunks.
"""
if SIZE == 1:
        with open(filename, 'wb') as fid:
            np.save(fid, mat)
else:
mat_sizes = COMM.gather(mat.shape[0])
if is_root():
total_size = sum(mat_sizes)
mat_reduced = np.empty((total_size,) + mat.shape[1:],
dtype = mat.dtype)
start = mat_sizes[0]
mat_reduced[:start] = mat
for i in range(1,SIZE):
safe_recv_matrix(mat_reduced[start:start+mat_sizes[i]],
source = i)
start += mat_sizes[i]
            with open(filename, 'wb') as fid:
                np.save(fid, mat_reduced)
else:
safe_send_matrix(mat, dest = 0)
barrier()
def load_matrix(filename):
"""Load a matrix from a single matrix, and distribute it to each node
numpy supports memmap so each node will simply load its own part
"""
if SIZE == 1:
try:
data = np.load(filename)
except IOError:
data = np.load(filename + '.npy')
return data
try:
raw_data = np.load(filename, mmap_mode = 'r')
except IOError:
        # we try to load the filename with a '.npy' suffix. If we fail again,
        # raise IOError.
raw_data = np.load(filename + '.npy', mmap_mode = 'r')
total_size = raw_data.shape[0]
segments = get_segments(total_size)
    data = np.empty((segments[RANK+1] - segments[RANK],) + raw_data.shape[1:],
                    dtype = raw_data.dtype)
data[:] = raw_data[segments[RANK]:segments[RANK+1]]
barrier()
return data
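# A minimal round-trip sketch (not part of the original module): the name
# _demo_single_file_io and the '/tmp/mpi_demo_mat.npy' path are hypothetical.
# Each rank contributes a few local rows, dump_matrix gathers them into one
# .npy file on root, and load_matrix then redistributes that file so every
# rank gets back a slice of the full matrix.
def _demo_single_file_io():
    local = np.ones((3, 4)) * RANK   # 3 rows per rank, tagged with the rank id
    dump_matrix(local, '/tmp/mpi_demo_mat.npy')
    barrier()
    redistributed = load_matrix('/tmp/mpi_demo_mat.npy')
    logging.debug("rank %d now holds %d rows", RANK, redistributed.shape[0])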
def dump_matrix_multi(mat, filename):
"""Dumps the matrix distributed over machines to multiple files, one per
MPI node.
"""
if SIZE > 99999:
# this usually won't happen, but we leave the sanity check here
raise ValueError, 'I cannot deal with too many MPI instances.'
logging.debug("Dumping the matrix to %d parts" % SIZE)
my_filename = '%s-%05d-of-%05d.npy' % (filename, RANK, SIZE)
mkdir(os.path.dirname(filename))
np.save(my_filename, mat)
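# For example, with 4 MPI ranks, dump_matrix_multi(mat, '/tmp/feat') writes
# /tmp/feat-00000-of-00004.npy through /tmp/feat-00003-of-00004.npy, one file
# per rank (the '/tmp/feat' prefix here is just an illustration).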
def load_matrix_multi(filename, N = None):
"""Loads the matrix previously dumped by dump_matrix_multi. The MPI size
might be different. The stored files are in the format
filename-xxxxx-of-xxxxx, which we obtain using glob.
Input:
N: (optional) if given, specify the number of parts the matrix is
separated too. Otherwise, the number is automatically inferred by
listing all the files using regexp matching.
"""
files= glob.glob('%s-?????-of-?????.npy' % (filename))
N = len(files)
logging.debug("Loading the matrix from %d parts" % N)
    # we will load the length of the data, and then try to distribute it
    # as evenly as possible.
if RANK == 0:
# the root will first taste each file
sizes = np.array([np.load('%s-%05d-of-%05d.npy' % (filename, i, N),
mmap_mode='r').shape[0]
for i in range(N)])
temp = np.load('%s-%05d-of-%05d.npy' % (filename, 0, N),
mmap_mode='r')
shape = temp.shape[1:]
dtype = temp.dtype
else:
sizes = None
shape = None
dtype = None
barrier()
sizes = COMM.bcast(sizes)
shape = COMM.bcast(shape)
dtype = COMM.bcast(dtype)
total = sizes.sum()
segments = get_segments(total)
# now, each node opens the file that overlaps with its data, and reads
# the contents.
my_start = segments[RANK]
my_end = segments[RANK+1]
my_size = my_end - my_start
    mat = np.empty((my_size,) + shape, dtype = dtype)
f_start = 0
f_end = 0
for i, size in enumerate(sizes):
f_end += size
if f_start < my_end and f_end > my_start:
file_mat = np.load('%s-%05d-of-%05d.npy' % (filename, i, N),
mmap_mode='r')
mat[max(f_start - my_start, 0):\
min(f_end - my_start, my_size)] = \
file_mat[max(my_start - f_start,0):\
min(my_end - f_start, size)]
f_start += size
return mat
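# A minimal sketch of the multi-file round trip (not part of the original
# module): the name _demo_multi_file_io and the '/tmp/mpi_demo_parts' prefix
# are hypothetical. Each rank dumps its own chunk, and load_matrix_multi later
# rebalances the chunks across whatever number of ranks is running.
def _demo_multi_file_io():
    local = np.random.rand(5, 3)
    dump_matrix_multi(local, '/tmp/mpi_demo_parts')
    barrier()   # make sure every rank has finished writing its own file
    rebalanced = load_matrix_multi('/tmp/mpi_demo_parts')
    logging.debug("rank %d rebalanced to %d rows", RANK, rebalanced.shape[0])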
def root_pickle(obj, filename):
    """Pickles obj to filename on the root node only."""
    if is_root():
        with open(filename, 'wb') as fid:
            pickle.dump(obj, fid)
if __name__ == "__main__":
pass