From c4e7e7601225e5e722f3a4a8a5922e0e1a4c3d3e Mon Sep 17 00:00:00 2001 From: yunfengzhou-hub Date: Mon, 9 Dec 2024 16:02:54 +0800 Subject: [PATCH] [flink] Support nested projection pushdown --- .../paimon/table/source/ReadBuilder.java | 13 - .../paimon/table/IncrementalTableTest.java | 2 +- .../apache/paimon/table/TableTestBase.java | 2 +- .../paimon/table/system/BucketsTableTest.java | 2 +- .../table/system/PartitionsTableTest.java | 6 +- .../org/apache/paimon/flink/Projection.java | 200 ++++++++++++++ .../paimon/flink/ProjectionRowData.java | 250 ++++++++++++++++++ .../source/ContinuousFileStoreSource.java | 8 +- .../flink/source/FileStoreSourceReader.java | 6 +- .../source/FlinkRecordsWithSplitIds.java | 10 +- .../paimon/flink/source/FlinkSource.java | 13 +- .../flink/source/FlinkSourceBuilder.java | 51 +++- .../paimon/flink/source/FlinkTableSource.java | 5 +- .../flink/source/LogHybridSourceFactory.java | 24 +- .../flink/source/StaticFileStoreSource.java | 8 +- .../flink/source/SystemTableSource.java | 29 +- .../AlignedContinuousFileStoreSource.java | 10 +- .../source/align/AlignedSourceReader.java | 6 +- .../source/operator/MonitorFunction.java | 6 +- .../flink/source/operator/ReadOperator.java | 14 +- .../apache/paimon/flink/ProjectionTest.java | 71 +++++ .../source/FileStoreSourceMetricsTest.java | 2 +- .../source/FileStoreSourceReaderTest.java | 1 + .../source/FlinkRecordsWithSplitIdsTest.java | 3 +- .../source/ProjectionPushDownITCase.java | 128 +++++++++ .../source/align/AlignedSourceReaderTest.java | 3 +- .../source/operator/OperatorSourceTest.java | 4 +- 27 files changed, 808 insertions(+), 69 deletions(-) create mode 100644 paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/ProjectionRowData.java create mode 100644 paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ProjectionTest.java create mode 100644 paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/ProjectionPushDownITCase.java diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilder.java b/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilder.java index 0c1386ce441d..d12de1211bfd 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilder.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilder.java @@ -27,7 +27,6 @@ import org.apache.paimon.utils.Filter; import java.io.Serializable; -import java.util.Arrays; import java.util.List; import java.util.Map; @@ -125,18 +124,6 @@ default ReadBuilder withFilter(List predicates) { */ ReadBuilder withProjection(int[] projection); - /** Apply projection to the reader, only support top level projection. */ - @Deprecated - default ReadBuilder withProjection(int[][] projection) { - if (projection == null) { - return this; - } - if (Arrays.stream(projection).anyMatch(arr -> arr.length > 1)) { - throw new IllegalStateException("Not support nested projection"); - } - return withProjection(Arrays.stream(projection).mapToInt(arr -> arr[0]).toArray()); - } - /** the row number pushed down. */ ReadBuilder withLimit(int limit); diff --git a/paimon-core/src/test/java/org/apache/paimon/table/IncrementalTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/IncrementalTableTest.java index b4b905d36453..43fbe2b6460a 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/IncrementalTableTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/IncrementalTableTest.java @@ -271,7 +271,7 @@ public void testTagIncremental() throws Exception { GenericRow.of(fromString("+I"), 1, 6, 1)); // read tag1 tag3 projection - result = read(table, new int[][] {{1}}, Pair.of(INCREMENTAL_BETWEEN, "TAG1,TAG3")); + result = read(table, new int[] {1}, Pair.of(INCREMENTAL_BETWEEN, "TAG1,TAG3")); assertThat(result).containsExactlyInAnyOrder(GenericRow.of(2), GenericRow.of(6)); assertThatThrownBy(() -> read(table, Pair.of(INCREMENTAL_BETWEEN, "TAG2,TAG1"))) diff --git a/paimon-core/src/test/java/org/apache/paimon/table/TableTestBase.java b/paimon-core/src/test/java/org/apache/paimon/table/TableTestBase.java index 7f850a7725b4..eb4a2bfc7d25 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/TableTestBase.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/TableTestBase.java @@ -167,7 +167,7 @@ protected List read(Table table, Pair, String>... d protected List read( Table table, - @Nullable int[][] projection, + @Nullable int[] projection, Pair, String>... dynamicOptions) throws Exception { Map options = new HashMap<>(); diff --git a/paimon-core/src/test/java/org/apache/paimon/table/system/BucketsTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/system/BucketsTableTest.java index 57cb6605a922..b6bd71087412 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/system/BucketsTableTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/system/BucketsTableTest.java @@ -72,7 +72,7 @@ public void before() throws Exception { @Test public void testBucketsTable() throws Exception { - assertThat(read(bucketsTable, new int[][] {{0}, {1}, {2}, {4}})) + assertThat(read(bucketsTable, new int[] {0, 1, 2, 4})) .containsExactlyInAnyOrder( GenericRow.of(BinaryString.fromString("[1]"), 0, 2L, 2L), GenericRow.of(BinaryString.fromString("[2]"), 0, 2L, 2L)); diff --git a/paimon-core/src/test/java/org/apache/paimon/table/system/PartitionsTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/system/PartitionsTableTest.java index 8d12dc707bf5..74d9a5eb7b1f 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/system/PartitionsTableTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/system/PartitionsTableTest.java @@ -90,7 +90,7 @@ public void testPartitionRecordCount() throws Exception { expectedRow.add(GenericRow.of(BinaryString.fromString("[3]"), 1L)); // Only read partition and record count, record size may not stable. - List result = read(partitionsTable, new int[][] {{0}, {1}}); + List result = read(partitionsTable, new int[] {0, 1}); assertThat(result).containsExactlyInAnyOrderElementsOf(expectedRow); } @@ -105,7 +105,7 @@ public void testPartitionTimeTravel() throws Exception { read( partitionsTable.copy( Collections.singletonMap(CoreOptions.SCAN_VERSION.key(), "1")), - new int[][] {{0}, {1}}); + new int[] {0, 1}); assertThat(result).containsExactlyInAnyOrderElementsOf(expectedRow); } @@ -117,7 +117,7 @@ public void testPartitionValue() throws Exception { expectedRow.add(GenericRow.of(BinaryString.fromString("[2]"), 1L, 1L)); expectedRow.add(GenericRow.of(BinaryString.fromString("[3]"), 1L, 1L)); - List result = read(partitionsTable, new int[][] {{0}, {1}, {3}}); + List result = read(partitionsTable, new int[] {0, 1, 3}); assertThat(result).containsExactlyInAnyOrderElementsOf(expectedRow); } } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/Projection.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/Projection.java index 521d8a01311a..2e5197b04cdb 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/Projection.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/Projection.java @@ -18,14 +18,20 @@ package org.apache.paimon.flink; +import org.apache.paimon.types.DataField; +import org.apache.paimon.types.DataTypeRoot; +import org.apache.paimon.types.DataTypeVisitor; + import org.apache.flink.table.types.DataType; import org.apache.flink.table.types.logical.RowType; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Set; +import java.util.stream.Collectors; /** * {@link Projection} represents a list of (possibly nested) indexes that can be used to project @@ -38,6 +44,11 @@ private Projection() {} public abstract RowType project(RowType logicalType); + public abstract org.apache.paimon.types.RowType project( + org.apache.paimon.types.RowType rowType); + + public abstract ProjectionRowData getRowData(org.apache.paimon.types.RowType rowType); + /** @return {@code true} whether this projection is nested or not. */ public abstract boolean isNested(); @@ -132,6 +143,16 @@ public RowType project(RowType dataType) { return new NestedProjection(toNestedIndexes()).project(dataType); } + @Override + public org.apache.paimon.types.RowType project(org.apache.paimon.types.RowType rowType) { + return new NestedProjection(toNestedIndexes()).project(rowType); + } + + @Override + public ProjectionRowData getRowData(org.apache.paimon.types.RowType rowType) { + return new NestedProjection(toNestedIndexes()).getRowData(rowType); + } + @Override public boolean isNested() { return false; @@ -184,6 +205,66 @@ public RowType project(RowType rowType) { return new RowType(rowType.isNullable(), updatedFields); } + @Override + public org.apache.paimon.types.RowType project(org.apache.paimon.types.RowType rowType) { + MutableRowType result = + new MutableRowType(rowType.isNullable(), Collections.emptyList()); + for (int[] indexPath : this.projection) { + org.apache.paimon.types.RowType sourceType = rowType; + MutableRowType targetType = result; + int index; + for (index = 0; index < indexPath.length - 1; index++) { + String fieldName = sourceType.getFieldNames().get(indexPath[index]); + DataField field = sourceType.getField(fieldName); + sourceType = + (org.apache.paimon.types.RowType) sourceType.getField(fieldName).type(); + if (!targetType.containsField(fieldName)) { + targetType.appendDataField( + fieldName, + field.id(), + new MutableRowType( + sourceType.isNullable(), Collections.emptyList()), + field.description()); + } + targetType = (MutableRowType) targetType.getField(fieldName).type(); + } + + String fieldName = sourceType.getFieldNames().get(indexPath[index]); + DataField field = sourceType.getField(fieldName); + targetType.appendDataField( + fieldName, + field.id(), + sourceType.getField(fieldName).type(), + field.description()); + } + return result.toRowType(); + } + + @Override + public ProjectionRowData getRowData(org.apache.paimon.types.RowType rowType) { + org.apache.paimon.types.RowType resultType = project(rowType); + + int[][] resultIndices = new int[this.projection.length][]; + for (int i = 0; i < this.projection.length; i++) { + org.apache.paimon.types.RowType sourceType = rowType; + org.apache.paimon.types.RowType targetType = resultType; + resultIndices[i] = new int[this.projection[i].length]; + for (int j = 0; j < this.projection[i].length; j++) { + DataField sourceField = sourceType.getFields().get(this.projection[i][j]); + String fieldName = sourceField.name(); + resultIndices[i][j] = targetType.getFieldIndex(fieldName); + if (j < this.projection[i].length - 1) { + targetType = + (org.apache.paimon.types.RowType) + targetType.getField(fieldName).type(); + sourceType = (org.apache.paimon.types.RowType) sourceField.type(); + } + } + } + + return new ProjectionRowData(resultType, resultIndices); + } + @Override public boolean isNested() { return nested; @@ -217,6 +298,16 @@ public RowType project(RowType dataType) { return new NestedProjection(toNestedIndexes()).project(dataType); } + @Override + public org.apache.paimon.types.RowType project(org.apache.paimon.types.RowType rowType) { + return new NestedProjection(toNestedIndexes()).project(rowType); + } + + @Override + public ProjectionRowData getRowData(org.apache.paimon.types.RowType rowType) { + return new NestedProjection(toNestedIndexes()).getRowData(rowType); + } + @Override public boolean isNested() { return false; @@ -232,4 +323,113 @@ public int[][] toNestedIndexes() { return Arrays.stream(projection).mapToObj(i -> new int[] {i}).toArray(int[][]::new); } } + + /** + * A mutable version of {@link org.apache.paimon.types.RowType} to facilitate the building + * process of projections. + * + *

It is mutable in aspect of the {@link #appendDataField} method. + */ + public static class MutableRowType extends org.apache.paimon.types.DataType { + private final List fields; + private final boolean isNullable; + + public MutableRowType(org.apache.paimon.types.RowType rowType) { + this(rowType.isNullable(), rowType.getFields()); + } + + public MutableRowType(boolean isNullable, List fields) { + super(isNullable, DataTypeRoot.ROW); + this.fields = new ArrayList<>(fields); + this.isNullable = isNullable; + } + + public org.apache.paimon.types.RowType toRowType() { + for (int i = 0; i < fields.size(); i++) { + DataField field = fields.get(i); + if (field.type() instanceof MutableRowType) { + fields.set( + i, + new DataField( + field.id(), + field.name(), + ((MutableRowType) field.type()).toRowType(), + field.description())); + } + } + return new org.apache.paimon.types.RowType(isNullable, fields); + } + + public List getFields() { + return fields; + } + + public List getFieldNames() { + return fields.stream().map(DataField::name).collect(Collectors.toList()); + } + + public List getFieldTypes() { + return fields.stream().map(DataField::type).collect(Collectors.toList()); + } + + public int getFieldCount() { + return fields.size(); + } + + public boolean containsField(String fieldName) { + for (DataField field : fields) { + if (field.name().equals(fieldName)) { + return true; + } + } + return false; + } + + public DataField getField(String fieldName) { + for (DataField field : fields) { + if (field.name().equals(fieldName)) { + return field; + } + } + + throw new RuntimeException("Cannot find field: " + fieldName); + } + + public DataField getField(int fieldId) { + for (DataField field : fields) { + if (field.id() == fieldId) { + return field; + } + } + throw new RuntimeException("Cannot find field by field id: " + fieldId); + } + + public void appendDataField( + String name, int newId, org.apache.paimon.types.DataType type, String description) { + if (type instanceof org.apache.paimon.types.RowType) { + type = new MutableRowType((org.apache.paimon.types.RowType) type); + } + fields.add(new DataField(newId, name, type, description)); + } + + @Override + public int defaultSize() { + throw new UnsupportedOperationException(); + } + + @Override + public org.apache.paimon.types.DataType copy(boolean isNullable) { + throw new UnsupportedOperationException(); + } + + @Override + public String asSQLString() { + throw new UnsupportedOperationException(); + } + + @Override + public R accept(DataTypeVisitor visitor) { + throw new UnsupportedOperationException(); + } + } } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/ProjectionRowData.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/ProjectionRowData.java new file mode 100644 index 000000000000..d913830e8120 --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/ProjectionRowData.java @@ -0,0 +1,250 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink; + +import org.apache.paimon.types.RowType; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.table.data.ArrayData; +import org.apache.flink.table.data.DecimalData; +import org.apache.flink.table.data.MapData; +import org.apache.flink.table.data.RawValueData; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.data.StringData; +import org.apache.flink.table.data.TimestampData; +import org.apache.flink.types.RowKind; + +import javax.annotation.Nullable; + +import java.io.Serializable; + +/** + * A {@link RowData} that provides a mapping view of the original {@link RowData} according to + * projection information. + */ +public class ProjectionRowData implements RowData, Serializable { + private final RowType producedDataType; + private final int[][] projectedFields; + private final int[] lastProjectedFields; + private transient RowData row; + + ProjectionRowData(RowType producedDataType, int[][] projectedFields) { + this.producedDataType = producedDataType; + this.projectedFields = projectedFields; + this.lastProjectedFields = new int[projectedFields.length]; + for (int i = 0; i < projectedFields.length; i++) { + this.lastProjectedFields[i] = projectedFields[i][projectedFields[i].length - 1]; + } + } + + public ProjectionRowData replaceRow(RowData row) { + this.row = row; + return this; + } + + public static @Nullable ProjectionRowData copy(@Nullable ProjectionRowData rowData) { + if (rowData == null) { + return null; + } + return new ProjectionRowData(rowData.producedDataType, rowData.projectedFields); + } + + @Override + public int getArity() { + return projectedFields.length; + } + + @Override + public RowKind getRowKind() { + return row.getRowKind(); + } + + @Override + public void setRowKind(RowKind rowKind) { + row.setRowKind(rowKind); + } + + @Override + public boolean isNullAt(int i) { + RowData rowData = extractInternalRow(i); + if (rowData == null) { + return true; + } + return rowData.isNullAt(lastProjectedFields[i]); + } + + private @Nullable RowData extractInternalRow(int i) { + int[] projectedField = projectedFields[i]; + RowData rowData = this.row; + RowType dataType = producedDataType; + for (int j = 0; j < projectedField.length - 1; j++) { + dataType = (RowType) dataType.getTypeAt(projectedField[j]); + if (rowData.isNullAt(projectedField[j])) { + return null; + } + rowData = rowData.getRow(projectedField[j], dataType.getFieldCount()); + } + return rowData; + } + + @Override + public boolean getBoolean(int i) { + RowData rowData = extractInternalRow(i); + if (rowData == null) { + throw new NullPointerException(); + } + return rowData.getBoolean(lastProjectedFields[i]); + } + + @Override + public byte getByte(int i) { + RowData rowData = extractInternalRow(i); + if (rowData == null) { + throw new NullPointerException(); + } + return rowData.getByte(lastProjectedFields[i]); + } + + @Override + public short getShort(int i) { + RowData rowData = extractInternalRow(i); + if (rowData == null) { + throw new NullPointerException(); + } + return rowData.getShort(lastProjectedFields[i]); + } + + @Override + public int getInt(int i) { + RowData rowData = extractInternalRow(i); + if (rowData == null) { + throw new NullPointerException(); + } + return rowData.getInt(lastProjectedFields[i]); + } + + @Override + public long getLong(int i) { + RowData rowData = extractInternalRow(i); + if (rowData == null) { + throw new NullPointerException(); + } + return rowData.getLong(lastProjectedFields[i]); + } + + @Override + public float getFloat(int i) { + RowData rowData = extractInternalRow(i); + if (rowData == null) { + throw new NullPointerException(); + } + return rowData.getFloat(lastProjectedFields[i]); + } + + @Override + public double getDouble(int i) { + RowData rowData = extractInternalRow(i); + if (rowData == null) { + throw new NullPointerException(); + } + return rowData.getDouble(lastProjectedFields[i]); + } + + @Override + public StringData getString(int i) { + RowData rowData = extractInternalRow(i); + if (rowData == null) { + return null; + } + return rowData.getString(lastProjectedFields[i]); + } + + @Override + public DecimalData getDecimal(int i, int i1, int i2) { + RowData rowData = extractInternalRow(i); + if (rowData == null) { + return null; + } + return rowData.getDecimal(lastProjectedFields[i], i1, i2); + } + + @Override + public TimestampData getTimestamp(int i, int i1) { + RowData rowData = extractInternalRow(i); + if (rowData == null) { + return null; + } + return rowData.getTimestamp(lastProjectedFields[i], i1); + } + + @Override + public RawValueData getRawValue(int i) { + RowData rowData = extractInternalRow(i); + if (rowData == null) { + return null; + } + return rowData.getRawValue(lastProjectedFields[i]); + } + + @Override + public byte[] getBinary(int i) { + RowData rowData = extractInternalRow(i); + if (rowData == null) { + return null; + } + return rowData.getBinary(lastProjectedFields[i]); + } + + @Override + public ArrayData getArray(int i) { + RowData rowData = extractInternalRow(i); + if (rowData == null) { + return null; + } + return rowData.getArray(lastProjectedFields[i]); + } + + @Override + public MapData getMap(int i) { + RowData rowData = extractInternalRow(i); + if (rowData == null) { + return null; + } + return rowData.getMap(lastProjectedFields[i]); + } + + @Override + public RowData getRow(int i, int i1) { + RowData rowData = extractInternalRow(i); + if (rowData == null) { + return null; + } + return rowData.getRow(lastProjectedFields[i], i1); + } + + @VisibleForTesting + public int[][] getProjectedFields() { + return projectedFields; + } + + @VisibleForTesting + public RowType getRowType() { + return producedDataType; + } +} diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileStoreSource.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileStoreSource.java index 559976921e2e..45f9b876a63e 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileStoreSource.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileStoreSource.java @@ -20,6 +20,7 @@ import org.apache.paimon.CoreOptions; import org.apache.paimon.flink.FlinkConnectorOptions; +import org.apache.paimon.flink.ProjectionRowData; import org.apache.paimon.flink.metrics.FlinkMetricRegistry; import org.apache.paimon.options.Options; import org.apache.paimon.table.BucketMode; @@ -48,15 +49,16 @@ public class ContinuousFileStoreSource extends FlinkSource { public ContinuousFileStoreSource( ReadBuilder readBuilder, Map options, @Nullable Long limit) { - this(readBuilder, options, limit, BucketMode.HASH_FIXED); + this(readBuilder, options, limit, BucketMode.HASH_FIXED, null); } public ContinuousFileStoreSource( ReadBuilder readBuilder, Map options, @Nullable Long limit, - BucketMode bucketMode) { - super(readBuilder, limit); + BucketMode bucketMode, + @Nullable ProjectionRowData rowData) { + super(readBuilder, limit, rowData); this.options = options; this.bucketMode = bucketMode; } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FileStoreSourceReader.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FileStoreSourceReader.java index 8fc78c868ba5..17761c7fee86 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FileStoreSourceReader.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FileStoreSourceReader.java @@ -19,6 +19,7 @@ package org.apache.paimon.flink.source; import org.apache.paimon.disk.IOManager; +import org.apache.paimon.flink.ProjectionRowData; import org.apache.paimon.flink.source.metrics.FileStoreSourceReaderMetrics; import org.apache.paimon.flink.utils.TableScanUtils; import org.apache.paimon.table.source.TableRead; @@ -48,7 +49,8 @@ public FileStoreSourceReader( TableRead tableRead, FileStoreSourceReaderMetrics metrics, IOManager ioManager, - @Nullable Long limit) { + @Nullable Long limit, + @Nullable ProjectionRowData rowData) { // limiter is created in SourceReader, it can be shared in all split readers super( () -> @@ -56,7 +58,7 @@ public FileStoreSourceReader( tableRead, RecordLimiter.create(limit), metrics), (element, output, state) -> FlinkRecordsWithSplitIds.emitRecord( - readerContext, element, output, state, metrics), + readerContext, element, output, state, metrics, rowData), readerContext.getConfiguration(), readerContext); this.ioManager = ioManager; diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkRecordsWithSplitIds.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkRecordsWithSplitIds.java index ecb304f83c58..acb92d35ecf9 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkRecordsWithSplitIds.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkRecordsWithSplitIds.java @@ -18,6 +18,7 @@ package org.apache.paimon.flink.source; +import org.apache.paimon.flink.ProjectionRowData; import org.apache.paimon.flink.source.metrics.FileStoreSourceReaderMetrics; import org.apache.paimon.utils.Reference; @@ -109,7 +110,8 @@ public static void emitRecord( RecordIterator element, SourceOutput output, FileStoreSourceSplitState state, - FileStoreSourceReaderMetrics metrics) { + FileStoreSourceReaderMetrics metrics, + @Nullable ProjectionRowData projectionRowData) { long timestamp = TimestampAssigner.NO_TIMESTAMP; if (metrics.getLatestFileCreationTime() != FileStoreSourceReaderMetrics.UNDEFINED) { timestamp = metrics.getLatestFileCreationTime(); @@ -131,7 +133,11 @@ public static void emitRecord( numRecordsIn.inc(); } - output.collect(record.getRecord(), timestamp); + RowData rowData = record.getRecord(); + if (projectionRowData != null) { + rowData = projectionRowData.replaceRow(rowData); + } + output.collect(rowData, timestamp); state.setPosition(record); } } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSource.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSource.java index 7643f7b775d3..ee2cd769e5eb 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSource.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSource.java @@ -19,6 +19,7 @@ package org.apache.paimon.flink.source; import org.apache.paimon.disk.IOManager; +import org.apache.paimon.flink.ProjectionRowData; import org.apache.paimon.flink.source.metrics.FileStoreSourceReaderMetrics; import org.apache.paimon.table.source.ReadBuilder; @@ -43,10 +44,13 @@ public abstract class FlinkSource protected final ReadBuilder readBuilder; @Nullable protected final Long limit; + @Nullable protected final ProjectionRowData rowData; - public FlinkSource(ReadBuilder readBuilder, @Nullable Long limit) { + public FlinkSource( + ReadBuilder readBuilder, @Nullable Long limit, @Nullable ProjectionRowData rowData) { this.readBuilder = readBuilder; this.limit = limit; + this.rowData = rowData; } @Override @@ -56,7 +60,12 @@ public SourceReader createReader(SourceReaderCont FileStoreSourceReaderMetrics sourceReaderMetrics = new FileStoreSourceReaderMetrics(context.metricGroup()); return new FileStoreSourceReader( - context, readBuilder.newRead(), sourceReaderMetrics, ioManager, limit); + context, + readBuilder.newRead(), + sourceReaderMetrics, + ioManager, + limit, + ProjectionRowData.copy(rowData)); } @Override diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java index e864ec050045..3c9fe04a6351 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java @@ -23,6 +23,7 @@ import org.apache.paimon.CoreOptions.StreamingReadMode; import org.apache.paimon.flink.FlinkConnectorOptions; import org.apache.paimon.flink.Projection; +import org.apache.paimon.flink.ProjectionRowData; import org.apache.paimon.flink.log.LogSourceProvider; import org.apache.paimon.flink.sink.FlinkSink; import org.apache.paimon.flink.source.align.AlignedContinuousFileStoreSource; @@ -171,9 +172,12 @@ FlinkSourceBuilder logSourceProvider(LogSourceProvider logSourceProvider) { return this; } - private ReadBuilder createReadBuilder() { - ReadBuilder readBuilder = - table.newReadBuilder().withProjection(projectedFields).withFilter(predicate); + private ReadBuilder createReadBuilder(@Nullable org.apache.paimon.types.RowType readType) { + ReadBuilder readBuilder = table.newReadBuilder(); + if (readType != null) { + readBuilder.withReadType(readType); + } + readBuilder.withFilter(predicate); if (limit != null) { readBuilder.withLimit(limit.intValue()); } @@ -184,24 +188,33 @@ private DataStream buildStaticFileSource() { Options options = Options.fromMap(table.options()); return toDataStream( new StaticFileStoreSource( - createReadBuilder(), + createReadBuilder(projectedRowType()), limit, options.get(FlinkConnectorOptions.SCAN_SPLIT_ENUMERATOR_BATCH_SIZE), options.get(FlinkConnectorOptions.SCAN_SPLIT_ENUMERATOR_ASSIGN_MODE), - dynamicPartitionFilteringInfo)); + dynamicPartitionFilteringInfo, + projectedRowData())); } private DataStream buildContinuousFileSource() { return toDataStream( new ContinuousFileStoreSource( - createReadBuilder(), table.options(), limit, bucketMode)); + createReadBuilder(projectedRowType()), + table.options(), + limit, + bucketMode, + projectedRowData())); } private DataStream buildAlignedContinuousFileSource() { assertStreamingConfigurationForAlignMode(env); return toDataStream( new AlignedContinuousFileStoreSource( - createReadBuilder(), table.options(), limit, bucketMode)); + createReadBuilder(projectedRowType()), + table.options(), + limit, + bucketMode, + projectedRowData())); } private DataStream toDataStream(Source source) { @@ -237,6 +250,20 @@ private TypeInformation produceTypeInfo() { return InternalTypeInfo.of(produceType); } + private @Nullable org.apache.paimon.types.RowType projectedRowType() { + return Optional.ofNullable(projectedFields) + .map(Projection::of) + .map(p -> p.project(table.rowType())) + .orElse(null); + } + + private @Nullable ProjectionRowData projectedRowData() { + return Optional.ofNullable(projectedFields) + .map(Projection::of) + .map(p -> p.getRowData(table.rowType())) + .orElse(null); + } + /** Build source {@link DataStream} with {@link RowData}. */ public DataStream buildForRow() { DataType rowType = fromLogicalToDataType(toLogicalType(table.rowType())); @@ -280,7 +307,10 @@ public DataStream build() { return toDataStream( HybridSource.builder( LogHybridSourceFactory.buildHybridFirstSource( - table, projectedFields, predicate)) + table, + projectedRowType(), + predicate, + projectedRowData())) .addSource( new LogHybridSourceFactory(logSourceProvider), Boundedness.CONTINUOUS_UNBOUNDED) @@ -310,12 +340,13 @@ private DataStream buildContinuousStreamOperator() { env, sourceName, produceTypeInfo(), - createReadBuilder(), + createReadBuilder(projectedRowType()), conf.get(CoreOptions.CONTINUOUS_DISCOVERY_INTERVAL).toMillis(), watermarkStrategy == null, conf.get( FlinkConnectorOptions.STREAMING_READ_SHUFFLE_BUCKET_WITH_PARTITION), - bucketMode); + bucketMode, + projectedRowData()); if (parallelism != null) { dataStream.getTransformation().setParallelism(parallelism); } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkTableSource.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkTableSource.java index 12b579589d0f..741754fc3376 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkTableSource.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkTableSource.java @@ -40,6 +40,7 @@ import org.apache.flink.table.connector.source.abilities.SupportsLimitPushDown; import org.apache.flink.table.connector.source.abilities.SupportsProjectionPushDown; import org.apache.flink.table.expressions.ResolvedExpression; +import org.apache.flink.table.types.DataType; import org.apache.flink.table.types.logical.RowType; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -123,11 +124,11 @@ public Result applyFilters(List filters) { @Override public boolean supportsNestedProjection() { - return false; + return true; } @Override - public void applyProjection(int[][] projectedFields) { + public void applyProjection(int[][] projectedFields, DataType producedDataType) { this.projectFields = projectedFields; } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/LogHybridSourceFactory.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/LogHybridSourceFactory.java index 90c283bf87c9..f1ade3de9130 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/LogHybridSourceFactory.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/LogHybridSourceFactory.java @@ -20,6 +20,7 @@ import org.apache.paimon.Snapshot; import org.apache.paimon.flink.FlinkConnectorOptions; +import org.apache.paimon.flink.ProjectionRowData; import org.apache.paimon.flink.log.LogSourceProvider; import org.apache.paimon.flink.metrics.FlinkMetricRegistry; import org.apache.paimon.options.Options; @@ -29,6 +30,7 @@ import org.apache.paimon.table.source.ReadBuilder; import org.apache.paimon.table.source.StreamDataTableScan; import org.apache.paimon.table.source.StreamTableScan; +import org.apache.paimon.types.RowType; import org.apache.paimon.utils.SnapshotManager; import org.apache.flink.api.connector.source.Boundedness; @@ -69,7 +71,10 @@ public LogHybridSourceFactory(LogSourceProvider provider) { } public static FlinkSource buildHybridFirstSource( - Table table, @Nullable int[][] projectedFields, @Nullable Predicate predicate) { + Table table, + @Nullable RowType readType, + @Nullable Predicate predicate, + @Nullable ProjectionRowData rowData) { if (!(table instanceof DataTable)) { throw new UnsupportedOperationException( String.format( @@ -79,10 +84,16 @@ public static FlinkSource buildHybridFirstSource( DataTable dataTable = (DataTable) table; + ReadBuilder readBuilder = table.newReadBuilder(); + if (readType != null) { + readBuilder.withReadType(readType); + } + return new FlinkHybridFirstSource( - table.newReadBuilder().withProjection(projectedFields).withFilter(predicate), + readBuilder.withFilter(predicate), dataTable.snapshotManager(), - dataTable.coreOptions().toConfiguration()); + dataTable.coreOptions().toConfiguration(), + rowData); } /** The first source of a log {@link HybridSource}. */ @@ -94,8 +105,11 @@ private static class FlinkHybridFirstSource extends FlinkSource { private final Options options; public FlinkHybridFirstSource( - ReadBuilder readBuilder, SnapshotManager snapshotManager, Options options) { - super(readBuilder, null); + ReadBuilder readBuilder, + SnapshotManager snapshotManager, + Options options, + @Nullable ProjectionRowData rowData) { + super(readBuilder, null, rowData); this.snapshotManager = snapshotManager; this.options = options; } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/StaticFileStoreSource.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/StaticFileStoreSource.java index af425aab5e46..48b977559b2e 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/StaticFileStoreSource.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/StaticFileStoreSource.java @@ -18,6 +18,7 @@ package org.apache.paimon.flink.source; +import org.apache.paimon.flink.ProjectionRowData; import org.apache.paimon.flink.metrics.FlinkMetricRegistry; import org.apache.paimon.flink.source.assigners.FIFOSplitAssigner; import org.apache.paimon.flink.source.assigners.PreAssignSplitAssigner; @@ -53,7 +54,7 @@ public StaticFileStoreSource( @Nullable Long limit, int splitBatchSize, SplitAssignMode splitAssignMode) { - this(readBuilder, limit, splitBatchSize, splitAssignMode, null); + this(readBuilder, limit, splitBatchSize, splitAssignMode, null, null); } public StaticFileStoreSource( @@ -61,8 +62,9 @@ public StaticFileStoreSource( @Nullable Long limit, int splitBatchSize, SplitAssignMode splitAssignMode, - @Nullable DynamicPartitionFilteringInfo dynamicPartitionFilteringInfo) { - super(readBuilder, limit); + @Nullable DynamicPartitionFilteringInfo dynamicPartitionFilteringInfo, + @Nullable ProjectionRowData rowData) { + super(readBuilder, limit, rowData); this.splitBatchSize = splitBatchSize; this.splitAssignMode = splitAssignMode; this.dynamicPartitionFilteringInfo = dynamicPartitionFilteringInfo; diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/SystemTableSource.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/SystemTableSource.java index e914c617fffb..ec4683eadbcc 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/SystemTableSource.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/SystemTableSource.java @@ -20,8 +20,11 @@ import org.apache.paimon.flink.FlinkConnectorOptions; import org.apache.paimon.flink.PaimonDataStreamScanProvider; +import org.apache.paimon.flink.Projection; +import org.apache.paimon.flink.ProjectionRowData; import org.apache.paimon.options.Options; import org.apache.paimon.predicate.Predicate; +import org.apache.paimon.table.BucketMode; import org.apache.paimon.table.DataTable; import org.apache.paimon.table.Table; import org.apache.paimon.table.source.ReadBuilder; @@ -32,8 +35,6 @@ import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.table.catalog.ObjectIdentifier; import org.apache.flink.table.connector.ChangelogMode; -import org.apache.flink.table.connector.source.ScanTableSource.ScanContext; -import org.apache.flink.table.connector.source.ScanTableSource.ScanRuntimeProvider; import org.apache.flink.table.data.RowData; import javax.annotation.Nullable; @@ -80,13 +81,29 @@ public ChangelogMode getChangelogMode() { @Override public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) { Source source; - ReadBuilder readBuilder = - table.newReadBuilder().withProjection(projectFields).withFilter(predicate); + + ProjectionRowData rowData = null; + org.apache.paimon.types.RowType readType = null; + if (projectFields != null) { + Projection projection = Projection.of(projectFields); + rowData = projection.getRowData(table.rowType()); + readType = projection.project(table.rowType()); + } + + ReadBuilder readBuilder = table.newReadBuilder(); + if (readType != null) { + readBuilder.withReadType(readType); + } + readBuilder.withFilter(predicate); if (isStreamingMode && table instanceof DataTable) { - source = new ContinuousFileStoreSource(readBuilder, table.options(), limit); + source = + new ContinuousFileStoreSource( + readBuilder, table.options(), limit, BucketMode.HASH_FIXED, rowData); } else { - source = new StaticFileStoreSource(readBuilder, limit, splitBatchSize, splitAssignMode); + source = + new StaticFileStoreSource( + readBuilder, limit, splitBatchSize, splitAssignMode, null, rowData); } return new PaimonDataStreamScanProvider( source.getBoundedness() == Boundedness.BOUNDED, diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/align/AlignedContinuousFileStoreSource.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/align/AlignedContinuousFileStoreSource.java index 705e1d9a7a4c..01b6c3f59ece 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/align/AlignedContinuousFileStoreSource.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/align/AlignedContinuousFileStoreSource.java @@ -21,6 +21,7 @@ import org.apache.paimon.CoreOptions; import org.apache.paimon.disk.IOManager; import org.apache.paimon.flink.FlinkConnectorOptions; +import org.apache.paimon.flink.ProjectionRowData; import org.apache.paimon.flink.source.ContinuousFileStoreSource; import org.apache.paimon.flink.source.FileStoreSourceSplit; import org.apache.paimon.flink.source.PendingSplitsCheckpoint; @@ -52,8 +53,9 @@ public AlignedContinuousFileStoreSource( ReadBuilder readBuilder, Map options, @Nullable Long limit, - BucketMode bucketMode) { - super(readBuilder, options, limit, bucketMode); + BucketMode bucketMode, + @Nullable ProjectionRowData rowData) { + super(readBuilder, options, limit, bucketMode, rowData); } @Override @@ -72,8 +74,8 @@ public SourceReader createReader(SourceReaderCont ioManager, limit, new FutureCompletingBlockingQueue<>( - context.getConfiguration() - .get(SourceReaderOptions.ELEMENT_QUEUE_CAPACITY))); + context.getConfiguration().get(SourceReaderOptions.ELEMENT_QUEUE_CAPACITY)), + rowData); } @Override diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/align/AlignedSourceReader.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/align/AlignedSourceReader.java index a8ffe3de561f..9efd02453da8 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/align/AlignedSourceReader.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/align/AlignedSourceReader.java @@ -19,6 +19,7 @@ package org.apache.paimon.flink.source.align; import org.apache.paimon.disk.IOManager; +import org.apache.paimon.flink.ProjectionRowData; import org.apache.paimon.flink.source.FileStoreSourceReader; import org.apache.paimon.flink.source.FileStoreSourceSplit; import org.apache.paimon.flink.source.FileStoreSourceSplitState; @@ -57,8 +58,9 @@ public AlignedSourceReader( IOManager ioManager, @Nullable Long limit, FutureCompletingBlockingQueue>> - elementsQueue) { - super(readerContext, tableRead, metrics, ioManager, limit); + elementsQueue, + @Nullable ProjectionRowData rowData) { + super(readerContext, tableRead, metrics, ioManager, limit, rowData); this.elementsQueue = elementsQueue; this.nextCheckpointId = null; } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/MonitorFunction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/MonitorFunction.java index 3805f6f8c536..fadb1ec0a812 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/MonitorFunction.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/MonitorFunction.java @@ -18,6 +18,7 @@ package org.apache.paimon.flink.source.operator; +import org.apache.paimon.flink.ProjectionRowData; import org.apache.paimon.flink.utils.JavaTypeInfo; import org.apache.paimon.table.BucketMode; import org.apache.paimon.table.sink.ChannelComputer; @@ -235,7 +236,8 @@ public static DataStream buildSource( long monitorInterval, boolean emitSnapshotWatermark, boolean shuffleBucketWithPartition, - BucketMode bucketMode) { + BucketMode bucketMode, + ProjectionRowData projectionRowData) { SingleOutputStreamOperator singleOutputStreamOperator = env.addSource( new MonitorFunction( @@ -251,7 +253,7 @@ public static DataStream buildSource( singleOutputStreamOperator, shuffleBucketWithPartition); return sourceDataStream.transform( - name + "-Reader", typeInfo, new ReadOperator(readBuilder)); + name + "-Reader", typeInfo, new ReadOperator(readBuilder, projectionRowData)); } private static DataStream shuffleUnwareBucket( diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/ReadOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/ReadOperator.java index d884724c6749..28bb700668fa 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/ReadOperator.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/ReadOperator.java @@ -21,6 +21,7 @@ import org.apache.paimon.data.InternalRow; import org.apache.paimon.disk.IOManager; import org.apache.paimon.flink.FlinkRowData; +import org.apache.paimon.flink.ProjectionRowData; import org.apache.paimon.flink.source.metrics.FileStoreSourceReaderMetrics; import org.apache.paimon.table.source.DataSplit; import org.apache.paimon.table.source.ReadBuilder; @@ -36,6 +37,8 @@ import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.table.data.RowData; +import javax.annotation.Nullable; + /** * The operator that reads the {@link Split splits} received from the preceding {@link * MonitorFunction}. Contrary to the {@link MonitorFunction} which has a parallelism of 1, this @@ -47,6 +50,7 @@ public class ReadOperator extends AbstractStreamOperator private static final long serialVersionUID = 1L; private final ReadBuilder readBuilder; + @Nullable private final ProjectionRowData projectionRowData; private transient TableRead read; private transient StreamRecord reuseRecord; @@ -61,8 +65,9 @@ public class ReadOperator extends AbstractStreamOperator private transient long idleStartTime = FileStoreSourceReaderMetrics.ACTIVE; private transient Counter numRecordsIn; - public ReadOperator(ReadBuilder readBuilder) { + public ReadOperator(ReadBuilder readBuilder, @Nullable ProjectionRowData projectionRowData) { this.readBuilder = readBuilder; + this.projectionRowData = projectionRowData; } @Override @@ -85,7 +90,12 @@ public void open() throws Exception { .getSpillingDirectoriesPaths()); this.read = readBuilder.newRead().withIOManager(ioManager); this.reuseRow = new FlinkRowData(null); - this.reuseRecord = new StreamRecord<>(reuseRow); + if (projectionRowData != null) { + projectionRowData.replaceRow(this.reuseRow); + this.reuseRecord = new StreamRecord<>(projectionRowData); + } else { + this.reuseRecord = new StreamRecord<>(reuseRow); + } this.idlingStarted(); } diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ProjectionTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ProjectionTest.java new file mode 100644 index 000000000000..eec145412f1a --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ProjectionTest.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink; + +import org.apache.paimon.types.DataTypes; +import org.apache.paimon.types.RowType; + +import org.junit.jupiter.api.Test; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Tests for {@link Projection}. */ +public class ProjectionTest { + @Test + public void testNestedProjection() { + RowType writeType = + DataTypes.ROW( + DataTypes.FIELD(0, "f0", DataTypes.INT()), + DataTypes.FIELD( + 1, + "f1", + DataTypes.ROW( + DataTypes.FIELD(2, "f0", DataTypes.INT()), + DataTypes.FIELD(3, "f1", DataTypes.INT()), + DataTypes.FIELD(4, "f2", DataTypes.INT())))); + + // skip read f0, f1.f1 + RowType readType = + DataTypes.ROW( + DataTypes.FIELD( + 1, + "f1", + DataTypes.ROW( + DataTypes.FIELD(2, "f0", DataTypes.INT()), + DataTypes.FIELD(4, "f2", DataTypes.INT())))); + + Projection projection = Projection.of(new int[][] {{1, 0}, {1, 2}}); + assertThat(projection.project(writeType)).isEqualTo(readType); + + RowType readTypeForFlink = + DataTypes.ROW( + DataTypes.FIELD( + 0, + "f1", + DataTypes.ROW( + DataTypes.FIELD(0, "f0", DataTypes.INT()), + DataTypes.FIELD(1, "f2", DataTypes.INT())))); + + ProjectionRowData rowData = projection.getRowData(writeType); + + assertThat(rowData.getRowType()).isEqualTo(readTypeForFlink); + + assertThat(rowData.getProjectedFields()).isEqualTo(new int[][] {{0, 0}, {0, 1}}); + } +} diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceMetricsTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceMetricsTest.java index 13613dab063a..24f35cdfdb8c 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceMetricsTest.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceMetricsTest.java @@ -129,7 +129,7 @@ public void continuousFileStoreSourceScanMetricsTest() throws Exception { public void logHybridFileStoreSourceScanMetricsTest() throws Exception { writeOnce(); FlinkSource logHybridFileStoreSource = - LogHybridSourceFactory.buildHybridFirstSource(table, null, null); + LogHybridSourceFactory.buildHybridFirstSource(table, null, null, null); logHybridFileStoreSource.restoreEnumerator(context, null); assertThat(TestingMetricUtils.getGauge(scanMetricGroup, "lastScannedManifests").getValue()) .isEqualTo(1L); diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceReaderTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceReaderTest.java index 882763cf74da..608daa85978d 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceReaderTest.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceReaderTest.java @@ -141,6 +141,7 @@ protected FileStoreSourceReader createReader(TestingReaderContext context) { new TestChangelogDataReadWrite(tempDir.toString()).createReadWithKey(), new FileStoreSourceReaderMetrics(new DummyMetricGroup()), IOManager.create(tempDir.toString()), + null, null); } diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FlinkRecordsWithSplitIdsTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FlinkRecordsWithSplitIdsTest.java index 7f5f2f174d6f..eb83790ec277 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FlinkRecordsWithSplitIdsTest.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FlinkRecordsWithSplitIdsTest.java @@ -69,7 +69,8 @@ public void testEmitRecord() { iterator, output, state, - new FileStoreSourceReaderMetrics(new DummyMetricGroup())); + new FileStoreSourceReaderMetrics(new DummyMetricGroup()), + null); assertThat(output.getEmittedRecords()).containsExactly(rows); assertThat(state.recordsToSkip()).isEqualTo(2); assertThat(records.nextRecordFromSplit()).isNull(); diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/ProjectionPushDownITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/ProjectionPushDownITCase.java new file mode 100644 index 000000000000..4f938a75edae --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/ProjectionPushDownITCase.java @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.source; + +import org.apache.paimon.flink.CatalogITCaseBase; + +import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableList; + +import org.apache.flink.table.api.ExplainFormat; +import org.apache.flink.table.connector.source.abilities.SupportsProjectionPushDown; +import org.apache.flink.types.Row; +import org.apache.flink.types.RowKind; +import org.assertj.core.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.io.IOException; +import java.util.Arrays; +import java.util.List; +import java.util.stream.Collectors; + +/** Tests for {@link SupportsProjectionPushDown}. */ +public class ProjectionPushDownITCase extends CatalogITCaseBase { + + @Override + public List ddl() { + return ImmutableList.of( + "CREATE TABLE T (" + + "a INT, b ROW, c STRING, d ROW, d1 BOOLEAN, d2 INT>) PARTITIONED BY (a);"); + } + + @BeforeEach + @Override + public void before() throws IOException { + super.before(); + batchSql( + "INSERT INTO T VALUES " + + "(1, ROW(10, 'value1'), '1', ROW(ROW('valued1', 1), true, 10)), " + + "(1, ROW(20, 'value2'), '2', ROW(ROW('valued2', 2), false, 20)), " + + "(2, ROW(30, 'value3'), '3', ROW(ROW('valued3', 3), true, 30)), " + + "(3, ROW(30, 'value3'), '3', ROW(ROW('valued3', 3), false, 30))"); + } + + @Test + public void testProjectionPushDown() { + String sql = "SELECT a, c FROM T"; + assertPlanAndResult( + sql, + "TableSourceScan(table=[[PAIMON, default, T, project=[a, c]]], fields=[a, c])", + Row.ofKind(RowKind.INSERT, 1, "1"), + Row.ofKind(RowKind.INSERT, 1, "2"), + Row.ofKind(RowKind.INSERT, 2, "3"), + Row.ofKind(RowKind.INSERT, 3, "3")); + } + + @Test + public void testNestedProjectionPushDown() { + String sql = "SELECT a, b.b1 FROM T"; + assertPlanAndResult( + sql, + "TableSourceScan(table=[[PAIMON, default, T, project=[a, b_b1]]], fields=[a, b_b1])", + Row.ofKind(RowKind.INSERT, 1, "value1"), + Row.ofKind(RowKind.INSERT, 1, "value2"), + Row.ofKind(RowKind.INSERT, 2, "value3"), + Row.ofKind(RowKind.INSERT, 3, "value3")); + } + + @Test + public void testNestedProjectionPushDownTripleLevel() { + String sql = "SELECT a, d.d0.d00 FROM T"; + assertPlanAndResult( + sql, + "TableSourceScan(table=[[PAIMON, default, T, project=[a, d_d0_d00]]], fields=[a, d_d0_d00])", + Row.ofKind(RowKind.INSERT, 1, "valued1"), + Row.ofKind(RowKind.INSERT, 1, "valued2"), + Row.ofKind(RowKind.INSERT, 2, "valued3"), + Row.ofKind(RowKind.INSERT, 3, "valued3")); + } + + @Test + public void testNestedProjectionPushDownMultipleFields() { + String sql = "SELECT a, b.b1, d.d2 FROM T"; + assertPlanAndResult( + sql, + "TableSourceScan(table=[[PAIMON, default, T, project=[a, b_b1, d_d2]]], fields=[a, b_b1, d_d2])", + Row.ofKind(RowKind.INSERT, 1, "value1", 10), + Row.ofKind(RowKind.INSERT, 1, "value2", 20), + Row.ofKind(RowKind.INSERT, 2, "value3", 30), + Row.ofKind(RowKind.INSERT, 3, "value3", 30)); + } + + @Test + public void testMultipleNestedProjectionPushDownWithUnorderedColumns() { + String sql = "SELECT c, d.d1, b.b1, a FROM T"; + assertPlanAndResult( + sql, + "TableSourceScan(table=[[PAIMON, default, T, project=[c, d_d1, b_b1, a]]], fields=[c, d_d1, b_b1, a])", + Row.ofKind(RowKind.INSERT, "1", true, "value1", 1), + Row.ofKind(RowKind.INSERT, "2", false, "value2", 1), + Row.ofKind(RowKind.INSERT, "3", true, "value3", 2), + Row.ofKind(RowKind.INSERT, "3", false, "value3", 3)); + } + + private void assertPlanAndResult(String sql, String planIdentifier, Row... expectedRows) { + String plan = tEnv.explainSql(sql, ExplainFormat.TEXT); + String[] lines = plan.split("\n"); + String trimmed = Arrays.stream(lines).map(String::trim).collect(Collectors.joining("\n")); + Assertions.assertThat(trimmed).contains(planIdentifier); + List result = batchSql(sql); + Assertions.assertThat(result).containsExactlyInAnyOrder(expectedRows); + } +} diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/align/AlignedSourceReaderTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/align/AlignedSourceReaderTest.java index c36af0f6dcbc..f815dbe6321b 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/align/AlignedSourceReaderTest.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/align/AlignedSourceReaderTest.java @@ -77,6 +77,7 @@ protected FileStoreSourceReader createReader(TestingReaderContext context) { new FileStoreSourceReaderMetrics(new DummyMetricGroup()), IOManager.create(tempDir.toString()), null, - new FutureCompletingBlockingQueue<>(2)); + new FutureCompletingBlockingQueue<>(2), + null); } } diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/operator/OperatorSourceTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/operator/OperatorSourceTest.java index 0bce8c8901ea..2eb5a90fc151 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/operator/OperatorSourceTest.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/operator/OperatorSourceTest.java @@ -159,7 +159,7 @@ public void testMonitorFunction() throws Exception { @Test public void testReadOperator() throws Exception { - ReadOperator readOperator = new ReadOperator(table.newReadBuilder()); + ReadOperator readOperator = new ReadOperator(table.newReadBuilder(), null); OneInputStreamOperatorTestHarness harness = new OneInputStreamOperatorTestHarness<>(readOperator); harness.setup( @@ -181,7 +181,7 @@ public void testReadOperator() throws Exception { @Test public void testReadOperatorMetricsRegisterAndUpdate() throws Exception { - ReadOperator readOperator = new ReadOperator(table.newReadBuilder()); + ReadOperator readOperator = new ReadOperator(table.newReadBuilder(), null); OneInputStreamOperatorTestHarness harness = new OneInputStreamOperatorTestHarness<>(readOperator); harness.setup(