Update parse date to leverage cuDF support for single digit components #3496

Merged 2 commits on Sep 16, 2021
@@ -463,74 +463,51 @@ object GpuToTimestamp extends Arm {
// We are compatible with Spark for these formats when the timeParserPolicy is CORRECTED
// or EXCEPTION. It is possible that other formats may be supported but these are the only
// ones that we have tests for.
val CORRECTED_COMPATIBLE_FORMATS = Seq(
"yyyy-MM-dd",
"yyyy-MM",
"yyyy/MM/dd",
"yyyy/MM",
"dd/MM/yyyy",
"yyyy-MM-dd HH:mm:ss",
"MM-dd",
"MM/dd",
"dd-MM",
"dd/MM"
val CORRECTED_COMPATIBLE_FORMATS = Map(
"yyyy-MM-dd" -> ParseFormatMeta('-', isTimestamp = false,
raw"\A\d{4}-\d{2}-\d{2}\Z"),
"yyyy/MM/dd" -> ParseFormatMeta('/', isTimestamp = false,
raw"\A\d{4}/\d{1,2}/\d{1,2}\Z"),
"yyyy-MM" -> ParseFormatMeta('-', isTimestamp = false,
raw"\A\d{4}-\d{2}\Z"),
"yyyy/MM" -> ParseFormatMeta('/', isTimestamp = false,
raw"\A\d{4}/\d{2}\Z"),
"dd/MM/yyyy" -> ParseFormatMeta('/', isTimestamp = false,
raw"\A\d{2}/\d{2}/\d{4}\Z"),
"yyyy-MM-dd HH:mm:ss" -> ParseFormatMeta('-', isTimestamp = true,
raw"\A\d{4}-\d{2}-\d{2}[ T]\d{2}:\d{2}:\d{2}\Z"),
"MM-dd" -> ParseFormatMeta('-', isTimestamp = false,
raw"\A\d{2}-\d{2}\Z"),
"MM/dd" -> ParseFormatMeta('/', isTimestamp = false,
raw"\A\d{2}/\d{2}\Z"),
"dd-MM" -> ParseFormatMeta('-', isTimestamp = false,
raw"\A\d{2}-\d{2}\Z"),
"dd/MM" -> ParseFormatMeta('/', isTimestamp = false,
raw"\A\d{2}/\d{2}\Z")
)
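
For illustration (a sketch, not part of this change): the corrected-mode patterns are anchored with \A and \Z, so a string must match a format in its entirety to pass validation, while \d{1,2} admits the single-digit month and day values that cuDF can now parse directly. A minimal JVM-side analogue, using java.util.regex as a stand-in for the GPU-side matchesRe call:

// Sketch only: java.util.regex stands in for cuDF's matchesRe.
val yyyyMMdd = raw"\A\d{4}/\d{1,2}/\d{1,2}\Z".r.pattern // "yyyy/MM/dd"
assert(yyyyMMdd.matcher("2021/9/16").matches())         // single digits accepted
assert(!yyyyMMdd.matcher("2021/09/16 x").matches())     // trailing text rejected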

// We are compatible with Spark for these formats when the timeParserPolicy is LEGACY. It
// is possible that other formats may be supported but these are the only ones that we have
// tests for.
val LEGACY_COMPATIBLE_FORMATS = Map(
"yyyy-MM-dd" -> LegacyParseFormat('-', isTimestamp = false,
raw"\A\d{4}-\d{2}-\d{2}(\D|\s|\Z)"),
"yyyy/MM/dd" -> LegacyParseFormat('/', isTimestamp = false,
raw"\A\d{4}/\d{2}/\d{2}(\D|\s|\Z)"),
"dd-MM-yyyy" -> LegacyParseFormat('-', isTimestamp = false,
raw"\A\d{2}-\d{2}-\d{4}(\D|\s|\Z)"),
"dd/MM/yyyy" -> LegacyParseFormat('/', isTimestamp = false,
raw"\A\d{2}/\d{2}/\d{4}(\D|\s|\Z)"),
"yyyy-MM-dd HH:mm:ss" -> LegacyParseFormat('-', isTimestamp = true,
raw"\A\d{4}-\d{2}-\d{2}[ T]\d{2}:\d{2}:\d{2}(\D|\s|\Z)"),
"yyyy/MM/dd HH:mm:ss" -> LegacyParseFormat('/', isTimestamp = true,
raw"\A\d{4}/\d{2}/\d{2}[ T]\d{2}:\d{2}:\d{2}(\D|\s|\Z)")
"yyyy-MM-dd" -> ParseFormatMeta('-', isTimestamp = false,
raw"\A\d{4}-\d{1,2}-\d{1,2}(\D|\s|\Z)"),
"yyyy/MM/dd" -> ParseFormatMeta('/', isTimestamp = false,
raw"\A\d{4}/\d{1,2}/\d{1,2}(\D|\s|\Z)"),
"dd-MM-yyyy" -> ParseFormatMeta('-', isTimestamp = false,
raw"\A\d{1,2}-\d{1,2}-\d{4}(\D|\s|\Z)"),
"dd/MM/yyyy" -> ParseFormatMeta('/', isTimestamp = false,
raw"\A\d{1,2}/\d{1,2}/\d{4}(\D|\s|\Z)"),
"yyyy-MM-dd HH:mm:ss" -> ParseFormatMeta('-', isTimestamp = true,
raw"\A\d{4}-\d{1,2}-\d{1,2}[ T]\d{1,2}:\d{1,2}:\d{1,2}(\D|\s|\Z)"),
"yyyy/MM/dd HH:mm:ss" -> ParseFormatMeta('/', isTimestamp = true,
raw"\A\d{4}/\d{1,2}/\d{1,2}[ T]\d{1,2}:\d{1,2}:\d{1,2}(\D|\s|\Z)")
)
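
The legacy-mode patterns, by contrast, anchor only the start of the string and end with (\D|\s|\Z), so a valid prefix followed by trailing characters still passes, mirroring the leniency of Spark's LEGACY parser. A small sketch under the same assumption (JVM regex standing in for the GPU call):

// Sketch only: prefix-anchored legacy validation.
val legacy = raw"\A\d{4}-\d{1,2}-\d{1,2}(\D|\s|\Z)".r
assert(legacy.findFirstIn("2021-9-6").isDefined)       // single digits accepted
assert(legacy.findFirstIn("2021-9-6 12:00").isDefined) // trailing text tolerated
assert(legacy.findFirstIn("21-9-6").isEmpty)           // malformed year rejected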

/** Remove whitespace before month and day */
val REMOVE_WHITESPACE_FROM_MONTH_DAY: RegexReplace =
RegexReplace(raw"(\A\d+)-([ \t]*)(\d+)-([ \t]*)(\d+)", raw"\1-\3-\5")
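
This rule survives the change, presumably because cuDF now handles single-digit components but not embedded whitespace. A usage sketch, with one noted assumption: cuDF's stringReplaceWithBackrefs takes \1-style backreferences, so java.util.regex's $1 syntax is substituted here purely for CPU-side illustration.

// Sketch only: CPU analogue of the whitespace-removal rule.
val cleaned = "2001- 1-\t1".replaceAll(raw"(\A\d+)-([ \t]*)(\d+)-([ \t]*)(\d+)", "$1-$3-$5")
assert(cleaned == "2001-1-1")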

/** Regex rule to replace "yyyy-m-" with "yyyy-mm-" */
val FIX_SINGLE_DIGIT_MONTH: RegexReplace =
RegexReplace(raw"(\A\d+)-(\d{1}-)", raw"\1-0\2")

/** Regex rule to replace "yyyy-mm-d" with "yyyy-mm-dd" */
val FIX_SINGLE_DIGIT_DAY: RegexReplace =
RegexReplace(raw"(\A\d+-\d{2})-(\d{1})([\D\s]|\Z)", raw"\1-0\2\3")

/** Regex rule to replace "yyyy-mm-dd[ T]h:" with "yyyy-mm-dd hh:" */
val FIX_SINGLE_DIGIT_HOUR: RegexReplace =
RegexReplace(raw"(\A\d+-\d{2}-\d{2})[ T](\d{1}:)", raw"\1 0\2")

/** Regex rule to replace "yyyy-mm-dd[ T]hh:m:" with "yyyy-mm-dd[ T]hh:mm:" */
val FIX_SINGLE_DIGIT_MINUTE: RegexReplace =
RegexReplace(raw"(\A\d+-\d{2}-\d{2}[ T]\d{2}):(\d{1}:)", raw"\1:0\2")

/** Regex rule to replace "yyyy-mm-dd[ T]hh:mm:s" with "yyyy-mm-dd[ T]hh:mm:ss" */
val FIX_SINGLE_DIGIT_SECOND: RegexReplace =
RegexReplace(raw"(\A\d+-\d{2}-\d{2}[ T]\d{2}:\d{2}):(\d{1})([\D\s]|\Z)", raw"\1:0\2\3")

/** Convert dates to standard format */
val FIX_DATES = Seq(
REMOVE_WHITESPACE_FROM_MONTH_DAY,
FIX_SINGLE_DIGIT_MONTH,
FIX_SINGLE_DIGIT_DAY)

/** Convert timestamps to standard format */
val FIX_TIMESTAMPS = Seq(
FIX_SINGLE_DIGIT_HOUR,
FIX_SINGLE_DIGIT_MINUTE,
FIX_SINGLE_DIGIT_SECOND
)
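
For context on these deletions: the rules above zero-padded single-digit components so that cuDF, which previously required two-digit fields, could parse the result. A CPU-side sketch of the removed date normalization (hypothetical JVM translation, with $n backreferences substituted for cuDF's \n syntax); since cuDF now accepts single-digit components directly, this per-component rewriting is no longer needed:

// Sketch only: what the removed date rules did to a string like "1999-2-3".
val removedRules = Seq(
  (raw"(\A\d+)-(\d{1}-)", "$1-0$2"),                  // pad single-digit month
  (raw"(\A\d+-\d{2})-(\d{1})([\D\s]|\Z)", "$1-0$2$3") // pad single-digit day
)
val padded = removedRules.foldLeft("1999-2-3") { case (s, (re, rep)) => s.replaceAll(re, rep) }
assert(padded == "1999-02-03")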

def daysScalarSeconds(name: String): Scalar = {
Scalar.timestampFromLong(DType.TIMESTAMP_SECONDS, DateUtils.specialDatesSeconds(name))
}
@@ -546,25 +523,22 @@ object GpuToTimestamp extends Arm {
}

def isTimestamp(col: ColumnVector, sparkFormat: String, strfFormat: String) : ColumnVector = {
if (CORRECTED_COMPATIBLE_FORMATS.contains(sparkFormat)) {
// the cuDF `is_timestamp` function is less restrictive than Spark's behavior for UnixTime
// and ToUnixTime and will support parsing a subset of a string so we check the length of
// the string as well which works well for fixed-length formats but if/when we want to
// support variable-length formats (such as timestamps with milliseconds) then we will need
// to use regex instead.
withResource(col.getCharLengths) { actualLen =>
withResource(Scalar.fromInt(sparkFormat.length)) { expectedLen =>
withResource(actualLen.equalTo(expectedLen)) { lengthOk =>
withResource(col.isTimestamp(strfFormat)) { isTimestamp =>
isTimestamp.and(lengthOk)
}
CORRECTED_COMPATIBLE_FORMATS.get(sparkFormat) match {
case Some(fmt) =>
// the cuDF `is_timestamp` function is less restrictive than Spark's behavior for UnixTime
// and ToUnixTime and will support parsing a subset of a string, so we also require that
// the entire string matches the format's validation regex. Unlike the previous
// fixed-length check, the regex also handles variable-length components such as
// single-digit month and day.
withResource(col.matchesRe(fmt.validRegex)) { matches =>
withResource(col.isTimestamp(strfFormat)) { isTimestamp =>
isTimestamp.and(matches)
}
}
}
} else {
// this is the incompatibleDateFormats case where we do not guarantee compatibility with
// Spark and assume that all non-null inputs are valid
ColumnVector.fromScalar(Scalar.fromBool(true), col.getRowCount.toInt)
case _ =>
// this is the incompatibleDateFormats case where we do not guarantee compatibility with
// Spark and assume that all non-null inputs are valid
ColumnVector.fromScalar(Scalar.fromBool(true), col.getRowCount.toInt)
}
}
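
To summarize the new control flow (an illustrative CPU model, not the diff itself): for a known corrected-mode format, a row is valid only when cuDF's is_timestamp accepts it and the whole string matches the format's validation regex; for any other format, compatibility is not guaranteed, so every non-null row is assumed valid.

// Sketch only: plain Strings and Booleans stand in for GPU columns.
def isValidRow(value: String, sparkFormat: String, cudfParses: Boolean): Boolean =
  CORRECTED_COMPATIBLE_FORMATS.get(sparkFormat) match {
    case Some(fmt) => cudfParses && value.matches(fmt.validRegex)
    case _ => true // incompatible format: assume all non-null inputs are valid
  }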

@@ -634,13 +608,7 @@ object GpuToTimestamp extends Arm {
val format = LEGACY_COMPATIBLE_FORMATS.getOrElse(sparkFormat,
throw new IllegalStateException(s"Unsupported format $sparkFormat"))

// optimization to apply only the necessary rules depending on whether we are
// parsing to a date or timestamp
val regexReplaceRules = if (format.isTimestamp) {
FIX_DATES ++ FIX_TIMESTAMPS
} else {
FIX_DATES
}
val regexReplaceRules = Seq(REMOVE_WHITESPACE_FROM_MONTH_DAY)

// we support date formats using either `-` or `/` to separate year, month, and day and the
// regex rules are written with '-' so we need to replace '-' with '/' here as necessary
@@ -708,7 +676,7 @@

}

case class LegacyParseFormat(separator: Char, isTimestamp: Boolean, validRegex: String)
case class ParseFormatMeta(separator: Char, isTimestamp: Boolean, validRegex: String)

case class RegexReplace(search: String, replace: String)
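
A usage sketch for these helpers, mirroring how the test suite below applies them (incRefCount, stringReplaceWithBackrefs, and withResource all appear verbatim in the tests):

// Sketch only: fold a chain of RegexReplace rules over a string column.
def applyRules(input: ColumnVector, rules: Seq[RegexReplace]): ColumnVector =
  rules.foldLeft(input.incRefCount()) { (col, rule) =>
    withResource(col)(_.stringReplaceWithBackrefs(rule.search, rule.replace))
  }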

128 changes: 1 addition & 127 deletions tests/src/test/scala/com/nvidia/spark/rapids/ParseDateTimeSuite.scala
@@ -26,7 +26,7 @@ import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.functions.{col, to_date, to_timestamp, unix_timestamp}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.rapids.GpuToTimestamp.{FIX_DATES, FIX_SINGLE_DIGIT_DAY, FIX_SINGLE_DIGIT_HOUR, FIX_SINGLE_DIGIT_MINUTE, FIX_SINGLE_DIGIT_MONTH, FIX_SINGLE_DIGIT_SECOND, FIX_TIMESTAMPS, REMOVE_WHITESPACE_FROM_MONTH_DAY}
import org.apache.spark.sql.rapids.GpuToTimestamp.REMOVE_WHITESPACE_FROM_MONTH_DAY
import org.apache.spark.sql.rapids.RegexReplace

class ParseDateTimeSuite extends SparkQueryCompareTestSuite with BeforeAndAfterEach {
@@ -58,15 +58,11 @@ class ParseDateTimeSuite extends SparkQueryCompareTestSuite with BeforeAndAfterEach {
}


// Test removed temporarily due to known issues
// https://github.com/NVIDIA/spark-rapids/issues/3478
/*
testSparkResultsAreEqual("to_date yyyy-MM-dd",
datesAsStrings,
conf = CORRECTED_TIME_PARSER_POLICY) {
df => df.withColumn("c1", to_date(col("c0"), "yyyy-MM-dd"))
}
*/

testSparkResultsAreEqual("to_date yyyy-MM-dd LEGACY",
datesAsStrings,
@@ -104,15 +100,11 @@ class ParseDateTimeSuite extends SparkQueryCompareTestSuite with BeforeAndAfterEach {
df => df.withColumn("c1", to_date(col("c0"), "yyyy-MM-dd"))
}

// Test removed temporarily due to known issues
// https://github.com/NVIDIA/spark-rapids/issues/3478
/*
testSparkResultsAreEqual("to_timestamp yyyy-MM-dd",
timestampsAsStrings,
conf = CORRECTED_TIME_PARSER_POLICY) {
df => df.withColumn("c1", to_timestamp(col("c0"), "yyyy-MM-dd"))
}
*/

testSparkResultsAreEqual("to_timestamp dd/MM/yyyy",
timestampsAsStrings,
@@ -126,15 +118,11 @@ class ParseDateTimeSuite extends SparkQueryCompareTestSuite with BeforeAndAfterEach {
df => df.withColumn("c1", to_date(col("c0")))
}

// Test removed temporarily due to known issues
// https://github.com/NVIDIA/spark-rapids/issues/3478
/*
testSparkResultsAreEqual("unix_timestamp parse date",
timestampsAsStrings,
CORRECTED_TIME_PARSER_POLICY) {
df => df.withColumn("c1", unix_timestamp(col("c0"), "yyyy-MM-dd"))
}
*/

testSparkResultsAreEqual("unix_timestamp parse yyyy/MM",
timestampsAsStrings,
@@ -256,120 +244,6 @@ class ParseDateTimeSuite extends SparkQueryCompareTestSuite with BeforeAndAfterEach {
Seq("1-1-1", "1-1-1", "1-1-1", null))
}
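
The testRegex helper is defined outside this hunk; a plausible reconstruction based on its call sites and the foldLeft tests below (hypothetical, not the actual suite code):

// Hypothetical sketch of the helper: apply one rule, then compare columns.
def testRegex(rule: RegexReplace, values: Seq[String], expected: Seq[String]): Unit =
  withResource(ColumnVector.fromStrings(values: _*)) { v =>
    withResource(ColumnVector.fromStrings(expected: _*)) { expectedCol =>
      withResource(v.stringReplaceWithBackrefs(rule.search, rule.replace)) { actual =>
        CudfTestHelper.assertColumnsAreEqual(expectedCol, actual)
      }
    }
  }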

test("Regex: Fix single digit month") {
testRegex(FIX_SINGLE_DIGIT_MONTH,
Seq("1-2-3", "1111-2-3", "2000-7-7\n9\t8568:\n", null),
Seq("1-02-3", "1111-02-3", "2000-07-7\n9\t8568:\n", null))
}

test("Regex: Fix single digit day followed by non digit char") {
// single digit day followed by non digit
testRegex(FIX_SINGLE_DIGIT_DAY,
Seq("1111-02-3 ", "1111-02-3:", "2000-03-192", "2000-07-7\n9\t8568:\n", null),
Seq("1111-02-03 ", "1111-02-03:", "2000-03-192", "2000-07-07\n9\t8568:\n", null))
}

test("Regex: Fix single digit day at end of string") {
// single digit day at end of string
testRegex(FIX_SINGLE_DIGIT_DAY,
Seq("1-02-3", "1111-02-3", "1111-02-03", "2000-03-192", null),
Seq("1-02-03", "1111-02-03", "1111-02-03", "2000-03-192", null))
}

test("Regex: Fix single digit hour 1") {
// single digit hour with space separating date and time
testRegex(FIX_SINGLE_DIGIT_HOUR,
Seq("2001-12-31 1:2:3", "2001-12-31 1:22:33", null),
Seq("2001-12-31 01:2:3", "2001-12-31 01:22:33", null))
}

test("Regex: Fix single digit hour 2") {
// single digit hour with 'T' separating date and time
// note that the T gets replaced with whitespace in this case
testRegex(FIX_SINGLE_DIGIT_HOUR,
Seq("2001-12-31T1:2:3", "2001-12-31T1:22:33", null),
Seq("2001-12-31 01:2:3", "2001-12-31 01:22:33", null))
}

test("Regex: Fix single digit minute") {
// single digit minute at end of string
testRegex(FIX_SINGLE_DIGIT_MINUTE,
Seq("2001-12-31 01:2:3", "2001-12-31 01:22:33", null),
Seq("2001-12-31 01:02:3", "2001-12-31 01:22:33", null))
}

test("Regex: Fix single digit second followed by non digit") {
// single digit second followed by non digit
testRegex(FIX_SINGLE_DIGIT_SECOND,
Seq("2001-12-31 01:02:3:", "2001-12-31 01:22:33:", "2001-12-31 01:02:3 ", null),
Seq("2001-12-31 01:02:03:", "2001-12-31 01:22:33:", "2001-12-31 01:02:03 ", null))
}

test("Regex: Fix single digit second at end of string") {
// single digit day at end of string
testRegex(FIX_SINGLE_DIGIT_SECOND,
Seq("2001-12-31 01:02:3", "2001-12-31 01:22:33", null),
Seq("2001-12-31 01:02:03", "2001-12-31 01:22:33", null))
}

test("Regex: Apply all date rules") {
// end to end test of all date rules being applied in sequence
val testPairs = Seq(
("2001- 1-1", "2001-01-01"),
("2001-1- 1", "2001-01-01"),
("2001- 1- 1", "2001-01-01"),
("1999-12-31", "1999-12-31"),
("1999-2-31", "1999-02-31"),
("1999-2-3:", "1999-02-03:"),
("1999-2-3", "1999-02-03"),
("1999-2-3 1:2:3.4", "1999-02-03 1:2:3.4"),
("1999-2-3T1:2:3.4", "1999-02-03T1:2:3.4")
)
val values = testPairs.map(_._1)
val expected = testPairs.map(_._2)
withResource(ColumnVector.fromStrings(values: _*)) { v =>
withResource(ColumnVector.fromStrings(expected: _*)) { expected =>
val actual = FIX_DATES.foldLeft(v.incRefCount())((a, b) => {
withResource(a) {
_.stringReplaceWithBackrefs(b.search, b.replace)
}
})
withResource(actual) { _ =>
CudfTestHelper.assertColumnsAreEqual(expected, actual)
}
}
}
}

test("Regex: Apply all date and timestamp rules") {
// end to end test of all date and timestamp rules being applied in sequence
val testPairs = Seq(
("2001- 1-1", "2001-01-01"),
("2001-1- 1", "2001-01-01"),
("2001- 1- 1", "2001-01-01"),
("1999-12-31", "1999-12-31"),
("1999-2-31", "1999-02-31"),
("1999-2-3:", "1999-02-03:"),
("1999-2-3", "1999-02-03"),
("1999-2-3 1:2:3.4", "1999-02-03 01:02:03.4"),
("1999-2-3T1:2:3.4", "1999-02-03 01:02:03.4")
)
val values = testPairs.map(_._1)
val expected = testPairs.map(_._2)
withResource(ColumnVector.fromStrings(values: _*)) { v =>
withResource(ColumnVector.fromStrings(expected: _*)) { expected =>
val actual = (FIX_DATES ++ FIX_TIMESTAMPS).foldLeft(v.incRefCount())((a, b) => {
withResource(a) {
_.stringReplaceWithBackrefs(b.search, b.replace)
}
})
withResource(actual) { _ =>
CudfTestHelper.assertColumnsAreEqual(expected, actual)
}
}
}
}

test("literals: ensure time literals are correct") {
val conf = new SparkConf()
val df = withGpuSparkSession(spark => {