Skip to content

Commit

Permalink
read from configuration
Browse files Browse the repository at this point in the history
Signed-off-by: Oleksii Moskalenko <moskalenko.alexey@gmail.com>
  • Loading branch information
pyalex committed Jan 8, 2021
1 parent c4df8c9 commit 29c3c33
Show file tree
Hide file tree
Showing 5 changed files with 38 additions and 15 deletions.
8 changes: 8 additions & 0 deletions protos/feast/core/Store.proto
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,14 @@ message Store {
// Optional. This would be the fallback prefix to use if enable_fallback is true.
string fallback_prefix = 7;

// Optional. Priority of nodes when reading from cluster
enum ReadFrom {
MASTER = 0;
MASTER_PREFERRED = 1;
REPLICA = 2;
REPLICA_PREFERRED = 3;
}
ReadFrom read_from = 8;
}

message Subscription {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import feast.storage.api.retriever.OnlineRetrieverV2;
import feast.storage.connectors.redis.retriever.*;
import io.opentracing.Tracer;
import java.util.Map;
import org.slf4j.Logger;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
Expand All @@ -41,16 +40,16 @@ public ServingServiceV2 servingServiceV2(
ServingServiceV2 servingService = null;
FeastProperties.Store store = feastProperties.getActiveStore();
StoreProto.Store.StoreType storeType = store.toProto().getType();
Map<String, String> config = store.getConfig();

switch (storeType) {
case REDIS_CLUSTER:
RedisClientAdapter redisClusterClient = RedisClusterClient.create(config);
RedisClientAdapter redisClusterClient =
RedisClusterClient.create(store.toProto().getRedisClusterConfig());
OnlineRetrieverV2 redisClusterRetriever = new OnlineRetriever(redisClusterClient);
servingService = new OnlineServingServiceV2(redisClusterRetriever, specService, tracer);
break;
case REDIS:
RedisClientAdapter redisClient = RedisClient.create(config);
RedisClientAdapter redisClient = RedisClient.create(store.toProto().getRedisConfig());
OnlineRetrieverV2 redisRetriever = new OnlineRetriever(redisClient);
servingService = new OnlineServingServiceV2(redisRetriever, specService, tracer);
break;
Expand Down
3 changes: 2 additions & 1 deletion serving/src/main/resources/application.yml
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,8 @@ feast:
type: REDIS_CLUSTER
config: # Store specific configuration.
# Connection string specifies the host:port of Redis instances in the redis cluster.
connection_string: "localhost:7000,localhost:7001,localhost:7002,localhost:7003,localhost:7004,localhost:7005"
connection_string: "localhost:7000,localhost:7001,localhost:7002,localhost:7003,localhost:7004,localhost:7005"
read_from: MASTER
subscriptions:
- name: "*"
project: "*"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,14 @@
*/
package feast.storage.connectors.redis.retriever;

import feast.proto.core.StoreProto;
import io.lettuce.core.KeyValue;
import io.lettuce.core.RedisFuture;
import io.lettuce.core.RedisURI;
import io.lettuce.core.api.StatefulRedisConnection;
import io.lettuce.core.api.async.RedisAsyncCommands;
import io.lettuce.core.codec.ByteArrayCodec;
import java.util.List;
import java.util.Map;

public class RedisClient implements RedisClientAdapter {

Expand All @@ -46,11 +46,11 @@ private RedisClient(StatefulRedisConnection<byte[], byte[]> connection) {
this.asyncCommands.setAutoFlushCommands(false);
}

public static RedisClientAdapter create(Map<String, String> config) {
public static RedisClientAdapter create(StoreProto.Store.RedisConfig config) {

RedisURI uri = RedisURI.create(config.get("host"), Integer.parseInt(config.get("port")));
RedisURI uri = RedisURI.create(config.getHost(), config.getPort());

if (Boolean.parseBoolean(config.get("ssl"))) {
if (config.getSsl()) {
uri.setSsl(true);
}
StatefulRedisConnection<byte[], byte[]> connection =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,13 @@
*/
package feast.storage.connectors.redis.retriever;

import com.google.common.collect.ImmutableMap;
import feast.proto.core.StoreProto;
import feast.proto.core.StoreProto.Store.RedisClusterConfig;
import feast.storage.connectors.redis.serializer.RedisKeyPrefixSerializerV2;
import feast.storage.connectors.redis.serializer.RedisKeySerializerV2;
import io.lettuce.core.KeyValue;
import io.lettuce.core.ReadFrom;
import io.lettuce.core.RedisFuture;
import io.lettuce.core.RedisURI;
import io.lettuce.core.cluster.api.StatefulRedisClusterConnection;
Expand All @@ -36,6 +40,13 @@ public class RedisClusterClient implements RedisClientAdapter {
private final RedisKeySerializerV2 serializer;
@Nullable private final RedisKeySerializerV2 fallbackSerializer;

private static final Map<RedisClusterConfig.ReadFrom, ReadFrom> PROTO_TO_LETTUCE_TYPES =
ImmutableMap.of(
RedisClusterConfig.ReadFrom.MASTER, ReadFrom.MASTER,
RedisClusterConfig.ReadFrom.MASTER_PREFERRED, ReadFrom.MASTER_PREFERRED,
RedisClusterConfig.ReadFrom.REPLICA, ReadFrom.REPLICA,
RedisClusterConfig.ReadFrom.REPLICA_PREFERRED, ReadFrom.REPLICA_PREFERRED);

@Override
public RedisFuture<List<KeyValue<byte[], byte[]>>> hmget(byte[] key, byte[]... fields) {
return asyncCommands.hmget(key, fields);
Expand Down Expand Up @@ -73,13 +84,16 @@ private RedisClusterClient(Builder builder) {
this.serializer = builder.serializer;
this.fallbackSerializer = builder.fallbackSerializer;

// allows reading from replicas
this.asyncCommands.readOnly();

// Disable auto-flushing
this.asyncCommands.setAutoFlushCommands(false);
}

public static RedisClientAdapter create(Map<String, String> config) {
public static RedisClientAdapter create(StoreProto.Store.RedisClusterConfig config) {
List<RedisURI> redisURIList =
Arrays.stream(config.get("connection_string").split(","))
Arrays.stream(config.getConnectionString().split(","))
.map(
hostPort -> {
String[] hostPortSplit = hostPort.trim().split(":");
Expand All @@ -90,14 +104,15 @@ public static RedisClientAdapter create(Map<String, String> config) {
io.lettuce.core.cluster.RedisClusterClient.create(redisURIList)
.connect(new ByteArrayCodec());

RedisKeySerializerV2 serializer =
new RedisKeyPrefixSerializerV2(config.getOrDefault("key_prefix", ""));
connection.setReadFrom(PROTO_TO_LETTUCE_TYPES.get(config.getReadFrom()));

RedisKeySerializerV2 serializer = new RedisKeyPrefixSerializerV2(config.getKeyPrefix());

Builder builder = new Builder(connection, serializer);

if (Boolean.parseBoolean(config.getOrDefault("enable_fallback", "false"))) {
if (config.getEnableFallback()) {
RedisKeySerializerV2 fallbackSerializer =
new RedisKeyPrefixSerializerV2(config.getOrDefault("fallback_prefix", ""));
new RedisKeyPrefixSerializerV2(config.getKeyPrefix());
builder = builder.withFallbackSerializer(fallbackSerializer);
}

Expand Down

0 comments on commit 29c3c33

Please sign in to comment.