From 732771469907a2f129bd950c9eef80e1c8c34e7e Mon Sep 17 00:00:00 2001
From: Oleksii Moskalenko
Date: Thu, 28 Jan 2021 11:38:56 +0800
Subject: [PATCH 1/2] use data partitioning column in FileSource

Signed-off-by: Oleksii Moskalenko
---
 .../feast/ingestion/sources/file/FileReader.scala    | 12 ++++++++++--
 .../test/scala/feast/ingestion/BatchPipelineIT.scala |  2 +-
 2 files changed, 11 insertions(+), 3 deletions(-)

diff --git a/spark/ingestion/src/main/scala/feast/ingestion/sources/file/FileReader.scala b/spark/ingestion/src/main/scala/feast/ingestion/sources/file/FileReader.scala
index 5bad029b5b..99660cc5ac 100644
--- a/spark/ingestion/src/main/scala/feast/ingestion/sources/file/FileReader.scala
+++ b/spark/ingestion/src/main/scala/feast/ingestion/sources/file/FileReader.scala
@@ -16,7 +16,7 @@
  */
 package feast.ingestion.sources.file
 
-import java.sql.Timestamp
+import java.sql.{Timestamp, Date}
 
 import feast.ingestion.FileSource
 import org.apache.spark.sql.functions.col
@@ -30,9 +30,17 @@ object FileReader {
       start: DateTime,
       end: DateTime
   ): DataFrame = {
-    sqlContext.read
+    val reader = sqlContext.read
       .parquet(source.path)
       .filter(col(source.eventTimestampColumn) >= new Timestamp(start.getMillis))
       .filter(col(source.eventTimestampColumn) < new Timestamp(end.getMillis))
+
+    if (source.datePartitionColumn.nonEmpty) {
+      reader
+        .filter(col(source.datePartitionColumn.get) >= new Date(start.getMillis))
+        .filter(col(source.datePartitionColumn.get) <= new Date(end.getMillis))
+    } else {
+      reader
+    }
   }
 }
diff --git a/spark/ingestion/src/test/scala/feast/ingestion/BatchPipelineIT.scala b/spark/ingestion/src/test/scala/feast/ingestion/BatchPipelineIT.scala
index 0e9a96eb42..e0f57f6f0d 100644
--- a/spark/ingestion/src/test/scala/feast/ingestion/BatchPipelineIT.scala
+++ b/spark/ingestion/src/test/scala/feast/ingestion/BatchPipelineIT.scala
@@ -110,7 +110,7 @@ class BatchPipelineIT extends SparkSpec with ForAllTestContainer {
     val rows = generateDistinctRows(gen, 10000, groupByEntity)
     val tempPath = storeAsParquet(sparkSession, rows)
     val configWithOfflineSource = config.copy(
-      source = FileSource(tempPath, Map.empty, "eventTimestamp")
+      source = FileSource(tempPath, Map.empty, "eventTimestamp", datePartitionColumn = Some("date"))
     )
 
     BatchPipeline.createPipeline(sparkSession, configWithOfflineSource)

From 8e2453ad028c90a1a1746d392ce348e6818d809c Mon Sep 17 00:00:00 2001
From: Oleksii Moskalenko
Date: Thu, 28 Jan 2021 12:13:41 +0800
Subject: [PATCH 2/2] check column non empty

Signed-off-by: Oleksii Moskalenko
---
 .../feast/ingestion/sources/file/FileReader.scala | 12 ++++++------
 1 file changed, 6 insertions(+), 6 deletions(-)

diff --git a/spark/ingestion/src/main/scala/feast/ingestion/sources/file/FileReader.scala b/spark/ingestion/src/main/scala/feast/ingestion/sources/file/FileReader.scala
index 99660cc5ac..55d5c901ea 100644
--- a/spark/ingestion/src/main/scala/feast/ingestion/sources/file/FileReader.scala
+++ b/spark/ingestion/src/main/scala/feast/ingestion/sources/file/FileReader.scala
@@ -35,12 +35,12 @@ object FileReader {
       .filter(col(source.eventTimestampColumn) >= new Timestamp(start.getMillis))
       .filter(col(source.eventTimestampColumn) < new Timestamp(end.getMillis))
 
-    if (source.datePartitionColumn.nonEmpty) {
-      reader
-        .filter(col(source.datePartitionColumn.get) >= new Date(start.getMillis))
-        .filter(col(source.datePartitionColumn.get) <= new Date(end.getMillis))
-    } else {
-      reader
+    source.datePartitionColumn match {
+      case Some(partitionColumn) if partitionColumn.nonEmpty =>
+        reader
+          .filter(col(partitionColumn) >= new Date(start.getMillis))
+          .filter(col(partitionColumn) <= new Date(end.getMillis))
+      case _ => reader
     }
   }
 }
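
Note for readers: the patches above show only deltas, so below is a minimal standalone sketch of the read path as it stands after PATCH 2/2. `PartitionedFileSource` and `readParquet` are illustrative stand-ins, not Feast's actual types or method names (the real code lives in `feast.ingestion.FileSource` and `FileReader`, whose full signatures are not visible in the diff); only the fields the reader actually touches are modeled.

    import java.sql.{Date, Timestamp}

    import org.apache.spark.sql.{DataFrame, SQLContext}
    import org.apache.spark.sql.functions.col
    import org.joda.time.DateTime

    // Illustrative stand-in for feast.ingestion.FileSource: only the fields
    // used by the read logic are modeled here.
    case class PartitionedFileSource(
        path: String,
        eventTimestampColumn: String,
        datePartitionColumn: Option[String] = None
    )

    def readParquet(
        sqlContext: SQLContext,
        source: PartitionedFileSource,
        start: DateTime,
        end: DateTime
    ): DataFrame = {
      // Event-time filter: half-open interval [start, end).
      val reader = sqlContext.read
        .parquet(source.path)
        .filter(col(source.eventTimestampColumn) >= new Timestamp(start.getMillis))
        .filter(col(source.eventTimestampColumn) < new Timestamp(end.getMillis))

      // Partition filter from PATCH 2/2: the guard treats an empty column name
      // like an unset one. The upper bound is inclusive (<=) because the
      // partition column is day-granular: rows from the day `end` falls on must
      // not be pruned away before the timestamp filter sees them.
      source.datePartitionColumn match {
        case Some(partitionColumn) if partitionColumn.nonEmpty =>
          reader
            .filter(col(partitionColumn) >= new Date(start.getMillis))
            .filter(col(partitionColumn) <= new Date(end.getMillis))
        case _ => reader
      }
    }

Filtering on the physical partition column (e.g. a Hive-style date=YYYY-MM-DD layout) lets Spark prune whole partition directories at planning time instead of scanning every file and relying on the eventTimestamp predicate alone, which is the point of PATCH 1/2; PATCH 2/2 then replaces the `.nonEmpty`/`.get` pair on the Option with a match whose guard also rejects an explicitly empty column name.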