From 27c9ea1009507020ad56c7cbc74576ef676df054 Mon Sep 17 00:00:00 2001 From: zhaoyu Date: Mon, 3 Apr 2023 23:35:38 -0400 Subject: [PATCH] create a build task for remote submission --- config/bps_frdf.yaml | 50 +++++++ config/bps_remote.yaml | 27 ++++ config/bps_ukdf.yaml | 51 +++++++ .../panda/edgenode/build_cmd_line_decoder.py | 127 ++++++++++++++++++ python/lsst/ctrl/bps/panda/panda_service.py | 67 +++++---- python/lsst/ctrl/bps/panda/utils.py | 102 ++++++++++++++ 6 files changed, 400 insertions(+), 24 deletions(-) create mode 100644 config/bps_frdf.yaml create mode 100644 config/bps_remote.yaml create mode 100644 config/bps_ukdf.yaml create mode 100644 python/lsst/ctrl/bps/panda/edgenode/build_cmd_line_decoder.py diff --git a/config/bps_frdf.yaml b/config/bps_frdf.yaml new file mode 100644 index 0000000..1cdc4e7 --- /dev/null +++ b/config/bps_frdf.yaml @@ -0,0 +1,50 @@ +# This yaml evolves with the progress of the PanDA deployment + +includeConfigs: +- ${CTRL_BPS_PANDA_DIR}/config/bps_panda.yaml + +project: dev +campaign: quick +computeCloud: EU +computeSite: CC-IN2P3 +s3EndpointUrl: "https://storage.googleapis.com" +payloadFolder: payload +fileDistributionEndPoint: "file://${LSST_RUN_TEMP_SPACE}/{operator}/panda_cache_box/{payloadFolder}/{uniqProcName}/" +fileDistributionEndPointDefault: "file:///sps/lsst/users/lsstgrid/{operator}/panda_cache_box/{payloadFolder}/{uniqProcName}/" + +# location of main butler repo at USDF +payload: + butlerConfig: panda-test-med-1 + +# Job environment setup +custom_lsst_setup: "" +setupLSSTEnv: > + unset PYTHONPATH; + source /cvmfs/sw.lsst.eu/linux-x86_64/lsst_distrib/{LSST_VERSION}/loadLSST.bash; + setup lsst_distrib; + {custom_lsst_setup} + +# Other job variables +jobInitDir: "`pwd`" +jobLogDir: "{jobInitDir}" +jobContainer: > + /bin/bash -c "{payloadCommand}" >&2; +jobCleanup: "rm -fr EXEC_REPO-*;" + + +# Specify memory request for executionButler, pipetaskInit and forcedPhotCoadd, placeholder for now +requestMemory: 2048 # PanDA does the scheduling based on memory request +executionButler: + requestMemory: 7000 + queue: "CC-IN2P3_Rubin_Merge" + +finalJob: + requestMemory: 7000 + queue: "CC-IN2P3_Rubin_Merge" + +pipetask: + pipetaskInit: + requestMemory: 4000 + + forcedPhotCoadd: + requestMemory: 4000 diff --git a/config/bps_remote.yaml b/config/bps_remote.yaml new file mode 100644 index 0000000..a6df677 --- /dev/null +++ b/config/bps_remote.yaml @@ -0,0 +1,27 @@ +remoteBuild: + enabled: true + requestMemory: 4096 + runnerCommand: > + export SHELL=/bin/bash; + unset PYTHONPATH; + source /cvmfs/sw.lsst.eu/linux-x86_64/lsst_distrib/{LSST_VERSION}/loadLSST.bash; + pwd;ls -al; + setup lsst_distrib; + echo "setup tokens"; + if [[ ! -z "${PANDA_AUTH_DIR}" ]] && [[ ! -z "${PANDA_AUTH_ORIGIN}" ]]; + then export PANDA_AUTH_ID_TOKEN=$(cat $PANDA_AUTH_DIR); + export PANDA_AUTH_VO=$PANDA_AUTH_ORIGIN; + export IDDS_OIDC_TOKEN=$(cat $PANDA_AUTH_DIR); + export IDDS_VO=$PANDA_AUTH_ORIGIN; + export PANDA_AUTH=oidc; + else unset PANDA_AUTH; + export IDDS_AUTH_TYPE=x509_proxy; fi; + export PANDA_CONFIG_ROOT=$(pwd); + export PANDA_VERIFY_HOST=off; + export PANDA_SYS=$CONDA_PREFIX; + export PANDA_URL_SSL=${PANDA_SERVER_URL}/server/panda; + export PANDACACHE_URL=$PANDA_URL_SSL; + export PANDA_URL=$PANDA_URL_SSL; + export PANDA_BEHIND_REAL_LB=true; + {custom_lsst_setup} + python3 ${CTRL_BPS_PANDA_DIR}/python/lsst/ctrl/bps/panda/edgenode/build_cmd_line_decoder.py _download_cmd_line_ _build_cmd_line_ diff --git a/config/bps_ukdf.yaml b/config/bps_ukdf.yaml new file mode 100644 index 0000000..f91b185 --- /dev/null +++ b/config/bps_ukdf.yaml @@ -0,0 +1,51 @@ +# This yaml evolves with the progress of the PanDA deployment + +includeConfigs: +- ${CTRL_BPS_PANDA_DIR}/config/bps_panda.yaml + +project: dev +campaign: quick +computeCloud: EU +computeSite: LANCS +s3EndpointUrl: "https://storage.googleapis.com" +payloadFolder: payload +fileDistributionEndPoint: "file://${LSST_RUN_TEMP_SPACE}/{operator}/panda_cache_box/{payloadFolder}/{uniqProcName}/" +fileDistributionEndPointDefault: "file:///cephfs/pool/rubin/{operator}/panda_cache_box/{payloadFolder}/{uniqProcName}/" + +# location of main butler repo at USDF +payload: + butlerConfig: panda-test-med-1 + +# Job environment setup +custom_lsst_setup: "" +setupLSSTEnv: > + export SHELL=/bin/bash; + unset PYTHONPATH; + source /cvmfs/sw.lsst.eu/linux-x86_64/lsst_distrib/{LSST_VERSION}/loadLSST.bash; + setup lsst_distrib; + {custom_lsst_setup} + +# Other job variables +jobInitDir: "`pwd`" +jobLogDir: "{jobInitDir}" +jobContainer: > + /bin/bash -c "{payloadCommand}" >&2; +jobCleanup: "rm -fr EXEC_REPO-*;" + + +# Specify memory request for executionButler, pipetaskInit and forcedPhotCoadd, placeholder for now +requestMemory: 2048 # PanDA does the scheduling based on memory request +executionButler: + requestMemory: 7000 + queue: "LANCS_Rubin_Merge" + +finalJob: + requestMemory: 7000 + queue: "LANCS_Rubin_Merge" + +pipetask: + pipetaskInit: + requestMemory: 4000 + + forcedPhotCoadd: + requestMemory: 4000 diff --git a/python/lsst/ctrl/bps/panda/edgenode/build_cmd_line_decoder.py b/python/lsst/ctrl/bps/panda/edgenode/build_cmd_line_decoder.py new file mode 100644 index 0000000..164ac98 --- /dev/null +++ b/python/lsst/ctrl/bps/panda/edgenode/build_cmd_line_decoder.py @@ -0,0 +1,127 @@ +#!/usr/bin/python +""" +The module is needed to decode the command line string sent from the BPS +plugin -> PanDA -> Edge node cluster management +-> Edge node -> Container. This file is not a part +of the BPS but a part of the payload wrapper. +It decodes the hexified command line. +""" +# import base64 +import datetime +import logging +import os +import sys +import tarfile + +from lsst.ctrl.bps.bps_utils import _create_execution_butler +from lsst.ctrl.bps.constants import DEFAULT_MEM_FMT, DEFAULT_MEM_UNIT +from lsst.ctrl.bps.drivers import prepare_driver +from lsst.ctrl.bps.panda.constants import PANDA_DEFAULT_MAX_COPY_WORKERS +from lsst.ctrl.bps.panda.utils import copy_files_for_distribution, get_idds_client +from lsst.utils.timer import time_this + +logging.basicConfig( + stream=sys.stdout, + level=logging.INFO, + format="%(asctime)s\t%(threadName)s\t%(name)s\t%(levelname)s\t%(message)s", +) + +_LOG = logging.getLogger(__name__) + + +def download_extract_archive(filename): + """Download and extract the tarball from pandacache""" + archive_basename = os.path.basename(filename) + target_dir = os.getcwd() + full_output_filename = os.path.join(target_dir, archive_basename) + + if filename.startswith("https:"): + panda_cache_url = os.path.dirname(os.path.dirname(filename)) + os.environ["PANDACACHE_URL"] = panda_cache_url + elif "PANDACACHE_URL" not in os.environ and "PANDA_URL_SSL" in os.environ: + os.environ["PANDACACHE_URL"] = os.environ["PANDA_URL_SSL"] + print("PANDACACHE_URL: %s" % os.environ.get("PANDACACHE_URL", None)) + + from pandaclient import Client + + attempt = 0 + max_attempts = 3 + done = False + while attempt < max_attempts and not done: + status, output = Client.getFile(archive_basename, output_path=full_output_filename) + if status = 0: + done = True + print(f"Download archive file from pandacache status: {status}, output: {output}") + if status != 0: + raise RuntimeError("Failed to download archive file from pandacache") + with tarfile.open(full_output_filename, "r:gz") as f: + f.extractall(target_dir) + print(f"Extract {full_output_filename} to {target_dir}") + os.remove(full_output_filename) + print("Remove %s" % full_output_filename) + + +def create_idds_workflow(config_file): + """Create pipeline workflow at remote site""" + _LOG.info("Starting building process") + with time_this( + log=_LOG, + level=logging.INFO, + prefix=None, + msg="Completed entire submission process", + mem_usage=True, + mem_unit=DEFAULT_MEM_UNIT, + mem_fmt=DEFAULT_MEM_FMT, + ): + wms_workflow_config, wms_workflow = prepare_driver(config_file) + _, when_create = wms_workflow_config.search(".executionButler.whenCreate") + if when_create.upper() == "SUBMIT": + _, execution_butler_dir = wms_workflow_config.search(".bps_defined.executionButlerDir") + _LOG.info("Creating execution butler in '%s'", execution_butler_dir) + with time_this( + log=_LOG, level=logging.INFO, prefix=None, msg="Completed creating execution butler" + ): + _create_execution_butler( + wms_workflow_config, + wms_workflow_config["runQgraphFile"], + execution_butler_dir, + wms_workflow_config["submitPath"], + ) + return wms_workflow_config, wms_workflow + + +# download the submission tarball +remote_filename = sys.argv[1] +download_extract_archive(remote_filename) + +# request_id and signature are added by iDDS for build task +request_id = os.environ.get("IDDS_BUILD_REQUEST_ID", None) +signature = os.environ.get("IDDS_BUIL_SIGNATURE", None) +config_file = sys.argv[2] + +if request_id is None: + print("IDDS_BUILD_REQUEST_ID is not defined.") + sys.exit(-1) +if signature is None: + print("IDDS_BUIL_SIGNATURE is not defined") + sys.exit(-1) + +print(f"INFO: start {datetime.datetime.utcnow()}") +print(f"INFO: config file: {config_file}") + +current_dir = os.getcwd() + +print("INFO: current dir: %s" % current_dir) + +config, bps_workflow = create_idds_workflow(config_file) +idds_workflow = bps_workflow.idds_client_workflow + +_, max_copy_workers = config.search("maxCopyWorkers", opt={"default": PANDA_DEFAULT_MAX_COPY_WORKERS}) +copy_files_for_distribution( + bps_workflow.files_to_pre_stage, config["fileDistributionEndPoint"], max_copy_workers +) + +idds_client = get_idds_client(config) +ret = idds_client.update_build_request(request_id, signature, idds_workflow) +print("update_build_request returns: %s" % str(ret)) +sys.exit(ret[0]) diff --git a/python/lsst/ctrl/bps/panda/panda_service.py b/python/lsst/ctrl/bps/panda/panda_service.py index 18a71f8..a9668ee 100644 --- a/python/lsst/ctrl/bps/panda/panda_service.py +++ b/python/lsst/ctrl/bps/panda/panda_service.py @@ -44,6 +44,7 @@ add_final_idds_work, add_idds_work, copy_files_for_distribution, + create_idds_build_workflow, get_idds_client, get_idds_result, ) @@ -63,31 +64,49 @@ def prepare(self, config, generic_workflow, out_prefix=None): workflow.write(out_prefix) return workflow - def submit(self, workflow): - _, max_copy_workers = self.config.search( - "maxCopyWorkers", opt={"default": PANDA_DEFAULT_MAX_COPY_WORKERS} - ) - # Docstring inherited from BaseWmsService.submit. - file_distribution_uri = self.config["fileDistributionEndPoint"] - lsst_temp = "LSST_RUN_TEMP_SPACE" - if lsst_temp in file_distribution_uri and lsst_temp not in os.environ: - file_distribution_uri = self.config["fileDistributionEndPointDefault"] - - copy_files_for_distribution(workflow.files_to_pre_stage, file_distribution_uri, max_copy_workers) - - idds_client = get_idds_client(self.config) - ret = idds_client.submit(workflow.idds_client_workflow, username=None, use_dataset_name=False) - _LOG.debug("iDDS client manager submit returned = %s", ret) - - # Check submission success - status, result, error = get_idds_result(ret) - if status: - request_id = int(result) + def submit(self, workflow, config=None, remote_build=None, config_file=None): + if config and remote_build: + _LOG.info("remote build") + + idds_build_workflow = create_idds_build_workflow(config_file, config, remote_build) + idds_client = get_idds_client(self.config) + ret = idds_client.submit_build(idds_build_workflow, username=None, use_dataset_name=False) + _LOG.debug("iDDS client manager submit returned = %s", ret) + + # Check submission success + status, result, error = get_idds_result(ret) + if status: + request_id = int(result) + else: + raise RuntimeError(f"Error submitting to PanDA service: {error}") + + _LOG.info("Submitted into iDDs with request id=%s", request_id) + idds_build_workflow.run_id = request_id + return idds_build_workflow else: - raise RuntimeError(f"Error submitting to PanDA service: {error}") - - _LOG.info("Submitted into iDDs with request id=%s", request_id) - workflow.run_id = request_id + _, max_copy_workers = self.config.search( + "maxCopyWorkers", opt={"default": PANDA_DEFAULT_MAX_COPY_WORKERS} + ) + # Docstring inherited from BaseWmsService.submit. + file_distribution_uri = self.config["fileDistributionEndPoint"] + lsst_temp = "LSST_RUN_TEMP_SPACE" + if lsst_temp in file_distribution_uri and lsst_temp not in os.environ: + file_distribution_uri = self.config["fileDistributionEndPointDefault"] + copy_files_for_distribution(workflow.files_to_pre_stage, file_distribution_uri, max_copy_workers) + + idds_client = get_idds_client(self.config) + ret = idds_client.submit(workflow.idds_client_workflow, username=None, use_dataset_name=False) + _LOG.debug("iDDS client manager submit returned = %s", ret) + + # Check submission success + status, result, error = get_idds_result(ret) + if status: + request_id = int(result) + else: + raise RuntimeError(f"Error submitting to PanDA service: {error}") + + _LOG.info("Submitted into iDDs with request id=%s", request_id) + workflow.run_id = request_id def restart(self, wms_workflow_id): # Docstring inherited from BaseWmsService.restart. diff --git a/python/lsst/ctrl/bps/panda/utils.py b/python/lsst/ctrl/bps/panda/utils.py index cce6e53..59803cd 100644 --- a/python/lsst/ctrl/bps/panda/utils.py +++ b/python/lsst/ctrl/bps/panda/utils.py @@ -39,11 +39,14 @@ import concurrent.futures import logging import os +import tarfile +import uuid import idds.common.utils as idds_utils import pandaclient.idds_api from idds.doma.workflowv2.domapandawork import DomaPanDAWork from idds.workflowv2.workflow import AndCondition +from idds.workflowv2.workflow import Workflow as IDDS_client_workflow from lsst.ctrl.bps import BpsConfig, GenericWorkflow, GenericWorkflowJob from lsst.ctrl.bps.panda.cmd_line_embedder import CommandLineEmbedder from lsst.ctrl.bps.panda.constants import ( @@ -595,3 +598,102 @@ def add_idds_work(config, generic_workflow, idds_workflow): _LOG.info("Successfully recovered.") return files_to_pre_stage, dag_sink_work, task_count + + +def create_archive_file(submit_path, archive_filename, files): + if not archive_filename.startswith("/"): + archive_filename = os.path.join(submit_path, archive_filename) + + with tarfile.open(archive_filename, "w:gz", dereference=True) as tar: + for local_file in files: + base_name = os.path.basename(local_file) + tar.add(local_file, arcname=os.path.basename(base_name)) + return archive_filename + + +def copy_files_to_pandacache(filename): + from pandaclient import Client + + status, out = Client.putFile(filename, True) + print(f"copy_files_to_pandacache: status: {status}, out: {out}") + if out.startswith("NewFileName:"): + # found the same input sandbox to reuse + filename = out.split(":")[-1] + elif out != "True": + print(out) + return None + + filename = os.path.basename(filename) + cache_path = os.path.join(os.environ["PANDACACHE_URL"], "cache") + filename = os.path.join(cache_path, filename) + return filename + + +def get_task_parameter(config, remote_build, key): + _, value = remote_build.search(key) + if not value: + _, value = config.search(key) + return value + + +def create_idds_build_workflow(config_file, config, remote_build): + _, files = remote_build.search("files", opt={"default": []}) + submit_path = config["submitPath"] + files.append(config_file) + archive_filename = "jobO.%s.tar.gz" % str(uuid.uuid4()) + archive_filename = create_archive_file(submit_path, archive_filename, files) + _LOG.info("archive file name: %s" % archive_filename) + remote_filename = copy_files_to_pandacache(archive_filename) + _LOG.info("pandacache file: %s" % remote_filename) + + _LOG.info(type(remote_build)) + search_opt = {"replaceVars": True, "expandEnvVars": False, "replaceEnvVars": False, "required": False} + cvals = {"LSST_VERSION": get_task_parameter(config, remote_build, "LSST_VERSION")} + cvals["custom_lsst_setup"] = get_task_parameter(config, remote_build, "custom_lsst_setup") + search_opt["curvals"] = cvals + _, executable = remote_build.search("runnerCommand", opt=search_opt) + _LOG.info("executable: %s" % str(executable)) + executable = executable.replace("_download_cmd_line_", remote_filename) + executable = executable.replace("_build_cmd_line_", config_file) + _LOG.info("executable: %s" % executable) + + task_cloud = get_task_parameter(config, remote_build, "computeCloud") + task_site = get_task_parameter(config, remote_build, "computeSite") + task_queue = get_task_parameter(config, remote_build, "queue") + task_rss = get_task_parameter(config, remote_build, "requestMemory") + nretries = get_task_parameter(config, remote_build, "numberOfRetries") + _LOG.info("requestMemory: %s", task_rss) + _LOG.info("Cloud: %s", task_cloud) + _LOG.info("Site: %s", task_site) + _LOG.info("Queue: %s", task_queue) + # TODO: fill other parameters based on config + build_work = DomaPanDAWork( + executable=executable, + task_type="lsst_build", + primary_input_collection={"scope": "pseudo_dataset", "name": "pseudo_input_collection#1"}, + output_collections=[{"scope": "pseudo_dataset", "name": "pseudo_output_collection#1"}], + log_collections=[], + dependency_map=None, + task_name="build_task", + task_queue=task_queue, + encode_command_line=True, + prodSourceLabel="managed", + task_log={ + "dataset": "PandaJob_#{pandaid}/", + "destination": "local", + "param_type": "log", + "token": "local", + "type": "template", + "value": "log.tgz", + }, + task_rss=task_rss if task_rss else PANDA_DEFAULT_RSS, + task_cloud=task_cloud, + task_site=task_site, + maxattempt=nretries if nretries > 0 else PANDA_DEFAULT_MAX_ATTEMPTS, + ) + + workflow = IDDS_client_workflow() + + workflow.add_work(build_work) + workflow.name = config["bps_defined"]["uniqProcName"] + return workflow