Skip to content

Commit

Permalink
Address PR comments
Browse files Browse the repository at this point in the history
Signed-off-by: Terence <terencelimxp@gmail.com>
  • Loading branch information
terryyylim committed Oct 28, 2020
1 parent 5eb68bd commit 1f407e3
Show file tree
Hide file tree
Showing 2 changed files with 78 additions and 41 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -104,30 +104,19 @@ public GetOnlineFeaturesResponse getOnlineFeatures(GetOnlineFeaturesRequestV2 re
FeatureTableSpec featureTableSpec =
specService.getFeatureTableSpec(projectName, feature.get().getFeatureReference());
FeatureProto.FeatureSpecV2 tempFeatureSpecV2 =
featureTableSpec.getFeaturesList().stream()
.filter(
featureSpecV2 ->
featureSpecV2
.getName()
.equals(feature.get().getFeatureReference().getName()))
.collect(Collectors.toList())
.get(0);
String valueTypeString = tempFeatureSpecV2.getValueType().toString();
String valueCaseString = feature.get().getFeatureValue().getValCase().toString();
boolean isSameType = checkSameType(valueTypeString, valueCaseString);
specService.getFeatureSpec(feature.get().getFeatureReference());
ValueProto.ValueType.Enum valueTypeEnum = tempFeatureSpecV2.getValueType();
ValueProto.Value.ValCase valueCase = feature.get().getFeatureValue().getValCase();
boolean isSameFeatureType = checkSameType(valueTypeEnum, valueCase);

boolean isOutsideMaxAge = checkOutsideMaxAge(featureTableSpec, entityRow, feature);
Map<String, ValueProto.Value> valueMap =
unpackValueMap(feature, isOutsideMaxAge, isSameType);
unpackValueMap(feature, isOutsideMaxAge, isSameFeatureType);
allValueMaps.putAll(valueMap);

boolean isNotFound = false;
if (!isSameType) {
isNotFound = true;
}
// Generate metadata for feature values and merge into entityFieldsMap
Map<String, GetOnlineFeaturesResponse.FieldStatus> statusMap =
getMetadataMap(valueMap, isNotFound, isOutsideMaxAge);
getMetadataMap(valueMap, !isSameFeatureType, isOutsideMaxAge);
allStatusMaps.putAll(statusMap);

// Populate metrics/log request
Expand Down Expand Up @@ -171,31 +160,32 @@ public GetOnlineFeaturesResponse getOnlineFeatures(GetOnlineFeaturesRequestV2 re
}
}

private boolean checkSameType(String valueTypeString, String valueCaseString) {
HashMap<String, String> typingMap =
private boolean checkSameType(
ValueProto.ValueType.Enum valueTypeEnum, ValueProto.Value.ValCase valueCase) {
HashMap<ValueProto.ValueType.Enum, ValueProto.Value.ValCase> typingMap =
new HashMap<>() {
{
put("BYTES", "BYTES_VAL");
put("STRING", "STRING_VAL");
put("INT32", "INT32_VAL");
put("INT64", "INT64_VAL");
put("DOUBLE", "DOUBLE_VAL");
put("FLOAT", "FLOAT_VAL");
put("BOOL", "BOOL_VAL");
put("BYTES_LIST", "BYTES_LIST_VAL");
put("STRING_LIST", "STRING_LIST_VAL");
put("INT32_LIST", "INT32_LIST_VAL");
put("INT64_LIST", "INT64_LIST_VAL");
put("DOUBLE_LIST", "DOUBLE_LIST_VAL");
put("FLOAT_LIST", "FLOAT_LIST_VAL");
put("BOOL_LIST", "BOOL_LIST_VAL");
put(ValueProto.ValueType.Enum.BYTES, ValueProto.Value.ValCase.BYTES_VAL);
put(ValueProto.ValueType.Enum.STRING, ValueProto.Value.ValCase.STRING_VAL);
put(ValueProto.ValueType.Enum.INT32, ValueProto.Value.ValCase.INT32_VAL);
put(ValueProto.ValueType.Enum.INT64, ValueProto.Value.ValCase.INT64_VAL);
put(ValueProto.ValueType.Enum.DOUBLE, ValueProto.Value.ValCase.DOUBLE_VAL);
put(ValueProto.ValueType.Enum.FLOAT, ValueProto.Value.ValCase.FLOAT_VAL);
put(ValueProto.ValueType.Enum.BOOL, ValueProto.Value.ValCase.BOOL_VAL);
put(ValueProto.ValueType.Enum.BYTES_LIST, ValueProto.Value.ValCase.BYTES_LIST_VAL);
put(ValueProto.ValueType.Enum.STRING_LIST, ValueProto.Value.ValCase.STRING_LIST_VAL);
put(ValueProto.ValueType.Enum.INT32_LIST, ValueProto.Value.ValCase.INT32_LIST_VAL);
put(ValueProto.ValueType.Enum.INT64_LIST, ValueProto.Value.ValCase.INT64_LIST_VAL);
put(ValueProto.ValueType.Enum.DOUBLE_LIST, ValueProto.Value.ValCase.DOUBLE_LIST_VAL);
put(ValueProto.ValueType.Enum.FLOAT_LIST, ValueProto.Value.ValCase.FLOAT_LIST_VAL);
put(ValueProto.ValueType.Enum.BOOL_LIST, ValueProto.Value.ValCase.BOOL_LIST_VAL);
}
};
if (valueCaseString.equals("VAL_NOT_SET")) {
if (valueCase.equals(ValueProto.Value.ValCase.VAL_NOT_SET)) {
return true;
}

return typingMap.get(valueTypeString).equals(valueCaseString);
return typingMap.get(valueTypeEnum).equals(valueCase);
}

private static Map<FeatureReferenceV2, Optional<Feature>> getFeatureRefFeatureMap(
Expand Down Expand Up @@ -241,11 +231,11 @@ private static Map<String, GetOnlineFeaturesResponse.FieldStatus> getMetadataMap
}

private static Map<String, ValueProto.Value> unpackValueMap(
Optional<Feature> feature, boolean isOutsideMaxAge, boolean isSameType) {
Optional<Feature> feature, boolean isOutsideMaxAge, boolean isSameFeatureType) {
Map<String, ValueProto.Value> valueMap = new HashMap<>();

if (feature.isPresent()) {
if (!isOutsideMaxAge && isSameType) {
if (!isOutsideMaxAge && isSameFeatureType) {
valueMap.put(
FeatureV2.getFeatureStringRef(feature.get().getFeatureReference()),
feature.get().getFeatureValue());
Expand Down
55 changes: 51 additions & 4 deletions serving/src/main/java/feast/serving/specs/CachedSpecService.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import feast.proto.core.CoreServiceProto.ListFeatureTablesRequest;
import feast.proto.core.CoreServiceProto.ListFeatureTablesResponse;
import feast.proto.core.CoreServiceProto.ListProjectsRequest;
import feast.proto.core.FeatureProto;
import feast.proto.core.FeatureSetProto.FeatureSet;
import feast.proto.core.FeatureSetProto.FeatureSetSpec;
import feast.proto.core.FeatureSetProto.FeatureSpec;
Expand All @@ -51,6 +52,7 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;
import org.slf4j.Logger;

Expand Down Expand Up @@ -92,6 +94,9 @@ public class CachedSpecService {
.help("number of feature sets served by this instance")
.register();

private final LoadingCache<ServingAPIProto.FeatureReferenceV2, FeatureProto.FeatureSpecV2>
featureCache;

public CachedSpecService(CoreSpecService coreService, StoreProto.Store store) {
this.coreService = coreService;
this.store = coreService.registerStore(store);
Expand All @@ -104,12 +109,19 @@ public CachedSpecService(CoreSpecService coreService, StoreProto.Store store) {
CacheBuilder.newBuilder().maximumSize(MAX_SPEC_COUNT).build(featureSetCacheLoader);
featureSetCache.putAll(featureSets);

Map<String, FeatureTableSpec> featureTables = getFeatureTableMap();
Map<String, FeatureTableSpec> featureTables = getFeatureTableMap().getLeft();
CacheLoader<String, FeatureTableSpec> featureTableCacheLoader =
CacheLoader.from(featureTables::get);
featureTableCache =
CacheBuilder.newBuilder().maximumSize(MAX_SPEC_COUNT).build(featureTableCacheLoader);
featureTableCache.putAll(featureTables);

Map<ServingAPIProto.FeatureReferenceV2, FeatureProto.FeatureSpecV2> features =
getFeatureTableMap().getRight();
CacheLoader<ServingAPIProto.FeatureReferenceV2, FeatureProto.FeatureSpecV2> featureCacheLoader =
CacheLoader.from(features::get);
featureCache = CacheBuilder.newBuilder().build(featureCacheLoader);
featureCache.putAll(features);
}

/**
Expand Down Expand Up @@ -241,12 +253,18 @@ public void populateCache() {

featureSetsCount.set(featureSetCache.size());

Map<String, FeatureTableSpec> featureTableMap = getFeatureTableMap();
Map<String, FeatureTableSpec> featureTableMap = getFeatureTableMap().getLeft();

featureTableCache.invalidateAll();
featureTableCache.putAll(featureTableMap);

featureTablesCount.set(featureTableCache.size());

Map<ServingAPIProto.FeatureReferenceV2, FeatureProto.FeatureSpecV2> featureMap =
getFeatureTableMap().getRight();
featureCache.invalidateAll();
featureCache.putAll(featureMap);

cacheLastUpdated.set(System.currentTimeMillis());
}

Expand Down Expand Up @@ -369,8 +387,13 @@ private Pair<String, String> generateFeatureToFeatureSetMapping(
*
* @return Map in the format of <project/featuretable_name: FeatureTableSpec>
*/
private Map<String, FeatureTableSpec> getFeatureTableMap() {
private ImmutablePair<
Map<String, FeatureTableSpec>,
Map<ServingAPIProto.FeatureReferenceV2, FeatureProto.FeatureSpecV2>>
getFeatureTableMap() {
HashMap<String, FeatureTableSpec> featureTables = new HashMap<>();
HashMap<ServingAPIProto.FeatureReferenceV2, FeatureProto.FeatureSpecV2> features =
new HashMap<>();

List<String> projects =
coreService.listProjects(ListProjectsRequest.newBuilder().build()).getProjectsList();
Expand All @@ -386,13 +409,24 @@ private Map<String, FeatureTableSpec> getFeatureTableMap() {
FeatureTableSpec spec = featureTable.getSpec();
// Key of Map is in the form of <project/featuretable_name>
featureTables.put(getFeatureTableStringRef(project, spec), spec);

String featureTableName = spec.getName();
List<FeatureProto.FeatureSpecV2> featureSpecs = spec.getFeaturesList();
for (FeatureProto.FeatureSpecV2 featureSpec : featureSpecs) {
ServingAPIProto.FeatureReferenceV2 featureReference =
ServingAPIProto.FeatureReferenceV2.newBuilder()
.setFeatureTable(featureTableName)
.setName(featureSpec.getName())
.build();
features.put(featureReference, featureSpec);
}
}
} catch (StatusRuntimeException e) {
throw new RuntimeException(
String.format("Unable to retrieve specs matching project %s", project), e);
}
}
return featureTables;
return ImmutablePair.of(featureTables, features);
}

public FeatureTableSpec getFeatureTableSpec(
Expand All @@ -408,4 +442,17 @@ public FeatureTableSpec getFeatureTableSpec(

return featureTableSpec;
}

public FeatureProto.FeatureSpecV2 getFeatureSpec(
ServingAPIProto.FeatureReferenceV2 featureReference) {
FeatureProto.FeatureSpecV2 featureSpec;
try {
featureSpec = featureCache.get(featureReference);
} catch (ExecutionException e) {
throw new SpecRetrievalException(
String.format("Unable to find Feature with name: %s", featureReference.getName()), e);
}

return featureSpec;
}
}

0 comments on commit 1f407e3

Please sign in to comment.