Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[IA-5017] Event pubsub google client code #19

Open
wants to merge 36 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 31 commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
a75226c
meaningful test error message
Jul 10, 2024
53dceb2
tests
Jul 10, 2024
d5cb7fb
feedback
Jul 11, 2024
9b6b6a4
refactor
Jul 11, 2024
3fb351d
compiling with passing the publisher in
Jul 11, 2024
c5eac08
feedback and fix tests
Jul 12, 2024
854367d
create a topic if in a BEE, verify topic exists if in production
cindy-broadinstitute Jul 16, 2024
da3dbac
added config from feedback, bean wiring not working properly
Jul 24, 2024
7b601aa
fix config
Jul 24, 2024
ec1937a
dependency
Jul 29, 2024
174276c
fix import
Jul 29, 2024
6752920
refactor: into own packages
cpate4 Jul 30, 2024
c7692fd
chore: posting an update of current status
cpate4 Jul 31, 2024
fc639fb
chore: renaming EventClient to AbstractEventTopic
cpate4 Jul 31, 2024
a5fccf9
checkpoint: service integration complete
cpate4 Aug 14, 2024
20a7e21
checkpoint: rewiring publishing
cpate4 Aug 14, 2024
e453252
removing more code
cpate4 Aug 14, 2024
3bd65cb
removing more code
cpate4 Aug 14, 2024
588784f
checkpoint: subscription.v1
cpate4 Aug 16, 2024
743ec31
unit test checkpoint
cpate4 Aug 28, 2024
14af6ae
removing some TODOs
cpate4 Aug 28, 2024
3cb48ae
fixing SONAR issues
cpate4 Aug 28, 2024
922b7c7
adding debug info for tests to build
cpate4 Aug 28, 2024
0795a06
fixing build issues with failing tests
cpate4 Aug 28, 2024
8723a53
turning off debug statements
cpate4 Aug 28, 2024
6fd4a78
fixing more SONAR issues
cpate4 Aug 28, 2024
27adfef
fixing more SONAR issues
cpate4 Aug 28, 2024
11f606f
fixing SONAR issues
cpate4 Aug 28, 2024
fa30376
fixing SONAR issues
cpate4 Aug 28, 2024
3fba859
fixing SONAR issues
cpate4 Aug 28, 2024
9754856
fixing SONAR issues
cpate4 Aug 28, 2024
00ba577
fixing SONAR issues
cpate4 Aug 29, 2024
b8fe011
fixing SONAR issues
cpate4 Aug 29, 2024
dbba427
fixing SONAR issues
cpate4 Aug 29, 2024
bf6284e
more documentation
cpate4 Sep 12, 2024
21af24b
comments
cpate4 Sep 23, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion service/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ plugins {
id 'com.srcclr.gradle'
id 'org.sonarqube'

id 'com.gorylenko.gradle-git-properties' version '2.3.1'
id 'com.gorylenko.gradle-git-properties' version '2.4.2'
id 'org.liquibase.gradle' version '2.2.2'
id "io.sentry.jvm.gradle" version "4.6.0"
}
Expand All @@ -28,6 +28,12 @@ dependencies {
implementation 'org.springframework.boot:spring-boot-starter-actuator'
implementation 'io.micrometer:micrometer-registry-prometheus:1.10.4'

implementation platform('com.google.cloud:libraries-bom:26.42.0')
implementation 'com.google.cloud:google-cloud-pubsub'

implementation platform('org.testcontainers:testcontainers-bom:1.20.1')
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Needed for pubsub emulator and unit testing

testImplementation "org.testcontainers:gcloud"

liquibaseRuntime 'org.liquibase:liquibase-core'
liquibaseRuntime 'info.picocli:picocli:4.6.1'
liquibaseRuntime 'org.postgresql:postgresql'
Expand Down
12 changes: 7 additions & 5 deletions service/src/main/java/bio/terra/appmanager/App.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,24 +29,26 @@
"bio.terra.common.retry.transaction",
// Scan for tracing-related components & configs
"bio.terra.common.tracing",
// Event-driven architecture related configs
"bio.terra.common.events",
// Scan all service-specific packages beneath the current package
"bio.terra.appmanager"
})
@ConfigurationPropertiesScan("bio.terra.appmanager")
@ConfigurationPropertiesScan({"bio.terra.common.events", "bio.terra.appmanager"})
@EnableRetry
@EnableTransactionManagement
@EnableConfigurationProperties
public class App {
public static void main(String[] args) {
new SpringApplicationBuilder(App.class).initializers(new LoggingInitializer()).run(args);
}

private final DataSource dataSource;

public App(DataSource dataSource) {
this.dataSource = dataSource;
}

public static void main(String[] args) {
new SpringApplicationBuilder(App.class).initializers(new LoggingInitializer()).run(args);
}

@Bean("objectMapper")
public ObjectMapper objectMapper() {
return new ObjectMapper()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package bio.terra.appmanager.events;

import bio.terra.common.events.client.PubsubClientFactory;
import bio.terra.common.events.config.PubsubConfig;
import bio.terra.common.events.topics.ChartTopic;
import bio.terra.common.events.topics.messages.EventMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Repository;

@Repository
public class ChartEvents extends ChartTopic {

private static final Logger logger = LoggerFactory.getLogger(ChartEvents.class);

public ChartEvents(PubsubConfig config, PubsubClientFactory factory) {
super(config, factory);
subscribe();
}

@Override
public boolean process(EventMessage message) {
logger.info("Received message: {}", message.eventType);
return true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import bio.terra.appmanager.config.ChartServiceConfiguration;
import bio.terra.appmanager.controller.ChartNotFoundException;
import bio.terra.appmanager.dao.ChartDao;
import bio.terra.appmanager.events.ChartEvents;
import bio.terra.appmanager.model.Chart;
import bio.terra.common.db.ReadTransaction;
import bio.terra.common.db.WriteTransaction;
Expand All @@ -16,10 +17,15 @@ public class ChartService {

private final List<String> allowedChartNames;
private final ChartDao chartDao;
private final ChartEvents chartEvents;

public ChartService(ChartServiceConfiguration chartServiceConfiguration, ChartDao chartDao) {
public ChartService(
ChartServiceConfiguration chartServiceConfiguration,
ChartDao chartDao,
ChartEvents chartEvents) {
this.allowedChartNames = chartServiceConfiguration.allowedNames();
this.chartDao = chartDao;
this.chartEvents = chartEvents;
}

/**
Expand All @@ -34,6 +40,7 @@ public void createCharts(@NotNull List<Chart> charts) {
chart -> {
checkChartName(chart);
chartDao.upsert(chart);
chartEvents.chartCreated(chart.name());
});
}

Expand Down Expand Up @@ -63,7 +70,11 @@ public void updateVersions(@NotNull List<Chart> versions) {
+ nonexistentVersions);
}

versions.forEach(chartDao::upsert);
versions.forEach(
version -> {
chartDao.upsert(version);
chartEvents.chartUpdated(version.name());
});
}

/**
Expand All @@ -74,6 +85,7 @@ public void updateVersions(@NotNull List<Chart> versions) {
@WriteTransaction
public void deleteVersion(@NotNull String name) {
chartDao.delete(List.of(name));
chartEvents.chartDeleted(name);
}

/**
Expand All @@ -90,7 +102,7 @@ public List<Chart> getCharts(@NotNull List<String> names, @NotNull Boolean inclu

private void checkChartName(Chart chart) {
if (!allowedChartNames.contains(chart.name())) {
throw new IllegalArgumentException("unrecogrnized chartName provided");
throw new IllegalArgumentException("unrecognized chartName provided");
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package bio.terra.common.events.client;

@FunctionalInterface
public interface MessageProcessor {
boolean process(String jsonString);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package bio.terra.common.events.client;

import java.io.Closeable;

/**
* The purpose of this class is to represent the client to the cloud-specific pubsub infrastructure
* for a single topic.
*
* <p>To create an instance of this class, please see {@link PubsubClientFactory}
*
* <p>The PubsubClient is responsible for ensuring the following conditions:
*
* <ul>
* <li>the topic exists
* </ul>
*/
public abstract interface PubsubClient extends Closeable {

public abstract void publish(String message);

public abstract void subscribe(MessageProcessor process);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package bio.terra.common.events.client;

import bio.terra.common.events.client.google.GooglePubsubClient;
import bio.terra.common.events.config.PubsubConfig;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.springframework.stereotype.Component;

/**
* Based on the various configs that are out there for pubsub, create an instance of the pubsub
* client to be used by a Spring Boot application.
*
* <p>Current clients supported are:
*
* <ul>
* <li>GoogleClient
* </ul>
*
* <p>This class would need to be added to if additional clients become supported (like Azure, AWS,
* etc.)
*/
@Component
public class PubsubClientFactory {

private PubsubConfig pubsubConfig;

public PubsubClientFactory(PubsubConfig config) {
this.pubsubConfig = config;
}

public PubsubClient createPubsubClient(String topicName, String serviceName) {
return new GooglePubsubClient(
pubsubConfig.googleConfig().projectId(),
formatTopicId(topicName),
formatSubscriptionId(serviceName, topicName),
pubsubConfig.createTopic(),
pubsubConfig.connectLocal(),
pubsubConfig.emulatorTarget());
}

private String formatTopicId(String topicName) {
List<String> parts = new ArrayList<>(Arrays.asList("event", topicName));

if (pubsubConfig.nameSuffix() != null) {
parts.add(pubsubConfig.nameSuffix());
}

return String.join("-", parts);
}

private String formatSubscriptionId(String serviceName, String topicName) {
if (serviceName == null) {
return null;
}

List<String> parts = new ArrayList<>(Arrays.asList("subscription", serviceName, topicName));

return String.join("-", parts);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package bio.terra.common.events.client.google;

import com.google.api.gax.core.CredentialsProvider;
import com.google.api.gax.rpc.TransportChannelProvider;
import com.google.cloud.pubsub.v1.TopicAdminClient;
import com.google.pubsub.v1.TopicName;
import java.io.IOException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CreateEventTopicIfNotExist extends EventTopicName {
private static final Logger logger = LoggerFactory.getLogger(CreateEventTopicIfNotExist.class);
private final String projectId;

public CreateEventTopicIfNotExist(
String projectId,
boolean connectLocal,
TransportChannelProvider channelProvider,
CredentialsProvider credentialsProvider) {
super(connectLocal, channelProvider, credentialsProvider);
this.projectId = projectId;
}

/**
* This is called when running on a BEE Verify the topic exists or create the topic if it does not
* exist Then return the TopicName
*
* @param name
* @return TopicName for the Event topic for the environment
*/
@Override
public TopicName verifyTopicName(String name) throws IOException {
try (TopicAdminClient topicAdminClient = buildTopicAdminClient()) {
TopicName topicName = TopicName.of(projectId, name);

try {
topicAdminClient.getTopic(topicName);
} catch (com.google.api.gax.rpc.NotFoundException e) {
// topic not found, create it
topicAdminClient.createTopic(topicName);
}
return topicName;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package bio.terra.common.events.client.google;

import com.google.api.gax.core.CredentialsProvider;
import com.google.api.gax.rpc.TransportChannelProvider;
import com.google.cloud.pubsub.v1.TopicAdminClient;
import com.google.pubsub.v1.Topic;
import com.google.pubsub.v1.TopicName;
import java.io.IOException;
import javax.naming.ConfigurationException;

public class EventTopicMustBeAlreadyCreated extends EventTopicName {
private final String projectId;

public EventTopicMustBeAlreadyCreated(
String projectId,
boolean connectLocal,
TransportChannelProvider channelProvider,
CredentialsProvider credentialsProvider) {
super(connectLocal, channelProvider, credentialsProvider);
this.projectId = projectId;
}

/**
* This is called when running in the Production environment Verify the topic exists or generate a
* ConfigurationError then return the TopicName
*
* @param name
* @return TopicName for the Event topic for Production
*/
@Override
public TopicName verifyTopicName(String name) throws ConfigurationException, IOException {
try (TopicAdminClient topicAdminClient = buildTopicAdminClient()) {
TopicName topicName = TopicName.of(projectId, name);
Topic topic = topicAdminClient.getTopic(topicName);
if (topic != null) {
return topicName;
}
throw new ConfigurationException("Error, Event Topic " + topicName + " must exist");
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package bio.terra.common.events.client.google;

import com.google.api.gax.core.CredentialsProvider;
import com.google.api.gax.rpc.TransportChannelProvider;
import com.google.cloud.pubsub.v1.TopicAdminClient;
import com.google.cloud.pubsub.v1.TopicAdminSettings;
import com.google.pubsub.v1.TopicName;
import java.io.IOException;
import javax.naming.ConfigurationException;

public abstract class EventTopicName {

private final boolean connectLocal;

private TransportChannelProvider channelProvider;
private CredentialsProvider credentialsProvider;

protected EventTopicName(
boolean connectLocal,
TransportChannelProvider channelProvider,
CredentialsProvider credentialsProvider) {
this.connectLocal = connectLocal;
// this is optional, and only required if connectLocal is false
this.channelProvider = channelProvider;
this.credentialsProvider = credentialsProvider;
}

abstract TopicName verifyTopicName(String name) throws IOException, ConfigurationException;

/**
* Need to support both the standard building of the client,<br>
* and the building of a client that connects to the Pubsub Emulator.
*
* <p>By default, the Emulator is used for:
*
* <ul>
* <li>local development (started manually)<br>
* local development is established through setting up through shell scripts and exposing
* the associated emulator environment variables.
* <li>testing environment (started by testing infrastructure)<br>
* testing infrastructure is established through the override of the configuration settings
* and extending the BaseEventsTest class
* </ul>
*
* @return
* @throws IOException
*/
public TopicAdminClient buildTopicAdminClient() throws IOException {
TopicAdminSettings.Builder builder = TopicAdminSettings.newBuilder();

if (connectLocal) {

builder
.setTransportChannelProvider(channelProvider)
.setCredentialsProvider(credentialsProvider);
}
return TopicAdminClient.create(builder.build());
}
}
Loading
Loading