Skip to content

Commit

Permalink
[INLONG-9867][Manager] Unified configuration process for standalone a…
Browse files Browse the repository at this point in the history
…nd sortflink (#9868)
  • Loading branch information
fuweng11 authored May 6, 2024
1 parent ba475c1 commit 22d6859
Show file tree
Hide file tree
Showing 70 changed files with 2,476 additions and 244 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,14 @@
import org.apache.inlong.common.pojo.sort.dataflow.DataFlowConfig;
import org.apache.inlong.common.pojo.sort.mq.MqClusterConfig;

import lombok.Builder;
import lombok.Data;

import java.io.Serializable;
import java.util.List;

@Data
@Builder
public class SortClusterConfig implements Serializable {

private String clusterTag;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,14 @@

import org.apache.inlong.common.pojo.sort.node.NodeConfig;

import lombok.Builder;
import lombok.Data;

import java.io.Serializable;
import java.util.List;

@Data
@Builder
public class SortTaskConfig implements Serializable {

private String sortTaskName;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,14 @@

import org.apache.inlong.common.pojo.sort.dataflow.sink.SinkConfig;

import lombok.Builder;
import lombok.Data;

import java.io.Serializable;
import java.util.Map;

@Data
@Builder
public class DataFlowConfig implements Serializable {

private String dataflowId;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.inlong.common.pojo.sort.dataflow;

import org.apache.inlong.common.pojo.sort.dataflow.dataType.DataTypeConfig;
import org.apache.inlong.common.pojo.sort.dataflow.deserialization.DeserializationConfig;
import org.apache.inlong.common.pojo.sort.dataflow.field.FieldConfig;

Expand All @@ -32,5 +33,6 @@ public class SourceConfig implements Serializable {
private String subscription;
private String encodingType;
private DeserializationConfig deserializationConfig;
private DataTypeConfig dataTypeConfig;
private List<FieldConfig> fieldConfigs;
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,12 @@
* limitations under the License.
*/

package org.apache.inlong.common.pojo.sort.dataflow.deserialization;
package org.apache.inlong.common.pojo.sort.dataflow.dataType;

import lombok.Data;

@Data
public class CsvDeserializationConfig implements DeserializationConfig {
public class CsvConfig implements DataTypeConfig {

private char delimiter;
private Character escapeChar;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.common.pojo.sort.dataflow.dataType;

import org.apache.inlong.common.constant.DeserializationType;

import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonSubTypes;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTypeInfo;

import java.io.Serializable;

@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
@JsonSubTypes({
@JsonSubTypes.Type(value = CsvConfig.class, name = DeserializationType.CSV),
@JsonSubTypes.Type(value = KvConfig.class, name = DeserializationType.KV),
})
public interface DataTypeConfig extends Serializable {

}
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,12 @@
* limitations under the License.
*/

package org.apache.inlong.common.pojo.sort.dataflow.deserialization;
package org.apache.inlong.common.pojo.sort.dataflow.dataType;

import lombok.Data;

@Data
public class KvDeserializationConfig implements DeserializationConfig {
public class KvConfig implements DataTypeConfig {

private char entrySplitter;
private char kvSplitter;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,6 @@
@JsonSubTypes({
@JsonSubTypes.Type(value = InlongMsgDeserializationConfig.class, name = DeserializationType.INLONG_MSG),
@JsonSubTypes.Type(value = InlongMsgPbDeserialiationConfig.class, name = DeserializationType.INLONG_MSG_PB),
@JsonSubTypes.Type(value = CsvDeserializationConfig.class, name = DeserializationType.CSV),
@JsonSubTypes.Type(value = KvDeserializationConfig.class, name = DeserializationType.KV),
})
public interface DeserializationConfig extends Serializable {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,5 +23,4 @@
public class InlongMsgDeserializationConfig implements DeserializationConfig {

private String streamId;
private DeserializationConfig innerDeserializationConfig;
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,5 @@
@Data
public class InlongMsgPbDeserialiationConfig implements DeserializationConfig {

private final String compressionType;
private DeserializationConfig innerDeserializationConfig;
private String compressionType;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.common.pojo.sortstandalone;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class SortConfigResponse {

public static final int SUCC = 0;
public static final int NO_UPDATE = 1;
public static final int FAIL = -1;
public static final int REQUEST_PARAMS_ERROR = -101;

String msg;
int code;
String md5;
String data;

}
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,12 @@ public enum ProcessName {
/**
* Delete inlong stream process
*/
DELETE_STREAM_RESOURCE("Delete Stream");
DELETE_STREAM_RESOURCE("Delete Stream"),

/**
* Create cluster resource process
*/
CREATE_CLUSTER_RESOURCE("Create Cluster");

private final String displayName;

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.manager.dao.entity;

import lombok.Data;

import java.io.Serializable;
import java.util.Date;

/**
* Cluster config entity, including cluster tag, etc.
*/
@Data
public class ClusterConfigEntity implements Serializable {

private static final long serialVersionUID = 1L;
private Integer id;
private String clusterTag;
private String clusterType;
private String configParams;
private Integer isDeleted;
private String creator;
private String modifier;
private Date createTime;
private Date modifyTime;
private Integer version;

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.manager.dao.entity;

import lombok.Data;

import java.io.Serializable;
import java.util.Date;

/**
* Sort config entity, including sink type, sink id, etc.
*/
@Data
public class SortConfigEntity implements Serializable {

private static final long serialVersionUID = 1L;
private Integer id;
private Integer sinkId;
private String sinkType;
private String inlongClusterTag;
private String inlongClusterName;
private String sortTaskName;
private String dataNodeName;
private String configParams;
private Integer isDeleted;
private String creator;
private String modifier;
private Date createTime;
private Date modifyTime;
private Integer version;

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.manager.dao.mapper;

import org.apache.inlong.manager.common.tenant.MultiTenantQuery;
import org.apache.inlong.manager.dao.entity.ClusterConfigEntity;

import org.apache.ibatis.annotations.Options;
import org.apache.ibatis.annotations.Param;
import org.apache.ibatis.cursor.Cursor;
import org.apache.ibatis.mapping.ResultSetType;
import org.springframework.stereotype.Repository;

@Repository
public interface ClusterConfigEntityMapper {

int insert(ClusterConfigEntity record);

ClusterConfigEntity selectByPrimaryKey(Integer id);

ClusterConfigEntity selectByClusterTag(@Param("clusterTag") String clusterTag);

int updateByIdSelective(ClusterConfigEntity record);

@MultiTenantQuery(with = false)
@Options(resultSetType = ResultSetType.FORWARD_ONLY, fetchSize = Integer.MIN_VALUE)
Cursor<ClusterConfigEntity> selectAllClusterConfigs();

boolean deleteByClusterTag(@Param("clusterTag") String clusterTag);

}
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,13 @@ List<Map<String, Object>> countGroupByUser(@Param(value = "username") String use

List<InlongGroupEntity> selectByClusterTag(@Param(value = "inlongClusterTag") String inlongClusterTag);

@MultiTenantQuery(with = false)
List<InlongGroupEntity> selectByClusterTagWithoutTenant(@Param(value = "inlongClusterTag") String inlongClusterTag);

List<InlongGroupEntity> selectByTopicRequest(InlongGroupTopicRequest request);

List<InlongGroupEntity> selectByInlongGroupIds(@Param("groupIdList") List<String> groupIdList);

/**
* Select all group info for sort sdk.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ InlongGroupExtEntity selectByUniqueKey(
@Param("inlongGroupId") String inlongGroupId,
@Param("keyName") String keyName);

List<String> selectGroupIdByKeyNameAndValue(@Param("keyName") String keyName, @Param("keyValue") String keyValue);

@Options(resultSetType = ResultSetType.FORWARD_ONLY, fetchSize = Integer.MIN_VALUE)
Cursor<InlongGroupExtEntity> selectByKeyName(@Param("keyName") String keyName);

Expand Down
Loading

0 comments on commit 22d6859

Please sign in to comment.