Skip to content

Commit

Permalink
Update parse date to leverage cuDF support for single digit components (
Browse files Browse the repository at this point in the history
  • Loading branch information
andygrove authored Sep 16, 2021
1 parent ccf2f43 commit b47a245
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 208 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -463,74 +463,51 @@ object GpuToTimestamp extends Arm {
// We are compatible with Spark for these formats when the timeParserPolicy is CORRECTED
// or EXCEPTION. It is possible that other formats may be supported but these are the only
// ones that we have tests for.
val CORRECTED_COMPATIBLE_FORMATS = Seq(
"yyyy-MM-dd",
"yyyy-MM",
"yyyy/MM/dd",
"yyyy/MM",
"dd/MM/yyyy",
"yyyy-MM-dd HH:mm:ss",
"MM-dd",
"MM/dd",
"dd-MM",
"dd/MM"
val CORRECTED_COMPATIBLE_FORMATS = Map(
"yyyy-MM-dd" -> ParseFormatMeta('-', isTimestamp = false,
raw"\A\d{4}-\d{2}-\d{2}\Z"),
"yyyy/MM/dd" -> ParseFormatMeta('/', isTimestamp = false,
raw"\A\d{4}/\d{1,2}/\d{1,2}\Z"),
"yyyy-MM" -> ParseFormatMeta('-', isTimestamp = false,
raw"\A\d{4}-\d{2}\Z"),
"yyyy/MM" -> ParseFormatMeta('/', isTimestamp = false,
raw"\A\d{4}/\d{2}\Z"),
"dd/MM/yyyy" -> ParseFormatMeta('/', isTimestamp = false,
raw"\A\d{2}/\d{2}/\d{4}\Z"),
"yyyy-MM-dd HH:mm:ss" -> ParseFormatMeta('-', isTimestamp = true,
raw"\A\d{4}-\d{2}-\d{2}[ T]\d{2}:\d{2}:\d{2}\Z"),
"MM-dd" -> ParseFormatMeta('-', isTimestamp = false,
raw"\A\d{2}-\d{2}\Z"),
"MM/dd" -> ParseFormatMeta('/', isTimestamp = false,
raw"\A\d{2}/\d{2}\Z"),
"dd-MM" -> ParseFormatMeta('-', isTimestamp = false,
raw"\A\d{2}-\d{2}\Z"),
"dd/MM" -> ParseFormatMeta('/', isTimestamp = false,
raw"\A\d{2}/\d{2}\Z")
)

// We are compatible with Spark for these formats when the timeParserPolicy is LEGACY. It
// is possible that other formats may be supported but these are the only ones that we have
// tests for.
val LEGACY_COMPATIBLE_FORMATS = Map(
"yyyy-MM-dd" -> LegacyParseFormat('-', isTimestamp = false,
raw"\A\d{4}-\d{2}-\d{2}(\D|\s|\Z)"),
"yyyy/MM/dd" -> LegacyParseFormat('/', isTimestamp = false,
raw"\A\d{4}/\d{2}/\d{2}(\D|\s|\Z)"),
"dd-MM-yyyy" -> LegacyParseFormat('-', isTimestamp = false,
raw"\A\d{2}-\d{2}-\d{4}(\D|\s|\Z)"),
"dd/MM/yyyy" -> LegacyParseFormat('/', isTimestamp = false,
raw"\A\d{2}/\d{2}/\d{4}(\D|\s|\Z)"),
"yyyy-MM-dd HH:mm:ss" -> LegacyParseFormat('-', isTimestamp = true,
raw"\A\d{4}-\d{2}-\d{2}[ T]\d{2}:\d{2}:\d{2}(\D|\s|\Z)"),
"yyyy/MM/dd HH:mm:ss" -> LegacyParseFormat('/', isTimestamp = true,
raw"\A\d{4}/\d{2}/\d{2}[ T]\d{2}:\d{2}:\d{2}(\D|\s|\Z)")
"yyyy-MM-dd" -> ParseFormatMeta('-', isTimestamp = false,
raw"\A\d{4}-\d{1,2}-\d{1,2}(\D|\s|\Z)"),
"yyyy/MM/dd" -> ParseFormatMeta('/', isTimestamp = false,
raw"\A\d{4}/\d{1,2}/\d{1,2}(\D|\s|\Z)"),
"dd-MM-yyyy" -> ParseFormatMeta('-', isTimestamp = false,
raw"\A\d{1,2}-\d{1,2}-\d{4}(\D|\s|\Z)"),
"dd/MM/yyyy" -> ParseFormatMeta('/', isTimestamp = false,
raw"\A\d{1,2}/\d{1,2}/\d{4}(\D|\s|\Z)"),
"yyyy-MM-dd HH:mm:ss" -> ParseFormatMeta('-', isTimestamp = true,
raw"\A\d{4}-\d{1,2}-\d{1,2}[ T]\d{1,2}:\d{1,2}:\d{1,2}(\D|\s|\Z)"),
"yyyy/MM/dd HH:mm:ss" -> ParseFormatMeta('/', isTimestamp = true,
raw"\A\d{4}/\d{1,2}/\d{1,2}[ T]\d{1,2}:\d{1,2}:\d{1,2}(\D|\s|\Z)")
)

/** remove whitespace before month and day */
val REMOVE_WHITESPACE_FROM_MONTH_DAY: RegexReplace =
RegexReplace(raw"(\A\d+)-([ \t]*)(\d+)-([ \t]*)(\d+)", raw"\1-\3-\5")

/** Regex rule to replace "yyyy-m-" with "yyyy-mm-" */
val FIX_SINGLE_DIGIT_MONTH: RegexReplace =
RegexReplace(raw"(\A\d+)-(\d{1}-)", raw"\1-0\2")

/** Regex rule to replace "yyyy-mm-d" with "yyyy-mm-dd" */
val FIX_SINGLE_DIGIT_DAY: RegexReplace =
RegexReplace(raw"(\A\d+-\d{2})-(\d{1})([\D\s]|\Z)", raw"\1-0\2\3")

/** Regex rule to replace "yyyy-mm-dd[ T]h:" with "yyyy-mm-dd hh:" */
val FIX_SINGLE_DIGIT_HOUR: RegexReplace =
RegexReplace(raw"(\A\d+-\d{2}-\d{2})[ T](\d{1}:)", raw"\1 0\2")

/** Regex rule to replace "yyyy-mm-dd[ T]hh:m:" with "yyyy-mm-dd[ T]hh:mm:" */
val FIX_SINGLE_DIGIT_MINUTE: RegexReplace =
RegexReplace(raw"(\A\d+-\d{2}-\d{2}[ T]\d{2}):(\d{1}:)", raw"\1:0\2")

/** Regex rule to replace "yyyy-mm-dd[ T]hh:mm:s" with "yyyy-mm-dd[ T]hh:mm:ss" */
val FIX_SINGLE_DIGIT_SECOND: RegexReplace =
RegexReplace(raw"(\A\d+-\d{2}-\d{2}[ T]\d{2}:\d{2}):(\d{1})([\D\s]|\Z)", raw"\1:0\2\3")

/** Convert dates to standard format */
val FIX_DATES = Seq(
REMOVE_WHITESPACE_FROM_MONTH_DAY,
FIX_SINGLE_DIGIT_MONTH,
FIX_SINGLE_DIGIT_DAY)

/** Convert timestamps to standard format */
val FIX_TIMESTAMPS = Seq(
FIX_SINGLE_DIGIT_HOUR,
FIX_SINGLE_DIGIT_MINUTE,
FIX_SINGLE_DIGIT_SECOND
)

def daysScalarSeconds(name: String): Scalar = {
Scalar.timestampFromLong(DType.TIMESTAMP_SECONDS, DateUtils.specialDatesSeconds(name))
}
Expand All @@ -546,25 +523,22 @@ object GpuToTimestamp extends Arm {
}

def isTimestamp(col: ColumnVector, sparkFormat: String, strfFormat: String) : ColumnVector = {
if (CORRECTED_COMPATIBLE_FORMATS.contains(sparkFormat)) {
// the cuDF `is_timestamp` function is less restrictive than Spark's behavior for UnixTime
// and ToUnixTime and will support parsing a subset of a string so we check the length of
// the string as well which works well for fixed-length formats but if/when we want to
// support variable-length formats (such as timestamps with milliseconds) then we will need
// to use regex instead.
withResource(col.getCharLengths) { actualLen =>
withResource(Scalar.fromInt(sparkFormat.length)) { expectedLen =>
withResource(actualLen.equalTo(expectedLen)) { lengthOk =>
withResource(col.isTimestamp(strfFormat)) { isTimestamp =>
isTimestamp.and(lengthOk)
}
CORRECTED_COMPATIBLE_FORMATS.get(sparkFormat) match {
case Some(fmt) =>
// the cuDF `is_timestamp` function is less restrictive than Spark's behavior for UnixTime
// and ToUnixTime and will support parsing a subset of a string so we check the length of
// the string as well which works well for fixed-length formats but if/when we want to
// support variable-length formats (such as timestamps with milliseconds) then we will need
// to use regex instead.
withResource(col.matchesRe(fmt.validRegex)) { matches =>
withResource(col.isTimestamp(strfFormat)) { isTimestamp =>
isTimestamp.and(matches)
}
}
}
} else {
// this is the incompatibleDateFormats case where we do not guarantee compatibility with
// Spark and assume that all non-null inputs are valid
ColumnVector.fromScalar(Scalar.fromBool(true), col.getRowCount.toInt)
case _ =>
// this is the incompatibleDateFormats case where we do not guarantee compatibility with
// Spark and assume that all non-null inputs are valid
ColumnVector.fromScalar(Scalar.fromBool(true), col.getRowCount.toInt)
}
}

Expand Down Expand Up @@ -634,13 +608,7 @@ object GpuToTimestamp extends Arm {
val format = LEGACY_COMPATIBLE_FORMATS.getOrElse(sparkFormat,
throw new IllegalStateException(s"Unsupported format $sparkFormat"))

// optimization to apply only the necessary rules depending on whether we are
// parsing to a date or timestamp
val regexReplaceRules = if (format.isTimestamp) {
FIX_DATES ++ FIX_TIMESTAMPS
} else {
FIX_DATES
}
val regexReplaceRules = Seq(REMOVE_WHITESPACE_FROM_MONTH_DAY)

// we support date formats using either `-` or `/` to separate year, month, and day and the
// regex rules are written with '-' so we need to replace '-' with '/' here as necessary
Expand Down Expand Up @@ -708,7 +676,7 @@ object GpuToTimestamp extends Arm {

}

case class LegacyParseFormat(separator: Char, isTimestamp: Boolean, validRegex: String)
case class ParseFormatMeta(separator: Char, isTimestamp: Boolean, validRegex: String)

case class RegexReplace(search: String, replace: String)

Expand Down
128 changes: 1 addition & 127 deletions tests/src/test/scala/com/nvidia/spark/rapids/ParseDateTimeSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.functions.{col, to_date, to_timestamp, unix_timestamp}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.rapids.GpuToTimestamp.{FIX_DATES, FIX_SINGLE_DIGIT_DAY, FIX_SINGLE_DIGIT_HOUR, FIX_SINGLE_DIGIT_MINUTE, FIX_SINGLE_DIGIT_MONTH, FIX_SINGLE_DIGIT_SECOND, FIX_TIMESTAMPS, REMOVE_WHITESPACE_FROM_MONTH_DAY}
import org.apache.spark.sql.rapids.GpuToTimestamp.REMOVE_WHITESPACE_FROM_MONTH_DAY
import org.apache.spark.sql.rapids.RegexReplace

class ParseDateTimeSuite extends SparkQueryCompareTestSuite with BeforeAndAfterEach {
Expand Down Expand Up @@ -58,15 +58,11 @@ class ParseDateTimeSuite extends SparkQueryCompareTestSuite with BeforeAndAfterE
}


// Test removed temporarily due to known issues
// https://github.com/NVIDIA/spark-rapids/issues/3478
/*
testSparkResultsAreEqual("to_date yyyy-MM-dd",
datesAsStrings,
conf = CORRECTED_TIME_PARSER_POLICY) {
df => df.withColumn("c1", to_date(col("c0"), "yyyy-MM-dd"))
}
*/

testSparkResultsAreEqual("to_date yyyy-MM-dd LEGACY",
datesAsStrings,
Expand Down Expand Up @@ -104,15 +100,11 @@ class ParseDateTimeSuite extends SparkQueryCompareTestSuite with BeforeAndAfterE
df => df.withColumn("c1", to_date(col("c0"), "yyyy-MM-dd"))
}

// Test removed temporarily due to known issues
// https://github.com/NVIDIA/spark-rapids/issues/3478
/*
testSparkResultsAreEqual("to_timestamp yyyy-MM-dd",
timestampsAsStrings,
conf = CORRECTED_TIME_PARSER_POLICY) {
df => df.withColumn("c1", to_timestamp(col("c0"), "yyyy-MM-dd"))
}
*/

testSparkResultsAreEqual("to_timestamp dd/MM/yyyy",
timestampsAsStrings,
Expand All @@ -126,15 +118,11 @@ class ParseDateTimeSuite extends SparkQueryCompareTestSuite with BeforeAndAfterE
df => df.withColumn("c1", to_date(col("c0")))
}

// Test removed temporarily due to known issues
// https://github.com/NVIDIA/spark-rapids/issues/3478
/*
testSparkResultsAreEqual("unix_timestamp parse date",
timestampsAsStrings,
CORRECTED_TIME_PARSER_POLICY) {
df => df.withColumn("c1", unix_timestamp(col("c0"), "yyyy-MM-dd"))
}
*/

testSparkResultsAreEqual("unix_timestamp parse yyyy/MM",
timestampsAsStrings,
Expand Down Expand Up @@ -256,120 +244,6 @@ class ParseDateTimeSuite extends SparkQueryCompareTestSuite with BeforeAndAfterE
Seq("1-1-1", "1-1-1", "1-1-1", null))
}

test("Regex: Fix single digit month") {
testRegex(FIX_SINGLE_DIGIT_MONTH,
Seq("1-2-3", "1111-2-3", "2000-7-7\n9\t8568:\n", null),
Seq("1-02-3", "1111-02-3", "2000-07-7\n9\t8568:\n", null))
}

test("Regex: Fix single digit day followed by non digit char") {
// single digit day followed by non digit
testRegex(FIX_SINGLE_DIGIT_DAY,
Seq("1111-02-3 ", "1111-02-3:", "2000-03-192", "2000-07-7\n9\t8568:\n", null),
Seq("1111-02-03 ", "1111-02-03:", "2000-03-192", "2000-07-07\n9\t8568:\n", null))
}

test("Regex: Fix single digit day at end of string") {
// single digit day at end of string
testRegex(FIX_SINGLE_DIGIT_DAY,
Seq("1-02-3", "1111-02-3", "1111-02-03", "2000-03-192", null),
Seq("1-02-03", "1111-02-03", "1111-02-03", "2000-03-192", null))
}

test("Regex: Fix single digit hour 1") {
// single digit hour with space separating date and time
testRegex(FIX_SINGLE_DIGIT_HOUR,
Seq("2001-12-31 1:2:3", "2001-12-31 1:22:33", null),
Seq("2001-12-31 01:2:3", "2001-12-31 01:22:33", null))
}

test("Regex: Fix single digit hour 2") {
// single digit hour with 'T' separating date and time
// note that the T gets replaced with whitespace in this case
testRegex(FIX_SINGLE_DIGIT_HOUR,
Seq("2001-12-31T1:2:3", "2001-12-31T1:22:33", null),
Seq("2001-12-31 01:2:3", "2001-12-31 01:22:33", null))
}

test("Regex: Fix single digit minute") {
// single digit minute at end of string
testRegex(FIX_SINGLE_DIGIT_MINUTE,
Seq("2001-12-31 01:2:3", "2001-12-31 01:22:33", null),
Seq("2001-12-31 01:02:3", "2001-12-31 01:22:33", null))
}

test("Regex: Fix single digit second followed by non digit") {
// single digit second followed by non digit
testRegex(FIX_SINGLE_DIGIT_SECOND,
Seq("2001-12-31 01:02:3:", "2001-12-31 01:22:33:", "2001-12-31 01:02:3 ", null),
Seq("2001-12-31 01:02:03:", "2001-12-31 01:22:33:", "2001-12-31 01:02:03 ", null))
}

test("Regex: Fix single digit second at end of string") {
// single digit day at end of string
testRegex(FIX_SINGLE_DIGIT_SECOND,
Seq("2001-12-31 01:02:3", "2001-12-31 01:22:33", null),
Seq("2001-12-31 01:02:03", "2001-12-31 01:22:33", null))
}

test("Regex: Apply all date rules") {
// end to end test of all date rules being applied in sequence
val testPairs = Seq(
("2001- 1-1", "2001-01-01"),
("2001-1- 1", "2001-01-01"),
("2001- 1- 1", "2001-01-01"),
("1999-12-31", "1999-12-31"),
("1999-2-31", "1999-02-31"),
("1999-2-3:", "1999-02-03:"),
("1999-2-3", "1999-02-03"),
("1999-2-3 1:2:3.4", "1999-02-03 1:2:3.4"),
("1999-2-3T1:2:3.4", "1999-02-03T1:2:3.4")
)
val values = testPairs.map(_._1)
val expected = testPairs.map(_._2)
withResource(ColumnVector.fromStrings(values: _*)) { v =>
withResource(ColumnVector.fromStrings(expected: _*)) { expected =>
val actual = FIX_DATES.foldLeft(v.incRefCount())((a, b) => {
withResource(a) {
_.stringReplaceWithBackrefs(b.search, b.replace)
}
})
withResource(actual) { _ =>
CudfTestHelper.assertColumnsAreEqual(expected, actual)
}
}
}
}

test("Regex: Apply all date and timestamp rules") {
// end to end test of all date and timestamp rules being applied in sequence
val testPairs = Seq(
("2001- 1-1", "2001-01-01"),
("2001-1- 1", "2001-01-01"),
("2001- 1- 1", "2001-01-01"),
("1999-12-31", "1999-12-31"),
("1999-2-31", "1999-02-31"),
("1999-2-3:", "1999-02-03:"),
("1999-2-3", "1999-02-03"),
("1999-2-3 1:2:3.4", "1999-02-03 01:02:03.4"),
("1999-2-3T1:2:3.4", "1999-02-03 01:02:03.4")
)
val values = testPairs.map(_._1)
val expected = testPairs.map(_._2)
withResource(ColumnVector.fromStrings(values: _*)) { v =>
withResource(ColumnVector.fromStrings(expected: _*)) { expected =>
val actual = (FIX_DATES ++ FIX_TIMESTAMPS).foldLeft(v.incRefCount())((a, b) => {
withResource(a) {
_.stringReplaceWithBackrefs(b.search, b.replace)
}
})
withResource(actual) { _ =>
CudfTestHelper.assertColumnsAreEqual(expected, actual)
}
}
}
}

test("literals: ensure time literals are correct") {
val conf = new SparkConf()
val df = withGpuSparkSession(spark => {
Expand Down

0 comments on commit b47a245

Please sign in to comment.