Skip to content

Commit

Permalink
[INLONG-9280][Manager] Support different size of extended fields of I…
Browse files Browse the repository at this point in the history
…nlongStream
  • Loading branch information
vernedeng committed Nov 14, 2023
1 parent f1ba307 commit 9cc42f4
Show file tree
Hide file tree
Showing 9 changed files with 47 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,15 @@ public class ClsSinkDTO {
@ApiModelProperty("Cloud log service index tokenizer")
private String tokenizer;

@ApiModelProperty("contentOffset")
private Integer contentOffset = 0;

@ApiModelProperty("fieldOffset")
private Integer fieldOffset;

@ApiModelProperty("separator")
private String separator;

/**
* Get the dto instance from the request
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ public class ElasticsearchSinkDTO {
private String indexNamePattern;

@ApiModelProperty("contentOffset")
private Integer contentOffset;
private Integer contentOffset = 0;

@ApiModelProperty("fieldOffset")
private Integer fieldOffset;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,4 @@ public class ElasticsearchSinkRequest extends SinkRequest {
@ApiModelProperty("indexNamePattern")
private String indexNamePattern;

@ApiModelProperty("contentOffset")
private Integer contentOffset;

@ApiModelProperty("fieldOffset")
private Integer fieldOffset;

@ApiModelProperty("separator")
private String separator;

}
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,9 @@ public class InlongStreamExtParam implements Serializable {
@ApiModelProperty(value = "Predefined fields")
private String predefinedFields;

@ApiModelProperty(value = "Extended field size")
private Integer extendedFieldSize = 0;

/**
* Pack extended attributes into ExtParams
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,9 @@ public class InlongStreamInfo extends BaseInlongStream {
@ApiModelProperty(value = "If use extended fields")
private Boolean useExtendedFields = false;

@ApiModelProperty(value = "Extended field size")
private Integer extendedFieldSize = 0;

@ApiModelProperty(value = "Whether to ignore the parse errors of field value")
private Boolean ignoreParseError = true;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,9 @@ public class InlongStreamRequest extends BaseInlongStream {
@ApiModelProperty(value = "If use extended fields")
private Boolean useExtendedFields = false;

@ApiModelProperty(value = "Extended field size")
private Integer extendedFieldSize = 0;

@ApiModelProperty(value = "The message body wrap type, including: RAW, INLONG_MSG_V0, INLONG_MSG_V1, PB, etc")
private String wrapType;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,12 @@
import org.apache.inlong.common.pojo.sortstandalone.SortClusterConfig;
import org.apache.inlong.common.pojo.sortstandalone.SortClusterResponse;
import org.apache.inlong.common.pojo.sortstandalone.SortTaskConfig;
import org.apache.inlong.manager.common.util.JsonUtils;
import org.apache.inlong.manager.dao.entity.DataNodeEntity;
import org.apache.inlong.manager.dao.entity.StreamSinkEntity;
import org.apache.inlong.manager.pojo.node.DataNodeInfo;
import org.apache.inlong.manager.pojo.sort.standalone.SortFieldInfo;
import org.apache.inlong.manager.pojo.sort.standalone.SortSourceStreamInfo;
import org.apache.inlong.manager.pojo.sort.standalone.SortTaskInfo;
import org.apache.inlong.manager.pojo.stream.InlongStreamExtParam;
import org.apache.inlong.manager.service.core.SortClusterService;
import org.apache.inlong.manager.service.core.SortConfigLoader;
import org.apache.inlong.manager.service.node.DataNodeOperator;
Expand All @@ -37,7 +35,6 @@

import com.google.gson.Gson;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.commons.lang3.ObjectUtils;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -281,7 +278,6 @@ private List<Map<String, String>> parseIdParams(List<StreamSinkEntity> streams,
StreamSinkOperator operator = sinkOperatorFactory.getInstance(streamSink.getSinkType());
List<String> fields = fieldMap.get(streamSink.getId());
Map<String, String> params = operator.parse2IdParams(streamSink, fields, dataNodeInfo);
setFiledOffset(streamSink, params);
return params;
} catch (Exception e) {
LOGGER.error("fail to parse id params of groupId={}, streamId={} name={}, type={}}",
Expand All @@ -294,17 +290,6 @@ private List<Map<String, String>> parseIdParams(List<StreamSinkEntity> streams,
.collect(Collectors.toList());
}

private void setFiledOffset(StreamSinkEntity streamSink, Map<String, String> params) {

SortSourceStreamInfo sortSourceStreamInfo = allStreams.get(streamSink.getInlongGroupId())
.get(streamSink.getInlongStreamId());
InlongStreamExtParam inlongStreamExtParam = JsonUtils.parseObject(
sortSourceStreamInfo.getExtParams(), InlongStreamExtParam.class);
if (ObjectUtils.anyNotNull(inlongStreamExtParam) && !inlongStreamExtParam.getUseExtendedFields()) {
params.put(FILED_OFFSET, String.valueOf(0));
}
}

private Map<String, String> parseSinkParams(DataNodeInfo nodeInfo) {
DataNodeOperator operator = dataNodeOperatorFactory.getInstance(nodeInfo.getType());
return operator.parse2SinkParams(nodeInfo);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.inlong.manager.common.util.CommonBeanUtils;
import org.apache.inlong.manager.common.util.JsonUtils;
import org.apache.inlong.manager.dao.entity.DataNodeEntity;
import org.apache.inlong.manager.dao.entity.InlongStreamEntity;
import org.apache.inlong.manager.dao.entity.StreamSinkEntity;
import org.apache.inlong.manager.dao.mapper.DataNodeEntityMapper;
import org.apache.inlong.manager.pojo.node.DataNodeInfo;
Expand All @@ -36,6 +37,7 @@
import org.apache.inlong.manager.pojo.sink.cls.ClsSink;
import org.apache.inlong.manager.pojo.sink.cls.ClsSinkDTO;
import org.apache.inlong.manager.pojo.sink.cls.ClsSinkRequest;
import org.apache.inlong.manager.pojo.stream.InlongStreamExtParam;
import org.apache.inlong.manager.service.sink.AbstractSinkOperator;

import com.fasterxml.jackson.databind.ObjectMapper;
Expand Down Expand Up @@ -74,6 +76,15 @@ protected void setTargetEntity(SinkRequest request, StreamSinkEntity targetEntit
ClsSinkRequest sinkRequest = (ClsSinkRequest) request;
try {
ClsSinkDTO dto = ClsSinkDTO.getFromRequest(sinkRequest, targetEntity.getExtParams());

InlongStreamEntity stream = inlongStreamEntityMapper
.selectByIdentifier(request.getInlongGroupId(), request.getInlongStreamId());
dto.setSeparator(String.valueOf((char) (Integer.parseInt(stream.getDataSeparator()))));

InlongStreamExtParam streamExt =
JsonUtils.parseObject(stream.getExtParams(), InlongStreamExtParam.class);
dto.setFieldOffset(streamExt.getExtendedFieldSize());

targetEntity.setExtParams(objectMapper.writeValueAsString(dto));
} catch (Exception e) {
throw new BusinessException(ErrorCodeEnum.SINK_SAVE_FAILED,
Expand Down Expand Up @@ -116,16 +127,16 @@ public Map<String, String> parse2IdParams(StreamSinkEntity streamSink, List<Stri
DataNodeInfo dataNodeInfo) {
Map<String, String> params = super.parse2IdParams(streamSink, fields, dataNodeInfo);
ClsSinkDTO clsSinkDTO = JsonUtils.parseObject(streamSink.getExtParams(), ClsSinkDTO.class);
params.put(TOPIC_ID, clsSinkDTO.getTopicId());
params.computeIfAbsent(TOPIC_ID, k -> clsSinkDTO.getTopicId());
ClsDataNodeInfo clsDataNodeInfo = (ClsDataNodeInfo) dataNodeInfo;
params.put(SECRET_ID, clsDataNodeInfo.getSendSecretId());
params.put(SECRET_KEY, clsDataNodeInfo.getSendSecretKey());
params.put(END_POINT, clsDataNodeInfo.getEndpoint());
params.computeIfAbsent(SECRET_ID, k -> clsDataNodeInfo.getSendSecretId());
params.computeIfAbsent(SECRET_KEY, k -> clsDataNodeInfo.getSendSecretKey());
params.computeIfAbsent(END_POINT, k -> clsDataNodeInfo.getEndpoint());
StringBuilder fieldNames = new StringBuilder();
for (String field : fields) {
fieldNames.append(field).append(InlongConstants.BLANK);
}
params.put(KEY_FIELDS, fieldNames.toString());
params.computeIfAbsent(KEY_FIELDS, k -> fieldNames.toString());
return params;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
import org.apache.inlong.manager.common.exceptions.BusinessException;
import org.apache.inlong.manager.common.util.CommonBeanUtils;
import org.apache.inlong.manager.common.util.JsonUtils;
import org.apache.inlong.manager.dao.entity.InlongStreamEntity;
import org.apache.inlong.manager.dao.entity.StreamSinkEntity;
import org.apache.inlong.manager.dao.entity.StreamSinkFieldEntity;
import org.apache.inlong.manager.pojo.node.DataNodeInfo;
Expand All @@ -32,6 +34,7 @@
import org.apache.inlong.manager.pojo.sink.es.ElasticsearchSink;
import org.apache.inlong.manager.pojo.sink.es.ElasticsearchSinkDTO;
import org.apache.inlong.manager.pojo.sink.es.ElasticsearchSinkRequest;
import org.apache.inlong.manager.pojo.stream.InlongStreamExtParam;
import org.apache.inlong.manager.service.sink.AbstractSinkOperator;

import com.fasterxml.jackson.databind.ObjectMapper;
Expand Down Expand Up @@ -78,6 +81,14 @@ protected void setTargetEntity(SinkRequest request, StreamSinkEntity targetEntit
try {
ElasticsearchSinkDTO dto = ElasticsearchSinkDTO.getFromRequest(sinkRequest, targetEntity.getExtParams());

InlongStreamEntity stream = inlongStreamEntityMapper
.selectByIdentifier(request.getInlongGroupId(), request.getInlongStreamId());
dto.setSeparator(String.valueOf((char) (Integer.parseInt(stream.getDataSeparator()))));

InlongStreamExtParam streamExt =
JsonUtils.parseObject(stream.getExtParams(), InlongStreamExtParam.class);
dto.setFieldOffset(streamExt.getExtendedFieldSize());

targetEntity.setExtParams(objectMapper.writeValueAsString(dto));
} catch (Exception e) {
throw new BusinessException(ErrorCodeEnum.SINK_SAVE_FAILED,
Expand Down Expand Up @@ -108,7 +119,7 @@ public Map<String, String> parse2IdParams(StreamSinkEntity streamSink, List<Stri
for (String field : fields) {
sb.append(field).append(" ");
}
idParams.put(KEY_FIELDS, sb.toString());
idParams.computeIfAbsent(KEY_FIELDS, k -> sb.toString());
return idParams;
}

Expand Down

0 comments on commit 9cc42f4

Please sign in to comment.