Skip to content

Commit

Permalink
[flink] Support nested projection pushdown
Browse files Browse the repository at this point in the history
  • Loading branch information
yunfengzhou-hub committed Dec 15, 2024
1 parent 62a5f70 commit c4e7e76
Show file tree
Hide file tree
Showing 27 changed files with 808 additions and 69 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import org.apache.paimon.utils.Filter;

import java.io.Serializable;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

Expand Down Expand Up @@ -125,18 +124,6 @@ default ReadBuilder withFilter(List<Predicate> predicates) {
*/
ReadBuilder withProjection(int[] projection);

/**
 * Apply projection to the reader. Only top-level projection is supported, so every index path
 * must contain exactly one index.
 *
 * @param projection index paths, one per projected field; {@code null} leaves this builder
 *     untouched
 * @return this builder when {@code projection} is {@code null}, otherwise the result of {@link
 *     #withProjection(int[])} applied to the first index of every path
 * @throws IllegalStateException if any index path has more than one index (nested projection)
 *     or no index at all
 * @deprecated only top-level projection is supported by this overload; see {@link
 *     #withProjection(int[])}.
 */
@Deprecated
default ReadBuilder withProjection(int[][] projection) {
    if (projection == null) {
        return this;
    }
    // Reject nested paths first so this error takes precedence, as it did before.
    for (int[] path : projection) {
        if (path.length > 1) {
            throw new IllegalStateException("Not support nested projection");
        }
    }
    int[] topLevel = new int[projection.length];
    for (int i = 0; i < projection.length; i++) {
        // An empty path used to fail with a bare ArrayIndexOutOfBoundsException on path[0].
        if (projection[i].length == 0) {
            throw new IllegalStateException(
                    "Projection path at position " + i + " must contain exactly one index");
        }
        topLevel[i] = projection[i][0];
    }
    return withProjection(topLevel);
}

/** Specify the row number limit to be pushed down. */
ReadBuilder withLimit(int limit);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,7 @@ public void testTagIncremental() throws Exception {
GenericRow.of(fromString("+I"), 1, 6, 1));

// read tag1 tag3 projection
result = read(table, new int[][] {{1}}, Pair.of(INCREMENTAL_BETWEEN, "TAG1,TAG3"));
result = read(table, new int[] {1}, Pair.of(INCREMENTAL_BETWEEN, "TAG1,TAG3"));
assertThat(result).containsExactlyInAnyOrder(GenericRow.of(2), GenericRow.of(6));

assertThatThrownBy(() -> read(table, Pair.of(INCREMENTAL_BETWEEN, "TAG2,TAG1")))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ protected List<InternalRow> read(Table table, Pair<ConfigOption<?>, String>... d

protected List<InternalRow> read(
Table table,
@Nullable int[][] projection,
@Nullable int[] projection,
Pair<ConfigOption<?>, String>... dynamicOptions)
throws Exception {
Map<String, String> options = new HashMap<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ public void before() throws Exception {

@Test
public void testBucketsTable() throws Exception {
assertThat(read(bucketsTable, new int[][] {{0}, {1}, {2}, {4}}))
assertThat(read(bucketsTable, new int[] {0, 1, 2, 4}))
.containsExactlyInAnyOrder(
GenericRow.of(BinaryString.fromString("[1]"), 0, 2L, 2L),
GenericRow.of(BinaryString.fromString("[2]"), 0, 2L, 2L));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ public void testPartitionRecordCount() throws Exception {
expectedRow.add(GenericRow.of(BinaryString.fromString("[3]"), 1L));

// Only read partition and record count, record size may not stable.
List<InternalRow> result = read(partitionsTable, new int[][] {{0}, {1}});
List<InternalRow> result = read(partitionsTable, new int[] {0, 1});
assertThat(result).containsExactlyInAnyOrderElementsOf(expectedRow);
}

Expand All @@ -105,7 +105,7 @@ public void testPartitionTimeTravel() throws Exception {
read(
partitionsTable.copy(
Collections.singletonMap(CoreOptions.SCAN_VERSION.key(), "1")),
new int[][] {{0}, {1}});
new int[] {0, 1});
assertThat(result).containsExactlyInAnyOrderElementsOf(expectedRow);
}

Expand All @@ -117,7 +117,7 @@ public void testPartitionValue() throws Exception {
expectedRow.add(GenericRow.of(BinaryString.fromString("[2]"), 1L, 1L));
expectedRow.add(GenericRow.of(BinaryString.fromString("[3]"), 1L, 1L));

List<InternalRow> result = read(partitionsTable, new int[][] {{0}, {1}, {3}});
List<InternalRow> result = read(partitionsTable, new int[] {0, 1, 3});
assertThat(result).containsExactlyInAnyOrderElementsOf(expectedRow);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,20 @@

package org.apache.paimon.flink;

import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataTypeRoot;
import org.apache.paimon.types.DataTypeVisitor;

import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.RowType;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

/**
* {@link Projection} represents a list of (possibly nested) indexes that can be used to project
Expand All @@ -38,6 +44,11 @@ private Projection() {}

/**
 * Projects the given Flink logical row type according to this projection.
 *
 * @param logicalType the Flink row type to project
 * @return the projected Flink row type
 */
public abstract RowType project(RowType logicalType);

/**
 * Projects the given Paimon row type according to this projection.
 *
 * @param rowType the Paimon row type to project
 * @return the projected Paimon row type
 */
public abstract org.apache.paimon.types.RowType project(
org.apache.paimon.types.RowType rowType);

/**
 * Creates the {@link ProjectionRowData} for this projection over the given Paimon row type.
 *
 * <p>NOTE(review): the contract of {@link ProjectionRowData} is defined elsewhere; confirm its
 * semantics there.
 *
 * @param rowType the Paimon row type this projection is applied to
 * @return the row data wrapper built for this projection
 */
public abstract ProjectionRowData getRowData(org.apache.paimon.types.RowType rowType);

/** @return {@code true} if this projection is nested, {@code false} otherwise. */
public abstract boolean isNested();

Expand Down Expand Up @@ -132,6 +143,16 @@ public RowType project(RowType dataType) {
return new NestedProjection(toNestedIndexes()).project(dataType);
}

/** Projects the Paimon row type by delegating to an equivalent {@link NestedProjection}. */
@Override
public org.apache.paimon.types.RowType project(org.apache.paimon.types.RowType rowType) {
    final NestedProjection delegate = new NestedProjection(toNestedIndexes());
    return delegate.project(rowType);
}

/** Builds the row data wrapper by delegating to an equivalent {@link NestedProjection}. */
@Override
public ProjectionRowData getRowData(org.apache.paimon.types.RowType rowType) {
    final NestedProjection delegate = new NestedProjection(toNestedIndexes());
    return delegate.getRowData(rowType);
}

@Override
public boolean isNested() {
return false;
Expand Down Expand Up @@ -184,6 +205,66 @@ public RowType project(RowType rowType) {
return new RowType(rowType.isNullable(), updatedFields);
}

/**
 * Projects a Paimon row type along every index path of this projection.
 *
 * <p>Each index path selects one field; every index but the last one steps into a nested row.
 * Intermediate rows are created in the result on first use and shared by later paths that go
 * through the same parent field. Field ids and descriptions are copied from the source fields.
 *
 * @param rowType the Paimon row type to project
 * @return a new row type that contains only the selected (possibly nested) fields
 * @throws IllegalArgumentException if an index path is empty, or if a non-terminal index of a
 *     path points at a field that is not a row type
 */
@Override
public org.apache.paimon.types.RowType project(org.apache.paimon.types.RowType rowType) {
    MutableRowType result = new MutableRowType(rowType.isNullable(), Collections.emptyList());
    for (int[] indexPath : this.projection) {
        if (indexPath.length == 0) {
            throw new IllegalArgumentException("Projection index path must not be empty");
        }

        org.apache.paimon.types.RowType sourceType = rowType;
        MutableRowType targetType = result;
        int lastStep = indexPath.length - 1;

        // Descend through the intermediate (row-typed) fields of the path.
        for (int step = 0; step < lastStep; step++) {
            // Positional access yields the field directly; no second lookup by name is needed.
            DataField field = sourceType.getFields().get(indexPath[step]);
            if (!(field.type() instanceof org.apache.paimon.types.RowType)) {
                throw new IllegalArgumentException(
                        "Cannot project into field '"
                                + field.name()
                                + "' because it is not a row type: "
                                + field.type());
            }
            sourceType = (org.apache.paimon.types.RowType) field.type();

            // Create the nested target row lazily; it starts empty and is filled per path.
            if (!targetType.containsField(field.name())) {
                targetType.appendDataField(
                        field.name(),
                        field.id(),
                        new MutableRowType(sourceType.isNullable(), Collections.emptyList()),
                        field.description());
            }
            targetType = (MutableRowType) targetType.getField(field.name()).type();
        }

        // The last index of the path selects the field that is actually projected.
        DataField leaf = sourceType.getFields().get(indexPath[lastStep]);
        targetType.appendDataField(leaf.name(), leaf.id(), leaf.type(), leaf.description());
    }
    return result.toRowType();
}

/**
 * Builds the {@link ProjectionRowData} for this projection.
 *
 * <p>The given row type is first projected; then every index path of this projection is
 * translated, step by step, into the index the same-named field has inside the projected type.
 *
 * @param rowType the Paimon row type this projection is applied to
 * @return a {@link ProjectionRowData} created from the projected type and the translated paths
 */
@Override
public ProjectionRowData getRowData(org.apache.paimon.types.RowType rowType) {
    final org.apache.paimon.types.RowType projectedType = project(rowType);
    final int pathCount = this.projection.length;
    final int[][] mappedPaths = new int[pathCount][];

    for (int p = 0; p < pathCount; p++) {
        final int[] sourcePath = this.projection[p];
        final int[] mappedPath = new int[sourcePath.length];
        mappedPaths[p] = mappedPath;

        org.apache.paimon.types.RowType currentSource = rowType;
        org.apache.paimon.types.RowType currentTarget = projectedType;
        for (int depth = 0; depth < sourcePath.length; depth++) {
            final DataField sourceField = currentSource.getFields().get(sourcePath[depth]);
            final String name = sourceField.name();
            mappedPath[depth] = currentTarget.getFieldIndex(name);

            final boolean isLastStep = depth >= sourcePath.length - 1;
            if (isLastStep) {
                continue;
            }
            // Step into the nested row on both sides before handling the next index.
            currentTarget =
                    (org.apache.paimon.types.RowType) currentTarget.getField(name).type();
            currentSource = (org.apache.paimon.types.RowType) sourceField.type();
        }
    }

    return new ProjectionRowData(projectedType, mappedPaths);
}

@Override
public boolean isNested() {
return nested;
Expand Down Expand Up @@ -217,6 +298,16 @@ public RowType project(RowType dataType) {
return new NestedProjection(toNestedIndexes()).project(dataType);
}

/** Converts this projection to nested indexes and lets {@link NestedProjection} project. */
@Override
public org.apache.paimon.types.RowType project(org.apache.paimon.types.RowType rowType) {
    final int[][] nestedIndexes = toNestedIndexes();
    return new NestedProjection(nestedIndexes).project(rowType);
}

/** Converts this projection to nested indexes and lets {@link NestedProjection} do the work. */
@Override
public ProjectionRowData getRowData(org.apache.paimon.types.RowType rowType) {
    final int[][] nestedIndexes = toNestedIndexes();
    return new NestedProjection(nestedIndexes).getRowData(rowType);
}

@Override
public boolean isNested() {
return false;
Expand All @@ -232,4 +323,113 @@ public int[][] toNestedIndexes() {
return Arrays.stream(projection).mapToObj(i -> new int[] {i}).toArray(int[][]::new);
}
}

/**
 * A mutable version of {@link org.apache.paimon.types.RowType} to facilitate the building
 * process of projections.
 *
 * <p>It is mutable only through the {@link #appendDataField} method. Once all fields have been
 * appended, {@link #toRowType()} produces the corresponding {@link
 * org.apache.paimon.types.RowType}. The remaining {@link org.apache.paimon.types.DataType}
 * operations are not supported and throw {@link UnsupportedOperationException}.
 */
public static class MutableRowType extends org.apache.paimon.types.DataType {
    private final List<DataField> fields;
    private final boolean isNullable;

    /** Creates a mutable row with the nullability and the fields of the given row type. */
    public MutableRowType(org.apache.paimon.types.RowType rowType) {
        this(rowType.isNullable(), rowType.getFields());
    }

    /**
     * Creates a mutable row.
     *
     * @param isNullable nullability of the row type to build
     * @param fields initial fields; they are copied, so the given list is never modified
     */
    public MutableRowType(boolean isNullable, List<DataField> fields) {
        super(isNullable, DataTypeRoot.ROW);
        this.fields = new ArrayList<>(fields);
        this.isNullable = isNullable;
    }

    /**
     * Converts this builder into a {@link org.apache.paimon.types.RowType}, recursively
     * converting fields whose type is a {@link MutableRowType}.
     *
     * <p>The conversion works on a fresh list, so this builder is left unchanged and does not
     * share its field list with the returned row type.
     *
     * @return the row type made of the fields appended so far
     */
    public org.apache.paimon.types.RowType toRowType() {
        List<DataField> converted = new ArrayList<>(fields.size());
        for (DataField field : fields) {
            if (field.type() instanceof MutableRowType) {
                converted.add(
                        new DataField(
                                field.id(),
                                field.name(),
                                ((MutableRowType) field.type()).toRowType(),
                                field.description()));
            } else {
                converted.add(field);
            }
        }
        return new org.apache.paimon.types.RowType(isNullable, converted);
    }

    /** Returns the internal field list itself (not a copy). */
    public List<DataField> getFields() {
        return fields;
    }

    /** Returns the names of the fields, in field order. */
    public List<String> getFieldNames() {
        return fields.stream().map(DataField::name).collect(Collectors.toList());
    }

    /** Returns the types of the fields, in field order. */
    public List<org.apache.paimon.types.DataType> getFieldTypes() {
        return fields.stream().map(DataField::type).collect(Collectors.toList());
    }

    /** Returns the number of fields appended so far. */
    public int getFieldCount() {
        return fields.size();
    }

    /** Returns whether a field with the given name exists. */
    public boolean containsField(String fieldName) {
        return fields.stream().anyMatch(field -> field.name().equals(fieldName));
    }

    /**
     * Returns the first field with the given name.
     *
     * @throws RuntimeException if there is no such field
     */
    public DataField getField(String fieldName) {
        for (DataField field : fields) {
            if (field.name().equals(fieldName)) {
                return field;
            }
        }

        throw new RuntimeException("Cannot find field: " + fieldName);
    }

    /**
     * Returns the first field with the given field id.
     *
     * @throws RuntimeException if there is no such field
     */
    public DataField getField(int fieldId) {
        for (DataField field : fields) {
            if (field.id() == fieldId) {
                return field;
            }
        }
        throw new RuntimeException("Cannot find field by field id: " + fieldId);
    }

    /**
     * Appends a field at the end of this row.
     *
     * <p>A {@link org.apache.paimon.types.RowType} is wrapped into a {@link MutableRowType}
     * before it is stored; any other type is stored as given.
     *
     * @param name name of the new field
     * @param newId id of the new field
     * @param type type of the new field
     * @param description description of the new field
     */
    public void appendDataField(
            String name, int newId, org.apache.paimon.types.DataType type, String description) {
        org.apache.paimon.types.DataType storedType = type;
        if (type instanceof org.apache.paimon.types.RowType) {
            storedType = new MutableRowType((org.apache.paimon.types.RowType) type);
        }
        fields.add(new DataField(newId, name, storedType, description));
    }

    /** Not supported by this class. */
    @Override
    public int defaultSize() {
        throw new UnsupportedOperationException();
    }

    /** Not supported by this class. */
    @Override
    public org.apache.paimon.types.DataType copy(boolean isNullable) {
        throw new UnsupportedOperationException();
    }

    /** Not supported by this class. */
    @Override
    public String asSQLString() {
        throw new UnsupportedOperationException();
    }

    /** Not supported by this class. */
    @Override
    public <R> R accept(DataTypeVisitor<R> visitor) {
        throw new UnsupportedOperationException();
    }
}
}
Loading

0 comments on commit c4e7e76

Please sign in to comment.