
JsonParseException when calling createOrReplace from PySpark #179

Closed
sketchmind opened this issue Aug 5, 2022 · 17 comments


sketchmind (Contributor) commented Aug 5, 2022

Env:

  • PySpark 3.2.0
  • clickhouse-spark-runtime-3.2_2.12-0.5.0-SNAPSHOT.jar
  • clickhouse-jdbc-0.3.2-patch11-all.jar

Code:

from typing import Dict, List, Optional

from pyspark.sql.dataframe import DataFrame
from pyspark.sql.types import StructType

from common.sessions import spark  # project-local helper exposing the SparkSession


def set_nullable(ori: DataFrame, cols: Optional[List[str]] = None, nullable: bool = False,
                 case_sensitive: bool = False) -> DataFrame:
    if cols is None:
        cols = ori.columns
    if not case_sensitive:
        cols = [col.lower() for col in cols]
    fields = []
    for x in ori.schema:
        # Parentheses are required: without them Python parses this as
        # `x.name if case_sensitive else (x.name.lower() in cols)`, so the
        # condition is always truthy when case_sensitive is True.
        if (x.name if case_sensitive else x.name.lower()) in cols:
            x.nullable = nullable
        fields.append(x)
    schema = StructType(fields)
    return spark.createDataFrame(ori.rdd, schema)


def set_comment(ori: DataFrame, comments: Dict[str, str], case_sensitive: bool = False) -> DataFrame:
    fields = []
    if not case_sensitive:
        comments = {k.lower(): v for k, v in comments.items()}
    for x in ori.schema:
        c = comments.get(x.name if case_sensitive else x.name.lower())
        if c is not None:
            x.metadata.update(comment=c)
        fields.append(x)
    schema = StructType(fields)
    return spark.createDataFrame(ori.rdd, schema)


demo = spark.sql(
    "SELECT * "
    "FROM VALUES (1, 'rice',0.8, 'can eat'), (2, 'vegetable', 1.2, NULL) AS tab(id, food, price, remark)")

df = set_nullable(set_comment(demo, {"food": "food", "price": "price usd"}), ["remark"], True)
df.show()
df.printSchema()

df.writeTo("clickhouse.app.test_schema_nullable") \
    .tableProperty("engine", "MergeTree()") \
    .tableProperty("order_by", "(food,)") \
    .tableProperty("settings.index_granularity", "8192") \
    .createOrReplace()

Console:

py4j.protocol.Py4JJavaError: An error occurred while calling o214.createOrReplace.
: xenon.relocated.com.fasterxml.jackson.core.JsonParseException: Unexpected character (':' (code 58)): expected a valid value (JSON String, Number, Array, Object or token 'null', 'true' or 'false')
 at [Source: (com.clickhouse.client.stream.WrappedInputStream); line: 1, column: 2]
	at xenon.relocated.com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:2391)
	at xenon.relocated.com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:735)
	at xenon.relocated.com.fasterxml.jackson.core.base.ParserMinimalBase._reportUnexpectedChar(ParserMinimalBase.java:659)
	at xenon.relocated.com.fasterxml.jackson.core.json.UTF8StreamJsonParser._handleUnexpectedValue(UTF8StreamJsonParser.java:2737)
	at xenon.relocated.com.fasterxml.jackson.core.json.UTF8StreamJsonParser._nextTokenNotInObject(UTF8StreamJsonParser.java:902)
	at xenon.relocated.com.fasterxml.jackson.core.json.UTF8StreamJsonParser.nextToken(UTF8StreamJsonParser.java:794)
	at xenon.relocated.com.fasterxml.jackson.databind.ObjectReader._bindAndReadValues(ObjectReader.java:2152)
	at xenon.relocated.com.fasterxml.jackson.databind.ObjectReader.readValues(ObjectReader.java:1861)
	at xenon.clickhouse.format.JSONEachRowSimpleOutput$.deserialize(JSONOutputFormat.scala:34)
	at xenon.clickhouse.client.NodeClient.$anonfun$syncQueryAndCheckOutputJSONEachRow$1(NodeClient.scala:68)
	at xenon.clickhouse.client.NodeClient.syncQuery(NodeClient.scala:138)
	at xenon.clickhouse.client.NodeClient.syncQueryAndCheck(NodeClient.scala:149)
	at xenon.clickhouse.client.NodeClient.syncQueryAndCheckOutputJSONEachRow(NodeClient.scala:68)
	at xenon.clickhouse.ClickHouseCatalog.initialize(ClickHouseCatalog.scala:69)
	at org.apache.spark.sql.connector.catalog.Catalogs$.load(Catalogs.scala:60)
	at org.apache.spark.sql.connector.catalog.CatalogManager.$anonfun$catalog$1(CatalogManager.scala:52)
	at scala.collection.mutable.HashMap.getOrElseUpdate(HashMap.scala:86)
	at org.apache.spark.sql.connector.catalog.CatalogManager.catalog(CatalogManager.scala:52)
	at org.apache.spark.sql.connector.catalog.LookupCatalog$CatalogAndIdentifier$.unapply(LookupCatalog.scala:123)
	at org.apache.spark.sql.connector.catalog.LookupCatalog$NonSessionCatalogAndIdentifier$.unapply(LookupCatalog.scala:73)
	at org.apache.spark.sql.catalyst.analysis.ResolveCatalogs$NonSessionCatalogAndTable$.unapply(ResolveCatalogs.scala:99)
	at org.apache.spark.sql.catalyst.analysis.ResolveCatalogs$$anonfun$apply$1.applyOrElse(ResolveCatalogs.scala:69)
	at org.apache.spark.sql.catalyst.analysis.ResolveCatalogs$$anonfun$apply$1.applyOrElse(ResolveCatalogs.scala:33)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsDownWithPruning$2(AnalysisHelper.scala:170)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:82)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsDownWithPruning$1(AnalysisHelper.scala:170)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.allowInvokingTransformsInAnalyzer(AnalysisHelper.scala:323)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsDownWithPruning(AnalysisHelper.scala:168)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsDownWithPruning$(AnalysisHelper.scala:164)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperatorsDownWithPruning(LogicalPlan.scala:30)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsWithPruning(AnalysisHelper.scala:99)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsWithPruning$(AnalysisHelper.scala:96)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperatorsWithPruning(LogicalPlan.scala:30)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperators(AnalysisHelper.scala:76)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperators$(AnalysisHelper.scala:75)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperators(LogicalPlan.scala:30)
	at org.apache.spark.sql.catalyst.analysis.ResolveCatalogs.apply(ResolveCatalogs.scala:33)
	at org.apache.spark.sql.catalyst.analysis.ResolveCatalogs.apply(ResolveCatalogs.scala:28)
	at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$2(RuleExecutor.scala:211)
	at scala.collection.LinearSeqOptimized.foldLeft(LinearSeqOptimized.scala:126)
	at scala.collection.LinearSeqOptimized.foldLeft$(LinearSeqOptimized.scala:122)
	at scala.collection.immutable.List.foldLeft(List.scala:91)
	at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1(RuleExecutor.scala:208)
	at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1$adapted(RuleExecutor.scala:200)
	at scala.collection.immutable.List.foreach(List.scala:431)
	at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:200)
	at org.apache.spark.sql.catalyst.analysis.Analyzer.org$apache$spark$sql$catalyst$analysis$Analyzer$$executeSameContext(Analyzer.scala:215)
	at org.apache.spark.sql.catalyst.analysis.Analyzer.execute(Analyzer.scala:209)
	at org.apache.spark.sql.catalyst.analysis.Analyzer.execute(Analyzer.scala:172)
	at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$executeAndTrack$1(RuleExecutor.scala:179)
	at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:88)
	at org.apache.spark.sql.catalyst.rules.RuleExecutor.executeAndTrack(RuleExecutor.scala:179)
	at org.apache.spark.sql.catalyst.analysis.Analyzer.$anonfun$executeAndCheck$1(Analyzer.scala:193)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.markInAnalyzer(AnalysisHelper.scala:330)
	at org.apache.spark.sql.catalyst.analysis.Analyzer.executeAndCheck(Analyzer.scala:192)
	at org.apache.spark.sql.execution.QueryExecution.$anonfun$analyzed$1(QueryExecution.scala:88)
	at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:111)
	at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:196)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
	at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:196)
	at org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:88)
	at org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:86)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:93)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:91)
	at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:128)
	at org.apache.spark.sql.DataFrameWriterV2.runCommand(DataFrameWriterV2.scala:194)
	at org.apache.spark.sql.DataFrameWriterV2.internalReplace(DataFrameWriterV2.scala:211)
	at org.apache.spark.sql.DataFrameWriterV2.createOrReplace(DataFrameWriterV2.scala:132)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.lang.Thread.run(Thread.java:748)
pan3793 (Collaborator) commented Aug 6, 2022

Cannot reproduce.

I'm not familiar w/ Python, so I translated and simplified your test case to a Scala version. No error occurs.

  test("create or replace table 2") {
    def set_comment(ori: DataFrame, comments: Map[String, String]): DataFrame = {
      val newSchema = ori.schema.map { field =>
        comments.get(field.name).map(comment => field.withComment(comment)).getOrElse(field)
      }
      spark.createDataFrame(ori.rdd, StructType(newSchema))
    }

    val demo = spark.sql(
      """SELECT * FROM VALUES 
        |  (1, 'rice',      0.8, 'can eat'),
        |  (2, 'vegetable', 1.2, NULL     ) AS tab(id, food, price, remark)
        |""".stripMargin
    )
    val df = set_comment(demo, Map("food" -> "food", "price" -> "price usd"))
    spark.sql("create database app")
    df.writeTo("clickhouse.app.test_schema_nullable")
      .tableProperty("engine", "MergeTree()")
      .tableProperty("order_by", "(food,)")
      .tableProperty("settings.index_granularity", "8192")
      .tableProperty("settings.allow_nullable_key", "1")  // it's known issue
      .createOrReplace()
  }

sketchmind (Contributor, Author) commented:

> Cannot reproduce. I'm not familiar w/ Python, so I translated and simplified your test case to a Scala version. No error occurs.

  test("create or replace table 2") {
    def set_comment(ori: DataFrame, comments: Map[String, String]): DataFrame = {
      val newSchema = ori.schema.map { field =>
        comments.get(field.name).map(comment => field.withComment(comment)).getOrElse(field)
      }
      spark.createDataFrame(ori.rdd, StructType(newSchema))
    }

    val demo = spark.sql(
      """SELECT * FROM VALUES 
        |  (1, 'rice',      0.8, 'can eat'),
        |  (2, 'vegetable', 1.2, NULL     ) AS tab(id, food, price, remark)
        |""".stripMargin
    )
    val df = set_comment(demo, Map("food"-> "food", "price" -> "price usd") )
    spark.sql("create database app")
    df.writeTo("clickhouse.app.test_schema_nullable")
      .tableProperty("engine", "MergeTree()")
      .tableProperty("order_by", "(food,)")
      .tableProperty("settings.index_granularity", "8192")
      .tableProperty("settings.allow_nullable_key", "1")  // it's known issue
      .createOrReplace()
  }

I don't have this problem with 0.4.0, but the 0.5.0-SNAPSHOT fetched from the snapshot repositories (rather than compiled by me) also throws the error.

pan3793 (Collaborator) commented Aug 8, 2022

Tested locally w/ Spark 3.2.1 and Spark 3.3.0; everything works.

camper42 (Contributor) commented:

@sketchmind what is your ClickHouse server version?

sketchmind (Contributor, Author) commented:

> @sketchmind what is your ClickHouse server version?

21.3.7.62

camper42 (Contributor) commented:

Not the same as ours, so I can't do a quick test.

BTW, after #177, @pan3793 aligned the Jackson version instead of creating a shaded jar.

xenon.relocated.com.fasterxml.jackson suggests the jar comes from a snapshot built before #177 and after #153?

Could you try the 0.5.0 stable release?

sketchmind (Contributor, Author) commented:

> Not the same as ours, so I can't do a quick test. [...] Could you try the 0.5.0 stable release?

Thanks, let me test it.

sketchmind (Contributor, Author) commented:

> Could you try the 0.5.0 stable release?

0.5.0 stable still has this problem. I will set up a Scala environment soon to debug it.

gfunc commented Aug 22, 2022

I experienced the same error after upgrading to the 0.5.0 release and configuring Spark SQL to use the gRPC port (9100), but things worked when I switched to the HTTP port (8123).

Jars used:

  • clickhouse-jdbc-0.3.2-patch11-all.jar
  • clickhouse-native-jdbc-2.6.5.jar
  • clickhouse-spark-runtime-3.3_2.12-0.5.0.jar

Setup:

  • ClickHouse version 21.10.3.9
  • Spark 3.3

Error message:

22/08/22 06:37:40 ERROR SparkSQLDriver: Failed in [use clickhouse]
com.fasterxml.jackson.core.JsonParseException: Unexpected character (':' (code 58)): expected a valid value (JSON String, Number, Array, Object or token 'null', 'true' or 'false')
 at [Source: (com.clickhouse.client.stream.WrappedInputStream); line: 1, column: 2]
        at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:2391)
        at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:735)
        at com.fasterxml.jackson.core.base.ParserMinimalBase._reportUnexpectedChar(ParserMinimalBase.java:659)
        at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._handleUnexpectedValue(UTF8StreamJsonParser.java:2737)
        at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._nextTokenNotInObject(UTF8StreamJsonParser.java:902)
        at com.fasterxml.jackson.core.json.UTF8StreamJsonParser.nextToken(UTF8StreamJsonParser.java:794)
        at com.fasterxml.jackson.databind.ObjectReader._bindAndReadValues(ObjectReader.java:2152)
        at com.fasterxml.jackson.databind.ObjectReader.readValues(ObjectReader.java:1861)
        at xenon.clickhouse.format.JSONEachRowSimpleOutput$.deserialize(JSONOutputFormat.scala:34)
        at xenon.clickhouse.client.NodeClient.$anonfun$syncQueryAndCheckOutputJSONEachRow$1(NodeClient.scala:67)
        at xenon.clickhouse.client.NodeClient.syncQuery(NodeClient.scala:138)
        at xenon.clickhouse.client.NodeClient.syncQueryAndCheck(NodeClient.scala:150)
        at xenon.clickhouse.client.NodeClient.syncQueryAndCheckOutputJSONEachRow(NodeClient.scala:67)
        at xenon.clickhouse.ClickHouseCatalog.initialize(ClickHouseCatalog.scala:69)
        at org.apache.spark.sql.connector.catalog.Catalogs$.load(Catalogs.scala:60)
        at org.apache.spark.sql.connector.catalog.CatalogManager.$anonfun$catalog$1(CatalogManager.scala:53)
        at scala.collection.mutable.HashMap.getOrElseUpdate(HashMap.scala:86)
        at org.apache.spark.sql.connector.catalog.CatalogManager.catalog(CatalogManager.scala:53)
        at org.apache.spark.sql.connector.catalog.LookupCatalog$CatalogAndNamespace$.unapply(LookupCatalog.scala:86)
        at org.apache.spark.sql.catalyst.analysis.ResolveCatalogs$$anonfun$apply$1.applyOrElse(ResolveCatalogs.scala:33)
        at org.apache.spark.sql.catalyst.analysis.ResolveCatalogs$$anonfun$apply$1.applyOrElse(ResolveCatalogs.scala:32)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsDownWithPruning$2(AnalysisHelper.scala:170)
        at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:176)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsDownWithPruning$1(AnalysisHelper.scala:170)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.allowInvokingTransformsInAnalyzer(AnalysisHelper.scala:323)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsDownWithPruning(AnalysisHelper.scala:168)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsDownWithPruning$(AnalysisHelper.scala:164)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperatorsDownWithPruning(LogicalPlan.scala:30)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsDownWithPruning$4(AnalysisHelper.scala:175)
        at org.apache.spark.sql.catalyst.trees.UnaryLike.mapChildren(TreeNode.scala:1228)
        at org.apache.spark.sql.catalyst.trees.UnaryLike.mapChildren$(TreeNode.scala:1227)
        at org.apache.spark.sql.catalyst.plans.logical.SetCatalogAndNamespace.mapChildren(v2Commands.scala:752)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsDownWithPruning$1(AnalysisHelper.scala:175)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.allowInvokingTransformsInAnalyzer(AnalysisHelper.scala:323)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsDownWithPruning(AnalysisHelper.scala:168)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsDownWithPruning$(AnalysisHelper.scala:164)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperatorsDownWithPruning(LogicalPlan.scala:30)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsWithPruning(AnalysisHelper.scala:99)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsWithPruning$(AnalysisHelper.scala:96)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperatorsWithPruning(LogicalPlan.scala:30)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperators(AnalysisHelper.scala:76)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperators$(AnalysisHelper.scala:75)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperators(LogicalPlan.scala:30)
        at org.apache.spark.sql.catalyst.analysis.ResolveCatalogs.apply(ResolveCatalogs.scala:32)
        at org.apache.spark.sql.catalyst.analysis.ResolveCatalogs.apply(ResolveCatalogs.scala:28)
        at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$2(RuleExecutor.scala:211)
        at scala.collection.LinearSeqOptimized.foldLeft(LinearSeqOptimized.scala:126)
        at scala.collection.LinearSeqOptimized.foldLeft$(LinearSeqOptimized.scala:122)
        at scala.collection.immutable.List.foldLeft(List.scala:91)
        at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1(RuleExecutor.scala:208)
        at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1$adapted(RuleExecutor.scala:200)
        at scala.collection.immutable.List.foreach(List.scala:431)
        at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:200)
        at org.apache.spark.sql.catalyst.analysis.Analyzer.org$apache$spark$sql$catalyst$analysis$Analyzer$$executeSameContext(Analyzer.scala:227)
        at org.apache.spark.sql.catalyst.analysis.Analyzer.$anonfun$execute$1(Analyzer.scala:223)
        at org.apache.spark.sql.catalyst.analysis.AnalysisContext$.withNewAnalysisContext(Analyzer.scala:172)
        at org.apache.spark.sql.catalyst.analysis.Analyzer.execute(Analyzer.scala:223)
        at org.apache.spark.sql.catalyst.analysis.Analyzer.execute(Analyzer.scala:187)
        at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$executeAndTrack$1(RuleExecutor.scala:179)
        at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:88)
        at org.apache.spark.sql.catalyst.rules.RuleExecutor.executeAndTrack(RuleExecutor.scala:179)
        at org.apache.spark.sql.catalyst.analysis.Analyzer.$anonfun$executeAndCheck$1(Analyzer.scala:208)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.markInAnalyzer(AnalysisHelper.scala:330)
        at org.apache.spark.sql.catalyst.analysis.Analyzer.executeAndCheck(Analyzer.scala:207)
        at org.apache.spark.sql.execution.QueryExecution.$anonfun$analyzed$1(QueryExecution.scala:76)
        at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:111)
        at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$2(QueryExecution.scala:185)
        at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:510)
        at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:185)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
        at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:184)
        at org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:76)
        at org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:74)
        at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:66)
        at org.apache.spark.sql.Dataset$.$anonfun$ofRows$2(Dataset.scala:99)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
        at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:97)
        at org.apache.spark.sql.SparkSession.$anonfun$sql$1(SparkSession.scala:622)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
        at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:617)
        at org.apache.spark.sql.SQLContext.sql(SQLContext.scala:651)
        at org.apache.spark.sql.hive.thriftserver.SparkSQLDriver.run(SparkSQLDriver.scala:67)
        at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.processCmd(SparkSQLCLIDriver.scala:384)
        at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.$anonfun$processLine$1(SparkSQLCLIDriver.scala:504)
        at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.$anonfun$processLine$1$adapted(SparkSQLCLIDriver.scala:498)
        at scala.collection.Iterator.foreach(Iterator.scala:943)
        at scala.collection.Iterator.foreach$(Iterator.scala:943)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
        at scala.collection.IterableLike.foreach(IterableLike.scala:74)
        at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
        at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
        at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.processLine(SparkSQLCLIDriver.scala:498)
        at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver$.main(SparkSQLCLIDriver.scala:286)
        at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.main(SparkSQLCLIDriver.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
        at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:958)
        at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:180)
        at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:203)
        at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:90)
        at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1046)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1055)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

pan3793 (Collaborator) commented Aug 22, 2022

Hi @gfunc, have you set any other configurations?

pan3793 (Collaborator) commented Aug 22, 2022

Please disable compression when you are using gRPC w/ an old (pre-22.3) ClickHouse and see what happens:

spark.clickhouse.write.compression.codec=none
spark.clickhouse.read.compression.codec=none
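
In PySpark, these settings would typically go on the session config. A minimal sketch, assuming the catalog is registered under the name clickhouse with the catalog class used elsewhere in this thread (everything besides the two compression settings is illustrative):

from pyspark.sql import SparkSession

# Sketch only: the two compression settings come from the suggestion above;
# the catalog class name matches the one used elsewhere in this thread.
spark = (
    SparkSession.builder
    .config("spark.sql.catalog.clickhouse", "xenon.clickhouse.ClickHouseCatalog")
    .config("spark.clickhouse.write.compression.codec", "none")
    .config("spark.clickhouse.read.compression.codec", "none")
    .getOrCreate()
)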

sketchmind (Contributor, Author) commented:

> Please disable compression when you are using gRPC w/ an old (pre-22.3) ClickHouse and see what happens:
>
> spark.clickhouse.write.compression.codec=none
> spark.clickhouse.read.compression.codec=none

I had already set these options; still the same error.

pan3793 (Collaborator) commented Aug 22, 2022

I just merged #183, which added ClickHouse 21.3, 21.8, 22.3, and 22.8 to CI for testing. It would be good if you could tune the built-in test cases to reproduce the error in CI.

gfunc commented Aug 23, 2022

> Please disable compression when you are using gRPC w/ an old (pre-22.3) ClickHouse and see what happens.

The same error occurs with the configuration below:

kubectl exec -ti -n ${namespace} ${spark_cluster}-master-0 -- spark-sql \
		--master spark://${spark_cluster}-master-svc:7077 \
		--conf spark.sql.catalog.clickhouse=xenon.clickhouse.ClickHouseCatalog \
		--conf spark.sql.catalog.clickhouse.host=clickhouse-prod.infra.svc.cluster.local \
		--conf spark.sql.catalog.clickhouse.protocol=grpc \
		--conf spark.sql.catalog.clickhouse.grpc_port=9100 \
		--conf spark.clickhouse.write.compression.codec=none \
		--conf spark.clickhouse.read.compression.codec=none \
		--conf spark.sql.catalog.clickhouse.user=${CH_TEST_USER} \
		--conf spark.sql.catalog.clickhouse.password=${CH_TEST_USER_PW} \
		--conf spark.sql.catalog.clickhouse.database=default
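
For comparison, a minimal sketch of an HTTP-based setup along the lines of what reportedly worked (the protocol=http and http_port option names are assumptions, inferred by symmetry with the gRPC options above; 8123 is ClickHouse's default HTTP port):

from pyspark.sql import SparkSession

# Hypothetical HTTP variant of the catalog config above; the http_port and
# protocol=http option names are assumed here, not confirmed by this thread.
spark = (
    SparkSession.builder
    .config("spark.sql.catalog.clickhouse", "xenon.clickhouse.ClickHouseCatalog")
    .config("spark.sql.catalog.clickhouse.host", "clickhouse-prod.infra.svc.cluster.local")
    .config("spark.sql.catalog.clickhouse.protocol", "http")
    .config("spark.sql.catalog.clickhouse.http_port", "8123")
    .config("spark.sql.catalog.clickhouse.database", "default")
    .getOrCreate()
)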

camper42 (Contributor) commented:

I notice that we run our tests against the HTTP protocol only, @pan3793.

We write to the ClickHouse catalog over HTTP no matter whether grpcEnabled is set.

camper42 (Contributor) commented:

https://github.com/ClickHouse/clickhouse-java#features

> ⚠️ experimental, works with 22.3+, known to has issue with lz4 compression and may cause high memory usage on server

mzitnik (Collaborator) commented Jun 30, 2024

gRPC will not be supported in the next version of the Java client. We have started deprecating the package.

mzitnik closed this as completed Jun 30, 2024