Skip to content

Commit

Permalink
Spark: Use clickhouse java client to parse schema (#215)
Browse files Browse the repository at this point in the history
  • Loading branch information
pan3793 authored Feb 3, 2023
1 parent c314a5a commit 0feb931
Show file tree
Hide file tree
Showing 2 changed files with 203 additions and 250 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,59 +14,59 @@

package org.apache.spark.sql.clickhouse

import scala.util.matching.Regex

import com.clickhouse.client.ClickHouseDataType._
import com.clickhouse.client.{ClickHouseColumn, ClickHouseDataType}
import org.apache.spark.sql.types._
import xenon.clickhouse.exception.CHClientException

object SchemaUtils {

// format: off
// Regular expressions that recognise ClickHouse type names given as plain strings. Where a
// pattern declares capture groups, they expose the nested type name(s) or numeric arguments.

// Array(T): captures the element type T.
private[clickhouse] val arrayTypePattern: Regex = """^Array\((.+)\)$""".r
// Map(K, V): the key is limited to a single word (\w+); the value is the rest of the argument list.
private[clickhouse] val mapTypePattern: Regex = """^Map\((\w+),\s*(.+)\)$""".r
// Exactly "Date"; no capture groups.
private[clickhouse] val dateTypePattern: Regex = """^Date$""".r
// DateTime or DateTime64, optionally followed by parenthesised arguments.
// Three groups: the "64" suffix, the whole "(args)" part, and the bare args.
private[clickhouse] val dateTimeTypePattern: Regex = """^DateTime(64)?(\((.*)\))?$""".r
// Decimal(P, S): captures precision and scale.
private[clickhouse] val decimalTypePattern: Regex = """^Decimal\((\d+),\s*(\d+)\)$""".r
// Decimal32/64/128/256(S): captures the bit width and the scale.
private[clickhouse] val decimalTypePattern2: Regex = """^Decimal(32|64|128|256)\((\d+)\)$""".r
// Only the bare names Enum8 / Enum16 (no value list); captures the bit width.
private[clickhouse] val enumTypePattern: Regex = """^Enum(8|16)$""".r
// FixedString(N): captures the length N.
private[clickhouse] val fixedStringTypePattern: Regex = """^FixedString\((\d+)\)$""".r
// Nullable(T): captures the wrapped type T (no trailing `$` anchor in the expression).
private[clickhouse] val nullableTypePattern: Regex = """^Nullable\((.*)\)""".r
// LowCardinality(T): captures the wrapped type T (no trailing `$` anchor in the expression).
private[clickhouse] val lowCardinalityTypePattern: Regex = """^LowCardinality\((.*)\)""".r
// format: on

/**
 * String-based conversion from a ClickHouse type name to a Spark Catalyst type.
 *
 * The type name is first stripped of a `LowCardinality(...)` wrapper and then of a
 * `Nullable(...)` wrapper; the remaining name is matched against literal names and the
 * regular-expression patterns of this object.
 *
 * @param chType ClickHouse type name, e.g. `Nullable(Int32)` or `Array(String)`
 * @return the Catalyst type paired with a nullability flag
 * @throws CHClientException for Int128/Int256/UInt256 and for any unrecognised type name
 * @throws IllegalArgumentException (via `require`) when a Map key type is nullable
 */
def fromClickHouseType(chType: String): (DataType, Boolean) = {
// LowCardinality is removed before Nullable, so LowCardinality(Nullable(T)) yields (T, true).
val (unwrappedChType, nullable) = unwrapNullable(unwrapLowCardinalityTypePattern(chType))
val catalystType = unwrappedChType match {
// NOTE(review): `fixedStringTypePattern()` binds zero groups while the regex declares one;
// a Scala regex extractor needs the group count to line up, so this alternative presumably
// never matches a FixedString(N) name — confirm.
case "String" | "UUID" | fixedStringTypePattern() | enumTypePattern(_) => StringType
case "Bool" => BooleanType
case "Int8" => ByteType
// Unsigned types are mapped to the next wider signed Catalyst type.
case "UInt8" | "Int16" => ShortType
case "UInt16" | "Int32" => IntegerType
case "UInt32" | "Int64" | "UInt64" | "IPv4" => LongType
case "Int128" | "Int256" | "UInt256" =>
throw CHClientException(s"unsupported type: $chType") // not support
case "Float32" => FloatType
case "Float64" => DoubleType
case dateTypePattern() => DateType
// DateTime arguments (captured by the pattern) are ignored here.
case dateTimeTypePattern(_, _, _) => TimestampType
case decimalTypePattern(precision, scale) => DecimalType(precision.toInt, scale.toInt)
// Fixed-width decimals: the precision is implied by the bit width.
case decimalTypePattern2(w, scale) => w match {
case "32" => DecimalType(9, scale.toInt)
case "64" => DecimalType(18, scale.toInt)
case "128" => DecimalType(38, scale.toInt)
case "256" => DecimalType(76, scale.toInt) // throw exception, spark support precision up to 38
}
// Arrays recurse on the element type; the element's nullability becomes containsNull.
case arrayTypePattern(nestedChType) =>
val (_type, _nullable) = fromClickHouseType(nestedChType)
ArrayType(_type, _nullable)
// Maps recurse on key and value; a nullable key is rejected.
case mapTypePattern(keyChType, valueChType) =>
val (_keyType, _keyNullable) = fromClickHouseType(keyChType)
require(!_keyNullable, s"Illegal type: $keyChType, the key type of Map should not be nullable")
val (_valueType, _valueNullable) = fromClickHouseType(valueChType)
MapType(_keyType, _valueType, _valueNullable)
case _ => throw CHClientException(s"Unsupported type: $chType")
/**
 * Converts a parsed ClickHouse column into its Spark Catalyst type.
 *
 * Nested types (`Array`, `Map`) are converted recursively through the column's nested
 * columns. Nullability is taken from the column itself (`chColumn.isNullable`).
 *
 * @param chColumn column definition produced by the ClickHouse Java client parser
 * @return the Catalyst type paired with the column's nullability
 * @throws CHClientException when the ClickHouse type has no Catalyst counterpart, including
 *                           `Decimal(P, S)` whose precision exceeds what Spark supports
 * @throws IllegalArgumentException (via `require`) when a Map key type is nullable
 */
def fromClickHouseType(chColumn: ClickHouseColumn): (DataType, Boolean) = {
  val catalystType = chColumn.getDataType match {
    case Nothing => NullType
    case Bool => BooleanType
    case String | FixedString | JSON | UUID | Enum | Enum8 | Enum16 | IPv4 | IPv6 => StringType
    case Int8 => ByteType
    // Unsigned integers are widened to the next larger signed Catalyst type.
    case UInt8 | Int16 => ShortType
    case UInt16 | Int32 => IntegerType
    case UInt32 | Int64 | UInt64 => LongType
    case Int128 | UInt128 | Int256 | UInt256 => DecimalType(38, 0)
    case Float32 => FloatType
    case Float64 => DoubleType
    case Date | Date32 => DateType
    case DateTime | DateTime32 | DateTime64 => TimestampType
    // Spark caps decimals by PRECISION (DecimalType.MAX_PRECISION), not by scale, so the guard
    // must test the precision. A Decimal that fails the guard falls through to the
    // "unsupported" case below instead of raising a MatchError.
    // `Decimal` is qualified because the bare name clashes with Spark's own `Decimal` class.
    case ClickHouseDataType.Decimal if chColumn.getPrecision <= DecimalType.MAX_PRECISION =>
      DecimalType(chColumn.getPrecision, chColumn.getScale)
    // Fixed-width decimals: the precision is implied by the bit width.
    case Decimal32 => DecimalType(9, chColumn.getScale)
    case Decimal64 => DecimalType(18, chColumn.getScale)
    case Decimal128 => DecimalType(38, chColumn.getScale)
    case IntervalYear => YearMonthIntervalType(YearMonthIntervalType.YEAR)
    case IntervalMonth => YearMonthIntervalType(YearMonthIntervalType.MONTH)
    case IntervalDay => DayTimeIntervalType(DayTimeIntervalType.DAY)
    case IntervalHour => DayTimeIntervalType(DayTimeIntervalType.HOUR)
    case IntervalMinute => DayTimeIntervalType(DayTimeIntervalType.MINUTE)
    case IntervalSecond => DayTimeIntervalType(DayTimeIntervalType.SECOND)
    case Array =>
      val elementChCols = chColumn.getNestedColumns
      assert(elementChCols.size == 1)
      val (elementType, elementNullable) = fromClickHouseType(elementChCols.get(0))
      ArrayType(elementType, elementNullable)
    case Map =>
      val kvChCols = chColumn.getNestedColumns
      assert(kvChCols.size == 2)
      val (keyChType, valueChType) = (kvChCols.get(0), kvChCols.get(1))
      val (keyType, keyNullable) = fromClickHouseType(keyChType)
      require(
        !keyNullable,
        s"Illegal type: ${keyChType.getOriginalTypeName}, the key type of Map should not be nullable"
      )
      val (valueType, valueNullable) = fromClickHouseType(valueChType)
      MapType(keyType, valueType, valueNullable)
    case Object | Nested | Tuple | Point | Polygon | MultiPolygon | Ring | IntervalQuarter | IntervalWeek |
        ClickHouseDataType.Decimal | Decimal256 | AggregateFunction | SimpleAggregateFunction =>
      throw CHClientException(s"Unsupported type: ${chColumn.getOriginalTypeName}")
  }
  (catalystType, chColumn.isNullable)
}

def toClickHouseType(catalystType: DataType): String =
Expand All @@ -92,11 +92,12 @@ object SchemaUtils {
}

/**
 * Builds a Spark schema from ClickHouse `(column name, type name)` pairs.
 *
 * Each pair is rendered as a single column definition (back-quoted name followed by the type)
 * and handed to the ClickHouse Java client parser; the parsed column is then converted with
 * `fromClickHouseType`.
 *
 * @param chSchema column names paired with their ClickHouse type names, in column order
 * @return a `StructType` with one field per input pair, in the same order
 */
def fromClickHouseSchema(chSchema: Seq[(String, String)]): StructType = {
  val structFields = chSchema.map { case (name, maybeNullableType) =>
    // NOTE(review): the name is interpolated between back-quotes without escaping; presumably
    // column names never contain a back-quote — confirm.
    val chCols = ClickHouseColumn.parse(s"`$name` $maybeNullableType")
    // Exactly one column definition was rendered, so exactly one parsed column is expected.
    assert(chCols.size == 1)
    val (sparkType, nullable) = fromClickHouseType(chCols.get(0))
    StructField(name, sparkType, nullable)
  }
  StructType(structFields)
}

Expand All @@ -111,16 +112,4 @@ object SchemaUtils {
if (nullable) wrapNullable(chType) else chType

/** Wraps the given ClickHouse type name in `Nullable(...)`. */
private[clickhouse] def wrapNullable(chType: String): String = {
  val wrapped = s"Nullable(${chType})"
  wrapped
}

/**
 * Strips a `Nullable(...)` wrapper from a ClickHouse type name.
 *
 * @param maybeNullableType type name that may or may not be wrapped in `Nullable(...)`
 * @return the inner type name with `true` when a wrapper was present, otherwise the
 *         input unchanged with `false`
 */
private[clickhouse] def unwrapNullable(maybeNullableType: String): (String, Boolean) =
  nullableTypePattern.unapplySeq(maybeNullableType) match {
    // Exactly one captured group: the wrapped type name.
    case Some(inner :: Nil) => (inner, true)
    case _ => (maybeNullableType, false)
  }

/**
 * Strips a `LowCardinality(...)` wrapper from a ClickHouse type name.
 *
 * @param maybeLowCardinalityType type name that may or may not be wrapped in `LowCardinality(...)`
 * @return the inner type name when a wrapper was present, otherwise the input unchanged
 */
private[clickhouse] def unwrapLowCardinalityTypePattern(maybeLowCardinalityType: String): String =
  lowCardinalityTypePattern
    .unapplySeq(maybeLowCardinalityType)
    .collect { case inner :: Nil => inner } // exactly one captured group: the wrapped type
    .getOrElse(maybeLowCardinalityType)
}
Loading

0 comments on commit 0feb931

Please sign in to comment.