Skip to content

Commit

Permalink
Merge pull request #74 from lsst/tickets/DM-46307
Browse files Browse the repository at this point in the history
DM-46307: execute butler housekeeping scripts at remote DF for multisite processing
  • Loading branch information
mxk62 authored Dec 10, 2024
2 parents 77cd517 + a0023bb commit 28d841b
Show file tree
Hide file tree
Showing 11 changed files with 186 additions and 65 deletions.
4 changes: 2 additions & 2 deletions config/bps_frdf.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ computeCloud: EU
computeSite: CC-IN2P3
s3EndpointUrl: "https://storage.googleapis.com"
payloadFolder: payload
fileDistributionEndPoint: "file://${LSST_RUN_TEMP_SPACE}/{operator}/panda_cache_box/{payloadFolder}/{uniqProcName}/"
fileDistributionEndPointDefault: "file:///sps/lsst/users/lsstgrid/{operator}/panda_cache_box/{payloadFolder}/{uniqProcName}/"
fileDistributionEndPoint: "file://${LSST_RUN_TEMP_SPACE}/panda_cache_box/{payloadFolder}/{uniqProcName}/"
fileDistributionEndPointDefault: "file:///sps/lsst/users/lsstgrid/panda_cache_box/{payloadFolder}/{uniqProcName}/"

# location of main butler repo at USDF
payload:
Expand Down
6 changes: 1 addition & 5 deletions config/bps_panda_DF.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,7 @@ project: dev
campaign: quick
s3EndpointUrl: "https://storage.googleapis.com"
payloadFolder: payload
fileDistributionEndPoint: "${LSST_RUN_TEMP_SPACE}/panda_cache_box/{payloadFolder}/{uniqProcName}/"

# location of main butler repo at USDF
payload:
butlerConfig: panda-test-med-1
fileDistributionEndPoint: "file://${LSST_RUN_TEMP_SPACE}/panda_cache_box/{payloadFolder}/{uniqProcName}/"

# Job environment setup
custom_lsst_setup: ""
Expand Down
31 changes: 31 additions & 0 deletions config/bps_panda_cmd.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
# Shell command template for running a custom payload job on a worker node.
# It changes into the job's init directory, sets up the LSST environment,
# configures PanDA/iDDS authentication (OIDC token when PANDA_AUTH_DIR and
# PANDA_AUTH_ORIGIN are both set, x509 proxy otherwise), launches
# cmd_line_decoder.py in the background under prmon memory monitoring, waits
# for both, runs the cleanup step, and exits with the payload's return code.
# NOTE: this is a YAML folded scalar — every line below is part of the
# command string.  The {jobInitDir}, {setupLSSTEnv}, {logDir}, and
# {jobCleanup} placeholders are presumably substituted by the plugin before
# submission — confirm against the plugin code.
payloadCommand: >
  cd {jobInitDir};
  ls -al;
  {setupLSSTEnv}
  if [[ ! -z \"\${PANDA_AUTH_DIR}\" ]] && [[ ! -z \"\${PANDA_AUTH_ORIGIN}\" ]];
  then export PANDA_AUTH_ID_TOKEN=\$(cat $PANDA_AUTH_DIR);
  export PANDA_AUTH_VO=\$PANDA_AUTH_ORIGIN;
  export IDDS_OIDC_TOKEN=\$(cat \$PANDA_AUTH_DIR);
  export IDDS_VO=\$PANDA_AUTH_ORIGIN;
  export PANDA_AUTH=oidc;
  else unset PANDA_AUTH;
  export IDDS_AUTH_TYPE=x509_proxy; fi;
  export PANDA_CONFIG_ROOT=\$(pwd);
  export PANDA_VERIFY_HOST=off;
  export PANDA_SYS=\$CONDA_PREFIX;
  export PANDA_URL_SSL=\${PANDA_SERVER_URL}/server/panda;
  export PANDACACHE_URL=\$PANDA_URL_SSL;
  export PANDA_URL=\$PANDA_URL_SSL;
  export PANDA_BEHIND_REAL_LB=true;
  pwd;
  export RUBIN_ES_MAP_FILE=orderIdMapFilename;
  python3 \${CTRL_BPS_PANDA_DIR}/python/lsst/ctrl/bps/panda/edgenode/cmd_line_decoder.py _cmd_line_ & pJob=\$!;
  prmon -i 5
  -f ${logDir}/memory_monitor_output.txt
  -j ${logDir}/memory_monitor_summary.json
  -p \$pJob & mJob=\$!;
  wait \$pJob;
  ret=\$?;
  wait \$mJob;
  {jobCleanup}
  exit \$ret;
4 changes: 2 additions & 2 deletions config/bps_ukdf.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ computeCloud: EU
computeSite: LANCS
s3EndpointUrl: "https://storage.googleapis.com"
payloadFolder: payload
fileDistributionEndPoint: "file://${LSST_RUN_TEMP_SPACE}/{operator}/panda_cache_box/{payloadFolder}/{uniqProcName}/"
fileDistributionEndPointDefault: "file:///cephfs/pool/rubin/{operator}/panda_cache_box/{payloadFolder}/{uniqProcName}/"
fileDistributionEndPoint: "file://${LSST_RUN_TEMP_SPACE}/panda_cache_box/{payloadFolder}/{uniqProcName}/"
fileDistributionEndPointDefault: "file:///cephfs/pool/rubin/panda_cache_box/{payloadFolder}/{uniqProcName}/"

# location of main butler repo at USDF
payload:
Expand Down
4 changes: 2 additions & 2 deletions config/bps_usdf.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ computeCloud: US
computeSite: SLAC
s3EndpointUrl: "https://storage.googleapis.com"
payloadFolder: payload
fileDistributionEndPoint: "${LSST_RUN_TEMP_SPACE}/{operator}/panda_cache_box/{payloadFolder}/{uniqProcName}/"
fileDistributionEndPointDefault: "file:///sdf/data/rubin/panda_jobs/panda_cache/{operator}/panda_cache_box/{payloadFolder}/{uniqProcName}/"
fileDistributionEndPoint: "file://${LSST_RUN_TEMP_SPACE}/panda_cache_box/{payloadFolder}/{uniqProcName}/"
fileDistributionEndPointDefault: "file:///sdf/data/rubin/panda_jobs/panda_cache/panda_cache_box/{payloadFolder}/{uniqProcName}/"

# location of main butler repo at USDF
payload:
Expand Down
1 change: 1 addition & 0 deletions doc/changes/DM-46307.feature.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
These changes allow the plugin to execute jobs remotely without the necessity of using the remote build approach.
59 changes: 53 additions & 6 deletions doc/lsst.ctrl.bps.panda/userguide.rst
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ Defining a submission
BPS configuration files are YAML files with some reserved keywords and some
special features. See `BPS configuration file`__ for details.

.. Describe any plugin specific ascpects of a definiing a submissinon below if
.. Describe any plugin specific aspects of defining a submission below if
any.
The memory autoscaling is *not* supported by the ``ctrl_bps_panda``, i.e.,
Expand All @@ -56,7 +56,7 @@ will have not effect on workflows submitted with this plugin.
Authenticating
--------------

.. Describe any plugin specific ascpects of a authentication below if any.
.. Describe any plugin specific aspects of authentication below if any.
See https://panda.lsst.io for details.

Expand All @@ -67,18 +67,64 @@ Submitting a run

See `bps submit`_ and https://panda.lsst.io for details.

.. Describe any plugin specific ascpects of a submissinon below if any.
.. Describe any plugin specific aspects of a submission below if any.
.. __: https://pipelines.lsst.io/v/weekly/modules/lsst.ctrl.bps/quickstart.html#submitting-a-run

.. _panda-plugin-submitting-custom-script:

Submitting a custom script
--------------------------

See `bps submitcmd`_ for details.

.. Describe any plugin specific aspects of a submission below if any.
To execute custom scripts you need to specify the version of the LSST Stack to
use and include the settings from
``${CTRL_BPS_PANDA_DIR}/config/bps_panda_DF.yaml`` and
``${CTRL_BPS_PANDA_DIR}/config/bps_panda_cmd.yaml`` in your BPS config.

.. code-block:: yaml
LSST_VERSION: <stack_ver>
includeConfigs:
- ${CTRL_BPS_PANDA_DIR}/config/bps_panda_DF.yaml
- ${CTRL_BPS_PANDA_DIR}/config/bps_panda_cmd.yaml
customJob:
executable: <executable>
arguments: <args>
where ``<stack_ver>`` is the version of the LSST Stack to use while ``<executable>``
and ``<args>`` are respectively the script to run and arguments it takes.

By default, the script will be executed at USDF (SLAC). If you would like your
script to be executed at FrDF (CC-IN2P3), set ``computeSite`` and
``computeCloud`` in your BPS config to the values shown below:

.. code-block:: yaml
computeSite: "CC-IN2P3"
computeCloud: "EU"
To execute the script in UKDF, set ``computeSite`` to ``LANCS``.

.. note::

Alternatively, you can include ``bps_panda_frdf.yaml`` or
``bps_panda_ukdf.yaml`` instead of the ``bps_panda_DF.yaml`` which will set
the right ``computeSite`` (and ``computeCloud``) for you.

.. _panda-plugin-status:

Checking status
---------------

`bps report`_ is *not* supported, use the WMS commands/tools directly.

.. Describe any plugin specific ascpects of a checking submission status below
.. Describe any plugin specific aspects of a checking submission status below
if any.
.. _panda-plugin-cancelling:
Expand All @@ -88,7 +134,7 @@ Canceling submitted jobs

`bps cancel`_ is *not* supported, use the WMS commands/tools directly.

.. Describe any plugin specific ascpects of a canceling submitted jobs below
.. Describe any plugin specific aspects of a canceling submitted jobs below
if any.
.. _panda-plugin-restarting:
Expand All @@ -98,7 +144,7 @@ Restarting a failed run

`bps restart`_ is *not* supported, use the WMS commands/tools directly.

.. Describe any plugin specific ascpects of restarting a failed jobs below
.. Describe any plugin specific aspects of restarting a failed jobs below
if any.
.. .. _panda-plugin-troubleshooting:
Expand All @@ -111,4 +157,5 @@ Restarting a failed run
.. _bps report: https://pipelines.lsst.io/v/weekly/modules/lsst.ctrl.bps/quickstart.html#checking-status
.. _bps restart: https://pipelines.lsst.io/v/weekly/modules/lsst.ctrl.bps/quickstart.html#restarting-a-failed-run
.. _bps submit: https://pipelines.lsst.io/v/weekly/modules/lsst.ctrl.bps/quickstart.html#submitting-a-run
.. _bps submitcmd: https://pipelines.lsst.io/v/weekly/modules/lsst.ctrl.bps/quickstart.html#submitting-a-custom-script
.. _ctrl_bps: https://github.com/lsst/ctrl_bps.git
42 changes: 1 addition & 41 deletions python/lsst/ctrl/bps/panda/edgenode/build_cmd_line_decoder.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,11 @@
import logging
import os
import sys
import tarfile

from lsst.ctrl.bps.constants import DEFAULT_MEM_FMT, DEFAULT_MEM_UNIT
from lsst.ctrl.bps.drivers import prepare_driver
from lsst.ctrl.bps.panda.constants import PANDA_DEFAULT_MAX_COPY_WORKERS
from lsst.ctrl.bps.panda.utils import copy_files_for_distribution, get_idds_client
from lsst.ctrl.bps.panda.utils import copy_files_for_distribution, download_extract_archive, get_idds_client
from lsst.resources import ResourcePath
from lsst.utils.timer import time_this

Expand All @@ -29,45 +28,6 @@
_LOG = logging.getLogger(__name__)


def download_extract_archive(filename):
    """Download and extract the tarball from pandacache.

    The archive is downloaded into the current working directory, extracted
    there, and the downloaded tarball is removed afterwards.

    Parameters
    ----------
    filename : `str`
        The filename to download.  May be a full ``https:`` URL, in which
        case the pandacache endpoint is derived from it.

    Raises
    ------
    RuntimeError
        Raised if the archive could not be downloaded after all retry
        attempts.
    """
    archive_basename = os.path.basename(filename)
    target_dir = os.getcwd()
    full_output_filename = os.path.join(target_dir, archive_basename)

    # Determine the pandacache endpoint: an explicit https URL wins,
    # otherwise fall back to PANDA_URL_SSL when PANDACACHE_URL is unset.
    if filename.startswith("https:"):
        panda_cache_url = os.path.dirname(os.path.dirname(filename))
        os.environ["PANDACACHE_URL"] = panda_cache_url
    elif "PANDACACHE_URL" not in os.environ and "PANDA_URL_SSL" in os.environ:
        os.environ["PANDACACHE_URL"] = os.environ["PANDA_URL_SSL"]
    panda_cache_url = os.environ.get("PANDACACHE_URL", None)
    print(f"PANDACACHE_URL: {panda_cache_url}")

    # Deliberately imported after the environment is configured above —
    # presumably pandaclient reads PANDACACHE_URL at import/first use; keep
    # the ordering (NOTE(review): confirm against pandaclient docs).
    from pandaclient import Client

    # Retry the download a few times before giving up.  BUGFIX: the original
    # loop never incremented the attempt counter and raised on the first
    # failure, so max_attempts was dead code and no retry ever happened.
    max_attempts = 3
    status = None
    for attempt in range(1, max_attempts + 1):
        status, output = Client.getFile(archive_basename, output_path=full_output_filename)
        print(f"Download archive file from pandacache status: {status}, output: {output}")
        if status == 0:
            break
    if status != 0:
        raise RuntimeError("Failed to download archive file from pandacache")

    # Extract next to the download, then remove the tarball to save space.
    # NOTE(review): extractall() without a filter trusts the archive's
    # member paths; acceptable here only because the tarball comes from our
    # own pandacache — consider filter="data" on Python >= 3.12.
    with tarfile.open(full_output_filename, "r:gz") as f:
        f.extractall(target_dir)
    print(f"Extract {full_output_filename} to {target_dir}")
    os.remove(full_output_filename)
    print(f"Remove {full_output_filename}")


def create_idds_workflow(config_file, compute_site):
"""Create pipeline workflow at remote site.
Expand Down
9 changes: 9 additions & 0 deletions python/lsst/ctrl/bps/panda/edgenode/cmd_line_decoder.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import re
import sys

from lsst.ctrl.bps.panda.utils import download_extract_archive
from lsst.resources import ResourcePath


Expand Down Expand Up @@ -139,6 +140,14 @@ def deliver_input_files(src_path, files, skip_copy):
"""
files = files.split("+")
src_uri = ResourcePath(src_path, forceDirectory=True)

if "jobO" in skip_copy:
download_extract_archive(skip_copy)
for script in files:
file_name_placeholder, file_pfn = script.split(":")
os.chmod(file_pfn, 0o755)
return

for file in files:
file_name_placeholder, file_pfn = file.split(":")
if file_name_placeholder not in skip_copy.split("+"):
Expand Down
19 changes: 13 additions & 6 deletions python/lsst/ctrl/bps/panda/panda_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,20 +87,23 @@ def submit(self, workflow, **kwargs):
_LOG.info("Submitted into iDDs with request id=%s", request_id)
idds_build_workflow.run_id = request_id
return idds_build_workflow

else:
_, max_copy_workers = self.config.search(
"maxCopyWorkers", opt={"default": PANDA_DEFAULT_MAX_COPY_WORKERS}
)
# Docstring inherited from BaseWmsService.submit.
file_distribution_uri = self.config["fileDistributionEndPoint"]
lsst_temp = "LSST_RUN_TEMP_SPACE"
if lsst_temp in file_distribution_uri and lsst_temp not in os.environ:
file_distribution_uri = self.config["fileDistributionEndPointDefault"]
copy_files_for_distribution(
workflow.files_to_pre_stage,
ResourcePath(file_distribution_uri, forceDirectory=True),
max_copy_workers,
)

submit_cmd = workflow.run_attrs.get("bps_iscustom", False)
if not submit_cmd:
copy_files_for_distribution(
workflow.files_to_pre_stage,
ResourcePath(file_distribution_uri, forceDirectory=True),
max_copy_workers,
)

idds_client = get_idds_client(self.config)
ret = idds_client.submit(workflow.idds_client_workflow, username=None, use_dataset_name=False)
Expand Down Expand Up @@ -370,12 +373,16 @@ def __init__(self, name, config=None):
super().__init__(name, config)
self.files_to_pre_stage = {} # src, dest
self.idds_client_workflow = IDDS_client_workflow(name=name)
self.run_attrs = {}

@classmethod
def from_generic_workflow(cls, config, generic_workflow, out_prefix, service_class):
# Docstring inherited from BaseWmsWorkflow.from_generic_workflow.
wms_workflow = cls(generic_workflow.name, config)

if generic_workflow.run_attrs:
wms_workflow.run_attrs.update(generic_workflow.run_attrs)

files, dag_sink_work, task_count = add_idds_work(
config, generic_workflow, wms_workflow.idds_client_workflow
)
Expand Down
Loading

0 comments on commit 28d841b

Please sign in to comment.