-
Notifications
You must be signed in to change notification settings - Fork 1
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
fix pandaDistributionEndpoint to support different protocols #72
Conversation
Codecov ReportAttention: Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## main #72 +/- ##
=======================================
Coverage 31.40% 31.41%
=======================================
Files 9 9
Lines 640 643 +3
Branches 111 111
=======================================
+ Hits 201 202 +1
- Misses 430 432 +2
Partials 9 9 ☔ View full report in Codecov by Sentry. |
@@ -95,6 +95,9 @@ def submit(self, workflow, **kwargs): | |||
lsst_temp = "LSST_RUN_TEMP_SPACE" | |||
if lsst_temp in file_distribution_uri and lsst_temp not in os.environ: | |||
file_distribution_uri = self.config["fileDistributionEndPointDefault"] | |||
protocol_pattern = re.compile(r"^[a-zA-Z][a-zA-Z\d+\-.]*://") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It would be much easier if you use ResourcePath here or even the standard URI parser (you should definitely not be writing a URI scheme parser of your own).
With ResourcePath all you need is:
file_distribution_uri = ResourcePath(self.config["fileDistributionEndPointDefault"])
and it will sort it out and give you a file:///
if it's a file and whatever other URI if it had another scheme.
@@ -95,6 +95,9 @@ def submit(self, workflow, **kwargs): | |||
lsst_temp = "LSST_RUN_TEMP_SPACE" | |||
if lsst_temp in file_distribution_uri and lsst_temp not in os.environ: | |||
file_distribution_uri = self.config["fileDistributionEndPointDefault"] | |||
protocol_pattern = re.compile(r"^[a-zA-Z][a-zA-Z\d+\-.]*://") | |||
if not protocol_pattern.match(file_distribution_uri): | |||
file_distribution_uri = "file://" + file_distribution_uri | |||
copy_files_for_distribution(workflow.files_to_pre_stage, file_distribution_uri, max_copy_workers) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
copy_files_for_distribution
is using ResourcePath
internally so using ResourcePath
in this file seems like the right thing to do. It looks like copy_files_for_distribution
will need to be fixed because it is assuming local files. It needs to be modified to create the ResourcePath
first and then use the .join
method rather than using os.path.join
and then creating the ResourcePath
.
be34416
to
77e0485
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks okay. I have some minor clean up comments.
copy_files_for_distribution( | ||
bps_workflow.files_to_pre_stage, config["fileDistributionEndPoint"], max_copy_workers | ||
) | ||
file_distribution_uri = ResourcePath(config["fileDistributionEndPoint"]) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
file_distribution_uri = ResourcePath(config["fileDistributionEndPoint"]) | |
file_distribution_uri = ResourcePath(config["fileDistributionEndPoint"], forceDirectory=True) |
This will make sure that if someone forgets the trailing /
that it still acts like a directory on remote URIs.
@@ -95,7 +96,9 @@ def submit(self, workflow, **kwargs): | |||
lsst_temp = "LSST_RUN_TEMP_SPACE" | |||
if lsst_temp in file_distribution_uri and lsst_temp not in os.environ: | |||
file_distribution_uri = self.config["fileDistributionEndPointDefault"] | |||
copy_files_for_distribution(workflow.files_to_pre_stage, file_distribution_uri, max_copy_workers) | |||
copy_files_for_distribution( | |||
workflow.files_to_pre_stage, ResourcePath(file_distribution_uri), max_copy_workers |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
workflow.files_to_pre_stage, ResourcePath(file_distribution_uri), max_copy_workers | |
workflow.files_to_pre_stage, ResourcePath(file_distribution_uri, forceDirectory=True), max_copy_workers |
python/lsst/ctrl/bps/panda/utils.py
Outdated
files_to_copy[ResourcePath(local_pfn)] = ResourcePath( | ||
os.path.join(file_distribution_uri, folder_name) | ||
) | ||
files_to_copy[ResourcePath(local_pfn)] = file_distribution_uri.join(folder_name) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We know local_pfn
is a file so we can say ResourcePath(local_pfn, forceDirectory=False)
here.
I think the file_distribution_uri.join(folder_name)
is the same as folder_uri
above so maybe move the folder_uri
outside the if
statement and reuse the variable here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
To clarify the slack discussion. folder_name
is not a directory if local_pfn
is not a directory so this should be:
files_to_copy[ResourcePath(local_pfn)] = file_distribution_uri.join(folder_name) | |
files_to_copy[ResourcePath(local_pfn, forceDirectory=False)] = file_distribution_uri.join(folder_name, forceDirectory=False) |
77e0485
to
f4736d9
Compare
python/lsst/ctrl/bps/panda/utils.py
Outdated
files_to_copy[file] = ResourcePath( | ||
os.path.join(file_distribution_uri, folder_name, file_name) | ||
) | ||
files_to_copy[file] = folder_uri.join(file_name) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since you know you are appending a file to a directory you can say forceDirectory=False in this join like you do below to be explicit.
06d646f
to
0e8fa1a
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Still missing the doc/changes file. And need a docstring updated. Otherwise good to merge.
@@ -93,16 +93,14 @@ def copy_files_for_distribution(files_to_stage, file_distribution_uri, max_copy_ | |||
for local_pfn in files_to_stage.values(): | |||
folder_name = os.path.basename(os.path.normpath(local_pfn)) | |||
if os.path.isdir(local_pfn): | |||
folder_uri = file_distribution_uri.join(folder_name, forceDirectory=True) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Need to change the type for the file_distribution_uri parameter in the docstring. strings won't work anymore.
0e8fa1a
to
4223f6a
Compare
Checklist
doc/changes