
[Feature][KafkaSource] Add customizable row separator. #4494

Closed
wants to merge 10 commits

Conversation

lightzhao
Contributor

Purpose of this pull request

Add a customizable row separator: if a Kafka message contains multiple rows of data, the configured row separator is used to split it into individual rows.
Config:

env {
  # You can set flink configuration here
  execution.parallelism = 1
  job.mode = "STREAMING"
  execution.planner = blink
  job.name = "kafka_hive_row_delimiter_test"
  execution.checkpoint.interval = 60000
}

source {
  Kafka {
    result_table_name = "kafka_table"
    schema = {
      fields {
        c1 = "string"
        c2 = "string"
        c3 = "string"
      }
    }
    format = text
    field_delimiter = ","
    topic = "test_topic_row_delimiter"
    bootstrap.servers = "kafkacluster:9092"
    kafka.max.poll.records = 500
    row_delimiter = "\\n"
 }
}

transform {
  sql {
    sql = "select c1,c2,c3,CAST(DATE_FORMAT(CAST(NOW() AS VARCHAR),'yyyyMMdd') as VARCHAR) as dt from kafka_table"
  }
}

sink {
  Console{}
  Hive {
    table_name = "db_test1.tmp_test03"
    metastore_uri = "thrift://hive:9083"
    partition_dir_expression = "${v0}"
  }
}
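As a rough illustration of what the `row_delimiter` option in the config above is meant to do, here is a minimal standalone sketch (the class and method names are invented for illustration, not SeaTunnel's actual API): when a delimiter is configured, the raw Kafka message value is split into rows before deserialization; when it is not, the whole value stays one row.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical sketch of the reader-side split, not SeaTunnel's actual code.
public class RowDelimiterSketch {

    // Split one Kafka message value into rows; with no delimiter configured,
    // the whole value is treated as a single row (the previous behavior).
    public static List<String> splitMessage(String value, String rowDelimiter) {
        if (rowDelimiter == null || rowDelimiter.isEmpty()) {
            return Collections.singletonList(value);
        }
        return Arrays.asList(value.split(rowDelimiter));
    }

    public static void main(String[] args) {
        // One message carrying two rows of text data separated by "\n".
        List<String> rows = splitMessage("a,1,x\nb,2,y", "\n");
        System.out.println(rows); // [a,1,x, b,2,y]
    }
}
```

Each resulting row would then be handed to the text deserializer and split again on `field_delimiter` (`,` in the config above).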

Test screenshot

Check list

@lightzhao
Contributor Author

Please approve the CI check.

@lightzhao
Contributor Author

@Hisoka-X @TyrantLucifer please approve the CI check.

@lightzhao
Contributor Author

@Hisoka-X @TyrantLucifer @hailin0 PTAL.


@lightzhao
Contributor Author

Please approve CI, thanks.

lightzhao added 2 commits May 25, 2023 09:13
# Conflicts:
#	seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceReader.java
…seatunnel into kafka-row-delimiter

# Conflicts:
#	seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceReader.java
@lightzhao
Contributor Author

Please approve CI.

@lightzhao
Contributor Author

@Hisoka-X @EricJoy2048 @TyrantLucifer please approve CI.

@lightzhao
Contributor Author

@Hisoka-X @EricJoy2048 @TyrantLucifer please approve CI.

lightzhao added 3 commits June 21, 2023 14:26
# Conflicts:
#	docs/en/connector-v2/source/kafka.md
@Hisoka-X
Member

Sorry for the late response. Let me check now!

@@ -150,8 +160,27 @@ public void pollNext(Collector<SeaTunnelRow> output) throws Exception {
                recordList) {

            try {
                deserializationSchema.deserialize(record.value(), output);
                if (StringUtils.isBlank(rowDelimiter)) {
Member

If a JSON message contains the row_delimiter value, such as \n:

{"key":"value",
"key2":"value2"
}

the split will produce broken fragments which can't be parsed back into a valid JSON string.

So I believe this feature only works correctly when the format is text.

Why not put this feature into TextDeserializationSchema? That way other connectors could get this feature too. cc @TyrantLucifer
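The concern described above is easy to reproduce. The standalone sketch below (illustrative only, not SeaTunnel code) splits a JSON message that happens to contain the \n delimiter and shows that none of the resulting fragments is a complete JSON object:

```java
// Standalone demo: splitting a multi-line JSON message on "\n" breaks it.
public class JsonSplitProblem {

    public static String[] splitJson() {
        // One logical JSON record whose body contains the "\n" delimiter.
        String json = "{\"key\":\"value\",\n\"key2\":\"value2\"\n}";
        return json.split("\n");
    }

    public static void main(String[] args) {
        for (String fragment : splitJson()) {
            // Each fragment is only part of the object, e.g. {"key":"value",
            // so a JSON deserializer would reject every one of them.
            System.out.println(fragment);
        }
    }
}
```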

Contributor Author

You are right. The premise is that the message content must not contain the newline symbol itself; otherwise it will be parsed incorrectly. In fact, the same problem can occur with the text format.

Contributor

I also believe it's best to add this feature to TextDeserializationSchema.
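A rough sketch of that suggestion (a hypothetical class for illustration; SeaTunnel's real TextDeserializationSchema has a different API) would perform the row split inside the text deserializer itself, so every connector using format = text would inherit the behavior:

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical text deserializer that understands a row delimiter.
public class TextRowSplitSchema {
    private final String rowDelimiter;
    private final String fieldDelimiter;

    public TextRowSplitSchema(String rowDelimiter, String fieldDelimiter) {
        this.rowDelimiter = rowDelimiter;
        this.fieldDelimiter = fieldDelimiter;
    }

    // First split the raw bytes into rows, then each row into fields.
    public List<String[]> deserialize(byte[] message) {
        String text = new String(message, StandardCharsets.UTF_8);
        List<String[]> rows = new ArrayList<>();
        for (String line : text.split(rowDelimiter)) {
            rows.add(line.split(fieldDelimiter));
        }
        return rows;
    }

    public static void main(String[] args) {
        TextRowSplitSchema schema = new TextRowSplitSchema("\n", ",");
        byte[] message = "a,1,x\nb,2,y".getBytes(StandardCharsets.UTF_8);
        List<String[]> rows = schema.deserialize(message);
        System.out.println(rows.size()); // 2
    }
}
```

With this placement the Kafka reader would stay unchanged and simply pass the raw record value through, which addresses the reviewers' concern about the feature living in the connector.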

@lightzhao lightzhao closed this Aug 15, 2023