From 0feb9315472a95d354ab7b92a777dd8a99878593 Mon Sep 17 00:00:00 2001 From: Cheng Pan Date: Fri, 3 Feb 2023 17:56:07 +0800 Subject: [PATCH] Spark: Use clickhouse java client to parse schema (#215) --- .../spark/sql/clickhouse/SchemaUtils.scala | 117 +++--- .../sql/clickhouse/SchemaUtilsSuite.scala | 336 ++++++++---------- 2 files changed, 203 insertions(+), 250 deletions(-) diff --git a/spark-3.3/clickhouse-spark/src/main/scala/org/apache/spark/sql/clickhouse/SchemaUtils.scala b/spark-3.3/clickhouse-spark/src/main/scala/org/apache/spark/sql/clickhouse/SchemaUtils.scala index 4d73dd2d..9cc34748 100644 --- a/spark-3.3/clickhouse-spark/src/main/scala/org/apache/spark/sql/clickhouse/SchemaUtils.scala +++ b/spark-3.3/clickhouse-spark/src/main/scala/org/apache/spark/sql/clickhouse/SchemaUtils.scala @@ -14,59 +14,59 @@ package org.apache.spark.sql.clickhouse -import scala.util.matching.Regex - +import com.clickhouse.client.ClickHouseDataType._ +import com.clickhouse.client.{ClickHouseColumn, ClickHouseDataType} import org.apache.spark.sql.types._ import xenon.clickhouse.exception.CHClientException object SchemaUtils { - // format: off - private[clickhouse] val arrayTypePattern: Regex = """^Array\((.+)\)$""".r - private[clickhouse] val mapTypePattern: Regex = """^Map\((\w+),\s*(.+)\)$""".r - private[clickhouse] val dateTypePattern: Regex = """^Date$""".r - private[clickhouse] val dateTimeTypePattern: Regex = """^DateTime(64)?(\((.*)\))?$""".r - private[clickhouse] val decimalTypePattern: Regex = """^Decimal\((\d+),\s*(\d+)\)$""".r - private[clickhouse] val decimalTypePattern2: Regex = """^Decimal(32|64|128|256)\((\d+)\)$""".r - private[clickhouse] val enumTypePattern: Regex = """^Enum(8|16)$""".r - private[clickhouse] val fixedStringTypePattern: Regex = """^FixedString\((\d+)\)$""".r - private[clickhouse] val nullableTypePattern: Regex = """^Nullable\((.*)\)""".r - private[clickhouse] val lowCardinalityTypePattern: Regex = """^LowCardinality\((.*)\)""".r - // format: on - - def fromClickHouseType(chType: String): (DataType, Boolean) = { - val (unwrappedChType, nullable) = unwrapNullable(unwrapLowCardinalityTypePattern(chType)) - val catalystType = unwrappedChType match { - case "String" | "UUID" | fixedStringTypePattern() | enumTypePattern(_) => StringType - case "Bool" => BooleanType - case "Int8" => ByteType - case "UInt8" | "Int16" => ShortType - case "UInt16" | "Int32" => IntegerType - case "UInt32" | "Int64" | "UInt64" | "IPv4" => LongType - case "Int128" | "Int256" | "UInt256" => - throw CHClientException(s"unsupported type: $chType") // not support - case "Float32" => FloatType - case "Float64" => DoubleType - case dateTypePattern() => DateType - case dateTimeTypePattern(_, _, _) => TimestampType - case decimalTypePattern(precision, scale) => DecimalType(precision.toInt, scale.toInt) - case decimalTypePattern2(w, scale) => w match { - case "32" => DecimalType(9, scale.toInt) - case "64" => DecimalType(18, scale.toInt) - case "128" => DecimalType(38, scale.toInt) - case "256" => DecimalType(76, scale.toInt) // throw exception, spark support precision up to 38 - } - case arrayTypePattern(nestedChType) => - val (_type, _nullable) = fromClickHouseType(nestedChType) - ArrayType(_type, _nullable) - case mapTypePattern(keyChType, valueChType) => - val (_keyType, _keyNullable) = fromClickHouseType(keyChType) - require(!_keyNullable, s"Illegal type: $keyChType, the key type of Map should not be nullable") - val (_valueType, _valueNullable) = fromClickHouseType(valueChType) - MapType(_keyType, _valueType, _valueNullable) - case _ => throw CHClientException(s"Unsupported type: $chType") + def fromClickHouseType(chColumn: ClickHouseColumn): (DataType, Boolean) = { + val catalystType = chColumn.getDataType match { + case Nothing => NullType + case Bool => BooleanType + case String | FixedString | JSON | UUID | Enum | Enum8 | Enum16 | IPv4 | IPv6 => StringType + case Int8 => ByteType + case UInt8 | Int16 => ShortType + case UInt16 | Int32 => IntegerType + case UInt32 | Int64 | UInt64 => LongType + case Int128 | UInt128 | Int256 | UInt256 => DecimalType(38, 0) + case Float32 => FloatType + case Float64 => DoubleType + case Date | Date32 => DateType + case DateTime | DateTime32 | DateTime64 => TimestampType + case ClickHouseDataType.Decimal if chColumn.getScale <= 38 => + DecimalType(chColumn.getPrecision, chColumn.getScale) + case Decimal32 => DecimalType(9, chColumn.getScale) + case Decimal64 => DecimalType(18, chColumn.getScale) + case Decimal128 => DecimalType(38, chColumn.getScale) + case IntervalYear => YearMonthIntervalType(YearMonthIntervalType.YEAR) + case IntervalMonth => YearMonthIntervalType(YearMonthIntervalType.MONTH) + case IntervalDay => DayTimeIntervalType(DayTimeIntervalType.DAY) + case IntervalHour => DayTimeIntervalType(DayTimeIntervalType.HOUR) + case IntervalMinute => DayTimeIntervalType(DayTimeIntervalType.MINUTE) + case IntervalSecond => DayTimeIntervalType(DayTimeIntervalType.SECOND) + case Array => + val elementChCols = chColumn.getNestedColumns + assert(elementChCols.size == 1) + val (elementType, elementNullable) = fromClickHouseType(elementChCols.get(0)) + ArrayType(elementType, elementNullable) + case Map => + val kvChCols = chColumn.getNestedColumns + assert(kvChCols.size == 2) + val (keyChType, valueChType) = (kvChCols.get(0), kvChCols.get(1)) + val (keyType, keyNullable) = fromClickHouseType(keyChType) + require( + !keyNullable, + s"Illegal type: ${keyChType.getOriginalTypeName}, the key type of Map should not be nullable" + ) + val (valueType, valueNullable) = fromClickHouseType(valueChType) + MapType(keyType, valueType, valueNullable) + case Object | Nested | Tuple | Point | Polygon | MultiPolygon | Ring | IntervalQuarter | IntervalWeek | + Decimal256 | AggregateFunction | SimpleAggregateFunction => + throw CHClientException(s"Unsupported type: ${chColumn.getOriginalTypeName}") } - (catalystType, nullable) + (catalystType, chColumn.isNullable) } def toClickHouseType(catalystType: DataType): String = @@ -92,11 +92,12 @@ object SchemaUtils { } def fromClickHouseSchema(chSchema: Seq[(String, String)]): StructType = { - val structFields = chSchema - .map { case (name, maybeNullableType) => - val (catalyst, nullable) = fromClickHouseType(maybeNullableType) - StructField(name, catalyst, nullable) - } + val structFields = chSchema.map { case (name, maybeNullableType) => + val chCols = ClickHouseColumn.parse(s"`$name` $maybeNullableType") + assert(chCols.size == 1) + val (sparkType, nullable) = fromClickHouseType(chCols.get(0)) + StructField(name, sparkType, nullable) + } StructType(structFields) } @@ -111,16 +112,4 @@ object SchemaUtils { if (nullable) wrapNullable(chType) else chType private[clickhouse] def wrapNullable(chType: String): String = s"Nullable($chType)" - - private[clickhouse] def unwrapNullable(maybeNullableType: String): (String, Boolean) = - maybeNullableType match { - case nullableTypePattern(typeName) => (typeName, true) - case _ => (maybeNullableType, false) - } - - private[clickhouse] def unwrapLowCardinalityTypePattern(maybeLowCardinalityType: String): String = - maybeLowCardinalityType match { - case lowCardinalityTypePattern(typeName) => typeName - case _ => maybeLowCardinalityType - } } diff --git a/spark-3.3/clickhouse-spark/src/test/scala/org/apache/spark/sql/clickhouse/SchemaUtilsSuite.scala b/spark-3.3/clickhouse-spark/src/test/scala/org/apache/spark/sql/clickhouse/SchemaUtilsSuite.scala index 754ff8b8..738ca71d 100644 --- a/spark-3.3/clickhouse-spark/src/test/scala/org/apache/spark/sql/clickhouse/SchemaUtilsSuite.scala +++ b/spark-3.3/clickhouse-spark/src/test/scala/org/apache/spark/sql/clickhouse/SchemaUtilsSuite.scala @@ -14,207 +14,171 @@ package org.apache.spark.sql.clickhouse +import com.clickhouse.client.ClickHouseColumn import org.apache.spark.sql.clickhouse.SchemaUtils._ -import org.apache.spark.sql.types.StructType +import org.apache.spark.sql.types._ import org.scalatest.funsuite.AnyFunSuite class SchemaUtilsSuite extends AnyFunSuite { - test("regex ArrayType") { - "Array(String)" match { - case arrayTypePattern(nestType) => assert("String" == nestType) - case _ => fail() - } - - "Array(Nullable(String))" match { - case arrayTypePattern(nestType) => assert("Nullable(String)" == nestType) - case _ => fail() - } - - "Array(Array(String))" match { - case arrayTypePattern(nestType) => assert("Array(String)" == nestType) - case _ => fail() - } - - "array(String)" match { - case arrayTypePattern(_) => fail() - case _ => - } - - "Array(String" match { - case arrayTypePattern(_) => fail() - case _ => - } - } - - test("regex MapType") { - "Map(String, String)" match { - case mapTypePattern(keyType, valueType) => - assert("String" == keyType) - assert("String" == valueType) - case _ => fail() - } - "Map(String,Int32)" match { - case mapTypePattern(keyType, valueType) => - assert("String" == keyType) - assert("Int32" == valueType) - case _ => fail() - } - "Map(String,Nullable(UInt32))" match { - case mapTypePattern(keyType, valueType) => - assert("String" == keyType) - assert("Nullable(UInt32)" == valueType) - case _ => fail() - } - "Map(String,)" match { - case mapTypePattern(_) => fail() - case _ => - } - } - - test("regex DateType") { - "Date" match { - case dateTypePattern() => - case _ => fail() - } - - "DT" match { - case dateTypePattern(_) => fail() - case _ => - } - } - test("regex DateTimeType") { - "DateTime" match { - case dateTimeTypePattern(_, _, _) => - case _ => fail() - } - - "DateTime(Asia/Shanghai)" match { - case dateTimeTypePattern(_, _, tz) => assert("Asia/Shanghai" == tz) - case _ => fail() - } + case class TestBean(chTypeStr: String, sparkType: DataType, nullable: Boolean) - "DateTime64" match { - case dateTimeTypePattern(_64, _, _) => assert("64" == _64) - case _ => fail() + private def assertPositive(positives: TestBean*): Unit = + positives.foreach { case TestBean(chTypeStr, expectedSparkType, expectedNullable) => + test(s"ch2spark - $chTypeStr") { + val chCols = ClickHouseColumn.parse(s"`col` $chTypeStr") + assert(chCols.size == 1) + val (actualSparkType, actualNullable) = fromClickHouseType(chCols.get(0)) + assert(actualSparkType === expectedSparkType) + assert(actualNullable === expectedNullable) + } } - "DateTime64(Europe/Moscow)" match { - case dateTimeTypePattern(_64, _, tz) => - assert("64" == _64) - assert("Europe/Moscow" == tz) - case _ => fail() - } - - "DT" match { - case dateTimeTypePattern(_) => fail() - case _ => + private def assertNegative(negatives: String*): Unit = negatives.foreach { chTypeStr => + test(s"ch2spark - $chTypeStr") { + intercept[Exception] { + ClickHouseColumn.parse(s"`col` $chTypeStr") + val chCols = ClickHouseColumn.parse(s"`col` $chTypeStr") + assert(chCols.size == 1) + fromClickHouseType(chCols.get(0)) + } } } - test("DecimalType") { - "Decimal(1,2)" match { - case decimalTypePattern(p, s) => - assert("1" == p) - assert("2" == s) - case _ => fail() - } - - "Decimal" match { - case decimalTypePattern(_, _) => fail() - case _ => - } - - "Decimal(String" match { - case decimalTypePattern(_, _) => fail() - case _ => - } - } - - test("regex DecimalType - 2") { - "Decimal32(5)" match { - case decimalTypePattern2(a, s) => assert(("32", "5") == (a, s)) - case _ => fail() - } - - "Decimal64(5)" match { - case decimalTypePattern2(a, s) => assert(("64", "5") == (a, s)) - case _ => fail() - } - - "Decimal128(5)" match { - case decimalTypePattern2(a, s) => assert(("128", "5") == (a, s)) - case _ => fail() - } - - "Decimal256(5)" match { - case decimalTypePattern2(a, s) => assert(("256", "5") == (a, s)) - case _ => fail() - } - - "Decimal32(5" match { - case decimalTypePattern2(a, s) => fail() - case _ => - } - } - - test("regex FixedStringType") { - "FixedString(5)" match { - case fixedStringTypePattern(l) => assert("5" == l) - case _ => fail() - } - - "fixedString(5)" match { - case fixedStringTypePattern(_) => fail() - case _ => - } + assertPositive( + TestBean( + "Array(String)", + ArrayType(StringType, containsNull = false), + nullable = false + ), + TestBean( + "Array(Nullable(String))", + ArrayType(StringType, containsNull = true), + nullable = false + ), + TestBean( + "Array(Array(String))", + ArrayType(ArrayType(StringType, containsNull = false), containsNull = false), + nullable = false + ) + ) + + assertNegative( + "array(String)", + "Array(String" + ) + + assertPositive( + TestBean( + "Map(String, String)", + MapType(StringType, StringType, valueContainsNull = false), + nullable = false + ), + TestBean( + "Map(String,Int32)", + MapType(StringType, IntegerType, valueContainsNull = false), + nullable = false + ), + TestBean( + "Map(String,Nullable(UInt32))", + MapType(StringType, LongType, valueContainsNull = true), + nullable = false + ) + ) + + assertNegative( + "Map(String,)" + ) + + assertPositive( + TestBean( + "Date", + DateType, + nullable = false + ), + TestBean( + "DateTime", + TimestampType, + nullable = false + ), + TestBean( + "DateTime(Asia/Shanghai)", + TimestampType, + nullable = false + ), + TestBean( + "DateTime64", + TimestampType, + nullable = false + ) + // TestBean( + // "DateTime64(Europe/Moscow)", + // TimestampType, + // nullable = false + // ), + ) + + assertNegative( + "DT" + ) + + assertPositive( + TestBean( + "Decimal(2,1)", + DecimalType(2, 1), + nullable = false + ), + TestBean( + "Decimal32(5)", + DecimalType(9, 5), + nullable = false + ), + TestBean( + "Decimal64(5)", + DecimalType(18, 5), + nullable = false + ), + TestBean( + "Decimal128(5)", + DecimalType(38, 5), + nullable = false + ) + ) + + assertNegative( + "Decimal", // overflow + "Decimal256(5)", // overflow + "Decimal(String" + // "Decimal32(5" + ) + + assertPositive( + TestBean( + "String", + StringType, + nullable = false + ), + TestBean( + "FixedString(5)", + StringType, + nullable = false + ) + ) - "String" match { - case decimalTypePattern2(a, s) => fail() - case _ => - } - } + assertNegative("fixedString(5)") - test("testNullableTypeRegex") { - assert(("String", true) == unwrapNullable("Nullable(String)")) - assert(("nullable(String)", false) == unwrapNullable("nullable(String)")) - assert(("String", false) == unwrapNullable("String")) - } - test("testToClickHouseSchema") { + test("spark2ch") { val catalystSchema = StructType.fromString( """{ - | "fields": [ - | { - | "name": "id", - | "type": "integer", - | "nullable": false, - | "metadata": {} - | }, - | { - | "name": "food", - | "type": "string", - | "nullable": false, - | "metadata": { - | "comment": "food" - | } - | }, - | { - | "name": "price", - | "type": "decimal(2,1)", - | "nullable": false, - | "metadata": { - | "comment": "price usd" - | } - | }, - | { - | "name": "remark", - | "type": "string", - | "nullable": true, - | "metadata": {} - | } - | ], - | "type": "struct" - |}""".stripMargin + | "type": "struct", + | "fields": [ + | {"name": "id", "type": "integer", "nullable": false, "metadata": {}}, + | {"name": "food", "type": "string", "nullable": false, "metadata": {"comment": "food"}}, + | {"name": "price", "type": "decimal(2,1)", "nullable": false, "metadata": {"comment": "price usd"}}, + | {"name": "remark", "type": "string", "nullable": true, "metadata": {}} + | ] + |} + |""".stripMargin ) assert(Seq( ("id", "Int32", ""),