From 233914093aa00ce69a42c388f16de12e1620dd23 Mon Sep 17 00:00:00 2001 From: gquadrati <75862507+gquadrati@users.noreply.github.com> Date: Mon, 9 Aug 2021 15:50:41 +0200 Subject: [PATCH] [#IP-308] add new UserDataDeleteOrchestratorV2 (#158) * [#IP-308] [#IP-308] add new UserDataDeleteOrchestratorV2 * [#IP-308] add new tests for subscription feed * [#IP-308] fix function.json UserDataDeleteOrchestratorV2 * [#IP-308] fix getServicesPreferences bug on retrieve * [#IP-308] apply suggestions from fn-app review --- GetServicesPreferencesActivity/function.json | 10 + GetServicesPreferencesActivity/handler.ts | 103 ++ GetServicesPreferencesActivity/index.ts | 17 + .../__tests__/handler.test.ts | 271 ++++ UpdateSubscriptionsFeedActivity/handler.ts | 184 +++ UpdateSubscriptionsFeedActivity/index.ts | 188 +-- UpdateSubscriptionsFeedActivity/types.ts | 39 - UserDataDeleteOrchestrator/handler.ts | 2 +- .../__tests__/handler.test.ts | 1358 +++++++++++++++++ UserDataDeleteOrchestratorV2/function.json | 10 + UserDataDeleteOrchestratorV2/handler.ts | 672 ++++++++ UserDataDeleteOrchestratorV2/index.ts | 14 + UserDataDeleteOrchestratorV2/utils.ts | 17 + UserDataProcessingTrigger/handler.ts | 4 +- utils/cosmosdb.ts | 3 + utils/crypto.ts | 14 + utils/subscription_feed.ts | 126 ++ 17 files changed, 2820 insertions(+), 212 deletions(-) create mode 100644 GetServicesPreferencesActivity/function.json create mode 100644 GetServicesPreferencesActivity/handler.ts create mode 100644 GetServicesPreferencesActivity/index.ts create mode 100644 UpdateSubscriptionsFeedActivity/__tests__/handler.test.ts create mode 100644 UpdateSubscriptionsFeedActivity/handler.ts delete mode 100644 UpdateSubscriptionsFeedActivity/types.ts create mode 100644 UserDataDeleteOrchestratorV2/__tests__/handler.test.ts create mode 100644 UserDataDeleteOrchestratorV2/function.json create mode 100644 UserDataDeleteOrchestratorV2/handler.ts create mode 100644 UserDataDeleteOrchestratorV2/index.ts create mode 100644 UserDataDeleteOrchestratorV2/utils.ts create mode 100644 utils/crypto.ts create mode 100644 utils/subscription_feed.ts diff --git a/GetServicesPreferencesActivity/function.json b/GetServicesPreferencesActivity/function.json new file mode 100644 index 00000000..884e05ef --- /dev/null +++ b/GetServicesPreferencesActivity/function.json @@ -0,0 +1,10 @@ +{ + "bindings": [ + { + "name": "name", + "type": "activityTrigger", + "direction": "in" + } + ], + "scriptFile": "../dist/GetServicesPreferencesActivity/index.js" +} \ No newline at end of file diff --git a/GetServicesPreferencesActivity/handler.ts b/GetServicesPreferencesActivity/handler.ts new file mode 100644 index 00000000..2a00923a --- /dev/null +++ b/GetServicesPreferencesActivity/handler.ts @@ -0,0 +1,103 @@ +import { Context } from "@azure/functions"; +import { + ServicePreference, + ServicesPreferencesModel +} from "@pagopa/io-functions-commons/dist/src/models/service_preference"; +import { + asyncIteratorToArray, + flattenAsyncIterator +} from "@pagopa/io-functions-commons/dist/src/utils/async"; +import { + CosmosErrors, + toCosmosErrorResponse +} from "@pagopa/io-functions-commons/dist/src/utils/cosmosdb_model"; +import { NonNegativeInteger } from "@pagopa/ts-commons/lib/numbers"; +import { readableReport } from "@pagopa/ts-commons/lib/reporters"; +import { FiscalCode } from "@pagopa/ts-commons/lib/strings"; +import { isRight } from "fp-ts/lib/Either"; +import { fromEither, tryCatch } from "fp-ts/lib/TaskEither"; +import * as t from "io-ts"; + +const ActivityInput = t.interface({ + fiscalCode: FiscalCode, + settingsVersion: NonNegativeInteger +}); +type ActivityInput = t.TypeOf; + +// Activity result +export const ActivityResultSuccess = t.interface({ + kind: t.literal("SUCCESS"), + preferences: t.readonlyArray(ServicePreference) +}); + +export type ActivityResultSuccess = t.TypeOf; + +export const InvalidInputFailure = t.interface({ + kind: t.literal("INVALID_INPUT") +}); +export type InvalidInputFailure = t.TypeOf; + +export const ActivityResult = t.taggedUnion("kind", [ + ActivityResultSuccess, + InvalidInputFailure +]); +export type ActivityResult = t.TypeOf; + +export const GetServicesPreferencesActivityHandler = ( + servicePreferences: ServicesPreferencesModel +) => async (context: Context, input: unknown): Promise => + fromEither(ActivityInput.decode(input)) + .mapLeft(_ => + InvalidInputFailure.encode({ kind: "INVALID_INPUT" }) + ) + .chain(({ fiscalCode, settingsVersion }) => + tryCatch( + async () => + servicePreferences + .getQueryIterator({ + parameters: [ + { + name: "@fiscalCode", + value: fiscalCode + }, + { + name: "@version", + value: settingsVersion + } + ], + query: `SELECT * FROM m WHERE m.fiscalCode = @fiscalCode AND m.settingsVersion = @version` + }) + [Symbol.asyncIterator](), + toCosmosErrorResponse + ) + ) + .map(flattenAsyncIterator) + .map(asyncIteratorToArray) + .chain(i => tryCatch(() => i, toCosmosErrorResponse)) + .map(values => values.filter(isRight).map(_ => _.value)) + .fold( + err => { + if (err.kind === "INVALID_INPUT") { + context.log.error( + `GetServicesPreferencesActivityHandler|ERROR|Invalid activity input [${err}]` + ); + return err; + } + context.log.error( + `GetServicesPreferencesActivityHandler|ERROR|Cosmos error [${ + err.kind === "COSMOS_DECODING_ERROR" + ? readableReport(err.error) + : err.kind === "COSMOS_ERROR_RESPONSE" + ? err.error.message + : err.kind + }]` + ); + throw new Error(err.kind); + }, + preferences => + ActivityResultSuccess.encode({ + kind: "SUCCESS", + preferences + }) + ) + .run(); diff --git a/GetServicesPreferencesActivity/index.ts b/GetServicesPreferencesActivity/index.ts new file mode 100644 index 00000000..7d838a59 --- /dev/null +++ b/GetServicesPreferencesActivity/index.ts @@ -0,0 +1,17 @@ +import { + SERVICE_PREFERENCES_COLLECTION_NAME, + ServicesPreferencesModel +} from "@pagopa/io-functions-commons/dist/src/models/service_preference"; +import { cosmosdbInstance } from "../utils/cosmosdb"; +import { GetServicesPreferencesActivityHandler } from "./handler"; + +const servicePreferencesModel = new ServicesPreferencesModel( + cosmosdbInstance.container(SERVICE_PREFERENCES_COLLECTION_NAME), + SERVICE_PREFERENCES_COLLECTION_NAME +); + +const activityFunctionHandler = GetServicesPreferencesActivityHandler( + servicePreferencesModel +); + +export default activityFunctionHandler; diff --git a/UpdateSubscriptionsFeedActivity/__tests__/handler.test.ts b/UpdateSubscriptionsFeedActivity/__tests__/handler.test.ts new file mode 100644 index 00000000..6e63f298 --- /dev/null +++ b/UpdateSubscriptionsFeedActivity/__tests__/handler.test.ts @@ -0,0 +1,271 @@ +import { Context } from "@azure/functions"; +import { ServiceId } from "@pagopa/io-functions-commons/dist/generated/definitions/ServiceId"; +import { ServicePreference } from "@pagopa/io-functions-commons/dist/src/models/service_preference"; +import { NonEmptyString } from "@pagopa/ts-commons/lib/strings"; +import { TableService } from "azure-storage"; + +import { context as contextMock } from "../../__mocks__/durable-functions"; +import { + aFiscalCode, + aRetrievedServicePreferences +} from "../../__mocks__/mocks"; + +import { Input, updateSubscriptionFeed } from "../handler"; + +const aServiceId = "aServiceId" as ServiceId; + +const insertEntityMock = jest.fn((_, __, f) => { + f(undefined, undefined, { isSuccessful: true }); +}); + +const deleteEntityMock = jest.fn((_, __, f) => { + f(undefined, { isSuccessful: true }); +}); + +const tableServiceMock = ({ + deleteEntity: deleteEntityMock, + insertEntity: insertEntityMock +} as unknown) as TableService; + +const today = new Date(); + +describe("UpdateSubscriptionsFeedActivity - Service", () => { + beforeEach(() => { + jest.clearAllMocks(); + }); + + it("Given a subscribed service input, When service has already been unsubscribed today, Then the unsubscribe feed must be deleted", async () => { + const input: Input = { + fiscalCode: aFiscalCode, + operation: "SUBSCRIBED", + serviceId: aServiceId, + updatedAt: today.getTime(), + version: 1, + subscriptionKind: "SERVICE" + }; + + const result = await updateSubscriptionFeed( + (contextMock as unknown) as Context, + input, + tableServiceMock, + "aTable" as NonEmptyString + ); + + expect(tableServiceMock.deleteEntity).toHaveBeenCalledWith( + "aTable", + expect.objectContaining({ + PartitionKey: expect.objectContaining({ + _: "S-" + today.toISOString().substring(0, 10) + "-aServiceId-U" + }) + }), + expect.any(Function) + ); + + expect(tableServiceMock.insertEntity).not.toHaveBeenCalled(); + + expect(result).toEqual("SUCCESS"); + }); + + it("Given a subscribed service input, When no service has been unsubscribed during the current day, Then the subscribe feed must be added", async () => { + const input: Input = { + fiscalCode: aFiscalCode, + operation: "SUBSCRIBED", + serviceId: aServiceId, + updatedAt: today.getTime(), + version: 1, + subscriptionKind: "SERVICE" + }; + + deleteEntityMock.mockImplementationOnce((_, __, f) => { + f(Error("an Error"), { isSuccessful: false, statusCode: 404 }); + }); + + const result = await updateSubscriptionFeed( + (contextMock as unknown) as Context, + input, + tableServiceMock, + "aTable" as NonEmptyString + ); + + expect(tableServiceMock.deleteEntity).toHaveBeenCalledWith( + "aTable", + expect.objectContaining({ + PartitionKey: expect.objectContaining({ + _: "S-" + today.toISOString().substring(0, 10) + "-aServiceId-U" + }) + }), + expect.any(Function) + ); + + expect(tableServiceMock.insertEntity).toHaveBeenCalledWith( + "aTable", + expect.objectContaining({ + PartitionKey: expect.objectContaining({ + _: "S-" + today.toISOString().substring(0, 10) + "-aServiceId-S" + }) + }), + expect.any(Function) + ); + expect(result).toEqual("SUCCESS"); + }); +}); + +describe("UpdateSubscriptionsFeedActivity - Profile", () => { + beforeEach(() => { + jest.clearAllMocks(); + }); + + it("Given a subscribed profile input, When profile has already been unsubscribed today, Then the unsubscribe feed must be deleted and the subscribe feed must be added", async () => { + const input: Input = { + fiscalCode: aFiscalCode, + operation: "SUBSCRIBED", + updatedAt: today.getTime(), + version: 1, + subscriptionKind: "PROFILE", + previousPreferences: [] + }; + + const result = await updateSubscriptionFeed( + (contextMock as unknown) as Context, + input, + tableServiceMock, + "aTable" as NonEmptyString + ); + + expect(tableServiceMock.deleteEntity).toHaveBeenCalledWith( + "aTable", + expect.objectContaining({ + PartitionKey: expect.objectContaining({ + _: "P-" + today.toISOString().substring(0, 10) + "-U" + }) + }), + expect.any(Function) + ); + + expect(tableServiceMock.insertEntity).toHaveBeenCalledWith( + "aTable", + expect.objectContaining({ + PartitionKey: expect.objectContaining({ + _: "P-" + today.toISOString().substring(0, 10) + "-S" + }) + }), + expect.any(Function) + ); + + expect(result).toEqual("SUCCESS"); + }); + + it("Given a subscribed profile input, When no profile has been unsubscribed during the current day, Then the subscribe feed must be added", async () => { + const input: Input = { + fiscalCode: aFiscalCode, + operation: "SUBSCRIBED", + updatedAt: today.getTime(), + version: 1, + subscriptionKind: "PROFILE", + previousPreferences: [] + }; + deleteEntityMock.mockImplementationOnce((_, __, f) => { + f(Error("an Error"), { isSuccessful: false, statusCode: 404 }); + }); + + const result = await updateSubscriptionFeed( + (contextMock as unknown) as Context, + input, + tableServiceMock, + "aTable" as NonEmptyString + ); + + expect(tableServiceMock.deleteEntity).toHaveBeenCalledWith( + "aTable", + expect.objectContaining({ + PartitionKey: expect.objectContaining({ + _: "P-" + today.toISOString().substring(0, 10) + "-U" + }) + }), + expect.any(Function) + ); + + expect(tableServiceMock.insertEntity).toHaveBeenCalledWith( + "aTable", + expect.objectContaining({ + PartitionKey: expect.objectContaining({ + _: "P-" + today.toISOString().substring(0, 10) + "-S" + }) + }), + expect.any(Function) + ); + + expect(result).toEqual("SUCCESS"); + }); +}); + +describe("UpdateSubscriptionsFeedActivity - Profile with preferences", () => { + beforeEach(() => { + jest.clearAllMocks(); + }); + it("Given a subscribed profile with previous preferences, When profile has already been unsubscribed today, Then all service preferences subscribed feed for today must be deleted", async () => { + const input: Input = { + fiscalCode: aFiscalCode, + operation: "SUBSCRIBED", + previousPreferences: [aRetrievedServicePreferences], + subscriptionKind: "PROFILE", + updatedAt: today.getTime(), + version: 1 + }; + + const result = await updateSubscriptionFeed( + (contextMock as unknown) as Context, + input, + tableServiceMock, + "aTable" as NonEmptyString + ); + + expect(tableServiceMock.deleteEntity).toHaveBeenCalledTimes(3); + expect(tableServiceMock.deleteEntity).toHaveBeenCalledWith( + "aTable", + expect.objectContaining({ + PartitionKey: expect.objectContaining({ + _: "P-" + today.toISOString().substring(0, 10) + "-U" + }) + }), + expect.any(Function) + ); + + expect(tableServiceMock.deleteEntity).toHaveBeenCalledWith( + "aTable", + expect.objectContaining({ + PartitionKey: expect.objectContaining({ + _: `S-${today.toISOString().substring(0, 10)}-${ + aRetrievedServicePreferences.serviceId + }-U` + }) + }), + expect.any(Function) + ); + + expect(tableServiceMock.deleteEntity).toHaveBeenCalledWith( + "aTable", + expect.objectContaining({ + PartitionKey: expect.objectContaining({ + _: `S-${today.toISOString().substring(0, 10)}-${ + aRetrievedServicePreferences.serviceId + }-S` + }) + }), + expect.any(Function) + ); + + expect(tableServiceMock.insertEntity).toHaveBeenCalledTimes(1); + expect(tableServiceMock.insertEntity).toHaveBeenCalledWith( + "aTable", + expect.objectContaining({ + PartitionKey: expect.objectContaining({ + _: "P-" + today.toISOString().substring(0, 10) + "-S" + }) + }), + expect.any(Function) + ); + + expect(result).toEqual("SUCCESS"); + }); +}); diff --git a/UpdateSubscriptionsFeedActivity/handler.ts b/UpdateSubscriptionsFeedActivity/handler.ts new file mode 100644 index 00000000..cc7d6dd6 --- /dev/null +++ b/UpdateSubscriptionsFeedActivity/handler.ts @@ -0,0 +1,184 @@ +import { Context } from "@azure/functions"; +import { ServiceId } from "@pagopa/io-functions-commons/dist/generated/definitions/ServiceId"; +import { ServicePreference } from "@pagopa/io-functions-commons/dist/src/models/service_preference"; +import { readableReport } from "@pagopa/ts-commons/lib/reporters"; +import { FiscalCode, NonEmptyString } from "@pagopa/ts-commons/lib/strings"; +import { TableService } from "azure-storage"; +import { fromPredicate } from "fp-ts/lib/Option"; +import * as t from "io-ts"; +import { toHash } from "../utils/crypto"; +import { + SubscriptionFeedEntitySelector, + updateSubscriptionStatus +} from "../utils/subscription_feed"; + +const CommonInput = t.interface({ + // fiscal code of the user affected by this update + fiscalCode: FiscalCode, + // whether the service has been subscribed or unsubscribed + operation: t.union([t.literal("SUBSCRIBED"), t.literal("UNSUBSCRIBED")]), + // the time (millis epoch) of the update + updatedAt: t.number, + // updated version of the profile + version: t.number +}); +type CommonInput = t.TypeOf; + +const ProfileInput = t.intersection([ + CommonInput, + t.interface({ + // a profile subscription event + subscriptionKind: t.literal("PROFILE") + }), + t.partial({ + previousPreferences: t.readonlyArray(ServicePreference) + }) +]); +type ProfileInput = t.TypeOf; + +const ServiceInput = t.intersection([ + CommonInput, + t.interface({ + // the updated service + serviceId: ServiceId, + // a service subscription event + subscriptionKind: t.literal("SERVICE") + }) +]); +type ServiceInput = t.TypeOf; + +/** + * Input data for this activity function, we need information about the kind + * of subscription event and the affected user profile. + */ +export const Input = t.union([ProfileInput, ServiceInput]); + +export type Input = t.TypeOf; + +// eslint-disable-next-line @typescript-eslint/explicit-function-return-type +export const updateSubscriptionFeed = async ( + context: Context, + rawInput: unknown, + tableService: TableService, + subscriptionFeedTableName: NonEmptyString, + logPrefix: string = "UpdateServiceSubscriptionFeedActivity" +) => { + const decodedInputOrError = Input.decode(rawInput); + if (decodedInputOrError.isLeft()) { + context.log.error( + `${logPrefix}|Cannot parse input|ERROR=${readableReport( + decodedInputOrError.value + )}` + ); + return "FAILURE"; + } + + const decodedInput = decodedInputOrError.value; + + const { fiscalCode, operation, updatedAt, version } = decodedInput; + + // The date part of the key will be in UTC time zone, with format: YYYY-MM-DD + const utcTodayPrefix = new Date(updatedAt).toISOString().substring(0, 10); + + // Create a SHA256 hash of the fiscal code + // see https://nodejs.org/api/crypto.html#crypto_crypto_createhash_algorithm_options + const fiscalCodeHash = toHash(fiscalCode); + + const updateLogPrefix = `${logPrefix}|PROFILE=${fiscalCode}|OPERATION=${operation}|PROFILE=${fiscalCode}`; + + // Entity keys have the following format + // + // Profile subscription events: P--- + // Service subscription events: S---- + // + // Where: + // + // * DATE is "YYYY-MM-DD" (UTC) + // * SERVICE_ID is the service ID that the user subscribed/unsubscribed + // * EVENT is either "S" for subscription events or "U" for unsubscriptions + // * HASH is the hex encoded SHA256 hash of the fiscal code + // + const sPartitionKey = + decodedInput.subscriptionKind === "PROFILE" + ? `P-${utcTodayPrefix}-S` + : `S-${utcTodayPrefix}-${decodedInput.serviceId}-S`; + const uPartitionKey = + decodedInput.subscriptionKind === "PROFILE" + ? `P-${utcTodayPrefix}-U` + : `S-${utcTodayPrefix}-${decodedInput.serviceId}-U`; + + const sKey = `${sPartitionKey}-${fiscalCodeHash}`; + const uKey = `${uPartitionKey}-${fiscalCodeHash}`; + + const otherEntitiesToDelete: ReadonlyArray = fromPredicate( + ProfileInput.is + )(decodedInput) + .mapNullable(_ => _.previousPreferences) + .map(_ => + _.reduce((prev, preference) => { + // TODO: This code could be optimized deleting only the entry based on the current + // profile status and the effective previous preference inbox value + const sPreferencePartitionKey = `S-${utcTodayPrefix}-${preference.serviceId}-S`; + const uPreferencePartitionKey = `S-${utcTodayPrefix}-${preference.serviceId}-U`; + return [ + ...prev, + { + partitionKey: sPreferencePartitionKey, + rowKey: `${sPreferencePartitionKey}-${fiscalCodeHash}` + }, + { + partitionKey: uPreferencePartitionKey, + rowKey: `${uPreferencePartitionKey}-${fiscalCodeHash}` + } + ]; + }, [] as ReadonlyArray) + ) + .getOrElse([]); + + const allowInsertIfDeleted = decodedInput.subscriptionKind !== "SERVICE"; + + const updateSubscriptionStatusHandler = updateSubscriptionStatus( + tableService, + subscriptionFeedTableName + ); + + if (operation === "SUBSCRIBED") { + // we delete the entry from the unsubscriptions and we add it to the + // subscriptions + await updateSubscriptionStatusHandler( + context, + updateLogPrefix, + version, + { + partitionKey: uPartitionKey, + rowKey: uKey + }, + otherEntitiesToDelete, + { + partitionKey: sPartitionKey, + rowKey: sKey + }, + allowInsertIfDeleted + ); + } else { + // we delete the entry from the subscriptions and we add it to the + // unsubscriptions + await updateSubscriptionStatusHandler( + context, + updateLogPrefix, + version, + { + partitionKey: sPartitionKey, + rowKey: sKey + }, + otherEntitiesToDelete, + { + partitionKey: uPartitionKey, + rowKey: uKey + }, + allowInsertIfDeleted + ); + } + + return "SUCCESS"; +}; diff --git a/UpdateSubscriptionsFeedActivity/index.ts b/UpdateSubscriptionsFeedActivity/index.ts index 6baa2465..1e56b8a5 100644 --- a/UpdateSubscriptionsFeedActivity/index.ts +++ b/UpdateSubscriptionsFeedActivity/index.ts @@ -1,178 +1,26 @@ -import * as crypto from "crypto"; - -import { AzureFunction, Context } from "@azure/functions"; -import { createTableService, TableUtilities } from "azure-storage"; - -import { readableReport } from "@pagopa/ts-commons/lib/reporters"; - -import { isNone } from "fp-ts/lib/Option"; +import { AzureFunction, Context } from "@azure/functions"; +import { createTableService } from "azure-storage"; import { getConfigOrThrow } from "../utils/config"; -import { deleteTableEntity, insertTableEntity } from "../utils/storage"; -import { ActivityInput, ActivityResult } from "./types"; +import { updateSubscriptionFeed } from "./handler"; const config = getConfigOrThrow(); -const storageConnectionString = config.SubscriptionFeedStorageConnection; -const tableService = createTableService(storageConnectionString); - -const subscriptionsFeedTable = config.SUBSCRIPTIONS_FEED_TABLE; - -const insertEntity = insertTableEntity(tableService, subscriptionsFeedTable); -const deleteEntity = deleteTableEntity(tableService, subscriptionsFeedTable); - -const eg = TableUtilities.entityGenerator; - -/** - * Updates the subscrption status of a user. - * - * User subscribed or unsubscribed events are stored as empty entities in an - * Azure storage table. - * - * The entity key is composed by the day of the event, the service ID (if it's - * a service subscription event), a character that indicates whether it's a - * subscribed (S) or unsubscribed (U) event and the SHA256 hash of the fiscal - * code of the user. - * - * For each day, (optionally) service and user, either the S or the U key exist, - * but not both (it would not make sense). - * - * When the key does not include a service ID, it refers to a profile - * subscription event, meaning the user registered to IO (subscribed) or deleted - * her account (unsubscribed). - * When the key includes the service ID, it refers to a service subscription - * event, meaning the user activated (subscribed) or deactivated (unsubscribed) - * a specific service. - */ -// eslint-disable-next-line max-params, prefer-arrow/prefer-arrow-functions -async function updateSubscriptionStatus( - context: Context, - logPrefix: string, - version: number, - delPartitionKey: string, - delKey: string, - insPartitionKey: string, - insKey: string -): Promise { - // First we try to delete a previous (un)subscriptions operation - // from the subscription feed entries for the current day - context.log.verbose(`${logPrefix}|KEY=${delKey}|Deleting entity`); - const { e1: maybeError, e2: uResponse } = await deleteEntity({ - PartitionKey: eg.String(delPartitionKey), - RowKey: eg.String(delKey) - }); - - // If deleteEntity is successful it means the user - // previously made an opposite choice (in the same day). - // Since we're going to expose only the delta for this day, - // and we've just deleted the opposite operation, we go on here. - if (isNone(maybeError)) { - return true; - } - - if (maybeError.isSome() && uResponse.statusCode !== 404) { - // retry - context.log.error(`${logPrefix}|ERROR=${maybeError.value.message}`); - throw maybeError.value; - } - - // If deleteEntity has not found any entry, - // we insert the new (un)subscription entry into the feed - context.log.verbose(`${logPrefix}|KEY=${insKey}|Inserting entity`); - const { e1: resultOrError, e2: sResponse } = await insertEntity({ - PartitionKey: eg.String(insPartitionKey), - RowKey: eg.String(insKey), - version: eg.Int32(version) - }); - if (resultOrError.isLeft() && sResponse.statusCode !== 409) { - // retry - context.log.error(`${logPrefix}|ERROR=${resultOrError.value.message}`); - throw resultOrError.value; - } +const tableService = createTableService( + config.SubscriptionFeedStorageConnection +); - return true; -} +// When the function starts, attempt to create the table if it does not exist +// Note that we cannot log anything just yet since we don't have a Context +tableService.createTableIfNotExists(config.SUBSCRIPTIONS_FEED_TABLE, () => 0); -export const index: AzureFunction = async ( +const activityFunction: AzureFunction = async ( context: Context, rawInput: unknown -): Promise => { - const decodedInputOrError = ActivityInput.decode(rawInput); - if (decodedInputOrError.isLeft()) { - context.log.error( - `UpdateServiceSubscriptionFeedActivity|Cannot parse input|ERROR=${readableReport( - decodedInputOrError.value - )}` - ); - return "FAILURE"; - } - - const decodedInput = decodedInputOrError.value; - - const { fiscalCode, operation, updatedAt, version } = decodedInput; - - // The date part of the key will be in UTC time zone, with format: YYYY-MM-DD - const utcTodayPrefix = new Date(updatedAt).toISOString().substring(0, 10); - - // Create a SHA256 hash of the fiscal code - // see https://nodejs.org/api/crypto.html#crypto_crypto_createhash_algorithm_options - const fiscalCodeHash = crypto - .createHash("sha256") - .update(fiscalCode) - .digest("hex"); - - const logPrefix = `UpdateSubscriptionFeedActivity|PROFILE=${fiscalCode}|OPERATION=${operation}|PROFILE=${fiscalCode}`; - - // Entity keys have the following format - // - // Profile subscription events: P--- - // Service subscription events: S---- - // - // Where: - // - // * DATE is "YYYY-MM-DD" (UTC) - // * SERVICE_ID is the service ID that the user subscribed/unsubscribed - // * EVENT is either "S" for subscription events or "U" for unsubscriptions - // * HASH is the hex encoded SHA256 hash of the fiscal code - // - const sPartitionKey = - decodedInput.subscriptionKind === "PROFILE" - ? `P-${utcTodayPrefix}-S` - : `S-${utcTodayPrefix}-${decodedInput.serviceId}-S`; - const uPartitionKey = - decodedInput.subscriptionKind === "PROFILE" - ? `P-${utcTodayPrefix}-U` - : `S-${utcTodayPrefix}-${decodedInput.serviceId}-U`; - - const sKey = `${sPartitionKey}-${fiscalCodeHash}`; - const uKey = `${uPartitionKey}-${fiscalCodeHash}`; - - if (operation === "SUBSCRIBED") { - // we delete the entry from the unsubscriptions and we add it to the - // subscriptions - await updateSubscriptionStatus( - context, - logPrefix, - version, - uPartitionKey, - uKey, - sPartitionKey, - sKey - ); - } else { - // we delete the entry from the subscriptions and we add it to the - // unsubscriptions - await updateSubscriptionStatus( - context, - logPrefix, - version, - sPartitionKey, - sKey, - uPartitionKey, - uKey - ); - } - - return "SUCCESS"; -}; - -export default index; +): Promise => + updateSubscriptionFeed( + context, + rawInput, + tableService, + config.SUBSCRIPTIONS_FEED_TABLE + ); +export default activityFunction; diff --git a/UpdateSubscriptionsFeedActivity/types.ts b/UpdateSubscriptionsFeedActivity/types.ts deleted file mode 100644 index e6de6373..00000000 --- a/UpdateSubscriptionsFeedActivity/types.ts +++ /dev/null @@ -1,39 +0,0 @@ -import { ServiceId } from "@pagopa/io-functions-commons/dist/generated/definitions/ServiceId"; -import * as t from "io-ts"; -import { FiscalCode } from "@pagopa/ts-commons/lib/strings"; - -/** - * Input data for this activity function, we need information about the kind - * of subscription event and the affected user profile. - */ -export type Input = t.TypeOf; -export const ActivityInput = t.intersection([ - t.interface({ - // fiscal code of the user affected by this update - fiscalCode: FiscalCode, - // whether the service has been subscribed or unsubscribed - operation: t.union([t.literal("SUBSCRIBED"), t.literal("UNSUBSCRIBED")]), - // the time (millis epoch) of the update - updatedAt: t.number, - // updated version of the profile - version: t.number - }), - t.union([ - t.interface({ - // a profile subscription event - subscriptionKind: t.literal("PROFILE") - }), - t.interface({ - // the updated service - serviceId: ServiceId, - // a service subscription event - subscriptionKind: t.literal("SERVICE") - }) - ]) -]); - -export type ActivityResult = t.TypeOf; -export const ActivityResult = t.union([ - t.literal("SUCCESS"), - t.literal("FAILURE") -]); diff --git a/UserDataDeleteOrchestrator/handler.ts b/UserDataDeleteOrchestrator/handler.ts index 8d151e1f..f2f4b7a0 100644 --- a/UserDataDeleteOrchestrator/handler.ts +++ b/UserDataDeleteOrchestrator/handler.ts @@ -44,7 +44,7 @@ import { ActivityResultSuccess as IsFailedUserDataProcessingActivityResultSuccess } from "../IsFailedUserDataProcessingActivity/handler"; -import { ActivityInput as UpdateServiceSubscriptionFeedActivityInput } from "../UpdateSubscriptionsFeedActivity/types"; +import { Input as UpdateServiceSubscriptionFeedActivityInput } from "../UpdateSubscriptionsFeedActivity/handler"; import { ProcessableUserDataDelete } from "../UserDataProcessingTrigger/handler"; import { trackUserDataDeleteEvent, diff --git a/UserDataDeleteOrchestratorV2/__tests__/handler.test.ts b/UserDataDeleteOrchestratorV2/__tests__/handler.test.ts new file mode 100644 index 00000000..394b883b --- /dev/null +++ b/UserDataDeleteOrchestratorV2/__tests__/handler.test.ts @@ -0,0 +1,1358 @@ +import { IOrchestrationFunctionContext } from "durable-functions/lib/src/classes"; +import { + mockOrchestratorCallActivity, + mockOrchestratorCallActivityWithRetry, + mockOrchestratorCancelTimer, + mockOrchestratorContext, + mockOrchestratorGetInput, + mockOrchestratorTaskAny +} from "../../__mocks__/durable-functions"; +import { + createUserDataDeleteOrchestratorHandler, + InvalidInputFailure, + OrchestratorSuccess +} from "../handler"; + +import { UserDataProcessingChoiceEnum } from "@pagopa/io-functions-commons/dist/generated/definitions/UserDataProcessingChoice"; +import { UserDataProcessingStatusEnum } from "@pagopa/io-functions-commons/dist/generated/definitions/UserDataProcessingStatus"; +import { readableReport } from "@pagopa/ts-commons/lib/reporters"; +import { Day, Hour } from "@pagopa/ts-commons/lib/units"; +import { + aUserDataProcessing, + aProfile, + aRetrievedProfile, + aRetrievedServicePreferences +} from "../../__mocks__/mocks"; +import { ActivityResultSuccess as DeleteUserDataActivityResultSuccess } from "../../DeleteUserDataActivity/types"; +import { + ActivityResultNotFoundFailure as GetUserDataProcessingActivityResultNotFoundFailure, + ActivityResultSuccess as GetUserDataProcessingActivityResultSuccess +} from "../../GetUserDataProcessingActivity/handler"; +import { + ActivityResultFailure as IsFailedUserDataProcessingActivityResultFailure, + ActivityResultSuccess as IsFailedUserDataProcessingActivityResultSuccess +} from "../../IsFailedUserDataProcessingActivity/handler"; +import { ActivityResultSuccess as SetUserDataProcessingStatusActivityResultSuccess } from "../../SetUserDataProcessingStatusActivity/handler"; +import { ActivityResultSuccess as SetUserSessionLockActivityResultSuccess } from "../../SetUserSessionLockActivity/handler"; +import { OrchestratorFailure } from "../../UserDataDownloadOrchestrator/handler"; +import { + ActivityResultSuccess as GetProfileActivityResultSuccess, + ActivityResultNotFoundFailure as GetProfileActivityResultNotFoundFailure +} from "../../GetProfileActivity/handler"; +import { ActivityResultSuccess as GetServicesPreferencesActivityResultSuccess } from "../../GetServicesPreferencesActivity/handler"; +import { ProcessableUserDataDelete } from "../../UserDataProcessingTrigger/handler"; +import { ActivityResultSuccess as SendUserDataDeleteEmailActivityResultSuccess } from "../../SendUserDataDeleteEmailActivity/handler"; +import { addDays, addHours } from "../utils"; +import { ServicesPreferencesModeEnum } from "@pagopa/io-functions-commons/dist/generated/definitions/ServicesPreferencesMode"; +import { NonNegativeInteger } from "@pagopa/ts-commons/lib/numbers"; + +const aProcessableUserDataDelete = ProcessableUserDataDelete.decode({ + ...aUserDataProcessing, + choice: UserDataProcessingChoiceEnum.DELETE, + status: UserDataProcessingStatusEnum.PENDING +}).getOrElseL(e => + fail(`Failed creating a mock input document: ${readableReport(e)}`) +); + +const aUserDataDownloadPending = { + ...aUserDataProcessing, + choice: UserDataProcessingChoiceEnum.DOWNLOAD, + status: UserDataProcessingStatusEnum.PENDING +}; + +const aUserDataDownloadWip = { + ...aUserDataProcessing, + choice: UserDataProcessingChoiceEnum.DOWNLOAD, + status: UserDataProcessingStatusEnum.WIP +}; + +const aUserDataDownloadClosed = { + ...aUserDataProcessing, + choice: UserDataProcessingChoiceEnum.DOWNLOAD, + status: UserDataProcessingStatusEnum.CLOSED +}; + +// this mock will default to a not failed request to be transparent for old tests +// we will override this just to test new logic for failed requests +const isFailedUserDataProcessingActivity = jest.fn().mockImplementation(() => + IsFailedUserDataProcessingActivityResultSuccess.encode({ + kind: "SUCCESS", + value: false + }) +); + +const setUserDataProcessingStatusActivity = jest.fn().mockImplementation(() => + SetUserDataProcessingStatusActivityResultSuccess.encode({ + kind: "SUCCESS" + }) +); + +const getUserDataProcessingActivity = jest.fn().mockImplementation(() => + GetUserDataProcessingActivityResultNotFoundFailure.encode({ + kind: "NOT_FOUND_FAILURE" + }) +); + +const setUserSessionLockActivity = jest.fn().mockImplementation(() => + SetUserSessionLockActivityResultSuccess.encode({ + kind: "SUCCESS" + }) +); + +const deleteUserDataActivity = jest.fn().mockImplementation(() => + DeleteUserDataActivityResultSuccess.encode({ + kind: "SUCCESS" + }) +); + +const sendUserDataDeleteEmailActivity = jest.fn().mockImplementation(() => + SendUserDataDeleteEmailActivityResultSuccess.encode({ + kind: "SUCCESS" + }) +); + +const getProfileActivity = jest.fn().mockImplementation(() => + GetProfileActivityResultSuccess.encode({ + kind: "SUCCESS", + value: aRetrievedProfile + }) +); + +const getServicePreferencesActivity = jest.fn().mockImplementation(() => + GetServicesPreferencesActivityResultSuccess.encode({ + kind: "SUCCESS", + preferences: [aRetrievedServicePreferences] + }) +); + +const updateSubscriptionFeed = jest.fn().mockImplementation(() => "SUCCESS"); + +// A mock implementation proxy for df.callActivity/df.df.callActivityWithRetry that routes each call to the correct mock implentation +const switchMockImplementation = (name: string, ...args: readonly unknown[]) => + (name === "SetUserDataProcessingStatusActivity" + ? setUserDataProcessingStatusActivity + : name === "GetUserDataProcessingActivity" + ? getUserDataProcessingActivity + : name === "SetUserSessionLockActivity" + ? setUserSessionLockActivity + : name === "DeleteUserDataActivity" + ? deleteUserDataActivity + : name === "SendUserDataDeleteEmailActivity" + ? sendUserDataDeleteEmailActivity + : name === "GetProfileActivity" + ? getProfileActivity + : name === "GetServicesPreferencesActivity" + ? getServicePreferencesActivity + : name === "UpdateSubscriptionsFeedActivity" + ? updateSubscriptionFeed + : name === "IsFailedUserDataProcessingActivity" + ? isFailedUserDataProcessingActivity + : jest.fn())(name, ...args); + +// I assign switchMockImplementation to both because +// I don't want tests to depend on implementation details +// such as which activity is called with retry and which is not +mockOrchestratorCallActivity.mockImplementation(switchMockImplementation); +mockOrchestratorCallActivityWithRetry.mockImplementation( + switchMockImplementation +); + +/** + * Util function that takes an orchestrator and executes each step until is done + * @param orch an orchestrator + * + * @returns the last value yielded by the orchestrator + */ +const consumeOrchestrator = (orch: any) => { + let prevValue: unknown; + while (true) { + const { done, value } = orch.next(prevValue); + if (done) { + return value; + } + prevValue = value; + } +}; + +// just a convenient cast, good for every test case +const context = (mockOrchestratorContext as unknown) as IOrchestrationFunctionContext; + +// timer are not delayed for test, but we set default values +// to test any override, i.e. the grace period for failed requests +const waitForAbortInterval = 1 as Day; +const waitForDownloadInterval = 1 as Hour; + +const expectedRetryOptions = expect.any(Object); + +describe("createUserDataDeleteOrchestratorHandler", () => { + beforeEach(() => { + jest.clearAllMocks(); + }); + + it("all requests: should fail on invalid input", () => { + mockOrchestratorGetInput.mockReturnValueOnce("invalid input"); + + const result = consumeOrchestrator( + createUserDataDeleteOrchestratorHandler( + waitForAbortInterval, + waitForDownloadInterval + )(context) + ); + + expect(InvalidInputFailure.decode(result).isRight()).toBe(true); + + expect(getProfileActivity).not.toHaveBeenCalled(); + + expect(isFailedUserDataProcessingActivity).not.toHaveBeenCalled(); + + expect(context.df.createTimer).not.toHaveBeenCalled(); + + expect(setUserSessionLockActivity).not.toHaveBeenCalled(); + + expect(setUserDataProcessingStatusActivity).not.toHaveBeenCalled(); + + expect(getUserDataProcessingActivity).not.toHaveBeenCalled(); + + expect(deleteUserDataActivity).not.toHaveBeenCalled(); + + expect(sendUserDataDeleteEmailActivity).not.toHaveBeenCalled(); + + expect(updateSubscriptionFeed).not.toHaveBeenCalled(); + }); + + it("all requests: should set status as FAILED if user profile does not exist", () => { + mockOrchestratorGetInput.mockReturnValueOnce(aProcessableUserDataDelete); + + getProfileActivity.mockImplementationOnce(() => + GetProfileActivityResultNotFoundFailure.encode({ + kind: "NOT_FOUND_FAILURE" + }) + ); + + const result = consumeOrchestrator( + createUserDataDeleteOrchestratorHandler( + waitForAbortInterval, + waitForDownloadInterval + )(context) + ); + + expect(OrchestratorFailure.decode(result).isRight()).toBe(true); + + expect(getProfileActivity).toHaveBeenCalled(); + expect(getProfileActivity).toHaveBeenCalledTimes(1); + expect(getProfileActivity).toHaveBeenCalledWith( + expect.any(String), + expect.objectContaining({ + fiscalCode: aProcessableUserDataDelete.fiscalCode + }) + ); + + expect(isFailedUserDataProcessingActivity).not.toHaveBeenCalled(); + + expect(context.df.createTimer).not.toHaveBeenCalled(); + + expect(setUserSessionLockActivity).not.toHaveBeenCalled(); + + expect(setUserDataProcessingStatusActivity).toHaveBeenCalled(); + expect(setUserDataProcessingStatusActivity).toHaveBeenCalledTimes(1); + expect(setUserDataProcessingStatusActivity).toHaveBeenCalledWith( + expect.any(String), + expectedRetryOptions, + expect.objectContaining({ + nextStatus: UserDataProcessingStatusEnum.FAILED + }) + ); + + expect(getUserDataProcessingActivity).not.toHaveBeenCalled(); + + expect(deleteUserDataActivity).not.toHaveBeenCalled(); + + expect(sendUserDataDeleteEmailActivity).not.toHaveBeenCalled(); + + expect(updateSubscriptionFeed).not.toHaveBeenCalled(); + }); + + it("new processing requests: should set status as FAILED if fails to lock the user session", () => { + mockOrchestratorGetInput.mockReturnValueOnce(aProcessableUserDataDelete); + setUserSessionLockActivity.mockImplementationOnce( + () => "any unsuccessful value" + ); + + const result = consumeOrchestrator( + createUserDataDeleteOrchestratorHandler( + waitForAbortInterval, + waitForDownloadInterval + )(context) + ); + + expect(OrchestratorFailure.decode(result).isRight()).toBe(true); + + expect(getProfileActivity).toHaveBeenCalled(); + + expect(isFailedUserDataProcessingActivity).toHaveBeenCalled(); + + // test that grace period is respected + expect(context.df.createTimer).toHaveBeenCalled(); + expect(context.df.createTimer).toHaveBeenCalledTimes(1); + expect(context.df.createTimer).toHaveBeenCalledWith( + addDays(context.df.currentUtcDateTime, waitForAbortInterval) + ); + + expect(setUserSessionLockActivity).toHaveBeenCalled(); + expect(setUserSessionLockActivity).toHaveBeenCalledTimes(1); + expect(setUserSessionLockActivity).toHaveBeenCalledWith( + expect.any(String), + { + action: "LOCK", + fiscalCode: aProcessableUserDataDelete.fiscalCode + } + ); + + // if session lock fails WIP status is never set, we just put it in FAILED + expect(setUserDataProcessingStatusActivity).toHaveBeenCalled(); + expect(setUserDataProcessingStatusActivity).toHaveBeenCalledTimes(1); + expect(setUserDataProcessingStatusActivity).toHaveBeenCalledWith( + expect.any(String), + expectedRetryOptions, + expect.objectContaining({ + nextStatus: UserDataProcessingStatusEnum.FAILED + }) + ); + + expect(getUserDataProcessingActivity).not.toHaveBeenCalled(); + + expect(deleteUserDataActivity).not.toHaveBeenCalled(); + + expect(sendUserDataDeleteEmailActivity).not.toHaveBeenCalled(); + + expect(updateSubscriptionFeed).not.toHaveBeenCalled(); + }); + + it("new processing requests: should set status as FAILED if fails to set the operation as WIP", () => { + mockOrchestratorGetInput.mockReturnValueOnce(aProcessableUserDataDelete); + + setUserDataProcessingStatusActivity.mockImplementationOnce( + () => "any unsuccessful value" + ); + + const result = consumeOrchestrator( + createUserDataDeleteOrchestratorHandler( + waitForAbortInterval, + waitForDownloadInterval + )(context) + ); + + expect(OrchestratorFailure.decode(result).isRight()).toBe(true); + + expect(getProfileActivity).toHaveBeenCalled(); + + expect(isFailedUserDataProcessingActivity).toHaveBeenCalled(); + + // test that grace period is respected + expect(context.df.createTimer).toHaveBeenCalled(); + expect(context.df.createTimer).toHaveBeenCalledTimes(1); + expect(context.df.createTimer).toHaveBeenCalledWith( + addDays(context.df.currentUtcDateTime, waitForAbortInterval) + ); + + expect(setUserSessionLockActivity).toHaveBeenCalled(); + expect(setUserSessionLockActivity).toHaveBeenCalledTimes(1); + expect(setUserSessionLockActivity).toHaveBeenCalledWith( + expect.any(String), + { + action: "LOCK", + fiscalCode: aProcessableUserDataDelete.fiscalCode + } + ); + + expect(setUserDataProcessingStatusActivity).toHaveBeenCalled(); + expect(setUserDataProcessingStatusActivity).toHaveBeenCalledTimes(2); + expect(setUserDataProcessingStatusActivity).toHaveBeenCalledWith( + expect.any(String), + expectedRetryOptions, + expect.objectContaining({ + nextStatus: UserDataProcessingStatusEnum.WIP + }) + ); + expect(setUserDataProcessingStatusActivity).toHaveBeenCalledWith( + expect.any(String), + expectedRetryOptions, + expect.objectContaining({ + nextStatus: UserDataProcessingStatusEnum.FAILED + }) + ); + + expect(getUserDataProcessingActivity).not.toHaveBeenCalled(); + + expect(deleteUserDataActivity).not.toHaveBeenCalled(); + + expect(sendUserDataDeleteEmailActivity).not.toHaveBeenCalled(); + + expect(updateSubscriptionFeed).not.toHaveBeenCalled(); + }); + + it("new processing requests: should set status as FAILED if fails delete user data", () => { + mockOrchestratorGetInput.mockReturnValueOnce(aProcessableUserDataDelete); + + deleteUserDataActivity.mockImplementationOnce( + () => "any unsuccessful value" + ); + + const result = consumeOrchestrator( + createUserDataDeleteOrchestratorHandler( + waitForAbortInterval, + waitForDownloadInterval + )(context) + ); + + expect(OrchestratorFailure.decode(result).isRight()).toBe(true); + + expect(getProfileActivity).toHaveBeenCalled(); + + expect(isFailedUserDataProcessingActivity).toHaveBeenCalled(); + + // test that grace period is respected + expect(context.df.createTimer).toHaveBeenCalled(); + expect(context.df.createTimer).toHaveBeenCalledTimes(1); + expect(context.df.createTimer).toHaveBeenCalledWith( + addDays(context.df.currentUtcDateTime, waitForAbortInterval) + ); + + expect(setUserSessionLockActivity).toHaveBeenCalled(); + expect(setUserSessionLockActivity).toHaveBeenCalledTimes(1); + expect(setUserSessionLockActivity).toHaveBeenCalledWith( + expect.any(String), + { + action: "LOCK", + fiscalCode: aProcessableUserDataDelete.fiscalCode + } + ); + + expect(setUserDataProcessingStatusActivity).toHaveBeenCalled(); + expect(setUserDataProcessingStatusActivity).toHaveBeenCalledTimes(2); + expect(setUserDataProcessingStatusActivity).toHaveBeenCalledWith( + expect.any(String), + expectedRetryOptions, + expect.objectContaining({ + nextStatus: UserDataProcessingStatusEnum.WIP + }) + ); + expect(setUserDataProcessingStatusActivity).toHaveBeenCalledWith( + expect.any(String), + expectedRetryOptions, + expect.objectContaining({ + nextStatus: UserDataProcessingStatusEnum.FAILED + }) + ); + + expect(getUserDataProcessingActivity).toHaveBeenCalled(); + + expect(deleteUserDataActivity).toHaveBeenCalled(); + expect(deleteUserDataActivity).toHaveBeenCalledTimes(1); + + expect(sendUserDataDeleteEmailActivity).not.toHaveBeenCalled(); + + expect(updateSubscriptionFeed).not.toHaveBeenCalled(); + }); + + it("new processing requests: should set status as FAILED if fails to unlock the user session", () => { + mockOrchestratorGetInput.mockReturnValueOnce(aProcessableUserDataDelete); + + setUserSessionLockActivity.mockImplementationOnce(() => + SetUserSessionLockActivityResultSuccess.encode({ + kind: "SUCCESS" + }) + ); + + setUserSessionLockActivity.mockImplementationOnce( + () => "any unsuccessful value" + ); + + const result = consumeOrchestrator( + createUserDataDeleteOrchestratorHandler( + waitForAbortInterval, + waitForDownloadInterval + )(context) + ); + + expect(OrchestratorFailure.decode(result).isRight()).toBe(true); + + expect(getProfileActivity).toHaveBeenCalled(); + + expect(isFailedUserDataProcessingActivity).toHaveBeenCalled(); + + // test that grace period is respected + expect(context.df.createTimer).toHaveBeenCalled(); + expect(context.df.createTimer).toHaveBeenCalledTimes(1); + expect(context.df.createTimer).toHaveBeenCalledWith( + addDays(context.df.currentUtcDateTime, waitForAbortInterval) + ); + + expect(setUserSessionLockActivity).toHaveBeenCalled(); + expect(setUserSessionLockActivity).toHaveBeenCalledTimes(2); + expect(setUserSessionLockActivity).toHaveBeenCalledWith( + expect.any(String), + { + action: "LOCK", + fiscalCode: aProcessableUserDataDelete.fiscalCode + } + ); + expect(setUserSessionLockActivity).toHaveBeenCalledWith( + expect.any(String), + { + action: "UNLOCK", + fiscalCode: aProcessableUserDataDelete.fiscalCode + } + ); + + expect(setUserDataProcessingStatusActivity).toHaveBeenCalled(); + expect(setUserDataProcessingStatusActivity).toHaveBeenCalledTimes(3); + expect(setUserDataProcessingStatusActivity).toHaveBeenCalledWith( + expect.any(String), + expectedRetryOptions, + expect.objectContaining({ + nextStatus: UserDataProcessingStatusEnum.WIP + }) + ); + expect(setUserDataProcessingStatusActivity).toHaveBeenCalledWith( + expect.any(String), + expectedRetryOptions, + expect.objectContaining({ + nextStatus: UserDataProcessingStatusEnum.CLOSED + }) + ); + expect(setUserDataProcessingStatusActivity).toHaveBeenCalledWith( + expect.any(String), + expectedRetryOptions, + expect.objectContaining({ + nextStatus: UserDataProcessingStatusEnum.FAILED + }) + ); + + expect(getUserDataProcessingActivity).toHaveBeenCalled(); + + // data has been deletes + expect(deleteUserDataActivity).toHaveBeenCalled(); + + // the email has been sent + expect(sendUserDataDeleteEmailActivity).toHaveBeenCalled(); + expect(sendUserDataDeleteEmailActivity).toHaveBeenCalledTimes(1); + expect(setUserDataProcessingStatusActivity).toHaveBeenCalledWith( + expect.any(String), + expectedRetryOptions, + expect.objectContaining({ + nextStatus: UserDataProcessingStatusEnum.FAILED + }) + ); + + expect(updateSubscriptionFeed).toHaveBeenCalled(); + expect(updateSubscriptionFeed).toHaveBeenCalledTimes(1); + }); + + it("new processing requests: should set status as FAILED if fails to set the operation as CLOSED", () => { + mockOrchestratorGetInput.mockReturnValueOnce(aProcessableUserDataDelete); + + setUserDataProcessingStatusActivity.mockImplementationOnce(() => + SetUserDataProcessingStatusActivityResultSuccess.encode({ + kind: "SUCCESS" + }) + ); + + setUserDataProcessingStatusActivity.mockImplementationOnce( + () => "any unsuccessful value" + ); + + const result = consumeOrchestrator( + createUserDataDeleteOrchestratorHandler( + waitForAbortInterval, + waitForDownloadInterval + )(context) + ); + + expect(OrchestratorFailure.decode(result).isRight()).toBe(true); + + expect(getProfileActivity).toHaveBeenCalled(); + + expect(isFailedUserDataProcessingActivity).toHaveBeenCalled(); + + // test that grace period is respected + expect(context.df.createTimer).toHaveBeenCalled(); + expect(context.df.createTimer).toHaveBeenCalledTimes(1); + expect(context.df.createTimer).toHaveBeenCalledWith( + addDays(context.df.currentUtcDateTime, waitForAbortInterval) + ); + + expect(setUserSessionLockActivity).toHaveBeenCalled(); + expect(setUserSessionLockActivity).toHaveBeenCalledTimes(1); + expect(setUserSessionLockActivity).toHaveBeenCalledWith( + expect.any(String), + { + action: "LOCK", + fiscalCode: aProcessableUserDataDelete.fiscalCode + } + ); + + expect(setUserDataProcessingStatusActivity).toHaveBeenCalled(); + expect(setUserDataProcessingStatusActivity).toHaveBeenCalledTimes(3); + expect(setUserDataProcessingStatusActivity).toHaveBeenCalledWith( + expect.any(String), + expectedRetryOptions, + expect.objectContaining({ + nextStatus: UserDataProcessingStatusEnum.WIP + }) + ); + expect(setUserDataProcessingStatusActivity).toHaveBeenCalledWith( + expect.any(String), + expectedRetryOptions, + expect.objectContaining({ + nextStatus: UserDataProcessingStatusEnum.CLOSED + }) + ); + expect(setUserDataProcessingStatusActivity).toHaveBeenCalledWith( + expect.any(String), + expectedRetryOptions, + expect.objectContaining({ + nextStatus: UserDataProcessingStatusEnum.FAILED + }) + ); + + expect(getUserDataProcessingActivity).toHaveBeenCalled(); + + expect(deleteUserDataActivity).toHaveBeenCalled(); + + expect(sendUserDataDeleteEmailActivity).toHaveBeenCalled(); + + expect(updateSubscriptionFeed).toHaveBeenCalled(); + }); + + it("new processing requests: should delete profile, send email and set status as CLOSED if wait interval expires", () => { + mockOrchestratorGetInput.mockReturnValueOnce(aProcessableUserDataDelete); + + const result = consumeOrchestrator( + createUserDataDeleteOrchestratorHandler( + waitForAbortInterval, + waitForDownloadInterval + )(context) + ); + + expect(OrchestratorSuccess.decode(result).isRight()).toBe(true); + + expect(getProfileActivity).toHaveBeenCalled(); + expect(getProfileActivity).toHaveBeenCalledTimes(1); + + expect(isFailedUserDataProcessingActivity).toHaveBeenCalledTimes(1); + + // test that grace period is respected + expect(context.df.createTimer).toHaveBeenCalled(); + expect(context.df.createTimer).toHaveBeenCalledTimes(1); + expect(context.df.createTimer).toHaveBeenCalledWith( + addDays(context.df.currentUtcDateTime, waitForAbortInterval) + ); + + expect(setUserSessionLockActivity).toHaveBeenCalledTimes(2); + expect(setUserSessionLockActivity).toHaveBeenCalledWith( + expect.any(String), + { + action: "LOCK", + fiscalCode: aProcessableUserDataDelete.fiscalCode + } + ); + expect(setUserSessionLockActivity).toHaveBeenCalledWith( + expect.any(String), + { + action: "UNLOCK", + fiscalCode: aProcessableUserDataDelete.fiscalCode + } + ); + + expect(setUserDataProcessingStatusActivity).toHaveBeenCalledTimes(2); + expect(setUserDataProcessingStatusActivity).toHaveBeenCalledWith( + expect.any(String), + expectedRetryOptions, + expect.objectContaining({ + nextStatus: UserDataProcessingStatusEnum.WIP + }) + ); + expect(setUserDataProcessingStatusActivity).toHaveBeenCalledWith( + expect.any(String), + expectedRetryOptions, + expect.objectContaining({ + nextStatus: UserDataProcessingStatusEnum.CLOSED + }) + ); + + expect(getUserDataProcessingActivity).toHaveBeenCalled(); + expect(getUserDataProcessingActivity).toHaveBeenCalledTimes(1); + + expect(deleteUserDataActivity).toHaveBeenCalled(); + expect(deleteUserDataActivity).toHaveBeenCalledTimes(1); + + expect(sendUserDataDeleteEmailActivity).toHaveBeenCalled(); + expect(sendUserDataDeleteEmailActivity).toHaveBeenCalledTimes(1); + + expect(updateSubscriptionFeed).toHaveBeenCalled(); + expect(updateSubscriptionFeed).toHaveBeenCalledTimes(1); + }); + + it("new processing requests: should not delete profile and set status as CLOSED if abort request comes before wait interval expires", () => { + mockOrchestratorGetInput.mockReturnValueOnce(aProcessableUserDataDelete); + + // I trick the implementation of Task.any to return the second event, not the first + mockOrchestratorTaskAny.mockImplementationOnce(([, _]) => _); + + const result = consumeOrchestrator( + createUserDataDeleteOrchestratorHandler( + waitForAbortInterval, + waitForDownloadInterval + )(context) + ); + + expect(OrchestratorSuccess.decode(result).isRight()).toBe(true); + + expect(getProfileActivity).toHaveBeenCalled(); + expect(getProfileActivity).toHaveBeenCalledTimes(1); + + expect(isFailedUserDataProcessingActivity).toHaveBeenCalled(); + expect(isFailedUserDataProcessingActivity).toHaveBeenCalledTimes(1); + + // test that grace period is respected + expect(context.df.createTimer).toHaveBeenCalled(); + expect(context.df.createTimer).toHaveBeenCalledTimes(1); + expect(context.df.createTimer).toHaveBeenCalledWith( + addDays(context.df.currentUtcDateTime, waitForAbortInterval) + ); + + expect(mockOrchestratorCancelTimer).toHaveBeenCalledTimes(1); + + expect(setUserSessionLockActivity).not.toHaveBeenCalled(); + + // if abort event is sent no WIP status is set, only CLOSED + expect(setUserDataProcessingStatusActivity).toHaveBeenCalled(); + expect(setUserDataProcessingStatusActivity).toHaveBeenCalledTimes(1); + expect(setUserDataProcessingStatusActivity).toHaveBeenCalledWith( + expect.any(String), + expectedRetryOptions, + expect.objectContaining({ + nextStatus: UserDataProcessingStatusEnum.CLOSED + }) + ); + + expect(getUserDataProcessingActivity).not.toHaveBeenCalled(); + + expect(deleteUserDataActivity).not.toHaveBeenCalled(); + + expect(sendUserDataDeleteEmailActivity).not.toHaveBeenCalled(); + + expect(updateSubscriptionFeed).not.toHaveBeenCalled(); + }); + + it("new processing requests: should wait if there are pending downloads, then delete profile, send email and set status as CLOSED", () => { + mockOrchestratorGetInput.mockReturnValueOnce(aProcessableUserDataDelete); + + // call 1: it's pending + getUserDataProcessingActivity.mockImplementationOnce(() => + GetUserDataProcessingActivityResultSuccess.encode({ + kind: "SUCCESS", + value: aUserDataDownloadPending + }) + ); + + // call 2: it's wip + getUserDataProcessingActivity.mockImplementationOnce(() => + GetUserDataProcessingActivityResultSuccess.encode({ + kind: "SUCCESS", + value: aUserDataDownloadWip + }) + ); + + // call 3: it's closed (so we can continue with delete) + getUserDataProcessingActivity.mockImplementationOnce(() => + GetUserDataProcessingActivityResultSuccess.encode({ + kind: "SUCCESS", + value: aUserDataDownloadClosed + }) + ); + + const result = consumeOrchestrator( + createUserDataDeleteOrchestratorHandler( + waitForAbortInterval, + waitForDownloadInterval + )(context) + ); + + expect(OrchestratorSuccess.decode(result).isRight()).toBe(true); + + expect(getProfileActivity).toHaveBeenCalled(); + expect(getProfileActivity).toHaveBeenCalledTimes(1); + + expect(isFailedUserDataProcessingActivity).toHaveBeenCalled(); + expect(isFailedUserDataProcessingActivity).toHaveBeenCalledTimes(1); + + // test timers + expect(context.df.createTimer).toHaveBeenCalled(); + expect(context.df.createTimer).toHaveBeenCalledTimes(3); + // test that grace period is respected for abort + expect(context.df.createTimer).toHaveBeenCalledWith( + addDays(context.df.currentUtcDateTime, waitForAbortInterval) + ); + // test that we wait for pending download + expect(context.df.createTimer).toHaveBeenCalledWith( + addHours(context.df.currentUtcDateTime, waitForDownloadInterval) + ); + // test that we wait for wip download + expect(context.df.createTimer).toHaveBeenCalledWith( + addHours(context.df.currentUtcDateTime, waitForDownloadInterval) + ); + + expect(setUserSessionLockActivity).toHaveBeenCalledTimes(2); + expect(setUserSessionLockActivity).toHaveBeenCalledWith( + expect.any(String), + { + action: "LOCK", + fiscalCode: aProcessableUserDataDelete.fiscalCode + } + ); + expect(setUserSessionLockActivity).toHaveBeenCalledWith( + expect.any(String), + { + action: "UNLOCK", + fiscalCode: aProcessableUserDataDelete.fiscalCode + } + ); + + expect(setUserDataProcessingStatusActivity).toHaveBeenCalledTimes(2); + expect(setUserDataProcessingStatusActivity).toHaveBeenCalledWith( + expect.any(String), + expectedRetryOptions, + expect.objectContaining({ + nextStatus: UserDataProcessingStatusEnum.WIP + }) + ); + expect(setUserDataProcessingStatusActivity).toHaveBeenCalledWith( + expect.any(String), + expectedRetryOptions, + expect.objectContaining({ + nextStatus: UserDataProcessingStatusEnum.CLOSED + }) + ); + + expect(getUserDataProcessingActivity).toHaveBeenCalled(); + expect(getUserDataProcessingActivity).toHaveBeenCalledTimes(3); + + expect(deleteUserDataActivity).toHaveBeenCalled(); + expect(deleteUserDataActivity).toHaveBeenCalledTimes(1); + + expect(sendUserDataDeleteEmailActivity).toHaveBeenCalled(); + expect(sendUserDataDeleteEmailActivity).toHaveBeenCalledTimes(1); + + expect(updateSubscriptionFeed).toHaveBeenCalled(); + expect(updateSubscriptionFeed).toHaveBeenCalledTimes(1); + }); + + it("new processing requests: should send a confirmation email if the operation succeeded", () => { + mockOrchestratorGetInput.mockReturnValueOnce(aProcessableUserDataDelete); + + const result = consumeOrchestrator( + createUserDataDeleteOrchestratorHandler( + waitForAbortInterval, + waitForDownloadInterval + )(context) + ); + + expect(OrchestratorSuccess.decode(result).isRight()).toBe(true); + + expect(getProfileActivity).toHaveBeenCalled(); + expect(getProfileActivity).toHaveBeenCalledTimes(1); + + expect(isFailedUserDataProcessingActivity).toHaveBeenCalled(); + expect(isFailedUserDataProcessingActivity).toHaveBeenCalledTimes(1); + + // test that grace period is respected + expect(context.df.createTimer).toHaveBeenCalled(); + expect(context.df.createTimer).toHaveBeenCalledTimes(1); + expect(context.df.createTimer).toHaveBeenCalledWith( + addDays(context.df.currentUtcDateTime, waitForAbortInterval) + ); + + expect(setUserSessionLockActivity).toHaveBeenCalledTimes(2); + expect(setUserSessionLockActivity).toHaveBeenCalledWith( + expect.any(String), + { + action: "LOCK", + fiscalCode: aProcessableUserDataDelete.fiscalCode + } + ); + expect(setUserSessionLockActivity).toHaveBeenCalledWith( + expect.any(String), + { + action: "UNLOCK", + fiscalCode: aProcessableUserDataDelete.fiscalCode + } + ); + + expect(setUserDataProcessingStatusActivity).toHaveBeenCalledTimes(2); + expect(setUserDataProcessingStatusActivity).toHaveBeenCalledWith( + expect.any(String), + expectedRetryOptions, + expect.objectContaining({ + nextStatus: UserDataProcessingStatusEnum.WIP + }) + ); + expect(setUserDataProcessingStatusActivity).toHaveBeenCalledWith( + expect.any(String), + expectedRetryOptions, + expect.objectContaining({ + nextStatus: UserDataProcessingStatusEnum.CLOSED + }) + ); + + expect(getUserDataProcessingActivity).toHaveBeenCalled(); + expect(getUserDataProcessingActivity).toHaveBeenCalledTimes(1); + + expect(deleteUserDataActivity).toHaveBeenCalled(); + expect(deleteUserDataActivity).toHaveBeenCalledTimes(1); + + expect(sendUserDataDeleteEmailActivity).toHaveBeenCalled(); + expect(sendUserDataDeleteEmailActivity).toHaveBeenCalledTimes(1); + expect(sendUserDataDeleteEmailActivity).toHaveBeenCalledWith( + expect.any(String), + expect.objectContaining({ + toAddress: aProfile.email, + fiscalCode: aProcessableUserDataDelete.fiscalCode + }) + ); + + expect(updateSubscriptionFeed).toHaveBeenCalled(); + expect(updateSubscriptionFeed).toHaveBeenCalledTimes(1); + }); + + it("new processing requests: should not send a confirmation email if the email is not present", () => { + mockOrchestratorGetInput.mockReturnValueOnce(aProcessableUserDataDelete); + + getProfileActivity.mockImplementationOnce(() => + GetProfileActivityResultSuccess.encode({ + kind: "SUCCESS", + value: { ...aRetrievedProfile, email: undefined } + }) + ); + + const result = consumeOrchestrator( + createUserDataDeleteOrchestratorHandler( + waitForAbortInterval, + waitForDownloadInterval + )(context) + ); + + expect(OrchestratorSuccess.decode(result).isRight()).toBe(true); + + expect(getProfileActivity).toHaveBeenCalled(); + expect(getProfileActivity).toHaveBeenCalledTimes(1); + + expect(isFailedUserDataProcessingActivity).toHaveBeenCalled(); + expect(isFailedUserDataProcessingActivity).toHaveBeenCalledTimes(1); + + // test that grace period is respected + expect(context.df.createTimer).toHaveBeenCalled(); + expect(context.df.createTimer).toHaveBeenCalledTimes(1); + expect(context.df.createTimer).toHaveBeenCalledWith( + addDays(context.df.currentUtcDateTime, waitForAbortInterval) + ); + + expect(setUserSessionLockActivity).toHaveBeenCalledTimes(2); + expect(setUserSessionLockActivity).toHaveBeenCalledWith( + expect.any(String), + { + action: "LOCK", + fiscalCode: aProcessableUserDataDelete.fiscalCode + } + ); + expect(setUserSessionLockActivity).toHaveBeenCalledWith( + expect.any(String), + { + action: "UNLOCK", + fiscalCode: aProcessableUserDataDelete.fiscalCode + } + ); + + expect(setUserDataProcessingStatusActivity).toHaveBeenCalledTimes(2); + expect(setUserDataProcessingStatusActivity).toHaveBeenCalledWith( + expect.any(String), + expectedRetryOptions, + expect.objectContaining({ + nextStatus: UserDataProcessingStatusEnum.WIP + }) + ); + expect(setUserDataProcessingStatusActivity).toHaveBeenCalledWith( + expect.any(String), + expectedRetryOptions, + expect.objectContaining({ + nextStatus: UserDataProcessingStatusEnum.CLOSED + }) + ); + + expect(getUserDataProcessingActivity).toHaveBeenCalled(); + expect(getUserDataProcessingActivity).toHaveBeenCalledTimes(1); + + expect(deleteUserDataActivity).toHaveBeenCalled(); + expect(deleteUserDataActivity).toHaveBeenCalledTimes(1); + + expect(sendUserDataDeleteEmailActivity).not.toHaveBeenCalled(); + + expect(updateSubscriptionFeed).toHaveBeenCalled(); + expect(updateSubscriptionFeed).toHaveBeenCalledTimes(1); + }); + + it("new processing requests: should not send a confirmation email if the email is not enabled", () => { + mockOrchestratorGetInput.mockReturnValueOnce(aProcessableUserDataDelete); + + getProfileActivity.mockImplementationOnce(() => + GetProfileActivityResultSuccess.encode({ + kind: "SUCCESS", + value: { ...aRetrievedProfile, isEmailEnabled: false } + }) + ); + + const result = consumeOrchestrator( + createUserDataDeleteOrchestratorHandler( + waitForAbortInterval, + waitForDownloadInterval + )(context) + ); + + expect(OrchestratorSuccess.decode(result).isRight()).toBe(true); + + expect(getProfileActivity).toHaveBeenCalled(); + expect(getProfileActivity).toHaveBeenCalledTimes(1); + + expect(isFailedUserDataProcessingActivity).toHaveBeenCalled(); + expect(isFailedUserDataProcessingActivity).toHaveBeenCalledTimes(1); + + // test that grace period is respected + expect(context.df.createTimer).toHaveBeenCalled(); + expect(context.df.createTimer).toHaveBeenCalledTimes(1); + expect(context.df.createTimer).toHaveBeenCalledWith( + addDays(context.df.currentUtcDateTime, waitForAbortInterval) + ); + + expect(setUserSessionLockActivity).toHaveBeenCalledTimes(2); + expect(setUserSessionLockActivity).toHaveBeenCalledWith( + expect.any(String), + { + action: "LOCK", + fiscalCode: aProcessableUserDataDelete.fiscalCode + } + ); + expect(setUserSessionLockActivity).toHaveBeenCalledWith( + expect.any(String), + { + action: "UNLOCK", + fiscalCode: aProcessableUserDataDelete.fiscalCode + } + ); + + expect(setUserDataProcessingStatusActivity).toHaveBeenCalledTimes(2); + expect(setUserDataProcessingStatusActivity).toHaveBeenCalledWith( + expect.any(String), + expectedRetryOptions, + expect.objectContaining({ + nextStatus: UserDataProcessingStatusEnum.WIP + }) + ); + expect(setUserDataProcessingStatusActivity).toHaveBeenCalledWith( + expect.any(String), + expectedRetryOptions, + expect.objectContaining({ + nextStatus: UserDataProcessingStatusEnum.CLOSED + }) + ); + + expect(getUserDataProcessingActivity).toHaveBeenCalled(); + expect(getUserDataProcessingActivity).toHaveBeenCalledTimes(1); + + expect(deleteUserDataActivity).toHaveBeenCalled(); + expect(deleteUserDataActivity).toHaveBeenCalledTimes(1); + + expect(sendUserDataDeleteEmailActivity).not.toHaveBeenCalled(); + + expect(updateSubscriptionFeed).toHaveBeenCalled(); + expect(updateSubscriptionFeed).toHaveBeenCalledTimes(1); + }); + + it("new processing requests: should set status as FAILED if subscription feed fails to update (LEGACY Mode)", () => { + mockOrchestratorGetInput.mockReturnValueOnce(aProcessableUserDataDelete); + + getProfileActivity.mockImplementationOnce(() => + GetProfileActivityResultSuccess.encode({ + kind: "SUCCESS", + value: { + ...aRetrievedProfile, + servicePreferencesSettings: { + ...aRetrievedProfile.servicePreferencesSettings, + mode: ServicesPreferencesModeEnum.LEGACY, + version: -1 + } + } + }) + ); + + updateSubscriptionFeed.mockImplementationOnce(() => "FAILURE"); + + const result = consumeOrchestrator( + createUserDataDeleteOrchestratorHandler( + waitForAbortInterval, + waitForDownloadInterval + )(context) + ); + + expect(OrchestratorFailure.decode(result).isRight()).toBe(true); + + expect(getProfileActivity).toHaveBeenCalled(); + expect(getProfileActivity).toHaveBeenCalledTimes(1); + + expect(isFailedUserDataProcessingActivity).toHaveBeenCalled(); + expect(isFailedUserDataProcessingActivity).toHaveBeenCalledTimes(1); + + // test that grace period is respected + expect(context.df.createTimer).toHaveBeenCalled(); + expect(context.df.createTimer).toHaveBeenCalledTimes(1); + expect(context.df.createTimer).toHaveBeenCalledWith( + addDays(context.df.currentUtcDateTime, waitForAbortInterval) + ); + + expect(setUserSessionLockActivity).toHaveBeenCalledTimes(1); + expect(setUserSessionLockActivity).toHaveBeenCalledWith( + expect.any(String), + { + action: "LOCK", + fiscalCode: aProcessableUserDataDelete.fiscalCode + } + ); + + expect(setUserDataProcessingStatusActivity).toHaveBeenCalledTimes(2); + expect(setUserDataProcessingStatusActivity).toHaveBeenCalledWith( + expect.any(String), + expectedRetryOptions, + expect.objectContaining({ + nextStatus: UserDataProcessingStatusEnum.WIP + }) + ); + expect(setUserDataProcessingStatusActivity).toHaveBeenCalledWith( + expect.any(String), + expectedRetryOptions, + expect.objectContaining({ + nextStatus: UserDataProcessingStatusEnum.FAILED + }) + ); + + expect(getUserDataProcessingActivity).toHaveBeenCalled(); + expect(getUserDataProcessingActivity).toHaveBeenCalledTimes(1); + + expect(deleteUserDataActivity).toHaveBeenCalled(); + expect(deleteUserDataActivity).toHaveBeenCalledTimes(1); + + expect(sendUserDataDeleteEmailActivity).toHaveBeenCalled(); + expect(sendUserDataDeleteEmailActivity).toHaveBeenCalledTimes(1); + + expect(updateSubscriptionFeed).toHaveBeenCalled(); + expect(updateSubscriptionFeed).toHaveBeenCalledTimes(1); + }); + + it("new processing requests: should set status as FAILED if subscription feed fails to update (no LEGACY Mode)", () => { + mockOrchestratorGetInput.mockReturnValueOnce(aProcessableUserDataDelete); + + updateSubscriptionFeed.mockImplementationOnce(() => "FAILURE"); + + const result = consumeOrchestrator( + createUserDataDeleteOrchestratorHandler( + waitForAbortInterval, + waitForDownloadInterval + )(context) + ); + + expect(OrchestratorFailure.decode(result).isRight()).toBe(true); + + expect(getProfileActivity).toHaveBeenCalled(); + expect(getProfileActivity).toHaveBeenCalledTimes(1); + + expect(isFailedUserDataProcessingActivity).toHaveBeenCalled(); + expect(isFailedUserDataProcessingActivity).toHaveBeenCalledTimes(1); + + // test that grace period is respected + expect(context.df.createTimer).toHaveBeenCalled(); + expect(context.df.createTimer).toHaveBeenCalledTimes(1); + expect(context.df.createTimer).toHaveBeenCalledWith( + addDays(context.df.currentUtcDateTime, waitForAbortInterval) + ); + + expect(setUserSessionLockActivity).toHaveBeenCalledTimes(1); + expect(setUserSessionLockActivity).toHaveBeenCalledWith( + expect.any(String), + { + action: "LOCK", + fiscalCode: aProcessableUserDataDelete.fiscalCode + } + ); + + expect(setUserDataProcessingStatusActivity).toHaveBeenCalledTimes(2); + expect(setUserDataProcessingStatusActivity).toHaveBeenCalledWith( + expect.any(String), + expectedRetryOptions, + expect.objectContaining({ + nextStatus: UserDataProcessingStatusEnum.WIP + }) + ); + expect(setUserDataProcessingStatusActivity).toHaveBeenCalledWith( + expect.any(String), + expectedRetryOptions, + expect.objectContaining({ + nextStatus: UserDataProcessingStatusEnum.FAILED + }) + ); + + expect(getUserDataProcessingActivity).toHaveBeenCalled(); + expect(getUserDataProcessingActivity).toHaveBeenCalledTimes(1); + + expect(deleteUserDataActivity).toHaveBeenCalled(); + expect(deleteUserDataActivity).toHaveBeenCalledTimes(1); + + expect(sendUserDataDeleteEmailActivity).toHaveBeenCalled(); + expect(sendUserDataDeleteEmailActivity).toHaveBeenCalledTimes(1); + + expect(updateSubscriptionFeed).toHaveBeenCalled(); + expect(updateSubscriptionFeed).toHaveBeenCalledTimes(1); + }); + + it("failed processing requests: should set status as FAILED if error occurs in checking failed request", () => { + mockOrchestratorGetInput.mockReturnValueOnce(aProcessableUserDataDelete); + + isFailedUserDataProcessingActivity.mockImplementationOnce(() => + IsFailedUserDataProcessingActivityResultFailure.encode({ + kind: "FAILURE", + reason: "Any reason" + }) + ); + + const result = consumeOrchestrator( + createUserDataDeleteOrchestratorHandler( + waitForAbortInterval, + waitForDownloadInterval + )(context) + ); + + expect(OrchestratorFailure.decode(result).isRight()).toBe(true); + + expect(getProfileActivity).toHaveBeenCalled(); + expect(getProfileActivity).toHaveBeenCalledTimes(1); + expect(getProfileActivity).toHaveBeenCalledWith( + expect.any(String), + expect.objectContaining({ + fiscalCode: aProcessableUserDataDelete.fiscalCode + }) + ); + + expect(isFailedUserDataProcessingActivity).toHaveBeenCalled(); + + expect(context.df.createTimer).not.toHaveBeenCalled(); + + expect(setUserSessionLockActivity).not.toHaveBeenCalled(); + + expect(setUserDataProcessingStatusActivity).toHaveBeenCalled(); + expect(setUserDataProcessingStatusActivity).toHaveBeenCalledTimes(1); + expect(setUserDataProcessingStatusActivity).toHaveBeenCalledWith( + expect.any(String), + expectedRetryOptions, + expect.objectContaining({ + nextStatus: UserDataProcessingStatusEnum.FAILED + }) + ); + + expect(getUserDataProcessingActivity).not.toHaveBeenCalled(); + + expect(deleteUserDataActivity).not.toHaveBeenCalled(); + + expect(sendUserDataDeleteEmailActivity).not.toHaveBeenCalled(); + + expect(updateSubscriptionFeed).not.toHaveBeenCalled(); + }); + + it("failed processing requests: should set status as CLOSED without sending email and with a 0 grace period", () => { + mockOrchestratorGetInput.mockReturnValueOnce(aProcessableUserDataDelete); + + isFailedUserDataProcessingActivity.mockImplementationOnce(() => + IsFailedUserDataProcessingActivityResultSuccess.encode({ + kind: "SUCCESS", + value: true + }) + ); + + const result = consumeOrchestrator( + createUserDataDeleteOrchestratorHandler( + waitForAbortInterval, + waitForDownloadInterval + )(context) + ); + + expect(OrchestratorSuccess.decode(result).isRight()).toBe(true); + + expect(getProfileActivity).toHaveBeenCalled(); + expect(getProfileActivity).toHaveBeenCalledTimes(1); + expect(getProfileActivity).toHaveBeenCalledWith( + expect.any(String), + expect.objectContaining({ + fiscalCode: aProcessableUserDataDelete.fiscalCode + }) + ); + + expect(isFailedUserDataProcessingActivity).toHaveBeenCalled(); + + // test that no grace period has been given + expect(context.df.createTimer).toHaveBeenCalled(); + expect(context.df.createTimer).toHaveBeenCalledTimes(1); + expect(context.df.createTimer).toHaveBeenCalledWith( + addDays(context.df.currentUtcDateTime, 0 as Day) + ); // this works because mocked context has the same currentUtcDateTime + + expect(getUserDataProcessingActivity).toHaveBeenCalled(); + + expect(setUserSessionLockActivity).toHaveBeenCalled(); + expect(setUserSessionLockActivity).toHaveBeenCalledTimes(2); + expect(setUserSessionLockActivity).toHaveBeenCalledWith( + expect.any(String), + { + action: "LOCK", + fiscalCode: aProcessableUserDataDelete.fiscalCode + } + ); + expect(setUserSessionLockActivity).toHaveBeenCalledWith( + expect.any(String), + { + action: "UNLOCK", + fiscalCode: aProcessableUserDataDelete.fiscalCode + } + ); + + expect(deleteUserDataActivity).toHaveBeenCalled(); + + // test that no email has been sent + expect(sendUserDataDeleteEmailActivity).not.toHaveBeenCalled(); + + expect(updateSubscriptionFeed).toHaveBeenCalled(); + + expect(setUserDataProcessingStatusActivity).toHaveBeenCalled(); + expect(setUserDataProcessingStatusActivity).toHaveBeenCalledTimes(2); + expect(setUserDataProcessingStatusActivity).toHaveBeenCalledWith( + expect.any(String), + expectedRetryOptions, + expect.objectContaining({ + nextStatus: UserDataProcessingStatusEnum.WIP + }) + ); + expect(setUserDataProcessingStatusActivity).toHaveBeenCalledWith( + expect.any(String), + expectedRetryOptions, + expect.objectContaining({ + nextStatus: UserDataProcessingStatusEnum.CLOSED + }) + ); + }); +}); diff --git a/UserDataDeleteOrchestratorV2/function.json b/UserDataDeleteOrchestratorV2/function.json new file mode 100644 index 00000000..6d7dcbb6 --- /dev/null +++ b/UserDataDeleteOrchestratorV2/function.json @@ -0,0 +1,10 @@ +{ + "bindings": [ + { + "name": "context", + "type": "orchestrationTrigger", + "direction": "in" + } + ], + "scriptFile": "../dist/UserDataDeleteOrchestratorV2/index.js" +} \ No newline at end of file diff --git a/UserDataDeleteOrchestratorV2/handler.ts b/UserDataDeleteOrchestratorV2/handler.ts new file mode 100644 index 00000000..622fd175 --- /dev/null +++ b/UserDataDeleteOrchestratorV2/handler.ts @@ -0,0 +1,672 @@ +import { + IOrchestrationFunctionContext, + Task, + TaskSet, + RetryOptions +} from "durable-functions/lib/src/classes"; +import { fromPredicate, isLeft, toError } from "fp-ts/lib/Either"; +import { toString } from "fp-ts/lib/function"; +import { UserDataProcessingChoiceEnum } from "@pagopa/io-functions-commons/dist/generated/definitions/UserDataProcessingChoice"; +import { UserDataProcessingStatusEnum } from "@pagopa/io-functions-commons/dist/generated/definitions/UserDataProcessingStatus"; +import { RetrievedProfile } from "@pagopa/io-functions-commons/dist/src/models/profile"; +import { UserDataProcessing } from "@pagopa/io-functions-commons/dist/src/models/user_data_processing"; +import * as t from "io-ts"; +import { readableReport } from "@pagopa/ts-commons/lib/reporters"; +import { FiscalCode, NonEmptyString } from "@pagopa/ts-commons/lib/strings"; +import { Day, Hour } from "@pagopa/ts-commons/lib/units"; +import { ServicesPreferencesModeEnum } from "@pagopa/io-functions-commons/dist/generated/definitions/ServicesPreferencesMode"; +import { ServicePreference } from "@pagopa/io-functions-commons/dist/src/models/service_preference"; +import { + ActivityInput as DeleteUserDataActivityInput, + ActivityResultSuccess as DeleteUserDataActivityResultSuccess +} from "../DeleteUserDataActivity/types"; +import { EmailAddress } from "../generated/definitions/EmailAddress"; +import { + ActivityInput as GetProfileActivityInput, + ActivityResultSuccess as GetProfileActivityResultSuccess +} from "../GetProfileActivity/handler"; +import { + ActivityInput as GetUserDataProcessingStatusActivityInput, + ActivityResult as GetUserDataProcessingStatusActivityResult, + ActivityResultNotFoundFailure as GetUserDataProcessingStatusActivityResultNotFoundFailure, + ActivityResultSuccess as GetUserDataProcessingStatusActivityResultSuccess +} from "../GetUserDataProcessingActivity/handler"; +import { + ActivityInput as SendUserDataDeleteEmailActivityInput, + ActivityResultSuccess as SendUserDataDeleteEmailActivityResultSuccess +} from "../SendUserDataDeleteEmailActivity/handler"; + +import { + ActivityResult as GetServicesPreferencesActivityResult, + ActivityResultSuccess as GetServicesPreferencesActivityResultSuccess +} from "../GetServicesPreferencesActivity/handler"; + +import { ActivityResultSuccess as SetUserDataProcessingStatusActivityResultSuccess } from "../SetUserDataProcessingStatusActivity/handler"; +import { + ActivityInput as SetUserSessionLockActivityInput, + ActivityResultSuccess as SetUserSessionLockActivityResultSuccess +} from "../SetUserSessionLockActivity/handler"; + +import { + ActivityInput as IsFailedUserDataProcessingActivityInput, + ActivityResultSuccess as IsFailedUserDataProcessingActivityResultSuccess +} from "../IsFailedUserDataProcessingActivity/handler"; + +import { Input as UpdateServiceSubscriptionFeedActivityInput } from "../UpdateSubscriptionsFeedActivity/handler"; +import { ProcessableUserDataDelete } from "../UserDataProcessingTrigger/handler"; +import { + trackUserDataDeleteEvent, + trackUserDataDeleteException +} from "../utils/appinsightsEvents"; +import { ABORT_EVENT, addDays, addHours } from "./utils"; + +const logPrefix = "UserDataDeleteOrchestrator"; + +const printableError = (error: Error | unknown): string => + error instanceof Error ? error.message : toString(error); + +export type InvalidInputFailure = t.TypeOf; +export const InvalidInputFailure = t.interface({ + kind: t.literal("INVALID_INPUT"), + reason: t.string +}); + +export type UnhanldedFailure = t.TypeOf; +export const UnhanldedFailure = t.interface({ + kind: t.literal("UNHANDLED"), + reason: t.string +}); + +export type ActivityFailure = t.TypeOf; +export const ActivityFailure = t.intersection([ + t.interface({ + activityName: t.string, + kind: t.literal("ACTIVITY"), + reason: t.string + }), + t.partial({ extra: t.object }) +]); + +export type OrchestratorFailure = t.TypeOf; +export const OrchestratorFailure = t.taggedUnion("kind", [ + InvalidInputFailure, + UnhanldedFailure, + ActivityFailure +]); + +export type OrchestratorSuccess = t.TypeOf; +export const OrchestratorSuccess = t.interface({ + kind: t.literal("SUCCESS"), + type: t.keyof({ ABORTED: null, DELETED: null }) +}); + +export type SkippedDocument = t.TypeOf; +export const SkippedDocument = t.interface({ + kind: t.literal("SKIPPED") +}); + +export type OrchestratorResult = t.TypeOf; +export const OrchestratorResult = t.union([ + OrchestratorFailure, + SkippedDocument, + OrchestratorSuccess +]); + +const retryOptions = new RetryOptions(5000, 10); +// eslint-disable-next-line functional/immutable-data +retryOptions.backoffCoefficient = 1.5; + +// eslint-disable-next-line @typescript-eslint/explicit-function-return-type +const toActivityFailure = ( + err: { readonly kind: string }, + activityName: string, + // eslint-disable-next-line @typescript-eslint/ban-types + extra?: object +) => + ActivityFailure.encode({ + activityName, + extra, + kind: "ACTIVITY", + reason: err.kind + }); + +function* setUserSessionLock( + context: IOrchestrationFunctionContext, + { action, fiscalCode }: SetUserSessionLockActivityInput +): Generator { + const result = yield context.df.callActivity( + "SetUserSessionLockActivity", + SetUserSessionLockActivityInput.encode({ + action, + fiscalCode + }) + ); + return SetUserSessionLockActivityResultSuccess.decode(result).getOrElseL( + _ => { + context.log.error( + `${logPrefix}|ERROR|SetUserSessionLockActivity fail|${readableReport( + _ + )}` + ); + throw toActivityFailure( + { kind: "SET_USER_SESSION_LOCK" }, + "SetUserSessionLockActivity", + { + action + } + ); + } + ); +} + +function* isFailedUserDataProcessing( + context: IOrchestrationFunctionContext, + currentRecord: UserDataProcessing +): Generator { + const result = yield context.df.callActivityWithRetry( + "IsFailedUserDataProcessingActivity", + retryOptions, + IsFailedUserDataProcessingActivityInput.encode({ + choice: currentRecord.choice, + fiscalCode: currentRecord.fiscalCode + }) + ); + return IsFailedUserDataProcessingActivityResultSuccess.decode( + result + ).getOrElseL(_ => { + throw toActivityFailure( + { kind: "IS_FAILED_USER_DATA_PROCESSING_ACTIVITY_RESULT" }, + "IsFailedUserDataProcessingActivity" + ); + }).value; +} + +function* setUserDataProcessingStatus( + context: IOrchestrationFunctionContext, + currentRecord: UserDataProcessing, + nextStatus: UserDataProcessingStatusEnum +): Generator { + const result = yield context.df.callActivityWithRetry( + "SetUserDataProcessingStatusActivity", + retryOptions, + { + currentRecord, + nextStatus + } + ); + return SetUserDataProcessingStatusActivityResultSuccess.decode( + result + ).getOrElseL(_ => { + throw toActivityFailure( + { kind: "SET_USER_DATA_PROCESSING_STATUS_ACTIVITY_RESULT" }, + "SetUserDataProcessingStatusActivity", + { + status: nextStatus + } + ); + }); +} + +function* hasPendingDownload( + context: IOrchestrationFunctionContext, + fiscalCode: FiscalCode +): Generator { + const result = yield context.df.callActivity( + "GetUserDataProcessingActivity", + GetUserDataProcessingStatusActivityInput.encode({ + choice: UserDataProcessingChoiceEnum.DOWNLOAD, + fiscalCode + }) + ); + + return GetUserDataProcessingStatusActivityResult.decode(result).fold( + _ => { + throw toActivityFailure( + { kind: "GET_USER_DATA_PROCESSING_ACTIVITY_RESULT" }, + "GetUserDataProcessingActivity" + ); + }, // check if + response => { + if (GetUserDataProcessingStatusActivityResultSuccess.is(response)) { + return [ + UserDataProcessingStatusEnum.PENDING, + UserDataProcessingStatusEnum.WIP + ].includes(response.value.status); + } else if ( + GetUserDataProcessingStatusActivityResultNotFoundFailure.is(response) + ) { + return false; + } + + throw toActivityFailure(response, "GetUserDataProcessingActivity"); + } + ); +} + +function* deleteUserData( + context: IOrchestrationFunctionContext, + currentRecord: UserDataProcessing +): Generator { + const backupFolder = `${ + currentRecord.userDataProcessingId + }-${context.df.currentUtcDateTime.getTime()}` as NonEmptyString; + const result = yield context.df.callActivity( + "DeleteUserDataActivity", + DeleteUserDataActivityInput.encode({ + backupFolder, + fiscalCode: currentRecord.fiscalCode + }) + ); + return DeleteUserDataActivityResultSuccess.decode(result).getOrElseL(_ => { + context.log.error( + `${logPrefix}|ERROR|DeleteUserDataActivity fail`, + result, + readableReport(_) + ); + throw toActivityFailure( + { kind: "DELETE_USER_DATA" }, + "DeleteUserDataActivity" + ); + }); +} + +function* sendUserDataDeleteEmail( + context: IOrchestrationFunctionContext, + toAddress: EmailAddress, + fiscalCode: FiscalCode +): Generator { + const result = yield context.df.callActivity( + "SendUserDataDeleteEmailActivity", + SendUserDataDeleteEmailActivityInput.encode({ + fiscalCode, + toAddress + }) + ); + return SendUserDataDeleteEmailActivityResultSuccess.decode(result).getOrElseL( + _ => { + context.log.error( + `${logPrefix}|ERROR|SendUserDataDeleteEmailActivity fail|${readableReport( + _ + )}` + ); + throw toActivityFailure( + { kind: "SEND_USER_DELETE_EMAIL_ACTIVITY_RESULT" }, + "SendUserDataDeleteEmailActivity" + ); + } + ); +} + +function* getProfile( + context: IOrchestrationFunctionContext, + fiscalCode: FiscalCode +): Generator { + const result = yield context.df.callActivity( + "GetProfileActivity", + GetProfileActivityInput.encode({ + fiscalCode + }) + ); + return GetProfileActivityResultSuccess.decode(result).getOrElseL(_ => { + context.log.error( + `${logPrefix}|ERROR|GetProfileActivity fail|${readableReport( + _ + )}|result=${JSON.stringify(result)}` + ); + throw toActivityFailure( + { kind: "GET_PROFILE_ACTIVITY_RESULT" }, + "GetProfileActivity" + ); + }).value; +} + +function* updateSubscriptionFeed( + context: IOrchestrationFunctionContext, + { fiscalCode, version, servicePreferencesSettings }: RetrievedProfile, + servicesPreferences: ReadonlyArray +): Generator { + const commonInput = { + fiscalCode, + operation: "UNSUBSCRIBED" as const, + subscriptionKind: "PROFILE" as const, + updatedAt: context.df.currentUtcDateTime.getTime(), + version + }; + + // eslint-disable-next-line functional/no-let + let result; + + if (servicePreferencesSettings.mode !== ServicesPreferencesModeEnum.LEGACY) { + context.log.verbose( + `${logPrefix}|VERBOSE|Executing updateSubscriptionFeed - NO LEGACY MODE` + ); + + const input = UpdateServiceSubscriptionFeedActivityInput.encode({ + ...commonInput, + previousPreferences: servicesPreferences + }); + + result = yield context.df.callActivityWithRetry( + "UpdateSubscriptionsFeedActivity", + retryOptions, + input + ); + } else { + context.log.verbose( + `${logPrefix}|VERBOSE|Executing updateSubscriptionFeed - LEGACY MODE` + ); + + const input = UpdateServiceSubscriptionFeedActivityInput.encode( + commonInput + ); + result = yield context.df.callActivityWithRetry( + "UpdateSubscriptionsFeedActivity", + retryOptions, + input + ); + } + + if (result === "FAILURE") { + context.log.error( + `${logPrefix}|ERROR|UpdateSubscriptionsFeedActivity fail` + ); + throw toActivityFailure( + { kind: "UPDATE_SUBSCRIPTIONS_FEED" }, + "UpdateSubscriptionsFeedActivity" + ); + } + + return "SUCCESS"; +} + +/** + * + * @param context + * @param param1 + * @returns + */ +function* getServicesPreferences( + context: IOrchestrationFunctionContext, + { fiscalCode, servicePreferencesSettings }: RetrievedProfile +): Generator> { + // eslint-disable-next-line functional/no-let + let result: ReadonlyArray; + + if (servicePreferencesSettings.mode !== ServicesPreferencesModeEnum.LEGACY) { + context.log.verbose( + `${logPrefix}|VERBOSE|Executing getServicesPreferences - NO LEGACY MODE` + ); + + const activityResult = yield context.df.callActivityWithRetry( + "GetServicesPreferencesActivity", + retryOptions, + { + fiscalCode, + settingsVersion: servicePreferencesSettings.version + } + ); + + result = GetServicesPreferencesActivityResult.decode(activityResult) + .mapLeft(_ => new Error(readableReport(_))) + .chain( + fromPredicate( + (_): _ is GetServicesPreferencesActivityResultSuccess => + _.kind === "SUCCESS", + _ => new Error(_.kind) + ) + ) + .fold( + err => { + // Invalid Activity input. The orchestration fail + context.log.error( + `${logPrefix}|GetServicesPreferencesActivity|ERROR=${err.message}` + ); + throw err; + }, + _ => _.preferences + ); + } else { + context.log.verbose( + `${logPrefix}|VERBOSE|Executing getServicesPreferences - LEGACY MODE` + ); + + result = []; + } + + return result; +} + +/** + * Create a handler for the orchestrator + * + * @param waitForAbortInterval Indicates how many days the request must be left pending, waiting for an eventual abort request + * @param waitForDownloadInterval Indicates how many hours the request must be postponed in case a download request is being processing meanwhile + */ +// eslint-disable-next-line max-lines-per-function +export const createUserDataDeleteOrchestratorHandler = ( + waitForAbortInterval: Day, + waitForDownloadInterval: Hour = 12 as Hour +) => + // eslint-disable-next-line max-lines-per-function + function*(context: IOrchestrationFunctionContext): Generator { + const document = context.df.getInput(); + // This check has been done on the trigger, so it should never fail. + // However, it's worth the effort to check it twice + const invalidInputOrCurrentUserDataProcessing = ProcessableUserDataDelete.decode( + document + ).mapLeft(err => { + context.log.error( + `${logPrefix}|WARN|Cannot decode ProcessableUserDataDelete document: ${readableReport( + err + )}` + ); + return InvalidInputFailure.encode({ + kind: "INVALID_INPUT", + reason: readableReport(err) + }); + }); + + if (isLeft(invalidInputOrCurrentUserDataProcessing)) { + return invalidInputOrCurrentUserDataProcessing.value; + } + + const currentUserDataProcessing = + invalidInputOrCurrentUserDataProcessing.value; + + context.log.verbose( + `${logPrefix}|VERBOSE|Executing delete`, + currentUserDataProcessing + ); + + try { + // retrieve user profile + const profile = yield* getProfile( + context, + currentUserDataProcessing.fiscalCode + ); + + // retrieve last services preferences before deleting them + const servicesPreferences = yield* getServicesPreferences( + context, + profile + ); + + // if profile exists, we check if this is a failed processing request because failed requests + // are managed without waiting any abort event and without sending any email + const isFailedUserDataProcessingRequest = yield* isFailedUserDataProcessing( + context, + currentUserDataProcessing + ); + + context.log.verbose( + `${logPrefix}|VERBOSE|isFailedUserDataProcessingRequest=${isFailedUserDataProcessingRequest}` + ); + + // we calculate the grace period: if this is a failed request => 0 days + const gracePeriod = isFailedUserDataProcessingRequest + ? (0 as Day) + : waitForAbortInterval; + + // we have an interval on which we wait for eventual cancellation by the user + const intervalExpiredEvent = context.df.createTimer( + addDays(context.df.currentUtcDateTime, gracePeriod) + ); + + // we wait for eventually abort message from the user + const canceledRequestEvent = context.df.waitForExternalEvent(ABORT_EVENT); + + context.log.verbose( + `${logPrefix}|VERBOSE|Operation stopped for ${gracePeriod} days` + ); + + trackUserDataDeleteEvent("paused", currentUserDataProcessing); + + // the first that get triggered + const triggeredEvent = yield context.df.Task.any([ + intervalExpiredEvent, + canceledRequestEvent + ]); + + if (triggeredEvent === intervalExpiredEvent) { + context.log.verbose( + `${logPrefix}|VERBOSE|Operation resumed after ${gracePeriod} days` + ); + + // lock user session + yield* setUserSessionLock(context, { + action: "LOCK", + fiscalCode: currentUserDataProcessing.fiscalCode + }); + + // set as wip + yield* setUserDataProcessingStatus( + context, + currentUserDataProcessing, + UserDataProcessingStatusEnum.WIP + ); + + // If there's a working download request, we postpone delete of one day + while ( + yield* hasPendingDownload( + context, + currentUserDataProcessing.fiscalCode + ) + ) { + // we wait some more time for the download process to end + context.log.verbose( + `${logPrefix}|VERBOSE|Found an active DOWNLOAD procedure, wait for ${waitForDownloadInterval} hours` + ); + const waitForDownloadEvent = context.df.createTimer( + addHours(context.df.currentUtcDateTime, waitForDownloadInterval) + ); + trackUserDataDeleteEvent("postponed", currentUserDataProcessing); + yield waitForDownloadEvent; + } + + // eslint-disable-next-line extra-rules/no-commented-out-code + // backup&delete data + yield* deleteUserData(context, currentUserDataProcessing); + + // we need user email to send email + if ( + profile.email && + profile.isEmailValidated && + profile.isEmailEnabled && + !isFailedUserDataProcessingRequest + ) { + // send confirm email + yield* sendUserDataDeleteEmail( + context, + profile.email, + profile.fiscalCode + ); + } + + // update subscription feed + yield* updateSubscriptionFeed(context, profile, servicesPreferences); + + // set as closed + yield* setUserDataProcessingStatus( + context, + currentUserDataProcessing, + UserDataProcessingStatusEnum.CLOSED + ); + + // unlock user + yield* setUserSessionLock(context, { + action: "UNLOCK", + fiscalCode: currentUserDataProcessing.fiscalCode + }); + + trackUserDataDeleteEvent("deleted", currentUserDataProcessing); + return OrchestratorSuccess.encode({ kind: "SUCCESS", type: "DELETED" }); + } else { + // stop the timer to let the orchestrator end + intervalExpiredEvent.cancel(); + + context.log.verbose( + `${logPrefix}|VERBOSE|Operation resumed because of abort event` + ); + + // set as closed + yield* setUserDataProcessingStatus( + context, + currentUserDataProcessing, + UserDataProcessingStatusEnum.CLOSED + ); + + trackUserDataDeleteEvent("aborted", currentUserDataProcessing); + return OrchestratorSuccess.encode({ kind: "SUCCESS", type: "ABORTED" }); + } + } catch (error) { + context.log.error( + `${logPrefix}|ERROR|Failed processing user data for delete: ${printableError( + error + )}` + ); + + trackUserDataDeleteException( + "failed", + toError(error), + currentUserDataProcessing + ); + + const orchestrationFailure = OrchestratorFailure.decode(error).getOrElse( + UnhanldedFailure.encode({ + kind: "UNHANDLED", + reason: printableError(error) + }) + ); + + const failureReason = `${orchestrationFailure.kind}${ + orchestrationFailure.kind === "ACTIVITY" + ? `(${orchestrationFailure.activityName})` + : "" + }|${orchestrationFailure.reason}`; + + SetUserDataProcessingStatusActivityResultSuccess.decode( + yield context.df.callActivityWithRetry( + "SetUserDataProcessingStatusActivity", + retryOptions, + { + currentRecord: currentUserDataProcessing, + failureReason, + nextStatus: UserDataProcessingStatusEnum.FAILED + } + ) + ).getOrElseL(err => { + trackUserDataDeleteException( + "unhandled_failed_status", + new Error(readableReport(err)), + currentUserDataProcessing + ); + throw new Error( + `Activity SetUserDataProcessingStatusActivity (status=FAILED) failed: ${readableReport( + err + )}` + ); + }); + + return orchestrationFailure; + } + }; diff --git a/UserDataDeleteOrchestratorV2/index.ts b/UserDataDeleteOrchestratorV2/index.ts new file mode 100644 index 00000000..a789f2f7 --- /dev/null +++ b/UserDataDeleteOrchestratorV2/index.ts @@ -0,0 +1,14 @@ +import * as df from "durable-functions"; +import { Day } from "@pagopa/ts-commons/lib/units"; +import { getConfigOrThrow } from "../utils/config"; +import { createUserDataDeleteOrchestratorHandler } from "./handler"; + +const config = getConfigOrThrow(); + +const waitInterval = (config.USER_DATA_DELETE_DELAY_DAYS as unknown) as Day; + +const orchestrator = df.orchestrator( + createUserDataDeleteOrchestratorHandler(waitInterval) +); + +export default orchestrator; diff --git a/UserDataDeleteOrchestratorV2/utils.ts b/UserDataDeleteOrchestratorV2/utils.ts new file mode 100644 index 00000000..181aa7f7 --- /dev/null +++ b/UserDataDeleteOrchestratorV2/utils.ts @@ -0,0 +1,17 @@ +import { FiscalCode } from "@pagopa/ts-commons/lib/strings"; +import { Day, Hour } from "@pagopa/ts-commons/lib/units"; + +export const ABORT_EVENT = "user-data-processing-delete-abort"; + +export const makeOrchestratorId = (fiscalCode: FiscalCode): string => + `${fiscalCode}-USER-DATA-DELETE`; + +const aHourInMilliseconds = 60 * 60 * 1000; +// eslint-disable-next-line @typescript-eslint/explicit-function-return-type +export const addHours = (now: Date, hours: Hour) => + new Date(now.getTime() + hours * aHourInMilliseconds); + +const aDayInMilliseconds = 24 * aHourInMilliseconds; +// eslint-disable-next-line @typescript-eslint/explicit-function-return-type +export const addDays = (now: Date, days: Day) => + new Date(now.getTime() + days * aDayInMilliseconds); diff --git a/UserDataProcessingTrigger/handler.ts b/UserDataProcessingTrigger/handler.ts index 85701367..8e2dee8a 100644 --- a/UserDataProcessingTrigger/handler.ts +++ b/UserDataProcessingTrigger/handler.ts @@ -102,7 +102,7 @@ const startOrchestrator = async ( dfClient: DurableOrchestrationClient, orchestratorName: | "UserDataDownloadOrchestrator" - | "UserDataDeleteOrchestrator", + | "UserDataDeleteOrchestratorV2", orchestratorId: string, orchestratorInput: unknown ) => @@ -152,7 +152,7 @@ const startUserDataDeleteOrchestrator = ( const orchestratorId = makeDeleteOrchestratorId(processable.fiscalCode); return startOrchestrator( dfClient, - "UserDataDeleteOrchestrator", + "UserDataDeleteOrchestratorV2", orchestratorId, processable ); diff --git a/utils/cosmosdb.ts b/utils/cosmosdb.ts index 16ac387d..45fc792e 100644 --- a/utils/cosmosdb.ts +++ b/utils/cosmosdb.ts @@ -6,9 +6,12 @@ import { getConfigOrThrow } from "./config"; const config = getConfigOrThrow(); const cosmosDbUri = config.COSMOSDB_URI; +const cosmosDbName = config.COSMOSDB_NAME; const masterKey = config.COSMOSDB_KEY; export const cosmosdbClient = new CosmosClient({ endpoint: cosmosDbUri, key: masterKey }); + +export const cosmosdbInstance = cosmosdbClient.database(cosmosDbName); diff --git a/utils/crypto.ts b/utils/crypto.ts new file mode 100644 index 00000000..53fe3666 --- /dev/null +++ b/utils/crypto.ts @@ -0,0 +1,14 @@ +/** + * Common usages of crypto features + */ + +import * as crypto from "crypto"; + +export const toHash = (s: string): string => + crypto + .createHash("sha256") + .update(s) + .digest("hex"); + +export const randomBytes = (size: number): string => + crypto.randomBytes(size).toString("hex"); diff --git a/utils/subscription_feed.ts b/utils/subscription_feed.ts new file mode 100644 index 00000000..74995488 --- /dev/null +++ b/utils/subscription_feed.ts @@ -0,0 +1,126 @@ +import { Context } from "@azure/functions"; +import { NonEmptyString } from "@pagopa/ts-commons/lib/strings"; +import { TableService, TableUtilities } from "azure-storage"; +import { array } from "fp-ts/lib/Array"; +import { isNone, isSome } from "fp-ts/lib/Option"; +import { taskEither, tryCatch } from "fp-ts/lib/TaskEither"; +import * as t from "io-ts"; +import { deleteTableEntity, insertTableEntity } from "./storage"; + +const eg = TableUtilities.entityGenerator; + +export const SubscriptionFeedEntitySelector = t.interface({ + partitionKey: t.string, + rowKey: t.string +}); +export type SubscriptionFeedEntitySelector = t.TypeOf< + typeof SubscriptionFeedEntitySelector +>; + +/** + * Updates the subscrption status of a user. + * + * User subscribed or unsubscribed events are stored as empty entities in an + * Azure storage table. + * + * The entity key is composed by the day of the event, the service ID (if it's + * a service subscription event), a character that indicates whether it's a + * subscribed (S) or unsubscribed (U) event and the SHA256 hash of the fiscal + * code of the user. + * + * For each day, (optionally) service and user, either the S or the U key exist, + * but not both (it would not make sense). + * + * When the key does not include a service ID, it refers to a profile + * subscription event, meaning the user registered to IO (subscribed) or deleted + * her account (unsubscribed). + * When the key includes the service ID, it refers to a service subscription + * event, meaning the user activated (subscribed) or deactivated (unsubscribed) + * a specific service. + */ +export const updateSubscriptionStatus = ( + tableService: TableService, + tableName: NonEmptyString +) => async ( + context: Context, + logPrefix: string, + version: number, + deleteEntity: SubscriptionFeedEntitySelector, + deleteOtherEntities: ReadonlyArray, + insertEntity: SubscriptionFeedEntitySelector, + allowInsertIfDeleted: boolean + // eslint-disable-next-line max-params +): Promise => { + const insertEntityHandler = insertTableEntity(tableService, tableName); + const deleteEntityHandler = deleteTableEntity(tableService, tableName); + // First we try to delete a previous (un)subscriptions operation + // from the subscription feed entries for the current day + const deleteResults = await array + .sequence(taskEither)( + [deleteEntity, ...deleteOtherEntities].map(_ => + tryCatch( + async () => { + // First we try to delete a previous (un)subscriptions operation + // from the subscription feed entries for the current day + context.log.verbose(`${logPrefix}|KEY=${_.rowKey}|Deleting entity`); + const { + e1: maybeError2, + e2: uResponse2 + } = await deleteEntityHandler({ + PartitionKey: eg.String(_.partitionKey), + RowKey: eg.String(_.rowKey) + }); + return { maybeError: maybeError2, uResponse: uResponse2 }; + }, + () => new Error("Error calling the delete entity handler") + ) + ) + ) + .getOrElseL(error => { + throw error; + }) + .run(); + + // If deleteEntity is successful it means the user + // previously made an opposite choice (in the same day). + // Since we're going to expose only the delta for this day, + // and we've just deleted the opposite operation, we go on here. + if (!allowInsertIfDeleted && isNone(deleteResults[0].maybeError)) { + return true; + } + + if ( + deleteResults.some( + _ => _.maybeError.isSome() && _.uResponse.statusCode !== 404 + ) + ) { + // retry + const errors = new Error( + deleteResults + .map(_ => _.maybeError) + .filter(isSome) + .map(_ => _.value.message) + .join("|") + ); + context.log.error(`${logPrefix}|ERROR=${errors.message}}`); + throw errors; + } + + // If deleteEntity has not found any entry or insert is required, + // we insert the new (un)subscription entry into the feed + context.log.verbose( + `${logPrefix}|KEY=${insertEntity.rowKey}|Inserting entity` + ); + const { e1: resultOrError, e2: sResponse } = await insertEntityHandler({ + PartitionKey: eg.String(insertEntity.partitionKey), + RowKey: eg.String(insertEntity.rowKey), + version: eg.Int32(version) + }); + if (resultOrError.isLeft() && sResponse.statusCode !== 409) { + // retry + context.log.error(`${logPrefix}|ERROR=${resultOrError.value.message}`); + throw resultOrError.value; + } + + return true; +};