Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Kafka connector loop retry #2792

Merged
merged 12 commits into from
Jul 17, 2024
6 changes: 6 additions & 0 deletions connectors/kafka/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -62,10 +62,16 @@ except in compliance with the proprietary license.</license.inlineheader>
<artifactId>commons-text</artifactId>
</dependency>

<dependency>
<groupId>dev.failsafe</groupId>
<artifactId>failsafe</artifactId>
</dependency>

<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>kafka</artifactId>
<version>${version.testcontainers}</version>
<scope>test</scope>
</dependency>

<dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@
import com.fasterxml.jackson.datatype.jdk8.Jdk8Module;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import com.fasterxml.jackson.module.scala.DefaultScalaModule$;
import dev.failsafe.Failsafe;
import dev.failsafe.RetryPolicy;
import dev.failsafe.function.CheckedSupplier;
import io.camunda.connector.api.error.ConnectorInputException;
import io.camunda.connector.api.inbound.Activity;
import io.camunda.connector.api.inbound.CorrelationFailureHandlingStrategy.ForwardErrorToUpstream;
Expand All @@ -45,7 +48,6 @@
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetOutOfRangeException;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
Expand All @@ -66,6 +68,8 @@ public class KafkaConnectorConsumer {

private Health consumerStatus = Health.unknown();

private final RetryPolicy<Object> retryPolicy;

public static ObjectMapper objectMapper =
new ObjectMapper()
.registerModule(new Jdk8Module())
Expand All @@ -86,11 +90,13 @@ public class KafkaConnectorConsumer {
public KafkaConnectorConsumer(
final Function<Properties, Consumer<Object, Object>> consumerCreatorFunction,
final InboundConnectorContext connectorContext,
final KafkaConnectorProperties elementProps) {
final KafkaConnectorProperties elementProps,
final RetryPolicy<Object> retryPolicy) {
this.consumerCreatorFunction = consumerCreatorFunction;
this.context = connectorContext;
this.elementProps = elementProps;
this.executorService = Executors.newSingleThreadExecutor();
this.retryPolicy = retryPolicy;
}

public void startConsumer() {
Expand All @@ -100,13 +106,28 @@ public void startConsumer() {
AvroMapper avroMapper = new AvroMapper();
avroObjectReader = avroMapper.reader(avroSchema);
}
this.future =
CompletableFuture.runAsync(
() -> {
prepareConsumer();
consume();
},
this.executorService);

CheckedSupplier<Void> retryableFutureSupplier =
() -> {
try {
prepareConsumer();
consume();
return null;
} catch (Exception ex) {
LOG.error("Consumer loop failure, retry pending: {}", ex.getMessage());
throw ex;
}
};

future =
Failsafe.with(retryPolicy)
.with(executorService)
.getAsync(retryableFutureSupplier)
.exceptionally(
(e) -> {
shouldLoop = false;
return null;
});
}

private void prepareConsumer() {
Expand Down Expand Up @@ -160,9 +181,7 @@ public void consume() {
reportUp();
} catch (Exception ex) {
reportDown(ex);
if (ex instanceof OffsetOutOfRangeException) {
throw ex;
}
throw ex;
}
}
LOG.debug("Kafka inbound loop finished");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
*/
package io.camunda.connector.kafka.inbound;

import dev.failsafe.RetryPolicy;
import io.camunda.connector.api.annotation.InboundConnector;
import io.camunda.connector.api.inbound.Activity;
import io.camunda.connector.api.inbound.Health;
Expand All @@ -14,6 +15,7 @@
import io.camunda.connector.api.inbound.Severity;
import io.camunda.connector.generator.dsl.BpmnType;
import io.camunda.connector.generator.java.annotation.ElementTemplate;
import java.time.Duration;
import java.util.Properties;
import java.util.function.Function;
import org.apache.kafka.clients.consumer.Consumer;
Expand Down Expand Up @@ -63,13 +65,25 @@ public class KafkaExecutable implements InboundConnectorExecutable<InboundConnec
private final Function<Properties, Consumer<Object, Object>> consumerCreatorFunction;
public KafkaConnectorConsumer kafkaConnectorConsumer;

private final RetryPolicy<Object> retryPolicy;

public KafkaExecutable(
final Function<Properties, Consumer<Object, Object>> consumerCreatorFunction) {
final Function<Properties, Consumer<Object, Object>> consumerCreatorFunction,
final RetryPolicy<Object> retryConfig) {
this.consumerCreatorFunction = consumerCreatorFunction;
this.retryPolicy = retryConfig;
}

private static final int INFINITE_RETRIES = -1;

public KafkaExecutable() {
this(KafkaConsumer::new);
this(
KafkaConsumer::new,
RetryPolicy.builder()
.handle(Exception.class)
.withDelay(Duration.ofSeconds(30))
.withMaxAttempts(INFINITE_RETRIES)
.build());
}

@Override
Expand All @@ -84,7 +98,7 @@ public void activate(InboundConnectorContext context) {
KafkaConnectorProperties elementProps =
context.bindProperties(KafkaConnectorProperties.class);
this.kafkaConnectorConsumer =
new KafkaConnectorConsumer(consumerCreatorFunction, context, elementProps);
new KafkaConnectorConsumer(consumerCreatorFunction, context, elementProps, retryPolicy);
this.kafkaConnectorConsumer.startConsumer();
context.log(
Activity.level(Severity.INFO)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,19 @@
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.testcontainers.shaded.org.awaitility.Awaitility.await;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import com.fasterxml.jackson.databind.node.ObjectNode;
import dev.failsafe.RetryPolicy;
import io.camunda.connector.kafka.model.KafkaAuthentication;
import io.camunda.connector.kafka.model.KafkaTopic;
import io.camunda.connector.kafka.model.SerializationType;
import io.camunda.connector.test.inbound.InboundConnectorContextBuilder;
import io.camunda.connector.test.inbound.InboundConnectorDefinitionBuilder;
import io.camunda.connector.validation.impl.DefaultValidationProvider;
import java.time.Duration;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
Expand All @@ -49,7 +52,6 @@
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;

@ExtendWith(MockitoExtension.class)
Expand All @@ -60,13 +62,14 @@ public class KafkaExecutableTest {
private List<PartitionInfo> topicPartitions;
private KafkaConnectorProperties kafkaConnectorProperties;
private KafkaTopic kafkaTopic;
@Mock private KafkaConsumer<Object, Object> mockConsumer;
private KafkaConsumer<Object, Object> mockConsumer;

private String topic;

private final String processId = "Process_id";
private static final int MAX_ATTEMPTS = 3;

@BeforeEach
@SuppressWarnings("unchecked")
public void setUp() {
topic = "my-topic";
topicPartitions =
Expand Down Expand Up @@ -95,6 +98,7 @@ public void setUp() {
.validation(new DefaultValidationProvider())
.build();
originalContext = context;
mockConsumer = mock(KafkaConsumer.class);
}

@Test
Expand Down Expand Up @@ -148,6 +152,31 @@ void testActivateAndDeactivate() {
assertFalse(kafkaExecutable.kafkaConnectorConsumer.shouldLoop);
}

/**
 * Verifies the retry-then-give-up behavior of the consumer loop: when poll() always
 * throws, the Failsafe retry policy wired in via getConsumerMock() (MAX_ATTEMPTS
 * attempts with a short delay) re-runs the loop, and once attempts are exhausted the
 * async failure handler clears the loop flag (shouldLoop becomes false).
 */
@Test
void testActivateAndDeactivate_consumerThrows() {
// Given: partition lookup, assignment, and group metadata all succeed,
// so the only failing call is poll() itself.
when(mockConsumer.partitionsFor(topic)).thenReturn(topicPartitions);
doNothing().when(mockConsumer).assign(any());
KafkaExecutable kafkaExecutable = getConsumerMock();
var groupMetadataMock = mock(ConsumerGroupMetadata.class);
when(groupMetadataMock.groupId()).thenReturn("groupId");
when(groupMetadataMock.groupInstanceId()).thenReturn(Optional.of("groupInstanceId"));
when(groupMetadataMock.generationId()).thenReturn(1);
when(mockConsumer.groupMetadata()).thenReturn(groupMetadataMock);

// When: every poll fails, so every retry attempt throws.
when(mockConsumer.poll(any())).thenThrow(new RuntimeException("Test exception"));
kafkaExecutable.activate(context);
// The retries run on a background executor, so wait (up to 5 s) until the
// exhausted retry policy's failure handler flips shouldLoop to false.
await()
.atMost(Duration.ofSeconds(5))
.pollInterval(Duration.ofMillis(500))
.untilAsserted(() -> assertFalse(kafkaExecutable.kafkaConnectorConsumer.shouldLoop));
kafkaExecutable.deactivate();

// Then: poll was invoked exactly once per configured retry attempt.
verify(mockConsumer, times(MAX_ATTEMPTS)).poll(any(Duration.class));
}

@Test
void testGetKafkaProperties() {
// When
Expand Down Expand Up @@ -242,7 +271,13 @@ public void testConvertDoubleEscapedCharactersRecordToKafkaInboundMessage() {
}

public KafkaExecutable getConsumerMock() {
return new KafkaExecutable(properties -> mockConsumer);
return new KafkaExecutable(
properties -> mockConsumer,
RetryPolicy.builder()
.handle(Exception.class)
.withDelay(Duration.ofMillis(50))
.withMaxAttempts(MAX_ATTEMPTS)
.build());
}

@ParameterizedTest
Expand Down
7 changes: 7 additions & 0 deletions parent/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ limitations under the License.</license.inlineheader>
<version.jackson-datatype-jsr310>2.17.1</version.jackson-datatype-jsr310>
<version.hibernate-validator>8.0.1.Final</version.hibernate-validator>
<version.jsonassert>1.5.1</version.jsonassert>
<version.failsafe>3.3.2</version.failsafe>

<version.spring-boot>3.3.0</version.spring-boot>
<version.spring-cloud-gcp-starter-logging>5.4.1</version.spring-cloud-gcp-starter-logging>
Expand Down Expand Up @@ -490,6 +491,12 @@ limitations under the License.</license.inlineheader>
<version>${version.bouncycastle}</version>
</dependency>

<dependency>
<groupId>dev.failsafe</groupId>
<artifactId>failsafe</artifactId>
<version>${version.failsafe}</version>
</dependency>

<!-- test dependencies -->
<dependency>
<groupId>org.junit.jupiter</groupId>
Expand Down