
Commit

merge from master
achals committed Jun 13, 2022
2 parents 8d939f7 + a49cc35 commit b789593
Showing 17 changed files with 495 additions and 62 deletions.
2 changes: 1 addition & 1 deletion .readthedocs.yml
@@ -7,6 +7,6 @@ formats:
- pdf

python:
version: 3.7
version: "3.8"
install:
- requirements: sdk/python/docs/requirements.txt
@@ -19,16 +19,13 @@
import com.google.auto.value.AutoValue;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.gson.JsonElement;
import com.google.gson.JsonParser;
import com.google.gson.JsonSerializationContext;
import com.google.gson.JsonSerializer;
import com.google.protobuf.Empty;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.Message;
import com.google.protobuf.util.JsonFormat;
import io.grpc.Status.Code;
import java.lang.reflect.Type;
import java.util.UUID;

/** MessageAuditLogEntry records the handling of a Protobuf message by a service call. */
@@ -103,20 +100,17 @@ public String toJSON() {
new GsonBuilder()
.registerTypeAdapter(
Message.class,
new JsonSerializer<Message>() {
@Override
public JsonElement serialize(
Message message, Type type, JsonSerializationContext context) {
try {
String messageJSON = JsonFormat.printer().print(message);
return new JsonParser().parse(messageJSON);
} catch (InvalidProtocolBufferException e) {

throw new RuntimeException(
"Unexpected exception converting Protobuf to JSON", e);
}
}
})
(JsonSerializer<Message>)
(message, type, context) -> {
try {
String messageJSON = JsonFormat.printer().print(message);
return new JsonParser().parse(messageJSON);
} catch (InvalidProtocolBufferException e) {

throw new RuntimeException(
"Unexpected exception converting Protobuf to JSON", e);
}
})
.create();
return gson.toJson(this);
}
@@ -38,7 +38,7 @@
* GrpcMessageInterceptor assumes that all service calls are unary (ie single request/response).
*/
public class GrpcMessageInterceptor implements ServerInterceptor {
private LoggingProperties loggingProperties;
private final LoggingProperties loggingProperties;

/**
* Construct GrpcMessageInterceptor.
@@ -78,7 +78,7 @@ public <ReqT, RespT> Listener<ReqT> interceptCall(

// Register forwarding call to intercept outgoing response and log to audit log
call =
new SimpleForwardingServerCall<ReqT, RespT>(call) {
new SimpleForwardingServerCall<>(call) {
@Override
public void sendMessage(RespT message) {
// 2. Track the response & Log entry to audit logger
@@ -97,7 +97,7 @@ public void close(Status status, Metadata trailers) {
};

ServerCall.Listener<ReqT> listener = next.startCall(call, headers);
return new SimpleForwardingServerCallListener<ReqT>(listener) {
return new SimpleForwardingServerCallListener<>(listener) {
@Override
// Register listener to intercept incoming request messages and log to audit log
public void onMessage(ReqT message) {
4 changes: 1 addition & 3 deletions java/sdk/src/main/java/dev/feast/RequestUtil.java
@@ -35,9 +35,7 @@ public static List<FeatureReferenceV2> createFeatureRefs(List<String> featureRef
}

List<FeatureReferenceV2> featureRefs =
featureRefStrings.stream()
.map(refStr -> parseFeatureRef(refStr))
.collect(Collectors.toList());
featureRefStrings.stream().map(RequestUtil::parseFeatureRef).collect(Collectors.toList());

return featureRefs;
}
@@ -24,7 +24,7 @@
import java.util.Optional;

public class LocalRegistryFile implements RegistryFile {
private RegistryProto.Registry cachedRegistry;
private final RegistryProto.Registry cachedRegistry;

public LocalRegistryFile(String path) {
try {
13 changes: 8 additions & 5 deletions java/serving/src/main/java/feast/serving/registry/Registry.java
@@ -17,6 +17,9 @@
package feast.serving.registry;

import feast.proto.core.*;
import feast.proto.core.FeatureServiceProto.FeatureService;
import feast.proto.core.FeatureViewProto.FeatureView;
import feast.proto.core.OnDemandFeatureViewProto.OnDemandFeatureView;
import feast.proto.serving.ServingAPIProto;
import feast.serving.exception.SpecRetrievalException;
import java.util.List;
@@ -26,24 +29,24 @@

public class Registry {
private final RegistryProto.Registry registry;
private Map<String, FeatureViewProto.FeatureViewSpec> featureViewNameToSpec;
private final Map<String, FeatureViewProto.FeatureViewSpec> featureViewNameToSpec;
private Map<String, OnDemandFeatureViewProto.OnDemandFeatureViewSpec>
onDemandFeatureViewNameToSpec;
private Map<String, FeatureServiceProto.FeatureServiceSpec> featureServiceNameToSpec;
private final Map<String, FeatureServiceProto.FeatureServiceSpec> featureServiceNameToSpec;

Registry(RegistryProto.Registry registry) {
this.registry = registry;
List<FeatureViewProto.FeatureViewSpec> featureViewSpecs =
registry.getFeatureViewsList().stream()
.map(fv -> fv.getSpec())
.map(FeatureView::getSpec)
.collect(Collectors.toList());
this.featureViewNameToSpec =
featureViewSpecs.stream()
.collect(
Collectors.toMap(FeatureViewProto.FeatureViewSpec::getName, Function.identity()));
List<OnDemandFeatureViewProto.OnDemandFeatureViewSpec> onDemandFeatureViewSpecs =
registry.getOnDemandFeatureViewsList().stream()
.map(odfv -> odfv.getSpec())
.map(OnDemandFeatureView::getSpec)
.collect(Collectors.toList());
this.onDemandFeatureViewNameToSpec =
onDemandFeatureViewSpecs.stream()
@@ -53,7 +56,7 @@ public class Registry {
Function.identity()));
this.featureServiceNameToSpec =
registry.getFeatureServicesList().stream()
.map(fs -> fs.getSpec())
.map(FeatureService::getSpec)
.collect(
Collectors.toMap(
FeatureServiceProto.FeatureServiceSpec::getName, Function.identity()));
@@ -239,8 +239,7 @@ public void processTransformFeaturesResponse(
} catch (IOException e) {
log.info(e.toString());
throw Status.INTERNAL
.withDescription(
"Unable to correctly process transform features response: " + e.toString())
.withDescription("Unable to correctly process transform features response: " + e)
.asRuntimeException();
}
}
@@ -249,11 +248,10 @@ public void processTransformFeaturesResponse(
public ValueType serializeValuesIntoArrowIPC(List<Pair<String, List<ValueProto.Value>>> values) {
// In order to be serialized correctly, the data must be packaged in a VectorSchemaRoot.
// We first construct all the columns.
Map<String, FieldVector> columnNameToColumn = new HashMap<String, FieldVector>();
BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE);

List<Field> columnFields = new ArrayList<Field>();
List<FieldVector> columns = new ArrayList<FieldVector>();
List<Field> columnFields = new ArrayList<>();
List<FieldVector> columns = new ArrayList<>();

for (Pair<String, List<ValueProto.Value>> columnEntry : values) {
// The Python FTS does not expect full feature names, so we extract the feature name.
@@ -316,8 +314,7 @@ public ValueType serializeValuesIntoArrowIPC(List<Pair<String, List<ValueProto.V
} catch (IOException e) {
log.info(e.toString());
throw Status.INTERNAL
.withDescription(
"ArrowFileWriter could not write properly; failed with error: " + e.toString())
.withDescription("ArrowFileWriter could not write properly; failed with error: " + e)
.asRuntimeException();
}
byte[] byteData = out.toByteArray();
@@ -149,7 +149,8 @@ public ValueProto.Value getFeatureValue(ValueProto.ValueType.Enum valueType) {
.build();
break;
default:
throw new RuntimeException("FeatureType is not supported");
throw new RuntimeException(
String.format("FeatureType %s is not supported", valueType.name()));
}
} catch (ClassCastException e) {
// Feature type has changed
2 changes: 1 addition & 1 deletion sdk/python/docs/requirements.txt
@@ -1 +1 @@
-e ".[ci]"
-e ".[docs]"
2 changes: 1 addition & 1 deletion sdk/python/feast/data_source.py
@@ -503,7 +503,7 @@ def __hash__(self):
@staticmethod
def from_proto(data_source: DataSourceProto):
watermark = None
if data_source.kafka_options.HasField("watermark"):
if data_source.kafka_options.watermark:
watermark = (
timedelta(days=0)
if data_source.kafka_options.watermark.ToNanoseconds() == 0
137 changes: 137 additions & 0 deletions sdk/python/feast/infra/contrib/spark_kafka_processor.py
@@ -0,0 +1,137 @@
from types import MethodType
from typing import List

from pyspark.sql import DataFrame, SparkSession
from pyspark.sql.avro.functions import from_avro
from pyspark.sql.functions import col, from_json

from feast.data_format import AvroFormat, JsonFormat
from feast.data_source import KafkaSource
from feast.infra.contrib.stream_processor import (
ProcessorConfig,
StreamProcessor,
StreamTable,
)
from feast.stream_feature_view import StreamFeatureView


class SparkProcessorConfig(ProcessorConfig):
spark_session: SparkSession


class SparkKafkaProcessor(StreamProcessor):
spark: SparkSession
format: str
write_function: MethodType
join_keys: List[str]

def __init__(
self,
sfv: StreamFeatureView,
config: ProcessorConfig,
write_function: MethodType,
processing_time: str = "30 seconds",
query_timeout: str = "15 seconds",
):
if not isinstance(sfv.stream_source, KafkaSource):
raise ValueError("data source is not kafka source")
if not isinstance(
sfv.stream_source.kafka_options.message_format, AvroFormat
) and not isinstance(
sfv.stream_source.kafka_options.message_format, JsonFormat
):
raise ValueError(
"spark streaming currently only supports json or avro format for kafka source schema"
)

self.format = (
"json"
if isinstance(sfv.stream_source.kafka_options.message_format, JsonFormat)
else "avro"
)

if not isinstance(config, SparkProcessorConfig):
raise ValueError("config is not spark processor config")
self.spark = config.spark_session
self.write_function = write_function
self.processing_time = processing_time
self.query_timeout = query_timeout
super().__init__(sfv=sfv, data_source=sfv.stream_source)

def ingest_stream_feature_view(self) -> None:
ingested_stream_df = self._ingest_stream_data()
transformed_df = self._construct_transformation_plan(ingested_stream_df)
online_store_query = self._write_to_online_store(transformed_df)
return online_store_query

def _ingest_stream_data(self) -> StreamTable:
"""Only supports json and avro formats currently."""
if self.format == "json":
if not isinstance(
self.data_source.kafka_options.message_format, JsonFormat
):
raise ValueError("kafka source message format is not jsonformat")
stream_df = (
self.spark.readStream.format("kafka")
.option(
"kafka.bootstrap.servers",
self.data_source.kafka_options.bootstrap_servers,
)
.option("subscribe", self.data_source.kafka_options.topic)
.option("startingOffsets", "latest") # Query start
.load()
.selectExpr("CAST(value AS STRING)")
.select(
from_json(
col("value"),
self.data_source.kafka_options.message_format.schema_json,
).alias("table")
)
.select("table.*")
)
else:
if not isinstance(
self.data_source.kafka_options.message_format, AvroFormat
):
raise ValueError("kafka source message format is not avro format")
stream_df = (
self.spark.readStream.format("kafka")
.option(
"kafka.bootstrap.servers",
self.data_source.kafka_options.bootstrap_servers,
)
.option("subscribe", self.data_source.kafka_options.topic)
.option("startingOffsets", "latest") # Query start
.load()
.selectExpr("CAST(value AS STRING)")
.select(
from_avro(
col("value"),
self.data_source.kafka_options.message_format.schema_json,
).alias("table")
)
.select("table.*")
)
return stream_df

def _construct_transformation_plan(self, df: StreamTable) -> StreamTable:
return self.sfv.udf.__call__(df) if self.sfv.udf else df

def _write_to_online_store(self, df: StreamTable):
# Validation occurs at the fs.write_to_online_store() phase against the stream feature view schema.
def batch_write(row: DataFrame, batch_id: int):
pd_row = row.toPandas()
self.write_function(
pd_row, input_timestamp="event_timestamp", output_timestamp=""
)

query = (
df.writeStream.outputMode("update")
.option("checkpointLocation", "/tmp/checkpoint/")
.trigger(processingTime=self.processing_time)
.foreachBatch(batch_write)
.start()
)

query.awaitTermination(timeout=self.query_timeout)
return query
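
A minimal usage sketch for the new processor, for orientation only: it assumes an existing SparkSession, a StreamFeatureView named driver_stats_sfv backed by a KafkaSource, and a write callback matching the (pandas DataFrame, input_timestamp, output_timestamp) call the processor makes per micro-batch. These names, and any ProcessorConfig fields beyond spark_session, are illustrative and not part of this commit.

from pyspark.sql import SparkSession

from feast.infra.contrib.spark_kafka_processor import (
    SparkKafkaProcessor,
    SparkProcessorConfig,
)

spark = SparkSession.builder.master("local[*]").appName("feast-stream").getOrCreate()


def write_batch(rows, input_timestamp: str, output_timestamp: str):
    # Stand-in for the feature store's online-store write path; the processor
    # passes a pandas DataFrame converted from each micro-batch.
    print(rows.head())


# Hypothetical config: spark_session is the field added by SparkProcessorConfig;
# any other ProcessorConfig fields are omitted/assumed here.
config = SparkProcessorConfig(spark_session=spark)

processor = SparkKafkaProcessor(
    sfv=driver_stats_sfv,  # hypothetical StreamFeatureView with a Kafka stream source
    config=config,
    write_function=write_batch,
    processing_time="30 seconds",
    query_timeout="15 seconds",
)

# Starts the structured-streaming query via foreachBatch, waits up to
# query_timeout, and returns the StreamingQuery handle.
query = processor.ingest_stream_feature_view()

Because the processor writes with foreachBatch and toPandas(), the callback receives plain pandas data rather than a streaming DataFrame, which is why a simple function like write_batch above suffices for the sketch.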