diff --git a/chunjun-connectors/chunjun-connector-kudu/src/main/java/com/dtstack/chunjun/connector/kudu/source/KuduInputFormat.java b/chunjun-connectors/chunjun-connector-kudu/src/main/java/com/dtstack/chunjun/connector/kudu/source/KuduInputFormat.java index d54227f7de..1856c58bf4 100644 --- a/chunjun-connectors/chunjun-connector-kudu/src/main/java/com/dtstack/chunjun/connector/kudu/source/KuduInputFormat.java +++ b/chunjun-connectors/chunjun-connector-kudu/src/main/java/com/dtstack/chunjun/connector/kudu/source/KuduInputFormat.java @@ -23,6 +23,7 @@ import com.dtstack.chunjun.connector.kudu.converter.KuduColumnConverter; import com.dtstack.chunjun.connector.kudu.converter.KuduRawTypeConverter; import com.dtstack.chunjun.connector.kudu.util.KuduUtil; +import com.dtstack.chunjun.converter.AbstractRowConverter; import com.dtstack.chunjun.source.format.BaseRichInputFormat; import com.dtstack.chunjun.throwable.ReadRecordException; import com.dtstack.chunjun.util.TableUtil; @@ -86,15 +87,13 @@ protected void openInternal(InputSplit inputSplit) throws IOException { KuduInputSplit kuduTableSplit = (KuduInputSplit) inputSplit; scanner = KuduScanToken.deserializeIntoScanner(kuduTableSplit.getToken(), client); - List columnNames = new ArrayList<>(); - List fieldConfList = sourceConf.getColumn(); - fieldConfList.forEach(field -> columnNames.add(field.getName())); - RowType rowType = TableUtil.createRowType(fieldConfList, KuduRawTypeConverter::apply); - - setRowConverter( - rowConverter == null - ? new KuduColumnConverter(rowType, columnNames) - : rowConverter); + if (rowConverter == null) { + List columnNames = new ArrayList<>(); + List fieldConfList = sourceConf.getColumn(); + fieldConfList.forEach(field -> columnNames.add(field.getName())); + RowType rowType = TableUtil.createRowType(fieldConfList, KuduRawTypeConverter::apply); + setRowConverter(new KuduColumnConverter(rowType, columnNames)); + } } @Override @@ -157,4 +156,8 @@ public void setSourceConf(KuduSourceConf sourceConf) { public KuduSourceConf getSourceConf() { return sourceConf; } + + public AbstractRowConverter getRowConverter() { + return rowConverter; + } } diff --git a/chunjun-connectors/chunjun-connector-kudu/src/main/java/com/dtstack/chunjun/connector/kudu/source/KuduInputFormatBuilder.java b/chunjun-connectors/chunjun-connector-kudu/src/main/java/com/dtstack/chunjun/connector/kudu/source/KuduInputFormatBuilder.java index 9f757626b6..0342052527 100644 --- a/chunjun-connectors/chunjun-connector-kudu/src/main/java/com/dtstack/chunjun/connector/kudu/source/KuduInputFormatBuilder.java +++ b/chunjun-connectors/chunjun-connector-kudu/src/main/java/com/dtstack/chunjun/connector/kudu/source/KuduInputFormatBuilder.java @@ -50,7 +50,9 @@ protected void checkFormat() { StringBuilder sb = new StringBuilder(256); if (columns == null || columns.size() == 0) { - sb.append("Columns can not be empty.\n"); + if (format.getRowConverter() == null) { + sb.append("At least one of the Column and rowConverter is not empty.\n"); + } } if (sourceConf.getBatchSizeBytes() > ConstantValue.STORE_SIZE_G) { diff --git a/chunjun-connectors/chunjun-connector-kudu/src/main/java/com/dtstack/chunjun/connector/kudu/table/KuduDynamicTableSink.java b/chunjun-connectors/chunjun-connector-kudu/src/main/java/com/dtstack/chunjun/connector/kudu/table/KuduDynamicTableSink.java index 07f8c79a79..82acc80178 100644 --- a/chunjun-connectors/chunjun-connector-kudu/src/main/java/com/dtstack/chunjun/connector/kudu/table/KuduDynamicTableSink.java +++ b/chunjun-connectors/chunjun-connector-kudu/src/main/java/com/dtstack/chunjun/connector/kudu/table/KuduDynamicTableSink.java @@ -18,24 +18,18 @@ package com.dtstack.chunjun.connector.kudu.table; -import com.dtstack.chunjun.conf.FieldConf; import com.dtstack.chunjun.connector.kudu.conf.KuduSinkConf; -import com.dtstack.chunjun.connector.kudu.converter.KuduRawTypeConverter; import com.dtstack.chunjun.connector.kudu.converter.KuduRowConverter; import com.dtstack.chunjun.connector.kudu.sink.KuduOutputFormatBuilder; import com.dtstack.chunjun.sink.DtOutputFormatSinkFunction; -import com.dtstack.chunjun.util.TableUtil; import org.apache.flink.table.api.TableSchema; import org.apache.flink.table.connector.ChangelogMode; import org.apache.flink.table.connector.sink.DynamicTableSink; import org.apache.flink.table.connector.sink.SinkFunctionProvider; -import org.apache.flink.table.types.AtomicDataType; -import org.apache.flink.table.types.logical.NullType; import org.apache.flink.table.types.logical.RowType; -import java.util.ArrayList; -import java.util.List; +import java.util.Arrays; /** * @author tiezhu @@ -64,33 +58,11 @@ public ChangelogMode getChangelogMode(ChangelogMode requestedMode) { public SinkRuntimeProvider getSinkRuntimeProvider(Context context) { KuduOutputFormatBuilder builder = new KuduOutputFormatBuilder(); - String[] fieldNames = tableSchema.getFieldNames(); - List columnList = new ArrayList<>(fieldNames.length); - List columnNameList = new ArrayList<>(); - - for (int index = 0; index < fieldNames.length; index++) { - String name = fieldNames[index]; - columnNameList.add(name); - - FieldConf field = new FieldConf(); - field.setName(name); - field.setType( - tableSchema - .getFieldDataType(name) - .orElse(new AtomicDataType(new NullType())) - .getLogicalType() - .getTypeRoot() - .name()); - field.setIndex(index); - columnList.add(field); - } - sinkConf.setColumn(columnList); - - final RowType rowType = - TableUtil.createRowType(sinkConf.getColumn(), KuduRawTypeConverter::apply); - builder.setSinkConf(sinkConf); - builder.setRowConverter(new KuduRowConverter(rowType, columnNameList)); + builder.setRowConverter( + new KuduRowConverter( + (RowType) tableSchema.toRowDataType().getLogicalType(), + Arrays.asList(tableSchema.getFieldNames()))); return SinkFunctionProvider.of( new DtOutputFormatSinkFunction(builder.finish()), sinkConf.getParallelism()); diff --git a/chunjun-connectors/chunjun-connector-kudu/src/main/java/com/dtstack/chunjun/connector/kudu/table/KuduDynamicTableSource.java b/chunjun-connectors/chunjun-connector-kudu/src/main/java/com/dtstack/chunjun/connector/kudu/table/KuduDynamicTableSource.java index 78782ac259..f554b21985 100644 --- a/chunjun-connectors/chunjun-connector-kudu/src/main/java/com/dtstack/chunjun/connector/kudu/table/KuduDynamicTableSource.java +++ b/chunjun-connectors/chunjun-connector-kudu/src/main/java/com/dtstack/chunjun/connector/kudu/table/KuduDynamicTableSource.java @@ -18,10 +18,8 @@ package com.dtstack.chunjun.connector.kudu.table; -import com.dtstack.chunjun.conf.FieldConf; import com.dtstack.chunjun.connector.kudu.conf.KuduLookupConf; import com.dtstack.chunjun.connector.kudu.conf.KuduSourceConf; -import com.dtstack.chunjun.connector.kudu.converter.KuduRawTypeConverter; import com.dtstack.chunjun.connector.kudu.converter.KuduRowConverter; import com.dtstack.chunjun.connector.kudu.source.KuduInputFormatBuilder; import com.dtstack.chunjun.connector.kudu.table.lookup.KuduAllTableFunction; @@ -31,7 +29,6 @@ import com.dtstack.chunjun.table.connector.source.ParallelAsyncTableFunctionProvider; import com.dtstack.chunjun.table.connector.source.ParallelSourceFunctionProvider; import com.dtstack.chunjun.table.connector.source.ParallelTableFunctionProvider; -import com.dtstack.chunjun.util.TableUtil; import org.apache.flink.table.api.TableSchema; import org.apache.flink.table.connector.ChangelogMode; @@ -41,16 +38,11 @@ import org.apache.flink.table.connector.source.abilities.SupportsProjectionPushDown; import org.apache.flink.table.data.RowData; import org.apache.flink.table.runtime.typeutils.InternalTypeInfo; -import org.apache.flink.table.types.AtomicDataType; -import org.apache.flink.table.types.logical.LogicalType; -import org.apache.flink.table.types.logical.NullType; import org.apache.flink.table.types.logical.RowType; import org.apache.flink.table.utils.TableSchemaUtils; import org.apache.flink.util.Preconditions; -import java.util.ArrayList; import java.util.Arrays; -import java.util.List; /** * @author tiezhu @@ -78,37 +70,12 @@ public KuduDynamicTableSource( public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderContext) { KuduInputFormatBuilder builder = new KuduInputFormatBuilder(); - LogicalType logicalType = tableSchema.toRowDataType().getLogicalType(); - InternalTypeInfo typeInfo = InternalTypeInfo.of(logicalType); - - String[] fieldNames = tableSchema.getFieldNames(); - List columnList = new ArrayList<>(fieldNames.length); - List columnNameList = new ArrayList<>(); - - for (int index = 0; index < fieldNames.length; index++) { - String name = fieldNames[index]; - columnNameList.add(name); - - FieldConf field = new FieldConf(); - field.setName(name); - field.setType( - tableSchema - .getFieldDataType(name) - .orElse(new AtomicDataType(new NullType())) - .getLogicalType() - .getTypeRoot() - .name()); - field.setIndex(index); - columnList.add(field); - } - - sourceConf.setColumn(columnList); - - RowType rowType = - TableUtil.createRowType(sourceConf.getColumn(), KuduRawTypeConverter::apply); + RowType rowType = (RowType) tableSchema.toRowDataType().getLogicalType(); + InternalTypeInfo typeInfo = InternalTypeInfo.of(rowType); builder.setKuduSourceConf(sourceConf); - builder.setRowConverter(new KuduRowConverter(rowType, columnNameList)); + builder.setRowConverter( + new KuduRowConverter(rowType, Arrays.asList(tableSchema.getFieldNames()))); return ParallelSourceFunctionProvider.of( new DtInputFormatSourceFunction<>(builder.finish(), typeInfo),