diff --git a/services/entitlement/service/migrations/committed/000007-transactional-outbox-inbox.sql b/services/entitlement/service/migrations/committed/000007-transactional-outbox-inbox.sql new file mode 100644 index 00000000..bddb1f5b --- /dev/null +++ b/services/entitlement/service/migrations/committed/000007-transactional-outbox-inbox.sql @@ -0,0 +1,248 @@ +--! Previous: sha1:1db009b8adf828070d8f34c312b5bea7a61686b3 +--! Hash: sha1:63f1b3dec39dc8c9d0b4125c90b55383e892e7f1 +--! Message: transactional-outbox-inbox + +-- ___ _ _ _____ ___ ___ __ __ +-- / _ \ | | | ||_ _|| _ ) / _ \ \ \/ / +-- | (_) || |_| | | | | _ \| (_) | > < +-- \___/ \___/ |_| |___/ \___/ /_/\_\ + +DROP TABLE IF EXISTS app_hidden.outbox CASCADE; +CREATE TABLE app_hidden.outbox ( + id uuid PRIMARY KEY, + aggregate_type TEXT NOT NULL, + aggregate_id TEXT NOT NULL, + message_type TEXT NOT NULL, + segment TEXT, + concurrency TEXT NOT NULL DEFAULT 'sequential', + payload JSONB NOT NULL, + metadata JSONB, + locked_until TIMESTAMPTZ NOT NULL DEFAULT to_timestamp(0), + created_at TIMESTAMPTZ NOT NULL DEFAULT clock_timestamp(), + processed_at TIMESTAMPTZ, + abandoned_at TIMESTAMPTZ, + started_attempts smallint NOT NULL DEFAULT 0, + finished_attempts smallint NOT NULL DEFAULT 0 +); +ALTER TABLE app_hidden.outbox ADD CONSTRAINT outbox_concurrency_check + CHECK (concurrency IN ('sequential', 'parallel')); + +GRANT SELECT, INSERT, DELETE ON app_hidden.outbox TO :DATABASE_GQL_ROLE; +GRANT UPDATE (locked_until, processed_at, abandoned_at, started_attempts, finished_attempts) ON app_hidden.outbox TO :DATABASE_GQL_ROLE; +GRANT SELECT, INSERT, UPDATE, DELETE ON app_hidden.outbox TO :DB_OWNER; + +-- Create the function to get the next batch of messages from the outbox table. +DROP FUNCTION IF EXISTS app_hidden.next_outbox_messages(integer, integer); +CREATE OR REPLACE FUNCTION app_hidden.next_outbox_messages( + max_size integer, lock_ms integer) + RETURNS SETOF app_hidden.outbox + LANGUAGE 'plpgsql' + +AS $BODY$ +DECLARE + loop_row app_hidden.outbox%ROWTYPE; + message_row app_hidden.outbox%ROWTYPE; + ids uuid[] := '{}'; +BEGIN + + IF max_size < 1 THEN + RAISE EXCEPTION 'The max_size for the next messages batch must be at least one.' using errcode = 'MAXNR'; + END IF; + + -- get (only) the oldest message of every segment but only return it if it is not locked + FOR loop_row IN + SELECT * FROM app_hidden.outbox m WHERE m.id in (SELECT DISTINCT ON (segment) id + FROM app_hidden.outbox + WHERE processed_at IS NULL AND abandoned_at IS NULL + ORDER BY segment, created_at) order by created_at + LOOP + BEGIN + EXIT WHEN cardinality(ids) >= max_size; + + SELECT * + INTO message_row + FROM app_hidden.outbox + WHERE id = loop_row.id + FOR NO KEY UPDATE NOWAIT; -- throw/catch error when locked + + IF message_row.locked_until > NOW() THEN + CONTINUE; + END IF; + + ids := array_append(ids, message_row.id); + EXCEPTION + WHEN lock_not_available THEN + CONTINUE; + WHEN serialization_failure THEN + CONTINUE; + END; + END LOOP; + + -- if max_size not reached: get the oldest parallelizable message independent of segment + IF cardinality(ids) < max_size THEN + FOR loop_row IN + SELECT * FROM app_hidden.outbox + WHERE concurrency = 'parallel' AND processed_at IS NULL AND abandoned_at IS NULL AND locked_until < NOW() + AND id NOT IN (SELECT UNNEST(ids)) + order by created_at + LOOP + BEGIN + EXIT WHEN cardinality(ids) >= max_size; + + SELECT * + INTO message_row + FROM app_hidden.outbox + WHERE id = loop_row.id + FOR NO KEY UPDATE NOWAIT; -- throw/catch error when locked + + ids := array_append(ids, message_row.id); + EXCEPTION + WHEN lock_not_available THEN + CONTINUE; + WHEN serialization_failure THEN + CONTINUE; + END; + END LOOP; + END IF; + + -- set a short lock value so the the workers can each process a message + IF cardinality(ids) > 0 THEN + + RETURN QUERY + UPDATE app_hidden.outbox + SET locked_until = clock_timestamp() + (lock_ms || ' milliseconds')::INTERVAL, started_attempts = started_attempts + 1 + WHERE ID = ANY(ids) + RETURNING *; + + END IF; +END; +$BODY$; + +SELECT ax_define.define_index('segment', 'outbox', 'app_hidden'); +SELECT ax_define.define_index('created_at', 'outbox', 'app_hidden'); +SELECT ax_define.define_index('processed_at', 'outbox', 'app_hidden'); +SELECT ax_define.define_index('abandoned_at', 'outbox', 'app_hidden'); +SELECT ax_define.define_index('locked_until', 'outbox', 'app_hidden'); + +-- ____ _ _ ___ ___ __ __ +-- |_ _|| \| || _ ) / _ \ \ \/ / +-- | | | . || _ \| (_) | > < +-- |___||_|\_||___/ \___/ /_/\_\ + +DROP TABLE IF EXISTS app_hidden.inbox CASCADE; +CREATE TABLE app_hidden.inbox ( + id uuid PRIMARY KEY, + aggregate_type TEXT NOT NULL, + aggregate_id TEXT NOT NULL, + message_type TEXT NOT NULL, + segment TEXT, + concurrency TEXT NOT NULL DEFAULT 'sequential', + payload JSONB NOT NULL, + metadata JSONB, + locked_until TIMESTAMPTZ NOT NULL DEFAULT to_timestamp(0), + created_at TIMESTAMPTZ NOT NULL DEFAULT clock_timestamp(), + processed_at TIMESTAMPTZ, + abandoned_at TIMESTAMPTZ, + started_attempts smallint NOT NULL DEFAULT 0, + finished_attempts smallint NOT NULL DEFAULT 0 +); +ALTER TABLE app_hidden.inbox ADD CONSTRAINT inbox_concurrency_check + CHECK (concurrency IN ('sequential', 'parallel')); + +GRANT SELECT, INSERT, DELETE ON app_hidden.inbox TO :DATABASE_GQL_ROLE; +GRANT UPDATE (locked_until, processed_at, abandoned_at, started_attempts, finished_attempts) ON app_hidden.inbox TO :DATABASE_GQL_ROLE; +GRANT SELECT, INSERT, UPDATE, DELETE ON app_hidden.inbox TO :DB_OWNER; + + +-- Create the function to get the next batch of messages from the inbox table. +DROP FUNCTION IF EXISTS app_hidden.next_inbox_messages(integer, integer); +CREATE OR REPLACE FUNCTION app_hidden.next_inbox_messages( + max_size integer, lock_ms integer) + RETURNS SETOF app_hidden.inbox + LANGUAGE 'plpgsql' + +AS $BODY$ +DECLARE + loop_row app_hidden.inbox%ROWTYPE; + message_row app_hidden.inbox%ROWTYPE; + ids uuid[] := '{}'; +BEGIN + + IF max_size < 1 THEN + RAISE EXCEPTION 'The max_size for the next messages batch must be at least one.' using errcode = 'MAXNR'; + END IF; + + -- get (only) the oldest message of every segment but only return it if it is not locked + FOR loop_row IN + SELECT * FROM app_hidden.inbox m WHERE m.id in (SELECT DISTINCT ON (segment) id + FROM app_hidden.inbox + WHERE processed_at IS NULL AND abandoned_at IS NULL + ORDER BY segment, created_at) order by created_at + LOOP + BEGIN + EXIT WHEN cardinality(ids) >= max_size; + + SELECT * + INTO message_row + FROM app_hidden.inbox + WHERE id = loop_row.id + FOR NO KEY UPDATE NOWAIT; -- throw/catch error when locked + + IF message_row.locked_until > NOW() THEN + CONTINUE; + END IF; + + ids := array_append(ids, message_row.id); + EXCEPTION + WHEN lock_not_available THEN + CONTINUE; + WHEN serialization_failure THEN + CONTINUE; + END; + END LOOP; + + -- if max_size not reached: get the oldest parallelizable message independent of segment + IF cardinality(ids) < max_size THEN + FOR loop_row IN + SELECT * FROM app_hidden.inbox + WHERE concurrency = 'parallel' AND processed_at IS NULL AND abandoned_at IS NULL AND locked_until < NOW() + AND id NOT IN (SELECT UNNEST(ids)) + order by created_at + LOOP + BEGIN + EXIT WHEN cardinality(ids) >= max_size; + + SELECT * + INTO message_row + FROM app_hidden.inbox + WHERE id = loop_row.id + FOR NO KEY UPDATE NOWAIT; -- throw/catch error when locked + + ids := array_append(ids, message_row.id); + EXCEPTION + WHEN lock_not_available THEN + CONTINUE; + WHEN serialization_failure THEN + CONTINUE; + END; + END LOOP; + END IF; + + -- set a short lock value so the the workers can each process a message + IF cardinality(ids) > 0 THEN + + RETURN QUERY + UPDATE app_hidden.inbox + SET locked_until = clock_timestamp() + (lock_ms || ' milliseconds')::INTERVAL, started_attempts = started_attempts + 1 + WHERE ID = ANY(ids) + RETURNING *; + + END IF; +END; +$BODY$; + +SELECT ax_define.define_index('segment', 'inbox', 'app_hidden'); +SELECT ax_define.define_index('created_at', 'inbox', 'app_hidden'); +SELECT ax_define.define_index('processed_at', 'inbox', 'app_hidden'); +SELECT ax_define.define_index('abandoned_at', 'inbox', 'app_hidden'); +SELECT ax_define.define_index('locked_until', 'inbox', 'app_hidden'); diff --git a/services/entitlement/service/package.json b/services/entitlement/service/package.json index 2283755a..241677ef 100644 --- a/services/entitlement/service/package.json +++ b/services/entitlement/service/package.json @@ -47,12 +47,13 @@ }, "dependencies": { "@axinom/mosaic-db-common": "0.34.0-rc.0", + "@axinom/mosaic-graphql-common": "0.10.0-rc.0", "@axinom/mosaic-id-guard": "0.29.0-rc.0", "@axinom/mosaic-id-utils": "0.15.14-rc.0", "@axinom/mosaic-message-bus": "0.24.0-rc.0", "@axinom/mosaic-messages": "0.40.0-rc.0", "@axinom/mosaic-service-common": "0.46.0-rc.0", - "@axinom/mosaic-graphql-common": "0.10.0-rc.0", + "@axinom/mosaic-transactional-inbox-outbox": "0.6.0-rc.0", "ajv": "^7.2.4", "ajv-formats": "^1.6.1", "@graphile-contrib/pg-simplify-inflector": "^6.1.0", @@ -72,6 +73,7 @@ "jsonwebtoken": "^9.0.0", "node-fetch": "^2.6.11", "pg": "^8.11.0", + "pg-transactional-outbox": "0.5.1", "pluralize": "^7.0.0", "postgraphile": "4.13.0", "postgraphile-plugin-atomic-mutations": "^1.0.4", @@ -112,4 +114,4 @@ "ts-node": "^10.9.1", "tsc-watch": "^4.6.2" } -} \ No newline at end of file +} diff --git a/services/entitlement/service/scripts/codegen-zapatos.ts b/services/entitlement/service/scripts/codegen-zapatos.ts index a0ae4e0e..30d764a6 100644 --- a/services/entitlement/service/scripts/codegen-zapatos.ts +++ b/services/entitlement/service/scripts/codegen-zapatos.ts @@ -30,7 +30,7 @@ async function main(): Promise { schemas: { app_public: { include: '*', exclude: [] }, ax_utils: { include: '*', exclude: [] }, - app_hidden: { include: '*', exclude: [] }, + app_hidden: { include: '*', exclude: ['inbox', 'outbox'] }, // excluding the 'inbox' and 'outbox' storages app_private: { include: '*', exclude: [] }, public: { include: '*', exclude: [] }, }, diff --git a/services/entitlement/service/src/domains/monetization/sync-claim-definitions.ts b/services/entitlement/service/src/domains/monetization/sync-claim-definitions.ts index 664ecbb9..e02fc272 100644 --- a/services/entitlement/service/src/domains/monetization/sync-claim-definitions.ts +++ b/services/entitlement/service/src/domains/monetization/sync-claim-definitions.ts @@ -1,28 +1,34 @@ -import { Broker } from '@axinom/mosaic-message-bus'; +import { OwnerPgPool } from '@axinom/mosaic-db-common'; import { MonetizationGrantsServiceMultiTenantMessagingSettings, SynchronizeClaimDefinitionsCommand, } from '@axinom/mosaic-messages'; +import { StoreOutboxMessage } from '@axinom/mosaic-transactional-inbox-outbox'; import { Config, requestServiceAccountToken } from '../../common'; import { claimDefinitionGroups } from './claim-definition-groups'; export const syncClaimDefinitions = async ( - broker: Broker, + storeOutboxMessage: StoreOutboxMessage, + loginPool: OwnerPgPool, config: Config, ): Promise => { const serviceAccountToken = await requestServiceAccountToken(config); const settings = MonetizationGrantsServiceMultiTenantMessagingSettings.SynchronizeClaimDefinitions; - await broker.publish( + // for a single query Postgres creates a single transaction - no need to manually create one + await storeOutboxMessage( config.environmentId, settings, { claim_definition_groups: claimDefinitionGroups }, - { auth_token: serviceAccountToken.accessToken }, + loginPool, { - routingKey: settings.getEnvironmentRoutingKey({ - tenantId: config.tenantId, - environmentId: config.environmentId, - }), + envelopeOverrides: { auth_token: serviceAccountToken.accessToken }, + options: { + routingKey: settings.getEnvironmentRoutingKey({ + tenantId: config.tenantId, + environmentId: config.environmentId, + }), + }, }, ); }; diff --git a/services/entitlement/service/src/generated/db/schema.sql b/services/entitlement/service/src/generated/db/schema.sql index 1328a38a..58cc17eb 100644 --- a/services/entitlement/service/src/generated/db/schema.sql +++ b/services/entitlement/service/src/generated/db/schema.sql @@ -100,6 +100,230 @@ CREATE EXTENSION IF NOT EXISTS "uuid-ossp" WITH SCHEMA public; COMMENT ON EXTENSION "uuid-ossp" IS 'generate universally unique identifiers (UUIDs)'; +SET default_tablespace = ''; + +SET default_with_oids = false; + +-- +-- Name: inbox; Type: TABLE; Schema: app_hidden; Owner: - +-- + +CREATE TABLE app_hidden.inbox ( + id uuid NOT NULL, + aggregate_type text NOT NULL, + aggregate_id text NOT NULL, + message_type text NOT NULL, + segment text, + concurrency text DEFAULT 'sequential'::text NOT NULL, + payload jsonb NOT NULL, + metadata jsonb, + locked_until timestamp with time zone DEFAULT to_timestamp((0)::double precision) NOT NULL, + created_at timestamp with time zone DEFAULT clock_timestamp() NOT NULL, + processed_at timestamp with time zone, + abandoned_at timestamp with time zone, + started_attempts smallint DEFAULT 0 NOT NULL, + finished_attempts smallint DEFAULT 0 NOT NULL, + CONSTRAINT inbox_concurrency_check CHECK ((concurrency = ANY (ARRAY['sequential'::text, 'parallel'::text]))) +); + + +-- +-- Name: next_inbox_messages(integer, integer); Type: FUNCTION; Schema: app_hidden; Owner: - +-- + +CREATE FUNCTION app_hidden.next_inbox_messages(max_size integer, lock_ms integer) RETURNS SETOF app_hidden.inbox + LANGUAGE plpgsql + AS $$ +DECLARE + loop_row app_hidden.inbox%ROWTYPE; + message_row app_hidden.inbox%ROWTYPE; + ids uuid[] := '{}'; +BEGIN + + IF max_size < 1 THEN + RAISE EXCEPTION 'The max_size for the next messages batch must be at least one.' using errcode = 'MAXNR'; + END IF; + + -- get (only) the oldest message of every segment but only return it if it is not locked + FOR loop_row IN + SELECT * FROM app_hidden.inbox m WHERE m.id in (SELECT DISTINCT ON (segment) id + FROM app_hidden.inbox + WHERE processed_at IS NULL AND abandoned_at IS NULL + ORDER BY segment, created_at) order by created_at + LOOP + BEGIN + EXIT WHEN cardinality(ids) >= max_size; + + SELECT * + INTO message_row + FROM app_hidden.inbox + WHERE id = loop_row.id + FOR NO KEY UPDATE NOWAIT; -- throw/catch error when locked + + IF message_row.locked_until > NOW() THEN + CONTINUE; + END IF; + + ids := array_append(ids, message_row.id); + EXCEPTION + WHEN lock_not_available THEN + CONTINUE; + WHEN serialization_failure THEN + CONTINUE; + END; + END LOOP; + + -- if max_size not reached: get the oldest parallelizable message independent of segment + IF cardinality(ids) < max_size THEN + FOR loop_row IN + SELECT * FROM app_hidden.inbox + WHERE concurrency = 'parallel' AND processed_at IS NULL AND abandoned_at IS NULL AND locked_until < NOW() + AND id NOT IN (SELECT UNNEST(ids)) + order by created_at + LOOP + BEGIN + EXIT WHEN cardinality(ids) >= max_size; + + SELECT * + INTO message_row + FROM app_hidden.inbox + WHERE id = loop_row.id + FOR NO KEY UPDATE NOWAIT; -- throw/catch error when locked + + ids := array_append(ids, message_row.id); + EXCEPTION + WHEN lock_not_available THEN + CONTINUE; + WHEN serialization_failure THEN + CONTINUE; + END; + END LOOP; + END IF; + + -- set a short lock value so the the workers can each process a message + IF cardinality(ids) > 0 THEN + + RETURN QUERY + UPDATE app_hidden.inbox + SET locked_until = clock_timestamp() + (lock_ms || ' milliseconds')::INTERVAL, started_attempts = started_attempts + 1 + WHERE ID = ANY(ids) + RETURNING *; + + END IF; +END; +$$; + + +-- +-- Name: outbox; Type: TABLE; Schema: app_hidden; Owner: - +-- + +CREATE TABLE app_hidden.outbox ( + id uuid NOT NULL, + aggregate_type text NOT NULL, + aggregate_id text NOT NULL, + message_type text NOT NULL, + segment text, + concurrency text DEFAULT 'sequential'::text NOT NULL, + payload jsonb NOT NULL, + metadata jsonb, + locked_until timestamp with time zone DEFAULT to_timestamp((0)::double precision) NOT NULL, + created_at timestamp with time zone DEFAULT clock_timestamp() NOT NULL, + processed_at timestamp with time zone, + abandoned_at timestamp with time zone, + started_attempts smallint DEFAULT 0 NOT NULL, + finished_attempts smallint DEFAULT 0 NOT NULL, + CONSTRAINT outbox_concurrency_check CHECK ((concurrency = ANY (ARRAY['sequential'::text, 'parallel'::text]))) +); + + +-- +-- Name: next_outbox_messages(integer, integer); Type: FUNCTION; Schema: app_hidden; Owner: - +-- + +CREATE FUNCTION app_hidden.next_outbox_messages(max_size integer, lock_ms integer) RETURNS SETOF app_hidden.outbox + LANGUAGE plpgsql + AS $$ +DECLARE + loop_row app_hidden.outbox%ROWTYPE; + message_row app_hidden.outbox%ROWTYPE; + ids uuid[] := '{}'; +BEGIN + + IF max_size < 1 THEN + RAISE EXCEPTION 'The max_size for the next messages batch must be at least one.' using errcode = 'MAXNR'; + END IF; + + -- get (only) the oldest message of every segment but only return it if it is not locked + FOR loop_row IN + SELECT * FROM app_hidden.outbox m WHERE m.id in (SELECT DISTINCT ON (segment) id + FROM app_hidden.outbox + WHERE processed_at IS NULL AND abandoned_at IS NULL + ORDER BY segment, created_at) order by created_at + LOOP + BEGIN + EXIT WHEN cardinality(ids) >= max_size; + + SELECT * + INTO message_row + FROM app_hidden.outbox + WHERE id = loop_row.id + FOR NO KEY UPDATE NOWAIT; -- throw/catch error when locked + + IF message_row.locked_until > NOW() THEN + CONTINUE; + END IF; + + ids := array_append(ids, message_row.id); + EXCEPTION + WHEN lock_not_available THEN + CONTINUE; + WHEN serialization_failure THEN + CONTINUE; + END; + END LOOP; + + -- if max_size not reached: get the oldest parallelizable message independent of segment + IF cardinality(ids) < max_size THEN + FOR loop_row IN + SELECT * FROM app_hidden.outbox + WHERE concurrency = 'parallel' AND processed_at IS NULL AND abandoned_at IS NULL AND locked_until < NOW() + AND id NOT IN (SELECT UNNEST(ids)) + order by created_at + LOOP + BEGIN + EXIT WHEN cardinality(ids) >= max_size; + + SELECT * + INTO message_row + FROM app_hidden.outbox + WHERE id = loop_row.id + FOR NO KEY UPDATE NOWAIT; -- throw/catch error when locked + + ids := array_append(ids, message_row.id); + EXCEPTION + WHEN lock_not_available THEN + CONTINUE; + WHEN serialization_failure THEN + CONTINUE; + END; + END LOOP; + END IF; + + -- set a short lock value so the the workers can each process a message + IF cardinality(ids) > 0 THEN + + RETURN QUERY + UPDATE app_hidden.outbox + SET locked_until = clock_timestamp() + (lock_ms || ' milliseconds')::INTERVAL, started_attempts = started_attempts + 1 + WHERE ID = ANY(ids) + RETURNING *; + + END IF; +END; +$$; + + -- -- Name: column_exists(text, text, text); Type: FUNCTION; Schema: ax_define; Owner: - -- @@ -1603,10 +1827,6 @@ end; $$; -SET default_tablespace = ''; - -SET default_with_oids = false; - -- -- Name: claim_sets; Type: TABLE; Schema: app_hidden; Owner: - -- @@ -1658,6 +1878,22 @@ ALTER TABLE ONLY app_hidden.claim_sets ADD CONSTRAINT claim_sets_pkey PRIMARY KEY (key); +-- +-- Name: inbox inbox_pkey; Type: CONSTRAINT; Schema: app_hidden; Owner: - +-- + +ALTER TABLE ONLY app_hidden.inbox + ADD CONSTRAINT inbox_pkey PRIMARY KEY (id); + + +-- +-- Name: outbox outbox_pkey; Type: CONSTRAINT; Schema: app_hidden; Owner: - +-- + +ALTER TABLE ONLY app_hidden.outbox + ADD CONSTRAINT outbox_pkey PRIMARY KEY (id); + + -- -- Name: subscription_plans subscription_plans_pkey; Type: CONSTRAINT; Schema: app_hidden; Owner: - -- @@ -1674,6 +1910,76 @@ ALTER TABLE ONLY app_private.messaging_counter ADD CONSTRAINT messaging_counter_pkey PRIMARY KEY (key); +-- +-- Name: idx_inbox_abandoned_at; Type: INDEX; Schema: app_hidden; Owner: - +-- + +CREATE INDEX idx_inbox_abandoned_at ON app_hidden.inbox USING btree (abandoned_at); + + +-- +-- Name: idx_inbox_created_at; Type: INDEX; Schema: app_hidden; Owner: - +-- + +CREATE INDEX idx_inbox_created_at ON app_hidden.inbox USING btree (created_at); + + +-- +-- Name: idx_inbox_locked_until; Type: INDEX; Schema: app_hidden; Owner: - +-- + +CREATE INDEX idx_inbox_locked_until ON app_hidden.inbox USING btree (locked_until); + + +-- +-- Name: idx_inbox_processed_at; Type: INDEX; Schema: app_hidden; Owner: - +-- + +CREATE INDEX idx_inbox_processed_at ON app_hidden.inbox USING btree (processed_at); + + +-- +-- Name: idx_inbox_segment; Type: INDEX; Schema: app_hidden; Owner: - +-- + +CREATE INDEX idx_inbox_segment ON app_hidden.inbox USING btree (segment); + + +-- +-- Name: idx_outbox_abandoned_at; Type: INDEX; Schema: app_hidden; Owner: - +-- + +CREATE INDEX idx_outbox_abandoned_at ON app_hidden.outbox USING btree (abandoned_at); + + +-- +-- Name: idx_outbox_created_at; Type: INDEX; Schema: app_hidden; Owner: - +-- + +CREATE INDEX idx_outbox_created_at ON app_hidden.outbox USING btree (created_at); + + +-- +-- Name: idx_outbox_locked_until; Type: INDEX; Schema: app_hidden; Owner: - +-- + +CREATE INDEX idx_outbox_locked_until ON app_hidden.outbox USING btree (locked_until); + + +-- +-- Name: idx_outbox_processed_at; Type: INDEX; Schema: app_hidden; Owner: - +-- + +CREATE INDEX idx_outbox_processed_at ON app_hidden.outbox USING btree (processed_at); + + +-- +-- Name: idx_outbox_segment; Type: INDEX; Schema: app_hidden; Owner: - +-- + +CREATE INDEX idx_outbox_segment ON app_hidden.outbox USING btree (segment); + + -- -- Name: idx_subscription_plans_claim_set_keys; Type: INDEX; Schema: app_hidden; Owner: - -- @@ -1738,6 +2044,106 @@ GRANT USAGE ON SCHEMA ax_define TO entitlement_service_gql_role; GRANT USAGE ON SCHEMA ax_utils TO entitlement_service_gql_role; +-- +-- Name: TABLE inbox; Type: ACL; Schema: app_hidden; Owner: - +-- + +GRANT SELECT,INSERT,DELETE ON TABLE app_hidden.inbox TO entitlement_service_gql_role; + + +-- +-- Name: COLUMN inbox.locked_until; Type: ACL; Schema: app_hidden; Owner: - +-- + +GRANT UPDATE(locked_until) ON TABLE app_hidden.inbox TO entitlement_service_gql_role; + + +-- +-- Name: COLUMN inbox.processed_at; Type: ACL; Schema: app_hidden; Owner: - +-- + +GRANT UPDATE(processed_at) ON TABLE app_hidden.inbox TO entitlement_service_gql_role; + + +-- +-- Name: COLUMN inbox.abandoned_at; Type: ACL; Schema: app_hidden; Owner: - +-- + +GRANT UPDATE(abandoned_at) ON TABLE app_hidden.inbox TO entitlement_service_gql_role; + + +-- +-- Name: COLUMN inbox.started_attempts; Type: ACL; Schema: app_hidden; Owner: - +-- + +GRANT UPDATE(started_attempts) ON TABLE app_hidden.inbox TO entitlement_service_gql_role; + + +-- +-- Name: COLUMN inbox.finished_attempts; Type: ACL; Schema: app_hidden; Owner: - +-- + +GRANT UPDATE(finished_attempts) ON TABLE app_hidden.inbox TO entitlement_service_gql_role; + + +-- +-- Name: FUNCTION next_inbox_messages(max_size integer, lock_ms integer); Type: ACL; Schema: app_hidden; Owner: - +-- + +REVOKE ALL ON FUNCTION app_hidden.next_inbox_messages(max_size integer, lock_ms integer) FROM PUBLIC; +GRANT ALL ON FUNCTION app_hidden.next_inbox_messages(max_size integer, lock_ms integer) TO entitlement_service_gql_role; + + +-- +-- Name: TABLE outbox; Type: ACL; Schema: app_hidden; Owner: - +-- + +GRANT SELECT,INSERT,DELETE ON TABLE app_hidden.outbox TO entitlement_service_gql_role; + + +-- +-- Name: COLUMN outbox.locked_until; Type: ACL; Schema: app_hidden; Owner: - +-- + +GRANT UPDATE(locked_until) ON TABLE app_hidden.outbox TO entitlement_service_gql_role; + + +-- +-- Name: COLUMN outbox.processed_at; Type: ACL; Schema: app_hidden; Owner: - +-- + +GRANT UPDATE(processed_at) ON TABLE app_hidden.outbox TO entitlement_service_gql_role; + + +-- +-- Name: COLUMN outbox.abandoned_at; Type: ACL; Schema: app_hidden; Owner: - +-- + +GRANT UPDATE(abandoned_at) ON TABLE app_hidden.outbox TO entitlement_service_gql_role; + + +-- +-- Name: COLUMN outbox.started_attempts; Type: ACL; Schema: app_hidden; Owner: - +-- + +GRANT UPDATE(started_attempts) ON TABLE app_hidden.outbox TO entitlement_service_gql_role; + + +-- +-- Name: COLUMN outbox.finished_attempts; Type: ACL; Schema: app_hidden; Owner: - +-- + +GRANT UPDATE(finished_attempts) ON TABLE app_hidden.outbox TO entitlement_service_gql_role; + + +-- +-- Name: FUNCTION next_outbox_messages(max_size integer, lock_ms integer); Type: ACL; Schema: app_hidden; Owner: - +-- + +REVOKE ALL ON FUNCTION app_hidden.next_outbox_messages(max_size integer, lock_ms integer) FROM PUBLIC; +GRANT ALL ON FUNCTION app_hidden.next_outbox_messages(max_size integer, lock_ms integer) TO entitlement_service_gql_role; + + -- -- Name: FUNCTION column_exists(columnname text, tablename text, schemaname text); Type: ACL; Schema: ax_define; Owner: - -- diff --git a/services/entitlement/service/src/generated/graphql/schema.graphql b/services/entitlement/service/src/generated/graphql/schema.graphql index fca760c1..a249d2c4 100644 --- a/services/entitlement/service/src/generated/graphql/schema.graphql +++ b/services/entitlement/service/src/generated/graphql/schema.graphql @@ -72,6 +72,9 @@ enum ErrorCodesEnum { """A database operation has failed because of a lock timeout.""" DATABASE_LOCK_TIMEOUT_ERROR + """Unexpected null or undefined value received.""" + UNEXPECTED_NULL_UNDEFINED + """ The webhook message was generated too long ago (%s seconds) and should not be accepted anymore for security reasons. """ @@ -183,6 +186,14 @@ enum ErrorCodesEnum { """ AUTHORIZATION_OPTIONS_MISCONFIGURED + """The subject has no permissions.""" + UNAUTHORIZED + + """ + User is authenticated, but subject information was not found. Please contact Axinom Support. + """ + SUBJECT_NOT_FOUND + """ Error(s) occurred while trying to retrieve the %s with ID '%s' from the catalog service. Please contact the service support. """ diff --git a/services/entitlement/service/src/generated/messaging/rascal-schema.json b/services/entitlement/service/src/generated/messaging/rascal-schema.json index f574a93b..9a750bff 100644 --- a/services/entitlement/service/src/generated/messaging/rascal-schema.json +++ b/services/entitlement/service/src/generated/messaging/rascal-schema.json @@ -52,34 +52,7 @@ } } }, - "entitlement-service:claim_definitions:synchronize_finished": { - "options": { - "arguments": { - "x-dead-letter-exchange": "dead_letter", - "x-dead-letter-routing-key": "entitlement-service.dead_letter", - "x-queue-type": "quorum" - } - } - }, - "entitlement-service:claim_definitions:synchronize_failed": { - "options": { - "arguments": { - "x-dead-letter-exchange": "dead_letter", - "x-dead-letter-routing-key": "entitlement-service.dead_letter", - "x-queue-type": "quorum" - } - } - }, - "entitlement-service:claim_set:published": { - "options": { - "arguments": { - "x-dead-letter-exchange": "dead_letter", - "x-dead-letter-routing-key": "entitlement-service.dead_letter", - "x-queue-type": "quorum" - } - } - }, - "entitlement-service:subscription_plan:published": { + "entitlement-service:inbox": { "options": { "arguments": { "x-dead-letter-exchange": "dead_letter", @@ -110,45 +83,30 @@ "bindingKey": "ax-monetization-grants-service:claim_definitions:synchronize.#", "destination": "ax-monetization-grants-service:claim_definitions:synchronize" }, - "retry:entitlement-service:claim_definitions:synchronize_finished": { + "retry:entitlement-service:inbox": { "source": "retry", - "bindingKey": "entitlement-service:claim_definitions:synchronize_finished.#", - "destination": "entitlement-service:claim_definitions:synchronize_finished" + "bindingKey": "entitlement-service:inbox.#", + "destination": "entitlement-service:inbox" }, "ax-monetization-grants-service.*.*.claim_definitions.synchronize_finished": { "source": "event", "bindingKey": "ax-monetization-grants-service.*.*.claim_definitions.synchronize_finished", - "destination": "entitlement-service:claim_definitions:synchronize_finished" - }, - "retry:entitlement-service:claim_definitions:synchronize_failed": { - "source": "retry", - "bindingKey": "entitlement-service:claim_definitions:synchronize_failed.#", - "destination": "entitlement-service:claim_definitions:synchronize_failed" + "destination": "entitlement-service:inbox" }, "ax-monetization-grants-service.*.*.claim_definitions.synchronize_failed": { "source": "event", "bindingKey": "ax-monetization-grants-service.*.*.claim_definitions.synchronize_failed", - "destination": "entitlement-service:claim_definitions:synchronize_failed" - }, - "retry:entitlement-service:claim_set:published": { - "source": "retry", - "bindingKey": "entitlement-service:claim_set:published.#", - "destination": "entitlement-service:claim_set:published" + "destination": "entitlement-service:inbox" }, "ax-monetization-grants-service.*.*.claim_set.published": { "source": "event", "bindingKey": "ax-monetization-grants-service.*.*.claim_set.published", - "destination": "entitlement-service:claim_set:published" - }, - "retry:entitlement-service:subscription_plan:published": { - "source": "retry", - "bindingKey": "entitlement-service:subscription_plan:published.#", - "destination": "entitlement-service:subscription_plan:published" + "destination": "entitlement-service:inbox" }, "ax-subscription-monetization-service.*.*.subscription_plan.published": { "source": "event", "bindingKey": "ax-subscription-monetization-service.*.*.subscription_plan.published", - "destination": "entitlement-service:subscription_plan:published" + "destination": "entitlement-service:inbox" } }, "publications": { @@ -174,35 +132,8 @@ } }, "subscriptions": { - "SynchronizeClaimDefinitionsFinished": { - "queue": "entitlement-service:claim_definitions:synchronize_finished", - "prefetch": 10, - "recovery": "deferred_retry", - "redeliveries": { - "limit": 5, - "counter": "mosaic" - } - }, - "SynchronizeClaimDefinitionsFailed": { - "queue": "entitlement-service:claim_definitions:synchronize_failed", - "prefetch": 10, - "recovery": "deferred_retry", - "redeliveries": { - "limit": 5, - "counter": "mosaic" - } - }, - "ClaimSetPublished": { - "queue": "entitlement-service:claim_set:published", - "prefetch": 10, - "recovery": "deferred_retry", - "redeliveries": { - "limit": 5, - "counter": "mosaic" - } - }, - "SubscriptionPlanPublished": { - "queue": "entitlement-service:subscription_plan:published", + "Inbox": { + "queue": "entitlement-service:inbox", "prefetch": 10, "recovery": "deferred_retry", "redeliveries": { diff --git a/services/entitlement/service/src/index.ts b/services/entitlement/service/src/index.ts index 8baeb4c0..4c685418 100644 --- a/services/entitlement/service/src/index.ts +++ b/services/entitlement/service/src/index.ts @@ -1,8 +1,6 @@ import { createPostgresPoolConnectivityMetric, getLoginPgPool, - getOwnerPgPool, - initMessagingCounter, setupLoginPgPool, setupOwnerPgPool, } from '@axinom/mosaic-db-common'; @@ -11,10 +9,7 @@ import { IdGuardErrors, setupEndUserAuthentication, } from '@axinom/mosaic-id-guard'; -import { - createRabbitMQConnectivityMetric, - setupMessagingBroker, -} from '@axinom/mosaic-message-bus'; +import { createRabbitMQConnectivityMetric } from '@axinom/mosaic-message-bus'; import { closeHttpServer, handleGlobalErrors, @@ -41,7 +36,6 @@ import { syncClaimDefinitions, } from './domains'; import { setupPostGraphile } from './graphql/postgraphile-middleware'; -import { getMessagingMiddleware } from './messaging'; import { registerMessaging } from './messaging/register-messaging'; import { updateGeoDatabase } from './update-geo-database'; @@ -73,7 +67,7 @@ async function bootstrap(): Promise { const shutdownActions = setupShutdownActions(app, logger); const poolConfig: PoolConfig = { max: config.pgPoolMaxConnections }; - setupOwnerPgPool( + const ownerPgPool = setupOwnerPgPool( app, config.dbOwnerConnectionString, logger, @@ -88,17 +82,13 @@ async function bootstrap(): Promise { poolConfig, ); - const counter = initMessagingCounter(getOwnerPgPool(app)); - const broker = await setupMessagingBroker({ + // Configure messaging: subscribe to topics, create queues, register handlers, start transactional outbox/inbox listeners + const { broker, storeOutboxMessage } = await registerMessaging( app, + ownerPgPool, config, - builders: registerMessaging(app, config), - logger, shutdownActions, - onMessageMiddleware: getMessagingMiddleware(config, logger), - components: { counters: { postgresCounter: counter } }, - rascalConfigExportPath: './src/generated/messaging/rascal-schema.json', - }); + ); setupMonitoring(config, { metrics: [ @@ -107,7 +97,7 @@ async function bootstrap(): Promise { ], }); - await syncClaimDefinitions(broker, config); + await syncClaimDefinitions(storeOutboxMessage, ownerPgPool, config); const authConfig: AuthenticationConfig = { tenantId: config.tenantId, diff --git a/services/entitlement/service/src/messaging/handlers/claim-set-published-handler.db.spec.ts b/services/entitlement/service/src/messaging/handlers/claim-set-published-handler.db.spec.ts index 175a86f6..dc8a80a1 100644 --- a/services/entitlement/service/src/messaging/handlers/claim-set-published-handler.db.spec.ts +++ b/services/entitlement/service/src/messaging/handlers/claim-set-published-handler.db.spec.ts @@ -2,7 +2,9 @@ import { AuthenticatedManagementSubject, AuthenticatedManagementSubjectMessageInfo, } from '@axinom/mosaic-id-guard'; +import { ClaimSetPublishedEvent } from '@axinom/mosaic-messages'; import { rejectionOf } from '@axinom/mosaic-service-common'; +import { TypedTransactionalMessage } from '@axinom/mosaic-transactional-inbox-outbox'; import { stub } from 'jest-auto-stub'; import 'jest-extended'; import { all, insert, select } from 'zapatos/db'; @@ -21,7 +23,7 @@ describe('ClaimSetPublishedHandler', () => { beforeAll(async () => { ctx = await createTestContext(); - handler = new ClaimSetPublishedHandler(ctx.ownerPool, ctx.config); + handler = new ClaimSetPublishedHandler(ctx.config); user = createTestUser({ name: 'Monetization Admin' }); message = stub({ envelope: { @@ -42,16 +44,20 @@ describe('ClaimSetPublishedHandler', () => { }); it('new claim set received - claim set saved', async () => { - // Act - await handler.onMessage( - { + // Arrange + const message = { + payload: { key: 'PREMIUM', title: 'Premium', claims: ['ENABLE_VIDEOS_DOWNLOAD'], custom_unsupported_property: 'control case to make sure create/update works as expected even with updated message.', - } as any, - message, + }, + } as unknown as TypedTransactionalMessage; + + // Act + await ctx.executeOwnerSql(user, async (dbCtx) => + handler.handleMessage(message, dbCtx), ); // Assert @@ -78,18 +84,20 @@ describe('ClaimSetPublishedHandler', () => { description: 'Desc 1', claims: ['ENTITY_TYPE_MOVIES'], }).run(ctx.ownerPool); - - // Act - await handler.onMessage( - { + const message = { + payload: { key: 'BASIC', title: 'Default', description: 'Desc 2', claims: ['ENTITY_TYPE_EPISODES'], custom_unsupported_property: 'control case to make sure create/update works as expected even with updated message.', - } as any, - message, + }, + } as unknown as TypedTransactionalMessage; + + // Act + await ctx.executeOwnerSql(user, async (dbCtx) => + handler.handleMessage(message, dbCtx), ); // Assert @@ -110,26 +118,30 @@ describe('ClaimSetPublishedHandler', () => { it('claim set with invalid claim - error thrown', async () => { // Arrange - const content = { - key: 'PREMIUM', - title: 'Premium', - claims: [ - 'ENTITY_TYPE_MOVIES', - 'ENTITY_TYPE_EPISODES', - 'TEST_VALUE', - 'ENABLE_VIDEOS_DOWNLOAD', - ], - }; + const message = { + payload: { + key: 'PREMIUM', + title: 'Premium', + claims: [ + 'ENTITY_TYPE_MOVIES', + 'ENTITY_TYPE_EPISODES', + 'TEST_VALUE', + 'ENABLE_VIDEOS_DOWNLOAD', + ], + }, + } as unknown as TypedTransactionalMessage; // Act - const error = await rejectionOf(handler.onMessage(content, message)); + const error = await ctx.executeOwnerSql(user, async (dbCtx) => { + return rejectionOf(handler.handleMessage(message, dbCtx)); + }); // Assert expect(error.message).toEqual( 'Unable to create or update claims set, because it contains invalid claims.', ); expect(error.details).toEqual({ - content, + payload: message.payload, invalidClaims: ['TEST_VALUE'], }); diff --git a/services/entitlement/service/src/messaging/handlers/claim-set-published-handler.ts b/services/entitlement/service/src/messaging/handlers/claim-set-published-handler.ts index 9dcc9ffb..6ea589a0 100644 --- a/services/entitlement/service/src/messaging/handlers/claim-set-published-handler.ts +++ b/services/entitlement/service/src/messaging/handlers/claim-set-published-handler.ts @@ -1,46 +1,42 @@ -import { OwnerPgPool, transactionWithContext } from '@axinom/mosaic-db-common'; -import { MessageInfo } from '@axinom/mosaic-message-bus'; import { ClaimSetPublishedEvent, MonetizationGrantsServiceMultiTenantMessagingSettings, } from '@axinom/mosaic-messages'; -import { difference, MosaicError } from '@axinom/mosaic-service-common'; -import { IsolationLevel, upsert } from 'zapatos/db'; +import { difference, Logger, MosaicError } from '@axinom/mosaic-service-common'; +import { TypedTransactionalMessage } from '@axinom/mosaic-transactional-inbox-outbox'; +import { ClientBase } from 'pg'; +import { upsert } from 'zapatos/db'; import { CommonErrors, Config } from '../../common'; import { validClaims } from '../../domains'; -import { EntitlementAuthenticatedMessageHandler } from './entitlement-authenticated-message-handler'; +import { EntitlementAuthenticatedTransactionalMessageHandler } from './entitlement-authenticated-message-handler'; -export class ClaimSetPublishedHandler extends EntitlementAuthenticatedMessageHandler { - constructor(private readonly ownerPool: OwnerPgPool, config: Config) { +export class ClaimSetPublishedHandler extends EntitlementAuthenticatedTransactionalMessageHandler { + constructor(config: Config) { super( - MonetizationGrantsServiceMultiTenantMessagingSettings.ClaimSetPublished - .messageType, + MonetizationGrantsServiceMultiTenantMessagingSettings.ClaimSetPublished, + new Logger({ + config, + context: ClaimSetPublishedHandler.name, + }), config, ); } - async onMessage( - content: ClaimSetPublishedEvent, - message: MessageInfo, + override async handleMessage( + { payload }: TypedTransactionalMessage, + ownerClient: ClientBase, ): Promise { - const invalidClaims = difference(content.claims, validClaims); + const invalidClaims = difference(payload.claims, validClaims); if (invalidClaims.length > 0) { throw new MosaicError({ ...CommonErrors.InvalidClaimsInClaimSet, - details: { invalidClaims, content }, + details: { invalidClaims, payload }, }); } - const { key, title, description, claims } = content; - await transactionWithContext( - this.ownerPool, - IsolationLevel.Serializable, - this.getPgSettings(message), - async (ctx) => { - await upsert('claim_sets', { key, title, description, claims }, [ - 'key', - ]).run(ctx); - }, - ); + const { key, title, description, claims } = payload; + await upsert('claim_sets', { key, title, description, claims }, [ + 'key', + ]).run(ownerClient); } } diff --git a/services/entitlement/service/src/messaging/handlers/claim-set-unpublished-handler.db.spec.ts b/services/entitlement/service/src/messaging/handlers/claim-set-unpublished-handler.db.spec.ts index d8bb4ff6..bce9c4af 100644 --- a/services/entitlement/service/src/messaging/handlers/claim-set-unpublished-handler.db.spec.ts +++ b/services/entitlement/service/src/messaging/handlers/claim-set-unpublished-handler.db.spec.ts @@ -1,16 +1,25 @@ +import { AuthenticatedManagementSubject } from '@axinom/mosaic-id-guard'; +import { ClaimSetUnpublishedEvent } from '@axinom/mosaic-messages'; import { rejectionOf } from '@axinom/mosaic-service-common'; +import { TypedTransactionalMessage } from '@axinom/mosaic-transactional-inbox-outbox'; import 'jest-extended'; import { all, insert, select } from 'zapatos/db'; -import { createTestContext, ITestContext } from '../../tests/test-utils'; +import { + createTestContext, + createTestUser, + ITestContext, +} from '../../tests/test-utils'; import { ClaimSetUnpublishedHandler } from './claim-set-unpublished-handler'; describe('ClaimSetUnpublishedHandler', () => { let ctx: ITestContext; let handler: ClaimSetUnpublishedHandler; + let user: AuthenticatedManagementSubject; beforeAll(async () => { ctx = await createTestContext(); - handler = new ClaimSetUnpublishedHandler(ctx.ownerPool, ctx.config); + handler = new ClaimSetUnpublishedHandler(ctx.config); + user = createTestUser({ name: 'Monetization Admin' }); }); beforeEach(async () => { @@ -40,9 +49,14 @@ describe('ClaimSetUnpublishedHandler', () => { description: 'Premium Plan ...', claim_set_keys: ['PREMIUM'], }).run(ctx.ownerPool); + const message = { + payload: { key: 'BASIC' }, + } as unknown as TypedTransactionalMessage; // Act - await handler.onMessage({ key: 'BASIC' }); + await ctx.executeOwnerSql(user, async (dbCtx) => + handler.handleMessage(message, dbCtx), + ); // Assert const claimSets = await select('claim_sets', all).run(ctx.ownerPool); @@ -51,29 +65,34 @@ describe('ClaimSetUnpublishedHandler', () => { it('message for existing claim set received with existing related subscription plan - error thrown', async () => { // Arrange - const content = { key: 'BASIC' }; + const payload = { key: 'BASIC' }; const subPlan = await insert('subscription_plans', { id: '79320a3d-403d-46ea-83f4-1c8b5d4a3873', title: 'Basic Plan', description: 'Basic Plan ...', - claim_set_keys: [content.key], + claim_set_keys: [payload.key], }).run(ctx.ownerPool); + const message = { + payload, + } as unknown as TypedTransactionalMessage; // Act - const error = await rejectionOf(handler.onMessage(content)); + const error = await ctx.executeOwnerSql(user, async (dbCtx) => { + return rejectionOf(handler.handleMessage(message, dbCtx)); + }); // Assert expect(error.message).toEqual( 'Unable to unpublish the claim set, because it is used by 1 published subscription plan(s).', ); expect(error.details).toEqual({ - content, + payload, relatedSubscriptionPlanIds: [subPlan.id], }); const claimSets = await select('claim_sets', all, { columns: ['key'] }).run( ctx.ownerPool, ); - expect(claimSets).toEqual([content]); + expect(claimSets).toEqual([payload]); }); }); diff --git a/services/entitlement/service/src/messaging/handlers/claim-set-unpublished-handler.ts b/services/entitlement/service/src/messaging/handlers/claim-set-unpublished-handler.ts index 15f97f73..f0b8d228 100644 --- a/services/entitlement/service/src/messaging/handlers/claim-set-unpublished-handler.ts +++ b/services/entitlement/service/src/messaging/handlers/claim-set-unpublished-handler.ts @@ -1,37 +1,44 @@ -import { OwnerPgPool } from '@axinom/mosaic-db-common'; import { ClaimSetUnpublishedEvent, MonetizationGrantsServiceMultiTenantMessagingSettings, } from '@axinom/mosaic-messages'; -import { MosaicError } from '@axinom/mosaic-service-common'; +import { Logger, MosaicError } from '@axinom/mosaic-service-common'; +import { TypedTransactionalMessage } from '@axinom/mosaic-transactional-inbox-outbox'; +import { ClientBase } from 'pg'; import { deletes, param, select, self as value, SQL, sql } from 'zapatos/db'; import { CommonErrors, Config } from '../../common'; -import { EntitlementAuthenticatedMessageHandler } from './entitlement-authenticated-message-handler'; +import { EntitlementAuthenticatedTransactionalMessageHandler } from './entitlement-authenticated-message-handler'; -export class ClaimSetUnpublishedHandler extends EntitlementAuthenticatedMessageHandler { - constructor(private readonly ownerPool: OwnerPgPool, config: Config) { +export class ClaimSetUnpublishedHandler extends EntitlementAuthenticatedTransactionalMessageHandler { + constructor(config: Config) { super( - MonetizationGrantsServiceMultiTenantMessagingSettings.ClaimSetUnpublished - .messageType, + MonetizationGrantsServiceMultiTenantMessagingSettings.ClaimSetUnpublished, + new Logger({ + config, + context: ClaimSetUnpublishedHandler.name, + }), config, ); } - async onMessage(content: ClaimSetUnpublishedEvent): Promise { + override async handleMessage( + { payload }: TypedTransactionalMessage, + ownerClient: ClientBase, + ): Promise { const relatedSubPlans = await select('subscription_plans', { - claim_set_keys: sql`${param(content.key)} = ANY(${value})`, - }).run(this.ownerPool); + claim_set_keys: sql`${param(payload.key)} = ANY(${value})`, + }).run(ownerClient); if (relatedSubPlans.length > 0) { throw new MosaicError({ ...CommonErrors.ClaimSetUnpublishError, details: { relatedSubscriptionPlanIds: relatedSubPlans.map((sp) => sp.id), - content, + payload, }, messageParams: [relatedSubPlans.length], }); } - await deletes('claim_sets', content).run(this.ownerPool); + await deletes('claim_sets', payload).run(ownerClient); } } diff --git a/services/entitlement/service/src/messaging/handlers/entitlement-authenticated-message-handler.ts b/services/entitlement/service/src/messaging/handlers/entitlement-authenticated-message-handler.ts index 02fbf4b8..64a47363 100644 --- a/services/entitlement/service/src/messaging/handlers/entitlement-authenticated-message-handler.ts +++ b/services/entitlement/service/src/messaging/handlers/entitlement-authenticated-message-handler.ts @@ -1,48 +1,75 @@ -import { buildAuthPgSettings } from '@axinom/mosaic-db-common'; import { - AuthenticatedManagementSubject, - authenticationMiddleware, - getMessageInfoManagementSubject, + buildAuthPgSettings, + setPgSettingsConfig, +} from '@axinom/mosaic-db-common'; +import { + getAuthenticatedManagementSubject, + GuardedContext, + IdGuardErrors, } from '@axinom/mosaic-id-guard'; +import { MessagingSettings } from '@axinom/mosaic-message-bus-abstractions'; +import { + Logger, + MosaicError, + MosaicErrors, +} from '@axinom/mosaic-service-common'; import { - MessageHandler, - MessageInfo, - OnMessageMiddleware, -} from '@axinom/mosaic-message-bus'; -import { Dict } from '@axinom/mosaic-service-common'; -import { SubscriptionConfig } from 'rascal'; + TransactionalInboxMessageHandler, + TypedTransactionalMessage, +} from '@axinom/mosaic-transactional-inbox-outbox'; +import { DatabaseClient } from 'pg-transactional-outbox'; import { Config } from '../../common'; /** - * Guard a message handler by getting and verifying the JWT token. It changes - * the `MessageInfo` parameter to `AuthenticatedMessageInfo` which contains the - * JWT token subject. - * In addition it checks that the subjects permission match the required ones. + * Abstract message handler to verify permissions of the message producing service */ -export abstract class EntitlementAuthenticatedMessageHandler< - TContent, -> extends MessageHandler { +export abstract class EntitlementAuthenticatedTransactionalMessageHandler< + T, +> extends TransactionalInboxMessageHandler { constructor( - messagingKey: string, - protected readonly config: Config, - overrides?: SubscriptionConfig, - middleware: OnMessageMiddleware[] = [], + messagingSettings: MessagingSettings, + logger: Logger, + config: Config, ) { - super(messagingKey, overrides, [ - ...middleware, - authenticationMiddleware({ - tenantId: config.tenantId, - environmentId: config.environmentId, - authEndpoint: config.idServiceAuthBaseUrl, - }), - ]); - } + const messageProducerServiceId = messagingSettings.serviceId; + if (!messageProducerServiceId) { + throw new MosaicError({ + message: + 'The service ID was not provided for the EntitlementAuthenticatedTransactionalMessageHandler messaging settings.', + code: MosaicErrors.AssertionFailed.code, + }); + } - protected getPgSettings(message: MessageInfo): Dict { - return buildAuthPgSettings(this.getSubject(message), this.config.serviceId); - } + const authWrapper = async ( + message: TypedTransactionalMessage, + dbClient: DatabaseClient, + ): Promise => { + const token = message.metadata.authToken; + if (token === undefined) { + throw new MosaicError(IdGuardErrors.AccessTokenRequired); + } + const subject = await getAuthenticatedManagementSubject(token, { + tenantId: this.config.tenantId, + environmentId: this.config.environmentId, + authEndpoint: this.config.idServiceAuthBaseUrl, + }); + // Check that the message producer had permissions on that service + const subjectPermissions = + subject.permissions?.[messageProducerServiceId]; + if ( + subjectPermissions === undefined || + !Array.isArray(subjectPermissions) + ) { + throw new MosaicError({ + code: IdGuardErrors.Unauthorized.code, + message: `Permission check failed as the subject has no permissions for the ${messageProducerServiceId} service.`, + }); + } + const pgSettings = buildAuthPgSettings(subject, messageProducerServiceId); + await setPgSettingsConfig(pgSettings, dbClient); + return { subject }; + }; - protected getSubject(message: MessageInfo): AuthenticatedManagementSubject { - return getMessageInfoManagementSubject(message); + super(messagingSettings, logger, config, authWrapper); } } diff --git a/services/entitlement/service/src/messaging/handlers/subscription-plan-published-handler.db.spec.ts b/services/entitlement/service/src/messaging/handlers/subscription-plan-published-handler.db.spec.ts index d3f183a5..5f1a75d5 100644 --- a/services/entitlement/service/src/messaging/handlers/subscription-plan-published-handler.db.spec.ts +++ b/services/entitlement/service/src/messaging/handlers/subscription-plan-published-handler.db.spec.ts @@ -1,9 +1,9 @@ +import { AuthenticatedManagementSubject } from '@axinom/mosaic-id-guard'; import { - AuthenticatedManagementSubject, - AuthenticatedManagementSubjectMessageInfo, -} from '@axinom/mosaic-id-guard'; -import { PeriodUnit } from '@axinom/mosaic-messages'; -import { stub } from 'jest-auto-stub'; + PeriodUnit, + SubscriptionPlanPublishedEvent, +} from '@axinom/mosaic-messages'; +import { TypedTransactionalMessage } from '@axinom/mosaic-transactional-inbox-outbox'; import 'jest-extended'; import { all, insert, select } from 'zapatos/db'; import { @@ -17,41 +17,35 @@ describe('SubscriptionPlanPublishedHandler', () => { let ctx: ITestContext; let handler: SubscriptionPlanPublishedHandler; let user: AuthenticatedManagementSubject; - let message: AuthenticatedManagementSubjectMessageInfo; - const content = { - id: '79320a3d-403d-46ea-83f4-1c8b5d4a3873', - title: 'PREMIUM_PLAN', - description: 'Premium SubPlan', - is_active: true, - claim_set_keys: ['PREMIUM'], - provider_configs: [{ payment_provider_key: 'PAYPAL' }], - payment_plans: [ - { - id: '62a4b0fd-7873-4964-be72-0521f328ad95', - title: 'Random Payment Plan', - description: 'Some description', - is_active: true, - period_unit: 'YEAR' as PeriodUnit, - period_quantity: 10, - provider_configs: [{ payment_provider_key: 'PAYPAL' }], - prices: [{ country: 'XX', currency: 'XXX', price: 123 }], - }, - ], - custom_unsupported_property: - 'control case to make sure create/update works as expected even with updated message.', - }; + const message = { + payload: { + id: '79320a3d-403d-46ea-83f4-1c8b5d4a3873', + title: 'PREMIUM_PLAN', + description: 'Premium SubPlan', + is_active: true, + claim_set_keys: ['PREMIUM'], + provider_configs: [{ payment_provider_key: 'PAYPAL' }], + payment_plans: [ + { + id: '62a4b0fd-7873-4964-be72-0521f328ad95', + title: 'Random Payment Plan', + description: 'Some description', + is_active: true, + period_unit: 'YEAR' as PeriodUnit, + period_quantity: 10, + provider_configs: [{ payment_provider_key: 'PAYPAL' }], + prices: [{ country: 'XX', currency: 'XXX', price: 123 }], + }, + ], + custom_unsupported_property: + 'control case to make sure create/update works as expected even with updated message.', + }, + } as unknown as TypedTransactionalMessage; beforeAll(async () => { ctx = await createTestContext(); - handler = new SubscriptionPlanPublishedHandler(ctx.ownerPool, ctx.config); + handler = new SubscriptionPlanPublishedHandler(ctx.config); user = createTestUser({ name: 'Monetization Admin' }); - message = stub({ - envelope: { - auth_token: - 'some token value which is not used because we are substituting getSubject method and using a stub user', - }, - subject: user, - }); }); afterEach(async () => { @@ -65,13 +59,15 @@ describe('SubscriptionPlanPublishedHandler', () => { it('new subscription plan received - subscription plan saved', async () => { // Act - await handler.onMessage(content, message); + await ctx.executeOwnerSql(user, async (dbCtx) => + handler.handleMessage(message, dbCtx), + ); // Assert const subscriptionPlans = await select('subscription_plans', all).run( ctx.ownerPool, ); - const { id, title, description, claim_set_keys } = content; + const { id, title, description, claim_set_keys } = message.payload; expect(subscriptionPlans).toMatchObject([ { id, @@ -89,20 +85,22 @@ describe('SubscriptionPlanPublishedHandler', () => { it('existing subscription plan received - subscription plan updated', async () => { // Arrange const created = await insert('subscription_plans', { - id: content.id, + id: message.payload.id, title: 'Basic Plan', description: 'Basic Plan ...', claim_set_keys: ['BASIC'], }).run(ctx.ownerPool); // Act - await handler.onMessage(content, message); + await ctx.executeOwnerSql(user, async (dbCtx) => + handler.handleMessage(message, dbCtx), + ); // Assert const subscriptionPlans = await select('subscription_plans', all).run( ctx.ownerPool, ); - const { id, title, description, claim_set_keys } = content; + const { id, title, description, claim_set_keys } = message.payload; expect(subscriptionPlans).toMatchObject([ { id, diff --git a/services/entitlement/service/src/messaging/handlers/subscription-plan-published-handler.ts b/services/entitlement/service/src/messaging/handlers/subscription-plan-published-handler.ts index cc9d0c92..070f6858 100644 --- a/services/entitlement/service/src/messaging/handlers/subscription-plan-published-handler.ts +++ b/services/entitlement/service/src/messaging/handlers/subscription-plan-published-handler.ts @@ -1,38 +1,36 @@ -import { OwnerPgPool, transactionWithContext } from '@axinom/mosaic-db-common'; -import { MessageInfo } from '@axinom/mosaic-message-bus'; import { SubscriptionMonetizationServiceMultiTenantMessagingSettings, SubscriptionPlanPublishedEvent, } from '@axinom/mosaic-messages'; -import { IsolationLevel, upsert } from 'zapatos/db'; +import { Logger } from '@axinom/mosaic-service-common'; +import { TypedTransactionalMessage } from '@axinom/mosaic-transactional-inbox-outbox'; +import { ClientBase } from 'pg'; +import { upsert } from 'zapatos/db'; import { Config } from '../../common'; -import { EntitlementAuthenticatedMessageHandler } from './entitlement-authenticated-message-handler'; +import { EntitlementAuthenticatedTransactionalMessageHandler } from './entitlement-authenticated-message-handler'; -export class SubscriptionPlanPublishedHandler extends EntitlementAuthenticatedMessageHandler { - constructor(private readonly ownerPool: OwnerPgPool, config: Config) { +export class SubscriptionPlanPublishedHandler extends EntitlementAuthenticatedTransactionalMessageHandler { + constructor(config: Config) { super( - SubscriptionMonetizationServiceMultiTenantMessagingSettings - .SubscriptionPlanPublished.messageType, + SubscriptionMonetizationServiceMultiTenantMessagingSettings.SubscriptionPlanPublished, + new Logger({ + config, + context: SubscriptionPlanPublishedHandler.name, + }), config, ); } - async onMessage( - content: SubscriptionPlanPublishedEvent, - message: MessageInfo, + override async handleMessage( + { + payload: { id, title, description, claim_set_keys }, + }: TypedTransactionalMessage, + ownerClient: ClientBase, ): Promise { - const { id, title, description, claim_set_keys } = content; - await transactionWithContext( - this.ownerPool, - IsolationLevel.Serializable, - this.getPgSettings(message), - async (ctx) => { - await upsert( - 'subscription_plans', - { id, title, description, claim_set_keys }, - ['id'], - ).run(ctx); - }, - ); + await upsert( + 'subscription_plans', + { id, title, description, claim_set_keys }, + ['id'], + ).run(ownerClient); } } diff --git a/services/entitlement/service/src/messaging/handlers/sync-claim-definitions-failed-handler.ts b/services/entitlement/service/src/messaging/handlers/sync-claim-definitions-failed-handler.ts index 2c0a85ac..f94db165 100644 --- a/services/entitlement/service/src/messaging/handlers/sync-claim-definitions-failed-handler.ts +++ b/services/entitlement/service/src/messaging/handlers/sync-claim-definitions-failed-handler.ts @@ -1,30 +1,30 @@ -import { MessageHandler } from '@axinom/mosaic-message-bus'; import { MonetizationGrantsServiceMultiTenantMessagingSettings, SynchronizeClaimDefinitionsFailedEvent, } from '@axinom/mosaic-messages'; import { Logger } from '@axinom/mosaic-service-common'; +import { TypedTransactionalMessage } from '@axinom/mosaic-transactional-inbox-outbox'; import { Config } from '../../common'; +import { EntitlementAuthenticatedTransactionalMessageHandler } from './entitlement-authenticated-message-handler'; -export class SyncClaimDefinitionsFailedHandler extends MessageHandler { - private logger: Logger; +export class SyncClaimDefinitionsFailedHandler extends EntitlementAuthenticatedTransactionalMessageHandler { constructor(config: Config) { super( - MonetizationGrantsServiceMultiTenantMessagingSettings - .SynchronizeClaimDefinitionsFailed.messageType, - ); - this.logger = new Logger({ + MonetizationGrantsServiceMultiTenantMessagingSettings.SynchronizeClaimDefinitionsFailed, + new Logger({ + config, + context: SyncClaimDefinitionsFailedHandler.name, + }), config, - context: SyncClaimDefinitionsFailedHandler.name, - }); + ); } - async onMessage( - content: SynchronizeClaimDefinitionsFailedEvent, - ): Promise { + override async handleMessage({ + payload, + }: TypedTransactionalMessage): Promise { this.logger.error({ message: 'Claim Definitions synchronization has failed!', - details: { ...content }, + details: { ...payload }, }); } } diff --git a/services/entitlement/service/src/messaging/handlers/sync-claim-definitions-finished-handler.ts b/services/entitlement/service/src/messaging/handlers/sync-claim-definitions-finished-handler.ts index dfad4e6a..92e7cd71 100644 --- a/services/entitlement/service/src/messaging/handlers/sync-claim-definitions-finished-handler.ts +++ b/services/entitlement/service/src/messaging/handlers/sync-claim-definitions-finished-handler.ts @@ -1,30 +1,30 @@ -import { MessageHandler } from '@axinom/mosaic-message-bus'; import { MonetizationGrantsServiceMultiTenantMessagingSettings, SynchronizeClaimDefinitionsFinishedEvent, } from '@axinom/mosaic-messages'; import { Logger } from '@axinom/mosaic-service-common'; +import { TypedTransactionalMessage } from '@axinom/mosaic-transactional-inbox-outbox'; import { Config } from '../../common'; +import { EntitlementAuthenticatedTransactionalMessageHandler } from './entitlement-authenticated-message-handler'; -export class SyncClaimDefinitionsFinishedHandler extends MessageHandler { - private logger: Logger; +export class SyncClaimDefinitionsFinishedHandler extends EntitlementAuthenticatedTransactionalMessageHandler { constructor(config: Config) { super( - MonetizationGrantsServiceMultiTenantMessagingSettings - .SynchronizeClaimDefinitionsFinished.messageType, - ); - this.logger = new Logger({ + MonetizationGrantsServiceMultiTenantMessagingSettings.SynchronizeClaimDefinitionsFinished, + new Logger({ + config, + context: SyncClaimDefinitionsFinishedHandler.name, + }), config, - context: SyncClaimDefinitionsFinishedHandler.name, - }); + ); } - async onMessage( - content: SynchronizeClaimDefinitionsFinishedEvent, - ): Promise { + override async handleMessage({ + payload, + }: TypedTransactionalMessage): Promise { this.logger.log({ message: 'Claim Definitions synchronization has succeeded!', - details: { ...content }, + details: { ...payload }, }); } } diff --git a/services/entitlement/service/src/messaging/register-messaging.ts b/services/entitlement/service/src/messaging/register-messaging.ts index 2de38d5e..82fa9984 100644 --- a/services/entitlement/service/src/messaging/register-messaging.ts +++ b/services/entitlement/service/src/messaging/register-messaging.ts @@ -1,11 +1,35 @@ -import { getOwnerPgPool } from '@axinom/mosaic-db-common'; -import { RascalConfigBuilder } from '@axinom/mosaic-message-bus'; +import { initMessagingCounter, OwnerPgPool } from '@axinom/mosaic-db-common'; +import { + Broker, + RascalConfigBuilder, + setupMessagingBroker, +} from '@axinom/mosaic-message-bus'; +import { Logger } from '@axinom/mosaic-service-common'; +import { Express } from 'express'; +import { Config } from '../common'; +import { getMessagingMiddleware } from './middleware'; + import { MonetizationGrantsServiceMultiTenantMessagingSettings, SubscriptionMonetizationServiceMultiTenantMessagingSettings, } from '@axinom/mosaic-messages'; -import { Express } from 'express'; -import { Config } from '../common'; +import { ShutdownActionsMiddleware } from '@axinom/mosaic-service-common'; +import { + RabbitMqInboxWriter, + RascalTransactionalConfigBuilder, + setupOutboxStorage, + setupPollingOutboxListener, + StoreOutboxMessage, + TransactionalLogMapper, +} from '@axinom/mosaic-transactional-inbox-outbox'; +import { + getInboxPollingListenerSettings, + getOutboxPollingListenerSettings, + initializeMessageStorage, + initializePollingMessageListener, + PollingListenerConfig, + TransactionalMessageHandler, +} from 'pg-transactional-outbox'; import { ClaimSetPublishedHandler, SubscriptionPlanPublishedHandler, @@ -13,33 +37,158 @@ import { SyncClaimDefinitionsFinishedHandler, } from './handlers'; -export const registerMessaging = ( +export const registerMessaging = async ( app: Express, + ownerPool: OwnerPgPool, config: Config, -): RascalConfigBuilder[] => { - const ownerPool = getOwnerPgPool(app); - return [ - new RascalConfigBuilder( + shutdownActions: ShutdownActionsMiddleware, +): Promise<{ broker: Broker; storeOutboxMessage: StoreOutboxMessage }> => { + const outboxLogger = new Logger({ context: 'Transactional outbox' }); + const inboxLogger = new Logger({ context: 'Transactional inbox' }); + + const outboxConfig: PollingListenerConfig = { + outboxOrInbox: 'outbox', + dbListenerConfig: { + connectionString: config.dbOwnerConnectionString, + }, + settings: getOutboxPollingListenerSettings(), + }; + const storeOutboxMessage = setupOutboxStorage( + outboxConfig, + outboxLogger, + config, + ); + + const inboxConfig: PollingListenerConfig = { + outboxOrInbox: 'inbox', + dbListenerConfig: { + connectionString: config.dbOwnerConnectionString, + }, + dbHandlerConfig: { connectionString: config.dbOwnerConnectionString }, + settings: getInboxPollingListenerSettings(), + }; + + const logMapper = new TransactionalLogMapper(inboxLogger, config.logLevel); + registerTransactionalInboxHandlers( + config, + inboxConfig, + logMapper, + shutdownActions, + ); + const broker = await registerRabbitMqMessaging( + app, + ownerPool, + config, + inboxConfig, + inboxLogger, + logMapper, + shutdownActions, + ); + + const shutdownOutbox = setupPollingOutboxListener( + outboxConfig, + broker, + outboxLogger, + config, + ); + shutdownActions.push(shutdownOutbox); + + return { broker, storeOutboxMessage }; +}; + +const registerTransactionalInboxHandlers = ( + config: Config, + inboxConfig: PollingListenerConfig, + logMapper: TransactionalLogMapper, + shutdownActions: ShutdownActionsMiddleware, +): void => { + const messageHandlers: TransactionalMessageHandler[] = [ + new SyncClaimDefinitionsFinishedHandler(config), + new SyncClaimDefinitionsFailedHandler(config), + new ClaimSetPublishedHandler(config), + new SubscriptionPlanPublishedHandler(config), + ]; + + const [shutdownInSrv] = initializePollingMessageListener( + inboxConfig, + [...messageHandlers], + logMapper, + ); + shutdownActions.push(shutdownInSrv); +}; + +const registerRabbitMqMessaging = async ( + app: Express, + ownerPool: OwnerPgPool, + config: Config, + inboxConfig: PollingListenerConfig, + inboxLogger: Logger, + logMapper: TransactionalLogMapper, + shutdownActions: ShutdownActionsMiddleware, +): Promise => { + const storeInboxMessage = initializeMessageStorage(inboxConfig, logMapper); + + const grantsSettings = MonetizationGrantsServiceMultiTenantMessagingSettings; + const planSettings = + SubscriptionMonetizationServiceMultiTenantMessagingSettings; + const inboxWriter = new RabbitMqInboxWriter( + storeInboxMessage, + ownerPool, + inboxLogger, + { + // temporary backward compatibility until all your services are updated and all current messages are processed + acceptedMessageSettings: [ + grantsSettings.SynchronizeClaimDefinitionsFinished, + grantsSettings.SynchronizeClaimDefinitionsFailed, + grantsSettings.ClaimSetPublished, + planSettings.SubscriptionPlanPublished, + ], + customMessagePreProcessor: (message) => { + switch (message.messageType) { + case grantsSettings.SynchronizeClaimDefinitionsFinished.messageType: + case grantsSettings.SynchronizeClaimDefinitionsFailed.messageType: + message.concurrency = 'parallel'; + break; + default: + message.concurrency = 'sequential'; + break; + } + }, + }, + ); + + const rascalBuilders: RascalConfigBuilder[] = [ + new RascalTransactionalConfigBuilder( MonetizationGrantsServiceMultiTenantMessagingSettings.SynchronizeClaimDefinitions, config, ).sendCommand(), - new RascalConfigBuilder( + new RascalTransactionalConfigBuilder( MonetizationGrantsServiceMultiTenantMessagingSettings.SynchronizeClaimDefinitionsFinished, config, - ).subscribeForEvent(() => new SyncClaimDefinitionsFinishedHandler(config)), - new RascalConfigBuilder( + ).subscribeForEvent(() => inboxWriter), + new RascalTransactionalConfigBuilder( MonetizationGrantsServiceMultiTenantMessagingSettings.SynchronizeClaimDefinitionsFailed, config, - ).subscribeForEvent(() => new SyncClaimDefinitionsFailedHandler(config)), - new RascalConfigBuilder( + ).subscribeForEvent(() => inboxWriter), + new RascalTransactionalConfigBuilder( MonetizationGrantsServiceMultiTenantMessagingSettings.ClaimSetPublished, config, - ).subscribeForEvent(() => new ClaimSetPublishedHandler(ownerPool, config)), - new RascalConfigBuilder( + ).subscribeForEvent(() => inboxWriter), + new RascalTransactionalConfigBuilder( SubscriptionMonetizationServiceMultiTenantMessagingSettings.SubscriptionPlanPublished, config, - ).subscribeForEvent( - () => new SubscriptionPlanPublishedHandler(ownerPool, config), - ), + ).subscribeForEvent(() => inboxWriter), ]; + + const counter = initMessagingCounter(ownerPool); + return setupMessagingBroker({ + app, + config, + builders: [...rascalBuilders], + logger: inboxLogger, + shutdownActions, + onMessageMiddleware: getMessagingMiddleware(config, inboxLogger), + components: { counters: { postgresCounter: counter } }, + rascalConfigExportPath: './src/generated/messaging/rascal-schema.json', + }); }; diff --git a/services/entitlement/service/src/tests/test-utils/test-context.ts b/services/entitlement/service/src/tests/test-utils/test-context.ts index 332fefe8..4f517acc 100644 --- a/services/entitlement/service/src/tests/test-utils/test-context.ts +++ b/services/entitlement/service/src/tests/test-utils/test-context.ts @@ -1,12 +1,17 @@ import { + buildPgSettings, compareMigrationHashes, LoginPgPool, MigrationRecord, OwnerPgPool, runCurrentSql, + transactionWithContext, } from '@axinom/mosaic-db-common'; import { enhanceGraphqlErrors } from '@axinom/mosaic-graphql-common'; -import { EndUserAuthenticationContext } from '@axinom/mosaic-id-guard'; +import { + AuthenticatedManagementSubject, + EndUserAuthenticationContext, +} from '@axinom/mosaic-id-guard'; import { assertError, customizeGraphQlErrorFields, @@ -30,7 +35,7 @@ import { PostGraphileOptions, withPostGraphileContext, } from 'postgraphile'; -import { truncate } from 'zapatos/db'; +import { IsolationLevel, truncate, TxnClient } from 'zapatos/db'; import { Table } from 'zapatos/schema'; import { Config, getMigrationSettings } from '../../common'; import { entitlementLogMapper } from '../../common/errors/entitlement-log-mapper'; @@ -115,6 +120,10 @@ export interface ITestContext { variables?: Dict, requestContext?: TestRequestContext, ): Promise; + executeOwnerSql( + user: AuthenticatedManagementSubject, + callback: (client: TxnClient) => Promise, + ): Promise; } export const createTestContext = async ( @@ -165,6 +174,19 @@ export const createTestContext = async ( options, ); + const executeOwnerSql = async ( + user: AuthenticatedManagementSubject, + callback: (client: TxnClient) => Promise, + ): Promise => { + const pgSettings = buildPgSettings(user, config.dbOwner, config.serviceId); + return transactionWithContext( + ownerPool, + IsolationLevel.Serializable, + pgSettings, + async (dbContext) => callback(dbContext), + ); + }; + return { ownerPool, loginPool, @@ -173,6 +195,7 @@ export const createTestContext = async ( options, schema, runGqlQuery, + executeOwnerSql, truncate: async function (tableName: Table): Promise { try { await truncate(tableName, 'CASCADE').run(this.ownerPool);