Nexus: update for Cori #1463

Merged (2 commits) on Mar 20, 2019
nexus/lib/machines.py: 90 changes (65 additions, 25 deletions)
@@ -225,6 +225,7 @@ def __init__(self,
processes_per_node = None,
email = None,
constraint = None, # slurm specific, Cori
core_spec = None, # slurm specific, Cori
alloc_flags = None, # lsf specific, Summit
fake = False,
):
@@ -271,6 +272,7 @@ def __init__(self,
self.account = account
self.email = email
self.constraint = constraint
self.core_spec = core_spec
self.alloc_flags = alloc_flags
self.internal_id = None
self.system_id = None
@@ -1393,6 +1395,9 @@ def process_job(self,job):
if job.fake_job:
return
#end if

self.pre_process_job(job)

job.subfile = job.name+'.'+self.sub_launcher+'.in'
no_cores = job.cores is None
no_nodes = job.nodes is None
@@ -1432,14 +1437,15 @@ def process_job(self,job):
job.ppn = self.cores_per_node
#end if

if job.account is None and self.requires_account:
if self.account is None:
if job.account is None:
if self.account is not None:
job.account = self.account
elif self.requires_account:
self.error('account not specified for job on '+self.name)
#end if
job.account = self.account
#end if

self.process_job_extra(job)
self.post_process_job(job)

job.set_environment(OMP_NUM_THREADS=job.threads)
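The reworked account handling above lets a job fall back to the machine-level default account whenever one is defined, and the 'account not specified' error now fires only when the machine requires an account and neither the job nor the machine supplies one. A minimal sketch of the intended behavior, assuming the usual nexus job() interface (the account string is a placeholder):

from nexus import job

j  = job(nodes=1, threads=68)                    # no account given on the job
# process_job: j.account is None, so it is filled from the machine default
# when one is set; on a machine with requires_account=True and no default,
# process_job raises an error instead.
j2 = job(nodes=1, threads=68, account='mXXXX')   # an explicit account always wins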

@@ -1494,16 +1500,21 @@ def process_job_options(self,job):
p = '-o '+str(0),
)
elif launcher=='jsrun': # Summit
None # Summit class takes care of this in process_job_extra
None # Summit class takes care of this in post_process_job
else:
self.error(launcher+' is not yet implemented as an application launcher')
#end if
#end def process_job_options


def process_job_extra(self,job):
def pre_process_job(self,job):
None
#end def pre_process_job


def post_process_job(self,job):
None
#end def process_job_extra
#end def post_process_job


def query_queue(self,out=None):
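This hunk replaces the single process_job_extra hook with two overridable hooks: pre_process_job runs before the core/node bookkeeping in process_job, and post_process_job runs after it (the base-class versions are no-ops). A minimal sketch of how a machine subclass might use them; ExampleMachine and its settings are hypothetical:

from machines import Supercomputer   # as laid out in nexus/lib

class ExampleMachine(Supercomputer):
    name = 'example'

    def pre_process_job(self,job):
        # adjust the job/machine view before cores and nodes are reconciled
        if job.queue is None:
            job.queue = 'regular'
        #end if
    #end def pre_process_job

    def post_process_job(self,job):
        # add launcher options once cores, nodes, and ppn are settled
        if job.threads>1:
            job.run_options.add(cc='-cc numa_node')
        #end if
    #end def post_process_job
#end class ExampleMachine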
@@ -2067,30 +2078,53 @@ class Edison(NerscMachine):
class Cori(NerscMachine):
name = 'cori'

def write_job_header(self,job):
def pre_process_job(self,job):
if job.queue is None:
job.queue = 'regular'
#end if
if job.constraint is None:
job.constraint = 'knl,quad,cache'
job.constraint = 'knl'
#end if
# account for dual nature of Cori
if 'knl' in job.constraint:
self.nodes = 9688
self.procs_per_node = 1
self.cores_per_node = 68
self.ram_per_node = 96
elif 'haswell' in job.constraint:
self.nodes = 2388
self.procs_per_node = 2
self.cores_per_node = 32
self.ram_per_node = 128
else:
self.error('SLURM input "constraint" must contain either "knl" or "haswell"\nyou provided: {0}'.format(job.constraint))
#end if
if job.core_spec is not None:
self.cores_per_node -= job.core_spec
#end if
#end def pre_process_job

def write_job_header(self,job):
self.pre_process_job(job) # sync machine view with job
if 'knl' in job.constraint:
cores_per_node = 68
hyperthreads = 4
elif 'haswell' in job.constraint:
cores_per_node = 32
hyperthreads = 2
else:
self.error('SLURM input "constraint" must contain either "knl" or "haswell"\nyou provided: {0}'.format(job.constraint))
#end if
cpus_per_task = int(floor(float(self.cores_per_node)/job.processes_per_node))*hyperthreads
c='#!/bin/bash\n'
if job.account is not None:
c+= '#SBATCH -A '+job.account+'\n'
#end if
c+='#SBATCH -p '+job.queue+'\n'
c+='#SBATCH -C '+str(job.constraint)+'\n'
c+='#SBATCH -J '+str(job.name)+'\n'
c+='#SBATCH -t '+job.sbatch_walltime()+'\n'
c+='#SBATCH -N '+str(job.nodes)+'\n'
c+='#SBATCH --ntasks-per-node={0}\n'.format(job.processes_per_node)
c+='#SBATCH --cpus-per-task={0}\n'.format(int(floor(float(cores_per_node)/job.processes_per_node))*hyperthreads)
c+='#SBATCH --tasks-per-node={0}\n'.format(job.processes_per_node)
c+='#SBATCH --cpus-per-task={0}\n'.format(cpus_per_task)
c+='#SBATCH -o '+job.outfile+'\n'
c+='#SBATCH -e '+job.errfile+'\n'
if job.user_env:
@@ -2102,6 +2136,12 @@ def write_job_header(self,job):
echo $SLURM_SUBMIT_DIR
cd $SLURM_SUBMIT_DIR
'''
if job.threads>1:
c+='''
export OMP_PROC_BIND=true
export OMP_PLACES=threads
'''
#end if
return c
#end def write_job_header
#end class Cori
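Taken together, the Cori changes let the constraint select the KNL or Haswell view of the machine, let core_spec reserve cores per node for the OS, and compute cpus-per-task from the adjusted core count. A hedged example of a job specification and the header it should produce, assuming the usual nexus job() interface; the job name, account, walltime format, and output file names are placeholders:

from nexus import job

j = job(nodes=1, processes_per_node=4, threads=16, core_spec=4, hours=1)
# pre_process_job: constraint defaults to 'knl', cores_per_node = 68 - 4 = 64
# write_job_header: hyperthreads = 4, so
#   cpus_per_task = floor(64/4)*4 = 64
# Expected header (sketch):
#   #!/bin/bash
#   #SBATCH -p regular
#   #SBATCH -C knl
#   #SBATCH -J jobname
#   #SBATCH -t 01:00:00
#   #SBATCH -N 1
#   #SBATCH --tasks-per-node=4
#   #SBATCH --cpus-per-task=64
#   #SBATCH -o jobname.out
#   #SBATCH -e jobname.err
# and, since threads>1, the OMP_PROC_BIND=true / OMP_PLACES=threads exports
# are appended after the cd $SLURM_SUBMIT_DIR block.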
@@ -2201,14 +2241,14 @@ class EOS(Supercomputer):
requires_account = True
batch_capable = True

def process_job_extra(self,job):
def post_process_job(self,job):
if job.threads>1:
if job.threads<=8:
job.run_options.add(ss='-ss')
#end if
job.run_options.add(cc='-cc numa_node')
#end if
#end def process_job_extra
#end def post_process_job


def write_job_header(self,job):
@@ -2248,7 +2288,7 @@ class ALCF_Machine(Supercomputer):

base_partition = None

def process_job_extra(self,job):
def post_process_job(self,job):
job.sub_options.add(
env = '--env BG_SHAREDMEMSIZE=32',
mode = '--mode script'
@@ -2270,7 +2310,7 @@ def process_job_extra(self,job):
elif job.processes_per_node not in valid_ppn:
self.warn('job may not run properly\nprocesses_per_node is not a valid value for {0}\nprocesses_per_node provided: {1}\nvalid options are: {2}'.format(self.name,job.processes_per_node,valid_ppn))
#end if
#end def process_job_extra
#end def post_process_job

def write_job_header(self,job):
if job.queue is None:
@@ -2316,7 +2356,7 @@ class Cooley(Supercomputer):
outfile_extension = '.output'
errfile_extension = '.error'

def process_job_extra(self,job):
def post_process_job(self,job):
#if job.processes_per_node is None and job.threads!=1:
# self.error('threads must be 1,2,3,4,6, or 12 on Cooley\nyou provided: {0}'.format(job.threads))
##end if
@@ -2326,7 +2366,7 @@ def process_job_extra(self,job):
#ppn = '-ppn {0}'.format(job.processes_per_node),
#)
return
#end def process_job_extra
#end def post_process_job

def write_job_header(self,job):
if job.queue is None:
Expand All @@ -2353,7 +2393,7 @@ class Theta(Supercomputer):
outfile_extension = '.output'
errfile_extension = '.error'

def process_job_extra(self,job):
def post_process_job(self,job):
if job.hyperthreads is None:
job.hyperthreads = 1
#end if
@@ -2364,7 +2404,7 @@ def process_job_extra(self,job):
j = '-j {0}'.format(job.hyperthreads),
e = '-e OMP_NUM_THREADS={0}'.format(job.threads),
)
#end def process_job_extra
#end def post_process_job

def write_job_header(self,job):
if job.queue is None:
@@ -2996,12 +3036,12 @@ class Cades(Supercomputer):
requires_account = True
batch_capable = True

def process_job_extra(self,job):
def post_process_job(self,job):
if job.threads>1 and job.processes_per_node > 1:
processes_per_socket = int(floor(job.processes_per_node/2))
job.run_options.add(npersocket='--npersocket {0}'.format(processes_per_socket))
#end if
#end def process_job_extra
#end def post_process_job

def write_job_header(self,job):
if job.queue is None:
@@ -3039,7 +3079,7 @@ class Summit(Supercomputer):
requires_account = True
batch_capable = True

def process_job_extra(self,job):
def post_process_job(self,job):
# add the options only if the user has not supplied options
if len(job.run_options)==0:
opt = obj(
@@ -3084,7 +3124,7 @@ def process_job_extra(self,job):
)
job.run_options.add(**opt)
#end if
#end def process_job_extra
#end def post_process_job


def write_job_header(self,job):