diff --git a/serving/src/main/resources/application.yml b/serving/src/main/resources/application.yml index 95d3a3e9c4..2daa83fbfb 100644 --- a/serving/src/main/resources/application.yml +++ b/serving/src/main/resources/application.yml @@ -19,7 +19,7 @@ feast: store: # Path containing the store configuration for this serving store. - config-path: ${FEAST_STORE_CONFIG_PATH:./sample_redis_config.yml} + config-path: ${FEAST_STORE_CONFIG_PATH:serving/sample_redis_config.yml} # If serving redis, the redis pool max size redis-pool-max-size: ${FEAST_REDIS_POOL_MAX_SIZE:128} # If serving redis, the redis pool max idle conns diff --git a/serving/src/test/java/feast/serving/service/RedisServingServiceTest.java b/serving/src/test/java/feast/serving/service/RedisServingServiceTest.java index c1b64ab484..84c3f96035 100644 --- a/serving/src/test/java/feast/serving/service/RedisServingServiceTest.java +++ b/serving/src/test/java/feast/serving/service/RedisServingServiceTest.java @@ -11,9 +11,9 @@ import com.google.protobuf.Timestamp; import feast.core.FeatureSetProto.EntitySpec; import feast.core.FeatureSetProto.FeatureSetSpec; +import feast.serving.ServingAPIProto.FeatureSetRequest; import feast.serving.ServingAPIProto.GetOnlineFeaturesRequest; import feast.serving.ServingAPIProto.GetOnlineFeaturesRequest.EntityRow; -import feast.serving.ServingAPIProto.FeatureSetRequest; import feast.serving.ServingAPIProto.GetOnlineFeaturesResponse; import feast.serving.ServingAPIProto.GetOnlineFeaturesResponse.FieldValues; import feast.storage.RedisProto.RedisKey; @@ -53,14 +53,6 @@ public class RedisServingServiceTest { @Before public void setUp() { initMocks(this); - FeatureSetSpec featureSetSpec = FeatureSetSpec.newBuilder() - .addEntities(EntitySpec.newBuilder().setName("entity1")) - .addEntities(EntitySpec.newBuilder().setName("entity2")) - .setMaxAge(Duration.newBuilder().setSeconds(30)) // default - .build(); - - when(specService.getFeatureSet("featureSet", 1)) - .thenReturn(featureSetSpec); redisServingService = new RedisServingService(jedisPool, specService, tracer); redisKeyList = Lists.newArrayList( @@ -124,6 +116,72 @@ public void shouldReturnResponseWithValuesIfKeysPresent() { List featureRowBytes = featureRows.stream() .map(AbstractMessageLite::toByteArray) .collect(Collectors.toList()); + when(specService.getFeatureSet("featureSet", 1)).thenReturn(getFeatureSetSpec()); + when(jedisPool.getResource()).thenReturn(jedis); + when(jedis.mget(redisKeyList)).thenReturn(featureRowBytes); + when(tracer.buildSpan(ArgumentMatchers.any())).thenReturn(Mockito.mock(SpanBuilder.class)); + + GetOnlineFeaturesResponse expected = GetOnlineFeaturesResponse.newBuilder() + .addFieldValues(FieldValues.newBuilder() + .putFields("entity1", intValue(1)) + .putFields("entity2", strValue("a")) + .putFields("featureSet:1:feature1", intValue(1)) + .putFields("featureSet:1:feature2", intValue(1))) + .addFieldValues(FieldValues.newBuilder() + .putFields("entity1", intValue(2)) + .putFields("entity2", strValue("b")) + .putFields("featureSet:1:feature1", intValue(2)) + .putFields("featureSet:1:feature2", intValue(2))) + .build(); + GetOnlineFeaturesResponse actual = redisServingService.getOnlineFeatures(request); + assertThat(responseToMapList(actual), containsInAnyOrder(responseToMapList(expected).toArray())); + } + + @Test + public void shouldReturnResponseWithValuesWhenFeatureSetSpecHasUnspecifiedMaxAge() { + GetOnlineFeaturesRequest request = GetOnlineFeaturesRequest.newBuilder() + .addFeatureSets(FeatureSetRequest.newBuilder() + .setName("featureSet") + .setVersion(1) + .addAllFeatureNames(Lists.newArrayList("feature1", "feature2")) + .build()) + .addEntityRows(EntityRow.newBuilder() + .setEntityTimestamp(Timestamp.newBuilder().setSeconds(100)) + .putFields("entity1", intValue(1)) + .putFields("entity2", strValue("a"))) + .addEntityRows(EntityRow.newBuilder() + .setEntityTimestamp(Timestamp.newBuilder().setSeconds(100)) + .putFields("entity1", intValue(2)) + .putFields("entity2", strValue("b"))) + .build(); + + List featureRows = Lists.newArrayList( + FeatureRow.newBuilder() + .setEventTimestamp(Timestamp.newBuilder().setSeconds(2)) // much older timestamp + .addAllFields(Lists + .newArrayList( + Field.newBuilder().setName("entity1").setValue(intValue(1)).build(), + Field.newBuilder().setName("entity2").setValue(strValue("a")).build(), + Field.newBuilder().setName("feature1").setValue(intValue(1)).build(), + Field.newBuilder().setName("feature2").setValue(intValue(1)).build())) + .setFeatureSet("featureSet:1") + .build(), + FeatureRow.newBuilder() + .setEventTimestamp(Timestamp.newBuilder().setSeconds(15)) // much older timestamp + .addAllFields(Lists + .newArrayList( + Field.newBuilder().setName("entity1").setValue(intValue(2)).build(), + Field.newBuilder().setName("entity2").setValue(strValue("b")).build(), + Field.newBuilder().setName("feature1").setValue(intValue(2)).build(), + Field.newBuilder().setName("feature2").setValue(intValue(2)).build())) + .setFeatureSet("featureSet:1") + .build() + ); + + List featureRowBytes = featureRows.stream() + .map(AbstractMessageLite::toByteArray) + .collect(Collectors.toList()); + when(specService.getFeatureSet("featureSet", 1)).thenReturn(getFeatureSetSpecWithNoMaxAge()); when(jedisPool.getResource()).thenReturn(jedis); when(jedis.mget(redisKeyList)).thenReturn(featureRowBytes); when(tracer.buildSpan(ArgumentMatchers.any())).thenReturn(Mockito.mock(SpanBuilder.class)); @@ -187,6 +245,7 @@ public void shouldReturnResponseWithUnsetValuesIfKeysNotPresent() { ); List featureRowBytes = Lists.newArrayList(featureRows.get(0).toByteArray(), null); + when(specService.getFeatureSet("featureSet", 1)).thenReturn(getFeatureSetSpec()); when(jedisPool.getResource()).thenReturn(jedis); when(jedis.mget(redisKeyList)).thenReturn(featureRowBytes); when(tracer.buildSpan(ArgumentMatchers.any())).thenReturn(Mockito.mock(SpanBuilder.class)); @@ -253,6 +312,7 @@ public void shouldReturnResponseWithUnsetValuesIfMaxAgeIsExceeded() { List featureRowBytes = featureRows.stream() .map(AbstractMessageLite::toByteArray) .collect(Collectors.toList()); + when(specService.getFeatureSet("featureSet", 1)).thenReturn(getFeatureSetSpec()); when(jedisPool.getResource()).thenReturn(jedis); when(jedis.mget(redisKeyList)).thenReturn(featureRowBytes); when(tracer.buildSpan(ArgumentMatchers.any())).thenReturn(Mockito.mock(SpanBuilder.class)); @@ -319,6 +379,7 @@ public void shouldReturnResponseWithUnsetValuesIfDefaultMaxAgeIsExceeded() { List featureRowBytes = featureRows.stream() .map(AbstractMessageLite::toByteArray) .collect(Collectors.toList()); + when(specService.getFeatureSet("featureSet", 1)).thenReturn(getFeatureSetSpec()); when(jedisPool.getResource()).thenReturn(jedis); when(jedis.mget(redisKeyList)).thenReturn(featureRowBytes); when(tracer.buildSpan(ArgumentMatchers.any())).thenReturn(Mockito.mock(SpanBuilder.class)); @@ -385,6 +446,7 @@ public void shouldFilterOutUndesiredRows() { List featureRowBytes = featureRows.stream() .map(AbstractMessageLite::toByteArray) .collect(Collectors.toList()); + when(specService.getFeatureSet("featureSet", 1)).thenReturn(getFeatureSetSpec()); when(jedisPool.getResource()).thenReturn(jedis); when(jedis.mget(redisKeyList)).thenReturn(featureRowBytes); when(tracer.buildSpan(ArgumentMatchers.any())).thenReturn(Mockito.mock(SpanBuilder.class)); @@ -414,4 +476,20 @@ private Value intValue(int val) { private Value strValue(String val) { return Value.newBuilder().setStringVal(val).build(); } + private FeatureSetSpec getFeatureSetSpec() { + return FeatureSetSpec.newBuilder() + .addEntities(EntitySpec.newBuilder().setName("entity1")) + .addEntities(EntitySpec.newBuilder().setName("entity2")) + .setMaxAge(Duration.newBuilder().setSeconds(30)) // default + .build(); + } + + private FeatureSetSpec getFeatureSetSpecWithNoMaxAge() { + return FeatureSetSpec.newBuilder() + .addEntities(EntitySpec.newBuilder().setName("entity1")) + .addEntities(EntitySpec.newBuilder().setName("entity2")) + .setMaxAge(Duration.newBuilder().setSeconds(0).setNanos(0).build()) + .build(); + } + } \ No newline at end of file