[AB#43277] Transactional inbox outbox messaging for the Entitlement Service #252

Merged · 2 commits · Mar 5, 2024
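
For context: with the transactional outbox/inbox pattern the service stores outgoing and incoming messages in its own database, so a message commits or rolls back together with the business change that produced it; a background relay then polls those tables and forwards the messages to the message broker. The migration below creates the app_hidden.outbox and app_hidden.inbox tables and their polling functions. A rough sketch of the producing side follows; it is not code from this PR, and the app_public.entitlements table, the ids and the payload are made up for illustration.

BEGIN;

-- 1) the business change (hypothetical table and row)
UPDATE app_public.entitlements
SET claims = '["ENTITY_USER"]'::jsonb
WHERE id = 'c0a80121-0000-4000-8000-000000000001';

-- 2) the outbox message, committed atomically with the change above
INSERT INTO app_hidden.outbox (id, aggregate_type, aggregate_id, message_type, payload)
VALUES (
  '1b4e28ba-2fa1-11d2-883f-0016d3cca427',
  'entitlement',                            -- hypothetical aggregate type
  'c0a80121-0000-4000-8000-000000000001',
  'EntitlementUpdated',                     -- hypothetical message type
  '{"claims": ["ENTITY_USER"]}'::jsonb
);

COMMIT;

If the transaction rolls back, neither the data change nor the message survives; if it commits, the relay will eventually pick the message up via app_hidden.next_outbox_messages (defined below).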
@@ -0,0 +1,248 @@
--! Previous: sha1:1db009b8adf828070d8f34c312b5bea7a61686b3
--! Hash: sha1:63f1b3dec39dc8c9d0b4125c90b55383e892e7f1
--! Message: transactional-outbox-inbox

-- ___ _ _ _____ ___ ___ __ __
-- / _ \ | | | ||_ _|| _ ) / _ \ \ \/ /
-- | (_) || |_| | | | | _ \| (_) | > <
-- \___/ \___/ |_| |___/ \___/ /_/\_\

DROP TABLE IF EXISTS app_hidden.outbox CASCADE;
CREATE TABLE app_hidden.outbox (
id uuid PRIMARY KEY,
aggregate_type TEXT NOT NULL,
aggregate_id TEXT NOT NULL,
message_type TEXT NOT NULL,
segment TEXT,
concurrency TEXT NOT NULL DEFAULT 'sequential',
payload JSONB NOT NULL,
metadata JSONB,
locked_until TIMESTAMPTZ NOT NULL DEFAULT to_timestamp(0),
created_at TIMESTAMPTZ NOT NULL DEFAULT clock_timestamp(),
processed_at TIMESTAMPTZ,
abandoned_at TIMESTAMPTZ,
started_attempts smallint NOT NULL DEFAULT 0,
finished_attempts smallint NOT NULL DEFAULT 0
);
ALTER TABLE app_hidden.outbox ADD CONSTRAINT outbox_concurrency_check
CHECK (concurrency IN ('sequential', 'parallel'));

GRANT SELECT, INSERT, DELETE ON app_hidden.outbox TO :DATABASE_GQL_ROLE;
GRANT UPDATE (locked_until, processed_at, abandoned_at, started_attempts, finished_attempts) ON app_hidden.outbox TO :DATABASE_GQL_ROLE;
GRANT SELECT, INSERT, UPDATE, DELETE ON app_hidden.outbox TO :DB_OWNER;

-- Create the function to get the next batch of messages from the outbox table.
DROP FUNCTION IF EXISTS app_hidden.next_outbox_messages(integer, integer);
CREATE OR REPLACE FUNCTION app_hidden.next_outbox_messages(
max_size integer, lock_ms integer)
RETURNS SETOF app_hidden.outbox
LANGUAGE 'plpgsql'

AS $BODY$
DECLARE
loop_row app_hidden.outbox%ROWTYPE;
message_row app_hidden.outbox%ROWTYPE;
ids uuid[] := '{}';
BEGIN

IF max_size < 1 THEN
RAISE EXCEPTION 'The max_size for the next messages batch must be at least one.' using errcode = 'MAXNR';
END IF;

-- get only the oldest message of each segment, but return it only if it is not locked
FOR loop_row IN
SELECT * FROM app_hidden.outbox m WHERE m.id in (SELECT DISTINCT ON (segment) id
FROM app_hidden.outbox
WHERE processed_at IS NULL AND abandoned_at IS NULL
ORDER BY segment, created_at) order by created_at
LOOP
BEGIN
EXIT WHEN cardinality(ids) >= max_size;

SELECT *
INTO message_row
FROM app_hidden.outbox
WHERE id = loop_row.id
FOR NO KEY UPDATE NOWAIT; -- throw/catch error when locked

IF message_row.locked_until > NOW() THEN
CONTINUE;
END IF;

ids := array_append(ids, message_row.id);
EXCEPTION
WHEN lock_not_available THEN
CONTINUE;
WHEN serialization_failure THEN
CONTINUE;
END;
END LOOP;

-- if max_size is not reached: get the oldest parallelizable messages regardless of segment
IF cardinality(ids) < max_size THEN
FOR loop_row IN
SELECT * FROM app_hidden.outbox
WHERE concurrency = 'parallel' AND processed_at IS NULL AND abandoned_at IS NULL AND locked_until < NOW()
AND id NOT IN (SELECT UNNEST(ids))
order by created_at
LOOP
BEGIN
EXIT WHEN cardinality(ids) >= max_size;

SELECT *
INTO message_row
FROM app_hidden.outbox
WHERE id = loop_row.id
FOR NO KEY UPDATE NOWAIT; -- throw/catch error when locked

ids := array_append(ids, message_row.id);
EXCEPTION
WHEN lock_not_available THEN
CONTINUE;
WHEN serialization_failure THEN
CONTINUE;
END;
END LOOP;
END IF;

-- set a short lock value so that the workers can each process a message
IF cardinality(ids) > 0 THEN

RETURN QUERY
UPDATE app_hidden.outbox
SET locked_until = clock_timestamp() + (lock_ms || ' milliseconds')::INTERVAL, started_attempts = started_attempts + 1
WHERE ID = ANY(ids)
RETURNING *;

END IF;
END;
$BODY$;

SELECT ax_define.define_index('segment', 'outbox', 'app_hidden');
SELECT ax_define.define_index('created_at', 'outbox', 'app_hidden');
SELECT ax_define.define_index('processed_at', 'outbox', 'app_hidden');
SELECT ax_define.define_index('abandoned_at', 'outbox', 'app_hidden');
SELECT ax_define.define_index('locked_until', 'outbox', 'app_hidden');

-- ____ _ _ ___ ___ __ __
-- |_ _|| \| || _ ) / _ \ \ \/ /
-- | | | . || _ \| (_) | > <
-- |___||_|\_||___/ \___/ /_/\_\

DROP TABLE IF EXISTS app_hidden.inbox CASCADE;
CREATE TABLE app_hidden.inbox (
id uuid PRIMARY KEY,
aggregate_type TEXT NOT NULL,
aggregate_id TEXT NOT NULL,
message_type TEXT NOT NULL,
segment TEXT,
concurrency TEXT NOT NULL DEFAULT 'sequential',
payload JSONB NOT NULL,
metadata JSONB,
locked_until TIMESTAMPTZ NOT NULL DEFAULT to_timestamp(0),
created_at TIMESTAMPTZ NOT NULL DEFAULT clock_timestamp(),
processed_at TIMESTAMPTZ,
abandoned_at TIMESTAMPTZ,
started_attempts smallint NOT NULL DEFAULT 0,
finished_attempts smallint NOT NULL DEFAULT 0
);
ALTER TABLE app_hidden.inbox ADD CONSTRAINT inbox_concurrency_check
CHECK (concurrency IN ('sequential', 'parallel'));

GRANT SELECT, INSERT, DELETE ON app_hidden.inbox TO :DATABASE_GQL_ROLE;
GRANT UPDATE (locked_until, processed_at, abandoned_at, started_attempts, finished_attempts) ON app_hidden.inbox TO :DATABASE_GQL_ROLE;
GRANT SELECT, INSERT, UPDATE, DELETE ON app_hidden.inbox TO :DB_OWNER;


-- Create the function to get the next batch of messages from the inbox table.
DROP FUNCTION IF EXISTS app_hidden.next_inbox_messages(integer, integer);
CREATE OR REPLACE FUNCTION app_hidden.next_inbox_messages(
max_size integer, lock_ms integer)
RETURNS SETOF app_hidden.inbox
LANGUAGE 'plpgsql'

AS $BODY$
DECLARE
loop_row app_hidden.inbox%ROWTYPE;
message_row app_hidden.inbox%ROWTYPE;
ids uuid[] := '{}';
BEGIN

IF max_size < 1 THEN
RAISE EXCEPTION 'The max_size for the next messages batch must be at least one.' using errcode = 'MAXNR';
END IF;

-- get only the oldest message of each segment, but return it only if it is not locked
FOR loop_row IN
SELECT * FROM app_hidden.inbox m WHERE m.id in (SELECT DISTINCT ON (segment) id
FROM app_hidden.inbox
WHERE processed_at IS NULL AND abandoned_at IS NULL
ORDER BY segment, created_at) order by created_at
LOOP
BEGIN
EXIT WHEN cardinality(ids) >= max_size;

SELECT *
INTO message_row
FROM app_hidden.inbox
WHERE id = loop_row.id
FOR NO KEY UPDATE NOWAIT; -- throw/catch error when locked

IF message_row.locked_until > NOW() THEN
CONTINUE;
END IF;

ids := array_append(ids, message_row.id);
EXCEPTION
WHEN lock_not_available THEN
CONTINUE;
WHEN serialization_failure THEN
CONTINUE;
END;
END LOOP;

-- if max_size is not reached: get the oldest parallelizable messages regardless of segment
IF cardinality(ids) < max_size THEN
FOR loop_row IN
SELECT * FROM app_hidden.inbox
WHERE concurrency = 'parallel' AND processed_at IS NULL AND abandoned_at IS NULL AND locked_until < NOW()
AND id NOT IN (SELECT UNNEST(ids))
order by created_at
LOOP
BEGIN
EXIT WHEN cardinality(ids) >= max_size;

SELECT *
INTO message_row
FROM app_hidden.inbox
WHERE id = loop_row.id
FOR NO KEY UPDATE NOWAIT; -- throw/catch error when locked

ids := array_append(ids, message_row.id);
EXCEPTION
WHEN lock_not_available THEN
CONTINUE;
WHEN serialization_failure THEN
CONTINUE;
END;
END LOOP;
END IF;

-- set a short lock value so that the workers can each process a message
IF cardinality(ids) > 0 THEN

RETURN QUERY
UPDATE app_hidden.inbox
SET locked_until = clock_timestamp() + (lock_ms || ' milliseconds')::INTERVAL, started_attempts = started_attempts + 1
WHERE ID = ANY(ids)
RETURNING *;

END IF;
END;
$BODY$;

SELECT ax_define.define_index('segment', 'inbox', 'app_hidden');
SELECT ax_define.define_index('created_at', 'inbox', 'app_hidden');
SELECT ax_define.define_index('processed_at', 'inbox', 'app_hidden');
SELECT ax_define.define_index('abandoned_at', 'inbox', 'app_hidden');
SELECT ax_define.define_index('locked_until', 'inbox', 'app_hidden');
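
A note on intended usage: the polling calls below are grounded in the functions defined above, while the completion updates are an assumption about the bookkeeping that the pg-transactional-outbox library performs through the granted UPDATE columns; they are not part of this migration.

-- A relay or handler fetches and locks the next batch, e.g. up to 10 messages for 5 seconds:
SELECT * FROM app_hidden.next_outbox_messages(10, 5000);
SELECT * FROM app_hidden.next_inbox_messages(10, 5000);

-- Assumed bookkeeping after a message was handled successfully (illustrative id):
UPDATE app_hidden.inbox
SET processed_at = clock_timestamp(), finished_attempts = finished_attempts + 1
WHERE id = '1b4e28ba-2fa1-11d2-883f-0016d3cca427';

-- Assumed bookkeeping when a message is given up on after repeated failures:
UPDATE app_hidden.inbox
SET abandoned_at = clock_timestamp(), finished_attempts = finished_attempts + 1
WHERE id = '1b4e28ba-2fa1-11d2-883f-0016d3cca427';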
6 changes: 4 additions & 2 deletions services/entitlement/service/package.json
@@ -47,12 +47,13 @@
   },
   "dependencies": {
     "@axinom/mosaic-db-common": "0.34.0-rc.0",
+    "@axinom/mosaic-graphql-common": "0.10.0-rc.0",
     "@axinom/mosaic-id-guard": "0.29.0-rc.0",
     "@axinom/mosaic-id-utils": "0.15.14-rc.0",
     "@axinom/mosaic-message-bus": "0.24.0-rc.0",
     "@axinom/mosaic-messages": "0.40.0-rc.0",
     "@axinom/mosaic-service-common": "0.46.0-rc.0",
-    "@axinom/mosaic-graphql-common": "0.10.0-rc.0",
+    "@axinom/mosaic-transactional-inbox-outbox": "0.6.0-rc.0",
     "ajv": "^7.2.4",
     "ajv-formats": "^1.6.1",
     "@graphile-contrib/pg-simplify-inflector": "^6.1.0",
@@ -72,6 +73,7 @@
"jsonwebtoken": "^9.0.0",
"node-fetch": "^2.6.11",
"pg": "^8.11.0",
"pg-transactional-outbox": "0.5.1",
"pluralize": "^7.0.0",
"postgraphile": "4.13.0",
"postgraphile-plugin-atomic-mutations": "^1.0.4",
@@ -112,4 +114,4 @@
"ts-node": "^10.9.1",
"tsc-watch": "^4.6.2"
}
}
}
2 changes: 1 addition & 1 deletion services/entitlement/service/scripts/codegen-zapatos.ts
@@ -30,7 +30,7 @@ async function main(): Promise<void> {
     schemas: {
       app_public: { include: '*', exclude: [] },
       ax_utils: { include: '*', exclude: [] },
-      app_hidden: { include: '*', exclude: [] },
+      app_hidden: { include: '*', exclude: ['inbox', 'outbox'] }, // excluding the 'inbox' and 'outbox' storages
       app_private: { include: '*', exclude: [] },
       public: { include: '*', exclude: [] },
     },
@@ -1,28 +1,34 @@
-import { Broker } from '@axinom/mosaic-message-bus';
+import { OwnerPgPool } from '@axinom/mosaic-db-common';
 import {
   MonetizationGrantsServiceMultiTenantMessagingSettings,
   SynchronizeClaimDefinitionsCommand,
 } from '@axinom/mosaic-messages';
+import { StoreOutboxMessage } from '@axinom/mosaic-transactional-inbox-outbox';
 import { Config, requestServiceAccountToken } from '../../common';
 import { claimDefinitionGroups } from './claim-definition-groups';
 
 export const syncClaimDefinitions = async (
-  broker: Broker,
+  storeOutboxMessage: StoreOutboxMessage,
+  loginPool: OwnerPgPool,
   config: Config,
 ): Promise<void> => {
   const serviceAccountToken = await requestServiceAccountToken(config);
   const settings =
     MonetizationGrantsServiceMultiTenantMessagingSettings.SynchronizeClaimDefinitions;
-  await broker.publish<SynchronizeClaimDefinitionsCommand>(
+  // for a single query Postgres creates a single transaction - no need to manually create one
+  await storeOutboxMessage<SynchronizeClaimDefinitionsCommand>(
     config.environmentId,
     settings,
     { claim_definition_groups: claimDefinitionGroups },
-    { auth_token: serviceAccountToken.accessToken },
+    loginPool,
     {
-      routingKey: settings.getEnvironmentRoutingKey({
-        tenantId: config.tenantId,
-        environmentId: config.environmentId,
-      }),
+      envelopeOverrides: { auth_token: serviceAccountToken.accessToken },
+      options: {
+        routingKey: settings.getEnvironmentRoutingKey({
+          tenantId: config.tenantId,
+          environmentId: config.environmentId,
+        }),
+      },
     },
   );
 };