-
Notifications
You must be signed in to change notification settings - Fork 5
/
start_jupyter.py
457 lines (397 loc) · 18.2 KB
/
start_jupyter.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
import argparse
import os
import os.path as osp
import shutil
import subprocess
import stat
import sys
import time
from random import choices
from string import ascii_lowercase
import getpass
import socket
from typing import List
def _get_lc_nodes(partition: str) -> List[str]:
"""
Get the list of 'lc' (loosely coupled) nodes in the specified partition.
Args:
partition (str): The partition to check for 'lc' nodes.
Returns:
List[str]: The list of 'lc' node names.
"""
cmd = f"nodestatus {partition}"
try:
output = subprocess.check_output(cmd, universal_newlines=True, shell=True)
lines = output.split("\n")
lc_nodes = []
for line in lines:
columns = line.split()
if len(columns) >= 4 and "," in columns[3]:
features = columns[3].split(",")
if "lc" in features:
lc_nodes.append(columns[0])
return lc_nodes
except subprocess.CalledProcessError as e:
print(f"An error occurred while executing nodestatus: {e}")
return []
# ---------------------------------------------------------------------------
# Host detection and static per-partition configuration
# ---------------------------------------------------------------------------
hostname = socket.gethostname()

# Midway3 submits to the 'lgrandi' partition; everywhere else defaults
# to 'xenon1t'.
on_midway3 = 'midway3' in hostname
default_partition = 'lgrandi' if on_midway3 else 'xenon1t'

# Directory containing this script (used to locate the shell helper below).
ENVSTARTER_PATH = osp.dirname(osp.abspath(__file__))

# Where the generated sbatch scripts and job logs are written.
OUTPUT_DIR_DALI = osp.expanduser('/dali/lgrandi/%s/straxlab' % (getpass.getuser()))
OUTPUT_DIR_MIDWAY = osp.expanduser('~/straxlab')

# Default home directories.
HOME_MIDWAY = os.environ['HOME']
HOME_DALI = osp.expanduser('/dali/lgrandi/%s' % (getpass.getuser()))

# Every partition except 'dali' lives on midway and shares the same
# output/home locations; 'dali' gets the /dali paths.
_MIDWAY_PARTITIONS = ('lgrandi', 'build', 'caslake', 'xenon1t',
                      'broadwl', 'kicp', 'bigmem2', 'gpu2')

OUTPUT_DIR = {partition: OUTPUT_DIR_MIDWAY for partition in _MIDWAY_PARTITIONS}
OUTPUT_DIR['dali'] = OUTPUT_DIR_DALI

HOME = {partition: HOME_MIDWAY for partition in _MIDWAY_PARTITIONS}
HOME['dali'] = HOME_DALI

# Helper script used to launch jupyter inside the singularity container.
SHELL_SCRIPT = 'start_notebook.sh'
def printflush(x):
    """Print ``x`` and flush stdout immediately.

    NOTE(review): this duplicates print_flush() defined further down in
    this file; it is kept for backward compatibility with any callers.
    """
    # print() has supported flush= since python 3.3, and this file already
    # requires python 3.6+ (it uses f-strings), so the manual
    # sys.stdout.flush() of the original is unnecessary.
    print(x, flush=True)
SPLASH_SCREEN = r"""
__ __ ______ _ _ ____ _ _ _______
\ \ / /| ____|| \ | | / __ \ | \ | | |__ __|
\ V / | |__ | \| || | | || \| | _ __ | |
> < | __| | . ` || | | || . ` || '_ \ | |
/ . \ | |____ | |\ || |__| || |\ || | | || |
/_/ \_\|______||_| \_| \____/ |_| \_||_| |_||_|
The UChicago Analysis Center
"""
# sbatch preamble shared by every job type.  The placeholders
# ({log_fn}, {n_cpu}, {mem_per_cpu}, {max_hours}, {extra_header}) are
# filled in with str.format in main() just before the script is written.
JOB_HEADER = """#!/bin/bash
#SBATCH --job-name=straxlab
#SBATCH --output={log_fn}
#SBATCH --error={log_fn}
#SBATCH --account=pi-lgrandi
#SBATCH --ntasks=1
#SBATCH --cpus-per-task={n_cpu}
#SBATCH --mem-per-cpu={mem_per_cpu}
#SBATCH --time={max_hours}:00:00
{extra_header}
export NUMEXPR_MAX_THREADS={n_cpu}
echo Starting jupyter job
"""
# Extra sbatch lines used when --gpu is requested: fixed gpu2 partition,
# one GPU, and the CUDA module the container expects.
GPU_HEADER = """\
#SBATCH --partition=gpu2
#SBATCH --gres=gpu:1
module load cuda/10.1
"""
# Extra sbatch lines for CPU-only jobs.  {reservation} is either a full
# '#SBATCH --reservation=...' line or an empty string (see main()).
CPU_HEADER = """\
#SBATCH --qos {qos}
#SBATCH --partition {partition}
{reservation}
"""
# This is only if the user is NOT starting the singularity container
# (for singularity, starting jupyter is done in _xentenv_inner).
# Picks a random port in [15000, 19999] and starts jupyter bound to the
# node's IP so the user can tunnel to it.
# BUGFIX: was 'RANDOM %= 5000', which *assigns* to RANDOM and thereby
# re-seeds bash's RNG; 'RANDOM % 5000' yields the same port without
# that side effect.
START_JUPYTER = """
JUP_PORT=$(( 15000 + (RANDOM % 5000) ))
JUP_HOST=$(hostname -i)
echo $PYTHONPATH
jupyter {jupyter} --no-browser --port=$JUP_PORT --ip=$JUP_HOST --notebook-dir {notebook_dir} 2>&1
"""
# Final instructions printed once the jupyter URL is known; the
# placeholders are filled in at the end of main().
SUCCESS_MESSAGE = """
All done! If you have linux, execute this command on your laptop:
ssh -fN -L {port}:{ip}:{port} {username}@dali-login2.rcc.uchicago.edu && sensible-browser http://localhost:{port}/{token}
If you have a mac, instead do:
ssh -fN -L {port}:{ip}:{port} {username}@dali-login2.rcc.uchicago.edu && open "http://localhost:{port}/{token}"
Happy strax analysis, {username}!
"""
def parse_arguments():
    """Build and parse the command-line interface.

    Returns:
        argparse.Namespace: the parsed options.
    """
    parser = argparse.ArgumentParser(
        description='Start a strax jupyter notebook server on the dali batch queue')
    parser.add_argument('--partition',
                        default=default_partition, type=str,
                        help="RCC/DALI partition to use. Try dali, broadwl, xenon1t, lgrandi, caslake or kicp. If you want to use midway3, then use 'lgrandi'.")
    parser.add_argument('--bypass_reservation', '--bypass-reservation', '--skip_reservation', '--skip-reservation', '--no_reservation', '--no-reservation',
                        dest='bypass_reservation',
                        action='store_true',
                        help="Do not use the notebook reservation (useful if it is full)")
    parser.add_argument('--node', help="Specify a node, if desired. By default no specification made")
    parser.add_argument('--exclude_nodes',
                        default=None,
                        help="Specify nodes, which should be excluded, e.g., dali001,dali002 or dali0[28-30]")
    parser.add_argument('--timeout',
                        default=120, type=int,
                        help='Seconds to wait for the jupyter server to start')
    parser.add_argument('--cpu',
                        default=2, type=int,
                        help='Number of CPUs to request.')
    parser.add_argument('--ram',
                        default=8000, type=int,
                        help='MB of RAM to request')
    parser.add_argument('--gpu',
                        action='store_true', default=False,
                        help='Request to run on a GPU partition. Limits runtime to 2 hours.')
    parser.add_argument('--env',
                        default='singularity',
                        choices=['singularity', 'cvmfs', 'backup'],
                        help='Environment to activate; defaults to "singularity" '
                             'to load XENONnT singularity container. '
                             'Passing "cvmfs" will use the conda environment installed in cvmfs, '
                             'using the --tag argument to determine which env exactly ')
    parser.add_argument('--tag',
                        default='development',
                        # BUGFIX: the implicitly-concatenated help strings were
                        # missing separating spaces ("...to loadSee wiki..." and
                        # "...environmentsDefault...").
                        help='Tagged environment to load. '
                             'See wiki page https://xe1t-wiki.lngs.infn.it/doku.php?id=xenon:xenonnt:analysis:environments ' # noqa
                             'Default: "development", or -- equivalently -- "latest"')
    parser.add_argument('--max_hours',
                        default=None, type=float,
                        help='Max number of hours before the job expires. Defaults to 8 h for normal jobs and 2 for GPUs.')  # noqa
    parser.add_argument('--force_new', '--force-new',
                        dest='force_new',
                        action='store_true', default=False,
                        help='Start a new job even if you already have an old one running')
    parser.add_argument('--jupyter',
                        choices=['lab', 'notebook'],
                        default='lab',
                        help='Use jupyter-lab or jupyter-notebook')
    parser.add_argument('--notebook_dir', '--notebook-dir',
                        dest='notebook_dir',
                        default=os.environ['HOME'],
                        help='The working directory passed to jupyter')
    parser.add_argument('--copy_tutorials', '--copy-tutorials',
                        dest='copy_tutorials',
                        action='store_true',
                        help='Copy tutorials to ~/strax_tutorials (if it does not exist)')
    parser.add_argument('--local_cutax', '--cutax', '--local-cutax',
                        dest='local_cutax',
                        action='store_true',
                        help='Enable the usage of locally installed cutax')
    parser.add_argument('--xenon_config', '--xenon-config',
                        default=None,
                        help='Enter the path of your xenon_config file if you want to replace the public one.')
    return parser.parse_args()
def main():
    """Submit (or reconnect to) a strax jupyter job on the batch queue.

    Flow:
      1. Parse CLI arguments and build the sbatch script body for the
         requested environment (singularity / cvmfs / backup conda).
      2. If 'straxlab' jobs are already running, try to recover the
         jupyter URL of one of them from the cache file.
      3. Otherwise submit a fresh sbatch job and poll its logfile until
         the jupyter URL appears (or --timeout seconds elapse).
      4. Cache the URL, then print SSH-tunnel instructions.
    """
    args = parse_arguments()
    print_flush(SPLASH_SCREEN)

    # Dir for the sbatch and log files
    os.makedirs(OUTPUT_DIR[args.partition], exist_ok=True)

    if args.local_cutax:
        # The start-up scripts read this variable to skip installing cutax.
        os.environ['INSTALL_CUTAX'] = '0'

    if args.copy_tutorials:
        dest = os.path.join(OUTPUT_DIR[args.partition], 'strax_tutorials')
        if osp.exists(dest):
            print_flush("NOT copying tutorials, folder already exists")
        else:
            shutil.copytree(
                '/dali/lgrandi/strax/straxen/notebooks/tutorials',
                dest)

    # If using default value for notebook_dir, switch to the partition's home
    if args.notebook_dir == os.environ['HOME']:
        print('Your HOME directory:', HOME[args.partition])
        args.notebook_dir = HOME[args.partition]

    # Build the body of the sbatch script for the chosen environment.
    if args.env == 'singularity':
        s_container = 'xenonnt-%s.simg' % args.tag
        batch_job = JOB_HEADER + \
            "{env_starter}/{script} " \
            "{s_container} {jupyter} {nbook_dir} {partition} {xenon_config}".format(
                env_starter=ENVSTARTER_PATH,
                script=SHELL_SCRIPT,
                s_container=s_container,
                jupyter=args.jupyter,
                nbook_dir=args.notebook_dir,
                partition=args.partition,
                xenon_config=args.xenon_config)
    elif args.env == 'cvmfs':
        if args.partition == 'lgrandi':
            raise Exception("Only singularity is supported on Midway3")
        batch_job = (JOB_HEADER
                     + "source /cvmfs/xenon.opensciencegrid.org/releases/nT/%s/setup.sh" % (args.tag)
                     + START_JUPYTER.format(jupyter=args.jupyter,
                                            notebook_dir=args.notebook_dir)
                     )
        print_flush("Using conda from cvmfs (%s) instead of singularity container." % (args.tag))
    elif args.env == 'backup':
        if args.partition == 'lgrandi':
            raise Exception("Only singularity is supported on Midway3")
        if args.tag != 'development':
            raise ValueError('I\'m going to give you the latest container, you cannot choose a version!')
        batch_job = (JOB_HEADER
                     + "source /dali/lgrandi/strax/miniconda3/bin/activate strax"
                     + START_JUPYTER.format(jupyter=args.jupyter,
                                            notebook_dir=args.notebook_dir)
                     )
        # BUGFIX: this branch activates the backup conda environment on
        # /dali, not cvmfs -- the old message was copy-pasted from the
        # cvmfs branch above.
        print_flush("Using the backup conda environment "
                    "(/dali/lgrandi/strax/miniconda3) instead of singularity container.")

    # The kicp partition uses its own qos name; elsewhere qos == partition.
    if args.partition == 'kicp':
        qos = 'xenon1t-kicp'
    else:
        qos = args.partition

    url = None
    url_cache_fn = osp.join(
        HOME[args.partition],
        '.last_jupyter_url')
    username = os.environ['USER']

    # Check if a job is already running
    q = subprocess.check_output(['squeue', '-u', username])
    jobs = [line for line in q.decode().splitlines() if 'straxlab' in line]
    job_ids = [int(job.split()[0]) for job in jobs]
    # Suffix new sbatch/log filenames so they don't clash with the files
    # of a job that is still running.
    unique_id = '' if len(job_ids) == 0 else '_' + get_unique_id()

    if job_ids:
        print_flush("You still have running straxlab jobs with ids [%s]!"
                    % ",".join([str(id) for id in job_ids]))
    for job_id in job_ids:
        if not args.force_new:
            print_flush("\tTrying to retrieve the URL for job %d from " % job_id + url_cache_fn)
            print_flush("\tIf it doesn't work, login and cancel your job "
                        "so we can start a new one.")
            # ROBUSTNESS: the open() is now inside the try, so a missing
            # cache file (never written before) no longer crashes with an
            # unhandled FileNotFoundError.
            try:
                with open(url_cache_fn) as f:
                    cached_job_id, cached_url = f.read().split()
            except Exception as e:
                print_flush("\tProblem reading cache file! " + str(e))
                print_flush("\tWell, we can still start a new job...")
            else:
                if int(cached_job_id) == job_id:
                    url = cached_url
                else:
                    print_flush("\t... Unfortunately the cache file refers "
                                "to a different job, id %s" % cached_job_id)
        if url is not None:
            break
    else:
        # for/else: no cached URL was recovered (or no job was running at
        # all), so submit a fresh jupyter job.
        print_flush("Submitting a new jupyter job")
        _want_to_make_reservation = (args.partition == 'xenon1t'
                                     and (not args.bypass_reservation))
        if args.ram > 16000 and _want_to_make_reservation:
            print_flush('You asked for more than 16 GB total memory you cannot use the notebook '
                        'reservation queue for this job! We will bypass the reservation.')
        if args.cpu >= 8 and _want_to_make_reservation:
            print_flush('You asked for more than 7 CPUs you cannot use the notebook reservation '
                        'queue for this job! We will bypass the reservation.')
        use_reservation = (
            (not args.force_new)
            and _want_to_make_reservation
            and args.cpu < 8
            and args.ram <= 16000
            and (not on_midway3)
        )
        job_fn = os.path.join(OUTPUT_DIR[args.partition], f'notebook{unique_id}.sbatch')
        if not args.force_new:
            log_fn = os.path.join(OUTPUT_DIR[args.partition], 'notebook.log')
        else:
            log_fn = os.path.join(OUTPUT_DIR[args.partition], f'notebook_forced{unique_id}.log')
        # Remove a stale logfile: its (re)appearance is the signal below
        # that the job has started.
        if os.path.exists(log_fn):
            os.remove(log_fn)
        with open(job_fn, mode='w', encoding='utf-8') as f:
            extra_header = (GPU_HEADER if args.gpu
                            else CPU_HEADER.format(partition=args.partition,
                                                   qos=qos,
                                                   reservation=('#SBATCH --reservation=xenon_notebook'
                                                                if use_reservation else '')))
            if args.node:
                extra_header += '\n#SBATCH --nodelist={node}'.format(node=args.node)
            if args.exclude_nodes:
                if args.exclude_nodes == 'lc':
                    # Get the list of 'lc' nodes
                    lc_nodes = _get_lc_nodes(args.partition)
                    # Convert the list of 'lc' nodes to a comma-separated string
                    exclude_nodes_str = ','.join(lc_nodes)
                    # Append the --exclude option to the extra_header
                    extra_header += '\n#SBATCH --exclude={exclude_nodes}'.format(exclude_nodes=exclude_nodes_str)
                    print(f"Excluding lc nodes: {exclude_nodes_str}")
                else:
                    extra_header += '\n#SBATCH --exclude={exclude_nodes}'.format(exclude_nodes=args.exclude_nodes)
            if args.max_hours is None:
                max_hours = 2 if args.gpu else 8
            else:
                max_hours = int(args.max_hours)
            f.write(batch_job.format(
                log_fn=log_fn,
                max_hours=max_hours,
                extra_header=extra_header,
                n_cpu=args.cpu,
                mem_per_cpu=int(args.ram / args.cpu)))
        make_executable(job_fn)
        print_flush("\tSubmitting sbatch %s" % job_fn)
        result = subprocess.check_output(['sbatch', job_fn])
        print_flush("\tsbatch returned: %s" % result)
        # sbatch prints 'Submitted batch job <id>'; the id is the last token.
        job_id = int(result.decode().split()[-1])
        print_flush("\tYou have job id %d" % job_id)
        print_flush("Waiting for your job to start")
        print_flush("\tLooking for logfile %s" % log_fn)
        while not osp.exists(log_fn):
            print_flush("\tstill waiting...")
            time.sleep(2)
        print_flush("Job started. Logfile is displayed below; "
                    "we're looking for the jupyter URL.")
        lines_shown = 0
        slept = 0
        url = None
        while url is None and slept < args.timeout:
            with open(log_fn, mode='r', encoding='utf-8') as f:
                content = f.readlines()
            for line_i, line in enumerate(content):
                if line_i >= lines_shown:
                    print_flush('\t' + line.rstrip())
                    lines_shown += 1
                    # Skip the container/registry URLs that also appear in
                    # the log; the jupyter URL is the last token on its line.
                    if 'http' in line and not any([excluded in line for excluded in ['sylabs', 'github.com']]):
                        url = line.split()[-1]
                        break
            else:
                # No URL in the log yet: wait and re-read.
                time.sleep(2)
                slept += 2

    if url is None:
        raise RuntimeError("Jupyter did not start inside your job!")
    print_flush("\nJupyter started succesfully")

    print_flush("\tDumping URL %s to cache file %s" % (url, url_cache_fn))
    with open(url_cache_fn, mode='w') as f:
        f.write(str(job_id) + ' ' + url + '\n')
    # The URL contains the access token, so keep the cache file private.
    os.chmod(url_cache_fn, stat.S_IRWXU)

    print_flush("\tParsing URL %s" % url)
    ip, port = url.split('/')[2].split(':')
    if 'token' in url:
        token = url.split('?')[1].split('=')[1]
        token = '?token=' + token
    else:
        token = ''

    # Check if many jobs are running
    q = subprocess.check_output(['squeue', '-u', username])
    jobs = [line for line in q.decode().splitlines() if 'straxlab' in line]
    job_ids = [int(job.split()[0]) for job in jobs]
    if len(job_ids) > 1:
        print_flush("\nPlease consider stopping remaining straxlab jobs:")
        for job in jobs:
            print_flush("\t" + job)
    print_flush(SUCCESS_MESSAGE.format(ip=ip, port=port, token=token, username=username))
def print_flush(x):
    """Write ``x`` to stdout followed by a newline and flush immediately
    (works on python 2.x as well)."""
    stream = sys.stdout
    stream.write(str(x) + '\n')
    stream.flush()
def get_unique_id():
    """Return a random six-letter lowercase identifier."""
    letters = choices(ascii_lowercase, k=6)
    return ''.join(letters)
def make_executable(path):
    """Set the executable bits on *path* for every class that can read it.

    Copies each read-permission bit (owner/group/other) onto the matching
    execute bit, mimicking ``chmod +x``.  The original docstring's
    reference ("see ") was truncated; the idiom is the standard
    stat-based +x recipe.

    Args:
        path (str): file whose permissions are modified in place.
    """
    mode = os.stat(path).st_mode
    mode |= (mode & 0o444) >> 2  # copy R bits to X
    os.chmod(path, mode)
# Script entry point: only run when executed directly, not on import.
if __name__ == '__main__':
    main()