Skip to content

Commit

Permalink
Accept multi-token interchange launch commands (#3543)
Browse files Browse the repository at this point in the history
  • Loading branch information
rjmello authored Jul 30, 2024
1 parent 1652304 commit 64e163c
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 17 deletions.
12 changes: 6 additions & 6 deletions parsl/executors/high_throughput/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@
"--mpi-launcher={mpi_launcher} "
"--available-accelerators {accelerators}")

DEFAULT_INTERCHANGE_LAUNCH_CMD = "interchange.py"
DEFAULT_INTERCHANGE_LAUNCH_CMD = ["interchange.py"]

GENERAL_HTEX_PARAM_DOCS = """provider : :class:`~parsl.providers.base.ExecutionProvider`
Provider to access computation resources. Can be one of :class:`~parsl.providers.aws.aws.EC2Provider`,
Expand All @@ -78,9 +78,9 @@
cores_per_worker, nodes_per_block, heartbeat_period ,heartbeat_threshold, logdir). For example:
launch_cmd="process_worker_pool.py {debug} -c {cores_per_worker} --task_url={task_url} --result_url={result_url}"
interchange_launch_cmd : str
Custom command line string to launch the interchange process from the executor. If undefined,
the executor will use the default "interchange.py" command.
interchange_launch_cmd : Sequence[str]
Custom sequence of command line tokens to launch the interchange process from the executor. If
undefined, the executor will use the default "interchange.py" command.
address : string
An address to connect to the main Parsl process which is reachable from the network in which
Expand Down Expand Up @@ -238,7 +238,7 @@ def __init__(self,
label: str = 'HighThroughputExecutor',
provider: ExecutionProvider = LocalProvider(),
launch_cmd: Optional[str] = None,
interchange_launch_cmd: Optional[str] = None,
interchange_launch_cmd: Optional[Sequence[str]] = None,
address: Optional[str] = None,
worker_ports: Optional[Tuple[int, int]] = None,
worker_port_range: Optional[Tuple[int, int]] = (54000, 55000),
Expand Down Expand Up @@ -548,7 +548,7 @@ def _start_local_interchange_process(self) -> None:

config_pickle = pickle.dumps(interchange_config)

self.interchange_proc = subprocess.Popen(self.interchange_launch_cmd.encode("utf-8"), stdin=subprocess.PIPE)
self.interchange_proc = subprocess.Popen(self.interchange_launch_cmd, stdin=subprocess.PIPE)
stdin = self.interchange_proc.stdin
assert stdin is not None, "Popen should have created an IO object (vs default None) because of PIPE mode"

Expand Down
31 changes: 20 additions & 11 deletions parsl/tests/test_htex/test_htex.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import pathlib
import warnings
from subprocess import Popen, TimeoutExpired
from typing import Optional, Sequence
from unittest import mock

import pytest
Expand Down Expand Up @@ -139,13 +139,22 @@ def test_max_workers_per_node():


@pytest.mark.local
def test_htex_launch_cmd():
htex = HighThroughputExecutor()
assert htex.launch_cmd.startswith("process_worker_pool.py")
assert htex.interchange_launch_cmd == "interchange.py"

launch_cmd = "custom-launch-cmd"
ix_launch_cmd = "custom-ix-launch-cmd"
htex = HighThroughputExecutor(launch_cmd=launch_cmd, interchange_launch_cmd=ix_launch_cmd)
assert htex.launch_cmd == launch_cmd
assert htex.interchange_launch_cmd == ix_launch_cmd
@pytest.mark.parametrize("cmd", (None, "custom-launch-cmd"))
def test_htex_worker_pool_launch_cmd(cmd: Optional[str]):
if cmd:
htex = HighThroughputExecutor(launch_cmd=cmd)
assert htex.launch_cmd == cmd
else:
htex = HighThroughputExecutor()
assert htex.launch_cmd.startswith("process_worker_pool.py")


@pytest.mark.local
@pytest.mark.parametrize("cmd", (None, ["custom", "launch", "cmd"]))
def test_htex_interchange_launch_cmd(cmd: Optional[Sequence[str]]):
if cmd:
htex = HighThroughputExecutor(interchange_launch_cmd=cmd)
assert htex.interchange_launch_cmd == cmd
else:
htex = HighThroughputExecutor()
assert htex.interchange_launch_cmd == ["interchange.py"]

0 comments on commit 64e163c

Please sign in to comment.