Skip to content

Commit

Permalink
Add support for workflows to command sos status/kill/purge #1420
Browse files: browse the repository at this point in the history
  • Loading branch information
Bo Peng committed Dec 23, 2020
1 parent 4c2d7e2 commit 4398e4e
Show file tree
Hide file tree
Showing 8 changed files with 733 additions and 85 deletions.
269 changes: 203 additions & 66 deletions src/sos/__main__.py

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion src/sos/executor_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -366,7 +366,7 @@ def create_task(global_def, global_vars, task_stmt, task_params):
)
# if no output (thus no signature)
# temporarily create task signature to obtain sig_id
task_id = RuntimeInfo(
task_id = 't' + RuntimeInfo(
statementMD5([task_stmt]),
task_vars["_input"],
task_vars["_output"],
Expand Down
10 changes: 8 additions & 2 deletions src/sos/monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ def __init__(
)
# remove previous status file, which could be readonly if the job is killed
if os.path.isfile(self.pulse_file):
if not os.access(self.pulse_file, os.W_OK):
if not os.stat(self.pulse_file).st_mode & stat.S_IWUSR:
os.chmod(self.pulse_file, stat.S_IREAD | stat.S_IWRITE)
os.remove(self.pulse_file)
self.sos_dict = sos_dict
Expand Down Expand Up @@ -209,12 +209,18 @@ def _exceed_resource(self, msg):
p = psutil.Process(self.pid)
p.kill()

def write(self, msg):
    """Append one timestamped message line to the pulse file.

    The line written has the form ``#<timestamp>\\t<msg>\\n``, where
    ``<timestamp>`` is the value of ``time.time()`` taken while the file
    is open. The file at ``self.pulse_file`` is opened in append mode, so
    existing content is preserved.

    Args:
        msg: Text placed after the tab separator; it is interpolated
            into the line with ordinary f-string formatting.
    """
    with open(self.pulse_file, "a") as pulse:
        # The timestamp is taken after the file is opened, so a failed
        # open() never computes it.
        stamp = time.time()
        pulse.write(f"#{stamp}\t{msg}\n")

def run(self):
counter = 0
start_time = time.time()
while True:
try:
if not os.path.isfile(self.pulse_file) or not os.access(self.pulse_file, os.W_OK):
if not os.path.isfile(self.pulse_file) or not os.stat(self.pulse_file).st_mode & stat.S_IWUSR:
env.logger.warning(f"Workflow {self.workflow_id} ``aborted``")
# the job should be killed
p = psutil.Process(self.pid)
Expand Down
2 changes: 1 addition & 1 deletion src/sos/parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -834,7 +834,7 @@ def calc_md5(self, args):
for step in self.sections + self.auxiliary_sections:
sig.write(f"{step.step_name()}: {step.md5}\n")
sig.write(f"{args}\n")
return 'WF' + textMD5(sig.getvalue())[:14]
return 'w' + textMD5(sig.getvalue())[:16]

class SoS_ScriptContent:
"""A small class to record the script information to be used by nested
Expand Down
7 changes: 5 additions & 2 deletions src/sos/task_engines.py
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,8 @@ def run(self):
try:
# return creation time, start time, and duration
tid, tags, ct, st, dr, tst = line.split('\t')
if tid.startswith('w'):
continue
# for some reason on windows there can be a \r at the end
self.task_status[tid] = tst.strip()
self.task_info[tid]['date'] = [
Expand Down Expand Up @@ -607,10 +609,11 @@ def query_tasks(self,
status=None):
try:
return self.agent.check_output(
"{} status {} -v {} {} {} {} {} {}".format(
"{} status {} -v {} {} {} {} {} {} {}".format(
self.agent.config.get('sos', 'sos'),
'' if tasks is None else ' '.join(tasks),
verbosity,
'--all tasks' if check_all else '',
'--html' if html else '',
'--numeric-times' if numeric_times else '',
f'--age {age}' if age else '',
Expand Down Expand Up @@ -660,7 +663,7 @@ def kill_tasks(self, tasks, tags=None, all_tasks=False):
self.agent.config.get('sos',
'sos'), '' if all_tasks else ' '.join(tasks),
f'--tags {" ".join(tags)}' if tags else '',
'-a' if all_tasks else '')
'--all tasks' if all_tasks else '')

try:
ret = self.agent.check_output(cmd)
Expand Down
9 changes: 5 additions & 4 deletions src/sos/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ def __repr__(self):

class MasterTaskParams(TaskParams):
def __init__(self, num_workers=None):
self.ID = "M_0"
self.ID = "t0"
self.name = self.ID
self.global_def = ""
self.task = ""
Expand Down Expand Up @@ -212,7 +212,8 @@ def push(self, task_id, params):
self.task_stack.append([task_id, params])
self.tags = sorted(list(set(self.tags)))
#
self.ID = f"M{len(self.task_stack)}_{self.task_stack[0][0]}"
id_prefix = f't{len(self.task_stack)}'
self.ID = f"{id_prefix}{self.task_stack[0][0][:-(len(id_prefix))]}"
self.name = self.ID

def finalize(self):
Expand Down Expand Up @@ -1444,7 +1445,7 @@ def print_task_status(
]

if not all_tasks:
env.logger.info("No matching tasks are identified.")
env.logger.debug("No matching tasks are identified.")
return

raw_status = check_tasks([x[0] for x in all_tasks], check_all)
Expand Down Expand Up @@ -1930,7 +1931,7 @@ def kill_tasks(tasks, tags=None):
]

if not all_tasks:
env.logger.warning("No task to kill")
env.logger.debug("No task to kill")
return
all_tasks = sorted(list(set(all_tasks)))
# at most 20 threads
Expand Down
Loading

0 comments on commit 4398e4e

Please sign in to comment.