From 4c2774f5b57aac4bac83b6e2a9b117ea0cea2a0e Mon Sep 17 00:00:00 2001 From: Jaron Krogel Date: Wed, 20 Mar 2019 07:27:27 -0400 Subject: [PATCH] nexus: modifications to handle dual nature of cori --- nexus/lib/machines.py | 90 +++++++++++++++++++++++++++++++------------ 1 file changed, 65 insertions(+), 25 deletions(-) diff --git a/nexus/lib/machines.py b/nexus/lib/machines.py index 514b30e513..81e2fc5ea4 100644 --- a/nexus/lib/machines.py +++ b/nexus/lib/machines.py @@ -225,6 +225,7 @@ def __init__(self, processes_per_node = None, email = None, constraint = None, # slurm specific, Cori + core_spec = None, # slurm specific, Cori alloc_flags = None, # lsf specific, Summit fake = False, ): @@ -271,6 +272,7 @@ def __init__(self, self.account = account self.email = email self.constraint = constraint + self.core_spec = core_spec self.alloc_flags = alloc_flags self.internal_id = None self.system_id = None @@ -1393,6 +1395,9 @@ def process_job(self,job): if job.fake_job: return #end if + + self.pre_process_job(job) + job.subfile = job.name+'.'+self.sub_launcher+'.in' no_cores = job.cores is None no_nodes = job.nodes is None @@ -1432,14 +1437,15 @@ def process_job(self,job): job.ppn = self.cores_per_node #end if - if job.account is None and self.requires_account: - if self.account is None: + if job.account is None: + if self.account is not None: + job.account = self.account + elif self.requires_account: self.error('account not specified for job on '+self.name) #end if - job.account = self.account #end if - self.process_job_extra(job) + self.post_process_job(job) job.set_environment(OMP_NUM_THREADS=job.threads) @@ -1494,16 +1500,21 @@ def process_job_options(self,job): p = '-o '+str(0), ) elif launcher=='jsrun': # Summit - None # Summit class takes care of this in process_job_extra + None # Summit class takes care of this in post_process_job else: self.error(launcher+' is not yet implemented as an application launcher') #end if #end def process_job_options - def process_job_extra(self,job): + def pre_process_job(self,job): + None + #end def pre_process_job + + + def post_process_job(self,job): None - #end def process_job_extra + #end def post_process_job def query_queue(self,out=None): @@ -2067,30 +2078,53 @@ class Edison(NerscMachine): class Cori(NerscMachine): name = 'cori' - def write_job_header(self,job): + def pre_process_job(self,job): if job.queue is None: job.queue = 'regular' #end if if job.constraint is None: - job.constraint = 'knl,quad,cache' + job.constraint = 'knl' #end if + # account for dual nature of Cori + if 'knl' in job.constraint: + self.nodes = 9688 + self.procs_per_node = 1 + self.cores_per_node = 68 + self.ram_per_node = 96 + elif 'haswell' in job.constraint: + self.nodes = 2388 + self.procs_per_node = 2 + self.cores_per_node = 32 + self.ram_per_node = 128 + else: + self.error('SLURM input "constraint" must contain either "knl" or "haswell"\nyou provided: {0}'.format(job.constraint)) + #end if + if job.core_spec is not None: + self.cores_per_node -= job.core_spec + #end if + #end def pre_process_job + + def write_job_header(self,job): + self.pre_process_job(job) # sync machine view with job if 'knl' in job.constraint: - cores_per_node = 68 hyperthreads = 4 elif 'haswell' in job.constraint: - cores_per_node = 32 hyperthreads = 2 else: self.error('SLURM input "constraint" must contain either "knl" or "haswell"\nyou provided: {0}'.format(job.constraint)) #end if + cpus_per_task = int(floor(float(self.cores_per_node)/job.processes_per_node))*hyperthreads c='#!/bin/bash\n' + if job.account is not None: + c+= '#SBATCH -A '+job.account+'\n' + #end if c+='#SBATCH -p '+job.queue+'\n' c+='#SBATCH -C '+str(job.constraint)+'\n' c+='#SBATCH -J '+str(job.name)+'\n' c+='#SBATCH -t '+job.sbatch_walltime()+'\n' c+='#SBATCH -N '+str(job.nodes)+'\n' - c+='#SBATCH --ntasks-per-node={0}\n'.format(job.processes_per_node) - c+='#SBATCH --cpus-per-task={0}\n'.format(int(floor(float(cores_per_node)/job.processes_per_node))*hyperthreads) + c+='#SBATCH --tasks-per-node={0}\n'.format(job.processes_per_node) + c+='#SBATCH --cpus-per-task={0}\n'.format(cpus_per_task) c+='#SBATCH -o '+job.outfile+'\n' c+='#SBATCH -e '+job.errfile+'\n' if job.user_env: @@ -2102,6 +2136,12 @@ def write_job_header(self,job): echo $SLURM_SUBMIT_DIR cd $SLURM_SUBMIT_DIR ''' + if job.threads>1: + c+=''' +export OMP_PROC_BIND=true +export OMP_PLACES=threads +''' + #end if return c #end def write_job_header #end class Cori @@ -2201,14 +2241,14 @@ class EOS(Supercomputer): requires_account = True batch_capable = True - def process_job_extra(self,job): + def post_process_job(self,job): if job.threads>1: if job.threads<=8: job.run_options.add(ss='-ss') #end if job.run_options.add(cc='-cc numa_node') #end if - #end def process_job_extra + #end def post_process_job def write_job_header(self,job): @@ -2248,7 +2288,7 @@ class ALCF_Machine(Supercomputer): base_partition = None - def process_job_extra(self,job): + def post_process_job(self,job): job.sub_options.add( env = '--env BG_SHAREDMEMSIZE=32', mode = '--mode script' @@ -2270,7 +2310,7 @@ def process_job_extra(self,job): elif job.processes_per_node not in valid_ppn: self.warn('job may not run properly\nprocesses_per_node is not a valid value for {0}\nprocesses_per_node provided: {1}\nvalid options are: {2}'.format(self.name,job.processes_per_node,valid_ppn)) #end if - #end def process_job_extra + #end def post_process_job def write_job_header(self,job): if job.queue is None: @@ -2316,7 +2356,7 @@ class Cooley(Supercomputer): outfile_extension = '.output' errfile_extension = '.error' - def process_job_extra(self,job): + def post_process_job(self,job): #if job.processes_per_node is None and job.threads!=1: # self.error('threads must be 1,2,3,4,6, or 12 on Cooley\nyou provided: {0}'.format(job.threads)) ##end if @@ -2326,7 +2366,7 @@ def process_job_extra(self,job): #ppn = '-ppn {0}'.format(job.processes_per_node), #) return - #end def process_job_extra + #end def post_process_job def write_job_header(self,job): if job.queue is None: @@ -2353,7 +2393,7 @@ class Theta(Supercomputer): outfile_extension = '.output' errfile_extension = '.error' - def process_job_extra(self,job): + def post_process_job(self,job): if job.hyperthreads is None: job.hyperthreads = 1 #end if @@ -2364,7 +2404,7 @@ def process_job_extra(self,job): j = '-j {0}'.format(job.hyperthreads), e = '-e OMP_NUM_THREADS={0}'.format(job.threads), ) - #end def process_job_extra + #end def post_process_job def write_job_header(self,job): if job.queue is None: @@ -2996,12 +3036,12 @@ class Cades(Supercomputer): requires_account = True batch_capable = True - def process_job_extra(self,job): + def post_process_job(self,job): if job.threads>1 and job.processes_per_node > 1: processes_per_socket = int(floor(job.processes_per_node/2)) job.run_options.add(npersocket='--npersocket {0}'.format(processes_per_socket)) #end if - #end def process_job_extra + #end def post_process_job def write_job_header(self,job): if job.queue is None: @@ -3039,7 +3079,7 @@ class Summit(Supercomputer): requires_account = True batch_capable = True - def process_job_extra(self,job): + def post_process_job(self,job): # add the options only if the user has not supplied options if len(job.run_options)==0: opt = obj( @@ -3084,7 +3124,7 @@ def process_job_extra(self,job): ) job.run_options.add(**opt) #end if - #end def process_job_extra + #end def post_process_job def write_job_header(self,job):