Skip to content

Commit

Permalink
[Improve][Connector-V2] Fix Kafka connector (apache#2745)
Browse files Browse the repository at this point in the history
* Fix error when converting SeaTunnelRow to JSON format
* Rename `bootstrap.server` to `bootstrap.servers` in kafka source config
* Remove `kafka.` prefix in sink producer configuration key
  • Loading branch information
hailin0 authored and TyrantLucifer committed Sep 18, 2022
1 parent 12a1fd5 commit 84fc245
Show file tree
Hide file tree
Showing 8 changed files with 34 additions and 36 deletions.
5 changes: 5 additions & 0 deletions seatunnel-connectors-v2/connector-kafka/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,11 @@
<artifactId>kafka-clients</artifactId>
<version>${kafka.client.version}</version>
</dependency>
<dependency>
<groupId>org.apache.seatunnel</groupId>
<artifactId>seatunnel-format-json</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>

</project>
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ public class Config {
/**
* The server address of kafka cluster.
*/
public static final String BOOTSTRAP_SERVER = "bootstrap.server";
public static final String BOOTSTRAP_SERVERS = "bootstrap.servers";

public static final String KAFKA_CONFIG_PREFIX = "kafka.";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,32 +19,22 @@

import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.config.Common;
import org.apache.seatunnel.common.utils.JsonUtils;
import org.apache.seatunnel.format.json.JsonSerializationSchema;

import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.HashMap;
import java.util.Map;

public class DefaultSeaTunnelRowSerializer implements SeaTunnelRowSerializer<String, String> {
public class DefaultSeaTunnelRowSerializer implements SeaTunnelRowSerializer<byte[], byte[]> {

private final String topic;
private final SeaTunnelRowType seaTunnelRowType;
private final JsonSerializationSchema jsonSerializationSchema;

public DefaultSeaTunnelRowSerializer(String topic, SeaTunnelRowType seaTunnelRowType) {
this.topic = topic;
this.seaTunnelRowType = seaTunnelRowType;
this.jsonSerializationSchema = new JsonSerializationSchema(seaTunnelRowType);
}

@Override
public ProducerRecord<String, String> serializeRow(SeaTunnelRow row) {
Map<Object, Object> map = new HashMap<>(Common.COLLECTION_SIZE);
String[] fieldNames = seaTunnelRowType.getFieldNames();
Object[] fields = row.getFields();
for (int i = 0; i < fieldNames.length; i++) {
map.put(fieldNames[i], fields[i]);
}
return new ProducerRecord<>(topic, null, JsonUtils.toJsonString(map));
public ProducerRecord<byte[], byte[]> serializeRow(SeaTunnelRow row) {
return new ProducerRecord<>(topic, null, jsonSerializationSchema.serialize(row));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.serialization.ByteArraySerializer;

import java.util.List;
import java.util.Optional;
Expand All @@ -51,19 +51,18 @@ public class KafkaSinkWriter implements SinkWriter<SeaTunnelRow, KafkaCommitInfo
private String transactionPrefix;
private long lastCheckpointId = 0;

private final KafkaProduceSender<String, String> kafkaProducerSender;
private final KafkaProduceSender<byte[], byte[]> kafkaProducerSender;
private final SeaTunnelRowSerializer<byte[], byte[]> seaTunnelRowSerializer;

private static final int PREFIX_RANGE = 10000;

// check config
@Override
public void write(SeaTunnelRow element) {
ProducerRecord<String, String> producerRecord = seaTunnelRowSerializer.serializeRow(element);
ProducerRecord<byte[], byte[]> producerRecord = seaTunnelRowSerializer.serializeRow(element);
kafkaProducerSender.send(producerRecord);
}

private final SeaTunnelRowSerializer<String, String> seaTunnelRowSerializer;

public KafkaSinkWriter(
SinkWriter.Context context,
SeaTunnelRowType seaTunnelRowType,
Expand Down Expand Up @@ -124,19 +123,19 @@ public void close() {

private Properties getKafkaProperties(Config pluginConfig) {
Config kafkaConfig = TypesafeConfigUtils.extractSubConfig(pluginConfig,
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.KAFKA_CONFIG_PREFIX, true);
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.KAFKA_CONFIG_PREFIX, false);
Properties kafkaProperties = new Properties();
kafkaConfig.entrySet().forEach(entry -> {
kafkaProperties.put(entry.getKey(), entry.getValue().unwrapped());
});
kafkaProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, pluginConfig.getString(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG));
kafkaProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
kafkaProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
kafkaProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
kafkaProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
return kafkaProperties;
}

// todo: parse the target field from config
private SeaTunnelRowSerializer<String, String> getSerializer(Config pluginConfig, SeaTunnelRowType seaTunnelRowType) {
private SeaTunnelRowSerializer<byte[], byte[]> getSerializer(Config pluginConfig, SeaTunnelRowType seaTunnelRowType) {
return new DefaultSeaTunnelRowSerializer(pluginConfig.getString("topics"), seaTunnelRowType);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ public class ConsumerMetadata implements Serializable {

private String topic;
private boolean isPattern = false;
private String bootstrapServer;
private String bootstrapServers;
private Properties properties;
private String consumerGroup;
private boolean commitOnCheckpoint = false;
Expand Down Expand Up @@ -56,12 +56,12 @@ public void setPattern(boolean pattern) {
isPattern = pattern;
}

public String getBootstrapServer() {
return bootstrapServer;
public String getBootstrapServers() {
return bootstrapServers;
}

public void setBootstrapServer(String bootstrapServer) {
this.bootstrapServer = bootstrapServer;
public void setBootstrapServers(String bootstrapServers) {
this.bootstrapServers = bootstrapServers;
}

public Properties getProperties() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

package org.apache.seatunnel.connectors.seatunnel.kafka.source;

import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.BOOTSTRAP_SERVER;
import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.BOOTSTRAP_SERVERS;
import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.COMMIT_ON_CHECKPOINT;
import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.CONSUMER_GROUP;
import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.PATTERN;
Expand Down Expand Up @@ -67,15 +67,15 @@ public String getPluginName() {

@Override
public void prepare(Config config) throws PrepareFailException {
CheckResult result = CheckConfigUtil.checkAllExists(config, TOPIC, BOOTSTRAP_SERVER);
CheckResult result = CheckConfigUtil.checkAllExists(config, TOPIC, BOOTSTRAP_SERVERS);
if (!result.isSuccess()) {
throw new PrepareFailException(getPluginName(), PluginType.SOURCE, result.getMsg());
}
this.metadata.setTopic(config.getString(TOPIC));
if (config.hasPath(PATTERN)) {
this.metadata.setPattern(config.getBoolean(PATTERN));
}
this.metadata.setBootstrapServer(config.getString(BOOTSTRAP_SERVER));
this.metadata.setBootstrapServers(config.getString(BOOTSTRAP_SERVERS));
this.metadata.setProperties(new Properties());

if (config.hasPath(CONSUMER_GROUP)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ public class KafkaSourceReader implements SourceReader<SeaTunnelRow, KafkaSource

@Override
public void open() {
this.consumer = initConsumer(this.metadata.getBootstrapServer(), this.metadata.getConsumerGroup(),
this.consumer = initConsumer(this.metadata.getBootstrapServers(), this.metadata.getConsumerGroup(),
this.metadata.getProperties(), !this.metadata.isCommitOnCheckpoint());
isRunning = true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.seatunnel.common.config.Common;
import org.apache.seatunnel.connectors.seatunnel.kafka.state.KafkaSourceState;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.clients.consumer.ConsumerConfig;
Expand All @@ -40,6 +41,7 @@
import java.util.regex.Pattern;
import java.util.stream.Collectors;

@Slf4j
public class KafkaSourceSplitEnumerator implements SourceSplitEnumerator<KafkaSourceSplit, KafkaSourceState> {

private static final String CLIENT_ID_PREFIX = "seatunnel";
Expand Down Expand Up @@ -119,7 +121,7 @@ public void notifyCheckpointComplete(long checkpointId) throws Exception {

private AdminClient initAdminClient(Properties properties) {
Properties props = new Properties(properties);
props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.metadata.getBootstrapServer());
props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.metadata.getBootstrapServers());
props.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, CLIENT_ID_PREFIX + "-enumerator-admin-client-" + this.hashCode());
return AdminClient.create(props);
}
Expand All @@ -133,6 +135,8 @@ private Set<KafkaSourceSplit> getTopicInfo() throws ExecutionException, Interrup
} else {
topics = Arrays.asList(this.metadata.getTopic().split(","));
}
log.info("Discovered topics: {}", topics);

Collection<TopicPartition> partitions =
adminClient.describeTopics(topics).all().get().values().stream().flatMap(t -> t.partitions().stream()
.map(p -> new TopicPartition(t.name(), p.partition()))).collect(Collectors.toSet());
Expand Down

0 comments on commit 84fc245

Please sign in to comment.