Skip to content

Commit

Permalink
[SPARK-36034][SQL][3.0] Rebase datetime in pushed down filters to par…
Browse files Browse the repository at this point in the history
…quet

### What changes were proposed in this pull request?
In the PR, I propose to propagate either the SQL config `spark.sql.parquet.datetimeRebaseModeInRead` or/and Parquet option `datetimeRebaseMode` to `ParquetFilters`. The `ParquetFilters` class uses the settings in conversions of dates/timestamps instances from datasource filters to values pushed via `FilterApi` to the `parquet-column` lib.

Before the changes, date/timestamp values expressed as days/microseconds/milliseconds are interpreted as offsets in Proleptic Gregorian calendar, and pushed to the parquet library as is. That works fine if timestamp/dates values in parquet files were saved in the `CORRECTED` mode but in the `LEGACY` mode, filter's values could not match to actual values.

After the changes, timestamp/dates values of filters pushed down to parquet libs such as `FilterApi.eq(col1, -719162)` are rebased according the rebase settings. For the example, if the rebase mode is `CORRECTED`, **-719162** is pushed down as is but if the current rebase mode is `LEGACY`, the number of days is rebased to **-719164**. For more context, the PR description #28067 shows the diffs between two calendars.

### Why are the changes needed?
The changes fix the bug portrayed by the following example from SPARK-36034:
```scala
In [27]: spark.conf.set("spark.sql.legacy.parquet.datetimeRebaseModeInWrite", "LEGACY")
>>> spark.sql("SELECT DATE '0001-01-01' AS date").write.mode("overwrite").parquet("date_written_by_spark3_legacy")
>>> spark.read.parquet("date_written_by_spark3_legacy").where("date = '0001-01-01'").show()
+----+
|date|
+----+
+----+
```
The result must have the date value `0001-01-01`.

### Does this PR introduce _any_ user-facing change?
In some sense, yes. Query results can be different in some cases. For the example above:
```scala
scala> spark.conf.set("spark.sql.parquet.datetimeRebaseModeInWrite", "LEGACY")
scala> spark.sql("SELECT DATE '0001-01-01' AS date").write.mode("overwrite").parquet("date_written_by_spark3_legacy")
scala> spark.read.parquet("date_written_by_spark3_legacy").where("date = '0001-01-01'").show(false)
+----------+
|date      |
+----------+
|0001-01-01|
+----------+
```

### How was this patch tested?
By running the modified test suite `ParquetFilterSuite`:
```
$ build/sbt "test:testOnly *ParquetV1FilterSuite"
$ build/sbt "test:testOnly *ParquetV2FilterSuite"
```

Authored-by: Max Gekk <max.gekkgmail.com>
Signed-off-by: Hyukjin Kwon <gurwls223apache.org>
(cherry picked from commit b09b7f7)
(cherry picked from commit ba71172)

Closes #33387 from MaxGekk/fix-parquet-ts-filter-pushdown-3.0.

Authored-by: Max Gekk <max.gekk@gmail.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
  • Loading branch information
MaxGekk authored and HyukjinKwon committed Jul 16, 2021
1 parent 86a9059 commit e41ecef
Show file tree
Hide file tree
Showing 5 changed files with 134 additions and 86 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -270,11 +270,21 @@ class ParquetFileFormat

lazy val footerFileMetaData =
ParquetFileReader.readFooter(sharedConf, filePath, SKIP_ROW_GROUPS).getFileMetaData
val datetimeRebaseMode = DataSourceUtils.datetimeRebaseMode(
footerFileMetaData.getKeyValueMetaData.get,
SQLConf.get.getConf(SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_READ))
// Try to push down filters when filter push-down is enabled.
val pushed = if (enableParquetFilterPushDown) {
val parquetSchema = footerFileMetaData.getSchema
val parquetFilters = new ParquetFilters(parquetSchema, pushDownDate, pushDownTimestamp,
pushDownDecimal, pushDownStringStartWith, pushDownInFilterThreshold, isCaseSensitive)
val parquetFilters = new ParquetFilters(
parquetSchema,
pushDownDate,
pushDownTimestamp,
pushDownDecimal,
pushDownStringStartWith,
pushDownInFilterThreshold,
isCaseSensitive,
datetimeRebaseMode)
filters
// Collects all converted Parquet filter predicates. Notice that not all predicates can be
// converted (`ParquetFilters.createFilter` returns an `Option`). That's why a `flatMap`
Expand All @@ -300,10 +310,6 @@ class ParquetFileFormat
None
}

val datetimeRebaseMode = DataSourceUtils.datetimeRebaseMode(
footerFileMetaData.getKeyValueMetaData.get,
SQLConf.get.getConf(SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_READ))

val attemptId = new TaskAttemptID(new TaskID(new JobID(), TaskType.MAP, 0), 0)
val hadoopAttemptContext =
new TaskAttemptContextImpl(broadcastedHadoopConf.value.value, attemptId)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName._

import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils}
import org.apache.spark.sql.catalyst.util.DateTimeUtils.SQLDate
import org.apache.spark.sql.catalyst.util.RebaseDateTime.{rebaseGregorianToJulianDays, rebaseGregorianToJulianMicros}
import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy
import org.apache.spark.sql.sources
import org.apache.spark.unsafe.types.UTF8String

Expand All @@ -48,7 +50,8 @@ class ParquetFilters(
pushDownDecimal: Boolean,
pushDownStartWith: Boolean,
pushDownInFilterThreshold: Int,
caseSensitive: Boolean) {
caseSensitive: Boolean,
datetimeRebaseMode: LegacyBehaviorPolicy.Value) {
// A map which contains parquet field name and data type, if predicate push down applies.
//
// Each key in `nameToParquetField` represents a column; `dots` are used as separators for
Expand Down Expand Up @@ -124,14 +127,26 @@ class ParquetFilters(
private val ParquetTimestampMicrosType = ParquetSchemaType(TIMESTAMP_MICROS, INT64, 0, null)
private val ParquetTimestampMillisType = ParquetSchemaType(TIMESTAMP_MILLIS, INT64, 0, null)

private def dateToDays(date: Any): SQLDate = date match {
case d: Date => DateTimeUtils.fromJavaDate(d)
case ld: LocalDate => DateTimeUtils.localDateToDays(ld)
private def dateToDays(date: Any): SQLDate = {
val gregorianDays = date match {
case d: Date => DateTimeUtils.fromJavaDate(d)
case ld: LocalDate => DateTimeUtils.localDateToDays(ld)
}
datetimeRebaseMode match {
case LegacyBehaviorPolicy.LEGACY => rebaseGregorianToJulianDays(gregorianDays)
case _ => gregorianDays
}
}

private def timestampToMicros(v: Any): JLong = v match {
case i: Instant => DateTimeUtils.instantToMicros(i)
case t: Timestamp => DateTimeUtils.fromJavaTimestamp(t)
private def timestampToMicros(v: Any): JLong = {
val gregorianMicros = v match {
case i: Instant => DateTimeUtils.instantToMicros(i)
case t: Timestamp => DateTimeUtils.fromJavaTimestamp(t)
}
datetimeRebaseMode match {
case LegacyBehaviorPolicy.LEGACY => rebaseGregorianToJulianMicros(gregorianMicros)
case _ => gregorianMicros
}
}

private def decimalToInt32(decimal: JBigDecimal): Integer = decimal.unscaledValue().intValue()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,11 +134,21 @@ case class ParquetPartitionReaderFactory(

lazy val footerFileMetaData =
ParquetFileReader.readFooter(conf, filePath, SKIP_ROW_GROUPS).getFileMetaData
val datetimeRebaseMode = DataSourceUtils.datetimeRebaseMode(
footerFileMetaData.getKeyValueMetaData.get,
SQLConf.get.getConf(SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_READ))
// Try to push down filters when filter push-down is enabled.
val pushed = if (enableParquetFilterPushDown) {
val parquetSchema = footerFileMetaData.getSchema
val parquetFilters = new ParquetFilters(parquetSchema, pushDownDate, pushDownTimestamp,
pushDownDecimal, pushDownStringStartWith, pushDownInFilterThreshold, isCaseSensitive)
val parquetFilters = new ParquetFilters(
parquetSchema,
pushDownDate,
pushDownTimestamp,
pushDownDecimal,
pushDownStringStartWith,
pushDownInFilterThreshold,
isCaseSensitive,
datetimeRebaseMode)
filters
// Collects all converted Parquet filter predicates. Notice that not all predicates can be
// converted (`ParquetFilters.createFilter` returns an `Option`). That's why a `flatMap`
Expand Down Expand Up @@ -171,9 +181,6 @@ case class ParquetPartitionReaderFactory(
if (pushed.isDefined) {
ParquetInputFormat.setFilterPredicate(hadoopAttemptContext.getConfiguration, pushed.get)
}
val datetimeRebaseMode = DataSourceUtils.datetimeRebaseMode(
footerFileMetaData.getKeyValueMetaData.get,
SQLConf.get.getConf(SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_READ))
val reader = buildReaderFunc(
split, file.partitionValues, hadoopAttemptContext, pushed, convertTz, datetimeRebaseMode)
reader.initialize(split, hadoopAttemptContext)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import org.apache.spark.sql.connector.read.{Scan, SupportsPushDownFilters}
import org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex
import org.apache.spark.sql.execution.datasources.parquet.{ParquetFilters, SparkToParquetSchemaConverter}
import org.apache.spark.sql.execution.datasources.v2.FileScanBuilder
import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy
import org.apache.spark.sql.sources.Filter
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.CaseInsensitiveStringMap
Expand Down Expand Up @@ -51,8 +52,17 @@ case class ParquetScanBuilder(
val isCaseSensitive = sqlConf.caseSensitiveAnalysis
val parquetSchema =
new SparkToParquetSchemaConverter(sparkSession.sessionState.conf).convert(schema)
val parquetFilters = new ParquetFilters(parquetSchema, pushDownDate, pushDownTimestamp,
pushDownDecimal, pushDownStringStartWith, pushDownInFilterThreshold, isCaseSensitive)
val parquetFilters = new ParquetFilters(
parquetSchema,
pushDownDate,
pushDownTimestamp,
pushDownDecimal,
pushDownStringStartWith,
pushDownInFilterThreshold,
isCaseSensitive,
// The rebase mode doesn't matter here because the filters are used to determine
// whether they is convertible.
LegacyBehaviorPolicy.CORRECTED)
parquetFilters.convertibleFilters(this.filters).toArray
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,9 @@ import org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanRelation
import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetScan
import org.apache.spark.sql.functions._
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.internal.SQLConf.ParquetOutputTimestampType
import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy
import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy.{CORRECTED, LEGACY}
import org.apache.spark.sql.internal.SQLConf.ParquetOutputTimestampType.{INT96, TIMESTAMP_MICROS, TIMESTAMP_MILLIS}
import org.apache.spark.sql.test.SharedSparkSession
import org.apache.spark.sql.types._
import org.apache.spark.tags.ExtendedSQLTest
Expand Down Expand Up @@ -70,11 +72,14 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared

protected def createParquetFilters(
schema: MessageType,
caseSensitive: Option[Boolean] = None): ParquetFilters =
caseSensitive: Option[Boolean] = None,
datetimeRebaseMode: LegacyBehaviorPolicy.Value = LegacyBehaviorPolicy.CORRECTED
): ParquetFilters =
new ParquetFilters(schema, conf.parquetFilterPushDownDate, conf.parquetFilterPushDownTimestamp,
conf.parquetFilterPushDownDecimal, conf.parquetFilterPushDownStringStartWith,
conf.parquetFilterPushDownInFilterThreshold,
caseSensitive.getOrElse(conf.caseSensitiveAnalysis))
caseSensitive.getOrElse(conf.caseSensitiveAnalysis),
datetimeRebaseMode)

override def beforeEach(): Unit = {
super.beforeEach()
Expand Down Expand Up @@ -548,97 +553,102 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared
def date: Date = Date.valueOf(s)
}

val data = Seq("2018-03-18", "2018-03-19", "2018-03-20", "2018-03-21")
val data = Seq("1000-01-01", "2018-03-19", "2018-03-20", "2018-03-21")
import testImplicits._

Seq(false, true).foreach { java8Api =>
withSQLConf(SQLConf.DATETIME_JAVA8API_ENABLED.key -> java8Api.toString) {
val dates = data.map(i => Tuple1(Date.valueOf(i))).toDF()
withNestedParquetDataFrame(dates) { case (inputDF, colName, fun) =>
implicit val df: DataFrame = inputDF
Seq(CORRECTED, LEGACY).foreach { rebaseMode =>
withSQLConf(
SQLConf.DATETIME_JAVA8API_ENABLED.key -> java8Api.toString,
SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_WRITE.key -> rebaseMode.toString) {
val dates = data.map(i => Tuple1(Date.valueOf(i))).toDF()
withNestedParquetDataFrame(dates) { case (inputDF, colName, fun) =>
implicit val df: DataFrame = inputDF

def resultFun(dateStr: String): Any = {
val parsed = if (java8Api) LocalDate.parse(dateStr) else Date.valueOf(dateStr)
fun(parsed)
}

def resultFun(dateStr: String): Any = {
val parsed = if (java8Api) LocalDate.parse(dateStr) else Date.valueOf(dateStr)
fun(parsed)
val dateAttr: Expression = df(colName).expr
assert(df(colName).expr.dataType === DateType)

checkFilterPredicate(dateAttr.isNull, classOf[Eq[_]], Seq.empty[Row])
checkFilterPredicate(dateAttr.isNotNull, classOf[NotEq[_]],
data.map(i => Row.apply(resultFun(i))))

checkFilterPredicate(dateAttr === "1000-01-01".date, classOf[Eq[_]],
resultFun("1000-01-01"))
checkFilterPredicate(dateAttr <=> "1000-01-01".date, classOf[Eq[_]],
resultFun("1000-01-01"))
checkFilterPredicate(dateAttr =!= "1000-01-01".date, classOf[NotEq[_]],
Seq("2018-03-19", "2018-03-20", "2018-03-21").map(i => Row.apply(resultFun(i))))

checkFilterPredicate(dateAttr < "2018-03-19".date, classOf[Lt[_]],
resultFun("1000-01-01"))
checkFilterPredicate(dateAttr > "2018-03-20".date, classOf[Gt[_]],
resultFun("2018-03-21"))
checkFilterPredicate(dateAttr <= "1000-01-01".date, classOf[LtEq[_]],
resultFun("1000-01-01"))
checkFilterPredicate(dateAttr >= "2018-03-21".date, classOf[GtEq[_]],
resultFun("2018-03-21"))

checkFilterPredicate(Literal("1000-01-01".date) === dateAttr, classOf[Eq[_]],
resultFun("1000-01-01"))
checkFilterPredicate(Literal("1000-01-01".date) <=> dateAttr, classOf[Eq[_]],
resultFun("1000-01-01"))
checkFilterPredicate(Literal("2018-03-19".date) > dateAttr, classOf[Lt[_]],
resultFun("1000-01-01"))
checkFilterPredicate(Literal("2018-03-20".date) < dateAttr, classOf[Gt[_]],
resultFun("2018-03-21"))
checkFilterPredicate(Literal("1000-01-01".date) >= dateAttr, classOf[LtEq[_]],
resultFun("1000-01-01"))
checkFilterPredicate(Literal("2018-03-21".date) <= dateAttr, classOf[GtEq[_]],
resultFun("2018-03-21"))

checkFilterPredicate(!(dateAttr < "2018-03-21".date), classOf[GtEq[_]],
resultFun("2018-03-21"))
checkFilterPredicate(
dateAttr < "2018-03-19".date || dateAttr > "2018-03-20".date,
classOf[Operators.Or],
Seq(Row(resultFun("1000-01-01")), Row(resultFun("2018-03-21"))))
}

val dateAttr: Expression = df(colName).expr
assert(df(colName).expr.dataType === DateType)

checkFilterPredicate(dateAttr.isNull, classOf[Eq[_]], Seq.empty[Row])
checkFilterPredicate(dateAttr.isNotNull, classOf[NotEq[_]],
data.map(i => Row.apply(resultFun(i))))

checkFilterPredicate(dateAttr === "2018-03-18".date, classOf[Eq[_]],
resultFun("2018-03-18"))
checkFilterPredicate(dateAttr <=> "2018-03-18".date, classOf[Eq[_]],
resultFun("2018-03-18"))
checkFilterPredicate(dateAttr =!= "2018-03-18".date, classOf[NotEq[_]],
Seq("2018-03-19", "2018-03-20", "2018-03-21").map(i => Row.apply(resultFun(i))))

checkFilterPredicate(dateAttr < "2018-03-19".date, classOf[Lt[_]],
resultFun("2018-03-18"))
checkFilterPredicate(dateAttr > "2018-03-20".date, classOf[Gt[_]],
resultFun("2018-03-21"))
checkFilterPredicate(dateAttr <= "2018-03-18".date, classOf[LtEq[_]],
resultFun("2018-03-18"))
checkFilterPredicate(dateAttr >= "2018-03-21".date, classOf[GtEq[_]],
resultFun("2018-03-21"))

checkFilterPredicate(Literal("2018-03-18".date) === dateAttr, classOf[Eq[_]],
resultFun("2018-03-18"))
checkFilterPredicate(Literal("2018-03-18".date) <=> dateAttr, classOf[Eq[_]],
resultFun("2018-03-18"))
checkFilterPredicate(Literal("2018-03-19".date) > dateAttr, classOf[Lt[_]],
resultFun("2018-03-18"))
checkFilterPredicate(Literal("2018-03-20".date) < dateAttr, classOf[Gt[_]],
resultFun("2018-03-21"))
checkFilterPredicate(Literal("2018-03-18".date) >= dateAttr, classOf[LtEq[_]],
resultFun("2018-03-18"))
checkFilterPredicate(Literal("2018-03-21".date) <= dateAttr, classOf[GtEq[_]],
resultFun("2018-03-21"))

checkFilterPredicate(!(dateAttr < "2018-03-21".date), classOf[GtEq[_]],
resultFun("2018-03-21"))
checkFilterPredicate(
dateAttr < "2018-03-19".date || dateAttr > "2018-03-20".date,
classOf[Operators.Or],
Seq(Row(resultFun("2018-03-18")), Row(resultFun("2018-03-21"))))
}
}
}
}

test("filter pushdown - timestamp") {
Seq(true, false).foreach { java8Api =>
withSQLConf(
SQLConf.DATETIME_JAVA8API_ENABLED.key -> java8Api.toString,
SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_WRITE.key -> "CORRECTED") {
// spark.sql.parquet.outputTimestampType = TIMESTAMP_MILLIS
Seq(CORRECTED, LEGACY).foreach { rebaseMode =>
val millisData = Seq(
"1000-06-14 08:28:53.123",
"1582-06-15 08:28:53.001",
"1900-06-16 08:28:53.0",
"2018-06-17 08:28:53.999")
withSQLConf(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key ->
ParquetOutputTimestampType.TIMESTAMP_MILLIS.toString) {
withSQLConf(
SQLConf.DATETIME_JAVA8API_ENABLED.key -> java8Api.toString,
SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_WRITE.key -> rebaseMode.toString,
SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key -> TIMESTAMP_MILLIS.toString) {
testTimestampPushdown(millisData, java8Api)
}

// spark.sql.parquet.outputTimestampType = TIMESTAMP_MICROS
val microsData = Seq(
"1000-06-14 08:28:53.123456",
"1582-06-15 08:28:53.123456",
"1900-06-16 08:28:53.123456",
"2018-06-17 08:28:53.123456")
withSQLConf(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key ->
ParquetOutputTimestampType.TIMESTAMP_MICROS.toString) {
withSQLConf(
SQLConf.DATETIME_JAVA8API_ENABLED.key -> java8Api.toString,
SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_WRITE.key -> rebaseMode.toString,
SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key -> TIMESTAMP_MICROS.toString) {
testTimestampPushdown(microsData, java8Api)
}

// spark.sql.parquet.outputTimestampType = INT96 doesn't support pushdown
withSQLConf(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key ->
ParquetOutputTimestampType.INT96.toString) {
// INT96 doesn't support pushdown
withSQLConf(
SQLConf.DATETIME_JAVA8API_ENABLED.key -> java8Api.toString,
SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key -> INT96.toString) {
import testImplicits._
withTempPath { file =>
millisData.map(i => Tuple1(Timestamp.valueOf(i))).toDF
Expand Down

0 comments on commit e41ecef

Please sign in to comment.