Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Feature] support avro format #5084

Merged
merged 13 commits into from
Dec 14, 2023
Merged

[Feature] support avro format #5084

merged 13 commits into from
Dec 14, 2023

Conversation

liunaijie
Copy link
Member

  • added avro format and UT
  • kafka support avro format and e2e test

Purpose of this pull request

close #4821
same pr with #5064

Check list

@liunaijie
Copy link
Member Author

pls re-trigger the CI, no failed job, only one cancelled job

@liugddx liugddx closed this Jul 16, 2023
@liugddx liugddx reopened this Jul 16, 2023
@liugddx
Copy link
Member

liugddx commented Jul 17, 2023

Run mvn spotless:apply and fix code style error.

@liunaijie
Copy link
Member Author

can we retrigger the CI as we has fix the code style error

@liugddx liugddx closed this Jul 17, 2023
@liugddx liugddx reopened this Jul 17, 2023
@liunaijie liunaijie force-pushed the avro-format branch 2 times, most recently from d7d8610 to 1a6426b Compare July 20, 2023 06:58
@liunaijie liunaijie changed the title [Feature] support avro format [WIP][Feature] support avro format Jul 21, 2023
@liunaijie
Copy link
Member Author

run successful when disable on flink 1.15 https://github.com/liunaijie/seatunnel/actions/runs/5617980130

@liunaijie liunaijie changed the title [WIP][Feature] support avro format [Feature] support avro format Jul 24, 2023
@liunaijie
Copy link
Member Author

merge the conflict, pls re-trigger the CI

@liunaijie
Copy link
Member Author

https://github.com/liunaijie/seatunnel/actions/runs/5676004114/job/15382134463
this pr run successful in my repo's CI. But get NPE in this CI.
And also can't repet the error in local.

@ic4y
Copy link
Contributor

ic4y commented Aug 5, 2023

@liunaijie Great commit! Now we just need to resolve the conflicts.

@liunaijie liunaijie force-pushed the avro-format branch 2 times, most recently from 7498907 to 64fb875 Compare August 6, 2023 12:44
- added avro format and UT
- kafka support avro format and e2e test

Signed-off-by: Jarvis <liunaijie1996@163.com>
@liunaijie liunaijie force-pushed the avro-format branch 2 times, most recently from 53619ad to cf800c9 Compare August 6, 2023 14:25
# Conflicts:
#	seatunnel-connectors-v2/connector-kafka/pom.xml
#	seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/MessageFormat.java
#	seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSource.java
#	seatunnel-formats/pom.xml
@ic4y
Copy link
Contributor

ic4y commented Aug 14, 2023

Once PRs #5303 and #5222 are merged, they will resolve the e2e testing issue.

@liunaijie
Copy link
Member Author

this pr always failed with PaimonIT, and this pr #5419 has fixed this issue, once this issue merged, pls re-trigger the CI, thanks.

Copy link
Member

@hailin0 hailin0 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@liunaijie
good pr, please fix the conflict

# Conflicts:
#	seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSource.java
@liunaijie
Copy link
Member Author

@liunaijie good pr, please fix the conflict

@hailin0 Done. PTAL, thanks.

Copy link
Contributor

@FuYouJ FuYouJ left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good pr, I learned something new from it

Copy link
Member

@ruanwenjun ruanwenjun left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

# Conflicts:
#	seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/MessageFormat.java
#	seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkFactory.java
#	tools/dependencies/known-dependencies.txt
SEATUNNEL_ROW_TYPE,
MessageFormat.AVRO,
DEFAULT_FIELD_DELIMITER);
generateTestData(row -> serializer.serializeRow(row), 0, 100);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can do this by adding e2e uniformly to All formats are integrated in this
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaFormatIT.java

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hi @zhilinli123, i use a method in KafkaIT and also KafKaIT has other format e2e test like json, text. so i don't want move to KafkaFormatIT.

Copy link
Member

@ruanwenjun ruanwenjun left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ruanwenjun
Copy link
Member

Basically LGTM, wait ci pass

@liunaijie
Copy link
Member Author

Basically LGTM, wait ci pass

@ruanwenjun PTAL

Copy link
Member

@ruanwenjun ruanwenjun left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@hailin0 hailin0 merged commit 93a0061 into apache:dev Dec 14, 2023
6 checks passed
@zhilinli123
Copy link
Contributor

hi I found some issues in kafka avro e2e testing. My configuration is as follows

source {
  Kafka {
    bootstrap.servers = "kafkaCluster:9092"
    topic = "test_avro_topic"
    result_table_name = "kafka_table"
 // When configured this will pull the consumption from kafka's starting offset kafka.auto.offset.reset = "earliest" does not take effect
    **start_mode = "earliest"** 
    format = avro
    format_error_handle_way = skip
    schema = {
      fields {
        id = bigint
        c_map = "map<string, smallint>"
        c_array = "array<tinyint>"
        c_string = string
        c_boolean = boolean
        c_tinyint = tinyint
        c_smallint = smallint
        c_int = int
        c_bigint = bigint
        c_float = float
        c_double = double
        c_decimal = "decimal(2, 1)"
        c_bytes = bytes
        c_date = date
        c_timestamp = timestamp
      }
    }
  }
}

sink {
  Console {
    source_table_name = "kafka_table"
  }
  Assert {
    source_table_name = "kafka_table"
    rules =
      {
        field_rules = [
          {
            field_name = id
            field_type = long
            field_value = [
              {
                rule_type = NOT_NULL
              },
              {
                rule_type = MIN
                rule_value = 0
              },
              {
                rule_type = MAX
                rule_value = 99
              }
            ]
          }
        ]
      }
  }
}

image
image
@liunaijie

alextinng pushed a commit to alextinng/seatunnel that referenced this pull request Dec 19, 2023
chaorongzhi pushed a commit to chaorongzhi/seatunnel that referenced this pull request Aug 21, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

[Feature][Connector]We need avro format for data when using rocketmq or kafka
8 participants