Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[AB#43277] Implemented the transactional inbox for the Catalog Service #251

Merged
merged 5 commits into from
Mar 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions libs/media-messages/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@
"build:compile": "tsc"
},
"devDependencies": {
"@axinom/mosaic-cli": "0.29.0-rc.11",
"@axinom/mosaic-message-bus-abstractions": "0.10.0-rc.11",
"@axinom/mosaic-cli": "0.30.0-rc.0",
"@axinom/mosaic-message-bus-abstractions": "0.11.0-rc.0",
"@types/glob": "^7.2.0",
"concurrently": "^5.3.0",
"rimraf": "^3.0.2",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,111 +13,111 @@ channels:
bindings:
amqp:
queue:
name: movie:published
name: inbox
subscribe:
message:
$ref: '#/components/messages/movie-published-event'
'movie.unpublished':
bindings:
amqp:
queue:
name: movie:unpublished
name: inbox
subscribe:
message:
$ref: '#/components/messages/movie-unpublished-event'
'season.published':
bindings:
amqp:
queue:
name: season:published
name: inbox
subscribe:
message:
$ref: '#/components/messages/season-published-event'
'season.unpublished':
bindings:
amqp:
queue:
name: season:unpublished
name: inbox
subscribe:
message:
$ref: '#/components/messages/season-unpublished-event'
'tvshow.published':
bindings:
amqp:
queue:
name: tvshow:published
name: inbox
subscribe:
message:
$ref: '#/components/messages/tvshow-published-event'
'tvshow.unpublished':
bindings:
amqp:
queue:
name: tvshow:unpublished
name: inbox
subscribe:
message:
$ref: '#/components/messages/tvshow-unpublished-event'
'episode.published':
bindings:
amqp:
queue:
name: episode:published
name: inbox
subscribe:
message:
$ref: '#/components/messages/episode-published-event'
'episode.unpublished':
bindings:
amqp:
queue:
name: episode:unpublished
name: inbox
subscribe:
message:
$ref: '#/components/messages/episode-unpublished-event'
'collection.published':
bindings:
amqp:
queue:
name: collection:published
name: inbox
subscribe:
message:
$ref: '#/components/messages/collection-published-event'
'collection.unpublished':
bindings:
amqp:
queue:
name: collection:unpublished
name: inbox
subscribe:
message:
$ref: '#/components/messages/collection-unpublished-event'
'movie-genres.published':
bindings:
amqp:
queue:
name: movie-genres:published
name: inbox
subscribe:
message:
$ref: '#/components/messages/movie-genres-published-event'
'movie-genres.unpublished':
bindings:
amqp:
queue:
name: movie-genres:unpublished
name: inbox
subscribe:
message:
$ref: '#/components/messages/movie-genres-unpublished-event'
'tvshow-genres.published':
bindings:
amqp:
queue:
name: tvshow-genres:published
name: inbox
subscribe:
message:
$ref: '#/components/messages/tvshow-genres-published-event'
'tvshow-genres.unpublished':
bindings:
amqp:
queue:
name: tvshow-genres:unpublished
name: inbox
subscribe:
message:
$ref: '#/components/messages/tvshow-genres-unpublished-event'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,98 +3,98 @@ import { MessagingSettings } from '@axinom/mosaic-message-bus-abstractions';
export class PublishServiceMessagingSettings implements MessagingSettings {
public static MoviePublished = new PublishServiceMessagingSettings(
'MoviePublished',
'movie:published',
'inbox',
'movie.published',
'event',
'movie'
);
public static MovieUnpublished = new PublishServiceMessagingSettings(
'MovieUnpublished',
'movie:unpublished',
'inbox',
'movie.unpublished',
'event',
'movie'
);
public static SeasonPublished = new PublishServiceMessagingSettings(
'SeasonPublished',
'season:published',
'inbox',
'season.published',
'event',
'season'
);
public static SeasonUnpublished = new PublishServiceMessagingSettings(
'SeasonUnpublished',
'season:unpublished',
'inbox',
'season.unpublished',
'event',
'season'
);
public static TvshowPublished = new PublishServiceMessagingSettings(
'TvshowPublished',
'tvshow:published',
'inbox',
'tvshow.published',
'event',
'tvshow'
);
public static TvshowUnpublished = new PublishServiceMessagingSettings(
'TvshowUnpublished',
'tvshow:unpublished',
'inbox',
'tvshow.unpublished',
'event',
'tvshow'
);
public static EpisodePublished = new PublishServiceMessagingSettings(
'EpisodePublished',
'episode:published',
'inbox',
'episode.published',
'event',
'episode'
);
public static EpisodeUnpublished = new PublishServiceMessagingSettings(
'EpisodeUnpublished',
'episode:unpublished',
'inbox',
'episode.unpublished',
'event',
'episode'
);
public static CollectionPublished = new PublishServiceMessagingSettings(
'CollectionPublished',
'collection:published',
'inbox',
'collection.published',
'event',
'collection'
);
public static CollectionUnpublished = new PublishServiceMessagingSettings(
'CollectionUnpublished',
'collection:unpublished',
'inbox',
'collection.unpublished',
'event',
'collection'
);
public static MovieGenresPublished = new PublishServiceMessagingSettings(
'MovieGenresPublished',
'movie-genres:published',
'inbox',
'movie-genres.published',
'event',
'movie-genres'
);
public static MovieGenresUnpublished = new PublishServiceMessagingSettings(
'MovieGenresUnpublished',
'movie-genres:unpublished',
'inbox',
'movie-genres.unpublished',
'event',
'movie-genres'
);
public static TvshowGenresPublished = new PublishServiceMessagingSettings(
'TvshowGenresPublished',
'tvshow-genres:published',
'inbox',
'tvshow-genres.published',
'event',
'tvshow-genres'
);
public static TvshowGenresUnpublished = new PublishServiceMessagingSettings(
'TvshowGenresUnpublished',
'tvshow-genres:unpublished',
'inbox',
'tvshow-genres.unpublished',
'event',
'tvshow-genres'
Expand Down
4 changes: 2 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@
"yarn:de-dupe": "npx yarn-deduplicate yarn.lock"
},
"devDependencies": {
"@axinom/mosaic-cli": "0.29.0-rc.11",
"@axinom/mosaic-cli": "0.30.0-rc.0",
"@dbeining/react-atom": "^4.1.21",
"@jest/globals": "^29.5.0",
"@libre/atom": "^1.3.3",
Expand Down Expand Up @@ -106,4 +106,4 @@
"jest": "^29",
"jest-cli": "^29"
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
--! Previous: sha1:c9457b917399b9a43ac0fc04d0e130cf728bfcb7
--! Hash: sha1:a588014c99783319a56e38380e970b4877d2257c
--! Message: transactional-inbox

-- ____ _ _ ___ ___ __ __
-- |_ _|| \| || _ ) / _ \ \ \/ /
-- | | | . || _ \| (_) | > <
-- |___||_|\_||___/ \___/ /_/\_\

DROP TABLE IF EXISTS app_hidden.inbox CASCADE;
CREATE TABLE app_hidden.inbox (
id uuid PRIMARY KEY,
aggregate_type TEXT NOT NULL,
aggregate_id TEXT NOT NULL,
message_type TEXT NOT NULL,
segment TEXT,
concurrency TEXT NOT NULL DEFAULT 'sequential',
payload JSONB NOT NULL,
metadata JSONB,
locked_until TIMESTAMPTZ NOT NULL DEFAULT to_timestamp(0),
created_at TIMESTAMPTZ NOT NULL DEFAULT clock_timestamp(),
processed_at TIMESTAMPTZ,
abandoned_at TIMESTAMPTZ,
started_attempts smallint NOT NULL DEFAULT 0,
finished_attempts smallint NOT NULL DEFAULT 0
);
ALTER TABLE app_hidden.inbox ADD CONSTRAINT inbox_concurrency_check
CHECK (concurrency IN ('sequential', 'parallel'));

GRANT SELECT, INSERT, DELETE ON app_hidden.inbox TO :DATABASE_GQL_ROLE;
GRANT UPDATE (locked_until, processed_at, abandoned_at, started_attempts, finished_attempts) ON app_hidden.inbox TO :DATABASE_GQL_ROLE;
GRANT SELECT, INSERT, UPDATE, DELETE ON app_hidden.inbox TO :DB_OWNER;


-- Create the function to get the next batch of messages from the inbox table.
DROP FUNCTION IF EXISTS app_hidden.next_inbox_messages(integer, integer);
CREATE OR REPLACE FUNCTION app_hidden.next_inbox_messages(
max_size integer, lock_ms integer)
RETURNS SETOF app_hidden.inbox
LANGUAGE 'plpgsql'

AS $BODY$
DECLARE
loop_row app_hidden.inbox%ROWTYPE;
message_row app_hidden.inbox%ROWTYPE;
ids uuid[] := '{}';
BEGIN

IF max_size < 1 THEN
RAISE EXCEPTION 'The max_size for the next messages batch must be at least one.' using errcode = 'MAXNR';
END IF;

-- get (only) the oldest message of every segment but only return it if it is not locked
FOR loop_row IN
SELECT * FROM app_hidden.inbox m WHERE m.id in (SELECT DISTINCT ON (segment) id
FROM app_hidden.inbox
WHERE processed_at IS NULL AND abandoned_at IS NULL
ORDER BY segment, created_at) order by created_at
LOOP
BEGIN
EXIT WHEN cardinality(ids) >= max_size;

SELECT *
INTO message_row
FROM app_hidden.inbox
WHERE id = loop_row.id
FOR NO KEY UPDATE NOWAIT;-- throw/catch error when locked

IF message_row.locked_until > NOW() THEN
CONTINUE;
END IF;

ids := array_append(ids, message_row.id);
EXCEPTION
WHEN lock_not_available THEN
CONTINUE;
WHEN serialization_failure THEN
CONTINUE;
END;
END LOOP;

-- if max_size not reached: get the oldest parallelizable message independent of segment
IF cardinality(ids) < max_size THEN
FOR loop_row IN
SELECT * FROM app_hidden.inbox
WHERE concurrency = 'parallel' AND processed_at IS NULL AND abandoned_at IS NULL AND locked_until < NOW()
AND id NOT IN (SELECT UNNEST(ids))
order by created_at
LOOP
BEGIN
EXIT WHEN cardinality(ids) >= max_size;

SELECT *
INTO message_row
FROM app_hidden.inbox
WHERE id = loop_row.id
FOR NO KEY UPDATE NOWAIT;-- throw/catch error when locked

ids := array_append(ids, message_row.id);
EXCEPTION
WHEN lock_not_available THEN
CONTINUE;
WHEN serialization_failure THEN
CONTINUE;
END;
END LOOP;
END IF;

-- set a short lock value so the the workers can each process a message
IF cardinality(ids) > 0 THEN

RETURN QUERY
UPDATE app_hidden.inbox
SET locked_until = clock_timestamp() + (lock_ms || ' milliseconds')::INTERVAL, started_attempts = started_attempts + 1
WHERE ID = ANY(ids)
RETURNING *;

END IF;
END;
$BODY$;

SELECT ax_define.define_index('segment', 'inbox', 'app_hidden');
SELECT ax_define.define_index('created_at', 'inbox', 'app_hidden');
SELECT ax_define.define_index('processed_at', 'inbox', 'app_hidden');
SELECT ax_define.define_index('abandoned_at', 'inbox', 'app_hidden');
SELECT ax_define.define_index('locked_until', 'inbox', 'app_hidden');
Loading
Loading