From a01cb42d220ad590d0740f5559bfcf9f35d6aaad Mon Sep 17 00:00:00 2001 From: Daniel Henneberger Date: Fri, 27 Oct 2023 16:49:52 -0700 Subject: [PATCH 1/3] Refactor of table path walking Signed-off-by: Daniel Henneberger --- .../datasqrl/calcite/NormalizeTablePath.java | 223 ++++++++++++ .../java/com/datasqrl/calcite/SqrlToSql.java | 79 +++- .../datasqrl/calcite/TablePathBuilder.java | 339 ------------------ .../datasqrl/calcite/plan/ScriptPlanner.java | 15 +- .../calcite/schema/sql/SqlBuilders.java | 11 +- .../com/datasqrl/calcite/sqrl/PathToSql.java | 72 ++++ .../ResolveTest/tableFunctionsBasic.txt | 6 +- 7 files changed, 381 insertions(+), 364 deletions(-) create mode 100644 sqrl-planner/sqrl-common/src/main/java/com/datasqrl/calcite/NormalizeTablePath.java delete mode 100644 sqrl-planner/sqrl-common/src/main/java/com/datasqrl/calcite/TablePathBuilder.java create mode 100644 sqrl-planner/sqrl-common/src/main/java/com/datasqrl/calcite/sqrl/PathToSql.java diff --git a/sqrl-planner/sqrl-common/src/main/java/com/datasqrl/calcite/NormalizeTablePath.java b/sqrl-planner/sqrl-common/src/main/java/com/datasqrl/calcite/NormalizeTablePath.java new file mode 100644 index 000000000..faf45abee --- /dev/null +++ b/sqrl-planner/sqrl-common/src/main/java/com/datasqrl/calcite/NormalizeTablePath.java @@ -0,0 +1,223 @@ +package com.datasqrl.calcite; + +import static com.datasqrl.plan.validate.ScriptValidator.isSelfField; + +import com.datasqrl.calcite.SqrlToSql.Context; +import com.datasqrl.calcite.schema.PathWalker; +import com.datasqrl.calcite.schema.sql.SqlDataTypeSpecBuilder; +import com.datasqrl.calcite.sqrl.CatalogResolver; +import com.datasqrl.canonicalizer.ReservedName; +import com.datasqrl.function.SqrlFunctionParameter; +import com.google.common.base.Preconditions; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.atomic.AtomicInteger; +import lombok.AllArgsConstructor; +import lombok.Getter; +import lombok.Value; +import org.apache.calcite.plan.RelOptTable; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.schema.FunctionParameter; +import org.apache.calcite.schema.TableFunction; +import org.apache.calcite.sql.SqlCall; +import org.apache.calcite.sql.SqlDynamicParam; +import org.apache.calcite.sql.SqlIdentifier; +import org.apache.calcite.sql.SqlNode; +import org.apache.calcite.sql.parser.SqlParserPos; +import org.apache.calcite.sql.type.SqlTypeName; +import org.apache.calcite.sql.validate.SqlUserDefinedTableFunction; + +/** + * Converts a table path to a normalized representation. + */ +public class NormalizeTablePath { + private static final String ALIAS_PREFIX = "_t"; + private final CatalogResolver catalogResolver; + private final Map paramMapping; + final AtomicInteger aliasInt = new AtomicInteger(0); + + + public NormalizeTablePath(CatalogResolver catalogResolver, Map paramMapping) { + this.catalogResolver = catalogResolver; + this.paramMapping = paramMapping; + } + + public TablePathResult convert(List items, Context context, List parameters) { + // Map items to higher level objects, then walk the objects + List params = new ArrayList<>(parameters); + PathWalker pathWalker = new PathWalker(catalogResolver); + + List pathItems = mapToPathItems(context, items, params, pathWalker); + return new TablePathResult(params, pathItems, pathWalker.getAbsolutePath()); + } + + private List mapToPathItems(Context context, List items, List params, + PathWalker pathWalker) { + List pathItems = new ArrayList<>(); + Iterator input = items.iterator(); + + SqlNode item = input.next(); + String identifier = getIdentifier(item) + .orElseThrow(() -> new RuntimeException("Subqueries are not yet implemented")); + + String alias; + if (catalogResolver.getTableFunction(List.of(identifier)).isPresent()) { + SqlUserDefinedTableFunction op = catalogResolver.getTableFunction(List.of(identifier)).get(); + alias = generateAlias(); + pathItems.add(new TableFunctionPathItem(pathWalker.getPath(), op, List.of(), alias)); + pathWalker.walk(identifier); + } else if (context.hasAlias(identifier)) { + // For tables that start with an alias e.g. `o.entries` + if (!input.hasNext()) { + throw new RuntimeException("Alias by itself."); + } + alias = generateAlias(); + // Get absolute path of alias `o` + pathWalker.setPath(context.getAliasPath(identifier)); + String nextIdentifier = getIdentifier(input.next()) + .orElseThrow(() -> new RuntimeException("Subqueries are not yet implemented")); + // Update path walker + pathWalker.walk(nextIdentifier); + + // Lookup function + Optional fnc = catalogResolver.getTableFunction(pathWalker.getPath()); + Preconditions.checkState(fnc.isPresent(), "Table function not found %s", pathWalker.getPath()); + + // Rewrite arguments so internal arguments are prefixed with the alias + List args = rewriteArgs(identifier, fnc.get().getFunction(), context, params); + + pathItems.add(new TableFunctionPathItem(pathWalker.getPath(), fnc.get(), args, alias)); + } else if (identifier.equals(ReservedName.SELF_IDENTIFIER.getCanonical())) { + //Tables that start with '@' + pathWalker.setPath(context.getCurrentPath()); + // Treat '@' as something to add to the table path list + boolean materializeSelf = context.isMaterializeSelf(); + + if (materializeSelf || !input.hasNext()) { + // Do a table scan on the source table + RelOptTable table = catalogResolver.getTableFromPath(pathWalker.getPath()); + pathItems.add(new SelfTablePathItem(pathWalker.getPath(), table)); + alias = ReservedName.SELF_IDENTIFIER.getCanonical(); + } else { + String nextIdentifier = getIdentifier(input.next()) + .orElseThrow(() -> new RuntimeException("Subqueries are not yet implemented")); + pathWalker.walk(nextIdentifier); + + Optional fnc = catalogResolver.getTableFunction(pathWalker.getAbsolutePath()); + Preconditions.checkState(fnc.isPresent(), "Table function not found %s", pathWalker.getPath()); + + alias = generateAlias(); + // Rewrite arguments + List args = rewriteArgs(ReservedName.SELF_IDENTIFIER.getCanonical(), fnc.get().getFunction(), context, + params); + pathItems.add(new TableFunctionPathItem(pathWalker.getPath(), fnc.get(), args, alias)); + } + } else { + throw new RuntimeException("Unknown table: " + item); + } + + while (input.hasNext()) { + item = input.next(); + String nextIdentifier = getIdentifier(item) + .orElseThrow(() -> new RuntimeException("Subqueries are not yet implemented")); + pathWalker.walk(nextIdentifier); + + Optional fnc = catalogResolver.getTableFunction(pathWalker.getPath()); + List args = rewriteArgs(alias, fnc.get().getFunction(), context, params); + String newAlias = generateAlias(); + pathItems.add(new TableFunctionPathItem(pathWalker.getPath(), fnc.get(), args, newAlias)); + alias = newAlias; + } + + return pathItems; + } + + private List rewriteArgs(String alias, TableFunction function, Context context, + List params) { + //if arg needs to by a dynamic expression, rewrite. + List nodes = new ArrayList<>(); + for (FunctionParameter parameter : function.getParameters()) { + SqrlFunctionParameter p = (SqrlFunctionParameter) parameter; + SqlIdentifier identifier = new SqlIdentifier(List.of(alias, p.getName()), + SqlParserPos.ZERO); + SqlNode rewritten = context.isMaterializeSelf() + ? identifier + : rewriteToDynamicParam(identifier, params); + nodes.add(rewritten); + } + return nodes; + } + + public SqlNode rewriteToDynamicParam(SqlIdentifier id, List params) { + //if self, check if param list, if not create one + if (!isSelfField(id.names)) { + return id; + } + + for (FunctionParameter p : params) { + if (paramMapping.containsKey(p)) { + return paramMapping.get(p); + } + } + + RelDataType anyType = catalogResolver.getTypeFactory().createSqlType(SqlTypeName.ANY); + SqrlFunctionParameter functionParameter = new SqrlFunctionParameter(id.names.get(1), Optional.empty(), + SqlDataTypeSpecBuilder.create(anyType), params.size(), anyType, true); + params.add(functionParameter); + SqlDynamicParam param = new SqlDynamicParam(functionParameter.getOrdinal(), + id.getParserPosition()); + paramMapping.put(functionParameter, param); + + return param; + } + + private Optional getIdentifier(SqlNode item) { + if (item instanceof SqlIdentifier) { + return Optional.of(((SqlIdentifier) item).getSimple()); + } else if (item instanceof SqlCall) { + return Optional.of(((SqlCall) item).getOperator().getName()); + } + + return Optional.empty(); + } + + private String generateAlias() { + return ALIAS_PREFIX + aliasInt.incrementAndGet(); + } + + + public interface PathItem { + String getAlias(); + } + + @AllArgsConstructor + @Getter + public class TableFunctionPathItem implements PathItem { + List path; + SqlUserDefinedTableFunction op; + List arguments; + String alias; + } + + @AllArgsConstructor + @Getter + public class SelfTablePathItem implements PathItem { + List path; + RelOptTable table; + + @Override + public String getAlias() { + return ReservedName.SELF_IDENTIFIER.getCanonical(); + } + } + + @Value + public class TablePathResult { + List params; + List pathItems; + List path; + } +} diff --git a/sqrl-planner/sqrl-common/src/main/java/com/datasqrl/calcite/SqrlToSql.java b/sqrl-planner/sqrl-common/src/main/java/com/datasqrl/calcite/SqrlToSql.java index b4f1d6146..87d2ab82e 100644 --- a/sqrl-planner/sqrl-common/src/main/java/com/datasqrl/calcite/SqrlToSql.java +++ b/sqrl-planner/sqrl-common/src/main/java/com/datasqrl/calcite/SqrlToSql.java @@ -5,12 +5,17 @@ import com.datasqrl.calcite.SqrlToSql.Context; import com.datasqrl.calcite.SqrlToSql.Result; +import com.datasqrl.calcite.NormalizeTablePath.PathItem; +import com.datasqrl.calcite.NormalizeTablePath.SelfTablePathItem; +import com.datasqrl.calcite.NormalizeTablePath.TablePathResult; import com.datasqrl.calcite.schema.sql.SqlBuilders.SqlAliasCallBuilder; import com.datasqrl.calcite.schema.sql.SqlBuilders.SqlCallBuilder; import com.datasqrl.calcite.schema.sql.SqlBuilders.SqlJoinBuilder; import com.datasqrl.calcite.schema.sql.SqlBuilders.SqlSelectBuilder; +import com.datasqrl.calcite.sqrl.PathToSql; import com.datasqrl.calcite.visitor.SqlNodeVisitor; import com.datasqrl.calcite.visitor.SqlRelationVisitor; +import com.datasqrl.canonicalizer.ReservedName; import com.datasqrl.plan.hints.TopNHint.Type; import com.google.common.base.Preconditions; import java.util.ArrayList; @@ -22,12 +27,15 @@ import java.util.Optional; import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; import java.util.stream.IntStream; import lombok.AllArgsConstructor; import lombok.Value; +import org.apache.calcite.plan.RelOptTable; import org.apache.calcite.schema.FunctionParameter; import org.apache.calcite.sql.CalciteFixes; +import org.apache.calcite.sql.SqlBasicCall; import org.apache.calcite.sql.SqlCall; import org.apache.calcite.sql.SqlFunctionCategory; import org.apache.calcite.sql.SqlHint; @@ -48,15 +56,18 @@ public class SqrlToSql implements SqlRelationVisitor { private final CatalogReader catalogReader; private final SqlOperatorTable operatorTable; - private final TablePathBuilder tablePathBuilder; + private final NormalizeTablePath normalizeTablePath; private List parameters; + private final AtomicInteger uniquePkId; public SqrlToSql(CatalogReader catalogReader, SqlOperatorTable operatorTable, - TablePathBuilder tablePathBuilder, List parameters) { + NormalizeTablePath normalizeTablePath, List parameters, + AtomicInteger uniquePkId) { this.catalogReader = catalogReader; this.operatorTable = operatorTable; - this.tablePathBuilder = tablePathBuilder; + this.normalizeTablePath = normalizeTablePath; this.parameters = parameters; + this.uniquePkId = uniquePkId; } public Result rewrite(SqlNode query, boolean materializeSelf, List currentPath) { @@ -102,8 +113,7 @@ public Result visitQuerySpecification(SqlSelect call, Context context) { result.condition.ifPresent(sqlSelectBuilder::appendWhere); SqlSelect top = new SqlSelectBuilder().setFrom(sqlSelectBuilder.build()).setDistinctOnHint(hintOps).build(); return new Result(top, result.getCurrentPath(), List.of(), List.of(), Optional.empty(), result.params); - } else if (call.isKeywordPresent(SqlSelectKeyword.DISTINCT) || - (context.isNested() && call.getFetch() != null)) { + } else if (call.isKeywordPresent(SqlSelectKeyword.DISTINCT) || (context.isNested() && call.getFetch() != null)) { //if is nested, get primary key nodes int keySize = context.isNested() ? catalogReader.getTableFromPath(context.currentPath).getKeys().get(0) @@ -234,9 +244,62 @@ public Result visitTable(SqlIdentifier node, Context context) { .map(name -> new SqlIdentifier(name, node.getComponentParserPosition(node.names.indexOf(name)))) .collect(Collectors.toList()); - Result result = tablePathBuilder.build(items, context, parameters); + TablePathResult result = normalizeTablePath.convert(items, context, parameters); + + PathToSql pathToSql = new PathToSql(); + SqlNode sqlNode = pathToSql.build(result.getPathItems()); + + List pullupColumns = (context.isNested && result.getPathItems().get(0) instanceof SelfTablePathItem) + ? buildPullupColumns((SelfTablePathItem)result.getPathItems().get(0)) + : List.of(); + // Wrap in a select to maintain sql semantics + if (!pullupColumns.isEmpty() || result.getPathItems().size() > 1) { + sqlNode = buildAndProjectLast(pullupColumns, sqlNode, result.getPathItems().get(0), + result.getPathItems().get(result.getPathItems().size()-1)); + } else if (result.getPathItems().size() == 1) { + //Just a table by itself + if (sqlNode instanceof SqlBasicCall) { + sqlNode = ((SqlBasicCall) sqlNode).getOperandList().get(0); + } + } + this.parameters = result.getParams(); //update parameters with new parameters - return result; + return new Result(sqlNode, result.getPath(), pullupColumns, + List.of(), Optional.empty(), result.getParams()); + } + + + public SqlNode buildAndProjectLast(List pullupCols, SqlNode sqlNode, + PathItem first, PathItem last) { + + SqlSelectBuilder select = new SqlSelectBuilder() + .setFrom(sqlNode); + + List selectList = new ArrayList<>(); + for (PullupColumn column : pullupCols) { + selectList.add( + SqlStdOperatorTable.AS.createCall(SqlParserPos.ZERO, + new SqlIdentifier(List.of(first.getAlias(), column.getColumnName()), SqlParserPos.ZERO), + new SqlIdentifier(column.getDisplayName(), SqlParserPos.ZERO))); + } + + selectList.add(new SqlIdentifier(List.of(last.getAlias(), ""), SqlParserPos.ZERO)); + + select.setSelectList(selectList); + return select.build(); + } + + private List buildPullupColumns(SelfTablePathItem selfTablePathItem) { + RelOptTable table = selfTablePathItem.table; + return IntStream.range(0, table.getKeys().get(0).asSet().size()) + .mapToObj(i -> new PullupColumn( + table.getRowType().getFieldList().get(i).getName(), + String.format("%spk%d$%s", ReservedName.SYSTEM_HIDDEN_PREFIX, uniquePkId.incrementAndGet(), + table.getRowType().getFieldList().get(i).getName()), + String.format("%spk%d$%s", ReservedName.SYSTEM_HIDDEN_PREFIX, i + 1, + table.getRowType().getFieldList().get(i).getName()) + )) + .collect(Collectors.toList()); } @Override @@ -314,9 +377,9 @@ public Result visitSetOperation(SqlCall node, Context context) { @Value public static class PullupColumn { - String columnName; String displayName; + String finalName; } @AllArgsConstructor diff --git a/sqrl-planner/sqrl-common/src/main/java/com/datasqrl/calcite/TablePathBuilder.java b/sqrl-planner/sqrl-common/src/main/java/com/datasqrl/calcite/TablePathBuilder.java deleted file mode 100644 index f087d428f..000000000 --- a/sqrl-planner/sqrl-common/src/main/java/com/datasqrl/calcite/TablePathBuilder.java +++ /dev/null @@ -1,339 +0,0 @@ -package com.datasqrl.calcite; - -import static com.datasqrl.plan.validate.ScriptValidator.isSelfField; - -import com.datasqrl.calcite.SqrlToSql.Context; -import com.datasqrl.calcite.SqrlToSql.PullupColumn; -import com.datasqrl.calcite.SqrlToSql.Result; -import com.datasqrl.calcite.schema.PathWalker; -import com.datasqrl.calcite.schema.sql.SqlBuilders.SqlSelectBuilder; -import com.datasqrl.calcite.schema.sql.SqlDataTypeSpecBuilder; -import com.datasqrl.calcite.schema.sql.SqlJoinPathBuilder; -import com.datasqrl.calcite.sqrl.CatalogResolver; -import com.datasqrl.canonicalizer.ReservedName; -import com.datasqrl.function.SqrlFunctionParameter; -import com.datasqrl.util.CalciteUtil.RelDataTypeFieldBuilder; -import com.google.common.base.Preconditions; -import java.util.ArrayList; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Optional; -import java.util.Stack; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.stream.Collectors; -import java.util.stream.IntStream; -import lombok.Getter; -import lombok.Value; -import org.apache.calcite.plan.RelOptTable; -import org.apache.calcite.rel.type.RelDataType; -import org.apache.calcite.rel.type.RelDataTypeFactory.FieldInfoBuilder; -import org.apache.calcite.schema.FunctionParameter; -import org.apache.calcite.schema.TableFunction; -import org.apache.calcite.sql.JoinConditionType; -import org.apache.calcite.sql.JoinType; -import org.apache.calcite.sql.SqlCall; -import org.apache.calcite.sql.SqlDynamicParam; -import org.apache.calcite.sql.SqlIdentifier; -import org.apache.calcite.sql.SqlJoin; -import org.apache.calcite.sql.SqlLiteral; -import org.apache.calcite.sql.SqlNode; -import org.apache.calcite.sql.fun.SqlStdOperatorTable; -import org.apache.calcite.sql.parser.SqlParserPos; -import org.apache.calcite.sql.type.SqlTypeName; -import org.apache.calcite.sql.validate.SqlUserDefinedTableFunction; -import org.apache.commons.collections.ListUtils; - -public class TablePathBuilder { - private static final String ALIAS_PREFIX = "_t"; - private final CatalogResolver catalogResolver; - private final Map paramMapping; - private final AtomicInteger uniquePkId; - - @Getter - final List tableHistory = new ArrayList<>(); - final Stack stack = new Stack<>(); - final AtomicInteger aliasInt = new AtomicInteger(0); - - - public TablePathBuilder(CatalogResolver catalogResolver, Map paramMapping, - AtomicInteger uniquePkId) { - this.catalogResolver = catalogResolver; - this.paramMapping = paramMapping; - this.uniquePkId = uniquePkId; - } - - public Result build(List items, Context context, List parameters) { - List params = new ArrayList<>(parameters); - Iterator input = items.iterator(); - PathWalker pathWalker = new PathWalker(catalogResolver); - List pullupColumns = processFirstItem(input, pathWalker, context, params); - processRemainingItems(input, pathWalker, context, params); - SqlNode sqlNode = buildAndProjectLast(pullupColumns); - return createResult(sqlNode, pathWalker.getAbsolutePath(), pullupColumns, params); - } - - private List processFirstItem(Iterator input, PathWalker pathWalker, - Context context, List params) { - SqlNode item = input.next(); - String identifier = getIdentifier(item) - .orElseThrow(() -> new RuntimeException("Subqueries are not yet implemented")); - - if (catalogResolver.getTableFunction(List.of(identifier)).isPresent()) { - pathWalker.walk(identifier); - scanFunction(pathWalker.getPath(), List.of()); - } else if (context.hasAlias(identifier)) { - handleAlias(input, pathWalker, context, identifier, params); - } else if (identifier.equals(ReservedName.SELF_IDENTIFIER.getCanonical())) { - return handleSelf(input, pathWalker, context, params); - } else { - throw new RuntimeException("Unknown table: " + item); - } - - return List.of(); - } - - private void handleAlias(Iterator input, PathWalker pathWalker, Context context, String identifier, - List params) { - if (!input.hasNext()) { - throw new RuntimeException("Alias by itself."); - } - - pathWalker.setPath(context.getAliasPath(identifier)); - String nextIdentifier = getIdentifier(input.next()) - .orElseThrow(() -> new RuntimeException("Subqueries are not yet implemented")); - pathWalker.walk(nextIdentifier); - - Optional fnc = catalogResolver.getTableFunction(pathWalker.getPath()); - Preconditions.checkState(fnc.isPresent(), "Table function not found %s", pathWalker.getPath()); - List args = rewriteArgs(identifier, fnc.get().getFunction(), context, params); - scanFunction(fnc.get(), args); - } - - private List handleSelf(Iterator input, PathWalker pathWalker, Context context, - List params) { - pathWalker.setPath(context.getCurrentPath()); - boolean isNested = context.isNested(); - boolean materializeSelf = context.isMaterializeSelf(); - - if (materializeSelf || !input.hasNext()) { - scanNestedTable(context.getCurrentPath()); - if (isNested) { - return buildPullupColumns(pathWalker); - } - } else { - String nextIdentifier = getIdentifier(input.next()) - .orElseThrow(() -> new RuntimeException("Subqueries are not yet implemented")); - pathWalker.walk(nextIdentifier); - - Optional fnc = catalogResolver.getTableFunction(pathWalker.getAbsolutePath()); - Preconditions.checkState(fnc.isPresent(), "Table function not found %s", pathWalker.getPath()); - List args = rewriteArgs(ReservedName.SELF_IDENTIFIER.getCanonical(), fnc.get().getFunction(), context, - params); - scanFunction(fnc.get(), args); - } - return List.of(); - } - - private List buildPullupColumns(PathWalker pathWalker) { - RelOptTable table = catalogResolver.getTableFromPath(pathWalker.getAbsolutePath()); - return IntStream.range(0, table.getKeys().get(0).asSet().size()) - .mapToObj(i -> new PullupColumn( - String.format("%spk%d$%s", ReservedName.SYSTEM_HIDDEN_PREFIX, uniquePkId.incrementAndGet(), table.getRowType().getFieldList().get(i).getName()), - String.format("%spk%d$%s", ReservedName.SYSTEM_HIDDEN_PREFIX, i + 1, table.getRowType().getFieldList().get(i).getName()) - )) - .collect(Collectors.toList()); - } - - private void processRemainingItems(Iterator input, PathWalker pathWalker, - Context context, List params) { - - while (input.hasNext()) { - SqlNode item = input.next(); - String nextIdentifier = getIdentifier(item) - .orElseThrow(() -> new RuntimeException("Subqueries are not yet implemented")); - pathWalker.walk(nextIdentifier); - - String alias = getLatestAlias(); - Optional fnc = catalogResolver.getTableFunction(pathWalker.getPath()); - if (fnc.isEmpty()) { - scanNestedTable(pathWalker.getPath()); - } else { - List args = rewriteArgs(alias, fnc.get().getFunction(), context, - params); - scanFunction(fnc.get(), args); - joinLateral(); - } - } - } - - private Result createResult(SqlNode sqlNode, List path, List pullupColumns, - List params) { - return new Result(sqlNode, path, pullupColumns, List.of(), Optional.empty(), params); - } - - private List rewriteArgs(String alias, TableFunction function, Context context, - List params) { - //if arg needs to by a dynamic expression, rewrite. - List nodes = new ArrayList<>(); - for (FunctionParameter parameter : function.getParameters()) { - SqrlFunctionParameter p = (SqrlFunctionParameter) parameter; - SqlIdentifier identifier = new SqlIdentifier(List.of(alias, p.getName()), - SqlParserPos.ZERO); - SqlNode rewritten = context.isMaterializeSelf() - ? identifier - : rewriteToDynamicParam(identifier, params); - nodes.add(rewritten); - } - return nodes; - } - - public SqlNode rewriteToDynamicParam(SqlIdentifier id, List params) { - //if self, check if param list, if not create one - if (!isSelfField(id.names)) { - return id; - } - - for (FunctionParameter p : params) { - if (paramMapping.containsKey(p)) { - return paramMapping.get(p); - } - } - - RelDataType anyType = catalogResolver.getTypeFactory().createSqlType(SqlTypeName.ANY); - SqrlFunctionParameter functionParameter = new SqrlFunctionParameter(id.names.get(1), Optional.empty(), - SqlDataTypeSpecBuilder.create(anyType), params.size(), anyType, true); - params.add(functionParameter); - SqlDynamicParam param = new SqlDynamicParam(functionParameter.getOrdinal(), - id.getParserPosition()); - paramMapping.put(functionParameter, param); - - return param; - } - - private Optional getIdentifier(SqlNode item) { - if (item instanceof SqlIdentifier) { - return Optional.of(((SqlIdentifier) item).getSimple()); - } else if (item instanceof SqlCall) { - return Optional.of(((SqlCall) item).getOperator().getName()); - } - - return Optional.empty(); - } - - private String generateAlias() { - return ALIAS_PREFIX + aliasInt.incrementAndGet(); - } - public void scanFunction(List path, List args) { - Optional op = catalogResolver.getTableFunction(path); - if (op.isEmpty() && args.isEmpty()) { - scanNestedTable(path); - return; - } - if (op.isEmpty()) { - throw new RuntimeException(String.format("Could not find table: %s", path)); - } - - scanFunction(op.get(), args); - } - - public void scanFunction(SqlUserDefinedTableFunction op, - List args) { - RelDataType type = op.getFunction().getRowType(null, null); - SqlCall call = op.createCall(SqlParserPos.ZERO, args); - - call = SqlStdOperatorTable.COLLECTION_TABLE.createCall(SqlParserPos.ZERO, call); - - String alias = generateAlias(); - SqlCall aliasedCall = SqlStdOperatorTable.AS.createCall(SqlParserPos.ZERO, call, new SqlIdentifier(alias, SqlParserPos.ZERO)); - Frame frame = new Frame(false, type, aliasedCall, alias); - stack.push(frame); - tableHistory.add(frame); - } - - public void joinLateral() { - Frame right = stack.pop(); - Frame left = stack.pop(); - - SqlJoin join = new SqlJoin(SqlParserPos.ZERO, - left.getNode(), - SqlLiteral.createBoolean(false, SqlParserPos.ZERO), - JoinType.DEFAULT.symbol(SqlParserPos.ZERO), - SqlStdOperatorTable.LATERAL.createCall(SqlParserPos.ZERO, right.getNode()), - JoinConditionType.NONE.symbol(SqlParserPos.ZERO), - null); - - RelDataTypeFieldBuilder builder = new RelDataTypeFieldBuilder( - new FieldInfoBuilder(catalogResolver.getTypeFactory())); - builder.addAll(left.getType().getFieldList()); - builder.addAll(right.getType().getFieldList()); - RelDataType type = builder.build(); - - Frame frame = new Frame(right.subquery, type, join, right.getAlias()); - stack.push(frame); - } - - public SqlNode build() { - Frame frame = stack.pop(); - return frame.getNode(); - } - public SqlNode buildAndProjectLast(List pullupCols) { - Frame frame = stack.pop(); - Frame lastTable = tableHistory.get(tableHistory.size()-1); - if (frame.isSubquery()) { //subquery - return frame.getNode(); - } - SqlSelectBuilder select = new SqlSelectBuilder() - .setFrom(frame.getNode()); - select.setSelectList(ListUtils.union( - rename(createSelectList(tableHistory.get(0), pullupCols.size()), pullupCols), - createSelectList(lastTable, lastTable.type.getFieldCount()))); - return select.build(); - } - - private List rename(List selectList, List pullupCols) { - return IntStream.range(0, selectList.size()) - .mapToObj(i-> SqlStdOperatorTable.AS.createCall(SqlParserPos.ZERO, selectList.get(i), - new SqlIdentifier(pullupCols.get(i).getColumnName(), SqlParserPos.ZERO))) - .collect(Collectors.toList()); - } - - private List createSelectList(Frame frame, int count) { - return IntStream.range(0, count) - .mapToObj(i-> - //todo fix: alias has null check for no alias on subquery - (frame.alias == null) - ? new SqlIdentifier(List.of( frame.getType().getFieldList().get(i).getName()), SqlParserPos.ZERO ) - : new SqlIdentifier(List.of(frame.alias, frame.getType().getFieldList().get(i).getName()), SqlParserPos.ZERO ) - ) - .collect(Collectors.toList()); - } - - public String getLatestAlias() { - return tableHistory.get(tableHistory.size()-1).alias; - } - - public void scanNestedTable(List currentPath) { - RelOptTable relOptTable = catalogResolver.getTableFromPath(currentPath); - if (relOptTable == null) { - throw new RuntimeException("Could not find table: " + currentPath); - } - String tableName = relOptTable.getQualifiedName().get(0); - SqlNode table = new SqlIdentifier(tableName, SqlParserPos.ZERO); - - String alias = "_t"+aliasInt.incrementAndGet(); - - SqlCall aliasedCall = SqlStdOperatorTable.AS.createCall(SqlParserPos.ZERO, table, new SqlIdentifier(alias, SqlParserPos.ZERO)); - Frame frame = new Frame(false, relOptTable.getRowType(), aliasedCall, alias); - stack.push(frame); - tableHistory.add(frame); - } - - @Value - public class Frame { - boolean subquery; - RelDataType type; - SqlNode node; - String alias; - } -} diff --git a/sqrl-planner/sqrl-common/src/main/java/com/datasqrl/calcite/plan/ScriptPlanner.java b/sqrl-planner/sqrl-common/src/main/java/com/datasqrl/calcite/plan/ScriptPlanner.java index be1dee912..0a6bc8ae7 100644 --- a/sqrl-planner/sqrl-common/src/main/java/com/datasqrl/calcite/plan/ScriptPlanner.java +++ b/sqrl-planner/sqrl-common/src/main/java/com/datasqrl/calcite/plan/ScriptPlanner.java @@ -9,10 +9,11 @@ import com.datasqrl.calcite.SqrlTableFactory; import com.datasqrl.calcite.SqrlToSql; import com.datasqrl.calcite.SqrlToSql.Result; -import com.datasqrl.calcite.TablePathBuilder; +import com.datasqrl.calcite.NormalizeTablePath; import com.datasqrl.calcite.TimestampAssignableTable; import com.datasqrl.calcite.function.SqrlTableMacro; import com.datasqrl.calcite.schema.SqrlListUtil; +import com.datasqrl.calcite.sqrl.PathToSql; import com.datasqrl.calcite.visitor.SqlNodeVisitor; import com.datasqrl.canonicalizer.NamePath; import com.datasqrl.canonicalizer.ReservedName; @@ -25,7 +26,6 @@ import com.datasqrl.schema.Multiplicity; import com.datasqrl.schema.Relationship; import com.datasqrl.util.SqlNameUtil; -import java.util.ArrayList; import java.util.List; import java.util.Optional; import java.util.function.Supplier; @@ -36,7 +36,6 @@ import org.apache.calcite.rex.RexInputRef; import org.apache.calcite.rex.RexNode; import org.apache.calcite.schema.Function; -import org.apache.calcite.schema.FunctionParameter; import org.apache.calcite.schema.Table; import org.apache.calcite.sql.SqlNode; import org.apache.calcite.sql.SqrlAssignTimestamp; @@ -110,13 +109,13 @@ public Void visit(SqrlAssignment assignment, Void context) { SqlNode node = validator.getPreprocessSql().get(assignment); boolean materializeSelf = validator.getIsMaterializeTable().get(assignment); List parentPath = getParentPath(assignment); - TablePathBuilder tablePathBuilder = new TablePathBuilder(planner.getCatalogReader(), - validator.getParamMapping(), framework.getUniquePkId()); - - SqrlToSql sqrlToSql = new SqrlToSql(planner.getCatalogReader(), planner.getOperatorTable(), tablePathBuilder, - validator.getParameters().get(assignment)); + NormalizeTablePath normalizeTablePath = new NormalizeTablePath(planner.getCatalogReader(), + validator.getParamMapping()); + SqrlToSql sqrlToSql = new SqrlToSql(planner.getCatalogReader(), planner.getOperatorTable(), + normalizeTablePath, validator.getParameters().get(assignment), framework.getUniquePkId()); Result result = sqrlToSql.rewrite(node, materializeSelf, parentPath); + System.out.println(planner.sqlToString(Dialect.CALCITE, result.getSqlNode())); RelNode relNode = planner.plan(Dialect.CALCITE, result.getSqlNode()); RelNode expanded = planner.expandMacros(relNode); diff --git a/sqrl-planner/sqrl-common/src/main/java/com/datasqrl/calcite/schema/sql/SqlBuilders.java b/sqrl-planner/sqrl-common/src/main/java/com/datasqrl/calcite/schema/sql/SqlBuilders.java index 46842ebdd..80f04707f 100644 --- a/sqrl-planner/sqrl-common/src/main/java/com/datasqrl/calcite/schema/sql/SqlBuilders.java +++ b/sqrl-planner/sqrl-common/src/main/java/com/datasqrl/calcite/schema/sql/SqlBuilders.java @@ -191,16 +191,16 @@ public void prependSelect(List keysToPullUp) { private List mapToIdentifier(List keysToPullUp) { return keysToPullUp.stream() - .map(n->new SqlIdentifier(n.getColumnName(), SqlParserPos.ZERO)) + .map(n->new SqlIdentifier(n.getDisplayName(), SqlParserPos.ZERO)) .collect(Collectors.toList()); } private List mapToSelectIdentifier(List keysToPullUp) { return keysToPullUp.stream() - .map(n->SqlStdOperatorTable.AS.createCall(SqlParserPos.ZERO, - new SqlIdentifier(n.getColumnName(), SqlParserPos.ZERO), - new SqlIdentifier(n.getDisplayName(), SqlParserPos.ZERO) - )) + .map(n-> + SqlStdOperatorTable.AS.createCall(SqlParserPos.ZERO, + new SqlIdentifier(n.getDisplayName(), SqlParserPos.ZERO), + new SqlIdentifier(n.getFinalName(), SqlParserPos.ZERO))) .collect(Collectors.toList()); } @@ -220,6 +220,7 @@ public void prependOrder(List keysToPullUp) { public boolean hasGroup() { return select.getGroup() != null && !select.getGroup().getList().isEmpty(); } + public boolean hasOrder() { return select.getOrderList() != null && !select.getOrderList().getList().isEmpty(); } diff --git a/sqrl-planner/sqrl-common/src/main/java/com/datasqrl/calcite/sqrl/PathToSql.java b/sqrl-planner/sqrl-common/src/main/java/com/datasqrl/calcite/sqrl/PathToSql.java new file mode 100644 index 000000000..bc799bf88 --- /dev/null +++ b/sqrl-planner/sqrl-common/src/main/java/com/datasqrl/calcite/sqrl/PathToSql.java @@ -0,0 +1,72 @@ +package com.datasqrl.calcite.sqrl; + +import com.datasqrl.calcite.NormalizeTablePath.PathItem; +import com.datasqrl.calcite.NormalizeTablePath.SelfTablePathItem; +import com.datasqrl.calcite.NormalizeTablePath.TableFunctionPathItem; +import com.datasqrl.canonicalizer.ReservedName; +import java.util.List; +import java.util.Stack; +import lombok.AllArgsConstructor; +import lombok.Value; +import org.apache.calcite.sql.JoinConditionType; +import org.apache.calcite.sql.JoinType; +import org.apache.calcite.sql.SqlCall; +import org.apache.calcite.sql.SqlIdentifier; +import org.apache.calcite.sql.SqlJoin; +import org.apache.calcite.sql.SqlLiteral; +import org.apache.calcite.sql.SqlNode; +import org.apache.calcite.sql.fun.SqlStdOperatorTable; +import org.apache.calcite.sql.parser.SqlParserPos; + +@AllArgsConstructor +public class PathToSql { + final Stack stack = new Stack<>(); + + public SqlNode build(List pathItems) { + for (int i = 0; i < pathItems.size(); i++) { + PathItem item = pathItems.get(i); + if (item instanceof SelfTablePathItem) { + SelfTablePathItem selfTablePathItem = (SelfTablePathItem) item; + String tableName = selfTablePathItem.getTable().getQualifiedName().get(0); + SqlNode table = new SqlIdentifier(tableName, SqlParserPos.ZERO); + + SqlCall aliasedCall = SqlStdOperatorTable.AS.createCall(SqlParserPos.ZERO, table, + new SqlIdentifier(ReservedName.SELF_IDENTIFIER.getCanonical(), SqlParserPos.ZERO)); + stack.push(aliasedCall); + } else if (item instanceof TableFunctionPathItem) { + TableFunctionPathItem tableFncItm = (TableFunctionPathItem) item; + SqlCall call = tableFncItm.getOp().createCall(SqlParserPos.ZERO, tableFncItm.getArguments()); + call = SqlStdOperatorTable.COLLECTION_TABLE.createCall(SqlParserPos.ZERO, call); + SqlCall aliasedCall = SqlStdOperatorTable.AS.createCall(SqlParserPos.ZERO, call, new SqlIdentifier(tableFncItm.getAlias(), + SqlParserPos.ZERO)); + stack.push(aliasedCall); + } + if (i > 0) { + joinLateral(); + } + } + + SqlNode sqlNode = stack.pop(); + return sqlNode; + } + + @Value + public class PathToSqlResult { + SqlNode sqlNode; + } + + public void joinLateral() { + SqlNode right = stack.pop(); + SqlNode left = stack.pop(); + + SqlJoin join = new SqlJoin(SqlParserPos.ZERO, + left, + SqlLiteral.createBoolean(false, SqlParserPos.ZERO), + JoinType.DEFAULT.symbol(SqlParserPos.ZERO), + SqlStdOperatorTable.LATERAL.createCall(SqlParserPos.ZERO, right), + JoinConditionType.NONE.symbol(SqlParserPos.ZERO), + null); + + stack.push(join); + } +} diff --git a/sqrl-planner/sqrl-planner-local/src/test/resources/snapshots/com/datasqrl/plan/local/analyze/ResolveTest/tableFunctionsBasic.txt b/sqrl-planner/sqrl-planner-local/src/test/resources/snapshots/com/datasqrl/plan/local/analyze/ResolveTest/tableFunctionsBasic.txt index e8f520058..1afc0fdda 100644 --- a/sqrl-planner/sqrl-planner-local/src/test/resources/snapshots/com/datasqrl/plan/local/analyze/ResolveTest/tableFunctionsBasic.txt +++ b/sqrl-planner/sqrl-planner-local/src/test/resources/snapshots/com/datasqrl/plan/local/analyze/ResolveTest/tableFunctionsBasic.txt @@ -1,10 +1,8 @@ LogicalProject(_uuid=[$0], _ingest_time=[$1], id=[$2], customerid=[$3], time=[$4], entries=[$5]) LogicalFilter(condition=[>($2, ?0)]) - LogicalProject(_uuid=[$0], _ingest_time=[$1], id=[$2], customerid=[$3], time=[$4], entries=[$5]) - LogicalTableScan(table=[[orders$2]]) + LogicalTableScan(table=[[orders$2]]) LogicalProject(id=[$2], delta=[-($2, ?0)], time=[$4]) LogicalFilter(condition=[>($2, ?0)]) - LogicalProject(_uuid=[$0], _ingest_time=[$1], id=[$2], customerid=[$3], time=[$4], entries=[$5]) - LogicalTableScan(table=[[orders$2]]) + LogicalTableScan(table=[[orders$2]]) From 89b5a7cafe3813a2063a77513253d942e5a59015 Mon Sep 17 00:00:00 2001 From: Daniel Henneberger Date: Fri, 27 Oct 2023 17:02:10 -0700 Subject: [PATCH 2/3] Slight cleanup Signed-off-by: Daniel Henneberger --- .../datasqrl/calcite/NormalizeTablePath.java | 4 +- .../com/datasqrl/calcite/sqrl/PathToSql.java | 47 ++++++++++--------- 2 files changed, 27 insertions(+), 24 deletions(-) diff --git a/sqrl-planner/sqrl-common/src/main/java/com/datasqrl/calcite/NormalizeTablePath.java b/sqrl-planner/sqrl-common/src/main/java/com/datasqrl/calcite/NormalizeTablePath.java index faf45abee..6ce4e4df6 100644 --- a/sqrl-planner/sqrl-common/src/main/java/com/datasqrl/calcite/NormalizeTablePath.java +++ b/sqrl-planner/sqrl-common/src/main/java/com/datasqrl/calcite/NormalizeTablePath.java @@ -37,8 +37,7 @@ public class NormalizeTablePath { private static final String ALIAS_PREFIX = "_t"; private final CatalogResolver catalogResolver; private final Map paramMapping; - final AtomicInteger aliasInt = new AtomicInteger(0); - + private final AtomicInteger aliasInt = new AtomicInteger(0); public NormalizeTablePath(CatalogResolver catalogResolver, Map paramMapping) { this.catalogResolver = catalogResolver; @@ -49,7 +48,6 @@ public TablePathResult convert(List items, Context context, List params = new ArrayList<>(parameters); PathWalker pathWalker = new PathWalker(catalogResolver); - List pathItems = mapToPathItems(context, items, params, pathWalker); return new TablePathResult(params, pathItems, pathWalker.getAbsolutePath()); } diff --git a/sqrl-planner/sqrl-common/src/main/java/com/datasqrl/calcite/sqrl/PathToSql.java b/sqrl-planner/sqrl-common/src/main/java/com/datasqrl/calcite/sqrl/PathToSql.java index bc799bf88..dc79f243f 100644 --- a/sqrl-planner/sqrl-common/src/main/java/com/datasqrl/calcite/sqrl/PathToSql.java +++ b/sqrl-planner/sqrl-common/src/main/java/com/datasqrl/calcite/sqrl/PathToSql.java @@ -4,8 +4,9 @@ import com.datasqrl.calcite.NormalizeTablePath.SelfTablePathItem; import com.datasqrl.calcite.NormalizeTablePath.TableFunctionPathItem; import com.datasqrl.canonicalizer.ReservedName; +import java.util.ArrayDeque; +import java.util.Deque; import java.util.List; -import java.util.Stack; import lombok.AllArgsConstructor; import lombok.Value; import org.apache.calcite.sql.JoinConditionType; @@ -20,53 +21,57 @@ @AllArgsConstructor public class PathToSql { - final Stack stack = new Stack<>(); + private final Deque stack = new ArrayDeque<>(); public SqlNode build(List pathItems) { for (int i = 0; i < pathItems.size(); i++) { PathItem item = pathItems.get(i); if (item instanceof SelfTablePathItem) { - SelfTablePathItem selfTablePathItem = (SelfTablePathItem) item; - String tableName = selfTablePathItem.getTable().getQualifiedName().get(0); + String tableName = ((SelfTablePathItem)item).getTable().getQualifiedName().get(0); SqlNode table = new SqlIdentifier(tableName, SqlParserPos.ZERO); - - SqlCall aliasedCall = SqlStdOperatorTable.AS.createCall(SqlParserPos.ZERO, table, - new SqlIdentifier(ReservedName.SELF_IDENTIFIER.getCanonical(), SqlParserPos.ZERO)); + SqlCall aliasedCall = SqlStdOperatorTable.AS.createCall( + SqlParserPos.ZERO, + table, + new SqlIdentifier(ReservedName.SELF_IDENTIFIER.getCanonical(), SqlParserPos.ZERO) + ); stack.push(aliasedCall); } else if (item instanceof TableFunctionPathItem) { TableFunctionPathItem tableFncItm = (TableFunctionPathItem) item; SqlCall call = tableFncItm.getOp().createCall(SqlParserPos.ZERO, tableFncItm.getArguments()); call = SqlStdOperatorTable.COLLECTION_TABLE.createCall(SqlParserPos.ZERO, call); - SqlCall aliasedCall = SqlStdOperatorTable.AS.createCall(SqlParserPos.ZERO, call, new SqlIdentifier(tableFncItm.getAlias(), - SqlParserPos.ZERO)); + SqlCall aliasedCall = SqlStdOperatorTable.AS.createCall( + SqlParserPos.ZERO, + call, + new SqlIdentifier(tableFncItm.getAlias(), SqlParserPos.ZERO) + ); stack.push(aliasedCall); } if (i > 0) { joinLateral(); } } - - SqlNode sqlNode = stack.pop(); - return sqlNode; - } - - @Value - public class PathToSqlResult { - SqlNode sqlNode; + return stack.pop(); } - public void joinLateral() { + private void joinLateral() { SqlNode right = stack.pop(); SqlNode left = stack.pop(); - SqlJoin join = new SqlJoin(SqlParserPos.ZERO, + SqlJoin join = new SqlJoin( + SqlParserPos.ZERO, left, SqlLiteral.createBoolean(false, SqlParserPos.ZERO), JoinType.DEFAULT.symbol(SqlParserPos.ZERO), SqlStdOperatorTable.LATERAL.createCall(SqlParserPos.ZERO, right), JoinConditionType.NONE.symbol(SqlParserPos.ZERO), - null); + null + ); stack.push(join); } -} + + @Value + public static class PathToSqlResult { + SqlNode sqlNode; + } +} \ No newline at end of file From 403afb1533e95564ed9f8f374164a5692705247c Mon Sep 17 00:00:00 2001 From: Daniel Henneberger Date: Fri, 27 Oct 2023 18:16:59 -0700 Subject: [PATCH 3/3] remove print Signed-off-by: Daniel Henneberger --- .../src/main/java/com/datasqrl/calcite/plan/ScriptPlanner.java | 1 - 1 file changed, 1 deletion(-) diff --git a/sqrl-planner/sqrl-common/src/main/java/com/datasqrl/calcite/plan/ScriptPlanner.java b/sqrl-planner/sqrl-common/src/main/java/com/datasqrl/calcite/plan/ScriptPlanner.java index 0a6bc8ae7..3ccc644cc 100644 --- a/sqrl-planner/sqrl-common/src/main/java/com/datasqrl/calcite/plan/ScriptPlanner.java +++ b/sqrl-planner/sqrl-common/src/main/java/com/datasqrl/calcite/plan/ScriptPlanner.java @@ -115,7 +115,6 @@ public Void visit(SqrlAssignment assignment, Void context) { normalizeTablePath, validator.getParameters().get(assignment), framework.getUniquePkId()); Result result = sqrlToSql.rewrite(node, materializeSelf, parentPath); - System.out.println(planner.sqlToString(Dialect.CALCITE, result.getSqlNode())); RelNode relNode = planner.plan(Dialect.CALCITE, result.getSqlNode()); RelNode expanded = planner.expandMacros(relNode);