diff --git a/common/pom.xml b/common/pom.xml new file mode 100644 index 0000000000..7f22764748 --- /dev/null +++ b/common/pom.xml @@ -0,0 +1,60 @@ + + + + 4.0.0 + + + feast-parent + dev.feast + ${revision} + + + Feast Common + Feast common module with functionality that can be reused + feast-common + + + + + org.apache.maven.plugins + maven-surefire-plugin + 3.0.0-M4 + + -Xms2048m -Xmx2048m -Djdk.net.URLClassPath.disableClassPathURLCheck=true + + + + + + + + dev.feast + datatypes-java + ${project.version} + compile + + + junit + junit + 4.12 + test + + + \ No newline at end of file diff --git a/serving/src/main/java/feast/serving/util/RefUtil.java b/common/src/main/java/feast/common/models/Feature.java similarity index 60% rename from serving/src/main/java/feast/serving/util/RefUtil.java rename to common/src/main/java/feast/common/models/Feature.java index e3c36f1f9f..413193b79a 100644 --- a/serving/src/main/java/feast/serving/util/RefUtil.java +++ b/common/src/main/java/feast/common/models/Feature.java @@ -1,6 +1,6 @@ /* * SPDX-License-Identifier: Apache-2.0 - * Copyright 2018-2019 The Feast Authors + * Copyright 2018-2020 The Feast Authors * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -14,25 +14,29 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package feast.serving.util; +package feast.common.models; -import feast.proto.core.FeatureSetProto.FeatureSetSpec; import feast.proto.serving.ServingAPIProto.FeatureReference; -public class RefUtil { - public static String generateFeatureStringRef(FeatureReference featureReference) { +public class Feature { + + /** + * Accepts FeatureReference object and returns its reference in String + * "project/featureset_name:feature_name". + * + * @param featureReference {@link FeatureReference} + * @param ignoreProject Flag whether to return FeatureReference with project name + * @return String format of FeatureReference + */ + public static String getFeatureStringRef( + FeatureReference featureReference, boolean ignoreProject) { String ref = featureReference.getName(); if (!featureReference.getFeatureSet().isEmpty()) { ref = featureReference.getFeatureSet() + ":" + ref; } - if (!featureReference.getProject().isEmpty()) { + if (!featureReference.getProject().isEmpty() && !ignoreProject) { ref = featureReference.getProject() + "/" + ref; } return ref; } - - public static String generateFeatureSetStringRef(FeatureSetSpec featureSetSpec) { - String ref = String.format("%s/%s", featureSetSpec.getProject(), featureSetSpec.getName()); - return ref; - } } diff --git a/common/src/main/java/feast/common/models/FeatureSet.java b/common/src/main/java/feast/common/models/FeatureSet.java new file mode 100644 index 0000000000..f9db0f744f --- /dev/null +++ b/common/src/main/java/feast/common/models/FeatureSet.java @@ -0,0 +1,44 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * Copyright 2018-2020 The Feast Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package feast.common.models; + +import feast.proto.core.FeatureSetProto.FeatureSetSpec; +import feast.proto.core.FeatureSetReferenceProto.FeatureSetReference; + +public class FeatureSet { + + /** + * Accepts FeatureSetSpec object and returns its reference in String "project/featureset_name". + * + * @param featureSetSpec {@link FeatureSetSpec} + * @return String format of FeatureSetReference + */ + public static String getFeatureSetStringRef(FeatureSetSpec featureSetSpec) { + return String.format("%s/%s", featureSetSpec.getProject(), featureSetSpec.getName()); + } + + /** + * Accepts FeatureSetReference object and returns its reference in String + * "project/featureset_name". + * + * @param featureSetReference {@link FeatureSetReference} + * @return String format of FeatureSetReference + */ + public static String getFeatureSetStringRef(FeatureSetReference featureSetReference) { + return String.format("%s/%s", featureSetReference.getProject(), featureSetReference.getName()); + } +} diff --git a/common/src/main/java/feast/common/models/Store.java b/common/src/main/java/feast/common/models/Store.java new file mode 100644 index 0000000000..0644d5f46a --- /dev/null +++ b/common/src/main/java/feast/common/models/Store.java @@ -0,0 +1,98 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * Copyright 2018-2020 The Feast Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package feast.common.models; + +import feast.proto.core.StoreProto.Store.Subscription; +import java.util.List; +import java.util.regex.Pattern; + +public class Store { + + /** + * Accepts a Subscription class object and returns it in string format + * + * @param subscription Subscription class to be converted to string format + * @return String formatted Subscription class + */ + public static String parseSubscriptionFrom(Subscription subscription) { + if (subscription.getName().isEmpty() || subscription.getProject().isEmpty()) { + throw new IllegalArgumentException( + String.format("Missing arguments in subscription string: %s", subscription.toString())); + } + + return String.format("%s:%s", subscription.getProject(), subscription.getName()); + } + + /** + * Accepts a exclude parameter to determine whether to return subscriptions that are excluded. + * + * @param subscription String formatted Subscription to be converted to Subscription class + * @return Subscription class with its respective attributes + */ + public static Subscription convertStringToSubscription(String subscription) { + if (subscription.equals("")) { + return Subscription.newBuilder().build(); + } + String[] split = subscription.split(":"); + return Subscription.newBuilder().setProject(split[0]).setName(split[1]).build(); + } + + /** + * The current use of this function is to determine whether a FeatureRow is subscribed to a + * Featureset. + * + * @param subscriptions List of Subscriptions available in Store + * @param projectName Project name used for matching Subscription's Project + * @param featureSetName Featureset name used for matching Subscription's Featureset + * @return boolean flag to signify if FeatureRow is subscribed to Featureset + */ + public static boolean isSubscribedToFeatureSet( + List subscriptions, String projectName, String featureSetName) { + for (Subscription sub : subscriptions) { + // If configuration missing, fail + if (sub.getProject().isEmpty() || sub.getName().isEmpty()) { + throw new IllegalArgumentException( + String.format("Subscription is missing arguments: %s", sub.toString())); + } + + // If all wildcards, subscribe to everything + if (sub.getProject().equals("*") || sub.getName().equals("*")) { + return true; + } + + // Match project name + if (!projectName.equals(sub.getProject())) { + continue; + } + + // Convert wildcard to regex + String subName = sub.getName(); + if (!sub.getName().contains(".*")) { + subName = subName.replace("*", ".*"); + } + + // Match feature set name to pattern + Pattern pattern = Pattern.compile(subName); + if (!pattern.matcher(featureSetName).matches()) { + continue; + } + return true; + } + + return false; + } +} diff --git a/common/src/test/java/feast/common/models/FeatureSetTest.java b/common/src/test/java/feast/common/models/FeatureSetTest.java new file mode 100644 index 0000000000..52b7dd36ad --- /dev/null +++ b/common/src/test/java/feast/common/models/FeatureSetTest.java @@ -0,0 +1,97 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * Copyright 2018-2020 The Feast Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package feast.common.models; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.core.IsEqual.equalTo; + +import feast.proto.core.FeatureSetProto.EntitySpec; +import feast.proto.core.FeatureSetProto.FeatureSetSpec; +import feast.proto.core.FeatureSetProto.FeatureSpec; +import feast.proto.core.FeatureSetReferenceProto; +import feast.proto.types.ValueProto; +import java.util.Arrays; +import java.util.List; +import org.junit.Before; +import org.junit.Test; +import org.tensorflow.metadata.v0.*; + +public class FeatureSetTest { + + private List entitySpecs; + private List featureSpecs; + + @Before + public void setUp() { + // Entity Specs + EntitySpec entitySpec1 = + EntitySpec.newBuilder() + .setName("entity1") + .setValueType(ValueProto.ValueType.Enum.INT64) + .build(); + EntitySpec entitySpec2 = + EntitySpec.newBuilder() + .setName("entity2") + .setValueType(ValueProto.ValueType.Enum.INT64) + .build(); + + // Feature Specs + FeatureSpec featureSpec1 = + FeatureSpec.newBuilder() + .setName("feature1") + .setValueType(ValueProto.ValueType.Enum.INT64) + .setPresence(FeaturePresence.getDefaultInstance()) + .setShape(FixedShape.getDefaultInstance()) + .setDomain("mydomain") + .build(); + FeatureSpec featureSpec2 = + FeatureSpec.newBuilder() + .setName("feature2") + .setValueType(ValueProto.ValueType.Enum.INT64) + .setGroupPresence(FeaturePresenceWithinGroup.getDefaultInstance()) + .setValueCount(ValueCount.getDefaultInstance()) + .setIntDomain(IntDomain.getDefaultInstance()) + .build(); + + entitySpecs = Arrays.asList(entitySpec1, entitySpec2); + featureSpecs = Arrays.asList(featureSpec1, featureSpec2); + } + + @Test + public void shouldReturnFeatureSetStringRef() { + FeatureSetSpec featureSetSpec = + FeatureSetSpec.newBuilder() + .setProject("project1") + .setName("featureSetWithConstraints") + .addAllEntities(entitySpecs) + .addAllFeatures(featureSpecs) + .build(); + + FeatureSetReferenceProto.FeatureSetReference featureSetReference = + FeatureSetReferenceProto.FeatureSetReference.newBuilder() + .setName(featureSetSpec.getName()) + .setProject(featureSetSpec.getProject()) + .build(); + + String actualFeatureSetStringRef1 = FeatureSet.getFeatureSetStringRef(featureSetSpec); + String actualFeatureSetStringRef2 = FeatureSet.getFeatureSetStringRef(featureSetReference); + String expectedFeatureSetStringRef = "project1/featureSetWithConstraints"; + + assertThat(actualFeatureSetStringRef1, equalTo(expectedFeatureSetStringRef)); + assertThat(actualFeatureSetStringRef2, equalTo(expectedFeatureSetStringRef)); + } +} diff --git a/common/src/test/java/feast/common/models/FeaturesTest.java b/common/src/test/java/feast/common/models/FeaturesTest.java new file mode 100644 index 0000000000..10d62878ba --- /dev/null +++ b/common/src/test/java/feast/common/models/FeaturesTest.java @@ -0,0 +1,100 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * Copyright 2018-2020 The Feast Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package feast.common.models; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.core.IsEqual.equalTo; + +import feast.proto.core.FeatureSetProto.EntitySpec; +import feast.proto.core.FeatureSetProto.FeatureSetSpec; +import feast.proto.core.FeatureSetProto.FeatureSpec; +import feast.proto.serving.ServingAPIProto; +import feast.proto.types.ValueProto; +import java.util.Arrays; +import java.util.List; +import org.junit.Before; +import org.junit.Test; +import org.tensorflow.metadata.v0.*; + +public class FeaturesTest { + + private List entitySpecs; + private List featureSpecs; + + @Before + public void setUp() { + // Entity Specs + EntitySpec entitySpec1 = + EntitySpec.newBuilder() + .setName("entity1") + .setValueType(ValueProto.ValueType.Enum.INT64) + .build(); + EntitySpec entitySpec2 = + EntitySpec.newBuilder() + .setName("entity2") + .setValueType(ValueProto.ValueType.Enum.INT64) + .build(); + + // Feature Specs + FeatureSpec featureSpec1 = + FeatureSpec.newBuilder() + .setName("feature1") + .setValueType(ValueProto.ValueType.Enum.INT64) + .setPresence(FeaturePresence.getDefaultInstance()) + .setShape(FixedShape.getDefaultInstance()) + .setDomain("mydomain") + .build(); + FeatureSpec featureSpec2 = + FeatureSpec.newBuilder() + .setName("feature2") + .setValueType(ValueProto.ValueType.Enum.INT64) + .setGroupPresence(FeaturePresenceWithinGroup.getDefaultInstance()) + .setValueCount(ValueCount.getDefaultInstance()) + .setIntDomain(IntDomain.getDefaultInstance()) + .build(); + + entitySpecs = Arrays.asList(entitySpec1, entitySpec2); + featureSpecs = Arrays.asList(featureSpec1, featureSpec2); + } + + @Test + public void shouldReturnFeatureStringRef() { + FeatureSetSpec featureSetSpec = + FeatureSetSpec.newBuilder() + .setProject("project1") + .setName("featureSetWithConstraints") + .addAllEntities(entitySpecs) + .addAllFeatures(featureSpecs) + .build(); + + ServingAPIProto.FeatureReference featureReference = + ServingAPIProto.FeatureReference.newBuilder() + .setProject(featureSetSpec.getProject()) + .setFeatureSet(featureSetSpec.getName()) + .setName(featureSetSpec.getFeatures(0).getName()) + .build(); + + String actualFeatureStringRef = Feature.getFeatureStringRef(featureReference, false); + String actualFeatureIgnoreProjectStringRef = + Feature.getFeatureStringRef(featureReference, true); + String expectedFeatureStringRef = "project1/featureSetWithConstraints:feature1"; + String expectedFeatureIgnoreProjectStringRef = "featureSetWithConstraints:feature1"; + + assertThat(actualFeatureStringRef, equalTo(expectedFeatureStringRef)); + assertThat(actualFeatureIgnoreProjectStringRef, equalTo(expectedFeatureIgnoreProjectStringRef)); + } +} diff --git a/core/pom.xml b/core/pom.xml index c4b3c0f4aa..ab6239365c 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -69,6 +69,11 @@ feast-ingestion ${project.version} + + dev.feast + feast-common + ${project.version} + diff --git a/core/src/main/java/feast/core/model/Store.java b/core/src/main/java/feast/core/model/Store.java index 5561433012..9288217e74 100644 --- a/core/src/main/java/feast/core/model/Store.java +++ b/core/src/main/java/feast/core/model/Store.java @@ -16,6 +16,9 @@ */ package feast.core.model; +import static feast.common.models.Store.convertStringToSubscription; +import static feast.common.models.Store.parseSubscriptionFrom; + import com.google.protobuf.InvalidProtocolBufferException; import feast.proto.core.StoreProto; import feast.proto.core.StoreProto.Store.BigQueryConfig; @@ -71,7 +74,7 @@ public Store() { public static Store fromProto(StoreProto.Store storeProto) throws IllegalArgumentException { List subs = new ArrayList<>(); for (Subscription s : storeProto.getSubscriptionsList()) { - subs.add(convertSubscriptionToString(s)); + subs.add(parseSubscriptionFrom(s)); } byte[] config; switch (storeProto.getType()) { @@ -119,28 +122,17 @@ public StoreProto.Store toProto() throws InvalidProtocolBufferException { } } + /** + * Returns a List of Subscriptions. + * + * @return List of Subscription + */ public List getSubscriptions() { return Arrays.stream(subscriptions.split(",")) - .map(this::convertStringToSubscription) + .map(s -> convertStringToSubscription(s)) .collect(Collectors.toList()); } - private static String convertSubscriptionToString(Subscription sub) { - if (sub.getName().isEmpty() || sub.getProject().isEmpty()) { - throw new IllegalArgumentException( - String.format("Missing arguments in subscription string: %s", sub.toString())); - } - return String.format("%s:%s", sub.getProject(), sub.getName()); - } - - private Subscription convertStringToSubscription(String sub) { - if (sub.equals("")) { - return Subscription.newBuilder().build(); - } - String[] split = sub.split(":", 2); - return Subscription.newBuilder().setProject(split[0]).setName(split[1]).build(); - } - @Override public int hashCode() { return Objects.hash(this.name, this.type, this.subscriptions) ^ Arrays.hashCode(this.config); diff --git a/ingestion/pom.xml b/ingestion/pom.xml index 3eadcdca25..de50789a67 100644 --- a/ingestion/pom.xml +++ b/ingestion/pom.xml @@ -132,6 +132,12 @@ ${project.version} + + dev.feast + feast-common + ${project.version} + + com.google.auto.value auto-value-annotations diff --git a/ingestion/src/main/java/feast/ingestion/transform/FeatureRowToStoreAllocator.java b/ingestion/src/main/java/feast/ingestion/transform/FeatureRowToStoreAllocator.java index 48889773c4..aa101203e0 100644 --- a/ingestion/src/main/java/feast/ingestion/transform/FeatureRowToStoreAllocator.java +++ b/ingestion/src/main/java/feast/ingestion/transform/FeatureRowToStoreAllocator.java @@ -19,7 +19,7 @@ import static feast.ingestion.utils.SpecUtil.parseFeatureSetReference; import com.google.auto.value.AutoValue; -import feast.ingestion.utils.SpecUtil; +import feast.common.models.Store; import feast.proto.core.StoreProto; import feast.proto.types.FeatureRowProto; import java.util.List; @@ -75,7 +75,7 @@ public void process(ProcessContext c, @Element FeatureRowProto.FeatureRow row) { getStores().stream() .filter( s -> - SpecUtil.isSubscribedToFeatureSet( + Store.isSubscribedToFeatureSet( s.getSubscriptionsList(), projectAndSetNames.getLeft(), projectAndSetNames.getRight())) diff --git a/ingestion/src/main/java/feast/ingestion/transform/specs/FilterRelevantFunction.java b/ingestion/src/main/java/feast/ingestion/transform/specs/FilterRelevantFunction.java index cb2c84e2cd..1a97064350 100644 --- a/ingestion/src/main/java/feast/ingestion/transform/specs/FilterRelevantFunction.java +++ b/ingestion/src/main/java/feast/ingestion/transform/specs/FilterRelevantFunction.java @@ -16,7 +16,7 @@ */ package feast.ingestion.transform.specs; -import feast.ingestion.utils.SpecUtil; +import feast.common.models.Store; import feast.proto.core.FeatureSetProto; import feast.proto.core.SourceProto; import feast.proto.core.StoreProto; @@ -43,7 +43,7 @@ public Boolean apply(KV input) throws Ex return stores.stream() .anyMatch( s -> - SpecUtil.isSubscribedToFeatureSet( + Store.isSubscribedToFeatureSet( s.getSubscriptionsList(), input.getValue().getProject(), input.getValue().getName())) diff --git a/ingestion/src/main/java/feast/ingestion/utils/SpecUtil.java b/ingestion/src/main/java/feast/ingestion/utils/SpecUtil.java index 79c81c162d..e7ea04cd40 100644 --- a/ingestion/src/main/java/feast/ingestion/utils/SpecUtil.java +++ b/ingestion/src/main/java/feast/ingestion/utils/SpecUtil.java @@ -26,21 +26,15 @@ import feast.proto.core.IngestionJobProto.SpecsStreamingUpdateConfig; import feast.proto.core.SourceProto.Source; import feast.proto.core.StoreProto.Store; -import feast.proto.core.StoreProto.Store.Subscription; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.regex.Pattern; import org.apache.commons.lang3.tuple.Pair; public class SpecUtil { public static String PROJECT_DEFAULT_NAME = "default"; - public static String getFeatureSetReference(FeatureSetSpec featureSetSpec) { - return String.format("%s/%s", featureSetSpec.getProject(), featureSetSpec.getName()); - } - public static Pair parseFeatureSetReference(String reference) { String[] split = reference.split("/", 2); if (split.length == 1) { @@ -50,44 +44,6 @@ public static Pair parseFeatureSetReference(String reference) { } } - /** Get only feature set specs that matches the subscription */ - public static boolean isSubscribedToFeatureSet( - List subscriptions, String projectName, String featureSetName) { - - for (Subscription sub : subscriptions) { - // If configuration missing, fail - if (sub.getProject().isEmpty() || sub.getName().isEmpty()) { - throw new IllegalArgumentException( - String.format("Subscription is missing arguments: %s", sub.toString())); - } - - // If all wildcards, subscribe to everything - if (sub.getProject().equals("*") || sub.getName().equals("*")) { - return true; - } - - // Match project name - if (!projectName.equals(sub.getProject())) { - continue; - } - - // Convert wildcard to regex - String subName = sub.getName(); - if (!sub.getName().contains(".*")) { - subName = subName.replace("*", ".*"); - } - - // Match feature set name to pattern - Pattern pattern = Pattern.compile(subName); - if (!pattern.matcher(featureSetName).matches()) { - continue; - } - return true; - } - - return false; - } - public static List parseStoreJsonList(List jsonList) throws InvalidProtocolBufferException { List stores = new ArrayList<>(); diff --git a/ingestion/src/main/java/feast/ingestion/values/FeatureSet.java b/ingestion/src/main/java/feast/ingestion/values/FeatureSet.java index f49ab2f98a..7395762f33 100644 --- a/ingestion/src/main/java/feast/ingestion/values/FeatureSet.java +++ b/ingestion/src/main/java/feast/ingestion/values/FeatureSet.java @@ -16,7 +16,7 @@ */ package feast.ingestion.values; -import static feast.ingestion.utils.SpecUtil.getFeatureSetReference; +import static feast.common.models.FeatureSet.getFeatureSetStringRef; import static feast.ingestion.utils.SpecUtil.getFieldsByName; import feast.proto.core.FeatureSetProto; @@ -35,7 +35,7 @@ public class FeatureSet implements Serializable { private final Map fields; public FeatureSet(FeatureSetProto.FeatureSetSpec featureSetSpec) { - this.reference = getFeatureSetReference(featureSetSpec); + this.reference = getFeatureSetStringRef(featureSetSpec); this.fields = getFieldsByName(featureSetSpec); } diff --git a/ingestion/src/test/java/feast/ingestion/ImportJobTest.java b/ingestion/src/test/java/feast/ingestion/ImportJobTest.java index 47b5aa3f9b..7f0806bd66 100644 --- a/ingestion/src/test/java/feast/ingestion/ImportJobTest.java +++ b/ingestion/src/test/java/feast/ingestion/ImportJobTest.java @@ -16,7 +16,7 @@ */ package feast.ingestion; -import static feast.ingestion.utils.SpecUtil.getFeatureSetReference; +import static feast.common.models.FeatureSet.getFeatureSetStringRef; import com.google.common.collect.ImmutableList; import com.google.common.io.Files; @@ -238,7 +238,7 @@ public void runPipeline_ShouldWriteToRedisCorrectlyGivenValidSpecAndFeatureRow() TestUtil.publishToKafka( KAFKA_BOOTSTRAP_SERVERS, KAFKA_SPECS_TOPIC, - ImmutableList.of(Pair.of(getFeatureSetReference(spec), spec)), + ImmutableList.of(Pair.of(getFeatureSetStringRef(spec), spec)), ByteArraySerializer.class, KAFKA_PUBLISH_TIMEOUT_SEC); TestUtil.publishToKafka( diff --git a/ingestion/src/test/java/feast/ingestion/transform/specs/FeatureSetSpecReadAndWriteTest.java b/ingestion/src/test/java/feast/ingestion/transform/specs/FeatureSetSpecReadAndWriteTest.java index b0cad59b5e..139d9b559a 100644 --- a/ingestion/src/test/java/feast/ingestion/transform/specs/FeatureSetSpecReadAndWriteTest.java +++ b/ingestion/src/test/java/feast/ingestion/transform/specs/FeatureSetSpecReadAndWriteTest.java @@ -16,7 +16,7 @@ */ package feast.ingestion.transform.specs; -import static feast.ingestion.utils.SpecUtil.getFeatureSetReference; +import static feast.common.models.FeatureSet.getFeatureSetStringRef; import static org.hamcrest.CoreMatchers.hasItem; import static org.hamcrest.MatcherAssert.assertThat; @@ -223,7 +223,7 @@ private void publishSpecToKafka( TestUtil.publishToKafka( KAFKA_BOOTSTRAP_SERVERS, KAFKA_SPECS_TOPIC, - ImmutableList.of(Pair.of(getFeatureSetReference(spec), spec)), + ImmutableList.of(Pair.of(getFeatureSetStringRef(spec), spec)), ByteArraySerializer.class, KAFKA_PUBLISH_TIMEOUT_SEC); } diff --git a/ingestion/src/test/java/feast/test/TestUtil.java b/ingestion/src/test/java/feast/test/TestUtil.java index 91a8b9ba7e..f3ae9f6a98 100644 --- a/ingestion/src/test/java/feast/test/TestUtil.java +++ b/ingestion/src/test/java/feast/test/TestUtil.java @@ -16,7 +16,7 @@ */ package feast.test; -import static feast.ingestion.utils.SpecUtil.getFeatureSetReference; +import static feast.common.models.FeatureSet.getFeatureSetStringRef; import com.google.common.collect.ImmutableList; import com.google.common.io.Files; @@ -220,7 +220,7 @@ public static FeatureRow createRandomFeatureRow( FeatureSetSpec featureSetSpec, int randomStringSize) { Builder builder = FeatureRow.newBuilder() - .setFeatureSet(getFeatureSetReference(featureSetSpec)) + .setFeatureSet(getFeatureSetStringRef(featureSetSpec)) .setEventTimestamp(Timestamps.fromMillis(System.currentTimeMillis())); featureSetSpec @@ -330,7 +330,7 @@ public static Value createRandomValue(ValueType.Enum type, int randomStringSize) */ public static RedisKey createRedisKey(FeatureSetSpec featureSetSpec, FeatureRow row) { RedisKey.Builder builder = - RedisKey.newBuilder().setFeatureSet(getFeatureSetReference(featureSetSpec)); + RedisKey.newBuilder().setFeatureSet(getFeatureSetStringRef(featureSetSpec)); featureSetSpec .getEntitiesList() .forEach( diff --git a/pom.xml b/pom.xml index 38ff13cf71..1bf07e5b4f 100644 --- a/pom.xml +++ b/pom.xml @@ -36,6 +36,7 @@ serving sdk/java docs/coverage/java + common diff --git a/sdk/java/pom.xml b/sdk/java/pom.xml index 7856afd47e..b52119f18e 100644 --- a/sdk/java/pom.xml +++ b/sdk/java/pom.xml @@ -28,6 +28,12 @@ ${project.version} + + dev.feast + feast-common + ${project.version} + + io.grpc diff --git a/sdk/java/src/main/java/com/gojek/feast/FeastClient.java b/sdk/java/src/main/java/com/gojek/feast/FeastClient.java index 11b4f352f6..00e0541e7c 100644 --- a/sdk/java/src/main/java/com/gojek/feast/FeastClient.java +++ b/sdk/java/src/main/java/com/gojek/feast/FeastClient.java @@ -16,6 +16,7 @@ */ package com.gojek.feast; +import feast.common.models.Feature; import feast.proto.serving.ServingAPIProto.FeatureReference; import feast.proto.serving.ServingAPIProto.GetFeastServingInfoRequest; import feast.proto.serving.ServingAPIProto.GetFeastServingInfoResponse; @@ -150,7 +151,7 @@ public List getOnlineFeatures( // Strip project from string Feature References from returned from serving FeatureReference featureRef = RequestUtil.parseFeatureRef(fieldName, true).build(); - stripFieldName = RequestUtil.renderFeatureRef(featureRef); + stripFieldName = Feature.getFeatureStringRef(featureRef, true); row.set( stripFieldName, fieldValues.getFieldsMap().get(fieldName), diff --git a/sdk/java/src/main/java/com/gojek/feast/RequestUtil.java b/sdk/java/src/main/java/com/gojek/feast/RequestUtil.java index 3a1a0919d3..08a2a53573 100644 --- a/sdk/java/src/main/java/com/gojek/feast/RequestUtil.java +++ b/sdk/java/src/main/java/com/gojek/feast/RequestUtil.java @@ -85,21 +85,4 @@ public static FeatureReference.Builder parseFeatureRef( featureRef.setName(featureRefString); return featureRef; } - - /** - * Render a feature reference as string. - * - * @param featureReference to render as string - * @return string represenation of feature reference. - */ - public static String renderFeatureRef(FeatureReference featureReference) { - String refStr = ""; - // In protov3, unset string and int fields default to "" and 0 respectively - if (!featureReference.getFeatureSet().isEmpty()) { - refStr += featureReference.getFeatureSet() + ":"; - } - refStr = refStr + featureReference.getName(); - - return refStr; - } } diff --git a/sdk/java/src/test/java/com/gojek/feast/RequestUtilTest.java b/sdk/java/src/test/java/com/gojek/feast/RequestUtilTest.java index 6064311555..0b34a4d4bb 100644 --- a/sdk/java/src/test/java/com/gojek/feast/RequestUtilTest.java +++ b/sdk/java/src/test/java/com/gojek/feast/RequestUtilTest.java @@ -21,6 +21,7 @@ import com.google.common.collect.ImmutableList; import com.google.protobuf.TextFormat; +import feast.common.models.Feature; import feast.proto.serving.ServingAPIProto.FeatureReference; import java.util.Arrays; import java.util.Comparator; @@ -75,7 +76,9 @@ void renderFeatureRef_ShouldReturnFeatureRefString( .map(ref -> ref.toBuilder().clearProject().build()) .collect(Collectors.toList()); List actual = - input.stream().map(ref -> RequestUtil.renderFeatureRef(ref)).collect(Collectors.toList()); + input.stream() + .map(ref -> Feature.getFeatureStringRef(ref, true)) + .collect(Collectors.toList()); assertEquals(expected.size(), actual.size()); for (int i = 0; i < expected.size(); i++) { assertEquals(expected.get(i), actual.get(i)); diff --git a/serving/pom.xml b/serving/pom.xml index 8304899b51..66a840e009 100644 --- a/serving/pom.xml +++ b/serving/pom.xml @@ -116,6 +116,12 @@ ${project.version} + + dev.feast + feast-common + ${project.version} + + org.slf4j diff --git a/serving/src/main/java/feast/serving/service/OnlineServingService.java b/serving/src/main/java/feast/serving/service/OnlineServingService.java index 6d581f6c71..dbf15877c5 100644 --- a/serving/src/main/java/feast/serving/service/OnlineServingService.java +++ b/serving/src/main/java/feast/serving/service/OnlineServingService.java @@ -18,6 +18,8 @@ import com.google.common.collect.ImmutableMap; import com.google.protobuf.Duration; +import feast.common.models.Feature; +import feast.common.models.FeatureSet; import feast.proto.serving.ServingAPIProto.*; import feast.proto.serving.ServingAPIProto.GetOnlineFeaturesRequest.EntityRow; import feast.proto.serving.ServingAPIProto.GetOnlineFeaturesResponse.FieldStatus; @@ -27,7 +29,6 @@ import feast.proto.types.ValueProto.Value; import feast.serving.specs.CachedSpecService; import feast.serving.util.Metrics; -import feast.serving.util.RefUtil; import feast.storage.api.retriever.FeatureSetRequest; import feast.storage.api.retriever.OnlineRetriever; import io.grpc.Status; @@ -174,7 +175,7 @@ private static Map unpackValueMap( Collectors.toMap( featureRowField -> { FeatureReference featureRef = nameRefMap.get(featureRowField.getName()); - return RefUtil.generateFeatureStringRef(featureRef); + return Feature.getFeatureStringRef(featureRef, false); }, featureRowField -> { // drop feature values with an age outside feature set's max age. @@ -187,7 +188,7 @@ private static Map unpackValueMap( // create empty values for features specified in request but not present in feature row. Set missingFeatures = nameRefMap.values().stream() - .map(ref -> RefUtil.generateFeatureStringRef(ref)) + .map(ref -> Feature.getFeatureStringRef(ref, false)) .collect(Collectors.toSet()); missingFeatures.removeAll(valueMap.keySet()); missingFeatures.forEach(refString -> valueMap.put(refString, Value.newBuilder().build())); @@ -264,7 +265,7 @@ private void logFeatureRowsTrace( FeatureRow.Builder nullFeatureRowBuilder = FeatureRow.newBuilder() .setFeatureSet( - RefUtil.generateFeatureSetStringRef(featureSetRequest.getSpec())); + FeatureSet.getFeatureSetStringRef(featureSetRequest.getSpec())); for (FeatureReference featureReference : featureSetRequest.getFeatureReferences()) { nullFeatureRowBuilder.addFields( diff --git a/serving/src/main/java/feast/serving/specs/CachedSpecService.java b/serving/src/main/java/feast/serving/specs/CachedSpecService.java index 07b0b8bbbd..ed5458d974 100644 --- a/serving/src/main/java/feast/serving/specs/CachedSpecService.java +++ b/serving/src/main/java/feast/serving/specs/CachedSpecService.java @@ -16,8 +16,8 @@ */ package feast.serving.specs; -import static feast.serving.util.RefUtil.generateFeatureSetStringRef; -import static feast.serving.util.RefUtil.generateFeatureStringRef; +import static feast.common.models.Feature.getFeatureStringRef; +import static feast.common.models.FeatureSet.getFeatureSetStringRef; import static java.util.stream.Collectors.groupingBy; import com.google.common.cache.CacheBuilder; @@ -117,18 +117,18 @@ public List getFeatureSets(List featureRefe featureReference -> { // map feature reference to coresponding feature set name String fsName = - featureToFeatureSetMapping.get(generateFeatureStringRef(featureReference)); + featureToFeatureSetMapping.get(getFeatureStringRef(featureReference, false)); if (fsName == null) { throw new SpecRetrievalException( String.format( "Unable to find Feature Set for the given Feature Reference: %s", - generateFeatureStringRef(featureReference))); + getFeatureStringRef(featureReference, false))); } else if (fsName == FEATURE_SET_CONFLICT_FLAG) { throw new SpecRetrievalException( String.format( "Given Feature Reference is amibigous as it matches multiple Feature Sets: %s." + "Please specify a more specific Feature Reference (ie specify the project or feature set)", - generateFeatureStringRef(featureReference))); + getFeatureStringRef(featureReference, false))); } return Pair.of(fsName, featureReference); }) @@ -206,7 +206,7 @@ private Map getFeatureSetMap() { for (FeatureSet featureSet : featureSetsResponse.getFeatureSetsList()) { FeatureSetSpec spec = featureSet.getSpec(); - featureSets.put(generateFeatureSetStringRef(spec), spec); + featureSets.put(getFeatureSetStringRef(spec), spec); } } catch (StatusRuntimeException e) { throw new RuntimeException( @@ -291,6 +291,6 @@ private Pair generateFeatureToFeatureSetMapping( featureRef = featureRef.clearFeatureSet(); } return Pair.of( - generateFeatureStringRef(featureRef.build()), generateFeatureSetStringRef(featureSetSpec)); + getFeatureStringRef(featureRef.build(), false), getFeatureSetStringRef(featureSetSpec)); } }