Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor of table path sql building #389

Merged
merged 3 commits into from
Oct 28, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,221 @@
package com.datasqrl.calcite;

import static com.datasqrl.plan.validate.ScriptValidator.isSelfField;

import com.datasqrl.calcite.SqrlToSql.Context;
import com.datasqrl.calcite.schema.PathWalker;
import com.datasqrl.calcite.schema.sql.SqlDataTypeSpecBuilder;
import com.datasqrl.calcite.sqrl.CatalogResolver;
import com.datasqrl.canonicalizer.ReservedName;
import com.datasqrl.function.SqrlFunctionParameter;
import com.google.common.base.Preconditions;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.Value;
import org.apache.calcite.plan.RelOptTable;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.schema.FunctionParameter;
import org.apache.calcite.schema.TableFunction;
import org.apache.calcite.sql.SqlCall;
import org.apache.calcite.sql.SqlDynamicParam;
import org.apache.calcite.sql.SqlIdentifier;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.parser.SqlParserPos;
import org.apache.calcite.sql.type.SqlTypeName;
import org.apache.calcite.sql.validate.SqlUserDefinedTableFunction;

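/**
* Converts a table path (for example a path rooted at a table function, at a query alias such as
* `o.entries`, or at the self identifier `@`) into an ordered list of {@link PathItem}s, one per
* resolved path step, each exposing the alias it is referenced by.
*/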
public class NormalizeTablePath {
private static final String ALIAS_PREFIX = "_t";
private final CatalogResolver catalogResolver;
private final Map<FunctionParameter, SqlDynamicParam> paramMapping;
private final AtomicInteger aliasInt = new AtomicInteger(0);

/**
 * Creates a normalizer bound to a catalog and a shared parameter mapping.
 *
 * @param catalogResolver used to look up table functions, tables and the type factory
 * @param paramMapping    mapping from function parameters to their dynamic-parameter nodes; it is
 *                        stored by reference and {@link #rewriteToDynamicParam} adds entries to it
 */
public NormalizeTablePath(
    CatalogResolver catalogResolver,
    Map<FunctionParameter, SqlDynamicParam> paramMapping) {
  this.paramMapping = paramMapping;
  this.catalogResolver = catalogResolver;
}

/**
 * Resolves a table path into path items.
 *
 * <p>The supplied {@code parameters} list is copied first, so parameters discovered while
 * rewriting arguments are appended to the copy and the caller's list is left untouched.
 *
 * @param items      the nodes making up the table path, in order
 * @param context    the rewrite context (aliases, current path, materialize-self flag)
 * @param parameters the parameters known so far
 * @return the (possibly extended) parameter list, the resolved path items and the walker's
 *         absolute path after the whole path has been walked
 */
public TablePathResult convert(List<SqlNode> items, Context context, List<FunctionParameter> parameters) {
  List<FunctionParameter> collectedParams = new ArrayList<>(parameters);
  PathWalker walker = new PathWalker(catalogResolver);

  // The walker is advanced while mapping, so the absolute path must be read afterwards.
  List<PathItem> resolvedItems = mapToPathItems(context, items, collectedParams, walker);
  List<String> absolutePath = walker.getAbsolutePath();

  return new TablePathResult(collectedParams, resolvedItems, absolutePath);
}

/**
 * Resolves every item of a table path into a {@link PathItem}, advancing {@code pathWalker}
 * along the way.
 *
 * <p>The first item decides how the path is rooted:
 * <ul>
 *   <li>its name resolves to a table function: a {@link TableFunctionPathItem} without
 *       arguments is emitted;</li>
 *   <li>its name is an alias known to {@code context} (e.g. {@code o.entries}): the walker is
 *       positioned on the alias' path and the following item is resolved as a table function
 *       whose arguments are rewritten against that alias;</li>
 *   <li>it is the self identifier: either a {@link SelfTablePathItem} is emitted (when the
 *       context materializes self, or nothing follows), or the following item is resolved as a
 *       table function whose arguments are rewritten against the self identifier.</li>
 * </ul>
 * Each remaining item is resolved as a table function whose arguments are rewritten against the
 * alias of the item before it.
 *
 * @param context    the rewrite context
 * @param items      the nodes of the table path; must not be empty
 * @param params     mutable parameter list; may be extended while arguments are rewritten
 * @param pathWalker walker tracking the current position in the catalog
 * @return the resolved path items, in path order
 * @throws IllegalArgumentException if {@code items} is empty
 * @throws IllegalStateException    if a path step does not resolve to a table function
 * @throws RuntimeException         if an item is neither an identifier nor a call, if an alias
 *                                  appears with nothing after it, or if the first item is unknown
 */
private List<PathItem> mapToPathItems(Context context, List<SqlNode> items, List<FunctionParameter> params,
    PathWalker pathWalker) {
  // Report an empty path explicitly instead of a bare NoSuchElementException from the iterator.
  Preconditions.checkArgument(!items.isEmpty(), "Table path must contain at least one item");

  List<PathItem> pathItems = new ArrayList<>();
  Iterator<SqlNode> input = items.iterator();

  SqlNode item = input.next();
  String identifier = requireIdentifier(item);

  String alias;
  // Look the root function up once and reuse the result for both the test and the path item.
  Optional<SqlUserDefinedTableFunction> rootFunction = catalogResolver.getTableFunction(List.of(identifier));
  if (rootFunction.isPresent()) {
    alias = generateAlias();
    // NOTE(review): the path is read before the walker advances onto `identifier`, unlike the
    // other branches which walk first - confirm this is intended.
    pathItems.add(new TableFunctionPathItem(pathWalker.getPath(), rootFunction.get(), List.of(), alias));
    pathWalker.walk(identifier);
  } else if (context.hasAlias(identifier)) {
    // For tables that start with an alias e.g. `o.entries`
    if (!input.hasNext()) {
      throw new RuntimeException("Alias by itself.");
    }
    alias = generateAlias();
    // Get absolute path of alias `o`
    pathWalker.setPath(context.getAliasPath(identifier));
    String nextIdentifier = requireIdentifier(input.next());
    // Update path walker
    pathWalker.walk(nextIdentifier);

    // Lookup function
    Optional<SqlUserDefinedTableFunction> fnc = catalogResolver.getTableFunction(pathWalker.getPath());
    Preconditions.checkState(fnc.isPresent(), "Table function not found %s", pathWalker.getPath());

    // Rewrite arguments so internal arguments are prefixed with the alias
    List<SqlNode> args = rewriteArgs(identifier, fnc.get().getFunction(), context, params);

    pathItems.add(new TableFunctionPathItem(pathWalker.getPath(), fnc.get(), args, alias));
  } else if (identifier.equals(ReservedName.SELF_IDENTIFIER.getCanonical())) {
    // Tables that start with '@'
    pathWalker.setPath(context.getCurrentPath());
    // Treat '@' as something to add to the table path list
    boolean materializeSelf = context.isMaterializeSelf();

    if (materializeSelf || !input.hasNext()) {
      // Do a table scan on the source table
      RelOptTable table = catalogResolver.getTableFromPath(pathWalker.getPath());
      pathItems.add(new SelfTablePathItem(pathWalker.getPath(), table));
      alias = ReservedName.SELF_IDENTIFIER.getCanonical();
    } else {
      String nextIdentifier = requireIdentifier(input.next());
      pathWalker.walk(nextIdentifier);

      // NOTE(review): this branch looks the function up by the absolute path while the other
      // branches use getPath() - confirm the difference is intended.
      Optional<SqlUserDefinedTableFunction> fnc = catalogResolver.getTableFunction(pathWalker.getAbsolutePath());
      Preconditions.checkState(fnc.isPresent(), "Table function not found %s", pathWalker.getPath());

      alias = generateAlias();
      // Rewrite arguments
      List<SqlNode> args = rewriteArgs(ReservedName.SELF_IDENTIFIER.getCanonical(), fnc.get().getFunction(), context,
          params);
      pathItems.add(new TableFunctionPathItem(pathWalker.getPath(), fnc.get(), args, alias));
    }
  } else {
    throw new RuntimeException("Unknown table: " + item);
  }

  while (input.hasNext()) {
    String nextIdentifier = requireIdentifier(input.next());
    pathWalker.walk(nextIdentifier);

    Optional<SqlUserDefinedTableFunction> fnc = catalogResolver.getTableFunction(pathWalker.getPath());
    // Same guard as the branches above; previously an unresolved step surfaced as an
    // uninformative NoSuchElementException from Optional.get().
    Preconditions.checkState(fnc.isPresent(), "Table function not found %s", pathWalker.getPath());

    // Arguments of this step refer to the previous step through its alias.
    List<SqlNode> args = rewriteArgs(alias, fnc.get().getFunction(), context, params);
    String newAlias = generateAlias();
    pathItems.add(new TableFunctionPathItem(pathWalker.getPath(), fnc.get(), args, newAlias));
    alias = newAlias;
  }

  return pathItems;
}

/** Returns the name of a path item, failing for nodes that carry no usable name. */
private String requireIdentifier(SqlNode item) {
  return getIdentifier(item)
      .orElseThrow(() -> new RuntimeException("Subqueries are not yet implemented"));
}

/**
 * Builds the argument list for a table function call.
 *
 * <p>Every declared parameter of {@code function} becomes the identifier
 * {@code <alias>.<parameterName>}. When the context materializes self the identifier is used as
 * is; otherwise it is passed through {@link #rewriteToDynamicParam}, which may append to
 * {@code params}.
 *
 * @param alias    qualifier placed in front of each parameter name
 * @param function the table function whose parameters are bound
 * @param context  the rewrite context
 * @param params   mutable parameter list handed to {@link #rewriteToDynamicParam}
 * @return one argument node per declared parameter, in declaration order
 */
private List<SqlNode> rewriteArgs(String alias, TableFunction function, Context context,
    List<FunctionParameter> params) {
  List<SqlNode> rewrittenArgs = new ArrayList<>();
  for (FunctionParameter declared : function.getParameters()) {
    SqrlFunctionParameter sqrlParam = (SqrlFunctionParameter) declared;
    SqlIdentifier qualified = new SqlIdentifier(List.of(alias, sqrlParam.getName()),
        SqlParserPos.ZERO);
    // If the argument needs to be a dynamic expression, rewrite it.
    if (context.isMaterializeSelf()) {
      rewrittenArgs.add(qualified);
    } else {
      rewrittenArgs.add(rewriteToDynamicParam(qualified, params));
    }
  }
  return rewrittenArgs;
}

/**
 * Replaces a self-field identifier with a dynamic parameter.
 *
 * <p>Identifiers that are not self fields (per {@code ScriptValidator.isSelfField}) are returned
 * unchanged. For a self field, a parameter in {@code params} that has the same name as the field
 * and is already present in the parameter mapping is reused. Otherwise a new parameter of type
 * {@code ANY} is created, appended to {@code params}, registered in the mapping and returned as
 * a {@link SqlDynamicParam}.
 *
 * @param id     the identifier to rewrite
 * @param params mutable parameter list; a new parameter is appended when none can be reused
 * @return {@code id} itself, or the dynamic parameter standing in for the self field
 */
public SqlNode rewriteToDynamicParam(SqlIdentifier id, List<FunctionParameter> params) {
  // Only self fields are turned into parameters.
  if (!isSelfField(id.names)) {
    return id;
  }

  String fieldName = id.names.get(1);

  // Reuse only the parameter that was created for this very field. Returning the first mapped
  // parameter regardless of its name would bind every self field to the same dynamic parameter.
  for (FunctionParameter p : params) {
    if (fieldName.equals(p.getName()) && paramMapping.containsKey(p)) {
      return paramMapping.get(p);
    }
  }

  RelDataType anyType = catalogResolver.getTypeFactory().createSqlType(SqlTypeName.ANY);
  // params.size() is passed as the position of the new parameter, i.e. the index it gets once
  // appended below. NOTE(review): remaining constructor arguments are not visible here - confirm.
  SqrlFunctionParameter functionParameter = new SqrlFunctionParameter(fieldName, Optional.empty(),
      SqlDataTypeSpecBuilder.create(anyType), params.size(), anyType, true);
  params.add(functionParameter);
  SqlDynamicParam param = new SqlDynamicParam(functionParameter.getOrdinal(),
      id.getParserPosition());
  paramMapping.put(functionParameter, param);

  return param;
}

/**
 * Extracts the name a path item is addressed by.
 *
 * @param item a node of the table path
 * @return the simple name of an identifier, the operator name of a call, or empty for any other
 *         kind of node
 */
private Optional<String> getIdentifier(SqlNode item) {
  if (item instanceof SqlIdentifier) {
    SqlIdentifier identifier = (SqlIdentifier) item;
    return Optional.of(identifier.getSimple());
  }
  if (item instanceof SqlCall) {
    SqlCall call = (SqlCall) item;
    return Optional.of(call.getOperator().getName());
  }
  return Optional.empty();
}

/**
 * Produces the next table alias: the alias prefix followed by the incremented counter value,
 * so the first alias handed out by an instance ends in {@code 1}.
 */
private String generateAlias() {
  int sequence = aliasInt.incrementAndGet();
  return ALIAS_PREFIX + sequence;
}


/**
* One resolved step of a table path.
*/
public interface PathItem {
/**
* Returns the alias under which this step is referenced by later steps and projections.
*/
String getAlias();
}

@AllArgsConstructor
@Getter
public class TableFunctionPathItem implements PathItem {
List<String> path;
SqlUserDefinedTableFunction op;
List<SqlNode> arguments;
String alias;
}

@AllArgsConstructor
@Getter
public class SelfTablePathItem implements PathItem {
List<String> path;
RelOptTable table;

@Override
public String getAlias() {
return ReservedName.SELF_IDENTIFIER.getCanonical();
}
}

@Value
public class TablePathResult {
List<FunctionParameter> params;
List<PathItem> pathItems;
List<String> path;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,17 @@

import com.datasqrl.calcite.SqrlToSql.Context;
import com.datasqrl.calcite.SqrlToSql.Result;
import com.datasqrl.calcite.NormalizeTablePath.PathItem;
import com.datasqrl.calcite.NormalizeTablePath.SelfTablePathItem;
import com.datasqrl.calcite.NormalizeTablePath.TablePathResult;
import com.datasqrl.calcite.schema.sql.SqlBuilders.SqlAliasCallBuilder;
import com.datasqrl.calcite.schema.sql.SqlBuilders.SqlCallBuilder;
import com.datasqrl.calcite.schema.sql.SqlBuilders.SqlJoinBuilder;
import com.datasqrl.calcite.schema.sql.SqlBuilders.SqlSelectBuilder;
import com.datasqrl.calcite.sqrl.PathToSql;
import com.datasqrl.calcite.visitor.SqlNodeVisitor;
import com.datasqrl.calcite.visitor.SqlRelationVisitor;
import com.datasqrl.canonicalizer.ReservedName;
import com.datasqrl.plan.hints.TopNHint.Type;
import com.google.common.base.Preconditions;
import java.util.ArrayList;
Expand All @@ -22,12 +27,15 @@
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import lombok.AllArgsConstructor;
import lombok.Value;
import org.apache.calcite.plan.RelOptTable;
import org.apache.calcite.schema.FunctionParameter;
import org.apache.calcite.sql.CalciteFixes;
import org.apache.calcite.sql.SqlBasicCall;
import org.apache.calcite.sql.SqlCall;
import org.apache.calcite.sql.SqlFunctionCategory;
import org.apache.calcite.sql.SqlHint;
Expand All @@ -48,15 +56,18 @@
public class SqrlToSql implements SqlRelationVisitor<Result, Context> {
private final CatalogReader catalogReader;
private final SqlOperatorTable operatorTable;
private final TablePathBuilder tablePathBuilder;
private final NormalizeTablePath normalizeTablePath;
private List<FunctionParameter> parameters;
private final AtomicInteger uniquePkId;

public SqrlToSql(CatalogReader catalogReader, SqlOperatorTable operatorTable,
TablePathBuilder tablePathBuilder, List<FunctionParameter> parameters) {
NormalizeTablePath normalizeTablePath, List<FunctionParameter> parameters,
AtomicInteger uniquePkId) {
this.catalogReader = catalogReader;
this.operatorTable = operatorTable;
this.tablePathBuilder = tablePathBuilder;
this.normalizeTablePath = normalizeTablePath;
this.parameters = parameters;
this.uniquePkId = uniquePkId;
}

public Result rewrite(SqlNode query, boolean materializeSelf, List<String> currentPath) {
Expand Down Expand Up @@ -102,8 +113,7 @@ public Result visitQuerySpecification(SqlSelect call, Context context) {
result.condition.ifPresent(sqlSelectBuilder::appendWhere);
SqlSelect top = new SqlSelectBuilder().setFrom(sqlSelectBuilder.build()).setDistinctOnHint(hintOps).build();
return new Result(top, result.getCurrentPath(), List.of(), List.of(), Optional.empty(), result.params);
} else if (call.isKeywordPresent(SqlSelectKeyword.DISTINCT) ||
(context.isNested() && call.getFetch() != null)) {
} else if (call.isKeywordPresent(SqlSelectKeyword.DISTINCT) || (context.isNested() && call.getFetch() != null)) {
//if is nested, get primary key nodes
int keySize = context.isNested()
? catalogReader.getTableFromPath(context.currentPath).getKeys().get(0)
Expand Down Expand Up @@ -234,9 +244,62 @@ public Result visitTable(SqlIdentifier node, Context context) {
.map(name -> new SqlIdentifier(name, node.getComponentParserPosition(node.names.indexOf(name))))
.collect(Collectors.toList());

Result result = tablePathBuilder.build(items, context, parameters);
TablePathResult result = normalizeTablePath.convert(items, context, parameters);

PathToSql pathToSql = new PathToSql();
SqlNode sqlNode = pathToSql.build(result.getPathItems());

List<PullupColumn> pullupColumns = (context.isNested && result.getPathItems().get(0) instanceof SelfTablePathItem)
? buildPullupColumns((SelfTablePathItem)result.getPathItems().get(0))
: List.of();
// Wrap in a select to maintain sql semantics
if (!pullupColumns.isEmpty() || result.getPathItems().size() > 1) {
sqlNode = buildAndProjectLast(pullupColumns, sqlNode, result.getPathItems().get(0),
result.getPathItems().get(result.getPathItems().size()-1));
} else if (result.getPathItems().size() == 1) {
//Just a table by itself
if (sqlNode instanceof SqlBasicCall) {
sqlNode = ((SqlBasicCall) sqlNode).getOperandList().get(0);
}
}

this.parameters = result.getParams(); //update parameters with new parameters
return result;
return new Result(sqlNode, result.getPath(), pullupColumns,
List.of(), Optional.empty(), result.getParams());
}


/**
 * Wraps {@code sqlNode} in a select that projects the pull-up columns followed by the columns
 * of the last path item.
 *
 * @param pullupCols columns of the first path item to expose; each is projected as
 *                   {@code <firstAlias>.<columnName> AS <displayName>}
 * @param sqlNode    the relation to select from
 * @param first      the first path item; its alias qualifies the pull-up columns
 * @param last       the last path item; its alias qualifies the trailing projection
 * @return the built select
 */
public SqlNode buildAndProjectLast(List<PullupColumn> pullupCols, SqlNode sqlNode,
    PathItem first, PathItem last) {
  List<SqlNode> projections = new ArrayList<>();

  for (PullupColumn pullup : pullupCols) {
    SqlIdentifier source = new SqlIdentifier(
        List.of(first.getAlias(), pullup.getColumnName()), SqlParserPos.ZERO);
    SqlIdentifier target = new SqlIdentifier(pullup.getDisplayName(), SqlParserPos.ZERO);
    projections.add(SqlStdOperatorTable.AS.createCall(SqlParserPos.ZERO, source, target));
  }

  // NOTE(review): the empty second component is presumably Calcite's encoding of
  // `<lastAlias>.*` - confirm against SqlIdentifier.
  projections.add(new SqlIdentifier(List.of(last.getAlias(), ""), SqlParserPos.ZERO));

  SqlSelectBuilder builder = new SqlSelectBuilder().setFrom(sqlNode);
  builder.setSelectList(projections);
  return builder.build();
}

/**
 * Creates one pull-up column per column of the self table's first key.
 *
 * <p>The i-th pull-up column is taken from the i-th field of the table's row type. Its display
 * name embeds a value drawn from the {@code uniquePkId} counter, its final name embeds the
 * 1-based position.
 *
 * @param selfTablePathItem the self scan whose table provides the key
 * @return the pull-up columns, in field order
 */
private List<PullupColumn> buildPullupColumns(SelfTablePathItem selfTablePathItem) {
  RelOptTable table = selfTablePathItem.table;
  int keyColumnCount = table.getKeys().get(0).asSet().size();

  List<PullupColumn> pullups = new ArrayList<>();
  for (int i = 0; i < keyColumnCount; i++) {
    String fieldName = table.getRowType().getFieldList().get(i).getName();
    // The counter is advanced exactly once per column, in field order.
    String displayName = String.format("%spk%d$%s", ReservedName.SYSTEM_HIDDEN_PREFIX,
        uniquePkId.incrementAndGet(), fieldName);
    String finalName = String.format("%spk%d$%s", ReservedName.SYSTEM_HIDDEN_PREFIX, i + 1,
        fieldName);
    pullups.add(new PullupColumn(fieldName, displayName, finalName));
  }
  return pullups;
}

@Override
Expand Down Expand Up @@ -314,9 +377,9 @@ public Result visitSetOperation(SqlCall node, Context context) {

/**
* A column of the first path item that is carried along into the select built around a table path.
* Field order matters: it defines the argument order of the generated constructor.
*/
@Value
public static class PullupColumn {

// Name of the column in the source table; buildAndProjectLast reads it qualified by the first
// path item's alias.
String columnName;
// Alias the column is projected under by buildAndProjectLast; buildPullupColumns derives it from
// the hidden prefix, "pk", a value of the uniquePkId counter, "$" and the column name.
String displayName;
// Same pattern as displayName, but buildPullupColumns uses the 1-based column position instead
// of the counter value.
String finalName;
}

@AllArgsConstructor
Expand Down
Loading
Loading