From 174ae2335c1b565259bf347874a6c2fe76b33227 Mon Sep 17 00:00:00 2001 From: Matthias Broecheler Date: Wed, 4 Oct 2023 15:32:47 -0700 Subject: [PATCH] Honor existing interfaces for timestamp assignment. --- .../java/com/datasqrl/calcite/ModifiableTable.java | 2 +- .../com/datasqrl/calcite/schema/ScriptPlanner.java | 14 +++++++------- .../plan/table/ProxyImportRelationalTable.java | 14 +++++++------- .../datasqrl/plan/table/ScriptRelationalTable.java | 5 +++-- 4 files changed, 18 insertions(+), 17 deletions(-) diff --git a/sqrl-planner/sqrl-common/src/main/java/com/datasqrl/calcite/ModifiableTable.java b/sqrl-planner/sqrl-common/src/main/java/com/datasqrl/calcite/ModifiableTable.java index f16c7e134..86ec4eb5e 100644 --- a/sqrl-planner/sqrl-common/src/main/java/com/datasqrl/calcite/ModifiableTable.java +++ b/sqrl-planner/sqrl-common/src/main/java/com/datasqrl/calcite/ModifiableTable.java @@ -5,7 +5,7 @@ import org.apache.calcite.rex.RexNode; public interface ModifiableTable { - void addColumn(String name, RexNode column, RelDataTypeFactory typeFactory); + int addColumn(String name, RexNode column, RelDataTypeFactory typeFactory); SQRLTable getSqrlTable(); diff --git a/sqrl-planner/sqrl-common/src/main/java/com/datasqrl/calcite/schema/ScriptPlanner.java b/sqrl-planner/sqrl-common/src/main/java/com/datasqrl/calcite/schema/ScriptPlanner.java index 611fd8ec9..d7cf1701f 100644 --- a/sqrl-planner/sqrl-common/src/main/java/com/datasqrl/calcite/schema/ScriptPlanner.java +++ b/sqrl-planner/sqrl-common/src/main/java/com/datasqrl/calcite/schema/ScriptPlanner.java @@ -131,17 +131,17 @@ public Void visit(SqrlAssignTimestamp query, Void context) { RexNode node = planner.planExpression(query.getTimestamp(), table.getRowType()); TimestampAssignableTable timestampAssignableTable = table.unwrap(TimestampAssignableTable.class); + int timestampIndex; if (!(node instanceof RexInputRef) && query.getTimestampAlias().isEmpty()) { - addColumn(node, "_time", table); - timestampAssignableTable.assignTimestamp(-1); + timestampIndex = addColumn(node, "_time", table); } else if (query.getTimestampAlias().isPresent()) { //otherwise, add new column - addColumn(node, query.getTimestampAlias().get().getSimple(), + timestampIndex = addColumn(node, query.getTimestampAlias().get().getSimple(), planner.getCatalogReader().getSqrlTable(tableName)); - timestampAssignableTable.assignTimestamp(-1); } else { - timestampAssignableTable.assignTimestamp(((RexInputRef) node).getIndex()); + timestampIndex = ((RexInputRef) node).getIndex(); } + timestampAssignableTable.assignTimestamp(timestampIndex); return null; } @@ -283,13 +283,13 @@ public SqlNode visit(SqlIdentifier id) { return Pair.of(newParams, node); } - private void addColumn(RexNode node, String cName, RelOptTable table) { + private int addColumn(RexNode node, String cName, RelOptTable table) { if (table.unwrap(ModifiableTable.class) != null) { ModifiableTable table1 = (ModifiableTable) table.unwrap(Table.class); SQRLTable sqrlTable = table1.getSqrlTable(); Column column = sqrlTable.addColumn(nameUtil.toName(cName), null, true, node.getType()); column.setVtName(column.getId().getCanonical()); - table1.addColumn(column.getId().getCanonical(), node, framework.getTypeFactory()); + return table1.addColumn(column.getId().getCanonical(), node, framework.getTypeFactory()); } else { throw new RuntimeException(); } diff --git a/sqrl-planner/sqrl-planner-local/src/main/java/com/datasqrl/plan/table/ProxyImportRelationalTable.java b/sqrl-planner/sqrl-planner-local/src/main/java/com/datasqrl/plan/table/ProxyImportRelationalTable.java index 3527eee9c..a0084836a 100644 --- a/sqrl-planner/sqrl-planner-local/src/main/java/com/datasqrl/plan/table/ProxyImportRelationalTable.java +++ b/sqrl-planner/sqrl-planner-local/src/main/java/com/datasqrl/plan/table/ProxyImportRelationalTable.java @@ -3,6 +3,7 @@ */ package com.datasqrl.plan.table; +import com.datasqrl.calcite.TimestampAssignableTable; import com.datasqrl.canonicalizer.Name; import com.datasqrl.engine.pipeline.ExecutionPipeline; import com.datasqrl.engine.pipeline.ExecutionStage; @@ -25,7 +26,7 @@ *

* This is a phyiscal relation with a schema that captures the input data. */ -public class ProxyImportRelationalTable extends PhysicalRelationalTable { +public class ProxyImportRelationalTable extends PhysicalRelationalTable implements TimestampAssignableTable { @Getter private final ImportedRelationalTableImpl baseTable; @@ -37,12 +38,6 @@ public ProxyImportRelationalTable(@NonNull Name rootTableId, @NonNull Name table this.baseTable = baseTable; } - public int addTimestampColumn(AddedColumn column, @NonNull RelDataTypeFactory typeFactory) { - int index = super.addColumn(column, typeFactory); - this.timestamp = TimestampInference.buildImport().addImport(index, 100).build(); - return index; - } - @Override public List getSupportedStages(ExecutionPipeline pipeline, ErrorCollector errors) { List stages = pipeline.getStages().stream().filter(stage -> @@ -59,4 +54,9 @@ public ConfigBuilder getBaseConfig() { getAssignedStage().ifPresent(stage -> builder.stage(stage)); return builder; } + + @Override + public void assignTimestamp(int index) { + this.timestamp = TimestampInference.buildImport().addImport(index, 100).build(); + } } diff --git a/sqrl-planner/sqrl-planner-local/src/main/java/com/datasqrl/plan/table/ScriptRelationalTable.java b/sqrl-planner/sqrl-planner-local/src/main/java/com/datasqrl/plan/table/ScriptRelationalTable.java index a6d0462e4..cadc215d0 100644 --- a/sqrl-planner/sqrl-planner-local/src/main/java/com/datasqrl/plan/table/ScriptRelationalTable.java +++ b/sqrl-planner/sqrl-planner-local/src/main/java/com/datasqrl/plan/table/ScriptRelationalTable.java @@ -70,8 +70,9 @@ public Iterable getAddedColumns() { return addedColumns; } - public void addColumn(String name, RexNode column, RelDataTypeFactory typeFactory) { - addColumn(new AddedColumn(name, column), typeFactory); + @Override + public int addColumn(String name, RexNode column, RelDataTypeFactory typeFactory) { + return addColumn(new AddedColumn(name, column), typeFactory); } public abstract TableStatistic getTableStatistic();