Skip to content

Commit

Permalink
Honor existing interfaces for timestamp assignment.
Browse files Browse the repository at this point in the history
  • Loading branch information
mbroecheler authored and henneberger committed Oct 13, 2023
1 parent b8031b7 commit 174ae23
Show file tree
Hide file tree
Showing 4 changed files with 18 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import org.apache.calcite.rex.RexNode;

public interface ModifiableTable {
void addColumn(String name, RexNode column, RelDataTypeFactory typeFactory);
int addColumn(String name, RexNode column, RelDataTypeFactory typeFactory);

SQRLTable getSqrlTable();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,17 +131,17 @@ public Void visit(SqrlAssignTimestamp query, Void context) {
RexNode node = planner.planExpression(query.getTimestamp(), table.getRowType());
TimestampAssignableTable timestampAssignableTable = table.unwrap(TimestampAssignableTable.class);

int timestampIndex;
if (!(node instanceof RexInputRef) && query.getTimestampAlias().isEmpty()) {
addColumn(node, "_time", table);
timestampAssignableTable.assignTimestamp(-1);
timestampIndex = addColumn(node, "_time", table);
} else if (query.getTimestampAlias().isPresent()) {
//otherwise, add new column
addColumn(node, query.getTimestampAlias().get().getSimple(),
timestampIndex = addColumn(node, query.getTimestampAlias().get().getSimple(),
planner.getCatalogReader().getSqrlTable(tableName));
timestampAssignableTable.assignTimestamp(-1);
} else {
timestampAssignableTable.assignTimestamp(((RexInputRef) node).getIndex());
timestampIndex = ((RexInputRef) node).getIndex();
}
timestampAssignableTable.assignTimestamp(timestampIndex);

return null;
}
Expand Down Expand Up @@ -283,13 +283,13 @@ public SqlNode visit(SqlIdentifier id) {
return Pair.of(newParams, node);
}

private void addColumn(RexNode node, String cName, RelOptTable table) {
private int addColumn(RexNode node, String cName, RelOptTable table) {
if (table.unwrap(ModifiableTable.class) != null) {
ModifiableTable table1 = (ModifiableTable) table.unwrap(Table.class);
SQRLTable sqrlTable = table1.getSqrlTable();
Column column = sqrlTable.addColumn(nameUtil.toName(cName), null, true, node.getType());
column.setVtName(column.getId().getCanonical());
table1.addColumn(column.getId().getCanonical(), node, framework.getTypeFactory());
return table1.addColumn(column.getId().getCanonical(), node, framework.getTypeFactory());
} else {
throw new RuntimeException();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
*/
package com.datasqrl.plan.table;

import com.datasqrl.calcite.TimestampAssignableTable;
import com.datasqrl.canonicalizer.Name;
import com.datasqrl.engine.pipeline.ExecutionPipeline;
import com.datasqrl.engine.pipeline.ExecutionStage;
Expand All @@ -25,7 +26,7 @@
* <p>
* This is a phyiscal relation with a schema that captures the input data.
*/
public class ProxyImportRelationalTable extends PhysicalRelationalTable {
public class ProxyImportRelationalTable extends PhysicalRelationalTable implements TimestampAssignableTable {

@Getter
private final ImportedRelationalTableImpl baseTable;
Expand All @@ -37,12 +38,6 @@ public ProxyImportRelationalTable(@NonNull Name rootTableId, @NonNull Name table
this.baseTable = baseTable;
}

public int addTimestampColumn(AddedColumn column, @NonNull RelDataTypeFactory typeFactory) {
int index = super.addColumn(column, typeFactory);
this.timestamp = TimestampInference.buildImport().addImport(index, 100).build();
return index;
}

@Override
public List<ExecutionStage> getSupportedStages(ExecutionPipeline pipeline, ErrorCollector errors) {
List<ExecutionStage> stages = pipeline.getStages().stream().filter(stage ->
Expand All @@ -59,4 +54,9 @@ public ConfigBuilder getBaseConfig() {
getAssignedStage().ifPresent(stage -> builder.stage(stage));
return builder;
}

@Override
public void assignTimestamp(int index) {
this.timestamp = TimestampInference.buildImport().addImport(index, 100).build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,9 @@ public Iterable<AddedColumn> getAddedColumns() {
return addedColumns;
}

public void addColumn(String name, RexNode column, RelDataTypeFactory typeFactory) {
addColumn(new AddedColumn(name, column), typeFactory);
@Override
public int addColumn(String name, RexNode column, RelDataTypeFactory typeFactory) {
return addColumn(new AddedColumn(name, column), typeFactory);
}

public abstract TableStatistic getTableStatistic();
Expand Down

0 comments on commit 174ae23

Please sign in to comment.