Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add class for scr_log_event command #411

Merged
merged 4 commits into from
Jul 23, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions scripts/pyfe/pyfe/cli/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,6 @@ SET(SCRCLI
__init__.py
scr_index.py
scr_flush_file.py
scr_log.py
)
INSTALL(FILES ${SCRCLI} DESTINATION ${CMAKE_INSTALL_BINDIR}/pyfe/pyfe/cli)
1 change: 1 addition & 0 deletions scripts/pyfe/pyfe/cli/__init__.py
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
from .scr_index import SCRIndex
from .scr_flush_file import SCRFlushFile
from .scr_log import SCRLog
7 changes: 4 additions & 3 deletions scripts/pyfe/pyfe/cli/scr_flush_file.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,14 @@

import os

from pyfe import scr_const
from pyfe.scr_common import runproc

class SCRFlushFile:
def __init__(self, bindir, prefix):
self.bindir = bindir # path to SCR bin directory
def __init__(self, prefix):
self.bindir = scr_const.X_BINDIR # path to SCR bin directory
self.prefix = prefix # path to SCR_PREFIX
self.exe = os.path.join(bindir, "scr_flush_file") + " --dir " + prefix
self.exe = os.path.join(self.bindir, "scr_flush_file") + " --dir " + prefix

# return list of output datasets
def list_dsets_output(self):
Expand Down
7 changes: 4 additions & 3 deletions scripts/pyfe/pyfe/cli/scr_index.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,14 @@

import os

from pyfe import scr_const
from pyfe.scr_common import runproc

class SCRIndex:
def __init__(self, bindir, prefix):
self.bindir = bindir # path to SCR bin directory
def __init__(self, prefix):
self.bindir = scr_const.X_BINDIR # path to SCR bin directory
self.prefix = prefix # path to SCR_PREFIX
self.exe = os.path.join(bindir, "scr_index") + " --prefix " + prefix
self.exe = os.path.join(self.bindir, "scr_index") + " --prefix " + prefix

# make named dataset as current
def current(self, name):
Expand Down
56 changes: 56 additions & 0 deletions scripts/pyfe/pyfe/cli/scr_log.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
#! /usr/bin/env python3

import os

from pyfe import scr_const
from pyfe.scr_common import runproc

class SCRLog:
  # Thin wrapper around the SCR logging command line tools
  # (scr_log_event / scr_log_transfer). Job-level metadata is captured
  # once at construction and attached to every logged event.
  def __init__(self, prefix, jobid, jobname=None, user=None, jobstart=None):
    self.bindir = scr_const.X_BINDIR # path to SCR bin directory

    self.prefix = prefix # path to SCR_PREFIX
    self.user = user # user owning the job, may be None
    self.jobid = jobid # allocation/job id, may be None
    self.jobname = jobname # optional job name
    self.jobstart = jobstart # job start time (epoch seconds), may be None

    # full paths to the logging executables
    self.exe_event = os.path.join(self.bindir, "scr_log_event")
    self.exe_transfer = os.path.join(self.bindir, "scr_log_transfer")
    # backward-compat alias: earlier versions misspelled this attribute
    self.eve_transfer = self.exe_transfer

  # log a single event via scr_log_event
  # event_type: event label, e.g. 'RUN_START', 'NODE_FAIL'
  # dset: dataset id, name: dataset name, start: event start time,
  # secs: event duration in seconds, note: free-form note string
  # returns True if scr_log_event exited with code 0, False otherwise
  def event(self, event_type, dset=None, name=None, start=None, secs=None, note=None):
    argv = [self.exe_event, '-p', self.prefix]

    # append each optional flag only when its value was provided
    options = [('-u', self.user),
               ('-i', self.jobid),
               ('-j', self.jobname),
               ('-s', self.jobstart),
               ('-T', event_type),
               ('-D', dset),
               ('-n', name),
               ('-S', start),
               ('-L', secs),
               ('-N', note)]
    for flag, value in options:
      if value is not None:
        argv.extend([flag, str(value)])

    # runproc returns (output, returncode); success is exit code 0
    rc = runproc(argv)[1]
    return (rc == 0)
37 changes: 13 additions & 24 deletions scripts/pyfe/pyfe/list_down_nodes.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,9 @@

# list_down_nodes.py

from time import time
from pyfe import scr_const, scr_common
from pyfe.list_dir import list_dir
from pyfe.scr_common import runproc, pipeproc, scr_prefix
from pyfe.scr_common import runproc, pipeproc

# mark any nodes specified on the command line
def remove_argument_excluded_nodes(nodes=[],nodeset_down=[]):
Expand All @@ -18,13 +17,11 @@ def remove_argument_excluded_nodes(nodes=[],nodeset_down=[]):

# The main scr_list_down_nodes method.
# this method takes an scr_env, the contained resource manager will determine which methods above to use
def list_down_nodes(reason=False, free=False, nodeset_down='', log_nodes=False, runtime_secs=None, nodeset=None, scr_env=None):
def list_down_nodes(reason=False, free=False, nodeset_down='', runtime_secs=None, nodeset=None, scr_env=None, log=None):
if scr_env is None or scr_env.resmgr is None or scr_env.param is None:
return 1
bindir = scr_const.X_BINDIR

start_time = str(int(time())) # epoch seconds as int to remove decimal, as string to be a parameter

# check that we have a nodeset before going any further
resourcemgr = scr_env.resmgr
if nodeset is None or len(nodeset)==0:
Expand All @@ -40,14 +37,6 @@ def list_down_nodes(reason=False, free=False, nodeset_down='', log_nodes=False,
# get list of nodes from nodeset
nodes = scr_env.resmgr.expand_hosts(nodeset)

# get prefix directory
prefix = scr_env.get_prefix()

# get jobid
jobid = resourcemgr.getjobid()
#if jobid == 'defjobid': # job id could not be determined
# print('Could not determine the job id') # the only place this is used here is in the logging below

### In each of the scr_list_down_nodes.in
### these nodes are marked as unavailable, and also removed from the list to log
### There is no use to keep track of them in the unavailable dictionary
Expand All @@ -66,23 +55,23 @@ def list_down_nodes(reason=False, free=False, nodeset_down='', log_nodes=False,
# TODO: read exclude list from a file, as well?

# print any failed nodes to stdout and exit with non-zero
if len(unavailable)>0:
if len(unavailable) > 0:
# log each newly failed node, along with the reason
if log_nodes:
# scr_common.log calls the external program: scr_log_event
# the method will also accept a dictionary (instead of a string)
# for the event_note argument, this moves the loop closer to the runproc call
scr_common.log(bindir=bindir, prefix=prefix, jobid=jobid, event_type='NODE_FAIL', event_note=unavailable, event_start=start_time, event_secs=runtime_secs)
if log:
for node in unavailable:
note = node + ": " + unavailable[node]
log.event('NODE_FAIL', note=note, secs=runtime_secs)

# now output info to the user
ret=''
if reason:
# list each node and the reason each is down
reasons = []
for node in unavailable:
ret += node+': '+unavailable[node]+'\n'
ret = ret[:-1] ### take off the final trailing newline (?)
reasons.append(node + ': ' + unavailable[node])
return "\n".join(reasons)
else:
# simply print the list of down node in range syntax
ret = scr_env.resmgr.compress_hosts(list(unavailable))
return ret
return scr_env.resmgr.compress_hosts(list(unavailable))

# otherwise, don't print anything and exit with 0
return 0
10 changes: 5 additions & 5 deletions scripts/pyfe/pyfe/postrun.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
from pyfe.scr_glob_hosts import scr_glob_hosts
from pyfe.cli import SCRIndex, SCRFlushFile

def postrun(prefix_dir=None,scr_env=None,verbose=False):
def postrun(prefix_dir=None, scr_env=None, verbose=False, log=None):
if scr_env is None or scr_env.resmgr is None:
return 1

Expand All @@ -42,8 +42,8 @@ def postrun(prefix_dir=None,scr_env=None,verbose=False):
if pardir=='':
return 1

scr_index = SCRIndex(bindir, pardir)
scr_flush_file = SCRFlushFile(bindir, pardir)
scr_index = SCRIndex(pardir)
scr_flush_file = SCRFlushFile(pardir)

# all parameters checked out, start normal output
print('scr_postrun: Started: '+str(datetime.now()))
Expand Down Expand Up @@ -126,7 +126,7 @@ def postrun(prefix_dir=None,scr_env=None,verbose=False):
# Gather files from cache to parallel file system
print('scr_postrun: Scavenging files from cache for '+dsetname+' to '+datadir)
print('scr_postrun: '+bindir+'/scr_scavenge '+('--verbose ' if verbose else '')+'--id '+d+' --from '+cntldir+' --to '+pardir+' --jobset '+scr_nodelist+' --up '+upnodes)
if scr_scavenge(nodeset_job=scr_nodelist, nodeset_up=upnodes, dataset_id=d, cntldir=cntldir, prefixdir=pardir, verbose=verbose, scr_env=scr_env)!=1:
if scr_scavenge(nodeset_job=scr_nodelist, nodeset_up=upnodes, dataset_id=d, cntldir=cntldir, prefixdir=pardir, verbose=verbose, scr_env=scr_env, log=log)!=1:
print('scr_postrun: Done scavenging files from cache for '+dsetname+' to '+datadir)
else:
print('scr_postrun: ERROR: Scavenge files from cache for '+dsetname+' to '+datadir)
Expand Down Expand Up @@ -189,7 +189,7 @@ def postrun(prefix_dir=None,scr_env=None,verbose=False):
# Gather files from cache to parallel file system
print('scr_postrun: Scavenging files from cache for checkpoint '+dsetname+' to '+datadir)
print('scr_postrun: '+bindir+'/scr_scavenge '+('--verbose ' if verbose else '')+'--id '+d+' --from '+cntldir+' --to '+pardir+' --jobset '+scr_nodelist+' --up '+upnodes)
if scr_scavenge(nodeset_job=scr_nodelist, nodeset_up=upnodes, dataset_id=d, cntldir=cntldir, prefixdir=pardir, verbose=verbose, scr_env=scr_env) != 1:
if scr_scavenge(nodeset_job=scr_nodelist, nodeset_up=upnodes, dataset_id=d, cntldir=cntldir, prefixdir=pardir, verbose=verbose, scr_env=scr_env, log=log) != 1:
print('scr_postrun: Done scavenging files from cache for '+dsetname+' to '+datadir)
else:
print('scr_postrun: ERROR: Scavenge files from cache for '+dsetname+' to '+datadir)
Expand Down
12 changes: 11 additions & 1 deletion scripts/pyfe/pyfe/scr_list_down_nodes.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
from pyfe.scr_environment import SCR_Env
from pyfe.scr_param import SCR_Param
from pyfe.resmgr import AutoResourceManager
from pyfe.cli import SCRLog

if __name__=='__main__':
parser = argparse.ArgumentParser(add_help=False, argument_default=argparse.SUPPRESS, prog='scr_list_down_nodes')
Expand All @@ -31,5 +32,14 @@
scr_env = SCR_Env()
scr_env.resmgr = AutoResourceManager()
scr_env.param = SCR_Param()
ret = list_down_nodes(reason=args['reason'], free=args['free'], nodeset_down=args['down'], log_nodes=args['log'], runtime_secs=args['secs'], nodeset=args['[nodeset]'], scr_env=scr_env)

# create log object if asked to log down nodes
log = None
if args['log']:
prefix = scr_env.get_prefix()
jobid = scr_env.resmgr.getjobid()
user = scr_env.get_user()
log = SCRLog(prefix, jobid, user=user)

ret = list_down_nodes(reason=args['reason'], free=args['free'], nodeset_down=args['down'], runtime_secs=args['secs'], nodeset=args['[nodeset]'], scr_env=scr_env, log=log)
print(str(ret),end='') ### remove trailing newlines?
29 changes: 14 additions & 15 deletions scripts/pyfe/pyfe/scr_run.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,14 @@
from pyfe.resmgr import AutoResourceManager
from pyfe.scr_param import SCR_Param
from pyfe.scr_glob_hosts import scr_glob_hosts
from pyfe.cli import SCRLog

# determine how many nodes are needed
def nodes_needed(scr_env, nodelist):
# if SCR_MIN_NODES is set, use that
num_needed = os.environ.get('SCR_MIN_NODES')
if num_needed is None or int(num_needed) <= 0:
# otherwise, use value in nodes file if one exists
# num_needed_env='$bindir/scr_env --prefix $prefix --runnodes'
num_needed = scr_env.get_runnode_count()
if num_needed <= 0:
# otherwise, assume we need all nodes in the allocation
Expand All @@ -48,16 +48,14 @@ def nodes_needed(scr_env, nodelist):

# return number of nodes left in allocation after excluding down nodes
def nodes_remaining(resmgr, nodelist, down_nodes):
# num_left='$bindir/scr_glob_hosts --count --minus $SCR_NODELIST:$down_nodes'
num_left = scr_glob_hosts(count=True, minus = nodelist + ':' + down_nodes, resmgr=resmgr)
if num_left is None:
return 0
return int(num_left)

# is there a halt condition instructing us to stop?
def should_halt(bindir, prefix):
#$bindir/scr_retries_halt --dir $prefix;
argv = [bindir + '/scr_retries_halt', '--dir', prefix]
argv = [os.path.join(bindir, 'scr_retries_halt'), '--dir', prefix]
returncode = runproc(argv=argv)[1]
return (returncode == 0)

Expand Down Expand Up @@ -109,6 +107,7 @@ def scr_run(launcher='',launcher_args=[],run_cmd='',restart_cmd='',restart_args=
launcher.hostfile = scr_env.get_prefix()+'/.scr/hostfile'
# jobid will come from resource manager.
jobid = resourcemgr.getjobid()
user = scr_env.get_user()

# TODO: check that we have a valid jobid and bail if not
# pmix always returns None
Expand Down Expand Up @@ -137,7 +136,7 @@ def scr_run(launcher='',launcher_args=[],run_cmd='',restart_cmd='',restart_args=
resourcemgr.usewatchdog(True)

# get the control directory
cntldir = list_dir(user=scr_env.get_user(), jobid=resourcemgr.getjobid(), runcmd='control', scr_env=scr_env, bindir=bindir)
cntldir = list_dir(user=user, jobid=resourcemgr.getjobid(), runcmd='control', scr_env=scr_env, bindir=bindir)
if cntldir == 1:
print(prog+': ERROR: Could not determine control directory')
sys.exit(1)
Expand All @@ -149,6 +148,8 @@ def scr_run(launcher='',launcher_args=[],run_cmd='',restart_cmd='',restart_args=
timestamp=datetime.now()
print(prog+': prerun: '+str(timestamp))

log = SCRLog(prefix, jobid, user=user, jobstart=start_secs)

# test runtime, ensure filepath exists,
if scr_prerun(prefix=prefix)!=0:
print(prog+': ERROR: Command failed: scr_prerun -p '+prefix)
Expand Down Expand Up @@ -202,7 +203,7 @@ def scr_run(launcher='',launcher_args=[],run_cmd='',restart_cmd='',restart_args=
down_nodes = ''
if down_nodes!='':
# print the reason for the down nodes, and log them
list_down_nodes(reason=True, free=free_flag, nodeset_down=down_nodes, log_nodes=True, runtime_secs='0', scr_env=scr_env)
list_down_nodes(reason=True, free=free_flag, nodeset_down=down_nodes, runtime_secs='0', scr_env=scr_env, log=log)

# if this is the first run, we hit down nodes right off the bat, make a record of them
if attempts==0:
Expand Down Expand Up @@ -248,14 +249,12 @@ def scr_run(launcher='',launcher_args=[],run_cmd='',restart_cmd='',restart_args=
launch_cmd = launcher_args.copy()
###### should this be split (' ') ?
launch_cmd.extend(my_restart_cmd.split(' '))

# launch the job, make sure we include the script node and exclude down nodes
start_secs=int(time())

scr_common.log(bindir=bindir, prefix=prefix, jobid=jobid, event_type='RUN_START', event_note='run='+str(attempts), event_start=str(start_secs))
# $bindir/scr_log_event -i $jobid -p $prefix -T "RUN_START" -N "run=$attempts" -S $start_secs
log.event('RUN_START', note='run=' + str(attempts))
print(prog + ': Launching ' + str(launch_cmd))
proc, pid = launcher.launchruncmd(up_nodes=nodelist,down_nodes=down_nodes,launcher_args=launch_cmd)
# $launcher $exclude $launch_cmd
if resourcemgr.usewatchdog() == False:
proc.wait(timeout=None)
else:
Expand All @@ -268,11 +267,11 @@ def scr_run(launcher='',launcher_args=[],run_cmd='',restart_cmd='',restart_args=
end_secs = int(time())
run_secs = end_secs - start_secs

# check for and log any down nodes
list_down_nodes(reason=True, nodeset_down=keep_down, log_nodes=True, runtime_secs=str(run_secs), scr_env=scr_env)
# log stats on the latest run attempt
scr_common.log(bindir=bindir, prefix=prefix, jobid=jobid, event_type='RUN_END', event_note='run='+str(attempts), event_start=str(end_secs), event_secs=str(run_secs))
#$bindir/scr_log_event -i $jobid -p $prefix -T "RUN_END" -N "run=$attempts" -S $end_secs -L $run_secs
log.event('RUN_END', note='run=' + str(attempts), secs=str(run_secs))

# check for and log any down nodes
list_down_nodes(reason=True, nodeset_down=keep_down, runtime_secs=str(run_secs), scr_env=scr_env, log=log)

# any retry attempts left?
if runs > 1:
Expand Down Expand Up @@ -300,7 +299,7 @@ def scr_run(launcher='',launcher_args=[],run_cmd='',restart_cmd='',restart_args=
print(prog + ': postrun: ' + str(timestamp))

# scavenge files from cache to parallel file system
if postrun(prefix_dir=prefix,scr_env=scr_env,verbose=verbose) != 0:
if postrun(prefix_dir=prefix, scr_env=scr_env, verbose=verbose, log=log) != 0:
print(prog+': ERROR: Command failed: scr_postrun -p '+prefix)

# make a record of end time
Expand Down
11 changes: 7 additions & 4 deletions scripts/pyfe/pyfe/scr_scavenge.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
import argparse
from datetime import datetime
from time import time
from pyfe import scr_const, scr_common
from pyfe import scr_const
from pyfe.scr_param import SCR_Param
from pyfe.scr_environment import SCR_Env
from pyfe.resmgr import AutoResourceManager
Expand All @@ -21,7 +21,7 @@

# check for pdsh / (clustershell) errors in case any nodes should be retried

def scr_scavenge(nodeset_job=None, nodeset_up='', nodeset_down='', dataset_id=None, cntldir=None, prefixdir=None, verbose=False, scr_env=None):
def scr_scavenge(nodeset_job=None, nodeset_up='', nodeset_down='', dataset_id=None, cntldir=None, prefixdir=None, verbose=False, scr_env=None, log=log):
# check that we have a nodeset for the job and directories to read from / write to
if nodeset_job is None or dataset_id is None or cntldir is None or prefixdir is None:
return 1
Expand Down Expand Up @@ -65,7 +65,8 @@ def scr_scavenge(nodeset_job=None, nodeset_up='', nodeset_down='', dataset_id=No
error = prefixdir+'/.scr/scr.dataset.'+dataset_id+'/scr_scavenge.pdsh.e'+jobid

# log the start of the scavenge operation
scr_common.log(bindir=bindir, prefix=prefixdir, jobid=jobid, event_type='SCAVENGE_START', event_dset=dataset_id, event_start=str(start_time))
if log:
log.event('SCAVENGE_START', dset=dataset_id)

print('scr_scavenge: '+str(int(time())))
# have the launcher class gather files via pdsh or clustershell
Expand Down Expand Up @@ -96,7 +97,9 @@ def scr_scavenge(nodeset_job=None, nodeset_up='', nodeset_down='', dataset_id=No
# get a timestamp for logging timing values
end_time = int(time())
diff_time = end_time - start_time
scr_common.log(bindir=bindir, prefix=prefixdir, jobid=jobid, event_type='SCAVENGE_END', event_dset=dataset_id, event_start=str(start_time), event_secs=str(diff_time))
if log:
log.event('SCAVENGE_END', dset=dataset_id, secs=diff_time)

return 0

if __name__=='__main__':
Expand Down