Skip to content

Commit

Permalink
work
Browse files Browse the repository at this point in the history
  • Loading branch information
mcantelon committed Dec 7, 2023
1 parent 89d6466 commit dfe49cb
Show file tree
Hide file tree
Showing 14 changed files with 139 additions and 153 deletions.
14 changes: 9 additions & 5 deletions AIPscan/Aggregator/database_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
FileType,
Pipeline,
StorageLocation,
StorageService,
)

logger = get_task_logger(__name__)
Expand Down Expand Up @@ -203,19 +204,20 @@ def create_storage_location_object(current_location, description, storage_servic
return storage_location


def create_or_update_storage_location(current_location, api_url, storage_service_id):
def create_or_update_storage_location(current_location, storage_service):
"""Create or update Storage Location and return it."""
storage_location = StorageLocation.query.filter_by(
current_location=current_location
).first()

request_url, request_url_without_api_key = get_storage_service_api_url(
api_url, current_location
storage_service, current_location
)
response = tasks.make_request(request_url, request_url_without_api_key)
description = response.get("description")
if not storage_location:
return create_storage_location_object(
current_location, description, storage_service_id
current_location, description, storage_service.id
)

if storage_location.description != description:
Expand All @@ -233,11 +235,13 @@ def create_pipeline_object(origin_pipeline, dashboard_url):
return pipeline


def create_or_update_pipeline(origin_pipeline, api_url):
def create_or_update_pipeline(origin_pipeline, storage_service_id):
"""Create or update Pipeline and return it."""
pipeline = Pipeline.query.filter_by(origin_pipeline=origin_pipeline).first()

storage_service = StorageService.query.get(storage_service_id)
request_url, request_url_without_api_key = get_storage_service_api_url(
api_url, origin_pipeline
storage_service, origin_pipeline
)
response = tasks.make_request(request_url, request_url_without_api_key)
dashboard_url = response.get("remote_name")
Expand Down
6 changes: 3 additions & 3 deletions AIPscan/Aggregator/mets_parse_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ def get_aip_original_name(mets):
# ignore those.
TRANSFER_DIR_PREFIX = "%transferDirectory%"

NAMESPACES = {u"premis": u"http://www.loc.gov/premis/v3"}
NAMESPACES = {"premis": "http://www.loc.gov/premis/v3"}
ELEM_ORIGINAL_NAME_PATTERN = ".//premis:originalName"

original_name = ""
Expand All @@ -85,13 +85,13 @@ def get_aip_original_name(mets):


def download_mets(
api_url, package_uuid, relative_path_to_mets, timestamp, package_list_no
storage_service, package_uuid, relative_path_to_mets, timestamp, package_list_no
):
"""Download METS from the storage service."""

# Request the METS file.
mets_response = requests.get(
get_mets_url(api_url, package_uuid, relative_path_to_mets)
get_mets_url(storage_service, package_uuid, relative_path_to_mets)
)

# Create a directory to download the METS to.
Expand Down
44 changes: 23 additions & 21 deletions AIPscan/Aggregator/task_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,20 +11,27 @@
from AIPscan.Aggregator.types import StorageServicePackage


def format_api_url_with_limit_offset(api_url):
def assemble_api_url_dict(storage_service, offset=0, limit=1_000_000):
return {

Check warning on line 15 in AIPscan/Aggregator/task_helpers.py

View check run for this annotation

Codecov / codecov/patch

AIPscan/Aggregator/task_helpers.py#L15

Added line #L15 was not covered by tests
"baseUrl": storage_service.url,
"userName": storage_service.user_name,
"apiKey": storage_service.api_key,
"offset": offset,
"limit": limit,
}


def format_api_url_with_limit_offset(storage_service):
"""Format the API URL here to make sure it is as correct as
possible.
"""
base_url = api_url.get("baseUrl", "").rstrip("/")
limit = int(api_url.get("limit", ""))
offset = api_url.get("offset", "")
user_name = api_url.get("userName")
api_key = api_url.get("apiKey", "")
base_url = storage_service.url.rstrip("/")

request_url_without_api_key = "{}/api/v2/file/?limit={}&offset={}".format(
base_url, limit, offset
base_url, storage_service.download_limit, storage_service.download_offset
)
request_url = "{}&username={}&api_key={}".format(
request_url_without_api_key, user_name, api_key
request_url_without_api_key, storage_service.user_name, storage_service.api_key
)
return base_url, request_url_without_api_key, request_url

Expand Down Expand Up @@ -95,32 +102,27 @@ def _tz_neutral_date(date):
return date


def get_mets_url(api_url, package_uuid, path_to_mets):
def get_mets_url(storage_service, package_uuid, path_to_mets):
"""Construct a URL from which we can download the METS files that
we are interested in.
"""
am_url = "baseUrl"
user_name = "userName"
api_key = "apiKey"

mets_url = "{}/api/v2/file/{}/extract_file/?relative_path_to_file={}&username={}&api_key={}".format(
api_url[am_url].rstrip("/"),
storage_service.url.rstrip("/"),
package_uuid,
path_to_mets,
api_url[user_name],
api_url[api_key],
storage_service.user_name,
storage_service.api_key,
)
return mets_url


def get_storage_service_api_url(api_url, api_path):
def get_storage_service_api_url(storage_service, api_path):
"""Return URL to fetch location info from Storage Service."""
base_url = api_url.get("baseUrl", "").rstrip("/")
base_url = storage_service.url.rstrip("/")
request_url_without_api_key = "{}{}".format(base_url, api_path).rstrip("/")
user_name = api_url.get("userName")
api_key = api_url.get("apiKey", "")

request_url = "{}?username={}&api_key={}".format(
request_url_without_api_key, user_name, api_key
request_url_without_api_key, storage_service.user_name, storage_service.api_key
)
return request_url, request_url_without_api_key

Expand Down
53 changes: 26 additions & 27 deletions AIPscan/Aggregator/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@ def start_mets_task(
relative_path_to_mets,
current_location,
origin_pipeline,
api_url,
timestamp_str,
package_list_no,
storage_service_id,
Expand All @@ -61,18 +60,20 @@ def start_mets_task(
"""Initiate a get_mets task worker and record the event in the
celery database.
"""
storage_service = StorageService.query.get(storage_service_id)

Check warning on line 63 in AIPscan/Aggregator/tasks.py

View check run for this annotation

Codecov / codecov/patch

AIPscan/Aggregator/tasks.py#L63

Added line #L63 was not covered by tests
storage_location = database_helpers.create_or_update_storage_location(
current_location, api_url, storage_service_id
current_location, storage_service
)

pipeline = database_helpers.create_or_update_pipeline(origin_pipeline, api_url)
pipeline = database_helpers.create_or_update_pipeline(

Check warning on line 68 in AIPscan/Aggregator/tasks.py

View check run for this annotation

Codecov / codecov/patch

AIPscan/Aggregator/tasks.py#L68

Added line #L68 was not covered by tests
origin_pipeline, storage_service_id
)

# Call worker to download and parse METS File.
get_mets_task = get_mets.delay(
package_uuid,
aip_size,
relative_path_to_mets,
api_url,
timestamp_str,
package_list_no,
storage_service_id,
Expand All @@ -91,12 +92,7 @@ def start_mets_task(


def parse_packages_and_load_mets(
json_file_path,
api_url,
timestamp,
package_list_no,
storage_service_id,
fetch_job_id,
json_file_path, timestamp, package_list_no, storage_service_id, fetch_job_id
):
"""Parse packages documents from the storage service and initiate
the load mets functions of AIPscan. Results are written to the
Expand Down Expand Up @@ -128,7 +124,6 @@ def parse_packages_and_load_mets(
package.get_relative_path(),
package.current_location,
package.origin_pipeline,
api_url,
timestamp,
package_list_no,
storage_service_id,
Expand All @@ -149,14 +144,13 @@ def delete_aip(uuid):

@celery.task(bind=True)
def workflow_coordinator(
self, api_url, timestamp, storage_service_id, fetch_job_id, packages_directory
self, timestamp, storage_service_id, fetch_job_id, packages_directory
):

logger.info("Packages directory is: %s", packages_directory)

# Send package list request to a worker.
package_lists_task = package_lists_request.delay(
api_url, timestamp, packages_directory
storage_service_id, timestamp, packages_directory
)

write_celery_update(package_lists_task, workflow_coordinator)
Expand All @@ -182,12 +176,7 @@ def workflow_coordinator(
# Process packages and create a new worker to download and parse
# each METS separately.
packages = parse_packages_and_load_mets(
json_file_path,
api_url,
timestamp,
package_list_no,
storage_service_id,
fetch_job_id,
json_file_path, timestamp, package_list_no, storage_service_id, fetch_job_id
)
all_packages = all_packages + packages

Expand Down Expand Up @@ -228,29 +217,34 @@ def make_request(request_url, request_url_without_api_key):


@celery.task(bind=True)
def package_lists_request(self, apiUrl, timestamp, packages_directory):
def package_lists_request(self, storage_service_id, timestamp, packages_directory):
"""Request package lists from the storage service. Package lists
will contain details of the AIPs that we want to download.
"""
META = "meta"
NEXT = "next"
LIMIT = "limit"
COUNT = "total_count"
IN_PROGRESS = "IN PROGRESS"

storage_service = StorageService.query.get(storage_service_id)

Check warning on line 229 in AIPscan/Aggregator/tasks.py

View check run for this annotation

Codecov / codecov/patch

AIPscan/Aggregator/tasks.py#L229

Added line #L229 was not covered by tests

(
base_url,
request_url_without_api_key,
request_url,
) = format_api_url_with_limit_offset(apiUrl)
) = format_api_url_with_limit_offset(storage_service)

# First packages request.
packages = make_request(request_url, request_url_without_api_key)
packages_count = 1

# Calculate how many package list files will be downloaded based on
# total number of packages and the download limit
total_packages = int(packages.get(META, {}).get(COUNT, 0))
total_package_lists = int(total_packages / int(apiUrl.get(LIMIT))) + (
total_packages % int(apiUrl.get(LIMIT)) > 0
total_package_lists = int(total_packages / int(storage_service.download_limit)) + (

Check warning on line 244 in AIPscan/Aggregator/tasks.py

View check run for this annotation

Codecov / codecov/patch

AIPscan/Aggregator/tasks.py#L244

Added line #L244 was not covered by tests
total_packages % int(storage_service.download_limit) > 0
)

# There may be more packages to download, so let's access those here.
# TODO: `request_url_without_api_key` at this point will not be as
# accurate. If we have more time, modify `format_api_url_with_limit_offset(...)`
Expand All @@ -272,6 +266,7 @@ def package_lists_request(self, apiUrl, timestamp, packages_directory):
)
},
)

return {
"totalPackageLists": total_package_lists,
"totalPackages": total_packages,
Expand All @@ -284,7 +279,6 @@ def get_mets(
package_uuid,
aip_size,
relative_path_to_mets,
api_url,
timestamp_str,
package_list_no,
storage_service_id,
Expand Down Expand Up @@ -313,8 +307,13 @@ def get_mets(
tasklogger = customlogger

# Download METS file
storage_service = StorageService.query.get(storage_service_id)
download_file = download_mets(
api_url, package_uuid, relative_path_to_mets, timestamp_str, package_list_no
storage_service,
package_uuid,
relative_path_to_mets,
timestamp_str,
package_list_no,
)
mets_name = os.path.basename(download_file)
mets_hash = file_sha256_hash(download_file)
Expand Down
6 changes: 3 additions & 3 deletions AIPscan/Aggregator/templates/storage_service.html
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@
<tr><td width=20%><strong>Download offset</strong></td><td>{{ storage_service.download_offset}}</td></tr>
<tr><td width=20%><strong>Default service</strong></td><td>{{ storage_service.default}}</td></tr>
</table>
<a href="{{ url_for('aggregator.edit_storage_service', id=storage_service.id) }}"><button type="button" class="btn btn-info">Edit</button></a>
<a href="{{ url_for('aggregator.delete_storage_service', id=storage_service.id) }}"><button type="button" class="btn btn-danger">Delete</button></a>
<a href="{{ url_for('aggregator.edit_storage_service', storage_service_id=storage_service.id) }}"><button type="button" class="btn btn-info">Edit</button></a>
<a href="{{ url_for('aggregator.delete_storage_service', storage_service_id=storage_service.id) }}"><button type="button" class="btn btn-danger">Delete</button></a>
{% if mets_fetch_jobs %}
<a href="{{ url_for('reporter.view_aips', storage_service_id=storage_service.id) }}"><button type="button" class="btn btn-success">View AIPs</button></a>
{% endif %}
Expand Down Expand Up @@ -73,7 +73,7 @@
</td>
<td>{{ mets_fetch_job.aips|length }}</td>
<td>
<a href="{{ url_for('aggregator.delete_fetch_job', id= mets_fetch_job.id) }}"><button type="button" class="btn btn-danger">Delete</button></a>
<a href="{{ url_for('aggregator.delete_fetch_job', fetch_job_id=mets_fetch_job.id) }}"><button type="button" class="btn btn-danger">Delete</button></a>
</td>
</tr>
{% endfor %}
Expand Down
26 changes: 22 additions & 4 deletions AIPscan/Aggregator/tests/test_database_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
FileType,
Pipeline,
StorageLocation,
StorageService,
)

FIXTURES_DIR = "fixtures"
Expand Down Expand Up @@ -289,14 +290,25 @@ def test_create_or_update_storage_location(
"""Test that Storage Locations are created or updated as expected."""
make_request = mocker.patch("AIPscan.Aggregator.tasks.make_request")
make_request.return_value = {"description": new_description}

create_storage_location_object = mocker.patch(
"AIPscan.Aggregator.database_helpers.create_storage_location_object"
)

# FIX
storage_service = StorageService(
name="a",
url="b",
user_name="c",
api_key="12345",
download_limit=0,
download_offset=0,
default="",
)
storage_service.id = 1

storage_location = database_helpers.create_or_update_storage_location(
current_location=current_location,
api_url={},
storage_service_id=storage_service_id,
current_location=current_location, storage_service=storage_service
)

if location_created:
Expand All @@ -320,12 +332,18 @@ def test_create_or_update_pipeline(
"""Test that Pipelines are created or updated as expected."""
make_request = mocker.patch("AIPscan.Aggregator.tasks.make_request")
make_request.return_value = {"remote_name": new_url}

create_pipeline_object = mocker.patch(
"AIPscan.Aggregator.database_helpers.create_pipeline_object"
)

get_storage_service_api_url = mocker.patch(
"AIPscan.Aggregator.database_helpers.get_storage_service_api_url"
)
get_storage_service_api_url.return_value = (None, None)

pipeline = database_helpers.create_or_update_pipeline(
origin_pipeline=origin_pipeline, api_url={}
origin_pipeline=origin_pipeline, storage_service_id=None
)

if pipeline_created:
Expand Down
Loading

0 comments on commit dfe49cb

Please sign in to comment.