Skip to content

Commit

Permalink
create a build task for remote submission
Browse files Browse the repository at this point in the history
  • Loading branch information
zhaoyu committed Oct 12, 2023
1 parent 84a8fe0 commit 13238bf
Show file tree
Hide file tree
Showing 9 changed files with 476 additions and 23 deletions.
50 changes: 50 additions & 0 deletions config/bps_frdf.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
# BPS PanDA configuration for FrDF
# (French Data Facility at CC-IN2P3; indentation restored to valid YAML)

includeConfigs:
- ${CTRL_BPS_PANDA_DIR}/config/bps_panda.yaml

project: dev
campaign: quick
computeCloud: EU
computeSite: CC-IN2P3
s3EndpointUrl: "https://storage.googleapis.com"
payloadFolder: payload
fileDistributionEndPoint: "file://${LSST_RUN_TEMP_SPACE}/{operator}/panda_cache_box/{payloadFolder}/{uniqProcName}/"
# Fallback endpoint used when LSST_RUN_TEMP_SPACE is not defined
fileDistributionEndPointDefault: "file:///sps/lsst/users/lsstgrid/{operator}/panda_cache_box/{payloadFolder}/{uniqProcName}/"

# location of main butler repo at USDF
payload:
  butlerConfig: panda-test-med-1

# Job environment setup
custom_lsst_setup: ""
setupLSSTEnv: >
  unset PYTHONPATH;
  source /cvmfs/sw.lsst.eu/linux-x86_64/lsst_distrib/{LSST_VERSION}/loadLSST.bash;
  setup lsst_distrib;
  {custom_lsst_setup}
# Other job variables
jobInitDir: "`pwd`"
jobLogDir: "{jobInitDir}"
jobContainer: >
  /bin/bash -c "{payloadCommand}" >&2;
jobCleanup: "rm -fr EXEC_REPO-*;"


# Specify memory request for executionButler, pipetaskInit and forcedPhotCoadd, placeholder for now
requestMemory: 2048 # PanDA does the scheduling based on memory request
executionButler:
  requestMemory: 7000
  queue: "CC-IN2P3_Rubin_Merge"

finalJob:
  requestMemory: 7000
  queue: "CC-IN2P3_Rubin_Merge"

pipetask:
  pipetaskInit:
    requestMemory: 4000

  forcedPhotCoadd:
    requestMemory: 4000
49 changes: 49 additions & 0 deletions config/bps_panda_DF.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
# PanDA site common configuration for all Data Facilities
# --compute-site is necessary to use this yaml for bps submit
# (indentation restored to valid YAML)

includeConfigs:
- ${CTRL_BPS_PANDA_DIR}/config/bps_panda.yaml

project: dev
campaign: quick
s3EndpointUrl: "https://storage.googleapis.com"
payloadFolder: payload
fileDistributionEndPoint: "file://${LSST_RUN_TEMP_SPACE}/{operator}/panda_cache_box/{payloadFolder}/{uniqProcName}/"
# Fallback endpoint used when LSST_RUN_TEMP_SPACE is not defined
fileDistributionEndPointDefault: "file:///sps/lsst/users/lsstgrid/{operator}/panda_cache_box/{payloadFolder}/{uniqProcName}/"

# location of main butler repo at USDF
payload:
  butlerConfig: panda-test-med-1

# Job environment setup
custom_lsst_setup: ""
setupLSSTEnv: >
  unset PYTHONPATH;
  source /cvmfs/sw.lsst.eu/linux-x86_64/lsst_distrib/{LSST_VERSION}/loadLSST.bash;
  setup lsst_distrib;
  {custom_lsst_setup}
# Other job variables
jobInitDir: "`pwd`"
jobLogDir: "{jobInitDir}"
jobContainer: >
  /bin/bash -c "{payloadCommand}" >&2;
jobCleanup: "rm -fr EXEC_REPO-*;"


# Specify memory request for executionButler, pipetaskInit and forcedPhotCoadd, placeholder for now
requestMemory: 2048 # PanDA does the scheduling based on memory request
executionButler:
  requestMemory: 7000
  queue: "Rubin_Merge"

finalJob:
  requestMemory: 7000
  queue: "Rubin_Merge"

pipetask:
  pipetaskInit:
    requestMemory: 4000

  forcedPhotCoadd:
    requestMemory: 4000
28 changes: 28 additions & 0 deletions config/bps_remote.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
# Settings for running the BPS "build" step as a remote PanDA task
# instead of on the submission host (indentation restored to valid YAML).
remoteBuild:
  enabled: true
  requestMemory: 4096
  # Shell command executed on the build node: sets up the LSST stack,
  # configures PanDA/iDDS authentication (OIDC token when PANDA_AUTH_DIR
  # and PANDA_AUTH_ORIGIN are provided, x509 proxy otherwise), then runs
  # the command-line decoder with the download/build placeholders that the
  # plugin substitutes at submission time.
  runnerCommand: >
    export SHELL=/bin/bash;
    unset PYTHONPATH;
    source /cvmfs/sw.lsst.eu/linux-x86_64/lsst_distrib/{LSST_VERSION}/loadLSST.bash;
    pwd;ls -al;
    setup lsst_distrib;
    echo "setup tokens";
    if [[ ! -z "${PANDA_AUTH_DIR}" ]] && [[ ! -z "${PANDA_AUTH_ORIGIN}" ]];
    then export PANDA_AUTH_ID_TOKEN=$(cat $PANDA_AUTH_DIR);
    export PANDA_AUTH_VO=$PANDA_AUTH_ORIGIN;
    export IDDS_OIDC_TOKEN=$(cat $PANDA_AUTH_DIR);
    export IDDS_VO=$PANDA_AUTH_ORIGIN;
    export PANDA_AUTH=oidc;
    else unset PANDA_AUTH;
    export IDDS_AUTH_TYPE=x509_proxy; fi;
    export PANDA_CONFIG_ROOT=$(pwd);
    export PANDA_VERIFY_HOST=off;
    export PANDA_SYS=$CONDA_PREFIX;
    export PANDA_URL_SSL=${PANDA_SERVER_URL}/server/panda;
    export PANDACACHE_URL=$PANDA_URL_SSL;
    export PANDA_URL=$PANDA_URL_SSL;
    export PANDA_BEHIND_REAL_LB=true;
    {custom_lsst_setup}
    python3 ${CTRL_BPS_PANDA_DIR}/python/lsst/ctrl/bps/panda/edgenode/build_cmd_line_decoder.py \
    _download_cmd_line_ _build_cmd_line_ _compute_site_
51 changes: 51 additions & 0 deletions config/bps_ukdf.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
# BPS PanDA configuration for UKDF
# (UK Data Facility at Lancaster; indentation restored to valid YAML)

includeConfigs:
- ${CTRL_BPS_PANDA_DIR}/config/bps_panda.yaml

project: dev
campaign: quick
computeCloud: EU
computeSite: LANCS
s3EndpointUrl: "https://storage.googleapis.com"
payloadFolder: payload
fileDistributionEndPoint: "file://${LSST_RUN_TEMP_SPACE}/{operator}/panda_cache_box/{payloadFolder}/{uniqProcName}/"
# Fallback endpoint used when LSST_RUN_TEMP_SPACE is not defined
fileDistributionEndPointDefault: "file:///cephfs/pool/rubin/{operator}/panda_cache_box/{payloadFolder}/{uniqProcName}/"

# location of main butler repo at USDF
payload:
  butlerConfig: panda-test-med-1

# Job environment setup
custom_lsst_setup: ""
setupLSSTEnv: >
  export SHELL=/bin/bash;
  unset PYTHONPATH;
  source /cvmfs/sw.lsst.eu/linux-x86_64/lsst_distrib/{LSST_VERSION}/loadLSST.bash;
  setup lsst_distrib;
  {custom_lsst_setup}
# Other job variables
jobInitDir: "`pwd`"
jobLogDir: "{jobInitDir}"
jobContainer: >
  /bin/bash -c "{payloadCommand}" >&2;
jobCleanup: "rm -fr EXEC_REPO-*;"


# Specify memory request for executionButler, pipetaskInit and forcedPhotCoadd, placeholder for now
requestMemory: 2048 # PanDA does the scheduling based on memory request
executionButler:
  requestMemory: 7000
  queue: "LANCS_Rubin_Merge"

finalJob:
  requestMemory: 7000
  queue: "LANCS_Rubin_Merge"

pipetask:
  pipetaskInit:
    requestMemory: 4000

  forcedPhotCoadd:
    requestMemory: 4000
12 changes: 11 additions & 1 deletion python/lsst/ctrl/bps/panda/cmd_line_embedder.py
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,17 @@ def substitute_command_line(self, cmd_line, lazy_vars, job_name, gwfiles):
cmd_line = self.replace_static_files(cmd_line, gwfiles)
file_name = job_name + self.attach_pseudo_file_params(actual_lazy_vars)

if len(file_name) > PANDA_MAX_LEN_INPUT_FILE:
max_char = "IDDS_MAX_NAME_LENGTH"
panda_url = "PANDA_URL"
is_usdf = False
PANDA_MAX_DB_VCHAR = PANDA_MAX_LEN_INPUT_FILE
if panda_url in os.environ:
PANDA_instance = os.environ[panda_url]

Check warning on line 199 in python/lsst/ctrl/bps/panda/cmd_line_embedder.py

View check run for this annotation

Codecov / codecov/patch

python/lsst/ctrl/bps/panda/cmd_line_embedder.py#L199

Added line #L199 was not covered by tests
if "doma" not in PANDA_instance:
is_usdf = True

Check warning on line 201 in python/lsst/ctrl/bps/panda/cmd_line_embedder.py

View check run for this annotation

Codecov / codecov/patch

python/lsst/ctrl/bps/panda/cmd_line_embedder.py#L201

Added line #L201 was not covered by tests
if max_char in os.environ and is_usdf:
PANDA_MAX_DB_VCHAR = int(os.environ[max_char])

Check warning on line 203 in python/lsst/ctrl/bps/panda/cmd_line_embedder.py

View check run for this annotation

Codecov / codecov/patch

python/lsst/ctrl/bps/panda/cmd_line_embedder.py#L203

Added line #L203 was not covered by tests
if len(file_name) > PANDA_MAX_DB_VCHAR:
_LOG.error(f"Too long pseudo input filename: {file_name}")
raise RuntimeError(
f"job pseudo input file name contains more than {PANDA_MAX_LEN_INPUT_FILE} symbols. Aborting."
Expand Down
2 changes: 1 addition & 1 deletion python/lsst/ctrl/bps/panda/conf_example/test_usdf.yaml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# An example bps submission yaml
# Need to setup USDF before submitting the yaml
# source setup_panda.sh w_2022_32
# source setup_lsst_panda.sh w_2022_32

LSST_VERSION: w_2022_32

Expand Down
133 changes: 133 additions & 0 deletions python/lsst/ctrl/bps/panda/edgenode/build_cmd_line_decoder.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
#!/usr/bin/python
"""
The module is needed to decode the command line string sent from the BPS
plugin -> PanDA -> Edge node cluster management
-> Edge node -> Container. This file is not a part
of the BPS but a part of the payload wrapper.
It decodes the hexified command line.
"""
# import base64
import datetime
import logging
import os
import sys
import tarfile

from lsst.ctrl.bps.bps_utils import _create_execution_butler
from lsst.ctrl.bps.constants import DEFAULT_MEM_FMT, DEFAULT_MEM_UNIT
from lsst.ctrl.bps.drivers import prepare_driver
from lsst.ctrl.bps.panda.constants import PANDA_DEFAULT_MAX_COPY_WORKERS
from lsst.ctrl.bps.panda.utils import copy_files_for_distribution, get_idds_client
from lsst.utils.timer import time_this

logging.basicConfig(
stream=sys.stdout,
level=logging.INFO,
format="%(asctime)s\t%(threadName)s\t%(name)s\t%(levelname)s\t%(message)s",
)

_LOG = logging.getLogger(__name__)


def download_extract_archive(filename):
    """Download the submission tarball from pandacache and unpack it.

    Parameters
    ----------
    filename : `str`
        Name of the gzipped tarball in pandacache, or a full ``https:`` URL
        to it (in which case the cache endpoint is derived from the URL).

    Raises
    ------
    RuntimeError
        If the file could not be downloaded after all retry attempts.
    """
    archive_basename = os.path.basename(filename)
    target_dir = os.getcwd()
    full_output_filename = os.path.join(target_dir, archive_basename)

    # Derive PANDACACHE_URL from the download URL itself; otherwise fall
    # back to the PanDA server SSL endpoint when only that is configured.
    if filename.startswith("https:"):
        panda_cache_url = os.path.dirname(os.path.dirname(filename))
        os.environ["PANDACACHE_URL"] = panda_cache_url
    elif "PANDACACHE_URL" not in os.environ and "PANDA_URL_SSL" in os.environ:
        os.environ["PANDACACHE_URL"] = os.environ["PANDA_URL_SSL"]
    print("PANDACACHE_URL: %s" % os.environ.get("PANDACACHE_URL", None))

    # Imported here (not at module top) so that PANDACACHE_URL is already
    # set in the environment when the client reads its configuration.
    from pandaclient import Client

    attempt = 0
    max_attempts = 3
    done = False
    while attempt < max_attempts and not done:
        status, output = Client.getFile(archive_basename, output_path=full_output_filename)
        if status == 0:
            done = True
        # BUG FIX: the original never incremented ``attempt``, so a
        # persistent download failure spun in this loop forever instead of
        # giving up after ``max_attempts`` tries.
        attempt += 1
    print(f"Download archive file from pandacache status: {status}, output: {output}")
    if status != 0:
        raise RuntimeError("Failed to download archive file from pandacache")
    with tarfile.open(full_output_filename, "r:gz") as f:
        # NOTE(review): extractall on a downloaded archive; the tarball is
        # produced by the trusted BPS submission side -- confirm, and
        # consider the ``filter="data"`` argument on Python >= 3.12.
        f.extractall(target_dir)
    print(f"Extract {full_output_filename} to {target_dir}")
    os.remove(full_output_filename)
    print("Remove %s" % full_output_filename)


def create_idds_workflow(config_file, compute_site):
    """Build the pipeline workflow on the remote build node.

    Parameters
    ----------
    config_file : `str`
        Path to the BPS submission YAML inside the unpacked tarball.
    compute_site : `str` or `None`
        Optional compute site override forwarded to the BPS driver.

    Returns
    -------
    wms_workflow_config : `lsst.ctrl.bps.BpsConfig`
        The fully-populated BPS configuration.
    wms_workflow : `lsst.ctrl.bps.BaseWmsWorkflow`
        The prepared WMS workflow.
    """
    _LOG.info("Starting building process")
    # Forward the compute site only when one was actually supplied.
    driver_kwargs = {"compute_site": compute_site} if compute_site else {}
    timer_kwargs = dict(
        log=_LOG,
        level=logging.INFO,
        prefix=None,
        msg="Completed entire submission process",
        mem_usage=True,
        mem_unit=DEFAULT_MEM_UNIT,
        mem_fmt=DEFAULT_MEM_FMT,
    )
    with time_this(**timer_kwargs):
        config, workflow = prepare_driver(config_file, **driver_kwargs)
        _, when_create = config.search(".executionButler.whenCreate")
        # Only build the execution butler here when it is configured to be
        # created at submit time.
        if when_create.upper() == "SUBMIT":
            _, execution_butler_dir = config.search(".bps_defined.executionButlerDir")
            _LOG.info("Creating execution butler in '%s'", execution_butler_dir)
            with time_this(
                log=_LOG, level=logging.INFO, prefix=None, msg="Completed creating execution butler"
            ):
                _create_execution_butler(
                    config,
                    config["runQgraphFile"],
                    execution_butler_dir,
                    config["submitPath"],
                )
    return config, workflow


# Script entry point, executed on the remote build node.
#   argv[1] = pandacache filename/URL of the submission tarball
#   argv[2] = BPS config file inside the tarball
#   argv[3] = optional compute-site override

# download the submission tarball
remote_filename = sys.argv[1]
download_extract_archive(remote_filename)

# request_id and signature are added by iDDS for build task
request_id = os.environ.get("IDDS_BUILD_REQUEST_ID", None)
# NOTE(review): "IDDS_BUIL_SIGNATURE" reads like a typo of
# "IDDS_BUILD_SIGNATURE" -- confirm it matches the variable iDDS exports
# before renaming, since both sides must agree on the name.
signature = os.environ.get("IDDS_BUIL_SIGNATURE", None)
config_file = sys.argv[2]
sys_argv_length = len(sys.argv)
compute_site = sys.argv[3] if sys_argv_length > 3 else None

# Both identifiers are mandatory: bail out with a nonzero status so the
# build task is marked failed rather than silently proceeding.
if request_id is None:
    print("IDDS_BUILD_REQUEST_ID is not defined.")
    sys.exit(-1)
if signature is None:
    print("IDDS_BUIL_SIGNATURE is not defined")
    sys.exit(-1)

print(f"INFO: start {datetime.datetime.utcnow()}")
print(f"INFO: config file: {config_file}")
print(f"INFO: compute site: {compute_site}")

current_dir = os.getcwd()

print("INFO: current dir: %s" % current_dir)

# Build the BPS workflow from the unpacked submission and get the iDDS
# client-side representation of it.
config, bps_workflow = create_idds_workflow(config_file, compute_site)
idds_workflow = bps_workflow.idds_client_workflow

# Pre-stage input files to the distribution endpoint with a worker pool.
_, max_copy_workers = config.search("maxCopyWorkers", opt={"default": PANDA_DEFAULT_MAX_COPY_WORKERS})
copy_files_for_distribution(
    bps_workflow.files_to_pre_stage, config["fileDistributionEndPoint"], max_copy_workers
)

# Hand the fully-built workflow back to iDDS; the process exit status
# mirrors the first element of the client's return tuple.
idds_client = get_idds_client(config)
ret = idds_client.update_build_request(request_id, signature, idds_workflow)
print("update_build_request returns: %s" % str(ret))
sys.exit(ret[0])
Loading

0 comments on commit 13238bf

Please sign in to comment.