From 3bcaee376e8a87931dfad297b6b4925436e2664b Mon Sep 17 00:00:00 2001 From: yunfengzhou-hub Date: Sun, 15 Dec 2024 09:26:28 +0800 Subject: [PATCH 1/8] [core] Fix empty row type --- .../paimon/utils/FormatReaderMapping.java | 55 ++++++++++++------- 1 file changed, 36 insertions(+), 19 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/FormatReaderMapping.java b/paimon-core/src/main/java/org/apache/paimon/utils/FormatReaderMapping.java index f6c6287f51b4..5a882f2faf00 100644 --- a/paimon-core/src/main/java/org/apache/paimon/utils/FormatReaderMapping.java +++ b/paimon-core/src/main/java/org/apache/paimon/utils/FormatReaderMapping.java @@ -243,11 +243,13 @@ private List readDataFields(List allDataFields) { .filter(f -> f.id() == dataField.id()) .findFirst() .ifPresent( - field -> - readDataFields.add( - dataField.newType( - pruneDataType( - field.type(), dataField.type())))); + field -> { + DataType prunedType = + pruneDataType(field.type(), dataField.type()); + if (prunedType != null) { + readDataFields.add(dataField.newType(prunedType)); + } + }); } return readDataFields; } @@ -261,25 +263,40 @@ private DataType pruneDataType(DataType readType, DataType dataType) { for (DataField rf : r.getFields()) { if (d.containsField(rf.id())) { DataField df = d.getField(rf.id()); - newFields.add(df.newType(pruneDataType(rf.type(), df.type()))); + DataType newType = pruneDataType(rf.type(), df.type()); + if (newType == null) { + continue; + } + newFields.add(df.newType(newType)); } } + if (newFields.isEmpty()) { + // When all fields are pruned, we should not return an empty row type + return null; + } return d.copy(newFields); case MAP: - return ((MapType) dataType) - .newKeyValueType( - pruneDataType( - ((MapType) readType).getKeyType(), - ((MapType) dataType).getKeyType()), - pruneDataType( - ((MapType) readType).getValueType(), - ((MapType) dataType).getValueType())); + DataType keyType = + pruneDataType( + ((MapType) readType).getKeyType(), + ((MapType) dataType).getKeyType()); + DataType valueType = + pruneDataType( + ((MapType) readType).getValueType(), + ((MapType) dataType).getValueType()); + if (keyType == null || valueType == null) { + return null; + } + return ((MapType) dataType).newKeyValueType(keyType, valueType); case ARRAY: - return ((ArrayType) dataType) - .newElementType( - pruneDataType( - ((ArrayType) readType).getElementType(), - ((ArrayType) dataType).getElementType())); + DataType elementType = + pruneDataType( + ((ArrayType) readType).getElementType(), + ((ArrayType) dataType).getElementType()); + if (elementType == null) { + return null; + } + return ((ArrayType) dataType).newElementType(elementType); default: return dataType; } From dc8714ef7562149f54b2f05067909bbdfd051fb7 Mon Sep 17 00:00:00 2001 From: yunfengzhou-hub Date: Mon, 9 Dec 2024 16:02:54 +0800 Subject: [PATCH 2/8] [flink] Support nested projection pushdown --- .../paimon/table/source/ReadBuilder.java | 13 - .../paimon/table/IncrementalTableTest.java | 2 +- .../apache/paimon/table/TableTestBase.java | 2 +- .../paimon/table/system/BucketsTableTest.java | 2 +- .../table/system/PartitionsTableTest.java | 6 +- .../org/apache/paimon/flink/Projection.java | 200 ++++++++++++++ .../paimon/flink/ProjectionRowData.java | 250 ++++++++++++++++++ .../source/ContinuousFileStoreSource.java | 8 +- .../flink/source/FileStoreSourceReader.java | 6 +- .../source/FlinkRecordsWithSplitIds.java | 10 +- .../paimon/flink/source/FlinkSource.java | 13 +- .../flink/source/FlinkSourceBuilder.java | 51 +++- .../paimon/flink/source/FlinkTableSource.java | 5 +- .../flink/source/LogHybridSourceFactory.java | 24 +- .../flink/source/StaticFileStoreSource.java | 8 +- .../flink/source/SystemTableSource.java | 29 +- .../AlignedContinuousFileStoreSource.java | 10 +- .../source/align/AlignedSourceReader.java | 6 +- .../flink/source/operator/MonitorSource.java | 6 +- .../flink/source/operator/ReadOperator.java | 14 +- .../apache/paimon/flink/ProjectionTest.java | 71 +++++ .../source/FileStoreSourceMetricsTest.java | 2 +- .../source/FileStoreSourceReaderTest.java | 1 + .../source/FlinkRecordsWithSplitIdsTest.java | 3 +- .../source/ProjectionPushDownITCase.java | 128 +++++++++ .../source/align/AlignedSourceReaderTest.java | 3 +- .../source/operator/OperatorSourceTest.java | 4 +- 27 files changed, 808 insertions(+), 69 deletions(-) create mode 100644 paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/ProjectionRowData.java create mode 100644 paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ProjectionTest.java create mode 100644 paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/ProjectionPushDownITCase.java diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilder.java b/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilder.java index 0c1386ce441d..d12de1211bfd 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilder.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilder.java @@ -27,7 +27,6 @@ import org.apache.paimon.utils.Filter; import java.io.Serializable; -import java.util.Arrays; import java.util.List; import java.util.Map; @@ -125,18 +124,6 @@ default ReadBuilder withFilter(List predicates) { */ ReadBuilder withProjection(int[] projection); - /** Apply projection to the reader, only support top level projection. */ - @Deprecated - default ReadBuilder withProjection(int[][] projection) { - if (projection == null) { - return this; - } - if (Arrays.stream(projection).anyMatch(arr -> arr.length > 1)) { - throw new IllegalStateException("Not support nested projection"); - } - return withProjection(Arrays.stream(projection).mapToInt(arr -> arr[0]).toArray()); - } - /** the row number pushed down. */ ReadBuilder withLimit(int limit); diff --git a/paimon-core/src/test/java/org/apache/paimon/table/IncrementalTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/IncrementalTableTest.java index b4b905d36453..43fbe2b6460a 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/IncrementalTableTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/IncrementalTableTest.java @@ -271,7 +271,7 @@ public void testTagIncremental() throws Exception { GenericRow.of(fromString("+I"), 1, 6, 1)); // read tag1 tag3 projection - result = read(table, new int[][] {{1}}, Pair.of(INCREMENTAL_BETWEEN, "TAG1,TAG3")); + result = read(table, new int[] {1}, Pair.of(INCREMENTAL_BETWEEN, "TAG1,TAG3")); assertThat(result).containsExactlyInAnyOrder(GenericRow.of(2), GenericRow.of(6)); assertThatThrownBy(() -> read(table, Pair.of(INCREMENTAL_BETWEEN, "TAG2,TAG1"))) diff --git a/paimon-core/src/test/java/org/apache/paimon/table/TableTestBase.java b/paimon-core/src/test/java/org/apache/paimon/table/TableTestBase.java index 7d7617cf8bd1..aedf5553e020 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/TableTestBase.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/TableTestBase.java @@ -171,7 +171,7 @@ protected List read(Table table, Pair, String>... d protected List read( Table table, - @Nullable int[][] projection, + @Nullable int[] projection, Pair, String>... dynamicOptions) throws Exception { Map options = new HashMap<>(); diff --git a/paimon-core/src/test/java/org/apache/paimon/table/system/BucketsTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/system/BucketsTableTest.java index 57cb6605a922..b6bd71087412 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/system/BucketsTableTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/system/BucketsTableTest.java @@ -72,7 +72,7 @@ public void before() throws Exception { @Test public void testBucketsTable() throws Exception { - assertThat(read(bucketsTable, new int[][] {{0}, {1}, {2}, {4}})) + assertThat(read(bucketsTable, new int[] {0, 1, 2, 4})) .containsExactlyInAnyOrder( GenericRow.of(BinaryString.fromString("[1]"), 0, 2L, 2L), GenericRow.of(BinaryString.fromString("[2]"), 0, 2L, 2L)); diff --git a/paimon-core/src/test/java/org/apache/paimon/table/system/PartitionsTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/system/PartitionsTableTest.java index 8d12dc707bf5..74d9a5eb7b1f 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/system/PartitionsTableTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/system/PartitionsTableTest.java @@ -90,7 +90,7 @@ public void testPartitionRecordCount() throws Exception { expectedRow.add(GenericRow.of(BinaryString.fromString("[3]"), 1L)); // Only read partition and record count, record size may not stable. - List result = read(partitionsTable, new int[][] {{0}, {1}}); + List result = read(partitionsTable, new int[] {0, 1}); assertThat(result).containsExactlyInAnyOrderElementsOf(expectedRow); } @@ -105,7 +105,7 @@ public void testPartitionTimeTravel() throws Exception { read( partitionsTable.copy( Collections.singletonMap(CoreOptions.SCAN_VERSION.key(), "1")), - new int[][] {{0}, {1}}); + new int[] {0, 1}); assertThat(result).containsExactlyInAnyOrderElementsOf(expectedRow); } @@ -117,7 +117,7 @@ public void testPartitionValue() throws Exception { expectedRow.add(GenericRow.of(BinaryString.fromString("[2]"), 1L, 1L)); expectedRow.add(GenericRow.of(BinaryString.fromString("[3]"), 1L, 1L)); - List result = read(partitionsTable, new int[][] {{0}, {1}, {3}}); + List result = read(partitionsTable, new int[] {0, 1, 3}); assertThat(result).containsExactlyInAnyOrderElementsOf(expectedRow); } } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/Projection.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/Projection.java index 521d8a01311a..2e5197b04cdb 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/Projection.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/Projection.java @@ -18,14 +18,20 @@ package org.apache.paimon.flink; +import org.apache.paimon.types.DataField; +import org.apache.paimon.types.DataTypeRoot; +import org.apache.paimon.types.DataTypeVisitor; + import org.apache.flink.table.types.DataType; import org.apache.flink.table.types.logical.RowType; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Set; +import java.util.stream.Collectors; /** * {@link Projection} represents a list of (possibly nested) indexes that can be used to project @@ -38,6 +44,11 @@ private Projection() {} public abstract RowType project(RowType logicalType); + public abstract org.apache.paimon.types.RowType project( + org.apache.paimon.types.RowType rowType); + + public abstract ProjectionRowData getRowData(org.apache.paimon.types.RowType rowType); + /** @return {@code true} whether this projection is nested or not. */ public abstract boolean isNested(); @@ -132,6 +143,16 @@ public RowType project(RowType dataType) { return new NestedProjection(toNestedIndexes()).project(dataType); } + @Override + public org.apache.paimon.types.RowType project(org.apache.paimon.types.RowType rowType) { + return new NestedProjection(toNestedIndexes()).project(rowType); + } + + @Override + public ProjectionRowData getRowData(org.apache.paimon.types.RowType rowType) { + return new NestedProjection(toNestedIndexes()).getRowData(rowType); + } + @Override public boolean isNested() { return false; @@ -184,6 +205,66 @@ public RowType project(RowType rowType) { return new RowType(rowType.isNullable(), updatedFields); } + @Override + public org.apache.paimon.types.RowType project(org.apache.paimon.types.RowType rowType) { + MutableRowType result = + new MutableRowType(rowType.isNullable(), Collections.emptyList()); + for (int[] indexPath : this.projection) { + org.apache.paimon.types.RowType sourceType = rowType; + MutableRowType targetType = result; + int index; + for (index = 0; index < indexPath.length - 1; index++) { + String fieldName = sourceType.getFieldNames().get(indexPath[index]); + DataField field = sourceType.getField(fieldName); + sourceType = + (org.apache.paimon.types.RowType) sourceType.getField(fieldName).type(); + if (!targetType.containsField(fieldName)) { + targetType.appendDataField( + fieldName, + field.id(), + new MutableRowType( + sourceType.isNullable(), Collections.emptyList()), + field.description()); + } + targetType = (MutableRowType) targetType.getField(fieldName).type(); + } + + String fieldName = sourceType.getFieldNames().get(indexPath[index]); + DataField field = sourceType.getField(fieldName); + targetType.appendDataField( + fieldName, + field.id(), + sourceType.getField(fieldName).type(), + field.description()); + } + return result.toRowType(); + } + + @Override + public ProjectionRowData getRowData(org.apache.paimon.types.RowType rowType) { + org.apache.paimon.types.RowType resultType = project(rowType); + + int[][] resultIndices = new int[this.projection.length][]; + for (int i = 0; i < this.projection.length; i++) { + org.apache.paimon.types.RowType sourceType = rowType; + org.apache.paimon.types.RowType targetType = resultType; + resultIndices[i] = new int[this.projection[i].length]; + for (int j = 0; j < this.projection[i].length; j++) { + DataField sourceField = sourceType.getFields().get(this.projection[i][j]); + String fieldName = sourceField.name(); + resultIndices[i][j] = targetType.getFieldIndex(fieldName); + if (j < this.projection[i].length - 1) { + targetType = + (org.apache.paimon.types.RowType) + targetType.getField(fieldName).type(); + sourceType = (org.apache.paimon.types.RowType) sourceField.type(); + } + } + } + + return new ProjectionRowData(resultType, resultIndices); + } + @Override public boolean isNested() { return nested; @@ -217,6 +298,16 @@ public RowType project(RowType dataType) { return new NestedProjection(toNestedIndexes()).project(dataType); } + @Override + public org.apache.paimon.types.RowType project(org.apache.paimon.types.RowType rowType) { + return new NestedProjection(toNestedIndexes()).project(rowType); + } + + @Override + public ProjectionRowData getRowData(org.apache.paimon.types.RowType rowType) { + return new NestedProjection(toNestedIndexes()).getRowData(rowType); + } + @Override public boolean isNested() { return false; @@ -232,4 +323,113 @@ public int[][] toNestedIndexes() { return Arrays.stream(projection).mapToObj(i -> new int[] {i}).toArray(int[][]::new); } } + + /** + * A mutable version of {@link org.apache.paimon.types.RowType} to facilitate the building + * process of projections. + * + *

It is mutable in aspect of the {@link #appendDataField} method. + */ + public static class MutableRowType extends org.apache.paimon.types.DataType { + private final List fields; + private final boolean isNullable; + + public MutableRowType(org.apache.paimon.types.RowType rowType) { + this(rowType.isNullable(), rowType.getFields()); + } + + public MutableRowType(boolean isNullable, List fields) { + super(isNullable, DataTypeRoot.ROW); + this.fields = new ArrayList<>(fields); + this.isNullable = isNullable; + } + + public org.apache.paimon.types.RowType toRowType() { + for (int i = 0; i < fields.size(); i++) { + DataField field = fields.get(i); + if (field.type() instanceof MutableRowType) { + fields.set( + i, + new DataField( + field.id(), + field.name(), + ((MutableRowType) field.type()).toRowType(), + field.description())); + } + } + return new org.apache.paimon.types.RowType(isNullable, fields); + } + + public List getFields() { + return fields; + } + + public List getFieldNames() { + return fields.stream().map(DataField::name).collect(Collectors.toList()); + } + + public List getFieldTypes() { + return fields.stream().map(DataField::type).collect(Collectors.toList()); + } + + public int getFieldCount() { + return fields.size(); + } + + public boolean containsField(String fieldName) { + for (DataField field : fields) { + if (field.name().equals(fieldName)) { + return true; + } + } + return false; + } + + public DataField getField(String fieldName) { + for (DataField field : fields) { + if (field.name().equals(fieldName)) { + return field; + } + } + + throw new RuntimeException("Cannot find field: " + fieldName); + } + + public DataField getField(int fieldId) { + for (DataField field : fields) { + if (field.id() == fieldId) { + return field; + } + } + throw new RuntimeException("Cannot find field by field id: " + fieldId); + } + + public void appendDataField( + String name, int newId, org.apache.paimon.types.DataType type, String description) { + if (type instanceof org.apache.paimon.types.RowType) { + type = new MutableRowType((org.apache.paimon.types.RowType) type); + } + fields.add(new DataField(newId, name, type, description)); + } + + @Override + public int defaultSize() { + throw new UnsupportedOperationException(); + } + + @Override + public org.apache.paimon.types.DataType copy(boolean isNullable) { + throw new UnsupportedOperationException(); + } + + @Override + public String asSQLString() { + throw new UnsupportedOperationException(); + } + + @Override + public R accept(DataTypeVisitor visitor) { + throw new UnsupportedOperationException(); + } + } } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/ProjectionRowData.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/ProjectionRowData.java new file mode 100644 index 000000000000..d913830e8120 --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/ProjectionRowData.java @@ -0,0 +1,250 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink; + +import org.apache.paimon.types.RowType; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.table.data.ArrayData; +import org.apache.flink.table.data.DecimalData; +import org.apache.flink.table.data.MapData; +import org.apache.flink.table.data.RawValueData; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.data.StringData; +import org.apache.flink.table.data.TimestampData; +import org.apache.flink.types.RowKind; + +import javax.annotation.Nullable; + +import java.io.Serializable; + +/** + * A {@link RowData} that provides a mapping view of the original {@link RowData} according to + * projection information. + */ +public class ProjectionRowData implements RowData, Serializable { + private final RowType producedDataType; + private final int[][] projectedFields; + private final int[] lastProjectedFields; + private transient RowData row; + + ProjectionRowData(RowType producedDataType, int[][] projectedFields) { + this.producedDataType = producedDataType; + this.projectedFields = projectedFields; + this.lastProjectedFields = new int[projectedFields.length]; + for (int i = 0; i < projectedFields.length; i++) { + this.lastProjectedFields[i] = projectedFields[i][projectedFields[i].length - 1]; + } + } + + public ProjectionRowData replaceRow(RowData row) { + this.row = row; + return this; + } + + public static @Nullable ProjectionRowData copy(@Nullable ProjectionRowData rowData) { + if (rowData == null) { + return null; + } + return new ProjectionRowData(rowData.producedDataType, rowData.projectedFields); + } + + @Override + public int getArity() { + return projectedFields.length; + } + + @Override + public RowKind getRowKind() { + return row.getRowKind(); + } + + @Override + public void setRowKind(RowKind rowKind) { + row.setRowKind(rowKind); + } + + @Override + public boolean isNullAt(int i) { + RowData rowData = extractInternalRow(i); + if (rowData == null) { + return true; + } + return rowData.isNullAt(lastProjectedFields[i]); + } + + private @Nullable RowData extractInternalRow(int i) { + int[] projectedField = projectedFields[i]; + RowData rowData = this.row; + RowType dataType = producedDataType; + for (int j = 0; j < projectedField.length - 1; j++) { + dataType = (RowType) dataType.getTypeAt(projectedField[j]); + if (rowData.isNullAt(projectedField[j])) { + return null; + } + rowData = rowData.getRow(projectedField[j], dataType.getFieldCount()); + } + return rowData; + } + + @Override + public boolean getBoolean(int i) { + RowData rowData = extractInternalRow(i); + if (rowData == null) { + throw new NullPointerException(); + } + return rowData.getBoolean(lastProjectedFields[i]); + } + + @Override + public byte getByte(int i) { + RowData rowData = extractInternalRow(i); + if (rowData == null) { + throw new NullPointerException(); + } + return rowData.getByte(lastProjectedFields[i]); + } + + @Override + public short getShort(int i) { + RowData rowData = extractInternalRow(i); + if (rowData == null) { + throw new NullPointerException(); + } + return rowData.getShort(lastProjectedFields[i]); + } + + @Override + public int getInt(int i) { + RowData rowData = extractInternalRow(i); + if (rowData == null) { + throw new NullPointerException(); + } + return rowData.getInt(lastProjectedFields[i]); + } + + @Override + public long getLong(int i) { + RowData rowData = extractInternalRow(i); + if (rowData == null) { + throw new NullPointerException(); + } + return rowData.getLong(lastProjectedFields[i]); + } + + @Override + public float getFloat(int i) { + RowData rowData = extractInternalRow(i); + if (rowData == null) { + throw new NullPointerException(); + } + return rowData.getFloat(lastProjectedFields[i]); + } + + @Override + public double getDouble(int i) { + RowData rowData = extractInternalRow(i); + if (rowData == null) { + throw new NullPointerException(); + } + return rowData.getDouble(lastProjectedFields[i]); + } + + @Override + public StringData getString(int i) { + RowData rowData = extractInternalRow(i); + if (rowData == null) { + return null; + } + return rowData.getString(lastProjectedFields[i]); + } + + @Override + public DecimalData getDecimal(int i, int i1, int i2) { + RowData rowData = extractInternalRow(i); + if (rowData == null) { + return null; + } + return rowData.getDecimal(lastProjectedFields[i], i1, i2); + } + + @Override + public TimestampData getTimestamp(int i, int i1) { + RowData rowData = extractInternalRow(i); + if (rowData == null) { + return null; + } + return rowData.getTimestamp(lastProjectedFields[i], i1); + } + + @Override + public RawValueData getRawValue(int i) { + RowData rowData = extractInternalRow(i); + if (rowData == null) { + return null; + } + return rowData.getRawValue(lastProjectedFields[i]); + } + + @Override + public byte[] getBinary(int i) { + RowData rowData = extractInternalRow(i); + if (rowData == null) { + return null; + } + return rowData.getBinary(lastProjectedFields[i]); + } + + @Override + public ArrayData getArray(int i) { + RowData rowData = extractInternalRow(i); + if (rowData == null) { + return null; + } + return rowData.getArray(lastProjectedFields[i]); + } + + @Override + public MapData getMap(int i) { + RowData rowData = extractInternalRow(i); + if (rowData == null) { + return null; + } + return rowData.getMap(lastProjectedFields[i]); + } + + @Override + public RowData getRow(int i, int i1) { + RowData rowData = extractInternalRow(i); + if (rowData == null) { + return null; + } + return rowData.getRow(lastProjectedFields[i], i1); + } + + @VisibleForTesting + public int[][] getProjectedFields() { + return projectedFields; + } + + @VisibleForTesting + public RowType getRowType() { + return producedDataType; + } +} diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileStoreSource.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileStoreSource.java index 559976921e2e..45f9b876a63e 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileStoreSource.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileStoreSource.java @@ -20,6 +20,7 @@ import org.apache.paimon.CoreOptions; import org.apache.paimon.flink.FlinkConnectorOptions; +import org.apache.paimon.flink.ProjectionRowData; import org.apache.paimon.flink.metrics.FlinkMetricRegistry; import org.apache.paimon.options.Options; import org.apache.paimon.table.BucketMode; @@ -48,15 +49,16 @@ public class ContinuousFileStoreSource extends FlinkSource { public ContinuousFileStoreSource( ReadBuilder readBuilder, Map options, @Nullable Long limit) { - this(readBuilder, options, limit, BucketMode.HASH_FIXED); + this(readBuilder, options, limit, BucketMode.HASH_FIXED, null); } public ContinuousFileStoreSource( ReadBuilder readBuilder, Map options, @Nullable Long limit, - BucketMode bucketMode) { - super(readBuilder, limit); + BucketMode bucketMode, + @Nullable ProjectionRowData rowData) { + super(readBuilder, limit, rowData); this.options = options; this.bucketMode = bucketMode; } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FileStoreSourceReader.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FileStoreSourceReader.java index 8fc78c868ba5..17761c7fee86 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FileStoreSourceReader.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FileStoreSourceReader.java @@ -19,6 +19,7 @@ package org.apache.paimon.flink.source; import org.apache.paimon.disk.IOManager; +import org.apache.paimon.flink.ProjectionRowData; import org.apache.paimon.flink.source.metrics.FileStoreSourceReaderMetrics; import org.apache.paimon.flink.utils.TableScanUtils; import org.apache.paimon.table.source.TableRead; @@ -48,7 +49,8 @@ public FileStoreSourceReader( TableRead tableRead, FileStoreSourceReaderMetrics metrics, IOManager ioManager, - @Nullable Long limit) { + @Nullable Long limit, + @Nullable ProjectionRowData rowData) { // limiter is created in SourceReader, it can be shared in all split readers super( () -> @@ -56,7 +58,7 @@ public FileStoreSourceReader( tableRead, RecordLimiter.create(limit), metrics), (element, output, state) -> FlinkRecordsWithSplitIds.emitRecord( - readerContext, element, output, state, metrics), + readerContext, element, output, state, metrics, rowData), readerContext.getConfiguration(), readerContext); this.ioManager = ioManager; diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkRecordsWithSplitIds.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkRecordsWithSplitIds.java index ecb304f83c58..acb92d35ecf9 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkRecordsWithSplitIds.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkRecordsWithSplitIds.java @@ -18,6 +18,7 @@ package org.apache.paimon.flink.source; +import org.apache.paimon.flink.ProjectionRowData; import org.apache.paimon.flink.source.metrics.FileStoreSourceReaderMetrics; import org.apache.paimon.utils.Reference; @@ -109,7 +110,8 @@ public static void emitRecord( RecordIterator element, SourceOutput output, FileStoreSourceSplitState state, - FileStoreSourceReaderMetrics metrics) { + FileStoreSourceReaderMetrics metrics, + @Nullable ProjectionRowData projectionRowData) { long timestamp = TimestampAssigner.NO_TIMESTAMP; if (metrics.getLatestFileCreationTime() != FileStoreSourceReaderMetrics.UNDEFINED) { timestamp = metrics.getLatestFileCreationTime(); @@ -131,7 +133,11 @@ public static void emitRecord( numRecordsIn.inc(); } - output.collect(record.getRecord(), timestamp); + RowData rowData = record.getRecord(); + if (projectionRowData != null) { + rowData = projectionRowData.replaceRow(rowData); + } + output.collect(rowData, timestamp); state.setPosition(record); } } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSource.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSource.java index 7643f7b775d3..ee2cd769e5eb 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSource.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSource.java @@ -19,6 +19,7 @@ package org.apache.paimon.flink.source; import org.apache.paimon.disk.IOManager; +import org.apache.paimon.flink.ProjectionRowData; import org.apache.paimon.flink.source.metrics.FileStoreSourceReaderMetrics; import org.apache.paimon.table.source.ReadBuilder; @@ -43,10 +44,13 @@ public abstract class FlinkSource protected final ReadBuilder readBuilder; @Nullable protected final Long limit; + @Nullable protected final ProjectionRowData rowData; - public FlinkSource(ReadBuilder readBuilder, @Nullable Long limit) { + public FlinkSource( + ReadBuilder readBuilder, @Nullable Long limit, @Nullable ProjectionRowData rowData) { this.readBuilder = readBuilder; this.limit = limit; + this.rowData = rowData; } @Override @@ -56,7 +60,12 @@ public SourceReader createReader(SourceReaderCont FileStoreSourceReaderMetrics sourceReaderMetrics = new FileStoreSourceReaderMetrics(context.metricGroup()); return new FileStoreSourceReader( - context, readBuilder.newRead(), sourceReaderMetrics, ioManager, limit); + context, + readBuilder.newRead(), + sourceReaderMetrics, + ioManager, + limit, + ProjectionRowData.copy(rowData)); } @Override diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java index b85d5274b241..ad8f14286d25 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java @@ -23,6 +23,7 @@ import org.apache.paimon.CoreOptions.StreamingReadMode; import org.apache.paimon.flink.FlinkConnectorOptions; import org.apache.paimon.flink.Projection; +import org.apache.paimon.flink.ProjectionRowData; import org.apache.paimon.flink.log.LogSourceProvider; import org.apache.paimon.flink.sink.FlinkSink; import org.apache.paimon.flink.source.align.AlignedContinuousFileStoreSource; @@ -171,9 +172,12 @@ FlinkSourceBuilder logSourceProvider(LogSourceProvider logSourceProvider) { return this; } - private ReadBuilder createReadBuilder() { - ReadBuilder readBuilder = - table.newReadBuilder().withProjection(projectedFields).withFilter(predicate); + private ReadBuilder createReadBuilder(@Nullable org.apache.paimon.types.RowType readType) { + ReadBuilder readBuilder = table.newReadBuilder(); + if (readType != null) { + readBuilder.withReadType(readType); + } + readBuilder.withFilter(predicate); if (limit != null) { readBuilder.withLimit(limit.intValue()); } @@ -184,24 +188,33 @@ private DataStream buildStaticFileSource() { Options options = Options.fromMap(table.options()); return toDataStream( new StaticFileStoreSource( - createReadBuilder(), + createReadBuilder(projectedRowType()), limit, options.get(FlinkConnectorOptions.SCAN_SPLIT_ENUMERATOR_BATCH_SIZE), options.get(FlinkConnectorOptions.SCAN_SPLIT_ENUMERATOR_ASSIGN_MODE), - dynamicPartitionFilteringInfo)); + dynamicPartitionFilteringInfo, + projectedRowData())); } private DataStream buildContinuousFileSource() { return toDataStream( new ContinuousFileStoreSource( - createReadBuilder(), table.options(), limit, bucketMode)); + createReadBuilder(projectedRowType()), + table.options(), + limit, + bucketMode, + projectedRowData())); } private DataStream buildAlignedContinuousFileSource() { assertStreamingConfigurationForAlignMode(env); return toDataStream( new AlignedContinuousFileStoreSource( - createReadBuilder(), table.options(), limit, bucketMode)); + createReadBuilder(projectedRowType()), + table.options(), + limit, + bucketMode, + projectedRowData())); } private DataStream toDataStream(Source source) { @@ -237,6 +250,20 @@ private TypeInformation produceTypeInfo() { return InternalTypeInfo.of(produceType); } + private @Nullable org.apache.paimon.types.RowType projectedRowType() { + return Optional.ofNullable(projectedFields) + .map(Projection::of) + .map(p -> p.project(table.rowType())) + .orElse(null); + } + + private @Nullable ProjectionRowData projectedRowData() { + return Optional.ofNullable(projectedFields) + .map(Projection::of) + .map(p -> p.getRowData(table.rowType())) + .orElse(null); + } + /** Build source {@link DataStream} with {@link RowData}. */ public DataStream buildForRow() { DataType rowType = fromLogicalToDataType(toLogicalType(table.rowType())); @@ -280,7 +307,10 @@ public DataStream build() { return toDataStream( HybridSource.builder( LogHybridSourceFactory.buildHybridFirstSource( - table, projectedFields, predicate)) + table, + projectedRowType(), + predicate, + projectedRowData())) .addSource( new LogHybridSourceFactory(logSourceProvider), Boundedness.CONTINUOUS_UNBOUNDED) @@ -310,12 +340,13 @@ private DataStream buildContinuousStreamOperator() { env, sourceName, produceTypeInfo(), - createReadBuilder(), + createReadBuilder(projectedRowType()), conf.get(CoreOptions.CONTINUOUS_DISCOVERY_INTERVAL).toMillis(), watermarkStrategy == null, conf.get( FlinkConnectorOptions.STREAMING_READ_SHUFFLE_BUCKET_WITH_PARTITION), - bucketMode); + bucketMode, + projectedRowData()); if (parallelism != null) { dataStream.getTransformation().setParallelism(parallelism); } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkTableSource.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkTableSource.java index 12b579589d0f..741754fc3376 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkTableSource.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkTableSource.java @@ -40,6 +40,7 @@ import org.apache.flink.table.connector.source.abilities.SupportsLimitPushDown; import org.apache.flink.table.connector.source.abilities.SupportsProjectionPushDown; import org.apache.flink.table.expressions.ResolvedExpression; +import org.apache.flink.table.types.DataType; import org.apache.flink.table.types.logical.RowType; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -123,11 +124,11 @@ public Result applyFilters(List filters) { @Override public boolean supportsNestedProjection() { - return false; + return true; } @Override - public void applyProjection(int[][] projectedFields) { + public void applyProjection(int[][] projectedFields, DataType producedDataType) { this.projectFields = projectedFields; } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/LogHybridSourceFactory.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/LogHybridSourceFactory.java index 90c283bf87c9..f1ade3de9130 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/LogHybridSourceFactory.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/LogHybridSourceFactory.java @@ -20,6 +20,7 @@ import org.apache.paimon.Snapshot; import org.apache.paimon.flink.FlinkConnectorOptions; +import org.apache.paimon.flink.ProjectionRowData; import org.apache.paimon.flink.log.LogSourceProvider; import org.apache.paimon.flink.metrics.FlinkMetricRegistry; import org.apache.paimon.options.Options; @@ -29,6 +30,7 @@ import org.apache.paimon.table.source.ReadBuilder; import org.apache.paimon.table.source.StreamDataTableScan; import org.apache.paimon.table.source.StreamTableScan; +import org.apache.paimon.types.RowType; import org.apache.paimon.utils.SnapshotManager; import org.apache.flink.api.connector.source.Boundedness; @@ -69,7 +71,10 @@ public LogHybridSourceFactory(LogSourceProvider provider) { } public static FlinkSource buildHybridFirstSource( - Table table, @Nullable int[][] projectedFields, @Nullable Predicate predicate) { + Table table, + @Nullable RowType readType, + @Nullable Predicate predicate, + @Nullable ProjectionRowData rowData) { if (!(table instanceof DataTable)) { throw new UnsupportedOperationException( String.format( @@ -79,10 +84,16 @@ public static FlinkSource buildHybridFirstSource( DataTable dataTable = (DataTable) table; + ReadBuilder readBuilder = table.newReadBuilder(); + if (readType != null) { + readBuilder.withReadType(readType); + } + return new FlinkHybridFirstSource( - table.newReadBuilder().withProjection(projectedFields).withFilter(predicate), + readBuilder.withFilter(predicate), dataTable.snapshotManager(), - dataTable.coreOptions().toConfiguration()); + dataTable.coreOptions().toConfiguration(), + rowData); } /** The first source of a log {@link HybridSource}. */ @@ -94,8 +105,11 @@ private static class FlinkHybridFirstSource extends FlinkSource { private final Options options; public FlinkHybridFirstSource( - ReadBuilder readBuilder, SnapshotManager snapshotManager, Options options) { - super(readBuilder, null); + ReadBuilder readBuilder, + SnapshotManager snapshotManager, + Options options, + @Nullable ProjectionRowData rowData) { + super(readBuilder, null, rowData); this.snapshotManager = snapshotManager; this.options = options; } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/StaticFileStoreSource.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/StaticFileStoreSource.java index af425aab5e46..48b977559b2e 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/StaticFileStoreSource.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/StaticFileStoreSource.java @@ -18,6 +18,7 @@ package org.apache.paimon.flink.source; +import org.apache.paimon.flink.ProjectionRowData; import org.apache.paimon.flink.metrics.FlinkMetricRegistry; import org.apache.paimon.flink.source.assigners.FIFOSplitAssigner; import org.apache.paimon.flink.source.assigners.PreAssignSplitAssigner; @@ -53,7 +54,7 @@ public StaticFileStoreSource( @Nullable Long limit, int splitBatchSize, SplitAssignMode splitAssignMode) { - this(readBuilder, limit, splitBatchSize, splitAssignMode, null); + this(readBuilder, limit, splitBatchSize, splitAssignMode, null, null); } public StaticFileStoreSource( @@ -61,8 +62,9 @@ public StaticFileStoreSource( @Nullable Long limit, int splitBatchSize, SplitAssignMode splitAssignMode, - @Nullable DynamicPartitionFilteringInfo dynamicPartitionFilteringInfo) { - super(readBuilder, limit); + @Nullable DynamicPartitionFilteringInfo dynamicPartitionFilteringInfo, + @Nullable ProjectionRowData rowData) { + super(readBuilder, limit, rowData); this.splitBatchSize = splitBatchSize; this.splitAssignMode = splitAssignMode; this.dynamicPartitionFilteringInfo = dynamicPartitionFilteringInfo; diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/SystemTableSource.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/SystemTableSource.java index e914c617fffb..ec4683eadbcc 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/SystemTableSource.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/SystemTableSource.java @@ -20,8 +20,11 @@ import org.apache.paimon.flink.FlinkConnectorOptions; import org.apache.paimon.flink.PaimonDataStreamScanProvider; +import org.apache.paimon.flink.Projection; +import org.apache.paimon.flink.ProjectionRowData; import org.apache.paimon.options.Options; import org.apache.paimon.predicate.Predicate; +import org.apache.paimon.table.BucketMode; import org.apache.paimon.table.DataTable; import org.apache.paimon.table.Table; import org.apache.paimon.table.source.ReadBuilder; @@ -32,8 +35,6 @@ import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.table.catalog.ObjectIdentifier; import org.apache.flink.table.connector.ChangelogMode; -import org.apache.flink.table.connector.source.ScanTableSource.ScanContext; -import org.apache.flink.table.connector.source.ScanTableSource.ScanRuntimeProvider; import org.apache.flink.table.data.RowData; import javax.annotation.Nullable; @@ -80,13 +81,29 @@ public ChangelogMode getChangelogMode() { @Override public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) { Source source; - ReadBuilder readBuilder = - table.newReadBuilder().withProjection(projectFields).withFilter(predicate); + + ProjectionRowData rowData = null; + org.apache.paimon.types.RowType readType = null; + if (projectFields != null) { + Projection projection = Projection.of(projectFields); + rowData = projection.getRowData(table.rowType()); + readType = projection.project(table.rowType()); + } + + ReadBuilder readBuilder = table.newReadBuilder(); + if (readType != null) { + readBuilder.withReadType(readType); + } + readBuilder.withFilter(predicate); if (isStreamingMode && table instanceof DataTable) { - source = new ContinuousFileStoreSource(readBuilder, table.options(), limit); + source = + new ContinuousFileStoreSource( + readBuilder, table.options(), limit, BucketMode.HASH_FIXED, rowData); } else { - source = new StaticFileStoreSource(readBuilder, limit, splitBatchSize, splitAssignMode); + source = + new StaticFileStoreSource( + readBuilder, limit, splitBatchSize, splitAssignMode, null, rowData); } return new PaimonDataStreamScanProvider( source.getBoundedness() == Boundedness.BOUNDED, diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/align/AlignedContinuousFileStoreSource.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/align/AlignedContinuousFileStoreSource.java index 705e1d9a7a4c..01b6c3f59ece 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/align/AlignedContinuousFileStoreSource.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/align/AlignedContinuousFileStoreSource.java @@ -21,6 +21,7 @@ import org.apache.paimon.CoreOptions; import org.apache.paimon.disk.IOManager; import org.apache.paimon.flink.FlinkConnectorOptions; +import org.apache.paimon.flink.ProjectionRowData; import org.apache.paimon.flink.source.ContinuousFileStoreSource; import org.apache.paimon.flink.source.FileStoreSourceSplit; import org.apache.paimon.flink.source.PendingSplitsCheckpoint; @@ -52,8 +53,9 @@ public AlignedContinuousFileStoreSource( ReadBuilder readBuilder, Map options, @Nullable Long limit, - BucketMode bucketMode) { - super(readBuilder, options, limit, bucketMode); + BucketMode bucketMode, + @Nullable ProjectionRowData rowData) { + super(readBuilder, options, limit, bucketMode, rowData); } @Override @@ -72,8 +74,8 @@ public SourceReader createReader(SourceReaderCont ioManager, limit, new FutureCompletingBlockingQueue<>( - context.getConfiguration() - .get(SourceReaderOptions.ELEMENT_QUEUE_CAPACITY))); + context.getConfiguration().get(SourceReaderOptions.ELEMENT_QUEUE_CAPACITY)), + rowData); } @Override diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/align/AlignedSourceReader.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/align/AlignedSourceReader.java index a8ffe3de561f..9efd02453da8 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/align/AlignedSourceReader.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/align/AlignedSourceReader.java @@ -19,6 +19,7 @@ package org.apache.paimon.flink.source.align; import org.apache.paimon.disk.IOManager; +import org.apache.paimon.flink.ProjectionRowData; import org.apache.paimon.flink.source.FileStoreSourceReader; import org.apache.paimon.flink.source.FileStoreSourceSplit; import org.apache.paimon.flink.source.FileStoreSourceSplitState; @@ -57,8 +58,9 @@ public AlignedSourceReader( IOManager ioManager, @Nullable Long limit, FutureCompletingBlockingQueue>> - elementsQueue) { - super(readerContext, tableRead, metrics, ioManager, limit); + elementsQueue, + @Nullable ProjectionRowData rowData) { + super(readerContext, tableRead, metrics, ioManager, limit, rowData); this.elementsQueue = elementsQueue; this.nextCheckpointId = null; } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/MonitorSource.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/MonitorSource.java index 4ec0a4f99d9f..aecd3b2bc991 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/MonitorSource.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/MonitorSource.java @@ -18,6 +18,7 @@ package org.apache.paimon.flink.source.operator; +import org.apache.paimon.flink.ProjectionRowData; import org.apache.paimon.flink.source.AbstractNonCoordinatedSource; import org.apache.paimon.flink.source.AbstractNonCoordinatedSourceReader; import org.apache.paimon.flink.source.SimpleSourceSplit; @@ -210,7 +211,8 @@ public static DataStream buildSource( long monitorInterval, boolean emitSnapshotWatermark, boolean shuffleBucketWithPartition, - BucketMode bucketMode) { + BucketMode bucketMode, + ProjectionRowData projectionRowData) { SingleOutputStreamOperator singleOutputStreamOperator = env.fromSource( new MonitorSource( @@ -227,7 +229,7 @@ public static DataStream buildSource( singleOutputStreamOperator, shuffleBucketWithPartition); return sourceDataStream.transform( - name + "-Reader", typeInfo, new ReadOperator(readBuilder)); + name + "-Reader", typeInfo, new ReadOperator(readBuilder, projectionRowData)); } private static DataStream shuffleUnwareBucket( diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/ReadOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/ReadOperator.java index ccc66194560e..815722a4e705 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/ReadOperator.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/ReadOperator.java @@ -21,6 +21,7 @@ import org.apache.paimon.data.InternalRow; import org.apache.paimon.disk.IOManager; import org.apache.paimon.flink.FlinkRowData; +import org.apache.paimon.flink.ProjectionRowData; import org.apache.paimon.flink.source.metrics.FileStoreSourceReaderMetrics; import org.apache.paimon.table.source.DataSplit; import org.apache.paimon.table.source.ReadBuilder; @@ -36,6 +37,8 @@ import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.table.data.RowData; +import javax.annotation.Nullable; + /** * The operator that reads the {@link Split splits} received from the preceding {@link * MonitorSource}. Contrary to the {@link MonitorSource} which has a parallelism of 1, this operator @@ -47,6 +50,7 @@ public class ReadOperator extends AbstractStreamOperator private static final long serialVersionUID = 1L; private final ReadBuilder readBuilder; + @Nullable private final ProjectionRowData projectionRowData; private transient TableRead read; private transient StreamRecord reuseRecord; @@ -61,8 +65,9 @@ public class ReadOperator extends AbstractStreamOperator private transient long idleStartTime = FileStoreSourceReaderMetrics.ACTIVE; private transient Counter numRecordsIn; - public ReadOperator(ReadBuilder readBuilder) { + public ReadOperator(ReadBuilder readBuilder, @Nullable ProjectionRowData projectionRowData) { this.readBuilder = readBuilder; + this.projectionRowData = projectionRowData; } @Override @@ -85,7 +90,12 @@ public void open() throws Exception { .getSpillingDirectoriesPaths()); this.read = readBuilder.newRead().withIOManager(ioManager); this.reuseRow = new FlinkRowData(null); - this.reuseRecord = new StreamRecord<>(reuseRow); + if (projectionRowData != null) { + projectionRowData.replaceRow(this.reuseRow); + this.reuseRecord = new StreamRecord<>(projectionRowData); + } else { + this.reuseRecord = new StreamRecord<>(reuseRow); + } this.idlingStarted(); } diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ProjectionTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ProjectionTest.java new file mode 100644 index 000000000000..eec145412f1a --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ProjectionTest.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink; + +import org.apache.paimon.types.DataTypes; +import org.apache.paimon.types.RowType; + +import org.junit.jupiter.api.Test; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Tests for {@link Projection}. */ +public class ProjectionTest { + @Test + public void testNestedProjection() { + RowType writeType = + DataTypes.ROW( + DataTypes.FIELD(0, "f0", DataTypes.INT()), + DataTypes.FIELD( + 1, + "f1", + DataTypes.ROW( + DataTypes.FIELD(2, "f0", DataTypes.INT()), + DataTypes.FIELD(3, "f1", DataTypes.INT()), + DataTypes.FIELD(4, "f2", DataTypes.INT())))); + + // skip read f0, f1.f1 + RowType readType = + DataTypes.ROW( + DataTypes.FIELD( + 1, + "f1", + DataTypes.ROW( + DataTypes.FIELD(2, "f0", DataTypes.INT()), + DataTypes.FIELD(4, "f2", DataTypes.INT())))); + + Projection projection = Projection.of(new int[][] {{1, 0}, {1, 2}}); + assertThat(projection.project(writeType)).isEqualTo(readType); + + RowType readTypeForFlink = + DataTypes.ROW( + DataTypes.FIELD( + 0, + "f1", + DataTypes.ROW( + DataTypes.FIELD(0, "f0", DataTypes.INT()), + DataTypes.FIELD(1, "f2", DataTypes.INT())))); + + ProjectionRowData rowData = projection.getRowData(writeType); + + assertThat(rowData.getRowType()).isEqualTo(readTypeForFlink); + + assertThat(rowData.getProjectedFields()).isEqualTo(new int[][] {{0, 0}, {0, 1}}); + } +} diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceMetricsTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceMetricsTest.java index 13613dab063a..24f35cdfdb8c 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceMetricsTest.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceMetricsTest.java @@ -129,7 +129,7 @@ public void continuousFileStoreSourceScanMetricsTest() throws Exception { public void logHybridFileStoreSourceScanMetricsTest() throws Exception { writeOnce(); FlinkSource logHybridFileStoreSource = - LogHybridSourceFactory.buildHybridFirstSource(table, null, null); + LogHybridSourceFactory.buildHybridFirstSource(table, null, null, null); logHybridFileStoreSource.restoreEnumerator(context, null); assertThat(TestingMetricUtils.getGauge(scanMetricGroup, "lastScannedManifests").getValue()) .isEqualTo(1L); diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceReaderTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceReaderTest.java index 882763cf74da..608daa85978d 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceReaderTest.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceReaderTest.java @@ -141,6 +141,7 @@ protected FileStoreSourceReader createReader(TestingReaderContext context) { new TestChangelogDataReadWrite(tempDir.toString()).createReadWithKey(), new FileStoreSourceReaderMetrics(new DummyMetricGroup()), IOManager.create(tempDir.toString()), + null, null); } diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FlinkRecordsWithSplitIdsTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FlinkRecordsWithSplitIdsTest.java index 7f5f2f174d6f..eb83790ec277 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FlinkRecordsWithSplitIdsTest.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FlinkRecordsWithSplitIdsTest.java @@ -69,7 +69,8 @@ public void testEmitRecord() { iterator, output, state, - new FileStoreSourceReaderMetrics(new DummyMetricGroup())); + new FileStoreSourceReaderMetrics(new DummyMetricGroup()), + null); assertThat(output.getEmittedRecords()).containsExactly(rows); assertThat(state.recordsToSkip()).isEqualTo(2); assertThat(records.nextRecordFromSplit()).isNull(); diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/ProjectionPushDownITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/ProjectionPushDownITCase.java new file mode 100644 index 000000000000..4f938a75edae --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/ProjectionPushDownITCase.java @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.source; + +import org.apache.paimon.flink.CatalogITCaseBase; + +import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableList; + +import org.apache.flink.table.api.ExplainFormat; +import org.apache.flink.table.connector.source.abilities.SupportsProjectionPushDown; +import org.apache.flink.types.Row; +import org.apache.flink.types.RowKind; +import org.assertj.core.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.io.IOException; +import java.util.Arrays; +import java.util.List; +import java.util.stream.Collectors; + +/** Tests for {@link SupportsProjectionPushDown}. */ +public class ProjectionPushDownITCase extends CatalogITCaseBase { + + @Override + public List ddl() { + return ImmutableList.of( + "CREATE TABLE T (" + + "a INT, b ROW, c STRING, d ROW, d1 BOOLEAN, d2 INT>) PARTITIONED BY (a);"); + } + + @BeforeEach + @Override + public void before() throws IOException { + super.before(); + batchSql( + "INSERT INTO T VALUES " + + "(1, ROW(10, 'value1'), '1', ROW(ROW('valued1', 1), true, 10)), " + + "(1, ROW(20, 'value2'), '2', ROW(ROW('valued2', 2), false, 20)), " + + "(2, ROW(30, 'value3'), '3', ROW(ROW('valued3', 3), true, 30)), " + + "(3, ROW(30, 'value3'), '3', ROW(ROW('valued3', 3), false, 30))"); + } + + @Test + public void testProjectionPushDown() { + String sql = "SELECT a, c FROM T"; + assertPlanAndResult( + sql, + "TableSourceScan(table=[[PAIMON, default, T, project=[a, c]]], fields=[a, c])", + Row.ofKind(RowKind.INSERT, 1, "1"), + Row.ofKind(RowKind.INSERT, 1, "2"), + Row.ofKind(RowKind.INSERT, 2, "3"), + Row.ofKind(RowKind.INSERT, 3, "3")); + } + + @Test + public void testNestedProjectionPushDown() { + String sql = "SELECT a, b.b1 FROM T"; + assertPlanAndResult( + sql, + "TableSourceScan(table=[[PAIMON, default, T, project=[a, b_b1]]], fields=[a, b_b1])", + Row.ofKind(RowKind.INSERT, 1, "value1"), + Row.ofKind(RowKind.INSERT, 1, "value2"), + Row.ofKind(RowKind.INSERT, 2, "value3"), + Row.ofKind(RowKind.INSERT, 3, "value3")); + } + + @Test + public void testNestedProjectionPushDownTripleLevel() { + String sql = "SELECT a, d.d0.d00 FROM T"; + assertPlanAndResult( + sql, + "TableSourceScan(table=[[PAIMON, default, T, project=[a, d_d0_d00]]], fields=[a, d_d0_d00])", + Row.ofKind(RowKind.INSERT, 1, "valued1"), + Row.ofKind(RowKind.INSERT, 1, "valued2"), + Row.ofKind(RowKind.INSERT, 2, "valued3"), + Row.ofKind(RowKind.INSERT, 3, "valued3")); + } + + @Test + public void testNestedProjectionPushDownMultipleFields() { + String sql = "SELECT a, b.b1, d.d2 FROM T"; + assertPlanAndResult( + sql, + "TableSourceScan(table=[[PAIMON, default, T, project=[a, b_b1, d_d2]]], fields=[a, b_b1, d_d2])", + Row.ofKind(RowKind.INSERT, 1, "value1", 10), + Row.ofKind(RowKind.INSERT, 1, "value2", 20), + Row.ofKind(RowKind.INSERT, 2, "value3", 30), + Row.ofKind(RowKind.INSERT, 3, "value3", 30)); + } + + @Test + public void testMultipleNestedProjectionPushDownWithUnorderedColumns() { + String sql = "SELECT c, d.d1, b.b1, a FROM T"; + assertPlanAndResult( + sql, + "TableSourceScan(table=[[PAIMON, default, T, project=[c, d_d1, b_b1, a]]], fields=[c, d_d1, b_b1, a])", + Row.ofKind(RowKind.INSERT, "1", true, "value1", 1), + Row.ofKind(RowKind.INSERT, "2", false, "value2", 1), + Row.ofKind(RowKind.INSERT, "3", true, "value3", 2), + Row.ofKind(RowKind.INSERT, "3", false, "value3", 3)); + } + + private void assertPlanAndResult(String sql, String planIdentifier, Row... expectedRows) { + String plan = tEnv.explainSql(sql, ExplainFormat.TEXT); + String[] lines = plan.split("\n"); + String trimmed = Arrays.stream(lines).map(String::trim).collect(Collectors.joining("\n")); + Assertions.assertThat(trimmed).contains(planIdentifier); + List result = batchSql(sql); + Assertions.assertThat(result).containsExactlyInAnyOrder(expectedRows); + } +} diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/align/AlignedSourceReaderTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/align/AlignedSourceReaderTest.java index c36af0f6dcbc..f815dbe6321b 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/align/AlignedSourceReaderTest.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/align/AlignedSourceReaderTest.java @@ -77,6 +77,7 @@ protected FileStoreSourceReader createReader(TestingReaderContext context) { new FileStoreSourceReaderMetrics(new DummyMetricGroup()), IOManager.create(tempDir.toString()), null, - new FutureCompletingBlockingQueue<>(2)); + new FutureCompletingBlockingQueue<>(2), + null); } } diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/operator/OperatorSourceTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/operator/OperatorSourceTest.java index 0cd969707cfa..6c4c7860e5fa 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/operator/OperatorSourceTest.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/operator/OperatorSourceTest.java @@ -183,7 +183,7 @@ public void testMonitorSource() throws Exception { @Test public void testReadOperator() throws Exception { - ReadOperator readOperator = new ReadOperator(table.newReadBuilder()); + ReadOperator readOperator = new ReadOperator(table.newReadBuilder(), null); OneInputStreamOperatorTestHarness harness = new OneInputStreamOperatorTestHarness<>(readOperator); harness.setup( @@ -205,7 +205,7 @@ public void testReadOperator() throws Exception { @Test public void testReadOperatorMetricsRegisterAndUpdate() throws Exception { - ReadOperator readOperator = new ReadOperator(table.newReadBuilder()); + ReadOperator readOperator = new ReadOperator(table.newReadBuilder(), null); OneInputStreamOperatorTestHarness harness = new OneInputStreamOperatorTestHarness<>(readOperator); harness.setup( From 1c10be103fde3e1cb590fbdf3ca2fcffe61d2d25 Mon Sep 17 00:00:00 2001 From: yunfengzhou-hub Date: Mon, 16 Dec 2024 20:23:25 +0800 Subject: [PATCH 3/8] [flink] Fix implementation of ProjectionRowData and misc comments --- .../paimon/utils/FormatReaderMapping.java | 1 + .../org/apache/paimon/flink/Projection.java | 16 +- .../paimon/flink/ProjectionRowData.java | 155 ++++++------------ .../flink/source/FlinkSourceBuilder.java | 14 +- .../flink/source/SystemTableSource.java | 2 +- .../apache/paimon/flink/ProjectionTest.java | 5 +- 6 files changed, 73 insertions(+), 120 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/FormatReaderMapping.java b/paimon-core/src/main/java/org/apache/paimon/utils/FormatReaderMapping.java index 5a882f2faf00..9dd234cd2055 100644 --- a/paimon-core/src/main/java/org/apache/paimon/utils/FormatReaderMapping.java +++ b/paimon-core/src/main/java/org/apache/paimon/utils/FormatReaderMapping.java @@ -254,6 +254,7 @@ private List readDataFields(List allDataFields) { return readDataFields; } + @Nullable private DataType pruneDataType(DataType readType, DataType dataType) { switch (readType.getTypeRoot()) { case ROW: diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/Projection.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/Projection.java index 2e5197b04cdb..29eadb5be3ee 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/Projection.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/Projection.java @@ -33,6 +33,8 @@ import java.util.Set; import java.util.stream.Collectors; +import static org.apache.paimon.flink.LogicalTypeConversion.toLogicalType; + /** * {@link Projection} represents a list of (possibly nested) indexes that can be used to project * data types. A row projection includes both reducing the accessible fields and reordering them. @@ -47,7 +49,7 @@ private Projection() {} public abstract org.apache.paimon.types.RowType project( org.apache.paimon.types.RowType rowType); - public abstract ProjectionRowData getRowData(org.apache.paimon.types.RowType rowType); + public abstract ProjectionRowData getOuterProjectRow(org.apache.paimon.types.RowType rowType); /** @return {@code true} whether this projection is nested or not. */ public abstract boolean isNested(); @@ -149,8 +151,8 @@ public org.apache.paimon.types.RowType project(org.apache.paimon.types.RowType r } @Override - public ProjectionRowData getRowData(org.apache.paimon.types.RowType rowType) { - return new NestedProjection(toNestedIndexes()).getRowData(rowType); + public ProjectionRowData getOuterProjectRow(org.apache.paimon.types.RowType rowType) { + return new NestedProjection(toNestedIndexes()).getOuterProjectRow(rowType); } @Override @@ -241,7 +243,7 @@ public org.apache.paimon.types.RowType project(org.apache.paimon.types.RowType r } @Override - public ProjectionRowData getRowData(org.apache.paimon.types.RowType rowType) { + public ProjectionRowData getOuterProjectRow(org.apache.paimon.types.RowType rowType) { org.apache.paimon.types.RowType resultType = project(rowType); int[][] resultIndices = new int[this.projection.length][]; @@ -262,7 +264,7 @@ public ProjectionRowData getRowData(org.apache.paimon.types.RowType rowType) { } } - return new ProjectionRowData(resultType, resultIndices); + return new ProjectionRowData(toLogicalType(resultType), resultIndices); } @Override @@ -304,8 +306,8 @@ public org.apache.paimon.types.RowType project(org.apache.paimon.types.RowType r } @Override - public ProjectionRowData getRowData(org.apache.paimon.types.RowType rowType) { - return new NestedProjection(toNestedIndexes()).getRowData(rowType); + public ProjectionRowData getOuterProjectRow(org.apache.paimon.types.RowType rowType) { + return new NestedProjection(toNestedIndexes()).getOuterProjectRow(rowType); } @Override diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/ProjectionRowData.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/ProjectionRowData.java index d913830e8120..49c46dec8866 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/ProjectionRowData.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/ProjectionRowData.java @@ -18,16 +18,17 @@ package org.apache.paimon.flink; -import org.apache.paimon.types.RowType; - import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.table.data.ArrayData; import org.apache.flink.table.data.DecimalData; +import org.apache.flink.table.data.GenericRowData; import org.apache.flink.table.data.MapData; import org.apache.flink.table.data.RawValueData; import org.apache.flink.table.data.RowData; import org.apache.flink.table.data.StringData; import org.apache.flink.table.data.TimestampData; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.RowType; import org.apache.flink.types.RowKind; import javax.annotation.Nullable; @@ -39,22 +40,48 @@ * projection information. */ public class ProjectionRowData implements RowData, Serializable { + private static final long serialVersionUID = 1L; + private final RowType producedDataType; private final int[][] projectedFields; - private final int[] lastProjectedFields; - private transient RowData row; + private final FieldGetter[][] fieldGetters; + + private transient GenericRowData row; ProjectionRowData(RowType producedDataType, int[][] projectedFields) { this.producedDataType = producedDataType; this.projectedFields = projectedFields; - this.lastProjectedFields = new int[projectedFields.length]; + this.fieldGetters = new FieldGetter[projectedFields.length][]; for (int i = 0; i < projectedFields.length; i++) { - this.lastProjectedFields[i] = projectedFields[i][projectedFields[i].length - 1]; + this.fieldGetters[i] = new FieldGetter[projectedFields[i].length]; + LogicalType currentType = producedDataType; + for (int j = 0; j < projectedFields[i].length; j++) { + currentType = ((RowType) currentType).getTypeAt(projectedFields[i][j]); + this.fieldGetters[i][j] = + RowData.createFieldGetter(currentType, projectedFields[i][j]); + } } } - public ProjectionRowData replaceRow(RowData row) { - this.row = row; + public ProjectionRowData replaceRow(RowData inputRow) { + if (this.row == null) { + this.row = new GenericRowData(inputRow.getRowKind(), fieldGetters.length); + } + + for (int i = 0; i < fieldGetters.length; i++) { + Object currentRow = inputRow; + for (int j = 0; j < fieldGetters[i].length; j++) { + if (currentRow == null) { + break; + } + currentRow = this.fieldGetters[i][j].getFieldOrNull((RowData) currentRow); + } + this.row.setField(i, currentRow); + } + + if (inputRow != null) { + this.row.setRowKind(inputRow.getRowKind()); + } return this; } @@ -67,7 +94,7 @@ public ProjectionRowData replaceRow(RowData row) { @Override public int getArity() { - return projectedFields.length; + return this.row.getArity(); } @Override @@ -82,160 +109,82 @@ public void setRowKind(RowKind rowKind) { @Override public boolean isNullAt(int i) { - RowData rowData = extractInternalRow(i); - if (rowData == null) { - return true; - } - return rowData.isNullAt(lastProjectedFields[i]); - } - - private @Nullable RowData extractInternalRow(int i) { - int[] projectedField = projectedFields[i]; - RowData rowData = this.row; - RowType dataType = producedDataType; - for (int j = 0; j < projectedField.length - 1; j++) { - dataType = (RowType) dataType.getTypeAt(projectedField[j]); - if (rowData.isNullAt(projectedField[j])) { - return null; - } - rowData = rowData.getRow(projectedField[j], dataType.getFieldCount()); - } - return rowData; + return this.row.isNullAt(i); } @Override public boolean getBoolean(int i) { - RowData rowData = extractInternalRow(i); - if (rowData == null) { - throw new NullPointerException(); - } - return rowData.getBoolean(lastProjectedFields[i]); + return this.row.getBoolean(i); } @Override public byte getByte(int i) { - RowData rowData = extractInternalRow(i); - if (rowData == null) { - throw new NullPointerException(); - } - return rowData.getByte(lastProjectedFields[i]); + return this.row.getByte(i); } @Override public short getShort(int i) { - RowData rowData = extractInternalRow(i); - if (rowData == null) { - throw new NullPointerException(); - } - return rowData.getShort(lastProjectedFields[i]); + return this.row.getShort(i); } @Override public int getInt(int i) { - RowData rowData = extractInternalRow(i); - if (rowData == null) { - throw new NullPointerException(); - } - return rowData.getInt(lastProjectedFields[i]); + return this.row.getInt(i); } @Override public long getLong(int i) { - RowData rowData = extractInternalRow(i); - if (rowData == null) { - throw new NullPointerException(); - } - return rowData.getLong(lastProjectedFields[i]); + return this.row.getLong(i); } @Override public float getFloat(int i) { - RowData rowData = extractInternalRow(i); - if (rowData == null) { - throw new NullPointerException(); - } - return rowData.getFloat(lastProjectedFields[i]); + return this.row.getFloat(i); } @Override public double getDouble(int i) { - RowData rowData = extractInternalRow(i); - if (rowData == null) { - throw new NullPointerException(); - } - return rowData.getDouble(lastProjectedFields[i]); + return this.row.getDouble(i); } @Override public StringData getString(int i) { - RowData rowData = extractInternalRow(i); - if (rowData == null) { - return null; - } - return rowData.getString(lastProjectedFields[i]); + return this.row.getString(i); } @Override public DecimalData getDecimal(int i, int i1, int i2) { - RowData rowData = extractInternalRow(i); - if (rowData == null) { - return null; - } - return rowData.getDecimal(lastProjectedFields[i], i1, i2); + return this.row.getDecimal(i, i1, i2); } @Override public TimestampData getTimestamp(int i, int i1) { - RowData rowData = extractInternalRow(i); - if (rowData == null) { - return null; - } - return rowData.getTimestamp(lastProjectedFields[i], i1); + return this.row.getTimestamp(i, i1); } @Override public RawValueData getRawValue(int i) { - RowData rowData = extractInternalRow(i); - if (rowData == null) { - return null; - } - return rowData.getRawValue(lastProjectedFields[i]); + return this.row.getRawValue(i); } @Override public byte[] getBinary(int i) { - RowData rowData = extractInternalRow(i); - if (rowData == null) { - return null; - } - return rowData.getBinary(lastProjectedFields[i]); + return this.row.getBinary(i); } @Override public ArrayData getArray(int i) { - RowData rowData = extractInternalRow(i); - if (rowData == null) { - return null; - } - return rowData.getArray(lastProjectedFields[i]); + return this.row.getArray(i); } @Override public MapData getMap(int i) { - RowData rowData = extractInternalRow(i); - if (rowData == null) { - return null; - } - return rowData.getMap(lastProjectedFields[i]); + return this.row.getMap(i); } @Override public RowData getRow(int i, int i1) { - RowData rowData = extractInternalRow(i); - if (rowData == null) { - return null; - } - return rowData.getRow(lastProjectedFields[i], i1); + return this.row.getRow(i, i1); } @VisibleForTesting diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java index ad8f14286d25..c67f6a6e5a3c 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java @@ -193,7 +193,7 @@ private DataStream buildStaticFileSource() { options.get(FlinkConnectorOptions.SCAN_SPLIT_ENUMERATOR_BATCH_SIZE), options.get(FlinkConnectorOptions.SCAN_SPLIT_ENUMERATOR_ASSIGN_MODE), dynamicPartitionFilteringInfo, - projectedRowData())); + outerProject())); } private DataStream buildContinuousFileSource() { @@ -203,7 +203,7 @@ private DataStream buildContinuousFileSource() { table.options(), limit, bucketMode, - projectedRowData())); + outerProject())); } private DataStream buildAlignedContinuousFileSource() { @@ -214,7 +214,7 @@ private DataStream buildAlignedContinuousFileSource() { table.options(), limit, bucketMode, - projectedRowData())); + outerProject())); } private DataStream toDataStream(Source source) { @@ -257,10 +257,10 @@ private TypeInformation produceTypeInfo() { .orElse(null); } - private @Nullable ProjectionRowData projectedRowData() { + private @Nullable ProjectionRowData outerProject() { return Optional.ofNullable(projectedFields) .map(Projection::of) - .map(p -> p.getRowData(table.rowType())) + .map(p -> p.getOuterProjectRow(table.rowType())) .orElse(null); } @@ -310,7 +310,7 @@ public DataStream build() { table, projectedRowType(), predicate, - projectedRowData())) + outerProject())) .addSource( new LogHybridSourceFactory(logSourceProvider), Boundedness.CONTINUOUS_UNBOUNDED) @@ -346,7 +346,7 @@ private DataStream buildContinuousStreamOperator() { conf.get( FlinkConnectorOptions.STREAMING_READ_SHUFFLE_BUCKET_WITH_PARTITION), bucketMode, - projectedRowData()); + outerProject()); if (parallelism != null) { dataStream.getTransformation().setParallelism(parallelism); } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/SystemTableSource.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/SystemTableSource.java index ec4683eadbcc..7e140ee40291 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/SystemTableSource.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/SystemTableSource.java @@ -86,7 +86,7 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) { org.apache.paimon.types.RowType readType = null; if (projectFields != null) { Projection projection = Projection.of(projectFields); - rowData = projection.getRowData(table.rowType()); + rowData = projection.getOuterProjectRow(table.rowType()); readType = projection.project(table.rowType()); } diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ProjectionTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ProjectionTest.java index eec145412f1a..df1c45c81625 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ProjectionTest.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ProjectionTest.java @@ -23,6 +23,7 @@ import org.junit.jupiter.api.Test; +import static org.apache.paimon.flink.LogicalTypeConversion.toLogicalType; import static org.assertj.core.api.Assertions.assertThat; /** Tests for {@link Projection}. */ @@ -62,9 +63,9 @@ public void testNestedProjection() { DataTypes.FIELD(0, "f0", DataTypes.INT()), DataTypes.FIELD(1, "f2", DataTypes.INT())))); - ProjectionRowData rowData = projection.getRowData(writeType); + ProjectionRowData rowData = projection.getOuterProjectRow(writeType); - assertThat(rowData.getRowType()).isEqualTo(readTypeForFlink); + assertThat(rowData.getRowType()).isEqualTo(toLogicalType(readTypeForFlink)); assertThat(rowData.getProjectedFields()).isEqualTo(new int[][] {{0, 0}, {0, 1}}); } From b4e68f65acb258cb5fc3a8ddbdd23899e2895601 Mon Sep 17 00:00:00 2001 From: yunfengzhou-hub Date: Tue, 17 Dec 2024 11:24:16 +0800 Subject: [PATCH 4/8] [flink] Fix replace logic for reused row --- .../org/apache/paimon/flink/source/operator/ReadOperator.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/ReadOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/ReadOperator.java index 815722a4e705..20b9e75313cb 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/ReadOperator.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/ReadOperator.java @@ -91,7 +91,6 @@ public void open() throws Exception { this.read = readBuilder.newRead().withIOManager(ioManager); this.reuseRow = new FlinkRowData(null); if (projectionRowData != null) { - projectionRowData.replaceRow(this.reuseRow); this.reuseRecord = new StreamRecord<>(projectionRowData); } else { this.reuseRecord = new StreamRecord<>(reuseRow); @@ -126,6 +125,9 @@ public void processElement(StreamRecord record) throws Exception { } reuseRow.replace(iterator.next()); + if (projectionRowData != null) { + projectionRowData.replaceRow(this.reuseRow); + } output.collect(reuseRecord); } } From a8dece2442f6e5a4ab6f5c0e862d2df18bb0d114 Mon Sep 17 00:00:00 2001 From: yunfengzhou-hub Date: Tue, 17 Dec 2024 14:45:28 +0800 Subject: [PATCH 5/8] [flink] Avoid data copy in non-nested cases --- .../java/org/apache/paimon/flink/Projection.java | 9 +++++++++ .../flink/source/ProjectionPushDownITCase.java | 12 ++++++++++++ 2 files changed, 21 insertions(+) diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/Projection.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/Projection.java index 29eadb5be3ee..863b6e74c55b 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/Projection.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/Projection.java @@ -209,6 +209,11 @@ public RowType project(RowType rowType) { @Override public org.apache.paimon.types.RowType project(org.apache.paimon.types.RowType rowType) { + if (!nested) { + return rowType.project( + Arrays.stream(this.projection).mapToInt(x -> x[0]).toArray()); + } + MutableRowType result = new MutableRowType(rowType.isNullable(), Collections.emptyList()); for (int[] indexPath : this.projection) { @@ -244,6 +249,10 @@ public org.apache.paimon.types.RowType project(org.apache.paimon.types.RowType r @Override public ProjectionRowData getOuterProjectRow(org.apache.paimon.types.RowType rowType) { + if (!nested) { + return null; + } + org.apache.paimon.types.RowType resultType = project(rowType); int[][] resultIndices = new int[this.projection.length][]; diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/ProjectionPushDownITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/ProjectionPushDownITCase.java index 4f938a75edae..940d7daae395 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/ProjectionPushDownITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/ProjectionPushDownITCase.java @@ -69,6 +69,18 @@ public void testProjectionPushDown() { Row.ofKind(RowKind.INSERT, 3, "3")); } + @Test + public void testProjectionPushDownWithUnorderedColumns() { + String sql = "SELECT c, a FROM T"; + assertPlanAndResult( + sql, + "TableSourceScan(table=[[PAIMON, default, T, project=[c, a]]], fields=[c, a])", + Row.ofKind(RowKind.INSERT, "1", 1), + Row.ofKind(RowKind.INSERT, "2", 1), + Row.ofKind(RowKind.INSERT, "3", 2), + Row.ofKind(RowKind.INSERT, "3", 3)); + } + @Test public void testNestedProjectionPushDown() { String sql = "SELECT a, b.b1 FROM T"; From 7e6cddc67863d0e2cf47b306782df0a5ca995a9b Mon Sep 17 00:00:00 2001 From: yunfengzhou-hub Date: Thu, 19 Dec 2024 15:58:07 +0800 Subject: [PATCH 6/8] [flink] Optimize ProjectionRowData and add tests for system table --- .../org/apache/paimon/flink/Projection.java | 57 +++---- .../paimon/flink/ProjectionRowData.java | 140 ++++++++++++------ .../source/ProjectionPushDownITCase.java | 9 ++ 3 files changed, 122 insertions(+), 84 deletions(-) diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/Projection.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/Projection.java index 863b6e74c55b..51e72b9cf12a 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/Projection.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/Projection.java @@ -31,7 +31,6 @@ import java.util.HashSet; import java.util.List; import java.util.Set; -import java.util.stream.Collectors; import static org.apache.paimon.flink.LogicalTypeConversion.toLogicalType; @@ -223,8 +222,7 @@ public org.apache.paimon.types.RowType project(org.apache.paimon.types.RowType r for (index = 0; index < indexPath.length - 1; index++) { String fieldName = sourceType.getFieldNames().get(indexPath[index]); DataField field = sourceType.getField(fieldName); - sourceType = - (org.apache.paimon.types.RowType) sourceType.getField(fieldName).type(); + sourceType = (org.apache.paimon.types.RowType) field.type(); if (!targetType.containsField(fieldName)) { targetType.appendDataField( fieldName, @@ -239,10 +237,7 @@ public org.apache.paimon.types.RowType project(org.apache.paimon.types.RowType r String fieldName = sourceType.getFieldNames().get(indexPath[index]); DataField field = sourceType.getField(fieldName); targetType.appendDataField( - fieldName, - field.id(), - sourceType.getField(fieldName).type(), - field.description()); + fieldName, field.id(), field.type(), field.description()); } return result.toRowType(); } @@ -341,21 +336,21 @@ public int[][] toNestedIndexes() { * *

It is mutable in aspect of the {@link #appendDataField} method. */ - public static class MutableRowType extends org.apache.paimon.types.DataType { + private static class MutableRowType extends org.apache.paimon.types.DataType { private final List fields; private final boolean isNullable; - public MutableRowType(org.apache.paimon.types.RowType rowType) { + private MutableRowType(org.apache.paimon.types.RowType rowType) { this(rowType.isNullable(), rowType.getFields()); } - public MutableRowType(boolean isNullable, List fields) { + private MutableRowType(boolean isNullable, List fields) { super(isNullable, DataTypeRoot.ROW); this.fields = new ArrayList<>(fields); this.isNullable = isNullable; } - public org.apache.paimon.types.RowType toRowType() { + private org.apache.paimon.types.RowType toRowType() { for (int i = 0; i < fields.size(); i++) { DataField field = fields.get(i); if (field.type() instanceof MutableRowType) { @@ -371,23 +366,7 @@ public org.apache.paimon.types.RowType toRowType() { return new org.apache.paimon.types.RowType(isNullable, fields); } - public List getFields() { - return fields; - } - - public List getFieldNames() { - return fields.stream().map(DataField::name).collect(Collectors.toList()); - } - - public List getFieldTypes() { - return fields.stream().map(DataField::type).collect(Collectors.toList()); - } - - public int getFieldCount() { - return fields.size(); - } - - public boolean containsField(String fieldName) { + private boolean containsField(String fieldName) { for (DataField field : fields) { if (field.name().equals(fieldName)) { return true; @@ -396,7 +375,7 @@ public boolean containsField(String fieldName) { return false; } - public DataField getField(String fieldName) { + private DataField getField(String fieldName) { for (DataField field : fields) { if (field.name().equals(fieldName)) { return field; @@ -406,17 +385,23 @@ public DataField getField(String fieldName) { throw new RuntimeException("Cannot find field: " + fieldName); } - public DataField getField(int fieldId) { + private void appendDataField( + String name, int newId, org.apache.paimon.types.DataType type, String description) { for (DataField field : fields) { - if (field.id() == fieldId) { - return field; + if (field.name().equals(name)) { + throw new IllegalStateException( + String.format( + "A field with name %s has already been appended. Existing fields: %s", + name, fields)); + } + if (field.id() == newId) { + throw new IllegalStateException( + String.format( + "A field with id %s has already been appended. Existing fields: %s", + newId, fields)); } } - throw new RuntimeException("Cannot find field by field id: " + fieldId); - } - public void appendDataField( - String name, int newId, org.apache.paimon.types.DataType type, String description) { if (type instanceof org.apache.paimon.types.RowType) { type = new MutableRowType((org.apache.paimon.types.RowType) type); } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/ProjectionRowData.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/ProjectionRowData.java index 49c46dec8866..2126b0bd20b1 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/ProjectionRowData.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/ProjectionRowData.java @@ -21,19 +21,19 @@ import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.table.data.ArrayData; import org.apache.flink.table.data.DecimalData; -import org.apache.flink.table.data.GenericRowData; import org.apache.flink.table.data.MapData; import org.apache.flink.table.data.RawValueData; import org.apache.flink.table.data.RowData; import org.apache.flink.table.data.StringData; import org.apache.flink.table.data.TimestampData; -import org.apache.flink.table.types.logical.LogicalType; import org.apache.flink.table.types.logical.RowType; import org.apache.flink.types.RowKind; import javax.annotation.Nullable; import java.io.Serializable; +import java.util.Arrays; +import java.util.function.BiFunction; /** * A {@link RowData} that provides a mapping view of the original {@link RowData} according to @@ -44,44 +44,34 @@ public class ProjectionRowData implements RowData, Serializable { private final RowType producedDataType; private final int[][] projectedFields; - private final FieldGetter[][] fieldGetters; + private final int[] lastProjectedFields; - private transient GenericRowData row; + private final Object[] cachedFields; + private final boolean[] isFieldsCached; + + private final boolean[] cachedNullAt; + private final boolean[] isNullAtCached; + + private transient RowData row; ProjectionRowData(RowType producedDataType, int[][] projectedFields) { this.producedDataType = producedDataType; this.projectedFields = projectedFields; - this.fieldGetters = new FieldGetter[projectedFields.length][]; + this.lastProjectedFields = new int[projectedFields.length]; for (int i = 0; i < projectedFields.length; i++) { - this.fieldGetters[i] = new FieldGetter[projectedFields[i].length]; - LogicalType currentType = producedDataType; - for (int j = 0; j < projectedFields[i].length; j++) { - currentType = ((RowType) currentType).getTypeAt(projectedFields[i][j]); - this.fieldGetters[i][j] = - RowData.createFieldGetter(currentType, projectedFields[i][j]); - } + this.lastProjectedFields[i] = projectedFields[i][projectedFields[i].length - 1]; } - } - public ProjectionRowData replaceRow(RowData inputRow) { - if (this.row == null) { - this.row = new GenericRowData(inputRow.getRowKind(), fieldGetters.length); - } - - for (int i = 0; i < fieldGetters.length; i++) { - Object currentRow = inputRow; - for (int j = 0; j < fieldGetters[i].length; j++) { - if (currentRow == null) { - break; - } - currentRow = this.fieldGetters[i][j].getFieldOrNull((RowData) currentRow); - } - this.row.setField(i, currentRow); - } + this.cachedFields = new Object[projectedFields.length]; + this.isFieldsCached = new boolean[projectedFields.length]; + this.cachedNullAt = new boolean[projectedFields.length]; + this.isNullAtCached = new boolean[projectedFields.length]; + } - if (inputRow != null) { - this.row.setRowKind(inputRow.getRowKind()); - } + public ProjectionRowData replaceRow(RowData row) { + this.row = row; + Arrays.fill(isFieldsCached, false); + Arrays.fill(isNullAtCached, false); return this; } @@ -94,7 +84,7 @@ public ProjectionRowData replaceRow(RowData inputRow) { @Override public int getArity() { - return this.row.getArity(); + return projectedFields.length; } @Override @@ -109,82 +99,136 @@ public void setRowKind(RowKind rowKind) { @Override public boolean isNullAt(int i) { - return this.row.isNullAt(i); + if (isNullAtCached[i]) { + return cachedNullAt[i]; + } + + RowData rowData = extractInternalRow(i); + boolean result; + if (rowData == null) { + result = true; + } else { + result = rowData.isNullAt(lastProjectedFields[i]); + } + + isNullAtCached[i] = true; + cachedNullAt[i] = result; + + return result; } @Override public boolean getBoolean(int i) { - return this.row.getBoolean(i); + return getFieldAs(i, RowData::getBoolean); } @Override public byte getByte(int i) { - return this.row.getByte(i); + return getFieldAs(i, RowData::getByte); } @Override public short getShort(int i) { - return this.row.getShort(i); + return getFieldAs(i, RowData::getShort); } @Override public int getInt(int i) { - return this.row.getInt(i); + return getFieldAs(i, RowData::getInt); } @Override public long getLong(int i) { - return this.row.getLong(i); + return getFieldAs(i, RowData::getLong); } @Override public float getFloat(int i) { - return this.row.getFloat(i); + return getFieldAs(i, RowData::getFloat); } @Override public double getDouble(int i) { - return this.row.getDouble(i); + return getFieldAs(i, RowData::getDouble); } @Override public StringData getString(int i) { - return this.row.getString(i); + return getFieldAs(i, RowData::getString); } @Override public DecimalData getDecimal(int i, int i1, int i2) { - return this.row.getDecimal(i, i1, i2); + return getFieldAs(i, (rowData, j) -> rowData.getDecimal(j, i1, i2)); } @Override public TimestampData getTimestamp(int i, int i1) { - return this.row.getTimestamp(i, i1); + return getFieldAs(i, (rowData, j) -> rowData.getTimestamp(j, i1)); } @Override public RawValueData getRawValue(int i) { - return this.row.getRawValue(i); + return getFieldAs(i, RowData::getRawValue); } @Override public byte[] getBinary(int i) { - return this.row.getBinary(i); + return getFieldAs(i, RowData::getBinary); } @Override public ArrayData getArray(int i) { - return this.row.getArray(i); + return getFieldAs(i, RowData::getArray); } @Override public MapData getMap(int i) { - return this.row.getMap(i); + return getFieldAs(i, RowData::getMap); } @Override public RowData getRow(int i, int i1) { - return this.row.getRow(i, i1); + return getFieldAs(i, (rowData, j) -> rowData.getRow(j, i1)); + } + + private @Nullable RowData extractInternalRow(int i) { + int[] projectedField = projectedFields[i]; + RowData rowData = this.row; + RowType dataType = producedDataType; + for (int j = 0; j < projectedField.length - 1; j++) { + dataType = (RowType) dataType.getTypeAt(projectedField[j]); + if (rowData.isNullAt(projectedField[j])) { + return null; + } + rowData = rowData.getRow(projectedField[j], dataType.getFieldCount()); + } + return rowData; + } + + @SuppressWarnings("unchecked") + private T getFieldAs(int i, BiFunction getter) { + if (isFieldsCached[i]) { + return (T) cachedFields[i]; + } + + RowData rowData = extractInternalRow(i); + T result; + if (rowData == null) { + isNullAtCached[i] = true; + cachedNullAt[i] = true; + isFieldsCached[i] = true; + cachedFields[i] = null; + result = null; + } else { + result = getter.apply(rowData, lastProjectedFields[i]); + isNullAtCached[i] = true; + cachedNullAt[i] = result == null; + isFieldsCached[i] = true; + cachedFields[i] = result; + } + + return result; } @VisibleForTesting diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/ProjectionPushDownITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/ProjectionPushDownITCase.java index 940d7daae395..62aa52c2d28e 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/ProjectionPushDownITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/ProjectionPushDownITCase.java @@ -129,6 +129,15 @@ public void testMultipleNestedProjectionPushDownWithUnorderedColumns() { Row.ofKind(RowKind.INSERT, "3", false, "value3", 3)); } + @Test + public void testSystemTableProjectionPushDown() { + String sql = "SELECT schema_id, primary_keys FROM T$schemas"; + assertPlanAndResult( + sql, + "TableSourceScan(table=[[PAIMON, default, T$schemas, project=[schema_id, primary_keys]]], fields=[schema_id, primary_keys])", + Row.ofKind(RowKind.INSERT, 0L, "[]")); + } + private void assertPlanAndResult(String sql, String planIdentifier, Row... expectedRows) { String plan = tEnv.explainSql(sql, ExplainFormat.TEXT); String[] lines = plan.split("\n"); From 57353c8d75cc3839aba945511ebd5425905f32b6 Mon Sep 17 00:00:00 2001 From: yunfengzhou-hub Date: Thu, 19 Dec 2024 17:09:32 +0800 Subject: [PATCH 7/8] [flink] Rename ProjectionRowData to NestedProjectedRowData --- ...RowData.java => NestedProjectedRowData.java} | 13 +++++++------ .../org/apache/paimon/flink/Projection.java | 11 ++++++----- .../flink/source/ContinuousFileStoreSource.java | 4 ++-- .../flink/source/FileStoreSourceReader.java | 4 ++-- .../flink/source/FlinkRecordsWithSplitIds.java | 8 ++++---- .../apache/paimon/flink/source/FlinkSource.java | 10 ++++++---- .../paimon/flink/source/FlinkSourceBuilder.java | 4 ++-- .../flink/source/LogHybridSourceFactory.java | 6 +++--- .../flink/source/StaticFileStoreSource.java | 4 ++-- .../paimon/flink/source/SystemTableSource.java | 4 ++-- .../align/AlignedContinuousFileStoreSource.java | 4 ++-- .../flink/source/align/AlignedSourceReader.java | 4 ++-- .../flink/source/operator/MonitorSource.java | 6 +++--- .../flink/source/operator/ReadOperator.java | 17 +++++++++-------- .../org/apache/paimon/flink/ProjectionTest.java | 2 +- 15 files changed, 53 insertions(+), 48 deletions(-) rename paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/{ProjectionRowData.java => NestedProjectedRowData.java} (92%) diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/ProjectionRowData.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/NestedProjectedRowData.java similarity index 92% rename from paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/ProjectionRowData.java rename to paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/NestedProjectedRowData.java index 2126b0bd20b1..8c2026e24a12 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/ProjectionRowData.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/NestedProjectedRowData.java @@ -37,9 +37,10 @@ /** * A {@link RowData} that provides a mapping view of the original {@link RowData} according to - * projection information. + * projection information. Compared with {@link ProjectedRowData}, this class supports nested + * projection. */ -public class ProjectionRowData implements RowData, Serializable { +public class NestedProjectedRowData implements RowData, Serializable { private static final long serialVersionUID = 1L; private final RowType producedDataType; @@ -54,7 +55,7 @@ public class ProjectionRowData implements RowData, Serializable { private transient RowData row; - ProjectionRowData(RowType producedDataType, int[][] projectedFields) { + NestedProjectedRowData(RowType producedDataType, int[][] projectedFields) { this.producedDataType = producedDataType; this.projectedFields = projectedFields; this.lastProjectedFields = new int[projectedFields.length]; @@ -68,18 +69,18 @@ public class ProjectionRowData implements RowData, Serializable { this.isNullAtCached = new boolean[projectedFields.length]; } - public ProjectionRowData replaceRow(RowData row) { + public NestedProjectedRowData replaceRow(RowData row) { this.row = row; Arrays.fill(isFieldsCached, false); Arrays.fill(isNullAtCached, false); return this; } - public static @Nullable ProjectionRowData copy(@Nullable ProjectionRowData rowData) { + public static @Nullable NestedProjectedRowData copy(@Nullable NestedProjectedRowData rowData) { if (rowData == null) { return null; } - return new ProjectionRowData(rowData.producedDataType, rowData.projectedFields); + return new NestedProjectedRowData(rowData.producedDataType, rowData.projectedFields); } @Override diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/Projection.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/Projection.java index 51e72b9cf12a..a6b99b132f01 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/Projection.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/Projection.java @@ -48,7 +48,8 @@ private Projection() {} public abstract org.apache.paimon.types.RowType project( org.apache.paimon.types.RowType rowType); - public abstract ProjectionRowData getOuterProjectRow(org.apache.paimon.types.RowType rowType); + public abstract NestedProjectedRowData getOuterProjectRow( + org.apache.paimon.types.RowType rowType); /** @return {@code true} whether this projection is nested or not. */ public abstract boolean isNested(); @@ -150,7 +151,7 @@ public org.apache.paimon.types.RowType project(org.apache.paimon.types.RowType r } @Override - public ProjectionRowData getOuterProjectRow(org.apache.paimon.types.RowType rowType) { + public NestedProjectedRowData getOuterProjectRow(org.apache.paimon.types.RowType rowType) { return new NestedProjection(toNestedIndexes()).getOuterProjectRow(rowType); } @@ -243,7 +244,7 @@ public org.apache.paimon.types.RowType project(org.apache.paimon.types.RowType r } @Override - public ProjectionRowData getOuterProjectRow(org.apache.paimon.types.RowType rowType) { + public NestedProjectedRowData getOuterProjectRow(org.apache.paimon.types.RowType rowType) { if (!nested) { return null; } @@ -268,7 +269,7 @@ public ProjectionRowData getOuterProjectRow(org.apache.paimon.types.RowType rowT } } - return new ProjectionRowData(toLogicalType(resultType), resultIndices); + return new NestedProjectedRowData(toLogicalType(resultType), resultIndices); } @Override @@ -310,7 +311,7 @@ public org.apache.paimon.types.RowType project(org.apache.paimon.types.RowType r } @Override - public ProjectionRowData getOuterProjectRow(org.apache.paimon.types.RowType rowType) { + public NestedProjectedRowData getOuterProjectRow(org.apache.paimon.types.RowType rowType) { return new NestedProjection(toNestedIndexes()).getOuterProjectRow(rowType); } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileStoreSource.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileStoreSource.java index 45f9b876a63e..cc33ef167c7d 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileStoreSource.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileStoreSource.java @@ -20,7 +20,7 @@ import org.apache.paimon.CoreOptions; import org.apache.paimon.flink.FlinkConnectorOptions; -import org.apache.paimon.flink.ProjectionRowData; +import org.apache.paimon.flink.NestedProjectedRowData; import org.apache.paimon.flink.metrics.FlinkMetricRegistry; import org.apache.paimon.options.Options; import org.apache.paimon.table.BucketMode; @@ -57,7 +57,7 @@ public ContinuousFileStoreSource( Map options, @Nullable Long limit, BucketMode bucketMode, - @Nullable ProjectionRowData rowData) { + @Nullable NestedProjectedRowData rowData) { super(readBuilder, limit, rowData); this.options = options; this.bucketMode = bucketMode; diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FileStoreSourceReader.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FileStoreSourceReader.java index 17761c7fee86..937d54c9f758 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FileStoreSourceReader.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FileStoreSourceReader.java @@ -19,7 +19,7 @@ package org.apache.paimon.flink.source; import org.apache.paimon.disk.IOManager; -import org.apache.paimon.flink.ProjectionRowData; +import org.apache.paimon.flink.NestedProjectedRowData; import org.apache.paimon.flink.source.metrics.FileStoreSourceReaderMetrics; import org.apache.paimon.flink.utils.TableScanUtils; import org.apache.paimon.table.source.TableRead; @@ -50,7 +50,7 @@ public FileStoreSourceReader( FileStoreSourceReaderMetrics metrics, IOManager ioManager, @Nullable Long limit, - @Nullable ProjectionRowData rowData) { + @Nullable NestedProjectedRowData rowData) { // limiter is created in SourceReader, it can be shared in all split readers super( () -> diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkRecordsWithSplitIds.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkRecordsWithSplitIds.java index acb92d35ecf9..9860c1c055fe 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkRecordsWithSplitIds.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkRecordsWithSplitIds.java @@ -18,7 +18,7 @@ package org.apache.paimon.flink.source; -import org.apache.paimon.flink.ProjectionRowData; +import org.apache.paimon.flink.NestedProjectedRowData; import org.apache.paimon.flink.source.metrics.FileStoreSourceReaderMetrics; import org.apache.paimon.utils.Reference; @@ -111,7 +111,7 @@ public static void emitRecord( SourceOutput output, FileStoreSourceSplitState state, FileStoreSourceReaderMetrics metrics, - @Nullable ProjectionRowData projectionRowData) { + @Nullable NestedProjectedRowData nestedProjectedRowData) { long timestamp = TimestampAssigner.NO_TIMESTAMP; if (metrics.getLatestFileCreationTime() != FileStoreSourceReaderMetrics.UNDEFINED) { timestamp = metrics.getLatestFileCreationTime(); @@ -134,8 +134,8 @@ public static void emitRecord( } RowData rowData = record.getRecord(); - if (projectionRowData != null) { - rowData = projectionRowData.replaceRow(rowData); + if (nestedProjectedRowData != null) { + rowData = nestedProjectedRowData.replaceRow(rowData); } output.collect(rowData, timestamp); state.setPosition(record); diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSource.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSource.java index ee2cd769e5eb..5fcfc9b379c6 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSource.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSource.java @@ -19,7 +19,7 @@ package org.apache.paimon.flink.source; import org.apache.paimon.disk.IOManager; -import org.apache.paimon.flink.ProjectionRowData; +import org.apache.paimon.flink.NestedProjectedRowData; import org.apache.paimon.flink.source.metrics.FileStoreSourceReaderMetrics; import org.apache.paimon.table.source.ReadBuilder; @@ -44,10 +44,12 @@ public abstract class FlinkSource protected final ReadBuilder readBuilder; @Nullable protected final Long limit; - @Nullable protected final ProjectionRowData rowData; + @Nullable protected final NestedProjectedRowData rowData; public FlinkSource( - ReadBuilder readBuilder, @Nullable Long limit, @Nullable ProjectionRowData rowData) { + ReadBuilder readBuilder, + @Nullable Long limit, + @Nullable NestedProjectedRowData rowData) { this.readBuilder = readBuilder; this.limit = limit; this.rowData = rowData; @@ -65,7 +67,7 @@ public SourceReader createReader(SourceReaderCont sourceReaderMetrics, ioManager, limit, - ProjectionRowData.copy(rowData)); + NestedProjectedRowData.copy(rowData)); } @Override diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java index c67f6a6e5a3c..d99efae0539d 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java @@ -22,8 +22,8 @@ import org.apache.paimon.CoreOptions.StartupMode; import org.apache.paimon.CoreOptions.StreamingReadMode; import org.apache.paimon.flink.FlinkConnectorOptions; +import org.apache.paimon.flink.NestedProjectedRowData; import org.apache.paimon.flink.Projection; -import org.apache.paimon.flink.ProjectionRowData; import org.apache.paimon.flink.log.LogSourceProvider; import org.apache.paimon.flink.sink.FlinkSink; import org.apache.paimon.flink.source.align.AlignedContinuousFileStoreSource; @@ -257,7 +257,7 @@ private TypeInformation produceTypeInfo() { .orElse(null); } - private @Nullable ProjectionRowData outerProject() { + private @Nullable NestedProjectedRowData outerProject() { return Optional.ofNullable(projectedFields) .map(Projection::of) .map(p -> p.getOuterProjectRow(table.rowType())) diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/LogHybridSourceFactory.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/LogHybridSourceFactory.java index f1ade3de9130..bc361cdbf3a9 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/LogHybridSourceFactory.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/LogHybridSourceFactory.java @@ -20,7 +20,7 @@ import org.apache.paimon.Snapshot; import org.apache.paimon.flink.FlinkConnectorOptions; -import org.apache.paimon.flink.ProjectionRowData; +import org.apache.paimon.flink.NestedProjectedRowData; import org.apache.paimon.flink.log.LogSourceProvider; import org.apache.paimon.flink.metrics.FlinkMetricRegistry; import org.apache.paimon.options.Options; @@ -74,7 +74,7 @@ public static FlinkSource buildHybridFirstSource( Table table, @Nullable RowType readType, @Nullable Predicate predicate, - @Nullable ProjectionRowData rowData) { + @Nullable NestedProjectedRowData rowData) { if (!(table instanceof DataTable)) { throw new UnsupportedOperationException( String.format( @@ -108,7 +108,7 @@ public FlinkHybridFirstSource( ReadBuilder readBuilder, SnapshotManager snapshotManager, Options options, - @Nullable ProjectionRowData rowData) { + @Nullable NestedProjectedRowData rowData) { super(readBuilder, null, rowData); this.snapshotManager = snapshotManager; this.options = options; diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/StaticFileStoreSource.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/StaticFileStoreSource.java index 48b977559b2e..624f5434810d 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/StaticFileStoreSource.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/StaticFileStoreSource.java @@ -18,7 +18,7 @@ package org.apache.paimon.flink.source; -import org.apache.paimon.flink.ProjectionRowData; +import org.apache.paimon.flink.NestedProjectedRowData; import org.apache.paimon.flink.metrics.FlinkMetricRegistry; import org.apache.paimon.flink.source.assigners.FIFOSplitAssigner; import org.apache.paimon.flink.source.assigners.PreAssignSplitAssigner; @@ -63,7 +63,7 @@ public StaticFileStoreSource( int splitBatchSize, SplitAssignMode splitAssignMode, @Nullable DynamicPartitionFilteringInfo dynamicPartitionFilteringInfo, - @Nullable ProjectionRowData rowData) { + @Nullable NestedProjectedRowData rowData) { super(readBuilder, limit, rowData); this.splitBatchSize = splitBatchSize; this.splitAssignMode = splitAssignMode; diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/SystemTableSource.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/SystemTableSource.java index 7e140ee40291..5198bd42136b 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/SystemTableSource.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/SystemTableSource.java @@ -19,9 +19,9 @@ package org.apache.paimon.flink.source; import org.apache.paimon.flink.FlinkConnectorOptions; +import org.apache.paimon.flink.NestedProjectedRowData; import org.apache.paimon.flink.PaimonDataStreamScanProvider; import org.apache.paimon.flink.Projection; -import org.apache.paimon.flink.ProjectionRowData; import org.apache.paimon.options.Options; import org.apache.paimon.predicate.Predicate; import org.apache.paimon.table.BucketMode; @@ -82,7 +82,7 @@ public ChangelogMode getChangelogMode() { public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) { Source source; - ProjectionRowData rowData = null; + NestedProjectedRowData rowData = null; org.apache.paimon.types.RowType readType = null; if (projectFields != null) { Projection projection = Projection.of(projectFields); diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/align/AlignedContinuousFileStoreSource.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/align/AlignedContinuousFileStoreSource.java index 01b6c3f59ece..63b3f63f7f50 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/align/AlignedContinuousFileStoreSource.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/align/AlignedContinuousFileStoreSource.java @@ -21,7 +21,7 @@ import org.apache.paimon.CoreOptions; import org.apache.paimon.disk.IOManager; import org.apache.paimon.flink.FlinkConnectorOptions; -import org.apache.paimon.flink.ProjectionRowData; +import org.apache.paimon.flink.NestedProjectedRowData; import org.apache.paimon.flink.source.ContinuousFileStoreSource; import org.apache.paimon.flink.source.FileStoreSourceSplit; import org.apache.paimon.flink.source.PendingSplitsCheckpoint; @@ -54,7 +54,7 @@ public AlignedContinuousFileStoreSource( Map options, @Nullable Long limit, BucketMode bucketMode, - @Nullable ProjectionRowData rowData) { + @Nullable NestedProjectedRowData rowData) { super(readBuilder, options, limit, bucketMode, rowData); } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/align/AlignedSourceReader.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/align/AlignedSourceReader.java index 9efd02453da8..7d6f47296a74 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/align/AlignedSourceReader.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/align/AlignedSourceReader.java @@ -19,7 +19,7 @@ package org.apache.paimon.flink.source.align; import org.apache.paimon.disk.IOManager; -import org.apache.paimon.flink.ProjectionRowData; +import org.apache.paimon.flink.NestedProjectedRowData; import org.apache.paimon.flink.source.FileStoreSourceReader; import org.apache.paimon.flink.source.FileStoreSourceSplit; import org.apache.paimon.flink.source.FileStoreSourceSplitState; @@ -59,7 +59,7 @@ public AlignedSourceReader( @Nullable Long limit, FutureCompletingBlockingQueue>> elementsQueue, - @Nullable ProjectionRowData rowData) { + @Nullable NestedProjectedRowData rowData) { super(readerContext, tableRead, metrics, ioManager, limit, rowData); this.elementsQueue = elementsQueue; this.nextCheckpointId = null; diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/MonitorSource.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/MonitorSource.java index aecd3b2bc991..2783b0ae0173 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/MonitorSource.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/MonitorSource.java @@ -18,7 +18,7 @@ package org.apache.paimon.flink.source.operator; -import org.apache.paimon.flink.ProjectionRowData; +import org.apache.paimon.flink.NestedProjectedRowData; import org.apache.paimon.flink.source.AbstractNonCoordinatedSource; import org.apache.paimon.flink.source.AbstractNonCoordinatedSourceReader; import org.apache.paimon.flink.source.SimpleSourceSplit; @@ -212,7 +212,7 @@ public static DataStream buildSource( boolean emitSnapshotWatermark, boolean shuffleBucketWithPartition, BucketMode bucketMode, - ProjectionRowData projectionRowData) { + NestedProjectedRowData nestedProjectedRowData) { SingleOutputStreamOperator singleOutputStreamOperator = env.fromSource( new MonitorSource( @@ -229,7 +229,7 @@ public static DataStream buildSource( singleOutputStreamOperator, shuffleBucketWithPartition); return sourceDataStream.transform( - name + "-Reader", typeInfo, new ReadOperator(readBuilder, projectionRowData)); + name + "-Reader", typeInfo, new ReadOperator(readBuilder, nestedProjectedRowData)); } private static DataStream shuffleUnwareBucket( diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/ReadOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/ReadOperator.java index 20b9e75313cb..1757a859df44 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/ReadOperator.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/ReadOperator.java @@ -21,7 +21,7 @@ import org.apache.paimon.data.InternalRow; import org.apache.paimon.disk.IOManager; import org.apache.paimon.flink.FlinkRowData; -import org.apache.paimon.flink.ProjectionRowData; +import org.apache.paimon.flink.NestedProjectedRowData; import org.apache.paimon.flink.source.metrics.FileStoreSourceReaderMetrics; import org.apache.paimon.table.source.DataSplit; import org.apache.paimon.table.source.ReadBuilder; @@ -50,7 +50,7 @@ public class ReadOperator extends AbstractStreamOperator private static final long serialVersionUID = 1L; private final ReadBuilder readBuilder; - @Nullable private final ProjectionRowData projectionRowData; + @Nullable private final NestedProjectedRowData nestedProjectedRowData; private transient TableRead read; private transient StreamRecord reuseRecord; @@ -65,9 +65,10 @@ public class ReadOperator extends AbstractStreamOperator private transient long idleStartTime = FileStoreSourceReaderMetrics.ACTIVE; private transient Counter numRecordsIn; - public ReadOperator(ReadBuilder readBuilder, @Nullable ProjectionRowData projectionRowData) { + public ReadOperator( + ReadBuilder readBuilder, @Nullable NestedProjectedRowData nestedProjectedRowData) { this.readBuilder = readBuilder; - this.projectionRowData = projectionRowData; + this.nestedProjectedRowData = nestedProjectedRowData; } @Override @@ -90,8 +91,8 @@ public void open() throws Exception { .getSpillingDirectoriesPaths()); this.read = readBuilder.newRead().withIOManager(ioManager); this.reuseRow = new FlinkRowData(null); - if (projectionRowData != null) { - this.reuseRecord = new StreamRecord<>(projectionRowData); + if (nestedProjectedRowData != null) { + this.reuseRecord = new StreamRecord<>(nestedProjectedRowData); } else { this.reuseRecord = new StreamRecord<>(reuseRow); } @@ -125,8 +126,8 @@ public void processElement(StreamRecord record) throws Exception { } reuseRow.replace(iterator.next()); - if (projectionRowData != null) { - projectionRowData.replaceRow(this.reuseRow); + if (nestedProjectedRowData != null) { + nestedProjectedRowData.replaceRow(this.reuseRow); } output.collect(reuseRecord); } diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ProjectionTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ProjectionTest.java index df1c45c81625..c2c0993b1e86 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ProjectionTest.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ProjectionTest.java @@ -63,7 +63,7 @@ public void testNestedProjection() { DataTypes.FIELD(0, "f0", DataTypes.INT()), DataTypes.FIELD(1, "f2", DataTypes.INT())))); - ProjectionRowData rowData = projection.getOuterProjectRow(writeType); + NestedProjectedRowData rowData = projection.getOuterProjectRow(writeType); assertThat(rowData.getRowType()).isEqualTo(toLogicalType(readTypeForFlink)); From b8aec0571edeb7795d308419d09ffa2eadabd732 Mon Sep 17 00:00:00 2001 From: yunfengzhou-hub Date: Fri, 20 Dec 2024 11:39:56 +0800 Subject: [PATCH 8/8] [flink] Improve naming of arguments in NestedProjectedRowData --- .../paimon/flink/NestedProjectedRowData.java | 114 +++++++++--------- 1 file changed, 58 insertions(+), 56 deletions(-) diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/NestedProjectedRowData.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/NestedProjectedRowData.java index 8c2026e24a12..810cc1ae4218 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/NestedProjectedRowData.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/NestedProjectedRowData.java @@ -99,134 +99,136 @@ public void setRowKind(RowKind rowKind) { } @Override - public boolean isNullAt(int i) { - if (isNullAtCached[i]) { - return cachedNullAt[i]; + public boolean isNullAt(int pos) { + if (isNullAtCached[pos]) { + return cachedNullAt[pos]; } - RowData rowData = extractInternalRow(i); + RowData rowData = extractInternalRow(pos); boolean result; if (rowData == null) { result = true; } else { - result = rowData.isNullAt(lastProjectedFields[i]); + result = rowData.isNullAt(lastProjectedFields[pos]); } - isNullAtCached[i] = true; - cachedNullAt[i] = result; + isNullAtCached[pos] = true; + cachedNullAt[pos] = result; return result; } @Override - public boolean getBoolean(int i) { - return getFieldAs(i, RowData::getBoolean); + public boolean getBoolean(int pos) { + return getFieldAs(pos, RowData::getBoolean); } @Override - public byte getByte(int i) { - return getFieldAs(i, RowData::getByte); + public byte getByte(int pos) { + return getFieldAs(pos, RowData::getByte); } @Override - public short getShort(int i) { - return getFieldAs(i, RowData::getShort); + public short getShort(int pos) { + return getFieldAs(pos, RowData::getShort); } @Override - public int getInt(int i) { - return getFieldAs(i, RowData::getInt); + public int getInt(int pos) { + return getFieldAs(pos, RowData::getInt); } @Override - public long getLong(int i) { - return getFieldAs(i, RowData::getLong); + public long getLong(int pos) { + return getFieldAs(pos, RowData::getLong); } @Override - public float getFloat(int i) { - return getFieldAs(i, RowData::getFloat); + public float getFloat(int pos) { + return getFieldAs(pos, RowData::getFloat); } @Override - public double getDouble(int i) { - return getFieldAs(i, RowData::getDouble); + public double getDouble(int pos) { + return getFieldAs(pos, RowData::getDouble); } @Override - public StringData getString(int i) { - return getFieldAs(i, RowData::getString); + public StringData getString(int pos) { + return getFieldAs(pos, RowData::getString); } @Override - public DecimalData getDecimal(int i, int i1, int i2) { - return getFieldAs(i, (rowData, j) -> rowData.getDecimal(j, i1, i2)); + public DecimalData getDecimal(int pos, int precision, int scale) { + return getFieldAs( + pos, (rowData, internalPos) -> rowData.getDecimal(internalPos, precision, scale)); } @Override - public TimestampData getTimestamp(int i, int i1) { - return getFieldAs(i, (rowData, j) -> rowData.getTimestamp(j, i1)); + public TimestampData getTimestamp(int pos, int precision) { + return getFieldAs( + pos, (rowData, internalPos) -> rowData.getTimestamp(internalPos, precision)); } @Override - public RawValueData getRawValue(int i) { - return getFieldAs(i, RowData::getRawValue); + public RawValueData getRawValue(int pos) { + return getFieldAs(pos, RowData::getRawValue); } @Override - public byte[] getBinary(int i) { - return getFieldAs(i, RowData::getBinary); + public byte[] getBinary(int pos) { + return getFieldAs(pos, RowData::getBinary); } @Override - public ArrayData getArray(int i) { - return getFieldAs(i, RowData::getArray); + public ArrayData getArray(int pos) { + return getFieldAs(pos, RowData::getArray); } @Override - public MapData getMap(int i) { - return getFieldAs(i, RowData::getMap); + public MapData getMap(int pos) { + return getFieldAs(pos, RowData::getMap); } @Override - public RowData getRow(int i, int i1) { - return getFieldAs(i, (rowData, j) -> rowData.getRow(j, i1)); + public RowData getRow(int pos, int numFields) { + return getFieldAs(pos, (rowData, internalPos) -> rowData.getRow(internalPos, numFields)); } - private @Nullable RowData extractInternalRow(int i) { - int[] projectedField = projectedFields[i]; + private @Nullable RowData extractInternalRow(int pos) { + int[] projectedField = projectedFields[pos]; RowData rowData = this.row; RowType dataType = producedDataType; - for (int j = 0; j < projectedField.length - 1; j++) { - dataType = (RowType) dataType.getTypeAt(projectedField[j]); - if (rowData.isNullAt(projectedField[j])) { + for (int i = 0; i < projectedField.length - 1; i++) { + dataType = (RowType) dataType.getTypeAt(projectedField[i]); + if (rowData.isNullAt(projectedField[i])) { return null; } - rowData = rowData.getRow(projectedField[j], dataType.getFieldCount()); + rowData = rowData.getRow(projectedField[i], dataType.getFieldCount()); } return rowData; } @SuppressWarnings("unchecked") - private T getFieldAs(int i, BiFunction getter) { - if (isFieldsCached[i]) { - return (T) cachedFields[i]; + private T getFieldAs(int pos, BiFunction getter) { + if (isFieldsCached[pos]) { + return (T) cachedFields[pos]; } - RowData rowData = extractInternalRow(i); + RowData rowData = extractInternalRow(pos); T result; if (rowData == null) { - isNullAtCached[i] = true; - cachedNullAt[i] = true; - isFieldsCached[i] = true; - cachedFields[i] = null; + isNullAtCached[pos] = true; + cachedNullAt[pos] = true; + isFieldsCached[pos] = true; + cachedFields[pos] = null; result = null; } else { - result = getter.apply(rowData, lastProjectedFields[i]); - isNullAtCached[i] = true; - cachedNullAt[i] = result == null; - isFieldsCached[i] = true; - cachedFields[i] = result; + result = getter.apply(rowData, lastProjectedFields[pos]); + isNullAtCached[pos] = true; + cachedNullAt[pos] = result == null; + isFieldsCached[pos] = true; + cachedFields[pos] = result; } return result;