
[Bug] [KafkaTableStream] Kafka Source #2105

Closed
immustard opened this issue Jul 1, 2022 · 9 comments

@immustard
Contributor

Search before asking

  • I had searched in the issues and found no similar issues.

What happened

The source is KafkaTableStream and the schema follows the documented example, but the Flink job fails with: com.fasterxml.jackson.databind.node.ArrayNode cannot be cast to com.fasterxml.jackson.databind.node.ObjectNode

SeaTunnel Version

v2.1.2

SeaTunnel Config

env {
  execution.parallelism=1
}

source {
  KafkaTableStream {
      consumer.bootstrap.servers = "192.168.10.102:9092"
      consumer.group.id = "seatunnel-learn"
      topics = test_csv
      result_table_name = test
      format.type = csv
      schema = "[{\"field\":\"name\",\"type\":\"string\"},{\"field\":\"age\",\"type\":\"int\"}]"
      format.field-delimiter = ";"
      format.allow-comments = "true"
      format.ignore-parse-errors = "true"
  }
}

transform {
  sql {
    sql = "select name, age from test where age > 18"
  }
}

sink {
  Kafka {
    topics = "test_sink"
    producer.bootstrap.server = "192.168.10.102:9092"
  }
}

Running Command

./bin/start-seatunnel-flink.sh --config ./config/kafkaETL.conf

Error Exception

Exception in thread "main" java.lang.RuntimeException: Execute Flink task error
	at org.apache.seatunnel.core.flink.command.FlinkTaskExecuteCommand.execute(FlinkTaskExecuteCommand.java:84)
	at org.apache.seatunnel.core.base.Seatunnel.run(Seatunnel.java:39)
	at org.apache.seatunnel.example.flink.LocalFlinkExample.main(LocalFlinkExample.java:40)
Caused by: java.lang.RuntimeException: String json deserialization exception.
	at org.apache.seatunnel.common.utils.JsonUtils.parseObject(JsonUtils.java:242)
	at org.apache.seatunnel.flink.kafka.source.KafkaTableStream.prepare(KafkaTableStream.java:114)
	at org.apache.seatunnel.flink.kafka.source.KafkaTableStream.prepare(KafkaTableStream.java:51)
	at org.apache.seatunnel.core.base.command.BaseTaskExecuteCommand.lambda$prepare$0(BaseTaskExecuteCommand.java:67)
	at java.util.ArrayList.forEach(ArrayList.java:1259)
	at org.apache.seatunnel.core.base.command.BaseTaskExecuteCommand.prepare(BaseTaskExecuteCommand.java:67)
	at org.apache.seatunnel.core.flink.command.FlinkTaskExecuteCommand.execute(FlinkTaskExecuteCommand.java:80)
	... 2 more
Caused by: java.lang.ClassCastException: com.fasterxml.jackson.databind.node.ArrayNode cannot be cast to com.fasterxml.jackson.databind.node.ObjectNode
	at org.apache.seatunnel.common.utils.JsonUtils.parseObject(JsonUtils.java:239)
	... 8 more
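
For context, here is a minimal Jackson-only sketch (not part of the original report) that reproduces the same cast failure: readTree on the array-shaped schema string from the config above returns an ArrayNode, and casting it to ObjectNode, which per the stack trace is what JsonUtils.parseObject attempts, throws the reported ClassCastException.

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;

public class SchemaCastRepro {
    public static void main(String[] args) throws Exception {
        ObjectMapper mapper = new ObjectMapper();
        // The schema from the config above is a JSON array, not a JSON object.
        String schema = "[{\"field\":\"name\",\"type\":\"string\"},{\"field\":\"age\",\"type\":\"int\"}]";
        // readTree returns an ArrayNode for this input, so the cast below throws
        // the same ClassCastException shown in the stack trace.
        ObjectNode node = (ObjectNode) mapper.readTree(schema);
        System.out.println(node);
    }
}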

Flink or Spark Version

Flink 1.13.6

Java or Scala Version

Java 1.8
Scala 2.12

Screenshots

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

  • I agree to follow this project's Code of Conduct

@immustard immustard added the bug label Jul 1, 2022
@san346596324

san346596324 commented Jul 5, 2022

+1, no problem in 2.1.1

@immustard
Contributor Author

> +1, no problem in 2.1.1

Yep, it works fine in v2.1.1.

@Hisoka-X
Member

Hisoka-X commented Jul 6, 2022

Got it. The reason is that we replaced fastjson with Jackson, so if your schema is a JSON list instead of an object, the conversion throws an exception. I will fix it. Thanks for the report.
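
As an illustration of the direction such a fix could take (a sketch added here for clarity, not the actual change in the linked PR), the parsed JsonNode can be inspected so that an array-shaped schema is iterated instead of being cast to ObjectNode. The parseListSchema helper below is hypothetical.

import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

public class SchemaParseSketch {
    private static final ObjectMapper MAPPER = new ObjectMapper();

    // Hypothetical helper: accept an array-shaped schema without casting to ObjectNode.
    static List<Map<String, String>> parseListSchema(String json) throws Exception {
        JsonNode root = MAPPER.readTree(json);
        List<Map<String, String>> fields = new ArrayList<>();
        if (root.isArray()) {
            // Each element is an object like {"field":"name","type":"string"}.
            for (JsonNode element : root) {
                Map<String, String> field = new LinkedHashMap<>();
                field.put("field", element.get("field").asText());
                field.put("type", element.get("type").asText());
                fields.add(field);
            }
        }
        return fields;
    }

    public static void main(String[] args) throws Exception {
        String schema = "[{\"field\":\"name\",\"type\":\"string\"},{\"field\":\"age\",\"type\":\"int\"}]";
        // Prints: [{field=name, type=string}, {field=age, type=int}]
        System.out.println(parseListSchema(schema));
    }
}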

@Hisoka-X
Member

Hisoka-X commented Jul 6, 2022

Did you try using a list schema in 2.1.1? Does it work fine there?

@immustard
Contributor Author

> Did you try using a list schema in 2.1.1? Does it work fine there?

Yes, it works fine in 2.1.1.

@VictorZeng

> Got it. The reason is that we replaced fastjson with Jackson, so if your schema is a JSON list instead of an object, the conversion throws an exception. I will fix it. Thanks for the report.

Hi, why replace fastjson with Jackson?

@Hisoka-X
Member

> Got it. The reason is that we replaced fastjson with Jackson, so if your schema is a JSON list instead of an object, the conversion throws an exception. I will fix it. Thanks for the report.

> Hi, why replace fastjson with Jackson?

Several different JSON frameworks were used in the project before. We chose Jackson to unify them.

@Hisoka-X
Member

Closed by #2168

@immustard
Contributor Author

immustard commented Oct 11, 2022 via email
