
Commit

Refactor kafka-0.11 sink. See #41
chengscu committed Feb 13, 2020
1 parent 70a84bf commit d1caeb9
Showing 11 changed files with 280 additions and 591 deletions.
24 changes: 19 additions & 5 deletions connectors/connector-kafka-0.11/pom.xml
@@ -3,7 +3,7 @@
          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
     <parent>
-        <artifactId>alink</artifactId>
+        <artifactId>alink_connectors</artifactId>
         <groupId>com.alibaba.alink</groupId>
         <version>0.1-SNAPSHOT</version>
     </parent>
@@ -69,13 +69,27 @@
         <!-- dependency on KAFKA -->
         <dependency>
             <groupId>org.apache.flink</groupId>
-            <artifactId>flink-connector-kafka-base_${alink.scala.major.version}</artifactId>
+            <artifactId>flink-connector-kafka-0.11_${alink.scala.major.version}</artifactId>
             <version>${flink.version}</version>
         </dependency>
         <dependency>
-            <groupId>org.apache.flink</groupId>
-            <artifactId>flink-connector-kafka-0.11_${alink.scala.major.version}</artifactId>
-            <version>${flink.version}</version>
+            <groupId>org.apache.kafka</groupId>
+            <artifactId>kafka_2.11</artifactId>
+            <version>0.11.0.2</version>
+            <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.apache.kafka</groupId>
+            <artifactId>kafka-clients</artifactId>
+            <version>0.11.0.2</version>
+        </dependency>
+
+        <!-- Declare kafka-junit4 dependency -->
+        <dependency>
+            <groupId>com.salesforce.kafka.test</groupId>
+            <artifactId>kafka-junit4</artifactId>
+            <version>3.2.1</version>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 </project>

This file was deleted.

@@ -0,0 +1,78 @@
/*
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package com.alibaba.alink.operator.common.io.kafka011;

import com.alibaba.alink.operator.common.io.serde.RowToCsvSerialization;
import com.alibaba.alink.operator.common.io.serde.RowToJsonSerialization;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;
import org.apache.flink.types.Row;

import java.util.Properties;

public class Kafka011SinkBuilder {
    private String topic;
    private String format;
    private String fieldDelimiter;
    private String[] fieldNames;
    private TypeInformation<?>[] fieldTypes;
    private Properties properties;

    public void setTopic(String topic) {
        this.topic = topic;
    }

    public void setFieldDelimiter(String fieldDelimiter) {
        this.fieldDelimiter = fieldDelimiter;
    }

    public void setFieldNames(String[] fieldNames) {
        this.fieldNames = fieldNames;
    }

    public void setFieldTypes(TypeInformation<?>[] fieldTypes) {
        this.fieldTypes = fieldTypes;
    }

    public void setFormat(String format) {
        this.format = format;
    }

    public void setProperties(Properties properties) {
        this.properties = properties;
    }

    public Kafka011SinkBuilder() {
    }

    public RichSinkFunction<Row> build() {
        SerializationSchema<Row> serializationSchema;
        if (format.equalsIgnoreCase("csv")) {
            serializationSchema = new RowToCsvSerialization(fieldTypes, fieldDelimiter);
        } else if (format.equalsIgnoreCase("json")) {
            serializationSchema = new RowToJsonSerialization(fieldNames);
        } else {
            throw new IllegalArgumentException("Unknown format " + format);
        }
        return new FlinkKafkaProducer011<Row>(topic, serializationSchema, properties);
    }
}
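
A minimal usage sketch (not part of this commit) showing how the builder's sink could be attached to a Flink streaming job. The Kafka011SinkExample class name, bootstrap address, topic, and the two-column schema are hypothetical placeholders.

import com.alibaba.alink.operator.common.io.kafka011.Kafka011SinkBuilder;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.types.Row;

import java.util.Properties;

public class Kafka011SinkExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // A toy two-column (id, name) stream of rows.
        DataStream<Row> rows = env
                .fromElements(Row.of(1L, "alice"), Row.of(2L, "bob"))
                .returns(Types.ROW(Types.LONG, Types.STRING));

        // Producer properties for a Kafka 0.11 broker; the address is a placeholder.
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");

        // Configure the builder introduced in this commit and attach its sink.
        Kafka011SinkBuilder builder = new Kafka011SinkBuilder();
        builder.setTopic("example_topic");
        builder.setFormat("csv");
        builder.setFieldDelimiter(",");
        builder.setFieldNames(new String[]{"id", "name"});
        builder.setFieldTypes(new TypeInformation<?>[]{Types.LONG, Types.STRING});
        builder.setProperties(props);

        rows.addSink(builder.build());
        env.execute("kafka-0.11 sink example");
    }
}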