Add support for timeParserPolicy=LEGACY #2875

Merged
8 commits merged on Jul 9, 2021
docs/compatibility.md: 34 changes (30 additions & 4 deletions)
@@ -353,7 +353,12 @@ the specified format string will fall into one of three categories:
- Supported on GPU but may produce different results to Spark
- Unsupported on GPU

-The formats which are supported on GPU and 100% compatible with Spark are :
The formats which are supported on GPU vary depending on the setting for `timeParserPolicy`.

### CORRECTED and EXCEPTION timeParserPolicy

With timeParserPolicy set to `CORRECTED` or `EXCEPTION` (the default), the following formats are supported
on the GPU without requiring any additional settings.

- `dd/MM/yyyy`
- `yyyy/MM`
@@ -366,10 +371,11 @@ The formats which are supported on GPU and 100% compatible with Spark are :
- `dd-MM`
- `dd/MM`

-Examples of supported formats that may produce different results are:
Valid Spark date/time formats that do not appear in the list above may also be supported but have not been
extensively tested and may produce different results compared to the CPU. Known issues include:

-- Trailing characters (including whitespace) may return a non-null value on GPU and Spark will
-return null
- Valid dates and timestamps followed by trailing characters (including whitespace) may be parsed to non-null
  values on GPU where Spark would treat the data as invalid and return null

To attempt to use other formats on the GPU, set
[`spark.rapids.sql.incompatibleDateFormats.enabled`](configs.md#sql.incompatibleDateFormats.enabled)
@@ -391,6 +397,26 @@ Formats that contain any of the following words are unsupported and will fall back to CPU:

```
"d", "S", "SS", "SSS", "SSSS", "SSSSS", "SSSSSSSSS", "SSSSSSS", "SSSSSSSS"
```
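
The following sketch shows how un-certified formats can be enabled. It is illustrative only: the application name and query are hypothetical, and the only settings taken from this guide are `spark.sql.legacy.timeParserPolicy` and `spark.rapids.sql.incompatibleDateFormats.enabled`.

```
import org.apache.spark.sql.SparkSession

// Minimal sketch: opt in to date formats that run on the GPU but are not
// guaranteed to match the CPU exactly.
val spark = SparkSession.builder()
  .appName("gpu-date-parsing") // hypothetical
  .config("spark.sql.legacy.timeParserPolicy", "CORRECTED") // the default is EXCEPTION
  .config("spark.rapids.sql.incompatibleDateFormats.enabled", "true")
  .getOrCreate()

// `dd/MM/yyyy HH:mm:ss` is not in the certified list above, so it needs the
// incompatibleDateFormats setting in order to stay on the GPU.
spark.sql("SELECT to_timestamp('09/07/2021 12:34:56', 'dd/MM/yyyy HH:mm:ss')").show()
```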

### LEGACY timeParserPolicy

With timeParserPolicy set to `LEGACY`,
[`spark.rapids.sql.incompatibleDateFormats.enabled`](configs.md#sql.incompatibleDateFormats.enabled)
set to `true`, and `spark.sql.ansi.enabled` set to `false`, the following formats are supported but not
guaranteed to produce the same results as the CPU:

- `dd-MM-yyyy`
- `dd/MM/yyyy`
- `yyyy/MM/dd`
- `yyyy-MM-dd`
- `yyyy/MM/dd HH:mm:ss`
- `yyyy-MM-dd HH:mm:ss`

LEGACY timeParserPolicy support has the following limitations when running on the GPU (see the
configuration sketch after this list):

- Only 4-digit years are supported
- The proleptic Gregorian calendar is used instead of the hybrid Julian+Gregorian calendar
  that Spark uses in legacy mode
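
A minimal configuration sketch, assuming an existing `SparkSession` named `spark`; the three keys are
those described above and the query is illustrative:

```
// All three settings must hold for LEGACY parsing to run on the GPU
spark.conf.set("spark.sql.legacy.timeParserPolicy", "LEGACY")
spark.conf.set("spark.rapids.sql.incompatibleDateFormats.enabled", "true")
spark.conf.set("spark.sql.ansi.enabled", "false")

// Uses a format from the list above; other LEGACY formats fall back to the CPU
spark.sql("SELECT to_timestamp('2021-07-09 12:34:56', 'yyyy-MM-dd HH:mm:ss')").show()
```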

## Formatting dates and timestamps as strings

When formatting dates and timestamps as strings using functions such as `from_unixtime`, only a
@@ -25,6 +25,7 @@ import com.nvidia.spark.rapids.GpuOverrides.{extractStringLit, getTimeParserPolicy}
import com.nvidia.spark.rapids.RapidsPluginImplicits._

import org.apache.spark.sql.catalyst.expressions.{BinaryExpression, ExpectsInputTypes, Expression, ImplicitCastInputTypes, NullIntolerant, TimeZoneAwareExpression}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
import org.apache.spark.sql.vectorized.ColumnarBatch
import org.apache.spark.unsafe.types.CalendarInterval
@@ -398,19 +399,44 @@ abstract class UnixTimeExprMeta[A <: BinaryExpression with TimeZoneAwareExpression]
case Some(rightLit) =>
sparkFormat = rightLit
if (GpuOverrides.getTimeParserPolicy == LegacyTimeParserPolicy) {
willNotWorkOnGpu("legacyTimeParserPolicy LEGACY is not supported")
try {
// try and convert the format to cuDF format - this will throw an exception if
// the format contains unsupported characters or words
strfFormat = DateUtils.toStrf(sparkFormat,
expr.left.dataType == DataTypes.StringType)
// format parsed ok but we have no 100% compatible formats in LEGACY mode
if (GpuToTimestamp.LEGACY_COMPATIBLE_FORMATS.contains(sparkFormat)) {
// LEGACY support has a number of issues that mean we cannot guarantee
// compatibility with CPU
// - we can only support 4 digit years but Spark supports a wider range
// - we use a proleptic Gregorian calendar but Spark uses a hybrid Julian+Gregorian
// calendar in LEGACY mode
if (SQLConf.get.ansiEnabled) {
willNotWorkOnGpu("LEGACY format in ANSI mode is not supported on the GPU")
} else if (!conf.incompatDateFormats) {
willNotWorkOnGpu(s"LEGACY format '$sparkFormat' on the GPU is not guaranteed " +
s"to produce the same results as Spark on CPU. Set " +
s"${RapidsConf.INCOMPATIBLE_DATE_FORMATS.key}=true to force onto GPU.")
}
} else {
willNotWorkOnGpu(s"LEGACY format '$sparkFormat' is not supported on the GPU.")
}
} catch {
case e: TimestampFormatConversionException =>
willNotWorkOnGpu(s"Failed to convert ${e.reason} ${e.getMessage}")
}
} else {
try {
// try and convert the format to cuDF format - this will throw an exception if
// the format contains unsupported characters or words
strfFormat = DateUtils.toStrf(sparkFormat,
expr.left.dataType == DataTypes.StringType)
// format parsed ok, so it is either compatible (tested/certified) or incompatible
-if (!GpuToTimestamp.COMPATIBLE_FORMATS.contains(sparkFormat) &&
if (!GpuToTimestamp.CORRECTED_COMPATIBLE_FORMATS.contains(sparkFormat) &&
!conf.incompatDateFormats) {
-willNotWorkOnGpu(s"format '$sparkFormat' on the GPU is not guaranteed " +
willNotWorkOnGpu(s"CORRECTED format '$sparkFormat' on the GPU is not guaranteed " +
s"to produce the same results as Spark on CPU. Set " +
-s"spark.rapids.sql.incompatibleDateFormats.enabled=true to force onto GPU.")
s"${RapidsConf.INCOMPATIBLE_DATE_FORMATS.key}=true to force onto GPU.")
}
} catch {
case e: TimestampFormatConversionException =>
@@ -430,8 +456,10 @@ object ExceptionTimeParserPolicy extends TimeParserPolicy
object CorrectedTimeParserPolicy extends TimeParserPolicy

object GpuToTimestamp extends Arm {
-/** We are compatible with Spark for these formats */
-val COMPATIBLE_FORMATS = Seq(
// We are compatible with Spark for these formats when the timeParserPolicy is CORRECTED
// or EXCEPTION. It is possible that other formats may be supported but these are the only
// ones that we have tests for.
val CORRECTED_COMPATIBLE_FORMATS = Seq(
"yyyy-MM-dd",
"yyyy-MM",
"yyyy/MM/dd",
@@ -444,6 +472,61 @@ object GpuToTimestamp extends Arm {
"dd/MM"
)

// We are compatible with Spark for these formats when the timeParserPolicy is LEGACY. It
// is possible that other formats may be supported but these are the only ones that we have
// tests for.
val LEGACY_COMPATIBLE_FORMATS = Map(
"yyyy-MM-dd" -> LegacyParseFormat('-', isTimestamp = false,
raw"\A\d{4}-\d{2}-\d{2}(\D|\s|\Z)"),
"yyyy/MM/dd" -> LegacyParseFormat('/', isTimestamp = false,
raw"\A\d{4}/\d{2}/\d{2}(\D|\s|\Z)"),
"dd-MM-yyyy" -> LegacyParseFormat('-', isTimestamp = false,
raw"\A\d{2}-\d{2}-\d{4}(\D|\s|\Z)"),
"dd/MM/yyyy" -> LegacyParseFormat('/', isTimestamp = false,
raw"\A\d{2}/\d{2}/\d{4}(\D|\s|\Z)"),
"yyyy-MM-dd HH:mm:ss" -> LegacyParseFormat('-', isTimestamp = true,
raw"\A\d{4}-\d{2}-\d{2}[ T]\d{2}:\d{2}:\d{2}(\D|\s|\Z)"),
"yyyy/MM/dd HH:mm:ss" -> LegacyParseFormat('/', isTimestamp = true,
raw"\A\d{4}/\d{2}/\d{2}[ T]\d{2}:\d{2}:\d{2}(\D|\s|\Z)")
)

/** Regex rule to remove whitespace before month and day */
val REMOVE_WHITESPACE_FROM_MONTH_DAY: RegexReplace =
RegexReplace(raw"(\A\d+)-([ \t]*)(\d+)-([ \t]*)(\d+)", raw"\1-\3-\5")

/** Regex rule to replace "yyyy-m-" with "yyyy-mm-" */
val FIX_SINGLE_DIGIT_MONTH: RegexReplace =
RegexReplace(raw"(\A\d+)-(\d{1}-)", raw"\1-0\2")

/** Regex rule to replace "yyyy-mm-d" with "yyyy-mm-dd" */
val FIX_SINGLE_DIGIT_DAY: RegexReplace =
RegexReplace(raw"(\A\d+-\d{2})-(\d{1})([\D\s]|\Z)", raw"\1-0\2\3")

/** Regex rule to replace "yyyy-mm-dd[ T]h:" with "yyyy-mm-dd hh:" */
val FIX_SINGLE_DIGIT_HOUR: RegexReplace =
RegexReplace(raw"(\A\d+-\d{2}-\d{2})[ T](\d{1}:)", raw"\1 0\2")

/** Regex rule to replace "yyyy-mm-dd[ T]hh:m:" with "yyyy-mm-dd[ T]hh:mm:" */
val FIX_SINGLE_DIGIT_MINUTE: RegexReplace =
RegexReplace(raw"(\A\d+-\d{2}-\d{2}[ T]\d{2}):(\d{1}:)", raw"\1:0\2")

/** Regex rule to replace "yyyy-mm-dd[ T]hh:mm:s" with "yyyy-mm-dd[ T]hh:mm:ss" */
val FIX_SINGLE_DIGIT_SECOND: RegexReplace =
RegexReplace(raw"(\A\d+-\d{2}-\d{2}[ T]\d{2}:\d{2}):(\d{1})([\D\s]|\Z)", raw"\1:0\2\3")

/** Convert dates to standard format */
val FIX_DATES = Seq(
REMOVE_WHITESPACE_FROM_MONTH_DAY,
FIX_SINGLE_DIGIT_MONTH,
FIX_SINGLE_DIGIT_DAY)

/** Convert timestamps to standard format */
val FIX_TIMESTAMPS = Seq(
FIX_SINGLE_DIGIT_HOUR,
FIX_SINGLE_DIGIT_MINUTE,
FIX_SINGLE_DIGIT_SECOND
)
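
// Illustrative walk-through (hypothetical inputs, not from the test suite): applied in
// order, the rules above pad legacy-tolerated strings to fixed-width form, e.g.
//   "2021- 1-3"          -> "2021-1-3"            (REMOVE_WHITESPACE_FROM_MONTH_DAY)
//   "2021-1-3"           -> "2021-01-3"           (FIX_SINGLE_DIGIT_MONTH)
//   "2021-01-3"          -> "2021-01-03"          (FIX_SINGLE_DIGIT_DAY)
//   "2021-01-03 1:2:3"   -> "2021-01-03 01:2:3"   (FIX_SINGLE_DIGIT_HOUR)
//   "2021-01-03 01:2:3"  -> "2021-01-03 01:02:3"  (FIX_SINGLE_DIGIT_MINUTE)
//   "2021-01-03 01:02:3" -> "2021-01-03 01:02:03" (FIX_SINGLE_DIGIT_SECOND)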

def daysScalarSeconds(name: String): Scalar = {
Scalar.timestampFromLong(DType.TIMESTAMP_SECONDS, DateUtils.specialDatesSeconds(name))
}
@@ -459,7 +542,7 @@ }
}

def isTimestamp(col: ColumnVector, sparkFormat: String, strfFormat: String) : ColumnVector = {
-if (COMPATIBLE_FORMATS.contains(sparkFormat)) {
if (CORRECTED_COMPATIBLE_FORMATS.contains(sparkFormat)) {
// the cuDF `is_timestamp` function is less restrictive than Spark's behavior for UnixTime
// and ToUnixTime and will support parsing a subset of a string so we check the length of
// the string as well which works well for fixed-length formats but if/when we want to
@@ -489,12 +572,10 @@
daysScalar: String => Scalar,
asTimestamp: (ColumnVector, String) => ColumnVector): ColumnVector = {

-val isTimestamp = GpuToTimestamp.isTimestamp(lhs.getBase, sparkFormat, strfFormat)

// in addition to date/timestamp strings, we also need to check for special dates and null
// values, since anything else is invalid and should throw an error or be converted to null
// depending on the policy
-withResource(isTimestamp) { isTimestamp =>
withResource(isTimestamp(lhs.getBase, sparkFormat, strfFormat)) { isTimestamp =>
withResource(daysEqual(lhs.getBase, DateUtils.EPOCH)) { isEpoch =>
withResource(daysEqual(lhs.getBase, DateUtils.NOW)) { isNow =>
withResource(daysEqual(lhs.getBase, DateUtils.TODAY)) { isToday =>
@@ -534,8 +615,99 @@ }
}
}
}

/**
* Parse string to timestamp when timeParserPolicy is LEGACY. This was the default behavior
* prior to Spark 3.0.
*/
def parseStringAsTimestampWithLegacyParserPolicy(
lhs: GpuColumnVector,
sparkFormat: String,
strfFormat: String,
dtype: DType,
asTimestamp: (ColumnVector, String) => ColumnVector): ColumnVector = {

val format = LEGACY_COMPATIBLE_FORMATS.getOrElse(sparkFormat,
throw new IllegalStateException(s"Unsupported format $sparkFormat"))

// optimization to apply only the necessary rules depending on whether we are
// parsing to a date or timestamp
val regexReplaceRules = if (format.isTimestamp) {
FIX_DATES ++ FIX_TIMESTAMPS
} else {
FIX_DATES
}

// we support date formats using either `-` or `/` to separate year, month, and day and the
// regex rules are written with '-' so we need to replace '-' with '/' here as necessary
val rulesWithSeparator = format.separator match {
case '/' =>
regexReplaceRules.map {
case RegexReplace(pattern, backref) =>
RegexReplace(pattern.replace('-', '/'), backref.replace('-', '/'))
}
case '-' =>
regexReplaceRules
}
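
// For example (illustrative), with separator '/' the single-digit-month rule becomes
//   RegexReplace(raw"(\A\d+)/(\d{1}/)", raw"\1/0\2")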

// apply each rule in turn to the data
val fixedUp = rulesWithSeparator
.foldLeft(rejectLeadingNewlineThenStrip(lhs))((cv, regexRule) => {
withResource(cv) {
_.stringReplaceWithBackrefs(regexRule.search, regexRule.replace)
}
})

// check the final value against a regex to determine if it is valid or not, so we produce
// null values for any invalid inputs
withResource(Scalar.fromNull(dtype)) { nullValue =>
withResource(fixedUp.matchesRe(format.validRegex)) { isValidDate =>
withResource(asTimestampOrNull(fixedUp, dtype, strfFormat, asTimestamp)) { timestamp =>
isValidDate.ifElse(timestamp, nullValue)
}
}
}
}
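
// Illustrative example (hypothetical input): with sparkFormat "yyyy-MM-dd", the string
// " 2021-7-9 " is stripped and normalized to "2021-07-09", matches validRegex, and is
// parsed; "2021/07/09" fails validRegex (wrong separator) and becomes null.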

/**
* Filter out strings that have a newline before the first non-whitespace character
* and then strip all leading and trailing whitespace.
*/
private def rejectLeadingNewlineThenStrip(lhs: GpuColumnVector) = {
withResource(lhs.getBase.matchesRe("\\A[ \\t]*[\\n]+")) { hasLeadingNewline =>
withResource(Scalar.fromNull(DType.STRING)) { nullValue =>
withResource(lhs.getBase.strip()) { stripped =>
hasLeadingNewline.ifElse(nullValue, stripped)
}
}
}
}

/**
* Parse a string column to timestamp.
*/
def asTimestampOrNull(
cv: ColumnVector,
dtype: DType,
strfFormat: String,
asTimestamp: (ColumnVector, String) => ColumnVector): ColumnVector = {
withResource(cv) { _ =>
withResource(Scalar.fromNull(dtype)) { nullValue =>
withResource(cv.isTimestamp(strfFormat)) { isTimestamp =>
withResource(asTimestamp(cv, strfFormat)) { timestamp =>
isTimestamp.ifElse(timestamp, nullValue)
}
}
}
}
}

}

case class LegacyParseFormat(separator: Char, isTimestamp: Boolean, validRegex: String)

case class RegexReplace(search: String, replace: String)

/**
* A direct conversion of Spark's ToTimestamp class which converts time to UNIX timestamp by
* first converting to microseconds and then dividing by the downScaleFactor
@@ -572,13 +744,22 @@ abstract class GpuToTimestamp
override def doColumnar(lhs: GpuColumnVector, rhs: GpuScalar): ColumnVector = {
val tmp = if (lhs.dataType == StringType) {
// rhs is ignored; we already parsed the format
-parseStringAsTimestamp(
-lhs,
-sparkFormat,
-strfFormat,
-DType.TIMESTAMP_MICROSECONDS,
-daysScalarMicros,
-(col, strfFormat) => col.asTimestampMicroseconds(strfFormat))
if (getTimeParserPolicy == LegacyTimeParserPolicy) {
parseStringAsTimestampWithLegacyParserPolicy(
lhs,
sparkFormat,
strfFormat,
DType.TIMESTAMP_MICROSECONDS,
(col, strfFormat) => col.asTimestampMicroseconds(strfFormat))
} else {
parseStringAsTimestamp(
lhs,
sparkFormat,
strfFormat,
DType.TIMESTAMP_MICROSECONDS,
daysScalarMicros,
(col, strfFormat) => col.asTimestampMicroseconds(strfFormat))
}
} else { // Timestamp or DateType
lhs.getBase.asTimestampMicroseconds()
}
@@ -614,13 +795,22 @@ abstract class GpuToTimestampImproved extends GpuToTimestamp {
override def doColumnar(lhs: GpuColumnVector, rhs: GpuScalar): ColumnVector = {
val tmp = if (lhs.dataType == StringType) {
// rhs is ignored; we already parsed the format
-parseStringAsTimestamp(
-lhs,
-sparkFormat,
-strfFormat,
-DType.TIMESTAMP_SECONDS,
-daysScalarSeconds,
-(col, strfFormat) => col.asTimestampSeconds(strfFormat))
if (getTimeParserPolicy == LegacyTimeParserPolicy) {
parseStringAsTimestampWithLegacyParserPolicy(
lhs,
sparkFormat,
strfFormat,
DType.TIMESTAMP_SECONDS,
(col, strfFormat) => col.asTimestampSeconds(strfFormat))
} else {
parseStringAsTimestamp(
lhs,
sparkFormat,
strfFormat,
DType.TIMESTAMP_SECONDS,
daysScalarSeconds,
(col, strfFormat) => col.asTimestampSeconds(strfFormat))
}
} else if (lhs.dataType() == DateType){
lhs.getBase.asTimestampSeconds()
} else { // Timestamp