From ca287ffdcfb83d20c9b537c73e746ae59468060c Mon Sep 17 00:00:00 2001 From: Sergey Trusov Date: Mon, 29 Apr 2024 10:20:21 +0200 Subject: [PATCH 1/2] feat: check-finished-ingest-item message dropped [AB#43320] --- .../payloads/media/media-service-asyncapi.yml | 14 - .../media/media-service-messaging-settings.ts | 7 - .../check-finish-ingest-item-command.json | 27 - .../schemas/payloads/media/commands/index.ts | 2 - .../check-finish-ingest-item-command.ts | 17 - .../types/payloads/media/commands/index.ts | 3 - ...xt-inbox-message-ingest-priority-added.sql | 112 +++ .../media/service/src/generated/db/schema.sql | 24 +- .../check-finish-ingest-document-handler.ts | 30 +- ...heck-finish-ingest-item-handler.db.spec.ts | 655 ------------------ .../check-finish-ingest-item-handler.ts | 73 -- .../handlers/image-already-existed-handler.ts | 8 +- .../ingest/handlers/image-created-handler.ts | 8 +- .../handlers/image-failed-handler.db.spec.ts | 116 ++++ .../handlers/image-failed-handler.spec.ts | 74 -- .../ingest/handlers/image-failed-handler.ts | 37 +- .../image-succeeded-handler.db.spec.ts | 59 +- .../handlers/image-succeeded-handler.ts | 47 +- .../service/src/ingest/handlers/index.ts | 1 - .../localize-entity-failed-handler.db.spec.ts | 76 +- .../localize-entity-failed-handler.ts | 33 +- ...ocalize-entity-finished-handler.db.spec.ts | 74 +- .../localize-entity-finished-handler.ts | 32 +- .../update-metadata-handler.db.spec.ts | 158 ++--- .../handlers/update-metadata-handler.ts | 45 +- ...on-source-entity-failed-handler.db.spec.ts | 32 +- ...calization-source-entity-failed-handler.ts | 31 +- ...-source-entity-finished-handler.db.spec.ts | 6 +- .../handlers/video-already-existed-handler.ts | 8 +- .../video-creation-started-handler.ts | 8 +- .../handlers/video-failed-handler.db.spec.ts | 116 ++++ .../handlers/video-failed-handler.spec.ts | 75 -- .../ingest/handlers/video-failed-handler.ts | 32 +- .../video-succeeded-handler.db.spec.ts | 39 +- .../handlers/video-succeeded-handler.ts | 44 +- .../src/messaging/register-messaging.ts | 39 +- .../src/tests/ingest/upload.db.spec.ts | 27 +- 37 files changed, 718 insertions(+), 1471 deletions(-) delete mode 100644 libs/media-messages/src/generated/schemas/payloads/media/commands/check-finish-ingest-item-command.json delete mode 100644 libs/media-messages/src/generated/types/payloads/media/commands/check-finish-ingest-item-command.ts create mode 100644 services/media/service/migrations/committed/000026-next-inbox-message-ingest-priority-added.sql delete mode 100644 services/media/service/src/ingest/handlers/check-finish-ingest-item-handler.db.spec.ts delete mode 100644 services/media/service/src/ingest/handlers/check-finish-ingest-item-handler.ts create mode 100644 services/media/service/src/ingest/handlers/image-failed-handler.db.spec.ts delete mode 100644 services/media/service/src/ingest/handlers/image-failed-handler.spec.ts create mode 100644 services/media/service/src/ingest/handlers/video-failed-handler.db.spec.ts delete mode 100644 services/media/service/src/ingest/handlers/video-failed-handler.spec.ts diff --git a/libs/media-messages/schemas/payloads/media/media-service-asyncapi.yml b/libs/media-messages/schemas/payloads/media/media-service-asyncapi.yml index 73ce95a5..5274c74f 100644 --- a/libs/media-messages/schemas/payloads/media/media-service-asyncapi.yml +++ b/libs/media-messages/schemas/payloads/media/media-service-asyncapi.yml @@ -57,14 +57,6 @@ channels: publish: message: $ref: '#/components/messages/update-metadata-command' - 'ingest.check_finish_item': - bindings: - amqp: - queue: - name: inbox - publish: - message: - $ref: '#/components/messages/check-finish-ingest-item-command' 'ingest.check_finish_document': bindings: amqp: @@ -85,12 +77,6 @@ channels: $ref: '#/components/messages/entity-deleted-event' components: messages: - check-finish-ingest-item-command: - tags: - - name: aggregate-type:ingest-item - contentType: application/json - payload: - $ref: 'commands/check-finish-ingest-item-command.json' check-finish-ingest-document-command: tags: - name: aggregate-type:ingest-document diff --git a/libs/media-messages/src/generated/config/payloads/media/media-service-messaging-settings.ts b/libs/media-messages/src/generated/config/payloads/media/media-service-messaging-settings.ts index ce142139..1823fb2e 100644 --- a/libs/media-messages/src/generated/config/payloads/media/media-service-messaging-settings.ts +++ b/libs/media-messages/src/generated/config/payloads/media/media-service-messaging-settings.ts @@ -43,13 +43,6 @@ export class MediaServiceMessagingSettings implements MessagingSettings { 'command', 'ingest-item' ); - public static CheckFinishIngestItem = new MediaServiceMessagingSettings( - 'CheckFinishIngestItem', - 'inbox', - 'ingest.check_finish_item', - 'command', - 'ingest-item' - ); public static CheckFinishIngestDocument = new MediaServiceMessagingSettings( 'CheckFinishIngestDocument', 'inbox', diff --git a/libs/media-messages/src/generated/schemas/payloads/media/commands/check-finish-ingest-item-command.json b/libs/media-messages/src/generated/schemas/payloads/media/commands/check-finish-ingest-item-command.json deleted file mode 100644 index d2ca25a4..00000000 --- a/libs/media-messages/src/generated/schemas/payloads/media/commands/check-finish-ingest-item-command.json +++ /dev/null @@ -1,27 +0,0 @@ -{ - "$schema": "http://json-schema.org/draft-07/schema#", - "type": "object", - "title": "check_finish_ingest_item_command", - "description": "Check finish ingest item command schema.", - "additionalProperties": false, - "required": [ - "ingest_item_step_id", - "ingest_item_id" - ], - "properties": { - "ingest_item_step_id": { - "type": "string", - "minLength": 1, - "pattern": "^$|.*\\S.*", - "description": "A string with at least one character and not only whitespace characters." - }, - "ingest_item_id": { - "type": "integer", - "description": "Id of an ingest item stored in the database." - }, - "error_message": { - "type": "string", - "description": "Message that describes why a message for a specific ingest_item_step_id has failed." - } - } -} \ No newline at end of file diff --git a/libs/media-messages/src/generated/schemas/payloads/media/commands/index.ts b/libs/media-messages/src/generated/schemas/payloads/media/commands/index.ts index c2ff98a2..042b3537 100644 --- a/libs/media-messages/src/generated/schemas/payloads/media/commands/index.ts +++ b/libs/media-messages/src/generated/schemas/payloads/media/commands/index.ts @@ -1,5 +1,4 @@ import * as CheckFinishIngestDocumentCommand from './check-finish-ingest-document-command.json'; -import * as CheckFinishIngestItemCommand from './check-finish-ingest-item-command.json'; import * as DeleteEntityCommand from './delete-entity-command.json'; import * as PublishEntityCommand from './publish-entity-command.json'; import * as StartIngestCommand from './start-ingest-command.json'; @@ -8,7 +7,6 @@ import * as UnpublishEntityCommand from './unpublish-entity-command.json'; import * as UpdateMetadataCommand from './update-metadata-command.json'; export const CheckFinishIngestDocumentCommandSchema = CheckFinishIngestDocumentCommand; -export const CheckFinishIngestItemCommandSchema = CheckFinishIngestItemCommand; export const DeleteEntityCommandSchema = DeleteEntityCommand; export const PublishEntityCommandSchema = PublishEntityCommand; export const StartIngestCommandSchema = StartIngestCommand; diff --git a/libs/media-messages/src/generated/types/payloads/media/commands/check-finish-ingest-item-command.ts b/libs/media-messages/src/generated/types/payloads/media/commands/check-finish-ingest-item-command.ts deleted file mode 100644 index f5e82a07..00000000 --- a/libs/media-messages/src/generated/types/payloads/media/commands/check-finish-ingest-item-command.ts +++ /dev/null @@ -1,17 +0,0 @@ -/** - * Check finish ingest item command schema. - */ -export interface CheckFinishIngestItemCommand { - /** - * A string with at least one character and not only whitespace characters. - */ - ingest_item_step_id: string; - /** - * Id of an ingest item stored in the database. - */ - ingest_item_id: number; - /** - * Message that describes why a message for a specific ingest_item_step_id has failed. - */ - error_message?: string; -} \ No newline at end of file diff --git a/libs/media-messages/src/generated/types/payloads/media/commands/index.ts b/libs/media-messages/src/generated/types/payloads/media/commands/index.ts index 55e0fccc..f663d6b3 100644 --- a/libs/media-messages/src/generated/types/payloads/media/commands/index.ts +++ b/libs/media-messages/src/generated/types/payloads/media/commands/index.ts @@ -1,5 +1,4 @@ export * from './check-finish-ingest-document-command'; -export * from './check-finish-ingest-item-command'; export * from './delete-entity-command'; export * from './publish-entity-command'; export * from './start-ingest-command'; @@ -9,7 +8,6 @@ export * from './update-metadata-command'; export enum MediaCommandsSchemas { CheckFinishIngestDocumentCommand = 'payloads/media/commands/check-finish-ingest-document-command.json', - CheckFinishIngestItemCommand = 'payloads/media/commands/check-finish-ingest-item-command.json', DeleteEntityCommand = 'payloads/media/commands/delete-entity-command.json', PublishEntityCommand = 'payloads/media/commands/publish-entity-command.json', StartIngestCommand = 'payloads/media/commands/start-ingest-command.json', @@ -20,7 +18,6 @@ export enum MediaCommandsSchemas { export enum MediaCommandsTypes { CheckFinishIngestDocumentCommand = 'CheckFinishIngestDocumentCommand', - CheckFinishIngestItemCommand = 'CheckFinishIngestItemCommand', DeleteEntityCommand = 'DeleteEntityCommand', PublishEntityCommand = 'PublishEntityCommand', StartIngestCommand = 'StartIngestCommand', diff --git a/services/media/service/migrations/committed/000026-next-inbox-message-ingest-priority-added.sql b/services/media/service/migrations/committed/000026-next-inbox-message-ingest-priority-added.sql new file mode 100644 index 00000000..9f1d9d66 --- /dev/null +++ b/services/media/service/migrations/committed/000026-next-inbox-message-ingest-priority-added.sql @@ -0,0 +1,112 @@ +--! Previous: sha1:9e58d9212f8204cc42f4e3b3afd0abe7d343bd12 +--! Hash: sha1:7f4ccc8d5137091cfb2747dd7f3dfb1bd3ac7117 +--! Message: next-inbox-message-ingest-priority-added + +-- Create the function to get the next batch of messages from the inbox table. +DROP FUNCTION IF EXISTS app_hidden.next_inbox_messages(integer, integer); +CREATE OR REPLACE FUNCTION app_hidden.next_inbox_messages( + max_size integer, lock_ms integer) + RETURNS SETOF app_hidden.inbox + LANGUAGE 'plpgsql' + +AS $BODY$ +DECLARE + loop_row app_hidden.inbox%ROWTYPE; + message_row app_hidden.inbox%ROWTYPE; + ids uuid[] := '{}'; +BEGIN + + IF max_size < 1 THEN + RAISE EXCEPTION 'The max_size for the next messages batch must be at least one.' using errcode = 'MAXNR'; + END IF; + + -- get 1 oldest not locked priority ingest message, either StartIngest or CheckFinishIngestDocument + BEGIN + SELECT * + INTO message_row + FROM app_hidden.inbox + WHERE processed_at IS NULL AND abandoned_at IS NULL AND aggregate_type = 'ingest-document' + ORDER BY created_at + LIMIT 1 + FOR NO KEY UPDATE NOWAIT; -- throw/catch error when locked + + IF FOUND AND message_row.locked_until <= NOW() THEN + ids := array_append(ids, message_row.id); + END IF; + EXCEPTION + WHEN lock_not_available THEN + NULL; + WHEN serialization_failure THEN + NULL; + WHEN OTHERS THEN + RAISE; + END; + + -- get (only) the oldest message of every segment but only return it if it is not locked + FOR loop_row IN + SELECT * FROM app_hidden.inbox m WHERE m.id in (SELECT DISTINCT ON (segment) id + FROM app_hidden.inbox + WHERE processed_at IS NULL AND abandoned_at IS NULL AND id NOT IN (SELECT UNNEST(ids)) + ORDER BY segment, created_at) order by created_at + LOOP + BEGIN + EXIT WHEN cardinality(ids) >= max_size; + + SELECT * + INTO message_row + FROM app_hidden.inbox + WHERE id = loop_row.id + FOR NO KEY UPDATE NOWAIT; -- throw/catch error when locked + + IF message_row.locked_until > NOW() THEN + CONTINUE; + END IF; + + ids := array_append(ids, message_row.id); + EXCEPTION + WHEN lock_not_available THEN + CONTINUE; + WHEN serialization_failure THEN + CONTINUE; + END; + END LOOP; + + -- if max_size not reached: get the oldest parallelizable message independent of segment + IF cardinality(ids) < max_size THEN + FOR loop_row IN + SELECT * FROM app_hidden.inbox + WHERE concurrency = 'parallel' AND processed_at IS NULL AND abandoned_at IS NULL AND locked_until < NOW() + AND id NOT IN (SELECT UNNEST(ids)) + order by created_at + LOOP + BEGIN + EXIT WHEN cardinality(ids) >= max_size; + + SELECT * + INTO message_row + FROM app_hidden.inbox + WHERE id = loop_row.id + FOR NO KEY UPDATE NOWAIT; -- throw/catch error when locked + + ids := array_append(ids, message_row.id); + EXCEPTION + WHEN lock_not_available THEN + CONTINUE; + WHEN serialization_failure THEN + CONTINUE; + END; + END LOOP; + END IF; + + -- set a short lock value so the the workers can each process a message + IF cardinality(ids) > 0 THEN + + RETURN QUERY + UPDATE app_hidden.inbox + SET locked_until = clock_timestamp() + (lock_ms || ' milliseconds')::INTERVAL, started_attempts = started_attempts + 1 + WHERE ID = ANY(ids) + RETURNING *; + + END IF; +END; +$BODY$; diff --git a/services/media/service/src/generated/db/schema.sql b/services/media/service/src/generated/db/schema.sql index 4539d0c7..d8907a60 100644 --- a/services/media/service/src/generated/db/schema.sql +++ b/services/media/service/src/generated/db/schema.sql @@ -1537,11 +1537,33 @@ BEGIN RAISE EXCEPTION 'The max_size for the next messages batch must be at least one.' using errcode = 'MAXNR'; END IF; + -- get 1 oldest not locked priority ingest message, either StartIngest or CheckFinishIngestDocument + BEGIN + SELECT * + INTO message_row + FROM app_hidden.inbox + WHERE processed_at IS NULL AND abandoned_at IS NULL AND aggregate_type = 'ingest-document' + ORDER BY created_at + LIMIT 1 + FOR NO KEY UPDATE NOWAIT; -- throw/catch error when locked + + IF FOUND AND message_row.locked_until <= NOW() THEN + ids := array_append(ids, message_row.id); + END IF; + EXCEPTION + WHEN lock_not_available THEN + NULL; + WHEN serialization_failure THEN + NULL; + WHEN OTHERS THEN + RAISE; + END; + -- get (only) the oldest message of every segment but only return it if it is not locked FOR loop_row IN SELECT * FROM app_hidden.inbox m WHERE m.id in (SELECT DISTINCT ON (segment) id FROM app_hidden.inbox - WHERE processed_at IS NULL AND abandoned_at IS NULL + WHERE processed_at IS NULL AND abandoned_at IS NULL AND id NOT IN (SELECT UNNEST(ids)) ORDER BY segment, created_at) order by created_at LOOP BEGIN diff --git a/services/media/service/src/ingest/handlers/check-finish-ingest-document-handler.ts b/services/media/service/src/ingest/handlers/check-finish-ingest-document-handler.ts index d8cd2dae..dc974af3 100644 --- a/services/media/service/src/ingest/handlers/check-finish-ingest-document-handler.ts +++ b/services/media/service/src/ingest/handlers/check-finish-ingest-document-handler.ts @@ -43,10 +43,30 @@ export class CheckFinishIngestDocumentHandler extends MediaGuardedTransactionalI }: TypedTransactionalMessage, ownerClient: ClientBase, ): Promise { + const docId = param(ingest_document_id); + await sql`WITH updated AS ( + SELECT + iis.ingest_item_id, + CASE + WHEN BOOL_OR(iis.status = 'IN_PROGRESS') THEN NULL + WHEN BOOL_OR(iis.status = 'ERROR') THEN 'ERROR' + ELSE 'SUCCESS' + END as new_status + FROM app_public.ingest_item_steps iis + JOIN app_public.ingest_items ii ON iis.ingest_item_id = ii.id + WHERE ii.ingest_document_id = ${docId} AND ii.status = 'IN_PROGRESS' + GROUP BY iis.ingest_item_id + ) + UPDATE app_public.ingest_items item + SET status = updated.new_status + FROM updated + WHERE item.id = updated.ingest_item_id AND + updated.new_status IS NOT NULL;`.run(ownerClient); + const countGroups = await sql` SELECT status, COUNT (status) FROM app_public.ingest_items - WHERE ingest_document_id = ${param(ingest_document_id)} + WHERE ingest_document_id = ${docId} GROUP BY status; `.run(ownerClient); @@ -86,7 +106,13 @@ export class CheckFinishIngestDocumentHandler extends MediaGuardedTransactionalI seconds_without_progress = 0; } - if (seconds_without_progress >= 600) { + // At baseline, allow 5 minutes of inactivity for any ingest, no matter the + // reason (e.g. services starting up). + // Add additional minute of inactivity for every 250 items in the document. + const maxSecondsOfInactivity = + 600 + 60 * Math.round(updatedDoc.items_count / 250); + + if (seconds_without_progress >= maxSecondsOfInactivity) { const error = param({ message: 'The progress of ingest failed to change for a long period of time. Assuming an unexpected messaging issue and failing the document.', diff --git a/services/media/service/src/ingest/handlers/check-finish-ingest-item-handler.db.spec.ts b/services/media/service/src/ingest/handlers/check-finish-ingest-item-handler.db.spec.ts deleted file mode 100644 index 8be1f185..00000000 --- a/services/media/service/src/ingest/handlers/check-finish-ingest-item-handler.db.spec.ts +++ /dev/null @@ -1,655 +0,0 @@ -import { AuthenticatedManagementSubject } from '@axinom/mosaic-id-guard'; -import { TypedTransactionalMessage } from '@axinom/mosaic-transactional-inbox-outbox'; -import { stub } from 'jest-auto-stub'; -import 'jest-extended'; -import { CheckFinishIngestItemCommand } from 'media-messages'; -import { v4 as uuid } from 'uuid'; -import { all, insert, select, selectExactlyOne } from 'zapatos/db'; -import { - ingest_documents, - ingest_items, - ingest_item_steps, -} from 'zapatos/schema'; -import { - createTestContext, - createTestUser, - ITestContext, -} from '../../tests/test-utils'; -import { CheckFinishIngestItemHandler } from './check-finish-ingest-item-handler'; - -describe('Check Finish Ingest Item Handler', () => { - let ctx: ITestContext; - let user: AuthenticatedManagementSubject; - let handler: CheckFinishIngestItemHandler; - let doc1: ingest_documents.JSONSelectable; - let item1: ingest_items.JSONSelectable; - let step1: ingest_item_steps.JSONSelectable; - - const createMessage = (payload: CheckFinishIngestItemCommand) => - stub>({ - payload, - }); - - beforeAll(async () => { - ctx = await createTestContext(); - user = createTestUser(ctx.config.serviceId); - handler = new CheckFinishIngestItemHandler(ctx.config); - }); - - beforeEach(async () => { - doc1 = await insert('ingest_documents', { - name: 'test1', - title: 'test1', - document: { - name: 'test1', - document_created: '2020-08-04T08:57:40.763+00:00', - items: [{ type: 'MOVIE', external_id: 'test', data: {} }], - }, - items_count: 1, - in_progress_count: 1, - }).run(ctx.ownerPool); - item1 = await insert('ingest_items', { - ingest_document_id: doc1.id, - external_id: 'externalId', - entity_id: 1, - type: 'MOVIE', - exists_status: 'CREATED', - display_title: 'title', - item: { - type: 'MOVIE', - external_id: 'test', - data: { title: 'title' }, - }, - }).run(ctx.ownerPool); - step1 = await insert('ingest_item_steps', { - id: uuid(), - ingest_item_id: item1.id, - sub_type: 'METADATA', - type: 'ENTITY', - }).run(ctx.ownerPool); - }); - - afterEach(async () => { - await ctx.truncate('ingest_documents'); - }); - - afterAll(async () => { - await ctx.dispose(); - jest.restoreAllMocks(); - }); - - describe('onMessage', () => { - it('message with matching ingest_item_step_id without error_message -> item and document success', async () => { - // Arrange - const payload: CheckFinishIngestItemCommand = { - ingest_item_id: item1.id, - ingest_item_step_id: step1.id, - }; - - // Act - await ctx.executeOwnerSql(user, async (dbCtx) => - handler.handleMessage(createMessage(payload), dbCtx), - ); - - // Assert - const steps = await select('ingest_item_steps', all).run(ctx.ownerPool); - const items = await select('ingest_items', all).run(ctx.ownerPool); - const docs = await select('ingest_documents', all).run(ctx.ownerPool); - - expect(steps).toHaveLength(1); - expect(items).toHaveLength(1); - expect(docs).toHaveLength(1); - - expect(steps[0].id).toEqual(step1.id); - expect(steps[0].status).toEqual('SUCCESS'); - expect(steps[0].response_message).toBeNull(); - - expect(items[0].status).toEqual('SUCCESS'); - expect(items[0].errors).toEqual([]); - - expect(items[0].status).toEqual('SUCCESS'); - expect(items[0].errors).toEqual([]); - - // Calculation done in separate handler - expect(docs[0].in_progress_count).toEqual(1); - expect(docs[0].error_count).toEqual(0); - expect(docs[0].success_count).toEqual(0); - expect(docs[0].status).toEqual('IN_PROGRESS'); - expect(docs[0].errors).toEqual([]); - }); - - it('message with matching ingest_item_step_id without error_message received 2 times -> item and document success, correct progress counts', async () => { - // Arrange - const payload: CheckFinishIngestItemCommand = { - ingest_item_id: item1.id, - ingest_item_step_id: step1.id, - }; - - // Act - await ctx.executeOwnerSql(user, async (dbCtx) => - Promise.all([ - handler.handleMessage(createMessage(payload), dbCtx), - handler.handleMessage(createMessage(payload), dbCtx), - ]), - ); - - // Assert - const steps = await select('ingest_item_steps', all).run(ctx.ownerPool); - const items = await select('ingest_items', all).run(ctx.ownerPool); - const docs = await select('ingest_documents', all).run(ctx.ownerPool); - - expect(steps).toHaveLength(1); - expect(items).toHaveLength(1); - expect(docs).toHaveLength(1); - - expect(steps[0].id).toEqual(step1.id); - expect(steps[0].status).toEqual('SUCCESS'); - expect(steps[0].response_message).toBeNull(); - - expect(items[0].status).toEqual('SUCCESS'); - expect(items[0].errors).toEqual([]); - - // Calculation done in separate handler - expect(docs[0].in_progress_count).toEqual(1); - expect(docs[0].error_count).toEqual(0); - expect(docs[0].success_count).toEqual(0); - expect(docs[0].status).toEqual('IN_PROGRESS'); - expect(docs[0].errors).toEqual([]); - }); - - it('single message with error_message -> item and document error', async () => { - // Arrange - const payload: CheckFinishIngestItemCommand = { - ingest_item_id: item1.id, - ingest_item_step_id: step1.id, - error_message: 'Test error message.', - }; - - // Act - await ctx.executeOwnerSql(user, async (dbCtx) => - handler.handleMessage(createMessage(payload), dbCtx), - ); - - // Assert - const steps = await select('ingest_item_steps', all).run(ctx.ownerPool); - const items = await select('ingest_items', all).run(ctx.ownerPool); - const docs = await select('ingest_documents', all).run(ctx.ownerPool); - - expect(steps).toHaveLength(1); - expect(items).toHaveLength(1); - expect(docs).toHaveLength(1); - - expect(steps[0].id).toEqual(step1.id); - expect(steps[0].status).toEqual('ERROR'); - expect(steps[0].response_message).toEqual(payload.error_message); - - expect(items[0].status).toEqual('ERROR'); - expect(items[0].errors).toEqual([]); - - // Calculation done in separate handler - expect(docs[0].in_progress_count).toEqual(1); - expect(docs[0].error_count).toEqual(0); - expect(docs[0].success_count).toEqual(0); - expect(docs[0].status).toEqual('IN_PROGRESS'); - expect(docs[0].errors).toEqual([]); - }); - - it('first message with matching ingest_item_step_id and 2 sent messages -> item and document in_progress', async () => { - // Arrange - const doc = await insert('ingest_documents', { - name: 'test1', - title: 'test1', - document: { - name: 'test1', - document_created: '2020-08-04T08:57:40.763+00:00', - items: [{ type: 'MOVIE', external_id: 'test', data: {} }], - }, - items_count: 1, - in_progress_count: 1, - }).run(ctx.ownerPool); - const item = await insert('ingest_items', { - ingest_document_id: doc.id, - external_id: 'externalId', - entity_id: 1, - type: 'MOVIE', - exists_status: 'CREATED', - display_title: 'title', - item: { - type: 'MOVIE', - external_id: 'test', - data: { title: 'title' }, - }, - }).run(ctx.ownerPool); - - const metadataId = uuid(); - const videoId = uuid(); - await insert('ingest_item_steps', [ - { - ingest_item_id: item.id, - sub_type: 'METADATA', - type: 'ENTITY', - id: metadataId, - }, - { - ingest_item_id: item.id, - sub_type: 'MAIN', - type: 'VIDEO', - id: videoId, - }, - ]).run(ctx.ownerPool); - - const payload: CheckFinishIngestItemCommand = { - ingest_item_id: item.id, - ingest_item_step_id: metadataId, - }; - - // Act - await ctx.executeOwnerSql(user, async (dbCtx) => - handler.handleMessage(createMessage(payload), dbCtx), - ); - - // Assert - const steps = await select( - 'ingest_item_steps', - { ingest_item_id: item.id }, - { columns: ['status', 'response_message', 'id'] }, - ).run(ctx.ownerPool); - const updatedItem = await selectExactlyOne('ingest_items', { - id: item.id, - }).run(ctx.ownerPool); - const notUpdatedDoc = await selectExactlyOne('ingest_documents', { - id: doc.id, - }).run(ctx.ownerPool); - - expect(steps).toIncludeSameMembers([ - { - response_message: null, - status: 'IN_PROGRESS', - id: videoId, - }, - { - response_message: null, - status: 'SUCCESS', - id: metadataId, - }, - ]); - - expect(updatedItem.status).toEqual('IN_PROGRESS'); - expect(updatedItem.errors).toEqual([]); - - // Calculation done in separate handler - expect(notUpdatedDoc.in_progress_count).toEqual(1); - expect(notUpdatedDoc.error_count).toEqual(0); - expect(notUpdatedDoc.success_count).toEqual(0); - expect(notUpdatedDoc.status).toEqual('IN_PROGRESS'); - expect(notUpdatedDoc.errors).toEqual([]); - }); - - it('Single item in document, first error received -> ingest item is still in progress', async () => { - // Arrange - const doc = await insert('ingest_documents', { - name: 'test1', - title: 'test1', - document: { - name: 'test1', - document_created: '2020-08-04T08:57:40.763+00:00', - items: [{ type: 'MOVIE', external_id: 'test', data: {} }], - }, - items_count: 1, - in_progress_count: 1, - }).run(ctx.ownerPool); - const item = await insert('ingest_items', { - ingest_document_id: doc.id, - external_id: 'externalId', - entity_id: 1, - type: 'MOVIE', - exists_status: 'CREATED', - display_title: 'title', - item: { - type: 'MOVIE', - external_id: 'test', - data: { title: 'title' }, - }, - }).run(ctx.ownerPool); - - const metadataId = uuid(); - const videoId = uuid(); - const trailerId1 = uuid(); - const trailerId2 = uuid(); - await insert('ingest_item_steps', [ - { - ingest_item_id: item.id, - status: 'SUCCESS', - sub_type: 'METADATA', - type: 'ENTITY', - id: metadataId, - }, - { - ingest_item_id: item.id, - sub_type: 'MAIN', - type: 'VIDEO', - id: videoId, - }, - { - ingest_item_id: item.id, - sub_type: 'MAIN', - type: 'VIDEO', - id: trailerId1, - }, - { - ingest_item_id: item.id, - sub_type: 'MAIN', - type: 'VIDEO', - id: trailerId2, - }, - ]).run(ctx.ownerPool); - - // Act - await ctx.executeOwnerSql(user, async (dbCtx) => - handler.handleMessage( - createMessage({ - ingest_item_id: item.id, - ingest_item_step_id: videoId, - error_message: - 'Unexpected error occurred while ensuring that video exists.', - }), - dbCtx, - ), - ); - - // Assert - const steps = await select( - 'ingest_item_steps', - { ingest_item_id: item.id }, - { columns: ['id', 'status', 'response_message'] }, - ).run(ctx.ownerPool); - const updatedItem = await selectExactlyOne('ingest_items', { - id: item.id, - }).run(ctx.ownerPool); - const notUpdatedDoc = await selectExactlyOne('ingest_documents', { - id: doc.id, - }).run(ctx.ownerPool); - - expect(steps).toIncludeSameMembers([ - { - id: metadataId, - response_message: null, - status: 'SUCCESS', - }, - { - id: trailerId1, - response_message: null, - status: 'IN_PROGRESS', - }, - { - id: trailerId2, - response_message: null, - status: 'IN_PROGRESS', - }, - { - id: videoId, - response_message: - 'Unexpected error occurred while ensuring that video exists.', - status: 'ERROR', - }, - ]); - - expect(updatedItem.status).toEqual('IN_PROGRESS'); - expect(updatedItem.errors).toEqual([]); - - // Calculation done in separate handler - expect(notUpdatedDoc.in_progress_count).toEqual(1); - expect(notUpdatedDoc.error_count).toEqual(0); - expect(notUpdatedDoc.success_count).toEqual(0); - expect(notUpdatedDoc.status).toEqual('IN_PROGRESS'); - expect(notUpdatedDoc.errors).toEqual([]); - }); - - it('Single item in document, multiple error messages received -> document has correct in_progress count and errors count', async () => { - // Arrange - const doc = await insert('ingest_documents', { - name: 'test1', - title: 'test1', - document: { - name: 'test1', - document_created: '2020-08-04T08:57:40.763+00:00', - items: [{ type: 'MOVIE', external_id: 'test', data: {} }], - }, - items_count: 1, - in_progress_count: 1, - }).run(ctx.ownerPool); - const item = await insert('ingest_items', { - ingest_document_id: doc.id, - external_id: 'externalId', - entity_id: 1, - type: 'MOVIE', - exists_status: 'CREATED', - display_title: 'title', - item: { - type: 'MOVIE', - external_id: 'test', - data: { title: 'title' }, - }, - }).run(ctx.ownerPool); - - const metadataId = uuid(); - const videoId = uuid(); - const trailerId1 = uuid(); - const trailerId2 = uuid(); - await insert('ingest_item_steps', [ - { - id: metadataId, - ingest_item_id: item.id, - status: 'SUCCESS', - sub_type: 'METADATA', - type: 'ENTITY', - }, - { - id: videoId, - ingest_item_id: item.id, - sub_type: 'MAIN', - type: 'VIDEO', - }, - { - id: trailerId1, - ingest_item_id: item.id, - sub_type: 'MAIN', - type: 'VIDEO', - }, - { - id: trailerId2, - ingest_item_id: item.id, - sub_type: 'MAIN', - type: 'VIDEO', - }, - ]).run(ctx.ownerPool); - - // Act - await ctx.executeOwnerSql(user, async (dbCtx) => { - await handler.handleMessage( - createMessage({ - ingest_item_id: item.id, - ingest_item_step_id: videoId, - error_message: `An unexpected error occurred while trying to ensure that video exists 1.`, - }), - dbCtx, - ); - await handler.handleMessage( - createMessage({ - ingest_item_id: item.id, - ingest_item_step_id: trailerId1, - error_message: `An unexpected error occurred while trying to ensure that video exists 2.`, - }), - dbCtx, - ); - await handler.handleMessage( - createMessage({ - ingest_item_id: item.id, - ingest_item_step_id: trailerId2, - error_message: `An unexpected error occurred while trying to ensure that video exists 3.`, - }), - dbCtx, - ); - }); - - // Assert - const steps = await select( - 'ingest_item_steps', - { ingest_item_id: item.id }, - { columns: ['id', 'status', 'response_message'] }, - ).run(ctx.ownerPool); - const updatedItem = await selectExactlyOne('ingest_items', { - id: item.id, - }).run(ctx.ownerPool); - const notUpdatedDoc = await selectExactlyOne('ingest_documents', { - id: doc.id, - }).run(ctx.ownerPool); - - expect(steps).toIncludeSameMembers([ - { - id: metadataId, - response_message: null, - status: 'SUCCESS', - }, - { - id: trailerId1, - response_message: - 'An unexpected error occurred while trying to ensure that video exists 2.', - status: 'ERROR', - }, - { - id: trailerId2, - response_message: - 'An unexpected error occurred while trying to ensure that video exists 3.', - status: 'ERROR', - }, - { - id: videoId, - response_message: - 'An unexpected error occurred while trying to ensure that video exists 1.', - status: 'ERROR', - }, - ]); - expect(updatedItem.status).toEqual('ERROR'); - expect(updatedItem.errors).toEqual([]); - - // Calculation done in separate handler - expect(notUpdatedDoc.in_progress_count).toEqual(1); - expect(notUpdatedDoc.error_count).toEqual(0); - expect(notUpdatedDoc.success_count).toEqual(0); - expect(notUpdatedDoc.status).toEqual('IN_PROGRESS'); - expect(notUpdatedDoc.errors).toEqual([]); - }); - - it('message for two failed items -> item errors', async () => { - // Arrange - const doc2 = await insert('ingest_documents', { - name: 'test2', - title: 'test2', - document: { - name: 'test2', - document_created: '2020-08-04T08:57:40.763+00:00', - items: [ - { external_id: 'test2', type: 'MOVIE', data: {} }, - { external_id: 'test3', type: 'MOVIE', data: {} }, - ], - }, - items_count: 2, - in_progress_count: 2, - }).run(ctx.ownerPool); - const item2 = await insert('ingest_items', { - ingest_document_id: doc2.id, - external_id: 'test2', - entity_id: 2, - type: 'MOVIE', - exists_status: 'CREATED', - display_title: 'title', - item: { - type: 'MOVIE', - external_id: 'test2', - data: { title: 'title' }, - }, - }).run(ctx.ownerPool); - const item3 = await insert('ingest_items', { - ingest_document_id: doc2.id, - external_id: 'test3', - entity_id: 3, - type: 'MOVIE', - exists_status: 'CREATED', - display_title: 'title', - item: { - type: 'MOVIE', - external_id: 'test3', - data: { title: 'title' }, - }, - }).run(ctx.ownerPool); - - const metadataId1 = uuid(); - const metadataId2 = uuid(); - await insert('ingest_item_steps', [ - { - id: metadataId1, - ingest_item_id: item2.id, - sub_type: 'METADATA', - type: 'ENTITY', - }, - { - id: metadataId2, - ingest_item_id: item3.id, - sub_type: 'METADATA', - type: 'ENTITY', - }, - ]).run(ctx.ownerPool); - - const payload1: CheckFinishIngestItemCommand = { - ingest_item_id: item2.id, - ingest_item_step_id: metadataId1, - error_message: 'Test error message 1', - }; - - const payload2: CheckFinishIngestItemCommand = { - ingest_item_id: item3.id, - ingest_item_step_id: metadataId2, - error_message: 'Test error message 2', - }; - - // Act - await ctx.executeOwnerSql(user, async (dbCtx) => { - await handler.handleMessage(createMessage(payload1), dbCtx); - await handler.handleMessage(createMessage(payload2), dbCtx); - }); - - // Assert - const step2 = await selectExactlyOne('ingest_item_steps', { - ingest_item_id: item2.id, - }).run(ctx.ownerPool); - const step3 = await selectExactlyOne('ingest_item_steps', { - ingest_item_id: item3.id, - }).run(ctx.ownerPool); - const items = await select('ingest_items', all).run(ctx.ownerPool); - const docs = await select('ingest_documents', all).run(ctx.ownerPool); - - expect(items).toHaveLength(3); - expect(docs).toHaveLength(2); - - expect(step2.status).toEqual('ERROR'); - expect(step2.response_message).toBe(payload1.error_message); - - expect(items[1].status).toEqual('ERROR'); - expect(items[1].errors).toEqual([]); - - expect(step3.status).toEqual('ERROR'); - expect(step3.response_message).toBe(payload2.error_message); - - expect(items[2].status).toEqual('ERROR'); - expect(items[2].errors).toEqual([]); - - // Calculation done in separate handler - expect(docs[1].in_progress_count).toEqual(2); - expect(docs[1].error_count).toEqual(0); - expect(docs[1].success_count).toEqual(0); - expect(docs[1].status).toEqual('IN_PROGRESS'); - expect(docs[1].errors).toEqual([]); - }); - }); -}); diff --git a/services/media/service/src/ingest/handlers/check-finish-ingest-item-handler.ts b/services/media/service/src/ingest/handlers/check-finish-ingest-item-handler.ts deleted file mode 100644 index 8ebb4373..00000000 --- a/services/media/service/src/ingest/handlers/check-finish-ingest-item-handler.ts +++ /dev/null @@ -1,73 +0,0 @@ -import { Logger } from '@axinom/mosaic-service-common'; -import { TypedTransactionalMessage } from '@axinom/mosaic-transactional-inbox-outbox'; -import { - CheckFinishIngestItemCommand, - MediaServiceMessagingSettings, -} from 'media-messages'; -import { ClientBase } from 'pg'; -import { select, update } from 'zapatos/db'; -import { Config } from '../../common'; -import { MediaTransactionalInboxMessageHandler } from '../../messaging'; - -export class CheckFinishIngestItemHandler extends MediaTransactionalInboxMessageHandler { - constructor(config: Config) { - super( - MediaServiceMessagingSettings.CheckFinishIngestItem, - new Logger({ - config, - context: CheckFinishIngestItemHandler.name, - }), - config, - ); - } - - async handleMessage( - { - payload: { ingest_item_id, ingest_item_step_id, error_message }, - }: TypedTransactionalMessage, - ownerClient: ClientBase, - ): Promise { - const updated = await update( - 'ingest_item_steps', - { - status: error_message ? 'ERROR' : 'SUCCESS', - response_message: error_message, - }, - { id: ingest_item_step_id, status: 'IN_PROGRESS' }, - ).run(ownerClient); - - if (updated.length === 0) { - this.logger.debug({ - message: 'Ingest item step is already processes. Skipping.', - details: { step_id: ingest_item_step_id }, - }); - return; - } - - const steps = await select( - 'ingest_item_steps', - { ingest_item_id: ingest_item_id }, - { columns: ['status', 'id'] }, - ).run(ownerClient); - - const inProgressSteps = steps.filter((r) => r.status === 'IN_PROGRESS'); - if (inProgressSteps.length > 0) { - this.logger.debug({ - message: 'Ingest item steps still in progress - rechecking.', - details: { - step_id: ingest_item_step_id, - in_progress_steps: inProgressSteps, - }, - }); - return; - } - - await update( - 'ingest_items', - { - status: steps.some((r) => r.status === 'ERROR') ? 'ERROR' : 'SUCCESS', - }, - { id: ingest_item_id }, - ).run(ownerClient); - } -} diff --git a/services/media/service/src/ingest/handlers/image-already-existed-handler.ts b/services/media/service/src/ingest/handlers/image-already-existed-handler.ts index 01432854..964d2556 100644 --- a/services/media/service/src/ingest/handlers/image-already-existed-handler.ts +++ b/services/media/service/src/ingest/handlers/image-already-existed-handler.ts @@ -2,21 +2,15 @@ import { EnsureImageExistsAlreadyExistedEvent, ImageServiceMultiTenantMessagingSettings, } from '@axinom/mosaic-messages'; -import { StoreInboxMessage } from '@axinom/mosaic-transactional-inbox-outbox'; import { Config } from '../../common'; import { IngestEntityProcessor } from '../models'; import { ImageSucceededHandler } from './image-succeeded-handler'; export class ImageAlreadyExistedHandler extends ImageSucceededHandler { - constructor( - entityProcessors: IngestEntityProcessor[], - storeInboxMessage: StoreInboxMessage, - config: Config, - ) { + constructor(entityProcessors: IngestEntityProcessor[], config: Config) { super( entityProcessors, ImageServiceMultiTenantMessagingSettings.EnsureImageExistsAlreadyExisted, - storeInboxMessage, config, ); } diff --git a/services/media/service/src/ingest/handlers/image-created-handler.ts b/services/media/service/src/ingest/handlers/image-created-handler.ts index 631c01c2..847adfa6 100644 --- a/services/media/service/src/ingest/handlers/image-created-handler.ts +++ b/services/media/service/src/ingest/handlers/image-created-handler.ts @@ -2,21 +2,15 @@ import { EnsureImageExistsImageCreatedEvent, ImageServiceMultiTenantMessagingSettings, } from '@axinom/mosaic-messages'; -import { StoreInboxMessage } from '@axinom/mosaic-transactional-inbox-outbox'; import { Config } from '../../common'; import { IngestEntityProcessor } from '../models'; import { ImageSucceededHandler } from './image-succeeded-handler'; export class ImageCreatedHandler extends ImageSucceededHandler { - constructor( - entityProcessors: IngestEntityProcessor[], - storeInboxMessage: StoreInboxMessage, - config: Config, - ) { + constructor(entityProcessors: IngestEntityProcessor[], config: Config) { super( entityProcessors, ImageServiceMultiTenantMessagingSettings.EnsureImageExistsImageCreated, - storeInboxMessage, config, ); } diff --git a/services/media/service/src/ingest/handlers/image-failed-handler.db.spec.ts b/services/media/service/src/ingest/handlers/image-failed-handler.db.spec.ts new file mode 100644 index 00000000..d0ac3da7 --- /dev/null +++ b/services/media/service/src/ingest/handlers/image-failed-handler.db.spec.ts @@ -0,0 +1,116 @@ +import { AuthenticatedManagementSubject } from '@axinom/mosaic-id-guard'; +import { EnsureImageExistsFailedEvent } from '@axinom/mosaic-messages'; +import { TypedTransactionalMessage } from '@axinom/mosaic-transactional-inbox-outbox'; +import { stub } from 'jest-auto-stub'; +import 'jest-extended'; +import { randomUUID } from 'node:crypto'; +import { insert, selectOne } from 'zapatos/db'; +import { + ingest_documents, + ingest_items, + ingest_item_steps, +} from 'zapatos/schema'; +import { + createTestContext, + createTestUser, + ITestContext, +} from '../../tests/test-utils'; +import { ImageFailedHandler } from './image-failed-handler'; + +describe('ImageFailedHandler', () => { + let handler: ImageFailedHandler; + let ctx: ITestContext; + let step1: ingest_item_steps.JSONSelectable; + let item1: ingest_items.JSONSelectable; + let doc1: ingest_documents.JSONSelectable; + let user: AuthenticatedManagementSubject; + + const createMessage = ( + payload: EnsureImageExistsFailedEvent, + messageContext: unknown, + ) => + stub>({ + payload, + metadata: { + messageContext, + }, + }); + + beforeAll(async () => { + ctx = await createTestContext(); + user = createTestUser(ctx.config.serviceId); + handler = new ImageFailedHandler(ctx.config); + }); + + beforeEach(async () => { + doc1 = await insert('ingest_documents', { + name: 'test1', + title: 'test1', + document: { + name: 'test1', + document_created: '2020-08-04T08:57:40.763+00:00', + items: [], + }, + items_count: 0, + }).run(ctx.ownerPool); + item1 = await insert('ingest_items', { + ingest_document_id: doc1.id, + external_id: 'externalId', + entity_id: 1, + type: 'MOVIE', + exists_status: 'CREATED', + display_title: 'title', + item: { + type: 'MOVIE', + external_id: 'externalId', + data: { + title: 'title', + trailers: [{ source: 'test', profile: 'DEFAULT' }], + }, + }, + }).run(ctx.ownerPool); + step1 = await insert('ingest_item_steps', { + id: randomUUID(), + type: 'IMAGE', + ingest_item_id: item1.id, + sub_type: 'COVER', + }).run(ctx.ownerPool); + }); + + afterEach(async () => { + await ctx.truncate('ingest_documents'); + }); + + afterAll(async () => { + await ctx.dispose(); + jest.restoreAllMocks(); + }); + + describe('onMessage', () => { + it('message received -> message with error ingestItemStepId sent', async () => { + // Arrange + const payload: EnsureImageExistsFailedEvent = { + message: 'Test error message', + image_location: 'Test', + image_type: 'movie_cover', + }; + const context = { + ingestItemStepId: step1.id, + ingestItemId: item1.id, + imageType: 'MAIN', + }; + + // Act + await ctx.executeOwnerSql(user, async (dbCtx) => + handler.handleMessage(createMessage(payload, context), dbCtx), + ); + + // Assert + const step = await selectOne('ingest_item_steps', { + id: step1.id, + }).run(ctx.ownerPool); + expect(step?.response_message).toEqual('Test error message'); + expect(step?.status).toEqual('ERROR'); + }); + }); +}); diff --git a/services/media/service/src/ingest/handlers/image-failed-handler.spec.ts b/services/media/service/src/ingest/handlers/image-failed-handler.spec.ts deleted file mode 100644 index 9efeb83a..00000000 --- a/services/media/service/src/ingest/handlers/image-failed-handler.spec.ts +++ /dev/null @@ -1,74 +0,0 @@ -import { EnsureImageExistsFailedEvent } from '@axinom/mosaic-messages'; -import { - StoreInboxMessage, - TypedTransactionalMessage, -} from '@axinom/mosaic-transactional-inbox-outbox'; -import { stub } from 'jest-auto-stub'; -import 'jest-extended'; -import { CheckFinishIngestItemCommand } from 'media-messages'; -import { ClientBase } from 'pg'; -import { createTestConfig } from '../../tests/test-utils'; -import { ImageFailedHandler } from './image-failed-handler'; - -describe('ImageFailedHandler', () => { - let handler: ImageFailedHandler; - let messages: CheckFinishIngestItemCommand[] = []; - - const createMessage = ( - payload: EnsureImageExistsFailedEvent, - messageContext: unknown, - ) => - stub>({ - payload, - metadata: { - messageContext, - }, - }); - - beforeAll(async () => { - const storeInboxMessage: StoreInboxMessage = jest.fn( - async (_aggregateId, _messagingSettings, message) => { - messages.push(message as CheckFinishIngestItemCommand); - }, - ); - handler = new ImageFailedHandler(storeInboxMessage, createTestConfig()); - }); - - afterEach(async () => { - messages = []; - }); - - afterAll(async () => { - jest.restoreAllMocks(); - }); - - describe('onMessage', () => { - it('message received -> message with error ingestItemStepId sent', async () => { - // Arrange - const payload: EnsureImageExistsFailedEvent = { - message: 'Test error message', - image_location: 'Test', - image_type: 'movie_cover', - }; - const context = { - ingestItemStepId: '34d91ea5-db63-4e51-b511-ae545d5c669c', - ingestItemId: 1, - imageType: 'MAIN', - }; - - // Act - await handler.handleMessage( - createMessage(payload, context), - stub(), - ); - - // Assert - expect(messages).toHaveLength(1); - expect(messages[0]).toEqual({ - ingest_item_step_id: '34d91ea5-db63-4e51-b511-ae545d5c669c', - ingest_item_id: 1, - error_message: 'Test error message', - }); - }); - }); -}); diff --git a/services/media/service/src/ingest/handlers/image-failed-handler.ts b/services/media/service/src/ingest/handlers/image-failed-handler.ts index 1c4b60df..7dbfa209 100644 --- a/services/media/service/src/ingest/handlers/image-failed-handler.ts +++ b/services/media/service/src/ingest/handlers/image-failed-handler.ts @@ -3,26 +3,16 @@ import { ImageServiceMultiTenantMessagingSettings, } from '@axinom/mosaic-messages'; import { Logger } from '@axinom/mosaic-service-common'; -import { - StoreInboxMessage, - TypedTransactionalMessage, -} from '@axinom/mosaic-transactional-inbox-outbox'; -import { - CheckFinishIngestItemCommand, - ImageMessageContext, - MediaServiceMessagingSettings, -} from 'media-messages'; +import { TypedTransactionalMessage } from '@axinom/mosaic-transactional-inbox-outbox'; +import { ImageMessageContext } from 'media-messages'; import { ClientBase } from 'pg'; +import { update } from 'zapatos/db'; import { Config } from '../../common'; import { MediaGuardedTransactionalInboxMessageHandler } from '../../messaging'; -import { getFutureIsoDateInMilliseconds } from '../utils'; import { checkIsIngestEvent } from '../utils/check-is-ingest-event'; export class ImageFailedHandler extends MediaGuardedTransactionalInboxMessageHandler { - constructor( - private readonly storeInboxMessage: StoreInboxMessage, - config: Config, - ) { + constructor(config: Config) { super( ImageServiceMultiTenantMessagingSettings.EnsureImageExistsFailed, ['INGESTS_EDIT', 'ADMIN'], @@ -47,20 +37,13 @@ export class ImageFailedHandler extends MediaGuardedTransactionalInboxMessageHan return; } const messageContext = metadata.messageContext as ImageMessageContext; - - await this.storeInboxMessage( - messageContext.ingestItemId.toString(), - MediaServiceMessagingSettings.CheckFinishIngestItem, + await update( + 'ingest_item_steps', { - ingest_item_step_id: messageContext.ingestItemStepId, - ingest_item_id: messageContext.ingestItemId, - error_message: payload.message, + status: 'ERROR', + response_message: payload.message, }, - ownerClient, - { - metadata: { authToken: metadata.authToken }, - lockedUntil: getFutureIsoDateInMilliseconds(1_000), - }, - ); + { id: messageContext.ingestItemStepId }, + ).run(ownerClient); } } diff --git a/services/media/service/src/ingest/handlers/image-succeeded-handler.db.spec.ts b/services/media/service/src/ingest/handlers/image-succeeded-handler.db.spec.ts index 9e43fb6d..6a1446a2 100644 --- a/services/media/service/src/ingest/handlers/image-succeeded-handler.db.spec.ts +++ b/services/media/service/src/ingest/handlers/image-succeeded-handler.db.spec.ts @@ -1,19 +1,14 @@ import { AuthenticatedManagementSubject } from '@axinom/mosaic-id-guard'; import { EnsureImageExistsImageCreatedEvent } from '@axinom/mosaic-messages'; -import { - StoreInboxMessage, - TypedTransactionalMessage, -} from '@axinom/mosaic-transactional-inbox-outbox'; +import { TypedTransactionalMessage } from '@axinom/mosaic-transactional-inbox-outbox'; import { stub } from 'jest-auto-stub'; import 'jest-extended'; -import { CheckFinishIngestItemCommand } from 'media-messages'; import { v4 as uuid } from 'uuid'; -import { all, insert, select, selectOne } from 'zapatos/db'; +import { insert, selectOne } from 'zapatos/db'; import { ingest_documents, ingest_items, ingest_item_steps, - movies, } from 'zapatos/schema'; import { MockIngestProcessor } from '../../tests/ingest/mock-ingest-processor'; import { @@ -31,8 +26,6 @@ describe('ImageSucceededHandler', () => { let step1: ingest_item_steps.JSONSelectable; let item1: ingest_items.JSONSelectable; let doc1: ingest_documents.JSONSelectable; - let movie1: movies.JSONSelectable; - let messages: CheckFinishIngestItemCommand[] = []; const createMessage = ( payload: EnsureImageExistsImageCreatedEvent, @@ -47,17 +40,8 @@ describe('ImageSucceededHandler', () => { beforeAll(async () => { ctx = await createTestContext(); - const storeInboxMessage: StoreInboxMessage = jest.fn( - async (_aggregateId, _messagingSettings, message) => { - messages.push(message as CheckFinishIngestItemCommand); - }, - ); user = createTestUser(ctx.config.serviceId); - handler = new ImageCreatedHandler( - [new MockIngestProcessor()], - storeInboxMessage, - ctx.config, - ); + handler = new ImageCreatedHandler([new MockIngestProcessor()], ctx.config); }); beforeEach(async () => { @@ -71,14 +55,10 @@ describe('ImageSucceededHandler', () => { }, items_count: 0, }).run(ctx.ownerPool); - movie1 = await insert('movies', { - title: 'Entity1', - external_id: 'existing1', - }).run(ctx.ownerPool); item1 = await insert('ingest_items', { ingest_document_id: doc1.id, external_id: 'externalId', - entity_id: movie1.id, + entity_id: 1, type: 'MOVIE', exists_status: 'CREATED', display_title: 'title', @@ -101,8 +81,6 @@ describe('ImageSucceededHandler', () => { afterEach(async () => { await ctx.truncate('ingest_documents'); - await ctx.truncate('movies'); - messages = []; }); afterAll(async () => { @@ -133,13 +111,7 @@ describe('ImageSucceededHandler', () => { }).run(ctx.ownerPool); expect(step?.entity_id).toEqual(payload.image_id); - - expect(messages).toEqual([ - { - ingest_item_step_id: step1.id, - ingest_item_id: item1.id, - }, - ]); + expect(step?.status).toEqual('SUCCESS'); }); }); @@ -150,7 +122,7 @@ describe('ImageSucceededHandler', () => { image_id: '11e1d903-49ed-4d70-8b24-90d0824741d0', }; const context = { - ingestItemStepId: '34d91ea5-db63-4e51-b511-ae545d5c669c', + ingestItemStepId: step1.id, ingestItemId: item1.id, imageType: 'COVER', }; @@ -166,18 +138,13 @@ describe('ImageSucceededHandler', () => { ); // Assert - const movies = await select('movies', all).run(ctx.ownerPool); - - expect(movies).toHaveLength(1); - expect(movies[0]).toEqual(movie1); - - expect(messages).toHaveLength(1); - expect(messages[0]).toEqual({ - ingest_item_step_id: '34d91ea5-db63-4e51-b511-ae545d5c669c', - ingest_item_id: item1.id, - error_message: - 'An unexpected error occurred while trying to update image relations.', - }); + const step = await selectOne('ingest_item_steps', { + id: step1.id, + }).run(ctx.ownerPool); + expect(step?.response_message).toEqual( + 'An unexpected error occurred while trying to update image relations.', + ); + expect(step?.status).toEqual('ERROR'); }); }); }); diff --git a/services/media/service/src/ingest/handlers/image-succeeded-handler.ts b/services/media/service/src/ingest/handlers/image-succeeded-handler.ts index 0e20bee0..a8ef026a 100644 --- a/services/media/service/src/ingest/handlers/image-succeeded-handler.ts +++ b/services/media/service/src/ingest/handlers/image-succeeded-handler.ts @@ -4,21 +4,13 @@ import { EnsureImageExistsImageCreatedEvent, } from '@axinom/mosaic-messages'; import { Logger, MosaicError } from '@axinom/mosaic-service-common'; -import { - StoreInboxMessage, - TypedTransactionalMessage, -} from '@axinom/mosaic-transactional-inbox-outbox'; -import { - CheckFinishIngestItemCommand, - ImageMessageContext, - MediaServiceMessagingSettings, -} from 'media-messages'; +import { TypedTransactionalMessage } from '@axinom/mosaic-transactional-inbox-outbox'; +import { ImageMessageContext } from 'media-messages'; import { ClientBase } from 'pg'; import { selectExactlyOne, update } from 'zapatos/db'; import { CommonErrors, Config } from '../../common'; import { MediaGuardedTransactionalInboxMessageHandler } from '../../messaging'; import { IngestEntityProcessor } from '../models'; -import { getFutureIsoDateInMilliseconds } from '../utils'; import { checkIsIngestEvent } from '../utils/check-is-ingest-event'; import { getIngestErrorMessage } from '../utils/ingest-validation'; @@ -30,7 +22,6 @@ export abstract class ImageSucceededHandler< constructor( private entityProcessors: IngestEntityProcessor[], messagingSettings: MessagingSettings, - private storeInboxMessage: StoreInboxMessage, config: Config, ) { super( @@ -76,23 +67,12 @@ export abstract class ImageSucceededHandler< await update( 'ingest_item_steps', - { entity_id: payload.image_id }, - { id: messageContext.ingestItemStepId }, - ).run(ownerClient); - - await this.storeInboxMessage( - messageContext.ingestItemId.toString(), - MediaServiceMessagingSettings.CheckFinishIngestItem, { - ingest_item_step_id: messageContext.ingestItemStepId, - ingest_item_id: messageContext.ingestItemId, + status: 'SUCCESS', + entity_id: payload.image_id, }, - ownerClient, - { - metadata: { authToken: metadata.authToken }, - lockedUntil: getFutureIsoDateInMilliseconds(1_000), - }, - ); + { id: messageContext.ingestItemStepId }, + ).run(ownerClient); } override async handleErrorMessage( @@ -106,19 +86,16 @@ export abstract class ImageSucceededHandler< } const messageContext = metadata.messageContext as ImageMessageContext; - await this.storeInboxMessage( - messageContext.ingestItemId.toString(), - MediaServiceMessagingSettings.CheckFinishIngestItem, + await update( + 'ingest_item_steps', { - ingest_item_step_id: messageContext.ingestItemStepId, - ingest_item_id: messageContext.ingestItemId, - error_message: getIngestErrorMessage( + status: 'ERROR', + response_message: getIngestErrorMessage( error, 'An unexpected error occurred while trying to update image relations.', ), }, - ownerClient, - { metadata: { authToken: metadata.authToken } }, - ); + { id: messageContext.ingestItemStepId }, + ).run(ownerClient); } } diff --git a/services/media/service/src/ingest/handlers/index.ts b/services/media/service/src/ingest/handlers/index.ts index f0c6f5b4..4b4fa8b4 100644 --- a/services/media/service/src/ingest/handlers/index.ts +++ b/services/media/service/src/ingest/handlers/index.ts @@ -1,5 +1,4 @@ export * from './check-finish-ingest-document-handler'; -export * from './check-finish-ingest-item-handler'; export * from './image-already-existed-handler'; export * from './image-created-handler'; export * from './image-failed-handler'; diff --git a/services/media/service/src/ingest/handlers/localize-entity-failed-handler.db.spec.ts b/services/media/service/src/ingest/handlers/localize-entity-failed-handler.db.spec.ts index 5ab4b789..046906f7 100644 --- a/services/media/service/src/ingest/handlers/localize-entity-failed-handler.db.spec.ts +++ b/services/media/service/src/ingest/handlers/localize-entity-failed-handler.db.spec.ts @@ -1,12 +1,15 @@ import { AuthenticatedManagementSubject } from '@axinom/mosaic-id-guard'; import { LocalizeEntityFailedEvent } from '@axinom/mosaic-messages'; -import { - StoreInboxMessage, - TypedTransactionalMessage, -} from '@axinom/mosaic-transactional-inbox-outbox'; +import { TypedTransactionalMessage } from '@axinom/mosaic-transactional-inbox-outbox'; import { stub } from 'jest-auto-stub'; import 'jest-extended'; -import { CheckFinishIngestItemCommand } from 'media-messages'; +import { randomUUID } from 'node:crypto'; +import { insert, selectOne } from 'zapatos/db'; +import { + ingest_documents, + ingest_items, + ingest_item_steps, +} from 'zapatos/schema'; import { createTestContext, createTestUser, @@ -17,8 +20,10 @@ import { LocalizeEntityFailedHandler } from './localize-entity-failed-handler'; describe('LocalizeEntityFailedHandler', () => { let ctx: ITestContext; let handler: LocalizeEntityFailedHandler; - let payloads: CheckFinishIngestItemCommand[] = []; let user: AuthenticatedManagementSubject; + let step1: ingest_item_steps.JSONSelectable; + let item1: ingest_items.JSONSelectable; + let doc1: ingest_documents.JSONSelectable; const createMessage = ( payload: LocalizeEntityFailedEvent, @@ -33,17 +38,47 @@ describe('LocalizeEntityFailedHandler', () => { beforeAll(async () => { ctx = await createTestContext(); - const storeInboxMessage: StoreInboxMessage = jest.fn( - async (_aggregateId, _messagingSettings, payload) => { - payloads.push(payload as CheckFinishIngestItemCommand); - }, - ); user = createTestUser(ctx.config.serviceId); - handler = new LocalizeEntityFailedHandler(storeInboxMessage, ctx.config); + handler = new LocalizeEntityFailedHandler(ctx.config); + }); + + beforeEach(async () => { + doc1 = await insert('ingest_documents', { + name: 'test1', + title: 'test1', + document: { + name: 'test1', + document_created: '2020-08-04T08:57:40.763+00:00', + items: [], + }, + items_count: 0, + }).run(ctx.ownerPool); + item1 = await insert('ingest_items', { + ingest_document_id: doc1.id, + external_id: 'externalId', + entity_id: 1, + type: 'MOVIE', + exists_status: 'CREATED', + display_title: 'title', + item: { + type: 'MOVIE', + external_id: 'externalId', + data: { + title: 'title', + trailers: [{ source: 'test', profile: 'DEFAULT' }], + }, + }, + }).run(ctx.ownerPool); + step1 = await insert('ingest_item_steps', { + id: randomUUID(), + type: 'IMAGE', + ingest_item_id: item1.id, + sub_type: 'COVER', + }).run(ctx.ownerPool); }); afterEach(async () => { - payloads = []; + await ctx.truncate('ingest_documents'); }); afterAll(async () => { @@ -61,8 +96,8 @@ describe('LocalizeEntityFailedHandler', () => { entity_type: 'movie', }; const context = { - ingestItemStepId: '34d91ea5-db63-4e51-b511-ae545d5c669c', - ingestItemId: 1, + ingestItemStepId: step1.id, + ingestItemId: item1.id, imageType: 'MAIN', }; @@ -72,12 +107,11 @@ describe('LocalizeEntityFailedHandler', () => { ); // Assert - expect(payloads).toHaveLength(1); - expect(payloads[0]).toEqual({ - ingest_item_step_id: '34d91ea5-db63-4e51-b511-ae545d5c669c', - ingest_item_id: 1, - error_message: payload.message, - }); + const step = await selectOne('ingest_item_steps', { + id: step1.id, + }).run(ctx.ownerPool); + expect(step?.response_message).toEqual(payload.message); + expect(step?.status).toEqual('ERROR'); }); }); }); diff --git a/services/media/service/src/ingest/handlers/localize-entity-failed-handler.ts b/services/media/service/src/ingest/handlers/localize-entity-failed-handler.ts index 11761cdc..9719acab 100644 --- a/services/media/service/src/ingest/handlers/localize-entity-failed-handler.ts +++ b/services/media/service/src/ingest/handlers/localize-entity-failed-handler.ts @@ -3,25 +3,16 @@ import { LocalizeEntityFailedEvent, } from '@axinom/mosaic-messages'; import { Logger } from '@axinom/mosaic-service-common'; -import { - StoreInboxMessage, - TypedTransactionalMessage, -} from '@axinom/mosaic-transactional-inbox-outbox'; -import { - CheckFinishIngestItemCommand, - IngestMessageContext, - MediaServiceMessagingSettings, -} from 'media-messages'; +import { TypedTransactionalMessage } from '@axinom/mosaic-transactional-inbox-outbox'; +import { IngestMessageContext } from 'media-messages'; import { ClientBase } from 'pg'; +import { update } from 'zapatos/db'; import { Config } from '../../common'; import { MediaTransactionalInboxMessageHandler } from '../../messaging'; import { checkIsIngestEvent } from '../utils/check-is-ingest-event'; export class LocalizeEntityFailedHandler extends MediaTransactionalInboxMessageHandler { - constructor( - private readonly storeInboxMessage: StoreInboxMessage, - config: Config, - ) { + constructor(config: Config) { super( LocalizationServiceMultiTenantMessagingSettings.LocalizeEntityFailed, new Logger({ @@ -49,17 +40,13 @@ export class LocalizeEntityFailedHandler extends MediaTransactionalInboxMessageH return; } const messageContext = metadata.messageContext as IngestMessageContext; - - await this.storeInboxMessage( - messageContext.ingestItemId.toString(), - MediaServiceMessagingSettings.CheckFinishIngestItem, + await update( + 'ingest_item_steps', { - ingest_item_step_id: messageContext.ingestItemStepId, - ingest_item_id: messageContext.ingestItemId, - error_message: payload.message, + status: 'ERROR', + response_message: payload.message, }, - ownerClient, - { metadata: { authToken: metadata.authToken } }, - ); + { id: messageContext.ingestItemStepId }, + ).run(ownerClient); } } diff --git a/services/media/service/src/ingest/handlers/localize-entity-finished-handler.db.spec.ts b/services/media/service/src/ingest/handlers/localize-entity-finished-handler.db.spec.ts index 3ae06d48..19f73f43 100644 --- a/services/media/service/src/ingest/handlers/localize-entity-finished-handler.db.spec.ts +++ b/services/media/service/src/ingest/handlers/localize-entity-finished-handler.db.spec.ts @@ -1,12 +1,15 @@ import { AuthenticatedManagementSubject } from '@axinom/mosaic-id-guard'; import { LocalizeEntityFinishedEvent } from '@axinom/mosaic-messages'; -import { - StoreInboxMessage, - TypedTransactionalMessage, -} from '@axinom/mosaic-transactional-inbox-outbox'; +import { TypedTransactionalMessage } from '@axinom/mosaic-transactional-inbox-outbox'; import { stub } from 'jest-auto-stub'; import 'jest-extended'; -import { CheckFinishIngestItemCommand } from 'media-messages'; +import { randomUUID } from 'node:crypto'; +import { insert, selectOne } from 'zapatos/db'; +import { + ingest_documents, + ingest_items, + ingest_item_steps, +} from 'zapatos/schema'; import { createTestContext, createTestUser, @@ -17,8 +20,10 @@ import { LocalizeEntityFinishedHandler } from './localize-entity-finished-handle describe('LocalizeEntityFinishedHandler', () => { let ctx: ITestContext; let handler: LocalizeEntityFinishedHandler; - let payloads: CheckFinishIngestItemCommand[] = []; let user: AuthenticatedManagementSubject; + let step1: ingest_item_steps.JSONSelectable; + let item1: ingest_items.JSONSelectable; + let doc1: ingest_documents.JSONSelectable; const createMessage = ( payload: LocalizeEntityFinishedEvent, @@ -33,17 +38,47 @@ describe('LocalizeEntityFinishedHandler', () => { beforeAll(async () => { ctx = await createTestContext(); - const storeInboxMessage: StoreInboxMessage = jest.fn( - async (_aggregateId, _messagingSettings, payload) => { - payloads.push(payload as CheckFinishIngestItemCommand); - }, - ); user = createTestUser(ctx.config.serviceId); - handler = new LocalizeEntityFinishedHandler(storeInboxMessage, ctx.config); + handler = new LocalizeEntityFinishedHandler(ctx.config); + }); + + beforeEach(async () => { + doc1 = await insert('ingest_documents', { + name: 'test1', + title: 'test1', + document: { + name: 'test1', + document_created: '2020-08-04T08:57:40.763+00:00', + items: [], + }, + items_count: 0, + }).run(ctx.ownerPool); + item1 = await insert('ingest_items', { + ingest_document_id: doc1.id, + external_id: 'externalId', + entity_id: 1, + type: 'MOVIE', + exists_status: 'CREATED', + display_title: 'title', + item: { + type: 'MOVIE', + external_id: 'externalId', + data: { + title: 'title', + trailers: [{ source: 'test', profile: 'DEFAULT' }], + }, + }, + }).run(ctx.ownerPool); + step1 = await insert('ingest_item_steps', { + id: randomUUID(), + type: 'IMAGE', + ingest_item_id: item1.id, + sub_type: 'COVER', + }).run(ctx.ownerPool); }); afterEach(async () => { - payloads = []; + await ctx.truncate('ingest_documents'); }); afterAll(async () => { @@ -62,8 +97,8 @@ describe('LocalizeEntityFinishedHandler', () => { processed_language_tags: ['de-DE'], }; const context = { - ingestItemStepId: '34d91ea5-db63-4e51-b511-ae545d5c669c', - ingestItemId: 1, + ingestItemStepId: step1.id, + ingestItemId: item1.id, imageType: 'MAIN', }; @@ -73,11 +108,10 @@ describe('LocalizeEntityFinishedHandler', () => { ); // Assert - expect(payloads).toHaveLength(1); - expect(payloads[0]).toEqual({ - ingest_item_step_id: '34d91ea5-db63-4e51-b511-ae545d5c669c', - ingest_item_id: 1, - }); + const step = await selectOne('ingest_item_steps', { + id: step1.id, + }).run(ctx.ownerPool); + expect(step?.status).toEqual('SUCCESS'); }); }); }); diff --git a/services/media/service/src/ingest/handlers/localize-entity-finished-handler.ts b/services/media/service/src/ingest/handlers/localize-entity-finished-handler.ts index 595f7517..6845a104 100644 --- a/services/media/service/src/ingest/handlers/localize-entity-finished-handler.ts +++ b/services/media/service/src/ingest/handlers/localize-entity-finished-handler.ts @@ -3,25 +3,16 @@ import { LocalizeEntityFinishedEvent, } from '@axinom/mosaic-messages'; import { Logger } from '@axinom/mosaic-service-common'; -import { - StoreInboxMessage, - TypedTransactionalMessage, -} from '@axinom/mosaic-transactional-inbox-outbox'; -import { - CheckFinishIngestItemCommand, - IngestMessageContext, - MediaServiceMessagingSettings, -} from 'media-messages'; +import { TypedTransactionalMessage } from '@axinom/mosaic-transactional-inbox-outbox'; +import { IngestMessageContext } from 'media-messages'; import { ClientBase } from 'pg'; +import { update } from 'zapatos/db'; import { Config } from '../../common'; import { MediaTransactionalInboxMessageHandler } from '../../messaging'; import { checkIsIngestEvent } from '../utils/check-is-ingest-event'; export class LocalizeEntityFinishedHandler extends MediaTransactionalInboxMessageHandler { - constructor( - private readonly storeInboxMessage: StoreInboxMessage, - config: Config, - ) { + constructor(config: Config) { super( LocalizationServiceMultiTenantMessagingSettings.LocalizeEntityFinished, new Logger({ @@ -51,15 +42,10 @@ export class LocalizeEntityFinishedHandler extends MediaTransactionalInboxMessag const messageContext = metadata.messageContext as IngestMessageContext; - await this.storeInboxMessage( - messageContext.ingestItemId.toString(), - MediaServiceMessagingSettings.CheckFinishIngestItem, - { - ingest_item_step_id: messageContext.ingestItemStepId, - ingest_item_id: messageContext.ingestItemId, - }, - ownerClient, - { metadata: { authToken: metadata.authToken } }, - ); + await update( + 'ingest_item_steps', + { status: 'SUCCESS' }, + { id: messageContext.ingestItemStepId }, + ).run(ownerClient); } } diff --git a/services/media/service/src/ingest/handlers/update-metadata-handler.db.spec.ts b/services/media/service/src/ingest/handlers/update-metadata-handler.db.spec.ts index f8a0ddb6..cff38041 100644 --- a/services/media/service/src/ingest/handlers/update-metadata-handler.db.spec.ts +++ b/services/media/service/src/ingest/handlers/update-metadata-handler.db.spec.ts @@ -1,18 +1,17 @@ import { AuthenticatedManagementSubject } from '@axinom/mosaic-id-guard'; import { MosaicError } from '@axinom/mosaic-service-common'; -import { - StoreInboxMessage, - TypedTransactionalMessage, -} from '@axinom/mosaic-transactional-inbox-outbox'; +import { TypedTransactionalMessage } from '@axinom/mosaic-transactional-inbox-outbox'; import { stub } from 'jest-auto-stub'; import 'jest-extended'; -import { - CheckFinishIngestItemCommand, - IngestItem, - UpdateMetadataCommand, -} from 'media-messages'; +import { IngestItem, UpdateMetadataCommand } from 'media-messages'; +import { randomUUID } from 'node:crypto'; import { v4 as uuid } from 'uuid'; -import { insert } from 'zapatos/db'; +import { insert, selectOne } from 'zapatos/db'; +import { + ingest_documents, + ingest_items, + ingest_item_steps, +} from 'zapatos/schema'; import { CommonErrors } from '../../common'; import { MockIngestProcessor } from '../../tests/ingest/mock-ingest-processor'; import { @@ -27,8 +26,10 @@ describe('UpdateMetadataHandler', () => { let ctx: ITestContext; let user: AuthenticatedManagementSubject; let handler: UpdateMetadataHandler; - let payloads: CheckFinishIngestItemCommand[] = []; let processor: IngestEntityProcessor; + let step1: ingest_item_steps.JSONSelectable; + let item1: ingest_items.JSONSelectable; + let doc1: ingest_documents.JSONSelectable; const createMessage = ( payload: UpdateMetadataCommand, @@ -43,24 +44,49 @@ describe('UpdateMetadataHandler', () => { beforeAll(async () => { ctx = await createTestContext(); - const storeInboxMessage: StoreInboxMessage = jest.fn( - async (_aggregateId, _messagingSettings, payload) => { - payloads.push(payload as CheckFinishIngestItemCommand); - }, - ); user = createTestUser(ctx.config.serviceId); processor = new MockIngestProcessor(); processor.updateMetadata = jest.fn(); - handler = new UpdateMetadataHandler( - [processor], - storeInboxMessage, - ctx.config, - ); + handler = new UpdateMetadataHandler([processor], ctx.config); + }); + + beforeEach(async () => { + doc1 = await insert('ingest_documents', { + name: 'test1', + title: 'test1', + document: { + name: 'test1', + document_created: '2020-08-04T08:57:40.763+00:00', + items: [], + }, + items_count: 0, + }).run(ctx.ownerPool); + item1 = await insert('ingest_items', { + ingest_document_id: doc1.id, + external_id: 'externalId', + entity_id: 1, + type: 'MOVIE', + exists_status: 'CREATED', + display_title: 'title', + item: { + type: 'MOVIE', + external_id: 'externalId', + data: { + title: 'title', + trailers: [{ source: 'test', profile: 'DEFAULT' }], + }, + }, + }).run(ctx.ownerPool); + step1 = await insert('ingest_item_steps', { + id: randomUUID(), + type: 'IMAGE', + ingest_item_id: item1.id, + sub_type: 'COVER', + }).run(ctx.ownerPool); }); afterEach(async () => { await ctx.truncate('ingest_documents'); - payloads = []; jest.restoreAllMocks(); }); @@ -75,8 +101,8 @@ describe('UpdateMetadataHandler', () => { item: { type: 'MOVIE' }, } as unknown as UpdateMetadataCommand; const context = { - ingestItemStepId: '8331d916-575e-4555-99da-ac820d456a7b', - ingestItemId: 1, + ingestItemStepId: step1.id, + ingestItemId: item1.id, }; // Act @@ -85,17 +111,15 @@ describe('UpdateMetadataHandler', () => { ); // Assert - expect(payloads).toEqual([ - { - ingest_item_step_id: '8331d916-575e-4555-99da-ac820d456a7b', - ingest_item_id: 1, - }, - ]); expect(processor.updateMetadata).toHaveBeenCalledWith( payload, expect.any(Object), undefined, ); + const step = await selectOne('ingest_item_steps', { + id: step1.id, + }).run(ctx.ownerPool); + expect(step?.status).toEqual('SUCCESS'); }); it('message with existing LOCALIZATIONS step succeeded without errors -> message without error sent', async () => { @@ -108,36 +132,16 @@ describe('UpdateMetadataHandler', () => { const payload = { item, } as unknown as UpdateMetadataCommand; - const doc = await insert('ingest_documents', { - name: 'test1', - title: 'test1', - document: { - name: 'test1', - document_created: '2020-08-04T08:57:40.763+00:00', - items: [item], - }, - items_count: 1, - in_progress_count: 1, - }).run(ctx.ownerPool); - const ingestItem = await insert('ingest_items', { - ingest_document_id: doc.id, - external_id: 'externalId', - entity_id: 1, - type: 'MOVIE', - exists_status: 'CREATED', - display_title: 'title', - item, - }).run(ctx.ownerPool); - const step = await insert('ingest_item_steps', { + await insert('ingest_item_steps', { id: uuid(), type: 'LOCALIZATIONS', - ingest_item_id: ingestItem.id, + ingest_item_id: item1.id, sub_type: '', }).run(ctx.ownerPool); const context = { - ingestItemStepId: step.id, - ingestItemId: ingestItem.id, + ingestItemStepId: step1.id, + ingestItemId: item1.id, }; // Act @@ -146,17 +150,15 @@ describe('UpdateMetadataHandler', () => { ); // Assert - expect(payloads).toEqual([ - { - ingest_item_step_id: step.id, - ingest_item_id: ingestItem.id, - }, - ]); expect(processor.updateMetadata).toHaveBeenCalledWith( payload, expect.any(Object), - ingestItem.id, + item1.id, // Ingest item ID is passed as ingest_correlation_id ); + const updatedStep = await selectOne('ingest_item_steps', { + id: step1.id, + }).run(ctx.ownerPool); + expect(updatedStep?.status).toEqual('SUCCESS'); }); }); @@ -167,8 +169,8 @@ describe('UpdateMetadataHandler', () => { item: { type: 'MOVIE' }, }); const context = { - ingestItemStepId: '8331d916-575e-4555-99da-ac820d456a7b', - ingestItemId: 1, + ingestItemStepId: step1.id, + ingestItemId: item1.id, }; // Act @@ -182,13 +184,13 @@ describe('UpdateMetadataHandler', () => { ); // Assert - expect(payloads).toEqual([ - { - ingest_item_step_id: '8331d916-575e-4555-99da-ac820d456a7b', - ingest_item_id: 1, - error_message: 'Unexpected error occurred while updating metadata.', - }, - ]); + const step = await selectOne('ingest_item_steps', { + id: step1.id, + }).run(ctx.ownerPool); + expect(step?.response_message).toEqual( + 'Unexpected error occurred while updating metadata.', + ); + expect(step?.status).toEqual('ERROR'); }); it('message failed on all retries with ingest error -> message with ingest_item_step_id and genre update errorMessage sent', async () => { @@ -199,8 +201,8 @@ describe('UpdateMetadataHandler', () => { const errorMessage = 'Metadata update has failed because following genres do not exist: Non-existent1, Non-existent2'; const context = { - ingestItemStepId: '8331d916-575e-4555-99da-ac820d456a7b', - ingestItemId: 1, + ingestItemStepId: step1.id, + ingestItemId: item1.id, }; // Act @@ -217,13 +219,11 @@ describe('UpdateMetadataHandler', () => { ); // Assert - expect(payloads).toEqual([ - { - ingest_item_step_id: '8331d916-575e-4555-99da-ac820d456a7b', - ingest_item_id: 1, - error_message: errorMessage, - }, - ]); + const step = await selectOne('ingest_item_steps', { + id: step1.id, + }).run(ctx.ownerPool); + expect(step?.response_message).toEqual(errorMessage); + expect(step?.status).toEqual('ERROR'); }); }); }); diff --git a/services/media/service/src/ingest/handlers/update-metadata-handler.ts b/services/media/service/src/ingest/handlers/update-metadata-handler.ts index 65e3f8c7..b49554f7 100644 --- a/services/media/service/src/ingest/handlers/update-metadata-handler.ts +++ b/services/media/service/src/ingest/handlers/update-metadata-handler.ts @@ -1,28 +1,22 @@ import { MosaicError } from '@axinom/mosaic-service-common'; import { - CheckFinishIngestItemCommand, IngestMessageContext, MediaServiceMessagingSettings, UpdateMetadataCommand, } from 'media-messages'; -import { selectOne } from 'zapatos/db'; +import { selectOne, update } from 'zapatos/db'; import { CommonErrors, Config } from '../../common'; import { IngestEntityProcessor } from '../models'; import { getIngestErrorMessage } from '../utils/ingest-validation'; import { Logger } from '@axinom/mosaic-service-common'; -import { - StoreInboxMessage, - TypedTransactionalMessage, -} from '@axinom/mosaic-transactional-inbox-outbox'; +import { TypedTransactionalMessage } from '@axinom/mosaic-transactional-inbox-outbox'; import { ClientBase } from 'pg'; import { MediaGuardedTransactionalInboxMessageHandler } from '../../messaging'; -import { getFutureIsoDateInMilliseconds } from '../utils'; export class UpdateMetadataHandler extends MediaGuardedTransactionalInboxMessageHandler { constructor( private entityProcessors: IngestEntityProcessor[], - private readonly storeInboxMessage: StoreInboxMessage, config: Config, ) { super( @@ -65,20 +59,11 @@ export class UpdateMetadataHandler extends MediaGuardedTransactionalInboxMessage ownerClient, localizationStep ? messageContext.ingestItemId : undefined, ); - - await this.storeInboxMessage( - messageContext.ingestItemId.toString(), - MediaServiceMessagingSettings.CheckFinishIngestItem, - { - ingest_item_step_id: messageContext.ingestItemStepId, - ingest_item_id: messageContext.ingestItemId, - }, - ownerClient, - { - metadata: { authToken: metadata.authToken }, - lockedUntil: getFutureIsoDateInMilliseconds(1_000), - }, - ); + await update( + 'ingest_item_steps', + { status: 'SUCCESS' }, + { id: messageContext.ingestItemStepId }, + ).run(ownerClient); } override async handleErrorMessage( @@ -91,20 +76,16 @@ export class UpdateMetadataHandler extends MediaGuardedTransactionalInboxMessage return; } const messageContext = metadata.messageContext as IngestMessageContext; - - await this.storeInboxMessage( - messageContext.ingestItemId.toString(), - MediaServiceMessagingSettings.CheckFinishIngestItem, + await update( + 'ingest_item_steps', { - ingest_item_step_id: messageContext.ingestItemStepId, - ingest_item_id: messageContext.ingestItemId, - error_message: getIngestErrorMessage( + status: 'ERROR', + response_message: getIngestErrorMessage( error, 'Unexpected error occurred while updating metadata.', ), }, - ownerClient, - { metadata: { authToken: metadata.authToken } }, - ); + { id: messageContext.ingestItemStepId }, + ).run(ownerClient); } } diff --git a/services/media/service/src/ingest/handlers/upsert-localization-source-entity-failed-handler.db.spec.ts b/services/media/service/src/ingest/handlers/upsert-localization-source-entity-failed-handler.db.spec.ts index a0df3544..7b9533bb 100644 --- a/services/media/service/src/ingest/handlers/upsert-localization-source-entity-failed-handler.db.spec.ts +++ b/services/media/service/src/ingest/handlers/upsert-localization-source-entity-failed-handler.db.spec.ts @@ -1,15 +1,12 @@ import { AuthenticatedManagementSubject } from '@axinom/mosaic-id-guard'; import { UpsertLocalizationSourceEntityFailedEvent } from '@axinom/mosaic-messages'; import { MosaicError } from '@axinom/mosaic-service-common'; -import { - StoreInboxMessage, - TypedTransactionalMessage, -} from '@axinom/mosaic-transactional-inbox-outbox'; +import { TypedTransactionalMessage } from '@axinom/mosaic-transactional-inbox-outbox'; import { stub } from 'jest-auto-stub'; import 'jest-extended'; -import { CheckFinishIngestItemCommand, IngestItem } from 'media-messages'; +import { IngestItem } from 'media-messages'; import { v4 as uuid } from 'uuid'; -import { all, insert, select } from 'zapatos/db'; +import { all, insert, select, selectOne } from 'zapatos/db'; import { ingest_documents, ingest_items, @@ -29,7 +26,6 @@ describe('UpsertLocalizationSourceEntityFailedHandler', () => { let step1: ingest_item_steps.JSONSelectable; let item1: ingest_items.JSONSelectable; let doc1: ingest_documents.JSONSelectable; - let payloads: CheckFinishIngestItemCommand[] = []; let user: AuthenticatedManagementSubject; const createMessage = ( @@ -45,16 +41,8 @@ describe('UpsertLocalizationSourceEntityFailedHandler', () => { beforeAll(async () => { ctx = await createTestContext(); - const storeInboxMessage: StoreInboxMessage = jest.fn( - async (_aggregateId, _messagingSettings, payload) => { - payloads.push(payload as CheckFinishIngestItemCommand); - }, - ); user = createTestUser(ctx.config.serviceId); - handler = new UpsertLocalizationSourceEntityFailedHandler( - storeInboxMessage, - ctx.config, - ); + handler = new UpsertLocalizationSourceEntityFailedHandler(ctx.config); }); beforeEach(async () => { @@ -95,7 +83,6 @@ describe('UpsertLocalizationSourceEntityFailedHandler', () => { afterEach(async () => { await ctx.truncate('ingest_documents'); - payloads = []; }); afterAll(async () => { @@ -123,12 +110,11 @@ describe('UpsertLocalizationSourceEntityFailedHandler', () => { ); // Assert - expect(payloads).toHaveLength(1); - expect(payloads[0]).toEqual({ - ingest_item_step_id: step1.id, - ingest_item_id: item1.id, - error_message: payload.message, - }); + const step = await selectOne('ingest_item_steps', { + id: step1.id, + }).run(ctx.ownerPool); + expect(step?.response_message).toEqual(payload.message); + expect(step?.status).toEqual('ERROR'); }); }); diff --git a/services/media/service/src/ingest/handlers/upsert-localization-source-entity-failed-handler.ts b/services/media/service/src/ingest/handlers/upsert-localization-source-entity-failed-handler.ts index e0471be0..f23cc48e 100644 --- a/services/media/service/src/ingest/handlers/upsert-localization-source-entity-failed-handler.ts +++ b/services/media/service/src/ingest/handlers/upsert-localization-source-entity-failed-handler.ts @@ -3,25 +3,15 @@ import { UpsertLocalizationSourceEntityFailedEvent, } from '@axinom/mosaic-messages'; import { Logger, MosaicError } from '@axinom/mosaic-service-common'; -import { - StoreInboxMessage, - TypedTransactionalMessage, -} from '@axinom/mosaic-transactional-inbox-outbox'; -import { - CheckFinishIngestItemCommand, - IngestMessageContext, - MediaServiceMessagingSettings, -} from 'media-messages'; +import { TypedTransactionalMessage } from '@axinom/mosaic-transactional-inbox-outbox'; +import { IngestMessageContext } from 'media-messages'; import { ClientBase } from 'pg'; import { param, selectOne, self as value, SQL, sql, update } from 'zapatos/db'; import { CommonErrors, Config, getMediaMappedError } from '../../common'; import { MediaTransactionalInboxMessageHandler } from '../../messaging'; export class UpsertLocalizationSourceEntityFailedHandler extends MediaTransactionalInboxMessageHandler { - constructor( - private readonly storeInboxMessage: StoreInboxMessage, - config: Config, - ) { + constructor(config: Config) { super( LocalizationServiceMultiTenantMessagingSettings.UpsertLocalizationSourceEntityFailed, new Logger({ @@ -67,17 +57,14 @@ export class UpsertLocalizationSourceEntityFailedHandler extends MediaTransactio }); } - await this.storeInboxMessage( - messageContext.ingestItemId.toString(), - MediaServiceMessagingSettings.CheckFinishIngestItem, + await update( + 'ingest_item_steps', { - ingest_item_step_id: localizationStep?.id, - ingest_item_id: messageContext.ingestItemId, - error_message: payload.message, + status: 'ERROR', + response_message: payload.message, }, - ownerClient, - { metadata: { authToken: metadata.authToken } }, - ); + { id: localizationStep.id }, + ).run(ownerClient); } public override mapError(error: unknown): Error { diff --git a/services/media/service/src/ingest/handlers/upsert-localization-source-entity-finished-handler.db.spec.ts b/services/media/service/src/ingest/handlers/upsert-localization-source-entity-finished-handler.db.spec.ts index 56bf308d..a967d8c8 100644 --- a/services/media/service/src/ingest/handlers/upsert-localization-source-entity-finished-handler.db.spec.ts +++ b/services/media/service/src/ingest/handlers/upsert-localization-source-entity-finished-handler.db.spec.ts @@ -10,7 +10,7 @@ import { } from '@axinom/mosaic-transactional-inbox-outbox'; import { stub } from 'jest-auto-stub'; import 'jest-extended'; -import { CheckFinishIngestItemCommand, IngestItem } from 'media-messages'; +import { IngestItem } from 'media-messages'; import { v4 as uuid } from 'uuid'; import { all, insert, select } from 'zapatos/db'; import { @@ -39,7 +39,7 @@ describe('UpsertLocalizationSourceEntityFinishedHandler', () => { let step1: ingest_item_steps.JSONSelectable; let item1: ingest_items.JSONSelectable; let doc1: ingest_documents.JSONSelectable; - let payloads: CheckFinishIngestItemCommand[] = []; + let payloads: LocalizeEntityCommand[] = []; let user: AuthenticatedManagementSubject; const createMessage = ( @@ -59,7 +59,7 @@ describe('UpsertLocalizationSourceEntityFinishedHandler', () => { ctx = await createTestContext(); const storeOutboxMessage: StoreOutboxMessage = jest.fn( async (_aggregateId, _messagingSettings, payload) => { - payloads.push(payload as CheckFinishIngestItemCommand); + payloads.push(payload as LocalizeEntityCommand); }, ); user = createTestUser(ctx.config.serviceId); diff --git a/services/media/service/src/ingest/handlers/video-already-existed-handler.ts b/services/media/service/src/ingest/handlers/video-already-existed-handler.ts index 1f684ddb..d3b6eb51 100644 --- a/services/media/service/src/ingest/handlers/video-already-existed-handler.ts +++ b/services/media/service/src/ingest/handlers/video-already-existed-handler.ts @@ -2,20 +2,14 @@ import { EnsureVideoExistsAlreadyExistedEvent, VideoServiceMultiTenantMessagingSettings, } from '@axinom/mosaic-messages'; -import { StoreInboxMessage } from '@axinom/mosaic-transactional-inbox-outbox'; import { Config } from '../../common'; import { IngestEntityProcessor } from '../models'; import { VideoSucceededHandler } from './video-succeeded-handler'; export class VideoAlreadyExistedHandler extends VideoSucceededHandler { - constructor( - entityProcessors: IngestEntityProcessor[], - storeInboxMessage: StoreInboxMessage, - config: Config, - ) { + constructor(entityProcessors: IngestEntityProcessor[], config: Config) { super( entityProcessors, VideoServiceMultiTenantMessagingSettings.EnsureVideoExistsAlreadyExisted, - storeInboxMessage, config, ); } diff --git a/services/media/service/src/ingest/handlers/video-creation-started-handler.ts b/services/media/service/src/ingest/handlers/video-creation-started-handler.ts index f6cd95ae..ff823a50 100644 --- a/services/media/service/src/ingest/handlers/video-creation-started-handler.ts +++ b/services/media/service/src/ingest/handlers/video-creation-started-handler.ts @@ -2,21 +2,15 @@ import { EnsureVideoExistsCreationStartedEvent, VideoServiceMultiTenantMessagingSettings, } from '@axinom/mosaic-messages'; -import { StoreInboxMessage } from '@axinom/mosaic-transactional-inbox-outbox'; import { Config } from '../../common'; import { IngestEntityProcessor } from '../models'; import { VideoSucceededHandler } from './video-succeeded-handler'; export class VideoCreationStartedHandler extends VideoSucceededHandler { - constructor( - entityProcessors: IngestEntityProcessor[], - storeInboxMessage: StoreInboxMessage, - config: Config, - ) { + constructor(entityProcessors: IngestEntityProcessor[], config: Config) { super( entityProcessors, VideoServiceMultiTenantMessagingSettings.EnsureVideoExistsCreationStarted, - storeInboxMessage, config, ); } diff --git a/services/media/service/src/ingest/handlers/video-failed-handler.db.spec.ts b/services/media/service/src/ingest/handlers/video-failed-handler.db.spec.ts new file mode 100644 index 00000000..8801e088 --- /dev/null +++ b/services/media/service/src/ingest/handlers/video-failed-handler.db.spec.ts @@ -0,0 +1,116 @@ +import { AuthenticatedManagementSubject } from '@axinom/mosaic-id-guard'; +import { EnsureVideoExistsFailedEvent } from '@axinom/mosaic-messages'; +import { TypedTransactionalMessage } from '@axinom/mosaic-transactional-inbox-outbox'; +import { stub } from 'jest-auto-stub'; +import 'jest-extended'; +import { randomUUID } from 'node:crypto'; +import { insert, selectOne } from 'zapatos/db'; +import { + ingest_documents, + ingest_items, + ingest_item_steps, +} from 'zapatos/schema'; +import { + createTestContext, + createTestUser, + ITestContext, +} from '../../tests/test-utils'; +import { VideoFailedHandler } from './video-failed-handler'; + +describe('VideoFailedHandler', () => { + let handler: VideoFailedHandler; + let ctx: ITestContext; + let step1: ingest_item_steps.JSONSelectable; + let item1: ingest_items.JSONSelectable; + let doc1: ingest_documents.JSONSelectable; + let user: AuthenticatedManagementSubject; + + const createMessage = ( + payload: EnsureVideoExistsFailedEvent, + messageContext: unknown, + ) => + stub>({ + payload, + metadata: { + messageContext, + }, + }); + + beforeAll(async () => { + ctx = await createTestContext(); + user = createTestUser(ctx.config.serviceId); + handler = new VideoFailedHandler(ctx.config); + }); + + beforeEach(async () => { + doc1 = await insert('ingest_documents', { + name: 'test1', + title: 'test1', + document: { + name: 'test1', + document_created: '2020-08-04T08:57:40.763+00:00', + items: [], + }, + items_count: 0, + }).run(ctx.ownerPool); + item1 = await insert('ingest_items', { + ingest_document_id: doc1.id, + external_id: 'externalId', + entity_id: 1, + type: 'MOVIE', + exists_status: 'CREATED', + display_title: 'title', + item: { + type: 'MOVIE', + external_id: 'externalId', + data: { + title: 'title', + trailers: [{ source: 'test', profile: 'DEFAULT' }], + }, + }, + }).run(ctx.ownerPool); + step1 = await insert('ingest_item_steps', { + id: randomUUID(), + type: 'IMAGE', + ingest_item_id: item1.id, + sub_type: 'COVER', + }).run(ctx.ownerPool); + }); + + afterEach(async () => { + await ctx.truncate('ingest_documents'); + }); + + afterAll(async () => { + await ctx.dispose(); + jest.restoreAllMocks(); + }); + + describe('onMessage', () => { + it('message received -> message with error ingestItemStepId sent', async () => { + // Arrange + const payload: EnsureVideoExistsFailedEvent = { + message: 'Test error message', + video_location: 'Test', + video_profile: 'DEFAULT', + }; + const context = { + ingestItemStepId: step1.id, + ingestItemId: item1.id, + videoType: 'MAIN', + }; + + // Act + await ctx.executeOwnerSql(user, async (dbCtx) => + handler.handleMessage(createMessage(payload, context), dbCtx), + ); + + // Assert + const step = await selectOne('ingest_item_steps', { + id: step1.id, + }).run(ctx.ownerPool); + expect(step?.response_message).toEqual('Test error message'); + expect(step?.status).toEqual('ERROR'); + }); + }); +}); diff --git a/services/media/service/src/ingest/handlers/video-failed-handler.spec.ts b/services/media/service/src/ingest/handlers/video-failed-handler.spec.ts deleted file mode 100644 index 086407f9..00000000 --- a/services/media/service/src/ingest/handlers/video-failed-handler.spec.ts +++ /dev/null @@ -1,75 +0,0 @@ -import { EnsureVideoExistsFailedEvent } from '@axinom/mosaic-messages'; -import { - StoreInboxMessage, - TypedTransactionalMessage, -} from '@axinom/mosaic-transactional-inbox-outbox'; -import { stub } from 'jest-auto-stub'; -import 'jest-extended'; -import { CheckFinishIngestItemCommand } from 'media-messages'; -import { ClientBase } from 'pg'; -import { createTestConfig } from '../../tests/test-utils'; -import { VideoFailedHandler } from './video-failed-handler'; - -describe('VideoFailedHandler', () => { - let handler: VideoFailedHandler; - let messages: CheckFinishIngestItemCommand[] = []; - - const createMessage = ( - payload: EnsureVideoExistsFailedEvent, - messageContext: unknown, - ) => - stub>({ - payload, - metadata: { - messageContext, - }, - }); - - beforeAll(() => { - const storeInboxMessage: StoreInboxMessage = jest.fn( - async (_aggregateId, _messagingSettings, message) => { - messages.push(message as CheckFinishIngestItemCommand); - }, - ); - - handler = new VideoFailedHandler(storeInboxMessage, createTestConfig()); - }); - - afterEach(async () => { - messages = []; - }); - - afterAll(async () => { - jest.restoreAllMocks(); - }); - - describe('onMessage', () => { - it('message received -> message with error ingestItemStepId sent', async () => { - // Arrange - const payload: EnsureVideoExistsFailedEvent = { - message: 'Test error message', - video_location: 'Test', - video_profile: 'DEFAULT', - }; - const context = { - ingestItemStepId: '8331d916-575e-4555-99da-ac820d456a7b', - ingestItemId: 1, - videoType: 'MAIN', - }; - - // Act - await handler.handleMessage( - createMessage(payload, context), - stub(), - ); - - // Assert - expect(messages).toHaveLength(1); - expect(messages[0]).toEqual({ - ingest_item_step_id: '8331d916-575e-4555-99da-ac820d456a7b', - ingest_item_id: 1, - error_message: 'Test error message', - }); - }); - }); -}); diff --git a/services/media/service/src/ingest/handlers/video-failed-handler.ts b/services/media/service/src/ingest/handlers/video-failed-handler.ts index 1d736e33..14ca195d 100644 --- a/services/media/service/src/ingest/handlers/video-failed-handler.ts +++ b/services/media/service/src/ingest/handlers/video-failed-handler.ts @@ -3,25 +3,16 @@ import { VideoServiceMultiTenantMessagingSettings, } from '@axinom/mosaic-messages'; import { Logger } from '@axinom/mosaic-service-common'; -import { - StoreInboxMessage, - TypedTransactionalMessage, -} from '@axinom/mosaic-transactional-inbox-outbox'; -import { - CheckFinishIngestItemCommand, - MediaServiceMessagingSettings, - VideoMessageContext, -} from 'media-messages'; +import { TypedTransactionalMessage } from '@axinom/mosaic-transactional-inbox-outbox'; +import { VideoMessageContext } from 'media-messages'; import { ClientBase } from 'pg'; +import { update } from 'zapatos/db'; import { Config } from '../../common'; import { MediaGuardedTransactionalInboxMessageHandler } from '../../messaging'; import { checkIsIngestEvent } from '../utils/check-is-ingest-event'; export class VideoFailedHandler extends MediaGuardedTransactionalInboxMessageHandler { - constructor( - private readonly storeInboxMessage: StoreInboxMessage, - config: Config, - ) { + constructor(config: Config) { super( VideoServiceMultiTenantMessagingSettings.EnsureVideoExistsFailed, ['INGESTS_EDIT', 'ADMIN'], @@ -48,16 +39,13 @@ export class VideoFailedHandler extends MediaGuardedTransactionalInboxMessageHan const messageContext = metadata.messageContext as VideoMessageContext; - await this.storeInboxMessage( - messageContext.ingestItemId.toString(), - MediaServiceMessagingSettings.CheckFinishIngestItem, + await update( + 'ingest_item_steps', { - ingest_item_step_id: messageContext.ingestItemStepId, - ingest_item_id: messageContext.ingestItemId, - error_message: payload.message, + status: 'ERROR', + response_message: payload.message, }, - ownerClient, - { metadata: { authToken: metadata.authToken } }, - ); + { id: messageContext.ingestItemStepId }, + ).run(ownerClient); } } diff --git a/services/media/service/src/ingest/handlers/video-succeeded-handler.db.spec.ts b/services/media/service/src/ingest/handlers/video-succeeded-handler.db.spec.ts index cb48791f..655a25a6 100644 --- a/services/media/service/src/ingest/handlers/video-succeeded-handler.db.spec.ts +++ b/services/media/service/src/ingest/handlers/video-succeeded-handler.db.spec.ts @@ -1,12 +1,8 @@ import { AuthenticatedManagementSubject } from '@axinom/mosaic-id-guard'; import { EnsureVideoExistsCreationStartedEvent } from '@axinom/mosaic-messages'; -import { - StoreInboxMessage, - TypedTransactionalMessage, -} from '@axinom/mosaic-transactional-inbox-outbox'; +import { TypedTransactionalMessage } from '@axinom/mosaic-transactional-inbox-outbox'; import { stub } from 'jest-auto-stub'; import 'jest-extended'; -import { CheckFinishIngestItemCommand } from 'media-messages'; import { v4 as uuid } from 'uuid'; import { insert, selectOne } from 'zapatos/db'; import { @@ -30,7 +26,6 @@ describe('VideoSucceededHandler', () => { let step1: ingest_item_steps.JSONSelectable; let item1: ingest_items.JSONSelectable; let doc1: ingest_documents.JSONSelectable; - let payloads: CheckFinishIngestItemCommand[] = []; const createMessage = ( payload: EnsureVideoExistsCreationStartedEvent, @@ -45,15 +40,9 @@ describe('VideoSucceededHandler', () => { beforeAll(async () => { ctx = await createTestContext(); - const storeInboxMessage: StoreInboxMessage = jest.fn( - async (_aggregateId, _messagingSettings, payload) => { - payloads.push(payload as CheckFinishIngestItemCommand); - }, - ); user = createTestUser(ctx.config.serviceId); handler = new VideoCreationStartedHandler( [new MockIngestProcessor()], - storeInboxMessage, ctx.config, ); }); @@ -95,7 +84,6 @@ describe('VideoSucceededHandler', () => { afterEach(async () => { await ctx.truncate('ingest_documents'); - payloads = []; }); afterAll(async () => { @@ -127,13 +115,7 @@ describe('VideoSucceededHandler', () => { }).run(ctx.ownerPool); expect(step?.entity_id).toEqual(payload.video_id); - - expect(payloads).toEqual([ - { - ingest_item_step_id: step1.id, - ingest_item_id: item1.id, - }, - ]); + expect(step?.status).toEqual('SUCCESS'); }); }); @@ -145,7 +127,7 @@ describe('VideoSucceededHandler', () => { encoding_state: 'IN_PROGRESS', }; const context = { - ingestItemStepId: '8331d916-575e-4555-99da-ac820d456a7b', + ingestItemStepId: step1.id, ingestItemId: item1.id, videoType: 'MAIN', }; @@ -161,14 +143,13 @@ describe('VideoSucceededHandler', () => { ); // Assert - expect(payloads).toEqual([ - { - ingest_item_step_id: '8331d916-575e-4555-99da-ac820d456a7b', - ingest_item_id: item1.id, - error_message: - 'An unexpected error occurred while trying to update video relations.', - }, - ]); + const step = await selectOne('ingest_item_steps', { + id: step1.id, + }).run(ctx.ownerPool); + expect(step?.response_message).toEqual( + 'An unexpected error occurred while trying to update video relations.', + ); + expect(step?.status).toEqual('ERROR'); }); }); }); diff --git a/services/media/service/src/ingest/handlers/video-succeeded-handler.ts b/services/media/service/src/ingest/handlers/video-succeeded-handler.ts index 47bf68ec..2ff9d571 100644 --- a/services/media/service/src/ingest/handlers/video-succeeded-handler.ts +++ b/services/media/service/src/ingest/handlers/video-succeeded-handler.ts @@ -4,21 +4,13 @@ import { EnsureVideoExistsCreationStartedEvent, } from '@axinom/mosaic-messages'; import { Logger, MosaicError } from '@axinom/mosaic-service-common'; -import { - StoreInboxMessage, - TypedTransactionalMessage, -} from '@axinom/mosaic-transactional-inbox-outbox'; -import { - CheckFinishIngestItemCommand, - MediaServiceMessagingSettings, - VideoMessageContext, -} from 'media-messages'; +import { TypedTransactionalMessage } from '@axinom/mosaic-transactional-inbox-outbox'; +import { VideoMessageContext } from 'media-messages'; import { ClientBase } from 'pg'; import { selectExactlyOne, update } from 'zapatos/db'; import { CommonErrors, Config } from '../../common'; import { MediaGuardedTransactionalInboxMessageHandler } from '../../messaging'; import { IngestEntityProcessor } from '../models'; -import { getFutureIsoDateInMilliseconds } from '../utils'; import { checkIsIngestEvent } from '../utils/check-is-ingest-event'; import { getIngestErrorMessage } from '../utils/ingest-validation'; @@ -30,7 +22,6 @@ export abstract class VideoSucceededHandler< constructor( private entityProcessors: IngestEntityProcessor[], messagingSettings: MessagingSettings, - private readonly storeInboxMessage: StoreInboxMessage, config: Config, ) { super( @@ -76,23 +67,9 @@ export abstract class VideoSucceededHandler< await update( 'ingest_item_steps', - { entity_id: payload.video_id }, + { status: 'SUCCESS', entity_id: payload.video_id }, { id: messageContext.ingestItemStepId }, ).run(ownerClient); - - await this.storeInboxMessage( - messageContext.ingestItemId.toString(), - MediaServiceMessagingSettings.CheckFinishIngestItem, - { - ingest_item_step_id: messageContext.ingestItemStepId, - ingest_item_id: messageContext.ingestItemId, - }, - ownerClient, - { - metadata: { authToken: metadata.authToken }, - lockedUntil: getFutureIsoDateInMilliseconds(1_000), - }, - ); } override async handleErrorMessage( @@ -106,19 +83,16 @@ export abstract class VideoSucceededHandler< } const messageContext = metadata.messageContext as VideoMessageContext; - await this.storeInboxMessage( - messageContext.ingestItemId.toString(), - MediaServiceMessagingSettings.CheckFinishIngestItem, + await update( + 'ingest_item_steps', { - ingest_item_step_id: messageContext.ingestItemStepId, - ingest_item_id: messageContext.ingestItemId, - error_message: getIngestErrorMessage( + status: 'ERROR', + response_message: getIngestErrorMessage( error, 'An unexpected error occurred while trying to update video relations.', ), }, - ownerClient, - { metadata: { authToken: metadata.authToken } }, - ); + { id: messageContext.ingestItemStepId }, + ).run(ownerClient); } } diff --git a/services/media/service/src/messaging/register-messaging.ts b/services/media/service/src/messaging/register-messaging.ts index 2673638b..7b171dd8 100644 --- a/services/media/service/src/messaging/register-messaging.ts +++ b/services/media/service/src/messaging/register-messaging.ts @@ -94,7 +94,6 @@ import { } from '../domains/tvshows'; import { CheckFinishIngestDocumentHandler, - CheckFinishIngestItemHandler, ImageAlreadyExistedHandler, ImageCreatedHandler, ImageFailedHandler, @@ -321,26 +320,21 @@ const registerTransactionalInboxHandlers = ( storeOutboxMessage, config, ), - new UpdateMetadataHandler(ingestProcessors, storeInboxMessage, config), - new CheckFinishIngestItemHandler(config), + new UpdateMetadataHandler(ingestProcessors, config), new CheckFinishIngestDocumentHandler(storeInboxMessage, config), - new VideoAlreadyExistedHandler(ingestProcessors, storeInboxMessage, config), - new VideoCreationStartedHandler( - ingestProcessors, - storeInboxMessage, - config, - ), - new VideoFailedHandler(storeInboxMessage, config), - new ImageAlreadyExistedHandler(ingestProcessors, storeInboxMessage, config), - new ImageCreatedHandler(ingestProcessors, storeInboxMessage, config), - new ImageFailedHandler(storeInboxMessage, config), + new VideoAlreadyExistedHandler(ingestProcessors, config), + new VideoCreationStartedHandler(ingestProcessors, config), + new VideoFailedHandler(config), + new ImageAlreadyExistedHandler(ingestProcessors, config), + new ImageCreatedHandler(ingestProcessors, config), + new ImageFailedHandler(config), new UpsertLocalizationSourceEntityFinishedHandler( storeOutboxMessage, config, ), - new UpsertLocalizationSourceEntityFailedHandler(storeInboxMessage, config), - new LocalizeEntityFinishedHandler(storeInboxMessage, config), - new LocalizeEntityFailedHandler(storeInboxMessage, config), + new UpsertLocalizationSourceEntityFailedHandler(config), + new LocalizeEntityFinishedHandler(config), + new LocalizeEntityFailedHandler(config), ]; const commonMessageHandlers: TransactionalMessageHandler[] = [ new DeleteEntityHandler(storeOutboxMessage, config), @@ -372,15 +366,8 @@ const registerTransactionalInboxHandlers = ( return 15_000; } }, - messageProcessingTransactionLevelStrategy: (message) => { - switch (message.messageType) { - // Ensure no "parallel" updates on the ingest items - case MediaServiceMessagingSettings.CheckFinishIngestItem.messageType: - return IsolationLevel.Serializable; - default: - return IsolationLevel.RepeatableRead; - } - }, + messageProcessingTransactionLevelStrategy: () => + IsolationLevel.RepeatableRead, messageRetryStrategy: ingestMessageRetryStrategy( [...dbMessageHandlers, ...ingestMessageHandlers].map( (x) => x.messageType, @@ -410,7 +397,6 @@ const registerRabbitMqMessaging = async ( MediaServiceMessagingSettings.StartIngest, MediaServiceMessagingSettings.StartIngestItem, MediaServiceMessagingSettings.UpdateMetadata, - MediaServiceMessagingSettings.CheckFinishIngestItem, MediaServiceMessagingSettings.CheckFinishIngestDocument, VideoServiceMultiTenantMessagingSettings.EnsureVideoExistsAlreadyExisted, VideoServiceMultiTenantMessagingSettings.EnsureVideoExistsCreationStarted, @@ -432,7 +418,6 @@ const registerRabbitMqMessaging = async ( case MediaServiceMessagingSettings.StartIngest.messageType: case MediaServiceMessagingSettings.StartIngestItem.messageType: case MediaServiceMessagingSettings.UpdateMetadata.messageType: - case MediaServiceMessagingSettings.CheckFinishIngestItem.messageType: case MediaServiceMessagingSettings.CheckFinishIngestDocument .messageType: case VideoServiceMultiTenantMessagingSettings diff --git a/services/media/service/src/tests/ingest/upload.db.spec.ts b/services/media/service/src/tests/ingest/upload.db.spec.ts index 2ea01e5a..1568e9ab 100644 --- a/services/media/service/src/tests/ingest/upload.db.spec.ts +++ b/services/media/service/src/tests/ingest/upload.db.spec.ts @@ -30,7 +30,6 @@ import { getLongLivedToken } from '../../common/utils/token-utils'; import { getIngestProcessors } from '../../domains/get-ingest-processors'; import { CheckFinishIngestDocumentHandler, - CheckFinishIngestItemHandler, StartIngestHandler, StartIngestItemHandler, UpdateMetadataHandler, @@ -88,7 +87,6 @@ describe('Movies GraphQL endpoints', () => { let updateMetadata: UpdateMetadataHandler; let videoCreationStarted: VideoCreationStartedHandler; let imageCreated: ImageCreatedHandler; - let checkFinishItem: CheckFinishIngestItemHandler; let checkFinishDocument: CheckFinishIngestDocumentHandler; let messages: { messageType: string; @@ -168,28 +166,15 @@ describe('Movies GraphQL endpoints', () => { ctx.config, ); - updateMetadata = new UpdateMetadataHandler( - ingestProcessors, - storeInboxMessage, - - ctx.config, - ); + updateMetadata = new UpdateMetadataHandler(ingestProcessors, ctx.config); videoCreationStarted = new VideoCreationStartedHandler( ingestProcessors, - storeInboxMessage, - ctx.config, ); - imageCreated = new ImageCreatedHandler( - ingestProcessors, - storeInboxMessage, - - ctx.config, - ); + imageCreated = new ImageCreatedHandler(ingestProcessors, ctx.config); - checkFinishItem = new CheckFinishIngestItemHandler(ctx.config); checkFinishDocument = new CheckFinishIngestDocumentHandler( storeInboxMessage, ctx.config, @@ -592,14 +577,6 @@ describe('Movies GraphQL endpoints', () => { ); }); break; - case MediaServiceMessagingSettings.CheckFinishIngestItem.messageType: - await ctx.executeOwnerSql(user, async (txn) => { - await checkFinishItem.handleMessage( - createMessage(msg.payload, msg.envelopeOverrides), - txn, - ); - }); - break; case MediaServiceMessagingSettings.CheckFinishIngestDocument .messageType: await ctx.executeOwnerSql(user, async (txn) => { From 926d34cd7f5523d58ebd70d91c8a82ea82fac32f Mon Sep 17 00:00:00 2001 From: Sergey Trusov Date: Tue, 30 Apr 2024 07:33:03 +0200 Subject: [PATCH 2/2] chore: PR review adjustments [AB#43320] --- ...channel-published-event-handler.db.spec.ts | 2 +- ...annel-unpublished-event-handler.db.spec.ts | 2 +- ...-status-succeeded-event-handler.db.spec.ts | 2 +- ...ction-key-created-event-handler.db.spec.ts | 2 +- ...lection-published-event-handler.db.spec.ts | 2 +- ...ction-unpublished-event-handler.db.spec.ts | 2 +- ...-genres-published-event-handler.db.spec.ts | 2 +- ...enres-unpublished-event-handler.db.spec.ts | 2 +- .../movie-published-event-handler.db.spec.ts | 2 +- ...movie-unpublished-event-handler.db.spec.ts | 2 +- ...episode-published-event-handler.db.spec.ts | 2 +- ...isode-unpublished-event-handler.db.spec.ts | 2 +- .../season-published-event-handler.db.spec.ts | 2 +- ...eason-unpublished-event-handler.db.spec.ts | 2 +- ...-genres-published-event-handler.db.spec.ts | 2 +- ...enres-unpublished-event-handler.db.spec.ts | 2 +- .../tvshow-published-event-handler.db.spec.ts | 2 +- ...vshow-unpublished-event-handler.db.spec.ts | 2 +- ...0026-ingest-document-check-index-added.sql | 6 + ...xt-inbox-message-ingest-priority-added.sql | 112 ------------------ .../service/src/common/constants/index.ts | 1 + .../constants/transactional-inbox-outbox.ts | 4 + .../media/service/src/generated/db/schema.sql | 31 ++--- ...-finish-ingest-document-handler.db.spec.ts | 2 +- .../check-finish-ingest-document-handler.ts | 3 +- .../handlers/image-failed-handler.db.spec.ts | 2 +- .../image-succeeded-handler.db.spec.ts | 4 +- .../localize-entity-failed-handler.db.spec.ts | 2 +- ...ocalize-entity-finished-handler.db.spec.ts | 2 +- .../handlers/start-ingest-handler.db.spec.ts | 4 +- .../ingest/handlers/start-ingest-handler.ts | 7 +- .../start-ingest-item-handler.db.spec.ts | 4 +- .../update-metadata-handler.db.spec.ts | 4 +- ...on-source-entity-failed-handler.db.spec.ts | 4 +- ...-source-entity-finished-handler.db.spec.ts | 4 +- .../handlers/video-failed-handler.db.spec.ts | 2 +- .../video-succeeded-handler.db.spec.ts | 4 +- .../plugins/start-ingest-endpoint-plugin.ts | 3 +- .../publish-entity-handler.db.spec.ts | 4 +- .../unpublish-entity-handler.db.spec.ts | 2 +- 40 files changed, 68 insertions(+), 179 deletions(-) create mode 100644 services/media/service/migrations/committed/000026-ingest-document-check-index-added.sql delete mode 100644 services/media/service/migrations/committed/000026-next-inbox-message-ingest-priority-added.sql create mode 100644 services/media/service/src/common/constants/transactional-inbox-outbox.ts diff --git a/services/catalog/service/src/domains/channels/handlers/channel-published-event-handler.db.spec.ts b/services/catalog/service/src/domains/channels/handlers/channel-published-event-handler.db.spec.ts index 275442d7..efbfec93 100644 --- a/services/catalog/service/src/domains/channels/handlers/channel-published-event-handler.db.spec.ts +++ b/services/catalog/service/src/domains/channels/handlers/channel-published-event-handler.db.spec.ts @@ -27,7 +27,7 @@ describe('ChannelPublishEventHandler', () => { jest.restoreAllMocks(); }); - describe('onMessage', () => { + describe('handleMessage', () => { test('A new channel is published', async () => { // Arrange const message = createChannelPublishedMessage(uuid()); diff --git a/services/catalog/service/src/domains/channels/handlers/channel-unpublished-event-handler.db.spec.ts b/services/catalog/service/src/domains/channels/handlers/channel-unpublished-event-handler.db.spec.ts index 996ffdae..05005135 100644 --- a/services/catalog/service/src/domains/channels/handlers/channel-unpublished-event-handler.db.spec.ts +++ b/services/catalog/service/src/domains/channels/handlers/channel-unpublished-event-handler.db.spec.ts @@ -24,7 +24,7 @@ describe('ChannelPublishEventHandler', () => { jest.restoreAllMocks(); }); - describe('onMessage', () => { + describe('handleMessage', () => { test('An existing channel is unpublished', async () => { // Arrange const originalId = uuid(); diff --git a/services/catalog/service/src/domains/channels/handlers/check-channel-job-status-succeeded-event-handler.db.spec.ts b/services/catalog/service/src/domains/channels/handlers/check-channel-job-status-succeeded-event-handler.db.spec.ts index d92752f6..fee72310 100644 --- a/services/catalog/service/src/domains/channels/handlers/check-channel-job-status-succeeded-event-handler.db.spec.ts +++ b/services/catalog/service/src/domains/channels/handlers/check-channel-job-status-succeeded-event-handler.db.spec.ts @@ -25,7 +25,7 @@ describe('CheckChannelJobStatusSucceededEventHandler', () => { jest.restoreAllMocks(); }); - describe('onMessage', () => { + describe('handleMessage', () => { test('live stream is ready, but the channel is not yet registered in catalog -> error is thrown & channel is not created', async () => { // Arrange const originalId = uuid(); diff --git a/services/catalog/service/src/domains/channels/handlers/live-stream-protection-key-created-event-handler.db.spec.ts b/services/catalog/service/src/domains/channels/handlers/live-stream-protection-key-created-event-handler.db.spec.ts index 99789acb..b434416e 100644 --- a/services/catalog/service/src/domains/channels/handlers/live-stream-protection-key-created-event-handler.db.spec.ts +++ b/services/catalog/service/src/domains/channels/handlers/live-stream-protection-key-created-event-handler.db.spec.ts @@ -25,7 +25,7 @@ describe('LiveStreamProtectionKeyCreatedEventHandler', () => { jest.restoreAllMocks(); }); - describe('onMessage', () => { + describe('handleMessage', () => { test('live stream protection key is sent, but the channel is not yet registered in catalog -> error is thrown & channel is not created', async () => { // Arrange const originalId = uuid(); diff --git a/services/catalog/service/src/domains/collections/handlers/collection-published-event-handler.db.spec.ts b/services/catalog/service/src/domains/collections/handlers/collection-published-event-handler.db.spec.ts index 6aef4c07..ff393d4b 100644 --- a/services/catalog/service/src/domains/collections/handlers/collection-published-event-handler.db.spec.ts +++ b/services/catalog/service/src/domains/collections/handlers/collection-published-event-handler.db.spec.ts @@ -27,7 +27,7 @@ describe('CollectionPublishEventHandler', () => { jest.restoreAllMocks(); }); - describe('onMessage', () => { + describe('handleMessage', () => { test('A new collection is published', async () => { // Arrange const message = createCollectionPublishedMessage('collection-1'); diff --git a/services/catalog/service/src/domains/collections/handlers/collection-unpublished-event-handler.db.spec.ts b/services/catalog/service/src/domains/collections/handlers/collection-unpublished-event-handler.db.spec.ts index 90f97a3c..d87c4245 100644 --- a/services/catalog/service/src/domains/collections/handlers/collection-unpublished-event-handler.db.spec.ts +++ b/services/catalog/service/src/domains/collections/handlers/collection-unpublished-event-handler.db.spec.ts @@ -23,7 +23,7 @@ describe('CollectionPublishEventHandler', () => { jest.restoreAllMocks(); }); - describe('onMessage', () => { + describe('handleMessage', () => { test('An existing collection is unpublished', async () => { // Arrange const collectionId = 'collection-1'; diff --git a/services/catalog/service/src/domains/movies/handlers/movie-genres-published-event-handler.db.spec.ts b/services/catalog/service/src/domains/movies/handlers/movie-genres-published-event-handler.db.spec.ts index 2deb1828..7344a6d2 100644 --- a/services/catalog/service/src/domains/movies/handlers/movie-genres-published-event-handler.db.spec.ts +++ b/services/catalog/service/src/domains/movies/handlers/movie-genres-published-event-handler.db.spec.ts @@ -27,7 +27,7 @@ describe('MovieGenrePublishEventHandler', () => { jest.restoreAllMocks(); }); - describe('onMessage', () => { + describe('handleMessage', () => { test('A new movie genre is published', async () => { // Arrange const message = createMovieGenresPublishedMessage('movie_genre-1'); diff --git a/services/catalog/service/src/domains/movies/handlers/movie-genres-unpublished-event-handler.db.spec.ts b/services/catalog/service/src/domains/movies/handlers/movie-genres-unpublished-event-handler.db.spec.ts index ca2c24c8..ba789339 100644 --- a/services/catalog/service/src/domains/movies/handlers/movie-genres-unpublished-event-handler.db.spec.ts +++ b/services/catalog/service/src/domains/movies/handlers/movie-genres-unpublished-event-handler.db.spec.ts @@ -25,7 +25,7 @@ describe('MovieGenrePublishEventHandler', () => { jest.restoreAllMocks(); }); - describe('onMessage', () => { + describe('handleMessage', () => { test('Received a movie-genres unpublish message -> the single existing movie-genre is removed', async () => { // Arrange await insert('movie_genre', { diff --git a/services/catalog/service/src/domains/movies/handlers/movie-published-event-handler.db.spec.ts b/services/catalog/service/src/domains/movies/handlers/movie-published-event-handler.db.spec.ts index ce5c6e03..f5f6b112 100644 --- a/services/catalog/service/src/domains/movies/handlers/movie-published-event-handler.db.spec.ts +++ b/services/catalog/service/src/domains/movies/handlers/movie-published-event-handler.db.spec.ts @@ -27,7 +27,7 @@ describe('MoviePublishEventHandler', () => { jest.restoreAllMocks(); }); - describe('onMessage', () => { + describe('handleMessage', () => { test('A new movie is published', async () => { // Arrange const message = createMoviePublishedMessage('movie-1'); diff --git a/services/catalog/service/src/domains/movies/handlers/movie-unpublished-event-handler.db.spec.ts b/services/catalog/service/src/domains/movies/handlers/movie-unpublished-event-handler.db.spec.ts index f0883e60..b4bad95c 100644 --- a/services/catalog/service/src/domains/movies/handlers/movie-unpublished-event-handler.db.spec.ts +++ b/services/catalog/service/src/domains/movies/handlers/movie-unpublished-event-handler.db.spec.ts @@ -24,7 +24,7 @@ describe('MovieUnpublishEventHandler', () => { jest.restoreAllMocks(); }); - describe('onMessage', () => { + describe('handleMessage', () => { test('An existing movie is unpublished', async () => { // Arrange const movieId = 'movie-1'; diff --git a/services/catalog/service/src/domains/tvshows/handlers/episode-published-event-handler.db.spec.ts b/services/catalog/service/src/domains/tvshows/handlers/episode-published-event-handler.db.spec.ts index 4c220c8a..c557180b 100644 --- a/services/catalog/service/src/domains/tvshows/handlers/episode-published-event-handler.db.spec.ts +++ b/services/catalog/service/src/domains/tvshows/handlers/episode-published-event-handler.db.spec.ts @@ -27,7 +27,7 @@ describe('EpisodePublishEventHandler', () => { jest.restoreAllMocks(); }); - describe('onMessage', () => { + describe('handleMessage', () => { test('A new episode is published', async () => { // Arrange const message = createEpisodePublishedMessage('episode-1'); diff --git a/services/catalog/service/src/domains/tvshows/handlers/episode-unpublished-event-handler.db.spec.ts b/services/catalog/service/src/domains/tvshows/handlers/episode-unpublished-event-handler.db.spec.ts index 58944eb3..4ca0ef56 100644 --- a/services/catalog/service/src/domains/tvshows/handlers/episode-unpublished-event-handler.db.spec.ts +++ b/services/catalog/service/src/domains/tvshows/handlers/episode-unpublished-event-handler.db.spec.ts @@ -25,7 +25,7 @@ describe('EpisodePublishEventHandler', () => { jest.restoreAllMocks(); }); - describe('onMessage', () => { + describe('handleMessage', () => { test('An existing episode is unpublished', async () => { // Arrange const episodeId = 'episode-1'; diff --git a/services/catalog/service/src/domains/tvshows/handlers/season-published-event-handler.db.spec.ts b/services/catalog/service/src/domains/tvshows/handlers/season-published-event-handler.db.spec.ts index 8ca261d6..a118956a 100644 --- a/services/catalog/service/src/domains/tvshows/handlers/season-published-event-handler.db.spec.ts +++ b/services/catalog/service/src/domains/tvshows/handlers/season-published-event-handler.db.spec.ts @@ -27,7 +27,7 @@ describe('SeasonPublishEventHandler', () => { jest.restoreAllMocks(); }); - describe('onMessage', () => { + describe('handleMessage', () => { test('A new season is published', async () => { // Arrange const message = createSeasonPublishedMessage('season-1'); diff --git a/services/catalog/service/src/domains/tvshows/handlers/season-unpublished-event-handler.db.spec.ts b/services/catalog/service/src/domains/tvshows/handlers/season-unpublished-event-handler.db.spec.ts index e219ba48..d1ecc792 100644 --- a/services/catalog/service/src/domains/tvshows/handlers/season-unpublished-event-handler.db.spec.ts +++ b/services/catalog/service/src/domains/tvshows/handlers/season-unpublished-event-handler.db.spec.ts @@ -24,7 +24,7 @@ describe('SeasonPublishEventHandler', () => { jest.restoreAllMocks(); }); - describe('onMessage', () => { + describe('handleMessage', () => { test('An existing season is unpublished', async () => { // Arrange const seasonId = 'season-1'; diff --git a/services/catalog/service/src/domains/tvshows/handlers/tvshow-genres-published-event-handler.db.spec.ts b/services/catalog/service/src/domains/tvshows/handlers/tvshow-genres-published-event-handler.db.spec.ts index f86d3974..8bf46fc3 100644 --- a/services/catalog/service/src/domains/tvshows/handlers/tvshow-genres-published-event-handler.db.spec.ts +++ b/services/catalog/service/src/domains/tvshows/handlers/tvshow-genres-published-event-handler.db.spec.ts @@ -27,7 +27,7 @@ describe('TvshowGenrePublishEventHandler', () => { jest.restoreAllMocks(); }); - describe('onMessage', () => { + describe('handleMessage', () => { test('A new tvshow genre is published', async () => { // Arrange const message = createTvshowGenrePublishedMessage('tvshow_genre-1'); diff --git a/services/catalog/service/src/domains/tvshows/handlers/tvshow-genres-unpublished-event-handler.db.spec.ts b/services/catalog/service/src/domains/tvshows/handlers/tvshow-genres-unpublished-event-handler.db.spec.ts index d92a6ca9..e2d4e1bf 100644 --- a/services/catalog/service/src/domains/tvshows/handlers/tvshow-genres-unpublished-event-handler.db.spec.ts +++ b/services/catalog/service/src/domains/tvshows/handlers/tvshow-genres-unpublished-event-handler.db.spec.ts @@ -25,7 +25,7 @@ describe('TvshowGenrePublishEventHandler', () => { jest.restoreAllMocks(); }); - describe('onMessage', () => { + describe('handleMessage', () => { test('Received a tvshow-genres unpublish message -> the single existing tvshow-genre is removed', async () => { // Arrange await insert('tvshow_genre', { diff --git a/services/catalog/service/src/domains/tvshows/handlers/tvshow-published-event-handler.db.spec.ts b/services/catalog/service/src/domains/tvshows/handlers/tvshow-published-event-handler.db.spec.ts index 9b9410cd..b6686a10 100644 --- a/services/catalog/service/src/domains/tvshows/handlers/tvshow-published-event-handler.db.spec.ts +++ b/services/catalog/service/src/domains/tvshows/handlers/tvshow-published-event-handler.db.spec.ts @@ -27,7 +27,7 @@ describe('TvshowPublishEventHandler', () => { jest.restoreAllMocks(); }); - describe('onMessage', () => { + describe('handleMessage', () => { test('A new tvshow is published', async () => { // Arrange const message = createTvshowPublishedMessage('tvshow-1'); diff --git a/services/catalog/service/src/domains/tvshows/handlers/tvshow-unpublished-event-handler.db.spec.ts b/services/catalog/service/src/domains/tvshows/handlers/tvshow-unpublished-event-handler.db.spec.ts index 901cb913..1c201a0a 100644 --- a/services/catalog/service/src/domains/tvshows/handlers/tvshow-unpublished-event-handler.db.spec.ts +++ b/services/catalog/service/src/domains/tvshows/handlers/tvshow-unpublished-event-handler.db.spec.ts @@ -24,7 +24,7 @@ describe('TvshowPublishEventHandler', () => { jest.restoreAllMocks(); }); - describe('onMessage', () => { + describe('handleMessage', () => { test('An existing tvshow is unpublished', async () => { // Arrange const tvshowId = 'tvshow-1'; diff --git a/services/media/service/migrations/committed/000026-ingest-document-check-index-added.sql b/services/media/service/migrations/committed/000026-ingest-document-check-index-added.sql new file mode 100644 index 00000000..1f5b1f92 --- /dev/null +++ b/services/media/service/migrations/committed/000026-ingest-document-check-index-added.sql @@ -0,0 +1,6 @@ +--! Previous: sha1:9e58d9212f8204cc42f4e3b3afd0abe7d343bd12 +--! Hash: sha1:b74cde38f51e59d36d33923a186339559759f757 +--! Message: ingest-document-check-index-added + +DROP INDEX IF EXISTS idx_ingest_items_ingest_document_id_and_status CASCADE; +CREATE INDEX idx_ingest_items_ingest_document_id_and_status ON app_public.ingest_items (ingest_document_id, status); diff --git a/services/media/service/migrations/committed/000026-next-inbox-message-ingest-priority-added.sql b/services/media/service/migrations/committed/000026-next-inbox-message-ingest-priority-added.sql deleted file mode 100644 index 9f1d9d66..00000000 --- a/services/media/service/migrations/committed/000026-next-inbox-message-ingest-priority-added.sql +++ /dev/null @@ -1,112 +0,0 @@ ---! Previous: sha1:9e58d9212f8204cc42f4e3b3afd0abe7d343bd12 ---! Hash: sha1:7f4ccc8d5137091cfb2747dd7f3dfb1bd3ac7117 ---! Message: next-inbox-message-ingest-priority-added - --- Create the function to get the next batch of messages from the inbox table. -DROP FUNCTION IF EXISTS app_hidden.next_inbox_messages(integer, integer); -CREATE OR REPLACE FUNCTION app_hidden.next_inbox_messages( - max_size integer, lock_ms integer) - RETURNS SETOF app_hidden.inbox - LANGUAGE 'plpgsql' - -AS $BODY$ -DECLARE - loop_row app_hidden.inbox%ROWTYPE; - message_row app_hidden.inbox%ROWTYPE; - ids uuid[] := '{}'; -BEGIN - - IF max_size < 1 THEN - RAISE EXCEPTION 'The max_size for the next messages batch must be at least one.' using errcode = 'MAXNR'; - END IF; - - -- get 1 oldest not locked priority ingest message, either StartIngest or CheckFinishIngestDocument - BEGIN - SELECT * - INTO message_row - FROM app_hidden.inbox - WHERE processed_at IS NULL AND abandoned_at IS NULL AND aggregate_type = 'ingest-document' - ORDER BY created_at - LIMIT 1 - FOR NO KEY UPDATE NOWAIT; -- throw/catch error when locked - - IF FOUND AND message_row.locked_until <= NOW() THEN - ids := array_append(ids, message_row.id); - END IF; - EXCEPTION - WHEN lock_not_available THEN - NULL; - WHEN serialization_failure THEN - NULL; - WHEN OTHERS THEN - RAISE; - END; - - -- get (only) the oldest message of every segment but only return it if it is not locked - FOR loop_row IN - SELECT * FROM app_hidden.inbox m WHERE m.id in (SELECT DISTINCT ON (segment) id - FROM app_hidden.inbox - WHERE processed_at IS NULL AND abandoned_at IS NULL AND id NOT IN (SELECT UNNEST(ids)) - ORDER BY segment, created_at) order by created_at - LOOP - BEGIN - EXIT WHEN cardinality(ids) >= max_size; - - SELECT * - INTO message_row - FROM app_hidden.inbox - WHERE id = loop_row.id - FOR NO KEY UPDATE NOWAIT; -- throw/catch error when locked - - IF message_row.locked_until > NOW() THEN - CONTINUE; - END IF; - - ids := array_append(ids, message_row.id); - EXCEPTION - WHEN lock_not_available THEN - CONTINUE; - WHEN serialization_failure THEN - CONTINUE; - END; - END LOOP; - - -- if max_size not reached: get the oldest parallelizable message independent of segment - IF cardinality(ids) < max_size THEN - FOR loop_row IN - SELECT * FROM app_hidden.inbox - WHERE concurrency = 'parallel' AND processed_at IS NULL AND abandoned_at IS NULL AND locked_until < NOW() - AND id NOT IN (SELECT UNNEST(ids)) - order by created_at - LOOP - BEGIN - EXIT WHEN cardinality(ids) >= max_size; - - SELECT * - INTO message_row - FROM app_hidden.inbox - WHERE id = loop_row.id - FOR NO KEY UPDATE NOWAIT; -- throw/catch error when locked - - ids := array_append(ids, message_row.id); - EXCEPTION - WHEN lock_not_available THEN - CONTINUE; - WHEN serialization_failure THEN - CONTINUE; - END; - END LOOP; - END IF; - - -- set a short lock value so the the workers can each process a message - IF cardinality(ids) > 0 THEN - - RETURN QUERY - UPDATE app_hidden.inbox - SET locked_until = clock_timestamp() + (lock_ms || ' milliseconds')::INTERVAL, started_attempts = started_attempts + 1 - WHERE ID = ANY(ids) - RETURNING *; - - END IF; -END; -$BODY$; diff --git a/services/media/service/src/common/constants/index.ts b/services/media/service/src/common/constants/index.ts index 8871d406..dc2f5c8e 100644 --- a/services/media/service/src/common/constants/index.ts +++ b/services/media/service/src/common/constants/index.ts @@ -1,2 +1,3 @@ export * from './iso-alpha-two-country-codes'; export * from './localization'; +export * from './transactional-inbox-outbox'; diff --git a/services/media/service/src/common/constants/transactional-inbox-outbox.ts b/services/media/service/src/common/constants/transactional-inbox-outbox.ts new file mode 100644 index 00000000..b1c1209e --- /dev/null +++ b/services/media/service/src/common/constants/transactional-inbox-outbox.ts @@ -0,0 +1,4 @@ +/** + * A segment to add to messages to give them priority processing. + */ +export const PRIORITY_SEGMENT = '.priority'; diff --git a/services/media/service/src/generated/db/schema.sql b/services/media/service/src/generated/db/schema.sql index d8907a60..91e1f813 100644 --- a/services/media/service/src/generated/db/schema.sql +++ b/services/media/service/src/generated/db/schema.sql @@ -1537,33 +1537,11 @@ BEGIN RAISE EXCEPTION 'The max_size for the next messages batch must be at least one.' using errcode = 'MAXNR'; END IF; - -- get 1 oldest not locked priority ingest message, either StartIngest or CheckFinishIngestDocument - BEGIN - SELECT * - INTO message_row - FROM app_hidden.inbox - WHERE processed_at IS NULL AND abandoned_at IS NULL AND aggregate_type = 'ingest-document' - ORDER BY created_at - LIMIT 1 - FOR NO KEY UPDATE NOWAIT; -- throw/catch error when locked - - IF FOUND AND message_row.locked_until <= NOW() THEN - ids := array_append(ids, message_row.id); - END IF; - EXCEPTION - WHEN lock_not_available THEN - NULL; - WHEN serialization_failure THEN - NULL; - WHEN OTHERS THEN - RAISE; - END; - -- get (only) the oldest message of every segment but only return it if it is not locked FOR loop_row IN SELECT * FROM app_hidden.inbox m WHERE m.id in (SELECT DISTINCT ON (segment) id FROM app_hidden.inbox - WHERE processed_at IS NULL AND abandoned_at IS NULL AND id NOT IN (SELECT UNNEST(ids)) + WHERE processed_at IS NULL AND abandoned_at IS NULL ORDER BY segment, created_at) order by created_at LOOP BEGIN @@ -7830,6 +7808,13 @@ CREATE INDEX idx_ingest_items_external_id_desc_with_id ON app_public.ingest_item CREATE INDEX idx_ingest_items_ingest_document_id ON app_public.ingest_items USING btree (ingest_document_id); +-- +-- Name: idx_ingest_items_ingest_document_id_and_status; Type: INDEX; Schema: app_public; Owner: - +-- + +CREATE INDEX idx_ingest_items_ingest_document_id_and_status ON app_public.ingest_items USING btree (ingest_document_id, status); + + -- -- Name: idx_ingest_items_status_asc_with_id; Type: INDEX; Schema: app_public; Owner: - -- diff --git a/services/media/service/src/ingest/handlers/check-finish-ingest-document-handler.db.spec.ts b/services/media/service/src/ingest/handlers/check-finish-ingest-document-handler.db.spec.ts index 341cd7eb..d45702a0 100644 --- a/services/media/service/src/ingest/handlers/check-finish-ingest-document-handler.db.spec.ts +++ b/services/media/service/src/ingest/handlers/check-finish-ingest-document-handler.db.spec.ts @@ -107,7 +107,7 @@ describe('Check Finish Ingest Document Handler', () => { jest.restoreAllMocks(); }); - describe('onMessage', () => { + describe('handleMessage', () => { it('message with all 0 counts for 2 in progress items, initial call -> seconds_without_progress remains the same, waiting 5 sec', async () => { // Arrange const payload: CheckFinishIngestDocumentCommand = { diff --git a/services/media/service/src/ingest/handlers/check-finish-ingest-document-handler.ts b/services/media/service/src/ingest/handlers/check-finish-ingest-document-handler.ts index dc974af3..e1127c3f 100644 --- a/services/media/service/src/ingest/handlers/check-finish-ingest-document-handler.ts +++ b/services/media/service/src/ingest/handlers/check-finish-ingest-document-handler.ts @@ -10,7 +10,7 @@ import { import { ClientBase } from 'pg'; import { IngestItemStatusEnum, IngestStatusEnum } from 'zapatos/custom'; import { param, self as value, sql, SQL, update } from 'zapatos/db'; -import { Config } from '../../common'; +import { Config, PRIORITY_SEGMENT } from '../../common'; import { MediaGuardedTransactionalInboxMessageHandler } from '../../messaging'; import { getFutureIsoDateInMilliseconds } from '../utils'; @@ -141,6 +141,7 @@ export class CheckFinishIngestDocumentHandler extends MediaGuardedTransactionalI { metadata: { authToken: metadata.authToken }, lockedUntil: getFutureIsoDateInMilliseconds(5_000), + segment: PRIORITY_SEGMENT, }, ); } diff --git a/services/media/service/src/ingest/handlers/image-failed-handler.db.spec.ts b/services/media/service/src/ingest/handlers/image-failed-handler.db.spec.ts index d0ac3da7..106406ce 100644 --- a/services/media/service/src/ingest/handlers/image-failed-handler.db.spec.ts +++ b/services/media/service/src/ingest/handlers/image-failed-handler.db.spec.ts @@ -86,7 +86,7 @@ describe('ImageFailedHandler', () => { jest.restoreAllMocks(); }); - describe('onMessage', () => { + describe('handleMessage', () => { it('message received -> message with error ingestItemStepId sent', async () => { // Arrange const payload: EnsureImageExistsFailedEvent = { diff --git a/services/media/service/src/ingest/handlers/image-succeeded-handler.db.spec.ts b/services/media/service/src/ingest/handlers/image-succeeded-handler.db.spec.ts index 6a1446a2..77b73308 100644 --- a/services/media/service/src/ingest/handlers/image-succeeded-handler.db.spec.ts +++ b/services/media/service/src/ingest/handlers/image-succeeded-handler.db.spec.ts @@ -88,7 +88,7 @@ describe('ImageSucceededHandler', () => { jest.restoreAllMocks(); }); - describe('onMessage', () => { + describe('handleMessage', () => { it('message succeeded without errors -> message without error sent and step updated', async () => { // Arrange const payload: EnsureImageExistsImageCreatedEvent = { @@ -115,7 +115,7 @@ describe('ImageSucceededHandler', () => { }); }); - describe('onMessageFailure', () => { + describe('handleErrorMessage', () => { it('message failed on all retries -> message with error ingestItemStepId sent', async () => { // Arrange const payload: EnsureImageExistsImageCreatedEvent = { diff --git a/services/media/service/src/ingest/handlers/localize-entity-failed-handler.db.spec.ts b/services/media/service/src/ingest/handlers/localize-entity-failed-handler.db.spec.ts index 046906f7..bfe21af1 100644 --- a/services/media/service/src/ingest/handlers/localize-entity-failed-handler.db.spec.ts +++ b/services/media/service/src/ingest/handlers/localize-entity-failed-handler.db.spec.ts @@ -86,7 +86,7 @@ describe('LocalizeEntityFailedHandler', () => { jest.restoreAllMocks(); }); - describe('onMessage', () => { + describe('handleMessage', () => { it('message received -> message with error ingestItemStepId sent', async () => { // Arrange const payload: LocalizeEntityFailedEvent = { diff --git a/services/media/service/src/ingest/handlers/localize-entity-finished-handler.db.spec.ts b/services/media/service/src/ingest/handlers/localize-entity-finished-handler.db.spec.ts index 19f73f43..a739466d 100644 --- a/services/media/service/src/ingest/handlers/localize-entity-finished-handler.db.spec.ts +++ b/services/media/service/src/ingest/handlers/localize-entity-finished-handler.db.spec.ts @@ -86,7 +86,7 @@ describe('LocalizeEntityFinishedHandler', () => { jest.restoreAllMocks(); }); - describe('onMessage', () => { + describe('handleMessage', () => { it('message received -> check ingest item message without error sent', async () => { // Arrange const payload: LocalizeEntityFinishedEvent = { diff --git a/services/media/service/src/ingest/handlers/start-ingest-handler.db.spec.ts b/services/media/service/src/ingest/handlers/start-ingest-handler.db.spec.ts index d96d6423..0ae3bf4c 100644 --- a/services/media/service/src/ingest/handlers/start-ingest-handler.db.spec.ts +++ b/services/media/service/src/ingest/handlers/start-ingest-handler.db.spec.ts @@ -74,7 +74,7 @@ describe('Start Ingest Handler', () => { jest.restoreAllMocks(); }); - describe('onMessage', () => { + describe('handleMessage', () => { it('message with 1 new movie -> ingest item created and message sent', async () => { // Arrange jest.spyOn(processor, 'initializeMedia').mockImplementation(async () => ({ @@ -269,7 +269,7 @@ describe('Start Ingest Handler', () => { }); }); - describe('onMessageFailure', () => { + describe('handleErrorMessage', () => { it('message failed on all retries -> document state updated to ERROR', async () => { // Arrange const docItems: IngestItem[] = [ diff --git a/services/media/service/src/ingest/handlers/start-ingest-handler.ts b/services/media/service/src/ingest/handlers/start-ingest-handler.ts index 46a2e0ab..56af86c6 100644 --- a/services/media/service/src/ingest/handlers/start-ingest-handler.ts +++ b/services/media/service/src/ingest/handlers/start-ingest-handler.ts @@ -37,7 +37,7 @@ import { update, } from 'zapatos/db'; import { ingest_items } from 'zapatos/schema'; -import { CommonErrors, Config } from '../../common'; +import { CommonErrors, Config, PRIORITY_SEGMENT } from '../../common'; import { MediaGuardedTransactionalInboxMessageHandler } from '../../messaging'; import { DisplayTitleMapping, @@ -153,7 +153,10 @@ export class StartIngestHandler extends MediaGuardedTransactionalInboxMessageHan previous_in_progress_count: 0, }, ctx, - { metadata: { authToken: metadata.authToken } }, + { + metadata: { authToken: metadata.authToken }, + segment: PRIORITY_SEGMENT, + }, ); } }, diff --git a/services/media/service/src/ingest/handlers/start-ingest-item-handler.db.spec.ts b/services/media/service/src/ingest/handlers/start-ingest-item-handler.db.spec.ts index 05f2d2ed..18f473d0 100644 --- a/services/media/service/src/ingest/handlers/start-ingest-item-handler.db.spec.ts +++ b/services/media/service/src/ingest/handlers/start-ingest-item-handler.db.spec.ts @@ -152,7 +152,7 @@ describe('Start Ingest Item Handler', () => { jest.restoreAllMocks(); }); - describe('onMessage', () => { + describe('handleMessage', () => { it('message for one step -> step is saved and command is sent', async () => { // Arrange const mockItem = createOrchestrationMock( @@ -374,7 +374,7 @@ describe('Start Ingest Item Handler', () => { }); }); - describe('onMessageFailure', () => { + describe('handleErrorMessage', () => { it('message failed on all retries -> item state updated to ERROR', async () => { // Arrange const payload: StartIngestItemCommand = { diff --git a/services/media/service/src/ingest/handlers/update-metadata-handler.db.spec.ts b/services/media/service/src/ingest/handlers/update-metadata-handler.db.spec.ts index cff38041..80d216ba 100644 --- a/services/media/service/src/ingest/handlers/update-metadata-handler.db.spec.ts +++ b/services/media/service/src/ingest/handlers/update-metadata-handler.db.spec.ts @@ -94,7 +94,7 @@ describe('UpdateMetadataHandler', () => { await ctx.dispose(); }); - describe('onMessage', () => { + describe('handleMessage', () => { it('message succeeded without errors -> message without error sent', async () => { // Arrange const payload = { @@ -162,7 +162,7 @@ describe('UpdateMetadataHandler', () => { }); }); - describe('onMessageFailure', () => { + describe('handleErrorMessage', () => { it('message failed on all retries with non-ingest error -> message with ingest_item_step_id and generic errorMessage sent', async () => { // Arrange const payload: UpdateMetadataCommand = stub({ diff --git a/services/media/service/src/ingest/handlers/upsert-localization-source-entity-failed-handler.db.spec.ts b/services/media/service/src/ingest/handlers/upsert-localization-source-entity-failed-handler.db.spec.ts index 7b9533bb..2af3b446 100644 --- a/services/media/service/src/ingest/handlers/upsert-localization-source-entity-failed-handler.db.spec.ts +++ b/services/media/service/src/ingest/handlers/upsert-localization-source-entity-failed-handler.db.spec.ts @@ -90,7 +90,7 @@ describe('UpsertLocalizationSourceEntityFailedHandler', () => { jest.restoreAllMocks(); }); - describe('onMessage', () => { + describe('handleMessage', () => { it('message succeeded without errors -> localize entity message sent', async () => { // Arrange const payload: UpsertLocalizationSourceEntityFailedEvent = { @@ -118,7 +118,7 @@ describe('UpsertLocalizationSourceEntityFailedHandler', () => { }); }); - describe('onMessageFailure', () => { + describe('handleErrorMessage', () => { it('message failed on all retries -> message with error sent', async () => { // Arrange const payload: UpsertLocalizationSourceEntityFailedEvent = { diff --git a/services/media/service/src/ingest/handlers/upsert-localization-source-entity-finished-handler.db.spec.ts b/services/media/service/src/ingest/handlers/upsert-localization-source-entity-finished-handler.db.spec.ts index a967d8c8..42e423ba 100644 --- a/services/media/service/src/ingest/handlers/upsert-localization-source-entity-finished-handler.db.spec.ts +++ b/services/media/service/src/ingest/handlers/upsert-localization-source-entity-finished-handler.db.spec.ts @@ -131,7 +131,7 @@ describe('UpsertLocalizationSourceEntityFinishedHandler', () => { jest.restoreAllMocks(); }); - describe('onMessage', () => { + describe('handleMessage', () => { it('message succeeded without errors -> localize entity message sent', async () => { // Arrange const payload: UpsertLocalizationSourceEntityFinishedEvent = { @@ -206,7 +206,7 @@ describe('UpsertLocalizationSourceEntityFinishedHandler', () => { }); }); - describe('onMessageFailure', () => { + describe('handleErrorMessage', () => { it('message failed on all retries -> message with error sent', async () => { // Arrange const payload: UpsertLocalizationSourceEntityFinishedEvent = { diff --git a/services/media/service/src/ingest/handlers/video-failed-handler.db.spec.ts b/services/media/service/src/ingest/handlers/video-failed-handler.db.spec.ts index 8801e088..b37927af 100644 --- a/services/media/service/src/ingest/handlers/video-failed-handler.db.spec.ts +++ b/services/media/service/src/ingest/handlers/video-failed-handler.db.spec.ts @@ -86,7 +86,7 @@ describe('VideoFailedHandler', () => { jest.restoreAllMocks(); }); - describe('onMessage', () => { + describe('handleMessage', () => { it('message received -> message with error ingestItemStepId sent', async () => { // Arrange const payload: EnsureVideoExistsFailedEvent = { diff --git a/services/media/service/src/ingest/handlers/video-succeeded-handler.db.spec.ts b/services/media/service/src/ingest/handlers/video-succeeded-handler.db.spec.ts index 655a25a6..d3888857 100644 --- a/services/media/service/src/ingest/handlers/video-succeeded-handler.db.spec.ts +++ b/services/media/service/src/ingest/handlers/video-succeeded-handler.db.spec.ts @@ -91,7 +91,7 @@ describe('VideoSucceededHandler', () => { jest.restoreAllMocks(); }); - describe('onMessage', () => { + describe('handleMessage', () => { it('message succeeded without errors -> message without error sent and step updated', async () => { // Arrange const payload: EnsureVideoExistsCreationStartedEvent = { @@ -119,7 +119,7 @@ describe('VideoSucceededHandler', () => { }); }); - describe('onMessageFailure', () => { + describe('handleErrorMessage', () => { it('message failed on all retries -> message with error sent', async () => { // Arrange const payload: EnsureVideoExistsCreationStartedEvent = { diff --git a/services/media/service/src/ingest/plugins/start-ingest-endpoint-plugin.ts b/services/media/service/src/ingest/plugins/start-ingest-endpoint-plugin.ts index a7ff0fa4..a0c44fda 100644 --- a/services/media/service/src/ingest/plugins/start-ingest-endpoint-plugin.ts +++ b/services/media/service/src/ingest/plugins/start-ingest-endpoint-plugin.ts @@ -21,6 +21,7 @@ import { CommonErrors, getLongLivedToken, getMediaMappedError, + PRIORITY_SEGMENT, transformAjvErrors, } from '../../common'; import { ingestPermissionMappings } from '../../domains/permission-definition'; @@ -157,7 +158,7 @@ export const StartIngestEndpointPlugin = makeExtendSchemaPlugin((build) => { MediaServiceMessagingSettings.StartIngest, { doc_id: doc.id }, ctx, - { metadata: { authToken: token } }, + { metadata: { authToken: token }, segment: PRIORITY_SEGMENT }, ); return doc; }, diff --git a/services/media/service/src/publishing/handlers/publish-entity-handler.db.spec.ts b/services/media/service/src/publishing/handlers/publish-entity-handler.db.spec.ts index 894bb751..09155794 100644 --- a/services/media/service/src/publishing/handlers/publish-entity-handler.db.spec.ts +++ b/services/media/service/src/publishing/handlers/publish-entity-handler.db.spec.ts @@ -81,7 +81,7 @@ describe('PublishEntityCommandHandler', () => { jest.restoreAllMocks(); }); - describe('onMessage', () => { + describe('handleMessage', () => { it('message to publish a movie -> snapshot with relation created with correct metadata and publish message sent', async () => { // Act await ctx.executeOwnerSql(user, async (txn) => { @@ -378,7 +378,7 @@ describe('PublishEntityCommandHandler', () => { }); }); - describe('onMessageFailure', () => { + describe('handleErrorMessage', () => { it('message to publish a movie failed after 10 tries -> snapshot with ERROR state created', async () => { // Act await ctx.executeOwnerSql(user, async (txn) => { diff --git a/services/media/service/src/publishing/handlers/unpublish-entity-handler.db.spec.ts b/services/media/service/src/publishing/handlers/unpublish-entity-handler.db.spec.ts index a33f2766..0016a328 100644 --- a/services/media/service/src/publishing/handlers/unpublish-entity-handler.db.spec.ts +++ b/services/media/service/src/publishing/handlers/unpublish-entity-handler.db.spec.ts @@ -90,7 +90,7 @@ describe('UnpublishEntityHandler', () => { jest.restoreAllMocks(); }); - describe('onMessage', () => { + describe('handleMessage', () => { it('message to publish a movie -> snapshot with relation created with correct metadata and publish message sent', async () => { // Act await ctx.executeOwnerSql(user, async (dbCtx) =>