From 4e201d61cfd9ca6ab3323c2aae6b1555f28c7e78 Mon Sep 17 00:00:00 2001 From: Xinyuan Yang Date: Tue, 25 Jul 2023 18:31:57 +0800 Subject: [PATCH] Spark 3.4: Adapt to hash function under clickhouse-core --- .../ClickHouseClusterHashUDFSuite.scala | 17 +- .../xenon/clickhouse/ClickHouseCatalog.scala | 3 +- .../func/{clickhouse => }/CityHash64.scala | 22 +- .../func/{clickhouse => }/Days.scala | 3 +- .../clickhouse/func/FunctionRegistry.scala | 1 - .../func/{clickhouse => }/Hours.scala | 5 +- .../func/{clickhouse => }/Mod.scala | 3 +- .../func/{clickhouse => }/Months.scala | 3 +- ...gsHash.scala => MultiStringArgsHash.scala} | 45 ++- .../func/{clickhouse => }/MurmurHash2.scala | 29 +- .../func/{clickhouse => }/MurmurHash3.scala | 30 +- .../func/{clickhouse => }/XxHash64.scala | 3 +- .../func/{clickhouse => }/Years.scala | 3 +- .../clickhouse/cityhash/CityHash_v1_0_2.java | 344 ------------------ .../func/clickhouse/cityhash/UInt128.java | 34 -- .../clickhouse/FunctionRegistrySuite.scala | 8 +- 16 files changed, 60 insertions(+), 493 deletions(-) rename spark-3.4/clickhouse-spark/src/main/scala/xenon/clickhouse/func/{clickhouse => }/CityHash64.scala (52%) rename spark-3.4/clickhouse-spark/src/main/scala/xenon/clickhouse/func/{clickhouse => }/Days.scala (95%) rename spark-3.4/clickhouse-spark/src/main/scala/xenon/clickhouse/func/{clickhouse => }/Hours.scala (93%) rename spark-3.4/clickhouse-spark/src/main/scala/xenon/clickhouse/func/{clickhouse => }/Mod.scala (96%) rename spark-3.4/clickhouse-spark/src/main/scala/xenon/clickhouse/func/{clickhouse => }/Months.scala (95%) rename spark-3.4/clickhouse-spark/src/main/scala/xenon/clickhouse/func/{MultiArgsHash.scala => MultiStringArgsHash.scala} (55%) rename spark-3.4/clickhouse-spark/src/main/scala/xenon/clickhouse/func/{clickhouse => }/MurmurHash2.scala (52%) rename spark-3.4/clickhouse-spark/src/main/scala/xenon/clickhouse/func/{clickhouse => }/MurmurHash3.scala (51%) rename spark-3.4/clickhouse-spark/src/main/scala/xenon/clickhouse/func/{clickhouse => }/XxHash64.scala (97%) rename spark-3.4/clickhouse-spark/src/main/scala/xenon/clickhouse/func/{clickhouse => }/Years.scala (95%) delete mode 100644 spark-3.4/clickhouse-spark/src/main/scala/xenon/clickhouse/func/clickhouse/cityhash/CityHash_v1_0_2.java delete mode 100644 spark-3.4/clickhouse-spark/src/main/scala/xenon/clickhouse/func/clickhouse/cityhash/UInt128.java diff --git a/spark-3.4/clickhouse-spark-it/src/test/scala/org/apache/spark/sql/clickhouse/cluster/ClickHouseClusterHashUDFSuite.scala b/spark-3.4/clickhouse-spark-it/src/test/scala/org/apache/spark/sql/clickhouse/cluster/ClickHouseClusterHashUDFSuite.scala index adf3d9de..65f667b2 100644 --- a/spark-3.4/clickhouse-spark-it/src/test/scala/org/apache/spark/sql/clickhouse/cluster/ClickHouseClusterHashUDFSuite.scala +++ b/spark-3.4/clickhouse-spark-it/src/test/scala/org/apache/spark/sql/clickhouse/cluster/ClickHouseClusterHashUDFSuite.scala @@ -15,8 +15,12 @@ package org.apache.spark.sql.clickhouse.cluster import org.apache.spark.sql.clickhouse.TestUtils.om -import xenon.clickhouse.func.{CompositeFunctionRegistry, DynamicFunctionRegistry, StaticFunctionRegistry} -import xenon.clickhouse.func.clickhouse.ClickHouseXxHash64Shard +import xenon.clickhouse.func.{ + ClickHouseXxHash64Shard, + CompositeFunctionRegistry, + DynamicFunctionRegistry, + StaticFunctionRegistry +} import java.lang.{Long => JLong} @@ -30,15 +34,6 @@ class ClickHouseClusterHashUDFSuite extends SparkClickHouseClusterTest { new CompositeFunctionRegistry(Array(StaticFunctionRegistry, dynamicFunctionRegistry)) } - def product[A](xs: Seq[Seq[A]]): Seq[Seq[A]] = - xs.toList match { - case Nil => Seq(Seq()) - case head :: tail => for { - h <- head - t <- product(tail) - } yield h +: t - } - def runTest(funcSparkName: String, funcCkName: String, stringVal: String): Unit = { val sparkResult = spark.sql( s"""SELECT diff --git a/spark-3.4/clickhouse-spark/src/main/scala/xenon/clickhouse/ClickHouseCatalog.scala b/spark-3.4/clickhouse-spark/src/main/scala/xenon/clickhouse/ClickHouseCatalog.scala index caff6a50..6db307f3 100644 --- a/spark-3.4/clickhouse-spark/src/main/scala/xenon/clickhouse/ClickHouseCatalog.scala +++ b/spark-3.4/clickhouse-spark/src/main/scala/xenon/clickhouse/ClickHouseCatalog.scala @@ -26,8 +26,7 @@ import xenon.clickhouse.Constants._ import xenon.clickhouse.client.NodeClient import xenon.clickhouse.exception.CHClientException import xenon.clickhouse.exception.ClickHouseErrCode._ -import xenon.clickhouse.func.clickhouse.ClickHouseXxHash64Shard -import xenon.clickhouse.func.{FunctionRegistry, _} +import xenon.clickhouse.func.{ClickHouseXxHash64Shard, FunctionRegistry, _} import xenon.clickhouse.spec._ import java.time.ZoneId diff --git a/spark-3.4/clickhouse-spark/src/main/scala/xenon/clickhouse/func/clickhouse/CityHash64.scala b/spark-3.4/clickhouse-spark/src/main/scala/xenon/clickhouse/func/CityHash64.scala similarity index 52% rename from spark-3.4/clickhouse-spark/src/main/scala/xenon/clickhouse/func/clickhouse/CityHash64.scala rename to spark-3.4/clickhouse-spark/src/main/scala/xenon/clickhouse/func/CityHash64.scala index 160d45e9..b78f8ee3 100644 --- a/spark-3.4/clickhouse-spark/src/main/scala/xenon/clickhouse/func/clickhouse/CityHash64.scala +++ b/spark-3.4/clickhouse-spark/src/main/scala/xenon/clickhouse/func/CityHash64.scala @@ -12,29 +12,15 @@ * limitations under the License. */ -package xenon.clickhouse.func.clickhouse +package xenon.clickhouse.func -import io.netty.buffer.{ByteBuf, Unpooled} -import org.apache.spark.unsafe.types.UTF8String -import xenon.clickhouse.func.MultiArgsHash -import xenon.clickhouse.func.clickhouse.cityhash.{CityHash_v1_0_2, UInt128} +import xenon.clickhouse.hash -object CityHash64 extends MultiArgsHash { +object CityHash64 extends MultiStringArgsHash { // https://github.com/ClickHouse/ClickHouse/blob/v23.5.3.24-stable/src/Functions/FunctionsHashing.h#L694 override protected def funcName: String = "clickhouse_cityHash64" override val ckFuncNames: Array[String] = Array("cityHash64") - def convertToByteBuf(array: Array[Byte]): ByteBuf = { - val byteBuf = Unpooled.buffer(array.length).writeBytes(array) - byteBuf - } - - override def invokeBase(value: UTF8String): Long = { - // ignore UInt64 vs Int64 - val data = value.getBytes - CityHash_v1_0_2.CityHash64(convertToByteBuf(data), 0, data.length) - } - - override def combineHashes(v1: Long, v2: Long): Long = CityHash_v1_0_2.Hash128to64(new UInt128(v1, v2)) + override def applyHash(input: Array[Any]): Long = hash.CityHash64(input) } diff --git a/spark-3.4/clickhouse-spark/src/main/scala/xenon/clickhouse/func/clickhouse/Days.scala b/spark-3.4/clickhouse-spark/src/main/scala/xenon/clickhouse/func/Days.scala similarity index 95% rename from spark-3.4/clickhouse-spark/src/main/scala/xenon/clickhouse/func/clickhouse/Days.scala rename to spark-3.4/clickhouse-spark/src/main/scala/xenon/clickhouse/func/Days.scala index 672fd44f..3008d7fd 100644 --- a/spark-3.4/clickhouse-spark/src/main/scala/xenon/clickhouse/func/clickhouse/Days.scala +++ b/spark-3.4/clickhouse-spark/src/main/scala/xenon/clickhouse/func/Days.scala @@ -12,11 +12,10 @@ * limitations under the License. */ -package xenon.clickhouse.func.clickhouse +package xenon.clickhouse.func import org.apache.spark.sql.connector.catalog.functions.{BoundFunction, ScalarFunction, UnboundFunction} import org.apache.spark.sql.types._ -import xenon.clickhouse.func.ClickhouseEquivFunction import java.time.LocalDate import java.time.format.DateTimeFormatter diff --git a/spark-3.4/clickhouse-spark/src/main/scala/xenon/clickhouse/func/FunctionRegistry.scala b/spark-3.4/clickhouse-spark/src/main/scala/xenon/clickhouse/func/FunctionRegistry.scala index c6f01110..d7856c3c 100644 --- a/spark-3.4/clickhouse-spark/src/main/scala/xenon/clickhouse/func/FunctionRegistry.scala +++ b/spark-3.4/clickhouse-spark/src/main/scala/xenon/clickhouse/func/FunctionRegistry.scala @@ -15,7 +15,6 @@ package xenon.clickhouse.func import org.apache.spark.sql.connector.catalog.functions.UnboundFunction -import xenon.clickhouse.func.clickhouse._ import scala.collection.mutable diff --git a/spark-3.4/clickhouse-spark/src/main/scala/xenon/clickhouse/func/clickhouse/Hours.scala b/spark-3.4/clickhouse-spark/src/main/scala/xenon/clickhouse/func/Hours.scala similarity index 93% rename from spark-3.4/clickhouse-spark/src/main/scala/xenon/clickhouse/func/clickhouse/Hours.scala rename to spark-3.4/clickhouse-spark/src/main/scala/xenon/clickhouse/func/Hours.scala index 0abe25cb..e88907be 100644 --- a/spark-3.4/clickhouse-spark/src/main/scala/xenon/clickhouse/func/clickhouse/Hours.scala +++ b/spark-3.4/clickhouse-spark/src/main/scala/xenon/clickhouse/func/Hours.scala @@ -12,13 +12,12 @@ * limitations under the License. */ -package xenon.clickhouse.func.clickhouse +package xenon.clickhouse.func import org.apache.spark.sql.connector.catalog.functions.{BoundFunction, ScalarFunction, UnboundFunction} import org.apache.spark.sql.types._ -import xenon.clickhouse.func.ClickhouseEquivFunction -import java.sql.{Date, Timestamp} +import java.sql.Timestamp import java.text.SimpleDateFormat object Hours extends UnboundFunction with ScalarFunction[Int] with ClickhouseEquivFunction { diff --git a/spark-3.4/clickhouse-spark/src/main/scala/xenon/clickhouse/func/clickhouse/Mod.scala b/spark-3.4/clickhouse-spark/src/main/scala/xenon/clickhouse/func/Mod.scala similarity index 96% rename from spark-3.4/clickhouse-spark/src/main/scala/xenon/clickhouse/func/clickhouse/Mod.scala rename to spark-3.4/clickhouse-spark/src/main/scala/xenon/clickhouse/func/Mod.scala index b10f0f7e..69fdedc9 100644 --- a/spark-3.4/clickhouse-spark/src/main/scala/xenon/clickhouse/func/clickhouse/Mod.scala +++ b/spark-3.4/clickhouse-spark/src/main/scala/xenon/clickhouse/func/Mod.scala @@ -12,11 +12,10 @@ * limitations under the License. */ -package xenon.clickhouse.func.clickhouse +package xenon.clickhouse.func import org.apache.spark.sql.connector.catalog.functions.{BoundFunction, ScalarFunction, UnboundFunction} import org.apache.spark.sql.types._ -import xenon.clickhouse.func.ClickhouseEquivFunction object Mod extends UnboundFunction with ScalarFunction[Long] with ClickhouseEquivFunction { diff --git a/spark-3.4/clickhouse-spark/src/main/scala/xenon/clickhouse/func/clickhouse/Months.scala b/spark-3.4/clickhouse-spark/src/main/scala/xenon/clickhouse/func/Months.scala similarity index 95% rename from spark-3.4/clickhouse-spark/src/main/scala/xenon/clickhouse/func/clickhouse/Months.scala rename to spark-3.4/clickhouse-spark/src/main/scala/xenon/clickhouse/func/Months.scala index 846dd245..13e06d88 100644 --- a/spark-3.4/clickhouse-spark/src/main/scala/xenon/clickhouse/func/clickhouse/Months.scala +++ b/spark-3.4/clickhouse-spark/src/main/scala/xenon/clickhouse/func/Months.scala @@ -12,11 +12,10 @@ * limitations under the License. */ -package xenon.clickhouse.func.clickhouse +package xenon.clickhouse.func import org.apache.spark.sql.connector.catalog.functions.{BoundFunction, ScalarFunction, UnboundFunction} import org.apache.spark.sql.types._ -import xenon.clickhouse.func.ClickhouseEquivFunction import java.time.LocalDate import java.time.format.DateTimeFormatter diff --git a/spark-3.4/clickhouse-spark/src/main/scala/xenon/clickhouse/func/MultiArgsHash.scala b/spark-3.4/clickhouse-spark/src/main/scala/xenon/clickhouse/func/MultiStringArgsHash.scala similarity index 55% rename from spark-3.4/clickhouse-spark/src/main/scala/xenon/clickhouse/func/MultiArgsHash.scala rename to spark-3.4/clickhouse-spark/src/main/scala/xenon/clickhouse/func/MultiStringArgsHash.scala index adc3a382..69ce07c1 100644 --- a/spark-3.4/clickhouse-spark/src/main/scala/xenon/clickhouse/func/MultiArgsHash.scala +++ b/spark-3.4/clickhouse-spark/src/main/scala/xenon/clickhouse/func/MultiStringArgsHash.scala @@ -19,32 +19,41 @@ import org.apache.spark.sql.connector.catalog.functions.{BoundFunction, ScalarFu import org.apache.spark.sql.types._ import org.apache.spark.unsafe.types.UTF8String -abstract class MultiArgsHash extends UnboundFunction with ClickhouseEquivFunction { +abstract class MultiStringArgsHash extends UnboundFunction with ClickhouseEquivFunction { + + def applyHash(input: Array[Any]): Long + + protected def funcName: String + + override val ckFuncNames: Array[String] + + override def description: String = s"$name: (value: string, ...) => hash_value: long" + private def isExceptedType(dt: DataType): Boolean = dt.isInstanceOf[StringType] final override def name: String = funcName + final override def bind(inputType: StructType): BoundFunction = { val inputDataTypes = inputType.fields.map(_.dataType) - if (inputDataTypes.forall(isExceptedType)) new ScalarFunction[Long] { - override def inputTypes(): Array[DataType] = inputDataTypes - override def name: String = funcName - override def canonicalName: String = s"clickhouse.$name" - override def resultType: DataType = LongType - override def toString: String = name - override def produceResult(input: InternalRow): Long = { - val inputStrings: Seq[UTF8String] = - input.toSeq(Seq.fill(input.numFields)(StringType)).asInstanceOf[Seq[UTF8String]] - inputStrings.map(invokeBase).reduce(combineHashes) + if (inputDataTypes.forall(isExceptedType)) { + // need to new a ScalarFunction instance for each bind, + // because we do not know the number of arguments in advance + new ScalarFunction[Long] { + override def inputTypes(): Array[DataType] = inputDataTypes + override def name: String = funcName + override def canonicalName: String = s"clickhouse.$name" + override def resultType: DataType = LongType + override def toString: String = name + override def produceResult(input: InternalRow): Long = { + val inputStrings: Array[Any] = + input.toSeq(Seq.fill(input.numFields)(StringType)).asInstanceOf[Seq[UTF8String]].toArray + .map(_.getBytes) + applyHash(inputStrings) + } } - } - else throw new UnsupportedOperationException(s"Expect multiple STRING argument. $description") + } else throw new UnsupportedOperationException(s"Expect multiple STRING argument. $description") } - protected def funcName: String - override val ckFuncNames: Array[String] - override def description: String = s"$name: (value: string, ...) => hash_value: long" - def invokeBase(value: UTF8String): Long - def combineHashes(v1: Long, v2: Long): Long } diff --git a/spark-3.4/clickhouse-spark/src/main/scala/xenon/clickhouse/func/clickhouse/MurmurHash2.scala b/spark-3.4/clickhouse-spark/src/main/scala/xenon/clickhouse/func/MurmurHash2.scala similarity index 52% rename from spark-3.4/clickhouse-spark/src/main/scala/xenon/clickhouse/func/clickhouse/MurmurHash2.scala rename to spark-3.4/clickhouse-spark/src/main/scala/xenon/clickhouse/func/MurmurHash2.scala index f2ff9ed2..9fac4d60 100644 --- a/spark-3.4/clickhouse-spark/src/main/scala/xenon/clickhouse/func/clickhouse/MurmurHash2.scala +++ b/spark-3.4/clickhouse-spark/src/main/scala/xenon/clickhouse/func/MurmurHash2.scala @@ -12,40 +12,25 @@ * limitations under the License. */ -package xenon.clickhouse.func.clickhouse +package xenon.clickhouse.func -import org.apache.commons.codec.digest.{MurmurHash2, MurmurHash3} -import org.apache.spark.sql.connector.catalog.functions.{BoundFunction, ScalarFunction, UnboundFunction} -import org.apache.spark.sql.types._ -import org.apache.spark.unsafe.types.UTF8String -import xenon.clickhouse.func.{ClickhouseEquivFunction, MultiArgsHash, Util} +import xenon.clickhouse.hash +import xenon.clickhouse.hash.HashUtils -object MurmurHash2_64 extends MultiArgsHash { +object MurmurHash2_64 extends MultiStringArgsHash { // https://github.com/ClickHouse/ClickHouse/blob/v23.5.3.24-stable/src/Functions/FunctionsHashing.h#L460 override protected def funcName: String = "clickhouse_murmurHash2_64" override val ckFuncNames: Array[String] = Array("murmurHash2_64") - override def invokeBase(value: UTF8String): Long = { - // ignore UInt64 vs Int64 - val data = value.getBytes - MurmurHash2.hash64(data, data.length, 0) - } - - override def combineHashes(v1: Long, v2: Long): Long = Util.intHash64Impl(v1) ^ v2 + override def applyHash(input: Array[Any]): Long = hash.Murmurhash2_64(input) } -object MurmurHash2_32 extends MultiArgsHash { +object MurmurHash2_32 extends MultiStringArgsHash { // https://github.com/ClickHouse/ClickHouse/blob/v23.5.3.24-stable/src/Functions/FunctionsHashing.h#L519 override protected def funcName: String = "clickhouse_murmurHash2_32" override val ckFuncNames: Array[String] = Array("murmurHash2_32") - override def invokeBase(value: UTF8String): Long = { - val data = value.getBytes - val v = MurmurHash2.hash32(data, data.length, 0) - Util.toUInt32Range(v) - } - - override def combineHashes(v1: Long, v2: Long): Long = Util.toUInt32Range(Util.int32Impl(v1) ^ v2) + override def applyHash(input: Array[Any]): Long = HashUtils.toUInt32(hash.Murmurhash2_32(input)) } diff --git a/spark-3.4/clickhouse-spark/src/main/scala/xenon/clickhouse/func/clickhouse/MurmurHash3.scala b/spark-3.4/clickhouse-spark/src/main/scala/xenon/clickhouse/func/MurmurHash3.scala similarity index 51% rename from spark-3.4/clickhouse-spark/src/main/scala/xenon/clickhouse/func/clickhouse/MurmurHash3.scala rename to spark-3.4/clickhouse-spark/src/main/scala/xenon/clickhouse/func/MurmurHash3.scala index 1db654c1..848bb3b0 100644 --- a/spark-3.4/clickhouse-spark/src/main/scala/xenon/clickhouse/func/clickhouse/MurmurHash3.scala +++ b/spark-3.4/clickhouse-spark/src/main/scala/xenon/clickhouse/func/MurmurHash3.scala @@ -12,41 +12,25 @@ * limitations under the License. */ -package xenon.clickhouse.func.clickhouse +package xenon.clickhouse.func -import org.apache.commons.codec.digest.MurmurHash3 -import org.apache.spark.sql.connector.catalog.functions.{BoundFunction, ScalarFunction, UnboundFunction} -import org.apache.spark.sql.types._ -import org.apache.spark.unsafe.types.UTF8String -import xenon.clickhouse.func.{ClickhouseEquivFunction, MultiArgsHash, Util} +import xenon.clickhouse.hash +import xenon.clickhouse.hash.HashUtils -object MurmurHash3_64 extends MultiArgsHash { +object MurmurHash3_64 extends MultiStringArgsHash { // https://github.com/ClickHouse/ClickHouse/blob/v23.5.3.24-stable/src/Functions/FunctionsHashing.h#L543 override protected def funcName: String = "clickhouse_murmurHash3_64" override val ckFuncNames: Array[String] = Array("murmurHash3_64") - override def invokeBase(value: UTF8String): Long = { - // ignore UInt64 vs Int64 - val data = value.getBytes - val hashes = MurmurHash3.hash128x64(data, 0, data.length, 0) - hashes(0) ^ hashes(1) - } - - override def combineHashes(v1: Long, v2: Long): Long = Util.intHash64Impl(v1) ^ v2 + override def applyHash(input: Array[Any]): Long = hash.Murmurhash3_64(input) } -object MurmurHash3_32 extends MultiArgsHash { +object MurmurHash3_32 extends MultiStringArgsHash { // https://github.com/ClickHouse/ClickHouse/blob/v23.5.3.24-stable/src/Functions/FunctionsHashing.h#L519 override protected def funcName: String = "clickhouse_murmurHash3_32" override val ckFuncNames: Array[String] = Array("murmurHash3_32") - override def invokeBase(value: UTF8String): Long = { - val data = value.getBytes - val v = MurmurHash3.hash32x86(data, 0, data.length, 0) - Util.toUInt32Range(v) - } - - override def combineHashes(v1: Long, v2: Long): Long = Util.toUInt32Range(Util.int32Impl(v1) ^ v2) + override def applyHash(input: Array[Any]): Long = HashUtils.toUInt32(hash.Murmurhash3_32(input)) } diff --git a/spark-3.4/clickhouse-spark/src/main/scala/xenon/clickhouse/func/clickhouse/XxHash64.scala b/spark-3.4/clickhouse-spark/src/main/scala/xenon/clickhouse/func/XxHash64.scala similarity index 97% rename from spark-3.4/clickhouse-spark/src/main/scala/xenon/clickhouse/func/clickhouse/XxHash64.scala rename to spark-3.4/clickhouse-spark/src/main/scala/xenon/clickhouse/func/XxHash64.scala index 241ae9d8..3c4a5b1a 100644 --- a/spark-3.4/clickhouse-spark/src/main/scala/xenon/clickhouse/func/clickhouse/XxHash64.scala +++ b/spark-3.4/clickhouse-spark/src/main/scala/xenon/clickhouse/func/XxHash64.scala @@ -12,13 +12,12 @@ * limitations under the License. */ -package xenon.clickhouse.func.clickhouse +package xenon.clickhouse.func import org.apache.spark.sql.catalyst.expressions.XxHash64Function import org.apache.spark.sql.connector.catalog.functions.{BoundFunction, ScalarFunction, UnboundFunction} import org.apache.spark.sql.types._ import org.apache.spark.unsafe.types.UTF8String -import xenon.clickhouse.func.ClickhouseEquivFunction import xenon.clickhouse.spec.{ClusterSpec, ShardUtils} /** diff --git a/spark-3.4/clickhouse-spark/src/main/scala/xenon/clickhouse/func/clickhouse/Years.scala b/spark-3.4/clickhouse-spark/src/main/scala/xenon/clickhouse/func/Years.scala similarity index 95% rename from spark-3.4/clickhouse-spark/src/main/scala/xenon/clickhouse/func/clickhouse/Years.scala rename to spark-3.4/clickhouse-spark/src/main/scala/xenon/clickhouse/func/Years.scala index 4b2e650d..6bf987fb 100644 --- a/spark-3.4/clickhouse-spark/src/main/scala/xenon/clickhouse/func/clickhouse/Years.scala +++ b/spark-3.4/clickhouse-spark/src/main/scala/xenon/clickhouse/func/Years.scala @@ -12,11 +12,10 @@ * limitations under the License. */ -package xenon.clickhouse.func.clickhouse +package xenon.clickhouse.func import org.apache.spark.sql.connector.catalog.functions.{BoundFunction, ScalarFunction, UnboundFunction} import org.apache.spark.sql.types._ -import xenon.clickhouse.func.ClickhouseEquivFunction import java.time.LocalDate import java.time.format.DateTimeFormatter diff --git a/spark-3.4/clickhouse-spark/src/main/scala/xenon/clickhouse/func/clickhouse/cityhash/CityHash_v1_0_2.java b/spark-3.4/clickhouse-spark/src/main/scala/xenon/clickhouse/func/clickhouse/cityhash/CityHash_v1_0_2.java deleted file mode 100644 index df218df3..00000000 --- a/spark-3.4/clickhouse-spark/src/main/scala/xenon/clickhouse/func/clickhouse/cityhash/CityHash_v1_0_2.java +++ /dev/null @@ -1,344 +0,0 @@ -/* - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package xenon.clickhouse.func.clickhouse.cityhash; - -import io.netty.buffer.ByteBuf; - -// copy from https://github.com/dpoluyanov/achord/blob/master/src/main/java/com/github/mangelion/achord/CityHash_v1_0_2.java -// fixed some bugs involving int32 to uint32 conversion -final public class CityHash_v1_0_2 { - - private static final long kMul = 0x9ddfea08eb382d69L; - // Some primes between 2^63 and 2^64 for various uses. - private static final long k0 = 0xc3a5c85c97cb3127L; - private static final long k1 = 0xb492b66fbe98f273L; - private static final long k2 = 0x9ae16a3b2f90404fL; - private static final long k3 = 0xc949d7c7509e6557L; - - private CityHash_v1_0_2() { /* restricted */ } - - private static long Fetch64(ByteBuf p, int index) { - return p.getLongLE(index); - } - - private static int Fetch32(ByteBuf p, int index) { - return p.getIntLE(index); - } - - private static long toUint32(int x) { - return x & 0xFFFFFFFFL; - } - - // Equivalent to Rotate(), but requires the second arg to be non-zero. -// On x86-64, and probably others, it's possible for this to compile -// to a single instruction if both args are already in registers. - private static long RotateByAtLeast1(long val, int shift) { - return (val >>> shift) | (val << (64 - shift)); - } - - private static long ShiftMix(long val) { - return val ^ (val >>> 47); - } - - private static long Uint128Low64(UInt128 x) { - return x.first; - } - - private static long Rotate(long val, int shift) { - return shift == 0 ? val : (val >>> shift) | (val << (64 - shift)); - } - - private static long Uint128High64(UInt128 x) { - return x.second; - } - - // Hash 128 input bits down to 64 bits of output. -// This is intended to be a reasonably good hash function. - public static long Hash128to64(UInt128 x) { - // Murmur-inspired hashing. - long a = (Uint128Low64(x) ^ Uint128High64(x)) * kMul; - a ^= (a >>> 47); - long b = (Uint128High64(x) ^ a) * kMul; - b ^= (b >>> 47); - b *= kMul; - return b; - } - - private static long HashLen16(long u, long v) { - return Hash128to64(UInt128.of(u, v)); - } - - private static long HashLen0to16(ByteBuf s, int index, int len) { - if (len > 8) { - long a = Fetch64(s, index); - long b = Fetch64(s, index + len - 8); - return HashLen16(a, RotateByAtLeast1(b + len, len)) ^ b; - } - if (len >= 4) { - long a = toUint32(Fetch32(s, index)); - return HashLen16(len + (a << 3), toUint32(Fetch32(s, index + len - 4))); - } - if (len > 0) { - byte a = s.getByte(index); - byte b = s.getByte(index + len >>> 1); - byte c = s.getByte(index + len - 1); - int y = (a & 0xFF) + ((b & 0xFF) << 8); - int z = len + ((c & 0xFF) << 2); - return ShiftMix(y * k2 ^ z * k3) * k2; - } - return k2; - } - - // This probably works well for 16-byte strings as well, but it may be overkill -// in that case. - private static long HashLen17to32(ByteBuf s, int index, int len) { - long a = Fetch64(s, index) * k1; - long b = Fetch64(s, index + 8); - long c = Fetch64(s, index + len - 8) * k2; - long d = Fetch64(s, index + len - 16) * k0; - return HashLen16(Rotate(a - b, 43) + Rotate(c, 30) + d, - a + Rotate(b ^ k3, 20) - c + len); - } - - // Return a 16-byte hash for 48 bytes. Quick and dirty. -// Callers do best to use "random-looking" values for a and b. - private static UInt128 WeakHashLen32WithSeeds( - long w, long x, long y, long z, long a, long b) { - a += w; - b = Rotate(b + a + z, 21); - long c = a; - a += x; - a += y; - b += Rotate(a, 44); - return UInt128.of(a + z, b + c); - } - - // Return a 16-byte hash for s[0] ... s[31], a, and b. Quick and dirty. - private static UInt128 WeakHashLen32WithSeeds(ByteBuf s, int index, long a, long b) { - return WeakHashLen32WithSeeds(Fetch64(s, index), - Fetch64(s, index + 8), - Fetch64(s, index + 16), - Fetch64(s, index + 24), - a, - b); - } - - // Return an 8-byte hash for 33 to 64 bytes. - private static long HashLen33to64(ByteBuf s, int index, int len) { - long z = Fetch64(s, index + 24); - long a = Fetch64(s, index) + (len + Fetch64(s, index + len - 16)) * k0; - long b = Rotate(a + z, 52); - long c = Rotate(a, 37); - a += Fetch64(s, index + 8); - c += Rotate(a, 7); - a += Fetch64(s, index + 16); - long vf = a + z; - long vs = b + Rotate(a, 31) + c; - a = Fetch64(s, index + 16) + Fetch64(s, index + len - 32); - z = Fetch64(s, index + len - 8); - b = Rotate(a + z, 52); - c = Rotate(a, 37); - a += Fetch64(s, index + len - 24); - c += Rotate(a, 7); - a += Fetch64(s, index + len - 16); - long wf = a + z; - long ws = b + Rotate(a, 31) + c; - long r = ShiftMix((vf + ws) * k2 + (wf + vs) * k0); - return ShiftMix(r * k0 + vs) * k2; - } - - // A subroutine for CityHash128(). Returns a decent 128-bit hash for strings -// of any length representable in ssize_t. Based on City and Murmur. - private static UInt128 CityMurmur(ByteBuf s, int index, int len, UInt128 seed) { - long a = Uint128Low64(seed); - long b = Uint128High64(seed); - long c; - long d; - int l = len - 16; - if (l <= 0) { // len <= 16 - a = ShiftMix(a * k1) * k1; - c = b * k1 + HashLen0to16(s, index, len); - d = ShiftMix(a + (len >= 8 ? Fetch64(s, index) : c)); - } else { // len > 16 - c = HashLen16(Fetch64(s, index + len - 8) + k1, a); - d = HashLen16(b + len, c + Fetch64(s, index + len - 16)); - a += d; - do { - a ^= ShiftMix(Fetch64(s, index) * k1) * k1; - a *= k1; - b ^= a; - c ^= ShiftMix(Fetch64(s, index + 8) * k1) * k1; - c *= k1; - d ^= c; - index += 16; - l -= 16; - } while (l > 0); - } - a = HashLen16(a, c); - b = HashLen16(d, b); - return UInt128.of(a ^ b, HashLen16(b, a)); - } - - public static long CityHash64(ByteBuf s, int index, int len) { - if (len <= 32) { - if (len <= 16) { - return HashLen0to16(s, index, len); - } else { - return HashLen17to32(s, index, len); - } - } else if (len <= 64) { - return HashLen33to64(s, index, len); - } - - // For strings over 64 bytes we hash the end first, and then as we - // loop we keep 56 bytes of state: v, w, x, y, and z. - long x = Fetch64(s, index); - long y = Fetch64(s, index + len - 16) ^ k1; - long z = Fetch64(s, index + len - 56) ^ k0; - UInt128 v = WeakHashLen32WithSeeds(s, len - 64, len, y); - UInt128 w = WeakHashLen32WithSeeds(s, len - 32, len * k1, k0); - z += ShiftMix(v.second) * k1; - x = Rotate(z + x, 39) * k1; - y = Rotate(y, 33) * k1; - - // Decrease len to the nearest multiple of 64, and operate on 64-byte chunks. - len = (len - 1) & ~63; - do { - x = Rotate(x + y + v.first + Fetch64(s, index + 16), 37) * k1; - y = Rotate(y + v.second + Fetch64(s, index + 48), 42) * k1; - x ^= w.second; - y ^= v.first; - z = Rotate(z ^ w.first, 33); - v = WeakHashLen32WithSeeds(s, index, v.second * k1, x + w.first); - w = WeakHashLen32WithSeeds(s, index + 32, z + w.second, y); - // swap - long t = z; - z = x; - x = t; - index += 64; - len -= 64; - } while (len != 0); - return HashLen16(HashLen16(v.first, w.first) + ShiftMix(y) * k1 + z, - HashLen16(v.second, w.second) + x); - } - - private static long CityHash64WithSeed(ByteBuf s, int index, int len, long seed) { - return CityHash64WithSeeds(s, index, len, k2, seed); - } - - private static long CityHash64WithSeeds(ByteBuf s, int index, int len, - long seed0, long seed1) { - return HashLen16(CityHash64(s, index, len) - seed0, seed1); - } - - private static UInt128 CityHash128WithSeed(ByteBuf s, int index, int len, UInt128 seed) { - if (len < 128) { - return CityMurmur(s, index, len, seed); - } - - // We expect len >= 128 to be the common case. Keep 56 bytes of state: - // v, w, x, y, and z. - UInt128 v, w; - long x = Uint128Low64(seed); - long y = Uint128High64(seed); - long z = len * k1; - long vFirst = Rotate(y ^ k1, 49) * k1 + Fetch64(s, index); - long vSecond = Rotate(vFirst, 42) * k1 + Fetch64(s, index + 8); - long wFirst = Rotate(y + z, 35) * k1 + x; - long wSecond = Rotate(x + Fetch64(s, index + 88), 53) * k1; - -// v = UInt128.of(vFirst, vSecond); -// w = UInt128.of(wFirst, wSecond); - - // This is the same inner loop as CityHash64(), manually unrolled. - do { - x = Rotate(x + y + vFirst + Fetch64(s, index + 16), 37) * k1; - y = Rotate(y + vSecond + Fetch64(s, index + 48), 42) * k1; - x ^= wSecond; - y ^= vFirst; - z = Rotate(z ^ wFirst, 33); - v = WeakHashLen32WithSeeds(s, index, vSecond * k1, x + wFirst); - w = WeakHashLen32WithSeeds(s, index + 32, z + wSecond, y); - - vFirst = v.first; - vSecond = v.second; - wFirst = w.first; - wSecond = w.second; - { - long swap = z; - z = x; - x = swap; - } - index += 64; - x = Rotate(x + y + vFirst + Fetch64(s, index + 16), 37) * k1; - y = Rotate(y + vSecond + Fetch64(s, index + 48), 42) * k1; - x ^= wSecond; - y ^= vFirst; - z = Rotate(z ^ wFirst, 33); - v = WeakHashLen32WithSeeds(s, index, vSecond * k1, x + wFirst); - w = WeakHashLen32WithSeeds(s, index + 32, z + wSecond, y); - - vFirst = v.first; - vSecond = v.second; - wFirst = w.first; - wSecond = w.second; - { - long swap = z; - z = x; - x = swap; - } - index += 64; - len -= 128; - } while (len >= 128); - y += Rotate(wFirst, 37) * k0 + z; - x += Rotate(vFirst + z, 49) * k0; - // If 0 < len < 128, hash up to 4 chunks of 32 bytes each from the end of s. - for (int tail_done = 0; tail_done < len; ) { - tail_done += 32; - y = Rotate(y - x, 42) * k0 + vSecond; - wFirst += Fetch64(s, index + len - tail_done + 16); - x = Rotate(x, 49) * k0 + wFirst; - wFirst += vFirst; - v = WeakHashLen32WithSeeds(s, index + len - tail_done, vFirst, vSecond); - - vFirst = v.first; - vSecond = v.second; - } - // At this point our 48 bytes of state should contain more than - // enough information for a strong 128-bit hash. We use two - // different 48-byte-to-8-byte hashes to get a 16-byte final result. - x = HashLen16(x, vFirst); - y = HashLen16(y, wFirst); - return UInt128.of(HashLen16(x + vSecond, wSecond) + y, - HashLen16(x + wSecond, y + vSecond)); - } - - public static UInt128 CityHash128(ByteBuf s, int len) { - if (len >= 16) { - return CityHash128WithSeed(s, 16, - len - 16, - UInt128.of(Fetch64(s, 0) ^ k3, - Fetch64(s, 8))); - } else if (len >= 8) { - return CityHash128WithSeed(null, - 0, 0, - UInt128.of(Fetch64(s, 0) ^ (len * k0), - Fetch64(s, len - 8) ^ k1)); - } else { - return CityHash128WithSeed(s, 0, len, UInt128.of(k0, k1)); - } - } -} - diff --git a/spark-3.4/clickhouse-spark/src/main/scala/xenon/clickhouse/func/clickhouse/cityhash/UInt128.java b/spark-3.4/clickhouse-spark/src/main/scala/xenon/clickhouse/func/clickhouse/cityhash/UInt128.java deleted file mode 100644 index 2ba6c1f7..00000000 --- a/spark-3.4/clickhouse-spark/src/main/scala/xenon/clickhouse/func/clickhouse/cityhash/UInt128.java +++ /dev/null @@ -1,34 +0,0 @@ -/* - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package xenon.clickhouse.func.clickhouse.cityhash; - -/** - * @author Dmitriy Poluyanov - * @since 15/02/2018 - * copy from https://github.com/dpoluyanov/achord/blob/master/src/main/java/com/github/mangelion/achord/UInt128.java - */ -final public class UInt128 { - final public long first; - final public long second; - - public UInt128(long first, long second) { - this.first = first; - this.second = second; - } - - static UInt128 of(long first, long second) { - return new UInt128(first, second); - } -} diff --git a/spark-3.4/clickhouse-spark/src/test/scala/org/apache/spark/sql/clickhouse/FunctionRegistrySuite.scala b/spark-3.4/clickhouse-spark/src/test/scala/org/apache/spark/sql/clickhouse/FunctionRegistrySuite.scala index 34254907..d241e87b 100644 --- a/spark-3.4/clickhouse-spark/src/test/scala/org/apache/spark/sql/clickhouse/FunctionRegistrySuite.scala +++ b/spark-3.4/clickhouse-spark/src/test/scala/org/apache/spark/sql/clickhouse/FunctionRegistrySuite.scala @@ -17,13 +17,7 @@ package org.apache.spark.sql.clickhouse import org.apache.spark.sql.util.CaseInsensitiveStringMap import org.scalatest.funsuite.AnyFunSuite import xenon.clickhouse.ClickHouseHelper -import xenon.clickhouse.func.clickhouse.ClickHouseXxHash64 -import xenon.clickhouse.func.{ - ClickhouseEquivFunction, - CompositeFunctionRegistry, - DynamicFunctionRegistry, - StaticFunctionRegistry -} +import xenon.clickhouse.func.{ClickHouseXxHash64, ClickhouseEquivFunction, CompositeFunctionRegistry, DynamicFunctionRegistry, StaticFunctionRegistry} import scala.collection.JavaConverters._