From aee5cc375621af2e2a7e6ca3dfd8fd3e05688440 Mon Sep 17 00:00:00 2001
From: Byron Himes
Date: Tue, 19 Nov 2024 11:28:08 +0100
Subject: [PATCH] IFRS: Do idempotence in core (GSI-1091) (#61)

* IFRS: Remove idempotence handler layer

* IFRS: Update tests (remove outbox tests)

* IFRS: Bump version number from 2.0.3 -> 2.1.0

* IFRS: log FileNotInRegistryError instead of raise

* IFRS: Revert to outbox subscriber

* IFRS: Update the delete method doc strings

* IFRS: Log delete-type events as errors, not warnings
---
 .../ifrs/.readme_generation/description.md    |   2 +-
 services/ifrs/README.md                       |   8 +-
 services/ifrs/pyproject.toml                  |   2 +-
 .../src/ifrs/adapters/inbound/event_sub.py    |  63 ++--
 .../src/ifrs/adapters/inbound/idempotent.py   | 167 -----------
 .../ifrs/src/ifrs/adapters/inbound/models.py  |  45 ---
 .../ifrs/src/ifrs/adapters/inbound/utils.py   |  61 ----
 .../ifrs/src/ifrs/adapters/outbound/dao.py    |  41 +--
 services/ifrs/src/ifrs/core/file_registry.py  |   6 +-
 services/ifrs/src/ifrs/inject.py              |  25 +-
 .../src/ifrs/ports/inbound/file_registry.py   |   2 -
 .../ifrs/src/ifrs/ports/inbound/idempotent.py |  69 -----
 services/ifrs/src/ifrs/ports/outbound/dao.py  |   6 -
 services/ifrs/tests_ifrs/fixtures/joint.py    |  19 +-
 .../ifrs/tests_ifrs/test_ifrs_edge_cases.py   | 103 ++++++-
 .../tests_ifrs/test_ifrs_typical_journey.py   |   2 +-
 .../ifrs/tests_ifrs/test_outbox_subscriber.py | 277 ------------------
 17 files changed, 155 insertions(+), 743 deletions(-)
 delete mode 100644 services/ifrs/src/ifrs/adapters/inbound/idempotent.py
 delete mode 100644 services/ifrs/src/ifrs/adapters/inbound/models.py
 delete mode 100644 services/ifrs/src/ifrs/adapters/inbound/utils.py
 delete mode 100644 services/ifrs/src/ifrs/ports/inbound/idempotent.py
 delete mode 100644 services/ifrs/tests_ifrs/test_outbox_subscriber.py

diff --git a/services/ifrs/.readme_generation/description.md b/services/ifrs/.readme_generation/description.md
index 77d93ffc..1a8e1d62 100644
--- a/services/ifrs/.readme_generation/description.md
+++ b/services/ifrs/.readme_generation/description.md
@@ -2,7 +2,7 @@ This service provides functionality to administer files stored in an S3-compatib
 object storage. All file-related metadata is stored in an internal mongodb
 database, owned and controlled by this service.
 
-It exposes no REST API enpoints and communicates with other services via events.
+It exposes no REST API endpoints and communicates with other services via events.
 
 ### Events consumed:
 
diff --git a/services/ifrs/README.md b/services/ifrs/README.md
index 27270b6a..6a284bc5 100644
--- a/services/ifrs/README.md
+++ b/services/ifrs/README.md
@@ -8,7 +8,7 @@ This service provides functionality to administer files stored in an S3-compatib
 object storage. All file-related metadata is stored in an internal mongodb
 database, owned and controlled by this service.
 
-It exposes no REST API enpoints and communicates with other services via events.
+It exposes no REST API endpoints and communicates with other services via events.
 
 ### Events consumed:
 
@@ -36,13 +36,13 @@ We recommend using the provided Docker container.
 A pre-build version is available at [docker hub](https://hub.docker.com/repository/docker/ghga/internal-file-registry-service):
 
 ```bash
-docker pull ghga/internal-file-registry-service:2.0.4
+docker pull ghga/internal-file-registry-service:2.1.0
 ```
 
 Or you can build the container yourself from the [`./Dockerfile`](./Dockerfile):
 
 ```bash
 # Execute in the repo's root dir:
-docker build -t ghga/internal-file-registry-service:2.0.4 .
+docker build -t ghga/internal-file-registry-service:2.1.0 .
 ```
 
 For production-ready deployment, we recommend using Kubernetes, however,
@@ -50,7 +50,7 @@ for simple use cases, you could execute the service using docker
 on a single server:
 
 ```bash
 # The entrypoint is preconfigured:
-docker run -p 8080:8080 ghga/internal-file-registry-service:2.0.4 --help
+docker run -p 8080:8080 ghga/internal-file-registry-service:2.1.0 --help
 ```
 
 If you prefer not to use containers, you may install the service from source:
diff --git a/services/ifrs/pyproject.toml b/services/ifrs/pyproject.toml
index 0b0b9708..4d8b8494 100644
--- a/services/ifrs/pyproject.toml
+++ b/services/ifrs/pyproject.toml
@@ -6,7 +6,7 @@ build-backend = "setuptools.build_meta"
 
 [project]
 name = "ifrs"
-version = "2.0.4"
+version = "2.1.0"
 description = "Internal File Registry Service - This service acts as a registry for the internal location and representation of files."
 readme = "README.md"
 authors = [
diff --git a/services/ifrs/src/ifrs/adapters/inbound/event_sub.py b/services/ifrs/src/ifrs/adapters/inbound/event_sub.py
index ab69c9e5..5de303c3 100644
--- a/services/ifrs/src/ifrs/adapters/inbound/event_sub.py
+++ b/services/ifrs/src/ifrs/adapters/inbound/event_sub.py
@@ -22,7 +22,8 @@
 from pydantic import Field
 from pydantic_settings import BaseSettings
 
-from ifrs.ports.inbound.idempotent import IdempotenceHandlerPort
+from ifrs.core.models import FileMetadataBase
+from ifrs.ports.inbound.file_registry import FileRegistryPort
 
 log = logging.getLogger(__name__)
 
@@ -48,7 +49,7 @@ class OutboxSubTranslatorConfig(BaseSettings):
     )
 
 
-class NonstagedFileRequestedListener(
+class NonstagedFileRequestedTranslator(
     DaoSubscriberProtocol[event_schemas.NonStagedFileRequested]
 ):
     """A class that consumes NonStagedFileRequested events."""
@@ -60,28 +61,31 @@ def __init__(
         self,
         *,
         config: OutboxSubTranslatorConfig,
-        idempotence_handler: IdempotenceHandlerPort,
+        file_registry: FileRegistryPort,
     ):
-        self._idempotence_handler = idempotence_handler
+        self._file_registry = file_registry
         self.event_topic = config.files_to_stage_topic
 
     async def changed(
         self, resource_id: str, update: event_schemas.NonStagedFileRequested
     ) -> None:
         """Consume change event (created or updated) for download request data."""
-        await self._idempotence_handler.upsert_nonstaged_file_requested(
-            resource_id=resource_id, update=update
+        await self._file_registry.stage_registered_file(
+            file_id=resource_id,
+            decrypted_sha256=update.decrypted_sha256,
+            outbox_object_id=update.target_object_id,
+            outbox_bucket_id=update.target_bucket_id,
         )
 
     async def deleted(self, resource_id: str) -> None:
-        """Consume event indicating the deletion of a NonStagedFileRequested event."""
-        log.warning(
+        """This should never be called because these events are stateless and not saved."""
+        log.error(
             "Received DELETED-type event for NonStagedFileRequested with resource ID '%s'",
             resource_id,
         )
 
 
-class FileDeletionRequestedListener(
+class FileDeletionRequestedTranslator(
     DaoSubscriberProtocol[event_schemas.FileDeletionRequested]
 ):
     """A class that consumes FileDeletionRequested events."""
@@ -93,28 +97,26 @@ def __init__(
         self,
         *,
         config: OutboxSubTranslatorConfig,
-        idempotence_handler: IdempotenceHandlerPort,
+        file_registry: FileRegistryPort,
     ):
-        self._idempotence_handler = idempotence_handler
+        self._file_registry = file_registry
         self.event_topic = config.files_to_delete_topic
 
     async def changed(
         self, resource_id: str, update: event_schemas.FileDeletionRequested
     ) -> None:
         """Consume change event (created or updated) for File Deletion Requests."""
-        await self._idempotence_handler.upsert_file_deletion_requested(
-            resource_id=resource_id, update=update
-        )
+        await self._file_registry.delete_file(file_id=resource_id)
 
     async def deleted(self, resource_id: str) -> None:
-        """Consume event indicating the deletion of a File Deletion Request."""
-        log.warning(
+        """This should never be called because these events are stateless and not saved."""
+        log.error(
             "Received DELETED-type event for FileDeletionRequested with resource ID '%s'",
             resource_id,
         )
 
 
-class FileValidationSuccessListener(
+class FileValidationSuccessTranslator(
     DaoSubscriberProtocol[event_schemas.FileUploadValidationSuccess]
 ):
     """A class that consumes FileUploadValidationSuccess events."""
@@ -126,22 +128,37 @@ def __init__(
         self,
         *,
         config: OutboxSubTranslatorConfig,
-        idempotence_handler: IdempotenceHandlerPort,
+        file_registry: FileRegistryPort,
     ):
-        self._idempotence_handler = idempotence_handler
+        self._file_registry = file_registry
         self.event_topic = config.files_to_register_topic
 
     async def changed(
         self, resource_id: str, update: event_schemas.FileUploadValidationSuccess
     ) -> None:
         """Consume change event (created or updated) for FileUploadValidationSuccess events."""
-        await self._idempotence_handler.upsert_file_upload_validation_success(
-            resource_id=resource_id, update=update
+        file_without_object_id = FileMetadataBase(
+            file_id=resource_id,
+            decrypted_sha256=update.decrypted_sha256,
+            decrypted_size=update.decrypted_size,
+            upload_date=update.upload_date,
+            decryption_secret_id=update.decryption_secret_id,
+            encrypted_part_size=update.encrypted_part_size,
+            encrypted_parts_md5=update.encrypted_parts_md5,
+            encrypted_parts_sha256=update.encrypted_parts_sha256,
+            content_offset=update.content_offset,
+            storage_alias=update.s3_endpoint_alias,
+        )
+
+        await self._file_registry.register_file(
+            file_without_object_id=file_without_object_id,
+            staging_object_id=update.object_id,
+            staging_bucket_id=update.bucket_id,
         )
 
     async def deleted(self, resource_id: str) -> None:
-        """Consume event indicating the deletion of a FileUploadValidationSuccess events."""
-        log.warning(
+        """This should never be called because these events are stateless and not saved."""
+        log.error(
             "Received DELETED-type event for FileUploadValidationSuccess with resource ID '%s'",
             resource_id,
         )
diff --git a/services/ifrs/src/ifrs/adapters/inbound/idempotent.py b/services/ifrs/src/ifrs/adapters/inbound/idempotent.py
deleted file mode 100644
index d08be622..00000000
--- a/services/ifrs/src/ifrs/adapters/inbound/idempotent.py
+++ /dev/null
@@ -1,167 +0,0 @@
-# Copyright 2021 - 2024 Universität Tübingen, DKFZ, EMBL, and Universität zu Köln
-# for the German Human Genome-Phenome Archive (GHGA)
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-"""Contains a class to serve outbox event data to the core in an idempotent manner."""
-
-from ghga_event_schemas import pydantic_ as event_schemas
-from hexkit.providers.mongodb import MongoDbDaoFactory
-
-from ifrs.adapters.inbound import models
-from ifrs.adapters.inbound.utils import check_record_is_new, make_record_from_update
-from ifrs.adapters.outbound.dao import (
-    get_file_deletion_requested_dao,
-    get_file_upload_validation_success_dao,
-    get_nonstaged_file_requested_dao,
-)
-from ifrs.config import Config
-from ifrs.core.models import FileMetadataBase
-from ifrs.ports.inbound.file_registry import FileRegistryPort
-from ifrs.ports.inbound.idempotent import IdempotenceHandlerPort
-from ifrs.ports.outbound.dao import (
-    FileDeletionRequestedDaoPort,
-    FileUploadValidationSuccessDaoPort,
-    NonStagedFileRequestedDaoPort,
-)
-
-
-async def get_idempotence_handler(
-    *,
-    config: Config,
-    file_registry: FileRegistryPort,
-) -> IdempotenceHandlerPort:
-    """Get an instance of the IdempotenceHandler."""
-    dao_factory = MongoDbDaoFactory(config=config)
-    file_deletion_requested_dao = await get_file_deletion_requested_dao(
-        dao_factory=dao_factory
-    )
-    file_upload_validation_success_dao = await get_file_upload_validation_success_dao(
-        dao_factory=dao_factory
-    )
-    nonstaged_file_requested_dao = await get_nonstaged_file_requested_dao(
-        dao_factory=dao_factory
-    )
-
-    return IdempotenceHandler(
-        file_registry=file_registry,
-        file_deletion_requested_dao=file_deletion_requested_dao,
-        file_upload_validation_success_dao=file_upload_validation_success_dao,
-        nonstaged_file_requested_dao=nonstaged_file_requested_dao,
-    )
-
-
-class IdempotenceHandler(IdempotenceHandlerPort):
-    """Class to serve outbox event data to the core in an idempotent manner."""
-
-    def __init__(
-        self,
-        *,
-        file_registry: FileRegistryPort,
-        nonstaged_file_requested_dao: NonStagedFileRequestedDaoPort,
-        file_upload_validation_success_dao: FileUploadValidationSuccessDaoPort,
-        file_deletion_requested_dao: FileDeletionRequestedDaoPort,
-    ):
-        self._file_registry = file_registry
-        self._nonstaged_file_requested_dao = nonstaged_file_requested_dao
-        self._file_upload_validation_success_dao = file_upload_validation_success_dao
-        self._file_deletion_requested_dao = file_deletion_requested_dao
-
-    async def upsert_nonstaged_file_requested(
-        self, *, resource_id: str, update: event_schemas.NonStagedFileRequested
-    ) -> None:
-        """Upsert a NonStagedFileRequested event. Call `stage_registered_file` if the
-        idempotence check is passed.
-
-        Args:
-            resource_id:
-                The resource ID.
-            update:
-                The NonStagedFileRequested event to upsert.
-        """
-        record = make_record_from_update(models.NonStagedFileRequestedRecord, update)
-        if await check_record_is_new(
-            dao=self._nonstaged_file_requested_dao,
-            resource_id=resource_id,
-            update=update,
-            record=record,
-        ):
-            await self._file_registry.stage_registered_file(
-                file_id=resource_id,
-                decrypted_sha256=update.decrypted_sha256,
-                outbox_object_id=update.target_object_id,
-                outbox_bucket_id=update.target_bucket_id,
-            )
-            await self._nonstaged_file_requested_dao.insert(record)
-
-    async def upsert_file_deletion_requested(
-        self, *, resource_id: str, update: event_schemas.FileDeletionRequested
-    ) -> None:
-        """Upsert a FileDeletionRequested event. Call `delete_file` if the idempotence
-        check is passed.
-
-        Args:
-            resource_id:
-                The resource ID.
-            update:
-                The FileDeletionRequested event to upsert.
-        """
-        record = make_record_from_update(models.FileDeletionRequestedRecord, update)
-        if await check_record_is_new(
-            dao=self._file_deletion_requested_dao,
-            resource_id=resource_id,
-            update=update,
-            record=record,
-        ):
-            await self._file_registry.delete_file(file_id=resource_id)
-            await self._file_deletion_requested_dao.insert(record)
-
-    async def upsert_file_upload_validation_success(
-        self, *, resource_id: str, update: event_schemas.FileUploadValidationSuccess
-    ) -> None:
-        """Upsert a FileUploadValidationSuccess event. Call `register_file` if the
-        idempotence check is passed.
-
-        Args:
-            resource_id:
-                The resource ID.
-            update:
-                The FileUploadValidationSuccess event to upsert.
-        """
-        record = make_record_from_update(
-            models.FileUploadValidationSuccessRecord, update
-        )
-        if await check_record_is_new(
-            dao=self._file_upload_validation_success_dao,
-            resource_id=resource_id,
-            update=update,
-            record=record,
-        ):
-            file_without_object_id = FileMetadataBase(
-                file_id=update.file_id,
-                decrypted_sha256=update.decrypted_sha256,
-                decrypted_size=update.decrypted_size,
-                upload_date=update.upload_date,
-                decryption_secret_id=update.decryption_secret_id,
-                encrypted_part_size=update.encrypted_part_size,
-                encrypted_parts_md5=update.encrypted_parts_md5,
-                encrypted_parts_sha256=update.encrypted_parts_sha256,
-                content_offset=update.content_offset,
-                storage_alias=update.s3_endpoint_alias,
-            )
-
-            await self._file_registry.register_file(
-                file_without_object_id=file_without_object_id,
-                staging_object_id=update.object_id,
-                staging_bucket_id=update.bucket_id,
-            )
-            await self._file_upload_validation_success_dao.insert(record)
diff --git a/services/ifrs/src/ifrs/adapters/inbound/models.py b/services/ifrs/src/ifrs/adapters/inbound/models.py
deleted file mode 100644
index fef5a0f5..00000000
--- a/services/ifrs/src/ifrs/adapters/inbound/models.py
+++ /dev/null
@@ -1,45 +0,0 @@
-# Copyright 2021 - 2024 Universität Tübingen, DKFZ, EMBL, and Universität zu Köln
-# for the German Human Genome-Phenome Archive (GHGA)
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-"""Models for inbound adapter idempotence functionality"""
-
-from ghga_event_schemas import pydantic_ as event_schemas
-from pydantic import BaseModel, Field
-
-
-class IdempotenceRecord(BaseModel):
-    """A record of an event for idempotence purposes."""
-
-    correlation_id: str = Field(
-        default=...,
-        description="The correlation ID associated with the request event.",
-    )
-
-
-class NonStagedFileRequestedRecord(
-    IdempotenceRecord, event_schemas.NonStagedFileRequested
-):
-    """A record of a NonStagedFileRequested event for idempotence purposes."""
-
-
-class FileDeletionRequestedRecord(
-    IdempotenceRecord, event_schemas.FileDeletionRequested
-):
-    """A record of a FileDeletionRequested event for idempotence purposes."""
-
-
-class FileUploadValidationSuccessRecord(
-    IdempotenceRecord, event_schemas.FileUploadValidationSuccess
-):
-    """A record of a FileUploadValidationSuccess event for idempotence purposes."""
diff --git a/services/ifrs/src/ifrs/adapters/inbound/utils.py b/services/ifrs/src/ifrs/adapters/inbound/utils.py
deleted file mode 100644
index e76647a4..00000000
--- a/services/ifrs/src/ifrs/adapters/inbound/utils.py
+++ /dev/null
@@ -1,61 +0,0 @@
-# Copyright 2021 - 2024 Universität Tübingen, DKFZ, EMBL, and Universität zu Köln
-# for the German Human Genome-Phenome Archive (GHGA)
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-"""Logic that isn't directly owned by the file registry but is used by it."""
-
-import logging
-from contextlib import suppress
-from typing import TypeVar
-
-from hexkit.correlation import get_correlation_id
-from hexkit.protocols.dao import DaoNaturalId, NoHitsFoundError
-from pydantic import BaseModel
-
-from ifrs.adapters.inbound import models
-
-log = logging.getLogger(__name__)
-
-RecordType = TypeVar("RecordType", bound=models.IdempotenceRecord)
-
-
-def make_record_from_update(
-    record_type: type[RecordType], update: BaseModel
-) -> RecordType:
-    """Get an IdempotenceRecord containing the update payload and correlation ID."""
-    correlation_id = get_correlation_id()
-    return record_type(correlation_id=correlation_id, **update.model_dump())
-
-
-async def check_record_is_new(
-    dao: DaoNaturalId,
-    resource_id: str,
-    update: BaseModel,
-    record: models.IdempotenceRecord,
-):
-    """Returns whether or not the record is new and emits a debug log if it is not."""
-    with suppress(NoHitsFoundError):
-        matching_record = await dao.find_one(mapping=record.model_dump())
-
-        if matching_record:
-            log.debug(
-                (
-                    "Event with '%s' schema for resource ID '%s' has"
-                    + " already been processed under current correlation_id. Skipping."
- ), - type(update).__name__, - resource_id, - ) - return False - return True diff --git a/services/ifrs/src/ifrs/adapters/outbound/dao.py b/services/ifrs/src/ifrs/adapters/outbound/dao.py index 3b17afd4..ff730f2d 100644 --- a/services/ifrs/src/ifrs/adapters/outbound/dao.py +++ b/services/ifrs/src/ifrs/adapters/outbound/dao.py @@ -17,14 +17,8 @@ from hexkit.protocols.dao import DaoFactoryProtocol -from ifrs.adapters.inbound import models from ifrs.core.models import FileMetadata -from ifrs.ports.outbound.dao import ( - FileDeletionRequestedDaoPort, - FileMetadataDaoPort, - FileUploadValidationSuccessDaoPort, - NonStagedFileRequestedDaoPort, -) +from ifrs.ports.outbound.dao import FileMetadataDaoPort async def get_file_metadata_dao( @@ -36,36 +30,3 @@ async def get_file_metadata_dao( dto_model=FileMetadata, id_field="file_id", ) - - -async def get_nonstaged_file_requested_dao( - *, dao_factory: DaoFactoryProtocol -) -> NonStagedFileRequestedDaoPort: - """Setup the DAOs using the specified provider of the DaoFactoryProtocol.""" - return await dao_factory.get_dao( - name="nonstaged_file_requested", - dto_model=models.NonStagedFileRequestedRecord, - id_field="file_id", - ) - - -async def get_file_upload_validation_success_dao( - *, dao_factory: DaoFactoryProtocol -) -> FileUploadValidationSuccessDaoPort: - """Setup the DAOs using the specified provider of the DaoFactoryProtocol.""" - return await dao_factory.get_dao( - name="file_upload_validation_success", - dto_model=models.FileUploadValidationSuccessRecord, - id_field="file_id", - ) - - -async def get_file_deletion_requested_dao( - *, dao_factory: DaoFactoryProtocol -) -> FileDeletionRequestedDaoPort: - """Setup the DAOs using the specified provider of the DaoFactoryProtocol.""" - return await dao_factory.get_dao( - name="file_deletion_requested", - dto_model=models.FileDeletionRequestedRecord, - id_field="file_id", - ) diff --git a/services/ifrs/src/ifrs/core/file_registry.py b/services/ifrs/src/ifrs/core/file_registry.py index 40f4a50d..dc6ff84a 100644 --- a/services/ifrs/src/ifrs/core/file_registry.py +++ b/services/ifrs/src/ifrs/core/file_registry.py @@ -186,8 +186,6 @@ async def stage_registered_file( The S3 bucket ID for the outbox. Raises: - self.FileNotInRegistryError: - When a file is requested that has not (yet) been registered. self.ChecksumMismatchError: When the provided checksum did not match the expectations. 
             self.FileInRegistryButNotInStorageError:
@@ -197,10 +195,10 @@
         """
         try:
             file = await self._file_metadata_dao.get_by_id(file_id)
-        except ResourceNotFoundError as error:
+        except ResourceNotFoundError:
             file_not_in_registry_error = self.FileNotInRegistryError(file_id=file_id)
             log.error(file_not_in_registry_error, extra={"file_id": file_id})
-            raise file_not_in_registry_error from error
+            return
 
         if decrypted_sha256 != file.decrypted_sha256:
             checksum_error = self.ChecksumMismatchError(
diff --git a/services/ifrs/src/ifrs/inject.py b/services/ifrs/src/ifrs/inject.py
index 7eb58e9e..0b83798c 100644
--- a/services/ifrs/src/ifrs/inject.py
+++ b/services/ifrs/src/ifrs/inject.py
@@ -24,17 +24,15 @@
 from hexkit.providers.mongodb import MongoDbDaoFactory
 
 from ifrs.adapters.inbound.event_sub import (
-    FileDeletionRequestedListener,
-    FileValidationSuccessListener,
-    NonstagedFileRequestedListener,
+    FileDeletionRequestedTranslator,
+    FileValidationSuccessTranslator,
+    NonstagedFileRequestedTranslator,
 )
-from ifrs.adapters.inbound.idempotent import get_idempotence_handler
 from ifrs.adapters.outbound import dao
 from ifrs.adapters.outbound.event_pub import EventPubTranslator
 from ifrs.config import Config
 from ifrs.core.file_registry import FileRegistry
 from ifrs.ports.inbound.file_registry import FileRegistryPort
-from ifrs.ports.inbound.idempotent import IdempotenceHandlerPort
 
 
 @asynccontextmanager
@@ -75,7 +73,6 @@ async def prepare_outbox_subscriber(
     *,
     config: Config,
     core_override: FileRegistryPort | None = None,
-    idempotence_handler_override: IdempotenceHandlerPort | None = None,
 ) -> AsyncGenerator[KafkaOutboxSubscriber, None]:
     """Construct and initialize an event subscriber with all its dependencies.
     By default, the core dependencies are automatically prepared but you can also
@@ -84,21 +81,15 @@ async def prepare_outbox_subscriber(
     async with prepare_core_with_override(
         config=config, core_override=core_override
     ) as file_registry:
-        idempotence_handler = idempotence_handler_override
-        if not idempotence_handler:
-            idempotence_handler = await get_idempotence_handler(
-                config=config,
-                file_registry=file_registry,
-            )
-
         outbox_translators = [
-            cls(config=config, idempotence_handler=idempotence_handler)
+            cls(config=config, file_registry=file_registry)
             for cls in (
-                FileDeletionRequestedListener,
-                FileValidationSuccessListener,
-                NonstagedFileRequestedListener,
+                FileDeletionRequestedTranslator,
+                FileValidationSuccessTranslator,
+                NonstagedFileRequestedTranslator,
             )
         ]
+
         async with KafkaOutboxSubscriber.construct(
             config=config, translators=outbox_translators
         ) as kafka_outbox_subscriber:
diff --git a/services/ifrs/src/ifrs/ports/inbound/file_registry.py b/services/ifrs/src/ifrs/ports/inbound/file_registry.py
index 0495d364..2d324f8b 100644
--- a/services/ifrs/src/ifrs/ports/inbound/file_registry.py
+++ b/services/ifrs/src/ifrs/ports/inbound/file_registry.py
@@ -141,8 +141,6 @@ async def stage_registered_file(
             The S3 bucket ID for the outbox.
 
         Raises:
-            self.FileNotInRegistryError:
-                When a file is requested that has not (yet) been registered.
             self.ChecksumMismatchError:
                 When the provided checksum did not match the expectations.
             self.FileInRegistryButNotInStorageError:
diff --git a/services/ifrs/src/ifrs/ports/inbound/idempotent.py b/services/ifrs/src/ifrs/ports/inbound/idempotent.py
deleted file mode 100644
index 32024185..00000000
--- a/services/ifrs/src/ifrs/ports/inbound/idempotent.py
+++ /dev/null
@@ -1,69 +0,0 @@
-# Copyright 2021 - 2024 Universität Tübingen, DKFZ, EMBL, and Universität zu Köln
-# for the German Human Genome-Phenome Archive (GHGA)
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-"""Describes generic functionality of a class that handles outbox event idempotence."""
-
-from abc import ABC, abstractmethod
-
-from ghga_event_schemas import pydantic_ as event_schemas
-
-
-class IdempotenceHandlerPort(ABC):
-    """Class to serve outbox event data to the core in an idempotent manner."""
-
-    @abstractmethod
-    async def upsert_nonstaged_file_requested(
-        self, *, resource_id: str, update: event_schemas.NonStagedFileRequested
-    ) -> None:
-        """Upsert a NonStagedFileRequested event. Call `stage_registered_file` if the
-        idempotence check is passed.
-
-        Args:
-            resource_id:
-                The resource ID.
-            update:
-                The NonStagedFileRequested event to upsert.
-        """
-        ...
-
-    @abstractmethod
-    async def upsert_file_deletion_requested(
-        self, *, resource_id: str, update: event_schemas.FileDeletionRequested
-    ) -> None:
-        """Upsert a FileDeletionRequested event. Call `delete_file` if the idempotence
-        check is passed.
-
-        Args:
-            resource_id:
-                The resource ID.
-            update:
-                The FileDeletionRequested event to upsert.
-        """
-        ...
-
-    @abstractmethod
-    async def upsert_file_upload_validation_success(
-        self, *, resource_id: str, update: event_schemas.FileUploadValidationSuccess
-    ) -> None:
-        """Upsert a FileUploadValidationSuccess event. Call `register_file` if the
-        idempotence check is passed.
-
-        Args:
-            resource_id:
-                The resource ID.
-            update:
-                The FileUploadValidationSuccess event to upsert.
-        """
-        ...
diff --git a/services/ifrs/src/ifrs/ports/outbound/dao.py b/services/ifrs/src/ifrs/ports/outbound/dao.py
index 23972656..aad395be 100644
--- a/services/ifrs/src/ifrs/ports/outbound/dao.py
+++ b/services/ifrs/src/ifrs/ports/outbound/dao.py
@@ -17,13 +17,7 @@
 
 from hexkit.protocols.dao import DaoNaturalId, ResourceNotFoundError  # noqa: F401
 
-from ifrs.adapters.inbound import models
 from ifrs.core.models import FileMetadata
 
 # port described by a type alias:
 FileMetadataDaoPort = DaoNaturalId[FileMetadata]
-NonStagedFileRequestedDaoPort = DaoNaturalId[models.NonStagedFileRequestedRecord]
-FileUploadValidationSuccessDaoPort = DaoNaturalId[
-    models.FileUploadValidationSuccessRecord
-]
-FileDeletionRequestedDaoPort = DaoNaturalId[models.FileDeletionRequestedRecord]
diff --git a/services/ifrs/tests_ifrs/fixtures/joint.py b/services/ifrs/tests_ifrs/fixtures/joint.py
index beefaab7..7459c4eb 100644
--- a/services/ifrs/tests_ifrs/fixtures/joint.py
+++ b/services/ifrs/tests_ifrs/fixtures/joint.py
@@ -36,12 +36,10 @@
 from hexkit.providers.mongodb.testutils import MongoDbFixture
 from hexkit.providers.s3.testutils import S3Fixture
 
-from ifrs.adapters.inbound.idempotent import get_idempotence_handler
 from ifrs.adapters.outbound.dao import get_file_metadata_dao
 from ifrs.config import Config
 from ifrs.inject import prepare_core, prepare_outbox_subscriber
 from ifrs.ports.inbound.file_registry import FileRegistryPort
-from ifrs.ports.inbound.idempotent import IdempotenceHandlerPort
 from ifrs.ports.outbound.dao import FileMetadataDaoPort
 from tests_ifrs.fixtures.config import get_config
 
@@ -53,7 +51,7 @@
 
 
 @dataclass
-class EndpointAliases:
+class StorageAliases:
     node1: str = STORAGE_ALIASES[0]
     node2: str = STORAGE_ALIASES[1]
    fake: str = f"{STORAGE_ALIASES[0]}_fake"
@@ -68,12 +66,11 @@ class JointFixture:
     s3: S3Fixture
     file_metadata_dao: FileMetadataDaoPort
     file_registry: FileRegistryPort
-    idempotence_handler: IdempotenceHandlerPort
     kafka: KafkaFixture
     outbox_subscriber: KafkaOutboxSubscriber
     outbox_bucket: str
     staging_bucket: str
-    endpoint_aliases: EndpointAliases
+    storage_aliases: StorageAliases
 
 
 @pytest_asyncio.fixture(scope="function")
@@ -87,11 +84,11 @@ async def joint_fixture(
         bucket=PERMANENT_BUCKET, credentials=s3.config
     )
 
-    endpoint_aliases = EndpointAliases()
+    storage_aliases = StorageAliases()
 
     object_storage_config = S3ObjectStoragesConfig(
         object_storages={
-            endpoint_aliases.node1: node_config,
+            storage_aliases.node1: node_config,
         }
     )
 
@@ -102,14 +99,9 @@ async def joint_fixture(
 
     # Prepare the file registry (core)
     async with prepare_core(config=config) as file_registry:
-        idempotence_handler = await get_idempotence_handler(
-            config=config,
-            file_registry=file_registry,
-        )
         async with prepare_outbox_subscriber(
             config=config,
             core_override=file_registry,
-            idempotence_handler_override=idempotence_handler,
         ) as outbox_subscriber:
             yield JointFixture(
                 config=config,
@@ -117,10 +109,9 @@ async def joint_fixture(
                 s3=s3,
                 file_metadata_dao=file_metadata_dao,
                 file_registry=file_registry,
-                idempotence_handler=idempotence_handler,
                 kafka=kafka,
                 outbox_subscriber=outbox_subscriber,
                 outbox_bucket=OUTBOX_BUCKET,
                 staging_bucket=STAGING_BUCKET,
-                endpoint_aliases=endpoint_aliases,
+                storage_aliases=storage_aliases,
             )
diff --git a/services/ifrs/tests_ifrs/test_ifrs_edge_cases.py b/services/ifrs/tests_ifrs/test_ifrs_edge_cases.py
index dc21aa3d..2f7d45ef 100644
--- a/services/ifrs/tests_ifrs/test_ifrs_edge_cases.py
+++ b/services/ifrs/tests_ifrs/test_ifrs_edge_cases.py
@@ -16,8 +16,13 @@
 """Tests edge cases not covered by the typical journey test."""
 
 import logging
+from unittest.mock import AsyncMock
 
 import pytest
+from ghga_event_schemas import pydantic_ as event_schemas
+from ghga_service_commons.utils.utc_dates import now_as_utc
+from hexkit.custom_types import JsonObject
+from hexkit.providers.akafka.provider.daosub import CHANGE_EVENT_TYPE
 from hexkit.providers.s3.testutils import (
     FileObject,
     S3Fixture,  # noqa: F401
@@ -30,6 +35,32 @@
 
 pytestmark = pytest.mark.asyncio()
 
+TEST_FILE_ID = "test_id"
+
+TEST_NONSTAGED_FILE_REQUESTED = event_schemas.NonStagedFileRequested(
+    file_id=TEST_FILE_ID,
+    target_object_id="",
+    target_bucket_id="",
+    s3_endpoint_alias="",
+    decrypted_sha256="",
+)
+
+TEST_FILE_UPLOAD_VALIDATION_SUCCESS = event_schemas.FileUploadValidationSuccess(
+    upload_date=now_as_utc().isoformat(),
+    file_id=TEST_FILE_ID,
+    object_id="",
+    bucket_id="",
+    s3_endpoint_alias="",
+    decrypted_size=0,
+    decryption_secret_id="",
+    content_offset=0,
+    encrypted_part_size=0,
+    encrypted_parts_md5=[],
+    encrypted_parts_sha256=[],
+    decrypted_sha256="",
+)
+
+TEST_FILE_DELETION_REQUESTED = event_schemas.FileDeletionRequested(file_id=TEST_FILE_ID)
 
 
 async def test_register_with_empty_staging(joint_fixture: JointFixture):
     """Test registration of a file when the file content is missing from staging."""
@@ -49,7 +80,7 @@ async def test_reregistration(
     an exception). Test PR/Push workflow message
     """
     storage = joint_fixture.s3
-    storage_alias = joint_fixture.endpoint_aliases.node1
+    storage_alias = joint_fixture.storage_aliases.node1
 
     # place example content in the staging bucket:
     file_object = tmp_file.model_copy(
@@ -102,7 +133,7 @@ async def test_reregistration_with_updated_metadata(
     expected exception.
     """
     storage = joint_fixture.s3
-    storage_alias = joint_fixture.endpoint_aliases.node1
+    storage_alias = joint_fixture.storage_aliases.node1
     # place example content in the staging bucket:
     file_object = tmp_file.model_copy(
         update={
@@ -151,17 +182,24 @@ async def test_reregistration_with_updated_metadata(
     assert expected_message in caplog.messages
 
 
-async def test_stage_non_existing_file(joint_fixture: JointFixture):
-    """Check that requesting to stage a non-registered file fails with the
-    expected exception.
+async def test_stage_non_existing_file(joint_fixture: JointFixture, caplog):
+    """Check that requesting to stage a non-registered file logs an error
+    instead of raising an exception.
""" - with pytest.raises(FileRegistryPort.FileNotInRegistryError): - await joint_fixture.file_registry.stage_registered_file( - file_id="notregisteredfile001", - decrypted_sha256=EXAMPLE_METADATA_BASE.decrypted_sha256, - outbox_object_id=EXAMPLE_METADATA.object_id, - outbox_bucket_id=joint_fixture.outbox_bucket, - ) + file_id = "notregisteredfile001" + error = joint_fixture.file_registry.FileNotInRegistryError(file_id=file_id) + + caplog.clear() + await joint_fixture.file_registry.stage_registered_file( + file_id=file_id, + decrypted_sha256=EXAMPLE_METADATA_BASE.decrypted_sha256, + outbox_object_id=EXAMPLE_METADATA.object_id, + outbox_bucket_id=joint_fixture.outbox_bucket, + ) + assert len(caplog.records) == 1 + record = caplog.records[0] + assert record.message == str(error) + assert record.levelname == "ERROR" async def test_stage_checksum_mismatch( @@ -175,7 +213,7 @@ async def test_stage_checksum_mismatch( await joint_fixture.file_metadata_dao.insert(EXAMPLE_METADATA) storage = joint_fixture.s3 - storage_alias = joint_fixture.endpoint_aliases.node1 + storage_alias = joint_fixture.storage_aliases.node1 bucket_id = joint_fixture.config.object_storages[storage_alias].bucket # place the content for an example file in the permanent storage: @@ -216,3 +254,46 @@ async def test_storage_db_inconsistency(joint_fixture: JointFixture): outbox_object_id=EXAMPLE_METADATA.object_id, outbox_bucket_id=joint_fixture.outbox_bucket, ) + + +@pytest.mark.parametrize( + "upsertion_event, topic_config_name, method_name", + [ + ( + TEST_FILE_DELETION_REQUESTED.model_dump(), + "files_to_delete_topic", + "delete_file", + ), + ( + TEST_FILE_UPLOAD_VALIDATION_SUCCESS.model_dump(), + "files_to_register_topic", + "register_file", + ), + ( + TEST_NONSTAGED_FILE_REQUESTED.model_dump(), + "files_to_stage_topic", + "stage_registered_file", + ), + ], +) +@pytest.mark.asyncio() +async def test_outbox_subscriber_routing( + joint_fixture: JointFixture, + upsertion_event: JsonObject, + topic_config_name: str, + method_name: str, + monkeypatch: pytest.MonkeyPatch, +): + """Make sure the outbox subscriber calls the correct method on the file registry.""" + topic = getattr(joint_fixture.config, topic_config_name) + mock = AsyncMock() + monkeypatch.setattr(joint_fixture.file_registry, method_name, mock) + await joint_fixture.kafka.publish_event( + payload=upsertion_event, + type_=CHANGE_EVENT_TYPE, + topic=topic, + key=TEST_FILE_ID, + ) + + await joint_fixture.outbox_subscriber.run(forever=False) + mock.assert_awaited_once() diff --git a/services/ifrs/tests_ifrs/test_ifrs_typical_journey.py b/services/ifrs/tests_ifrs/test_ifrs_typical_journey.py index 902d6131..bf6355e4 100644 --- a/services/ifrs/tests_ifrs/test_ifrs_typical_journey.py +++ b/services/ifrs/tests_ifrs/test_ifrs_typical_journey.py @@ -37,7 +37,7 @@ async def test_happy_journey( ): """Simulates a typical, successful journey for upload, download, and deletion""" storage = joint_fixture.s3 - storage_alias = joint_fixture.endpoint_aliases.node1 + storage_alias = joint_fixture.storage_aliases.node1 bucket_id = joint_fixture.config.object_storages[storage_alias].bucket # place example content in the staging: diff --git a/services/ifrs/tests_ifrs/test_outbox_subscriber.py b/services/ifrs/tests_ifrs/test_outbox_subscriber.py deleted file mode 100644 index c41db5ef..00000000 --- a/services/ifrs/tests_ifrs/test_outbox_subscriber.py +++ /dev/null @@ -1,277 +0,0 @@ -# Copyright 2021 - 2024 Universität Tübingen, DKFZ, EMBL, and Universität zu Köln -# for the German Human 
Genome-Phenome Archive (GHGA) -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -"""Tests for functionality related to the outbox subscriber.""" - -from collections.abc import Callable -from unittest.mock import AsyncMock - -import pytest -from ghga_event_schemas import pydantic_ as event_schemas -from ghga_service_commons.utils.utc_dates import now_as_utc -from hexkit.correlation import get_correlation_id -from hexkit.custom_types import JsonObject -from hexkit.providers.mongodb import MongoDbDaoFactory -from logot import Logot, logged -from pydantic import BaseModel - -from ifrs.adapters.inbound import models -from ifrs.adapters.inbound.utils import check_record_is_new, make_record_from_update -from ifrs.adapters.outbound.dao import ( - get_file_deletion_requested_dao, -) -from tests_ifrs.fixtures.joint import JointFixture - -CHANGE_EVENT_TYPE = "upserted" -DELETE_EVENT_TYPE = "deleted" - - -TEST_FILE_ID = "test_id" - -TEST_NONSTAGED_FILE_REQUESTED = event_schemas.NonStagedFileRequested( - file_id=TEST_FILE_ID, - target_object_id="", - target_bucket_id="", - s3_endpoint_alias="", - decrypted_sha256="", -) - -TEST_FILE_UPLOAD_VALIDATION_SUCCESS = event_schemas.FileUploadValidationSuccess( - upload_date=now_as_utc().isoformat(), - file_id=TEST_FILE_ID, - object_id="", - bucket_id="", - s3_endpoint_alias="", - decrypted_size=0, - decryption_secret_id="", - content_offset=0, - encrypted_part_size=0, - encrypted_parts_md5=[], - encrypted_parts_sha256=[], - decrypted_sha256="", -) - -TEST_FILE_DELETION_REQUESTED = event_schemas.FileDeletionRequested(file_id=TEST_FILE_ID) - - -def test_make_record_from_update(): - """Test the get_record function""" - record = make_record_from_update( - models.FileDeletionRequestedRecord, TEST_FILE_DELETION_REQUESTED - ) - isinstance(record, models.FileDeletionRequestedRecord) - assert record.model_dump() == { - "correlation_id": get_correlation_id(), - "file_id": TEST_FILE_ID, - } - - -@pytest.mark.asyncio() -async def test_idempotence(joint_fixture: JointFixture, logot: Logot): - """Test the idempotence functionality when encountering a record that already exists""" - # First, insert the record that we want to collide with - dao_factory = MongoDbDaoFactory(config=joint_fixture.config) - dao = await get_file_deletion_requested_dao(dao_factory=dao_factory) - record = models.FileDeletionRequestedRecord( - correlation_id=get_correlation_id(), file_id=TEST_FILE_ID - ) - - record_is_new = await check_record_is_new( - dao=dao, - resource_id=TEST_FILE_ID, - update=TEST_FILE_DELETION_REQUESTED, - record=record, - ) - - assert record_is_new - - # insert record into the DB - await dao.insert(record) - - # rerun the assertion and verify that the result is False and that we get a log - record_is_new = await check_record_is_new( - dao=dao, - resource_id=TEST_FILE_ID, - update=TEST_FILE_DELETION_REQUESTED, - record=record, - ) - - assert not record_is_new - - # examine logs - logot.assert_logged( - logged.debug( - "Event with 'FileDeletionRequested' schema for resource ID 
'test_id' has" - + " already been processed under current correlation_id. Skipping." - ) - ) - - -@pytest.mark.parametrize( - "upsertion_event, topic_config_name, method_name", - [ - ( - TEST_FILE_DELETION_REQUESTED.model_dump(), - "files_to_delete_topic", - "upsert_file_deletion_requested", - ), - ( - TEST_FILE_UPLOAD_VALIDATION_SUCCESS.model_dump(), - "files_to_register_topic", - "upsert_file_upload_validation_success", - ), - ( - TEST_NONSTAGED_FILE_REQUESTED.model_dump(), - "files_to_stage_topic", - "upsert_nonstaged_file_requested", - ), - ], -) -@pytest.mark.asyncio() -async def test_outbox_subscriber_routing( - joint_fixture: JointFixture, - upsertion_event: JsonObject, - topic_config_name: str, - method_name: str, - monkeypatch: pytest.MonkeyPatch, -): - """Make sure the outbox subscriber calls the correct method on the idempotence - handler for the given event. - """ - topic = getattr(joint_fixture.config, topic_config_name) - mock = AsyncMock() - monkeypatch.setattr(joint_fixture.idempotence_handler, method_name, mock) - await joint_fixture.kafka.publish_event( - payload=upsertion_event, - type_=CHANGE_EVENT_TYPE, - topic=topic, - key=TEST_FILE_ID, - ) - - await joint_fixture.outbox_subscriber.run(forever=False) - mock.assert_awaited_once() - - -@pytest.mark.parametrize( - "deletion_event, topic_config_name, event_type", - [ - ( - TEST_FILE_DELETION_REQUESTED.model_dump(), - "files_to_delete_topic", - "FileDeletionRequested", - ), - ( - TEST_FILE_UPLOAD_VALIDATION_SUCCESS.model_dump(), - "files_to_register_topic", - "FileUploadValidationSuccess", - ), - ( - TEST_NONSTAGED_FILE_REQUESTED.model_dump(), - "files_to_stage_topic", - "NonStagedFileRequested", - ), - ], -) -@pytest.mark.asyncio() -async def test_deletion_logs( - joint_fixture: JointFixture, - logot: Logot, - deletion_event: JsonObject, - topic_config_name: str, - event_type: str, -): - """Test that the outbox subscriber logs deletions correctly. - Consume a 'DELETED' event type for each of the outbox events. - """ - topic = getattr(joint_fixture.config, topic_config_name) - await joint_fixture.kafka.publish_event( - payload=deletion_event, - type_=DELETE_EVENT_TYPE, - topic=topic, - key=TEST_FILE_ID, - ) - await joint_fixture.outbox_subscriber.run(forever=False) - logot.assert_logged( - logged.warning( - f"Received DELETED-type event for {event_type} with resource ID '%s'", - ) - ) - - -@pytest.mark.parametrize( - "update, method_to_patch, event_schema_name", - [ - ( - TEST_FILE_DELETION_REQUESTED, - "delete_file", - "FileDeletionRequested", - ), - ( - TEST_FILE_UPLOAD_VALIDATION_SUCCESS, - "register_file", - "FileUploadValidationSuccess", - ), - ( - TEST_NONSTAGED_FILE_REQUESTED, - "stage_registered_file", - "NonStagedFileRequested", - ), - ], -) -@pytest.mark.asyncio() -async def test_idempotence_handler( - joint_fixture: JointFixture, - logot: Logot, - update: BaseModel, - method_to_patch: str, - event_schema_name: str, -): - """Test that the IdempotenceHandler handles events correctly. - - This tests the methods inside the file registry that are called by the outbox. - The registry methods are patched with a mock that can be inspected later for calls. - The expected behavior is that the method is called once, and then not called again, - because the record is already in the database. 
-    """
-    mock = AsyncMock()
-
-    setattr(joint_fixture.file_registry, method_to_patch, mock)
-
-    method_map: dict[str, Callable] = {
-        "FileDeletionRequested": joint_fixture.idempotence_handler.upsert_file_deletion_requested,
-        "FileUploadValidationSuccess": joint_fixture.idempotence_handler.upsert_file_upload_validation_success,
-        "NonStagedFileRequested": joint_fixture.idempotence_handler.upsert_nonstaged_file_requested,
-    }
-
-    # Set which 'upsert_xyz' method to call on the idempotence handler
-    method_to_call = method_map[event_schema_name]
-
-    # call idempotence handler method once, which should call the file registry method
-    await method_to_call(resource_id=TEST_FILE_ID, update=update)
-
-    mock.assert_awaited_once()
-    mock.reset_mock()
-
-    # call the method once more, which should emit a debug log and not hit the registry
-    await method_to_call(resource_id=TEST_FILE_ID, update=update)
-
-    mock.assert_not_awaited()
-
-    logot.assert_logged(
-        logged.debug(
-            f"Event with '{event_schema_name}' schema for resource ID '{TEST_FILE_ID}'"
-            + " has already been processed under current correlation_id. Skipping."
-        )
-    )