Skip to content

Commit

Permalink
[INLONG-10716][SDK] Inlong Transform usage optimization
Browse files Browse the repository at this point in the history
  • Loading branch information
vernedeng committed Jul 24, 2024
1 parent 4425586 commit 27d8b3f
Show file tree
Hide file tree
Showing 19 changed files with 172 additions and 42 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ public class DataFlowConfig implements Serializable {
private String auditTag;
private String inlongGroupId;
private String inlongStreamId;
private String transformSql;
private SourceConfig sourceConfig;
private SinkConfig sinkConfig;
private Map<String, Object> properties;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,6 @@
@Data
public class CsvConfig implements DataTypeConfig {

private char delimiter;
private Character delimiter;
private Character escapeChar;
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@
@Data
public class KvConfig implements DataTypeConfig {

private char entrySplitter;
private char kvSplitter;
private Character entrySplitter;
private Character kvSplitter;
private Character escapeChar;
private Character lineSeparator;
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,11 @@ public class CsvSourceDecoder implements SourceDecoder<String> {

public CsvSourceDecoder(CsvSourceInfo sourceInfo) {
this.sourceInfo = sourceInfo;
if (!StringUtils.isBlank(sourceInfo.getDelimiter())) {
this.delimiter = sourceInfo.getDelimiter().charAt(0);
if (sourceInfo.getDelimiter() != null) {
this.delimiter = sourceInfo.getDelimiter();
}
if (!StringUtils.isBlank(sourceInfo.getEscapeChar())) {
this.escapeChar = sourceInfo.getEscapeChar().charAt(0);
if (sourceInfo.getEscapeChar() != null) {
this.escapeChar = sourceInfo.getEscapeChar();
}
if (!StringUtils.isBlank(sourceInfo.getCharset())) {
this.srcCharset = Charset.forName(sourceInfo.getCharset());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,11 @@
public class KvSourceDecoder implements SourceDecoder<String> {

protected KvSourceInfo sourceInfo;
private Character entryDelimiter = '&';
private Character kvDelimiter = '=';
private Character escapeChar = '\\';
private Character quoteChar = '\"';
private Character lineDelimiter = '\n';
private Charset srcCharset = Charset.defaultCharset();
private List<FieldInfo> fields;

Expand All @@ -43,6 +48,22 @@ public KvSourceDecoder(KvSourceInfo sourceInfo) {
if (!StringUtils.isBlank(sourceInfo.getCharset())) {
this.srcCharset = Charset.forName(sourceInfo.getCharset());
}
if (sourceInfo.getEntryDelimiter() != null) {
this.entryDelimiter = sourceInfo.getEntryDelimiter();
}
if (sourceInfo.getKvDelimiter() != null) {
this.kvDelimiter = sourceInfo.getKvDelimiter();
}
if (sourceInfo.getEscapeChar() != null) {
this.escapeChar = sourceInfo.getEscapeChar();
}
if (sourceInfo.getQuoteChar() != null) {
this.quoteChar = sourceInfo.getQuoteChar();
}
if (sourceInfo.getLineDelimiter() != null) {
this.lineDelimiter = sourceInfo.getLineDelimiter();
}

this.fields = sourceInfo.getFields();
}

Expand All @@ -54,7 +75,8 @@ public SourceData decode(byte[] srcBytes, Context context) {

@Override
public SourceData decode(String srcString, Context context) {
List<Map<String, String>> rowValues = KvUtils.splitKv(srcString, '&', '=', '\\', '\"', '\n');
List<Map<String, String>> rowValues = KvUtils.splitKv(srcString, entryDelimiter, kvDelimiter,
escapeChar, quoteChar, lineDelimiter);
KvSourceData sourceData = new KvSourceData();
if (CollectionUtils.isEmpty(fields)) {
for (Map<String, String> row : rowValues) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,11 @@ public class CsvSinkEncoder implements SinkEncoder<String> {

public CsvSinkEncoder(CsvSinkInfo sinkInfo) {
this.sinkInfo = sinkInfo;
if (!StringUtils.isBlank(sinkInfo.getDelimiter())) {
this.delimiter = sinkInfo.getDelimiter().charAt(0);
if (sinkInfo.getDelimiter() != null) {
this.delimiter = sinkInfo.getDelimiter();
}
if (!StringUtils.isBlank(sinkInfo.getEscapeChar())) {
this.escapeChar = sinkInfo.getEscapeChar().charAt(0);
if (sinkInfo.getDelimiter() != null) {
this.escapeChar = sinkInfo.getEscapeChar();
}
if (!StringUtils.isBlank(sinkInfo.getCharset())) {
this.sinkCharset = Charset.forName(sinkInfo.getCharset());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ public class KvSinkEncoder implements SinkEncoder<String> {

protected KvSinkInfo sinkInfo;
protected Charset sinkCharset = Charset.defaultCharset();
private Character entryDelimiter = '&';
private Character kvDelimiter = '=';
private List<FieldInfo> fields;
private StringBuilder builder = new StringBuilder();

Expand All @@ -41,6 +43,12 @@ public KvSinkEncoder(KvSinkInfo sinkInfo) {
if (!StringUtils.isBlank(sinkInfo.getCharset())) {
this.sinkCharset = Charset.forName(sinkInfo.getCharset());
}
if (sinkInfo.getEntryDelimiter() != null) {
this.entryDelimiter = sinkInfo.getEntryDelimiter();
}
if (sinkInfo.getKvDelimiter() != null) {
this.kvDelimiter = sinkInfo.getKvDelimiter();
}
this.fields = sinkInfo.getFields();
}

Expand All @@ -55,13 +63,13 @@ public String encode(SinkData sinkData, Context context) {
if (fields == null || fields.size() == 0) {
for (String fieldName : sinkData.keyList()) {
String fieldValue = sinkData.getField(fieldName);
builder.append(fieldName).append('=').append(fieldValue).append('&');
builder.append(fieldName).append(kvDelimiter).append(fieldValue).append(entryDelimiter);
}
} else {
for (FieldInfo field : fields) {
String fieldName = field.getName();
String fieldValue = sinkData.getField(fieldName);
builder.append(fieldName).append('=').append(fieldValue).append('&');
builder.append(fieldName).append(kvDelimiter).append(fieldValue).append(entryDelimiter);
}
}
return builder.substring(0, builder.length() - 1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,15 +30,15 @@
@JsonIgnoreProperties(ignoreUnknown = true)
public class CsvSinkInfo extends SinkInfo {

private String delimiter;
private String escapeChar;
private Character delimiter;
private Character escapeChar;
private List<FieldInfo> fields;

@JsonCreator
public CsvSinkInfo(
@JsonProperty("charset") String charset,
@JsonProperty("delimiter") String delimiter,
@JsonProperty("escapeChar") String escapeChar,
@JsonProperty("delimiter") Character delimiter,
@JsonProperty("escapeChar") Character escapeChar,
@JsonProperty("fields") List<FieldInfo> fields) {
super(SourceInfo.CSV, charset);
this.delimiter = delimiter;
Expand All @@ -55,15 +55,15 @@ public CsvSinkInfo(
* @return the delimiter
*/
@JsonProperty("delimiter")
public String getDelimiter() {
public Character getDelimiter() {
return delimiter;
}

/**
* set delimiter
* @param delimiter the delimiter to set
*/
public void setDelimiter(String delimiter) {
public void setDelimiter(Character delimiter) {
this.delimiter = delimiter;
}

Expand All @@ -72,15 +72,15 @@ public void setDelimiter(String delimiter) {
* @return the escapeChar
*/
@JsonProperty("escapeChar")
public String getEscapeChar() {
public Character getEscapeChar() {
return escapeChar;
}

/**
* set escapeChar
* @param escapeChar the escapeChar to set
*/
public void setEscapeChar(String escapeChar) {
public void setEscapeChar(Character escapeChar) {
this.escapeChar = escapeChar;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonProperty;
import lombok.experimental.SuperBuilder;

import java.util.ArrayList;
import java.util.List;
Expand All @@ -28,17 +29,18 @@
* CsvSourceInfo
*/
@JsonIgnoreProperties(ignoreUnknown = true)
@SuperBuilder
public class CsvSourceInfo extends SourceInfo {

private String delimiter;
private String escapeChar;
private Character delimiter;
private Character escapeChar;
private List<FieldInfo> fields;

@JsonCreator
public CsvSourceInfo(
@JsonProperty("charset") String charset,
@JsonProperty("delimiter") String delimiter,
@JsonProperty("escapeChar") String escapeChar,
@JsonProperty("delimiter") Character delimiter,
@JsonProperty("escapeChar") Character escapeChar,
@JsonProperty("fields") List<FieldInfo> fields) {
super(charset);
this.delimiter = delimiter;
Expand All @@ -55,15 +57,15 @@ public CsvSourceInfo(
* @return the delimiter
*/
@JsonProperty("delimiter")
public String getDelimiter() {
public Character getDelimiter() {
return delimiter;
}

/**
* set delimiter
* @param delimiter the delimiter to set
*/
public void setDelimiter(String delimiter) {
public void setDelimiter(Character delimiter) {
this.delimiter = delimiter;
}

Expand All @@ -72,15 +74,15 @@ public void setDelimiter(String delimiter) {
* @return the escapeChar
*/
@JsonProperty("escapeChar")
public String getEscapeChar() {
public Character getEscapeChar() {
return escapeChar;
}

/**
* set escapeChar
* @param escapeChar the escapeChar to set
*/
public void setEscapeChar(String escapeChar) {
public void setEscapeChar(Character escapeChar) {
this.escapeChar = escapeChar;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonProperty;
import lombok.Data;
import lombok.experimental.SuperBuilder;

import java.util.ArrayList;
import java.util.List;
Expand All @@ -28,8 +30,12 @@
* KvSinkInfo
*/
@JsonIgnoreProperties(ignoreUnknown = true)
@Data
@SuperBuilder
public class KvSinkInfo extends SinkInfo {

private Character kvDelimiter;
private Character entryDelimiter;
private List<FieldInfo> fields;

@JsonCreator
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonProperty;
import lombok.Data;
import lombok.experimental.SuperBuilder;

import java.util.ArrayList;
import java.util.List;
Expand All @@ -28,8 +30,15 @@
* KvSourceInfo
*/
@JsonIgnoreProperties(ignoreUnknown = true)
@SuperBuilder
@Data
public class KvSourceInfo extends SourceInfo {

private Character entryDelimiter;
private Character kvDelimiter;
private Character escapeChar;
private Character quoteChar;
private Character lineDelimiter;
private List<FieldInfo> fields;

@JsonCreator
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonSubTypes.Type;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import lombok.experimental.SuperBuilder;

import java.util.Optional;

Expand All @@ -35,6 +36,7 @@
@Type(value = CsvSinkInfo.class, name = SourceInfo.CSV),
@Type(value = KvSinkInfo.class, name = SourceInfo.KV),
})
@SuperBuilder
public abstract class SinkInfo {

@JsonIgnore
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonSubTypes.Type;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import lombok.experimental.SuperBuilder;

import java.util.Optional;

Expand All @@ -34,6 +35,7 @@
@Type(value = PbSourceInfo.class, name = SourceInfo.PB),
@Type(value = JsonSourceInfo.class, name = SourceInfo.JSON),
})
@SuperBuilder
public abstract class SourceInfo {

public static final String NODE_PATH_SEPARTOR = ".";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,14 @@ public Object get(String key) {
return configuration.get(key);
}

public String getStringOrDefault(String key, String defaultValue) {
String str = getString(key);
if (str == null) {
return defaultValue;
}
return str;
}

public String getString(String key) {
Object obj = this.get(key);
if (obj != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,10 @@ public static BigDecimal parseBigDecimal(Object value) {
}
}

public static String parseString(Object value) {
return value.toString();
}

/**
* compareValue
* @param left
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public class TestTransformArithmeticFunctionsProcessor {
FieldInfo field = new FieldInfo();
field.setName("result");
dstFields.add(field);
csvSource = new CsvSourceInfo("UTF-8", "|", "\\", srcFields);
csvSource = new CsvSourceInfo("UTF-8", '|', '\\', srcFields);
kvSink = new KvSinkInfo("UTF-8", dstFields);
}

Expand Down
Loading

0 comments on commit 27d8b3f

Please sign in to comment.