Skip to content

Commit

Permalink
[INLONG-10684][SDK] Inlong transform supports context
Browse files Browse the repository at this point in the history
  • Loading branch information
vernedeng committed Jul 23, 2024
1 parent 8f03397 commit 35773d6
Show file tree
Hide file tree
Showing 41 changed files with 270 additions and 129 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,13 @@

import org.apache.inlong.sdk.transform.pojo.CsvSourceInfo;
import org.apache.inlong.sdk.transform.pojo.FieldInfo;
import org.apache.inlong.sdk.transform.process.Context;

import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;

import java.nio.charset.Charset;
import java.util.List;
import java.util.Map;

/**
* CsvSourceDecoder
Expand Down Expand Up @@ -53,19 +54,19 @@ public CsvSourceDecoder(CsvSourceInfo sourceInfo) {
}

@Override
public SourceData decode(byte[] srcBytes, Map<String, Object> extParams) {
public SourceData decode(byte[] srcBytes, Context context) {
String srcString = new String(srcBytes, srcCharset);
return this.decode(srcString, extParams);
return this.decode(srcString, context);
}

@Override
public SourceData decode(String srcString, Map<String, Object> extParams) {
public SourceData decode(String srcString, Context context) {
String[][] rowValues = SplitUtils.splitCsv(srcString, delimiter, escapeChar, '\"', '\n', true);
CsvSourceData sourceData = new CsvSourceData();
for (int i = 0; i < rowValues.length; i++) {
String[] fieldValues = rowValues[i];
sourceData.addRow();
if (fields == null || fields.size() == 0) {
if (CollectionUtils.isEmpty(fields)) {
for (int j = 0; j < fieldValues.length; j++) {
String fieldName = SourceData.FIELD_DEFAULT_PREFIX + (j + 1);
sourceData.putField(fieldName, fieldValues[j]);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,18 @@
package org.apache.inlong.sdk.transform.decode;

import org.apache.inlong.sdk.transform.pojo.JsonSourceInfo;
import org.apache.inlong.sdk.transform.process.Context;

import com.google.gson.Gson;
import com.google.gson.JsonArray;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;

import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/**
* JsonSourceDecoder
Expand Down Expand Up @@ -65,26 +66,26 @@ public JsonSourceDecoder(JsonSourceInfo sourceInfo) {
/**
* decode
* @param srcBytes
* @param extParams
* @param context
* @return
*/
@Override
public SourceData decode(byte[] srcBytes, Map<String, Object> extParams) {
public SourceData decode(byte[] srcBytes, Context context) {
String srcString = new String(srcBytes, srcCharset);
return this.decode(srcString, extParams);
return this.decode(srcString, context);
}

/**
* decode
* @param srcString
* @param extParams
* @param context
* @return
*/
@Override
public SourceData decode(String srcString, Map<String, Object> extParams) {
public SourceData decode(String srcString, Context context) {
JsonObject root = gson.fromJson(srcString, JsonObject.class);
JsonArray childRoot = null;
if (this.childNodes != null && this.childNodes.size() > 0) {
if (CollectionUtils.isEmpty(childNodes)) {
JsonElement current = root;
for (JsonNode node : childNodes) {
if (!current.isJsonObject()) {
Expand Down Expand Up @@ -117,7 +118,6 @@ public SourceData decode(String srcString, Map<String, Object> extParams) {
}
childRoot = current.getAsJsonArray();
}
SourceData sourceData = new JsonSourceData(root, childRoot);
return sourceData;
return new JsonSourceData(root, childRoot);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@

import org.apache.inlong.sdk.transform.pojo.FieldInfo;
import org.apache.inlong.sdk.transform.pojo.KvSourceInfo;
import org.apache.inlong.sdk.transform.process.Context;

import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;

import java.nio.charset.Charset;
Expand All @@ -45,19 +47,19 @@ public KvSourceDecoder(KvSourceInfo sourceInfo) {
}

@Override
public SourceData decode(byte[] srcBytes, Map<String, Object> extParams) {
public SourceData decode(byte[] srcBytes, Context context) {
String srcString = new String(srcBytes, srcCharset);
return this.decode(srcString, extParams);
return this.decode(srcString, context);
}

@Override
public SourceData decode(String srcString, Map<String, Object> extParams) {
public SourceData decode(String srcString, Context context) {
List<Map<String, String>> rowValues = KvUtils.splitKv(srcString, '&', '=', '\\', '\"', '\n');
KvSourceData sourceData = new KvSourceData();
if (fields == null || fields.size() == 0) {
if (CollectionUtils.isEmpty(fields)) {
for (Map<String, String> row : rowValues) {
sourceData.addRow();
row.forEach((k, v) -> sourceData.putField(k, v));
row.forEach(sourceData::putField);
}
return sourceData;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.inlong.sdk.transform.decode;

import org.apache.inlong.sdk.transform.pojo.PbSourceInfo;
import org.apache.inlong.sdk.transform.process.Context;

import com.google.protobuf.DescriptorProtos;
import com.google.protobuf.Descriptors;
Expand Down Expand Up @@ -89,13 +90,13 @@ public PbSourceDecoder(PbSourceInfo sourceInfo) {
/**
* decode
* @param srcBytes
* @param extParams
* @param context
* @return
* @throws InvalidProtocolBufferException
*/
@SuppressWarnings("unchecked")
@Override
public SourceData decode(byte[] srcBytes, Map<String, Object> extParams) {
public SourceData decode(byte[] srcBytes, Context context) {
try {
// decode
DynamicMessage.Builder builder = DynamicMessage.newBuilder(rootDesc);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,15 @@

package org.apache.inlong.sdk.transform.decode;

import java.util.Map;
import org.apache.inlong.sdk.transform.process.Context;

/**
* SourceDecoder
*/
public interface SourceDecoder<Input> {

SourceData decode(byte[] srcBytes, Map<String, Object> extParams);
SourceData decode(byte[] srcBytes, Context context);

SourceData decode(Input input, Map<String, Object> extParams);
SourceData decode(Input input, Context context);

}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import org.apache.inlong.sdk.transform.pojo.CsvSinkInfo;
import org.apache.inlong.sdk.transform.pojo.FieldInfo;
import org.apache.inlong.sdk.transform.process.Context;

import org.apache.commons.lang3.StringUtils;

Expand Down Expand Up @@ -57,7 +58,7 @@ public CsvSinkEncoder(CsvSinkInfo sinkInfo) {
* @return
*/
@Override
public String encode(SinkData sinkData) {
public String encode(SinkData sinkData, Context context) {
builder.delete(0, builder.length());
if (fields == null || fields.size() == 0) {
if (escapeChar == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import org.apache.inlong.sdk.transform.pojo.FieldInfo;
import org.apache.inlong.sdk.transform.pojo.KvSinkInfo;
import org.apache.inlong.sdk.transform.process.Context;

import org.apache.commons.lang3.StringUtils;

Expand Down Expand Up @@ -49,7 +50,7 @@ public KvSinkEncoder(KvSinkInfo sinkInfo) {
* @return
*/
@Override
public String encode(SinkData sinkData) {
public String encode(SinkData sinkData, Context context) {
builder.delete(0, builder.length());
if (fields == null || fields.size() == 0) {
for (String fieldName : sinkData.keyList()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.inlong.sdk.transform.encode;

import org.apache.inlong.sdk.transform.pojo.FieldInfo;
import org.apache.inlong.sdk.transform.process.Context;

import java.util.List;

Expand All @@ -26,7 +27,7 @@
*/
public interface SinkEncoder<Output> {

Output encode(SinkData sinkData);
Output encode(SinkData sinkData, Context context);

List<FieldInfo> getFields();
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableMap;

import java.util.Map;

/**
* TransformConfig
Expand All @@ -28,9 +31,19 @@ public class TransformConfig {
@JsonProperty("transformSql")
private String transformSql;

@JsonProperty("configuration")
private Map<String, Object> configuration;

@JsonCreator
public TransformConfig(@JsonProperty("transformSql") String transformSql) {
this(transformSql, ImmutableMap.of());
}

@JsonCreator
public TransformConfig(@JsonProperty("transformSql") String transformSql,
@JsonProperty("configuration") Map<String, Object> configuration) {
this.transformSql = transformSql;
this.configuration = configuration;
}

/**
Expand All @@ -42,6 +55,11 @@ public String getTransformSql() {
return transformSql;
}

@JsonProperty("configuration")
public Map<String, Object> getConfiguration() {
return configuration;
}

/**
* set transformSql
* @param transformSql the transformSql to set
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.sdk.transform.process;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class Context {

private final Map<String, Object> configuration;
private final Map<String, Object> extParams;
private final Map<String, Object> runtimeParams;

public Context(Map<String, Object> configuration, Map<String, Object> extParams) {
this.configuration = configuration;
this.extParams = extParams;
this.runtimeParams = new ConcurrentHashMap<>();
}

public Object put(String key, Object value) {
return runtimeParams.put(key, value);
}

public Object get(String key) {
Object obj = runtimeParams.get(key);
if (obj != null) {
return obj;
}
obj = extParams.get(key);
if (obj != null) {
return obj;
}
return configuration.get(key);
}

public String getString(String key) {
Object obj = this.get(key);
if (obj != null) {
return obj.toString();
}
return null;
}

public Integer getInteger(String key) {
Object obj = this.get(key);
if (obj != null) {
return Integer.getInteger(obj.toString());
}
return null;
}

public Long getLong(String key) {
Object obj = this.get(key);
if (obj != null) {
return Long.getLong(obj.toString());
}
return null;
}

}
Loading

0 comments on commit 35773d6

Please sign in to comment.