Skip to content
This repository has been archived by the owner on Nov 20, 2024. It is now read-only.

Commit

Permalink
Retries for HostInfo in InteractiveQueryService
Browse files Browse the repository at this point in the history
InteractiveQueryService methods for finding the host info for Kafka Streams
currently throw exceptions if the underlying KafkaStreams are not ready yet.
Introduce a retry mechanism so that the users can control the behaviour of
these methods by providing the following properties.

spring.cloud.stream.kafka.streams.binder.stateStoreRetry.maxAttempts (default 1)
spring.cloud.stream.kafka.streams.binder.stateStoreRetry.backoffPeriod (default 1000 ms).

Resolves #1185
  • Loading branch information
sobychacko committed Jan 11, 2022
1 parent 406e20f commit 3770db7
Show file tree
Hide file tree
Showing 3 changed files with 73 additions and 19 deletions.
6 changes: 5 additions & 1 deletion docs/src/main/asciidoc/kafka-streams.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -1261,7 +1261,7 @@ ReadOnlyKeyValueStore<Object, Object> keyValueStore =
----

During the startup, the above method call to retrieve the store might fail.
For e.g it might still be in the middle of initializing the state store.
For example, it might still be in the middle of initializing the state store.
In such cases, it will be useful to retry this operation.
Kafka Streams binder provides a simple retry mechanism to accommodate this.

Expand Down Expand Up @@ -1296,6 +1296,10 @@ else {
}
----

For more information on these host finding methods, please see the Javadoc on the methods.
For these methods also, during startup, if the underlying KafkaStreams objects are not ready, they might throw exceptions.
The aforementioned retry properties are applicable for these methods as well.

==== Other API methods available through the InteractiveQueryService

Use the following API method to retrieve the `KeyQueryMetadata` object associated with the combination of given store and key.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2018-2021 the original author or authors.
* Copyright 2018-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -84,15 +84,7 @@ public InteractiveQueryService(KafkaStreamsRegistry kafkaStreamsRegistry,
*/
public <T> T getQueryableStore(String storeName, QueryableStoreType<T> storeType) {

RetryTemplate retryTemplate = new RetryTemplate();

KafkaStreamsBinderConfigurationProperties.StateStoreRetry stateStoreRetry = this.binderConfigurationProperties.getStateStoreRetry();
RetryPolicy retryPolicy = new SimpleRetryPolicy(stateStoreRetry.getMaxAttempts());
FixedBackOffPolicy backOffPolicy = new FixedBackOffPolicy();
backOffPolicy.setBackOffPeriod(stateStoreRetry.getBackoffPeriod());

retryTemplate.setBackOffPolicy(backOffPolicy);
retryTemplate.setRetryPolicy(retryPolicy);
final RetryTemplate retryTemplate = getRetryTemplate();

KafkaStreams contextSpecificKafkaStreams = getThreadContextSpecificKafkaStreams();

Expand Down Expand Up @@ -191,7 +183,7 @@ public HostInfo getCurrentHostInfo() {
* through all the consumer instances under the same application id and retrieves the
* proper host.
*
* Note that the end user applications must provide `applicaiton.server` as a
* Note that the end user applications must provide `application.server` as a
* configuration property for all the application instances when calling this method.
* If this is not available, then null maybe returned.
* @param <K> generic type for key
Expand All @@ -201,11 +193,40 @@ public HostInfo getCurrentHostInfo() {
* @return the {@link HostInfo} where the key for the provided store is hosted currently
*/
public <K> HostInfo getHostInfo(String store, K key, Serializer<K> serializer) {
final KeyQueryMetadata keyQueryMetadata = this.kafkaStreamsRegistry.getKafkaStreams()
.stream()
.map((k) -> Optional.ofNullable(k.queryMetadataForKey(store, key, serializer)))
.filter(Optional::isPresent).map(Optional::get).findFirst().orElse(null);
return keyQueryMetadata != null ? keyQueryMetadata.getActiveHost() : null;
final RetryTemplate retryTemplate = getRetryTemplate();


return retryTemplate.execute(context -> {
Throwable throwable = null;
try {
final KeyQueryMetadata keyQueryMetadata = this.kafkaStreamsRegistry.getKafkaStreams()
.stream()
.map((k) -> Optional.ofNullable(k.queryMetadataForKey(store, key, serializer)))
.filter(Optional::isPresent).map(Optional::get).findFirst().orElse(null);
if (keyQueryMetadata != null) {
return keyQueryMetadata.activeHost();
}
}
catch (Exception e) {
throwable = e;
}
throw new IllegalStateException(
"Error when retrieving state store", throwable != null ? throwable : new Throwable("Kafka Streams is not ready."));
});
}

private RetryTemplate getRetryTemplate() {
RetryTemplate retryTemplate = new RetryTemplate();

KafkaStreamsBinderConfigurationProperties.StateStoreRetry stateStoreRetry = this.binderConfigurationProperties.getStateStoreRetry();
RetryPolicy retryPolicy = new SimpleRetryPolicy(stateStoreRetry.getMaxAttempts());
FixedBackOffPolicy backOffPolicy = new FixedBackOffPolicy();
backOffPolicy.setBackOffPeriod(stateStoreRetry.getBackoffPeriod());

retryTemplate.setBackOffPolicy(backOffPolicy);
retryTemplate.setRetryPolicy(retryPolicy);

return retryTemplate;
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2017-2021 the original author or authors.
* Copyright 2017-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -25,6 +25,7 @@
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyQueryMetadata;
import org.apache.kafka.streams.KeyValue;
Expand Down Expand Up @@ -125,11 +126,39 @@ public void testStateStoreRetrievalRetry() {
catch (Exception ignored) {

}

Mockito.verify(mockKafkaStreams, times(3))
.store(StoreQueryParameters.fromNameAndType("foo", storeType));
}

@Test
public void testStateStoreRetrievalRetryForHostInfoService() {
StreamsBuilderFactoryBean mock = Mockito.mock(StreamsBuilderFactoryBean.class);
KafkaStreams mockKafkaStreams = Mockito.mock(KafkaStreams.class);
Mockito.when(mock.getKafkaStreams()).thenReturn(mockKafkaStreams);
KafkaStreamsRegistry kafkaStreamsRegistry = new KafkaStreamsRegistry();
kafkaStreamsRegistry.registerKafkaStreams(mock);
Mockito.when(mock.isRunning()).thenReturn(true);
Properties mockProperties = new Properties();
mockProperties.put(StreamsConfig.APPLICATION_ID_CONFIG, "foobarApp-123");
Mockito.when(mock.getStreamsConfiguration()).thenReturn(mockProperties);
KafkaStreamsBinderConfigurationProperties binderConfigurationProperties =
new KafkaStreamsBinderConfigurationProperties(new KafkaProperties());
binderConfigurationProperties.getStateStoreRetry().setMaxAttempts(3);
InteractiveQueryService interactiveQueryService = new InteractiveQueryService(kafkaStreamsRegistry,
binderConfigurationProperties);

QueryableStoreType<ReadOnlyKeyValueStore<Object, Object>> storeType = QueryableStoreTypes.keyValueStore();
final StringSerializer serializer = new StringSerializer();
try {
interactiveQueryService.getHostInfo("foo", "fooKey", serializer);
}
catch (Exception ignored) {

}
Mockito.verify(mockKafkaStreams, times(3))
.queryMetadataForKey("foo", "fooKey", serializer);
}

@Test
@Ignore
public void testKstreamBinderWithPojoInputAndStringOuput() throws Exception {
Expand Down

0 comments on commit 3770db7

Please sign in to comment.