Skip to content

Commit

Permalink
[INLONG-10370][Manager] Support configuration of KV data format (#10371)
Browse files Browse the repository at this point in the history
  • Loading branch information
fuweng11 authored Jun 7, 2024
1 parent 26927f5 commit e9cbc36
Show file tree
Hide file tree
Showing 15 changed files with 87 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
public enum DataTypeEnum {

CSV("csv"),
KV("kv"),
AVRO("avro"),
JSON("json"),
CANAL("canal"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.inlong.common.util;

import org.apache.commons.lang3.StringUtils;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;

Expand Down Expand Up @@ -274,4 +276,11 @@ public static long parseDateTime(String value) {
}
}

/**
 * Converts a decimal character code into the one-character string it denotes.
 *
 * <p>When {@code charStr} is numeric according to {@link StringUtils#isNumeric(CharSequence)}
 * it is parsed as a decimal UTF-16 code unit, e.g. {@code "124"} becomes {@code "|"}.
 * Any other value is returned unchanged, so callers may pass either the literal
 * character or its numeric code.
 *
 * @param charStr the literal string or a decimal character code
 * @return the single-character string for a numeric code, otherwise {@code charStr} itself
 * @throws IllegalArgumentException if the numeric code does not fit into a {@code char}
 *         (greater than 65535); a code too large for an {@code int} surfaces as the
 *         {@link NumberFormatException} raised by {@link Integer#parseInt(String)}
 */
public static String parseChar(String charStr) {
    if (!StringUtils.isNumeric(charStr)) {
        return charStr;
    }
    int code = Integer.parseInt(charStr);
    // A plain (char) cast would silently wrap codes above 0xFFFF to an unrelated character
    // (e.g. 65601 -> 'A'); fail loudly instead of producing a wrong separator.
    if (code > Character.MAX_VALUE) {
        throw new IllegalArgumentException(
                String.format("Character code %s is out of range [0, %d]", charStr, (int) Character.MAX_VALUE));
    }
    return Character.toString((char) code);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import org.apache.inlong.common.enums.DataTypeEnum;
import org.apache.inlong.common.enums.MessageWrapType;
import org.apache.inlong.common.util.StringUtil;
import org.apache.inlong.manager.common.fieldtype.strategy.FieldTypeMappingStrategy;
import org.apache.inlong.manager.pojo.sort.util.FieldInfoUtils;
import org.apache.inlong.manager.pojo.stream.StreamField;
Expand All @@ -32,10 +33,10 @@
import org.apache.inlong.sort.protocol.node.format.Format;
import org.apache.inlong.sort.protocol.node.format.InLongMsgFormat;
import org.apache.inlong.sort.protocol.node.format.JsonFormat;
import org.apache.inlong.sort.protocol.node.format.KvFormat;
import org.apache.inlong.sort.protocol.node.format.RawFormat;

import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;

import java.util.List;
import java.util.Objects;
Expand Down Expand Up @@ -90,24 +91,25 @@ default List<FieldInfo> parseStreamFieldInfos(List<StreamField> streamFields, St
* Parse format
*
* @param serializationType data serialization type, supports: csv, kv, json, canal, avro, etc
* @param wrapWithInlongMsg whether wrap content with {@link InLongMsgFormat}
* @param wrapType the message body wrap type, such as RAW, INLONG_MSG_V0 or INLONG_MSG_V1
* @param separatorStr the separator of data content
* @param kvSeparatorStr the kv separator
* @param escapeCharStr the escape char
* @param ignoreParseErrors whether to ignore data that fails deserialization
* @return the format for serialized content
*/
default Format parsingFormat(
String serializationType,
String wrapType,
String separatorStr,
String kvSeparatorStr,
String escapeCharStr,
Boolean ignoreParseErrors) {
Format format;
DataTypeEnum dataType = DataTypeEnum.forType(serializationType);
switch (dataType) {
case CSV:
if (StringUtils.isNumeric(separatorStr)) {
char dataSeparator = (char) Integer.parseInt(separatorStr);
separatorStr = Character.toString(dataSeparator);
}
separatorStr = StringUtil.parseChar(separatorStr);
CsvFormat csvFormat = new CsvFormat(separatorStr);
csvFormat.setIgnoreParseErrors(ignoreParseErrors);
format = csvFormat;
Expand All @@ -131,6 +133,12 @@ default Format parsingFormat(
case RAW:
format = new RawFormat();
break;
case KV:
separatorStr = StringUtil.parseChar(separatorStr);
kvSeparatorStr = StringUtil.parseChar(kvSeparatorStr);
escapeCharStr = StringUtil.parseChar(escapeCharStr);
format = new KvFormat(separatorStr, kvSeparatorStr, escapeCharStr, ignoreParseErrors, null, null, null);
break;
default:
throw new IllegalArgumentException(String.format("Unsupported dataType=%s", dataType));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@ public ExtractNode createExtractNode(StreamNode streamNodeInfo) {
kafkaSource.getSerializationType(),
kafkaSource.getWrapType(),
kafkaSource.getDataSeparator(),
kafkaSource.getKvSeparator(),
kafkaSource.getDataEscapeChar(),
kafkaSource.getIgnoreParseError());

KafkaScanStartupMode startupMode = parseStartupMode(kafkaSource.getAutoOffsetReset());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ public ExtractNode createExtractNode(StreamNode streamNodeInfo) {
Format format = parsingFormat(pulsarSource.getSerializationType(),
pulsarSource.getWrapType(),
pulsarSource.getDataSeparator(),
pulsarSource.getKvSeparator(),
pulsarSource.getDataEscapeChar(),
pulsarSource.getIgnoreParseError());

PulsarScanStartupMode startupMode = PulsarScanStartupMode.forName(pulsarSource.getScanStartupMode());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ public ExtractNode createExtractNode(StreamNode streamNodeInfo) {
source.getSerializationType(),
source.getWrapType(),
source.getDataSeparator(),
source.getKvSeparator(),
source.getDataEscapeChar(),
source.getIgnoreParseError());
Map<String, String> properties = parseProperties(source.getProperties());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,9 @@ public class KafkaSource extends StreamSource {
@ApiModelProperty(value = "Data separator")
private String dataSeparator;

@ApiModelProperty(value = "KV separator")
private String kvSeparator;

@ApiModelProperty(value = "Data field escape symbol")
private String dataEscapeChar;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@

import javax.validation.constraints.NotNull;

import java.nio.charset.StandardCharsets;
import java.util.Map;

/**
Expand Down Expand Up @@ -91,10 +92,13 @@ public class KafkaSourceDTO {
private String primaryKey;

@ApiModelProperty(value = "Data encoding format: UTF-8, GBK")
private String dataEncoding;
private String dataEncoding = StandardCharsets.UTF_8.toString();

@ApiModelProperty(value = "Data separator")
private String dataSeparator;
private String dataSeparator = String.valueOf((int) '|');

@ApiModelProperty(value = "KV separator")
private String kvSeparator;

@ApiModelProperty(value = "Data field escape symbol")
private String dataEscapeChar;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,6 @@
import lombok.EqualsAndHashCode;
import lombok.ToString;

import java.nio.charset.StandardCharsets;

/**
* Kafka source request
*/
Expand Down Expand Up @@ -80,10 +78,13 @@ public class KafkaSourceRequest extends SourceRequest {
private String primaryKey;

@ApiModelProperty(value = "Data encoding format: UTF-8, GBK")
private String dataEncoding = StandardCharsets.UTF_8.toString();
private String dataEncoding;

@ApiModelProperty(value = "Data separator")
private String dataSeparator = String.valueOf((int) '|');
private String dataSeparator;

@ApiModelProperty(value = "KV separator")
private String kvSeparator;

@ApiModelProperty(value = "Data field escape symbol")
private String dataEscapeChar;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,9 @@ public class PulsarSource extends StreamSource {
@ApiModelProperty(value = "Data separator")
private String dataSeparator;

@ApiModelProperty(value = "KV separator")
private String kvSeparator;

@ApiModelProperty(value = "Data field escape symbol")
private String dataEscapeChar;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@

import javax.validation.constraints.NotNull;

import java.nio.charset.StandardCharsets;
import java.util.Map;

/**
Expand Down Expand Up @@ -64,10 +65,13 @@ public class PulsarSourceDTO {
private String primaryKey;

@ApiModelProperty(value = "Data encoding format: UTF-8, GBK")
private String dataEncoding;
private String dataEncoding = StandardCharsets.UTF_8.toString();

@ApiModelProperty(value = "Data separator")
private String dataSeparator;
private String dataSeparator = String.valueOf((int) '|');

@ApiModelProperty(value = "KV separator")
private String kvSeparator;

@ApiModelProperty(value = "Data field escape symbol")
private String dataEscapeChar;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,6 @@
import lombok.EqualsAndHashCode;
import lombok.ToString;

import java.nio.charset.StandardCharsets;

/**
* Pulsar source request
*/
Expand Down Expand Up @@ -61,10 +59,13 @@ public class PulsarSourceRequest extends SourceRequest {
private String primaryKey;

@ApiModelProperty(value = "Data encoding format: UTF-8, GBK")
private String dataEncoding = StandardCharsets.UTF_8.toString();
private String dataEncoding;

@ApiModelProperty(value = "Data separator")
private String dataSeparator = String.valueOf((int) '|');
private String dataSeparator;

@ApiModelProperty(value = "KV separator")
private String kvSeparator;

@ApiModelProperty(value = "Data field escape symbol")
private String dataEscapeChar;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,18 @@ public class TubeMQSource extends StreamSource {
@ApiModelProperty("Session key of the TubeMQ")
private String sessionKey;

@ApiModelProperty(value = "Data encoding format: UTF-8, GBK")
private String dataEncoding;

@ApiModelProperty(value = "Data separator")
private String dataSeparator;

@ApiModelProperty(value = "KV separator")
private String kvSeparator;

@ApiModelProperty(value = "Data field escape symbol")
private String dataEscapeChar;

/**
* The TubeMQ consumers use this streamId set to filter records reading from server.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@

import javax.validation.constraints.NotNull;

import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.TreeSet;

Expand All @@ -55,8 +56,17 @@ public class TubeMQSourceDTO {
@ApiModelProperty("Session key of the TubeMQ")
private String sessionKey;

@ApiModelProperty(value = "Data encoding format: UTF-8, GBK")
private String dataEncoding = StandardCharsets.UTF_8.toString();

@ApiModelProperty(value = "Data separator")
private String dataSeparator;
private String dataSeparator = String.valueOf((int) '|');

@ApiModelProperty(value = "KV separator")
private String kvSeparator;

@ApiModelProperty(value = "Data field escape symbol")
private String dataEscapeChar;

@ApiModelProperty(value = "The message body wrap wrap type, including: RAW, INLONG_MSG_V0, INLONG_MSG_V1, etc")
private String wrapType;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,18 @@ public class TubeMQSourceRequest extends SourceRequest {
@ApiModelProperty("Session key of the TubeMQ")
private String sessionKey;

@ApiModelProperty(value = "Data encoding format: UTF-8, GBK")
private String dataEncoding;

@ApiModelProperty(value = "Data separator")
private String dataSeparator;

@ApiModelProperty(value = "KV separator")
private String kvSeparator;

@ApiModelProperty(value = "Data field escape symbol")
private String dataEscapeChar;

@ApiModelProperty(value = "The message body wrap wrap type, including: RAW, INLONG_MSG_V0, INLONG_MSG_V1, etc")
private String wrapType;

Expand Down

0 comments on commit e9cbc36

Please sign in to comment.