Skip to content

Commit

Permalink
Fix sb integration bug (#25501)
Browse files Browse the repository at this point in the history
* fix integration bug

* fix sb si bug

* fix binder bug

* fix pipeline error

* fix modify variable of lambda

* add default entity type to servicebus template

* fix ut

* fix pipeline

* modify createfactory
  • Loading branch information
yiliuTo authored Nov 19, 2021
1 parent 49ab208 commit 6969085
Show file tree
Hide file tree
Showing 19 changed files with 112 additions and 70 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ public class AzureServiceBusMessagingAutoConfiguration {

@Bean
@ConditionalOnMissingBean
public NamespaceProperties serviceBusNamespaceTopicProperties(AzureServiceBusProperties properties) {
public NamespaceProperties serviceBusNamespaceProperties(AzureServiceBusProperties properties) {
NamespaceProperties namespaceProperties = new NamespaceProperties();
BeanUtils.copyProperties(properties, namespaceProperties);
copyAzureCommonProperties(properties, namespaceProperties);
Expand Down Expand Up @@ -84,7 +84,7 @@ public static class ServiceBusTemplateConfiguration {

@Bean
@ConditionalOnMissingBean
public ServiceBusProducerFactory defaultServiceBusNamespaceQueueProducerFactory(
public ServiceBusProducerFactory defaultServiceBusNamespaceProducerFactory(
NamespaceProperties properties,
ObjectProvider<PropertiesSupplier<String, ProducerProperties>> suppliers) {
return new DefaultServiceBusNamespaceProducerFactory(properties, suppliers.getIfAvailable());
Expand All @@ -99,8 +99,8 @@ public ServiceBusMessageConverter messageConverter() {
@Bean
@ConditionalOnMissingBean
@ConditionalOnBean(ServiceBusProducerFactory.class)
public ServiceBusTemplate queueOperation(ServiceBusProducerFactory senderClientfactory,
ServiceBusMessageConverter messageConverter) {
public ServiceBusTemplate serviceBusTemplate(ServiceBusProducerFactory senderClientfactory,
ServiceBusMessageConverter messageConverter) {
ServiceBusTemplate serviceBusTemplate = new ServiceBusTemplate(senderClientfactory);
serviceBusTemplate.setMessageConverter(messageConverter);
return serviceBusTemplate;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ void noQueueNameOrTopicNameProvidedShouldNotConfigure() {
}

@Test
void entityTypeProvidedShouldNotConfigure() {
void entityNameProvidedShouldConfigure() {
ServiceBusClientBuilder serviceBusClientBuilder = new ServiceBusClientBuilder();
serviceBusClientBuilder.connectionString(String.format(CONNECTION_STRING, "test-namespace"));

Expand All @@ -44,7 +44,7 @@ void entityTypeProvidedShouldNotConfigure() {
}

@Test
void entityNameProvidedShouldNotConfigure() {
void entityTypeProvidedShouldConfigure() {
ServiceBusClientBuilder serviceBusClientBuilder = new ServiceBusClientBuilder();
serviceBusClientBuilder.connectionString(String.format(CONNECTION_STRING, "test-namespace"));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@ public ServiceBusTemplate getServiceBusTemplate() {
DefaultServiceBusNamespaceProducerFactory factory = new DefaultServiceBusNamespaceProducerFactory(
this.namespaceProperties, getProducerPropertiesSupplier());

factory.addListener((name) -> {
factory.addListener((name, client) -> {
DefaultInstrumentation instrumentation = new DefaultInstrumentation(name, PRODUCER);
instrumentation.markUp();
instrumentationManager.addHealthInstrumentation(instrumentation.getId(), instrumentation);
Expand All @@ -243,7 +243,7 @@ private ServiceBusProcessorContainer getProcessorContainer() {
DefaultServiceBusNamespaceProcessorFactory factory = new DefaultServiceBusNamespaceProcessorFactory(
this.namespaceProperties, getProcessorPropertiesSupplier());

factory.addListener((name, subscription) -> {
factory.addListener((name, subscription, client) -> {
String instrumentationName = name + "/" + subscription == null ? "" : subscription;
Instrumentation instrumentation = new ServiceBusProcessorInstrumentation(instrumentationName, CONSUMER, Duration.ofMinutes(2));
instrumentation.markUp();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ public void setInstrumentationId(String instrumentationId) {
}
private class IntegrationRecordMessageProcessingListener implements RecordMessageProcessingListener {

private ServiceBusMessageConverter messageConverter;
private ServiceBusMessageConverter messageConverter = new ServiceBusMessageConverter();
private Class<?> payloadType = byte[].class;
private InstrumentationManager instrumentationManager;
private String instrumentationId;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,33 +75,34 @@ public EventProcessorClient createProcessor(@NonNull String eventHub, @NonNull S

@Override
public void destroy() {
this.processorClientMap.values().forEach(EventProcessorClient::stop);
this.processorClientMap.forEach((t, client) -> {
listeners.forEach(l -> l.processorRemoved(t.getT1(), t.getT2(), client));
client.stop();
});
this.processorClientMap.clear();
this.listeners.clear();
}

private EventProcessorClient doCreateProcessor(@NonNull String eventHub, @NonNull String consumerGroup,
@NonNull EventProcessingListener listener,
@Nullable ProcessorProperties properties) {
Tuple2<String, String> key = Tuples.of(eventHub, consumerGroup);
if (this.processorClientMap.containsKey(key)) {
return this.processorClientMap.get(key);
}
return processorClientMap.computeIfAbsent(key, k -> {

ProcessorProperties processorProperties = propertiesMerger.mergeParent(properties, this.namespaceProperties);
processorProperties.setEventHubName(eventHub);
processorProperties.setConsumerGroup(consumerGroup);
ProcessorProperties processorProperties = propertiesMerger.mergeParent(properties, this.namespaceProperties);
processorProperties.setEventHubName(k.getT1());
processorProperties.setConsumerGroup(k.getT2());

EventProcessorClientBuilderFactory factory =
new EventProcessorClientBuilderFactory(processorProperties, this.checkpointStore, listener);
factory.setSpringIdentifier(AzureSpringIdentifier.AZURE_SPRING_INTEGRATION_EVENT_HUBS);
EventProcessorClient client = factory.build().buildEventProcessorClient();
LOGGER.info("EventProcessor created for event hub '{}' with consumer group '{}'", eventHub, consumerGroup);
EventProcessorClientBuilderFactory factory =
new EventProcessorClientBuilderFactory(processorProperties, this.checkpointStore, listener);
factory.setSpringIdentifier(AzureSpringIdentifier.AZURE_SPRING_INTEGRATION_EVENT_HUBS);
EventProcessorClient client = factory.build().buildEventProcessorClient();
LOGGER.info("EventProcessor created for event hub '{}' with consumer group '{}'", k.getT1(), k.getT2());

this.listeners.forEach(l -> l.processorAdded(eventHub, consumerGroup));
this.listeners.forEach(l -> l.processorAdded(k.getT1(), k.getT2(), client));

this.processorClientMap.put(key, client);

return client;
return client;
});
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,11 @@ default boolean removeListener(Listener listener) {
*/
interface Listener {

default void processorAdded(String eventHub, String consumerGroup) {
default void processorAdded(String eventHub, String consumerGroup, EventProcessorClient client) {

}

default void processorRemoved(String eventHub, String consumerGroup) {
default void processorRemoved(String eventHub, String consumerGroup, EventProcessorClient client) {
}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,20 +47,16 @@ public EventHubProducerAsyncClient createProducer(String eventHub) {
}

private EventHubProducerAsyncClient doCreateProducer(String eventHub, @Nullable ProducerProperties properties) {
if (this.clients.containsKey(eventHub)) {
return this.clients.get(eventHub);
}
return clients.computeIfAbsent(eventHub, entityName -> {
ProducerProperties producerProperties = parentMerger.mergeParent(properties, this.namespaceProperties);
producerProperties.setEventHubName(entityName);
EventHubClientBuilderFactory factory = new EventHubClientBuilderFactory(producerProperties);
factory.setSpringIdentifier(AzureSpringIdentifier.AZURE_SPRING_INTEGRATION_EVENT_HUBS);
EventHubProducerAsyncClient producerClient = factory.build().buildAsyncProducerClient();
this.listeners.forEach(l -> l.producerAdded(entityName, producerClient));

ProducerProperties producerProperties = parentMerger.mergeParent(properties, this.namespaceProperties);
producerProperties.setEventHubName(eventHub);
EventHubClientBuilderFactory factory = new EventHubClientBuilderFactory(producerProperties);
factory.setSpringIdentifier(AzureSpringIdentifier.AZURE_SPRING_INTEGRATION_EVENT_HUBS);
EventHubProducerAsyncClient producerClient = factory.build().buildAsyncProducerClient();

this.listeners.forEach(l -> l.producerAdded(eventHub));

this.clients.put(eventHub, producerClient);
return producerClient;
return producerClient;
});
}

@Override
Expand All @@ -75,7 +71,11 @@ public boolean removeListener(Listener listener) {

@Override
public void destroy() {
this.clients.values().forEach(EventHubProducerAsyncClient::close);
this.clients.forEach((name, client) -> {
this.listeners.forEach(l -> l.producerRemoved(name, client));
client.close();
});
this.clients.clear();
this.listeners.clear();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,11 @@ default boolean removeListener(Listener listener) {
*/
interface Listener {

default void producerAdded(String name) {
default void producerAdded(String name, EventHubProducerAsyncClient client) {

}

default void producerRemoved(String name) {
default void producerRemoved(String name, EventHubProducerAsyncClient client) {
}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ void setUp() {
processorAddedTimes = 0;
this.processorFactory.addListener(new EventHubsProcessorFactory.Listener() {
@Override
public void processorAdded(String eventHub, String consumerGroup) {
public void processorAdded(String eventHub, String consumerGroup, EventProcessorClient client) {
processorAddedTimes++;
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ void setUp() {
producerAddedTimes = 0;
this.producerFactory.addListener(new EventHubsProducerFactory.Listener() {
@Override
public void producerAdded(String name) {
public void producerAdded(String name, EventHubProducerAsyncClient client) {
producerAddedTimes++;
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ public boolean unsubscribe(String queue) {
public ServiceBusProcessorClient subscribe(String topic, String subscription, MessageProcessingListener listener) {
ServiceBusProcessorClient processor = this.processorFactory.createProcessor(topic, subscription, listener);
processor.start();
this.listeners.forEach(l -> l.processorAdded(topic, subscription));
this.listeners.forEach(l -> l.processorAdded(topic, subscription, processor));
this.clients.add(processor);
return processor;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import com.azure.messaging.servicebus.ServiceBusSenderAsyncClient;
import com.azure.spring.messaging.PartitionSupplier;
import com.azure.spring.messaging.core.SendOperation;
import com.azure.spring.service.servicebus.properties.ServiceBusEntityType;
import com.azure.spring.servicebus.core.producer.ServiceBusProducerFactory;
import com.azure.spring.servicebus.support.converter.ServiceBusMessageConverter;
import org.springframework.lang.NonNull;
Expand All @@ -25,6 +26,7 @@ public class ServiceBusTemplate implements SendOperation {
private static final ServiceBusMessageConverter DEFAULT_CONVERTER = new ServiceBusMessageConverter();
private final ServiceBusProducerFactory producerFactory;
private ServiceBusMessageConverter messageConverter = DEFAULT_CONVERTER;
private ServiceBusEntityType defaultEntityType;

public ServiceBusTemplate(@NonNull ServiceBusProducerFactory producerFactory) {
this.producerFactory = producerFactory;
Expand All @@ -35,7 +37,7 @@ public <U> Mono<Void> sendAsync(String destination,
Message<U> message,
PartitionSupplier partitionSupplier) {
Assert.hasText(destination, "destination can't be null or empty");
ServiceBusSenderAsyncClient senderAsyncClient = this.producerFactory.createProducer(destination);
ServiceBusSenderAsyncClient senderAsyncClient = this.producerFactory.createProducer(destination, defaultEntityType);
ServiceBusMessage serviceBusMessage = messageConverter.fromMessage(message, ServiceBusMessage.class);

if (Objects.nonNull(serviceBusMessage) && !StringUtils.hasText(serviceBusMessage.getPartitionKey())) {
Expand Down Expand Up @@ -68,4 +70,8 @@ private String getPartitionKey(PartitionSupplier partitionSupplier) {

return "";
}

public void setDefaultEntityType(ServiceBusEntityType entityType) {
defaultEntityType = entityType;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,10 @@ public DefaultServiceBusNamespaceProcessorFactory(NamespaceProperties namespaceP
this.propertiesSupplier = supplier == null ? key -> null : supplier;
}

private <K, V> void close(Map<Tuple2<String, String>, V> map, Consumer<V> close) {
private void close(Map<Tuple2<String, String>, ServiceBusProcessorClient> map, Consumer<ServiceBusProcessorClient> close) {
map.forEach((t, p) -> {
try {
listeners.forEach(l -> l.processorRemoved(t.getT1(), t.getT2()));
listeners.forEach(l -> l.processorRemoved(t.getT1(), t.getT2(), p));
close.accept(p);
} catch (Exception ex) {
LOGGER.warn("Failed to clean service bus queue client factory", ex);
Expand Down Expand Up @@ -92,12 +92,12 @@ private ServiceBusProcessorClient doCreateProcessor(String name, String subscrip

return processorMap.computeIfAbsent(key, k -> {
ProcessorProperties processorProperties = propertiesMerger.mergeParent(properties, this.namespaceProperties);
processorProperties.setEntityName(name);
if (INVALID_SUBSCRIPTION.equals(subscription)) {
processorProperties.setEntityName(k.getT1());
if (INVALID_SUBSCRIPTION.equals(k.getT2())) {
processorProperties.setEntityType(ServiceBusEntityType.QUEUE);
} else {
processorProperties.setEntityType(ServiceBusEntityType.TOPIC);
processorProperties.setSubscriptionName(subscription);
processorProperties.setSubscriptionName(k.getT2());
}

ServiceBusProcessorClient client;
Expand All @@ -114,8 +114,8 @@ private ServiceBusProcessorClient doCreateProcessor(String name, String subscrip
client = factory.build().buildProcessorClient();
}

this.listeners.forEach(l -> l.processorAdded(name, INVALID_SUBSCRIPTION.equals(subscription) ? null
: subscription));
this.listeners.forEach(l -> l.processorAdded(k.getT1(), INVALID_SUBSCRIPTION.equals(k.getT2()) ? null
: k.getT2(), client));
return client;
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,9 @@ default boolean removeListener(Listener listener) {
@FunctionalInterface
interface Listener {

void processorAdded(String name, String subscription);
void processorAdded(String name, String subscription, ServiceBusProcessorClient client);

default void processorRemoved(String name, String subscription) {
default void processorRemoved(String name, String subscription, ServiceBusProcessorClient client) {
}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import com.azure.spring.core.AzureSpringIdentifier;
import com.azure.spring.messaging.PropertiesSupplier;
import com.azure.spring.service.servicebus.factory.ServiceBusSenderClientBuilderFactory;
import com.azure.spring.service.servicebus.properties.ServiceBusEntityType;
import com.azure.spring.servicebus.core.properties.NamespaceProperties;
import com.azure.spring.servicebus.core.properties.ProducerProperties;
import com.azure.spring.servicebus.core.properties.merger.ProducerPropertiesParentMerger;
Expand Down Expand Up @@ -38,11 +39,20 @@ public DefaultServiceBusNamespaceProducerFactory(NamespaceProperties namespacePr
public DefaultServiceBusNamespaceProducerFactory(NamespaceProperties namespaceProperties,
PropertiesSupplier<String, ProducerProperties> supplier) {
this.namespaceProperties = namespaceProperties;
this.propertiesSupplier = supplier;
this.propertiesSupplier = supplier == null ? key -> null : supplier;
}

public ServiceBusSenderAsyncClient createProducer(String name) {
return doCreateProducer(name, this.propertiesSupplier.getProperties(name));
return createProducer(name, null);
}

public ServiceBusSenderAsyncClient createProducer(String name, ServiceBusEntityType entityType) {
ProducerProperties producerProperties = this.propertiesSupplier.getProperties(name) != null
? this.propertiesSupplier.getProperties(name) : new ProducerProperties();
if (entityType != null) {
producerProperties.setEntityType(entityType);
}
return doCreateProducer(name, producerProperties);
}

@Override
Expand All @@ -58,7 +68,7 @@ public boolean removeListener(Listener listener) {
@Override
public void destroy() {
clients.forEach((name, producer) -> {
listeners.forEach(l -> l.producerRemoved(name));
listeners.forEach(l -> l.producerRemoved(name, producer));
producer.close();
});
this.clients.clear();
Expand All @@ -68,14 +78,14 @@ public void destroy() {
private ServiceBusSenderAsyncClient doCreateProducer(String name, @Nullable ProducerProperties properties) {
return clients.computeIfAbsent(name, entityName -> {
ProducerProperties producerProperties = parentMerger.mergeParent(properties, this.namespaceProperties);

producerProperties.setEntityName(entityName);

//TODO(yiliu6): whether to make the producer client share the same service bus client builder
ServiceBusSenderClientBuilderFactory factory = new ServiceBusSenderClientBuilderFactory(producerProperties);
factory.setSpringIdentifier(AzureSpringIdentifier.AZURE_SPRING_INTEGRATION_SERVICE_BUS);
ServiceBusSenderAsyncClient producerClient = factory.build().buildAsyncClient();

this.listeners.forEach(l -> l.producerAdded(entityName));
this.listeners.forEach(l -> l.producerAdded(entityName, producerClient));
return producerClient;
});
}
Expand Down
Loading

0 comments on commit 6969085

Please sign in to comment.