diff --git a/seatunnel-connectors-v2/connector-kafka/pom.xml b/seatunnel-connectors-v2/connector-kafka/pom.xml index 75e56237b24..4159e8783aa 100644 --- a/seatunnel-connectors-v2/connector-kafka/pom.xml +++ b/seatunnel-connectors-v2/connector-kafka/pom.xml @@ -41,6 +41,11 @@ kafka-clients ${kafka.client.version} + + org.apache.seatunnel + seatunnel-format-json + ${project.version} + \ No newline at end of file diff --git a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/Config.java b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/Config.java index ee1492044e6..d48d12cf646 100644 --- a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/Config.java +++ b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/Config.java @@ -31,7 +31,7 @@ public class Config { /** * The server address of kafka cluster. */ - public static final String BOOTSTRAP_SERVER = "bootstrap.server"; + public static final String BOOTSTRAP_SERVERS = "bootstrap.servers"; public static final String KAFKA_CONFIG_PREFIX = "kafka."; diff --git a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/serialize/DefaultSeaTunnelRowSerializer.java b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/serialize/DefaultSeaTunnelRowSerializer.java index ad9a24ef27a..e80a7d306b0 100644 --- a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/serialize/DefaultSeaTunnelRowSerializer.java +++ b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/serialize/DefaultSeaTunnelRowSerializer.java @@ -19,32 +19,22 @@ import org.apache.seatunnel.api.table.type.SeaTunnelRow; import org.apache.seatunnel.api.table.type.SeaTunnelRowType; -import org.apache.seatunnel.common.config.Common; -import org.apache.seatunnel.common.utils.JsonUtils; +import org.apache.seatunnel.format.json.JsonSerializationSchema; import org.apache.kafka.clients.producer.ProducerRecord; -import java.util.HashMap; -import java.util.Map; - -public class DefaultSeaTunnelRowSerializer implements SeaTunnelRowSerializer { +public class DefaultSeaTunnelRowSerializer implements SeaTunnelRowSerializer { private final String topic; - private final SeaTunnelRowType seaTunnelRowType; + private final JsonSerializationSchema jsonSerializationSchema; public DefaultSeaTunnelRowSerializer(String topic, SeaTunnelRowType seaTunnelRowType) { this.topic = topic; - this.seaTunnelRowType = seaTunnelRowType; + this.jsonSerializationSchema = new JsonSerializationSchema(seaTunnelRowType); } @Override - public ProducerRecord serializeRow(SeaTunnelRow row) { - Map map = new HashMap<>(Common.COLLECTION_SIZE); - String[] fieldNames = seaTunnelRowType.getFieldNames(); - Object[] fields = row.getFields(); - for (int i = 0; i < fieldNames.length; i++) { - map.put(fieldNames[i], fields[i]); - } - return new ProducerRecord<>(topic, null, JsonUtils.toJsonString(map)); + public ProducerRecord serializeRow(SeaTunnelRow row) { + return new ProducerRecord<>(topic, null, jsonSerializationSchema.serialize(row)); } } diff --git a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkWriter.java b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkWriter.java index b3a20d0b152..3f71c308525 100644 --- a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkWriter.java +++ b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkWriter.java @@ -33,7 +33,7 @@ import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; -import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.kafka.common.serialization.ByteArraySerializer; import java.util.List; import java.util.Optional; @@ -51,19 +51,18 @@ public class KafkaSinkWriter implements SinkWriter kafkaProducerSender; + private final KafkaProduceSender kafkaProducerSender; + private final SeaTunnelRowSerializer seaTunnelRowSerializer; private static final int PREFIX_RANGE = 10000; // check config @Override public void write(SeaTunnelRow element) { - ProducerRecord producerRecord = seaTunnelRowSerializer.serializeRow(element); + ProducerRecord producerRecord = seaTunnelRowSerializer.serializeRow(element); kafkaProducerSender.send(producerRecord); } - private final SeaTunnelRowSerializer seaTunnelRowSerializer; - public KafkaSinkWriter( SinkWriter.Context context, SeaTunnelRowType seaTunnelRowType, @@ -124,19 +123,19 @@ public void close() { private Properties getKafkaProperties(Config pluginConfig) { Config kafkaConfig = TypesafeConfigUtils.extractSubConfig(pluginConfig, - org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.KAFKA_CONFIG_PREFIX, true); + org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.KAFKA_CONFIG_PREFIX, false); Properties kafkaProperties = new Properties(); kafkaConfig.entrySet().forEach(entry -> { kafkaProperties.put(entry.getKey(), entry.getValue().unwrapped()); }); kafkaProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, pluginConfig.getString(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG)); - kafkaProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); - kafkaProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); + kafkaProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName()); + kafkaProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName()); return kafkaProperties; } // todo: parse the target field from config - private SeaTunnelRowSerializer getSerializer(Config pluginConfig, SeaTunnelRowType seaTunnelRowType) { + private SeaTunnelRowSerializer getSerializer(Config pluginConfig, SeaTunnelRowType seaTunnelRowType) { return new DefaultSeaTunnelRowSerializer(pluginConfig.getString("topics"), seaTunnelRowType); } diff --git a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/ConsumerMetadata.java b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/ConsumerMetadata.java index 6f6b921d89d..ee942aaefc7 100644 --- a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/ConsumerMetadata.java +++ b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/ConsumerMetadata.java @@ -27,7 +27,7 @@ public class ConsumerMetadata implements Serializable { private String topic; private boolean isPattern = false; - private String bootstrapServer; + private String bootstrapServers; private Properties properties; private String consumerGroup; private boolean commitOnCheckpoint = false; @@ -56,12 +56,12 @@ public void setPattern(boolean pattern) { isPattern = pattern; } - public String getBootstrapServer() { - return bootstrapServer; + public String getBootstrapServers() { + return bootstrapServers; } - public void setBootstrapServer(String bootstrapServer) { - this.bootstrapServer = bootstrapServer; + public void setBootstrapServers(String bootstrapServers) { + this.bootstrapServers = bootstrapServers; } public Properties getProperties() { diff --git a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSource.java b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSource.java index 62314b87a63..a6b6199f5f0 100644 --- a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSource.java +++ b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSource.java @@ -17,7 +17,7 @@ package org.apache.seatunnel.connectors.seatunnel.kafka.source; -import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.BOOTSTRAP_SERVER; +import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.BOOTSTRAP_SERVERS; import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.COMMIT_ON_CHECKPOINT; import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.CONSUMER_GROUP; import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.PATTERN; @@ -67,7 +67,7 @@ public String getPluginName() { @Override public void prepare(Config config) throws PrepareFailException { - CheckResult result = CheckConfigUtil.checkAllExists(config, TOPIC, BOOTSTRAP_SERVER); + CheckResult result = CheckConfigUtil.checkAllExists(config, TOPIC, BOOTSTRAP_SERVERS); if (!result.isSuccess()) { throw new PrepareFailException(getPluginName(), PluginType.SOURCE, result.getMsg()); } @@ -75,7 +75,7 @@ public void prepare(Config config) throws PrepareFailException { if (config.hasPath(PATTERN)) { this.metadata.setPattern(config.getBoolean(PATTERN)); } - this.metadata.setBootstrapServer(config.getString(BOOTSTRAP_SERVER)); + this.metadata.setBootstrapServers(config.getString(BOOTSTRAP_SERVERS)); this.metadata.setProperties(new Properties()); if (config.hasPath(CONSUMER_GROUP)) { diff --git a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceReader.java b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceReader.java index d4272e4a3fd..5a53aae3b2e 100644 --- a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceReader.java +++ b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceReader.java @@ -73,7 +73,7 @@ public class KafkaSourceReader implements SourceReader { private static final String CLIENT_ID_PREFIX = "seatunnel"; @@ -119,7 +121,7 @@ public void notifyCheckpointComplete(long checkpointId) throws Exception { private AdminClient initAdminClient(Properties properties) { Properties props = new Properties(properties); - props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.metadata.getBootstrapServer()); + props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.metadata.getBootstrapServers()); props.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, CLIENT_ID_PREFIX + "-enumerator-admin-client-" + this.hashCode()); return AdminClient.create(props); } @@ -133,6 +135,8 @@ private Set getTopicInfo() throws ExecutionException, Interrup } else { topics = Arrays.asList(this.metadata.getTopic().split(",")); } + log.info("Discovered topics: {}", topics); + Collection partitions = adminClient.describeTopics(topics).all().get().values().stream().flatMap(t -> t.partitions().stream() .map(p -> new TopicPartition(t.name(), p.partition()))).collect(Collectors.toSet());