Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix pandaDistributionEndpoint to support different protocols #72

Merged
merged 1 commit into from
Aug 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion config/bps_panda_DF.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ project: dev
campaign: quick
s3EndpointUrl: "https://storage.googleapis.com"
payloadFolder: payload
fileDistributionEndPoint: "file://${LSST_RUN_TEMP_SPACE}/{operator}/panda_cache_box/{payloadFolder}/{uniqProcName}/"
fileDistributionEndPoint: "${LSST_RUN_TEMP_SPACE}/panda_cache_box/{payloadFolder}/{uniqProcName}/"

# location of main butler repo at USDF
payload:
Expand Down
1 change: 1 addition & 0 deletions doc/changes/DM-45631.misc.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fixed ``fileDistributionEndPoint`` handling to support different protocols.
6 changes: 3 additions & 3 deletions python/lsst/ctrl/bps/panda/edgenode/build_cmd_line_decoder.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
from lsst.ctrl.bps.drivers import prepare_driver
from lsst.ctrl.bps.panda.constants import PANDA_DEFAULT_MAX_COPY_WORKERS
from lsst.ctrl.bps.panda.utils import copy_files_for_distribution, get_idds_client
from lsst.resources import ResourcePath
from lsst.utils.timer import time_this

logging.basicConfig(
Expand Down Expand Up @@ -138,9 +139,8 @@ def create_idds_workflow(config_file, compute_site):
idds_workflow = bps_workflow.idds_client_workflow

_, max_copy_workers = config.search("maxCopyWorkers", opt={"default": PANDA_DEFAULT_MAX_COPY_WORKERS})
copy_files_for_distribution(
bps_workflow.files_to_pre_stage, config["fileDistributionEndPoint"], max_copy_workers
)
file_distribution_uri = ResourcePath(config["fileDistributionEndPoint"], forceDirectory=True)
copy_files_for_distribution(bps_workflow.files_to_pre_stage, file_distribution_uri, max_copy_workers)

idds_client = get_idds_client(config)
ret = idds_client.update_build_request(request_id, signature, idds_workflow)
Expand Down
7 changes: 6 additions & 1 deletion python/lsst/ctrl/bps/panda/panda_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
get_idds_client,
get_idds_result,
)
from lsst.resources import ResourcePath

_LOG = logging.getLogger(__name__)

Expand Down Expand Up @@ -95,7 +96,11 @@
lsst_temp = "LSST_RUN_TEMP_SPACE"
if lsst_temp in file_distribution_uri and lsst_temp not in os.environ:
file_distribution_uri = self.config["fileDistributionEndPointDefault"]
copy_files_for_distribution(workflow.files_to_pre_stage, file_distribution_uri, max_copy_workers)
copy_files_for_distribution(

Check warning on line 99 in python/lsst/ctrl/bps/panda/panda_service.py

View check run for this annotation

Codecov / codecov/patch

python/lsst/ctrl/bps/panda/panda_service.py#L99

Added line #L99 was not covered by tests
workflow.files_to_pre_stage,
ResourcePath(file_distribution_uri, forceDirectory=True),
max_copy_workers,
)

idds_client = get_idds_client(self.config)
ret = idds_client.submit(workflow.idds_client_workflow, username=None, use_dataset_name=False)
Expand Down
12 changes: 5 additions & 7 deletions python/lsst/ctrl/bps/panda/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@
----------
files_to_stage : `dict` [`str`, `str`]
Files which need to be copied to a workflow staging area.
file_distribution_uri : `str`
file_distribution_uri : `ResourcePath`
Path on the storage accessed by the edge node, including the
access protocol and bucket name, where the files are placed.
max_copy_workers : `int`
Expand All @@ -93,16 +93,14 @@
for local_pfn in files_to_stage.values():
folder_name = os.path.basename(os.path.normpath(local_pfn))
if os.path.isdir(local_pfn):
folder_uri = file_distribution_uri.join(folder_name, forceDirectory=True)

Check warning on line 96 in python/lsst/ctrl/bps/panda/utils.py

View check run for this annotation

Codecov / codecov/patch

python/lsst/ctrl/bps/panda/utils.py#L96

Added line #L96 was not covered by tests
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The type of the file_distribution_uri parameter needs to be changed in the docstring; strings won't work anymore.

files_in_folder = ResourcePath.findFileResources([local_pfn])
for file in files_in_folder:
file_name = file.basename()
files_to_copy[file] = ResourcePath(
os.path.join(file_distribution_uri, folder_name, file_name)
)
files_to_copy[file] = folder_uri.join(file_name, forceDirectory=False)

Check warning on line 100 in python/lsst/ctrl/bps/panda/utils.py

View check run for this annotation

Codecov / codecov/patch

python/lsst/ctrl/bps/panda/utils.py#L100

Added line #L100 was not covered by tests
else:
files_to_copy[ResourcePath(local_pfn)] = ResourcePath(
os.path.join(file_distribution_uri, folder_name)
)
folder_uri = file_distribution_uri.join(folder_name, forceDirectory=False)
files_to_copy[ResourcePath(local_pfn, forceDirectory=False)] = folder_uri

Check warning on line 103 in python/lsst/ctrl/bps/panda/utils.py

View check run for this annotation

Codecov / codecov/patch

python/lsst/ctrl/bps/panda/utils.py#L102-L103

Added lines #L102 - L103 were not covered by tests

copy_executor = concurrent.futures.ThreadPoolExecutor(max_workers=max_copy_workers)
future_file_copy = []
Expand Down
Loading