Skip to content

Commit

Permalink
Move execute_wait from vestigial LocalChannel into parsl.utils (#3705)
Browse files Browse the repository at this point in the history
# Description

This moves the execute_wait functionality previously provided by
LocalChannel into parsl.utils, as part of #3515.
LocalChannel.execute_wait did not reference `self` so it already
basically behaved as a function rather than a method.

This leaves LocalChannel as solely a place for script_directory, which
will be untangled in a subsequent PR.

# Changed Behaviour

none

## Type of change

- Code maintenance/cleanup
  • Loading branch information
benclifford authored Dec 3, 2024
1 parent 1f583af commit 90d4a86
Show file tree
Hide file tree
Showing 9 changed files with 78 additions and 115 deletions.
34 changes: 1 addition & 33 deletions parsl/channels/base.py
Original file line number Diff line number Diff line change
@@ -1,39 +1,7 @@
from abc import ABCMeta, abstractmethod, abstractproperty
from typing import Tuple
from abc import ABCMeta, abstractproperty


class Channel(metaclass=ABCMeta):
"""Channels are abstractions that enable ExecutionProviders to talk to
resource managers of remote compute facilities.
For certain resources such as campus clusters or supercomputers at
research laboratories, resource requirements may require authentication.
The only remaining Channel, *LocalChannel*, executes commands locally in a
shell.
Channels provide the ability to execute commands remotely, using the
execute_wait method, and manipulate the remote file system using methods
such as push_file, pull_file and makedirs.
Channels should ensure that each launched command runs in a new process
group, so that providers (such as LocalProvider) which terminate long
running commands using process groups can do so.
"""

@abstractmethod
def execute_wait(self, cmd: str, walltime: int = 0) -> Tuple[int, str, str]:
''' Executes the cmd, with a defined walltime.
Args:
- cmd (string): Command string to execute over the channel
- walltime (int) : Timeout in seconds
Returns:
- (exit_code, stdout, stderr) (int, string, string)
'''
pass

@abstractproperty
def script_dir(self) -> str:
''' This is a property. Returns the directory assigned for storing all internal scripts such as
Expand Down
35 changes: 0 additions & 35 deletions parsl/channels/local/local.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import logging
import os
import subprocess

from parsl.channels.base import Channel
from parsl.utils import RepresentationMixin
Expand All @@ -21,40 +20,6 @@ def __init__(self):
'''
self.script_dir = None

def execute_wait(self, cmd, walltime=None):
''' Synchronously execute a commandline string on the shell.
Args:
- cmd (string) : Commandline string to execute
- walltime (int) : walltime in seconds
Returns:
- retcode : Return code from the execution
- stdout : stdout string
- stderr : stderr string
'''
try:
logger.debug("Creating process with command '%s'", cmd)
proc = subprocess.Popen(
cmd,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
shell=True,
preexec_fn=os.setpgrp
)
logger.debug("Created process with pid %s. Performing communicate", proc.pid)
(stdout, stderr) = proc.communicate(timeout=walltime)
retcode = proc.returncode
logger.debug("Process %s returned %s", proc.pid, proc.returncode)

except Exception:
logger.exception(f"Execution of command failed:\n{cmd}")
raise
else:
logger.debug("Execution of command in process %s completed normally", proc.pid)

return (retcode, stdout.decode("utf-8"), stderr.decode("utf-8"))

@property
def script_dir(self):
return self._script_dir
Expand Down
3 changes: 2 additions & 1 deletion parsl/providers/cluster_provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from parsl.launchers.errors import BadLauncher
from parsl.providers.base import ExecutionProvider
from parsl.providers.errors import SchedulerMissingArgs, ScriptPathError
from parsl.utils import execute_wait

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -76,7 +77,7 @@ def execute_wait(self, cmd, timeout=None):
t = self.cmd_timeout
if timeout is not None:
t = timeout
return self.channel.execute_wait(cmd, t)
return execute_wait(cmd, t)

def _write_submit_script(self, template, script_filename, job_name, configs):
"""Generate submit script and write it to a file.
Expand Down
10 changes: 5 additions & 5 deletions parsl/providers/local/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
ScriptPathError,
SubmitException,
)
from parsl.utils import RepresentationMixin
from parsl.utils import RepresentationMixin, execute_wait

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -118,7 +118,7 @@ def status(self, job_ids):
return [self.resources[jid]['status'] for jid in job_ids]

def _is_alive(self, job_dict):
retcode, stdout, stderr = self.channel.execute_wait(
retcode, stdout, stderr = execute_wait(
'ps -p {} > /dev/null 2> /dev/null; echo "STATUS:$?" '.format(
job_dict['remote_pid']), self.cmd_timeout)
for line in stdout.split('\n'):
Expand Down Expand Up @@ -223,11 +223,11 @@ def submit(self, command, tasks_per_node, job_name="parsl.localprovider"):
# cancel the task later.
#
# We need to do the >/dev/null 2>&1 so that bash closes stdout, otherwise
# channel.execute_wait hangs reading the process stdout until all the
# execute_wait hangs reading the process stdout until all the
# background commands complete.
cmd = '/bin/bash -c \'echo - >{0}.ec && {{ {{ bash {0} 1>{0}.out 2>{0}.err ; ' \
'echo $? > {0}.ec ; }} >/dev/null 2>&1 & echo "PID:$!" ; }}\''.format(script_path)
retcode, stdout, stderr = self.channel.execute_wait(cmd, self.cmd_timeout)
retcode, stdout, stderr = execute_wait(cmd, self.cmd_timeout)
if retcode != 0:
raise SubmitException(job_name, "Launch command exited with code {0}".format(retcode),
stdout, stderr)
Expand Down Expand Up @@ -258,7 +258,7 @@ def cancel(self, job_ids):
job_dict['cancelled'] = True
logger.debug("Terminating job/process ID: {0}".format(job))
cmd = "kill -- -$(ps -o pgid= {} | grep -o '[0-9]*')".format(job_dict['remote_pid'])
retcode, stdout, stderr = self.channel.execute_wait(cmd, self.cmd_timeout)
retcode, stdout, stderr = execute_wait(cmd, self.cmd_timeout)
if retcode != 0:
logger.warning("Failed to kill PID: {} and child processes on {}".format(job_dict['remote_pid'],
self.label))
Expand Down
Empty file.
22 changes: 0 additions & 22 deletions parsl/tests/test_channels/test_large_output.py

This file was deleted.

19 changes: 0 additions & 19 deletions parsl/tests/test_channels/test_local_channel.py

This file was deleted.

35 changes: 35 additions & 0 deletions parsl/tests/test_utils/test_execute_wait.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
import pytest

from parsl.utils import execute_wait


@pytest.mark.local
def test_env():
''' Regression testing for issue #27
'''

rc, stdout, stderr = execute_wait("env", 1)

stdout = stdout.split('\n')
x = [s for s in stdout if s.startswith("PATH=")]
assert x, "PATH not found"

x = [s for s in stdout if s.startswith("HOME=")]
assert x, "HOME not found"


@pytest.mark.local
def test_large_output_2210():
"""Regression test for #2210.
execute_wait was hanging if the specified command gave too
much output, due to a race condition between process exiting and
pipes filling up.
"""

# this will output 128kb of stdout
execute_wait("yes | dd count=128 bs=1024", walltime=60)

# if this test fails, execute_wait should raise a timeout
# exception.

# The contents out the output is not verified by this test
35 changes: 35 additions & 0 deletions parsl/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -458,3 +458,38 @@ def sanitize_dns_subdomain_rfc1123(raw_string: str) -> str:
raise ValueError(f"Sanitized DNS subdomain is empty for input '{raw_string}'")

return sanitized


def execute_wait(cmd: str, walltime: Optional[int] = None) -> Tuple[int, str, str]:
''' Synchronously execute a commandline string on the shell.
Args:
- cmd (string) : Commandline string to execute
- walltime (int) : walltime in seconds
Returns:
- retcode : Return code from the execution
- stdout : stdout string
- stderr : stderr string
'''
try:
logger.debug("Creating process with command '%s'", cmd)
proc = subprocess.Popen(
cmd,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
shell=True,
preexec_fn=os.setpgrp
)
logger.debug("Created process with pid %s. Performing communicate", proc.pid)
(stdout, stderr) = proc.communicate(timeout=walltime)
retcode = proc.returncode
logger.debug("Process %s returned %s", proc.pid, proc.returncode)

except Exception:
logger.exception(f"Execution of command failed:\n{cmd}")
raise
else:
logger.debug("Execution of command in process %s completed normally", proc.pid)

return (retcode, stdout.decode("utf-8"), stderr.decode("utf-8"))

0 comments on commit 90d4a86

Please sign in to comment.