Skip to content

Commit

Permalink
create a build task for remote submission
Browse files Browse the repository at this point in the history
  • Loading branch information
zhaoyu committed Oct 4, 2023
1 parent cb68d0d commit 27c9ea1
Show file tree
Hide file tree
Showing 6 changed files with 400 additions and 24 deletions.
50 changes: 50 additions & 0 deletions config/bps_frdf.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
# This yaml evolves with the progress of the PanDA deployment

includeConfigs:
- ${CTRL_BPS_PANDA_DIR}/config/bps_panda.yaml

project: dev
campaign: quick
computeCloud: EU
computeSite: CC-IN2P3
s3EndpointUrl: "https://storage.googleapis.com"
payloadFolder: payload
fileDistributionEndPoint: "file://${LSST_RUN_TEMP_SPACE}/{operator}/panda_cache_box/{payloadFolder}/{uniqProcName}/"
fileDistributionEndPointDefault: "file:///sps/lsst/users/lsstgrid/{operator}/panda_cache_box/{payloadFolder}/{uniqProcName}/"

# location of main butler repo at USDF
payload:
butlerConfig: panda-test-med-1

# Job environment setup
custom_lsst_setup: ""
setupLSSTEnv: >
unset PYTHONPATH;
source /cvmfs/sw.lsst.eu/linux-x86_64/lsst_distrib/{LSST_VERSION}/loadLSST.bash;
setup lsst_distrib;
{custom_lsst_setup}
# Other job variables
jobInitDir: "`pwd`"
jobLogDir: "{jobInitDir}"
jobContainer: >
/bin/bash -c "{payloadCommand}" >&2;
jobCleanup: "rm -fr EXEC_REPO-*;"


# Specify memory request for executionButler, pipetaskInit and forcedPhotCoadd, placeholder for now
requestMemory: 2048 # PanDA does the scheduling based on memory request
executionButler:
requestMemory: 7000
queue: "CC-IN2P3_Rubin_Merge"

finalJob:
requestMemory: 7000
queue: "CC-IN2P3_Rubin_Merge"

pipetask:
pipetaskInit:
requestMemory: 4000

forcedPhotCoadd:
requestMemory: 4000
27 changes: 27 additions & 0 deletions config/bps_remote.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
remoteBuild:
enabled: true
requestMemory: 4096
runnerCommand: >
export SHELL=/bin/bash;
unset PYTHONPATH;
source /cvmfs/sw.lsst.eu/linux-x86_64/lsst_distrib/{LSST_VERSION}/loadLSST.bash;
pwd;ls -al;
setup lsst_distrib;
echo "setup tokens";
if [[ ! -z "${PANDA_AUTH_DIR}" ]] && [[ ! -z "${PANDA_AUTH_ORIGIN}" ]];
then export PANDA_AUTH_ID_TOKEN=$(cat $PANDA_AUTH_DIR);
export PANDA_AUTH_VO=$PANDA_AUTH_ORIGIN;
export IDDS_OIDC_TOKEN=$(cat $PANDA_AUTH_DIR);
export IDDS_VO=$PANDA_AUTH_ORIGIN;
export PANDA_AUTH=oidc;
else unset PANDA_AUTH;
export IDDS_AUTH_TYPE=x509_proxy; fi;
export PANDA_CONFIG_ROOT=$(pwd);
export PANDA_VERIFY_HOST=off;
export PANDA_SYS=$CONDA_PREFIX;
export PANDA_URL_SSL=${PANDA_SERVER_URL}/server/panda;
export PANDACACHE_URL=$PANDA_URL_SSL;
export PANDA_URL=$PANDA_URL_SSL;
export PANDA_BEHIND_REAL_LB=true;
{custom_lsst_setup}
python3 ${CTRL_BPS_PANDA_DIR}/python/lsst/ctrl/bps/panda/edgenode/build_cmd_line_decoder.py _download_cmd_line_ _build_cmd_line_
51 changes: 51 additions & 0 deletions config/bps_ukdf.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
# This yaml evolves with the progress of the PanDA deployment

includeConfigs:
- ${CTRL_BPS_PANDA_DIR}/config/bps_panda.yaml

project: dev
campaign: quick
computeCloud: EU
computeSite: LANCS
s3EndpointUrl: "https://storage.googleapis.com"
payloadFolder: payload
fileDistributionEndPoint: "file://${LSST_RUN_TEMP_SPACE}/{operator}/panda_cache_box/{payloadFolder}/{uniqProcName}/"
fileDistributionEndPointDefault: "file:///cephfs/pool/rubin/{operator}/panda_cache_box/{payloadFolder}/{uniqProcName}/"

# location of main butler repo at USDF
payload:
butlerConfig: panda-test-med-1

# Job environment setup
custom_lsst_setup: ""
setupLSSTEnv: >
export SHELL=/bin/bash;
unset PYTHONPATH;
source /cvmfs/sw.lsst.eu/linux-x86_64/lsst_distrib/{LSST_VERSION}/loadLSST.bash;
setup lsst_distrib;
{custom_lsst_setup}
# Other job variables
jobInitDir: "`pwd`"
jobLogDir: "{jobInitDir}"
jobContainer: >
/bin/bash -c "{payloadCommand}" >&2;
jobCleanup: "rm -fr EXEC_REPO-*;"


# Specify memory request for executionButler, pipetaskInit and forcedPhotCoadd, placeholder for now
requestMemory: 2048 # PanDA does the scheduling based on memory request
executionButler:
requestMemory: 7000
queue: "LANCS_Rubin_Merge"

finalJob:
requestMemory: 7000
queue: "LANCS_Rubin_Merge"

pipetask:
pipetaskInit:
requestMemory: 4000

forcedPhotCoadd:
requestMemory: 4000
127 changes: 127 additions & 0 deletions python/lsst/ctrl/bps/panda/edgenode/build_cmd_line_decoder.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
#!/usr/bin/python
"""
The module is needed to decode the command line string sent from the BPS
plugin -> PanDA -> Edge node cluster management
-> Edge node -> Container. This file is not a part
of the BPS but a part of the payload wrapper.
It decodes the hexified command line.
"""
# import base64
import datetime
import logging
import os
import sys
import tarfile

from lsst.ctrl.bps.bps_utils import _create_execution_butler
from lsst.ctrl.bps.constants import DEFAULT_MEM_FMT, DEFAULT_MEM_UNIT
from lsst.ctrl.bps.drivers import prepare_driver
from lsst.ctrl.bps.panda.constants import PANDA_DEFAULT_MAX_COPY_WORKERS
from lsst.ctrl.bps.panda.utils import copy_files_for_distribution, get_idds_client
from lsst.utils.timer import time_this

logging.basicConfig(
stream=sys.stdout,
level=logging.INFO,
format="%(asctime)s\t%(threadName)s\t%(name)s\t%(levelname)s\t%(message)s",
)

_LOG = logging.getLogger(__name__)


def download_extract_archive(filename):
"""Download and extract the tarball from pandacache"""
archive_basename = os.path.basename(filename)
target_dir = os.getcwd()
full_output_filename = os.path.join(target_dir, archive_basename)

if filename.startswith("https:"):
panda_cache_url = os.path.dirname(os.path.dirname(filename))
os.environ["PANDACACHE_URL"] = panda_cache_url
elif "PANDACACHE_URL" not in os.environ and "PANDA_URL_SSL" in os.environ:
os.environ["PANDACACHE_URL"] = os.environ["PANDA_URL_SSL"]
print("PANDACACHE_URL: %s" % os.environ.get("PANDACACHE_URL", None))

from pandaclient import Client

attempt = 0
max_attempts = 3
done = False
while attempt < max_attempts and not done:
status, output = Client.getFile(archive_basename, output_path=full_output_filename)
if status = 0:

Check failure on line 52 in python/lsst/ctrl/bps/panda/edgenode/build_cmd_line_decoder.py

View workflow job for this annotation

GitHub Actions / ruff

Ruff (E999)

python/lsst/ctrl/bps/panda/edgenode/build_cmd_line_decoder.py:52:19: E999 SyntaxError: Unexpected token '='

Check failure on line 52 in python/lsst/ctrl/bps/panda/edgenode/build_cmd_line_decoder.py

View workflow job for this annotation

GitHub Actions / call-workflow / lint

E999

SyntaxError: invalid syntax. Maybe you meant '==' or ':=' instead of '='?
done = True
print(f"Download archive file from pandacache status: {status}, output: {output}")
if status != 0:
raise RuntimeError("Failed to download archive file from pandacache")
with tarfile.open(full_output_filename, "r:gz") as f:
f.extractall(target_dir)
print(f"Extract {full_output_filename} to {target_dir}")
os.remove(full_output_filename)
print("Remove %s" % full_output_filename)


def create_idds_workflow(config_file):
"""Create pipeline workflow at remote site"""
_LOG.info("Starting building process")
with time_this(
log=_LOG,
level=logging.INFO,
prefix=None,
msg="Completed entire submission process",
mem_usage=True,
mem_unit=DEFAULT_MEM_UNIT,
mem_fmt=DEFAULT_MEM_FMT,
):
wms_workflow_config, wms_workflow = prepare_driver(config_file)
_, when_create = wms_workflow_config.search(".executionButler.whenCreate")
if when_create.upper() == "SUBMIT":
_, execution_butler_dir = wms_workflow_config.search(".bps_defined.executionButlerDir")
_LOG.info("Creating execution butler in '%s'", execution_butler_dir)
with time_this(
log=_LOG, level=logging.INFO, prefix=None, msg="Completed creating execution butler"
):
_create_execution_butler(
wms_workflow_config,
wms_workflow_config["runQgraphFile"],
execution_butler_dir,
wms_workflow_config["submitPath"],
)
return wms_workflow_config, wms_workflow


# download the submission tarball
remote_filename = sys.argv[1]
download_extract_archive(remote_filename)

# request_id and signature are added by iDDS for build task
request_id = os.environ.get("IDDS_BUILD_REQUEST_ID", None)
signature = os.environ.get("IDDS_BUIL_SIGNATURE", None)
config_file = sys.argv[2]

if request_id is None:
print("IDDS_BUILD_REQUEST_ID is not defined.")
sys.exit(-1)
if signature is None:
print("IDDS_BUIL_SIGNATURE is not defined")
sys.exit(-1)

print(f"INFO: start {datetime.datetime.utcnow()}")
print(f"INFO: config file: {config_file}")

current_dir = os.getcwd()

print("INFO: current dir: %s" % current_dir)

config, bps_workflow = create_idds_workflow(config_file)
idds_workflow = bps_workflow.idds_client_workflow

_, max_copy_workers = config.search("maxCopyWorkers", opt={"default": PANDA_DEFAULT_MAX_COPY_WORKERS})
copy_files_for_distribution(
bps_workflow.files_to_pre_stage, config["fileDistributionEndPoint"], max_copy_workers
)

idds_client = get_idds_client(config)
ret = idds_client.update_build_request(request_id, signature, idds_workflow)
print("update_build_request returns: %s" % str(ret))
sys.exit(ret[0])
67 changes: 43 additions & 24 deletions python/lsst/ctrl/bps/panda/panda_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
add_final_idds_work,
add_idds_work,
copy_files_for_distribution,
create_idds_build_workflow,
get_idds_client,
get_idds_result,
)
Expand All @@ -63,31 +64,49 @@ def prepare(self, config, generic_workflow, out_prefix=None):
workflow.write(out_prefix)
return workflow

def submit(self, workflow):
_, max_copy_workers = self.config.search(
"maxCopyWorkers", opt={"default": PANDA_DEFAULT_MAX_COPY_WORKERS}
)
# Docstring inherited from BaseWmsService.submit.
file_distribution_uri = self.config["fileDistributionEndPoint"]
lsst_temp = "LSST_RUN_TEMP_SPACE"
if lsst_temp in file_distribution_uri and lsst_temp not in os.environ:
file_distribution_uri = self.config["fileDistributionEndPointDefault"]

copy_files_for_distribution(workflow.files_to_pre_stage, file_distribution_uri, max_copy_workers)

idds_client = get_idds_client(self.config)
ret = idds_client.submit(workflow.idds_client_workflow, username=None, use_dataset_name=False)
_LOG.debug("iDDS client manager submit returned = %s", ret)

# Check submission success
status, result, error = get_idds_result(ret)
if status:
request_id = int(result)
def submit(self, workflow, config=None, remote_build=None, config_file=None):
if config and remote_build:
_LOG.info("remote build")

Check warning on line 69 in python/lsst/ctrl/bps/panda/panda_service.py

View check run for this annotation

Codecov / codecov/patch

python/lsst/ctrl/bps/panda/panda_service.py#L69

Added line #L69 was not covered by tests

idds_build_workflow = create_idds_build_workflow(config_file, config, remote_build)
idds_client = get_idds_client(self.config)
ret = idds_client.submit_build(idds_build_workflow, username=None, use_dataset_name=False)
_LOG.debug("iDDS client manager submit returned = %s", ret)

Check warning on line 74 in python/lsst/ctrl/bps/panda/panda_service.py

View check run for this annotation

Codecov / codecov/patch

python/lsst/ctrl/bps/panda/panda_service.py#L71-L74

Added lines #L71 - L74 were not covered by tests

# Check submission success
status, result, error = get_idds_result(ret)

Check warning on line 77 in python/lsst/ctrl/bps/panda/panda_service.py

View check run for this annotation

Codecov / codecov/patch

python/lsst/ctrl/bps/panda/panda_service.py#L77

Added line #L77 was not covered by tests
if status:
request_id = int(result)

Check warning on line 79 in python/lsst/ctrl/bps/panda/panda_service.py

View check run for this annotation

Codecov / codecov/patch

python/lsst/ctrl/bps/panda/panda_service.py#L79

Added line #L79 was not covered by tests
else:
raise RuntimeError(f"Error submitting to PanDA service: {error}")

Check warning on line 81 in python/lsst/ctrl/bps/panda/panda_service.py

View check run for this annotation

Codecov / codecov/patch

python/lsst/ctrl/bps/panda/panda_service.py#L81

Added line #L81 was not covered by tests

_LOG.info("Submitted into iDDs with request id=%s", request_id)
idds_build_workflow.run_id = request_id
return idds_build_workflow

Check warning on line 85 in python/lsst/ctrl/bps/panda/panda_service.py

View check run for this annotation

Codecov / codecov/patch

python/lsst/ctrl/bps/panda/panda_service.py#L83-L85

Added lines #L83 - L85 were not covered by tests
else:
raise RuntimeError(f"Error submitting to PanDA service: {error}")

_LOG.info("Submitted into iDDs with request id=%s", request_id)
workflow.run_id = request_id
_, max_copy_workers = self.config.search(

Check warning on line 87 in python/lsst/ctrl/bps/panda/panda_service.py

View check run for this annotation

Codecov / codecov/patch

python/lsst/ctrl/bps/panda/panda_service.py#L87

Added line #L87 was not covered by tests
"maxCopyWorkers", opt={"default": PANDA_DEFAULT_MAX_COPY_WORKERS}
)
# Docstring inherited from BaseWmsService.submit.
file_distribution_uri = self.config["fileDistributionEndPoint"]
lsst_temp = "LSST_RUN_TEMP_SPACE"

Check warning on line 92 in python/lsst/ctrl/bps/panda/panda_service.py

View check run for this annotation

Codecov / codecov/patch

python/lsst/ctrl/bps/panda/panda_service.py#L91-L92

Added lines #L91 - L92 were not covered by tests
if lsst_temp in file_distribution_uri and lsst_temp not in os.environ:
file_distribution_uri = self.config["fileDistributionEndPointDefault"]
copy_files_for_distribution(workflow.files_to_pre_stage, file_distribution_uri, max_copy_workers)

Check warning on line 95 in python/lsst/ctrl/bps/panda/panda_service.py

View check run for this annotation

Codecov / codecov/patch

python/lsst/ctrl/bps/panda/panda_service.py#L94-L95

Added lines #L94 - L95 were not covered by tests

idds_client = get_idds_client(self.config)
ret = idds_client.submit(workflow.idds_client_workflow, username=None, use_dataset_name=False)
_LOG.debug("iDDS client manager submit returned = %s", ret)

Check warning on line 99 in python/lsst/ctrl/bps/panda/panda_service.py

View check run for this annotation

Codecov / codecov/patch

python/lsst/ctrl/bps/panda/panda_service.py#L97-L99

Added lines #L97 - L99 were not covered by tests

# Check submission success
status, result, error = get_idds_result(ret)

Check warning on line 102 in python/lsst/ctrl/bps/panda/panda_service.py

View check run for this annotation

Codecov / codecov/patch

python/lsst/ctrl/bps/panda/panda_service.py#L102

Added line #L102 was not covered by tests
if status:
request_id = int(result)

Check warning on line 104 in python/lsst/ctrl/bps/panda/panda_service.py

View check run for this annotation

Codecov / codecov/patch

python/lsst/ctrl/bps/panda/panda_service.py#L104

Added line #L104 was not covered by tests
else:
raise RuntimeError(f"Error submitting to PanDA service: {error}")

Check warning on line 106 in python/lsst/ctrl/bps/panda/panda_service.py

View check run for this annotation

Codecov / codecov/patch

python/lsst/ctrl/bps/panda/panda_service.py#L106

Added line #L106 was not covered by tests

_LOG.info("Submitted into iDDs with request id=%s", request_id)
workflow.run_id = request_id

Check warning on line 109 in python/lsst/ctrl/bps/panda/panda_service.py

View check run for this annotation

Codecov / codecov/patch

python/lsst/ctrl/bps/panda/panda_service.py#L108-L109

Added lines #L108 - L109 were not covered by tests

def restart(self, wms_workflow_id):
# Docstring inherited from BaseWmsService.restart.
Expand Down
Loading

0 comments on commit 27c9ea1

Please sign in to comment.