Skip to content

Commit

Permalink
apply patch from #96
Browse files Browse the repository at this point in the history
  • Loading branch information
mbhall88 committed Jul 27, 2023
1 parent e449ac6 commit 20b9c6b
Showing 1 changed file with 31 additions and 17 deletions.
48 changes: 31 additions & 17 deletions {{cookiecutter.profile_name}}/slurm-sidecar.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
#!/usr/bin/env python3
"""Run a Snakemake v7+ sidecar process for Slurm
This sidecar process will poll ``squeue --user [user] --format='%i,%T'``
every 60 seconds by default (use environment variable
``SNAKEMAKE_SLURM_SQUEUE_WAIT`` for adjusting this).
This sidecar process will poll ``squeue --me --format='%i,%T'`` every 60
seconds by default (use environment variable ``SNAKEMAKE_SLURM_SQUEUE_WAIT``
for adjusting this).
Note that you have to adjust the value to fit to your ``MinJobAge`` Slurm
configuration. Jobs remain at least ``MinJobAge`` seconds known to the
Expand Down Expand Up @@ -63,7 +63,7 @@ def __init__(
squeue_cmd,
squeue_timeout=2,
sleep_time=0.01,
max_tries=3,
max_tries=10,
*args,
**kwargs
):
Expand Down Expand Up @@ -103,7 +103,10 @@ def get_state(self, jobid):
"""Return the job state for the given jobid."""
jobid = str(jobid)
if jobid not in self.states:
self.states[jobid] = self._get_state_sacct(jobid)
try:
self.states[jobid] = self._get_state_sacct(jobid)
except:
return "__not_seen_yet__"
return self.states.get(jobid, "__not_seen_yet__")

def register_job(self, jobid):
Expand All @@ -122,17 +125,22 @@ def _get_state_sacct(self, jobid):
try:
logger.debug("Calling %s (try %d)", cmd, try_num)
output = subprocess.check_output(cmd, timeout=self.squeue_timeout, text=True)
break
except subprocess.TimeoutExpired as e:
logger.debug("Call to %s timed out (try %d of %d)", cmd, try_num, self.max_tries)
logger.warning("Call to %s timed out (try %d of %d)", cmd, try_num, self.max_tries)
continue
except subprocess.CalledProcessError as e:
logger.debug("Call to %s failed (try %d of %d)", cmd, try_num, self.max_tries)
if try_num >= self.max_tries:
raise Exception("Problem with call to %s" % cmd)
else:
parsed = {x.split("|")[0]: x.split("|")[1] for x in output.strip().split("\n")}
logger.debug("Returning state of %s as %s", jobid, parsed[jobid])
return parsed[jobid]
logger.warning("Call to %s failed (try %d of %d)", cmd, try_num, self.max_tries)
continue
try:
parsed = {x.split("|")[0]: x.split("|")[1] for x in output.strip().split("\n")}
logger.debug("Returning state of %s as %s", jobid, parsed[jobid])
return parsed[jobid]
except IndexError:
logger.warning("Could not parse %s (try %d of %d)", repr(output), try_num, self.max_tries)
secs = try_num / 2.0
logger.info("Sleeping %f seconds", secs)
time.sleep(secs)
raise Exception("Problem with call to %s" % cmd)

def stop(self):
"""Flag thread to stop execution"""
Expand All @@ -143,7 +151,7 @@ def _call_squeue(self, allow_failure=True):
"""Run the call to ``squeue``"""
cluster = CookieCutter.get_cluster_option()
try_num = 0
cmd = [SQUEUE_CMD, "--user={}".format(os.environ.get("USER")), "--format=%i,%T", "--state=all"]
cmd = [SQUEUE_CMD, "--me", "--format=%i,%T", "--state=all"]
if cluster:
cmd.append(cluster)
while try_num < self.max_tries:
Expand Down Expand Up @@ -209,6 +217,12 @@ def do_GET(self):
return
# Otherwise, query job ID status
job_id = self.path[len("/job/status/") :]
try:
logger.debug("CLEMENS:")
logger.debug(job_id)
job_id=job_id.split("%20")[3]
except IndexError:
pass
logger.debug("Querying for job ID %s" % repr(job_id))
status = self.server.poll_thread.get_state(job_id)
logger.debug("Status: %s" % status)
Expand Down Expand Up @@ -286,8 +300,8 @@ def main():
poll_thread = PollSqueueThread(SQUEUE_WAIT, SQUEUE_CMD, name="poll-squeue")
poll_thread.start()

# Initialize HTTP server that makes available the output of ``squeue --user [user]``
# in a controlled fashion.
# Initialize HTTP server that makes available the output of ``squeue --me`` in a
# controlled fashion.
http_server = JobStateHttpServer(poll_thread)
http_thread = threading.Thread(name="http-server", target=http_server.serve_forever)
http_thread.start()
Expand Down

0 comments on commit 20b9c6b

Please sign in to comment.