diff --git a/docs/en/connector-v2/sink/Kafka.md b/docs/en/connector-v2/sink/Kafka.md
index 835344126bb..17ff091061a 100644
--- a/docs/en/connector-v2/sink/Kafka.md
+++ b/docs/en/connector-v2/sink/Kafka.md
@@ -30,19 +30,19 @@ They can be downloaded via install-plugin.sh or from the Maven central repositor
## Sink Options
-| Name | Type | Required | Default | Description |
-|----------------------|--------|----------|---------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
-| topic | String | Yes | - | When the table is used as sink, the topic name is the topic to write data to. |
-| bootstrap.servers | String | Yes | - | Comma separated list of Kafka brokers. |
-| kafka.config | Map | No | - | In addition to the above parameters that must be specified by the `Kafka producer` client, the user can also specify multiple non-mandatory parameters for the `producer` client, covering [all the producer parameters specified in the official Kafka document](https://kafka.apache.org/documentation.html#producerconfigs). |
-| semantics | String | No | NON | Semantics that can be chosen EXACTLY_ONCE/AT_LEAST_ONCE/NON, default NON. |
-| partition_key_fields | Array | No | - | Configure which fields are used as the key of the kafka message. |
-| partition | Int | No | - | We can specify the partition, all messages will be sent to this partition. |
-| assign_partitions | Array | No | - | We can decide which partition to send based on the content of the message. The function of this parameter is to distribute information. |
-| transaction_prefix | String | No | - | If semantic is specified as EXACTLY_ONCE, the producer will write all messages in a Kafka transaction,kafka distinguishes different transactions by different transactionId. This parameter is prefix of kafka transactionId, make sure different job use different prefix. |
-| format | String | No | json | Data format. The default format is json. Optional text format, canal-json and debezium-json.If you use json or text format. The default field separator is ", ". If you customize the delimiter, add the "field_delimiter" option.If you use canal format, please refer to [canal-json](../formats/canal-json.md) for details.If you use debezium format, please refer to [debezium-json](../formats/debezium-json.md) for details. |
-| field_delimiter | String | No | , | Customize the field delimiter for data format. |
-| common-options | | No | - | Source plugin common parameters, please refer to [Source Common Options](common-options.md) for details |
+| Name | Type | Required | Default | Description |
+|----------------------|--------|----------|---------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| topic                | String | Yes      | -       | When the table is used as sink, the topic name is the topic to write data to.                                                                                                                                                                                                                                                                                                                                           |
+| bootstrap.servers    | String | Yes      | -       | Comma separated list of Kafka brokers.                                                                                                                                                                                                                                                                                                                                                                                  |
+| kafka.config         | Map    | No       | -       | In addition to the above parameters that must be specified by the `Kafka producer` client, the user can also specify multiple non-mandatory parameters for the `producer` client, covering [all the producer parameters specified in the official Kafka document](https://kafka.apache.org/documentation.html#producerconfigs).                                                                                         |
+| semantics            | String | No       | NON     | The delivery semantics, one of EXACTLY_ONCE, AT_LEAST_ONCE or NON. Default is NON.                                                                                                                                                                                                                                                                                                                                      |
+| partition_key_fields | Array  | No       | -       | Configure which fields are used as the key of the kafka message.                                                                                                                                                                                                                                                                                                                                                        |
+| partition            | Int    | No       | -       | Specify a partition; all messages will be sent to this partition.                                                                                                                                                                                                                                                                                                                                                       |
+| assign_partitions    | Array  | No       | -       | Decide which partition each message is sent to based on its content, distributing messages across partitions.                                                                                                                                                                                                                                                                                                           |
+| transaction_prefix   | String | No       | -       | If semantics is set to EXACTLY_ONCE, the producer writes all messages in a Kafka transaction. Kafka distinguishes transactions by transactionId, and this parameter is the transactionId prefix; make sure different jobs use different prefixes.                                                                                                                                                                       |
+| format               | String | No       | json    | Data format. The default format is json. Optional formats are text, canal-json, debezium-json and avro. For json or text format, the default field delimiter is ", "; to customize the delimiter, set the "field_delimiter" option. For canal format, please refer to [canal-json](../formats/canal-json.md) for details. For debezium format, please refer to [debezium-json](../formats/debezium-json.md) for details. |
+| field_delimiter      | String | No       | ,       | Customize the field delimiter for data format.                                                                                                                                                                                                                                                                                                                                                                          |
+| common-options       |        | No       | -       | Sink plugin common parameters, please refer to [Sink Common Options](common-options.md) for details                                                                                                                                                                                                                                                                                                                     |
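+
+For example, a minimal sink configuration using the new avro format might look like the sketch below (the broker address and topic name are placeholders):
+
+```hocon
+sink {
+  Kafka {
+    bootstrap.servers = "localhost:9092"
+    topic = "test_avro_topic"
+    format = avro
+  }
+}
+```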
## Parameter Interpretation
diff --git a/docs/en/connector-v2/source/kafka.md b/docs/en/connector-v2/source/kafka.md
index 16b9c5420b3..6b69e8e9316 100644
--- a/docs/en/connector-v2/source/kafka.md
+++ b/docs/en/connector-v2/source/kafka.md
@@ -32,23 +32,23 @@ They can be downloaded via install-plugin.sh or from the Maven central repositor
## Source Options
-| Name | Type | Required | Default | Description |
-|-------------------------------------|-----------------------------------------------------------------------------|----------|--------------------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
-| topic | String | Yes | - | Topic name(s) to read data from when the table is used as source. It also supports topic list for source by separating topic by comma like 'topic-1,topic-2'. |
-| bootstrap.servers | String | Yes | - | Comma separated list of Kafka brokers. |
-| pattern | Boolean | No | false | If `pattern` is set to `true`,the regular expression for a pattern of topic names to read from. All topics in clients with names that match the specified regular expression will be subscribed by the consumer. |
-| consumer.group | String | No | SeaTunnel-Consumer-Group | `Kafka consumer group id`, used to distinguish different consumer groups. |
-| commit_on_checkpoint | Boolean | No | true | If true the consumer's offset will be periodically committed in the background. |
-| kafka.config | Map | No | - | In addition to the above necessary parameters that must be specified by the `Kafka consumer` client, users can also specify multiple `consumer` client non-mandatory parameters, covering [all consumer parameters specified in the official Kafka document](https://kafka.apache.org/documentation.html#consumerconfigs). |
-| schema | Config | No | - | The structure of the data, including field names and field types. |
-| format | String | No | json | Data format. The default format is json. Optional text format, canal-json and debezium-json.If you use json or text format. The default field separator is ", ". If you customize the delimiter, add the "field_delimiter" option.If you use canal format, please refer to [canal-json](../formats/canal-json.md) for details.If you use debezium format, please refer to [debezium-json](../formats/debezium-json.md) for details. |
-| format_error_handle_way | String | No | fail | The processing method of data format error. The default value is fail, and the optional value is (fail, skip). When fail is selected, data format error will block and an exception will be thrown. When skip is selected, data format error will skip this line data. |
-| field_delimiter | String | No | , | Customize the field delimiter for data format. |
-| start_mode | StartMode[earliest],[group_offsets],[latest],[specific_offsets],[timestamp] | No | group_offsets | The initial consumption pattern of consumers. |
-| start_mode.offsets | Config | No | - | The offset required for consumption mode to be specific_offsets. |
-| start_mode.timestamp | Long | No | - | The time required for consumption mode to be "timestamp". |
-| partition-discovery.interval-millis | Long | No | -1 | The interval for dynamically discovering topics and partitions. |
-| common-options | | No | - | Source plugin common parameters, please refer to [Source Common Options](common-options.md) for details |
+| Name | Type | Required | Default | Description |
+|-------------------------------------|-----------------------------------------------------------------------------|----------|--------------------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| topic                               | String                                                                       | Yes      | -                        | Topic name(s) to read data from when the table is used as source. It also supports reading from multiple topics by separating them with commas, like 'topic-1,topic-2'.                                                                                                                                                                                                                                                 |
+| bootstrap.servers                   | String                                                                       | Yes      | -                        | Comma separated list of Kafka brokers.                                                                                                                                                                                                                                                                                                                                                                                  |
+| pattern                             | Boolean                                                                      | No       | false                    | If `pattern` is set to `true`, `topic` is treated as a regular expression, and the consumer subscribes to all topics whose names match it.                                                                                                                                                                                                                                                                              |
+| consumer.group                      | String                                                                       | No       | SeaTunnel-Consumer-Group | `Kafka consumer group id`, used to distinguish different consumer groups.                                                                                                                                                                                                                                                                                                                                               |
+| commit_on_checkpoint                | Boolean                                                                      | No       | true                     | If true, the consumer's offset will be periodically committed in the background.                                                                                                                                                                                                                                                                                                                                        |
+| kafka.config                        | Map                                                                          | No       | -                        | In addition to the above necessary parameters that must be specified by the `Kafka consumer` client, users can also specify multiple `consumer` client non-mandatory parameters, covering [all consumer parameters specified in the official Kafka document](https://kafka.apache.org/documentation.html#consumerconfigs).                                                                                              |
+| schema                              | Config                                                                       | No       | -                        | The structure of the data, including field names and field types.                                                                                                                                                                                                                                                                                                                                                       |
+| format                              | String                                                                       | No       | json                     | Data format. The default format is json. Optional formats are text, canal-json, debezium-json and avro. For json or text format, the default field delimiter is ", "; to customize the delimiter, set the "field_delimiter" option. For canal format, please refer to [canal-json](../formats/canal-json.md) for details. For debezium format, please refer to [debezium-json](../formats/debezium-json.md) for details. |
+| format_error_handle_way             | String                                                                       | No       | fail                     | How to handle data format errors. The default is fail; the optional values are fail and skip. With fail, a data format error blocks processing and throws an exception; with skip, the erroneous line is skipped.                                                                                                                                                                                                       |
+| field_delimiter                     | String                                                                       | No       | ,                        | Customize the field delimiter for data format.                                                                                                                                                                                                                                                                                                                                                                          |
+| start_mode                          | StartMode[earliest],[group_offsets],[latest],[specific_offsets],[timestamp]  | No       | group_offsets            | The initial consumption pattern of consumers.                                                                                                                                                                                                                                                                                                                                                                           |
+| start_mode.offsets                  | Config                                                                       | No       | -                        | The offsets required when start_mode is specific_offsets.                                                                                                                                                                                                                                                                                                                                                               |
+| start_mode.timestamp                | Long                                                                         | No       | -                        | The timestamp required when start_mode is "timestamp".                                                                                                                                                                                                                                                                                                                                                                  |
+| partition-discovery.interval-millis | Long                                                                         | No       | -1                       | The interval for dynamically discovering topics and partitions.                                                                                                                                                                                                                                                                                                                                                         |
+| common-options                      |                                                                              | No       | -                        | Source plugin common parameters, please refer to [Source Common Options](common-options.md) for details                                                                                                                                                                                                                                                                                                                 |
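+
+For example, a minimal source configuration reading avro data might look like the sketch below (the broker address, topic and schema fields are placeholders; the schema block describes the fields to deserialize):
+
+```hocon
+source {
+  Kafka {
+    bootstrap.servers = "localhost:9092"
+    topic = "test_avro_topic"
+    format = avro
+    schema = {
+      fields {
+        id = bigint
+        name = string
+      }
+    }
+  }
+}
+```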
## Task Example
diff --git a/seatunnel-connectors-v2/connector-kafka/pom.xml b/seatunnel-connectors-v2/connector-kafka/pom.xml
index 7955ab3f546..6e470f12ba2 100644
--- a/seatunnel-connectors-v2/connector-kafka/pom.xml
+++ b/seatunnel-connectors-v2/connector-kafka/pom.xml
@@ -72,7 +72,11 @@
            <artifactId>connect-json</artifactId>
            <version>${kafka.client.version}</version>
-
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>seatunnel-format-avro</artifactId>
+            <version>${project.version}</version>
+        </dependency>
diff --git a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/MessageFormat.java b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/MessageFormat.java
index 491990fd33f..18e466e41c6 100644
--- a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/MessageFormat.java
+++ b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/MessageFormat.java
@@ -24,5 +24,6 @@ public enum MessageFormat {
DEBEZIUM_JSON,
COMPATIBLE_DEBEZIUM_JSON,
COMPATIBLE_KAFKA_CONNECT_JSON,
- OGG_JSON
+ OGG_JSON,
+ AVRO
}
diff --git a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/serialize/DefaultSeaTunnelRowSerializer.java b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/serialize/DefaultSeaTunnelRowSerializer.java
index 934c33cd28f..10638f8e0a0 100644
--- a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/serialize/DefaultSeaTunnelRowSerializer.java
+++ b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/serialize/DefaultSeaTunnelRowSerializer.java
@@ -24,6 +24,7 @@
import org.apache.seatunnel.common.exception.CommonErrorCode;
import org.apache.seatunnel.connectors.seatunnel.kafka.config.MessageFormat;
import org.apache.seatunnel.connectors.seatunnel.kafka.exception.KafkaConnectorException;
+import org.apache.seatunnel.format.avro.AvroSerializationSchema;
import org.apache.seatunnel.format.compatible.debezium.json.CompatibleDebeziumJsonDeserializationSchema;
import org.apache.seatunnel.format.compatible.debezium.json.CompatibleDebeziumJsonSerializationSchema;
import org.apache.seatunnel.format.json.JsonSerializationSchema;
@@ -227,6 +228,8 @@ private static SerializationSchema createSerializationSchema(
return new DebeziumJsonSerializationSchema(rowType);
case COMPATIBLE_DEBEZIUM_JSON:
return new CompatibleDebeziumJsonSerializationSchema(rowType, isKey);
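+ // encode rows as avro binary; the writer schema is derived from rowType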
+ case AVRO:
+ return new AvroSerializationSchema(rowType);
default:
throw new SeaTunnelJsonFormatException(
CommonErrorCode.UNSUPPORTED_DATA_TYPE, "Unsupported format: " + format);
diff --git a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkFactory.java b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkFactory.java
index f1a7bd85b8a..3fbf6bb99bd 100644
--- a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkFactory.java
+++ b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkFactory.java
@@ -46,7 +46,8 @@ public OptionRule optionRule() {
MessageFormat.JSON,
MessageFormat.CANAL_JSON,
MessageFormat.TEXT,
- MessageFormat.OGG_JSON),
+ MessageFormat.OGG_JSON,
+ MessageFormat.AVRO),
Config.TOPIC)
.optional(
Config.KAFKA_CONFIG,
diff --git a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceConfig.java b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceConfig.java
index c0caff97c60..4eae67bf1cb 100644
--- a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceConfig.java
+++ b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceConfig.java
@@ -33,6 +33,7 @@
import org.apache.seatunnel.connectors.seatunnel.kafka.config.MessageFormat;
import org.apache.seatunnel.connectors.seatunnel.kafka.config.MessageFormatErrorHandleWay;
import org.apache.seatunnel.connectors.seatunnel.kafka.config.StartMode;
+import org.apache.seatunnel.format.avro.AvroDeserializationSchema;
import org.apache.seatunnel.format.compatible.kafka.connect.json.CompatibleKafkaConnectDeserializationSchema;
import org.apache.seatunnel.format.compatible.kafka.connect.json.KafkaConnectJsonFormatOptions;
import org.apache.seatunnel.format.json.JsonDeserializationSchema;
@@ -235,6 +236,8 @@ private DeserializationSchema createDeserializationSchema(
case DEBEZIUM_JSON:
boolean includeSchema = readonlyConfig.get(DEBEZIUM_RECORD_INCLUDE_SCHEMA);
return new DebeziumJsonDeserializationSchema(seaTunnelRowType, true, includeSchema);
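+ // decode avro binary payloads with a schema derived from seaTunnelRowType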
+ case AVRO:
+ return new AvroDeserializationSchema(seaTunnelRowType);
default:
throw new SeaTunnelJsonFormatException(
CommonErrorCode.UNSUPPORTED_DATA_TYPE, "Unsupported format: " + format);
diff --git a/seatunnel-dist/release-docs/LICENSE b/seatunnel-dist/release-docs/LICENSE
index 310d0618335..5bbf86c50c3 100644
--- a/seatunnel-dist/release-docs/LICENSE
+++ b/seatunnel-dist/release-docs/LICENSE
@@ -219,8 +219,8 @@ The text of each license is the standard Apache 2.0 license.
(Apache License 2.0) aircompressor (io.airlift:aircompressor:0.10 - http://github.com/airlift/aircompressor)
(Apache License, Version 2.0) Apache Yetus - Audience Annotations (org.apache.yetus:audience-annotations:0.11.0 - https://yetus.apache.org/audience-annotations)
- (The Apache Software License, Version 2.0) Apache Avro (org.apache.avro:avro:1.8.2 - http://avro.apache.org)
- (Apache License, Version 2.0) Apache Commons Codec (commons-codec:commons-codec:1.13 - https://commons.apache.org/proper/commons-codec/)
+ (The Apache Software License, Version 2.0) Apache Avro (org.apache.avro:avro:1.11.1 - http://avro.apache.org)
+ (Apache License, Version 2.0) Apache Commons Codec (commons-codec:commons-codec:1.13 - https://commons.apache.org/proper/commons-codec/)
(Apache License, Version 2.0) Apache Commons Collections (org.apache.commons:commons-collections4:4.4 - https://commons.apache.org/proper/commons-collections/)
(Apache License, Version 2.0) Apache Commons Compress (org.apache.commons:commons-compress:1.20 - https://commons.apache.org/proper/commons-compress/)
(The Apache Software License, Version 2.0) Commons Lang (commons-lang:commons-lang:2.6 - http://commons.apache.org/lang/)
diff --git a/seatunnel-dist/release-docs/NOTICE b/seatunnel-dist/release-docs/NOTICE
index 4bed2d4be19..1c6a43afe3f 100644
--- a/seatunnel-dist/release-docs/NOTICE
+++ b/seatunnel-dist/release-docs/NOTICE
@@ -49,16 +49,16 @@ Apache Avro NOTICE
========================================================================
Apache Avro
-Copyright 2010-2015 The Apache Software Foundation
+Copyright 2010-2019 The Apache Software Foundation
This product includes software developed at
-The Apache Software Foundation (http://www.apache.org/).
+The Apache Software Foundation (https://www.apache.org/).
NUnit license acknowledgement:
| Portions Copyright © 2002-2012 Charlie Poole or Copyright © 2002-2004 James
| W. Newkirk, Michael C. Two, Alexei A. Vorontsov or Copyright © 2000-2002
-| Philip A. Craig
+| Philip A. Craig
Based upon the representations of upstream licensors, it is understood that
portions of the mapreduce API included in the Java implementation are licensed
@@ -74,7 +74,7 @@ is:
| (the "License"); you may not use this file except in compliance
| with the License. You may obtain a copy of the License at
|
-| http://www.apache.org/licenses/LICENSE-2.0
+| https://www.apache.org/licenses/LICENSE-2.0
|
| Unless required by applicable law or agreed to in writing, software
| distributed under the License is distributed on an "AS IS" BASIS,
@@ -85,7 +85,7 @@ is:
The Odiago NOTICE at the time of the contribution:
| This product includes software developed by Odiago, Inc.
-| (http://www.wibidata.com).
+| (https://www.wibidata.com).
Apache Ivy includes the following in its NOTICE file:
@@ -93,18 +93,18 @@ Apache Ivy includes the following in its NOTICE file:
| Copyright 2007-2010 The Apache Software Foundation
|
| This product includes software developed by
-| The Apache Software Foundation (http://www.apache.org/).
+| The Apache Software Foundation (https://www.apache.org/).
|
| Portions of Ivy were originally developed by
| Jayasoft SARL (http://www.jayasoft.fr/)
| and are licensed to the Apache Software Foundation under the
| "Software Grant License Agreement"
|
-| SSH and SFTP support is provided by the JCraft JSch package,
+| SSH and SFTP support is provided by the JCraft JSch package,
| which is open source software, available under
-| the terms of a BSD style license.
+| the terms of a BSD style license.
| The original software and related information is available
-| at http://www.jcraft.com/jsch/.
+| at http://www.jcraft.com/jsch/.
Apache Log4Net includes the following in its NOTICE file:
@@ -112,7 +112,16 @@ Apache Log4Net includes the following in its NOTICE file:
| Copyright 2004-2015 The Apache Software Foundation
|
| This product includes software developed at
-| The Apache Software Foundation (http://www.apache.org/).
+| The Apache Software Foundation (https://www.apache.org/).
+
+csharp reflect serializers were contributed by Pitney Bowes Inc.
+
+| Copyright 2019 Pitney Bowes Inc.
+| Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License.
+| You may obtain a copy of the License at https://www.apache.org/licenses/LICENSE-2.0.
+| Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
+| WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+| See the License for the specific language governing permissions and limitations under the License.
========================================================================
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java
index b75c55153ea..0bf222bf87d 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java
@@ -34,6 +34,7 @@
import org.apache.seatunnel.e2e.common.TestResource;
import org.apache.seatunnel.e2e.common.TestSuiteBase;
import org.apache.seatunnel.e2e.common.container.TestContainer;
+import org.apache.seatunnel.e2e.common.container.TestContainerId;
import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;
import org.apache.seatunnel.format.text.TextSerializationSchema;
@@ -292,6 +293,30 @@ public void testSourceKafkaStartConfig(TestContainer container)
testKafkaGroupOffsetsToConsole(container);
}
+ @TestTemplate
+ @DisabledOnContainer(TestContainerId.SPARK_2_4)
+ public void testFakeSourceToKafkaAvroFormat(TestContainer container)
+ throws IOException, InterruptedException {
+ Container.ExecResult execResult =
+ container.executeJob("/avro/fake_source_to_kafka_avro_format.conf");
+ Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr());
+ }
+
+ @TestTemplate
+ @DisabledOnContainer(TestContainerId.SPARK_2_4)
+ public void testKafkaAvroToConsole(TestContainer container)
+ throws IOException, InterruptedException {
+ DefaultSeaTunnelRowSerializer serializer =
+ DefaultSeaTunnelRowSerializer.create(
+ "test_avro_topic",
+ SEATUNNEL_ROW_TYPE,
+ MessageFormat.AVRO,
+ DEFAULT_FIELD_DELIMITER);
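+ // seed the topic with 100 avro-encoded rows before running the consuming job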
+ generateTestData(row -> serializer.serializeRow(row), 0, 100);
+ Container.ExecResult execResult = container.executeJob("/avro/kafka_avro_to_console.conf");
+ Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr());
+ }
+
public void testKafkaLatestToConsole(TestContainer container)
throws IOException, InterruptedException {
Container.ExecResult execResult =
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/avro/fake_source_to_kafka_avro_format.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/avro/fake_source_to_kafka_avro_format.conf
new file mode 100644
index 00000000000..c6f5d6944f6
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/avro/fake_source_to_kafka_avro_format.conf
@@ -0,0 +1,76 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ execution.parallelism = 1
+ job.mode = "BATCH"
+
+ #spark config
+ spark.app.name = "SeaTunnel"
+ spark.executor.instances = 1
+ spark.executor.cores = 1
+ spark.executor.memory = "1g"
+ spark.master = local
+}
+
+source {
+ FakeSource {
+ schema = {
+ fields {
+ c_map = "map"
+ c_array = "array"
+ c_string = string
+ c_boolean = boolean
+ c_tinyint = tinyint
+ c_smallint = smallint
+ c_int = int
+ c_bigint = bigint
+ c_float = float
+ c_double = double
+ c_bytes = bytes
+ c_date = date
+ c_decimal = "decimal(38, 18)"
+ c_timestamp = timestamp
+ c_row = {
+ c_map = "map"
+ c_array = "array"
+ c_string = string
+ c_boolean = boolean
+ c_tinyint = tinyint
+ c_smallint = smallint
+ c_int = int
+ c_bigint = bigint
+ c_float = float
+ c_double = double
+ c_bytes = bytes
+ c_date = date
+ c_decimal = "decimal(38, 18)"
+ c_timestamp = timestamp
+ }
+ }
+ }
+ result_table_name = "fake"
+ }
+}
+
+sink {
+ Kafka {
+ bootstrap.servers = "kafkaCluster:9092"
+ topic = "test_avro_topic"
+ format = avro
+ }
+}
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/avro/kafka_avro_to_console.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/avro/kafka_avro_to_console.conf
new file mode 100644
index 00000000000..3fc1a57c3d7
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/avro/kafka_avro_to_console.conf
@@ -0,0 +1,89 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ execution.parallelism = 1
+ job.mode = "BATCH"
+
+ # You can set spark configuration here
+ spark.app.name = "SeaTunnel"
+ spark.executor.instances = 1
+ spark.executor.cores = 1
+ spark.executor.memory = "1g"
+ spark.master = local
+}
+
+source {
+ Kafka {
+ bootstrap.servers = "kafkaCluster:9092"
+ topic = "test_avro_topic"
+ result_table_name = "kafka_table"
+ kafka.auto.offset.reset = "earliest"
+ format = avro
+ format_error_handle_way = skip
+ schema = {
+ fields {
+ id = bigint
+ c_map = "map"
+ c_array = "array"
+ c_string = string
+ c_boolean = boolean
+ c_tinyint = tinyint
+ c_smallint = smallint
+ c_int = int
+ c_bigint = bigint
+ c_float = float
+ c_double = double
+ c_decimal = "decimal(2, 1)"
+ c_bytes = bytes
+ c_date = date
+ c_timestamp = timestamp
+ }
+ }
+ }
+}
+
+sink {
+ Console {
+ source_table_name = "kafka_table"
+ }
+ Assert {
+ source_table_name = "kafka_table"
+ rules =
+ {
+ field_rules = [
+ {
+ field_name = id
+ field_type = long
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ },
+ {
+ rule_type = MIN
+ rule_value = 0
+ },
+ {
+ rule_type = MAX
+ rule_value = 99
+ }
+ ]
+ }
+ ]
+ }
+ }
+}
\ No newline at end of file
diff --git a/seatunnel-formats/pom.xml b/seatunnel-formats/pom.xml
index 7fc09b356a0..a330e9b0e05 100644
--- a/seatunnel-formats/pom.xml
+++ b/seatunnel-formats/pom.xml
@@ -31,6 +31,7 @@
seatunnel-format-text
seatunnel-format-compatible-debezium-json
seatunnel-format-compatible-connect-json
+ seatunnel-format-avro
diff --git a/seatunnel-formats/seatunnel-format-avro/pom.xml b/seatunnel-formats/seatunnel-format-avro/pom.xml
new file mode 100644
index 00000000000..513b0fa0f4a
--- /dev/null
+++ b/seatunnel-formats/seatunnel-format-avro/pom.xml
@@ -0,0 +1,46 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.seatunnel</groupId>
+        <artifactId>seatunnel-formats</artifactId>
+        <version>${revision}</version>
+    </parent>
+
+    <artifactId>seatunnel-format-avro</artifactId>
+    <name>SeaTunnel : Formats : Avro</name>
+
+    <properties>
+        <avro.version>1.11.1</avro.version>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>seatunnel-api</artifactId>
+            <version>${project.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.avro</groupId>
+            <artifactId>avro</artifactId>
+            <version>${avro.version}</version>
+        </dependency>
+    </dependencies>
+</project>
diff --git a/seatunnel-formats/seatunnel-format-avro/src/main/java/org/apache/seatunnel/format/avro/AvroDeserializationSchema.java b/seatunnel-formats/seatunnel-format-avro/src/main/java/org/apache/seatunnel/format/avro/AvroDeserializationSchema.java
new file mode 100644
index 00000000000..b682a8e6431
--- /dev/null
+++ b/seatunnel-formats/seatunnel-format-avro/src/main/java/org/apache/seatunnel/format/avro/AvroDeserializationSchema.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.format.avro;
+
+import org.apache.seatunnel.api.serialization.DeserializationSchema;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.BinaryDecoder;
+import org.apache.avro.io.DecoderFactory;
+
+import java.io.IOException;
+
+public class AvroDeserializationSchema implements DeserializationSchema<SeaTunnelRow> {
+
+ private static final long serialVersionUID = -7907358485475741366L;
+
+ private final SeaTunnelRowType rowType;
+ private final AvroToRowConverter converter;
+
+ public AvroDeserializationSchema(SeaTunnelRowType rowType) {
+ this.rowType = rowType;
+ this.converter = new AvroToRowConverter(rowType);
+ }
+
+ @Override
+ public SeaTunnelRow deserialize(byte[] message) throws IOException {
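+ // wrap the payload in a binary decoder; the reader was built from the SeaTunnel row type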
+ BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(message, null);
+ GenericRecord record = this.converter.getReader().read(null, decoder);
+ return converter.converter(record, rowType);
+ }
+
+ @Override
+ public SeaTunnelDataType<SeaTunnelRow> getProducedType() {
+ return this.rowType;
+ }
+}
diff --git a/seatunnel-formats/seatunnel-format-avro/src/main/java/org/apache/seatunnel/format/avro/AvroSerializationSchema.java b/seatunnel-formats/seatunnel-format-avro/src/main/java/org/apache/seatunnel/format/avro/AvroSerializationSchema.java
new file mode 100644
index 00000000000..3d9a828bf74
--- /dev/null
+++ b/seatunnel-formats/seatunnel-format-avro/src/main/java/org/apache/seatunnel/format/avro/AvroSerializationSchema.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.format.avro;
+
+import org.apache.seatunnel.api.serialization.SerializationSchema;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.format.avro.exception.AvroFormatErrorCode;
+import org.apache.seatunnel.format.avro.exception.SeaTunnelAvroFormatException;
+
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.BinaryEncoder;
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.io.EncoderFactory;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+
+public class AvroSerializationSchema implements SerializationSchema {
+
+ private static final long serialVersionUID = 4438784443025715370L;
+
+ private final ByteArrayOutputStream out;
+ private final BinaryEncoder encoder;
+ private final RowToAvroConverter converter;
+ private final DatumWriter<GenericRecord> writer;
+
+ public AvroSerializationSchema(SeaTunnelRowType rowType) {
+ this.out = new ByteArrayOutputStream();
+ this.encoder = EncoderFactory.get().binaryEncoder(out, null);
+ this.converter = new RowToAvroConverter(rowType);
+ this.writer = this.converter.getWriter();
+ }
+
+ @Override
+ public byte[] serialize(SeaTunnelRow element) {
+ GenericRecord record = converter.convertRowToGenericRecord(element);
+ try {
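+ // the buffer and encoder are reused across records, so reset the buffer before writing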
+ out.reset();
+ writer.write(record, encoder);
+ encoder.flush();
+ return out.toByteArray();
+ } catch (IOException e) {
+ throw new SeaTunnelAvroFormatException(
+ AvroFormatErrorCode.SERIALIZATION_ERROR,
+ "Serialization error on record : " + element);
+ }
+ }
+}
diff --git a/seatunnel-formats/seatunnel-format-avro/src/main/java/org/apache/seatunnel/format/avro/AvroToRowConverter.java b/seatunnel-formats/seatunnel-format-avro/src/main/java/org/apache/seatunnel/format/avro/AvroToRowConverter.java
new file mode 100644
index 00000000000..989087ee570
--- /dev/null
+++ b/seatunnel-formats/seatunnel-format-avro/src/main/java/org/apache/seatunnel/format/avro/AvroToRowConverter.java
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.format.avro;
+
+import org.apache.seatunnel.api.table.type.ArrayType;
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.format.avro.exception.AvroFormatErrorCode;
+import org.apache.seatunnel.format.avro.exception.SeaTunnelAvroFormatException;
+
+import org.apache.avro.Conversions;
+import org.apache.avro.Schema;
+import org.apache.avro.data.TimeConversions;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.DatumReader;
+
+import java.io.Serializable;
+import java.util.List;
+
+public class AvroToRowConverter implements Serializable {
+
+ private static final long serialVersionUID = 8177020083886379563L;
+
+ private DatumReader<GenericRecord> reader = null;
+ private Schema schema;
+
+ public AvroToRowConverter(SeaTunnelRowType rowType) {
+ schema = SeaTunnelRowTypeToAvroSchemaConverter.buildAvroSchemaWithRowType(rowType);
+ }
+
+ public DatumReader<GenericRecord> getReader() {
+ if (reader == null) {
+ reader = createReader();
+ }
+ return reader;
+ }
+
+ private DatumReader<GenericRecord> createReader() {
+ GenericDatumReader<GenericRecord> datumReader = new GenericDatumReader<>(schema, schema);
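+ // register logical-type conversions so decimal, date and timestamp values decode to Java objects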
+ datumReader.getData().addLogicalTypeConversion(new Conversions.DecimalConversion());
+ datumReader.getData().addLogicalTypeConversion(new TimeConversions.DateConversion());
+ datumReader
+ .getData()
+ .addLogicalTypeConversion(new TimeConversions.LocalTimestampMillisConversion());
+ return datumReader;
+ }
+
+ public SeaTunnelRow converter(GenericRecord record, SeaTunnelRowType rowType) {
+ String[] fieldNames = rowType.getFieldNames();
+
+ Object[] values = new Object[fieldNames.length];
+ for (int i = 0; i < fieldNames.length; i++) {
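+ // a field that is absent from the Avro record becomes null in the SeaTunnel row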
+ if (record.getSchema().getField(fieldNames[i]) == null) {
+ values[i] = null;
+ continue;
+ }
+ values[i] =
+ convertField(
+ rowType.getFieldType(i),
+ record.getSchema().getField(fieldNames[i]),
+ record.get(fieldNames[i]));
+ }
+ return new SeaTunnelRow(values);
+ }
+
+ private Object convertField(SeaTunnelDataType<?> dataType, Schema.Field field, Object val) {
+ switch (dataType.getSqlType()) {
+ case MAP:
+ case STRING:
+ case BOOLEAN:
+ case SMALLINT:
+ case INT:
+ case BIGINT:
+ case FLOAT:
+ case DOUBLE:
+ case NULL:
+ case BYTES:
+ case DATE:
+ case DECIMAL:
+ case TIMESTAMP:
+ return val;
+ case TINYINT:
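+ // Avro has no single-byte type, so a tinyint arrives as an Integer and is narrowed back to byte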
+ Class<?> typeClass = dataType.getTypeClass();
+ if (typeClass == Byte.class) {
+ Integer integer = (Integer) val;
+ return integer.byteValue();
+ }
+ return val;
+ case ARRAY:
+ BasicType<?> basicType = ((ArrayType<?, ?>) dataType).getElementType();
+ List