feat: upload new and revised datasets to GBIF
Implement a new function for uploading both new and revised datasets to
GBIF. Build the workflow to handle typical conditions and edge cases.
Additionally, create integration tests that make actual HTTP calls
(extended tests meant for occasional manual execution) and tests that
mock HTTP calls, which always run and provide faster results.
clnsmth authored Oct 20, 2023
1 parent f7210e1 commit 53219b6
Showing 5 changed files with 319 additions and 52 deletions.
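
For orientation, a minimal usage sketch of the new entry point added in this commit (the registrations file path is a placeholder; the function itself is defined in src/gbif_registrar/crawl.py below):

# Hypothetical usage sketch; "registrations.csv" is a placeholder path.
from gbif_registrar.crawl import upload_dataset

upload_dataset(
    local_dataset_id="edi.929.2",
    registrations_file="registrations.csv",
)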
2 changes: 1 addition & 1 deletion pyproject.toml
@@ -44,7 +44,7 @@ remove_dist = false # don't remove dists
patch_without_tag = true # patch release by default

[tool.pylint.'MESSAGES CONTROL']
disable = "c-extension-no-member"
disable = "c-extension-no-member, protected-access, too-many-arguments"

[build-system]
requires = ["poetry-core>=1.0.0"]
1 change: 1 addition & 0 deletions src/gbif_registrar/config.py
@@ -6,6 +6,7 @@
INSTALLATION = "92d76df5-3de1-4c89-be03-7a17abad962a"
GBIF_API = "http://api.gbif-uat.org/v1/dataset"
REGISTRY_BASE_URL = "https://registry.gbif-uat.org/dataset"
GBIF_DATASET_BASE_URL = "https://www.gbif-uat.org/dataset"

# PASTA_ENVIRONMENT can be: "https://pasta.lternet.edu" or
# "https://pasta-s.lternet.edu"
163 changes: 126 additions & 37 deletions src/gbif_registrar/crawl.py
@@ -1,60 +1,149 @@
"""Functions for calling a GBIF crawl."""

from gbif_registrar.config import REGISTRY_BASE_URL
from gbif_registrar._utilities import (
_has_metadata,
_post_new_metadata_document,
_post_local_dataset_endpoint,
_delete_local_dataset_endpoints,
)
from time import sleep
from gbif_registrar import _utilities
from gbif_registrar.config import GBIF_DATASET_BASE_URL, REGISTRY_BASE_URL


def initiate_crawl(local_dataset_id, local_dataset_endpoint, gbif_dataset_uuid):
"""Initiate a crawl for a dataset at GBIF.
def upload_dataset(local_dataset_id, registrations_file):
"""Upload a local dataset to GBIF.
Parameters
----------
local_dataset_id : str
The identifier of the dataset in the EDI repository. Has the format:
{scope}.{identifier}.{revision}.
local_dataset_endpoint : str
This is the URL for downloading the dataset (.zip archive) at the EDI
repository. This value can be obtained from the
_get_local_dataset_endpoint function in the utilities module.
gbif_dataset_uuid : str
The registration identifier assigned by GBIF to the local dataset
group.
registrations_file : str
The path to the registrations file.
Returns
-------
None
The registrations file as a .csv.
Notes
-----
The synchronization status of the dataset is written to the registrations
file. The status is True if the dataset was successfully synchronized with
GBIF and False otherwise.
Print messages indicate the progress of the upload process. The messages
are written to the standard output stream (stdout).
"""
# Notify user of the dataset being crawled and provide link to the dataset
# registry for details and troubleshooting.
dataset_registry_url = REGISTRY_BASE_URL + "/" + gbif_dataset_uuid
print(
"Initiating crawl for EDI dataset '"
+ local_dataset_id
+ "' / GBIF dataset '"
+ gbif_dataset_uuid
+ "'. See GBIF Registry "
+ "for details:\n"
+ dataset_registry_url
)
print(f"Uploading {local_dataset_id} to GBIF.")

# Read the registrations file to obtain relevant information for the upload
# process.
registrations = _utilities._read_registrations_file(registrations_file)

# Stop if not registered
if local_dataset_id not in registrations["local_dataset_id"].values:
print(
"The local dataset ID is not in the registrations file. "
"Registration is required first."
)
return None

# Obtain relevant information for the upload process from the registrations
# file.
index = registrations.index[
registrations["local_dataset_id"] == local_dataset_id
].tolist()[0]
local_dataset_endpoint = registrations.loc[index, "local_dataset_endpoint"]
gbif_dataset_uuid = registrations.loc[index, "gbif_dataset_uuid"]
synchronized = registrations.loc[index, "synchronized"]

# Check if the local_dataset_id is already synchronized with GBIF and stop
# the upload process if it is.
if synchronized:
print(
f"{local_dataset_id} is already synchronized with GBIF. Skipping"
f" the upload process."
)
return None

# There is a latency in the initialization of a data package group on GBIF
# that can cause the _is_synchronized function to fail with string parsing
# errors. This case is unlikely to occur in contexts other than
# upload_dataset, so it is handled here.
try:
synchronized = _utilities._is_synchronized(local_dataset_id, registrations_file)
except AttributeError:
synchronized = False
if synchronized:
# Handle the case of a successful upload but timed out synchronization
# check, which would result in the status being False in the
# registrations file.
index = registrations.index[
registrations["local_dataset_id"] == local_dataset_id
].tolist()[0]
if not registrations.loc[index, "synchronized"]:
registrations.loc[index, "synchronized"] = True
registrations.to_csv(registrations_file, index=False, mode="w")
print(
f"Updated the registrations file with the missing "
f"synchronization status of {local_dataset_id}."
)
return None

# Clear the list of local endpoints so when the endpoint is added below,
# it will result in only one being listed on the GBIF dataset landing page.
# Multiple listings could be confusing to end users.
_delete_local_dataset_endpoints(gbif_dataset_uuid)
_utilities._delete_local_dataset_endpoints(gbif_dataset_uuid)
print("Deleted local dataset endpoints from GBIF.")

# Post the local dataset endpoint to GBIF. This will initiate a crawl of
# the local dataset landing page metadata on the first post but not on
# subsequent posts (updates).
_post_local_dataset_endpoint(local_dataset_endpoint, gbif_dataset_uuid)

# Post a new metadata document to update the GBIF landing page. This is
# necessary because GBIF doesn't "re-crawl" the local dataset metadata when
# the new local dataset endpoint is updated.
if _has_metadata(gbif_dataset_uuid):
_post_new_metadata_document(local_dataset_id, gbif_dataset_uuid)
# subsequent posts (updates). In the latter case, the local dataset
# landing page metadata will also need to be posted to update the GBIF
# landing page (below).
_utilities._post_local_dataset_endpoint(local_dataset_endpoint, gbif_dataset_uuid)
print(f"Posted local dataset endpoint {local_dataset_endpoint} to GBIF.")

# For revised datasets, post a new metadata document to update the GBIF
# landing page. This is necessary because GBIF doesn't "re-crawl" the
# local dataset metadata when the new local dataset endpoint is updated.
_utilities._post_new_metadata_document(local_dataset_id, gbif_dataset_uuid)
print(f"Posted new metadata document for {local_dataset_id} to GBIF.")

# Run the is_synchronized function until a True value is returned or the
# max number of attempts is reached.
synchronized = False
max_attempts = 12 # Average synchronization time is 20 seconds
attempts = 0
while not synchronized and attempts < max_attempts:
print(f"Checking if {local_dataset_id} is synchronized with GBIF.")
synchronized = _utilities._is_synchronized(local_dataset_id, registrations_file)
attempts += 1
sleep(5)

# Update the registrations file with the new status
if synchronized:
print(f"{local_dataset_id} is synchronized with GBIF.")
registrations = _utilities._read_registrations_file(registrations_file)
registrations.loc[index, "synchronized"] = True
registrations.to_csv(registrations_file, index=False, mode="w")
print(
f"Updated the registrations file with the new synchronization "
f"status of {local_dataset_id}."
)
print(f"Upload of {local_dataset_id} to GBIF is complete.")
print(
"View the dataset on GBIF at:",
GBIF_DATASET_BASE_URL + "/" + gbif_dataset_uuid,
)
else:
print(
f"Checks on the synchronization status of {local_dataset_id} "
f"with GBIF timed out. Please check the GBIF log page later."
f"Once synchronization has occured, run "
f"complete_registration_records function to reflect this "
f"update."
)
print(
f"For more information, see the GBIF log page for " f"{local_dataset_id}:",
REGISTRY_BASE_URL + "/" + gbif_dataset_uuid,
)
return None
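
The workflow above reads from and writes back to a registrations file. A minimal sketch of what such a .csv might contain, assuming only the columns referenced in the code above (the real file may carry additional columns; the endpoint URL and UUID are placeholders):

local_dataset_id,local_dataset_endpoint,gbif_dataset_uuid,synchronized
edi.929.2,https://pasta-s.lternet.edu/package/download/eml/edi/929/2,00000000-0000-0000-0000-000000000000,False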
26 changes: 26 additions & 0 deletions tests/conftest.py
@@ -67,6 +67,32 @@ def local_dataset_id_fixture():
return "edi.929.2"


@pytest.fixture(name="mock_update_dataset_success")
def mock_update_dataset_success_fixture(mocker, gbif_dataset_uuid):
"""Create a mock_update_dataset_success fixture for tests that use a
similar pattern of calls to the GBIF API."""
mocker.patch(
"gbif_registrar.register._get_gbif_dataset_uuid", return_value=gbif_dataset_uuid
)
mocker.patch(
"gbif_registrar._utilities._delete_local_dataset_endpoints", return_value=None
)
mocker.patch(
"gbif_registrar._utilities._post_local_dataset_endpoint", return_value=None
)
mocker.patch(
"gbif_registrar._utilities._post_new_metadata_document", return_value=None
)
# The alternating side effects (below) are required to pass the first
# synchronization check and continue on to the second synchronization
# check. We list this pattern twice because update_dataset() is called
# twice in the test.
mocker.patch(
"gbif_registrar._utilities._is_synchronized",
side_effect=[False, True, False, True],
)
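
A hedged sketch of how a test might exercise this fixture (the test name, the copied file path, and the assertion are illustrative assumptions, not code from this commit):

import shutil

from gbif_registrar._utilities import _read_registrations_file
from gbif_registrar.crawl import upload_dataset


def test_upload_dataset_with_mocks(
    mock_update_dataset_success, local_dataset_id, tmp_path
):
    """Run the upload workflow against mocked GBIF calls (sketch only)."""
    # Work on a throwaway copy of a registrations file; the source path is a
    # hypothetical placeholder.
    registrations_file = tmp_path / "registrations.csv"
    shutil.copy("tests/registrations.csv", registrations_file)
    upload_dataset(local_dataset_id, str(registrations_file))
    # The dataset's row should now be flagged as synchronized.
    registrations = _read_registrations_file(registrations_file)
    row = registrations["local_dataset_id"] == local_dataset_id
    assert registrations.loc[row, "synchronized"].all()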


@pytest.fixture(name="rgstrs")
def rgstrs_fixture():
"""Read the test registrations file into DataFrame fixture."""