Support writing with functions in distribute/partition expressions #253
Conversation
I want to provide more information on the last limitation, that time functions support only one input type. I tried but failed to use auto type casting via:

```sql
create database if not exists test on cluster single_replica;

create or replace table test.test_month_shard on cluster single_replica
(
    create_time timestamp,
    create_date date,
    value String
)
engine = MergeTree()
order by create_time;

create or replace table test.test_month on cluster single_replica
as test.test_month_shard
engine = Distributed(single_replica, 'test', 'test_month_shard', toYYYYMM(create_date));

insert into clickhouse_s1r1.test.test_month
values
    (timestamp'2021-01-01 10:10:10', timestamp'2021-01-01 10:10:10', '1'),
    (timestamp'2022-02-02 10:10:10', timestamp'2022-02-02 10:10:10', '2'),
    (timestamp'2023-03-03 10:10:10', timestamp'2023-03-03 10:10:10', '3'),
    (timestamp'2024-04-04 10:10:10', timestamp'2024-04-04 10:10:10', '4') AS tab(create_time, create_date, value);

use clickhouse_s1r1.test;

select clickhouse_months(timestamp'2024-04-04 10:10:10');
```

So it seems that this functionality is broken. I believe adding a type cast before calling the UDF is always helpful, because we do not know the inserted data type in advance. However, this is also not achievable.
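As a hedged workaround sketch (not verified in this thread): since `clickhouse_months` here appears to bind only to a Spark `date` input, casting explicitly before calling the UDF avoids relying on the broken implicit cast.

```scala
// Sketch only: explicit cast to DATE before invoking the connector UDF,
// instead of depending on Spark's implicit type coercion.
spark.sql(
  "select clickhouse_months(cast(timestamp'2024-04-04 10:10:10' as date))"
).show()
```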
@Yxang awesome! Will take a look soon.
Some lessons I learned from my last investigation in this area: the hash result depends on two things, 1) the algorithm and 2) the memory layout of the input data. UTF-8 strings have a consistent memory layout, but integers do not; it mostly depends on the hardware (and sometimes on the OS platform and virtual machine, e.g. the JVM). Even a single ClickHouse cluster cannot work properly on machines composed of CPUs with different architectures.
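A small illustration of the memory-layout point (not from the PR, just a sketch): the same integer has different raw bytes under little-endian and big-endian byte order, so any byte-oriented hash over those bytes differs, while the UTF-8 bytes of a string are identical everywhere.

```scala
import java.nio.{ByteBuffer, ByteOrder}

// Raw bytes of an Int under a given byte order.
def bytesOf(i: Int, order: ByteOrder): Array[Byte] =
  ByteBuffer.allocate(4).order(order).putInt(i).array()

val le = bytesOf(0x01020304, ByteOrder.LITTLE_ENDIAN) // 04 03 02 01
val be = bytesOf(0x01020304, ByteOrder.BIG_ENDIAN)    // 01 02 03 04

// A byte-oriented hash (cityHash64, murmur*, ...) fed with `le` vs `be`
// yields different values, so hashing integers is only portable when the
// byte order is pinned down explicitly.
val utf8 = "abc".getBytes("UTF-8") // identical bytes on every platform
```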
It sounds like a Spark-side issue. Thanks for providing detailed reproduction steps and your analysis; let me investigate it a bit.
Thanks, but I want to clarify: the ambiguous exception and auto-casting not working are two different issues (limitations no. 3 and 4), and I am addressing the auto-casting issue in the reproduction steps above. The current behavior is that when the input type is not supported, the job fails fast.
```scala
object Arg2 extends Base {
  override def name: String = s"${funcName}_2"
  override def inputTypes: Array[DataType] = Array.fill(2)(StringType)
  def invoke(v1: UTF8String, v2: UTF8String): Long = Seq(v1, v2).map(invokeBase).reduce(combineHashes)
}
```
Instead of implementing `def invoke`, there is another option: implement `default R produceResult(InternalRow input)` instead. This approach may address the limitation 1 you mentioned.
To my understanding, using `produceResult(InternalRow input)` still needs Spark to convert the input to the correct type (e.g. `DateType` -> `int`, and we call `getInt` on the `InternalRow`), and if this conversion is somehow missing (likely, because the error says the corresponding `invoke` is not found), getting data from the `InternalRow` will still be incorrect. On the other hand, if `produceResult` has to handle multiple input types internally, it would still be tricky without knowing them in advance, because we need to call the correct getter on the `InternalRow`.

I believe one solution is to make `requiredDistribution` handle cast expressions on the Spark side, and add a type cast in the Spark ClickHouse connector when calling UDFs.

Please correct me if I misunderstood the procedure.
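For reference, a minimal sketch (hypothetical names, not the connector's actual code) of what the `produceResult` path looks like, assuming Spark has already coerced the argument so the `InternalRow` physically stores a `DateType` value as days since the epoch:

```scala
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.connector.catalog.functions.{BoundFunction, ScalarFunction, UnboundFunction}
import org.apache.spark.sql.types._

object HypotheticalToYYYYMM extends UnboundFunction with ScalarFunction[Int] {
  override def name: String = "hypothetical_to_yyyymm"
  override def description: String = s"$name(date) - sketch of ClickHouse toYYYYMM"

  // Bind only to a single DATE argument; anything else must be cast by Spark
  // beforehand, which is exactly the step under discussion here.
  override def bind(inputType: StructType): BoundFunction = inputType.fields match {
    case Array(StructField(_, DateType, _, _)) => this
    case _ => throw new UnsupportedOperationException("Expects exactly 1 DATE argument")
  }

  override def inputTypes(): Array[DataType] = Array(DateType)
  override def resultType(): DataType = IntegerType

  // DateType is physically an Int (days since epoch) inside InternalRow,
  // so produceResult must call the matching getter.
  override def produceResult(input: InternalRow): Int = {
    val d = java.time.LocalDate.ofEpochDay(input.getInt(0))
    d.getYear * 100 + d.getMonthValue
  }
}
```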
Oops, sorry, I meant the 5-argument limitation:

> Hash functions support a maximum of 5 arguments, limited by Spark's v2 function API not supporting a variable number of inputs.
Is this possible? I thought an `UnboundFunction` needs to bind to a specific data type, and Spark then casts the input when its type does not match the `inputTypes` attribute of the resulting `BoundFunction` (where variable-length input is not supported), before actually calling it. Though it would definitely reduce the 5 `invoke` implementations to 1.
Actually, varargs support is one of the goals of `produceResult`; you may be interested in the original discussion of the `FunctionCatalog` API design in the Spark community: https://www.mail-archive.com/dev@spark.apache.org/msg27274.html
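For illustration, a rough sketch (hypothetical names, under the assumption that the arity is fixed at bind time) of how `produceResult` enables a variable number of string arguments without one `invoke` overload per arity:

```scala
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.connector.catalog.functions.{BoundFunction, ScalarFunction, UnboundFunction}
import org.apache.spark.sql.types.{DataType, LongType, StringType, StructType}

abstract class HypotheticalMultiStringHash extends UnboundFunction { self =>
  protected def hashOne(bytes: Array[Byte]): Long
  protected def combine(h1: Long, h2: Long): Long

  override def description: String = s"${self.name()}(str, ...) - variable-length string hash sketch"

  override def bind(inputType: StructType): BoundFunction = {
    if (inputType.fields.isEmpty || inputType.fields.exists(_.dataType != StringType)) {
      throw new UnsupportedOperationException(s"${self.name()} expects one or more STRING arguments")
    }
    val arity = inputType.fields.length
    new ScalarFunction[Long] {
      override def name(): String = self.name()
      override def inputTypes(): Array[DataType] = Array.fill(arity)(StringType)
      override def resultType(): DataType = LongType
      // Walk the whole row instead of relying on fixed-arity invoke overloads.
      override def produceResult(input: InternalRow): Long =
        (0 until input.numFields)
          .map(i => hashOne(input.getUTF8String(i).getBytes))
          .reduce(combine)
    }
  }
}
```

A concrete `cityHash64`-style implementation would then only need to supply `hashOne` and `combine`.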
Pushed a commit supporting varargs, changing `invoke` to `produceResult`. Please take a look when you have time!

BTW, Spark 3.4.1 was just released; do you mind sending another PR to upgrade it? Once you have a PR merged, your subsequent PRs will not be blocked by "workflows awaiting approval" to run CI.
Iceberg 1.3.0 was released with Spark 3.4 support; master can be upgraded to Spark 3.4.1 and Iceberg 1.3.0. A PR is welcome.
Sure thing.
With the full stacktrace, I think I know what happened on the Spark side. The Cast was applied in codegen mode but failed due to the lack of a zoneId, and it seems the interpreted mode has another issue where it does not apply the Cast at all.
Please check #254.
On a rough thought, the timezone resolution issue could be fixed by:

```diff
diff --git forkSrcPrefix/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DistributionAndOrderingUtils.scala forkDstPrefix/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DistributionAndOrderingUtils.scala
index 9b1155ef6987e76bcb07add3958e5c124ff7e716..202ba4abab22f2465f2d2ca59ff38c146f244d3d 100644
--- forkSrcPrefix/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DistributionAndOrderingUtils.scala
+++ forkDstPrefix/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DistributionAndOrderingUtils.scala
@@ -17,11 +17,11 @@
 package org.apache.spark.sql.execution.datasources.v2
 
-import org.apache.spark.sql.catalyst.analysis.{AnsiTypeCoercion, TypeCoercion}
+import org.apache.spark.sql.catalyst.analysis.{AnsiTypeCoercion, ResolveTimeZone, TypeCoercion}
 import org.apache.spark.sql.catalyst.expressions.{Expression, Literal, SortOrder, TransformExpression, V2ExpressionUtils}
 import org.apache.spark.sql.catalyst.expressions.V2ExpressionUtils._
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, RebalancePartitions, RepartitionByExpression, Sort}
-import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.catalyst.rules.{Rule, RuleExecutor}
 import org.apache.spark.sql.connector.catalog.FunctionCatalog
 import org.apache.spark.sql.connector.catalog.functions.ScalarFunction
 import org.apache.spark.sql.connector.distributions._
@@ -83,13 +83,17 @@ object DistributionAndOrderingUtils {
         queryWithDistribution
       }
 
-      // Apply typeCoercionRules since the converted expression from TransformExpression
-      // implemented ImplicitCastInputTypes
-      typeCoercionRules.foldLeft(queryWithDistributionAndOrdering)((plan, rule) => rule(plan))
+      ResolveTimezoneAndCastExecutor.execute(queryWithDistributionAndOrdering)
     case _ =>
       query
   }
 
+  private object ResolveTimezoneAndCastExecutor extends RuleExecutor[LogicalPlan] {
+    override val batches =
+      Batch("Resolve TypeCoercion", Once, typeCoercionRules: _*) ::
+        Batch("Resolve TimeZone", Once, ResolveTimeZone) :: Nil
+  }
+
   private def resolveTransformExpression(expr: Expression): Expression = expr.transform {
     case TransformExpression(scalarFunc: ScalarFunction[_], arguments, Some(numBuckets)) =>
       V2ExpressionUtils.resolveScalarFunction(scalarFunc, Seq(Literal(numBuckets)) ++ arguments)
```
@Yxang SPARK-44180 (apache/spark#41725) has been opened to address the cast issue (with codegen enabled). Before it gets fixed, you can construct non-timezone-aware cases to test the implicit cast feature.
Thanks for the fast fix! I will investigate other casting cases to check whether this works. Do we wait for the Spark patch to be merged now, or create another patch later?
Spark has quite a long release schedule, and SPARK-44180 only fixes the timezone-aware cast case; it does not block us from doing other work.
Sorry, I'm busy with other things these days. I did find some spare time to play with the code and found some room for improvement; it may take several days to give detailed feedback.
```java
    return Hash128to64(UInt128.of(u, v));
}

private static long HashLen0to16(ByteBuf s, int index, int len) {
```
We can totally avoid `ByteBuf` in this class, as clickhouse-java does.

Also cc @zhicwu: it would be good if we could provide Java versions of the ClickHouse hash functions in the official Java client.
Yes, I think so. Is it the same as in https://github.com/EwainCai/cityHash102-java?
> Yes, I think so. Is it the same as in https://github.com/EwainCai/cityHash102-java?

I need to check, because I made some changes around signed/unsigned integers. The reference implementation I used is linked at the top of this file. I will check it when I get to my PC, or you might kindly check it if you have time.
It should be the same as https://github.com/EwainCai/cityHash102-java, except for `ByteBuf` vs. `byte[]`.
> We can totally avoid `ByteBuf` in this class, as clickhouse-java does. Also cc @zhicwu, it would be good if we could provide Java versions of the ClickHouse hash functions in the official Java client.

One advantage of using `ByteBuf`, though, is that if in the future we want to support types beyond UTF-8 strings, we can handle the conversion between the target type and raw byte arrays with e.g. `ByteBuf.writeDoubleLE`.
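For example, a minimal sketch (assuming Netty 4.1's `ByteBuf` API, with a hypothetical helper name) of encoding a Float64 value into little-endian bytes before hashing, which is the byte order ClickHouse uses for numeric values:

```scala
import io.netty.buffer.Unpooled

// Hypothetical helper: serialize a Float64 as little-endian bytes so that a
// byte-oriented hash sees the same layout ClickHouse hashes on its side.
def float64LeBytes(value: Double): Array[Byte] = {
  val buf = Unpooled.buffer(java.lang.Double.BYTES)
  buf.writeDoubleLE(value)
  val out = new Array[Byte](buf.readableBytes())
  buf.readBytes(out)
  buf.release()
  out
}
```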
Thanks, and also sorry for being busy with other work. About the implicit type conversion problem, though I do not fully understand it yet, I have some updates:

Also, I used
Pushed commits fixing the above.
That's right. By default, Spark prefers codegen mode to execute expressions and falls back to interpreted mode on failure. SPARK-44180 fixed the codegen mode; I didn't look into what happens in interpreted mode, but I think it has another issue. I will find time to take a look. TBH, interpreted mode is rarely used in production.
I think we can split the non-Spark part out of this PR and merge it first, specifically:
Force-pushed from f4ae4ad to bb594cb
Oops, I messed up the diffs; pushed a fix. Opened PR #261 for the hash part mentioned in the last comment. I will adjust the code in this PR once that is merged. Please kindly review it.
@Yxang No worries. Thanks for your excellent work.
What does this PR do?
This PR introduces support for writing to ClickHouse tables with functions in distribute/partition expressions, enabled by Spark 3.4's merge of SPARK-39607; it requires implementing the ClickHouse functions as corresponding Spark ones.
The implemented functions in this PR include:

- `toYear`, `toYYYYMM`, and `toYYYYMMDD` (with only Spark `date` input), and `toHour` (with only Spark `timestamp` or `timestamp_ntz` input); a `Transform` like `YearsTransform` is not evaluable.
- `murmurHash3_64`, `murmurHash3_32`, `murmurHash2_64`, `murmurHash2_32`, and `cityHash64`, with a variable number of `string` arguments.

Also, the existing v2 function for `xxHash64` is available.

Corresponding tests are also included.
Limitations

The main limitation is that the Spark functions cannot fully match the ClickHouse functions' behavior, including:

- Hash functions support a maximum of 5 arguments, limited by Spark's v2 function API not supporting a variable number of inputs.
- Hash functions support only the `string` data type, due to the complexity of aligning Spark's and ClickHouse's implementations of hash functions and type casting.
- The error `org.apache.spark.sql.AnalysisException: some_function(arg) is not currently supported` is ambiguous and misleading.
- Time functions support only one input type (see the auto type casting discussion above).
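As a usage sketch of the feature described above (the catalog and table names reuse the reproduction steps earlier in this thread; the DataFrame contents are made up), a v2 append to the Distributed table lets Spark resolve the `toYYYYMM(create_date)` sharding expression to the connector's registered function and repartition accordingly:

```scala
import org.apache.spark.sql.functions._

// Made-up sample rows matching test.test_month's schema.
val df = spark.range(4)
  .withColumn("create_time", current_timestamp())
  .withColumn("create_date", current_date())
  .withColumn("value", col("id").cast("string"))
  .drop("id")

// With this PR, the required distribution derived from the Distributed
// table's sharding expression is honored before the write.
df.writeTo("clickhouse_s1r1.test.test_month").append()
```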