Skip to content

Commit

Permalink
Merge pull request #252 from Axinom/feature/trx-inbox-outbox-entitlement
Browse files Browse the repository at this point in the history
[AB#43277]Transactional inbox outbox messaging for the Entitlement Service
  • Loading branch information
Zehelein authored Mar 5, 2024
2 parents f637694 + 6d3cfdf commit 23d4c89
Show file tree
Hide file tree
Showing 19 changed files with 1,150 additions and 327 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,248 @@
--! Previous: sha1:1db009b8adf828070d8f34c312b5bea7a61686b3
--! Hash: sha1:63f1b3dec39dc8c9d0b4125c90b55383e892e7f1
--! Message: transactional-outbox-inbox

-- ___ _ _ _____ ___ ___ __ __
-- / _ \ | | | ||_ _|| _ ) / _ \ \ \/ /
-- | (_) || |_| | | | | _ \| (_) | > <
-- \___/ \___/ |_| |___/ \___/ /_/\_\

DROP TABLE IF EXISTS app_hidden.outbox CASCADE;
CREATE TABLE app_hidden.outbox (
id uuid PRIMARY KEY,
aggregate_type TEXT NOT NULL,
aggregate_id TEXT NOT NULL,
message_type TEXT NOT NULL,
segment TEXT,
concurrency TEXT NOT NULL DEFAULT 'sequential',
payload JSONB NOT NULL,
metadata JSONB,
locked_until TIMESTAMPTZ NOT NULL DEFAULT to_timestamp(0),
created_at TIMESTAMPTZ NOT NULL DEFAULT clock_timestamp(),
processed_at TIMESTAMPTZ,
abandoned_at TIMESTAMPTZ,
started_attempts smallint NOT NULL DEFAULT 0,
finished_attempts smallint NOT NULL DEFAULT 0
);
ALTER TABLE app_hidden.outbox ADD CONSTRAINT outbox_concurrency_check
CHECK (concurrency IN ('sequential', 'parallel'));

GRANT SELECT, INSERT, DELETE ON app_hidden.outbox TO :DATABASE_GQL_ROLE;
GRANT UPDATE (locked_until, processed_at, abandoned_at, started_attempts, finished_attempts) ON app_hidden.outbox TO :DATABASE_GQL_ROLE;
GRANT SELECT, INSERT, UPDATE, DELETE ON app_hidden.outbox TO :DB_OWNER;

-- Create the function to get the next batch of messages from the outbox table.
DROP FUNCTION IF EXISTS app_hidden.next_outbox_messages(integer, integer);
CREATE OR REPLACE FUNCTION app_hidden.next_outbox_messages(
max_size integer, lock_ms integer)
RETURNS SETOF app_hidden.outbox
LANGUAGE 'plpgsql'

AS $BODY$
DECLARE
loop_row app_hidden.outbox%ROWTYPE;
message_row app_hidden.outbox%ROWTYPE;
ids uuid[] := '{}';
BEGIN

IF max_size < 1 THEN
RAISE EXCEPTION 'The max_size for the next messages batch must be at least one.' using errcode = 'MAXNR';
END IF;

-- get (only) the oldest message of every segment but only return it if it is not locked
FOR loop_row IN
SELECT * FROM app_hidden.outbox m WHERE m.id in (SELECT DISTINCT ON (segment) id
FROM app_hidden.outbox
WHERE processed_at IS NULL AND abandoned_at IS NULL
ORDER BY segment, created_at) order by created_at
LOOP
BEGIN
EXIT WHEN cardinality(ids) >= max_size;

SELECT *
INTO message_row
FROM app_hidden.outbox
WHERE id = loop_row.id
FOR NO KEY UPDATE NOWAIT; -- throw/catch error when locked

IF message_row.locked_until > NOW() THEN
CONTINUE;
END IF;

ids := array_append(ids, message_row.id);
EXCEPTION
WHEN lock_not_available THEN
CONTINUE;
WHEN serialization_failure THEN
CONTINUE;
END;
END LOOP;

-- if max_size not reached: get the oldest parallelizable message independent of segment
IF cardinality(ids) < max_size THEN
FOR loop_row IN
SELECT * FROM app_hidden.outbox
WHERE concurrency = 'parallel' AND processed_at IS NULL AND abandoned_at IS NULL AND locked_until < NOW()
AND id NOT IN (SELECT UNNEST(ids))
order by created_at
LOOP
BEGIN
EXIT WHEN cardinality(ids) >= max_size;

SELECT *
INTO message_row
FROM app_hidden.outbox
WHERE id = loop_row.id
FOR NO KEY UPDATE NOWAIT; -- throw/catch error when locked

ids := array_append(ids, message_row.id);
EXCEPTION
WHEN lock_not_available THEN
CONTINUE;
WHEN serialization_failure THEN
CONTINUE;
END;
END LOOP;
END IF;

-- set a short lock value so the the workers can each process a message
IF cardinality(ids) > 0 THEN

RETURN QUERY
UPDATE app_hidden.outbox
SET locked_until = clock_timestamp() + (lock_ms || ' milliseconds')::INTERVAL, started_attempts = started_attempts + 1
WHERE ID = ANY(ids)
RETURNING *;

END IF;
END;
$BODY$;

SELECT ax_define.define_index('segment', 'outbox', 'app_hidden');
SELECT ax_define.define_index('created_at', 'outbox', 'app_hidden');
SELECT ax_define.define_index('processed_at', 'outbox', 'app_hidden');
SELECT ax_define.define_index('abandoned_at', 'outbox', 'app_hidden');
SELECT ax_define.define_index('locked_until', 'outbox', 'app_hidden');

-- ____ _ _ ___ ___ __ __
-- |_ _|| \| || _ ) / _ \ \ \/ /
-- | | | . || _ \| (_) | > <
-- |___||_|\_||___/ \___/ /_/\_\

DROP TABLE IF EXISTS app_hidden.inbox CASCADE;
CREATE TABLE app_hidden.inbox (
id uuid PRIMARY KEY,
aggregate_type TEXT NOT NULL,
aggregate_id TEXT NOT NULL,
message_type TEXT NOT NULL,
segment TEXT,
concurrency TEXT NOT NULL DEFAULT 'sequential',
payload JSONB NOT NULL,
metadata JSONB,
locked_until TIMESTAMPTZ NOT NULL DEFAULT to_timestamp(0),
created_at TIMESTAMPTZ NOT NULL DEFAULT clock_timestamp(),
processed_at TIMESTAMPTZ,
abandoned_at TIMESTAMPTZ,
started_attempts smallint NOT NULL DEFAULT 0,
finished_attempts smallint NOT NULL DEFAULT 0
);
ALTER TABLE app_hidden.inbox ADD CONSTRAINT inbox_concurrency_check
CHECK (concurrency IN ('sequential', 'parallel'));

GRANT SELECT, INSERT, DELETE ON app_hidden.inbox TO :DATABASE_GQL_ROLE;
GRANT UPDATE (locked_until, processed_at, abandoned_at, started_attempts, finished_attempts) ON app_hidden.inbox TO :DATABASE_GQL_ROLE;
GRANT SELECT, INSERT, UPDATE, DELETE ON app_hidden.inbox TO :DB_OWNER;


-- Create the function to get the next batch of messages from the inbox table.
DROP FUNCTION IF EXISTS app_hidden.next_inbox_messages(integer, integer);
CREATE OR REPLACE FUNCTION app_hidden.next_inbox_messages(
max_size integer, lock_ms integer)
RETURNS SETOF app_hidden.inbox
LANGUAGE 'plpgsql'

AS $BODY$
DECLARE
loop_row app_hidden.inbox%ROWTYPE;
message_row app_hidden.inbox%ROWTYPE;
ids uuid[] := '{}';
BEGIN

IF max_size < 1 THEN
RAISE EXCEPTION 'The max_size for the next messages batch must be at least one.' using errcode = 'MAXNR';
END IF;

-- get (only) the oldest message of every segment but only return it if it is not locked
FOR loop_row IN
SELECT * FROM app_hidden.inbox m WHERE m.id in (SELECT DISTINCT ON (segment) id
FROM app_hidden.inbox
WHERE processed_at IS NULL AND abandoned_at IS NULL
ORDER BY segment, created_at) order by created_at
LOOP
BEGIN
EXIT WHEN cardinality(ids) >= max_size;

SELECT *
INTO message_row
FROM app_hidden.inbox
WHERE id = loop_row.id
FOR NO KEY UPDATE NOWAIT; -- throw/catch error when locked

IF message_row.locked_until > NOW() THEN
CONTINUE;
END IF;

ids := array_append(ids, message_row.id);
EXCEPTION
WHEN lock_not_available THEN
CONTINUE;
WHEN serialization_failure THEN
CONTINUE;
END;
END LOOP;

-- if max_size not reached: get the oldest parallelizable message independent of segment
IF cardinality(ids) < max_size THEN
FOR loop_row IN
SELECT * FROM app_hidden.inbox
WHERE concurrency = 'parallel' AND processed_at IS NULL AND abandoned_at IS NULL AND locked_until < NOW()
AND id NOT IN (SELECT UNNEST(ids))
order by created_at
LOOP
BEGIN
EXIT WHEN cardinality(ids) >= max_size;

SELECT *
INTO message_row
FROM app_hidden.inbox
WHERE id = loop_row.id
FOR NO KEY UPDATE NOWAIT; -- throw/catch error when locked

ids := array_append(ids, message_row.id);
EXCEPTION
WHEN lock_not_available THEN
CONTINUE;
WHEN serialization_failure THEN
CONTINUE;
END;
END LOOP;
END IF;

-- set a short lock value so the the workers can each process a message
IF cardinality(ids) > 0 THEN

RETURN QUERY
UPDATE app_hidden.inbox
SET locked_until = clock_timestamp() + (lock_ms || ' milliseconds')::INTERVAL, started_attempts = started_attempts + 1
WHERE ID = ANY(ids)
RETURNING *;

END IF;
END;
$BODY$;

SELECT ax_define.define_index('segment', 'inbox', 'app_hidden');
SELECT ax_define.define_index('created_at', 'inbox', 'app_hidden');
SELECT ax_define.define_index('processed_at', 'inbox', 'app_hidden');
SELECT ax_define.define_index('abandoned_at', 'inbox', 'app_hidden');
SELECT ax_define.define_index('locked_until', 'inbox', 'app_hidden');
6 changes: 4 additions & 2 deletions services/entitlement/service/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -47,12 +47,13 @@
},
"dependencies": {
"@axinom/mosaic-db-common": "0.34.0-rc.0",
"@axinom/mosaic-graphql-common": "0.10.0-rc.0",
"@axinom/mosaic-id-guard": "0.29.0-rc.0",
"@axinom/mosaic-id-utils": "0.15.14-rc.0",
"@axinom/mosaic-message-bus": "0.24.0-rc.0",
"@axinom/mosaic-messages": "0.40.0-rc.0",
"@axinom/mosaic-service-common": "0.46.0-rc.0",
"@axinom/mosaic-graphql-common": "0.10.0-rc.0",
"@axinom/mosaic-transactional-inbox-outbox": "0.6.0-rc.0",
"ajv": "^7.2.4",
"ajv-formats": "^1.6.1",
"@graphile-contrib/pg-simplify-inflector": "^6.1.0",
Expand All @@ -72,6 +73,7 @@
"jsonwebtoken": "^9.0.0",
"node-fetch": "^2.6.11",
"pg": "^8.11.0",
"pg-transactional-outbox": "0.5.1",
"pluralize": "^7.0.0",
"postgraphile": "4.13.0",
"postgraphile-plugin-atomic-mutations": "^1.0.4",
Expand Down Expand Up @@ -112,4 +114,4 @@
"ts-node": "^10.9.1",
"tsc-watch": "^4.6.2"
}
}
}
2 changes: 1 addition & 1 deletion services/entitlement/service/scripts/codegen-zapatos.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ async function main(): Promise<void> {
schemas: {
app_public: { include: '*', exclude: [] },
ax_utils: { include: '*', exclude: [] },
app_hidden: { include: '*', exclude: [] },
app_hidden: { include: '*', exclude: ['inbox', 'outbox'] }, // excluding the 'inbox' and 'outbox' storages
app_private: { include: '*', exclude: [] },
public: { include: '*', exclude: [] },
},
Expand Down
Original file line number Diff line number Diff line change
@@ -1,28 +1,34 @@
import { Broker } from '@axinom/mosaic-message-bus';
import { OwnerPgPool } from '@axinom/mosaic-db-common';
import {
MonetizationGrantsServiceMultiTenantMessagingSettings,
SynchronizeClaimDefinitionsCommand,
} from '@axinom/mosaic-messages';
import { StoreOutboxMessage } from '@axinom/mosaic-transactional-inbox-outbox';
import { Config, requestServiceAccountToken } from '../../common';
import { claimDefinitionGroups } from './claim-definition-groups';

export const syncClaimDefinitions = async (
broker: Broker,
storeOutboxMessage: StoreOutboxMessage,
loginPool: OwnerPgPool,
config: Config,
): Promise<void> => {
const serviceAccountToken = await requestServiceAccountToken(config);
const settings =
MonetizationGrantsServiceMultiTenantMessagingSettings.SynchronizeClaimDefinitions;
await broker.publish<SynchronizeClaimDefinitionsCommand>(
// for a single query Postgres creates a single transaction - no need to manually create one
await storeOutboxMessage<SynchronizeClaimDefinitionsCommand>(
config.environmentId,
settings,
{ claim_definition_groups: claimDefinitionGroups },
{ auth_token: serviceAccountToken.accessToken },
loginPool,
{
routingKey: settings.getEnvironmentRoutingKey({
tenantId: config.tenantId,
environmentId: config.environmentId,
}),
envelopeOverrides: { auth_token: serviceAccountToken.accessToken },
options: {
routingKey: settings.getEnvironmentRoutingKey({
tenantId: config.tenantId,
environmentId: config.environmentId,
}),
},
},
);
};
Loading

0 comments on commit 23d4c89

Please sign in to comment.