Skip to content

Commit

Permalink
Merge pull request Azure#4 from moarychan/moary/fix-coverage-not-met-…
Browse files Browse the repository at this point in the history
…error

Fix pipeline failures and comments
  • Loading branch information
rujche authored Jan 10, 2022
2 parents 78066e6 + 0720ea5 commit 47b8259
Show file tree
Hide file tree
Showing 7 changed files with 37 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
import com.azure.spring.eventhubs.core.properties.ProcessorProperties;
import com.azure.spring.eventhubs.core.properties.ProducerProperties;
import com.azure.spring.integration.eventhubs.inbound.EventHubsInboundChannelAdapter;
import com.azure.spring.integration.eventhubs.inbound.health.EventHusProcessorInstrumentation;
import com.azure.spring.integration.eventhubs.inbound.health.EventHubsProcessorInstrumentation;
import com.azure.spring.integration.handler.DefaultMessageHandler;
import com.azure.spring.integration.instrumentation.DefaultInstrumentation;
import com.azure.spring.integration.instrumentation.DefaultInstrumentationManager;
Expand Down Expand Up @@ -222,7 +222,7 @@ private EventHubsProcessorContainer getProcessorContainer() {
this.checkpointStore, this.namespaceProperties, getProcessorPropertiesSupplier());
factory.addListener((name, consumerGroup, processorClient) -> {
String instrumentationName = name + "/" + consumerGroup;
Instrumentation instrumentation = new EventHusProcessorInstrumentation(instrumentationName, CONSUMER, Duration.ofMinutes(2));
Instrumentation instrumentation = new EventHubsProcessorInstrumentation(instrumentationName, CONSUMER, Duration.ofMinutes(2));
instrumentation.markUp();
instrumentationManager.addHealthInstrumentation(instrumentation.getId(), instrumentation);
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
import com.azure.spring.eventhubs.core.EventHubsProcessorContainer;
import com.azure.spring.eventhubs.core.EventHubsTemplate;
import com.azure.spring.eventhubs.core.producer.DefaultEventHubsNamespaceProducerFactory;
import com.azure.spring.integration.eventhubs.inbound.health.EventHusProcessorInstrumentation;
import com.azure.spring.integration.eventhubs.inbound.health.EventHubsProcessorInstrumentation;
import com.azure.spring.integration.instrumentation.Instrumentation;
import com.azure.spring.integration.instrumentation.InstrumentationManager;
import com.azure.spring.messaging.checkpoint.CheckpointConfig;
Expand Down Expand Up @@ -247,8 +247,8 @@ default void updateInstrumentation(ErrorContext errorContext,
String instrumentationId) {
Instrumentation instrumentation = instrumentationManager.getHealthInstrumentation(instrumentationId);
if (instrumentation != null) {
if (instrumentation instanceof EventHusProcessorInstrumentation) {
((EventHusProcessorInstrumentation) instrumentation).markError(errorContext);
if (instrumentation instanceof EventHubsProcessorInstrumentation) {
((EventHubsProcessorInstrumentation) instrumentation).markError(errorContext);
} else {
instrumentation.markDown(errorContext.getThrowable());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
import com.azure.spring.eventhubs.support.EventHubsHeaders;
import com.azure.spring.eventhubs.support.converter.EventHubBatchMessageConverter;
import com.azure.spring.eventhubs.support.converter.EventHubsMessageConverter;
import com.azure.spring.integration.eventhubs.inbound.health.EventHusProcessorInstrumentation;
import com.azure.spring.integration.eventhubs.inbound.health.EventHubsProcessorInstrumentation;
import com.azure.spring.integration.instrumentation.Instrumentation;
import com.azure.spring.integration.instrumentation.InstrumentationManager;
import com.azure.spring.messaging.AzureHeaders;
Expand Down Expand Up @@ -223,8 +223,8 @@ default void updateInstrumentation(ErrorContext errorContext,

Instrumentation instrumentation = instrumentationManager.getHealthInstrumentation(instrumentationId);
if (instrumentation != null) {
if (instrumentation instanceof EventHusProcessorInstrumentation) {
((EventHusProcessorInstrumentation) instrumentation).markError(errorContext);
if (instrumentation instanceof EventHubsProcessorInstrumentation) {
((EventHubsProcessorInstrumentation) instrumentation).markError(errorContext);
} else {
instrumentation.markDown(errorContext.getThrowable());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,18 +9,18 @@
import java.time.Duration;

/**
* EventHus health details entity class.
* EventHubs health details entity class.
*/
public class EventHusProcessorInstrumentation extends AbstractProcessorInstrumentation<ErrorContext> {
public class EventHubsProcessorInstrumentation extends AbstractProcessorInstrumentation<ErrorContext> {

/**
* Construct a {@link EventHusProcessorInstrumentation} with the specified name, {@link Type} and the period of a none error window.
* Construct a {@link EventHubsProcessorInstrumentation} with the specified name, {@link Type} and the period of a none error window.
*
* @param name the name
* @param type the type
* @param noneErrorWindow the period of a none error window
*/
public EventHusProcessorInstrumentation(String name, Type type, Duration noneErrorWindow) {
public EventHubsProcessorInstrumentation(String name, Type type, Duration noneErrorWindow) {
super(name, type, noneErrorWindow);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import com.azure.spring.integration.instrumentation.DefaultInstrumentationManager;
import com.azure.spring.messaging.checkpoint.CheckpointConfig;
import com.azure.spring.messaging.converter.AbstractAzureMessageConverter;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
Expand Down Expand Up @@ -37,13 +38,10 @@ class EventHubsInboundChannelAdapterTests {
private final List<Message<?>> messages = Arrays.stream(payloads)
.map(p -> MessageBuilder.withPayload(p).build())
.collect(Collectors.toList());
private EventHubsProcessorContainer processorsContainer;

@BeforeEach
void setUp() {
processorsContainer = mock(EventHubsProcessorContainer.class);
this.adapter = new TestEventHubsInboundChannelAdapter(processorsContainer, this.eventHub, this.consumerGroup,
new CheckpointConfig());
this.adapter = new TestEventHubsInboundChannelAdapter(mock(EventHubsProcessorContainer.class),
this.eventHub, this.consumerGroup, new CheckpointConfig());
}

@Test
Expand Down Expand Up @@ -132,6 +130,11 @@ public void sendMessage(Message<?> messageArg) {
static class TestAzureMessageConverter extends AbstractAzureMessageConverter<EventData, EventData> {


@Override
protected ObjectMapper getObjectMapper() {
return null;
}

@Override
protected Object getPayload(EventData azureMessage) {
return azureMessage.getBody();
Expand All @@ -151,6 +154,11 @@ protected EventData fromByte(byte[] payload) {
static class TestBatchAzureMessageConverter extends AbstractAzureMessageConverter<EventBatchContext, EventData> {


@Override
protected ObjectMapper getObjectMapper() {
return null;
}

@Override
protected Object getPayload(EventBatchContext azureMessage) {
return azureMessage.getEvents().stream().map(EventData::getBody).collect(Collectors.toList());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,6 @@ public ErrorContext getErrorContext(RuntimeException exception) {
@Override
public AbstractProcessorInstrumentation<ErrorContext> getProcessorInstrumentation(Instrumentation.Type type,
Duration window) {
return new EventHusProcessorInstrumentation("test", type, window);
return new EventHubsProcessorInstrumentation("test", type, window);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import com.azure.spring.messaging.checkpoint.CheckpointMode;
import com.azure.spring.messaging.converter.AbstractAzureMessageConverter;
import com.azure.spring.servicebus.core.ServiceBusProcessorContainer;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
Expand Down Expand Up @@ -47,21 +48,19 @@ class ServiceBusInboundChannelAdapterTests {
.collect(Collectors.toList());

private AutoCloseable closeable;
private ServiceBusProcessorContainer processorsContainer;

@BeforeEach
public void setUp() {
processorsContainer = mock(ServiceBusProcessorContainer.class);
this.closeable = MockitoAnnotations.openMocks(this);
this.adapter = new TestServiceBusInboundChannelAdapter(processorsContainer, destination, subscription,
new CheckpointConfig(CheckpointMode.RECORD));
this.adapter = new TestServiceBusInboundChannelAdapter(mock(ServiceBusProcessorContainer.class),
destination, subscription, new CheckpointConfig(CheckpointMode.RECORD));
}

@Test
void subscriptionCannotEmptyWhenEntityTypeIsTopic() {
void destinationCannotEmptyWhenEntityTypeIsTopic() {
assertThrows(IllegalArgumentException.class,
() -> new TestServiceBusInboundChannelAdapter(processorsContainer, destination, null,
new CheckpointConfig(CheckpointMode.RECORD)));
() -> new TestServiceBusInboundChannelAdapter(mock(ServiceBusProcessorContainer.class),
null, null, new CheckpointConfig(CheckpointMode.RECORD)));
}

@Test
Expand Down Expand Up @@ -153,6 +152,11 @@ public void sendMessage(Message<?> messageArg) {

static class TestServiceBusMessageConverter extends AbstractAzureMessageConverter<ServiceBusReceivedMessage, ServiceBusMessage> {

@Override
protected ObjectMapper getObjectMapper() {
return null;
}

@Override
protected Object getPayload(ServiceBusReceivedMessage azureMessage) {
final BinaryData body = azureMessage.getBody();
Expand Down

0 comments on commit 47b8259

Please sign in to comment.