Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add class for scr_flush_file command #410

Merged
merged 6 commits into from
Jul 22, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions scripts/pyfe/pyfe/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -44,5 +44,6 @@ INSTALL(FILES ${CMAKE_CURRENT_BINARY_DIR}/scr_const.py
PERMISSIONS OWNER_EXECUTE OWNER_WRITE OWNER_READ GROUP_EXECUTE GROUP_READ WORLD_EXECUTE WORLD_READ
DESTINATION ${CMAKE_INSTALL_BINDIR}/pyfe/pyfe)

ADD_SUBDIRECTORY(cli)
ADD_SUBDIRECTORY(resmgr)
ADD_SUBDIRECTORY(joblauncher)
6 changes: 6 additions & 0 deletions scripts/pyfe/pyfe/cli/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
# Install the python wrappers for the SCR command line tools
# (scr_index, scr_flush_file) into the pyfe/cli package directory.
SET(SCRCLI
__init__.py
scr_index.py
scr_flush_file.py
)
INSTALL(FILES ${SCRCLI} DESTINATION ${CMAKE_INSTALL_BINDIR}/pyfe/pyfe/cli)
2 changes: 2 additions & 0 deletions scripts/pyfe/pyfe/cli/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
# Wrappers for SCR command line interface
The classes in this directory provide python interfaces around SCR command line tools.
2 changes: 2 additions & 0 deletions scripts/pyfe/pyfe/cli/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
from .scr_index import SCRIndex
from .scr_flush_file import SCRFlushFile
39 changes: 39 additions & 0 deletions scripts/pyfe/pyfe/cli/scr_flush_file.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
#! /usr/bin/env python3

import os

from pyfe.scr_common import runproc

class SCRFlushFile:
    """Python wrapper around the scr_flush_file command line tool.

    Each method runs the scr_flush_file executable from the SCR bin
    directory against the given prefix directory and parses its output.
    Commands are built as argument lists (as postrun.py already does with
    runproc(argv=[...])) rather than concatenated strings, so bindir or
    prefix paths containing spaces are passed through correctly.
    """

    def __init__(self, bindir, prefix):
        self.bindir = bindir  # path to SCR bin directory
        self.prefix = prefix  # path to SCR_PREFIX
        # base command: scr_flush_file --dir <prefix>
        self.exe = [os.path.join(bindir, "scr_flush_file"), "--dir", prefix]

    def list_dsets_output(self):
        """Return the list of output dataset ids, or [] on failure."""
        dsets, rc = runproc(argv=self.exe + ["--list-output"], getstdout=True)
        if rc == 0:
            return dsets.split()
        return []

    def list_dsets_ckpt(self, before=None):
        """Return the list of checkpoint dataset ids, or [] on failure.

        If before is given, restrict the list to checkpoints taken
        before that dataset id.
        """
        cmd = self.exe + ["--list-ckpt"]
        if before:
            # str() so integer dataset ids are accepted as well as strings
            cmd += ["--before", str(before)]
        dsets, rc = runproc(argv=cmd, getstdout=True)
        if rc == 0:
            return dsets.split()
        return []

    def need_flush(self, d):
        """Return True if dataset d still needs to be flushed."""
        rc = runproc(argv=self.exe + ["--need-flush", str(d)])[1]
        return rc == 0

    def name(self, d):
        """Return the name of dataset d, or None on failure."""
        name, rc = runproc(argv=self.exe + ["--name", str(d)], getstdout=True)
        if rc == 0:
            return name.rstrip()
        return None
21 changes: 21 additions & 0 deletions scripts/pyfe/pyfe/cli/scr_index.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
#! /usr/bin/env python3

import os

from pyfe.scr_common import runproc

class SCRIndex:
    """Python wrapper around the scr_index command line tool.

    Commands are built as argument lists (as postrun.py already does with
    runproc(argv=[...])) rather than concatenated strings, so bindir or
    prefix paths containing spaces are passed through correctly.
    """

    def __init__(self, bindir, prefix):
        self.bindir = bindir  # path to SCR bin directory
        self.prefix = prefix  # path to SCR_PREFIX
        # base command: scr_index --prefix <prefix>
        self.exe = [os.path.join(bindir, "scr_index"), "--prefix", prefix]

    def current(self, name):
        """Mark the named dataset as current; return True on success."""
        rc = runproc(argv=self.exe + ["--current", name])[1]
        return rc == 0

    def build(self, dset):
        """Inspect and rebuild files of the given dataset id; return True on success."""
        rc = runproc(argv=self.exe + ["--build", str(dset)])[1]
        return rc == 0
126 changes: 50 additions & 76 deletions scripts/pyfe/pyfe/postrun.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,12 @@
from pyfe.scr_scavenge import scr_scavenge
from pyfe.list_down_nodes import list_down_nodes
from pyfe.scr_glob_hosts import scr_glob_hosts
from pyfe.cli import SCRIndex, SCRFlushFile

def postrun(prefix_dir=None,scr_env=None,verbose=False):
if scr_env is None or scr_env.resmgr is None:
return 1

# if SCR is disabled, immediately exit
val = os.environ.get('SCR_ENABLE')
if val is not None and val=='0':
Expand All @@ -40,6 +42,9 @@ def postrun(prefix_dir=None,scr_env=None,verbose=False):
if pardir=='':
return 1

scr_index = SCRIndex(bindir, pardir)
scr_flush_file = SCRFlushFile(bindir, pardir)

# all parameters checked out, start normal output
print('scr_postrun: Started: '+str(datetime.now()))

Expand All @@ -52,6 +57,7 @@ def postrun(prefix_dir=None,scr_env=None,verbose=False):
return 1
os.environ['SCR_NODELIST'] = nodelist_env
scr_nodelist = os.environ.get('SCR_NODELIST')

# identify what nodes are still up
upnodes=scr_nodelist
downnodes = list_down_nodes(nodeset=upnodes,scr_env=scr_env)
Expand Down Expand Up @@ -85,47 +91,37 @@ def postrun(prefix_dir=None,scr_env=None,verbose=False):
# array to track datasets we got
succeeded = []

# scavenge all output sets in ascending order,
# track the id of the first one we fail to get
failed_dataset = None

# scavenge all output sets in ascending order
print('scr_postrun: Looking for output sets')
failed_dataset=0
argv = [bindir+'/scr_flush_file','--dir',pardir,'--list-output']
output_list, returncode = runproc(argv=argv,getstdout=True)
if returncode!=0:
dsets_output = scr_flush_file.list_dsets_output()
if not dsets_output:
print('scr_postrun: Found no output set to scavenge')
else:
argv.append('') # make len(argv) == 5
#### Need the format of the scr_flush_file output ####
#### This is just looping over characters ####
if '\n' in outputlist:
outputlist = outputlist.split('\n')
elif ',' in outputlist:
outputlist = outputlist.split(',')
for d in output_list:
for d in dsets_output:
# determine whether this dataset needs to be flushed
argv[3]='--need-flush'
argv[4]=d
returncode = runproc(argv=argv)[1]
if returncode!=0:
if not scr_flush_file.need_flush(d):
# dataset has already been flushed, go to the next one
print('scr_postrun: Dataset '+d+' has already been flushed')
print('scr_postrun: Dataset ' + d + ' has already been flushed')
continue
print('scr_postrun: Attempting to scavenge dataset '+d)
print('scr_postrun: Attempting to scavenge dataset ' + d)

# add $d to ATTEMPTED list
attempted.append(d)

# get dataset name
argv[3]='--name'
dsetname, returncode = runproc(argv=argv,getstdout=True)
if returncode!=0:
dsetname = scr_flush_file.name(d)
if not dsetname:
# got a dataset to flush, but failed to get name
print('scr_postrun: Failed to read name of dataset '+d)
failed_dataset=d
print('scr_postrun: Failed to read name of dataset ' + d)
failed_dataset = d
break

# build full path to dataset directory
datadir=pardir+'/.scr/scr.dataset.'+d
os.makedirs(datadir,exist_ok=True)
datadir = os.path.join(pardir, '.scr', 'scr.dataset.' + d)
os.makedirs(datadir, exist_ok=True)

# Gather files from cache to parallel file system
print('scr_postrun: Scavenging files from cache for '+dsetname+' to '+datadir)
Expand All @@ -139,65 +135,56 @@ def postrun(prefix_dir=None,scr_env=None,verbose=False):
# if not, don't update current marker
#update_current=1
print('scr_postrun: Checking that dataset is complete')
print(bindir+'/scr_index --prefix '+pardir+' --build '+d)
index_argv = [bindir+'/scr_index','--prefix',pardir,'--build',d]
returncode = runproc(argv=argv)[1]
if returncode!=0:
if not scr_index.build(d):
# failed to get dataset, stop trying for later sets
failed_dataset=d
failed_dataset = d
break

# remember that we scavenged this dataset in case we try again below
succeeded.append(d)
print('scr_postrun: Scavenged dataset '+dsetname+' successfully')
print('scr_postrun: Scavenged dataset ' + dsetname + ' successfully')

# check whether we have a dataset set to flush
print('scr_postrun: Looking for most recent checkpoint')
argv = [bindir+'/scr_flush_file','--dir',pardir,'--list-ckpt','--before',str(failed_dataset)]
ckpt_list, returncode = runproc(argv=argv,getstdout=True)
if returncode!=0:
dsets_ckpt = scr_flush_file.list_dsets_ckpt(before=failed_dataset)
if not dsets_ckpt:
print('scr_postrun: Found no checkpoint to scavenge')
else:
argv = [bindir+'/scr_flush_file','--dir',pardir,'--name','']
for d in ckpt_list:
for d in dsets_ckpt:
if d in attempted:
if d in succeeded:
# already got this one above, update current, and finish
argv[4] = d
dsetname, returncode = runproc(argv=argv,getstdout=True)
if returncode==0:
print('scr_postrun: Already scavenged checkpoint dataset '+d)
print('scr_postrun: Updating current marker in index to '+dsetname)
index_argv = [bindir+'/scr_index','--prefix',pardir,'--current',dsetname]
runproc(argv=index_argv)
ret=0
dsetname = scr_flush_file.name(d)
if dsetname:
print('scr_postrun: Already scavenged checkpoint dataset ' + d)
print('scr_postrun: Updating current marker in index to ' + dsetname)
scr_index.current(dsetname)
ret = 0
break
else:
# already tried and failed, skip this dataset
print('scr_postrun: Skipping checkpoint dataset '+d+', since already failed to scavenge')
print('scr_postrun: Skipping checkpoint dataset ' + d + ', since already failed to scavenge')
continue

# we have a dataset, check whether it still needs to be flushed

argv[3]='--need-flush'
argv[4]=d
returncode = runproc(argv=argv)[1]
if returncode!=0:
if not scr_flush_file.need_flush(d):
# found a dataset that has already been flushed, we can quit
print('scr_postrun: Checkpoint dataset '+d+' has already been flushed')
ret=0
print('scr_postrun: Checkpoint dataset ' + d + ' has already been flushed')
ret = 0
break
print('scr_postrun: Attempting to scavenge checkpoint dataset '+d)
print('scr_postrun: Attempting to scavenge checkpoint dataset ' + d)

# get dataset name
argv[3]='--name'
dsetname, returncode = runproc(argv=argv,getstdout=True)
if returncode!=0:
dsetname = scr_flush_file.name(d)
if not dsetname:
# got a dataset to flush, but failed to get name
print('scr_postrun: Failed to read name of checkpoint dataset '+d)
print('scr_postrun: Failed to read name of checkpoint dataset ' + d)
continue

# build full path to dataset directory
datadir=pardir+'/.scr/scr.dataset.'+d
os.makedirs(datadir,exist_ok=True)
datadir = os.path.join(pardir, '.scr', 'scr.dataset.' + d)
os.makedirs(datadir, exist_ok=True)

# Gather files from cache to parallel file system
print('scr_postrun: Scavenging files from cache for checkpoint '+dsetname+' to '+datadir)
Expand All @@ -208,26 +195,13 @@ def postrun(prefix_dir=None,scr_env=None,verbose=False):
print('scr_postrun: ERROR: Scavenge files from cache for '+dsetname+' to '+datadir)

# check that gathered set is complete,
# if not, don't update current marker
print('scr_postrun: Checking that dataset is complete')
print(bindir+'/scr_index --prefix '+pardir+' --build '+d)
argv = [bindir+'/scr_index','--prefix',pardir,'--build',d]
returncode = runproc(argv=argv)[1]
if returncode!=0:
# incomplete dataset, don't update current marker
#update_current=0
pass

# if the set is complete, update the current marker
else:
if scr_index.build(d):
# make the new current
print('scr_postrun: Updating current marker in index to '+dsetname)
argv[3]='--current'
argv[4]=dsetname
runproc(argv=argv)

# just completed scavenging this dataset, so quit
ret=0
print('scr_postrun: Updating current marker in index to ' + dsetname)
scr_index.current(dsetname)
ret = 0
break

# print the timing info
Expand Down
2 changes: 1 addition & 1 deletion scripts/pyfe/pyfe/scr_postrun.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,5 +28,5 @@
else:
scr_env = SCR_Env()
scr_env.resmgr = AutoResourceManager()
ret = postrun(prefix_dir=args['prefix'],verbose=args['verbose'])
ret = postrun(prefix_dir=args['prefix'], scr_env=scr_env, verbose=args['verbose'])
print(str(ret))