[AB#43320] check-finished-ingest-item message dropped #293

Merged 2 commits on Apr 30, 2024
@@ -57,14 +57,6 @@ channels:
publish:
message:
$ref: '#/components/messages/update-metadata-command'
'ingest.check_finish_item':
bindings:
amqp:
queue:
name: inbox
publish:
message:
$ref: '#/components/messages/check-finish-ingest-item-command'
'ingest.check_finish_document':
bindings:
amqp:
@@ -85,12 +77,6 @@
$ref: '#/components/messages/entity-deleted-event'
components:
messages:
check-finish-ingest-item-command:
tags:
- name: aggregate-type:ingest-item
contentType: application/json
payload:
$ref: 'commands/check-finish-ingest-item-command.json'
check-finish-ingest-document-command:
tags:
- name: aggregate-type:ingest-document
@@ -43,13 +43,6 @@ export class MediaServiceMessagingSettings implements MessagingSettings {
'command',
'ingest-item'
);
public static CheckFinishIngestItem = new MediaServiceMessagingSettings(
'CheckFinishIngestItem',
'inbox',
'ingest.check_finish_item',
'command',
'ingest-item'
);
public static CheckFinishIngestDocument = new MediaServiceMessagingSettings(
'CheckFinishIngestDocument',
'inbox',

This file was deleted.

@@ -1,5 +1,4 @@
import * as CheckFinishIngestDocumentCommand from './check-finish-ingest-document-command.json';
import * as CheckFinishIngestItemCommand from './check-finish-ingest-item-command.json';
import * as DeleteEntityCommand from './delete-entity-command.json';
import * as PublishEntityCommand from './publish-entity-command.json';
import * as StartIngestCommand from './start-ingest-command.json';
@@ -8,7 +7,6 @@ import * as UnpublishEntityCommand from './unpublish-entity-command.json';
import * as UpdateMetadataCommand from './update-metadata-command.json';

export const CheckFinishIngestDocumentCommandSchema = CheckFinishIngestDocumentCommand;
export const CheckFinishIngestItemCommandSchema = CheckFinishIngestItemCommand;
export const DeleteEntityCommandSchema = DeleteEntityCommand;
export const PublishEntityCommandSchema = PublishEntityCommand;
export const StartIngestCommandSchema = StartIngestCommand;

This file was deleted.

@@ -1,5 +1,4 @@
export * from './check-finish-ingest-document-command';
export * from './check-finish-ingest-item-command';
export * from './delete-entity-command';
export * from './publish-entity-command';
export * from './start-ingest-command';
@@ -9,7 +8,6 @@ export * from './update-metadata-command';

export enum MediaCommandsSchemas {
CheckFinishIngestDocumentCommand = 'payloads/media/commands/check-finish-ingest-document-command.json',
CheckFinishIngestItemCommand = 'payloads/media/commands/check-finish-ingest-item-command.json',
DeleteEntityCommand = 'payloads/media/commands/delete-entity-command.json',
PublishEntityCommand = 'payloads/media/commands/publish-entity-command.json',
StartIngestCommand = 'payloads/media/commands/start-ingest-command.json',
@@ -20,7 +18,6 @@ export enum MediaCommandsSchemas {

export enum MediaCommandsTypes {
CheckFinishIngestDocumentCommand = 'CheckFinishIngestDocumentCommand',
CheckFinishIngestItemCommand = 'CheckFinishIngestItemCommand',
DeleteEntityCommand = 'DeleteEntityCommand',
PublishEntityCommand = 'PublishEntityCommand',
StartIngestCommand = 'StartIngestCommand',
@@ -0,0 +1,112 @@
--! Previous: sha1:9e58d9212f8204cc42f4e3b3afd0abe7d343bd12
--! Hash: sha1:7f4ccc8d5137091cfb2747dd7f3dfb1bd3ac7117
--! Message: next-inbox-message-ingest-priority-added

-- Create the function to get the next batch of messages from the inbox table.
DROP FUNCTION IF EXISTS app_hidden.next_inbox_messages(integer, integer);
CREATE OR REPLACE FUNCTION app_hidden.next_inbox_messages(
max_size integer, lock_ms integer)
RETURNS SETOF app_hidden.inbox
LANGUAGE 'plpgsql'

AS $BODY$
DECLARE
loop_row app_hidden.inbox%ROWTYPE;
message_row app_hidden.inbox%ROWTYPE;
ids uuid[] := '{}';
BEGIN

IF max_size < 1 THEN
RAISE EXCEPTION 'The max_size for the next messages batch must be at least one.' using errcode = 'MAXNR';
END IF;

-- get 1 oldest not locked priority ingest message, either StartIngest or CheckFinishIngestDocument
BEGIN
SELECT *
INTO message_row
FROM app_hidden.inbox
WHERE processed_at IS NULL AND abandoned_at IS NULL AND aggregate_type = 'ingest-document'
ORDER BY created_at
LIMIT 1
FOR NO KEY UPDATE NOWAIT; -- throw/catch error when locked

IF FOUND AND message_row.locked_until <= NOW() THEN
ids := array_append(ids, message_row.id);
END IF;
EXCEPTION
WHEN lock_not_available THEN
NULL;
WHEN serialization_failure THEN
NULL;
WHEN OTHERS THEN
RAISE;
END;

-- get (only) the oldest message of every segment but only return it if it is not locked
FOR loop_row IN
SELECT * FROM app_hidden.inbox m WHERE m.id in (SELECT DISTINCT ON (segment) id
FROM app_hidden.inbox
WHERE processed_at IS NULL AND abandoned_at IS NULL AND id NOT IN (SELECT UNNEST(ids))
ORDER BY segment, created_at) order by created_at
LOOP
BEGIN
EXIT WHEN cardinality(ids) >= max_size;

SELECT *
INTO message_row
FROM app_hidden.inbox
WHERE id = loop_row.id
FOR NO KEY UPDATE NOWAIT; -- throw/catch error when locked

IF message_row.locked_until > NOW() THEN
CONTINUE;
END IF;

ids := array_append(ids, message_row.id);
EXCEPTION
WHEN lock_not_available THEN
CONTINUE;
WHEN serialization_failure THEN
CONTINUE;
END;
END LOOP;

-- if max_size not reached: get the oldest parallelizable message independent of segment
IF cardinality(ids) < max_size THEN
FOR loop_row IN
SELECT * FROM app_hidden.inbox
WHERE concurrency = 'parallel' AND processed_at IS NULL AND abandoned_at IS NULL AND locked_until < NOW()
AND id NOT IN (SELECT UNNEST(ids))
order by created_at
LOOP
BEGIN
EXIT WHEN cardinality(ids) >= max_size;

SELECT *
INTO message_row
FROM app_hidden.inbox
WHERE id = loop_row.id
FOR NO KEY UPDATE NOWAIT; -- throw/catch error when locked

ids := array_append(ids, message_row.id);
EXCEPTION
WHEN lock_not_available THEN
CONTINUE;
WHEN serialization_failure THEN
CONTINUE;
END;
END LOOP;
END IF;

-- set a short lock value so the workers can each process a message
IF cardinality(ids) > 0 THEN

RETURN QUERY
UPDATE app_hidden.inbox
SET locked_until = clock_timestamp() + (lock_ms || ' milliseconds')::INTERVAL, started_attempts = started_attempts + 1
WHERE ID = ANY(ids)
RETURNING *;

END IF;
END;
$BODY$;
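For orientation only: the service's inbox poller would call this function with its batch size and lock duration, roughly as sketched below. The pool name, batch size (5), and lock duration (15000 ms) are illustrative assumptions and are not taken from this PR.

import { Pool } from 'pg';

// Hypothetical polling snippet, assuming a node-postgres Pool named `pool`.
// Both parameters are illustrative; the real poller supplies its own values.
async function pollInbox(pool: Pool): Promise<void> {
  const { rows } = await pool.query(
    'SELECT * FROM app_hidden.next_inbox_messages($1, $2)',
    [5, 15000],
  );
  // Each returned row is an inbox message whose locked_until was just extended:
  // at most one priority 'ingest-document' message, plus the oldest unlocked
  // message of each segment and, if room remains, parallelizable messages.
  for (const message of rows) {
    console.log(message.id, message.aggregate_type, message.segment);
  }
}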
services/media/service/src/generated/db/schema.sql (23 additions, 1 deletion)
@@ -1537,11 +1537,33 @@ BEGIN
RAISE EXCEPTION 'The max_size for the next messages batch must be at least one.' using errcode = 'MAXNR';
END IF;

-- get 1 oldest not locked priority ingest message, either StartIngest or CheckFinishIngestDocument
BEGIN
SELECT *
INTO message_row
FROM app_hidden.inbox
WHERE processed_at IS NULL AND abandoned_at IS NULL AND aggregate_type = 'ingest-document'
ORDER BY created_at
LIMIT 1
FOR NO KEY UPDATE NOWAIT; -- throw/catch error when locked

IF FOUND AND message_row.locked_until <= NOW() THEN
ids := array_append(ids, message_row.id);
END IF;
EXCEPTION
WHEN lock_not_available THEN
NULL;
WHEN serialization_failure THEN
NULL;
WHEN OTHERS THEN
RAISE;
END;

-- get (only) the oldest message of every segment but only return it if it is not locked
FOR loop_row IN
SELECT * FROM app_hidden.inbox m WHERE m.id in (SELECT DISTINCT ON (segment) id
FROM app_hidden.inbox
-WHERE processed_at IS NULL AND abandoned_at IS NULL
+WHERE processed_at IS NULL AND abandoned_at IS NULL AND id NOT IN (SELECT UNNEST(ids))
ORDER BY segment, created_at) order by created_at
LOOP
BEGIN
@@ -43,10 +43,30 @@ export class CheckFinishIngestDocumentHandler extends MediaGuardedTransactionalI
}: TypedTransactionalMessage<CheckFinishIngestDocumentCommand>,
ownerClient: ClientBase,
): Promise<void> {
const docId = param(ingest_document_id);
await sql`WITH updated AS (
SELECT
iis.ingest_item_id,
CASE
WHEN BOOL_OR(iis.status = 'IN_PROGRESS') THEN NULL
WHEN BOOL_OR(iis.status = 'ERROR') THEN 'ERROR'
ELSE 'SUCCESS'
END as new_status
FROM app_public.ingest_item_steps iis
JOIN app_public.ingest_items ii ON iis.ingest_item_id = ii.id
WHERE ii.ingest_document_id = ${docId} AND ii.status = 'IN_PROGRESS'
GROUP BY iis.ingest_item_id
)
UPDATE app_public.ingest_items item
SET status = updated.new_status
FROM updated
WHERE item.id = updated.ingest_item_id AND
updated.new_status IS NOT NULL;`.run(ownerClient);

const countGroups = await sql<SQL, StatusAggregation[]>`
SELECT status, COUNT (status)
FROM app_public.ingest_items
-WHERE ingest_document_id = ${param(ingest_document_id)}
+WHERE ingest_document_id = ${docId}
GROUP BY status;
`.run(ownerClient);
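The WITH updated query above rolls each in-progress item's step statuses up into a single item status: any IN_PROGRESS step leaves the item untouched (new_status is NULL), otherwise any ERROR step marks the item ERROR, and an all-SUCCESS set of steps marks it SUCCESS. The same rule expressed in TypeScript, purely as an illustration (this helper is not part of the PR):

type StepStatus = 'IN_PROGRESS' | 'ERROR' | 'SUCCESS';

// Mirrors the CASE expression in the CTE above; null means "leave the item as is".
function rollUpItemStatus(steps: StepStatus[]): 'ERROR' | 'SUCCESS' | null {
  if (steps.some((s) => s === 'IN_PROGRESS')) return null; // a step is still running
  if (steps.some((s) => s === 'ERROR')) return 'ERROR'; // at least one step failed
  return 'SUCCESS'; // every step finished successfully
}

// rollUpItemStatus(['SUCCESS', 'ERROR'])       -> 'ERROR'
// rollUpItemStatus(['IN_PROGRESS', 'SUCCESS']) -> null (item stays IN_PROGRESS)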

@@ -86,7 +106,13 @@ export class CheckFinishIngestDocumentHandler extends MediaGuardedTransactionalI
seconds_without_progress = 0;
}

if (seconds_without_progress >= 600) {
// At baseline, allow 10 minutes of inactivity for any ingest, no matter
// the reason (e.g. services starting up). Add an additional minute of
// inactivity for every 250 items in the document.
const maxSecondsOfInactivity =
600 + 60 * Math.round(updatedDoc.items_count / 250);

if (seconds_without_progress >= maxSecondsOfInactivity) {
const error = param({
message:
'The progress of ingest failed to change for a long period of time. Assuming an unexpected messaging issue and failing the document.',
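To make the new threshold concrete, here is the formula from the diff above evaluated for a few document sizes. This is plain arithmetic on the constants introduced in this PR; the surrounding snippet is illustrative only.

// maxSecondsOfInactivity = 600 + 60 * Math.round(items_count / 250)
const samples = [0, 100, 250, 1000].map((itemsCount) => ({
  itemsCount,
  maxSecondsOfInactivity: 600 + 60 * Math.round(itemsCount / 250),
}));
// 0 items    -> 600 s (10 min baseline)
// 100 items  -> 600 s (Math.round(0.4) = 0 extra minutes)
// 250 items  -> 660 s (11 min)
// 1000 items -> 840 s (14 min)
console.log(samples);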