Skip to content

Commit

Permalink
Merge pull request #72 from lsst/tickets/DM-45631
Browse files Browse the repository at this point in the history
Fix fileDistributionEndPoint handling to support different access protocols
  • Loading branch information
zhaoyuyoung authored Aug 14, 2024
2 parents 0fbcb7a + 4223f6a commit 4cb75ee
Show file tree
Hide file tree
Showing 5 changed files with 16 additions and 12 deletions.
2 changes: 1 addition & 1 deletion config/bps_panda_DF.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ project: dev
campaign: quick
s3EndpointUrl: "https://storage.googleapis.com"
payloadFolder: payload
fileDistributionEndPoint: "file://${LSST_RUN_TEMP_SPACE}/{operator}/panda_cache_box/{payloadFolder}/{uniqProcName}/"
fileDistributionEndPoint: "${LSST_RUN_TEMP_SPACE}/panda_cache_box/{payloadFolder}/{uniqProcName}/"

# location of main butler repo at USDF
payload:
Expand Down
1 change: 1 addition & 0 deletions doc/changes/DM-45631.misc.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix ``fileDistributionEndPoint`` handling to support different access protocols.
6 changes: 3 additions & 3 deletions python/lsst/ctrl/bps/panda/edgenode/build_cmd_line_decoder.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
from lsst.ctrl.bps.drivers import prepare_driver
from lsst.ctrl.bps.panda.constants import PANDA_DEFAULT_MAX_COPY_WORKERS
from lsst.ctrl.bps.panda.utils import copy_files_for_distribution, get_idds_client
from lsst.resources import ResourcePath
from lsst.utils.timer import time_this

logging.basicConfig(
Expand Down Expand Up @@ -138,9 +139,8 @@ def create_idds_workflow(config_file, compute_site):
idds_workflow = bps_workflow.idds_client_workflow

_, max_copy_workers = config.search("maxCopyWorkers", opt={"default": PANDA_DEFAULT_MAX_COPY_WORKERS})
copy_files_for_distribution(
bps_workflow.files_to_pre_stage, config["fileDistributionEndPoint"], max_copy_workers
)
file_distribution_uri = ResourcePath(config["fileDistributionEndPoint"], forceDirectory=True)
copy_files_for_distribution(bps_workflow.files_to_pre_stage, file_distribution_uri, max_copy_workers)

idds_client = get_idds_client(config)
ret = idds_client.update_build_request(request_id, signature, idds_workflow)
Expand Down
7 changes: 6 additions & 1 deletion python/lsst/ctrl/bps/panda/panda_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
get_idds_client,
get_idds_result,
)
from lsst.resources import ResourcePath

_LOG = logging.getLogger(__name__)

Expand Down Expand Up @@ -95,7 +96,11 @@ def submit(self, workflow, **kwargs):
lsst_temp = "LSST_RUN_TEMP_SPACE"
if lsst_temp in file_distribution_uri and lsst_temp not in os.environ:
file_distribution_uri = self.config["fileDistributionEndPointDefault"]
copy_files_for_distribution(workflow.files_to_pre_stage, file_distribution_uri, max_copy_workers)
copy_files_for_distribution(
workflow.files_to_pre_stage,
ResourcePath(file_distribution_uri, forceDirectory=True),
max_copy_workers,
)

idds_client = get_idds_client(self.config)
ret = idds_client.submit(workflow.idds_client_workflow, username=None, use_dataset_name=False)
Expand Down
12 changes: 5 additions & 7 deletions python/lsst/ctrl/bps/panda/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ def copy_files_for_distribution(files_to_stage, file_distribution_uri, max_copy_
----------
files_to_stage : `dict` [`str`, `str`]
Files which need to be copied to a workflow staging area.
file_distribution_uri : `str`
file_distribution_uri : `ResourcePath`
Path on the edge node accessed storage,
including access protocol, bucket name to place files.
max_copy_workers : `int`
Expand All @@ -93,16 +93,14 @@ def copy_files_for_distribution(files_to_stage, file_distribution_uri, max_copy_
for local_pfn in files_to_stage.values():
folder_name = os.path.basename(os.path.normpath(local_pfn))
if os.path.isdir(local_pfn):
folder_uri = file_distribution_uri.join(folder_name, forceDirectory=True)
files_in_folder = ResourcePath.findFileResources([local_pfn])
for file in files_in_folder:
file_name = file.basename()
files_to_copy[file] = ResourcePath(
os.path.join(file_distribution_uri, folder_name, file_name)
)
files_to_copy[file] = folder_uri.join(file_name, forceDirectory=False)
else:
files_to_copy[ResourcePath(local_pfn)] = ResourcePath(
os.path.join(file_distribution_uri, folder_name)
)
folder_uri = file_distribution_uri.join(folder_name, forceDirectory=False)
files_to_copy[ResourcePath(local_pfn, forceDirectory=False)] = folder_uri

copy_executor = concurrent.futures.ThreadPoolExecutor(max_workers=max_copy_workers)
future_file_copy = []
Expand Down

0 comments on commit 4cb75ee

Please sign in to comment.