-
Notifications
You must be signed in to change notification settings - Fork 2
/
command_launchers.py
94 lines (77 loc) · 2.82 KB
/
command_launchers.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
import subprocess
import time
import torch
import sys
import getpass
from pathlib import Path
def local_launcher(commands):
    """Run every command on this machine, one after another.

    Each entry of ``commands`` is handed to the shell and waited on before
    the next one starts. Exit statuses are not inspected.
    """
    for command in commands:
        subprocess.call(command, shell=True)
def dummy_launcher(commands):
    """
    Dry-run launcher: nothing is executed.

    Each command is echoed to stdout with a 'Dummy launcher: ' prefix,
    which makes it handy for checking what would be launched.
    """
    for command in commands:
        message = f'Dummy launcher: {command}'
        print(message)
def multi_gpu_launcher(commands):
    """
    Launch commands on the local machine, using all GPUs in parallel.

    Every command is started through the shell with ``CUDA_VISIBLE_DEVICES``
    set to a single GPU index, so at most one command runs per GPU at a time.
    The function returns once every launched process has exited.

    The caller's ``commands`` sequence is not modified.

    Raises:
        RuntimeError: if there are commands to run but
            ``torch.cuda.device_count()`` reports no GPUs (previously this
            case looped forever without launching anything).
    """
    print('WARNING: using multi_gpu_launcher.')
    # Work on a private copy so the caller's list is left intact.
    pending = list(commands)
    n_gpus = torch.cuda.device_count()
    if pending and n_gpus == 0:
        raise RuntimeError(
            'multi_gpu_launcher: no CUDA devices are visible, '
            'cannot launch any command.'
        )

    procs_by_gpu = [None] * n_gpus
    for cmd in pending:
        # Poll once per second until some GPU has no live process.
        while True:
            gpu_idx = next(
                (
                    idx
                    for idx, proc in enumerate(procs_by_gpu)
                    if proc is None or proc.poll() is not None
                ),
                None,
            )
            if gpu_idx is not None:
                break
            time.sleep(1)

        procs_by_gpu[gpu_idx] = subprocess.Popen(
            f'CUDA_VISIBLE_DEVICES={gpu_idx} {cmd}', shell=True
        )
        # Keep the original one-second pause between consecutive launches.
        time.sleep(1)

    # Wait for the last few tasks to finish before returning.
    for proc in procs_by_gpu:
        if proc is not None:
            proc.wait()
def slurm_launcher(commands, output_dirs, max_slurm_jobs=12):
    """
    Run submission commands one by one, throttled by the user's Slurm queue.

    Before each command is run, ``block_until_running`` is called with
    ``max_slurm_jobs`` and the current user name. The command's stdout is
    captured, echoed, and -- when the paired ``output_dir`` is truthy -- its
    last whitespace-separated token is parsed as an integer job id and
    written to ``<output_dir>/job_id``.

    Args:
        commands: shell command strings to run (presumably ``sbatch``
            invocations whose output ends with the job id -- confirm
            against the caller).
        output_dirs: one directory per command; a falsy entry skips the
            job-id bookkeeping for that command. Pairs are formed with
            ``zip``, so extra entries on either side are ignored.
        max_slurm_jobs: limit passed to ``block_until_running``.

    Exits the process with status 1 if a job id cannot be parsed from the
    command output.
    """
    # sys.stdout may have been replaced by an object whose encoding is None
    # or missing; fall back to UTF-8 instead of failing inside decode().
    encoding = getattr(sys.stdout, 'encoding', None) or 'utf-8'

    for output_dir, cmd in zip(output_dirs, commands):
        block_until_running(max_slurm_jobs, getpass.getuser())
        completed = subprocess.run(cmd, shell=True, stdout=subprocess.PIPE)
        out_str = completed.stdout.decode(encoding)
        print(out_str.strip())

        if not output_dir:
            continue

        try:
            # split() without arguments tolerates trailing spaces/newlines
            # and raises IndexError on empty output.
            job_id = int(out_str.split()[-1])
        except (IndexError, ValueError):
            print("Error in Slurm submission, exiting.")
            # Non-zero status: this is a failure, not a clean exit.
            sys.exit(1)
        (Path(output_dir) / 'job_id').write_text(str(job_id))
def get_slurm_jobs(user):
    """
    Return ``(queued, running)`` lists of integer job ids for ``user``.

    The ids are read from the stdout of ``squeue -u <user>``. Only rows whose
    first column is numeric are considered; a row whose fifth column is 'PD'
    goes to ``queued``, every other row goes to ``running``.
    """
    completed = subprocess.run(['squeue -u ' + user], shell=True, stdout=subprocess.PIPE)
    listing = completed.stdout.decode(sys.stdout.encoding)

    queued, running = [], []
    for line in listing.split('\n'):
        fields = line.split()
        # Skip blank lines and anything that does not start with a job id
        # (e.g. a header row).
        if not fields or not fields[0].isnumeric():
            continue
        bucket = queued if fields[4].strip() == 'PD' else running
        bucket.append(int(fields[0]))
    return queued, running
def block_until_running(n, user):
    """
    Block until ``user`` has fewer than ``n`` jobs queued or running.

    The job counts come from ``get_slurm_jobs``. While the total is at or
    above ``n`` the check is repeated every 10 seconds; once it drops below,
    the function pauses 0.2 seconds and returns True.
    """
    while True:
        pending, active = get_slurm_jobs(user)
        if len(pending) + len(active) >= n:
            time.sleep(10)
            continue
        time.sleep(0.2)
        return True
# Lookup table from launcher name to the launcher function defined in this
# module.
REGISTRY = {
    'local': local_launcher,
    'dummy': dummy_launcher,
    'multi_gpu': multi_gpu_launcher,
    'slurm': slurm_launcher
}