Skip to content

Commit

Permalink
MINOR: update usage of deprecated APIs
Browse files Browse the repository at this point in the history
Remove deprecated usage of "schema.registry.basic.auth.user.info"
Replace deprecated state store usage
  • Loading branch information
saurabh-slacklife authored Apr 13, 2021
1 parent c9aada8 commit 9ecea97
Show file tree
Hide file tree
Showing 5 changed files with 27 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,9 @@
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.ValueTransformer;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.Stores;

Expand Down Expand Up @@ -216,7 +217,7 @@ public void close() {
}

// Processor that keeps the global store updated.
private static class GlobalStoreUpdater<K, V> implements Processor<K, V> {
private static class GlobalStoreUpdater<K, V, KIn, KOut> implements Processor<K, V, KIn, KOut> {

private final String storeName;

Expand All @@ -227,17 +228,18 @@ public GlobalStoreUpdater(final String storeName) {
private KeyValueStore<K, V> store;

@Override
public void init(final ProcessorContext processorContext) {
store = (KeyValueStore<K, V>) processorContext.getStateStore(storeName);
public void init(
final org.apache.kafka.streams.processor.api.ProcessorContext<KIn, KOut> processorContext) {
store = processorContext.getStateStore(storeName);
}

@Override
public void process(final K key, final V value) {
public void process(final Record<K, V> record) {
// We are only supposed to put operation the keep the store updated.
// We should not filter record or modify the key or value
// Doing so would break fault-tolerance.
// see https://issues.apache.org/jira/browse/KAFKA-7663
store.put(key, value);
store.put(record.key(), record.value());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,10 @@
*/
package io.confluent.examples.streams.interactivequeries;

import java.util.Collections;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyQueryMetadata;
import org.apache.kafka.streams.state.StreamsMetadata;

import javax.ws.rs.NotFoundException;
Expand Down Expand Up @@ -70,14 +72,13 @@ public <K> HostStoreInfo streamsMetadataForStoreAndKey(final String store,
final Serializer<K> serializer) {
// Get metadata for the instances of this Kafka Streams application hosting the store and
// potentially the value for key
final StreamsMetadata metadata = streams.metadataForKey(store, key, serializer);
final KeyQueryMetadata metadata = streams.queryMetadataForKey(store, key, serializer);
if (metadata == null) {
throw new NotFoundException();
}

return new HostStoreInfo(metadata.host(),
metadata.port(),
metadata.stateStoreNames());
return new HostStoreInfo(metadata.activeHost().host(),
metadata.activeHost().port(), Collections.singleton(store));
}

private List<HostStoreInfo> mapInstancesToHostStoreInfo(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StoreQueryParameters;
import org.apache.kafka.streams.state.HostInfo;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.QueryableStoreTypes;
Expand Down Expand Up @@ -89,7 +90,8 @@ public KeyValueBean byKey(@PathParam("storeName") final String storeName,
}

// Lookup the KeyValueStore with the provided storeName
final ReadOnlyKeyValueStore<String, Long> store = streams.store(storeName, QueryableStoreTypes.keyValueStore());
final ReadOnlyKeyValueStore<String, Long> store = streams.store(
StoreQueryParameters.fromNameAndType(storeName, QueryableStoreTypes.keyValueStore()));
if (store == null) {
throw new NotFoundException();
}
Expand Down Expand Up @@ -158,8 +160,8 @@ public List<KeyValueBean> windowedByKey(@PathParam("storeName") final String sto
@PathParam("to") final Long to) {

// Lookup the WindowStore with the provided storeName
final ReadOnlyWindowStore<String, Long> store = streams.store(storeName,
QueryableStoreTypes.windowStore());
final ReadOnlyWindowStore<String, Long> store = streams
.store(StoreQueryParameters.fromNameAndType(storeName, QueryableStoreTypes.windowStore()));
if (store == null) {
throw new NotFoundException();
}
Expand Down Expand Up @@ -227,7 +229,8 @@ private List<KeyValueBean> rangeForKeyValueStore(final String storeName,
KeyValueIterator<String, Long>> rangeFunction) {

// Get the KeyValue Store
final ReadOnlyKeyValueStore<String, Long> store = streams.store(storeName, QueryableStoreTypes.keyValueStore());
final ReadOnlyKeyValueStore<String, Long> store = streams.store(
StoreQueryParameters.fromNameAndType(storeName, QueryableStoreTypes.keyValueStore()));
final List<KeyValueBean> results = new ArrayList<>();
// Apply the function, i.e., query the store
final KeyValueIterator<String, Long> range = rangeFunction.apply(store);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StoreQueryParameters;
import org.apache.kafka.streams.state.HostInfo;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
Expand Down Expand Up @@ -130,7 +131,7 @@ private List<SongPlayCountBean> topFiveSongs(final String key,
final String storeName) {

final ReadOnlyKeyValueStore<String, KafkaMusicExample.TopFiveSongs> topFiveStore =
streams.store(storeName, QueryableStoreTypes.keyValueStore());
streams.store(StoreQueryParameters.fromNameAndType(storeName, QueryableStoreTypes.keyValueStore()));
// Get the value from the store
final KafkaMusicExample.TopFiveSongs value = topFiveStore.get(key);
if (value == null) {
Expand All @@ -157,8 +158,9 @@ private List<SongPlayCountBean> topFiveSongs(final String key,
songPlayCount.getPlays()));
} else {
// look in the local store
final ReadOnlyKeyValueStore<Long, Song> songStore = streams.store(KafkaMusicExample.ALL_SONGS,
QueryableStoreTypes.keyValueStore());
final ReadOnlyKeyValueStore<Long, Song> songStore = streams.store(StoreQueryParameters
.fromNameAndType(KafkaMusicExample.ALL_SONGS, QueryableStoreTypes.keyValueStore()));

final Song song = songStore.get(songPlayCount.getSongId());
results.add(new SongPlayCountBean(song.getArtist(), song.getAlbum(), song.getName(),
songPlayCount.getPlays()));
Expand All @@ -171,8 +173,8 @@ private List<SongPlayCountBean> topFiveSongs(final String key,
@Path("/song/{id}")
@Produces(MediaType.APPLICATION_JSON)
public SongBean song(@PathParam("id") final Long songId) {
final ReadOnlyKeyValueStore<Long, Song> songStore = streams.store(KafkaMusicExample.ALL_SONGS,
QueryableStoreTypes.keyValueStore());
final ReadOnlyKeyValueStore<Long, Song> songStore = streams.store(StoreQueryParameters
.fromNameAndType(KafkaMusicExample.ALL_SONGS, QueryableStoreTypes.keyValueStore()));
final Song song = songStore.get(songId);
if (song == null) {
throw new NotFoundException(String.format("Song with id [%d] was not found", songId));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,8 +93,6 @@ private static void createTopics() {
map.put(BASIC_AUTH_CREDENTIALS_SOURCE, config.getProperty(BASIC_AUTH_CREDENTIALS_SOURCE));
if (config.containsKey(USER_INFO_CONFIG))
map.put(USER_INFO_CONFIG, config.getProperty(USER_INFO_CONFIG));
else if (config.containsKey(SCHEMA_REGISTRY_USER_INFO_CONFIG))
map.put(USER_INFO_CONFIG, config.getProperty(SCHEMA_REGISTRY_USER_INFO_CONFIG));
return map;
}
public static void configureSerdes(final Properties config) {
Expand Down

0 comments on commit 9ecea97

Please sign in to comment.