Skip to content

Commit

Permalink
[Improve][Doris Connector] Unified serialization method,Use RowToJson…
Browse files Browse the repository at this point in the history
…Converter and TextSerializationSchema (apache#7229)

* 1

* 1

* 1

* Update seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/source/serialization/RowBatch.java

Co-authored-by: Jia Fan <fanjiaeminem@qq.com>

---------

Co-authored-by: gdliu3 <gdliu3@iflytek.com>
Co-authored-by: Jia Fan <fanjiaeminem@qq.com>
  • Loading branch information
3 people authored Jul 22, 2024
1 parent 4ec25f3 commit 4b3af9b
Show file tree
Hide file tree
Showing 11 changed files with 322 additions and 265 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@

import org.apache.seatunnel.api.table.converter.BasicTypeDefine;
import org.apache.seatunnel.api.table.converter.TypeConverter;
import org.apache.seatunnel.common.exception.CommonError;
import org.apache.seatunnel.connectors.doris.config.DorisConfig;

import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
Expand All @@ -37,7 +35,7 @@ public static TypeConverter<BasicTypeDefine> getTypeConverter(@NonNull String do
|| dorisVersion.toLowerCase(Locale.ROOT).startsWith("selectdb-doris-2.")) {
return DorisTypeConverterV2.INSTANCE;
} else {
throw CommonError.unsupportedVersion(DorisConfig.IDENTIFIER, dorisVersion);
return DorisTypeConverterV2.INSTANCE;
}
}
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -17,27 +17,28 @@

package org.apache.seatunnel.connectors.doris.serialize;

import org.apache.seatunnel.shade.com.fasterxml.jackson.core.JsonGenerator;
import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.ObjectMapper;

import org.apache.seatunnel.api.table.type.RowKind;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.connectors.doris.sink.writer.LoadConstants;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.seatunnel.format.json.JsonSerializationSchema;
import org.apache.seatunnel.format.text.TextSerializationSchema;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.StringJoiner;
import java.util.Arrays;
import java.util.List;

import static com.google.common.base.Preconditions.checkState;
import static org.apache.seatunnel.api.table.type.BasicType.STRING_TYPE;
import static org.apache.seatunnel.connectors.doris.sink.writer.LoadConstants.CSV;
import static org.apache.seatunnel.connectors.doris.sink.writer.LoadConstants.JSON;
import static org.apache.seatunnel.connectors.doris.sink.writer.LoadConstants.NULL_VALUE;

public class SeaTunnelRowSerializer extends SeaTunnelRowConverter implements DorisSerializer {
public class SeaTunnelRowSerializer implements DorisSerializer {
String type;
private ObjectMapper objectMapper;
private final SeaTunnelRowType seaTunnelRowType;
private final String fieldDelimiter;
private final boolean enableDelete;
Expand All @@ -51,48 +52,29 @@ public SeaTunnelRowSerializer(
this.seaTunnelRowType = seaTunnelRowType;
this.fieldDelimiter = fieldDelimiter;
this.enableDelete = enableDelete;
if (JSON.equals(type)) {
objectMapper = new ObjectMapper();
}
}

@Override
public byte[] serialize(SeaTunnelRow seaTunnelRow) throws IOException {
String valString;
if (JSON.equals(type)) {
valString = buildJsonString(seaTunnelRow);
} else if (CSV.equals(type)) {
valString = buildCSVString(seaTunnelRow);
} else {
throw new IllegalArgumentException("The type " + type + " is not supported!");
}
return valString.getBytes(StandardCharsets.UTF_8);
public byte[] buildJsonString(SeaTunnelRow row, SeaTunnelRowType seaTunnelRowType)
throws IOException {

JsonSerializationSchema jsonSerializationSchema =
new JsonSerializationSchema(seaTunnelRowType, NULL_VALUE);
ObjectMapper mapper = jsonSerializationSchema.getMapper();
mapper.configure(JsonGenerator.Feature.WRITE_BIGDECIMAL_AS_PLAIN, true);
return jsonSerializationSchema.serialize(row);
}

public String buildJsonString(SeaTunnelRow row) throws IOException {
Map<String, Object> rowMap = new HashMap<>(row.getFields().length);
public byte[] buildCSVString(SeaTunnelRow row, SeaTunnelRowType seaTunnelRowType)
throws IOException {

for (int i = 0; i < row.getFields().length; i++) {
Object value = convert(seaTunnelRowType.getFieldType(i), row.getField(i));
rowMap.put(seaTunnelRowType.getFieldName(i), value);
}
if (enableDelete) {
rowMap.put(LoadConstants.DORIS_DELETE_SIGN, parseDeleteSign(row.getRowKind()));
}
return objectMapper.writeValueAsString(rowMap);
}
TextSerializationSchema build =
TextSerializationSchema.builder()
.seaTunnelRowType(seaTunnelRowType)
.delimiter(fieldDelimiter)
.nullValue(NULL_VALUE)
.build();

public String buildCSVString(SeaTunnelRow row) throws IOException {
StringJoiner joiner = new StringJoiner(fieldDelimiter);
for (int i = 0; i < row.getFields().length; i++) {
Object field = convert(seaTunnelRowType.getFieldType(i), row.getField(i));
String value = field != null ? field.toString() : NULL_VALUE;
joiner.add(value);
}
if (enableDelete) {
joiner.add(parseDeleteSign(row.getRowKind()));
}
return joiner.toString();
return build.serialize(row);
}

public String parseDeleteSign(RowKind rowKind) {
Expand All @@ -105,46 +87,40 @@ public String parseDeleteSign(RowKind rowKind) {
}
}

public static Builder builder() {
return new Builder();
}

/** Builder for RowDataSerializer. */
public static class Builder {
private SeaTunnelRowType seaTunnelRowType;
private String type;
private String fieldDelimiter;
private boolean deletable;

public Builder setType(String type) {
this.type = type;
return this;
}
@Override
public void open() throws IOException {}

public Builder setSeaTunnelRowType(SeaTunnelRowType seaTunnelRowType) {
this.seaTunnelRowType = seaTunnelRowType;
return this;
}
@Override
public byte[] serialize(SeaTunnelRow seaTunnelRow) throws IOException {

public Builder setFieldDelimiter(String fieldDelimiter) {
this.fieldDelimiter = fieldDelimiter;
return this;
}
List<String> fieldNames = Arrays.asList(seaTunnelRowType.getFieldNames());
List<SeaTunnelDataType<?>> fieldTypes = Arrays.asList(seaTunnelRowType.getFieldTypes());

public Builder enableDelete(boolean deletable) {
this.deletable = deletable;
return this;
if (enableDelete) {
SeaTunnelRow seaTunnelRowEnableDelete = seaTunnelRow.copy();
seaTunnelRowEnableDelete.setField(
seaTunnelRow.getFields().length, parseDeleteSign(seaTunnelRow.getRowKind()));
fieldNames.add(LoadConstants.DORIS_DELETE_SIGN);
fieldTypes.add(STRING_TYPE);
}

public SeaTunnelRowSerializer build() {
checkState(CSV.equals(type) && fieldDelimiter != null || JSON.equals(type));
return new SeaTunnelRowSerializer(type, seaTunnelRowType, fieldDelimiter, deletable);
if (JSON.equals(type)) {
return buildJsonString(
seaTunnelRow,
new SeaTunnelRowType(
fieldNames.toArray(new String[0]),
fieldTypes.toArray(new SeaTunnelDataType<?>[0])));
} else if (CSV.equals(type)) {
return buildCSVString(
seaTunnelRow,
new SeaTunnelRowType(
fieldNames.toArray(new String[0]),
fieldTypes.toArray(new SeaTunnelDataType<?>[0])));
} else {
throw new IllegalArgumentException("The type " + type + " is not supported!");
}
}

@Override
public void open() throws IOException {}

@Override
public void close() throws IOException {}
}
Loading

0 comments on commit 4b3af9b

Please sign in to comment.