Spark: Fix timestamp value transformation (#216)
* Spark: Fix timestamp value transformation

* nit
pan3793 authored Feb 3, 2023
1 parent 0feb931 commit 8736a97
Showing 8 changed files with 106 additions and 64 deletions.
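The gist of the change, as read from the diff below: the test suites previously built timestamps from wall-clock strings (Timestamp.valueOf, to_timestamp), which are interpreted in the JVM's default time zone, and SQLHelper rendered pushed-down temporal literals via toString. The tests now pin values to explicit UTC instants through new date/timestamp helpers, and compileValue formats each temporal type with an explicit formatter. A minimal sketch of the ambiguity being removed (values are illustrative):

import java.sql.Timestamp
import java.time.Instant

// Timestamp.valueOf parses a LOCAL date-time: the resulting instant
// depends on the time zone of the JVM running the test.
val zoneDependent: Timestamp = Timestamp.valueOf("2021-01-01 10:10:10")

// Timestamp.from(Instant.parse(...)) pins the value to an explicit
// UTC instant, independent of where the test runs.
val pinned: Timestamp = Timestamp.from(Instant.parse("2021-01-01T10:10:10Z"))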
SparkTest.scala
@@ -19,10 +19,25 @@ import org.apache.spark.sql.test.SharedSparkSession
 import org.apache.spark.sql.{DataFrame, QueryTest}
 import xenon.clickhouse.ClickHouseCommandRunner
 
+import java.sql.{Date, Timestamp}
+import java.time.Instant
+
 trait SparkTest extends QueryTest with SharedSparkSession {
 
   def cmdRunnerOptions: Map[String, String]
 
+  /**
+   * @param text format yyyy-[m]m-[d]d
+   * @return A SQL Date
+   */
+  def date(text: String): Date = Date.valueOf(text)
+
+  /**
+   * @param text format 2007-12-03T10:15:30.00Z
+   * @return A SQL Timestamp
+   */
+  def timestamp(text: String): Timestamp = Timestamp.from(Instant.parse(text))
+
   override protected def sparkConf: SparkConf = super.sparkConf
     .setMaster("local[2]")
     .setAppName("spark-ut")
BaseClusterWriteSuite.scala
@@ -18,8 +18,6 @@ import org.apache.spark.SparkConf
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.types._
 
-import java.sql.Timestamp
-
 abstract class BaseClusterWriteSuite extends SparkClickHouseClusterTest {
 
   test("clickhouse write cluster") {
@@ -38,28 +36,28 @@ abstract class BaseClusterWriteSuite extends SparkClickHouseClusterTest {
.table(s"$db.$tbl_dist")
.select("create_time", "y", "m", "id", "value"),
Seq(
Row(Timestamp.valueOf("2021-01-01 10:10:10"), 2021, 1, 1L, "1"),
Row(Timestamp.valueOf("2022-02-02 10:10:10"), 2022, 2, 2L, "2"),
Row(Timestamp.valueOf("2023-03-03 10:10:10"), 2023, 3, 3L, "3"),
Row(Timestamp.valueOf("2024-04-04 10:10:10"), 2024, 4, 4L, "4")
Row(timestamp("2021-01-01T10:10:10Z"), 2021, 1, 1L, "1"),
Row(timestamp("2022-02-02T10:10:10Z"), 2022, 2, 2L, "2"),
Row(timestamp("2023-03-03T10:10:10Z"), 2023, 3, 3L, "3"),
Row(timestamp("2024-04-04T10:10:10Z"), 2024, 4, 4L, "4")
)
)

checkAnswer(
spark.table(s"clickhouse_s1r1.$db.$tbl_local"),
Row(Timestamp.valueOf("2024-04-04 10:10:10"), 2024, 4, 4L, "4") :: Nil
Row(timestamp("2024-04-04T10:10:10Z"), 2024, 4, 4L, "4") :: Nil
)
checkAnswer(
spark.table(s"clickhouse_s1r2.$db.$tbl_local"),
Row(Timestamp.valueOf("2021-01-01 10:10:10"), 2021, 1, 1L, "1") :: Nil
Row(timestamp("2021-01-01T10:10:10Z"), 2021, 1, 1L, "1") :: Nil
)
checkAnswer(
spark.table(s"clickhouse_s2r1.$db.$tbl_local"),
Row(Timestamp.valueOf("2022-02-02 10:10:10"), 2022, 2, 2L, "2") :: Nil
Row(timestamp("2022-02-02T10:10:10Z"), 2022, 2, 2L, "2") :: Nil
)
checkAnswer(
spark.table(s"clickhouse_s2r2.$db.$tbl_local"),
Row(Timestamp.valueOf("2023-03-03 10:10:10"), 2023, 3, 3L, "3") :: Nil
Row(timestamp("2023-03-03T10:10:10Z"), 2023, 3, 3L, "3") :: Nil
)
}
}
SparkClickHouseClusterTest.scala
@@ -16,7 +16,7 @@ package org.apache.spark.sql.clickhouse.cluster
 
 import org.apache.spark.SparkConf
 import org.apache.spark.sql.clickhouse.SparkTest
-import org.apache.spark.sql.functions.{month, to_timestamp, year}
+import org.apache.spark.sql.functions.{month, year}
 import xenon.clickhouse.base.ClickHouseClusterMixIn
 
 trait SparkClickHouseClusterTest extends SparkTest with ClickHouseClusterMixIn {
@@ -143,12 +143,11 @@ trait SparkClickHouseClusterTest extends SparkTest with ClickHouseClusterMixIn {
     if (writeData) {
       val tblSchema = spark.table(s"$db.$tbl_dist").schema
       val dataDF = spark.createDataFrame(Seq(
-        ("2021-01-01 10:10:10", 1L, "1"),
-        ("2022-02-02 10:10:10", 2L, "2"),
-        ("2023-03-03 10:10:10", 3L, "3"),
-        ("2024-04-04 10:10:10", 4L, "4")
+        (timestamp("2021-01-01T10:10:10Z"), 1L, "1"),
+        (timestamp("2022-02-02T10:10:10Z"), 2L, "2"),
+        (timestamp("2023-03-03T10:10:10Z"), 3L, "3"),
+        (timestamp("2024-04-04T10:10:10Z"), 4L, "4")
       )).toDF("create_time", "id", "value")
-        .withColumn("create_time", to_timestamp($"create_time"))
         .withColumn("y", year($"create_time"))
         .withColumn("m", month($"create_time"))
         .select($"create_time", $"y", $"m", $"id", $"value")
ClickHouseDataTypeSuite.scala
@@ -18,8 +18,6 @@ import org.apache.spark.sql.Row
 import org.apache.spark.sql.types.DataTypes.{createArrayType, createMapType}
 import org.apache.spark.sql.types._
 
-import java.sql.Date
-
 class ClickHouseDataTypeSuite extends SparkClickHouseSingleTest {
 
   test("write supported data types") {
@@ -28,7 +26,7 @@ class ClickHouseDataTypeSuite extends SparkClickHouseSingleTest {
StructField("col_string", StringType, false) ::
StructField("col_date", DateType, false) ::
StructField("col_array_string", createArrayType(StringType, false), false) ::
StructField("col_map_string", createMapType(StringType, StringType, false), false) ::
StructField("col_map_string_string", createMapType(StringType, StringType, false), false) ::
Nil
)
val db = "t_w_s_db"
@@ -39,29 +37,43 @@ class ClickHouseDataTypeSuite extends SparkClickHouseSingleTest {
     // assert(StructType(structFields) === tblSchema)
 
     val dataDF = spark.createDataFrame(Seq(
-      (1L, "a", Date.valueOf("1996-06-06"), Seq("a", "b", "c"), Map("a" -> "x")),
-      (2L, "A", Date.valueOf("2022-04-12"), Seq("A", "B", "C"), Map("A" -> "X"))
-    )).toDF("id", "col_string", "col_date", "col_array_string", "col_map_string")
+      (1L, "a", date("1996-06-06"), Seq("a", "b", "c"), Map("a" -> "x")),
+      (2L, "A", date("2022-04-12"), Seq("A", "B", "C"), Map("A" -> "X"))
+    )).toDF("id", "col_string", "col_date", "col_array_string", "col_map_string_string")
 
     spark.createDataFrame(dataDF.rdd, tblSchema)
       .writeTo(s"$db.$tbl")
       .append
 
     checkAnswer(
       spark.table(s"$db.$tbl").sort("id"),
-      Row(1L, "a", Date.valueOf("1996-06-06"), Seq("a", "b", "c"), Map("a" -> "x")) ::
-        Row(2L, "A", Date.valueOf("2022-04-12"), Seq("A", "B", "C"), Map("A" -> "X")) :: Nil
+      Row(1L, "a", date("1996-06-06"), Seq("a", "b", "c"), Map("a" -> "x")) ::
+        Row(2L, "A", date("2022-04-12"), Seq("A", "B", "C"), Map("A" -> "X")) :: Nil
     )
   }
 }
 
   test("write unsupported data types") {}
 
-  test("read supported data types") {}
-
-  test("read unsupported data types") {}
-
-  test("spark to clickhouse data type mappings") {}
-
-  test("clickhouse to spark data type mappings") {}
+  test("DateType - DateTime") {
+    val db = "t_r_s_db"
+    val tbl = "t_r_s_tbl"
+    withKVTable(db, tbl, valueColDef = "DateTime") {
+      runClickHouseSQL(
+        s"""INSERT INTO $db.$tbl VALUES
+           |(1, '2021-01-01 01:01:01'),
+           |(2, '2022-02-02 02:02:02')
+           |""".stripMargin
+      )
+
+      val data = spark.sql(s"SELECT key, value FROM $db.$tbl ORDER BY key")
+      checkAnswer(
+        data,
+        Row(1, timestamp("2021-01-01T01:01:01Z")) ::
+          Row(2, timestamp("2022-02-02T02:02:02Z")) :: Nil
+      )
+      checkAnswer(
+        data.filter("value > '2022-01-01 01:01:01'"),
+        Row(2, timestamp("2022-02-02T02:02:02Z")) :: Nil
+      )
+    }
+  }
 }
ClickHouseSingleSuite.scala
@@ -17,8 +17,6 @@ package org.apache.spark.sql.clickhouse.single
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.types._
 
-import java.sql.{Date, Timestamp}
-
 class ClickHouseSingleSuite extends SparkClickHouseSingleTest {
 
   import testImplicits._
@@ -124,18 +122,18 @@ class ClickHouseSingleSuite extends SparkClickHouseSingleTest {
|""".stripMargin
)
spark.createDataFrame(Seq(
(21L, Date.valueOf("2022-04-21")),
(22L, Date.valueOf("2022-04-22"))
(21L, date("2022-04-21")),
(22L, date("2022-04-22"))
))
.toDF("id", "date")
.writeTo(s"$db.$tbl").append

checkAnswer(
spark.table(s"$db.$tbl").orderBy($"id"),
Row(11L, Date.valueOf("2022-04-11")) ::
Row(12L, Date.valueOf("2022-04-12")) ::
Row(21L, Date.valueOf("2022-04-21")) ::
Row(22L, Date.valueOf("2022-04-22")) :: Nil
Row(11L, date("2022-04-11")) ::
Row(12L, date("2022-04-12")) ::
Row(21L, date("2022-04-21")) ::
Row(22L, date("2022-04-22")) :: Nil
)

checkAnswer(
@@ -221,10 +219,10 @@ class ClickHouseSingleSuite extends SparkClickHouseSingleTest {
 
       checkAnswer(
         spark.table(s"$db.$tbl").orderBy($"id"),
-        Row(11L, Date.valueOf("2022-04-11"), 1) ::
-          Row(12L, Date.valueOf("2022-04-12"), 2) ::
-          Row(21L, Date.valueOf("2022-04-21"), 1) ::
-          Row(22L, Date.valueOf("2022-04-22"), 2) :: Nil
+        Row(11L, date("2022-04-11"), 1) ::
+          Row(12L, date("2022-04-12"), 2) ::
+          Row(21L, date("2022-04-21"), 1) ::
+          Row(22L, date("2022-04-22"), 2) :: Nil
       )
 
       checkAnswer(
@@ -345,14 +343,14 @@ class ClickHouseSingleSuite extends SparkClickHouseSingleTest {
     checkAnswer(
       spark.table(s"$db.$tbl").sort("m"),
       Seq(
-        Row(1L, "1", Timestamp.valueOf("2021-01-01 10:10:10"), 1),
-        Row(2L, "2", Timestamp.valueOf("2022-02-02 10:10:10"), 2)
+        Row(1L, "1", timestamp("2021-01-01T10:10:10Z"), 1),
+        Row(2L, "2", timestamp("2022-02-02T10:10:10Z"), 2)
       )
     )
 
     checkAnswer(
       spark.table(s"$db.$tbl").filter($"id" > 1),
-      Row(2L, "2", Timestamp.valueOf("2022-02-02 10:10:10"), 2) :: Nil
+      Row(2L, "2", timestamp("2022-02-02T10:10:10Z"), 2) :: Nil
     )
 
     assert(spark.table(s"$db.$tbl").filter($"id" > 1).count === 1)
SparkClickHouseSingleTest.scala
@@ -16,7 +16,7 @@ package org.apache.spark.sql.clickhouse.single
 
 import org.apache.spark.SparkConf
 import org.apache.spark.sql.clickhouse.SparkTest
-import org.apache.spark.sql.functions.{month, to_timestamp}
+import org.apache.spark.sql.functions.month
 import org.apache.spark.sql.types.StructType
 import xenon.clickhouse.base.ClickHouseSingleMixIn
 
@@ -94,6 +94,28 @@ trait SparkClickHouseSingleTest extends SparkTest with ClickHouseSingleMixIn {
runClickHouseSQL(s"DROP DATABASE IF EXISTS $db")
}

def withKVTable(
db: String,
tbl: String,
keyColDef: String = "Int32",
valueColDef: String
)(f: => Unit): Unit =
try {
runClickHouseSQL(s"CREATE DATABASE IF NOT EXISTS $db")
runClickHouseSQL(
s"""CREATE TABLE $db.$tbl (
| key $keyColDef,
| value $valueColDef
|) ENGINE = MergeTree()
|ORDER BY key
|""".stripMargin
)
f
} finally {
runClickHouseSQL(s"DROP TABLE IF EXISTS $db.$tbl")
runClickHouseSQL(s"DROP DATABASE IF EXISTS $db")
}

def withSimpleTable(
db: String,
tbl: String,
@@ -121,10 +143,9 @@ trait SparkClickHouseSingleTest extends SparkTest with ClickHouseSingleMixIn {
     if (writeData) {
       val tblSchema = spark.table(s"$db.$tbl").schema
       val dataDF = spark.createDataFrame(Seq(
-        (1L, "1", "2021-01-01 10:10:10"),
-        (2L, "2", "2022-02-02 10:10:10")
+        (1L, "1", timestamp("2021-01-01T10:10:10Z")),
+        (2L, "2", timestamp("2022-02-02T10:10:10Z"))
       )).toDF("id", "value", "create_time")
-        .withColumn("create_time", to_timestamp($"create_time"))
         .withColumn("m", month($"create_time"))
         .select($"id", $"value", $"create_time", $"m")
 
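A hypothetical usage of the new withKVTable fixture, mirroring the DateType - DateTime test above but with a Date value column; the database, table, and test names here are invented for illustration. The fixture creates the MergeTree table before the body runs and drops both table and database afterwards, so each test starts clean.

// Sketch inside a suite mixing in SparkClickHouseSingleTest; names are hypothetical.
test("DateType - Date (sketch)") {
  withKVTable("my_db", "my_tbl", valueColDef = "Date") {
    runClickHouseSQL("INSERT INTO my_db.my_tbl VALUES (1, '2021-01-01')")
    checkAnswer(
      spark.sql("SELECT key, value FROM my_db.my_tbl"),
      Row(1, date("2021-01-01")) :: Nil
    )
  }
}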
WriteDistributionAndOrderingSuite.scala
@@ -19,7 +19,6 @@ import org.apache.spark.sql.functions._
 import org.apache.spark.sql.types.StringType
 import org.apache.spark.sql.{AnalysisException, Row}
 
-import java.sql.Date
 import java.time.LocalDate
 
 class WriteDistributionAndOrderingSuite extends SparkClickHouseSingleTest {
@@ -32,16 +31,16 @@ class WriteDistributionAndOrderingSuite extends SparkClickHouseSingleTest {
   private def write(): Unit = spark.range(3)
     .toDF("id")
     .withColumn("id", $"id".cast(StringType))
-    .withColumn("load_date", lit(LocalDate.of(2022, 5, 27)))
+    .withColumn("load_date", lit(date("2022-05-27")))
     .writeTo(s"$db.$tbl")
     .append
 
   private def check(): Unit = checkAnswer(
     spark.sql(s"SELECT id, load_date FROM $db.$tbl"),
     Seq(
-      Row("0", Date.valueOf("2022-05-27")),
-      Row("1", Date.valueOf("2022-05-27")),
-      Row("2", Date.valueOf("2022-05-27"))
+      Row("0", date("2022-05-27")),
+      Row("1", date("2022-05-27")),
+      Row("2", date("2022-05-27"))
     )
   )
 
SQLHelper.scala
@@ -15,8 +15,7 @@
 package xenon.clickhouse
 
 import java.sql.{Date, Timestamp}
-import java.time.{Instant, LocalDate, ZoneId}
-
+import java.time.{Instant, LocalDate, LocalDateTime, ZoneId}
 import org.apache.commons.lang3.StringUtils
 import org.apache.spark.sql.connector.expressions.aggregate._
 import org.apache.spark.sql.connector.expressions.NamedReference
@@ -32,13 +31,14 @@ trait SQLHelper {
   def escapeSql(value: String): String = StringUtils.replace(value, "'", "''")
 
   def compileValue(value: Any)(implicit tz: ZoneId): Any = value match {
-    case stringValue: String => s"'${escapeSql(stringValue)}'"
+    case string: String => s"'${escapeSql(string)}'"
     case utf8: UTF8String => s"'${escapeSql(utf8.toString)}'"
-    case timestampValue: Timestamp => "'" + timestampValue + "'"
-    case timestampValue: Instant => s"'${dateTimeFmt.withZone(tz).format(timestampValue)}'"
-    case dateValue: Date => "'" + dateValue + "'"
-    case dateValue: LocalDate => s"'${dateFmt.format(dateValue)}'"
-    case arrayValue: Array[Any] => arrayValue.map(compileValue).mkString(", ")
+    case instant: Instant => s"'${dateTimeFmt.withZone(tz).format(instant)}'"
+    case timestamp: Timestamp => s"'${legacyDateTimeFmt.format(timestamp)}'"
+    case localDateTime: LocalDateTime => s"'${dateTimeFmt.format(localDateTime)}'"
+    case legacyDate: Date => s"'${legacyDateFmt.format(legacyDate)}'"
+    case localDate: LocalDate => s"'${dateFmt.format(localDate)}'"
+    case array: Array[Any] => array.map(compileValue).mkString(",")
     case _ => value
   }
 
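To see why the compileValue change matters: a java.sql.Timestamp was previously rendered with toString, which appends fractional seconds (e.g. 2021-01-01 10:10:10.0) and reflects the JVM default zone, while an Instant is now formatted in the session time zone. A minimal sketch of the new rendering; the yyyy-MM-dd HH:mm:ss pattern is an assumption standing in for the connector's real dateTimeFmt definition:

import java.time.{Instant, ZoneId}
import java.time.format.DateTimeFormatter

// Assumed pattern; stands in for the connector's dateTimeFmt.
val dateTimeFmt = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")

val tz: ZoneId = ZoneId.of("UTC")

// A pushed-down Instant becomes a ClickHouse literal rendered in the
// session time zone, matching the DateType - DateTime filter test above.
val literal = s"'${dateTimeFmt.withZone(tz).format(Instant.parse("2022-01-01T01:01:01Z"))}'"
assert(literal == "'2022-01-01 01:01:01'")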
