
Support ANSI intervals to/from Parquet #4810

Merged

Conversation

res-life
Collaborator

@res-life res-life commented Feb 17, 2022

Closes #4145

SPARK-36825 added read and write support for the ANSI interval data types (YearMonthIntervalType and DayTimeIntervalType).
This PR supports ANSI intervals to/from Parquet.

PySpark 3.3.0 only contains DayTimeIntervalType in 'pyspark.sql.types' and does not have YearMonthIntervalType, so the Python tests only cover DayTimeIntervalType.

Filed #4811 for the follow-on features.


Signed-off-by: Chong Gao <res_life@163.com>
@res-life
Collaborator Author

build

@revans2
Collaborator

revans2 commented Feb 17, 2022

Is there no special metadata for these types? Are they really just stored as ints and longs and Spark can understand that?

@sameerz sameerz added the audit_3.3.0 Audit related tasks for 3.3.0 label Feb 18, 2022
@sameerz sameerz added this to the Feb 14 - Feb 25 milestone Feb 18, 2022
@res-life
Collaborator Author

build

@res-life
Collaborator Author

Spark uses int32 to represent YearMonth intervals and int64 to represent DayTime intervals.
Spark also saves ANSI intervals as int32 and int64 in Parquet files.
See the Parquet schema test case: https://github.com/apache/spark/blob/master/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala#L2194-L2210

  testCatalystToParquet(
    "SPARK-36825: Year-month interval written and read as INT32",
    StructType(Seq(StructField("f1", YearMonthIntervalType()))),
    """message root {
      |  optional INT32 f1;
      |}
    """.stripMargin,
    writeLegacyParquetFormat = false)
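
For reference, the companion day-time case in the same suite maps DayTimeIntervalType to INT64. This is a paraphrased sketch of that test; the exact wording in the suite may differ:

  testCatalystToParquet(
    "SPARK-36825: Day-time interval written and read as INT64",
    StructType(Seq(StructField("f1", DayTimeIntervalType()))),
    """message root {
      |  optional INT64 f1;
      |}
    """.stripMargin,
    writeLegacyParquetFormat = false)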

See the Spark TimeAdd(Timestamp, DayTimeIntervalType) code:
Spark uses asInstanceOf[Long] to get the interval value passed to timestampAddDayTime:
https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala#L1576-L1582

  override def nullSafeEval(start: Any, interval: Any): Any = right.dataType match {
    case _: DayTimeIntervalType =>
      timestampAddDayTime(start.asInstanceOf[Long], interval.asInstanceOf[Long], zoneIdInEval)
  }

The RAPIDS plugin also uses intvlS.getValue.asInstanceOf[Long] to get the interval value as a Scala Long:
https://github.com/NVIDIA/spark-rapids/blob/branch-22.04/sql-plugin/src/main/320+/scala/org/apache/spark/sql/rapids/shims/v2/datetimeExpressions.scala#L77

              case _: DayTimeIntervalType =>
                // Scalar does not support 'DayTimeIntervalType' now, so use
                // the Scala value instead.
                intvlS.getValue.asInstanceOf[Long]

@revans2
Collaborator

revans2 commented Feb 18, 2022

Okay, Spark is storing it as ints and longs. I would still like to see documentation on it and, where possible, for us to use an interval type if available, just because I see us having to bit-cast the column into the corresponding interval type more often than not.

@res-life res-life marked this pull request as draft February 22, 2022 13:52
@res-life
Collaborator Author

cudf supports Timestamp + Duration, so there is no need to convert the timestamp to a long via bitCastTo anymore.
I've updated GpuTimeAdd to use the cudf operator +(Timestamp, Duration).

The DayTimeInterval is now stored as a long internally:

- Spark's Parquet writer saves it as a long, and Spark stores it as a long internally, so this keeps things consistent.
- We can't benefit from storing it as a cudf Duration: if we stored it as a Duration, we would have to bitCastTo the long column to a Duration column right after reading the Parquet long column.
  For a SQL query "select timestamp_column + interval_column", the flow would be:
      parquet read interval_column as long
      bitCastTo long to Duration     // this could be added in GpuParquetScan by updating the evolveSchema code
      compute (Timestamp + Duration) // in GpuTimeAdd
- If stored as a long, the flow is almost the same; only the bitCastTo moves into GpuTimeAdd:
      parquet read interval_column as long
      bitCastTo long to Duration     // this is in GpuTimeAdd
      compute (Timestamp + Duration) // in GpuTimeAdd

A YEAR-MONTH interval is the number of months, calculated by the formula -/+ (12 * YEAR + MONTH); for example, "interval 01-01 year to month" is stored as 13.
cudf does not support a month Duration, and adding months is not simple in Java code, so cudf does not seem to support it.
So I removed the YEAR-MONTH interval support from this PR.
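
A minimal sketch of the "store as long" flow above, using the cudf Java API's bitCastTo and binaryOp calls. The method and variable names here are hypothetical; the PR's actual GpuTimeAdd code differs:

  import ai.rapids.cudf.{BinaryOp, ColumnVector, ColumnView, DType}

  // timestamps: a TIMESTAMP_MICROSECONDS column
  // intervalAsLong: the INT64 interval column as read from Parquet (microseconds)
  def addDayTimeInterval(
      timestamps: ColumnVector,
      intervalAsLong: ColumnVector): ColumnVector = {
    // Reinterpret the raw long bits as a duration; bitCastTo creates a view, no copy.
    val duration: ColumnView = intervalAsLong.bitCastTo(DType.DURATION_MICROSECONDS)
    try {
      // cudf supports TIMESTAMP + DURATION directly, yielding a timestamp column.
      timestamps.binaryOp(BinaryOp.ADD, duration, timestamps.getType)
    } finally {
      duration.close()
    }
  }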

@res-life
Collaborator Author

build

@res-life res-life marked this pull request as ready for review February 23, 2022 10:44
@res-life
Collaborator Author

build

@res-life
Collaborator Author

https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DayTimeIntervalType.scala#L45-L51

case class DayTimeIntervalType(startField: Byte, endField: Byte) extends AnsiIntervalType {
  /**
   * Internally, values of day-time intervals are stored in `Long` values as amount of time in terms
   * of microseconds that are calculated by the formula:
   *   -/+ (24*60*60 * DAY + 60*60 * HOUR + 60 * MINUTE + SECOND) * 1000000
   */
  private[sql] type InternalType = Long

https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/types/YearMonthIntervalType.scala#L43-L49

case class YearMonthIntervalType(startField: Byte, endField: Byte) extends AnsiIntervalType {
  /**
   * Internally, values of year-month intervals are stored in `Int` values as amount of months
   * that are calculated by the formula:
   *   -/+ (12 * YEAR + MONTH)
   */
  private[sql] type InternalType = Int
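
As a quick sanity check of both formulas (plain Scala arithmetic, not Spark code):

  // INTERVAL '1 02:03:04' DAY TO SECOND in microseconds:
  val dayTimeMicros = (24L * 60 * 60 * 1 + 60 * 60 * 2 + 60 * 3 + 4) * 1000000L
  // == 93784000000L

  // INTERVAL '1-1' YEAR TO MONTH in months:
  val yearMonthMonths = 12 * 1 + 1
  // == 13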

@res-life
Collaborator Author

build

@sameerz sameerz added this to the Feb 28 - Mar 18 milestone Feb 26, 2022
@res-life
Collaborator Author

build

@res-life
Collaborator Author

build

Collaborator

@revans2 revans2 left a comment


Looks really good.

@res-life
Collaborator Author

res-life commented Mar 7, 2022

build

@res-life
Collaborator Author

res-life commented Mar 7, 2022

build

@res-life
Collaborator Author

res-life commented Mar 7, 2022

build

Collaborator

@firestarman firestarman left a comment


Some nits, but I can live without them. Otherwise LGTM.

@res-life
Collaborator Author

res-life commented Mar 7, 2022

build

@res-life
Collaborator Author

res-life commented Mar 7, 2022

build

@res-life
Collaborator Author

res-life commented Mar 7, 2022

@revans2 Please help review again.

revans2 previously approved these changes Mar 7, 2022
@res-life
Collaborator Author

res-life commented Mar 8, 2022

build

@res-life res-life merged commit fbb2f07 into NVIDIA:branch-22.04 Mar 8, 2022
@res-life res-life deleted the support-ansi-intervals-for-parquet branch March 13, 2022 05:07
Labels
audit_3.3.0 Audit related tasks for 3.3.0
Development

Successfully merging this pull request may close these issues.

[Audit][FEA][SPARK-36825] Support ANSI intervals to/from Parquet
4 participants