Skip to content

Commit

Permalink
fix(kafka): kafka connector loop retry (#2792)
Browse files Browse the repository at this point in the history
* fix(kafka): retry the main consumer loop with 30 seconds backoff

* reduce test wait duration

* lint

* switch to failsafe

* set number of retries to -1 (unlimited)

* use constant for retry number

* try fixing the integration test

* try fixing the integration test

* try fixing the integration test

* try fixing the integration test

* try fixing the integration test

* formatting

(cherry picked from commit 807071e)
  • Loading branch information
chillleader committed Jul 25, 2024
1 parent 10179b9 commit 3fa923e
Show file tree
Hide file tree
Showing 5 changed files with 128 additions and 24 deletions.
6 changes: 6 additions & 0 deletions connectors/kafka/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -62,10 +62,16 @@ except in compliance with the proprietary license.</license.inlineheader>
<artifactId>commons-text</artifactId>
</dependency>

<dependency>
<groupId>dev.failsafe</groupId>
<artifactId>failsafe</artifactId>
</dependency>

<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>kafka</artifactId>
<version>${version.testcontainers}</version>
<scope>test</scope>
</dependency>

<dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,16 @@
import com.fasterxml.jackson.datatype.jdk8.Jdk8Module;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import com.fasterxml.jackson.module.scala.DefaultScalaModule$;
import dev.failsafe.Failsafe;
import dev.failsafe.RetryPolicy;
import dev.failsafe.function.CheckedSupplier;
import io.camunda.connector.api.error.ConnectorInputException;
import io.camunda.connector.api.inbound.Activity;
import io.camunda.connector.api.inbound.CorrelationFailureHandlingStrategy.ForwardErrorToUpstream;
import io.camunda.connector.api.inbound.CorrelationFailureHandlingStrategy.Ignore;
import io.camunda.connector.api.inbound.CorrelationResult;
import io.camunda.connector.api.inbound.CorrelationResult.Failure;
import io.camunda.connector.api.inbound.CorrelationResult.Success;
import io.camunda.connector.api.inbound.Health;
import io.camunda.connector.api.inbound.InboundConnectorContext;
import java.time.Duration;
Expand All @@ -36,12 +46,28 @@
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetOutOfRangeException;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class KafkaConnectorConsumer {
private static final Logger LOG = LoggerFactory.getLogger(KafkaConnectorConsumer.class);

private final InboundConnectorContext context;

private final ExecutorService executorService;

public CompletableFuture<?> future;

Consumer<Object, Object> consumer;

KafkaConnectorProperties elementProps;

private Health consumerStatus = Health.unknown();

private final RetryPolicy<Object> retryPolicy;

public static ObjectMapper objectMapper =
new ObjectMapper()
.registerModule(new Jdk8Module())
Expand All @@ -52,24 +78,20 @@ public class KafkaConnectorConsumer {
.disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES)
.enable(JsonParser.Feature.ALLOW_SINGLE_QUOTES)
.enable(JsonReadFeature.ALLOW_UNESCAPED_CONTROL_CHARS.mappedFeature());
private final InboundConnectorContext context;
private final ExecutorService executorService;
private final Function<Properties, Consumer<Object, Object>> consumerCreatorFunction;
public CompletableFuture<?> future;
Consumer<Object, Object> consumer;
KafkaConnectorProperties elementProps;
boolean shouldLoop = true;
private Health consumerStatus = Health.up();
private ObjectReader avroObjectReader;

/**
 * Creates the consumer wrapper: stores the given collaborators and allocates a dedicated
 * single-thread executor for the consumer loop.
 *
 * <p>NOTE(review): this is a diff view, so the previous three-argument parameter line and the new
 * four-argument parameter lines both appear below.
 *
 * <p>NOTE(review): shutdown of the executor created here is not visible in this view; confirm it
 * is shut down when the connector is deactivated.
 *
 * @param consumerCreatorFunction function stored for later use; presumably builds the Kafka
 *     consumer from its properties (the call site is not visible here)
 * @param connectorContext inbound connector context, stored as {@code context}
 * @param elementProps connector properties bound for this element
 * @param retryPolicy Failsafe retry policy, stored for use when the consumer loop is started
 */
public KafkaConnectorConsumer(
final Function<Properties, Consumer<Object, Object>> consumerCreatorFunction,
final InboundConnectorContext connectorContext,
final KafkaConnectorProperties elementProps) {
final KafkaConnectorProperties elementProps,
final RetryPolicy<Object> retryPolicy) {
this.consumerCreatorFunction = consumerCreatorFunction;
this.context = connectorContext;
this.elementProps = elementProps;
this.executorService = Executors.newSingleThreadExecutor();
this.retryPolicy = retryPolicy;
}

public void startConsumer() {
Expand All @@ -80,13 +102,28 @@ public void startConsumer() {
AvroMapper avroMapper = new AvroMapper();
avroObjectReader = avroMapper.reader(avroSchema);
}
this.future =
CompletableFuture.runAsync(
() -> {
prepareConsumer();
consume();
},
this.executorService);

// One attempt of the consumer loop: set up the consumer, then consume until the loop ends.
// A failure is logged and rethrown so that the caller executing this supplier can react to it.
CheckedSupplier<Void> retryableFutureSupplier =
    () -> {
      try {
        prepareConsumer();
        consume();
        return null;
      } catch (Exception ex) {
        // Pass the exception as the trailing argument: SLF4J then logs the stack trace and the
        // cause chain in addition to the formatted message.
        LOG.error("Consumer loop failure, retry pending: {}", ex.getMessage(), ex);
        throw ex;
      }
    };

// Run the supplier on the connector's executor under the configured retry policy.
// NOTE(review): `future` is the stage returned by exceptionally(...), not the future returned
// by getAsync(...) — confirm that cancelling it stops the retries as intended.
future =
    Failsafe.with(retryPolicy)
        .with(executorService)
        .getAsync(retryableFutureSupplier)
        .exceptionally(
            (e) -> {
              // Reached once the Failsafe future completes exceptionally. Record the terminal
              // failure instead of dropping it, then stop the loop.
              LOG.error(
                  "Kafka consumer loop terminated after a failure; no further retry will be made",
                  e);
              shouldLoop = false;
              return null;
            });
}

private void prepareConsumer() {
Expand All @@ -111,9 +148,7 @@ public void consume() {
reportUp();
} catch (Exception ex) {
reportDown(ex);
if (ex instanceof OffsetOutOfRangeException) {
throw ex;
}
throw ex;
}
}
LOG.debug("Kafka inbound loop finished");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,15 @@
*/
package io.camunda.connector.kafka.inbound;

import dev.failsafe.RetryPolicy;
import io.camunda.connector.api.annotation.InboundConnector;
import io.camunda.connector.api.inbound.Health;
import io.camunda.connector.api.inbound.InboundConnectorContext;
import io.camunda.connector.api.inbound.InboundConnectorExecutable;
import io.camunda.connector.api.inbound.Severity;
import io.camunda.connector.generator.dsl.BpmnType;
import io.camunda.connector.generator.java.annotation.ElementTemplate;
import java.time.Duration;
import java.util.Properties;
import java.util.function.Function;
import org.apache.kafka.clients.consumer.Consumer;
Expand All @@ -23,13 +28,25 @@ public class KafkaExecutable implements InboundConnectorExecutable<InboundConnec
private final Function<Properties, Consumer<Object, Object>> consumerCreatorFunction;
public KafkaConnectorConsumer kafkaConnectorConsumer;

private final RetryPolicy<Object> retryPolicy;

/**
 * Creates the executable with an explicit consumer factory and retry policy.
 *
 * <p>NOTE(review): this is a diff view, so the previous single-parameter line and the new
 * two-parameter lines both appear below.
 *
 * @param consumerCreatorFunction function producing a Kafka {@code Consumer} from properties
 * @param retryConfig Failsafe retry policy, stored as {@code retryPolicy}
 */
public KafkaExecutable(
final Function<Properties, Consumer<Object, Object>> consumerCreatorFunction) {
final Function<Properties, Consumer<Object, Object>> consumerCreatorFunction,
final RetryPolicy<Object> retryConfig) {
this.consumerCreatorFunction = consumerCreatorFunction;
this.retryPolicy = retryConfig;
}

private static final int INFINITE_RETRIES = -1;

/**
 * Default constructor: uses {@code KafkaConsumer::new} as the consumer factory together with a
 * retry policy that handles any {@link Exception}, waits 30 seconds between attempts and uses
 * {@code INFINITE_RETRIES} (-1) as the maximum number of attempts.
 *
 * <p>NOTE(review): this is a diff view, so the previous one-argument {@code this(...)} call and
 * the new two-argument call both appear below.
 */
public KafkaExecutable() {
this(KafkaConsumer::new);
this(
KafkaConsumer::new,
RetryPolicy.builder()
.handle(Exception.class)
.withDelay(Duration.ofSeconds(30))
.withMaxAttempts(INFINITE_RETRIES)
.build());
}

@Override
Expand All @@ -38,7 +55,7 @@ public void activate(InboundConnectorContext connectorContext) {
KafkaConnectorProperties elementProps =
connectorContext.bindProperties(KafkaConnectorProperties.class);
this.kafkaConnectorConsumer =
new KafkaConnectorConsumer(consumerCreatorFunction, connectorContext, elementProps);
new KafkaConnectorConsumer(consumerCreatorFunction, connectorContext, elementProps, retryPolicy);
this.kafkaConnectorConsumer.startConsumer();
} catch (Exception ex) {
connectorContext.reportHealth(Health.down(ex));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,18 @@
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.testcontainers.shaded.org.awaitility.Awaitility.await;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import com.fasterxml.jackson.databind.node.ObjectNode;
import dev.failsafe.RetryPolicy;
import io.camunda.connector.kafka.model.KafkaAuthentication;
import io.camunda.connector.kafka.model.KafkaTopic;
import io.camunda.connector.test.inbound.InboundConnectorContextBuilder;
import io.camunda.connector.test.inbound.InboundConnectorDefinitionBuilder;
import io.camunda.connector.validation.impl.DefaultValidationProvider;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
Expand Down Expand Up @@ -59,8 +63,6 @@ public class KafkaExecutableTest {
private InboundConnectorContextBuilder.TestInboundConnectorContext originalContext;
private List<PartitionInfo> topicPartitions;
private KafkaConnectorProperties kafkaConnectorProperties;
@Mock private KafkaConsumer<Object, Object> mockConsumer;
private String topic;

private static Stream<Arguments> provideStringsForGetOffsets() {
return Stream.of(
Expand All @@ -72,8 +74,15 @@ private static Stream<Arguments> provideStringsForGetOffsets() {
Arguments.of("1,2,3,4,5", Arrays.asList(1L, 2L, 3L, 4L, 5L)),
Arguments.of(Arrays.asList(10L, 12L), Arrays.asList(10L, 12L)));
}
private KafkaTopic kafkaTopic;
private KafkaConsumer<Object, Object> mockConsumer;

private String topic;

private static final int MAX_ATTEMPTS = 3;

@BeforeEach
@SuppressWarnings("unchecked")
public void setUp() {
topic = "my-topic";
topicPartitions =
Expand All @@ -95,6 +104,7 @@ public void setUp() {
.validation(new DefaultValidationProvider())
.build();
originalContext = context;
mockConsumer = mock(KafkaConsumer.class);
}

@Test
Expand Down Expand Up @@ -146,6 +156,29 @@ void testActivateAndDeactivate() {
assertFalse(kafkaExecutable.kafkaConnectorConsumer.shouldLoop);
}

@Test
void testActivateAndDeactivate_consumerThrows() {
  // Given: group metadata is available, but every poll fails
  var metadata = mock(ConsumerGroupMetadata.class);
  when(metadata.groupId()).thenReturn("groupId");
  when(metadata.groupInstanceId()).thenReturn(Optional.of("groupInstanceId"));
  when(metadata.generationId()).thenReturn(1);
  when(mockConsumer.groupMetadata()).thenReturn(metadata);
  when(mockConsumer.poll(any())).thenThrow(new RuntimeException("Test exception"));
  KafkaExecutable executable = getConsumerMock();

  // When: the connector is activated and we wait for the consumer loop to stop
  executable.activate(context);
  await()
      .atMost(Duration.ofSeconds(5))
      .pollInterval(Duration.ofMillis(500))
      .untilAsserted(() -> assertFalse(executable.kafkaConnectorConsumer.shouldLoop));
  executable.deactivate();

  // Then: poll was invoked once per configured attempt
  verify(mockConsumer, times(MAX_ATTEMPTS)).poll(any(Duration.class));
}

@Test
void testGetKafkaProperties() {
// When
Expand Down Expand Up @@ -234,7 +267,13 @@ public void testConvertDoubleEscapedCharactersRecordToKafkaInboundMessage() {
}

/**
 * Builds a {@link KafkaExecutable} whose consumer factory always returns {@code mockConsumer}.
 * The retry policy used here handles any {@link Exception}, waits 50 ms between attempts and
 * allows at most {@code MAX_ATTEMPTS} attempts.
 *
 * <p>NOTE(review): this is a diff view, so the previous one-argument {@code return} and the new
 * two-argument {@code return} both appear below.
 */
public KafkaExecutable getConsumerMock() {
return new KafkaExecutable(properties -> mockConsumer);
return new KafkaExecutable(
properties -> mockConsumer,
RetryPolicy.builder()
.handle(Exception.class)
.withDelay(Duration.ofMillis(50))
.withMaxAttempts(MAX_ATTEMPTS)
.build());
}

@ParameterizedTest
Expand Down
7 changes: 7 additions & 0 deletions parent/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ limitations under the License.</license.inlineheader>
<version.jackson-datatype-jsr310>2.17.1</version.jackson-datatype-jsr310>
<version.hibernate-validator>8.0.1.Final</version.hibernate-validator>
<version.jsonassert>1.5.3</version.jsonassert>
<version.failsafe>3.3.2</version.failsafe>

<version.spring-boot>3.2.4</version.spring-boot>
<version.spring-cloud-gcp-starter-logging>5.1.0</version.spring-cloud-gcp-starter-logging>
Expand Down Expand Up @@ -466,6 +467,12 @@ limitations under the License.</license.inlineheader>
<version>${version.bouncycastle}</version>
</dependency>

<dependency>
<groupId>dev.failsafe</groupId>
<artifactId>failsafe</artifactId>
<version>${version.failsafe}</version>
</dependency>

<!-- test dependencies -->
<dependency>
<groupId>org.junit.jupiter</groupId>
Expand Down

0 comments on commit 3fa923e

Please sign in to comment.