Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SELC-5887] Feat: Added send event for FD in user CDC #206

Merged
merged 8 commits into from
Oct 31, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,15 @@
import io.quarkus.runtime.Startup;
import io.quarkus.runtime.configuration.ConfigUtils;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import it.pagopa.selfcare.user.UserUtils;
import it.pagopa.selfcare.user.client.EventHubFdRestClient;
import it.pagopa.selfcare.user.client.EventHubRestClient;
import it.pagopa.selfcare.user.event.entity.UserInstitution;
import it.pagopa.selfcare.user.event.mapper.NotificationMapper;
import it.pagopa.selfcare.user.event.repository.UserInstitutionRepository;
import it.pagopa.selfcare.user.model.NotificationUserType;
import it.pagopa.selfcare.user.model.OnboardedProduct;
import it.pagopa.selfcare.user.model.TrackEventInput;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
Expand All @@ -42,6 +46,7 @@
import static it.pagopa.selfcare.user.model.TrackEventInput.toTrackEventInput;
import static it.pagopa.selfcare.user.model.constants.EventsMetric.*;
import static it.pagopa.selfcare.user.model.constants.EventsName.EVENT_USER_CDC_NAME;
import static it.pagopa.selfcare.user.model.constants.EventsName.FD_EVENT_USER_CDC_NAME;
import static java.util.Arrays.asList;

@Startup
Expand All @@ -52,6 +57,10 @@ public class UserInstitutionCdcService {
private static final String COLLECTION_NAME = "userInstitutions";
private static final String OPERATION_NAME = "USER-CDC-UserInfoUpdate";
public static final String USERS_FIELD_LIST_WITHOUT_FISCAL_CODE = "name,familyName,email,workContacts";
private static final String PROD_FD = "prod-fd";
private static final String PROD_FD_GARANTITO = "prod-fd-garantito";
public static final String ERROR_DURING_SUBSCRIBE_COLLECTION_EXCEPTION_MESSAGE = "Error during subscribe collection, exception: {} , message: {}";


private final TelemetryClient telemetryClient;

Expand All @@ -72,6 +81,10 @@ public class UserInstitutionCdcService {
@Inject
EventHubRestClient eventHubRestClient;

@RestClient
@Inject
EventHubFdRestClient eventHubFdRestClient;

private final NotificationMapper notificationMapper;


Expand All @@ -81,6 +94,7 @@ public UserInstitutionCdcService(ReactiveMongoClient mongoClient,
@ConfigProperty(name = "user-cdc.retry.max-backoff") Integer retryMaxBackOff,
@ConfigProperty(name = "user-cdc.retry") Integer maxRetry,
@ConfigProperty(name = "user-cdc.send-events.watch.enabled") Boolean sendEventsEnabled,
@ConfigProperty(name = "user-cdc.send-events-fd.watch.enabled") Boolean sendFdEventsEnabled,
UserInstitutionRepository userInstitutionRepository,
TelemetryClient telemetryClient,
TableClient tableClient, NotificationMapper notificationMapper) {
Expand All @@ -94,16 +108,16 @@ public UserInstitutionCdcService(ReactiveMongoClient mongoClient,
this.tableClient = tableClient;
this.notificationMapper = notificationMapper;
telemetryClient.getContext().getOperation().setName(OPERATION_NAME);
initOrderStream(sendEventsEnabled);
initOrderStream(sendEventsEnabled, sendFdEventsEnabled);
}

private void initOrderStream(Boolean sendEventsEnabled) {
private void initOrderStream(Boolean sendEventsEnabled, Boolean sendFdEventsEnabled) {
log.info("Starting initOrderStream ... ");

//Retrieve last resumeToken for watching collection at specific operation
String resumeToken = null;

if(!ConfigUtils.getProfiles().contains("test")) {
if (!ConfigUtils.getProfiles().contains("test")) {
try {
TableEntity cdcStartAtEntity = tableClient.getEntity(CDC_START_AT_PARTITION_KEY, CDC_START_AT_ROW_KEY);
if (Objects.nonNull(cdcStartAtEntity))
Expand All @@ -117,7 +131,7 @@ private void initOrderStream(Boolean sendEventsEnabled) {
ReactiveMongoCollection<UserInstitution> dataCollection = getCollection();
ChangeStreamOptions options = new ChangeStreamOptions()
.fullDocument(FullDocument.UPDATE_LOOKUP);
if(Objects.nonNull(resumeToken))
if (Objects.nonNull(resumeToken))
options = options.resumeAfter(BsonDocument.parse(resumeToken));

Bson match = Aggregates.match(Filters.in("operationType", asList("update", "replace", "insert")));
Expand All @@ -128,21 +142,32 @@ private void initOrderStream(Boolean sendEventsEnabled) {
publisher.subscribe().with(
this::consumerUserInstitutionRepositoryEvent,
failure -> {
log.error("Error during subscribe collection, exception: {} , message: {}", failure.toString(), failure.getMessage());
telemetryClient.trackEvent(EVENT_USER_CDC_NAME, mapPropsForTrackEvent(TrackEventInput.builder().exception(failure.getClass().toString()).build()), Map.of(USER_INFO_UPDATE_FAILURE, 1D));
log.error(ERROR_DURING_SUBSCRIBE_COLLECTION_EXCEPTION_MESSAGE, failure.toString(), failure.getMessage());
telemetryClient.trackEvent(EVENT_USER_CDC_NAME, mapPropsForTrackEvent(TrackEventInput.builder().exception(failure.getClass().toString()).build()), Map.of(USER_INFO_UPDATE_FAILURE, 1D));
Quarkus.asyncExit();
});

if(sendEventsEnabled) {
if (Boolean.TRUE.equals(sendEventsEnabled)) {
publisher.subscribe().with(
this::consumerToSendScUserEvent,
failure -> {
log.error("Error during subscribe collection, exception: {} , message: {}", failure.toString(), failure.getMessage());
log.error(ERROR_DURING_SUBSCRIBE_COLLECTION_EXCEPTION_MESSAGE, failure.toString(), failure.getMessage());
telemetryClient.trackEvent(EVENT_USER_CDC_NAME, mapPropsForTrackEvent(TrackEventInput.builder().exception(failure.getClass().toString()).build()), Map.of(EVENTS_USER_INSTITUTION_FAILURE, 1D));
Quarkus.asyncExit();
});
}

if (Boolean.TRUE.equals(sendFdEventsEnabled)) {
publisher.subscribe().with(
this::consumerToSendUserEventForFD,
failure -> {
log.error(ERROR_DURING_SUBSCRIBE_COLLECTION_EXCEPTION_MESSAGE, failure.toString(), failure.getMessage());
telemetryClient.trackEvent(EVENT_USER_CDC_NAME, mapPropsForTrackEvent(TrackEventInput.builder().exception(failure.getClass().toString()).build()), Map.of(EVENTS_USER_INSTITUTION_FAILURE, 1D));
flaminiaScarciofolo marked this conversation as resolved.
Show resolved Hide resolved
Quarkus.asyncExit();
});
}


log.info("Completed initOrderStream ... ");
}

Expand Down Expand Up @@ -196,13 +221,13 @@ public void consumerToSendScUserEvent(ChangeStreamDocument<UserInstitution> docu

userRegistryApi.findByIdUsingGET(USERS_FIELD_LIST_WITHOUT_FISCAL_CODE, userInstitutionChanged.getUserId())
.onFailure(this::checkIfIsRetryableException)
.retry().withBackOff(Duration.ofSeconds(retryMinBackOff), Duration.ofSeconds(retryMaxBackOff)).atMost(maxRetry)
.retry().withBackOff(Duration.ofSeconds(retryMinBackOff), Duration.ofSeconds(retryMaxBackOff)).atMost(maxRetry)
.onItem().transformToUni(userResource -> Multi.createFrom().iterable(UserUtils.groupingProductAndReturnMinStateProduct(userInstitutionChanged.getProducts()))
.map(onboardedProduct -> notificationMapper.toUserNotificationToSend(userInstitutionChanged, onboardedProduct, userResource))
.onItem().transformToUniAndMerge(userNotificationToSend -> eventHubRestClient.sendMessage(userNotificationToSend)
.onFailure().retry().withBackOff(Duration.ofSeconds(retryMinBackOff), Duration.ofSeconds(retryMaxBackOff)).atMost(maxRetry)
.onItem().invoke(() -> telemetryClient.trackEvent(EVENT_USER_CDC_NAME, mapPropsForTrackEvent(toTrackEventInput(userNotificationToSend)), Map.of(EVENTS_USER_INSTITUTION_PRODUCT_SUCCESS, 1D)))
.onFailure().invoke(() -> telemetryClient.trackEvent(EVENT_USER_CDC_NAME, mapPropsForTrackEvent(toTrackEventInput(userNotificationToSend)), Map.of(EVENTS_USER_INSTITUTION_PRODUCT_FAILURE, 1D))))
.onFailure().retry().withBackOff(Duration.ofSeconds(retryMinBackOff), Duration.ofSeconds(retryMaxBackOff)).atMost(maxRetry)
.onItem().invoke(() -> telemetryClient.trackEvent(EVENT_USER_CDC_NAME, mapPropsForTrackEvent(toTrackEventInput(userNotificationToSend)), Map.of(EVENTS_USER_INSTITUTION_PRODUCT_SUCCESS, 1D)))
.onFailure().invoke(() -> telemetryClient.trackEvent(EVENT_USER_CDC_NAME, mapPropsForTrackEvent(toTrackEventInput(userNotificationToSend)), Map.of(EVENTS_USER_INSTITUTION_PRODUCT_FAILURE, 1D))))
.toUni()
)
.subscribe().with(
Expand All @@ -216,6 +241,47 @@ public void consumerToSendScUserEvent(ChangeStreamDocument<UserInstitution> docu
});
}

public void consumerToSendUserEventForFD(ChangeStreamDocument<UserInstitution> document) {

if (Objects.nonNull(document.getFullDocument()) && Objects.nonNull(document.getDocumentKey())) {
UserInstitution userInstitutionChanged = document.getFullDocument();

log.info("Starting consumerToSendUserEventForFd ... ");

userRegistryApi.findByIdUsingGET(USERS_FIELD_LIST_WITHOUT_FISCAL_CODE, userInstitutionChanged.getUserId())
.onFailure(this::checkIfIsRetryableException)
.retry().withBackOff(Duration.ofSeconds(retryMinBackOff), Duration.ofSeconds(retryMaxBackOff)).atMost(maxRetry)
.onItem().transformToUni(userResource -> Uni.createFrom().item(UserUtils.retrieveFdProductIfItChanged(userInstitutionChanged.getProducts(), List.of(PROD_FD, PROD_FD_GARANTITO)))
.onItem().ifNotNull().transform(onboardedProduct -> notificationMapper.toFdUserNotificationToSend(userInstitutionChanged, onboardedProduct, userResource, evaluateType(onboardedProduct)))
.onItem().ifNotNull().transformToUni(fdUserNotificationToSend -> {
log.info("Sending message to EventHubFdRestClient ... ");
return eventHubFdRestClient.sendMessage(fdUserNotificationToSend)
.onFailure().retry().withBackOff(Duration.ofSeconds(retryMinBackOff), Duration.ofSeconds(retryMaxBackOff)).atMost(maxRetry)
.onItem().invoke(() -> telemetryClient.trackEvent(EVENT_USER_CDC_NAME, mapPropsForTrackEvent(toTrackEventInput(fdUserNotificationToSend)), Map.of(FD_EVENTS_USER_INSTITUTION_PRODUCT_SUCCESS, 1D)))
.onFailure().invoke(() -> telemetryClient.trackEvent(EVENT_USER_CDC_NAME, mapPropsForTrackEvent(toTrackEventInput(fdUserNotificationToSend)), Map.of(FD_EVENTS_USER_INSTITUTION_PRODUCT_FAILURE, 1D)));
}
))
.subscribe().with(
result -> {
log.info("SendFdEvents successfully performed from UserInstitution document having id: {}", document.getDocumentKey().toJson());
telemetryClient.trackEvent(FD_EVENT_USER_CDC_NAME, mapPropsForTrackEvent(toTrackEventInputByUserInstitution(userInstitutionChanged)), Map.of(FD_EVENTS_USER_INSTITUTION_SUCCESS, 1D));
},
failure -> {
log.error("Error during SendFdEvents from UserInstitution document having id: {} , message: {}", document.getDocumentKey().toJson(), failure.getMessage());
telemetryClient.trackEvent(FD_EVENT_USER_CDC_NAME, mapPropsForTrackEvent(toTrackEventInputByUserInstitution(userInstitutionChanged)), Map.of(FD_EVENTS_USER_INSTITUTION_FAILURE, 1D));
});
}
}

private NotificationUserType evaluateType(OnboardedProduct onboardedProduct) {
return switch (onboardedProduct.getStatus()) {
case ACTIVE -> NotificationUserType.ACTIVE_USER;
case SUSPENDED -> NotificationUserType.SUSPEND_USER;
case DELETED -> NotificationUserType.DELETE_USER;
default -> null;
};
}

private boolean checkIfIsRetryableException(Throwable throwable) {
return throwable instanceof TimeoutException ||
(throwable instanceof ClientWebApplicationException webApplicationException && webApplicationException.getResponse().getStatus() == 429);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,15 +1,17 @@
package it.pagopa.selfcare.user.event.mapper;

import com.microsoft.applicationinsights.web.dependencies.apachecommons.lang3.StringUtils;
import it.pagopa.selfcare.user.UserUtils;
import it.pagopa.selfcare.user.event.entity.UserInstitution;
import it.pagopa.selfcare.user.model.OnboardedProduct;
import it.pagopa.selfcare.user.model.UserNotificationToSend;
import it.pagopa.selfcare.user.model.*;
import org.mapstruct.Mapper;
import org.mapstruct.Mapping;
import org.mapstruct.Named;
import org.openapi.quarkus.user_registry_json.model.CertifiableFieldResourceOfstring;
import org.openapi.quarkus.user_registry_json.model.UserResource;

import java.util.UUID;
import javax.swing.text.html.Option;
import java.util.*;

@Mapper(componentModel = "cdi", imports = UUID.class)
public interface NotificationMapper {
Expand All @@ -18,13 +20,7 @@ public interface NotificationMapper {
@Mapping(target = "productId", source = "product.productId")
@Mapping(target = "createdAt", source = "product.createdAt")
@Mapping(target = "updatedAt", expression = "java((null == product.getUpdatedAt()) ? product.getCreatedAt() : product.getUpdatedAt())")
@Mapping(target = "user.role", source = "product.role")
@Mapping(target = "user.productRole", source = "product.productRole")
@Mapping(target = "user.relationshipStatus", source = "product.status")
@Mapping(target = "user.userId", source = "userResource.id", ignore = true)
@Mapping(target = "user.name", source = "userResource.name.value")
@Mapping(target = "user.familyName", source = "userResource.familyName.value")
@Mapping(target = "user.email", source = "userResource.email.value")
@Mapping(target = "user", expression = "java(mapUser(userResource, userInstitution.getUserMailUuid(), product))")
@Mapping(target = "id", expression = "java(toUniqueIdNotification(userInstitution, product))")
@Mapping(target = "eventType", expression = "java(it.pagopa.selfcare.user.model.constants.QueueEvent.UPDATE)")
UserNotificationToSend toUserNotificationToSend(UserInstitution userInstitution, OnboardedProduct product, UserResource userResource);
Expand All @@ -33,4 +29,43 @@ public interface NotificationMapper {
default String toUniqueIdNotification(UserInstitution userInstitution, OnboardedProduct product) {
return UserUtils.uniqueIdNotification(userInstitution.getId().toHexString(), product.getProductId(), product.getProductRole());
}

@Mapping(target = "id", expression = "java(toUniqueIdNotification(userInstitutionChanged, product))")
@Mapping(target = "onboardingTokenId", source = "product.tokenId")
@Mapping(target = "product", source = "product.productId")
@Mapping(target = "createdAt", source = "product.createdAt")
@Mapping(target = "updatedAt", expression = "java((null == product.getUpdatedAt()) ? product.getCreatedAt() : product.getUpdatedAt())")
@Mapping(target = "user", expression = "java(mapUserForFD(userResource, product))")
@Mapping(target = "type", source = "type")
FdUserNotificationToSend toFdUserNotificationToSend(UserInstitution userInstitutionChanged, OnboardedProduct product, UserResource userResource, NotificationUserType type);

@Named("mapUserForFD")
default UserToNotify mapUserForFD(UserResource userResource,OnboardedProduct onboardedProduct) {
UserToNotify userToNotify = new UserToNotify();
userToNotify.setUserId(Optional.ofNullable(userResource.getId()).map(UUID::toString).orElse(null));
userToNotify.setRoles(StringUtils.isNotBlank(onboardedProduct.getProductRole()) ? List.of(onboardedProduct.getProductRole()) : Collections.emptyList());
userToNotify.setRole(Optional.ofNullable(onboardedProduct.getRole()).map(Enum::name).orElse(null));
return userToNotify;
}

@Named("mapUser")
default UserToNotify mapUser(UserResource userResource, String userMailUuid, OnboardedProduct onboardedProduct) {
UserToNotify userToNotify = new UserToNotify();
userToNotify.setUserId(Optional.ofNullable(userResource.getId()).map(UUID::toString).orElse(null));
userToNotify.setName(Optional.ofNullable(userResource.getName()).map(CertifiableFieldResourceOfstring::getValue).orElse(null));
userToNotify.setFamilyName(Optional.ofNullable(userResource.getFamilyName()).map(CertifiableFieldResourceOfstring::getValue).orElse(null));
userToNotify.setEmail(Optional.ofNullable(userMailUuid).map(mailUuid -> retrieveMailFromWorkContacts(userResource, mailUuid)).orElse(null));
userToNotify.setProductRole(onboardedProduct.getProductRole());
userToNotify.setRole(Optional.ofNullable(onboardedProduct.getRole()).map(Enum::name).orElse(null));
userToNotify.setRelationshipStatus(onboardedProduct.getStatus());
return userToNotify;
}

default String retrieveMailFromWorkContacts(UserResource userResource, String userMailUuid) {
return Optional.ofNullable(userResource.getWorkContacts())
.flatMap(stringWorkContactResourceMap -> Optional.ofNullable(stringWorkContactResourceMap.get(userMailUuid))
.flatMap(workContactResource -> Optional.ofNullable(workContactResource.getEmail())
.map(CertifiableFieldResourceOfstring::getValue)))
.orElse(null);
}
}
11 changes: 8 additions & 3 deletions apps/user-cdc/src/main/resources/application.properties
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@ quarkus.mongodb.connection-string = ${MONGODB-CONNECTION-STRING}
quarkus.mongodb.database = selcUser

#False for pnpg use case because we must not send events
user-cdc.send-events.watch.enabled=${USER_CDC_SEND_EVENTS_WATCH_ENABLED:false}
user-cdc.send-events.watch.enabled=${USER_CDC_SEND_EVENTS_WATCH_ENABLED:true}
user-cdc.send-events-fd.watch.enabled=${USER_CDC_SEND_EVENTS_FD_WATCH_ENABLED:true}
user-cdc.appinsights.connection-string=${APPLICATIONINSIGHTS_CONNECTION_STRING:InstrumentationKey=00000000-0000-0000-0000-000000000000}
user-cdc.table.name=${START_AT_TABLE_NAME:CdCStartAt}
user-cdc.storage.connection-string=${STORAGE_CONNECTION_STRING:UseDevelopmentStorage=true;}
Expand All @@ -23,6 +24,10 @@ quarkus.openapi-generator.codegen.spec.user_registry_json.additional-model-type-
quarkus.openapi-generator.user_registry_json.auth.api_key.api-key = ${USER-REGISTRY-API-KEY:example-api-key}
quarkus.rest-client."org.openapi.quarkus.user_registry_json.api.UserApi".url=${USER_REGISTRY_URL:http://localhost:8080}

quarkus.rest-client.event-hub.url=${EVENT_HUB_BASE_PATH:test}
quarkus.rest-client.event-hub.url=${EVENT_HUB_BASE_PATH:test}${EVENT_HUB_SC_USERS_TOPIC:sc-users}
eventhub.rest-client.keyName=${SHARED_ACCESS_KEY_NAME:test}
eventhub.rest-client.key=${EVENTHUB-SC-USERS-SELFCARE-WO-KEY-LC:test}
eventhub.rest-client.key=${EVENTHUB-SC-USERS-SELFCARE-WO-KEY-LC:test}

quarkus.rest-client.event-hub-fd.url=${EVENT_HUB_BASE_PATH:test}${EVENT_HUB_SELFCARE_FD_TOPIC:selfcare-fd}
eventhubfd.rest-client.keyName=${FD_SHARED_ACCESS_KEY_NAME:test}
eventhubfd.rest-client.key=${EVENTHUB_SELFCARE_FD_EXTERNAL_KEY_LC:test}
Loading
Loading