diff --git a/config/bps_panda_DF.yaml b/config/bps_panda_DF.yaml index 40eb933..7e853a8 100644 --- a/config/bps_panda_DF.yaml +++ b/config/bps_panda_DF.yaml @@ -8,7 +8,7 @@ project: dev campaign: quick s3EndpointUrl: "https://storage.googleapis.com" payloadFolder: payload -fileDistributionEndPoint: "file://${LSST_RUN_TEMP_SPACE}/{operator}/panda_cache_box/{payloadFolder}/{uniqProcName}/" +fileDistributionEndPoint: "${LSST_RUN_TEMP_SPACE}/panda_cache_box/{payloadFolder}/{uniqProcName}/" # location of main butler repo at USDF payload: diff --git a/doc/changes/DM-45631.misc.rst b/doc/changes/DM-45631.misc.rst new file mode 100644 index 0000000..6aab8ee --- /dev/null +++ b/doc/changes/DM-45631.misc.rst @@ -0,0 +1 @@ +Fixed ``fileDistributionEndPoint`` to support different protocols. diff --git a/python/lsst/ctrl/bps/panda/edgenode/build_cmd_line_decoder.py b/python/lsst/ctrl/bps/panda/edgenode/build_cmd_line_decoder.py index 6f18c2c..3e6ba2d 100644 --- a/python/lsst/ctrl/bps/panda/edgenode/build_cmd_line_decoder.py +++ b/python/lsst/ctrl/bps/panda/edgenode/build_cmd_line_decoder.py @@ -18,6 +18,7 @@ from lsst.ctrl.bps.drivers import prepare_driver from lsst.ctrl.bps.panda.constants import PANDA_DEFAULT_MAX_COPY_WORKERS from lsst.ctrl.bps.panda.utils import copy_files_for_distribution, get_idds_client +from lsst.resources import ResourcePath from lsst.utils.timer import time_this logging.basicConfig( @@ -138,9 +139,8 @@ def create_idds_workflow(config_file, compute_site): idds_workflow = bps_workflow.idds_client_workflow _, max_copy_workers = config.search("maxCopyWorkers", opt={"default": PANDA_DEFAULT_MAX_COPY_WORKERS}) -copy_files_for_distribution( - bps_workflow.files_to_pre_stage, config["fileDistributionEndPoint"], max_copy_workers -) +file_distribution_uri = ResourcePath(config["fileDistributionEndPoint"], forceDirectory=True) +copy_files_for_distribution(bps_workflow.files_to_pre_stage, file_distribution_uri, max_copy_workers) idds_client = get_idds_client(config) ret 
= idds_client.update_build_request(request_id, signature, idds_workflow) diff --git a/python/lsst/ctrl/bps/panda/panda_service.py b/python/lsst/ctrl/bps/panda/panda_service.py index 29435be..c496e13 100644 --- a/python/lsst/ctrl/bps/panda/panda_service.py +++ b/python/lsst/ctrl/bps/panda/panda_service.py @@ -48,6 +48,7 @@ get_idds_client, get_idds_result, ) +from lsst.resources import ResourcePath _LOG = logging.getLogger(__name__) @@ -95,7 +96,11 @@ def submit(self, workflow, **kwargs): lsst_temp = "LSST_RUN_TEMP_SPACE" if lsst_temp in file_distribution_uri and lsst_temp not in os.environ: file_distribution_uri = self.config["fileDistributionEndPointDefault"] - copy_files_for_distribution(workflow.files_to_pre_stage, file_distribution_uri, max_copy_workers) + copy_files_for_distribution( + workflow.files_to_pre_stage, + ResourcePath(file_distribution_uri, forceDirectory=True), + max_copy_workers, + ) idds_client = get_idds_client(self.config) ret = idds_client.submit(workflow.idds_client_workflow, username=None, use_dataset_name=False) diff --git a/python/lsst/ctrl/bps/panda/utils.py b/python/lsst/ctrl/bps/panda/utils.py index 716d101..a260b3e 100644 --- a/python/lsst/ctrl/bps/panda/utils.py +++ b/python/lsst/ctrl/bps/panda/utils.py @@ -76,7 +76,7 @@ def copy_files_for_distribution(files_to_stage, file_distribution_uri, max_copy_ ---------- files_to_stage : `dict` [`str`, `str`] Files which need to be copied to a workflow staging area. - file_distribution_uri : `str` + file_distribution_uri : `ResourcePath` Path on the edge node accessed storage, including access protocol, bucket name to place files. 
max_copy_workers : `int` @@ -93,16 +93,14 @@ def copy_files_for_distribution(files_to_stage, file_distribution_uri, max_copy_ for local_pfn in files_to_stage.values(): folder_name = os.path.basename(os.path.normpath(local_pfn)) if os.path.isdir(local_pfn): + folder_uri = file_distribution_uri.join(folder_name, forceDirectory=True) files_in_folder = ResourcePath.findFileResources([local_pfn]) for file in files_in_folder: file_name = file.basename() - files_to_copy[file] = ResourcePath( - os.path.join(file_distribution_uri, folder_name, file_name) - ) + files_to_copy[file] = folder_uri.join(file_name, forceDirectory=False) else: - files_to_copy[ResourcePath(local_pfn)] = ResourcePath( - os.path.join(file_distribution_uri, folder_name) - ) + folder_uri = file_distribution_uri.join(folder_name, forceDirectory=False) + files_to_copy[ResourcePath(local_pfn, forceDirectory=False)] = folder_uri copy_executor = concurrent.futures.ThreadPoolExecutor(max_workers=max_copy_workers) future_file_copy = []