Code cleanup/refactoring (#243) #263

Merged
merged 1 commit on Dec 15, 2023
12 changes: 7 additions & 5 deletions AIPscan/Aggregator/database_helpers.py
@@ -203,19 +203,20 @@ def create_storage_location_object(current_location, description, storage_servic
return storage_location


def create_or_update_storage_location(current_location, api_url, storage_service_id):
def create_or_update_storage_location(current_location, storage_service):
"""Create or update Storage Location and return it."""
storage_location = StorageLocation.query.filter_by(
current_location=current_location
).first()

request_url, request_url_without_api_key = get_storage_service_api_url(
api_url, current_location
storage_service, current_location
)
response = tasks.make_request(request_url, request_url_without_api_key)
description = response.get("description")
if not storage_location:
return create_storage_location_object(
current_location, description, storage_service_id
current_location, description, storage_service.id
)

if storage_location.description != description:
@@ -233,11 +234,12 @@ def create_pipeline_object(origin_pipeline, dashboard_url):
return pipeline


def create_or_update_pipeline(origin_pipeline, api_url):
def create_or_update_pipeline(origin_pipeline, storage_service):
"""Create or update Storage Location and return it."""
pipeline = Pipeline.query.filter_by(origin_pipeline=origin_pipeline).first()

request_url, request_url_without_api_key = get_storage_service_api_url(
api_url, origin_pipeline
storage_service, origin_pipeline
)
response = tasks.make_request(request_url, request_url_without_api_key)
dashboard_url = response.get("remote_name")
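For orientation, a hypothetical call-pattern sketch of the refactor in this file: a StorageService model instance now replaces the old api_url dict and separate storage_service_id, and the helpers read url, user_name, api_key, and id from it. The names storage_service_id, current_location, and origin_pipeline are assumed to be in scope, as they are at the updated call sites in AIPscan/Aggregator/tasks.py further down this diff.

# Hedged sketch only; requires a Flask app context and an existing
# StorageService row, as at the call sites in tasks.py below.
from AIPscan.Aggregator import database_helpers

storage_service = StorageService.query.get(storage_service_id)

storage_location = database_helpers.create_or_update_storage_location(
    current_location, storage_service
)
pipeline = database_helpers.create_or_update_pipeline(origin_pipeline, storage_service)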
6 changes: 3 additions & 3 deletions AIPscan/Aggregator/mets_parse_helpers.py
@@ -58,7 +58,7 @@ def get_aip_original_name(mets):
# ignore those.
TRANSFER_DIR_PREFIX = "%transferDirectory%"

NAMESPACES = {u"premis": u"http://www.loc.gov/premis/v3"}
NAMESPACES = {"premis": "http://www.loc.gov/premis/v3"}
ELEM_ORIGINAL_NAME_PATTERN = ".//premis:originalName"

original_name = ""
@@ -85,13 +85,13 @@ def get_aip_original_name(mets):


def download_mets(
api_url, package_uuid, relative_path_to_mets, timestamp, package_list_no
storage_service, package_uuid, relative_path_to_mets, timestamp, package_list_no
):
"""Download METS from the storage service."""

# Request the METS file.
mets_response = requests.get(
get_mets_url(api_url, package_uuid, relative_path_to_mets)
get_mets_url(storage_service, package_uuid, relative_path_to_mets)
)

# Create a directory to download the METS to.
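A similar hypothetical sketch for the updated download_mets signature; it mirrors the call site in AIPscan/Aggregator/tasks.py later in this diff and assumes the surrounding variables are in scope as they are there.

# Hedged call-pattern sketch: a StorageService row replaces the old api_url
# dict; the remaining arguments are unchanged.
download_file = download_mets(
    storage_service,
    package_uuid,
    relative_path_to_mets,
    timestamp_str,
    package_list_no,
)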
48 changes: 27 additions & 21 deletions AIPscan/Aggregator/task_helpers.py
@@ -3,6 +3,7 @@
"""Collects a number of reusable components of tasks.py. Also ensures
the module remains clean and easy to refactor over time.
"""
import json
import os
from datetime import datetime

@@ -11,20 +12,17 @@
from AIPscan.Aggregator.types import StorageServicePackage


def format_api_url_with_limit_offset(api_url):
def format_api_url_with_limit_offset(storage_service):
"""Format the API URL here to make sure it is as correct as
possible.
"""
base_url = api_url.get("baseUrl", "").rstrip("/")
limit = int(api_url.get("limit", ""))
offset = api_url.get("offset", "")
user_name = api_url.get("userName")
api_key = api_url.get("apiKey", "")
base_url = storage_service.url.rstrip("/")

request_url_without_api_key = "{}/api/v2/file/?limit={}&offset={}".format(
base_url, limit, offset
base_url, storage_service.download_limit, storage_service.download_offset
)
request_url = "{}&username={}&api_key={}".format(
request_url_without_api_key, user_name, api_key
request_url_without_api_key, storage_service.user_name, storage_service.api_key
)
return base_url, request_url_without_api_key, request_url

@@ -36,6 +34,19 @@
return os.path.join("AIPscan", "Aggregator", "downloads", timestamp, "packages")


def parse_package_list_file(filepath, logger=None, remove_after_parsing=False):
with open(filepath, "r") as packages_json:
package_list = json.load(packages_json)
try:
if remove_after_parsing:
os.remove(filepath)
except OSError as err:
if logger:
logger.warning("Unable to delete package JSON file: {}".format(err))

return package_list


def process_package_object(package_obj):
"""Process a package object as retrieve from the storage service
and return a StorageServicePackage type to the caller for further
@@ -95,32 +106,27 @@
return date


def get_mets_url(api_url, package_uuid, path_to_mets):
def get_mets_url(storage_service, package_uuid, path_to_mets):
"""Construct a URL from which we can download the METS files that
we are interested in.
"""
am_url = "baseUrl"
user_name = "userName"
api_key = "apiKey"

mets_url = "{}/api/v2/file/{}/extract_file/?relative_path_to_file={}&username={}&api_key={}".format(
api_url[am_url].rstrip("/"),
storage_service.url.rstrip("/"),
package_uuid,
path_to_mets,
api_url[user_name],
api_url[api_key],
storage_service.user_name,
storage_service.api_key,
)
return mets_url


def get_storage_service_api_url(api_url, api_path):
def get_storage_service_api_url(storage_service, api_path):
"""Return URL to fetch location infofrom Storage Service."""
base_url = api_url.get("baseUrl", "").rstrip("/")
base_url = storage_service.url.rstrip("/")
request_url_without_api_key = "{}{}".format(base_url, api_path).rstrip("/")
user_name = api_url.get("userName")
api_key = api_url.get("apiKey", "")

request_url = "{}?username={}&api_key={}".format(
request_url_without_api_key, user_name, api_key
request_url_without_api_key, storage_service.user_name, storage_service.api_key
)
return request_url, request_url_without_api_key

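To illustrate what the URL helpers now expect, here is a small, hypothetical sketch. It assumes AIPscan is importable and substitutes a plain stand-in object for the SQLAlchemy StorageService model; the attribute names (url, user_name, api_key, download_limit, download_offset) are the ones the refactored helpers read, and the example values are made up.

from dataclasses import dataclass

from AIPscan.Aggregator.task_helpers import (
    format_api_url_with_limit_offset,
    get_mets_url,
    get_storage_service_api_url,
)


@dataclass
class FakeStorageService:
    # Stand-in for the StorageService model; not part of AIPscan.
    url: str = "http://ss.example.com:8000/"
    user_name: str = "demo"
    api_key: str = "secret"
    download_limit: int = 20
    download_offset: int = 0


ss = FakeStorageService()

base_url, url_without_key, url_with_key = format_api_url_with_limit_offset(ss)
# url_without_key -> "http://ss.example.com:8000/api/v2/file/?limit=20&offset=0"
# url_with_key    -> url_without_key + "&username=demo&api_key=secret"

mets_url = get_mets_url(ss, "package-uuid", "package-uuid/data/METS.package-uuid.xml")
location_url, _ = get_storage_service_api_url(ss, "/api/v2/location/location-uuid/")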
104 changes: 55 additions & 49 deletions AIPscan/Aggregator/tasks.py
@@ -17,6 +17,7 @@
)
from AIPscan.Aggregator.task_helpers import (
format_api_url_with_limit_offset,
parse_package_list_file,
process_package_object,
)
from AIPscan.extensions import celery
@@ -52,83 +53,77 @@
relative_path_to_mets,
current_location,
origin_pipeline,
api_url,
timestamp_str,
package_list_no,
storage_service_id,
fetch_job_id,
run_as_task=True,
):
"""Initiate a get_mets task worker and record the event in the
celery database.
"""
storage_service = StorageService.query.get(storage_service_id)

storage_location = database_helpers.create_or_update_storage_location(
current_location, api_url, storage_service_id
current_location, storage_service
)

pipeline = database_helpers.create_or_update_pipeline(origin_pipeline, api_url)
pipeline = database_helpers.create_or_update_pipeline(
origin_pipeline, storage_service
)

# Call worker to download and parse METS File.
get_mets_task = get_mets.delay(
args = [
package_uuid,
aip_size,
relative_path_to_mets,
api_url,
timestamp_str,
package_list_no,
storage_service_id,
storage_location.id,
pipeline.id,
fetch_job_id,
)
mets_task = get_mets_tasks(
get_mets_task_id=get_mets_task.id,
workflow_coordinator_id=workflow_coordinator.request.id,
package_uuid=package_uuid,
status=None,
)
db.session.add(mets_task)
db.session.commit()
]

if run_as_task:
# Call worker to download and parse METS File.
get_mets_task = get_mets.delay(*args)
mets_task = get_mets_tasks(
get_mets_task_id=get_mets_task.id,
workflow_coordinator_id=workflow_coordinator.request.id,
package_uuid=package_uuid,
status=None,
)
db.session.add(mets_task)
db.session.commit()

else:
# Execute immediately.
get_mets.apply(args=args)


def parse_packages_and_load_mets(
json_file_path,
api_url,
timestamp,
package_list_no,
storage_service_id,
fetch_job_id,
package_list, timestamp, package_list_no, storage_service_id, fetch_job_id
):
"""Parse packages documents from the storage service and initiate
the load mets functions of AIPscan. Results are written to the
database.
"""
OBJECTS = "objects"
packages = []
with open(json_file_path, "r") as packages_json:
package_list = json.load(packages_json)

try:
os.remove(json_file_path)
except OSError as err:
logger.warning("Unable to delete package JSON file: {}".format(err))

for package_obj in package_list.get(OBJECTS, []):
package = process_package_object(package_obj)

packages.append(package)
handle_deletion(package)

if package.is_deleted():
delete_aip(package.uuid)
if not package.is_undeleted_aip():
continue

if not package.is_aip():
continue
start_mets_task(
package.uuid,
package.size,
package.get_relative_path(),
package.current_location,
package.origin_pipeline,
api_url,
timestamp,
package_list_no,
storage_service_id,
@@ -137,6 +132,11 @@
return packages


def handle_deletion(package):
if package.is_deleted():
delete_aip(package.uuid)


def delete_aip(uuid):
logger.warning("Package deleted from SS: '%s'", uuid)

@@ -149,14 +149,13 @@

@celery.task(bind=True)
def workflow_coordinator(
self, api_url, timestamp, storage_service_id, fetch_job_id, packages_directory
self, timestamp, storage_service_id, fetch_job_id, packages_directory
):

logger.info("Packages directory is: %s", packages_directory)

# Send package list request to a worker.
package_lists_task = package_lists_request.delay(
api_url, timestamp, packages_directory
storage_service_id, timestamp, packages_directory
)

write_celery_update(package_lists_task, workflow_coordinator)
@@ -181,13 +180,10 @@
)
# Process packages and create a new worker to download and parse
# each METS separately.
package_list = parse_package_list_file(json_file_path, logger, True)

packages = parse_packages_and_load_mets(
json_file_path,
api_url,
timestamp,
package_list_no,
storage_service_id,
fetch_job_id,
package_list, timestamp, package_list_no, storage_service_id, fetch_job_id
)
all_packages = all_packages + packages

@@ -228,29 +224,34 @@


@celery.task(bind=True)
def package_lists_request(self, apiUrl, timestamp, packages_directory):
def package_lists_request(self, storage_service_id, timestamp, packages_directory):
"""Request package lists from the storage service. Package lists
will contain details of the AIPs that we want to download.
"""
META = "meta"
NEXT = "next"
LIMIT = "limit"
COUNT = "total_count"
IN_PROGRESS = "IN PROGRESS"

storage_service = StorageService.query.get(storage_service_id)

(
base_url,
request_url_without_api_key,
request_url,
) = format_api_url_with_limit_offset(apiUrl)
) = format_api_url_with_limit_offset(storage_service)

# First packages request.
packages = make_request(request_url, request_url_without_api_key)
packages_count = 1

# Calculate how many package list files will be downloaded based on
# total number of packages and the download limit
total_packages = int(packages.get(META, {}).get(COUNT, 0))
total_package_lists = int(total_packages / int(apiUrl.get(LIMIT))) + (
total_packages % int(apiUrl.get(LIMIT)) > 0
total_package_lists = int(total_packages / int(storage_service.download_limit)) + (
total_packages % int(storage_service.download_limit) > 0
)

# There may be more packages to download, so let's access those here.
# TODO: `request_url_without_api_key` at this point will not be as
# accurate. If we have more time, modify `format_api_url_with_limit_offset(...)`
@@ -272,6 +273,7 @@
)
},
)

return {
"totalPackageLists": total_package_lists,
"totalPackages": total_packages,
@@ -284,7 +286,6 @@
package_uuid,
aip_size,
relative_path_to_mets,
api_url,
timestamp_str,
package_list_no,
storage_service_id,
@@ -313,8 +314,13 @@
tasklogger = customlogger

# Download METS file
storage_service = StorageService.query.get(storage_service_id)
download_file = download_mets(
api_url, package_uuid, relative_path_to_mets, timestamp_str, package_list_no
storage_service,
package_uuid,
relative_path_to_mets,
timestamp_str,
package_list_no,
)
mets_name = os.path.basename(download_file)
mets_hash = file_sha256_hash(download_file)
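Finally, a hedged, self-contained sketch of the new parse_package_list_file helper used above: it loads a downloaded package-list JSON and, when asked, removes the file afterwards, logging rather than raising if the delete fails. The JSON keys shown ("meta", "total_count", "objects") are the ones the tasks read; the file contents here are made up for demonstration, since the real files are written by package_lists_request.

import json
import logging
import tempfile

from AIPscan.Aggregator.task_helpers import parse_package_list_file

logging.basicConfig()
logger = logging.getLogger("aipscan-sketch")

# Write a throwaway package-list file standing in for a downloaded one.
with tempfile.NamedTemporaryFile("w", suffix=".json", delete=False) as tmp:
    json.dump({"meta": {"total_count": 1}, "objects": [{"uuid": "not-a-real-package"}]}, tmp)

# Parse it and remove the file afterwards.
package_list = parse_package_list_file(tmp.name, logger, remove_after_parsing=True)
print(package_list.get("objects", []))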