Skip to content

Commit

Permalink
[Feature][transform] sql transform support lateral view explode
Browse files Browse the repository at this point in the history
  • Loading branch information
njh_cmss committed Nov 6, 2024
1 parent 97ea914 commit 27f320d
Showing 1 changed file with 50 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -734,53 +734,68 @@ private List<SeaTunnelRow> explode(
for (SeaTunnelRow row : seaTunnelRows) {
int fieldIndex = outRowType.indexOf(column);
Object splitFieldValue = row.getField(fieldIndex);
if (splitFieldValue == null) {
continue;
}
if (splitFieldValue instanceof Object[]) {
Object[] rowList = (Object[]) splitFieldValue;
if (ArrayUtils.isEmpty(rowList) && isUsingOuter) {
next.add(copy(outRowType.getTotalFields(), row, aliasFieldIndex, null));
} else {
for (Object fieldValue : rowList) {
next.add(
copy(
outRowType.getTotalFields(),
row,
aliasFieldIndex,
fieldValue));
}
}
}
transformExplodeValue(
splitFieldValue,
outRowType,
isUsingOuter,
next,
aliasFieldIndex,
row,
expression);
}
seaTunnelRows = next;
} else if (expression instanceof Function) {
List<SeaTunnelRow> next = new ArrayList<>();
for (SeaTunnelRow row : seaTunnelRows) {
Object values = computeForValue(expression, row.getFields());
if (values.getClass().isArray()) {
for (Object fieldValue : (Object[]) values) {
next.add(
copy(
outRowType.getTotalFields(),
row,
aliasFieldIndex,
fieldValue.toString()));
}
} else {
throw new SeaTunnelRuntimeException(
CommonErrorCodeDeprecated.UNSUPPORTED_OPERATION,
"Transform config error! UnSupport explode function:"
+ ((Function) expression).getName());
}
Object splitFieldValue = computeForValue(expression, row.getFields());
transformExplodeValue(
splitFieldValue,
outRowType,
isUsingOuter,
next,
aliasFieldIndex,
row,
expression);
}
seaTunnelRows = next;
}
}
return seaTunnelRows;
}

private SeaTunnelRow copy(int length, SeaTunnelRow row, int fieldIndex, Object fieldValue) {
private void transformExplodeValue(
Object splitFieldValue,
SeaTunnelRowType outRowType,
boolean isUsingOuter,
List<SeaTunnelRow> next,
int aliasFieldIndex,
SeaTunnelRow row,
Expression expression) {
if (splitFieldValue == null) {
if (isUsingOuter) {
next.add(copySeaTunnelRow(outRowType.getTotalFields(), row, aliasFieldIndex, null));
}
return;
}
if (splitFieldValue.getClass().isArray()) {
for (Object fieldValue : (Object[]) splitFieldValue) {
next.add(
copySeaTunnelRow(
outRowType.getTotalFields(),
row,
aliasFieldIndex,
fieldValue.toString()));
}
} else {
throw new SeaTunnelRuntimeException(
CommonErrorCodeDeprecated.UNSUPPORTED_OPERATION,
"Transform config error! UnSupport explode function:"
+ ((Function) expression).getName());
}
}

private SeaTunnelRow copySeaTunnelRow(
int length, SeaTunnelRow row, int fieldIndex, Object fieldValue) {
Object[] fields = new Object[length];
System.arraycopy(row.getFields(), 0, fields, 0, row.getFields().length);
SeaTunnelRow outputRow = new SeaTunnelRow(fields);
Expand Down

0 comments on commit 27f320d

Please sign in to comment.