Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[flink] Support nested projection pushdown #4667

Merged
merged 8 commits into from
Dec 20, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import org.apache.paimon.utils.Filter;

import java.io.Serializable;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

Expand Down Expand Up @@ -125,18 +124,6 @@ default ReadBuilder withFilter(List<Predicate> predicates) {
*/
ReadBuilder withProjection(int[] projection);

/** Apply projection to the reader, only support top level projection. */
@Deprecated
default ReadBuilder withProjection(int[][] projection) {
if (projection == null) {
return this;
}
if (Arrays.stream(projection).anyMatch(arr -> arr.length > 1)) {
throw new IllegalStateException("Not support nested projection");
}
return withProjection(Arrays.stream(projection).mapToInt(arr -> arr[0]).toArray());
}

/** the row number pushed down. */
ReadBuilder withLimit(int limit);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -243,15 +243,18 @@ private List<DataField> readDataFields(List<DataField> allDataFields) {
.filter(f -> f.id() == dataField.id())
.findFirst()
.ifPresent(
field ->
readDataFields.add(
dataField.newType(
pruneDataType(
field.type(), dataField.type()))));
field -> {
DataType prunedType =
pruneDataType(field.type(), dataField.type());
if (prunedType != null) {
readDataFields.add(dataField.newType(prunedType));
}
});
}
return readDataFields;
}

@Nullable
private DataType pruneDataType(DataType readType, DataType dataType) {
switch (readType.getTypeRoot()) {
case ROW:
Expand All @@ -261,25 +264,40 @@ private DataType pruneDataType(DataType readType, DataType dataType) {
for (DataField rf : r.getFields()) {
if (d.containsField(rf.id())) {
DataField df = d.getField(rf.id());
newFields.add(df.newType(pruneDataType(rf.type(), df.type())));
DataType newType = pruneDataType(rf.type(), df.type());
if (newType == null) {
continue;
}
newFields.add(df.newType(newType));
}
}
if (newFields.isEmpty()) {
// When all fields are pruned, we should not return an empty row type
return null;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add `@Nullable` to `pruneDataType`, since it can now return `null` when every field of a row type is pruned.

}
return d.copy(newFields);
case MAP:
return ((MapType) dataType)
.newKeyValueType(
pruneDataType(
((MapType) readType).getKeyType(),
((MapType) dataType).getKeyType()),
pruneDataType(
((MapType) readType).getValueType(),
((MapType) dataType).getValueType()));
DataType keyType =
pruneDataType(
((MapType) readType).getKeyType(),
((MapType) dataType).getKeyType());
DataType valueType =
pruneDataType(
((MapType) readType).getValueType(),
((MapType) dataType).getValueType());
if (keyType == null || valueType == null) {
return null;
}
return ((MapType) dataType).newKeyValueType(keyType, valueType);
case ARRAY:
return ((ArrayType) dataType)
.newElementType(
pruneDataType(
((ArrayType) readType).getElementType(),
((ArrayType) dataType).getElementType()));
DataType elementType =
pruneDataType(
((ArrayType) readType).getElementType(),
((ArrayType) dataType).getElementType());
if (elementType == null) {
return null;
}
return ((ArrayType) dataType).newElementType(elementType);
default:
return dataType;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,7 @@ public void testTagIncremental() throws Exception {
GenericRow.of(fromString("+I"), 1, 6, 1));

// read tag1 tag3 projection
result = read(table, new int[][] {{1}}, Pair.of(INCREMENTAL_BETWEEN, "TAG1,TAG3"));
result = read(table, new int[] {1}, Pair.of(INCREMENTAL_BETWEEN, "TAG1,TAG3"));
assertThat(result).containsExactlyInAnyOrder(GenericRow.of(2), GenericRow.of(6));

assertThatThrownBy(() -> read(table, Pair.of(INCREMENTAL_BETWEEN, "TAG2,TAG1")))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ protected List<InternalRow> read(Table table, Pair<ConfigOption<?>, String>... d

protected List<InternalRow> read(
Table table,
@Nullable int[][] projection,
@Nullable int[] projection,
Pair<ConfigOption<?>, String>... dynamicOptions)
throws Exception {
Map<String, String> options = new HashMap<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ public void before() throws Exception {

@Test
public void testBucketsTable() throws Exception {
assertThat(read(bucketsTable, new int[][] {{0}, {1}, {2}, {4}}))
assertThat(read(bucketsTable, new int[] {0, 1, 2, 4}))
.containsExactlyInAnyOrder(
GenericRow.of(BinaryString.fromString("[1]"), 0, 2L, 2L),
GenericRow.of(BinaryString.fromString("[2]"), 0, 2L, 2L));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ public void testPartitionRecordCount() throws Exception {
expectedRow.add(GenericRow.of(BinaryString.fromString("[3]"), 1L));

// Only read partition and record count, record size may not stable.
List<InternalRow> result = read(partitionsTable, new int[][] {{0}, {1}});
List<InternalRow> result = read(partitionsTable, new int[] {0, 1});
assertThat(result).containsExactlyInAnyOrderElementsOf(expectedRow);
}

Expand All @@ -105,7 +105,7 @@ public void testPartitionTimeTravel() throws Exception {
read(
partitionsTable.copy(
Collections.singletonMap(CoreOptions.SCAN_VERSION.key(), "1")),
new int[][] {{0}, {1}});
new int[] {0, 1});
assertThat(result).containsExactlyInAnyOrderElementsOf(expectedRow);
}

Expand All @@ -117,7 +117,7 @@ public void testPartitionValue() throws Exception {
expectedRow.add(GenericRow.of(BinaryString.fromString("[2]"), 1L, 1L));
expectedRow.add(GenericRow.of(BinaryString.fromString("[3]"), 1L, 1L));

List<InternalRow> result = read(partitionsTable, new int[][] {{0}, {1}, {3}});
List<InternalRow> result = read(partitionsTable, new int[] {0, 1, 3});
assertThat(result).containsExactlyInAnyOrderElementsOf(expectedRow);
}
}
Loading
Loading