Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enable properties which supported in sdk #22676

Merged
Merged
Show file tree
Hide file tree
Changes from 16 commits
Commits
Show all changes
43 commits
Select commit Hold shift + click to select a range
ffb59f4
Support the parameters that are allowed to be configured in the SDK
zhichengliu12581 Jun 30, 2021
00356c6
nothing changed, just remove unused import
zhichengliu12581 Jun 30, 2021
f3071b1
support configuration in topic
zhichengliu12581 Jun 30, 2021
7f982bc
try to fix pipeline error
zhichengliu12581 Jun 30, 2021
ca7d039
merge master
zhichengliu12581 Jul 1, 2021
f876d85
remove useless bean
zhichengliu12581 Jul 2, 2021
48c8ccc
add properties in consumer Properties
zhichengliu12581 Jul 2, 2021
922759a
remove unused import
zhichengliu12581 Jul 5, 2021
c0f6557
Merge branch 'main' of https://github.com/Azure/azure-sdk-for-java in…
zhichengliu12581 Jul 6, 2021
5939585
add default client options
zhichengliu12581 Jul 7, 2021
2380f9f
revert useless bean as discussed
zhichengliu12581 Jul 8, 2021
65d8abf
add test for retry-options properties
zhichengliu12581 Jul 9, 2021
c276787
add test for serviceBusReceiveMode properties
zhichengliu12581 Jul 9, 2021
4245f00
nothing changed, just remove Todo
zhichengliu12581 Jul 9, 2021
4ca6492
Merge branch 'main' of https://github.com/Azure/azure-sdk-for-java in…
zhichengliu12581 Jul 9, 2021
ec9736b
nothing changed just remove unused import and format code
zhichengliu12581 Jul 9, 2021
54e8474
Merge branch 'main' of https://github.com/Azure/azure-sdk-for-java in…
zhichengliu12581 Jul 23, 2021
379c430
update CHANGELOG.md and README.md
zhichengliu12581 Jul 23, 2021
dc87561
remove unused import
zhichengliu12581 Jul 23, 2021
0c45193
merge main
zhichengliu12581 Jul 27, 2021
5efa9aa
add test in queue&topic autoconfigurtion
zhichengliu12581 Jul 28, 2021
77e91fa
update README.md
zhichengliu12581 Jul 28, 2021
006d4a7
enable tests
zhichengliu12581 Jul 28, 2021
b158580
Deprecated concurrency and use maxConcurrentCalls and maxConcurrentSe…
zhichengliu12581 Jul 29, 2021
9a2d7e5
update CHANGELOG.md and README.md
zhichengliu12581 Jul 29, 2021
295e448
enable properties to set whether enableAutoComplete
zhichengliu12581 Jul 29, 2021
8313691
use DeprecatedConfigurationProperty
zhichengliu12581 Jul 29, 2021
a3d52f9
use DeprecatedConfigurationProperty and update docs
zhichengliu12581 Jul 29, 2021
f84bbe7
fix error in name
zhichengliu12581 Jul 29, 2021
de0880e
try to fix pipeline error
Jul 29, 2021
cbec683
update by comments
zhichengliu12581 Jul 30, 2021
4c8688d
merge main
zhichengliu12581 Jul 30, 2021
d32503d
merge main
zhichengliu12581 Jul 30, 2021
916160a
rename functions
zhichengliu12581 Jul 30, 2021
58245ed
make maxConcurrentCalls and maxConcurrentSessions can work normally w…
zhichengliu12581 Jul 30, 2021
fd05168
remove useless it
zhichengliu12581 Jul 30, 2021
9bc8c6d
make properties as pojo
zhichengliu12581 Jul 30, 2021
9b223dc
merge main
zhichengliu12581 Jul 30, 2021
c134b56
fix README.md error
zhichengliu12581 Aug 2, 2021
c8ccb03
fix pipeline error
zhichengliu12581 Aug 2, 2021
6c86b9a
add test for properties
zhichengliu12581 Aug 2, 2021
e91660b
Update ServiceBusQueueSessionBinderConfigTest.java
zhichengliu12581 Aug 2, 2021
efdc850
fix pipeline error
zhichengliu12581 Aug 2, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@

package com.azure.spring.cloud.autoconfigure.servicebus;

import com.azure.core.amqp.AmqpRetryOptions;
import com.azure.core.amqp.AmqpTransportType;
import com.azure.messaging.servicebus.implementation.ServiceBusConstants;
import org.springframework.boot.context.properties.ConfigurationProperties;

/**
Expand All @@ -16,6 +18,8 @@ public class AzureServiceBusProperties {

private String connectionString;

private AmqpRetryOptions retryOptions = new AmqpRetryOptions().setTryTimeout(ServiceBusConstants.OPERATION_TIMEOUT);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this default value for servicebus sdk client? If so, seems we don't set it here


private AmqpTransportType transportType = AmqpTransportType.AMQP;

public String getNamespace() {
Expand All @@ -41,4 +45,12 @@ public AmqpTransportType getTransportType() {
public void setTransportType(AmqpTransportType transportType) {
this.transportType = transportType;
}

public AmqpRetryOptions getRetryOptions() {
return retryOptions;
}

public void setRetryOptions(AmqpRetryOptions retryOptions) {
this.retryOptions = retryOptions;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,11 @@ public ServiceBusQueueClientFactory queueClientFactory(

Assert.notNull(connectionString, "Service Bus connection string must not be null");

DefaultServiceBusQueueClientFactory clientFactory = new DefaultServiceBusQueueClientFactory(connectionString, properties.getTransportType());
DefaultServiceBusQueueClientFactory clientFactory = new DefaultServiceBusQueueClientFactory(connectionString);

clientFactory.retryOptions(properties.getRetryOptions());
clientFactory.transportType(properties.getTransportType());

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we delete the empty line here?

clientFactory.setNamespace(properties.getNamespace());
clientFactory.setServiceBusNamespaceManager(namespaceManager);
clientFactory.setServiceBusQueueManager(queueManager);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,10 @@ public ServiceBusTopicClientFactory topicClientFactory(

Assert.notNull(connectionString, "Service Bus connection string must not be null");

DefaultServiceBusTopicClientFactory clientFactory = new DefaultServiceBusTopicClientFactory(connectionString, properties.getTransportType());
DefaultServiceBusTopicClientFactory clientFactory = new DefaultServiceBusTopicClientFactory(connectionString);
clientFactory.retryOptions(properties.getRetryOptions());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how about using setRetryOptions to keep all function names consistent?

clientFactory.transportType(properties.getTransportType());

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we delete the empty line here?

clientFactory.setNamespace(properties.getNamespace());
clientFactory.setServiceBusNamespaceManager(namespaceManager);
clientFactory.setServiceBusTopicManager(topicManager);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

package com.azure.spring.cloud.autoconfigure.servicebus;

import com.azure.core.amqp.AmqpRetryMode;
import com.azure.core.amqp.AmqpTransportType;
import com.azure.messaging.servicebus.ServiceBusReceivedMessage;
import com.azure.resourcemanager.AzureResourceManager;
Expand Down Expand Up @@ -97,6 +98,23 @@ public void testTransportTypeWithAmqpWebSockets() {
});
}

@Test
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we add UTs in the queue&topic autoconfigurtion to see if the client factory beans are created as the way the properties are configured?

public void testTransportTypeWithRetryOptions() {
this.contextRunner.withPropertyValues(SERVICE_BUS_PROPERTY_PREFIX + "retry-options.maxRetries=5",
SERVICE_BUS_PROPERTY_PREFIX + "retry-options.delay=100S",
SERVICE_BUS_PROPERTY_PREFIX + "retry-options.maxDelay=200S",
SERVICE_BUS_PROPERTY_PREFIX + "retry-options.tryTimeout=300S",
SERVICE_BUS_PROPERTY_PREFIX + "retry-options.Mode=FIXED")
.run(context -> {
assertThat(context.getBean(AzureServiceBusProperties.class).getRetryOptions().getMaxRetries()).isEqualTo(5);
assertThat(context.getBean(AzureServiceBusProperties.class).getRetryOptions().getDelay().getSeconds()).isEqualTo(100L);
assertThat(context.getBean(AzureServiceBusProperties.class).getRetryOptions().getMaxDelay().getSeconds()).isEqualTo(200L);
assertThat(context.getBean(AzureServiceBusProperties.class).getRetryOptions().getTryTimeout().getSeconds()).isEqualTo(300L);
assertThat(context.getBean(AzureServiceBusProperties.class).getRetryOptions().getMode()).isEqualTo(AmqpRetryMode.FIXED);
});
}


@Test
public void testWithAzureResourceManagerProvided() {
this.contextRunner.withUserConfiguration(TestConfigWithAzureResourceManager.class)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,4 +30,6 @@ public class Constants {

public static final String SPRING_EVENT_HUB_APPLICATION_ID =
String.join("-", AZURE, SPRING_CLOUD, EVENT_HUB) + "/" + SPRING_CLOUD_VERSION;
public static final String SPRING_SERVICE_BUS_APPLICATION_ID =
String.join("-", AZURE, SPRING_CLOUD, SERVICE_BUS) + "/" + SPRING_CLOUD_VERSION;
}
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,8 @@ protected ServiceBusClientConfig buildClientConfig(
.setPrefetchCount(consumerProperties.getPrefetchCount())
.setConcurrency(consumerProperties.getConcurrency())
.setSessionsEnabled(consumerProperties.isSessionsEnabled())
.setMaxConcurrentCalls(consumerProperties.getMaxConcurrentCalls())
.setServiceBusReceiveMode(consumerProperties.getServiceBusReceiveMode())
.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

package com.azure.spring.servicebus.stream.binder.properties;

import com.azure.messaging.servicebus.models.ServiceBusReceiveMode;
import com.azure.spring.integration.core.api.CheckpointMode;

/**
Expand All @@ -14,6 +15,8 @@ public class ServiceBusConsumerProperties {
private int concurrency = 1;
private boolean sessionsEnabled = false;
private boolean requeueRejected = false;
private int maxConcurrentCalls = 1;
private ServiceBusReceiveMode serviceBusReceiveMode = ServiceBusReceiveMode.PEEK_LOCK;

private CheckpointMode checkpointMode = CheckpointMode.RECORD;

Expand All @@ -25,6 +28,14 @@ public void setCheckpointMode(CheckpointMode checkpointMode) {
this.checkpointMode = checkpointMode;
}

public int getMaxConcurrentCalls() {
return maxConcurrentCalls;
}

public void setMaxConcurrentCalls(int maxConcurrentCalls) {
this.maxConcurrentCalls = maxConcurrentCalls;
}

/**
* Prefetch count of underlying service bus client.
*
Expand Down Expand Up @@ -83,4 +94,11 @@ public void setRequeueRejected(boolean requeueRejected) {
this.requeueRejected = requeueRejected;
}

public ServiceBusReceiveMode getServiceBusReceiveMode() {
return serviceBusReceiveMode;
}

public void setServiceBusReceiveMode(ServiceBusReceiveMode serviceBusReceiveMode) {
this.serviceBusReceiveMode = serviceBusReceiveMode;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.
package com.azure.spring.sample.servicebus.binder;

import com.azure.spring.integration.core.api.Checkpointer;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.context.annotation.Bean;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.test.context.ActiveProfiles;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Sinks;

import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Supplier;

import static com.azure.spring.integration.core.AzureHeaders.CHECKPOINTER;
import static org.assertj.core.api.Assertions.assertThat;

@SpringBootTest(classes = { ServiceBusQueueAndTopicBinderWithPropertiesIT.TestQueueConfig.class,
ServiceBusQueueAndTopicBinderWithPropertiesIT.TestTopicConfig.class })
@ActiveProfiles("properties")
public class ServiceBusQueueAndTopicBinderWithPropertiesIT {

private static final Logger LOGGER = LoggerFactory.getLogger(ServiceBusQueueAndTopicBinderWithPropertiesIT.class);

private static String message = UUID.randomUUID().toString();

private static CountDownLatch latch = new CountDownLatch(2);
private static boolean queueError = false;

@Autowired
private Sinks.Many<Message<String>> manyQueue;

@Autowired
private Sinks.Many<Message<String>> manyTopic;

@EnableAutoConfiguration
public static class TestQueueConfig {

@Bean
public Sinks.Many<Message<String>> manyQueue() {
return Sinks.many().unicast().onBackpressureBuffer();
}

@Bean
public Supplier<Flux<Message<String>>> queueSupply(Sinks.Many<Message<String>> manyQueue) {
return () -> manyQueue.asFlux()
.doOnNext(m -> LOGGER.info("Manually sending message {}", m))
.doOnError(t -> LOGGER.error("Error encountered", t));
}

@Bean
public Consumer<Message<String>> queueConsume() {
return message -> {
LOGGER.info("---Test queue new message received: '{}'", message);
if (message.getPayload().equals(ServiceBusQueueAndTopicBinderWithPropertiesIT.message)) {
latch.countDown();
}
Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER);
checkpointer.success().handle((r, ex) -> {
if (ex != null) {
queueError = true;
}
return null;
});

};
}
}

@EnableAutoConfiguration
public static class TestTopicConfig {

@Bean
public Sinks.Many<Message<String>> manyTopic() {
return Sinks.many().unicast().onBackpressureBuffer();
}

@Bean
public Supplier<Flux<Message<String>>> topicSupply(Sinks.Many<Message<String>> manyTopic) {
return () -> manyTopic.asFlux()
.doOnNext(m -> LOGGER.info("Manually sending message {}", m))
.doOnError(t -> LOGGER.error("Error encountered", t));
}

@Bean
public Consumer<Message<String>> topicConsume() {
return message -> {
LOGGER.info("---Test topic new message received: '{}'", message);
if (message.getPayload().equals(ServiceBusQueueAndTopicBinderWithPropertiesIT.message)) {
latch.countDown();
}
};
}
}

@Test
public void testSingleServiceBusSendAndReceiveMessage() throws InterruptedException {
LOGGER.info("SingleServiceBusQueueAndTopicBinderIT begin.");

LOGGER.info("Send a message:" + message + " to the queue.");
manyQueue.emitNext(MessageBuilder.withPayload(message).build(), Sinks.EmitFailureHandler.FAIL_FAST);
LOGGER.info("Send a message:" + message + " to the topic.");
manyTopic.emitNext(MessageBuilder.withPayload(message).build(), Sinks.EmitFailureHandler.FAIL_FAST);

assertThat(ServiceBusQueueAndTopicBinderWithPropertiesIT.latch.await(15, TimeUnit.SECONDS)).isTrue();
assertThat(queueError).isTrue();
LOGGER.info("SingleServiceBusQueueAndTopicBinderIT end.");
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
spring:
cloud:
stream:
function:
definition: queueConsume;queueSupply;topicConsume;topicSupply;
bindings:
topicConsume-in-0:
destination: topic2
group: topicSub
topicSupply-out-0:
destination: topic2
queueConsume-in-0:
binder: servicebus-2
destination: queue2
queueSupply-out-0:
binder: servicebus-2
destination: queue2
binders:
servicebus-1:
type: servicebus-topic
default-candidate: true
environment:
stliu marked this conversation as resolved.
Show resolved Hide resolved
spring:
cloud:
azure:
servicebus:
connection-string: ${SERVICEBUS1_BINDER_TEST_CONNECTION_STRING}
servicebus-2:
type: servicebus-queue
default-candidate: false
environment:
spring:
cloud:
azure:
servicebus:
connection-string: ${SERVICEBUS1_BINDER_TEST_CONNECTION_STRING}
servicebus:
queue:
bindings:
queueConsume-in-0:
consumer:
checkpoint-mode: MANUAL
serviceBusReceiveMode: RECEIVE_AND_DELETE
Loading