From 076f894b201a4f9492c649df9f3bc48bf9087041 Mon Sep 17 00:00:00 2001 From: fuweng11 <76141879+fuweng11@users.noreply.github.com> Date: Wed, 7 Aug 2024 15:40:00 +0800 Subject: [PATCH] [INLONG-10758][Manager] Support obtaining serialization configuration when wrapType is raw (#10759) --- .../common/constant/DeserializationType.java | 9 ++++--- .../DeserializationConfig.java | 1 + .../RawDeserializationConfig.java | 26 +++++++++++++++++++ .../message/RawMsgDeserializeOperator.java | 9 +++++++ .../src/test/java/SortConfig.conf | 4 +-- .../src/test/resources/SortConfig.conf | 4 +-- 6 files changed, 45 insertions(+), 8 deletions(-) create mode 100644 inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/dataflow/deserialization/RawDeserializationConfig.java diff --git a/inlong-common/src/main/java/org/apache/inlong/common/constant/DeserializationType.java b/inlong-common/src/main/java/org/apache/inlong/common/constant/DeserializationType.java index 66999634fac..31c7fb26e42 100644 --- a/inlong-common/src/main/java/org/apache/inlong/common/constant/DeserializationType.java +++ b/inlong-common/src/main/java/org/apache/inlong/common/constant/DeserializationType.java @@ -19,8 +19,9 @@ public class DeserializationType { - public static final String INLONG_MSG = "INLONG_MSG"; - public static final String INLONG_MSG_PB = "INLONG_MSG_PB"; - public static final String CSV = "CSV"; - public static final String KV = "KV"; + public static final String INLONG_MSG = "inlong_msg"; + public static final String INLONG_MSG_PB = "inlong_msg_pb"; + public static final String RAW = "raw"; + public static final String CSV = "csv"; + public static final String KV = "kv"; } diff --git a/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/dataflow/deserialization/DeserializationConfig.java b/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/dataflow/deserialization/DeserializationConfig.java index b854af7c90d..4391ae7838a 100644 --- a/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/dataflow/deserialization/DeserializationConfig.java +++ b/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/dataflow/deserialization/DeserializationConfig.java @@ -28,6 +28,7 @@ @JsonSubTypes({ @JsonSubTypes.Type(value = InlongMsgDeserializationConfig.class, name = DeserializationType.INLONG_MSG), @JsonSubTypes.Type(value = InlongMsgPbDeserialiationConfig.class, name = DeserializationType.INLONG_MSG_PB), + @JsonSubTypes.Type(value = RawDeserializationConfig.class, name = DeserializationType.RAW), }) public interface DeserializationConfig extends Serializable { diff --git a/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/dataflow/deserialization/RawDeserializationConfig.java b/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/dataflow/deserialization/RawDeserializationConfig.java new file mode 100644 index 00000000000..c0d4c946168 --- /dev/null +++ b/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/dataflow/deserialization/RawDeserializationConfig.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.common.pojo.sort.dataflow.deserialization; + +import lombok.Data; + +@Data +public class RawDeserializationConfig implements DeserializationConfig { + + private String streamId; +} diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/message/RawMsgDeserializeOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/message/RawMsgDeserializeOperator.java index d55c7d6cada..d8ac5c8a9a4 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/message/RawMsgDeserializeOperator.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/message/RawMsgDeserializeOperator.java @@ -20,6 +20,8 @@ import org.apache.inlong.common.enums.DataTypeEnum; import org.apache.inlong.common.enums.MessageWrapType; import org.apache.inlong.common.msg.AttributeConstants; +import org.apache.inlong.common.pojo.sort.dataflow.deserialization.DeserializationConfig; +import org.apache.inlong.common.pojo.sort.dataflow.deserialization.RawDeserializationConfig; import org.apache.inlong.manager.common.exceptions.BusinessException; import org.apache.inlong.manager.pojo.consume.BriefMQMessage; import org.apache.inlong.manager.pojo.consume.BriefMQMessage.FieldInfo; @@ -82,4 +84,11 @@ public List decodeMsg(InlongStreamInfo streamInfo, List