Spark 3.4: Adapt to hash function under clickhouse-core
Yxang committed Jul 25, 2023
1 parent 85a025f commit 4e201d6
Showing 16 changed files with 60 additions and 493 deletions.
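
In short: the connector's hash UDF base class MultiArgsHash, which required each function object to implement a per-argument invokeBase and a pairwise combineHashes, is replaced by MultiStringArgsHash, which hands the whole argument list to a single applyHash backed by the shared implementations under xenon.clickhouse.hash in clickhouse-core. A condensed before/after sketch of the contract, assembled from the hunks below (other members such as bind and description elided):

// Before this commit: each object supplied the hashing primitives itself (Spark module).
abstract class MultiArgsHash extends UnboundFunction with ClickhouseEquivFunction {
  protected def funcName: String
  override val ckFuncNames: Array[String]
  def invokeBase(value: UTF8String): Long      // hash a single string argument
  def combineHashes(v1: Long, v2: Long): Long  // fold the per-argument hashes together
}

// After this commit: objects only forward to the shared algorithms in clickhouse-core.
abstract class MultiStringArgsHash extends UnboundFunction with ClickhouseEquivFunction {
  protected def funcName: String
  override val ckFuncNames: Array[String]
  def applyHash(input: Array[Any]): Long       // one Array[Byte] per string argument
}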
@@ -15,8 +15,12 @@
package org.apache.spark.sql.clickhouse.cluster

import org.apache.spark.sql.clickhouse.TestUtils.om
import xenon.clickhouse.func.{CompositeFunctionRegistry, DynamicFunctionRegistry, StaticFunctionRegistry}
import xenon.clickhouse.func.clickhouse.ClickHouseXxHash64Shard
import xenon.clickhouse.func.{
ClickHouseXxHash64Shard,
CompositeFunctionRegistry,
DynamicFunctionRegistry,
StaticFunctionRegistry
}

import java.lang.{Long => JLong}

@@ -30,15 +34,6 @@ class ClickHouseClusterHashUDFSuite extends SparkClickHouseClusterTest {
new CompositeFunctionRegistry(Array(StaticFunctionRegistry, dynamicFunctionRegistry))
}

def product[A](xs: Seq[Seq[A]]): Seq[Seq[A]] =
xs.toList match {
case Nil => Seq(Seq())
case head :: tail => for {
h <- head
t <- product(tail)
} yield h +: t
}

def runTest(funcSparkName: String, funcCkName: String, stringVal: String): Unit = {
val sparkResult = spark.sql(
s"""SELECT
@@ -26,8 +26,7 @@ import xenon.clickhouse.Constants._
import xenon.clickhouse.client.NodeClient
import xenon.clickhouse.exception.CHClientException
import xenon.clickhouse.exception.ClickHouseErrCode._
import xenon.clickhouse.func.clickhouse.ClickHouseXxHash64Shard
import xenon.clickhouse.func.{FunctionRegistry, _}
import xenon.clickhouse.func.{ClickHouseXxHash64Shard, FunctionRegistry, _}
import xenon.clickhouse.spec._

import java.time.ZoneId
@@ -12,29 +12,15 @@
* limitations under the License.
*/

package xenon.clickhouse.func.clickhouse
package xenon.clickhouse.func

import io.netty.buffer.{ByteBuf, Unpooled}
import org.apache.spark.unsafe.types.UTF8String
import xenon.clickhouse.func.MultiArgsHash
import xenon.clickhouse.func.clickhouse.cityhash.{CityHash_v1_0_2, UInt128}
import xenon.clickhouse.hash

object CityHash64 extends MultiArgsHash {
object CityHash64 extends MultiStringArgsHash {
// https://github.com/ClickHouse/ClickHouse/blob/v23.5.3.24-stable/src/Functions/FunctionsHashing.h#L694

override protected def funcName: String = "clickhouse_cityHash64"
override val ckFuncNames: Array[String] = Array("cityHash64")

def convertToByteBuf(array: Array[Byte]): ByteBuf = {
val byteBuf = Unpooled.buffer(array.length).writeBytes(array)
byteBuf
}

override def invokeBase(value: UTF8String): Long = {
// ignore UInt64 vs Int64
val data = value.getBytes
CityHash_v1_0_2.CityHash64(convertToByteBuf(data), 0, data.length)
}

override def combineHashes(v1: Long, v2: Long): Long = CityHash_v1_0_2.Hash128to64(new UInt128(v1, v2))
override def applyHash(input: Array[Any]): Long = hash.CityHash64(input)
}
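
The test suite at the top of this diff registers these objects in a DynamicFunctionRegistry and compares Spark results against ClickHouse. A minimal usage sketch, assuming the function is exposed to Spark SQL under its funcName (the registration wiring is outside this excerpt):

// Hedged example: expected to equal ClickHouse's own SELECT cityHash64('spark', 'clickhouse')
val sparkHash: Long = spark.sql("SELECT clickhouse_cityHash64('spark', 'clickhouse')").head.getLong(0)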
@@ -12,11 +12,10 @@
* limitations under the License.
*/

package xenon.clickhouse.func.clickhouse
package xenon.clickhouse.func

import org.apache.spark.sql.connector.catalog.functions.{BoundFunction, ScalarFunction, UnboundFunction}
import org.apache.spark.sql.types._
import xenon.clickhouse.func.ClickhouseEquivFunction

import java.time.LocalDate
import java.time.format.DateTimeFormatter
@@ -15,7 +15,6 @@
package xenon.clickhouse.func

import org.apache.spark.sql.connector.catalog.functions.UnboundFunction
import xenon.clickhouse.func.clickhouse._

import scala.collection.mutable

@@ -12,13 +12,12 @@
* limitations under the License.
*/

package xenon.clickhouse.func.clickhouse
package xenon.clickhouse.func

import org.apache.spark.sql.connector.catalog.functions.{BoundFunction, ScalarFunction, UnboundFunction}
import org.apache.spark.sql.types._
import xenon.clickhouse.func.ClickhouseEquivFunction

import java.sql.{Date, Timestamp}
import java.sql.Timestamp
import java.text.SimpleDateFormat

object Hours extends UnboundFunction with ScalarFunction[Int] with ClickhouseEquivFunction {
@@ -12,11 +12,10 @@
* limitations under the License.
*/

package xenon.clickhouse.func.clickhouse
package xenon.clickhouse.func

import org.apache.spark.sql.connector.catalog.functions.{BoundFunction, ScalarFunction, UnboundFunction}
import org.apache.spark.sql.types._
import xenon.clickhouse.func.ClickhouseEquivFunction

object Mod extends UnboundFunction with ScalarFunction[Long] with ClickhouseEquivFunction {

@@ -12,11 +12,10 @@
* limitations under the License.
*/

package xenon.clickhouse.func.clickhouse
package xenon.clickhouse.func

import org.apache.spark.sql.connector.catalog.functions.{BoundFunction, ScalarFunction, UnboundFunction}
import org.apache.spark.sql.types._
import xenon.clickhouse.func.ClickhouseEquivFunction

import java.time.LocalDate
import java.time.format.DateTimeFormatter
@@ -19,32 +19,41 @@ import org.apache.spark.sql.connector.catalog.functions.{BoundFunction, ScalarFu
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String

abstract class MultiArgsHash extends UnboundFunction with ClickhouseEquivFunction {
abstract class MultiStringArgsHash extends UnboundFunction with ClickhouseEquivFunction {

def applyHash(input: Array[Any]): Long

protected def funcName: String

override val ckFuncNames: Array[String]

override def description: String = s"$name: (value: string, ...) => hash_value: long"

private def isExceptedType(dt: DataType): Boolean =
dt.isInstanceOf[StringType]

final override def name: String = funcName

final override def bind(inputType: StructType): BoundFunction = {
val inputDataTypes = inputType.fields.map(_.dataType)
if (inputDataTypes.forall(isExceptedType)) new ScalarFunction[Long] {
override def inputTypes(): Array[DataType] = inputDataTypes
override def name: String = funcName
override def canonicalName: String = s"clickhouse.$name"
override def resultType: DataType = LongType
override def toString: String = name
override def produceResult(input: InternalRow): Long = {
val inputStrings: Seq[UTF8String] =
input.toSeq(Seq.fill(input.numFields)(StringType)).asInstanceOf[Seq[UTF8String]]
inputStrings.map(invokeBase).reduce(combineHashes)
if (inputDataTypes.forall(isExceptedType)) {
// need to new a ScalarFunction instance for each bind,
// because we do not know the number of arguments in advance
new ScalarFunction[Long] {
override def inputTypes(): Array[DataType] = inputDataTypes
override def name: String = funcName
override def canonicalName: String = s"clickhouse.$name"
override def resultType: DataType = LongType
override def toString: String = name
override def produceResult(input: InternalRow): Long = {
val inputStrings: Array[Any] =
input.toSeq(Seq.fill(input.numFields)(StringType)).asInstanceOf[Seq[UTF8String]].toArray
.map(_.getBytes)
applyHash(inputStrings)
}
}
}
else throw new UnsupportedOperationException(s"Expect multiple STRING argument. $description")
} else throw new UnsupportedOperationException(s"Expect multiple STRING argument. $description")

}

protected def funcName: String
override val ckFuncNames: Array[String]
override def description: String = s"$name: (value: string, ...) => hash_value: long"
def invokeBase(value: UTF8String): Long
def combineHashes(v1: Long, v2: Long): Long
}
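
With this contract, adding another ClickHouse-compatible string hash reduces to delegating to a clickhouse-core helper. A hypothetical sketch (MySipHash64 and hash.SipHash64 are illustrative names, not part of this commit):

object MySipHash64 extends MultiStringArgsHash {
  override protected def funcName: String = "clickhouse_sipHash64"
  override val ckFuncNames: Array[String] = Array("sipHash64")
  // input carries one Array[Byte] per string argument, as produced in produceResult above
  override def applyHash(input: Array[Any]): Long = hash.SipHash64(input) // hypothetical helper
}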
@@ -12,40 +12,25 @@
* limitations under the License.
*/

package xenon.clickhouse.func.clickhouse
package xenon.clickhouse.func

import org.apache.commons.codec.digest.{MurmurHash2, MurmurHash3}
import org.apache.spark.sql.connector.catalog.functions.{BoundFunction, ScalarFunction, UnboundFunction}
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String
import xenon.clickhouse.func.{ClickhouseEquivFunction, MultiArgsHash, Util}
import xenon.clickhouse.hash
import xenon.clickhouse.hash.HashUtils

object MurmurHash2_64 extends MultiArgsHash {
object MurmurHash2_64 extends MultiStringArgsHash {
// https://github.com/ClickHouse/ClickHouse/blob/v23.5.3.24-stable/src/Functions/FunctionsHashing.h#L460

override protected def funcName: String = "clickhouse_murmurHash2_64"
override val ckFuncNames: Array[String] = Array("murmurHash2_64")

override def invokeBase(value: UTF8String): Long = {
// ignore UInt64 vs Int64
val data = value.getBytes
MurmurHash2.hash64(data, data.length, 0)
}

override def combineHashes(v1: Long, v2: Long): Long = Util.intHash64Impl(v1) ^ v2
override def applyHash(input: Array[Any]): Long = hash.Murmurhash2_64(input)
}

object MurmurHash2_32 extends MultiArgsHash {
object MurmurHash2_32 extends MultiStringArgsHash {
// https://github.com/ClickHouse/ClickHouse/blob/v23.5.3.24-stable/src/Functions/FunctionsHashing.h#L519

override protected def funcName: String = "clickhouse_murmurHash2_32"
override val ckFuncNames: Array[String] = Array("murmurHash2_32")

override def invokeBase(value: UTF8String): Long = {
val data = value.getBytes
val v = MurmurHash2.hash32(data, data.length, 0)
Util.toUInt32Range(v)
}

override def combineHashes(v1: Long, v2: Long): Long = Util.toUInt32Range(Util.int32Impl(v1) ^ v2)
override def applyHash(input: Array[Any]): Long = HashUtils.toUInt32(hash.Murmurhash2_32(input))
}
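
The 32-bit variants additionally pass through HashUtils.toUInt32 so the result stays in ClickHouse's unsigned UInt32 range even though Spark only returns a signed Long. A minimal sketch of that conversion, assuming it simply zero-extends the signed 32-bit value (the real helper lives in clickhouse-core and is not shown in this diff):

// Assumption: zero-extend a signed Int into [0, 2^32) as a Long.
def toUInt32(v: Int): Long = v & 0xFFFFFFFFL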
@@ -12,41 +12,25 @@
* limitations under the License.
*/

package xenon.clickhouse.func.clickhouse
package xenon.clickhouse.func

import org.apache.commons.codec.digest.MurmurHash3
import org.apache.spark.sql.connector.catalog.functions.{BoundFunction, ScalarFunction, UnboundFunction}
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String
import xenon.clickhouse.func.{ClickhouseEquivFunction, MultiArgsHash, Util}
import xenon.clickhouse.hash
import xenon.clickhouse.hash.HashUtils

object MurmurHash3_64 extends MultiArgsHash {
object MurmurHash3_64 extends MultiStringArgsHash {
// https://github.com/ClickHouse/ClickHouse/blob/v23.5.3.24-stable/src/Functions/FunctionsHashing.h#L543

override protected def funcName: String = "clickhouse_murmurHash3_64"
override val ckFuncNames: Array[String] = Array("murmurHash3_64")

override def invokeBase(value: UTF8String): Long = {
// ignore UInt64 vs Int64
val data = value.getBytes
val hashes = MurmurHash3.hash128x64(data, 0, data.length, 0)
hashes(0) ^ hashes(1)
}

override def combineHashes(v1: Long, v2: Long): Long = Util.intHash64Impl(v1) ^ v2
override def applyHash(input: Array[Any]): Long = hash.Murmurhash3_64(input)
}

object MurmurHash3_32 extends MultiArgsHash {
object MurmurHash3_32 extends MultiStringArgsHash {
// https://github.com/ClickHouse/ClickHouse/blob/v23.5.3.24-stable/src/Functions/FunctionsHashing.h#L519

override protected def funcName: String = "clickhouse_murmurHash3_32"
override val ckFuncNames: Array[String] = Array("murmurHash3_32")

override def invokeBase(value: UTF8String): Long = {
val data = value.getBytes
val v = MurmurHash3.hash32x86(data, 0, data.length, 0)
Util.toUInt32Range(v)
}

override def combineHashes(v1: Long, v2: Long): Long = Util.toUInt32Range(Util.int32Impl(v1) ^ v2)
override def applyHash(input: Array[Any]): Long = HashUtils.toUInt32(hash.Murmurhash3_32(input))
}
@@ -12,13 +12,12 @@
* limitations under the License.
*/

package xenon.clickhouse.func.clickhouse
package xenon.clickhouse.func

import org.apache.spark.sql.catalyst.expressions.XxHash64Function
import org.apache.spark.sql.connector.catalog.functions.{BoundFunction, ScalarFunction, UnboundFunction}
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String
import xenon.clickhouse.func.ClickhouseEquivFunction
import xenon.clickhouse.spec.{ClusterSpec, ShardUtils}

/**
@@ -12,11 +12,10 @@
* limitations under the License.
*/

package xenon.clickhouse.func.clickhouse
package xenon.clickhouse.func

import org.apache.spark.sql.connector.catalog.functions.{BoundFunction, ScalarFunction, UnboundFunction}
import org.apache.spark.sql.types._
import xenon.clickhouse.func.ClickhouseEquivFunction

import java.time.LocalDate
import java.time.format.DateTimeFormatter