Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix sb integration bug #25501

Merged
merged 10 commits into from
Nov 19, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ public class AzureServiceBusMessagingAutoConfiguration {

@Bean
@ConditionalOnMissingBean
public NamespaceProperties serviceBusNamespaceTopicProperties(AzureServiceBusProperties properties) {
public NamespaceProperties serviceBusNamespaceProperties(AzureServiceBusProperties properties) {
NamespaceProperties namespaceProperties = new NamespaceProperties();
BeanUtils.copyProperties(properties, namespaceProperties);
copyAzureCommonProperties(properties, namespaceProperties);
Expand Down Expand Up @@ -83,7 +83,7 @@ public static class ServiceBusTemplateConfiguration {

@Bean
@ConditionalOnMissingBean
public ServiceBusProducerFactory defaultServiceBusNamespaceQueueProducerFactory(
public ServiceBusProducerFactory defaultServiceBusNamespaceProducerFactory(
NamespaceProperties properties,
ObjectProvider<PropertiesSupplier<String, ProducerProperties>> suppliers) {
return new DefaultServiceBusNamespaceProducerFactory(properties, suppliers.getIfAvailable());
Expand All @@ -98,8 +98,8 @@ public ServiceBusMessageConverter messageConverter() {
@Bean
@ConditionalOnMissingBean
@ConditionalOnBean(ServiceBusProducerFactory.class)
public ServiceBusTemplate queueOperation(ServiceBusProducerFactory senderClientfactory,
ServiceBusMessageConverter messageConverter) {
public ServiceBusTemplate serviceBusTemplate(ServiceBusProducerFactory senderClientfactory,
ServiceBusMessageConverter messageConverter) {
ServiceBusTemplate serviceBusTemplate = new ServiceBusTemplate(senderClientfactory);
serviceBusTemplate.setMessageConverter(messageConverter);
return serviceBusTemplate;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ void noQueueNameOrTopicNameProvidedShouldNotConfigure() {
}

@Test
void entityTypeProvidedShouldNotConfigure() {
void entityNameProvidedShouldConfigure() {
ServiceBusClientBuilder serviceBusClientBuilder = new ServiceBusClientBuilder();
serviceBusClientBuilder.connectionString(String.format(CONNECTION_STRING, "test-namespace"));

Expand All @@ -42,7 +42,7 @@ void entityTypeProvidedShouldNotConfigure() {
}

@Test
void entityNameProvidedShouldNotConfigure() {
void entityTypeProvidedShouldConfigure() {
ServiceBusClientBuilder serviceBusClientBuilder = new ServiceBusClientBuilder();
serviceBusClientBuilder.connectionString(String.format(CONNECTION_STRING, "test-namespace"));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@ public ServiceBusTemplate getServiceBusTemplate() {
DefaultServiceBusNamespaceProducerFactory factory = new DefaultServiceBusNamespaceProducerFactory(
this.namespaceProperties, getProducerPropertiesSupplier());

factory.addListener((name) -> {
factory.addListener((name, client) -> {
DefaultInstrumentation instrumentation = new DefaultInstrumentation(name, PRODUCER);
instrumentation.markUp();
instrumentationManager.addHealthInstrumentation(instrumentation.getId(), instrumentation);
Expand All @@ -243,7 +243,7 @@ private ServiceBusProcessorContainer getProcessorContainer() {
DefaultServiceBusNamespaceProcessorFactory factory = new DefaultServiceBusNamespaceProcessorFactory(
this.namespaceProperties, getProcessorPropertiesSupplier());

factory.addListener((name, subscription) -> {
factory.addListener((name, subscription, client) -> {
String instrumentationName = name + "/" + subscription == null ? "" : subscription;
Instrumentation instrumentation = new ServiceBusProcessorInstrumentation(instrumentationName, CONSUMER, Duration.ofMinutes(2));
instrumentation.markUp();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ public void setInstrumentationId(String instrumentationId) {
}
private class IntegrationRecordMessageProcessingListener implements RecordMessageProcessingListener {

private ServiceBusMessageConverter messageConverter;
private ServiceBusMessageConverter messageConverter = new ServiceBusMessageConverter();
private Class<?> payloadType = byte[].class;
private InstrumentationManager instrumentationManager;
private String instrumentationId;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,33 +75,34 @@ public EventProcessorClient createProcessor(@NonNull String eventHub, @NonNull S

@Override
public void destroy() {
this.processorClientMap.values().forEach(EventProcessorClient::stop);
this.processorClientMap.forEach((t, client) -> {
listeners.forEach(l -> l.processorRemoved(t.getT1(), t.getT2(), client));
client.stop();
});
this.processorClientMap.clear();
this.listeners.clear();
}

private EventProcessorClient doCreateProcessor(@NonNull String eventHub, @NonNull String consumerGroup,
@NonNull EventProcessingListener listener,
@Nullable ProcessorProperties properties) {
Tuple2<String, String> key = Tuples.of(eventHub, consumerGroup);
if (this.processorClientMap.containsKey(key)) {
return this.processorClientMap.get(key);
}
return processorClientMap.computeIfAbsent(key, k -> {

ProcessorProperties processorProperties = propertiesMerger.mergeParent(properties, this.namespaceProperties);
processorProperties.setEventHubName(eventHub);
processorProperties.setConsumerGroup(consumerGroup);
ProcessorProperties processorProperties = propertiesMerger.mergeParent(properties, this.namespaceProperties);
processorProperties.setEventHubName(k.getT1());
processorProperties.setConsumerGroup(k.getT2());

EventProcessorClientBuilderFactory factory =
new EventProcessorClientBuilderFactory(processorProperties, this.checkpointStore, listener);
factory.setSpringIdentifier(AzureSpringIdentifier.AZURE_SPRING_INTEGRATION_EVENT_HUBS);
EventProcessorClient client = factory.build().buildEventProcessorClient();
LOGGER.info("EventProcessor created for event hub '{}' with consumer group '{}'", eventHub, consumerGroup);
EventProcessorClientBuilderFactory factory =
new EventProcessorClientBuilderFactory(processorProperties, this.checkpointStore, listener);
factory.setSpringIdentifier(AzureSpringIdentifier.AZURE_SPRING_INTEGRATION_EVENT_HUBS);
EventProcessorClient client = factory.build().buildEventProcessorClient();
LOGGER.info("EventProcessor created for event hub '{}' with consumer group '{}'", k.getT1(), k.getT2());

this.listeners.forEach(l -> l.processorAdded(eventHub, consumerGroup));
this.listeners.forEach(l -> l.processorAdded(k.getT1(), k.getT2(), client));

this.processorClientMap.put(key, client);

return client;
return client;
});
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,11 @@ default boolean removeListener(Listener listener) {
*/
interface Listener {

default void processorAdded(String eventHub, String consumerGroup) {
default void processorAdded(String eventHub, String consumerGroup, EventProcessorClient client) {

}

default void processorRemoved(String eventHub, String consumerGroup) {
default void processorRemoved(String eventHub, String consumerGroup, EventProcessorClient client) {
}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,20 +47,16 @@ public EventHubProducerAsyncClient createProducer(String eventHub) {
}

private EventHubProducerAsyncClient doCreateProducer(String eventHub, @Nullable ProducerProperties properties) {
if (this.clients.containsKey(eventHub)) {
return this.clients.get(eventHub);
}
return clients.computeIfAbsent(eventHub, entityName -> {
ProducerProperties producerProperties = parentMerger.mergeParent(properties, this.namespaceProperties);
producerProperties.setEventHubName(entityName);
EventHubClientBuilderFactory factory = new EventHubClientBuilderFactory(producerProperties);
factory.setSpringIdentifier(AzureSpringIdentifier.AZURE_SPRING_INTEGRATION_EVENT_HUBS);
EventHubProducerAsyncClient producerClient = factory.build().buildAsyncProducerClient();
this.listeners.forEach(l -> l.producerAdded(entityName, producerClient));

ProducerProperties producerProperties = parentMerger.mergeParent(properties, this.namespaceProperties);
producerProperties.setEventHubName(eventHub);
EventHubClientBuilderFactory factory = new EventHubClientBuilderFactory(producerProperties);
factory.setSpringIdentifier(AzureSpringIdentifier.AZURE_SPRING_INTEGRATION_EVENT_HUBS);
EventHubProducerAsyncClient producerClient = factory.build().buildAsyncProducerClient();

this.listeners.forEach(l -> l.producerAdded(eventHub));

this.clients.put(eventHub, producerClient);
return producerClient;
return producerClient;
});
}

@Override
Expand All @@ -75,7 +71,11 @@ public boolean removeListener(Listener listener) {

@Override
public void destroy() {
this.clients.values().forEach(EventHubProducerAsyncClient::close);
this.clients.forEach((name, client) -> {
this.listeners.forEach(l -> l.producerRemoved(name, client));
client.close();
});
this.clients.clear();
this.listeners.clear();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,11 @@ default boolean removeListener(Listener listener) {
*/
interface Listener {

default void producerAdded(String name) {
default void producerAdded(String name, EventHubProducerAsyncClient client) {

}

default void producerRemoved(String name) {
default void producerRemoved(String name, EventHubProducerAsyncClient client) {
}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ void setUp() {
processorAddedTimes = 0;
this.processorFactory.addListener(new EventHubsProcessorFactory.Listener() {
@Override
public void processorAdded(String eventHub, String consumerGroup) {
public void processorAdded(String eventHub, String consumerGroup, EventProcessorClient client) {
processorAddedTimes++;
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ void setUp() {
producerAddedTimes = 0;
this.producerFactory.addListener(new EventHubsProducerFactory.Listener() {
@Override
public void producerAdded(String name) {
public void producerAdded(String name, EventHubProducerAsyncClient client) {
producerAddedTimes++;
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ public boolean unsubscribe(String queue) {
public ServiceBusProcessorClient subscribe(String topic, String subscription, MessageProcessingListener listener) {
ServiceBusProcessorClient processor = this.processorFactory.createProcessor(topic, subscription, listener);
processor.start();
this.listeners.forEach(l -> l.processorAdded(topic, subscription));
this.listeners.forEach(l -> l.processorAdded(topic, subscription, processor));
this.clients.add(processor);
return processor;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import com.azure.messaging.servicebus.ServiceBusSenderAsyncClient;
import com.azure.spring.messaging.PartitionSupplier;
import com.azure.spring.messaging.core.SendOperation;
import com.azure.spring.service.servicebus.properties.ServiceBusEntityType;
import com.azure.spring.servicebus.core.producer.ServiceBusProducerFactory;
import com.azure.spring.servicebus.support.converter.ServiceBusMessageConverter;
import org.springframework.lang.NonNull;
Expand All @@ -25,6 +26,7 @@ public class ServiceBusTemplate implements SendOperation {
private static final ServiceBusMessageConverter DEFAULT_CONVERTER = new ServiceBusMessageConverter();
private final ServiceBusProducerFactory producerFactory;
private ServiceBusMessageConverter messageConverter = DEFAULT_CONVERTER;
private ServiceBusEntityType defaultEntityType;

public ServiceBusTemplate(@NonNull ServiceBusProducerFactory producerFactory) {
this.producerFactory = producerFactory;
Expand All @@ -35,7 +37,7 @@ public <U> Mono<Void> sendAsync(String destination,
Message<U> message,
PartitionSupplier partitionSupplier) {
Assert.hasText(destination, "destination can't be null or empty");
ServiceBusSenderAsyncClient senderAsyncClient = this.producerFactory.createProducer(destination);
ServiceBusSenderAsyncClient senderAsyncClient = this.producerFactory.createProducer(destination, defaultEntityType);
ServiceBusMessage serviceBusMessage = messageConverter.fromMessage(message, ServiceBusMessage.class);

if (Objects.nonNull(serviceBusMessage) && !StringUtils.hasText(serviceBusMessage.getPartitionKey())) {
Expand Down Expand Up @@ -68,4 +70,8 @@ private String getPartitionKey(PartitionSupplier partitionSupplier) {

return "";
}

public void setDefaultEntityType(ServiceBusEntityType entityType) {
defaultEntityType = entityType;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,10 @@ public DefaultServiceBusNamespaceProcessorFactory(NamespaceProperties namespaceP
this.propertiesSupplier = supplier == null ? key -> null : supplier;
}

private <K, V> void close(Map<Tuple2<String, String>, V> map, Consumer<V> close) {
private void close(Map<Tuple2<String, String>, ServiceBusProcessorClient> map, Consumer<ServiceBusProcessorClient> close) {
map.forEach((t, p) -> {
try {
listeners.forEach(l -> l.processorRemoved(t.getT1(), t.getT2()));
listeners.forEach(l -> l.processorRemoved(t.getT1(), t.getT2(), p));
close.accept(p);
} catch (Exception ex) {
LOGGER.warn("Failed to clean service bus queue client factory", ex);
Expand Down Expand Up @@ -92,12 +92,12 @@ private ServiceBusProcessorClient doCreateProcessor(String name, String subscrip

return processorMap.computeIfAbsent(key, k -> {
ProcessorProperties processorProperties = propertiesMerger.mergeParent(properties, this.namespaceProperties);
processorProperties.setEntityName(name);
if (INVALID_SUBSCRIPTION.equals(subscription)) {
processorProperties.setEntityName(k.getT1());
if (INVALID_SUBSCRIPTION.equals(k.getT2())) {
processorProperties.setEntityType(ServiceBusEntityType.QUEUE);
} else {
processorProperties.setEntityType(ServiceBusEntityType.TOPIC);
processorProperties.setSubscriptionName(subscription);
processorProperties.setSubscriptionName(k.getT2());
}

ServiceBusProcessorClient client;
Expand All @@ -114,8 +114,8 @@ private ServiceBusProcessorClient doCreateProcessor(String name, String subscrip
client = factory.build().buildProcessorClient();
}

this.listeners.forEach(l -> l.processorAdded(name, INVALID_SUBSCRIPTION.equals(subscription) ? null
: subscription));
this.listeners.forEach(l -> l.processorAdded(k.getT1(), INVALID_SUBSCRIPTION.equals(k.getT2()) ? null
: k.getT2(), client));
return client;
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,9 @@ default boolean removeListener(Listener listener) {
@FunctionalInterface
interface Listener {

void processorAdded(String name, String subscription);
void processorAdded(String name, String subscription, ServiceBusProcessorClient client);

default void processorRemoved(String name, String subscription) {
default void processorRemoved(String name, String subscription, ServiceBusProcessorClient client) {
}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import com.azure.spring.core.AzureSpringIdentifier;
import com.azure.spring.messaging.PropertiesSupplier;
import com.azure.spring.service.servicebus.factory.ServiceBusSenderClientBuilderFactory;
import com.azure.spring.service.servicebus.properties.ServiceBusEntityType;
import com.azure.spring.servicebus.core.properties.NamespaceProperties;
import com.azure.spring.servicebus.core.properties.ProducerProperties;
import com.azure.spring.servicebus.core.properties.merger.ProducerPropertiesParentMerger;
Expand Down Expand Up @@ -38,11 +39,20 @@ public DefaultServiceBusNamespaceProducerFactory(NamespaceProperties namespacePr
public DefaultServiceBusNamespaceProducerFactory(NamespaceProperties namespaceProperties,
PropertiesSupplier<String, ProducerProperties> supplier) {
this.namespaceProperties = namespaceProperties;
this.propertiesSupplier = supplier;
this.propertiesSupplier = supplier == null ? key -> null : supplier;
}

public ServiceBusSenderAsyncClient createProducer(String name) {
return doCreateProducer(name, this.propertiesSupplier.getProperties(name));
return createProducer(name, null);
}

public ServiceBusSenderAsyncClient createProducer(String name, ServiceBusEntityType entityType) {
ProducerProperties producerProperties = this.propertiesSupplier.getProperties(name) != null
? this.propertiesSupplier.getProperties(name) : new ProducerProperties();
if (entityType != null) {
producerProperties.setEntityType(entityType);
}
return doCreateProducer(name, producerProperties);
}

@Override
Expand All @@ -58,7 +68,7 @@ public boolean removeListener(Listener listener) {
@Override
public void destroy() {
clients.forEach((name, producer) -> {
listeners.forEach(l -> l.producerRemoved(name));
listeners.forEach(l -> l.producerRemoved(name, producer));
producer.close();
});
this.clients.clear();
Expand All @@ -68,14 +78,14 @@ public void destroy() {
private ServiceBusSenderAsyncClient doCreateProducer(String name, @Nullable ProducerProperties properties) {
return clients.computeIfAbsent(name, entityName -> {
ProducerProperties producerProperties = parentMerger.mergeParent(properties, this.namespaceProperties);

producerProperties.setEntityName(entityName);

//TODO(yiliu6): whether to make the producer client share the same service bus client builder
ServiceBusSenderClientBuilderFactory factory = new ServiceBusSenderClientBuilderFactory(producerProperties);
factory.setSpringIdentifier(AzureSpringIdentifier.AZURE_SPRING_INTEGRATION_SERVICE_BUS);
ServiceBusSenderAsyncClient producerClient = factory.build().buildAsyncClient();

this.listeners.forEach(l -> l.producerAdded(entityName));
this.listeners.forEach(l -> l.producerAdded(entityName, producerClient));
return producerClient;
});
}
Expand Down
Loading