From a8d282d3e4f041824ef7479f22c306dbfb8ad569 Mon Sep 17 00:00:00 2001 From: Felix Wang Date: Mon, 13 Jun 2022 13:18:09 -0700 Subject: [PATCH] fix: Fix SparkKafkaProcessor `query_timeout` parameter (#2789) Signed-off-by: Felix Wang --- sdk/python/feast/infra/contrib/spark_kafka_processor.py | 4 +++- sdk/python/feast/infra/contrib/stream_processor.py | 2 +- sdk/python/feast/stream_feature_view.py | 2 ++ 3 files changed, 6 insertions(+), 2 deletions(-) diff --git a/sdk/python/feast/infra/contrib/spark_kafka_processor.py b/sdk/python/feast/infra/contrib/spark_kafka_processor.py index 1e228714c8..57361e5a18 100644 --- a/sdk/python/feast/infra/contrib/spark_kafka_processor.py +++ b/sdk/python/feast/infra/contrib/spark_kafka_processor.py @@ -17,6 +17,8 @@ class SparkProcessorConfig(ProcessorConfig): spark_session: SparkSession + processing_time: str + query_timeout: int class SparkKafkaProcessor(StreamProcessor): @@ -31,7 +33,7 @@ def __init__( config: ProcessorConfig, write_function: MethodType, processing_time: str = "30 seconds", - query_timeout: str = "15 seconds", + query_timeout: int = 15, ): if not isinstance(sfv.stream_source, KafkaSource): raise ValueError("data source is not kafka source") diff --git a/sdk/python/feast/infra/contrib/stream_processor.py b/sdk/python/feast/infra/contrib/stream_processor.py index cb44b99cd6..2ccf1e59f8 100644 --- a/sdk/python/feast/infra/contrib/stream_processor.py +++ b/sdk/python/feast/infra/contrib/stream_processor.py @@ -81,7 +81,7 @@ def get_stream_processor_object( if config.mode == "spark" and config.source == "kafka": stream_processor = STREAM_PROCESSOR_CLASS_FOR_TYPE[("spark", "kafka")] module_name, class_name = stream_processor.rsplit(".", 1) - cls = import_class(module_name, class_name, "Processor") + cls = import_class(module_name, class_name, "StreamProcessor") return cls(sfv=sfv, config=config, write_function=write_function,) else: raise ValueError("other processors besides spark-kafka not supported") diff --git a/sdk/python/feast/stream_feature_view.py b/sdk/python/feast/stream_feature_view.py index 214ab083ab..3bd525596b 100644 --- a/sdk/python/feast/stream_feature_view.py +++ b/sdk/python/feast/stream_feature_view.py @@ -132,10 +132,12 @@ def __eq__(self, other): if not super().__eq__(other): return False + if not self.udf: return not other.udf if not other.udf: return False + if ( self.mode != other.mode or self.timestamp_field != other.timestamp_field