From 8736a97ac139c1e8198852c14ddc9bce47398d68 Mon Sep 17 00:00:00 2001
From: Cheng Pan
Date: Fri, 3 Feb 2023 20:57:22 +0800
Subject: [PATCH] Spark: Fix timestamp value transformation (#216)

* Spark: Fix timestamp value transformation

* nit
---
 .../spark/sql/clickhouse/SparkTest.scala      | 15 +++++++
 .../cluster/BaseClusterWriteSuite.scala       | 18 ++++----
 .../cluster/SparkClickHouseClusterTest.scala  | 11 +++--
 .../single/ClickHouseDataTypeSuite.scala      | 44 ++++++++++++-------
 .../single/ClickHouseSingleSuite.scala        | 28 ++++++------
 .../single/SparkClickHouseSingleTest.scala    | 29 ++++++++++--
 .../WriteDistributionAndOrderingSuite.scala   |  9 ++--
 .../scala/xenon/clickhouse/SQLHelper.scala    | 16 +++----
 8 files changed, 106 insertions(+), 64 deletions(-)

diff --git a/spark-3.3/clickhouse-spark-it/src/test/scala/org/apache/spark/sql/clickhouse/SparkTest.scala b/spark-3.3/clickhouse-spark-it/src/test/scala/org/apache/spark/sql/clickhouse/SparkTest.scala
index 3c7bfb24..97eadc5f 100644
--- a/spark-3.3/clickhouse-spark-it/src/test/scala/org/apache/spark/sql/clickhouse/SparkTest.scala
+++ b/spark-3.3/clickhouse-spark-it/src/test/scala/org/apache/spark/sql/clickhouse/SparkTest.scala
@@ -19,10 +19,25 @@ import org.apache.spark.sql.test.SharedSparkSession
 import org.apache.spark.sql.{DataFrame, QueryTest}
 import xenon.clickhouse.ClickHouseCommandRunner
 
+import java.sql.{Date, Timestamp}
+import java.time.Instant
+
 trait SparkTest extends QueryTest with SharedSparkSession {
 
   def cmdRunnerOptions: Map[String, String]
 
+  /**
+   * @param text format yyyy-[m]m-[d]d
+   * @return A SQL Date
+   */
+  def date(text: String): Date = Date.valueOf(text)
+
+  /**
+   * @param text format 2007-12-03T10:15:30.00Z
+   * @return A SQL Timestamp
+   */
+  def timestamp(text: String): Timestamp = Timestamp.from(Instant.parse(text))
+
   override protected def sparkConf: SparkConf = super.sparkConf
     .setMaster("local[2]")
     .setAppName("spark-ut")
diff --git a/spark-3.3/clickhouse-spark-it/src/test/scala/org/apache/spark/sql/clickhouse/cluster/BaseClusterWriteSuite.scala b/spark-3.3/clickhouse-spark-it/src/test/scala/org/apache/spark/sql/clickhouse/cluster/BaseClusterWriteSuite.scala
index 65e6b9ac..d2380668 100644
--- a/spark-3.3/clickhouse-spark-it/src/test/scala/org/apache/spark/sql/clickhouse/cluster/BaseClusterWriteSuite.scala
+++ b/spark-3.3/clickhouse-spark-it/src/test/scala/org/apache/spark/sql/clickhouse/cluster/BaseClusterWriteSuite.scala
@@ -18,8 +18,6 @@ import org.apache.spark.SparkConf
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.types._
 
-import java.sql.Timestamp
-
 abstract class BaseClusterWriteSuite extends SparkClickHouseClusterTest {
 
   test("clickhouse write cluster") {
@@ -38,28 +36,28 @@ abstract class BaseClusterWriteSuite extends SparkClickHouseClusterTest {
         .table(s"$db.$tbl_dist")
         .select("create_time", "y", "m", "id", "value"),
       Seq(
-        Row(Timestamp.valueOf("2021-01-01 10:10:10"), 2021, 1, 1L, "1"),
-        Row(Timestamp.valueOf("2022-02-02 10:10:10"), 2022, 2, 2L, "2"),
-        Row(Timestamp.valueOf("2023-03-03 10:10:10"), 2023, 3, 3L, "3"),
-        Row(Timestamp.valueOf("2024-04-04 10:10:10"), 2024, 4, 4L, "4")
+        Row(timestamp("2021-01-01T10:10:10Z"), 2021, 1, 1L, "1"),
+        Row(timestamp("2022-02-02T10:10:10Z"), 2022, 2, 2L, "2"),
+        Row(timestamp("2023-03-03T10:10:10Z"), 2023, 3, 3L, "3"),
+        Row(timestamp("2024-04-04T10:10:10Z"), 2024, 4, 4L, "4")
       )
     )
 
     checkAnswer(
       spark.table(s"clickhouse_s1r1.$db.$tbl_local"),
-      Row(Timestamp.valueOf("2024-04-04 10:10:10"), 2024, 4, 4L, "4") :: Nil
+      Row(timestamp("2024-04-04T10:10:10Z"), 2024, 4, 4L, "4") :: Nil
     )
 
     checkAnswer(
       spark.table(s"clickhouse_s1r2.$db.$tbl_local"),
-      Row(Timestamp.valueOf("2021-01-01 10:10:10"), 2021, 1, 1L, "1") :: Nil
+      Row(timestamp("2021-01-01T10:10:10Z"), 2021, 1, 1L, "1") :: Nil
     )
 
     checkAnswer(
       spark.table(s"clickhouse_s2r1.$db.$tbl_local"),
-      Row(Timestamp.valueOf("2022-02-02 10:10:10"), 2022, 2, 2L, "2") :: Nil
+      Row(timestamp("2022-02-02T10:10:10Z"), 2022, 2, 2L, "2") :: Nil
     )
 
     checkAnswer(
       spark.table(s"clickhouse_s2r2.$db.$tbl_local"),
-      Row(Timestamp.valueOf("2023-03-03 10:10:10"), 2023, 3, 3L, "3") :: Nil
+      Row(timestamp("2023-03-03T10:10:10Z"), 2023, 3, 3L, "3") :: Nil
     )
   }
 }
diff --git a/spark-3.3/clickhouse-spark-it/src/test/scala/org/apache/spark/sql/clickhouse/cluster/SparkClickHouseClusterTest.scala b/spark-3.3/clickhouse-spark-it/src/test/scala/org/apache/spark/sql/clickhouse/cluster/SparkClickHouseClusterTest.scala
index 3ca8b88f..df11b03e 100644
--- a/spark-3.3/clickhouse-spark-it/src/test/scala/org/apache/spark/sql/clickhouse/cluster/SparkClickHouseClusterTest.scala
+++ b/spark-3.3/clickhouse-spark-it/src/test/scala/org/apache/spark/sql/clickhouse/cluster/SparkClickHouseClusterTest.scala
@@ -16,7 +16,7 @@ package org.apache.spark.sql.clickhouse.cluster
 
 import org.apache.spark.SparkConf
 import org.apache.spark.sql.clickhouse.SparkTest
-import org.apache.spark.sql.functions.{month, to_timestamp, year}
+import org.apache.spark.sql.functions.{month, year}
 import xenon.clickhouse.base.ClickHouseClusterMixIn
 
 trait SparkClickHouseClusterTest extends SparkTest with ClickHouseClusterMixIn {
@@ -143,12 +143,11 @@ trait SparkClickHouseClusterTest extends SparkTest with ClickHouseClusterMixIn {
     if (writeData) {
       val tblSchema = spark.table(s"$db.$tbl_dist").schema
       val dataDF = spark.createDataFrame(Seq(
-        ("2021-01-01 10:10:10", 1L, "1"),
-        ("2022-02-02 10:10:10", 2L, "2"),
-        ("2023-03-03 10:10:10", 3L, "3"),
-        ("2024-04-04 10:10:10", 4L, "4")
+        (timestamp("2021-01-01T10:10:10Z"), 1L, "1"),
+        (timestamp("2022-02-02T10:10:10Z"), 2L, "2"),
+        (timestamp("2023-03-03T10:10:10Z"), 3L, "3"),
+        (timestamp("2024-04-04T10:10:10Z"), 4L, "4")
       )).toDF("create_time", "id", "value")
-        .withColumn("create_time", to_timestamp($"create_time"))
         .withColumn("y", year($"create_time"))
         .withColumn("m", month($"create_time"))
         .select($"create_time", $"y", $"m", $"id", $"value")
diff --git a/spark-3.3/clickhouse-spark-it/src/test/scala/org/apache/spark/sql/clickhouse/single/ClickHouseDataTypeSuite.scala b/spark-3.3/clickhouse-spark-it/src/test/scala/org/apache/spark/sql/clickhouse/single/ClickHouseDataTypeSuite.scala
index c3c2b535..6b98a213 100644
--- a/spark-3.3/clickhouse-spark-it/src/test/scala/org/apache/spark/sql/clickhouse/single/ClickHouseDataTypeSuite.scala
+++ b/spark-3.3/clickhouse-spark-it/src/test/scala/org/apache/spark/sql/clickhouse/single/ClickHouseDataTypeSuite.scala
@@ -18,8 +18,6 @@ import org.apache.spark.sql.Row
 import org.apache.spark.sql.types.DataTypes.{createArrayType, createMapType}
 import org.apache.spark.sql.types._
 
-import java.sql.Date
-
 class ClickHouseDataTypeSuite extends SparkClickHouseSingleTest {
 
   test("write supported data types") {
@@ -28,7 +26,7 @@ class ClickHouseDataTypeSuite extends SparkClickHouseSingleTest {
       StructField("col_string", StringType, false) ::
       StructField("col_date", DateType, false) ::
      StructField("col_array_string", createArrayType(StringType, false), false) ::
-      StructField("col_map_string", createMapType(StringType, StringType, false), false) ::
+      StructField("col_map_string_string", createMapType(StringType, StringType, false), false) ::
       Nil
   )
   val db = "t_w_s_db"
@@ -39,9 +37,9 @@ class ClickHouseDataTypeSuite extends SparkClickHouseSingleTest {
       // assert(StructType(structFields) === tblSchema)
 
       val dataDF = spark.createDataFrame(Seq(
-        (1L, "a", Date.valueOf("1996-06-06"), Seq("a", "b", "c"), Map("a" -> "x")),
-        (2L, "A", Date.valueOf("2022-04-12"), Seq("A", "B", "C"), Map("A" -> "X"))
-      )).toDF("id", "col_string", "col_date", "col_array_string", "col_map_string")
+        (1L, "a", date("1996-06-06"), Seq("a", "b", "c"), Map("a" -> "x")),
+        (2L, "A", date("2022-04-12"), Seq("A", "B", "C"), Map("A" -> "X"))
+      )).toDF("id", "col_string", "col_date", "col_array_string", "col_map_string_string")
 
       spark.createDataFrame(dataDF.rdd, tblSchema)
         .writeTo(s"$db.$tbl")
@@ -49,19 +47,33 @@ class ClickHouseDataTypeSuite extends SparkClickHouseSingleTest {
 
       checkAnswer(
         spark.table(s"$db.$tbl").sort("id"),
-        Row(1L, "a", Date.valueOf("1996-06-06"), Seq("a", "b", "c"), Map("a" -> "x")) ::
-          Row(2L, "A", Date.valueOf("2022-04-12"), Seq("A", "B", "C"), Map("A" -> "X")) :: Nil
+        Row(1L, "a", date("1996-06-06"), Seq("a", "b", "c"), Map("a" -> "x")) ::
+          Row(2L, "A", date("2022-04-12"), Seq("A", "B", "C"), Map("A" -> "X")) :: Nil
       )
     }
   }
 
-  test("write unsupported data types") {}
-
-  test("read supported data types") {}
-
-  test("read unsupported data types") {}
-
-  test("spark to clickhouse data type mappings") {}
+  test("DateType - DateTime") {
+    val db = "t_r_s_db"
+    val tbl = "t_r_s_tbl"
+    withKVTable(db, tbl, valueColDef = "DateTime") {
+      runClickHouseSQL(
+        s"""INSERT INTO $db.$tbl VALUES
+           |(1, '2021-01-01 01:01:01'),
+           |(2, '2022-02-02 02:02:02')
+           |""".stripMargin
+      )
 
-  test("clickhouse to spark data type mappings") {}
+      val data = spark.sql(s"SELECT key, value FROM $db.$tbl ORDER BY key")
+      checkAnswer(
+        data,
+        Row(1, timestamp("2021-01-01T01:01:01Z")) ::
+          Row(2, timestamp("2022-02-02T02:02:02Z")) :: Nil
+      )
+      checkAnswer(
+        data.filter("value > '2022-01-01 01:01:01'"),
+        Row(2, timestamp("2022-02-02T02:02:02Z")) :: Nil
+      )
+    }
+  }
 }
diff --git a/spark-3.3/clickhouse-spark-it/src/test/scala/org/apache/spark/sql/clickhouse/single/ClickHouseSingleSuite.scala b/spark-3.3/clickhouse-spark-it/src/test/scala/org/apache/spark/sql/clickhouse/single/ClickHouseSingleSuite.scala
index f2e32fc9..a452ff98 100644
--- a/spark-3.3/clickhouse-spark-it/src/test/scala/org/apache/spark/sql/clickhouse/single/ClickHouseSingleSuite.scala
+++ b/spark-3.3/clickhouse-spark-it/src/test/scala/org/apache/spark/sql/clickhouse/single/ClickHouseSingleSuite.scala
@@ -17,8 +17,6 @@ package org.apache.spark.sql.clickhouse.single
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.types._
 
-import java.sql.{Date, Timestamp}
-
 class ClickHouseSingleSuite extends SparkClickHouseSingleTest {
 
   import testImplicits._
@@ -124,18 +122,18 @@ class ClickHouseSingleSuite extends SparkClickHouseSingleTest {
          |""".stripMargin
     )
     spark.createDataFrame(Seq(
-      (21L, Date.valueOf("2022-04-21")),
-      (22L, Date.valueOf("2022-04-22"))
+      (21L, date("2022-04-21")),
+      (22L, date("2022-04-22"))
     ))
       .toDF("id", "date")
       .writeTo(s"$db.$tbl").append
 
     checkAnswer(
       spark.table(s"$db.$tbl").orderBy($"id"),
-      Row(11L, Date.valueOf("2022-04-11")) ::
-        Row(12L, Date.valueOf("2022-04-12")) ::
-        Row(21L, Date.valueOf("2022-04-21")) ::
-        Row(22L, Date.valueOf("2022-04-22")) :: Nil
+      Row(11L, date("2022-04-11")) ::
+        Row(12L, date("2022-04-12")) ::
+        Row(21L, date("2022-04-21")) ::
+        Row(22L, date("2022-04-22")) :: Nil
     )
 
     checkAnswer(
@@ -221,10 +219,10 @@ class ClickHouseSingleSuite extends SparkClickHouseSingleTest {
 
     checkAnswer(
       spark.table(s"$db.$tbl").orderBy($"id"),
-      Row(11L, Date.valueOf("2022-04-11"), 1) ::
-        Row(12L, Date.valueOf("2022-04-12"), 2) ::
-        Row(21L, Date.valueOf("2022-04-21"), 1) ::
-        Row(22L, Date.valueOf("2022-04-22"), 2) :: Nil
+      Row(11L, date("2022-04-11"), 1) ::
+        Row(12L, date("2022-04-12"), 2) ::
+        Row(21L, date("2022-04-21"), 1) ::
+        Row(22L, date("2022-04-22"), 2) :: Nil
     )
 
     checkAnswer(
@@ -345,14 +343,14 @@ class ClickHouseSingleSuite extends SparkClickHouseSingleTest {
     checkAnswer(
       spark.table(s"$db.$tbl").sort("m"),
       Seq(
-        Row(1L, "1", Timestamp.valueOf("2021-01-01 10:10:10"), 1),
-        Row(2L, "2", Timestamp.valueOf("2022-02-02 10:10:10"), 2)
+        Row(1L, "1", timestamp("2021-01-01T10:10:10Z"), 1),
+        Row(2L, "2", timestamp("2022-02-02T10:10:10Z"), 2)
       )
     )
 
     checkAnswer(
       spark.table(s"$db.$tbl").filter($"id" > 1),
-      Row(2L, "2", Timestamp.valueOf("2022-02-02 10:10:10"), 2) :: Nil
+      Row(2L, "2", timestamp("2022-02-02T10:10:10Z"), 2) :: Nil
     )
 
     assert(spark.table(s"$db.$tbl").filter($"id" > 1).count === 1)
diff --git a/spark-3.3/clickhouse-spark-it/src/test/scala/org/apache/spark/sql/clickhouse/single/SparkClickHouseSingleTest.scala b/spark-3.3/clickhouse-spark-it/src/test/scala/org/apache/spark/sql/clickhouse/single/SparkClickHouseSingleTest.scala
index 8822917b..6264c21b 100644
--- a/spark-3.3/clickhouse-spark-it/src/test/scala/org/apache/spark/sql/clickhouse/single/SparkClickHouseSingleTest.scala
+++ b/spark-3.3/clickhouse-spark-it/src/test/scala/org/apache/spark/sql/clickhouse/single/SparkClickHouseSingleTest.scala
@@ -16,7 +16,7 @@ package org.apache.spark.sql.clickhouse.single
 
 import org.apache.spark.SparkConf
 import org.apache.spark.sql.clickhouse.SparkTest
-import org.apache.spark.sql.functions.{month, to_timestamp}
+import org.apache.spark.sql.functions.month
 import org.apache.spark.sql.types.StructType
 import xenon.clickhouse.base.ClickHouseSingleMixIn
 
@@ -94,6 +94,28 @@ trait SparkClickHouseSingleTest extends SparkTest with ClickHouseSingleMixIn {
       runClickHouseSQL(s"DROP DATABASE IF EXISTS $db")
     }
 
+  def withKVTable(
+    db: String,
+    tbl: String,
+    keyColDef: String = "Int32",
+    valueColDef: String
+  )(f: => Unit): Unit =
+    try {
+      runClickHouseSQL(s"CREATE DATABASE IF NOT EXISTS $db")
+      runClickHouseSQL(
+        s"""CREATE TABLE $db.$tbl (
+           |  key $keyColDef,
+           |  value $valueColDef
+           |) ENGINE = MergeTree()
+           |ORDER BY key
+           |""".stripMargin
+      )
+      f
+    } finally {
+      runClickHouseSQL(s"DROP TABLE IF EXISTS $db.$tbl")
+      runClickHouseSQL(s"DROP DATABASE IF EXISTS $db")
+    }
+
   def withSimpleTable(
     db: String,
     tbl: String,
@@ -121,10 +143,9 @@ trait SparkClickHouseSingleTest extends SparkTest with ClickHouseSingleMixIn {
     if (writeData) {
       val tblSchema = spark.table(s"$db.$tbl").schema
       val dataDF = spark.createDataFrame(Seq(
-        (1L, "1", "2021-01-01 10:10:10"),
-        (2L, "2", "2022-02-02 10:10:10")
+        (1L, "1", timestamp("2021-01-01T10:10:10Z")),
+        (2L, "2", timestamp("2022-02-02T10:10:10Z"))
       )).toDF("id", "value", "create_time")
-        .withColumn("create_time", to_timestamp($"create_time"))
         .withColumn("m", month($"create_time"))
         .select($"id", $"value", $"create_time", $"m")
diff --git a/spark-3.3/clickhouse-spark-it/src/test/scala/org/apache/spark/sql/clickhouse/single/WriteDistributionAndOrderingSuite.scala b/spark-3.3/clickhouse-spark-it/src/test/scala/org/apache/spark/sql/clickhouse/single/WriteDistributionAndOrderingSuite.scala
index dbbedb07..fe9ba535 100644
--- a/spark-3.3/clickhouse-spark-it/src/test/scala/org/apache/spark/sql/clickhouse/single/WriteDistributionAndOrderingSuite.scala
+++ b/spark-3.3/clickhouse-spark-it/src/test/scala/org/apache/spark/sql/clickhouse/single/WriteDistributionAndOrderingSuite.scala
@@ -19,7 +19,6 @@ import org.apache.spark.sql.functions._
 import org.apache.spark.sql.types.StringType
 import org.apache.spark.sql.{AnalysisException, Row}
 
-import java.sql.Date
 import java.time.LocalDate
 
 class WriteDistributionAndOrderingSuite extends SparkClickHouseSingleTest {
@@ -32,16 +31,16 @@ class WriteDistributionAndOrderingSuite extends SparkClickHouseSingleTest {
   private def write(): Unit = spark.range(3)
     .toDF("id")
     .withColumn("id", $"id".cast(StringType))
-    .withColumn("load_date", lit(LocalDate.of(2022, 5, 27)))
+    .withColumn("load_date", lit(date("2022-05-27")))
     .writeTo(s"$db.$tbl")
     .append
 
   private def check(): Unit = checkAnswer(
     spark.sql(s"SELECT id, load_date FROM $db.$tbl"),
     Seq(
-      Row("0", Date.valueOf("2022-05-27")),
-      Row("1", Date.valueOf("2022-05-27")),
-      Row("2", Date.valueOf("2022-05-27"))
+      Row("0", date("2022-05-27")),
+      Row("1", date("2022-05-27")),
+      Row("2", date("2022-05-27"))
     )
   )
diff --git a/spark-3.3/clickhouse-spark/src/main/scala/xenon/clickhouse/SQLHelper.scala b/spark-3.3/clickhouse-spark/src/main/scala/xenon/clickhouse/SQLHelper.scala
index 4a00f8fa..cd84abfd 100644
--- a/spark-3.3/clickhouse-spark/src/main/scala/xenon/clickhouse/SQLHelper.scala
+++ b/spark-3.3/clickhouse-spark/src/main/scala/xenon/clickhouse/SQLHelper.scala
@@ -15,8 +15,7 @@
 package xenon.clickhouse
 
 import java.sql.{Date, Timestamp}
-import java.time.{Instant, LocalDate, ZoneId}
-
+import java.time.{Instant, LocalDate, LocalDateTime, ZoneId}
 import org.apache.commons.lang3.StringUtils
 import org.apache.spark.sql.connector.expressions.aggregate._
 import org.apache.spark.sql.connector.expressions.NamedReference
@@ -32,13 +31,14 @@ trait SQLHelper {
 
   def escapeSql(value: String): String = StringUtils.replace(value, "'", "''")
 
   def compileValue(value: Any)(implicit tz: ZoneId): Any = value match {
-    case stringValue: String => s"'${escapeSql(stringValue)}'"
+    case string: String => s"'${escapeSql(string)}'"
     case utf8: UTF8String => s"'${escapeSql(utf8.toString)}'"
-    case timestampValue: Timestamp => "'" + timestampValue + "'"
-    case timestampValue: Instant => s"'${dateTimeFmt.withZone(tz).format(timestampValue)}'"
-    case dateValue: Date => "'" + dateValue + "'"
-    case dateValue: LocalDate => s"'${dateFmt.format(dateValue)}'"
-    case arrayValue: Array[Any] => arrayValue.map(compileValue).mkString(", ")
+    case instant: Instant => s"'${dateTimeFmt.withZone(tz).format(instant)}'"
+    case timestamp: Timestamp => s"'${legacyDateTimeFmt.format(timestamp)}'"
+    case localDateTime: LocalDateTime => s"'${dateTimeFmt.format(localDateTime)}'"
+    case legacyDate: Date => s"'${legacyDateFmt.format(legacyDate)}'"
+    case localDate: LocalDate => s"'${dateFmt.format(localDate)}'"
+    case array: Array[Any] => array.map(compileValue).mkString(",")
     case _ => value
   }
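
Annotation (not part of the patch): the functional change is in SQLHelper.compileValue,
which renders pushed-down filter values as ClickHouse SQL literals. Previously a
java.sql.Timestamp was rendered via Timestamp.toString, which appends fractional
seconds (e.g. '2022-02-02 02:02:02.0') that a ClickHouse DateTime comparison may fail
to parse, and a LocalDateTime fell through to the default case and was emitted
unquoted; the date()/timestamp() helpers added to SparkTest build test values from
unambiguous UTC instants so the suites exercise these paths. Below is a minimal sketch
of the before/after Timestamp rendering. It assumes legacyDateTimeFmt behaves like a
'yyyy-MM-dd HH:mm:ss' formatter; its real definition is outside this diff, and the
SimpleDateFormat here is only a hypothetical stand-in:

    import java.sql.Timestamp
    import java.text.SimpleDateFormat
    import java.time.Instant

    object CompileValueSketch extends App {
      // Hypothetical stand-in for the connector's legacyDateTimeFmt (defined outside this diff).
      val legacyDateTimeFmt = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")

      val ts = Timestamp.from(Instant.parse("2022-02-02T02:02:02Z"))

      // Old rendering: Timestamp.toString keeps the fractional-second suffix,
      // e.g. '2022-02-02 02:02:02.0' (in the JVM default time zone).
      println("'" + ts + "'")

      // New rendering: a plain literal ClickHouse can parse for a DateTime column.
      println("'" + legacyDateTimeFmt.format(ts) + "'")
    }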