diff --git a/spring-cloud-gcp-samples/spring-cloud-gcp-integration-pubsub-sample/spring-cloud-gcp-integration-pubsub-sample-polling-receiver/src/main/java/com/example/PollingReceiverApplication.java b/spring-cloud-gcp-samples/spring-cloud-gcp-integration-pubsub-sample/spring-cloud-gcp-integration-pubsub-sample-polling-receiver/src/main/java/com/example/PollingReceiverApplication.java index 60053545bb..3f427c87c7 100644 --- a/spring-cloud-gcp-samples/spring-cloud-gcp-integration-pubsub-sample/spring-cloud-gcp-integration-pubsub-sample-polling-receiver/src/main/java/com/example/PollingReceiverApplication.java +++ b/spring-cloud-gcp-samples/spring-cloud-gcp-integration-pubsub-sample/spring-cloud-gcp-integration-pubsub-sample-polling-receiver/src/main/java/com/example/PollingReceiverApplication.java @@ -23,6 +23,8 @@ import com.google.cloud.spring.pubsub.support.GcpPubSubHeaders; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.context.annotation.Bean; @@ -39,18 +41,22 @@ */ @SpringBootApplication public class PollingReceiverApplication { - private static final Log LOGGER = LogFactory.getLog(PollingReceiverApplication.class); public static void main(String[] args) { SpringApplication.run(PollingReceiverApplication.class, args); } + @Bean + public String subscriptionName(@Value("${subscriptionName}") String subscriptionName) { + return subscriptionName; + } + @Bean @InboundChannelAdapter(channel = "pubsubInputChannel", poller = @Poller(fixedDelay = "100")) - public MessageSource pubsubAdapter(PubSubTemplate pubSubTemplate) { - PubSubMessageSource messageSource = - new PubSubMessageSource(pubSubTemplate, "exampleSubscription"); + public MessageSource pubsubAdapter( + PubSubTemplate pubSubTemplate, @Qualifier("subscriptionName") String subscriptionName) { + PubSubMessageSource messageSource = new PubSubMessageSource(pubSubTemplate, subscriptionName); messageSource.setMaxFetchSize(5); messageSource.setAckMode(AckMode.MANUAL); messageSource.setPayloadType(String.class); diff --git a/spring-cloud-gcp-samples/spring-cloud-gcp-integration-pubsub-sample/spring-cloud-gcp-integration-pubsub-sample-polling-receiver/src/main/resources/application.properties b/spring-cloud-gcp-samples/spring-cloud-gcp-integration-pubsub-sample/spring-cloud-gcp-integration-pubsub-sample-polling-receiver/src/main/resources/application.properties new file mode 100644 index 0000000000..5371cd70be --- /dev/null +++ b/spring-cloud-gcp-samples/spring-cloud-gcp-integration-pubsub-sample/spring-cloud-gcp-integration-pubsub-sample-polling-receiver/src/main/resources/application.properties @@ -0,0 +1 @@ +subscriptionName=exampleSubscription \ No newline at end of file diff --git a/spring-cloud-gcp-samples/spring-cloud-gcp-integration-pubsub-sample/spring-cloud-gcp-integration-pubsub-sample-polling-receiver/src/test/java/com/example/PollingReceiverIntegrationTest.java b/spring-cloud-gcp-samples/spring-cloud-gcp-integration-pubsub-sample/spring-cloud-gcp-integration-pubsub-sample-polling-receiver/src/test/java/com/example/PollingReceiverIntegrationTest.java index f696ee577f..d619ef44fb 100644 --- a/spring-cloud-gcp-samples/spring-cloud-gcp-integration-pubsub-sample/spring-cloud-gcp-integration-pubsub-sample-polling-receiver/src/test/java/com/example/PollingReceiverIntegrationTest.java +++ b/spring-cloud-gcp-samples/spring-cloud-gcp-integration-pubsub-sample/spring-cloud-gcp-integration-pubsub-sample-polling-receiver/src/test/java/com/example/PollingReceiverIntegrationTest.java @@ -18,10 +18,20 @@ import static org.assertj.core.api.Assertions.assertThat; +import com.google.cloud.ServiceOptions; +import com.google.cloud.pubsub.v1.SubscriptionAdminClient; +import com.google.cloud.pubsub.v1.TopicAdminClient; import com.google.cloud.spring.pubsub.core.PubSubTemplate; +import com.google.pubsub.v1.ProjectName; +import com.google.pubsub.v1.Subscription; +import com.google.pubsub.v1.Topic; +import java.util.List; import java.util.UUID; import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import java.util.stream.StreamSupport; import org.awaitility.Awaitility; +import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.condition.EnabledIfSystemProperty; import org.junit.jupiter.api.extension.ExtendWith; @@ -30,6 +40,7 @@ import org.springframework.boot.test.system.CapturedOutput; import org.springframework.boot.test.system.OutputCaptureExtension; import org.springframework.test.annotation.DirtiesContext; +import org.springframework.test.context.TestPropertySource; import org.springframework.test.context.junit.jupiter.SpringExtension; /** @@ -41,22 +52,61 @@ @EnabledIfSystemProperty(named = "it.pubsub-integration", matches = "true") @ExtendWith(SpringExtension.class) @ExtendWith(OutputCaptureExtension.class) -@SpringBootTest +@SpringBootTest( + properties = {"spring.main.allow-bean-definition-overriding=true"}, + classes = {PollingReceiverIntegrationTestConfiguration.class}) +@TestPropertySource(locations = "classpath:application-test.properties") @DirtiesContext class PollingReceiverIntegrationTest { + private static final String PROJECT_NAME = + ProjectName.of(ServiceOptions.getDefaultProjectId()).getProject(); + @Autowired private Topic testTopic; + @Autowired private Subscription testSubscription; @Autowired private PubSubTemplate pubSubTemplate; + @Autowired private TopicAdminClient topicAdminClient; + @Autowired private SubscriptionAdminClient subscriptionAdminClient; @Test - void testSample(CapturedOutput capturedOutput) throws Exception { + void testSample(CapturedOutput capturedOutput) { String message = "test message " + UUID.randomUUID(); String expectedString = "Message arrived by Synchronous Pull! Payload: " + message; - this.pubSubTemplate.publish("exampleTopic", message); + this.pubSubTemplate.publish(testTopic.getName(), message); Awaitility.await() - .atMost(60, TimeUnit.SECONDS) + .atMost(180, TimeUnit.SECONDS) .until(() -> capturedOutput.toString().contains(expectedString)); assertThat(capturedOutput.toString()).contains(expectedString); } + + @AfterEach + void cleanUp() { + List projectTopics = fetchTopicNamesFromProject(); + String topicName = testTopic.getName(); + if (projectTopics.contains(topicName)) { + this.topicAdminClient.deleteTopic(topicName); + } + List projectSubscriptions = fetchSubscriptionNamesFromProject(); + String subscriptionName = testSubscription.getName(); + if (projectSubscriptions.contains(subscriptionName)) { + this.subscriptionAdminClient.deleteSubscription(subscriptionName); + } + } + + private List fetchTopicNamesFromProject() { + TopicAdminClient.ListTopicsPagedResponse listTopicsResponse = + topicAdminClient.listTopics("projects/" + PROJECT_NAME); + return StreamSupport.stream(listTopicsResponse.iterateAll().spliterator(), false) + .map(Topic::getName) + .collect(Collectors.toList()); + } + + private List fetchSubscriptionNamesFromProject() { + SubscriptionAdminClient.ListSubscriptionsPagedResponse response = + subscriptionAdminClient.listSubscriptions("projects/" + PROJECT_NAME); + return StreamSupport.stream(response.iterateAll().spliterator(), false) + .map(Subscription::getName) + .collect(Collectors.toList()); + } } diff --git a/spring-cloud-gcp-samples/spring-cloud-gcp-integration-pubsub-sample/spring-cloud-gcp-integration-pubsub-sample-polling-receiver/src/test/java/com/example/PollingReceiverIntegrationTestConfiguration.java b/spring-cloud-gcp-samples/spring-cloud-gcp-integration-pubsub-sample/spring-cloud-gcp-integration-pubsub-sample-polling-receiver/src/test/java/com/example/PollingReceiverIntegrationTestConfiguration.java new file mode 100644 index 0000000000..bb6caefd0b --- /dev/null +++ b/spring-cloud-gcp-samples/spring-cloud-gcp-integration-pubsub-sample/spring-cloud-gcp-integration-pubsub-sample-polling-receiver/src/test/java/com/example/PollingReceiverIntegrationTestConfiguration.java @@ -0,0 +1,65 @@ +/* + * Copyright 2023 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.example; + +import com.google.cloud.ServiceOptions; +import com.google.cloud.pubsub.v1.SubscriptionAdminClient; +import com.google.cloud.pubsub.v1.TopicAdminClient; +import com.google.pubsub.v1.ProjectName; +import com.google.pubsub.v1.PushConfig; +import com.google.pubsub.v1.Subscription; +import com.google.pubsub.v1.SubscriptionName; +import com.google.pubsub.v1.Topic; +import com.google.pubsub.v1.TopicName; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.boot.test.context.TestConfiguration; +import org.springframework.context.annotation.Bean; + +@TestConfiguration +public class PollingReceiverIntegrationTestConfiguration { + + private String topicName; + private TopicAdminClient topicAdminClient; + + private SubscriptionAdminClient subscriptionAdminClient; + + public PollingReceiverIntegrationTestConfiguration( + TopicAdminClient topicAdminClient, + SubscriptionAdminClient subscriptionAdminClient, + @Value("${topicName}") String topicName) { + this.topicAdminClient = topicAdminClient; + this.subscriptionAdminClient = subscriptionAdminClient; + this.topicName = topicName; + } + + @Bean + public Topic createTopic() { + String projectName = ProjectName.of(ServiceOptions.getDefaultProjectId()).getProject(); + return topicAdminClient.createTopic(TopicName.of(projectName, this.topicName)); + } + + @Bean + public Subscription createSubscription(@Qualifier("subscriptionName") String subscriptionName) { + String projectName = ProjectName.of(ServiceOptions.getDefaultProjectId()).getProject(); + return subscriptionAdminClient.createSubscription( + SubscriptionName.of(projectName, subscriptionName), + TopicName.of(projectName, this.topicName), + PushConfig.getDefaultInstance(), + 10); + } +} diff --git a/spring-cloud-gcp-samples/spring-cloud-gcp-integration-pubsub-sample/spring-cloud-gcp-integration-pubsub-sample-polling-receiver/src/test/resources/application-test.properties b/spring-cloud-gcp-samples/spring-cloud-gcp-integration-pubsub-sample/spring-cloud-gcp-integration-pubsub-sample-polling-receiver/src/test/resources/application-test.properties new file mode 100644 index 0000000000..472d318f37 --- /dev/null +++ b/spring-cloud-gcp-samples/spring-cloud-gcp-integration-pubsub-sample/spring-cloud-gcp-integration-pubsub-sample-polling-receiver/src/test/resources/application-test.properties @@ -0,0 +1,2 @@ +topicName=topic-name-${random.uuid} +subscriptionName=subscription-name-${random.uuid} \ No newline at end of file diff --git a/spring-cloud-gcp-samples/spring-cloud-gcp-integration-pubsub-sample/spring-cloud-gcp-integration-pubsub-sample-receiver/src/main/java/com/example/ReceiverApplication.java b/spring-cloud-gcp-samples/spring-cloud-gcp-integration-pubsub-sample/spring-cloud-gcp-integration-pubsub-sample-receiver/src/main/java/com/example/ReceiverApplication.java index 1b2dd56b0a..81e31a02be 100644 --- a/spring-cloud-gcp-samples/spring-cloud-gcp-integration-pubsub-sample/spring-cloud-gcp-integration-pubsub-sample-receiver/src/main/java/com/example/ReceiverApplication.java +++ b/spring-cloud-gcp-samples/spring-cloud-gcp-integration-pubsub-sample/spring-cloud-gcp-integration-pubsub-sample-receiver/src/main/java/com/example/ReceiverApplication.java @@ -25,6 +25,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.context.annotation.Bean; @@ -49,16 +50,24 @@ public static void main(String[] args) throws IOException { System.in.read(); } + @Bean + public String subscriptionName(@Value("${subscriptionName}") String subscriptionName) { + return subscriptionName; + } + @Bean public MessageChannel pubsubInputChannel() { return new DirectChannel(); } + @Bean public PubSubInboundChannelAdapter messageChannelAdapter( - @Qualifier("pubsubInputChannel") MessageChannel inputChannel, PubSubTemplate pubSubTemplate) { + @Qualifier("pubsubInputChannel") MessageChannel inputChannel, + PubSubTemplate pubSubTemplate, + @Qualifier("subscriptionName") String subscriptionName) { PubSubInboundChannelAdapter adapter = - new PubSubInboundChannelAdapter(pubSubTemplate, "exampleSubscription"); + new PubSubInboundChannelAdapter(pubSubTemplate, subscriptionName); adapter.setOutputChannel(inputChannel); adapter.setAckMode(AckMode.MANUAL); adapter.setPayloadType(String.class); diff --git a/spring-cloud-gcp-samples/spring-cloud-gcp-integration-pubsub-sample/spring-cloud-gcp-integration-pubsub-sample-receiver/src/main/resources/application.properties b/spring-cloud-gcp-samples/spring-cloud-gcp-integration-pubsub-sample/spring-cloud-gcp-integration-pubsub-sample-receiver/src/main/resources/application.properties index 8b13789179..b3d5ccb8bb 100644 --- a/spring-cloud-gcp-samples/spring-cloud-gcp-integration-pubsub-sample/spring-cloud-gcp-integration-pubsub-sample-receiver/src/main/resources/application.properties +++ b/spring-cloud-gcp-samples/spring-cloud-gcp-integration-pubsub-sample/spring-cloud-gcp-integration-pubsub-sample-receiver/src/main/resources/application.properties @@ -1 +1 @@ - +subscriptionName=exampleSubscription diff --git a/spring-cloud-gcp-samples/spring-cloud-gcp-integration-pubsub-sample/spring-cloud-gcp-integration-pubsub-sample-receiver/src/test/java/com/example/ReceiverIntegrationTest.java b/spring-cloud-gcp-samples/spring-cloud-gcp-integration-pubsub-sample/spring-cloud-gcp-integration-pubsub-sample-receiver/src/test/java/com/example/ReceiverIntegrationTest.java index e3eeeb36ba..9d7215ee8f 100644 --- a/spring-cloud-gcp-samples/spring-cloud-gcp-integration-pubsub-sample/spring-cloud-gcp-integration-pubsub-sample-receiver/src/test/java/com/example/ReceiverIntegrationTest.java +++ b/spring-cloud-gcp-samples/spring-cloud-gcp-integration-pubsub-sample/spring-cloud-gcp-integration-pubsub-sample-receiver/src/test/java/com/example/ReceiverIntegrationTest.java @@ -18,10 +18,20 @@ import static org.assertj.core.api.Assertions.assertThat; +import com.google.cloud.ServiceOptions; +import com.google.cloud.pubsub.v1.SubscriptionAdminClient; +import com.google.cloud.pubsub.v1.TopicAdminClient; import com.google.cloud.spring.pubsub.core.PubSubTemplate; +import com.google.pubsub.v1.ProjectName; +import com.google.pubsub.v1.Subscription; +import com.google.pubsub.v1.Topic; +import java.util.List; import java.util.UUID; import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import java.util.stream.StreamSupport; import org.awaitility.Awaitility; +import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.condition.EnabledIfSystemProperty; import org.junit.jupiter.api.extension.ExtendWith; @@ -30,6 +40,7 @@ import org.springframework.boot.test.system.CapturedOutput; import org.springframework.boot.test.system.OutputCaptureExtension; import org.springframework.test.annotation.DirtiesContext; +import org.springframework.test.context.TestPropertySource; import org.springframework.test.context.junit.jupiter.SpringExtension; /** @@ -40,22 +51,61 @@ @EnabledIfSystemProperty(named = "it.pubsub-integration", matches = "true") @ExtendWith(SpringExtension.class) @ExtendWith(OutputCaptureExtension.class) -@SpringBootTest +@SpringBootTest( + properties = {"spring.main.allow-bean-definition-overriding=true"}, + classes = {ReceiverTestConfiguration.class}) +@TestPropertySource(locations = "classpath:application-test.properties") @DirtiesContext class ReceiverIntegrationTest { + private static final String PROJECT_NAME = + ProjectName.of(ServiceOptions.getDefaultProjectId()).getProject(); + @Autowired Subscription testSubscription; @Autowired private PubSubTemplate pubSubTemplate; + @Autowired private Topic testTopic; + @Autowired private TopicAdminClient topicAdminClient; + @Autowired private SubscriptionAdminClient subscriptionAdminClient; @Test - void testSample(CapturedOutput capturedOutput) throws Exception { + void testSample(CapturedOutput capturedOutput) { String message = "test message " + UUID.randomUUID(); String expectedString = "Message arrived! Payload: " + message; - this.pubSubTemplate.publish("exampleTopic", message); + this.pubSubTemplate.publish(testTopic.getName(), message); Awaitility.await() .atMost(60, TimeUnit.SECONDS) .until(() -> capturedOutput.toString().contains(expectedString)); assertThat(capturedOutput.toString()).contains(expectedString); } + + @AfterEach + void cleanUp() { + List projectTopics = fetchTopicNamesFromProject(); + String topicName = testTopic.getName(); + if (projectTopics.contains(topicName)) { + this.topicAdminClient.deleteTopic(topicName); + } + List projectSubscriptions = fetchSubscriptionNamesFromProject(); + String subscriptionName = testSubscription.getName(); + if (projectSubscriptions.contains(subscriptionName)) { + this.subscriptionAdminClient.deleteSubscription(subscriptionName); + } + } + + private List fetchTopicNamesFromProject() { + TopicAdminClient.ListTopicsPagedResponse listTopicsResponse = + topicAdminClient.listTopics("projects/" + PROJECT_NAME); + return StreamSupport.stream(listTopicsResponse.iterateAll().spliterator(), false) + .map(Topic::getName) + .collect(Collectors.toList()); + } + + private List fetchSubscriptionNamesFromProject() { + SubscriptionAdminClient.ListSubscriptionsPagedResponse response = + subscriptionAdminClient.listSubscriptions("projects/" + PROJECT_NAME); + return StreamSupport.stream(response.iterateAll().spliterator(), false) + .map(Subscription::getName) + .collect(Collectors.toList()); + } } diff --git a/spring-cloud-gcp-samples/spring-cloud-gcp-integration-pubsub-sample/spring-cloud-gcp-integration-pubsub-sample-receiver/src/test/java/com/example/ReceiverTestConfiguration.java b/spring-cloud-gcp-samples/spring-cloud-gcp-integration-pubsub-sample/spring-cloud-gcp-integration-pubsub-sample-receiver/src/test/java/com/example/ReceiverTestConfiguration.java new file mode 100644 index 0000000000..7e46be2bea --- /dev/null +++ b/spring-cloud-gcp-samples/spring-cloud-gcp-integration-pubsub-sample/spring-cloud-gcp-integration-pubsub-sample-receiver/src/test/java/com/example/ReceiverTestConfiguration.java @@ -0,0 +1,65 @@ +/* + * Copyright 2023 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.example; + +import com.google.cloud.ServiceOptions; +import com.google.cloud.pubsub.v1.SubscriptionAdminClient; +import com.google.cloud.pubsub.v1.TopicAdminClient; +import com.google.pubsub.v1.ProjectName; +import com.google.pubsub.v1.PushConfig; +import com.google.pubsub.v1.Subscription; +import com.google.pubsub.v1.SubscriptionName; +import com.google.pubsub.v1.Topic; +import com.google.pubsub.v1.TopicName; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.boot.test.context.TestConfiguration; +import org.springframework.context.annotation.Bean; + +@TestConfiguration +public class ReceiverTestConfiguration { + + private String topicName; + private TopicAdminClient topicAdminClient; + + private SubscriptionAdminClient subscriptionAdminClient; + + public ReceiverTestConfiguration( + TopicAdminClient topicAdminClient, + SubscriptionAdminClient subscriptionAdminClient, + @Value("${topicName}") String topicName) { + this.topicAdminClient = topicAdminClient; + this.subscriptionAdminClient = subscriptionAdminClient; + this.topicName = topicName; + } + + @Bean + public Topic createTopic() { + String projectName = ProjectName.of(ServiceOptions.getDefaultProjectId()).getProject(); + return topicAdminClient.createTopic(TopicName.of(projectName, this.topicName)); + } + + @Bean + public Subscription createSubscription(@Qualifier("subscriptionName") String subscriptionName) { + String projectName = ProjectName.of(ServiceOptions.getDefaultProjectId()).getProject(); + return subscriptionAdminClient.createSubscription( + SubscriptionName.of(projectName, subscriptionName), + TopicName.of(projectName, this.topicName), + PushConfig.getDefaultInstance(), + 10); + } +} diff --git a/spring-cloud-gcp-samples/spring-cloud-gcp-integration-pubsub-sample/spring-cloud-gcp-integration-pubsub-sample-receiver/src/test/resources/application-test.properties b/spring-cloud-gcp-samples/spring-cloud-gcp-integration-pubsub-sample/spring-cloud-gcp-integration-pubsub-sample-receiver/src/test/resources/application-test.properties new file mode 100644 index 0000000000..5cf65095c1 --- /dev/null +++ b/spring-cloud-gcp-samples/spring-cloud-gcp-integration-pubsub-sample/spring-cloud-gcp-integration-pubsub-sample-receiver/src/test/resources/application-test.properties @@ -0,0 +1,2 @@ +topicName=pubsub-spring-integration-sample-receiver-exampleTopic-${random.uuid} +subscriptionName=pubsub-spring-integration-sample-receiver-exampleSubscription-${random.uuid} \ No newline at end of file diff --git a/spring-cloud-gcp-samples/spring-cloud-gcp-integration-pubsub-sample/spring-cloud-gcp-integration-pubsub-sample-sender/src/main/java/com/example/SenderApplication.java b/spring-cloud-gcp-samples/spring-cloud-gcp-integration-pubsub-sample/spring-cloud-gcp-integration-pubsub-sample-sender/src/main/java/com/example/SenderApplication.java index d1076c46a8..44e9a58a37 100644 --- a/spring-cloud-gcp-samples/spring-cloud-gcp-integration-pubsub-sample/spring-cloud-gcp-integration-pubsub-sample-sender/src/main/java/com/example/SenderApplication.java +++ b/spring-cloud-gcp-samples/spring-cloud-gcp-integration-pubsub-sample/spring-cloud-gcp-integration-pubsub-sample-sender/src/main/java/com/example/SenderApplication.java @@ -20,6 +20,8 @@ import com.google.cloud.spring.pubsub.integration.outbound.PubSubMessageHandler; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.context.annotation.Bean; @@ -37,10 +39,16 @@ public static void main(String[] args) { SpringApplication.run(SenderApplication.class, args); } + @Bean + public String topicName(@Value("${topicName}") String topicName) { + return topicName; + } + @Bean @ServiceActivator(inputChannel = "pubSubOutputChannel") - public MessageHandler messageSender(PubSubTemplate pubsubTemplate) { - PubSubMessageHandler adapter = new PubSubMessageHandler(pubsubTemplate, "exampleTopic"); + public MessageHandler messageSender( + PubSubTemplate pubsubTemplate, @Qualifier("topicName") String topicName) { + PubSubMessageHandler adapter = new PubSubMessageHandler(pubsubTemplate, topicName); adapter.setFailureCallback( (exception, message) -> LOGGER.info("There was an error sending the message: " + message.getPayload())); diff --git a/spring-cloud-gcp-samples/spring-cloud-gcp-integration-pubsub-sample/spring-cloud-gcp-integration-pubsub-sample-sender/src/main/resources/application.properties b/spring-cloud-gcp-samples/spring-cloud-gcp-integration-pubsub-sample/spring-cloud-gcp-integration-pubsub-sample-sender/src/main/resources/application.properties index 8b13789179..78b31cdb2a 100644 --- a/spring-cloud-gcp-samples/spring-cloud-gcp-integration-pubsub-sample/spring-cloud-gcp-integration-pubsub-sample-sender/src/main/resources/application.properties +++ b/spring-cloud-gcp-samples/spring-cloud-gcp-integration-pubsub-sample/spring-cloud-gcp-integration-pubsub-sample-sender/src/main/resources/application.properties @@ -1 +1 @@ - +topicName=exampleTopic diff --git a/spring-cloud-gcp-samples/spring-cloud-gcp-integration-pubsub-sample/spring-cloud-gcp-integration-pubsub-sample-sender/src/test/java/com/example/SenderIntegrationTest.java b/spring-cloud-gcp-samples/spring-cloud-gcp-integration-pubsub-sample/spring-cloud-gcp-integration-pubsub-sample-sender/src/test/java/com/example/SenderIntegrationTest.java index 27e7baac59..f4797a2bd3 100644 --- a/spring-cloud-gcp-samples/spring-cloud-gcp-integration-pubsub-sample/spring-cloud-gcp-integration-pubsub-sample-sender/src/test/java/com/example/SenderIntegrationTest.java +++ b/spring-cloud-gcp-samples/spring-cloud-gcp-integration-pubsub-sample/spring-cloud-gcp-integration-pubsub-sample-sender/src/test/java/com/example/SenderIntegrationTest.java @@ -18,11 +18,20 @@ import static org.assertj.core.api.Assertions.assertThat; +import com.google.cloud.ServiceOptions; +import com.google.cloud.pubsub.v1.SubscriptionAdminClient; +import com.google.cloud.pubsub.v1.TopicAdminClient; import com.google.cloud.spring.pubsub.core.PubSubTemplate; import com.google.cloud.spring.pubsub.support.AcknowledgeablePubsubMessage; import com.google.cloud.spring.pubsub.support.BasicAcknowledgeablePubsubMessage; +import com.google.pubsub.v1.ProjectName; +import com.google.pubsub.v1.Subscription; +import com.google.pubsub.v1.Topic; import java.util.List; import java.util.UUID; +import java.util.stream.Collectors; +import java.util.stream.StreamSupport; +import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.condition.EnabledIfSystemProperty; import org.junit.jupiter.api.extension.ExtendWith; @@ -30,6 +39,7 @@ import org.springframework.boot.test.context.SpringBootTest; import org.springframework.boot.test.web.client.TestRestTemplate; import org.springframework.test.annotation.DirtiesContext; +import org.springframework.test.context.TestPropertySource; import org.springframework.test.context.junit.jupiter.SpringExtension; import org.springframework.util.LinkedMultiValueMap; import org.springframework.util.MultiValueMap; @@ -41,13 +51,22 @@ */ @EnabledIfSystemProperty(named = "it.pubsub-integration", matches = "true") @ExtendWith(SpringExtension.class) -@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT) +@SpringBootTest( + webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT, + properties = {"spring.main.allow-bean-definition-overriding=true"}, + classes = {SenderTestConfiguration.class}) +@TestPropertySource(locations = "classpath:application-test.properties") @DirtiesContext class SenderIntegrationTest { + private static final String PROJECT_NAME = + ProjectName.of(ServiceOptions.getDefaultProjectId()).getProject(); @Autowired private TestRestTemplate restTemplate; - @Autowired private PubSubTemplate pubSubTemplate; + @Autowired private Topic testTopic; + @Autowired private Subscription testSubscription; + @Autowired private TopicAdminClient topicAdminClient; + @Autowired private SubscriptionAdminClient subscriptionAdminClient; @Test void testSample() throws Exception { @@ -63,7 +82,7 @@ void testSample() throws Exception { boolean messageReceived = false; for (int i = 0; i < 100; i++) { - messages = this.pubSubTemplate.pull("exampleSubscription", 10, true); + messages = this.pubSubTemplate.pull(testSubscription.getName(), 10, true); messages.forEach(BasicAcknowledgeablePubsubMessage::ack); if (messages.stream() @@ -75,4 +94,34 @@ void testSample() throws Exception { } assertThat(messageReceived).isTrue(); } + + @AfterEach + void cleanUp() { + List projectTopics = fetchTopicNamesFromProject(); + String topicName = testTopic.getName(); + if (projectTopics.contains(topicName)) { + this.topicAdminClient.deleteTopic(topicName); + } + List projectSubscriptions = fetchSubscriptionNamesFromProject(); + String subscriptionName = testSubscription.getName(); + if (projectSubscriptions.contains(subscriptionName)) { + this.subscriptionAdminClient.deleteSubscription(subscriptionName); + } + } + + private List fetchTopicNamesFromProject() { + TopicAdminClient.ListTopicsPagedResponse listTopicsResponse = + topicAdminClient.listTopics("projects/" + PROJECT_NAME); + return StreamSupport.stream(listTopicsResponse.iterateAll().spliterator(), false) + .map(Topic::getName) + .collect(Collectors.toList()); + } + + private List fetchSubscriptionNamesFromProject() { + SubscriptionAdminClient.ListSubscriptionsPagedResponse response = + subscriptionAdminClient.listSubscriptions("projects/" + PROJECT_NAME); + return StreamSupport.stream(response.iterateAll().spliterator(), false) + .map(Subscription::getName) + .collect(Collectors.toList()); + } } diff --git a/spring-cloud-gcp-samples/spring-cloud-gcp-integration-pubsub-sample/spring-cloud-gcp-integration-pubsub-sample-sender/src/test/java/com/example/SenderTestConfiguration.java b/spring-cloud-gcp-samples/spring-cloud-gcp-integration-pubsub-sample/spring-cloud-gcp-integration-pubsub-sample-sender/src/test/java/com/example/SenderTestConfiguration.java new file mode 100644 index 0000000000..247f267d09 --- /dev/null +++ b/spring-cloud-gcp-samples/spring-cloud-gcp-integration-pubsub-sample/spring-cloud-gcp-integration-pubsub-sample-sender/src/test/java/com/example/SenderTestConfiguration.java @@ -0,0 +1,66 @@ +/* + * Copyright 2023 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.example; + +import com.google.cloud.ServiceOptions; +import com.google.cloud.pubsub.v1.SubscriptionAdminClient; +import com.google.cloud.pubsub.v1.TopicAdminClient; +import com.google.pubsub.v1.ProjectName; +import com.google.pubsub.v1.PushConfig; +import com.google.pubsub.v1.Subscription; +import com.google.pubsub.v1.SubscriptionName; +import com.google.pubsub.v1.Topic; +import com.google.pubsub.v1.TopicName; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.boot.test.context.TestConfiguration; +import org.springframework.context.annotation.Bean; + +@TestConfiguration +public class SenderTestConfiguration { + + private String subscriptionName; + + private TopicAdminClient topicAdminClient; + + private SubscriptionAdminClient subscriptionAdminClient; + + public SenderTestConfiguration( + TopicAdminClient topicAdminClient, + SubscriptionAdminClient subscriptionAdminClient, + @Value("${subscriptionName}") String subscriptionName) { + this.topicAdminClient = topicAdminClient; + this.subscriptionAdminClient = subscriptionAdminClient; + this.subscriptionName = subscriptionName; + } + + @Bean + public Topic createTopic(@Qualifier("topicName") String topicName) { + String projectName = ProjectName.of(ServiceOptions.getDefaultProjectId()).getProject(); + return topicAdminClient.createTopic(TopicName.of(projectName, topicName)); + } + + @Bean + public Subscription createSubscription(@Qualifier("topicName") String topicName) { + String projectName = ProjectName.of(ServiceOptions.getDefaultProjectId()).getProject(); + return subscriptionAdminClient.createSubscription( + SubscriptionName.of(projectName, this.subscriptionName), + TopicName.of(projectName, topicName), + PushConfig.getDefaultInstance(), + 10); + } +} diff --git a/spring-cloud-gcp-samples/spring-cloud-gcp-integration-pubsub-sample/spring-cloud-gcp-integration-pubsub-sample-sender/src/test/resources/application-test.properties b/spring-cloud-gcp-samples/spring-cloud-gcp-integration-pubsub-sample/spring-cloud-gcp-integration-pubsub-sample-sender/src/test/resources/application-test.properties new file mode 100644 index 0000000000..b6503373d1 --- /dev/null +++ b/spring-cloud-gcp-samples/spring-cloud-gcp-integration-pubsub-sample/spring-cloud-gcp-integration-pubsub-sample-sender/src/test/resources/application-test.properties @@ -0,0 +1,2 @@ +topicName=pubsub-spring-integration-sample-sender-exampleTopic-${random.uuid} +subscriptionName=pubsub-spring-integration-sample-sender-exampleTopic-${random.uuid} \ No newline at end of file diff --git a/spring-cloud-gcp-samples/spring-cloud-gcp-integration-pubsub-sample/spring-cloud-gcp-integration-pubsub-sample-test/src/test/java/com/example/SampleAppIntegrationTest.java b/spring-cloud-gcp-samples/spring-cloud-gcp-integration-pubsub-sample/spring-cloud-gcp-integration-pubsub-sample-test/src/test/java/com/example/SampleAppIntegrationTest.java index 1152ff6ebf..7f5436f065 100644 --- a/spring-cloud-gcp-samples/spring-cloud-gcp-integration-pubsub-sample/spring-cloud-gcp-integration-pubsub-sample-test/src/test/java/com/example/SampleAppIntegrationTest.java +++ b/spring-cloud-gcp-samples/spring-cloud-gcp-integration-pubsub-sample/spring-cloud-gcp-integration-pubsub-sample-test/src/test/java/com/example/SampleAppIntegrationTest.java @@ -44,11 +44,13 @@ class SampleAppIntegrationTest { void testSample(CapturedOutput capturedOutput) throws Exception { SpringApplicationBuilder sender = - new SpringApplicationBuilder(SenderApplication.class).properties("server.port=8082"); + new SpringApplicationBuilder(SampleTestConfiguration.class, SenderApplication.class) + .properties("server.port=8082", "spring.config.name:application-test"); sender.run(); SpringApplicationBuilder receiver = - new SpringApplicationBuilder(ReceiverApplication.class).properties("server.port=8081"); + new SpringApplicationBuilder(SampleTestConfiguration.class, ReceiverApplication.class) + .properties("server.port=8081", "spring.config.name:application-test"); receiver.run(); MultiValueMap map = new LinkedMultiValueMap<>(); diff --git a/spring-cloud-gcp-samples/spring-cloud-gcp-integration-pubsub-sample/spring-cloud-gcp-integration-pubsub-sample-test/src/test/java/com/example/SampleTestConfiguration.java b/spring-cloud-gcp-samples/spring-cloud-gcp-integration-pubsub-sample/spring-cloud-gcp-integration-pubsub-sample-test/src/test/java/com/example/SampleTestConfiguration.java new file mode 100644 index 0000000000..f1dad6e8a4 --- /dev/null +++ b/spring-cloud-gcp-samples/spring-cloud-gcp-integration-pubsub-sample/spring-cloud-gcp-integration-pubsub-sample-test/src/test/java/com/example/SampleTestConfiguration.java @@ -0,0 +1,62 @@ +/* + * Copyright 2023 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.example; + +import com.google.cloud.ServiceOptions; +import com.google.cloud.pubsub.v1.SubscriptionAdminClient; +import com.google.cloud.pubsub.v1.TopicAdminClient; +import com.google.pubsub.v1.ProjectName; +import com.google.pubsub.v1.PushConfig; +import com.google.pubsub.v1.Subscription; +import com.google.pubsub.v1.SubscriptionName; +import com.google.pubsub.v1.Topic; +import com.google.pubsub.v1.TopicName; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.boot.test.context.TestConfiguration; +import org.springframework.context.annotation.Bean; + +@TestConfiguration +public class SampleTestConfiguration { + + private TopicAdminClient topicAdminClient; + + private SubscriptionAdminClient subscriptionAdminClient; + + public SampleTestConfiguration( + TopicAdminClient topicAdminClient, SubscriptionAdminClient subscriptionAdminClient) { + this.topicAdminClient = topicAdminClient; + this.subscriptionAdminClient = subscriptionAdminClient; + } + + @Bean + public Topic createTopic(@Qualifier("topicName") String topicName) { + String projectName = ProjectName.of(ServiceOptions.getDefaultProjectId()).getProject(); + return topicAdminClient.createTopic(TopicName.of(projectName, topicName)); + } + + @Bean + public Subscription createSubscription( + @Qualifier("subscriptionName") String subscriptionName, + @Qualifier("topicName") String topicName) { + String projectName = ProjectName.of(ServiceOptions.getDefaultProjectId()).getProject(); + return subscriptionAdminClient.createSubscription( + SubscriptionName.of(projectName, subscriptionName), + TopicName.of(projectName, topicName), + PushConfig.getDefaultInstance(), + 10); + } +} diff --git a/spring-cloud-gcp-samples/spring-cloud-gcp-integration-pubsub-sample/spring-cloud-gcp-integration-pubsub-sample-test/src/test/resources/application-test.properties b/spring-cloud-gcp-samples/spring-cloud-gcp-integration-pubsub-sample/spring-cloud-gcp-integration-pubsub-sample-test/src/test/resources/application-test.properties new file mode 100644 index 0000000000..6b3c00f8b6 --- /dev/null +++ b/spring-cloud-gcp-samples/spring-cloud-gcp-integration-pubsub-sample/spring-cloud-gcp-integration-pubsub-sample-test/src/test/resources/application-test.properties @@ -0,0 +1,2 @@ +topicName=pubsub-spring-integration-sample-test-exampleTopic-${random.uuid} +subscriptionName=pubsub-spring-integration-sample-test-exampleSubscription-${random.uuid} \ No newline at end of file